1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 #ifndef MEMCACHED_H
3 #define MEMCACHED_H
4 
5 /** \file
6  * The main memcached header holding commonly used data
7  * structures and function prototypes.
8  */
9 #ifdef HAVE_LIBEVENT2
10 #include <event2/event.h>
11 #include <event2/event_struct.h>
12 #include <event2/event_compat.h>
13 #elif HAVE_LIBEVENT1
14 #include <event.h>
15 #else
16 #error "No libevent library found"
17 #endif /* HAVE_LIBEVENT2 */
18 
19 #include <pthread.h>
20 #include <config_static.h>
21 
22 #include <memcached/protocol_binary.h>
23 #include <memcached/engine.h>
24 #include <memcached/extension.h>
25 
26 #include "cache.h"
27 #include "topkeys.h"
28 
29 #include "sasl_defs.h"
30 
/** Maximum length of a key. */
#define KEY_MAX_LENGTH 250

/** Size of an incr buf. */
#define INCR_MAX_STORAGE_LEN 24

/* I/O buffer sizing. These are presumably byte counts (MAX_SENDBUF_SIZE is
 * spelled out as 256 MiB); the users live in the .c files -- confirm there. */
#define DATA_BUFFER_SIZE 2048
#define UDP_READ_BUFFER_SIZE 65536
#define UDP_MAX_PAYLOAD_SIZE 1400
#define UDP_HEADER_SIZE 8
#define MAX_SENDBUF_SIZE (256 * 1024 * 1024)
/* The longest decimal rendering of a 64-bit integer is 20 characters
 * (UINT64_MAX = 18446744073709551615, INT64_MIN = -9223372036854775808).
 * 20 + 1 separator space + 2 for \r\n + 1 for the terminating \0 = 24. */
#define SUFFIX_SIZE 24

/** Initial size of list of items being returned by "get". */
#define ITEM_LIST_INITIAL 200

/** Initial size of list of CAS suffixes appended to "gets" lines. */
#define SUFFIX_LIST_INITIAL 20

/** Initial size of the sendmsg() scatter/gather array. */
#define IOV_LIST_INITIAL 400

/** Initial number of sendmsg() argument structures to allocate. */
#define MSG_LIST_INITIAL 10

/** High water marks for buffer shrinking. Presumably a connection buffer
 *  grown beyond its mark is shrunk back toward the *_INITIAL size above --
 *  confirm in the connection code. */
#define READ_BUFFER_HIGHWAT 8192
#define ITEM_LIST_HIGHWAT 400
#define IOV_LIST_HIGHWAT 600
#define MSG_LIST_HIGHWAT 100

/* Binary protocol stuff.
 * BIN_PKT_HDR_WORDS is MIN_BIN_PKT_LENGTH expressed in 32-bit words; note
 * that, because of sizeof, the expression has type size_t. */
#define MIN_BIN_PKT_LENGTH 16
#define BIN_PKT_HDR_WORDS (MIN_BIN_PKT_LENGTH/sizeof(uint32_t))

/* Slab sizing definitions.
 * MAX_NUMBER_OF_SLAB_CLASSES sizes arrays indexed by slab class id
 * (see struct thread_stats::slab_stats), so valid indexes are
 * 0..POWER_LARGEST. DONT_PREALLOC_SLABS is a flag macro with no value;
 * presumably only whether it is defined matters (#ifdef). */
#define POWER_SMALLEST 1
#define POWER_LARGEST  200
#define CHUNK_ALIGN_BYTES 8
#define DONT_PREALLOC_SLABS
#define MAX_NUMBER_OF_SLAB_CLASSES (POWER_LARGEST + 1)
74 
75 
/** Capacity (including the terminating NUL) that APPEND_NUM_FMT_STAT
 *  assumes for the caller's key_str / val_str buffers. */
#define STAT_KEY_LEN 128
#define STAT_VAL_LEN 128

/* Presumably the defaults for settings.reqs_per_event and
 * settings.reqs_per_tap_event -- confirm at the option parsing site. */
#define DEFAULT_REQS_PER_EVENT     20
#define DEFAULT_REQS_PER_TAP_EVENT 50

/**
 * Append a simple stat with a stat name, value format and value.
 *
 * Expands to a single append_stat() call expression. The caller's scope
 * must provide `add_stats` and `c`. Use it like a function call, followed
 * by a semicolon. The macro deliberately carries no semicolon of its own,
 * so it is safe as the body of an unbraced if/else.
 */
#define APPEND_STAT(name, fmt, val) \
    append_stat(name, add_stats, c, fmt, val)

/**
 * Append an indexed stat. The key is formatted from name_fmt/num/name and
 * the value from fmt/val.
 *
 * Requires in the caller's scope:
 *   - `char key_str[STAT_KEY_LEN]` and `char val_str[STAT_VAL_LEN]`;
 *   - signed integer lvalues `klen` and `vlen`;
 *   - the `add_stats` callback and the cookie `c`.
 *
 * snprintf() returns the length the output *would* have had. After
 * truncation that value exceeds what is in the buffer. Both lengths are
 * therefore clamped to what was actually written, or to 0 on an encoding
 * error, before being passed to add_stats; otherwise add_stats would read
 * past the end of key_str/val_str.
 *
 * The do/while(0) wrapper makes the whole expansion a single statement,
 * so it behaves correctly as the body of an unbraced if/else.
 */
#define APPEND_NUM_FMT_STAT(name_fmt, num, name, fmt, val)              \
    do {                                                                \
        klen = snprintf(key_str, STAT_KEY_LEN, name_fmt, num, name);    \
        if (klen < 0) {                                                 \
            klen = 0;                                                   \
        } else if (klen >= STAT_KEY_LEN) {                              \
            klen = STAT_KEY_LEN - 1;                                    \
        }                                                               \
        vlen = snprintf(val_str, STAT_VAL_LEN, fmt, val);               \
        if (vlen < 0) {                                                 \
            vlen = 0;                                                   \
        } else if (vlen >= STAT_VAL_LEN) {                              \
            vlen = STAT_VAL_LEN - 1;                                    \
        }                                                               \
        add_stats(key_str, klen, val_str, vlen, c);                     \
    } while (0)

/** Common APPEND_NUM_FMT_STAT format: key is "<num>:<name>". */
#define APPEND_NUM_STAT(num, name, fmt, val) \
    APPEND_NUM_FMT_STAT("%d:%s", num, name, fmt, val)
96 
/**
 * Sub-state of a connection (stored in conn.substate). The "bin_" prefix
 * suggests these track binary-protocol request parsing -- confirm in the
 * state handlers. Values are written out explicitly; they are exactly the
 * implicit 0..11 numbering of the declaration order.
 */
enum bin_substates {
    bin_no_state                = 0,
    bin_reading_set_header      = 1,
    bin_reading_cas_header      = 2,
    bin_read_set_value          = 3,
    bin_reading_get_key         = 4,
    bin_reading_stat            = 5,
    bin_reading_del_header      = 6,
    bin_reading_incr_header     = 7,
    bin_read_flush_exptime      = 8,
    bin_reading_sasl_auth       = 9,
    bin_reading_sasl_auth_data  = 10,
    bin_reading_packet          = 11
};

/** Protocol spoken by a connection (conn.protocol / settings.binding_protocol). */
enum protocol {
    ascii_prot       = 3, /* arbitrary value. */
    binary_prot      = 4,
    negotiating_prot = 5  /* Discovering the protocol */
};
117 
/** Transport used by a connection (see conn.transport and conn_new()). */
enum network_transport {
    local_transport, /* Unix domain sockets */
    tcp_transport,
    udp_transport
};

/**
 * Nonzero when the transport value x is udp_transport.
 * The argument is parenthesized so that compound arguments such as
 * IS_UDP(a ? b : c) or IS_UDP(t & mask) compare the whole expression
 * rather than binding "==" to their last operand.
 */
#define IS_UDP(x) ((x) == udp_transport)
125 
/**
 * Stats stored per slab (and per thread).
 *
 * struct thread_stats embeds one of these per slab class (slab_stats[]
 * below). slab_stats_aggregate(), declared further down in this header,
 * folds them into a single struct slab_stats.
 */
struct slab_stats {
    uint64_t  cmd_set;
    uint64_t  get_hits;
    uint64_t  delete_hits;
    uint64_t  cas_hits;
    uint64_t  cas_badval;
};

/**
 * Stats stored per-thread.
 *
 * `mutex` guards the counters. NOTE(review): the exact locking discipline
 * lives in the .c files (threadlocal_stats_* helpers) -- confirm there.
 * slab_stats[] is indexed by slab class id (0..POWER_LARGEST).
 */
struct thread_stats {
    pthread_mutex_t   mutex;
    uint64_t          cmd_get;
    uint64_t          get_misses;
    uint64_t          delete_misses;
    uint64_t          incr_misses;
    uint64_t          decr_misses;
    uint64_t          incr_hits;
    uint64_t          decr_hits;
    uint64_t          cas_misses;
    uint64_t          bytes_read;
    uint64_t          bytes_written;
    uint64_t          cmd_flush;
    uint64_t          conn_yields; /* # of yields for connections (-R option) */
    uint64_t          auth_cmds;
    uint64_t          auth_errors;
    struct slab_stats slab_stats[MAX_NUMBER_OF_SLAB_CLASSES];
};
156 
157 
/**
 * Per-engine statistics: a top-keys tracker plus per-thread stats.
 *
 * thread_stats[] is a flexible array member, so the allocation must
 * reserve room for the wanted number of entries (presumably one per
 * worker thread -- confirm at the allocation site).
 */
struct independent_stats {
    topkeys_t *topkeys;
    struct thread_stats thread_stats[];
};

/**
 * Global (process-wide) stats; the instance is the `extern struct stats
 * stats` declared below. `mutex` presumably backs STATS_LOCK() /
 * STATS_UNLOCK() -- confirm in their implementation.
 */
struct stats {
    pthread_mutex_t mutex;
    unsigned int  daemon_conns; /* conns used by the server */
    unsigned int  curr_conns;
    unsigned int  total_conns;
    unsigned int  conn_structs;
    time_t        started;          /* when the process was started */
    uint64_t      rejected_conns; /* number of times a client connection was rejected */
};
178 
/* Presumably the highest accepted value of settings.verbose -- confirm. */
#define MAX_VERBOSITY_LEVEL 2

/* When adding a setting, be sure to update process_stat_settings */
/**
 * Globally accessible settings as derived from the command line.
 * The single instance is the `extern struct settings settings` below.
 */
struct settings {
    size_t maxbytes;
    int maxconns;
    int port;
    int udpport;
    char *inter;            /* presumably the interface/address to bind -- confirm */
    int verbose;
    rel_time_t oldest_live; /* ignore existing items older than this */
    int evict_to_free;
    char *socketpath;   /* path to unix socket if using local socket */
    int access;  /* access mask (a la chmod) for unix domain socket */
    double factor;          /* chunk size growth factor */
    int chunk_size;
    int num_threads;        /* number of worker (without dispatcher) libevent threads to run */
    int num_threads_per_udp; /* number of worker threads serving each udp socket */
    char prefix_delimiter;  /* character that marks a key prefix (for stats) */
    int detail_enabled;     /* nonzero if we're collecting detailed stats */
    bool allow_detailed;    /* detailed stats commands are allowed */
    int reqs_per_event;     /* Maximum number of io to process on each
                               io-event. */
    int reqs_per_tap_event; /* Maximum number of tap io to process on each
                               io-event. */
    bool use_cas;
    enum protocol binding_protocol;
    int backlog;
    size_t item_size_max;   /* Maximum item size, and upper end for slabs */
    bool sasl;              /* SASL on/off */
    bool require_sasl;      /* require SASL auth */
    int topkeys;            /* Number of top keys to track */
    /* The loaded engine. Both members alias the same pointer, viewed
     * through different interface versions of the handle. */
    union {
        ENGINE_HANDLE *v0;
        ENGINE_HANDLE_V1 *v1;
    } engine;
    /* Registered extensions (daemons, logger, ASCII protocol handlers). */
    struct {
        EXTENSION_DAEMON_DESCRIPTOR *daemons;
        EXTENSION_LOGGER_DESCRIPTOR *logger;
        EXTENSION_ASCII_PROTOCOL_DESCRIPTOR *ascii;
    } extensions;
};
224 
/**
 * Node in a singly linked list of registered engine event callbacks:
 * cb is invoked with its opaque cb_data; next links to the following node.
 */
struct engine_event_handler {
    EVENT_CALLBACK cb;
    const void *cb_data;
    struct engine_event_handler *next;
};

/* Process-wide instances; defined in a .c file. */
extern struct stats stats;
extern struct settings settings;
233 
/** Kind of work a LIBEVENT_THREAD handles (LIBEVENT_THREAD.type). */
enum thread_type {
    GENERAL = 11,
    TAP = 13,
    DISPATCHER = 15
};

/**
 * Per-thread state for a thread running its own libevent loop.
 */
typedef struct {
    pthread_t thread_id;        /* unique ID of this thread */
    struct event_base *base;    /* libevent handle this thread uses */
    struct event notify_event;  /* listen event for notify pipe */
    SOCKET notify[2];           /* notification pipe endpoints */
    struct conn_queue *new_conn_queue; /* queue of new connections to handle */
    cache_t *suffix_cache;      /* suffix cache */
    pthread_mutex_t mutex;      /* Mutex protecting access to pending_io (and
                                   presumably pending_close -- confirm) */
    /* Set by LOCK_THREAD and cleared by UNLOCK_THREAD, so their assertions
     * catch recursive locking and unlocking a thread that is not locked. */
    bool is_locked;
    struct conn *pending_io;    /* List of connections with pending async io ops */
    int index;                  /* index of this thread in the threads array */
    enum thread_type type;      /* Type of IO this thread processes */

    rel_time_t last_checked;
    struct conn *pending_close; /* list of connections to close at a later time */
} LIBEVENT_THREAD;
256 
/**
 * Lock t->mutex, aborting if pthread_mutex_lock() fails, then mark the
 * thread as locked. The assertion catches recursive locking.
 *
 * t must point to a struct with `mutex` and `is_locked` members (normally
 * LIBEVENT_THREAD *) and is evaluated more than once.
 *
 * The do/while(0) wrapper makes the macro a single statement, so it works
 * in an unbraced if/else and takes the caller's semicolon. t is
 * parenthesized so that arguments like LOCK_THREAD(&thr) expand correctly.
 * Callers must include <assert.h> and <stdlib.h> (assert, abort).
 */
#define LOCK_THREAD(t)                                  \
    do {                                                \
        if (pthread_mutex_lock(&(t)->mutex) != 0) {     \
            abort();                                    \
        }                                               \
        assert((t)->is_locked == false);                \
        (t)->is_locked = true;                          \
    } while (0)

/**
 * Counterpart of LOCK_THREAD. Asserts the thread is marked locked, clears
 * the mark, then unlocks t->mutex, aborting if pthread_mutex_unlock()
 * fails. The mark is cleared before unlocking so that it is only ever
 * modified while the mutex is held. Same argument and wrapping rules as
 * LOCK_THREAD.
 */
#define UNLOCK_THREAD(t)                                \
    do {                                                \
        assert((t)->is_locked == true);                 \
        (t)->is_locked = false;                         \
        if (pthread_mutex_unlock(&(t)->mutex) != 0) {   \
            abort();                                    \
        }                                               \
    } while (0)
270 
/* Cross-thread notification helpers; implemented in the threading code. */
extern void notify_thread(LIBEVENT_THREAD *thread);
extern void notify_dispatcher(void);
extern bool create_notification_pipe(LIBEVENT_THREAD *me);

/* The thread whose type is TAP (presumably) -- confirm where it is assigned. */
extern LIBEVENT_THREAD* tap_thread;

typedef struct conn conn;
/**
 * Connection state handler. struct conn keeps the current handler in
 * `state` (and the post-write one in `write_and_go`). NOTE(review): the
 * meaning of the bool result (presumably "keep running the state machine")
 * is defined by the driver loop, which is not visible here.
 */
typedef bool (*STATE_FUNC)(conn *);
279 
/**
 * The structure representing a connection into memcached.
 */
struct conn {
    SOCKET sfd;
    int nevents;
    sasl_conn_t *sasl_conn;
    STATE_FUNC   state;          /**< current state handler */
    enum bin_substates substate;
#ifdef DEBUG
    bool   registered_in_libevent;
#endif
    struct event event;
    short  ev_flags;
    short  which;   /**< which events were just triggered */

    char   *rbuf;   /**< buffer to read commands into */
    char   *rcurr;  /**< position in rbuf where parsing stopped so far */
    uint32_t rsize;   /**< total allocated size of rbuf */
    uint32_t rbytes;  /**< amount of unparsed data, starting at rcurr */

    char   *wbuf;
    char   *wcurr;
    uint32_t wsize;
    uint32_t wbytes;
    /** which state to go into after finishing current write */
    STATE_FUNC   write_and_go;
    void   *write_and_free; /**< free this memory after finishing writing */

    char   *ritem;  /**< when we read in an item's value, it goes here */
    uint32_t rlbytes;

    /* data for the nread state */

    /**
     * item is used to hold an item structure created after reading the command
     * line of set/add/replace commands, but before we finished reading the actual
     * data. The data is read into ITEM_data(item) to avoid extra copying.
     */

    void   *item;     /* for commands set/add/replace  */
    ENGINE_STORE_OPERATION    store_op; /* which one is it: set/add/replace */


    /* data for the swallow state */
    int    sbytes;    /* how many bytes to swallow */

    /* data for the mwrite state */
    struct iovec *iov;
    int    iovsize;   /* number of elements allocated in iov[] */
    int    iovused;   /* number of elements used in iov[] */

    struct msghdr *msglist;
    int    msgsize;   /* number of elements allocated in msglist[] */
    int    msgused;   /* number of elements used in msglist[] */
    int    msgcurr;   /* element in msglist[] being transmitted now */
    int    msgbytes;  /* number of bytes in current msg */

    /* Items queued for output; isize/icurr/ileft presumably mirror the
     * size/cursor/remaining pattern of iov and msglist -- confirm. */
    item   **ilist;   /* list of items to write out */
    int    isize;
    item   **icurr;
    int    ileft;

    /* CAS suffix strings queued for output; same layout as ilist above. */
    char   **suffixlist;
    int    suffixsize;
    char   **suffixcurr;
    int    suffixleft;

    enum protocol protocol;   /* which protocol this connection speaks */
    enum network_transport transport; /* what transport is used by this connection */

    /* data for UDP clients */
    int    request_id; /* Incoming UDP request ID, if this is a UDP "connection" */
    struct sockaddr_storage request_addr; /* Who sent the most recent request */
    socklen_t request_addr_size;
    unsigned char *hdrbuf; /* udp packet headers */
    int    hdrsize;   /* number of headers' worth of space is allocated */

    bool   noreply;   /* True if the reply should not be sent. */

    uint8_t refcount; /* number of references to the object (8-bit: at most 255) */

    struct {
        char *buffer;
        size_t size;
        size_t offset;
    } dynamic_buffer;

    void *engine_storage;

    /** Current ascii protocol */
    EXTENSION_ASCII_PROTOCOL_DESCRIPTOR *ascii_cmd;


    /* Binary protocol stuff */
    /* This is where the binary header goes */
    protocol_binary_request_header binary_header;
    uint64_t cas; /* the cas to return */
    short cmd; /* current command being processed */
    int opaque;
    int keylen;

    int list_state; /* bitmask of LIST_STATE_* flags for this connection */
    conn   *next;     /* Used for generating a list of conn structures */
    LIBEVENT_THREAD *thread; /* Pointer to the thread object serving this connection */

    ENGINE_ERROR_CODE aiostat;
    bool ewouldblock;
    bool tap_nack_mode;
    TAP_ITERATOR tap_iterator;
};
392 
/* Bit flags for conn.list_state (distinct powers of two, so they can be
 * OR-ed together). The names suggest membership in a thread's pending_io /
 * pending_close lists -- confirm in the list helpers. */
#define LIST_STATE_PROCESSING 1
#define LIST_STATE_REQ_PENDING_IO 2
#define LIST_STATE_REQ_PENDING_CLOSE 4

/*
 * Functions
 */
/* Create a connection for sfd that starts in init_state and is attached to
 * the given libevent base. NOTE(review): ownership/NULL-on-failure
 * semantics are defined in the implementation -- confirm there. */
conn *conn_new(const SOCKET sfd, STATE_FUNC init_state, const int event_flags,
               const int read_buffer_size, enum network_transport transport,
               struct event_base *base, struct timeval *timeout);
#ifndef WIN32
extern int daemonize(int nochdir, int noclose);
#endif
407 
408 #include "stats.h"
409 #include "trace.h"
410 #include "hash.h"
411 #include <memcached/util.h>
412 
/*
 * Functions to add / update the connection's registration with libevent
 */
bool register_event(conn *c, struct timeval *timeout);
bool unregister_event(conn *c);
bool update_event(conn *c, const int new_flags);

/*
 * Functions such as the libevent-related calls that need to do cross-thread
 * communication in multithreaded mode (rather than actually doing the work
 * in the current thread) are called via "dispatch_" frontends.
 * NOTE(review): an older version of this comment said these are also
 * #define-d to call the underlying code directly in singlethreaded mode;
 * no such #defines exist in this header -- confirm before relying on it.
 */

void thread_init(int nthreads, struct event_base *main_base,
                 void (*dispatcher_callback)(int, short, void *));
void threads_shutdown(void);

int  dispatch_event_add(int thread, conn *c);
void dispatch_conn_new(SOCKET sfd, STATE_FUNC init_state, int event_flags,
                       int read_buffer_size, enum network_transport transport);

/* Lock wrappers for cache functions that are called from main loop.
 * NOTE(review): inherited wording; the locking is done in the
 * implementations, not visible here. */
void accept_new_conns(const bool do_accept);
conn *conn_from_freelist(void);
bool  conn_add_to_freelist(conn *c);
int   is_listen_thread(void);

/* Global stats lock and per-thread stats helpers. */
void STATS_LOCK(void);
void STATS_UNLOCK(void);
void threadlocal_stats_clear(struct thread_stats *stats);
void threadlocal_stats_reset(struct thread_stats *thread_stats);
void threadlocal_stats_aggregate(struct thread_stats *thread_stats, struct thread_stats *stats);
void slab_stats_aggregate(struct thread_stats *stats, struct slab_stats *out);
447 
/* Stat processing functions.
 * append_stat: fmt and the variadic arguments are printf-style (APPEND_STAT
 * forwards format strings such as "%u"). */
void append_stat(const char *name, ADD_STAT add_stats, conn *c,
                 const char *fmt, ...);

void notify_io_complete(const void *cookie, ENGINE_ERROR_CODE status);
void conn_set_state(conn *c, STATE_FUNC state);
const char *state_text(STATE_FUNC state);
void safe_close(SOCKET sfd);


/* Helpers for the singly linked conn lists (linked through conn.next). */
/* Number of times this connection is in the given pending list */
int number_of_pending(conn *c, conn *pending);
bool has_cycle(conn *c);
bool list_contains(conn *h, conn *n);
conn *list_remove(conn *h, conn *n);
size_t list_to_array(conn **dest, size_t max_items, conn **l);
void enlist_conn(conn *c, conn **list);
void finalize_list(conn **list, size_t items);
bool set_socket_nonblocking(SOCKET sfd);

void conn_close(conn *c);
469 
470 
#if HAVE_DROP_PRIVILEGES
/* Platform hook to drop process privileges; presumably implemented in a
 * platform-specific source file when HAVE_DROP_PRIVILEGES is set. */
extern void drop_privileges(void);
#else
/* No-op stand-in with the same call syntax as the real void function. It
 * expands to a void expression rather than to nothing, so
 * `drop_privileges();` is a real statement (no empty-body warnings), and it
 * also works where the real call would, e.g. `(drop_privileges(), 0)`. */
#define drop_privileges() ((void)0)
#endif
476 
/* Connection state machine.
 * Each handler matches STATE_FUNC, so it can be stored in conn.state or
 * conn.write_and_go and passed to conn_set_state(). */
bool conn_listening(conn *c);
bool conn_new_cmd(conn *c);
bool conn_waiting(conn *c);
bool conn_read(conn *c);
bool conn_parse_cmd(conn *c);
bool conn_write(conn *c);
bool conn_nread(conn *c);
bool conn_swallow(conn *c);
bool conn_pending_close(conn *c);
bool conn_immediate_close(conn *c);
bool conn_closing(conn *c);
bool conn_mwrite(conn *c);
bool conn_ship_log(conn *c);
bool conn_add_tap_client(conn *c);
bool conn_setup_tap_stream(conn *c);
493 
/* If supported, give compiler hints for branch prediction. Compilers
 * without __builtin_expect get a pass-through fallback, so likely(),
 * unlikely() and any direct __builtin_expect users still compile. */
#if !defined(__GNUC__) || (__GNUC__ == 2 && __GNUC_MINOR__ < 96)
#define __builtin_expect(x, expected_value) (x)
#endif

/* Use these in conditions: they evaluate to 0 or 1. The !!(x) normalizes
 * non-boolean conditions (pointers, bit masks). Without it,
 * __builtin_expect(flags & 4, 1) would predict the exact value 1 rather
 * than "true", and pointer arguments would need a conversion to long. */
#define likely(x)       __builtin_expect(!!(x), 1)
#define unlikely(x)     __builtin_expect(!!(x), 0)
501 #endif
502