1from __future__ import unicode_literals
2import argparse
3from codecs import open as codecs_open
4from functools import lru_cache
5# codecs_open = open
6from os.path import isabs
7import sys
8from typing import Dict, Type, Union, Tuple, List, Optional
9from urllib.parse import urlsplit, SplitResult
10
11from .base import BaseTLDSourceParser
12from .exceptions import (
13    TldBadUrl,
14    TldDomainNotFound,
15    TldImproperlyConfigured,
16    TldIOError,
17)
18from .helpers import project_dir
19from .trie import Trie
20from .registry import Registry
21from .result import Result
22
23__author__ = 'Artur Barseghyan'
24__copyright__ = '2013-2020 Artur Barseghyan'
25__license__ = 'MPL-1.1 OR GPL-2.0-only OR LGPL-2.1-or-later'
26__all__ = (
27    'BaseMozillaTLDSourceParser',
28    'get_fld',
29    'get_tld',
30    'get_tld_names',
31    'get_tld_names_container',
32    'is_tld',
33    'MozillaTLDSourceParser',
34    'parse_tld',
35    'pop_tld_names_container',
36    'process_url',
37    'reset_tld_names',
38    'Result',
39    'tld_names',
40    'update_tld_names',
41    'update_tld_names_cli',
42    'update_tld_names_container',
43)
44
# Module-level cache of loaded TLD tries. Entries are written by
# ``update_tld_names_container`` and keyed by the ``local_path`` of the
# parser class that produced the trie.
tld_names: Dict[str, Trie] = {}
46
47
def get_tld_names_container() -> Dict[str, Trie]:
    """Return the module-level container holding all loaded TLD names.

    The very same dict object is handed out (no copy is made), so changes
    made by the caller are visible to the rest of the module.

    :return: The ``tld_names`` mapping.
    :rtype: dict
    """
    # Reading a module-level name needs no ``global`` declaration.
    return tld_names
56
57
def update_tld_names_container(tld_names_local_path: str,
                               trie_obj: Trie) -> None:
    """Store a trie in the TLD names container.

    An entry already present under the same key is overwritten.

    :param tld_names_local_path: Key under which the trie is stored.
    :param trie_obj: Trie to store.
    :return:
    """
    # Item assignment mutates the existing dict in place; the module-level
    # name itself is never rebound here.
    container = tld_names
    container[tld_names_local_path] = trie_obj
69
70
def pop_tld_names_container(tld_names_local_path: str) -> None:
    """Drop a single entry from the TLD names container.

    Asking to remove a key that is not present is not an error.

    :param tld_names_local_path: Key of the entry to remove.
    :return:
    """
    try:
        del tld_names[tld_names_local_path]
    except KeyError:
        # Nothing stored under that key; removal is a no-op.
        pass
79
80
@lru_cache(maxsize=128, typed=True)
def update_tld_names(
    fail_silently: bool = False,
    parser_uid: str = None
) -> bool:
    """Refresh the TLD names of one registered parser or of all of them.

    The function is wrapped in ``lru_cache``, so a repeated call with the
    same arguments returns the remembered result instead of running the
    update again.

    :param fail_silently: Forwarded to each parser's ``update_tld_names``.
    :param parser_uid: UID of the single parser to update. When empty, every
        parser in the registry is considered.
    :return: True if every update that ran reported success (also True when
        no update ran at all), False otherwise.
    """
    if parser_uid:
        candidates = (Registry.get(parser_uid, None),)
    else:
        # Lazy on purpose: the registry is walked while the updates run.
        candidates = (registered for _uid, registered in Registry.items())

    # A list (not a generator) is built so that every parser is updated even
    # after an earlier one has reported failure.
    outcomes = [
        candidate.update_tld_names(fail_silently=fail_silently)
        for candidate in candidates
        # Only parsers that exist and declare a source URL can be updated.
        if candidate and candidate.source_url
    ]
    return all(outcomes)
108
109
def update_tld_names_cli() -> int:
    """Command-line front end for ``update_tld_names``.

    Reads an optional positional ``parser_uid`` and an optional
    ``--fail-silently`` flag from ``sys.argv``.

    ``update_tld_names`` answers True on success, whereas a process signals
    success with exit status 0, hence the inversion of the result.

    :return: 0 if the update succeeded, 1 otherwise.
    """
    cli = argparse.ArgumentParser(description='Update TLD names')
    cli.add_argument(
        'parser_uid',
        nargs='?',
        default=None,
        help="UID of the parser to update TLD names for.",
    )
    cli.add_argument(
        '--fail-silently',
        dest="fail_silently",
        default=False,
        action='store_true',
        help="Fail silently",
    )
    options = cli.parse_args(sys.argv[1:])

    succeeded = update_tld_names(
        parser_uid=options.parser_uid,
        fail_silently=options.fail_silently
    )
    return 0 if succeeded else 1
139
140
def get_tld_names(
    fail_silently: bool = False,
    retry_count: int = 0,
    parser_class: Type[BaseTLDSourceParser] = None
) -> Dict[str, Trie]:
    """Return the loaded TLD names, delegating the work to a parser class.

    :param fail_silently: Forwarded to the parser's ``get_tld_names``.
    :param retry_count: Forwarded to the parser's ``get_tld_names``.
    :param parser_class: Parser class to delegate to; falls back to
        ``MozillaTLDSourceParser`` when not given.
    :type fail_silently: bool
    :type retry_count: int
    :type parser_class: BaseTLDSourceParser
    :return: Whatever the parser's ``get_tld_names`` returns.
    """
    selected_parser = parser_class or MozillaTLDSourceParser
    return selected_parser.get_tld_names(
        retry_count=retry_count,
        fail_silently=fail_silently
    )
166
167
168# **************************************************************************
169# **************************** Parser classes ******************************
170# **************************************************************************
171
class BaseMozillaTLDSourceParser(BaseTLDSourceParser):
    """Base parser for TLD lists stored in the Mozilla ``.dat`` text format.

    Subclasses supply ``local_path`` (and the other parser attributes);
    this class reads the file at that path into a ``Trie``.
    """

    @classmethod
    def get_tld_names(
        cls,
        fail_silently: bool = False,
        retry_count: int = 0
    ) -> Optional[Dict[str, Trie]]:
        """Load the TLD names of this parser into the module-level container.

        If the local file cannot be read, ``cls.update_tld_names`` is called
        to fetch it and loading is attempted once more.

        :param fail_silently: If set to True, None is returned instead of
            raising when loading fails.
        :param retry_count: Number of retries already made. Values greater
            than 1 abort the attempt in order to avoid infinite recursion.
        :return: The module-level ``tld_names`` container (which then holds
            an entry for ``cls.local_path``), or None on silent failure.
        :raises TldIOError: If the retry limit is exceeded and
            ``fail_silently`` is False.
        """
        if retry_count > 1:
            if fail_silently:
                return None
            raise TldIOError

        # Capture the container once; entries are added to it in place.
        _tld_names = tld_names

        # If already loaded, return
        if _tld_names.get(cls.local_path) is not None:
            return _tld_names

        try:
            # Relative paths are resolved through ``project_dir``.
            if isabs(cls.local_path):
                local_path = cls.local_path
            else:
                local_path = project_dir(cls.local_path)

            trie = Trie()
            trie_add = trie.add  # Performance opt
            private_section = False

            # ``with`` guarantees the file is closed on every path, including
            # the case where reading fails half way through.
            with codecs_open(local_path, 'r', encoding='utf8') as local_file:
                for line in local_file:
                    # Everything from this marker on is flagged as private.
                    if '===BEGIN PRIVATE DOMAINS===' in line:
                        private_section = True

                    # Puny code TLD names are listed inside comments; take
                    # the name itself (the second token of the line).
                    if '// xn--' in line:
                        line = line.split()[1]

                    entry = line.strip()
                    # Skip blank lines (whatever the line ending) and
                    # comments, so that no empty rule ends up in the trie.
                    if not entry or entry[0] == '/':
                        continue

                    trie_add(entry, private=private_section)

            update_tld_names_container(cls.local_path, trie)
        except IOError:
            # Grab the file
            cls.update_tld_names(
                fail_silently=fail_silently
            )
            # Run again, with an incremented ``retry_count`` in order to
            # avoid infinite loops
            return cls.get_tld_names(
                fail_silently=fail_silently,
                retry_count=retry_count + 1
            )
        except Exception:
            if fail_silently:
                return None
            raise

        return _tld_names
262
263
class MozillaTLDSourceParser(BaseMozillaTLDSourceParser):
    """Mozilla TLD source.

    Used as the default ``parser_class`` by the functions in this module.
    """

    # Identifier of this parser; presumably the key under which it is
    # registered in ``Registry`` -- TODO confirm against the base class.
    uid: str = 'mozilla'
    # Location the TLD list is fetched from. ``update_tld_names`` skips
    # parsers whose ``source_url`` is empty.
    source_url: str = 'https://publicsuffix.org/list/public_suffix_list.dat'
    # Local copy of the list. Resolved through ``project_dir`` when not
    # absolute; also used as the key in the ``tld_names`` container.
    local_path: str = 'res/effective_tld_names.dat.txt'
270
271# **************************************************************************
272# **************************** Core functions ******************************
273# **************************************************************************
274
275
def process_url(
    url: str,
    fail_silently: bool = False,
    fix_protocol: bool = False,
    search_public: bool = True,
    search_private: bool = True,
    parser_class: Type[BaseTLDSourceParser] = MozillaTLDSourceParser
) -> Union[Tuple[List[str], int, SplitResult], Tuple[None, None, SplitResult]]:
    """Split the host name of a URL and locate where its TLD starts.

    :param url: URL to process. A string is lower-cased before being parsed;
        a ``SplitResult`` is also accepted and used as is.
    :param fail_silently: If set to True, failures are reported through the
        return value instead of an exception.
    :param fix_protocol: If set to True, ``https://`` is prepended to string
        URLs that do not start with ``//``, ``http://`` or ``https://``.
    :param search_public: If set to True, public TLDs may match.
    :param search_private: If set to True, private TLDs may match.
    :param parser_class: Parser class providing the TLD names.
    :return: Tuple ``(domain_parts, non_zero_i, parsed_url)``.
        ``domain_parts`` is the host name split on dots. ``non_zero_i`` is
        the index in ``domain_parts`` at which the TLD starts, or -1 when
        the whole host name is the TLD. On silent failure the first two
        items are None.
    :raises TldImproperlyConfigured: If both ``search_public`` and
        ``search_private`` are False.
    :raises TldBadUrl: If the URL has no host name.
    :raises TldIOError: If the TLD names are not available.
    :raises TldDomainNotFound: If no allowed TLD matches the host name.
    """
    if not (search_public or search_private):
        raise TldImproperlyConfigured(
            "Either `search_public` or `search_private` (or both) shall be "
            "set to True."
        )

    # Init
    _tld_names = get_tld_names(
        fail_silently=fail_silently,
        parser_class=parser_class
    )

    if not isinstance(url, SplitResult):
        url = url.lower()

        if (
            fix_protocol and not url.startswith(('//', 'http://', 'https://'))
        ):
            url = f'https://{url}'

        # Get parsed URL as we might need it later
        parsed_url = urlsplit(url)
    else:
        parsed_url = url

    # Get (sub) domain name
    domain_name = parsed_url.hostname

    if not domain_name:
        if fail_silently:
            return None, None, parsed_url
        raise TldBadUrl(url=url)

    # Loading the names may itself have failed silently and produced None.
    # Report that as a regular failure rather than crashing on the lookup
    # below.
    if _tld_names is None:
        if fail_silently:
            return None, None, parsed_url
        raise TldIOError

    # This will correctly handle dots at the end of domain name in URLs like
    # https://github.com............/barseghyanartur/tld/
    domain_name = domain_name.rstrip('.')

    domain_parts = domain_name.split('.')
    tld_names_local_path = parser_class.local_path

    # Now we query our Trie iterating on the domain parts in reverse order
    node = _tld_names[tld_names_local_path].root
    current_length = 0
    tld_length = 0
    match = None
    for part in reversed(domain_parts):
        # Cannot go deeper
        if node.children is None:
            break

        # Exception
        if part == node.exception:
            break

        child = node.children.get(part)

        # Wildcards
        if child is None:
            child = node.children.get('*')

        # If the current part is not in current node's children, we can stop
        if child is None:
            break

        # Else we move deeper and increment our tld offset
        current_length += 1
        node = child

        # Remember the deepest leaf seen so far: it is the longest match.
        if node.leaf:
            tld_length = current_length
            match = node

    # Checking the node we finished on is a leaf and is one we allow
    if (
        (match is None) or
        (not match.leaf) or
        (not search_public and not match.private) or
        (not search_private and match.private)
    ):
        if fail_silently:
            return None, None, parsed_url
        raise TldDomainNotFound(domain_name=domain_name)

    len_domain_parts = len(domain_parts)
    if len_domain_parts == tld_length:
        non_zero_i = -1  # hostname = tld
    else:
        non_zero_i = max(1, len_domain_parts - tld_length)

    return domain_parts, non_zero_i, parsed_url
389
390
def get_fld(
    url: str,
    fail_silently: bool = False,
    fix_protocol: bool = False,
    search_public: bool = True,
    search_private: bool = True,
    parser_class: Type[BaseTLDSourceParser] = MozillaTLDSourceParser,
    **kwargs
) -> Optional[str]:
    """Extract the first level domain.

    The first level domain is the TLD together with the single label that
    precedes it. When the host name consists of the TLD only, the host name
    itself is returned. May throw ``TldBadUrl`` or ``TldDomainNotFound``
    exceptions if there's bad URL provided or no TLD match found
    respectively.

    :param url: URL to get the first level domain from.
    :param fail_silently: If set to True, no exceptions are raised and None
        is returned on failure.
    :param fix_protocol: If set to True, ``https://`` is prepended to URLs
        given without a protocol.
    :param search_public: If set to True, search in public domains.
    :param search_private: If set to True, search in private domains.
    :param parser_class: Parser class providing the TLD names.
    :param kwargs: Accepted only to reject the removed ``as_object``
        argument with a clear error.
    :type url: str
    :type fail_silently: bool
    :type fix_protocol: bool
    :type search_public: bool
    :type search_private: bool
    :return: String with the first level domain; None on failure.
    :rtype: str
    :raises TldImproperlyConfigured: If ``as_object`` is passed.
    """
    if 'as_object' in kwargs:
        raise TldImproperlyConfigured(
            "`as_object` argument is deprecated for `get_fld`. Use `get_tld` "
            "instead."
        )

    labels, tld_start, split_url = process_url(
        url=url,
        fail_silently=fail_silently,
        fix_protocol=fix_protocol,
        search_public=search_public,
        search_private=search_private,
        parser_class=parser_class
    )

    if labels is None:
        return None

    # ``tld_start`` is None only together with ``labels``; the assertion
    # spells that out for the type checker.
    assert tld_start is not None

    if tld_start < 0:
        # hostname = tld
        return split_url.hostname

    # One label before the TLD, then everything up to the end.
    fld_start = tld_start - 1
    return ".".join(labels[fld_start:])
451
452
def get_tld(
    url: str,
    fail_silently: bool = False,
    as_object: bool = False,
    fix_protocol: bool = False,
    search_public: bool = True,
    search_private: bool = True,
    parser_class: Type[BaseTLDSourceParser] = MozillaTLDSourceParser
) -> Optional[Union[str, Result]]:
    """Extract the top level domain.

    May throw ``TldBadUrl`` or ``TldDomainNotFound`` exceptions if there's
    bad URL provided or no TLD match found respectively.

    :param url: URL to get top level domain from.
    :param fail_silently: If set to True, no exceptions are raised and None
        is returned on failure.
    :param as_object: If set to True, a ``Result`` object built from the
        subdomain, domain, TLD and parsed URL is returned instead of a
        string.
    :param fix_protocol: If set to True, ``https://`` is prepended to URLs
        given without a protocol.
    :param search_public: If set to True, search in public domains.
    :param search_private: If set to True, search in private domains.
    :param parser_class: Parser class providing the TLD names.
    :type url: str
    :type fail_silently: bool
    :type as_object: bool
    :type fix_protocol: bool
    :type search_public: bool
    :type search_private: bool
    :return: String with top level domain (if ``as_object`` argument
        is set to False) or a ``Result`` object (if ``as_object``
        argument is set to True); returns None on failure.
    :rtype: str
    """
    labels, tld_start, split_url = process_url(
        url=url,
        fail_silently=fail_silently,
        fix_protocol=fix_protocol,
        search_public=search_public,
        search_private=search_private,
        parser_class=parser_class
    )

    if labels is None:
        return None

    # ``tld_start`` is None only together with ``labels``; the assertion
    # spells that out for the type checker.
    assert tld_start is not None

    # A negative index signals that the host name is nothing but the TLD.
    host_is_tld = tld_start < 0

    if not as_object:
        if host_is_tld:
            return split_url.hostname
        return ".".join(labels[tld_start:])

    if host_is_tld:
        whole_host = split_url.hostname
        # ``process_url`` has already rejected URLs without a host name;
        # the assertion only narrows the Optional type.
        assert whole_host is not None, "No hostname in URL"
        return Result(
            subdomain="",
            domain="",
            tld=whole_host,
            parsed_url=split_url
        )

    domain_at = tld_start - 1
    return Result(
        subdomain=".".join(labels[:domain_at]),
        domain=".".join(labels[domain_at:tld_start]),
        tld=".".join(labels[tld_start:]),
        parsed_url=split_url
    )
533
534
def parse_tld(
    url: str,
    fail_silently: bool = False,
    fix_protocol: bool = False,
    search_public: bool = True,
    search_private: bool = True,
    parser_class: Type[BaseTLDSourceParser] = MozillaTLDSourceParser
) -> Union[Tuple[None, None, None], Tuple[str, str, str]]:
    """Break a URL down into its TLD, domain and subdomain.

    The TLD related exceptions raised by ``get_tld`` are swallowed here
    regardless of ``fail_silently``; failure is always reported as a tuple
    of three None values.

    :param url: URL to parse.
    :param fail_silently: Forwarded to ``get_tld``.
    :param fix_protocol: Forwarded to ``get_tld``.
    :param search_public: Forwarded to ``get_tld``.
    :param search_private: Forwarded to ``get_tld``.
    :param parser_class: Forwarded to ``get_tld``.
    :return: Tuple (tld, domain, subdomain)
    :rtype: tuple
    """
    nothing = (None, None, None)
    try:
        parsed = get_tld(
            url,
            fail_silently=fail_silently,
            as_object=True,
            fix_protocol=fix_protocol,
            search_public=search_public,
            search_private=search_private,
            parser_class=parser_class
        )
        if parsed is not None:
            return parsed.tld, parsed.domain, parsed.subdomain  # type: ignore
        return nothing
    except (
        TldBadUrl,
        TldDomainNotFound,
        TldImproperlyConfigured,
        TldIOError
    ):
        return nothing
578
579
def is_tld(
    value: str,
    search_public: bool = True,
    search_private: bool = True,
    parser_class: Type[BaseTLDSourceParser] = MozillaTLDSourceParser
) -> bool:
    """Tell whether the given value is itself a TLD.

    The value is run through ``get_tld`` (failing silently and with the
    protocol fixed) and compared with the TLD that comes back.

    :param value: Value to check.
    :param search_public: If set to True, search in public domains.
    :param search_private: If set to True, search in private domains.
    :param parser_class: Parser class providing the TLD names.
    :type value: str
    :type search_public: bool
    :type search_private: bool
    :return: True if the extracted TLD equals ``value``, False otherwise.
    :rtype: bool
    """
    lookup_options = {
        'fail_silently': True,
        'fix_protocol': True,
        'search_public': search_public,
        'search_private': search_private,
        'parser_class': parser_class,
    }
    extracted = get_tld(url=value, **lookup_options)
    return value == extracted
607
608
def reset_tld_names(tld_names_local_path: str = None) -> None:
    """Forget loaded TLD names.

    With ``tld_names_local_path`` given, only that entry is removed from
    ``tld_names``. Without it, ``tld_names`` is replaced by a new empty
    dict.

    :param tld_names_local_path: Key of the single entry to remove.
    :type tld_names_local_path: str
    :return:
    """
    global tld_names

    if tld_names_local_path:
        pop_tld_names_container(tld_names_local_path)
        return

    # Rebind rather than clear: a dict handed out earlier stays untouched.
    tld_names = {}
625