Combining semaphore and time limiting in python-trio with asks http request

2024/10/14 13:23:57

I'm trying to use Python asynchronously to speed up my requests to a server. The server has a slow response time (often several seconds, but sometimes under a second), but it works well in parallel. I have no access to this server and can't change anything about it. I have a big list of URLs (pages in the code below) that I know beforehand, and I want to speed up loading them by making NO_TASKS=5 requests at a time. On the other hand, I don't want to overload the server, so I want a minimum pause of 1 second between the starts of consecutive requests (i.e. a limit of 1 request per second).

So far I have successfully implemented the semaphore part (five requests at a time) using a Trio queue.

import asks
import time
import trio

NO_TASKS = 5

asks.init('trio')
asks_session = asks.Session()
queue = trio.Queue(NO_TASKS)
next_request_at = 0
results = []

pages = [
    'https://www.yahoo.com/',
    'http://www.cnn.com',
    'http://www.python.org',
    'http://www.jython.org',
    'http://www.pypy.org',
    'http://www.perl.org',
    'http://www.cisco.com',
    'http://www.facebook.com',
    'http://www.twitter.com',
    'http://www.macrumors.com/',
    'http://arstechnica.com/',
    'http://www.reuters.com/',
    'http://abcnews.go.com/',
    'http://www.cnbc.com/',
]


async def async_load_page(url):
    global next_request_at
    sleep = next_request_at
    next_request_at = max(trio.current_time() + 1, next_request_at)
    await trio.sleep_until(sleep)
    next_request_at = max(trio.current_time() + 1, next_request_at)
    print('start loading page {} at {} seconds'.format(url, trio.current_time()))
    req = await asks_session.get(url)
    results.append(req.text)


async def producer(url):
    await queue.put(url)


async def consumer():
    while True:
        if queue.empty():
            print('queue empty')
            return
        url = await queue.get()
        await async_load_page(url)


async def main():
    async with trio.open_nursery() as nursery:
        for page in pages:
            nursery.start_soon(producer, page)
        await trio.sleep(0.2)
        for _ in range(NO_TASKS):
            nursery.start_soon(consumer)


start = time.time()
trio.run(main)

However, I'm missing the rate-limiting part, i.e. at most 1 request per second. My attempt is in the first five lines of async_load_page above, but as you can see when you run the code, it does not work:

start loading page http://www.reuters.com/ at 58097.12261669573 seconds
start loading page http://www.python.org at 58098.12367392373 seconds
start loading page http://www.pypy.org at 58098.12380622773 seconds
start loading page http://www.macrumors.com/ at 58098.12389389973 seconds
start loading page http://www.cisco.com at 58098.12397854373 seconds
start loading page http://arstechnica.com/ at 58098.12405119873 seconds
start loading page http://www.facebook.com at 58099.12458010273 seconds
start loading page http://www.twitter.com at 58099.37738939873 seconds
start loading page http://www.perl.org at 58100.37830828273 seconds
start loading page http://www.cnbc.com/ at 58100.91712723473 seconds
start loading page http://abcnews.go.com/ at 58101.91770178373 seconds
start loading page http://www.jython.org at 58102.91875295573 seconds
start loading page https://www.yahoo.com/ at 58103.91993155273 seconds
start loading page http://www.cnn.com at 58104.48031027673 seconds
queue empty
queue empty
queue empty
queue empty
queue empty

I've spent some time searching for answers but couldn't find any.

Answer

One way to achieve your goal is to use a mutex that a worker acquires before sending a request and that a separate task releases after a fixed interval:

from typing import Iterator

import asks
import trio


async def fetch_urls(urls: Iterator, responses, n_workers, throttle):
    # Using binary `trio.Semaphore` to be able
    # to release it from a separate task.
    mutex = trio.Semaphore(1)

    async def tick():
        await trio.sleep(throttle)
        mutex.release()

    async def worker():
        # `urls` must be one iterator shared by all workers (e.g. iter(pages)),
        # so that each URL is fetched only once.
        for url in urls:
            await mutex.acquire()
            nursery.start_soon(tick)
            response = await asks.get(url)
            responses.append(response)

    async with trio.open_nursery() as nursery:
        for _ in range(n_workers):
            nursery.start_soon(worker)

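If a worker gets its response in less than throttle seconds, it blocks on await mutex.acquire() until the pending tick releases the mutex. Otherwise the tick has already released the mutex, and another worker can acquire it. Either way, a new request can start at most once every throttle seconds.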

This is similar to how the leaky bucket algorithm works:

  • Workers waiting for the mutex are like water in a bucket.
  • Each tick is like a bucket leaking at a constant rate.

If you add a bit of logging just before sending a request, you should get output similar to this:

0.00169 started
0.001821 n_workers: 5
0.001833 throttle: 1
0.002152 fetching https://httpbin.org/delay/4
1.012 fetching https://httpbin.org/delay/2
2.014 fetching https://httpbin.org/delay/2
3.017 fetching https://httpbin.org/delay/3
4.02 fetching https://httpbin.org/delay/0
5.022 fetching https://httpbin.org/delay/2
6.024 fetching https://httpbin.org/delay/2
7.026 fetching https://httpbin.org/delay/3
8.029 fetching https://httpbin.org/delay/0
9.031 fetching https://httpbin.org/delay/0
10.61 finished
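The timing in this log can be reproduced with a small discrete-event model of the mutex scheme. The sketch below is a hypothetical idealization. It assumes each request takes exactly its httpbin delay, ignores network and scheduling overhead, and treats the mutex as free again throttle seconds after each start. With the ten delays from the log, five workers and throttle=1, it predicts starts at 0, 1, 2, ..., 9 seconds, which matches the log apart from the overhead it ignores.

```python
import heapq


def simulate_starts(durations, n_workers, throttle):
    """Idealized model of the mutex scheme: return each request's start time.

    durations[i] is how long the i-th request takes to answer; network and
    scheduling overhead are ignored (hypothetical simplification).
    """
    ready = [0.0] * n_workers  # when each worker can send its next request
    heapq.heapify(ready)
    mutex_free_at = 0.0  # when the pending tick releases the mutex
    starts = []
    for duration in durations:
        worker_ready = heapq.heappop(ready)  # the earliest-free worker
        start = max(worker_ready, mutex_free_at)  # needs a free worker AND the mutex
        mutex_free_at = start + throttle  # the tick "leaks" one request per throttle
        heapq.heappush(ready, start + duration)
        starts.append(start)
    return starts


# Delays of the httpbin URLs in the log above: /delay/4, /delay/2, ...
print(simulate_starts([4, 2, 2, 3, 0, 2, 2, 3, 0, 0], n_workers=5, throttle=1))
```

In this run a worker is always free by the time the mutex is released, so the throttle alone decides the start times. With a single worker and slow responses (for example simulate_starts([5, 5], n_workers=1, throttle=1)), the workers become the bottleneck instead.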
https://en.xdnf.cn/q/117949.html
