1
2 /*
3 * Redistribution and use in source and binary forms, with or
4 * without modification, are permitted provided that the following
5 * conditions are met:
6 *
7 * 1. Redistributions of source code must retain the above
8 * copyright notice, this list of conditions and the
9 * following disclaimer.
10 *
11 * 2. Redistributions in binary form must reproduce the above
12 * copyright notice, this list of conditions and the following
13 * disclaimer in the documentation and/or other materials
14 * provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
17 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
18 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
19 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
20 * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
21 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
22 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
24 * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
25 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
26 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
27 * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
28 * SUCH DAMAGE.
29 */
30
31 #include <stdlib.h>
32 #include <stdint.h>
33 #include <inttypes.h>
34 #include <string.h>
35 #include <stdio.h>
36
37 #include "nb_alloc.h"
38 #include "nb_opt.h"
39 #include "nb_key.h"
40 #include "nb_db.h"
41 #include "nb_db_memcached_bin.h"
42
43 #include "memcached/mc.h"
44 #include "memcached/session.h"
45
46 struct db_memcached_bin {
47 struct tbses s;
48 char *value;
49 size_t value_size;
50 char *buf;
51 size_t buf_size;
52 };
53
db_memcached_bin_init(struct nb_db * db,size_t value_size)54 static int db_memcached_bin_init(struct nb_db *db, size_t value_size)
55 {
56 db->priv = nb_malloc(sizeof(struct db_memcached_bin));
57 memset(db->priv, 0, sizeof(struct db_memcached_bin));
58 struct db_memcached_bin *t = db->priv;
59 t->value_size = value_size;
60 t->value = nb_malloc(t->value_size);
61 memset(t->value, '#', t->value_size - 1);
62 t->value[t->value_size - 1] = 0;
63 t->buf_size = 1024 + value_size;
64 t->buf = nb_malloc(t->buf_size);
65 return 0;
66 }
67
db_memcached_bin_free(struct nb_db * db)68 static void db_memcached_bin_free(struct nb_db *db)
69 {
70 struct db_memcached_bin *t = db->priv;
71 tb_sesclose(&t->s);
72 tb_sesfree(&t->s);
73 if (t->value) free(t->value);
74 if (t->buf) free(t->buf);
75 free(db->priv);
76 db->priv = NULL;
77 }
db_memcached_bin_connect(struct nb_db * db,struct nb_options * opts)78 static int db_memcached_bin_connect(struct nb_db *db, struct nb_options *opts)
79 {
80 struct db_memcached_bin *t = db->priv;
81 tb_sesset(&t->s, TB_HOST, opts->host);
82 tb_sesset(&t->s, TB_PORT, opts->port);
83 tb_sesset(&t->s, TB_SENDBUF, opts->buf_send);
84 tb_sesset(&t->s, TB_READBUF, opts->buf_recv);
85 int rc = tb_sesconnect(&t->s);
86 if (rc == -1) {
87 printf("memcached_connect() failed: %d\n", t->s.errno_);
88 return -1;
89 }
90 return 0;
91 }
92
db_memcached_bin_close(struct nb_db * db)93 static void db_memcached_bin_close(struct nb_db *db)
94 {
95 struct db_memcached_bin *t = db->priv;
96 tb_sesclose(&t->s);
97 }
98
db_memcached_bin_insert(struct nb_db * db,struct nb_key * key)99 static int db_memcached_bin_insert(struct nb_db *db, struct nb_key *key)
100 {
101 struct db_memcached_bin *t = db->priv;
102 struct mc req;
103 mc_init (&req, t->buf, t->buf_size, NULL, NULL);
104 mc_op_add (&req, key->data, key->size, t->value, t->value_size, 0, 0);
105 uint32_t size = mc_used(&req);
106 assert(size <= t->buf_size);
107 int rc = tb_sessend(&t->s, t->buf, size);
108 if (rc == -1)
109 return -1;
110 return 0;
111 }
112
db_memcached_bin_replace(struct nb_db * db,struct nb_key * key)113 static int db_memcached_bin_replace(struct nb_db *db, struct nb_key *key)
114 {
115 struct db_memcached_bin *t = db->priv;
116 struct mc req;
117 mc_init (&req, t->buf, t->buf_size, NULL, NULL);
118 mc_op_set (&req, key->data, key->size, t->value, t->value_size, 0, 0);
119 uint32_t size = mc_used(&req);
120 assert(size <= t->buf_size);
121 int rc = tb_sessend(&t->s, t->buf, size);
122 if (rc == -1)
123 return -1;
124 return 0;
125 }
126
db_memcached_bin_delete(struct nb_db * db,struct nb_key * key)127 static int db_memcached_bin_delete(struct nb_db *db, struct nb_key *key)
128 {
129 struct db_memcached_bin *t = db->priv;
130 struct mc req;
131 mc_init (&req, t->buf, t->buf_size, NULL, NULL);
132 mc_op_delete (&req, key->data, key->size);
133 uint32_t size = mc_used(&req);
134 assert(size <= t->buf_size);
135 int rc = tb_sessend(&t->s, t->buf, size);
136 if (rc == -1)
137 return -1;
138 return 0;
139 }
140
/*
 * "update" has no implementation in this driver: for memcached it
 * would be the same operation as replace. The call only reports that
 * on stdout and fails.
 *
 * Always returns -1.
 */
static int db_memcached_bin_update(struct nb_db *db, struct nb_key *key)
{
	/* neither argument is used */
	(void) key;
	(void) db;

	printf("memcached update is not supported\n");
	return -1;
}
151
db_memcached_bin_select(struct nb_db * db,struct nb_key * key)152 static int db_memcached_bin_select(struct nb_db *db, struct nb_key *key)
153 {
154 struct db_memcached_bin *t = db->priv;
155 struct mc req;
156 mc_init (&req, t->buf, t->buf_size, NULL, NULL);
157 mc_op_get (&req, key->data, key->size);
158 uint32_t size = mc_used(&req);
159 assert(size <= t->buf_size);
160 int rc = tb_sessend(&t->s, t->buf, size);
161 if (rc == -1)
162 return -1;
163 return 0;
164 }
165
db_memcached_bin_recv(struct nb_db * db,int count,int * missed)166 static int db_memcached_bin_recv(struct nb_db *db, int count, int *missed)
167 {
168 struct db_memcached_bin *t = db->priv;
169 int rc = tb_sessync(&t->s);
170 if (rc == -1) {
171 printf("sync failed\n");
172 return -1;
173 }
174 while (count > 0)
175 {
176 struct mc_response resp;
177 ssize_t len = tb_sesrecv(&t->s, t->buf,
178 sizeof(struct mc_hdr), 1);
179 if (len < (ssize_t)sizeof(struct mc_hdr)) {
180 printf("recv failed\n");
181 return 0;
182 }
183 const char *p = t->buf;
184 ssize_t rv = mc_reply(&resp, &p, p + sizeof(struct mc_hdr));
185 if (rv == -1) {
186 printf("failed to parse response\n");
187 return -1;
188 } if (rv == 0) {
189 goto get;
190 }
191 len = tb_sesrecv(&t->s, t->buf + sizeof(struct mc_hdr),
192 rv, 1);
193 if (len < rv) {
194 printf("recv failed\n");
195 return -1;
196 }
197 const char *end = p + sizeof(struct mc_hdr) + rv;
198 int64_t r = mc_reply(&resp, &p, end);
199 if (r == -1) {
200 printf("failed to parse response\n");
201 return -1;
202 }
203 get:
204 assert(p == end);
205 if (resp.hdr.cmd == MC_BIN_CMD_GET && resp.hdr.status)
206 *missed = *missed + 1;
207 else if (resp.hdr.cmd != MC_BIN_CMD_GET && resp.hdr.status)
208 printf("server respond: %d\n", resp.hdr.status);
209 count--;
210 }
211 return 0;
212 }
213
214 struct nb_db_if nb_db_memcached_bin =
215 {
216 .name = "memcached_bin",
217 .init = db_memcached_bin_init,
218 .free = db_memcached_bin_free,
219 .connect = db_memcached_bin_connect,
220 .close = db_memcached_bin_close,
221 .insert = db_memcached_bin_insert,
222 .replace = db_memcached_bin_replace,
223 .del = db_memcached_bin_delete,
224 .update = db_memcached_bin_update,
225 .select = db_memcached_bin_select,
226 .recv = db_memcached_bin_recv
227 };
228