1 /*
2  * Copyright (C) 2020-2021 Bareos GmbH & Co. KG
3  * Copyright (C) 2010 SCALITY SA. All rights reserved.
4  * http://www.scality.com
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions are
8  * met:
9  *
10  * Redistributions of source code must retain the above copyright notice,
11  * this list of conditions and the following disclaimer.
12  *
13  * Redistributions in binary form must reproduce the above copyright
14  * notice, this list of conditions and the following disclaimer in the
15  * documentation and/or other materials provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY SCALITY SA ``AS IS'' AND ANY EXPRESS OR
18  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
20  * DISCLAIMED. IN NO EVENT SHALL SCALITY SA OR CONTRIBUTORS BE LIABLE FOR
21  * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
22  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
23  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
24  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
25  * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  * POSSIBILITY OF SUCH DAMAGE.
28  *
29  * The views and conclusions contained in the software and documentation
30  * are those of the authors and should not be interpreted as representing
31  * official policies, either expressed or implied, of SCALITY SA.
32  *
33  * https://github.com/scality/Droplet
34  */
35 #include "dropletp.h"
36 #include "sys/uio.h"
37 
38 /** @file */
39 
40 /* #define DPRINTF(fmt,...) fprintf(stderr, fmt, ##__VA_ARGS__) */
41 #define DPRINTF(fmt, ...)
42 
43 //#define DEBUG
44 
45 dpl_ctx_t* dpl_default_conn_ctx = NULL;
46 
conn_hashcode(const unsigned char * data,size_t len)47 static u_int conn_hashcode(const unsigned char* data, size_t len)
48 {
49   const unsigned char* p;
50   u_int h, g;
51   int i = 0;
52 
53   h = g = 0;
54 
55   for (p = data; i < len; p = p + 1, i++) {
56     h = (h << 4) + (*p);
57     if ((g = h & 0xf0000000)) {
58       h = h ^ (g >> 24);
59       h = h ^ g;
60     }
61   }
62   return h;
63 }
64 
dpl_conn_get_nolock(dpl_ctx_t * ctx,struct hostent * host,u_short port)65 static dpl_conn_t* dpl_conn_get_nolock(dpl_ctx_t* ctx,
66                                        struct hostent* host,
67                                        u_short port)
68 {
69   u_int bucket;
70   struct dpl_hash_info hash_info;
71   dpl_conn_t* conn;
72 
73   memset(&hash_info, 0, sizeof(hash_info));
74 
75   memcpy(&hash_info.addr, host->h_addr, host->h_length);
76   hash_info.port = port;
77 
78   bucket = conn_hashcode((unsigned char*)&hash_info, sizeof(hash_info))
79            % ctx->n_conn_buckets;
80 
81   for (conn = ctx->conn_buckets[bucket]; conn; conn = conn->prev) {
82     if (!memcmp(&conn->hash_info, &hash_info, sizeof(hash_info))) return conn;
83   }
84 
85   return NULL;
86 }
87 
is_usable(dpl_conn_t * conn)88 static int is_usable(dpl_conn_t* conn)
89 {
90   struct pollfd pfd;
91   int retval;
92 
93   memset(&pfd, 0, sizeof(struct pollfd));
94 
95   pfd.fd = conn->fd;
96 #ifdef POLLRDHUP
97   pfd.events = POLLIN | POLLPRI | POLLRDHUP;
98 #else
99   pfd.events = POLLIN | POLLPRI;
100 #endif
101 
102   retval = poll(&pfd, 1, 0);
103 
104   switch (retval) {
105     case 1: {
106       char buf[1];
107       int size;
108 
109       if (conn->ctx->use_https)
110         size = SSL_read(conn->ssl, buf, sizeof(buf));
111       else
112         size = recv(conn->fd, buf, sizeof(buf), MSG_DONTWAIT | MSG_PEEK);
113 
114       if (size == 0) {
115         DPRINTF("is_usable: rv %d returning False\n", retval);
116         return 0;
117       }
118     }
119       /* fall down */
120     case 0:
121       DPRINTF("is_usable: rv %d returning True\n", retval);
122       return 1;
123     default:
124       DPRINTF("is_usable: rv %d returning False\n", retval);
125       return 0;
126   }
127 
128   return 0; /* not reached */
129 }
130 
dpl_conn_add_nolock(dpl_conn_t * conn)131 static void dpl_conn_add_nolock(dpl_conn_t* conn)
132 {
133   u_int bucket;
134 
135   bucket
136       = conn_hashcode((unsigned char*)&conn->hash_info, sizeof(conn->hash_info))
137         % conn->ctx->n_conn_buckets;
138 
139   conn->next = NULL;
140   conn->prev = conn->ctx->conn_buckets[bucket];
141 
142   if (NULL != conn->ctx->conn_buckets[bucket])
143     conn->ctx->conn_buckets[bucket]->next = conn;
144 
145   conn->ctx->conn_buckets[bucket] = conn;
146 }
147 
dpl_conn_remove_nolock(dpl_ctx_t * ctx,dpl_conn_t * conn)148 static void dpl_conn_remove_nolock(dpl_ctx_t* ctx, dpl_conn_t* conn)
149 {
150   u_int bucket;
151 
152   bucket
153       = conn_hashcode((unsigned char*)&conn->hash_info, sizeof(conn->hash_info))
154         % ctx->n_conn_buckets;
155 
156   if (conn->prev) conn->prev->next = conn->next;
157 
158   if (conn->next) conn->next->prev = conn->prev;
159 
160   if (ctx->conn_buckets[bucket] == conn) ctx->conn_buckets[bucket] = conn->prev;
161 }
162 
safe_close(dpl_ctx_t * ctx,int fd)163 static void safe_close(dpl_ctx_t* ctx, int fd)
164 {
165   int ret;
166 
167   DPRINTF("closing fd=%d\n", fd);
168 
169   do {
170     ret = close(fd);
171   } while (ret == -1 && errno == EINTR);
172 
173   if (ret == -1)
174     DPL_TRACE(ctx, DPL_TRACE_WARN, "close failed: %s", strerror(errno));
175 }
176 
dpl_conn_free(dpl_conn_t * conn)177 static void dpl_conn_free(dpl_conn_t* conn)
178 {
179   if (NULL != conn->ssl) {
180     int ssl_ret;
181     char buf[256];
182     unsigned long ssl_err;
183     SSL_set_shutdown(conn->ssl, SSL_SENT_SHUTDOWN | SSL_RECEIVED_SHUTDOWN);
184     ssl_ret = SSL_shutdown(conn->ssl);
185     if (1 == ssl_ret) {
186       DPL_TRACE(conn->ctx, DPL_TRACE_WARN,
187                 "SSL shutdown was successfully completed");
188     } else if (0 == ssl_ret) {
189       ssl_err = SSL_get_error(conn->ssl, ssl_ret);
190       ERR_error_string_n(ssl_err, buf, sizeof buf);
191       DPL_TRACE(
192           conn->ctx, DPL_TRACE_WARN,
193           "SSL shutdown is not yet finished, calling for a second time: %s",
194           buf);
195       ssl_ret = SSL_shutdown(conn->ssl);
196       if (1 == ssl_ret) {
197         DPL_TRACE(conn->ctx, DPL_TRACE_WARN,
198                   "SSL shutdown was successfully completed");
199       } else {
200         DPL_TRACE(conn->ctx, DPL_TRACE_ERR,
201                   "SSL shutdown was not successfully completed");
202       }
203     } else if (0 > ssl_ret) {
204       ssl_err = SSL_get_error(conn->ssl, ssl_ret);
205       ERR_error_string_n(ssl_err, buf, sizeof buf);
206       DPL_TRACE(
207           conn->ctx, DPL_TRACE_WARN,
208           "SSL shutdown was not successful because a fatal error occurred: %s",
209           buf);
210     }
211     SSL_free(conn->ssl);
212   }
213 
214   if (-1 != conn->fd) safe_close(conn->ctx, conn->fd);
215 
216   if (NULL != conn->read_buf) free(conn->read_buf);
217 
218   if (NULL != conn->host) free(conn->host);
219 
220   if (NULL != conn->port) free(conn->port);
221 
222   free(conn);
223 }
224 
dpl_conn_terminate_nolock(dpl_conn_t * conn)225 static void dpl_conn_terminate_nolock(dpl_conn_t* conn)
226 {
227   DPL_TRACE(conn->ctx, DPL_TRACE_CONN, "conn_terminate conn=%p", conn);
228 
229   conn->ctx->n_conn_fds--;
230   dpl_conn_free(conn);
231 }
232 
do_connect(dpl_ctx_t * ctx,struct hostent * host,u_short port)233 static int do_connect(dpl_ctx_t* ctx, struct hostent* host, u_short port)
234 {
235   int fd = -1, ret, on, error;
236   struct pollfd fds;
237   socklen_t errorlen;
238   char ident[DPL_ADDR_IDENT_STRLEN];
239 
240   fd = socket(host->h_addrtype, SOCK_STREAM, 0);
241   if (fd == -1) {
242     DPL_TRACE(ctx, DPL_TRACE_ERR, "socket failed");
243     goto end;
244   }
245 
246   on = 1;
247   ret = ioctl(fd, FIONBIO, &on);
248   if (-1 == ret) {
249     DPL_LOG(ctx, DPL_ERROR, "ioctl(FIONBIO) failed: %s", strerror(errno));
250     safe_close(ctx, fd);
251     fd = -1;
252     goto end;
253   }
254 
255   dpl_addr_get_ident(host, port, ident, sizeof(ident));
256   DPL_TRACE(ctx, DPL_TRACE_CONN, "connect %s", ident);
257 
258   if (host->h_addrtype == AF_INET) {
259     struct sockaddr_in sin;
260 
261     sin.sin_family = host->h_addrtype;
262     sin.sin_port = htons(port);
263     memcpy(&sin.sin_addr, host->h_addr, host->h_length);
264 
265     ret = connect(fd, (struct sockaddr*)&sin, sizeof(sin));
266   } else {
267     struct sockaddr_in6 sin;
268 
269     sin.sin6_family = host->h_addrtype;
270     sin.sin6_port = htons(port);
271     memcpy(&sin.sin6_addr, host->h_addr, host->h_length);
272 
273     ret = connect(fd, (struct sockaddr*)&sin, sizeof(sin));
274   }
275 
276   if (-1 == ret) {
277     if (EINPROGRESS != errno) {
278       DPL_LOG(ctx, DPL_ERROR, "Connect to server %s failed: %s", ident,
279               strerror(errno));
280       safe_close(ctx, fd);
281       fd = -1;
282       goto end;
283     }
284   }
285 
286 retry:
287   memset(&fds, 0, sizeof(fds));
288   fds.fd = fd;
289   fds.events = POLLOUT;
290 
291   ret = poll(&fds, 1, ctx->conn_timeout * 1000);
292   if (-1 == ret) {
293     if (errno == EINTR) goto retry;
294     DPL_LOG(ctx, DPL_ERROR, "poll failed: %s", strerror(errno));
295     safe_close(ctx, fd);
296     fd = -1;
297     goto end;
298   }
299 
300   if (0 == ret) {
301     DPL_LOG(ctx, DPL_ERROR,
302             "Timed out connecting to server %s after %d seconds", ident,
303             ctx->conn_timeout);
304     safe_close(ctx, fd);
305     fd = -1;
306     goto end;
307   } else if (!(fds.revents & POLLOUT)) {
308     DPL_LOG(ctx, DPL_ERROR, "poll returned strange results");
309     safe_close(ctx, fd);
310     fd = -1;
311     goto end;
312   }
313 
314   on = 0;
315   ret = ioctl(fd, FIONBIO, &on);
316   if (-1 == ret) {
317     DPL_LOG(ctx, DPL_ERROR, "ioctl(FIONBIO) failed: %s", strerror(errno));
318     safe_close(ctx, fd);
319     fd = -1;
320     goto end;
321   }
322 
323   /* errors from the async connect() are reported through the SO_ERROR sockopt
324    */
325 
326   errorlen = sizeof(error);
327   error = 0;
328   ret = getsockopt(fd, SOL_SOCKET, SO_ERROR, &error, &errorlen);
329   if (-1 == ret) {
330     DPL_LOG(ctx, DPL_ERROR, "getsockopt(SO_ERROR) failed: %s", strerror(errno));
331     safe_close(ctx, fd);
332     fd = -1;
333     goto end;
334   }
335 
336   if (error != 0) {
337     DPL_LOG(ctx, DPL_ERROR, "Connect to server %s failed: %s", ident,
338             strerror(error));
339     safe_close(ctx, fd);
340     fd = -1;
341     goto end;
342   }
343 
344 end:
345 
346   DPL_TRACE(ctx, DPL_TRACE_CONN, "connect fd=%d", fd);
347 
348   return fd;
349 }
350 
init_ssl_conn(dpl_ctx_t * ctx,dpl_conn_t * conn)351 static int init_ssl_conn(dpl_ctx_t* ctx, dpl_conn_t* conn)
352 {
353   int ret;
354 
355   conn->ssl = SSL_new(ctx->ssl_ctx);
356   if (conn->ssl == NULL) return 0;
357 
358   conn->bio = BIO_new_socket(conn->fd, BIO_NOCLOSE);
359   if (conn->bio == NULL) return 0;
360 
361   SSL_set_bio(conn->ssl, conn->bio, conn->bio);
362 
363   ret = SSL_connect(conn->ssl);
364   if (ret <= 0) {
365     int ret_ssl = 0;
366 
367     int ssl_err = SSL_get_error(conn->ssl, ret);
368     char buf[256];
369     ERR_error_string_n(ssl_err, buf, sizeof(buf));
370 
371     DPL_SSL_PERROR(ctx, "SSL_connect");
372     DPL_LOG(ctx, DPL_ERROR, "SSL connect error: %d (%s)", ret, buf);
373 
374     ret_ssl = SSL_get_verify_result(conn->ssl);
375     DPL_LOG(ctx, DPL_ERROR, "SSL certificate verification status: %ld: %s",
376             ret_ssl, X509_verify_cert_error_string(ret_ssl));
377     return 0;
378   }
379   if (0 == ctx->cert_verif) {
380     long ret_ssl = 0;
381     ret_ssl = SSL_get_verify_result(conn->ssl);
382     DPL_TRACE(ctx, DPL_TRACE_SSL,
383               "SSL certificate verification status: %ld: %s", ret_ssl,
384               X509_verify_cert_error_string(ret_ssl));
385   }
386   DPL_TRACE(ctx, DPL_TRACE_SSL, "SSL cipher used: %s",
387             SSL_get_cipher(conn->ssl));
388 
389   return 1;
390 }
391 
392 /*
393  * check an existing connection bound on (addr,port). if none is found
394  * a new connection is created.
395  */
396 
conn_open(dpl_ctx_t * ctx,struct hostent * host,u_short port)397 static dpl_conn_t* conn_open(dpl_ctx_t* ctx, struct hostent* host, u_short port)
398 {
399   dpl_conn_t* conn = NULL;
400   time_t now = time(0);
401   char ident[DPL_ADDR_IDENT_STRLEN];
402 
403   dpl_ctx_lock(ctx);
404 
405   dpl_addr_get_ident(host, port, ident, sizeof(ident));
406   DPL_TRACE(ctx, DPL_TRACE_CONN, "conn_open %s", ident);
407 
408 again:
409 
410   conn = dpl_conn_get_nolock(ctx, host, port);
411 
412   if (NULL != conn) {
413     if (0 == is_usable(conn)) {
414       dpl_conn_remove_nolock(ctx, conn);
415       dpl_conn_terminate_nolock(conn);
416       goto again;
417     }
418 
419     dpl_conn_remove_nolock(ctx, conn);
420     if (conn->n_hits >= ctx->n_conn_max_hits
421         || (now - conn->close_time) >= ctx->conn_idle_time) {
422       DPRINTF("auto-close\n");
423       dpl_conn_terminate_nolock(conn);
424       conn = NULL;
425     } else {
426       // OK reuse
427       conn->n_hits++;
428       goto end;
429     }
430   }
431 
432   if (ctx->n_conn_fds >= ctx->n_conn_max) {
433     DPL_TRACE(ctx, DPL_TRACE_ERR, "reaching limit %d", ctx->n_conn_fds);
434     conn = NULL;
435     goto end;
436   }
437 
438   conn = malloc(sizeof(*conn));
439   if (NULL == conn) {
440     DPL_TRACE(ctx, DPL_TRACE_ERR, "malloc failed");
441     conn = NULL;
442     goto end;
443   }
444 
445   DPL_TRACE(ctx, DPL_TRACE_CONN, "new_conn %s %p", ident, conn);
446 
447   memset(conn, 0, sizeof(*conn));
448 
449   conn->type = DPL_CONN_TYPE_HTTP;
450   conn->ctx = ctx;
451   conn->read_buf_size = ctx->read_buf_size;
452   conn->fd = -1;
453 
454   if ((conn->read_buf = malloc(conn->read_buf_size)) == NULL) {
455     dpl_conn_free(conn);
456     conn = NULL;
457     goto end;
458   }
459 
460   memcpy(&conn->hash_info.addr, host->h_addr_list[0], host->h_length);
461   conn->hash_info.port = port;
462 
463   conn->fd = do_connect(ctx, host, port);
464   if (-1 == conn->fd) {
465     dpl_conn_free(conn);
466     conn = NULL;
467     goto end;
468   }
469 
470   conn->start_time = now;
471   conn->n_hits = 0;
472 
473   if (ctx->use_https) {
474     if (!init_ssl_conn(ctx, conn)) {
475       dpl_conn_free(conn);
476       conn = NULL;
477       goto end;
478     }
479   }
480 
481   ctx->n_conn_fds++;
482 
483 end:
484 
485   dpl_ctx_unlock(ctx);
486 
487   DPL_TRACE(ctx, DPL_TRACE_CONN, "conn_open conn=%p", conn);
488 
489   return conn;
490 }
491 
dpl_conn_open_host(dpl_ctx_t * ctx,int af,const char * host,const char * portstr)492 dpl_conn_t* dpl_conn_open_host(dpl_ctx_t* ctx,
493                                int af,
494                                const char* host,
495                                const char* portstr)
496 {
497   int ret2;
498   struct hostent hret, *hresult;
499   char hbuf[1024];
500   int herr = 0;
501   u_short port;
502   dpl_conn_t* conn = NULL;
503   char* nstr;
504 
505   ret2 = dpl_gethostbyname2_r(host, af, &hret, hbuf, sizeof(hbuf), &hresult,
506                               &herr);
507   if (0 != ret2 || hresult == NULL) {
508     DPL_LOG(ctx, DPL_ERROR, "Failed to lookup hostname \"%s\": %s", host,
509             hstrerror(herr));
510     goto bad;
511   }
512 
513   port = atoi(portstr);
514   conn = conn_open(ctx, hresult, port);
515   if (NULL == conn) {
516     DPL_TRACE(ctx, DPL_TRACE_ERR, "connect failed");
517     goto bad;
518   }
519 
520   nstr = strdup(host);
521   if (NULL == nstr) goto bad;
522 
523   if (NULL != conn->host) free(conn->host);
524 
525   conn->host = nstr;
526 
527   nstr = strdup(portstr);
528   if (NULL == nstr) goto bad;
529 
530   if (NULL != conn->port) free(conn->port);
531 
532   conn->port = nstr;
533 
534   return conn;
535 
536 bad:
537 
538   if (NULL != conn) dpl_conn_release(conn);
539 
540   return NULL;
541 }
542 
dpl_blacklist_host(dpl_ctx_t * ctx,const char * host,const char * portstr)543 void dpl_blacklist_host(dpl_ctx_t* ctx, const char* host, const char* portstr)
544 {
545   DPL_TRACE(ctx, DPL_TRACE_CONN, "blacklisting %s:%s", host, portstr);
546 
547   (void)dpl_addrlist_blacklist(ctx->addrlist, host, portstr,
548                                ctx->blacklist_expiretime);
549 }
550 
551 /**
552  * Get a connection from the context.
553  *
554  * Creates or re-uses a connection suitable for use with the request
555  * `req`.  The calling thread is guaranteed exclusive use of the
556  * connection.  If a recently released connection is suitable, it will be
557  * returned.
558  *
559  * If multiple hosts are specified in the `host` variable in
560  * the profile, connections will be distributed between those hosts in
561  * a round-robin manner.  Any failure while connecting will cause the
562  * failing host to be blacklisted and the connection retried with
563  * another host; if no hosts remain, `DPL_FAILURE` is returned.
564  *
565  * On success, a pointer to a connection is returned in `*connp`.  You
566  * should release the connection by calling either `dpl_conn_release()` or
567  * `dpl_conn_terminate()`.  On error the value in `*connp` is unchanged.
568  *
569  * @param ctx the context from which to create a connection
570  * @param req the request for which this connection will be used
571  * @param[out] connp used to return the new connection
572  * @retval DPL_SUCCESS on success, or
573  * @return a Droplet error code on failure
574  */
dpl_try_connect(dpl_ctx_t * ctx,dpl_req_t * req,dpl_conn_t ** connp)575 dpl_status_t dpl_try_connect(dpl_ctx_t* ctx, dpl_req_t* req, dpl_conn_t** connp)
576 {
577   int cur_host;
578   dpl_addr_t* addr;
579   dpl_conn_t* conn = NULL;
580   dpl_status_t ret, ret2;
581   char virtual_host[1024], *hostp = NULL;
582 
583 retry:
584   pthread_mutex_lock(&ctx->lock);
585 
586   cur_host = ctx->cur_host;
587   ++ctx->cur_host;
588 
589   pthread_mutex_unlock(&ctx->lock);
590 
591   ret2 = dpl_addrlist_get_nth(ctx->addrlist, cur_host, &addr);
592   if (DPL_SUCCESS != ret2) {
593     DPL_TRACE(ctx, DPL_TRACE_CONN, "no more host to contact, giving up");
594     ret = DPL_FAILURE;
595     goto end;
596   }
597 
598   if (req->behavior_flags & DPL_BEHAVIOR_VIRTUAL_HOSTING) {
599     snprintf(virtual_host, sizeof(virtual_host), "%s.%s", req->bucket,
600              addr->host);
601     hostp = virtual_host;
602   } else
603     hostp = addr->host;
604 
605   conn = dpl_conn_open_host(ctx, addr->h->h_addrtype, hostp, addr->portstr);
606   if (NULL == conn) {
607     if (req->behavior_flags & DPL_BEHAVIOR_VIRTUAL_HOSTING) {
608       ret = DPL_FAILURE;
609       goto end;
610     } else {
611       dpl_blacklist_host(ctx, addr->host, addr->portstr);
612       goto retry;
613     }
614   }
615 
616   ret2 = dpl_req_set_host(req, hostp);
617   if (DPL_SUCCESS != ret2) {
618     ret = ret2;
619     goto end;
620   }
621 
622   ret2 = dpl_req_set_port(req, addr->portstr);
623   if (DPL_SUCCESS != ret2) {
624     ret = ret2;
625     goto end;
626   }
627 
628   ret = DPL_SUCCESS;
629 
630   if (NULL != connp) {
631     *connp = conn;
632     conn = NULL;  // consumed
633   }
634 
635 end:
636 
637   if (NULL != conn) dpl_conn_terminate(conn);
638 
639   return ret;
640 }
641 
642 /**
643  * Release the connection after use.
644  *
645  * Releases a connection when you have finished using it.  The
646  * connection cannot be used after calling `dpl_conn_release()`.
647  * Note that `dpl_conn_release()` may choose to keep the connection
648  * in an idle state for later re-use, i.e. the same connection
649  * may be returned from a future call to `dpl_try_connect()`.
650  * This means you should not call `dpl_conn_release()` if there has been
651  * an error condition on the connection; instead you should call
652  * `dpl_conn_terminate()`.
653  *
654  * @param conn connection to release
655  */
dpl_conn_release(dpl_conn_t * conn)656 void dpl_conn_release(dpl_conn_t* conn)
657 {
658   dpl_ctx_lock(conn->ctx);
659 
660   if (conn->type == DPL_CONN_TYPE_FILE) {
661     if (-1 != conn->fd) close(conn->fd);
662     dpl_ctx_unlock(conn->ctx);
663     free(conn);
664     return;
665   }
666 
667   DPL_TRACE(conn->ctx, DPL_TRACE_CONN, "conn_release conn=%p", conn);
668 
669   conn->close_time = time(0);
670   dpl_conn_add_nolock(conn);
671 
672   dpl_ctx_unlock(conn->ctx);
673 }
674 
675 /**
676  * Release and immediately terminate a connection.
677  *
678  * Releases a connection when you have finished using it, with immediate
679  * destruction of the underlying network socket.  Call this instead of
680  * `dpl_conn_release()` if you have encountered any error conditions on
681  * the connection.
682  *
683  * @param conn connection to release
684  */
dpl_conn_terminate(dpl_conn_t * conn)685 void dpl_conn_terminate(dpl_conn_t* conn)
686 {
687   dpl_ctx_t* ctx;
688 
689   DPRINTF("explicit termination n_hits=%d\n", conn->n_hits);
690 
691   ctx = conn->ctx;
692 
693   dpl_ctx_lock(ctx);
694 
695   assert(conn->type == DPL_CONN_TYPE_HTTP);
696   dpl_conn_terminate_nolock(conn);
697 
698   dpl_ctx_unlock(ctx);
699 }
700 
dpl_conn_pool_init(dpl_ctx_t * ctx)701 dpl_status_t dpl_conn_pool_init(dpl_ctx_t* ctx)
702 {
703   ctx->conn_buckets = malloc(ctx->n_conn_buckets * sizeof(dpl_conn_t*));
704   if (NULL == ctx->conn_buckets) return DPL_FAILURE;
705 
706   memset(ctx->conn_buckets, 0, ctx->n_conn_buckets * sizeof(dpl_conn_t*));
707 
708   return DPL_SUCCESS;
709 }
710 
dpl_conn_pool_destroy(dpl_ctx_t * ctx)711 void dpl_conn_pool_destroy(dpl_ctx_t* ctx)
712 {
713   int bucket;
714   dpl_conn_t *conn, *prev;
715 
716   if (NULL != ctx->conn_buckets) {
717     for (bucket = 0; bucket < ctx->n_conn_buckets; bucket++) {
718       for (conn = ctx->conn_buckets[bucket]; conn; conn = prev) {
719         prev = conn->prev;
720         dpl_conn_terminate_nolock(conn);
721       }
722     }
723 
724     free(ctx->conn_buckets);
725   }
726 }
727 
728 /*
729  * I/O
730  */
731 
732 /*
733  * Write an IO vector to a connection with retry and timeout
734  *
735  * @note: modifies the iov in place
736  *
737  * @param timeout in secs or -1
738  */
writev_all_plaintext(dpl_conn_t * conn,struct iovec * iov,int n_iov,int timeout)739 static dpl_status_t writev_all_plaintext(dpl_conn_t* conn,
740                                          struct iovec* iov,
741                                          int n_iov,
742                                          int timeout)
743 {
744   ssize_t cc = 0;
745   int i, ret;
746 
747   DPRINTF("writev n_iov=%d\n", n_iov);
748 
749   while (1) {
750     if (-1 != timeout) {
751       struct pollfd fds;
752 
753     retry:
754       memset(&fds, 0, sizeof(fds));
755       fds.fd = conn->fd;
756       fds.events = POLLOUT;
757 
758       ret = poll(&fds, 1, timeout * 1000);
759       if (-1 == ret) {
760         if (errno == EINTR) goto retry;
761         return DPL_FAILURE;
762       }
763 
764       if (0 == ret)
765         return DPL_ETIMEOUT;
766       else if (!(fds.revents & POLLOUT)) {
767         return DPL_FAILURE;
768       }
769     }
770 
771     cc = writev(conn->fd, iov, n_iov);
772     if (-1 == cc) {
773       if (EINTR == errno) continue;
774 
775       return DPL_FAILURE;
776     }
777 
778     for (i = 0; i < n_iov; i++) {
779       if (iov[i].iov_len > cc) {
780         iov[i].iov_base = (char*)iov[i].iov_base + cc;
781         iov[i].iov_len -= cc;
782         break;
783       }
784       cc -= iov[i].iov_len;
785       iov[i].iov_len = 0;
786     }
787 
788     if (n_iov == i) return DPL_SUCCESS;
789   }
790 
791   return DPL_SUCCESS;
792 }
793 
794 /*
795  * Write an IO vector to a connection via the SSL library with retry
796  *
797  */
writev_all_ssl(dpl_conn_t * conn,struct iovec * iov,int n_iov,int timeout)798 static dpl_status_t writev_all_ssl(dpl_conn_t* conn,
799                                    struct iovec* iov,
800                                    int n_iov,
801                                    int timeout)
802 {
803   int i, ret;
804   u_int amount_left, total_size, off;
805   char* ptr = NULL;
806 
807   total_size = 0;
808 
809   for (i = 0; i < n_iov; i++) total_size += iov[i].iov_len;
810   if (total_size == 0) return DPL_FAILURE;
811   ptr = malloc(total_size);
812   if (NULL == ptr) return DPL_FAILURE;
813 
814   off = 0;
815   for (i = 0; i < n_iov; i++) {
816     memcpy(ptr + off, iov[i].iov_base, iov[i].iov_len);
817     off += iov[i].iov_len;
818   }
819 
820   amount_left = total_size;
821 
822 again:
823   ret = SSL_write(conn->ssl, &ptr[total_size - amount_left], amount_left);
824 
825   if (ret > 0) {
826     amount_left -= ret;
827     if (amount_left > 0) goto again;
828   } else {  // ret <= 0
829     int err = SSL_get_error(conn->ssl, ret);
830 
831     if (SSL_ERROR_WANT_WRITE == err || SSL_ERROR_WANT_READ == err) goto again;
832 
833     DPL_SSL_PERROR(conn->ctx, "SSL_write");
834     free(ptr);
835     return DPL_FAILURE;
836   }
837 
838   free(ptr);
839   return DPL_SUCCESS;
840 }
841 
842 /**
843  * Write an IO vector to the connection.
844  *
845  * Writes an IO vector to the connection.  If the `use_https` variable
846  * in the profile is `true`, the data will be written via SSL.  All the
847  * data is written, without partial writes.  The `iov` structure may be
848  * modified.  Note that `timeout` is not implemented for SSL, due to a
849  * limitation of  the SSL library.
850  *
851  * @param conn the connection to write to
852  * @param iov IO vector which points to data to write
853  * @param length of the IO vector
854  * @param timeout per-write timeout in seconds or -1 for no timeout
855  * @retval DPL_SUCCESS on success, or
856  * @return a Droplet error code on failure
857  */
dpl_conn_writev_all(dpl_conn_t * conn,struct iovec * iov,int n_iov,int timeout)858 dpl_status_t dpl_conn_writev_all(dpl_conn_t* conn,
859                                  struct iovec* iov,
860                                  int n_iov,
861                                  int timeout)
862 {
863   dpl_status_t ret;
864 
865   DPL_TRACE(conn->ctx, DPL_TRACE_IO, "writev conn=%p https=%d size=%ld", conn,
866             conn->ctx->use_https, dpl_iov_size(iov, n_iov));
867 
868   if (conn->ctx->trace_buffers)
869     dpl_iov_dump(iov, n_iov, dpl_iov_size(iov, n_iov), conn->ctx->trace_binary);
870 
871   if (0 == conn->ctx->use_https)
872     ret = writev_all_plaintext(conn, iov, n_iov, timeout);
873   else
874     ret = writev_all_ssl(conn, iov, n_iov, timeout);
875 
876   if (DPL_SUCCESS != ret) {
877     // blacklist host
878     if (DPL_CONN_TYPE_HTTP == conn->type)
879       dpl_blacklist_host(conn->ctx, conn->host, conn->port);
880   }
881 
882   return ret;
883 }
884 
885 /**
886  * Create a connection to a local file
887  *
888  * This function is used by the `posix` backend to create a connection
889  * which reads from or writes to an open local file.  Call `dpl_conn_release()`
890  * to release the connection when you have finished using it.
891  *
892  * @param ctx the context from which to create a connection
893  * @param fd an open file descriptor for a local file
894  * @return a new context, or NULL on failure
895  */
dpl_conn_open_file(dpl_ctx_t * ctx,int fd)896 dpl_conn_t* dpl_conn_open_file(dpl_ctx_t* ctx, int fd)
897 {
898   dpl_conn_t* conn = NULL;
899   time_t now = time(0);
900 
901   DPL_TRACE(ctx, DPL_TRACE_CONN, "conn_open_file fd=%d", fd);
902 
903   DPRINTF("allocate new conn\n");
904 
905   conn = malloc(sizeof(*conn));
906   if (NULL == conn) {
907     DPL_TRACE(ctx, DPL_TRACE_ERR, "malloc failed");
908     conn = NULL;
909     goto end;
910   }
911 
912   memset(conn, 0, sizeof(*conn));
913 
914   conn->type = DPL_CONN_TYPE_FILE;
915   conn->ctx = ctx;
916   conn->read_buf_size = ctx->read_buf_size;
917   conn->fd = fd;
918 
919   if ((conn->read_buf = malloc(conn->read_buf_size)) == NULL) {
920     dpl_conn_free(conn);
921     conn = NULL;
922     goto end;
923   }
924 
925   conn->start_time = now;
926   conn->n_hits = 0;
927 
928 end:
929 
930   DPL_TRACE(ctx, DPL_TRACE_CONN, "conn_open conn=%p", conn);
931 
932   return conn;
933 }
934 
935 /* @} */
936