I have two dataframes: df1 with 22 million records and df2 with 2 million records. I'm doing a right join on email_address as the key.
test_join = df2.join(df1, "email_address", how='right').cache()
There are very few duplicate emails (if any) in either dataframe. After the join, I'm trying to find the partition sizes of the resulting dataframe test_join using this code:
l = test_join.rdd.mapPartitionsWithIndex(lambda x, it: [(x, sum(1 for _ in it))]).collect()
print(max(l, key=lambda item: item[1]), min(l, key=lambda item: item[1]))
The result shows that the largest partition is around 100 times bigger than the average partition size. This skew in partition size is causing performance issues in post-join transformations and actions.
I know I can repartition it evenly after the join using the repartition(num_partitions) command, but my question is: why am I getting this uneven partitioning, and is there any way to avoid it in the first place?
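For reference, this is the kind of post-join fix I mean; the partition count 200 is just an illustrative value, not a recommendation:

# Round-robin repartition after the join, producing evenly sized partitions
test_join = test_join.repartition(200)
# Repartitioning by the join key is also possible, but it still hashes
# email_address, so a single heavily repeated key would stay skewed
test_join = test_join.repartition(200, "email_address")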
P.S. Just to check the assumption that the problem is specific to the email_address hashing function, I also checked the partition sizes on a couple of other joins; I saw the same issue with a numeric join key as well.
@user6910411 you were spot on. The problem was with my data: a dumb convention was being used to enter empty emails, so one placeholder value was repeated millions of times. Since the join shuffle hash-partitions on the key, every row sharing that single value landed in the same partition, which caused the skew.
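For anyone hitting the same thing, a minimal sketch of the fix, assuming the placeholder is an empty string (substitute whatever convention your data actually uses):

from pyspark.sql import functions as F

# Drop the placeholder "empty email" rows before joining; '' is an
# assumed placeholder value, not necessarily what your data contains
clean_df1 = df1.filter(F.col("email_address").isNotNull() & (F.col("email_address") != ""))
clean_df2 = df2.filter(F.col("email_address").isNotNull() & (F.col("email_address") != ""))
test_join = clean_df2.join(clean_df1, "email_address", how='right').cache()

If those rows have to be kept, you can union them back in afterwards; the point is just to keep the hot key out of the join shuffle.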
Upon inspecting the entries in the largest partition, I figured out what was going on in there. I found this debugging technique quite useful, and I'm sure it could help others who are facing the same issue.
BTW, this is the function I wrote to find the skewness of the RDD partitions:
from itertools import islice

def check_skewness(df):
    # Take just a 1% sample to make processing fast
    sampled_rdd = df.sample(False, 0.01).rdd.cache()
    l = sampled_rdd.mapPartitionsWithIndex(lambda x, it: [(x, sum(1 for _ in it))]).collect()
    max_part = max(l, key=lambda item: item[1])
    min_part = min(l, key=lambda item: item[1])
    # Flag skew if the largest partition is more than 5 times the smallest
    # (the max(..., 1) guards against an empty partition in the sample)
    if max_part[1] / max(min_part[1], 1) > 5:
        print('Partitions Skewed: Largest Partition', max_part, 'Smallest Partition', min_part,
              '\nSample Content of the largest Partition: \n')
        print(sampled_rdd.mapPartitionsWithIndex(
            lambda i, it: islice(it, 0, 5) if i == max_part[0] else []).take(5))
    else:
        print('No Skewness: Largest Partition', max_part, 'Smallest Partition', min_part)
and then I just pass the dataframe for which I want to check the skewness like this:
check_skewness(test_join)
and it gives me nice information about its skewness.