1 /*
2  * twemproxy - A fast and lightweight proxy for memcached protocol.
3  * Copyright (C) 2011 Twitter, Inc.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 
18 #include <stdlib.h>
19 #include <unistd.h>
20 
21 #include <nc_core.h>
22 #include <nc_server.h>
23 #include <nc_conf.h>
24 
25 static void
server_resolve(struct server * server,struct conn * conn)26 server_resolve(struct server *server, struct conn *conn)
27 {
28     rstatus_t status;
29 
30     status = nc_resolve(&server->addrstr, server->port, &server->info);
31     if (status != NC_OK) {
32         conn->err = EHOSTDOWN;
33         conn->done = 1;
34         return;
35     }
36 
37     conn->family = server->info.family;
38     conn->addrlen = server->info.addrlen;
39     conn->addr = (struct sockaddr *)&server->info.addr;
40 }
41 
42 void
server_ref(struct conn * conn,void * owner)43 server_ref(struct conn *conn, void *owner)
44 {
45     struct server *server = owner;
46 
47     ASSERT(!conn->client && !conn->proxy);
48     ASSERT(conn->owner == NULL);
49 
50     server_resolve(server, conn);
51 
52     server->ns_conn_q++;
53     TAILQ_INSERT_TAIL(&server->s_conn_q, conn, conn_tqe);
54 
55     conn->owner = owner;
56 
57     log_debug(LOG_VVERB, "ref conn %p owner %p into '%.*s", conn, server,
58               server->pname.len, server->pname.data);
59 }
60 
61 void
server_unref(struct conn * conn)62 server_unref(struct conn *conn)
63 {
64     struct server *server;
65 
66     ASSERT(!conn->client && !conn->proxy);
67     ASSERT(conn->owner != NULL);
68 
69     server = conn->owner;
70     conn->owner = NULL;
71 
72     ASSERT(server->ns_conn_q != 0);
73     server->ns_conn_q--;
74     TAILQ_REMOVE(&server->s_conn_q, conn, conn_tqe);
75 
76     log_debug(LOG_VVERB, "unref conn %p owner %p from '%.*s'", conn, server,
77               server->pname.len, server->pname.data);
78 }
79 
80 int
server_timeout(struct conn * conn)81 server_timeout(struct conn *conn)
82 {
83     struct server *server;
84     struct server_pool *pool;
85 
86     ASSERT(!conn->client && !conn->proxy);
87 
88     server = conn->owner;
89     pool = server->owner;
90 
91     return pool->timeout;
92 }
93 
94 bool
server_active(const struct conn * conn)95 server_active(const struct conn *conn)
96 {
97     ASSERT(!conn->client && !conn->proxy);
98 
99     if (!TAILQ_EMPTY(&conn->imsg_q)) {
100         log_debug(LOG_VVERB, "s %d is active", conn->sd);
101         return true;
102     }
103 
104     if (!TAILQ_EMPTY(&conn->omsg_q)) {
105         log_debug(LOG_VVERB, "s %d is active", conn->sd);
106         return true;
107     }
108 
109     if (conn->rmsg != NULL) {
110         log_debug(LOG_VVERB, "s %d is active", conn->sd);
111         return true;
112     }
113 
114     if (conn->smsg != NULL) {
115         log_debug(LOG_VVERB, "s %d is active", conn->sd);
116         return true;
117     }
118 
119     log_debug(LOG_VVERB, "s %d is inactive", conn->sd);
120 
121     return false;
122 }
123 
124 static rstatus_t
server_each_set_owner(void * elem,void * data)125 server_each_set_owner(void *elem, void *data)
126 {
127     struct server *s = elem;
128     struct server_pool *sp = data;
129 
130     s->owner = sp;
131 
132     return NC_OK;
133 }
134 
135 rstatus_t
server_init(struct array * server,struct array * conf_server,struct server_pool * sp)136 server_init(struct array *server, struct array *conf_server,
137             struct server_pool *sp)
138 {
139     rstatus_t status;
140     uint32_t nserver;
141 
142     nserver = array_n(conf_server);
143     ASSERT(nserver != 0);
144     ASSERT(array_n(server) == 0);
145 
146     status = array_init(server, nserver, sizeof(struct server));
147     if (status != NC_OK) {
148         return status;
149     }
150 
151     /* transform conf server to server */
152     status = array_each(conf_server, conf_server_each_transform, server);
153     if (status != NC_OK) {
154         server_deinit(server);
155         return status;
156     }
157     ASSERT(array_n(server) == nserver);
158 
159     /* set server owner */
160     status = array_each(server, server_each_set_owner, sp);
161     if (status != NC_OK) {
162         server_deinit(server);
163         return status;
164     }
165 
166     log_debug(LOG_DEBUG, "init %"PRIu32" servers in pool %"PRIu32" '%.*s'",
167               nserver, sp->idx, sp->name.len, sp->name.data);
168 
169     return NC_OK;
170 }
171 
172 void
server_deinit(struct array * server)173 server_deinit(struct array *server)
174 {
175     uint32_t i, nserver;
176 
177     for (i = 0, nserver = array_n(server); i < nserver; i++) {
178         struct server *s;
179 
180         s = array_pop(server);
181         ASSERT(TAILQ_EMPTY(&s->s_conn_q) && s->ns_conn_q == 0);
182     }
183     array_deinit(server);
184 }
185 
186 struct conn *
server_conn(struct server * server)187 server_conn(struct server *server)
188 {
189     struct server_pool *pool;
190     struct conn *conn;
191 
192     pool = server->owner;
193 
194     /*
195      * FIXME: handle multiple server connections per server and do load
196      * balancing on it. Support multiple algorithms for
197      * 'server_connections:' > 0 key
198      */
199 
200     if (server->ns_conn_q < pool->server_connections) {
201         return conn_get(server, false, pool->redis);
202     }
203     ASSERT(server->ns_conn_q == pool->server_connections);
204 
205     /*
206      * Pick a server connection from the head of the queue and insert
207      * it back into the tail of queue to maintain the lru order
208      */
209     conn = TAILQ_FIRST(&server->s_conn_q);
210     ASSERT(!conn->client && !conn->proxy);
211 
212     TAILQ_REMOVE(&server->s_conn_q, conn, conn_tqe);
213     TAILQ_INSERT_TAIL(&server->s_conn_q, conn, conn_tqe);
214 
215     return conn;
216 }
217 
218 static rstatus_t
server_each_preconnect(void * elem,void * data)219 server_each_preconnect(void *elem, void *data)
220 {
221     rstatus_t status;
222     struct server *server;
223     struct server_pool *pool;
224     struct conn *conn;
225 
226     server = elem;
227     pool = server->owner;
228 
229     conn = server_conn(server);
230     if (conn == NULL) {
231         return NC_ENOMEM;
232     }
233 
234     status = server_connect(pool->ctx, server, conn);
235     if (status != NC_OK) {
236         log_warn("connect to server '%.*s' failed, ignored: %s",
237                  server->pname.len, server->pname.data, strerror(errno));
238         server_close(pool->ctx, conn);
239     }
240 
241     return NC_OK;
242 }
243 
244 static rstatus_t
server_each_disconnect(void * elem,void * data)245 server_each_disconnect(void *elem, void *data)
246 {
247     struct server *server;
248     struct server_pool *pool;
249 
250     server = elem;
251     pool = server->owner;
252 
253     while (!TAILQ_EMPTY(&server->s_conn_q)) {
254         struct conn *conn;
255 
256         ASSERT(server->ns_conn_q > 0);
257 
258         conn = TAILQ_FIRST(&server->s_conn_q);
259         conn->close(pool->ctx, conn);
260     }
261 
262     return NC_OK;
263 }
264 
265 static void
server_failure(struct context * ctx,struct server * server)266 server_failure(struct context *ctx, struct server *server)
267 {
268     struct server_pool *pool = server->owner;
269     int64_t now, next;
270     rstatus_t status;
271 
272     if (!pool->auto_eject_hosts) {
273         return;
274     }
275 
276     server->failure_count++;
277 
278     log_debug(LOG_VERB, "server '%.*s' failure count %"PRIu32" limit %"PRIu32,
279               server->pname.len, server->pname.data, server->failure_count,
280               pool->server_failure_limit);
281 
282     if (server->failure_count < pool->server_failure_limit) {
283         return;
284     }
285 
286     now = nc_usec_now();
287     if (now < 0) {
288         return;
289     }
290 
291     stats_server_set_ts(ctx, server, server_ejected_at, now);
292 
293     next = now + pool->server_retry_timeout;
294 
295     log_debug(LOG_INFO, "update pool %"PRIu32" '%.*s' to delete server '%.*s' "
296               "for next %"PRId64" secs", pool->idx, pool->name.len,
297               pool->name.data, server->pname.len, server->pname.data,
298               pool->server_retry_timeout / 1000 / 1000);
299 
300     stats_pool_incr(ctx, pool, server_ejects);
301 
302     server->failure_count = 0;
303     server->next_retry = next;
304 
305     status = server_pool_run(pool);
306     if (status != NC_OK) {
307         log_error("updating pool %"PRIu32" '%.*s' failed: %s", pool->idx,
308                   pool->name.len, pool->name.data, strerror(errno));
309     }
310 }
311 
312 static void
server_close_stats(struct context * ctx,struct server * server,err_t err,unsigned eof,unsigned connected)313 server_close_stats(struct context *ctx, struct server *server, err_t err,
314                    unsigned eof, unsigned connected)
315 {
316     if (connected) {
317         stats_server_decr(ctx, server, server_connections);
318     }
319 
320     if (eof) {
321         stats_server_incr(ctx, server, server_eof);
322         return;
323     }
324 
325     switch (err) {
326     case ETIMEDOUT:
327         stats_server_incr(ctx, server, server_timedout);
328         break;
329     case EPIPE:
330     case ECONNRESET:
331     case ECONNABORTED:
332     case ECONNREFUSED:
333     case ENOTCONN:
334     case ENETDOWN:
335     case ENETUNREACH:
336     case EHOSTDOWN:
337     case EHOSTUNREACH:
338     default:
339         stats_server_incr(ctx, server, server_err);
340         break;
341     }
342 }
343 
344 void
server_close(struct context * ctx,struct conn * conn)345 server_close(struct context *ctx, struct conn *conn)
346 {
347     rstatus_t status;
348     struct msg *msg, *nmsg; /* current and next message */
349     struct conn *c_conn;    /* peer client connection */
350 
351     ASSERT(!conn->client && !conn->proxy);
352 
353     server_close_stats(ctx, conn->owner, conn->err, conn->eof,
354                        conn->connected);
355 
356     conn->connected = false;
357 
358     if (conn->sd < 0) {
359         server_failure(ctx, conn->owner);
360         conn->unref(conn);
361         conn_put(conn);
362         return;
363     }
364 
365     for (msg = TAILQ_FIRST(&conn->imsg_q); msg != NULL; msg = nmsg) {
366         nmsg = TAILQ_NEXT(msg, s_tqe);
367 
368         /* dequeue the message (request) from server inq */
369         conn->dequeue_inq(ctx, conn, msg);
370 
371         /*
372          * Don't send any error response, if
373          * 1. request is tagged as noreply or,
374          * 2. client has already closed its connection
375          */
376         if (msg->swallow || msg->noreply) {
377             log_debug(LOG_INFO, "close s %d swallow req %"PRIu64" len %"PRIu32
378                       " type %d", conn->sd, msg->id, msg->mlen, msg->type);
379             req_put(msg);
380         } else {
381             c_conn = msg->owner;
382             ASSERT(c_conn->client && !c_conn->proxy);
383 
384             msg->done = 1;
385             msg->error = 1;
386             msg->err = conn->err;
387 
388             if (msg->frag_owner != NULL) {
389                 msg->frag_owner->nfrag_done++;
390             }
391 
392             if (req_done(c_conn, TAILQ_FIRST(&c_conn->omsg_q))) {
393                 event_add_out(ctx->evb, msg->owner);
394             }
395 
396             log_debug(LOG_INFO, "close s %d schedule error for req %"PRIu64" "
397                       "len %"PRIu32" type %d from c %d%c %s", conn->sd, msg->id,
398                       msg->mlen, msg->type, c_conn->sd, conn->err ? ':' : ' ',
399                       conn->err ? strerror(conn->err): " ");
400         }
401     }
402     ASSERT(TAILQ_EMPTY(&conn->imsg_q));
403 
404     for (msg = TAILQ_FIRST(&conn->omsg_q); msg != NULL; msg = nmsg) {
405         nmsg = TAILQ_NEXT(msg, s_tqe);
406 
407         /* dequeue the message (request) from server outq */
408         conn->dequeue_outq(ctx, conn, msg);
409 
410         if (msg->swallow) {
411             log_debug(LOG_INFO, "close s %d swallow req %"PRIu64" len %"PRIu32
412                       " type %d", conn->sd, msg->id, msg->mlen, msg->type);
413             req_put(msg);
414         } else {
415             c_conn = msg->owner;
416             ASSERT(c_conn->client && !c_conn->proxy);
417 
418             msg->done = 1;
419             msg->error = 1;
420             msg->err = conn->err;
421             if (msg->frag_owner != NULL) {
422                 msg->frag_owner->nfrag_done++;
423             }
424 
425             if (req_done(c_conn, TAILQ_FIRST(&c_conn->omsg_q))) {
426                 event_add_out(ctx->evb, msg->owner);
427             }
428 
429             log_debug(LOG_INFO, "close s %d schedule error for req %"PRIu64" "
430                       "len %"PRIu32" type %d from c %d%c %s", conn->sd, msg->id,
431                       msg->mlen, msg->type, c_conn->sd, conn->err ? ':' : ' ',
432                       conn->err ? strerror(conn->err): " ");
433         }
434     }
435     ASSERT(TAILQ_EMPTY(&conn->omsg_q));
436 
437     msg = conn->rmsg;
438     if (msg != NULL) {
439         conn->rmsg = NULL;
440 
441         ASSERT(!msg->request);
442         ASSERT(msg->peer == NULL);
443 
444         rsp_put(msg);
445 
446         log_debug(LOG_INFO, "close s %d discarding rsp %"PRIu64" len %"PRIu32" "
447                   "in error", conn->sd, msg->id, msg->mlen);
448     }
449 
450     ASSERT(conn->smsg == NULL);
451 
452     server_failure(ctx, conn->owner);
453 
454     conn->unref(conn);
455 
456     status = close(conn->sd);
457     if (status < 0) {
458         log_error("close s %d failed, ignored: %s", conn->sd, strerror(errno));
459     }
460     conn->sd = -1;
461 
462     conn_put(conn);
463 }
464 
465 rstatus_t
server_connect(struct context * ctx,struct server * server,struct conn * conn)466 server_connect(struct context *ctx, struct server *server, struct conn *conn)
467 {
468     rstatus_t status;
469 
470     ASSERT(!conn->client && !conn->proxy);
471 
472     if (conn->err) {
473       ASSERT(conn->done && conn->sd < 0);
474       errno = conn->err;
475       return NC_ERROR;
476     }
477 
478     if (conn->sd > 0) {
479         /* already connected on server connection */
480         return NC_OK;
481     }
482 
483     log_debug(LOG_VVERB, "connect to server '%.*s'", server->pname.len,
484               server->pname.data);
485 
486     conn->sd = socket(conn->family, SOCK_STREAM, 0);
487     if (conn->sd < 0) {
488         log_error("socket for server '%.*s' failed: %s", server->pname.len,
489                   server->pname.data, strerror(errno));
490         status = NC_ERROR;
491         goto error;
492     }
493 
494     status = nc_set_nonblocking(conn->sd);
495     if (status != NC_OK) {
496         log_error("set nonblock on s %d for server '%.*s' failed: %s",
497                   conn->sd, server->pname.len, server->pname.data,
498                   strerror(errno));
499         goto error;
500     }
501 
502     if (server->pname.data[0] != '/') {
503         status = nc_set_tcpnodelay(conn->sd);
504         if (status != NC_OK) {
505             log_warn("set tcpnodelay on s %d for server '%.*s' failed, ignored: %s",
506                      conn->sd, server->pname.len, server->pname.data,
507                      strerror(errno));
508         }
509     }
510 
511     status = event_add_conn(ctx->evb, conn);
512     if (status != NC_OK) {
513         log_error("event add conn s %d for server '%.*s' failed: %s",
514                   conn->sd, server->pname.len, server->pname.data,
515                   strerror(errno));
516         goto error;
517     }
518 
519     ASSERT(!conn->connecting && !conn->connected);
520 
521     status = connect(conn->sd, conn->addr, conn->addrlen);
522     if (status != NC_OK) {
523         if (errno == EINPROGRESS) {
524             conn->connecting = 1;
525             log_debug(LOG_DEBUG, "connecting on s %d to server '%.*s'",
526                       conn->sd, server->pname.len, server->pname.data);
527             return NC_OK;
528         }
529 
530         log_error("connect on s %d to server '%.*s' failed: %s", conn->sd,
531                   server->pname.len, server->pname.data, strerror(errno));
532 
533         goto error;
534     }
535 
536     ASSERT(!conn->connecting);
537     conn->connected = 1;
538     log_debug(LOG_INFO, "connected on s %d to server '%.*s'", conn->sd,
539               server->pname.len, server->pname.data);
540 
541     return NC_OK;
542 
543 error:
544     conn->err = errno;
545     return status;
546 }
547 
548 void
server_connected(struct context * ctx,struct conn * conn)549 server_connected(struct context *ctx, struct conn *conn)
550 {
551     struct server *server = conn->owner;
552 
553     ASSERT(!conn->client && !conn->proxy);
554     ASSERT(conn->connecting && !conn->connected);
555 
556     stats_server_incr(ctx, server, server_connections);
557 
558     conn->connecting = 0;
559     conn->connected = 1;
560 
561     conn->post_connect(ctx, conn, server);
562 
563     log_debug(LOG_INFO, "connected on s %d to server '%.*s'", conn->sd,
564               server->pname.len, server->pname.data);
565 }
566 
567 void
server_ok(struct context * ctx,struct conn * conn)568 server_ok(struct context *ctx, struct conn *conn)
569 {
570     struct server *server = conn->owner;
571 
572     ASSERT(!conn->client && !conn->proxy);
573     ASSERT(conn->connected);
574 
575     if (server->failure_count != 0) {
576         log_debug(LOG_VERB, "reset server '%.*s' failure count from %"PRIu32
577                   " to 0", server->pname.len, server->pname.data,
578                   server->failure_count);
579         server->failure_count = 0;
580         server->next_retry = 0LL;
581     }
582 }
583 
584 static rstatus_t
server_pool_update(struct server_pool * pool)585 server_pool_update(struct server_pool *pool)
586 {
587     rstatus_t status;
588     int64_t now;
589     uint32_t pnlive_server; /* prev # live server */
590 
591     if (!pool->auto_eject_hosts) {
592         return NC_OK;
593     }
594 
595     if (pool->next_rebuild == 0LL) {
596         return NC_OK;
597     }
598 
599     now = nc_usec_now();
600     if (now < 0) {
601         return NC_ERROR;
602     }
603 
604     if (now <= pool->next_rebuild) {
605         if (pool->nlive_server == 0) {
606             errno = ECONNREFUSED;
607             return NC_ERROR;
608         }
609         return NC_OK;
610     }
611 
612     pnlive_server = pool->nlive_server;
613 
614     status = server_pool_run(pool);
615     if (status != NC_OK) {
616         log_error("updating pool %"PRIu32" with dist %d failed: %s", pool->idx,
617                   pool->dist_type, strerror(errno));
618         return status;
619     }
620 
621     log_debug(LOG_INFO, "update pool %"PRIu32" '%.*s' to add %"PRIu32" servers",
622               pool->idx, pool->name.len, pool->name.data,
623               pool->nlive_server - pnlive_server);
624 
625 
626     return NC_OK;
627 }
628 
629 static uint32_t
server_pool_hash(const struct server_pool * pool,const uint8_t * key,uint32_t keylen)630 server_pool_hash(const struct server_pool *pool, const uint8_t *key, uint32_t keylen)
631 {
632     ASSERT(array_n(&pool->server) != 0);
633     ASSERT(key != NULL);
634 
635     if (array_n(&pool->server) == 1) {
636         return 0;
637     }
638 
639     if (keylen == 0) {
640         return 0;
641     }
642 
643     return pool->key_hash((const char *)key, keylen);
644 }
645 
646 uint32_t
server_pool_idx(const struct server_pool * pool,const uint8_t * key,uint32_t keylen)647 server_pool_idx(const struct server_pool *pool, const uint8_t *key, uint32_t keylen)
648 {
649     uint32_t hash, idx;
650     uint32_t nserver = array_n(&pool->server);
651 
652     ASSERT(nserver != 0);
653     ASSERT(key != NULL);
654 
655     if (nserver == 1) {
656         /* Optimization: Skip hashing and dispatching for pools with only one server */
657         return 0;
658     }
659 
660     /*
661      * If hash_tag: is configured for this server pool, we use the part of
662      * the key within the hash tag as an input to the distributor. Otherwise
663      * we use the full key
664      */
665     if (!string_empty(&pool->hash_tag)) {
666         const struct string *tag = &pool->hash_tag;
667         const uint8_t *tag_start, *tag_end;
668 
669         tag_start = nc_strchr(key, key + keylen, tag->data[0]);
670         if (tag_start != NULL) {
671             tag_end = nc_strchr(tag_start + 1, key + keylen, tag->data[1]);
672             if ((tag_end != NULL) && (tag_end - tag_start > 1)) {
673                 key = tag_start + 1;
674                 keylen = (uint32_t)(tag_end - key);
675             }
676         }
677     }
678 
679     switch (pool->dist_type) {
680     case DIST_KETAMA:
681         hash = server_pool_hash(pool, key, keylen);
682         idx = ketama_dispatch(pool->continuum, pool->ncontinuum, hash);
683         break;
684 
685     case DIST_MODULA:
686         hash = server_pool_hash(pool, key, keylen);
687         idx = modula_dispatch(pool->continuum, pool->ncontinuum, hash);
688         break;
689 
690     case DIST_RANDOM:
691         idx = random_dispatch(pool->continuum, pool->ncontinuum, 0);
692         break;
693 
694     default:
695         NOT_REACHED();
696         return 0;
697     }
698     ASSERT(idx < array_n(&pool->server));
699     return idx;
700 }
701 
702 static struct server *
server_pool_server(struct server_pool * pool,const uint8_t * key,uint32_t keylen)703 server_pool_server(struct server_pool *pool, const uint8_t *key, uint32_t keylen)
704 {
705     struct server *server;
706     uint32_t idx;
707 
708     idx = server_pool_idx(pool, key, keylen);
709     server = array_get(&pool->server, idx);
710 
711     log_debug(LOG_VERB, "key '%.*s' on dist %d maps to server '%.*s'", keylen,
712               key, pool->dist_type, server->pname.len, server->pname.data);
713 
714     return server;
715 }
716 
717 struct conn *
server_pool_conn(struct context * ctx,struct server_pool * pool,const uint8_t * key,uint32_t keylen)718 server_pool_conn(struct context *ctx, struct server_pool *pool, const uint8_t *key,
719                  uint32_t keylen)
720 {
721     rstatus_t status;
722     struct server *server;
723     struct conn *conn;
724 
725     status = server_pool_update(pool);
726     if (status != NC_OK) {
727         return NULL;
728     }
729 
730     /* from a given {key, keylen} pick a server from pool */
731     server = server_pool_server(pool, key, keylen);
732     if (server == NULL) {
733         return NULL;
734     }
735 
736     /* pick a connection to a given server */
737     conn = server_conn(server);
738     if (conn == NULL) {
739         return NULL;
740     }
741 
742     status = server_connect(ctx, server, conn);
743     if (status != NC_OK) {
744         server_close(ctx, conn);
745         return NULL;
746     }
747 
748     return conn;
749 }
750 
751 static rstatus_t
server_pool_each_preconnect(void * elem,void * data)752 server_pool_each_preconnect(void *elem, void *data)
753 {
754     rstatus_t status;
755     struct server_pool *sp = elem;
756 
757     if (!sp->preconnect) {
758         return NC_OK;
759     }
760 
761     status = array_each(&sp->server, server_each_preconnect, NULL);
762     if (status != NC_OK) {
763         return status;
764     }
765 
766     return NC_OK;
767 }
768 
769 rstatus_t
server_pool_preconnect(struct context * ctx)770 server_pool_preconnect(struct context *ctx)
771 {
772     rstatus_t status;
773 
774     status = array_each(&ctx->pool, server_pool_each_preconnect, NULL);
775     if (status != NC_OK) {
776         return status;
777     }
778 
779     return NC_OK;
780 }
781 
782 static rstatus_t
server_pool_each_disconnect(void * elem,void * data)783 server_pool_each_disconnect(void *elem, void *data)
784 {
785     rstatus_t status;
786     struct server_pool *sp = elem;
787 
788     status = array_each(&sp->server, server_each_disconnect, NULL);
789     if (status != NC_OK) {
790         return status;
791     }
792 
793     return NC_OK;
794 }
795 
796 void
server_pool_disconnect(struct context * ctx)797 server_pool_disconnect(struct context *ctx)
798 {
799     array_each(&ctx->pool, server_pool_each_disconnect, NULL);
800 }
801 
802 static rstatus_t
server_pool_each_set_owner(void * elem,void * data)803 server_pool_each_set_owner(void *elem, void *data)
804 {
805     struct server_pool *sp = elem;
806     struct context *ctx = data;
807 
808     sp->ctx = ctx;
809 
810     return NC_OK;
811 }
812 
813 static rstatus_t
server_pool_each_calc_connections(void * elem,void * data)814 server_pool_each_calc_connections(void *elem, void *data)
815 {
816     struct server_pool *sp = elem;
817     struct context *ctx = data;
818 
819     ctx->max_nsconn += sp->server_connections * array_n(&sp->server);
820     ctx->max_nsconn += 1; /* pool listening socket */
821 
822     return NC_OK;
823 }
824 
825 rstatus_t
server_pool_run(struct server_pool * pool)826 server_pool_run(struct server_pool *pool)
827 {
828     ASSERT(array_n(&pool->server) != 0);
829 
830     switch (pool->dist_type) {
831     case DIST_KETAMA:
832         return ketama_update(pool);
833 
834     case DIST_MODULA:
835         return modula_update(pool);
836 
837     case DIST_RANDOM:
838         return random_update(pool);
839 
840     default:
841         NOT_REACHED();
842         return NC_ERROR;
843     }
844 
845     return NC_OK;
846 }
847 
848 static rstatus_t
server_pool_each_run(void * elem,void * data)849 server_pool_each_run(void *elem, void *data)
850 {
851     return server_pool_run(elem);
852 }
853 
854 rstatus_t
server_pool_init(struct array * server_pool,struct array * conf_pool,struct context * ctx)855 server_pool_init(struct array *server_pool, struct array *conf_pool,
856                  struct context *ctx)
857 {
858     rstatus_t status;
859     uint32_t npool;
860 
861     npool = array_n(conf_pool);
862     ASSERT(npool != 0);
863     ASSERT(array_n(server_pool) == 0);
864 
865     status = array_init(server_pool, npool, sizeof(struct server_pool));
866     if (status != NC_OK) {
867         return status;
868     }
869 
870     /* transform conf pool to server pool */
871     status = array_each(conf_pool, conf_pool_each_transform, server_pool);
872     if (status != NC_OK) {
873         server_pool_deinit(server_pool);
874         return status;
875     }
876     ASSERT(array_n(server_pool) == npool);
877 
878     /* set ctx as the server pool owner */
879     status = array_each(server_pool, server_pool_each_set_owner, ctx);
880     if (status != NC_OK) {
881         server_pool_deinit(server_pool);
882         return status;
883     }
884 
885     /* compute max server connections */
886     ctx->max_nsconn = 0;
887     status = array_each(server_pool, server_pool_each_calc_connections, ctx);
888     if (status != NC_OK) {
889         server_pool_deinit(server_pool);
890         return status;
891     }
892 
893     /* update server pool continuum */
894     status = array_each(server_pool, server_pool_each_run, NULL);
895     if (status != NC_OK) {
896         server_pool_deinit(server_pool);
897         return status;
898     }
899 
900     log_debug(LOG_DEBUG, "init %"PRIu32" pools", npool);
901 
902     return NC_OK;
903 }
904 
905 void
server_pool_deinit(struct array * server_pool)906 server_pool_deinit(struct array *server_pool)
907 {
908     uint32_t i, npool;
909 
910     for (i = 0, npool = array_n(server_pool); i < npool; i++) {
911         struct server_pool *sp;
912 
913         sp = array_pop(server_pool);
914         ASSERT(sp->p_conn == NULL);
915         ASSERT(TAILQ_EMPTY(&sp->c_conn_q) && sp->nc_conn_q == 0);
916 
917         if (sp->continuum != NULL) {
918             nc_free(sp->continuum);
919             sp->ncontinuum = 0;
920             sp->nserver_continuum = 0;
921             sp->nlive_server = 0;
922         }
923 
924         server_deinit(&sp->server);
925 
926         log_debug(LOG_DEBUG, "deinit pool %"PRIu32" '%.*s'", sp->idx,
927                   sp->name.len, sp->name.data);
928     }
929 
930     array_deinit(server_pool);
931 
932     log_debug(LOG_DEBUG, "deinit %"PRIu32" pools", npool);
933 }
934