PySpark groupby and max value selection

2024/9/16 23:21:15

I have a PySpark DataFrame like this:

name   city    date
satya  Mumbai  13/10/2016
satya  Pune    02/11/2016
satya  Mumbai  22/11/2016
satya  Pune    29/11/2016
satya  Delhi   30/11/2016
panda  Delhi   29/11/2016
brata  BBSR    28/11/2016
brata  Goa     30/10/2016
brata  Goa     30/10/2016

I need to find the most preferred city for each name. The logic is: take a city as fav_city if it has the highest number of occurrences for the aggregated 'name' + 'city' pair. If several cities share that maximum count, pick the one with the latest date. To explain:

d = df.groupby('name','city').count()
# name   city    count
# brata  Goa     2      # clear favourite
# brata  BBSR    1
# panda  Delhi   1      # only one city, so clear favourite
# satya  Pune    2      # confusion
# satya  Mumbai  2      # confusion
# satya  Delhi   1      # should be discarded, as other cities have a higher count

# So get the cities having the max count
dd = d.groupby('name').agg(F.max('count').alias('count'))
ddd = dd.join(d, ['name','count'], 'left')
# name   count  city
# brata  2      Goa     # fav found
# panda  1      Delhi   # fav found
# satya  2      Mumbai  # can't say
# satya  2      Pune    # can't say

For user 'satya' I need to go back to trx_history and get the latest date for the cities tied at the maximum count, i.e. whichever of 'Mumbai' or 'Pune' was transacted last (max date) should be the fav_city. In this case that is 'Pune', since '29/11/2016' is the latest (max) date.
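The rule described above can be checked against the sample rows in plain Python before moving to Spark. This is only a sketch of the intended logic, not a PySpark solution; the function name `favourite_cities` and the tuple layout of `rows` are assumptions made for illustration.

```python
from collections import defaultdict
from datetime import datetime

# Sample rows from the question: (name, city, date as dd/MM/yyyy)
rows = [
    ("satya", "Mumbai", "13/10/2016"),
    ("satya", "Pune", "02/11/2016"),
    ("satya", "Mumbai", "22/11/2016"),
    ("satya", "Pune", "29/11/2016"),
    ("satya", "Delhi", "30/11/2016"),
    ("panda", "Delhi", "29/11/2016"),
    ("brata", "BBSR", "28/11/2016"),
    ("brata", "Goa", "30/10/2016"),
    ("brata", "Goa", "30/10/2016"),
]


def favourite_cities(rows):
    """Return {name: city}: highest count wins, latest date breaks ties."""
    # (name, city) -> [occurrence count, latest date seen]
    stats = defaultdict(lambda: [0, datetime.min])
    for name, city, date_str in rows:
        date = datetime.strptime(date_str, "%d/%m/%Y")
        entry = stats[(name, city)]
        entry[0] += 1
        entry[1] = max(entry[1], date)

    # Keep, per name, the city with the largest (count, latest date) pair.
    best = {}
    for (name, city), (count, max_date) in stats.items():
        key = (count, max_date)
        if name not in best or key > best[name][0]:
            best[name] = (key, city)
    return {name: city for name, (_, city) in best.items()}


print(favourite_cities(rows))
```

On the sample data this maps satya to Pune, panda to Delhi and brata to Goa, which is the result the Spark version is expected to reproduce.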

But I am not able to work out how to get that done.

Please help me with the logic, or suggest a better (faster or more compact) solution. Thanks.

Answer

First convert date to the DateType:

import pyspark.sql.functions as F

df_with_date = df.withColumn(
    "date",
    F.to_date("date", "dd/MM/yyyy")
    # For Spark < 2.2
    # F.unix_timestamp("date", "dd/MM/yyyy").cast("timestamp").cast("date")
)
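The conversion matters because dd/MM/yyyy strings do not sort chronologically, so taking a max over the raw string column could pick the wrong row. A small plain-Python illustration (not Spark code; the variable names are made up for this example):

```python
from datetime import datetime

earlier, later = "30/10/2016", "29/11/2016"

# Compared as strings, the October date looks "larger" because '3' > '2'.
string_max = max(earlier, later)

# Compared as parsed dates, the November date is correctly the latest.
fmt = "%d/%m/%Y"  # Python strptime equivalent of the dd/MM/yyyy pattern
date_max = max(earlier, later, key=lambda s: datetime.strptime(s, fmt))

print(string_max)  # 30/10/2016
print(date_max)    # 29/11/2016
```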

Next, group by name and city, but extend the aggregation like this:

df_agg = (df_with_date
    .groupBy("name", "city")
    .agg(F.count("city").alias("count"), F.max("date").alias("max_date")))

Define a window:

from pyspark.sql.window import Window

w = Window().partitionBy("name").orderBy(F.desc("count"), F.desc("max_date"))

Add rank:

df_with_rank = (df_agg.withColumn("rank", F.dense_rank().over(w)))

And filter:

result = df_with_rank.where(F.col("rank") == 1)
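Dense ranking gives the same rank to rows that tie on every ordering column, so a name can still end up with more than one rank-1 row when two cities share both the count and the latest date. The plain-Python emulation below is only an illustration of that behaviour, not Spark's implementation; the helper `dense_rank_desc` and the aggregated rows for cities "A", "B" and "C" are hypothetical.

```python
def dense_rank_desc(rows, key):
    """Rank rows in descending key order; equal keys share a rank, no gaps."""
    ranked = []
    rank = 0
    previous = object()  # sentinel that never equals a real key
    for row in sorted(rows, key=key, reverse=True):
        current = key(row)
        if current != previous:
            rank += 1
            previous = current
        ranked.append((row, rank))
    return ranked


# Hypothetical aggregated rows for one name: (city, count, max_date as ISO string)
agg = [
    ("A", 2, "2016-11-29"),
    ("B", 2, "2016-11-29"),  # same count and same latest date as "A"
    ("C", 1, "2016-11-30"),
]

ranked = dense_rank_desc(agg, key=lambda r: (r[1], r[2]))
top = [row[0] for row, rank in ranked if rank == 1]
print(top)  # ['A', 'B']
```

Both "A" and "B" survive a rank-equals-1 filter here, which is the situation a follow-up tie check has to catch.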

You can detect remaining duplicates using code like this:

import sys

final_w = Window().partitionBy("name").rowsBetween(-sys.maxsize, sys.maxsize)
result.withColumn("tie", F.count("*").over(final_w) != 1)