1 /*
2 Copyright (c) 2009-2020 Roger Light <roger@atchoo.org>
3 
4 All rights reserved. This program and the accompanying materials
5 are made available under the terms of the Eclipse Public License 2.0
6 and Eclipse Distribution License v1.0 which accompany this distribution.
7 
8 The Eclipse Public License is available at
9    https://www.eclipse.org/legal/epl-2.0/
10 and the Eclipse Distribution License is available at
11   http://www.eclipse.org/org/documents/edl-v10.php.
12 
13 SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
14 
15 Contributors:
16    Roger Light - initial implementation and documentation.
17    Tatsuzo Osawa - Add epoll.
18 */
19 
20 #include "config.h"
21 
22 #ifndef WIN32
23 #  define _GNU_SOURCE
24 #endif
25 
26 #include <assert.h>
27 #ifndef WIN32
28 #include <unistd.h>
29 #else
30 #include <process.h>
31 #include <winsock2.h>
32 #include <ws2tcpip.h>
33 #endif
34 
35 #include <errno.h>
36 #include <signal.h>
37 #include <stdio.h>
38 #include <string.h>
39 #ifndef WIN32
40 #  include <sys/socket.h>
41 #endif
42 #include <time.h>
43 #include <utlist.h>
44 
45 #ifdef WITH_WEBSOCKETS
46 #  include <libwebsockets.h>
47 #endif
48 
49 #include "mosquitto_broker_internal.h"
50 #include "memory_mosq.h"
51 #include "mqtt_protocol.h"
52 #include "packet_mosq.h"
53 #include "send_mosq.h"
54 #include "sys_tree.h"
55 #include "time_mosq.h"
56 #include "util_mosq.h"
57 
58 extern bool flag_reload;
59 #ifdef WITH_PERSISTENCE
60 extern bool flag_db_backup;
61 #endif
62 extern bool flag_tree_print;
63 extern int run;
64 
65 #if defined(WITH_WEBSOCKETS) && LWS_LIBRARY_VERSION_NUMBER == 3002000
/* No-op timer callback used with libwebsockets 3.2.0.
 *
 * It is handed to lws_sul_schedule() from the main loop and intentionally
 * performs no work.
 *
 * l - the scheduled list entry; unused.
 */
void lws__sul_callback(struct lws_sorted_usec_list *l)
{
	/* Deliberately empty. The cast avoids an unused-parameter warning. */
	(void)l;
}
69 
/* List entry passed to lws_sul_schedule() by the main loop. It is zeroed at
 * the start of mosquitto_main_loop(). */
static struct lws_sorted_usec_list sul;
71 #endif
72 
single_publish(struct mosquitto * context,struct mosquitto_message_v5 * msg,uint32_t message_expiry)73 static int single_publish(struct mosquitto *context, struct mosquitto_message_v5 *msg, uint32_t message_expiry)
74 {
75 	struct mosquitto_msg_store *stored;
76 	uint16_t mid;
77 
78 	stored = mosquitto__calloc(1, sizeof(struct mosquitto_msg_store));
79 	if(stored == NULL) return MOSQ_ERR_NOMEM;
80 
81 	stored->topic = msg->topic;
82 	msg->topic = NULL;
83 	stored->retain = 0;
84 	stored->payloadlen = (uint32_t)msg->payloadlen;
85 	stored->payload = mosquitto__malloc(stored->payloadlen+1);
86 	if(stored->payload == NULL){
87 		db__msg_store_free(stored);
88 		return MOSQ_ERR_NOMEM;
89 	}
90 	/* Ensure payload is always zero terminated, this is the reason for the extra byte above */
91 	((uint8_t *)stored->payload)[stored->payloadlen] = 0;
92 	memcpy(stored->payload, msg->payload, stored->payloadlen);
93 
94 	if(msg->properties){
95 		stored->properties = msg->properties;
96 		msg->properties = NULL;
97 	}
98 
99 	if(db__message_store(context, stored, message_expiry, 0, mosq_mo_broker)) return 1;
100 
101 	if(msg->qos){
102 		mid = mosquitto__mid_generate(context);
103 	}else{
104 		mid = 0;
105 	}
106 	return db__message_insert(context, mid, mosq_md_out, (uint8_t)msg->qos, 0, stored, msg->properties, true);
107 }
108 
109 
/* Pull the message expiry interval out of a property list.
 *
 * *message_expiry is first set to 0. If the list holds a
 * MQTT_PROP_MESSAGE_EXPIRY_INTERVAL property, the value of the first such
 * property is stored in *message_expiry, and that property is unlinked from
 * the list and freed.
 *
 * proplist       - address of the list head, updated if the head is removed.
 *                  May be NULL, in which case only *message_expiry is set.
 * message_expiry - receives the expiry interval.
 */
static void read_message_expiry_interval(mosquitto_property **proplist, uint32_t *message_expiry)
{
	mosquitto_property **link;
	mosquitto_property *found;

	*message_expiry = 0;

	if(proplist == NULL){
		return;
	}

	/* "link" always addresses the pointer that refers to the current node:
	 * the list head for the first node, the previous node's "next" member for
	 * every later one. Unlinking is then a single store through it. */
	for(link = proplist; *link != NULL; link = &(*link)->next){
		if((*link)->identifier != MQTT_PROP_MESSAGE_EXPIRY_INTERVAL){
			continue;
		}
		found = *link;
		*message_expiry = found->value.i32;
		*link = found->next;
		property__free(&found);
		return;
	}
}
135 
queue_plugin_msgs(void)136 static void queue_plugin_msgs(void)
137 {
138 	struct mosquitto_message_v5 *msg, *tmp;
139 	struct mosquitto *context;
140 	uint32_t message_expiry;
141 
142 	DL_FOREACH_SAFE(db.plugin_msgs, msg, tmp){
143 		DL_DELETE(db.plugin_msgs, msg);
144 
145 		read_message_expiry_interval(&msg->properties, &message_expiry);
146 
147 		if(msg->clientid){
148 			HASH_FIND(hh_id, db.contexts_by_id, msg->clientid, strlen(msg->clientid), context);
149 			if(context){
150 				single_publish(context, msg, message_expiry);
151 			}
152 		}else{
153 			db__messages_easy_queue(NULL, msg->topic, (uint8_t)msg->qos, (uint32_t)msg->payloadlen, msg->payload, msg->retain, message_expiry, &msg->properties);
154 		}
155 		mosquitto__free(msg->topic);
156 		mosquitto__free(msg->payload);
157 		mosquitto_property_free_all(&msg->properties);
158 		mosquitto__free(msg->clientid);
159 		mosquitto__free(msg);
160 	}
161 }
162 
163 
/* Run the broker main loop until the global `run` flag becomes zero.
 *
 * Each iteration queues plugin messages, frees disused contexts, runs the
 * periodic checks, calls mux__handle() for the listening sockets, and then
 * handles persistence backups and the reload / tree-print flags.
 *
 * listensock       - listener sockets, passed through to mux__handle().
 * listensock_count - number of entries in listensock.
 *
 * Returns the first non-zero result of bridge__register_local_connections()
 * or mux__handle(), otherwise MOSQ_ERR_SUCCESS once the loop has ended.
 */
int mosquitto_main_loop(struct mosquitto__listener_sock *listensock, int listensock_count)
{
#ifdef WITH_SYS_TREE
	time_t start_time = mosquitto_time();
#endif
#ifdef WITH_PERSISTENCE
	time_t last_backup = mosquitto_time();
#endif
#ifdef WITH_WEBSOCKETS
	int i;
#endif
	int rc;


#if defined(WITH_WEBSOCKETS) && LWS_LIBRARY_VERSION_NUMBER == 3002000
	/* Start with a cleared list entry before it is first scheduled below. */
	memset(&sul, 0, sizeof(struct lws_sorted_usec_list));
#endif

	db.now_s = mosquitto_time();
	db.now_real_s = time(NULL);

#ifdef WITH_BRIDGE
	rc = bridge__register_local_connections();
	if(rc) return rc;
#endif

	while(run){
		queue_plugin_msgs();
		context__free_disused();
#ifdef WITH_SYS_TREE
		if(db.config->sys_interval > 0){
			sys_tree__update(db.config->sys_interval, start_time);
		}
#endif

		keepalive__check();

#ifdef WITH_BRIDGE
		bridge_check();
#endif

		/* Note that a failure here returns directly, without reaching the
		 * mux__cleanup() call that follows the loop. */
		rc = mux__handle(listensock, listensock_count);
		if(rc) return rc;

		session_expiry__check();
		will_delay__check();
#ifdef WITH_PERSISTENCE
		/* Automatic backups. With autosave_on_changes set, autosave_interval
		 * is a threshold on db.persistence_changes; otherwise it is compared
		 * against the time elapsed since the last backup. */
		if(db.config->persistence && db.config->autosave_interval){
			if(db.config->autosave_on_changes){
				if(db.persistence_changes >= db.config->autosave_interval){
					persist__backup(false);
					db.persistence_changes = 0;
				}
			}else{
				if(last_backup + db.config->autosave_interval < db.now_s){
					persist__backup(false);
					last_backup = db.now_s;
				}
			}
		}
#endif

#ifdef WITH_PERSISTENCE
		/* Backup requested explicitly through flag_db_backup. */
		if(flag_db_backup){
			persist__backup(false);
			flag_db_backup = false;
		}
#endif
		/* Config reload requested through flag_reload: re-read the config,
		 * reload certificates, restart the security subsystem and reopen
		 * logging. */
		if(flag_reload){
			log__printf(NULL, MOSQ_LOG_INFO, "Reloading config.");
			config__read(db.config, true);
			listeners__reload_all_certificates();
			mosquitto_security_cleanup(true);
			mosquitto_security_init(true);
			mosquitto_security_apply();
			log__close(db.config);
			log__init(db.config);
			flag_reload = false;
		}
		/* Subscription tree dump requested through flag_tree_print. */
		if(flag_tree_print){
			sub__tree_print(db.subs, 0);
			flag_tree_print = false;
#ifdef WITH_XTREPORT
			xtreport();
#endif
		}
#ifdef WITH_WEBSOCKETS
		for(i=0; i<db.config->listener_count; i++){
			/* Extremely hacky, should be using the lws provided external poll
			 * interface, but their interface has changed recently and ours
			 * will soon, so for now websockets clients are second class
			 * citizens. */
			if(db.config->listeners[i].ws_context){
				/* The lws_service() call differs by libwebsockets version;
				 * version 3.2.0 additionally schedules the no-op timer
				 * callback first. */
#if LWS_LIBRARY_VERSION_NUMBER > 3002000
				lws_service(db.config->listeners[i].ws_context, -1);
#elif LWS_LIBRARY_VERSION_NUMBER == 3002000
				lws_sul_schedule(db.config->listeners[i].ws_context, 0, &sul, lws__sul_callback, 10);
				lws_service(db.config->listeners[i].ws_context, 0);
#else
				lws_service(db.config->listeners[i].ws_context, 0);
#endif

			}
		}
#endif
		plugin__handle_tick();
	}

	mux__cleanup();

	return MOSQ_ERR_SUCCESS;
}
276 
do_disconnect(struct mosquitto * context,int reason)277 void do_disconnect(struct mosquitto *context, int reason)
278 {
279 	const char *id;
280 #ifdef WITH_WEBSOCKETS
281 	bool is_duplicate = false;
282 #endif
283 
284 	if(context->state == mosq_cs_disconnected){
285 		return;
286 	}
287 #ifdef WITH_WEBSOCKETS
288 	if(context->wsi){
289 		if(context->state == mosq_cs_duplicate){
290 			is_duplicate = true;
291 		}
292 
293 		if(context->state != mosq_cs_disconnecting && context->state != mosq_cs_disconnect_with_will){
294 			mosquitto__set_state(context, mosq_cs_disconnect_ws);
295 		}
296 		if(context->wsi){
297 			lws_callback_on_writable(context->wsi);
298 		}
299 		if(context->sock != INVALID_SOCKET){
300 			HASH_DELETE(hh_sock, db.contexts_by_sock, context);
301 			mux__delete(context);
302 			context->sock = INVALID_SOCKET;
303 		}
304 		if(is_duplicate){
305 			/* This occurs if another client is taking over the same client id.
306 			 * It is important to remove this from the by_id hash here, so it
307 			 * doesn't leave us with multiple clients in the hash with the same
308 			 * id. Websockets doesn't actually close the connection here,
309 			 * unlike for normal clients, which means there is extra time when
310 			 * there could be two clients with the same id in the hash. */
311 			context__remove_from_by_id(context);
312 		}
313 	}else
314 #endif
315 	{
316 		if(db.config->connection_messages == true){
317 			if(context->id){
318 				id = context->id;
319 			}else{
320 				id = "<unknown>";
321 			}
322 			if(context->state != mosq_cs_disconnecting && context->state != mosq_cs_disconnect_with_will){
323 				switch(reason){
324 					case MOSQ_ERR_SUCCESS:
325 						break;
326 					case MOSQ_ERR_MALFORMED_PACKET:
327 						log__printf(NULL, MOSQ_LOG_NOTICE, "Client %s disconnected due to malformed packet.", id);
328 						break;
329 					case MOSQ_ERR_PROTOCOL:
330 						log__printf(NULL, MOSQ_LOG_NOTICE, "Client %s disconnected due to protocol error.", id);
331 						break;
332 					case MOSQ_ERR_CONN_LOST:
333 						log__printf(NULL, MOSQ_LOG_NOTICE, "Client %s closed its connection.", id);
334 						break;
335 					case MOSQ_ERR_AUTH:
336 						log__printf(NULL, MOSQ_LOG_NOTICE, "Client %s disconnected, not authorised.", id);
337 						break;
338 					case MOSQ_ERR_KEEPALIVE:
339 						log__printf(NULL, MOSQ_LOG_NOTICE, "Client %s has exceeded timeout, disconnecting.", id);
340 						break;
341 					case MOSQ_ERR_OVERSIZE_PACKET:
342 						log__printf(NULL, MOSQ_LOG_NOTICE, "Client %s disconnected due to oversize packet.", id);
343 						break;
344 					case MOSQ_ERR_PAYLOAD_SIZE:
345 						log__printf(NULL, MOSQ_LOG_NOTICE, "Client %s disconnected due to oversize payload.", id);
346 						break;
347 					case MOSQ_ERR_NOMEM:
348 						log__printf(NULL, MOSQ_LOG_NOTICE, "Client %s disconnected due to out of memory.", id);
349 						break;
350 					case MOSQ_ERR_NOT_SUPPORTED:
351 						log__printf(NULL, MOSQ_LOG_NOTICE, "Client %s disconnected due to using not allowed feature (QoS too high, retain not supported, or bad AUTH method).", id);
352 						break;
353 					case MOSQ_ERR_ADMINISTRATIVE_ACTION:
354 						log__printf(NULL, MOSQ_LOG_NOTICE, "Client %s been disconnected by administrative action.", id);
355 						break;
356 					case MOSQ_ERR_ERRNO:
357 						log__printf(NULL, MOSQ_LOG_NOTICE, "Client %s disconnected: %s.", id, strerror(errno));
358 						break;
359 					default:
360 						log__printf(NULL, MOSQ_LOG_NOTICE, "Bad socket read/write on client %s: %s", id, mosquitto_strerror(reason));
361 						break;
362 				}
363 			}else{
364 				if(reason == MOSQ_ERR_ADMINISTRATIVE_ACTION){
365 					log__printf(NULL, MOSQ_LOG_NOTICE, "Client %s been disconnected by administrative action.", id);
366 				}else{
367 					log__printf(NULL, MOSQ_LOG_NOTICE, "Client %s disconnected.", id);
368 				}
369 			}
370 		}
371 		mux__delete(context);
372 		context__disconnect(context);
373 	}
374 }
375 
376 
377