/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
 *     Copyright 2014 Couchbase, Inc.
 *
 *   Licensed under the Apache License, Version 2.0 (the "License");
 *   you may not use this file except in compliance with the License.
 *   You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 *   Unless required by applicable law or agreed to in writing, software
 *   distributed under the License is distributed on an "AS IS" BASIS,
 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *   See the License for the specific language governing permissions and
 *   limitations under the License.
 */

#include "mcreq.h"
#include "compress.h"
#include "sllist-inl.h"
#include "internal.h"

#define PKT_HDRSIZE(pkt) (MCREQ_PKT_BASESIZE + (pkt)->extlen)
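/*
 * PKT_HDRSIZE is the offset of the key within kh_span: the fixed memcached
 * binary protocol header (MCREQ_PKT_BASESIZE, 24 bytes) plus any extras.
 */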

lcb_error_t
mcreq_reserve_header(
        mc_PIPELINE *pipeline, mc_PACKET *packet, uint8_t hdrsize)
{
    int rv;
    packet->extlen = hdrsize - MCREQ_PKT_BASESIZE;
    packet->kh_span.size = hdrsize;
    rv = netbuf_mblock_reserve(&pipeline->nbmgr, &packet->kh_span);
    if (rv != 0) {
        return LCB_CLIENT_ENOMEM;
    }
    return LCB_SUCCESS;
}

lcb_error_t
mcreq_reserve_key(
        mc_PIPELINE *pipeline, mc_PACKET *packet, uint8_t hdrsize,
        const lcb_KEYBUF *kreq)
{
    const struct lcb_CONTIGBUF *contig = &kreq->contig;
    int rv;

    /** The key is stored at an offset within the span, after the header and extras */
    packet->extlen = hdrsize - MCREQ_PKT_BASESIZE;
    packet->kh_span.size = kreq->contig.nbytes;

    if (kreq->type == LCB_KV_COPY) {
        /**
         * If the key is to be copied, reserve enough span space for the
         * 24-byte header, the extras, and the key itself
         */
        packet->kh_span.size += hdrsize;
        rv = netbuf_mblock_reserve(&pipeline->nbmgr, &packet->kh_span);
        if (rv != 0) {
            return LCB_CLIENT_ENOMEM;
        }

        /**
         * Copy the key into the packet, starting at the end of the extras
         */
        memcpy(SPAN_BUFFER(&packet->kh_span) + hdrsize,
               contig->bytes,
               contig->nbytes);

    } else if (kreq->type == LCB_KV_CONTIG) {
        /**
         * Don't do any copying.
         * Assume the key buffer has enough space for the packet as well.
         */
        CREATE_STANDALONE_SPAN(&packet->kh_span, contig->bytes, contig->nbytes);
        packet->flags |= MCREQ_F_KEY_NOCOPY;

    } else {
        /** IOVs are not supported for keys */
        return LCB_EINVAL;
    }

    return LCB_SUCCESS;
}
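/*
 * For the LCB_KV_COPY case above, the resulting kh_span layout is
 *
 *   [ 24-byte header | extras (extlen bytes) | key ]
 *
 * with the key starting at offset hdrsize == PKT_HDRSIZE(packet). A minimal
 * usage sketch (illustrative only; 'pl' and 'pkt' are assumed to come from
 * the pipeline and mcreq_allocate_packet):
 *
 *   lcb_KEYBUF kbuf;
 *   kbuf.type = LCB_KV_COPY;
 *   kbuf.contig.bytes = "somekey";
 *   kbuf.contig.nbytes = 7;
 *   if (mcreq_reserve_key(pl, pkt, MCREQ_PKT_BASESIZE, &kbuf) != LCB_SUCCESS) {
 *       // handle LCB_CLIENT_ENOMEM
 *   }
 */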

lcb_error_t
mcreq_reserve_value2(mc_PIPELINE *pl, mc_PACKET *pkt, lcb_size_t n)
{
    int rv;
    pkt->u_value.single.size = n;
    if (!n) {
        return LCB_SUCCESS;
    }

    pkt->flags |= MCREQ_F_HASVALUE;
    rv = netbuf_mblock_reserve(&pl->nbmgr, &pkt->u_value.single);
    if (rv) {
        return LCB_CLIENT_ENOMEM;
    }
    return LCB_SUCCESS;
}

lcb_error_t
mcreq_reserve_value(
        mc_PIPELINE *pipeline, mc_PACKET *packet, const lcb_VALBUF *vreq)
{
    const lcb_CONTIGBUF *contig = &vreq->u_buf.contig;
    nb_SPAN *vspan = &packet->u_value.single;
    int rv;

    if (vreq->vtype == LCB_KV_COPY) {
        /** Copy the value into a single span */
        if (! (vspan->size = vreq->u_buf.contig.nbytes)) {
            return LCB_SUCCESS;
        }
        rv = netbuf_mblock_reserve(&pipeline->nbmgr, vspan);

        if (rv != 0) {
            return LCB_CLIENT_ENOMEM;
        }

        memcpy(SPAN_BUFFER(vspan), contig->bytes, contig->nbytes);

    } else if (vreq->vtype == LCB_KV_CONTIG) {
        /** The buffer is already contiguous, so wrap it in a 'standalone' span */
        CREATE_STANDALONE_SPAN(vspan, contig->bytes, contig->nbytes);
        packet->flags |= MCREQ_F_VALUE_NOCOPY;

    } else if (vreq->vtype == LCB_KV_IOV) {
        /** Multiple spans, no copy */
        unsigned int ii;
        const lcb_FRAGBUF *msrc = &vreq->u_buf.multi;
        lcb_FRAGBUF *mdst = &packet->u_value.multi;

        packet->flags |= MCREQ_F_VALUE_IOV | MCREQ_F_VALUE_NOCOPY;
        mdst->niov = msrc->niov;
        mdst->iov = malloc(mdst->niov * sizeof(*mdst->iov));
        mdst->total_length = 0;

        for (ii = 0; ii < mdst->niov; ii++) {
            mdst->iov[ii] = msrc->iov[ii];
            mdst->total_length += mdst->iov[ii].iov_len;
        }
    } else if (vreq->vtype == LCB_KV_IOVCOPY) {
        /** Multiple input buffers, copied into a single contiguous output buffer */
        unsigned int ii, cur_offset;
        const lcb_FRAGBUF *msrc = &vreq->u_buf.multi;

        if (msrc->total_length) {
            vspan->size = msrc->total_length;
        } else {
            vspan->size = 0;
            for (ii = 0; ii < msrc->niov; ii++) {
                vspan->size += msrc->iov[ii].iov_len;
            }
        }

        rv = netbuf_mblock_reserve(&pipeline->nbmgr, vspan);
        if (rv != 0) {
            return LCB_CLIENT_ENOMEM;
        }

        for (ii = 0, cur_offset = 0; ii < msrc->niov; ii++) {
            char *buf = SPAN_BUFFER(vspan) + cur_offset;
            memcpy(buf, msrc->iov[ii].iov_base, msrc->iov[ii].iov_len);
            cur_offset += msrc->iov[ii].iov_len;
        }
    }

    packet->flags |= MCREQ_F_HASVALUE;
    return LCB_SUCCESS;
}
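/*
 * Summary of the value reservation modes handled above:
 *
 *   LCB_KV_COPY    - value copied into a single pipeline-owned span
 *   LCB_KV_CONTIG  - user buffer referenced directly (MCREQ_F_VALUE_NOCOPY)
 *   LCB_KV_IOV     - user buffers referenced directly; the IOV list itself
 *                    is duplicated
 *   LCB_KV_IOVCOPY - user IOVs flattened into a single pipeline-owned span
 *
 * Illustrative sketch for the common copy case ('pl', 'pkt' and
 * 'document_json' are hypothetical):
 *
 *   lcb_VALBUF vbuf;
 *   vbuf.vtype = LCB_KV_COPY;
 *   vbuf.u_buf.contig.bytes = document_json;
 *   vbuf.u_buf.contig.nbytes = strlen(document_json);
 *   if (mcreq_reserve_value(pl, pkt, &vbuf) != LCB_SUCCESS) {
 *       // handle LCB_CLIENT_ENOMEM
 *   }
 */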

static int
pkt_tmo_compar(sllist_node *a, sllist_node *b)
{
    mc_PACKET *pa, *pb;
    hrtime_t tmo_a, tmo_b;

    pa = SLLIST_ITEM(a, mc_PACKET, slnode);
    pb = SLLIST_ITEM(b, mc_PACKET, slnode);

    tmo_a = MCREQ_PKT_RDATA(pa)->start;
    tmo_b = MCREQ_PKT_RDATA(pb)->start;

    if (tmo_a == tmo_b) {
        return 0;
    } else if (tmo_a < tmo_b) {
        return -1;
    } else {
        return 1;
    }
}

void
mcreq_reenqueue_packet(mc_PIPELINE *pipeline, mc_PACKET *packet)
{
    sllist_root *reqs = &pipeline->requests;
    mcreq_enqueue_packet(pipeline, packet);
    sllist_remove(reqs, &packet->slnode);
    sllist_insert_sorted(reqs, &packet->slnode, pkt_tmo_compar);
}
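/*
 * Note: the remove-then-sorted-insert above keeps the request list ordered
 * by each packet's start time, which mcreq_pipeline_timeout relies on in
 * order to stop scanning at the first packet that has not yet timed out.
 */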

void
mcreq_enqueue_packet(mc_PIPELINE *pipeline, mc_PACKET *packet)
{
    nb_SPAN *vspan = &packet->u_value.single;
    sllist_append(&pipeline->requests, &packet->slnode);
    netbuf_enqueue_span(&pipeline->nbmgr, &packet->kh_span, packet);
    MC_INCR_METRIC(pipeline, bytes_queued, packet->kh_span.size);

    if (!(packet->flags & MCREQ_F_HASVALUE)) {
        goto GT_ENQUEUE_PDU;
    }

    if (packet->flags & MCREQ_F_VALUE_IOV) {
        unsigned int ii;
        lcb_FRAGBUF *multi = &packet->u_value.multi;
        for (ii = 0; ii < multi->niov; ii++) {
            netbuf_enqueue(&pipeline->nbmgr, (nb_IOV *)multi->iov + ii, packet);
            MC_INCR_METRIC(pipeline, bytes_queued, multi->iov[ii].iov_len);
        }

    } else if (vspan->size) {
        MC_INCR_METRIC(pipeline, bytes_queued, vspan->size);
        netbuf_enqueue_span(&pipeline->nbmgr, vspan, packet);
    }

    GT_ENQUEUE_PDU:
    netbuf_pdu_enqueue(&pipeline->nbmgr, packet, offsetof(mc_PACKET, sl_flushq));
    MC_INCR_METRIC(pipeline, packets_queued, 1);
}

void
mcreq_wipe_packet(mc_PIPELINE *pipeline, mc_PACKET *packet)
{
    if (! (packet->flags & MCREQ_F_KEY_NOCOPY)) {
        if (packet->flags & MCREQ_F_DETACHED) {
            free(SPAN_BUFFER(&packet->kh_span));
        } else {
            netbuf_mblock_release(&pipeline->nbmgr, &packet->kh_span);
        }
    }

    if (! (packet->flags & MCREQ_F_HASVALUE)) {
        return;
    }

    if (packet->flags & MCREQ_F_VALUE_NOCOPY) {
        if (packet->flags & MCREQ_F_VALUE_IOV) {
            free(packet->u_value.multi.iov);
        }

        return;
    }

    if (packet->flags & MCREQ_F_DETACHED) {
        free(SPAN_BUFFER(&packet->u_value.single));
    } else {
        netbuf_mblock_release(&pipeline->nbmgr, &packet->u_value.single);
    }
}
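/*
 * Ownership summary for the wipe logic above: spans are returned to the
 * pipeline's netbuf manager by default; detached packets own standalone
 * malloc'd buffers, which are freed directly; NOCOPY buffers belong to the
 * user and are left untouched, except for the duplicated IOV array, which
 * is always owned (and freed) by the packet.
 */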

mc_PACKET *
mcreq_allocate_packet(mc_PIPELINE *pipeline)
{
    nb_SPAN span;
    int rv;
    mc_PACKET *ret;
    span.size = sizeof(*ret);

    rv = netbuf_mblock_reserve(&pipeline->reqpool, &span);
    if (rv != 0) {
        return NULL;
    }

    ret = (void *) SPAN_MBUFFER_NC(&span);
    ret->alloc_parent = span.parent;
    ret->flags = 0;
    ret->retries = 0;
    ret->opaque = pipeline->parent->seq++;
#ifdef LCB_TRACING
    ret->u_rdata.reqdata.span = NULL;
#endif
    return ret;
}

void
mcreq_release_packet(mc_PIPELINE *pipeline, mc_PACKET *packet)
{
    nb_SPAN span;
    if (packet->flags & MCREQ_F_DETACHED) {
        sllist_iterator iter;
        mc_EXPACKET *epkt = (mc_EXPACKET *)packet;

        SLLIST_ITERFOR(&epkt->data, &iter) {
            mc_EPKTDATUM *d = SLLIST_ITEM(iter.cur, mc_EPKTDATUM, slnode);
            sllist_iter_remove(&epkt->data, &iter);
            d->dtorfn(d);
        }
        free(epkt);
        return;
    }

    span.size = sizeof(*packet);
    span.parent = packet->alloc_parent;
    span.offset = (char *)packet - packet->alloc_parent->root;

    netbuf_mblock_release(&pipeline->reqpool, &span);
}

#define MCREQ_DETACH_WIPESRC 1

mc_PACKET *
mcreq_renew_packet(const mc_PACKET *src)
{
    char *kdata, *vdata;
    unsigned nvdata;
    mc_PACKET *dst;
    mc_EXPACKET *edst = calloc(1, sizeof(*edst));
    if (!edst) {
        return NULL;
    }

    dst = &edst->base;
    *dst = *src;

    kdata = malloc(src->kh_span.size);
    memcpy(kdata, SPAN_BUFFER(&src->kh_span), src->kh_span.size);
    CREATE_STANDALONE_SPAN(&dst->kh_span, kdata, src->kh_span.size);

    dst->flags &= ~(MCREQ_F_KEY_NOCOPY|MCREQ_F_VALUE_NOCOPY|MCREQ_F_VALUE_IOV);
    dst->flags |= MCREQ_F_DETACHED;
    dst->alloc_parent = NULL;
    dst->sl_flushq.next = NULL;
    dst->slnode.next = NULL;
    dst->retries = src->retries;

    if (src->flags & MCREQ_F_HASVALUE) {
        /** Get the value length and obtain a standalone copy */
        if (src->flags & MCREQ_F_VALUE_IOV) {
            unsigned ii;
            unsigned offset = 0;

            nvdata = src->u_value.multi.total_length;
            vdata = malloc(nvdata);
            for (ii = 0; ii < src->u_value.multi.niov; ii++) {
                const lcb_IOV *iov = src->u_value.multi.iov + ii;

                memcpy(vdata + offset, iov->iov_base, iov->iov_len);
                offset += iov->iov_len;
            }
        } else {
            protocol_binary_request_header hdr;
            const nb_SPAN *origspan = &src->u_value.single;
            mcreq_read_hdr(dst, &hdr);

            if (hdr.request.datatype & PROTOCOL_BINARY_DATATYPE_COMPRESSED) {
                /* Compressed payloads must be inflated first, because the
                 * copy may be forwarded to a server without compression
                 * support.
                 * TODO: it might be smarter to check a settings flag
                 * somewhere and see whether this is actually needed. */

                lcb_SIZE n_inflated;
                const void *inflated;
                int rv;

                vdata = NULL;
                rv = mcreq_inflate_value(SPAN_BUFFER(origspan), origspan->size,
                    &inflated, &n_inflated, (void**)&vdata);

                if (rv != 0) {
                    /* TODO: log error details once snappy is enabled */
                    free(kdata);
                    free(edst);
                    return NULL;
                }
                lcb_assert(vdata == inflated);

                nvdata = n_inflated;
                hdr.request.datatype &= ~PROTOCOL_BINARY_DATATYPE_COMPRESSED;
                hdr.request.bodylen = htonl(
                    ntohs(hdr.request.keylen) +
                    hdr.request.extlen +
                    n_inflated);
                mcreq_write_hdr(dst, &hdr);

            } else {
                nvdata = origspan->size;
                vdata = malloc(nvdata);
                memcpy(vdata, SPAN_BUFFER(origspan), nvdata);
            }
        }

        /* Declare the value as a standalone malloc'd span */
        CREATE_STANDALONE_SPAN(&dst->u_value.single, vdata, nvdata);
    }

    if (src->flags & MCREQ_F_DETACHED) {
        mc_EXPACKET *esrc = (mc_EXPACKET *)src;
        sllist_iterator iter;
        SLLIST_ITERFOR(&esrc->data, &iter) {
            sllist_node *cur = iter.cur;
            sllist_iter_remove(&esrc->data, &iter);
            sllist_append(&edst->data, cur);
        }
    }
    return dst;
}
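/*
 * A hedged usage sketch for mcreq_renew_packet, e.g. when relocating a
 * request to another pipeline (error handling and rescheduling elided;
 * 'pl' and 'orig' are assumptions):
 *
 *   mc_PACKET *copy = mcreq_renew_packet(orig);
 *   if (copy == NULL) {
 *       // allocation or inflation failed
 *   }
 *   mcreq_wipe_packet(pl, orig);
 *   mcreq_release_packet(pl, orig);
 *   // 'copy' may now be enqueued on a different pipeline
 */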

int
mcreq_epkt_insert(mc_EXPACKET *ep, mc_EPKTDATUM *datum)
{
    if (!(ep->base.flags & MCREQ_F_DETACHED)) {
        return -1;
    }
    lcb_assert(!sllist_contains(&ep->data, &datum->slnode));
    sllist_append(&ep->data, &datum->slnode);
    return 0;
}

mc_EPKTDATUM *
mcreq_epkt_find(mc_EXPACKET *ep, const char *key)
{
    sllist_iterator iter;
    SLLIST_ITERFOR(&ep->data, &iter) {
        mc_EPKTDATUM *d = SLLIST_ITEM(iter.cur, mc_EPKTDATUM, slnode);
        if (!strcmp(key, d->key)) {
            return d;
        }
    }
    return NULL;
}

void
mcreq_map_key(mc_CMDQUEUE *queue,
    const lcb_KEYBUF *key, const lcb_KEYBUF *hashkey,
    unsigned nhdr, int *vbid, int *srvix)
{
    const void *hk;
    size_t nhk = 0;
    if (hashkey) {
        if (hashkey->type == LCB_KV_COPY && hashkey->contig.bytes != NULL) {
            hk = hashkey->contig.bytes;
            nhk = hashkey->contig.nbytes;
        } else if (hashkey->type == LCB_KV_VBID) {
            *vbid = hashkey->contig.nbytes;
            *srvix = lcbvb_vbmaster(queue->config, *vbid);
            return;
        }
    }
    if (!nhk) {
        if (key->type == LCB_KV_COPY) {
            hk = key->contig.bytes;
            nhk = key->contig.nbytes;
        } else {
            const char *buf = key->contig.bytes;
            buf += nhdr;
            hk = buf;
            nhk = key->contig.nbytes - nhdr;
        }
    }
    lcbvb_map_key(queue->config, hk, nhk, vbid, srvix);
}
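/*
 * Note: for LCB_KV_VBID there is no key to hash; the vbucket id itself is
 * carried in the contig.nbytes field, and the master server for that
 * vbucket is resolved directly via lcbvb_vbmaster.
 */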

lcb_error_t
mcreq_basic_packet(
        mc_CMDQUEUE *queue, const lcb_CMDBASE *cmd,
        protocol_binary_request_header *req, lcb_uint8_t extlen,
        mc_PACKET **packet, mc_PIPELINE **pipeline, int options)
{
    int vb, srvix;
    lcb_error_t err;

    if (!queue->config) {
        return LCB_CLIENT_ETMPFAIL;
    }
    if (!cmd) {
        return LCB_EINVAL;
    }

    mcreq_map_key(queue, &cmd->key, &cmd->_hashkey,
        sizeof(*req) + extlen, &vb, &srvix);
    if (srvix > -1 && srvix < (int)queue->npipelines) {
        *pipeline = queue->pipelines[srvix];

    } else {
        if ((options & MCREQ_BASICPACKET_F_FALLBACKOK) && queue->fallback) {
            *pipeline = queue->fallback;
        } else {
            return LCB_NO_MATCHING_SERVER;
        }
    }

    *packet = mcreq_allocate_packet(*pipeline);
    if (*packet == NULL) {
        return LCB_CLIENT_ENOMEM;
    }

    err = mcreq_reserve_key(*pipeline, *packet, sizeof(*req) + extlen, &cmd->key);
    if (err != LCB_SUCCESS) {
        mcreq_release_packet(*pipeline, *packet);
        return err;
    }

    req->request.keylen = htons((*packet)->kh_span.size - PKT_HDRSIZE(*packet));
    req->request.vbucket = htons(vb);
    req->request.extlen = extlen;
    return LCB_SUCCESS;
}
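/*
 * Typical construction flow on top of mcreq_basic_packet (a sketch; the
 * opcode choice and the 'cq'/'cmd' variables are assumptions, not part of
 * this file):
 *
 *   protocol_binary_request_header hdr;
 *   mc_PIPELINE *pl;
 *   mc_PACKET *pkt;
 *   lcb_error_t err;
 *
 *   memset(&hdr, 0, sizeof hdr);
 *   hdr.request.magic = PROTOCOL_BINARY_REQ;
 *   hdr.request.opcode = PROTOCOL_BINARY_CMD_GET;
 *   err = mcreq_basic_packet(cq, cmd, &hdr, 0, &pkt, &pl,
 *                            MCREQ_BASICPACKET_F_FALLBACKOK);
 *   if (err != LCB_SUCCESS) {
 *       return err;
 *   }
 *   memcpy(SPAN_BUFFER(&pkt->kh_span), hdr.bytes, sizeof(hdr.bytes));
 *   mcreq_sched_add(pl, pkt);
 */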

void
mcreq_get_key(const mc_PACKET *packet, const void **key, lcb_size_t *nkey)
{
    *key = SPAN_BUFFER(&packet->kh_span) + PKT_HDRSIZE(packet);
    *nkey = packet->kh_span.size - PKT_HDRSIZE(packet);
}
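/*
 * The accessors below read fields straight out of the packed 24-byte
 * memcached binary header at the start of kh_span: the vbucket id sits at
 * byte offset 6 and the total body length at byte offset 8. The memcpy
 * fallback handles spans whose base address is not suitably aligned for a
 * direct load.
 */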

lcb_uint32_t
mcreq_get_bodysize(const mc_PACKET *packet)
{
    lcb_uint32_t ret;
    char *retptr = SPAN_BUFFER(&packet->kh_span) + 8;
    if ((uintptr_t)retptr % sizeof(ret) == 0) {
        return ntohl(*(lcb_uint32_t*) (void *)retptr);
    } else {
        memcpy(&ret, retptr, sizeof(ret));
        return ntohl(ret);
    }
}

uint16_t
mcreq_get_vbucket(const mc_PACKET *packet)
{
    uint16_t ret;
    char *retptr = SPAN_BUFFER(&packet->kh_span) + 6;
    if ((uintptr_t)retptr % sizeof(ret) == 0) {
        return ntohs(*(uint16_t*)(void*)retptr);
    } else {
        memcpy(&ret, retptr, sizeof ret);
        return ntohs(ret);
    }
}

uint32_t
mcreq_get_size(const mc_PACKET *packet)
{
    uint32_t sz = packet->kh_span.size;
    if (packet->flags & MCREQ_F_HASVALUE) {
        if (packet->flags & MCREQ_F_VALUE_IOV) {
            sz += packet->u_value.multi.total_length;
        } else {
            sz += packet->u_value.single.size;
        }
    }
    return sz;
}

void
mcreq_pipeline_cleanup(mc_PIPELINE *pipeline)
{
    netbuf_cleanup(&pipeline->nbmgr);
    netbuf_cleanup(&pipeline->reqpool);
}

int
mcreq_pipeline_init(mc_PIPELINE *pipeline)
{
    nb_SETTINGS settings;

    /* Initialize the list and context members to a known-empty state */
    memset(&pipeline->requests, 0, sizeof pipeline->requests);
    pipeline->parent = NULL;
    pipeline->flush_start = NULL;
    pipeline->index = 0;
    memset(&pipeline->ctxqueued, 0, sizeof pipeline->ctxqueued);
    pipeline->buf_done_callback = NULL;

    netbuf_default_settings(&settings);

    /** Initialize the data pool */
    netbuf_init(&pipeline->nbmgr, &settings);

    /** Initialize the request pool */
    settings.data_basealloc = sizeof(mc_PACKET) * 32;
    netbuf_init(&pipeline->reqpool, &settings);
    pipeline->metrics = NULL;
    return 0;
}

void
mcreq_queue_add_pipelines(
        mc_CMDQUEUE *queue, mc_PIPELINE * const *pipelines, unsigned npipelines,
        lcbvb_CONFIG* config)
{
    unsigned ii;

    lcb_assert(queue->pipelines == NULL);
    queue->npipelines = npipelines;
    queue->_npipelines_ex = queue->npipelines;
    queue->pipelines = malloc(sizeof(*pipelines) * (npipelines + 1));
    queue->config = config;

    memcpy(queue->pipelines, pipelines, sizeof(*pipelines) * npipelines);

    free(queue->scheds);
    queue->scheds = calloc(npipelines+1, 1);

    for (ii = 0; ii < npipelines; ii++) {
        pipelines[ii]->parent = queue;
        pipelines[ii]->index = ii;
    }

    if (queue->fallback) {
        queue->fallback->index = npipelines;
        queue->pipelines[queue->npipelines] = queue->fallback;
        queue->_npipelines_ex++;
    }
}

mc_PIPELINE **
mcreq_queue_take_pipelines(mc_CMDQUEUE *queue, unsigned *count)
{
    mc_PIPELINE **ret = queue->pipelines;
    *count = queue->npipelines;
    queue->pipelines = NULL;
    queue->npipelines = 0;
    return ret;
}

int
mcreq_queue_init(mc_CMDQUEUE *queue)
{
    queue->seq = 0;
    queue->pipelines = NULL;
    queue->scheds = NULL;
    queue->fallback = NULL;
    queue->npipelines = 0;
    return 0;
}

void
mcreq_queue_cleanup(mc_CMDQUEUE *queue)
{
    if (queue->fallback) {
        mcreq_pipeline_cleanup(queue->fallback);
        free(queue->fallback);
        queue->fallback = NULL;
    }
    free(queue->scheds);
    free(queue->pipelines);
    queue->pipelines = NULL;
    queue->npipelines = 0;
    queue->scheds = NULL;
}

void
mcreq_sched_enter(mc_CMDQUEUE *queue)
{
    queue->ctxenter = 1;
}

static void
queuectx_leave(mc_CMDQUEUE *queue, int success, int flush)
{
    unsigned ii;

    if (queue->ctxenter) {
        queue->ctxenter = 0;
    }

    for (ii = 0; ii < queue->_npipelines_ex; ii++) {
        mc_PIPELINE *pipeline;
        sllist_node *ll_next, *ll;

        if (!queue->scheds[ii]) {
            continue;
        }

        pipeline = queue->pipelines[ii];
        ll = SLLIST_FIRST(&pipeline->ctxqueued);

        while (ll) {
            mc_PACKET *pkt = SLLIST_ITEM(ll, mc_PACKET, slnode);
            ll_next = ll->next;

            if (success) {
                mcreq_enqueue_packet(pipeline, pkt);
            } else {
                if (pkt->flags & MCREQ_F_REQEXT) {
                    mc_REQDATAEX *rd = pkt->u_rdata.exdata;
                    if (rd->procs->fail_dtor) {
                        rd->procs->fail_dtor(pkt);
                    }
                }
                mcreq_wipe_packet(pipeline, pkt);
                mcreq_release_packet(pipeline, pkt);
            }

            ll = ll_next;
        }
        SLLIST_FIRST(&pipeline->ctxqueued) = pipeline->ctxqueued.last = NULL;
        if (flush) {
            pipeline->flush_start(pipeline);
        }
        queue->scheds[ii] = 0;
    }
}

void
mcreq_sched_leave(mc_CMDQUEUE *queue, int do_flush)
{
    queuectx_leave(queue, 1, do_flush);
}

void
mcreq_sched_fail(mc_CMDQUEUE *queue)
{
    queuectx_leave(queue, 0, 0);
}

void
mcreq_sched_add(mc_PIPELINE *pipeline, mc_PACKET *pkt)
{
    mc_CMDQUEUE *cq = pipeline->parent;
    if (!cq->scheds[pipeline->index]) {
        cq->scheds[pipeline->index] = 1;
    }
    sllist_append(&pipeline->ctxqueued, &pkt->slnode);
}
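/*
 * The scheduling context makes packet batches transactional: everything
 * added since mcreq_sched_enter is either committed or rolled back as a
 * unit. A sketch of the usual sequence:
 *
 *   mcreq_sched_enter(cq);
 *   // ... build packets; mcreq_sched_add(pl, pkt) for each ...
 *   if (all_commands_ok) {
 *       mcreq_sched_leave(cq, 1);   // commit, then kick off a flush
 *   } else {
 *       mcreq_sched_fail(cq);       // destroy everything queued so far
 *   }
 */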

static mc_PACKET *
pipeline_find(mc_PIPELINE *pipeline, lcb_uint32_t opaque, int do_remove)
{
    sllist_iterator iter;
    SLLIST_ITERFOR(&pipeline->requests, &iter) {
        mc_PACKET *pkt = SLLIST_ITEM(iter.cur, mc_PACKET, slnode);
        if (pkt->opaque == opaque) {
            if (do_remove) {
                sllist_iter_remove(&pipeline->requests, &iter);
            }
            return pkt;
        }
    }
    return NULL;
}

mc_PACKET *
mcreq_pipeline_find(mc_PIPELINE *pipeline, lcb_uint32_t opaque)
{
    return pipeline_find(pipeline, opaque, 0);
}

mc_PACKET *
mcreq_pipeline_remove(mc_PIPELINE *pipeline, lcb_uint32_t opaque)
{
    return pipeline_find(pipeline, opaque, 1);
}

void
mcreq_packet_done(mc_PIPELINE *pipeline, mc_PACKET *pkt)
{
    lcb_assert(pkt->flags & MCREQ_F_FLUSHED);
    lcb_assert(pkt->flags & MCREQ_F_INVOKED);
    if (pkt->flags & MCREQ_UBUF_FLAGS) {
        void *kbuf, *vbuf;
        const void *cookie;

        cookie = MCREQ_PKT_COOKIE(pkt);
        if (pkt->flags & MCREQ_F_KEY_NOCOPY) {
            kbuf = SPAN_BUFFER(&pkt->kh_span);
        } else {
            kbuf = NULL;
        }
        if (pkt->flags & MCREQ_F_VALUE_NOCOPY) {
            if (pkt->flags & MCREQ_F_VALUE_IOV) {
                vbuf = pkt->u_value.multi.iov->iov_base;
            } else {
                vbuf = SPAN_SABUFFER_NC(&pkt->u_value.single);
            }
        } else {
            vbuf = NULL;
        }

        pipeline->buf_done_callback(pipeline, cookie, kbuf, vbuf);
    }
    mcreq_wipe_packet(pipeline, pkt);
    mcreq_release_packet(pipeline, pkt);
}

void
mcreq_reset_timeouts(mc_PIPELINE *pl, lcb_U64 nstime)
{
    sllist_node *nn;
    SLLIST_ITERBASIC(&pl->requests, nn) {
        mc_PACKET *pkt = SLLIST_ITEM(nn, mc_PACKET, slnode);
        MCREQ_PKT_RDATA(pkt)->start = nstime;
    }
}

unsigned
mcreq_pipeline_timeout(
        mc_PIPELINE *pl, lcb_error_t err, mcreq_pktfail_fn failcb, void *cbarg,
        hrtime_t oldest_valid, hrtime_t *oldest_start)
{
    sllist_iterator iter;
    unsigned count = 0;

    SLLIST_ITERFOR(&pl->requests, &iter) {
        mc_PACKET *pkt = SLLIST_ITEM(iter.cur, mc_PACKET, slnode);
        mc_REQDATA *rd = MCREQ_PKT_RDATA(pkt);

        /**
         * oldest_valid contains the LOWEST timestamp we can admit to being
         * acceptable. If the current command is newer (i.e. has a higher
         * timestamp) then we break the iteration and return.
         */
        if (oldest_valid && rd->start > oldest_valid) {
            if (oldest_start) {
                *oldest_start = rd->start;
            }
            return count;
        }

        sllist_iter_remove(&pl->requests, &iter);
        failcb(pl, pkt, err, cbarg);
        mcreq_packet_handled(pl, pkt);
        count++;
    }
    return count;
}

unsigned
mcreq_pipeline_fail(
        mc_PIPELINE *pl, lcb_error_t err, mcreq_pktfail_fn failcb, void *arg)
{
    return mcreq_pipeline_timeout(pl, err, failcb, arg, 0, NULL);
}

void
mcreq_iterwipe(mc_CMDQUEUE *queue, mc_PIPELINE *src,
               mcreq_iterwipe_fn callback, void *arg)
{
    sllist_iterator iter;

    SLLIST_ITERFOR(&src->requests, &iter) {
        int rv;
        mc_PACKET *orig = SLLIST_ITEM(iter.cur, mc_PACKET, slnode);
        rv = callback(queue, src, orig, arg);
        if (rv == MCREQ_REMOVE_PACKET) {
            sllist_iter_remove(&src->requests, &iter);
        }
    }
}

#include "mcreq-flush-inl.h"
typedef struct {
    mc_PIPELINE base;
    mcreq_fallback_cb handler;
} mc_FALLBACKPL;

static void
do_fallback_flush(mc_PIPELINE *pipeline)
{
    nb_IOV iov;
    unsigned nb;
    int nused;
    sllist_iterator iter;
    mc_FALLBACKPL *fpl = (mc_FALLBACKPL*)pipeline;

    /* Drain the flush queue; there is no real socket behind this pipeline */
    while ((nb = mcreq_flush_iov_fill(pipeline, &iov, 1, &nused))) {
        mcreq_flush_done(pipeline, nb, nb);
    }
    /* Now handle all the packets, for real */
    SLLIST_ITERFOR(&pipeline->requests, &iter) {
        mc_PACKET *pkt = SLLIST_ITEM(iter.cur, mc_PACKET, slnode);
        fpl->handler(pipeline->parent, pkt);
        sllist_iter_remove(&pipeline->requests, &iter);
        mcreq_packet_handled(pipeline, pkt);
    }
}

void
mcreq_set_fallback_handler(mc_CMDQUEUE *cq, mcreq_fallback_cb handler)
{
    mc_FALLBACKPL *fallback;
    lcb_assert(!cq->fallback);
    fallback = calloc(1, sizeof (mc_FALLBACKPL));
    cq->fallback = (mc_PIPELINE *)fallback;
    mcreq_pipeline_init(cq->fallback);
    cq->fallback->parent = cq;
    cq->fallback->index = cq->npipelines;
    ((mc_FALLBACKPL*)cq->fallback)->handler = handler;
    cq->fallback->flush_start = do_fallback_flush;
}

static void
noop_dumpfn(const void *d, unsigned n, FILE *fp) { (void)d;(void)n;(void)fp; }

#define MCREQ_XFLAGS(X) \
    X(KEY_NOCOPY) \
    X(VALUE_NOCOPY) \
    X(VALUE_IOV) \
    X(HASVALUE) \
    X(REQEXT) \
    X(UFWD) \
    X(FLUSHED) \
    X(INVOKED) \
    X(DETACHED)

void
mcreq_dump_packet(const mc_PACKET *packet, FILE *fp, mcreq_payload_dump_fn dumpfn)
{
    const char *indent = "  ";
    const mc_REQDATA *rdata = MCREQ_PKT_RDATA(packet);

    if (!dumpfn) {
        dumpfn = noop_dumpfn;
    }
    if (!fp) {
        fp = stderr;
    }

    fprintf(fp, "Packet @%p\n", (void *)packet);
    fprintf(fp, "%sOPAQUE: %u\n", indent, (unsigned int)packet->opaque);

    fprintf(fp, "%sPKTFLAGS: 0x%x ", indent, packet->flags);
    #define X(base) \
    if (packet->flags & MCREQ_F_##base) { fprintf(fp, "%s, ", #base); }
    MCREQ_XFLAGS(X)
    #undef X
    fprintf(fp, "\n");

    fprintf(fp, "%sKey+Header Size: %u\n", indent, (unsigned int)packet->kh_span.size);
    fprintf(fp, "%sKey Offset: %u\n", indent, MCREQ_PKT_BASESIZE + packet->extlen);

    if (packet->flags & MCREQ_F_HASVALUE) {
        if (packet->flags & MCREQ_F_VALUE_IOV) {
            fprintf(fp, "%sValue Length: %u\n", indent,
                   packet->u_value.multi.total_length);

            fprintf(fp, "%sValue IOV: [start=%p, n=%d]\n", indent,
                (void *)packet->u_value.multi.iov, packet->u_value.multi.niov);
        } else {
            if (packet->flags & MCREQ_F_VALUE_NOCOPY) {
                fprintf(fp, "%sValue is user allocated\n", indent);
            }
            fprintf(fp, "%sValue: %p, %u bytes\n", indent,
                (void *)SPAN_BUFFER(&packet->u_value.single), packet->u_value.single.size);
        }
    }

    fprintf(fp, "%sRDATA(%s): %p\n", indent,
        (packet->flags & MCREQ_F_REQEXT) ? "ALLOC" : "EMBEDDED", (void *)rdata);

    indent = "    ";
    fprintf(fp, "%sStart: %lu\n", indent, (unsigned long)rdata->start);
    fprintf(fp, "%sCookie: %p\n", indent, rdata->cookie);

    indent = "  ";
    fprintf(fp, "%sNEXT: %p\n", indent, (void *)packet->slnode.next);
    if (dumpfn != noop_dumpfn) {
        fprintf(fp, "PACKET CONTENTS:\n");
    }

    fwrite(SPAN_BUFFER(&packet->kh_span), 1, packet->kh_span.size, fp);
    if (packet->flags & MCREQ_F_HASVALUE) {
        if (packet->flags & MCREQ_F_VALUE_IOV) {
            const lcb_IOV *iovs = packet->u_value.multi.iov;
            unsigned ii, ixmax = packet->u_value.multi.niov;
            for (ii = 0; ii < ixmax; ii++) {
                dumpfn(iovs[ii].iov_base, iovs[ii].iov_len, fp);
            }
        } else {
            const nb_SPAN *vspan = &packet->u_value.single;
            dumpfn(SPAN_BUFFER(vspan), vspan->size, fp);
        }
    }
}

void
mcreq_dump_chain(const mc_PIPELINE *pipeline, FILE *fp, mcreq_payload_dump_fn dumpfn)
{
    sllist_node *ll;
    SLLIST_FOREACH(&pipeline->requests, ll) {
        const mc_PACKET *pkt = SLLIST_ITEM(ll, mc_PACKET, slnode);
        mcreq_dump_packet(pkt, fp, dumpfn);
    }
}