1#!/usr/bin/env python3 2 3 4""" ./QA/telegram.py - main QA script for telegram 5 6 This script performs a bunch of telegram tests under censored 7 network conditions and verifies that the measurement is consistent 8 with the expectations, by parsing the resulting JSONL. """ 9 10import contextlib 11import json 12import os 13import shlex 14import subprocess 15import sys 16import time 17import urllib.parse 18 19sys.path.insert(0, ".") 20import common 21 22 23ALL_POP_IPS = ( 24 "149.154.175.50", 25 "149.154.167.51", 26 "149.154.175.100", 27 "149.154.167.91", 28 "149.154.171.5", 29 "95.161.76.100", 30) 31 32 33def execute_jafar_and_return_validated_test_keys(ooni_exe, outfile, tag, args): 34 """ Executes jafar and returns the validated parsed test keys, or throws 35 an AssertionError if the result is not valid. """ 36 tk = common.execute_jafar_and_miniooni(ooni_exe, outfile, "telegram", tag, args) 37 assert isinstance(tk["requests"], list) 38 assert len(tk["requests"]) > 0 39 for entry in tk["requests"]: 40 assert isinstance(entry, dict) 41 failure = entry["failure"] 42 assert isinstance(failure, str) or failure is None 43 assert isinstance(entry["request"], dict) 44 req = entry["request"] 45 common.check_maybe_binary_value(req["body"]) 46 assert isinstance(req["headers"], dict) 47 for key, value in req["headers"].items(): 48 assert isinstance(key, str) 49 common.check_maybe_binary_value(value) 50 assert isinstance(req["method"], str) 51 assert isinstance(entry["response"], dict) 52 resp = entry["response"] 53 common.check_maybe_binary_value(resp["body"]) 54 assert isinstance(resp["code"], int) 55 if resp["headers"] is not None: 56 for key, value in resp["headers"].items(): 57 assert isinstance(key, str) 58 common.check_maybe_binary_value(value) 59 assert isinstance(tk["tcp_connect"], list) 60 assert len(tk["tcp_connect"]) > 0 61 for entry in tk["tcp_connect"]: 62 assert isinstance(entry, dict) 63 assert isinstance(entry["ip"], str) 64 assert isinstance(entry["port"], int) 65 assert isinstance(entry["status"], dict) 66 failure = entry["status"]["failure"] 67 success = entry["status"]["success"] 68 assert isinstance(failure, str) or failure is None 69 assert isinstance(success, bool) 70 return tk 71 72 73def args_for_blocking_all_pop_ips(): 74 """ Returns the arguments useful for blocking all POPs IPs """ 75 args = [] 76 for ip in ALL_POP_IPS: 77 args.append("-iptables-reset-ip") 78 args.append(ip) 79 return args 80 81 82def args_for_blocking_web_telegram_org_http(): 83 """ Returns arguments for blocking web.telegram.org over http """ 84 return ["-iptables-reset-keyword", "Host: web.telegram.org"] 85 86 87def args_for_blocking_web_telegram_org_https(): 88 """ Returns arguments for blocking web.telegram.org over https """ 89 # 90 # 00 00 <SNI extension ID> 91 # 00 15 <full extension length> 92 # 00 13 <first entry length> 93 # 00 <DNS hostname type> 94 # 00 10 <string length> 95 # 77 65 ... 67 web.telegram.org 96 # 97 return [ 98 "-iptables-reset-keyword-hex", 99 "|00 00 00 15 00 13 00 00 10 77 65 62 2e 74 65 6c 65 67 72 61 6d 2e 6f 72 67|", 100 ] 101 102 103def telegram_block_everything(ooni_exe, outfile): 104 """ Test case where everything we measure is blocked """ 105 args = [] 106 args.extend(args_for_blocking_all_pop_ips()) 107 args.extend(args_for_blocking_web_telegram_org_https()) 108 args.extend(args_for_blocking_web_telegram_org_http()) 109 tk = execute_jafar_and_return_validated_test_keys( 110 ooni_exe, outfile, "telegram_block_everything", args, 111 ) 112 assert tk["telegram_tcp_blocking"] == True 113 assert tk["telegram_http_blocking"] == True 114 assert tk["telegram_web_failure"] == "connection_reset" 115 assert tk["telegram_web_status"] == "blocked" 116 117 118def telegram_tcp_blocking_all(ooni_exe, outfile): 119 """ Test case where all POPs are TCP/IP blocked """ 120 args = args_for_blocking_all_pop_ips() 121 tk = execute_jafar_and_return_validated_test_keys( 122 ooni_exe, outfile, "telegram_tcp_blocking_all", args 123 ) 124 assert tk["telegram_tcp_blocking"] == True 125 assert tk["telegram_http_blocking"] == True 126 assert tk["telegram_web_failure"] == None 127 assert tk["telegram_web_status"] == "ok" 128 129 130def telegram_tcp_blocking_some(ooni_exe, outfile): 131 """ Test case where some POPs are TCP/IP blocked """ 132 args = [ 133 "-iptables-reset-ip", 134 ALL_POP_IPS[0], 135 ] 136 tk = execute_jafar_and_return_validated_test_keys( 137 ooni_exe, outfile, "telegram_tcp_blocking_some", args 138 ) 139 assert tk["telegram_tcp_blocking"] == False 140 assert tk["telegram_http_blocking"] == False 141 assert tk["telegram_web_failure"] == None 142 assert tk["telegram_web_status"] == "ok" 143 144 145def telegram_http_blocking_all(ooni_exe, outfile): 146 """ Test case where all POPs are HTTP blocked """ 147 args = [] 148 for ip in ALL_POP_IPS: 149 args.append("-iptables-reset-keyword") 150 args.append(ip) 151 tk = execute_jafar_and_return_validated_test_keys( 152 ooni_exe, outfile, "telegram_http_blocking_all", args, 153 ) 154 assert tk["telegram_tcp_blocking"] == False 155 assert tk["telegram_http_blocking"] == True 156 assert tk["telegram_web_failure"] == None 157 assert tk["telegram_web_status"] == "ok" 158 159 160def telegram_http_blocking_some(ooni_exe, outfile): 161 """ Test case where some POPs are HTTP blocked """ 162 args = [ 163 "-iptables-reset-keyword", 164 ALL_POP_IPS[0], 165 ] 166 tk = execute_jafar_and_return_validated_test_keys( 167 ooni_exe, outfile, "telegram_http_blocking_some", args, 168 ) 169 assert tk["telegram_tcp_blocking"] == False 170 assert tk["telegram_http_blocking"] == False 171 assert tk["telegram_web_failure"] == None 172 assert tk["telegram_web_status"] == "ok" 173 174 175def telegram_web_failure_http(ooni_exe, outfile): 176 """ Test case where the web HTTP endpoint is blocked """ 177 args = args_for_blocking_web_telegram_org_http() 178 tk = execute_jafar_and_return_validated_test_keys( 179 ooni_exe, outfile, "telegram_web_failure_http", args, 180 ) 181 assert tk["telegram_tcp_blocking"] == False 182 assert tk["telegram_http_blocking"] == False 183 assert tk["telegram_web_failure"] == "connection_reset" 184 assert tk["telegram_web_status"] == "blocked" 185 186 187def telegram_web_failure_https(ooni_exe, outfile): 188 """ Test case where the web HTTPS endpoint is blocked """ 189 args = args_for_blocking_web_telegram_org_https() 190 tk = execute_jafar_and_return_validated_test_keys( 191 ooni_exe, outfile, "telegram_web_failure_https", args, 192 ) 193 assert tk["telegram_tcp_blocking"] == False 194 assert tk["telegram_http_blocking"] == False 195 assert tk["telegram_web_failure"] == "connection_reset" 196 assert tk["telegram_web_status"] == "blocked" 197 198 199def main(): 200 if len(sys.argv) != 2: 201 sys.exit("usage: %s /path/to/ooniprobelegacy-like/binary" % sys.argv[0]) 202 outfile = "telegram.jsonl" 203 ooni_exe = sys.argv[1] 204 tests = [ 205 telegram_block_everything, 206 telegram_tcp_blocking_all, 207 telegram_tcp_blocking_some, 208 telegram_http_blocking_all, 209 telegram_http_blocking_some, 210 telegram_web_failure_http, 211 telegram_web_failure_https, 212 ] 213 for test in tests: 214 test(ooni_exe, outfile) 215 time.sleep(7) 216 217 218if __name__ == "__main__": 219 main() 220