1#!/usr/bin/env python3
2
3# Check access
4
5from mosq_test_helper import *
6
7def write_config(filename, port, per_listener):
8    with open(filename, 'w') as f:
9        f.write("per_listener_settings %s\n" % (per_listener))
10        f.write("port %d\n" % (port))
11        f.write("allow_anonymous true\n")
12        f.write("acl_file %s\n" % (filename.replace('.conf', '.acl')))
13
14def write_acl(filename, global_en, user_en, pattern_en):
15    with open(filename, 'w') as f:
16        if global_en:
17            f.write('topic readwrite topic/global/#\n')
18            f.write('topic deny      topic/global/except\n')
19        if user_en:
20            f.write('user username\n')
21            f.write('topic readwrite topic/username/#\n')
22            f.write('topic deny      topic/username/except\n')
23        if pattern_en:
24            f.write('pattern readwrite pattern/%u/#\n')
25            f.write('pattern deny      pattern/%u/except\n')
26
27
28
29def single_test(port, per_listener, username, topic, expect_deny):
30    rc = 1
31
32    conf_file = os.path.basename(__file__).replace('.py', '.conf')
33    write_config(conf_file, port, per_listener)
34
35    broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)
36
37    try:
38        keepalive = 60
39        connect_packet = mosq_test.gen_connect("acl-check", keepalive=keepalive, username=username)
40        connack_packet = mosq_test.gen_connack(rc=0)
41
42        mid = 1
43        subscribe_packet = mosq_test.gen_subscribe(mid=mid, topic=topic, qos=1)
44        suback_packet = mosq_test.gen_suback(mid=mid, qos=1)
45
46        mid = 2
47        publish1s_packet = mosq_test.gen_publish(topic=topic, mid=mid, qos=1, payload="message")
48        puback1s_packet = mosq_test.gen_puback(mid)
49
50        mid=1
51        publish1r_packet = mosq_test.gen_publish(topic=topic, mid=mid, qos=1, payload="message")
52
53        sock = mosq_test.do_client_connect(connect_packet, connack_packet, port=port)
54        mosq_test.do_send_receive(sock, subscribe_packet, suback_packet, "suback")
55        sock.send(publish1s_packet)
56        if expect_deny:
57            mosq_test.expect_packet(sock, "puback", puback1s_packet)
58            mosq_test.do_ping(sock)
59        else:
60            mosq_test.receive_unordered(sock, puback1s_packet, publish1r_packet, "puback / publish1r")
61        sock.close()
62
63        rc = 0
64    except mosq_test.TestError:
65        pass
66    finally:
67        os.remove(conf_file)
68        broker.terminate()
69        broker.wait()
70        (stdo, stde) = broker.communicate()
71        if rc:
72            print(stde.decode('utf-8'))
73            exit(rc)
74
75def acl_test(port, per_listener, global_en, user_en, pattern_en):
76    acl_file = os.path.basename(__file__).replace('.py', '.acl')
77
78    write_acl(acl_file, global_en=global_en, user_en=user_en, pattern_en=pattern_en)
79
80    if global_en:
81        single_test(port, per_listener, username=None,       topic="topic/global", expect_deny=False)
82        single_test(port, per_listener, username="username", topic="topic/global", expect_deny=True)
83        single_test(port, per_listener, username=None,       topic="topic/global/except", expect_deny=True)
84    if user_en:
85        single_test(port, per_listener, username=None,       topic="topic/username", expect_deny=True)
86        single_test(port, per_listener, username="username", topic="topic/username", expect_deny=False)
87        single_test(port, per_listener, username="username", topic="topic/username/except", expect_deny=True)
88    if pattern_en:
89        single_test(port, per_listener, username=None,       topic="pattern/username", expect_deny=True)
90        single_test(port, per_listener, username="username", topic="pattern/username", expect_deny=False)
91        single_test(port, per_listener, username="username", topic="pattern/username/except", expect_deny=True)
92
93def do_test(port, per_listener):
94    try:
95        acl_test(port, per_listener, global_en=False, user_en=False, pattern_en=True)
96        acl_test(port, per_listener, global_en=False, user_en=True, pattern_en=False)
97        acl_test(port, per_listener, global_en=True, user_en=False, pattern_en=False)
98        acl_test(port, per_listener, global_en=False, user_en=True, pattern_en=True)
99        acl_test(port, per_listener, global_en=True, user_en=False, pattern_en=True)
100        acl_test(port, per_listener, global_en=True, user_en=True, pattern_en=True)
101    finally:
102        acl_file = os.path.basename(__file__).replace('.py', '.acl')
103        os.remove(acl_file)
104
105port = mosq_test.get_port()
106
107do_test(port, "true")
108do_test(port, "false")
109