Partition pyspark dataframe based on the change in column value

2024/11/16 11:37:52

I have a dataframe in pyspark. Say it has some columns a, b, c... I want to split the data into groups wherever the value of column A changes. Say

A  B
1  x
1  y
0  x
0  y
0  x
1  y
1  x
1  y

There will be 3 groups: (1x,1y), (0x,0y,0x), (1y,1x,1y), along with the corresponding row data.

Answer

If I understand correctly, you want to create a distinct group every time column A changes value.

First, we'll create a monotonically increasing id to preserve the current row order:

import pyspark.sql.functions as psf
df = sc.parallelize([[1,'x'],[1,'y'],[0,'x'],[0,'y'],[0,'x'],[1,'y'],[1,'x'],[1,'y']]) \
    .toDF(['A', 'B']) \
    .withColumn("rn", psf.monotonically_increasing_id())
df.show()

+---+---+----------+
|  A|  B|        rn|
+---+---+----------+
|  1|  x|         0|
|  1|  y|         1|
|  0|  x|         2|
|  0|  y|         3|
|  0|  x|8589934592|
|  1|  y|8589934593|
|  1|  x|8589934594|
|  1|  y|8589934595|
+---+---+----------+

Now we'll use a window function to create a column that contains 1 every time column A changes:

from pyspark.sql import Window
w = Window.orderBy('rn')
df = df.withColumn("changed", (df.A != psf.lag('A', 1, 0).over(w)).cast('int'))
df.show()

+---+---+----------+-------+
|  A|  B|        rn|changed|
+---+---+----------+-------+
|  1|  x|         0|      1|
|  1|  y|         1|      0|
|  0|  x|         2|      1|
|  0|  y|         3|      0|
|  0|  x|8589934592|      0|
|  1|  y|8589934593|      1|
|  1|  x|8589934594|      0|
|  1|  y|8589934595|      0|
+---+---+----------+-------+
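The lag-and-compare step can be illustrated in plain Python (a sketch of the same logic, not PySpark itself): each row is flagged 1 when its A value differs from the previous row's, with the first row compared against the lag default of 0:

```python
# A values in row order, as in the example dataframe.
a_values = [1, 1, 0, 0, 0, 1, 1, 1]

# psf.lag('A', 1, 0) shifts the column down by one position, filling the
# first slot with the default 0; comparing against the shifted copy
# flags every row where the value changed.
lagged = [0] + a_values[:-1]
changed = [int(cur != prev) for cur, prev in zip(a_values, lagged)]
print(changed)  # [1, 0, 1, 0, 0, 1, 0, 0]
```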

Finally, we'll take a running sum of that indicator over the same window to assign a distinct number to each group:

df = df.withColumn("group_id", psf.sum("changed").over(w)).drop("rn").drop("changed")
df.show()

+---+---+--------+
|  A|  B|group_id|
+---+---+--------+
|  1|  x|       1|
|  1|  y|       1|
|  0|  x|       2|
|  0|  y|       2|
|  0|  x|       2|
|  1|  y|       3|
|  1|  x|       3|
|  1|  y|       3|
+---+---+--------+
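The running-sum step behaves like `itertools.accumulate` over the change flags — each "new group starts here" flag bumps the id by one (again a plain-Python illustration of the idea, not PySpark):

```python
from itertools import accumulate

# Change flags from the previous step: 1 marks the start of a new group.
changed = [1, 0, 1, 0, 0, 1, 0, 0]

# A cumulative sum turns the flags into group ids, exactly like
# psf.sum("changed").over(w) in the answer.
group_ids = list(accumulate(changed))
print(group_ids)  # [1, 1, 2, 2, 2, 3, 3, 3]
```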

Now you can build your groups by aggregating on group_id.
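In PySpark, one way to materialize the groups (a sketch; the exact aggregation is up to you) is `df.groupBy("group_id").agg(psf.collect_list(psf.struct("A", "B")))`, using the standard `collect_list` and `struct` functions from `pyspark.sql.functions`. The bucketing itself can be checked in plain Python against the group_id column of the final table:

```python
from itertools import groupby

# Rows and their group_id values, matching the final table in the answer.
rows = [(1, 'x'), (1, 'y'), (0, 'x'), (0, 'y'), (0, 'x'), (1, 'y'), (1, 'x'), (1, 'y')]
group_ids = [1, 1, 2, 2, 2, 3, 3, 3]

# Bucket consecutive rows that share a group id.
groups = [[row for _, row in grp]
          for _, grp in groupby(zip(group_ids, rows), key=lambda t: t[0])]
print(groups)
# [[(1, 'x'), (1, 'y')], [(0, 'x'), (0, 'y'), (0, 'x')], [(1, 'y'), (1, 'x'), (1, 'y')]]
```

This reproduces the three expected groups (1x,1y), (0x,0y,0x), (1y,1x,1y) from the question.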
