Criticism of this Python code (crawler with thread pool)

2024/10/13 5:19:53

How good is this Python code? I need criticism. There is an error in it: sometimes the script prints "ALL WAIT - CAN FINISH!" and then freezes (no further actions happen), but I can't find the reason why this happens.

Site crawler with thread pool:

    import sys
    from urllib import urlopen
    from BeautifulSoup import BeautifulSoup, SoupStrainer
    import re
    from Queue import Queue, Empty
    from threading import Thread

    W_WAIT = 1
    W_WORK = 0

    class Worker(Thread):
        """Thread executing tasks from a given tasks queue"""
        def __init__(self, pool, tasks):
            Thread.__init__(self)
            self.tasks = tasks
            self.daemon = True
            self.start()
            self.pool = pool
            self.state = None

        def is_wait(self):
            return self.state == W_WAIT

        def run(self):
            while True:
                # if all workers wait - time to exit
                print "CHECK WAIT: !!! ", self.pool.is_all_wait()
                if self.pool.is_all_wait():
                    print "ALL WAIT - CAN FINISH!"
                    return
                try:
                    func, args, kargs = self.tasks.get(timeout=3)
                except Empty:
                    print "task wait timeout"
                    continue

                self.state = W_WORK
                print "START !!! in thread %s" % str(self)
                #print args
                try: func(*args, **kargs)
                except Exception, e: print e
                print "!!! STOP in thread %s", str(self)
                self.tasks.task_done()
                self.state = W_WAIT

                # threads can fast empty it!
                #if self.tasks.qsize() == 0:
                #    print "QUIT!!!!!!"
                #    break

    class ThreadPool:
        """Pool of threads consuming tasks from a queue"""
        def __init__(self, num_threads):
            #self.tasks = Queue(num_threads)
            self.tasks = Queue()
            self.workers = []
            for _ in range(num_threads):
                self.workers.append(Worker(self, self.tasks))

        def add_task(self, func, *args, **kargs):
            """Add a task to the queue"""
            self.tasks.put((func, args, kargs))

        def wait_completion(self):
            """Wait for completion of all the tasks in the queue"""
            self.tasks.join()

        def is_all_wait(self):
            for w in self.workers:
                if not w.is_wait():
                    return False
            return True

    visited = set()
    queue = Queue()
    external_links_set = set()
    internal_links_set = set()
    external_links = 0

    def process(pool, host, url):
        try:
            content = urlopen(url).read()
        except UnicodeDecodeError:
            return
        for link in BeautifulSoup(content, parseOnlyThese=SoupStrainer('a')):
            try:
                href = link['href']
            except KeyError:
                continue
            if not href.startswith('http://'):
                href = 'http://%s%s' % (host, href)
            if not href.startswith('http://%s%s' % (host, '/')):
                continue
            internal_links_set.add(href)
            if href not in visited:
                visited.add(href)
                pool.add_task(process, pool, host, href)
            else:
                pass

    def start(host, charset):
        pool = ThreadPool(20)
        pool.add_task(process, pool, host, 'http://%s/' % (host))
        pool.wait_completion()

    start('evgenm.com', 'utf8')

Thanks for the help! I made a new implementation. What can you say about this code #2?

================================== TRY #2 =======================================

    import sys
    from urllib import urlopen
    from BeautifulSoup import BeautifulSoup, SoupStrainer
    import re
    from Queue import Queue, Empty
    from threading import Thread

    W_STOP = 1

    class Worker(Thread):
        """Thread executing tasks from a given tasks queue"""
        def __init__(self, pool, tasks):
            Thread.__init__(self)
            self.tasks = tasks
            self.daemon = True
            self.pool = pool
            self.state = None
            self.start()

        def stop(self):
            self.state = W_STOP

        def run(self):
            while True:
                if self.state == W_STOP:
                    print "\ncalled stop"
                    break
                try:
                    func, args, kargs = self.tasks.get(timeout=3)
                except Empty:
                    continue
                print "\n***START*** %s" % str(self)
                try: func(*args, **kargs)
                except Exception, e: print e
                print "\n***STOP*** %s", str(self)
                self.tasks.task_done()

    class ThreadPool:
        """Pool of threads consuming tasks from a queue"""
        def __init__(self, num_threads):
            #self.tasks = Queue(num_threads)
            self.tasks = Queue()
            self.workers = []
            for _ in range(num_threads):
                self.workers.append(Worker(self, self.tasks))

        def add_task(self, func, *args, **kargs):
            """Add a task to the queue"""
            self.tasks.put((func, args, kargs))

        def wait_completion(self):
            """Wait for completion of all the tasks in the queue"""
            self.tasks.join()

        def stop_threads(self):
            for w in self.workers:
                w.stop()

        def wait_stop(self):
            self.wait_completion()
            self.stop_threads()

    visited = set()
    queue = Queue()
    external_links_set = set()
    internal_links_set = set()
    external_links = 0

    def process(pool, host, url):
        try:
            content = urlopen(url).read()
        except UnicodeDecodeError:
            return
        for link in BeautifulSoup(content, parseOnlyThese=SoupStrainer('a')):
            try:
                href = link['href']
            except KeyError:
                continue
            if not href.startswith('http://'):
                href = 'http://%s%s' % (host, href)
            if not href.startswith('http://%s%s' % (host, '/')):
                continue
            internal_links_set.add(href)
            if href not in visited:
                visited.add(href)
                pool.add_task(process, pool, host, href)
            else:
                pass

    def start(host, charset):
        pool = ThreadPool(20)
        pool.add_task(process, pool, host, 'http://%s/' % (host))
        pool.wait_stop()

    start('evgenm.com', 'utf8')
Answer

You are sharing state between threads (i.e., in is_all_wait) without synchronization. Plus, the fact that all threads are "waiting" is not a reliable indicator that the queue is empty (for instance, they could all be in the process of getting a task). I suspect that, occasionally, threads are exiting before the queue is truly empty. If this happens often enough, you will be left with tasks in the queue but no threads to run them. So queue.join() will wait forever.
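To make that race concrete, here is one possible interleaving (hypothetical, but consistent with the first version of the code) in which every worker sees is_all_wait() return True while tasks are still queued:

    # Suppose two workers, A and B, and the queue still holds tasks T3 and T4.
    #
    # A: finishes T1 -> task_done() -> self.state = W_WAIT
    # B: finishes T2 -> task_done() -> self.state = W_WAIT
    # A: tops the loop, pool.is_all_wait() -> True (both states are W_WAIT),
    #    prints "ALL WAIT - CAN FINISH!" and returns
    # B: tops the loop, pool.is_all_wait() -> still True (A's W_WAIT state is
    #    never reset after it exits), so B returns as well
    #
    # T3 and T4 are never executed and never marked task_done(), so the
    # tasks.join() inside wait_completion() blocks forever -- the observed freeze.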

My recommendations are:

  1. Get rid of is_all_wait -- it's not a reliable indicator
  2. Get rid of the task state -- it's not really necessary
  3. Rely on queue.join() to let you know when everything is processed

If you need to kill the threads (for example, this is part of a larger, long-running program), then do so after the queue.join().
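A minimal sketch of what that might look like, using the same Python 2 modules as the question (an illustration of the pattern, not a drop-in fix for the crawler):

    from Queue import Queue
    from threading import Thread

    class Worker(Thread):
        """Thread executing tasks from a given tasks queue."""
        def __init__(self, tasks):
            Thread.__init__(self)
            self.tasks = tasks
            self.daemon = True   # daemon threads die with the main thread
            self.start()

        def run(self):
            while True:
                # Block until a task arrives: no timeout, no state flags,
                # no "are we done?" checks inside the worker.
                func, args, kargs = self.tasks.get()
                try:
                    func(*args, **kargs)
                except Exception, e:
                    print e
                finally:
                    # Always balance get() with task_done(), even on error,
                    # or join() will never return.
                    self.tasks.task_done()

    class ThreadPool:
        """Pool of threads consuming tasks from a queue."""
        def __init__(self, num_threads):
            self.tasks = Queue()
            for _ in range(num_threads):
                Worker(self.tasks)

        def add_task(self, func, *args, **kargs):
            self.tasks.put((func, args, kargs))

        def wait_completion(self):
            # The one reliable "everything is processed" signal: join()
            # returns only when every put() has a matching task_done().
            self.tasks.join()

Because the workers are daemon threads that simply block on get(), they are cleaned up automatically when the main thread exits after wait_completion(); a long-running program could instead enqueue one sentinel task per worker after the join() to shut them down explicitly.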
