1 #include "mc/mcreq.h" 2 #include "sllist-inl.h" 3 #include <gtest/gtest.h> 4 5 #define NUM_PIPELINES 4 6 7 struct CQWrap : mc_CMDQUEUE { 8 lcbvb_CONFIG* config; CQWrapCQWrap9 CQWrap() { 10 mc_PIPELINE **pll; 11 pll = (mc_PIPELINE **)malloc(sizeof(*pll) * NUM_PIPELINES); 12 config = lcbvb_create(); 13 for (unsigned ii = 0; ii < NUM_PIPELINES; ii++) { 14 mc_PIPELINE *pipeline = (mc_PIPELINE *)calloc(1, sizeof(*pipeline)); 15 mcreq_pipeline_init(pipeline); 16 pll[ii] = pipeline; 17 } 18 lcbvb_genconfig(config, NUM_PIPELINES, 3, 1024); 19 mcreq_queue_init(this); 20 this->seq = 100; 21 mcreq_queue_add_pipelines(this, pll, NUM_PIPELINES, config); 22 free(pll); 23 } 24 ~CQWrapCQWrap25 ~CQWrap() { 26 for (int ii = 0; ii < NUM_PIPELINES; ii++) { 27 mc_PIPELINE *pipeline = pipelines[ii]; 28 EXPECT_NE(0, netbuf_is_clean(&pipeline->nbmgr)); 29 EXPECT_NE(0, netbuf_is_clean(&pipeline->reqpool)); 30 mcreq_pipeline_cleanup(pipeline); 31 free(pipeline); 32 } 33 mcreq_queue_cleanup(this); 34 lcbvb_destroy(config); 35 } 36 clearPipelinesCQWrap37 void clearPipelines() { 38 for (unsigned ii = 0; ii < npipelines; ii++) { 39 mc_PIPELINE *pipeline = pipelines[ii]; 40 sllist_iterator iter; 41 SLLIST_ITERFOR(&pipeline->requests, &iter) { 42 mc_PACKET *pkt = SLLIST_ITEM(iter.cur, mc_PACKET, slnode); 43 sllist_iter_remove(&pipeline->requests, &iter); 44 mcreq_wipe_packet(pipeline, pkt); 45 mcreq_release_packet(pipeline, pkt); 46 } 47 } 48 } 49 setBufFreeCallbackCQWrap50 void setBufFreeCallback(mcreq_bufdone_fn cb) { 51 for (unsigned ii = 0; ii < npipelines; ii++) { 52 pipelines[ii]->buf_done_callback = cb; 53 } 54 } 55 56 CQWrap(CQWrap&); 57 }; 58 59 struct PacketWrap { 60 mc_PACKET *pkt; 61 mc_PIPELINE *pipeline; 62 protocol_binary_request_header hdr; 63 lcb_CMDBASE cmd; 64 char *pktbuf; 65 char *kbuf; 66 PacketWrapPacketWrap67 PacketWrap() { 68 pkt = NULL; 69 pipeline = NULL; 70 pktbuf = NULL; 71 kbuf = NULL; 72 memset(&hdr, 0, sizeof(hdr)); 73 memset(&cmd, 0, sizeof(cmd)); 74 } 75 setKeyPacketWrap76 void setKey(const char *key) { 77 size_t nkey = strlen(key); 78 pktbuf = new char[24 + nkey + 1]; 79 kbuf = pktbuf + 24; 80 memcpy(kbuf, key, nkey); 81 kbuf[nkey] = '\0'; 82 } 83 setContigKeyPacketWrap84 void setContigKey(const char *key) { 85 setKey(key); 86 cmd.key.type = LCB_KV_HEADER_AND_KEY; 87 cmd.key.contig.bytes = pktbuf; 88 cmd.key.contig.nbytes = strlen(key) + 24; 89 } 90 setCopyKeyPacketWrap91 void setCopyKey(const char *key) { 92 setKey(key); 93 LCB_KREQ_SIMPLE(&cmd.key, kbuf, strlen(key)); 94 } 95 setHeaderSizePacketWrap96 void setHeaderSize() { 97 hdr.request.bodylen = htonl((lcb_uint32_t)strlen(kbuf)); 98 } 99 copyHeaderPacketWrap100 void copyHeader() { 101 memcpy(SPAN_BUFFER(&pkt->kh_span), hdr.bytes, sizeof(hdr.bytes)); 102 } 103 setCookiePacketWrap104 void setCookie(void *ptr) { 105 pkt->u_rdata.reqdata.cookie = ptr; 106 } 107 reservePacketPacketWrap108 bool reservePacket(mc_CMDQUEUE *cq) { 109 lcb_error_t err; 110 err = mcreq_basic_packet(cq, &cmd, &hdr, 0, &pkt, &pipeline, 0); 111 return err == LCB_SUCCESS; 112 } 113 ~PacketWrapPacketWrap114 ~PacketWrap() { 115 if (pktbuf != NULL) { 116 delete[] pktbuf; 117 } 118 } 119 }; 120