I have a problem with every insert query (even a very small one) that is executed asynchronously in Celery tasks. In synchronous mode the inserts work fine, but when the task is executed via apply_async() I get this:
OperationTimedOut('errors=errors=errors={}, last_host=***.***.*.***, last_host=None, last_host=None',)
Traceback:
Traceback (most recent call last):
  File "/var/nfs_www/***/env_v0/local/lib/python2.7/site-packages/celery/app/trace.py", line 240, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/var/nfs_www/***/env_v0/local/lib/python2.7/site-packages/celery/app/trace.py", line 437, in __protected_call__
    return self.run(*args, **kwargs)
  File "/var/nfs_www/***/www_v1/app/mods/news_feed/tasks.py", line 26, in send_new_comment_reply_notifications
    send_new_comment_reply_notifications_method(comment_id)
  File "/var/nfs_www/***www_v1/app/mods/news_feed/methods.py", line 83, in send_new_comment_reply_notifications
    comment_type='comment_reply'
  File "/var/nfs_www/***/www_v1/app/mods/news_feed/models/storage.py", line 129, in add
    CommentsFeed(**kwargs).save()
  File "/var/nfs_www/***/env_v0/local/lib/python2.7/site-packages/cqlengine/models.py", line 531, in save
    consistency=self.__consistency__).save()
  File "/var/nfs_www/***/env_v0/local/lib/python2.7/site-packages/cqlengine/query.py", line 907, in save
    self._execute(insert)
  File "/var/nfs_www/***/env_v0/local/lib/python2.7/site-packages/cqlengine/query.py", line 786, in _execute
    tmp = execute(q, consistency_level=self._consistency)
  File "/var/nfs_www/***/env_v0/local/lib/python2.7/site-packages/cqlengine/connection.py", line 95, in execute
    result = session.execute(query, params)
  File "/var/nfs_www/***/env_v0/local/lib/python2.7/site-packages/cassandra/cluster.py", line 1103, in execute
    result = future.result(timeout)
  File "/var/nfs_www/***/env_v0/local/lib/python2.7/site-packages/cassandra/cluster.py", line 2475, in result
    raise OperationTimedOut(errors=self._errors, last_host=self._current_host)
OperationTimedOut: errors={}, last_host=***.***.*.***
Does anyone have an idea what the problem might be?
I found the question "When cassandra-driver was executing the query, cassandra-driver returned error OperationTimedOut", but my query is very small and the problem occurs only in Celery tasks.
UPDATE:
I made a test task and it raises this error too.
@celery.task()
def test_task_with_cassandra():
    from app import cassandra_session
    cassandra_session.execute('use news_feed')
    return 'Done'
UPDATE 2: Made this:
@celery.task()
def test_task_with_cassandra():
    from cqlengine import connection
    connection.setup(app.config['CASSANDRA_SERVERS'],
                     port=app.config['CASSANDRA_PORT'],
                     default_keyspace='test_keyspace')
    from .models import Feed
    Feed.objects.count()
    return 'Done'
Got this:
NoHostAvailable('Unable to connect to any servers', {'***.***.*.***': OperationTimedOut('errors=errors=Timed out creating connection, last_host=None, last_host=None',)})
From the shell I can connect to it.
UPDATE 3: From a deleted thread on a GitHub issue (I found this in my emails, and it worked for me too): Here's how, in substance, I plug CQLengine into Celery:
from celery import Celery
from celery.signals import worker_process_init, beat_init
from cqlengine import connection
from cqlengine.connection import (cluster as cql_cluster, session as cql_session)

def cassandra_init(**kwargs):
    """ Initialize a clean Cassandra connection. """
    # Celery passes keyword arguments (signal, sender, ...) to signal handlers,
    # so the handler has to accept **kwargs.
    if cql_cluster is not None:
        cql_cluster.shutdown()
    if cql_session is not None:
        cql_session.shutdown()
    connection.setup()

# Initialize worker context for both standard and periodic tasks.
worker_process_init.connect(cassandra_init)
beat_init.connect(cassandra_init)

app = Celery()
This is crude, but it works. Should we add this snippet to the FAQ?
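For anyone wondering why this helps: as far as I understand it, Celery's default prefork pool forks its worker processes, and a Cassandra session opened in the parent (for example at import time) is not usable in the forked children, which would explain the timeouts. The signal handler above rebuilds the connection in each worker. The same idea can be sketched without Celery or cqlengine at all, using only the process ID to detect a fork. Note that `PerProcessResource` and `open_session` are hypothetical names made up for this illustration, not part of any library, and `open_session` is only a stand-in for something like `connection.setup(...)`.

```python
import os


class PerProcessResource(object):
    """Lazily create a resource once per process and rebuild it after a fork."""

    def __init__(self, factory):
        self._factory = factory    # callable that opens a fresh connection
        self._resource = None
        self._owner_pid = None     # PID of the process that created _resource

    def get(self):
        pid = os.getpid()
        if self._resource is None or self._owner_pid != pid:
            # Nothing created yet, or we are in a forked child that inherited
            # the parent's object: build a fresh one for this process.
            self._resource = self._factory()
            self._owner_pid = pid
        return self._resource


def open_session():
    # Hypothetical stand-in for opening a real Cassandra session.
    return {"opened_in_pid": os.getpid()}


session_holder = PerProcessResource(open_session)
```

Inside a task you would then call `session_holder.get()` instead of importing a session object that was created when the module was first loaded. This sketch does not shut down the inherited object the way the snippet above does; it only avoids reusing it.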