1
2import sys
3import copy
4from threading import Thread
5import six
6from six.moves import queue
7Queue = queue.Queue
8
# Version of this module as a tuple of ints: release 1.5.2.
__version__ = 1, 5, 2
10
11#########################################################################################################
12
class CargoTruck:
    """
        Fetch a sequence of orders concurrently and return the results in order.

        A CargoTruck is given a sequence of orders (presumably URLs) and a loader
        limit.  load() calls an agent method on every order, using at most
        num_loaders worker threads.  It stores each result in self.tank, a dict
        keyed by the order's 1-based position.  dump() then returns the stored
        results as one list in the same order as self.orders, no matter which
        thread finished first.
    """

    def __init__(self, orders, num_loaders):
        # orders:      sequence of items; each one is passed to the agent method in load()
        # num_loaders: maximum number of worker threads load() runs at once
        #              (a falsy or non-positive value means one thread per order)
        self.orders      = orders
        self.num_loaders = num_loaders
        self.tank = {}   # 1-based order position -> result, filled in by load()

    def load(self, agent, method_name, timeout):
        """
            Call getattr(agent, method_name)(order, timeout=timeout) for every order.

            Each order gets its own shallow copy of agent (copy.copy, made in
            the calling thread), so worker threads never share one agent
            instance.  At most num_loaders worker threads run at a time.  When
            num_loaders is falsy or not positive, one thread per order is used.
            Results are stored in self.tank, keyed 1 .. len(self.orders) by
            order position.

            A call that raises is recorded as a None result instead of stopping
            the other workers.  Failures are checked only after every worker
            has finished.

            Raises:
                Exception: if fewer results than orders came back, or if any
                order's result is None.  When a call raised, the message also
                names the first failing index and its exception.
        """
        self.agent = agent
        self.tank  = {}
        payload_queue = Queue(0)
        work_queue    = Queue(0)

        # Queue all the work before any worker starts, so an empty work queue
        # reliably means "nothing left to do" to every worker.
        for ix, order in enumerate(self.orders, 1):
            self.tank[ix] = None
            work_queue.put((ix, order, copy.copy(self.agent)))
        num_orders = len(self.tank)

        def loader():
            # Worker loop: keep taking orders until the work queue is drained.
            while True:
                try:
                    index, order, thread_safe_agent = work_queue.get_nowait()
                except queue.Empty:
                    return
                error = None
                try:
                    activity = getattr(thread_safe_agent, method_name)
                    result = activity(order, timeout=timeout)
                except Exception as exc:
                    # Best effort: keep the other orders going; the failure is
                    # reported after all workers have joined.
                    result = None
                    error = repr(exc)
                payload_queue.put((index, result, error))

        limit = self.num_loaders
        if not limit or limit < 1:
            limit = num_orders
        threads = [Thread(target=loader) for _ in range(min(int(limit), num_orders))]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

        # Safety net: every queued order should have produced exactly one entry.
        if payload_queue.qsize() != num_orders:
            raise Exception("CargoTruck.load payload_queue size too short, only %d of %d expected items present" %
                            (payload_queue.qsize(), num_orders))

        errors = {}
        while not payload_queue.empty():
            ix, results, error = payload_queue.get()
            self.tank[ix] = results
            if error is not None:
                errors[ix] = error

        # "is None" rather than "== None": a result with an overloaded __eq__
        # (e.g. an array-like object) must not be compared element-wise here.
        shorted = [ix for ix in self.tank if self.tank[ix] is None]
        if shorted:
            filled = num_orders - len(shorted)
            notice = "CargoTruck.load detected incomplete payload_queue, only %d of %d expected items present"
            notice = notice % (filled, num_orders)
            if errors:
                first = min(errors)
                notice += "; first failure at index %d: %s" % (first, errors[first])
            raise Exception(notice)

    def dump(self):
        """
            Return the results stored by load() as a list, in order position.

            Walks the self.tank keys in ascending integer order.  This ensures
            the list matches the order of self.orders, regardless of the order
            in which the threads completed.

            Raises:
                Exception: if any stored result is falsy.  The message names
                the index and the corresponding entry of self.orders.
        """
        payload = []
        for ix in sorted(self.tank):
            result = self.tank[ix]
            # NOTE(review): this is a truthiness test, so falsy results (empty
            # containers, 0, objects whose __bool__ is False) are rejected too,
            # not just None.  Kept as-is; confirm that is intended.
            if not result:
                # tank keys are 1-based positions; self.orders is 0-based.
                raise Exception("CargoTruck.dump, no result at index %d for request: %s" % (ix, self.orders[ix - 1]))
            payload.append(result)
        return payload
97
98