Nonblocking Scrapy pipeline to database

2024/10/3 19:15:48

I have a web scraper in Scrapy that gets data items. I want to asynchronously insert them into a database as well.

For example, I have a transaction that inserts some items into my db using SQLAlchemy Core:

def process_item(self, item, spider):
    with self.connection.begin() as conn:
        conn.execute(insert(table1).values(item['part1']))
        conn.execute(insert(table2).values(item['part2']))

I understand that it's possible to use SQLAlchemy Core asynchronously on top of Twisted by using alchimia. The code example from alchimia's documentation is below.

What I don't understand is how I can use my code above within the alchimia framework. How can I set up process_item to use a reactor?

Can I do something like this?

@inlineCallbacks
def process_item(self, item, spider):
    with self.connection.begin() as conn:
        yield conn.execute(insert(table1).values(item['part1']))
        yield conn.execute(insert(table2).values(item['part2']))

How do I write the reactor part?

Or is there an easier way to do nonblocking database insertions in a Scrapy pipeline?


For reference, here is the code example from alchimia's documentation:

from alchimia import TWISTED_STRATEGY
from sqlalchemy import (
    create_engine, MetaData, Table, Column, Integer, String
)
from sqlalchemy.schema import CreateTable
from twisted.internet.defer import inlineCallbacks
from twisted.internet.task import react


@inlineCallbacks
def main(reactor):
    engine = create_engine(
        "sqlite://", reactor=reactor, strategy=TWISTED_STRATEGY
    )
    metadata = MetaData()
    users = Table(
        "users", metadata,
        Column("id", Integer(), primary_key=True),
        Column("name", String()),
    )

    # Create the table
    yield engine.execute(CreateTable(users))

    # Insert some users
    yield engine.execute(users.insert().values(name="Jeremy Goodwin"))
    yield engine.execute(users.insert().values(name="Natalie Hurley"))
    yield engine.execute(users.insert().values(name="Dan Rydell"))
    yield engine.execute(users.insert().values(name="Casey McCall"))
    yield engine.execute(users.insert().values(name="Dana Whitaker"))

    result = yield engine.execute(users.select(users.c.name.startswith("D")))
    d_users = yield result.fetchall()

    # Print out the users
    for user in d_users:
        print("Username: %s" % user[users.c.name])


if __name__ == "__main__":
    react(main, [])

Answer

How can I set up process_item to use a reactor?

You don't need to manage another reactor in your pipeline: Scrapy already runs on Twisted's reactor.
Instead, you can do asynchronous database work within an item pipeline by returning a Deferred from process_item.

See also the Scrapy documentation on item pipelines, which includes sample code that performs an asynchronous operation by returning a Deferred.
