import collections
import shutil
import tempfile
import unittest

from twisted.internet import defer
from twisted.trial.unittest import TestCase

from scrapy.crawler import Crawler
from scrapy.core.downloader import Downloader
from scrapy.core.scheduler import Scheduler
from scrapy.http import Request
from scrapy.spiders import Spider
from scrapy.utils.httpobj import urlparse_cached
from scrapy.utils.test import get_crawler
from tests.mockserver import MockServer


MockEngine = collections.namedtuple('MockEngine', ['downloader'])
MockSlot = collections.namedtuple('MockSlot', ['active'])


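# Minimal stand-in for scrapy.core.downloader.Downloader: it only tracks how
# many requests are "active" per download slot, which is the state a
# downloader-aware priority queue is expected to consult.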
class MockDownloader:
    def __init__(self):
        self.slots = {}

    def _get_slot_key(self, request, spider):
        if Downloader.DOWNLOAD_SLOT in request.meta:
            return request.meta[Downloader.DOWNLOAD_SLOT]

        return urlparse_cached(request).hostname or ''

    def increment(self, slot_key):
        slot = self.slots.setdefault(slot_key, MockSlot(active=[]))
        slot.active.append(1)

    def decrement(self, slot_key):
        slot = self.slots.get(slot_key)
        slot.active.pop()

    def close(self):
        pass


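# Crawler configured with the scheduler settings under test; the real engine
# is replaced by a mock that exposes nothing but the downloader above.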
class MockCrawler(Crawler):
    def __init__(self, priority_queue_cls, jobdir):
        settings = dict(
            SCHEDULER_DEBUG=False,
            SCHEDULER_DISK_QUEUE='scrapy.squeues.PickleLifoDiskQueue',
            SCHEDULER_MEMORY_QUEUE='scrapy.squeues.LifoMemoryQueue',
            SCHEDULER_PRIORITY_QUEUE=priority_queue_cls,
            JOBDIR=jobdir,
            DUPEFILTER_CLASS='scrapy.dupefilters.BaseDupeFilter',
        )
        super().__init__(Spider, settings)
        self.engine = MockEngine(downloader=MockDownloader())


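# Mixin shared by the test cases below. Subclasses choose the priority queue
# class (and, for the on-disk variants, a JOBDIR) before the scheduler is
# created in setUp().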
class SchedulerHandler:
    priority_queue_cls = None
    jobdir = None

    def create_scheduler(self):
        self.mock_crawler = MockCrawler(self.priority_queue_cls, self.jobdir)
        self.scheduler = Scheduler.from_crawler(self.mock_crawler)
        self.spider = Spider(name='spider')
        self.scheduler.open(self.spider)

    def close_scheduler(self):
        self.scheduler.close('finished')
        self.mock_crawler.stop()
        self.mock_crawler.engine.downloader.close()

    def setUp(self):
        self.create_scheduler()

    def tearDown(self):
        self.close_scheduler()


_PRIORITIES = [("http://foo.com/a", -2),
               ("http://foo.com/d", 1),
               ("http://foo.com/b", -1),
               ("http://foo.com/c", 0),
               ("http://foo.com/e", 2)]


_URLS = {"http://foo.com/a", "http://foo.com/b", "http://foo.com/c"}


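# Enqueue/dequeue behaviour with everything kept in memory.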
class BaseSchedulerInMemoryTester(SchedulerHandler):
    def test_length(self):
        self.assertFalse(self.scheduler.has_pending_requests())
        self.assertEqual(len(self.scheduler), 0)

        for url in _URLS:
            self.scheduler.enqueue_request(Request(url))

        self.assertTrue(self.scheduler.has_pending_requests())
        self.assertEqual(len(self.scheduler), len(_URLS))

    def test_dequeue(self):
        for url in _URLS:
            self.scheduler.enqueue_request(Request(url))

        urls = set()
        while self.scheduler.has_pending_requests():
            urls.add(self.scheduler.next_request().url)

        self.assertEqual(urls, _URLS)

    def test_dequeue_priorities(self):
        for url, priority in _PRIORITIES:
            self.scheduler.enqueue_request(Request(url, priority=priority))

        priorities = []
        while self.scheduler.has_pending_requests():
            priorities.append(self.scheduler.next_request().priority)

        self.assertEqual(priorities,
                         sorted([x[1] for x in _PRIORITIES], reverse=True))


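# Same assertions as the in-memory tests, but the scheduler is closed and
# re-created between enqueueing and dequeueing, so pending requests must be
# persisted to disk under JOBDIR.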
class BaseSchedulerOnDiskTester(SchedulerHandler):
    def setUp(self):
        self.jobdir = tempfile.mkdtemp()
        self.create_scheduler()

    def tearDown(self):
        self.close_scheduler()

        shutil.rmtree(self.jobdir)
        self.jobdir = None

    def test_length(self):
        self.assertFalse(self.scheduler.has_pending_requests())
        self.assertEqual(len(self.scheduler), 0)

        for url in _URLS:
            self.scheduler.enqueue_request(Request(url))

        self.close_scheduler()
        self.create_scheduler()

        self.assertTrue(self.scheduler.has_pending_requests())
        self.assertEqual(len(self.scheduler), len(_URLS))

    def test_dequeue(self):
        for url in _URLS:
            self.scheduler.enqueue_request(Request(url))

        self.close_scheduler()
        self.create_scheduler()

        urls = set()
        while self.scheduler.has_pending_requests():
            urls.add(self.scheduler.next_request().url)

        self.assertEqual(urls, _URLS)

    def test_dequeue_priorities(self):
        for url, priority in _PRIORITIES:
            self.scheduler.enqueue_request(Request(url, priority=priority))

        self.close_scheduler()
        self.create_scheduler()

        priorities = []
        while self.scheduler.has_pending_requests():
            priorities.append(self.scheduler.next_request().priority)

        self.assertEqual(priorities,
                         sorted([x[1] for x in _PRIORITIES], reverse=True))


class TestSchedulerInMemory(BaseSchedulerInMemoryTester, unittest.TestCase):
    priority_queue_cls = 'scrapy.pqueues.ScrapyPriorityQueue'


class TestSchedulerOnDisk(BaseSchedulerOnDiskTester, unittest.TestCase):
    priority_queue_cls = 'scrapy.pqueues.ScrapyPriorityQueue'


_URLS_WITH_SLOTS = [("http://foo.com/a", 'a'),
                    ("http://foo.com/b", 'a'),
                    ("http://foo.com/c", 'b'),
                    ("http://foo.com/d", 'b'),
                    ("http://foo.com/e", 'c'),
                    ("http://foo.com/f", 'c')]


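# Re-opening a JOBDIR written with one priority-queue class using a different
# one is expected to fail; the test only asserts that ValueError is raised.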
class TestMigration(unittest.TestCase):

    def setUp(self):
        self.tmpdir = tempfile.mkdtemp()

    def tearDown(self):
        shutil.rmtree(self.tmpdir)

    def _migration(self, tmp_dir):
        prev_scheduler_handler = SchedulerHandler()
        prev_scheduler_handler.priority_queue_cls = 'scrapy.pqueues.ScrapyPriorityQueue'
        prev_scheduler_handler.jobdir = tmp_dir

        prev_scheduler_handler.create_scheduler()
        for url in _URLS:
            prev_scheduler_handler.scheduler.enqueue_request(Request(url))
        prev_scheduler_handler.close_scheduler()

        next_scheduler_handler = SchedulerHandler()
        next_scheduler_handler.priority_queue_cls = 'scrapy.pqueues.DownloaderAwarePriorityQueue'
        next_scheduler_handler.jobdir = tmp_dir

        next_scheduler_handler.create_scheduler()

    def test_migration(self):
        with self.assertRaises(ValueError):
            self._migration(self.tmpdir)


def _is_scheduling_fair(enqueued_slots, dequeued_slots):
    """
    Check that slots are dequeued fairly, assuming the same number of
    requests was enqueued for every slot: no slot may repeat within any
    consecutive chunk of ``len(set(enqueued_slots))`` dequeued items, e.g.

    >>> enqueued = ['a', 'b', 'c'] * 2
    >>> correct = ['a', 'c', 'b', 'b', 'a', 'c']
    >>> incorrect = ['a', 'a', 'b', 'c', 'c', 'b']
    >>> _is_scheduling_fair(enqueued, correct)
    True
    >>> _is_scheduling_fair(enqueued, incorrect)
    False
    """
    if len(dequeued_slots) != len(enqueued_slots):
        return False

    slots_number = len(set(enqueued_slots))
    for i in range(0, len(dequeued_slots), slots_number):
        part = dequeued_slots[i:i + slots_number]
        if len(part) != len(set(part)):
            return False

    return True


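# test_logic() feeds the mock downloader's per-slot counters while dequeueing,
# so a downloader-aware queue can spread consecutive requests across slots;
# fairness of the resulting order is checked with _is_scheduling_fair().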
class DownloaderAwareSchedulerTestMixin:
    priority_queue_cls = 'scrapy.pqueues.DownloaderAwarePriorityQueue'
    reopen = False

    def test_logic(self):
        for url, slot in _URLS_WITH_SLOTS:
            request = Request(url)
            request.meta[Downloader.DOWNLOAD_SLOT] = slot
            self.scheduler.enqueue_request(request)

        if self.reopen:
            self.close_scheduler()
            self.create_scheduler()

        dequeued_slots = []
        requests = []
        downloader = self.mock_crawler.engine.downloader
        while self.scheduler.has_pending_requests():
            request = self.scheduler.next_request()
            # pylint: disable=protected-access
            slot = downloader._get_slot_key(request, None)
            dequeued_slots.append(slot)
            downloader.increment(slot)
            requests.append(request)

        for request in requests:
            # pylint: disable=protected-access
            slot = downloader._get_slot_key(request, None)
            downloader.decrement(slot)

        self.assertTrue(_is_scheduling_fair([s for _, s in _URLS_WITH_SLOTS],
                                            dequeued_slots))
        self.assertEqual(sum(len(s.active) for s in downloader.slots.values()), 0)


class TestSchedulerWithDownloaderAwareInMemory(DownloaderAwareSchedulerTestMixin,
                                               BaseSchedulerInMemoryTester,
                                               unittest.TestCase):
    pass


class TestSchedulerWithDownloaderAwareOnDisk(DownloaderAwareSchedulerTestMixin,
                                             BaseSchedulerOnDiskTester,
                                             unittest.TestCase):
    reopen = True


class StartUrlsSpider(Spider):

    def __init__(self, start_urls):
        self.start_urls = start_urls
        super().__init__(name='StartUrlsSpider')

    def parse(self, response):
        pass


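# End-to-end check against MockServer: crawl several identical URLs with the
# downloader-aware queue enabled and verify that every request got a response.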
class TestIntegrationWithDownloaderAwareInMemory(TestCase):
    def setUp(self):
        self.crawler = get_crawler(
            spidercls=StartUrlsSpider,
            settings_dict={
                'SCHEDULER_PRIORITY_QUEUE': 'scrapy.pqueues.DownloaderAwarePriorityQueue',
                'DUPEFILTER_CLASS': 'scrapy.dupefilters.BaseDupeFilter',
            },
        )

    @defer.inlineCallbacks
    def tearDown(self):
        yield self.crawler.stop()

    @defer.inlineCallbacks
    def test_integration_downloader_aware_priority_queue(self):
        with MockServer() as mockserver:
            url = mockserver.url("/status?n=200", is_secure=False)
            start_urls = [url] * 6
            yield self.crawler.crawl(start_urls)
            self.assertEqual(self.crawler.stats.get_value('downloader/response_count'),
                             len(start_urls))


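# The downloader-aware queue balances per-domain download slots only, so
# opening a scheduler configured with CONCURRENT_REQUESTS_PER_IP is expected
# to raise ValueError.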
class TestIncompatibility(unittest.TestCase):

    def _incompatible(self):
        settings = dict(
            SCHEDULER_PRIORITY_QUEUE='scrapy.pqueues.DownloaderAwarePriorityQueue',
            CONCURRENT_REQUESTS_PER_IP=1,
        )
        crawler = Crawler(Spider, settings)
        scheduler = Scheduler.from_crawler(crawler)
        spider = Spider(name='spider')
        scheduler.open(spider)

    def test_incompatibility(self):
        with self.assertRaises(ValueError):
            self._incompatible()