1 /* Copyright (c) 2013-2018 Dovecot authors, see the included COPYING file */
2 
3 #include "lib.h"
4 #include "net.h"
5 #include "str.h"
6 #include "str-sanitize.h"
7 #include "hash.h"
8 #include "array.h"
9 #include "bsearch-insert-pos.h"
10 #include "llist.h"
11 #include "ioloop.h"
12 #include "istream.h"
13 #include "ostream.h"
14 #include "time-util.h"
15 #include "dns-lookup.h"
16 #include "http-response-parser.h"
17 
18 #include "http-client-private.h"
19 
/* Margin, in microseconds, that is passed to timeval_cmp_margin() whenever a
   request's timeout or release time is compared in this file. */
#define TIMEOUT_CMP_MARGIN_USECS 2000

/* Forward declarations of static functions that are used before they are
   defined further down in this file. */
static void
http_client_queue_fail_full(struct http_client_queue *queue,
			    unsigned int status, const char *error, bool all);
static void
http_client_queue_set_delay_timer(struct http_client_queue *queue,
				  struct timeval time);
static void
http_client_queue_set_request_timer(struct http_client_queue *queue,
				    const struct timeval *time);
31 
32 /*
33  * Queue object
34  */
35 
36 static struct http_client_queue *
http_client_queue_find(struct http_client_host * host,const struct http_client_peer_addr * addr)37 http_client_queue_find(struct http_client_host *host,
38 		       const struct http_client_peer_addr *addr)
39 {
40 	struct http_client_queue *queue;
41 
42 	array_foreach_elem(&host->queues, queue) {
43 		if (http_client_peer_addr_cmp(&queue->addr, addr) == 0)
44 			return queue;
45 	}
46 
47 	return NULL;
48 }
49 
50 static struct http_client_queue *
http_client_queue_create(struct http_client_host * host,const struct http_client_peer_addr * addr)51 http_client_queue_create(struct http_client_host *host,
52 			 const struct http_client_peer_addr *addr)
53 {
54 	const char *hostname = host->shared->name;
55 	struct http_client_queue *queue;
56 
57 	queue = i_new(struct http_client_queue, 1);
58 	queue->client = host->client;
59 	queue->host = host;
60 	queue->addr = *addr;
61 
62 	switch (addr->type) {
63 	case HTTP_CLIENT_PEER_ADDR_RAW:
64 		queue->name = i_strdup_printf("raw://%s:%u",
65 					      hostname, addr->a.tcp.port);
66 		queue->addr.a.tcp.https_name = NULL;
67 		break;
68 	case HTTP_CLIENT_PEER_ADDR_HTTPS_TUNNEL:
69 	case HTTP_CLIENT_PEER_ADDR_HTTPS:
70 		queue->name = i_strdup_printf("https://%s:%u",
71 					      hostname, addr->a.tcp.port);
72 		queue->addr_name = i_strdup(addr->a.tcp.https_name);
73 		queue->addr.a.tcp.https_name = queue->addr_name;
74 		break;
75 	case HTTP_CLIENT_PEER_ADDR_HTTP:
76 		queue->name = i_strdup_printf("http://%s:%u",
77 					      hostname, addr->a.tcp.port);
78 		queue->addr.a.tcp.https_name = NULL;
79 		break;
80 	case HTTP_CLIENT_PEER_ADDR_UNIX:
81 		queue->name = i_strdup_printf("unix:%s", addr->a.un.path);
82 		queue->addr_name = i_strdup(addr->a.un.path);
83 		queue->addr.a.un.path = queue->addr_name;
84 		break;
85 	default:
86 		i_unreached();
87 	}
88 
89 	queue->event = event_create(queue->client->event);
90 	event_set_append_log_prefix(queue->event,
91 		t_strdup_printf("queue %s: ", str_sanitize(queue->name, 256)));
92 	queue->ips_connect_idx = 0;
93 	i_array_init(&queue->pending_peers, 8);
94 	i_array_init(&queue->requests, 16);
95 	i_array_init(&queue->queued_requests, 16);
96 	i_array_init(&queue->queued_urgent_requests, 16);
97 	i_array_init(&queue->delayed_requests, 4);
98 	array_push_back(&host->queues, &queue);
99 
100 	return queue;
101 }
102 
103 struct http_client_queue *
http_client_queue_get(struct http_client_host * host,const struct http_client_peer_addr * addr)104 http_client_queue_get(struct http_client_host *host,
105 		      const struct http_client_peer_addr *addr)
106 {
107 	struct http_client_queue *queue;
108 
109 	queue = http_client_queue_find(host, addr);
110 	if (queue == NULL)
111 		queue = http_client_queue_create(host, addr);
112 
113 	return queue;
114 }
115 
http_client_queue_free(struct http_client_queue * queue)116 void http_client_queue_free(struct http_client_queue *queue)
117 {
118 	struct http_client_peer *peer;
119 	ARRAY_TYPE(http_client_peer) peers;
120 
121 	e_debug(queue->event, "Destroy");
122 
123 	/* Currently only called when peer is freed, so there is no need to
124 	   unlink from the peer */
125 
126 	/* Unlink all peers */
127 	if (queue->cur_peer != NULL) {
128 		struct http_client_peer *peer = queue->cur_peer;
129 
130 		queue->cur_peer = NULL;
131 		http_client_peer_unlink_queue(peer, queue);
132 	}
133 	t_array_init(&peers, array_count(&queue->pending_peers));
134 	array_copy(&peers.arr, 0, &queue->pending_peers.arr, 0,
135 		   array_count(&queue->pending_peers));
136 	array_foreach_elem(&peers, peer)
137 		http_client_peer_unlink_queue(peer, queue);
138 	array_free(&queue->pending_peers);
139 
140 	/* Abort all requests */
141 	http_client_queue_fail_full(queue, HTTP_CLIENT_REQUEST_ERROR_ABORTED,
142 				    "Aborted", TRUE);
143 	array_free(&queue->requests);
144 	array_free(&queue->queued_requests);
145 	array_free(&queue->queued_urgent_requests);
146 	array_free(&queue->delayed_requests);
147 
148 	/* Cancel timeouts */
149 	timeout_remove(&queue->to_connect);
150 	timeout_remove(&queue->to_delayed);
151 
152 	/* Free */
153 	event_unref(&queue->event);
154 	i_free(queue->addr_name);
155 	i_free(queue->name);
156 	i_free(queue);
157 }
158 
159 /*
160  * Error handling
161  */
162 
163 static void
http_client_queue_fail_full(struct http_client_queue * queue,unsigned int status,const char * error,bool all)164 http_client_queue_fail_full(struct http_client_queue *queue,
165 			    unsigned int status, const char *error, bool all)
166 {
167 	ARRAY_TYPE(http_client_request) *req_arr, treqs;
168 	struct http_client_request *req;
169 	unsigned int retained = 0;
170 
171 	/* Abort requests */
172 	req_arr = &queue->requests;
173 	t_array_init(&treqs, array_count(req_arr));
174 	array_copy(&treqs.arr, 0, &req_arr->arr, 0, array_count(req_arr));
175 	array_foreach_elem(&treqs, req) {
176 		i_assert(req->state >= HTTP_REQUEST_STATE_QUEUED);
177 		if (!all &&
178 			req->state != HTTP_REQUEST_STATE_QUEUED)
179 			retained++;
180 		else
181 			http_client_request_error(&req, status, error);
182 	}
183 
184 	/* All queues should be empty now... unless new requests were submitted
185 	   from the callback. this invariant captures it all: */
186 	i_assert((retained +
187 		  array_count(&queue->delayed_requests) +
188 		  array_count(&queue->queued_requests) +
189 		  array_count(&queue->queued_urgent_requests)) ==
190 		 array_count(&queue->requests));
191 }
192 
193 static void
http_client_queue_fail(struct http_client_queue * queue,unsigned int status,const char * error)194 http_client_queue_fail(struct http_client_queue *queue,
195 		       unsigned int status, const char *error)
196 {
197 	http_client_queue_fail_full(queue, status, error, FALSE);
198 }
199 
200 /*
201  * Connection management
202  */
203 
204 static bool
http_client_queue_is_last_connect_ip(struct http_client_queue * queue)205 http_client_queue_is_last_connect_ip(struct http_client_queue *queue)
206 {
207 	const struct http_client_settings *set =
208 		&queue->client->set;
209 	struct http_client_host *host = queue->host;
210 	unsigned int ips_count = http_client_host_get_ips_count(host);
211 
212 	i_assert(queue->addr.type != HTTP_CLIENT_PEER_ADDR_UNIX);
213 	i_assert(queue->ips_connect_idx < ips_count);
214 	i_assert(queue->ips_connect_start_idx < ips_count);
215 
216 	/* If a maximum connect attempts > 1 is set, enforce it directly */
217 	if (set->max_connect_attempts > 1 &&
218 		queue->connect_attempts >= set->max_connect_attempts)
219 		return TRUE;
220 
221 	/* Otherwise, we'll always go through all the IPs. we don't necessarily
222 	   start connecting from the first IP, so we'll need to treat the IPs as
223 	   a ring buffer where we automatically wrap back to the first IP
224 	   when necessary. */
225 	return ((queue->ips_connect_idx + 1) % ips_count ==
226 		queue->ips_connect_start_idx);
227 }
228 
229 static void
http_client_queue_recover_from_lookup(struct http_client_queue * queue)230 http_client_queue_recover_from_lookup(struct http_client_queue *queue)
231 {
232 	struct http_client_host *host = queue->host;
233 	unsigned int ip_idx;
234 
235 	i_assert(queue->addr.type != HTTP_CLIENT_PEER_ADDR_UNIX);
236 
237 	if (queue->cur_peer == NULL) {
238 		queue->ips_connect_idx = queue->ips_connect_start_idx = 0;
239 		return;
240 	}
241 
242 	if (http_client_host_get_ip_idx(
243 		host, &queue->cur_peer->shared->addr.a.tcp.ip, &ip_idx)) {
244 		/* Continue with current peer */
245 		queue->ips_connect_idx = queue->ips_connect_start_idx = ip_idx;
246 	} else {
247 		/* Reset connect attempts */
248 		queue->ips_connect_idx = queue->ips_connect_start_idx = 0;
249 	}
250 }
251 
252 static void
http_client_queue_soft_connect_timeout(struct http_client_queue * queue)253 http_client_queue_soft_connect_timeout(struct http_client_queue *queue)
254 {
255 	struct http_client_host *host = queue->host;
256 	const struct http_client_peer_addr *addr = &queue->addr;
257 	unsigned int ips_count = http_client_host_get_ips_count(host);
258 	const char *https_name;
259 
260 	i_assert(queue->addr.type != HTTP_CLIENT_PEER_ADDR_UNIX);
261 
262 	timeout_remove(&queue->to_connect);
263 
264 	if (http_client_queue_is_last_connect_ip(queue)) {
265 		/* No more IPs to try */
266 		return;
267 	}
268 
269 	/* If our our previous connection attempt takes longer than the
270 	   soft_connect_timeout, we start a connection attempt to the next IP in
271 	   parallel */
272 	https_name = http_client_peer_addr_get_https_name(addr);
273 	e_debug(queue->event, "Connection to %s%s is taking a long time; "
274 		"starting parallel connection attempt to next IP",
275 		http_client_peer_addr2str(addr),
276 		(https_name == NULL ?
277 		 "" : t_strdup_printf(" (SSL=%s)", https_name)));
278 
279 	/* Next IP */
280 	queue->ips_connect_idx = (queue->ips_connect_idx + 1) % ips_count;
281 
282 	/* Setup connection to new peer (can start new soft timeout) */
283 	http_client_queue_connection_setup(queue);
284 }
285 
/* Try to get the queue's pending requests serviced by a peer.

   Returns NULL when there are no queued (normal or urgent) requests or when
   the host's IPs are being looked up asynchronously. Otherwise returns the
   peer that is now responsible for the queue: either the already connected
   current peer, or the peer for the address at queue->ips_connect_idx, which
   is linked to the queue and either becomes the current peer (when it is
   connected) or is tracked in queue->pending_peers (when it is not). */
static struct http_client_peer *
http_client_queue_connection_attempt(struct http_client_queue *queue)
{
	struct http_client *client = queue->client;
	struct http_client_host *host = queue->host;
	struct http_client_peer *peer;
	struct http_client_peer_addr *addr = &queue->addr;
	unsigned int num_requests =
		array_count(&queue->queued_requests) +
		array_count(&queue->queued_urgent_requests);
	const char *ssl = "";
	int ret;

	/* Nothing is waiting to be sent; no connection needed */
	if (num_requests == 0)
		return NULL;

	/* Check whether host IPs are still up-to-date */
	ret = http_client_host_refresh(host);
	if (ret < 0) {
		/* Performing asynchronous lookup */
		timeout_remove(&queue->to_connect);
		return NULL;
	}
	if (ret > 0) {
		/* New lookup performed */
		http_client_queue_recover_from_lookup(queue);
	}

	/* Update our peer address */
	if (queue->addr.type != HTTP_CLIENT_PEER_ADDR_UNIX) {
		const struct ip_addr *ip =
			http_client_host_get_ip(host, queue->ips_connect_idx);

		queue->addr.a.tcp.ip = *ip;
		/* Build the " (SSL=...)" suffix used in the debug messages
		   below; it stays empty when there is no HTTPS name */
		ssl = http_client_peer_addr_get_https_name(addr);
		ssl = (ssl == NULL ? "" : t_strdup_printf(" (SSL=%s)", ssl));
	}

	/* Already got a peer? */
	peer = NULL;
	if (queue->cur_peer != NULL) {
		i_assert(array_count(&queue->pending_peers) == 0);

		/* Is it still the one we want? */
		if (http_client_peer_addr_cmp(
			addr, &queue->cur_peer->shared->addr) == 0) {
			/* Is it still connected? */
			if (http_client_peer_is_connected(queue->cur_peer)) {
				/* Yes */
				e_debug(queue->event,
					"Using existing connection to %s%s "
					"(%u requests pending)",
					http_client_peer_addr2str(addr),
					ssl, num_requests);

				/* Handle requests; */
				http_client_peer_trigger_request_handler(
					queue->cur_peer);
				return queue->cur_peer;
			}
			/* No */
			/* Same address but not connected: reuse this peer
			   object for the new attempt below */
			peer = queue->cur_peer;
		} else {
			/* Peer is not relevant to this queue anymore */
			http_client_peer_unlink_queue(queue->cur_peer, queue);
		}

		queue->cur_peer = NULL;
	}

	if (peer == NULL)
		peer = http_client_peer_get(queue->client, addr);

	e_debug(queue->event,
		"Setting up connection to %s%s (%u requests pending)",
		http_client_peer_addr2str(addr), ssl, num_requests);

	/* Create provisional link between queue and peer */
	http_client_peer_link_queue(peer, queue);

	/* Handle requests; creates new connections when needed/possible */
	http_client_peer_trigger_request_handler(peer);

	if (http_client_peer_is_connected(peer)) {
		/* Drop any pending peers */
		if (array_count(&queue->pending_peers) > 0) {
			struct http_client_peer *pending_peer;

			array_foreach_elem(&queue->pending_peers, pending_peer) {
				if (pending_peer == peer) {
					/* This can happen with shared clients
					 */
					continue;
				}
				i_assert(http_client_peer_addr_cmp(
					&pending_peer->shared->addr, addr) != 0);
				http_client_peer_unlink_queue(pending_peer, queue);
			}
			array_clear(&queue->pending_peers);
		}
		/* The connected peer becomes the queue's current peer */
		queue->cur_peer = peer;

		http_client_peer_trigger_request_handler(queue->cur_peer);

	} else {
		struct http_client_peer *pending_peer;
		unsigned int msecs;
		bool new_peer = TRUE;

		/* Not already connected, wait for connections */

		/* We may be waiting for this peer already */
		array_foreach_elem(&queue->pending_peers, pending_peer) {
			if (http_client_peer_addr_cmp(
				&pending_peer->shared->addr, addr) == 0) {
				i_assert(pending_peer == peer);
				new_peer = FALSE;
				break;
			}
		}
		if (new_peer) {
			e_debug(queue->event, "Started new connection to %s%s",
				http_client_peer_addr2str(addr), ssl);

			array_push_back(&queue->pending_peers, &peer);
			/* Remember when the first attempt of this series was
			   started; used for the error message when all
			   attempts fail */
			if (queue->connect_attempts++ == 0)
				queue->first_connect_time = ioloop_timeval;
		}

		/* Start soft connect time-out
		   (but only if we have another IP left) */
		if (queue->addr.type != HTTP_CLIENT_PEER_ADDR_UNIX) {
			msecs = client->set.soft_connect_timeout_msecs;
			if (!http_client_queue_is_last_connect_ip(queue) &&
			    msecs > 0 && queue->to_connect == NULL) {
				queue->to_connect = timeout_add_to(
					client->ioloop, msecs,
					http_client_queue_soft_connect_timeout,
					queue);
			}
		}
	}

	return peer;
}
431 
/* Start or continue setting up a connection for this queue's pending
   requests. Same as http_client_queue_connection_attempt(), except that the
   resulting peer is not reported to the caller. */
void
http_client_queue_connection_setup(struct http_client_queue *queue)
{
	/* The caller has no use for the peer */
	(void)http_client_queue_connection_attempt(queue);
}
436 
437 unsigned int
http_client_queue_host_lookup_done(struct http_client_queue * queue)438 http_client_queue_host_lookup_done(struct http_client_queue *queue)
439 {
440 	unsigned int reqs_pending =
441 		http_client_queue_requests_pending(queue, NULL);
442 
443 	http_client_queue_recover_from_lookup(queue);
444 	if (reqs_pending > 0)
445 		http_client_queue_connection_setup(queue);
446 	return reqs_pending;
447 }
448 
http_client_queue_host_lookup_failure(struct http_client_queue * queue,const char * error)449 void http_client_queue_host_lookup_failure(
450 	struct http_client_queue *queue, const char *error)
451 {
452 	http_client_queue_fail(
453 		queue, HTTP_CLIENT_REQUEST_ERROR_HOST_LOOKUP_FAILED, error);
454 }
455 
http_client_queue_connection_success(struct http_client_queue * queue,struct http_client_peer * peer)456 void http_client_queue_connection_success(struct http_client_queue *queue,
457 					  struct http_client_peer *peer)
458 {
459 	const struct http_client_peer_addr *addr = &peer->shared->addr;
460 	struct http_client_host *host = queue->host;
461 
462 	if (http_client_host_ready(host) &&
463 	    queue->addr.type != HTTP_CLIENT_PEER_ADDR_UNIX) {
464 		/* We achieved at least one connection the the addr->ip */
465 		if (!http_client_host_get_ip_idx(
466 			host, &addr->a.tcp.ip, &queue->ips_connect_start_idx)) {
467 			/* list of IPs changed during connect */
468 			queue->ips_connect_start_idx = 0;
469 		}
470 	}
471 
472 	/* Reset attempt counter */
473 	queue->connect_attempts = 0;
474 
475 	/* stop soft connect time-out */
476 	timeout_remove(&queue->to_connect);
477 
478 	/* Drop all other attempts to the hport. note that we get here whenever
479 	   a connection is successfully created, so pending_peers array
480 	   may be empty. */
481 	if (array_count(&queue->pending_peers) > 0) {
482 		struct http_client_peer *pending_peer;
483 
484 		array_foreach_elem(&queue->pending_peers, pending_peer) {
485 			if (pending_peer == peer) {
486 				/* Don't drop any connections to the
487 				   successfully connected peer, even if some of
488 				   the connections are pending. they may be
489 				   intended for urgent requests. */
490 				i_assert(queue->cur_peer == NULL);
491 				queue->cur_peer = pending_peer;
492 				continue;
493 			}
494 			/* Unlink this queue from the peer; if this was the
495 			   last/only queue, the peer will be freed, closing all
496 			   connections.
497 			 */
498 			http_client_peer_unlink_queue(pending_peer, queue);
499 		}
500 
501 		array_clear(&queue->pending_peers);
502 		i_assert(queue->cur_peer != NULL);
503 	}
504 }
505 
/* Called when setting up a connection to the given peer failed for the given
   reason. Unlinks the queue from the peer and then decides how to continue:
   wait for other pending connection attempts, retry with the next IP, or
   fail the queued requests with HTTP_CLIENT_REQUEST_ERROR_CONNECT_FAILED
   once no further attempt is allowed. */
void http_client_queue_connection_failure(struct http_client_queue *queue,
					  struct http_client_peer *peer,
					  const char *reason)
{
	const struct http_client_settings *set =
		&queue->client->set;
	const struct http_client_peer_addr *addr = &peer->shared->addr;
	const char *https_name = http_client_peer_addr_get_https_name(addr);
	struct http_client_host *host = queue->host;
	unsigned int ips_count = http_client_host_get_ips_count(host);
	struct http_client_peer *const *peer_idx;
	unsigned int num_requests =
		array_count(&queue->queued_requests) +
		array_count(&queue->queued_urgent_requests);

	e_debug(queue->event,
		"Failed to set up connection to %s%s: %s "
		"(%u peers pending, %u requests pending)",
		http_client_peer_addr2str(addr),
		(https_name == NULL ?
		 "" : t_strdup_printf(" (SSL=%s)", https_name)),
		reason, array_count(&queue->pending_peers), num_requests);

	http_client_peer_unlink_queue(peer, queue);

	if (array_count(&queue->pending_peers) == 0) {
		/* No initial connection attempts are in progress; the failed
		   peer can only be the current peer (if any) */
		i_assert(queue->cur_peer == NULL || queue->cur_peer == peer);
		queue->cur_peer = NULL;
	} else {
		bool found = FALSE;

		i_assert(queue->cur_peer == NULL);

		/* We're still doing the initial connections to this hport. if
		   we're also doing parallel connections with soft timeouts
		   (pending_peer_count>1), wait for them to finish first. */
		/* Remove the failed peer from the pending list; it must be
		   present there */
		array_foreach(&queue->pending_peers, peer_idx) {
			if (*peer_idx == peer) {
				array_delete(&queue->pending_peers,
					     array_foreach_idx(
						&queue->pending_peers,
						peer_idx), 1);
				found = TRUE;
				break;
			}
		}
		i_assert(found);
		if (array_count(&queue->pending_peers) > 0) {
			e_debug(queue->event,
				"Waiting for remaining pending peers.");
			return;
		}

		/* One of the connections failed. if we're not using soft
		   timeouts, we need to try to connect to the next IP. if we are
		   using soft timeouts, we've already tried all of the IPs by
		   now. */
		timeout_remove(&queue->to_connect);

		/* A unix socket queue has no alternative IPs to try */
		if (queue->addr.type == HTTP_CLIENT_PEER_ADDR_UNIX) {
			http_client_queue_fail(
				queue, HTTP_CLIENT_REQUEST_ERROR_CONNECT_FAILED,
				reason);
			return;
		}
	}

	if (http_client_queue_is_last_connect_ip(queue)) {
		if (array_count(&queue->pending_peers) > 0) {
			/* Other connection attempts still pending */
			return;
		}

		/* All IPs failed up until here and we allow no more connect
		   attempts, but try the next ones on the next request. */
		queue->ips_connect_idx = queue->ips_connect_start_idx =
			(queue->ips_connect_idx + 1) % ips_count;

		if (set->max_connect_attempts == 0 ||
		    queue->connect_attempts >= set->max_connect_attempts) {

			e_debug(queue->event,
				"Failed to set up any connection; "
				"failing all queued requests");
			/* With more than one attempt, extend the reason with
			   the attempt count and the time elapsed since the
			   first attempt */
			if (queue->connect_attempts > 1) {
				unsigned int total_msecs =
					timeval_diff_msecs(&ioloop_timeval,
							   &queue->first_connect_time);
				reason = t_strdup_printf(
					"%s (%u attempts in %u.%03u secs)",
					reason, queue->connect_attempts,
					total_msecs/1000, total_msecs%1000);
			}
			queue->connect_attempts = 0;
			http_client_queue_fail(
				queue, HTTP_CLIENT_REQUEST_ERROR_CONNECT_FAILED,
				reason);
			return;
		}
	} else {
		/* More IPs left; advance to the next one (wrapping around) */
		queue->ips_connect_idx =
			(queue->ips_connect_idx + 1) % ips_count;
	}

	/* Retry; if the new attempt ended up with a different peer (or none),
	   make sure the failed peer is not linked to this queue anymore */
	if (http_client_queue_connection_attempt(queue) != peer)
		http_client_peer_unlink_queue(peer, queue);
	return;
}
614 
http_client_queue_peer_disconnected(struct http_client_queue * queue,struct http_client_peer * peer)615 void http_client_queue_peer_disconnected(struct http_client_queue *queue,
616 					 struct http_client_peer *peer)
617 {
618 	struct http_client_peer *const *peer_idx;
619 
620 	if (queue->cur_peer == peer) {
621 		queue->cur_peer = NULL;
622 		return;
623 	}
624 
625 	array_foreach(&queue->pending_peers, peer_idx) {
626 		if (*peer_idx == peer) {
627 			array_delete(&queue->pending_peers,
628 				     array_foreach_idx(&queue->pending_peers,
629 						       peer_idx), 1);
630 			break;
631 		}
632 	}
633 }
634 
635 /*
636  * Main request queue
637  */
638 
http_client_queue_drop_request(struct http_client_queue * queue,struct http_client_request * req)639 void http_client_queue_drop_request(struct http_client_queue *queue,
640 				    struct http_client_request *req)
641 {
642 	struct http_client_request **reqs;
643 	unsigned int count, i;
644 
645 	e_debug(queue->event,
646 		"Dropping request %s", http_client_request_label(req));
647 
648 	/* Drop from queue */
649 	if (req->urgent) {
650 		reqs = array_get_modifiable(&queue->queued_urgent_requests,
651 					    &count);
652 		for (i = 0; i < count; i++) {
653 			if (reqs[i] == req) {
654 				array_delete(&queue->queued_urgent_requests,
655 					     i, 1);
656 				break;
657 			}
658 		}
659 	} else {
660 		reqs = array_get_modifiable(&queue->queued_requests, &count);
661 		for (i = 0; i < count; i++) {
662 			if (reqs[i] == req) {
663 				array_delete(&queue->queued_requests, i, 1);
664 				break;
665 			}
666 		}
667 	}
668 
669 	/* Drop from delay queue */
670 	if (req->release_time.tv_sec > 0) {
671 		reqs = array_get_modifiable(&queue->delayed_requests, &count);
672 		for (i = 0; i < count; i++) {
673 			if (reqs[i] == req)
674 				break;
675 		}
676 		if (i < count) {
677 			if (i == 0) {
678 				if (queue->to_delayed != NULL) {
679 					timeout_remove(&queue->to_delayed);
680 					if (count > 1) {
681 						i_assert(reqs[1]->release_time.tv_sec > 0);
682 						http_client_queue_set_delay_timer(
683 							queue, reqs[1]->release_time);
684 					}
685 				}
686 			}
687 			array_delete(&queue->delayed_requests, i, 1);
688 		}
689 	}
690 
691 	/* Drop from main request list */
692 	reqs = array_get_modifiable(&queue->requests, &count);
693 	for (i = 0; i < count; i++) {
694 		if (reqs[i] == req)
695 			break;
696 	}
697 	i_assert(i < count);
698 
699 	if (i == 0) {
700 		if (queue->to_request != NULL) {
701 			timeout_remove(&queue->to_request);
702 			if (count > 1 && reqs[1]->timeout_time.tv_sec > 0) {
703 				http_client_queue_set_request_timer(queue,
704 					&reqs[1]->timeout_time);
705 			}
706 		}
707 	}
708 	req->queue = NULL;
709 	array_delete(&queue->requests, i, 1);
710 
711 	if (array_count(&queue->requests) == 0)
712 		http_client_host_check_idle(queue->host);
713 	return;
714 }
715 
http_client_queue_request_timeout(struct http_client_queue * queue)716 static void http_client_queue_request_timeout(struct http_client_queue *queue)
717 {
718 	struct http_client_request *const *reqs;
719 	ARRAY_TYPE(http_client_request) failed_requests;
720 	struct timeval new_to = { 0, 0 };
721 	string_t *str;
722 	size_t prefix_size;
723 	unsigned int count, i;
724 
725 	e_debug(queue->event, "Timeout (now: %s.%03lu)",
726 		t_strflocaltime("%Y-%m-%d %H:%M:%S", ioloop_timeval.tv_sec),
727 		((unsigned long)ioloop_timeval.tv_usec) / 1000);
728 
729 	timeout_remove(&queue->to_request);
730 
731 	/* Collect failed requests */
732 	reqs = array_get(&queue->requests, &count);
733 	i_assert(count > 0);
734 	t_array_init(&failed_requests, count);
735 	for (i = 0; i < count; i++) {
736 		if (reqs[i]->timeout_time.tv_sec > 0 &&
737 		    timeval_cmp_margin(&reqs[i]->timeout_time,
738 				       &ioloop_timeval,
739 				       TIMEOUT_CMP_MARGIN_USECS) > 0) {
740 			break;
741 		}
742 		array_push_back(&failed_requests, &reqs[i]);
743 	}
744 
745 	/* Update timeout */
746 	if (i < count)
747 		new_to = reqs[i]->timeout_time;
748 
749 	str = t_str_new(64);
750 	str_append(str, "Request ");
751 	prefix_size = str_len(str);
752 
753 	/* Abort all failed request */
754 	reqs = array_get(&failed_requests, &count);
755 	i_assert(count > 0); /* At least one request timed out */
756 	for (i = 0; i < count; i++) {
757 		struct http_client_request *req = reqs[i];
758 
759 		str_truncate(str, prefix_size);
760 		http_client_request_append_stats_text(req, str);
761 
762 		e_debug(queue->event,
763 			"Absolute timeout expired for request %s (%s)",
764 			http_client_request_label(req), str_c(str));
765 		http_client_request_error(
766 			&req, HTTP_CLIENT_REQUEST_ERROR_TIMED_OUT,
767 			t_strdup_printf(
768 				"Absolute request timeout expired (%s)",
769 				str_c(str)));
770 	}
771 
772 	if (new_to.tv_sec > 0) {
773 		e_debug(queue->event, "New timeout");
774 		http_client_queue_set_request_timer(queue, &new_to);
775 	}
776 }
777 
778 static void
http_client_queue_set_request_timer(struct http_client_queue * queue,const struct timeval * time)779 http_client_queue_set_request_timer(struct http_client_queue *queue,
780 				    const struct timeval *time)
781 {
782 	i_assert(time->tv_sec > 0);
783 	timeout_remove(&queue->to_request);
784 
785 	e_debug(queue->event,
786 		"Set request timeout to %s.%03lu (now: %s.%03lu)",
787 		t_strflocaltime("%Y-%m-%d %H:%M:%S", time->tv_sec),
788 		((unsigned long)time->tv_usec) / 1000,
789 		t_strflocaltime("%Y-%m-%d %H:%M:%S", ioloop_timeval.tv_sec),
790 		((unsigned long)ioloop_timeval.tv_usec) / 1000);
791 
792 	/* Set timer */
793 	queue->to_request = timeout_add_absolute_to(
794 		queue->client->ioloop, time,
795 		http_client_queue_request_timeout, queue);
796 }
797 
798 static int
http_client_queue_request_timeout_cmp(struct http_client_request * const * req1,struct http_client_request * const * req2)799 http_client_queue_request_timeout_cmp(struct http_client_request *const *req1,
800 				      struct http_client_request *const *req2)
801 {
802 	int ret;
803 
804 	/* 0 means no timeout */
805 	if ((*req1)->timeout_time.tv_sec == 0) {
806 		if ((*req2)->timeout_time.tv_sec == 0) {
807 			/* sort by age */
808 			ret = timeval_cmp(&(*req1)->submit_time,
809 					  &(*req2)->submit_time);
810 			if (ret != 0)
811 				return ret;
812 		} else {
813 			return 1;
814 		}
815 	} else if ((*req2)->timeout_time.tv_sec == 0) {
816 		return -1;
817 
818 	/* Sort by timeout */
819 	} else if ((ret = timeval_cmp(&(*req1)->timeout_time,
820 				      &(*req2)->timeout_time)) != 0) {
821 		return ret;
822 	}
823 
824 	/* Sort by minimum attempts for fairness */
825 	return ((int)(*req2)->attempts - (int)(*req1)->attempts);
826 }
827 
828 static void
http_client_queue_submit_now(struct http_client_queue * queue,struct http_client_request * req)829 http_client_queue_submit_now(struct http_client_queue *queue,
830 			     struct http_client_request *req)
831 {
832 	ARRAY_TYPE(http_client_request) *req_queue;
833 
834 	req->release_time.tv_sec = 0;
835 	req->release_time.tv_usec = 0;
836 
837 	if (req->urgent)
838 		req_queue = &queue->queued_urgent_requests;
839 	else
840 		req_queue = &queue->queued_requests;
841 
842 	/* Enqueue */
843 	if (req->timeout_time.tv_sec == 0) {
844 		/* No timeout; enqueue at end */
845 		array_push_back(req_queue, &req);
846 	} else if (timeval_diff_msecs(&req->timeout_time,
847 				      &ioloop_timeval) <= 1) {
848 		/* Pretty much already timed out; don't bother */
849 		return;
850 	} else {
851 		unsigned int insert_idx;
852 
853 		/* Keep transmission queue sorted earliest timeout first */
854 		(void)array_bsearch_insert_pos(
855 			req_queue, &req,
856 			http_client_queue_request_timeout_cmp, &insert_idx);
857 		array_insert(req_queue, insert_idx, &req, 1);
858 	}
859 
860 	http_client_queue_connection_setup(queue);
861 }
862 
863 /*
864  * Delayed request queue
865  */
866 
867 static void
http_client_queue_delay_timeout(struct http_client_queue * queue)868 http_client_queue_delay_timeout(struct http_client_queue *queue)
869 {
870 	struct http_client_request *const *reqs;
871 	unsigned int count, i, finished;
872 
873 	timeout_remove(&queue->to_delayed);
874 	io_loop_time_refresh();
875 
876 	finished = 0;
877 	reqs = array_get(&queue->delayed_requests, &count);
878 	for (i = 0; i < count; i++) {
879 		if (timeval_cmp_margin(&reqs[i]->release_time,
880 				       &ioloop_timeval,
881 				       TIMEOUT_CMP_MARGIN_USECS) > 0) {
882 			break;
883 		}
884 
885 		e_debug(queue->event, "Activated delayed request %s%s",
886 			http_client_request_label(reqs[i]),
887 			(reqs[i]->urgent ? " (urgent)" : ""));
888 		http_client_queue_submit_now(queue, reqs[i]);
889 		finished++;
890 	}
891 	if (i < count)
892 		http_client_queue_set_delay_timer(queue, reqs[i]->release_time);
893 	array_delete(&queue->delayed_requests, 0, finished);
894 }
895 
896 static void
http_client_queue_set_delay_timer(struct http_client_queue * queue,struct timeval time)897 http_client_queue_set_delay_timer(struct http_client_queue *queue,
898 				  struct timeval time)
899 {
900 	struct http_client *client = queue->client;
901 	int usecs = timeval_diff_usecs(&time, &ioloop_timeval);
902 	int msecs;
903 
904 	/* Round up to nearest microsecond */
905 	msecs = (usecs + 999) / 1000;
906 
907 	/* Set timer */
908 	timeout_remove(&queue->to_delayed);
909 	queue->to_delayed = timeout_add_to(
910 		client->ioloop, msecs,
911 		http_client_queue_delay_timeout, queue);
912 }
913 
914 static int
http_client_queue_delayed_cmp(struct http_client_request * const * req1,struct http_client_request * const * req2)915 http_client_queue_delayed_cmp(struct http_client_request *const *req1,
916 			      struct http_client_request *const *req2)
917 {
918 	return timeval_cmp(&(*req1)->release_time, &(*req2)->release_time);
919 }
920 
921 /*
922  * Request submission
923  */
924 
http_client_queue_submit_request(struct http_client_queue * queue,struct http_client_request * req)925 void http_client_queue_submit_request(struct http_client_queue *queue,
926 				      struct http_client_request *req)
927 {
928 	unsigned int insert_idx;
929 
930 	if (req->queue != NULL)
931 		http_client_queue_drop_request(req->queue, req);
932 	req->queue = queue;
933 
934 	/* Check delay vs timeout */
935 	if (req->release_time.tv_sec > 0 && req->timeout_time.tv_sec > 0 &&
936 	    timeval_cmp_margin(&req->release_time, &req->timeout_time,
937 			       TIMEOUT_CMP_MARGIN_USECS) >= 0) {
938 		/* Release time is later than absolute timeout */
939 		req->release_time.tv_sec = 0;
940 		req->release_time.tv_usec = 0;
941 
942 		/* Timeout rightaway */
943 		req->timeout_time = ioloop_timeval;
944 
945 		e_debug(queue->event,
946 			"Delayed request %s%s already timed out",
947 			http_client_request_label(req),
948 			(req->urgent ? " (urgent)" : ""));
949 	}
950 
951 	/* Add to main request list */
952 	if (req->timeout_time.tv_sec == 0) {
953 		/* No timeout; just append */
954 		array_push_back(&queue->requests, &req);
955 	} else {
956 		unsigned int insert_idx;
957 
958 		/* Keep main request list sorted earliest timeout first */
959 		(void)array_bsearch_insert_pos(
960 			&queue->requests, &req,
961 			http_client_queue_request_timeout_cmp, &insert_idx);
962 		array_insert(&queue->requests, insert_idx, &req, 1);
963 
964 		/* Now first in queue; update timer */
965 		if (insert_idx == 0) {
966 			http_client_queue_set_request_timer(queue,
967 							    &req->timeout_time);
968 		}
969 	}
970 
971 	/* Handle delay */
972 	if (req->release_time.tv_sec > 0) {
973 		io_loop_time_refresh();
974 
975 		if (timeval_cmp_margin(&req->release_time, &ioloop_timeval,
976 				       TIMEOUT_CMP_MARGIN_USECS) > 0) {
977 			e_debug(queue->event,
978 				"Delayed request %s%s submitted "
979 				"(time remaining: %d msecs)",
980 				http_client_request_label(req),
981 				(req->urgent ? " (urgent)" : ""),
982 				timeval_diff_msecs(&req->release_time,
983 						   &ioloop_timeval));
984 
985 			(void)array_bsearch_insert_pos(
986 				&queue->delayed_requests, &req,
987 				http_client_queue_delayed_cmp, &insert_idx);
988 			array_insert(&queue->delayed_requests, insert_idx,
989 				     &req, 1);
990 			if (insert_idx == 0) {
991 				http_client_queue_set_delay_timer(
992 					queue, req->release_time);
993 			}
994 			return;
995 		}
996 	}
997 
998 	http_client_queue_submit_now(queue, req);
999 }
1000 
1001 /*
1002  * Request retrieval
1003  */
1004 
1005 struct http_client_request *
http_client_queue_claim_request(struct http_client_queue * queue,const struct http_client_peer_addr * addr,bool no_urgent)1006 http_client_queue_claim_request(struct http_client_queue *queue,
1007 				const struct http_client_peer_addr *addr,
1008 				bool no_urgent)
1009 {
1010 	struct http_client_request *const *requests;
1011 	struct http_client_request *req;
1012 	unsigned int i, count;
1013 
1014 	count = 0;
1015 	if (!no_urgent)
1016 	 	requests = array_get(&queue->queued_urgent_requests, &count);
1017 
1018 	if (count == 0)
1019 	 	requests = array_get(&queue->queued_requests, &count);
1020 	if (count == 0)
1021 		return NULL;
1022 	i = 0;
1023 	req = requests[i];
1024 	if (req->urgent)
1025 		array_delete(&queue->queued_urgent_requests, i, 1);
1026 	else
1027 		array_delete(&queue->queued_requests, i, 1);
1028 
1029 	e_debug(queue->event,
1030 		"Connection to peer %s claimed request %s %s",
1031 		http_client_peer_addr2str(addr), http_client_request_label(req),
1032 		(req->urgent ? "(urgent)" : ""));
1033 
1034 	return req;
1035 }
1036 
1037 unsigned int
http_client_queue_requests_pending(struct http_client_queue * queue,unsigned int * num_urgent_r)1038 http_client_queue_requests_pending(struct http_client_queue *queue,
1039 				   unsigned int *num_urgent_r)
1040 {
1041 	unsigned int urg_count = array_count(&queue->queued_urgent_requests);
1042 
1043 	if (num_urgent_r != NULL)
1044 		*num_urgent_r = urg_count;
1045 	return array_count(&queue->queued_requests) + urg_count;
1046 }
1047 
http_client_queue_requests_active(struct http_client_queue * queue)1048 unsigned int http_client_queue_requests_active(struct http_client_queue *queue)
1049 {
1050 	return array_count(&queue->requests);
1051 }
1052 
1053 /*
1054  * Ioloop
1055  */
1056 
http_client_queue_switch_ioloop(struct http_client_queue * queue)1057 void http_client_queue_switch_ioloop(struct http_client_queue *queue)
1058 {
1059 	if (queue->to_connect != NULL)
1060 		queue->to_connect = io_loop_move_timeout(&queue->to_connect);
1061 	if (queue->to_request != NULL)
1062 		queue->to_request = io_loop_move_timeout(&queue->to_request);
1063 	if (queue->to_delayed != NULL)
1064 		queue->to_delayed = io_loop_move_timeout(&queue->to_delayed);
1065 }
1066