原文:
towardsdatascience.com/maximizing-python-code-efficiency-strategies-to-overcome-common-performance-hurdles-c6292610d785
https://github.com/OpenDocCN/towardsdatascience-blog-zh-2024/raw/master/docs/img/88373d079cd895a3690ee1cb5380fb63.png
照片由Kevin Canlas在Unsplash上拍摄
概述
在对 Python 代码优化进行探索时,我们查看阻碍性能并导致开销的常见问题。在这里我们分析了两个问题——一个与嵌套循环相关,另一个与读取大量数据集引起的内存/分配问题相关。
针对嵌套循环问题,我们通过一个示例用例来了解嵌套循环困境,然后转向一个作为嵌套循环引起的性能问题替代方案的解决方案。
针对在大数据集中遇到内存/分配问题,我们探讨了多种数据读取策略,并比较了每种策略的性能。让我们进一步探讨。
性能问题-1:嵌套循环困境
https://github.com/OpenDocCN/towardsdatascience-blog-zh-2024/raw/master/docs/img/c7dd6d884cb360305abe8a7d1720da2d.png
嵌套循环困境 – 照片由愚木混株 cdd20在Unsplash上拍摄
虽然嵌套循环是一种常见的编程结构,但它们低效的实现可能会导致性能不佳。在嵌套循环中可能会遇到的一个显著挑战是“内核持续运行”的问题。这发生在代码中嵌套了低效实现的循环,导致执行时间延长;在大多数情况下,会导致无限循环。嵌套循环易于实现,但为了优化性能有时需要牺牲嵌套结构的简单性。嵌套循环可能导致更高的算法复杂度,导致执行时间更长,尤其是在处理大数据集时。需要注意的是,尽管嵌套循环本身可能并非“不好”,但理解它们的含义并考虑替代方法可以导致更高效的 Python 代码。在这种情况下,考虑 Python 的功能和库是很好的。
讨论嵌套循环的用例
我们有两个文件,其中一些记录是彼此的重复。两个文件中都有一个标识符列,但 ID 值格式不同。因此,如果我们比较一个文件中的整个行与另一个文件,它不会被标记为重复。因此,确定重复的唯一方法是比较除标识符列之外的其他特定列。
例如,假设文件包含以下值 –
File1.csv
ID,Name,Value A123,John,100 B456,Jane,200 C789,Bob,150
File2.csv
ID,Name,Value A-123,Jane,200 B_456,Bob,150 C-789,John,100 D-839,Sarah,180
让我们看看使用嵌套循环的实现。我们将文件加载到两个数据框(df_primary 和 df_secondary)中,然后以嵌套的方式使用两个循环遍历这两个数据框。
df_primary['Flag']=''# Iterate over rows in the primary dataframeforidx_primary,row_primaryindf_primary.iterrows():# Iterate over rows in the secondary dataframeforidx_secondary,row_secondaryindf_secondary.iterrows():# Check for matching conditionsif(row_primary['Column1']==row_secondary['Column1'])and(row_primary['Column2']==row_secondary['Column2'])and(row_primary['Column3']==row_secondary['Column3'])and(row_primary['Column4']==row_secondary['Column4']):# Update the 'Flag' column in the primary dataframe for the current rowdf_primary.at[idx_primary,'Flag']='Y'在这种情况下,使用了‘iterrows’和嵌套循环。与嵌套循环一起使用的‘iterrows’由于需要隐式类型转换而产生了额外的计算成本。
嵌套循环的替代方案 – pd.merge
为了更好的性能,建议在 Pandas 中使用‘merge’函数。pd.merge 是 pandas 库中的一个函数,用于根据公共列或索引合并两个数据框对象。它是一个高级函数,提供了一种方便的方式来执行数据库风格的连接。合并过程涉及根据指定的键匹配和合并数据。此外,pandas 使用向量化操作来有效地处理这项任务。
merged_df=pd.merge(df_primary,df_secondary,on=['Column1','Column2','Column3','Column4'],how='outer',indicator=True)pd.merge 函数中的指示器参数用于向结果数据框添加一个名为‘_merge’的特殊列,该列指示每行的来源。在结果数据框(merged_df)中,‘_merge’列将指示每行是否仅来自左数据框(left_only)、仅来自右数据框(right_only)或来自两个数据框(both)。这在我们想要跟踪合并过程中哪个数据框对特定行有贡献时特别有用。在这种情况下,我们可以使用‘_merge’列中对应于值‘both’的行。通过这个操作,每一组重复项都会在 merged_df 中合并成一行。因此,‘merged_df’将只包含唯一的行。这个解决方案不仅语法上更简单,还有助于减少性能开销,并消除了迭代的需求。
性能问题 2:内存/分配问题
在尝试加载大文件时,你可能遇到了以下错误 –MemoryError: Unable to allocate 512. KiB for an array with shape (65536,) and data type int64.这表明程序在尝试执行内存密集型操作(如加载大型数据集)时已耗尽内存。这种情况尤其是在尝试在单台机器上读取数据集(使用如 Pandas read_csv 等库)时发生。
作为先决条件,请从datacatalog.worldbank.org/search/dataset/0065200/Data-Use-in-Academia-Dataset下载数据集‘data_use_article_database’。(此数据集受 Creative Commons Attribution 4.0-datacatalog.worldbank.org/public-licenses许可)。
文件以 1.9 GB 大小的‘.csv’文件形式下载。请注意,我们尝试在一个 8 GB RAM 的计算机上用 Python 将这个数据集加载到 dataframe 中。这个练习的唯一目的是尝试使用资源最少的计算机加载大型数据集;然后确定最优化策略以避免上述提到的内存错误。
克服内存/分配问题的策略
基本分块
考虑以块的形式读取文件的一个策略是因为在这里我们的机器会耗尽内存。当在 pandas read_csv 函数中指定 chunksize 参数时,结果(‘chunks’)是 pandas.io.parsers.readers.TextFileReader 的一个实例。然后我们通过迭代‘chunks’变量使用一个‘for’循环一次读取一个块。在这种情况下,变量‘chunk’是一个临时 dataframe,它被连接到最终的 dataframe‘result_df’中。
importosimportpandasaspd csv_file_path='results_completed_updated_20231003.csv'chunk_size=10000chunks=pd.read_csv(csv_file_path,chunksize=chunk_size)#Process each chunkresult_df=pd.DataFrame()forchunkinchunks:#Concatenate the resultsresult_df=pd.concat([result_df,chunk],ignore_index=True)注意,块的大小可以根据你为读取文件这个过程分配的内存量来确定。
上述基本分块策略的效率取决于各种因素,包括你的数据集大小、可用的系统资源以及在每个块上执行的具体操作。
改进的分块
对于上述讨论的基本分块策略,有一些考虑和潜在的改进可以实施。
当 result_df 的大小增长时,pd.concat 操作可能变得低效,尤其是在每次迭代中连接大型 dataframes 时。这是因为它在每次迭代中创建一个新的 dataframe,而复制数据可能很昂贵。
而不是在每次迭代中连接 dataframes,考虑将块存储在一个列表中,并在循环外部连接它们。在基本分块中,pd.concat 需要保留三个 dataframe 副本(result_df、chunk 和最终的赋值 df‘result_df’)在内存中,而在改进的分块中,我们只使用一个列表来追加数据。
chunks_list=[]csv_file_path='results_completed_updated_20231003.csv'chunk_size=10000chunks=pd.read_csv(csv_file_path,chunksize=chunk_size)result_df=pd.DataFrame()forchunkinchunks:#Process each chunkchunks_list.append(chunk)result_df=pd.DataFrame(chunks_list)并行处理策略
另一个选择是使用并行处理策略。如果你喜欢类似 Pandas 的界面,考虑使用库‘Dask’。Dask 为大于内存的数据集提供了并行和分布式计算能力。
Dask 将‘.csv’文件分割成多个块,每个块在不同的工作者上独立处理。每个工作者并发地读取其分配的块,导致并行读取。
工作者是执行计算的一个独立进程或线程。在单机模式下,工作者使用多个线程或进程来并行化计算。在集群中,Dask 将计算分布在集群中的多个节点上。集群中的每台机器都成为工作者,Dask 协调这些分布式工作者之间的任务执行。
这里是一个基本的图解,用于说明使用 Dask 的操作。
https://github.com/OpenDocCN/towardsdatascience-blog-zh-2024/raw/master/docs/img/0455e0713d81c9052bc589a5eba9d5a5.png
图 1. 解释 ‘Dask’ 的基本图解(作者图片)
让我们尝试使用 dask 读取手头的数据集。
importdask.dataframeasdd csv_file_path='results_completed_updated_20231003.csv'ddf=dd.read_csv(csv_file_path,assume_missing=True,dtype={'titleId':'object','ordering':'object','title':'object','region':'object','language':'object','types':'object','attributes':'object','isOriginalTitle':'object'})result_df=ddf.compute()在这种情况下,我们使用 ‘dask’ 库的 read_csv 选项读取 ‘.csv’ 文件。当参数 assume_missing 设置为 True 时,Dask 将某些值,如 NaN(非数字)或 None,视为缺失值。然而,当值设置为 False 时,Dask 不会对表示缺失数据的特定值做出假设。所有值都将被视为常规数据,并且不会对缺失值进行特殊处理。
此外,提供显式数据类型(dtype)是可选的,因为 Dask 在读取过程中设计为自动推断数据类型。然而,在某些情况下,您可能希望使用 dd.read_csv 中的 dtype 参数显式指定数据类型。以下是一些原因——
提高性能:显式指定数据类型可以提高性能。Dask 的自动类型推断可能会产生一些开销,尤其是在大型数据集上。提供显式数据类型允许 Dask 跳过推断步骤。
内存效率:当 Dask 读取 CSV 文件时,它需要扫描数据样本以推断数据类型。对于大型数据集,此过程可能会消耗大量的内存。指定数据类型可以帮助提高内存效率。
避免类型推断错误:在某些情况下,Dask 的自动类型推断可能无法正确推断出预期的数据类型,尤其是对于包含混合数据的列。显式指定数据类型有助于避免类型推断中的潜在错误。
自定义数据类型:如果您对某些列的数据类型有特定要求,可以使用 dtype 参数来自定义它们。例如,您可能希望将一个列视为字符串,即使它包含数值。在这种情况下,显式指定数据类型是有意义的。
让我们继续下一个数据读取策略。
列式数据读取与并行处理策略(混合)
此策略涉及以列式格式读取数据,这是 PyArrow 的主要关注点。与基于行的格式相比,列式数据表示对于某些分析和处理任务更有效。在此用例中,我们使用 Dask 将大型数据文件纯读取到 Dask Dataframe 中。然后我们将 Dask Dataframe 加载到 PyArrow 表中,该表可以进一步用于复杂分析任务。
importdask.dataframeasddimportpyarrowaspaimportpyarrow.csvascsvimporttime#Specify the CSV file pathcsv_file_path='results_completed_updated_20231003.csv'start_time=time.time()#Read the file into a Dask DataFrameddf=dd.read_csv(csv_file_path,assume_missing=True,dtype={'titleId':'object','ordering':'object','title':'object','region':'object','language':'object','types':'object','attributes':'object','isOriginalTitle':'object'})#Convert Dask DataFrame to PyArrow Tabletable=pa.Table.from_pandas(ddf.compute(),preserve_index=False)end_time=time.time()#Calculate time taken in secondstime_taken_seconds=end_time-start_time#Convert time to minutestime_taken_minutes=time_taken_seconds/60#Display the PyArrow Tableprint(table)print(f"Time taken:{time_taken_minutes}minutes")在 read_options 中使用 use_threads=True 表示 PyArrow 被配置为使用多个线程进行并行读取。这可以加快数据的读取速度;尤其是在从磁盘读取时。在‘convert_options’下,strings_can_be_null=True 允许 PyArrow 处理数据中的字符串值可以表示为 null 的情况。
评估策略性能
让我们比较改进的块处理、并行处理策略以及涉及列式数据读取与并行处理的混合策略所需的时间。我们省略了基本块处理策略,因为它相当耗时。
importtime chunks_list=[]csv_file_path='results_completed_updated_20231003.csv'chunk_size=10000start_time=time.time()chunks=pd.read_csv(csv_file_path,chunksize=chunk_size)result_df=pd.DataFrame()forchunkinchunks:#Process each chunkchunks_list.append(chunk)result_df=pd.concat(chunks_list,ignore_index=True)end_time=time.time()#Calculate time taken in secondstime_taken_seconds=end_time-start_time#Convert time to minutestime_taken_minutes=time_taken_seconds/60time_taken_minutes使用改进的块处理策略,所需时间在 2.15 到 2.38 分钟之间。
importdask.dataframeasdd csv_file_path='results_completed_updated_20231003.csv'start_time=time.time()ddf=dd.read_csv(csv_file_path,assume_missing=True,dtype={'titleId':'object','ordering':'object','title':'object','region':'object','language':'object','types':'object','attributes':'object','isOriginalTitle':'object'})result_df=ddf.compute()end_time=time.time()# Calculate time taken in secondstime_taken_seconds=end_time-start_time# Convert time to minutestime_taken_minutes=time_taken_seconds/60time_taken_minutes使用并行处理策略,考虑到多次运行,所需时间在 0.9 到 1.2 分钟之间。
importdask.dataframeasddimportpyarrowaspaimportpyarrow.csvascsvimporttime csv_file_path='results_completed_updated_20231003.csv'start_time=time.time()#Read the CSV file into a Dask DataFrameddf=dd.read_csv(csv_file_path,assume_missing=True,dtype={'titleId':'object','ordering':'object','title':'object','region':'object','language':'object','types':'object','attributes':'object','isOriginalTitle':'object'})#Convert Dask DataFrame to PyArrow Tabletable=pa.Table.from_pandas(ddf.compute(),preserve_index=False)end_time=time.time()#Calculate time taken in secondstime_taken_seconds=end_time-start_time#Convert time to minutestime_taken_minutes=time_taken_seconds/60#Display the PyArrow Tableprint(table)print(f"Time taken:{time_taken_minutes}minutes")使用列式数据读取与并行处理策略,多次运行所需时间在 1.5 到 2.5 分钟之间。
考虑事项与结论
总之,解决数据处理中嵌套循环困境和内存/分配问题的双重挑战对于实现最佳性能至关重要。在性能问题-1 中,提出了一个详细的使用案例,并推荐使用 pd.merge 实现战略解决方案。利用 Pandas 及其高效的“合并”功能可以显著提高计算速度并简化复杂的数据操作。
在性能问题-2 中,重点转向解决内存/分配问题,这是处理大型数据集时常见的障碍。探索了各种块处理策略作为克服内存限制的有效补救措施。采用将数据读入更小、更易管理的块的方法;以及利用并行处理技术可以显著提高内存效率,并允许在不影响性能的情况下处理大量数据集。
如果你的主要关注点是处理不适合内存的大型数据集,那么仅使用 Dask 就足够了。如果你有从 PyArrow 的列式数据表示(例如,分析操作)中受益的具体任务,将 Dask 与 PyArrow 结合使用将是有益的(列式数据读取与并行处理策略)。
通过针对这些性能问题采取针对性解决方案,希望数据科学家和分析师能够确保他们的工作流程优化以提高速度、可扩展性和资源利用率。