Python on AWS Lambda does not support multiprocessing.Pool.map()
, as documented in this other question. Please note that the other question was asking why it doesn't work. This question is different, I'm asking how to emulate the functionality given the lack of underlying support.
One of the answers to that other question gave us this code:
# Python 3.6
from multiprocessing import Pipe, Processdef myWorkFunc(data, connection):result = None# Do some work and store it in resultif result:connection.send([result])else:connection.send([None])def myPipedMultiProcessFunc():# Get number of available logical coresplimit = multiprocessing.cpu_count()# Setup management variablesresults = []parent_conns = []processes = []pcount = 0pactive = []i = 0for data in iterable:# Create the pipe for parent-child process communicationparent_conn, child_conn = Pipe()# create the process, pass data to be operated on and connectionprocess = Process(target=myWorkFunc, args=(data, child_conn,))parent_conns.append(parent_conn)process.start()pcount += 1if pcount == plimit: # There is not currently room for another process# Wait until there are results in the PipesfinishedConns = multiprocessing.connection.wait(parent_conns)# Collect the results and remove the connection as processing# the connection again will lead to errorsfor conn in finishedConns:results.append(conn.recv()[0])parent_conns.remove(conn)# Decrement pcount so we can add a new processpcount -= 1# Ensure all remaining active processes have their results collectedfor conn in parent_conns:results.append(conn.recv()[0])conn.close()# Process results as needed
Can this sample code be modified to support multiprocessing.Pool.map()
?
What have I tried so far
I analysed the above code and I do not see a parameter for the function to be executed or the data, so I'm inferring that it does not perform the same function as multiprocessing.Pool.map()
. It is not clear what the code does, other than demonstrating the building blocks that could be assembled into a solution.
Is this a "write my code for me" question?
Yes to some extent, it is. This issue impacts thousands of Python developers, and it would be far more efficient for the world economy, less green-house gas emissions, etc if all of us share the same code, instead of forcing every SO user who encounters this to go and develop their own workaround. I hope I've done my part by distilling this into a clear question with the presumed building blocks ready to go.
I was able to get this working for my own tests.
I've based my code on this link : https://aws.amazon.com/blogs/compute/parallel-processing-in-python-with-aws-lambda/
NB1: you MUST increase memory allocation to the lambda function. with the default minimal amount, there's no increase in performance with multiprocessing. With the maximum my account can allocate (3008MB) the figures below were attained.
NB2: I'm completely ignoring max processes in parallel here. My usage doesn't have a whole lot of elements to work on.
with the code below, usage is:
work = funcmap(yourfunction,listofstufftoworkon)
yourresults = work.run()
running from my laptop:
jumper@jumperdebian[3333] ~/scripts/tmp 2019-09-04 11:52:30
└─ $ ∙ python3 -c "import tst; tst.lambda_handler(None,None)"
results : [(35, 9227465), (35, 9227465), (35, 9227465), (35, 9227465)]
SP runtime : 9.574460506439209
results : [(35, 9227465), (35, 9227465), (35, 9227465), (35, 9227465)]
MP runtime : 6.422513484954834
running from aws:
Function Logs:
START RequestId: 075a92c0-7c4f-4f48-9820-f394ee899a97 Version: $LATEST
results : [(35, 9227465), (35, 9227465), (35, 9227465), (35, 9227465)]
SP runtime : 12.135798215866089
results : [(35, 9227465), (35, 9227465), (35, 9227465), (35, 9227465)]
MP runtime : 7.293526887893677
END RequestId: 075a92c0-7c4f-4f48-9820-f394ee899a97
Here's the test code:
import time
from multiprocessing import Process, Pipe
import boto3class funcmap(object):fmfunction=Nonefmlist=Nonedef __init__(self,pfunction,plist):self.fmfunction=pfunctionself.fmlist=plistdef calculation(self, pfunction, pload, conn):panswer=pfunction(pload)conn.send([pload,panswer])conn.close()def run(self):datalist = self.fmlistprocesses = []parent_connections = []for datum in datalist:parent_conn, child_conn = Pipe()parent_connections.append(parent_conn)process = Process(target=self.calculation, args=(self.fmfunction, datum, child_conn,))processes.append(process)pstart=time.time()for process in processes:process.start()#print("starting at t+ {} s".format(time.time()-pstart))for process in processes:process.join()#print("joining at t+ {} s".format(time.time()-pstart))results = []for parent_connection in parent_connections:resp=parent_connection.recv()results.append((resp[0],resp[1]))return resultsdef fibo(n):if n <= 2 : return 1return fibo(n-1)+fibo(n-2)def lambda_handler(event, context):#worklist=[22,23,24,25,26,27,28,29,30,31,32,31,30,29,28,27,26,27,28,29]#worklist=[22,23,24,25,26,27,28,29,30]worklist=[30,30,30,30]#worklist=[30]_start = time.time()results=[]for a in worklist:results.append((a,fibo(a)))print("results : {}".format(results))_end = time.time()print("SP runtime : {}".format(_end-_start))_mstart = time.time()work = funcmap(fibo,worklist)results = work.run()print("results : {}".format(results))_mend = time.time()print("MP runtime : {}".format(_mend-_mstart))
hope it helps.