1#!/usr/bin/env python3 2 3# Check whether an extended auth plugin can change the username of a client. 4 5from mosq_test_helper import * 6 7def write_config(filename, acl_file, port, per_listener): 8 with open(filename, 'w') as f: 9 f.write("per_listener_settings %s\n" % (per_listener)) 10 f.write("port %d\n" % (port)) 11 f.write("acl_file %s\n" % (acl_file)) 12 f.write("auth_plugin c/auth_plugin_extended_single.so\n") 13 14def write_acl(filename): 15 with open(filename, 'w') as f: 16 f.write('user new_username\n') 17 f.write('topic readwrite topic/one\n') 18 19port = mosq_test.get_port() 20conf_file = os.path.basename(__file__).replace('.py', '.conf') 21acl_file = os.path.basename(__file__).replace('.py', '.acl') 22 23def do_test(per_listener): 24 write_config(conf_file, acl_file, port, per_listener) 25 write_acl(acl_file) 26 rc = 1 27 28 # Connect without a username - this means no access 29 connect1_packet = mosq_test.gen_connect("client-params-test1", keepalive=42, proto_ver=5) 30 connack1_packet = mosq_test.gen_connack(rc=0, proto_ver=5) 31 32 mid = 1 33 subscribe_packet = mosq_test.gen_subscribe(mid, "topic/one", 1, proto_ver=5) 34 suback_packet = mosq_test.gen_suback(mid, 1, proto_ver=5) 35 36 mid = 2 37 publish1_packet = mosq_test.gen_publish("topic/one", qos=1, mid=mid, payload="message", proto_ver=5) 38 puback1_packet = mosq_test.gen_puback(mid, proto_ver=5, reason_code=mqtt5_rc.MQTT_RC_NOT_AUTHORIZED) 39 40 # Connect without a username, but have the plugin change it 41 props = mqtt5_props.gen_string_prop(mqtt5_props.PROP_AUTHENTICATION_METHOD, "change") 42 connect2_packet = mosq_test.gen_connect("client-params-test2", keepalive=42, proto_ver=5, properties=props) 43 props = mqtt5_props.gen_string_prop(mqtt5_props.PROP_AUTHENTICATION_METHOD, "change") 44 connack2_packet = mosq_test.gen_connack(rc=0, proto_ver=5, properties=props) 45 46 mid = 2 47 publish2s_packet = mosq_test.gen_publish("topic/one", qos=1, mid=mid, payload="message", proto_ver=5) 48 puback2s_packet = mosq_test.gen_puback(mid, proto_ver=5) 49 50 mid = 1 51 publish2r_packet = mosq_test.gen_publish("topic/one", qos=1, mid=mid, payload="message", proto_ver=5) 52 53 broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port) 54 55 try: 56 sock = mosq_test.do_client_connect(connect1_packet, connack1_packet, timeout=20, port=port) 57 mosq_test.do_send_receive(sock, subscribe_packet, suback_packet, "suback1") 58 mosq_test.do_send_receive(sock, publish1_packet, puback1_packet, "puback1") 59 mosq_test.do_ping(sock) 60 sock.close() 61 62 sock = mosq_test.do_client_connect(connect2_packet, connack2_packet, timeout=20, port=port) 63 mosq_test.do_send_receive(sock, subscribe_packet, suback_packet, "suback2") 64 mosq_test.do_send_receive(sock, publish2s_packet, puback2s_packet, "puback2") 65 if mosq_test.expect_packet(sock, "publish2", publish2r_packet): 66 mosq_test.do_ping(sock) 67 rc = 0 68 69 sock.close() 70 71 finally: 72 os.remove(conf_file) 73 os.remove(acl_file) 74 broker.terminate() 75 broker.wait() 76 (stdo, stde) = broker.communicate() 77 if rc: 78 print(stde.decode('utf-8')) 79 exit(rc) 80 81 82do_test("true") 83do_test("false") 84exit(0) 85