Export environment variables at runtime with Airflow

2024/10/7 8:31:54

I am currently converting workflows that were previously implemented as bash scripts into Airflow DAGs. In the bash scripts, I simply exported the variables at run time with

export HADOOP_CONF_DIR="/etc/hadoop/conf"

Now I'd like to do the same in Airflow, but haven't found a solution for this yet. The one workaround I found was setting the variables with os.environ[VAR_NAME]='some_text' outside of any method or operator, but that means they get exported the moment the script gets loaded, not at run time.

Now when I try to call os.environ[VAR_NAME] = 'some_text' in a function that gets called by a PythonOperator, it does not work. My code looks like this

def set_env():
    os.environ['HADOOP_CONF_DIR'] = "/etc/hadoop/conf"
    os.environ['PATH'] = "somePath:" + os.environ['PATH']
    os.environ['SPARK_HOME'] = "pathToSparkHome"
    os.environ['PYTHONPATH'] = "somePythonPath"
    os.environ['PYSPARK_PYTHON'] = os.popen('which python').read().strip()
    os.environ['PYSPARK_DRIVER_PYTHON'] = os.popen('which python').read().strip()

set_env_operator = PythonOperator(
    task_id='set_env_vars_NOT_WORKING',
    python_callable=set_env,
    dag=dag)

Now when my SparkSubmitOperator gets executed, I get the exception:

Exception in thread "main" java.lang.Exception: When running with master 'yarn' either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment.

This matters because I use a SparkSubmitOperator to submit jobs to YARN, so either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment. Unfortunately, setting them in my .bashrc or any other config file is not possible for me, which is why I need to set them at runtime.

Preferably I'd like to set them in an Operator before executing the SparkSubmitOperator, but if there was the possibility to pass them as arguments to the SparkSubmitOperator, that would be at least something.

Answer

From what I can see in the SparkSubmitOperator source, you can pass environment variables to spark-submit as a dictionary via the env_vars parameter:

:param env_vars: Environment variables for spark-submit. It supports yarn and k8s mode too.
:type env_vars: dict
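
A minimal sketch of how that parameter might be used with the variables from the question. The task id and application path are hypothetical, the import path assumes the Airflow 2 Spark provider package (older releases ship the operator under airflow.contrib), and PATH is left out for brevity. shutil.which stands in for the os.popen('which python') call.

```python
import shutil


def build_spark_env():
    """Collect the variables from the question into a plain dict."""
    # shutil.which replaces os.popen('which python'); fall back to "python" if not found.
    python_bin = shutil.which("python") or "python"
    return {
        "HADOOP_CONF_DIR": "/etc/hadoop/conf",
        "SPARK_HOME": "pathToSparkHome",    # placeholder taken from the question
        "PYTHONPATH": "somePythonPath",     # placeholder taken from the question
        "PYSPARK_PYTHON": python_bin,
        "PYSPARK_DRIVER_PYTHON": python_bin,
    }


def make_spark_task(dag):
    # Imported here so build_spark_env() can be checked without Airflow installed.
    # Assumption: Airflow 2 with the apache-spark provider package.
    from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

    return SparkSubmitOperator(
        task_id="spark_submit_with_env",    # hypothetical task id
        application="/path/to/job.py",      # hypothetical application path
        env_vars=build_spark_env(),         # handed to spark-submit by the operator
        dag=dag,
    )
```

In a DAG file you would call make_spark_task(dag) in place of the separate set_env task, so the variables travel with the spark-submit call itself.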

Have you tried this?
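
As for why the PythonOperator approach has no effect: Airflow normally runs each task instance in its own process (possibly on a different worker), so a change to os.environ made in one task is gone by the time the next task starts. The sketch below illustrates that with two plain Python subprocesses standing in for the two tasks. It does not use Airflow, and the variable names are only for illustration.

```python
import os
import subprocess
import sys

VAR = "HADOOP_CONF_DIR"

# Environment both "tasks" inherit; make sure the variable is not already set.
parent_env = dict(os.environ)
parent_env.pop(VAR, None)

# "Task 1": a separate process sets the variable, like the set_env callable does.
subprocess.run(
    [sys.executable, "-c", f"import os; os.environ['{VAR}'] = '/etc/hadoop/conf'"],
    env=parent_env,
    check=True,
)

# "Task 2": a process started afterwards reads the variable.
second = subprocess.run(
    [sys.executable, "-c", f"import os; print(os.environ.get('{VAR}'))"],
    env=parent_env,
    capture_output=True,
    text=True,
    check=True,
)

# The change made by the first process did not carry over.
seen_by_second_task = second.stdout.strip()
print(seen_by_second_task)  # prints "None"
```

This is why the variables need to be attached to the task that actually runs spark-submit rather than set in an earlier task.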
