I am currently converting workflows that used to be implemented as bash scripts into Airflow DAGs. In the bash scripts, I simply exported the variables at run time with
export HADOOP_CONF_DIR="/etc/hadoop/conf"
Now I'd like to do the same in Airflow, but haven't found a solution for this yet. The one workaround I found was setting the variables with os.environ[VAR_NAME] = 'some_text' outside of any method or operator, but that means they are exported the moment the DAG file is loaded, not at run time.
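For reference, that workaround looks roughly like this: a module-level assignment at the top of the DAG file, which runs whenever the scheduler parses the file rather than when a task executes (sketch, not my actual DAG file):

import os

# runs at DAG-file parse time, not when the task actually runs
os.environ['HADOOP_CONF_DIR'] = "/etc/hadoop/conf"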
Now when I try to call os.environ[VAR_NAME] = 'some_text' in a function that gets called by a PythonOperator, it does not work. My code looks like this:
import os

from airflow.operators.python_operator import PythonOperator


def set_env():
    # intended to export the variables at run time, before the Spark job is submitted
    os.environ['HADOOP_CONF_DIR'] = "/etc/hadoop/conf"
    os.environ['PATH'] = "somePath:" + os.environ['PATH']
    os.environ['SPARK_HOME'] = "pathToSparkHome"
    os.environ['PYTHONPATH'] = "somePythonPath"
    os.environ['PYSPARK_PYTHON'] = os.popen('which python').read().strip()
    os.environ['PYSPARK_DRIVER_PYTHON'] = os.popen('which python').read().strip()

set_env_operator = PythonOperator(
    task_id='set_env_vars_NOT_WORKING',
    python_callable=set_env,
    dag=dag)
Now when my SparkSubmitOperator gets executed, I get the exception:
Exception in thread "main" java.lang.Exception: When running with master 'yarn' either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment.
The use case where this is relevant: I have a SparkSubmitOperator with which I submit jobs to YARN, so either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment. Setting them in my .bashrc or any other config file is sadly not possible for me, which is why I need to set them at run time.
Preferably I'd like to set them in an operator before the SparkSubmitOperator executes, but if there were a way to pass them as arguments to the SparkSubmitOperator, that would at least be something.
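If passing them as arguments is the way to go, I imagine something along these lines; I'm assuming here that SparkSubmitOperator accepts an env_vars mapping (I haven't confirmed that such a parameter exists), and the task_id and application path are just placeholders:

from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator

spark_submit = SparkSubmitOperator(
    task_id='submit_to_yarn',                  # placeholder
    application='/path/to/my_job.py',          # placeholder
    conn_id='spark_default',
    env_vars={                                 # assumed parameter, not verified
        'HADOOP_CONF_DIR': '/etc/hadoop/conf',
        'YARN_CONF_DIR': '/etc/hadoop/conf',
    },
    dag=dag)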