how to do a nested for-each loop with PySpark

Imagine a large dataset (>40GB parquet file) containing value observations of thousands of variables as triples (variable, timestamp, value).

Now think of a query in which you are interested in just a subset of 500 variables, and you want to retrieve the observations (values --> time series) for those variables for specific points in time (an observation window or timeframe), each having a start and end time.
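
For concreteness, here is a minimal sketch of the assumed layout (the column names Variable, Time, Value are inferred from the filter expressions below; the real input is the >40GB parquet file, and sc/sqlContext are assumed to be a standard PySpark shell context):

# Hypothetical miniature of the (variable, timestamp, value) triples;
# the real df_all would come from sqlContext.read.parquet(...).
df_all = sc.parallelize([
    ("var_001", "2015-01-01 00:00:00", 21.5),
    ("var_001", "2015-01-01 00:05:00", 21.7),
    ("var_002", "2015-01-01 00:02:00", 1013.2),
]).toDF(["Variable", "Time", "Value"])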

Without distributed computing (Spark), you could code it like this:

for var_ in variables_of_interest:
    for incident in incidents:
        var_df = df_all.filter(
            (df_all.Variable == var_)
            & (df_all.Time > incident.startTime)
            & (df_all.Time < incident.endTime))

My question is: how can I do that with Spark/PySpark? I was thinking of one of the following:

  1. joining the incidents somehow with the variables and filtering the dataframe afterward.
  2. broadcasting the incident dataframe and using it within a map function when filtering the variable observations (df_all); a sketch of this idea follows the list.
  3. using RDD.cartesian or RDD.mapPartitions somehow (remark: the parquet file was saved partitioned by variable).
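
For idea #2, a hedged sketch (all names — df_all, incidents, variables_of_interest — are taken from the snippet above; this is one possible reading of the idea, not a tested solution):

# Broadcast the small incident list, then test each row against it inside
# an RDD function that runs on the executors.
vars_bc = sc.broadcast(set(variables_of_interest))
incidents_bc = sc.broadcast([(i.startTime, i.endTime) for i in incidents])

keyed = df_all.rdd.flatMap(
    lambda row: [(idx, row)
                 for idx, (start, end) in enumerate(incidents_bc.value)
                 if row.Variable in vars_bc.value
                 and start < row.Time < end])
# keyed is an RDD of (incident_index, row) pairs; groupByKey() would
# collect the observations per incident.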

The expected output should be:

incident1 --> dataframe 1
incident2 --> dataframe 2
...

Where dataframe 1 contains all variables and their observed values within the timeframe of incident 1 and dataframe 2 those values within the timeframe of incident 2.

I hope you got the idea.

UPDATE

I tried to code a solution based on idea #1 and the code from the answer given by zero323. It works quite well, but I wonder how to aggregate/group it by incident in the final step. I tried adding a sequential number to each incident, but then I got errors in the last step. It would be cool if you could review and/or complete the code, so I uploaded sample data and the scripts. The environment is Spark 1.4 (PySpark):

  • Incidents: incidents.csv
  • Variable value observation data (77MB): parameters_sample.csv (put it into HDFS)
  • Jupyter Notebook: nested_for_loop_optimized.ipynb
  • Python Script: nested_for_loop_optimized.py
  • PDF export of Script: nested_for_loop_optimized.pdf

Answer

Generally speaking, only the first approach looks sensible to me. The exact joining strategy depends on the number of records and their distribution, but you can either create a top-level data frame:

ref = sc.parallelize([
    (var_, incident)
    for var_ in variables_of_interest
    for incident in incidents
]).toDF(["var_", "incident"])

and simply join

from pyspark.sql.functions import col

same_var = col("Variable") == col("var_")
same_time = col("Time").between(
    col("incident.startTime"),
    col("incident.endTime"))

ref.join(df.alias("df"), same_var & same_time)
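
To get the incident1 --> dataframe 1 mapping asked for in the update, one hedged option (an extension, not part of the original answer) is to tag each incident with a sequential id before building ref, then split the joined result on that id. Here the incident struct is flattened into startTime/endTime columns for simplicity:

from pyspark.sql.functions import col

# Tag each incident with an id so the joined rows can be split back out
# per incident after the join.
ref = sc.parallelize([
    (i, var_, incident.startTime, incident.endTime)
    for var_ in variables_of_interest
    for i, incident in enumerate(incidents)
]).toDF(["incident_id", "var_", "startTime", "endTime"])

joined = ref.join(
    df.alias("df"),
    (col("df.Variable") == col("var_"))
    & col("df.Time").between(col("startTime"), col("endTime")))

# one (still lazy) dataframe per incident
per_incident = {i: joined.filter(joined.incident_id == i)
                for i in range(len(incidents))}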

or perform joins against particular partitions:

incidents_ = sc.parallelize([
    (incident, ) for incident in incidents
]).toDF(["incident"])

for var_ in variables_of_interest:
    # Spark 1.4 reads parquet through the SQLContext; the path uses the
    # Variable= partitioning mentioned in the question
    df = sqlContext.read.parquet("/some/path/Variable={0}".format(var_))
    df.join(incidents_, same_time)

optionally marking one side as small enough to be broadcast.
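
For that broadcast hint, a hedged example (the explicit broadcast function lives in pyspark.sql.functions from Spark 1.6 onward, so this assumes a newer Spark than the 1.4 mentioned in the question):

from pyspark.sql.functions import broadcast

# Hint that the incident side is small enough to ship to every executor,
# turning the join into a map-side (broadcast) join.
df.join(broadcast(incidents_), same_time)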
