1#! /usr/bin/env python3
2
3# run with python generate-domains-blocklist.py > list.txt.tmp && mv -f list.txt.tmp list
4
5from __future__ import print_function
6
7import argparse
8import re
9import sys
10import fnmatch
11
12try:
13    import urllib2 as urllib
14
15    URLLIB_NEW = False
16except (ImportError, ModuleNotFoundError):
17    import urllib.request as urllib
18    from urllib.request import Request
19
20    URLLIB_NEW = True
21
22
# Progress and error messages both default to stderr so that the generated
# blocklist on stdout stays clean; log_info is redirected to stdout later
# when the blocklist is written to a file instead.
log_info = log_err = sys.stderr
25
26
def parse_trusted_list(content):
    """Parse a locally-maintained ("trusted") list.

    Entries may be bare names, glob patterns, or names followed by an
    optional "@..." time-restriction label.

    Returns a tuple (names, time_restrictions, globs) where
    time_restrictions maps a name to its "@..." label and globs is the
    subset of names that are glob patterns.
    """
    comment_rx = re.compile(r"^(#|$)")
    inline_comment_rx = re.compile(r"\s*#\s*[a-z0-9-].*$")
    trusted_rx = re.compile(r"^([*a-z0-9.-]+)\s*(@\S+)?$")
    timed_rx = re.compile(r".+\s*@\S+$")

    names = set()
    time_restrictions = {}
    globs = set()
    for raw in content.splitlines():
        entry = raw.strip().lower()
        if comment_rx.match(entry):
            continue
        entry = inline_comment_rx.sub("", entry).strip()
        # Glob patterns (without a time label) are collected separately.
        if is_glob(entry) and not timed_rx.match(entry):
            globs.add(entry)
            names.add(entry)
            continue
        match = trusted_rx.match(entry)
        if match:
            name = match.group(1)
            names.add(name)
            restriction = match.group(2)
            if restriction:
                time_restrictions[name] = restriction
    return names, time_restrictions, globs
56
57
def parse_list(content, trusted=False):
    """Extract domain names from a fetched list in any supported format.

    Trusted (local) lists are delegated to parse_trusted_list(), which
    also understands globs and time restrictions.  For untrusted lists
    only plain names are extracted, so the returned time-restriction map
    and glob set are always empty.

    Returns a tuple (names, time_restrictions, globs).
    """
    if trusted:
        return parse_trusted_list(content)

    comment_rx = re.compile(r"^(#|$)")
    inline_comment_rx = re.compile(r"\s*#\s*[a-z0-9-].*$")
    # One pattern per supported source format: adblock-style ("||name^"),
    # plain names, hosts files, two CSV variants, and dnsmasq "address=/".
    patterns = [
        re.compile(
            r"^@*\|\|([a-z0-9][a-z0-9.-]*[.][a-z]{2,})\^?(\$(popup|third-party))?$"
        ),
        re.compile(r"^([a-z0-9][a-z0-9.-]*[.][a-z]{2,})$"),
        re.compile(
            r"^[0-9]{1,3}[.][0-9]{1,3}[.][0-9]{1,3}[.][0-9]{1,3}\s+([a-z0-9][a-z0-9.-]*[.][a-z]{2,})$"
        ),
        re.compile(r'^"[^"]+","([a-z0-9][a-z0-9.-]*[.][a-z]{2,})",'),
        re.compile(r"^([a-z0-9][a-z0-9.-]*[.][a-z]{2,}),.+,[0-9: /-]+,"),
        re.compile(r"^address=/([a-z0-9][a-z0-9.-]*[.][a-z]{2,})/."),
    ]

    names = set()
    for raw in content.splitlines():
        entry = raw.strip().lower()
        if comment_rx.match(entry):
            continue
        entry = inline_comment_rx.sub("", entry).strip()
        for rx in patterns:
            match = rx.match(entry)
            if match:
                names.add(match.group(1))
    return names, {}, set()
91
92
def print_restricted_name(output_fd, name, time_restrictions):
    """Write *name* with its time-restriction label to *output_fd*.

    Names that appear in the time-restricted list without an "@..." label
    are emitted as an explanatory comment instead of being blocked
    unconditionally.
    """
    try:
        label = time_restrictions[name]
    except KeyError:
        print(
            "# ignored: [{}] was in the time-restricted list, "
            "but without a time restriction label".format(name),
            file=output_fd,
            end="\n",
        )
    else:
        print("{}\t{}".format(name, label), file=output_fd, end="\n")
103
104
def load_from_url(url):
    """Fetch *url* and return a tuple (content, trusted).

    "file:" URLs are treated as trusted (their content is later parsed
    with parse_trusted_list and the HTTP status check is skipped).
    Raises Exception when the URL cannot be loaded or returns a non-200
    HTTP status code.
    """
    log_info.write("Loading data from [{}]\n".format(url))
    req = urllib.Request(url=url, headers={"User-Agent": "dnscrypt-proxy"})

    # urllib2 (Python 2) and urllib.request (Python 3) expose the scheme
    # differently.
    if URLLIB_NEW:
        req_type = req.type
    else:
        req_type = req.get_type()
    trusted = req_type == "file"

    try:
        response = urllib.urlopen(req, timeout=int(args.timeout))
    except urllib.URLError as err:
        raise Exception("[{}] could not be loaded: {}\n".format(url, err))
    # Always close the response, even when the status check below raises;
    # the original left the connection open.
    try:
        if trusted is False and response.getcode() != 200:
            raise Exception(
                "[{}] returned HTTP code {}\n".format(url, response.getcode())
            )
        content = response.read()
    finally:
        response.close()
    if URLLIB_NEW:
        content = content.decode("utf-8", errors="replace")

    return content, trusted
129
130
def name_cmp(name):
    """Sort key: the name with its dot-separated labels reversed.

    Sorting on this key groups a domain together with its subdomains.
    """
    return ".".join(reversed(name.split(".")))
135
136
def is_glob(pattern):
    """Return True if *pattern* should be treated as a glob pattern.

    "?" and "[" anywhere make a candidate glob.  "*" counts only when it
    is not the first character and is either not the last character or
    follows a dot (so a plain leading "*." wildcard entry is NOT treated
    as a glob here).  A candidate is confirmed by test-running fnmatch.
    """
    maybe_glob = False
    for i, c in enumerate(pattern):
        if c == "?" or c == "[":
            maybe_glob = True
        elif c == "*" and i != 0:
            if i < len(pattern) - 1 or pattern[i - 1] == ".":
                maybe_glob = True
    if maybe_glob:
        try:
            fnmatch.fnmatch("example", pattern)
            return True
        except Exception:
            # Malformed pattern: fall through and treat it as a plain name.
            # (Was a bare "except:", which also swallowed SystemExit and
            # KeyboardInterrupt.)
            pass
    return False
153
154
def covered_by_glob(globs, name):
    """Return True if *name* matches a pattern in *globs* other than itself.

    A name that is literally present in *globs* is its own entry and is
    not considered covered, so it still gets emitted once.
    """
    if name in globs:
        return False
    for glob in globs:
        try:
            if fnmatch.fnmatch(name, glob):
                return True
        except Exception:
            # Malformed pattern: skip it rather than abort the whole run.
            # (Was a bare "except:", which also swallowed SystemExit and
            # KeyboardInterrupt.)
            pass
    return False
165
166
def has_suffix(names, name):
    """Return True if any proper parent domain of *name* is in *names*.

    "www.example.com" is covered by "example.com" or "com", but a name is
    never considered a suffix of itself.
    """
    labels = name.split(".")
    for start in range(1, len(labels) + 1):
        if ".".join(labels[start:]) in names:
            return True
    return False
175
176
def allowlist_from_url(url):
    """Load the allowlist at *url* and return its set of names.

    An empty/None URL yields an empty set (no allowlist configured).
    """
    if not url:
        return set()
    content, trusted = load_from_url(url)
    names, _restrictions, _globs = parse_list(content, trusted)
    return names
184
185
def blocklists_from_config_file(
    file, allowlist, time_restricted_url, ignore_retrieval_failure, output_file
):
    """Build and write the unified blocklist.

    file -- config file path listing one blocklist source URL per line
    allowlist -- URL (or bare local path) of names to exclude
    time_restricted_url -- URL (or bare local path) of time-restricted names
    ignore_retrieval_failure -- keep going when a source cannot be fetched
    output_file -- destination path; writes to stdout when None
    """
    blocklists = {}
    allowed_names = set()
    all_names = set()
    unique_names = set()
    all_globs = set()

    # Load conf & blocklists
    with open(file) as fd:
        for line in fd:
            line = str.strip(line)
            if str.startswith(line, "#") or line == "":
                continue
            url = line
            try:
                content, trusted = load_from_url(url)
                names, _time_restrictions, globs = parse_list(content, trusted)
                blocklists[url] = names
                all_names |= names
                all_globs |= globs
            except Exception as e:
                log_err.write(str(e))
                if not ignore_retrieval_failure:
                    exit(1)

    # Time-based blocklist; a bare path (no scheme) means a local file.
    if time_restricted_url and not re.match(r"^[a-z0-9]+:", time_restricted_url):
        time_restricted_url = "file:" + time_restricted_url

    output_fd = sys.stdout
    if output_file:
        output_fd = open(output_file, "w")

    if time_restricted_url:
        time_restricted_content, _trusted = load_from_url(time_restricted_url)
        time_restricted_names, time_restrictions, _globs = parse_trusted_list(
            time_restricted_content
        )

        if time_restricted_names:
            print(
                "########## Time-based blocklist ##########\n", file=output_fd, end="\n"
            )
            for name in time_restricted_names:
                print_restricted_name(output_fd, name, time_restrictions)

        # Time restricted names should be allowed, or they could be always blocked
        allowed_names |= time_restricted_names

    # Allowed list; a bare path (no scheme) means a local file.
    if allowlist and not re.match(r"^[a-z0-9]+:", allowlist):
        allowlist = "file:" + allowlist

    allowed_names |= allowlist_from_url(allowlist)

    # Process blocklists: emit each source's names, minus duplicates,
    # glob-covered entries, and allowlisted entries.
    for url, names in blocklists.items():
        print(
            "\n\n########## Blocklist from {} ##########\n".format(url),
            file=output_fd,
            end="\n",
        )
        ignored, glob_ignored, allowed = 0, 0, 0
        list_names = list()
        for name in names:
            if covered_by_glob(all_globs, name):
                glob_ignored += 1
            elif has_suffix(all_names, name) or name in unique_names:
                # Covered by a parent domain, or already emitted earlier.
                ignored += 1
            elif has_suffix(allowed_names, name) or name in allowed_names:
                allowed += 1
            else:
                list_names.append(name)
                unique_names.add(name)

        # Sort on reversed labels so subdomains group with their parents.
        list_names.sort(key=name_cmp)
        if ignored:
            print("# Ignored duplicates: {}".format(ignored), file=output_fd, end="\n")
        if glob_ignored:
            print(
                "# Ignored due to overlapping local patterns: {}".format(glob_ignored),
                file=output_fd,
                end="\n",
            )
        if allowed:
            print(
                "# Ignored entries due to the allowlist: {}".format(allowed),
                file=output_fd,
                end="\n",
            )
        if ignored or glob_ignored or allowed:
            print(file=output_fd, end="\n")
        for name in list_names:
            print(name, file=output_fd, end="\n")

    # Only close a file we opened ourselves.  The original closed
    # unconditionally, which closed sys.stdout when no -o was given.
    if output_file:
        output_fd.close()
284
285
# ---- Command-line interface and entry point ----
argp = argparse.ArgumentParser(
    description="Create a unified blocklist from a set of local and remote files"
)
argp.add_argument(
    "-c",
    "--config",
    default="domains-blocklist.conf",
    help="file containing blocklist sources",
)
# Legacy spelling of -a/--allowlist; kept hidden and rejected below.
argp.add_argument("-w", "--whitelist", help=argparse.SUPPRESS)
argp.add_argument(
    "-a",
    "--allowlist",
    default="domains-allowlist.txt",
    help="file containing a set of names to exclude from the blocklist",
)
argp.add_argument(
    "-r",
    "--time-restricted",
    default="domains-time-restricted.txt",
    help="file containing a set of names to be time restricted",
)
argp.add_argument(
    "-i",
    "--ignore-retrieval-failure",
    action="store_true",
    help="generate list even if some urls couldn't be retrieved",
)
argp.add_argument(
    "-o",
    "--output-file",
    default=None,
    help="save generated blocklist to a text file with the provided file name",
)
argp.add_argument("-t", "--timeout", default=30, help="URL open timeout")

args = argp.parse_args()

whitelist = args.whitelist
if whitelist:
    # Refuse the renamed option with an explanation instead of silently
    # ignoring it.
    print(
        "The option to provide a set of names to exclude from the blocklist has been changed from -w to -a\n"
    )
    argp.print_help()
    exit(1)

conf = args.config
allowlist = args.allowlist
time_restricted = args.time_restricted
ignore_retrieval_failure = args.ignore_retrieval_failure
output_file = args.output_file
# When writing the blocklist to a file, progress messages can safely go
# to stdout without polluting the generated list.
if output_file:
    log_info = sys.stdout

blocklists_from_config_file(
    conf, allowlist, time_restricted, ignore_retrieval_failure, output_file
)
347