1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2014 Couchbase, Inc.
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17 
18 #ifndef LCB_MCREQ_H
19 #define LCB_MCREQ_H
20 
21 #include <libcouchbase/couchbase.h>
22 #include <libcouchbase/api3.h>
23 #include <libcouchbase/vbucket.h>
24 #include <memcached/protocol_binary.h>
25 #include <libcouchbase/metrics.h>
26 #include "netbuf/netbuf.h"
27 #include "sllist.h"
28 #include "config.h"
29 #include "packetutils.h"
30 
31 #ifdef __cplusplus
32 extern "C" {
33 #endif /** __cplusplus */
34 
35 /**
36  * @file
37  * @brief Core memcached client routines
38  */
39 
40 /**
41  * @defgroup mcreq Memcached Packets
42  *
43  * @brief
44  * This module defines the core routines which are used to construct, handle,
45  * and enqueue packets. They also handle the retry mechanisms.
46  *
47  *
48  * # Initializing the Queue
49  *
50  * Using the mcreq system involves first establishing an mc_CMDQUEUE structure.
51  * This structure contains several mc_PIPELINE structures. The proper way to
52  * initialize the mc_CMDQEUE structure is to call mcreq_queue_init().
53  *
54  * Once the queue has been initialized, it must be assigned a
55  * `lcbvb_CONFIG*` (which it will _not_ own). This is done via the
56  * mcreq_queue_add_pipelines(). This function takes an array of pipeline pointers,
57  * and this will typically be a "subclass" (mc_SERVER) allocated via
58  * mcserver_alloc()
59  *
60  * Once the pipelines have been established, operations may be scheduled and
61  * distributed across the various pipelines.
62  *
63  * # Creating a Packet
64  *
65  * For each packet sent, the packet should first be reserved via the
66  * mcreq_basic_packet() call which allocates space for the actual packet
67  * as well as provides and populates the vbucket fields as needed.
68  *
69  * The header size must be the total size of the header plus any extras
70  * following the header but before the actual key data.
71  *
72  * If the command carries a body in addition to the key, it should be provided
73  * via mcreq_reserve_value().
74  *
75  * Once the packet has a key and value it must be assigned a cookie. The
76  * cookie may either be of a simple embedded type or an extended type. Whatever
77  * the case the appropriate flags should be set.
78  *
79  * # Scheduling Commands
80  *
81  * Scheduling commands is performed in an _enter_ and _leave_ sequence.
82  * mcreq_sched_enter() should be called before one or more commands are added.
83  * Then for each new command added, mcreq_sched_add() should be invoked with
84  * the new packet, and finally either mcreq_sched_leave() or mcreq_sched_fail()
85  * should be invoked to flush the commands to the network or free the resources
86  * allocated. In both cases the commands affected are scoped by the last call
87  * to mcreq_sched_enter().
88  *
89  * In order for commands to actually be flushed, the mc_PIPELINE::flush_start
90  * field must be set. This can vary depending on what the state of the underlying
91  * socket is. In server.c for example, the initial callback just schedules a
92  * connection. While the connection is in progress this field is set to a no-op
93  * callback, and finally when the socket is connected this field is set to
94  * interact with the I/O system which actually writes the buffers.
95  *
96  * # Flushing Responses
97  *
98  * This module does not do network I/O by design. Its only bridge is the
99  * mc_PIPELINE::flush_start function which should be set to actually flush
100  * the data.
101  *
102  * # Handling Reponses
103  *
104  * The I/O system reading the responses should place the response into a
105  * packet_info structure. Once this is done, the request for the response must
106  * be found using the opaque. This may be done with mcreq_pipeline_find()
107  * or mcreq_pipeline_remove() depending on whether this request expects multiple
108  * responses (such as the 'stat' command). These parameters should be passed
109  * to the mcreq_dispatch_response() function which will invoke the appropriate
110  * user-defined handler for it.
111  *
112  * If the packet does not expect more responses (as above), the application
113  * should call mcreq_packet_handled()
114  *
115  *
116  * # Error Handling and Failing Commands
117  *
118  * This module offers facilities for failing commands from a pipeline while
119  * safely allowing for their sharing of user-allocated data.
120  *
121  * The mcreq_pipeline_fail() and mcreq_pipeline_timeout() will fail packets
122  * in a single pipeline (the former failing all packets, the latter failing
123  * only packets older than a specified threshold).
124  *
125  * The mcreq_iterwipe() will clean a pipeline of its packets, invoking a
126  * callback which allows the user to relocate the packet to another pipeline.
127  * In this callback the user may invoke the mcreq_renew_packet() function to
128  * create a copy of the packet, keeping the previous packet in tact, but
129  * returning a copy of the packet as the 'primary' version.
130  *
131  * @addtogroup mcreq
132  * @{
133  */
134 
135 
136 /**
137  * @name Core Packet Structure
138  * @{
139  */
140 
141 /** @brief Constant defining the size of a memcached header */
142 #define MCREQ_PKT_BASESIZE 24
143 
144 /** @brief Embedded user data for a simple request. */
145 typedef struct mc_REQDATA {
146     const void *cookie; /**< User pointer to place in callbacks */
147     hrtime_t start; /**< Time of the initial request. Used for timeouts */
148     /**
149      * Time when dispatching response has begun for the command.
150      * Used for metrics/tracing. Might be zero, when tracing is not enabled.
151      */
152     hrtime_t dispatch;
153 #ifdef LCB_TRACING
154     lcbtrace_SPAN *span;
155 #endif
156 } mc_REQDATA;
157 
158 struct mc_packet_st;
159 struct mc_pipeline_st;
160 
161 /** This structure serves as a kind of 'vtable' for the mc_REQDATAEX structure. */
162 typedef struct {
163     /**
164      * Callback to be invoked for "Extended" packet handling. This is only
165      * available in the mc_REQDATAEX structure
166      * @param pipeline the pipeline on which the response was received
167      * @param pkt the request packet
168      * @param rc the error code for the response
169      * @param arg opaque pointer for callback
170      */
171     void (*handler)(struct mc_pipeline_st *pipeline,
172             struct mc_packet_st *pkt, lcb_error_t rc, const void *res);
173 
174     /**
175      * Destructor function called from within mcreq_sched_fail() for packets with
176      * extended data. This function should suitably free the data for the packet,
177      * if any.
178      * @param pkt The packet being unscheduled.
179      */
180     void (*fail_dtor)(struct mc_packet_st *pkt);
181 } mc_REQDATAPROCS;
182 
183 /**@brief Allocated user data for an extended request.
184  *
185  * @details
186  * An extended request is typically used by commands which have more complex
187  * handling requirements, such as mapping a single user API call to multiple
188  * packets, or when the packet itself is generated internally rather than
189  * on behalf of an API request.
190  */
191 typedef struct mc_REQDATAEX {
192     const void *cookie; /**< User data */
193     hrtime_t start; /**< Start time */
194     /**
195      * Time when dispatching response has begun for the command.
196      * Used for metrics/tracing. Might be zero, when tracing is not enabled.
197      */
198     hrtime_t dispatch;
199 #ifdef LCB_TRACING
200     lcbtrace_SPAN *span;
201 #endif
202     const mc_REQDATAPROCS *procs; /**< Common routines for the packet */
203 
204     #ifdef __cplusplus
mc_REQDATAEXmc_REQDATAEX205     mc_REQDATAEX(const void *cookie_, const mc_REQDATAPROCS &procs_, hrtime_t start_)
206         : cookie(cookie_), start(start_), dispatch(0),
207 #ifdef LCB_TRACING
208         span(NULL),
209 #endif
210         procs(&procs_)
211     {
212     }
213     #endif
214 } mc_REQDATAEX;
215 
216 /**
217  * Called when the buffers for a packet have been invoked
218  * @param pl the pipeline
219  * @param ucookie the cookie passed to the scheduler
220  * @param kbuf the pointer to the beginning of the key/header buffer, if
221  *        passed to the scheduler
222  * @param vbuf the pointer to the beginning of the value buffer or the first
223  *        IOV within the buffer.
224  */
225 typedef void (*mcreq_bufdone_fn)(struct mc_pipeline_st *pl,
226         const void *ucookie, void *kbuf, void *vbuf);
227 
228 /**
229  * Possible values for the mc_PACKET#flags field in the packet structure.
230  * These provide
231  * information as to which fields in the various unions are in use, and how
232  * to allocate/release data buffers for requests.
233  */
234 typedef enum {
235     /** The key is user-allocated. Do not release to MBLOCK */
236     MCREQ_F_KEY_NOCOPY = 1 << 0,
237 
238     /** The value is user-allocated. Do not release to MBLOCK */
239     MCREQ_F_VALUE_NOCOPY = 1 << 1,
240 
241     /**
242      * The value is user allocated and in the form of an IOV.
243      * Use mc_VALUE#multi
244      */
245     MCREQ_F_VALUE_IOV = 1 << 2,
246 
247     /** The request has a value. Use mc_VALUE#single unless otherwise noted */
248     MCREQ_F_HASVALUE = 1 << 3,
249 
250     /**
251      * The request is tied to an 'extended' user data structure.
252      * Use mc_USER#exdata
253      */
254     MCREQ_F_REQEXT = 1 << 4,
255 
256     /** The request is a one-to-one user forwarded packet */
257     MCREQ_F_UFWD = 1 << 5,
258 
259     /**
260      * Indicates that the entire packet has been flushed. Specifically this
261      * also indicates that the packet's underlying buffers are no longer needed
262      * by libcouchbase.
263      */
264     MCREQ_F_FLUSHED = 1 << 6,
265 
266     /**
267      * Indicates that the callback should NOT be invoked for the request. This
268      * is typically because the request is just present in the queue for buffer
269      * management purposes and has expired or otherwise been invalidated.
270      */
271     MCREQ_F_INVOKED = 1 << 7,
272 
273     /**
274      * Indicates that this packet and its constituent data members are not
275      * part of a nb_MBLOCK but rather point to standalone malloc'd memory. This
276      * also indicates that the packet is actually an mc_EXPACKET extended
277      * type. This is set by mcreq_renew_packet()
278      */
279     MCREQ_F_DETACHED = 1 << 8,
280 
281     /**
282      * Another way of signalling that the callback has an 'internal' variant.
283      * Dispatching this command requires a specially formatted cookie object,
284      * which itself is expected to _contain_ a pointer to the callback, and
285      * thus be formatted like so:
286      * @code{.c}
287      * struct {
288      *   lcb_RESPCALLBACK callback;
289      * };
290      * @endcode
291      */
292     MCREQ_F_PRIVCALLBACK = 1 << 9
293 } mcreq_flags;
294 
295 /** @brief mask of flags indicating user-allocated buffers */
296 #define MCREQ_UBUF_FLAGS (MCREQ_F_KEY_NOCOPY|MCREQ_F_VALUE_NOCOPY)
297 /** @brief mask of flags indicating response state of the packet */
298 #define MCREQ_STATE_FLAGS (MCREQ_F_INVOKED|MCREQ_F_FLUSHED)
299 
300 /** Union representing the value within a packet */
301 union mc_VALUE {
302     /** For a single contiguous value */
303     nb_SPAN single;
304 
305     /** For a set of multiple IOV buffers */
306     lcb_FRAGBUF multi;
307 };
308 
309 /** Union representing application/command data within a packet structure */
310 union mc_USER {
311     /** Embedded command info for simple commands; 16 bytes, 48B */
312     mc_REQDATA reqdata;
313 
314     /** Pointer to extended data */
315     mc_REQDATAEX *exdata;
316 };
317 
318 /**
319  * @brief Packet structure for a single Memcached command
320  *
321  * A single packet structure is allocated for each request
322  * sent to a server. A packet structure may be associated with user data in the
323  * u_rdata union field, either by using the embedded structure, or by referencing
324  * an allocated chunk of 'extended' user data.
325  */
326 typedef struct mc_packet_st {
327     /** Node in the linked list for logical command ordering */
328     sllist_node slnode;
329 
330     /**
331      * Node in the linked list for actual output ordering.
332      * @see netbuf_end_flush2(), netbuf_pdu_enqueue()
333      */
334     sllist_node sl_flushq;
335 
336     /** Span for key and header */
337     nb_SPAN kh_span;
338 
339     /** Extras length */
340     uint8_t extlen;
341 
342     /** Retries */
343     uint8_t retries;
344 
345     /** flags for request. @see mcreq_flags */
346     uint16_t flags;
347 
348     /** Cached opaque value */
349     uint32_t opaque;
350 
351     /** User/CMDAPI Data */
352     union mc_USER u_rdata;
353 
354     /** Value data */
355     union mc_VALUE u_value;
356 
357     /** Allocation data for the PACKET structure itself */
358     nb_MBLOCK *alloc_parent;
359 } mc_PACKET;
360 
361 
362 /**
363  * @brief Gets the request data from the packet structure itself
364  * @return an mc_REQDATA or mc_REQDATAEX pointer
365  */
366 #define MCREQ_PKT_RDATA(pkt) \
367     (((pkt)->flags & MCREQ_F_REQEXT) \
368         ? ((mc_REQDATA *)(pkt)->u_rdata.exdata) \
369         : (&(pkt)->u_rdata.reqdata))
370 
371 /**
372  * @brief Retrieve the cookie pointer from a packet
373  * @param pkt
374  */
375 #define MCREQ_PKT_COOKIE(pkt) MCREQ_PKT_RDATA(pkt)->cookie
376 
377 /**@}*/
378 
379 /**
380  * Callback invoked when APIs request that a pipeline start flushing. It
381  * receives a pipeline object as its sole argument.
382  */
383 typedef void (*mcreq_flushstart_fn)(struct mc_pipeline_st *pipeline);
384 
385 /**
386  * @brief Structure representing a single input/output queue for memcached
387  *
388  * Memcached request pipeline. This contains the command log for
389  * sending/receiving requests. This is basically the non-I/O part of the server
390  */
391 typedef struct mc_pipeline_st {
392     /** List of requests. Newer requests are appended at the end */
393     sllist_root requests;
394 
395     /** Parent command queue */
396     struct mc_cmdqueue_st *parent;
397 
398     /**
399      * Flush handler. This is invoked to schedule a flush operation
400      * the socket
401      */
402     mcreq_flushstart_fn flush_start;
403 
404     /** Index of this server within the configuration map */
405     int index;
406 
407     /**
408      * Intermediate queue where pending packets are placed. Moved to
409      * the `requests` list when mcreq_sched_leave() is called
410      */
411     sllist_root ctxqueued;
412 
413     /**
414      * Callback invoked for each packet (which has user-defined buffers) when
415      * it is no longer required
416      */
417     mcreq_bufdone_fn buf_done_callback;
418 
419     /** Buffer manager for the respective requests. */
420     nb_MGR nbmgr;
421 
422     /** Allocator for packet structures */
423     nb_MGR reqpool;
424 
425     /** Optional metrics structure for server */
426     struct lcb_SERVERMETRICS_st *metrics;
427 } mc_PIPELINE;
428 
429 typedef struct mc_cmdqueue_st {
430     /** Indexed pipelines, i.e. server map target */
431     mc_PIPELINE **pipelines;
432 
433     /**
434      * Small array of size npipelines, for mcreq_sched_enter()/mcreq_sched_leave()
435      * stuff. See those functions for usage
436      */
437     char *scheds;
438 
439     /**
440      * Whether a context is currently entered (i.e. sched_enter())
441      */
442     unsigned ctxenter;
443 
444     /** Number of pipelines in the queue */
445     unsigned npipelines;
446 
447     /** Number of pipelines, with fallback included */
448     unsigned _npipelines_ex;
449 
450     /** Sequence number for pipeline. Incremented for each new packet */
451     uint32_t seq;
452 
453     /** Configuration handle for vBucket mapping */
454     lcbvb_CONFIG* config;
455 
456     /** Opaque pointer to be used by the application (in this case, lcb core) */
457     void* cqdata;
458 
459     /**Special pipeline used to contain orphaned packets within a scheduling
460      * context. This field is used by mcreq_set_fallback_handler() */
461     mc_PIPELINE *fallback;
462 } mc_CMDQUEUE;
463 
464 /**
465  * Allocate a packet belonging to a specific pipeline.
466  * @param pipeline the pipeline to allocate against
467  * @return a new packet structure or NULL on error
468  */
469 mc_PACKET *
470 mcreq_allocate_packet(mc_PIPELINE *pipeline);
471 
472 
473 /**
474  * Free the packet structure. This will simply free the skeleton structure.
475  * The underlying members will not be touched.
476  * @param pipeline the pipleine which was used to allocate the packet
477  * @param packet the packet to release
478  */
479 void
480 mcreq_release_packet(mc_PIPELINE *pipeline, mc_PACKET *packet);
481 
482 struct mc_epkt_datum;
483 
484 /**
485  * Extended packet structure. This is returned by mcreq_renew_packet().
486  *
487  * The purpose of this structure is to be able to "tag" extra data to the packet
488  * (typically for retries, or "special" commands) without weighing down on the
489  * normal packet structure; thus it should be considered a 'subclass' of the
490  * normal packet structure.
491  */
492 typedef struct mc_expacket_st {
493     /** The base packet structure */
494     mc_PACKET base;
495     /* Additional data for the packet itself */
496     sllist_root data;
497 } mc_EXPACKET;
498 
499 typedef struct mc_epkt_datum {
500     sllist_node slnode;
501 
502     /**Unique string key by which this datum will be identified, as more
503      * than a single datum can exist for a packet */
504     const char *key;
505 
506     /**Free the data structure
507      * @param datum the datum object */
508     void (*dtorfn)(struct mc_epkt_datum *datum);
509 } mc_EPKTDATUM;
510 
511 /**
512  * Detatches the packet src belonging to the given pipeline. A detached
513  * packet has all its data allocated via malloc and does not belong to
514  * any particular buffer. This is typically used for relocation or retries
515  * where it is impractical to affect the in-order netbuf allocator.
516  *
517  * @param src the source packet to copy
518  * @return a new packet structure. You should still clear the packet's data
519  * with wipe_packet/release_packet but you may pass NULL as the pipeline
520  * parameter.
521  *
522  * @attention
523  * Any 'Extended' packet data is **MOVED** from the source to the destination
524  * packet. This goes well with the typical use case of this function, which is
525  * not to actually duplicate the packet, but rather to provide a fresh copy
526  * which may be re-used.
527  *
528  * @attention
529  * This function attempts to be "dumb" in the sense of trying to make an
530  * exact effective clone of the original packet (the main goal of this function
531  * is to move the resources of the packet over to a new block of memory). This
532  * means things like non-buffer-related flags (i.e. the ones not specifying
533  * the layout of the buffer) are _preserved_, including the so-called
534  * "state flags" which indicate if a packet has been flushed and/or handled. If
535  * calling this function to retry a packet, ensure to clear these state flags.
536  */
537 mc_PACKET *
538 mcreq_renew_packet(const mc_PACKET *src);
539 
540 /**
541  * Associates a datum with the packet. The packet must be a standalone packet,
542  * indicated by the MCREQ_F_DETACHED flag in the mc_PACKET::flags field.
543  * @param ep The packet to which the data should be added
544  * @param datum The datum object to add. The object is not copied and should
545  *  not be freed until the `dtorfn` or `copyfn` functions have been called
546  * @return 0 on success, nonzero on failure (i.e. if packet is not detached).
547  */
548 int
549 mcreq_epkt_insert(mc_EXPACKET *ep, mc_EPKTDATUM *datum);
550 
551 /**
552  * Locate the datum associated with the given key for the packet.
553  * @param ep The packet in which to search
554  * @param key A NUL-terminated string matching the mc_EPKTDATUM::key field
555  * @return The datum, or NULL if it does not exist.
556  */
557 mc_EPKTDATUM *
558 mcreq_epkt_find(mc_EXPACKET *ep, const char *key);
559 
560 /**
561  * Reserve the packet's basic header structure, this is for use for frames
562  * which do not contain keys, or contain fixed size data which does not
563  * need to be returned via get_key
564  * @param pipeline the pipeline to use
565  * @param packet the packet which should contain the header
566  * @param hdrsize the total size of the header+extras+key
567  */
568 lcb_error_t
569 mcreq_reserve_header(
570         mc_PIPELINE *pipeline, mc_PACKET *packet, uint8_t hdrsize);
571 
572 /**
573  * Initialize the given packet's key structure
574  * @param pipeline the pipeline used to allocate the packet
575  * @param packet the packet which should have its key field initialized
576  * @param hdrsize the size of the header before the key. This should contain
577  *        the header size (i.e. 24 bytes) PLUS any extras therein.
578  * @param kreq the user-provided key structure
579  * @return LCB_SUCCESS on success, LCB_CLIENT_ENOMEM on allocation failure
580  */
581 lcb_error_t
582 mcreq_reserve_key(
583         mc_PIPELINE *pipeline, mc_PACKET *packet,
584         uint8_t hdrsize, const lcb_KEYBUF *kreq);
585 
586 
587 /**
588  * Initialize the given packet's value structure. Only applicable for storage
589  * operations.
590  * @param pipeline the pipeline used to allocate the packet
591  * @param packet the packet whose value field should be initialized
592  * @param vreq the user-provided structure containing the value parameters
593  * @return LCB_SUCCESS on success, LCB_CLIENT_ENOMEM on allocation failure
594  */
595 lcb_error_t
596 mcreq_reserve_value(mc_PIPELINE *pipeline, mc_PACKET *packet,
597                     const lcb_VALBUF *vreq);
598 
599 /**
600  * Reserves value/body space, but doesn't actually copy the contents over
601  * @param pipeline the pipeline to use
602  * @param packet the packet to host the value
603  * @param n the number of bytes to reserve
604  */
605 lcb_error_t
606 mcreq_reserve_value2(mc_PIPELINE *pipeline, mc_PACKET *packet, lcb_size_t n);
607 
608 
609 /**
610  * Enqueue the packet to the pipeline. This packet should have fully been
611  * initialized. Specifically, the packet's data buffer contents (i.e. key,
612  * header, and value) must not be modified once this function is called
613  *
614  * @param pipeline the target pipeline that the packet will be queued in
615  * @param packet the packet to enqueue.
616  * This function always succeeds.
617  */
618 void
619 mcreq_enqueue_packet(mc_PIPELINE *pipeline, mc_PACKET *packet);
620 
621 /**
622  * Like enqueue packet, except it will also inspect the packet's timeout field
623  * and if necessary, restructure the command inside the request list so that
624  * it appears before newer commands.
625  *
626  * The default enqueue_packet() just appends the command to the end of the
627  * queue while this will perform an additional check (and is less efficient)
628  */
629 void
630 mcreq_reenqueue_packet(mc_PIPELINE *pipeline, mc_PACKET *packet);
631 
632 /**
633  * Wipe the packet's internal buffers, releasing them. This should be called
634  * when the underlying data buffer fields are no longer needed, usually this
635  * is called directly before release_packet.
636  * Note that release_packet should be called to free the storage for the packet
637  * structure itself.
638  * @param pipeline the pipeline structure used to allocate this packet
639  * @param packet the packet to wipe.
640  */
641 void
642 mcreq_wipe_packet(mc_PIPELINE *pipeline, mc_PACKET *packet);
643 
644 /**
645  * Function to extract mapping information given a key and a hashkey
646  * @param queue The command queue
647  * @param key The structure for the key
648  * @param hashkey The optional hashkey structure
649  * @param nhdr The size of the header (for KV_CONTIG)
650  * @param[out] vbid The vBucket ID
651  * @param[out] srvix The master server's index
652  */
653 void
654 mcreq_map_key(mc_CMDQUEUE *queue,
655     const lcb_KEYBUF *key, const lcb_KEYBUF *hashkey,
656     unsigned nhdr, int *vbid, int *srvix);
657 
658 
659 /**If the packet's vbucket does not have a master node, use the fallback pipeline
660  * and let it be handled by the handler installed via mcreq_set_fallback_handler()
661  */
662 #define MCREQ_BASICPACKET_F_FALLBACKOK 0x01
663 
664 /**
665  * Handle the basic requirements of a packet common to all commands
666  * @param queue the queue
667  * @param cmd the command base structure
668  *
669  * @param[out] req the request header which will be set with key, vbucket, and extlen
670  *        fields. In other words, you do not need to initialize them once this
671  *        function has completed.
672  *
673  * @param extlen the size of extras for this command
674  * @param[out] packet a pointer set to the address of the allocated packet
675  * @param[out] pipeline a pointer set to the target pipeline
676  * @param options a set of options to control creation behavior. Currently the
677  * only recognized options are `0` (i.e. default options), or @ref
678  * MCREQ_BASICPACKET_F_FALLBACKOK
679  */
680 
681 lcb_error_t
682 mcreq_basic_packet(
683         mc_CMDQUEUE *queue, const lcb_CMDBASE *cmd,
684         protocol_binary_request_header *req, uint8_t extlen,
685         mc_PACKET **packet, mc_PIPELINE **pipeline, int options);
686 
687 /**
688  * @brief Get the key from a packet
689  * @param[in] packet The packet from which to retrieve the key
690  * @param[out] key
691  * @param[out] nkey
692  */
693 void
694 mcreq_get_key(const mc_PACKET *packet, const void **key, lcb_size_t *nkey);
695 
696 /** @brief Returns the size of the entire packet, in bytes */
697 uint32_t
698 mcreq_get_bodysize(const mc_PACKET *packet);
699 
700 /**
701  * @brief get the total packet size (header+body)
702  * @param packet the packet
703  * @return the total size
704  */
705 uint32_t
706 mcreq_get_size(const mc_PACKET *packet);
707 
708 /**
709  * @brief Get the vBucket for the request
710  * @param packet The packet
711  * @return The vBucket ID from the packet.
712  */
713 uint16_t
714 mcreq_get_vbucket(const mc_PACKET *packet);
715 
716 /** Initializes a single pipeline object */
717 int
718 mcreq_pipeline_init(mc_PIPELINE *pipeline);
719 
720 /** Cleans up any initialization from pipeline_init */
721 void
722 mcreq_pipeline_cleanup(mc_PIPELINE *pipeline);
723 
724 
725 /**
726  * Set the pipelines that this queue will manage
727  * @param queue the queue to take the pipelines
728  * @param pipelines an array of pipeline pointers. The array is copied
729  * @param npipelines number of pipelines in the queue
730  * @param config the configuration handle. The configuration is _not_ owned
731  *        and _not_ copied and the caller must ensure it remains valid
732  *        until it is replaces.
733  */
734 void
735 mcreq_queue_add_pipelines(
736         mc_CMDQUEUE *queue, mc_PIPELINE * const *pipelines,
737         unsigned npipelines, lcbvb_CONFIG* config);
738 
739 
740 /**
741  * Set the arra
742  * @param queue the queue
743  * @param count a pointer to the number of pipelines within the queue
744  * @return the pipeline array.
745  *
746  * When this function completes another call to add_pipelines must be performed
747  * in order for the queue to function properly.
748  */
749 mc_PIPELINE **
750 mcreq_queue_take_pipelines(mc_CMDQUEUE *queue, unsigned *count);
751 
752 int
753 mcreq_queue_init(mc_CMDQUEUE *queue);
754 
755 void
756 mcreq_queue_cleanup(mc_CMDQUEUE *queue);
757 
758 /**
759  * @brief Add a packet to the current scheduling context
760  * @param pipeline
761  * @param pkt
762  * @see mcreq_sched_enter()
763  */
764 void
765 mcreq_sched_add(mc_PIPELINE *pipeline, mc_PACKET *pkt);
766 
767 /**
768  * @brief enter a scheduling scope
769  * @param queue
770  * @attention It is not safe to call this function twice
771  * @volatile
772  */
773 void
774 mcreq_sched_enter(struct mc_cmdqueue_st *queue);
775 
776 /**
777  * @brief successfully exit a scheduling scope
778  *
779  * All operations enqueued since the last call to mcreq_sched_enter() will be
780  * placed in their respective pipelines' operation queue.
781  *
782  * @param queue
783  * @param do_flush Whether the items in the queue should be flushed
784  * @volatile
785  */
786 void
787 mcreq_sched_leave(struct mc_cmdqueue_st *queue, int do_flush);
788 
789 /**
790  * @brief destroy all operations within the scheduling scope
791  * All operations enqueued since the last call to mcreq_sched_enter() will
792  * be destroyed
793  * @param queue
794  */
795 void
796 mcreq_sched_fail(struct mc_cmdqueue_st *queue);
797 
798 /**
799  * Find a packet with the given opaque value
800  */
801 mc_PACKET *
802 mcreq_pipeline_find(mc_PIPELINE *pipeline, uint32_t opaque);
803 
804 /**
805  * Find and remove the packet with the given opaque value
806  */
807 mc_PACKET *
808 mcreq_pipeline_remove(mc_PIPELINE *pipeline, uint32_t opaque);
809 
810 /**
811  * Handles a received packet in response to a command
812  * @param pipeline the pipeline upon which the request was received
813  * @param request the original request
814  * @param response the packet received in the response
815  * @param immerr an immediate error message. If this is not LCB_SUCCESS then
816  *  the packet in `response` shall contain only a header and the request itself
817  *  should be analyzed
818  *
819  * @return 0 on success, nonzero if the handler could not be found for the
820  * command.
821  */
822 int
823 mcreq_dispatch_response(mc_PIPELINE *pipeline, mc_PACKET *request,
824                         packet_info *response, lcb_error_t immerr);
825 
826 
827 #define MCREQ_KEEP_PACKET 1
828 #define MCREQ_REMOVE_PACKET 2
829 /**
830  * Callback used for packet iteration wiping
831  *
832  * @param queue the queue
833  * @param srcpl the source pipeline which is being cleaned
834  * @param pkt the packet which is being cleaned
835  * @param cbarg the argument passed to the iterwipe
836  *
837  * @return one of MCREQ_KEEP_PACKET (if the packet should be kept inside the
838  * pipeline) or MCREQ_REMOVE_PACKET (if the packet should not be kept)
839  */
840 typedef int (*mcreq_iterwipe_fn)
841         (mc_CMDQUEUE *queue, mc_PIPELINE *srcpl, mc_PACKET *pkt, void *cbarg);
842 /**
843  * Wipe a single pipeline. This may be used to move and/or relocate
844  * existing commands to other pipelines.
845  * @param queue the queue to use
846  * @param src the pipeline to wipe
847  * @param callback the callback to invoke for each packet
848  * @param arg the argument passed to the callback
849  */
850 void
851 mcreq_iterwipe(mc_CMDQUEUE *queue, mc_PIPELINE *src,
852                mcreq_iterwipe_fn callback, void *arg);
853 
854 
855 /**
856  * Called when a packet does not need to have any more references to it
857  * remaining. A packet by itself only has two implicit references; one is
858  * a flush reference and the other is a handler reference.
859  *
860  * The flush reference is unset once the packet has been flushed and the
861  * handler reference is unset once the packet's handler callback has been
862  * invoked and the relevant data relayed to the user.
863  *
864  * Once this function is called, the packet passed will no longer be valid
865  * and thus should not be used.
866  */
867 void
868 mcreq_packet_done(mc_PIPELINE *pipeline, mc_PACKET *pkt);
869 
870 /**
871  * @brief Indicate that the packet was handled
872  * @param pipeline the pipeline
873  * @param pkt the packet which was handled
874  * If the packet has also been flushed, the packet's storage will be released
875  * and `pkt` will no longer point to valid memory.
876  */
877 #define mcreq_packet_handled(pipeline, pkt) do { \
878     (pkt)->flags |= MCREQ_F_INVOKED; \
879     if ((pkt)->flags & MCREQ_F_FLUSHED) { \
880         mcreq_packet_done(pipeline, pkt); \
881     } \
882 } while (0);
883 
884 /**
885  * Reset the timeout (or rather, the start time) on all pending packets
886  * to the time specified.
887  *
888  * @param pl The pipeline
889  * @param nstime The new timestamp to use.
890  */
891 void
892 mcreq_reset_timeouts(mc_PIPELINE *pl, lcb_U64 nstime);
893 
894 /**
895  * Callback to be invoked when a packet is about to be failed out from the
896  * request queue. This should be used to possibly invoke handlers. The packet
897  * will then be removed from the queue.
898  * @param pipeline the pipeline which has been errored
899  * @param packet the current packet
900  * @param err the error received
901  * @param arg an opaque pointer
902  */
903 typedef void (*mcreq_pktfail_fn)
904         (mc_PIPELINE *pipeline, mc_PACKET *packet, lcb_error_t err, void *arg);
905 
906 /**
907  * Fail out a given pipeline. All commands in the pipeline will be removed
908  * from the pipeline (though they may still not be freed if they are pending
909  * a flush).
910  *
911  * @param pipeline the pipeline to fail out
912  * @param err the error which caused the failure
913  * @param failcb a callback invoked to handle each failed packet
914  * @param cbarg a pointer passed as the last parameter to the callback
915  *
916  * @return the number of items actually failed.
917  */
918 unsigned
919 mcreq_pipeline_fail(
920         mc_PIPELINE *pipeline, lcb_error_t err,
921         mcreq_pktfail_fn failcb, void *cbarg);
922 
923 /**
924  * Fail out all commands in the pipeline which are older than a specified
925  * interval. This is similar to the pipeline_fail() function except that commands
926  * which are newer than the threshold are still kept
927  *
928  * @param pipeline the pipeline to fail out
929  * @param err the error to provide to the handlers (usually LCB_ETIMEDOUT)
930  * @param failcb the callback to invoke
931  * @param cbarg the last argument to the callback
932  * @param oldest_valid the _oldest_ time for a command to still be valid
933  * @param oldest_start set to the start time of the _oldest_ command which is
934  *        still valid.
935  *
936  * @return the number of commands actually failed.
937  */
938 unsigned
939 mcreq_pipeline_timeout(
940         mc_PIPELINE *pipeline, lcb_error_t err,
941         mcreq_pktfail_fn failcb, void *cbarg,
942         hrtime_t oldest_valid,
943         hrtime_t *oldest_start);
944 
945 /**
946  * This function is called when a packet could not be properly mapped to a real
947  * pipeline
948  * @param cq the command queue
949  * @param pkt the packet which needs to be relocated. The packet needs to be
950  * properly copied via mcreq_renew_packet()
951  */
952 typedef void (*mcreq_fallback_cb)(mc_CMDQUEUE *cq, mc_PACKET *pkt);
953 
954 /**
955  * Set the callback function to be invoked when a packet could not be properly
956  * mapped to a node. The callback function is invoked from within the
957  * mcreq_sched_leave() function.
958  *
959  * The handler should be assigned only once, during initialization
960  *
961  * @param cq The command queue
962  * @param handler The handler to invoke
963  */
964 void
965 mcreq_set_fallback_handler(mc_CMDQUEUE *cq, mcreq_fallback_cb handler);
966 
967 /**
968  * Callback used by mcreq_dump_packet() and mcreq_dump_chain() to format the
969  * packet's payload
970  * @param data the data to dump
971  * @param size the size of the data
972  * @param fp the file to write the output to
973  */
974 typedef void (*mcreq_payload_dump_fn)
975         (const void *data, unsigned size, FILE *fp);
976 
977 /**
978  * Dumps a single packet to the file indicated by `fp`
979  * @param pkt the packet to dump
980  * @param fp The file to write to
981  * @param dumpfn If specified, this function is called to handle the packet's
982  *  header and payload body
983  */
984 void
985 mcreq_dump_packet(const mc_PACKET *pkt, FILE *fp, mcreq_payload_dump_fn dumpfn);
986 
987 void
988 mcreq_dump_chain(const mc_PIPELINE *pipeline, FILE *fp, mcreq_payload_dump_fn dumpfn);
989 
990 #define mcreq_write_hdr(pkt, hdr) \
991         memcpy( SPAN_BUFFER(&(pkt)->kh_span), (hdr)->bytes, sizeof((hdr)->bytes) )
992 
993 #define mcreq_write_exhdr(pkt, hdr, n) \
994         memcpy(SPAN_BUFFER((&pkt)->kh_span), (hdr)->bytes, n)
995 
996 #define mcreq_read_hdr(pkt, hdr) \
997         memcpy( (hdr)->bytes, SPAN_BUFFER(&(pkt)->kh_span), sizeof((hdr)->bytes) )
998 
999 #define mcreq_first_packet(pipeline) \
1000         SLLIST_IS_EMPTY(&(pipeline)->requests) ? NULL : \
1001                 SLLIST_ITEM(SLLIST_FIRST(&(pipeline)->requests), mc_PACKET, slnode)
1002 
1003 /* Increment a metric */
1004 #define MC_INCR_METRIC(pipeline, metric, amount) do { \
1005         if ((pipeline)->metrics) { \
1006             (pipeline)->metrics->metric += amount; \
1007         } \
1008 } while (0)
1009 
1010 /**@}*/
1011 
1012 #ifdef __cplusplus
1013 }
1014 #endif /** __cplusplus */
1015 #endif /* LCB_MCREQ_H */
1016