import collections
import shutil
import tempfile
import unittest

from twisted.internet import defer
from twisted.trial.unittest import TestCase

from scrapy.crawler import Crawler
from scrapy.core.downloader import Downloader
from scrapy.core.scheduler import Scheduler
from scrapy.http import Request
from scrapy.spiders import Spider
from scrapy.utils.httpobj import urlparse_cached
from scrapy.utils.test import get_crawler
from tests.mockserver import MockServer


# Lightweight stand-ins for the engine and downloader objects the scheduler
# interacts with; only the attributes the scheduler touches are provided.
MockEngine = collections.namedtuple('MockEngine', ['downloader'])
MockSlot = collections.namedtuple('MockSlot', ['active'])


class MockDownloader:
    def __init__(self):
        self.slots = dict()

    def _get_slot_key(self, request, spider):
        # Mirror the real downloader: honor an explicit slot set in
        # request.meta, falling back to the request's hostname.
        if Downloader.DOWNLOAD_SLOT in request.meta:
            return request.meta[Downloader.DOWNLOAD_SLOT]

        return urlparse_cached(request).hostname or ''

    def increment(self, slot_key):
        slot = self.slots.setdefault(slot_key, MockSlot(active=list()))
        slot.active.append(1)

    def decrement(self, slot_key):
        slot = self.slots.get(slot_key)
        slot.active.pop()

    def close(self):
        pass


class MockCrawler(Crawler):
    def __init__(self, priority_queue_cls, jobdir):
        settings = dict(
            SCHEDULER_DEBUG=False,
            SCHEDULER_DISK_QUEUE='scrapy.squeues.PickleLifoDiskQueue',
            SCHEDULER_MEMORY_QUEUE='scrapy.squeues.LifoMemoryQueue',
            SCHEDULER_PRIORITY_QUEUE=priority_queue_cls,
            JOBDIR=jobdir,
            DUPEFILTER_CLASS='scrapy.dupefilters.BaseDupeFilter',
        )
        super().__init__(Spider, settings)
        self.engine = MockEngine(downloader=MockDownloader())


class SchedulerHandler:
    priority_queue_cls = None
    jobdir = None

    def create_scheduler(self):
        self.mock_crawler = MockCrawler(self.priority_queue_cls, self.jobdir)
        self.scheduler = Scheduler.from_crawler(self.mock_crawler)
        self.spider = Spider(name='spider')
        self.scheduler.open(self.spider)

    def close_scheduler(self):
        self.scheduler.close('finished')
        self.mock_crawler.stop()
        self.mock_crawler.engine.downloader.close()

    def setUp(self):
        self.create_scheduler()

    def tearDown(self):
        self.close_scheduler()


_PRIORITIES = [("http://foo.com/a", -2),
               ("http://foo.com/d", 1),
               ("http://foo.com/b", -1),
               ("http://foo.com/c", 0),
               ("http://foo.com/e", 2)]


_URLS = {"http://foo.com/a", "http://foo.com/b", "http://foo.com/c"}


class BaseSchedulerInMemoryTester(SchedulerHandler):
    def test_length(self):
        self.assertFalse(self.scheduler.has_pending_requests())
        self.assertEqual(len(self.scheduler), 0)

        for url in _URLS:
            self.scheduler.enqueue_request(Request(url))

        self.assertTrue(self.scheduler.has_pending_requests())
        self.assertEqual(len(self.scheduler), len(_URLS))

    def test_dequeue(self):
        for url in _URLS:
            self.scheduler.enqueue_request(Request(url))

        urls = set()
        while self.scheduler.has_pending_requests():
            urls.add(self.scheduler.next_request().url)

        self.assertEqual(urls, _URLS)

    def test_dequeue_priorities(self):
        for url, priority in _PRIORITIES:
            self.scheduler.enqueue_request(Request(url, priority=priority))

        priorities = list()
        while self.scheduler.has_pending_requests():
            priorities.append(self.scheduler.next_request().priority)

        # Higher-priority requests must be dequeued first.
        self.assertEqual(priorities,
                         sorted([x[1] for x in _PRIORITIES], key=lambda x: -x))


class BaseSchedulerOnDiskTester(SchedulerHandler):

    def setUp(self):
        self.jobdir = tempfile.mkdtemp()
        self.create_scheduler()

    def tearDown(self):
        self.close_scheduler()

        shutil.rmtree(self.jobdir)
        self.jobdir = None

    def test_length(self):
        self.assertFalse(self.scheduler.has_pending_requests())
        self.assertEqual(len(self.scheduler), 0)

        for url in _URLS:
            self.scheduler.enqueue_request(Request(url))

        # Restart the scheduler to verify the queue is persisted on disk.
        self.close_scheduler()
        self.create_scheduler()

        self.assertTrue(self.scheduler.has_pending_requests())
        self.assertEqual(len(self.scheduler), len(_URLS))

    def test_dequeue(self):
        for url in _URLS:
            self.scheduler.enqueue_request(Request(url))

        self.close_scheduler()
        self.create_scheduler()

        urls = set()
        while self.scheduler.has_pending_requests():
            urls.add(self.scheduler.next_request().url)

        self.assertEqual(urls, _URLS)

    def test_dequeue_priorities(self):
        for url, priority in _PRIORITIES:
            self.scheduler.enqueue_request(Request(url, priority=priority))

        self.close_scheduler()
        self.create_scheduler()

        priorities = list()
        while self.scheduler.has_pending_requests():
            priorities.append(self.scheduler.next_request().priority)

        self.assertEqual(priorities,
                         sorted([x[1] for x in _PRIORITIES], key=lambda x: -x))


class TestSchedulerInMemory(BaseSchedulerInMemoryTester, unittest.TestCase):
    priority_queue_cls = 'scrapy.pqueues.ScrapyPriorityQueue'


class TestSchedulerOnDisk(BaseSchedulerOnDiskTester, unittest.TestCase):
    priority_queue_cls = 'scrapy.pqueues.ScrapyPriorityQueue'


_URLS_WITH_SLOTS = [("http://foo.com/a", 'a'),
                    ("http://foo.com/b", 'a'),
                    ("http://foo.com/c", 'b'),
                    ("http://foo.com/d", 'b'),
                    ("http://foo.com/e", 'c'),
                    ("http://foo.com/f", 'c')]


class TestMigration(unittest.TestCase):

    def setUp(self):
        self.tmpdir = tempfile.mkdtemp()

    def tearDown(self):
        shutil.rmtree(self.tmpdir)

    def _migration(self, tmp_dir):
        prev_scheduler_handler = SchedulerHandler()
        prev_scheduler_handler.priority_queue_cls = 'scrapy.pqueues.ScrapyPriorityQueue'
        prev_scheduler_handler.jobdir = tmp_dir

        prev_scheduler_handler.create_scheduler()
        for url in _URLS:
            prev_scheduler_handler.scheduler.enqueue_request(Request(url))
        prev_scheduler_handler.close_scheduler()

        # Reopening the same jobdir with a different priority queue class
        # is not supported and must fail.
        next_scheduler_handler = SchedulerHandler()
        next_scheduler_handler.priority_queue_cls = 'scrapy.pqueues.DownloaderAwarePriorityQueue'
        next_scheduler_handler.jobdir = tmp_dir

        next_scheduler_handler.create_scheduler()

    def test_migration(self):
        with self.assertRaises(ValueError):
            self._migration(self.tmpdir)


def _is_scheduling_fair(enqueued_slots, dequeued_slots):
    """
    Check that the dequeue order is fair, given that the same number of
    requests was enqueued for every slot, e.g.

    >>> enqueued = ['a', 'b', 'c'] * 2
    >>> correct = ['a', 'c', 'b', 'b', 'a', 'c']
    >>> incorrect = ['a', 'a', 'b', 'c', 'c', 'b']
    >>> _is_scheduling_fair(enqueued, correct)
    True
    >>> _is_scheduling_fair(enqueued, incorrect)
    False
    """
    if len(dequeued_slots) != len(enqueued_slots):
        return False

    # Every consecutive chunk of len(slots) dequeued items must contain
    # each slot at most once.
    slots_number = len(set(enqueued_slots))
    for i in range(0, len(dequeued_slots), slots_number):
        part = dequeued_slots[i:i + slots_number]
        if len(part) != len(set(part)):
            return False

    return True


class DownloaderAwareSchedulerTestMixin:
    priority_queue_cls = 'scrapy.pqueues.DownloaderAwarePriorityQueue'
    reopen = False

    def test_logic(self):
        for url, slot in _URLS_WITH_SLOTS:
            request = Request(url)
            request.meta[Downloader.DOWNLOAD_SLOT] = slot
            self.scheduler.enqueue_request(request)

        if self.reopen:
            self.close_scheduler()
            self.create_scheduler()

        # Dequeue everything, marking each request's slot as active so the
        # downloader-aware queue sees the simulated load.
        dequeued_slots = list()
        requests = []
        downloader = self.mock_crawler.engine.downloader
        while self.scheduler.has_pending_requests():
            request = self.scheduler.next_request()
            # pylint: disable=protected-access
            slot = downloader._get_slot_key(request, None)
            dequeued_slots.append(slot)
            downloader.increment(slot)
            requests.append(request)

        # Release all slots again.
        for request in requests:
            # pylint: disable=protected-access
            slot = downloader._get_slot_key(request, None)
            downloader.decrement(slot)

        self.assertTrue(_is_scheduling_fair(list(s for u, s in _URLS_WITH_SLOTS),
                                            dequeued_slots))
        self.assertEqual(sum(len(s.active) for s in downloader.slots.values()), 0)


class TestSchedulerWithDownloaderAwareInMemory(DownloaderAwareSchedulerTestMixin,
                                               BaseSchedulerInMemoryTester,
                                               unittest.TestCase):
    pass


class TestSchedulerWithDownloaderAwareOnDisk(DownloaderAwareSchedulerTestMixin,
                                             BaseSchedulerOnDiskTester,
                                             unittest.TestCase):
    reopen = True


class StartUrlsSpider(Spider):

    def __init__(self, start_urls):
        self.start_urls = start_urls
        super().__init__(name='StartUrlsSpider')

    def parse(self, response):
        pass


class TestIntegrationWithDownloaderAwareInMemory(TestCase):
    def setUp(self):
        self.crawler = get_crawler(
            spidercls=StartUrlsSpider,
            settings_dict={
                'SCHEDULER_PRIORITY_QUEUE': 'scrapy.pqueues.DownloaderAwarePriorityQueue',
                'DUPEFILTER_CLASS': 'scrapy.dupefilters.BaseDupeFilter',
            },
        )

    @defer.inlineCallbacks
    def tearDown(self):
        yield self.crawler.stop()

    @defer.inlineCallbacks
    def test_integration_downloader_aware_priority_queue(self):
        with MockServer() as mockserver:
            url = mockserver.url("/status?n=200", is_secure=False)
            start_urls = [url] * 6
            yield self.crawler.crawl(start_urls)
            self.assertEqual(self.crawler.stats.get_value('downloader/response_count'),
                             len(start_urls))


class TestIncompatibility(unittest.TestCase):

    def _incompatible(self):
        # DownloaderAwarePriorityQueue does not support per-IP concurrency
        # limits, so opening the scheduler with both must raise ValueError.
        settings = dict(
            SCHEDULER_PRIORITY_QUEUE='scrapy.pqueues.DownloaderAwarePriorityQueue',
            CONCURRENT_REQUESTS_PER_IP=1,
        )
        crawler = Crawler(Spider, settings)
        scheduler = Scheduler.from_crawler(crawler)
        spider = Spider(name='spider')
        scheduler.open(spider)

    def test_incompatibility(self):
        with self.assertRaises(ValueError):
            self._incompatible()