How to DataBricks read Delta tables based on incremental data

2024/11/19 7:23:11

we have to read the data from delta table and then we are joining the all the tables based on our requirements, then we would have to call the our internal APIS to pass the each row data. this is our goal. there is another one requirement, we would have to read bulk data is first time only, the second time we have to read changes or updated data from source of the delta tables. kindly help us, how to implement this senario.

bulk load code, as below format.

a = spark.table("tablename1")
b = spark.table("tablename2")
c = spark.table("tablename3")final_df = spark.sql(" joinig 3 dataframe as per our requirement")

Calling APIS for each row in above that dataframe "final_df"

now, we could not use changedata properties for source tables. is it possible to read incremental data any timestamp manner or any customer implementation, kindly share to us.

Thanks

Answer

You can use below functions to load latest data, this doesn't load data automatically you need run it whenever new data occurs.

First, do bulk load.

from delta.tables import *
from pyspark.sql.functions import coldef bulk_load(table1,table2,table3):tbl1Dict = {"path":table1,"data":spark.table(f"delta.`{table1}`"),"max_timestamp":DeltaTable.forPath(spark,path=f"{table1}").history().selectExpr("max(timestamp)").collect()[0][0]}tbl2Dict = {"path":table2,"data":spark.table(f"delta.`{table2}`"),"max_timestamp":DeltaTable.forPath(spark,path=f"{table2}").history().selectExpr("max(timestamp)").collect()[0][0]}tbl3Dict = {"path":table3,"data":spark.table(f"delta.`{table3}`"),"max_timestamp":DeltaTable.forPath(spark,path=f"{table3}").history().selectExpr("max(timestamp)").collect()[0][0]}return tbl1Dict,tbl2Dict,tbl3Dict

This gives you the dictionary having path, data and max_timestamp. Use these return values in your join query by getting value from data key.

tbl1,tbl2,tbl3 = bulk_load("/delta_df_1/","/delta_df_2/","/delta_df_3/")
tbl1["data"].show()
tbl2["data"].show()
tbl3["data"].show()

enter image description here

Next pass this dictionary to increment_load function which gives you the same structure dictionary having latest records.

def increment_load(table1,table2,table3):tblDict1 = {"path":table1["path"],"data":spark.table(f'delta.`{table1["path"]}`').select("*","_metadata.file_modification_time").filter(col("file_modification_time")>table1["max_timestamp"]),"max_timestamp":DeltaTable.forPath(spark,path=f'{table1["path"]}').history().selectExpr("max(timestamp)").collect()[0][0]}tblDict2 = {"path":table2["path"],"data":spark.table(f'delta.`{table2["path"]}`').select("*","_metadata.file_modification_time").filter(col("file_modification_time")>table2["max_timestamp"]),"max_timestamp":DeltaTable.forPath(spark,path=f'{table2["path"]}').history().selectExpr("max(timestamp)").collect()[0][0]}tblDict3 = {"path":table3["path"],"data":spark.table(f'delta.`{table3["path"]}`').select("*","_metadata.file_modification_time").filter(col("file_modification_time")>table3["max_timestamp"]),"max_timestamp":DeltaTable.forPath(spark,path=f'{table3["path"]}').history().selectExpr("max(timestamp)").collect()[0][0]}return tblDict1,tblDict2,tblDict3

This function gets the records having file_modification_time greater than the previous timestamp. That is the latest record.

After, bulk load you having tbl1,tbl2 and tbl3 variables. If you do increment load you will get empty results as we did not write any data.

enter image description here

Now let us write some data to these 3 tables. Below is the data written.

enter image description here

Now, do increment load.

inc_tbl1,inc_tbl2,inc_tbl3 = increment_load(tbl1,tbl2,tbl3)

Output:

enter image description here Here, you can see data latest record is fetched.

Then you can use this in your join operation and Api calls. Remember while calling increment load function with dict tables with previous loaded data.

https://en.xdnf.cn/q/118567.html

Related Q&A

Converting an excel file to a specific Json in python using openpyxl library with datetime

I have the Excel data with the format shown in the image preview. How can I convert it into a JSON using Python? Expected Output: file_name = [ { A: Measurement( calculated_date=datetime(2022, 10, 1, …

How to find a word in a string in a list? (Python)

So im trying to find a way so I can read a txt file and find a specific word. I have been calling the file with myfile=open(daily.txt,r)r=myfile.readlines()that would return a list with a string for ea…

How to make a new default argument list every time [duplicate]

This question already has answers here:The Mutable Default Argument in Python(34 answers)Closed 10 years ago.I have the following setup:def returnList(arg=["abc"]):return arglist1 = returnLis…

How does one reorder information in an XML document in python 3?

Lets suppose I have the following XML structure:<?xml version="1.0" encoding="utf-8" ?> <Document><CstmrCdtTrfInitn><GrpHdr><other_tags>a</other_t…

Python - Replace only exact word in string [duplicate]

This question already has answers here:How to match a whole word with a regular expression?(4 answers)Closed 4 years ago.I want to replace only specific word in one string. However, some other words h…

How to write Hierarchical query in PYTHON

The given input is like:EMPLOYEE_ID NAME MANAGER_ID101 A 10102 B 1110 C 111 D 11 E nullEmployee Cycle LEVEL Path10…

Unable to launch selenium with python in mac

Im facing an issue with selenium with python in Mac OS.. Python 2.7 pydev 3.0My sample codefrom selenium import webdriver driver = webdriver.Firefox() driver.get("https://www.formsite.com/") …

Memory error In instantiating the numpy array

I have a list A of a 50,000 elements and each element is an array of shape (102400) I tried instantiating an array B.B=numpy.array(A)But this throws an exception MemoryError.I know that the memory and …

Setting column names in a pandas dataframe (Python)

When setting a column name for a pandas dataframe, why does the following work:df_coeff = pd.DataFrame(data = lm.coef_, index = X.columns, columns = [Coefficient])While this does not workdf_coeff = pd.…

Check that Python function does not modify argument?

You know how in Python, if v is a list or a dictionary, its quite common to write functions that modify v in place (instead of just returning the new value). Im wondering if it is possible to write a c…