Python cassandra-driver OperationTimedOut on every query in a Celery task


I have a problem with every insert query (a tiny query) that is executed asynchronously in a Celery task. In sync mode the insert works fine, but when it is executed via apply_async() I get this:

OperationTimedOut('errors=errors=errors={}, last_host=***.***.*.***, last_host=None, last_host=None',)

Traceback:

Traceback (most recent call last):
  File "/var/nfs_www/***/env_v0/local/lib/python2.7/site-packages/celery/app/trace.py", line 240, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/var/nfs_www/***/env_v0/local/lib/python2.7/site-packages/celery/app/trace.py", line 437, in __protected_call__
    return self.run(*args, **kwargs)
  File "/var/nfs_www/***/www_v1/app/mods/news_feed/tasks.py", line 26, in send_new_comment_reply_notifications
    send_new_comment_reply_notifications_method(comment_id)
  File "/var/nfs_www/***/www_v1/app/mods/news_feed/methods.py", line 83, in send_new_comment_reply_notifications
    comment_type='comment_reply'
  File "/var/nfs_www/***/www_v1/app/mods/news_feed/models/storage.py", line 129, in add
    CommentsFeed(**kwargs).save()
  File "/var/nfs_www/***/env_v0/local/lib/python2.7/site-packages/cqlengine/models.py", line 531, in save
    consistency=self.__consistency__).save()
  File "/var/nfs_www/***/env_v0/local/lib/python2.7/site-packages/cqlengine/query.py", line 907, in save
    self._execute(insert)
  File "/var/nfs_www/***/env_v0/local/lib/python2.7/site-packages/cqlengine/query.py", line 786, in _execute
    tmp = execute(q, consistency_level=self._consistency)
  File "/var/nfs_www/***/env_v0/local/lib/python2.7/site-packages/cqlengine/connection.py", line 95, in execute
    result = session.execute(query, params)
  File "/var/nfs_www/***/env_v0/local/lib/python2.7/site-packages/cassandra/cluster.py", line 1103, in execute
    result = future.result(timeout)
  File "/var/nfs_www/***/env_v0/local/lib/python2.7/site-packages/cassandra/cluster.py", line 2475, in result
    raise OperationTimedOut(errors=self._errors, last_host=self._current_host)
OperationTimedOut: errors={}, last_host=***.***.*.***

Does anyone have any ideas about this problem?

I found this: when cassandra-driver was executing the query, it returned the OperationTimedOut error. But my query is very small, and the problem occurs only in Celery tasks.

UPDATE:

I made a test task and it raises this error too.

@celery.task()
def test_task_with_cassandra():
    from app import cassandra_session
    cassandra_session.execute('use news_feed')
    return 'Done'
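For reference, cassandra_session is a module-level session created when app is imported. A rough sketch of that setup (the contact point and keyspace below are placeholders, not the real config):

# app.py (sketch) -- the session is built at import time, in the process
# that imports the module; Celery's forked workers later inherit it.
from cassandra.cluster import Cluster

cluster = Cluster(['127.0.0.1'])
cassandra_session = cluster.connect('news_feed')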

UPDATE 2: I tried this:

@celery.task()
def test_task_with_cassandra():
    from cqlengine import connection
    connection.setup(app.config['CASSANDRA_SERVERS'],
                     port=app.config['CASSANDRA_PORT'],
                     default_keyspace='test_keyspace')
    from .models import Feed
    Feed.objects.count()
    return 'Done'

Got this:

NoHostAvailable('Unable to connect to any servers', {'***.***.*.***': OperationTimedOut('errors=errors=Timed out creating connection, last_host=None, last_host=None',)})

From the shell I can connect to it.
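(For example, a standalone check along these lines succeeds when run outside Celery; the contact point and port here are placeholders.)

# check_cassandra.py -- quick connectivity test run outside Celery
from cassandra.cluster import Cluster

cluster = Cluster(['127.0.0.1'], port=9042)
session = cluster.connect()
rows = session.execute("SELECT release_version FROM system.local")
print(list(rows))
cluster.shutdown()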

UPDATE 3: From a deleted thread on a GitHub issue (I found this in my emails); this worked for me too. "Here's how, in substance, I plug CQLengine into Celery:

from celery import Celery
from celery.signals import worker_process_init, beat_init
from cqlengine import connection
from cqlengine.connection import (
    cluster as cql_cluster, session as cql_session)


def cassandra_init(*args, **kwargs):
    """ Initialize a clean Cassandra connection. """
    if cql_cluster is not None:
        cql_cluster.shutdown()
    if cql_session is not None:
        cql_session.shutdown()
    connection.setup()


# Initialize worker context for both standard and periodic tasks.
worker_process_init.connect(cassandra_init)
beat_init.connect(cassandra_init)

app = Celery()

This is crude, but it works. Should we add this snippet to the FAQ?"
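For anyone on a newer driver where cqlengine ships inside the driver as cassandra.cqlengine, the same idea (rebuild the connection after each worker process forks) would look roughly like this sketch; the contact point and keyspace are placeholders:

from celery.signals import worker_process_init
from cassandra.cqlengine import connection


@worker_process_init.connect
def init_cassandra(**kwargs):
    """Build a fresh cqlengine connection in each forked worker process."""
    connection.setup(['127.0.0.1'], default_keyspace='test_keyspace')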

Answer

I had a similar issue. It seemed to be related to sharing the Cassandra session between tasks. I solved it by creating a session per thread. Make sure you call get_session() from your tasks, and then do this:

import threading

from cassandra.cluster import Cluster


thread_local = threading.local()


def get_session():
    if hasattr(thread_local, "cassandra_session"):
        return thread_local.cassandra_session
    cluster = Cluster(settings.CASSANDRA_HOSTS)
    session = cluster.connect(settings.CASSANDRA_KEYSPACE)
    thread_local.cassandra_session = session
    return session
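Hypothetical usage from a task, assuming the get_session() helper above (the task, table, and column names are just illustrative):

@celery.task()
def add_comment_feed(comment_id):
    # Each worker lazily builds its own session on first use.
    session = get_session()
    session.execute(
        "INSERT INTO comments_feed (comment_id) VALUES (%s)",
        (comment_id,),
    )
    return 'Done'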
