1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ 2 /* 3 * Copyright 2014 Couchbase, Inc. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 18 #ifndef LCB_MCREQ_H 19 #define LCB_MCREQ_H 20 21 #include <libcouchbase/couchbase.h> 22 #include <libcouchbase/api3.h> 23 #include <libcouchbase/vbucket.h> 24 #include <memcached/protocol_binary.h> 25 #include <libcouchbase/metrics.h> 26 #include "netbuf/netbuf.h" 27 #include "sllist.h" 28 #include "config.h" 29 #include "packetutils.h" 30 31 #ifdef __cplusplus 32 extern "C" { 33 #endif /** __cplusplus */ 34 35 /** 36 * @file 37 * @brief Core memcached client routines 38 */ 39 40 /** 41 * @defgroup mcreq Memcached Packets 42 * 43 * @brief 44 * This module defines the core routines which are used to construct, handle, 45 * and enqueue packets. They also handle the retry mechanisms. 46 * 47 * 48 * # Initializing the Queue 49 * 50 * Using the mcreq system involves first establishing an mc_CMDQUEUE structure. 51 * This structure contains several mc_PIPELINE structures. The proper way to 52 * initialize the mc_CMDQEUE structure is to call mcreq_queue_init(). 53 * 54 * Once the queue has been initialized, it must be assigned a 55 * `lcbvb_CONFIG*` (which it will _not_ own). This is done via the 56 * mcreq_queue_add_pipelines(). 
 * This function takes an array of pipeline pointers,
 * and this will typically be a "subclass" (mc_SERVER) allocated via
 * mcserver_alloc().
 *
 * Once the pipelines have been established, operations may be scheduled and
 * distributed across the various pipelines.
 *
 * # Creating a Packet
 *
 * For each packet sent, the packet should first be reserved via the
 * mcreq_basic_packet() call which allocates space for the actual packet
 * as well as provides and populates the vbucket fields as needed.
 *
 * The header size must be the total size of the header plus any extras
 * following the header but before the actual key data.
 *
 * If the command carries a body in addition to the key, it should be provided
 * via mcreq_reserve_value().
 *
 * Once the packet has a key and value it must be assigned a cookie. The
 * cookie may either be of a simple embedded type or an extended type. Whatever
 * the case the appropriate flags should be set.
 *
 * # Scheduling Commands
 *
 * Scheduling commands is performed in an _enter_ and _leave_ sequence.
 * mcreq_sched_enter() should be called before one or more commands are added.
 * Then for each new command added, mcreq_sched_add() should be invoked with
 * the new packet, and finally either mcreq_sched_leave() or mcreq_sched_fail()
 * should be invoked to flush the commands to the network or free the resources
 * allocated. In both cases the commands affected are scoped by the last call
 * to mcreq_sched_enter().
 *
 * In order for commands to actually be flushed, the mc_PIPELINE::flush_start
 * field must be set. This can vary depending on what the state of the underlying
 * socket is. In server.c for example, the initial callback just schedules a
 * connection.
 * While the connection is in progress this field is set to a no-op
 * callback, and finally when the socket is connected this field is set to
 * interact with the I/O system which actually writes the buffers.
 *
 * # Flushing Responses
 *
 * This module does not do network I/O by design. Its only bridge is the
 * mc_PIPELINE::flush_start function which should be set to actually flush
 * the data.
 *
 * # Handling Responses
 *
 * The I/O system reading the responses should place the response into a
 * packet_info structure. Once this is done, the request for the response must
 * be found using the opaque. This may be done with mcreq_pipeline_find()
 * or mcreq_pipeline_remove() depending on whether this request expects multiple
 * responses (such as the 'stat' command). These parameters should be passed
 * to the mcreq_dispatch_response() function which will invoke the appropriate
 * user-defined handler for it.
 *
 * If the packet does not expect more responses (as above), the application
 * should call mcreq_packet_handled().
 *
 *
 * # Error Handling and Failing Commands
 *
 * This module offers facilities for failing commands from a pipeline while
 * safely allowing for their sharing of user-allocated data.
 *
 * The mcreq_pipeline_fail() and mcreq_pipeline_timeout() will fail packets
 * in a single pipeline (the former failing all packets, the latter failing
 * only packets older than a specified threshold).
 *
 * The mcreq_iterwipe() will clean a pipeline of its packets, invoking a
 * callback which allows the user to relocate the packet to another pipeline.
 * In this callback the user may invoke the mcreq_renew_packet() function to
 * create a copy of the packet, keeping the previous packet intact, but
 * returning a copy of the packet as the 'primary' version.
130 * 131 * @addtogroup mcreq 132 * @{ 133 */ 134 135 136 /** 137 * @name Core Packet Structure 138 * @{ 139 */ 140 141 /** @brief Constant defining the size of a memcached header */ 142 #define MCREQ_PKT_BASESIZE 24 143 144 /** @brief Embedded user data for a simple request. */ 145 typedef struct mc_REQDATA { 146 const void *cookie; /**< User pointer to place in callbacks */ 147 hrtime_t start; /**< Time of the initial request. Used for timeouts */ 148 /** 149 * Time when dispatching response has begun for the command. 150 * Used for metrics/tracing. Might be zero, when tracing is not enabled. 151 */ 152 hrtime_t dispatch; 153 #ifdef LCB_TRACING 154 lcbtrace_SPAN *span; 155 #endif 156 } mc_REQDATA; 157 158 struct mc_packet_st; 159 struct mc_pipeline_st; 160 161 /** This structure serves as a kind of 'vtable' for the mc_REQDATAEX structure. */ 162 typedef struct { 163 /** 164 * Callback to be invoked for "Extended" packet handling. This is only 165 * available in the mc_REQDATAEX structure 166 * @param pipeline the pipeline on which the response was received 167 * @param pkt the request packet 168 * @param rc the error code for the response 169 * @param arg opaque pointer for callback 170 */ 171 void (*handler)(struct mc_pipeline_st *pipeline, 172 struct mc_packet_st *pkt, lcb_error_t rc, const void *res); 173 174 /** 175 * Destructor function called from within mcreq_sched_fail() for packets with 176 * extended data. This function should suitably free the data for the packet, 177 * if any. 178 * @param pkt The packet being unscheduled. 179 */ 180 void (*fail_dtor)(struct mc_packet_st *pkt); 181 } mc_REQDATAPROCS; 182 183 /**@brief Allocated user data for an extended request. 184 * 185 * @details 186 * An extended request is typically used by commands which have more complex 187 * handling requirements, such as mapping a single user API call to multiple 188 * packets, or when the packet itself is generated internally rather than 189 * on behalf of an API request. 
190 */ 191 typedef struct mc_REQDATAEX { 192 const void *cookie; /**< User data */ 193 hrtime_t start; /**< Start time */ 194 /** 195 * Time when dispatching response has begun for the command. 196 * Used for metrics/tracing. Might be zero, when tracing is not enabled. 197 */ 198 hrtime_t dispatch; 199 #ifdef LCB_TRACING 200 lcbtrace_SPAN *span; 201 #endif 202 const mc_REQDATAPROCS *procs; /**< Common routines for the packet */ 203 204 #ifdef __cplusplus mc_REQDATAEXmc_REQDATAEX205 mc_REQDATAEX(const void *cookie_, const mc_REQDATAPROCS &procs_, hrtime_t start_) 206 : cookie(cookie_), start(start_), dispatch(0), 207 #ifdef LCB_TRACING 208 span(NULL), 209 #endif 210 procs(&procs_) 211 { 212 } 213 #endif 214 } mc_REQDATAEX; 215 216 /** 217 * Called when the buffers for a packet have been invoked 218 * @param pl the pipeline 219 * @param ucookie the cookie passed to the scheduler 220 * @param kbuf the pointer to the beginning of the key/header buffer, if 221 * passed to the scheduler 222 * @param vbuf the pointer to the beginning of the value buffer or the first 223 * IOV within the buffer. 224 */ 225 typedef void (*mcreq_bufdone_fn)(struct mc_pipeline_st *pl, 226 const void *ucookie, void *kbuf, void *vbuf); 227 228 /** 229 * Possible values for the mc_PACKET#flags field in the packet structure. 230 * These provide 231 * information as to which fields in the various unions are in use, and how 232 * to allocate/release data buffers for requests. 233 */ 234 typedef enum { 235 /** The key is user-allocated. Do not release to MBLOCK */ 236 MCREQ_F_KEY_NOCOPY = 1 << 0, 237 238 /** The value is user-allocated. Do not release to MBLOCK */ 239 MCREQ_F_VALUE_NOCOPY = 1 << 1, 240 241 /** 242 * The value is user allocated and in the form of an IOV. 243 * Use mc_VALUE#multi 244 */ 245 MCREQ_F_VALUE_IOV = 1 << 2, 246 247 /** The request has a value. 
Use mc_VALUE#single unless otherwise noted */ 248 MCREQ_F_HASVALUE = 1 << 3, 249 250 /** 251 * The request is tied to an 'extended' user data structure. 252 * Use mc_USER#exdata 253 */ 254 MCREQ_F_REQEXT = 1 << 4, 255 256 /** The request is a one-to-one user forwarded packet */ 257 MCREQ_F_UFWD = 1 << 5, 258 259 /** 260 * Indicates that the entire packet has been flushed. Specifically this 261 * also indicates that the packet's underlying buffers are no longer needed 262 * by libcouchbase. 263 */ 264 MCREQ_F_FLUSHED = 1 << 6, 265 266 /** 267 * Indicates that the callback should NOT be invoked for the request. This 268 * is typically because the request is just present in the queue for buffer 269 * management purposes and has expired or otherwise been invalidated. 270 */ 271 MCREQ_F_INVOKED = 1 << 7, 272 273 /** 274 * Indicates that this packet and its constituent data members are not 275 * part of a nb_MBLOCK but rather point to standalone malloc'd memory. This 276 * also indicates that the packet is actually an mc_EXPACKET extended 277 * type. This is set by mcreq_renew_packet() 278 */ 279 MCREQ_F_DETACHED = 1 << 8, 280 281 /** 282 * Another way of signalling that the callback has an 'internal' variant. 
283 * Dispatching this command requires a specially formatted cookie object, 284 * which itself is expected to _contain_ a pointer to the callback, and 285 * thus be formatted like so: 286 * @code{.c} 287 * struct { 288 * lcb_RESPCALLBACK callback; 289 * }; 290 * @endcode 291 */ 292 MCREQ_F_PRIVCALLBACK = 1 << 9 293 } mcreq_flags; 294 295 /** @brief mask of flags indicating user-allocated buffers */ 296 #define MCREQ_UBUF_FLAGS (MCREQ_F_KEY_NOCOPY|MCREQ_F_VALUE_NOCOPY) 297 /** @brief mask of flags indicating response state of the packet */ 298 #define MCREQ_STATE_FLAGS (MCREQ_F_INVOKED|MCREQ_F_FLUSHED) 299 300 /** Union representing the value within a packet */ 301 union mc_VALUE { 302 /** For a single contiguous value */ 303 nb_SPAN single; 304 305 /** For a set of multiple IOV buffers */ 306 lcb_FRAGBUF multi; 307 }; 308 309 /** Union representing application/command data within a packet structure */ 310 union mc_USER { 311 /** Embedded command info for simple commands; 16 bytes, 48B */ 312 mc_REQDATA reqdata; 313 314 /** Pointer to extended data */ 315 mc_REQDATAEX *exdata; 316 }; 317 318 /** 319 * @brief Packet structure for a single Memcached command 320 * 321 * A single packet structure is allocated for each request 322 * sent to a server. A packet structure may be associated with user data in the 323 * u_rdata union field, either by using the embedded structure, or by referencing 324 * an allocated chunk of 'extended' user data. 325 */ 326 typedef struct mc_packet_st { 327 /** Node in the linked list for logical command ordering */ 328 sllist_node slnode; 329 330 /** 331 * Node in the linked list for actual output ordering. 332 * @see netbuf_end_flush2(), netbuf_pdu_enqueue() 333 */ 334 sllist_node sl_flushq; 335 336 /** Span for key and header */ 337 nb_SPAN kh_span; 338 339 /** Extras length */ 340 uint8_t extlen; 341 342 /** Retries */ 343 uint8_t retries; 344 345 /** flags for request. 
@see mcreq_flags */ 346 uint16_t flags; 347 348 /** Cached opaque value */ 349 uint32_t opaque; 350 351 /** User/CMDAPI Data */ 352 union mc_USER u_rdata; 353 354 /** Value data */ 355 union mc_VALUE u_value; 356 357 /** Allocation data for the PACKET structure itself */ 358 nb_MBLOCK *alloc_parent; 359 } mc_PACKET; 360 361 362 /** 363 * @brief Gets the request data from the packet structure itself 364 * @return an mc_REQDATA or mc_REQDATAEX pointer 365 */ 366 #define MCREQ_PKT_RDATA(pkt) \ 367 (((pkt)->flags & MCREQ_F_REQEXT) \ 368 ? ((mc_REQDATA *)(pkt)->u_rdata.exdata) \ 369 : (&(pkt)->u_rdata.reqdata)) 370 371 /** 372 * @brief Retrieve the cookie pointer from a packet 373 * @param pkt 374 */ 375 #define MCREQ_PKT_COOKIE(pkt) MCREQ_PKT_RDATA(pkt)->cookie 376 377 /**@}*/ 378 379 /** 380 * Callback invoked when APIs request that a pipeline start flushing. It 381 * receives a pipeline object as its sole argument. 382 */ 383 typedef void (*mcreq_flushstart_fn)(struct mc_pipeline_st *pipeline); 384 385 /** 386 * @brief Structure representing a single input/output queue for memcached 387 * 388 * Memcached request pipeline. This contains the command log for 389 * sending/receiving requests. This is basically the non-I/O part of the server 390 */ 391 typedef struct mc_pipeline_st { 392 /** List of requests. Newer requests are appended at the end */ 393 sllist_root requests; 394 395 /** Parent command queue */ 396 struct mc_cmdqueue_st *parent; 397 398 /** 399 * Flush handler. This is invoked to schedule a flush operation 400 * the socket 401 */ 402 mcreq_flushstart_fn flush_start; 403 404 /** Index of this server within the configuration map */ 405 int index; 406 407 /** 408 * Intermediate queue where pending packets are placed. 
Moved to 409 * the `requests` list when mcreq_sched_leave() is called 410 */ 411 sllist_root ctxqueued; 412 413 /** 414 * Callback invoked for each packet (which has user-defined buffers) when 415 * it is no longer required 416 */ 417 mcreq_bufdone_fn buf_done_callback; 418 419 /** Buffer manager for the respective requests. */ 420 nb_MGR nbmgr; 421 422 /** Allocator for packet structures */ 423 nb_MGR reqpool; 424 425 /** Optional metrics structure for server */ 426 struct lcb_SERVERMETRICS_st *metrics; 427 } mc_PIPELINE; 428 429 typedef struct mc_cmdqueue_st { 430 /** Indexed pipelines, i.e. server map target */ 431 mc_PIPELINE **pipelines; 432 433 /** 434 * Small array of size npipelines, for mcreq_sched_enter()/mcreq_sched_leave() 435 * stuff. See those functions for usage 436 */ 437 char *scheds; 438 439 /** 440 * Whether a context is currently entered (i.e. sched_enter()) 441 */ 442 unsigned ctxenter; 443 444 /** Number of pipelines in the queue */ 445 unsigned npipelines; 446 447 /** Number of pipelines, with fallback included */ 448 unsigned _npipelines_ex; 449 450 /** Sequence number for pipeline. Incremented for each new packet */ 451 uint32_t seq; 452 453 /** Configuration handle for vBucket mapping */ 454 lcbvb_CONFIG* config; 455 456 /** Opaque pointer to be used by the application (in this case, lcb core) */ 457 void* cqdata; 458 459 /**Special pipeline used to contain orphaned packets within a scheduling 460 * context. This field is used by mcreq_set_fallback_handler() */ 461 mc_PIPELINE *fallback; 462 } mc_CMDQUEUE; 463 464 /** 465 * Allocate a packet belonging to a specific pipeline. 466 * @param pipeline the pipeline to allocate against 467 * @return a new packet structure or NULL on error 468 */ 469 mc_PACKET * 470 mcreq_allocate_packet(mc_PIPELINE *pipeline); 471 472 473 /** 474 * Free the packet structure. This will simply free the skeleton structure. 475 * The underlying members will not be touched. 
476 * @param pipeline the pipleine which was used to allocate the packet 477 * @param packet the packet to release 478 */ 479 void 480 mcreq_release_packet(mc_PIPELINE *pipeline, mc_PACKET *packet); 481 482 struct mc_epkt_datum; 483 484 /** 485 * Extended packet structure. This is returned by mcreq_renew_packet(). 486 * 487 * The purpose of this structure is to be able to "tag" extra data to the packet 488 * (typically for retries, or "special" commands) without weighing down on the 489 * normal packet structure; thus it should be considered a 'subclass' of the 490 * normal packet structure. 491 */ 492 typedef struct mc_expacket_st { 493 /** The base packet structure */ 494 mc_PACKET base; 495 /* Additional data for the packet itself */ 496 sllist_root data; 497 } mc_EXPACKET; 498 499 typedef struct mc_epkt_datum { 500 sllist_node slnode; 501 502 /**Unique string key by which this datum will be identified, as more 503 * than a single datum can exist for a packet */ 504 const char *key; 505 506 /**Free the data structure 507 * @param datum the datum object */ 508 void (*dtorfn)(struct mc_epkt_datum *datum); 509 } mc_EPKTDATUM; 510 511 /** 512 * Detatches the packet src belonging to the given pipeline. A detached 513 * packet has all its data allocated via malloc and does not belong to 514 * any particular buffer. This is typically used for relocation or retries 515 * where it is impractical to affect the in-order netbuf allocator. 516 * 517 * @param src the source packet to copy 518 * @return a new packet structure. You should still clear the packet's data 519 * with wipe_packet/release_packet but you may pass NULL as the pipeline 520 * parameter. 521 * 522 * @attention 523 * Any 'Extended' packet data is **MOVED** from the source to the destination 524 * packet. This goes well with the typical use case of this function, which is 525 * not to actually duplicate the packet, but rather to provide a fresh copy 526 * which may be re-used. 
527 * 528 * @attention 529 * This function attempts to be "dumb" in the sense of trying to make an 530 * exact effective clone of the original packet (the main goal of this function 531 * is to move the resources of the packet over to a new block of memory). This 532 * means things like non-buffer-related flags (i.e. the ones not specifying 533 * the layout of the buffer) are _preserved_, including the so-called 534 * "state flags" which indicate if a packet has been flushed and/or handled. If 535 * calling this function to retry a packet, ensure to clear these state flags. 536 */ 537 mc_PACKET * 538 mcreq_renew_packet(const mc_PACKET *src); 539 540 /** 541 * Associates a datum with the packet. The packet must be a standalone packet, 542 * indicated by the MCREQ_F_DETACHED flag in the mc_PACKET::flags field. 543 * @param ep The packet to which the data should be added 544 * @param datum The datum object to add. The object is not copied and should 545 * not be freed until the `dtorfn` or `copyfn` functions have been called 546 * @return 0 on success, nonzero on failure (i.e. if packet is not detached). 547 */ 548 int 549 mcreq_epkt_insert(mc_EXPACKET *ep, mc_EPKTDATUM *datum); 550 551 /** 552 * Locate the datum associated with the given key for the packet. 553 * @param ep The packet in which to search 554 * @param key A NUL-terminated string matching the mc_EPKTDATUM::key field 555 * @return The datum, or NULL if it does not exist. 
556 */ 557 mc_EPKTDATUM * 558 mcreq_epkt_find(mc_EXPACKET *ep, const char *key); 559 560 /** 561 * Reserve the packet's basic header structure, this is for use for frames 562 * which do not contain keys, or contain fixed size data which does not 563 * need to be returned via get_key 564 * @param pipeline the pipeline to use 565 * @param packet the packet which should contain the header 566 * @param hdrsize the total size of the header+extras+key 567 */ 568 lcb_error_t 569 mcreq_reserve_header( 570 mc_PIPELINE *pipeline, mc_PACKET *packet, uint8_t hdrsize); 571 572 /** 573 * Initialize the given packet's key structure 574 * @param pipeline the pipeline used to allocate the packet 575 * @param packet the packet which should have its key field initialized 576 * @param hdrsize the size of the header before the key. This should contain 577 * the header size (i.e. 24 bytes) PLUS any extras therein. 578 * @param kreq the user-provided key structure 579 * @return LCB_SUCCESS on success, LCB_CLIENT_ENOMEM on allocation failure 580 */ 581 lcb_error_t 582 mcreq_reserve_key( 583 mc_PIPELINE *pipeline, mc_PACKET *packet, 584 uint8_t hdrsize, const lcb_KEYBUF *kreq); 585 586 587 /** 588 * Initialize the given packet's value structure. Only applicable for storage 589 * operations. 
590 * @param pipeline the pipeline used to allocate the packet 591 * @param packet the packet whose value field should be initialized 592 * @param vreq the user-provided structure containing the value parameters 593 * @return LCB_SUCCESS on success, LCB_CLIENT_ENOMEM on allocation failure 594 */ 595 lcb_error_t 596 mcreq_reserve_value(mc_PIPELINE *pipeline, mc_PACKET *packet, 597 const lcb_VALBUF *vreq); 598 599 /** 600 * Reserves value/body space, but doesn't actually copy the contents over 601 * @param pipeline the pipeline to use 602 * @param packet the packet to host the value 603 * @param n the number of bytes to reserve 604 */ 605 lcb_error_t 606 mcreq_reserve_value2(mc_PIPELINE *pipeline, mc_PACKET *packet, lcb_size_t n); 607 608 609 /** 610 * Enqueue the packet to the pipeline. This packet should have fully been 611 * initialized. Specifically, the packet's data buffer contents (i.e. key, 612 * header, and value) must not be modified once this function is called 613 * 614 * @param pipeline the target pipeline that the packet will be queued in 615 * @param packet the packet to enqueue. 616 * This function always succeeds. 617 */ 618 void 619 mcreq_enqueue_packet(mc_PIPELINE *pipeline, mc_PACKET *packet); 620 621 /** 622 * Like enqueue packet, except it will also inspect the packet's timeout field 623 * and if necessary, restructure the command inside the request list so that 624 * it appears before newer commands. 625 * 626 * The default enqueue_packet() just appends the command to the end of the 627 * queue while this will perform an additional check (and is less efficient) 628 */ 629 void 630 mcreq_reenqueue_packet(mc_PIPELINE *pipeline, mc_PACKET *packet); 631 632 /** 633 * Wipe the packet's internal buffers, releasing them. This should be called 634 * when the underlying data buffer fields are no longer needed, usually this 635 * is called directly before release_packet. 
636 * Note that release_packet should be called to free the storage for the packet 637 * structure itself. 638 * @param pipeline the pipeline structure used to allocate this packet 639 * @param packet the packet to wipe. 640 */ 641 void 642 mcreq_wipe_packet(mc_PIPELINE *pipeline, mc_PACKET *packet); 643 644 /** 645 * Function to extract mapping information given a key and a hashkey 646 * @param queue The command queue 647 * @param key The structure for the key 648 * @param hashkey The optional hashkey structure 649 * @param nhdr The size of the header (for KV_CONTIG) 650 * @param[out] vbid The vBucket ID 651 * @param[out] srvix The master server's index 652 */ 653 void 654 mcreq_map_key(mc_CMDQUEUE *queue, 655 const lcb_KEYBUF *key, const lcb_KEYBUF *hashkey, 656 unsigned nhdr, int *vbid, int *srvix); 657 658 659 /**If the packet's vbucket does not have a master node, use the fallback pipeline 660 * and let it be handled by the handler installed via mcreq_set_fallback_handler() 661 */ 662 #define MCREQ_BASICPACKET_F_FALLBACKOK 0x01 663 664 /** 665 * Handle the basic requirements of a packet common to all commands 666 * @param queue the queue 667 * @param cmd the command base structure 668 * 669 * @param[out] req the request header which will be set with key, vbucket, and extlen 670 * fields. In other words, you do not need to initialize them once this 671 * function has completed. 672 * 673 * @param extlen the size of extras for this command 674 * @param[out] packet a pointer set to the address of the allocated packet 675 * @param[out] pipeline a pointer set to the target pipeline 676 * @param options a set of options to control creation behavior. Currently the 677 * only recognized options are `0` (i.e. 
default options), or @ref 678 * MCREQ_BASICPACKET_F_FALLBACKOK 679 */ 680 681 lcb_error_t 682 mcreq_basic_packet( 683 mc_CMDQUEUE *queue, const lcb_CMDBASE *cmd, 684 protocol_binary_request_header *req, uint8_t extlen, 685 mc_PACKET **packet, mc_PIPELINE **pipeline, int options); 686 687 /** 688 * @brief Get the key from a packet 689 * @param[in] packet The packet from which to retrieve the key 690 * @param[out] key 691 * @param[out] nkey 692 */ 693 void 694 mcreq_get_key(const mc_PACKET *packet, const void **key, lcb_size_t *nkey); 695 696 /** @brief Returns the size of the entire packet, in bytes */ 697 uint32_t 698 mcreq_get_bodysize(const mc_PACKET *packet); 699 700 /** 701 * @brief get the total packet size (header+body) 702 * @param packet the packet 703 * @return the total size 704 */ 705 uint32_t 706 mcreq_get_size(const mc_PACKET *packet); 707 708 /** 709 * @brief Get the vBucket for the request 710 * @param packet The packet 711 * @return The vBucket ID from the packet. 712 */ 713 uint16_t 714 mcreq_get_vbucket(const mc_PACKET *packet); 715 716 /** Initializes a single pipeline object */ 717 int 718 mcreq_pipeline_init(mc_PIPELINE *pipeline); 719 720 /** Cleans up any initialization from pipeline_init */ 721 void 722 mcreq_pipeline_cleanup(mc_PIPELINE *pipeline); 723 724 725 /** 726 * Set the pipelines that this queue will manage 727 * @param queue the queue to take the pipelines 728 * @param pipelines an array of pipeline pointers. The array is copied 729 * @param npipelines number of pipelines in the queue 730 * @param config the configuration handle. The configuration is _not_ owned 731 * and _not_ copied and the caller must ensure it remains valid 732 * until it is replaces. 
733 */ 734 void 735 mcreq_queue_add_pipelines( 736 mc_CMDQUEUE *queue, mc_PIPELINE * const *pipelines, 737 unsigned npipelines, lcbvb_CONFIG* config); 738 739 740 /** 741 * Set the arra 742 * @param queue the queue 743 * @param count a pointer to the number of pipelines within the queue 744 * @return the pipeline array. 745 * 746 * When this function completes another call to add_pipelines must be performed 747 * in order for the queue to function properly. 748 */ 749 mc_PIPELINE ** 750 mcreq_queue_take_pipelines(mc_CMDQUEUE *queue, unsigned *count); 751 752 int 753 mcreq_queue_init(mc_CMDQUEUE *queue); 754 755 void 756 mcreq_queue_cleanup(mc_CMDQUEUE *queue); 757 758 /** 759 * @brief Add a packet to the current scheduling context 760 * @param pipeline 761 * @param pkt 762 * @see mcreq_sched_enter() 763 */ 764 void 765 mcreq_sched_add(mc_PIPELINE *pipeline, mc_PACKET *pkt); 766 767 /** 768 * @brief enter a scheduling scope 769 * @param queue 770 * @attention It is not safe to call this function twice 771 * @volatile 772 */ 773 void 774 mcreq_sched_enter(struct mc_cmdqueue_st *queue); 775 776 /** 777 * @brief successfully exit a scheduling scope 778 * 779 * All operations enqueued since the last call to mcreq_sched_enter() will be 780 * placed in their respective pipelines' operation queue. 
781 * 782 * @param queue 783 * @param do_flush Whether the items in the queue should be flushed 784 * @volatile 785 */ 786 void 787 mcreq_sched_leave(struct mc_cmdqueue_st *queue, int do_flush); 788 789 /** 790 * @brief destroy all operations within the scheduling scope 791 * All operations enqueued since the last call to mcreq_sched_enter() will 792 * be destroyed 793 * @param queue 794 */ 795 void 796 mcreq_sched_fail(struct mc_cmdqueue_st *queue); 797 798 /** 799 * Find a packet with the given opaque value 800 */ 801 mc_PACKET * 802 mcreq_pipeline_find(mc_PIPELINE *pipeline, uint32_t opaque); 803 804 /** 805 * Find and remove the packet with the given opaque value 806 */ 807 mc_PACKET * 808 mcreq_pipeline_remove(mc_PIPELINE *pipeline, uint32_t opaque); 809 810 /** 811 * Handles a received packet in response to a command 812 * @param pipeline the pipeline upon which the request was received 813 * @param request the original request 814 * @param response the packet received in the response 815 * @param immerr an immediate error message. If this is not LCB_SUCCESS then 816 * the packet in `response` shall contain only a header and the request itself 817 * should be analyzed 818 * 819 * @return 0 on success, nonzero if the handler could not be found for the 820 * command. 
821 */ 822 int 823 mcreq_dispatch_response(mc_PIPELINE *pipeline, mc_PACKET *request, 824 packet_info *response, lcb_error_t immerr); 825 826 827 #define MCREQ_KEEP_PACKET 1 828 #define MCREQ_REMOVE_PACKET 2 829 /** 830 * Callback used for packet iteration wiping 831 * 832 * @param queue the queue 833 * @param srcpl the source pipeline which is being cleaned 834 * @param pkt the packet which is being cleaned 835 * @param cbarg the argument passed to the iterwipe 836 * 837 * @return one of MCREQ_KEEP_PACKET (if the packet should be kept inside the 838 * pipeline) or MCREQ_REMOVE_PACKET (if the packet should not be kept) 839 */ 840 typedef int (*mcreq_iterwipe_fn) 841 (mc_CMDQUEUE *queue, mc_PIPELINE *srcpl, mc_PACKET *pkt, void *cbarg); 842 /** 843 * Wipe a single pipeline. This may be used to move and/or relocate 844 * existing commands to other pipelines. 845 * @param queue the queue to use 846 * @param src the pipeline to wipe 847 * @param callback the callback to invoke for each packet 848 * @param arg the argument passed to the callback 849 */ 850 void 851 mcreq_iterwipe(mc_CMDQUEUE *queue, mc_PIPELINE *src, 852 mcreq_iterwipe_fn callback, void *arg); 853 854 855 /** 856 * Called when a packet does not need to have any more references to it 857 * remaining. A packet by itself only has two implicit references; one is 858 * a flush reference and the other is a handler reference. 859 * 860 * The flush reference is unset once the packet has been flushed and the 861 * handler reference is unset once the packet's handler callback has been 862 * invoked and the relevant data relayed to the user. 863 * 864 * Once this function is called, the packet passed will no longer be valid 865 * and thus should not be used. 
866 */ 867 void 868 mcreq_packet_done(mc_PIPELINE *pipeline, mc_PACKET *pkt); 869 870 /** 871 * @brief Indicate that the packet was handled 872 * @param pipeline the pipeline 873 * @param pkt the packet which was handled 874 * If the packet has also been flushed, the packet's storage will be released 875 * and `pkt` will no longer point to valid memory. 876 */ 877 #define mcreq_packet_handled(pipeline, pkt) do { \ 878 (pkt)->flags |= MCREQ_F_INVOKED; \ 879 if ((pkt)->flags & MCREQ_F_FLUSHED) { \ 880 mcreq_packet_done(pipeline, pkt); \ 881 } \ 882 } while (0); 883 884 /** 885 * Reset the timeout (or rather, the start time) on all pending packets 886 * to the time specified. 887 * 888 * @param pl The pipeline 889 * @param nstime The new timestamp to use. 890 */ 891 void 892 mcreq_reset_timeouts(mc_PIPELINE *pl, lcb_U64 nstime); 893 894 /** 895 * Callback to be invoked when a packet is about to be failed out from the 896 * request queue. This should be used to possibly invoke handlers. The packet 897 * will then be removed from the queue. 898 * @param pipeline the pipeline which has been errored 899 * @param packet the current packet 900 * @param err the error received 901 * @param arg an opaque pointer 902 */ 903 typedef void (*mcreq_pktfail_fn) 904 (mc_PIPELINE *pipeline, mc_PACKET *packet, lcb_error_t err, void *arg); 905 906 /** 907 * Fail out a given pipeline. All commands in the pipeline will be removed 908 * from the pipeline (though they may still not be freed if they are pending 909 * a flush). 910 * 911 * @param pipeline the pipeline to fail out 912 * @param err the error which caused the failure 913 * @param failcb a callback invoked to handle each failed packet 914 * @param cbarg a pointer passed as the last parameter to the callback 915 * 916 * @return the number of items actually failed. 
917 */ 918 unsigned 919 mcreq_pipeline_fail( 920 mc_PIPELINE *pipeline, lcb_error_t err, 921 mcreq_pktfail_fn failcb, void *cbarg); 922 923 /** 924 * Fail out all commands in the pipeline which are older than a specified 925 * interval. This is similar to the pipeline_fail() function except that commands 926 * which are newer than the threshold are still kept 927 * 928 * @param pipeline the pipeline to fail out 929 * @param err the error to provide to the handlers (usually LCB_ETIMEDOUT) 930 * @param failcb the callback to invoke 931 * @param cbarg the last argument to the callback 932 * @param oldest_valid the _oldest_ time for a command to still be valid 933 * @param oldest_start set to the start time of the _oldest_ command which is 934 * still valid. 935 * 936 * @return the number of commands actually failed. 937 */ 938 unsigned 939 mcreq_pipeline_timeout( 940 mc_PIPELINE *pipeline, lcb_error_t err, 941 mcreq_pktfail_fn failcb, void *cbarg, 942 hrtime_t oldest_valid, 943 hrtime_t *oldest_start); 944 945 /** 946 * This function is called when a packet could not be properly mapped to a real 947 * pipeline 948 * @param cq the command queue 949 * @param pkt the packet which needs to be relocated. The packet needs to be 950 * properly copied via mcreq_renew_packet() 951 */ 952 typedef void (*mcreq_fallback_cb)(mc_CMDQUEUE *cq, mc_PACKET *pkt); 953 954 /** 955 * Set the callback function to be invoked when a packet could not be properly 956 * mapped to a node. The callback function is invoked from within the 957 * mcreq_sched_leave() function. 
958 * 959 * The handler should be assigned only once, during initialization 960 * 961 * @param cq The command queue 962 * @param handler The handler to invoke 963 */ 964 void 965 mcreq_set_fallback_handler(mc_CMDQUEUE *cq, mcreq_fallback_cb handler); 966 967 /** 968 * Callback used by mcreq_dump_packet() and mcreq_dump_chain() to format the 969 * packet's payload 970 * @param data the data to dump 971 * @param size the size of the data 972 * @param fp the file to write the output to 973 */ 974 typedef void (*mcreq_payload_dump_fn) 975 (const void *data, unsigned size, FILE *fp); 976 977 /** 978 * Dumps a single packet to the file indicated by `fp` 979 * @param pkt the packet to dump 980 * @param fp The file to write to 981 * @param dumpfn If specified, this function is called to handle the packet's 982 * header and payload body 983 */ 984 void 985 mcreq_dump_packet(const mc_PACKET *pkt, FILE *fp, mcreq_payload_dump_fn dumpfn); 986 987 void 988 mcreq_dump_chain(const mc_PIPELINE *pipeline, FILE *fp, mcreq_payload_dump_fn dumpfn); 989 990 #define mcreq_write_hdr(pkt, hdr) \ 991 memcpy( SPAN_BUFFER(&(pkt)->kh_span), (hdr)->bytes, sizeof((hdr)->bytes) ) 992 993 #define mcreq_write_exhdr(pkt, hdr, n) \ 994 memcpy(SPAN_BUFFER((&pkt)->kh_span), (hdr)->bytes, n) 995 996 #define mcreq_read_hdr(pkt, hdr) \ 997 memcpy( (hdr)->bytes, SPAN_BUFFER(&(pkt)->kh_span), sizeof((hdr)->bytes) ) 998 999 #define mcreq_first_packet(pipeline) \ 1000 SLLIST_IS_EMPTY(&(pipeline)->requests) ? NULL : \ 1001 SLLIST_ITEM(SLLIST_FIRST(&(pipeline)->requests), mc_PACKET, slnode) 1002 1003 /* Increment a metric */ 1004 #define MC_INCR_METRIC(pipeline, metric, amount) do { \ 1005 if ((pipeline)->metrics) { \ 1006 (pipeline)->metrics->metric += amount; \ 1007 } \ 1008 } while (0) 1009 1010 /**@}*/ 1011 1012 #ifdef __cplusplus 1013 } 1014 #endif /** __cplusplus */ 1015 #endif /* LCB_MCREQ_H */ 1016