IllegalArgumentException thrown by count() and collect() in Spark

2024/10/2 10:37:41

I tried to load a small dataset on local Spark, and an exception is thrown whenever I call count() in PySpark (take() seems to work). I searched for this issue but had no luck figuring out why. It seems something is wrong with the partitioning of the RDD. Any ideas? Thank you in advance!

sc.stop()
sc = SparkContext("local[4]", "temp")
testfile1 = sc.textFile(localpath('part-00000-Copy1.xml'))
testfile1.filter(lambda x: x.strip().encode('utf-8').startswith(b'<row')).take(1) ## take function seems working

and this is what the data looks like:

['  <row AcceptedAnswerId="15" AnswerCount="5" Body="&lt;p&gt;How should I elicit prior distributions from experts when fitting a Bayesian model?&lt;/p&gt;&#10;" CommentCount="1" CreationDate="2010-07-19T19:12:12.510" FavoriteCount="17" Id="1" LastActivityDate="2010-09-15T21:08:26.077" OwnerUserId="8" PostTypeId="1" Score="26" Tags="&lt;bayesian&gt;&lt;prior&gt;&lt;elicitation&gt;" Title="Eliciting priors from experts" ViewCount="1457" />']

This is the code that triggers the problem:

test1 = testfile1.filter(lambda x: (x.strip().encode('utf-8').startswith(b'<row'))).filter(lambda x: x is not None)
test1.count()

here is the exception:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-34-d7626ed81f56> in <module>()
----> 1 test1.count()

/usr/local/lib/python3.6/site-packages/pyspark/rdd.py in count(self)
   1039         3
   1040         """
-> 1041         return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
   1042
   1043     def stats(self):

/usr/local/lib/python3.6/site-packages/pyspark/rdd.py in sum(self)
   1030         6.0
   1031         """
-> 1032         return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
   1033
   1034     def count(self):

/usr/local/lib/python3.6/site-packages/pyspark/rdd.py in fold(self, zeroValue, op)
    904         # zeroValue provided to each partition is unique from the one provided
    905         # to the final reduce call
--> 906         vals = self.mapPartitions(func).collect()
    907         return reduce(op, vals, zeroValue)
    908

/usr/local/lib/python3.6/site-packages/pyspark/rdd.py in collect(self)
    807         """
    808         with SCCallSiteSync(self.context) as css:
--> 809             port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
    810         return list(_load_from_socket(port, self._jrdd_deserializer))
    811

/usr/local/lib/python3.6/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1131         answer = self.gateway_client.send_command(command)
   1132         return_value = get_return_value(
-> 1133             answer, self.gateway_client, self.target_id, self.name)
   1134
   1135         for temp_arg in temp_args:

/usr/local/lib/python3.6/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    317                 raise Py4JJavaError(
    318                     "An error occurred while calling {0}{1}{2}.\n".
--> 319                     format(target_id, ".", name), value)
    320             else:
    321                 raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: java.lang.IllegalArgumentException
    at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
    at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
    at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
    at org.apache.spark.util.ClosureCleaner$.getClassReader(ClosureCleaner.scala:46)
    at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:443)
    at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:426)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
    at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:103)
    at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:103)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
    at scala.collection.mutable.HashMap$$anon$1.foreach(HashMap.scala:103)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
    at org.apache.spark.util.FieldAccessFinder$$anon$3.visitMethodInsn(ClosureCleaner.scala:426)
    at org.apache.xbean.asm5.ClassReader.a(Unknown Source)
    at org.apache.xbean.asm5.ClassReader.b(Unknown Source)
    at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)
    at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)
    at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$14.apply(ClosureCleaner.scala:257)
    at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$14.apply(ClosureCleaner.scala:256)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:256)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:156)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2294)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2068)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2094)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
    at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:467)
    at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:564)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.base/java.lang.Thread.run(Thread.java:844)

Update: I tried to create a minimal example in PySpark, and the issue with count() still exists. I am wondering whether it is related to the Spark configuration in my Jupyter notebook. Here's the minimal example:

import findspark
findspark.init()  # run before importing pyspark so the Spark jars are on the path

import random
import pyspark

sc = pyspark.SparkContext(appName="Pi")
num_samples = 100000000

def inside(p):
    x, y = random.random(), random.random()
    return x*x + y*y < 1

sc.parallelize(range(0, num_samples)).filter(inside).take(10)

And here's the output:

[0, 1, 3, 4, 7, 9, 11, 12, 13, 14]

And then run count() on the same RDD:

sc.parallelize(range(0, num_samples)).filter(inside).count()

And the output looks similar to the previous example:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-23-5588c6957b1d> in <module>()
----> 1 sc.parallelize(range(0, num_samples)).filter(inside).count()

(the rest of the traceback -- the pyspark/rdd.py, py4j, and
java.lang.IllegalArgumentException / org.apache.xbean.asm5.ClassReader frames --
is identical to the one shown above)

One thing I noticed is that when I run PySpark from the Jupyter notebook, the terminal shows some warnings:

WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.hadoop.security.authentication.util.KerberosUtil (file:/usr/local/lib/python3.6/site-packages/pyspark/jars/hadoop-auth-2.7.3.jar) to method sun.security.krb5.Config.getInstance()
WARNING: Please consider reporting this to the maintainers of org.apache.hadoop.security.authentication.util.KerberosUtil
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
18/01/15 14:05:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

A Google search turned up a post that discusses the last warning (the NativeCodeLoader one).

Answer

Are you using Java 9? There are many known issues when running Apache projects on Java 9 at the moment. The java.base/jdk.internal.reflect frames in your stack trace and the IllegalArgumentException thrown from org.apache.xbean.asm5.ClassReader are the typical signature of Spark 2.x's closure cleaner failing to read Java 9 class files.
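If you want to double-check which JVM the notebook's SparkContext actually picked up, a quick sanity check through the Py4J gateway is sketched below. It assumes the sc from your example was created successfully (creating the context works for you; only the actions fail):

# Ask the JVM behind the SparkContext which Java it is running on.
# Anything reported as 9 or higher means Java 9+, which Spark 2.x's
# closure cleaner cannot handle.
print(sc._jvm.java.lang.System.getProperty("java.version"))
print(sc._jvm.java.lang.System.getProperty("java.home"))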

If that is the case and you are on macOS, do the following to list the JDKs installed on your machine:

cd /usr/libexec
./java_home -V

This will list the Java versions you have installed. Pick an older one (Java 8) and point JAVA_HOME at it:

export JAVA_HOME="/Library/Java/JavaVirtualMachines/jdk1.8.{YourVersion}.jdk/Contents/Home"

If this works (I am not sure, because some information is missing), add the export command to your shell profile / init script so it persists across sessions.
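If you prefer not to edit your shell profile, you can also point PySpark at Java 8 from inside the notebook, as long as you do it before the SparkContext is created (restart the kernel first so no old JVM gateway is still running). This is only a sketch: the jdk1.8.0_181 path is a hypothetical example, so substitute whatever ./java_home -V reports on your machine.

import os

# Hypothetical Java 8 location -- replace with the path printed by ./java_home -V.
os.environ["JAVA_HOME"] = "/Library/Java/JavaVirtualMachines/jdk1.8.0_181.jdk/Contents/Home"
os.environ["PATH"] = os.environ["JAVA_HOME"] + "/bin:" + os.environ["PATH"]

import findspark
findspark.init()
import pyspark

# PySpark launches its JVM gateway lazily when the first SparkContext is built,
# so JAVA_HOME must already be set at this point.
sc = pyspark.SparkContext("local[4]", "temp")
print(sc.parallelize(range(1000)).count())  # should print 1000 instead of raising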
