1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  * Thread management for memcached.
4  */
5 #include "config.h"
6 #include "memcached.h"
7 #include <assert.h>
8 #include <stdio.h>
9 #include <errno.h>
10 #include <stdlib.h>
11 #include <errno.h>
12 #include <string.h>
13 #include <stdint.h>
14 #include <signal.h>
15 #include <pthread.h>
16 #include <fcntl.h>
17 
/* Number of CQ_ITEM structs cqi_new() mallocs in one block when the freelist is empty. */
#define ITEMS_PER_ALLOC 64

/* Enables the InnoDB-memcached specific code guarded by #ifdef below
 * (freeing each worker's event base, closing the dispatcher's notify sockets). */
#define INNODB_MEMCACHED

/* Defined outside this file and not referenced in this part of it.
 * NOTE(review): presumably MySQL per-thread init/teardown hooks — confirm. */
extern void my_thread_end();
extern void my_thread_init();

/* Scratch buffer the notify handlers recv() into to drain wakeup bytes.
 * Its contents are never read; several event threads may write it at once. */
static char devnull[8192];
/* Shutdown flag defined elsewhere; when non-zero the notify handlers break
 * out of their event loops. */
extern volatile sig_atomic_t memcached_shutdown;
27 
/*
 * An item in the connection queue: the parameters dispatch_conn_new()
 * records for a new socket so the receiving worker can build the
 * connection with conn_new() in thread_libevent_process().
 */
typedef struct conn_queue_item CQ_ITEM;
struct conn_queue_item {
    SOCKET            sfd;              /* socket to wrap in a conn */
    STATE_FUNC        init_state;       /* initial state passed to conn_new() */
    int               event_flags;      /* libevent flags passed to conn_new() */
    int               read_buffer_size; /* passed through to conn_new() */
    enum network_transport     transport;  /* tested with IS_UDP() on failure */
    CQ_ITEM          *next;             /* link in a CQ or in cqi_freelist */
};
38 
/*
 * A connection queue: a mutex-protected singly linked FIFO of CQ_ITEMs
 * (cq_push() appends at tail, cq_pop() removes from head).
 * cq_push() signals cond, but no code visible here waits on it; consumers
 * poll with cq_pop() after being woken through their notify socket.
 */
typedef struct conn_queue CQ;
struct conn_queue {
    CQ_ITEM *head;
    CQ_ITEM *tail;
    pthread_mutex_t lock;
    pthread_cond_t  cond;
};
47 
/* Connection lock around accepting new connections.
 * NOTE(review): not referenced in this file; presumably used by the accept
 * path elsewhere — confirm. */
pthread_mutex_t conn_lock = PTHREAD_MUTEX_INITIALIZER;

/* Lock for global stats; taken via STATS_LOCK()/STATS_UNLOCK() and
 * initialized in thread_init(). */
static pthread_mutex_t stats_lock;

/* Free list of CQ_ITEM structs, protected by cqi_freelist_lock. */
static CQ_ITEM *cqi_freelist;
static pthread_mutex_t cqi_freelist_lock;

/* State for the thread that called thread_init() (it runs main_base). */
static LIBEVENT_THREAD dispatcher_thread;

/*
 * Event threads. Each libevent instance has a wakeup pipe (notify[]), which
 * other threads can use to signal that they've put a new connection on its
 * queue or queued pending I/O for it. threads[] and thread_ids[] hold
 * nthreads entries; the last entry is the TAP thread (tap_thread).
 */
static int nthreads;
static LIBEVENT_THREAD *threads;
static pthread_t *thread_ids;
LIBEVENT_THREAD *tap_thread;

/*
 * Number of worker threads that have finished setting themselves up.
 * Guarded by init_lock; thread_init() waits on init_cond until it reaches
 * nthreads.
 */
static int init_count = 0;
static pthread_mutex_t init_lock;
static pthread_cond_t init_cond;


/* Notify-pipe callbacks for general worker threads and the TAP thread. */
static void thread_libevent_process(int fd, short which, void *arg);
static void libevent_tap_process(int fd, short which, void *arg);
79 
80 /*
81  * Initializes a connection queue.
82  */
cq_init(CQ * cq)83 static void cq_init(CQ *cq) {
84     pthread_mutex_init(&cq->lock, NULL);
85     pthread_cond_init(&cq->cond, NULL);
86     cq->head = NULL;
87     cq->tail = NULL;
88 }
89 
90 /*
91  * Looks for an item on a connection queue, but doesn't block if there isn't
92  * one.
93  * Returns the item, or NULL if no item is available
94  */
cq_pop(CQ * cq)95 static CQ_ITEM *cq_pop(CQ *cq) {
96     CQ_ITEM *item;
97 
98     pthread_mutex_lock(&cq->lock);
99     item = cq->head;
100     if (NULL != item) {
101         cq->head = item->next;
102         if (NULL == cq->head)
103             cq->tail = NULL;
104     }
105     pthread_mutex_unlock(&cq->lock);
106 
107     return item;
108 }
109 
110 /*
111  * Adds an item to a connection queue.
112  */
cq_push(CQ * cq,CQ_ITEM * item)113 static void cq_push(CQ *cq, CQ_ITEM *item) {
114     item->next = NULL;
115 
116     pthread_mutex_lock(&cq->lock);
117     if (NULL == cq->tail)
118         cq->head = item;
119     else
120         cq->tail->next = item;
121     cq->tail = item;
122     pthread_cond_signal(&cq->cond);
123     pthread_mutex_unlock(&cq->lock);
124 }
125 
126 /*
127  * Returns a fresh connection queue item.
128  */
cqi_new(void)129 static CQ_ITEM *cqi_new(void) {
130     CQ_ITEM *item = NULL;
131     pthread_mutex_lock(&cqi_freelist_lock);
132     if (cqi_freelist) {
133         item = cqi_freelist;
134         cqi_freelist = item->next;
135     }
136     pthread_mutex_unlock(&cqi_freelist_lock);
137 
138     if (NULL == item) {
139         int i;
140 
141         /* Allocate a bunch of items at once to reduce fragmentation */
142         item = malloc(sizeof(CQ_ITEM) * ITEMS_PER_ALLOC);
143         if (NULL == item)
144             return NULL;
145 
146         /*
147          * Link together all the new items except the first one
148          * (which we'll return to the caller) for placement on
149          * the freelist.
150          */
151         for (i = 2; i < ITEMS_PER_ALLOC; i++)
152             item[i - 1].next = &item[i];
153 
154         pthread_mutex_lock(&cqi_freelist_lock);
155         item[ITEMS_PER_ALLOC - 1].next = cqi_freelist;
156         cqi_freelist = &item[1];
157         pthread_mutex_unlock(&cqi_freelist_lock);
158     }
159 
160     return item;
161 }
162 
163 
164 /*
165  * Frees a connection queue item (adds it to the freelist.)
166  */
cqi_free(CQ_ITEM * item)167 static void cqi_free(CQ_ITEM *item) {
168     pthread_mutex_lock(&cqi_freelist_lock);
169     item->next = cqi_freelist;
170     cqi_freelist = item;
171     pthread_mutex_unlock(&cqi_freelist_lock);
172 }
173 
174 
175 /*
176  * Creates a worker thread.
177  */
create_worker(void * (* func)(void *),void * arg,pthread_t * id)178 static void create_worker(void *(*func)(void *), void *arg, pthread_t *id) {
179     pthread_attr_t  attr;
180     int             ret;
181 
182     pthread_attr_init(&attr);
183 
184     if ((ret = pthread_create(id, &attr, func, arg)) != 0) {
185         settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
186                                         "Can't create thread: %s\n",
187                                         strerror(ret));
188         exit(1);
189     }
190 }
191 
192 /****************************** LIBEVENT THREADS *****************************/
193 
/*
 * Creates me->notify, a connected non-blocking socket pair used to wake the
 * thread: notify_thread() sends a byte on notify[1], and the thread's event
 * loop watches notify[0].
 *
 * Returns true on success. On failure it logs a warning, closes any sockets
 * it created, resets notify[] to INVALID_SOCKET, and returns false.
 */
bool create_notification_pipe(LIBEVENT_THREAD *me)
{
    if (evutil_socketpair(SOCKETPAIR_AF, SOCK_STREAM, 0,
                          (void*)me->notify) == SOCKET_ERROR) {
        settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
                                        "Can't create notify pipe: %s",
                                        strerror(errno));
        return false;
    }

    for (int j = 0; j < 2; ++j) {
        int flags = 1;
        /* Best-effort tuning: failures (e.g. a TCP option on a non-TCP
         * socket pair) are deliberately ignored. */
        (void)setsockopt(me->notify[j], IPPROTO_TCP,
                         TCP_NODELAY, (void *)&flags, sizeof(flags));
        (void)setsockopt(me->notify[j], SOL_SOCKET,
                         SO_REUSEADDR, (void *)&flags, sizeof(flags));

        if (evutil_make_socket_nonblocking(me->notify[j]) == -1) {
            /* Log before closing so errno still describes the failure. */
            settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
                                            "Failed to enable non-blocking: %s",
                                            strerror(errno));
            /* Don't leak the socket pair on the error path. */
            closesocket(me->notify[0]);
            closesocket(me->notify[1]);
            me->notify[0] = INVALID_SOCKET;
            me->notify[1] = INVALID_SOCKET;
            return false;
        }
    }
    return true;
}
221 
setup_dispatcher(struct event_base * main_base,void (* dispatcher_callback)(int,short,void *))222 static void setup_dispatcher(struct event_base *main_base,
223                              void (*dispatcher_callback)(int, short, void *))
224 {
225     memset(&dispatcher_thread, 0, sizeof(dispatcher_thread));
226     dispatcher_thread.type = DISPATCHER;
227     dispatcher_thread.base = main_base;
228     dispatcher_thread.thread_id = pthread_self();
229     if (!create_notification_pipe(&dispatcher_thread)) {
230         exit(1);
231     }
232     /* Listen for notifications from other threads */
233     event_set(&dispatcher_thread.notify_event, dispatcher_thread.notify[0],
234               EV_READ | EV_PERSIST, dispatcher_callback, &dispatcher_callback);
235     event_base_set(dispatcher_thread.base, &dispatcher_thread.notify_event);
236 
237     if (event_add(&dispatcher_thread.notify_event, 0) == -1) {
238         settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
239                                         "Can't monitor libevent notify pipe\n");
240         exit(1);
241     }
242 }
243 
244 /*
245  * Set up a thread's information.
246  */
setup_thread(LIBEVENT_THREAD * me,bool tap)247 static void setup_thread(LIBEVENT_THREAD *me, bool tap) {
248     me->type = tap ? TAP : GENERAL;
249     me->base = event_init();
250     if (! me->base) {
251         settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
252                                         "Can't allocate event base\n");
253         exit(1);
254     }
255 
256     /* Listen for notifications from other threads */
257     event_set(&me->notify_event, me->notify[0],
258               EV_READ | EV_PERSIST,
259               tap ? libevent_tap_process : thread_libevent_process, me);
260     event_base_set(me->base, &me->notify_event);
261 
262     if (event_add(&me->notify_event, 0) == -1) {
263         settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
264                                         "Can't monitor libevent notify pipe\n");
265         exit(1);
266     }
267 
268     if (!tap) {
269         me->new_conn_queue = malloc(sizeof(struct conn_queue));
270         if (me->new_conn_queue == NULL) {
271             settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
272                                             "Failed to allocate memory for connection queue");
273             exit(EXIT_FAILURE);
274         }
275         cq_init(me->new_conn_queue);
276     }
277 
278     if ((pthread_mutex_init(&me->mutex, NULL) != 0)) {
279         settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
280                                         "Failed to initialize mutex: %s\n",
281                                         strerror(errno));
282         exit(EXIT_FAILURE);
283     }
284 
285     me->suffix_cache = cache_create("suffix", SUFFIX_SIZE, sizeof(char*),
286                                     NULL, NULL);
287     if (me->suffix_cache == NULL) {
288         settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
289                                         "Failed to create suffix cache\n");
290         exit(EXIT_FAILURE);
291     }
292 }
293 
294 /*
295  * Worker thread: main event loop
296  */
worker_libevent(void * arg)297 static void *worker_libevent(void *arg) {
298     LIBEVENT_THREAD *me = arg;
299 
300     /* Any per-thread setup can happen here; thread_init() will block until
301      * all threads have finished initializing.
302      */
303 
304     pthread_mutex_lock(&init_lock);
305     init_count++;
306     pthread_cond_signal(&init_cond);
307     pthread_mutex_unlock(&init_lock);
308 
309     event_base_loop(me->base, 0);
310 #ifdef INNODB_MEMCACHED
311     if (me->base)
312         event_base_free(me->base);
313 #endif
314 
315     return NULL;
316 }
317 
/*
 * Counts how many times c appears in the singly linked list starting at
 * list (following ->next). Returns 0 for an empty list.
 */
int number_of_pending(conn *c, conn *list) {
    int count = 0;
    conn *node = list;

    while (node != NULL) {
        if (node == c) {
            count++;
        }
        node = node->next;
    }
    return count;
}
327 
/*
 * Processes an incoming "handle a new connection" item. This is called when
 * input arrives on the libevent wakeup pipe.
 *
 * Notify-pipe callback for a GENERAL worker (registered by setup_thread()
 * when tap is false). After draining the wakeup bytes it:
 *   - breaks out of the event loop if memcached_shutdown is set;
 *   - turns every queued CQ_ITEM into a conn owned by this thread. A
 *     conn_new() failure is fatal for UDP; for TCP it closes the socket;
 *   - detaches the whole pending_io list under me->mutex and, for each
 *     connection, re-registers its event and runs its state machine.
 */
static void thread_libevent_process(int fd, short which, void *arg) {
    LIBEVENT_THREAD *me = arg;
    assert(me->type == GENERAL);
    CQ_ITEM *item;

    /* Drain the wakeup byte(s); the data itself is discarded. */
    if (recv(fd, devnull, sizeof(devnull), 0) == -1) {
        if (settings.verbose > 0) {
            settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
                                            "Can't read from libevent pipe: %s\n",
                                            strerror(errno));
        }
    }

    /* A wakeup during shutdown: leave worker_libevent()'s event loop. */
    if (memcached_shutdown) {
         event_base_loopbreak(me->base);
         return ;
    }

    while ((item = cq_pop(me->new_conn_queue)) != NULL) {
        conn *c = conn_new(item->sfd, item->init_state, item->event_flags,
                           item->read_buffer_size, item->transport, me->base,
                           NULL);
        if (c == NULL) {
            if (IS_UDP(item->transport)) {
                settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
                         "Can't listen for events on UDP socket\n");
                exit(1);
            } else {
                if (settings.verbose > 0) {
                    settings.extensions.logger->log(EXTENSION_LOG_INFO, NULL,
                            "Can't listen for events on fd %d\n",
                            item->sfd);
                }
                closesocket(item->sfd);
            }
        } else {
            /* Bind the new connection to this worker. */
            assert(c->thread == NULL);
            c->thread = me;
        }
        cqi_free(item);
    }

    /* Take the whole pending_io list under the lock, then process it
     * without holding the lock. */
    pthread_mutex_lock(&me->mutex);
    conn* pending = me->pending_io;
    me->pending_io = NULL;
    pthread_mutex_unlock(&me->mutex);
    while (pending != NULL) {
        conn *c = pending;
        assert(me == c->thread);
        pending = pending->next;
        c->next = NULL;
        register_event(c, 0);
        /*
         * We don't want the thread to keep on serving all of the data
         * from the context of the notification pipe, so just let it
         * run one time to set up the correct mask in libevent
         */
        c->nevents = 1;
       /* c->nevents = settings.reqs_per_event; */
        while (c->state(c)) {
            /* do task */
        }
    }
}
396 
/* Server clock defined elsewhere. libevent_tap_process() examines
 * pending_close at most once per distinct value of it. */
extern volatile rel_time_t current_time;
398 
/*
 * Reports whether the ->next chain starting at c loops back on itself.
 * Uses a slow pointer advancing one node per step against a fast pointer
 * advancing two. Returns false for NULL or a NULL-terminated list.
 */
bool has_cycle(conn *c) {
    conn *tortoise = c;
    conn *hare = c;

    while (tortoise != NULL) {
        conn *one_ahead = hare->next;
        if (one_ahead == NULL) {
            break;
        }
        conn *two_ahead = one_ahead->next;
        if (two_ahead == NULL) {
            break;
        }
        if (tortoise == one_ahead || tortoise == two_ahead) {
            return true;
        }
        hare = two_ahead;
        tortoise = tortoise->next;
    }
    return false;
}
413 
/*
 * Returns true if needle is one of the nodes reachable from haystack via
 * ->next, false otherwise (including for an empty list).
 */
bool list_contains(conn *haystack, conn *needle) {
    conn *cursor = haystack;
    while (cursor != NULL) {
        if (cursor == needle) {
            return true;
        }
        cursor = cursor->next;
    }
    return false;
}
422 
/*
 * Unlinks the first occurrence of needle from the list headed by haystack
 * and clears needle->next. Returns the (possibly new) head of the list.
 * The list is unchanged if needle is not present. Iterative, so list length
 * does not affect stack depth.
 */
conn* list_remove(conn *haystack, conn *needle) {
    if (haystack == NULL) {
        return NULL;
    }

    if (haystack == needle) {
        conn *new_head = needle->next;
        needle->next = NULL;
        return new_head;
    }

    for (conn *prev = haystack; prev->next != NULL; prev = prev->next) {
        if (prev->next == needle) {
            prev->next = needle->next;
            needle->next = NULL;
            break;
        }
    }

    return haystack;
}
438 
/*
 * Moves connections from the front of the list *l into dest[], in list
 * order. Each moved connection is unlinked (next = NULL) and marked
 * LIST_STATE_PROCESSING, so enlist_conn() records requests for it instead
 * of linking it until finalize_list() runs.
 *
 * dest       output array with room for max_items entries
 * max_items  capacity of dest; at most max_items - 1 entries are filled
 *            (one slot is always left unused), so 0 or 1 moves nothing
 * l          list head; advanced past the moved connections
 *
 * Returns the number of connections stored in dest[].
 */
size_t list_to_array(conn **dest, size_t max_items, conn **l) {
    size_t n_items = 0;
    /* Written as n_items + 1 < max_items rather than n_items < max_items - 1
     * so that max_items == 0 cannot wrap around to SIZE_MAX and overrun dest. */
    for (; *l && n_items + 1 < max_items; ++n_items) {
        dest[n_items] = *l;
        *l = dest[n_items]->next;
        dest[n_items]->next = NULL;
        dest[n_items]->list_state |= LIST_STATE_PROCESSING;
    }
    return n_items;
}
449 
/*
 * Puts c at the front of list, which must be its thread's pending_io or
 * pending_close list. If c is currently marked LIST_STATE_PROCESSING (it is
 * held in an array filled by list_to_array()), nothing is linked. Instead
 * the matching LIST_STATE_REQ_* flag is set, for finalize_list() to act on.
 */
void enlist_conn(conn *c, conn **list) {
    LIBEVENT_THREAD *owner = c->thread;
    assert(list == &owner->pending_io || list == &owner->pending_close);

    if (c->list_state & LIST_STATE_PROCESSING) {
        if (list == &owner->pending_io) {
            c->list_state |= LIST_STATE_REQ_PENDING_IO;
        } else {
            c->list_state |= LIST_STATE_REQ_PENDING_CLOSE;
        }
        return;
    }

    assert(!list_contains(owner->pending_close, c));
    assert(!list_contains(owner->pending_io, c));
    assert(c->next == NULL);
    c->next = *list;
    *list = c;
    assert(list_contains(*list, c));
    assert(!has_cycle(*list));
}
467 
/*
 * Ends processing for the items connections in list[] (typically filled by
 * list_to_array()). For each connection it:
 *   - clears LIST_STATE_PROCESSING;
 *   - if its socket is still valid, honors a pending-io request (preferred)
 *     or a pending-close request recorded meanwhile by enlist_conn();
 *   - resets list_state to 0.
 */
void finalize_list(conn **list, size_t items) {
    size_t idx = 0;
    while (idx < items) {
        conn *c = list[idx++];

        c->list_state &= ~LIST_STATE_PROCESSING;
        if (c->sfd != INVALID_SOCKET) {
            if (c->list_state & LIST_STATE_REQ_PENDING_IO) {
                enlist_conn(c, &c->thread->pending_io);
            } else if (c->list_state & LIST_STATE_REQ_PENDING_CLOSE) {
                enlist_conn(c, &c->thread->pending_close);
            }
        }
        c->list_state = 0;
    }
}
481 
482 
/*
 * Notify-pipe callback for the TAP thread (registered by setup_thread()
 * when tap is true). After draining the wakeup bytes it breaks the event
 * loop if memcached_shutdown is set. Otherwise it proceeds in four steps:
 *   1. Under the thread lock, it moves a batch of pending_close entries
 *      (only when current_time differs from last_checked) and a batch of
 *      pending_io entries into local arrays via list_to_array(). That marks
 *      them LIST_STATE_PROCESSING.
 *   2. Without the lock, it re-registers each pending_io connection and
 *      runs its state machine.
 *   3. It closes pending_close connections whose refcount is 1 and asks to
 *      re-enlist the others.
 *   4. Under the lock, it calls finalize_list() on both batches, so enlist
 *      requests made while they were being processed are applied.
 */
static void libevent_tap_process(int fd, short which, void *arg) {
    LIBEVENT_THREAD *me = arg;
    assert(me->type == TAP);

    /* Drain the wakeup byte(s); the data itself is discarded. */
    if (recv(fd, devnull, sizeof(devnull), 0) == -1) {
        if (settings.verbose > 0) {
            settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
                                            "Can't read from libevent pipe: %s\n",
                                            strerror(errno));
        }
    }

    /* A wakeup during shutdown: leave worker_libevent()'s event loop. */
    if (memcached_shutdown) {
        event_base_loopbreak(me->base);
        return ;
    }

    // Do we have pending closes?
    // max_items is a const object, not a constant expression, so the arrays
    // below are variable-length arrays on the stack.
    const size_t max_items = 256;
    LOCK_THREAD(me);
    conn *pending_close[max_items];
    size_t n_pending_close = 0;

    // Look at pending_close at most once per distinct current_time value.
    if (me->pending_close && me->last_checked != current_time) {
        assert(!has_cycle(me->pending_close));
        me->last_checked = current_time;

        n_pending_close = list_to_array(pending_close, max_items,
                                        &me->pending_close);
    }

    // Now copy the pending IO buffer and run them...
    conn *pending_io[max_items];
    size_t n_items = list_to_array(pending_io, max_items, &me->pending_io);

    UNLOCK_THREAD(me);
    for (size_t i = 0; i < n_items; ++i) {
        conn *c = pending_io[i];

        assert(c->thread == me);

        // c->thread == me (asserted), so this lock/unlock pair is balanced.
        LOCK_THREAD(c->thread);
        assert(me == c->thread);
        settings.extensions.logger->log(EXTENSION_LOG_DEBUG, NULL,
                                        "Processing tap pending_io for %d\n", c->sfd);

        UNLOCK_THREAD(me);
        register_event(c, NULL);
        /*
         * We don't want the thread to keep on serving all of the data
         * from the context of the notification pipe, so just let it
         * run one time to set up the correct mask in libevent
         */
        c->nevents = 1;
        // NOTE(review): presumably makes the state machine treat this run as
        // a write-readiness event — confirm against the conn_* handlers.
        c->which = EV_WRITE;
        while (c->state(c)) {
            /* do task */
        }
    }

    /* Close any connections pending close */
    if (n_pending_close > 0) {
        for (size_t i = 0; i < n_pending_close; ++i) {
            conn *ce = pending_close[i];
            if (ce->refcount == 1) {
                settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
                                                "OK, time to nuke: %p\n",
                                                (void*)ce);
                assert(ce->next == NULL);
                conn_close(ce);
            } else {
                // Still referenced. ce is still flagged PROCESSING, so this
                // only records a request that finalize_list() applies below.
                LOCK_THREAD(me);
                enlist_conn(ce, &me->pending_close);
                UNLOCK_THREAD(me);
            }
        }
    }

    // NOTE(review): finalize_list() dereferences every pending_close entry,
    // including ones passed to conn_close() above. This is only safe if
    // conn_close() does not free the conn — confirm.
    LOCK_THREAD(me);
    finalize_list(pending_io, n_items);
    finalize_list(pending_close, n_pending_close);
    UNLOCK_THREAD(me);
}
566 
/*
 * Returns true if the calling thread is the one recorded in thr->thread_id.
 * pthread_equal() is the portable way to compare pthread_t values, which
 * may be structures (as with pthreads-win32). It replaces both a raw ==
 * comparison and reliance on that library's internal fields.
 */
static bool is_thread_me(LIBEVENT_THREAD *thr) {
    return pthread_equal(pthread_self(), thr->thread_id) != 0;
}
575 
/*
 * Engine callback: an asynchronous operation for the connection identified
 * by cookie has completed with the given status.
 *
 * cookie  the conn handed to the engine; NULL is logged and ignored
 * status  engine result, stored in conn->aiostat
 *
 * For a TAP connection and ENGINE_DISCONNECT, the socket is closed at once
 * and the connection moves to conn_immediate_close. Otherwise, if the
 * connection is still waiting (ewouldblock) and not already closing, it is
 * queued on its thread's pending_io list (or on pending_close for
 * ENGINE_DISCONNECT), and the owning thread is notified when needed.
 */
void notify_io_complete(const void *cookie, ENGINE_ERROR_CODE status)
{
    if (cookie == NULL) {
        settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
                                        "notify_io_complete called without a valid cookie (status %x)\n",
                                        status);
        return ;
    }

    struct conn *conn = (struct conn *)cookie;

    settings.extensions.logger->log(EXTENSION_LOG_DEBUG, NULL,
                                    "Got notify from %d, status %x\n",
                                    conn->sfd, status);

    /*
     * TAP connections: the core issues the ON_DISCONNECT call to the engine
     * instead of trying to close the connection itself. It then gives the
     * engine a grace period to call notify_io_complete; if the engine does
     * not, the core goes ahead and kills the connection.
     */
    if (status == ENGINE_DISCONNECT && conn->thread == tap_thread) {
        LOCK_THREAD(conn->thread);
        if (conn->sfd != INVALID_SOCKET) {
            unregister_event(conn);
            safe_close(conn->sfd);
            conn->sfd = INVALID_SOCKET;
        }

        /* %p requires a void * argument. */
        settings.extensions.logger->log(EXTENSION_LOG_DEBUG, NULL,
                                        "Immediate close of %p\n",
                                        (void *)conn);
        conn_set_state(conn, conn_immediate_close);

        if (!is_thread_me(conn->thread)) {
            /* Wake the owning thread so it handles the close. */
            notify_thread(conn->thread);
        }

        UNLOCK_THREAD(conn->thread);
        return;
    }

    /*
    ** There may be a race condition between the engine calling this
    ** function and the core closing the connection.
    ** Let's lock the connection structure (this might not be the
    ** correct one) and re-evaluate.
    */
    LIBEVENT_THREAD *thr = conn->thread;
    if (thr == NULL || (conn->state == conn_closing ||
                        conn->state == conn_pending_close ||
                        conn->state == conn_immediate_close)) {
        return;
    }

    int notify = 0;

    LOCK_THREAD(thr);
    if (thr != conn->thread || !conn->ewouldblock) {
        // Ignore
        UNLOCK_THREAD(thr);
        return;
    }

    conn->aiostat = status;

    /* Move the connection to the closing state if the engine
     * wants it to be disconnected
     */
    if (status == ENGINE_DISCONNECT) {
        conn->state = conn_closing;
        notify = 1;
        thr->pending_io = list_remove(thr->pending_io, conn);
        if (number_of_pending(conn, thr->pending_close) == 0) {
            enlist_conn(conn, &thr->pending_close);
        }
    } else {
        if (number_of_pending(conn, thr->pending_io) +
            number_of_pending(conn, thr->pending_close) == 0) {
            /* Only notify on the empty -> non-empty transition; presumably a
             * wakeup is already outstanding otherwise. */
            if (thr->pending_io == NULL) {
                notify = 1;
            }
            enlist_conn(conn, &thr->pending_io);
        }
    }
    UNLOCK_THREAD(thr);

    /* Wake the owning thread outside the lock. */
    if (notify) {
        notify_thread(thr);
    }
}
672 
673 /* Which thread we assigned a connection to most recently. */
674 static int last_thread = -1;
675 
676 /*
677  * Dispatches a new connection to another thread. This is only ever called
678  * from the main thread, either during initialization (for UDP) or because
679  * of an incoming connection.
680  */
dispatch_conn_new(SOCKET sfd,STATE_FUNC init_state,int event_flags,int read_buffer_size,enum network_transport transport)681 void dispatch_conn_new(SOCKET sfd, STATE_FUNC init_state, int event_flags,
682                        int read_buffer_size, enum network_transport transport) {
683     CQ_ITEM *item = cqi_new();
684     int tid = (last_thread + 1) % settings.num_threads;
685 
686     LIBEVENT_THREAD *thread = threads + tid;
687 
688     last_thread = tid;
689 
690     item->sfd = sfd;
691     item->init_state = init_state;
692     item->event_flags = event_flags;
693     item->read_buffer_size = read_buffer_size;
694     item->transport = transport;
695 
696     cq_push(thread->new_conn_queue, item);
697 
698     MEMCACHED_CONN_DISPATCH(sfd, (uintptr_t)thread->thread_id);
699     notify_thread(thread);
700 }
701 
702 /*
703  * Returns true if this is the thread that listens for new TCP connections.
704  */
is_listen_thread()705 int is_listen_thread() {
706 #ifdef __WIN32__
707     pthread_t tid = pthread_self();
708     return(tid.p == dispatcher_thread.thread_id.p && tid.x == dispatcher_thread.thread_id.x);
709 #else
710     return pthread_self() == dispatcher_thread.thread_id;
711 #endif
712 }
713 
notify_dispatcher(void)714 void notify_dispatcher(void) {
715     notify_thread(&dispatcher_thread);
716 }
717 
718 /******************************* GLOBAL STATS ******************************/
719 
STATS_LOCK()720 void STATS_LOCK() {
721     pthread_mutex_lock(&stats_lock);
722 }
723 
STATS_UNLOCK()724 void STATS_UNLOCK() {
725     pthread_mutex_unlock(&stats_lock);
726 }
727 
threadlocal_stats_clear(struct thread_stats * stats)728 void threadlocal_stats_clear(struct thread_stats *stats) {
729     stats->cmd_get = 0;
730     stats->get_misses = 0;
731     stats->delete_misses = 0;
732     stats->incr_misses = 0;
733     stats->decr_misses = 0;
734     stats->incr_hits = 0;
735     stats->decr_hits = 0;
736     stats->cas_misses = 0;
737     stats->bytes_written = 0;
738     stats->bytes_read = 0;
739     stats->cmd_flush = 0;
740     stats->conn_yields = 0;
741     stats->auth_cmds = 0;
742     stats->auth_errors = 0;
743 
744     memset(stats->slab_stats, 0,
745            sizeof(struct slab_stats) * MAX_NUMBER_OF_SLAB_CLASSES);
746 }
747 
threadlocal_stats_reset(struct thread_stats * thread_stats)748 void threadlocal_stats_reset(struct thread_stats *thread_stats) {
749     int ii;
750     for (ii = 0; ii < settings.num_threads; ++ii) {
751         pthread_mutex_lock(&thread_stats[ii].mutex);
752         threadlocal_stats_clear(&thread_stats[ii]);
753         pthread_mutex_unlock(&thread_stats[ii].mutex);
754     }
755 }
756 
threadlocal_stats_aggregate(struct thread_stats * thread_stats,struct thread_stats * stats)757 void threadlocal_stats_aggregate(struct thread_stats *thread_stats, struct thread_stats *stats) {
758     int ii, sid;
759     for (ii = 0; ii < settings.num_threads; ++ii) {
760         pthread_mutex_lock(&thread_stats[ii].mutex);
761 
762         stats->cmd_get += thread_stats[ii].cmd_get;
763         stats->get_misses += thread_stats[ii].get_misses;
764         stats->delete_misses += thread_stats[ii].delete_misses;
765         stats->decr_misses += thread_stats[ii].decr_misses;
766         stats->incr_misses += thread_stats[ii].incr_misses;
767         stats->decr_hits += thread_stats[ii].decr_hits;
768         stats->incr_hits += thread_stats[ii].incr_hits;
769         stats->cas_misses += thread_stats[ii].cas_misses;
770         stats->bytes_read += thread_stats[ii].bytes_read;
771         stats->bytes_written += thread_stats[ii].bytes_written;
772         stats->cmd_flush += thread_stats[ii].cmd_flush;
773         stats->conn_yields += thread_stats[ii].conn_yields;
774         stats->auth_cmds += thread_stats[ii].auth_cmds;
775         stats->auth_errors += thread_stats[ii].auth_errors;
776 
777         for (sid = 0; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) {
778             stats->slab_stats[sid].cmd_set +=
779                 thread_stats[ii].slab_stats[sid].cmd_set;
780             stats->slab_stats[sid].get_hits +=
781                 thread_stats[ii].slab_stats[sid].get_hits;
782             stats->slab_stats[sid].delete_hits +=
783                 thread_stats[ii].slab_stats[sid].delete_hits;
784             stats->slab_stats[sid].cas_hits +=
785                 thread_stats[ii].slab_stats[sid].cas_hits;
786             stats->slab_stats[sid].cas_badval +=
787                 thread_stats[ii].slab_stats[sid].cas_badval;
788         }
789 
790         pthread_mutex_unlock(&thread_stats[ii].mutex);
791     }
792 }
793 
slab_stats_aggregate(struct thread_stats * stats,struct slab_stats * out)794 void slab_stats_aggregate(struct thread_stats *stats, struct slab_stats *out) {
795     int sid;
796 
797     out->cmd_set = 0;
798     out->get_hits = 0;
799     out->delete_hits = 0;
800     out->cas_hits = 0;
801     out->cas_badval = 0;
802 
803     for (sid = 0; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) {
804         out->cmd_set += stats->slab_stats[sid].cmd_set;
805         out->get_hits += stats->slab_stats[sid].get_hits;
806         out->delete_hits += stats->slab_stats[sid].delete_hits;
807         out->cas_hits += stats->slab_stats[sid].cas_hits;
808         out->cas_badval += stats->slab_stats[sid].cas_badval;
809     }
810 }
811 
812 /*
813  * Initializes the thread subsystem, creating various worker threads.
814  *
815  * nthreads  Number of worker event handler threads to spawn
816  * main_base Event base for main thread
817  */
thread_init(int nthr,struct event_base * main_base,void (* dispatcher_callback)(int,short,void *))818 void thread_init(int nthr, struct event_base *main_base,
819                  void (*dispatcher_callback)(int, short, void *)) {
820     int i;
821     nthreads = nthr + 1;
822 
823     pthread_mutex_init(&stats_lock, NULL);
824     pthread_mutex_init(&init_lock, NULL);
825     pthread_cond_init(&init_cond, NULL);
826 
827     pthread_mutex_init(&cqi_freelist_lock, NULL);
828     cqi_freelist = NULL;
829 
830     threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));
831     if (! threads) {
832         settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
833                                         "Can't allocate thread descriptors: %s",
834                                         strerror(errno));
835         exit(1);
836     }
837     thread_ids = calloc(nthreads, sizeof(pthread_t));
838     if (! thread_ids) {
839         perror("Can't allocate thread descriptors");
840         exit(1);
841     }
842 
843     setup_dispatcher(main_base, dispatcher_callback);
844 
845     for (i = 0; i < nthreads; i++) {
846         if (!create_notification_pipe(&threads[i])) {
847             exit(1);
848         }
849         threads[i].index = i;
850 
851         setup_thread(&threads[i], i == (nthreads - 1));
852     }
853 
854     /* Create threads after we've done all the libevent setup. */
855     for (i = 0; i < nthreads; i++) {
856         create_worker(worker_libevent, &threads[i], &thread_ids[i]);
857         threads[i].thread_id = thread_ids[i];
858     }
859 
860     tap_thread = &threads[nthreads - 1];
861 
862     /* Wait for all the threads to set themselves up before returning. */
863     pthread_mutex_lock(&init_lock);
864     while (init_count < nthreads) {
865         pthread_cond_wait(&init_cond, &init_lock);
866     }
867     pthread_mutex_unlock(&init_lock);
868 }
869 
threads_shutdown(void)870 void threads_shutdown(void)
871 {
872     for (int ii = 0; ii < nthreads; ++ii) {
873         notify_thread(&threads[ii]);
874         pthread_join(thread_ids[ii], NULL);
875     }
876     for (int ii = 0; ii < nthreads; ++ii) {
877         safe_close(threads[ii].notify[0]);
878         safe_close(threads[ii].notify[1]);
879     }
880 
881 #ifdef INNODB_MEMCACHED
882     if (dispatcher_thread.notify[0])
883         closesocket(dispatcher_thread.notify[0]);
884     if (dispatcher_thread.notify[1])
885         closesocket(dispatcher_thread.notify[1]);
886 #endif
887 }
888 
/*
 * Wakes thread by sending one byte on its notify[1] socket; its notify
 * handler drains the byte. If the send does not write exactly one byte, a
 * warning is logged, with a TAP-specific message for tap_thread, and
 * nothing else happens.
 */
void notify_thread(LIBEVENT_THREAD *thread) {
    if (send(thread->notify[1], "", 1, 0) == 1) {
        return;
    }

    if (thread != tap_thread) {
        settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
                                        "Failed to notify thread: %s",
                                        strerror(errno));
    } else {
        settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
                                        "Failed to notify TAP thread: %s",
                                        strerror(errno));
    }
}
902