PySpark 2.x: Programmatically adding Maven JAR Coordinates to Spark

The following is my PySpark startup snippet, which is pretty reliable (I've been using it for a long time). Today I added the two Maven Coordinates shown in the spark.jars.packages option (effectively "plugging in" Kafka support). Adding coordinates there normally triggers the dependency downloads, which Spark performs automatically:

import sys, os, multiprocessing
from pyspark.sql import DataFrame, DataFrameStatFunctions, DataFrameNaFunctions
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as sFn
from pyspark.sql.types import *
from pyspark.sql.types import Row
# ------------------------------------------
# Note: Row() in .../pyspark/sql/types.py
# isn't included in '__all__' list(), so
# we must import it by name here.
# ------------------------------------------

num_cpus = multiprocessing.cpu_count()        # Number of CPUs for SPARK Local mode.
os.environ.pop('SPARK_MASTER_HOST', None)     # Since we're using pip/pySpark these three ENVs
os.environ.pop('SPARK_MASTER_PORT', None)     # aren't needed; and we ensure pySpark doesn't
os.environ.pop('SPARK_HOME',        None)     # get confused by them, should they be set.
os.environ.pop('PYTHONSTARTUP',     None)     # Just in case pySpark 2.x attempts to read this.
os.environ['PYSPARK_PYTHON'] = sys.executable # Make SPARK Workers use same Python as Master.
os.environ['JAVA_HOME'] = '/usr/lib/jvm/jre'  # Oracle JAVA for our pip/python3/pySpark 2.4 (CDH's JRE won't work).
JARS_IVY_REPO = '/home/jdoe/SPARK.JARS.REPO.d/'

# ======================================================================
# Maven Coordinates for JARs (and their dependencies) needed to plug
# extra functionality into Spark 2.x (e.g. Kafka SQL and Streaming)
# A one-time internet connection is necessary for Spark to automatically
# download JARs specified by the coordinates (and dependencies).
# ======================================================================
spark_jars_packages = ','.join([
    'org.apache.spark:spark-streaming-kafka-0-10_2.11:2.4.0',
    'org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0',
])
# ======================================================================
spark_conf = SparkConf()
spark_conf.setAll([
    ('spark.master', 'local[{}]'.format(num_cpus)),
    ('spark.app.name', 'myApp'),
    ('spark.submit.deployMode', 'client'),
    ('spark.ui.showConsoleProgress', 'true'),
    ('spark.eventLog.enabled', 'false'),
    ('spark.logConf', 'false'),
    ('spark.jars.repositories', 'file:/' + JARS_IVY_REPO),
    ('spark.jars.ivy', JARS_IVY_REPO),
    ('spark.jars.packages', spark_jars_packages),
])

spark_sesn            = SparkSession.builder.config(conf = spark_conf).getOrCreate()
spark_ctxt            = spark_sesn.sparkContext
spark_reader          = spark_sesn.read
spark_streamReader    = spark_sesn.readStream
spark_ctxt.setLogLevel("WARN")

However, the plugins aren't downloaded and/or loaded when I run the snippet (e.g. ./python -i init_spark.py), as they should be.
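As a quick sanity check (a minimal sketch, not part of the snippet above), the live SparkContext can be asked what its configuration actually carries; if spark.jars.packages comes back empty, the coordinates never reached the JVM:

# Sanity check (sketch): getConf() returns a copy of the live SparkConf,
# and .get(key, default) reads one property from it.
pkgs = spark_ctxt.getConf().get('spark.jars.packages', '')
print('spark.jars.packages =', repr(pkgs))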

This mechanism used to work, but then stopped. What am I missing?

Thank you in advance!

Answer

This is the kind of post where the QUESTION will be worth more than the ANSWER, because the code above works, yet it isn't to be found anywhere in the Spark 2.x documentation or examples.

The above is how I've programmatically added functionality to Spark 2.x by way of Maven Coordinates. I had this working but then it stopped working. Why?

When I ran the above code in a Jupyter notebook, the notebook had already, behind the scenes, run that identical code snippet via my PYTHONSTARTUP script. That PYTHONSTARTUP script contains the same code as above, but intentionally omits the Maven coordinates.

Here, then, is how this subtle problem emerges:

spark_sesn = SparkSession.builder.config(conf = spark_conf).getOrCreate()

Because a SparkSession already existed, the above statement simply reused it (.getOrCreate()), and that existing session did not have the jars/libraries loaded (again, because my PYTHONSTARTUP script intentionally omits them). This is why it's a good idea to put print statements in PYTHONSTARTUP scripts, which are otherwise silent, as in the sketch below.
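For example, a single banner line at the very top of the PYTHONSTARTUP script would have made the hidden session obvious (a minimal sketch; adapt the wording to whatever your script actually builds):

# At the top of the PYTHONSTARTUP script (sketch), announce that it is
# running and that it builds a SparkSession WITHOUT spark.jars.packages,
# so any later .getOrCreate() will silently reuse that bare session.
print('*** PYTHONSTARTUP is running: creating a SparkSession WITHOUT spark.jars.packages ***')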

In the end, I simply forgot to do this: $ unset PYTHONSTARTUP before starting the JupyterLab / Notebook daemon.

I hope the Question helps others because that's how to programmatically add functionality to Spark 2.x (in this case Kafka). Note that you'll need an internet connection for the one-time download of the specified jars and recursive dependencies from Maven Central.
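As a rough confirmation that the one-time download actually happened, the resolved jars should land under the directory handed to spark.jars.ivy (a sketch, assuming the JARS_IVY_REPO path from the question; the exact subdirectory layout Ivy creates may vary):

# Rough confirmation (sketch): walk the local Ivy repo and list any
# Kafka-related jars pulled down by the coordinates above.
import os
JARS_IVY_REPO = '/home/jdoe/SPARK.JARS.REPO.d/'
for root, _dirs, files in os.walk(JARS_IVY_REPO):
    for name in files:
        if name.endswith('.jar') and 'kafka' in name.lower():
            print(os.path.join(root, name))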
