1 #include "iotests.h"
2 
3 using namespace std;
4 class ObseqnoTest : public MockUnitTest {
5 };
6 
7 extern "C" {
storeCb_getstok(lcb_t,int cbtype,const lcb_RESPBASE * rb)8 static void storeCb_getstok(lcb_t, int cbtype, const lcb_RESPBASE *rb)
9 {
10     EXPECT_EQ(LCB_SUCCESS, rb->rc);
11     const lcb_MUTATION_TOKEN *tmp = lcb_resp_get_mutation_token(cbtype, rb);
12     if (tmp) {
13         *(lcb_MUTATION_TOKEN *)rb->cookie = *tmp;
14     }
15 }
16 }
17 
18 static void
storeGetStok(lcb_t instance,const string & k,const string & v,lcb_MUTATION_TOKEN * st)19 storeGetStok(lcb_t instance, const string& k, const string& v, lcb_MUTATION_TOKEN *st)
20 {
21     lcb_RESPCALLBACK oldcb = lcb_get_callback3(instance, LCB_CALLBACK_STORE);
22     lcb_install_callback3(instance, LCB_CALLBACK_STORE, storeCb_getstok);
23     lcb_sched_enter(instance);
24     lcb_CMDSTORE cmd = { 0 };
25     LCB_CMD_SET_KEY(&cmd, k.c_str(), k.size());
26     LCB_CMD_SET_VALUE(&cmd, v.c_str(), v.size());
27     cmd.operation = LCB_SET;
28 
29     lcb_error_t rc = lcb_store3(instance, st, &cmd);
30     EXPECT_EQ(LCB_SUCCESS, rc);
31     lcb_sched_leave(instance);
32     lcb_wait(instance);
33     lcb_install_callback3(instance, LCB_CALLBACK_STORE, oldcb);
34 }
35 
// Checks that the mutation token returned by lcb_get_mutation_token() for a
// key matches the token delivered to the store callback for that same key.
TEST_F(ObseqnoTest, testFetchImplicit) {
    SKIP_UNLESS_MOCK();

    HandleWrap hw;
    lcb_t instance;
    createConnection(hw, instance);

    // NOTE(review): presumably this setting is what makes the instance keep
    // tokens for later lookup -- confirm against the cntl documentation.
    lcb_error_t rc = lcb_cntl_string(instance, "dur_mutation_tokens", "true");
    ASSERT_EQ(LCB_SUCCESS, rc);

    const char *key = "obseqBasic";
    const char *value = "value";

    // Token as seen by the store callback.
    lcb_MUTATION_TOKEN fromCallback = { 0 };
    storeGetStok(instance, key, value, &fromCallback);
    ASSERT_TRUE(fromCallback.uuid_ != 0);

    // Token as looked up from the instance by key.
    lcb_KEYBUF keybuf;
    LCB_KREQ_SIMPLE(&keybuf, key, strlen(key));
    const lcb_MUTATION_TOKEN *fromInstance = lcb_get_mutation_token(instance, &keybuf, &rc);
    ASSERT_EQ(LCB_SUCCESS, rc);
    ASSERT_TRUE(fromInstance != NULL);

    // Both views of the token must agree field by field.
    ASSERT_EQ(fromInstance->uuid_, fromCallback.uuid_);
    ASSERT_EQ(fromInstance->vbid_, fromCallback.vbid_);
    ASSERT_EQ(fromInstance->seqno_, fromCallback.seqno_);
}
62 
63 extern "C" {
64 static void
obseqCallback(lcb_t,int,const lcb_RESPBASE * rb)65 obseqCallback(lcb_t, int, const lcb_RESPBASE *rb) {
66     lcb_RESPOBSEQNO *pp = (lcb_RESPOBSEQNO *)rb->cookie;
67     *pp = *(const lcb_RESPOBSEQNO *)rb;
68 }
69 }
70 
71 static void
doObserveSeqno(lcb_t instance,const lcb_MUTATION_TOKEN * ss,int server,lcb_RESPOBSEQNO & resp)72 doObserveSeqno(lcb_t instance, const lcb_MUTATION_TOKEN *ss, int server, lcb_RESPOBSEQNO& resp) {
73     lcb_CMDOBSEQNO cmd = { 0 };
74     cmd.vbid = ss->vbid_;
75     cmd.uuid = ss->uuid_;
76     cmd.server_index = server;
77     lcb_error_t rc;
78 
79     lcb_sched_enter(instance);
80     rc = lcb_observe_seqno3(instance, &resp, &cmd);
81     if (rc != LCB_SUCCESS) {
82         resp.rc = rc;
83         resp.rflags |= LCB_RESP_F_CLIENTGEN;
84         return;
85     }
86 
87     lcb_RESPCALLBACK oldcb = lcb_get_callback3(instance, LCB_CALLBACK_OBSEQNO);
88     lcb_install_callback3(instance, LCB_CALLBACK_OBSEQNO, obseqCallback);
89     lcb_sched_leave(instance);
90     lcb_wait(instance);
91     lcb_install_callback3(instance, LCB_CALLBACK_OBSEQNO, oldcb);
92 }
93 
// Stores a key, then sends an observe-seqno request to every server slot
// (indices 0 .. nreplicas) of the token's vBucket and checks that each one
// reports the token's UUID, no old UUID, and a non-zero in-memory sequence
// number equal to the persisted one.
TEST_F(ObseqnoTest, testObserve) {
    SKIP_UNLESS_MOCK();

    HandleWrap hw;
    lcb_t instance;
    createConnection(hw, instance);

    // Start from NULL and check the lookup so that a failed lcb_cntl() stops
    // the test instead of leaving vbc pointing at garbage.
    lcbvb_CONFIG *vbc = NULL;
    lcb_error_t rc = lcb_cntl(instance, LCB_CNTL_GET, LCB_CNTL_VBCONFIG, &vbc);
    ASSERT_EQ(LCB_SUCCESS, rc);
    ASSERT_TRUE(vbc != NULL);

    lcb_MUTATION_TOKEN st_fetched = { 0 };
    const char *key = "testObserve";
    const char *value = "value";

    // Get the mutation token
    storeGetStok(instance, key, value, &st_fetched);
    ASSERT_TRUE(LCB_MUTATION_TOKEN_ISVALID(&st_fetched));

    // One slot per replica plus one more; the count does not change while
    // looping, so compute it once.
    const size_t nslots = lcbvb_get_nreplicas(vbc) + 1;
    for (size_t ii = 0; ii < nslots; ii++) {
        int ix = lcbvb_vbserver(vbc, st_fetched.vbid_, ii);
        lcb_RESPOBSEQNO resp = { 0 };
        doObserveSeqno(instance, &st_fetched, ix, resp);
        ASSERT_EQ(LCB_SUCCESS, resp.rc);
        ASSERT_EQ(st_fetched.uuid_, resp.cur_uuid);
        ASSERT_EQ(0, resp.old_uuid);
        ASSERT_GT(resp.mem_seqno, 0);
        ASSERT_EQ(resp.mem_seqno, resp.persisted_seqno);
    }
}
124 
// Stores a key, asks the mock to regenerate its vBucket coordinates, then
// sends an observe-seqno request built from the pre-regeneration token and
// checks that the reply reports the token's UUID and seqno as the "old"
// values alongside a different current UUID.
TEST_F(ObseqnoTest, testFailoverFormat) {
    SKIP_UNLESS_MOCK();
    HandleWrap hw;
    lcb_t instance;
    createConnection(hw, instance);

    // Start from NULL and check the lookup so that a failed lcb_cntl() stops
    // the test instead of leaving vbc pointing at garbage.
    lcbvb_CONFIG *vbc = NULL;
    lcb_error_t rc = lcb_cntl(instance, LCB_CNTL_GET, LCB_CNTL_VBCONFIG, &vbc);
    ASSERT_EQ(LCB_SUCCESS, rc);
    ASSERT_TRUE(vbc != NULL);

    const char *key = "testObserve";
    const char *value = "value";

    lcb_MUTATION_TOKEN st_fetched = { 0 };
    storeGetStok(instance, key, value, &st_fetched);
    // Same precondition testObserve checks: without a valid token the
    // comparisons below would be meaningless.
    ASSERT_TRUE(LCB_MUTATION_TOKEN_ISVALID(&st_fetched));

    MockEnvironment *env = MockEnvironment::getInstance();
    env->regenVbCoords();

    // Now we should get a different UUID.
    // Zero-initialized because doObserveSeqno() ORs into rr.rflags on its
    // failure path, and rr.rc is read below even if no callback filled it in.
    lcb_RESPOBSEQNO rr = { 0 };
    doObserveSeqno(instance, &st_fetched, lcbvb_vbmaster(vbc, st_fetched.vbid_), rr);
    ASSERT_EQ(LCB_SUCCESS, rr.rc);
    ASSERT_GT(rr.old_uuid, 0);
    ASSERT_EQ(rr.old_uuid, st_fetched.uuid_);
    ASSERT_NE(rr.old_uuid, rr.cur_uuid);
    ASSERT_EQ(rr.old_seqno, st_fetched.seqno_);
}
154 
155 // TODO: We should add more tests here, but in order to do this, we must
156 // first validate the mock.
157