
import sys
import copy
from threading import Thread
try:
    import six
    from six.moves import queue
except ImportError:
    # Within this file six is only used for the Python 2/3 ``queue`` shim;
    # fall back to the stdlib module so the file also imports on bare Python 3.
    import queue
Queue = queue.Queue

__version__ = (1, 5, 2)

#########################################################################################################

class CargoTruck:
    """
    Fulfil a list of orders concurrently and hand the results back in order.

    For each order, ``load`` calls a method on an agent and stores the result
    in ``self.tank`` under the order's 1-based serial number. It uses up to
    ``num_loaders`` worker threads. ``dump`` then empties the tank in
    serial-number order, so the returned list lines up with ``orders``.

    Attributes:
        orders      -- sized, indexable sequence of orders (e.g. URLs)
        num_loaders -- maximum number of worker threads ``load`` starts; a value
                       that is not a positive integer means "one thread per order"
        tank        -- dict mapping 1-based order index -> result
    """

    def __init__(self, orders, num_loaders):
        self.orders = orders
        self.num_loaders = num_loaders
        self.tank = {}

    def _loader_count(self, num_orders):
        """Return how many worker threads load() should start for num_orders orders."""
        try:
            limit = int(self.num_loaders)
        except (TypeError, ValueError):
            limit = 0
        # A non-positive / non-numeric num_loaders keeps the historical behaviour
        # of one thread per order; never start more threads than there are orders.
        if 0 < limit < num_orders:
            return limit
        return num_orders

    def load(self, agent, method_name, timeout):
        """
        Run ``getattr(agent_copy, method_name)(order, timeout=timeout)`` for every order.

        The calls run on up to ``num_loaders`` threads. Results are stored in
        ``self.tank``, keyed by 1-based order index (1 .. len(self.orders)).
        Each order gets its own shallow copy of ``agent``. The copies are made
        in the calling thread before any worker starts, so workers never share
        one agent object.

        Parameters:
            agent       -- object whose ``method_name`` method performs one order
                           (described by the original author as a clone of a
                           requests.Session instance)
            method_name -- name of the agent method to call, e.g. "get"
            timeout     -- passed through as the ``timeout`` keyword argument

        Raises:
            Exception -- if any order produced no result (the call raised, or it
                         returned None). The message reports how many orders were
                         filled and, when a call raised, the first such failure.
        """
        self.agent = agent
        self.tank = {}
        num_orders = len(self.orders)
        work_queue = Queue(0)
        payload_queue = Queue(0)

        # Enqueue all the work (with each order's private agent copy) before any
        # worker starts, so an empty work queue reliably means "no work left".
        for ix, order in enumerate(self.orders):
            self.tank[ix+1] = None
            work_queue.put((ix+1, order, copy.copy(self.agent)))

        def pageGetter():
            # Drain the shared work queue. Every item taken yields exactly one
            # (index, result, error) record so load() can account for it.
            while True:
                try:
                    index, order, order_agent = work_queue.get_nowait()
                except queue.Empty:
                    return
                result, error = None, None
                try:
                    # getattr is inside the try so a bad method_name is reported
                    # as a failed order instead of silently killing the thread.
                    activity = getattr(order_agent, method_name)
                    result = activity(order, timeout=timeout)
                except Exception as exc:
                    error = exc
                payload_queue.put((index, result, error))

        threads = [Thread(target=pageGetter) for _ in range(self._loader_count(num_orders))]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

        # Safety net: only a worker dying from a non-Exception error can lose a record.
        if payload_queue.qsize() != num_orders:
            raise Exception("CargoTruck.load payload_queue size too short, only %d of %d expected items present" % \
                            (payload_queue.qsize(), num_orders))

        errors = {}
        while not payload_queue.empty():
            ix, results, error = payload_queue.get()
            self.tank[ix] = results
            if error is not None:
                errors[ix] = error

        shorted = [ix for ix in self.tank if self.tank[ix] is None]
        if shorted:
            filled = num_orders - len(shorted)
            notice = "CargoTruck.load detected incomplete payload_queue, only %d of %d expected items present"
            message = notice % (filled, num_orders)
            if errors:
                # Surface the cause instead of swallowing it.
                first = min(errors)
                message += " (first failure at index %d: %s: %s)" % \
                           (first, type(errors[first]).__name__, errors[first])
            raise Exception(message)

    def dump(self):
        """
        Return the loaded results as a list ordered by order index.

        Call this after load() has completed. Any falsy result (not just None)
        is treated as missing.

        Raises:
            Exception -- naming the first index whose result is missing and the
                         order it corresponds to.
        """
        payload = []
        for ix in sorted(self.tank):
            result = self.tank[ix]
            if not result:  # no (usable) result for this index?
                # tank keys are 1-based while self.orders is 0-based.
                raise Exception("CargoTruck.dump, no result at index %d for request: %s" % (ix, self.orders[ix - 1]))
            payload.append(result)
        return payload
