1 /*
2 Copyright (c) 2010-2020 Roger Light <roger@atchoo.org>
3 
4 All rights reserved. This program and the accompanying materials
5 are made available under the terms of the Eclipse Public License v1.0
6 and Eclipse Distribution License v1.0 which accompany this distribution.
7 
8 The Eclipse Public License is available at
9    http://www.eclipse.org/legal/epl-v10.html
10 and the Eclipse Distribution License is available at
11   http://www.eclipse.org/org/documents/edl-v10.php.
12 
13 Contributors:
14    Roger Light - initial implementation and documentation.
15 */
16 
17 #include "config.h"
18 
19 #include <errno.h>
20 #ifndef WIN32
21 #include <sys/select.h>
22 #include <time.h>
23 #endif
24 
25 #include "mosquitto.h"
26 #include "mosquitto_internal.h"
27 #include "net_mosq.h"
28 #include "packet_mosq.h"
29 #include "socks_mosq.h"
30 #include "tls_mosq.h"
31 #include "util_mosq.h"
32 
/* Selects the wait primitive used by mosquitto_loop(): when HAVE_PSELECT is
 * defined the loop calls pselect() with a struct timespec timeout, otherwise
 * it calls select() with a struct timeval. It is left undefined on Windows
 * and Symbian builds. */
#if !defined(WIN32) && !defined(__SYMBIAN32__)
#define HAVE_PSELECT
#endif
36 
mosquitto_loop(struct mosquitto * mosq,int timeout,int max_packets)37 int mosquitto_loop(struct mosquitto *mosq, int timeout, int max_packets)
38 {
39 #ifdef HAVE_PSELECT
40 	struct timespec local_timeout;
41 #else
42 	struct timeval local_timeout;
43 #endif
44 	fd_set readfds, writefds;
45 	int fdcount;
46 	int rc;
47 	char pairbuf;
48 	int maxfd = 0;
49 	time_t now;
50 #ifdef WITH_SRV
51 	int state;
52 #endif
53 
54 	if(!mosq || max_packets < 1) return MOSQ_ERR_INVAL;
55 #ifndef WIN32
56 	if(mosq->sock >= FD_SETSIZE || mosq->sockpairR >= FD_SETSIZE){
57 		return MOSQ_ERR_INVAL;
58 	}
59 #endif
60 
61 	FD_ZERO(&readfds);
62 	FD_ZERO(&writefds);
63 	if(mosq->sock != INVALID_SOCKET){
64 		maxfd = mosq->sock;
65 		FD_SET(mosq->sock, &readfds);
66 		pthread_mutex_lock(&mosq->current_out_packet_mutex);
67 		pthread_mutex_lock(&mosq->out_packet_mutex);
68 		if(mosq->out_packet || mosq->current_out_packet){
69 			FD_SET(mosq->sock, &writefds);
70 		}
71 #ifdef WITH_TLS
72 		if(mosq->ssl){
73 			if(mosq->want_write){
74 				FD_SET(mosq->sock, &writefds);
75 			}else if(mosq->want_connect){
76 				/* Remove possible FD_SET from above, we don't want to check
77 				 * for writing if we are still connecting, unless want_write is
78 				 * definitely set. The presence of outgoing packets does not
79 				 * matter yet. */
80 				FD_CLR(mosq->sock, &writefds);
81 			}
82 		}
83 #endif
84 		pthread_mutex_unlock(&mosq->out_packet_mutex);
85 		pthread_mutex_unlock(&mosq->current_out_packet_mutex);
86 	}else{
87 #ifdef WITH_SRV
88 		if(mosq->achan){
89 			state = mosquitto__get_state(mosq);
90 			if(state == mosq_cs_connect_srv){
91 				rc = ares_fds(mosq->achan, &readfds, &writefds);
92 				if(rc > maxfd){
93 					maxfd = rc;
94 				}
95 			}else{
96 				return MOSQ_ERR_NO_CONN;
97 			}
98 		}
99 #else
100 		return MOSQ_ERR_NO_CONN;
101 #endif
102 	}
103 	if(mosq->sockpairR != INVALID_SOCKET){
104 		/* sockpairR is used to break out of select() before the timeout, on a
105 		 * call to publish() etc. */
106 		FD_SET(mosq->sockpairR, &readfds);
107 		if(mosq->sockpairR > maxfd){
108 			maxfd = mosq->sockpairR;
109 		}
110 	}
111 
112 	if(timeout < 0){
113 		timeout = 1000;
114 	}
115 
116 	now = mosquitto_time();
117 	if(mosq->next_msg_out && now + timeout/1000 > mosq->next_msg_out){
118 		timeout = (mosq->next_msg_out - now)*1000;
119 	}
120 
121 	if(timeout < 0){
122 		/* There has been a delay somewhere which means we should have already
123 		 * sent a message. */
124 		timeout = 0;
125 	}
126 
127 	local_timeout.tv_sec = timeout/1000;
128 #ifdef HAVE_PSELECT
129 	local_timeout.tv_nsec = (timeout-local_timeout.tv_sec*1000)*1e6;
130 #else
131 	local_timeout.tv_usec = (timeout-local_timeout.tv_sec*1000)*1000;
132 #endif
133 
134 #ifdef HAVE_PSELECT
135 	fdcount = pselect(maxfd+1, &readfds, &writefds, NULL, &local_timeout, NULL);
136 #else
137 	fdcount = select(maxfd+1, &readfds, &writefds, NULL, &local_timeout);
138 #endif
139 	if(fdcount == -1){
140 #ifdef WIN32
141 		errno = WSAGetLastError();
142 #endif
143 		if(errno == EINTR){
144 			return MOSQ_ERR_SUCCESS;
145 		}else{
146 			return MOSQ_ERR_ERRNO;
147 		}
148 	}else{
149 		if(mosq->sock != INVALID_SOCKET){
150 			if(FD_ISSET(mosq->sock, &readfds)){
151 				rc = mosquitto_loop_read(mosq, max_packets);
152 				if(rc || mosq->sock == INVALID_SOCKET){
153 					return rc;
154 				}
155 			}
156 			if(mosq->sockpairR != INVALID_SOCKET && FD_ISSET(mosq->sockpairR, &readfds)){
157 #ifndef WIN32
158 				if(read(mosq->sockpairR, &pairbuf, 1) == 0){
159 				}
160 #else
161 				recv(mosq->sockpairR, &pairbuf, 1, 0);
162 #endif
163 				/* Fake write possible, to stimulate output write even though
164 				 * we didn't ask for it, because at that point the publish or
165 				 * other command wasn't present. */
166 				if(mosq->sock != INVALID_SOCKET)
167 					FD_SET(mosq->sock, &writefds);
168 			}
169 			if(mosq->sock != INVALID_SOCKET && FD_ISSET(mosq->sock, &writefds)){
170 #ifdef WITH_TLS
171 				if(mosq->want_connect){
172 					rc = net__socket_connect_tls(mosq);
173 					if(rc) return rc;
174 				}else
175 #endif
176 				{
177 					rc = mosquitto_loop_write(mosq, max_packets);
178 					if(rc || mosq->sock == INVALID_SOCKET){
179 						return rc;
180 					}
181 				}
182 			}
183 		}
184 #ifdef WITH_SRV
185 		if(mosq->achan){
186 			ares_process(mosq->achan, &readfds, &writefds);
187 		}
188 #endif
189 	}
190 	return mosquitto_loop_misc(mosq);
191 }
192 
193 
mosquitto_loop_forever(struct mosquitto * mosq,int timeout,int max_packets)194 int mosquitto_loop_forever(struct mosquitto *mosq, int timeout, int max_packets)
195 {
196 	int run = 1;
197 	int rc;
198 	unsigned long reconnect_delay;
199 #ifndef WIN32
200 	struct timespec req, rem;
201 #endif
202 	int state;
203 
204 	if(!mosq) return MOSQ_ERR_INVAL;
205 
206 	mosq->reconnects = 0;
207 
208 	while(run){
209 		do{
210 			rc = mosquitto_loop(mosq, timeout, max_packets);
211 		}while(run && rc == MOSQ_ERR_SUCCESS);
212 		/* Quit after fatal errors. */
213 		switch(rc){
214 			case MOSQ_ERR_NOMEM:
215 			case MOSQ_ERR_PROTOCOL:
216 			case MOSQ_ERR_INVAL:
217 			case MOSQ_ERR_NOT_FOUND:
218 			case MOSQ_ERR_TLS:
219 			case MOSQ_ERR_PAYLOAD_SIZE:
220 			case MOSQ_ERR_NOT_SUPPORTED:
221 			case MOSQ_ERR_AUTH:
222 			case MOSQ_ERR_ACL_DENIED:
223 			case MOSQ_ERR_UNKNOWN:
224 			case MOSQ_ERR_EAI:
225 			case MOSQ_ERR_PROXY:
226 				return rc;
227 			case MOSQ_ERR_ERRNO:
228 				break;
229 		}
230 		if(errno == EPROTO){
231 			return rc;
232 		}
233 		do{
234 			rc = MOSQ_ERR_SUCCESS;
235 			state = mosquitto__get_state(mosq);
236 			if(state == mosq_cs_disconnecting || state == mosq_cs_disconnected){
237 				run = 0;
238 			}else{
239 				if(mosq->reconnect_delay_max > mosq->reconnect_delay){
240 					if(mosq->reconnect_exponential_backoff){
241 						reconnect_delay = mosq->reconnect_delay*(mosq->reconnects+1)*(mosq->reconnects+1);
242 					}else{
243 						reconnect_delay = mosq->reconnect_delay*(mosq->reconnects+1);
244 					}
245 				}else{
246 					reconnect_delay = mosq->reconnect_delay;
247 				}
248 
249 				if(reconnect_delay > mosq->reconnect_delay_max){
250 					reconnect_delay = mosq->reconnect_delay_max;
251 				}else{
252 					mosq->reconnects++;
253 				}
254 
255 #ifdef WIN32
256 				Sleep(reconnect_delay*1000);
257 #else
258 				req.tv_sec = reconnect_delay;
259 				req.tv_nsec = 0;
260 				while(nanosleep(&req, &rem) == -1 && errno == EINTR){
261 					req = rem;
262 				}
263 #endif
264 
265 				state = mosquitto__get_state(mosq);
266 				if(state == mosq_cs_disconnecting || state == mosq_cs_disconnected){
267 					run = 0;
268 				}else{
269 					rc = mosquitto_reconnect(mosq);
270 				}
271 			}
272 		}while(run && rc != MOSQ_ERR_SUCCESS);
273 	}
274 	return rc;
275 }
276 
277 
mosquitto_loop_misc(struct mosquitto * mosq)278 int mosquitto_loop_misc(struct mosquitto *mosq)
279 {
280 	if(!mosq) return MOSQ_ERR_INVAL;
281 	if(mosq->sock == INVALID_SOCKET) return MOSQ_ERR_NO_CONN;
282 
283 	return mosquitto__check_keepalive(mosq);
284 }
285 
286 
mosquitto__loop_rc_handle(struct mosquitto * mosq,int rc)287 static int mosquitto__loop_rc_handle(struct mosquitto *mosq, int rc)
288 {
289 	int state;
290 
291 	if(rc){
292 		net__socket_close(mosq);
293 		state = mosquitto__get_state(mosq);
294 		if(state == mosq_cs_disconnecting || state == mosq_cs_disconnected){
295 			rc = MOSQ_ERR_SUCCESS;
296 		}
297 		pthread_mutex_lock(&mosq->callback_mutex);
298 		if(mosq->on_disconnect){
299 			mosq->in_callback = true;
300 			mosq->on_disconnect(mosq, mosq->userdata, rc);
301 			mosq->in_callback = false;
302 		}
303 		if(mosq->on_disconnect_v5){
304 			mosq->in_callback = true;
305 			mosq->on_disconnect_v5(mosq, mosq->userdata, rc, NULL);
306 			mosq->in_callback = false;
307 		}
308 		pthread_mutex_unlock(&mosq->callback_mutex);
309 	}
310 	return rc;
311 }
312 
313 
mosquitto_loop_read(struct mosquitto * mosq,int max_packets)314 int mosquitto_loop_read(struct mosquitto *mosq, int max_packets)
315 {
316 	int rc;
317 	int i;
318 	if(max_packets < 1) return MOSQ_ERR_INVAL;
319 
320 #ifdef WITH_TLS
321 	if(mosq->want_connect){
322 		return net__socket_connect_tls(mosq);
323 	}
324 #endif
325 
326 	pthread_mutex_lock(&mosq->msgs_out.mutex);
327 	max_packets = mosq->msgs_out.queue_len;
328 	pthread_mutex_unlock(&mosq->msgs_out.mutex);
329 
330 	pthread_mutex_lock(&mosq->msgs_in.mutex);
331 	max_packets += mosq->msgs_in.queue_len;
332 	pthread_mutex_unlock(&mosq->msgs_in.mutex);
333 
334 	if(max_packets < 1) max_packets = 1;
335 	/* Queue len here tells us how many messages are awaiting processing and
336 	 * have QoS > 0. We should try to deal with that many in this loop in order
337 	 * to keep up. */
338 	for(i=0; i<max_packets || SSL_DATA_PENDING(mosq); i++){
339 #ifdef WITH_SOCKS
340 		if(mosq->socks5_host){
341 			rc = socks5__read(mosq);
342 		}else
343 #endif
344 		{
345 			rc = packet__read(mosq);
346 		}
347 		if(rc || errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){
348 			return mosquitto__loop_rc_handle(mosq, rc);
349 		}
350 	}
351 	return rc;
352 }
353 
354 
mosquitto_loop_write(struct mosquitto * mosq,int max_packets)355 int mosquitto_loop_write(struct mosquitto *mosq, int max_packets)
356 {
357 	int rc;
358 	int i;
359 	if(max_packets < 1) return MOSQ_ERR_INVAL;
360 
361 	pthread_mutex_lock(&mosq->msgs_out.mutex);
362 	max_packets = mosq->msgs_out.queue_len;
363 	pthread_mutex_unlock(&mosq->msgs_out.mutex);
364 
365 	pthread_mutex_lock(&mosq->msgs_in.mutex);
366 	max_packets += mosq->msgs_in.queue_len;
367 	pthread_mutex_unlock(&mosq->msgs_in.mutex);
368 
369 	if(max_packets < 1) max_packets = 1;
370 	/* Queue len here tells us how many messages are awaiting processing and
371 	 * have QoS > 0. We should try to deal with that many in this loop in order
372 	 * to keep up. */
373 	for(i=0; i<max_packets; i++){
374 		rc = packet__write(mosq);
375 		if(rc || errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){
376 			return mosquitto__loop_rc_handle(mosq, rc);
377 		}
378 	}
379 	return rc;
380 }
381 
382