1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ 2 #ifndef MEMCACHED_H 3 #define MEMCACHED_H 4 5 /** \file 6 * The main memcached header holding commonly used data 7 * structures and function prototypes. 8 */ 9 #include <event.h> 10 #include <pthread.h> 11 12 #include <memcached/protocol_binary.h> 13 #include <memcached/engine.h> 14 #include <memcached/extension.h> 15 16 #include "cache.h" 17 #include "topkeys.h" 18 19 #include "sasl_defs.h" 20 21 /** Maximum length of a key. */ 22 #define KEY_MAX_LENGTH 250 23 24 /** Size of an incr buf. */ 25 #define INCR_MAX_STORAGE_LEN 24 26 27 #define DATA_BUFFER_SIZE 2048 28 #define UDP_READ_BUFFER_SIZE 65536 29 #define UDP_MAX_PAYLOAD_SIZE 1400 30 #define UDP_HEADER_SIZE 8 31 #define MAX_SENDBUF_SIZE (256 * 1024 * 1024) 32 /* I'm told the max length of a 64-bit num converted to string is 20 bytes. 33 * Plus a few for spaces, \r\n, \0 */ 34 #define SUFFIX_SIZE 24 35 36 /** Initial size of list of items being returned by "get". */ 37 #define ITEM_LIST_INITIAL 200 38 39 /** Initial size of list of CAS suffixes appended to "gets" lines. */ 40 #define SUFFIX_LIST_INITIAL 20 41 42 /** Initial size of the sendmsg() scatter/gather array. */ 43 #define IOV_LIST_INITIAL 400 44 45 /** Initial number of sendmsg() argument structures to allocate. */ 46 #define MSG_LIST_INITIAL 10 47 48 /** High water marks for buffer shrinking */ 49 #define READ_BUFFER_HIGHWAT 8192 50 #define ITEM_LIST_HIGHWAT 400 51 #define IOV_LIST_HIGHWAT 600 52 #define MSG_LIST_HIGHWAT 100 53 54 /* Binary protocol stuff */ 55 #define MIN_BIN_PKT_LENGTH 16 56 #define BIN_PKT_HDR_WORDS (MIN_BIN_PKT_LENGTH/sizeof(uint32_t)) 57 58 /* Slab sizing definitions. */ 59 #define POWER_SMALLEST 1 60 #define POWER_LARGEST 200 61 #define CHUNK_ALIGN_BYTES 8 62 #define DONT_PREALLOC_SLABS 63 #define MAX_NUMBER_OF_SLAB_CLASSES (POWER_LARGEST + 1) 64 65 66 #define STAT_KEY_LEN 128 67 #define STAT_VAL_LEN 128 68 69 #define DEFAULT_REQS_PER_EVENT 20 70 #define DEFAULT_REQS_PER_TAP_EVENT 50 71 72 /** Append a simple stat with a stat name, value format and value */ 73 #define APPEND_STAT(name, fmt, val) \ 74 append_stat(name, add_stats, c, fmt, val); 75 76 /** Append an indexed stat with a stat name (with format), value format 77 and value */ 78 #define APPEND_NUM_FMT_STAT(name_fmt, num, name, fmt, val) \ 79 klen = snprintf(key_str, STAT_KEY_LEN, name_fmt, num, name); \ 80 vlen = snprintf(val_str, STAT_VAL_LEN, fmt, val); \ 81 add_stats(key_str, klen, val_str, vlen, c); 82 83 /** Common APPEND_NUM_FMT_STAT format. */ 84 #define APPEND_NUM_STAT(num, name, fmt, val) \ 85 APPEND_NUM_FMT_STAT("%d:%s", num, name, fmt, val) 86 87 enum bin_substates { 88 bin_no_state, 89 bin_reading_set_header, 90 bin_reading_cas_header, 91 bin_read_set_value, 92 bin_reading_get_key, 93 bin_reading_stat, 94 bin_reading_del_header, 95 bin_reading_incr_header, 96 bin_read_flush_exptime, 97 bin_reading_sasl_auth, 98 bin_reading_sasl_auth_data, 99 bin_reading_packet 100 }; 101 102 enum protocol { 103 ascii_prot = 3, /* arbitrary value. */ 104 binary_prot, 105 negotiating_prot /* Discovering the protocol */ 106 }; 107 108 enum network_transport { 109 local_transport, /* Unix sockets*/ 110 tcp_transport, 111 udp_transport 112 }; 113 114 #define IS_UDP(x) (x == udp_transport) 115 116 /** Stats stored per slab (and per thread). */ 117 struct slab_stats { 118 uint64_t cmd_set; 119 uint64_t get_hits; 120 uint64_t delete_hits; 121 uint64_t cas_hits; 122 uint64_t cas_badval; 123 }; 124 125 /** 126 * Stats stored per-thread. 127 */ 128 struct thread_stats { 129 pthread_mutex_t mutex; 130 uint64_t cmd_get; 131 uint64_t get_misses; 132 uint64_t delete_misses; 133 uint64_t incr_misses; 134 uint64_t decr_misses; 135 uint64_t incr_hits; 136 uint64_t decr_hits; 137 uint64_t cas_misses; 138 uint64_t bytes_read; 139 uint64_t bytes_written; 140 uint64_t cmd_flush; 141 uint64_t conn_yields; /* # of yields for connections (-R option)*/ 142 uint64_t auth_cmds; 143 uint64_t auth_errors; 144 struct slab_stats slab_stats[MAX_NUMBER_OF_SLAB_CLASSES]; 145 }; 146 147 148 /** 149 * The stats structure the engine keeps track of 150 */ 151 struct independent_stats { 152 topkeys_t *topkeys; 153 struct thread_stats thread_stats[]; 154 }; 155 156 /** 157 * Global stats. 158 */ 159 struct stats { 160 pthread_mutex_t mutex; 161 unsigned int daemon_conns; /* conns used by the server */ 162 unsigned int curr_conns; 163 unsigned int total_conns; 164 unsigned int conn_structs; 165 time_t started; /* when the process was started */ 166 uint64_t rejected_conns; /* number of times I reject a client */ 167 }; 168 169 #define MAX_VERBOSITY_LEVEL 2 170 171 /* When adding a setting, be sure to update process_stat_settings */ 172 /** 173 * Globally accessible settings as derived from the commandline. 174 */ 175 struct settings { 176 size_t maxbytes; 177 int maxconns; 178 int port; 179 int udpport; 180 char *inter; 181 int verbose; 182 rel_time_t oldest_live; /* ignore existing items older than this */ 183 int evict_to_free; 184 char *socketpath; /* path to unix socket if using local socket */ 185 int access; /* access mask (a la chmod) for unix domain socket */ 186 double factor; /* chunk size growth factor */ 187 int chunk_size; 188 int num_threads; /* number of worker (without dispatcher) libevent threads to run */ 189 int num_threads_per_udp; /* number of worker threads serving each udp socket */ 190 char prefix_delimiter; /* character that marks a key prefix (for stats) */ 191 int detail_enabled; /* nonzero if we're collecting detailed stats */ 192 bool allow_detailed; /* detailed stats commands are allowed */ 193 int reqs_per_event; /* Maximum number of io to process on each 194 io-event. */ 195 int reqs_per_tap_event; /* Maximum number of tap io to process on each 196 io-event. */ 197 bool use_cas; 198 enum protocol binding_protocol; 199 int backlog; 200 size_t item_size_max; /* Maximum item size, and upper end for slabs */ 201 bool sasl; /* SASL on/off */ 202 bool require_sasl; /* require SASL auth */ 203 int topkeys; /* Number of top keys to track */ 204 union { 205 ENGINE_HANDLE *v0; 206 ENGINE_HANDLE_V1 *v1; 207 } engine; 208 struct { 209 EXTENSION_DAEMON_DESCRIPTOR *daemons; 210 EXTENSION_LOGGER_DESCRIPTOR *logger; 211 EXTENSION_ASCII_PROTOCOL_DESCRIPTOR *ascii; 212 EXTENSION_BINARY_PROTOCOL_DESCRIPTOR *binary; 213 } extensions; 214 }; 215 216 struct engine_event_handler { 217 EVENT_CALLBACK cb; 218 const void *cb_data; 219 struct engine_event_handler *next; 220 }; 221 222 extern struct stats stats; 223 extern struct settings settings; 224 225 enum thread_type { 226 GENERAL = 11, 227 TAP = 13, 228 DISPATCHER = 15 229 }; 230 231 typedef struct { 232 pthread_t thread_id; /* unique ID of this thread */ 233 struct event_base *base; /* libevent handle this thread uses */ 234 struct event notify_event; /* listen event for notify pipe */ 235 SOCKET notify[2]; /* notification pipes */ 236 struct conn_queue *new_conn_queue; /* queue of new connections to handle */ 237 cache_t *suffix_cache; /* suffix cache */ 238 pthread_mutex_t mutex; /* Mutex to lock protect access to the pending_io */ 239 bool is_locked; 240 struct conn *pending_io; /* List of connection with pending async io ops */ 241 int index; /* index of this thread in the threads array */ 242 enum thread_type type; /* Type of IO this thread processes */ 243 244 rel_time_t last_checked; 245 struct conn *pending_close; /* list of connections close at a later time */ 246 } LIBEVENT_THREAD; 247 248 #define LOCK_THREAD(t) \ 249 if (pthread_mutex_lock(&t->mutex) != 0) { \ 250 abort(); \ 251 } \ 252 assert(t->is_locked == false); \ 253 t->is_locked = true; 254 255 #define UNLOCK_THREAD(t) \ 256 assert(t->is_locked == true); \ 257 t->is_locked = false; \ 258 if (pthread_mutex_unlock(&t->mutex) != 0) { \ 259 abort(); \ 260 } 261 262 extern void notify_thread(LIBEVENT_THREAD *thread); 263 extern void notify_dispatcher(void); 264 extern bool create_notification_pipe(LIBEVENT_THREAD *me); 265 266 extern LIBEVENT_THREAD* tap_thread; 267 268 typedef struct conn conn; 269 typedef bool (*STATE_FUNC)(conn *); 270 271 /** 272 * The structure representing a connection into memcached. 273 */ 274 struct conn { 275 SOCKET sfd; 276 int nevents; 277 sasl_conn_t *sasl_conn; 278 STATE_FUNC state; 279 enum bin_substates substate; 280 bool registered_in_libevent; 281 struct event event; 282 short ev_flags; 283 short which; /** which events were just triggered */ 284 285 char *rbuf; /** buffer to read commands into */ 286 char *rcurr; /** but if we parsed some already, this is where we stopped */ 287 uint32_t rsize; /** total allocated size of rbuf */ 288 uint32_t rbytes; /** how much data, starting from rcur, do we have unparsed */ 289 290 char *wbuf; 291 char *wcurr; 292 uint32_t wsize; 293 uint32_t wbytes; 294 /** which state to go into after finishing current write */ 295 STATE_FUNC write_and_go; 296 void *write_and_free; /** free this memory after finishing writing */ 297 298 char *ritem; /** when we read in an item's value, it goes here */ 299 uint32_t rlbytes; 300 301 /* data for the nread state */ 302 303 /** 304 * item is used to hold an item structure created after reading the command 305 * line of set/add/replace commands, but before we finished reading the actual 306 * data. The data is read into ITEM_data(item) to avoid extra copying. 307 */ 308 309 void *item; /* for commands set/add/replace */ 310 ENGINE_STORE_OPERATION store_op; /* which one is it: set/add/replace */ 311 312 313 /* data for the swallow state */ 314 int sbytes; /* how many bytes to swallow */ 315 316 /* data for the mwrite state */ 317 struct iovec *iov; 318 int iovsize; /* number of elements allocated in iov[] */ 319 int iovused; /* number of elements used in iov[] */ 320 321 struct msghdr *msglist; 322 int msgsize; /* number of elements allocated in msglist[] */ 323 int msgused; /* number of elements used in msglist[] */ 324 int msgcurr; /* element in msglist[] being transmitted now */ 325 int msgbytes; /* number of bytes in current msg */ 326 327 item **ilist; /* list of items to write out */ 328 int isize; 329 item **icurr; 330 int ileft; 331 332 char **suffixlist; 333 int suffixsize; 334 char **suffixcurr; 335 int suffixleft; 336 337 enum protocol protocol; /* which protocol this connection speaks */ 338 enum network_transport transport; /* what transport is used by this connection */ 339 340 /* data for UDP clients */ 341 int request_id; /* Incoming UDP request ID, if this is a UDP "connection" */ 342 struct sockaddr_storage request_addr; /* Who sent the most recent request */ 343 socklen_t request_addr_size; 344 unsigned char *hdrbuf; /* udp packet headers */ 345 int hdrsize; /* number of headers' worth of space is allocated */ 346 347 bool noreply; /* True if the reply should not be sent. */ 348 /* current stats command */ 349 350 uint8_t refcount; /* number of references to the object */ 351 352 struct { 353 char *buffer; 354 size_t size; 355 size_t offset; 356 } dynamic_buffer; 357 358 void *engine_storage; 359 360 /** Current ascii protocol */ 361 EXTENSION_ASCII_PROTOCOL_DESCRIPTOR *ascii_cmd; 362 363 364 /* Binary protocol stuff */ 365 /* This is where the binary header goes */ 366 protocol_binary_request_header binary_header; 367 uint64_t cas; /* the cas to return */ 368 short cmd; /* current command being processed */ 369 int opaque; 370 int keylen; 371 372 int list_state; /* bitmask of list state data for this connection */ 373 conn *next; /* Used for generating a list of conn structures */ 374 LIBEVENT_THREAD *thread; /* Pointer to the thread object serving this connection */ 375 376 ENGINE_ERROR_CODE aiostat; 377 bool ewouldblock; 378 TAP_ITERATOR tap_iterator; 379 }; 380 381 /* States for the connection list_state */ 382 #define LIST_STATE_PROCESSING 1 383 #define LIST_STATE_REQ_PENDING_IO 2 384 #define LIST_STATE_REQ_PENDING_CLOSE 4 385 386 /* 387 * Functions 388 */ 389 conn *conn_new(const SOCKET sfd, STATE_FUNC init_state, const int event_flags, 390 const int read_buffer_size, enum network_transport transport, 391 struct event_base *base, struct timeval *timeout); 392 #ifndef WIN32 393 extern int daemonize(int nochdir, int noclose); 394 #endif 395 396 #include "stats.h" 397 #include "trace.h" 398 #include "hash.h" 399 #include <memcached/util.h> 400 401 /* 402 * Functions to add / update the connection to libevent 403 */ 404 bool register_event(conn *c, struct timeval *timeout); 405 bool unregister_event(conn *c); 406 bool update_event(conn *c, const int new_flags); 407 408 /* 409 * Functions such as the libevent-related calls that need to do cross-thread 410 * communication in multithreaded mode (rather than actually doing the work 411 * in the current thread) are called via "dispatch_" frontends, which are 412 * also #define-d to directly call the underlying code in singlethreaded mode. 413 */ 414 415 void thread_init(int nthreads, struct event_base *main_base, 416 void (*dispatcher_callback)(int, short, void *)); 417 void threads_shutdown(void); 418 419 int dispatch_event_add(int thread, conn *c); 420 void dispatch_conn_new(SOCKET sfd, STATE_FUNC init_state, int event_flags, 421 int read_buffer_size, enum network_transport transport); 422 423 /* Lock wrappers for cache functions that are called from main loop. */ 424 void accept_new_conns(const bool do_accept); 425 conn *conn_from_freelist(void); 426 bool conn_add_to_freelist(conn *c); 427 int is_listen_thread(void); 428 429 void STATS_LOCK(void); 430 void STATS_UNLOCK(void); 431 void threadlocal_stats_clear(struct thread_stats *stats); 432 void threadlocal_stats_reset(struct thread_stats *thread_stats); 433 void threadlocal_stats_aggregate(struct thread_stats *thread_stats, struct thread_stats *stats); 434 void slab_stats_aggregate(struct thread_stats *stats, struct slab_stats *out); 435 436 /* Stat processing functions */ 437 void append_stat(const char *name, ADD_STAT add_stats, conn *c, 438 const char *fmt, ...); 439 440 void notify_io_complete(const void *cookie, ENGINE_ERROR_CODE status); 441 void conn_set_state(conn *c, STATE_FUNC state); 442 const char *state_text(STATE_FUNC state); 443 void safe_close(SOCKET sfd); 444 445 446 // Number of times this connection is in the given pending list 447 int number_of_pending(conn *c, conn *pending); 448 bool has_cycle(conn *c); 449 bool list_contains(conn *h, conn *n); 450 conn *list_remove(conn *h, conn *n); 451 size_t list_to_array(conn **dest, size_t max_items, conn **l); 452 void enlist_conn(conn *c, conn **list); 453 void finalize_list(conn **list, size_t items); 454 bool set_socket_nonblocking(SOCKET sfd); 455 456 void conn_close(conn *c); 457 458 459 #if HAVE_DROP_PRIVILEGES 460 extern void drop_privileges(void); 461 #else 462 #define drop_privileges() 463 #endif 464 465 /* connection state machine */ 466 bool conn_listening(conn *c); 467 bool conn_new_cmd(conn *c); 468 bool conn_waiting(conn *c); 469 bool conn_read(conn *c); 470 bool conn_parse_cmd(conn *c); 471 bool conn_write(conn *c); 472 bool conn_nread(conn *c); 473 bool conn_swallow(conn *c); 474 bool conn_pending_close(conn *c); 475 bool conn_immediate_close(conn *c); 476 bool conn_closing(conn *c); 477 bool conn_mwrite(conn *c); 478 bool conn_ship_log(conn *c); 479 bool conn_add_tap_client(conn *c); 480 bool conn_setup_tap_stream(conn *c); 481 482 /* If supported, give compiler hints for branch prediction. */ 483 #if !defined(__builtin_expect) && (!defined(__GNUC__) || (__GNUC__ == 2 && __GNUC_MINOR__ < 96)) 484 #define __builtin_expect(x, expected_value) (x) 485 #endif 486 487 #define likely(x) __builtin_expect((x),1) 488 #define unlikely(x) __builtin_expect((x),0) 489 #endif 490