1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 
3 /** \file
4  * The main memcached header holding commonly used data
5  * structures and function prototypes.
6  */
7 
8 #ifdef HAVE_CONFIG_H
9 #include "config.h"
10 #endif
11 
12 #include <sys/types.h>
13 #include <sys/socket.h>
14 #include <sys/time.h>
15 #include <netinet/in.h>
16 #include <event.h>
17 #include <netdb.h>
18 #include <pthread.h>
19 #include <unistd.h>
20 #include <assert.h>
21 #include <grp.h>
22 #include <signal.h>
23 /* need this to get IOV_MAX on some platforms. */
24 #ifndef __need_IOV_MAX
25 #define __need_IOV_MAX
26 #endif
27 #include <limits.h>
28 /* FreeBSD 4.x doesn't have IOV_MAX exposed. */
29 #ifndef IOV_MAX
30 #if defined(__FreeBSD__) || defined(__APPLE__) || defined(__GNU__)
31 # define IOV_MAX 1024
/* GNU/Hurd doesn't set MAXPATHLEN
 * http://www.gnu.org/software/hurd/hurd/porting/guidelines.html#PATH_MAX_tt_MAX_PATH_tt_MAXPATHLEN */
34 #ifndef MAXPATHLEN
35 #define MAXPATHLEN 4096
36 #endif
37 #endif
38 #endif
39 
40 #include "itoa_ljust.h"
41 #include "protocol_binary.h"
42 #include "cache.h"
43 #include "logger.h"
44 
45 #ifdef EXTSTORE
46 #include "crc32c.h"
47 #endif
48 
49 #include "sasl_defs.h"
50 #ifdef TLS
51 #include <openssl/ssl.h>
52 #endif
53 
54 /* for NAPI pinning feature */
55 #ifndef SO_INCOMING_NAPI_ID
56 #define SO_INCOMING_NAPI_ID 56
57 #endif
58 
59 /** Maximum length of a key. */
60 #define KEY_MAX_LENGTH 250
61 
62 /** Maximum length of a uri encoded key. */
63 #define KEY_MAX_URI_ENCODED_LENGTH (KEY_MAX_LENGTH  * 3 + 1)
64 
65 /** Size of an incr buf. */
66 #define INCR_MAX_STORAGE_LEN 24
67 
68 #define WRITE_BUFFER_SIZE 1024
69 #define READ_BUFFER_SIZE 16384
70 #define READ_BUFFER_CACHED 0
71 #define UDP_READ_BUFFER_SIZE 65536
72 #define UDP_MAX_PAYLOAD_SIZE 1400
73 #define UDP_HEADER_SIZE 8
74 #define UDP_DATA_SIZE 1392 // UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE
75 #define MAX_SENDBUF_SIZE (256 * 1024 * 1024)
76 
77 /* Binary protocol stuff */
78 #define BIN_MAX_EXTLEN 20 // length of the _incr command is currently the longest.
79 
80 /* Initial power multiplier for the hash table */
81 #define HASHPOWER_DEFAULT 16
82 #define HASHPOWER_MAX 32
83 
84 /*
85  * We only reposition items in the LRU queue if they haven't been repositioned
86  * in this many seconds. That saves us from churning on frequently-accessed
87  * items.
88  */
89 #define ITEM_UPDATE_INTERVAL 60
90 
91 /*
92  * Valid range of the maximum size of an item, in bytes.
93  */
94 #define ITEM_SIZE_MAX_LOWER_LIMIT 1024
95 #define ITEM_SIZE_MAX_UPPER_LIMIT 1024 * 1024 * 1024
96 
97 
98 /* unistd.h is here */
99 #if HAVE_UNISTD_H
100 # include <unistd.h>
101 #endif
102 
103 /* Slab sizing definitions. */
104 #define POWER_SMALLEST 1
105 #define POWER_LARGEST  256 /* actual cap is 255 */
106 #define SLAB_GLOBAL_PAGE_POOL 0 /* magic slab class for storing pages for reassignment */
107 #define CHUNK_ALIGN_BYTES 8
108 /* slab class max is a 6-bit number, -1. */
109 #define MAX_NUMBER_OF_SLAB_CLASSES (63 + 1)
110 
111 /** How long an object can reasonably be assumed to be locked before
112     harvesting it on a low memory condition. Default: disabled. */
113 #define TAIL_REPAIR_TIME_DEFAULT 0
114 
/* warning: don't use these macros with a function, as it evals its arg twice */

/* Read the item's CAS value (stored at the head of item->data when
 * ITEM_CAS is set); 0 when the item carries no CAS. */
#define ITEM_get_cas(i) (((i)->it_flags & ITEM_CAS) ? \
        (i)->data->cas : (uint64_t)0)

/* Set the CAS value; silently a no-op when ITEM_CAS is not set. */
#define ITEM_set_cas(i,v) { \
    if ((i)->it_flags & ITEM_CAS) { \
        (i)->data->cas = v; \
    } \
}

/* Start of the key: past the optional 8-byte CAS field. */
#define ITEM_key(item) (((char*)&((item)->data)) \
         + (((item)->it_flags & ITEM_CAS) ? sizeof(uint64_t) : 0))

/* Start of the suffix: past optional CAS + key + its null terminator. */
#define ITEM_suffix(item) ((char*) &((item)->data) + (item)->nkey + 1 \
         + (((item)->it_flags & ITEM_CAS) ? sizeof(uint64_t) : 0))

/* Start of the value: past optional CAS + key + null terminator +
 * optional 4-byte client flags. */
#define ITEM_data(item) ((char*) &((item)->data) + (item)->nkey + 1 \
         + (((item)->it_flags & ITEM_CFLAGS) ? sizeof(uint32_t) : 0) \
         + (((item)->it_flags & ITEM_CAS) ? sizeof(uint64_t) : 0))

/* Total bytes the item occupies: header + key + optional fields + data. */
#define ITEM_ntotal(item) (sizeof(struct _stritem) + (item)->nkey + 1 \
         + (item)->nbytes \
         + (((item)->it_flags & ITEM_CFLAGS) ? sizeof(uint32_t) : 0) \
         + (((item)->it_flags & ITEM_CAS) ? sizeof(uint64_t) : 0))

/* slabs_clsid packs two fields: low 6 bits = slab class, high 2 bits = LRU id. */
#define ITEM_clsid(item) ((item)->slabs_clsid & ~(3<<6))
#define ITEM_lruid(item) ((item)->slabs_clsid & (3<<6))
142 
/* Buffer sizes for formatted stat keys/values used by the macros below. */
#define STAT_KEY_LEN 128
#define STAT_VAL_LEN 128

/** Append a simple stat with a stat name, value format and value */
#define APPEND_STAT(name, fmt, val) \
    append_stat(name, add_stats, c, fmt, val);

/** Append an indexed stat with a stat name (with format), value format
    and value */
/* NOTE: expects key_str, val_str, klen, vlen, add_stats and c to exist
 * in the calling scope. */
#define APPEND_NUM_FMT_STAT(name_fmt, num, name, fmt, val)          \
    klen = snprintf(key_str, STAT_KEY_LEN, name_fmt, num, name);    \
    vlen = snprintf(val_str, STAT_VAL_LEN, fmt, val);               \
    add_stats(key_str, klen, val_str, vlen, c);

/** Common APPEND_NUM_FMT_STAT format. */
#define APPEND_NUM_STAT(num, name, fmt, val) \
    APPEND_NUM_FMT_STAT("%d:%s", num, name, fmt, val)

/** Item client flag conversion */
/* Reads the 4-byte client flags out of the item suffix when present,
 * otherwise yields 0. */
#define FLAGS_CONV(it, flag) { \
    if ((it)->it_flags & ITEM_CFLAGS) { \
        flag = *((uint32_t *)ITEM_suffix((it))); \
    } else { \
        flag = 0; \
    } \
}

/* Bytes reserved in the item for client flags, if any. */
#define FLAGS_SIZE(item) (((item)->it_flags & ITEM_CFLAGS) ? sizeof(uint32_t) : 0)
171 
/**
 * Callback for any function producing stats.
 *
 * @param key the stat's key
 * @param klen length of the key
 * @param val the stat's value in an ascii form (e.g. text form of a number)
 * @param vlen length of the value
 * @param cookie magic callback cookie
 */
typedef void (*ADD_STAT)(const char *key, const uint16_t klen,
                         const char *val, const uint32_t vlen,
                         const void *cookie);
184 
/*
 * NOTE: If you modify this table you _MUST_ update the function state_text
 */
/**
 * Possible states of a connection.
 */
enum conn_states {
    conn_listening,  /**< the socket which listens for connections */
    conn_new_cmd,    /**< Prepare connection for next command */
    conn_waiting,    /**< waiting for a readable socket */
    conn_read,       /**< reading in a command line */
    conn_parse_cmd,  /**< try to parse a command from the input buffer */
    conn_write,      /**< writing out a simple response */
    conn_nread,      /**< reading in a fixed number of bytes */
    conn_swallow,    /**< swallowing unnecessary bytes w/o storing */
    conn_closing,    /**< closing this connection */
    conn_mwrite,     /**< writing out many items sequentially */
    conn_closed,     /**< connection is closed */
    conn_watch,      /**< held by the logger thread as a watcher */
    conn_io_queue,   /**< wait on async. process to get response object */
    conn_max_state   /**< Max state value (used for assertion); must stay last */
};
207 
/**
 * Sub-states used while processing binary-protocol commands
 * (tracks which part of a request is being read next).
 */
enum bin_substates {
    bin_no_state,
    bin_reading_set_header,
    bin_reading_cas_header,
    bin_read_set_value,
    bin_reading_get_key,
    bin_reading_stat,
    bin_reading_del_header,
    bin_reading_incr_header,
    bin_read_flush_exptime,
    bin_reading_sasl_auth,
    bin_reading_sasl_auth_data,
    bin_reading_touch_key,
};
222 
/** Wire protocol spoken on a connection. */
enum protocol {
    ascii_prot = 3, /* arbitrary value. */
    binary_prot,
    negotiating_prot /* Discovering the protocol */
};
228 
/** Underlying transport of a listening or connected socket. */
enum network_transport {
    local_transport, /* Unix sockets*/
    tcp_transport,
    udp_transport
};
234 
/** Selector for pausing/resuming groups of threads. */
enum pause_thread_types {
    PAUSE_WORKER_THREADS = 0,
    PAUSE_ALL_THREADS,
    RESUME_ALL_THREADS,
    RESUME_WORKER_THREADS
};
241 
/** Why (or whether) the server main loop is stopping. */
enum stop_reasons {
    NOT_STOP,
    GRACE_STOP,
    EXIT_NORMALLY
};
247 
/** Why a connection transitioned into conn_closing (see conn.close_reason). */
enum close_reasons {
    ERROR_CLOSE,
    NORMAL_CLOSE,
    IDLE_TIMEOUT_CLOSE,
    SHUTDOWN_CLOSE,
};
254 
255 #define IS_TCP(x) (x == tcp_transport)
256 #define IS_UDP(x) (x == udp_transport)
257 
258 #define NREAD_ADD 1
259 #define NREAD_SET 2
260 #define NREAD_REPLACE 3
261 #define NREAD_APPEND 4
262 #define NREAD_PREPEND 5
263 #define NREAD_CAS 6
264 
/** Result of a storage operation (see the NREAD_* command codes above). */
enum store_item_type {
    NOT_STORED=0, STORED, EXISTS, NOT_FOUND, TOO_LARGE, NO_MEMORY
};
268 
/** Result of an incr/decr (arithmetic delta) operation. */
enum delta_result_type {
    OK, NON_NUMERIC, EOM, DELTA_ITEM_NOT_FOUND, DELTA_ITEM_CAS_MISMATCH
};
272 
273 /** Time relative to server start. Smaller than time_t on 64-bit systems. */
274 // TODO: Move to sub-header. needed in logger.h
275 //typedef unsigned int rel_time_t;
276 
277 /** Use X macros to avoid iterating over the stats fields during reset and
278  * aggregation. No longer have to add new stats in 3+ places.
279  */
280 
/* X-macro list of per-slab-class counters; expanded with different X()
 * definitions for declaration, reset and aggregation. */
#define SLAB_STATS_FIELDS \
    X(set_cmds) \
    X(get_hits) \
    X(touch_hits) \
    X(delete_hits) \
    X(cas_hits) \
    X(cas_badval) \
    X(incr_hits) \
    X(decr_hits)

/** Stats stored per slab (and per thread). */
struct slab_stats {
#define X(name) uint64_t    name;
    SLAB_STATS_FIELDS
#undef X
};
297 
/* X-macro list of per-thread counters; see struct thread_stats below. */
#define THREAD_STATS_FIELDS \
    X(get_cmds) \
    X(get_misses) \
    X(get_expired) \
    X(get_flushed) \
    X(touch_cmds) \
    X(touch_misses) \
    X(delete_misses) \
    X(incr_misses) \
    X(decr_misses) \
    X(cas_misses) \
    X(meta_cmds) \
    X(bytes_read) \
    X(bytes_written) \
    X(flush_cmds) \
    X(conn_yields) /* # of yields for connections (-R option)*/ \
    X(auth_cmds) \
    X(auth_errors) \
    X(idle_kicks) /* idle connections killed */ \
    X(response_obj_oom) \
    X(response_obj_count) \
    X(response_obj_bytes) \
    X(read_buf_oom)

#ifdef EXTSTORE
/* Extra per-thread counters compiled in with external storage support. */
#define EXTSTORE_THREAD_STATS_FIELDS \
    X(get_extstore) \
    X(get_aborted_extstore) \
    X(get_oom_extstore) \
    X(recache_from_extstore) \
    X(miss_from_extstore) \
    X(badcrc_from_extstore)
#endif

/**
 * Stats stored per-thread.
 */
struct thread_stats {
    pthread_mutex_t   mutex;  /* guards the counters in this structure */
#define X(name) uint64_t    name;
    THREAD_STATS_FIELDS
#ifdef EXTSTORE
    EXTSTORE_THREAD_STATS_FIELDS
#endif
#undef X
    struct slab_stats slab_stats[MAX_NUMBER_OF_SLAB_CLASSES];
    uint64_t lru_hits[POWER_LARGEST]; /* hit counters broken out per LRU id */
    uint64_t read_buf_count;          /* read-buffer accounting (see rbuf_cache) */
    uint64_t read_buf_bytes;
    uint64_t read_buf_bytes_free;
};
349 
/**
 * Global stats. Only resettable stats should go into this structure.
 */
struct stats {
    uint64_t      total_items;      /* total items ever stored */
    uint64_t      total_conns;      /* total connections ever accepted */
    uint64_t      rejected_conns;   /* connections rejected before processing */
    uint64_t      malloc_fails;     /* failed memory allocation attempts */
    uint64_t      listen_disabled_num; /* times accepting new conns was disabled */
    uint64_t      slabs_moved;       /* times slabs were moved around */
    uint64_t      slab_reassign_rescues; /* items rescued during slab move */
    uint64_t      slab_reassign_evictions_nomem; /* valid items lost during slab move */
    uint64_t      slab_reassign_inline_reclaim; /* valid items lost during slab move */
    uint64_t      slab_reassign_chunk_rescues; /* chunked-item chunks recovered */
    uint64_t      slab_reassign_busy_items; /* valid temporarily unmovable */
    uint64_t      slab_reassign_busy_deletes; /* refcounted items killed */
    uint64_t      lru_crawler_starts; /* Number of item crawlers kicked off */
    uint64_t      lru_maintainer_juggles; /* number of LRU bg pokes */
    uint64_t      time_in_listen_disabled_us;  /* elapsed time in microseconds while server unable to process new connections */
    uint64_t      log_worker_dropped; /* logs dropped by worker threads */
    uint64_t      log_worker_written; /* logs written by worker threads */
    uint64_t      log_watcher_skipped; /* logs watchers missed */
    uint64_t      log_watcher_sent; /* logs sent to watcher buffers */
#ifdef EXTSTORE
    uint64_t      extstore_compact_lost; /* items lost because they were locked */
    uint64_t      extstore_compact_rescues; /* items re-written during compaction */
    uint64_t      extstore_compact_skipped; /* unhit items skipped during compaction */
#endif
#ifdef TLS
    uint64_t      ssl_handshake_errors; /* TLS failures at accept/handshake time */
    uint64_t      ssl_new_sessions; /* successfully negotiated new (non-reused) TLS sessions */
#endif
    struct timeval maxconns_entered;  /* last time maxconns entered */
    uint64_t      unexpected_napi_ids;  /* see doc/napi_ids.txt */
    uint64_t      round_robin_fallback; /* see doc/napi_ids.txt */
};
386 
/**
 * Global "state" stats. Reflects state that shouldn't be wiped ever.
 * Ordered for some cache line locality for commonly updated counters.
 */
struct stats_state {
    uint64_t      curr_items;       /* items currently stored */
    uint64_t      curr_bytes;       /* bytes currently used by stored items */
    uint64_t      curr_conns;       /* currently open connections */
    uint64_t      hash_bytes;       /* size used for hash tables */
    unsigned int  conn_structs;     /* conn structures allocated so far */
    unsigned int  reserved_fds;     /* fds held back from the connection limit */
    unsigned int  hash_power_level; /* Better hope it's not over 9000 */
    unsigned int  log_watchers; /* number of currently active watchers */
    bool          hash_is_expanding; /* If the hash table is being expanded */
    bool          accepting_conns;  /* whether we are currently accepting */
    bool          slab_reassign_running; /* slab reassign in progress */
    bool          lru_crawler_running; /* crawl in progress */
};
405 
406 #define MAX_VERBOSITY_LEVEL 2
407 
/* When adding a setting, be sure to update process_stat_settings */
/**
 * Globally accessible settings as derived from the commandline.
 */
struct settings {
    size_t maxbytes;        /* memory limit for item storage, in bytes */
    int maxconns;           /* maximum number of simultaneous connections */
    int port;               /* TCP port to listen on */
    int udpport;            /* UDP port to listen on */
    char *inter;            /* interface/address list to listen on */
    int verbose;            /* verbosity level (up to MAX_VERBOSITY_LEVEL) */
    rel_time_t oldest_live; /* ignore existing items older than this */
    uint64_t oldest_cas; /* ignore existing items with CAS values lower than this */
    int evict_to_free;      /* whether items may be evicted to free memory */
    char *socketpath;   /* path to unix socket if using local socket */
    char *auth_file;    /* path to user authentication file */
    int access;  /* access mask (a la chmod) for unix domain socket */
    double factor;          /* chunk size growth factor */
    int chunk_size;         /* smallest chunk size (grows by 'factor') */
    int num_threads;        /* number of worker (without dispatcher) libevent threads to run */
    int num_threads_per_udp; /* number of worker threads serving each udp socket */
    char prefix_delimiter;  /* character that marks a key prefix (for stats) */
    int detail_enabled;     /* nonzero if we're collecting detailed stats */
    int reqs_per_event;     /* Maximum number of io to process on each
                               io-event. */
    bool use_cas;           /* whether items carry CAS values */
    enum protocol binding_protocol; /* ascii, binary or negotiating (see enum protocol) */
    int backlog;            /* listen(2) backlog */
    int item_size_max;        /* Maximum item size */
    int slab_chunk_size_max;  /* Upper end for chunks within slab pages. */
    int slab_page_size;     /* Slab's page units. */
    volatile sig_atomic_t sig_hup;  /* a HUP signal was received but not yet handled */
    bool sasl;              /* SASL on/off */
    bool maxconns_fast;     /* Whether or not to early close connections */
    bool lru_crawler;        /* Whether or not to enable the autocrawler thread */
    bool lru_maintainer_thread; /* LRU maintainer background thread */
    bool lru_segmented;     /* Use split or flat LRU's */
    bool slab_reassign;     /* Whether or not slab reassignment is allowed */
    int slab_automove;     /* Whether or not to automatically move slabs */
    double slab_automove_ratio; /* youngest must be within pct of oldest */
    unsigned int slab_automove_window; /* window mover for algorithm */
    int hashpower_init;     /* Starting hash power level */
    bool shutdown_command; /* allow shutdown command */
    int tail_repair_time;   /* LRU tail refcount leak repair time */
    bool flush_enabled;     /* flush_all enabled */
    bool dump_enabled;      /* whether cachedump/metadump commands work */
    char *hash_algorithm;     /* Hash algorithm in use */
    int lru_crawler_sleep;  /* Microsecond sleep between items */
    uint32_t lru_crawler_tocrawl; /* Number of items to crawl per run */
    int hot_lru_pct; /* percentage of slab space for HOT_LRU */
    int warm_lru_pct; /* percentage of slab space for WARM_LRU */
    double hot_max_factor; /* HOT tail age relative to COLD tail */
    double warm_max_factor; /* WARM tail age relative to COLD tail */
    int crawls_persleep; /* Number of LRU crawls to run before sleeping */
    bool temp_lru; /* TTL < temporary_ttl uses TEMP_LRU */
    uint32_t temporary_ttl; /* temporary LRU threshold */
    int idle_timeout;       /* Number of seconds to let connections idle */
    unsigned int logger_watcher_buf_size; /* size of logger's per-watcher buffer */
    unsigned int logger_buf_size; /* size of per-thread logger buffer */
    unsigned int read_buf_mem_limit; /* total megabytes allowable for net buffers */
    bool drop_privileges;   /* Whether or not to drop unnecessary process privileges */
    bool watch_enabled; /* allows watch commands to be dropped */
    bool relaxed_privileges;   /* Relax process restrictions when running testapp */
    bool meta_response_old; /* use "OK" instead of "HD". for response code TEMPORARY! */
#ifdef EXTSTORE
    unsigned int ext_io_threadcount; /* number of IO threads to run. */
    unsigned int ext_page_size; /* size in megabytes of storage pages. */
    unsigned int ext_item_size; /* minimum size of items to store externally */
    unsigned int ext_item_age; /* max age of tail item before storing ext. */
    unsigned int ext_low_ttl; /* remaining TTL below this uses own pages */
    unsigned int ext_recache_rate; /* counter++ % recache_rate == 0 > recache */
    unsigned int ext_wbuf_size; /* read only note for the engine */
    unsigned int ext_compact_under; /* when fewer than this many pages, compact */
    unsigned int ext_drop_under; /* when fewer than this many pages, drop COLD items */
    double ext_max_frag; /* ideal maximum page fragmentation */
    double slab_automove_freeratio; /* % of memory to hold free as buffer */
    bool ext_drop_unread; /* skip unread items during compaction */
    /* per-slab-class free chunk limit */
    unsigned int ext_free_memchunks[MAX_NUMBER_OF_SLAB_CLASSES];
#endif
#ifdef TLS
    bool ssl_enabled; /* indicates whether SSL is enabled */
    SSL_CTX *ssl_ctx; /* holds the SSL server context which has the server certificate */
    char *ssl_chain_cert; /* path to the server SSL chain certificate */
    char *ssl_key; /* path to the server key */
    int ssl_verify_mode; /* client certificate verify mode */
    int ssl_keyformat; /* key format , default is PEM */
    char *ssl_ciphers; /* list of SSL ciphers */
    char *ssl_ca_cert; /* certificate with CAs. */
    rel_time_t ssl_last_cert_refresh_time; /* time of the last server certificate refresh */
    unsigned int ssl_wbuf_size; /* size of the write buffer used by ssl_sendmsg method */
    bool ssl_session_cache; /* enable SSL server session caching */
    int ssl_min_version; /* minimum SSL protocol version to accept */
#endif
    int num_napi_ids;   /* maximum number of NAPI IDs */
    char *memory_file;  /* warm restart memory file path */
};
505 
506 extern struct stats stats;
507 extern struct stats_state stats_state;
508 extern time_t process_started;
509 extern struct settings settings;
510 
/* Bit flags for item.it_flags (a uint16_t). */
#define ITEM_LINKED 1   /* item is live, linked into the cache */
#define ITEM_CAS 2      /* 8-byte CAS value present at head of item data */

/* temp */
#define ITEM_SLABBED 4

/* Item was fetched at least once in its lifetime */
#define ITEM_FETCHED 8
/* Appended on fetch, removed on LRU shuffling */
#define ITEM_ACTIVE 16
/* If an item's storage are chained chunks. */
#define ITEM_CHUNKED 32
/* This structure is a chunk within a chained item, not a head item. */
#define ITEM_CHUNK 64
/* ITEM_data bulk is external to item */
#define ITEM_HDR 128
/* additional 4 bytes for item client flags */
#define ITEM_CFLAGS 256
/* item has sent out a token already */
#define ITEM_TOKEN_SENT 512
/* reserved, in case tokens should be a 2-bit count in future */
#define ITEM_TOKEN_RESERVED 1024
/* if item has been marked as a stale value */
#define ITEM_STALE 2048
/* if item key was sent in binary */
#define ITEM_KEY_BINARY 4096
536 
/**
 * Structure for storing items within memcached.
 * Use the ITEM_key/ITEM_suffix/ITEM_data/ITEM_ntotal macros above to
 * navigate the variable-length layout that follows the header.
 */
typedef struct _stritem {
    /* Protected by LRU locks */
    struct _stritem *next;
    struct _stritem *prev;
    /* Rest are protected by an item lock */
    struct _stritem *h_next;    /* hash chain next */
    rel_time_t      time;       /* least recent access */
    rel_time_t      exptime;    /* expire time */
    int             nbytes;     /* size of data */
    unsigned short  refcount;
    uint16_t        it_flags;   /* ITEM_* above */
    uint8_t         slabs_clsid;/* which slab class we're in */
    uint8_t         nkey;       /* key length, w/terminating null and padding */
    /* this odd type prevents type-punning issues when we do
     * the little shuffle to save space when not using CAS. */
    union {
        uint64_t cas;
        char end;
    } data[];
    /* if it_flags & ITEM_CAS we have 8 bytes CAS */
    /* then null-terminated key */
    /* then " flags length\r\n" (no terminating null) */
    /* then data with terminating \r\n (no terminating null; it's binary!) */
} item;
564 
// TODO: If we eventually want user loaded modules, we can't use an enum :(
/** Mode an LRU crawler run operates in. */
enum crawler_run_type {
    CRAWLER_AUTOEXPIRE=0, CRAWLER_EXPIRED, CRAWLER_METADUMP
};
569 
/* Pseudo-item used by the LRU crawler: the leading fields mirror
 * struct _stritem exactly so a crawler can be linked into an item chain. */
typedef struct {
    struct _stritem *next;
    struct _stritem *prev;
    struct _stritem *h_next;    /* hash chain next */
    rel_time_t      time;       /* least recent access */
    rel_time_t      exptime;    /* expire time */
    int             nbytes;     /* size of data */
    unsigned short  refcount;
    uint16_t        it_flags;   /* ITEM_* above */
    uint8_t         slabs_clsid;/* which slab class we're in */
    uint8_t         nkey;       /* key length, w/terminating null and padding */
    uint32_t        remaining;  /* Max keys to crawl per slab per invocation */
    uint64_t        reclaimed;  /* items reclaimed during this crawl. */
    uint64_t        unfetched;  /* items reclaimed unfetched during this crawl. */
    uint64_t        checked;    /* items examined during this crawl. */
} crawler;
586 
/* Header when an item is actually a chunk of another item. */
typedef struct _strchunk {
    struct _strchunk *next;     /* points within its own chain. */
    struct _strchunk *prev;     /* can potentially point to the head. */
    struct _stritem  *head;     /* always points to the owner chunk */
    int              size;      /* available chunk space in bytes */
    int              used;      /* chunk space used */
    int              nbytes;    /* used. */
    unsigned short   refcount;  /* used? */
    uint16_t         it_flags;  /* ITEM_* above. */
    uint8_t          slabs_clsid; /* Same as above. */
    uint8_t          orig_clsid; /* For obj hdr chunks slabs_clsid is fake. */
    char data[];                /* chunk payload follows the header */
} item_chunk;
601 
602 #ifdef NEED_ALIGN
ITEM_schunk(item * it)603 static inline char *ITEM_schunk(item *it) {
604     int offset = it->nkey + 1
605         + ((it->it_flags & ITEM_CFLAGS) ? sizeof(uint32_t) : 0)
606         + ((it->it_flags & ITEM_CAS) ? sizeof(uint64_t) : 0);
607     int remain = offset % 8;
608     if (remain != 0) {
609         offset += 8 - remain;
610     }
611     return ((char *) &(it->data)) + offset;
612 }
613 #else
614 #define ITEM_schunk(item) ((char*) &((item)->data) + (item)->nkey + 1 \
615          + (((item)->it_flags & ITEM_CFLAGS) ? sizeof(uint32_t) : 0) \
616          + (((item)->it_flags & ITEM_CAS) ? sizeof(uint64_t) : 0))
617 #endif
618 
#ifdef EXTSTORE
/* In-memory header for an item whose data lives in external storage
 * (ITEM_HDR items); identifies where the value resides on flash. */
typedef struct {
    unsigned int page_version; /* from IO header */
    unsigned int offset; /* from IO header */
    unsigned short page_id; /* from IO header */
} item_hdr;
#endif
626 
627 #define IO_QUEUE_COUNT 3
628 
629 #define IO_QUEUE_NONE 0
630 #define IO_QUEUE_EXTSTORE 1
631 
632 typedef struct _io_pending_t io_pending_t;
633 typedef struct io_queue_s io_queue_t;
634 typedef void (*io_queue_stack_cb)(io_queue_t *q);
635 typedef void (*io_queue_cb)(io_pending_t *pending);
636 // this structure's ownership gets passed between threads:
637 // - owned normally by the worker thread.
638 // - multiple queues can be submitted at the same time.
639 // - each queue can be sent to different background threads.
640 // - each submitted queue needs to know when to return to the worker.
641 // - the worker needs to know when all queues have returned so it can process.
642 //
643 // io_queue_t's count field is owned by worker until submitted. Then owned by
644 // side thread until returned.
645 // conn->io_queues_submitted is always owned by the worker thread. it is
646 // incremented as the worker submits queues, and decremented as it gets pinged
647 // for returned threads.
648 //
649 // All of this is to avoid having to hit a mutex owned by the connection
650 // thread that gets pinged for each thread (or an equivalent atomic).
/* One per-connection IO queue instance; ownership rules are described
 * in the comment block above. */
struct io_queue_s {
    void *ctx; // duplicated from io_queue_cb_t
    void *stack_ctx; // module-specific context to be batch-submitted
    int count; // ios to process before returning. only accessed by queue processor once submitted
    int type; // duplicated from io_queue_cb_t
};
657 
/* Per-thread registration of the callbacks driving one IO queue type. */
typedef struct io_queue_cb_s {
    void *ctx; // untouched ptr for specific context
    io_queue_stack_cb submit_cb; // callback given a full stack of pending IO's at once.
    io_queue_stack_cb complete_cb;
    io_queue_cb return_cb; // called on worker thread.
    io_queue_cb finalize_cb; // called back on the worker thread.
    int type;
} io_queue_cb_t;
666 
typedef struct _mc_resp_bundle mc_resp_bundle;
/** Per-worker-thread state: one instance per libevent worker thread. */
typedef struct {
    pthread_t thread_id;        /* unique ID of this thread */
    struct event_base *base;    /* libevent handle this thread uses */
    struct event notify_event;  /* listen event for notify pipe */
#ifdef HAVE_EVENTFD
    int notify_event_fd;        /* notify counter */
#else
    int notify_receive_fd;      /* receiving end of notify pipe */
    int notify_send_fd;         /* sending end of notify pipe */
#endif
    struct thread_stats stats;  /* Stats generated by this thread */
    io_queue_cb_t io_queues[IO_QUEUE_COUNT]; /* registered async IO queue types */
    struct conn_queue *ev_queue; /* Worker/conn event queue */
    cache_t *rbuf_cache;        /* static-sized read buffers */
    mc_resp_bundle *open_bundle; /* bundle currently serving new responses */
    cache_t *io_cache;          /* IO objects */
#ifdef EXTSTORE
    void *storage;              /* data object for storage system */
#endif
    logger *l;                  /* logger buffer */
    void *lru_bump_buf;         /* async LRU bump buffer */
#ifdef TLS
    char   *ssl_wbuf;           /* shared TLS write buffer for this thread */
#endif
    int napi_id;                /* napi id associated with this thread */

} LIBEVENT_THREAD;
695 
/**
 * Response objects
 */
#define MC_RESP_IOVCOUNT 4
typedef struct _mc_resp {
    mc_resp_bundle *bundle; // ptr back to bundle
    struct _mc_resp *next; // choo choo.
    int wbytes; // bytes to write out of wbuf: might be able to nuke this.
    int tosend; // total bytes to send for this response
    void *write_and_free; /** free this memory after finishing writing */
    io_pending_t *io_pending; /* pending IO descriptor for this response */

    item *item; /* item associated with this response object, with reference held */
    struct iovec iov[MC_RESP_IOVCOUNT]; /* built-in iovecs to simplify network code */
    int chunked_total; /* total amount of chunked item data to send. */
    uint8_t iovcnt; /* number of iov entries in use */
    uint8_t chunked_data_iov; /* this iov is a pointer to chunked data header */

    /* instruct transmit to skip this response object. used by storage engines
     * to asynchronously kill an object that was queued to write
     */
    bool skip;
    bool free; // double free detection.
    // UDP bits. Copied in from the client.
    uint16_t    request_id; /* Incoming UDP request ID, if this is a UDP "connection" */
    uint16_t    udp_sequence; /* packet counter when transmitting result */
    uint16_t    udp_total; /* total number of packets in sequence */
    struct sockaddr_in6 request_addr; /* udp: Who sent this request */
    socklen_t request_addr_size;

    char wbuf[WRITE_BUFFER_SIZE]; /* inline buffer for small response data */
} mc_resp;
728 
#define MAX_RESP_PER_BUNDLE ((READ_BUFFER_SIZE - sizeof(mc_resp_bundle)) / sizeof(mc_resp))
/* Refcounted, doubly-linked slab of mc_resp objects allocated together. */
struct _mc_resp_bundle {
    uint8_t refcount;
    uint8_t next_check; // next object to check on assignment.
    struct _mc_resp_bundle *next;
    struct _mc_resp_bundle *prev;
    mc_resp r[]; /* up to MAX_RESP_PER_BUNDLE response objects */
};
737 
738 typedef struct conn conn;
739 
/* Generic descriptor for one pending async IO. 'data' is opaque storage
 * owned by the queue module handling this IO (must fit in 120 bytes). */
struct _io_pending_t {
    int io_queue_type; // matches one of IO_QUEUE_*
    LIBEVENT_THREAD *thread;
    conn *c;
    mc_resp *resp; // associated response object
    char data[120];
};
747 
/**
 * The structure representing a connection into memcached.
 */
struct conn {
    sasl_conn_t *sasl_conn;   /* SASL state, when SASL is in use */
    int    sfd;               /* this connection's socket descriptor */
    bool sasl_started;
    bool authenticated;
    bool set_stale;
    bool mset_res; /** uses mset format for return code */
    bool close_after_write; /** flush write then move to close connection */
    bool rbuf_malloced; /** read buffer was malloc'ed for ascii mget, needs free() */
    bool item_malloced; /** item for conn_nread state is a temporary malloc */
#ifdef TLS
    SSL    *ssl;
    char   *ssl_wbuf;
    bool ssl_enabled;
#endif
    enum conn_states  state;   /* current state machine state */
    enum bin_substates substate; /* binary protocol sub-state */
    rel_time_t last_cmd_time;  /* time of last command (used for idle timeout) */
    struct event event;        /* libevent registration for sfd */
    short  ev_flags;           /* event flags currently registered */
    short  which;   /** which events were just triggered */

    char   *rbuf;   /** buffer to read commands into */
    char   *rcurr;  /** but if we parsed some already, this is where we stopped */
    int    rsize;   /** total allocated size of rbuf */
    int    rbytes;  /** how much data, starting from rcur, do we have unparsed */

    mc_resp *resp; // tail response.
    mc_resp *resp_head; // first response in current stack.
    char   *ritem;  /** when we read in an item's value, it goes here */
    int    rlbytes; /* bytes still expected for ritem */

    /**
     * item is used to hold an item structure created after reading the command
     * line of set/add/replace commands, but before we finished reading the actual
     * data. The data is read into ITEM_data(item) to avoid extra copying.
     */

    void   *item;     /* for commands set/add/replace  */

    /* data for the swallow state */
    int    sbytes;    /* how many bytes to swallow */

    int io_queues_submitted; /* see notes on io_queue_t */
    io_queue_t io_queues[IO_QUEUE_COUNT]; /* set of deferred IO queues. */
#ifdef EXTSTORE
    unsigned int recache_counter;
#endif
    enum protocol protocol;   /* which protocol this connection speaks */
    enum network_transport transport; /* what transport is used by this connection */
    enum close_reasons close_reason; /* reason for transition into conn_closing */

    /* data for UDP clients */
    int    request_id; /* Incoming UDP request ID, if this is a UDP "connection" */
    struct sockaddr_in6 request_addr; /* udp: Who sent the most recent request */
    socklen_t request_addr_size;

    bool   noreply;   /* True if the reply should not be sent. */
    /* current stats command */
    struct {
        char *buffer;
        size_t size;
        size_t offset;
    } stats;

    /* Binary protocol stuff */
    /* This is where the binary header goes */
    protocol_binary_request_header binary_header;
    uint64_t cas; /* the cas to return */
    short cmd; /* current command being processed */
    int opaque;  /* binary protocol opaque value, echoed in the response */
    int keylen;  /* key length from the binary header */
    conn   *next;     /* Used for generating a list of conn structures */
    LIBEVENT_THREAD *thread; /* Pointer to the thread object serving this connection */
    int (*try_read_command)(conn *c); /* pointer for top level input parser */
    ssize_t (*read)(conn  *c, void *buf, size_t count);  /* transport read (plain or TLS) */
    ssize_t (*sendmsg)(conn *c, struct msghdr *msg, int flags);
    ssize_t (*write)(conn *c, void *buf, size_t count);
};
830 
/* array of conn structures, indexed by file descriptor */
extern conn **conns;

/* current time of day (updated periodically) */
extern volatile rel_time_t current_time;

#ifdef MEMCACHED_DEBUG
/* Debug-only clock controls. NOTE(review): delta appears to be an offset
 * applied to the server clock for tests — confirm in the debug code. */
extern volatile bool is_paused;
extern volatile int64_t delta;
#endif
841 
/* TODO: Move to slabs.h? */
/* Signal flag polled by the slab rebalancer thread. */
extern volatile int slab_rebalance_signal;

/* Bookkeeping for a single in-progress slab page move (slab rebalancer). */
struct slab_rebalance {
    void *slab_start; /* first byte of the slab page being moved */
    void *slab_end;   /* one past the end of the page */
    void *slab_pos;   /* current scan position within the page */
    int s_clsid;      /* source slab class id */
    int d_clsid;      /* destination slab class id */
    /* progress / statistics counters for the current run */
    uint32_t busy_items;
    uint32_t rescues;
    uint32_t evictions_nomem;
    uint32_t inline_reclaim;
    uint32_t chunk_rescues;
    uint32_t busy_deletes;
    uint32_t busy_loops;
    uint8_t done;      /* nonzero once the page move has finished */
    uint8_t *completed;
};

extern struct slab_rebalance slab_rebal;
#ifdef EXTSTORE
/* Opaque handle to the external storage engine instance. */
extern void *ext_storage;
#endif
866 /*
867  * Functions
868  */
869 void do_accept_new_conns(const bool do_accept);
870 enum delta_result_type do_add_delta(conn *c, const char *key,
871                                     const size_t nkey, const bool incr,
872                                     const int64_t delta, char *buf,
873                                     uint64_t *cas, const uint32_t hv,
874                                     item **it_ret);
875 enum store_item_type do_store_item(item *item, int comm, conn* c, const uint32_t hv);
876 void thread_io_queue_add(LIBEVENT_THREAD *t, int type, void *ctx, io_queue_stack_cb cb, io_queue_stack_cb com_cb, io_queue_cb ret_cb, io_queue_cb fin_cb);
877 void conn_io_queue_setup(conn *c);
878 io_queue_t *conn_io_queue_get(conn *c, int type);
879 io_queue_cb_t *thread_io_queue_get(LIBEVENT_THREAD *t, int type);
880 void conn_io_queue_return(io_pending_t *io);
881 conn *conn_new(const int sfd, const enum conn_states init_state, const int event_flags, const int read_buffer_size,
882     enum network_transport transport, struct event_base *base, void *ssl);
883 
884 void conn_worker_readd(conn *c);
885 extern int daemonize(int nochdir, int noclose);
886 
887 #define mutex_lock(x) pthread_mutex_lock(x)
888 #define mutex_unlock(x) pthread_mutex_unlock(x)
889 
890 #include "stats_prefix.h"
891 #include "slabs.h"
892 #include "assoc.h"
893 #include "items.h"
894 #include "crawler.h"
895 #include "trace.h"
896 #include "hash.h"
897 #include "util.h"
898 
899 /*
900  * Functions such as the libevent-related calls that need to do cross-thread
901  * communication in multithreaded mode (rather than actually doing the work
902  * in the current thread) are called via "dispatch_" frontends, which are
903  * also #define-d to directly call the underlying code in singlethreaded mode.
904  */
905 void memcached_thread_init(int nthreads, void *arg);
906 void redispatch_conn(conn *c);
907 void timeout_conn(conn *c);
908 void return_io_pending(io_pending_t *io);
909 void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags, int read_buffer_size,
910     enum network_transport transport, void *ssl);
911 void sidethread_conn_close(conn *c);
912 
913 /* Lock wrappers for cache functions that are called from main loop. */
914 enum delta_result_type add_delta(conn *c, const char *key,
915                                  const size_t nkey, bool incr,
916                                  const int64_t delta, char *buf,
917                                  uint64_t *cas);
918 void accept_new_conns(const bool do_accept);
919 void  conn_close_idle(conn *c);
920 void  conn_close_all(void);
921 item *item_alloc(char *key, size_t nkey, int flags, rel_time_t exptime, int nbytes);
922 #define DO_UPDATE true
923 #define DONT_UPDATE false
924 item *item_get(const char *key, const size_t nkey, conn *c, const bool do_update);
925 item *item_get_locked(const char *key, const size_t nkey, conn *c, const bool do_update, uint32_t *hv);
926 item *item_touch(const char *key, const size_t nkey, uint32_t exptime, conn *c);
927 int   item_link(item *it);
928 void  item_remove(item *it);
929 int   item_replace(item *it, item *new_it, const uint32_t hv);
930 void  item_unlink(item *it);
931 
932 void item_lock(uint32_t hv);
933 void *item_trylock(uint32_t hv);
934 void item_trylock_unlock(void *arg);
935 void item_unlock(uint32_t hv);
936 void pause_threads(enum pause_thread_types type);
937 void stop_threads(void);
938 int stop_conn_timeout_thread(void);
/* Item refcount helpers. Plain (non-atomic) increments; NOTE(review):
 * presumably callers hold the item lock for this item — confirm at call
 * sites. Arguments are parenthesized so non-trivial expressions (e.g.
 * refcount_incr(arr + i)) expand correctly (CERT PRE01-C). */
#define refcount_incr(it) ++((it)->refcount)
#define refcount_decr(it) --((it)->refcount)
void STATS_LOCK(void);
void STATS_UNLOCK(void);
/* Lock/unlock the per-worker-thread stats mutex for connection c. */
#define THR_STATS_LOCK(c) pthread_mutex_lock(&(c)->thread->stats.mutex)
#define THR_STATS_UNLOCK(c) pthread_mutex_unlock(&(c)->thread->stats.mutex)
/* Reset / aggregate the per-worker-thread statistics counters. */
void threadlocal_stats_reset(void);
void threadlocal_stats_aggregate(struct thread_stats *stats);
void slab_stats_aggregate(struct thread_stats *stats, struct slab_stats *out);

/* Stat processing functions */
/* printf-style helper: format one stat and emit it through add_stats. */
void append_stat(const char *name, ADD_STAT add_stats, conn *c,
                 const char *fmt, ...);

/* Locked wrapper around do_store_item(). */
enum store_item_type store_item(item *item, int comm, conn *c);

/* Protocol related code */
/* Queue the string str as a response line for connection c. */
void out_string(conn *c, const char *str);
/* Maximum relative expiration delta: 30 days, in seconds. Parenthesized so
 * the constant composes safely in larger expressions. */
#define REALTIME_MAXDELTA (60*60*24*30)
/* Negative exptimes can underflow and end up immortal. realtime() will
   immediately expire values that are greater than REALTIME_MAXDELTA, but less
   than process_started, so lets aim for that. Argument and body are fully
   parenthesized so uses inside larger expressions bind correctly. */
#define EXPTIME_TO_POSITIVE_TIME(exptime) ((exptime) < 0 ? \
        REALTIME_MAXDELTA + 1 : (exptime))
/* Convert a client-supplied expiration time to internal rel_time_t. */
rel_time_t realtime(const time_t exptime);
/* Item fetch with overflow reporting; NOTE(review): presumably bounds
 * concurrent fetches per item — confirm semantics of *overflow at callers. */
item* limited_get(char *key, size_t nkey, conn *c, uint32_t exptime, bool should_touch, bool do_update, bool *overflow);
item* limited_get_locked(char *key, size_t nkey, conn *c, bool do_update, uint32_t *hv, bool *overflow);
// Read/Response object handlers.
/* Reset a response object for reuse. */
void resp_reset(mc_resp *resp);
/* Append a (possibly chunked) buffer reference to the response iovec list. */
void resp_add_iov(mc_resp *resp, const void *buf, int len);
void resp_add_chunked_iov(mc_resp *resp, const void *buf, int len);
/* Push a fresh mc_resp onto c's response stack; false on failure. */
bool resp_start(conn *c);
/* Complete a response; returns the next response in the stack, if any. */
mc_resp* resp_finish(conn *c, mc_resp *resp);
/* True if the connection has pending responses stacked. */
bool resp_has_stack(conn *c);
/* Move c's read buffer to a malloc'ed buffer (sets rbuf_malloced). */
bool rbuf_switch_to_malloc(conn *c);
void conn_release_items(conn *c);
/* Transition c's state machine to the given state. */
void conn_set_state(conn *c, enum conn_states state);
/* Error responses. */
void out_of_memory(conn *c, char *ascii_error);
void out_errstring(conn *c, const char *str);
/* Write buf to the client, then free() it (ownership transfers here). */
void write_and_free(conn *c, char *buf, int bytes);
/* Emit general server stats through add_stats. */
void server_stats(ADD_STAT add_stats, conn *c);
void append_stats(const char *key, const uint16_t klen,
                  const char *val, const uint32_t vlen,
                  const void *cookie);
/** Return a datum for stats in binary protocol */
bool get_stats(const char *stat_type, int nkey, ADD_STAT add_stats, void *c);
void stats_reset(void);
void process_stat_settings(ADD_STAT add_stats, void *c);
void process_stats_conns(ADD_STAT add_stats, void *c);
/* Privilege-dropping hooks; compiled down to no-op macros on platforms
 * without the required support. */
#if HAVE_DROP_PRIVILEGES
extern void setup_privilege_violations_handler(void);
extern void drop_privileges(void);
#else
#define setup_privilege_violations_handler()
#define drop_privileges()
#endif

#if HAVE_DROP_WORKER_PRIVILEGES
extern void drop_worker_privileges(void);
#else
#define drop_worker_privileges()
#endif
1002 
/* If supported, give compiler hints for branch prediction.
 * The fallback is defined on our own likely()/unlikely() names rather than
 * by re-defining __builtin_expect: identifiers containing a double
 * underscore are reserved for the implementation (C11 7.1.3), so defining
 * a macro with that name is undefined behavior. */
#if defined(__GNUC__) && !(__GNUC__ == 2 && __GNUC_MINOR__ < 96)
#define likely(x)       __builtin_expect((x),1)
#define unlikely(x)     __builtin_expect((x),0)
#else
#define likely(x)       (x)
#define unlikely(x)     (x)
#endif
1010