1#!/usr/bin/env python3 2 3# Test for CVE-2018-12546, with the broker being stopped to write the persistence file. 4 5from mosq_test_helper import * 6import signal 7 8def write_config(filename, port, per_listener): 9 with open(filename, 'w') as f: 10 f.write("per_listener_settings %s\n" % (per_listener)) 11 f.write("check_retain_source true\n") 12 f.write("port %d\n" % (port)) 13 f.write("allow_anonymous true\n") 14 f.write("acl_file %s\n" % (filename.replace('.conf', '.acl'))) 15 f.write("persistence true\n") 16 f.write("persistence_file %s\n" % (filename.replace('.conf', '.db'))) 17 18def write_acl_1(filename, username): 19 with open(filename, 'w') as f: 20 if username is not None: 21 f.write('user %s\n' % (username)) 22 f.write('topic readwrite test/topic\n') 23 24def write_acl_2(filename, username): 25 with open(filename, 'w') as f: 26 if username is not None: 27 f.write('user %s\n' % (username)) 28 f.write('topic read test/topic\n') 29 30 31def do_test(proto_ver, per_listener, username): 32 conf_file = os.path.basename(__file__).replace('.py', '.conf') 33 write_config(conf_file, port, per_listener) 34 35 persistence_file = os.path.basename(__file__).replace('.py', '.db') 36 try: 37 os.remove(persistence_file) 38 except OSError: 39 pass 40 41 acl_file = os.path.basename(__file__).replace('.py', '.acl') 42 write_acl_1(acl_file, username) 43 44 45 rc = 1 46 keepalive = 60 47 connect_packet = mosq_test.gen_connect("retain-check", keepalive=keepalive, username=username, proto_ver=proto_ver) 48 connack_packet = mosq_test.gen_connack(rc=0, proto_ver=proto_ver) 49 50 mid = 1 51 publish_packet = mosq_test.gen_publish("test/topic", qos=0, payload="retained message", retain=True, proto_ver=proto_ver) 52 subscribe_packet = mosq_test.gen_subscribe(mid, "test/topic", 0, proto_ver=proto_ver) 53 suback_packet = mosq_test.gen_suback(mid, 0, proto_ver=proto_ver) 54 55 broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port) 56 57 try: 58 sock = mosq_test.do_client_connect(connect_packet, connack_packet, port=port) 59 sock.send(publish_packet) 60 sock.close() 61 62 sock = mosq_test.do_client_connect(connect_packet, connack_packet, port=port) 63 mosq_test.do_send_receive(sock, subscribe_packet, suback_packet, "suback 1") 64 65 mosq_test.expect_packet(sock, "publish", publish_packet) 66 sock.close() 67 68 # Remove "write" ability 69 write_acl_2(acl_file, username) 70 broker.terminate() 71 broker.wait() 72 73 broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port) 74 75 sock = mosq_test.do_client_connect(connect_packet, connack_packet, port=port) 76 mosq_test.do_send_receive(sock, subscribe_packet, suback_packet, "suback 2") 77 # If we receive the retained message here, it is a failure. 78 mosq_test.do_ping(sock) 79 rc = 0 80 81 sock.close() 82 except mosq_test.TestError: 83 pass 84 finally: 85 broker.terminate() 86 broker.wait() 87 os.remove(conf_file) 88 os.remove(acl_file) 89 os.remove(persistence_file) 90 (stdo, stde) = broker.communicate() 91 if rc: 92 print(stde.decode('utf-8')) 93 exit(rc) 94 95 96port = mosq_test.get_port() 97do_test(proto_ver=4, per_listener="true", username=None) 98do_test(proto_ver=4, per_listener="true", username="test") 99do_test(proto_ver=4, per_listener="false", username=None) 100do_test(proto_ver=4, per_listener="false", username="test") 101 102do_test(proto_ver=5, per_listener="true", username=None) 103do_test(proto_ver=5, per_listener="true", username="test") 104do_test(proto_ver=5, per_listener="false", username=None) 105do_test(proto_ver=5, per_listener="false", username="test") 106