how good this python code ? need criticism) there is a error in this code, some times script do print "ALL WAIT - CAN FINISH!" and freeze (no more actions are happend..) but i can't find reason why this happend?
site crawler with threadpool:
import sys
from urllib import urlopen
from BeautifulSoup import BeautifulSoup, SoupStrainer
import re
from Queue import Queue, Empty
from threading import ThreadW_WAIT = 1
W_WORK = 0class Worker(Thread):"""Thread executing tasks from a given tasks queue"""def __init__(self, pool, tasks):Thread.__init__(self)self.tasks = tasksself.daemon = Trueself.start()self.pool = poolself.state = Nonedef is_wait(self):return self.state == W_WAITdef run(self):while True:#if all workers wait - time to exsitprint "CHECK WAIT: !!! ",self.pool.is_all_wait()if self.pool.is_all_wait():print "ALL WAIT - CAN FINISH!"returntry:func, args, kargs = self.tasks.get(timeout=3)except Empty:print "task wait timeout"continueself.state = W_WORKprint "START !!! in thread %s" % str(self)#print argstry: func(*args, **kargs)except Exception, e: print eprint "!!! STOP in thread %s", str(self)self.tasks.task_done()self.state = W_WAIT#threads can fast empty it!#if self.tasks.qsize() == 0:# print "QUIT!!!!!!"# breakclass ThreadPool:"""Pool of threads consuming tasks from a queue"""def __init__(self, num_threads):#self.tasks = Queue(num_threads)self.tasks = Queue()self.workers = []for _ in range(num_threads): self.workers.append(Worker(self,self.tasks))def add_task(self, func, *args, **kargs):"""Add a task to the queue"""self.tasks.put((func, args, kargs))def wait_completion(self):"""Wait for completion of all the tasks in the queue"""self.tasks.join()def is_all_wait(self):for w in self.workers:if not w.is_wait():return Falsereturn Truevisited = set()
queue = Queue()
external_links_set = set()
internal_links_set = set()
external_links = 0def process(pool,host,url):try:content = urlopen(url).read()except UnicodeDecodeError:returnfor link in BeautifulSoup(content, parseOnlyThese=SoupStrainer('a')):try:href = link['href']except KeyError:continueif not href.startswith('http://'):href = 'http://%s%s' % (host, href)if not href.startswith('http://%s%s' % (host, '/')):continueinternal_links_set.add(href)if href not in visited:visited.add(href)pool.add_task(process,pool,host,href)else:passdef start(host,charset):pool = ThreadPool(20)pool.add_task(process,pool,host,'http://%s/' % (host))pool.wait_completion()start('evgenm.com','utf8')
Thanx for help! i make new implementation: What you can say about this code#2 ? ==================================TRY #2=======================================
import sysfrom urllib import urlopenfrom BeautifulSoup import BeautifulSoup, SoupStrainerimport refrom Queue import Queue, Emptyfrom threading import ThreadW_STOP = 1class Worker(Thread):"""Thread executing tasks from a given tasks queue"""def __init__(self, pool, tasks):Thread.__init__(self)self.tasks = tasksself.daemon = Trueself.pool = poolself.state = Noneself.start()def stop(self):self.state = W_STOPdef run(self):while True:if self.state == W_STOP:print "\ncalled stop"breaktry:func, args, kargs = self.tasks.get(timeout=3)except Empty:continueprint "\n***START*** %s" % str(self)try: func(*args, **kargs)except Exception, e: print eprint "\n***STOP*** %s", str(self)self.tasks.task_done()class ThreadPool:"""Pool of threads consuming tasks from a queue"""def __init__(self, num_threads):#self.tasks = Queue(num_threads)self.tasks = Queue()self.workers = []for _ in range(num_threads): self.workers.append(Worker(self,self.tasks))def add_task(self, func, *args, **kargs):"""Add a task to the queue"""self.tasks.put((func, args, kargs))def wait_completion(self):"""Wait for completion of all the tasks in the queue"""self.tasks.join()def stop_threads(self):for w in self.workers:w.stop()def wait_stop(self):self.wait_completion()self.stop_threads()visited = set()queue = Queue()external_links_set = set()internal_links_set = set()external_links = 0def process(pool,host,url):try:content = urlopen(url).read()except UnicodeDecodeError:returnfor link in BeautifulSoup(content, parseOnlyThese=SoupStrainer('a')):try:href = link['href']except KeyError:continueif not href.startswith('http://'):href = 'http://%s%s' % (host, href)if not href.startswith('http://%s%s' % (host, '/')):continueinternal_links_set.add(href)if href not in visited:visited.add(href)pool.add_task(process,pool,host,href)else:passdef start(host,charset):pool = ThreadPool(20)pool.add_task(process,pool,host,'http://%s/' % (host))pool.wait_stop()start('evgenm.com','utf8')