1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2
3 /** \file
4 * The main memcached header holding commonly used data
5 * structures and function prototypes.
6 */
7
8 #ifdef HAVE_CONFIG_H
9 #include "config.h"
10 #endif
11
12 #include <sys/types.h>
13 #include <sys/socket.h>
14 #include <sys/time.h>
15 #include <netinet/in.h>
16 #include <event.h>
17 #include <netdb.h>
18 #include <pthread.h>
19 #include <unistd.h>
20 #include <assert.h>
21 #include <grp.h>
22 #include <signal.h>
23 /* need this to get IOV_MAX on some platforms. */
24 #ifndef __need_IOV_MAX
25 #define __need_IOV_MAX
26 #endif
27 #include <limits.h>
28 /* FreeBSD 4.x doesn't have IOV_MAX exposed. */
29 #ifndef IOV_MAX
30 #if defined(__FreeBSD__) || defined(__APPLE__) || defined(__GNU__)
31 # define IOV_MAX 1024
/* GNU/Hurd doesn't set MAXPATHLEN
33 * http://www.gnu.org/software/hurd/hurd/porting/guidelines.html#PATH_MAX_tt_MAX_PATH_tt_MAXPATHL */
34 #ifndef MAXPATHLEN
35 #define MAXPATHLEN 4096
36 #endif
37 #endif
38 #endif
39
40 #include "itoa_ljust.h"
41 #include "protocol_binary.h"
42 #include "cache.h"
43 #include "logger.h"
44
45 #ifdef EXTSTORE
46 #include "crc32c.h"
47 #endif
48
49 #include "sasl_defs.h"
50 #ifdef TLS
51 #include <openssl/ssl.h>
52 #endif
53
54 /* for NAPI pinning feature */
55 #ifndef SO_INCOMING_NAPI_ID
56 #define SO_INCOMING_NAPI_ID 56
57 #endif
58
/** Maximum length of a key. */
#define KEY_MAX_LENGTH 250

/** Maximum length of a uri encoded key. */
#define KEY_MAX_URI_ENCODED_LENGTH (KEY_MAX_LENGTH * 3 + 1)

/** Size of an incr buf. */
#define INCR_MAX_STORAGE_LEN 24

#define WRITE_BUFFER_SIZE 1024
#define READ_BUFFER_SIZE 16384
#define READ_BUFFER_CACHED 0
#define UDP_READ_BUFFER_SIZE 65536
#define UDP_MAX_PAYLOAD_SIZE 1400
#define UDP_HEADER_SIZE 8
/* Derived from the two macros above so the values can never drift apart. */
#define UDP_DATA_SIZE (UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE)
#define MAX_SENDBUF_SIZE (256 * 1024 * 1024)

/* Binary protocol stuff */
#define BIN_MAX_EXTLEN 20 // length of the _incr command is currently the longest.

/* Initial power multiplier for the hash table */
#define HASHPOWER_DEFAULT 16
#define HASHPOWER_MAX 32

/*
 * We only reposition items in the LRU queue if they haven't been repositioned
 * in this many seconds. That saves us from churning on frequently-accessed
 * items.
 */
#define ITEM_UPDATE_INTERVAL 60

/*
 * Valid range of the maximum size of an item, in bytes.
 */
#define ITEM_SIZE_MAX_LOWER_LIMIT 1024
/* Parenthesized so the macro expands with the intended precedence when used
 * inside a larger expression (e.g. `x % ITEM_SIZE_MAX_UPPER_LIMIT`). */
#define ITEM_SIZE_MAX_UPPER_LIMIT (1024 * 1024 * 1024)
96
97
98 /* unistd.h is here */
99 #if HAVE_UNISTD_H
100 # include <unistd.h>
101 #endif
102
103 /* Slab sizing definitions. */
104 #define POWER_SMALLEST 1
105 #define POWER_LARGEST 256 /* actual cap is 255 */
106 #define SLAB_GLOBAL_PAGE_POOL 0 /* magic slab class for storing pages for reassignment */
107 #define CHUNK_ALIGN_BYTES 8
108 /* slab class max is a 6-bit number, -1. */
109 #define MAX_NUMBER_OF_SLAB_CLASSES (63 + 1)
110
111 /** How long an object can reasonably be assumed to be locked before
112 harvesting it on a low memory condition. Default: disabled. */
113 #define TAIL_REPAIR_TIME_DEFAULT 0
114
115 /* warning: don't use these macros with a function, as it evals its arg twice */
#define ITEM_get_cas(i) (((i)->it_flags & ITEM_CAS) ? \
        (i)->data->cas : (uint64_t)0)

/* do/while(0) wrapper makes the macro safe as the sole body of an if/else;
 * (v) is parenthesized so low-precedence argument expressions expand
 * correctly. */
#define ITEM_set_cas(i,v) do { \
    if ((i)->it_flags & ITEM_CAS) { \
        (i)->data->cas = (v); \
    } \
} while (0)
124
/* Item body layout: [optional 8-byte CAS][key + NUL][optional 4-byte client
 * flags]["suffix"][data]. The macros below compute pointers into that layout
 * based on which ITEM_CAS / ITEM_CFLAGS bits are set in it_flags. */

/* Key starts right after the struct header, skipping the CAS value when one
 * is stored. */
#define ITEM_key(item) (((char*)&((item)->data)) \
         + (((item)->it_flags & ITEM_CAS) ? sizeof(uint64_t) : 0))

/* Suffix follows the NUL-terminated key (nkey counts the terminator pad). */
#define ITEM_suffix(item) ((char*) &((item)->data) + (item)->nkey + 1 \
         + (((item)->it_flags & ITEM_CAS) ? sizeof(uint64_t) : 0))

/* Payload follows the key and the optional 4-byte client-flags word. */
#define ITEM_data(item) ((char*) &((item)->data) + (item)->nkey + 1 \
         + (((item)->it_flags & ITEM_CFLAGS) ? sizeof(uint32_t) : 0) \
         + (((item)->it_flags & ITEM_CAS) ? sizeof(uint64_t) : 0))

/* Total bytes occupied by the item: header + key + optional extras + data. */
#define ITEM_ntotal(item) (sizeof(struct _stritem) + (item)->nkey + 1 \
         + (item)->nbytes \
         + (((item)->it_flags & ITEM_CFLAGS) ? sizeof(uint32_t) : 0) \
         + (((item)->it_flags & ITEM_CAS) ? sizeof(uint64_t) : 0))

/* slabs_clsid packs the slab class in the low 6 bits and the LRU id in the
 * top 2 bits; these macros extract each half. */
#define ITEM_clsid(item) ((item)->slabs_clsid & ~(3<<6))
#define ITEM_lruid(item) ((item)->slabs_clsid & (3<<6))
142
#define STAT_KEY_LEN 128
#define STAT_VAL_LEN 128

/** Append a simple stat with a stat name, value format and value.
 * Relies on `add_stats` and `c` being in scope at the call site. */
#define APPEND_STAT(name, fmt, val) \
    append_stat(name, add_stats, c, fmt, val);

/** Append an indexed stat with a stat name (with format), value format
    and value. Relies on klen/vlen/key_str/val_str/add_stats/c from the
    caller's scope. Wrapped in do/while(0) so the three statements behave
    as a single statement when used as an if/else body. */
#define APPEND_NUM_FMT_STAT(name_fmt, num, name, fmt, val) \
    do { \
        klen = snprintf(key_str, STAT_KEY_LEN, name_fmt, num, name); \
        vlen = snprintf(val_str, STAT_VAL_LEN, fmt, val); \
        add_stats(key_str, klen, val_str, vlen, c); \
    } while (0)

/** Common APPEND_NUM_FMT_STAT format. */
#define APPEND_NUM_STAT(num, name, fmt, val) \
    APPEND_NUM_FMT_STAT("%d:%s", num, name, fmt, val)
160
/** Item client flag conversion: copies the stored 32-bit client flags from
 * the item suffix into `flag`, or zero when the item carries none.
 * Note `it` is evaluated twice — don't pass an expression with side effects.
 * do/while(0) wrapper makes the macro safe as an if/else body. */
#define FLAGS_CONV(it, flag) do { \
    if ((it)->it_flags & ITEM_CFLAGS) { \
        flag = *((uint32_t *)ITEM_suffix((it))); \
    } else { \
        flag = 0; \
    } \
} while (0)

/* Bytes the optional client-flags word occupies inside this item. */
#define FLAGS_SIZE(item) (((item)->it_flags & ITEM_CFLAGS) ? sizeof(uint32_t) : 0)
171
172 /**
173 * Callback for any function producing stats.
174 *
175 * @param key the stat's key
176 * @param klen length of the key
177 * @param val the stat's value in an ascii form (e.g. text form of a number)
178 * @param vlen length of the value
 * @param cookie magic callback cookie
180 */
181 typedef void (*ADD_STAT)(const char *key, const uint16_t klen,
182 const char *val, const uint32_t vlen,
183 const void *cookie);
184
185 /*
186 * NOTE: If you modify this table you _MUST_ update the function state_text
187 */
188 /**
189 * Possible states of a connection.
190 */
191 enum conn_states {
192 conn_listening, /**< the socket which listens for connections */
193 conn_new_cmd, /**< Prepare connection for next command */
194 conn_waiting, /**< waiting for a readable socket */
195 conn_read, /**< reading in a command line */
196 conn_parse_cmd, /**< try to parse a command from the input buffer */
197 conn_write, /**< writing out a simple response */
198 conn_nread, /**< reading in a fixed number of bytes */
199 conn_swallow, /**< swallowing unnecessary bytes w/o storing */
200 conn_closing, /**< closing this connection */
201 conn_mwrite, /**< writing out many items sequentially */
202 conn_closed, /**< connection is closed */
203 conn_watch, /**< held by the logger thread as a watcher */
204 conn_io_queue, /**< wait on async. process to get response object */
205 conn_max_state /**< Max state value (used for assertion) */
206 };
207
/**
 * Sub-states of a binary-protocol connection: which part of the current
 * binary request (header, key, value, auth data, ...) is expected next.
 */
enum bin_substates {
    bin_no_state,
    bin_reading_set_header,
    bin_reading_cas_header,
    bin_read_set_value,
    bin_reading_get_key,
    bin_reading_stat,
    bin_reading_del_header,
    bin_reading_incr_header,
    bin_read_flush_exptime,
    bin_reading_sasl_auth,
    bin_reading_sasl_auth_data,
    bin_reading_touch_key,
};
222
223 enum protocol {
224 ascii_prot = 3, /* arbitrary value. */
225 binary_prot,
226 negotiating_prot /* Discovering the protocol */
227 };
228
enum network_transport {
    local_transport, /* Unix sockets*/
    tcp_transport,
    udp_transport
};

enum pause_thread_types {
    PAUSE_WORKER_THREADS = 0,
    PAUSE_ALL_THREADS,
    RESUME_ALL_THREADS,
    RESUME_WORKER_THREADS
};

enum stop_reasons {
    NOT_STOP,
    GRACE_STOP,
    EXIT_NORMALLY
};

enum close_reasons {
    ERROR_CLOSE,
    NORMAL_CLOSE,
    IDLE_TIMEOUT_CLOSE,
    SHUTDOWN_CLOSE,
};

/* Argument parenthesized so any expression expands with the intended
 * precedence. */
#define IS_TCP(x) ((x) == tcp_transport)
#define IS_UDP(x) ((x) == udp_transport)

/* Store-command variants handled by the conn_nread path. */
#define NREAD_ADD 1
#define NREAD_SET 2
#define NREAD_REPLACE 3
#define NREAD_APPEND 4
#define NREAD_PREPEND 5
#define NREAD_CAS 6
264
/** Outcome of a storage command (set/add/replace/append/prepend/cas). */
enum store_item_type {
    NOT_STORED=0, STORED, EXISTS, NOT_FOUND, TOO_LARGE, NO_MEMORY
};

/** Outcome of an incr/decr (delta) operation. */
enum delta_result_type {
    OK, NON_NUMERIC, EOM, DELTA_ITEM_NOT_FOUND, DELTA_ITEM_CAS_MISMATCH
};
272
273 /** Time relative to server start. Smaller than time_t on 64-bit systems. */
274 // TODO: Move to sub-header. needed in logger.h
275 //typedef unsigned int rel_time_t;
276
277 /** Use X macros to avoid iterating over the stats fields during reset and
278 * aggregation. No longer have to add new stats in 3+ places.
279 */
280
281 #define SLAB_STATS_FIELDS \
282 X(set_cmds) \
283 X(get_hits) \
284 X(touch_hits) \
285 X(delete_hits) \
286 X(cas_hits) \
287 X(cas_badval) \
288 X(incr_hits) \
289 X(decr_hits)
290
291 /** Stats stored per slab (and per thread). */
292 struct slab_stats {
293 #define X(name) uint64_t name;
294 SLAB_STATS_FIELDS
295 #undef X
296 };
297
298 #define THREAD_STATS_FIELDS \
299 X(get_cmds) \
300 X(get_misses) \
301 X(get_expired) \
302 X(get_flushed) \
303 X(touch_cmds) \
304 X(touch_misses) \
305 X(delete_misses) \
306 X(incr_misses) \
307 X(decr_misses) \
308 X(cas_misses) \
309 X(meta_cmds) \
310 X(bytes_read) \
311 X(bytes_written) \
312 X(flush_cmds) \
313 X(conn_yields) /* # of yields for connections (-R option)*/ \
314 X(auth_cmds) \
315 X(auth_errors) \
316 X(idle_kicks) /* idle connections killed */ \
317 X(response_obj_oom) \
318 X(response_obj_count) \
319 X(response_obj_bytes) \
320 X(read_buf_oom)
321
322 #ifdef EXTSTORE
323 #define EXTSTORE_THREAD_STATS_FIELDS \
324 X(get_extstore) \
325 X(get_aborted_extstore) \
326 X(get_oom_extstore) \
327 X(recache_from_extstore) \
328 X(miss_from_extstore) \
329 X(badcrc_from_extstore)
330 #endif
331
332 /**
333 * Stats stored per-thread.
334 */
335 struct thread_stats {
336 pthread_mutex_t mutex;
337 #define X(name) uint64_t name;
338 THREAD_STATS_FIELDS
339 #ifdef EXTSTORE
340 EXTSTORE_THREAD_STATS_FIELDS
341 #endif
342 #undef X
343 struct slab_stats slab_stats[MAX_NUMBER_OF_SLAB_CLASSES];
344 uint64_t lru_hits[POWER_LARGEST];
345 uint64_t read_buf_count;
346 uint64_t read_buf_bytes;
347 uint64_t read_buf_bytes_free;
348 };
349
350 /**
351 * Global stats. Only resettable stats should go into this structure.
352 */
353 struct stats {
354 uint64_t total_items;
355 uint64_t total_conns;
356 uint64_t rejected_conns;
357 uint64_t malloc_fails;
358 uint64_t listen_disabled_num;
359 uint64_t slabs_moved; /* times slabs were moved around */
360 uint64_t slab_reassign_rescues; /* items rescued during slab move */
361 uint64_t slab_reassign_evictions_nomem; /* valid items lost during slab move */
362 uint64_t slab_reassign_inline_reclaim; /* valid items lost during slab move */
363 uint64_t slab_reassign_chunk_rescues; /* chunked-item chunks recovered */
364 uint64_t slab_reassign_busy_items; /* valid temporarily unmovable */
365 uint64_t slab_reassign_busy_deletes; /* refcounted items killed */
366 uint64_t lru_crawler_starts; /* Number of item crawlers kicked off */
367 uint64_t lru_maintainer_juggles; /* number of LRU bg pokes */
368 uint64_t time_in_listen_disabled_us; /* elapsed time in microseconds while server unable to process new connections */
369 uint64_t log_worker_dropped; /* logs dropped by worker threads */
370 uint64_t log_worker_written; /* logs written by worker threads */
371 uint64_t log_watcher_skipped; /* logs watchers missed */
372 uint64_t log_watcher_sent; /* logs sent to watcher buffers */
373 #ifdef EXTSTORE
374 uint64_t extstore_compact_lost; /* items lost because they were locked */
375 uint64_t extstore_compact_rescues; /* items re-written during compaction */
376 uint64_t extstore_compact_skipped; /* unhit items skipped during compaction */
377 #endif
378 #ifdef TLS
379 uint64_t ssl_handshake_errors; /* TLS failures at accept/handshake time */
380 uint64_t ssl_new_sessions; /* successfully negotiated new (non-reused) TLS sessions */
381 #endif
382 struct timeval maxconns_entered; /* last time maxconns entered */
383 uint64_t unexpected_napi_ids; /* see doc/napi_ids.txt */
384 uint64_t round_robin_fallback; /* see doc/napi_ids.txt */
385 };
386
387 /**
388 * Global "state" stats. Reflects state that shouldn't be wiped ever.
389 * Ordered for some cache line locality for commonly updated counters.
390 */
391 struct stats_state {
392 uint64_t curr_items;
393 uint64_t curr_bytes;
394 uint64_t curr_conns;
395 uint64_t hash_bytes; /* size used for hash tables */
396 unsigned int conn_structs;
397 unsigned int reserved_fds;
398 unsigned int hash_power_level; /* Better hope it's not over 9000 */
399 unsigned int log_watchers; /* number of currently active watchers */
400 bool hash_is_expanding; /* If the hash table is being expanded */
401 bool accepting_conns; /* whether we are currently accepting */
402 bool slab_reassign_running; /* slab reassign in progress */
403 bool lru_crawler_running; /* crawl in progress */
404 };
405
406 #define MAX_VERBOSITY_LEVEL 2
407
408 /* When adding a setting, be sure to update process_stat_settings */
409 /**
410 * Globally accessible settings as derived from the commandline.
411 */
412 struct settings {
413 size_t maxbytes;
414 int maxconns;
415 int port;
416 int udpport;
417 char *inter;
418 int verbose;
419 rel_time_t oldest_live; /* ignore existing items older than this */
420 uint64_t oldest_cas; /* ignore existing items with CAS values lower than this */
421 int evict_to_free;
422 char *socketpath; /* path to unix socket if using local socket */
423 char *auth_file; /* path to user authentication file */
424 int access; /* access mask (a la chmod) for unix domain socket */
425 double factor; /* chunk size growth factor */
426 int chunk_size;
427 int num_threads; /* number of worker (without dispatcher) libevent threads to run */
428 int num_threads_per_udp; /* number of worker threads serving each udp socket */
429 char prefix_delimiter; /* character that marks a key prefix (for stats) */
430 int detail_enabled; /* nonzero if we're collecting detailed stats */
431 int reqs_per_event; /* Maximum number of io to process on each
432 io-event. */
433 bool use_cas;
434 enum protocol binding_protocol;
435 int backlog;
436 int item_size_max; /* Maximum item size */
437 int slab_chunk_size_max; /* Upper end for chunks within slab pages. */
438 int slab_page_size; /* Slab's page units. */
439 volatile sig_atomic_t sig_hup; /* a HUP signal was received but not yet handled */
440 bool sasl; /* SASL on/off */
441 bool maxconns_fast; /* Whether or not to early close connections */
442 bool lru_crawler; /* Whether or not to enable the autocrawler thread */
443 bool lru_maintainer_thread; /* LRU maintainer background thread */
444 bool lru_segmented; /* Use split or flat LRU's */
445 bool slab_reassign; /* Whether or not slab reassignment is allowed */
446 int slab_automove; /* Whether or not to automatically move slabs */
447 double slab_automove_ratio; /* youngest must be within pct of oldest */
448 unsigned int slab_automove_window; /* window mover for algorithm */
449 int hashpower_init; /* Starting hash power level */
450 bool shutdown_command; /* allow shutdown command */
451 int tail_repair_time; /* LRU tail refcount leak repair time */
452 bool flush_enabled; /* flush_all enabled */
453 bool dump_enabled; /* whether cachedump/metadump commands work */
454 char *hash_algorithm; /* Hash algorithm in use */
455 int lru_crawler_sleep; /* Microsecond sleep between items */
456 uint32_t lru_crawler_tocrawl; /* Number of items to crawl per run */
457 int hot_lru_pct; /* percentage of slab space for HOT_LRU */
458 int warm_lru_pct; /* percentage of slab space for WARM_LRU */
459 double hot_max_factor; /* HOT tail age relative to COLD tail */
460 double warm_max_factor; /* WARM tail age relative to COLD tail */
461 int crawls_persleep; /* Number of LRU crawls to run before sleeping */
462 bool temp_lru; /* TTL < temporary_ttl uses TEMP_LRU */
463 uint32_t temporary_ttl; /* temporary LRU threshold */
464 int idle_timeout; /* Number of seconds to let connections idle */
465 unsigned int logger_watcher_buf_size; /* size of logger's per-watcher buffer */
466 unsigned int logger_buf_size; /* size of per-thread logger buffer */
467 unsigned int read_buf_mem_limit; /* total megabytes allowable for net buffers */
468 bool drop_privileges; /* Whether or not to drop unnecessary process privileges */
469 bool watch_enabled; /* allows watch commands to be dropped */
470 bool relaxed_privileges; /* Relax process restrictions when running testapp */
471 bool meta_response_old; /* use "OK" instead of "HD". for response code TEMPORARY! */
472 #ifdef EXTSTORE
473 unsigned int ext_io_threadcount; /* number of IO threads to run. */
474 unsigned int ext_page_size; /* size in megabytes of storage pages. */
475 unsigned int ext_item_size; /* minimum size of items to store externally */
476 unsigned int ext_item_age; /* max age of tail item before storing ext. */
477 unsigned int ext_low_ttl; /* remaining TTL below this uses own pages */
478 unsigned int ext_recache_rate; /* counter++ % recache_rate == 0 > recache */
479 unsigned int ext_wbuf_size; /* read only note for the engine */
480 unsigned int ext_compact_under; /* when fewer than this many pages, compact */
481 unsigned int ext_drop_under; /* when fewer than this many pages, drop COLD items */
482 double ext_max_frag; /* ideal maximum page fragmentation */
483 double slab_automove_freeratio; /* % of memory to hold free as buffer */
484 bool ext_drop_unread; /* skip unread items during compaction */
485 /* per-slab-class free chunk limit */
486 unsigned int ext_free_memchunks[MAX_NUMBER_OF_SLAB_CLASSES];
487 #endif
488 #ifdef TLS
489 bool ssl_enabled; /* indicates whether SSL is enabled */
490 SSL_CTX *ssl_ctx; /* holds the SSL server context which has the server certificate */
491 char *ssl_chain_cert; /* path to the server SSL chain certificate */
492 char *ssl_key; /* path to the server key */
493 int ssl_verify_mode; /* client certificate verify mode */
494 int ssl_keyformat; /* key format , default is PEM */
495 char *ssl_ciphers; /* list of SSL ciphers */
496 char *ssl_ca_cert; /* certificate with CAs. */
497 rel_time_t ssl_last_cert_refresh_time; /* time of the last server certificate refresh */
498 unsigned int ssl_wbuf_size; /* size of the write buffer used by ssl_sendmsg method */
499 bool ssl_session_cache; /* enable SSL server session caching */
500 int ssl_min_version; /* minimum SSL protocol version to accept */
501 #endif
502 int num_napi_ids; /* maximum number of NAPI IDs */
503 char *memory_file; /* warm restart memory file path */
504 };
505
506 extern struct stats stats;
507 extern struct stats_state stats_state;
508 extern time_t process_started;
509 extern struct settings settings;
510
511 #define ITEM_LINKED 1
512 #define ITEM_CAS 2
513
514 /* temp */
515 #define ITEM_SLABBED 4
516
517 /* Item was fetched at least once in its lifetime */
518 #define ITEM_FETCHED 8
519 /* Appended on fetch, removed on LRU shuffling */
520 #define ITEM_ACTIVE 16
521 /* If an item's storage are chained chunks. */
522 #define ITEM_CHUNKED 32
523 #define ITEM_CHUNK 64
524 /* ITEM_data bulk is external to item */
525 #define ITEM_HDR 128
526 /* additional 4 bytes for item client flags */
527 #define ITEM_CFLAGS 256
528 /* item has sent out a token already */
529 #define ITEM_TOKEN_SENT 512
530 /* reserved, in case tokens should be a 2-bit count in future */
531 #define ITEM_TOKEN_RESERVED 1024
532 /* if item has been marked as a stale value */
533 #define ITEM_STALE 2048
534 /* if item key was sent in binary */
535 #define ITEM_KEY_BINARY 4096
536
537 /**
538 * Structure for storing items within memcached.
539 */
540 typedef struct _stritem {
541 /* Protected by LRU locks */
542 struct _stritem *next;
543 struct _stritem *prev;
544 /* Rest are protected by an item lock */
545 struct _stritem *h_next; /* hash chain next */
546 rel_time_t time; /* least recent access */
547 rel_time_t exptime; /* expire time */
548 int nbytes; /* size of data */
549 unsigned short refcount;
550 uint16_t it_flags; /* ITEM_* above */
551 uint8_t slabs_clsid;/* which slab class we're in */
552 uint8_t nkey; /* key length, w/terminating null and padding */
553 /* this odd type prevents type-punning issues when we do
554 * the little shuffle to save space when not using CAS. */
555 union {
556 uint64_t cas;
557 char end;
558 } data[];
559 /* if it_flags & ITEM_CAS we have 8 bytes CAS */
560 /* then null-terminated key */
561 /* then " flags length\r\n" (no terminating null) */
562 /* then data with terminating \r\n (no terminating null; it's binary!) */
563 } item;
564
565 // TODO: If we eventually want user loaded modules, we can't use an enum :(
566 enum crawler_run_type {
567 CRAWLER_AUTOEXPIRE=0, CRAWLER_EXPIRED, CRAWLER_METADUMP
568 };
569
/* Pseudo-item used by the LRU crawler. The leading fields (next through
 * nkey) deliberately mirror the header of struct _stritem above so a
 * crawler can be linked into an item LRU chain in place of a real item;
 * keep the two layouts in sync. */
typedef struct {
    struct _stritem *next;
    struct _stritem *prev;
    struct _stritem *h_next;    /* hash chain next */
    rel_time_t      time;       /* least recent access */
    rel_time_t      exptime;    /* expire time */
    int             nbytes;     /* size of data */
    unsigned short  refcount;
    uint16_t        it_flags;   /* ITEM_* above */
    uint8_t         slabs_clsid;/* which slab class we're in */
    uint8_t         nkey;       /* key length, w/terminating null and padding */
    uint32_t        remaining;  /* Max keys to crawl per slab per invocation */
    uint64_t        reclaimed;  /* items reclaimed during this crawl. */
    uint64_t        unfetched;  /* items reclaimed unfetched during this crawl. */
    uint64_t        checked;    /* items examined during this crawl. */
} crawler;
586
587 /* Header when an item is actually a chunk of another item. */
588 typedef struct _strchunk {
589 struct _strchunk *next; /* points within its own chain. */
590 struct _strchunk *prev; /* can potentially point to the head. */
591 struct _stritem *head; /* always points to the owner chunk */
592 int size; /* available chunk space in bytes */
593 int used; /* chunk space used */
594 int nbytes; /* used. */
595 unsigned short refcount; /* used? */
596 uint16_t it_flags; /* ITEM_* above. */
597 uint8_t slabs_clsid; /* Same as above. */
598 uint8_t orig_clsid; /* For obj hdr chunks slabs_clsid is fake. */
599 char data[];
600 } item_chunk;
601
#ifdef NEED_ALIGN
/* Pointer to the first chunk header inside a chunked item, rounded up to an
 * 8-byte boundary for platforms that cannot perform unaligned accesses.
 * (The original line here was corrupted by a fused duplicate signature.) */
static inline char *ITEM_schunk(item *it) {
    int offset = it->nkey + 1
        + ((it->it_flags & ITEM_CFLAGS) ? sizeof(uint32_t) : 0)
        + ((it->it_flags & ITEM_CAS) ? sizeof(uint64_t) : 0);
    int remain = offset % 8;
    if (remain != 0) {
        offset += 8 - remain;
    }
    return ((char *) &(it->data)) + offset;
}
#else
/* Same layout computation, unaligned access allowed: chunk data follows the
 * key and the optional client-flags / CAS fields directly. */
#define ITEM_schunk(item) ((char*) &((item)->data) + (item)->nkey + 1 \
         + (((item)->it_flags & ITEM_CFLAGS) ? sizeof(uint32_t) : 0) \
         + (((item)->it_flags & ITEM_CAS) ? sizeof(uint64_t) : 0))
#endif
618
619 #ifdef EXTSTORE
620 typedef struct {
621 unsigned int page_version; /* from IO header */
622 unsigned int offset; /* from IO header */
623 unsigned short page_id; /* from IO header */
624 } item_hdr;
625 #endif
626
627 #define IO_QUEUE_COUNT 3
628
629 #define IO_QUEUE_NONE 0
630 #define IO_QUEUE_EXTSTORE 1
631
632 typedef struct _io_pending_t io_pending_t;
633 typedef struct io_queue_s io_queue_t;
634 typedef void (*io_queue_stack_cb)(io_queue_t *q);
635 typedef void (*io_queue_cb)(io_pending_t *pending);
636 // this structure's ownership gets passed between threads:
637 // - owned normally by the worker thread.
638 // - multiple queues can be submitted at the same time.
639 // - each queue can be sent to different background threads.
640 // - each submitted queue needs to know when to return to the worker.
641 // - the worker needs to know when all queues have returned so it can process.
642 //
643 // io_queue_t's count field is owned by worker until submitted. Then owned by
644 // side thread until returned.
645 // conn->io_queues_submitted is always owned by the worker thread. it is
646 // incremented as the worker submits queues, and decremented as it gets pinged
647 // for returned threads.
648 //
649 // All of this is to avoid having to hit a mutex owned by the connection
650 // thread that gets pinged for each thread (or an equivalent atomic).
651 struct io_queue_s {
652 void *ctx; // duplicated from io_queue_cb_t
653 void *stack_ctx; // module-specific context to be batch-submitted
654 int count; // ios to process before returning. only accessed by queue processor once submitted
655 int type; // duplicated from io_queue_cb_t
656 };
657
658 typedef struct io_queue_cb_s {
659 void *ctx; // untouched ptr for specific context
660 io_queue_stack_cb submit_cb; // callback given a full stack of pending IO's at once.
661 io_queue_stack_cb complete_cb;
662 io_queue_cb return_cb; // called on worker thread.
663 io_queue_cb finalize_cb; // called back on the worker thread.
664 int type;
665 } io_queue_cb_t;
666
667 typedef struct _mc_resp_bundle mc_resp_bundle;
668 typedef struct {
669 pthread_t thread_id; /* unique ID of this thread */
670 struct event_base *base; /* libevent handle this thread uses */
671 struct event notify_event; /* listen event for notify pipe */
672 #ifdef HAVE_EVENTFD
673 int notify_event_fd; /* notify counter */
674 #else
675 int notify_receive_fd; /* receiving end of notify pipe */
676 int notify_send_fd; /* sending end of notify pipe */
677 #endif
678 struct thread_stats stats; /* Stats generated by this thread */
679 io_queue_cb_t io_queues[IO_QUEUE_COUNT];
680 struct conn_queue *ev_queue; /* Worker/conn event queue */
681 cache_t *rbuf_cache; /* static-sized read buffers */
682 mc_resp_bundle *open_bundle;
683 cache_t *io_cache; /* IO objects */
684 #ifdef EXTSTORE
685 void *storage; /* data object for storage system */
686 #endif
687 logger *l; /* logger buffer */
688 void *lru_bump_buf; /* async LRU bump buffer */
689 #ifdef TLS
690 char *ssl_wbuf;
691 #endif
692 int napi_id; /* napi id associated with this thread */
693
694 } LIBEVENT_THREAD;
695
696 /**
697 * Response objects
698 */
699 #define MC_RESP_IOVCOUNT 4
700 typedef struct _mc_resp {
701 mc_resp_bundle *bundle; // ptr back to bundle
702 struct _mc_resp *next; // choo choo.
703 int wbytes; // bytes to write out of wbuf: might be able to nuke this.
704 int tosend; // total bytes to send for this response
705 void *write_and_free; /** free this memory after finishing writing */
706 io_pending_t *io_pending; /* pending IO descriptor for this response */
707
708 item *item; /* item associated with this response object, with reference held */
709 struct iovec iov[MC_RESP_IOVCOUNT]; /* built-in iovecs to simplify network code */
710 int chunked_total; /* total amount of chunked item data to send. */
711 uint8_t iovcnt;
712 uint8_t chunked_data_iov; /* this iov is a pointer to chunked data header */
713
714 /* instruct transmit to skip this response object. used by storage engines
715 * to asynchronously kill an object that was queued to write
716 */
717 bool skip;
718 bool free; // double free detection.
719 // UDP bits. Copied in from the client.
720 uint16_t request_id; /* Incoming UDP request ID, if this is a UDP "connection" */
721 uint16_t udp_sequence; /* packet counter when transmitting result */
722 uint16_t udp_total; /* total number of packets in sequence */
723 struct sockaddr_in6 request_addr; /* udp: Who sent this request */
724 socklen_t request_addr_size;
725
726 char wbuf[WRITE_BUFFER_SIZE];
727 } mc_resp;
728
729 #define MAX_RESP_PER_BUNDLE ((READ_BUFFER_SIZE - sizeof(mc_resp_bundle)) / sizeof(mc_resp))
730 struct _mc_resp_bundle {
731 uint8_t refcount;
732 uint8_t next_check; // next object to check on assignment.
733 struct _mc_resp_bundle *next;
734 struct _mc_resp_bundle *prev;
735 mc_resp r[];
736 };
737
738 typedef struct conn conn;
739
/* One pending asynchronous IO operation, routed through the deferred IO
 * queues (see the io_queue_s ownership notes above). */
struct _io_pending_t {
    int io_queue_type; // matches one of IO_QUEUE_*
    LIBEVENT_THREAD *thread;
    conn *c;
    mc_resp *resp; // associated response object
    /* Opaque scratch area, presumably overlaid by the owning IO module with
     * its own per-request struct. NOTE(review): nothing here enforces that
     * 120 bytes is enough — confirm against each module's overlay type. */
    char data[120];
};
747
748 /**
749 * The structure representing a connection into memcached.
750 */
751 struct conn {
752 sasl_conn_t *sasl_conn;
753 int sfd;
754 bool sasl_started;
755 bool authenticated;
756 bool set_stale;
757 bool mset_res; /** uses mset format for return code */
758 bool close_after_write; /** flush write then move to close connection */
759 bool rbuf_malloced; /** read buffer was malloc'ed for ascii mget, needs free() */
760 bool item_malloced; /** item for conn_nread state is a temporary malloc */
761 #ifdef TLS
762 SSL *ssl;
763 char *ssl_wbuf;
764 bool ssl_enabled;
765 #endif
766 enum conn_states state;
767 enum bin_substates substate;
768 rel_time_t last_cmd_time;
769 struct event event;
770 short ev_flags;
771 short which; /** which events were just triggered */
772
773 char *rbuf; /** buffer to read commands into */
774 char *rcurr; /** but if we parsed some already, this is where we stopped */
775 int rsize; /** total allocated size of rbuf */
776 int rbytes; /** how much data, starting from rcur, do we have unparsed */
777
778 mc_resp *resp; // tail response.
779 mc_resp *resp_head; // first response in current stack.
780 char *ritem; /** when we read in an item's value, it goes here */
781 int rlbytes;
782
783 /**
784 * item is used to hold an item structure created after reading the command
785 * line of set/add/replace commands, but before we finished reading the actual
786 * data. The data is read into ITEM_data(item) to avoid extra copying.
787 */
788
789 void *item; /* for commands set/add/replace */
790
791 /* data for the swallow state */
792 int sbytes; /* how many bytes to swallow */
793
794 int io_queues_submitted; /* see notes on io_queue_t */
795 io_queue_t io_queues[IO_QUEUE_COUNT]; /* set of deferred IO queues. */
796 #ifdef EXTSTORE
797 unsigned int recache_counter;
798 #endif
799 enum protocol protocol; /* which protocol this connection speaks */
800 enum network_transport transport; /* what transport is used by this connection */
801 enum close_reasons close_reason; /* reason for transition into conn_closing */
802
803 /* data for UDP clients */
804 int request_id; /* Incoming UDP request ID, if this is a UDP "connection" */
805 struct sockaddr_in6 request_addr; /* udp: Who sent the most recent request */
806 socklen_t request_addr_size;
807
808 bool noreply; /* True if the reply should not be sent. */
809 /* current stats command */
810 struct {
811 char *buffer;
812 size_t size;
813 size_t offset;
814 } stats;
815
816 /* Binary protocol stuff */
817 /* This is where the binary header goes */
818 protocol_binary_request_header binary_header;
819 uint64_t cas; /* the cas to return */
820 short cmd; /* current command being processed */
821 int opaque;
822 int keylen;
823 conn *next; /* Used for generating a list of conn structures */
824 LIBEVENT_THREAD *thread; /* Pointer to the thread object serving this connection */
825 int (*try_read_command)(conn *c); /* pointer for top level input parser */
826 ssize_t (*read)(conn *c, void *buf, size_t count);
827 ssize_t (*sendmsg)(conn *c, struct msghdr *msg, int flags);
828 ssize_t (*write)(conn *c, void *buf, size_t count);
829 };
830
831 /* array of conn structures, indexed by file descriptor */
832 extern conn **conns;
833
834 /* current time of day (updated periodically) */
835 extern volatile rel_time_t current_time;
836
837 #ifdef MEMCACHED_DEBUG
838 extern volatile bool is_paused;
839 extern volatile int64_t delta;
840 #endif
841
842 /* TODO: Move to slabs.h? */
843 extern volatile int slab_rebalance_signal;
844
845 struct slab_rebalance {
846 void *slab_start;
847 void *slab_end;
848 void *slab_pos;
849 int s_clsid;
850 int d_clsid;
851 uint32_t busy_items;
852 uint32_t rescues;
853 uint32_t evictions_nomem;
854 uint32_t inline_reclaim;
855 uint32_t chunk_rescues;
856 uint32_t busy_deletes;
857 uint32_t busy_loops;
858 uint8_t done;
859 uint8_t *completed;
860 };
861
862 extern struct slab_rebalance slab_rebal;
863 #ifdef EXTSTORE
864 extern void *ext_storage;
865 #endif
866 /*
867 * Functions
868 */
869 void do_accept_new_conns(const bool do_accept);
870 enum delta_result_type do_add_delta(conn *c, const char *key,
871 const size_t nkey, const bool incr,
872 const int64_t delta, char *buf,
873 uint64_t *cas, const uint32_t hv,
874 item **it_ret);
875 enum store_item_type do_store_item(item *item, int comm, conn* c, const uint32_t hv);
876 void thread_io_queue_add(LIBEVENT_THREAD *t, int type, void *ctx, io_queue_stack_cb cb, io_queue_stack_cb com_cb, io_queue_cb ret_cb, io_queue_cb fin_cb);
877 void conn_io_queue_setup(conn *c);
878 io_queue_t *conn_io_queue_get(conn *c, int type);
879 io_queue_cb_t *thread_io_queue_get(LIBEVENT_THREAD *t, int type);
880 void conn_io_queue_return(io_pending_t *io);
881 conn *conn_new(const int sfd, const enum conn_states init_state, const int event_flags, const int read_buffer_size,
882 enum network_transport transport, struct event_base *base, void *ssl);
883
884 void conn_worker_readd(conn *c);
885 extern int daemonize(int nochdir, int noclose);
886
/* Thin aliases over the pthread mutex lock/unlock calls. */
#define mutex_lock(x) pthread_mutex_lock(x)
#define mutex_unlock(x) pthread_mutex_unlock(x)
889
890 #include "stats_prefix.h"
891 #include "slabs.h"
892 #include "assoc.h"
893 #include "items.h"
894 #include "crawler.h"
895 #include "trace.h"
896 #include "hash.h"
897 #include "util.h"
898
899 /*
900 * Functions such as the libevent-related calls that need to do cross-thread
901 * communication in multithreaded mode (rather than actually doing the work
902 * in the current thread) are called via "dispatch_" frontends, which are
903 * also #define-d to directly call the underlying code in singlethreaded mode.
904 */
905 void memcached_thread_init(int nthreads, void *arg);
906 void redispatch_conn(conn *c);
907 void timeout_conn(conn *c);
908 void return_io_pending(io_pending_t *io);
909 void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags, int read_buffer_size,
910 enum network_transport transport, void *ssl);
911 void sidethread_conn_close(conn *c);
912
913 /* Lock wrappers for cache functions that are called from main loop. */
914 enum delta_result_type add_delta(conn *c, const char *key,
915 const size_t nkey, bool incr,
916 const int64_t delta, char *buf,
917 uint64_t *cas);
918 void accept_new_conns(const bool do_accept);
919 void conn_close_idle(conn *c);
920 void conn_close_all(void);
921 item *item_alloc(char *key, size_t nkey, int flags, rel_time_t exptime, int nbytes);
/* Readable values for the 'do_update' flag taken by item_get() and
 * item_get_locked() below. */
#define DO_UPDATE true
#define DONT_UPDATE false
924 item *item_get(const char *key, const size_t nkey, conn *c, const bool do_update);
925 item *item_get_locked(const char *key, const size_t nkey, conn *c, const bool do_update, uint32_t *hv);
926 item *item_touch(const char *key, const size_t nkey, uint32_t exptime, conn *c);
927 int item_link(item *it);
928 void item_remove(item *it);
929 int item_replace(item *it, item *new_it, const uint32_t hv);
930 void item_unlink(item *it);
931
932 void item_lock(uint32_t hv);
933 void *item_trylock(uint32_t hv);
934 void item_trylock_unlock(void *arg);
935 void item_unlock(uint32_t hv);
936 void pause_threads(enum pause_thread_types type);
937 void stop_threads(void);
938 int stop_conn_timeout_thread(void);
/* Pre-increment/decrement an item's refcount, yielding the new value.
 * The argument is parenthesized so any expression (e.g. a ternary or
 * pointer arithmetic) expands correctly.
 * NOTE(review): plain ++/-- with no atomicity — presumably callers hold
 * the relevant item lock; confirm at call sites. */
#define refcount_incr(it) ++((it)->refcount)
#define refcount_decr(it) --((it)->refcount)
941 void STATS_LOCK(void);
942 void STATS_UNLOCK(void);
/* Lock/unlock the stats mutex of the worker thread serving connection c.
 * The argument is parenthesized so any expression passed as 'c' expands
 * correctly (CERT PRE01-C). */
#define THR_STATS_LOCK(c) pthread_mutex_lock(&(c)->thread->stats.mutex)
#define THR_STATS_UNLOCK(c) pthread_mutex_unlock(&(c)->thread->stats.mutex)
945 void threadlocal_stats_reset(void);
946 void threadlocal_stats_aggregate(struct thread_stats *stats);
947 void slab_stats_aggregate(struct thread_stats *stats, struct slab_stats *out);
948
949 /* Stat processing functions */
950 void append_stat(const char *name, ADD_STAT add_stats, conn *c,
951 const char *fmt, ...);
952
953 enum store_item_type store_item(item *item, int comm, conn *c);
954
955 /* Protocol related code */
956 void out_string(conn *c, const char *str);
/* 30 days, in seconds. Expansion is parenthesized so the macro behaves
 * as a single value inside larger expressions. */
#define REALTIME_MAXDELTA (60*60*24*30)
/* Negative exptimes can underflow and end up immortal. realtime() will
   immediately expire values that are greater than REALTIME_MAXDELTA, but less
   than process_started, so lets aim for that. Fully parenthesized so the
   ternary cannot bind to surrounding operators (e.g. 1 + EXPTIME_...(x)). */
#define EXPTIME_TO_POSITIVE_TIME(exptime) (((exptime) < 0) ? \
        (REALTIME_MAXDELTA + 1) : (exptime))
963 rel_time_t realtime(const time_t exptime);
964 item* limited_get(char *key, size_t nkey, conn *c, uint32_t exptime, bool should_touch, bool do_update, bool *overflow);
965 item* limited_get_locked(char *key, size_t nkey, conn *c, bool do_update, uint32_t *hv, bool *overflow);
966 // Read/Response object handlers.
967 void resp_reset(mc_resp *resp);
968 void resp_add_iov(mc_resp *resp, const void *buf, int len);
969 void resp_add_chunked_iov(mc_resp *resp, const void *buf, int len);
970 bool resp_start(conn *c);
971 mc_resp* resp_finish(conn *c, mc_resp *resp);
972 bool resp_has_stack(conn *c);
973 bool rbuf_switch_to_malloc(conn *c);
974 void conn_release_items(conn *c);
975 void conn_set_state(conn *c, enum conn_states state);
976 void out_of_memory(conn *c, char *ascii_error);
977 void out_errstring(conn *c, const char *str);
978 void write_and_free(conn *c, char *buf, int bytes);
979 void server_stats(ADD_STAT add_stats, conn *c);
980 void append_stats(const char *key, const uint16_t klen,
981 const char *val, const uint32_t vlen,
982 const void *cookie);
983 /** Return a datum for stats in binary protocol */
984 bool get_stats(const char *stat_type, int nkey, ADD_STAT add_stats, void *c);
985 void stats_reset(void);
986 void process_stat_settings(ADD_STAT add_stats, void *c);
987 void process_stats_conns(ADD_STAT add_stats, void *c);
988
989 #if HAVE_DROP_PRIVILEGES
990 extern void setup_privilege_violations_handler(void);
991 extern void drop_privileges(void);
992 #else
993 #define setup_privilege_violations_handler()
994 #define drop_privileges()
995 #endif
996
997 #if HAVE_DROP_WORKER_PRIVILEGES
998 extern void drop_worker_privileges(void);
999 #else
1000 #define drop_worker_privileges()
1001 #endif
1002
/* If supported, give compiler hints for branch prediction. */
#if !defined(__GNUC__) || (__GNUC__ == 2 && __GNUC_MINOR__ < 96)
/* Compilers without __builtin_expect (non-GCC, or GCC older than 2.96)
 * get a no-op shim so likely()/unlikely() still compile and just return
 * the condition unchanged. */
#define __builtin_expect(x, expected_value) (x)
#endif

/* Annotate a condition as almost-always true / almost-always false. */
#define likely(x) __builtin_expect((x),1)
#define unlikely(x) __builtin_expect((x),0)
1010