Python Multiprocessing a large dataframe on Linux

2024/10/5 15:03:40

As the title says, I have a large data frame (df) that needs to be processed row-wise. Because df is big (6 GB), I want to use Python's multiprocessing package to speed it up. Below is a toy example; I'll describe briefly what I want to achieve and leave the details to the code.

The original data is df, on which I want to perform a row-wise analysis (order does not matter) that requires not just the focal row itself but also other rows that satisfy certain conditions. Below are the toy data and my code:

import pandas as pd
import numpy as np
import itertools
from multiprocessing import Pool
import time
import math

# a test example
start_time = time.time()
df = pd.DataFrame({'value': np.random.randint(0, 10, size=30),
                   'district': (['upper'] * 5 + ['down'] * 5) * 3,
                   'region': ['A'] * 10 + ['B'] * 10 + ['C'] * 10})
df['row_id'] = df.index
print(df)

    value district region  row_id
0       8    upper      A       0
1       4    upper      A       1
2       0    upper      A       2
3       3    upper      A       3
4       0    upper      A       4
5       0     down      A       5
6       3     down      A       6
7       7     down      A       7
8       1     down      A       8
9       7     down      A       9
10      7    upper      B      10
11      3    upper      B      11
12      9    upper      B      12
13      8    upper      B      13
14      2    upper      B      14
15      4     down      B      15
16      5     down      B      16
17      3     down      B      17
18      5     down      B      18
19      3     down      B      19
20      3    upper      C      20
21      1    upper      C      21
22      3    upper      C      22
23      0    upper      C      23
24      3    upper      C      24
25      2     down      C      25
26      0     down      C      26
27      1     down      C      27
28      1     down      C      28
29      0     down      C      29

What I want to do is add two columns, count_b and count_a, which count the number of rows whose value falls in the open ranges (value - 2, value) and (value, value + 2), respectively, within the same region and district subset. For instance, count_b for the row with row_id == 0 should be 0, since no row with region == 'A' and district == 'upper' has a value in (8 - 2, 8), i.e. a value of 7. So the desired output should be:

   count_a count_b region row_id
0        0       0      A      0
1        0       1      A      1
2        0       0      A      2
3        1       0      A      3
4        0       0      A      4
5        1       0      A      5
6        0       0      A      6
7        0       0      A      7
8        0       1      A      8
9        0       0      A      9
10       1       0      B     10
11       0       1      B     11
12       0       1      B     12
13       1       1      B     13
14       1       0      B     14
15       2       2      B     15
16       0       1      B     16
17       1       0      B     17
18       0       1      B     18
19       1       0      B     19
20       0       0      C     20
21       0       1      C     21
22       0       0      C     22
23       1       0      C     23
24       0       0      C     24
25       0       2      C     25
26       2       0      C     26
27       1       2      C     27
28       1       2      C     28
29       2       0      C     29

Question 1: Can such a task be vectorized?

Question 2: How can we use multiprocessing to speed it up (solved)?

I decided to go with multiprocessing because I'm not sure how to accomplish this through vectorization. The solution (based on the accepted answer) is:

multiprocessing

def b_a(input_df, r_d):
    print('length of input dataframe: ' + str(len(input_df)))
    # print('region: ' + str(r_d[0]), 'district: ' + str(r_d[1]))
    # subset the data to one region and district
    sub_df = input_df.loc[(input_df['region'].isin([r_d[0]])) & (input_df['district'].isin([r_d[1]]))]
    print('length of sliced dataframe: ' + str(len(sub_df)))
    print(r_d[0], r_d[1])
    b_a = pd.DataFrame(columns=['count_a', 'count_b', 'row_id', 'region'])
    for id in sub_df['row_id']:
        print('processing row: ' + str(id))
        focal_value = sub_df.loc[sub_df['row_id'].isin([id])]['value']
        temp_b = sub_df.loc[(sub_df['value'] > (focal_value - 2).values[0]) & (sub_df['value'] < (focal_value.values[0]))]
        temp_a = sub_df.loc[(sub_df['value'] > (focal_value.values[0])) & (sub_df['value'] < (focal_value + 2).values[0])]
        if len(temp_a):
            temp_a['count_a'] = temp_a['row_id'].count()
        else:
            # note: DataFrame.append was removed in pandas 2.0, so this needs an older pandas
            temp_a = temp_a.append(pd.Series(), ignore_index=True)
            temp_a = temp_a.reindex(columns=[*temp_a.columns.tolist(), 'count_a'], fill_value=0)
        print(temp_a)
        if len(temp_b):
            temp_b['count_b'] = temp_b['row_id'].count()
        else:
            temp_b = temp_b.append(pd.Series(), ignore_index=True)
            temp_b = temp_b.reindex(columns=[*temp_b.columns.tolist(), 'count_b'], fill_value=0)
        print(len(temp_a), len(temp_b))
        temp_b.drop_duplicates('count_b', inplace=True)
        temp_a.drop_duplicates('count_a', inplace=True)
        temp = pd.concat([temp_b[['count_b']].reset_index(drop=True),
                          temp_a[['count_a']].reset_index(drop=True)], axis=1)
        temp['row_id'] = id
        temp['region'] = str(r_d[0])
        b_a = pd.concat([b_a, temp])
    return b_a

r_d_list = list(itertools.product(df['region'].unique(), df['district'].unique()))

if __name__ == '__main__':
    P = Pool(3)
    out = P.starmap(b_a, zip([chunks[r_d_list.index(j)] for j in r_d_list for i in range(len(j))],
                             list(itertools.chain.from_iterable(r_d_list))))  # S3
    # out = P.starmap(b_a, zip([df for i in range(len(r_d_list))], r_d_list))  # S2
    # out = P.starmap(b_a, zip(df, r_d_list))  # S1
    # print(out)
    P.close()
    P.join()
    final = pd.concat(out, ignore_index=True)
    print(final)
    final.to_csv('final.csv', index=False)
print("--- %s seconds ---" % (time.time() - start_time))

Since P.starmap (like P.map) needs one tuple of arguments for every call of b_a, solution S1 won't work: zip(df, r_d_list) pairs the column names of df with the elements of r_d_list, because iterating over a data frame yields its column labels. This causes the error AttributeError: 'str' object has no attribute 'loc', because the input_df passed to b_a is literally a string (a column name of df). This can be verified from the output of print('length of input dataframe: ' + str(len(input_df))), which prints the length of a column name rather than the number of rows in df.

The accepted answer corrects this by creating a list of references to df (S2) that has the same length as the parameter list (r_d_list). This solution works well but may be slow when df is large since, as far as I understand, it has to search the entire data frame for each pair of parameters (region and district). So I came up with a modified version (S3) that splits the data into chunks by region and then searches for each (region, district) pair within its chunk instead of the entire data frame. For me, this reduced the running time by about 20 percent. See below for the code:

region = df['region'].unique()
chunk_numbers = 3
chunk_region = math.ceil(len(region) / chunk_numbers)
chunks = list()
r_d_list = list()
row_count = 0
for i in range(chunk_numbers):
    print(i)
    if i < chunk_numbers-1:
        regions = region[(i*chunk_region):((i+1)*chunk_region)]
        temp = df.loc[df['region'].isin(regions.tolist())]
        chunks.append(temp)
        r_d_list.append(list(itertools.product(regions, temp['district'].unique())))
        del temp
    else:
        regions = region[(i * chunk_region):len(region)]
        temp = df.loc[df['region'].isin(regions.tolist())]
        chunks.append(temp)
        r_d_list.append(list(itertools.product(regions, temp['district'].unique())))
        del temp
    row_count = row_count + len(chunks[i])
    print(row_count)

Add this between print(df) and def b_a(), and remember to comment out the r_d_list = ... line before if __name__ == '__main__'.

Thanks to this wonderful community, I have a workable solution now. I updated my question to provide some material for those who may run into the same problem in the future, and to formulate the question better in the hope of getting even better solutions.
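
A related way to hand each worker only the rows it needs, offered as a sketch rather than as the code above: split df with groupby on region and district and map a worker over the pieces. The names count_group, split_groups and parallel_counts are hypothetical, and the worker assumes the count_a and count_b definition given earlier (open intervals of width 2).

```python
from multiprocessing import Pool

import pandas as pd


def count_group(sub_df):
    """Count neighbours for every row of one (region, district) subset."""
    values = sub_df['value'].to_numpy()
    # Open intervals (v - 2, v) and (v, v + 2), as described in the question.
    count_b = [int(((values > v - 2) & (values < v)).sum()) for v in values]
    count_a = [int(((values > v) & (values < v + 2)).sum()) for v in values]
    return pd.DataFrame({
        'count_a': count_a,
        'count_b': count_b,
        'region': sub_df['region'].to_numpy(),
        'row_id': sub_df['row_id'].to_numpy(),
    })


def split_groups(df):
    """Return one DataFrame per (region, district) pair, in order of first appearance."""
    return [group for _, group in df.groupby(['region', 'district'], sort=False)]


def parallel_counts(df, processes=3):
    """Run count_group on every subset in a pool of worker processes (hypothetical helper)."""
    with Pool(processes) as pool:
        parts = pool.map(count_group, split_groups(df))
    return pd.concat(parts).sort_values('row_id').reset_index(drop=True)
```

Calling parallel_counts(df) from under if __name__ == '__main__' keeps the script safe with start methods other than fork. Each subset is pickled and sent to a worker on its own, so a worker never has to filter the full data frame.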

Answer

Change

out = P.starmap(b_a,zip(df,r_d_list))

into

out = P.starmap(b_a, zip([df for i in range(len(r_d_list))], r_d_list))

The output looks as follows:

length of input dataframe: 300
region: B district: down
length of input dataframe: 300
region: C district: upper
length of sliced dataframe: 50
length of input dataframe: 300
region: C district: down
length of sliced dataframe: 50
length of sliced dataframe: 50
6
[  count_a count_b region row_id
0       6       7      A      0,   count_a count_b region row_id
0       2       4      A     50,   count_a count_b region row_id
0       1       4      B    100,   count_a count_b region row_id
0       7       4      B    150,   count_a count_b region row_id
0       4       9      C    200,   count_a count_b region row_id
0       4       4      C    250]

The list of df only holds references to the same object:

dfa = [df for i in range(len(r_d_list))]
for i in dfa:
    print(['id(i): ', id(i)])

The output of the above looks as follows:

['id(i): ', 4427699200]
['id(i): ', 4427699200]
['id(i): ', 4427699200]
['id(i): ', 4427699200]
['id(i): ', 4427699200]
['id(i): ', 4427699200]

Difference between zip(df, r_d_list) and zip(dfa, r_d_list):

Review the example on zip at https://docs.python.org/3.3/library/functions.html#zip to understand what zip does and how it constructs the result.

list(zip(df, r_d_list)) returns the following (only four pairs, because df has four columns):

[
('value', ('A', 'upper')),
('district', ('A', 'down')),
('region', ('B', 'upper')),
('row_id', ('B', 'down'))
]

list(zip(dfa, r_d_list)) returns the following, where df stands for the data frame object itself:

[
(df, ('A', 'upper')),
(df, ('A', 'down')),
(df, ('B', 'upper')),
(df, ('B', 'down')),
(df, ('C', 'upper')),
(df, ('C', 'down'))
]
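
The same pairing can be checked without a pool. The sketch below uses a small stand-in data frame (its values are made up) and also shows itertools.repeat as an alternative to building the list dfa; that alternative is a suggestion, not part of the code above.

```python
import itertools

import pandas as pd

# Small stand-in for df; the values are illustrative only.
df = pd.DataFrame({'value': [1, 2], 'district': ['upper', 'down'],
                   'region': ['A', 'A'], 'row_id': [0, 1]})
r_d_list = list(itertools.product(df['region'].unique(), df['district'].unique()))

# Iterating over a DataFrame yields its column labels, so S1 pairs strings with r_d.
bad_pairs = list(zip(df, r_d_list))
print(bad_pairs)

# S2: a list of references to the same DataFrame, one per parameter pair.
dfa = [df for _ in range(len(r_d_list))]
good_pairs = list(zip(dfa, r_d_list))

# Alternative: itertools.repeat keeps yielding df; zip stops when r_d_list is exhausted.
repeat_pairs = list(zip(itertools.repeat(df), r_d_list))
print(all(frame is df for frame, _ in repeat_pairs))
```

Either form gives starmap one (data frame, parameter pair) tuple per call of b_a.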

For more examples of pool.starmap, see the question "Python multiprocessing pool.map for multiple arguments".

Updated working code:

import pandas as pd
import numpy as np
import itertools
from multiprocessing import Pool

df = pd.DataFrame({'value': np.random.randint(0, 10, size=300),
                   'district': (['upper'] * 50 + ['down'] * 50) * 3,
                   'region': ['A'] * 100 + ['B'] * 100 + ['C'] * 100})
df['row_id'] = df.index

# b_a = pd.DataFrame(columns=['count_a', 'count_b', 'row_id', 'region'])

# solution 2: multi processing
def b_a(input_df, r_d):
    # print('length of input dataframe: ' + str(len(input_df)))
    # print('region: ' + str(r_d[0]), 'district: ' + str(r_d[1]))
    # subset data that in certain region and district
    sub_df = input_df.loc[(input_df['region'].isin([r_d[0]])) & (input_df['district'].isin([r_d[1]]))]
    # print('length of sliced dataframe: ' + str(len(sub_df)))
    # an empty data frame to store result
    b_a = pd.DataFrame(columns=['count_a', 'count_b', 'row_id', 'region'])
    for id in sub_df['row_id']:
        focal_value = sub_df.loc[sub_df['row_id'].isin([id])]['value']
        temp_b = sub_df.loc[(sub_df['value'] > (focal_value - 2).values[0]) & (sub_df['value'] < (focal_value.values[0]))]
        temp_a = sub_df.loc[(sub_df['value'] > (focal_value.values[0])) & (sub_df['value'] < (focal_value + 2).values[0])]
        if len(temp_a):
            temp_a['count_a'] = temp_a['row_id'].count()
        else:
            temp_a = temp_a.reindex(columns=[*temp_a.columns.tolist(), 'count_a'], fill_value=0)
        if len(temp_b):
            temp_b['count_b'] = temp_b['row_id'].count()
        else:
            temp_b = temp_b.reindex(columns=[*temp_b.columns.tolist(), 'count_b'], fill_value=0)
        temp_b.drop_duplicates('count_b', inplace=True)
        temp_a.drop_duplicates('count_a', inplace=True)
        temp = pd.concat([temp_b[['count_b']].reset_index(drop=True),
                          temp_a[['count_a']].reset_index(drop=True)], axis=1)
        temp['row_id'] = id
        temp['region'] = str(r_d[0])
        b_a = pd.concat([b_a, temp])
    return b_a

r_d_list = list(itertools.product(df['region'].unique(), df['district'].unique()))

# dfa = [df for i in range(len(r_d_list))]
# for i in dfa:
#     print(['id(i): ', id(i)])

if __name__ == '__main__':
    P = Pool(3)
    out = P.starmap(b_a, zip([df for i in range(len(r_d_list))], r_d_list))
    # print(len(out))
    P.close()
    P.join()
    final = pd.concat(out, ignore_index=True)
    print(final)

Output for final:

    count_a count_b region row_id
0         4       6      A      0
1         5       4      A      1
2       NaN       5      A      2
3         5       8      A      3
4         5     NaN      A      4
..      ...     ...    ...    ...
295       2       7      C    295
296       6     NaN      C    296
297       6       6      C    297
298       5       5      C    298
299       6       6      C    299

[300 rows x 4 columns]
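
On question 1, the per-row search can be avoided inside each (region, district) subset. The following is a sketch of one possible approach, not the method used above: sort the values of a subset once and use numpy.searchsorted to count how many fall in each open interval. The names count_neighbours and add_counts and the width parameter are hypothetical, and the sketch assumes that the index of df is unique.

```python
import numpy as np
import pandas as pd


def count_neighbours(values, width=2):
    """Count, for each value v, the values in (v - width, v) and (v, v + width)."""
    v = values.to_numpy()
    s = np.sort(v)
    # searchsorted(s, x, 'left') is the number of elements < x,
    # searchsorted(s, x, 'right') is the number of elements <= x.
    count_b = np.searchsorted(s, v, side='left') - np.searchsorted(s, v - width, side='right')
    count_a = np.searchsorted(s, v + width, side='left') - np.searchsorted(s, v, side='right')
    return pd.DataFrame({'count_a': count_a, 'count_b': count_b}, index=values.index)


def add_counts(df):
    """Attach count_a and count_b computed within each (region, district) subset."""
    parts = [count_neighbours(group['value'])
             for _, group in df.groupby(['region', 'district'], sort=False)]
    return df.join(pd.concat(parts))
```

Applied to the toy data from the question, add_counts(df) reproduces the desired count_a and count_b columns. Rows without any neighbour get 0 here, whereas the multiprocessing output above shows NaN for them.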