"""Thread-backed worker and service base classes used by the spider."""
from threading import Thread, Event
import logging
import sys

# pylint: disable=invalid-name
logger = logging.getLogger('grab.spider.base_service')
# pylint: enable=invalid-name


class ServiceWorker(object):
    """Run a service callback in a daemon thread with pause/stop signalling.

    The callback is invoked once, in the worker thread, with this
    ``ServiceWorker`` instance as its only argument.  This class only sets
    ``stop_event`` and never reads it, and it never touches
    ``is_busy_event``; presumably the callback polls/sets them and calls
    ``process_pause_signal()`` regularly -- verify against the callbacks.
    """

    def __init__(self, spider, worker_callback):
        """Prepare (but do not start) the worker thread.

        :param spider: object exposing ``fatal_error_queue`` (with a ``put``
            method); used to report an exception escaping the callback.
        :param worker_callback: bound method to run in the thread; it must
            be bound because ``__self__`` is used to build the thread name.
        """
        self.spider = spider
        self.thread = Thread(
            target=self.worker_callback_wrapper(worker_callback),
            args=[self]
        )
        self.thread.daemon = True
        th_name = 'worker:%s:%s' % (
            worker_callback.__self__.__class__.__name__,
            worker_callback.__name__,
        )
        self.thread.name = th_name
        # Set by pause() to ask the worker to pause.
        self.pause_event = Event()
        # Set by stop(); never read inside this class.
        self.stop_event = Event()
        # Set by resume() to release a worker blocked in
        # process_pause_signal().
        self.resume_event = Event()
        # Set by the worker itself to acknowledge that it has paused.
        self.activity_paused = Event()
        # Read by BaseService.is_busy(); never set inside this module.
        self.is_busy_event = Event()

    def worker_callback_wrapper(self, callback):
        """Return ``callback`` wrapped so that failures reach the spider.

        Any ``Exception`` raised by ``callback`` is logged and its
        ``sys.exc_info()`` triple is put on ``spider.fatal_error_queue``
        instead of propagating out of the thread.
        """
        def wrapper(*args, **kwargs):
            try:
                callback(*args, **kwargs)
            except Exception as ex:  # pylint: disable=broad-except
                logger.error('Spider Service Fatal Error', exc_info=ex)
                self.spider.fatal_error_queue.put(sys.exc_info())
        return wrapper

    def start(self):
        """Start the worker thread."""
        self.thread.start()

    def stop(self):
        """Set ``stop_event``; this does not join or interrupt the thread."""
        self.stop_event.set()

    def process_pause_signal(self):
        """Block until ``resume()`` is called if a pause was requested.

        Returns immediately when no pause is pending.
        """
        if self.pause_event.is_set():
            self.activity_paused.set()
            self.resume_event.wait()

    def pause(self):
        """Request a pause and wait until the worker acknowledges it.

        Returns as soon as the worker has set ``activity_paused`` or its
        thread is found not to be alive, so a dead (or never started)
        worker cannot block the caller forever.
        """
        self.resume_event.clear()
        self.pause_event.set()
        # Poll in 0.1 second slices so thread liveness is re-checked
        # between waits.
        while not self.activity_paused.wait(0.1):
            if not self.is_alive():
                break

    def resume(self):
        """Cancel the pause request and release a paused worker."""
        self.pause_event.clear()
        self.activity_paused.clear()
        self.resume_event.set()

    def is_alive(self):
        """Return True while the worker thread is running."""
        # Thread.isAlive() was removed in Python 3.9; is_alive() has been
        # available since Python 2.6.
        return self.thread.is_alive()


class BaseService(object):
    """Base class for services that own one or more ``ServiceWorker``.

    ``create_worker`` reads ``self.spider`` and every other method reads
    ``self.worker_registry``; neither is initialised here, so subclasses
    must set ``spider`` and call ``register_workers`` first.
    """

    def create_worker(self, worker_action):
        """Return a new, not yet started ``ServiceWorker`` for the callback."""
        # pylint: disable=no-member
        return ServiceWorker(self.spider, worker_action)

    def iterate_workers(self, objects):
        """Yield workers from a sequence of workers and lists of workers.

        Each element of ``objects`` must be a ``ServiceWorker`` or a
        ``list``; list contents are yielded as-is (one level deep).
        """
        for obj in objects:
            assert isinstance(obj, (ServiceWorker, list))
            if isinstance(obj, ServiceWorker):
                yield obj
            else:
                for item in obj:
                    yield item

    def start(self):
        """Start every registered worker."""
        for worker in self.iterate_workers(self.worker_registry):
            worker.start()

    def stop(self):
        """Signal every registered worker to stop."""
        for worker in self.iterate_workers(self.worker_registry):
            worker.stop()

    def pause(self):
        """Pause registered workers one by one, waiting for each in turn."""
        for worker in self.iterate_workers(self.worker_registry):
            worker.pause()

    def resume(self):
        """Resume every registered worker."""
        for worker in self.iterate_workers(self.worker_registry):
            worker.resume()

    def register_workers(self, *args):
        """Store the workers (or lists of workers) managed by this service."""
        # pylint: disable=attribute-defined-outside-init
        self.worker_registry = args

    def is_busy(self):
        """Return True if any registered worker has ``is_busy_event`` set."""
        return any(x.is_busy_event.is_set() for x in
                   self.iterate_workers(self.worker_registry))

    def is_alive(self):
        """Return True if any registered worker thread is alive."""
        return any(x.is_alive() for x in
                   self.iterate_workers(self.worker_registry))