# -*- coding: UTF-8 -*-
import re
import sys
import threading
import traceback
import random
import time
import subprocess
import shlex
# The standard-library queue module is named ``Queue`` on Python 2 and
# ``queue`` on Python 3; bind whichever exists to the single name ``Queue`` so
# the rest of the file can refer to ``Queue.Queue`` / ``Queue.Empty``.
try:
    import Queue  # Python 2
except ImportError:
    import queue as Queue  # Python 3
class NoResultsPending(Exception):
    """Raised by ``ThreadPool.poll`` when no work requests are outstanding."""
class NoWorkersAvailable(Exception):
    """Raised by a blocking ``ThreadPool.poll`` when the pool has no workers."""
def _handle_thread_exception(request, exc_info):
    """Default exception callback: print the traceback of a failed request.

    Args:
        request: The work request whose callable raised (not used here).
        exc_info: A ``(type, value, traceback)`` triple as returned by
            ``sys.exc_info()``; it is unpacked straight into
            ``traceback.print_exception``.
    """
    traceback.print_exception(*exc_info)
def makeRequests(callable_, args_list, callback=None,
                 exc_callback=_handle_thread_exception):
    """Build one ``WorkRequest`` per entry of ``args_list``.

    Args:
        callable_: The callable every created request will invoke.
        args_list: Iterable of argument specifications. A tuple entry is read
            as ``(positional_args, keyword_args)`` (its first two elements);
            any other entry is passed as the single positional argument.
        callback: Passed through to each ``WorkRequest`` as ``callback``.
        exc_callback: Passed through to each ``WorkRequest`` as
            ``exc_callback``.

    Returns:
        A list of ``WorkRequest`` objects, in the order of ``args_list``.
    """
    requests = []
    for item in args_list:
        if isinstance(item, tuple):
            args, kwds = item[0], item[1]
        else:
            # A bare value becomes the sole positional argument.
            args, kwds = [item], None
        requests.append(
            WorkRequest(callable_, args, kwds, callback=callback,
                        exc_callback=exc_callback)
        )
    return requests
class WorkerThread(threading.Thread):
    """Daemon thread that executes work requests taken from a queue.

    The thread starts itself from ``__init__``. Each request taken from
    ``requests_queue`` is called as ``request.callable(*request.args,
    **request.kwds)``; on success ``(request, result)`` is put on
    ``results_queue``, on failure ``request.exception`` is set to True and
    ``(request, sys.exc_info())`` is put there instead.
    """

    def __init__(self, requests_queue, results_queue, poll_timeout=5, **kwds):
        """Store the queues, mark the thread as daemon and start it.

        Args:
            requests_queue: Queue the thread reads work requests from.
            results_queue: Queue the thread writes ``(request, outcome)`` to.
            poll_timeout: Seconds to block on ``requests_queue.get`` before
                re-checking the dismissed flag.
            **kwds: Forwarded to ``threading.Thread.__init__``.
        """
        threading.Thread.__init__(self, **kwds)
        # ``setDaemon`` is deprecated; the ``daemon`` attribute is equivalent.
        self.daemon = True
        self._requests_queue = requests_queue
        self._results_queue = results_queue
        self._poll_timeout = poll_timeout
        self._dismissed = threading.Event()
        self.start()

    def run(self):
        """Process requests until ``dismiss`` has been called."""
        while not self._dismissed.is_set():
            try:
                request = self._requests_queue.get(True, self._poll_timeout)
            except Queue.Empty:
                # Nothing to do within the timeout; re-check the dismissed flag.
                continue
            except Exception:
                # Any other failure while fetching is deliberately ignored and
                # the loop retries, as before.
                # NOTE(review): presumably a guard for errors raised during
                # interpreter shutdown -- confirm.
                continue

            if self._dismissed.is_set():
                # Dismissed while waiting: hand the request back for another
                # worker instead of running it.
                self._requests_queue.put(request)
                break

            try:
                result = request.callable(*request.args, **request.kwds)
                self._results_queue.put((request, result))
            except:  # noqa: E722 -- every failure must be reported as a result
                request.exception = True
                self._results_queue.put((request, sys.exc_info()))

    def dismiss(self):
        """Ask the thread to exit once its current request (if any) is done."""
        self._dismissed.set()
class WorkRequest:
    """A callable plus its arguments, callbacks and an identifying ``requestID``.

    The ``exception`` attribute starts as False; ``WorkerThread.run`` sets it
    to True when the callable raises.
    """

    def __init__(self, callable_, args=None, kwds=None, requestID=None,
                 callback=None, exc_callback=_handle_thread_exception):
        """Initialise the request.

        Args:
            callable_: The callable to execute.
            args: Positional arguments; a falsy value becomes ``[]``.
            kwds: Keyword arguments; a falsy value becomes ``{}``.
            requestID: Optional hashable identifier. Its ``hash()`` is stored;
                when omitted, ``id(self)`` is used.
            callback: Stored as ``self.callback``.
            exc_callback: Stored as ``self.exc_callback``.

        Raises:
            TypeError: If ``requestID`` is given but is not hashable.
        """
        if requestID is None:
            self.requestID = id(self)
        else:
            try:
                self.requestID = hash(requestID)
            except TypeError:
                raise TypeError("requestID must be hashable.")
        self.exception = False
        self.callback = callback
        self.exc_callback = exc_callback
        self.callable = callable_
        self.args = args or []
        self.kwds = kwds or {}

    def __str__(self):
        # NOTE: other functions in this file parse this exact text with
        # regular expressions, so the format must not change.
        return "<WorkRequest funname=%s id=%s args=%s kwargs=%s exception=%s>" % (
            self.callable, self.requestID, self.args, self.kwds,
            self.exception)
class ThreadPool:
    """A thread pool, distributing work requests and collecting results.

    Requests are handed to ``WorkerThread`` instances through an internal
    request queue; outcomes come back on an internal result queue and are
    dispatched to the requests' callbacks by ``poll`` / ``wait``.
    """

    def __init__(self, num_workers, q_size=0, resq_size=0, poll_timeout=5):
        """Create the queues and start ``num_workers`` worker threads.

        Args:
            num_workers: Number of worker threads to create immediately.
            q_size: ``maxsize`` of the request queue.
            resq_size: ``maxsize`` of the result queue.
            poll_timeout: Passed to every worker as its ``poll_timeout``.
        """
        self._requests_queue = Queue.Queue(q_size)
        self._results_queue = Queue.Queue(resq_size)
        self.workers = []
        self.dismissedWorkers = []
        # Maps requestID -> request for every request whose result has not
        # been processed by ``poll`` yet.
        self.workRequests = {}
        self.createWorkers(num_workers, poll_timeout)

    def createWorkers(self, num_workers, poll_timeout=5):
        """Add ``num_workers`` new worker threads to the pool."""
        for _ in range(num_workers):
            self.workers.append(
                WorkerThread(self._requests_queue, self._results_queue,
                             poll_timeout=poll_timeout)
            )

    def dismissWorkers(self, num_workers, do_join=False):
        """Tell num_workers worker threads to quit after their current task.

        At most ``len(self.workers)`` workers are dismissed. With ``do_join``
        the call joins each of them; otherwise they are remembered in
        ``self.dismissedWorkers`` for ``joinAllDismissedWorkers``.
        """
        dismiss_list = []
        for _ in range(min(num_workers, len(self.workers))):
            worker = self.workers.pop()
            worker.dismiss()
            dismiss_list.append(worker)
        if do_join:
            for worker in dismiss_list:
                worker.join()
        else:
            self.dismissedWorkers.extend(dismiss_list)

    def joinAllDismissedWorkers(self):
        """Perform Thread.join() on all worker threads that have been dismissed."""
        for worker in self.dismissedWorkers:
            worker.join()
        self.dismissedWorkers = []

    def putRequest(self, request, block=True, timeout=None):
        """Put work request into work queue and save its id for later.

        ``block`` and ``timeout`` are forwarded to the request queue's
        ``put``. Raises AssertionError if ``request`` is not a ``WorkRequest``
        or already carries a truthy ``exception`` attribute.
        """
        assert isinstance(request, WorkRequest)
        # don't reuse old work requests
        assert not getattr(request, 'exception', None)
        self._requests_queue.put(request, block, timeout)
        self.workRequests[request.requestID] = request

    def poll(self, block=False):
        """Process any new results in the queue.

        For each available result the request's ``exc_callback`` is called if
        the request failed and has one; otherwise its ``callback`` is called
        if it has one. The request is then removed from ``workRequests``.

        Raises:
            NoResultsPending: When no requests are outstanding.
            NoWorkersAvailable: When ``block`` is true and the pool has no
                workers left.
        """
        while True:
            # still results pending?
            if not self.workRequests:
                raise NoResultsPending
            # are there still workers to process remaining requests?
            elif block and not self.workers:
                raise NoWorkersAvailable
            try:
                # get back next results
                request, result = self._results_queue.get(block=block)
            except Queue.Empty:
                break
            try:
                if request.exception and request.exc_callback:
                    # has an exception occurred and is there a handler for it?
                    request.exc_callback(request, result)
                elif request.callback:
                    # hand results to callback, if any
                    request.callback(request, result)
            finally:
                # The result has been consumed from the queue, so the request
                # must stop counting as pending even if a callback raised;
                # otherwise a later wait() would block forever on it.
                del self.workRequests[request.requestID]

    def wait(self):
        """Wait for results, blocking until all have arrived."""
        while True:
            try:
                self.poll(True)
            except NoResultsPending:
                break
def func1(domain):
    """Run ``tracert`` against ``domain`` and return its captured stdout.

    Args:
        domain: Value interpolated into the ``tracert`` command line, which is
            then tokenised with ``shlex.split``.

    Returns:
        Whatever ``Popen.communicate()`` yields for stdout, or None when the
        command line cannot be parsed or the process cannot be run.
    """
    cmd = 'tracert %s ' % domain
    try:
        proc = subprocess.Popen(shlex.split(cmd), stdout=subprocess.PIPE)
        outroute1, _ = proc.communicate()
    except Exception:
        # Best effort, as before -- but return an explicit None instead of
        # falling through to an UnboundLocalError on the return statement.
        return None
    return outroute1
def do_something(data):
    """Sleep for ``data * 5`` seconds, then return a scaled random number.

    Returns:
        ``random.random() * data`` rounded to 5 decimal places.
    """
    time.sleep(data * 5)
    return round(random.random() * data, 5)
# NOTE(review): `global` statements at module scope have no effect -- names
# assigned here are module globals anyway. `getrnum` is not referenced
# anywhere else in the visible source.
global requests
global getrnum
def callfunction(mytaskpool, funname, data):
    """Queue one request per entry of ``data`` and drain results in batches.

    Every request is created with ``print_result`` as callback and
    ``handle_exception`` as exception callback. The module globals ``iIndex``
    and ``iwhile`` count queued requests across calls; once ``iIndex`` reaches
    ``SimuRunCount`` the function polls the pool every 0.5 s until no results
    are pending (or Ctrl-C), then resets both counters.

    Args:
        mytaskpool: Pool providing ``putRequest``, ``poll``, ``createWorkers``
            and ``dismissWorkers``.
        funname: The callable each request will execute.
        data: Argument specifications in the form ``makeRequests`` accepts.

    NOTE(review): the source had lost all indentation; the nesting below
    (batch check inside the loop, counter reset inside the batch branch) is
    reconstructed from how the driver calls this function -- confirm.
    NOTE(review): ``iIndex`` and ``iwhile`` must already exist as module
    globals before the first call.
    """
    global iIndex
    global iwhile

    # Read the name straight off the callable rather than regex-parsing its
    # repr, which raised IndexError for anything not shaped like
    # "<function NAME at 0x...>" (e.g. built-ins).
    fun_label = getattr(funname, "__name__", str(funname))

    for req in makeRequests(funname, data, print_result, handle_exception):
        iwhile = iwhile + 1
        iIndex = iIndex + 1
        mytaskpool.putRequest(req)
        print("Work request #funName %s #params %s #id %s added."
              % (fun_label, req.args, req.requestID))

        if iIndex != SimuRunCount:
            continue

        # A full batch has been queued: drain it before queueing more.
        while True:
            try:
                time.sleep(0.5)
                mytaskpool.poll()
                # NOTE(review): the two pool-resizing steps below look like
                # leftovers from a demo script -- confirm they are wanted.
                if iwhile == SimuRunCount:
                    mytaskpool.createWorkers(SimuRunCount)
                if iwhile == 20:
                    mytaskpool.dismissWorkers(2)
                iwhile += 1
            except KeyboardInterrupt:
                print("**** Interrupted!")
                break
            except NoResultsPending:
                break
        iIndex = 0
        iwhile = 0
def getreuslt(getnum):
    """Remove and return up to ``getnum`` records from the front of
    the module-level ``finshedarrylist``.

    Args:
        getnum: Maximum number of records to take; values <= 0 take nothing.

    Returns:
        A list with the removed records, oldest first (possibly empty).
    """
    if getnum <= 0:
        return []
    # Slice-and-delete takes exactly the leading records; the previous
    # remove()-by-equality loop was quadratic and could drop a different but
    # equal record.
    getresult = finshedarrylist[:getnum]
    del finshedarrylist[:getnum]
    return getresult
def print_result(request, result):
    """Result callback: print the outcome and record it in ``finshedarrylist``.

    The appended record is a dict with the keys ``'request'`` (the request
    id), ``'params'`` (the request's args rendered as text with commas and
    parentheses removed), ``'funname'`` and ``'result'``.

    Args:
        request: The finished request; ``requestID``, ``callable`` and
            ``args`` are read.
        result: The value produced for the request.
    """
    # Read name and args from the request itself. The previous regex over
    # str(request) was greedy up to the LAST "at" in the text, so arguments
    # containing "at" (e.g. 'www.data.com') corrupted the function name, and
    # non-plain functions raised IndexError.
    target = request.callable
    funName = getattr(target, "__name__", str(target)).replace(' ', '')
    argslist = str(request.args).replace(',', '').replace('(', '').replace(')', '')
    finshExecArray = {
        'request': request.requestID,
        "params": argslist,
        "funname": funName,
        'result': result,
    }
    print("**** Result from request #%s:%s:%s:%s" % (request.requestID, funName, argslist, result))
    finshedarrylist.append(finshExecArray)
# this will be called when an exception occurs within a thread
# this example exception handler does little more than the default handler
def handle_exception(request, exc_info):
    """Exception callback: report a failed request on stdout.

    Args:
        request: The failed request; only ``requestID`` is read on the normal
            path.
        exc_info: Expected to be a tuple (the caller in this file passes
            ``sys.exc_info()``).

    Raises:
        SystemExit: If ``exc_info`` is not a tuple, after printing both
            arguments.
    """
    if not isinstance(exc_info, tuple):
        # Something is seriously wrong...
        print(request)
        print(exc_info)
        raise SystemExit
    print("**** Exception occured in request #%s: %s" %
          (request.requestID, exc_info))
# Batch size read by callfunction: once this many requests have been queued it
# polls the pool until no results are pending. Also used by the __main__
# driver as the initial worker count and as the number of records to fetch.
SimuRunCount = 2
# Result records: print_result appends one dict per finished request and
# getreuslt removes records from the front.
finshedarrylist=[]
if __name__ == '__main__':
    # Demo driver: queue a few jobs through callfunction, then show the
    # collected results before and after fetching SimuRunCount of them.
    # Counters shared with callfunction through module globals.
    iIndex = 0
    iwhile = 0
    requests = None
    FinishArray = []

    mytaskpool = ThreadPool(SimuRunCount)

    data1 = [((3,), {}), ((5,), {})]
    callfunction(mytaskpool, do_something, data1)
    dataurl = [(('www.lessnet.cn',), {})]
    callfunction(mytaskpool, func1, dataurl)
    data = [((6,), {})]
    callfunction(mytaskpool, do_something, data)
    data = [((7,), {})]
    callfunction(mytaskpool, do_something, data)

    # Call-style print with a single argument works on Python 2 and 3; the
    # former statement form was a SyntaxError on Python 3.
    print(finshedarrylist)
    getreusltlist = getreuslt(SimuRunCount)
    print(getreusltlist)
    print(finshedarrylist)

    if mytaskpool.dismissedWorkers:
        # Announce before joining so the message describes work in progress.
        print("Joining all dismissed worker threads...")
        mytaskpool.joinAllDismissedWorkers()
目前只能等设定数量的线程全部完成后,才会执行下一轮。如何修改才能做到:例如我完成了 5 个线程之后,继续把等待中的任务放入执行队列中?
1
wwqgtxx 2017-02-03 19:12:17 +08:00 via iPhone
直接用 concurrent.futures 类库不就行了,还有你这个排版…
|
2
simple221 OP :-D 我直接复制这段代码过来的,然后就成这样了。
|