Reading a large file in Spark - Python

2024/10/3 8:21:18

I have Spark installed locally with Python, and when I run the following code:

data = sc.textFile('C:\\Users\\xxxx\\Desktop\\train.csv')
data.first()

I get the following error:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-11-fca93c6aedeb> in <module>()
----> 1 data.first()

C:\Spark\python\pyspark\rdd.pyc in first(self)
   1313         ValueError: RDD is empty
   1314         """
-> 1315         rs = self.take(1)
   1316         if rs:
   1317             return rs[0]

C:\Spark\python\pyspark\rdd.pyc in take(self, num)
   1295
   1296             p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
-> 1297             res = self.context.runJob(self, takeUpToNumLeft, p)
   1298
   1299             items += res

C:\Spark\python\pyspark\context.pyc in runJob(self, rdd, partitionFunc, partitions, allowLocal)
    937         # SparkContext#runJob.
    938         mappedRDD = rdd.mapPartitions(partitionFunc)
--> 939         port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
    940         return list(_load_from_socket(port, mappedRDD._jrdd_deserializer))
    941

C:\Anaconda2\lib\site-packages\py4j\java_gateway.pyc in __call__(self, *args)
   1024         answer = self.gateway_client.send_command(command)
   1025         return_value = get_return_value(
-> 1026             answer, self.gateway_client, self.target_id, self.name)
   1027
   1028         for temp_arg in temp_args:

C:\Spark\python\pyspark\sql\utils.pyc in deco(*a, **kw)
     43     def deco(*a, **kw):
     44         try:
---> 45             return f(*a, **kw)
     46         except py4j.protocol.Py4JJavaError as e:
     47             s = e.java_exception.toString()

C:\Anaconda2\lib\site-packages\py4j\protocol.pyc in get_return_value(answer, gateway_client, target_id, name)
    314                 raise Py4JJavaError(
    315                     "An error occurred while calling {0}{1}{2}.\n".
--> 316                     format(target_id, ".", name), value)
    317             else:
    318                 raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 2, localhost): java.net.SocketException: Connection reset by peer: socket write error
	at java.net.SocketOutputStream.socketWrite0(Native Method)
	at java.net.SocketOutputStream.socketWrite(Unknown Source)
	at java.net.SocketOutputStream.write(Unknown Source)
	at java.io.BufferedOutputStream.flushBuffer(Unknown Source)
	at java.io.BufferedOutputStream.write(Unknown Source)
	at java.io.DataOutputStream.write(Unknown Source)
	at java.io.FilterOutputStream.write(Unknown Source)
	at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:622)
	at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:442)
	at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452)
	at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452)
	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:452)
	at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:280)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1765)
	at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:239)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
	at scala.Option.foreach(Option.scala:236)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
	at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:393)
	at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
	at py4j.Gateway.invoke(Gateway.java:259)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:209)
	at java.lang.Thread.run(Unknown Source)
Caused by: java.net.SocketException: Connection reset by peer: socket write error
	at java.net.SocketOutputStream.socketWrite0(Native Method)
	at java.net.SocketOutputStream.socketWrite(Unknown Source)
	at java.net.SocketOutputStream.write(Unknown Source)
	at java.io.BufferedOutputStream.flushBuffer(Unknown Source)
	at java.io.BufferedOutputStream.write(Unknown Source)
	at java.io.DataOutputStream.write(Unknown Source)
	at java.io.FilterOutputStream.write(Unknown Source)
	at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:622)
	at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:442)
	at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452)
	at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452)
	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:452)
	at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:280)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1765)
	at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:239)

I am sure the path is correct, as I have tried other files in the same folder. I think the issue is the size of the file, which is around 3.4 GB.

Any help, please?

Answer

Whether you run Spark in standalone mode or cluster mode, spark.driver.memory and spark.executor.memory both default to 1 GB. You can give the driver and the executors more memory by changing these settings when you start your Jupyter notebook, or in the Spark configuration file. With that, you should be able to read the 3.4 GB CSV file, provided your machine has enough RAM.
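As a minimal sketch (the "4g" values below are illustrative, not from the question; size them to the RAM you actually have free), the settings can be passed through a SparkConf before the SparkContext is created:

from pyspark import SparkConf, SparkContext

# Request more memory before the SparkContext exists;
# these settings are read when the driver JVM is launched.
conf = (SparkConf()
        .set("spark.driver.memory", "4g")     # default: 1g
        .set("spark.executor.memory", "4g"))  # default: 1g
sc = SparkContext(conf=conf)

data = sc.textFile('C:\\Users\\xxxx\\Desktop\\train.csv')
data.first()

Note that spark.driver.memory only takes effect if it is set before the driver JVM starts. In a Jupyter notebook, where the JVM may already be running, it is usually more reliable to set it in the environment before launching the notebook, e.g. PYSPARK_SUBMIT_ARGS="--driver-memory 4g pyspark-shell", or to put the settings in conf/spark-defaults.conf. In local mode the executor runs inside the driver process, so the driver memory is the setting that matters.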

