ModuleNotFoundError in PySpark Worker on rdd.collect()


I am running an Apache Spark program in Python, and I am getting an error that I can't understand and can't begin to debug. My driver program defines a function called hound in a file called hound.py. In the same directory, a file called hound_base.py defines a function called hound_base_func. To call it from hound, I import it with "from hound_base import hound_base_func". This works, and I call the function, passing it a Spark DataFrame. hound_base_func takes the DataFrame as a parameter, does some work on its underlying RDD, and then calls rdd.collect(). That call crashes with "ModuleNotFoundError: No module named 'hound_base'", which makes no sense! It is saying that the very module the code is executing in can't be found. I'm willing to provide as many more details as I can, but this is all I know that relates to the problem... Are there any tips on how I can figure this out?
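For concreteness, here is a minimal sketch of the layout I'm describing (the function bodies are stand-ins, not my real code; only the file and function names match):

    # hound_base.py -- only the names below come from my real code
    def hound_base_func(df):
        rdd_result = df.rdd.map(lambda row: row)  # some work on the underlying RDD
        return rdd_result.collect()               # <-- crashes here

    # hound.py -- driver program, same directory
    from pyspark.sql import SparkSession
    from hound_base import hound_base_func        # this import itself works fine

    spark = SparkSession.builder.appName("hound").getOrCreate()

    def hound(path, *fields):
        df = spark.read.csv(path, header=True)
        return hound_base_func(df)

    hound("fakedata.csv", "Field1", "Field2", "Field3", ["Field4a", "Field4b"])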

Full trace

2018-06-14 14:29:26 ERROR Executor:91 - Exception in task 0.0 in stage 2.0 (TID 2)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\Users\Brian\Miniconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 216, in main
  File "C:\Users\Brian\Miniconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 58, in read_command
  File "C:\Users\Brian\Miniconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\serializers.py", line 170, in _read_with_length
    return self.loads(obj)
  File "C:\Users\Brian\Miniconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\serializers.py", line 559, in loads
    return pickle.loads(obj, encoding=encoding)
ModuleNotFoundError: No module named 'hound_base'
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1126)
    at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1132)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
2018-06-14 14:29:26 WARN  TaskSetManager:66 - Lost task 0.0 in stage 2.0 (TID 2, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\Users\Brian\Miniconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 216, in main
  File "C:\Users\Brian\Miniconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 58, in read_command
  File "C:\Users\Brian\Miniconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\serializers.py", line 170, in _read_with_length
    return self.loads(obj)
  File "C:\Users\Brian\Miniconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\serializers.py", line 559, in loads
    return pickle.loads(obj, encoding=encoding)
ModuleNotFoundError: No module named 'hound_base'
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1126)
    at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1132)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
2018-06-14 14:29:26 ERROR TaskSetManager:70 - Task 0 in stage 2.0 failed 1 times; aborting job
[Stage 2:>                                                          (0 + 1) / 1]
Traceback (most recent call last):
  File "F:\data\src\hound.py", line 43, in <module>
    hound("fakedata.csv", "Field1", "Field2", "Field3", ["Field4a", "Field4b"])
  File "F:\data\src\hound.py", line 37, in hound
    hound_base_func(data)
  File "F:\data\src\hound_base.py", line 220, in hound_base_func
    rdd_collected = rdd_result.collect()
  File "C:\Users\Brian\Miniconda3\lib\site-packages\pyspark\rdd.py", line 824, in collect
    port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
  File "C:\Users\Brian\Miniconda3\lib\site-packages\py4j\java_gateway.py", line 1160, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "C:\Users\Brian\Miniconda3\lib\site-packages\pyspark\sql\utils.py", line 63, in deco
    return f(*a, **kw)
  File "C:\Users\Brian\Miniconda3\lib\site-packages\py4j\protocol.py", line 320, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 2, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\Users\Brian\Miniconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 216, in main
  File "C:\Users\Brian\Miniconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 58, in read_command
  File "C:\Users\Brian\Miniconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\serializers.py", line 170, in _read_with_length
    return self.loads(obj)
  File "C:\Users\Brian\Miniconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\serializers.py", line 559, in loads
    return pickle.loads(obj, encoding=encoding)
ModuleNotFoundError: No module named 'hound_base'
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1126)
    at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1132)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2048)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2067)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2092)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:939)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:938)
    at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:153)
    at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.lang.reflect.Method.invoke(Unknown Source)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\Users\Brian\Miniconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 216, in main
  File "C:\Users\Brian\Miniconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 58, in read_command
  File "C:\Users\Brian\Miniconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\serializers.py", line 170, in _read_with_length
    return self.loads(obj)
  File "C:\Users\Brian\Miniconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\serializers.py", line 559, in loads
    return pickle.loads(obj, encoding=encoding)
ModuleNotFoundError: No module named 'hound_base'
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1126)
    at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1132)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    ... 1 more
SUCCESS: The process with PID 18960 (child process of PID 6380) has been terminated.
SUCCESS: The process with PID 6380 (child process of PID 1400) has been terminated.
SUCCESS: The process with PID 1400 (child process of PID 19344) has been terminated.
[Finished in 21.811s]

Answer

Multiple problems here:

First, you're not allowed to access the SparkContext from executor tasks, i.e. from inside any function you pass to rdd.map(): the context exists only on the driver.
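As a minimal sketch of that rule (the data here is made up, not from the question): the function you pass to map() must not capture the context itself, but it can use plain values or broadcast variables shipped from the driver.

    from pyspark import SparkContext

    sc = SparkContext("local", "context-rule-demo")
    rdd = sc.parallelize([1, 2, 3])

    # BAD: the lambda would capture sc, which exists only on the driver,
    # so the task cannot be used from the worker side.
    # rdd.map(lambda x: sc.parallelize([x]).count()).collect()

    # OK: ship a plain value to the workers via a broadcast variable.
    offset = sc.broadcast(10)
    print(rdd.map(lambda x: x + offset.value).collect())  # [11, 12, 13]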

Second, calling functions defined elsewhere from inside the function you pass to .map() is tricky. One solution is to move all of the helper definitions inside the original function, if possible. If any of them live in a different file, you have to explicitly ship that file to the executors with spark_context.addPyFile(path); importing it on the driver isn't enough, because the workers unpickle your map function in a fresh Python process where that module isn't on the path (see the sketch below).
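Applied to the files in this question, the fix might look roughly like the following; the SparkSession setup and CSV options are assumptions on my part, while the file names and the F:\data\src path come from the traceback.

    # hound.py (driver) -- sketch of the addPyFile fix
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("hound").getOrCreate()

    # Ship hound_base.py to every executor so the workers can unpickle
    # functions that were defined in that module.
    spark.sparkContext.addPyFile(r"F:\data\src\hound_base.py")

    from hound_base import hound_base_func  # import after addPyFile

    df = spark.read.csv("fakedata.csv", header=True)
    rows = hound_base_func(df)  # its internal rdd.collect() should now succeed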

These two things fixed the (many) problems I've had with this error. Note that it only gets thrown at .collect() because of lazy evaluation: nothing is shipped to, or unpickled on, the workers until an action forces the computation. Not fun.

https://en.xdnf.cn/q/71376.html

Related Q&A

Sphinx is not able to import anything

I am trying to use Sphinx to document a project of mine. I have used autodoc strings within all of my modules and files. I used sphinx-apidoc to automatically generate rst files for my code. So far, so…

Python: why is a method from the super class not seen?

I am trying to implement my own version of a DailyLogFile: from twisted.python.logfile import DailyLogFile; class NDailyLogFile(DailyLogFile): def __init__(self, name, directory, rotateAfterN = 1, defaultM…

Extract features from last hidden layer Pytorch Resnet18

I am implementing an image classifier using the Oxford Pet dataset with the pre-trained Resnet18 CNN. The dataset consists of 37 categories with ~200 images in each of them. Rather than using the final…

Python Graphs: Latex Math rendering of node labels

I am using the following code to create a pygraphviz graph. But is it possible to make it render LaTeX math equations (see Figure 1)? If not, is there an alternative Python library that plots similar…

Given general 3D plane equation

Let's say I have a 3D plane equation: ax + by + cz = d. How can I plot this in Python with matplotlib? I saw some examples using plot_surface, but it accepts x, y, z values as 2D arrays. I don't understand how I can conv…

Spark-submit fails to import SparkContext

I'm running Spark 1.4.1 on my local Mac laptop and am able to use pyspark interactively without any issues. Spark was installed through Homebrew and I'm using Anaconda Python. However, as soon as I try…

Is there a Python API for event-driven Kafka consumer?

I have been trying to build a Flask app that has Kafka as the only interface. For this reason, I want to have a Kafka consumer that is triggered when there is a new message in the stream of the concerned to…

SWIG python initialise a pointer to NULL

Is it possible to initialise a ptr to NULL from the Python side when dealing with a SWIG module? For example, say I have wrapped a struct track_t in a SWIG module m (_m.so), I can create a pointer to the…

Replacing punctuation in a data frame based on punctuation list [duplicate]

This question already has answers here: Fast punctuation removal with pandas (4 answers). Closed 5 years ago. Using Canopy and Pandas, I have data frame a which is defined by: a = pd.read_csv(text.txt) df = pd.Da…

How to import one submodule from different submodule? [duplicate]

This question already has answers here: Relative imports for the billionth time (14 answers). Closed 6 years ago. My project has the following structure: DSTC/ st/ __init__.py a.py g.py tb.py dstc.py. Here is a.py i…