1 #include "mc/mcreq.h"
2 #include "sllist-inl.h"
3 #include <gtest/gtest.h>
4 
/* Number of pipelines each CQWrap queue is populated with. */
#define NUM_PIPELINES 4
6 
7 struct CQWrap : mc_CMDQUEUE {
8     lcbvb_CONFIG* config;
CQWrapCQWrap9     CQWrap() {
10         mc_PIPELINE **pll;
11         pll = (mc_PIPELINE **)malloc(sizeof(*pll) * NUM_PIPELINES);
12         config = lcbvb_create();
13         for (unsigned ii = 0; ii < NUM_PIPELINES; ii++) {
14             mc_PIPELINE *pipeline = (mc_PIPELINE *)calloc(1, sizeof(*pipeline));
15             mcreq_pipeline_init(pipeline);
16             pll[ii] = pipeline;
17         }
18         lcbvb_genconfig(config, NUM_PIPELINES, 3, 1024);
19         mcreq_queue_init(this);
20         this->seq = 100;
21         mcreq_queue_add_pipelines(this, pll, NUM_PIPELINES, config);
22         free(pll);
23     }
24 
~CQWrapCQWrap25     ~CQWrap() {
26         for (int ii = 0; ii < NUM_PIPELINES; ii++) {
27             mc_PIPELINE *pipeline = pipelines[ii];
28             EXPECT_NE(0, netbuf_is_clean(&pipeline->nbmgr));
29             EXPECT_NE(0, netbuf_is_clean(&pipeline->reqpool));
30             mcreq_pipeline_cleanup(pipeline);
31             free(pipeline);
32         }
33         mcreq_queue_cleanup(this);
34         lcbvb_destroy(config);
35     }
36 
clearPipelinesCQWrap37     void clearPipelines() {
38         for (unsigned ii = 0; ii < npipelines; ii++) {
39             mc_PIPELINE *pipeline = pipelines[ii];
40             sllist_iterator iter;
41             SLLIST_ITERFOR(&pipeline->requests, &iter) {
42                 mc_PACKET *pkt = SLLIST_ITEM(iter.cur, mc_PACKET, slnode);
43                 sllist_iter_remove(&pipeline->requests, &iter);
44                 mcreq_wipe_packet(pipeline, pkt);
45                 mcreq_release_packet(pipeline, pkt);
46             }
47         }
48     }
49 
setBufFreeCallbackCQWrap50     void setBufFreeCallback(mcreq_bufdone_fn cb) {
51         for (unsigned ii = 0; ii < npipelines; ii++) {
52             pipelines[ii]->buf_done_callback = cb;
53         }
54     }
55 
56     CQWrap(CQWrap&);
57 };
58 
59 struct PacketWrap {
60     mc_PACKET *pkt;
61     mc_PIPELINE *pipeline;
62     protocol_binary_request_header hdr;
63     lcb_CMDBASE cmd;
64     char *pktbuf;
65     char *kbuf;
66 
PacketWrapPacketWrap67     PacketWrap() {
68         pkt = NULL;
69         pipeline = NULL;
70         pktbuf = NULL;
71         kbuf = NULL;
72         memset(&hdr, 0, sizeof(hdr));
73         memset(&cmd, 0, sizeof(cmd));
74     }
75 
setKeyPacketWrap76     void setKey(const char *key) {
77         size_t nkey = strlen(key);
78         pktbuf = new char[24 + nkey + 1];
79         kbuf = pktbuf + 24;
80         memcpy(kbuf, key, nkey);
81         kbuf[nkey] = '\0';
82     }
83 
setContigKeyPacketWrap84     void setContigKey(const char *key) {
85         setKey(key);
86         cmd.key.type = LCB_KV_HEADER_AND_KEY;
87         cmd.key.contig.bytes = pktbuf;
88         cmd.key.contig.nbytes = strlen(key) + 24;
89     }
90 
setCopyKeyPacketWrap91     void setCopyKey(const char *key) {
92         setKey(key);
93         LCB_KREQ_SIMPLE(&cmd.key, kbuf, strlen(key));
94     }
95 
setHeaderSizePacketWrap96     void setHeaderSize() {
97         hdr.request.bodylen = htonl((lcb_uint32_t)strlen(kbuf));
98     }
99 
copyHeaderPacketWrap100     void copyHeader() {
101         memcpy(SPAN_BUFFER(&pkt->kh_span), hdr.bytes, sizeof(hdr.bytes));
102     }
103 
setCookiePacketWrap104     void setCookie(void *ptr) {
105         pkt->u_rdata.reqdata.cookie = ptr;
106     }
107 
reservePacketPacketWrap108     bool reservePacket(mc_CMDQUEUE *cq) {
109         lcb_error_t err;
110         err = mcreq_basic_packet(cq, &cmd, &hdr, 0, &pkt, &pipeline, 0);
111         return err == LCB_SUCCESS;
112     }
113 
~PacketWrapPacketWrap114     ~PacketWrap() {
115         if (pktbuf != NULL) {
116             delete[] pktbuf;
117         }
118     }
119 };
120