用Python并行处理大文件,看这篇就够了!
为了进行并行处理,我们将任务划分为多个子单元。它增加了程序处理的工作数量,减少了整体处理时间。
例如,如果你正在处理一个大的CSV文件,你想修改一个单列。我们将把数据以数组的形式送入函数,它将根据可用的工作者的数量,一次并行处理多个值。这些工作器是基于你的处理器内的核心数量的。
注意:在一个较小的数据集上使用并行处理,不会提高处理时间。
在这篇博客中,我们将学习如何使用多处理、joblib和tqdm Python包减少大文件的处理时间。这是一个简单的教程,可以适用于任何文件、数据库、图像、视频和音频。
注意:我们使用的是Kaggle笔记本进行实验。处理时间可能因机器不同而不同。
开始处理
我们将使用Kaggle的美国事故(2016 - 2021)数据集(https://www.kaggle.com/datasets/sobhanmoosavi/us-accidents),该数据集由280万条记录和47列组成。
我们将导入multiprocessing、joblib和tqdm用于并行处理,pandas用于数据摄取,re、nltk和string用于文本处理。
# Parallel Computing import multiprocessing as mp from joblib import Parallel, delayed from tqdm.notebook import tqdm # Data Ingestion import pandas as pd # Text Processing import re from nltk.corpus import stopwords import string
在我们直接进入之前,让我们通过加倍cpu_count()
来设置n_workers
。正如你所看到的,我们有8个工人。
n_workers = 2 * mp.cpu_count() print(f"{n_workers} workers are available") >>> 8 workers are available
在下一步,我们将使用pandas read_csv函数摄取大型CSV文件。然后,打印出数据框的形状、列的名称和处理时间。
注意:Jupyter的神奇函数%time
可以在处理结束后显示CPU times和wall time。
%%time file_name="../input/us-accidents/US_Accidents_Dec21_updated.csv" df = pd.read_csv(file_name) print(f"Shape:{df.shape}\n\nColumn Names:\n{df.columns}\n")
输出
Shape:(2845342, 47) Column Names: Index(['ID', 'Severity', 'Start_Time', 'End_Time', 'Start_Lat', 'Start_Lng', 'End_Lat', 'End_Lng', 'Distance(mi)', 'Description', 'Number', 'Street', 'Side', 'City', 'County', 'State', 'Zipcode', 'Country', 'Timezone', 'Airport_Code', 'Weather_Timestamp', 'Temperature(F)', 'Wind_Chill(F)', 'Humidity(%)', 'Pressure(in)', 'Visibility(mi)', 'Wind_Direction', 'Wind_Speed(mph)', 'Precipitation(in)', 'Weather_Condition', 'Amenity', 'Bump', 'Crossing', 'Give_Way', 'Junction', 'No_Exit', 'Railway', 'Roundabout', 'Station', 'Stop', 'Traffic_Calming', 'Traffic_Signal', 'Turning_Loop', 'Sunrise_Sunset', 'Civil_Twilight', 'Nautical_Twilight', 'Astronomical_Twilight'], dtype='object') CPU times: user 33.9 s, sys: 3.93 s, total: 37.9 s Wall time: 46.9 s
清理文本
clean_text
是一个用于处理和清理文本的简单函数。我们将使用nltk.copus
获得英语停止词,并使用它来过滤掉文本行中的停止词。之后,我们将删除句子中的特殊字符和多余的空格。这将是确定串行、并行和批处理的处理时间的基线函数。
def clean_text(text): # Remove stop words stops = stopwords.words("english") text = " ".join([word for word in text.split() if word not in stops]) # Remove Special Characters text = text.translate(str.maketrans('', '', string.punctuation)) # removing the extra spaces text = re.sub(' +',' ', text) return text
串行处理
对于串行处理,我们可以使用pandas的.apply()
函数,但是如果你想看到进度条,你需要为pandas激活tqdm,然后使用.progress_apply()
函数。
我们将处理280万条记录,并将结果保存回 "描述 "列。
%%time tqdm.pandas() df['Description'] = df['Description'].progress_apply(clean_text)
输出
高端处理器串行处理280万条记录用了9分5秒。
100% ?????????? 2845342/2845342 [09:05<00:00, 5724.25it/s] CPU times: user 8min 14s, sys: 53.6 s, total: 9min 7s Wall time: 9min 5s
多处理
有多种方法可以对文件进行并行处理,我们将了解所有这些方法。multiprocessing
是一个内置的python包,通常用于并行处理大文件。
我们将创建一个有8个工作者的多处理池,并使用map函数来启动进程。为了显示进度条,我们将使用tqdm。
map函数由两部分组成。第一个部分需要函数,第二个部分需要一个参数或参数列表。
通过阅读文档了解更多。(https://docs.python.org/3/library/multiprocessing.html)
%%time p = mp.Pool(n_workers) df['Description'] = p.map(clean_text,tqdm(df['Description']))
输出
我们的处理时间几乎提高了3倍。处理时间从9分5秒下降到3分51秒。
100% ?????????? 2845342/2845342 [02:58<00:00, 135646.12it/s] CPU times: user 5.68 s, sys: 1.56 s, total: 7.23 s Wall time: 3min 51s
并行处理
我们现在将学习另一个Python包来执行并行处理。在本节中,我们将使用joblib的Parallel和delayed来复制map函数。
- Parallel需要两个参数:n_job = 8和backend = multiprocessing。
- 然后,我们将在delayed函数中加入clean_text。
- 创建一个循环,每次送入一个值。下面的过程是相当通用的,你可以根据你的需要修改你的函数和数组。我曾用它来处理成千上万的音频和视频文件,没有任何问题。
建议:使用 "try: "和 "except: "添加异常处理。
def text_parallel_clean(array): result = Parallel(n_jobs=n_workers,backend="multiprocessing")( delayed(clean_text) (text) for text in tqdm(array) ) return result
在text_parallel_clean()
中添加 "描述 "列。
%%time df['Description'] = text_parallel_clean(df['Description'])
输出
我们的函数比多处理Pool多花了13秒。即使如此,并行处理也比串行处理快4分59秒。
100% ?????????? 2845342/2845342 [04:03<00:00, 10514.98it/s] CPU times: user 44.2 s, sys: 2.92 s, total: 47.1 s Wall time: 4min 4s
并行批量处理
有一个更好的方法来处理大文件,就是把它们分成若干批,然后并行处理。让我们从创建一个批处理函数开始,该函数将在单个批次的值上运行clean_function
。
批量处理函数
def proc_batch(batch): return [ clean_text(text) for text in batch ]
将文件分割成批处理
下面的函数将根据工作者的数量把文件分成多个批次。在我们的例子中,我们得到8个批次。
def batch_file(array,n_workers): file_len = len(array) batch_size = round(file_len / n_workers) batches = [ array[ix:ix+batch_size] for ix in tqdm(range(0, file_len, batch_size)) ] return batches batches = batch_file(df['Description'],n_workers) >>> 100% ?????????? 8/8 [00:00<00:00, 280.01it/s]
运行并行的批处理
最后,我们将使用Parallel和delayed来处理批次。
注意:为了得到一个单数组的值,我们必须运行列表理解,如下图所示。
%%time batch_output = Parallel(n_jobs=n_workers,backend="multiprocessing")( delayed(proc_batch) (batch) for batch in tqdm(batches) ) df['Description'] = [j for i in batch_output for j in i]
输出
我们已经改善了处理时间。这种技术在处理复杂数据和训练深度学习模型方面非常有名。
100% ?????????? 8/8 [00:00<00:00, 2.19it/s] CPU times: user 3.39 s, sys: 1.42 s, total: 4.81 s Wall time: 3min 56s
tqdm 并发
tqdm将多进程带到了一个新的水平。它简单而强大。我将向每个数据科学家推荐它。
查看文档,了解更多关于多处理的信息。(https://tqdm.github.io/)
process_map
需要:
- 函数名称
- 数据框架列
- max_workers
- chucksize与批量大小类似。我们将使用工人的数量来计算批处理的大小,或者你可以根据你的偏好来添加这个数字。
%%time from tqdm.contrib.concurrent import process_map batch = round(len(df)/n_workers) df['Description'] = process_map(clean_text,df['Description'], max_workers=n_workers, chunksize=batch)
输出
通过一行代码,我们得到了最好的结果。
100% ?????????? 2845342/2845342 [03:48<00:00, 1426320.93it/s] CPU times: user 7.32 s, sys: 1.97 s, total: 9.29 s Wall time: 3min 51s
结论
你需要找到一个平衡点,选择最适合你情况的技术。它可以是串行处理、并行处理或批处理。如果你正在处理一个较小的、不太复杂的数据集,并行处理可能会适得其反。
在这个迷你教程中,我们已经了解了各种Python包和技术,它们允许我们对数据函数进行并行处理。
如果你只是在处理一个表格数据集,并且想提高你的处理性能,那么我将建议你尝试Dask(https://www.dask.org/)、datatable(https://github.com/h2oai/datatable)和RAPIDS(https://rapids.ai/)。
参考:https://www.kdnuggets.com/2022/07/parallel-processing-large-file-python.html
