Connection is closed when a SQLAlchemy event triggers a Celery task

2024/10/2 19:47:10

When one of my unit tests deletes a SQLAlchemy object, the object triggers an after_delete event which triggers a Celery task to delete a file from the drive.

The task is CELERY_ALWAYS_EAGER = True when testing.

gist to reproduce the issue easily

The example has two tests. One triggers the task in the event, the other outside the event. Only the one in the event closes the connection.

To quickly reproduce the error you can run:

git clone https://gist.github.com/5762792fc1d628843697.git
cd 5762792fc1d628843697
virtualenv venv
. venv/bin/activate
pip install -r requirements.txt
python test.py

The stack:

$     python test.py
E
======================================================================
ERROR: test_delete_task (__main__.CeleryTestCase)
----------------------------------------------------------------------
Traceback (most recent call last):File "test.py", line 73, in test_delete_taskdb.session.commit()File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/scoping.py", line 150, in doreturn getattr(self.registry(), name)(*args, **kwargs)File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 776, in commitself.transaction.commit()File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 377, in commitself._prepare_impl()File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 357, in _prepare_implself.session.flush()File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 1919, in flushself._flush(objects)File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 2037, in _flushtransaction.rollback(_capture_exception=True)File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/util/langhelpers.py", line 63, in __exit__compat.reraise(type_, value, traceback)File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 2037, in _flushtransaction.rollback(_capture_exception=True)File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 393, in rollbackself._assert_active(prepared_ok=True, rollback_ok=True)File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 223, in _assert_activeraise sa_exc.ResourceClosedError(closed_msg)
ResourceClosedError: This transaction is closed----------------------------------------------------------------------
Ran 1 test in 0.014sFAILED (errors=1)
Answer

I think I found the problem - it's in how you set up your Celery task. If you remove the app context call from your celery setup, everything runs fine:

class ContextTask(TaskBase):abstract = Truedef __call__(self, *args, **kwargs):# deleted --> with app.app_context():return TaskBase.__call__(self, *args, **kwargs)

There's a big warning in the SQLAlchemy docs about never modifying the session during after_delete events: http://docs.sqlalchemy.org/en/latest/orm/events.html#sqlalchemy.orm.events.MapperEvents.after_delete

So I suspect the with app.app_context(): is being called during the delete, trying to attach to and/or modify the session that Flask-SQLAlchemy stores in the app object, and therefore the whole thing is bombing.

Flask-SQlAlchemy does a lot of magic behind the scenes for you, but you can bypass this and use SQLAlchemy directly. If you need to talk to the database during the delete event, you can create a new session to the db:

@celery.task()
def my_task():# obviously here I create a new objectsession = db.create_scoped_session()session.add(User(id=13, value="random string"))session.commit()return

But it sounds like you don't need this, you're just trying to delete an image path. In that case, I would just change your task so it takes a path:

# instance will call the task
@event.listens_for(User, "after_delete")
def after_delete(mapper, connection, target):my_task.delay(target.value)@celery.task()
def my_task(image_path):os.remove(image_path) 

Hopefully that's helpful - let me know if any of that doesn't work for you. Thanks for the very detailed setup, it really helped in debugging.

https://en.xdnf.cn/q/70694.html

Related Q&A

Python escape sequence \N{name} not working as per definition

I am trying to print unicode characters given their name as follows:# -*- coding: utf-8 -*- print "\N{SOLIDUS}" print "\N{BLACK SPADE SUIT}"However the output I get is not very enco…

Binary integer programming with PULP using vector syntax for variables?

New to the python library PULP and Im finding the documentation somewhat unhelpful, as it does not include examples using lists of variables. Ive tried to create an absolutely minimalist example below …

Nonblocking Scrapy pipeline to database

I have a web scraper in Scrapy that gets data items. I want to asynchronously insert them into a database as well. For example, I have a transaction that inserts some items into my db using SQLAlchemy …

python function to return javascript date.getTime()

Im attempting to create a simple python function which will return the same value as javascript new Date().getTime() method. As written here, javascript getTime() method returns number of milliseconds …

Pulling MS access tables and putting them in data frames in python

I have tried many different things to pull the data from Access and put it into a neat data frame. right now my code looks like this.from pandas import DataFrame import numpy as npimport pyodbc from sq…

Infinite loop while adding two integers using bitwise operations?

I am trying to solve a problem, using python code, which requires me to add two integers without the use of + or - operators. I have the following code which works perfectly for two positive numbers: d…

When is pygame.init() needed?

I am studying pygame and in the vast majority of tutorials it is said that one should run pygame.init() before doing anything. I was doing one particular tutorial and typing out the code as one does an…

mypy overrides in toml are ignored?

The following is a simplified version of the toml file example from the mypy documentation: [tool.mypy] python_version = "3.7" warn_return_any = true warn_unused_configs = true[[tool.mypy.ove…

/usr/bin/env: python2.6: No such file or directory error

I have python2.6, python2.7 and python3 in my /usr/lib/I am trying to run a file which has line given below in it as its first line#!/usr/bin/env python2.6after trying to run it it gives me following e…

pandas, dataframe, groupby, std

New to pandas here. A (trivial) problem: hosts, operations, execution times. I want to group by host, then by host+operation, calculate std deviation for execution time per host, then by host+operation…