1#!/usr/bin/env python3
2
3
4""" ./QA/telegram.py - main QA script for telegram
5
6    This script performs a bunch of telegram tests under censored
7    network conditions and verifies that the measurement is consistent
8    with the expectations, by parsing the resulting JSONL. """
9
10import contextlib
11import json
12import os
13import shlex
14import subprocess
15import sys
16import time
17import urllib.parse
18
19sys.path.insert(0, ".")
20import common
21
22
23ALL_POP_IPS = (
24    "149.154.175.50",
25    "149.154.167.51",
26    "149.154.175.100",
27    "149.154.167.91",
28    "149.154.171.5",
29    "95.161.76.100",
30)
31
32
33def execute_jafar_and_return_validated_test_keys(ooni_exe, outfile, tag, args):
34    """ Executes jafar and returns the validated parsed test keys, or throws
35        an AssertionError if the result is not valid. """
36    tk = common.execute_jafar_and_miniooni(ooni_exe, outfile, "telegram", tag, args)
37    assert isinstance(tk["requests"], list)
38    assert len(tk["requests"]) > 0
39    for entry in tk["requests"]:
40        assert isinstance(entry, dict)
41        failure = entry["failure"]
42        assert isinstance(failure, str) or failure is None
43        assert isinstance(entry["request"], dict)
44        req = entry["request"]
45        common.check_maybe_binary_value(req["body"])
46        assert isinstance(req["headers"], dict)
47        for key, value in req["headers"].items():
48            assert isinstance(key, str)
49            common.check_maybe_binary_value(value)
50        assert isinstance(req["method"], str)
51        assert isinstance(entry["response"], dict)
52        resp = entry["response"]
53        common.check_maybe_binary_value(resp["body"])
54        assert isinstance(resp["code"], int)
55        if resp["headers"] is not None:
56            for key, value in resp["headers"].items():
57                assert isinstance(key, str)
58                common.check_maybe_binary_value(value)
59    assert isinstance(tk["tcp_connect"], list)
60    assert len(tk["tcp_connect"]) > 0
61    for entry in tk["tcp_connect"]:
62        assert isinstance(entry, dict)
63        assert isinstance(entry["ip"], str)
64        assert isinstance(entry["port"], int)
65        assert isinstance(entry["status"], dict)
66        failure = entry["status"]["failure"]
67        success = entry["status"]["success"]
68        assert isinstance(failure, str) or failure is None
69        assert isinstance(success, bool)
70    return tk
71
72
73def args_for_blocking_all_pop_ips():
74    """ Returns the arguments useful for blocking all POPs IPs """
75    args = []
76    for ip in ALL_POP_IPS:
77        args.append("-iptables-reset-ip")
78        args.append(ip)
79    return args
80
81
82def args_for_blocking_web_telegram_org_http():
83    """ Returns arguments for blocking web.telegram.org over http """
84    return ["-iptables-reset-keyword", "Host: web.telegram.org"]
85
86
87def args_for_blocking_web_telegram_org_https():
88    """ Returns arguments for blocking web.telegram.org over https """
89    #
90    #  00 00          <SNI extension ID>
91    #  00 15          <full extension length>
92    #  00 13          <first entry length>
93    #  00             <DNS hostname type>
94    #  00 10          <string length>
95    #  77 65 ... 67   web.telegram.org
96    #
97    return [
98        "-iptables-reset-keyword-hex",
99        "|00 00 00 15 00 13 00 00 10 77 65 62 2e 74 65 6c 65 67 72 61 6d 2e 6f 72 67|",
100    ]
101
102
103def telegram_block_everything(ooni_exe, outfile):
104    """ Test case where everything we measure is blocked """
105    args = []
106    args.extend(args_for_blocking_all_pop_ips())
107    args.extend(args_for_blocking_web_telegram_org_https())
108    args.extend(args_for_blocking_web_telegram_org_http())
109    tk = execute_jafar_and_return_validated_test_keys(
110        ooni_exe, outfile, "telegram_block_everything", args,
111    )
112    assert tk["telegram_tcp_blocking"] == True
113    assert tk["telegram_http_blocking"] == True
114    assert tk["telegram_web_failure"] == "connection_reset"
115    assert tk["telegram_web_status"] == "blocked"
116
117
118def telegram_tcp_blocking_all(ooni_exe, outfile):
119    """ Test case where all POPs are TCP/IP blocked """
120    args = args_for_blocking_all_pop_ips()
121    tk = execute_jafar_and_return_validated_test_keys(
122        ooni_exe, outfile, "telegram_tcp_blocking_all", args
123    )
124    assert tk["telegram_tcp_blocking"] == True
125    assert tk["telegram_http_blocking"] == True
126    assert tk["telegram_web_failure"] == None
127    assert tk["telegram_web_status"] == "ok"
128
129
130def telegram_tcp_blocking_some(ooni_exe, outfile):
131    """ Test case where some POPs are TCP/IP blocked """
132    args = [
133        "-iptables-reset-ip",
134        ALL_POP_IPS[0],
135    ]
136    tk = execute_jafar_and_return_validated_test_keys(
137        ooni_exe, outfile, "telegram_tcp_blocking_some", args
138    )
139    assert tk["telegram_tcp_blocking"] == False
140    assert tk["telegram_http_blocking"] == False
141    assert tk["telegram_web_failure"] == None
142    assert tk["telegram_web_status"] == "ok"
143
144
145def telegram_http_blocking_all(ooni_exe, outfile):
146    """ Test case where all POPs are HTTP blocked """
147    args = []
148    for ip in ALL_POP_IPS:
149        args.append("-iptables-reset-keyword")
150        args.append(ip)
151    tk = execute_jafar_and_return_validated_test_keys(
152        ooni_exe, outfile, "telegram_http_blocking_all", args,
153    )
154    assert tk["telegram_tcp_blocking"] == False
155    assert tk["telegram_http_blocking"] == True
156    assert tk["telegram_web_failure"] == None
157    assert tk["telegram_web_status"] == "ok"
158
159
160def telegram_http_blocking_some(ooni_exe, outfile):
161    """ Test case where some POPs are HTTP blocked """
162    args = [
163        "-iptables-reset-keyword",
164        ALL_POP_IPS[0],
165    ]
166    tk = execute_jafar_and_return_validated_test_keys(
167        ooni_exe, outfile, "telegram_http_blocking_some", args,
168    )
169    assert tk["telegram_tcp_blocking"] == False
170    assert tk["telegram_http_blocking"] == False
171    assert tk["telegram_web_failure"] == None
172    assert tk["telegram_web_status"] == "ok"
173
174
175def telegram_web_failure_http(ooni_exe, outfile):
176    """ Test case where the web HTTP endpoint is blocked """
177    args = args_for_blocking_web_telegram_org_http()
178    tk = execute_jafar_and_return_validated_test_keys(
179        ooni_exe, outfile, "telegram_web_failure_http", args,
180    )
181    assert tk["telegram_tcp_blocking"] == False
182    assert tk["telegram_http_blocking"] == False
183    assert tk["telegram_web_failure"] == "connection_reset"
184    assert tk["telegram_web_status"] == "blocked"
185
186
187def telegram_web_failure_https(ooni_exe, outfile):
188    """ Test case where the web HTTPS endpoint is blocked """
189    args = args_for_blocking_web_telegram_org_https()
190    tk = execute_jafar_and_return_validated_test_keys(
191        ooni_exe, outfile, "telegram_web_failure_https", args,
192    )
193    assert tk["telegram_tcp_blocking"] == False
194    assert tk["telegram_http_blocking"] == False
195    assert tk["telegram_web_failure"] == "connection_reset"
196    assert tk["telegram_web_status"] == "blocked"
197
198
199def main():
200    if len(sys.argv) != 2:
201        sys.exit("usage: %s /path/to/ooniprobelegacy-like/binary" % sys.argv[0])
202    outfile = "telegram.jsonl"
203    ooni_exe = sys.argv[1]
204    tests = [
205        telegram_block_everything,
206        telegram_tcp_blocking_all,
207        telegram_tcp_blocking_some,
208        telegram_http_blocking_all,
209        telegram_http_blocking_some,
210        telegram_web_failure_http,
211        telegram_web_failure_https,
212    ]
213    for test in tests:
214        test(ooni_exe, outfile)
215        time.sleep(7)
216
217
218if __name__ == "__main__":
219    main()
220