Celery design help: how to prevent concurrently executing tasks

2024/10/6 12:25:36

I'm fairly new to Celery/AMQP and am trying to come up with a task/queue/worker design to meet the following requirements.

I have multiple types of "per-user" tasks: e.g., TaskA, TaskB, TaskC. Each of these "per-user" tasks read/write data for one particular user in the system. So at any given time, I might need to create tasks User1_TaskA, User1_TaskB, User1_TaskC, User2_TaskA, User2_TaskB, etc. I need to ensure that, for each user, no two tasks of any task type execute concurrently. I want a system in which no worker can execute User1_TaskA at the same time as any other worker is executing User1_TaskB or User1_TaskC, but while User1_TaskA is executing, other workers shouldn't be blocked from concurrently executing User2_TaskA, User3_TaskA, etc.

I realize this could be implemented using some sort of external locking mechanism (e.g., in the DB), but I'm hoping there's a more elegant task/queue/worker design that would work.

I suppose one possible solution is to implement queues as user buckets such that, when the workers are launched there's config that specifies how many buckets to create, and each "bucket worker" is bound to exactly one bucket. Then an "intermediate worker" would pull off tasks from the main task queue and assign them into the bucketed queues via, say, a hash/mod scheme. So UserA's tasks would always end up in the same queue, and multiple tasks for UserA would back up behind each other. I don't love this approach, as it would require the number of buckets to be defined ahead of time, and would seem to prevent (easily) adding workers dynamically. Seems to me there's got to be a better way -- suggestions would be greatly appreciated.

Answer

What's so bad in using an external locking mechanism? It's simple, straightforward, and efficient enough. You can find an example of distributed task locking in Celery here. Extend it by creating a lock per user, and you're done!

https://en.xdnf.cn/q/70364.html

Related Q&A

Google App Engine - Using Search API Python with list fields

Im using ndb.Model. The Search API has the following field classes:TextField : plain textHtmlField : HTML formatted textAtomField : a string which is treated as a single tokenNumberField : a numeric v…

Handling PyMySql exceptions - Best Practices

My question regards exception best practices. Ill present my question on a specific case with PyMySQL but it regards errors handling in general. I am using PyMySQL and out of the many possible exceptio…

Beautifulsoup find element by text using `find_all` no matter if there are elements in it

For examplebs = BeautifulSoup("<html><a>sometext</a></html>") print bs.find_all("a",text=re.compile(r"some"))returns [<a>sometext</a>] …

Python: how to get values from a dictionary from pandas series

I am very new to python and trying to get value from dictionary where keys are defined in a dataframe column (pandas). I searched quite a bit and the closest thing is a question in the link below, but…

Django No Module Named URLs error

There are many similar questions posted already, but Ive already tried those solutions to no avail. Im working through a basic Django tutorial, and here is my code:urls.pyfrom django.conf.urls import …

Matplotlib artist to stay same size when zoomed in but ALSO move with panning?

This is a very direct follow-up on this question.Using matplotlib, Id like to be able to place a sort of "highlighting bar" over a range of data markers that I know will all be in a straight …

How to invoke Lambda function with Event Invocation Type via API Gateway?

Docs says:By default, the Invoke API assumes RequestResponse invocation type. You can optionally request asynchronous execution by specifying Event as the InvocationType. So all I can send to my functi…

How do I unlock the app engine database when localhost runs?

Right now I get a blank page when localhost runs, but the deployed app is fine. The logs show the "database is locked". How do I "unlock" the database for localhost?

PyCrypto: Generate RSA key protected with DES3 password

I have been able to create a RSA key protected by password with DES3 (well... I think because Im very new to this encryption world) by using the command:openssl genrsa -out "/tmp/myKey.pem" -…

Normalize/Standardize a numpy recarray

I wonder what the best way of normalizing/standardizing a numpy recarray is. To make it clear, Im not talking about a mathematical matrix, but a record array that also has e.g. textual columns (such as…