1 /* Copyright (c) 2013-2018 Dovecot authors, see the included COPYING file */
2
3 #include "lib.h"
4 #include "net.h"
5 #include "str.h"
6 #include "str-sanitize.h"
7 #include "hash.h"
8 #include "array.h"
9 #include "bsearch-insert-pos.h"
10 #include "llist.h"
11 #include "ioloop.h"
12 #include "istream.h"
13 #include "ostream.h"
14 #include "time-util.h"
15 #include "dns-lookup.h"
16 #include "http-response-parser.h"
17
18 #include "http-client-private.h"
19
20 #define TIMEOUT_CMP_MARGIN_USECS 2000
21
22 static void
23 http_client_queue_fail_full(struct http_client_queue *queue,
24 unsigned int status, const char *error, bool all);
25 static void
26 http_client_queue_set_delay_timer(struct http_client_queue *queue,
27 struct timeval time);
28 static void
29 http_client_queue_set_request_timer(struct http_client_queue *queue,
30 const struct timeval *time);
31
32 /*
33 * Queue object
34 */
35
36 static struct http_client_queue *
http_client_queue_find(struct http_client_host * host,const struct http_client_peer_addr * addr)37 http_client_queue_find(struct http_client_host *host,
38 const struct http_client_peer_addr *addr)
39 {
40 struct http_client_queue *queue;
41
42 array_foreach_elem(&host->queues, queue) {
43 if (http_client_peer_addr_cmp(&queue->addr, addr) == 0)
44 return queue;
45 }
46
47 return NULL;
48 }
49
50 static struct http_client_queue *
http_client_queue_create(struct http_client_host * host,const struct http_client_peer_addr * addr)51 http_client_queue_create(struct http_client_host *host,
52 const struct http_client_peer_addr *addr)
53 {
54 const char *hostname = host->shared->name;
55 struct http_client_queue *queue;
56
57 queue = i_new(struct http_client_queue, 1);
58 queue->client = host->client;
59 queue->host = host;
60 queue->addr = *addr;
61
62 switch (addr->type) {
63 case HTTP_CLIENT_PEER_ADDR_RAW:
64 queue->name = i_strdup_printf("raw://%s:%u",
65 hostname, addr->a.tcp.port);
66 queue->addr.a.tcp.https_name = NULL;
67 break;
68 case HTTP_CLIENT_PEER_ADDR_HTTPS_TUNNEL:
69 case HTTP_CLIENT_PEER_ADDR_HTTPS:
70 queue->name = i_strdup_printf("https://%s:%u",
71 hostname, addr->a.tcp.port);
72 queue->addr_name = i_strdup(addr->a.tcp.https_name);
73 queue->addr.a.tcp.https_name = queue->addr_name;
74 break;
75 case HTTP_CLIENT_PEER_ADDR_HTTP:
76 queue->name = i_strdup_printf("http://%s:%u",
77 hostname, addr->a.tcp.port);
78 queue->addr.a.tcp.https_name = NULL;
79 break;
80 case HTTP_CLIENT_PEER_ADDR_UNIX:
81 queue->name = i_strdup_printf("unix:%s", addr->a.un.path);
82 queue->addr_name = i_strdup(addr->a.un.path);
83 queue->addr.a.un.path = queue->addr_name;
84 break;
85 default:
86 i_unreached();
87 }
88
89 queue->event = event_create(queue->client->event);
90 event_set_append_log_prefix(queue->event,
91 t_strdup_printf("queue %s: ", str_sanitize(queue->name, 256)));
92 queue->ips_connect_idx = 0;
93 i_array_init(&queue->pending_peers, 8);
94 i_array_init(&queue->requests, 16);
95 i_array_init(&queue->queued_requests, 16);
96 i_array_init(&queue->queued_urgent_requests, 16);
97 i_array_init(&queue->delayed_requests, 4);
98 array_push_back(&host->queues, &queue);
99
100 return queue;
101 }
102
103 struct http_client_queue *
http_client_queue_get(struct http_client_host * host,const struct http_client_peer_addr * addr)104 http_client_queue_get(struct http_client_host *host,
105 const struct http_client_peer_addr *addr)
106 {
107 struct http_client_queue *queue;
108
109 queue = http_client_queue_find(host, addr);
110 if (queue == NULL)
111 queue = http_client_queue_create(host, addr);
112
113 return queue;
114 }
115
http_client_queue_free(struct http_client_queue * queue)116 void http_client_queue_free(struct http_client_queue *queue)
117 {
118 struct http_client_peer *peer;
119 ARRAY_TYPE(http_client_peer) peers;
120
121 e_debug(queue->event, "Destroy");
122
123 /* Currently only called when peer is freed, so there is no need to
124 unlink from the peer */
125
126 /* Unlink all peers */
127 if (queue->cur_peer != NULL) {
128 struct http_client_peer *peer = queue->cur_peer;
129
130 queue->cur_peer = NULL;
131 http_client_peer_unlink_queue(peer, queue);
132 }
133 t_array_init(&peers, array_count(&queue->pending_peers));
134 array_copy(&peers.arr, 0, &queue->pending_peers.arr, 0,
135 array_count(&queue->pending_peers));
136 array_foreach_elem(&peers, peer)
137 http_client_peer_unlink_queue(peer, queue);
138 array_free(&queue->pending_peers);
139
140 /* Abort all requests */
141 http_client_queue_fail_full(queue, HTTP_CLIENT_REQUEST_ERROR_ABORTED,
142 "Aborted", TRUE);
143 array_free(&queue->requests);
144 array_free(&queue->queued_requests);
145 array_free(&queue->queued_urgent_requests);
146 array_free(&queue->delayed_requests);
147
148 /* Cancel timeouts */
149 timeout_remove(&queue->to_connect);
150 timeout_remove(&queue->to_delayed);
151
152 /* Free */
153 event_unref(&queue->event);
154 i_free(queue->addr_name);
155 i_free(queue->name);
156 i_free(queue);
157 }
158
159 /*
160 * Error handling
161 */
162
163 static void
http_client_queue_fail_full(struct http_client_queue * queue,unsigned int status,const char * error,bool all)164 http_client_queue_fail_full(struct http_client_queue *queue,
165 unsigned int status, const char *error, bool all)
166 {
167 ARRAY_TYPE(http_client_request) *req_arr, treqs;
168 struct http_client_request *req;
169 unsigned int retained = 0;
170
171 /* Abort requests */
172 req_arr = &queue->requests;
173 t_array_init(&treqs, array_count(req_arr));
174 array_copy(&treqs.arr, 0, &req_arr->arr, 0, array_count(req_arr));
175 array_foreach_elem(&treqs, req) {
176 i_assert(req->state >= HTTP_REQUEST_STATE_QUEUED);
177 if (!all &&
178 req->state != HTTP_REQUEST_STATE_QUEUED)
179 retained++;
180 else
181 http_client_request_error(&req, status, error);
182 }
183
184 /* All queues should be empty now... unless new requests were submitted
185 from the callback. this invariant captures it all: */
186 i_assert((retained +
187 array_count(&queue->delayed_requests) +
188 array_count(&queue->queued_requests) +
189 array_count(&queue->queued_urgent_requests)) ==
190 array_count(&queue->requests));
191 }
192
193 static void
http_client_queue_fail(struct http_client_queue * queue,unsigned int status,const char * error)194 http_client_queue_fail(struct http_client_queue *queue,
195 unsigned int status, const char *error)
196 {
197 http_client_queue_fail_full(queue, status, error, FALSE);
198 }
199
200 /*
201 * Connection management
202 */
203
204 static bool
http_client_queue_is_last_connect_ip(struct http_client_queue * queue)205 http_client_queue_is_last_connect_ip(struct http_client_queue *queue)
206 {
207 const struct http_client_settings *set =
208 &queue->client->set;
209 struct http_client_host *host = queue->host;
210 unsigned int ips_count = http_client_host_get_ips_count(host);
211
212 i_assert(queue->addr.type != HTTP_CLIENT_PEER_ADDR_UNIX);
213 i_assert(queue->ips_connect_idx < ips_count);
214 i_assert(queue->ips_connect_start_idx < ips_count);
215
216 /* If a maximum connect attempts > 1 is set, enforce it directly */
217 if (set->max_connect_attempts > 1 &&
218 queue->connect_attempts >= set->max_connect_attempts)
219 return TRUE;
220
221 /* Otherwise, we'll always go through all the IPs. we don't necessarily
222 start connecting from the first IP, so we'll need to treat the IPs as
223 a ring buffer where we automatically wrap back to the first IP
224 when necessary. */
225 return ((queue->ips_connect_idx + 1) % ips_count ==
226 queue->ips_connect_start_idx);
227 }
228
229 static void
http_client_queue_recover_from_lookup(struct http_client_queue * queue)230 http_client_queue_recover_from_lookup(struct http_client_queue *queue)
231 {
232 struct http_client_host *host = queue->host;
233 unsigned int ip_idx;
234
235 i_assert(queue->addr.type != HTTP_CLIENT_PEER_ADDR_UNIX);
236
237 if (queue->cur_peer == NULL) {
238 queue->ips_connect_idx = queue->ips_connect_start_idx = 0;
239 return;
240 }
241
242 if (http_client_host_get_ip_idx(
243 host, &queue->cur_peer->shared->addr.a.tcp.ip, &ip_idx)) {
244 /* Continue with current peer */
245 queue->ips_connect_idx = queue->ips_connect_start_idx = ip_idx;
246 } else {
247 /* Reset connect attempts */
248 queue->ips_connect_idx = queue->ips_connect_start_idx = 0;
249 }
250 }
251
252 static void
http_client_queue_soft_connect_timeout(struct http_client_queue * queue)253 http_client_queue_soft_connect_timeout(struct http_client_queue *queue)
254 {
255 struct http_client_host *host = queue->host;
256 const struct http_client_peer_addr *addr = &queue->addr;
257 unsigned int ips_count = http_client_host_get_ips_count(host);
258 const char *https_name;
259
260 i_assert(queue->addr.type != HTTP_CLIENT_PEER_ADDR_UNIX);
261
262 timeout_remove(&queue->to_connect);
263
264 if (http_client_queue_is_last_connect_ip(queue)) {
265 /* No more IPs to try */
266 return;
267 }
268
269 /* If our our previous connection attempt takes longer than the
270 soft_connect_timeout, we start a connection attempt to the next IP in
271 parallel */
272 https_name = http_client_peer_addr_get_https_name(addr);
273 e_debug(queue->event, "Connection to %s%s is taking a long time; "
274 "starting parallel connection attempt to next IP",
275 http_client_peer_addr2str(addr),
276 (https_name == NULL ?
277 "" : t_strdup_printf(" (SSL=%s)", https_name)));
278
279 /* Next IP */
280 queue->ips_connect_idx = (queue->ips_connect_idx + 1) % ips_count;
281
282 /* Setup connection to new peer (can start new soft timeout) */
283 http_client_queue_connection_setup(queue);
284 }
285
286 static struct http_client_peer *
http_client_queue_connection_attempt(struct http_client_queue * queue)287 http_client_queue_connection_attempt(struct http_client_queue *queue)
288 {
289 struct http_client *client = queue->client;
290 struct http_client_host *host = queue->host;
291 struct http_client_peer *peer;
292 struct http_client_peer_addr *addr = &queue->addr;
293 unsigned int num_requests =
294 array_count(&queue->queued_requests) +
295 array_count(&queue->queued_urgent_requests);
296 const char *ssl = "";
297 int ret;
298
299 if (num_requests == 0)
300 return NULL;
301
302 /* Check whether host IPs are still up-to-date */
303 ret = http_client_host_refresh(host);
304 if (ret < 0) {
305 /* Performing asynchronous lookup */
306 timeout_remove(&queue->to_connect);
307 return NULL;
308 }
309 if (ret > 0) {
310 /* New lookup performed */
311 http_client_queue_recover_from_lookup(queue);
312 }
313
314 /* Update our peer address */
315 if (queue->addr.type != HTTP_CLIENT_PEER_ADDR_UNIX) {
316 const struct ip_addr *ip =
317 http_client_host_get_ip(host, queue->ips_connect_idx);
318
319 queue->addr.a.tcp.ip = *ip;
320 ssl = http_client_peer_addr_get_https_name(addr);
321 ssl = (ssl == NULL ? "" : t_strdup_printf(" (SSL=%s)", ssl));
322 }
323
324 /* Already got a peer? */
325 peer = NULL;
326 if (queue->cur_peer != NULL) {
327 i_assert(array_count(&queue->pending_peers) == 0);
328
329 /* Is it still the one we want? */
330 if (http_client_peer_addr_cmp(
331 addr, &queue->cur_peer->shared->addr) == 0) {
332 /* Is it still connected? */
333 if (http_client_peer_is_connected(queue->cur_peer)) {
334 /* Yes */
335 e_debug(queue->event,
336 "Using existing connection to %s%s "
337 "(%u requests pending)",
338 http_client_peer_addr2str(addr),
339 ssl, num_requests);
340
341 /* Handle requests; */
342 http_client_peer_trigger_request_handler(
343 queue->cur_peer);
344 return queue->cur_peer;
345 }
346 /* No */
347 peer = queue->cur_peer;
348 } else {
349 /* Peer is not relevant to this queue anymore */
350 http_client_peer_unlink_queue(queue->cur_peer, queue);
351 }
352
353 queue->cur_peer = NULL;
354 }
355
356 if (peer == NULL)
357 peer = http_client_peer_get(queue->client, addr);
358
359 e_debug(queue->event,
360 "Setting up connection to %s%s (%u requests pending)",
361 http_client_peer_addr2str(addr), ssl, num_requests);
362
363 /* Create provisional link between queue and peer */
364 http_client_peer_link_queue(peer, queue);
365
366 /* Handle requests; creates new connections when needed/possible */
367 http_client_peer_trigger_request_handler(peer);
368
369 if (http_client_peer_is_connected(peer)) {
370 /* Drop any pending peers */
371 if (array_count(&queue->pending_peers) > 0) {
372 struct http_client_peer *pending_peer;
373
374 array_foreach_elem(&queue->pending_peers, pending_peer) {
375 if (pending_peer == peer) {
376 /* This can happen with shared clients
377 */
378 continue;
379 }
380 i_assert(http_client_peer_addr_cmp(
381 &pending_peer->shared->addr, addr) != 0);
382 http_client_peer_unlink_queue(pending_peer, queue);
383 }
384 array_clear(&queue->pending_peers);
385 }
386 queue->cur_peer = peer;
387
388 http_client_peer_trigger_request_handler(queue->cur_peer);
389
390 } else {
391 struct http_client_peer *pending_peer;
392 unsigned int msecs;
393 bool new_peer = TRUE;
394
395 /* Not already connected, wait for connections */
396
397 /* We may be waiting for this peer already */
398 array_foreach_elem(&queue->pending_peers, pending_peer) {
399 if (http_client_peer_addr_cmp(
400 &pending_peer->shared->addr, addr) == 0) {
401 i_assert(pending_peer == peer);
402 new_peer = FALSE;
403 break;
404 }
405 }
406 if (new_peer) {
407 e_debug(queue->event, "Started new connection to %s%s",
408 http_client_peer_addr2str(addr), ssl);
409
410 array_push_back(&queue->pending_peers, &peer);
411 if (queue->connect_attempts++ == 0)
412 queue->first_connect_time = ioloop_timeval;
413 }
414
415 /* Start soft connect time-out
416 (but only if we have another IP left) */
417 if (queue->addr.type != HTTP_CLIENT_PEER_ADDR_UNIX) {
418 msecs = client->set.soft_connect_timeout_msecs;
419 if (!http_client_queue_is_last_connect_ip(queue) &&
420 msecs > 0 && queue->to_connect == NULL) {
421 queue->to_connect = timeout_add_to(
422 client->ioloop, msecs,
423 http_client_queue_soft_connect_timeout,
424 queue);
425 }
426 }
427 }
428
429 return peer;
430 }
431
http_client_queue_connection_setup(struct http_client_queue * queue)432 void http_client_queue_connection_setup(struct http_client_queue *queue)
433 {
434 (void)http_client_queue_connection_attempt(queue);
435 }
436
437 unsigned int
http_client_queue_host_lookup_done(struct http_client_queue * queue)438 http_client_queue_host_lookup_done(struct http_client_queue *queue)
439 {
440 unsigned int reqs_pending =
441 http_client_queue_requests_pending(queue, NULL);
442
443 http_client_queue_recover_from_lookup(queue);
444 if (reqs_pending > 0)
445 http_client_queue_connection_setup(queue);
446 return reqs_pending;
447 }
448
http_client_queue_host_lookup_failure(struct http_client_queue * queue,const char * error)449 void http_client_queue_host_lookup_failure(
450 struct http_client_queue *queue, const char *error)
451 {
452 http_client_queue_fail(
453 queue, HTTP_CLIENT_REQUEST_ERROR_HOST_LOOKUP_FAILED, error);
454 }
455
http_client_queue_connection_success(struct http_client_queue * queue,struct http_client_peer * peer)456 void http_client_queue_connection_success(struct http_client_queue *queue,
457 struct http_client_peer *peer)
458 {
459 const struct http_client_peer_addr *addr = &peer->shared->addr;
460 struct http_client_host *host = queue->host;
461
462 if (http_client_host_ready(host) &&
463 queue->addr.type != HTTP_CLIENT_PEER_ADDR_UNIX) {
464 /* We achieved at least one connection the the addr->ip */
465 if (!http_client_host_get_ip_idx(
466 host, &addr->a.tcp.ip, &queue->ips_connect_start_idx)) {
467 /* list of IPs changed during connect */
468 queue->ips_connect_start_idx = 0;
469 }
470 }
471
472 /* Reset attempt counter */
473 queue->connect_attempts = 0;
474
475 /* stop soft connect time-out */
476 timeout_remove(&queue->to_connect);
477
478 /* Drop all other attempts to the hport. note that we get here whenever
479 a connection is successfully created, so pending_peers array
480 may be empty. */
481 if (array_count(&queue->pending_peers) > 0) {
482 struct http_client_peer *pending_peer;
483
484 array_foreach_elem(&queue->pending_peers, pending_peer) {
485 if (pending_peer == peer) {
486 /* Don't drop any connections to the
487 successfully connected peer, even if some of
488 the connections are pending. they may be
489 intended for urgent requests. */
490 i_assert(queue->cur_peer == NULL);
491 queue->cur_peer = pending_peer;
492 continue;
493 }
494 /* Unlink this queue from the peer; if this was the
495 last/only queue, the peer will be freed, closing all
496 connections.
497 */
498 http_client_peer_unlink_queue(pending_peer, queue);
499 }
500
501 array_clear(&queue->pending_peers);
502 i_assert(queue->cur_peer != NULL);
503 }
504 }
505
http_client_queue_connection_failure(struct http_client_queue * queue,struct http_client_peer * peer,const char * reason)506 void http_client_queue_connection_failure(struct http_client_queue *queue,
507 struct http_client_peer *peer,
508 const char *reason)
509 {
510 const struct http_client_settings *set =
511 &queue->client->set;
512 const struct http_client_peer_addr *addr = &peer->shared->addr;
513 const char *https_name = http_client_peer_addr_get_https_name(addr);
514 struct http_client_host *host = queue->host;
515 unsigned int ips_count = http_client_host_get_ips_count(host);
516 struct http_client_peer *const *peer_idx;
517 unsigned int num_requests =
518 array_count(&queue->queued_requests) +
519 array_count(&queue->queued_urgent_requests);
520
521 e_debug(queue->event,
522 "Failed to set up connection to %s%s: %s "
523 "(%u peers pending, %u requests pending)",
524 http_client_peer_addr2str(addr),
525 (https_name == NULL ?
526 "" : t_strdup_printf(" (SSL=%s)", https_name)),
527 reason, array_count(&queue->pending_peers), num_requests);
528
529 http_client_peer_unlink_queue(peer, queue);
530
531 if (array_count(&queue->pending_peers) == 0) {
532 i_assert(queue->cur_peer == NULL || queue->cur_peer == peer);
533 queue->cur_peer = NULL;
534 } else {
535 bool found = FALSE;
536
537 i_assert(queue->cur_peer == NULL);
538
539 /* We're still doing the initial connections to this hport. if
540 we're also doing parallel connections with soft timeouts
541 (pending_peer_count>1), wait for them to finish first. */
542 array_foreach(&queue->pending_peers, peer_idx) {
543 if (*peer_idx == peer) {
544 array_delete(&queue->pending_peers,
545 array_foreach_idx(
546 &queue->pending_peers,
547 peer_idx), 1);
548 found = TRUE;
549 break;
550 }
551 }
552 i_assert(found);
553 if (array_count(&queue->pending_peers) > 0) {
554 e_debug(queue->event,
555 "Waiting for remaining pending peers.");
556 return;
557 }
558
559 /* One of the connections failed. if we're not using soft
560 timeouts, we need to try to connect to the next IP. if we are
561 using soft timeouts, we've already tried all of the IPs by
562 now. */
563 timeout_remove(&queue->to_connect);
564
565 if (queue->addr.type == HTTP_CLIENT_PEER_ADDR_UNIX) {
566 http_client_queue_fail(
567 queue, HTTP_CLIENT_REQUEST_ERROR_CONNECT_FAILED,
568 reason);
569 return;
570 }
571 }
572
573 if (http_client_queue_is_last_connect_ip(queue)) {
574 if (array_count(&queue->pending_peers) > 0) {
575 /* Other connection attempts still pending */
576 return;
577 }
578
579 /* All IPs failed up until here and we allow no more connect
580 attempts, but try the next ones on the next request. */
581 queue->ips_connect_idx = queue->ips_connect_start_idx =
582 (queue->ips_connect_idx + 1) % ips_count;
583
584 if (set->max_connect_attempts == 0 ||
585 queue->connect_attempts >= set->max_connect_attempts) {
586
587 e_debug(queue->event,
588 "Failed to set up any connection; "
589 "failing all queued requests");
590 if (queue->connect_attempts > 1) {
591 unsigned int total_msecs =
592 timeval_diff_msecs(&ioloop_timeval,
593 &queue->first_connect_time);
594 reason = t_strdup_printf(
595 "%s (%u attempts in %u.%03u secs)",
596 reason, queue->connect_attempts,
597 total_msecs/1000, total_msecs%1000);
598 }
599 queue->connect_attempts = 0;
600 http_client_queue_fail(
601 queue, HTTP_CLIENT_REQUEST_ERROR_CONNECT_FAILED,
602 reason);
603 return;
604 }
605 } else {
606 queue->ips_connect_idx =
607 (queue->ips_connect_idx + 1) % ips_count;
608 }
609
610 if (http_client_queue_connection_attempt(queue) != peer)
611 http_client_peer_unlink_queue(peer, queue);
612 return;
613 }
614
http_client_queue_peer_disconnected(struct http_client_queue * queue,struct http_client_peer * peer)615 void http_client_queue_peer_disconnected(struct http_client_queue *queue,
616 struct http_client_peer *peer)
617 {
618 struct http_client_peer *const *peer_idx;
619
620 if (queue->cur_peer == peer) {
621 queue->cur_peer = NULL;
622 return;
623 }
624
625 array_foreach(&queue->pending_peers, peer_idx) {
626 if (*peer_idx == peer) {
627 array_delete(&queue->pending_peers,
628 array_foreach_idx(&queue->pending_peers,
629 peer_idx), 1);
630 break;
631 }
632 }
633 }
634
635 /*
636 * Main request queue
637 */
638
http_client_queue_drop_request(struct http_client_queue * queue,struct http_client_request * req)639 void http_client_queue_drop_request(struct http_client_queue *queue,
640 struct http_client_request *req)
641 {
642 struct http_client_request **reqs;
643 unsigned int count, i;
644
645 e_debug(queue->event,
646 "Dropping request %s", http_client_request_label(req));
647
648 /* Drop from queue */
649 if (req->urgent) {
650 reqs = array_get_modifiable(&queue->queued_urgent_requests,
651 &count);
652 for (i = 0; i < count; i++) {
653 if (reqs[i] == req) {
654 array_delete(&queue->queued_urgent_requests,
655 i, 1);
656 break;
657 }
658 }
659 } else {
660 reqs = array_get_modifiable(&queue->queued_requests, &count);
661 for (i = 0; i < count; i++) {
662 if (reqs[i] == req) {
663 array_delete(&queue->queued_requests, i, 1);
664 break;
665 }
666 }
667 }
668
669 /* Drop from delay queue */
670 if (req->release_time.tv_sec > 0) {
671 reqs = array_get_modifiable(&queue->delayed_requests, &count);
672 for (i = 0; i < count; i++) {
673 if (reqs[i] == req)
674 break;
675 }
676 if (i < count) {
677 if (i == 0) {
678 if (queue->to_delayed != NULL) {
679 timeout_remove(&queue->to_delayed);
680 if (count > 1) {
681 i_assert(reqs[1]->release_time.tv_sec > 0);
682 http_client_queue_set_delay_timer(
683 queue, reqs[1]->release_time);
684 }
685 }
686 }
687 array_delete(&queue->delayed_requests, i, 1);
688 }
689 }
690
691 /* Drop from main request list */
692 reqs = array_get_modifiable(&queue->requests, &count);
693 for (i = 0; i < count; i++) {
694 if (reqs[i] == req)
695 break;
696 }
697 i_assert(i < count);
698
699 if (i == 0) {
700 if (queue->to_request != NULL) {
701 timeout_remove(&queue->to_request);
702 if (count > 1 && reqs[1]->timeout_time.tv_sec > 0) {
703 http_client_queue_set_request_timer(queue,
704 &reqs[1]->timeout_time);
705 }
706 }
707 }
708 req->queue = NULL;
709 array_delete(&queue->requests, i, 1);
710
711 if (array_count(&queue->requests) == 0)
712 http_client_host_check_idle(queue->host);
713 return;
714 }
715
http_client_queue_request_timeout(struct http_client_queue * queue)716 static void http_client_queue_request_timeout(struct http_client_queue *queue)
717 {
718 struct http_client_request *const *reqs;
719 ARRAY_TYPE(http_client_request) failed_requests;
720 struct timeval new_to = { 0, 0 };
721 string_t *str;
722 size_t prefix_size;
723 unsigned int count, i;
724
725 e_debug(queue->event, "Timeout (now: %s.%03lu)",
726 t_strflocaltime("%Y-%m-%d %H:%M:%S", ioloop_timeval.tv_sec),
727 ((unsigned long)ioloop_timeval.tv_usec) / 1000);
728
729 timeout_remove(&queue->to_request);
730
731 /* Collect failed requests */
732 reqs = array_get(&queue->requests, &count);
733 i_assert(count > 0);
734 t_array_init(&failed_requests, count);
735 for (i = 0; i < count; i++) {
736 if (reqs[i]->timeout_time.tv_sec > 0 &&
737 timeval_cmp_margin(&reqs[i]->timeout_time,
738 &ioloop_timeval,
739 TIMEOUT_CMP_MARGIN_USECS) > 0) {
740 break;
741 }
742 array_push_back(&failed_requests, &reqs[i]);
743 }
744
745 /* Update timeout */
746 if (i < count)
747 new_to = reqs[i]->timeout_time;
748
749 str = t_str_new(64);
750 str_append(str, "Request ");
751 prefix_size = str_len(str);
752
753 /* Abort all failed request */
754 reqs = array_get(&failed_requests, &count);
755 i_assert(count > 0); /* At least one request timed out */
756 for (i = 0; i < count; i++) {
757 struct http_client_request *req = reqs[i];
758
759 str_truncate(str, prefix_size);
760 http_client_request_append_stats_text(req, str);
761
762 e_debug(queue->event,
763 "Absolute timeout expired for request %s (%s)",
764 http_client_request_label(req), str_c(str));
765 http_client_request_error(
766 &req, HTTP_CLIENT_REQUEST_ERROR_TIMED_OUT,
767 t_strdup_printf(
768 "Absolute request timeout expired (%s)",
769 str_c(str)));
770 }
771
772 if (new_to.tv_sec > 0) {
773 e_debug(queue->event, "New timeout");
774 http_client_queue_set_request_timer(queue, &new_to);
775 }
776 }
777
778 static void
http_client_queue_set_request_timer(struct http_client_queue * queue,const struct timeval * time)779 http_client_queue_set_request_timer(struct http_client_queue *queue,
780 const struct timeval *time)
781 {
782 i_assert(time->tv_sec > 0);
783 timeout_remove(&queue->to_request);
784
785 e_debug(queue->event,
786 "Set request timeout to %s.%03lu (now: %s.%03lu)",
787 t_strflocaltime("%Y-%m-%d %H:%M:%S", time->tv_sec),
788 ((unsigned long)time->tv_usec) / 1000,
789 t_strflocaltime("%Y-%m-%d %H:%M:%S", ioloop_timeval.tv_sec),
790 ((unsigned long)ioloop_timeval.tv_usec) / 1000);
791
792 /* Set timer */
793 queue->to_request = timeout_add_absolute_to(
794 queue->client->ioloop, time,
795 http_client_queue_request_timeout, queue);
796 }
797
798 static int
http_client_queue_request_timeout_cmp(struct http_client_request * const * req1,struct http_client_request * const * req2)799 http_client_queue_request_timeout_cmp(struct http_client_request *const *req1,
800 struct http_client_request *const *req2)
801 {
802 int ret;
803
804 /* 0 means no timeout */
805 if ((*req1)->timeout_time.tv_sec == 0) {
806 if ((*req2)->timeout_time.tv_sec == 0) {
807 /* sort by age */
808 ret = timeval_cmp(&(*req1)->submit_time,
809 &(*req2)->submit_time);
810 if (ret != 0)
811 return ret;
812 } else {
813 return 1;
814 }
815 } else if ((*req2)->timeout_time.tv_sec == 0) {
816 return -1;
817
818 /* Sort by timeout */
819 } else if ((ret = timeval_cmp(&(*req1)->timeout_time,
820 &(*req2)->timeout_time)) != 0) {
821 return ret;
822 }
823
824 /* Sort by minimum attempts for fairness */
825 return ((int)(*req2)->attempts - (int)(*req1)->attempts);
826 }
827
828 static void
http_client_queue_submit_now(struct http_client_queue * queue,struct http_client_request * req)829 http_client_queue_submit_now(struct http_client_queue *queue,
830 struct http_client_request *req)
831 {
832 ARRAY_TYPE(http_client_request) *req_queue;
833
834 req->release_time.tv_sec = 0;
835 req->release_time.tv_usec = 0;
836
837 if (req->urgent)
838 req_queue = &queue->queued_urgent_requests;
839 else
840 req_queue = &queue->queued_requests;
841
842 /* Enqueue */
843 if (req->timeout_time.tv_sec == 0) {
844 /* No timeout; enqueue at end */
845 array_push_back(req_queue, &req);
846 } else if (timeval_diff_msecs(&req->timeout_time,
847 &ioloop_timeval) <= 1) {
848 /* Pretty much already timed out; don't bother */
849 return;
850 } else {
851 unsigned int insert_idx;
852
853 /* Keep transmission queue sorted earliest timeout first */
854 (void)array_bsearch_insert_pos(
855 req_queue, &req,
856 http_client_queue_request_timeout_cmp, &insert_idx);
857 array_insert(req_queue, insert_idx, &req, 1);
858 }
859
860 http_client_queue_connection_setup(queue);
861 }
862
863 /*
864 * Delayed request queue
865 */
866
867 static void
http_client_queue_delay_timeout(struct http_client_queue * queue)868 http_client_queue_delay_timeout(struct http_client_queue *queue)
869 {
870 struct http_client_request *const *reqs;
871 unsigned int count, i, finished;
872
873 timeout_remove(&queue->to_delayed);
874 io_loop_time_refresh();
875
876 finished = 0;
877 reqs = array_get(&queue->delayed_requests, &count);
878 for (i = 0; i < count; i++) {
879 if (timeval_cmp_margin(&reqs[i]->release_time,
880 &ioloop_timeval,
881 TIMEOUT_CMP_MARGIN_USECS) > 0) {
882 break;
883 }
884
885 e_debug(queue->event, "Activated delayed request %s%s",
886 http_client_request_label(reqs[i]),
887 (reqs[i]->urgent ? " (urgent)" : ""));
888 http_client_queue_submit_now(queue, reqs[i]);
889 finished++;
890 }
891 if (i < count)
892 http_client_queue_set_delay_timer(queue, reqs[i]->release_time);
893 array_delete(&queue->delayed_requests, 0, finished);
894 }
895
896 static void
http_client_queue_set_delay_timer(struct http_client_queue * queue,struct timeval time)897 http_client_queue_set_delay_timer(struct http_client_queue *queue,
898 struct timeval time)
899 {
900 struct http_client *client = queue->client;
901 int usecs = timeval_diff_usecs(&time, &ioloop_timeval);
902 int msecs;
903
904 /* Round up to nearest microsecond */
905 msecs = (usecs + 999) / 1000;
906
907 /* Set timer */
908 timeout_remove(&queue->to_delayed);
909 queue->to_delayed = timeout_add_to(
910 client->ioloop, msecs,
911 http_client_queue_delay_timeout, queue);
912 }
913
914 static int
http_client_queue_delayed_cmp(struct http_client_request * const * req1,struct http_client_request * const * req2)915 http_client_queue_delayed_cmp(struct http_client_request *const *req1,
916 struct http_client_request *const *req2)
917 {
918 return timeval_cmp(&(*req1)->release_time, &(*req2)->release_time);
919 }
920
921 /*
922 * Request submission
923 */
924
http_client_queue_submit_request(struct http_client_queue * queue,struct http_client_request * req)925 void http_client_queue_submit_request(struct http_client_queue *queue,
926 struct http_client_request *req)
927 {
928 unsigned int insert_idx;
929
930 if (req->queue != NULL)
931 http_client_queue_drop_request(req->queue, req);
932 req->queue = queue;
933
934 /* Check delay vs timeout */
935 if (req->release_time.tv_sec > 0 && req->timeout_time.tv_sec > 0 &&
936 timeval_cmp_margin(&req->release_time, &req->timeout_time,
937 TIMEOUT_CMP_MARGIN_USECS) >= 0) {
938 /* Release time is later than absolute timeout */
939 req->release_time.tv_sec = 0;
940 req->release_time.tv_usec = 0;
941
942 /* Timeout rightaway */
943 req->timeout_time = ioloop_timeval;
944
945 e_debug(queue->event,
946 "Delayed request %s%s already timed out",
947 http_client_request_label(req),
948 (req->urgent ? " (urgent)" : ""));
949 }
950
951 /* Add to main request list */
952 if (req->timeout_time.tv_sec == 0) {
953 /* No timeout; just append */
954 array_push_back(&queue->requests, &req);
955 } else {
956 unsigned int insert_idx;
957
958 /* Keep main request list sorted earliest timeout first */
959 (void)array_bsearch_insert_pos(
960 &queue->requests, &req,
961 http_client_queue_request_timeout_cmp, &insert_idx);
962 array_insert(&queue->requests, insert_idx, &req, 1);
963
964 /* Now first in queue; update timer */
965 if (insert_idx == 0) {
966 http_client_queue_set_request_timer(queue,
967 &req->timeout_time);
968 }
969 }
970
971 /* Handle delay */
972 if (req->release_time.tv_sec > 0) {
973 io_loop_time_refresh();
974
975 if (timeval_cmp_margin(&req->release_time, &ioloop_timeval,
976 TIMEOUT_CMP_MARGIN_USECS) > 0) {
977 e_debug(queue->event,
978 "Delayed request %s%s submitted "
979 "(time remaining: %d msecs)",
980 http_client_request_label(req),
981 (req->urgent ? " (urgent)" : ""),
982 timeval_diff_msecs(&req->release_time,
983 &ioloop_timeval));
984
985 (void)array_bsearch_insert_pos(
986 &queue->delayed_requests, &req,
987 http_client_queue_delayed_cmp, &insert_idx);
988 array_insert(&queue->delayed_requests, insert_idx,
989 &req, 1);
990 if (insert_idx == 0) {
991 http_client_queue_set_delay_timer(
992 queue, req->release_time);
993 }
994 return;
995 }
996 }
997
998 http_client_queue_submit_now(queue, req);
999 }
1000
1001 /*
1002 * Request retrieval
1003 */
1004
1005 struct http_client_request *
http_client_queue_claim_request(struct http_client_queue * queue,const struct http_client_peer_addr * addr,bool no_urgent)1006 http_client_queue_claim_request(struct http_client_queue *queue,
1007 const struct http_client_peer_addr *addr,
1008 bool no_urgent)
1009 {
1010 struct http_client_request *const *requests;
1011 struct http_client_request *req;
1012 unsigned int i, count;
1013
1014 count = 0;
1015 if (!no_urgent)
1016 requests = array_get(&queue->queued_urgent_requests, &count);
1017
1018 if (count == 0)
1019 requests = array_get(&queue->queued_requests, &count);
1020 if (count == 0)
1021 return NULL;
1022 i = 0;
1023 req = requests[i];
1024 if (req->urgent)
1025 array_delete(&queue->queued_urgent_requests, i, 1);
1026 else
1027 array_delete(&queue->queued_requests, i, 1);
1028
1029 e_debug(queue->event,
1030 "Connection to peer %s claimed request %s %s",
1031 http_client_peer_addr2str(addr), http_client_request_label(req),
1032 (req->urgent ? "(urgent)" : ""));
1033
1034 return req;
1035 }
1036
1037 unsigned int
http_client_queue_requests_pending(struct http_client_queue * queue,unsigned int * num_urgent_r)1038 http_client_queue_requests_pending(struct http_client_queue *queue,
1039 unsigned int *num_urgent_r)
1040 {
1041 unsigned int urg_count = array_count(&queue->queued_urgent_requests);
1042
1043 if (num_urgent_r != NULL)
1044 *num_urgent_r = urg_count;
1045 return array_count(&queue->queued_requests) + urg_count;
1046 }
1047
http_client_queue_requests_active(struct http_client_queue * queue)1048 unsigned int http_client_queue_requests_active(struct http_client_queue *queue)
1049 {
1050 return array_count(&queue->requests);
1051 }
1052
1053 /*
1054 * Ioloop
1055 */
1056
http_client_queue_switch_ioloop(struct http_client_queue * queue)1057 void http_client_queue_switch_ioloop(struct http_client_queue *queue)
1058 {
1059 if (queue->to_connect != NULL)
1060 queue->to_connect = io_loop_move_timeout(&queue->to_connect);
1061 if (queue->to_request != NULL)
1062 queue->to_request = io_loop_move_timeout(&queue->to_request);
1063 if (queue->to_delayed != NULL)
1064 queue->to_delayed = io_loop_move_timeout(&queue->to_delayed);
1065 }
1066