As the title says, I have a big data frame (df) that needs to be processed row-wise. Since df is big (6 GB), I want to use Python's multiprocessing package to speed it up. Below is a toy example; given my writing skill and the complexity of the task, I'll describe what I want to achieve briefly and leave the details to the code.
The original data is df, from which I want to perform some row-wise analysis (order does not matter) that requires not just the focal row itself but also other rows that satisfy certain conditions. Below are the toy data and my code:
import pandas as pd
import numpy as np
import itertools
from multiprocessing import Pool
import time
import math

# a test example
start_time = time.time()
df = pd.DataFrame({'value': np.random.randint(0, 10, size=30),
                   'district': (['upper'] * 5 + ['down'] * 5) * 3,
                   'region': ['A'] * 10 + ['B'] * 10 + ['C'] * 10})
df['row_id'] = df.index
print(df)

    value district region  row_id
0 8 upper A 0
1 4 upper A 1
2 0 upper A 2
3 3 upper A 3
4 0 upper A 4
5 0 down A 5
6 3 down A 6
7 7 down A 7
8 1 down A 8
9 7 down A 9
10 7 upper B 10
11 3 upper B 11
12 9 upper B 12
13 8 upper B 13
14 2 upper B 14
15 4 down B 15
16 5 down B 16
17 3 down B 17
18 5 down B 18
19 3 down B 19
20 3 upper C 20
21 1 upper C 21
22 3 upper C 22
23 0 upper C 23
24 3 upper C 24
25 2 down C 25
26 0 down C 26
27 1 down C 27
28 1 down C 28
29 0 down C 29
What I want to do is add two other columns, count_b and count_a, which simply count the number of rows whose value falls in the open ranges (value - 2, value) and (value, value + 2), respectively, within the same region and district subset. For instance, count_b for the row with row_id == 0 should be 0, since no row within the region == 'A' and district == 'upper' subset has a value of 7, i.e. a value falling in (8 - 2, 8). So the desired output should be:
count_a count_b region row_id
0 0 0 A 0
1 0 1 A 1
2 0 0 A 2
3 1 0 A 3
4 0 0 A 4
5 1 0 A 5
6 0 0 A 6
7 0 0 A 7
8 0 1 A 8
9 0 0 A 9
10 1 0 B 10
11 0 1 B 11
12 0 1 B 12
13 1 1 B 13
14 1 0 B 14
15 2 2 B 15
16 0 1 B 16
17 1 0 B 17
18 0 1 B 18
19 1 0 B 19
20 0 0 C 20
21 0 1 C 21
22 0 0 C 22
23 1 0 C 23
24 0 0 C 24
25 0 2 C 25
26 2 0 C 26
27 1 2 C 27
28 1 2 C 28
29 2 0 C 29
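For reference, the counting rule itself can be written as a plain single-process loop over the (region, district) groups. This is only a minimal sketch of the logic described above, using the toy df created earlier (the helper name count_neighbors is just for illustration, and it is not tuned for the 6 GB case):

def count_neighbors(sub):
    # Within one (region, district) subset, count for every row how many
    # values fall strictly inside (value - 2, value) and (value, value + 2).
    vals = sub['value'].to_numpy()
    sub = sub.copy()
    sub['count_b'] = [((vals > v - 2) & (vals < v)).sum() for v in vals]
    sub['count_a'] = [((vals > v) & (vals < v + 2)).sum() for v in vals]
    return sub

reference = pd.concat(
    [count_neighbors(sub) for _, sub in df.groupby(['region', 'district'])]
).sort_values('row_id')
print(reference[['count_a', 'count_b', 'region', 'row_id']])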
Question 1: can such a task be vectorized?
Question 2: how can we use multiprocessing to speed it up (solved)?
I decided to go with multiprocessing because I'm not sure how to accomplish this through vectorization. The solution, based on the accepted answer, uses multiprocessing:
def b_a(input_df, r_d):
    print('length of input dataframe: ' + str(len(input_df)))
    # print('region: ' + str(r_d[0]), 'district: ' + str(r_d[1]))
    sub_df = input_df.loc[(input_df['region'].isin([r_d[0]])) & (input_df['district'].isin([r_d[1]]))]
    print('length of sliced dataframe: ' + str(len(sub_df)))
    print(r_d[0], r_d[1])
    b_a = pd.DataFrame(columns=['count_a', 'count_b', 'row_id', 'region'])
    for id in sub_df['row_id']:
        print('processing row: ' + str(id))
        focal_value = sub_df.loc[sub_df['row_id'].isin([id])]['value']
        temp_b = sub_df.loc[(sub_df['value'] > (focal_value - 2).values[0]) & (sub_df['value'] < (focal_value.values[0]))]
        temp_a = sub_df.loc[(sub_df['value'] > (focal_value.values[0])) & (sub_df['value'] < (focal_value + 2).values[0])]
        if len(temp_a):
            temp_a['count_a'] = temp_a['row_id'].count()
        else:
            temp_a = temp_a.append(pd.Series(), ignore_index=True)
            temp_a = temp_a.reindex(columns=[*temp_a.columns.tolist(), 'count_a'], fill_value=0)
        print(temp_a)
        if len(temp_b):
            temp_b['count_b'] = temp_b['row_id'].count()
        else:
            temp_b = temp_b.append(pd.Series(), ignore_index=True)
            temp_b = temp_b.reindex(columns=[*temp_b.columns.tolist(), 'count_b'], fill_value=0)
        print(len(temp_a), len(temp_b))
        temp_b.drop_duplicates('count_b', inplace=True)
        temp_a.drop_duplicates('count_a', inplace=True)
        temp = pd.concat([temp_b[['count_b']].reset_index(drop=True),
                          temp_a[['count_a']].reset_index(drop=True)], axis=1)
        temp['row_id'] = id
        temp['region'] = str(r_d[0])
        b_a = pd.concat([b_a, temp])
    return b_a

r_d_list = list(itertools.product(df['region'].unique(), df['district'].unique()))

if __name__ == '__main__':
    P = Pool(3)
    out = P.starmap(b_a, zip([chunks[r_d_list.index(j)] for j in r_d_list for i in range(len(j))],
                             list(itertools.chain.from_iterable(r_d_list))))  # S3
    # out = P.starmap(b_a, zip([df for i in range(len(r_d_list))], r_d_list))  # S2
    # out = P.starmap(b_a, zip(df, r_d_list))  # S1
    # print(out)
    P.close()
    P.join()
    final = pd.concat(out, ignore_index=True)
    print(final)
    final.to_csv('final.csv', index=False)
print("--- %s seconds ---" % (time.time() - start_time))
Since using P.starmap (as well as P.map) requires feeding the function with all possible pairs of arguments for b_a, solution S1 won't work: zip(df, r_d_list) actually zips the column names of df with the elements of r_d_list, which then causes the error AttributeError: 'str' object has no attribute 'loc', because the input_df passed to b_a is literally a string (a column name of df). This can be verified by looking at the output of print('length of input dataframe: ' + str(len(input_df))), which prints the length of the column-name string rather than the number of rows of df. The accepted answer corrects this by building a list of references to df, one per parameter pair (S2), so that the argument list has the same length as the parameter list r_d_list. This solution works great but may be slow when df is large since, to my understanding, it searches the entire data frame for every pair of parameters (region and district). So I came up with a modified version that splits the data into chunks based on region and district and then searches within each chunk instead of the whole data frame (S3). For me, this reduces running time by about 20 percent; see below for the code (a small illustration of the S1 vs. S2 argument pairing also appears further below):
region = df['region'].unique()
chunk_numbers = 3
chunk_region = math.ceil(len(region) / chunk_numbers)
chunks = list()
r_d_list = list()
row_count = 0
for i in range(chunk_numbers):
    print(i)
    if i < chunk_numbers - 1:
        regions = region[(i * chunk_region):((i + 1) * chunk_region)]
        temp = df.loc[df['region'].isin(regions.tolist())]
        chunks.append(temp)
        r_d_list.append(list(itertools.product(regions, temp['district'].unique())))
        del temp
    else:
        regions = region[(i * chunk_region):len(region)]
        temp = df.loc[df['region'].isin(regions.tolist())]
        chunks.append(temp)
        r_d_list.append(list(itertools.product(regions, temp['district'].unique())))
        del temp
    row_count = row_count + len(chunks[i])
    print(row_count)
Add this between print(df) and def b_a(), and remember to comment out the r_d_list = ... line before if __name__ == '__main__'.
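To make the S1 vs. S2 argument pairing concrete, here is a tiny standalone illustration (the names toy and params are hypothetical, just to show what zip hands to starmap in each case):

import pandas as pd

toy = pd.DataFrame({'value': [1, 2], 'region': ['A', 'B']})
params = [('A', 'upper'), ('A', 'down')]

# S1: iterating over a DataFrame yields its column names,
# so each worker would receive a string instead of a data frame.
print(list(zip(toy, params)))
# [('value', ('A', 'upper')), ('region', ('A', 'down'))]

# S2: repeat a reference to the same DataFrame once per parameter pair,
# so each worker receives the full frame plus one (region, district) pair.
print([(type(d).__name__, p) for d, p in zip([toy] * len(params), params)])
# [('DataFrame', ('A', 'upper')), ('DataFrame', ('A', 'down'))]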
Thanks to this wonderful community, I now have a workable solution. I have updated my question to provide some material for those who may run into the same problem in the future, and to better formulate the question in the hope of even better solutions.