Providing context in TriggerDagRunOperator

2024/9/8 10:33:55

I have a dag that has been triggered by another dag. I have passed some configuration variables through to this dag via the DagRunOrder().payload dictionary, in the same way the official example does.

Now in this dag I have another TriggerDagRunOperator to start a second dag and would like to pass those same configuration variables through.

I have successfully accessed the payload variables in a PythonOperator like so:

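from airflow.operators.python_operator import PythonOperator

def run_this_func(ds, **kwargs):
    # With provide_context=True, the triggering run is available as kwargs["dag_run"]
    print("Remotely received value of {} for message and {} for day".format(
        kwargs["dag_run"].conf["message"], kwargs["dag_run"].conf["day"]))

run_this = PythonOperator(
    task_id='run_this',
    provide_context=True,
    python_callable=run_this_func,
    dag=dag
)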

But the same pattern does not work in the TriggerDagRunOperator:

from airflow.operators.dagrun_operator import TriggerDagRunOperator

def trigger(context, dag_run_obj, **kwargs):
    dag_run_obj.payload = {
        "message": kwargs["dag_run"].conf["message"],
        "day": kwargs["dag_run"].conf["day"]
    }
    return dag_run_obj

trigger_step = TriggerDagRunOperator(
    task_id="trigger_modelling",
    trigger_dag_id="Dummy_Modelling",
    provide_context=True,
    python_callable=trigger,
    dag=dag
)

It yields a warning regarding the use of provide_context:

INFO - Subtask: /usr/local/lib/python2.7/dist-packages/airflow/models.py:1927: PendingDeprecationWarning: Invalid arguments were passed to TriggerDagRunOperator. Support for passing such arguments will be dropped in Airflow 2.0. Invalid arguments were:
INFO - Subtask: *args: ()
INFO - Subtask: **kwargs: {'provide_context': True}
INFO - Subtask:   category=PendingDeprecationWarning

And this error, suggesting that I haven't passed the conf:

INFO - Subtask: Traceback (most recent call last):
INFO - Subtask:   File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 1374, in run
INFO - Subtask:     result = task_copy.execute(context=context)
INFO - Subtask:   File "/usr/local/lib/python2.7/dist-packages/airflow/operators/dagrun_operator.py", line 64, in execute
INFO - Subtask:     dro = self.python_callable(context, dro)
INFO - Subtask:   File "/home/user/airflow/dags/dummy_responses.py", line 28, in trigger
INFO - Subtask:     "message": kwargs["dag_run"].conf["message"],
INFO - Subtask: KeyError: 'dag_run'
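The traceback pins down the cause: in this Airflow 1.x operator, execute() calls self.python_callable(context, dro) with exactly two positional arguments, so **kwargs is always empty and the provide_context flag is ignored (hence the warning). The incoming run's conf is therefore only reachable through context["dag_run"]. The sketch below reproduces that call shape with hypothetical stand-in classes (FakeDagRun, FakeDagRunOrder and simulate_execute are illustrations, not Airflow APIs) so it runs without Airflow installed; it assumes the parent run's conf carries "message" and "day" keys.

```python
# Hypothetical stand-ins (NOT Airflow classes) that mimic the two objects
# an Airflow 1.x TriggerDagRunOperator passes to its python_callable.
class FakeDagRun:
    def __init__(self, conf):
        self.conf = conf


class FakeDagRunOrder:
    def __init__(self, run_id=None, payload=None):
        self.run_id = run_id
        self.payload = payload


def trigger(context, dag_run_obj):
    # Read the parent run's conf from the task context, not from **kwargs,
    # which the operator never fills.
    conf = context["dag_run"].conf
    dag_run_obj.payload = {"message": conf["message"], "day": conf["day"]}
    return dag_run_obj


def simulate_execute(python_callable, context):
    # Mirrors the call shape shown in the traceback:
    #     dro = self.python_callable(context, dro)
    dro = FakeDagRunOrder(run_id="trig__example")
    return python_callable(context, dro)


if __name__ == "__main__":
    context = {"dag_run": FakeDagRun({"message": "hello", "day": "2017-01-01"})}
    result = simulate_execute(trigger, context)
    print(result.payload)
```

Under these assumptions, a real DAG file would presumably only need the context["dag_run"] version of trigger and could drop provide_context from the operator call.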

A second pattern I've tried, which also hasn't worked, is using the params argument like so:

def trigger(context, dag_run_obj):
    dag_run_obj.payload = {
        "message": context['params']['message'],
        "day": context['params']['day']
    }
    return dag_run_obj

trigger_step = TriggerDagRunOperator(
    task_id="trigger_modelling",
    trigger_dag_id="Dummy_Modelling",
    python_callable=trigger,
    params={
        "message": "{{ dag_run.conf['message'] }}",
        "day": "{{ dag_run.conf['day'] }}"
    },
    dag=dag
)

This pattern does not raise an error, but it passes the parameters through to the next dag as literal strings, i.e. the Jinja expressions are never evaluated.


How can I access the configuration variables in the TriggerDagRunOperator of the second dag?

Answer

In Airflow 2.0.x, the equivalent of @efbbrown's answer is:

from airflow.operators.trigger_dagrun import TriggerDagRunOperator

trigger_step = TriggerDagRunOperator(
    task_id="trigger_modelling",
    trigger_dag_id="Dummy_Modelling",
    conf={
        "message": "{{ dag_run.conf['message'] }}",
        "day": "{{ dag_run.conf['day'] }}"
    },
    dag=dag
)
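This works because conf is one of the Airflow 2.0 operator's templated fields, so each string value is rendered with Jinja against the running task's context before the downstream run is created; the question's params values were never rendered, which is why they arrived as literal strings. The sketch below imitates that rendering step with the jinja2 library directly. The SimpleNamespace parent_run and the render_conf helper are hypothetical illustrations, not Airflow's internal code, and the sample conf values are made up.

```python
from types import SimpleNamespace

from jinja2 import Template

# Hypothetical stand-in for the parent run; in Airflow the real DagRun
# object is exposed to templates under the name ``dag_run``.
parent_run = SimpleNamespace(conf={"message": "hello", "day": "2021-01-01"})

conf_templates = {
    "message": "{{ dag_run.conf['message'] }}",
    "day": "{{ dag_run.conf['day'] }}",
}


def render_conf(templates, dag_run):
    # Render every value as a Jinja template, roughly what happens to a
    # templated field such as conf before execute() runs.
    return {key: Template(value).render(dag_run=dag_run)
            for key, value in templates.items()}


rendered = render_conf(conf_templates, parent_run)
print(rendered)  # {'message': 'hello', 'day': '2021-01-01'}
```

Note that rendering produces strings; values that are not templates pass through unchanged.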

The relevant pull request is described on GitHub.

See the documentation for external-triggers and for trigger_dagrun.

Here is a YouTube video on the topic that shows the correct imports.

https://en.xdnf.cn/q/72579.html
