"""
    sphinx.builders.linkcheck
    ~~~~~~~~~~~~~~~~~~~~~~~~~

    The CheckExternalLinksBuilder class.

    :copyright: Copyright 2007-2021 by the Sphinx team, see AUTHORS.
    :license: BSD, see LICENSE for details.
"""

import json
import queue
import re
import socket
import time
import warnings
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from html.parser import HTMLParser
from os import path
from threading import Thread
from typing import (Any, Dict, Generator, List, NamedTuple, Optional, Pattern, Set, Tuple,
                    Union, cast)
from urllib.parse import unquote, urlparse

from docutils import nodes
from docutils.nodes import Element
from requests import Response
from requests.exceptions import HTTPError, TooManyRedirects

from sphinx.application import Sphinx
from sphinx.builders.dummy import DummyBuilder
from sphinx.config import Config
from sphinx.deprecation import RemovedInSphinx50Warning
from sphinx.environment import BuildEnvironment
from sphinx.locale import __
from sphinx.transforms.post_transforms import SphinxPostTransform
from sphinx.util import encode_uri, logging, requests
from sphinx.util.console import darkgray, darkgreen, purple, red, turquoise  # type: ignore
from sphinx.util.nodes import get_node_line

logger = logging.getLogger(__name__)

uri_re = re.compile('([a-z]+:)?//')  # matches foo:// and // (a protocol-relative URL)

Hyperlink = NamedTuple('Hyperlink', (('uri', str),
                                     ('docname', str),
                                     ('lineno', Optional[int])))
CheckRequest = NamedTuple('CheckRequest', (('next_check', float),
                                           ('hyperlink', Optional[Hyperlink])))
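# ``next_check`` comes first in CheckRequest, so the workers' PriorityQueue
# hands out requests in scheduled-time order and re-queued (rate-limited)
# requests sink back until they are due.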
CheckResult = NamedTuple('CheckResult', (('uri', str),
                                         ('docname', str),
                                         ('lineno', int),
                                         ('status', str),
                                         ('message', str),
                                         ('code', int)))
RateLimit = NamedTuple('RateLimit', (('delay', float), ('next_check', float)))

# A plain tuple is the old-style CheckRequest
CheckRequestType = Union[CheckRequest, Tuple[float, str, str, int]]

DEFAULT_REQUEST_HEADERS = {
    'Accept': 'text/html,application/xhtml+xml;q=0.9,*/*;q=0.8',
}
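# DEFAULT_REQUEST_HEADERS above is only the fallback; get_request_headers() in
# the worker looks up ``linkcheck_request_headers`` using, in order, the URL's
# ``scheme://netloc``, ``scheme://netloc/``, the full URI, and finally "*",
# and merges the first match over these defaults.  A hypothetical conf.py
# entry might look like:
#
#     linkcheck_request_headers = {
#         "https://www.example.com/": {"Accept": "application/json"},
#         "*": {"Accept-Encoding": "gzip"},
#     }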
CHECK_IMMEDIATELY = 0
QUEUE_POLL_SECS = 1
DEFAULT_DELAY = 60.0


def node_line_or_0(node: Element) -> int:
    """
    PriorityQueue items must be comparable; the line number is part of the
    tuple used by the PriorityQueue, so keep a homogeneous type for comparison.
    """
    warnings.warn('node_line_or_0() is deprecated.',
                  RemovedInSphinx50Warning, stacklevel=2)
    return get_node_line(node) or 0


class AnchorCheckParser(HTMLParser):
    """Specialized HTML parser that looks for a specific anchor."""

    def __init__(self, search_anchor: str) -> None:
        super().__init__()

        self.search_anchor = search_anchor
        self.found = False

    def handle_starttag(self, tag: Any, attrs: Any) -> None:
        for key, value in attrs:
            if key in ('id', 'name') and value == self.search_anchor:
                self.found = True
                break


def check_anchor(response: requests.requests.Response, anchor: str) -> bool:
    """Read HTML data from the *response* object, searching for *anchor*.
    Return True if the anchor was found, False otherwise.
    """
    parser = AnchorCheckParser(anchor)
    # Read file in chunks. If we find a matching anchor, we break
    # the loop early in hopes not to have to download the whole thing.
    for chunk in response.iter_content(chunk_size=4096, decode_unicode=True):
        if isinstance(chunk, bytes):    # requests failed to decode
            chunk = chunk.decode()      # manually try to decode it

        parser.feed(chunk)
        if parser.found:
            break
    parser.close()
    return parser.found
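# A minimal usage sketch for AnchorCheckParser / check_anchor() above
# (illustrative only; not executed at import time):
#
#     parser = AnchorCheckParser('installation')
#     parser.feed('<h2 id="installation">Installation</h2>')
#     assert parser.found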


class CheckExternalLinksBuilder(DummyBuilder):
    """
    Checks for broken external links.
    """
    name = 'linkcheck'
    epilog = __('Look for any errors in the above output or in '
                '%(outdir)s/output.txt')

    def init(self) -> None:
        self.hyperlinks = {}    # type: Dict[str, Hyperlink]
        self._good = set()       # type: Set[str]
        self._broken = {}        # type: Dict[str, str]
        self._redirected = {}    # type: Dict[str, Tuple[str, int]]
        # set a timeout for non-responding servers
        socket.setdefaulttimeout(5.0)

        # create queues and worker threads
        self._wqueue = queue.PriorityQueue()  # type: queue.PriorityQueue[CheckRequestType]
        self._rqueue = queue.Queue()  # type: queue.Queue

    @property
    def anchors_ignore(self) -> List[Pattern]:
        warnings.warn(
            "%s.%s is deprecated." % (self.__class__.__name__, "anchors_ignore"),
            RemovedInSphinx50Warning,
            stacklevel=2,
        )
        return [re.compile(x) for x in self.config.linkcheck_anchors_ignore]

    @property
    def auth(self) -> List[Tuple[Pattern, Any]]:
        warnings.warn(
            "%s.%s is deprecated." % (self.__class__.__name__, "auth"),
            RemovedInSphinx50Warning,
            stacklevel=2,
        )
        return [(re.compile(pattern), auth_info) for pattern, auth_info
                in self.config.linkcheck_auth]

    @property
    def to_ignore(self) -> List[Pattern]:
        warnings.warn(
            "%s.%s is deprecated." % (self.__class__.__name__, "to_ignore"),
            RemovedInSphinx50Warning,
            stacklevel=2,
        )
        return [re.compile(x) for x in self.config.linkcheck_ignore]

    @property
    def good(self) -> Set[str]:
        warnings.warn(
            "%s.%s is deprecated." % (self.__class__.__name__, "good"),
            RemovedInSphinx50Warning,
            stacklevel=2,
        )
        return self._good

    @property
    def broken(self) -> Dict[str, str]:
        warnings.warn(
            "%s.%s is deprecated." % (self.__class__.__name__, "broken"),
            RemovedInSphinx50Warning,
            stacklevel=2,
        )
        return self._broken

    @property
    def redirected(self) -> Dict[str, Tuple[str, int]]:
        warnings.warn(
            "%s.%s is deprecated." % (self.__class__.__name__, "redirected"),
            RemovedInSphinx50Warning,
            stacklevel=2,
        )
        return self._redirected

    def check_thread(self) -> None:
        warnings.warn(
            "%s.%s is deprecated." % (self.__class__.__name__, "check_thread"),
            RemovedInSphinx50Warning,
            stacklevel=2,
        )
        # do nothing.

    def limit_rate(self, response: Response) -> Optional[float]:
        warnings.warn(
            "%s.%s is deprecated." % (self.__class__.__name__, "limit_rate"),
            RemovedInSphinx50Warning,
            stacklevel=2,
        )
        worker = HyperlinkAvailabilityCheckWorker(self.env, self.config,
                                                  None, None, {})
        return worker.limit_rate(response)

    def rqueue(self, response: Response) -> queue.Queue:
        warnings.warn(
            "%s.%s is deprecated." % (self.__class__.__name__, "rqueue"),
            RemovedInSphinx50Warning,
            stacklevel=2,
        )
        return self._rqueue

    def workers(self, response: Response) -> List[Thread]:
        warnings.warn(
            "%s.%s is deprecated." % (self.__class__.__name__, "workers"),
            RemovedInSphinx50Warning,
            stacklevel=2,
        )
        return []

    def wqueue(self, response: Response) -> queue.Queue:
        warnings.warn(
            "%s.%s is deprecated." % (self.__class__.__name__, "wqueue"),
            RemovedInSphinx50Warning,
            stacklevel=2,
        )
        return self._wqueue

    def process_result(self, result: Tuple[str, str, int, str, str, int]) -> None:
        uri, docname, lineno, status, info, code = result

        filename = self.env.doc2path(docname, None)
        linkstat = dict(filename=filename, lineno=lineno,
                        status=status, code=code, uri=uri,
                        info=info)
        if status == 'unchecked':
            self.write_linkstat(linkstat)
            return
        if status == 'working' and info == 'old':
            self.write_linkstat(linkstat)
            return
        if lineno:
            logger.info('(%16s: line %4d) ', docname, lineno, nonl=True)
        if status == 'ignored':
            if info:
                logger.info(darkgray('-ignored- ') + uri + ': ' + info)
            else:
                logger.info(darkgray('-ignored- ') + uri)
            self.write_linkstat(linkstat)
        elif status == 'local':
            logger.info(darkgray('-local-   ') + uri)
            self.write_entry('local', docname, filename, lineno, uri)
            self.write_linkstat(linkstat)
        elif status == 'working':
            logger.info(darkgreen('ok        ') + uri + info)
            self.write_linkstat(linkstat)
        elif status == 'broken':
            if self.app.quiet or self.app.warningiserror:
                logger.warning(__('broken link: %s (%s)'), uri, info,
                               location=(filename, lineno))
            else:
                logger.info(red('broken    ') + uri + red(' - ' + info))
            self.write_entry('broken', docname, filename, lineno, uri + ': ' + info)
            self.write_linkstat(linkstat)
        elif status == 'redirected':
            try:
                text, color = {
                    301: ('permanently', purple),
                    302: ('with Found', purple),
                    303: ('with See Other', purple),
                    307: ('temporarily', turquoise),
                    308: ('permanently', purple),
                }[code]
            except KeyError:
                text, color = ('with unknown code', purple)
            linkstat['text'] = text
            logger.info(color('redirect  ') + uri + color(' - ' + text + ' to ' + info))
            self.write_entry('redirected ' + text, docname, filename,
                             lineno, uri + ' to ' + info)
            self.write_linkstat(linkstat)
        else:
            raise ValueError("Unknown status %s." % status)

    def write_entry(self, what: str, docname: str, filename: str, line: int,
                    uri: str) -> None:
        self.txt_outfile.write("%s:%s: [%s] %s\n" % (filename, line, what, uri))
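        # One line per reported link in output.txt, e.g. (hypothetical values):
        #     intro.rst:42: [broken] https://www.example.com/missing: 404 Client Error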

    def write_linkstat(self, data: dict) -> None:
        self.json_outfile.write(json.dumps(data))
        self.json_outfile.write('\n')
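        # Each line of output.json is one JSON object mirroring the linkstat
        # dict built in process_result(), e.g. (hypothetical values):
        #     {"filename": "intro.rst", "lineno": 42, "status": "broken",
        #      "code": 0, "uri": "https://www.example.com/missing", "info": "404 Client Error"}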

    def finish(self) -> None:
        checker = HyperlinkAvailabilityChecker(self.env, self.config, self)
        logger.info('')

        with open(path.join(self.outdir, 'output.txt'), 'w') as self.txt_outfile,\
             open(path.join(self.outdir, 'output.json'), 'w') as self.json_outfile:
            for result in checker.check(self.hyperlinks):
                self.process_result(result)

        if self._broken:
            self.app.statuscode = 1


class HyperlinkAvailabilityChecker:
    def __init__(self, env: BuildEnvironment, config: Config,
                 builder: CheckExternalLinksBuilder = None) -> None:
        # Warning: the builder argument will be removed in Sphinx 5.0.
        # Don't use it from extensions.
        # tag: RemovedInSphinx50Warning
        self.builder = builder
        self.config = config
        self.env = env
        self.rate_limits = {}  # type: Dict[str, RateLimit]
        self.workers = []  # type: List[Thread]

        self.to_ignore = [re.compile(x) for x in self.config.linkcheck_ignore]

        if builder:
            self.rqueue = builder._rqueue
            self.wqueue = builder._wqueue
        else:
            self.rqueue = queue.Queue()
            self.wqueue = queue.PriorityQueue()

    def invoke_threads(self) -> None:
        for i in range(self.config.linkcheck_workers):
            thread = HyperlinkAvailabilityCheckWorker(self.env, self.config,
                                                      self.rqueue, self.wqueue,
                                                      self.rate_limits, self.builder)
            thread.start()
            self.workers.append(thread)

    def shutdown_threads(self) -> None:
        self.wqueue.join()
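        # A CheckRequest whose hyperlink is None is the shutdown sentinel:
        # queue one per worker so every run() loop terminates.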
        for worker in self.workers:
            self.wqueue.put(CheckRequest(CHECK_IMMEDIATELY, None), False)

    def check(self, hyperlinks: Dict[str, Hyperlink]) -> Generator[CheckResult, None, None]:
        self.invoke_threads()

        total_links = 0
        for hyperlink in hyperlinks.values():
            if self.is_ignored_uri(hyperlink.uri):
                yield CheckResult(hyperlink.uri, hyperlink.docname, hyperlink.lineno,
                                  'ignored', '', 0)
            else:
                self.wqueue.put(CheckRequest(CHECK_IMMEDIATELY, hyperlink), False)
                total_links += 1

        done = 0
        while done < total_links:
            yield self.rqueue.get()
            done += 1

        self.shutdown_threads()

    def is_ignored_uri(self, uri: str) -> bool:
        return any(pat.match(uri) for pat in self.to_ignore)


class HyperlinkAvailabilityCheckWorker(Thread):
    """A worker class for checking the availability of hyperlinks."""

    def __init__(self, env: BuildEnvironment, config: Config, rqueue: queue.Queue,
                 wqueue: queue.Queue, rate_limits: Dict[str, RateLimit],
                 builder: CheckExternalLinksBuilder = None) -> None:
        # Warning: the builder argument will be removed in Sphinx 5.0.
        # Don't use it from extensions.
        # tag: RemovedInSphinx50Warning
        self.config = config
        self.env = env
        self.rate_limits = rate_limits
        self.rqueue = rqueue
        self.wqueue = wqueue

        self.anchors_ignore = [re.compile(x)
                               for x in self.config.linkcheck_anchors_ignore]
        self.auth = [(re.compile(pattern), auth_info) for pattern, auth_info
                     in self.config.linkcheck_auth]

        if builder:
            # if given, share the builder's caches of check results
            self._good = builder._good
            self._broken = builder._broken
            self._redirected = builder._redirected
        else:
            # only for compatibility. Will be removed in Sphinx-5.0
            self._good = set()
            self._broken = {}
            self._redirected = {}

        super().__init__(daemon=True)

    def run(self) -> None:
        kwargs = {}
        if self.config.linkcheck_timeout:
            kwargs['timeout'] = self.config.linkcheck_timeout

        def get_request_headers() -> Dict:
            url = urlparse(uri)
            candidates = ["%s://%s" % (url.scheme, url.netloc),
                          "%s://%s/" % (url.scheme, url.netloc),
                          uri,
                          "*"]

            for u in candidates:
                if u in self.config.linkcheck_request_headers:
                    headers = dict(DEFAULT_REQUEST_HEADERS)
                    headers.update(self.config.linkcheck_request_headers[u])
                    return headers

            return {}

        def check_uri() -> Tuple[str, str, int]:
            # split off anchor
            if '#' in uri:
                req_url, anchor = uri.split('#', 1)
                for rex in self.anchors_ignore:
                    if rex.match(anchor):
                        anchor = None
                        break
            else:
                req_url = uri
                anchor = None

            # handle non-ASCII URIs
            try:
                req_url.encode('ascii')
            except UnicodeError:
                req_url = encode_uri(req_url)

            # Get auth info, if any
            for pattern, auth_info in self.auth:
                if pattern.match(uri):
                    break
            else:
                auth_info = None

            # update request headers for the URL
            kwargs['headers'] = get_request_headers()

            try:
                if anchor and self.config.linkcheck_anchors:
                    # Read the whole document and see if #anchor exists
                    response = requests.get(req_url, stream=True, config=self.config,
                                            auth=auth_info, **kwargs)
                    response.raise_for_status()
                    found = check_anchor(response, unquote(anchor))

                    if not found:
                        raise Exception(__("Anchor '%s' not found") % anchor)
                else:
                    try:
                        # try a HEAD request first, which should be easier on
                        # the server and the network
                        response = requests.head(req_url, allow_redirects=True,
                                                 config=self.config, auth=auth_info,
                                                 **kwargs)
                        response.raise_for_status()
                    except (HTTPError, TooManyRedirects) as err:
                        if isinstance(err, HTTPError) and err.response.status_code == 429:
                            raise
                        # retry with GET request if that fails, some servers
                        # don't like HEAD requests.
                        response = requests.get(req_url, stream=True,
                                                config=self.config,
                                                auth=auth_info, **kwargs)
                        response.raise_for_status()
            except HTTPError as err:
                if err.response.status_code == 401:
                    # We'll take "Unauthorized" as working.
                    return 'working', ' - unauthorized', 0
                elif err.response.status_code == 429:
                    next_check = self.limit_rate(err.response)
                    if next_check is not None:
                        self.wqueue.put(CheckRequest(next_check, hyperlink), False)
                        return 'rate-limited', '', 0
                    return 'broken', str(err), 0
                elif err.response.status_code == 503:
                    # We'll take "Service Unavailable" as ignored.
                    return 'ignored', str(err), 0
                else:
                    return 'broken', str(err), 0
            except Exception as err:
                return 'broken', str(err), 0
            else:
                netloc = urlparse(req_url).netloc
                try:
                    del self.rate_limits[netloc]
                except KeyError:
                    pass
            if response.url.rstrip('/') == req_url.rstrip('/'):
                return 'working', '', 0
            else:
                new_url = response.url
                if anchor:
                    new_url += '#' + anchor
                # history contains any redirects, get last
                if response.history:
                    code = response.history[-1].status_code
                    return 'redirected', new_url, code
                else:
                    return 'redirected', new_url, 0

        def check(docname: str) -> Tuple[str, str, int]:
            # check for various conditions without bothering the network
            if len(uri) == 0 or uri.startswith(('#', 'mailto:', 'tel:')):
                return 'unchecked', '', 0
            elif not uri.startswith(('http:', 'https:')):
                if uri_re.match(uri):
                    # unsupported URI schemes (e.g. ftp)
                    return 'unchecked', '', 0
                else:
                    srcdir = path.dirname(self.env.doc2path(docname))
                    if path.exists(path.join(srcdir, uri)):
                        return 'working', '', 0
                    else:
                        self._broken[uri] = ''
                        return 'broken', '', 0
            elif uri in self._good:
                return 'working', 'old', 0
            elif uri in self._broken:
                return 'broken', self._broken[uri], 0
            elif uri in self._redirected:
                return 'redirected', self._redirected[uri][0], self._redirected[uri][1]

            # need to actually check the URI
            for _ in range(self.config.linkcheck_retries):
                status, info, code = check_uri()
                if status != "broken":
                    break

            if status == "working":
                self._good.add(uri)
            elif status == "broken":
                self._broken[uri] = info
            elif status == "redirected":
                self._redirected[uri] = (info, code)

            return (status, info, code)

        while True:
            check_request = self.wqueue.get()
            try:
                next_check, hyperlink = check_request
                if hyperlink is None:
                    break

                uri, docname, lineno = hyperlink
            except ValueError:
                # old-style check_request (support will be removed in Sphinx 5.0)
                next_check, uri, docname, lineno = check_request

            if uri is None:
                break
            netloc = urlparse(uri).netloc
            try:
                # Refresh rate limit.
                # When there are many links in the queue, workers are all stuck waiting
                # for responses, but the builder keeps queuing. Links in the queue may
                # have been queued before rate limits were discovered.
                next_check = self.rate_limits[netloc].next_check
            except KeyError:
                pass
            if next_check > time.time():
                # Sleep before putting message back in the queue to avoid
                # waking up other threads.
                time.sleep(QUEUE_POLL_SECS)
                self.wqueue.put(CheckRequest(next_check, hyperlink), False)
                self.wqueue.task_done()
                continue
            status, info, code = check(docname)
            if status == 'rate-limited':
                logger.info(darkgray('-rate limited-   ') + uri + darkgray(' | sleeping...'))
            else:
                self.rqueue.put((uri, docname, lineno, status, info, code))
            self.wqueue.task_done()

    def limit_rate(self, response: Response) -> Optional[float]:
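        # Back-off sketch (when the server sends no usable Retry-After header):
        # start at DEFAULT_DELAY seconds per host, double on each further 429,
        # clamp once to linkcheck_rate_limit_timeout, then return None so the
        # link is reported as broken.  With the defaults (60s start, 300s cap)
        # that means retries after 60, 120, 240 and 300 seconds before giving up.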
        next_check = None
        retry_after = response.headers.get("Retry-After")
        if retry_after:
            try:
                # Integer: time to wait before next attempt.
                delay = float(retry_after)
            except ValueError:
                try:
                    # An HTTP-date: time of next attempt.
                    until = parsedate_to_datetime(retry_after)
                except (TypeError, ValueError):
                    # TypeError: Invalid date format.
                    # ValueError: Invalid date, e.g. Oct 52th.
                    pass
                else:
                    next_check = datetime.timestamp(until)
                    delay = (until - datetime.now(timezone.utc)).total_seconds()
            else:
                next_check = time.time() + delay
        netloc = urlparse(response.url).netloc
        if next_check is None:
            max_delay = self.config.linkcheck_rate_limit_timeout
            try:
                rate_limit = self.rate_limits[netloc]
            except KeyError:
                delay = DEFAULT_DELAY
            else:
                last_wait_time = rate_limit.delay
                delay = 2.0 * last_wait_time
                if delay > max_delay and last_wait_time < max_delay:
                    delay = max_delay
            if delay > max_delay:
                return None
            next_check = time.time() + delay
        self.rate_limits[netloc] = RateLimit(delay, next_check)
        return next_check


class HyperlinkCollector(SphinxPostTransform):
    builders = ('linkcheck',)
    default_priority = 800

    def run(self, **kwargs: Any) -> None:
        builder = cast(CheckExternalLinksBuilder, self.app.builder)
        hyperlinks = builder.hyperlinks
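        # ``hyperlinks`` is keyed by URI, so a link that occurs in several
        # documents is only queued (and checked) once; the first occurrence
        # determines the reported docname/lineno.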

        # reference nodes
        for refnode in self.document.traverse(nodes.reference):
            if 'refuri' not in refnode:
                continue
            uri = refnode['refuri']
            lineno = get_node_line(refnode)
            uri_info = Hyperlink(uri, self.env.docname, lineno)
            if uri not in hyperlinks:
                hyperlinks[uri] = uri_info

        # image nodes
        for imgnode in self.document.traverse(nodes.image):
            uri = imgnode['candidates'].get('?')
            if uri and '://' in uri:
                lineno = get_node_line(imgnode)
                uri_info = Hyperlink(uri, self.env.docname, lineno)
                if uri not in hyperlinks:
                    hyperlinks[uri] = uri_info


def setup(app: Sphinx) -> Dict[str, Any]:
    app.add_builder(CheckExternalLinksBuilder)
    app.add_post_transform(HyperlinkCollector)

    app.add_config_value('linkcheck_ignore', [], None)
    app.add_config_value('linkcheck_auth', [], None)
    app.add_config_value('linkcheck_request_headers', {}, None)
    app.add_config_value('linkcheck_retries', 1, None)
    app.add_config_value('linkcheck_timeout', None, None, [int])
    app.add_config_value('linkcheck_workers', 5, None)
    app.add_config_value('linkcheck_anchors', True, None)
    # Anchors starting with ! are ignored since they are
    # commonly used for dynamic pages
    app.add_config_value('linkcheck_anchors_ignore', ["^!"], None)
    app.add_config_value('linkcheck_rate_limit_timeout', 300.0, None)
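    # A hypothetical conf.py tweaking a few of the options registered above
    # (values are only illustrative):
    #
    #     linkcheck_ignore = [r'https://localhost:\d+/']
    #     linkcheck_timeout = 5
    #     linkcheck_retries = 2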

    return {
        'version': 'builtin',
        'parallel_read_safe': True,
        'parallel_write_safe': True,
    }