1from threading import Thread, Event
2import logging
3import sys
4
5# pylint: disable=invalid-name
6logger = logging.getLogger('grab.spider.base_service')
7# pylint: enable=invalid-name
8
9
10class ServiceWorker(object):
11    def __init__(self, spider, worker_callback):
12        self.spider = spider
13        self.thread = Thread(
14            target=self.worker_callback_wrapper(worker_callback),
15            args=[self]
16        )
17        self.thread.daemon = True
18        th_name = 'worker:%s:%s' % (
19            worker_callback.__self__.__class__.__name__,
20            worker_callback.__name__,
21        )
22        self.thread.name = th_name
23        self.pause_event = Event()
24        self.stop_event = Event()
25        self.resume_event = Event()
26        self.activity_paused = Event()
27        self.is_busy_event = Event()
28
29    def worker_callback_wrapper(self, callback):
30        def wrapper(*args, **kwargs):
31            try:
32                callback(*args, **kwargs)
33            except Exception as ex: # pylint: disable=broad-except
34                logger.error('Spider Service Fatal Error', exc_info=ex)
35                self.spider.fatal_error_queue.put(sys.exc_info())
36        return wrapper
37
38    def start(self):
39        self.thread.start()
40
41    def stop(self):
42        self.stop_event.set()
43
44    def process_pause_signal(self):
45        if self.pause_event.is_set():
46            self.activity_paused.set()
47            self.resume_event.wait()
48
49    def pause(self):
50        self.resume_event.clear()
51        self.pause_event.set()
52        while True:
53            if self.activity_paused.wait(0.1):
54                break
55            if not self.is_alive():
56                break
57
58    def resume(self):
59        self.pause_event.clear()
60        self.activity_paused.clear()
61        self.resume_event.set()
62
63    def is_alive(self):
64        return self.thread.isAlive()
65
66
67class BaseService(object):
68    def create_worker(self, worker_action):
69        # pylint: disable=no-member
70        return ServiceWorker(self.spider, worker_action)
71
72    def iterate_workers(self, objects):
73        for obj in objects:
74            assert isinstance(obj, (ServiceWorker, list))
75            if isinstance(obj, ServiceWorker):
76                yield obj
77            elif isinstance(obj, list):
78                for item in obj:
79                    yield item
80
81    def start(self):
82        for worker in self.iterate_workers(self.worker_registry):
83            worker.start()
84
85    def stop(self):
86        for worker in self.iterate_workers(self.worker_registry):
87            worker.stop()
88
89    def pause(self):
90        for worker in self.iterate_workers(self.worker_registry):
91            worker.pause()
92        #logging.debug('Service %s paused' % self.__class__.__name__)
93
94    def resume(self):
95        for worker in self.iterate_workers(self.worker_registry):
96            worker.resume()
97        #logging.debug('Service %s resumed' % self.__class__.__name__)
98
99    def register_workers(self, *args):
100        # pylint: disable=attribute-defined-outside-init
101        self.worker_registry = args
102
103    def is_busy(self):
104        return any(x.is_busy_event.is_set() for x in
105                   self.iterate_workers(self.worker_registry))
106
107    def is_alive(self):
108        return any(x.is_alive() for x in
109                   self.iterate_workers(self.worker_registry))
110