1 /*
2 Copyright (c) 2010-2020 Roger Light <roger@atchoo.org>
3
4 All rights reserved. This program and the accompanying materials
5 are made available under the terms of the Eclipse Public License v1.0
6 and Eclipse Distribution License v1.0 which accompany this distribution.
7
8 The Eclipse Public License is available at
9 http://www.eclipse.org/legal/epl-v10.html
10 and the Eclipse Distribution License is available at
11 http://www.eclipse.org/org/documents/edl-v10.php.
12
13 Contributors:
14 Roger Light - initial implementation and documentation.
15 */
16
17 #include "config.h"
18
19 #include <errno.h>
20 #ifndef WIN32
21 #include <sys/select.h>
22 #include <time.h>
23 #endif
24
25 #include "mosquitto.h"
26 #include "mosquitto_internal.h"
27 #include "net_mosq.h"
28 #include "packet_mosq.h"
29 #include "socks_mosq.h"
30 #include "tls_mosq.h"
31 #include "util_mosq.h"
32
33 #if !defined(WIN32) && !defined(__SYMBIAN32__)
34 #define HAVE_PSELECT
35 #endif
36
mosquitto_loop(struct mosquitto * mosq,int timeout,int max_packets)37 int mosquitto_loop(struct mosquitto *mosq, int timeout, int max_packets)
38 {
39 #ifdef HAVE_PSELECT
40 struct timespec local_timeout;
41 #else
42 struct timeval local_timeout;
43 #endif
44 fd_set readfds, writefds;
45 int fdcount;
46 int rc;
47 char pairbuf;
48 int maxfd = 0;
49 time_t now;
50 #ifdef WITH_SRV
51 int state;
52 #endif
53
54 if(!mosq || max_packets < 1) return MOSQ_ERR_INVAL;
55 #ifndef WIN32
56 if(mosq->sock >= FD_SETSIZE || mosq->sockpairR >= FD_SETSIZE){
57 return MOSQ_ERR_INVAL;
58 }
59 #endif
60
61 FD_ZERO(&readfds);
62 FD_ZERO(&writefds);
63 if(mosq->sock != INVALID_SOCKET){
64 maxfd = mosq->sock;
65 FD_SET(mosq->sock, &readfds);
66 pthread_mutex_lock(&mosq->current_out_packet_mutex);
67 pthread_mutex_lock(&mosq->out_packet_mutex);
68 if(mosq->out_packet || mosq->current_out_packet){
69 FD_SET(mosq->sock, &writefds);
70 }
71 #ifdef WITH_TLS
72 if(mosq->ssl){
73 if(mosq->want_write){
74 FD_SET(mosq->sock, &writefds);
75 }else if(mosq->want_connect){
76 /* Remove possible FD_SET from above, we don't want to check
77 * for writing if we are still connecting, unless want_write is
78 * definitely set. The presence of outgoing packets does not
79 * matter yet. */
80 FD_CLR(mosq->sock, &writefds);
81 }
82 }
83 #endif
84 pthread_mutex_unlock(&mosq->out_packet_mutex);
85 pthread_mutex_unlock(&mosq->current_out_packet_mutex);
86 }else{
87 #ifdef WITH_SRV
88 if(mosq->achan){
89 state = mosquitto__get_state(mosq);
90 if(state == mosq_cs_connect_srv){
91 rc = ares_fds(mosq->achan, &readfds, &writefds);
92 if(rc > maxfd){
93 maxfd = rc;
94 }
95 }else{
96 return MOSQ_ERR_NO_CONN;
97 }
98 }
99 #else
100 return MOSQ_ERR_NO_CONN;
101 #endif
102 }
103 if(mosq->sockpairR != INVALID_SOCKET){
104 /* sockpairR is used to break out of select() before the timeout, on a
105 * call to publish() etc. */
106 FD_SET(mosq->sockpairR, &readfds);
107 if(mosq->sockpairR > maxfd){
108 maxfd = mosq->sockpairR;
109 }
110 }
111
112 if(timeout < 0){
113 timeout = 1000;
114 }
115
116 now = mosquitto_time();
117 if(mosq->next_msg_out && now + timeout/1000 > mosq->next_msg_out){
118 timeout = (mosq->next_msg_out - now)*1000;
119 }
120
121 if(timeout < 0){
122 /* There has been a delay somewhere which means we should have already
123 * sent a message. */
124 timeout = 0;
125 }
126
127 local_timeout.tv_sec = timeout/1000;
128 #ifdef HAVE_PSELECT
129 local_timeout.tv_nsec = (timeout-local_timeout.tv_sec*1000)*1e6;
130 #else
131 local_timeout.tv_usec = (timeout-local_timeout.tv_sec*1000)*1000;
132 #endif
133
134 #ifdef HAVE_PSELECT
135 fdcount = pselect(maxfd+1, &readfds, &writefds, NULL, &local_timeout, NULL);
136 #else
137 fdcount = select(maxfd+1, &readfds, &writefds, NULL, &local_timeout);
138 #endif
139 if(fdcount == -1){
140 #ifdef WIN32
141 errno = WSAGetLastError();
142 #endif
143 if(errno == EINTR){
144 return MOSQ_ERR_SUCCESS;
145 }else{
146 return MOSQ_ERR_ERRNO;
147 }
148 }else{
149 if(mosq->sock != INVALID_SOCKET){
150 if(FD_ISSET(mosq->sock, &readfds)){
151 rc = mosquitto_loop_read(mosq, max_packets);
152 if(rc || mosq->sock == INVALID_SOCKET){
153 return rc;
154 }
155 }
156 if(mosq->sockpairR != INVALID_SOCKET && FD_ISSET(mosq->sockpairR, &readfds)){
157 #ifndef WIN32
158 if(read(mosq->sockpairR, &pairbuf, 1) == 0){
159 }
160 #else
161 recv(mosq->sockpairR, &pairbuf, 1, 0);
162 #endif
163 /* Fake write possible, to stimulate output write even though
164 * we didn't ask for it, because at that point the publish or
165 * other command wasn't present. */
166 if(mosq->sock != INVALID_SOCKET)
167 FD_SET(mosq->sock, &writefds);
168 }
169 if(mosq->sock != INVALID_SOCKET && FD_ISSET(mosq->sock, &writefds)){
170 #ifdef WITH_TLS
171 if(mosq->want_connect){
172 rc = net__socket_connect_tls(mosq);
173 if(rc) return rc;
174 }else
175 #endif
176 {
177 rc = mosquitto_loop_write(mosq, max_packets);
178 if(rc || mosq->sock == INVALID_SOCKET){
179 return rc;
180 }
181 }
182 }
183 }
184 #ifdef WITH_SRV
185 if(mosq->achan){
186 ares_process(mosq->achan, &readfds, &writefds);
187 }
188 #endif
189 }
190 return mosquitto_loop_misc(mosq);
191 }
192
193
mosquitto_loop_forever(struct mosquitto * mosq,int timeout,int max_packets)194 int mosquitto_loop_forever(struct mosquitto *mosq, int timeout, int max_packets)
195 {
196 int run = 1;
197 int rc;
198 unsigned long reconnect_delay;
199 #ifndef WIN32
200 struct timespec req, rem;
201 #endif
202 int state;
203
204 if(!mosq) return MOSQ_ERR_INVAL;
205
206 mosq->reconnects = 0;
207
208 while(run){
209 do{
210 rc = mosquitto_loop(mosq, timeout, max_packets);
211 }while(run && rc == MOSQ_ERR_SUCCESS);
212 /* Quit after fatal errors. */
213 switch(rc){
214 case MOSQ_ERR_NOMEM:
215 case MOSQ_ERR_PROTOCOL:
216 case MOSQ_ERR_INVAL:
217 case MOSQ_ERR_NOT_FOUND:
218 case MOSQ_ERR_TLS:
219 case MOSQ_ERR_PAYLOAD_SIZE:
220 case MOSQ_ERR_NOT_SUPPORTED:
221 case MOSQ_ERR_AUTH:
222 case MOSQ_ERR_ACL_DENIED:
223 case MOSQ_ERR_UNKNOWN:
224 case MOSQ_ERR_EAI:
225 case MOSQ_ERR_PROXY:
226 return rc;
227 case MOSQ_ERR_ERRNO:
228 break;
229 }
230 if(errno == EPROTO){
231 return rc;
232 }
233 do{
234 rc = MOSQ_ERR_SUCCESS;
235 state = mosquitto__get_state(mosq);
236 if(state == mosq_cs_disconnecting || state == mosq_cs_disconnected){
237 run = 0;
238 }else{
239 if(mosq->reconnect_delay_max > mosq->reconnect_delay){
240 if(mosq->reconnect_exponential_backoff){
241 reconnect_delay = mosq->reconnect_delay*(mosq->reconnects+1)*(mosq->reconnects+1);
242 }else{
243 reconnect_delay = mosq->reconnect_delay*(mosq->reconnects+1);
244 }
245 }else{
246 reconnect_delay = mosq->reconnect_delay;
247 }
248
249 if(reconnect_delay > mosq->reconnect_delay_max){
250 reconnect_delay = mosq->reconnect_delay_max;
251 }else{
252 mosq->reconnects++;
253 }
254
255 #ifdef WIN32
256 Sleep(reconnect_delay*1000);
257 #else
258 req.tv_sec = reconnect_delay;
259 req.tv_nsec = 0;
260 while(nanosleep(&req, &rem) == -1 && errno == EINTR){
261 req = rem;
262 }
263 #endif
264
265 state = mosquitto__get_state(mosq);
266 if(state == mosq_cs_disconnecting || state == mosq_cs_disconnected){
267 run = 0;
268 }else{
269 rc = mosquitto_reconnect(mosq);
270 }
271 }
272 }while(run && rc != MOSQ_ERR_SUCCESS);
273 }
274 return rc;
275 }
276
277
mosquitto_loop_misc(struct mosquitto * mosq)278 int mosquitto_loop_misc(struct mosquitto *mosq)
279 {
280 if(!mosq) return MOSQ_ERR_INVAL;
281 if(mosq->sock == INVALID_SOCKET) return MOSQ_ERR_NO_CONN;
282
283 return mosquitto__check_keepalive(mosq);
284 }
285
286
mosquitto__loop_rc_handle(struct mosquitto * mosq,int rc)287 static int mosquitto__loop_rc_handle(struct mosquitto *mosq, int rc)
288 {
289 int state;
290
291 if(rc){
292 net__socket_close(mosq);
293 state = mosquitto__get_state(mosq);
294 if(state == mosq_cs_disconnecting || state == mosq_cs_disconnected){
295 rc = MOSQ_ERR_SUCCESS;
296 }
297 pthread_mutex_lock(&mosq->callback_mutex);
298 if(mosq->on_disconnect){
299 mosq->in_callback = true;
300 mosq->on_disconnect(mosq, mosq->userdata, rc);
301 mosq->in_callback = false;
302 }
303 if(mosq->on_disconnect_v5){
304 mosq->in_callback = true;
305 mosq->on_disconnect_v5(mosq, mosq->userdata, rc, NULL);
306 mosq->in_callback = false;
307 }
308 pthread_mutex_unlock(&mosq->callback_mutex);
309 }
310 return rc;
311 }
312
313
mosquitto_loop_read(struct mosquitto * mosq,int max_packets)314 int mosquitto_loop_read(struct mosquitto *mosq, int max_packets)
315 {
316 int rc;
317 int i;
318 if(max_packets < 1) return MOSQ_ERR_INVAL;
319
320 #ifdef WITH_TLS
321 if(mosq->want_connect){
322 return net__socket_connect_tls(mosq);
323 }
324 #endif
325
326 pthread_mutex_lock(&mosq->msgs_out.mutex);
327 max_packets = mosq->msgs_out.queue_len;
328 pthread_mutex_unlock(&mosq->msgs_out.mutex);
329
330 pthread_mutex_lock(&mosq->msgs_in.mutex);
331 max_packets += mosq->msgs_in.queue_len;
332 pthread_mutex_unlock(&mosq->msgs_in.mutex);
333
334 if(max_packets < 1) max_packets = 1;
335 /* Queue len here tells us how many messages are awaiting processing and
336 * have QoS > 0. We should try to deal with that many in this loop in order
337 * to keep up. */
338 for(i=0; i<max_packets || SSL_DATA_PENDING(mosq); i++){
339 #ifdef WITH_SOCKS
340 if(mosq->socks5_host){
341 rc = socks5__read(mosq);
342 }else
343 #endif
344 {
345 rc = packet__read(mosq);
346 }
347 if(rc || errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){
348 return mosquitto__loop_rc_handle(mosq, rc);
349 }
350 }
351 return rc;
352 }
353
354
mosquitto_loop_write(struct mosquitto * mosq,int max_packets)355 int mosquitto_loop_write(struct mosquitto *mosq, int max_packets)
356 {
357 int rc;
358 int i;
359 if(max_packets < 1) return MOSQ_ERR_INVAL;
360
361 pthread_mutex_lock(&mosq->msgs_out.mutex);
362 max_packets = mosq->msgs_out.queue_len;
363 pthread_mutex_unlock(&mosq->msgs_out.mutex);
364
365 pthread_mutex_lock(&mosq->msgs_in.mutex);
366 max_packets += mosq->msgs_in.queue_len;
367 pthread_mutex_unlock(&mosq->msgs_in.mutex);
368
369 if(max_packets < 1) max_packets = 1;
370 /* Queue len here tells us how many messages are awaiting processing and
371 * have QoS > 0. We should try to deal with that many in this loop in order
372 * to keep up. */
373 for(i=0; i<max_packets; i++){
374 rc = packet__write(mosq);
375 if(rc || errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){
376 return mosquitto__loop_rc_handle(mosq, rc);
377 }
378 }
379 return rc;
380 }
381
382