Python: send CSV data to Spark Streaming

2024/10/4 23:25:46

I would like to load CSV data in Python and stream each row to Spark via Spark Streaming.

I'm pretty new to networking, so I'm not sure whether I'm supposed to create a Python server script that, once it establishes a connection (with Spark Streaming), starts sending each row. In the Spark Streaming documentation they run nc -lk 9999, which, if I'm correct, is a netcat server listening on port 9999. So I tried creating a similar Python script that parses a CSV file and sends each row on port 60000:

import socket                   # Import socket module
import csv

port = 60000                    # Reserve a port for your service.
s = socket.socket()             # Create a socket object
host = socket.gethostname()     # Get local machine name
s.bind((host, port))            # Bind to the port
s.listen(5)                     # Now wait for client connection.

print('Server listening....')

while True:
    conn, addr = s.accept()     # Establish connection with client.
    print('Got connection from', addr)
    csvfile = open('Titantic.csv', 'rb')
    reader = csv.reader(csvfile, delimiter = ',')
    for row in reader:
        line = ','.join(row)
        conn.send(line)
        print(line)
    csvfile.close()
    print('Done sending')
    conn.send('Thank you for connecting')
    conn.close()
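For what it's worth, PySpark documents socketTextStream as interpreting the received bytes as UTF-8 encoded, "\n"-delimited lines. The script above sends each row without a trailing newline, and under Python 3 conn.send() also needs bytes rather than str. It also binds to socket.gethostname(), which may resolve to an address other than the loopback one the Spark script connects to via "localhost". Below is a minimal Python 3 sketch that addresses those points, assuming the same file name (Titantic.csv) and port (60000) as the question; csv_rows_as_lines, send_csv and serve_forever are hypothetical helper names, not part of any library.

```python
import csv
import io
import socket


def csv_rows_as_lines(text_stream):
    """Yield each CSV row re-serialized as one UTF-8, newline-terminated line."""
    buf = io.StringIO()
    writer = csv.writer(buf, lineterminator="\n")
    for row in csv.reader(text_stream):
        buf.seek(0)
        buf.truncate(0)
        writer.writerow(row)  # re-quotes fields that contain commas
        yield buf.getvalue().encode("utf-8")


def send_csv(conn, text_stream):
    """Send every row to an already-connected socket; return the number of rows sent."""
    count = 0
    for line in csv_rows_as_lines(text_stream):
        conn.sendall(line)  # sendall keeps writing until the whole line is sent
        count += 1
    return count


def serve_forever(path, host="localhost", port=60000):
    """Accept clients one after another (roughly like nc -lk) and stream the file to each."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
        server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server.bind((host, port))  # "localhost" matches socketTextStream("localhost", 60000)
        server.listen(1)
        print("Server listening on", host, port)
        while True:
            conn, addr = server.accept()
            print("Got connection from", addr)
            with conn, open(path, newline="") as f:
                try:
                    print("Sent", send_csv(conn, f), "rows")
                except (BrokenPipeError, ConnectionResetError):
                    print("Client disconnected early")


if __name__ == "__main__":
    serve_forever("Titantic.csv")
```

Re-serializing with csv.writer (instead of ','.join(row)) keeps a quoted field such as a name containing a comma intact, although the Spark side still has to parse it as CSV rather than split on every comma.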

Spark Streaming script:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# sc is the SparkContext already available in the notebook
ssc = StreamingContext(sc, 1)  # 1-second batch interval

# Create a DStream that will connect to hostname:port, like localhost:9999
lines_RDD = ssc.socketTextStream("localhost", 60000)

# Split each line into comma-separated fields
data_RDD = lines_RDD.flatMap(lambda line: line.split(","))
data_RDD.pprint()

ssc.start()             # Start the computation
ssc.awaitTermination()  # Wait for the computation to terminate
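Note that flatMap flattens the fields of every row into one stream of values, so row boundaries are lost; map with the same function keeps one list of fields per row. The difference can be illustrated in plain Python without Spark. This is only an analogy for the DStream operations, and the two sample rows are hypothetical.

```python
from itertools import chain

# Hypothetical sample rows, standing in for lines received from the socket
lines = ["1,0,3,male", "2,1,1,female"]


def split_fields(line):
    return line.split(",")


# Analogue of lines_RDD.flatMap(split_fields): one flat sequence of fields
flat = list(chain.from_iterable(map(split_fields, lines)))

# Analogue of lines_RDD.map(split_fields): one list of fields per row
per_row = list(map(split_fields, lines))

print(flat)
print(per_row)
```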

When I run the Spark script (in a Jupyter notebook), I get this error: IllegalArgumentException: 'requirement failed: No output operations registered, so nothing to execute'

I don't think I'm writing my socket script properly, but I'm not really sure what to do. I'm basically trying to replicate what nc -lk 9999 does, so that I can send text data over the port while Spark Streaming listens on it, receives the data, and processes it.
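Before wiring in Spark, it may help to check what a client actually receives from the server. Since socketTextStream treats its input as "\n"-delimited UTF-8 lines, the sketch below reads a socket the same way in plain Python. read_lines is a hypothetical helper, and localhost:60000 is simply the port used in the question. It also shows why each row needs a trailing newline: rows sent without one run together into a single line.

```python
import socket


def read_lines(sock, limit=None):
    """Read "\n"-delimited UTF-8 lines from a connected socket.

    Stops at end of stream, or after `limit` lines if a limit is given.
    """
    lines = []
    with sock.makefile("r", encoding="utf-8", newline="\n") as f:
        for line in f:
            lines.append(line.rstrip("\n"))
            if limit is not None and len(lines) >= limit:
                break
    return lines


if __name__ == "__main__":
    # Manual check: connect to the CSV server instead of Spark (port assumed from the question)
    with socket.create_connection(("localhost", 60000)) as s:
        for line in read_lines(s, limit=5):
            print(line)
```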

Any help would be greatly appreciated

Answer

I'm trying to do something similar, but I want to stream one row every 10 seconds. I solved it with this script:

import socket
from time import sleep

host = 'localhost'
port = 12345

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind((host, port))
s.listen(1)

while True:
    print('\nListening for a client at', host, port)
    conn, addr = s.accept()
    print('\nConnected by', addr)
    try:
        print('\nReading file...\n')
        with open('iris_test.csv') as f:
            for line in f:
                out = line.encode('utf-8')  # each line keeps its trailing '\n'
                print('Sending line', line)
                conn.sendall(out)
                sleep(10)  # wait 10 seconds between rows
        print('End Of Stream.')
    except socket.error:
        print('Error Occurred.\n\nClient disconnected.\n')
    finally:
        conn.close()  # close this client's connection before accepting the next one
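The same idea can be factored so that the delay is a parameter and a disconnecting client does not stop the server. This is a minimal sketch: stream_lines and serve are hypothetical names, and the file name (iris_test.csv), port (12345) and 10-second delay come from the script above. It also appends a newline to a final line that lacks one, since the receiver splits on "\n".

```python
import socket
import time


def stream_lines(conn, lines, delay=10.0):
    """Send each line as newline-terminated UTF-8, pausing `delay` seconds after each.

    Returns the number of lines sent; stops quietly if the client disconnects.
    """
    sent = 0
    try:
        for line in lines:
            if not line.endswith("\n"):
                line += "\n"  # e.g. the last line of a file without a trailing newline
            conn.sendall(line.encode("utf-8"))
            sent += 1
            time.sleep(delay)
    except (BrokenPipeError, ConnectionResetError):
        print("Client disconnected.")
    return sent


def serve(path, host="localhost", port=12345, delay=10.0):
    """Accept clients one at a time and stream the file to each, one line per `delay`."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        s.bind((host, port))
        s.listen(1)
        while True:
            print("Listening for a client at", host, port)
            conn, addr = s.accept()
            print("Connected by", addr)
            with conn, open(path, encoding="utf-8") as f:
                print("Sent", stream_lines(conn, f, delay), "lines")


if __name__ == "__main__":
    serve("iris_test.csv")
```

Passing delay=0 makes the function easy to exercise without waiting, for example against one end of socket.socketpair().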

Hope this helps.

https://en.xdnf.cn/q/70549.html
