I'm trying to upsert a pandas DataFrame to an MS SQL Server database using pyodbc. I've used a similar approach before for straight inserts, but the solution I've tried this time is incredibly slow. Is there a more streamlined way to accomplish an upsert than what I have?
sql_connect = pyodbc.connect('Driver={SQL Server Native Client 11.0}; Server=blank1; Database=blank2; UID=blank3; PWD=blank4')
cursor = sql_connect.cursor()
for index, row in bdf.iterrows():
    res = cursor.execute(
        "UPDATE dbo.MPA_BOOK_RAW "
        "SET [SITE]=?, [SHIP_TO]=?, [PROD_LINE]=?, [GROUP_NUMBER]=?, "
        "[DESCRIPTION]=?, [ORDER_QTY]=?, [BPS_INCLUDE]=? "
        "WHERE [CUST]=? AND [ORDER_NUMBER]=? AND [ORDER_DATE]=? "
        "AND [PURCHASE_ORDER]=? AND [CHANNEL]=? AND [ITEM]=? AND [END_DT]=?",
        row['SITE'], row['SHIP_TO'], row['PROD_LINE'], row['GROUP_NUMBER'],
        row['DESCRIPTION'], row['ORDER_QTY'], row['BPS_INCLUDE'],
        row['CUST'], row['ORDER_NUMBER'], row['ORDER_DATE'],
        row['PURCHASE_ORDER'], row['CHANNEL'], row['ITEM'], row['END_DT'])
    if res.rowcount == 0:
        cursor.execute(
            "INSERT INTO dbo.MPA_BOOK_RAW ([SITE], [CUST], [ORDER_NUMBER], "
            "[ORDER_DATE], [PURCHASE_ORDER], [CHANNEL], [SHIP_TO], "
            "[PROD_LINE], [GROUP_NUMBER], [DESCRIPTION], [ITEM], "
            "[ORDER_QTY], [END_DT], [BPS_INCLUDE]) "
            "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
            row['SITE'], row['CUST'], row['ORDER_NUMBER'], row['ORDER_DATE'],
            row['PURCHASE_ORDER'], row['CHANNEL'], row['SHIP_TO'],
            row['PROD_LINE'], row['GROUP_NUMBER'], row['DESCRIPTION'],
            row['ITEM'], row['ORDER_QTY'], row['END_DT'], row['BPS_INCLUDE'])
sql_connect.commit()
cursor.close()
sql_connect.close()
I tried the above with a five-row sample of my original ~50k-row DataFrame and it worked fine, so the logic seems okay. It's just the speed that is an issue.
Update, July 2022: You can save some typing by using this function to build the MERGE statement and perform the upsert for you.
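The helper itself isn't reproduced here, but a minimal sketch of the idea, building the MERGE statement from the DataFrame's columns and a list of key columns, might look like the following. The name df_upsert and its signature are hypothetical, not necessarily those of the linked function:

import pandas as pd
import sqlalchemy as sa

def df_upsert(df, table_name, engine, key_columns):
    """Hypothetical helper: upload df to a temp table, then MERGE it into table_name."""
    on_clause = " AND ".join(f"a.[{c}] = u.[{c}]" for c in key_columns)
    update_cols = [c for c in df.columns if c not in key_columns]
    set_clause = ", ".join(f"[{c}] = u.[{c}]" for c in update_cols)
    insert_cols = ", ".join(f"[{c}]" for c in df.columns)
    insert_vals = ", ".join(f"u.[{c}]" for c in df.columns)
    with engine.begin() as conn:
        # the temp table only lives for the duration of this connection
        df.to_sql("#temp_upsert", conn, index=False)
        conn.exec_driver_sql(
            f"MERGE {table_name} WITH (HOLDLOCK) AS a "
            f"USING (SELECT * FROM #temp_upsert) AS u ON ({on_clause}) "
            f"WHEN MATCHED THEN UPDATE SET {set_clause} "
            f"WHEN NOT MATCHED THEN INSERT ({insert_cols}) "
            f"VALUES ({insert_vals});"
        )

# example call, using the names from the worked example below:
# df_upsert(df_update, "actual_table", engine, ["institution_no", "transit_no"])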
Here is an example of an "upsert" using MERGE:
from pprint import pprint

import pandas as pd
import sqlalchemy as sa

connection_string = (
    "Driver=ODBC Driver 17 for SQL Server;"
    "Server=192.168.0.199;"
    "UID=scott;PWD=tiger^5HHH;"
    "DATABASE=test;"
    "UseFMTONLY=Yes;"
)
sqlalchemy_url = sa.engine.URL.create(
    "mssql+pyodbc", query={"odbc_connect": connection_string}
)
engine = sa.create_engine(sqlalchemy_url, fast_executemany=True)

with engine.begin() as conn:
    # set up test environment
    conn.exec_driver_sql("DROP TABLE IF EXISTS actual_table;")
    conn.exec_driver_sql(
        """\
        CREATE TABLE actual_table (
            institution_no VARCHAR(3),
            transit_no VARCHAR(5),
            branch_name VARCHAR(50),
            CONSTRAINT PK_actual_table PRIMARY KEY CLUSTERED
                (institution_no, transit_no)
        );
        """
    )
    # actual_table initial state
    conn.exec_driver_sql(
        """\
        INSERT INTO actual_table (institution_no, transit_no, branch_name)
        VALUES
            ('002', '45678', 'Scotiabank branch #45678 - *** UPDATE NEEDED ***'),
            ('003', '67890', 'RBC branch #67890 - Sudbury, ON');
        """
    )
    # test data to be updated or inserted
    df_update = pd.DataFrame(
        [
            ("004", "12345", "TD branch #12345 - London, ON"),
            ("002", "45678", "Scotiabank branch #45678 - Timmins, ON"),
            ("004", "34567", "TD branch #34567 - Toronto, ON"),
        ],
        columns=["institution_no", "transit_no", "branch_name"],
    )

    # Here's where the real work begins ...
    #
    # Step 1: upload update data
    df_update.to_sql("#update_table", conn, index=False)
    #
    # Step 2: perform the "upsert"
    sql = """\
    MERGE actual_table WITH (HOLDLOCK) AS a
    USING (SELECT institution_no, transit_no, branch_name FROM #update_table) AS u
        ON (a.institution_no = u.institution_no AND a.transit_no = u.transit_no)
    WHEN MATCHED THEN
        UPDATE SET branch_name = u.branch_name
    WHEN NOT MATCHED THEN
        INSERT (institution_no, transit_no, branch_name)
        VALUES (u.institution_no, u.transit_no, u.branch_name);
    """
    result = conn.exec_driver_sql(sql)

# verify results
with engine.begin() as conn:
    pprint(conn.exec_driver_sql("SELECT * FROM actual_table").fetchall())
    """console output:
    [('002', '45678', 'Scotiabank branch #45678 - Timmins, ON'),
     ('003', '67890', 'RBC branch #67890 - Sudbury, ON'),
     ('004', '12345', 'TD branch #12345 - London, ON'),
     ('004', '34567', 'TD branch #34567 - Toronto, ON')]
    """
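Transplanted to the question's table and columns, the same pattern would look roughly like the sketch below. It assumes engine is built with fast_executemany=True as above and that bdf is the ~50k-row DataFrame from the question; it is untested against the actual schema. One bulk upload plus a single set-based MERGE replaces ~50k individual UPDATE/INSERT round trips, which is where the row-by-row loop loses its time:

with engine.begin() as conn:
    # one fast bulk insert instead of ~50k round trips
    bdf.to_sql("#update_table", conn, index=False)
    conn.exec_driver_sql("""\
    MERGE dbo.MPA_BOOK_RAW WITH (HOLDLOCK) AS a
    USING (SELECT * FROM #update_table) AS u
        ON (a.[CUST] = u.[CUST] AND a.[ORDER_NUMBER] = u.[ORDER_NUMBER]
            AND a.[ORDER_DATE] = u.[ORDER_DATE]
            AND a.[PURCHASE_ORDER] = u.[PURCHASE_ORDER]
            AND a.[CHANNEL] = u.[CHANNEL] AND a.[ITEM] = u.[ITEM]
            AND a.[END_DT] = u.[END_DT])
    WHEN MATCHED THEN
        UPDATE SET [SITE] = u.[SITE], [SHIP_TO] = u.[SHIP_TO],
            [PROD_LINE] = u.[PROD_LINE], [GROUP_NUMBER] = u.[GROUP_NUMBER],
            [DESCRIPTION] = u.[DESCRIPTION], [ORDER_QTY] = u.[ORDER_QTY],
            [BPS_INCLUDE] = u.[BPS_INCLUDE]
    WHEN NOT MATCHED THEN
        INSERT ([SITE], [CUST], [ORDER_NUMBER], [ORDER_DATE], [PURCHASE_ORDER],
            [CHANNEL], [SHIP_TO], [PROD_LINE], [GROUP_NUMBER], [DESCRIPTION],
            [ITEM], [ORDER_QTY], [END_DT], [BPS_INCLUDE])
        VALUES (u.[SITE], u.[CUST], u.[ORDER_NUMBER], u.[ORDER_DATE],
            u.[PURCHASE_ORDER], u.[CHANNEL], u.[SHIP_TO], u.[PROD_LINE],
            u.[GROUP_NUMBER], u.[DESCRIPTION], u.[ITEM], u.[ORDER_QTY],
            u.[END_DT], u.[BPS_INCLUDE]);
    """)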