I have a DataFrame in PySpark. Say it has some columns a, b, c...
I want to group the data into groups as the value of column A changes. Say:
A B
1 x
1 y
0 x
0 y
0 x
1 y
1 x
1 y
There will be 3 groups: (1x,1y), (0x,0y,0x), (1y,1x,1y),
and I want the corresponding row data for each group.
If I understand correctly, you want to create a distinct group every time column A changes value.
First we'll create a monotonically increasing id to keep the row order as it is:
import pyspark.sql.functions as psf
df = sc.parallelize([[1,'x'],[1,'y'],[0,'x'],[0,'y'],[0,'x'],[1,'y'],[1,'x'],[1,'y']])\
    .toDF(['A', 'B'])\
    .withColumn("rn", psf.monotonically_increasing_id())
df.show()

+---+---+----------+
|  A|  B|        rn|
+---+---+----------+
|  1|  x|         0|
|  1|  y|         1|
|  0|  x|         2|
|  0|  y|         3|
|  0|  x|8589934592|
|  1|  y|8589934593|
|  1|  x|8589934594|
|  1|  y|8589934595|
+---+---+----------+
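As a side note, monotonically_increasing_id only guarantees increasing ids, not consecutive ones (hence the jump to 8589934592 above). If you prefer consecutive indices, here is a minimal alternative sketch using the RDD's zipWithIndex, under the same assumption that the order in which the rows were parallelized is the row order you want to keep:

# Alternative sketch: zipWithIndex assigns consecutive indices 0, 1, 2, ...
rdd = sc.parallelize([[1,'x'],[1,'y'],[0,'x'],[0,'y'],[0,'x'],[1,'y'],[1,'x'],[1,'y']])
df = rdd.zipWithIndex()\
    .map(lambda row: row[0] + [row[1]])\
    .toDF(['A', 'B', 'rn'])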
Now we'll use a window function to create a column that contains 1 every time column A changes: lag('A', 1, 0) looks up the previous row's value of A (with 0 as the default for the first row), and we compare it to the current value.
from pyspark.sql import Window
w = Window.orderBy('rn')
df = df.withColumn("changed", (df.A != psf.lag('A', 1, 0).over(w)).cast('int'))
df.show()

+---+---+----------+-------+
|  A|  B|        rn|changed|
+---+---+----------+-------+
|  1|  x|         0|      1|
|  1|  y|         1|      0|
|  0|  x|         2|      1|
|  0|  y|         3|      0|
|  0|  x|8589934592|      0|
|  1|  y|8589934593|      1|
|  1|  x|8589934594|      0|
|  1|  y|8589934595|      0|
+---+---+----------+-------+
Finally, we'll take a cumulative sum of the changed column over the same window to allocate a distinct number to each group:
df = df.withColumn("group_id", psf.sum("changed").over(w)).drop("rn").drop("changed")
df.show()

+---+---+--------+
|  A|  B|group_id|
+---+---+--------+
|  1|  x|       1|
|  1|  y|       1|
|  0|  x|       2|
|  0|  y|       2|
|  0|  x|       2|
|  1|  y|       3|
|  1|  x|       3|
|  1|  y|       3|
+---+---+--------+
Now you can build your groups.
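For example, here is a minimal sketch of one way to collect the rows of each group (an assumption about what "build your groups" means for you; note that collect_list does not guarantee any ordering, so keep the rn column around and sort on it if the order inside a group matters):

groups = df.groupBy("group_id").agg(
    psf.first("A").alias("A"),            # A is constant within a group by construction
    psf.collect_list("B").alias("Bs"),    # the B values that fall into the group
    psf.count("B").alias("n_rows"))       # size of the group
groups.orderBy("group_id").show()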