Partition pyspark dataframe based on the change in column value

2024/9/24 17:18:48

I have a dataframe in pyspark. Say it has some columns a, b, c... I want to split the data into groups each time the value of a column changes. For example:

A  B
1  x
1  y
0  x
0  y
0  x
1  y
1  x
1  y

There will be 3 groups: (1x,1y), (0x,0y,0x), (1y,1x,1y), each with its corresponding row data.


If I understand correctly, you want to create a distinct group every time column A changes value.

First we'll create a monotonically increasing id to keep the row order as it is:

import pyspark.sql.functions as psf
df = sc.parallelize([[1,'x'],[1,'y'],[0,'x'],[0,'y'],[0,'x'],[1,'y'],[1,'x'],[1,'y']])\
    .toDF(['A', 'B'])\
    .withColumn("rn", psf.monotonically_increasing_id())

+---+---+----------+
|  A|  B|        rn|
+---+---+----------+
|  1|  x|         0|
|  1|  y|         1|
|  0|  x|         2|
|  0|  y|         3|
|  0|  x|8589934592|
|  1|  y|8589934593|
|  1|  x|8589934594|
|  1|  y|8589934595|
+---+---+----------+

Now we'll use a window function to create a column that contains 1 every time column A changes:

from pyspark.sql import Window
w = Window.orderBy('rn')
df = df.withColumn("changed", (df.A != psf.lag('A', 1, 0).over(w)).cast('int'))

+---+---+----------+-------+
|  A|  B|        rn|changed|
+---+---+----------+-------+
|  1|  x|         0|      1|
|  1|  y|         1|      0|
|  0|  x|         2|      1|
|  0|  y|         3|      0|
|  0|  x|8589934592|      0|
|  1|  y|8589934593|      1|
|  1|  x|8589934594|      0|
|  1|  y|8589934595|      0|
+---+---+----------+-------+

Finally we'll use another window function to allocate different numbers to each group:

df = df.withColumn("group_id", psf.sum("changed").over(w)).drop("rn").drop("changed")

+---+---+--------+
|  A|  B|group_id|
+---+---+--------+
|  1|  x|       1|
|  1|  y|       1|
|  0|  x|       2|
|  0|  y|       2|
|  0|  x|       2|
|  1|  y|       3|
|  1|  x|       3|
|  1|  y|       3|
+---+---+--------+

Now you can build your groups.
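
If you want to sanity-check the logic without a Spark session, the same idea (flag each change, then take a running sum of the flags) can be sketched in plain Python on the sample data. This is only an illustration: the helper name `assign_group_ids` is made up here, and unlike the `lag('A', 1, 0)` version above it always flags the first row, so ids start at 1 regardless of the first value.

```python
from itertools import accumulate

def assign_group_ids(values):
    """Return one group id per element; the id grows by 1 each time the value changes.

    Hypothetical helper for illustration, not part of PySpark.
    """
    if not values:
        return []
    # Flag is 1 where the value differs from the previous one.
    # The first row is always flagged (assumption: it always starts a new group).
    changed = [1] + [int(cur != prev) for prev, cur in zip(values, values[1:])]
    # Running sum of the flags, the plain-Python analogue of sum("changed") over the window.
    return list(accumulate(changed))

rows = [(1, 'x'), (1, 'y'), (0, 'x'), (0, 'y'), (0, 'x'), (1, 'y'), (1, 'x'), (1, 'y')]
group_ids = assign_group_ids([a for a, _ in rows])

# Collect the rows of each group, keeping their original order.
groups = {}
for gid, row in zip(group_ids, rows):
    groups.setdefault(gid, []).append(row)

for gid, members in groups.items():
    print(gid, members)
```

On the Spark side, the corresponding last step would be something along the lines of df.groupBy("group_id") followed by whatever aggregation you need.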

