How can I set up Celery to call a custom worker initialization?

2024/10/12 9:28:53

I am quite new to Celery and I have been trying to setup a project with 2 separate queues (one to calculate and the other to execute). So far, so good.

My problem is that the workers in the execute queue need to instantiate a class with a unique object_id (one id per worker). I was wondering if I could write a custom worker initialization to initialize the object at start and keep it in memory until the worker is killed.

I found a similar question on custom_task but the proposed solution does not work in my case.

Considering the following toy example:

celery.py

from celery import Celeryapp = Celery('proj',broker='amqp://guest@localhost//',backend='amqp://',include=['proj.tasks'])app.conf.update(CELERY_TASK_RESULT_EXPIRES=60,CELERY_ROUTES = {"proj.tasks.add1": {"queue": "q1"}},
)if __name__ == '__main__':app.start()

tasks.py

from proj.celery import app
from celery.signals import worker_init@worker_init.connect(sender='worker1@hostname')
def configure_worker1(*args, **kwargs):#SETUP id=1 for add1 here???@worker_init.connect(sender='worker2@hostname')
def configure_worker2(*args, **kwargs):#SETUP id=2 for add1 here???@app.task
def add1(y):return id + y@app.task
def add(x, y):return x + y

initializing:

celery multi start worker1 -A proj -l info -Q q1
celery multi start worker2 -A proj -l info -Q q1
celery multi start worker3 -A proj -l info

Is this the right approach? If so, what should I write in the configure_worker1 function in tasks.py to setup id at the worker initialization?

Thanks

Answer

I found out the answer by following this http://docs.celeryproject.org/en/latest/userguide/tasks.html#instantiation

The tasks.py looks like this:

from proj.celery import app
from celery import Taskclass Task1(Task):def __init__(self):self._x = 1.0class Task2(Task):def __init__(self):self._x = 2.0@app.task(base=Task1)
def add1(y):return add1._x + y@app.task(base=Task2)
def add2(y):return add2._x + y

initializing as before:

celery multi start worker1 -A proj -l info -Q q1
celery multi start worker2 -A proj -l info -Q q1
celery multi start worker3 -A proj -l info
https://en.xdnf.cn/q/69671.html

Related Q&A

Why does print(__name__) give builtins?

Im using pycharm.2017.1.2. I installed anaconda2 with py3 environment. in Pycharm, Im using Python3 interpreter, and the code is simply:print(__name__)In Python console in Pycharm, it prints builtins.I…

List Comprehensions and Conditions?

I am trying to see if I can make this code better using list comprehensions. Lets say that I have the following lists:a_list = [HELLO,FOO,FO1BAR,ROOBAR,SHOEBAR]regex_list = [lambda x: re.search(rFOO,…

Python in operator time complexity on range()

I have the following function:def foo(length, num):return num in range(length)Whats the time complexity of this function? Noting that range() creates a Range object on Python 3, will the time complexi…

Pandas read data from a secure FTP server in Python 3

I am looking for a neat solution to read data (using either read_csv or read_sas) to a Pandas Dataframe from a secure FTP server in Python 3. All the examples I can find are many lines and some for Pyt…

How to read XML header in Python

How can I read the header of an XML document in Python 3?Ideally, I would use the defusedxml module as the documentation states that its safer, but at this point (after hours of trying to figure this …

Shift interpolation does not give expected behaviour

When using scipy.ndimage.interpolation.shift to shift a numpy data array along one axis with periodic boundary treatment (mode = wrap), I get an unexpected behavior. The routine tries to force the firs…

HEX decoding in Python 3.2

In Python 2.x Im able to do this:>>> 4f6c6567.decode(hex_codec) OlegBut in Python 3.2 I encounter this error:>>> b4f6c6567.decode(hex_codec) Traceback (most recent call last):File &qu…

How do I access session data in Jinja2 templates (Bottle framework on app engine)?

Im running the micro framework Bottle on Google App Engine. Im using Jinja2 for my templates. And Im using Beaker to handle the sessions. Im still a pretty big Python newbie and am pretty stoked I g…

What is a dimensional range of [-1,0] in Pytorch?

So Im struggling to understand some terminology about collections in Pytorch. I keep running into the same kinds of errors about the range of my tensors being incorrect, and when I try to Google for a …

Pyinstaller executable keeps opening

Background I am working on face recognition following this link and I would like to build a standalone application using Python. My main.py script looks like the following. # main.py# Import packages a…