import time

from six.moves.queue import Queue, Empty

from grab.spider.base_service import BaseService
from grab.spider.queue_backend import memory


class CacheServiceBase(BaseService):
    """Shared plumbing for the cache reader and cache writer services.

    A concrete subclass must provide ``create_input_queue()`` and
    ``worker_callback(worker)``.  ``create_worker()`` and
    ``register_workers()`` are not defined here; they are presumably
    supplied by ``BaseService`` (confirm against that class).
    """

    def __init__(self, spider, backend):
        """Store collaborators, build the input queue and the worker.

        :param spider: the spider object this service works for
        :param backend: cache storage backend; it is closed in ``stop()``
        """
        self.spider = spider
        self.backend = backend
        self.queue_size_limit = 100
        # The three methods used below are provided by subclasses or by
        # the parent class, hence the pylint suppression.
        # pylint: disable=no-member
        self.input_queue = self.create_input_queue()
        self.worker = self.create_worker(self.worker_callback)
        # pylint: enable=no-member
        self.register_workers(self.worker)

    def stop(self):
        """Run the parent's ``stop()`` and then close the cache backend."""
        super(CacheServiceBase, self).stop()
        self.backend.close()


class CacheReaderService(CacheServiceBase):
    """Service that tries to answer tasks from the cache.

    Each task taken from the input queue is forwarded to the spider's task
    dispatcher queue, either together with a cached result or as a bare
    task tagged with ``{'source': 'cache_reader'}``.
    """

    def create_input_queue(self):
        """Return a fresh in-memory queue backend for incoming tasks."""
        return memory.QueueBackend(spider_name=None)

    def worker_callback(self, worker):
        """Worker loop: runs until ``worker.stop_event`` is set.

        :param worker: worker object exposing ``stop_event`` and
            ``process_pause_signal()``
        """
        while not worker.stop_event.is_set():
            worker.process_pause_signal()
            try:
                # Can't use (block=True, timeout=0.1) because
                # the backend could be mongodb, mysql, etc
                task = self.input_queue.get()
            except Empty:
                # Nothing queued right now: back off briefly and re-check.
                time.sleep(0.1)
                continue

            grab = self.spider.setup_grab_for_task(task)
            if self.is_read_allowed(task, grab):
                cached = self.load_from_cache(task, grab)
            else:
                cached = None

            if cached:
                # Cache hit: forward the cached result with its task.
                result, task = cached
                message = (result, task, None)
            else:
                # Cache miss or cache not allowed: forward the task alone.
                message = (task, None, {'source': 'cache_reader'})
            self.spider.task_dispatcher.input_queue.put(message)

    def is_read_allowed(self, task, grab):
        """Tell whether the cache may be consulted for this task.

        Reading is refused when the task has a truthy ``refresh_cache`` or
        ``disable_cache`` value; otherwise it is allowed only if the
        detected request method is ``GET``.
        """
        if task.get('refresh_cache', False):
            return False
        if task.get('disable_cache', False):
            return False
        return grab.detect_request_method() == 'GET'

    def load_from_cache(self, task, grab):
        """Look up ``grab.config['url']`` in the cache backend.

        Increments the ``cache:req-miss`` or ``cache:req-hit`` counter.

        :returns: ``None`` on a miss; on a hit a ``(result, task)`` tuple
            where ``result`` is a dict describing the cached response.
        """
        cache_item = self.backend.get_item(grab.config['url'])
        if cache_item is None:
            self.spider.stat.inc('cache:req-miss')
            return None

        grab.prepare_request()
        self.backend.load_response(grab, cache_item)
        grab.log_request('CACHED')
        self.spider.stat.inc('cache:req-hit')

        result = {
            'ok': True,
            'grab': grab,
            'grab_config_backup': grab.dump_config(),
            'emsg': None,
            'from_cache': True,
        }
        return result, task


class CacheWriterService(CacheServiceBase):
    """Service that stores responses in the cache backend."""

    def create_input_queue(self):
        """Return a new ``Queue`` that will carry ``(task, grab)`` pairs."""
        return Queue()

    def worker_callback(self, worker):
        """Worker loop: runs until ``worker.stop_event`` is set.

        :param worker: worker object exposing ``stop_event`` and
            ``process_pause_signal()``
        """
        while not worker.stop_event.is_set():
            worker.process_pause_signal()
            try:
                task, grab = self.input_queue.get(True, 0.1)
            except Empty:
                # The 0.1 s wait expired with nothing queued; loop again
                # so the stop event and pause signal are re-checked.
                continue
            if self.is_write_allowed(task, grab):
                self.backend.save_response(task.url, grab)

    def is_write_allowed(self, task, grab):
        """Tell whether the response held by ``grab`` should be cached.

        Writing requires a ``GET`` request, a task without a truthy
        ``disable_cache`` value, and a response code accepted by
        ``spider.is_valid_network_response_code()``.
        """
        if grab.request_method != 'GET':
            return False
        if task.get('disable_cache'):
            return False
        return self.spider.is_valid_network_response_code(
            grab.doc.code, task
        )