Imagine a large dataset (>40GB parquet file) containing value observations of thousands of variables as triples (variable, timestamp, value).
Now think of a query in which you are only interested in a subset of 500 variables, and you want to retrieve their observations (values --> time series) for specific observation windows (timeframes), each having a start and end time.
Without distributed computing (Spark), you could code it like this:
    for var_ in variables_of_interest:
        for incident in incidents:
            var_df = df_all.filter(
                (df_all.Variable == var_)
                & (df_all.Time > incident.startTime)
                & (df_all.Time < incident.endTime))
My question is: how can I do that with Spark/PySpark? I was thinking of one of the following:
- joining the incidents somehow with the variable observations and filtering the dataframe afterwards (see the sketch after this list).
- broadcasting the incident dataframe and using it within a map function when filtering the variable observations (df_all).
- using RDD.cartesian or RDD.mapPartitions somehow (remark: the parquet file was saved partitioned by variable).
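To make idea #1 concrete, here is a minimal sketch of the join-then-filter approach. It assumes the column names Variable, Time, startTime and endTime from the pseudocode above, an assumed HDFS path, and that incidents is already available as a (small) dataframe; variables_of_interest is the same Python list as before:

    from pyspark.sql import SQLContext

    sqlContext = SQLContext(sc)

    # Observations: (Variable, Time, Value) triples, saved partitioned by Variable
    observations = sqlContext.read.parquet("hdfs:///path/to/observations.parquet")

    # Keep only the ~500 variables of interest
    # (Column.inSet was renamed to isin in Spark 1.5)
    subset = observations.filter(observations.Variable.inSet(variables_of_interest))

    # Non-equi join: each observation is paired with every incident whose
    # time window contains it
    joined = subset.join(
        incidents,
        (subset.Time > incidents.startTime) & (subset.Time < incidents.endTime))

Since the incident table is tiny compared to the observations, the join itself stays cheap; the expensive part is scanning the observations, which the Variable filter and the partitioning by variable already reduce.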
The expected output should be:
incident1 --> dataframe 1
incident2 --> dataframe 2
...
Where dataframe 1 contains all variables and their observed values within the timeframe of incident 1, and dataframe 2 contains those values within the timeframe of incident 2.
I hope you got the idea.
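For illustration, assuming the incidents carry a unique identifier column (called incidentId here, which is an assumption), the joined dataframe from the sketch above could be split into exactly that mapping:

    # The incident table is small, so collecting its ids to the driver is fine
    incident_ids = [r.incidentId
                    for r in incidents.select("incidentId").distinct().collect()]

    # incident1 --> dataframe 1, incident2 --> dataframe 2, ...
    frames = {i: joined.filter(joined.incidentId == i) for i in incident_ids}

Often it is simpler to keep the single joined dataframe and group/aggregate by incidentId rather than materializing one dataframe per incident.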
UPDATE
I tried to code a solution based on idea #1 and the code from the answer given by zero323. It works quite well, but I wonder how to aggregate/group the result by incident in the final step. I tried adding a sequential number to each incident, but then I got errors in the last step. It would be great if you could review and/or complete the code, so I have uploaded sample data and the scripts. The environment is Spark 1.4 (PySpark):
- Incidents: incidents.csv
- Variable value observation data (77 MB): parameters_sample.csv (put it into HDFS)
- Jupyter Notebook: nested_for_loop_optimized.ipynb
- Python Script: nested_for_loop_optimized.py
- PDF export of Script: nested_for_loop_optimized.pdf
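Regarding the sequential number per incident: one way to attach it on Spark 1.4 is RDD.zipWithIndex while reading the CSV, before turning the incidents into a dataframe. This is only a sketch; the path, delimiter and column order of incidents.csv are assumptions and need to be adapted to the real file (the timestamp columns may also need casting):

    from pyspark.sql import Row

    # Spark 1.4 has no built-in CSV reader, so parse the lines manually
    raw = sc.textFile("hdfs:///path/to/incidents.csv")
    header = raw.first()
    fields = (raw.filter(lambda line: line != header)
                 .map(lambda line: line.split(",")))  # assumed delimiter

    # zipWithIndex assigns a stable sequential number to every incident
    incidents = sqlContext.createDataFrame(
        fields.zipWithIndex().map(
            lambda pair: Row(incidentId=pair[1],
                             startTime=pair[0][0],    # assumed column order
                             endTime=pair[0][1])))

With incidentId attached, the joined result can be grouped by that column or split into per-incident dataframes as sketched above.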