1 /*
2 Copyright (c) 2009-2020 Roger Light <roger@atchoo.org>
3 
4 All rights reserved. This program and the accompanying materials
5 are made available under the terms of the Eclipse Public License v1.0
6 and Eclipse Distribution License v1.0 which accompany this distribution.
7 
8 The Eclipse Public License is available at
9    http://www.eclipse.org/legal/epl-v10.html
10 and the Eclipse Distribution License is available at
11   http://www.eclipse.org/org/documents/edl-v10.php.
12 
13 Contributors:
14    Roger Light - initial implementation and documentation.
15 */
16 
17 #include "config.h"
18 
19 #include <assert.h>
20 #include <stdio.h>
21 #include <utlist.h>
22 
23 #include "mosquitto_broker_internal.h"
24 #include "memory_mosq.h"
25 #include "send_mosq.h"
26 #include "sys_tree.h"
27 #include "time_mosq.h"
28 #include "util_mosq.h"
29 
/* Limits consulted by db__ready_for_flight() and db__ready_for_queue().
 * In those checks a value of 0 means the corresponding limit is not applied.
 * NOTE(review): presumably updated from the broker configuration by code
 * outside this section - confirm. */

/* Upper bound compared against a client's msg_bytes12 when deciding whether a
 * QoS 1/2 message may go in flight. */
static unsigned long max_inflight_bytes = 0;
/* Upper bound on the number of messages queued beyond the in-flight window. */
static int max_queued = 100;
/* Upper bound on the payload bytes queued beyond the in-flight window. */
static unsigned long max_queued_bytes = 0;
33 
34 /**
35  * Is this context ready to take more in flight messages right now?
36  * @param context the client context of interest
37  * @param qos qos for the packet of interest
38  * @return true if more in flight are allowed.
39  */
db__ready_for_flight(struct mosquitto_msg_data * msgs,int qos)40 static bool db__ready_for_flight(struct mosquitto_msg_data *msgs, int qos)
41 {
42 	bool valid_bytes;
43 	bool valid_count;
44 
45 	if(qos == 0 || (msgs->inflight_maximum == 0 && max_inflight_bytes == 0)){
46 		return true;
47 	}
48 
49 	valid_bytes = msgs->msg_bytes12 < max_inflight_bytes;
50 	valid_count = msgs->inflight_quota > 0;
51 
52 	if(msgs->inflight_maximum == 0){
53 		return valid_bytes;
54 	}
55 	if(max_inflight_bytes == 0){
56 		return valid_count;
57 	}
58 
59 	return valid_bytes && valid_count;
60 }
61 
62 
63 /**
64  * For a given client context, are more messages allowed to be queued?
65  * It is assumed that inflight checks and queue_qos0 checks have already
66  * been made.
67  * @param context client of interest
68  * @param qos destination qos for the packet of interest
69  * @return true if queuing is allowed, false if should be dropped
70  */
db__ready_for_queue(struct mosquitto * context,int qos,struct mosquitto_msg_data * msg_data)71 static bool db__ready_for_queue(struct mosquitto *context, int qos, struct mosquitto_msg_data *msg_data)
72 {
73 	int source_count;
74 	int adjust_count;
75 	unsigned long source_bytes;
76 	unsigned long adjust_bytes = max_inflight_bytes;
77 
78 	if(max_queued == 0 && max_queued_bytes == 0){
79 		return true;
80 	}
81 
82 	if(qos == 0){
83 		source_bytes = msg_data->msg_bytes;
84 		source_count = msg_data->msg_count;
85 	}else{
86 		source_bytes = msg_data->msg_bytes12;
87 		source_count = msg_data->msg_count12;
88 	}
89 	adjust_count = msg_data->inflight_maximum;
90 
91 	/* nothing in flight for offline clients */
92 	if(context->sock == INVALID_SOCKET){
93 		adjust_bytes = 0;
94 		adjust_count = 0;
95 	}
96 
97 	bool valid_bytes = source_bytes - adjust_bytes < max_queued_bytes;
98 	bool valid_count = source_count - adjust_count < max_queued;
99 
100 	if(max_queued_bytes == 0){
101 		return valid_count;
102 	}
103 	if(max_queued == 0){
104 		return valid_bytes;
105 	}
106 
107 	return valid_bytes && valid_count;
108 }
109 
110 
db__open(struct mosquitto__config * config,struct mosquitto_db * db)111 int db__open(struct mosquitto__config *config, struct mosquitto_db *db)
112 {
113 	struct mosquitto__subhier *subhier;
114 
115 	if(!config || !db) return MOSQ_ERR_INVAL;
116 
117 	db->last_db_id = 0;
118 
119 	db->contexts_by_id = NULL;
120 	db->contexts_by_sock = NULL;
121 	db->contexts_for_free = NULL;
122 #ifdef WITH_BRIDGE
123 	db->bridges = NULL;
124 	db->bridge_count = 0;
125 #endif
126 
127 	// Initialize the hashtable
128 	db->clientid_index_hash = NULL;
129 
130 	db->subs = NULL;
131 
132 	subhier = sub__add_hier_entry(NULL, &db->subs, "", strlen(""));
133 	if(!subhier) return MOSQ_ERR_NOMEM;
134 
135 	subhier = sub__add_hier_entry(NULL, &db->subs, "$SYS", strlen("$SYS"));
136 	if(!subhier) return MOSQ_ERR_NOMEM;
137 
138 	db->unpwd = NULL;
139 
140 #ifdef WITH_PERSISTENCE
141 	if(persist__restore(db)) return 1;
142 #endif
143 
144 	return MOSQ_ERR_SUCCESS;
145 }
146 
subhier_clean(struct mosquitto_db * db,struct mosquitto__subhier ** subhier)147 static void subhier_clean(struct mosquitto_db *db, struct mosquitto__subhier **subhier)
148 {
149 	struct mosquitto__subhier *peer, *subhier_tmp;
150 	struct mosquitto__subleaf *leaf, *nextleaf;
151 
152 	HASH_ITER(hh, *subhier, peer, subhier_tmp){
153 		leaf = peer->subs;
154 		while(leaf){
155 			nextleaf = leaf->next;
156 			mosquitto__free(leaf);
157 			leaf = nextleaf;
158 		}
159 		if(peer->retained){
160 			db__msg_store_ref_dec(db, &peer->retained);
161 		}
162 		subhier_clean(db, &peer->children);
163 		mosquitto__free(peer->topic);
164 
165 		HASH_DELETE(hh, *subhier, peer);
166 		mosquitto__free(peer);
167 	}
168 }
169 
db__close(struct mosquitto_db * db)170 int db__close(struct mosquitto_db *db)
171 {
172 	subhier_clean(db, &db->subs);
173 	db__msg_store_clean(db);
174 
175 	return MOSQ_ERR_SUCCESS;
176 }
177 
178 
db__msg_store_add(struct mosquitto_db * db,struct mosquitto_msg_store * store)179 void db__msg_store_add(struct mosquitto_db *db, struct mosquitto_msg_store *store)
180 {
181 	store->next = db->msg_store;
182 	store->prev = NULL;
183 	if(db->msg_store){
184 		db->msg_store->prev = store;
185 	}
186 	db->msg_store = store;
187 }
188 
189 
db__msg_store_remove(struct mosquitto_db * db,struct mosquitto_msg_store * store)190 void db__msg_store_remove(struct mosquitto_db *db, struct mosquitto_msg_store *store)
191 {
192 	int i;
193 
194 	if(store->prev){
195 		store->prev->next = store->next;
196 		if(store->next){
197 			store->next->prev = store->prev;
198 		}
199 	}else{
200 		db->msg_store = store->next;
201 		if(store->next){
202 			store->next->prev = NULL;
203 		}
204 	}
205 	db->msg_store_count--;
206 	db->msg_store_bytes -= store->payloadlen;
207 
208 	mosquitto__free(store->source_id);
209 	mosquitto__free(store->source_username);
210 	if(store->dest_ids){
211 		for(i=0; i<store->dest_id_count; i++){
212 			mosquitto__free(store->dest_ids[i]);
213 		}
214 		mosquitto__free(store->dest_ids);
215 	}
216 	mosquitto__free(store->topic);
217 	mosquitto_property_free_all(&store->properties);
218 	UHPA_FREE_PAYLOAD(store);
219 	mosquitto__free(store);
220 }
221 
222 
db__msg_store_clean(struct mosquitto_db * db)223 void db__msg_store_clean(struct mosquitto_db *db)
224 {
225 	struct mosquitto_msg_store *store, *next;;
226 
227 	store = db->msg_store;
228 	while(store){
229 		next = store->next;
230 		db__msg_store_remove(db, store);
231 		store = next;
232 	}
233 }
234 
db__msg_store_ref_inc(struct mosquitto_msg_store * store)235 void db__msg_store_ref_inc(struct mosquitto_msg_store *store)
236 {
237 	store->ref_count++;
238 }
239 
db__msg_store_ref_dec(struct mosquitto_db * db,struct mosquitto_msg_store ** store)240 void db__msg_store_ref_dec(struct mosquitto_db *db, struct mosquitto_msg_store **store)
241 {
242 	(*store)->ref_count--;
243 	if((*store)->ref_count == 0){
244 		db__msg_store_remove(db, *store);
245 		*store = NULL;
246 	}
247 }
248 
249 
db__msg_store_compact(struct mosquitto_db * db)250 void db__msg_store_compact(struct mosquitto_db *db)
251 {
252 	struct mosquitto_msg_store *store, *next;
253 
254 	store = db->msg_store;
255 	while(store){
256 		next = store->next;
257 		if(store->ref_count < 1){
258 			db__msg_store_remove(db, store);
259 		}
260 		store = next;
261 	}
262 }
263 
264 
db__message_remove(struct mosquitto_db * db,struct mosquitto_msg_data * msg_data,struct mosquitto_client_msg * item)265 static void db__message_remove(struct mosquitto_db *db, struct mosquitto_msg_data *msg_data, struct mosquitto_client_msg *item)
266 {
267 	if(!msg_data || !item){
268 		return;
269 	}
270 
271 	DL_DELETE(msg_data->inflight, item);
272 	if(item->store){
273 		msg_data->msg_count--;
274 		msg_data->msg_bytes -= item->store->payloadlen;
275 		if(item->qos > 0){
276 			msg_data->msg_count12--;
277 			msg_data->msg_bytes12 -= item->store->payloadlen;
278 		}
279 		db__msg_store_ref_dec(db, &item->store);
280 	}
281 
282 	mosquitto_property_free_all(&item->properties);
283 	mosquitto__free(item);
284 }
285 
286 
db__message_dequeue_first(struct mosquitto * context,struct mosquitto_msg_data * msg_data)287 void db__message_dequeue_first(struct mosquitto *context, struct mosquitto_msg_data *msg_data)
288 {
289 	struct mosquitto_client_msg *msg;
290 
291 	msg = msg_data->queued;
292 	DL_DELETE(msg_data->queued, msg);
293 	DL_APPEND(msg_data->inflight, msg);
294 	if(msg_data->inflight_quota > 0){
295 		msg_data->inflight_quota--;
296 	}
297 }
298 
299 
db__message_delete_outgoing(struct mosquitto_db * db,struct mosquitto * context,uint16_t mid,enum mosquitto_msg_state expect_state,int qos)300 int db__message_delete_outgoing(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_state expect_state, int qos)
301 {
302 	struct mosquitto_client_msg *tail, *tmp;
303 	int msg_index = 0;
304 
305 	if(!context) return MOSQ_ERR_INVAL;
306 
307 	DL_FOREACH_SAFE(context->msgs_out.inflight, tail, tmp){
308 		msg_index++;
309 		if(tail->mid == mid){
310 			if(tail->qos != qos){
311 				return MOSQ_ERR_PROTOCOL;
312 			}else if(qos == 2 && tail->state != expect_state){
313 				return MOSQ_ERR_PROTOCOL;
314 			}
315 			msg_index--;
316 			db__message_remove(db, &context->msgs_out, tail);
317 		}
318 	}
319 
320 	DL_FOREACH_SAFE(context->msgs_out.queued, tail, tmp){
321 		if(context->msgs_out.inflight_maximum != 0 && msg_index >= context->msgs_out.inflight_maximum){
322 			break;
323 		}
324 
325 		msg_index++;
326 		tail->timestamp = mosquitto_time();
327 		switch(tail->qos){
328 			case 0:
329 				tail->state = mosq_ms_publish_qos0;
330 				break;
331 			case 1:
332 				tail->state = mosq_ms_publish_qos1;
333 				break;
334 			case 2:
335 				tail->state = mosq_ms_publish_qos2;
336 				break;
337 		}
338 		db__message_dequeue_first(context, &context->msgs_out);
339 	}
340 
341 	return MOSQ_ERR_SUCCESS;
342 }
343 
/*
 * Attach a stored message to a client, either in flight or queued.
 *
 * Ownership: this function takes ownership of `properties` on every path.
 * They are either attached to the newly created client message or freed
 * before returning.
 *
 * Return values, as produced by the code below:
 *   MOSQ_ERR_INVAL   - context is NULL.
 *   MOSQ_ERR_SUCCESS - the client has no id, or the message was already sent
 *                      to this client and duplicates are suppressed.
 *   MOSQ_ERR_NOMEM   - an allocation failed.
 *   1                - incoming message with qos != 2 for a connected client
 *                      that is ready for flight; nothing is stored.
 *   2                - the message was queued for a connected client, was
 *                      not queued because the client is offline and QoS 0
 *                      queuing does not apply, or was dropped because the
 *                      queue is full.
 *   0                - the message was placed in flight, or queued for an
 *                      offline client. With WITH_WEBSOCKETS and a websocket
 *                      client the result of db__message_write() is returned
 *                      instead.
 */
int db__message_insert(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir, int qos, bool retain, struct mosquitto_msg_store *stored, mosquitto_property *properties)
{
	struct mosquitto_client_msg *msg;
	struct mosquitto_msg_data *msg_data;
	enum mosquitto_msg_state state = mosq_ms_invalid;
	int rc = 0;
	int i;
	char **dest_ids;
	char *dest_id;

	assert(stored);
	if(!context){
		mosquitto_property_free_all(&properties);
		return MOSQ_ERR_INVAL;
	}
	if(!context->id){
		/* Protect against unlikely "client is disconnected but not entirely freed" scenario */
		mosquitto_property_free_all(&properties);
		return MOSQ_ERR_SUCCESS;
	}

	if(dir == mosq_md_out){
		msg_data = &context->msgs_out;
	}else{
		msg_data = &context->msgs_in;
	}

	/* Check whether we've already sent this message to this client
	 * for outgoing messages only.
	 * If retain==true then this is a stale retained message and so should be
	 * sent regardless. FIXME - this does mean retained messages will be
	 * received multiple times for overlapping subscriptions, although this is
	 * only the case for SUBSCRIPTION with multiple subs in so is a minor
	 * concern.
	 */
	if(context->protocol != mosq_p_mqtt5
			&& db->config->allow_duplicate_messages == false
			&& dir == mosq_md_out && retain == false && stored->dest_ids){

		for(i=0; i<stored->dest_id_count; i++){
			if(!strcmp(stored->dest_ids[i], context->id)){
				/* We have already sent this message to this client. */
				mosquitto_property_free_all(&properties);
				return MOSQ_ERR_SUCCESS;
			}
		}
	}
	if(context->sock == INVALID_SOCKET){
		/* Client is not connected only queue messages with QoS>0. */
		if(qos == 0 && !db->config->queue_qos0_messages){
			if(!context->bridge){
				mosquitto_property_free_all(&properties);
				return 2;
			}else{
				if(context->bridge->start_type != bst_lazy){
					mosquitto_property_free_all(&properties);
					return 2;
				}
			}
		}
	}

	if(context->sock != INVALID_SOCKET){
		if(db__ready_for_flight(msg_data, qos)){
			if(dir == mosq_md_out){
				switch(qos){
					case 0:
						state = mosq_ms_publish_qos0;
						break;
					case 1:
						state = mosq_ms_publish_qos1;
						break;
					case 2:
						state = mosq_ms_publish_qos2;
						break;
				}
			}else{
				if(qos == 2){
					state = mosq_ms_wait_for_pubrel;
				}else{
					mosquitto_property_free_all(&properties);
					return 1;
				}
			}
		}else if(db__ready_for_queue(context, qos, msg_data)){
			state = mosq_ms_queued;
			rc = 2;
		}else{
			/* Dropping message due to full queue. */
			if(context->is_dropping == false){
				context->is_dropping = true;
				log__printf(NULL, MOSQ_LOG_NOTICE,
						"Outgoing messages are being dropped for client %s.",
						context->id);
			}
			G_MSGS_DROPPED_INC();
			mosquitto_property_free_all(&properties);
			return 2;
		}
	}else{
		if (db__ready_for_queue(context, qos, msg_data)){
			state = mosq_ms_queued;
		}else{
			G_MSGS_DROPPED_INC();
			if(context->is_dropping == false){
				context->is_dropping = true;
				log__printf(NULL, MOSQ_LOG_NOTICE,
						"Outgoing messages are being dropped for client %s.",
						context->id);
			}
			mosquitto_property_free_all(&properties);
			return 2;
		}
	}
	assert(state != mosq_ms_invalid);

#ifdef WITH_PERSISTENCE
	if(state == mosq_ms_queued){
		db->persistence_changes++;
	}
#endif

	msg = mosquitto__malloc(sizeof(struct mosquitto_client_msg));
	if(!msg){
		/* Nothing took the properties over, so release them here like
		 * every other early exit does. */
		mosquitto_property_free_all(&properties);
		return MOSQ_ERR_NOMEM;
	}
	msg->prev = NULL;
	msg->next = NULL;
	msg->store = stored;
	db__msg_store_ref_inc(msg->store);
	msg->mid = mid;
	msg->timestamp = mosquitto_time();
	msg->direction = dir;
	msg->state = state;
	msg->dup = false;
	/* Never deliver at a higher QoS than the client accepts. */
	if(qos > context->maximum_qos){
		msg->qos = context->maximum_qos;
	}else{
		msg->qos = qos;
	}
	msg->retain = retain;
	msg->properties = properties;

	if(state == mosq_ms_queued){
		DL_APPEND(msg_data->queued, msg);
	}else{
		DL_APPEND(msg_data->inflight, msg);
	}
	msg_data->msg_count++;
	msg_data->msg_bytes+= msg->store->payloadlen;
	/* Account by the effective QoS stored in the message. This is the value
	 * db__message_remove() and the reconnect reset functions look at, so the
	 * QoS 1/2 counters stay balanced even when the QoS was lowered above. */
	if(msg->qos > 0){
		msg_data->msg_count12++;
		msg_data->msg_bytes12 += msg->store->payloadlen;
	}

	if(db->config->allow_duplicate_messages == false && dir == mosq_md_out && retain == false){
		/* Record which client ids this message has been sent to so we can avoid duplicates.
		 * Outgoing messages only.
		 * If retain==true then this is a stale retained message and so should be
		 * sent regardless. FIXME - this does mean retained messages will be
		 * received multiple times for overlapping subscriptions, although this is
		 * only the case for SUBSCRIPTION with multiple subs in so is a minor
		 * concern.
		 */
		/* Duplicate the id first and only grow the list once that has
		 * succeeded, so dest_ids never contains a NULL entry that the
		 * strcmp() in the duplicate check above would dereference. */
		dest_id = mosquitto__strdup(context->id);
		if(!dest_id){
			return MOSQ_ERR_NOMEM;
		}
		dest_ids = mosquitto__realloc(stored->dest_ids, sizeof(char *)*(stored->dest_id_count+1));
		if(!dest_ids){
			mosquitto__free(dest_id);
			return MOSQ_ERR_NOMEM;
		}
		stored->dest_ids = dest_ids;
		stored->dest_ids[stored->dest_id_count] = dest_id;
		stored->dest_id_count++;
	}
#ifdef WITH_BRIDGE
	/* A lazy bridge that is offline reconnects once enough messages wait. */
	if(context->bridge && context->bridge->start_type == bst_lazy
			&& context->sock == INVALID_SOCKET
			&& context->msgs_out.msg_count >= context->bridge->threshold){

		context->bridge->lazy_reconnect = true;
	}
#endif

	if(dir == mosq_md_out && msg->qos > 0){
		util__decrement_send_quota(context);
	}
#ifdef WITH_WEBSOCKETS
	if(context->wsi && rc == 0){
		return db__message_write(db, context);
	}else{
		return rc;
	}
#else
	return rc;
#endif
}
530 
db__message_update_outgoing(struct mosquitto * context,uint16_t mid,enum mosquitto_msg_state state,int qos)531 int db__message_update_outgoing(struct mosquitto *context, uint16_t mid, enum mosquitto_msg_state state, int qos)
532 {
533 	struct mosquitto_client_msg *tail;
534 
535 	DL_FOREACH(context->msgs_out.inflight, tail){
536 		if(tail->mid == mid){
537 			if(tail->qos != qos){
538 				return MOSQ_ERR_PROTOCOL;
539 			}
540 			tail->state = state;
541 			tail->timestamp = mosquitto_time();
542 			return MOSQ_ERR_SUCCESS;
543 		}
544 	}
545 	return MOSQ_ERR_NOT_FOUND;
546 }
547 
548 
db__messages_delete_list(struct mosquitto_db * db,struct mosquitto_client_msg ** head)549 void db__messages_delete_list(struct mosquitto_db *db, struct mosquitto_client_msg **head)
550 {
551 	struct mosquitto_client_msg *tail, *tmp;
552 
553 	DL_FOREACH_SAFE(*head, tail, tmp){
554 		DL_DELETE(*head, tail);
555 		db__msg_store_ref_dec(db, &tail->store);
556 		mosquitto_property_free_all(&tail->properties);
557 		mosquitto__free(tail);
558 	}
559 	*head = NULL;
560 }
561 
562 
db__messages_delete(struct mosquitto_db * db,struct mosquitto * context)563 int db__messages_delete(struct mosquitto_db *db, struct mosquitto *context)
564 {
565 	if(!context) return MOSQ_ERR_INVAL;
566 
567 	db__messages_delete_list(db, &context->msgs_in.inflight);
568 	db__messages_delete_list(db, &context->msgs_in.queued);
569 	db__messages_delete_list(db, &context->msgs_out.inflight);
570 	db__messages_delete_list(db, &context->msgs_out.queued);
571 
572 	context->msgs_in.msg_bytes = 0;
573 	context->msgs_in.msg_bytes12 = 0;
574 	context->msgs_in.msg_count = 0;
575 	context->msgs_in.msg_count12 = 0;
576 
577 	context->msgs_out.msg_bytes = 0;
578 	context->msgs_out.msg_bytes12 = 0;
579 	context->msgs_out.msg_count = 0;
580 	context->msgs_out.msg_count12 = 0;
581 
582 	return MOSQ_ERR_SUCCESS;
583 }
584 
/*
 * Convenience wrapper: copy the topic and payload, store the message and
 * queue it to the matching subscribers.
 *
 * context may be NULL, in which case the message originates from the broker
 * itself and has an empty source id. If properties is not NULL, the list it
 * points to is taken over (*properties is set to NULL) once the topic and
 * payload copies have succeeded. retain is forced to 0 when retained
 * messages are disabled in the configuration.
 *
 * Returns MOSQ_ERR_INVAL if topic is NULL, MOSQ_ERR_NOMEM if the topic or
 * payload copy cannot be allocated, 1 if db__message_store() fails, and
 * otherwise the result of sub__messages_queue().
 */
int db__messages_easy_queue(struct mosquitto_db *db, struct mosquitto *context, const char *topic, int qos, uint32_t payloadlen, const void *payload, int retain, uint32_t message_expiry_interval, mosquitto_property **properties)
{
	struct mosquitto_msg_store *stored;
	char *source_id;
	char *topic_heap;
	mosquitto__payload_uhpa payload_uhpa;
	mosquitto_property *local_properties = NULL;
	enum mosquitto_msg_origin origin;

	assert(db);

	payload_uhpa.ptr = NULL;

	if(!topic) return MOSQ_ERR_INVAL;
	topic_heap = mosquitto__strdup(topic);
	/* A failed duplication is an allocation failure, not a bad argument. */
	if(!topic_heap) return MOSQ_ERR_NOMEM;

	if(db->config->retain_available == false){
		retain = 0;
	}

	/* Only allocate and copy when there is a payload. db__message_store()
	 * does not take over a zero length payload, so allocating one here
	 * would leak it; skipping the copy also avoids memcpy() from a NULL
	 * payload pointer. */
	if(payloadlen > 0){
		if(UHPA_ALLOC(payload_uhpa, payloadlen) == 0){
			mosquitto__free(topic_heap);
			return MOSQ_ERR_NOMEM;
		}
		memcpy(UHPA_ACCESS(payload_uhpa, payloadlen), payload, payloadlen);
	}

	if(context && context->id){
		source_id = context->id;
	}else{
		source_id = "";
	}
	if(properties){
		local_properties = *properties;
		*properties = NULL;
	}

	if(context){
		origin = mosq_mo_client;
	}else{
		origin = mosq_mo_broker;
	}
	/* db__message_store() owns topic_heap, the payload and the properties
	 * from here on and frees them itself if it fails. */
	if(db__message_store(db, context, 0, topic_heap, qos, payloadlen, &payload_uhpa, retain, &stored, message_expiry_interval, local_properties, 0, origin)) return 1;

	return sub__messages_queue(db, source_id, topic_heap, qos, retain, &stored);
}
631 
632 /* This function requires topic to be allocated on the heap. Once called, it owns topic and will free it on error. Likewise payload and properties. */
db__message_store(struct mosquitto_db * db,const struct mosquitto * source,uint16_t source_mid,char * topic,int qos,uint32_t payloadlen,mosquitto__payload_uhpa * payload,int retain,struct mosquitto_msg_store ** stored,uint32_t message_expiry_interval,mosquitto_property * properties,dbid_t store_id,enum mosquitto_msg_origin origin)633 int db__message_store(struct mosquitto_db *db, const struct mosquitto *source, uint16_t source_mid, char *topic, int qos, uint32_t payloadlen, mosquitto__payload_uhpa *payload, int retain, struct mosquitto_msg_store **stored, uint32_t message_expiry_interval, mosquitto_property *properties, dbid_t store_id, enum mosquitto_msg_origin origin)
634 {
635 	struct mosquitto_msg_store *temp = NULL;
636 	int rc = MOSQ_ERR_SUCCESS;
637 
638 	assert(db);
639 	assert(stored);
640 
641 	temp = mosquitto__calloc(1, sizeof(struct mosquitto_msg_store));
642 	if(!temp){
643 		log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
644 		rc = MOSQ_ERR_NOMEM;
645 		goto error;
646 	}
647 
648 	temp->topic = NULL;
649 	temp->payload.ptr = NULL;
650 
651 	temp->ref_count = 0;
652 	if(source && source->id){
653 		temp->source_id = mosquitto__strdup(source->id);
654 	}else{
655 		temp->source_id = mosquitto__strdup("");
656 	}
657 	if(!temp->source_id){
658 		log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
659 		rc = MOSQ_ERR_NOMEM;
660 		goto error;
661 	}
662 
663 	if(source && source->username){
664 		temp->source_username = mosquitto__strdup(source->username);
665 		if(!temp->source_username){
666 			rc = MOSQ_ERR_NOMEM;
667 			goto error;
668 		}
669 	}
670 	if(source){
671 		temp->source_listener = source->listener;
672 	}
673 	temp->source_mid = source_mid;
674 	temp->mid = 0;
675 	temp->qos = qos;
676 	temp->retain = retain;
677 	temp->topic = topic;
678 	topic = NULL;
679 	temp->payloadlen = payloadlen;
680 	temp->properties = properties;
681 	temp->origin = origin;
682 	if(payloadlen){
683 		UHPA_MOVE(temp->payload, *payload, payloadlen);
684 	}else{
685 		temp->payload.ptr = NULL;
686 	}
687 	if(message_expiry_interval > 0){
688 		temp->message_expiry_time = time(NULL) + message_expiry_interval;
689 	}else{
690 		temp->message_expiry_time = 0;
691 	}
692 
693 	temp->dest_ids = NULL;
694 	temp->dest_id_count = 0;
695 	db->msg_store_count++;
696 	db->msg_store_bytes += payloadlen;
697 	(*stored) = temp;
698 
699 	if(!store_id){
700 		temp->db_id = ++db->last_db_id;
701 	}else{
702 		temp->db_id = store_id;
703 	}
704 
705 	db__msg_store_add(db, temp);
706 
707 	return MOSQ_ERR_SUCCESS;
708 error:
709 	mosquitto__free(topic);
710 	if(temp){
711 		mosquitto__free(temp->source_id);
712 		mosquitto__free(temp->source_username);
713 		mosquitto__free(temp->topic);
714 		mosquitto__free(temp);
715 	}
716 	mosquitto_property_free_all(&properties);
717 	UHPA_FREE(*payload, payloadlen);
718 	return rc;
719 }
720 
db__message_store_find(struct mosquitto * context,uint16_t mid,struct mosquitto_msg_store ** stored)721 int db__message_store_find(struct mosquitto *context, uint16_t mid, struct mosquitto_msg_store **stored)
722 {
723 	struct mosquitto_client_msg *tail;
724 
725 	if(!context) return MOSQ_ERR_INVAL;
726 
727 	*stored = NULL;
728 	DL_FOREACH(context->msgs_in.inflight, tail){
729 		if(tail->store->source_mid == mid){
730 			*stored = tail->store;
731 			return MOSQ_ERR_SUCCESS;
732 		}
733 	}
734 
735 	DL_FOREACH(context->msgs_in.queued, tail){
736 		if(tail->store->source_mid == mid){
737 			*stored = tail->store;
738 			return MOSQ_ERR_SUCCESS;
739 		}
740 	}
741 
742 	return 1;
743 }
744 
745 /* Called on reconnect to set outgoing messages to a sensible state and force a
746  * retry, and to set incoming messages to expect an appropriate retry. */
db__message_reconnect_reset_outgoing(struct mosquitto_db * db,struct mosquitto * context)747 int db__message_reconnect_reset_outgoing(struct mosquitto_db *db, struct mosquitto *context)
748 {
749 	struct mosquitto_client_msg *msg, *tmp;
750 
751 	context->msgs_out.msg_bytes = 0;
752 	context->msgs_out.msg_bytes12 = 0;
753 	context->msgs_out.msg_count = 0;
754 	context->msgs_out.msg_count12 = 0;
755 	context->msgs_out.inflight_quota = context->msgs_out.inflight_maximum;
756 
757 	DL_FOREACH_SAFE(context->msgs_out.inflight, msg, tmp){
758 		context->msgs_out.msg_count++;
759 		context->msgs_out.msg_bytes += msg->store->payloadlen;
760 		if(msg->qos > 0){
761 			context->msgs_out.msg_count12++;
762 			context->msgs_out.msg_bytes12 += msg->store->payloadlen;
763 			util__decrement_receive_quota(context);
764 		}
765 
766 		switch(msg->qos){
767 			case 0:
768 				msg->state = mosq_ms_publish_qos0;
769 				break;
770 			case 1:
771 				msg->state = mosq_ms_publish_qos1;
772 				break;
773 			case 2:
774 				if(msg->state == mosq_ms_wait_for_pubcomp){
775 					msg->state = mosq_ms_resend_pubrel;
776 				}else{
777 					msg->state = mosq_ms_publish_qos2;
778 				}
779 				break;
780 		}
781 	}
782 	/* Messages received when the client was disconnected are put
783 	 * in the mosq_ms_queued state. If we don't change them to the
784 	 * appropriate "publish" state, then the queued messages won't
785 	 * get sent until the client next receives a message - and they
786 	 * will be sent out of order.
787 	 */
788 	DL_FOREACH_SAFE(context->msgs_out.queued, msg, tmp){
789 		context->msgs_out.msg_count++;
790 		context->msgs_out.msg_bytes += msg->store->payloadlen;
791 		if(msg->qos > 0){
792 			context->msgs_out.msg_count12++;
793 			context->msgs_out.msg_bytes12 += msg->store->payloadlen;
794 		}
795 		if(db__ready_for_flight(&context->msgs_out, msg->qos)){
796 			switch(msg->qos){
797 				case 0:
798 					msg->state = mosq_ms_publish_qos0;
799 					break;
800 				case 1:
801 					msg->state = mosq_ms_publish_qos1;
802 					break;
803 				case 2:
804 					msg->state = mosq_ms_publish_qos2;
805 					break;
806 			}
807 			db__message_dequeue_first(context, &context->msgs_out);
808 		}
809 	}
810 
811 	return MOSQ_ERR_SUCCESS;
812 }
813 
814 
815 /* Called on reconnect to set incoming messages to expect an appropriate retry. */
db__message_reconnect_reset_incoming(struct mosquitto_db * db,struct mosquitto * context)816 int db__message_reconnect_reset_incoming(struct mosquitto_db *db, struct mosquitto *context)
817 {
818 	struct mosquitto_client_msg *msg, *tmp;
819 
820 	context->msgs_in.msg_bytes = 0;
821 	context->msgs_in.msg_bytes12 = 0;
822 	context->msgs_in.msg_count = 0;
823 	context->msgs_in.msg_count12 = 0;
824 	context->msgs_in.inflight_quota = context->msgs_in.inflight_maximum;
825 
826 	DL_FOREACH_SAFE(context->msgs_in.inflight, msg, tmp){
827 		context->msgs_in.msg_count++;
828 		context->msgs_in.msg_bytes += msg->store->payloadlen;
829 		if(msg->qos > 0){
830 			context->msgs_in.msg_count12++;
831 			context->msgs_in.msg_bytes12 += msg->store->payloadlen;
832 			util__decrement_receive_quota(context);
833 		}
834 
835 		if(msg->qos != 2){
836 			/* Anything <QoS 2 can be completely retried by the client at
837 			 * no harm. */
838 			db__message_remove(db, &context->msgs_in, msg);
839 		}else{
840 			/* Message state can be preserved here because it should match
841 			 * whatever the client has got. */
842 		}
843 	}
844 
845 	/* Messages received when the client was disconnected are put
846 	 * in the mosq_ms_queued state. If we don't change them to the
847 	 * appropriate "publish" state, then the queued messages won't
848 	 * get sent until the client next receives a message - and they
849 	 * will be sent out of order.
850 	 */
851 	DL_FOREACH_SAFE(context->msgs_in.queued, msg, tmp){
852 		context->msgs_in.msg_count++;
853 		context->msgs_in.msg_bytes += msg->store->payloadlen;
854 		if(msg->qos > 0){
855 			context->msgs_in.msg_count12++;
856 			context->msgs_in.msg_bytes12 += msg->store->payloadlen;
857 		}
858 		if(db__ready_for_flight(&context->msgs_in, msg->qos)){
859 			switch(msg->qos){
860 				case 0:
861 					msg->state = mosq_ms_publish_qos0;
862 					break;
863 				case 1:
864 					msg->state = mosq_ms_publish_qos1;
865 					break;
866 				case 2:
867 					msg->state = mosq_ms_publish_qos2;
868 					break;
869 			}
870 			db__message_dequeue_first(context, &context->msgs_in);
871 		}
872 	}
873 
874 	return MOSQ_ERR_SUCCESS;
875 }
876 
877 
/*
 * Reset a reconnecting client's message state: outgoing messages first, then
 * incoming. The incoming reset is skipped if the outgoing reset fails.
 * Returns the first non-zero result, or the result of the incoming reset.
 */
int db__message_reconnect_reset(struct mosquitto_db *db, struct mosquitto *context)
{
	int rc = db__message_reconnect_reset_outgoing(db, context);

	if(rc == 0){
		rc = db__message_reconnect_reset_incoming(db, context);
	}
	return rc;
}
886 
887 
db__message_release_incoming(struct mosquitto_db * db,struct mosquitto * context,uint16_t mid)888 int db__message_release_incoming(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid)
889 {
890 	struct mosquitto_client_msg *tail, *tmp;
891 	int retain;
892 	char *topic;
893 	char *source_id;
894 	int msg_index = 0;
895 	bool deleted = false;
896 	int rc;
897 
898 	if(!context) return MOSQ_ERR_INVAL;
899 
900 	DL_FOREACH_SAFE(context->msgs_in.inflight, tail, tmp){
901 		msg_index++;
902 		if(tail->mid == mid){
903 			if(tail->store->qos != 2){
904 				return MOSQ_ERR_PROTOCOL;
905 			}
906 			topic = tail->store->topic;
907 			retain = tail->retain;
908 			source_id = tail->store->source_id;
909 
910 			/* topic==NULL should be a QoS 2 message that was
911 			 * denied/dropped and is being processed so the client doesn't
912 			 * keep resending it. That means we don't send it to other
913 			 * clients. */
914 			if(!topic){
915 				db__message_remove(db, &context->msgs_in, tail);
916 				deleted = true;
917 			}else{
918 				rc = sub__messages_queue(db, source_id, topic, 2, retain, &tail->store);
919 				if(rc == MOSQ_ERR_SUCCESS || rc == MOSQ_ERR_NO_SUBSCRIBERS){
920 					db__message_remove(db, &context->msgs_in, tail);
921 					deleted = true;
922 				}else{
923 					return 1;
924 				}
925 			}
926 		}
927 	}
928 
929 	DL_FOREACH_SAFE(context->msgs_in.queued, tail, tmp){
930 		if(context->msgs_in.inflight_maximum != 0 && msg_index >= context->msgs_in.inflight_maximum){
931 			break;
932 		}
933 
934 		msg_index++;
935 		tail->timestamp = mosquitto_time();
936 
937 		if(tail->qos == 2){
938 			send__pubrec(context, tail->mid, 0);
939 			tail->state = mosq_ms_wait_for_pubrel;
940 			db__message_dequeue_first(context, &context->msgs_in);
941 		}
942 	}
943 	if(deleted){
944 		return MOSQ_ERR_SUCCESS;
945 	}else{
946 		return MOSQ_ERR_NOT_FOUND;
947 	}
948 }
949 
db__message_write(struct mosquitto_db * db,struct mosquitto * context)950 int db__message_write(struct mosquitto_db *db, struct mosquitto *context)
951 {
952 	int rc;
953 	struct mosquitto_client_msg *tail, *tmp;
954 	uint16_t mid;
955 	int retries;
956 	int retain;
957 	const char *topic;
958 	int qos;
959 	uint32_t payloadlen;
960 	const void *payload;
961 	int msg_count = 0;
962 	mosquitto_property *cmsg_props = NULL, *store_props = NULL;
963 	time_t now = 0;
964 	uint32_t expiry_interval;
965 
966 	if(!context || context->sock == INVALID_SOCKET
967 			|| (context->state == mosq_cs_active && !context->id)){
968 		return MOSQ_ERR_INVAL;
969 	}
970 
971 	if(context->state != mosq_cs_active){
972 		return MOSQ_ERR_SUCCESS;
973 	}
974 
975 	DL_FOREACH_SAFE(context->msgs_in.inflight, tail, tmp){
976 		msg_count++;
977 		if(tail->store->message_expiry_time){
978 			if(now == 0){
979 				now = time(NULL);
980 			}
981 			if(now > tail->store->message_expiry_time){
982 				/* Message is expired, must not send. */
983 				db__message_remove(db, &context->msgs_in, tail);
984 				continue;
985 			}
986 		}
987 		mid = tail->mid;
988 
989 		switch(tail->state){
990 			case mosq_ms_send_pubrec:
991 				rc = send__pubrec(context, mid, 0);
992 				if(!rc){
993 					tail->state = mosq_ms_wait_for_pubrel;
994 				}else{
995 					return rc;
996 				}
997 				break;
998 
999 			case mosq_ms_resend_pubcomp:
1000 				rc = send__pubcomp(context, mid);
1001 				if(!rc){
1002 					tail->state = mosq_ms_wait_for_pubrel;
1003 				}else{
1004 					return rc;
1005 				}
1006 				break;
1007 
1008 			case mosq_ms_invalid:
1009 			case mosq_ms_publish_qos0:
1010 			case mosq_ms_publish_qos1:
1011 			case mosq_ms_publish_qos2:
1012 			case mosq_ms_resend_pubrel:
1013 			case mosq_ms_wait_for_puback:
1014 			case mosq_ms_wait_for_pubrec:
1015 			case mosq_ms_wait_for_pubrel:
1016 			case mosq_ms_wait_for_pubcomp:
1017 			case mosq_ms_queued:
1018 				break;
1019 		}
1020 	}
1021 
1022 	DL_FOREACH_SAFE(context->msgs_out.inflight, tail, tmp){
1023 		msg_count++;
1024 		if(tail->store->message_expiry_time){
1025 			if(now == 0){
1026 				now = time(NULL);
1027 			}
1028 			if(now > tail->store->message_expiry_time){
1029 				/* Message is expired, must not send. */
1030 				db__message_remove(db, &context->msgs_out, tail);
1031 				continue;
1032 			}else{
1033 				expiry_interval = tail->store->message_expiry_time - now;
1034 			}
1035 		}else{
1036 			expiry_interval = 0;
1037 		}
1038 		mid = tail->mid;
1039 		retries = tail->dup;
1040 		retain = tail->retain;
1041 		topic = tail->store->topic;
1042 		qos = tail->qos;
1043 		payloadlen = tail->store->payloadlen;
1044 		payload = UHPA_ACCESS_PAYLOAD(tail->store);
1045 		cmsg_props = tail->properties;
1046 		store_props = tail->store->properties;
1047 
1048 		switch(tail->state){
1049 			case mosq_ms_publish_qos0:
1050 				rc = send__publish(context, mid, topic, payloadlen, payload, qos, retain, retries, cmsg_props, store_props, expiry_interval);
1051 				if(rc == MOSQ_ERR_SUCCESS || rc == MOSQ_ERR_OVERSIZE_PACKET){
1052 					db__message_remove(db, &context->msgs_out, tail);
1053 				}else{
1054 					return rc;
1055 				}
1056 				break;
1057 
1058 			case mosq_ms_publish_qos1:
1059 				rc = send__publish(context, mid, topic, payloadlen, payload, qos, retain, retries, cmsg_props, store_props, expiry_interval);
1060 				if(rc == MOSQ_ERR_SUCCESS){
1061 					tail->timestamp = mosquitto_time();
1062 					tail->dup = 1; /* Any retry attempts are a duplicate. */
1063 					tail->state = mosq_ms_wait_for_puback;
1064 				}else if(rc == MOSQ_ERR_OVERSIZE_PACKET){
1065 					db__message_remove(db, &context->msgs_out, tail);
1066 				}else{
1067 					return rc;
1068 				}
1069 				break;
1070 
1071 			case mosq_ms_publish_qos2:
1072 				rc = send__publish(context, mid, topic, payloadlen, payload, qos, retain, retries, cmsg_props, store_props, expiry_interval);
1073 				if(rc == MOSQ_ERR_SUCCESS){
1074 					tail->timestamp = mosquitto_time();
1075 					tail->dup = 1; /* Any retry attempts are a duplicate. */
1076 					tail->state = mosq_ms_wait_for_pubrec;
1077 				}else if(rc == MOSQ_ERR_OVERSIZE_PACKET){
1078 					db__message_remove(db, &context->msgs_out, tail);
1079 				}else{
1080 					return rc;
1081 				}
1082 				break;
1083 
1084 			case mosq_ms_resend_pubrel:
1085 				rc = send__pubrel(context, mid);
1086 				if(!rc){
1087 					tail->state = mosq_ms_wait_for_pubcomp;
1088 				}else{
1089 					return rc;
1090 				}
1091 				break;
1092 
1093 			case mosq_ms_invalid:
1094 			case mosq_ms_send_pubrec:
1095 			case mosq_ms_resend_pubcomp:
1096 			case mosq_ms_wait_for_puback:
1097 			case mosq_ms_wait_for_pubrec:
1098 			case mosq_ms_wait_for_pubrel:
1099 			case mosq_ms_wait_for_pubcomp:
1100 			case mosq_ms_queued:
1101 				break;
1102 		}
1103 	}
1104 
1105 	DL_FOREACH_SAFE(context->msgs_in.queued, tail, tmp){
1106 		if(context->msgs_out.inflight_maximum != 0 && context->msgs_in.inflight_quota == 0){
1107 			break;
1108 		}
1109 
1110 		msg_count++;
1111 
1112 		if(tail->qos == 2){
1113 			tail->state = mosq_ms_send_pubrec;
1114 			db__message_dequeue_first(context, &context->msgs_in);
1115 			rc = send__pubrec(context, tail->mid, 0);
1116 			if(!rc){
1117 				tail->state = mosq_ms_wait_for_pubrel;
1118 			}else{
1119 				return rc;
1120 			}
1121 		}
1122 	}
1123 
1124 	DL_FOREACH_SAFE(context->msgs_out.queued, tail, tmp){
1125 		if(context->msgs_out.inflight_maximum != 0 && context->msgs_out.inflight_quota == 0){
1126 			break;
1127 		}
1128 
1129 		msg_count++;
1130 
1131 		switch(tail->qos){
1132 			case 0:
1133 				tail->state = mosq_ms_publish_qos0;
1134 				break;
1135 			case 1:
1136 				tail->state = mosq_ms_publish_qos1;
1137 				break;
1138 			case 2:
1139 				tail->state = mosq_ms_publish_qos2;
1140 				break;
1141 		}
1142 		db__message_dequeue_first(context, &context->msgs_out);
1143 	}
1144 
1145 	return MOSQ_ERR_SUCCESS;
1146 }
1147 
db__limits_set(unsigned long inflight_bytes,int queued,unsigned long queued_bytes)1148 void db__limits_set(unsigned long inflight_bytes, int queued, unsigned long queued_bytes)
1149 {
1150 	max_inflight_bytes = inflight_bytes;
1151 	max_queued = queued;
1152 	max_queued_bytes = queued_bytes;
1153 }
1154 
1155