I was using PySpark on AWS EMR (4 r5.xlarge instances as 4 workers, each with one executor and 4 cores), and I got AttributeError: Can't get attribute 'new_block' on <module 'pandas.core.internals.blocks'>. Below is a snippet of the code that threw this error:
from uszipcode import SearchEngine
import sqlite3
import numpy as np
import pandas as pd
from pyspark.sql.functions import col, udf

search = SearchEngine(db_file_dir="/tmp/db")
conn = sqlite3.connect("/tmp/db/simple_db.sqlite")
pdf_ = pd.read_sql_query(
    '''select zipcode, lat, lng,
              bounds_west, bounds_east, bounds_north, bounds_south
       from simple_zipcode''',
    conn,
)
brd_pdf = spark.sparkContext.broadcast(pdf_)
conn.close()

@udf('string')
def get_zip_b(lat, lng):
    # First access to .value unpickles the broadcast DataFrame on the executor.
    pdf = brd_pdf.value
    out = pdf[(np.array(pdf["bounds_north"]) >= lat) &
              (np.array(pdf["bounds_south"]) <= lat) &
              (np.array(pdf["bounds_west"]) <= lng) &
              (np.array(pdf["bounds_east"]) >= lng)]
    if len(out):
        # Pick the zipcode whose centroid is closest to the query point.
        min_index = np.argmin((np.array(out["lat"]) - lat)**2 +
                              (np.array(out["lng"]) - lng)**2)
        zip_ = str(out["zipcode"].iloc[min_index])
    else:
        zip_ = 'bad'
    return zip_

df = df.withColumn('zipcode', get_zip_b(col("latitude"), col("longitude")))
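For reference, brd_pdf.value is what unpickles the broadcast DataFrame on each executor, and that is the line the traceback below points at. A hypothetical variant that would avoid unpickling pandas internals on the workers entirely is to broadcast the lookup columns as plain numpy arrays instead of a DataFrame; this is an untested sketch, reusing pdf_, np, and udf from above:

# Hypothetical workaround sketch (untested): broadcast plain numpy arrays so
# the executors never unpickle a pandas DataFrame.
cols = ["zipcode", "lat", "lng",
        "bounds_west", "bounds_east", "bounds_north", "bounds_south"]
brd_arr = spark.sparkContext.broadcast({c: pdf_[c].to_numpy() for c in cols})

@udf('string')
def get_zip_np(lat, lng):
    a = brd_arr.value
    mask = ((a["bounds_north"] >= lat) & (a["bounds_south"] <= lat) &
            (a["bounds_west"] <= lng) & (a["bounds_east"] >= lng))
    if not mask.any():
        return 'bad'
    idx = np.flatnonzero(mask)
    best = idx[np.argmin((a["lat"][idx] - lat)**2 + (a["lng"][idx] - lng)**2)]
    return str(a["zipcode"][best])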
Below is the traceback, where "line 102, in get_zip_b" refers to pdf = brd_pdf.value:
21/08/02 06:18:19 WARN TaskSetManager: Lost task 12.0 in stage 7.0 (TID 1814, ip-10-22-17-94.pclc0.merkle.local, executor 6): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/mnt/yarn/usercache/hadoop/appcache/application_1627867699893_0001/container_1627867699893_0001_01_000009/pyspark.zip/pyspark/worker.py", line 605, in main
    process()
  File "/mnt/yarn/usercache/hadoop/appcache/application_1627867699893_0001/container_1627867699893_0001_01_000009/pyspark.zip/pyspark/worker.py", line 597, in process
    serializer.dump_stream(out_iter, outfile)
  File "/mnt/yarn/usercache/hadoop/appcache/application_1627867699893_0001/container_1627867699893_0001_01_000009/pyspark.zip/pyspark/serializers.py", line 223, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/mnt/yarn/usercache/hadoop/appcache/application_1627867699893_0001/container_1627867699893_0001_01_000009/pyspark.zip/pyspark/serializers.py", line 141, in dump_stream
    for obj in iterator:
  File "/mnt/yarn/usercache/hadoop/appcache/application_1627867699893_0001/container_1627867699893_0001_01_000009/pyspark.zip/pyspark/serializers.py", line 212, in _batched
    for item in iterator:
  File "/mnt/yarn/usercache/hadoop/appcache/application_1627867699893_0001/container_1627867699893_0001_01_000009/pyspark.zip/pyspark/worker.py", line 450, in mapper
    result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
  File "/mnt/yarn/usercache/hadoop/appcache/application_1627867699893_0001/container_1627867699893_0001_01_000009/pyspark.zip/pyspark/worker.py", line 450, in <genexpr>
    result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
  File "/mnt/yarn/usercache/hadoop/appcache/application_1627867699893_0001/container_1627867699893_0001_01_000009/pyspark.zip/pyspark/worker.py", line 90, in <lambda>
    return lambda *a: f(*a)
  File "/mnt/yarn/usercache/hadoop/appcache/application_1627867699893_0001/container_1627867699893_0001_01_000009/pyspark.zip/pyspark/util.py", line 121, in wrapper
    return f(*args, **kwargs)
  File "/mnt/var/lib/hadoop/steps/s-1IBFS0SYWA19Z/Mobile_ID_process_center.py", line 102, in get_zip_b
  File "/mnt/yarn/usercache/hadoop/appcache/application_1627867699893_0001/container_1627867699893_0001_01_000009/pyspark.zip/pyspark/broadcast.py", line 146, in value
    self._value = self.load_from_path(self._path)
  File "/mnt/yarn/usercache/hadoop/appcache/application_1627867699893_0001/container_1627867699893_0001_01_000009/pyspark.zip/pyspark/broadcast.py", line 123, in load_from_path
    return self.load(f)
  File "/mnt/yarn/usercache/hadoop/appcache/application_1627867699893_0001/container_1627867699893_0001_01_000009/pyspark.zip/pyspark/broadcast.py", line 129, in load
    return pickle.load(file)
AttributeError: Can't get attribute 'new_block' on <module 'pandas.core.internals.blocks' from '/mnt/miniconda/lib/python3.9/site-packages/pandas/core/internals/blocks.py'>
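From what I found while searching, new_block was added to pandas.core.internals.blocks in pandas 1.3, so a DataFrame pickled by pandas >= 1.3 (on the driver) cannot be unpickled by an older pandas (on a worker). A minimal sketch of that failure outside Spark (the two halves have to run under different pandas versions):

import pickle
import pandas as pd

df = pd.DataFrame({"zipcode": ["10001"], "lat": [40.75], "lng": [-73.99]})

# Step 1: pickle under pandas >= 1.3, whose pickles reference new_block.
with open("/tmp/df.pkl", "wb") as f:
    pickle.dump(df, f)

# Step 2: loading under pandas < 1.3 raises the same error as the traceback:
#   AttributeError: Can't get attribute 'new_block'
#   on <module 'pandas.core.internals.blocks' ...>
with open("/tmp/df.pkl", "rb") as f:
    restored = pickle.load(f)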
Some observations and my thought process:
1. After some searching online, the AttributeError in PySpark seems to be caused by mismatched pandas versions between the driver and the workers (see the version-check sketch after this list).
2. But I ran the same code on two different datasets: one worked without any errors while the other didn't. That seems strange and non-deterministic, and it suggests the error may not be caused by mismatched pandas versions; otherwise neither dataset would have succeeded.
3. I then reran the same code on the successful dataset, this time with a different Spark configuration: raising spark.driver.memory from 2048M to 4192m. It threw the AttributeError.
4. In conclusion, I think the AttributeError has something to do with the driver, but I can't tell from the error message how the two are related or how to fix it: AttributeError: Can't get attribute 'new_block' on <module 'pandas.core.internals.blocks'>.
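To test the version-mismatch hypothesis from point 1, here is a quick diagnostic sketch (assuming the same spark session as above) that prints the pandas version on the driver and collects the versions reported by the executors:

import pandas as pd

print("driver pandas:", pd.__version__)

def _pandas_version(_):
    # Imported inside the function so it resolves in the executor's environment.
    import pandas
    import socket
    yield (socket.gethostname(), pandas.__version__)

# One probe per partition; 16 partitions is arbitrary, just to hit every worker.
versions = (spark.sparkContext
                 .parallelize(range(16), 16)
                 .mapPartitions(_pandas_version)
                 .distinct()
                 .collect())
print("executor pandas:", versions)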