1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3 * memcached - memory caching daemon
4 *
5 * http://www.danga.com/memcached/
6 *
7 * Copyright 2003 Danga Interactive, Inc. All rights reserved.
8 *
9 * Use and distribution licensed under the BSD license. See
10 * the LICENSE file for full text.
11 *
12 * Authors:
13 * Anatoly Vorobey <mellon@pobox.com>
14 * Brad Fitzpatrick <brad@danga.com>
15 */
16 #include "config.h"
17 #include "memcached.h"
18 #include "memcached/extension_loggers.h"
19 #include "utilities/engine_loader.h"
20
21 #include <signal.h>
22 #include <getopt.h>
23 #include <fcntl.h>
24 #include <errno.h>
25 #include <stdlib.h>
26 #include <stdio.h>
27 #include <string.h>
28 #include <time.h>
29 #include <assert.h>
30 #include <limits.h>
31 #include <ctype.h>
32 #include <stdarg.h>
33 #include <stddef.h>
34
/**
 * Set the CAS value of an item by delegating to the loaded engine.
 *
 * @param cookie passed through to the engine unchanged
 * @param it     item whose CAS value is updated
 * @param cas    CAS value to store
 */
static inline void item_set_cas(const void *cookie, item *it, uint64_t cas)
{
    settings.engine.v1->item_set_cas(settings.engine.v0, cookie, it, cas);
}
38
/*
 * Counter-update fragments ("GUTS") that STATS_INCR1 expands between locking
 * and unlocking the per-thread stats mutex.  They all take the same four
 * parameters so they can be swapped freely; a fragment simply ignores the
 * parameters it does not need.
 *
 * SLAB_GUTS (and therefore SLAB_THREAD_GUTS) indexes slab_stats[] with
 * info.clsid, so a variable named "info" must be in scope at the point of
 * expansion.
 */

/* Increment the per-slab-class counter slab_op. */
#define SLAB_GUTS(conn, thread_stats, slab_op, thread_op) \
    (thread_stats)->slab_stats[info.clsid].slab_op++;

/* Increment the thread-wide counter thread_op. */
#define THREAD_GUTS(conn, thread_stats, slab_op, thread_op) \
    (thread_stats)->thread_op++;

/* Increment two thread-wide counters: slab_op and thread_op. */
#define THREAD_GUTS2(conn, thread_stats, slab_op, thread_op) \
    (thread_stats)->slab_op++; \
    (thread_stats)->thread_op++;

/* Increment the per-slab-class slab_op and the thread-wide thread_op. */
#define SLAB_THREAD_GUTS(conn, thread_stats, slab_op, thread_op) \
    SLAB_GUTS(conn, thread_stats, slab_op, thread_op) \
    THREAD_GUTS(conn, thread_stats, slab_op, thread_op)
53
/*
 * Core statistics update.  Looks up the independent_stats for the
 * connection, selects the thread_stats slot of the connection's thread,
 * runs the GUTS fragment with that slot's mutex held and, after releasing
 * the mutex, hands the key to TK() together with current_time.
 *
 * Expands to a brace-enclosed block that declares locals named
 * independent_stats, thread_stats and topkeys.  The conn argument is
 * parenthesized before "->" is applied so that an expression such as a cast
 * can be passed safely.
 */
#define STATS_INCR1(GUTS, conn, slab_op, thread_op, key, nkey) { \
    struct independent_stats *independent_stats = get_independent_stats(conn); \
    struct thread_stats *thread_stats = \
        &independent_stats->thread_stats[(conn)->thread->index]; \
    topkeys_t *topkeys = independent_stats->topkeys; \
    pthread_mutex_lock(&thread_stats->mutex); \
    GUTS(conn, thread_stats, slab_op, thread_op); \
    pthread_mutex_unlock(&thread_stats->mutex); \
    TK(topkeys, slab_op, key, nkey, current_time); \
}

/* Increment the thread-wide counter "op". */
#define STATS_INCR(conn, op, key, nkey) \
    STATS_INCR1(THREAD_GUTS, conn, op, op, key, nkey)

/* Increment the per-slab-class counter "op" (needs "info" in scope). */
#define SLAB_INCR(conn, op, key, nkey) \
    STATS_INCR1(SLAB_GUTS, conn, op, op, key, nkey)

/* Increment the two thread-wide counters slab_op and thread_op. */
#define STATS_TWO(conn, slab_op, thread_op, key, nkey) \
    STATS_INCR1(THREAD_GUTS2, conn, slab_op, thread_op, key, nkey)

/* Increment per-slab-class slab_op and thread-wide thread_op
 * (needs "info" in scope). */
#define SLAB_TWO(conn, slab_op, thread_op, key, nkey) \
    STATS_INCR1(SLAB_THREAD_GUTS, conn, slab_op, thread_op, key, nkey)

/* Record a hit: per-slab-class <op>_hits plus thread-wide cmd_<op>. */
#define STATS_HIT(conn, op, key, nkey) \
    SLAB_TWO(conn, op##_hits, cmd_##op, key, nkey)

/* Record a miss: thread-wide <op>_misses plus thread-wide cmd_<op>. */
#define STATS_MISS(conn, op, key, nkey) \
    STATS_TWO(conn, op##_misses, cmd_##op, key, nkey)
82
/*
 * Key-less statistics helpers.  Each one fetches the thread_stats for the
 * connection via get_thread_stats() and updates it with that structure's
 * mutex held.  Each expands to a brace-enclosed block declaring a local
 * named thread_stats.
 */

/* Increment the single counter "op". */
#define STATS_NOKEY(conn, op) {                                 \
    struct thread_stats *thread_stats =                         \
        get_thread_stats(conn);                                 \
    pthread_mutex_lock(&thread_stats->mutex);                   \
    thread_stats->op++;                                         \
    pthread_mutex_unlock(&thread_stats->mutex);                 \
}

/* Increment the two counters "op1" and "op2" under one lock acquisition. */
#define STATS_NOKEY2(conn, op1, op2) {                          \
    struct thread_stats *thread_stats =                         \
        get_thread_stats(conn);                                 \
    pthread_mutex_lock(&thread_stats->mutex);                   \
    thread_stats->op1++;                                        \
    thread_stats->op2++;                                        \
    pthread_mutex_unlock(&thread_stats->mutex);                 \
}

/* Add "amt" to the counter "op". */
#define STATS_ADD(conn, op, amt) {                              \
    struct thread_stats *thread_stats =                         \
        get_thread_stats(conn);                                 \
    pthread_mutex_lock(&thread_stats->mutex);                   \
    thread_stats->op += amt;                                    \
    pthread_mutex_unlock(&thread_stats->mutex);                 \
}
107
/* NOTE(review): presumably a flag raised to request server shutdown; it is
 * declared volatile sig_atomic_t, the type intended for objects written from
 * a signal handler -- confirm against the code that sets and reads it. */
volatile sig_atomic_t memcached_shutdown;
109
/*
 * We keep the current time of day in a global variable that's updated by a
 * timer event. This saves us a bunch of time() system calls (we really only
 * need to get the time once a second, whereas there can be tens of thousands
 * of requests a second) and allows us to use server-start-relative timestamps
 * rather than absolute UNIX timestamps, a space savings on systems where
 * sizeof(time_t) > sizeof(unsigned int).
 *
 * realtime() adds relative expiration deltas to this value.
 */
volatile rel_time_t current_time;
119
120 /*
121 * forward declarations
122 */
123 static SOCKET new_socket(struct addrinfo *ai);
124 static int try_read_command(conn *c);
125 static inline struct independent_stats *get_independent_stats(conn *c);
126 static inline struct thread_stats *get_thread_stats(conn *c);
127 static void register_callback(ENGINE_HANDLE *eh,
128 ENGINE_EVENT_TYPE type,
129 EVENT_CALLBACK cb, const void *cb_data);
130
131
/* Result of one read attempt; the return type of try_read_network() and
 * try_read_udp(). */
enum try_read_result {
    READ_DATA_RECEIVED,
    READ_NO_DATA_RECEIVED,
    READ_ERROR,            /** an error occurred (on the socket) (or client closed connection) */
    READ_MEMORY_ERROR      /** failed to allocate more memory */
};
138
139 static enum try_read_result try_read_network(conn *c);
140 static enum try_read_result try_read_udp(conn *c);
141
142 /* stats */
143 static void stats_init(void);
144 static void server_stats(ADD_STAT add_stats, conn *c, bool aggregate);
145 static void process_stat_settings(ADD_STAT add_stats, void *c);
146
147
148 /* defaults */
149 static void settings_init(void);
150
151 /* event handling, network IO */
152 static void event_handler(const int fd, const short which, void *arg);
153 static void complete_nread(conn *c);
154 static char *process_command(conn *c, char *command);
155 static void write_and_free(conn *c, char *buf, int bytes);
156 static int ensure_iov_space(conn *c);
157 static int add_iov(conn *c, const void *buf, int len);
158 static int add_msghdr(conn *c);
159
160
161 /* time handling */
162 static void set_current_time(void); /* update the global variable holding
163 global 32-bit seconds-since-start time
164 (to avoid 64 bit time_t) */
165
/** exported globals **/
struct stats stats;
struct settings settings;
static time_t process_started;     /* when the process was started */

/** file scope variables **/
/* Head of the list of listening connections (linked through ->next);
 * disable_listen() walks it. */
static conn *listen_conn = NULL;
static struct event_base *main_base;
static struct independent_stats *default_independent_stats;

/* One handler chain per engine event type; perform_callbacks() walks the
 * chain selected by the event type. */
static struct engine_event_handler *engine_event_handlers[MAX_ENGINE_EVENT_TYPE + 1];
177
/* Result of one write attempt; the return type of transmit(). */
enum transmit_result {
    TRANSMIT_COMPLETE,   /** All done writing. */
    TRANSMIT_INCOMPLETE, /** More data remaining to write. */
    TRANSMIT_SOFT_ERROR, /** Can't write any more right now. */
    TRANSMIT_HARD_ERROR  /** Can't write (c->state is set to conn_closing) */
};
184
185 static enum transmit_result transmit(conn *c);
186
/* Number of seconds in 30 days: the largest expiration value realtime()
 * still treats as a delta rather than an absolute UNIX time.  The
 * replacement list is parenthesized so the macro behaves as a single operand
 * in any surrounding expression. */
#define REALTIME_MAXDELTA (60*60*24*30)
188
189 // Perform all callbacks of a given type for the given connection.
perform_callbacks(ENGINE_EVENT_TYPE type,const void * data,const void * c)190 static void perform_callbacks(ENGINE_EVENT_TYPE type,
191 const void *data,
192 const void *c) {
193 for (struct engine_event_handler *h = engine_event_handlers[type];
194 h; h = h->next) {
195 h->cb(c, type, data, h->cb_data);
196 }
197 }
198
199 /*
200 * given time value that's either unix time or delta from current unix time,
201 * return unix time. Use the fact that delta can't exceed one month
202 * (and real time value can't be that low).
203 */
realtime(const time_t exptime)204 static rel_time_t realtime(const time_t exptime) {
205 /* no. of seconds in 30 days - largest possible delta exptime */
206
207 if (exptime == 0) return 0; /* 0 means never expire */
208
209 if (exptime > REALTIME_MAXDELTA) {
210 /* if item expiration is at/before the server started, give it an
211 expiration time of 1 second after the server started.
212 (because 0 means don't expire). without this, we'd
213 underflow and wrap around to some large value way in the
214 future, effectively making items expiring in the past
215 really expiring never */
216 if (exptime <= process_started)
217 return (rel_time_t)1;
218 return (rel_time_t)(exptime - process_started);
219 } else {
220 return (rel_time_t)(exptime + current_time);
221 }
222 }
223
224 /**
225 * Convert the relative time to an absolute time (relative to EPOC ;) )
226 */
abstime(const rel_time_t exptime)227 static time_t abstime(const rel_time_t exptime)
228 {
229 return process_started + exptime;
230 }
231
stats_init(void)232 static void stats_init(void) {
233 stats.daemon_conns = 0;
234 stats.rejected_conns = 0;
235 stats.curr_conns = stats.total_conns = stats.conn_structs = 0;
236
237 stats_prefix_init();
238 }
239
stats_reset(const void * cookie)240 static void stats_reset(const void *cookie) {
241 struct conn *conn = (struct conn*)cookie;
242 STATS_LOCK();
243 stats.rejected_conns = 0;
244 stats.total_conns = 0;
245 stats_prefix_clear();
246 STATS_UNLOCK();
247 threadlocal_stats_reset(get_independent_stats(conn)->thread_stats);
248 settings.engine.v1->reset_stats(settings.engine.v0, cookie);
249 }
250
settings_init(void)251 static void settings_init(void) {
252 settings.use_cas = true;
253 settings.access = 0700;
254 settings.port = 11211;
255 settings.udpport = 11211;
256 /* By default this string should be NULL for getaddrinfo() */
257 settings.inter = NULL;
258 settings.maxbytes = 64 * 1024 * 1024; /* default is 64MB */
259 settings.maxconns = 1000; /* to limit connections-related memory to about 5MB */
260 settings.verbose = 0;
261 settings.oldest_live = 0;
262 settings.evict_to_free = 1; /* push old items out of cache when memory runs out */
263 settings.socketpath = NULL; /* by default, not using a unix socket */
264 settings.factor = 1.25;
265 settings.chunk_size = 48; /* space for a modest key and value */
266 settings.num_threads = 4; /* N workers */
267 settings.num_threads_per_udp = 0;
268 settings.prefix_delimiter = ':';
269 settings.detail_enabled = 0;
270 settings.allow_detailed = true;
271 settings.reqs_per_event = DEFAULT_REQS_PER_EVENT;
272 settings.backlog = 1024;
273 settings.binding_protocol = negotiating_prot;
274 settings.item_size_max = 1024 * 1024; /* The famous 1MB upper limit. */
275 settings.topkeys = 0;
276 settings.require_sasl = false;
277 settings.extensions.logger = get_stderr_logger();
278 }
279
280 /*
281 * Adds a message header to a connection.
282 *
283 * Returns 0 on success, -1 on out-of-memory.
284 */
add_msghdr(conn * c)285 static int add_msghdr(conn *c)
286 {
287 struct msghdr *msg;
288
289 assert(c != NULL);
290
291 if (c->msgsize == c->msgused) {
292 msg = realloc(c->msglist, c->msgsize * 2 * sizeof(struct msghdr));
293 if (! msg)
294 return -1;
295 c->msglist = msg;
296 c->msgsize *= 2;
297 }
298
299 msg = c->msglist + c->msgused;
300
301 /* this wipes msg_iovlen, msg_control, msg_controllen, and
302 msg_flags, the last 3 of which aren't defined on solaris: */
303 memset(msg, 0, sizeof(struct msghdr));
304
305 msg->msg_iov = &c->iov[c->iovused];
306
307 if (c->request_addr_size > 0) {
308 msg->msg_name = &c->request_addr;
309 msg->msg_namelen = c->request_addr_size;
310 }
311
312 c->msgbytes = 0;
313 c->msgused++;
314
315 if (IS_UDP(c->transport)) {
316 /* Leave room for the UDP header, which we'll fill in later. */
317 return add_iov(c, NULL, UDP_HEADER_SIZE);
318 }
319
320 return 0;
321 }
322
prot_text(enum protocol prot)323 static const char *prot_text(enum protocol prot) {
324 const char *rv = "unknown";
325 switch(prot) {
326 case ascii_prot:
327 rv = "ascii";
328 break;
329 case binary_prot:
330 rv = "binary";
331 break;
332 case negotiating_prot:
333 rv = "auto-negotiate";
334 break;
335 }
336 return rv;
337 }
338
/*
 * Bookkeeping for temporarily disabling the listening sockets.  The
 * functions below take the mutex around every access to the other fields.
 */
struct {
    pthread_mutex_t mutex;
    bool disabled;          /* true once disable_listen() has run */
    ssize_t count;          /* set to 10 by disable_listen() */
    uint64_t num_disable;   /* number of times disable_listen() has run */
} listen_state = {
    .mutex = PTHREAD_MUTEX_INITIALIZER,
    .disabled = false,
    .count = 0,
    .num_disable = 0
};
345
is_listen_disabled(void)346 static bool is_listen_disabled(void) {
347 bool ret;
348 pthread_mutex_lock(&listen_state.mutex);
349 ret = listen_state.disabled;
350 pthread_mutex_unlock(&listen_state.mutex);
351 return ret;
352 }
353
get_listen_disabled_num(void)354 static uint64_t get_listen_disabled_num(void) {
355 uint64_t ret;
356 pthread_mutex_lock(&listen_state.mutex);
357 ret = listen_state.num_disable;
358 pthread_mutex_unlock(&listen_state.mutex);
359 return ret;
360 }
361
disable_listen(void)362 static void disable_listen(void) {
363 conn *next;
364 pthread_mutex_lock(&listen_state.mutex);
365 listen_state.disabled = true;
366 listen_state.count = 10;
367 ++listen_state.num_disable;
368 pthread_mutex_unlock(&listen_state.mutex);
369
370 for (next = listen_conn; next; next = next->next) {
371 update_event(next, 0);
372 if (listen(next->sfd, 1) != 0) {
373 settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
374 "listen() failed",
375 strerror(errno));
376 }
377 }
378 }
379
/*
 * Close a socket, retrying while the close fails with EINTR or EAGAIN.
 * INVALID_SOCKET is ignored.  A persistent failure is logged.  On success
 * the current-connection counter is decremented and, if listening is
 * currently disabled, the dispatcher is notified.
 */
void safe_close(SOCKET sfd)
{
    if (sfd == INVALID_SOCKET) {
        return;
    }

    int rval;
    do {
        rval = closesocket(sfd);
        /* go ahead and retry on EINTR / EAGAIN */
    } while (rval == SOCKET_ERROR && (errno == EINTR || errno == EAGAIN));

    if (rval == SOCKET_ERROR) {
        settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
                                        "Failed to close socket %d (%s)!!\n", (int)sfd,
                                        strerror(errno));
        return;
    }

    STATS_LOCK();
    stats.curr_conns--;
    STATS_UNLOCK();

    if (is_listen_disabled()) {
        notify_dispatcher();
    }
}
403
/*
 * Free list management for connections.
 */
cache_t *conn_cache;      /* object cache that conn_new() allocates conn
                             structures from and conn_close() returns them to */
408
409 /**
410 * Reset all of the dynamic buffers used by a connection back to their
411 * default sizes. The strategy for resizing the buffers is to allocate a
412 * new one of the correct size and free the old one if the allocation succeeds
413 * instead of using realloc to change the buffer size (because realloc may
414 * not shrink the buffers, and will also copy the memory). If the allocation
415 * fails the buffer will be unchanged.
416 *
417 * @param c the connection to resize the buffers for
418 * @return true if all allocations succeeded, false if one or more of the
419 * allocations failed.
420 */
conn_reset_buffersize(conn * c)421 static bool conn_reset_buffersize(conn *c) {
422 bool ret = true;
423
424 if (c->rsize != DATA_BUFFER_SIZE) {
425 void *ptr = malloc(DATA_BUFFER_SIZE);
426 if (ptr != NULL) {
427 free(c->rbuf);
428 c->rbuf = ptr;
429 c->rsize = DATA_BUFFER_SIZE;
430 } else {
431 ret = false;
432 }
433 }
434
435 if (c->wsize != DATA_BUFFER_SIZE) {
436 void *ptr = malloc(DATA_BUFFER_SIZE);
437 if (ptr != NULL) {
438 free(c->wbuf);
439 c->wbuf = ptr;
440 c->wsize = DATA_BUFFER_SIZE;
441 } else {
442 ret = false;
443 }
444 }
445
446 if (c->isize != ITEM_LIST_INITIAL) {
447 void *ptr = malloc(sizeof(item *) * ITEM_LIST_INITIAL);
448 if (ptr != NULL) {
449 free(c->ilist);
450 c->ilist = ptr;
451 c->isize = ITEM_LIST_INITIAL;
452 } else {
453 ret = false;
454 }
455 }
456
457 if (c->suffixsize != SUFFIX_LIST_INITIAL) {
458 void *ptr = malloc(sizeof(char *) * SUFFIX_LIST_INITIAL);
459 if (ptr != NULL) {
460 free(c->suffixlist);
461 c->suffixlist = ptr;
462 c->suffixsize = SUFFIX_LIST_INITIAL;
463 } else {
464 ret = false;
465 }
466 }
467
468 if (c->iovsize != IOV_LIST_INITIAL) {
469 void *ptr = malloc(sizeof(struct iovec) * IOV_LIST_INITIAL);
470 if (ptr != NULL) {
471 free(c->iov);
472 c->iov = ptr;
473 c->iovsize = IOV_LIST_INITIAL;
474 } else {
475 ret = false;
476 }
477 }
478
479 if (c->msgsize != MSG_LIST_INITIAL) {
480 void *ptr = malloc(sizeof(struct msghdr) * MSG_LIST_INITIAL);
481 if (ptr != NULL) {
482 free(c->msglist);
483 c->msglist = ptr;
484 c->msgsize = MSG_LIST_INITIAL;
485 } else {
486 ret = false;
487 }
488 }
489
490 return ret;
491 }
492
493 /**
494 * Constructor for all memory allocations of connection objects. Initialize
495 * all members and allocate the transfer buffers.
496 *
497 * @param buffer The memory allocated by the object cache
498 * @param unused1 not used
499 * @param unused2 not used
500 * @return 0 on success, 1 if we failed to allocate memory
501 */
conn_constructor(void * buffer,void * unused1,int unused2)502 static int conn_constructor(void *buffer, void *unused1, int unused2) {
503 conn *c = buffer;
504 memset(c, 0, sizeof(*c));
505 MEMCACHED_CONN_CREATE(c);
506 (void)unused1; (void)unused2;
507
508 if (!conn_reset_buffersize(c)) {
509 free(c->rbuf);
510 free(c->wbuf);
511 free(c->ilist);
512 free(c->suffixlist);
513 free(c->iov);
514 free(c->msglist);
515 settings.extensions.logger->log(EXTENSION_LOG_WARNING,
516 NULL,
517 "Failed to allocate buffers for connection\n");
518 return 1;
519 }
520
521 STATS_LOCK();
522 stats.conn_structs++;
523 STATS_UNLOCK();
524
525 return 0;
526 }
527
528 /**
529 * Destructor for all connection objects. Release all allocated resources.
530 *
531 * @param buffer The memory allocated by the objec cache
532 * @param unused not used
533 */
conn_destructor(void * buffer,void * unused)534 static void conn_destructor(void *buffer, void *unused) {
535 conn *c = buffer;
536 free(c->rbuf);
537 free(c->wbuf);
538 free(c->ilist);
539 free(c->suffixlist);
540 free(c->iov);
541 free(c->msglist);
542
543 STATS_LOCK();
544 stats.conn_structs--;
545 STATS_UNLOCK();
546 (void)unused;
547 }
548
conn_new(const SOCKET sfd,STATE_FUNC init_state,const int event_flags,const int read_buffer_size,enum network_transport transport,struct event_base * base,struct timeval * timeout)549 conn *conn_new(const SOCKET sfd, STATE_FUNC init_state,
550 const int event_flags,
551 const int read_buffer_size, enum network_transport transport,
552 struct event_base *base, struct timeval *timeout) {
553 conn *c = cache_alloc(conn_cache);
554 if (c == NULL) {
555 return NULL;
556 }
557
558 assert(c->thread == NULL);
559
560 if (c->rsize < read_buffer_size) {
561 void *mem = malloc(read_buffer_size);
562 if (mem) {
563 c->rsize = read_buffer_size;
564 free(c->rbuf);
565 c->rbuf = mem;
566 } else {
567 assert(c->thread == NULL);
568 cache_free(conn_cache, c);
569 return NULL;
570 }
571 }
572
573 c->transport = transport;
574 c->protocol = settings.binding_protocol;
575
576 if (IS_UDP(transport)) {
577 c->request_addr_size = sizeof(c->request_addr);
578 } else {
579 c->request_addr_size = 0;
580 }
581
582 if (settings.verbose > 1) {
583 if (init_state == conn_listening) {
584 settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
585 "<%d server listening (%s)\n", sfd,
586 prot_text(c->protocol));
587 } else if (IS_UDP(transport)) {
588 settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
589 "<%d server listening (udp)\n", sfd);
590 } else if (c->protocol == negotiating_prot) {
591 settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
592 "<%d new auto-negotiating client connection\n",
593 sfd);
594 } else if (c->protocol == ascii_prot) {
595 settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
596 "<%d new ascii client connection.\n", sfd);
597 } else if (c->protocol == binary_prot) {
598 settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
599 "<%d new binary client connection.\n", sfd);
600 } else {
601 settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
602 "<%d new unknown (%d) client connection\n",
603 sfd, c->protocol);
604 assert(false);
605 }
606 }
607
608 c->sfd = sfd;
609 c->state = init_state;
610 c->rlbytes = 0;
611 c->cmd = -1;
612 c->ascii_cmd = NULL;
613 c->rbytes = c->wbytes = 0;
614 c->wcurr = c->wbuf;
615 c->rcurr = c->rbuf;
616 c->ritem = 0;
617 c->icurr = c->ilist;
618 c->suffixcurr = c->suffixlist;
619 c->ileft = 0;
620 c->suffixleft = 0;
621 c->iovused = 0;
622 c->msgcurr = 0;
623 c->msgused = 0;
624 c->next = NULL;
625 c->list_state = 0;
626
627 c->write_and_go = init_state;
628 c->write_and_free = 0;
629 c->item = 0;
630
631 c->noreply = false;
632
633 event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
634 event_base_set(base, &c->event);
635 c->ev_flags = event_flags;
636
637 if (!register_event(c, timeout)) {
638 assert(c->thread == NULL);
639 cache_free(conn_cache, c);
640 return NULL;
641 }
642
643 STATS_LOCK();
644 stats.total_conns++;
645 STATS_UNLOCK();
646
647 c->aiostat = ENGINE_SUCCESS;
648 c->ewouldblock = false;
649 c->refcount = 1;
650
651 MEMCACHED_CONN_ALLOCATE(c->sfd);
652
653 perform_callbacks(ON_CONNECT, NULL, c);
654
655 return c;
656 }
657
/*
 * Release everything a connection still holds before it goes back to the
 * object cache: the current item, any items and suffixes still queued for
 * output, the write_and_free buffer and the SASL context.  Afterwards the
 * engine/tap/thread links are cleared and the socket is marked invalid.
 *
 * The connection must already be unlinked (c->next == NULL).
 */
static void conn_cleanup(conn *c)
{
    assert(c != NULL);

    if (c->item) {
        settings.engine.v1->release(settings.engine.v0, c, c->item);
        c->item = 0;
    }

    while (c->ileft > 0) {
        settings.engine.v1->release(settings.engine.v0, c, *(c->icurr));
        c->ileft--;
        c->icurr++;
    }

    while (c->suffixleft > 0) {
        cache_free(c->thread->suffix_cache, *(c->suffixcurr));
        c->suffixleft--;
        c->suffixcurr++;
    }

    if (c->write_and_free) {
        free(c->write_and_free);
        c->write_and_free = 0;
    }

    if (c->sasl_conn) {
        sasl_dispose(&c->sasl_conn);
        c->sasl_conn = NULL;
    }

    c->engine_storage = NULL;
    c->tap_iterator = NULL;
    c->thread = NULL;
    assert(c->next == NULL);
    c->ascii_cmd = NULL;
    c->sfd = INVALID_SOCKET;
}
695
/*
 * Tear down a connection whose socket has already been closed (c->sfd must
 * be INVALID_SOCKET) and hand the object back to conn_cache.
 *
 * Aborts a pending ascii extension command, unlinks the connection from its
 * thread's pending-io and pending-close lists under the thread lock, cleans
 * the connection up and resets its buffers to their default sizes.
 */
void conn_close(conn *c)
{
    assert(c != NULL);
    assert(c->sfd == INVALID_SOCKET);

    if (c->ascii_cmd != NULL) {
        c->ascii_cmd->abort(c->ascii_cmd, c);
    }

    assert(c->thread);
    LOCK_THREAD(c->thread);
    /* remove from pending-io list */
    if (settings.verbose > 1) {
        if (list_contains(c->thread->pending_io, c)) {
            settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
                                            "Current connection was in the pending-io list.. Nuking it\n");
        }
    }
    c->thread->pending_io = list_remove(c->thread->pending_io, c);
    c->thread->pending_close = list_remove(c->thread->pending_close, c);
    UNLOCK_THREAD(c->thread);

    conn_cleanup(c);

    /*
     * The object cache expects objects to come back in a constructed
     * state, so put the buffers back to their default sizes.  The result
     * is not checked: a buffer that could not be replaced is left as is.
     */
    conn_reset_buffersize(c);
    assert(c->thread == NULL);
    cache_free(conn_cache, c);
}
726
727 /*
728 * Shrinks a connection's buffers if they're too big. This prevents
729 * periodic large "get" requests from permanently chewing lots of server
730 * memory.
731 *
732 * This should only be called in between requests since it can wipe output
733 * buffers!
734 */
conn_shrink(conn * c)735 static void conn_shrink(conn *c) {
736 assert(c != NULL);
737
738 if (IS_UDP(c->transport))
739 return;
740
741 if (c->rsize > READ_BUFFER_HIGHWAT && c->rbytes < DATA_BUFFER_SIZE) {
742 char *newbuf;
743
744 if (c->rcurr != c->rbuf)
745 memmove(c->rbuf, c->rcurr, (size_t)c->rbytes);
746
747 newbuf = (char *)realloc((void *)c->rbuf, DATA_BUFFER_SIZE);
748
749 if (newbuf) {
750 c->rbuf = newbuf;
751 c->rsize = DATA_BUFFER_SIZE;
752 }
753 /* TODO check other branch... */
754 c->rcurr = c->rbuf;
755 }
756
757 if (c->isize > ITEM_LIST_HIGHWAT) {
758 item **newbuf = (item**) realloc((void *)c->ilist, ITEM_LIST_INITIAL * sizeof(c->ilist[0]));
759 if (newbuf) {
760 c->ilist = newbuf;
761 c->isize = ITEM_LIST_INITIAL;
762 }
763 /* TODO check error condition? */
764 }
765
766 if (c->msgsize > MSG_LIST_HIGHWAT) {
767 struct msghdr *newbuf = (struct msghdr *) realloc((void *)c->msglist, MSG_LIST_INITIAL * sizeof(c->msglist[0]));
768 if (newbuf) {
769 c->msglist = newbuf;
770 c->msgsize = MSG_LIST_INITIAL;
771 }
772 /* TODO check error condition? */
773 }
774
775 if (c->iovsize > IOV_LIST_HIGHWAT) {
776 struct iovec *newbuf = (struct iovec *) realloc((void *)c->iov, IOV_LIST_INITIAL * sizeof(c->iov[0]));
777 if (newbuf) {
778 c->iov = newbuf;
779 c->iovsize = IOV_LIST_INITIAL;
780 }
781 /* TODO check return value */
782 }
783 }
784
785 /**
786 * Convert a state name to a human readable form.
787 */
state_text(STATE_FUNC state)788 const char *state_text(STATE_FUNC state) {
789 if (state == conn_listening) {
790 return "conn_listening";
791 } else if (state == conn_new_cmd) {
792 return "conn_new_cmd";
793 } else if (state == conn_waiting) {
794 return "conn_waiting";
795 } else if (state == conn_read) {
796 return "conn_read";
797 } else if (state == conn_parse_cmd) {
798 return "conn_parse_cmd";
799 } else if (state == conn_write) {
800 return "conn_write";
801 } else if (state == conn_nread) {
802 return "conn_nread";
803 } else if (state == conn_swallow) {
804 return "conn_swallow";
805 } else if (state == conn_closing) {
806 return "conn_closing";
807 } else if (state == conn_mwrite) {
808 return "conn_mwrite";
809 } else if (state == conn_ship_log) {
810 return "conn_ship_log";
811 } else if (state == conn_add_tap_client) {
812 return "conn_add_tap_client";
813 } else if (state == conn_setup_tap_stream) {
814 return "conn_setup_tap_stream";
815 } else if (state == conn_pending_close) {
816 return "conn_pending_close";
817 } else if (state == conn_immediate_close) {
818 return "conn_immediate_close";
819 } else {
820 return "Unknown";
821 }
822 }
823
824 /*
825 * Sets a connection's current state in the state machine. Any special
826 * processing that needs to happen on certain state transitions can
827 * happen here.
828 */
conn_set_state(conn * c,STATE_FUNC state)829 void conn_set_state(conn *c, STATE_FUNC state) {
830 assert(c != NULL);
831
832 if (state != c->state) {
833 /*
834 * The connections in the "tap thread" behaves differently than
835 * normal connections because they operate in a full duplex mode.
836 * New messages may appear from both sides, so we can't block on
837 * read from the nework / engine
838 */
839 if (c->thread == tap_thread) {
840 if (state == conn_waiting) {
841 c->which = EV_WRITE;
842 state = conn_ship_log;
843 }
844 }
845
846 if (settings.verbose > 2 || c->state == conn_closing
847 || c->state == conn_add_tap_client) {
848 settings.extensions.logger->log(EXTENSION_LOG_DETAIL, c,
849 "%d: going from %s to %s\n",
850 c->sfd, state_text(c->state),
851 state_text(state));
852 }
853
854 if (state == conn_write || state == conn_mwrite) {
855 MEMCACHED_PROCESS_COMMAND_END(c->sfd, c->wbuf, c->wbytes);
856 }
857
858 c->state = state;
859 }
860 }
861
862 /*
863 * Ensures that there is room for another struct iovec in a connection's
864 * iov list.
865 *
866 * Returns 0 on success, -1 on out-of-memory.
867 */
ensure_iov_space(conn * c)868 static int ensure_iov_space(conn *c) {
869 assert(c != NULL);
870
871 if (c->iovused >= c->iovsize) {
872 int i, iovnum;
873 struct iovec *new_iov = (struct iovec *)realloc(c->iov,
874 (c->iovsize * 2) * sizeof(struct iovec));
875 if (! new_iov)
876 return -1;
877 c->iov = new_iov;
878 c->iovsize *= 2;
879
880 /* Point all the msghdr structures at the new list. */
881 for (i = 0, iovnum = 0; i < c->msgused; i++) {
882 c->msglist[i].msg_iov = &c->iov[iovnum];
883 iovnum += c->msglist[i].msg_iovlen;
884 }
885 }
886
887 return 0;
888 }
889
890
891 /*
892 * Adds data to the list of pending data that will be written out to a
893 * connection.
894 *
895 * Returns 0 on success, -1 on out-of-memory.
896 */
897
add_iov(conn * c,const void * buf,int len)898 static int add_iov(conn *c, const void *buf, int len) {
899 struct msghdr *m;
900 int leftover;
901 bool limit_to_mtu;
902
903 assert(c != NULL);
904
905 do {
906 m = &c->msglist[c->msgused - 1];
907
908 /*
909 * Limit UDP packets, and the first payloads of TCP replies, to
910 * UDP_MAX_PAYLOAD_SIZE bytes.
911 */
912 limit_to_mtu = IS_UDP(c->transport) || (1 == c->msgused);
913
914 /* We may need to start a new msghdr if this one is full. */
915 if (m->msg_iovlen == IOV_MAX ||
916 (limit_to_mtu && c->msgbytes >= UDP_MAX_PAYLOAD_SIZE)) {
917 add_msghdr(c);
918 m = &c->msglist[c->msgused - 1];
919 }
920
921 if (ensure_iov_space(c) != 0)
922 return -1;
923
924 /* If the fragment is too big to fit in the datagram, split it up */
925 if (limit_to_mtu && len + c->msgbytes > UDP_MAX_PAYLOAD_SIZE) {
926 leftover = len + c->msgbytes - UDP_MAX_PAYLOAD_SIZE;
927 len -= leftover;
928 } else {
929 leftover = 0;
930 }
931
932 m = &c->msglist[c->msgused - 1];
933 m->msg_iov[m->msg_iovlen].iov_base = (void *)buf;
934 m->msg_iov[m->msg_iovlen].iov_len = len;
935
936 c->msgbytes += len;
937 c->iovused++;
938 m->msg_iovlen++;
939
940 buf = ((char *)buf) + len;
941 len = leftover;
942 } while (leftover > 0);
943
944 return 0;
945 }
946
947
948 /*
949 * Constructs a set of UDP headers and attaches them to the outgoing messages.
950 */
build_udp_headers(conn * c)951 static int build_udp_headers(conn *c) {
952 int i;
953 unsigned char *hdr;
954
955 assert(c != NULL);
956
957 if (c->msgused > c->hdrsize) {
958 void *new_hdrbuf;
959 if (c->hdrbuf)
960 new_hdrbuf = realloc(c->hdrbuf, c->msgused * 2 * UDP_HEADER_SIZE);
961 else
962 new_hdrbuf = malloc(c->msgused * 2 * UDP_HEADER_SIZE);
963 if (! new_hdrbuf)
964 return -1;
965 c->hdrbuf = (unsigned char *)new_hdrbuf;
966 c->hdrsize = c->msgused * 2;
967 }
968
969 hdr = c->hdrbuf;
970 for (i = 0; i < c->msgused; i++) {
971 c->msglist[i].msg_iov[0].iov_base = (void*)hdr;
972 c->msglist[i].msg_iov[0].iov_len = UDP_HEADER_SIZE;
973 *hdr++ = c->request_id / 256;
974 *hdr++ = c->request_id % 256;
975 *hdr++ = i / 256;
976 *hdr++ = i % 256;
977 *hdr++ = c->msgused / 256;
978 *hdr++ = c->msgused % 256;
979 *hdr++ = 0;
980 *hdr++ = 0;
981 assert((void *) hdr == (caddr_t)c->msglist[i].msg_iov[0].iov_base + UDP_HEADER_SIZE);
982 }
983
984 return 0;
985 }
986
987
/*
 * Queue a single response line (str followed by "\r\n") for the client.
 *
 * If the command was flagged "noreply" nothing is sent: the flag is cleared
 * and the connection goes straight to conn_swallow (when bytes remain to be
 * swallowed) or conn_new_cmd.  Otherwise any partially built output is
 * discarded, the line is copied into the write buffer, the connection is
 * put into conn_write, and write_and_go selects the state to enter once the
 * write is done.  A line that does not fit in the write buffer is replaced
 * by a fixed SERVER_ERROR message.
 */
static void out_string(conn *c, const char *str)
{
    assert(c != NULL);

    if (c->noreply) {
        if (settings.verbose > 1) {
            settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
                                            ">%d NOREPLY %s\n", c->sfd, str);
        }
        c->noreply = false;
        conn_set_state(c, (c->sbytes > 0) ? conn_swallow : conn_new_cmd);
        return;
    }

    if (settings.verbose > 1) {
        settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
                                        ">%d %s\n", c->sfd, str);
    }

    /* Nuke a partial output... */
    c->msgcurr = 0;
    c->msgused = 0;
    c->iovused = 0;
    add_msghdr(c);

    size_t len = strlen(str);
    if ((len + 2) > c->wsize) {
        /* ought to be always enough. just fail for simplicity */
        str = "SERVER_ERROR output line too long";
        len = strlen(str);
    }

    memcpy(c->wbuf, str, len);
    memcpy(c->wbuf + len, "\r\n", 2);
    c->wbytes = len + 2;
    c->wcurr = c->wbuf;

    conn_set_state(c, conn_write);

    c->write_and_go = (c->sbytes > 0) ? conn_swallow : conn_new_cmd;
}
1040
1041 /*
1042 * we get here after reading the value in set/add/replace commands. The command
1043 * has been stored in c->cmd, and the item is ready in c->item.
1044 */
complete_update_ascii(conn * c)1045 static void complete_update_ascii(conn *c) {
1046 assert(c != NULL);
1047
1048 item *it = c->item;
1049 item_info info = { .nvalue = 1 };
1050 if (!settings.engine.v1->get_item_info(settings.engine.v0, c, it, &info)) {
1051 settings.engine.v1->release(settings.engine.v0, c, it);
1052 settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
1053 "%d: Failed to get item info\n",
1054 c->sfd);
1055 out_string(c, "SERVER_ERROR failed to get item details");
1056 return;
1057 }
1058
1059 c->sbytes = 2; // swallow \r\n
1060 ENGINE_ERROR_CODE ret = c->aiostat;
1061 c->aiostat = ENGINE_SUCCESS;
1062 if (ret == ENGINE_SUCCESS) {
1063 ret = settings.engine.v1->store(settings.engine.v0, c, it, &c->cas,
1064 c->store_op, 0);
1065 }
1066
1067 #ifdef ENABLE_DTRACE
1068 switch (c->store_op) {
1069 case OPERATION_ADD:
1070 MEMCACHED_COMMAND_ADD(c->sfd, info.key, info.nkey,
1071 (ret == ENGINE_SUCCESS) ? info.nbytes : -1, c->cas);
1072 break;
1073 case OPERATION_REPLACE:
1074 MEMCACHED_COMMAND_REPLACE(c->sfd, info.key, info.nkey,
1075 (ret == ENGINE_SUCCESS) ? info.nbytes : -1, c->cas);
1076 break;
1077 case OPERATION_APPEND:
1078 MEMCACHED_COMMAND_APPEND(c->sfd, info.key, info.nkey,
1079 (ret == ENGINE_SUCCESS) ? info.nbytes : -1, c->cas);
1080 break;
1081 case OPERATION_PREPEND:
1082 MEMCACHED_COMMAND_PREPEND(c->sfd, info.key, info.nkey,
1083 (ret == ENGINE_SUCCESS) ? info.nbytes : -1, c->cas);
1084 break;
1085 case OPERATION_SET:
1086 MEMCACHED_COMMAND_SET(c->sfd, info.key, info.nkey,
1087 (ret == ENGINE_SUCCESS) ? info.nbytes : -1, c->cas);
1088 break;
1089 case OPERATION_CAS:
1090 MEMCACHED_COMMAND_CAS(c->sfd, info.key, info.nkey, info.nbytes, c->cas);
1091 break;
1092 }
1093 #endif
1094
1095 switch (ret) {
1096 case ENGINE_SUCCESS:
1097 out_string(c, "STORED");
1098 break;
1099 case ENGINE_KEY_EEXISTS:
1100 out_string(c, "EXISTS");
1101 break;
1102 case ENGINE_KEY_ENOENT:
1103 out_string(c, "NOT_FOUND");
1104 break;
1105 case ENGINE_NOT_STORED:
1106 out_string(c, "NOT_STORED");
1107 break;
1108 case ENGINE_DISCONNECT:
1109 c->state = conn_closing;
1110 break;
1111 case ENGINE_ENOTSUP:
1112 out_string(c, "SERVER_ERROR not supported");
1113 break;
1114 case ENGINE_ENOMEM:
1115 out_string(c, "SERVER_ERROR out of memory");
1116 break;
1117 case ENGINE_TMPFAIL:
1118 out_string(c, "SERVER_ERROR temporary failure");
1119 break;
1120 case ENGINE_EINVAL:
1121 out_string(c, "CLIENT_ERROR invalid arguments");
1122 break;
1123 case ENGINE_E2BIG:
1124 out_string(c, "CLIENT_ERROR value too big");
1125 break;
1126 case ENGINE_EACCESS:
1127 out_string(c, "CLIENT_ERROR access control violation");
1128 break;
1129 case ENGINE_NOT_MY_VBUCKET:
1130 out_string(c, "SERVER_ERROR not my vbucket");
1131 break;
1132 case ENGINE_FAILED:
1133 out_string(c, "SERVER_ERROR failure");
1134 break;
1135 case ENGINE_EWOULDBLOCK:
1136 c->ewouldblock = true;
1137 break;
1138 case ENGINE_WANT_MORE:
1139 assert(false);
1140 c->state = conn_closing;
1141 break;
1142
1143 default:
1144 out_string(c, "SERVER_ERROR internal");
1145 }
1146
1147 if (c->store_op == OPERATION_CAS) {
1148 switch (ret) {
1149 case ENGINE_SUCCESS:
1150 SLAB_INCR(c, cas_hits, info.key, info.nkey);
1151 break;
1152 case ENGINE_KEY_EEXISTS:
1153 SLAB_INCR(c, cas_badval, info.key, info.nkey);
1154 break;
1155 case ENGINE_KEY_ENOENT:
1156 STATS_NOKEY(c, cas_misses);
1157 break;
1158 default:
1159 ;
1160 }
1161 } else {
1162 SLAB_INCR(c, cmd_set, info.key, info.nkey);
1163 }
1164
1165 if (!c->ewouldblock) {
1166 /* release the c->item reference */
1167 settings.engine.v1->release(settings.engine.v0, c, c->item);
1168 c->item = 0;
1169 }
1170 }
1171
1172 /**
1173 * get a pointer to the start of the request struct for the current command
1174 */
binary_get_request(conn * c)1175 static void* binary_get_request(conn *c) {
1176 char *ret = c->rcurr;
1177 ret -= (sizeof(c->binary_header) + c->binary_header.request.keylen +
1178 c->binary_header.request.extlen);
1179
1180 assert(ret >= c->rbuf);
1181 return ret;
1182 }
1183
1184 /**
1185 * get a pointer to the key in this request
1186 */
binary_get_key(conn * c)1187 static char* binary_get_key(conn *c) {
1188 return c->rcurr - (c->binary_header.request.keylen);
1189 }
1190
/**
 * Insert a key into a buffer, but replace all non-printable characters
 * with a '.'.
 *
 * The output has the form "<dir><client> <prefix> <key>", where <dir> is
 * '>' for data from the client and '<' otherwise.  The key is truncated if
 * it does not fit; the result is always NUL-terminated on success.
 *
 * @param dest where to store the output
 * @param destsz size of destination buffer
 * @param client the client we are serving
 * @param from_client set to true if this data is from the client
 * @param prefix string to insert before the data
 * @param key the key to add to the buffer
 * @param nkey the number of bytes in the key
 * @return number of bytes in dest (not counting the terminating NUL) if
 *         success, -1 if formatting failed or the prefix does not fit
 */
static ssize_t key_to_printable_buffer(char *dest, size_t destsz,
                                       int client, bool from_client,
                                       const char *prefix,
                                       const char *key,
                                       size_t nkey)
{
    int nw = snprintf(dest, destsz, "%c%d %s ", from_client ? '>' : '<',
                      client, prefix);

    /*
     * snprintf reports the length it wanted to write, so a value >= destsz
     * means the prefix was truncated and there is no room left for a key.
     */
    if (nw < 0 || (size_t)nw >= destsz) {
        return -1;
    }

    /* Keep one byte back for the terminating NUL. */
    size_t room = destsz - (size_t)nw - 1;
    if (nkey > room) {
        nkey = room;
    }

    char *ptr = dest + nw;
    for (size_t ii = 0; ii < nkey; ++ii) {
        /* <ctype.h> functions require a value representable as unsigned char */
        unsigned char ch = (unsigned char)key[ii];
        *ptr++ = isgraph(ch) ? (char)ch : '.';
    }

    *ptr = '\0';
    return ptr - dest;
}
1234
/**
 * Convert a byte array to a text string
 *
 * The output starts with "<dir><client> <prefix>" and is followed by the
 * bytes as " 0xNN", four per line, each line introduced by
 * "\n<dir><client> ".  A final "\n" ends the dump.
 *
 * @param dest where to store the output
 * @param destsz size of destination buffer
 * @param client the client we are serving
 * @param from_client set to true if this data is from the client
 * @param prefix string to insert before the data
 * @param data the data to add to the buffer
 * @param size the number of bytes in data to print
 * @return number of bytes in dest (not counting the terminating NUL) if
 *         success, -1 if formatting failed or the output does not fit
 */
static ssize_t bytes_to_output_string(char *dest, size_t destsz,
                                      int client, bool from_client,
                                      const char *prefix,
                                      const char *data,
                                      size_t size)
{
    const char direction = from_client ? '>' : '<';
    size_t offset;
    int nw;

    /*
     * After every snprintf: a negative value is an error, and a value that
     * does not fit in the remaining space means the output was truncated.
     * Bail out in both cases so offset can never run past destsz.
     */
    nw = snprintf(dest, destsz, "%c%d %s", direction, client, prefix);
    if (nw < 0 || (size_t)nw >= destsz) {
        return -1;
    }
    offset = (size_t)nw;

    for (size_t ii = 0; ii < size; ++ii) {
        if (ii % 4 == 0) {
            nw = snprintf(dest + offset, destsz - offset, "\n%c%d ",
                          direction, client);
            if (nw < 0 || (size_t)nw >= destsz - offset) {
                return -1;
            }
            offset += (size_t)nw;
        }
        nw = snprintf(dest + offset, destsz - offset,
                      " 0x%02x", (unsigned char)data[ii]);
        if (nw < 0 || (size_t)nw >= destsz - offset) {
            return -1;
        }
        offset += (size_t)nw;
    }

    nw = snprintf(dest + offset, destsz - offset, "\n");
    if (nw < 0 || (size_t)nw >= destsz - offset) {
        return -1;
    }

    return (ssize_t)(offset + (size_t)nw);
}
1282
/**
 * Begin a binary-protocol response: reset the outgoing message bookkeeping,
 * build the response header at the start of c->wbuf and queue it as the
 * first iov.
 *
 * The opcode and opaque are copied from the connection; cas comes from
 * c->cas.  Multi-byte fields are stored in network byte order.
 *
 * @param c the connection to respond on
 * @param err value for the header's status field
 * @param hdr_len value for the header's extlen field
 * @param key_len value for the header's keylen field
 * @param body_len value for the header's bodylen field
 */
static void add_bin_header(conn *c, uint16_t err, uint8_t hdr_len, uint16_t key_len, uint32_t body_len) {
    assert(c);

    /* Start from an empty message list. */
    c->iovused = 0;
    c->msgused = 0;
    c->msgcurr = 0;

    if (add_msghdr(c) != 0) {
        /* XXX: out_string is inappropriate here */
        out_string(c, "SERVER_ERROR out of memory");
        return;
    }

    protocol_binary_response_header *hdr =
        (protocol_binary_response_header *)c->wbuf;

    hdr->response.magic = (uint8_t)PROTOCOL_BINARY_RES;
    hdr->response.opcode = c->binary_header.request.opcode;
    hdr->response.datatype = (uint8_t)PROTOCOL_BINARY_RAW_BYTES;
    hdr->response.extlen = (uint8_t)hdr_len;

    hdr->response.keylen = (uint16_t)htons(key_len);
    hdr->response.status = (uint16_t)htons(err);
    hdr->response.bodylen = htonl(body_len);

    hdr->response.opaque = c->opaque;
    hdr->response.cas = htonll(c->cas);

    if (settings.verbose > 1) {
        /* Hex-dump the header we are about to send. */
        char dump[1024];
        ssize_t dumped = bytes_to_output_string(dump, sizeof(dump), c->sfd,
                                                false,
                                                "Writing bin response:",
                                                (const char*)hdr->bytes,
                                                sizeof(hdr->bytes));
        if (dumped != -1) {
            settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
                                            "%s", dump);
        }
    }

    add_iov(c, c->wbuf, sizeof(hdr->response));
}
1324
1325 /**
1326 * Convert an error code generated from the storage engine to the corresponding
1327 * error code used by the protocol layer.
1328 * @param e the error code as used in the engine
1329 * @return the error code as used by the protocol layer
1330 */
engine_error_2_protocol_error(ENGINE_ERROR_CODE e)1331 static protocol_binary_response_status engine_error_2_protocol_error(ENGINE_ERROR_CODE e) {
1332 protocol_binary_response_status ret;
1333
1334 switch (e) {
1335 case ENGINE_SUCCESS:
1336 return PROTOCOL_BINARY_RESPONSE_SUCCESS;
1337 case ENGINE_KEY_ENOENT:
1338 return PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
1339 case ENGINE_KEY_EEXISTS:
1340 return PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS;
1341 case ENGINE_ENOMEM:
1342 return PROTOCOL_BINARY_RESPONSE_ENOMEM;
1343 case ENGINE_TMPFAIL:
1344 return PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
1345 case ENGINE_NOT_STORED:
1346 return PROTOCOL_BINARY_RESPONSE_NOT_STORED;
1347 case ENGINE_EINVAL:
1348 return PROTOCOL_BINARY_RESPONSE_EINVAL;
1349 case ENGINE_ENOTSUP:
1350 return PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED;
1351 case ENGINE_E2BIG:
1352 return PROTOCOL_BINARY_RESPONSE_E2BIG;
1353 case ENGINE_NOT_MY_VBUCKET:
1354 return PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
1355 case ENGINE_ERANGE:
1356 return PROTOCOL_BINARY_RESPONSE_ERANGE;
1357 default:
1358 ret = PROTOCOL_BINARY_RESPONSE_EINTERNAL;
1359 }
1360
1361 return ret;
1362 }
1363
/**
 * Send a binary-protocol response consisting of a header with the given
 * status and, as body, a short human-readable description of that status.
 * If the engine provides an errinfo() hook, whatever it returns is appended
 * to the description after ": ".
 *
 * The body text is copied into c->wbuf right behind the response header
 * before it is queued: add_iov() is handed a pointer that must remain valid
 * after this function returns, which a local array would not.  Text that
 * does not fit in the space c->wsize leaves after the header is truncated.
 *
 * @param c the connection to respond on
 * @param err the status to report
 * @param swallow when greater than zero, stored in c->sbytes and the
 *                connection moves to conn_swallow once the response has been
 *                written; otherwise it moves to conn_new_cmd
 */
static void write_bin_packet(conn *c, protocol_binary_response_status err, int swallow) {
    ssize_t len;
    char buffer[1024] = { [sizeof(buffer) - 1] = '\0' };

    switch (err) {
    case PROTOCOL_BINARY_RESPONSE_SUCCESS:
        len = 0;
        break;
    case PROTOCOL_BINARY_RESPONSE_ENOMEM:
        len = snprintf(buffer, sizeof(buffer), "Out of memory");
        break;
    case PROTOCOL_BINARY_RESPONSE_ETMPFAIL:
        len = snprintf(buffer, sizeof(buffer), "Temporary failure");
        break;
    case PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND:
        len = snprintf(buffer, sizeof(buffer), "Unknown command");
        break;
    case PROTOCOL_BINARY_RESPONSE_KEY_ENOENT:
        len = snprintf(buffer, sizeof(buffer), "Not found");
        break;
    case PROTOCOL_BINARY_RESPONSE_EINVAL:
        len = snprintf(buffer, sizeof(buffer), "Invalid arguments");
        break;
    case PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS:
        len = snprintf(buffer, sizeof(buffer), "Data exists for key");
        break;
    case PROTOCOL_BINARY_RESPONSE_E2BIG:
        len = snprintf(buffer, sizeof(buffer), "Too large");
        break;
    case PROTOCOL_BINARY_RESPONSE_DELTA_BADVAL:
        len = snprintf(buffer, sizeof(buffer),
                       "Non-numeric server-side value for incr or decr");
        break;
    case PROTOCOL_BINARY_RESPONSE_NOT_STORED:
        len = snprintf(buffer, sizeof(buffer), "Not stored");
        break;
    case PROTOCOL_BINARY_RESPONSE_AUTH_ERROR:
        len = snprintf(buffer, sizeof(buffer), "Auth failure");
        break;
    case PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED:
        len = snprintf(buffer, sizeof(buffer), "Not supported");
        break;
    case PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET:
        len = snprintf(buffer, sizeof(buffer),
                       "I'm not responsible for this vbucket");
        break;

    default:
        len = snprintf(buffer, sizeof(buffer), "UNHANDLED ERROR (%d)", err);
        settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
                                        ">%d UNHANDLED ERROR: %d\n", c->sfd, err);
    }

    /* A failed snprintf must not turn into a negative buffer offset. */
    if (len < 0) {
        len = 0;
        buffer[0] = '\0';
    }

    /* Allow the engine to pass extra error information */
    if (settings.engine.v1->errinfo != NULL) {
        size_t elen = settings.engine.v1->errinfo(settings.engine.v0, c, buffer + len + 2,
                                                  sizeof(buffer) - len - 3);

        if (elen > 0) {
            memcpy(buffer + len, ": ", 2);
            len += elen + 2;
        }
    }

    if (err != PROTOCOL_BINARY_RESPONSE_SUCCESS && settings.verbose > 1) {
        settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
                                        ">%d Writing an error: %s\n", c->sfd,
                                        buffer);
    }

    /*
     * Stage the text in the connection's write buffer, directly after the
     * header that add_bin_header() builds at the start of c->wbuf.
     */
    const size_t hdrsz = sizeof(protocol_binary_response_header);
    char *payload = c->wbuf + hdrsz;
    size_t room = ((size_t)c->wsize > hdrsz) ? (size_t)c->wsize - hdrsz : 0;
    if ((size_t)len > room) {
        len = (ssize_t)room;
    }

    add_bin_header(c, err, 0, 0, len);
    if (len > 0) {
        memcpy(payload, buffer, (size_t)len);
        add_iov(c, payload, len);
    }
    conn_set_state(c, conn_mwrite);
    if (swallow > 0) {
        c->sbytes = swallow;
        c->write_and_go = conn_swallow;
    } else {
        c->write_and_go = conn_new_cmd;
    }
}
1446
1447 /* Form and send a response to a command over the binary protocol */
write_bin_response(conn * c,const void * d,int hlen,int keylen,int dlen)1448 static void write_bin_response(conn *c, const void *d, int hlen, int keylen, int dlen) {
1449 if (!c->noreply || c->cmd == PROTOCOL_BINARY_CMD_GET ||
1450 c->cmd == PROTOCOL_BINARY_CMD_GETK) {
1451 add_bin_header(c, 0, hlen, keylen, dlen);
1452 if(dlen > 0) {
1453 add_iov(c, d, dlen);
1454 }
1455 conn_set_state(c, conn_mwrite);
1456 c->write_and_go = conn_new_cmd;
1457 } else {
1458 conn_set_state(c, conn_new_cmd);
1459 }
1460 }
1461
1462
/**
 * Execute a binary-protocol increment or decrement once the whole request
 * is in the read buffer.
 *
 * The delta, initial value and expiration are taken from the request body.
 * The engine writes the resulting counter into the response body located in
 * c->wbuf, which is then byte-swapped in place and sent.  An expiration of
 * 0xffffffff in the request tells the engine not to create a missing key.
 * Hits and misses are counted separately for increment and decrement.
 */
static void complete_incr_bin(conn *c) {
    /* Check the connection before anything below dereferences it. */
    assert(c != NULL);

    protocol_binary_response_incr* rsp = (protocol_binary_response_incr*)c->wbuf;
    protocol_binary_request_incr* req = binary_get_request(c);

    assert(c->wsize >= sizeof(*rsp));

    /* fix byteorder in the request */
    uint64_t delta = ntohll(req->message.body.delta);
    uint64_t initial = ntohll(req->message.body.initial);
    rel_time_t expiration = ntohl(req->message.body.expiration);
    char *key = binary_get_key(c);
    size_t nkey = c->binary_header.request.keylen;
    bool incr = (c->cmd == PROTOCOL_BINARY_CMD_INCREMENT ||
                 c->cmd == PROTOCOL_BINARY_CMD_INCREMENTQ);

    if (settings.verbose > 1) {
        char buffer[1024];
        ssize_t nw;
        nw = key_to_printable_buffer(buffer, sizeof(buffer), c->sfd, true,
                                     incr ? "INCR" : "DECR", key, nkey);
        if (nw != -1) {
            if (snprintf(buffer + nw, sizeof(buffer) - nw,
                         " %" PRIu64 ", %" PRIu64 ", %" PRIu64 "\n",
                         delta, initial, (uint64_t)expiration) != -1) {
                settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c, "%s",
                                                buffer);
            }
        }
    }

    ENGINE_ERROR_CODE ret = c->aiostat;
    c->aiostat = ENGINE_SUCCESS;
    if (ret == ENGINE_SUCCESS) {
        ret = settings.engine.v1->arithmetic(settings.engine.v0,
                                             c, key, nkey, incr,
                                             req->message.body.expiration != 0xffffffff,
                                             delta, initial, expiration,
                                             &c->cas,
                                             &rsp->message.body.value,
                                             c->binary_header.request.vbucket);
    }

    switch (ret) {
    case ENGINE_SUCCESS:
        rsp->message.body.value = htonll(rsp->message.body.value);
        write_bin_response(c, &rsp->message.body, 0, 0,
                           sizeof (rsp->message.body.value));
        if (incr) {
            STATS_INCR(c, incr_hits, key, nkey);
        } else {
            STATS_INCR(c, decr_hits, key, nkey);
        }
        break;
    case ENGINE_KEY_EEXISTS:
        write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS, 0);
        break;
    case ENGINE_KEY_ENOENT:
        write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0);
        /*
         * Use the same incr/decr classification as the hit counters so the
         * quiet INCREMENTQ opcode is not booked as a decrement miss.
         */
        if (incr) {
            STATS_INCR(c, incr_misses, key, nkey);
        } else {
            STATS_INCR(c, decr_misses, key, nkey);
        }
        break;
    case ENGINE_ENOMEM:
        write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, 0);
        break;
    case ENGINE_TMPFAIL:
        write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ETMPFAIL, 0);
        break;
    case ENGINE_EINVAL:
        write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_DELTA_BADVAL, 0);
        break;
    case ENGINE_NOT_STORED:
        write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_NOT_STORED, 0);
        break;
    case ENGINE_DISCONNECT:
        c->state = conn_closing;
        break;
    case ENGINE_ENOTSUP:
        write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED, 0);
        break;
    case ENGINE_NOT_MY_VBUCKET:
        write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET, 0);
        break;
    case ENGINE_EWOULDBLOCK:
        c->ewouldblock = true;
        break;
    default:
        abort();
    }
}
1556
/**
 * Finish a binary-protocol store command once the value has been read into
 * the item held in c->item.  The store operation is taken from c->store_op.
 *
 * The item is handed to the engine's store() unless c->aiostat already holds
 * a non-success status.  The engine status is turned into a binary response,
 * the statistics are updated, and the reference in c->item is released
 * unless the engine reported ENGINE_EWOULDBLOCK (then c->ewouldblock is set
 * and the item is kept).
 */
static void complete_update_bin(conn *c) {
    protocol_binary_response_status eno = PROTOCOL_BINARY_RESPONSE_EINVAL;
    assert(c != NULL);

    item *it = c->item;
    item_info info = { .nvalue = 1 };
    if (!settings.engine.v1->get_item_info(settings.engine.v0, c, it, &info)) {
        settings.engine.v1->release(settings.engine.v0, c, it);
        /* The reference is gone: do not leave c->item pointing at it. */
        c->item = NULL;
        settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
                                        "%d: Failed to get item info\n",
                                        c->sfd);
        write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_EINTERNAL, 0);
        return;
    }

    ENGINE_ERROR_CODE ret = c->aiostat;
    c->aiostat = ENGINE_SUCCESS;
    if (ret == ENGINE_SUCCESS) {
        ret = settings.engine.v1->store(settings.engine.v0, c,
                                        it, &c->cas, c->store_op,
                                        c->binary_header.request.vbucket);
    }

#ifdef ENABLE_DTRACE
    /*
     * The case labels are store operations, so switch on c->store_op (the
     * value passed to store() above), not on the protocol opcode in c->cmd.
     */
    switch (c->store_op) {
    case OPERATION_ADD:
        MEMCACHED_COMMAND_ADD(c->sfd, info.key, info.nkey,
                              (ret == ENGINE_SUCCESS) ? info.nbytes : -1, c->cas);
        break;
    case OPERATION_REPLACE:
        MEMCACHED_COMMAND_REPLACE(c->sfd, info.key, info.nkey,
                                  (ret == ENGINE_SUCCESS) ? info.nbytes : -1, c->cas);
        break;
    case OPERATION_APPEND:
        MEMCACHED_COMMAND_APPEND(c->sfd, info.key, info.nkey,
                                 (ret == ENGINE_SUCCESS) ? info.nbytes : -1, c->cas);
        break;
    case OPERATION_PREPEND:
        MEMCACHED_COMMAND_PREPEND(c->sfd, info.key, info.nkey,
                                  (ret == ENGINE_SUCCESS) ? info.nbytes : -1, c->cas);
        break;
    case OPERATION_SET:
        MEMCACHED_COMMAND_SET(c->sfd, info.key, info.nkey,
                              (ret == ENGINE_SUCCESS) ? info.nbytes : -1, c->cas);
        break;
    case OPERATION_CAS:
        MEMCACHED_COMMAND_CAS(c->sfd, info.key, info.nkey, info.nbytes, c->cas);
        break;
    }
#endif

    switch (ret) {
    case ENGINE_SUCCESS:
        /* Stored */
        write_bin_response(c, NULL, 0, 0, 0);
        break;
    case ENGINE_KEY_EEXISTS:
        write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS, 0);
        break;
    case ENGINE_KEY_ENOENT:
        write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0);
        break;
    case ENGINE_ENOMEM:
        write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, 0);
        break;
    case ENGINE_TMPFAIL:
        write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ETMPFAIL, 0);
        break;
    case ENGINE_EWOULDBLOCK:
        c->ewouldblock = true;
        break;
    case ENGINE_DISCONNECT:
        c->state = conn_closing;
        break;
    case ENGINE_ENOTSUP:
        write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED, 0);
        break;
    case ENGINE_NOT_MY_VBUCKET:
        write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET, 0);
        break;
    default:
        /* Any other status: pick the reply from the kind of store attempted. */
        if (c->store_op == OPERATION_ADD) {
            eno = PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS;
        } else if(c->store_op == OPERATION_REPLACE) {
            eno = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
        } else {
            eno = PROTOCOL_BINARY_RESPONSE_NOT_STORED;
        }
        write_bin_packet(c, eno, 0);
    }

    /* CAS outcomes have their own counters; everything else is a cmd_set. */
    if (c->store_op == OPERATION_CAS) {
        switch (ret) {
        case ENGINE_SUCCESS:
            SLAB_INCR(c, cas_hits, info.key, info.nkey);
            break;
        case ENGINE_KEY_EEXISTS:
            SLAB_INCR(c, cas_badval, info.key, info.nkey);
            break;
        case ENGINE_KEY_ENOENT:
            STATS_NOKEY(c, cas_misses);
            break;
        default:
            ;
        }
    } else {
        SLAB_INCR(c, cmd_set, info.key, info.nkey);
    }

    if (!c->ewouldblock) {
        /* release the c->item reference */
        settings.engine.v1->release(settings.engine.v0, c, c->item);
        c->item = NULL;
    }
}
1669
/**
 * Handle a binary-protocol get.
 *
 * Looks the key up in the engine (unless c->aiostat already carries a
 * non-success status).  On a hit the response is header + flags
 * (+ key for GETK) + value, and the item is remembered in c->item so it can
 * be released after transmission.  On a miss nothing is sent when
 * c->noreply is set; GETK echoes the key back with a KEY_ENOENT status.
 */
static void process_bin_get(conn *c) {
    item *it;

    protocol_binary_response_get* rsp = (protocol_binary_response_get*)c->wbuf;
    char* key = binary_get_key(c);
    size_t nkey = c->binary_header.request.keylen;

    if (settings.verbose > 1) {
        char printable[1024];
        ssize_t written = key_to_printable_buffer(printable, sizeof(printable),
                                                  c->sfd, true, "GET",
                                                  key, nkey);
        if (written != -1) {
            settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c, "%s\n",
                                            printable);
        }
    }

    ENGINE_ERROR_CODE ret = c->aiostat;
    c->aiostat = ENGINE_SUCCESS;
    if (ret == ENGINE_SUCCESS) {
        ret = settings.engine.v1->get(settings.engine.v0, c, &it, key, nkey,
                                      c->binary_header.request.vbucket);
    }

    const bool want_key = (c->cmd == PROTOCOL_BINARY_CMD_GETK);
    uint16_t key_bytes;
    uint32_t body_bytes;
    item_info info = { .nvalue = 1 };

    switch (ret) {
    case ENGINE_SUCCESS:
        if (!settings.engine.v1->get_item_info(settings.engine.v0, c, it, &info)) {
            settings.engine.v1->release(settings.engine.v0, c, it);
            settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
                                            "%d: Failed to get item info\n",
                                            c->sfd);
            write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_EINTERNAL, 0);
            break;
        }

        key_bytes = 0;
        body_bytes = sizeof(rsp->message.body) + info.nbytes;

        STATS_HIT(c, get, key, nkey);

        /* GETK carries the key between the flags and the value. */
        if (want_key) {
            key_bytes = nkey;
            body_bytes += nkey;
        }
        add_bin_header(c, 0, sizeof(rsp->message.body), key_bytes, body_bytes);
        rsp->message.header.response.cas = htonll(info.cas);

        // add the flags
        rsp->message.body.flags = info.flags;
        add_iov(c, &rsp->message.body, sizeof(rsp->message.body));

        if (want_key) {
            add_iov(c, info.key, nkey);
        }

        add_iov(c, info.value[0].iov_base, info.value[0].iov_len);
        conn_set_state(c, conn_mwrite);
        /* Remember this item so we can garbage collect it later */
        c->item = it;
        break;
    case ENGINE_KEY_ENOENT:
        STATS_MISS(c, get, key, nkey);

        MEMCACHED_COMMAND_GET(c->sfd, key, nkey, -1, 0);

        if (c->noreply) {
            conn_set_state(c, conn_new_cmd);
        } else if (want_key) {
            /* Echo the key back: stage it in wbuf right after the header. */
            char *ofs = c->wbuf + sizeof(protocol_binary_response_header);
            add_bin_header(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT,
                           0, nkey, nkey);
            memcpy(ofs, key, nkey);
            add_iov(c, ofs, nkey);
            conn_set_state(c, conn_mwrite);
        } else {
            write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0);
        }
        break;
    case ENGINE_EWOULDBLOCK:
        c->ewouldblock = true;
        break;
    case ENGINE_DISCONNECT:
        c->state = conn_closing;
        break;
    case ENGINE_ENOTSUP:
        write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED, 0);
        break;
    case ENGINE_NOT_MY_VBUCKET:
        write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET, 0);
        break;
    case ENGINE_TMPFAIL:
        write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ETMPFAIL, 0);
        break;

    default:
        /* @todo add proper error handling! */
        settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
                                        "Unknown error code: %d\n", ret);
        abort();
    }

    /* Per-prefix statistics are recorded once the lookup has an outcome. */
    if (settings.detail_enabled && ret != ENGINE_EWOULDBLOCK) {
        stats_prefix_record_get(key, nkey, ret == ENGINE_SUCCESS);
    }
}
1780
/**
 * Append one binary-protocol STAT response packet (header, key, value) to
 * the connection's dynamic buffer and advance the buffer offset past it.
 * The caller must have made sure the buffer has room for the packet.
 */
static void append_bin_stats(const char *key, const uint16_t klen,
                             const char *val, const uint32_t vlen,
                             conn *c) {
    const uint32_t bodylen = klen + vlen;
    protocol_binary_response_header header = {
        .response.magic = (uint8_t)PROTOCOL_BINARY_RES,
        .response.opcode = PROTOCOL_BINARY_CMD_STAT,
        .response.keylen = (uint16_t)htons(klen),
        .response.datatype = (uint8_t)PROTOCOL_BINARY_RAW_BYTES,
        .response.bodylen = htonl(bodylen),
        .response.opaque = c->opaque
    };
    char *out = c->dynamic_buffer.buffer + c->dynamic_buffer.offset;

    memcpy(out, header.bytes, sizeof(header.response));
    out += sizeof(header.response);

    /* The value is only written when there is a key in front of it. */
    if (klen > 0) {
        memcpy(out, key, klen);
        if (vlen > 0) {
            memcpy(out + klen, val, vlen);
        }
    }

    c->dynamic_buffer.offset += sizeof(header.response) + bodylen;
}
1809
1810 /**
1811 * Append a key-value pair to the stats output buffer. This function assumes
1812 * that the output buffer is big enough (it will be if you call it through
1813 * append_stats)
1814 */
append_ascii_stats(const char * key,const uint16_t klen,const char * val,const uint32_t vlen,conn * c)1815 static void append_ascii_stats(const char *key, const uint16_t klen,
1816 const char *val, const uint32_t vlen,
1817 conn *c) {
1818 char *pos = c->dynamic_buffer.buffer + c->dynamic_buffer.offset;
1819 uint32_t nbytes = 5; /* "END\r\n" or "STAT " */
1820
1821 if (klen == 0 && vlen == 0) {
1822 memcpy(pos, "END\r\n", 5);
1823 } else {
1824 memcpy(pos, "STAT ", 5);
1825 memcpy(pos + nbytes, key, klen);
1826 nbytes += klen;
1827 if (vlen != 0) {
1828 pos[nbytes] = ' ';
1829 ++nbytes;
1830 memcpy(pos + nbytes, val, vlen);
1831 nbytes += vlen;
1832 }
1833 memcpy(pos + nbytes, "\r\n", 2);
1834 nbytes += 2;
1835 }
1836
1837 c->dynamic_buffer.offset += nbytes;
1838 }
1839
/**
 * Make sure the connection's dynamic buffer has at least `needed` free
 * bytes after its current offset, doubling its size until it does.  A
 * missing buffer is allocated from scratch, starting the doubling at 1024.
 *
 * @return true if the space is available, false if realloc failed (the
 *         existing buffer is left untouched in that case)
 */
static bool grow_dynamic_buffer(conn *c, size_t needed) {
    size_t nsize;
    size_t available;

    if (c->dynamic_buffer.buffer == NULL) {
        /* Special case: No buffer -- need to allocate fresh */
        nsize = 1024;
        c->dynamic_buffer.offset = 0;
        c->dynamic_buffer.size = 0;
        available = 0;
    } else {
        nsize = c->dynamic_buffer.size;
        available = nsize - c->dynamic_buffer.offset;
    }

    while (needed > available) {
        assert(nsize > 0);
        nsize *= 2;
        available = nsize - c->dynamic_buffer.offset;
    }

    if (nsize == c->dynamic_buffer.size) {
        /* Already big enough. */
        return true;
    }

    char *grown = realloc(c->dynamic_buffer.buffer, nsize);
    if (grown == NULL) {
        return false;
    }

    c->dynamic_buffer.buffer = grown;
    c->dynamic_buffer.size = nsize;
    return true;
}
1869
append_stats(const char * key,const uint16_t klen,const char * val,const uint32_t vlen,const void * cookie)1870 static void append_stats(const char *key, const uint16_t klen,
1871 const char *val, const uint32_t vlen,
1872 const void *cookie)
1873 {
1874 /* value without a key is invalid */
1875 if (klen == 0 && vlen > 0) {
1876 return ;
1877 }
1878
1879 conn *c = (conn*)cookie;
1880
1881 if (c->protocol == binary_prot) {
1882 size_t needed = vlen + klen + sizeof(protocol_binary_response_header);
1883 if (!grow_dynamic_buffer(c, needed)) {
1884 return ;
1885 }
1886 append_bin_stats(key, klen, val, vlen, c);
1887 } else {
1888 size_t needed = vlen + klen + 10; // 10 == "STAT = \r\n"
1889 if (!grow_dynamic_buffer(c, needed)) {
1890 return ;
1891 }
1892 append_ascii_stats(key, klen, val, vlen, c);
1893 }
1894
1895 assert(c->dynamic_buffer.offset <= c->dynamic_buffer.size);
1896 }
1897
/*
 * True when the nkey-byte key starts with prefix.  The key is not
 * NUL-terminated, so its length must be checked before comparing.
 */
static bool bin_stat_key_has_prefix(const char *key, size_t nkey,
                                    const char *prefix) {
    size_t plen = strlen(prefix);
    return nkey >= plen && strncmp(key, prefix, plen) == 0;
}

/**
 * Handle a binary-protocol STAT request.
 *
 * The request key selects what to report: an empty key asks for the engine
 * and server statistics; "reset", "settings", "detail dump|on|off",
 * "aggregate" and "topkeys" are handled here; any other key is passed to
 * the engine's get_stats().  Collected statistics accumulate in
 * c->dynamic_buffer through append_stats and are sent, followed by an empty
 * terminating packet, when the outcome is ENGINE_SUCCESS.
 */
static void process_bin_stat(conn *c) {
    char *subcommand = binary_get_key(c);
    size_t nkey = c->binary_header.request.keylen;

    if (settings.verbose > 1) {
        char buffer[1024];
        if (key_to_printable_buffer(buffer, sizeof(buffer), c->sfd, true,
                                    "STATS", subcommand, nkey) != -1) {
            settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c, "%s\n",
                                            buffer);
        }
    }

    ENGINE_ERROR_CODE ret = c->aiostat;
    c->aiostat = ENGINE_SUCCESS;
    c->ewouldblock = false;

    if (ret == ENGINE_SUCCESS) {
        if (nkey == 0) {
            /* request all statistics */
            ret = settings.engine.v1->get_stats(settings.engine.v0, c, NULL, 0, append_stats);
            if (ret == ENGINE_SUCCESS) {
                server_stats(&append_stats, c, false);
            }
        } else if (bin_stat_key_has_prefix(subcommand, nkey, "reset")) {
            stats_reset(c);
            settings.engine.v1->reset_stats(settings.engine.v0, c);
        } else if (bin_stat_key_has_prefix(subcommand, nkey, "settings")) {
            process_stat_settings(&append_stats, c);
        } else if (bin_stat_key_has_prefix(subcommand, nkey, "detail")) {
            /* What follows "detail" within the key. */
            const char *subcmd_pos = subcommand + 6;
            size_t subcmd_len = nkey - 6;
            if (settings.allow_detailed) {
                if (bin_stat_key_has_prefix(subcmd_pos, subcmd_len, " dump")) {
                    int len;
                    char *dump_buf = stats_prefix_dump(&len);
                    if (dump_buf == NULL || len <= 0) {
                        /* free(NULL) is a no-op; do not leak an empty dump */
                        free(dump_buf);
                        write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, 0);
                        return ;
                    } else {
                        append_stats("detailed", strlen("detailed"), dump_buf, len, c);
                        free(dump_buf);
                    }
                } else if (bin_stat_key_has_prefix(subcmd_pos, subcmd_len, " on")) {
                    settings.detail_enabled = 1;
                } else if (bin_stat_key_has_prefix(subcmd_pos, subcmd_len, " off")) {
                    settings.detail_enabled = 0;
                } else {
                    write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0);
                    return;
                }
            } else {
                write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, 0);
                return;
            }
        } else if (bin_stat_key_has_prefix(subcommand, nkey, "aggregate")) {
            server_stats(&append_stats, c, true);
        } else if (bin_stat_key_has_prefix(subcommand, nkey, "topkeys")) {
            topkeys_t *tk = get_independent_stats(c)->topkeys;
            if (tk != NULL) {
                topkeys_stats(tk, c, current_time, append_stats);
            } else {
                write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0);
                return;
            }
        } else {
            ret = settings.engine.v1->get_stats(settings.engine.v0, c,
                                                subcommand, nkey,
                                                append_stats);
        }
    }

    switch (ret) {
    case ENGINE_SUCCESS:
        /* Terminate the list with an empty packet and hand the buffer over. */
        append_stats(NULL, 0, NULL, 0, c);
        write_and_free(c, c->dynamic_buffer.buffer, c->dynamic_buffer.offset);
        c->dynamic_buffer.buffer = NULL;
        break;
    case ENGINE_ENOMEM:
        write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, 0);
        break;
    case ENGINE_TMPFAIL:
        write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ETMPFAIL, 0);
        break;
    case ENGINE_KEY_ENOENT:
        write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0);
        break;
    case ENGINE_DISCONNECT:
        c->state = conn_closing;
        break;
    case ENGINE_ENOTSUP:
        write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED, 0);
        break;
    case ENGINE_EWOULDBLOCK:
        c->ewouldblock = true;
        break;
    default:
        write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_EINVAL, 0);
    }
}
1997
/**
 * Arrange for the next `chunk` bytes of the current binary request to be
 * read into the connection's read buffer, keeping the request header in
 * front of them, and switch the connection to conn_nread.
 *
 * If the bytes do not fit behind the current position the buffer is doubled
 * until they do, and the pending input is moved to the start of the buffer.
 * When the buffer cannot be grown the connection is set to conn_closing.
 *
 * @param c the connection being served
 * @param next_substate substate to enter once the bytes have arrived
 * @param chunk number of bytes to read
 */
static void bin_read_chunk(conn *c, enum bin_substates next_substate, uint32_t chunk) {
    const size_t hdrsz = sizeof(protocol_binary_request_header);

    assert(c);
    c->substate = next_substate;
    c->rlbytes = chunk;

    /* Ok... do we have room for everything in our buffer? */
    ptrdiff_t offset = c->rcurr + hdrsz - c->rbuf;
    if (c->rlbytes > c->rsize - offset) {
        size_t wanted = c->rlbytes + hdrsz;
        size_t nsize;

        for (nsize = c->rsize; wanted > nsize; nsize *= 2) {
            /* keep doubling until header + chunk fit */
        }

        if (nsize != c->rsize) {
            if (settings.verbose > 1) {
                settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
                        "%d: Need to grow buffer from %lu to %lu\n",
                        c->sfd, (unsigned long)c->rsize, (unsigned long)nsize);
            }

            char *grown = realloc(c->rbuf, nsize);
            if (grown == NULL) {
                if (settings.verbose) {
                    settings.extensions.logger->log(EXTENSION_LOG_INFO, c,
                            "%d: Failed to grow buffer.. closing connection\n",
                            c->sfd);
                }
                conn_set_state(c, conn_closing);
                return;
            }

            c->rbuf = grown;
            /* rcurr should point to the same offset in the packet */
            c->rcurr = c->rbuf + offset - hdrsz;
            c->rsize = nsize;
        }

        if (c->rcurr != c->rbuf) {
            /* Slide the pending input down to the start of the buffer. */
            memmove(c->rbuf, c->rcurr, c->rbytes);
            c->rcurr = c->rbuf;
            if (settings.verbose > 1) {
                settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
                                                "%d: Repack input buffer\n",
                                                c->sfd);
            }
        }
    }

    /* preserve the header in the buffer.. */
    c->ritem = c->rcurr + hdrsz;
    conn_set_state(c, conn_nread);
}
2051
/**
 * Read the key of the current request plus `extra` further bytes, then
 * continue in next_substate.  The key length is taken from c->keylen.
 */
static void bin_read_key(conn *c, enum bin_substates next_substate, int extra) {
    const uint32_t wanted = c->keylen + extra;

    bin_read_chunk(c, next_substate, wanted);
}
2055
2056
2057 /* Just write an error message and disconnect the client */
handle_binary_protocol_error(conn * c)2058 static void handle_binary_protocol_error(conn *c) {
2059 write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_EINVAL, 0);
2060 if (settings.verbose) {
2061 settings.extensions.logger->log(EXTENSION_LOG_INFO, c,
2062 "%d: Protocol error (opcode %02x), close connection\n",
2063 c->sfd, c->binary_header.request.opcode);
2064 }
2065 c->write_and_go = conn_closing;
2066 }
2067
/**
 * Create the SASL server context for this connection if it does not have
 * one yet.  On failure c->sasl_conn is left NULL (and the failure is logged
 * when verbose).
 */
static void init_sasl_conn(conn *c) {
    assert(c);

    if (c->sasl_conn) {
        /* already initialised */
        return;
    }

    int result = sasl_server_new("memcached",
                                 NULL, NULL, NULL, NULL,
                                 NULL, 0, &c->sasl_conn);
    if (result == SASL_OK) {
        return;
    }

    if (settings.verbose) {
        settings.extensions.logger->log(EXTENSION_LOG_INFO, c,
                 "%d: Failed to initialize SASL conn.\n",
                 c->sfd);
    }
    c->sasl_conn = NULL;
}
2084
/*
 * Fill `data` with the authentication properties of the connection behind
 * `cookie`.
 *
 * Both fields are reset to NULL first, so the caller never sees
 * indeterminate pointers when the connection has no SASL context (or, for
 * `config`, when ENABLE_ISASL is not defined). When a SASL context exists
 * the values are fetched with sasl_getprop().
 */
static void get_auth_data(const void *cookie, auth_data_t *data) {
    conn *c = (conn*)cookie;

    data->username = NULL;
    data->config = NULL;

    if (c->sasl_conn) {
        sasl_getprop(c->sasl_conn, SASL_USERNAME, (void*)&data->username);
#ifdef ENABLE_ISASL
        sasl_getprop(c->sasl_conn, ISASL_CONFIG, (void*)&data->config);
#endif
    }
}
2095
#ifdef SASL_ENABLED
/*
 * Handle the SASL "list mechanisms" command: reply with the space separated
 * mechanism list reported by sasl_listmech(), or with an AUTH_ERROR packet
 * when the list cannot be obtained.
 */
static void bin_list_sasl_mechs(conn *c) {
    const char *mechs = NULL;
    unsigned int mechs_len = 0;

    init_sasl_conn(c);

    int rc = sasl_listmech(c->sasl_conn, NULL,
                           "",   /* What to prepend the string with */
                           " ",  /* What to separate mechanisms with */
                           "",   /* What to append to the string */
                           &mechs, &mechs_len,
                           NULL);
    if (rc == SASL_OK) {
        write_bin_response(c, (char*)mechs, 0, 0, mechs_len);
        return;
    }

    /* Perhaps there's a better error for this... */
    if (settings.verbose) {
        settings.extensions.logger->log(EXTENSION_LOG_INFO, c,
                                        "%d: Failed to list SASL mechanisms.\n",
                                        c->sfd);
    }
    write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_AUTH_ERROR, 0);
}
#endif
2120
/*
 * Temporary holder for a SASL auth/step request while its value is being
 * read. process_bin_sasl_auth() allocates one, stores it in c->item and
 * points c->ritem at the value area; process_bin_complete_sasl_auth()
 * consumes and frees it.
 */
struct sasl_tmp {
    int ksize;   /* number of key (mechanism name) bytes at the start of data */
    int vsize;   /* number of value bytes expected after the key */
    char data[]; /* data + ksize == value */
};
2126
/*
 * First stage of a SASL auth/step request: the key (mechanism name) has been
 * read; allocate a struct sasl_tmp, copy the key into it and arrange for the
 * value to be read directly behind the key.
 *
 * The header lengths come straight from the client. A body length smaller
 * than the key length (or one that does not fit in an int) would make the
 * value length negative, and that negative value would then be used as the
 * amount of data to read/swallow. Such requests are treated as protocol
 * errors and the connection is closed.
 */
static void process_bin_sasl_auth(conn *c) {
    assert(c->binary_header.request.extlen == 0);

    const uint32_t bodylen = c->binary_header.request.bodylen;
    int nkey = c->binary_header.request.keylen;

    if (bodylen < (uint32_t)nkey || bodylen - (uint32_t)nkey > (uint32_t)INT_MAX) {
        handle_binary_protocol_error(c);
        return;
    }

    int vlen = (int)(bodylen - (uint32_t)nkey);

    if (nkey > MAX_SASL_MECH_LEN) {
        write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_EINVAL, vlen);
        c->write_and_go = conn_swallow;
        return;
    }

    char *key = binary_get_key(c);
    assert(key);

    /* Header + key + value, plus two spare (zeroed) bytes of slack. */
    size_t buffer_size = sizeof(struct sasl_tmp) + nkey + vlen + 2;
    struct sasl_tmp *data = calloc(1, buffer_size);
    if (!data) {
        write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, vlen);
        c->write_and_go = conn_swallow;
        return;
    }

    data->ksize = nkey;
    data->vsize = vlen;
    memcpy(data->data, key, nkey);

    /* The value is read straight into the area following the key. */
    c->item = data;
    c->ritem = data->data + nkey;
    c->rlbytes = vlen;
    conn_set_state(c, conn_nread);
    c->substate = bin_reading_sasl_auth_data;
}
2160
/*
 * Second stage of a SASL auth/step request: key and value are available in
 * the struct sasl_tmp stored in c->item. Runs sasl_server_start() or
 * sasl_server_step() depending on c->cmd, frees the temporary buffer and
 * answers the client:
 *   - SASL_OK:       "Authenticated" response, ON_AUTH callbacks are fired
 *   - SASL_CONTINUE: AUTH_CONTINUE header followed by the server challenge
 *   - anything else: AUTH_ERROR packet
 */
static void process_bin_complete_sasl_auth(conn *c) {
    const char *out = NULL;
    unsigned int outlen = 0;

    assert(c->item);
    init_sasl_conn(c);

    int nkey = c->binary_header.request.keylen;
    int vlen = c->binary_header.request.bodylen - nkey;

    /* NUL-terminated copy of the mechanism name for the SASL library. */
    struct sasl_tmp *stmp = c->item;
    char mech[nkey+1];
    memcpy(mech, stmp->data, nkey);
    mech[nkey] = 0x00;

    if (settings.verbose) {
        settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
                                        "%d: mech: ``%s'' with %d bytes of data\n", c->sfd, mech, vlen);
    }

    const char *challenge = vlen == 0 ? NULL : (stmp->data + nkey);

    int result=-1;

    switch (c->cmd) {
    case PROTOCOL_BINARY_CMD_SASL_AUTH:
        result = sasl_server_start(c->sasl_conn, mech,
                                   challenge, vlen,
                                   &out, &outlen);
        break;
    case PROTOCOL_BINARY_CMD_SASL_STEP:
        result = sasl_server_step(c->sasl_conn,
                                  challenge, vlen,
                                  &out, &outlen);
        break;
    default:
        assert(false); /* CMD should be one of the above */
        /* This code is pretty much impossible, but makes the compiler
           happier */
        if (settings.verbose) {
            /* challenge is NULL for an empty value; never hand NULL to %s */
            settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
                                            "%d: Unhandled command %d with challenge %s\n",
                                            c->sfd, c->cmd,
                                            challenge ? challenge : "");
        }
        break;
    }

    /* challenge points into c->item, so it must not be used past here. */
    free(c->item);
    c->item = NULL;
    c->ritem = NULL;

    if (settings.verbose) {
        settings.extensions.logger->log(EXTENSION_LOG_INFO, c,
                                        "%d: sasl result code: %d\n",
                                        c->sfd, result);
    }

    switch(result) {
    case SASL_OK: {
        write_bin_response(c, "Authenticated", 0, 0, strlen("Authenticated"));
        auth_data_t data;
        /* Start from a fully zeroed struct so that the callbacks never see
           indeterminate fields, whatever get_auth_data() fills in. */
        memset(&data, 0, sizeof(data));
        get_auth_data(c, &data);
        perform_callbacks(ON_AUTH, (const void*)&data, c);
        STATS_NOKEY(c, auth_cmds);
        break;
    }
    case SASL_CONTINUE:
        add_bin_header(c, PROTOCOL_BINARY_RESPONSE_AUTH_CONTINUE, 0, 0, outlen);
        if(outlen > 0) {
            add_iov(c, out, outlen);
        }
        conn_set_state(c, conn_mwrite);
        c->write_and_go = conn_new_cmd;
        break;
    default:
        if (settings.verbose) {
            settings.extensions.logger->log(EXTENSION_LOG_INFO, c,
                                            "%d: Unknown sasl response: %d\n",
                                            c->sfd, result);
        }
        write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_AUTH_ERROR, 0);
        STATS_NOKEY2(c, auth_cmds, auth_errors);
    }
}
2244
/*
 * Decide whether the current command may run on this connection.
 *
 * The SASL commands and VERSION are always allowed. Every other command is
 * allowed only when the connection has a SASL context whose SASL_USERNAME
 * property is non-NULL.
 *
 * Returns true when the command is allowed.
 */
static bool authenticated(conn *c) {
    bool allowed;

    switch (c->cmd) {
    case PROTOCOL_BINARY_CMD_SASL_LIST_MECHS:
    case PROTOCOL_BINARY_CMD_SASL_AUTH:
    case PROTOCOL_BINARY_CMD_SASL_STEP:
    case PROTOCOL_BINARY_CMD_VERSION:
        allowed = true;
        break;
    default: {
        const void *uname = NULL;
        if (c->sasl_conn) {
            sasl_getprop(c->sasl_conn, SASL_USERNAME, &uname);
        }
        allowed = (uname != NULL);
        break;
    }
    }

    if (settings.verbose > 1) {
        const char *verdict = allowed ? "true" : "false";
        settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
                                        "%d: authenticated() in cmd 0x%02x is %s\n",
                                        c->sfd, c->cmd, verdict);
    }

    return allowed;
}
2271
binary_response_handler(const void * key,uint16_t keylen,const void * ext,uint8_t extlen,const void * body,uint32_t bodylen,uint8_t datatype,uint16_t status,uint64_t cas,const void * cookie)2272 static bool binary_response_handler(const void *key, uint16_t keylen,
2273 const void *ext, uint8_t extlen,
2274 const void *body, uint32_t bodylen,
2275 uint8_t datatype, uint16_t status,
2276 uint64_t cas, const void *cookie)
2277 {
2278 conn *c = (conn*)cookie;
2279 /* Look at append_bin_stats */
2280 size_t needed = keylen + extlen + bodylen + sizeof(protocol_binary_response_header);
2281 if (!grow_dynamic_buffer(c, needed)) {
2282 if (settings.verbose > 0) {
2283 settings.extensions.logger->log(EXTENSION_LOG_INFO, c,
2284 "<%d ERROR: Failed to allocate memory for response\n",
2285 c->sfd);
2286 }
2287 return false;
2288 }
2289
2290 char *buf = c->dynamic_buffer.buffer + c->dynamic_buffer.offset;
2291 protocol_binary_response_header header = {
2292 .response.magic = (uint8_t)PROTOCOL_BINARY_RES,
2293 .response.opcode = c->binary_header.request.opcode,
2294 .response.keylen = (uint16_t)htons(keylen),
2295 .response.extlen = extlen,
2296 .response.datatype = datatype,
2297 .response.status = (uint16_t)htons(status),
2298 .response.bodylen = htonl(bodylen + keylen + extlen),
2299 .response.opaque = c->opaque,
2300 .response.cas = htonll(cas),
2301 };
2302
2303 memcpy(buf, header.bytes, sizeof(header.response));
2304 buf += sizeof(header.response);
2305
2306 if (extlen > 0) {
2307 memcpy(buf, ext, extlen);
2308 buf += extlen;
2309 }
2310
2311 if (keylen > 0) {
2312 memcpy(buf, key, keylen);
2313 buf += keylen;
2314 }
2315
2316 if (bodylen > 0) {
2317 memcpy(buf, body, bodylen);
2318 }
2319
2320 c->dynamic_buffer.offset += needed;
2321
2322 return true;
2323 }
2324
/**
 * Tap stats (these are only used by the tap thread, so they don't need
 * to be in the threadlocal struct right now...).
 *
 * One counter per tap message type. In this file the counters are
 * incremented while holding tap_stats.mutex.
 */
struct tap_cmd_stats {
    uint64_t connect;          /* TAP_CONNECT messages */
    uint64_t mutation;         /* TAP_MUTATION messages */
    uint64_t checkpoint_start; /* TAP_CHECKPOINT_START messages */
    uint64_t checkpoint_end;   /* TAP_CHECKPOINT_END messages */
    uint64_t delete;           /* TAP_DELETE messages */
    uint64_t flush;            /* TAP_FLUSH messages */
    uint64_t opaque;           /* TAP_OPAQUE messages */
    uint64_t vbucket_set;      /* TAP_VBUCKET_SET messages */
};
2339
/*
 * Global tap counters, split by direction. `mutex` is taken around every
 * counter update made in this file.
 */
struct tap_stats {
    pthread_mutex_t mutex;          /* guards sent and received */
    struct tap_cmd_stats sent;      /* messages queued by ship_tap_log() */
    struct tap_cmd_stats received;  /* messages seen by process_bin_packet() */
} tap_stats = { .mutex = PTHREAD_MUTEX_INITIALIZER };
2345
/**
 * Pull events from the connection's tap iterator and queue them for
 * transmission.
 *
 * The output message list is reset, then up to 10 events are fetched per
 * call. Fixed-size message headers and engine specific data are copied into
 * c->wbuf; item keys and values are referenced in place through add_iov(),
 * and the corresponding items are remembered in c->ilist.
 *
 * On exit the connection is either
 *   - switched to conn_mwrite (something was queued), continuing with
 *     conn_ship_log or conn_closing afterwards,
 *   - switched to conn_closing (disconnect requested, nothing queued), or
 *   - marked as ewouldblock (nothing to send right now).
 *
 * NOTE(review): there is still no bounds check on c->wbuf (see the @todo
 * below) nor on c->ilist.
 */
static void ship_tap_log(conn *c) {
    assert(c->thread->type == TAP);
    c->msgcurr = 0;
    c->msgused = 0;
    c->iovused = 0;
    if (add_msghdr(c) != 0) {
        if (settings.verbose) {
            settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
                                            "%d: Failed to create output headers. Shutting down tap connection\n", c->sfd);
        }
        conn_set_state(c, conn_closing);
        return ;
    }
    /* @todo add check for buffer overflow of c->wbuf) */
    c->wcurr = c->wbuf;

    bool more_data = true;
    bool send_data = false;
    bool disconnect = false;

    item *it = NULL;
    uint32_t bodylen;
    int ii = 0;
    c->icurr = c->ilist;
    do {
        /* @todo fixme! */
        if (ii++ == 10) {
            break;
        }

        /*
         * Out-parameters of the iterator. They are read right after the
         * call, even for events where the engine may not fill them in
         * (e.g. pause/disconnect), so give them defined values up front.
         */
        void *engine = NULL;
        uint16_t nengine = 0;
        uint8_t ttl = 0;
        uint16_t tap_flags = 0;
        uint32_t seqno = 0;
        uint16_t vbucket = 0;

        tap_event_t event = c->tap_iterator(settings.engine.v0, c, &it,
                                            &engine, &nengine, &ttl,
                                            &tap_flags, &seqno, &vbucket);
        union {
            protocol_binary_request_tap_mutation mutation;
            protocol_binary_request_tap_delete delete;
            protocol_binary_request_tap_flush flush;
            protocol_binary_request_tap_opaque opaque;
            protocol_binary_request_noop noop;
        } msg = {
            .mutation.message.header.request.magic = (uint8_t)PROTOCOL_BINARY_REQ,
        };

        /* Fields shared by all tap messages; refined per event below. */
        msg.opaque.message.header.request.opaque = htonl(seqno);
        msg.opaque.message.body.tap.enginespecific_length = htons(nengine);
        msg.opaque.message.body.tap.ttl = ttl;
        msg.opaque.message.body.tap.flags = htons(tap_flags);
        msg.opaque.message.header.request.extlen = 8;
        msg.opaque.message.header.request.vbucket = htons(vbucket);
        item_info info = { .nvalue = 1 };

        switch (event) {
        case TAP_NOOP :
            send_data = true;
            msg.noop.message.header.request.opcode = PROTOCOL_BINARY_CMD_NOOP;
            msg.noop.message.header.request.extlen = 0;
            msg.noop.message.header.request.bodylen = htonl(0);
            memcpy(c->wcurr, msg.noop.bytes, sizeof(msg.noop.bytes));
            add_iov(c, c->wcurr, sizeof(msg.noop.bytes));
            c->wcurr += sizeof(msg.noop.bytes);
            c->wbytes += sizeof(msg.noop.bytes);
            break;
        case TAP_PAUSE :
            more_data = false;
            break;
        case TAP_CHECKPOINT_START:
        case TAP_CHECKPOINT_END:
        case TAP_MUTATION:
            if (!settings.engine.v1->get_item_info(settings.engine.v0, c, it, &info)) {
                settings.engine.v1->release(settings.engine.v0, c, it);
                settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
                                                "%d: Failed to get item info\n", c->sfd);
                break;
            }
            send_data = true;
            /* Remember the item: its key/value are referenced by the iovs. */
            c->ilist[c->ileft++] = it;

            if (event == TAP_CHECKPOINT_START) {
                msg.mutation.message.header.request.opcode =
                    PROTOCOL_BINARY_CMD_TAP_CHECKPOINT_START;
                pthread_mutex_lock(&tap_stats.mutex);
                tap_stats.sent.checkpoint_start++;
                pthread_mutex_unlock(&tap_stats.mutex);
            } else if (event == TAP_CHECKPOINT_END) {
                msg.mutation.message.header.request.opcode =
                    PROTOCOL_BINARY_CMD_TAP_CHECKPOINT_END;
                pthread_mutex_lock(&tap_stats.mutex);
                tap_stats.sent.checkpoint_end++;
                pthread_mutex_unlock(&tap_stats.mutex);
            } else if (event == TAP_MUTATION) {
                msg.mutation.message.header.request.opcode = PROTOCOL_BINARY_CMD_TAP_MUTATION;
                pthread_mutex_lock(&tap_stats.mutex);
                tap_stats.sent.mutation++;
                pthread_mutex_unlock(&tap_stats.mutex);
            }

            msg.mutation.message.header.request.cas = htonll(info.cas);
            msg.mutation.message.header.request.keylen = htons(info.nkey);
            msg.mutation.message.header.request.extlen = 16;

            bodylen = 16 + info.nkey + nengine;
            if ((tap_flags & TAP_FLAG_NO_VALUE) == 0) {
                bodylen += info.nbytes;
            }
            msg.mutation.message.header.request.bodylen = htonl(bodylen);
            msg.mutation.message.body.item.flags = htonl(info.flags);
            msg.mutation.message.body.item.expiration = htonl(info.exptime);
            msg.mutation.message.body.tap.enginespecific_length = htons(nengine);
            msg.mutation.message.body.tap.ttl = ttl;
            msg.mutation.message.body.tap.flags = htons(tap_flags);
            memcpy(c->wcurr, msg.mutation.bytes, sizeof(msg.mutation.bytes));

            add_iov(c, c->wcurr, sizeof(msg.mutation.bytes));
            c->wcurr += sizeof(msg.mutation.bytes);
            c->wbytes += sizeof(msg.mutation.bytes);

            if (nengine > 0) {
                memcpy(c->wcurr, engine, nengine);
                add_iov(c, c->wcurr, nengine);
                c->wcurr += nengine;
                c->wbytes += nengine;
            }

            add_iov(c, info.key, info.nkey);
            if ((tap_flags & TAP_FLAG_NO_VALUE) == 0) {
                add_iov(c, info.value[0].iov_base, info.value[0].iov_len);
            }

            break;
        case TAP_DELETION:
            /* This is a delete */
            if (!settings.engine.v1->get_item_info(settings.engine.v0, c, it, &info)) {
                settings.engine.v1->release(settings.engine.v0, c, it);
                settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
                                                "%d: Failed to get item info\n", c->sfd);
                break;
            }
            send_data = true;
            c->ilist[c->ileft++] = it;
            msg.delete.message.header.request.opcode = PROTOCOL_BINARY_CMD_TAP_DELETE;
            msg.delete.message.header.request.cas = htonll(info.cas);
            msg.delete.message.header.request.keylen = htons(info.nkey);

            bodylen = 8 + info.nkey + nengine;
            if ((tap_flags & TAP_FLAG_NO_VALUE) == 0) {
                bodylen += info.nbytes;
            }
            msg.delete.message.header.request.bodylen = htonl(bodylen);

            memcpy(c->wcurr, msg.delete.bytes, sizeof(msg.delete.bytes));
            add_iov(c, c->wcurr, sizeof(msg.delete.bytes));
            c->wcurr += sizeof(msg.delete.bytes);
            c->wbytes += sizeof(msg.delete.bytes);

            if (nengine > 0) {
                memcpy(c->wcurr, engine, nengine);
                add_iov(c, c->wcurr, nengine);
                c->wcurr += nengine;
                c->wbytes += nengine;
            }

            add_iov(c, info.key, info.nkey);
            if ((tap_flags & TAP_FLAG_NO_VALUE) == 0) {
                add_iov(c, info.value[0].iov_base, info.value[0].iov_len);
            }

            pthread_mutex_lock(&tap_stats.mutex);
            tap_stats.sent.delete++;
            pthread_mutex_unlock(&tap_stats.mutex);
            break;

        case TAP_DISCONNECT:
            disconnect = true;
            more_data = false;
            break;
        case TAP_VBUCKET_SET:
        case TAP_FLUSH:
        case TAP_OPAQUE:
            send_data = true;

            if (event == TAP_OPAQUE) {
                msg.flush.message.header.request.opcode = PROTOCOL_BINARY_CMD_TAP_OPAQUE;
                pthread_mutex_lock(&tap_stats.mutex);
                tap_stats.sent.opaque++;
                pthread_mutex_unlock(&tap_stats.mutex);

            } else if (event == TAP_FLUSH) {
                msg.flush.message.header.request.opcode = PROTOCOL_BINARY_CMD_TAP_FLUSH;
                pthread_mutex_lock(&tap_stats.mutex);
                tap_stats.sent.flush++;
                pthread_mutex_unlock(&tap_stats.mutex);
            } else if (event == TAP_VBUCKET_SET) {
                msg.flush.message.header.request.opcode = PROTOCOL_BINARY_CMD_TAP_VBUCKET_SET;
                msg.flush.message.body.tap.flags = htons(tap_flags);
                pthread_mutex_lock(&tap_stats.mutex);
                tap_stats.sent.vbucket_set++;
                pthread_mutex_unlock(&tap_stats.mutex);
            }

            msg.flush.message.header.request.bodylen = htonl(8 + nengine);
            memcpy(c->wcurr, msg.flush.bytes, sizeof(msg.flush.bytes));
            add_iov(c, c->wcurr, sizeof(msg.flush.bytes));
            c->wcurr += sizeof(msg.flush.bytes);
            c->wbytes += sizeof(msg.flush.bytes);
            if (nengine > 0) {
                memcpy(c->wcurr, engine, nengine);
                add_iov(c, c->wcurr, nengine);
                c->wcurr += nengine;
                c->wbytes += nengine;
            }
            break;
        default:
            abort();
        }
    } while (more_data);

    c->ewouldblock = false;
    if (send_data) {
        conn_set_state(c, conn_mwrite);
        if (disconnect) {
            c->write_and_go = conn_closing;
        } else {
            c->write_and_go = conn_ship_log;
        }
    } else {
        if (disconnect) {
            conn_set_state(c, conn_closing);
        } else {
            /* No more items to ship to the slave at this time.. suspend.. */
            if (settings.verbose > 1) {
                settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
                                                "%d: No more items in tap log.. waiting\n",
                                                c->sfd);
            }
            c->ewouldblock = true;
        }
    }
}
2591
2592
default_unknown_command(EXTENSION_BINARY_PROTOCOL_DESCRIPTOR * descriptor,ENGINE_HANDLE * handle,const void * cookie,protocol_binary_request_header * request,ADD_RESPONSE response)2593 static ENGINE_ERROR_CODE default_unknown_command(EXTENSION_BINARY_PROTOCOL_DESCRIPTOR *descriptor,
2594 ENGINE_HANDLE* handle,
2595 const void* cookie,
2596 protocol_binary_request_header *request,
2597 ADD_RESPONSE response)
2598 {
2599 (void)(descriptor);
2600 return settings.engine.v1->unknown_command(handle, cookie, request, response);
2601 }
2602
/*
 * One slot of the opcode -> handler table used by
 * process_bin_unknown_packet().
 */
struct request_lookup {
    EXTENSION_BINARY_PROTOCOL_DESCRIPTOR *descriptor; /* passed as first argument to callback; NULL for the default handler */
    BINARY_COMMAND_CALLBACK callback;                 /* function invoked for the opcode */
};

/* Handler table indexed by the 8 bit request opcode. */
static struct request_lookup request_handlers[0x100];
2609
initialize_binary_lookup_map(void)2610 static void initialize_binary_lookup_map(void) {
2611 for (int ii = 0; ii < 0x100; ++ii) {
2612 request_handlers[ii].descriptor = NULL;
2613 request_handlers[ii].callback = default_unknown_command;
2614 }
2615 }
2616
/*
 * Install `new_handler` (together with its `descriptor`) as the handler for
 * opcode `cmd`, replacing whatever was registered before.
 */
static void setup_binary_lookup_cmd(EXTENSION_BINARY_PROTOCOL_DESCRIPTOR *descriptor,
                                    uint8_t cmd,
                                    BINARY_COMMAND_CALLBACK new_handler) {
    struct request_lookup *slot = &request_handlers[cmd];

    slot->descriptor = descriptor;
    slot->callback = new_handler;
}
2623
/*
 * Handle a fully read packet whose opcode is not processed by the core:
 * look up the registered handler for the opcode and act on its result.
 *
 * A status left in c->aiostat takes precedence over calling the handler.
 * c->aiostat and c->ewouldblock are reset before the handler is invoked.
 */
static void process_bin_unknown_packet(conn *c) {
    /* The packet (header + body) ends at rcurr. */
    void *packet = c->rcurr - (c->binary_header.request.bodylen +
                               sizeof(c->binary_header));

    ENGINE_ERROR_CODE status = c->aiostat;
    c->aiostat = ENGINE_SUCCESS;
    c->ewouldblock = false;

    if (status == ENGINE_SUCCESS) {
        struct request_lookup *handler =
            &request_handlers[c->binary_header.request.opcode];
        status = handler->callback(handler->descriptor, settings.engine.v0, c,
                                   packet, binary_response_handler);
    }

    if (status == ENGINE_SUCCESS) {
        if (c->dynamic_buffer.buffer == NULL) {
            /* The handler produced no response data. */
            conn_set_state(c, conn_new_cmd);
        } else {
            write_and_free(c, c->dynamic_buffer.buffer, c->dynamic_buffer.offset);
            c->dynamic_buffer.buffer = NULL;
        }
    } else if (status == ENGINE_EWOULDBLOCK) {
        c->ewouldblock = true;
    } else if (status == ENGINE_DISCONNECT) {
        conn_set_state(c, conn_closing);
    } else {
        /* Release the dynamic buffer.. it may be partial.. */
        free(c->dynamic_buffer.buffer);
        c->dynamic_buffer.buffer = NULL;
        write_bin_packet(c, engine_error_2_protocol_error(status), 0);
    }
}
2660
/*
 * Handle a TAP_CONNECT request: extract the optional flags, the connection
 * name (key) and the user data, then ask the engine for a tap iterator.
 *
 * The header lengths are supplied by the client. A body shorter than
 * extras + key would make the user data length wrap around to a huge value
 * that is then handed to the engine, so such packets are rejected the same
 * way as the other invalid tap connect messages (warning + close).
 *
 * On success the connection switches to conn_ship_log; if the engine returns
 * no iterator a NOT_SUPPORTED packet is sent and the connection is closed.
 */
static void process_bin_tap_connect(conn *c) {
    char *packet = (c->rcurr - (c->binary_header.request.bodylen +
                                sizeof(c->binary_header)));
    protocol_binary_request_tap_connect *req = (void*)packet;
    const char *key = packet + sizeof(req->bytes);
    const char *data = key + c->binary_header.request.keylen;
    uint32_t flags = 0;

    const uint32_t bodylen = c->binary_header.request.bodylen;
    const uint32_t overhead = (uint32_t)c->binary_header.request.extlen +
                              (uint32_t)c->binary_header.request.keylen;
    if (bodylen < overhead) {
        settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
                                        "%d: ERROR: Invalid tap connect message\n",
                                        c->sfd);
        conn_set_state(c, conn_closing);
        return ;
    }
    size_t ndata = bodylen - overhead;

    if (c->binary_header.request.extlen == 4) {
        flags = ntohl(req->message.body.flags);

        if (flags & TAP_CONNECT_FLAG_BACKFILL) {
            /* the userdata has to be at least 8 bytes! */
            if (ndata < 8) {
                settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
                                                "%d: ERROR: Invalid tap connect message\n",
                                                c->sfd);
                conn_set_state(c, conn_closing);
                return ;
            }
        }
    } else {
        /* No flags in the packet: key and data start 4 bytes earlier
           than sizeof(req->bytes) suggests. */
        data -= 4;
        key -= 4;
    }

    if (settings.verbose && c->binary_header.request.keylen > 0) {
        /* Log a NUL-terminated (possibly truncated) copy of the name. */
        char buffer[1024];
        unsigned int len = c->binary_header.request.keylen;
        if (len >= sizeof(buffer)) {
            len = sizeof(buffer) - 1;
        }
        memcpy(buffer, key, len);
        buffer[len] = '\0';
        settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
                                        "%d: Trying to connect with named tap connection: <%s>\n",
                                        c->sfd, buffer);
    }

    TAP_ITERATOR iterator = settings.engine.v1->get_tap_iterator(
        settings.engine.v0, c, key, c->binary_header.request.keylen,
        flags, data, ndata);

    if (iterator == NULL) {
        settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
                                        "%d: FATAL: The engine does not support tap\n",
                                        c->sfd);
        write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED, 0);
        c->write_and_go = conn_closing;
    } else {
        c->tap_iterator = iterator;
        c->which = EV_WRITE;
        conn_set_state(c, conn_ship_log);
    }
}
2719
/*
 * Handle an incoming tap message of type `event`: decode the tap extras,
 * engine specific data, key and value from the packet and pass them to the
 * engine's tap_notify().
 *
 * Packet layout after the header: 8 bytes of tap extras (plus 8 more bytes
 * of item flags/expiration for mutation and checkpoint messages), then
 * `nengine` engine specific bytes, the key, and finally the value.
 *
 * The lengths come from the peer. If the body is too short to hold the
 * extras, the engine specific data and the key, the value length would wrap
 * around, so such packets are answered with EINVAL instead of being passed
 * to the engine. A ttl of 0 is rejected with EINVAL as well.
 */
static void process_bin_tap_packet(tap_event_t event, conn *c) {
    assert(c != NULL);
    char *packet = (c->rcurr - (c->binary_header.request.bodylen +
                                sizeof(c->binary_header)));
    protocol_binary_request_tap_no_extras *tap = (void*)packet;
    uint16_t nengine = ntohs(tap->message.body.tap.enginespecific_length);
    uint16_t tap_flags = ntohs(tap->message.body.tap.flags);
    uint32_t seqno = ntohl(tap->message.header.request.opaque);
    uint8_t ttl = tap->message.body.tap.ttl;
    char *engine_specific = packet + sizeof(tap->bytes);
    char *key = engine_specific + nengine;
    uint16_t nkey = c->binary_header.request.keylen;
    char *data = key + nkey;
    uint32_t flags = 0;
    uint32_t exptime = 0;
    const uint32_t bodylen = c->binary_header.request.bodylen;
    ENGINE_ERROR_CODE ret = c->aiostat;

    if (ttl == 0) {
        ret = ENGINE_EINVAL;
    } else if (ret == ENGINE_SUCCESS) {
        const bool has_item_extras = (event == TAP_MUTATION ||
                                      event == TAP_CHECKPOINT_START ||
                                      event == TAP_CHECKPOINT_END);
        /* Bytes of the body that are not value data. */
        uint32_t overhead = 8u + nengine + nkey;
        if (has_item_extras) {
            overhead += 8;
        }

        if (bodylen < overhead) {
            /* Malformed lengths; do not let ndata wrap around. */
            ret = ENGINE_EINVAL;
        } else {
            uint32_t ndata = bodylen - overhead;

            if (has_item_extras) {
                protocol_binary_request_tap_mutation *mutation = (void*)tap;
                flags = ntohl(mutation->message.body.item.flags);
                exptime = ntohl(mutation->message.body.item.expiration);
                /* Skip the item flags/expiration that precede the data. */
                key += 8;
                data += 8;
            }

            ret = settings.engine.v1->tap_notify(settings.engine.v0, c,
                                                 engine_specific, nengine,
                                                 ttl - 1, tap_flags,
                                                 event, seqno,
                                                 key, nkey,
                                                 flags, exptime,
                                                 ntohll(tap->message.header.request.cas),
                                                 data, ndata,
                                                 c->binary_header.request.vbucket);
        }
    }

    switch (ret) {
    case ENGINE_DISCONNECT:
        conn_set_state(c, conn_closing);
        break;
    case ENGINE_EWOULDBLOCK:
        c->ewouldblock = true;
        break;
    default:
        /* Reply when the peer asked for an ack or something went wrong. */
        if ((tap_flags & TAP_FLAG_ACK) || (ret != ENGINE_SUCCESS)) {
            write_bin_packet(c, engine_error_2_protocol_error(ret), 0);
        } else {
            conn_set_state(c, conn_new_cmd);
        }
    }
}
2779
/*
 * Handle a response (ack) to a tap message we sent: forward the sequence
 * number, status and key to the engine as a TAP_ACK notification.
 *
 * The connection is closed when the engine has no tap_notify() hook or
 * answers ENGINE_DISCONNECT; otherwise it goes back to shipping the tap log.
 */
static void process_bin_tap_ack(conn *c) {
    assert(c != NULL);

    char *packet = (c->rcurr - (c->binary_header.request.bodylen +
                                sizeof(c->binary_header)));
    protocol_binary_response_no_extras *rsp = (void*)packet;
    uint32_t seqno = ntohl(rsp->message.header.response.opaque);
    uint16_t status = ntohs(rsp->message.header.response.status);
    char *key = packet + sizeof(rsp->bytes);

    ENGINE_ERROR_CODE rc = ENGINE_DISCONNECT;
    if (settings.engine.v1->tap_notify != NULL) {
        rc = settings.engine.v1->tap_notify(settings.engine.v0, c, NULL, 0, 0, status,
                                            TAP_ACK, seqno, key,
                                            c->binary_header.request.keylen, 0, 0,
                                            0, NULL, 0, 0);
    }

    if (rc != ENGINE_DISCONNECT) {
        conn_set_state(c, conn_ship_log);
    } else {
        conn_set_state(c, conn_closing);
    }
}
2803
2804 /**
2805 * We received a noop response.. just ignore it
2806 */
process_bin_noop_response(conn * c)2807 static void process_bin_noop_response(conn *c) {
2808 assert(c != NULL);
2809 conn_set_state(c, conn_new_cmd);
2810 }
2811
/*
 * Handle the VERBOSITY command: set settings.verbose to the requested level
 * (capped at MAX_VERBOSITY_LEVEL), fire the ON_LOG_LEVEL callbacks and send
 * an empty success response.
 */
static void process_bin_verbosity(conn *c) {
    char *packet = (c->rcurr - (c->binary_header.request.bodylen +
                                sizeof(c->binary_header)));
    protocol_binary_request_verbosity *req = (void*)packet;

    const uint32_t requested = (uint32_t)ntohl(req->message.body.level);
    uint32_t level = MAX_VERBOSITY_LEVEL;
    if (requested <= MAX_VERBOSITY_LEVEL) {
        level = requested;
    }

    settings.verbose = (int)level;
    perform_callbacks(ON_LOG_LEVEL, NULL, NULL);
    write_bin_response(c, NULL, 0, 0, 0);
}
2824
/*
 * Dispatch a completely read packet by opcode.
 *
 * Tap messages bump the matching "received" counter (under tap_stats.mutex)
 * and are then processed by process_bin_tap_packet(); TAP_CONNECT instead
 * moves the connection to conn_add_tap_client. VERBOSITY has its own
 * handler, everything else goes to process_bin_unknown_packet().
 */
static void process_bin_packet(conn *c) {
    /* @todo this should be an array of function pointers and call through */
    uint64_t *received;
    tap_event_t event = TAP_MUTATION;

    switch (c->binary_header.request.opcode) {
    case PROTOCOL_BINARY_CMD_TAP_CONNECT:
        pthread_mutex_lock(&tap_stats.mutex);
        tap_stats.received.connect++;
        pthread_mutex_unlock(&tap_stats.mutex);
        conn_set_state(c, conn_add_tap_client);
        return;
    case PROTOCOL_BINARY_CMD_TAP_MUTATION:
        received = &tap_stats.received.mutation;
        event = TAP_MUTATION;
        break;
    case PROTOCOL_BINARY_CMD_TAP_CHECKPOINT_START:
        received = &tap_stats.received.checkpoint_start;
        event = TAP_CHECKPOINT_START;
        break;
    case PROTOCOL_BINARY_CMD_TAP_CHECKPOINT_END:
        received = &tap_stats.received.checkpoint_end;
        event = TAP_CHECKPOINT_END;
        break;
    case PROTOCOL_BINARY_CMD_TAP_DELETE:
        received = &tap_stats.received.delete;
        event = TAP_DELETION;
        break;
    case PROTOCOL_BINARY_CMD_TAP_FLUSH:
        received = &tap_stats.received.flush;
        event = TAP_FLUSH;
        break;
    case PROTOCOL_BINARY_CMD_TAP_OPAQUE:
        received = &tap_stats.received.opaque;
        event = TAP_OPAQUE;
        break;
    case PROTOCOL_BINARY_CMD_TAP_VBUCKET_SET:
        received = &tap_stats.received.vbucket_set;
        event = TAP_VBUCKET_SET;
        break;
    case PROTOCOL_BINARY_CMD_VERBOSITY:
        process_bin_verbosity(c);
        return;
    default:
        process_bin_unknown_packet(c);
        return;
    }

    /* Common tail for all tap messages other than TAP_CONNECT. */
    pthread_mutex_lock(&tap_stats.mutex);
    ++*received;
    pthread_mutex_unlock(&tap_stats.mutex);
    process_bin_tap_packet(event, c);
}
2883
/* Signature of a function that processes a response packet on a connection. */
typedef void (*RESPONSE_HANDLER)(conn*);
/**
 * A map between the response packets op-code and the function to handle
 * the response message.
 *
 * Only the opcodes listed below have a handler; all other slots are NULL
 * (implicit zero initialization of the remaining array elements).
 */
static RESPONSE_HANDLER response_handlers[256] = {
    [PROTOCOL_BINARY_CMD_NOOP] = process_bin_noop_response,
    [PROTOCOL_BINARY_CMD_TAP_MUTATION] = process_bin_tap_ack,
    [PROTOCOL_BINARY_CMD_TAP_DELETE] = process_bin_tap_ack,
    [PROTOCOL_BINARY_CMD_TAP_FLUSH] = process_bin_tap_ack,
    [PROTOCOL_BINARY_CMD_TAP_OPAQUE] = process_bin_tap_ack,
    [PROTOCOL_BINARY_CMD_TAP_VBUCKET_SET] = process_bin_tap_ack,
    [PROTOCOL_BINARY_CMD_TAP_CHECKPOINT_START] = process_bin_tap_ack,
    [PROTOCOL_BINARY_CMD_TAP_CHECKPOINT_END] = process_bin_tap_ack
};
2899
/*
 * Validate the header of a binary request and start processing it.
 *
 * Steps:
 *   1. When SASL is required, reject commands on unauthenticated
 *      connections (AUTH_ERROR + close).
 *   2. Reject headers whose lengths cannot be right: a key longer than
 *      KEY_MAX_LENGTH, or extras + key exceeding the total body length.
 *      The handlers further down subtract these lengths from each other,
 *      so an inconsistent header must never reach them.
 *   3. Map the "quiet" opcodes onto their regular counterparts, remembering
 *      in c->noreply that no reply is wanted.
 *   4. Per command, check the expected extras/key/body lengths and either
 *      answer directly or schedule reading of the rest of the packet.
 *
 * Any violation found in step 2 or 4 ends in
 * handle_binary_protocol_error().
 */
static void dispatch_bin_command(conn *c) {
    int protocol_error = 0;

    int extlen = c->binary_header.request.extlen;
    uint16_t keylen = c->binary_header.request.keylen;
    uint32_t bodylen = c->binary_header.request.bodylen;

    if (settings.require_sasl && !authenticated(c)) {
        write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_AUTH_ERROR, 0);
        c->write_and_go = conn_closing;
        return;
    }

    MEMCACHED_PROCESS_COMMAND_START(c->sfd, c->rcurr, c->rbytes);
    c->noreply = true;

    /* binprot supports 16bit keys, but internals are still 8bit */
    if (keylen > KEY_MAX_LENGTH) {
        handle_binary_protocol_error(c);
        return;
    }

    /* The body length is the total of extras, key and value, so it can
       never be smaller than extras + key. */
    if ((uint32_t)keylen + (uint32_t)extlen > bodylen) {
        handle_binary_protocol_error(c);
        return;
    }

    /* Fold the quiet variants into the regular commands. */
    switch (c->cmd) {
    case PROTOCOL_BINARY_CMD_SETQ:
        c->cmd = PROTOCOL_BINARY_CMD_SET;
        break;
    case PROTOCOL_BINARY_CMD_ADDQ:
        c->cmd = PROTOCOL_BINARY_CMD_ADD;
        break;
    case PROTOCOL_BINARY_CMD_REPLACEQ:
        c->cmd = PROTOCOL_BINARY_CMD_REPLACE;
        break;
    case PROTOCOL_BINARY_CMD_DELETEQ:
        c->cmd = PROTOCOL_BINARY_CMD_DELETE;
        break;
    case PROTOCOL_BINARY_CMD_INCREMENTQ:
        c->cmd = PROTOCOL_BINARY_CMD_INCREMENT;
        break;
    case PROTOCOL_BINARY_CMD_DECREMENTQ:
        c->cmd = PROTOCOL_BINARY_CMD_DECREMENT;
        break;
    case PROTOCOL_BINARY_CMD_QUITQ:
        c->cmd = PROTOCOL_BINARY_CMD_QUIT;
        break;
    case PROTOCOL_BINARY_CMD_FLUSHQ:
        c->cmd = PROTOCOL_BINARY_CMD_FLUSH;
        break;
    case PROTOCOL_BINARY_CMD_APPENDQ:
        c->cmd = PROTOCOL_BINARY_CMD_APPEND;
        break;
    case PROTOCOL_BINARY_CMD_PREPENDQ:
        c->cmd = PROTOCOL_BINARY_CMD_PREPEND;
        break;
    case PROTOCOL_BINARY_CMD_GETQ:
        c->cmd = PROTOCOL_BINARY_CMD_GET;
        break;
    case PROTOCOL_BINARY_CMD_GETKQ:
        c->cmd = PROTOCOL_BINARY_CMD_GETK;
        break;
    default:
        c->noreply = false;
    }

    switch (c->cmd) {
    case PROTOCOL_BINARY_CMD_VERSION:
        if (extlen == 0 && keylen == 0 && bodylen == 0) {
            write_bin_response(c, VERSION, 0, 0, strlen(VERSION));
        } else {
            protocol_error = 1;
        }
        break;
    case PROTOCOL_BINARY_CMD_FLUSH:
        if (keylen == 0 && bodylen == extlen && (extlen == 0 || extlen == 4)) {
            bin_read_key(c, bin_read_flush_exptime, extlen);
        } else {
            protocol_error = 1;
        }
        break;
    case PROTOCOL_BINARY_CMD_NOOP:
        if (extlen == 0 && keylen == 0 && bodylen == 0) {
            write_bin_response(c, NULL, 0, 0, 0);
        } else {
            protocol_error = 1;
        }
        break;
    case PROTOCOL_BINARY_CMD_SET: /* FALLTHROUGH */
    case PROTOCOL_BINARY_CMD_ADD: /* FALLTHROUGH */
    case PROTOCOL_BINARY_CMD_REPLACE:
        if (extlen == 8 && keylen != 0 && bodylen >= (keylen + 8)) {
            bin_read_key(c, bin_reading_set_header, 8);
        } else {
            protocol_error = 1;
        }
        break;
    case PROTOCOL_BINARY_CMD_GETQ:  /* FALLTHROUGH */
    case PROTOCOL_BINARY_CMD_GET:   /* FALLTHROUGH */
    case PROTOCOL_BINARY_CMD_GETKQ: /* FALLTHROUGH */
    case PROTOCOL_BINARY_CMD_GETK:
        if (extlen == 0 && bodylen == keylen && keylen > 0) {
            bin_read_key(c, bin_reading_get_key, 0);
        } else {
            protocol_error = 1;
        }
        break;
    case PROTOCOL_BINARY_CMD_DELETE:
        if (keylen > 0 && extlen == 0 && bodylen == keylen) {
            bin_read_key(c, bin_reading_del_header, extlen);
        } else {
            protocol_error = 1;
        }
        break;
    case PROTOCOL_BINARY_CMD_INCREMENT:
    case PROTOCOL_BINARY_CMD_DECREMENT:
        if (keylen > 0 && extlen == 20 && bodylen == (keylen + extlen)) {
            bin_read_key(c, bin_reading_incr_header, 20);
        } else {
            protocol_error = 1;
        }
        break;
    case PROTOCOL_BINARY_CMD_APPEND:
    case PROTOCOL_BINARY_CMD_PREPEND:
        if (keylen > 0 && extlen == 0) {
            bin_read_key(c, bin_reading_set_header, 0);
        } else {
            protocol_error = 1;
        }
        break;
    case PROTOCOL_BINARY_CMD_STAT:
        if (extlen == 0) {
            bin_read_key(c, bin_reading_stat, 0);
        } else {
            protocol_error = 1;
        }
        break;
    case PROTOCOL_BINARY_CMD_QUIT:
        if (keylen == 0 && extlen == 0 && bodylen == 0) {
            write_bin_response(c, NULL, 0, 0, 0);
            c->write_and_go = conn_closing;
            if (c->noreply) {
                conn_set_state(c, conn_closing);
            }
        } else {
            protocol_error = 1;
        }
        break;
    case PROTOCOL_BINARY_CMD_TAP_CONNECT:
        if (settings.engine.v1->get_tap_iterator == NULL) {
            write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED, bodylen);
        } else {
            bin_read_chunk(c, bin_reading_packet,
                           c->binary_header.request.bodylen);
        }
        break;
    case PROTOCOL_BINARY_CMD_TAP_MUTATION:
    case PROTOCOL_BINARY_CMD_TAP_CHECKPOINT_START:
    case PROTOCOL_BINARY_CMD_TAP_CHECKPOINT_END:
    case PROTOCOL_BINARY_CMD_TAP_DELETE:
    case PROTOCOL_BINARY_CMD_TAP_FLUSH:
    case PROTOCOL_BINARY_CMD_TAP_OPAQUE:
    case PROTOCOL_BINARY_CMD_TAP_VBUCKET_SET:
        if (settings.engine.v1->tap_notify == NULL) {
            write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED, bodylen);
        } else {
            bin_read_chunk(c, bin_reading_packet, c->binary_header.request.bodylen);
        }
        break;
#ifdef SASL_ENABLED
    case PROTOCOL_BINARY_CMD_SASL_LIST_MECHS:
        if (extlen == 0 && keylen == 0 && bodylen == 0) {
            bin_list_sasl_mechs(c);
        } else {
            protocol_error = 1;
        }
        break;
    case PROTOCOL_BINARY_CMD_SASL_AUTH:
    case PROTOCOL_BINARY_CMD_SASL_STEP:
        if (extlen == 0 && keylen != 0) {
            bin_read_key(c, bin_reading_sasl_auth, 0);
        } else {
            protocol_error = 1;
        }
        break;
#endif
    case PROTOCOL_BINARY_CMD_VERBOSITY:
        if (extlen == 4 && keylen == 0 && bodylen == 4) {
            bin_read_chunk(c, bin_reading_packet,
                           c->binary_header.request.bodylen);
        } else {
            protocol_error = 1;
        }
        break;
    default:
        /* Not a core command: let the engine have a go, if it can. */
        if (settings.engine.v1->unknown_command == NULL) {
            write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND,
                             bodylen);
        } else {
            bin_read_chunk(c, bin_reading_packet, c->binary_header.request.bodylen);
        }
    }

    if (protocol_error) {
        handle_binary_protocol_error(c);
    }
}
3103
/*
 * Handle the header of a binary SET / ADD / REPLACE request once the
 * extras and key have been read.
 *
 * Asks the engine to allocate an item large enough for the value and, on
 * success, switches the connection to conn_nread / bin_read_set_value so
 * the value bytes are read straight into the item's buffer.  On allocation
 * failure an error packet is queued and the value still on the wire is
 * swallowed.
 *
 * @param c the connection the request arrived on (must not be NULL)
 */
static void process_bin_update(conn *c) {
    /* Check the pointer before anything dereferences it. */
    assert(c != NULL);

    protocol_binary_request_set *req = binary_get_request(c);
    char *key = binary_get_key(c);
    uint16_t nkey = c->binary_header.request.keylen;
    item *it;

    /*
     * Only the expiration is converted to host byte order here; the flags
     * are handed to the engine exactly as they arrived in the request.
     */
    rel_time_t expiration = ntohl(req->message.body.expiration);

    /* Whatever is left of the body after the extras and the key is value. */
    uint32_t vlen = c->binary_header.request.bodylen -
        (nkey + c->binary_header.request.extlen);

    if (settings.verbose > 1) {
        char buffer[1024];
        const char *prefix;
        if (c->cmd == PROTOCOL_BINARY_CMD_ADD) {
            prefix = "ADD";
        } else if (c->cmd == PROTOCOL_BINARY_CMD_SET) {
            prefix = "SET";
        } else {
            prefix = "REPLACE";
        }

        size_t nw = key_to_printable_buffer(buffer, sizeof(buffer), c->sfd,
                                            true, prefix, key, nkey);

        /* A result of -1 is treated as "could not format the key". */
        if (nw != (size_t)-1) {
            /* vlen is unsigned, so print it with %u; only log on success. */
            int appended = snprintf(buffer + nw, sizeof(buffer) - nw,
                                    " Value len is %u\n", (unsigned int)vlen);
            if (appended > 0) {
                settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c, "%s",
                                                buffer);
            }
        }
    }

    if (settings.detail_enabled) {
        stats_prefix_record_set(key, nkey);
    }

    /*
     * Pick up the status left in aiostat (presumably by an earlier call that
     * returned EWOULDBLOCK -- confirm against the notification path) and
     * reset the per-request async bookkeeping.
     */
    ENGINE_ERROR_CODE ret = c->aiostat;
    c->aiostat = ENGINE_SUCCESS;
    c->ewouldblock = false;
    item_info info = { .nvalue = 1 };

    if (ret == ENGINE_SUCCESS) {
        ret = settings.engine.v1->allocate(settings.engine.v0, c,
                                           &it, key, nkey,
                                           vlen,
                                           req->message.body.flags,
                                           expiration);
        if (ret == ENGINE_SUCCESS && !settings.engine.v1->get_item_info(settings.engine.v0,
                                                                        c, it, &info)) {
            /* The item exists but we cannot find its value buffer. */
            settings.engine.v1->release(settings.engine.v0, c, it);
            write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_EINTERNAL, 0);
            return;
        }
    }

    switch (ret) {
    case ENGINE_SUCCESS:
        item_set_cas(c, it, c->binary_header.request.cas);

        switch (c->cmd) {
        case PROTOCOL_BINARY_CMD_ADD:
            c->store_op = OPERATION_ADD;
            break;
        case PROTOCOL_BINARY_CMD_SET:
            c->store_op = OPERATION_SET;
            break;
        case PROTOCOL_BINARY_CMD_REPLACE:
            c->store_op = OPERATION_REPLACE;
            break;
        default:
            assert(0);
        }

        /* A non-zero CAS in the request turns any of the above into a CAS. */
        if (c->binary_header.request.cas != 0) {
            c->store_op = OPERATION_CAS;
        }

        /* Read the value directly into the freshly allocated item. */
        c->item = it;
        c->ritem = info.value[0].iov_base;
        c->rlbytes = vlen;
        conn_set_state(c, conn_nread);
        c->substate = bin_read_set_value;
        break;
    case ENGINE_EWOULDBLOCK:
        c->ewouldblock = true;
        break;
    case ENGINE_DISCONNECT:
        c->state = conn_closing;
        break;
    default:
        if (ret == ENGINE_E2BIG) {
            write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_E2BIG, vlen);
        } else {
            write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, vlen);
        }

        /*
         * Avoid stale data persisting in cache because we failed alloc.
         * Unacceptable for SET (but only if cas matches).
         * Anywhere else too?
         */
        if (c->cmd == PROTOCOL_BINARY_CMD_SET) {
            /* @todo fix this for the ASYNC interface! */
            settings.engine.v1->remove(settings.engine.v0, c, key, nkey,
                                       ntohll(req->message.header.request.cas),
                                       c->binary_header.request.vbucket);
        }

        /* swallow the data line */
        c->write_and_go = conn_swallow;
    }
}
3226
/*
 * Handle the header of a binary APPEND / PREPEND request once the key has
 * been read.
 *
 * Allocates an item sized for the incoming value (flags and expiration are
 * passed as 0) and, on success, arranges for the value to be read into it
 * via conn_nread / bin_read_set_value.  On allocation failure an E2BIG or
 * ENOMEM packet is queued and the value on the wire is swallowed.
 *
 * @param c the connection the request arrived on (must not be NULL)
 */
static void process_bin_append_prepend(conn *c) {
    assert(c != NULL);

    char *key = binary_get_key(c);
    int nkey = c->binary_header.request.keylen;
    int vlen = c->binary_header.request.bodylen - nkey;
    item *it;

    if (settings.verbose > 1) {
        settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
                                        "Value len is %d\n", vlen);
    }

    if (settings.detail_enabled) {
        stats_prefix_record_set(key, nkey);
    }

    /* Consume the stored async status and reset the bookkeeping. */
    ENGINE_ERROR_CODE status = c->aiostat;
    c->aiostat = ENGINE_SUCCESS;
    c->ewouldblock = false;
    item_info details = { .nvalue = 1 };

    if (status == ENGINE_SUCCESS) {
        status = settings.engine.v1->allocate(settings.engine.v0, c,
                                              &it, key, nkey,
                                              vlen, 0, 0);
        if (status == ENGINE_SUCCESS &&
            !settings.engine.v1->get_item_info(settings.engine.v0, c, it,
                                               &details)) {
            settings.engine.v1->release(settings.engine.v0, c, it);
            write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_EINTERNAL, 0);
            return;
        }
    }

    if (status == ENGINE_SUCCESS) {
        item_set_cas(c, it, c->binary_header.request.cas);

        if (c->cmd == PROTOCOL_BINARY_CMD_APPEND) {
            c->store_op = OPERATION_APPEND;
        } else if (c->cmd == PROTOCOL_BINARY_CMD_PREPEND) {
            c->store_op = OPERATION_PREPEND;
        } else {
            assert(0);
        }

        /* Next step: read vlen bytes of value into the item's buffer. */
        c->item = it;
        c->ritem = details.value[0].iov_base;
        c->rlbytes = vlen;
        conn_set_state(c, conn_nread);
        c->substate = bin_read_set_value;
    } else if (status == ENGINE_EWOULDBLOCK) {
        c->ewouldblock = true;
    } else if (status == ENGINE_DISCONNECT) {
        c->state = conn_closing;
    } else {
        write_bin_packet(c,
                         status == ENGINE_E2BIG
                             ? PROTOCOL_BINARY_RESPONSE_E2BIG
                             : PROTOCOL_BINARY_RESPONSE_ENOMEM,
                         vlen);
        /* swallow the data line */
        c->write_and_go = conn_swallow;
    }
}
3302
/*
 * Execute a binary FLUSH request.
 *
 * The expiration is taken from the request extras only when the extras
 * length matches the flush body; otherwise 0 is used.  The engine result
 * is mapped to a success, NOT_SUPPORTED or EINVAL response, and the
 * cmd_flush counter is bumped regardless of the outcome.
 *
 * @param c the connection the request arrived on
 */
static void process_bin_flush(conn *c) {
    protocol_binary_request_flush *req = binary_get_request(c);
    time_t exptime = 0;

    if (c->binary_header.request.extlen == sizeof(req->message.body)) {
        exptime = ntohl(req->message.body.expiration);
    }

    if (settings.verbose > 1) {
        settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
                                        "%d: flush %ld", c->sfd,
                                        (long)exptime);
    }

    ENGINE_ERROR_CODE status =
        settings.engine.v1->flush(settings.engine.v0, c, exptime);

    switch (status) {
    case ENGINE_SUCCESS:
        write_bin_response(c, NULL, 0, 0, 0);
        break;
    case ENGINE_ENOTSUP:
        write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED, 0);
        break;
    default:
        write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_EINVAL, 0);
        break;
    }

    STATS_NOKEY(c, cmd_flush);
}
3329
/*
 * Execute a binary DELETE request once the key has been read.
 *
 * Calls the engine's remove() with the key, the request CAS and vbucket,
 * then maps the engine status to a binary response and updates the
 * delete_hits / delete_misses counters.
 *
 * @param c the connection the request arrived on (must not be NULL)
 */
static void process_bin_delete(conn *c) {
    /* Validate the pointer before the first dereference, not after. */
    assert(c != NULL);

    protocol_binary_request_delete *req = binary_get_request(c);

    char *key = binary_get_key(c);
    size_t nkey = c->binary_header.request.keylen;

    if (settings.verbose > 1) {
        char buffer[1024];
        if (key_to_printable_buffer(buffer, sizeof(buffer), c->sfd, true,
                                    "DELETE", key, nkey) != -1) {
            settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c, "%s\n",
                                            buffer);
        }
    }

    /* Consume the stored async status and reset the bookkeeping. */
    ENGINE_ERROR_CODE ret = c->aiostat;
    c->aiostat = ENGINE_SUCCESS;
    c->ewouldblock = false;

    if (ret == ENGINE_SUCCESS) {
        if (settings.detail_enabled) {
            stats_prefix_record_delete(key, nkey);
        }
        ret = settings.engine.v1->remove(settings.engine.v0, c, key, nkey,
                                         ntohll(req->message.header.request.cas),
                                         c->binary_header.request.vbucket);
    }

    /*
     * SLAB_INCR expands to code that reads a local named "info", so one has
     * to exist even though nothing here fills it in.
     */
    item_info info = { .nvalue = 1 };
    switch (ret) {
    case ENGINE_SUCCESS:
        write_bin_response(c, NULL, 0, 0, 0);
        SLAB_INCR(c, delete_hits, key, nkey);
        break;
    case ENGINE_KEY_EEXISTS:
        write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS, 0);
        break;
    case ENGINE_KEY_ENOENT:
        write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0);
        STATS_INCR(c, delete_misses, key, nkey);
        break;
    case ENGINE_NOT_MY_VBUCKET:
        write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET, 0);
        break;
    case ENGINE_EWOULDBLOCK:
        c->ewouldblock = true;
        break;
    default:
        write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_EINVAL, 0);
    }
}
3384
/*
 * Dispatch a binary-protocol connection whose pending read has completed.
 *
 * The connection's substate records what was being read; each substate is
 * routed to the handler for that stage.  An unrecognised substate is
 * logged and the process aborts.
 *
 * @param c the connection whose read finished (must not be NULL)
 */
static void complete_nread_binary(conn *c) {
    assert(c != NULL);
    assert(c->cmd >= 0);

    switch (c->substate) {
    case bin_reading_set_header:
        /* APPEND/PREPEND share the substate with SET/ADD/REPLACE. */
        switch (c->cmd) {
        case PROTOCOL_BINARY_CMD_APPEND:
        case PROTOCOL_BINARY_CMD_PREPEND:
            process_bin_append_prepend(c);
            break;
        default:
            process_bin_update(c);
            break;
        }
        break;

    case bin_read_set_value:
        complete_update_bin(c);
        break;

    case bin_reading_get_key:
        process_bin_get(c);
        break;

    case bin_reading_stat:
        process_bin_stat(c);
        break;

    case bin_reading_del_header:
        process_bin_delete(c);
        break;

    case bin_reading_incr_header:
        complete_incr_bin(c);
        break;

    case bin_read_flush_exptime:
        process_bin_flush(c);
        break;

    case bin_reading_sasl_auth:
        process_bin_sasl_auth(c);
        break;

    case bin_reading_sasl_auth_data:
        process_bin_complete_sasl_auth(c);
        break;

    case bin_reading_packet:
        if (c->binary_header.request.magic != PROTOCOL_BINARY_RES) {
            /* An ordinary request packet. */
            process_bin_packet(c);
            break;
        }

        /* A response packet: look up the handler registered for its opcode. */
        {
            RESPONSE_HANDLER handler =
                response_handlers[c->binary_header.request.opcode];
            if (handler) {
                handler(c);
            } else {
                settings.extensions.logger->log(EXTENSION_LOG_INFO, c,
                       "%d: ERROR: Unsupported response packet received: %u\n",
                       c->sfd, (unsigned int)c->binary_header.request.opcode);
                conn_set_state(c, conn_closing);
            }
        }
        break;

    default:
        settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
                "Not handling substate %d\n", c->substate);
        abort();
    }
}
3444
/*
 * Return a connection to its between-commands state.
 *
 * Clears the per-command fields, releases any item still attached to the
 * connection, shrinks its buffers, and then either parses the next command
 * (when unread bytes are already buffered) or waits for more input.
 *
 * @param c the connection to reset
 */
static void reset_cmd_handler(conn *c) {
    c->cmd = -1;
    c->substate = bin_no_state;
    c->ascii_cmd = NULL;
    c->sbytes = 0;

    if (c->item != NULL) {
        settings.engine.v1->release(settings.engine.v0, c, c->item);
        c->item = NULL;
    }

    conn_shrink(c);

    if (c->rbytes > 0) {
        conn_set_state(c, conn_parse_cmd);
        return;
    }
    conn_set_state(c, conn_waiting);
}
3461
ascii_response_handler(const void * cookie,int nbytes,const char * dta)3462 static ENGINE_ERROR_CODE ascii_response_handler(const void *cookie,
3463 int nbytes,
3464 const char *dta)
3465 {
3466 conn *c = (conn*)cookie;
3467 char *buf;
3468 if (!grow_dynamic_buffer(c, nbytes)) {
3469 if (settings.verbose > 0) {
3470 settings.extensions.logger->log(EXTENSION_LOG_INFO, c,
3471 "<%d ERROR: Failed to allocate memory for response\n",
3472 c->sfd);
3473 }
3474 return ENGINE_ENOMEM;
3475 }
3476
3477 buf = c->dynamic_buffer.buffer + c->dynamic_buffer.offset;
3478 memcpy(buf, dta, nbytes);
3479 c->dynamic_buffer.offset += nbytes;
3480
3481 return ENGINE_SUCCESS;
3482 }
3483
/*
 * Finish an ASCII-protocol read.
 *
 * Without an extension command attached this completes a regular update.
 * Otherwise the extension's execute() hook is run: on success any output
 * collected in the dynamic buffer is written out (or, if there is none,
 * the connection moves on to the next command); EWOULDBLOCK marks the
 * connection as blocked; anything else closes it.
 *
 * @param c the connection whose read finished
 */
static void complete_nread_ascii(conn *c) {
    if (c->ascii_cmd == NULL) {
        complete_update_ascii(c);
        return;
    }

    c->ewouldblock = false;
    ENGINE_ERROR_CODE status =
        c->ascii_cmd->execute(c->ascii_cmd->cookie, c, 0, NULL,
                              ascii_response_handler);

    if (status == ENGINE_SUCCESS) {
        if (c->dynamic_buffer.buffer == NULL) {
            conn_set_state(c, conn_new_cmd);
        } else {
            /* Ownership of the buffer passes to the write path. */
            write_and_free(c, c->dynamic_buffer.buffer,
                           c->dynamic_buffer.offset);
            c->dynamic_buffer.buffer = NULL;
        }
    } else if (status == ENGINE_EWOULDBLOCK) {
        c->ewouldblock = true;
    } else {
        /* ENGINE_DISCONNECT and every other status */
        conn_set_state(c, conn_closing);
    }
}
3509
/*
 * Entry point for a completed read: hands the connection to the ASCII or
 * binary completion routine according to its protocol.
 *
 * @param c the connection whose read finished (must not be NULL)
 */
static void complete_nread(conn *c) {
    assert(c != NULL);
    assert(c->protocol == ascii_prot
           || c->protocol == binary_prot);

    switch (c->protocol) {
    case ascii_prot:
        complete_nread_ascii(c);
        break;
    case binary_prot:
        complete_nread_binary(c);
        break;
    default:
        /* Any other protocol value is left untouched. */
        break;
    }
}
3521
/*
 * Positions within the token array produced by tokenize_command().
 * SUBCOMMAND_TOKEN and KEY_TOKEN deliberately share index 1: the token
 * after the command is read as a subcommand by process_stat() and as the
 * first key by process_get_command().
 */
/* presumably the index of the command word itself -- TODO confirm at call sites */
#define COMMAND_TOKEN 0
#define SUBCOMMAND_TOKEN 1
#define KEY_TOKEN 1

/* presumably the capacity of the token arrays callers pass in -- TODO confirm */
#define MAX_TOKENS 30
3527
3528 /*
3529 * Tokenize the command string by replacing whitespace with '\0' and update
3530 * the token array tokens with pointer to start of each token and length.
3531 * Returns total number of tokens. The last valid token is the terminal
3532 * token (value points to the first unprocessed character of the string and
3533 * length zero).
3534 *
3535 * Usage example:
3536 *
3537 * while(tokenize_command(command, ncommand, tokens, max_tokens) > 0) {
3538 * for(int ix = 0; tokens[ix].length != 0; ix++) {
3539 * ...
3540 * }
3541 * ncommand = tokens[ix].value - command;
3542 * command = tokens[ix].value;
3543 * }
3544 */
tokenize_command(char * command,token_t * tokens,const size_t max_tokens)3545 static size_t tokenize_command(char *command, token_t *tokens, const size_t max_tokens) {
3546 char *s, *e;
3547 size_t ntokens = 0;
3548
3549 assert(command != NULL && tokens != NULL && max_tokens > 1);
3550
3551 for (s = e = command; ntokens < max_tokens - 1; ++e) {
3552 if (*e == ' ') {
3553 if (s != e) {
3554 tokens[ntokens].value = s;
3555 tokens[ntokens].length = e - s;
3556 ntokens++;
3557 *e = '\0';
3558 }
3559 s = e + 1;
3560 }
3561 else if (*e == '\0') {
3562 if (s != e) {
3563 tokens[ntokens].value = s;
3564 tokens[ntokens].length = e - s;
3565 ntokens++;
3566 }
3567
3568 break; /* string end */
3569 }
3570 }
3571
3572 /*
3573 * If we scanned the whole string, the terminal value pointer is null,
3574 * otherwise it is the first unprocessed character.
3575 */
3576 tokens[ntokens].value = *e == '\0' ? NULL : e;
3577 tokens[ntokens].length = 0;
3578 ntokens++;
3579
3580 return ntokens;
3581 }
3582
/*
 * Join tokens back into a single space-separated, NUL-terminated string.
 *
 * On success *out receives a malloc()ed buffer the caller must free() and
 * *nbytes receives its length (excluding the terminator).  If the
 * allocation fails neither output is touched, so callers should preset
 * them to recognisable values.  With no tokens (ntokens <= 0) the result
 * is an empty string of length 0.
 *
 * @param tokens  tokens to join
 * @param ntokens number of entries of tokens to use
 * @param out     receives the newly allocated string
 * @param nbytes  receives the string length
 */
static void detokenize(token_t *tokens, int ntokens, char **out, int *nbytes) {
    /*
     * One extra byte per token covers the separating spaces plus the
     * terminator.  Always reserve at least the terminator so an empty token
     * list cannot index before the start of the buffer.
     */
    int nb = ntokens > 0 ? ntokens : 1;
    for (int i = 0; i < ntokens; ++i) {
        nb += tokens[i].length;
    }

    char *buf = malloc(nb * sizeof(char));
    if (buf == NULL) {
        return;
    }

    char *p = buf;
    for (int i = 0; i < ntokens; ++i) {
        memcpy(p, tokens[i].value, tokens[i].length);
        p += tokens[i].length;
        *p = ' ';
        p++;
    }

    /* The space written after the last token becomes the terminator. */
    buf[nb - 1] = '\0';
    *nbytes = nb - 1;
    *out = buf;
}
3606
3607
3608 /* set up a connection to write a buffer then free it, used for stats */
write_and_free(conn * c,char * buf,int bytes)3609 static void write_and_free(conn *c, char *buf, int bytes) {
3610 if (buf) {
3611 c->write_and_free = buf;
3612 c->wcurr = buf;
3613 c->wbytes = bytes;
3614 conn_set_state(c, conn_write);
3615 c->write_and_go = conn_new_cmd;
3616 } else {
3617 out_string(c, "SERVER_ERROR out of memory writing stats");
3618 }
3619 }
3620
/*
 * Turn on c->noreply when the last real token of the command line is the
 * word "noreply".
 *
 * Replies can also be produced before this runs (for instance when the
 * request line has the wrong number of tokens), and detecting "noreply" on
 * a malformed line would not be reliable anyway, so that case is accepted.
 *
 * @param c       the connection being processed
 * @param tokens  tokens of the command line
 * @param ntokens number of tokens, including the terminal entry
 * @return the resulting value of c->noreply
 */
static inline bool set_noreply_maybe(conn *c, token_t *tokens, size_t ntokens)
{
    /* The entry just before the terminal one is the last real token. */
    int slot = ntokens - 2;
    const char *candidate = tokens[slot].value;

    if (candidate != NULL && strcmp(candidate, "noreply") == 0) {
        c->noreply = true;
    }

    return c->noreply;
}
3638
/*
 * Format a single statistic value and pass it to the add_stats callback.
 *
 * The value is rendered printf-style into a fixed-size buffer; values that
 * do not fit are truncated.  The length handed to the callback is always
 * the number of bytes actually stored in the buffer.
 *
 * @param name      statistic name (NUL-terminated)
 * @param add_stats callback that receives the name/value pair
 * @param c         connection passed through to the callback
 * @param fmt       printf-style format for the value, followed by its args
 */
void append_stat(const char *name, ADD_STAT add_stats, conn *c,
                 const char *fmt, ...) {
    char val_str[STAT_VAL_LEN];
    const size_t limit = sizeof(val_str) - 1;
    int vlen;
    va_list ap;

    assert(name);
    assert(add_stats);
    assert(c);
    assert(fmt);

    va_start(ap, fmt);
    vlen = vsnprintf(val_str, limit, fmt, ap);
    va_end(ap);

    /*
     * vsnprintf returns the length the full output would have had (or a
     * negative value on error), not the number of bytes stored.  Clamp it
     * so the callback never reads past what was written.
     */
    if (vlen < 0) {
        val_str[0] = '\0';
        vlen = 0;
    } else if ((size_t)vlen >= limit) {
        vlen = (int)(limit - 1);
    }

    add_stats(name, strlen(name), val_str, vlen, c);
}
3656
/*
 * Handle "stats detail <on|off|dump>".
 *
 * "on" and "off" toggle settings.detail_enabled, "dump" sends the prefix
 * statistics, and anything else yields a usage error.  All of it is
 * refused when detailed stats are not allowed by the settings.
 *
 * @param c       the connection to answer on (must not be NULL)
 * @param command the argument following "detail"
 */
inline static void process_stats_detail(conn *c, const char *command) {
    assert(c != NULL);

    if (!settings.allow_detailed) {
        out_string(c, "CLIENT_ERROR detailed stats disabled");
        return;
    }

    if (strcmp(command, "on") == 0) {
        settings.detail_enabled = 1;
        out_string(c, "OK");
        return;
    }

    if (strcmp(command, "off") == 0) {
        settings.detail_enabled = 0;
        out_string(c, "OK");
        return;
    }

    if (strcmp(command, "dump") == 0) {
        int dump_len;
        char *dump = stats_prefix_dump(&dump_len);
        write_and_free(c, dump, dump_len);
        return;
    }

    out_string(c, "CLIENT_ERROR usage: stats detail on|off|dump");
}
3682
aggregate_callback(void * in,void * out)3683 static void aggregate_callback(void *in, void *out) {
3684 struct thread_stats *out_thread_stats = out;
3685 struct independent_stats *in_independent_stats = in;
3686 threadlocal_stats_aggregate(in_independent_stats->thread_stats,
3687 out_thread_stats);
3688 }
3689
3690 /* return server specific stats only */
server_stats(ADD_STAT add_stats,conn * c,bool aggregate)3691 static void server_stats(ADD_STAT add_stats, conn *c, bool aggregate) {
3692 pid_t pid = getpid();
3693 rel_time_t now = current_time;
3694
3695 struct thread_stats thread_stats;
3696 threadlocal_stats_clear(&thread_stats);
3697
3698 if (aggregate && settings.engine.v1->aggregate_stats != NULL) {
3699 settings.engine.v1->aggregate_stats(settings.engine.v0,
3700 (const void *)c,
3701 aggregate_callback,
3702 &thread_stats);
3703 } else {
3704 threadlocal_stats_aggregate(get_independent_stats(c)->thread_stats,
3705 &thread_stats);
3706 }
3707
3708 struct slab_stats slab_stats;
3709 slab_stats_aggregate(&thread_stats, &slab_stats);
3710
3711 #ifndef __WIN32__
3712 struct rusage usage;
3713 getrusage(RUSAGE_SELF, &usage);
3714 #endif
3715
3716 STATS_LOCK();
3717
3718 APPEND_STAT("pid", "%lu", (long)pid);
3719 APPEND_STAT("uptime", "%u", now);
3720 APPEND_STAT("time", "%ld", now + (long)process_started);
3721 APPEND_STAT("version", "%s", VERSION);
3722 APPEND_STAT("libevent", "%s", event_get_version());
3723 APPEND_STAT("pointer_size", "%d", (int)(8 * sizeof(void *)));
3724
3725 #ifndef __WIN32__
3726 append_stat("rusage_user", add_stats, c, "%ld.%06ld",
3727 (long)usage.ru_utime.tv_sec,
3728 (long)usage.ru_utime.tv_usec);
3729 append_stat("rusage_system", add_stats, c, "%ld.%06ld",
3730 (long)usage.ru_stime.tv_sec,
3731 (long)usage.ru_stime.tv_usec);
3732 #endif
3733
3734 APPEND_STAT("daemon_connections", "%u", stats.daemon_conns);
3735 APPEND_STAT("curr_connections", "%u", stats.curr_conns);
3736 APPEND_STAT("total_connections", "%u", stats.total_conns);
3737 APPEND_STAT("connection_structures", "%u", stats.conn_structs);
3738 APPEND_STAT("cmd_get", "%"PRIu64, thread_stats.cmd_get);
3739 APPEND_STAT("cmd_set", "%"PRIu64, slab_stats.cmd_set);
3740 APPEND_STAT("cmd_flush", "%"PRIu64, thread_stats.cmd_flush);
3741 APPEND_STAT("auth_cmds", "%"PRIu64, thread_stats.auth_cmds);
3742 APPEND_STAT("auth_errors", "%"PRIu64, thread_stats.auth_errors);
3743 APPEND_STAT("get_hits", "%"PRIu64, slab_stats.get_hits);
3744 APPEND_STAT("get_misses", "%"PRIu64, thread_stats.get_misses);
3745 APPEND_STAT("delete_misses", "%"PRIu64, thread_stats.delete_misses);
3746 APPEND_STAT("delete_hits", "%"PRIu64, slab_stats.delete_hits);
3747 APPEND_STAT("incr_misses", "%"PRIu64, thread_stats.incr_misses);
3748 APPEND_STAT("incr_hits", "%"PRIu64, thread_stats.incr_hits);
3749 APPEND_STAT("decr_misses", "%"PRIu64, thread_stats.decr_misses);
3750 APPEND_STAT("decr_hits", "%"PRIu64, thread_stats.decr_hits);
3751 APPEND_STAT("cas_misses", "%"PRIu64, thread_stats.cas_misses);
3752 APPEND_STAT("cas_hits", "%"PRIu64, slab_stats.cas_hits);
3753 APPEND_STAT("cas_badval", "%"PRIu64, slab_stats.cas_badval);
3754 APPEND_STAT("bytes_read", "%"PRIu64, thread_stats.bytes_read);
3755 APPEND_STAT("bytes_written", "%"PRIu64, thread_stats.bytes_written);
3756 APPEND_STAT("limit_maxbytes", "%"PRIu64, settings.maxbytes);
3757 APPEND_STAT("accepting_conns", "%u", is_listen_disabled() ? 0 : 1);
3758 APPEND_STAT("listen_disabled_num", "%"PRIu64, get_listen_disabled_num());
3759 APPEND_STAT("rejected_conns", "%" PRIu64, (unsigned long long)stats.rejected_conns);
3760 APPEND_STAT("threads", "%d", settings.num_threads);
3761 APPEND_STAT("conn_yields", "%" PRIu64, (unsigned long long)thread_stats.conn_yields);
3762 STATS_UNLOCK();
3763
3764 /*
3765 * Add tap stats (only if non-zero)
3766 */
3767 struct tap_stats ts;
3768 pthread_mutex_lock(&tap_stats.mutex);
3769 ts = tap_stats;
3770 pthread_mutex_unlock(&tap_stats.mutex);
3771
3772 if (ts.sent.connect) {
3773 APPEND_STAT("tap_connect_sent", "%"PRIu64, ts.sent.connect);
3774 }
3775 if (ts.sent.mutation) {
3776 APPEND_STAT("tap_mutation_sent", "%"PRIu64, ts.sent.mutation);
3777 }
3778 if (ts.sent.checkpoint_start) {
3779 APPEND_STAT("tap_checkpoint_start_sent", "%"PRIu64, ts.sent.checkpoint_start);
3780 }
3781 if (ts.sent.checkpoint_end) {
3782 APPEND_STAT("tap_checkpoint_end_sent", "%"PRIu64, ts.sent.checkpoint_end);
3783 }
3784 if (ts.sent.delete) {
3785 APPEND_STAT("tap_delete_sent", "%"PRIu64, ts.sent.delete);
3786 }
3787 if (ts.sent.flush) {
3788 APPEND_STAT("tap_flush_sent", "%"PRIu64, ts.sent.flush);
3789 }
3790 if (ts.sent.opaque) {
3791 APPEND_STAT("tap_opaque_sent", "%"PRIu64, ts.sent.opaque);
3792 }
3793 if (ts.sent.vbucket_set) {
3794 APPEND_STAT("tap_vbucket_set_sent", "%"PRIu64,
3795 ts.sent.vbucket_set);
3796 }
3797 if (ts.received.connect) {
3798 APPEND_STAT("tap_connect_received", "%"PRIu64, ts.received.connect);
3799 }
3800 if (ts.received.mutation) {
3801 APPEND_STAT("tap_mutation_received", "%"PRIu64, ts.received.mutation);
3802 }
3803 if (ts.received.checkpoint_start) {
3804 APPEND_STAT("tap_checkpoint_start_received", "%"PRIu64, ts.received.checkpoint_start);
3805 }
3806 if (ts.received.checkpoint_end) {
3807 APPEND_STAT("tap_checkpoint_end_received", "%"PRIu64, ts.received.checkpoint_end);
3808 }
3809 if (ts.received.delete) {
3810 APPEND_STAT("tap_delete_received", "%"PRIu64, ts.received.delete);
3811 }
3812 if (ts.received.flush) {
3813 APPEND_STAT("tap_flush_received", "%"PRIu64, ts.received.flush);
3814 }
3815 if (ts.received.opaque) {
3816 APPEND_STAT("tap_opaque_received", "%"PRIu64, ts.received.opaque);
3817 }
3818 if (ts.received.vbucket_set) {
3819 APPEND_STAT("tap_vbucket_set_received", "%"PRIu64,
3820 ts.received.vbucket_set);
3821 }
3822 }
3823
/*
 * Emit the current server settings ("stats settings") through add_stats,
 * followed by the names of the loaded daemon, logger, ASCII and binary
 * extensions.
 *
 * @param add_stats callback receiving each name/value pair
 * @param c         opaque connection cookie passed through to the callback
 */
static void process_stat_settings(ADD_STAT add_stats, void *c) {
    assert(add_stats);
    /*
     * Report the memory limit at full width; narrowing it to unsigned int
     * would misreport limits of 4 GiB and above.
     */
    APPEND_STAT("maxbytes", "%" PRIu64, (uint64_t)settings.maxbytes);
    APPEND_STAT("maxconns", "%d", settings.maxconns);
    APPEND_STAT("tcpport", "%d", settings.port);
    APPEND_STAT("udpport", "%d", settings.udpport);
    APPEND_STAT("inter", "%s", settings.inter ? settings.inter : "NULL");
    APPEND_STAT("verbosity", "%d", settings.verbose);
    APPEND_STAT("oldest", "%lu", (unsigned long)settings.oldest_live);
    APPEND_STAT("evictions", "%s", settings.evict_to_free ? "on" : "off");
    APPEND_STAT("domain_socket", "%s",
                settings.socketpath ? settings.socketpath : "NULL");
    APPEND_STAT("umask", "%o", settings.access);
    APPEND_STAT("growth_factor", "%.2f", settings.factor);
    APPEND_STAT("chunk_size", "%d", settings.chunk_size);
    APPEND_STAT("num_threads", "%d", settings.num_threads);
    APPEND_STAT("num_threads_per_udp", "%d", settings.num_threads_per_udp);
    APPEND_STAT("stat_key_prefix", "%c", settings.prefix_delimiter);
    APPEND_STAT("detail_enabled", "%s",
                settings.detail_enabled ? "yes" : "no");
    APPEND_STAT("allow_detailed", "%s",
                settings.allow_detailed ? "yes" : "no");
    APPEND_STAT("reqs_per_event", "%d", settings.reqs_per_event);
    APPEND_STAT("reqs_per_tap_event", "%d", settings.reqs_per_tap_event);
    APPEND_STAT("cas_enabled", "%s", settings.use_cas ? "yes" : "no");
    APPEND_STAT("tcp_backlog", "%d", settings.backlog);
    APPEND_STAT("binding_protocol", "%s",
                prot_text(settings.binding_protocol));
#ifdef SASL_ENABLED
    APPEND_STAT("auth_enabled_sasl", "%s", "yes");
#else
    APPEND_STAT("auth_enabled_sasl", "%s", "no");
#endif

#ifdef ENABLE_ISASL
    APPEND_STAT("auth_sasl_engine", "%s", "isasl");
#elif defined(ENABLE_SASL)
    APPEND_STAT("auth_sasl_engine", "%s", "cyrus");
#else
    APPEND_STAT("auth_sasl_engine", "%s", "none");
#endif
    APPEND_STAT("auth_required_sasl", "%s", settings.require_sasl ? "yes" : "no");
    APPEND_STAT("item_size_max", "%d", settings.item_size_max);
    APPEND_STAT("topkeys", "%d", settings.topkeys);

    /* One "extension" entry per loaded daemon extension. */
    for (EXTENSION_DAEMON_DESCRIPTOR *ptr = settings.extensions.daemons;
         ptr != NULL;
         ptr = ptr->next) {
        APPEND_STAT("extension", "%s", ptr->get_name());
    }

    APPEND_STAT("logger", "%s", settings.extensions.logger->get_name());

    for (EXTENSION_ASCII_PROTOCOL_DESCRIPTOR *ptr = settings.extensions.ascii;
         ptr != NULL;
         ptr = ptr->next) {
        APPEND_STAT("ascii_extension", "%s", ptr->get_name(ptr->cookie));
    }

    for (EXTENSION_BINARY_PROTOCOL_DESCRIPTOR *ptr = settings.extensions.binary;
         ptr != NULL;
         ptr = ptr->next) {
        APPEND_STAT("binary_extension", "%s", ptr->get_name());
    }
}
3889
/*
 * Handle the ASCII "stats" command and its subcommands.
 *
 *   stats              server stats followed by the engine's default stats
 *   stats reset        reset the counters
 *   stats detail ...   see process_stats_detail()
 *   stats settings     current settings
 *   stats cachedump    slab dump (only produced when built with FUTURE)
 *   stats aggregate    server stats aggregated through the engine
 *   stats topkeys      top-keys statistics, when enabled
 *   anything else      passed to the engine as an engine-specific request
 *
 * @param c       the connection to answer on
 * @param tokens  tokens of the command line
 * @param ntokens number of tokens, including the terminal entry
 * @return NULL when a response (or error) has been produced, or a non-NULL
 *         pointer when the engine returned EWOULDBLOCK and the request has
 *         to be retried later
 */
static char *process_stat(conn *c, token_t *tokens, const size_t ntokens) {
    const char *subcommand = tokens[SUBCOMMAND_TOKEN].value;
    c->dynamic_buffer.offset = 0;

    if (ntokens == 2) {
        /* Plain "stats": server stats plus the engine's default stats. */
        ENGINE_ERROR_CODE ret = c->aiostat;
        c->aiostat = ENGINE_SUCCESS;
        c->ewouldblock = false;
        if (ret == ENGINE_SUCCESS) {
            server_stats(&append_stats, c, false);
            ret = settings.engine.v1->get_stats(settings.engine.v0, c,
                                                NULL, 0, &append_stats);
            if (ret == ENGINE_EWOULDBLOCK) {
                c->ewouldblock = true;
                return c->rcurr + 5;
            }
        }
    } else if (strcmp(subcommand, "reset") == 0) {
        stats_reset(c);
        out_string(c, "RESET");
        return NULL;
    } else if (strcmp(subcommand, "detail") == 0) {
        /* NOTE: how to tackle detail with binary? */
        if (ntokens < 4) {
            process_stats_detail(c, "");  /* outputs the error message */
        } else {
            process_stats_detail(c, tokens[2].value);
        }
        /* Output already generated */
        return NULL;
    } else if (strcmp(subcommand, "settings") == 0) {
        process_stat_settings(&append_stats, c);
    } else if (strcmp(subcommand, "cachedump") == 0) {
        char *buf = NULL;
        unsigned int bytes = 0, id, limit = 0;

        if (ntokens < 5) {
            out_string(c, "CLIENT_ERROR bad command line");
            return NULL;
        }

        if (!safe_strtoul(tokens[2].value, &id) ||
            !safe_strtoul(tokens[3].value, &limit)) {
            out_string(c, "CLIENT_ERROR bad command line format");
            return NULL;
        }

        if (id >= POWER_LARGEST) {
            out_string(c, "CLIENT_ERROR Illegal slab id");
            return NULL;
        }

#ifdef FUTURE
        buf = item_cachedump(id, limit, &bytes);
#endif
        /* Without FUTURE buf stays NULL and an error is reported. */
        write_and_free(c, buf, bytes);
        return NULL;
    } else if (strcmp(subcommand, "aggregate") == 0) {
        server_stats(&append_stats, c, true);
    } else if (strcmp(subcommand, "topkeys") == 0) {
        topkeys_t *tk = get_independent_stats(c)->topkeys;
        if (tk != NULL) {
            topkeys_stats(tk, c, current_time, append_stats);
        } else {
            out_string(c, "ERROR");
            return NULL;
        }
    } else {
        /* getting here means that the subcommand is either engine specific or
           is invalid. query the engine and see. */
        ENGINE_ERROR_CODE ret = c->aiostat;
        c->aiostat = ENGINE_SUCCESS;
        c->ewouldblock = false;
        if (ret == ENGINE_SUCCESS) {
            char *buf = NULL;
            int nb = -1;
            /* Rebuild "subcommand arg ..." as one string for the engine. */
            detokenize(&tokens[1], ntokens - 2, &buf, &nb);
            if (buf == NULL) {
                /*
                 * detokenize() leaves its outputs untouched when it cannot
                 * allocate; report that rather than handing the engine a
                 * NULL key with length -1.
                 */
                ret = ENGINE_ENOMEM;
            } else {
                ret = settings.engine.v1->get_stats(settings.engine.v0, c,
                                                    buf, nb, append_stats);
                free(buf);
            }
        }

        switch (ret) {
        case ENGINE_SUCCESS:
            append_stats(NULL, 0, NULL, 0, c);
            write_and_free(c, c->dynamic_buffer.buffer, c->dynamic_buffer.offset);
            c->dynamic_buffer.buffer = NULL;
            break;
        case ENGINE_ENOMEM:
            out_string(c, "SERVER_ERROR out of memory writing stats");
            break;
        case ENGINE_DISCONNECT:
            c->state = conn_closing;
            break;
        case ENGINE_ENOTSUP:
            out_string(c, "SERVER_ERROR not supported");
            break;
        case ENGINE_EWOULDBLOCK:
            c->ewouldblock = true;
            return tokens[SUBCOMMAND_TOKEN].value;
        default:
            out_string(c, "ERROR");
            break;
        }

        return NULL;
    }

    /* append terminator and start the transfer */
    append_stats(NULL, 0, NULL, 0, c);

    if (c->dynamic_buffer.buffer == NULL) {
        out_string(c, "SERVER_ERROR out of memory writing stats");
    } else {
        write_and_free(c, c->dynamic_buffer.buffer, c->dynamic_buffer.offset);
        c->dynamic_buffer.buffer = NULL;
    }

    return NULL;
}
4010
4011 /**
4012 * Get a suffix buffer and insert it into the list of used suffix buffers
4013 * @param c the connection object
4014 * @return a pointer to a new suffix buffer or NULL if allocation failed
4015 */
get_suffix_buffer(conn * c)4016 static char *get_suffix_buffer(conn *c) {
4017 if (c->suffixleft == c->suffixsize) {
4018 char **new_suffix_list;
4019 size_t sz = sizeof(char*) * c->suffixsize * 2;
4020
4021 new_suffix_list = realloc(c->suffixlist, sz);
4022 if (new_suffix_list) {
4023 c->suffixsize *= 2;
4024 c->suffixlist = new_suffix_list;
4025 } else {
4026 if (settings.verbose > 1) {
4027 settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
4028 "=%d Failed to resize suffix buffer\n", c->sfd);
4029 }
4030
4031 return NULL;
4032 }
4033 }
4034
4035 char *suffix = cache_alloc(c->thread->suffix_cache);
4036 if (suffix != NULL) {
4037 *(c->suffixlist + c->suffixleft) = suffix;
4038 ++c->suffixleft;
4039 }
4040
4041 return suffix;
4042 }
4043
/*
 * Handle the ASCII retrieval commands (process_command() calls this for
 * "get"/"bget" with return_cas == false and for "gets" with
 * return_cas == true).
 *
 * For every key token the engine is asked for the item. Each hit is
 * appended to the connection's outgoing iovec list as a "VALUE ..." record
 * and the item is stored in c->ilist; each miss is only counted. When all
 * tokens have been handled "END\r\n" is appended and the connection is
 * switched to conn_mwrite.
 *
 * ntokens is overwritten here... shrug..
 *
 * @param c          the connection the command arrived on
 * @param tokens     the tokenized command line; the array is re-filled by
 *                   tokenize_command() when more input remains
 * @param ntokens    number of tokens (only assigned in this function)
 * @param return_cas true to add the item's CAS value to each VALUE line
 * @return the key being looked up when the engine answered
 *         ENGINE_EWOULDBLOCK (c->ewouldblock is set and the current ilist
 *         index is saved in c->ileft); NULL in every other case
 */
static inline char* process_get_command(conn *c, token_t *tokens, size_t ntokens, bool return_cas) {
    char *key;
    size_t nkey;
    /* Continue at the ilist index saved in c->ileft (it is stored below
     * before returning on ENGINE_EWOULDBLOCK). */
    int i = c->ileft;
    item *it;
    token_t *key_token = &tokens[KEY_TOKEN];
    /* NOTE(review): c was already dereferenced above (c->ileft), so this
     * assert cannot catch a NULL connection. */
    assert(c != NULL);

    do {
        while(key_token->length != 0) {

            key = key_token->value;
            nkey = key_token->length;

            if(nkey > KEY_MAX_LENGTH) {
                /* NOTE(review): returns without storing i in c->ileft;
                 * items placed in c->ilist by earlier iterations are not
                 * released here -- verify they are released elsewhere. */
                out_string(c, "CLIENT_ERROR bad command line format");
                return NULL;
            }

            /* Consume the status left in c->aiostat; the engine is only
             * called when that status is ENGINE_SUCCESS. */
            ENGINE_ERROR_CODE ret = c->aiostat;
            c->aiostat = ENGINE_SUCCESS;

            if (ret == ENGINE_SUCCESS) {
                ret = settings.engine.v1->get(settings.engine.v0, c, &it, key, nkey, 0);
            }

            switch (ret) {
            case ENGINE_EWOULDBLOCK:
                /* Remember how far we got and hand the current key back
                 * to the caller. */
                c->ewouldblock = true;
                c->ileft = i;
                return key;

            case ENGINE_SUCCESS:
                break;
            case ENGINE_KEY_ENOENT:
            default:
                /* Every other engine status is treated as a miss. */
                it = NULL;
                break;
            }

            if (settings.detail_enabled) {
                stats_prefix_record_get(key, nkey, NULL != it);
            }

            if (it) {
                item_info info = { .nvalue = 1 };
                if (!settings.engine.v1->get_item_info(settings.engine.v0, c, it,
                                                       &info)) {
                    settings.engine.v1->release(settings.engine.v0, c, it);
                    out_string(c, "SERVER_ERROR error getting item data");
                    /* NOTE(review): this break (like the ones below) only
                     * leaves the inner while loop. key_token->value is
                     * still non-NULL at that point, so the code after the
                     * loop re-tokenizes starting at this key -- confirm
                     * that this is the intended way out. */
                    break;
                }

                if (i >= c->isize) {
                    /* ilist is full: double it, keeping the old list if
                     * realloc() fails. */
                    item **new_list = realloc(c->ilist, sizeof(item *) * c->isize * 2);
                    if (new_list) {
                        c->isize *= 2;
                        c->ilist = new_list;
                    } else {
                        settings.engine.v1->release(settings.engine.v0, c, it);
                        break;
                    }
                }

                /* Rebuild the suffix */
                char *suffix = get_suffix_buffer(c);
                if (suffix == NULL) {
                    /* NOTE(review): same early-return concern as the
                     * key-length check above (i is not saved). */
                    out_string(c, "SERVER_ERROR out of memory rebuilding suffix");
                    settings.engine.v1->release(settings.engine.v0, c, it);
                    return NULL;
                }
                int suffix_len = snprintf(suffix, SUFFIX_SIZE,
                                          " %u %u\r\n", htonl(info.flags),
                                          info.nbytes);

                /*
                 * Construct the response. Each hit adds three elements to the
                 * outgoing data list:
                 *   "VALUE "
                 *   key
                 *   " " + flags + " " + data length + "\r\n" + data (with \r\n)
                 */

                MEMCACHED_COMMAND_GET(c->sfd, info.key, info.nkey,
                                      info.nbytes, info.cas);
                if (return_cas)
                {

                    char *cas = get_suffix_buffer(c);
                    if (cas == NULL) {
                        /* NOTE(review): same early-return concern as
                         * above (i is not saved). */
                        out_string(c, "SERVER_ERROR out of memory making CAS suffix");
                        settings.engine.v1->release(settings.engine.v0, c, it);
                        return NULL;
                    }
                    int cas_len = snprintf(cas, SUFFIX_SIZE, " %"PRIu64"\r\n",
                                           info.cas);
                    /* suffix_len - 2 leaves out the "\r\n" that ends the
                     * flags/length suffix; the CAS suffix supplies the
                     * line terminator instead. */
                    if (add_iov(c, "VALUE ", 6) != 0 ||
                        add_iov(c, info.key, info.nkey) != 0 ||
                        add_iov(c, suffix, suffix_len - 2) != 0 ||
                        add_iov(c, cas, cas_len) != 0 ||
                        add_iov(c, info.value[0].iov_base, info.value[0].iov_len) != 0 ||
                        add_iov(c, "\r\n", 2) != 0)
                    {
                        settings.engine.v1->release(settings.engine.v0, c, it);
                        break;
                    }
                }
                else
                {
                    if (add_iov(c, "VALUE ", 6) != 0 ||
                        add_iov(c, info.key, info.nkey) != 0 ||
                        add_iov(c, suffix, suffix_len) != 0 ||
                        add_iov(c, info.value[0].iov_base, info.value[0].iov_len) != 0 ||
                        add_iov(c, "\r\n", 2) != 0)
                    {
                        settings.engine.v1->release(settings.engine.v0, c, it);
                        break;
                    }
                }


                if (settings.verbose > 1) {
                    settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
                                                    ">%d sending key %s\n",
                                                    c->sfd, info.key);
                }

                /* item_get() has incremented it->refcount for us */
                STATS_HIT(c, get, key, nkey);
                /* Keep the item in ilist; it is not released in this
                 * function on the success path. */
                *(c->ilist + i) = it;
                i++;

            } else {
                STATS_MISS(c, get, key, nkey);
                MEMCACHED_COMMAND_GET(c->sfd, key, nkey, -1, 0);
            }

            key_token++;
        }

        /*
         * If the command string hasn't been fully processed, get the next set
         * of tokens.
         */
        if(key_token->value != NULL) {
            ntokens = tokenize_command(key_token->value, tokens, MAX_TOKENS);
            key_token = tokens;
        }

    } while(key_token->value != NULL);

    /* Publish the collected items and suffixes on the connection. */
    c->icurr = c->ilist;
    c->ileft = i;
    c->suffixcurr = c->suffixlist;

    if (settings.verbose > 1) {
        settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
                                        ">%d END\n", c->sfd);
    }

    /*
        If the loop was terminated because of out-of-memory, it is not
        reliable to add END\r\n to the buffer, because it might not end
        in \r\n. So we send SERVER_ERROR instead.
    */
    if (key_token->value != NULL || add_iov(c, "END\r\n", 5) != 0
        || (IS_UDP(c->transport) && build_udp_headers(c) != 0)) {
        out_string(c, "SERVER_ERROR out of memory writing get response");
    }
    else {
        conn_set_state(c, conn_mwrite);
        c->msgcurr = 0;
    }

    return NULL;
}
4221
/*
 * Handle the command line of an ASCII storage command
 * (add/set/replace/prepend/append, or cas when handle_cas is true).
 *
 * Parses "<cmd> <key> <flags> <exptime> <bytes> [<cas>]", asks the engine
 * to allocate an item of <bytes> bytes and, on success, switches the
 * connection to conn_nread so the data block is read straight into the
 * item. On failure an error is sent and the data block that follows the
 * command line is swallowed.
 *
 * @param c          the connection the command arrived on
 * @param tokens     the tokenized command line
 * @param ntokens    number of tokens
 * @param store_op   the store operation to remember in c->store_op
 * @param handle_cas true if tokens[5] holds a CAS value that must be parsed
 */
static void process_update_command(conn *c, token_t *tokens, const size_t ntokens, ENGINE_STORE_OPERATION store_op, bool handle_cas) {
    char *key;
    size_t nkey;
    unsigned int flags;
    int32_t exptime_int = 0;
    time_t exptime;
    int vlen;
    uint64_t req_cas_id=0;
    item *it;

    assert(c != NULL);

    set_noreply_maybe(c, tokens, ntokens);

    if (tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {
        out_string(c, "CLIENT_ERROR bad command line format");
        return;
    }

    key = tokens[KEY_TOKEN].value;
    nkey = tokens[KEY_TOKEN].length;

    if (! (safe_strtoul(tokens[2].value, (uint32_t *)&flags)
           && safe_strtol(tokens[3].value, &exptime_int)
           && safe_strtol(tokens[4].value, (int32_t *)&vlen))) {
        out_string(c, "CLIENT_ERROR bad command line format");
        return;
    }

    /* Ubuntu 8.04 breaks when I pass exptime to safe_strtol */
    exptime = exptime_int;

    /* does cas value exist? */
    if (handle_cas) {
        if (!safe_strtoull(tokens[5].value, &req_cas_id)) {
            out_string(c, "CLIENT_ERROR bad command line format");
            return;
        }
    }

    /*
     * Reject negative lengths, and lengths so large that the "vlen + 2"
     * computed below (data plus trailing "\r\n") would overflow an int.
     */
    if (vlen < 0 || vlen > (INT_MAX - 2)) {
        out_string(c, "CLIENT_ERROR bad command line format");
        return;
    }

    if (settings.detail_enabled) {
        stats_prefix_record_set(key, nkey);
    }

    /* Consume the status left in c->aiostat; the engine is only called
     * when that status is ENGINE_SUCCESS. */
    ENGINE_ERROR_CODE ret = c->aiostat;
    c->aiostat = ENGINE_SUCCESS;
    c->ewouldblock = false;

    if (ret == ENGINE_SUCCESS) {
        ret = settings.engine.v1->allocate(settings.engine.v0, c,
                                           &it, key, nkey,
                                           vlen, htonl(flags), exptime);
    }

    item_info info = { .nvalue = 1 };
    switch (ret) {
    case ENGINE_SUCCESS:
        item_set_cas(c, it, req_cas_id);
        if (!settings.engine.v1->get_item_info(settings.engine.v0, c, it, &info)) {
            settings.engine.v1->release(settings.engine.v0, c, it);
            out_string(c, "SERVER_ERROR error getting item data");
            /* The client still sends the data block; swallow it so it is
             * not parsed as the next command. */
            c->write_and_go = conn_swallow;
            c->sbytes = vlen + 2;
            break;
        }
        /* Read the data block directly into the item's value buffer. */
        c->item = it;
        c->ritem = info.value[0].iov_base;
        c->rlbytes = vlen;
        c->store_op = store_op;
        conn_set_state(c, conn_nread);
        break;
    case ENGINE_EWOULDBLOCK:
        c->ewouldblock = true;
        break;
    case ENGINE_DISCONNECT:
        c->state = conn_closing;
        break;
    default:
        if (ret == ENGINE_E2BIG) {
            out_string(c, "SERVER_ERROR object too large for cache");
        } else {
            out_string(c, "SERVER_ERROR out of memory storing object");
        }
        /* swallow the data line */
        c->write_and_go = conn_swallow;
        c->sbytes = vlen + 2;

        /* Avoid stale data persisting in cache because we failed alloc.
         * Unacceptable for SET. Anywhere else too? */
        if (store_op == OPERATION_SET) {
            settings.engine.v1->remove(settings.engine.v0, c, key, nkey, 0, 0);
        }
    }
}
4319
/*
 * Handle the ASCII "incr" and "decr" commands.
 *
 * Parses "<cmd> <key> <delta> [noreply]", asks the engine to apply the
 * delta and writes the reply that matches the engine's status. An engine
 * status not listed in the switch below aborts the process.
 *
 * @param c       the connection the command arrived on
 * @param tokens  the tokenized command line
 * @param ntokens number of tokens
 * @param incr    true for incr, false for decr
 * @return the key when the engine answered ENGINE_EWOULDBLOCK
 *         (c->ewouldblock is set), NULL otherwise
 */
static char* process_arithmetic_command(conn *c, token_t *tokens, const size_t ntokens, const bool incr) {
    assert(c != NULL);

    set_noreply_maybe(c, tokens, ntokens);

    if (tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {
        out_string(c, "CLIENT_ERROR bad command line format");
        return NULL;
    }

    char *key = tokens[KEY_TOKEN].value;
    size_t nkey = tokens[KEY_TOKEN].length;

    uint64_t delta;
    if (!safe_strtoull(tokens[2].value, &delta)) {
        out_string(c, "CLIENT_ERROR invalid numeric delta argument");
        return NULL;
    }

    /* Consume the status left in c->aiostat; the engine is only called
     * when that status is ENGINE_SUCCESS. */
    ENGINE_ERROR_CODE ret = c->aiostat;
    c->aiostat = ENGINE_SUCCESS;

    uint64_t new_cas;
    uint64_t new_value;
    if (ret == ENGINE_SUCCESS) {
        ret = settings.engine.v1->arithmetic(settings.engine.v0, c, key, nkey,
                                             incr, false, delta, 0, 0, &new_cas,
                                             &new_value, 0);
    }

    /* The engine is not done yet: hand the key back to the caller. */
    if (ret == ENGINE_EWOULDBLOCK) {
        c->ewouldblock = true;
        return key;
    }

    switch (ret) {
    case ENGINE_SUCCESS: {
        char digits[INCR_MAX_STORAGE_LEN];

        if (incr) {
            STATS_INCR(c, incr_hits, key, nkey);
        } else {
            STATS_INCR(c, decr_hits, key, nkey);
        }
        snprintf(digits, sizeof(digits), "%"PRIu64, new_value);
        out_string(c, digits);
        break;
    }
    case ENGINE_KEY_ENOENT:
        if (incr) {
            STATS_INCR(c, incr_misses, key, nkey);
        } else {
            STATS_INCR(c, decr_misses, key, nkey);
        }
        out_string(c, "NOT_FOUND");
        break;
    case ENGINE_ENOMEM:
        out_string(c, "SERVER_ERROR out of memory");
        break;
    case ENGINE_TMPFAIL:
        out_string(c, "SERVER_ERROR temporary failure");
        break;
    case ENGINE_EINVAL:
        out_string(c, "CLIENT_ERROR cannot increment or decrement non-numeric value");
        break;
    case ENGINE_NOT_STORED:
        out_string(c, "SERVER_ERROR failed to store item");
        break;
    case ENGINE_DISCONNECT:
        c->state = conn_closing;
        break;
    case ENGINE_ENOTSUP:
        out_string(c, "SERVER_ERROR not supported");
        break;
    default:
        abort();
    }

    return NULL;
}
4399
/*
 * Handle the ASCII "delete" command.
 *
 * Accepted forms are "delete <key>", "delete <key> 0",
 * "delete <key> noreply" and "delete <key> 0 noreply"; any other
 * argument combination gets a CLIENT_ERROR reply.
 *
 * @param c       the connection the command arrived on
 * @param tokens  the tokenized command line
 * @param ntokens number of tokens
 * @return the key when the engine answered ENGINE_EWOULDBLOCK
 *         (c->ewouldblock is set), NULL otherwise
 */
static char *process_delete_command(conn *c, token_t *tokens,
                                    const size_t ntokens) {
    assert(c != NULL);

    if (ntokens > 3) {
        const bool zero_hold = (strcmp(tokens[KEY_TOKEN+1].value, "0") == 0);
        const bool noreply_set = set_noreply_maybe(c, tokens, ntokens);
        bool well_formed;

        if (ntokens == 4) {
            /* one extra argument: either the legacy "0" or "noreply" */
            well_formed = zero_hold || noreply_set;
        } else if (ntokens == 5) {
            /* two extra arguments: must be "0" followed by "noreply" */
            well_formed = zero_hold && noreply_set;
        } else {
            well_formed = false;
        }

        if (!well_formed) {
            out_string(c, "CLIENT_ERROR bad command line format.  "
                       "Usage: delete <key> [noreply]");
            return NULL;
        }
    }

    char *key = tokens[KEY_TOKEN].value;
    size_t nkey = tokens[KEY_TOKEN].length;

    if (nkey > KEY_MAX_LENGTH) {
        out_string(c, "CLIENT_ERROR bad command line format");
        return NULL;
    }

    /* Consume the status left in c->aiostat; the engine is only called
     * when that status is ENGINE_SUCCESS. */
    ENGINE_ERROR_CODE ret = c->aiostat;
    c->aiostat = ENGINE_SUCCESS;
    c->ewouldblock = false;
    if (ret == ENGINE_SUCCESS) {
        ret = settings.engine.v1->remove(settings.engine.v0, c,
                                         key, nkey, 0, 0);
    }

    /* For some reason the SLAB_INCR tries to access this... */
    item_info info = { .nvalue = 1 };

    if (ret == ENGINE_EWOULDBLOCK) {
        c->ewouldblock = true;
        return key;
    }

    if (ret == ENGINE_SUCCESS) {
        out_string(c, "DELETED");
        SLAB_INCR(c, delete_hits, key, nkey);
    } else if (ret == ENGINE_TMPFAIL) {
        out_string(c, "SERVER_ERROR temporary failure");
    } else {
        /* every other engine status is reported as a miss */
        out_string(c, "NOT_FOUND");
        STATS_INCR(c, delete_misses, key, nkey);
    }

    if (settings.detail_enabled) {
        stats_prefix_record_delete(key, nkey);
    }
    return NULL;
}
4458
/*
 * Handle the ASCII "verbosity <level> [noreply]" command.
 *
 * A parsable level is clamped to MAX_VERBOSITY_LEVEL, stored in
 * settings.verbose and announced through the ON_LOG_LEVEL callbacks.
 *
 * @param c       the connection the command arrived on
 * @param tokens  the tokenized command line
 * @param ntokens number of tokens
 */
static void process_verbosity_command(conn *c, token_t *tokens, const size_t ntokens) {
    unsigned int requested;

    assert(c != NULL);

    set_noreply_maybe(c, tokens, ntokens);
    if (c->noreply && ntokens == 3) {
        /* "verbosity noreply" is not according to the correct syntax */
        c->noreply = false;
        out_string(c, "ERROR");
        return;
    }

    if (!safe_strtoul(tokens[1].value, &requested)) {
        out_string(c, "ERROR");
        return;
    }

    if (requested > MAX_VERBOSITY_LEVEL) {
        requested = MAX_VERBOSITY_LEVEL;
    }
    settings.verbose = requested;
    perform_callbacks(ON_LOG_LEVEL, NULL, NULL);
    out_string(c, "OK");
}
4480
/*
 * Tokenize one ASCII command line and dispatch it to its handler.
 *
 * Built-in commands are matched by name and token count; anything else is
 * offered to the registered ASCII protocol extensions, and "ERROR" is sent
 * when there are none.
 *
 * @param c       the connection the command arrived on
 * @param command the NUL-terminated command line (modified by tokenizing)
 * @return NULL when the command was consumed; otherwise a pointer into the
 *         command buffer handed back by a handler that set c->ewouldblock,
 *         which try_read_command() uses to decide what to retry
 */
static char* process_command(conn *c, char *command) {

    token_t tokens[MAX_TOKENS];
    size_t ntokens;
    int comm = 0;
    char *ret = NULL;

    assert(c != NULL);

    MEMCACHED_PROCESS_COMMAND_START(c->sfd, c->rcurr, c->rbytes);

    if (settings.verbose > 1) {
        settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
                                        "<%d %s\n", c->sfd, command);
    }

    /*
     * for commands set/add/replace, we build an item and read the data
     * directly into it, then continue in nread_complete().
     */

    if (c->ewouldblock) {
        /*
         * If we are retrying after the engine has completed a pending io for
         * this command, skip add_msghdr() etc and clear the ewouldblock flag.
         */
        c->ewouldblock = false;
    } else {
        c->msgcurr = 0;
        c->msgused = 0;
        c->iovused = 0;
        if (add_msghdr(c) != 0) {
            out_string(c, "SERVER_ERROR out of memory preparing response");
            return NULL;
        }
    }

    ntokens = tokenize_command(command, tokens, MAX_TOKENS);
    if (ntokens >= 3 &&
        ((strcmp(tokens[COMMAND_TOKEN].value, "get") == 0) ||
         (strcmp(tokens[COMMAND_TOKEN].value, "bget") == 0))) {

        ret = process_get_command(c, tokens, ntokens, false);

    } else if ((ntokens == 6 || ntokens == 7) &&
               /* each alternative also records the matching store
                * operation in comm */
               ((strcmp(tokens[COMMAND_TOKEN].value, "add") == 0 && (comm = (int)OPERATION_ADD)) ||
                (strcmp(tokens[COMMAND_TOKEN].value, "set") == 0 && (comm = (int)OPERATION_SET)) ||
                (strcmp(tokens[COMMAND_TOKEN].value, "replace") == 0 && (comm = (int)OPERATION_REPLACE)) ||
                (strcmp(tokens[COMMAND_TOKEN].value, "prepend") == 0 && (comm = (int)OPERATION_PREPEND)) ||
                (strcmp(tokens[COMMAND_TOKEN].value, "append") == 0 && (comm = (int)OPERATION_APPEND)) )) {

        process_update_command(c, tokens, ntokens, (ENGINE_STORE_OPERATION)comm, false);

    } else if ((ntokens == 7 || ntokens == 8) && (strcmp(tokens[COMMAND_TOKEN].value, "cas") == 0 && (comm = (int)OPERATION_CAS))) {

        process_update_command(c, tokens, ntokens, (ENGINE_STORE_OPERATION)comm, true);

    } else if ((ntokens == 4 || ntokens == 5) && (strcmp(tokens[COMMAND_TOKEN].value, "incr") == 0)) {

        ret = process_arithmetic_command(c, tokens, ntokens, 1);

    } else if (ntokens >= 3 && (strcmp(tokens[COMMAND_TOKEN].value, "gets") == 0)) {

        ret = process_get_command(c, tokens, ntokens, true);

    } else if ((ntokens == 4 || ntokens == 5) && (strcmp(tokens[COMMAND_TOKEN].value, "decr") == 0)) {

        ret = process_arithmetic_command(c, tokens, ntokens, 0);

    } else if (ntokens >= 3 && ntokens <= 5 && (strcmp(tokens[COMMAND_TOKEN].value, "delete") == 0)) {

        ret = process_delete_command(c, tokens, ntokens);

    } else if (ntokens >= 2 && (strcmp(tokens[COMMAND_TOKEN].value, "stats") == 0)) {

        ret = process_stat(c, tokens, ntokens);

    } else if (ntokens >= 2 && ntokens <= 4 && (strcmp(tokens[COMMAND_TOKEN].value, "flush_all") == 0)) {
        time_t exptime;

        set_noreply_maybe(c, tokens, ntokens);

        if (ntokens == (c->noreply ? 3 : 2)) {
            /* no explicit delay given */
            exptime = 0;
        } else {
            /*
             * strtol() only sets errno on failure, so clear it first;
             * otherwise a stale ERANGE from an earlier call would reject
             * a perfectly valid delay.
             */
            errno = 0;
            exptime = strtol(tokens[1].value, NULL, 10);
            if(errno == ERANGE) {
                out_string(c, "CLIENT_ERROR bad command line format");
                return NULL;
            }
        }

        /* Consume the status left in c->aiostat; the engine is only
         * called when that status is ENGINE_SUCCESS. */
        ENGINE_ERROR_CODE flush_ret = c->aiostat;
        c->aiostat = ENGINE_SUCCESS;
        c->ewouldblock = false;
        if (flush_ret == ENGINE_SUCCESS) {
            flush_ret = settings.engine.v1->flush(settings.engine.v0, c, exptime);
        }

        switch (flush_ret) {
        case  ENGINE_SUCCESS:
            out_string(c, "OK");
            break;
        case ENGINE_ENOTSUP:
            out_string(c, "SERVER_ERROR not supported");
            break;
        case ENGINE_EWOULDBLOCK:
            c->ewouldblock = true;
            /* 9 is the length of "flush_all"; presumably this makes
             * try_read_command() retry the whole command -- confirm. */
            return c->rcurr + 9;
        default:
            out_string(c, "SERVER_ERROR failed to flush cache");
        }

        if (flush_ret != ENGINE_EWOULDBLOCK) {
            STATS_NOKEY(c, cmd_flush);
        }
        return NULL;

    } else if (ntokens == 2 && (strcmp(tokens[COMMAND_TOKEN].value, "version") == 0)) {

        out_string(c, "VERSION " VERSION);

    } else if (ntokens == 2 && (strcmp(tokens[COMMAND_TOKEN].value, "quit") == 0)) {

        conn_set_state(c, conn_closing);

    } else if ((ntokens == 3 || ntokens == 4) && (strcmp(tokens[COMMAND_TOKEN].value, "verbosity") == 0)) {
        process_verbosity_command(c, tokens, ntokens);
    } else if (settings.extensions.ascii != NULL) {
        EXTENSION_ASCII_PROTOCOL_DESCRIPTOR *cmd;
        size_t nbytes = 0;
        char *ptr = NULL;

        if (ntokens > 0) {
            if (ntokens == MAX_TOKENS) {
                out_string(c, "ERROR too many arguments");
                return NULL;
            }

            /* do not hand a zero-length last token to the extension */
            if (tokens[ntokens - 1].length == 0) {
                --ntokens;
            }
        }

        /* Find the first extension that accepts this command line. */
        for (cmd = settings.extensions.ascii; cmd != NULL; cmd = cmd->next) {
            if (cmd->accept(cmd->cookie, c, ntokens, tokens, &nbytes, &ptr)) {
                break;
            }
        }

        if (cmd == NULL) {
            out_string(c, "ERROR unknown command");
        } else if (nbytes == 0) {
            /* The extension needs no data block: run it right away. */
            switch (cmd->execute(cmd->cookie, c, ntokens, tokens,
                                 ascii_response_handler)) {
            case ENGINE_SUCCESS:
                if (c->dynamic_buffer.buffer != NULL) {
                    write_and_free(c, c->dynamic_buffer.buffer,
                                   c->dynamic_buffer.offset);
                    c->dynamic_buffer.buffer = NULL;
                } else {
                    conn_set_state(c, conn_new_cmd);
                }
                break;
            case ENGINE_EWOULDBLOCK:
                c->ewouldblock = true;
                ret = tokens[KEY_TOKEN].value;
                break;
            case ENGINE_DISCONNECT:
            default:
                conn_set_state(c, conn_closing);

            }
        } else {
            /* The extension asked for nbytes of data to be read into ptr
             * before it is executed. */
            c->rlbytes = nbytes;
            c->ritem = ptr;
            c->ascii_cmd = cmd;
            /* NOT SUPPORTED YET! */
            conn_set_state(c, conn_nread);
        }
    } else {
        out_string(c, "ERROR");
    }
    return ret;
}
4666
4667 /*
4668 * if we have a complete line in the buffer, process it.
4669 */
try_read_command(conn * c)4670 static int try_read_command(conn *c) {
4671 assert(c != NULL);
4672 assert(c->rcurr <= (c->rbuf + c->rsize));
4673 assert(c->rbytes > 0);
4674
4675 if (c->protocol == negotiating_prot || c->transport == udp_transport) {
4676 if ((unsigned char)c->rbuf[0] == (unsigned char)PROTOCOL_BINARY_REQ) {
4677 c->protocol = binary_prot;
4678 } else {
4679 c->protocol = ascii_prot;
4680 }
4681
4682 if (settings.verbose > 1) {
4683 settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
4684 "%d: Client using the %s protocol\n", c->sfd,
4685 prot_text(c->protocol));
4686 }
4687 }
4688
4689 if (c->protocol == binary_prot) {
4690 /* Do we have the complete packet header? */
4691 if (c->rbytes < sizeof(c->binary_header)) {
4692 /* need more data! */
4693 return 0;
4694 } else {
4695 #ifdef NEED_ALIGN
4696 if (((long)(c->rcurr)) % 8 != 0) {
4697 /* must realign input buffer */
4698 memmove(c->rbuf, c->rcurr, c->rbytes);
4699 c->rcurr = c->rbuf;
4700 if (settings.verbose > 1) {
4701 settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
4702 "%d: Realign input buffer\n", c->sfd);
4703 }
4704 }
4705 #endif
4706 protocol_binary_request_header* req;
4707 req = (protocol_binary_request_header*)c->rcurr;
4708
4709 if (settings.verbose > 1) {
4710 /* Dump the packet before we convert it to host order */
4711 char buffer[1024];
4712 ssize_t nw;
4713 nw = bytes_to_output_string(buffer, sizeof(buffer), c->sfd,
4714 true, "Read binary protocol data:",
4715 (const char*)req->bytes,
4716 sizeof(req->bytes));
4717 if (nw != -1) {
4718 settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
4719 "%s", buffer);
4720 }
4721 }
4722
4723 c->binary_header = *req;
4724 c->binary_header.request.keylen = ntohs(req->request.keylen);
4725 c->binary_header.request.bodylen = ntohl(req->request.bodylen);
4726 c->binary_header.request.vbucket = ntohs(req->request.vbucket);
4727 c->binary_header.request.cas = ntohll(req->request.cas);
4728
4729
4730 if (c->binary_header.request.magic != PROTOCOL_BINARY_REQ &&
4731 !(c->binary_header.request.magic == PROTOCOL_BINARY_RES &&
4732 response_handlers[c->binary_header.request.opcode])) {
4733 if (settings.verbose) {
4734 if (c->binary_header.request.magic != PROTOCOL_BINARY_RES) {
4735 settings.extensions.logger->log(EXTENSION_LOG_INFO, c,
4736 "%d: Invalid magic: %x\n", c->sfd,
4737 c->binary_header.request.magic);
4738 } else {
4739 settings.extensions.logger->log(EXTENSION_LOG_INFO, c,
4740 "%d: ERROR: Unsupported response packet received: %u\n",
4741 c->sfd, (unsigned int)c->binary_header.request.opcode);
4742
4743 }
4744 }
4745 conn_set_state(c, conn_closing);
4746 return -1;
4747 }
4748
4749 c->msgcurr = 0;
4750 c->msgused = 0;
4751 c->iovused = 0;
4752 if (add_msghdr(c) != 0) {
4753 out_string(c, "SERVER_ERROR out of memory");
4754 return 0;
4755 }
4756
4757 c->cmd = c->binary_header.request.opcode;
4758 c->keylen = c->binary_header.request.keylen;
4759 c->opaque = c->binary_header.request.opaque;
4760 /* clear the returned cas value */
4761 c->cas = 0;
4762
4763 dispatch_bin_command(c);
4764
4765 c->rbytes -= sizeof(c->binary_header);
4766 c->rcurr += sizeof(c->binary_header);
4767 }
4768 } else {
4769 char *el, *cont, *left, lb;
4770
4771 if (c->rbytes == 0) {
4772 return 0;
4773 }
4774
4775 el = memchr(c->rcurr, '\n', c->rbytes);
4776 if (!el) {
4777 if (c->rbytes > 1024) {
4778 /*
4779 * We didn't have a '\n' in the first k. This _has_ to be a
4780 * large multiget, if not we should just nuke the connection.
4781 */
4782 char *ptr = c->rcurr;
4783 while (*ptr == ' ') { /* ignore leading whitespaces */
4784 ++ptr;
4785 }
4786
4787 if (ptr - c->rcurr > 100 ||
4788 (strncmp(ptr, "get ", 4) && strncmp(ptr, "gets ", 5))) {
4789
4790 conn_set_state(c, conn_closing);
4791 return 1;
4792 }
4793 }
4794
4795 return 0;
4796 }
4797 cont = el + 1;
4798 if ((el - c->rcurr) > 1 && *(el - 1) == '\r') {
4799 el--;
4800 }
4801 lb = *el;
4802 *el = '\0';
4803
4804 assert(cont <= (c->rcurr + c->rbytes));
4805
4806 LIBEVENT_THREAD *thread = c->thread;
4807 LOCK_THREAD(thread);
4808 left = process_command(c, c->rcurr);
4809 if (c->ewouldblock) {
4810 unregister_event(c);
4811 }
4812 UNLOCK_THREAD(thread);
4813
4814 if (left != NULL) {
4815 /*
4816 * We have not processed the entire command. This happens
4817 * when the engine returns ENGINE_EWOULDBLOCK for one of the
4818 * keys in a get/gets request.
4819 */
4820 assert (left <= el);
4821
4822 int count = strlen(c->rcurr);
4823 if ((c->rcurr + count) == left) {
4824 // Retry the entire command
4825 cont = c->rcurr;
4826 } else {
4827 left -= (count + 1);
4828 cont = left;
4829 assert(cont >= c->rcurr);
4830 if (cont > c->rcurr) {
4831 memmove(cont, c->rcurr, count);
4832 }
4833 }
4834
4835 /* de-tokenize the command */
4836 while ((left = memchr(left, '\0', el - left)) != NULL) {
4837 *left = ' ';
4838 }
4839 *el = lb;
4840 }
4841
4842 c->rbytes -= (cont - c->rcurr);
4843 c->rcurr = cont;
4844
4845 assert(c->rcurr <= (c->rbuf + c->rsize));
4846 }
4847
4848 return 1;
4849 }
4850
4851 /*
4852 * read a UDP request.
4853 */
try_read_udp(conn * c)4854 static enum try_read_result try_read_udp(conn *c) {
4855 int res;
4856
4857 assert(c != NULL);
4858
4859 c->request_addr_size = sizeof(c->request_addr);
4860 res = recvfrom(c->sfd, c->rbuf, c->rsize,
4861 0, (struct sockaddr *)&c->request_addr, &c->request_addr_size);
4862 if (res > 8) {
4863 unsigned char *buf = (unsigned char *)c->rbuf;
4864 STATS_ADD(c, bytes_read, res);
4865
4866 /* Beginning of UDP packet is the request ID; save it. */
4867 c->request_id = buf[0] * 256 + buf[1];
4868
4869 /* If this is a multi-packet request, drop it. */
4870 if (buf[4] != 0 || buf[5] != 1) {
4871 out_string(c, "SERVER_ERROR multi-packet request not supported");
4872 return READ_NO_DATA_RECEIVED;
4873 }
4874
4875 /* Don't care about any of the rest of the header. */
4876 res -= 8;
4877 memmove(c->rbuf, c->rbuf + 8, res);
4878
4879 c->rbytes += res;
4880 c->rcurr = c->rbuf;
4881 return READ_DATA_RECEIVED;
4882 }
4883 return READ_NO_DATA_RECEIVED;
4884 }
4885
4886 /*
4887 * read from network as much as we can, handle buffer overflow and connection
4888 * close.
4889 * before reading, move the remaining incomplete fragment of a command
4890 * (if any) to the beginning of the buffer.
4891 *
4892 * To protect us from someone flooding a connection with bogus data causing
4893 * the connection to eat up all available memory, break out and start looking
4894 * at the data I've got after a number of reallocs...
4895 *
4896 * @return enum try_read_result
4897 */
try_read_network(conn * c)4898 static enum try_read_result try_read_network(conn *c) {
4899 enum try_read_result gotdata = READ_NO_DATA_RECEIVED;
4900 int res;
4901 int num_allocs = 0;
4902 assert(c != NULL);
4903
4904 if (c->rcurr != c->rbuf) {
4905 if (c->rbytes != 0) /* otherwise there's nothing to copy */
4906 memmove(c->rbuf, c->rcurr, c->rbytes);
4907 c->rcurr = c->rbuf;
4908 }
4909
4910 while (1) {
4911 if (c->rbytes >= c->rsize) {
4912 if (num_allocs == 4) {
4913 return gotdata;
4914 }
4915 ++num_allocs;
4916 char *new_rbuf = realloc(c->rbuf, c->rsize * 2);
4917 if (!new_rbuf) {
4918 if (settings.verbose > 0) {
4919 settings.extensions.logger->log(EXTENSION_LOG_INFO, c,
4920 "Couldn't realloc input buffer\n");
4921 }
4922 c->rbytes = 0; /* ignore what we read */
4923 out_string(c, "SERVER_ERROR out of memory reading request");
4924 c->write_and_go = conn_closing;
4925 return READ_MEMORY_ERROR;
4926 }
4927 c->rcurr = c->rbuf = new_rbuf;
4928 c->rsize *= 2;
4929 }
4930
4931 int avail = c->rsize - c->rbytes;
4932 res = recv(c->sfd, c->rbuf + c->rbytes, avail, 0);
4933 if (res > 0) {
4934 STATS_ADD(c, bytes_read, res);
4935 gotdata = READ_DATA_RECEIVED;
4936 c->rbytes += res;
4937 if (res == avail) {
4938 continue;
4939 } else {
4940 break;
4941 }
4942 }
4943 if (res == 0) {
4944 return READ_ERROR;
4945 }
4946 if (res == -1) {
4947 if (errno == EAGAIN || errno == EWOULDBLOCK) {
4948 break;
4949 }
4950 return READ_ERROR;
4951 }
4952 }
4953 return gotdata;
4954 }
4955
/*
 * Add the connection's event to libevent and mark the connection as
 * registered.
 *
 * @param c       the connection (must not be registered already)
 * @param timeout timeout handed to event_add(), may be NULL
 * @return true on success, false if event_add() failed
 */
bool register_event(conn *c, struct timeval *timeout) {
    assert(!c->registered_in_libevent);

    if (event_add(&c->event, timeout) != -1) {
        c->registered_in_libevent = true;
        return true;
    }

    settings.extensions.logger->log(EXTENSION_LOG_WARNING,
                                    NULL,
                                    "Failed to add connection to libevent: %s",
                                    strerror(errno));
    return false;
}
4971
/*
 * Remove the connection's event from libevent and clear the
 * connection's registered flag.
 *
 * @param c the connection (must currently be registered)
 * @return true on success, false if event_del() failed (the flag is then
 *         left untouched)
 */
bool unregister_event(conn *c) {
    assert(c->registered_in_libevent);

    const bool removed = (event_del(&c->event) != -1);
    if (removed) {
        c->registered_in_libevent = false;
    }

    return removed;
}
4983
4984
/*
 * Change the set of libevent flags the connection is waiting for.
 *
 * Nothing is done when the flags are already in effect. Otherwise the
 * event is unregistered, re-initialized with the new flags on the same
 * event base, and registered again.
 *
 * @param c         the connection to update
 * @param new_flags the libevent flags to wait for
 * @return true if the flags were already set or the event was registered
 *         again, false if unregistering or registering failed
 */
bool update_event(conn *c, const int new_flags) {
    assert(c != NULL);

    /* Remember the base before the event is re-initialized below. */
    struct event_base *saved_base = c->event.ev_base;

    if (c->ev_flags == new_flags) {
        return true;
    }

    const char *wants_read = (new_flags & EV_READ) ? "yes" : "no";
    const char *wants_write = (new_flags & EV_WRITE) ? "yes" : "no";
    settings.extensions.logger->log(EXTENSION_LOG_DEBUG, NULL,
                                    "Updated event for %d to read=%s, write=%s\n",
                                    c->sfd, wants_read, wants_write);

    if (!unregister_event(c)) {
        return false;
    }

    event_set(&c->event, c->sfd, new_flags, event_handler, (void *)c);
    event_base_set(saved_base, &c->event);
    c->ev_flags = new_flags;

    return register_event(c, NULL);
}
5007
5008 /*
5009 * Transmit the next chunk of data from our list of msgbuf structures.
5010 *
5011 * Returns:
5012 * TRANSMIT_COMPLETE All done writing.
5013 * TRANSMIT_INCOMPLETE More data remaining to write.
5014 * TRANSMIT_SOFT_ERROR Can't write any more right now.
5015 * TRANSMIT_HARD_ERROR Can't write (c->state is set to conn_closing)
5016 */
transmit(conn * c)5017 static enum transmit_result transmit(conn *c) {
5018 assert(c != NULL);
5019
5020 if (c->msgcurr < c->msgused &&
5021 c->msglist[c->msgcurr].msg_iovlen == 0) {
5022 /* Finished writing the current msg; advance to the next. */
5023 c->msgcurr++;
5024 }
5025 if (c->msgcurr < c->msgused) {
5026 ssize_t res;
5027 struct msghdr *m = &c->msglist[c->msgcurr];
5028
5029 res = sendmsg(c->sfd, m, 0);
5030 if (res > 0) {
5031 STATS_ADD(c, bytes_written, res);
5032
5033 /* We've written some of the data. Remove the completed
5034 iovec entries from the list of pending writes. */
5035 while (m->msg_iovlen > 0 && res >= m->msg_iov->iov_len) {
5036 res -= m->msg_iov->iov_len;
5037 m->msg_iovlen--;
5038 m->msg_iov++;
5039 }
5040
5041 /* Might have written just part of the last iovec entry;
5042 adjust it so the next write will do the rest. */
5043 if (res > 0) {
5044 m->msg_iov->iov_base = (caddr_t)m->msg_iov->iov_base + res;
5045 m->msg_iov->iov_len -= res;
5046 }
5047 return TRANSMIT_INCOMPLETE;
5048 }
5049 if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
5050 if (!update_event(c, EV_WRITE | EV_PERSIST)) {
5051 if (settings.verbose > 0) {
5052 settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
5053 "Couldn't update event\n");
5054 }
5055 conn_set_state(c, conn_closing);
5056 return TRANSMIT_HARD_ERROR;
5057 }
5058 return TRANSMIT_SOFT_ERROR;
5059 }
5060 /* if res == 0 or res == -1 and error is not EAGAIN or EWOULDBLOCK,
5061 we have a real error, on which we close the connection */
5062 if (settings.verbose > 0) {
5063 settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
5064 "Failed to write, and not due to blocking: %s",
5065 strerror(errno));
5066 }
5067
5068 if (IS_UDP(c->transport))
5069 conn_set_state(c, conn_read);
5070 else
5071 conn_set_state(c, conn_closing);
5072 return TRANSMIT_HARD_ERROR;
5073 } else {
5074 return TRANSMIT_COMPLETE;
5075 }
5076 }
5077
/*
 * State handler for a listening connection: accept one pending client and
 * hand it to dispatch_conn_new().
 *
 * The accepted socket is closed again when the connection count has
 * reached settings.maxconns or when it cannot be made non-blocking.
 *
 * @param c the listening connection
 * @return always false
 */
bool conn_listening(conn *c)
{
    struct sockaddr_storage peer;
    socklen_t peer_len = sizeof(peer);
    int client_fd = accept(c->sfd, (struct sockaddr *)&peer, &peer_len);

    if (client_fd == -1) {
        if (errno == EMFILE) {
            /* Out of file descriptors: stop listening for now. */
            if (settings.verbose > 0) {
                settings.extensions.logger->log(EXTENSION_LOG_INFO, c,
                                                "Too many open connections\n");
            }
            disable_listen();
        } else if (!(errno == EAGAIN || errno == EWOULDBLOCK)) {
            settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
                                            "Failed to accept new client: %s\n",
                                            strerror(errno));
        }

        return false;
    }

    /* Count the new connection before checking the limit. */
    STATS_LOCK();
    int curr_conns = ++stats.curr_conns;
    STATS_UNLOCK();

    if (curr_conns >= settings.maxconns) {
        STATS_LOCK();
        ++stats.rejected_conns;
        STATS_UNLOCK();

        if (settings.verbose > 0) {
            settings.extensions.logger->log(EXTENSION_LOG_INFO, c,
                                            "Too many open connections\n");
        }

        safe_close(client_fd);
        return false;
    }

    if (evutil_make_socket_nonblocking(client_fd) == -1) {
        safe_close(client_fd);
        return false;
    }

    dispatch_conn_new(client_fd, conn_new_cmd, EV_READ | EV_PERSIST,
                      DATA_BUFFER_SIZE, tcp_transport);

    return false;
}
5128
/**
 * Ship tap log to the other end. This state differs from all other states
 * in that it supports a full duplex dialog. We're listening to both read
 * and write events from libevent most of the time. If a read event occurs we
 * switch to the conn_read state to read and execute the input message (that would
 * be an ack message from the other side). If a write event occurs we continue to
 * send tap log to the other end.
 * @param c the tap connection to drive
 * @return true if we should continue to process work for this connection, false
 *         if we should start processing events for other connections.
 */
bool conn_ship_log(conn *c) {
    bool cont = false;

    /* Nothing to do when the connection no longer has a valid socket. */
    if (c->sfd == INVALID_SOCKET) {
        return false;
    }

    /* By default we stay registered for both read and write events. */
    short mask = EV_READ | EV_PERSIST | EV_WRITE;

    if (c->which & EV_READ || c->rbytes > 0) {
        if (c->rbytes > 0) {
            /* There is already buffered input; only move to conn_read
               when try_read_command() returns 0. */
            if (try_read_command(c) == 0) {
                conn_set_state(c, conn_read);
            }
        } else {
            conn_set_state(c, conn_read);
        }

        // we're going to process something.. let's proceed
        cont = true;

        // We have a finite number of messages in the input queue,
        // so let's process all of them instead of backing off after
        // reading a subset of them.
        // Why? Because every time we call ship_tap_log we try to send
        // a chunk of items. This means that if we end up in a situation
        // where we're receiving a burst of nack messages we would only
        // process a subset of the messages in our input queue, and it
        // would slowly grow.
        c->nevents = settings.reqs_per_tap_event;
    } else if (c->which & EV_WRITE) {
        /* Spend one unit of the per-event budget on each write attempt. */
        --c->nevents;
        if (c->nevents >= 0) {
            LOCK_THREAD(c->thread);
            c->ewouldblock = false;
            ship_tap_log(c);
            if (c->ewouldblock) {
                /* ship_tap_log flagged ewouldblock: drop EV_WRITE from
                   the event mask and stop looping. */
                mask = EV_READ | EV_PERSIST;
            } else {
                cont = true;
            }
            UNLOCK_THREAD(c->thread);
        }
    }

    /* Re-register with the chosen mask; on failure close the connection. */
    if (!update_event(c, mask)) {
        if (settings.verbose > 0) {
            settings.extensions.logger->log(EXTENSION_LOG_INFO,
                                            c, "Couldn't update event\n");
        }
        conn_set_state(c, conn_closing);
    }

    return cont;
}
5195
/**
 * State handler: register for read events and move on to conn_read.
 *
 * @param c the connection to drive
 * @return false once the connection is waiting for read events; true if
 *         the event could not be updated and the connection was moved to
 *         conn_closing
 */
bool conn_waiting(conn *c) {
    if (update_event(c, EV_READ | EV_PERSIST)) {
        conn_set_state(c, conn_read);
        return false;
    }

    /* Registration failed: close the connection. */
    if (settings.verbose > 0) {
        settings.extensions.logger->log(EXTENSION_LOG_INFO, c,
                                        "Couldn't update event\n");
    }
    conn_set_state(c, conn_closing);
    return true;
}
5208
/**
 * State handler: read from the socket (UDP or stream) and pick the next
 * state from the outcome.
 *
 * @param c the connection to drive
 * @return always true
 */
bool conn_read(conn *c) {
    int res = IS_UDP(c->transport) ? try_read_udp(c) : try_read_network(c);

    if (res == READ_NO_DATA_RECEIVED) {
        conn_set_state(c, conn_waiting);
    } else if (res == READ_DATA_RECEIVED) {
        conn_set_state(c, conn_parse_cmd);
    } else if (res == READ_ERROR) {
        conn_set_state(c, conn_closing);
    }
    /* READ_MEMORY_ERROR (failed to allocate more memory): the state has
       already been set by try_read_network, so nothing to do here. */

    return true;
}
5228
/**
 * State handler: try to parse a command out of the data read so far.
 *
 * @param c the connection to drive
 * @return true unless the connection's ewouldblock flag is set
 */
bool conn_parse_cmd(conn *c) {
    const bool need_more_data = (try_read_command(c) == 0);

    if (need_more_data) {
        /* Not enough input for a complete command yet. */
        conn_set_state(c, conn_waiting);
    }

    return !c->ewouldblock;
}
5237
/**
 * State handler: start processing the next command, or yield when this
 * connection has used up its per-event request budget (c->nevents).
 *
 * @param c the connection to drive
 * @return true to keep driving this connection, false to yield
 */
bool conn_new_cmd(conn *c) {
    /* Only process nreqs at a time to avoid starving other connections */
    --c->nevents;
    if (c->nevents >= 0) {
        reset_cmd_handler(c);
        return true;
    }

    /* Budget exhausted: yield. */
    STATS_NOKEY(c, conn_yields);

    /*
     * If data has already been read into the input buffer, libevent will
     * most likely not signal a read event on the socket (unless more data
     * becomes available). As a hack, request a write event instead, because
     * writing should be possible.
     */
    if (c->rbytes > 0 && !update_event(c, EV_WRITE | EV_PERSIST)) {
        if (settings.verbose > 0) {
            settings.extensions.logger->log(EXTENSION_LOG_INFO,
                                            c, "Couldn't update event\n");
        }
        conn_set_state(c, conn_closing);
        return true;
    }

    return false;
}
5266
5267
/**
 * State handler: read and discard c->sbytes bytes of input, first from the
 * leftovers in the read buffer and then straight from the socket.
 *
 * @param c the connection to drive
 * @return true to keep driving this connection; false when the socket
 *         would block and we are now waiting for a read event
 */
bool conn_swallow(conn *c) {
    ssize_t nread;

    /* Nothing left to throw away: go handle the next command. */
    if (c->sbytes == 0) {
        conn_set_state(c, conn_new_cmd);
        return true;
    }

    /* Consume whatever is still sitting in the read buffer first. */
    if (c->rbytes > 0) {
        uint32_t skip = c->sbytes < c->rbytes ? c->sbytes : c->rbytes;
        c->rcurr += skip;
        c->rbytes -= skip;
        c->sbytes -= skip;
        return true;
    }

    /* Otherwise pull data off the socket, at most one buffer's worth. */
    nread = recv(c->sfd, c->rbuf,
                 c->sbytes < c->rsize ? c->sbytes : c->rsize, 0);

    if (nread > 0) {
        STATS_ADD(c, bytes_read, nread);
        c->sbytes -= nread;
        return true;
    }

    if (nread == 0) {
        /* end of stream */
        conn_set_state(c, conn_closing);
        return true;
    }

    if (nread == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
        if (update_event(c, EV_READ | EV_PERSIST)) {
            return false;
        }
        if (settings.verbose > 0) {
            settings.extensions.logger->log(EXTENSION_LOG_INFO, c,
                                            "Couldn't update event\n");
        }
        conn_set_state(c, conn_closing);
        return true;
    }

    /* A real error; log it unless it is ENOTCONN or ECONNRESET. */
    if (!(errno == ENOTCONN || errno == ECONNRESET)) {
        settings.extensions.logger->log(EXTENSION_LOG_INFO, c,
                                        "Failed to read, and not due to blocking (%s)\n",
                                        strerror(errno));
    }

    conn_set_state(c, conn_closing);
    return true;
}
5320
/**
 * State handler: read the remaining c->rlbytes bytes of an item's data
 * into c->ritem, first from the leftovers in the read buffer and then
 * directly from the socket. When nothing is left to read, complete_nread()
 * is invoked.
 *
 * @param c the connection to drive
 * @return true to keep driving this connection; false when the connection
 *         is blocked (ewouldblock) or we are waiting for a read event
 */
bool conn_nread(conn *c) {
    ssize_t res;

    if (c->rlbytes == 0) {
        LIBEVENT_THREAD *t = c->thread;
        LOCK_THREAD(t);
        bool block = c->ewouldblock = false;
        complete_nread(c);
        UNLOCK_THREAD(t);
        /* Breaking this into two, as complete_nread may have
           moved us to a different thread */
        t = c->thread;
        LOCK_THREAD(t);
        if (c->ewouldblock) {
            unregister_event(c);
            block = true;
        }
        UNLOCK_THREAD(t);
        return !block;
    }

    /* first check if we have leftovers in the conn_read buffer */
    if (c->rbytes > 0) {
        uint32_t tocopy = c->rbytes > c->rlbytes ? c->rlbytes : c->rbytes;
        if (c->ritem != c->rcurr) {
            /* memmove because both pointers may refer to the same buffer */
            memmove(c->ritem, c->rcurr, tocopy);
        }
        c->ritem += tocopy;
        c->rlbytes -= tocopy;
        c->rcurr += tocopy;
        c->rbytes -= tocopy;
        if (c->rlbytes == 0) {
            return true;
        }
    }

    /* now try reading from the socket */
    res = recv(c->sfd, c->ritem, c->rlbytes, 0);
    if (res > 0) {
        STATS_ADD(c, bytes_read, res);
        if (c->rcurr == c->ritem) {
            c->rcurr += res;
        }
        c->ritem += res;
        c->rlbytes -= res;
        return true;
    }
    if (res == 0) { /* end of stream */
        conn_set_state(c, conn_closing);
        return true;
    }

    if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
        if (!update_event(c, EV_READ | EV_PERSIST)) {
            if (settings.verbose > 0) {
                settings.extensions.logger->log(EXTENSION_LOG_INFO, c,
                                                "Couldn't update event\n");
            }
            conn_set_state(c, conn_closing);
            return true;
        }
        return false;
    }

    if (errno != ENOTCONN && errno != ECONNRESET) {
        /* otherwise we have a real error, on which we close the connection.
           Pointers are printed with %p: casting them to long would truncate
           on platforms where long is narrower than a pointer. */
        settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
                                        "Failed to read, and not due to blocking:\n"
                                        "errno: %d %s \n"
                                        "rcurr=%p ritem=%p rbuf=%p rlbytes=%d rsize=%d\n",
                                        errno, strerror(errno),
                                        (void *)c->rcurr, (void *)c->ritem, (void *)c->rbuf,
                                        (int)c->rlbytes, (int)c->rsize);
    }
    conn_set_state(c, conn_closing);
    return true;
}
5397
/**
 * State handler: write out a simple response. If it has not been done
 * already, the response is first assembled into a msgbuf list (a
 * single-entry list for TCP or a two-entry list for UDP), and then the
 * actual transmission is delegated to conn_mwrite().
 *
 * @param c the connection to drive
 * @return true if the response could not be built (the connection is then
 *         moved to conn_closing); otherwise whatever conn_mwrite() returns
 */
bool conn_write(conn *c) {
    const bool response_not_queued =
        (c->iovused == 0) || (IS_UDP(c->transport) && c->iovused == 1);

    if (response_not_queued && add_iov(c, c->wcurr, c->wbytes) != 0) {
        if (settings.verbose > 0) {
            settings.extensions.logger->log(EXTENSION_LOG_INFO, c,
                                            "Couldn't build response\n");
        }
        conn_set_state(c, conn_closing);
        return true;
    }

    return conn_mwrite(c);
}
5417
/**
 * State handler: transmit the queued message list. On completion the items
 * and suffixes referenced by the response are released and the connection
 * moves on to its next state.
 *
 * @param c the connection to drive
 * @return false when transmit() reports TRANSMIT_SOFT_ERROR; true in every
 *         other case (including when the connection is moved to
 *         conn_closing)
 */
bool conn_mwrite(conn *c) {
    if (IS_UDP(c->transport) && c->msgcurr == 0 && build_udp_headers(c) != 0) {
        if (settings.verbose > 0) {
            settings.extensions.logger->log(EXTENSION_LOG_INFO, c,
                                            "Failed to build UDP headers\n");
        }
        conn_set_state(c, conn_closing);
        return true;
    }

    switch (transmit(c)) {
    case TRANSMIT_COMPLETE:
        if (c->state == conn_mwrite) {
            /* Hand every item referenced by the response back to the engine. */
            while (c->ileft > 0) {
                item *it = *(c->icurr);
                settings.engine.v1->release(settings.engine.v0, c, it);
                c->icurr++;
                c->ileft--;
            }
            /* Return the suffix buffers to the thread's suffix cache. */
            while (c->suffixleft > 0) {
                char *suffix = *(c->suffixcurr);
                cache_free(c->thread->suffix_cache, suffix);
                c->suffixcurr++;
                c->suffixleft--;
            }
            /* XXX: I don't know why this wasn't the general case */
            if (c->protocol == binary_prot) {
                conn_set_state(c, c->write_and_go);
            } else {
                conn_set_state(c, conn_new_cmd);
            }
        } else if (c->state == conn_write) {
            if (c->write_and_free) {
                free(c->write_and_free);
                c->write_and_free = 0;
            }
            conn_set_state(c, c->write_and_go);
        } else {
            if (settings.verbose > 0) {
                /* c->state is a function pointer, so it must not be passed
                   to %d; print its textual name instead. */
                settings.extensions.logger->log(EXTENSION_LOG_INFO, c,
                                                "Unexpected state %s\n",
                                                state_text(c->state));
            }
            conn_set_state(c, conn_closing);
        }
        break;

    case TRANSMIT_INCOMPLETE:
    case TRANSMIT_HARD_ERROR:
        break;                   /* Continue in state machine. */

    case TRANSMIT_SOFT_ERROR:
        return false;
    }

    return true;
}
5474
/**
 * State handler for a connection whose socket is already closed but which
 * cannot be released yet. The connection is removed from its thread's
 * pending_io list, put on the pending_close list (unless it is already
 * there), and the ON_DISCONNECT callbacks are run.
 *
 * @param c the connection to drive (its sfd must be INVALID_SOCKET)
 * @return true if the state changed while running the callbacks, false if
 *         the connection is still in conn_pending_close
 */
bool conn_pending_close(conn *c) {
    assert(c->sfd == INVALID_SOCKET);
    settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
                                    "Awaiting clients to release the cookie (pending close for %p)",
                                    (void*)c);
    /* The thread's connection lists are only touched while holding its lock. */
    LOCK_THREAD(c->thread);
    c->thread->pending_io = list_remove(c->thread->pending_io, c);
    if (!list_contains(c->thread->pending_close, c)) {
        enlist_conn(c, &c->thread->pending_close);
    }
    UNLOCK_THREAD(c->thread);

    /*
     * tell the tap connection that we're disconnecting it now,
     * but give it a grace period
     */
    perform_callbacks(ON_DISCONNECT, NULL, c);

    /*
     * disconnect callback may have changed the state for the object
     * so we might complete the disconnect now
     */
    return c->state != conn_pending_close;
}
5499
/**
 * State handler: run the ON_DISCONNECT callbacks and close the connection
 * right away.
 *
 * @param c the connection to close
 * @return always false
 */
bool conn_immediate_close(conn *c) {
    settings.extensions.logger->log(EXTENSION_LOG_DETAIL, c,
                                    "Immediate close of %p",
                                    (void*)c);
    perform_callbacks(ON_DISCONNECT, NULL, c);
    conn_close(c);

    return false;
}
5509
/**
 * State handler: begin closing a connection. UDP connections are only
 * cleaned up. For all others the event is unregistered, the socket is
 * closed and the next state is chosen from the reference count.
 *
 * @param c the connection to close
 * @return false for UDP connections, true otherwise
 */
bool conn_closing(conn *c) {
    if (IS_UDP(c->transport)) {
        conn_cleanup(c);
        return false;
    }

    /* No more network notifications are wanted for this connection. */
    unregister_event(c);
    safe_close(c->sfd);
    c->sfd = INVALID_SOCKET;

    /* More than one reference: wait in conn_pending_close instead. */
    conn_set_state(c, c->refcount > 1 ? conn_pending_close
                                      : conn_immediate_close);
    return true;
}
5528
/**
 * State handler: move a connection from its current worker thread over to
 * the tap thread. The connection is unregistered from its event base,
 * removed from the original thread's lists, re-targeted at the tap
 * thread's event base and queued on the tap thread's pending_io list, and
 * finally the tap thread is notified.
 *
 * @param c the connection to move (must belong to a thread other than
 *          tap_thread)
 * @return always false
 */
bool conn_add_tap_client(conn *c) {
    LIBEVENT_THREAD *tp = tap_thread;
    LIBEVENT_THREAD *orig_thread = c->thread;

    assert(orig_thread);
    assert(orig_thread != tp);

    c->ewouldblock = true;

    unregister_event(c);

    /* Lock order: the original thread first, then the tap thread. */
    LOCK_THREAD(orig_thread);
    /* Clean out the lists */
    orig_thread->pending_io = list_remove(orig_thread->pending_io, c);
    orig_thread->pending_close = list_remove(orig_thread->pending_close, c);

    LOCK_THREAD(tp);
    c->ev_flags = 0;
    conn_set_state(c, conn_setup_tap_stream);
    settings.extensions.logger->log(EXTENSION_LOG_DEBUG, NULL,
                                    "Moving %d conn from %p to %p\n",
                                    c->sfd, c->thread, tp);
    /* From here on the connection belongs to the tap thread. */
    c->thread = tp;
    c->event.ev_base = tp->base;
    assert(c->next == NULL);
    assert(c->list_state == 0);
    enlist_conn(c, &tp->pending_io);

    UNLOCK_THREAD(tp);

    UNLOCK_THREAD(orig_thread);

    notify_thread(tp);

    return false;
}
5565
/**
 * State handler: hand the connection to process_bin_tap_connect().
 *
 * @param c the connection to drive
 * @return always true
 */
bool conn_setup_tap_stream(conn *c) {
    process_bin_tap_connect(c);
    return true;
}
5570
/**
 * libevent callback for a connection. Runs the connection's state machine
 * until a state function returns false, and (at most once per distinct
 * current_time value per thread) tries to close connections waiting on the
 * thread's pending_close list.
 *
 * @param fd the file descriptor the event fired for
 * @param which the event flags that fired (stored in c->which)
 * @param arg the connection (conn *) the event belongs to
 */
void event_handler(const int fd, const short which, void *arg) {
    conn *c;

    c = (conn *)arg;
    assert(c != NULL);

    /* On shutdown, just break out of this connection's event loop. */
    if (memcached_shutdown) {
        event_base_loopbreak(c->event.ev_base);
        return ;
    }

    c->which = which;

    /* sanity */
    if (fd != c->sfd) {
        if (c->sfd != INVALID_SOCKET) {
            settings.extensions.logger->log(EXTENSION_LOG_INFO, c,
                                            "Catastrophic: event fd doesn't match conn fd!\n");
        }
        unregister_event(c);
        if (c->sfd != INVALID_SOCKET && c->thread != NULL) {
            conn_close(c);
        }
        return;
    }

    perform_callbacks(ON_SWITCH_CONN, c, c);

    /* Request budget for this event; tap connections get their own limit. */
    c->nevents = settings.reqs_per_event;
    if (c->state == conn_ship_log) {
        c->nevents = settings.reqs_per_tap_event;
    }

    LIBEVENT_THREAD *thr = c->thread;

    // Do we have pending closes?
    const size_t max_items = 256;
    conn *pending_close[max_items];
    size_t n_pending_close = 0;
    if (thr != NULL) {
        LOCK_THREAD(thr);
        /* Only look at the list when current_time has changed since the
           last check. */
        if (thr->pending_close && thr->last_checked != current_time) {
            assert(!has_cycle(thr->pending_close));
            thr->last_checked = current_time;

            n_pending_close = list_to_array(pending_close, max_items,
                                            &thr->pending_close);
        }
        UNLOCK_THREAD(thr);
    }

    /* Drive the state machine until a state function returns false. */
    if (settings.verbose) {
        do {
            settings.extensions.logger->log(EXTENSION_LOG_DEBUG, c,
                                            "%d - Running task: (%s)\n",
                                            c->sfd, state_text(c->state));
        } while (c->state(c));
    } else {
        while (c->state(c)) {
            /* empty */
        }
    }

    /* Close any connections pending close */
    if (n_pending_close > 0) {
        for (size_t i = 0; i < n_pending_close; ++i) {
            conn *ce = pending_close[i];
            if (ce->refcount == 1) {
                settings.extensions.logger->log(EXTENSION_LOG_DEBUG, NULL,
                                                "OK, time to nuke: %p\n",
                                                (void*)ce);
                conn_close(ce);
                pending_close[i] = NULL;
            } else {
                /* Still referenced: put it back on its thread's list. */
                LOCK_THREAD(ce->thread);
                enlist_conn(ce, &ce->thread->pending_close);
                UNLOCK_THREAD(ce->thread);
            }
        }
    }

    if (thr != NULL) {
        LOCK_THREAD(thr);
        finalize_list(pending_close, n_pending_close);
        UNLOCK_THREAD(thr);
    }
}
5658
dispatch_event_handler(int fd,short which,void * arg)5659 static void dispatch_event_handler(int fd, short which, void *arg) {
5660 char buffer[80];
5661 ssize_t nr = recv(fd, buffer, sizeof(buffer), 0);
5662
5663 (void)(which);
5664 (void)(arg);
5665 if (nr != -1 && is_listen_disabled()) {
5666 bool enable = false;
5667 pthread_mutex_lock(&listen_state.mutex);
5668 listen_state.count -= nr;
5669 if (listen_state.count <= 0) {
5670 enable = true;
5671 listen_state.disabled = false;
5672 }
5673 pthread_mutex_unlock(&listen_state.mutex);
5674 if (enable) {
5675 conn *next;
5676 for (next = listen_conn; next; next = next->next) {
5677 update_event(next, EV_READ | EV_PERSIST);
5678 if (listen(next->sfd, settings.backlog) != 0) {
5679 settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
5680 "listen() failed",
5681 strerror(errno));
5682 }
5683 }
5684 }
5685 }
5686 }
5687
5688
5689
new_socket(struct addrinfo * ai)5690 static SOCKET new_socket(struct addrinfo *ai) {
5691 SOCKET sfd;
5692
5693 sfd = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol);
5694 if (sfd == INVALID_SOCKET) {
5695 return INVALID_SOCKET;
5696 }
5697
5698 if (evutil_make_socket_nonblocking(sfd) == -1) {
5699 safe_close(sfd);
5700 return INVALID_SOCKET;
5701 }
5702
5703 return sfd;
5704 }
5705
5706
5707 /*
5708 * Sets a socket's send buffer size to the maximum allowed by the system.
5709 */
maximize_sndbuf(const int sfd)5710 static void maximize_sndbuf(const int sfd) {
5711 socklen_t intsize = sizeof(int);
5712 int last_good = 0;
5713 int min, max, avg;
5714 int old_size;
5715
5716 /* Start with the default size. */
5717 if (getsockopt(sfd, SOL_SOCKET, SO_SNDBUF, (void *)&old_size, &intsize) != 0) {
5718 if (settings.verbose > 0) {
5719 settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
5720 "getsockopt(SO_SNDBUF): %s",
5721 strerror(errno));
5722 }
5723
5724 return;
5725 }
5726
5727 /* Binary-search for the real maximum. */
5728 min = old_size;
5729 max = MAX_SENDBUF_SIZE;
5730
5731 while (min <= max) {
5732 avg = ((unsigned int)(min + max)) / 2;
5733 if (setsockopt(sfd, SOL_SOCKET, SO_SNDBUF, (void *)&avg, intsize) == 0) {
5734 last_good = avg;
5735 min = avg + 1;
5736 } else {
5737 max = avg - 1;
5738 }
5739 }
5740
5741 if (settings.verbose > 1) {
5742 settings.extensions.logger->log(EXTENSION_LOG_DEBUG, NULL,
5743 "<%d send buffer was %d, now %d\n", sfd, old_size, last_good);
5744 }
5745 }
5746
5747
5748
/**
 * Create a socket and bind it to a specific port number
 * @param interface the interface to bind to
 * @param port the port number to bind to (-1 is treated as 0)
 * @param transport the transport protocol (TCP / UDP)
 * @param portnumber_file A filepointer to write the port numbers to
 *        when they are successfully added to the list of ports we
 *        listen on.
 * @return 0 if at least one address was bound, 1 if none was bound or if
 *         getaddrinfo(), bind() (other than EADDRINUSE) or listen() failed
 */
static int server_socket(const char *interface,
                         int port,
                         enum network_transport transport,
                         FILE *portnumber_file) {
    int sfd;
    struct linger ling = {0, 0};
    struct addrinfo *ai;
    struct addrinfo *next;
    struct addrinfo hints = { .ai_flags = AI_PASSIVE,
                              .ai_family = AF_UNSPEC };
    char port_buf[NI_MAXSERV];
    int error;
    int success = 0;
    int flags =1;

    hints.ai_socktype = IS_UDP(transport) ? SOCK_DGRAM : SOCK_STREAM;

    if (port == -1) {
        port = 0;
    }
    snprintf(port_buf, sizeof(port_buf), "%d", port);
    error= getaddrinfo(interface, port_buf, &hints, &ai);
    if (error != 0) {
        if (error != EAI_SYSTEM) {
            settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
                                            "getaddrinfo(): %s\n", gai_strerror(error));
        } else {
            /* EAI_SYSTEM means the actual reason is stored in errno;
               the EAI code itself is not an errno value. */
            settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
                                            "getaddrinfo(): %s\n", strerror(errno));
        }
        return 1;
    }

    for (next= ai; next; next= next->ai_next) {
        conn *listen_conn_add;
        if ((sfd = new_socket(next)) == INVALID_SOCKET) {
            /* getaddrinfo can return "junk" addresses,
             * we make sure at least one works before erroring.
             */
            continue;
        }

#ifdef IPV6_V6ONLY
        if (next->ai_family == AF_INET6) {
            error = setsockopt(sfd, IPPROTO_IPV6, IPV6_V6ONLY, (char *) &flags, sizeof(flags));
            if (error != 0) {
                settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
                                                "setsockopt(IPV6_V6ONLY): %s",
                                                strerror(errno));
                safe_close(sfd);
                continue;
            }
        }
#endif

        setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));
        if (IS_UDP(transport)) {
            maximize_sndbuf(sfd);
        } else {
            /* Failures of the options below are logged but not fatal. */
            error = setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags));
            if (error != 0) {
                settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
                                                "setsockopt(SO_KEEPALIVE): %s",
                                                strerror(errno));
            }

            error = setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling));
            if (error != 0) {
                settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
                                                "setsockopt(SO_LINGER): %s",
                                                strerror(errno));
            }

            error = setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags));
            if (error != 0) {
                settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
                                                "setsockopt(TCP_NODELAY): %s",
                                                strerror(errno));
            }
        }

        if (bind(sfd, next->ai_addr, next->ai_addrlen) == SOCKET_ERROR) {
            if (errno != EADDRINUSE) {
                settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
                                                "bind(): %s",
                                                strerror(errno));
                safe_close(sfd);
                freeaddrinfo(ai);
                return 1;
            }
            /* Address already in use: skip it and try the next one. */
            safe_close(sfd);
            continue;
        } else {
            success++;
            if (!IS_UDP(transport) && listen(sfd, settings.backlog) == SOCKET_ERROR) {
                settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
                                                "listen(): %s",
                                                strerror(errno));
                safe_close(sfd);
                freeaddrinfo(ai);
                return 1;
            }
            if (portnumber_file != NULL &&
                (next->ai_addr->sa_family == AF_INET ||
                 next->ai_addr->sa_family == AF_INET6)) {
                /* Ask the socket for the port it was actually bound to. */
                union {
                    struct sockaddr_in in;
                    struct sockaddr_in6 in6;
                } my_sockaddr;
                socklen_t len = sizeof(my_sockaddr);
                if (getsockname(sfd, (struct sockaddr*)&my_sockaddr, &len)==0) {
                    if (next->ai_addr->sa_family == AF_INET) {
                        fprintf(portnumber_file, "%s INET: %u\n",
                                IS_UDP(transport) ? "UDP" : "TCP",
                                ntohs(my_sockaddr.in.sin_port));
                    } else {
                        fprintf(portnumber_file, "%s INET6: %u\n",
                                IS_UDP(transport) ? "UDP" : "TCP",
                                ntohs(my_sockaddr.in6.sin6_port));
                    }
                }
            }
        }

        if (IS_UDP(transport)) {
            int c;

            for (c = 0; c < settings.num_threads_per_udp; c++) {
                /* this is guaranteed to hit all threads because we round-robin */
                dispatch_conn_new(sfd, conn_read, EV_READ | EV_PERSIST,
                                  UDP_READ_BUFFER_SIZE, transport);
                STATS_LOCK();
                ++stats.curr_conns;
                ++stats.daemon_conns;
                STATS_UNLOCK();
            }
        } else {
            if (!(listen_conn_add = conn_new(sfd, conn_listening,
                                             EV_READ | EV_PERSIST, 1,
                                             transport, main_base, NULL))) {
                settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
                                                "failed to create listening connection\n");
                exit(EXIT_FAILURE);
            }
            /* Push the new listener onto the front of the listen_conn list. */
            listen_conn_add->next = listen_conn;
            listen_conn = listen_conn_add;
            STATS_LOCK();
            ++stats.curr_conns;
            ++stats.daemon_conns;
            STATS_UNLOCK();
        }
    }

    freeaddrinfo(ai);

    /* Return zero iff we detected no errors in starting up connections */
    return success == 0;
}
5916
server_sockets(int port,enum network_transport transport,FILE * portnumber_file)5917 static int server_sockets(int port, enum network_transport transport,
5918 FILE *portnumber_file) {
5919 if (settings.inter == NULL) {
5920 return server_socket(settings.inter, port, transport, portnumber_file);
5921 } else {
5922 // tokenize them and bind to each one of them..
5923 char *b;
5924 int ret = 0;
5925 char *list = strdup(settings.inter);
5926
5927 if (list == NULL) {
5928 settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
5929 "Failed to allocate memory for parsing server interface string\n");
5930 return 1;
5931 }
5932 for (char *p = strtok_r(list, ";,", &b);
5933 p != NULL;
5934 p = strtok_r(NULL, ";,", &b)) {
5935 int the_port = port;
5936
5937 char *s = strchr(p, ':');
5938 if (s != NULL) {
5939 *s = '\0';
5940 ++s;
5941 if (!safe_strtol(s, &the_port)) {
5942 settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
5943 "Invalid port number: \"%s\"", s);
5944 return 1;
5945 }
5946 }
5947 if (strcmp(p, "*") == 0) {
5948 p = NULL;
5949 }
5950 ret |= server_socket(p, the_port, transport, portnumber_file);
5951 }
5952 free(list);
5953 return ret;
5954 }
5955 }
5956
new_socket_unix(void)5957 static int new_socket_unix(void) {
5958 int sfd;
5959
5960 if ((sfd = socket(AF_UNIX, SOCK_STREAM, 0)) == INVALID_SOCKET) {
5961 settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
5962 "socket(AF_UNIX, SOCK_STREAM, 0): %s",
5963 strerror(errno));
5964 return INVALID_SOCKET;
5965 }
5966
5967 if (evutil_make_socket_nonblocking(sfd) == -1) {
5968 safe_close(sfd);
5969 return INVALID_SOCKET;
5970 }
5971 return sfd;
5972 }
5973
5974 /* this will probably not work on windows */
server_socket_unix(const char * path,int access_mask)5975 static int server_socket_unix(const char *path, int access_mask) {
5976 int sfd;
5977 struct linger ling = {0, 0};
5978 struct sockaddr_un addr;
5979 struct stat tstat;
5980 int flags =1;
5981 int old_umask;
5982
5983 if (!path) {
5984 return 1;
5985 }
5986
5987 if ((sfd = new_socket_unix()) == -1) {
5988 return 1;
5989 }
5990
5991 /*
5992 * Clean up a previous socket file if we left it around
5993 */
5994 if (lstat(path, &tstat) == 0) {
5995 if (S_ISSOCK(tstat.st_mode))
5996 unlink(path);
5997 }
5998
5999 setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));
6000 setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags));
6001 setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling));
6002
6003 /*
6004 * the memset call clears nonstandard fields in some impementations
6005 * that otherwise mess things up.
6006 */
6007 memset(&addr, 0, sizeof(addr));
6008
6009 addr.sun_family = AF_UNIX;
6010 strncpy(addr.sun_path, path, sizeof(addr.sun_path) - 1);
6011 assert(strcmp(addr.sun_path, path) == 0);
6012 old_umask = umask( ~(access_mask&0777));
6013 if (bind(sfd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
6014 settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
6015 "bind(): %s",
6016 strerror(errno));
6017 safe_close(sfd);
6018 umask(old_umask);
6019 return 1;
6020 }
6021 umask(old_umask);
6022 if (listen(sfd, settings.backlog) == -1) {
6023 settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
6024 "listen(): %s",
6025 strerror(errno));
6026 safe_close(sfd);
6027 return 1;
6028 }
6029 if (!(listen_conn = conn_new(sfd, conn_listening,
6030 EV_READ | EV_PERSIST, 1,
6031 local_transport, main_base, NULL))) {
6032 settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
6033 "failed to create listening connection\n");
6034 exit(EXIT_FAILURE);
6035 }
6036
6037 STATS_LOCK();
6038 ++stats.daemon_conns;
6039 STATS_UNLOCK();
6040
6041 return 0;
6042 }
6043
/* Timer event that clock_handler() re-arms on main_base with a one second timeout. */
static struct event clockevent;
6045
6046 /* time-sensitive callers can call it by hand with this, outside the normal ever-1-second timer */
set_current_time(void)6047 static void set_current_time(void) {
6048 struct timeval timer;
6049
6050 gettimeofday(&timer, NULL);
6051 current_time = (rel_time_t) (timer.tv_sec - process_started);
6052 }
6053
clock_handler(const int fd,const short which,void * arg)6054 static void clock_handler(const int fd, const short which, void *arg) {
6055 struct timeval t = {.tv_sec = 1, .tv_usec = 0};
6056 static bool initialized = false;
6057
6058 (void)(fd);
6059 (void)(which);
6060 (void)(arg);
6061
6062 if (memcached_shutdown) {
6063 event_base_loopbreak(main_base);
6064 return ;
6065 }
6066
6067 if (initialized) {
6068 /* only delete the event if it's actually there. */
6069 evtimer_del(&clockevent);
6070 } else {
6071 initialized = true;
6072 }
6073
6074 evtimer_set(&clockevent, clock_handler, 0);
6075 event_base_set(main_base, &clockevent);
6076 evtimer_add(&clockevent, &t);
6077
6078 set_current_time();
6079 }
6080
/**
 * Print the package name and version followed by the command line help
 * text (options and environment variables) to standard output.
 */
static void usage(void) {
    printf(PACKAGE " " VERSION "\n");
    printf("-p <num> TCP port number to listen on (default: 11211)\n"
           "-U <num> UDP port number to listen on (default: 11211, 0 is off)\n"
           "-s <file> UNIX socket path to listen on (disables network support)\n"
           "-a <mask> access mask for UNIX socket, in octal (default: 0700)\n"
           "-l <addr> interface to listen on (default: INADDR_ANY, all addresses)\n"
           " <addr> may be specified as host:port. If you don't specify\n"
           " a port number, the value you specified with -p or -U is\n"
           " used. You may specify multiple addresses separated by comma\n"
           " or by using -l multiple times\n"
           "-d run as a daemon\n"
           "-r maximize core file limit\n"
           "-u <username> assume identity of <username> (only when run as root)\n"
           "-m <num> max memory to use for items in megabytes (default: 64 MB)\n"
           "-M return error on memory exhausted (rather than removing items)\n"
           "-c <num> max simultaneous connections (default: 1000)\n"
           "-k lock down all paged memory. Note that there is a\n"
           " limit on how much memory you may lock. Trying to\n"
           " allocate more than that would fail, so be sure you\n"
           " set the limit correctly for the user you started\n"
           " the daemon with (not for -u <username> user;\n"
           " under sh this is done with 'ulimit -S -l NUM_KB').\n"
           "-v verbose (print errors/warnings while in event loop)\n"
           "-vv very verbose (also print client commands/reponses)\n"
           "-vvv extremely verbose (also print internal state transitions)\n"
           "-h print this help and exit\n"
           "-i print memcached and libevent license\n"
           "-P <file> save PID in <file>, only used with -d option\n"
           "-f <factor> chunk size growth factor (default: 1.25)\n"
           "-n <bytes> minimum space allocated for key+value+flags (default: 48)\n");
    printf("-L Try to use large memory pages (if available). Increasing\n"
           " the memory page size could reduce the number of TLB misses\n"
           " and improve the performance. In order to get large pages\n"
           " from the OS, memcached will allocate the total item-cache\n"
           " in one large chunk.\n");
    printf("-D <char> Use <char> as the delimiter between key prefixes and IDs.\n"
           " This is used for per-prefix stats reporting. The default is\n"
           " \":\" (colon). If this option is specified, stats collection\n"
           " is turned on automatically; if not, then it may be turned on\n"
           " by sending the \"stats detail on\" command to the server.\n");
    printf("-t <num> number of threads to use (default: 4)\n");
    printf("-R Maximum number of requests per event, limits the number of\n"
           " requests process for a given connection to prevent \n"
           " starvation (default: 20)\n");
    printf("-C Disable use of CAS\n");
    printf("-b Set the backlog queue limit (default: 1024)\n");
    printf("-B Binding protocol - one of ascii, binary, or auto (default)\n");
    printf("-I Override the size of each slab page. Adjusts max item size\n"
           " (default: 1mb, min: 1k, max: 128m)\n");
    printf("-q Disable detailed stats commands\n");
    /* The SASL option is only advertised when SASL support is compiled in. */
#ifdef SASL_ENABLED
    printf("-S Require SASL authentication\n");
#endif
    printf("-X module,cfg Load the module and initialize it with the config\n");
    printf("-E engine Load engine as the storage engine\n");
    printf("-e config Pass config as configuration options to the storage engine\n");
    printf("\nEnvironment variables:\n"
           "MEMCACHED_PORT_FILENAME File to write port information to\n"
           "MEMCACHED_TOP_KEYS Number of top keys to keep track of\n"
           "MEMCACHED_REQS_TAP_EVENT Similar to -R but for tap_ship_log\n");
}
6143
usage_license(void)6144 static void usage_license(void) {
6145 printf(PACKAGE " " VERSION "\n\n");
6146 printf(
6147 "Copyright (c) 2003, Danga Interactive, Inc. <http://www.danga.com/>\n"
6148 "All rights reserved.\n"
6149 "\n"
6150 "Redistribution and use in source and binary forms, with or without\n"
6151 "modification, are permitted provided that the following conditions are\n"
6152 "met:\n"
6153 "\n"
6154 " * Redistributions of source code must retain the above copyright\n"
6155 "notice, this list of conditions and the following disclaimer.\n"
6156 "\n"
6157 " * Redistributions in binary form must reproduce the above\n"
6158 "copyright notice, this list of conditions and the following disclaimer\n"
6159 "in the documentation and/or other materials provided with the\n"
6160 "distribution.\n"
6161 "\n"
6162 " * Neither the name of the Danga Interactive nor the names of its\n"
6163 "contributors may be used to endorse or promote products derived from\n"
6164 "this software without specific prior written permission.\n"
6165 "\n"
6166 "THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS\n"
6167 "\"AS IS\" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT\n"
6168 "LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR\n"
6169 "A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT\n"
6170 "OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,\n"
6171 "SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT\n"
6172 "LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,\n"
6173 "DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY\n"
6174 "THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT\n"
6175 "(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE\n"
6176 "OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.\n"
6177 "\n"
6178 "\n"
6179 "This product includes software developed by Niels Provos.\n"
6180 "\n"
6181 "[ libevent ]\n"
6182 "\n"
6183 "Copyright 2000-2003 Niels Provos <provos@citi.umich.edu>\n"
6184 "All rights reserved.\n"
6185 "\n"
6186 "Redistribution and use in source and binary forms, with or without\n"
6187 "modification, are permitted provided that the following conditions\n"
6188 "are met:\n"
6189 "1. Redistributions of source code must retain the above copyright\n"
6190 " notice, this list of conditions and the following disclaimer.\n"
6191 "2. Redistributions in binary form must reproduce the above copyright\n"
6192 " notice, this list of conditions and the following disclaimer in the\n"
6193 " documentation and/or other materials provided with the distribution.\n"
6194 "3. All advertising materials mentioning features or use of this software\n"
6195 " must display the following acknowledgement:\n"
6196 " This product includes software developed by Niels Provos.\n"
6197 "4. The name of the author may not be used to endorse or promote products\n"
6198 " derived from this software without specific prior written permission.\n"
6199 "\n"
6200 "THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR\n"
6201 "IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES\n"
6202 "OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.\n"
6203 "IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,\n"
6204 "INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT\n"
6205 "NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,\n"
6206 "DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY\n"
6207 "THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT\n"
6208 "(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF\n"
6209 "THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.\n"
6210 );
6211
6212 return;
6213 }
6214
save_pid(const char * pid_file)6215 static void save_pid(const char *pid_file) {
6216 FILE *fp;
6217
6218 if (access(pid_file, F_OK) == 0) {
6219 if ((fp = fopen(pid_file, "r")) != NULL) {
6220 char buffer[1024];
6221 if (fgets(buffer, sizeof(buffer), fp) != NULL) {
6222 unsigned int pid;
6223 if (safe_strtoul(buffer, &pid) && kill((pid_t)pid, 0) == 0) {
6224 settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
6225 "WARNING: The pid file contained the following (running) pid: %u\n", pid);
6226 }
6227 }
6228 fclose(fp);
6229 }
6230 }
6231
6232 if ((fp = fopen(pid_file, "w")) == NULL) {
6233 settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
6234 "Could not open the pid file %s for writing: %s\n",
6235 pid_file, strerror(errno));
6236 return;
6237 }
6238
6239 fprintf(fp,"%ld\n", (long)getpid());
6240 if (fclose(fp) == -1) {
6241 settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
6242 "Could not close the pid file %s: %s\n",
6243 pid_file, strerror(errno));
6244 }
6245 }
6246
remove_pidfile(const char * pid_file)6247 static void remove_pidfile(const char *pid_file) {
6248 if (pid_file != NULL) {
6249 if (unlink(pid_file) != 0) {
6250 settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
6251 "Could not remove the pid file %s: %s\n",
6252 pid_file, strerror(errno));
6253 }
6254 }
6255 }
6256
6257 #ifndef HAVE_SIGIGNORE
sigignore(int sig)6258 static int sigignore(int sig) {
6259 struct sigaction sa = { .sa_handler = SIG_IGN, .sa_flags = 0 };
6260
6261 if (sigemptyset(&sa.sa_mask) == -1 || sigaction(sig, &sa, 0) == -1) {
6262 return -1;
6263 }
6264 return 0;
6265 }
6266 #endif /* !HAVE_SIGIGNORE */
6267
sigterm_handler(int sig)6268 static void sigterm_handler(int sig) {
6269 assert(sig == SIGTERM || sig == SIGINT);
6270 memcached_shutdown = 1;
6271 }
6272
install_sigterm_handler(void)6273 static int install_sigterm_handler(void) {
6274 struct sigaction sa = {.sa_handler = sigterm_handler, .sa_flags = 0};
6275
6276 if (sigemptyset(&sa.sa_mask) == -1 || sigaction(SIGTERM, &sa, 0) == -1 ||
6277 sigaction(SIGINT, &sa, 0) == -1) {
6278 return -1;
6279 }
6280
6281 return 0;
6282 }
6283
/*
 * On systems that support multiple page sizes we may reduce the
 * number of TLB misses by using the biggest available page size.
 *
 * When both getpagesizes() and memcntl() are available, the largest
 * reported page size is requested with MHA_MAPSIZE_BSSBRK. Returns 0 on
 * success and -1 if either call fails (a warning is logged). Without
 * those functions this is a no-op that returns 0.
 */
static int enable_large_pages(void) {
#if defined(HAVE_GETPAGESIZES) && defined(HAVE_MEMCNTL)
    size_t page_sizes[32];
    int count = getpagesizes(page_sizes, 32);

    if (count == -1) {
        settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
              "Failed to get supported pagesizes: %s\nWill use default page size\n",
              strerror(errno));
        return -1;
    }

    /* Pick the largest of the reported page sizes */
    size_t largest = page_sizes[0];
    for (int idx = 1; idx < count; ++idx) {
        if (page_sizes[idx] > largest) {
            largest = page_sizes[idx];
        }
    }

    struct memcntl_mha arg = {0};
    arg.mha_flags = 0;
    arg.mha_pagesize = largest;
    arg.mha_cmd = MHA_MAPSIZE_BSSBRK;

    if (memcntl(0, 0, MC_HAT_ADVISE, (caddr_t)&arg, 0, 0) == -1) {
        settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
              "Failed to set large pages: %s\nWill use default page size\n",
              strerror(errno));
        return -1;
    }

    return 0;
#else
    return 0;
#endif
}
6326
get_server_version(void)6327 static const char* get_server_version(void) {
6328 return VERSION;
6329 }
6330
store_engine_specific(const void * cookie,void * engine_data)6331 static void store_engine_specific(const void *cookie,
6332 void *engine_data) {
6333 conn *c = (conn*)cookie;
6334 c->engine_storage = engine_data;
6335 }
6336
get_engine_specific(const void * cookie)6337 static void *get_engine_specific(const void *cookie) {
6338 conn *c = (conn*)cookie;
6339 return c->engine_storage;
6340 }
6341
get_socket_fd(const void * cookie)6342 static int get_socket_fd(const void *cookie) {
6343 conn *c = (conn *)cookie;
6344 return c->sfd;
6345 }
6346
reserve_cookie(const void * cookie)6347 static ENGINE_ERROR_CODE reserve_cookie(const void *cookie) {
6348 conn *c = (conn *)cookie;
6349 ++c->refcount;
6350 return ENGINE_SUCCESS;
6351 }
6352
release_cookie(const void * cookie)6353 static ENGINE_ERROR_CODE release_cookie(const void *cookie) {
6354 conn *c = (conn *)cookie;
6355 --c->refcount;
6356 return ENGINE_SUCCESS;
6357 }
6358
num_independent_stats(void)6359 static int num_independent_stats(void) {
6360 return settings.num_threads + 1;
6361 }
6362
new_independent_stats(void)6363 static void *new_independent_stats(void) {
6364 int ii;
6365 int nrecords = num_independent_stats();
6366 struct independent_stats *independent_stats = calloc(sizeof(independent_stats) + sizeof(struct thread_stats) * nrecords, 1);
6367 if (settings.topkeys > 0)
6368 independent_stats->topkeys = topkeys_init(settings.topkeys);
6369 for (ii = 0; ii < nrecords; ii++)
6370 pthread_mutex_init(&independent_stats->thread_stats[ii].mutex, NULL);
6371 return independent_stats;
6372 }
6373
release_independent_stats(void * stats)6374 static void release_independent_stats(void *stats) {
6375 int ii;
6376 int nrecords = num_independent_stats();
6377 struct independent_stats *independent_stats = stats;
6378 if (independent_stats->topkeys)
6379 topkeys_free(independent_stats->topkeys);
6380 for (ii = 0; ii < nrecords; ii++)
6381 pthread_mutex_destroy(&independent_stats->thread_stats[ii].mutex);
6382 free(independent_stats);
6383 }
6384
/**
 * Find the stats block to use for a connection.
 *
 * If the engine implements get_stats_struct and it returns a non-NULL
 * block for this connection, that block is used; in every other case
 * default_independent_stats is returned.
 *
 * @param c the connection
 * @return the stats block for the connection
 */
static inline struct independent_stats *get_independent_stats(conn *c) {
    struct independent_stats *engine_stats = NULL;

    if (settings.engine.v1->get_stats_struct != NULL) {
        engine_stats = settings.engine.v1->get_stats_struct(settings.engine.v0,
                                                            (const void *)c);
    }

    return (engine_stats != NULL) ? engine_stats : default_independent_stats;
}
6396
/**
 * Get the thread_stats record belonging to the thread that serves the
 * given connection.
 *
 * @param c the connection; c->thread->index selects the record
 * @return pointer to the record inside the connection's stats block
 */
static inline struct thread_stats *get_thread_stats(conn *c) {
    struct independent_stats *istats = get_independent_stats(c);

    assert(c->thread->index < num_independent_stats());

    return istats->thread_stats + c->thread->index;
}
6402
/**
 * Register a callback for an engine event type. The new handler is pushed
 * onto the front of the engine_event_handlers list for that type.
 *
 * NOTE(review): an allocation failure is only caught by assert().
 *
 * @param eh unused
 * @param type the event type to register for
 * @param cb the function to call
 * @param cb_data opaque data stored alongside the callback
 */
static void register_callback(ENGINE_HANDLE *eh,
                              ENGINE_EVENT_TYPE type,
                              EVENT_CALLBACK cb, const void *cb_data)
{
    struct engine_event_handler *handler = calloc(1, sizeof(*handler));

    (void)eh; /* unused */

    assert(handler);
    handler->cb = cb;
    handler->cb_data = cb_data;

    /* Link in at the head of the list for this event type */
    handler->next = engine_event_handlers[type];
    engine_event_handlers[type] = handler;
}
6416
get_current_time(void)6417 static rel_time_t get_current_time(void)
6418 {
6419 return current_time;
6420 }
6421
count_eviction(const void * cookie,const void * key,const int nkey)6422 static void count_eviction(const void *cookie, const void *key, const int nkey) {
6423 topkeys_t *tk = get_independent_stats((conn*)cookie)->topkeys;
6424 TK(tk, evictions, key, nkey, get_current_time());
6425 }
6426
6427 /**
6428 * To make it easy for engine implementors that doesn't want to care about
6429 * writing their own incr/decr code, they can just set the arithmetic function
6430 * to NULL and use this implementation. It is not efficient, due to the fact
6431 * that it does multiple calls through the interface (get and then cas store).
6432 * If you don't care, feel free to use it..
6433 */
internal_arithmetic(ENGINE_HANDLE * handle,const void * cookie,const void * key,const int nkey,const bool increment,const bool create,const uint64_t delta,const uint64_t initial,const rel_time_t exptime,uint64_t * cas,uint64_t * result,uint16_t vbucket)6434 static ENGINE_ERROR_CODE internal_arithmetic(ENGINE_HANDLE* handle,
6435 const void* cookie,
6436 const void* key,
6437 const int nkey,
6438 const bool increment,
6439 const bool create,
6440 const uint64_t delta,
6441 const uint64_t initial,
6442 const rel_time_t exptime,
6443 uint64_t *cas,
6444 uint64_t *result,
6445 uint16_t vbucket)
6446 {
6447 ENGINE_HANDLE_V1 *e = (ENGINE_HANDLE_V1*)handle;
6448
6449 item *it = NULL;
6450
6451 ENGINE_ERROR_CODE ret;
6452 ret = e->get(handle, cookie, &it, key, nkey, vbucket);
6453
6454 if (ret == ENGINE_SUCCESS) {
6455 item_info info = { .nvalue = 1 };
6456
6457 if (!e->get_item_info(handle, cookie, it, &info)) {
6458 e->release(handle, cookie, it);
6459 return ENGINE_FAILED;
6460 }
6461
6462 char value[80];
6463
6464 if (info.value[0].iov_len > (sizeof(value) - 1)) {
6465 e->release(handle, cookie, it);
6466 return ENGINE_EINVAL;
6467 }
6468
6469 memcpy(value, info.value[0].iov_base, info.value[0].iov_len);
6470 value[info.value[0].iov_len] = '\0';
6471
6472 uint64_t val;
6473 if (!safe_strtoull(value, &val)) {
6474 e->release(handle, cookie, it);
6475 return ENGINE_EINVAL;
6476 }
6477
6478 if (increment) {
6479 val += delta;
6480 } else {
6481 if (delta > val) {
6482 val = 0;
6483 } else {
6484 val -= delta;
6485 }
6486 }
6487
6488 size_t nb = snprintf(value, sizeof(value), "%"PRIu64, val);
6489 *result = val;
6490 item *nit = NULL;
6491 if (e->allocate(handle, cookie, &nit, key,
6492 nkey, nb, info.flags, info.exptime) != ENGINE_SUCCESS) {
6493 e->release(handle, cookie, it);
6494 return ENGINE_ENOMEM;
6495 }
6496
6497 item_info i2 = { .nvalue = 1 };
6498 if (!e->get_item_info(handle, cookie, nit, &i2)) {
6499 e->release(handle, cookie, it);
6500 e->release(handle, cookie, nit);
6501 return ENGINE_FAILED;
6502 }
6503
6504 memcpy(i2.value[0].iov_base, value, nb);
6505 e->item_set_cas(handle, cookie, nit, info.cas);
6506 ret = e->store(handle, cookie, nit, cas, OPERATION_CAS, vbucket);
6507 e->release(handle, cookie, it);
6508 e->release(handle, cookie, nit);
6509 } else if (ret == ENGINE_KEY_ENOENT && create) {
6510 char value[80];
6511 size_t nb = snprintf(value, sizeof(value), "%"PRIu64"\r\n", initial);
6512 *result = initial;
6513 if (e->allocate(handle, cookie, &it, key, nkey, nb, 0, exptime) != ENGINE_SUCCESS) {
6514 e->release(handle, cookie, it);
6515 return ENGINE_ENOMEM;
6516 }
6517
6518 item_info info = { .nvalue = 1 };
6519 if (!e->get_item_info(handle, cookie, it, &info)) {
6520 e->release(handle, cookie, it);
6521 return ENGINE_FAILED;
6522 }
6523
6524 memcpy(info.value[0].iov_base, value, nb);
6525 ret = e->store(handle, cookie, it, cas, OPERATION_CAS, vbucket);
6526 e->release(handle, cookie, it);
6527 }
6528
6529 /* We had a race condition.. just call ourself recursively to retry */
6530 if (ret == ENGINE_KEY_EEXISTS) {
6531 return internal_arithmetic(handle, cookie, key, nkey, increment, create, delta,
6532 initial, exptime, cas, result, vbucket);
6533 }
6534
6535 return ret;
6536 }
6537
6538 /**
6539 * Register an extension if it's not already registered
6540 *
6541 * @param type the type of the extension to register
6542 * @param extension the extension to register
6543 * @return true if success, false otherwise
6544 */
register_extension(extension_type_t type,void * extension)6545 static bool register_extension(extension_type_t type, void *extension)
6546 {
6547 if (extension == NULL) {
6548 return false;
6549 }
6550
6551 switch (type) {
6552 case EXTENSION_DAEMON:
6553 for (EXTENSION_DAEMON_DESCRIPTOR *ptr = settings.extensions.daemons;
6554 ptr != NULL;
6555 ptr = ptr->next) {
6556 if (ptr == extension) {
6557 return false;
6558 }
6559 }
6560 ((EXTENSION_DAEMON_DESCRIPTOR *)(extension))->next = settings.extensions.daemons;
6561 settings.extensions.daemons = extension;
6562 return true;
6563 case EXTENSION_LOGGER:
6564 settings.extensions.logger = extension;
6565 return true;
6566 case EXTENSION_ASCII_PROTOCOL:
6567 if (settings.extensions.ascii != NULL) {
6568 EXTENSION_ASCII_PROTOCOL_DESCRIPTOR *last;
6569 for (last = settings.extensions.ascii; last->next != NULL;
6570 last = last->next) {
6571 if (last == extension) {
6572 return false;
6573 }
6574 }
6575 if (last == extension) {
6576 return false;
6577 }
6578 last->next = extension;
6579 last->next->next = NULL;
6580 } else {
6581 settings.extensions.ascii = extension;
6582 settings.extensions.ascii->next = NULL;
6583 }
6584 return true;
6585
6586 case EXTENSION_BINARY_PROTOCOL:
6587 if (settings.extensions.binary != NULL) {
6588 EXTENSION_BINARY_PROTOCOL_DESCRIPTOR *last;
6589 for (last = settings.extensions.binary; last->next != NULL;
6590 last = last->next) {
6591 if (last == extension) {
6592 return false;
6593 }
6594 }
6595 if (last == extension) {
6596 return false;
6597 }
6598 last->next = extension;
6599 last->next->next = NULL;
6600 } else {
6601 settings.extensions.binary = extension;
6602 settings.extensions.binary->next = NULL;
6603 }
6604
6605 ((EXTENSION_BINARY_PROTOCOL_DESCRIPTOR*)extension)->setup(setup_binary_lookup_cmd);
6606 return true;
6607
6608 default:
6609 return false;
6610 }
6611 }
6612
6613 /**
6614 * Unregister an extension
6615 *
6616 * @param type the type of the extension to remove
6617 * @param extension the extension to remove
6618 */
unregister_extension(extension_type_t type,void * extension)6619 static void unregister_extension(extension_type_t type, void *extension)
6620 {
6621 switch (type) {
6622 case EXTENSION_DAEMON:
6623 {
6624 EXTENSION_DAEMON_DESCRIPTOR *prev = NULL;
6625 EXTENSION_DAEMON_DESCRIPTOR *ptr = settings.extensions.daemons;
6626
6627 while (ptr != NULL && ptr != extension) {
6628 prev = ptr;
6629 ptr = ptr->next;
6630 }
6631
6632 if (ptr != NULL && prev != NULL) {
6633 prev->next = ptr->next;
6634 }
6635
6636 if (settings.extensions.daemons == ptr) {
6637 settings.extensions.daemons = ptr->next;
6638 }
6639 }
6640 break;
6641 case EXTENSION_LOGGER:
6642 if (settings.extensions.logger == extension) {
6643 if (get_stderr_logger() == extension) {
6644 settings.extensions.logger = get_null_logger();
6645 } else {
6646 settings.extensions.logger = get_stderr_logger();
6647 }
6648 }
6649 break;
6650 case EXTENSION_ASCII_PROTOCOL:
6651 {
6652 EXTENSION_ASCII_PROTOCOL_DESCRIPTOR *prev = NULL;
6653 EXTENSION_ASCII_PROTOCOL_DESCRIPTOR *ptr = settings.extensions.ascii;
6654
6655 while (ptr != NULL && ptr != extension) {
6656 prev = ptr;
6657 ptr = ptr->next;
6658 }
6659
6660 if (ptr != NULL && prev != NULL) {
6661 prev->next = ptr->next;
6662 }
6663
6664 if (settings.extensions.ascii == ptr) {
6665 settings.extensions.ascii = ptr->next;
6666 }
6667 }
6668 break;
6669
6670
6671 case EXTENSION_BINARY_PROTOCOL:
6672 settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
6673 "You can't unregister a binary command handler!");
6674 abort();
6675 break;
6676
6677 default:
6678 ;
6679 }
6680
6681 }
6682
6683 /**
6684 * Get the named extension
6685 */
get_extension(extension_type_t type)6686 static void* get_extension(extension_type_t type)
6687 {
6688 switch (type) {
6689 case EXTENSION_DAEMON:
6690 return settings.extensions.daemons;
6691
6692 case EXTENSION_LOGGER:
6693 return settings.extensions.logger;
6694
6695 case EXTENSION_ASCII_PROTOCOL:
6696 return settings.extensions.ascii;
6697
6698 case EXTENSION_BINARY_PROTOCOL:
6699 return settings.extensions.binary;
6700
6701 default:
6702 return NULL;
6703 }
6704 }
6705
shutdown_server(void)6706 static void shutdown_server(void) {
6707 memcached_shutdown = 1;
6708 }
6709
get_logger(void)6710 static EXTENSION_LOGGER_DESCRIPTOR* get_logger(void)
6711 {
6712 return settings.extensions.logger;
6713 }
6714
get_log_level(void)6715 static EXTENSION_LOG_LEVEL get_log_level(void)
6716 {
6717 EXTENSION_LOG_LEVEL ret;
6718 switch (settings.verbose) {
6719 case 0: ret = EXTENSION_LOG_WARNING; break;
6720 case 1: ret = EXTENSION_LOG_INFO; break;
6721 case 2: ret = EXTENSION_LOG_DEBUG; break;
6722 default:
6723 ret = EXTENSION_LOG_DETAIL;
6724 }
6725 return ret;
6726 }
6727
/**
 * Set settings.verbose from a log level.
 *
 * @param severity WARNING maps to 0, INFO to 1, DEBUG to 2 and any other
 *                 level to 3
 */
static void set_log_level(EXTENSION_LOG_LEVEL severity)
{
    int verbosity = 3;

    if (severity == EXTENSION_LOG_WARNING) {
        verbosity = 0;
    } else if (severity == EXTENSION_LOG_INFO) {
        verbosity = 1;
    } else if (severity == EXTENSION_LOG_DEBUG) {
        verbosity = 2;
    }

    settings.verbose = verbosity;
}
6738
/**
 * Append "key=val;" to the NUL-terminated string that cookie points to.
 *
 * Nothing is appended when the key or the value is empty, or when the
 * result (including the terminator) would not fit in 1024 bytes.
 *
 * @param key the key (need not be NUL-terminated)
 * @param klen the length of the key
 * @param val the value (need not be NUL-terminated)
 * @param vlen the length of the value
 * @param cookie the destination string buffer
 */
static void get_config_append_stats(const char *key, const uint16_t klen,
                                    const char *val, const uint32_t vlen,
                                    const void *cookie)
{
    if (klen == 0 || vlen == 0) {
        return;
    }

    char *buffer = (char*)cookie;
    size_t used = strlen(buffer);

    /* Room needed: key + '=' + value + ';' + terminating NUL */
    if ((used + klen + vlen + 3) > 1024) {
        return;
    }

    char *out = buffer + used;

    memcpy(out, key, klen);
    out += klen;
    *out++ = '=';
    memcpy(out, val, vlen);
    out += vlen;
    *out++ = ';';
    *out = '\0';
}
6763
get_config(struct config_item items[])6764 static bool get_config(struct config_item items[]) {
6765 char config[1024];
6766 config[0] = '\0';
6767 process_stat_settings(get_config_append_stats, config);
6768 int rval = parse_config(config, items, NULL);
6769 return rval >= 0;
6770 }
6771
6772 /**
6773 * Callback the engines may call to get the public server interface
6774 * @return pointer to a structure containing the interface. The client should
6775 * know the layout and perform the proper casts.
6776 */
get_server_api(void)6777 static SERVER_HANDLE_V1 *get_server_api(void)
6778 {
6779 static SERVER_CORE_API core_api = {
6780 .server_version = get_server_version,
6781 .hash = hash,
6782 .realtime = realtime,
6783 .abstime = abstime,
6784 .get_current_time = get_current_time,
6785 .parse_config = parse_config,
6786 .shutdown = shutdown_server,
6787 .get_config = get_config
6788 };
6789
6790 static SERVER_COOKIE_API server_cookie_api = {
6791 .get_auth_data = get_auth_data,
6792 .store_engine_specific = store_engine_specific,
6793 .get_engine_specific = get_engine_specific,
6794 .get_socket_fd = get_socket_fd,
6795 .notify_io_complete = notify_io_complete,
6796 .reserve = reserve_cookie,
6797 .release = release_cookie
6798 };
6799
6800 static SERVER_STAT_API server_stat_api = {
6801 .new_stats = new_independent_stats,
6802 .release_stats = release_independent_stats,
6803 .evicting = count_eviction
6804 };
6805
6806 static SERVER_LOG_API server_log_api = {
6807 .get_logger = get_logger,
6808 .get_level = get_log_level,
6809 .set_level = set_log_level
6810 };
6811
6812 static SERVER_EXTENSION_API extension_api = {
6813 .register_extension = register_extension,
6814 .unregister_extension = unregister_extension,
6815 .get_extension = get_extension
6816 };
6817
6818 static SERVER_CALLBACK_API callback_api = {
6819 .register_callback = register_callback,
6820 .perform_callbacks = perform_callbacks,
6821 };
6822
6823 static SERVER_HANDLE_V1 rv = {
6824 .interface = 1,
6825 .core = &core_api,
6826 .stat = &server_stat_api,
6827 .extension = &extension_api,
6828 .callback = &callback_api,
6829 .log = &server_log_api,
6830 .cookie = &server_cookie_api
6831 };
6832
6833 if (rv.engine == NULL) {
6834 rv.engine = settings.engine.v0;
6835 }
6836
6837 return &rv;
6838 }
6839
6840 /**
6841 * Load a shared object and initialize all the extensions in there.
6842 *
6843 * @param soname the name of the shared object (may not be NULL)
6844 * @param config optional configuration parameters
6845 * @return true if success, false otherwise
6846 */
load_extension(const char * soname,const char * config)6847 static bool load_extension(const char *soname, const char *config) {
6848 if (soname == NULL) {
6849 return false;
6850 }
6851
6852 /* Hack to remove the warning from C99 */
6853 union my_hack {
6854 MEMCACHED_EXTENSIONS_INITIALIZE initialize;
6855 void* voidptr;
6856 } funky = {.initialize = NULL };
6857
6858 void *handle = dlopen(soname, RTLD_NOW | RTLD_LOCAL);
6859 if (handle == NULL) {
6860 const char *msg = dlerror();
6861 settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
6862 "Failed to open library \"%s\": %s\n",
6863 soname, msg ? msg : "unknown error");
6864 return false;
6865 }
6866
6867 void *symbol = dlsym(handle, "memcached_extensions_initialize");
6868 if (symbol == NULL) {
6869 const char *msg = dlerror();
6870 settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
6871 "Could not find symbol \"memcached_extensions_initialize\" in %s: %s\n",
6872 soname, msg ? msg : "unknown error");
6873 return false;
6874 }
6875 funky.voidptr = symbol;
6876
6877 EXTENSION_ERROR_CODE error = (*funky.initialize)(config, get_server_api);
6878
6879 if (error != EXTENSION_SUCCESS) {
6880 settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
6881 "Failed to initalize extensions from %s. Error code: %d\n",
6882 soname, error);
6883 dlclose(handle);
6884 return false;
6885 }
6886
6887 if (settings.verbose > 0) {
6888 settings.extensions.logger->log(EXTENSION_LOG_INFO, NULL,
6889 "Loaded extensions from: %s\n", soname);
6890 }
6891
6892 return true;
6893 }
6894
6895 /**
6896 * Do basic sanity check of the runtime environment
6897 * @return true if no errors found, false if we can't use this env
6898 */
sanitycheck(void)6899 static bool sanitycheck(void) {
6900 /* One of our biggest problems is old and bogus libevents */
6901 const char *ever = event_get_version();
6902 if (ever != NULL) {
6903 if (strncmp(ever, "1.", 2) == 0) {
6904 /* Require at least 1.3 (that's still a couple of years old) */
6905 if ((ever[2] == '1' || ever[2] == '2') && !isdigit(ever[3])) {
6906 settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
6907 "You are using libevent %s.\nPlease upgrade to"
6908 " a more recent version (1.3 or newer)\n",
6909 event_get_version());
6910 return false;
6911 }
6912 }
6913 }
6914
6915 return true;
6916 }
6917
main(int argc,char ** argv)6918 int main (int argc, char **argv) {
6919 int c;
6920 bool lock_memory = false;
6921 bool do_daemonize = false;
6922 int maxcore = 0;
6923 char *username = NULL;
6924 char *pid_file = NULL;
6925 struct passwd *pw;
6926 struct rlimit rlim;
6927 char unit = '\0';
6928 int size_max = 0;
6929
6930 bool protocol_specified = false;
6931 bool tcp_specified = false;
6932 bool udp_specified = false;
6933
6934 const char *engine = "default_engine.so";
6935 const char *engine_config = NULL;
6936 char old_options[1024] = { [0] = '\0' };
6937 char *old_opts = old_options;
6938
6939 /* make the time we started always be 2 seconds before we really
6940 did, so time(0) - time.started is never zero. if so, things
6941 like 'settings.oldest_live' which act as booleans as well as
6942 values are now false in boolean context... */
6943 process_started = time(0) - 2;
6944 set_current_time();
6945
6946 /* Initialize the socket subsystem */
6947 initialize_sockets();
6948
6949 /* init settings */
6950 settings_init();
6951
6952 initialize_binary_lookup_map();
6953
6954 if (memcached_initialize_stderr_logger(get_server_api) != EXTENSION_SUCCESS) {
6955 fprintf(stderr, "Failed to initialize log system\n");
6956 return EX_OSERR;
6957 }
6958
6959 if (!sanitycheck()) {
6960 return EX_OSERR;
6961 }
6962
6963 /* process arguments */
6964 while (-1 != (c = getopt(argc, argv,
6965 "a:" /* access mask for unix socket */
6966 "p:" /* TCP port number to listen on */
6967 "s:" /* unix socket path to listen on */
6968 "U:" /* UDP port number to listen on */
6969 "m:" /* max memory to use for items in megabytes */
6970 "M" /* return error on memory exhausted */
6971 "c:" /* max simultaneous connections */
6972 "k" /* lock down all paged memory */
6973 "hi" /* help, licence info */
6974 "r" /* maximize core file limit */
6975 "v" /* verbose */
6976 "d" /* daemon mode */
6977 "l:" /* interface to listen on */
6978 "u:" /* user identity to run as */
6979 "P:" /* save PID in file */
6980 "f:" /* factor? */
6981 "n:" /* minimum space allocated for key+value+flags */
6982 "t:" /* threads */
6983 "D:" /* prefix delimiter? */
6984 "L" /* Large memory pages */
6985 "R:" /* max requests per event */
6986 "C" /* Disable use of CAS */
6987 "b:" /* backlog queue limit */
6988 "B:" /* Binding protocol */
6989 "I:" /* Max item size */
6990 "S" /* Sasl ON */
6991 "E:" /* Engine to load */
6992 "e:" /* Engine options */
6993 "q" /* Disallow detailed stats */
6994 "X:" /* Load extension */
6995 ))) {
6996 switch (c) {
6997 case 'a':
6998 /* access for unix domain socket, as octal mask (like chmod)*/
6999 settings.access= strtol(optarg,NULL,8);
7000 break;
7001
7002 case 'U':
7003 settings.udpport = atoi(optarg);
7004 udp_specified = true;
7005 break;
7006 case 'p':
7007 settings.port = atoi(optarg);
7008 tcp_specified = true;
7009 break;
7010 case 's':
7011 settings.socketpath = optarg;
7012 break;
7013 case 'm':
7014 settings.maxbytes = ((size_t)atoi(optarg)) * 1024 * 1024;
7015 old_opts += sprintf(old_opts, "cache_size=%lu;",
7016 (unsigned long)settings.maxbytes);
7017 break;
7018 case 'M':
7019 settings.evict_to_free = 0;
7020 old_opts += sprintf(old_opts, "eviction=false;");
7021 break;
7022 case 'c':
7023 settings.maxconns = atoi(optarg);
7024 break;
7025 case 'h':
7026 usage();
7027 exit(EXIT_SUCCESS);
7028 case 'i':
7029 usage_license();
7030 exit(EXIT_SUCCESS);
7031 case 'k':
7032 lock_memory = true;
7033 break;
7034 case 'v':
7035 settings.verbose++;
7036 perform_callbacks(ON_LOG_LEVEL, NULL, NULL);
7037 break;
7038 case 'l':
7039 if (settings.inter != NULL) {
7040 size_t len = strlen(settings.inter) + strlen(optarg) + 2;
7041 char *p = malloc(len);
7042 if (p == NULL) {
7043 settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
7044 "Failed to allocate memory\n");
7045 return 1;
7046 }
7047 snprintf(p, len, "%s,%s", settings.inter, optarg);
7048 free(settings.inter);
7049 settings.inter = p;
7050 } else {
7051 settings.inter= strdup(optarg);
7052 }
7053 break;
7054 case 'd':
7055 do_daemonize = true;
7056 break;
7057 case 'r':
7058 maxcore = 1;
7059 break;
7060 case 'R':
7061 settings.reqs_per_event = atoi(optarg);
7062 if (settings.reqs_per_event <= 0) {
7063 settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
7064 "Number of requests per event must be greater than 0\n");
7065 return 1;
7066 }
7067 break;
7068 case 'u':
7069 username = optarg;
7070 break;
7071 case 'P':
7072 pid_file = optarg;
7073 break;
7074 case 'f':
7075 settings.factor = atof(optarg);
7076 if (settings.factor <= 1.0) {
7077 settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
7078 "Factor must be greater than 1\n");
7079 return 1;
7080 }
7081 old_opts += sprintf(old_opts, "factor=%f;",
7082 settings.factor);
7083 break;
7084 case 'n':
7085 settings.chunk_size = atoi(optarg);
7086 if (settings.chunk_size == 0) {
7087 settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
7088 "Chunk size must be greater than 0\n");
7089 return 1;
7090 }
7091 old_opts += sprintf(old_opts, "chunk_size=%u;",
7092 settings.chunk_size);
7093 break;
7094 case 't':
7095 settings.num_threads = atoi(optarg);
7096 if (settings.num_threads <= 0) {
7097 settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
7098 "Number of threads must be greater than 0\n");
7099 return 1;
7100 }
7101 /* There're other problems when you get above 64 threads.
7102 * In the future we should portably detect # of cores for the
7103 * default.
7104 */
7105 if (settings.num_threads > 64) {
7106 settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
7107 "WARNING: Setting a high number of worker"
7108 "threads is not recommended.\n"
7109 " Set this value to the number of cores in"
7110 " your machine or less.\n");
7111 }
7112 break;
7113 case 'D':
7114 settings.prefix_delimiter = optarg[0];
7115 settings.detail_enabled = 1;
7116 break;
7117 case 'L' :
7118 if (enable_large_pages() == 0) {
7119 old_opts += sprintf(old_opts, "preallocate=true;");
7120 }
7121 break;
7122 case 'C' :
7123 settings.use_cas = false;
7124 break;
7125 case 'b' :
7126 settings.backlog = atoi(optarg);
7127 break;
7128 case 'B':
7129 protocol_specified = true;
7130 if (strcmp(optarg, "auto") == 0) {
7131 settings.binding_protocol = negotiating_prot;
7132 } else if (strcmp(optarg, "binary") == 0) {
7133 settings.binding_protocol = binary_prot;
7134 } else if (strcmp(optarg, "ascii") == 0) {
7135 settings.binding_protocol = ascii_prot;
7136 } else {
7137 settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
7138 "Invalid value for binding protocol: %s\n"
7139 " -- should be one of auto, binary, or ascii\n", optarg);
7140 exit(EX_USAGE);
7141 }
7142 break;
7143 case 'I':
7144 unit = optarg[strlen(optarg)-1];
7145 if (unit == 'k' || unit == 'm' ||
7146 unit == 'K' || unit == 'M') {
7147 optarg[strlen(optarg)-1] = '\0';
7148 size_max = atoi(optarg);
7149 if (unit == 'k' || unit == 'K')
7150 size_max *= 1024;
7151 if (unit == 'm' || unit == 'M')
7152 size_max *= 1024 * 1024;
7153 settings.item_size_max = size_max;
7154 } else {
7155 settings.item_size_max = atoi(optarg);
7156 }
7157 if (settings.item_size_max < 1024) {
7158 settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
7159 "Item max size cannot be less than 1024 bytes.\n");
7160 return 1;
7161 }
7162 if (settings.item_size_max > 1024 * 1024 * 128) {
7163 settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
7164 "Cannot set item size limit higher than 128 mb.\n");
7165 return 1;
7166 }
7167 if (settings.item_size_max > 1024 * 1024) {
7168 settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
7169 "WARNING: Setting item max size above 1MB is not"
7170 " recommended!\n"
7171 " Raising this limit increases the minimum memory requirements\n"
7172 " and will decrease your memory efficiency.\n"
7173 );
7174 }
7175 #ifndef __WIN32__
7176 old_opts += sprintf(old_opts, "item_size_max=%zu;",
7177 settings.item_size_max);
7178 #else
7179 old_opts += sprintf(old_opts, "item_size_max=%lu;", (long unsigned)
7180 settings.item_size_max);
7181 #endif
7182 break;
7183 case 'E':
7184 engine = optarg;
7185 break;
7186 case 'e':
7187 engine_config = optarg;
7188 break;
7189 case 'q':
7190 settings.allow_detailed = false;
7191 break;
7192 case 'S': /* set Sasl authentication to true. Default is false */
7193 #ifndef SASL_ENABLED
7194 settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
7195 "This server is not built with SASL support.\n");
7196 exit(EX_USAGE);
7197 #else
7198 settings.require_sasl = true;
7199 #endif
7200 break;
7201 case 'X' :
7202 {
7203 char *ptr = strchr(optarg, ',');
7204 if (ptr != NULL) {
7205 *ptr = '\0';
7206 ++ptr;
7207 }
7208 if (!load_extension(optarg, ptr)) {
7209 exit(EXIT_FAILURE);
7210 }
7211 if (ptr != NULL) {
7212 *(ptr - 1) = ',';
7213 }
7214 }
7215 break;
7216 default:
7217 settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
7218 "Illegal argument \"%c\"\n", c);
7219 return 1;
7220 }
7221 }
7222
7223 /*
7224 * Use one workerthread to serve each UDP port if the user specified
7225 * multiple ports
7226 */
7227 if (settings.inter != NULL && strchr(settings.inter, ',')) {
7228 settings.num_threads_per_udp = 1;
7229 } else {
7230 settings.num_threads_per_udp = settings.num_threads;
7231 }
7232
7233 if (getenv("MEMCACHED_REQS_TAP_EVENT") != NULL) {
7234 settings.reqs_per_tap_event = atoi(getenv("MEMCACHED_REQS_TAP_EVENT"));
7235 }
7236
7237 if (settings.reqs_per_tap_event <= 0) {
7238 settings.reqs_per_tap_event = DEFAULT_REQS_PER_TAP_EVENT;
7239 }
7240
7241
7242 if (install_sigterm_handler() != 0) {
7243 settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
7244 "Failed to install SIGTERM handler\n");
7245 exit(EXIT_FAILURE);
7246 }
7247
7248 char *topkeys_env = getenv("MEMCACHED_TOP_KEYS");
7249 if (topkeys_env != NULL) {
7250 settings.topkeys = atoi(topkeys_env);
7251 if (settings.topkeys < 0) {
7252 settings.topkeys = 0;
7253 }
7254 }
7255
7256 if (settings.require_sasl) {
7257 if (!protocol_specified) {
7258 settings.binding_protocol = binary_prot;
7259 } else {
7260 if (settings.binding_protocol == negotiating_prot) {
7261 settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
7262 "ERROR: You cannot use auto-negotiating protocol while requiring SASL.\n");
7263 exit(EX_USAGE);
7264 }
7265 if (settings.binding_protocol == ascii_prot) {
7266 settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
7267 "ERROR: You cannot use only ASCII protocol while requiring SASL.\n");
7268 exit(EX_USAGE);
7269 }
7270 }
7271 }
7272
7273 if (tcp_specified && !udp_specified) {
7274 settings.udpport = settings.port;
7275 } else if (udp_specified && !tcp_specified) {
7276 settings.port = settings.udpport;
7277 }
7278
7279 if (engine_config != NULL && strlen(old_options) > 0) {
7280 settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
7281 "ERROR: You can't mix -e with the old options\n");
7282 return EX_USAGE;
7283 } else if (engine_config == NULL && strlen(old_options) > 0) {
7284 engine_config = old_options;
7285 }
7286
7287 if (maxcore != 0) {
7288 struct rlimit rlim_new;
7289 /*
7290 * First try raising to infinity; if that fails, try bringing
7291 * the soft limit to the hard.
7292 */
7293 if (getrlimit(RLIMIT_CORE, &rlim) == 0) {
7294 rlim_new.rlim_cur = rlim_new.rlim_max = RLIM_INFINITY;
7295 if (setrlimit(RLIMIT_CORE, &rlim_new)!= 0) {
7296 /* failed. try raising just to the old max */
7297 rlim_new.rlim_cur = rlim_new.rlim_max = rlim.rlim_max;
7298 (void)setrlimit(RLIMIT_CORE, &rlim_new);
7299 }
7300 }
7301 /*
7302 * getrlimit again to see what we ended up with. Only fail if
7303 * the soft limit ends up 0, because then no core files will be
7304 * created at all.
7305 */
7306
7307 if ((getrlimit(RLIMIT_CORE, &rlim) != 0) || rlim.rlim_cur == 0) {
7308 settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
7309 "failed to ensure corefile creation\n");
7310 exit(EX_OSERR);
7311 }
7312 }
7313
7314 /*
7315 * If needed, increase rlimits to allow as many connections
7316 * as needed.
7317 */
7318
7319 if (getrlimit(RLIMIT_NOFILE, &rlim) != 0) {
7320 settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
7321 "failed to getrlimit number of files\n");
7322 exit(EX_OSERR);
7323 } else {
7324 unsigned int maxfiles = settings.maxconns + (3 * (settings.num_threads + 2));
7325 int syslimit = rlim.rlim_cur;
7326 if (rlim.rlim_cur < maxfiles) {
7327 rlim.rlim_cur = maxfiles;
7328 }
7329 if (rlim.rlim_max < rlim.rlim_cur) {
7330 rlim.rlim_max = rlim.rlim_cur;
7331 }
7332 if (setrlimit(RLIMIT_NOFILE, &rlim) != 0) {
7333 const char *fmt;
7334 fmt = "WARNING: maxconns cannot be set to (%d) connections due to "
7335 "system\nresouce restrictions. Increase the number of file "
7336 "descriptors allowed\nto the memcached user process or start "
7337 "memcached as root (remember\nto use the -u parameter).\n"
7338 "The maximum number of connections is set to %d.\n";
7339 int req = settings.maxconns;
7340 settings.maxconns = syslimit - (3 * (settings.num_threads + 2));
7341 if (settings.maxconns < 0) {
7342 settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
7343 "failed to set rlimit for open files. Try starting as"
7344 " root or requesting smaller maxconns value.\n");
7345 exit(EX_OSERR);
7346 }
7347 settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
7348 fmt, req, settings.maxconns);
7349 }
7350 }
7351
7352 /* Sanity check for the connection structures */
7353 int nfiles = 0;
7354 if (settings.port != 0) {
7355 nfiles += 2;
7356 }
7357 if (settings.udpport != 0) {
7358 nfiles += settings.num_threads * 2;
7359 }
7360
7361 if (settings.maxconns <= nfiles) {
7362 settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
7363 "Configuratioin error. \n"
7364 "You specified %d connections, but the system will use at "
7365 "least %d\nconnection structures to start.\n",
7366 settings.maxconns, nfiles);
7367 exit(EX_USAGE);
7368 }
7369
7370 /* lose root privileges if we have them */
7371 if (getuid() == 0 || geteuid() == 0) {
7372 if (username == 0 || *username == '\0') {
7373 settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
7374 "can't run as root without the -u switch\n");
7375 exit(EX_USAGE);
7376 }
7377 if ((pw = getpwnam(username)) == 0) {
7378 settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
7379 "can't find the user %s to switch to\n", username);
7380 exit(EX_NOUSER);
7381 }
7382 if (setgid(pw->pw_gid) < 0 || setuid(pw->pw_uid) < 0) {
7383 settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
7384 "failed to assume identity of user %s: %s\n", username,
7385 strerror(errno));
7386 exit(EX_OSERR);
7387 }
7388 }
7389
7390 #ifdef SASL_ENABLED
7391 init_sasl();
7392 #endif /* SASL */
7393
7394 /* daemonize if requested */
7395 /* if we want to ensure our ability to dump core, don't chdir to / */
7396 if (do_daemonize) {
7397 if (sigignore(SIGHUP) == -1) {
7398 settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
7399 "Failed to ignore SIGHUP: ", strerror(errno));
7400 }
7401 if (daemonize(maxcore, settings.verbose) == -1) {
7402 settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
7403 "failed to daemon() in order to daemonize\n");
7404 exit(EXIT_FAILURE);
7405 }
7406 }
7407
7408 /* lock paged memory if needed */
7409 if (lock_memory) {
7410 #ifdef HAVE_MLOCKALL
7411 int res = mlockall(MCL_CURRENT | MCL_FUTURE);
7412 if (res != 0) {
7413 settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
7414 "warning: -k invalid, mlockall() failed: %s\n",
7415 strerror(errno));
7416 }
7417 #else
7418 settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
7419 "warning: -k invalid, mlockall() not supported on this platform. proceeding without.\n");
7420 #endif
7421 }
7422
7423 /* initialize main thread libevent instance */
7424 main_base = event_init();
7425
7426 /* Load the storage engine */
7427 ENGINE_HANDLE *engine_handle = NULL;
7428 if (!load_engine(engine,get_server_api,settings.extensions.logger,&engine_handle)) {
7429 /* Error already reported */
7430 exit(EXIT_FAILURE);
7431 }
7432
7433 if (!init_engine(engine_handle,engine_config,settings.extensions.logger)) {
7434 return false;
7435 }
7436
7437 if (settings.verbose > 0) {
7438 log_engine_details(engine_handle,settings.extensions.logger);
7439 }
7440 settings.engine.v1 = (ENGINE_HANDLE_V1 *) engine_handle;
7441
7442 if (settings.engine.v1->arithmetic == NULL) {
7443 settings.engine.v1->arithmetic = internal_arithmetic;
7444 }
7445
7446 /* initialize other stuff */
7447 stats_init();
7448
7449 if (!(conn_cache = cache_create("conn", sizeof(conn), sizeof(void*),
7450 conn_constructor, conn_destructor))) {
7451 settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
7452 "Failed to create connection cache\n");
7453 exit(EXIT_FAILURE);
7454 }
7455
7456 default_independent_stats = new_independent_stats();
7457
7458 #ifndef __WIN32__
7459 /*
7460 * ignore SIGPIPE signals; we can use errno == EPIPE if we
7461 * need that information
7462 */
7463 if (sigignore(SIGPIPE) == -1) {
7464 settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
7465 "failed to ignore SIGPIPE; sigaction");
7466 exit(EX_OSERR);
7467 }
7468 #endif
7469
7470 /* start up worker threads if MT mode */
7471 thread_init(settings.num_threads, main_base, dispatch_event_handler);
7472
7473 /* initialise clock event */
7474 clock_handler(0, 0, 0);
7475
7476 /* create unix mode sockets after dropping privileges */
7477 if (settings.socketpath != NULL) {
7478 if (server_socket_unix(settings.socketpath,settings.access)) {
7479 settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
7480 "failed to listen on UNIX socket \"%s\": %s",
7481 settings.socketpath, strerror(errno));
7482 exit(EX_OSERR);
7483 }
7484 }
7485
7486 /* create the listening socket, bind it, and init */
7487 if (settings.socketpath == NULL) {
7488 int udp_port;
7489
7490 const char *portnumber_filename = getenv("MEMCACHED_PORT_FILENAME");
7491 char temp_portnumber_filename[PATH_MAX];
7492 FILE *portnumber_file = NULL;
7493
7494 if (portnumber_filename != NULL) {
7495 snprintf(temp_portnumber_filename,
7496 sizeof(temp_portnumber_filename),
7497 "%s.lck", portnumber_filename);
7498
7499 portnumber_file = fopen(temp_portnumber_filename, "a");
7500 if (portnumber_file == NULL) {
7501 settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
7502 "Failed to open \"%s\": %s\n",
7503 temp_portnumber_filename, strerror(errno));
7504 }
7505 }
7506
7507 if (settings.port && server_sockets(settings.port, tcp_transport,
7508 portnumber_file)) {
7509 settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
7510 "failed to listen on TCP port %d: %s",
7511 settings.port, strerror(errno));
7512 exit(EX_OSERR);
7513 }
7514
7515 /*
7516 * initialization order: first create the listening sockets
7517 * (may need root on low ports), then drop root if needed,
7518 * then daemonise if needed, then init libevent (in some cases
7519 * descriptors created by libevent wouldn't survive forking).
7520 */
7521 udp_port = settings.udpport ? settings.udpport : settings.port;
7522
7523 /* create the UDP listening socket and bind it */
7524 if (settings.udpport && server_sockets(udp_port, udp_transport,
7525 portnumber_file)) {
7526 settings.extensions.logger->log(EXTENSION_LOG_WARNING, NULL,
7527 "failed to listen on UDP port %d: %s",
7528 settings.port, strerror(errno));
7529 exit(EX_OSERR);
7530 }
7531
7532 if (portnumber_file) {
7533 fclose(portnumber_file);
7534 rename(temp_portnumber_filename, portnumber_filename);
7535 }
7536 }
7537
7538 if (pid_file != NULL) {
7539 save_pid(pid_file);
7540 }
7541
7542 /* Drop privileges no longer needed */
7543 drop_privileges();
7544
7545 if (!memcached_shutdown) {
7546 /* enter the event loop */
7547 event_base_loop(main_base, 0);
7548 }
7549
7550 if (settings.verbose) {
7551 settings.extensions.logger->log(EXTENSION_LOG_INFO, NULL,
7552 "Initiating shutdown\n");
7553 }
7554 threads_shutdown();
7555
7556 settings.engine.v1->destroy(settings.engine.v0, false);
7557
7558 /* remove the PID file if we're a daemon */
7559 if (do_daemonize)
7560 remove_pidfile(pid_file);
7561 /* Clean up strdup() call for bind() address */
7562 if (settings.inter)
7563 free(settings.inter);
7564
7565 return EXIT_SUCCESS;
7566 }
7567