How to convert XComArg to string values in Airflow 2.x?

2024/10/14 16:22:25

Code:

from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.providers.google.cloud.hooks.gcs import GCSHook


class GCSUploadOperator(BaseOperator):
    @apply_defaults
    def __init__(
        self,
        bucket_name,
        target_file_name,
        data_as_str,
        gcp_conn_id="google_cloud_default",
        *args,
        **kwargs,
    ):
        super(GCSUploadOperator, self).__init__(*args, **kwargs)
        self.bucket_name = bucket_name
        self.data_as_str = data_as_str
        self.gcp_conn_id = gcp_conn_id
        self.target_file_name = target_file_name

    def execute(self, context):
        hook = GCSHook(self.gcp_conn_id)
        hook.upload(
            bucket_name=self.bucket_name,
            object_name=context["execution_date"].strftime(
                f"year=2022/month=%m/day=%d/{self.target_file_name}"
            ),
            data=self.data_as_str,
        )


numbers = PythonOperator(task_id="numbers", python_callable=lambda: "abcde")
gcs = GCSUploadOperator(
    task_id="upload_content_to_GCS",
    bucket_name=BUCKET_NAME,
    target_file_name=f"{STORE_KEY_CONTENT}.json",
    data_as_str=?????????,  # I need to pass a string result of previous task
)

What I've tried for data_as_str:

    gcs = GCSUploadOperator(
        task_id="upload_content_to_GCS",
        bucket_name=BUCKET_NAME,
        target_file_name=f"{STORE_KEY_CONTENT}.json",
        data_as_str=numbers,
    )
    --> TypeError: <Task(PythonOperator): numbers> could not be converted to bytes

    gcs = GCSUploadOperator(
        task_id="upload_content_to_GCS",
        bucket_name=BUCKET_NAME,
        target_file_name=f"{STORE_KEY_CONTENT}.json",
        data_as_str=numbers.output,
    )
    --> TypeError: <airflow.models.xcom_arg.XComArg object at 0x7f6e8ed76760> could not be converted to bytes

Any idea?

Answer

To make it work, you have to list the field that receives the XComArg in your Operator's template_fields attribute. I made this working example:


from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class CustomDummyOperator(BaseOperator):
    # Fields listed here are rendered before execute() is called
    template_fields = ('msg_from_previous_task',)

    @apply_defaults
    def __init__(self, msg_from_previous_task, *args, **kwargs) -> None:
        super(CustomDummyOperator, self).__init__(*args, **kwargs)
        self.msg_from_previous_task = msg_from_previous_task

    def execute(self, context):
        print(f"Message: {self.msg_from_previous_task}")

DAG:

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

dag = DAG(
    'xcom_arg_custom_op',
    schedule_interval="@once",
    start_date=days_ago(2),
    default_args={'owner': 'airflow'},
    tags=['example'],
    catchup=False,
)


def return_a_str():
    return "string_value_from_op1"


task_1 = PythonOperator(
    task_id='task_1',
    dag=dag,
    python_callable=return_a_str,
)
task_2 = CustomDummyOperator(
    task_id='task_2',
    dag=dag,
    msg_from_previous_task=task_1.output,
)
task_1 >> task_2

Output log:

[2021-05-25 13:51:50,848] {taskinstance.py:1255} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=xcom_arg_custom_op
AIRFLOW_CTX_TASK_ID=task_2
AIRFLOW_CTX_EXECUTION_DATE=2021-05-23T00:00:00+00:00
AIRFLOW_CTX_DAG_RUN_ID=backfill__2021-05-23T00:00:00+00:00
Message: string_value_from_op1

Under the hood, this relies on the __str__() method of XComArg, which provides backward compatibility for regular (non-TaskFlow) Operators.
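To illustrate why declaring the field matters, here is a toy model in plain Python. It is not Airflow's implementation: ToyXComRef, ToyUpload, TemplatedToyUpload, render_fields and run are all hypothetical names made up for this sketch, and the dict stands in for the XCom backend. It only shows the general idea that a field listed in template_fields is swapped for the real value before execute() runs, while an unlisted field still holds the reference object.

```python
class ToyXComRef:
    """Hypothetical stand-in for a deferred reference to a task's return value."""

    def __init__(self, task_id):
        self.task_id = task_id

    def resolve(self, xcom_store):
        # Look up the value the upstream task "pushed".
        return xcom_store[self.task_id]


class ToyUpload:
    """Toy operator that does NOT declare its field as templated."""

    template_fields = ()

    def __init__(self, data_as_str):
        self.data_as_str = data_as_str

    def render_fields(self, xcom_store):
        # Only attributes named in template_fields are resolved.
        for name in self.template_fields:
            value = getattr(self, name)
            if isinstance(value, ToyXComRef):
                setattr(self, name, value.resolve(xcom_store))

    def execute(self):
        # Mimics an upload call that needs real string data.
        if not isinstance(self.data_as_str, str):
            raise TypeError(f"{self.data_as_str!r} could not be converted to bytes")
        return self.data_as_str.encode("utf-8")


class TemplatedToyUpload(ToyUpload):
    """Same operator, but the field is listed in template_fields."""

    template_fields = ("data_as_str",)


def run(operator, xcom_store):
    # Toy scheduler step: render declared fields first, then execute.
    operator.render_fields(xcom_store)
    return operator.execute()


xcom_store = {"numbers": "abcde"}
print(run(TemplatedToyUpload(ToyXComRef("numbers")), xcom_store))  # b'abcde'
```

Running the same reference through ToyUpload raises a TypeError much like the one in the question, because the attribute is still the reference object at execute time. Applied to the question's GCSUploadOperator, the equivalent change would presumably be adding template_fields = ("data_as_str",) to the class and passing data_as_str=numbers.output.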

Let me know if that worked for you!
