How to convert XComArg to string values in Airflow 2.x?

2024/10/14 16:22:25


from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.providers.google.cloud.hooks.gcs import GCSHook


class GCSUploadOperator(BaseOperator):
    """Upload in-memory data to an object in a Google Cloud Storage bucket.

    The object name is built from the task's ``execution_date`` and
    ``target_file_name``.

    :param bucket_name: name of the destination bucket.
    :param target_file_name: file name appended to the date-based prefix.
    :param data_as_str: payload handed to ``GCSHook.upload`` as ``data``.
    :param gcp_conn_id: Airflow connection id used to build the hook.
    """

    @apply_defaults
    def __init__(
        self,
        bucket_name,
        target_file_name,
        data_as_str,
        gcp_conn_id="google_cloud_default",
        *args,
        **kwargs,
    ):
        super(GCSUploadOperator, self).__init__(*args, **kwargs)
        self.bucket_name = bucket_name
        self.data_as_str = data_as_str
        self.gcp_conn_id = gcp_conn_id
        self.target_file_name = target_file_name

    def execute(self, context):
        """Upload ``self.data_as_str`` under a date-partitioned object name."""
        hook = GCSHook(self.gcp_conn_id)
        hook.upload(
            bucket_name=self.bucket_name,
            # NOTE(review): the year segment is a literal, not a strftime
            # directive; confirm whether ``%Y`` was intended.
            object_name=context["execution_date"].strftime(
                f"year=2022/month=%m/day=%d/{self.target_file_name}"
            ),
            data=self.data_as_str,
        )


# Upstream task whose return value ("abcde") is pushed to XCom.
numbers = PythonOperator(task_id="numbers", python_callable=lambda: "abcde")
gcs = GCSUploadOperator(
    task_id="upload_content_to_GCS",
    bucket_name=BUCKET_NAME,
    target_file_name=f"{STORE_KEY_CONTENT}.json",
    data_as_str=...,  # placeholder: I need to pass a string result of previous task
)

What I've tried for data_as_str:

    # Attempt 1: pass the upstream operator object itself.
    gcs = GCSUploadOperator(
        task_id="upload_content_to_GCS",
        bucket_name=BUCKET_NAME,
        target_file_name=f"{STORE_KEY_CONTENT}.json",
        data_as_str=numbers,
    )
    # --> TypeError: <Task(PythonOperator): numbers> could not be converted to bytes

    # Attempt 2: pass the upstream operator's XComArg (``.output``).
    gcs = GCSUploadOperator(
        task_id="upload_content_to_GCS",
        bucket_name=BUCKET_NAME,
        target_file_name=f"{STORE_KEY_CONTENT}.json",
        data_as_str=numbers.output,
    )
    # --> TypeError: <airflow.models.xcom_arg.XComArg object at 0x7f6e8ed76760> could not be converted to bytes

Any idea?


To make it work, you have to declare the field that receives the value in your operator's `template_fields`. I made this working example:

class CustomDummyOperator(BaseOperator):
    """Print a message received from an upstream task.

    :param msg_from_previous_task: value to print; it is listed in
        ``template_fields`` so Airflow renders it before ``execute`` runs.
    """

    # Fields Airflow renders as templates before calling ``execute``.
    template_fields = ('msg_from_previous_task',)

    @apply_defaults
    def __init__(self, msg_from_previous_task, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self.msg_from_previous_task = msg_from_previous_task

    def execute(self, context):
        """Print the (already rendered) message."""
        print(f"Message: {self.msg_from_previous_task}")


dag = DAG(
    'xcom_arg_custom_op',
    schedule_interval="@once",
    start_date=days_ago(2),
    default_args={'owner': 'airflow'},
    tags=['example'],
    catchup=False,
)


def return_a_str():
    """Return the string that task_1 pushes to XCom as its return value."""
    return "string_value_from_op1"


task_1 = PythonOperator(
    task_id='task_1',
    dag=dag,
    python_callable=return_a_str,
)

# Pass task_1's XComArg into the templated field of the custom operator.
task_2 = CustomDummyOperator(
    task_id='task_2',
    dag=dag,
    msg_from_previous_task=task_1.output,
)

task_1 >> task_2

Output log:

[2021-05-25 13:51:50,848] {} INFO - Exporting the following env vars:
Message: string_value_from_op1

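Under the hood we are using the `str()` method of `XComArg`, which provides backward compatibility for regular ("non-TaskFlow") operators. Because the field is listed in `template_fields`, it is rendered before `execute()` runs, so the operator receives the actual string instead of the `XComArg` object.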

Let me know if that worked for you!

