1 /*
2 * lws-minimal-mqtt-client
3 *
4 * Written in 2010-2020 by Andy Green <andy@warmcat.com>
5 * Sakthi Kannan <saktr@amazon.com>
6 *
7 * This file is made available under the Creative Commons CC0 1.0
8 * Universal Public Domain Dedication.
9 */
10
11 #include <libwebsockets.h>
12 #include <string.h>
13 #include <signal.h>
14 #if defined(WIN32)
15 #define HAVE_STRUCT_TIMESPEC
16 #if defined(pid_t)
17 #undef pid_t
18 #endif
19 #endif
20 #include <pthread.h>
21 #include <assert.h>
22
23 #define COUNT 8
24
25 struct test_item {
26 struct lws_context *context;
27 struct lws *wsi;
28 lws_sorted_usec_list_t sul;
29 } items[COUNT];
30
/*
 * Per-connection test sequence, held in pss->state.  The callback steps
 * through these in order by incrementing the state, so the numeric order of
 * the enumerators is significant.
 */
enum {
	STATE_SUBSCRIBE = 0,	/* subscribe to the topic */
	STATE_WAIT_SUBACK,	/* subscribe sent, waiting for SUBSCRIBED */

	STATE_PUBLISH_QOS0,	/* send the message in QoS0 */
	STATE_WAIT_ACK0,	/* wait for the synthetic "ack" */

	STATE_PUBLISH_QOS1,	/* send the message in QoS1 */
	STATE_WAIT_ACK1,	/* wait for the real ack (or timeout + retry) */

	STATE_UNSUBSCRIBE,	/* send the unsubscribe */
	STATE_WAIT_UNSUBACK,	/* waiting for UNSUBSCRIBED */

	STATE_TEST_FINISH	/* sequence complete for this connection */
};
43
44 static int interrupted, do_ssl, pipeline, stagger_us = 5000, okay,
45 done, count = COUNT;
46
47 static const lws_retry_bo_t retry = {
48 .secs_since_valid_ping = 20, /* if idle, PINGREQ after secs */
49 .secs_since_valid_hangup = 25, /* hangup if still idle secs */
50 };
51
52 static const lws_mqtt_client_connect_param_t client_connect_param = {
53 .client_id = NULL,
54 .keep_alive = 60,
55 .clean_start = 1,
56 .will_param = {
57 .topic = "good/bye",
58 .message = "sign-off",
59 .qos = 0,
60 .retain = 0,
61 },
62 .username = "lwsUser",
63 .password = "mySecretPassword",
64 };
65
66 static lws_mqtt_topic_elem_t topics[] = {
67 [0] = { .name = "test/topic0", .qos = QOS0 },
68 [1] = { .name = "test/topic1", .qos = QOS1 },
69 };
70
71 static lws_mqtt_subscribe_param_t sub_param = {
72 .topic = &topics[0],
73 .num_topics = LWS_ARRAY_SIZE(topics),
74 };
75
/*
 * Payload published by each connection.  It is kept as an array rather than
 * a pointer so that its size is known at compile time and TEST_STRING_LEN
 * below can never disagree with the text.
 */
static const char test_string[] =
	"No one would have believed in the last years of the nineteenth "
	"century that this world was being watched keenly and closely by "
	"intelligences greater than man's and yet as mortal as his own; that as "
	"men busied themselves about their various concerns they were "
	"scrutinised and studied, perhaps almost as narrowly as a man with a "
	"microscope might scrutinise the transient creatures that swarm and "
	"multiply in a drop of water. With infinite complacency men went to "
	"and fro over this globe about their little affairs, serene in their "
	"assurance of their empire over matter. It is possible that the "
	"infusoria under the microscope do the same. No one gave a thought to "
	"the older worlds of space as sources of human danger, or thought of "
	"them only to dismiss the idea of life upon them as impossible or "
	"improbable. It is curious to recall some of the mental habits of "
	"those departed days. At most terrestrial men fancied there might be "
	"other men upon Mars, perhaps inferior to themselves and ready to "
	"welcome a missionary enterprise. Yet across the gulf of space, minds "
	"that are to our minds as ours are to those of the beasts that perish, "
	"intellects vast and cool and unsympathetic, regarded this earth with "
	"envious eyes, and slowly and surely drew their plans against us. And "
	"early in the twentieth century came the great disillusionment. ";

/*
 * Length of the payload in bytes, not counting the terminating NUL.  Derived
 * from the literal so that editing the text cannot cause a short send or a
 * read past the end of the string.
 */
#define TEST_STRING_LEN (sizeof(test_string) - 1)
100
101 struct pss {
102 lws_mqtt_publish_param_t pub_param;
103 int state;
104 size_t pos;
105 int retries;
106 };
107
108 static void
sigint_handler(int sig)109 sigint_handler(int sig)
110 {
111 interrupted = 1;
112 }
113
114 static int
connect_client(struct lws_context * context,struct test_item * item)115 connect_client(struct lws_context *context, struct test_item *item)
116 {
117 struct lws_client_connect_info i;
118
119 memset(&i, 0, sizeof i);
120
121 i.mqtt_cp = &client_connect_param;
122 i.opaque_user_data = item;
123 i.protocol = "test-mqtt";
124 i.address = "localhost";
125 i.host = "localhost";
126 i.pwsi = &item->wsi;
127 i.context = context;
128 i.method = "MQTT";
129 i.alpn = "mqtt";
130 i.port = 1883;
131
132 if (do_ssl) {
133 i.ssl_connection = LCCSCF_USE_SSL;
134 i.ssl_connection |= LCCSCF_ALLOW_SELFSIGNED;
135 i.port = 8883;
136 }
137
138 if (pipeline)
139 i.ssl_connection |= LCCSCF_PIPELINE;
140
141 if (!lws_client_connect_via_info(&i)) {
142 lwsl_err("%s: Client Connect Failed\n", __func__);
143
144 return 1;
145 }
146
147 return 0;
148 }
149
150 static void
start_conn(struct lws_sorted_usec_list * sul)151 start_conn(struct lws_sorted_usec_list *sul)
152 {
153 struct test_item *item = lws_container_of(sul, struct test_item, sul);
154
155 lwsl_notice("%s: item %d\n", __func__, (int)(item - &items[0]));
156
157 if (connect_client(item->context, item))
158 interrupted = 1;
159 }
160
161
162 static int
system_notify_cb(lws_state_manager_t * mgr,lws_state_notify_link_t * link,int current,int target)163 system_notify_cb(lws_state_manager_t *mgr, lws_state_notify_link_t *link,
164 int current, int target)
165 {
166 struct lws_context *context = mgr->parent;
167 int n;
168
169 if (current != LWS_SYSTATE_OPERATIONAL ||
170 target != LWS_SYSTATE_OPERATIONAL)
171 return 0;
172
173 /*
174 * We delay trying to do the client connection until the protocols have
175 * been initialized for each vhost... this happens after we have network
176 * and time so we can judge tls cert validity.
177 *
178 * Stagger the connection attempts so we get some joining before the
179 * first has connected and some afterwards
180 */
181
182 for (n = 0; n < count; n++) {
183 items[n].context = context;
184 lws_sul_schedule(context, 0, &items[n].sul, start_conn,
185 n * stagger_us);
186 }
187
188 return 0;
189 }
190
191
192 static int
callback_mqtt(struct lws * wsi,enum lws_callback_reasons reason,void * user,void * in,size_t len)193 callback_mqtt(struct lws *wsi, enum lws_callback_reasons reason,
194 void *user, void *in, size_t len)
195 {
196 struct test_item *item = (struct test_item *)lws_get_opaque_user_data(wsi);
197 struct pss *pss = (struct pss *)user;
198 lws_mqtt_publish_param_t *pub;
199 size_t chunk;
200
201 switch (reason) {
202 case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
203 lwsl_err("%s: CLIENT_CONNECTION_ERROR: %s\n", __func__,
204 in ? (char *)in : "(null)");
205
206 if (++done == count)
207 goto finish_test;
208 break;
209
210 case LWS_CALLBACK_MQTT_CLIENT_CLOSED:
211 lwsl_user("%s: item %d: CLIENT_CLOSED %p\n", __func__, (int)(item - &items[0]), wsi);
212
213 if (++done == count)
214 goto finish_test;
215 break;
216
217 case LWS_CALLBACK_MQTT_CLIENT_ESTABLISHED:
218 lwsl_user("%s: MQTT_CLIENT_ESTABLISHED: %p\n", __func__, wsi);
219 lws_callback_on_writable(wsi);
220
221 return 0;
222
223 case LWS_CALLBACK_MQTT_SUBSCRIBED:
224 lwsl_user("%s: MQTT_SUBSCRIBED\n", __func__);
225
226 /* then we can get on with the actual test part */
227
228 pss->state++;
229 lws_callback_on_writable(wsi);
230 break;
231
232 case LWS_CALLBACK_MQTT_UNSUBSCRIBED:
233 lwsl_user("%s: item %d: UNSUBSCRIBED: %p: Received unsuback\n",
234 __func__, (int)(item - &item[0]), wsi);
235 okay++;
236
237 if (++pss->state == STATE_TEST_FINISH) {
238 lwsl_notice("%s: MQTT_UNSUBACK ending stream %d successfully(%d/%d)\n",
239 __func__, (int)(item - &items[0]), okay, count);
240 /* We are done, request to close */
241 return -1;
242 }
243 break;
244
245 case LWS_CALLBACK_MQTT_CLIENT_WRITEABLE:
246
247 /*
248 * Extra WRITEABLE may appear here other than ones we asked
249 * for, so we must consult our own state to decide if we want
250 * to make use of the opportunity
251 */
252
253 switch (pss->state) {
254 case STATE_SUBSCRIBE:
255 lwsl_user("%s: item %d: WRITEABLE: %p: Subscribing\n", __func__, (int)(item - &items[0]), wsi);
256
257 if (lws_mqtt_client_send_subcribe(wsi, &sub_param)) {
258 lwsl_notice("%s: subscribe failed\n", __func__);
259
260 return -1;
261 }
262 pss->state++;
263 break;
264
265 case STATE_PUBLISH_QOS0:
266 case STATE_PUBLISH_QOS1:
267
268 lwsl_user("%s: item %d: WRITEABLE: %p: Publish\n", __func__, (int)(item - &items[0]), wsi);
269
270 pss->pub_param.topic = pss->state == STATE_PUBLISH_QOS0 ?
271 "test/topic0" : "test/topic1";
272 pss->pub_param.topic_len = (uint16_t)strlen(pss->pub_param.topic);
273 pss->pub_param.qos =
274 pss->state == STATE_PUBLISH_QOS0 ? QOS0 : QOS1;
275 pss->pub_param.payload_len = TEST_STRING_LEN;
276
277 /* We send the message out 300 bytes or less at at time */
278
279 chunk = 300;
280
281 if (chunk > TEST_STRING_LEN - pss->pos)
282 chunk = TEST_STRING_LEN - pss->pos;
283
284 lwsl_notice("%s: sending %d at +%d\n", __func__,
285 (int)chunk, (int)pss->pos);
286
287 if (lws_mqtt_client_send_publish(wsi, &pss->pub_param,
288 test_string + pss->pos, (uint32_t)chunk,
289 (pss->pos + chunk == TEST_STRING_LEN))) {
290 lwsl_notice("%s: publish failed\n", __func__);
291 return -1;
292 }
293
294 pss->pos += chunk;
295
296 if (pss->pos == TEST_STRING_LEN) {
297 lwsl_debug("%s: sent message\n", __func__);
298 pss->pos = 0;
299 pss->state++;
300 }
301 break;
302
303 case STATE_UNSUBSCRIBE:
304 lwsl_user("%s: item %d: UNSUBSCRIBE: %p: Send unsub\n",
305 __func__, (int)(item - &item[0]), wsi);
306 pss->state++;
307 if (lws_mqtt_client_send_unsubcribe(wsi, &sub_param)) {
308 lwsl_notice("%s: subscribe failed\n", __func__);
309 return -1;
310 }
311 break;
312 default:
313 break;
314 }
315
316 return 0;
317
318 case LWS_CALLBACK_MQTT_ACK:
319 lwsl_user("%s: item %d: MQTT_ACK (state %d)\n", __func__, (int)(item - &items[0]), pss->state);
320 /*
321 * We can forget about the message we just sent, it's done.
322 *
323 * For our test, that's the indication we can close the wsi.
324 */
325
326 pss->state++;
327 if (pss->state != STATE_TEST_FINISH) {
328 lws_callback_on_writable(wsi);
329 break;
330 }
331
332 break;
333
334 case LWS_CALLBACK_MQTT_RESEND:
335 lwsl_user("%s: MQTT_RESEND\n", __func__);
336 /*
337 * We must resend the packet ID mentioned in len
338 */
339 if (++pss->retries == 3) {
340 lwsl_notice("%s: too many retries\n", __func__);
341 return 1; /* kill the connection */
342 }
343 pss->state--;
344 pss->pos = 0;
345 break;
346
347 case LWS_CALLBACK_MQTT_CLIENT_RX:
348 pub = (lws_mqtt_publish_param_t *)in;
349 assert(pub);
350 lwsl_user("%s: item %d: MQTT_CLIENT_RX (%s) pos %d/%d len %d\n", __func__,
351 (int)(item - &items[0]), pub->topic, (int)pub->payload_pos,
352 (int)pub->payload_len, (int)len);
353
354 //lwsl_hexdump_info(pub->payload, len);
355
356 return 0;
357
358 default:
359 break;
360 }
361
362 return 0;
363
364 finish_test:
365 interrupted = 1;
366 lws_cancel_service(lws_get_context(wsi));
367
368 return 0;
369 }
370
371 static const struct lws_protocols protocols[] = {
372 {
373 .name = "test-mqtt",
374 .callback = callback_mqtt,
375 .per_session_data_size = sizeof(struct pss)
376 },
377 { NULL, NULL, 0, 0 }
378 };
379
main(int argc,const char ** argv)380 int main(int argc, const char **argv)
381 {
382 lws_state_notify_link_t notifier = { {}, system_notify_cb, "app" };
383 lws_state_notify_link_t *na[] = { ¬ifier, NULL };
384 struct lws_context_creation_info info;
385 struct lws_context *context;
386 const char *p;
387 int n = 0;
388
389 signal(SIGINT, sigint_handler);
390 memset(&info, 0, sizeof info); /* otherwise uninitialized garbage */
391 lws_cmdline_option_handle_builtin(argc, argv, &info);
392
393 do_ssl = !!lws_cmdline_option(argc, argv, "-s");
394 if (do_ssl)
395 info.options = LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT;
396
397 if (lws_cmdline_option(argc, argv, "-p"))
398 pipeline = 1;
399
400 if ((p = lws_cmdline_option(argc, argv, "-i")))
401 stagger_us = atoi(p);
402
403 if ((p = lws_cmdline_option(argc, argv, "-c")))
404 count = atoi(p);
405
406 if (count > COUNT) {
407 count = COUNT;
408 lwsl_err("%s: clipped count at max %d\n", __func__, count);
409 }
410
411 lwsl_user("LWS minimal MQTT client %s [-d<verbosity>][-s]\n",
412 do_ssl ? "tls enabled": "unencrypted");
413
414 info.port = CONTEXT_PORT_NO_LISTEN; /* we do not run any server */
415 info.protocols = protocols;
416 info.register_notifier_list = na;
417 info.fd_limit_per_thread = 1 + COUNT + 1;
418 info.retry_and_idle_policy = &retry;
419
420 #if defined(LWS_WITH_MBEDTLS) || defined(USE_WOLFSSL)
421 /*
422 * OpenSSL uses the system trust store. mbedTLS has to be told which
423 * CA to trust explicitly.
424 */
425 info.client_ssl_ca_filepath = "./mosq-ca.crt";
426 #endif
427
428 context = lws_create_context(&info);
429 if (!context) {
430 lwsl_err("lws init failed\n");
431 return 1;
432 }
433
434 /* Event loop */
435 while (n >= 0 && !interrupted)
436 n = lws_service(context, 0);
437
438 lwsl_user("%s: Completed: %d/%d ok, %s\n", __func__, okay, count,
439 okay != count ? "failed" : "OK");
440 lws_context_destroy(context);
441
442 return okay != count;
443 }
444