1 /*
2 Copyright (c) 2009-2020 Roger Light <roger@atchoo.org>
3
4 All rights reserved. This program and the accompanying materials
5 are made available under the terms of the Eclipse Public License v1.0
6 and Eclipse Distribution License v1.0 which accompany this distribution.
7
8 The Eclipse Public License is available at
9 http://www.eclipse.org/legal/epl-v10.html
10 and the Eclipse Distribution License is available at
11 http://www.eclipse.org/org/documents/edl-v10.php.
12
13 Contributors:
14 Roger Light - initial implementation and documentation.
15 */
16
17 #include "config.h"
18
19 #include <assert.h>
20 #include <stdio.h>
21 #include <utlist.h>
22
23 #include "mosquitto_broker_internal.h"
24 #include "memory_mosq.h"
25 #include "send_mosq.h"
26 #include "sys_tree.h"
27 #include "time_mosq.h"
28 #include "util_mosq.h"
29
30 static unsigned long max_inflight_bytes = 0;
31 static int max_queued = 100;
32 static unsigned long max_queued_bytes = 0;
33
34 /**
35 * Is this context ready to take more in flight messages right now?
36 * @param context the client context of interest
37 * @param qos qos for the packet of interest
38 * @return true if more in flight are allowed.
39 */
db__ready_for_flight(struct mosquitto_msg_data * msgs,int qos)40 static bool db__ready_for_flight(struct mosquitto_msg_data *msgs, int qos)
41 {
42 bool valid_bytes;
43 bool valid_count;
44
45 if(qos == 0 || (msgs->inflight_maximum == 0 && max_inflight_bytes == 0)){
46 return true;
47 }
48
49 valid_bytes = msgs->msg_bytes12 < max_inflight_bytes;
50 valid_count = msgs->inflight_quota > 0;
51
52 if(msgs->inflight_maximum == 0){
53 return valid_bytes;
54 }
55 if(max_inflight_bytes == 0){
56 return valid_count;
57 }
58
59 return valid_bytes && valid_count;
60 }
61
62
63 /**
64 * For a given client context, are more messages allowed to be queued?
65 * It is assumed that inflight checks and queue_qos0 checks have already
66 * been made.
67 * @param context client of interest
68 * @param qos destination qos for the packet of interest
69 * @return true if queuing is allowed, false if should be dropped
70 */
db__ready_for_queue(struct mosquitto * context,int qos,struct mosquitto_msg_data * msg_data)71 static bool db__ready_for_queue(struct mosquitto *context, int qos, struct mosquitto_msg_data *msg_data)
72 {
73 int source_count;
74 int adjust_count;
75 unsigned long source_bytes;
76 unsigned long adjust_bytes = max_inflight_bytes;
77
78 if(max_queued == 0 && max_queued_bytes == 0){
79 return true;
80 }
81
82 if(qos == 0){
83 source_bytes = msg_data->msg_bytes;
84 source_count = msg_data->msg_count;
85 }else{
86 source_bytes = msg_data->msg_bytes12;
87 source_count = msg_data->msg_count12;
88 }
89 adjust_count = msg_data->inflight_maximum;
90
91 /* nothing in flight for offline clients */
92 if(context->sock == INVALID_SOCKET){
93 adjust_bytes = 0;
94 adjust_count = 0;
95 }
96
97 bool valid_bytes = source_bytes - adjust_bytes < max_queued_bytes;
98 bool valid_count = source_count - adjust_count < max_queued;
99
100 if(max_queued_bytes == 0){
101 return valid_count;
102 }
103 if(max_queued == 0){
104 return valid_bytes;
105 }
106
107 return valid_bytes && valid_count;
108 }
109
110
db__open(struct mosquitto__config * config,struct mosquitto_db * db)111 int db__open(struct mosquitto__config *config, struct mosquitto_db *db)
112 {
113 struct mosquitto__subhier *subhier;
114
115 if(!config || !db) return MOSQ_ERR_INVAL;
116
117 db->last_db_id = 0;
118
119 db->contexts_by_id = NULL;
120 db->contexts_by_sock = NULL;
121 db->contexts_for_free = NULL;
122 #ifdef WITH_BRIDGE
123 db->bridges = NULL;
124 db->bridge_count = 0;
125 #endif
126
127 // Initialize the hashtable
128 db->clientid_index_hash = NULL;
129
130 db->subs = NULL;
131
132 subhier = sub__add_hier_entry(NULL, &db->subs, "", strlen(""));
133 if(!subhier) return MOSQ_ERR_NOMEM;
134
135 subhier = sub__add_hier_entry(NULL, &db->subs, "$SYS", strlen("$SYS"));
136 if(!subhier) return MOSQ_ERR_NOMEM;
137
138 db->unpwd = NULL;
139
140 #ifdef WITH_PERSISTENCE
141 if(persist__restore(db)) return 1;
142 #endif
143
144 return MOSQ_ERR_SUCCESS;
145 }
146
subhier_clean(struct mosquitto_db * db,struct mosquitto__subhier ** subhier)147 static void subhier_clean(struct mosquitto_db *db, struct mosquitto__subhier **subhier)
148 {
149 struct mosquitto__subhier *peer, *subhier_tmp;
150 struct mosquitto__subleaf *leaf, *nextleaf;
151
152 HASH_ITER(hh, *subhier, peer, subhier_tmp){
153 leaf = peer->subs;
154 while(leaf){
155 nextleaf = leaf->next;
156 mosquitto__free(leaf);
157 leaf = nextleaf;
158 }
159 if(peer->retained){
160 db__msg_store_ref_dec(db, &peer->retained);
161 }
162 subhier_clean(db, &peer->children);
163 mosquitto__free(peer->topic);
164
165 HASH_DELETE(hh, *subhier, peer);
166 mosquitto__free(peer);
167 }
168 }
169
db__close(struct mosquitto_db * db)170 int db__close(struct mosquitto_db *db)
171 {
172 subhier_clean(db, &db->subs);
173 db__msg_store_clean(db);
174
175 return MOSQ_ERR_SUCCESS;
176 }
177
178
db__msg_store_add(struct mosquitto_db * db,struct mosquitto_msg_store * store)179 void db__msg_store_add(struct mosquitto_db *db, struct mosquitto_msg_store *store)
180 {
181 store->next = db->msg_store;
182 store->prev = NULL;
183 if(db->msg_store){
184 db->msg_store->prev = store;
185 }
186 db->msg_store = store;
187 }
188
189
db__msg_store_remove(struct mosquitto_db * db,struct mosquitto_msg_store * store)190 void db__msg_store_remove(struct mosquitto_db *db, struct mosquitto_msg_store *store)
191 {
192 int i;
193
194 if(store->prev){
195 store->prev->next = store->next;
196 if(store->next){
197 store->next->prev = store->prev;
198 }
199 }else{
200 db->msg_store = store->next;
201 if(store->next){
202 store->next->prev = NULL;
203 }
204 }
205 db->msg_store_count--;
206 db->msg_store_bytes -= store->payloadlen;
207
208 mosquitto__free(store->source_id);
209 mosquitto__free(store->source_username);
210 if(store->dest_ids){
211 for(i=0; i<store->dest_id_count; i++){
212 mosquitto__free(store->dest_ids[i]);
213 }
214 mosquitto__free(store->dest_ids);
215 }
216 mosquitto__free(store->topic);
217 mosquitto_property_free_all(&store->properties);
218 UHPA_FREE_PAYLOAD(store);
219 mosquitto__free(store);
220 }
221
222
db__msg_store_clean(struct mosquitto_db * db)223 void db__msg_store_clean(struct mosquitto_db *db)
224 {
225 struct mosquitto_msg_store *store, *next;;
226
227 store = db->msg_store;
228 while(store){
229 next = store->next;
230 db__msg_store_remove(db, store);
231 store = next;
232 }
233 }
234
db__msg_store_ref_inc(struct mosquitto_msg_store * store)235 void db__msg_store_ref_inc(struct mosquitto_msg_store *store)
236 {
237 store->ref_count++;
238 }
239
db__msg_store_ref_dec(struct mosquitto_db * db,struct mosquitto_msg_store ** store)240 void db__msg_store_ref_dec(struct mosquitto_db *db, struct mosquitto_msg_store **store)
241 {
242 (*store)->ref_count--;
243 if((*store)->ref_count == 0){
244 db__msg_store_remove(db, *store);
245 *store = NULL;
246 }
247 }
248
249
db__msg_store_compact(struct mosquitto_db * db)250 void db__msg_store_compact(struct mosquitto_db *db)
251 {
252 struct mosquitto_msg_store *store, *next;
253
254 store = db->msg_store;
255 while(store){
256 next = store->next;
257 if(store->ref_count < 1){
258 db__msg_store_remove(db, store);
259 }
260 store = next;
261 }
262 }
263
264
db__message_remove(struct mosquitto_db * db,struct mosquitto_msg_data * msg_data,struct mosquitto_client_msg * item)265 static void db__message_remove(struct mosquitto_db *db, struct mosquitto_msg_data *msg_data, struct mosquitto_client_msg *item)
266 {
267 if(!msg_data || !item){
268 return;
269 }
270
271 DL_DELETE(msg_data->inflight, item);
272 if(item->store){
273 msg_data->msg_count--;
274 msg_data->msg_bytes -= item->store->payloadlen;
275 if(item->qos > 0){
276 msg_data->msg_count12--;
277 msg_data->msg_bytes12 -= item->store->payloadlen;
278 }
279 db__msg_store_ref_dec(db, &item->store);
280 }
281
282 mosquitto_property_free_all(&item->properties);
283 mosquitto__free(item);
284 }
285
286
db__message_dequeue_first(struct mosquitto * context,struct mosquitto_msg_data * msg_data)287 void db__message_dequeue_first(struct mosquitto *context, struct mosquitto_msg_data *msg_data)
288 {
289 struct mosquitto_client_msg *msg;
290
291 msg = msg_data->queued;
292 DL_DELETE(msg_data->queued, msg);
293 DL_APPEND(msg_data->inflight, msg);
294 if(msg_data->inflight_quota > 0){
295 msg_data->inflight_quota--;
296 }
297 }
298
299
db__message_delete_outgoing(struct mosquitto_db * db,struct mosquitto * context,uint16_t mid,enum mosquitto_msg_state expect_state,int qos)300 int db__message_delete_outgoing(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_state expect_state, int qos)
301 {
302 struct mosquitto_client_msg *tail, *tmp;
303 int msg_index = 0;
304
305 if(!context) return MOSQ_ERR_INVAL;
306
307 DL_FOREACH_SAFE(context->msgs_out.inflight, tail, tmp){
308 msg_index++;
309 if(tail->mid == mid){
310 if(tail->qos != qos){
311 return MOSQ_ERR_PROTOCOL;
312 }else if(qos == 2 && tail->state != expect_state){
313 return MOSQ_ERR_PROTOCOL;
314 }
315 msg_index--;
316 db__message_remove(db, &context->msgs_out, tail);
317 }
318 }
319
320 DL_FOREACH_SAFE(context->msgs_out.queued, tail, tmp){
321 if(context->msgs_out.inflight_maximum != 0 && msg_index >= context->msgs_out.inflight_maximum){
322 break;
323 }
324
325 msg_index++;
326 tail->timestamp = mosquitto_time();
327 switch(tail->qos){
328 case 0:
329 tail->state = mosq_ms_publish_qos0;
330 break;
331 case 1:
332 tail->state = mosq_ms_publish_qos1;
333 break;
334 case 2:
335 tail->state = mosq_ms_publish_qos2;
336 break;
337 }
338 db__message_dequeue_first(context, &context->msgs_out);
339 }
340
341 return MOSQ_ERR_SUCCESS;
342 }
343
db__message_insert(struct mosquitto_db * db,struct mosquitto * context,uint16_t mid,enum mosquitto_msg_direction dir,int qos,bool retain,struct mosquitto_msg_store * stored,mosquitto_property * properties)344 int db__message_insert(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir, int qos, bool retain, struct mosquitto_msg_store *stored, mosquitto_property *properties)
345 {
346 struct mosquitto_client_msg *msg;
347 struct mosquitto_msg_data *msg_data;
348 enum mosquitto_msg_state state = mosq_ms_invalid;
349 int rc = 0;
350 int i;
351 char **dest_ids;
352
353 assert(stored);
354 if(!context) return MOSQ_ERR_INVAL;
355 if(!context->id) return MOSQ_ERR_SUCCESS; /* Protect against unlikely "client is disconnected but not entirely freed" scenario */
356
357 if(dir == mosq_md_out){
358 msg_data = &context->msgs_out;
359 }else{
360 msg_data = &context->msgs_in;
361 }
362
363 /* Check whether we've already sent this message to this client
364 * for outgoing messages only.
365 * If retain==true then this is a stale retained message and so should be
366 * sent regardless. FIXME - this does mean retained messages will received
367 * multiple times for overlapping subscriptions, although this is only the
368 * case for SUBSCRIPTION with multiple subs in so is a minor concern.
369 */
370 if(context->protocol != mosq_p_mqtt5
371 && db->config->allow_duplicate_messages == false
372 && dir == mosq_md_out && retain == false && stored->dest_ids){
373
374 for(i=0; i<stored->dest_id_count; i++){
375 if(!strcmp(stored->dest_ids[i], context->id)){
376 /* We have already sent this message to this client. */
377 mosquitto_property_free_all(&properties);
378 return MOSQ_ERR_SUCCESS;
379 }
380 }
381 }
382 if(context->sock == INVALID_SOCKET){
383 /* Client is not connected only queue messages with QoS>0. */
384 if(qos == 0 && !db->config->queue_qos0_messages){
385 if(!context->bridge){
386 mosquitto_property_free_all(&properties);
387 return 2;
388 }else{
389 if(context->bridge->start_type != bst_lazy){
390 mosquitto_property_free_all(&properties);
391 return 2;
392 }
393 }
394 }
395 }
396
397 if(context->sock != INVALID_SOCKET){
398 if(db__ready_for_flight(msg_data, qos)){
399 if(dir == mosq_md_out){
400 switch(qos){
401 case 0:
402 state = mosq_ms_publish_qos0;
403 break;
404 case 1:
405 state = mosq_ms_publish_qos1;
406 break;
407 case 2:
408 state = mosq_ms_publish_qos2;
409 break;
410 }
411 }else{
412 if(qos == 2){
413 state = mosq_ms_wait_for_pubrel;
414 }else{
415 mosquitto_property_free_all(&properties);
416 return 1;
417 }
418 }
419 }else if(db__ready_for_queue(context, qos, msg_data)){
420 state = mosq_ms_queued;
421 rc = 2;
422 }else{
423 /* Dropping message due to full queue. */
424 if(context->is_dropping == false){
425 context->is_dropping = true;
426 log__printf(NULL, MOSQ_LOG_NOTICE,
427 "Outgoing messages are being dropped for client %s.",
428 context->id);
429 }
430 G_MSGS_DROPPED_INC();
431 mosquitto_property_free_all(&properties);
432 return 2;
433 }
434 }else{
435 if (db__ready_for_queue(context, qos, msg_data)){
436 state = mosq_ms_queued;
437 }else{
438 G_MSGS_DROPPED_INC();
439 if(context->is_dropping == false){
440 context->is_dropping = true;
441 log__printf(NULL, MOSQ_LOG_NOTICE,
442 "Outgoing messages are being dropped for client %s.",
443 context->id);
444 }
445 mosquitto_property_free_all(&properties);
446 return 2;
447 }
448 }
449 assert(state != mosq_ms_invalid);
450
451 #ifdef WITH_PERSISTENCE
452 if(state == mosq_ms_queued){
453 db->persistence_changes++;
454 }
455 #endif
456
457 msg = mosquitto__malloc(sizeof(struct mosquitto_client_msg));
458 if(!msg) return MOSQ_ERR_NOMEM;
459 msg->prev = NULL;
460 msg->next = NULL;
461 msg->store = stored;
462 db__msg_store_ref_inc(msg->store);
463 msg->mid = mid;
464 msg->timestamp = mosquitto_time();
465 msg->direction = dir;
466 msg->state = state;
467 msg->dup = false;
468 if(qos > context->maximum_qos){
469 msg->qos = context->maximum_qos;
470 }else{
471 msg->qos = qos;
472 }
473 msg->retain = retain;
474 msg->properties = properties;
475
476 if(state == mosq_ms_queued){
477 DL_APPEND(msg_data->queued, msg);
478 }else{
479 DL_APPEND(msg_data->inflight, msg);
480 }
481 msg_data->msg_count++;
482 msg_data->msg_bytes+= msg->store->payloadlen;
483 if(qos > 0){
484 msg_data->msg_count12++;
485 msg_data->msg_bytes12 += msg->store->payloadlen;
486 }
487
488 if(db->config->allow_duplicate_messages == false && dir == mosq_md_out && retain == false){
489 /* Record which client ids this message has been sent to so we can avoid duplicates.
490 * Outgoing messages only.
491 * If retain==true then this is a stale retained message and so should be
492 * sent regardless. FIXME - this does mean retained messages will received
493 * multiple times for overlapping subscriptions, although this is only the
494 * case for SUBSCRIPTION with multiple subs in so is a minor concern.
495 */
496 dest_ids = mosquitto__realloc(stored->dest_ids, sizeof(char *)*(stored->dest_id_count+1));
497 if(dest_ids){
498 stored->dest_ids = dest_ids;
499 stored->dest_id_count++;
500 stored->dest_ids[stored->dest_id_count-1] = mosquitto__strdup(context->id);
501 if(!stored->dest_ids[stored->dest_id_count-1]){
502 return MOSQ_ERR_NOMEM;
503 }
504 }else{
505 return MOSQ_ERR_NOMEM;
506 }
507 }
508 #ifdef WITH_BRIDGE
509 if(context->bridge && context->bridge->start_type == bst_lazy
510 && context->sock == INVALID_SOCKET
511 && context->msgs_out.msg_count >= context->bridge->threshold){
512
513 context->bridge->lazy_reconnect = true;
514 }
515 #endif
516
517 if(dir == mosq_md_out && msg->qos > 0){
518 util__decrement_send_quota(context);
519 }
520 #ifdef WITH_WEBSOCKETS
521 if(context->wsi && rc == 0){
522 return db__message_write(db, context);
523 }else{
524 return rc;
525 }
526 #else
527 return rc;
528 #endif
529 }
530
db__message_update_outgoing(struct mosquitto * context,uint16_t mid,enum mosquitto_msg_state state,int qos)531 int db__message_update_outgoing(struct mosquitto *context, uint16_t mid, enum mosquitto_msg_state state, int qos)
532 {
533 struct mosquitto_client_msg *tail;
534
535 DL_FOREACH(context->msgs_out.inflight, tail){
536 if(tail->mid == mid){
537 if(tail->qos != qos){
538 return MOSQ_ERR_PROTOCOL;
539 }
540 tail->state = state;
541 tail->timestamp = mosquitto_time();
542 return MOSQ_ERR_SUCCESS;
543 }
544 }
545 return MOSQ_ERR_NOT_FOUND;
546 }
547
548
db__messages_delete_list(struct mosquitto_db * db,struct mosquitto_client_msg ** head)549 void db__messages_delete_list(struct mosquitto_db *db, struct mosquitto_client_msg **head)
550 {
551 struct mosquitto_client_msg *tail, *tmp;
552
553 DL_FOREACH_SAFE(*head, tail, tmp){
554 DL_DELETE(*head, tail);
555 db__msg_store_ref_dec(db, &tail->store);
556 mosquitto_property_free_all(&tail->properties);
557 mosquitto__free(tail);
558 }
559 *head = NULL;
560 }
561
562
db__messages_delete(struct mosquitto_db * db,struct mosquitto * context)563 int db__messages_delete(struct mosquitto_db *db, struct mosquitto *context)
564 {
565 if(!context) return MOSQ_ERR_INVAL;
566
567 db__messages_delete_list(db, &context->msgs_in.inflight);
568 db__messages_delete_list(db, &context->msgs_in.queued);
569 db__messages_delete_list(db, &context->msgs_out.inflight);
570 db__messages_delete_list(db, &context->msgs_out.queued);
571
572 context->msgs_in.msg_bytes = 0;
573 context->msgs_in.msg_bytes12 = 0;
574 context->msgs_in.msg_count = 0;
575 context->msgs_in.msg_count12 = 0;
576
577 context->msgs_out.msg_bytes = 0;
578 context->msgs_out.msg_bytes12 = 0;
579 context->msgs_out.msg_count = 0;
580 context->msgs_out.msg_count12 = 0;
581
582 return MOSQ_ERR_SUCCESS;
583 }
584
db__messages_easy_queue(struct mosquitto_db * db,struct mosquitto * context,const char * topic,int qos,uint32_t payloadlen,const void * payload,int retain,uint32_t message_expiry_interval,mosquitto_property ** properties)585 int db__messages_easy_queue(struct mosquitto_db *db, struct mosquitto *context, const char *topic, int qos, uint32_t payloadlen, const void *payload, int retain, uint32_t message_expiry_interval, mosquitto_property **properties)
586 {
587 struct mosquitto_msg_store *stored;
588 char *source_id;
589 char *topic_heap;
590 mosquitto__payload_uhpa payload_uhpa;
591 mosquitto_property *local_properties = NULL;
592 enum mosquitto_msg_origin origin;
593
594 assert(db);
595
596 payload_uhpa.ptr = NULL;
597
598 if(!topic) return MOSQ_ERR_INVAL;
599 topic_heap = mosquitto__strdup(topic);
600 if(!topic_heap) return MOSQ_ERR_INVAL;
601
602 if(db->config->retain_available == false){
603 retain = 0;
604 }
605
606 if(UHPA_ALLOC(payload_uhpa, payloadlen) == 0){
607 mosquitto__free(topic_heap);
608 return MOSQ_ERR_NOMEM;
609 }
610 memcpy(UHPA_ACCESS(payload_uhpa, payloadlen), payload, payloadlen);
611
612 if(context && context->id){
613 source_id = context->id;
614 }else{
615 source_id = "";
616 }
617 if(properties){
618 local_properties = *properties;
619 *properties = NULL;
620 }
621
622 if(context){
623 origin = mosq_mo_client;
624 }else{
625 origin = mosq_mo_broker;
626 }
627 if(db__message_store(db, context, 0, topic_heap, qos, payloadlen, &payload_uhpa, retain, &stored, message_expiry_interval, local_properties, 0, origin)) return 1;
628
629 return sub__messages_queue(db, source_id, topic_heap, qos, retain, &stored);
630 }
631
632 /* This function requires topic to be allocated on the heap. Once called, it owns topic and will free it on error. Likewise payload and properties. */
db__message_store(struct mosquitto_db * db,const struct mosquitto * source,uint16_t source_mid,char * topic,int qos,uint32_t payloadlen,mosquitto__payload_uhpa * payload,int retain,struct mosquitto_msg_store ** stored,uint32_t message_expiry_interval,mosquitto_property * properties,dbid_t store_id,enum mosquitto_msg_origin origin)633 int db__message_store(struct mosquitto_db *db, const struct mosquitto *source, uint16_t source_mid, char *topic, int qos, uint32_t payloadlen, mosquitto__payload_uhpa *payload, int retain, struct mosquitto_msg_store **stored, uint32_t message_expiry_interval, mosquitto_property *properties, dbid_t store_id, enum mosquitto_msg_origin origin)
634 {
635 struct mosquitto_msg_store *temp = NULL;
636 int rc = MOSQ_ERR_SUCCESS;
637
638 assert(db);
639 assert(stored);
640
641 temp = mosquitto__calloc(1, sizeof(struct mosquitto_msg_store));
642 if(!temp){
643 log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
644 rc = MOSQ_ERR_NOMEM;
645 goto error;
646 }
647
648 temp->topic = NULL;
649 temp->payload.ptr = NULL;
650
651 temp->ref_count = 0;
652 if(source && source->id){
653 temp->source_id = mosquitto__strdup(source->id);
654 }else{
655 temp->source_id = mosquitto__strdup("");
656 }
657 if(!temp->source_id){
658 log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
659 rc = MOSQ_ERR_NOMEM;
660 goto error;
661 }
662
663 if(source && source->username){
664 temp->source_username = mosquitto__strdup(source->username);
665 if(!temp->source_username){
666 rc = MOSQ_ERR_NOMEM;
667 goto error;
668 }
669 }
670 if(source){
671 temp->source_listener = source->listener;
672 }
673 temp->source_mid = source_mid;
674 temp->mid = 0;
675 temp->qos = qos;
676 temp->retain = retain;
677 temp->topic = topic;
678 topic = NULL;
679 temp->payloadlen = payloadlen;
680 temp->properties = properties;
681 temp->origin = origin;
682 if(payloadlen){
683 UHPA_MOVE(temp->payload, *payload, payloadlen);
684 }else{
685 temp->payload.ptr = NULL;
686 }
687 if(message_expiry_interval > 0){
688 temp->message_expiry_time = time(NULL) + message_expiry_interval;
689 }else{
690 temp->message_expiry_time = 0;
691 }
692
693 temp->dest_ids = NULL;
694 temp->dest_id_count = 0;
695 db->msg_store_count++;
696 db->msg_store_bytes += payloadlen;
697 (*stored) = temp;
698
699 if(!store_id){
700 temp->db_id = ++db->last_db_id;
701 }else{
702 temp->db_id = store_id;
703 }
704
705 db__msg_store_add(db, temp);
706
707 return MOSQ_ERR_SUCCESS;
708 error:
709 mosquitto__free(topic);
710 if(temp){
711 mosquitto__free(temp->source_id);
712 mosquitto__free(temp->source_username);
713 mosquitto__free(temp->topic);
714 mosquitto__free(temp);
715 }
716 mosquitto_property_free_all(&properties);
717 UHPA_FREE(*payload, payloadlen);
718 return rc;
719 }
720
db__message_store_find(struct mosquitto * context,uint16_t mid,struct mosquitto_msg_store ** stored)721 int db__message_store_find(struct mosquitto *context, uint16_t mid, struct mosquitto_msg_store **stored)
722 {
723 struct mosquitto_client_msg *tail;
724
725 if(!context) return MOSQ_ERR_INVAL;
726
727 *stored = NULL;
728 DL_FOREACH(context->msgs_in.inflight, tail){
729 if(tail->store->source_mid == mid){
730 *stored = tail->store;
731 return MOSQ_ERR_SUCCESS;
732 }
733 }
734
735 DL_FOREACH(context->msgs_in.queued, tail){
736 if(tail->store->source_mid == mid){
737 *stored = tail->store;
738 return MOSQ_ERR_SUCCESS;
739 }
740 }
741
742 return 1;
743 }
744
745 /* Called on reconnect to set outgoing messages to a sensible state and force a
746 * retry, and to set incoming messages to expect an appropriate retry. */
db__message_reconnect_reset_outgoing(struct mosquitto_db * db,struct mosquitto * context)747 int db__message_reconnect_reset_outgoing(struct mosquitto_db *db, struct mosquitto *context)
748 {
749 struct mosquitto_client_msg *msg, *tmp;
750
751 context->msgs_out.msg_bytes = 0;
752 context->msgs_out.msg_bytes12 = 0;
753 context->msgs_out.msg_count = 0;
754 context->msgs_out.msg_count12 = 0;
755 context->msgs_out.inflight_quota = context->msgs_out.inflight_maximum;
756
757 DL_FOREACH_SAFE(context->msgs_out.inflight, msg, tmp){
758 context->msgs_out.msg_count++;
759 context->msgs_out.msg_bytes += msg->store->payloadlen;
760 if(msg->qos > 0){
761 context->msgs_out.msg_count12++;
762 context->msgs_out.msg_bytes12 += msg->store->payloadlen;
763 util__decrement_receive_quota(context);
764 }
765
766 switch(msg->qos){
767 case 0:
768 msg->state = mosq_ms_publish_qos0;
769 break;
770 case 1:
771 msg->state = mosq_ms_publish_qos1;
772 break;
773 case 2:
774 if(msg->state == mosq_ms_wait_for_pubcomp){
775 msg->state = mosq_ms_resend_pubrel;
776 }else{
777 msg->state = mosq_ms_publish_qos2;
778 }
779 break;
780 }
781 }
782 /* Messages received when the client was disconnected are put
783 * in the mosq_ms_queued state. If we don't change them to the
784 * appropriate "publish" state, then the queued messages won't
785 * get sent until the client next receives a message - and they
786 * will be sent out of order.
787 */
788 DL_FOREACH_SAFE(context->msgs_out.queued, msg, tmp){
789 context->msgs_out.msg_count++;
790 context->msgs_out.msg_bytes += msg->store->payloadlen;
791 if(msg->qos > 0){
792 context->msgs_out.msg_count12++;
793 context->msgs_out.msg_bytes12 += msg->store->payloadlen;
794 }
795 if(db__ready_for_flight(&context->msgs_out, msg->qos)){
796 switch(msg->qos){
797 case 0:
798 msg->state = mosq_ms_publish_qos0;
799 break;
800 case 1:
801 msg->state = mosq_ms_publish_qos1;
802 break;
803 case 2:
804 msg->state = mosq_ms_publish_qos2;
805 break;
806 }
807 db__message_dequeue_first(context, &context->msgs_out);
808 }
809 }
810
811 return MOSQ_ERR_SUCCESS;
812 }
813
814
815 /* Called on reconnect to set incoming messages to expect an appropriate retry. */
db__message_reconnect_reset_incoming(struct mosquitto_db * db,struct mosquitto * context)816 int db__message_reconnect_reset_incoming(struct mosquitto_db *db, struct mosquitto *context)
817 {
818 struct mosquitto_client_msg *msg, *tmp;
819
820 context->msgs_in.msg_bytes = 0;
821 context->msgs_in.msg_bytes12 = 0;
822 context->msgs_in.msg_count = 0;
823 context->msgs_in.msg_count12 = 0;
824 context->msgs_in.inflight_quota = context->msgs_in.inflight_maximum;
825
826 DL_FOREACH_SAFE(context->msgs_in.inflight, msg, tmp){
827 context->msgs_in.msg_count++;
828 context->msgs_in.msg_bytes += msg->store->payloadlen;
829 if(msg->qos > 0){
830 context->msgs_in.msg_count12++;
831 context->msgs_in.msg_bytes12 += msg->store->payloadlen;
832 util__decrement_receive_quota(context);
833 }
834
835 if(msg->qos != 2){
836 /* Anything <QoS 2 can be completely retried by the client at
837 * no harm. */
838 db__message_remove(db, &context->msgs_in, msg);
839 }else{
840 /* Message state can be preserved here because it should match
841 * whatever the client has got. */
842 }
843 }
844
845 /* Messages received when the client was disconnected are put
846 * in the mosq_ms_queued state. If we don't change them to the
847 * appropriate "publish" state, then the queued messages won't
848 * get sent until the client next receives a message - and they
849 * will be sent out of order.
850 */
851 DL_FOREACH_SAFE(context->msgs_in.queued, msg, tmp){
852 context->msgs_in.msg_count++;
853 context->msgs_in.msg_bytes += msg->store->payloadlen;
854 if(msg->qos > 0){
855 context->msgs_in.msg_count12++;
856 context->msgs_in.msg_bytes12 += msg->store->payloadlen;
857 }
858 if(db__ready_for_flight(&context->msgs_in, msg->qos)){
859 switch(msg->qos){
860 case 0:
861 msg->state = mosq_ms_publish_qos0;
862 break;
863 case 1:
864 msg->state = mosq_ms_publish_qos1;
865 break;
866 case 2:
867 msg->state = mosq_ms_publish_qos2;
868 break;
869 }
870 db__message_dequeue_first(context, &context->msgs_in);
871 }
872 }
873
874 return MOSQ_ERR_SUCCESS;
875 }
876
877
db__message_reconnect_reset(struct mosquitto_db * db,struct mosquitto * context)878 int db__message_reconnect_reset(struct mosquitto_db *db, struct mosquitto *context)
879 {
880 int rc;
881
882 rc = db__message_reconnect_reset_outgoing(db, context);
883 if(rc) return rc;
884 return db__message_reconnect_reset_incoming(db, context);
885 }
886
887
db__message_release_incoming(struct mosquitto_db * db,struct mosquitto * context,uint16_t mid)888 int db__message_release_incoming(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid)
889 {
890 struct mosquitto_client_msg *tail, *tmp;
891 int retain;
892 char *topic;
893 char *source_id;
894 int msg_index = 0;
895 bool deleted = false;
896 int rc;
897
898 if(!context) return MOSQ_ERR_INVAL;
899
900 DL_FOREACH_SAFE(context->msgs_in.inflight, tail, tmp){
901 msg_index++;
902 if(tail->mid == mid){
903 if(tail->store->qos != 2){
904 return MOSQ_ERR_PROTOCOL;
905 }
906 topic = tail->store->topic;
907 retain = tail->retain;
908 source_id = tail->store->source_id;
909
910 /* topic==NULL should be a QoS 2 message that was
911 * denied/dropped and is being processed so the client doesn't
912 * keep resending it. That means we don't send it to other
913 * clients. */
914 if(!topic){
915 db__message_remove(db, &context->msgs_in, tail);
916 deleted = true;
917 }else{
918 rc = sub__messages_queue(db, source_id, topic, 2, retain, &tail->store);
919 if(rc == MOSQ_ERR_SUCCESS || rc == MOSQ_ERR_NO_SUBSCRIBERS){
920 db__message_remove(db, &context->msgs_in, tail);
921 deleted = true;
922 }else{
923 return 1;
924 }
925 }
926 }
927 }
928
929 DL_FOREACH_SAFE(context->msgs_in.queued, tail, tmp){
930 if(context->msgs_in.inflight_maximum != 0 && msg_index >= context->msgs_in.inflight_maximum){
931 break;
932 }
933
934 msg_index++;
935 tail->timestamp = mosquitto_time();
936
937 if(tail->qos == 2){
938 send__pubrec(context, tail->mid, 0);
939 tail->state = mosq_ms_wait_for_pubrel;
940 db__message_dequeue_first(context, &context->msgs_in);
941 }
942 }
943 if(deleted){
944 return MOSQ_ERR_SUCCESS;
945 }else{
946 return MOSQ_ERR_NOT_FOUND;
947 }
948 }
949
db__message_write(struct mosquitto_db * db,struct mosquitto * context)950 int db__message_write(struct mosquitto_db *db, struct mosquitto *context)
951 {
952 int rc;
953 struct mosquitto_client_msg *tail, *tmp;
954 uint16_t mid;
955 int retries;
956 int retain;
957 const char *topic;
958 int qos;
959 uint32_t payloadlen;
960 const void *payload;
961 int msg_count = 0;
962 mosquitto_property *cmsg_props = NULL, *store_props = NULL;
963 time_t now = 0;
964 uint32_t expiry_interval;
965
966 if(!context || context->sock == INVALID_SOCKET
967 || (context->state == mosq_cs_active && !context->id)){
968 return MOSQ_ERR_INVAL;
969 }
970
971 if(context->state != mosq_cs_active){
972 return MOSQ_ERR_SUCCESS;
973 }
974
975 DL_FOREACH_SAFE(context->msgs_in.inflight, tail, tmp){
976 msg_count++;
977 if(tail->store->message_expiry_time){
978 if(now == 0){
979 now = time(NULL);
980 }
981 if(now > tail->store->message_expiry_time){
982 /* Message is expired, must not send. */
983 db__message_remove(db, &context->msgs_in, tail);
984 continue;
985 }
986 }
987 mid = tail->mid;
988
989 switch(tail->state){
990 case mosq_ms_send_pubrec:
991 rc = send__pubrec(context, mid, 0);
992 if(!rc){
993 tail->state = mosq_ms_wait_for_pubrel;
994 }else{
995 return rc;
996 }
997 break;
998
999 case mosq_ms_resend_pubcomp:
1000 rc = send__pubcomp(context, mid);
1001 if(!rc){
1002 tail->state = mosq_ms_wait_for_pubrel;
1003 }else{
1004 return rc;
1005 }
1006 break;
1007
1008 case mosq_ms_invalid:
1009 case mosq_ms_publish_qos0:
1010 case mosq_ms_publish_qos1:
1011 case mosq_ms_publish_qos2:
1012 case mosq_ms_resend_pubrel:
1013 case mosq_ms_wait_for_puback:
1014 case mosq_ms_wait_for_pubrec:
1015 case mosq_ms_wait_for_pubrel:
1016 case mosq_ms_wait_for_pubcomp:
1017 case mosq_ms_queued:
1018 break;
1019 }
1020 }
1021
1022 DL_FOREACH_SAFE(context->msgs_out.inflight, tail, tmp){
1023 msg_count++;
1024 if(tail->store->message_expiry_time){
1025 if(now == 0){
1026 now = time(NULL);
1027 }
1028 if(now > tail->store->message_expiry_time){
1029 /* Message is expired, must not send. */
1030 db__message_remove(db, &context->msgs_out, tail);
1031 continue;
1032 }else{
1033 expiry_interval = tail->store->message_expiry_time - now;
1034 }
1035 }else{
1036 expiry_interval = 0;
1037 }
1038 mid = tail->mid;
1039 retries = tail->dup;
1040 retain = tail->retain;
1041 topic = tail->store->topic;
1042 qos = tail->qos;
1043 payloadlen = tail->store->payloadlen;
1044 payload = UHPA_ACCESS_PAYLOAD(tail->store);
1045 cmsg_props = tail->properties;
1046 store_props = tail->store->properties;
1047
1048 switch(tail->state){
1049 case mosq_ms_publish_qos0:
1050 rc = send__publish(context, mid, topic, payloadlen, payload, qos, retain, retries, cmsg_props, store_props, expiry_interval);
1051 if(rc == MOSQ_ERR_SUCCESS || rc == MOSQ_ERR_OVERSIZE_PACKET){
1052 db__message_remove(db, &context->msgs_out, tail);
1053 }else{
1054 return rc;
1055 }
1056 break;
1057
1058 case mosq_ms_publish_qos1:
1059 rc = send__publish(context, mid, topic, payloadlen, payload, qos, retain, retries, cmsg_props, store_props, expiry_interval);
1060 if(rc == MOSQ_ERR_SUCCESS){
1061 tail->timestamp = mosquitto_time();
1062 tail->dup = 1; /* Any retry attempts are a duplicate. */
1063 tail->state = mosq_ms_wait_for_puback;
1064 }else if(rc == MOSQ_ERR_OVERSIZE_PACKET){
1065 db__message_remove(db, &context->msgs_out, tail);
1066 }else{
1067 return rc;
1068 }
1069 break;
1070
1071 case mosq_ms_publish_qos2:
1072 rc = send__publish(context, mid, topic, payloadlen, payload, qos, retain, retries, cmsg_props, store_props, expiry_interval);
1073 if(rc == MOSQ_ERR_SUCCESS){
1074 tail->timestamp = mosquitto_time();
1075 tail->dup = 1; /* Any retry attempts are a duplicate. */
1076 tail->state = mosq_ms_wait_for_pubrec;
1077 }else if(rc == MOSQ_ERR_OVERSIZE_PACKET){
1078 db__message_remove(db, &context->msgs_out, tail);
1079 }else{
1080 return rc;
1081 }
1082 break;
1083
1084 case mosq_ms_resend_pubrel:
1085 rc = send__pubrel(context, mid);
1086 if(!rc){
1087 tail->state = mosq_ms_wait_for_pubcomp;
1088 }else{
1089 return rc;
1090 }
1091 break;
1092
1093 case mosq_ms_invalid:
1094 case mosq_ms_send_pubrec:
1095 case mosq_ms_resend_pubcomp:
1096 case mosq_ms_wait_for_puback:
1097 case mosq_ms_wait_for_pubrec:
1098 case mosq_ms_wait_for_pubrel:
1099 case mosq_ms_wait_for_pubcomp:
1100 case mosq_ms_queued:
1101 break;
1102 }
1103 }
1104
1105 DL_FOREACH_SAFE(context->msgs_in.queued, tail, tmp){
1106 if(context->msgs_out.inflight_maximum != 0 && context->msgs_in.inflight_quota == 0){
1107 break;
1108 }
1109
1110 msg_count++;
1111
1112 if(tail->qos == 2){
1113 tail->state = mosq_ms_send_pubrec;
1114 db__message_dequeue_first(context, &context->msgs_in);
1115 rc = send__pubrec(context, tail->mid, 0);
1116 if(!rc){
1117 tail->state = mosq_ms_wait_for_pubrel;
1118 }else{
1119 return rc;
1120 }
1121 }
1122 }
1123
1124 DL_FOREACH_SAFE(context->msgs_out.queued, tail, tmp){
1125 if(context->msgs_out.inflight_maximum != 0 && context->msgs_out.inflight_quota == 0){
1126 break;
1127 }
1128
1129 msg_count++;
1130
1131 switch(tail->qos){
1132 case 0:
1133 tail->state = mosq_ms_publish_qos0;
1134 break;
1135 case 1:
1136 tail->state = mosq_ms_publish_qos1;
1137 break;
1138 case 2:
1139 tail->state = mosq_ms_publish_qos2;
1140 break;
1141 }
1142 db__message_dequeue_first(context, &context->msgs_out);
1143 }
1144
1145 return MOSQ_ERR_SUCCESS;
1146 }
1147
db__limits_set(unsigned long inflight_bytes,int queued,unsigned long queued_bytes)1148 void db__limits_set(unsigned long inflight_bytes, int queued, unsigned long queued_bytes)
1149 {
1150 max_inflight_bytes = inflight_bytes;
1151 max_queued = queued;
1152 max_queued_bytes = queued_bytes;
1153 }
1154
1155