1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3 * Thread management for memcached.
4 */
5 #include "config.h"
6 #include "memcached.h"
7 #include <assert.h>
8 #include <stdio.h>
9 #include <errno.h>
10 #include <stdlib.h>
11 #include <errno.h>
12 #include <string.h>
13 #include <stdint.h>
14 #include <signal.h>
15 #include <pthread.h>
16 #include <fcntl.h>
17
/* Number of CQ_ITEMs carved out of a single malloc() by cqi_new(). */
#define ITEMS_PER_ALLOC 64

/* Enables the InnoDB-memcached specific teardown code guarded by
 * #ifdef INNODB_MEMCACHED in this file (event base / dispatcher pipe). */
#define INNODB_MEMCACHED

/* NOTE(review): not referenced in this part of the file; presumably
 * provided by the hosting MySQL runtime — confirm before removing. */
extern void my_thread_end();
extern void my_thread_init();

/* Scratch buffer the notification-pipe callbacks recv() into; its contents
 * are discarded, so sharing it between threads is harmless. */
static char devnull[8192];
/* Shutdown flag defined elsewhere; when set, the pipe callbacks break
 * their thread's event loop. */
extern volatile sig_atomic_t memcached_shutdown;
27
/* An item in the connection queue: a socket handed from the dispatcher to
 * a worker thread together with the arguments conn_new() needs for it. */
typedef struct conn_queue_item CQ_ITEM;
struct conn_queue_item {
    SOCKET sfd;                         /* socket to wrap in a connection */
    STATE_FUNC init_state;              /* initial connection state */
    int event_flags;                    /* libevent flags for the socket */
    int read_buffer_size;
    enum network_transport transport;
    CQ_ITEM *next;                      /* link in a queue or the freelist */
};

/* A connection queue: singly linked FIFO guarded by `lock`. */
typedef struct conn_queue CQ;
struct conn_queue {
    CQ_ITEM *head;
    CQ_ITEM *tail;
    pthread_mutex_t lock;
    pthread_cond_t cond;                /* signalled by cq_push() */
};
47
/* Connection lock around accepting new connections */
pthread_mutex_t conn_lock = PTHREAD_MUTEX_INITIALIZER;

/* Lock for global stats (taken by STATS_LOCK / STATS_UNLOCK) */
static pthread_mutex_t stats_lock;

/* Free list of CQ_ITEM structs, guarded by cqi_freelist_lock */
static CQ_ITEM *cqi_freelist;
static pthread_mutex_t cqi_freelist_lock;

/* Descriptor for the dispatcher (listening) thread; it has its own
 * notification pipe, created by setup_dispatcher(). */
static LIBEVENT_THREAD dispatcher_thread;

/*
 * Worker thread descriptors. thread_init() allocates nthreads entries: the
 * requested general workers followed by one TAP thread (the last entry,
 * also published as tap_thread). Each libevent instance has a wakeup pipe,
 * which other threads can use to signal that they've queued work for it.
 */
static int nthreads;
static LIBEVENT_THREAD *threads;
static pthread_t *thread_ids;
LIBEVENT_THREAD *tap_thread;

/*
 * Number of worker threads that have finished setting themselves up.
 * Guarded by init_lock; thread_init() waits on init_cond until it reaches
 * nthreads.
 */
static int init_count = 0;
static pthread_mutex_t init_lock;
static pthread_cond_t init_cond;


static void thread_libevent_process(int fd, short which, void *arg);
static void libevent_tap_process(int fd, short which, void *arg);
79
80 /*
81 * Initializes a connection queue.
82 */
cq_init(CQ * cq)83 static void cq_init(CQ *cq) {
84 pthread_mutex_init(&cq->lock, NULL);
85 pthread_cond_init(&cq->cond, NULL);
86 cq->head = NULL;
87 cq->tail = NULL;
88 }
89
90 /*
91 * Looks for an item on a connection queue, but doesn't block if there isn't
92 * one.
93 * Returns the item, or NULL if no item is available
94 */
cq_pop(CQ * cq)95 static CQ_ITEM *cq_pop(CQ *cq) {
96 CQ_ITEM *item;
97
98 pthread_mutex_lock(&cq->lock);
99 item = cq->head;
100 if (NULL != item) {
101 cq->head = item->next;
102 if (NULL == cq->head)
103 cq->tail = NULL;
104 }
105 pthread_mutex_unlock(&cq->lock);
106
107 return item;
108 }
109
110 /*
111 * Adds an item to a connection queue.
112 */
cq_push(CQ * cq,CQ_ITEM * item)113 static void cq_push(CQ *cq, CQ_ITEM *item) {
114 item->next = NULL;
115
116 pthread_mutex_lock(&cq->lock);
117 if (NULL == cq->tail)
118 cq->head = item;
119 else
120 cq->tail->next = item;
121 cq->tail = item;
122 pthread_cond_signal(&cq->cond);
123 pthread_mutex_unlock(&cq->lock);
124 }
125
126 /*
127 * Returns a fresh connection queue item.
128 */
cqi_new(void)129 static CQ_ITEM *cqi_new(void) {
130 CQ_ITEM *item = NULL;
131 pthread_mutex_lock(&cqi_freelist_lock);
132 if (cqi_freelist) {
133 item = cqi_freelist;
134 cqi_freelist = item->next;
135 }
136 pthread_mutex_unlock(&cqi_freelist_lock);
137
138 if (NULL == item) {
139 int i;
140
141 /* Allocate a bunch of items at once to reduce fragmentation */
142 item = malloc(sizeof(CQ_ITEM) * ITEMS_PER_ALLOC);
143 if (NULL == item)
144 return NULL;
145
146 /*
147 * Link together all the new items except the first one
148 * (which we'll return to the caller) for placement on
149 * the freelist.
150 */
151 for (i = 2; i < ITEMS_PER_ALLOC; i++)
152 item[i - 1].next = &item[i];
153
154 pthread_mutex_lock(&cqi_freelist_lock);
155 item[ITEMS_PER_ALLOC - 1].next = cqi_freelist;
156 cqi_freelist = &item[1];
157 pthread_mutex_unlock(&cqi_freelist_lock);
158 }
159
160 return item;
161 }
162
163
164 /*
165 * Frees a connection queue item (adds it to the freelist.)
166 */
cqi_free(CQ_ITEM * item)167 static void cqi_free(CQ_ITEM *item) {
168 pthread_mutex_lock(&cqi_freelist_lock);
169 item->next = cqi_freelist;
170 cqi_freelist = item;
171 pthread_mutex_unlock(&cqi_freelist_lock);
172 }
173
174
175 /*
176 * Creates a worker thread.
177 */
create_worker(void * (* func)(void *),void * arg,pthread_t * id)178 static void create_worker(void *(*func)(void *), void *arg, pthread_t *id) {
179 pthread_attr_t attr;
180 int ret;
181
182 pthread_attr_init(&attr);
183
184 if ((ret = pthread_create(id, &attr, func, arg)) != 0) {
185 settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
186 "Can't create thread: %s\n",
187 strerror(ret));
188 exit(1);
189 }
190 }
191
192 /****************************** LIBEVENT THREADS *****************************/
193
/*
 * Creates the wakeup socket pair for `me`: notify[0] is the read end
 * watched by the thread's event loop, notify[1] is written by
 * notify_thread(). Both ends are made non-blocking.
 *
 * Returns true on success. On failure the error is logged, any sockets
 * already created are closed, and false is returned.
 */
bool create_notification_pipe(LIBEVENT_THREAD *me)
{
    if (evutil_socketpair(SOCKETPAIR_AF, SOCK_STREAM, 0,
                          (void*)me->notify) == SOCKET_ERROR) {
        settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
                                        "Can't create notify pipe: %s",
                                        strerror(errno));
        return false;
    }

    for (int j = 0; j < 2; ++j) {
        int flags = 1;
        /* Best effort only: these options may not apply to every socket
         * pair family, so their results are deliberately ignored. */
        setsockopt(me->notify[j], IPPROTO_TCP,
                   TCP_NODELAY, (void *)&flags, sizeof(flags));
        setsockopt(me->notify[j], SOL_SOCKET,
                   SO_REUSEADDR, (void *)&flags, sizeof(flags));


        if (evutil_make_socket_nonblocking(me->notify[j]) == -1) {
            /* Capture errno before logging/closing can overwrite it. */
            int err = errno;
            settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
                                            "Failed to enable non-blocking: %s",
                                            strerror(err));
            /* Don't leak the socket pair we just created. */
            closesocket(me->notify[0]);
            closesocket(me->notify[1]);
            return false;
        }
    }
    return true;
}
221
setup_dispatcher(struct event_base * main_base,void (* dispatcher_callback)(int,short,void *))222 static void setup_dispatcher(struct event_base *main_base,
223 void (*dispatcher_callback)(int, short, void *))
224 {
225 memset(&dispatcher_thread, 0, sizeof(dispatcher_thread));
226 dispatcher_thread.type = DISPATCHER;
227 dispatcher_thread.base = main_base;
228 dispatcher_thread.thread_id = pthread_self();
229 if (!create_notification_pipe(&dispatcher_thread)) {
230 exit(1);
231 }
232 /* Listen for notifications from other threads */
233 event_set(&dispatcher_thread.notify_event, dispatcher_thread.notify[0],
234 EV_READ | EV_PERSIST, dispatcher_callback, &dispatcher_callback);
235 event_base_set(dispatcher_thread.base, &dispatcher_thread.notify_event);
236
237 if (event_add(&dispatcher_thread.notify_event, 0) == -1) {
238 settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
239 "Can't monitor libevent notify pipe\n");
240 exit(1);
241 }
242 }
243
244 /*
245 * Set up a thread's information.
246 */
setup_thread(LIBEVENT_THREAD * me,bool tap)247 static void setup_thread(LIBEVENT_THREAD *me, bool tap) {
248 me->type = tap ? TAP : GENERAL;
249 me->base = event_init();
250 if (! me->base) {
251 settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
252 "Can't allocate event base\n");
253 exit(1);
254 }
255
256 /* Listen for notifications from other threads */
257 event_set(&me->notify_event, me->notify[0],
258 EV_READ | EV_PERSIST,
259 tap ? libevent_tap_process : thread_libevent_process, me);
260 event_base_set(me->base, &me->notify_event);
261
262 if (event_add(&me->notify_event, 0) == -1) {
263 settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
264 "Can't monitor libevent notify pipe\n");
265 exit(1);
266 }
267
268 if (!tap) {
269 me->new_conn_queue = malloc(sizeof(struct conn_queue));
270 if (me->new_conn_queue == NULL) {
271 settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
272 "Failed to allocate memory for connection queue");
273 exit(EXIT_FAILURE);
274 }
275 cq_init(me->new_conn_queue);
276 }
277
278 if ((pthread_mutex_init(&me->mutex, NULL) != 0)) {
279 settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
280 "Failed to initialize mutex: %s\n",
281 strerror(errno));
282 exit(EXIT_FAILURE);
283 }
284
285 me->suffix_cache = cache_create("suffix", SUFFIX_SIZE, sizeof(char*),
286 NULL, NULL);
287 if (me->suffix_cache == NULL) {
288 settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
289 "Failed to create suffix cache\n");
290 exit(EXIT_FAILURE);
291 }
292 }
293
294 /*
295 * Worker thread: main event loop
296 */
worker_libevent(void * arg)297 static void *worker_libevent(void *arg) {
298 LIBEVENT_THREAD *me = arg;
299
300 /* Any per-thread setup can happen here; thread_init() will block until
301 * all threads have finished initializing.
302 */
303
304 pthread_mutex_lock(&init_lock);
305 init_count++;
306 pthread_cond_signal(&init_cond);
307 pthread_mutex_unlock(&init_lock);
308
309 event_base_loop(me->base, 0);
310 #ifdef INNODB_MEMCACHED
311 if (me->base)
312 event_base_free(me->base);
313 #endif
314
315 return NULL;
316 }
317
/*
 * Counts how many times connection `c` occurs in the singly linked list
 * starting at `list`. Returns 0 for an empty list or when `c` is absent.
 */
int number_of_pending(conn *c, conn *list) {
    int count = 0;
    conn *node = list;
    while (node != NULL) {
        if (node == c) {
            ++count;
        }
        node = node->next;
    }
    return count;
}
327
/*
 * Processes an incoming "handle a new connection" item. This is called when
 * input arrives on the libevent wakeup pipe of a GENERAL worker thread
 * (registered by setup_thread()). In order it:
 *   1. drains up to sizeof(devnull) bytes from the pipe;
 *   2. breaks the event loop and returns if memcached_shutdown is set;
 *   3. turns every queued CQ_ITEM into a connection via conn_new();
 *   4. detaches the thread's pending_io list under me->mutex and, for each
 *      connection on it, re-registers its event and runs its state machine.
 *
 * fd    - read end of the notification pipe
 * which - libevent event flags (not used)
 * arg   - the owning LIBEVENT_THREAD
 */
static void thread_libevent_process(int fd, short which, void *arg) {
    LIBEVENT_THREAD *me = arg;
    assert(me->type == GENERAL);
    CQ_ITEM *item;

    /* The bytes are only a wake-up signal; their content is discarded. */
    if (recv(fd, devnull, sizeof(devnull), 0) == -1) {
        if (settings.verbose > 0) {
            settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
                                            "Can't read from libevent pipe: %s\n",
                                            strerror(errno));
        }
    }

    if (memcached_shutdown) {
        event_base_loopbreak(me->base);
        return ;
    }

    /* Wrap each socket queued by dispatch_conn_new() in a connection. */
    while ((item = cq_pop(me->new_conn_queue)) != NULL) {
        conn *c = conn_new(item->sfd, item->init_state, item->event_flags,
                           item->read_buffer_size, item->transport, me->base,
                           NULL);
        if (c == NULL) {
            if (IS_UDP(item->transport)) {
                /* Failing to set up a UDP socket is treated as fatal. */
                settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
                         "Can't listen for events on UDP socket\n");
                exit(1);
            } else {
                if (settings.verbose > 0) {
                    settings.extensions.logger->log(EXTENSION_LOG_INFO, NULL,
                            "Can't listen for events on fd %d\n",
                            item->sfd);
                }
                closesocket(item->sfd);
            }
        } else {
            assert(c->thread == NULL);
            c->thread = me;
        }
        cqi_free(item);
    }

    /* Take the whole pending_io list under the lock, then work through
     * it without holding the lock. */
    pthread_mutex_lock(&me->mutex);
    conn* pending = me->pending_io;
    me->pending_io = NULL;
    pthread_mutex_unlock(&me->mutex);
    while (pending != NULL) {
        conn *c = pending;
        assert(me == c->thread);
        pending = pending->next;
        c->next = NULL;
        register_event(c, 0);
        /*
         * We don't want the thread to keep on serving all of the data
         * from the context of the notification pipe, so just let it
         * run one time to set up the correct mask in libevent
         */
        c->nevents = 1;
        /* c->nevents = settings.reqs_per_event; */
        while (c->state(c)) {
            /* do task */
        }
    }
}

/* Server clock defined elsewhere. libevent_tap_process() compares it with
 * me->last_checked so that each distinct value of current_time examines
 * pending closes at most once. */
extern volatile rel_time_t current_time;
398
/*
 * Reports whether the singly linked list starting at `c` loops back on
 * itself.
 *
 * A slow cursor advances one node per step while a fast cursor advances
 * two. The slow cursor is compared against both nodes the fast one
 * visits, and a match means the list has a cycle. Reaching a NULL link
 * means it does not. An empty list has no cycle.
 */
bool has_cycle(conn *c) {
    if (c == NULL) {
        return false;
    }

    conn *tortoise = c;
    conn *hare = c;
    for (;;) {
        conn *step1 = hare->next;
        if (step1 == NULL) {
            return false;
        }
        conn *step2 = step1->next;
        if (step2 == NULL) {
            return false;
        }
        if (tortoise == step1 || tortoise == step2) {
            return true;
        }
        hare = step2;
        tortoise = tortoise->next;
    }
}
413
/*
 * Returns true when `needle` is one of the nodes of the list headed by
 * `haystack`. Nodes are compared by pointer identity.
 */
bool list_contains(conn *haystack, conn *needle) {
    conn *cursor = haystack;
    while (cursor != NULL && cursor != needle) {
        cursor = cursor->next;
    }
    return cursor != NULL;
}
422
/*
 * Unlinks the first occurrence of `needle` from the list headed by
 * `haystack` and clears needle->next. Returns the (possibly new) list
 * head. A list that does not contain `needle` is returned unchanged.
 */
conn* list_remove(conn *haystack, conn *needle) {
    /* Walk the links themselves so that removing the head needs no
     * special case. */
    conn **link = &haystack;
    while (*link != NULL) {
        if (*link == needle) {
            *link = needle->next;
            needle->next = NULL;
            break;
        }
        link = &(*link)->next;
    }
    return haystack;
}
438
/*
 * Detaches up to `max_items` connections from the head of the list `*l`
 * and stores them in `dest`, in list order.
 *
 * Each detached connection gets its `next` cleared and
 * LIST_STATE_PROCESSING set; finalize_list() later clears the flag.
 * Afterwards `*l` points at the first connection that was not taken.
 *
 * dest      - array with room for at least `max_items` pointers
 * max_items - capacity of `dest`; 0 detaches nothing
 * l         - in/out pointer to the list head
 * Returns the number of connections written to `dest`.
 */
size_t list_to_array(conn **dest, size_t max_items, conn **l) {
    size_t n_items = 0;
    /* Bound by max_items itself. A `max_items - 1` bound would leave the
     * last slot unused, and for max_items == 0 it would wrap to SIZE_MAX
     * and overrun `dest`. */
    while (n_items < max_items && *l != NULL) {
        conn *c = *l;
        *l = c->next;
        c->next = NULL;
        c->list_state |= LIST_STATE_PROCESSING;
        dest[n_items++] = c;
    }
    return n_items;
}
449
/*
 * Adds `c` to `list`, which must be its own thread's pending_io or
 * pending_close list.
 *
 * If `c` is currently detached into a processing array (it has
 * LIST_STATE_PROCESSING set), it is not linked anywhere. Instead the
 * request is recorded in list_state, and finalize_list() honours it later.
 */
void enlist_conn(conn *c, conn **list) {
    LIBEVENT_THREAD *owner = c->thread;
    assert(list == &owner->pending_io || list == &owner->pending_close);

    if (c->list_state & LIST_STATE_PROCESSING) {
        if (list == &owner->pending_io) {
            c->list_state |= LIST_STATE_REQ_PENDING_IO;
        } else {
            c->list_state |= LIST_STATE_REQ_PENDING_CLOSE;
        }
        return;
    }

    /* A connection is on at most one list at a time. */
    assert(!list_contains(owner->pending_close, c));
    assert(!list_contains(owner->pending_io, c));
    assert(c->next == NULL);

    /* Push at the front. */
    c->next = *list;
    *list = c;

    assert(list_contains(*list, c));
    assert(!has_cycle(*list));
}
467
/*
 * Ends the processing phase for the first `items` connections of `list`,
 * an array previously filled by list_to_array(). For each connection it:
 *   - clears LIST_STATE_PROCESSING;
 *   - if the connection still has a socket, re-enlists it when a
 *     pending-IO request (checked first) or a pending-close request was
 *     recorded meanwhile;
 *   - resets list_state to 0.
 * The visible caller (libevent_tap_process) invokes this with the owning
 * thread locked.
 */
void finalize_list(conn **list, size_t items) {
    for (size_t idx = 0; idx < items; ++idx) {
        conn *c = list[idx];
        c->list_state &= ~LIST_STATE_PROCESSING;

        if (c->sfd == INVALID_SOCKET) {
            c->list_state = 0;
            continue;
        }

        if (c->list_state & LIST_STATE_REQ_PENDING_IO) {
            enlist_conn(c, &c->thread->pending_io);
        } else if (c->list_state & LIST_STATE_REQ_PENDING_CLOSE) {
            enlist_conn(c, &c->thread->pending_close);
        }
        c->list_state = 0;
    }
}
481
482
/*
 * Notification-pipe callback for the TAP thread (registered by
 * setup_thread()). In order it:
 *   1. drains up to sizeof(devnull) bytes from the pipe;
 *   2. breaks the event loop and returns if memcached_shutdown is set;
 *   3. with the thread locked, detaches up to max_items entries from
 *      pending_close (only when current_time has changed since the last
 *      check) and from pending_io into local arrays;
 *   4. without the lock, re-registers and runs each pending_io connection,
 *      then closes each pending_close connection whose refcount is 1 or
 *      asks again to close it later;
 *   5. with the thread locked, calls finalize_list() on both arrays.
 *
 * fd    - read end of the notification pipe
 * which - libevent event flags (not used)
 * arg   - the TAP LIBEVENT_THREAD
 */
static void libevent_tap_process(int fd, short which, void *arg) {
    LIBEVENT_THREAD *me = arg;
    assert(me->type == TAP);

    if (recv(fd, devnull, sizeof(devnull), 0) == -1) {
        if (settings.verbose > 0) {
            settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
                                            "Can't read from libevent pipe: %s\n",
                                            strerror(errno));
        }
    }

    if (memcached_shutdown) {
        event_base_loopbreak(me->base);
        return ;
    }

    // Do we have pending closes?
    // NOTE(review): a const size_t is not a C constant expression, so the
    // two arrays below are VLAs (2 * 256 pointers on the stack).
    const size_t max_items = 256;
    LOCK_THREAD(me);
    conn *pending_close[max_items];
    size_t n_pending_close = 0;

    // Examine pending closes at most once per distinct current_time value.
    if (me->pending_close && me->last_checked != current_time) {
        assert(!has_cycle(me->pending_close));
        me->last_checked = current_time;

        n_pending_close = list_to_array(pending_close, max_items,
                                        &me->pending_close);
    }

    // Now copy the pending IO buffer and run them...
    conn *pending_io[max_items];
    size_t n_items = list_to_array(pending_io, max_items, &me->pending_io);

    UNLOCK_THREAD(me);
    for (size_t i = 0; i < n_items; ++i) {
        conn *c = pending_io[i];

        assert(c->thread == me);

        // c->thread and me are the same object (asserted), so the lock
        // taken here is the one released just below; it only covers
        // the debug log.
        LOCK_THREAD(c->thread);
        assert(me == c->thread);
        settings.extensions.logger->log(EXTENSION_LOG_DEBUG, NULL,
                                        "Processing tap pending_io for %d\n", c->sfd);

        UNLOCK_THREAD(me);
        register_event(c, NULL);
        /*
         * We don't want the thread to keep on serving all of the data
         * from the context of the notification pipe, so just let it
         * run one time to set up the correct mask in libevent
         */
        c->nevents = 1;
        c->which = EV_WRITE;
        while (c->state(c)) {
            /* do task */
        }
    }

    /* Close any connections pending close */
    if (n_pending_close > 0) {
        for (size_t i = 0; i < n_pending_close; ++i) {
            conn *ce = pending_close[i];
            if (ce->refcount == 1) {
                settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
                                                "OK, time to nuke: %p\n",
                                                (void*)ce);
                assert(ce->next == NULL);
                // NOTE(review): finalize_list() below still reads and
                // writes ce after this call; confirm conn_close() does not
                // free or recycle the conn.
                conn_close(ce);
            } else {
                // ce still has LIST_STATE_PROCESSING set, so enlist_conn()
                // only records a pending-close request. finalize_list()
                // re-enlists it below.
                LOCK_THREAD(me);
                enlist_conn(ce, &me->pending_close);
                UNLOCK_THREAD(me);
            }
        }
    }

    LOCK_THREAD(me);
    finalize_list(pending_io, n_items);
    finalize_list(pending_close, n_pending_close);
    UNLOCK_THREAD(me);
}
566
/*
 * Returns true when the calling thread is the thread recorded in
 * thr->thread_id.
 */
static bool is_thread_me(LIBEVENT_THREAD *thr) {
    /* pthread_t is opaque (a struct under pthreads-win32), so compare with
     * pthread_equal() instead of == or by inspecting its fields. */
    return pthread_equal(pthread_self(), thr->thread_id) != 0;
}
575
/*
 * Called (per the log text, typically by the engine) to signal that a
 * blocked operation on the connection `cookie` has completed with
 * `status`.
 *
 * - A NULL cookie is logged and ignored.
 * - ENGINE_DISCONNECT on a TAP connection: the socket is closed right
 *   away, the connection is moved to conn_immediate_close, and the TAP
 *   thread is woken unless we are already running on it.
 * - Connections without a thread, or already closing, are ignored.
 * - Otherwise, under the thread lock, and only if the connection is still
 *   on the same thread and waiting (ewouldblock): the status is stored in
 *   aiostat. On ENGINE_DISCONNECT the connection is moved from pending_io
 *   to pending_close. Otherwise it is enlisted on pending_io if it is on
 *   neither list. The owning thread is notified when needed.
 */
void notify_io_complete(const void *cookie, ENGINE_ERROR_CODE status)
{
    if (cookie == NULL) {
        settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
                                        "notify_io_complete called without a valid cookie (status %x)\n",
                                        status);
        return ;
    }

    struct conn *conn = (struct conn *)cookie;

    settings.extensions.logger->log(EXTENSION_LOG_DEBUG, NULL,
                                    "Got notify from %d, status %x\n",
                                    conn->sfd, status);

    /*
    ** TROND:
    ** I changed the logic for the tap connections so that the core
    ** issues the ON_DISCONNECT call to the engine instead of trying
    ** to close the connection. Then it let's the engine have a grace
    ** period to call notify_io_complete if not it will go ahead and
    ** kill it.
    **
    */
    if (status == ENGINE_DISCONNECT && conn->thread == tap_thread) {
        LOCK_THREAD(conn->thread);
        if (conn->sfd != INVALID_SOCKET) {
            unregister_event(conn);
            safe_close(conn->sfd);
            conn->sfd = INVALID_SOCKET;
        }

        settings.extensions.logger->log(EXTENSION_LOG_DEBUG, NULL,
                                        "Immediate close of %p\n",
                                        conn);
        conn_set_state(conn, conn_immediate_close);

        if (!is_thread_me(conn->thread)) {
            /* kick the thread in the butt */
            notify_thread(conn->thread);
        }

        UNLOCK_THREAD(conn->thread);
        return;
    }

    /*
    ** There may be a race condition between the engine calling this
    ** function and the core closing the connection.
    ** Let's lock the connection structure (this might not be the
    ** correct one) and re-evaluate.
    */
    /* NOTE(review): conn->thread and conn->state are read here without any
     * lock; the re-check under the lock below guards only the thread and
     * ewouldblock, not the state. */
    LIBEVENT_THREAD *thr = conn->thread;
    if (thr == NULL || (conn->state == conn_closing ||
                        conn->state == conn_pending_close ||
                        conn->state == conn_immediate_close)) {
        return;
    }

    int notify = 0;

    LOCK_THREAD(thr);
    if (thr != conn->thread || !conn->ewouldblock) {
        // Ignore
        UNLOCK_THREAD(thr);
        return;
    }

    conn->aiostat = status;

    /* Move the connection to the closing state if the engine
     * wants it to be disconnected
     */
    if (status == ENGINE_DISCONNECT) {
        conn->state = conn_closing;
        notify = 1;
        thr->pending_io = list_remove(thr->pending_io, conn);
        if (number_of_pending(conn, thr->pending_close) == 0) {
            enlist_conn(conn, &thr->pending_close);
        }
    } else {
        if (number_of_pending(conn, thr->pending_io) +
            number_of_pending(conn, thr->pending_close) == 0) {
            /* Only an empty -> non-empty transition of pending_io needs a
             * wake-up. */
            if (thr->pending_io == NULL) {
                notify = 1;
            }
            enlist_conn(conn, &thr->pending_io);
        }
    }
    UNLOCK_THREAD(thr);

    /* kick the thread in the butt */
    if (notify) {
        notify_thread(thr);
    }
}
672
673 /* Which thread we assigned a connection to most recently. */
674 static int last_thread = -1;
675
676 /*
677 * Dispatches a new connection to another thread. This is only ever called
678 * from the main thread, either during initialization (for UDP) or because
679 * of an incoming connection.
680 */
dispatch_conn_new(SOCKET sfd,STATE_FUNC init_state,int event_flags,int read_buffer_size,enum network_transport transport)681 void dispatch_conn_new(SOCKET sfd, STATE_FUNC init_state, int event_flags,
682 int read_buffer_size, enum network_transport transport) {
683 CQ_ITEM *item = cqi_new();
684 int tid = (last_thread + 1) % settings.num_threads;
685
686 LIBEVENT_THREAD *thread = threads + tid;
687
688 last_thread = tid;
689
690 item->sfd = sfd;
691 item->init_state = init_state;
692 item->event_flags = event_flags;
693 item->read_buffer_size = read_buffer_size;
694 item->transport = transport;
695
696 cq_push(thread->new_conn_queue, item);
697
698 MEMCACHED_CONN_DISPATCH(sfd, (uintptr_t)thread->thread_id);
699 notify_thread(thread);
700 }
701
702 /*
703 * Returns true if this is the thread that listens for new TCP connections.
704 */
is_listen_thread()705 int is_listen_thread() {
706 #ifdef __WIN32__
707 pthread_t tid = pthread_self();
708 return(tid.p == dispatcher_thread.thread_id.p && tid.x == dispatcher_thread.thread_id.x);
709 #else
710 return pthread_self() == dispatcher_thread.thread_id;
711 #endif
712 }
713
notify_dispatcher(void)714 void notify_dispatcher(void) {
715 notify_thread(&dispatcher_thread);
716 }
717
718 /******************************* GLOBAL STATS ******************************/
719
STATS_LOCK()720 void STATS_LOCK() {
721 pthread_mutex_lock(&stats_lock);
722 }
723
STATS_UNLOCK()724 void STATS_UNLOCK() {
725 pthread_mutex_unlock(&stats_lock);
726 }
727
threadlocal_stats_clear(struct thread_stats * stats)728 void threadlocal_stats_clear(struct thread_stats *stats) {
729 stats->cmd_get = 0;
730 stats->get_misses = 0;
731 stats->delete_misses = 0;
732 stats->incr_misses = 0;
733 stats->decr_misses = 0;
734 stats->incr_hits = 0;
735 stats->decr_hits = 0;
736 stats->cas_misses = 0;
737 stats->bytes_written = 0;
738 stats->bytes_read = 0;
739 stats->cmd_flush = 0;
740 stats->conn_yields = 0;
741 stats->auth_cmds = 0;
742 stats->auth_errors = 0;
743
744 memset(stats->slab_stats, 0,
745 sizeof(struct slab_stats) * MAX_NUMBER_OF_SLAB_CLASSES);
746 }
747
threadlocal_stats_reset(struct thread_stats * thread_stats)748 void threadlocal_stats_reset(struct thread_stats *thread_stats) {
749 int ii;
750 for (ii = 0; ii < settings.num_threads; ++ii) {
751 pthread_mutex_lock(&thread_stats[ii].mutex);
752 threadlocal_stats_clear(&thread_stats[ii]);
753 pthread_mutex_unlock(&thread_stats[ii].mutex);
754 }
755 }
756
threadlocal_stats_aggregate(struct thread_stats * thread_stats,struct thread_stats * stats)757 void threadlocal_stats_aggregate(struct thread_stats *thread_stats, struct thread_stats *stats) {
758 int ii, sid;
759 for (ii = 0; ii < settings.num_threads; ++ii) {
760 pthread_mutex_lock(&thread_stats[ii].mutex);
761
762 stats->cmd_get += thread_stats[ii].cmd_get;
763 stats->get_misses += thread_stats[ii].get_misses;
764 stats->delete_misses += thread_stats[ii].delete_misses;
765 stats->decr_misses += thread_stats[ii].decr_misses;
766 stats->incr_misses += thread_stats[ii].incr_misses;
767 stats->decr_hits += thread_stats[ii].decr_hits;
768 stats->incr_hits += thread_stats[ii].incr_hits;
769 stats->cas_misses += thread_stats[ii].cas_misses;
770 stats->bytes_read += thread_stats[ii].bytes_read;
771 stats->bytes_written += thread_stats[ii].bytes_written;
772 stats->cmd_flush += thread_stats[ii].cmd_flush;
773 stats->conn_yields += thread_stats[ii].conn_yields;
774 stats->auth_cmds += thread_stats[ii].auth_cmds;
775 stats->auth_errors += thread_stats[ii].auth_errors;
776
777 for (sid = 0; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) {
778 stats->slab_stats[sid].cmd_set +=
779 thread_stats[ii].slab_stats[sid].cmd_set;
780 stats->slab_stats[sid].get_hits +=
781 thread_stats[ii].slab_stats[sid].get_hits;
782 stats->slab_stats[sid].delete_hits +=
783 thread_stats[ii].slab_stats[sid].delete_hits;
784 stats->slab_stats[sid].cas_hits +=
785 thread_stats[ii].slab_stats[sid].cas_hits;
786 stats->slab_stats[sid].cas_badval +=
787 thread_stats[ii].slab_stats[sid].cas_badval;
788 }
789
790 pthread_mutex_unlock(&thread_stats[ii].mutex);
791 }
792 }
793
slab_stats_aggregate(struct thread_stats * stats,struct slab_stats * out)794 void slab_stats_aggregate(struct thread_stats *stats, struct slab_stats *out) {
795 int sid;
796
797 out->cmd_set = 0;
798 out->get_hits = 0;
799 out->delete_hits = 0;
800 out->cas_hits = 0;
801 out->cas_badval = 0;
802
803 for (sid = 0; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) {
804 out->cmd_set += stats->slab_stats[sid].cmd_set;
805 out->get_hits += stats->slab_stats[sid].get_hits;
806 out->delete_hits += stats->slab_stats[sid].delete_hits;
807 out->cas_hits += stats->slab_stats[sid].cas_hits;
808 out->cas_badval += stats->slab_stats[sid].cas_badval;
809 }
810 }
811
812 /*
813 * Initializes the thread subsystem, creating various worker threads.
814 *
815 * nthreads Number of worker event handler threads to spawn
816 * main_base Event base for main thread
817 */
thread_init(int nthr,struct event_base * main_base,void (* dispatcher_callback)(int,short,void *))818 void thread_init(int nthr, struct event_base *main_base,
819 void (*dispatcher_callback)(int, short, void *)) {
820 int i;
821 nthreads = nthr + 1;
822
823 pthread_mutex_init(&stats_lock, NULL);
824 pthread_mutex_init(&init_lock, NULL);
825 pthread_cond_init(&init_cond, NULL);
826
827 pthread_mutex_init(&cqi_freelist_lock, NULL);
828 cqi_freelist = NULL;
829
830 threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));
831 if (! threads) {
832 settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
833 "Can't allocate thread descriptors: %s",
834 strerror(errno));
835 exit(1);
836 }
837 thread_ids = calloc(nthreads, sizeof(pthread_t));
838 if (! thread_ids) {
839 perror("Can't allocate thread descriptors");
840 exit(1);
841 }
842
843 setup_dispatcher(main_base, dispatcher_callback);
844
845 for (i = 0; i < nthreads; i++) {
846 if (!create_notification_pipe(&threads[i])) {
847 exit(1);
848 }
849 threads[i].index = i;
850
851 setup_thread(&threads[i], i == (nthreads - 1));
852 }
853
854 /* Create threads after we've done all the libevent setup. */
855 for (i = 0; i < nthreads; i++) {
856 create_worker(worker_libevent, &threads[i], &thread_ids[i]);
857 threads[i].thread_id = thread_ids[i];
858 }
859
860 tap_thread = &threads[nthreads - 1];
861
862 /* Wait for all the threads to set themselves up before returning. */
863 pthread_mutex_lock(&init_lock);
864 while (init_count < nthreads) {
865 pthread_cond_wait(&init_cond, &init_lock);
866 }
867 pthread_mutex_unlock(&init_lock);
868 }
869
threads_shutdown(void)870 void threads_shutdown(void)
871 {
872 for (int ii = 0; ii < nthreads; ++ii) {
873 notify_thread(&threads[ii]);
874 pthread_join(thread_ids[ii], NULL);
875 }
876 for (int ii = 0; ii < nthreads; ++ii) {
877 safe_close(threads[ii].notify[0]);
878 safe_close(threads[ii].notify[1]);
879 }
880
881 #ifdef INNODB_MEMCACHED
882 if (dispatcher_thread.notify[0])
883 closesocket(dispatcher_thread.notify[0]);
884 if (dispatcher_thread.notify[1])
885 closesocket(dispatcher_thread.notify[1]);
886 #endif
887 }
888
/*
 * Wakes `thread` by writing a single byte to its notification pipe. A
 * failed write is logged (with a TAP-specific message for the TAP thread)
 * and otherwise ignored.
 */
void notify_thread(LIBEVENT_THREAD *thread) {
    if (send(thread->notify[1], "", 1, 0) == 1) {
        return;
    }

    if (thread != tap_thread) {
        settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
                                        "Failed to notify thread: %s",
                                        strerror(errno));
    } else {
        settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
                                        "Failed to notify TAP thread: %s",
                                        strerror(errno));
    }
}
902