1import time
2
3from six.moves.queue import Queue, Empty
4
5from grab.spider.base_service import BaseService
6from grab.spider.queue_backend import memory
7
8
class CacheServiceBase(BaseService):
    """Shared plumbing for the cache reader/writer services.

    Wires a cache storage backend to an input queue processed by a
    single worker thread. Subclasses must provide
    ``create_input_queue()`` and ``worker_callback()``.
    """

    def __init__(self, spider, backend):
        """Bind the service to *spider* and the cache *backend*.

        The queue/worker factory methods live on subclasses (and on the
        ``BaseService`` base), hence the targeted pylint suppression.
        """
        self.spider = spider
        self.backend = backend
        self.queue_size_limit = 100
        # pylint: disable=no-member
        self.input_queue = self.create_input_queue()
        self.worker = self.create_worker(self.worker_callback)
        # pylint: enable=no-member
        self.register_workers(self.worker)

    def stop(self):
        """Stop the worker machinery, then release the cache backend."""
        # Old-style super() kept for Python 2 compatibility (six is used).
        super(CacheServiceBase, self).stop()
        self.backend.close()
23
24
class CacheReaderService(CacheServiceBase):
    """Service that attempts to answer tasks from the cache.

    Each incoming task is either resolved from the cache (the ready
    result goes straight to the task dispatcher) or handed back to the
    dispatcher for a real network request.
    """

    def create_input_queue(self):
        """Use the in-memory queue backend for incoming tasks."""
        return memory.QueueBackend(spider_name=None)

    def worker_callback(self, worker):
        """Worker loop: poll the queue and route each task.

        Cache hit  -> put ``(result, task, None)`` on the dispatcher queue.
        Miss/denied -> put ``(task, None, {'source': 'cache_reader'})`` so
        the task is fetched over the network.
        """
        while not worker.stop_event.is_set():
            worker.process_pause_signal()
            try:
                # Can't use (block=True, timeout=0.1) because
                # the backend could be mongodb, mysql, etc
                task = self.input_queue.get()
            except Empty:
                time.sleep(0.1)
                continue
            grab = self.spider.setup_grab_for_task(task)
            cached = None
            if self.is_read_allowed(task, grab):
                cached = self.load_from_cache(task, grab)
            if cached:
                result, task = cached
                self.spider.task_dispatcher.input_queue.put(
                    (result, task, None)
                )
            else:
                self.spider.task_dispatcher.input_queue.put((
                    task, None, {'source': 'cache_reader'}
                ))

    def is_read_allowed(self, task, grab):
        """Return True when the task may be served from the cache.

        Only plain GET requests qualify, and the task must not ask for
        a cache refresh or have caching disabled.
        """
        if task.get('refresh_cache', False):
            return False
        if task.get('disable_cache', False):
            return False
        return grab.detect_request_method() == 'GET'

    def load_from_cache(self, task, grab):
        """Return ``(result_dict, task)`` on a cache hit, None on a miss.

        Updates the ``cache:req-hit`` / ``cache:req-miss`` counters and,
        on a hit, loads the cached response into *grab*.
        """
        cache_item = self.backend.get_item(grab.config['url'])
        if cache_item is None:
            self.spider.stat.inc('cache:req-miss')
            return None
        grab.prepare_request()
        self.backend.load_response(grab, cache_item)
        grab.log_request('CACHED')
        self.spider.stat.inc('cache:req-hit')
        result = {
            'ok': True,
            'grab': grab,
            'grab_config_backup': grab.dump_config(),
            'emsg': None,
            'from_cache': True,
        }
        return result, task
77
78
class CacheWriterService(CacheServiceBase):
    """Service that persists fetched responses into the cache backend."""

    def create_input_queue(self):
        """Plain thread-safe queue; producers are in-process, so a
        blocking ``get`` with a timeout works here (unlike the reader)."""
        return Queue()

    def worker_callback(self, worker):
        """Worker loop: drain ``(task, grab)`` pairs and store the
        cacheable ones via the backend."""
        while not worker.stop_event.is_set():
            worker.process_pause_signal()
            try:
                task, grab = self.input_queue.get(True, 0.1)
            except Empty:
                continue
            if self.is_write_allowed(task, grab):
                self.backend.save_response(task.url, grab)

    def is_write_allowed(self, task, grab):
        """Return True when the response should be cached.

        Only GET responses of cache-enabled tasks with a valid network
        response code are stored.
        """
        if grab.request_method != 'GET':
            return False
        if task.get('disable_cache'):
            return False
        return self.spider.is_valid_network_response_code(
            grab.doc.code, task
        )
102