1# -*- coding: utf-8 -*- 2############################################################################ 3# 4# Copyright © 2014, 2015, 2016 OnlineGroups.net and Contributors. 5# All Rights Reserved. 6# 7# This software is subject to the provisions of the Zope Public License, 8# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution. 9# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED 10# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED 11# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS 12# FOR A PARTICULAR PURPOSE. 13# 14# Original software part of https://github.com/groupserver/gs.dmarc/ 15# 16# This has been modified from the original software. 17# 01/24/2017 Valimail Inc 18# Contact: Gene Shuman <gene@valimail.com> 19# 20############################################################################ 21from __future__ import absolute_import, unicode_literals, print_function 22from pkg_resources import resource_filename # Part of setuptools 23try: 24 # typing is needed by mypy, but is unused otherwise 25 from typing import Dict, Text # noqa: F401 26except ImportError: 27 pass 28from dns.resolver import (query, NXDOMAIN, NoAnswer, NoNameservers) 29try: 30 from publicsuffix2 import PublicSuffixList 31except ImportError: 32 # Fall back to deprecated publicsuffix if publicsuffix2 is not available 33 from publicsuffix import PublicSuffixList 34import sys 35 36def answer_to_dict(answer): 37 # type: (Text) -> Dict[unicode, unicode] 38 '''Turn the DNS DMARC answer into a dict of tag:value pairs.''' 39 a = answer.strip('"').strip(' ') 40 rawTags = [t.split('=') for t in a.split(';') if t] 41 retval = {t[0].strip(): t[1].strip() for t in rawTags} 42 return retval 43 44def dns_query(name, qtype='TXT'): 45 try: 46 return query(name, qtype) 47 except (NXDOMAIN, NoAnswer, NoNameservers): 48 return None 49 50def lookup_receiver_record(host, dnsfunc=dns_query): 51 # type: (str), dnsfunc(optional) -> Dict[unicode, unicode] 52 '''Lookup the reciever policy for a host 53 :param str host: The host to query. The *actual* host that is queried has 54 ``_dmarc.`` prepended to it. 55 :param dnsfunc. a function from domain names to txt records for DNS lookup 56 :returns: The DMARC receiver record for the host. If there is no published 57 record then None is returned. 58 :rtype: {tag => value} 59 ''' 60 61 dmarcHost = '_dmarc.{0}'.format(host) 62 63 answer = dnsfunc(dmarcHost) 64 65 # This is because dns_query returns a dns.resolver.Answer object while the 66 # test suite dnsfunc returns a string (which does not have quotes on it 67 # like the dns.resolver object. 68 if dnsfunc != dns_query: 69 if answer: 70 answer = ['"' + answer + '"'] 71 72 if not answer: 73 return {} 74 else: 75 # Check that v= field is the first one in the answer (which is in 76 # double quotes) as per Section 7.1 (5): 77 # In particular, the "v=DMARC1" tag is mandatory and MUST appear 78 # first in the list. Discard any that do not pass this test. 79 # http://tools.ietf.org/html/draft-kucherawy-dmarc-base-04#section-7.1 80 if str(answer[0])[:9] == '"v=DMARC1': 81 tags = answer_to_dict(str(answer[0])) 82 return tags 83 else: 84 return {} # maybe raise exception instead? 85 86 87def receiver_record(host, dnsfunc=dns_query): 88 # type: (str), dnsfunc(optional) -> (Dict[unicode, unicode], is_subdomain) 89 '''Get the DMARC receiver record for a host. 90 :param str host: The host to lookup. 91 :param dnsfunc. a function from domain names to txt records for DNS lookup 92 :returns: The DMARC reciever record for the host. 93 :rtype: A dict of {tag => value} results 94 95 The :func:`receiver_record` function looks up the DMARC reciever record 96 for ``host``. If the host does not have a pubished record 97 `the organizational domain`_ is determined. The DMARC record for the 98 organizational domain is queried 99 (if specified) or the overall record for the domain is returned. 100 ''' 101 hostSansDmarc = host if host[:7] != '_dmarc.' else host[7:] 102 103 retval = lookup_receiver_record(hostSansDmarc, dnsfunc) 104 if retval: 105 return (retval, False) 106 107 # lookup for org_domain 108 newHost = get_org_domain(host) 109 retval = lookup_receiver_record(newHost, dnsfunc) 110 111 return (retval, True) 112 113 114def get_org_domain(domain): 115 fn = get_suffix_list_file_name() 116 with open(fn) as suffixList: 117 psl = PublicSuffixList(suffixList) 118 return psl.get_public_suffix(domain) 119 120 121def get_suffix_list_file_name(): 122 # type: () -> Text 123 '''Get the file name for the public-suffix list data file 124 125 :returns: The filename for the datafile in this module. 126 :rtype: ``str``''' 127 # TODO: automatically update the suffix list data file 128 # <https://publicsuffix.org/list/effective_tld_names.dat> 129 130 if sys.version_info < (3, 0): 131 try: 132 from authheaders.findpsl import location 133 except ImportError: 134 location = resource_filename('authheaders', 'public_suffix_list.txt') 135 else: 136 try: 137 from authheaders.findpsl import location 138 except ModuleNotFoundError: 139 location = resource_filename('authheaders', 'public_suffix_list.txt') 140 return location 141