criticism this python code (crawler with threadpool)

2024/10/13 5:19:53

how good this python code ? need criticism) there is a error in this code, some times script do print "ALL WAIT - CAN FINISH!" and freeze (no more actions are happend..) but i can't find reason why this happend?

site crawler with threadpool:

import sys
from urllib import urlopen
from BeautifulSoup import BeautifulSoup, SoupStrainer
import re
from Queue import Queue, Empty
from threading import ThreadW_WAIT = 1
W_WORK = 0class Worker(Thread):"""Thread executing tasks from a given tasks queue"""def __init__(self, pool, tasks):Thread.__init__(self)self.tasks = tasksself.daemon = Trueself.start()self.pool = poolself.state = Nonedef is_wait(self):return self.state == W_WAITdef run(self):while True:#if all workers wait - time to exsitprint "CHECK WAIT: !!! ",self.pool.is_all_wait()if self.pool.is_all_wait():print "ALL WAIT - CAN FINISH!"returntry:func, args, kargs = self.tasks.get(timeout=3)except Empty:print "task wait timeout"continueself.state = W_WORKprint "START !!! in thread %s" % str(self)#print argstry: func(*args, **kargs)except Exception, e: print eprint "!!! STOP in thread %s", str(self)self.tasks.task_done()self.state = W_WAIT#threads can fast empty it!#if self.tasks.qsize() == 0:#    print "QUIT!!!!!!"#    breakclass ThreadPool:"""Pool of threads consuming tasks from a queue"""def __init__(self, num_threads):#self.tasks = Queue(num_threads)self.tasks = Queue()self.workers = []for _ in range(num_threads): self.workers.append(Worker(self,self.tasks))def add_task(self, func, *args, **kargs):"""Add a task to the queue"""self.tasks.put((func, args, kargs))def wait_completion(self):"""Wait for completion of all the tasks in the queue"""self.tasks.join()def is_all_wait(self):for w in self.workers:if not w.is_wait():return Falsereturn Truevisited = set()
queue = Queue()
external_links_set = set()
internal_links_set = set()
external_links = 0def process(pool,host,url):try:content = urlopen(url).read()except UnicodeDecodeError:returnfor link in BeautifulSoup(content, parseOnlyThese=SoupStrainer('a')):try:href = link['href']except KeyError:continueif not href.startswith('http://'):href = 'http://%s%s' % (host, href)if not href.startswith('http://%s%s' % (host, '/')):continueinternal_links_set.add(href)if href not in visited:visited.add(href)pool.add_task(process,pool,host,href)else:passdef start(host,charset):pool = ThreadPool(20)pool.add_task(process,pool,host,'http://%s/' % (host))pool.wait_completion()start('','utf8') 

Thanx for help! i make new implementation: What you can say about this code#2 ? ==================================TRY #2=======================================

    import sysfrom urllib import urlopenfrom BeautifulSoup import BeautifulSoup, SoupStrainerimport refrom Queue import Queue, Emptyfrom threading import ThreadW_STOP = 1class Worker(Thread):"""Thread executing tasks from a given tasks queue"""def __init__(self, pool, tasks):Thread.__init__(self)self.tasks = tasksself.daemon = Trueself.pool = poolself.state = Noneself.start()def stop(self):self.state = W_STOPdef run(self):while True:if self.state == W_STOP:print "\ncalled stop"breaktry:func, args, kargs = self.tasks.get(timeout=3)except Empty:continueprint "\n***START*** %s" % str(self)try: func(*args, **kargs)except Exception, e: print eprint "\n***STOP*** %s", str(self)self.tasks.task_done()class ThreadPool:"""Pool of threads consuming tasks from a queue"""def __init__(self, num_threads):#self.tasks = Queue(num_threads)self.tasks = Queue()self.workers = []for _ in range(num_threads): self.workers.append(Worker(self,self.tasks))def add_task(self, func, *args, **kargs):"""Add a task to the queue"""self.tasks.put((func, args, kargs))def wait_completion(self):"""Wait for completion of all the tasks in the queue"""self.tasks.join()def stop_threads(self):for w in self.workers:w.stop()def wait_stop(self):self.wait_completion()self.stop_threads()visited = set()queue = Queue()external_links_set = set()internal_links_set = set()external_links = 0def process(pool,host,url):try:content = urlopen(url).read()except UnicodeDecodeError:returnfor link in BeautifulSoup(content, parseOnlyThese=SoupStrainer('a')):try:href = link['href']except KeyError:continueif not href.startswith('http://'):href = 'http://%s%s' % (host, href)if not href.startswith('http://%s%s' % (host, '/')):continueinternal_links_set.add(href)if href not in visited:visited.add(href)pool.add_task(process,pool,host,href)else:passdef start(host,charset):pool = ThreadPool(20)pool.add_task(process,pool,host,'http://%s/' % (host))pool.wait_stop()start('','utf8') 

You are sharing state between threads (i.e., in is_all_wait) without synchronization. Plus, the fact that all threads are "waiting" is not a reliable indicator that the queue is empty (for instance, they could all be in the process of getting a task). I suspect that, occasionally, threads are exiting before the queue is truly empty. If this happens often enough, you will be left with tasks in the queue but no threads to run them. So queue.join() will wait forever.

My recomendation is:

  1. Get rid of is_all_wait -- it's not a reliable indicator
  2. Get rid of the task state -- it's not really necessary
  3. Rely on queue.join to let you know when everything is processed

If you need to kill the threads (for example, this is part of a larger, long-running program), then do so after the queue.join().

