Spark Dataframes: Skewed Partition after Join

2024/11/17 0:55:52

I have two dataframes: df1 with 22 million records and df2 with 2 million records. I'm doing a right join on email_address as the key.

test_join = df2.join(df1, "email_address", how = 'right').cache()

There are very few (if any) duplicate emails in either dataframe. After the join, I'm trying to find the partition sizes of the resulting dataframe test_join, using this code:

l = test_join.rdd.mapPartitionsWithIndex(lambda x,it: [(x,sum(1 for _ in it))]).collect()
print(max(l,key=lambda item:item[1]),min(l,key=lambda item:item[1]))

The result shows that the largest partition is around 100 times bigger than the average partition size. This skew in partition size is giving performance issues in post-join transformations and actions.
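As a rough sketch of how the largest-versus-average comparison could be computed from the collected list l, the helper below summarizes (partition_index, row_count) pairs in plain Python. The function name summarize_partition_sizes and the example sizes are made up for illustration; they are not part of the original code.

```python
from statistics import mean

def summarize_partition_sizes(sizes):
    """Summarize (partition_index, row_count) pairs such as those collected in `l`."""
    if not sizes:
        raise ValueError("no partitions to summarize")
    counts = [count for _, count in sizes]
    largest = max(sizes, key=lambda item: item[1])
    smallest = min(sizes, key=lambda item: item[1])
    average = mean(counts)
    # Ratio of the largest partition to the average; 1.0 means perfectly even.
    # Falls back to 0.0 when every partition is empty.
    ratio = largest[1] / average if average else 0.0
    return {
        "largest": largest,
        "smallest": smallest,
        "average": average,
        "max_to_avg": ratio,
    }

# Made-up sizes for illustration only: partition 2 holds most of the rows.
example = [(0, 100), (1, 120), (2, 9780), (3, 0)]
summary = summarize_partition_sizes(example)
print(summary)
```

A max_to_avg value far above 1 is the kind of skew described above.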

I know I can re-partition it evenly after the join using the repartition(num_partitions) command, but my question is why I am getting this uneven partitioning, and whether there is any way to avoid it in the first place.

P.S.: To check whether the problem lies only with the hashing of email_address, I also checked partition sizes on a couple of other joins, and I saw the same issue with a numeric join key as well.

Answer

@user6910411 you were spot on. The problem was with my data: some dumb convention had been followed for entering empty emails, and it was causing this skewed-key issue.
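To illustrate why a repeated placeholder key leads to one oversized partition, here is a small pure-Python simulation of hash partitioning. It assumes the join shuffles rows by a hash of the join key. zlib.crc32 is only a deterministic stand-in, not the hash Spark actually uses, and the placeholder value none@example.com is hypothetical.

```python
import zlib
from collections import Counter

def assign_partition(key, num_partitions):
    # Deterministic stand-in hash; Spark uses its own hash function internally.
    return zlib.crc32(key.encode("utf-8")) % num_partitions

def partition_sizes(keys, num_partitions):
    # Count how many keys land in each partition index.
    sizes = Counter(assign_partition(k, num_partitions) for k in keys)
    return [sizes.get(i, 0) for i in range(num_partitions)]

NUM_PARTITIONS = 8
unique_keys = ["user{}@example.com".format(i) for i in range(1000)]
# Hypothetical "empty email" convention: the same value repeated many times.
placeholder_keys = ["none@example.com"] * 1000

even = partition_sizes(unique_keys, NUM_PARTITIONS)
skewed = partition_sizes(unique_keys + placeholder_keys, NUM_PARTITIONS)
print("unique keys only:", even)
print("with placeholder:", skewed)
```

Every row that carries the placeholder hashes to the same partition, so that partition grows by the full placeholder count no matter how many partitions there are.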

Upon inspecting the entries in the largest partition, I found out what was going on in there. I found this debugging technique quite useful, and I'm sure it could help others who are facing the same issue.
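One way to make that inspection systematic is to count how often each key value appears among the rows pulled from the largest partition. This is a minimal sketch in plain Python. The helper name most_common_keys and the sample rows are invented for illustration, and it assumes the rows are dict-like (for example, Row objects converted with asDict()).

```python
from collections import Counter

def most_common_keys(rows, key, top=3):
    """Count how often each value of `key` occurs in a list of dict-like rows."""
    return Counter(row[key] for row in rows).most_common(top)

# Made-up rows standing in for records pulled from the largest partition.
rows = [
    {"email_address": "n/a"},
    {"email_address": "n/a"},
    {"email_address": "a@example.com"},
    {"email_address": "n/a"},
    {"email_address": "b@example.com"},
]
top_keys = most_common_keys(rows, "email_address")
print(top_keys)
```

A single value dominating the counts points to a skewed key rather than to a problem with the hashing itself.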

BTW, this is the function I wrote to find the skewness of the RDD partitions:

from itertools import islice

def check_skewness(df):
    # Take just a 1% sample to make processing fast
    sampled_rdd = df.sample(False, 0.01).rdd.cache()
    # Count the rows in each partition as (partition_index, row_count)
    l = sampled_rdd.mapPartitionsWithIndex(lambda x, it: [(x, sum(1 for _ in it))]).collect()
    max_part = max(l, key=lambda item: item[1])
    min_part = min(l, key=lambda item: item[1])
    # Skewed if the largest partition is more than 5 times the smallest;
    # max(..., 1) avoids dividing by zero when the smallest sampled partition is empty
    if max_part[1] / max(min_part[1], 1) > 5:
        print('Partitions Skewed: Largest Partition', max_part, 'Smallest Partition', min_part,
              '\nSample Content of the largest Partition: \n')
        print(sampled_rdd.mapPartitionsWithIndex(
            lambda i, it: islice(it, 0, 5) if i == max_part[0] else []).take(5))
    else:
        print('No Skewness: Largest Partition', max_part, 'Smallest Partition', min_part)

and then I just pass the dataframe for which I want to check the skewness like this:

check_skewness(test_join)

and it gives me nice information about its skewness.
