1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3 * Copyright 2014 Couchbase, Inc.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 #include "mcreq.h"
19 #include "compress.h"
20 #include "sllist-inl.h"
21 #include "internal.h"
22
23 #define PKT_HDRSIZE(pkt) (MCREQ_PKT_BASESIZE + (pkt)->extlen)
24
25 lcb_error_t
mcreq_reserve_header(mc_PIPELINE * pipeline,mc_PACKET * packet,uint8_t hdrsize)26 mcreq_reserve_header(
27 mc_PIPELINE *pipeline, mc_PACKET *packet, uint8_t hdrsize)
28 {
29 int rv;
30 packet->extlen = hdrsize - MCREQ_PKT_BASESIZE;
31 packet->kh_span.size = hdrsize;
32 rv = netbuf_mblock_reserve(&pipeline->nbmgr, &packet->kh_span);
33 if (rv != 0) {
34 return LCB_CLIENT_ENOMEM;
35 }
36 return LCB_SUCCESS;
37 }
38
39 lcb_error_t
mcreq_reserve_key(mc_PIPELINE * pipeline,mc_PACKET * packet,uint8_t hdrsize,const lcb_KEYBUF * kreq)40 mcreq_reserve_key(
41 mc_PIPELINE *pipeline, mc_PACKET *packet, uint8_t hdrsize,
42 const lcb_KEYBUF *kreq)
43 {
44 const struct lcb_CONTIGBUF *contig = &kreq->contig;
45 int rv;
46
47 /** Set the key offset which is the start of the key from the buffer */
48 packet->extlen = hdrsize - MCREQ_PKT_BASESIZE;
49 packet->kh_span.size = kreq->contig.nbytes;
50
51 if (kreq->type == LCB_KV_COPY) {
52 /**
53 * If the key is to be copied then just allocate the span size
54 * for the key+24+extras
55 */
56 packet->kh_span.size += hdrsize;
57 rv = netbuf_mblock_reserve(&pipeline->nbmgr, &packet->kh_span);
58 if (rv != 0) {
59 return LCB_CLIENT_ENOMEM;
60 }
61
62 /**
63 * Copy the key into the packet starting at the extras end
64 */
65 memcpy(SPAN_BUFFER(&packet->kh_span) + hdrsize,
66 contig->bytes,
67 contig->nbytes);
68
69 } else if (kreq->type == LCB_KV_CONTIG) {
70 /**
71 * Don't do any copying.
72 * Assume the key buffer has enough space for the packet as well.
73 */
74 CREATE_STANDALONE_SPAN(&packet->kh_span, contig->bytes, contig->nbytes);
75 packet->flags |= MCREQ_F_KEY_NOCOPY;
76
77 } else {
78 /** IOVs not supported for keys */
79 return LCB_EINVAL;
80 }
81
82 return LCB_SUCCESS;
83 }
84
85 lcb_error_t
mcreq_reserve_value2(mc_PIPELINE * pl,mc_PACKET * pkt,lcb_size_t n)86 mcreq_reserve_value2(mc_PIPELINE *pl, mc_PACKET *pkt, lcb_size_t n)
87 {
88 int rv;
89 pkt->u_value.single.size = n;
90 if (!n) {
91 return LCB_SUCCESS;
92 }
93
94 pkt->flags |= MCREQ_F_HASVALUE;
95 rv = netbuf_mblock_reserve(&pl->nbmgr, &pkt->u_value.single);
96 if (rv) {
97 return LCB_CLIENT_ENOMEM;
98 }
99 return LCB_SUCCESS;
100 }
101
102 lcb_error_t
mcreq_reserve_value(mc_PIPELINE * pipeline,mc_PACKET * packet,const lcb_VALBUF * vreq)103 mcreq_reserve_value(
104 mc_PIPELINE *pipeline, mc_PACKET *packet, const lcb_VALBUF *vreq)
105 {
106 const lcb_CONTIGBUF *contig = &vreq->u_buf.contig;
107 nb_SPAN *vspan = &packet->u_value.single;
108 int rv;
109
110 if (vreq->vtype == LCB_KV_COPY) {
111 /** Copy the value into a single SPAN */
112 if (! (vspan->size = vreq->u_buf.contig.nbytes)) {
113 return LCB_SUCCESS;
114 }
115 rv = netbuf_mblock_reserve(&pipeline->nbmgr, vspan);
116
117 if (rv != 0) {
118 return LCB_CLIENT_ENOMEM;
119 }
120
121 memcpy(SPAN_BUFFER(vspan), contig->bytes, contig->nbytes);
122
123 } else if (vreq->vtype == LCB_KV_CONTIG) {
124 /** It's still contiguous so make it a 'standalone' span */
125 CREATE_STANDALONE_SPAN(vspan, contig->bytes, contig->nbytes);
126 packet->flags |= MCREQ_F_VALUE_NOCOPY;
127
128 } else if (vreq->vtype == LCB_KV_IOV) {
129 /** Multiple spans, no copy */
130 unsigned int ii;
131 const lcb_FRAGBUF *msrc = &vreq->u_buf.multi;
132 lcb_FRAGBUF *mdst = &packet->u_value.multi;
133
134 packet->flags |= MCREQ_F_VALUE_IOV | MCREQ_F_VALUE_NOCOPY;
135 mdst->niov = msrc->niov;
136 mdst->iov = malloc(mdst->niov * sizeof(*mdst->iov));
137 mdst->total_length = 0;
138
139 for (ii = 0; ii < mdst->niov; ii++) {
140 mdst->iov[ii] = msrc->iov[ii];
141 mdst->total_length += mdst->iov[ii].iov_len;
142 }
143 } else if (vreq->vtype == LCB_KV_IOVCOPY) {
144 /** Multiple input buffers, normal copying output buffer */
145 unsigned int ii, cur_offset;
146 const lcb_FRAGBUF *msrc = &vreq->u_buf.multi;
147
148 if (msrc->total_length) {
149 vspan->size = msrc->total_length;
150 } else {
151 vspan->size = 0;
152 for (ii = 0; ii < msrc->niov; ii++) {
153 vspan->size += msrc->iov[ii].iov_len;
154 }
155 }
156
157 rv = netbuf_mblock_reserve(&pipeline->nbmgr, vspan);
158 if (rv != 0) {
159 return LCB_CLIENT_ENOMEM;
160 }
161
162 for (ii = 0, cur_offset = 0; ii < msrc->niov; ii++) {
163 char *buf = SPAN_BUFFER(vspan) + cur_offset;
164 memcpy(buf, msrc->iov[ii].iov_base, msrc->iov[ii].iov_len);
165 cur_offset += msrc->iov[ii].iov_len;
166 }
167 }
168
169 packet->flags |= MCREQ_F_HASVALUE;
170 return LCB_SUCCESS;
171 }
172
173 static int
pkt_tmo_compar(sllist_node * a,sllist_node * b)174 pkt_tmo_compar(sllist_node *a, sllist_node *b)
175 {
176 mc_PACKET *pa, *pb;
177 hrtime_t tmo_a, tmo_b;
178
179 pa = SLLIST_ITEM(a, mc_PACKET, slnode);
180 pb = SLLIST_ITEM(b, mc_PACKET, slnode);
181
182 tmo_a = MCREQ_PKT_RDATA(pa)->start;
183 tmo_b = MCREQ_PKT_RDATA(pb)->start;
184
185 if (tmo_a == tmo_b) {
186 return 0;
187 } else if (tmo_a < tmo_b) {
188 return -1;
189 } else {
190 return 1;
191 }
192 }
193
/* Re-add a previously dequeued packet (e.g. after a retry) to the pipeline.
 * The packet is enqueued normally first so all netbuf/PDU bookkeeping is
 * redone, then its list node is repositioned so the request list stays
 * sorted by the packet's original start time — important so timeout
 * scanning (mcreq_pipeline_timeout) can stop at the first young packet. */
void
mcreq_reenqueue_packet(mc_PIPELINE *pipeline, mc_PACKET *packet)
{
    sllist_root *reqs = &pipeline->requests;
    mcreq_enqueue_packet(pipeline, packet);
    sllist_remove(reqs, &packet->slnode);
    sllist_insert_sorted(reqs, &packet->slnode, pkt_tmo_compar);
}
202
203 void
mcreq_enqueue_packet(mc_PIPELINE * pipeline,mc_PACKET * packet)204 mcreq_enqueue_packet(mc_PIPELINE *pipeline, mc_PACKET *packet)
205 {
206 nb_SPAN *vspan = &packet->u_value.single;
207 sllist_append(&pipeline->requests, &packet->slnode);
208 netbuf_enqueue_span(&pipeline->nbmgr, &packet->kh_span, packet);
209 MC_INCR_METRIC(pipeline, bytes_queued, packet->kh_span.size);
210
211 if (!(packet->flags & MCREQ_F_HASVALUE)) {
212 goto GT_ENQUEUE_PDU;
213 }
214
215 if (packet->flags & MCREQ_F_VALUE_IOV) {
216 unsigned int ii;
217 lcb_FRAGBUF *multi = &packet->u_value.multi;
218 for (ii = 0; ii < multi->niov; ii++) {
219 netbuf_enqueue(&pipeline->nbmgr, (nb_IOV *)multi->iov + ii, packet);
220 MC_INCR_METRIC(pipeline, bytes_queued, multi->iov[ii].iov_len);
221 }
222
223 } else if (vspan->size) {
224 MC_INCR_METRIC(pipeline, bytes_queued, vspan->size);
225 netbuf_enqueue_span(&pipeline->nbmgr, vspan, packet);
226 }
227
228 GT_ENQUEUE_PDU:
229 netbuf_pdu_enqueue(&pipeline->nbmgr, packet, offsetof(mc_PACKET, sl_flushq));
230 MC_INCR_METRIC(pipeline, packets_queued, 1);
231 }
232
233 void
mcreq_wipe_packet(mc_PIPELINE * pipeline,mc_PACKET * packet)234 mcreq_wipe_packet(mc_PIPELINE *pipeline, mc_PACKET *packet)
235 {
236 if (! (packet->flags & MCREQ_F_KEY_NOCOPY)) {
237 if (packet->flags & MCREQ_F_DETACHED) {
238 free(SPAN_BUFFER(&packet->kh_span));
239 } else {
240 netbuf_mblock_release(&pipeline->nbmgr, &packet->kh_span);
241 }
242 }
243
244 if (! (packet->flags & MCREQ_F_HASVALUE)) {
245 return;
246 }
247
248 if (packet->flags & MCREQ_F_VALUE_NOCOPY) {
249 if (packet->flags & MCREQ_F_VALUE_IOV) {
250 free(packet->u_value.multi.iov);
251 }
252
253 return;
254 }
255
256 if (packet->flags & MCREQ_F_DETACHED) {
257 free(SPAN_BUFFER(&packet->u_value.single));
258 } else {
259 netbuf_mblock_release(&pipeline->nbmgr, &packet->u_value.single);
260 }
261
262 }
263
264 mc_PACKET *
mcreq_allocate_packet(mc_PIPELINE * pipeline)265 mcreq_allocate_packet(mc_PIPELINE *pipeline)
266 {
267 nb_SPAN span;
268 int rv;
269 mc_PACKET *ret;
270 span.size = sizeof(*ret);
271
272 rv = netbuf_mblock_reserve(&pipeline->reqpool, &span);
273 if (rv != 0) {
274 return NULL;
275 }
276
277 ret = (void *) SPAN_MBUFFER_NC(&span);
278 ret->alloc_parent = span.parent;
279 ret->flags = 0;
280 ret->retries = 0;
281 ret->opaque = pipeline->parent->seq++;
282 #ifdef LCB_TRACING
283 ret->u_rdata.reqdata.span = NULL;
284 #endif
285 return ret;
286 }
287
288 void
mcreq_release_packet(mc_PIPELINE * pipeline,mc_PACKET * packet)289 mcreq_release_packet(mc_PIPELINE *pipeline, mc_PACKET *packet)
290 {
291 nb_SPAN span;
292 if (packet->flags & MCREQ_F_DETACHED) {
293 sllist_iterator iter;
294 mc_EXPACKET *epkt = (mc_EXPACKET *)packet;
295
296 SLLIST_ITERFOR(&epkt->data, &iter) {
297 mc_EPKTDATUM *d = SLLIST_ITEM(iter.cur, mc_EPKTDATUM, slnode);
298 sllist_iter_remove(&epkt->data, &iter);
299 d->dtorfn(d);
300 }
301 free(epkt);
302 return;
303 }
304
305 span.size = sizeof(*packet);
306 span.parent = packet->alloc_parent;
307 span.offset = (char *)packet - packet->alloc_parent->root;
308
309 netbuf_mblock_release(&pipeline->reqpool, &span);
310 }
311
312 #define MCREQ_DETACH_WIPESRC 1
313
314 mc_PACKET *
mcreq_renew_packet(const mc_PACKET * src)315 mcreq_renew_packet(const mc_PACKET *src)
316 {
317 char *kdata, *vdata;
318 unsigned nvdata;
319 mc_PACKET *dst;
320 mc_EXPACKET *edst = calloc(1, sizeof(*edst));
321
322 dst = &edst->base;
323 *dst = *src;
324
325 kdata = malloc(src->kh_span.size);
326 memcpy(kdata, SPAN_BUFFER(&src->kh_span), src->kh_span.size);
327 CREATE_STANDALONE_SPAN(&dst->kh_span, kdata, src->kh_span.size);
328
329 dst->flags &= ~(MCREQ_F_KEY_NOCOPY|MCREQ_F_VALUE_NOCOPY|MCREQ_F_VALUE_IOV);
330 dst->flags |= MCREQ_F_DETACHED;
331 dst->alloc_parent = NULL;
332 dst->sl_flushq.next = NULL;
333 dst->slnode.next = NULL;
334 dst->retries = src->retries;
335
336 if (src->flags & MCREQ_F_HASVALUE) {
337 /** Get the length */
338 if (src->flags & MCREQ_F_VALUE_IOV) {
339 unsigned ii;
340 unsigned offset = 0;
341
342 nvdata = src->u_value.multi.total_length;
343 vdata = malloc(nvdata);
344 for (ii = 0; ii < src->u_value.multi.niov; ii++) {
345 const lcb_IOV *iov = src->u_value.multi.iov + ii;
346
347 memcpy(vdata + offset, iov->iov_base, iov->iov_len);
348 offset += iov->iov_len;
349 }
350 } else {
351 protocol_binary_request_header hdr;
352 const nb_SPAN *origspan = &src->u_value.single;
353 mcreq_read_hdr(dst, &hdr);
354
355 if (hdr.request.datatype & PROTOCOL_BINARY_DATATYPE_COMPRESSED) {
356 /* For compressed payloads we need to uncompress it first
357 * because it may be forwarded to a server without compression.
358 * TODO: might be more clever to check a setting flag somewhere
359 * and see if we should do this. */
360
361 lcb_SIZE n_inflated;
362 const void *inflated;
363 int rv;
364
365 vdata = NULL;
366 rv = mcreq_inflate_value(SPAN_BUFFER(origspan), origspan->size,
367 &inflated, &n_inflated, (void**)&vdata);
368
369 lcb_assert(vdata == inflated);
370
371 if (rv != 0) {
372 /* TODO: log error details when snappy will be enabled */
373 free(edst);
374 return NULL;
375 }
376 nvdata = n_inflated;
377 hdr.request.datatype &= ~PROTOCOL_BINARY_DATATYPE_COMPRESSED;
378 hdr.request.bodylen = htonl(
379 ntohs(hdr.request.keylen) +
380 hdr.request.extlen +
381 n_inflated);
382 mcreq_write_hdr(dst, &hdr);
383
384 } else {
385 nvdata = origspan->size;
386 vdata = malloc(nvdata);
387 memcpy(vdata, SPAN_BUFFER(origspan), nvdata);
388 }
389 }
390
391 /* Declare the value as a standalone malloc'd span */
392 CREATE_STANDALONE_SPAN(&dst->u_value.single, vdata, nvdata);
393 }
394
395 if (src->flags & MCREQ_F_DETACHED) {
396 mc_EXPACKET *esrc = (mc_EXPACKET *)src;
397 sllist_iterator iter;
398 SLLIST_ITERFOR(&esrc->data, &iter) {
399 sllist_node *cur = iter.cur;
400 sllist_iter_remove(&esrc->data, &iter);
401 sllist_append(&edst->data, cur);
402 }
403 }
404 return dst;
405 }
406
407 int
mcreq_epkt_insert(mc_EXPACKET * ep,mc_EPKTDATUM * datum)408 mcreq_epkt_insert(mc_EXPACKET *ep, mc_EPKTDATUM *datum)
409 {
410 if (!(ep->base.flags & MCREQ_F_DETACHED)) {
411 return -1;
412 }
413 lcb_assert(!sllist_contains(&ep->data, &datum->slnode));
414 sllist_append(&ep->data, &datum->slnode);
415 return 0;
416 }
417
418 mc_EPKTDATUM *
mcreq_epkt_find(mc_EXPACKET * ep,const char * key)419 mcreq_epkt_find(mc_EXPACKET *ep, const char *key)
420 {
421 sllist_iterator iter;
422 SLLIST_ITERFOR(&ep->data, &iter) {
423 mc_EPKTDATUM *d = SLLIST_ITEM(iter.cur, mc_EPKTDATUM, slnode);
424 if (!strcmp(key, d->key)) {
425 return d;
426 }
427 }
428 return NULL;
429 }
430
431 void
mcreq_map_key(mc_CMDQUEUE * queue,const lcb_KEYBUF * key,const lcb_KEYBUF * hashkey,unsigned nhdr,int * vbid,int * srvix)432 mcreq_map_key(mc_CMDQUEUE *queue,
433 const lcb_KEYBUF *key, const lcb_KEYBUF *hashkey,
434 unsigned nhdr, int *vbid, int *srvix)
435 {
436 const void *hk;
437 size_t nhk = 0;
438 if (hashkey) {
439 if (hashkey->type == LCB_KV_COPY && hashkey->contig.bytes != NULL) {
440 hk = hashkey->contig.bytes;
441 nhk = hashkey->contig.nbytes;
442 } else if (hashkey->type == LCB_KV_VBID) {
443 *vbid = hashkey->contig.nbytes;
444 *srvix = lcbvb_vbmaster(queue->config, *vbid);
445 return;
446 }
447 }
448 if (!nhk) {
449 if (key->type == LCB_KV_COPY) {
450 hk = key->contig.bytes;
451 nhk = key->contig.nbytes;
452 } else {
453 const char *buf = key->contig.bytes;
454 buf += nhdr;
455 hk = buf;
456 nhk = key->contig.nbytes - nhdr;
457 }
458 }
459 lcbvb_map_key(queue->config, hk, nhk, vbid, srvix);
460 }
461
462 lcb_error_t
mcreq_basic_packet(mc_CMDQUEUE * queue,const lcb_CMDBASE * cmd,protocol_binary_request_header * req,lcb_uint8_t extlen,mc_PACKET ** packet,mc_PIPELINE ** pipeline,int options)463 mcreq_basic_packet(
464 mc_CMDQUEUE *queue, const lcb_CMDBASE *cmd,
465 protocol_binary_request_header *req, lcb_uint8_t extlen,
466 mc_PACKET **packet, mc_PIPELINE **pipeline, int options)
467 {
468 int vb, srvix;
469
470 if (!queue->config) {
471 return LCB_CLIENT_ETMPFAIL;
472 }
473 if (!cmd) {
474 return LCB_EINVAL;
475 }
476
477 mcreq_map_key(queue, &cmd->key, &cmd->_hashkey,
478 sizeof(*req) + extlen, &vb, &srvix);
479 if (srvix > -1 && srvix < (int)queue->npipelines) {
480 *pipeline = queue->pipelines[srvix];
481
482 } else {
483 if ((options & MCREQ_BASICPACKET_F_FALLBACKOK) && queue->fallback) {
484 *pipeline = queue->fallback;
485 } else {
486 return LCB_NO_MATCHING_SERVER;
487 }
488 }
489
490 *packet = mcreq_allocate_packet(*pipeline);
491 if (*packet == NULL) {
492 return LCB_CLIENT_ENOMEM;
493 }
494
495 mcreq_reserve_key(*pipeline, *packet, sizeof(*req) + extlen, &cmd->key);
496
497 req->request.keylen = htons((*packet)->kh_span.size - PKT_HDRSIZE(*packet));
498 req->request.vbucket = htons(vb);
499 req->request.extlen = extlen;
500 return LCB_SUCCESS;
501 }
502
503 void
mcreq_get_key(const mc_PACKET * packet,const void ** key,lcb_size_t * nkey)504 mcreq_get_key(const mc_PACKET *packet, const void **key, lcb_size_t *nkey)
505 {
506 *key = SPAN_BUFFER(&packet->kh_span) + PKT_HDRSIZE(packet);
507 *nkey = packet->kh_span.size - PKT_HDRSIZE(packet);
508 }
509
510 lcb_uint32_t
mcreq_get_bodysize(const mc_PACKET * packet)511 mcreq_get_bodysize(const mc_PACKET *packet)
512 {
513 lcb_uint32_t ret;
514 char *retptr = SPAN_BUFFER(&packet->kh_span) + 8;
515 if ((uintptr_t)retptr % sizeof(ret) == 0) {
516 return ntohl(*(lcb_uint32_t*) (void *)retptr);
517 } else {
518 memcpy(&ret, retptr, sizeof(ret));
519 return ntohl(ret);
520 }
521 }
522
523 uint16_t
mcreq_get_vbucket(const mc_PACKET * packet)524 mcreq_get_vbucket(const mc_PACKET *packet)
525 {
526 uint16_t ret;
527 char *retptr = SPAN_BUFFER(&packet->kh_span) + 6;
528 if ((uintptr_t)retptr % sizeof(ret) == 0) {
529 return ntohs(*(uint16_t*)(void*)retptr);
530 } else {
531 memcpy(&ret, retptr, sizeof ret);
532 return ntohs(ret);
533 }
534 }
535
536 uint32_t
mcreq_get_size(const mc_PACKET * packet)537 mcreq_get_size(const mc_PACKET *packet)
538 {
539 uint32_t sz = packet->kh_span.size;
540 if (packet->flags & MCREQ_F_HASVALUE) {
541 if (packet->flags & MCREQ_F_VALUE_IOV) {
542 sz += packet->u_value.multi.total_length;
543 } else {
544 sz += packet->u_value.single.size;
545 }
546 }
547 return sz;
548 }
549
/* Tear down both netbuf pools created by mcreq_pipeline_init(). Any
 * outstanding packets must have been released before calling this. */
void
mcreq_pipeline_cleanup(mc_PIPELINE *pipeline)
{
    netbuf_cleanup(&pipeline->nbmgr);
    netbuf_cleanup(&pipeline->reqpool);
}
556
557 int
mcreq_pipeline_init(mc_PIPELINE * pipeline)558 mcreq_pipeline_init(mc_PIPELINE *pipeline)
559 {
560 nb_SETTINGS settings;
561
562 /* Initialize all members to 0 */
563 memset(&pipeline->requests, 0, sizeof pipeline->requests);
564 pipeline->parent = NULL;
565 pipeline->flush_start = NULL;
566 pipeline->index = 0;
567 memset(&pipeline->ctxqueued, 0, sizeof pipeline->ctxqueued);
568 pipeline->buf_done_callback = NULL;
569
570 netbuf_default_settings(&settings);
571
572 /** Initialize datapool */
573 netbuf_init(&pipeline->nbmgr, &settings);
574
575 /** Initialize request pool */
576 settings.data_basealloc = sizeof(mc_PACKET) * 32;
577 netbuf_init(&pipeline->reqpool, &settings);;
578 pipeline->metrics = NULL;
579 return 0;
580 }
581
582 void
mcreq_queue_add_pipelines(mc_CMDQUEUE * queue,mc_PIPELINE * const * pipelines,unsigned npipelines,lcbvb_CONFIG * config)583 mcreq_queue_add_pipelines(
584 mc_CMDQUEUE *queue, mc_PIPELINE * const *pipelines, unsigned npipelines,
585 lcbvb_CONFIG* config)
586 {
587 unsigned ii;
588
589 lcb_assert(queue->pipelines == NULL);
590 queue->npipelines = npipelines;
591 queue->_npipelines_ex = queue->npipelines;
592 queue->pipelines = malloc(sizeof(*pipelines) * (npipelines + 1));
593 queue->config = config;
594
595 memcpy(queue->pipelines, pipelines, sizeof(*pipelines) * npipelines);
596
597 free(queue->scheds);
598 queue->scheds = calloc(npipelines+1, 1);
599
600 for (ii = 0; ii < npipelines; ii++) {
601 pipelines[ii]->parent = queue;
602 pipelines[ii]->index = ii;
603 }
604
605 if (queue->fallback) {
606 queue->fallback->index = npipelines;
607 queue->pipelines[queue->npipelines] = queue->fallback;
608 queue->_npipelines_ex++;
609 }
610 }
611
612 mc_PIPELINE **
mcreq_queue_take_pipelines(mc_CMDQUEUE * queue,unsigned * count)613 mcreq_queue_take_pipelines(mc_CMDQUEUE *queue, unsigned *count)
614 {
615 mc_PIPELINE **ret = queue->pipelines;
616 *count = queue->npipelines;
617 queue->pipelines = NULL;
618 queue->npipelines = 0;
619 return ret;
620 }
621
/* Initialize a command queue to its empty state. Pipelines are attached
 * later via mcreq_queue_add_pipelines(). Always returns 0. Note that
 * `config` is not set here; it is assigned when pipelines are added. */
int
mcreq_queue_init(mc_CMDQUEUE *queue)
{
    queue->seq = 0;
    queue->pipelines = NULL;
    queue->scheds = NULL;
    queue->fallback = NULL;
    queue->npipelines = 0;
    return 0;
}
632
633 void
mcreq_queue_cleanup(mc_CMDQUEUE * queue)634 mcreq_queue_cleanup(mc_CMDQUEUE *queue)
635 {
636 if (queue->fallback) {
637 mcreq_pipeline_cleanup(queue->fallback);
638 free(queue->fallback);
639 queue->fallback = NULL;
640 }
641 free(queue->scheds);
642 free(queue->pipelines);
643 queue->pipelines = NULL;
644 queue->npipelines = 0;
645 queue->scheds = NULL;
646 }
647
/* Open a scheduling context on the queue. Packets added afterwards with
 * mcreq_sched_add() are staged per-pipeline until the context is either
 * committed (mcreq_sched_leave) or aborted (mcreq_sched_fail). */
void
mcreq_sched_enter(mc_CMDQUEUE *queue)
{
    queue->ctxenter = 1;
}
653
654
655
/**
 * Close the current scheduling context.
 *
 * For every pipeline that had packets staged (scheds[ii] set), either:
 *  - success: enqueue each staged packet for transmission, or
 *  - failure: invoke the packet's fail destructor (extended rdata only)
 *    and destroy the packet.
 * The staged list is then cleared, the pipeline is optionally flushed,
 * and its sched mark is reset.
 *
 * @param queue the command queue
 * @param success nonzero to commit staged packets, zero to discard them
 * @param flush nonzero to kick off flush_start on each affected pipeline
 */
static void
queuectx_leave(mc_CMDQUEUE *queue, int success, int flush)
{
    unsigned ii;

    if (queue->ctxenter) {
        queue->ctxenter = 0;
    }

    /* _npipelines_ex includes the fallback pipeline slot, if present */
    for (ii = 0; ii < queue->_npipelines_ex; ii++) {
        mc_PIPELINE *pipeline;
        sllist_node *ll_next, *ll;

        if (!queue->scheds[ii]) {
            continue;
        }

        pipeline = queue->pipelines[ii];
        ll = SLLIST_FIRST(&pipeline->ctxqueued);

        /* Manual walk: ll_next is saved first because enqueue/release
         * reuses the packet's list node */
        while (ll) {
            mc_PACKET *pkt = SLLIST_ITEM(ll, mc_PACKET, slnode);
            ll_next = ll->next;

            if (success) {
                mcreq_enqueue_packet(pipeline, pkt);
            } else {
                if (pkt->flags & MCREQ_F_REQEXT) {
                    /* Extended rdata may carry a failure destructor */
                    mc_REQDATAEX *rd = pkt->u_rdata.exdata;
                    if (rd->procs->fail_dtor) {
                        rd->procs->fail_dtor(pkt);
                    }
                }
                mcreq_wipe_packet(pipeline, pkt);
                mcreq_release_packet(pipeline, pkt);
            }

            ll = ll_next;
        }
        /* Empty the staged list and clear this pipeline's sched mark */
        SLLIST_FIRST(&pipeline->ctxqueued) = pipeline->ctxqueued.last = NULL;
        if (flush) {
            pipeline->flush_start(pipeline);
        }
        queue->scheds[ii] = 0;
    }
}
702
/* Commit the current scheduling context: all staged packets are enqueued
 * for transmission; each affected pipeline is flushed if do_flush is
 * nonzero. */
void
mcreq_sched_leave(mc_CMDQUEUE *queue, int do_flush)
{
    queuectx_leave(queue, 1, do_flush);
}
708
/* Abort the current scheduling context: staged packets are destroyed
 * (invoking their fail destructors where applicable) and never sent. */
void
mcreq_sched_fail(mc_CMDQUEUE *queue)
{
    queuectx_leave(queue, 0, 0);
}
714
715 void
mcreq_sched_add(mc_PIPELINE * pipeline,mc_PACKET * pkt)716 mcreq_sched_add(mc_PIPELINE *pipeline, mc_PACKET *pkt)
717 {
718 mc_CMDQUEUE *cq = pipeline->parent;
719 if (!cq->scheds[pipeline->index]) {
720 cq->scheds[pipeline->index] = 1;
721 }
722 sllist_append(&pipeline->ctxqueued, &pkt->slnode);
723 }
724
725 static mc_PACKET *
pipeline_find(mc_PIPELINE * pipeline,lcb_uint32_t opaque,int do_remove)726 pipeline_find(mc_PIPELINE *pipeline, lcb_uint32_t opaque, int do_remove)
727 {
728 sllist_iterator iter;
729 SLLIST_ITERFOR(&pipeline->requests, &iter) {
730 mc_PACKET *pkt = SLLIST_ITEM(iter.cur, mc_PACKET, slnode);
731 if (pkt->opaque == opaque) {
732 if (do_remove) {
733 sllist_iter_remove(&pipeline->requests, &iter);
734 }
735 return pkt;
736 }
737 }
738 return NULL;
739 }
740
/* Look up a pending request by its opaque without removing it from the
 * request list. */
mc_PACKET *
mcreq_pipeline_find(mc_PIPELINE *pipeline, lcb_uint32_t opaque)
{
    return pipeline_find(pipeline, opaque, 0);
}
746
/* Look up a pending request by its opaque and unlink it from the
 * request list; the caller becomes responsible for the packet. */
mc_PACKET *
mcreq_pipeline_remove(mc_PIPELINE *pipeline, lcb_uint32_t opaque)
{
    return pipeline_find(pipeline, opaque, 1);
}
752
/**
 * Finalize a packet that has been both flushed and had its callback
 * invoked. If the packet references user-allocated (NOCOPY/UFWD) buffers,
 * the pipeline's buf_done_callback is notified with the key and value
 * pointers so the user can reclaim them, then the packet's own storage
 * is wiped and released.
 */
void
mcreq_packet_done(mc_PIPELINE *pipeline, mc_PACKET *pkt)
{
    lcb_assert(pkt->flags & MCREQ_F_FLUSHED);
    lcb_assert(pkt->flags & MCREQ_F_INVOKED);
    if (pkt->flags & MCREQ_UBUF_FLAGS) {
        void *kbuf, *vbuf;
        const void *cookie;

        cookie = MCREQ_PKT_COOKIE(pkt);
        /* Only report buffers the user actually owns (NOCOPY) */
        if (pkt->flags & MCREQ_F_KEY_NOCOPY) {
            kbuf = SPAN_BUFFER(&pkt->kh_span);
        } else {
            kbuf = NULL;
        }
        if (pkt->flags & MCREQ_F_VALUE_NOCOPY) {
            if (pkt->flags & MCREQ_F_VALUE_IOV) {
                /* Report the base of the first fragment */
                vbuf = pkt->u_value.multi.iov->iov_base;
            } else {
                vbuf = SPAN_SABUFFER_NC(&pkt->u_value.single);
            }
        } else {
            vbuf = NULL;
        }

        pipeline->buf_done_callback(pipeline, cookie, kbuf, vbuf);
    }
    mcreq_wipe_packet(pipeline, pkt);
    mcreq_release_packet(pipeline, pkt);
}
783
784 void
mcreq_reset_timeouts(mc_PIPELINE * pl,lcb_U64 nstime)785 mcreq_reset_timeouts(mc_PIPELINE *pl, lcb_U64 nstime)
786 {
787 sllist_node *nn;
788 SLLIST_ITERBASIC(&pl->requests, nn) {
789 mc_PACKET *pkt = SLLIST_ITEM(nn, mc_PACKET, slnode);
790 MCREQ_PKT_RDATA(pkt)->start = nstime;
791 }
792 }
793
/**
 * Fail out pending requests whose start timestamp is not newer than
 * `oldest_valid`, invoking `failcb` for each and marking it handled.
 * Relies on the request list being sorted by start time, so iteration
 * stops at the first still-valid packet. With oldest_valid == 0, every
 * request is failed (see mcreq_pipeline_fail).
 *
 * @param pl pipeline to scan
 * @param err error passed to the fail callback
 * @param failcb invoked for each expired packet
 * @param cbarg opaque argument forwarded to failcb
 * @param oldest_valid lowest acceptable timestamp; 0 disables the cutoff
 * @param oldest_start [out, optional] start time of the first packet kept
 * @return number of packets failed out
 */
unsigned
mcreq_pipeline_timeout(
        mc_PIPELINE *pl, lcb_error_t err, mcreq_pktfail_fn failcb, void *cbarg,
        hrtime_t oldest_valid, hrtime_t *oldest_start)
{
    sllist_iterator iter;
    unsigned count = 0;

    SLLIST_ITERFOR(&pl->requests, &iter) {
        mc_PACKET *pkt = SLLIST_ITEM(iter.cur, mc_PACKET, slnode);
        mc_REQDATA *rd = MCREQ_PKT_RDATA(pkt);

        /**
         * oldest_valid contains the LOWEST timestamp we can admit to being
         * acceptable. If the current command is newer (i.e. has a higher
         * timestamp) then we break the iteration and return.
         */
        if (oldest_valid && rd->start > oldest_valid) {
            if (oldest_start) {
                *oldest_start = rd->start;
            }
            return count;
        }

        sllist_iter_remove(&pl->requests, &iter);
        failcb(pl, pkt, err, cbarg);
        mcreq_packet_handled(pl, pkt);
        count++;
    }
    return count;
}
825
/* Fail out every pending request on the pipeline with `err`. Implemented
 * as a timeout scan with no timestamp cutoff (oldest_valid == 0). */
unsigned
mcreq_pipeline_fail(
        mc_PIPELINE *pl, lcb_error_t err, mcreq_pktfail_fn failcb, void *arg)
{
    return mcreq_pipeline_timeout(pl, err, failcb, arg, 0, NULL);
}
832
833 void
mcreq_iterwipe(mc_CMDQUEUE * queue,mc_PIPELINE * src,mcreq_iterwipe_fn callback,void * arg)834 mcreq_iterwipe(mc_CMDQUEUE *queue, mc_PIPELINE *src,
835 mcreq_iterwipe_fn callback, void *arg)
836 {
837 sllist_iterator iter;
838
839 SLLIST_ITERFOR(&src->requests, &iter) {
840 int rv;
841 mc_PACKET *orig = SLLIST_ITEM(iter.cur, mc_PACKET, slnode);
842 rv = callback(queue, src, orig, arg);
843 if (rv == MCREQ_REMOVE_PACKET) {
844 sllist_iter_remove(&src->requests, &iter);
845 }
846 }
847 }
848
849 #include "mcreq-flush-inl.h"
/* Fallback pipeline: a regular mc_PIPELINE (must be the first member so
 * the struct can be cast to/from mc_PIPELINE*) plus the handler invoked
 * for each packet routed to it. */
typedef struct {
    mc_PIPELINE base;
    mcreq_fallback_cb handler;
} mc_FALLBACKPL;
854
/* flush_start implementation for the fallback pipeline. Drains the
 * netbuf flush queue (immediately acknowledging all bytes, since nothing
 * is actually written to a socket), then hands every queued packet to the
 * fallback handler and marks it handled. */
static void
do_fallback_flush(mc_PIPELINE *pipeline)
{
    nb_IOV iov;
    unsigned nb;
    int nused;
    sllist_iterator iter;
    mc_FALLBACKPL *fpl = (mc_FALLBACKPL*)pipeline;

    /* Pretend-flush: consume the IOVs and report them all as done */
    while ((nb = mcreq_flush_iov_fill(pipeline, &iov, 1, &nused))) {
        mcreq_flush_done(pipeline, nb, nb);
    }
    /* Now handle all the packets, for real */
    SLLIST_ITERFOR(&pipeline->requests, &iter) {
        mc_PACKET *pkt = SLLIST_ITEM(iter.cur, mc_PACKET, slnode);
        fpl->handler(pipeline->parent, pkt);
        sllist_iter_remove(&pipeline->requests, &iter);
        mcreq_packet_handled(pipeline, pkt);
    }
}
875
876 void
mcreq_set_fallback_handler(mc_CMDQUEUE * cq,mcreq_fallback_cb handler)877 mcreq_set_fallback_handler(mc_CMDQUEUE *cq, mcreq_fallback_cb handler)
878 {
879 mc_FALLBACKPL *fallback;
880 lcb_assert(!cq->fallback);
881 fallback = calloc(1, sizeof (mc_FALLBACKPL));
882 cq->fallback = (mc_PIPELINE *)fallback;
883 mcreq_pipeline_init(cq->fallback);
884 cq->fallback->parent = cq;
885 cq->fallback->index = cq->npipelines;
886 ((mc_FALLBACKPL*)cq->fallback)->handler = handler;
887 cq->fallback->flush_start = do_fallback_flush;
888 }
889
/* Default payload dump callback: prints nothing. Substituted when the
 * caller passes a NULL dump function to mcreq_dump_packet(). */
static void
noop_dumpfn(const void *d, unsigned n, FILE *fp) { (void)d;(void)n;(void)fp; }
892
/* X-macro listing the MCREQ_F_* flag names printed by mcreq_dump_packet */
#define MCREQ_XFLAGS(X) \
    X(KEY_NOCOPY) \
    X(VALUE_NOCOPY) \
    X(VALUE_IOV) \
    X(HASVALUE) \
    X(REQEXT) \
    X(UFWD) \
    X(FLUSHED) \
    X(INVOKED) \
    X(DETACHED)
903
/**
 * Dump a human-readable description of a packet to `fp` (stderr if NULL):
 * opaque, flags, span sizes, rdata, followed by the raw header/key bytes
 * and — via `dumpfn` — the value payload. Passing a NULL dumpfn prints
 * the metadata and raw header only (value payload is skipped).
 */
void
mcreq_dump_packet(const mc_PACKET *packet, FILE *fp, mcreq_payload_dump_fn dumpfn)
{
    const char *indent = "  ";
    const mc_REQDATA *rdata = MCREQ_PKT_RDATA(packet);

    if (!dumpfn) {
        dumpfn = noop_dumpfn;
    }
    if (!fp) {
        fp = stderr;
    }

    fprintf(fp, "Packet @%p\n", (void *)packet);
    fprintf(fp, "%sOPAQUE: %u\n", indent, (unsigned int)packet->opaque);

    fprintf(fp, "%sPKTFLAGS: 0x%x ", indent, packet->flags);
    /* Print the symbolic name of each set flag via the X-macro list */
    #define X(base) \
    if (packet->flags & MCREQ_F_##base) { fprintf(fp, "%s, ", #base); }
    MCREQ_XFLAGS(X)
    #undef X
    fprintf(fp, "\n");

    fprintf(fp, "%sKey+Header Size: %u\n", indent, (unsigned int)packet->kh_span.size);
    fprintf(fp, "%sKey Offset: %u\n", indent, MCREQ_PKT_BASESIZE + packet->extlen);


    if (packet->flags & MCREQ_F_HASVALUE) {
        if (packet->flags & MCREQ_F_VALUE_IOV) {
            fprintf(fp, "%sValue Length: %u\n", indent,
                packet->u_value.multi.total_length);

            fprintf(fp, "%sValue IOV: [start=%p, n=%d]\n", indent,
                (void *)packet->u_value.multi.iov, packet->u_value.multi.niov);
        } else {
            if (packet->flags & MCREQ_F_VALUE_NOCOPY) {
                fprintf(fp, "%sValue is user allocated\n", indent);
            }
            fprintf(fp, "%sValue: %p, %u bytes\n", indent,
                (void *)SPAN_BUFFER(&packet->u_value.single), packet->u_value.single.size);
        }
    }

    fprintf(fp, "%sRDATA(%s): %p\n", indent,
        (packet->flags & MCREQ_F_REQEXT) ? "ALLOC" : "EMBEDDED", (void *)rdata);

    indent = "    ";
    fprintf(fp, "%sStart: %lu\n", indent, (unsigned long)rdata->start);
    fprintf(fp, "%sCookie: %p\n", indent, rdata->cookie);

    indent = "  ";
    fprintf(fp, "%sNEXT: %p\n", indent, (void *)packet->slnode.next);
    if (dumpfn != noop_dumpfn) {
        fprintf(fp, "PACKET CONTENTS:\n");
    }

    /* Raw header+key bytes are always written directly to fp */
    fwrite(SPAN_BUFFER(&packet->kh_span), 1, packet->kh_span.size, fp);
    if (packet->flags & MCREQ_F_HASVALUE) {
        if (packet->flags & MCREQ_F_VALUE_IOV) {
            const lcb_IOV *iovs = packet->u_value.multi.iov;
            unsigned ii, ixmax = packet->u_value.multi.niov;
            for (ii = 0; ii < ixmax; ii++) {
                dumpfn(iovs[ii].iov_base, iovs[ii].iov_len, fp);
            }
        } else {
            const nb_SPAN *vspan = &packet->u_value.single;
            dumpfn(SPAN_BUFFER(vspan), vspan->size, fp);
        }
    }
}
974
975 void
mcreq_dump_chain(const mc_PIPELINE * pipeline,FILE * fp,mcreq_payload_dump_fn dumpfn)976 mcreq_dump_chain(const mc_PIPELINE *pipeline, FILE *fp, mcreq_payload_dump_fn dumpfn)
977 {
978 sllist_node *ll;
979 SLLIST_FOREACH(&pipeline->requests, ll) {
980 const mc_PACKET *pkt = SLLIST_ITEM(ll, mc_PACKET, slnode);
981 mcreq_dump_packet(pkt, fp, dumpfn);
982 }
983 }
984