1 /*
2 * twemproxy - A fast and lightweight proxy for memcached protocol.
3 * Copyright (C) 2011 Twitter, Inc.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 #include <stdlib.h>
19 #include <unistd.h>
20
21 #include <nc_core.h>
22 #include <nc_server.h>
23 #include <nc_conf.h>
24
25 static void
server_resolve(struct server * server,struct conn * conn)26 server_resolve(struct server *server, struct conn *conn)
27 {
28 rstatus_t status;
29
30 status = nc_resolve(&server->addrstr, server->port, &server->info);
31 if (status != NC_OK) {
32 conn->err = EHOSTDOWN;
33 conn->done = 1;
34 return;
35 }
36
37 conn->family = server->info.family;
38 conn->addrlen = server->info.addrlen;
39 conn->addr = (struct sockaddr *)&server->info.addr;
40 }
41
42 void
server_ref(struct conn * conn,void * owner)43 server_ref(struct conn *conn, void *owner)
44 {
45 struct server *server = owner;
46
47 ASSERT(!conn->client && !conn->proxy);
48 ASSERT(conn->owner == NULL);
49
50 server_resolve(server, conn);
51
52 server->ns_conn_q++;
53 TAILQ_INSERT_TAIL(&server->s_conn_q, conn, conn_tqe);
54
55 conn->owner = owner;
56
57 log_debug(LOG_VVERB, "ref conn %p owner %p into '%.*s", conn, server,
58 server->pname.len, server->pname.data);
59 }
60
61 void
server_unref(struct conn * conn)62 server_unref(struct conn *conn)
63 {
64 struct server *server;
65
66 ASSERT(!conn->client && !conn->proxy);
67 ASSERT(conn->owner != NULL);
68
69 server = conn->owner;
70 conn->owner = NULL;
71
72 ASSERT(server->ns_conn_q != 0);
73 server->ns_conn_q--;
74 TAILQ_REMOVE(&server->s_conn_q, conn, conn_tqe);
75
76 log_debug(LOG_VVERB, "unref conn %p owner %p from '%.*s'", conn, server,
77 server->pname.len, server->pname.data);
78 }
79
80 int
server_timeout(struct conn * conn)81 server_timeout(struct conn *conn)
82 {
83 struct server *server;
84 struct server_pool *pool;
85
86 ASSERT(!conn->client && !conn->proxy);
87
88 server = conn->owner;
89 pool = server->owner;
90
91 return pool->timeout;
92 }
93
94 bool
server_active(const struct conn * conn)95 server_active(const struct conn *conn)
96 {
97 ASSERT(!conn->client && !conn->proxy);
98
99 if (!TAILQ_EMPTY(&conn->imsg_q)) {
100 log_debug(LOG_VVERB, "s %d is active", conn->sd);
101 return true;
102 }
103
104 if (!TAILQ_EMPTY(&conn->omsg_q)) {
105 log_debug(LOG_VVERB, "s %d is active", conn->sd);
106 return true;
107 }
108
109 if (conn->rmsg != NULL) {
110 log_debug(LOG_VVERB, "s %d is active", conn->sd);
111 return true;
112 }
113
114 if (conn->smsg != NULL) {
115 log_debug(LOG_VVERB, "s %d is active", conn->sd);
116 return true;
117 }
118
119 log_debug(LOG_VVERB, "s %d is inactive", conn->sd);
120
121 return false;
122 }
123
124 static rstatus_t
server_each_set_owner(void * elem,void * data)125 server_each_set_owner(void *elem, void *data)
126 {
127 struct server *s = elem;
128 struct server_pool *sp = data;
129
130 s->owner = sp;
131
132 return NC_OK;
133 }
134
135 rstatus_t
server_init(struct array * server,struct array * conf_server,struct server_pool * sp)136 server_init(struct array *server, struct array *conf_server,
137 struct server_pool *sp)
138 {
139 rstatus_t status;
140 uint32_t nserver;
141
142 nserver = array_n(conf_server);
143 ASSERT(nserver != 0);
144 ASSERT(array_n(server) == 0);
145
146 status = array_init(server, nserver, sizeof(struct server));
147 if (status != NC_OK) {
148 return status;
149 }
150
151 /* transform conf server to server */
152 status = array_each(conf_server, conf_server_each_transform, server);
153 if (status != NC_OK) {
154 server_deinit(server);
155 return status;
156 }
157 ASSERT(array_n(server) == nserver);
158
159 /* set server owner */
160 status = array_each(server, server_each_set_owner, sp);
161 if (status != NC_OK) {
162 server_deinit(server);
163 return status;
164 }
165
166 log_debug(LOG_DEBUG, "init %"PRIu32" servers in pool %"PRIu32" '%.*s'",
167 nserver, sp->idx, sp->name.len, sp->name.data);
168
169 return NC_OK;
170 }
171
172 void
server_deinit(struct array * server)173 server_deinit(struct array *server)
174 {
175 uint32_t i, nserver;
176
177 for (i = 0, nserver = array_n(server); i < nserver; i++) {
178 struct server *s;
179
180 s = array_pop(server);
181 ASSERT(TAILQ_EMPTY(&s->s_conn_q) && s->ns_conn_q == 0);
182 }
183 array_deinit(server);
184 }
185
186 struct conn *
server_conn(struct server * server)187 server_conn(struct server *server)
188 {
189 struct server_pool *pool;
190 struct conn *conn;
191
192 pool = server->owner;
193
194 /*
195 * FIXME: handle multiple server connections per server and do load
196 * balancing on it. Support multiple algorithms for
197 * 'server_connections:' > 0 key
198 */
199
200 if (server->ns_conn_q < pool->server_connections) {
201 return conn_get(server, false, pool->redis);
202 }
203 ASSERT(server->ns_conn_q == pool->server_connections);
204
205 /*
206 * Pick a server connection from the head of the queue and insert
207 * it back into the tail of queue to maintain the lru order
208 */
209 conn = TAILQ_FIRST(&server->s_conn_q);
210 ASSERT(!conn->client && !conn->proxy);
211
212 TAILQ_REMOVE(&server->s_conn_q, conn, conn_tqe);
213 TAILQ_INSERT_TAIL(&server->s_conn_q, conn, conn_tqe);
214
215 return conn;
216 }
217
218 static rstatus_t
server_each_preconnect(void * elem,void * data)219 server_each_preconnect(void *elem, void *data)
220 {
221 rstatus_t status;
222 struct server *server;
223 struct server_pool *pool;
224 struct conn *conn;
225
226 server = elem;
227 pool = server->owner;
228
229 conn = server_conn(server);
230 if (conn == NULL) {
231 return NC_ENOMEM;
232 }
233
234 status = server_connect(pool->ctx, server, conn);
235 if (status != NC_OK) {
236 log_warn("connect to server '%.*s' failed, ignored: %s",
237 server->pname.len, server->pname.data, strerror(errno));
238 server_close(pool->ctx, conn);
239 }
240
241 return NC_OK;
242 }
243
244 static rstatus_t
server_each_disconnect(void * elem,void * data)245 server_each_disconnect(void *elem, void *data)
246 {
247 struct server *server;
248 struct server_pool *pool;
249
250 server = elem;
251 pool = server->owner;
252
253 while (!TAILQ_EMPTY(&server->s_conn_q)) {
254 struct conn *conn;
255
256 ASSERT(server->ns_conn_q > 0);
257
258 conn = TAILQ_FIRST(&server->s_conn_q);
259 conn->close(pool->ctx, conn);
260 }
261
262 return NC_OK;
263 }
264
265 static void
server_failure(struct context * ctx,struct server * server)266 server_failure(struct context *ctx, struct server *server)
267 {
268 struct server_pool *pool = server->owner;
269 int64_t now, next;
270 rstatus_t status;
271
272 if (!pool->auto_eject_hosts) {
273 return;
274 }
275
276 server->failure_count++;
277
278 log_debug(LOG_VERB, "server '%.*s' failure count %"PRIu32" limit %"PRIu32,
279 server->pname.len, server->pname.data, server->failure_count,
280 pool->server_failure_limit);
281
282 if (server->failure_count < pool->server_failure_limit) {
283 return;
284 }
285
286 now = nc_usec_now();
287 if (now < 0) {
288 return;
289 }
290
291 stats_server_set_ts(ctx, server, server_ejected_at, now);
292
293 next = now + pool->server_retry_timeout;
294
295 log_debug(LOG_INFO, "update pool %"PRIu32" '%.*s' to delete server '%.*s' "
296 "for next %"PRId64" secs", pool->idx, pool->name.len,
297 pool->name.data, server->pname.len, server->pname.data,
298 pool->server_retry_timeout / 1000 / 1000);
299
300 stats_pool_incr(ctx, pool, server_ejects);
301
302 server->failure_count = 0;
303 server->next_retry = next;
304
305 status = server_pool_run(pool);
306 if (status != NC_OK) {
307 log_error("updating pool %"PRIu32" '%.*s' failed: %s", pool->idx,
308 pool->name.len, pool->name.data, strerror(errno));
309 }
310 }
311
312 static void
server_close_stats(struct context * ctx,struct server * server,err_t err,unsigned eof,unsigned connected)313 server_close_stats(struct context *ctx, struct server *server, err_t err,
314 unsigned eof, unsigned connected)
315 {
316 if (connected) {
317 stats_server_decr(ctx, server, server_connections);
318 }
319
320 if (eof) {
321 stats_server_incr(ctx, server, server_eof);
322 return;
323 }
324
325 switch (err) {
326 case ETIMEDOUT:
327 stats_server_incr(ctx, server, server_timedout);
328 break;
329 case EPIPE:
330 case ECONNRESET:
331 case ECONNABORTED:
332 case ECONNREFUSED:
333 case ENOTCONN:
334 case ENETDOWN:
335 case ENETUNREACH:
336 case EHOSTDOWN:
337 case EHOSTUNREACH:
338 default:
339 stats_server_incr(ctx, server, server_err);
340 break;
341 }
342 }
343
344 void
server_close(struct context * ctx,struct conn * conn)345 server_close(struct context *ctx, struct conn *conn)
346 {
347 rstatus_t status;
348 struct msg *msg, *nmsg; /* current and next message */
349 struct conn *c_conn; /* peer client connection */
350
351 ASSERT(!conn->client && !conn->proxy);
352
353 server_close_stats(ctx, conn->owner, conn->err, conn->eof,
354 conn->connected);
355
356 conn->connected = false;
357
358 if (conn->sd < 0) {
359 server_failure(ctx, conn->owner);
360 conn->unref(conn);
361 conn_put(conn);
362 return;
363 }
364
365 for (msg = TAILQ_FIRST(&conn->imsg_q); msg != NULL; msg = nmsg) {
366 nmsg = TAILQ_NEXT(msg, s_tqe);
367
368 /* dequeue the message (request) from server inq */
369 conn->dequeue_inq(ctx, conn, msg);
370
371 /*
372 * Don't send any error response, if
373 * 1. request is tagged as noreply or,
374 * 2. client has already closed its connection
375 */
376 if (msg->swallow || msg->noreply) {
377 log_debug(LOG_INFO, "close s %d swallow req %"PRIu64" len %"PRIu32
378 " type %d", conn->sd, msg->id, msg->mlen, msg->type);
379 req_put(msg);
380 } else {
381 c_conn = msg->owner;
382 ASSERT(c_conn->client && !c_conn->proxy);
383
384 msg->done = 1;
385 msg->error = 1;
386 msg->err = conn->err;
387
388 if (msg->frag_owner != NULL) {
389 msg->frag_owner->nfrag_done++;
390 }
391
392 if (req_done(c_conn, TAILQ_FIRST(&c_conn->omsg_q))) {
393 event_add_out(ctx->evb, msg->owner);
394 }
395
396 log_debug(LOG_INFO, "close s %d schedule error for req %"PRIu64" "
397 "len %"PRIu32" type %d from c %d%c %s", conn->sd, msg->id,
398 msg->mlen, msg->type, c_conn->sd, conn->err ? ':' : ' ',
399 conn->err ? strerror(conn->err): " ");
400 }
401 }
402 ASSERT(TAILQ_EMPTY(&conn->imsg_q));
403
404 for (msg = TAILQ_FIRST(&conn->omsg_q); msg != NULL; msg = nmsg) {
405 nmsg = TAILQ_NEXT(msg, s_tqe);
406
407 /* dequeue the message (request) from server outq */
408 conn->dequeue_outq(ctx, conn, msg);
409
410 if (msg->swallow) {
411 log_debug(LOG_INFO, "close s %d swallow req %"PRIu64" len %"PRIu32
412 " type %d", conn->sd, msg->id, msg->mlen, msg->type);
413 req_put(msg);
414 } else {
415 c_conn = msg->owner;
416 ASSERT(c_conn->client && !c_conn->proxy);
417
418 msg->done = 1;
419 msg->error = 1;
420 msg->err = conn->err;
421 if (msg->frag_owner != NULL) {
422 msg->frag_owner->nfrag_done++;
423 }
424
425 if (req_done(c_conn, TAILQ_FIRST(&c_conn->omsg_q))) {
426 event_add_out(ctx->evb, msg->owner);
427 }
428
429 log_debug(LOG_INFO, "close s %d schedule error for req %"PRIu64" "
430 "len %"PRIu32" type %d from c %d%c %s", conn->sd, msg->id,
431 msg->mlen, msg->type, c_conn->sd, conn->err ? ':' : ' ',
432 conn->err ? strerror(conn->err): " ");
433 }
434 }
435 ASSERT(TAILQ_EMPTY(&conn->omsg_q));
436
437 msg = conn->rmsg;
438 if (msg != NULL) {
439 conn->rmsg = NULL;
440
441 ASSERT(!msg->request);
442 ASSERT(msg->peer == NULL);
443
444 rsp_put(msg);
445
446 log_debug(LOG_INFO, "close s %d discarding rsp %"PRIu64" len %"PRIu32" "
447 "in error", conn->sd, msg->id, msg->mlen);
448 }
449
450 ASSERT(conn->smsg == NULL);
451
452 server_failure(ctx, conn->owner);
453
454 conn->unref(conn);
455
456 status = close(conn->sd);
457 if (status < 0) {
458 log_error("close s %d failed, ignored: %s", conn->sd, strerror(errno));
459 }
460 conn->sd = -1;
461
462 conn_put(conn);
463 }
464
465 rstatus_t
server_connect(struct context * ctx,struct server * server,struct conn * conn)466 server_connect(struct context *ctx, struct server *server, struct conn *conn)
467 {
468 rstatus_t status;
469
470 ASSERT(!conn->client && !conn->proxy);
471
472 if (conn->err) {
473 ASSERT(conn->done && conn->sd < 0);
474 errno = conn->err;
475 return NC_ERROR;
476 }
477
478 if (conn->sd > 0) {
479 /* already connected on server connection */
480 return NC_OK;
481 }
482
483 log_debug(LOG_VVERB, "connect to server '%.*s'", server->pname.len,
484 server->pname.data);
485
486 conn->sd = socket(conn->family, SOCK_STREAM, 0);
487 if (conn->sd < 0) {
488 log_error("socket for server '%.*s' failed: %s", server->pname.len,
489 server->pname.data, strerror(errno));
490 status = NC_ERROR;
491 goto error;
492 }
493
494 status = nc_set_nonblocking(conn->sd);
495 if (status != NC_OK) {
496 log_error("set nonblock on s %d for server '%.*s' failed: %s",
497 conn->sd, server->pname.len, server->pname.data,
498 strerror(errno));
499 goto error;
500 }
501
502 if (server->pname.data[0] != '/') {
503 status = nc_set_tcpnodelay(conn->sd);
504 if (status != NC_OK) {
505 log_warn("set tcpnodelay on s %d for server '%.*s' failed, ignored: %s",
506 conn->sd, server->pname.len, server->pname.data,
507 strerror(errno));
508 }
509 }
510
511 status = event_add_conn(ctx->evb, conn);
512 if (status != NC_OK) {
513 log_error("event add conn s %d for server '%.*s' failed: %s",
514 conn->sd, server->pname.len, server->pname.data,
515 strerror(errno));
516 goto error;
517 }
518
519 ASSERT(!conn->connecting && !conn->connected);
520
521 status = connect(conn->sd, conn->addr, conn->addrlen);
522 if (status != NC_OK) {
523 if (errno == EINPROGRESS) {
524 conn->connecting = 1;
525 log_debug(LOG_DEBUG, "connecting on s %d to server '%.*s'",
526 conn->sd, server->pname.len, server->pname.data);
527 return NC_OK;
528 }
529
530 log_error("connect on s %d to server '%.*s' failed: %s", conn->sd,
531 server->pname.len, server->pname.data, strerror(errno));
532
533 goto error;
534 }
535
536 ASSERT(!conn->connecting);
537 conn->connected = 1;
538 log_debug(LOG_INFO, "connected on s %d to server '%.*s'", conn->sd,
539 server->pname.len, server->pname.data);
540
541 return NC_OK;
542
543 error:
544 conn->err = errno;
545 return status;
546 }
547
548 void
server_connected(struct context * ctx,struct conn * conn)549 server_connected(struct context *ctx, struct conn *conn)
550 {
551 struct server *server = conn->owner;
552
553 ASSERT(!conn->client && !conn->proxy);
554 ASSERT(conn->connecting && !conn->connected);
555
556 stats_server_incr(ctx, server, server_connections);
557
558 conn->connecting = 0;
559 conn->connected = 1;
560
561 conn->post_connect(ctx, conn, server);
562
563 log_debug(LOG_INFO, "connected on s %d to server '%.*s'", conn->sd,
564 server->pname.len, server->pname.data);
565 }
566
567 void
server_ok(struct context * ctx,struct conn * conn)568 server_ok(struct context *ctx, struct conn *conn)
569 {
570 struct server *server = conn->owner;
571
572 ASSERT(!conn->client && !conn->proxy);
573 ASSERT(conn->connected);
574
575 if (server->failure_count != 0) {
576 log_debug(LOG_VERB, "reset server '%.*s' failure count from %"PRIu32
577 " to 0", server->pname.len, server->pname.data,
578 server->failure_count);
579 server->failure_count = 0;
580 server->next_retry = 0LL;
581 }
582 }
583
584 static rstatus_t
server_pool_update(struct server_pool * pool)585 server_pool_update(struct server_pool *pool)
586 {
587 rstatus_t status;
588 int64_t now;
589 uint32_t pnlive_server; /* prev # live server */
590
591 if (!pool->auto_eject_hosts) {
592 return NC_OK;
593 }
594
595 if (pool->next_rebuild == 0LL) {
596 return NC_OK;
597 }
598
599 now = nc_usec_now();
600 if (now < 0) {
601 return NC_ERROR;
602 }
603
604 if (now <= pool->next_rebuild) {
605 if (pool->nlive_server == 0) {
606 errno = ECONNREFUSED;
607 return NC_ERROR;
608 }
609 return NC_OK;
610 }
611
612 pnlive_server = pool->nlive_server;
613
614 status = server_pool_run(pool);
615 if (status != NC_OK) {
616 log_error("updating pool %"PRIu32" with dist %d failed: %s", pool->idx,
617 pool->dist_type, strerror(errno));
618 return status;
619 }
620
621 log_debug(LOG_INFO, "update pool %"PRIu32" '%.*s' to add %"PRIu32" servers",
622 pool->idx, pool->name.len, pool->name.data,
623 pool->nlive_server - pnlive_server);
624
625
626 return NC_OK;
627 }
628
629 static uint32_t
server_pool_hash(const struct server_pool * pool,const uint8_t * key,uint32_t keylen)630 server_pool_hash(const struct server_pool *pool, const uint8_t *key, uint32_t keylen)
631 {
632 ASSERT(array_n(&pool->server) != 0);
633 ASSERT(key != NULL);
634
635 if (array_n(&pool->server) == 1) {
636 return 0;
637 }
638
639 if (keylen == 0) {
640 return 0;
641 }
642
643 return pool->key_hash((const char *)key, keylen);
644 }
645
646 uint32_t
server_pool_idx(const struct server_pool * pool,const uint8_t * key,uint32_t keylen)647 server_pool_idx(const struct server_pool *pool, const uint8_t *key, uint32_t keylen)
648 {
649 uint32_t hash, idx;
650 uint32_t nserver = array_n(&pool->server);
651
652 ASSERT(nserver != 0);
653 ASSERT(key != NULL);
654
655 if (nserver == 1) {
656 /* Optimization: Skip hashing and dispatching for pools with only one server */
657 return 0;
658 }
659
660 /*
661 * If hash_tag: is configured for this server pool, we use the part of
662 * the key within the hash tag as an input to the distributor. Otherwise
663 * we use the full key
664 */
665 if (!string_empty(&pool->hash_tag)) {
666 const struct string *tag = &pool->hash_tag;
667 const uint8_t *tag_start, *tag_end;
668
669 tag_start = nc_strchr(key, key + keylen, tag->data[0]);
670 if (tag_start != NULL) {
671 tag_end = nc_strchr(tag_start + 1, key + keylen, tag->data[1]);
672 if ((tag_end != NULL) && (tag_end - tag_start > 1)) {
673 key = tag_start + 1;
674 keylen = (uint32_t)(tag_end - key);
675 }
676 }
677 }
678
679 switch (pool->dist_type) {
680 case DIST_KETAMA:
681 hash = server_pool_hash(pool, key, keylen);
682 idx = ketama_dispatch(pool->continuum, pool->ncontinuum, hash);
683 break;
684
685 case DIST_MODULA:
686 hash = server_pool_hash(pool, key, keylen);
687 idx = modula_dispatch(pool->continuum, pool->ncontinuum, hash);
688 break;
689
690 case DIST_RANDOM:
691 idx = random_dispatch(pool->continuum, pool->ncontinuum, 0);
692 break;
693
694 default:
695 NOT_REACHED();
696 return 0;
697 }
698 ASSERT(idx < array_n(&pool->server));
699 return idx;
700 }
701
702 static struct server *
server_pool_server(struct server_pool * pool,const uint8_t * key,uint32_t keylen)703 server_pool_server(struct server_pool *pool, const uint8_t *key, uint32_t keylen)
704 {
705 struct server *server;
706 uint32_t idx;
707
708 idx = server_pool_idx(pool, key, keylen);
709 server = array_get(&pool->server, idx);
710
711 log_debug(LOG_VERB, "key '%.*s' on dist %d maps to server '%.*s'", keylen,
712 key, pool->dist_type, server->pname.len, server->pname.data);
713
714 return server;
715 }
716
717 struct conn *
server_pool_conn(struct context * ctx,struct server_pool * pool,const uint8_t * key,uint32_t keylen)718 server_pool_conn(struct context *ctx, struct server_pool *pool, const uint8_t *key,
719 uint32_t keylen)
720 {
721 rstatus_t status;
722 struct server *server;
723 struct conn *conn;
724
725 status = server_pool_update(pool);
726 if (status != NC_OK) {
727 return NULL;
728 }
729
730 /* from a given {key, keylen} pick a server from pool */
731 server = server_pool_server(pool, key, keylen);
732 if (server == NULL) {
733 return NULL;
734 }
735
736 /* pick a connection to a given server */
737 conn = server_conn(server);
738 if (conn == NULL) {
739 return NULL;
740 }
741
742 status = server_connect(ctx, server, conn);
743 if (status != NC_OK) {
744 server_close(ctx, conn);
745 return NULL;
746 }
747
748 return conn;
749 }
750
751 static rstatus_t
server_pool_each_preconnect(void * elem,void * data)752 server_pool_each_preconnect(void *elem, void *data)
753 {
754 rstatus_t status;
755 struct server_pool *sp = elem;
756
757 if (!sp->preconnect) {
758 return NC_OK;
759 }
760
761 status = array_each(&sp->server, server_each_preconnect, NULL);
762 if (status != NC_OK) {
763 return status;
764 }
765
766 return NC_OK;
767 }
768
769 rstatus_t
server_pool_preconnect(struct context * ctx)770 server_pool_preconnect(struct context *ctx)
771 {
772 rstatus_t status;
773
774 status = array_each(&ctx->pool, server_pool_each_preconnect, NULL);
775 if (status != NC_OK) {
776 return status;
777 }
778
779 return NC_OK;
780 }
781
782 static rstatus_t
server_pool_each_disconnect(void * elem,void * data)783 server_pool_each_disconnect(void *elem, void *data)
784 {
785 rstatus_t status;
786 struct server_pool *sp = elem;
787
788 status = array_each(&sp->server, server_each_disconnect, NULL);
789 if (status != NC_OK) {
790 return status;
791 }
792
793 return NC_OK;
794 }
795
796 void
server_pool_disconnect(struct context * ctx)797 server_pool_disconnect(struct context *ctx)
798 {
799 array_each(&ctx->pool, server_pool_each_disconnect, NULL);
800 }
801
802 static rstatus_t
server_pool_each_set_owner(void * elem,void * data)803 server_pool_each_set_owner(void *elem, void *data)
804 {
805 struct server_pool *sp = elem;
806 struct context *ctx = data;
807
808 sp->ctx = ctx;
809
810 return NC_OK;
811 }
812
813 static rstatus_t
server_pool_each_calc_connections(void * elem,void * data)814 server_pool_each_calc_connections(void *elem, void *data)
815 {
816 struct server_pool *sp = elem;
817 struct context *ctx = data;
818
819 ctx->max_nsconn += sp->server_connections * array_n(&sp->server);
820 ctx->max_nsconn += 1; /* pool listening socket */
821
822 return NC_OK;
823 }
824
825 rstatus_t
server_pool_run(struct server_pool * pool)826 server_pool_run(struct server_pool *pool)
827 {
828 ASSERT(array_n(&pool->server) != 0);
829
830 switch (pool->dist_type) {
831 case DIST_KETAMA:
832 return ketama_update(pool);
833
834 case DIST_MODULA:
835 return modula_update(pool);
836
837 case DIST_RANDOM:
838 return random_update(pool);
839
840 default:
841 NOT_REACHED();
842 return NC_ERROR;
843 }
844
845 return NC_OK;
846 }
847
848 static rstatus_t
server_pool_each_run(void * elem,void * data)849 server_pool_each_run(void *elem, void *data)
850 {
851 return server_pool_run(elem);
852 }
853
854 rstatus_t
server_pool_init(struct array * server_pool,struct array * conf_pool,struct context * ctx)855 server_pool_init(struct array *server_pool, struct array *conf_pool,
856 struct context *ctx)
857 {
858 rstatus_t status;
859 uint32_t npool;
860
861 npool = array_n(conf_pool);
862 ASSERT(npool != 0);
863 ASSERT(array_n(server_pool) == 0);
864
865 status = array_init(server_pool, npool, sizeof(struct server_pool));
866 if (status != NC_OK) {
867 return status;
868 }
869
870 /* transform conf pool to server pool */
871 status = array_each(conf_pool, conf_pool_each_transform, server_pool);
872 if (status != NC_OK) {
873 server_pool_deinit(server_pool);
874 return status;
875 }
876 ASSERT(array_n(server_pool) == npool);
877
878 /* set ctx as the server pool owner */
879 status = array_each(server_pool, server_pool_each_set_owner, ctx);
880 if (status != NC_OK) {
881 server_pool_deinit(server_pool);
882 return status;
883 }
884
885 /* compute max server connections */
886 ctx->max_nsconn = 0;
887 status = array_each(server_pool, server_pool_each_calc_connections, ctx);
888 if (status != NC_OK) {
889 server_pool_deinit(server_pool);
890 return status;
891 }
892
893 /* update server pool continuum */
894 status = array_each(server_pool, server_pool_each_run, NULL);
895 if (status != NC_OK) {
896 server_pool_deinit(server_pool);
897 return status;
898 }
899
900 log_debug(LOG_DEBUG, "init %"PRIu32" pools", npool);
901
902 return NC_OK;
903 }
904
905 void
server_pool_deinit(struct array * server_pool)906 server_pool_deinit(struct array *server_pool)
907 {
908 uint32_t i, npool;
909
910 for (i = 0, npool = array_n(server_pool); i < npool; i++) {
911 struct server_pool *sp;
912
913 sp = array_pop(server_pool);
914 ASSERT(sp->p_conn == NULL);
915 ASSERT(TAILQ_EMPTY(&sp->c_conn_q) && sp->nc_conn_q == 0);
916
917 if (sp->continuum != NULL) {
918 nc_free(sp->continuum);
919 sp->ncontinuum = 0;
920 sp->nserver_continuum = 0;
921 sp->nlive_server = 0;
922 }
923
924 server_deinit(&sp->server);
925
926 log_debug(LOG_DEBUG, "deinit pool %"PRIu32" '%.*s'", sp->idx,
927 sp->name.len, sp->name.data);
928 }
929
930 array_deinit(server_pool);
931
932 log_debug(LOG_DEBUG, "deinit %"PRIu32" pools", npool);
933 }
934