1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ 2 #ifndef MEMCACHED_H 3 #define MEMCACHED_H 4 5 /** \file 6 * The main memcached header holding commonly used data 7 * structures and function prototypes. 8 */ 9 #ifdef HAVE_LIBEVENT2 10 #include <event2/event.h> 11 #include <event2/event_struct.h> 12 #include <event2/event_compat.h> 13 #elif HAVE_LIBEVENT1 14 #include <event.h> 15 #else 16 #error "No libevent library found" 17 #endif /* HAVE_LIBEVENT2 */ 18 19 #include <pthread.h> 20 #include <config_static.h> 21 22 #include <memcached/protocol_binary.h> 23 #include <memcached/engine.h> 24 #include <memcached/extension.h> 25 26 #include "cache.h" 27 #include "topkeys.h" 28 29 #include "sasl_defs.h" 30 31 /** Maximum length of a key. */ 32 #define KEY_MAX_LENGTH 250 33 34 /** Size of an incr buf. */ 35 #define INCR_MAX_STORAGE_LEN 24 36 37 #define DATA_BUFFER_SIZE 2048 38 #define UDP_READ_BUFFER_SIZE 65536 39 #define UDP_MAX_PAYLOAD_SIZE 1400 40 #define UDP_HEADER_SIZE 8 41 #define MAX_SENDBUF_SIZE (256 * 1024 * 1024) 42 /* I'm told the max length of a 64-bit num converted to string is 20 bytes. 43 * Plus a few for spaces, \r\n, \0 */ 44 #define SUFFIX_SIZE 24 45 46 /** Initial size of list of items being returned by "get". */ 47 #define ITEM_LIST_INITIAL 200 48 49 /** Initial size of list of CAS suffixes appended to "gets" lines. */ 50 #define SUFFIX_LIST_INITIAL 20 51 52 /** Initial size of the sendmsg() scatter/gather array. */ 53 #define IOV_LIST_INITIAL 400 54 55 /** Initial number of sendmsg() argument structures to allocate. */ 56 #define MSG_LIST_INITIAL 10 57 58 /** High water marks for buffer shrinking */ 59 #define READ_BUFFER_HIGHWAT 8192 60 #define ITEM_LIST_HIGHWAT 400 61 #define IOV_LIST_HIGHWAT 600 62 #define MSG_LIST_HIGHWAT 100 63 64 /* Binary protocol stuff */ 65 #define MIN_BIN_PKT_LENGTH 16 66 #define BIN_PKT_HDR_WORDS (MIN_BIN_PKT_LENGTH/sizeof(uint32_t)) 67 68 /* Slab sizing definitions. */ 69 #define POWER_SMALLEST 1 70 #define POWER_LARGEST 200 71 #define CHUNK_ALIGN_BYTES 8 72 #define DONT_PREALLOC_SLABS 73 #define MAX_NUMBER_OF_SLAB_CLASSES (POWER_LARGEST + 1) 74 75 76 #define STAT_KEY_LEN 128 77 #define STAT_VAL_LEN 128 78 79 #define DEFAULT_REQS_PER_EVENT 20 80 #define DEFAULT_REQS_PER_TAP_EVENT 50 81 82 /** Append a simple stat with a stat name, value format and value */ 83 #define APPEND_STAT(name, fmt, val) \ 84 append_stat(name, add_stats, c, fmt, val); 85 86 /** Append an indexed stat with a stat name (with format), value format 87 and value */ 88 #define APPEND_NUM_FMT_STAT(name_fmt, num, name, fmt, val) \ 89 klen = snprintf(key_str, STAT_KEY_LEN, name_fmt, num, name); \ 90 vlen = snprintf(val_str, STAT_VAL_LEN, fmt, val); \ 91 add_stats(key_str, klen, val_str, vlen, c); 92 93 /** Common APPEND_NUM_FMT_STAT format. */ 94 #define APPEND_NUM_STAT(num, name, fmt, val) \ 95 APPEND_NUM_FMT_STAT("%d:%s", num, name, fmt, val) 96 97 enum bin_substates { 98 bin_no_state, 99 bin_reading_set_header, 100 bin_reading_cas_header, 101 bin_read_set_value, 102 bin_reading_get_key, 103 bin_reading_stat, 104 bin_reading_del_header, 105 bin_reading_incr_header, 106 bin_read_flush_exptime, 107 bin_reading_sasl_auth, 108 bin_reading_sasl_auth_data, 109 bin_reading_packet 110 }; 111 112 enum protocol { 113 ascii_prot = 3, /* arbitrary value. */ 114 binary_prot, 115 negotiating_prot /* Discovering the protocol */ 116 }; 117 118 enum network_transport { 119 local_transport, /* Unix sockets*/ 120 tcp_transport, 121 udp_transport 122 }; 123 124 #define IS_UDP(x) (x == udp_transport) 125 126 /** Stats stored per slab (and per thread). */ 127 struct slab_stats { 128 uint64_t cmd_set; 129 uint64_t get_hits; 130 uint64_t delete_hits; 131 uint64_t cas_hits; 132 uint64_t cas_badval; 133 }; 134 135 /** 136 * Stats stored per-thread. 137 */ 138 struct thread_stats { 139 pthread_mutex_t mutex; 140 uint64_t cmd_get; 141 uint64_t get_misses; 142 uint64_t delete_misses; 143 uint64_t incr_misses; 144 uint64_t decr_misses; 145 uint64_t incr_hits; 146 uint64_t decr_hits; 147 uint64_t cas_misses; 148 uint64_t bytes_read; 149 uint64_t bytes_written; 150 uint64_t cmd_flush; 151 uint64_t conn_yields; /* # of yields for connections (-R option)*/ 152 uint64_t auth_cmds; 153 uint64_t auth_errors; 154 struct slab_stats slab_stats[MAX_NUMBER_OF_SLAB_CLASSES]; 155 }; 156 157 158 /** 159 * The stats structure the engine keeps track of 160 */ 161 struct independent_stats { 162 topkeys_t *topkeys; 163 struct thread_stats thread_stats[]; 164 }; 165 166 /** 167 * Global stats. 168 */ 169 struct stats { 170 pthread_mutex_t mutex; 171 unsigned int daemon_conns; /* conns used by the server */ 172 unsigned int curr_conns; 173 unsigned int total_conns; 174 unsigned int conn_structs; 175 time_t started; /* when the process was started */ 176 uint64_t rejected_conns; /* number of times I reject a client */ 177 }; 178 179 #define MAX_VERBOSITY_LEVEL 2 180 181 /* When adding a setting, be sure to update process_stat_settings */ 182 /** 183 * Globally accessible settings as derived from the commandline. 184 */ 185 struct settings { 186 size_t maxbytes; 187 int maxconns; 188 int port; 189 int udpport; 190 char *inter; 191 int verbose; 192 rel_time_t oldest_live; /* ignore existing items older than this */ 193 int evict_to_free; 194 char *socketpath; /* path to unix socket if using local socket */ 195 int access; /* access mask (a la chmod) for unix domain socket */ 196 double factor; /* chunk size growth factor */ 197 int chunk_size; 198 int num_threads; /* number of worker (without dispatcher) libevent threads to run */ 199 int num_threads_per_udp; /* number of worker threads serving each udp socket */ 200 char prefix_delimiter; /* character that marks a key prefix (for stats) */ 201 int detail_enabled; /* nonzero if we're collecting detailed stats */ 202 bool allow_detailed; /* detailed stats commands are allowed */ 203 int reqs_per_event; /* Maximum number of io to process on each 204 io-event. */ 205 int reqs_per_tap_event; /* Maximum number of tap io to process on each 206 io-event. */ 207 bool use_cas; 208 enum protocol binding_protocol; 209 int backlog; 210 size_t item_size_max; /* Maximum item size, and upper end for slabs */ 211 bool sasl; /* SASL on/off */ 212 bool require_sasl; /* require SASL auth */ 213 int topkeys; /* Number of top keys to track */ 214 union { 215 ENGINE_HANDLE *v0; 216 ENGINE_HANDLE_V1 *v1; 217 } engine; 218 struct { 219 EXTENSION_DAEMON_DESCRIPTOR *daemons; 220 EXTENSION_LOGGER_DESCRIPTOR *logger; 221 EXTENSION_ASCII_PROTOCOL_DESCRIPTOR *ascii; 222 } extensions; 223 }; 224 225 struct engine_event_handler { 226 EVENT_CALLBACK cb; 227 const void *cb_data; 228 struct engine_event_handler *next; 229 }; 230 231 extern struct stats stats; 232 extern struct settings settings; 233 234 enum thread_type { 235 GENERAL = 11, 236 TAP = 13, 237 DISPATCHER = 15 238 }; 239 240 typedef struct { 241 pthread_t thread_id; /* unique ID of this thread */ 242 struct event_base *base; /* libevent handle this thread uses */ 243 struct event notify_event; /* listen event for notify pipe */ 244 SOCKET notify[2]; /* notification pipes */ 245 struct conn_queue *new_conn_queue; /* queue of new connections to handle */ 246 cache_t *suffix_cache; /* suffix cache */ 247 pthread_mutex_t mutex; /* Mutex to lock protect access to the pending_io */ 248 bool is_locked; 249 struct conn *pending_io; /* List of connection with pending async io ops */ 250 int index; /* index of this thread in the threads array */ 251 enum thread_type type; /* Type of IO this thread processes */ 252 253 rel_time_t last_checked; 254 struct conn *pending_close; /* list of connections close at a later time */ 255 } LIBEVENT_THREAD; 256 257 #define LOCK_THREAD(t) \ 258 if (pthread_mutex_lock(&t->mutex) != 0) { \ 259 abort(); \ 260 } \ 261 assert(t->is_locked == false); \ 262 t->is_locked = true; 263 264 #define UNLOCK_THREAD(t) \ 265 assert(t->is_locked == true); \ 266 t->is_locked = false; \ 267 if (pthread_mutex_unlock(&t->mutex) != 0) { \ 268 abort(); \ 269 } 270 271 extern void notify_thread(LIBEVENT_THREAD *thread); 272 extern void notify_dispatcher(void); 273 extern bool create_notification_pipe(LIBEVENT_THREAD *me); 274 275 extern LIBEVENT_THREAD* tap_thread; 276 277 typedef struct conn conn; 278 typedef bool (*STATE_FUNC)(conn *); 279 280 /** 281 * The structure representing a connection into memcached. 282 */ 283 struct conn { 284 SOCKET sfd; 285 int nevents; 286 sasl_conn_t *sasl_conn; 287 STATE_FUNC state; 288 enum bin_substates substate; 289 #ifdef DEBUG 290 bool registered_in_libevent; 291 #endif 292 struct event event; 293 short ev_flags; 294 short which; /** which events were just triggered */ 295 296 char *rbuf; /** buffer to read commands into */ 297 char *rcurr; /** but if we parsed some already, this is where we stopped */ 298 uint32_t rsize; /** total allocated size of rbuf */ 299 uint32_t rbytes; /** how much data, starting from rcur, do we have unparsed */ 300 301 char *wbuf; 302 char *wcurr; 303 uint32_t wsize; 304 uint32_t wbytes; 305 /** which state to go into after finishing current write */ 306 STATE_FUNC write_and_go; 307 void *write_and_free; /** free this memory after finishing writing */ 308 309 char *ritem; /** when we read in an item's value, it goes here */ 310 uint32_t rlbytes; 311 312 /* data for the nread state */ 313 314 /** 315 * item is used to hold an item structure created after reading the command 316 * line of set/add/replace commands, but before we finished reading the actual 317 * data. The data is read into ITEM_data(item) to avoid extra copying. 318 */ 319 320 void *item; /* for commands set/add/replace */ 321 ENGINE_STORE_OPERATION store_op; /* which one is it: set/add/replace */ 322 323 324 /* data for the swallow state */ 325 int sbytes; /* how many bytes to swallow */ 326 327 /* data for the mwrite state */ 328 struct iovec *iov; 329 int iovsize; /* number of elements allocated in iov[] */ 330 int iovused; /* number of elements used in iov[] */ 331 332 struct msghdr *msglist; 333 int msgsize; /* number of elements allocated in msglist[] */ 334 int msgused; /* number of elements used in msglist[] */ 335 int msgcurr; /* element in msglist[] being transmitted now */ 336 int msgbytes; /* number of bytes in current msg */ 337 338 item **ilist; /* list of items to write out */ 339 int isize; 340 item **icurr; 341 int ileft; 342 343 char **suffixlist; 344 int suffixsize; 345 char **suffixcurr; 346 int suffixleft; 347 348 enum protocol protocol; /* which protocol this connection speaks */ 349 enum network_transport transport; /* what transport is used by this connection */ 350 351 /* data for UDP clients */ 352 int request_id; /* Incoming UDP request ID, if this is a UDP "connection" */ 353 struct sockaddr_storage request_addr; /* Who sent the most recent request */ 354 socklen_t request_addr_size; 355 unsigned char *hdrbuf; /* udp packet headers */ 356 int hdrsize; /* number of headers' worth of space is allocated */ 357 358 bool noreply; /* True if the reply should not be sent. */ 359 /* current stats command */ 360 361 uint8_t refcount; /* number of references to the object */ 362 363 struct { 364 char *buffer; 365 size_t size; 366 size_t offset; 367 } dynamic_buffer; 368 369 void *engine_storage; 370 371 /** Current ascii protocol */ 372 EXTENSION_ASCII_PROTOCOL_DESCRIPTOR *ascii_cmd; 373 374 375 /* Binary protocol stuff */ 376 /* This is where the binary header goes */ 377 protocol_binary_request_header binary_header; 378 uint64_t cas; /* the cas to return */ 379 short cmd; /* current command being processed */ 380 int opaque; 381 int keylen; 382 383 int list_state; /* bitmask of list state data for this connection */ 384 conn *next; /* Used for generating a list of conn structures */ 385 LIBEVENT_THREAD *thread; /* Pointer to the thread object serving this connection */ 386 387 ENGINE_ERROR_CODE aiostat; 388 bool ewouldblock; 389 bool tap_nack_mode; 390 TAP_ITERATOR tap_iterator; 391 }; 392 393 /* States for the connection list_state */ 394 #define LIST_STATE_PROCESSING 1 395 #define LIST_STATE_REQ_PENDING_IO 2 396 #define LIST_STATE_REQ_PENDING_CLOSE 4 397 398 /* 399 * Functions 400 */ 401 conn *conn_new(const SOCKET sfd, STATE_FUNC init_state, const int event_flags, 402 const int read_buffer_size, enum network_transport transport, 403 struct event_base *base, struct timeval *timeout); 404 #ifndef WIN32 405 extern int daemonize(int nochdir, int noclose); 406 #endif 407 408 #include "stats.h" 409 #include "trace.h" 410 #include "hash.h" 411 #include <memcached/util.h> 412 413 /* 414 * Functions to add / update the connection to libevent 415 */ 416 bool register_event(conn *c, struct timeval *timeout); 417 bool unregister_event(conn *c); 418 bool update_event(conn *c, const int new_flags); 419 420 /* 421 * Functions such as the libevent-related calls that need to do cross-thread 422 * communication in multithreaded mode (rather than actually doing the work 423 * in the current thread) are called via "dispatch_" frontends, which are 424 * also #define-d to directly call the underlying code in singlethreaded mode. 425 */ 426 427 void thread_init(int nthreads, struct event_base *main_base, 428 void (*dispatcher_callback)(int, short, void *)); 429 void threads_shutdown(void); 430 431 int dispatch_event_add(int thread, conn *c); 432 void dispatch_conn_new(SOCKET sfd, STATE_FUNC init_state, int event_flags, 433 int read_buffer_size, enum network_transport transport); 434 435 /* Lock wrappers for cache functions that are called from main loop. */ 436 void accept_new_conns(const bool do_accept); 437 conn *conn_from_freelist(void); 438 bool conn_add_to_freelist(conn *c); 439 int is_listen_thread(void); 440 441 void STATS_LOCK(void); 442 void STATS_UNLOCK(void); 443 void threadlocal_stats_clear(struct thread_stats *stats); 444 void threadlocal_stats_reset(struct thread_stats *thread_stats); 445 void threadlocal_stats_aggregate(struct thread_stats *thread_stats, struct thread_stats *stats); 446 void slab_stats_aggregate(struct thread_stats *stats, struct slab_stats *out); 447 448 /* Stat processing functions */ 449 void append_stat(const char *name, ADD_STAT add_stats, conn *c, 450 const char *fmt, ...); 451 452 void notify_io_complete(const void *cookie, ENGINE_ERROR_CODE status); 453 void conn_set_state(conn *c, STATE_FUNC state); 454 const char *state_text(STATE_FUNC state); 455 void safe_close(SOCKET sfd); 456 457 458 // Number of times this connection is in the given pending list 459 int number_of_pending(conn *c, conn *pending); 460 bool has_cycle(conn *c); 461 bool list_contains(conn *h, conn *n); 462 conn *list_remove(conn *h, conn *n); 463 size_t list_to_array(conn **dest, size_t max_items, conn **l); 464 void enlist_conn(conn *c, conn **list); 465 void finalize_list(conn **list, size_t items); 466 bool set_socket_nonblocking(SOCKET sfd); 467 468 void conn_close(conn *c); 469 470 471 #if HAVE_DROP_PRIVILEGES 472 extern void drop_privileges(void); 473 #else 474 #define drop_privileges() 475 #endif 476 477 /* connection state machine */ 478 bool conn_listening(conn *c); 479 bool conn_new_cmd(conn *c); 480 bool conn_waiting(conn *c); 481 bool conn_read(conn *c); 482 bool conn_parse_cmd(conn *c); 483 bool conn_write(conn *c); 484 bool conn_nread(conn *c); 485 bool conn_swallow(conn *c); 486 bool conn_pending_close(conn *c); 487 bool conn_immediate_close(conn *c); 488 bool conn_closing(conn *c); 489 bool conn_mwrite(conn *c); 490 bool conn_ship_log(conn *c); 491 bool conn_add_tap_client(conn *c); 492 bool conn_setup_tap_stream(conn *c); 493 494 /* If supported, give compiler hints for branch prediction. */ 495 #if !defined(__GNUC__) || (__GNUC__ == 2 && __GNUC_MINOR__ < 96) 496 #define __builtin_expect(x, expected_value) (x) 497 #endif 498 499 #define likely(x) __builtin_expect((x),1) 500 #define unlikely(x) __builtin_expect((x),0) 501 #endif 502