Naive install of PySpark to also support S3 access

I would like to read Parquet data stored on S3 from PySpark.

I've downloaded Spark from here:

http://www.apache.org/dist/spark/spark-2.1.0/spark-2.1.0-bin-hadoop2.7.tgz

And installed it into Python naively:

cd python
python setup.py install

This seems to function fine and I can import pyspark, make a SparkContext, etc. However, when I go to read some publicly accessible Parquet data like so:

import pyspark
sc = pyspark.SparkContext('local[4]')
sql = pyspark.SQLContext(sc)
df = sql.read.parquet('s3://bucket-name/mydata.parquet')

I receive the following exception:

Py4JJavaError: An error occurred while calling o55.parquet.
: java.io.IOException: No FileSystem for scheme: s3
    at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2660)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2667)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$14.apply(DataSource.scala:372)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$14.apply(DataSource.scala:370)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
    at scala.collection.immutable.List.flatMap(List.scala:344)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:370)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:152)
    at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:441)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:745)

This error pops up in a number of Google search results, but so far none of the suggested solutions have been helpful.

I'm on Linux (Ubuntu 16.04) on a personal computer without much else installed (everything is pretty stock).

Update

I downgraded to http://www.apache.org/dist/spark/spark-2.1.0/spark-2.1.0-bin-hadoop2.4.tgz to have AWS included by default.
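As a quick sanity check that a given build actually bundles the S3 connector classes, one option (a sketch of my own, not something from the original downloads page; the class name is the stock Hadoop one behind the s3n scheme) is to try resolving the class through the Py4J gateway:

    import pyspark

    sc = pyspark.SparkContext('local[4]')
    # On builds that do not bundle the S3 connector (e.g. the hadoop2.7
    # package, where it lives in the separate hadoop-aws module), this
    # fails with a ClassNotFoundException surfaced as a Py4JJavaError.
    print(sc._jvm.java.lang.Class.forName(
        'org.apache.hadoop.fs.s3native.NativeS3FileSystem'))
    sc.stop()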

Now unfortunately my AWS credentials aren't being picked up. I've tried a few things:

  1. Including them as SparkConf parameters

    conf = (pyspark.SparkConf().set('fs.s3.awsAccessKeyId', '...').set('fs.s3.awsSecretAccessKey', '...'))
    sc = pyspark.SparkContext('local[4]', conf=conf)
    
  2. Including them in my local .aws/credentials file (see the sketch after this list)
  3. Including them in the URL (doesn't work because my access key has a forward slash)
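For reference on attempt 2: the old s3/s3n connectors do not read ~/.aws/credentials on their own, so the file-based keys have to be pushed into the Hadoop configuration by hand. A minimal sketch of my own (assuming Python 3's configparser and a standard [default] profile in the credentials file):

    import configparser
    import os

    import pyspark

    # Parse the standard AWS credentials file (INI format, [default] profile).
    parser = configparser.ConfigParser()
    parser.read(os.path.expanduser('~/.aws/credentials'))
    creds = parser['default']

    # Hand the keys to Hadoop explicitly; the s3n connector will not
    # discover them by itself.
    sc = pyspark.SparkContext('local[4]')
    hconf = sc._jsc.hadoopConfiguration()
    hconf.set('fs.s3n.awsAccessKeyId', creds['aws_access_key_id'])
    hconf.set('fs.s3n.awsSecretAccessKey', creds['aws_secret_access_key'])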

Unfortunately, in all cases I receive a traceback like the following:

IllegalArgumentException: 'AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3 URL, or by setting the fs.s3.awsAccessKeyId or fs.s3.awsSecretAccessKey properties (respectively).'
Answer

Using the Hadoop 2.4 build of the pre-built Spark 2.x binary (which I believe ships with S3 functionality), you can programmatically configure Spark to pull S3 data in the following manner:

import pyspark
conf = pyspark.SparkConf()
sc = pyspark.SparkContext('local[4]', conf=conf)
sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", "")
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", "")
sql = pyspark.SQLContext(sc)
df = sql.read.parquet('s3n://bucket-name/mydata.parquet')

A critical thing to note is the s3n prefix, which appears both in the URI for the bucket and in the configuration property names.
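The same settings can also be supplied up front, without touching the private _jsc handle: Spark copies any configuration key prefixed with spark.hadoop. into the Hadoop configuration at context startup. A minimal variant (the placeholder key values are mine):

    import pyspark

    # "spark.hadoop.*" keys are propagated into the Hadoop configuration,
    # so this is equivalent to the hadoopConfiguration() calls above.
    conf = (pyspark.SparkConf()
            .set('spark.hadoop.fs.s3n.awsAccessKeyId', '<access-key-id>')
            .set('spark.hadoop.fs.s3n.awsSecretAccessKey', '<secret-access-key>'))
    sc = pyspark.SparkContext('local[4]', conf=conf)
    sql = pyspark.SQLContext(sc)
    df = sql.read.parquet('s3n://bucket-name/mydata.parquet')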
