1 /*
2 Copyright (c) 2009-2020 Roger Light <roger@atchoo.org>
3
4 All rights reserved. This program and the accompanying materials
5 are made available under the terms of the Eclipse Public License 2.0
6 and Eclipse Distribution License v1.0 which accompany this distribution.
7
8 The Eclipse Public License is available at
9 https://www.eclipse.org/legal/epl-2.0/
10 and the Eclipse Distribution License is available at
11 http://www.eclipse.org/org/documents/edl-v10.php.
12
13 SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
14
15 Contributors:
16 Roger Light - initial implementation and documentation.
17 Tatsuzo Osawa - Add epoll.
18 */
19
20 #include "config.h"
21
22 #ifndef WIN32
23 # define _GNU_SOURCE
24 #endif
25
26 #include <assert.h>
27 #ifndef WIN32
28 #include <unistd.h>
29 #else
30 #include <process.h>
31 #include <winsock2.h>
32 #include <ws2tcpip.h>
33 #endif
34
35 #include <errno.h>
36 #include <signal.h>
37 #include <stdio.h>
38 #include <string.h>
39 #ifndef WIN32
40 # include <sys/socket.h>
41 #endif
42 #include <time.h>
43 #include <utlist.h>
44
45 #ifdef WITH_WEBSOCKETS
46 # include <libwebsockets.h>
47 #endif
48
49 #include "mosquitto_broker_internal.h"
50 #include "memory_mosq.h"
51 #include "mqtt_protocol.h"
52 #include "packet_mosq.h"
53 #include "send_mosq.h"
54 #include "sys_tree.h"
55 #include "time_mosq.h"
56 #include "util_mosq.h"
57
58 extern bool flag_reload;
59 #ifdef WITH_PERSISTENCE
60 extern bool flag_db_backup;
61 #endif
62 extern bool flag_tree_print;
63 extern int run;
64
65 #if defined(WITH_WEBSOCKETS) && LWS_LIBRARY_VERSION_NUMBER == 3002000
/* Callback handed to lws_sul_schedule() from the main loop. It deliberately
 * performs no work. */
void lws__sul_callback(struct lws_sorted_usec_list *l)
{
	(void)l; /* Parameter is intentionally unused. */
}
69
70 static struct lws_sorted_usec_list sul;
71 #endif
72
single_publish(struct mosquitto * context,struct mosquitto_message_v5 * msg,uint32_t message_expiry)73 static int single_publish(struct mosquitto *context, struct mosquitto_message_v5 *msg, uint32_t message_expiry)
74 {
75 struct mosquitto_msg_store *stored;
76 uint16_t mid;
77
78 stored = mosquitto__calloc(1, sizeof(struct mosquitto_msg_store));
79 if(stored == NULL) return MOSQ_ERR_NOMEM;
80
81 stored->topic = msg->topic;
82 msg->topic = NULL;
83 stored->retain = 0;
84 stored->payloadlen = (uint32_t)msg->payloadlen;
85 stored->payload = mosquitto__malloc(stored->payloadlen+1);
86 if(stored->payload == NULL){
87 db__msg_store_free(stored);
88 return MOSQ_ERR_NOMEM;
89 }
90 /* Ensure payload is always zero terminated, this is the reason for the extra byte above */
91 ((uint8_t *)stored->payload)[stored->payloadlen] = 0;
92 memcpy(stored->payload, msg->payload, stored->payloadlen);
93
94 if(msg->properties){
95 stored->properties = msg->properties;
96 msg->properties = NULL;
97 }
98
99 if(db__message_store(context, stored, message_expiry, 0, mosq_mo_broker)) return 1;
100
101 if(msg->qos){
102 mid = mosquitto__mid_generate(context);
103 }else{
104 mid = 0;
105 }
106 return db__message_insert(context, mid, mosq_md_out, (uint8_t)msg->qos, 0, stored, msg->properties, true);
107 }
108
109
/*
 * Pull the first MQTT_PROP_MESSAGE_EXPIRY_INTERVAL property out of a list.
 *
 * *message_expiry is always written: 0 when proplist is NULL or no such
 * property is present, otherwise the property's 32 bit value. A matching
 * property is unlinked from *proplist and released with property__free().
 */
static void read_message_expiry_interval(mosquitto_property **proplist, uint32_t *message_expiry)
{
	mosquitto_property **link;
	mosquitto_property *found;

	*message_expiry = 0;

	if(proplist == NULL){
		return;
	}

	/* `link` always points at the pointer that references the current
	 * element, so unlinking works the same for the head and for any later
	 * element. */
	for(link = proplist; *link != NULL; link = &(*link)->next){
		if((*link)->identifier != MQTT_PROP_MESSAGE_EXPIRY_INTERVAL){
			continue;
		}

		found = *link;
		*message_expiry = found->value.i32;
		*link = found->next;
		property__free(&found);
		return;
	}
}
135
queue_plugin_msgs(void)136 static void queue_plugin_msgs(void)
137 {
138 struct mosquitto_message_v5 *msg, *tmp;
139 struct mosquitto *context;
140 uint32_t message_expiry;
141
142 DL_FOREACH_SAFE(db.plugin_msgs, msg, tmp){
143 DL_DELETE(db.plugin_msgs, msg);
144
145 read_message_expiry_interval(&msg->properties, &message_expiry);
146
147 if(msg->clientid){
148 HASH_FIND(hh_id, db.contexts_by_id, msg->clientid, strlen(msg->clientid), context);
149 if(context){
150 single_publish(context, msg, message_expiry);
151 }
152 }else{
153 db__messages_easy_queue(NULL, msg->topic, (uint8_t)msg->qos, (uint32_t)msg->payloadlen, msg->payload, msg->retain, message_expiry, &msg->properties);
154 }
155 mosquitto__free(msg->topic);
156 mosquitto__free(msg->payload);
157 mosquitto_property_free_all(&msg->properties);
158 mosquitto__free(msg->clientid);
159 mosquitto__free(msg);
160 }
161 }
162
163
/*
 * Broker main loop: runs until the global `run` flag becomes zero.
 *
 * listensock       - listener sockets, passed through to mux__handle().
 * listensock_count - number of entries in listensock.
 *
 * Returns MOSQ_ERR_SUCCESS when the loop ends because `run` was cleared.
 * Returns the error code from bridge__register_local_connections() or
 * mux__handle() if either of them fails.
 *
 * NOTE(review): the early return on mux__handle() failure does not call
 * mux__cleanup() - confirm the caller does not rely on it in that case.
 */
int mosquitto_main_loop(struct mosquitto__listener_sock *listensock, int listensock_count)
{
#ifdef WITH_SYS_TREE
	time_t start_time = mosquitto_time();
#endif
#ifdef WITH_PERSISTENCE
	/* Time of the most recent timed autosave. */
	time_t last_backup = mosquitto_time();
#endif
#ifdef WITH_WEBSOCKETS
	int i;
#endif
	int rc;


#if defined(WITH_WEBSOCKETS) && LWS_LIBRARY_VERSION_NUMBER == 3002000
	memset(&sul, 0, sizeof(struct lws_sorted_usec_list));
#endif

	db.now_s = mosquitto_time();
	db.now_real_s = time(NULL);

#ifdef WITH_BRIDGE
	rc = bridge__register_local_connections();
	if(rc) return rc;
#endif

	while(run){
		/* Deliver messages queued in db.plugin_msgs before anything else. */
		queue_plugin_msgs();
		context__free_disused();
#ifdef WITH_SYS_TREE
		if(db.config->sys_interval > 0){
			sys_tree__update(db.config->sys_interval, start_time);
		}
#endif

		keepalive__check();

#ifdef WITH_BRIDGE
		bridge_check();
#endif

		/* Any error from the multiplexer ends the main loop immediately. */
		rc = mux__handle(listensock, listensock_count);
		if(rc) return rc;

		session_expiry__check();
		will_delay__check();
#ifdef WITH_PERSISTENCE
		if(db.config->persistence && db.config->autosave_interval){
			if(db.config->autosave_on_changes){
				/* autosave_interval is compared against the change counter here. */
				if(db.persistence_changes >= db.config->autosave_interval){
					persist__backup(false);
					db.persistence_changes = 0;
				}
			}else{
				/* autosave_interval is compared against elapsed db.now_s time here. */
				if(last_backup + db.config->autosave_interval < db.now_s){
					persist__backup(false);
					last_backup = db.now_s;
				}
			}
		}
#endif

#ifdef WITH_PERSISTENCE
		/* Explicitly requested backup, independent of the autosave settings. */
		if(flag_db_backup){
			persist__backup(false);
			flag_db_backup = false;
		}
#endif
		if(flag_reload){
			log__printf(NULL, MOSQ_LOG_INFO, "Reloading config.");
			config__read(db.config, true);
			listeners__reload_all_certificates();
			mosquitto_security_cleanup(true);
			mosquitto_security_init(true);
			mosquitto_security_apply();
			/* Restart logging after the config has been re-read. */
			log__close(db.config);
			log__init(db.config);
			flag_reload = false;
		}
		if(flag_tree_print){
			sub__tree_print(db.subs, 0);
			flag_tree_print = false;
#ifdef WITH_XTREPORT
			xtreport();
#endif
		}
#ifdef WITH_WEBSOCKETS
		for(i=0; i<db.config->listener_count; i++){
			/* Extremely hacky, should be using the lws provided external poll
			 * interface, but their interface has changed recently and ours
			 * will soon, so for now websockets clients are second class
			 * citizens. */
			if(db.config->listeners[i].ws_context){
				/* The lws_service() call differs by libwebsockets version. */
#if LWS_LIBRARY_VERSION_NUMBER > 3002000
				lws_service(db.config->listeners[i].ws_context, -1);
#elif LWS_LIBRARY_VERSION_NUMBER == 3002000
				lws_sul_schedule(db.config->listeners[i].ws_context, 0, &sul, lws__sul_callback, 10);
				lws_service(db.config->listeners[i].ws_context, 0);
#else
				lws_service(db.config->listeners[i].ws_context, 0);
#endif

			}
		}
#endif
		plugin__handle_tick();
	}

	mux__cleanup();

	return MOSQ_ERR_SUCCESS;
}
276
do_disconnect(struct mosquitto * context,int reason)277 void do_disconnect(struct mosquitto *context, int reason)
278 {
279 const char *id;
280 #ifdef WITH_WEBSOCKETS
281 bool is_duplicate = false;
282 #endif
283
284 if(context->state == mosq_cs_disconnected){
285 return;
286 }
287 #ifdef WITH_WEBSOCKETS
288 if(context->wsi){
289 if(context->state == mosq_cs_duplicate){
290 is_duplicate = true;
291 }
292
293 if(context->state != mosq_cs_disconnecting && context->state != mosq_cs_disconnect_with_will){
294 mosquitto__set_state(context, mosq_cs_disconnect_ws);
295 }
296 if(context->wsi){
297 lws_callback_on_writable(context->wsi);
298 }
299 if(context->sock != INVALID_SOCKET){
300 HASH_DELETE(hh_sock, db.contexts_by_sock, context);
301 mux__delete(context);
302 context->sock = INVALID_SOCKET;
303 }
304 if(is_duplicate){
305 /* This occurs if another client is taking over the same client id.
306 * It is important to remove this from the by_id hash here, so it
307 * doesn't leave us with multiple clients in the hash with the same
308 * id. Websockets doesn't actually close the connection here,
309 * unlike for normal clients, which means there is extra time when
310 * there could be two clients with the same id in the hash. */
311 context__remove_from_by_id(context);
312 }
313 }else
314 #endif
315 {
316 if(db.config->connection_messages == true){
317 if(context->id){
318 id = context->id;
319 }else{
320 id = "<unknown>";
321 }
322 if(context->state != mosq_cs_disconnecting && context->state != mosq_cs_disconnect_with_will){
323 switch(reason){
324 case MOSQ_ERR_SUCCESS:
325 break;
326 case MOSQ_ERR_MALFORMED_PACKET:
327 log__printf(NULL, MOSQ_LOG_NOTICE, "Client %s disconnected due to malformed packet.", id);
328 break;
329 case MOSQ_ERR_PROTOCOL:
330 log__printf(NULL, MOSQ_LOG_NOTICE, "Client %s disconnected due to protocol error.", id);
331 break;
332 case MOSQ_ERR_CONN_LOST:
333 log__printf(NULL, MOSQ_LOG_NOTICE, "Client %s closed its connection.", id);
334 break;
335 case MOSQ_ERR_AUTH:
336 log__printf(NULL, MOSQ_LOG_NOTICE, "Client %s disconnected, not authorised.", id);
337 break;
338 case MOSQ_ERR_KEEPALIVE:
339 log__printf(NULL, MOSQ_LOG_NOTICE, "Client %s has exceeded timeout, disconnecting.", id);
340 break;
341 case MOSQ_ERR_OVERSIZE_PACKET:
342 log__printf(NULL, MOSQ_LOG_NOTICE, "Client %s disconnected due to oversize packet.", id);
343 break;
344 case MOSQ_ERR_PAYLOAD_SIZE:
345 log__printf(NULL, MOSQ_LOG_NOTICE, "Client %s disconnected due to oversize payload.", id);
346 break;
347 case MOSQ_ERR_NOMEM:
348 log__printf(NULL, MOSQ_LOG_NOTICE, "Client %s disconnected due to out of memory.", id);
349 break;
350 case MOSQ_ERR_NOT_SUPPORTED:
351 log__printf(NULL, MOSQ_LOG_NOTICE, "Client %s disconnected due to using not allowed feature (QoS too high, retain not supported, or bad AUTH method).", id);
352 break;
353 case MOSQ_ERR_ADMINISTRATIVE_ACTION:
354 log__printf(NULL, MOSQ_LOG_NOTICE, "Client %s been disconnected by administrative action.", id);
355 break;
356 case MOSQ_ERR_ERRNO:
357 log__printf(NULL, MOSQ_LOG_NOTICE, "Client %s disconnected: %s.", id, strerror(errno));
358 break;
359 default:
360 log__printf(NULL, MOSQ_LOG_NOTICE, "Bad socket read/write on client %s: %s", id, mosquitto_strerror(reason));
361 break;
362 }
363 }else{
364 if(reason == MOSQ_ERR_ADMINISTRATIVE_ACTION){
365 log__printf(NULL, MOSQ_LOG_NOTICE, "Client %s been disconnected by administrative action.", id);
366 }else{
367 log__printf(NULL, MOSQ_LOG_NOTICE, "Client %s disconnected.", id);
368 }
369 }
370 }
371 mux__delete(context);
372 context__disconnect(context);
373 }
374 }
375
376
377