Spark version 3.0.
I have two dataframes.
I created the first dataframe (DF1) with a date column using a pandas date range.
The second dataframe (DF2) is a Spark dataframe that contains the company name, dates, and values.
I want to merge DF2 into DF1, grouped by company, so I can fill in the missing dates and fill each missing value from the previous row.
How can I do this? I thought about a left join, but on its own it doesn't seem to work well.
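For reference, a minimal sketch of how DF1 might be built (the date range and the column name 'Date' are assumptions for illustration; a running Spark session named spark is assumed):

import pandas as pd

# Build a continuous daily date spine with pandas, then convert it
# to a Spark dataframe: one row per calendar date.
dates = pd.date_range(start='2000-01-01', end='2000-01-09', freq='D')
pdf1 = pd.DataFrame({'Date': dates.date})
df1 = spark.createDataFrame(pdf1)
df1.show()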
Try this. It's a bit complex.
import pyspark.sql.functions as f
from pyspark.sql import Window

df1 = spark.read.option("header", "true").option("inferSchema", "true").csv("test1.csv") \
    .withColumn('Date', f.to_date('Date', 'dd/MM/yyyy'))
df2 = spark.read.option("header", "true").option("inferSchema", "true").csv("test2.csv") \
    .withColumn('Date', f.to_date('Date', 'dd/MM/yyyy'))

# w1: global ordering, used to peek at the next row's Value.
w1 = Window.orderBy('Company', 'Date')
# w2: running window for the cumulative sum that numbers the fill groups.
w2 = Window.orderBy('Company', 'Date').rowsBetween(Window.unboundedPreceding, Window.currentRow)
# w3: within each fill group, the first row holds the last known Value.
w3 = Window.partitionBy('partition').orderBy('Company', 'Date')

df1.crossJoin(df2.select('Company').distinct()) \
    .join(df2, ['Company', 'Date'], 'left') \
    .withColumn('range', (f.col('Value').isNull() | f.lead(f.col('Value'), 1, 0).over(w1).isNull()) != f.col('Value').isNull()) \
    .withColumn('partition', f.sum(f.col('range').cast('int')).over(w2)) \
    .withColumn('fill', f.first('Value').over(w3)) \
    .orderBy('Company', 'Date') \
    .selectExpr('Company', 'Date', 'coalesce(Value, fill) as Value') \
    .show(20, False)

+-------+----------+-----+
|Company|Date |Value|
+-------+----------+-----+
|A |2000-01-01|13 |
|A |2000-01-02|14 |
|A |2000-01-03|15 |
|A |2000-01-04|19 |
|A |2000-01-05|19 |
|A |2000-01-06|19 |
|A |2000-01-07|19 |
|A |2000-01-08|19 |
|A |2000-01-09|19 |
|B |2000-01-01|19 |
|B |2000-01-02|19 |
|B |2000-01-03|20 |
|B |2000-01-04|25 |
|B |2000-01-05|23 |
|B |2000-01-06|24 |
|B |2000-01-07|24 |
|B |2000-01-08|24 |
|B |2000-01-09|24 |
+-------+----------+-----+
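Incidentally, the same kind of forward fill can often be written more simply with last(..., ignorenulls=True) over a per-company window. A sketch, assuming the same column names as above and that fills should not carry across company boundaries (note this differs from the code above, whose windows are not partitioned by company):

# Alternative sketch, not the method above: per-company forward fill.
w = Window.partitionBy('Company').orderBy('Date') \
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)

filled = df1.crossJoin(df2.select('Company').distinct()) \
    .join(df2, ['Company', 'Date'], 'left') \
    .withColumn('Value', f.last('Value', ignorenulls=True).over(w))

filled.orderBy('Company', 'Date').show(20, False)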
You can see what happens at each step by adding .show() after each intermediate transformation; that might be helpful.
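For example (a debugging sketch; the variable name step is illustrative):

step = df1.crossJoin(df2.select('Company').distinct()) \
    .join(df2, ['Company', 'Date'], 'left') \
    .withColumn('range', (f.col('Value').isNull() | f.lead(f.col('Value'), 1, 0).over(w1).isNull()) != f.col('Value').isNull())
step.show(20, False)  # inspect the 'range' flags before the cumulative sum

step = step.withColumn('partition', f.sum(f.col('range').cast('int')).over(w2))
step.show(20, False)  # inspect how the rows are grouped for filling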