We have to read data from Delta tables, join all the tables as per our requirements, and then call our internal APIs, passing each row's data. That is our goal. There is one more requirement: the first time we have to read the bulk data, but from the second run onward we have to read only the changed or updated data from the source Delta tables. Kindly help us with how to implement this scenario.
Our bulk load code is in the below format:
a = spark.table("tablename1")
b = spark.table("tablename2")
c = spark.table("tablename3")
final_df = spark.sql("joining the 3 dataframes as per our requirement")
We then call the APIs for each row in the dataframe final_df.
Now, we cannot enable the change data feed property on the source tables. Is it possible to read the incremental data in a timestamp-based manner, or with some custom implementation? Kindly share with us.
Thanks
You can use the functions below to load the latest data. Note that they do not load data automatically; you need to run them whenever new data arrives.
First, do the bulk load.
from delta.tables import *
from pyspark.sql.functions import col

def bulk_load(table1, table2, table3):
    # For each table: read the full data, and capture the latest commit
    # timestamp from the Delta history so the next run knows where to resume.
    tbl1Dict = {
        "path": table1,
        "data": spark.table(f"delta.`{table1}`"),
        "max_timestamp": DeltaTable.forPath(spark, path=f"{table1}")
            .history().selectExpr("max(timestamp)").collect()[0][0]
    }
    tbl2Dict = {
        "path": table2,
        "data": spark.table(f"delta.`{table2}`"),
        "max_timestamp": DeltaTable.forPath(spark, path=f"{table2}")
            .history().selectExpr("max(timestamp)").collect()[0][0]
    }
    tbl3Dict = {
        "path": table3,
        "data": spark.table(f"delta.`{table3}`"),
        "max_timestamp": DeltaTable.forPath(spark, path=f"{table3}")
            .history().selectExpr("max(timestamp)").collect()[0][0]
    }
    return tbl1Dict, tbl2Dict, tbl3Dict
This gives you, for each table, a dictionary holding path, data, and max_timestamp.
Use these return values in your join query by reading the DataFrame from the data key.
tbl1,tbl2,tbl3 = bulk_load("/delta_df_1/","/delta_df_2/","/delta_df_3/")
tbl1["data"].show()
tbl2["data"].show()
tbl3["data"].show()
Next, pass these dictionaries to the increment_load function, which returns dictionaries of the same structure containing only the latest records.
def increment_load(table1, table2, table3):
    # For each table: keep only rows from files written after the previously
    # recorded timestamp (via the hidden _metadata.file_modification_time
    # column), and refresh max_timestamp from the Delta history.
    tblDict1 = {
        "path": table1["path"],
        "data": spark.table(f'delta.`{table1["path"]}`')
            .select("*", "_metadata.file_modification_time")
            .filter(col("file_modification_time") > table1["max_timestamp"]),
        "max_timestamp": DeltaTable.forPath(spark, path=f'{table1["path"]}')
            .history().selectExpr("max(timestamp)").collect()[0][0]
    }
    tblDict2 = {
        "path": table2["path"],
        "data": spark.table(f'delta.`{table2["path"]}`')
            .select("*", "_metadata.file_modification_time")
            .filter(col("file_modification_time") > table2["max_timestamp"]),
        "max_timestamp": DeltaTable.forPath(spark, path=f'{table2["path"]}')
            .history().selectExpr("max(timestamp)").collect()[0][0]
    }
    tblDict3 = {
        "path": table3["path"],
        "data": spark.table(f'delta.`{table3["path"]}`')
            .select("*", "_metadata.file_modification_time")
            .filter(col("file_modification_time") > table3["max_timestamp"]),
        "max_timestamp": DeltaTable.forPath(spark, path=f'{table3["path"]}')
            .history().selectExpr("max(timestamp)").collect()[0][0]
    }
    return tblDict1, tblDict2, tblDict3
This function fetches the records whose file_modification_time is greater than the previously recorded timestamp, i.e. the newly arrived records.
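If you want to inspect those file timestamps yourself, the hidden _metadata column can be selected directly on recent Spark versions (using the same /delta_df_1/ path as above):

# Show which data file each row came from and when that file was written.
spark.table("delta.`/delta_df_1/`") \
    .select("_metadata.file_path", "_metadata.file_modification_time") \
    .distinct() \
    .show(truncate=False)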
After the bulk load, you have the tbl1, tbl2, and tbl3 variables.
If you run the incremental load now, you will get empty results, as we have not written any new data yet.
Now let us write some new data to these 3 tables.
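A minimal sketch of such an append, assuming a toy two-column schema (id, value); replace it with your actual columns:

# Hypothetical sample rows; the schema must match your existing tables.
new_rows = spark.createDataFrame([(101, "new value")], ["id", "value"])
new_rows.write.format("delta").mode("append").save("/delta_df_1/")
new_rows.write.format("delta").mode("append").save("/delta_df_2/")
new_rows.write.format("delta").mode("append").save("/delta_df_3/")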
Now, run the incremental load.
inc_tbl1,inc_tbl2,inc_tbl3 = increment_load(tbl1,tbl2,tbl3)
Output:
Here, you can see that only the latest records are fetched.
Then you can use these DataFrames in your join operation and API calls.
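As a rough sketch, assuming the three tables share a join key called id and assuming a hypothetical internal endpoint (adapt both to your actual schema and API):

import requests

# Drop the helper column added by increment_load so the join stays clean.
t1 = inc_tbl1["data"].drop("file_modification_time")
t2 = inc_tbl2["data"].drop("file_modification_time")
t3 = inc_tbl3["data"].drop("file_modification_time")

# "id" is an assumed join key; use your real join condition here.
final_df = t1.join(t2, "id").join(t3, "id")

def post_partition(rows):
    # One HTTP session per partition so executors reuse connections.
    session = requests.Session()
    for row in rows:
        # Hypothetical endpoint; non-JSON types (e.g. timestamps) may need
        # converting to strings before sending.
        session.post("https://internal-api.example.com/rows", json=row.asDict())

final_df.foreachPartition(post_partition)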
Remember, when calling the increment_load function, to pass in the dictionaries from the previously loaded data, as shown below.
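In other words, chain the dictionaries from run to run so the max_timestamp values keep advancing:

# The first incremental run consumes the bulk-load dictionaries...
inc_tbl1, inc_tbl2, inc_tbl3 = increment_load(tbl1, tbl2, tbl3)
# ...and every later run consumes the previous run's dictionaries,
# so only records newer than the last observed timestamp are read.
inc_tbl1, inc_tbl2, inc_tbl3 = increment_load(inc_tbl1, inc_tbl2, inc_tbl3)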