How to emulate multiprocessing.Pool.map() in AWS Lambda?

2024/10/13 10:29:19

Python on AWS Lambda does not support multiprocessing.Pool.map(), as documented in this other question. Please note that the other question was asking why it doesn't work. This question is different, I'm asking how to emulate the functionality given the lack of underlying support.

One of the answers to that other question gave us this code:

# Python 3.6
from multiprocessing import Pipe, Processdef myWorkFunc(data, connection):result = None# Do some work and store it in resultif result:connection.send([result])else:connection.send([None])def myPipedMultiProcessFunc():# Get number of available logical coresplimit = multiprocessing.cpu_count()# Setup management variablesresults = []parent_conns = []processes = []pcount = 0pactive = []i = 0for data in iterable:# Create the pipe for parent-child process communicationparent_conn, child_conn = Pipe()# create the process, pass data to be operated on and connectionprocess = Process(target=myWorkFunc, args=(data, child_conn,))parent_conns.append(parent_conn)process.start()pcount += 1if pcount == plimit: # There is not currently room for another process# Wait until there are results in the PipesfinishedConns = multiprocessing.connection.wait(parent_conns)# Collect the results and remove the connection as processing# the connection again will lead to errorsfor conn in finishedConns:results.append(conn.recv()[0])parent_conns.remove(conn)# Decrement pcount so we can add a new processpcount -= 1# Ensure all remaining active processes have their results collectedfor conn in parent_conns:results.append(conn.recv()[0])conn.close()# Process results as needed

Can this sample code be modified to support multiprocessing.Pool.map()?

What have I tried so far

I analysed the above code and I do not see a parameter for the function to be executed or the data, so I'm inferring that it does not perform the same function as multiprocessing.Pool.map(). It is not clear what the code does, other than demonstrating the building blocks that could be assembled into a solution.

Is this a "write my code for me" question?

Yes to some extent, it is. This issue impacts thousands of Python developers, and it would be far more efficient for the world economy, less green-house gas emissions, etc if all of us share the same code, instead of forcing every SO user who encounters this to go and develop their own workaround. I hope I've done my part by distilling this into a clear question with the presumed building blocks ready to go.

Answer

I was able to get this working for my own tests. I've based my code on this link : https://aws.amazon.com/blogs/compute/parallel-processing-in-python-with-aws-lambda/

NB1: you MUST increase memory allocation to the lambda function. with the default minimal amount, there's no increase in performance with multiprocessing. With the maximum my account can allocate (3008MB) the figures below were attained.

NB2: I'm completely ignoring max processes in parallel here. My usage doesn't have a whole lot of elements to work on.

with the code below, usage is:

work = funcmap(yourfunction,listofstufftoworkon)
yourresults = work.run()

running from my laptop:

jumper@jumperdebian[3333] ~/scripts/tmp  2019-09-04 11:52:30
└─ $ ∙ python3 -c "import tst; tst.lambda_handler(None,None)"
results : [(35, 9227465), (35, 9227465), (35, 9227465), (35, 9227465)]
SP runtime : 9.574460506439209
results : [(35, 9227465), (35, 9227465), (35, 9227465), (35, 9227465)]
MP runtime : 6.422513484954834

running from aws:

Function Logs:
START RequestId: 075a92c0-7c4f-4f48-9820-f394ee899a97 Version: $LATEST
results : [(35, 9227465), (35, 9227465), (35, 9227465), (35, 9227465)]
SP runtime : 12.135798215866089
results : [(35, 9227465), (35, 9227465), (35, 9227465), (35, 9227465)]
MP runtime : 7.293526887893677
END RequestId: 075a92c0-7c4f-4f48-9820-f394ee899a97

Here's the test code:

import time
from multiprocessing import Process, Pipe
import boto3class funcmap(object):fmfunction=Nonefmlist=Nonedef __init__(self,pfunction,plist):self.fmfunction=pfunctionself.fmlist=plistdef calculation(self, pfunction, pload, conn):panswer=pfunction(pload)conn.send([pload,panswer])conn.close()def run(self):datalist = self.fmlistprocesses = []parent_connections = []for datum in datalist:parent_conn, child_conn = Pipe()parent_connections.append(parent_conn)process = Process(target=self.calculation, args=(self.fmfunction, datum, child_conn,))processes.append(process)pstart=time.time()for process in processes:process.start()#print("starting at t+ {} s".format(time.time()-pstart))for process in processes:process.join()#print("joining at t+ {} s".format(time.time()-pstart))results = []for parent_connection in parent_connections:resp=parent_connection.recv()results.append((resp[0],resp[1]))return resultsdef fibo(n):if n <= 2 : return 1return fibo(n-1)+fibo(n-2)def lambda_handler(event, context):#worklist=[22,23,24,25,26,27,28,29,30,31,32,31,30,29,28,27,26,27,28,29]#worklist=[22,23,24,25,26,27,28,29,30]worklist=[30,30,30,30]#worklist=[30]_start = time.time()results=[]for a in worklist:results.append((a,fibo(a)))print("results : {}".format(results))_end = time.time()print("SP runtime : {}".format(_end-_start))_mstart = time.time()work = funcmap(fibo,worklist)results = work.run()print("results : {}".format(results))_mend = time.time()print("MP runtime : {}".format(_mend-_mstart))

hope it helps.

https://en.xdnf.cn/q/69545.html

Related Q&A

Tkinter overrideredirect no longer receiving event bindings

I have a tinter Toplevel window that I want to come up without a frame or a titlebar and slightly transparent, and then solid when the mouse moves over the window. To do this I am using both Toplevel.…

Reusing Tensorflow session in multiple threads causes crash

Background: I have some complex reinforcement learning algorithm that I want to run in multiple threads. ProblemWhen trying to call sess.run in a thread I get the following error message:RuntimeError: …

Conditional column arithmetic in pandas dataframe

I have a pandas dataframe with the following structure:import numpy as np import pandas as pd myData = pd.DataFrame({x: [1.2,2.4,5.3,2.3,4.1], y: [6.7,7.5,8.1,5.3,8.3], condition:[1,1,np.nan,np.nan,1],…

Need some assistance with Python threading/queue

import threading import Queue import urllib2 import timeclass ThreadURL(threading.Thread):def __init__(self, queue):threading.Thread.__init__(self)self.queue = queuedef run(self):while True:host = self…

Python redirect (with delay)

So I have this python page running on flask. It works fine until I want to have a redirect. @app.route("/last_visit") def check_last_watered():templateData = template(text = water.get_last_wa…

Python Selenium. How to use driver.set_page_load_timeout() properly?

from selenium import webdriverdriver = webdriver.Chrome() driver.set_page_load_timeout(7)def urlOpen(url):try:driver.get(url)print driver.current_urlexcept:returnThen I have URL lists and call above me…

Editing both sides of M2M in Admin Page

First Ill lay out what Im trying to achieve in case theres a different way to go about it!I want to be able to edit both sides of an M2M relationship (preferably on the admin page although if needs be …

unstacking shift data (start and end time) into hourly data

I have a df as follows which shows when a person started a shift, ended a shift, the amount of hours and the date worked. Business_Date Number PayTimeStart PayTimeEnd Hours 0 2019-05-24 1…

Tensorflow model prediction is slow

I have a TensorFlow model with a single Dense layer: model = tf.keras.Sequential([tf.keras.layers.Dense(2)]) model.build(input_shape=(None, None, 25))I construct a single input vector in float32: np_ve…

Pandas Sqlite query using variable

With sqlite3 in Python if I want to make a db query using a variable instead of a fixed command I can do something like this :name = MSFTc.execute(INSERT INTO Symbol VALUES (?) , (name,))And when I tr…