1 /*
2 * Copyright (C) 2020-2021 Bareos GmbH & Co. KG
3 * Copyright (C) 2010 SCALITY SA. All rights reserved.
4 * http://www.scality.com
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 *
13 * Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in the
15 * documentation and/or other materials provided with the distribution.
16 *
17 * THIS SOFTWARE IS PROVIDED BY SCALITY SA ``AS IS'' AND ANY EXPRESS OR
18 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
20 * DISCLAIMED. IN NO EVENT SHALL SCALITY SA OR CONTRIBUTORS BE LIABLE FOR
21 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
22 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
23 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
24 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
25 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 *
29 * The views and conclusions contained in the software and documentation
30 * are those of the authors and should not be interpreted as representing
31 * official policies, either expressed or implied, of SCALITY SA.
32 *
33 * https://github.com/scality/Droplet
34 */
35 #include "dropletp.h"
36 #include "sys/uio.h"
37
38 /** @file */
39
40 /* #define DPRINTF(fmt,...) fprintf(stderr, fmt, ##__VA_ARGS__) */
41 #define DPRINTF(fmt, ...)
42
43 //#define DEBUG
44
45 dpl_ctx_t* dpl_default_conn_ctx = NULL;
46
conn_hashcode(const unsigned char * data,size_t len)47 static u_int conn_hashcode(const unsigned char* data, size_t len)
48 {
49 const unsigned char* p;
50 u_int h, g;
51 int i = 0;
52
53 h = g = 0;
54
55 for (p = data; i < len; p = p + 1, i++) {
56 h = (h << 4) + (*p);
57 if ((g = h & 0xf0000000)) {
58 h = h ^ (g >> 24);
59 h = h ^ g;
60 }
61 }
62 return h;
63 }
64
dpl_conn_get_nolock(dpl_ctx_t * ctx,struct hostent * host,u_short port)65 static dpl_conn_t* dpl_conn_get_nolock(dpl_ctx_t* ctx,
66 struct hostent* host,
67 u_short port)
68 {
69 u_int bucket;
70 struct dpl_hash_info hash_info;
71 dpl_conn_t* conn;
72
73 memset(&hash_info, 0, sizeof(hash_info));
74
75 memcpy(&hash_info.addr, host->h_addr, host->h_length);
76 hash_info.port = port;
77
78 bucket = conn_hashcode((unsigned char*)&hash_info, sizeof(hash_info))
79 % ctx->n_conn_buckets;
80
81 for (conn = ctx->conn_buckets[bucket]; conn; conn = conn->prev) {
82 if (!memcmp(&conn->hash_info, &hash_info, sizeof(hash_info))) return conn;
83 }
84
85 return NULL;
86 }
87
is_usable(dpl_conn_t * conn)88 static int is_usable(dpl_conn_t* conn)
89 {
90 struct pollfd pfd;
91 int retval;
92
93 memset(&pfd, 0, sizeof(struct pollfd));
94
95 pfd.fd = conn->fd;
96 #ifdef POLLRDHUP
97 pfd.events = POLLIN | POLLPRI | POLLRDHUP;
98 #else
99 pfd.events = POLLIN | POLLPRI;
100 #endif
101
102 retval = poll(&pfd, 1, 0);
103
104 switch (retval) {
105 case 1: {
106 char buf[1];
107 int size;
108
109 if (conn->ctx->use_https)
110 size = SSL_read(conn->ssl, buf, sizeof(buf));
111 else
112 size = recv(conn->fd, buf, sizeof(buf), MSG_DONTWAIT | MSG_PEEK);
113
114 if (size == 0) {
115 DPRINTF("is_usable: rv %d returning False\n", retval);
116 return 0;
117 }
118 }
119 /* fall down */
120 case 0:
121 DPRINTF("is_usable: rv %d returning True\n", retval);
122 return 1;
123 default:
124 DPRINTF("is_usable: rv %d returning False\n", retval);
125 return 0;
126 }
127
128 return 0; /* not reached */
129 }
130
dpl_conn_add_nolock(dpl_conn_t * conn)131 static void dpl_conn_add_nolock(dpl_conn_t* conn)
132 {
133 u_int bucket;
134
135 bucket
136 = conn_hashcode((unsigned char*)&conn->hash_info, sizeof(conn->hash_info))
137 % conn->ctx->n_conn_buckets;
138
139 conn->next = NULL;
140 conn->prev = conn->ctx->conn_buckets[bucket];
141
142 if (NULL != conn->ctx->conn_buckets[bucket])
143 conn->ctx->conn_buckets[bucket]->next = conn;
144
145 conn->ctx->conn_buckets[bucket] = conn;
146 }
147
dpl_conn_remove_nolock(dpl_ctx_t * ctx,dpl_conn_t * conn)148 static void dpl_conn_remove_nolock(dpl_ctx_t* ctx, dpl_conn_t* conn)
149 {
150 u_int bucket;
151
152 bucket
153 = conn_hashcode((unsigned char*)&conn->hash_info, sizeof(conn->hash_info))
154 % ctx->n_conn_buckets;
155
156 if (conn->prev) conn->prev->next = conn->next;
157
158 if (conn->next) conn->next->prev = conn->prev;
159
160 if (ctx->conn_buckets[bucket] == conn) ctx->conn_buckets[bucket] = conn->prev;
161 }
162
safe_close(dpl_ctx_t * ctx,int fd)163 static void safe_close(dpl_ctx_t* ctx, int fd)
164 {
165 int ret;
166
167 DPRINTF("closing fd=%d\n", fd);
168
169 do {
170 ret = close(fd);
171 } while (ret == -1 && errno == EINTR);
172
173 if (ret == -1)
174 DPL_TRACE(ctx, DPL_TRACE_WARN, "close failed: %s", strerror(errno));
175 }
176
dpl_conn_free(dpl_conn_t * conn)177 static void dpl_conn_free(dpl_conn_t* conn)
178 {
179 if (NULL != conn->ssl) {
180 int ssl_ret;
181 char buf[256];
182 unsigned long ssl_err;
183 SSL_set_shutdown(conn->ssl, SSL_SENT_SHUTDOWN | SSL_RECEIVED_SHUTDOWN);
184 ssl_ret = SSL_shutdown(conn->ssl);
185 if (1 == ssl_ret) {
186 DPL_TRACE(conn->ctx, DPL_TRACE_WARN,
187 "SSL shutdown was successfully completed");
188 } else if (0 == ssl_ret) {
189 ssl_err = SSL_get_error(conn->ssl, ssl_ret);
190 ERR_error_string_n(ssl_err, buf, sizeof buf);
191 DPL_TRACE(
192 conn->ctx, DPL_TRACE_WARN,
193 "SSL shutdown is not yet finished, calling for a second time: %s",
194 buf);
195 ssl_ret = SSL_shutdown(conn->ssl);
196 if (1 == ssl_ret) {
197 DPL_TRACE(conn->ctx, DPL_TRACE_WARN,
198 "SSL shutdown was successfully completed");
199 } else {
200 DPL_TRACE(conn->ctx, DPL_TRACE_ERR,
201 "SSL shutdown was not successfully completed");
202 }
203 } else if (0 > ssl_ret) {
204 ssl_err = SSL_get_error(conn->ssl, ssl_ret);
205 ERR_error_string_n(ssl_err, buf, sizeof buf);
206 DPL_TRACE(
207 conn->ctx, DPL_TRACE_WARN,
208 "SSL shutdown was not successful because a fatal error occurred: %s",
209 buf);
210 }
211 SSL_free(conn->ssl);
212 }
213
214 if (-1 != conn->fd) safe_close(conn->ctx, conn->fd);
215
216 if (NULL != conn->read_buf) free(conn->read_buf);
217
218 if (NULL != conn->host) free(conn->host);
219
220 if (NULL != conn->port) free(conn->port);
221
222 free(conn);
223 }
224
dpl_conn_terminate_nolock(dpl_conn_t * conn)225 static void dpl_conn_terminate_nolock(dpl_conn_t* conn)
226 {
227 DPL_TRACE(conn->ctx, DPL_TRACE_CONN, "conn_terminate conn=%p", conn);
228
229 conn->ctx->n_conn_fds--;
230 dpl_conn_free(conn);
231 }
232
do_connect(dpl_ctx_t * ctx,struct hostent * host,u_short port)233 static int do_connect(dpl_ctx_t* ctx, struct hostent* host, u_short port)
234 {
235 int fd = -1, ret, on, error;
236 struct pollfd fds;
237 socklen_t errorlen;
238 char ident[DPL_ADDR_IDENT_STRLEN];
239
240 fd = socket(host->h_addrtype, SOCK_STREAM, 0);
241 if (fd == -1) {
242 DPL_TRACE(ctx, DPL_TRACE_ERR, "socket failed");
243 goto end;
244 }
245
246 on = 1;
247 ret = ioctl(fd, FIONBIO, &on);
248 if (-1 == ret) {
249 DPL_LOG(ctx, DPL_ERROR, "ioctl(FIONBIO) failed: %s", strerror(errno));
250 safe_close(ctx, fd);
251 fd = -1;
252 goto end;
253 }
254
255 dpl_addr_get_ident(host, port, ident, sizeof(ident));
256 DPL_TRACE(ctx, DPL_TRACE_CONN, "connect %s", ident);
257
258 if (host->h_addrtype == AF_INET) {
259 struct sockaddr_in sin;
260
261 sin.sin_family = host->h_addrtype;
262 sin.sin_port = htons(port);
263 memcpy(&sin.sin_addr, host->h_addr, host->h_length);
264
265 ret = connect(fd, (struct sockaddr*)&sin, sizeof(sin));
266 } else {
267 struct sockaddr_in6 sin;
268
269 sin.sin6_family = host->h_addrtype;
270 sin.sin6_port = htons(port);
271 memcpy(&sin.sin6_addr, host->h_addr, host->h_length);
272
273 ret = connect(fd, (struct sockaddr*)&sin, sizeof(sin));
274 }
275
276 if (-1 == ret) {
277 if (EINPROGRESS != errno) {
278 DPL_LOG(ctx, DPL_ERROR, "Connect to server %s failed: %s", ident,
279 strerror(errno));
280 safe_close(ctx, fd);
281 fd = -1;
282 goto end;
283 }
284 }
285
286 retry:
287 memset(&fds, 0, sizeof(fds));
288 fds.fd = fd;
289 fds.events = POLLOUT;
290
291 ret = poll(&fds, 1, ctx->conn_timeout * 1000);
292 if (-1 == ret) {
293 if (errno == EINTR) goto retry;
294 DPL_LOG(ctx, DPL_ERROR, "poll failed: %s", strerror(errno));
295 safe_close(ctx, fd);
296 fd = -1;
297 goto end;
298 }
299
300 if (0 == ret) {
301 DPL_LOG(ctx, DPL_ERROR,
302 "Timed out connecting to server %s after %d seconds", ident,
303 ctx->conn_timeout);
304 safe_close(ctx, fd);
305 fd = -1;
306 goto end;
307 } else if (!(fds.revents & POLLOUT)) {
308 DPL_LOG(ctx, DPL_ERROR, "poll returned strange results");
309 safe_close(ctx, fd);
310 fd = -1;
311 goto end;
312 }
313
314 on = 0;
315 ret = ioctl(fd, FIONBIO, &on);
316 if (-1 == ret) {
317 DPL_LOG(ctx, DPL_ERROR, "ioctl(FIONBIO) failed: %s", strerror(errno));
318 safe_close(ctx, fd);
319 fd = -1;
320 goto end;
321 }
322
323 /* errors from the async connect() are reported through the SO_ERROR sockopt
324 */
325
326 errorlen = sizeof(error);
327 error = 0;
328 ret = getsockopt(fd, SOL_SOCKET, SO_ERROR, &error, &errorlen);
329 if (-1 == ret) {
330 DPL_LOG(ctx, DPL_ERROR, "getsockopt(SO_ERROR) failed: %s", strerror(errno));
331 safe_close(ctx, fd);
332 fd = -1;
333 goto end;
334 }
335
336 if (error != 0) {
337 DPL_LOG(ctx, DPL_ERROR, "Connect to server %s failed: %s", ident,
338 strerror(error));
339 safe_close(ctx, fd);
340 fd = -1;
341 goto end;
342 }
343
344 end:
345
346 DPL_TRACE(ctx, DPL_TRACE_CONN, "connect fd=%d", fd);
347
348 return fd;
349 }
350
init_ssl_conn(dpl_ctx_t * ctx,dpl_conn_t * conn)351 static int init_ssl_conn(dpl_ctx_t* ctx, dpl_conn_t* conn)
352 {
353 int ret;
354
355 conn->ssl = SSL_new(ctx->ssl_ctx);
356 if (conn->ssl == NULL) return 0;
357
358 conn->bio = BIO_new_socket(conn->fd, BIO_NOCLOSE);
359 if (conn->bio == NULL) return 0;
360
361 SSL_set_bio(conn->ssl, conn->bio, conn->bio);
362
363 ret = SSL_connect(conn->ssl);
364 if (ret <= 0) {
365 int ret_ssl = 0;
366
367 int ssl_err = SSL_get_error(conn->ssl, ret);
368 char buf[256];
369 ERR_error_string_n(ssl_err, buf, sizeof(buf));
370
371 DPL_SSL_PERROR(ctx, "SSL_connect");
372 DPL_LOG(ctx, DPL_ERROR, "SSL connect error: %d (%s)", ret, buf);
373
374 ret_ssl = SSL_get_verify_result(conn->ssl);
375 DPL_LOG(ctx, DPL_ERROR, "SSL certificate verification status: %ld: %s",
376 ret_ssl, X509_verify_cert_error_string(ret_ssl));
377 return 0;
378 }
379 if (0 == ctx->cert_verif) {
380 long ret_ssl = 0;
381 ret_ssl = SSL_get_verify_result(conn->ssl);
382 DPL_TRACE(ctx, DPL_TRACE_SSL,
383 "SSL certificate verification status: %ld: %s", ret_ssl,
384 X509_verify_cert_error_string(ret_ssl));
385 }
386 DPL_TRACE(ctx, DPL_TRACE_SSL, "SSL cipher used: %s",
387 SSL_get_cipher(conn->ssl));
388
389 return 1;
390 }
391
392 /*
393 * check an existing connection bound on (addr,port). if none is found
394 * a new connection is created.
395 */
396
conn_open(dpl_ctx_t * ctx,struct hostent * host,u_short port)397 static dpl_conn_t* conn_open(dpl_ctx_t* ctx, struct hostent* host, u_short port)
398 {
399 dpl_conn_t* conn = NULL;
400 time_t now = time(0);
401 char ident[DPL_ADDR_IDENT_STRLEN];
402
403 dpl_ctx_lock(ctx);
404
405 dpl_addr_get_ident(host, port, ident, sizeof(ident));
406 DPL_TRACE(ctx, DPL_TRACE_CONN, "conn_open %s", ident);
407
408 again:
409
410 conn = dpl_conn_get_nolock(ctx, host, port);
411
412 if (NULL != conn) {
413 if (0 == is_usable(conn)) {
414 dpl_conn_remove_nolock(ctx, conn);
415 dpl_conn_terminate_nolock(conn);
416 goto again;
417 }
418
419 dpl_conn_remove_nolock(ctx, conn);
420 if (conn->n_hits >= ctx->n_conn_max_hits
421 || (now - conn->close_time) >= ctx->conn_idle_time) {
422 DPRINTF("auto-close\n");
423 dpl_conn_terminate_nolock(conn);
424 conn = NULL;
425 } else {
426 // OK reuse
427 conn->n_hits++;
428 goto end;
429 }
430 }
431
432 if (ctx->n_conn_fds >= ctx->n_conn_max) {
433 DPL_TRACE(ctx, DPL_TRACE_ERR, "reaching limit %d", ctx->n_conn_fds);
434 conn = NULL;
435 goto end;
436 }
437
438 conn = malloc(sizeof(*conn));
439 if (NULL == conn) {
440 DPL_TRACE(ctx, DPL_TRACE_ERR, "malloc failed");
441 conn = NULL;
442 goto end;
443 }
444
445 DPL_TRACE(ctx, DPL_TRACE_CONN, "new_conn %s %p", ident, conn);
446
447 memset(conn, 0, sizeof(*conn));
448
449 conn->type = DPL_CONN_TYPE_HTTP;
450 conn->ctx = ctx;
451 conn->read_buf_size = ctx->read_buf_size;
452 conn->fd = -1;
453
454 if ((conn->read_buf = malloc(conn->read_buf_size)) == NULL) {
455 dpl_conn_free(conn);
456 conn = NULL;
457 goto end;
458 }
459
460 memcpy(&conn->hash_info.addr, host->h_addr_list[0], host->h_length);
461 conn->hash_info.port = port;
462
463 conn->fd = do_connect(ctx, host, port);
464 if (-1 == conn->fd) {
465 dpl_conn_free(conn);
466 conn = NULL;
467 goto end;
468 }
469
470 conn->start_time = now;
471 conn->n_hits = 0;
472
473 if (ctx->use_https) {
474 if (!init_ssl_conn(ctx, conn)) {
475 dpl_conn_free(conn);
476 conn = NULL;
477 goto end;
478 }
479 }
480
481 ctx->n_conn_fds++;
482
483 end:
484
485 dpl_ctx_unlock(ctx);
486
487 DPL_TRACE(ctx, DPL_TRACE_CONN, "conn_open conn=%p", conn);
488
489 return conn;
490 }
491
dpl_conn_open_host(dpl_ctx_t * ctx,int af,const char * host,const char * portstr)492 dpl_conn_t* dpl_conn_open_host(dpl_ctx_t* ctx,
493 int af,
494 const char* host,
495 const char* portstr)
496 {
497 int ret2;
498 struct hostent hret, *hresult;
499 char hbuf[1024];
500 int herr = 0;
501 u_short port;
502 dpl_conn_t* conn = NULL;
503 char* nstr;
504
505 ret2 = dpl_gethostbyname2_r(host, af, &hret, hbuf, sizeof(hbuf), &hresult,
506 &herr);
507 if (0 != ret2 || hresult == NULL) {
508 DPL_LOG(ctx, DPL_ERROR, "Failed to lookup hostname \"%s\": %s", host,
509 hstrerror(herr));
510 goto bad;
511 }
512
513 port = atoi(portstr);
514 conn = conn_open(ctx, hresult, port);
515 if (NULL == conn) {
516 DPL_TRACE(ctx, DPL_TRACE_ERR, "connect failed");
517 goto bad;
518 }
519
520 nstr = strdup(host);
521 if (NULL == nstr) goto bad;
522
523 if (NULL != conn->host) free(conn->host);
524
525 conn->host = nstr;
526
527 nstr = strdup(portstr);
528 if (NULL == nstr) goto bad;
529
530 if (NULL != conn->port) free(conn->port);
531
532 conn->port = nstr;
533
534 return conn;
535
536 bad:
537
538 if (NULL != conn) dpl_conn_release(conn);
539
540 return NULL;
541 }
542
dpl_blacklist_host(dpl_ctx_t * ctx,const char * host,const char * portstr)543 void dpl_blacklist_host(dpl_ctx_t* ctx, const char* host, const char* portstr)
544 {
545 DPL_TRACE(ctx, DPL_TRACE_CONN, "blacklisting %s:%s", host, portstr);
546
547 (void)dpl_addrlist_blacklist(ctx->addrlist, host, portstr,
548 ctx->blacklist_expiretime);
549 }
550
551 /**
552 * Get a connection from the context.
553 *
554 * Creates or re-uses a connection suitable for use with the request
555 * `req`. The calling thread is guaranteed exclusive use of the
556 * connection. If a recently released connection is suitable, it will be
557 * returned.
558 *
559 * If multiple hosts are specified in the `host` variable in
560 * the profile, connections will be distributed between those hosts in
561 * a round-robin manner. Any failure while connecting will cause the
562 * failing host to be blacklisted and the connection retried with
563 * another host; if no hosts remain, `DPL_FAILURE` is returned.
564 *
565 * On success, a pointer to a connection is returned in `*connp`. You
566 * should release the connection by calling either `dpl_conn_release()` or
567 * `dpl_conn_terminate()`. On error the value in `*connp` is unchanged.
568 *
569 * @param ctx the context from which to create a connection
570 * @param req the request for which this connection will be used
571 * @param[out] connp used to return the new connection
572 * @retval DPL_SUCCESS on success, or
573 * @return a Droplet error code on failure
574 */
dpl_try_connect(dpl_ctx_t * ctx,dpl_req_t * req,dpl_conn_t ** connp)575 dpl_status_t dpl_try_connect(dpl_ctx_t* ctx, dpl_req_t* req, dpl_conn_t** connp)
576 {
577 int cur_host;
578 dpl_addr_t* addr;
579 dpl_conn_t* conn = NULL;
580 dpl_status_t ret, ret2;
581 char virtual_host[1024], *hostp = NULL;
582
583 retry:
584 pthread_mutex_lock(&ctx->lock);
585
586 cur_host = ctx->cur_host;
587 ++ctx->cur_host;
588
589 pthread_mutex_unlock(&ctx->lock);
590
591 ret2 = dpl_addrlist_get_nth(ctx->addrlist, cur_host, &addr);
592 if (DPL_SUCCESS != ret2) {
593 DPL_TRACE(ctx, DPL_TRACE_CONN, "no more host to contact, giving up");
594 ret = DPL_FAILURE;
595 goto end;
596 }
597
598 if (req->behavior_flags & DPL_BEHAVIOR_VIRTUAL_HOSTING) {
599 snprintf(virtual_host, sizeof(virtual_host), "%s.%s", req->bucket,
600 addr->host);
601 hostp = virtual_host;
602 } else
603 hostp = addr->host;
604
605 conn = dpl_conn_open_host(ctx, addr->h->h_addrtype, hostp, addr->portstr);
606 if (NULL == conn) {
607 if (req->behavior_flags & DPL_BEHAVIOR_VIRTUAL_HOSTING) {
608 ret = DPL_FAILURE;
609 goto end;
610 } else {
611 dpl_blacklist_host(ctx, addr->host, addr->portstr);
612 goto retry;
613 }
614 }
615
616 ret2 = dpl_req_set_host(req, hostp);
617 if (DPL_SUCCESS != ret2) {
618 ret = ret2;
619 goto end;
620 }
621
622 ret2 = dpl_req_set_port(req, addr->portstr);
623 if (DPL_SUCCESS != ret2) {
624 ret = ret2;
625 goto end;
626 }
627
628 ret = DPL_SUCCESS;
629
630 if (NULL != connp) {
631 *connp = conn;
632 conn = NULL; // consumed
633 }
634
635 end:
636
637 if (NULL != conn) dpl_conn_terminate(conn);
638
639 return ret;
640 }
641
642 /**
643 * Release the connection after use.
644 *
645 * Releases a connection when you have finished using it. The
646 * connection cannot be used after calling `dpl_conn_release()`.
647 * Note that `dpl_conn_release()` may choose to keep the connection
648 * in an idle state for later re-use, i.e. the same connection
649 * may be returned from a future call to `dpl_try_connect()`.
650 * This means you should not call `dpl_conn_release()` if there has been
651 * an error condition on the connection; instead you should call
652 * `dpl_conn_terminate()`.
653 *
654 * @param conn connection to release
655 */
dpl_conn_release(dpl_conn_t * conn)656 void dpl_conn_release(dpl_conn_t* conn)
657 {
658 dpl_ctx_lock(conn->ctx);
659
660 if (conn->type == DPL_CONN_TYPE_FILE) {
661 if (-1 != conn->fd) close(conn->fd);
662 dpl_ctx_unlock(conn->ctx);
663 free(conn);
664 return;
665 }
666
667 DPL_TRACE(conn->ctx, DPL_TRACE_CONN, "conn_release conn=%p", conn);
668
669 conn->close_time = time(0);
670 dpl_conn_add_nolock(conn);
671
672 dpl_ctx_unlock(conn->ctx);
673 }
674
675 /**
676 * Release and immediately terminate a connection.
677 *
678 * Releases a connection when you have finished using it, with immediate
679 * destruction of the underlying network socket. Call this instead of
680 * `dpl_conn_release()` if you have encountered any error conditions on
681 * the connection.
682 *
683 * @param conn connection to release
684 */
dpl_conn_terminate(dpl_conn_t * conn)685 void dpl_conn_terminate(dpl_conn_t* conn)
686 {
687 dpl_ctx_t* ctx;
688
689 DPRINTF("explicit termination n_hits=%d\n", conn->n_hits);
690
691 ctx = conn->ctx;
692
693 dpl_ctx_lock(ctx);
694
695 assert(conn->type == DPL_CONN_TYPE_HTTP);
696 dpl_conn_terminate_nolock(conn);
697
698 dpl_ctx_unlock(ctx);
699 }
700
dpl_conn_pool_init(dpl_ctx_t * ctx)701 dpl_status_t dpl_conn_pool_init(dpl_ctx_t* ctx)
702 {
703 ctx->conn_buckets = malloc(ctx->n_conn_buckets * sizeof(dpl_conn_t*));
704 if (NULL == ctx->conn_buckets) return DPL_FAILURE;
705
706 memset(ctx->conn_buckets, 0, ctx->n_conn_buckets * sizeof(dpl_conn_t*));
707
708 return DPL_SUCCESS;
709 }
710
dpl_conn_pool_destroy(dpl_ctx_t * ctx)711 void dpl_conn_pool_destroy(dpl_ctx_t* ctx)
712 {
713 int bucket;
714 dpl_conn_t *conn, *prev;
715
716 if (NULL != ctx->conn_buckets) {
717 for (bucket = 0; bucket < ctx->n_conn_buckets; bucket++) {
718 for (conn = ctx->conn_buckets[bucket]; conn; conn = prev) {
719 prev = conn->prev;
720 dpl_conn_terminate_nolock(conn);
721 }
722 }
723
724 free(ctx->conn_buckets);
725 }
726 }
727
728 /*
729 * I/O
730 */
731
732 /*
733 * Write an IO vector to a connection with retry and timeout
734 *
735 * @note: modifies the iov in place
736 *
737 * @param timeout in secs or -1
738 */
writev_all_plaintext(dpl_conn_t * conn,struct iovec * iov,int n_iov,int timeout)739 static dpl_status_t writev_all_plaintext(dpl_conn_t* conn,
740 struct iovec* iov,
741 int n_iov,
742 int timeout)
743 {
744 ssize_t cc = 0;
745 int i, ret;
746
747 DPRINTF("writev n_iov=%d\n", n_iov);
748
749 while (1) {
750 if (-1 != timeout) {
751 struct pollfd fds;
752
753 retry:
754 memset(&fds, 0, sizeof(fds));
755 fds.fd = conn->fd;
756 fds.events = POLLOUT;
757
758 ret = poll(&fds, 1, timeout * 1000);
759 if (-1 == ret) {
760 if (errno == EINTR) goto retry;
761 return DPL_FAILURE;
762 }
763
764 if (0 == ret)
765 return DPL_ETIMEOUT;
766 else if (!(fds.revents & POLLOUT)) {
767 return DPL_FAILURE;
768 }
769 }
770
771 cc = writev(conn->fd, iov, n_iov);
772 if (-1 == cc) {
773 if (EINTR == errno) continue;
774
775 return DPL_FAILURE;
776 }
777
778 for (i = 0; i < n_iov; i++) {
779 if (iov[i].iov_len > cc) {
780 iov[i].iov_base = (char*)iov[i].iov_base + cc;
781 iov[i].iov_len -= cc;
782 break;
783 }
784 cc -= iov[i].iov_len;
785 iov[i].iov_len = 0;
786 }
787
788 if (n_iov == i) return DPL_SUCCESS;
789 }
790
791 return DPL_SUCCESS;
792 }
793
794 /*
795 * Write an IO vector to a connection via the SSL library with retry
796 *
797 */
writev_all_ssl(dpl_conn_t * conn,struct iovec * iov,int n_iov,int timeout)798 static dpl_status_t writev_all_ssl(dpl_conn_t* conn,
799 struct iovec* iov,
800 int n_iov,
801 int timeout)
802 {
803 int i, ret;
804 u_int amount_left, total_size, off;
805 char* ptr = NULL;
806
807 total_size = 0;
808
809 for (i = 0; i < n_iov; i++) total_size += iov[i].iov_len;
810 if (total_size == 0) return DPL_FAILURE;
811 ptr = malloc(total_size);
812 if (NULL == ptr) return DPL_FAILURE;
813
814 off = 0;
815 for (i = 0; i < n_iov; i++) {
816 memcpy(ptr + off, iov[i].iov_base, iov[i].iov_len);
817 off += iov[i].iov_len;
818 }
819
820 amount_left = total_size;
821
822 again:
823 ret = SSL_write(conn->ssl, &ptr[total_size - amount_left], amount_left);
824
825 if (ret > 0) {
826 amount_left -= ret;
827 if (amount_left > 0) goto again;
828 } else { // ret <= 0
829 int err = SSL_get_error(conn->ssl, ret);
830
831 if (SSL_ERROR_WANT_WRITE == err || SSL_ERROR_WANT_READ == err) goto again;
832
833 DPL_SSL_PERROR(conn->ctx, "SSL_write");
834 free(ptr);
835 return DPL_FAILURE;
836 }
837
838 free(ptr);
839 return DPL_SUCCESS;
840 }
841
842 /**
843 * Write an IO vector to the connection.
844 *
845 * Writes an IO vector to the connection. If the `use_https` variable
846 * in the profile is `true`, the data will be written via SSL. All the
847 * data is written, without partial writes. The `iov` structure may be
848 * modified. Note that `timeout` is not implemented for SSL, due to a
849 * limitation of the SSL library.
850 *
851 * @param conn the connection to write to
852 * @param iov IO vector which points to data to write
853 * @param length of the IO vector
854 * @param timeout per-write timeout in seconds or -1 for no timeout
855 * @retval DPL_SUCCESS on success, or
856 * @return a Droplet error code on failure
857 */
dpl_conn_writev_all(dpl_conn_t * conn,struct iovec * iov,int n_iov,int timeout)858 dpl_status_t dpl_conn_writev_all(dpl_conn_t* conn,
859 struct iovec* iov,
860 int n_iov,
861 int timeout)
862 {
863 dpl_status_t ret;
864
865 DPL_TRACE(conn->ctx, DPL_TRACE_IO, "writev conn=%p https=%d size=%ld", conn,
866 conn->ctx->use_https, dpl_iov_size(iov, n_iov));
867
868 if (conn->ctx->trace_buffers)
869 dpl_iov_dump(iov, n_iov, dpl_iov_size(iov, n_iov), conn->ctx->trace_binary);
870
871 if (0 == conn->ctx->use_https)
872 ret = writev_all_plaintext(conn, iov, n_iov, timeout);
873 else
874 ret = writev_all_ssl(conn, iov, n_iov, timeout);
875
876 if (DPL_SUCCESS != ret) {
877 // blacklist host
878 if (DPL_CONN_TYPE_HTTP == conn->type)
879 dpl_blacklist_host(conn->ctx, conn->host, conn->port);
880 }
881
882 return ret;
883 }
884
885 /**
886 * Create a connection to a local file
887 *
888 * This function is used by the `posix` backend to create a connection
889 * which reads from or writes to an open local file. Call `dpl_conn_release()`
890 * to release the connection when you have finished using it.
891 *
892 * @param ctx the context from which to create a connection
893 * @param fd an open file descriptor for a local file
894 * @return a new context, or NULL on failure
895 */
dpl_conn_open_file(dpl_ctx_t * ctx,int fd)896 dpl_conn_t* dpl_conn_open_file(dpl_ctx_t* ctx, int fd)
897 {
898 dpl_conn_t* conn = NULL;
899 time_t now = time(0);
900
901 DPL_TRACE(ctx, DPL_TRACE_CONN, "conn_open_file fd=%d", fd);
902
903 DPRINTF("allocate new conn\n");
904
905 conn = malloc(sizeof(*conn));
906 if (NULL == conn) {
907 DPL_TRACE(ctx, DPL_TRACE_ERR, "malloc failed");
908 conn = NULL;
909 goto end;
910 }
911
912 memset(conn, 0, sizeof(*conn));
913
914 conn->type = DPL_CONN_TYPE_FILE;
915 conn->ctx = ctx;
916 conn->read_buf_size = ctx->read_buf_size;
917 conn->fd = fd;
918
919 if ((conn->read_buf = malloc(conn->read_buf_size)) == NULL) {
920 dpl_conn_free(conn);
921 conn = NULL;
922 goto end;
923 }
924
925 conn->start_time = now;
926 conn->n_hits = 0;
927
928 end:
929
930 DPL_TRACE(ctx, DPL_TRACE_CONN, "conn_open conn=%p", conn);
931
932 return conn;
933 }
934
935 /* @} */
936