1#!/usr/bin/env python3 2 3# Check access 4 5from mosq_test_helper import * 6 7def write_config(filename, port, per_listener): 8 with open(filename, 'w') as f: 9 f.write("per_listener_settings %s\n" % (per_listener)) 10 f.write("port %d\n" % (port)) 11 f.write("allow_anonymous true\n") 12 f.write("acl_file %s\n" % (filename.replace('.conf', '.acl'))) 13 14def write_acl(filename, global_en, user_en, pattern_en): 15 with open(filename, 'w') as f: 16 if global_en: 17 f.write('topic readwrite topic/global/#\n') 18 f.write('topic deny topic/global/except\n') 19 if user_en: 20 f.write('user username\n') 21 f.write('topic readwrite topic/username/#\n') 22 f.write('topic deny topic/username/except\n') 23 if pattern_en: 24 f.write('pattern readwrite pattern/%u/#\n') 25 f.write('pattern deny pattern/%u/except\n') 26 27 28 29def single_test(port, per_listener, username, topic, expect_deny): 30 rc = 1 31 32 conf_file = os.path.basename(__file__).replace('.py', '.conf') 33 write_config(conf_file, port, per_listener) 34 35 broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port) 36 37 try: 38 keepalive = 60 39 connect_packet = mosq_test.gen_connect("acl-check", keepalive=keepalive, username=username) 40 connack_packet = mosq_test.gen_connack(rc=0) 41 42 mid = 1 43 subscribe_packet = mosq_test.gen_subscribe(mid=mid, topic=topic, qos=1) 44 suback_packet = mosq_test.gen_suback(mid=mid, qos=1) 45 46 mid = 2 47 publish1s_packet = mosq_test.gen_publish(topic=topic, mid=mid, qos=1, payload="message") 48 puback1s_packet = mosq_test.gen_puback(mid) 49 50 mid=1 51 publish1r_packet = mosq_test.gen_publish(topic=topic, mid=mid, qos=1, payload="message") 52 53 sock = mosq_test.do_client_connect(connect_packet, connack_packet, port=port) 54 mosq_test.do_send_receive(sock, subscribe_packet, suback_packet, "suback") 55 sock.send(publish1s_packet) 56 if expect_deny: 57 mosq_test.expect_packet(sock, "puback", puback1s_packet) 58 mosq_test.do_ping(sock) 59 else: 60 mosq_test.receive_unordered(sock, puback1s_packet, publish1r_packet, "puback / publish1r") 61 sock.close() 62 63 rc = 0 64 except mosq_test.TestError: 65 pass 66 finally: 67 os.remove(conf_file) 68 broker.terminate() 69 broker.wait() 70 (stdo, stde) = broker.communicate() 71 if rc: 72 print(stde.decode('utf-8')) 73 exit(rc) 74 75def acl_test(port, per_listener, global_en, user_en, pattern_en): 76 acl_file = os.path.basename(__file__).replace('.py', '.acl') 77 78 write_acl(acl_file, global_en=global_en, user_en=user_en, pattern_en=pattern_en) 79 80 if global_en: 81 single_test(port, per_listener, username=None, topic="topic/global", expect_deny=False) 82 single_test(port, per_listener, username="username", topic="topic/global", expect_deny=True) 83 single_test(port, per_listener, username=None, topic="topic/global/except", expect_deny=True) 84 if user_en: 85 single_test(port, per_listener, username=None, topic="topic/username", expect_deny=True) 86 single_test(port, per_listener, username="username", topic="topic/username", expect_deny=False) 87 single_test(port, per_listener, username="username", topic="topic/username/except", expect_deny=True) 88 if pattern_en: 89 single_test(port, per_listener, username=None, topic="pattern/username", expect_deny=True) 90 single_test(port, per_listener, username="username", topic="pattern/username", expect_deny=False) 91 single_test(port, per_listener, username="username", topic="pattern/username/except", expect_deny=True) 92 93def do_test(port, per_listener): 94 try: 95 acl_test(port, per_listener, global_en=False, user_en=False, pattern_en=True) 96 acl_test(port, per_listener, global_en=False, user_en=True, pattern_en=False) 97 acl_test(port, per_listener, global_en=True, user_en=False, pattern_en=False) 98 acl_test(port, per_listener, global_en=False, user_en=True, pattern_en=True) 99 acl_test(port, per_listener, global_en=True, user_en=False, pattern_en=True) 100 acl_test(port, per_listener, global_en=True, user_en=True, pattern_en=True) 101 finally: 102 acl_file = os.path.basename(__file__).replace('.py', '.acl') 103 os.remove(acl_file) 104 105port = mosq_test.get_port() 106 107do_test(port, "true") 108do_test(port, "false") 109