1 
2 /*
3  * Redistribution and use in source and binary forms, with or
4  * without modification, are permitted provided that the following
5  * conditions are met:
6  *
7  * 1. Redistributions of source code must retain the above
8  *    copyright notice, this list of conditions and the
9  *    following disclaimer.
10  *
11  * 2. Redistributions in binary form must reproduce the above
12  *    copyright notice, this list of conditions and the following
13  *    disclaimer in the documentation and/or other materials
14  *    provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
17  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
18  * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
19  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
20  * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
21  * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
22  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
24  * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
25  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
26  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
27  * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
28  * SUCH DAMAGE.
29  */
30 
31 #include <stdlib.h>
32 #include <stdint.h>
33 #include <inttypes.h>
34 #include <string.h>
35 #include <stdio.h>
36 
37 #include "nb_alloc.h"
38 #include "nb_opt.h"
39 #include "nb_key.h"
40 #include "nb_db.h"
41 #include "nb_db_memcached_bin.h"
42 
43 #include "memcached/mc.h"
44 #include "memcached/session.h"
45 
46 struct db_memcached_bin {
47 	struct tbses s;
48 	char *value;
49 	size_t value_size;
50 	char *buf;
51 	size_t buf_size;
52 };
53 
db_memcached_bin_init(struct nb_db * db,size_t value_size)54 static int db_memcached_bin_init(struct nb_db *db, size_t value_size)
55 {
56 	db->priv = nb_malloc(sizeof(struct db_memcached_bin));
57 	memset(db->priv, 0, sizeof(struct db_memcached_bin));
58 	struct db_memcached_bin *t = db->priv;
59 	t->value_size = value_size;
60 	t->value = nb_malloc(t->value_size);
61 	memset(t->value, '#', t->value_size - 1);
62 	t->value[t->value_size - 1] = 0;
63 	t->buf_size = 1024 + value_size;
64 	t->buf = nb_malloc(t->buf_size);
65 	return 0;
66 }
67 
db_memcached_bin_free(struct nb_db * db)68 static void db_memcached_bin_free(struct nb_db *db)
69 {
70 	struct db_memcached_bin *t = db->priv;
71 	tb_sesclose(&t->s);
72 	tb_sesfree(&t->s);
73 	if (t->value) free(t->value);
74 	if (t->buf) free(t->buf);
75 	free(db->priv);
76 	db->priv = NULL;
77 }
db_memcached_bin_connect(struct nb_db * db,struct nb_options * opts)78 static int db_memcached_bin_connect(struct nb_db *db, struct nb_options *opts)
79 {
80 	struct db_memcached_bin *t = db->priv;
81 	tb_sesset(&t->s, TB_HOST, opts->host);
82 	tb_sesset(&t->s, TB_PORT, opts->port);
83 	tb_sesset(&t->s, TB_SENDBUF, opts->buf_send);
84 	tb_sesset(&t->s, TB_READBUF, opts->buf_recv);
85 	int rc = tb_sesconnect(&t->s);
86 	if (rc == -1) {
87 		printf("memcached_connect() failed: %d\n", t->s.errno_);
88 		return -1;
89 	}
90 	return 0;
91 }
92 
db_memcached_bin_close(struct nb_db * db)93 static void db_memcached_bin_close(struct nb_db *db)
94 {
95 	struct db_memcached_bin *t = db->priv;
96 	tb_sesclose(&t->s);
97 }
98 
db_memcached_bin_insert(struct nb_db * db,struct nb_key * key)99 static int db_memcached_bin_insert(struct nb_db *db, struct nb_key *key)
100 {
101 	struct db_memcached_bin *t = db->priv;
102 	struct mc req;
103 	mc_init   (&req, t->buf, t->buf_size, NULL, NULL);
104 	mc_op_add (&req, key->data, key->size, t->value, t->value_size, 0, 0);
105 	uint32_t size = mc_used(&req);
106 	assert(size <= t->buf_size);
107 	int rc = tb_sessend(&t->s, t->buf, size);
108 	if (rc == -1)
109 		return -1;
110 	return 0;
111 }
112 
db_memcached_bin_replace(struct nb_db * db,struct nb_key * key)113 static int db_memcached_bin_replace(struct nb_db *db, struct nb_key *key)
114 {
115 	struct db_memcached_bin *t = db->priv;
116 	struct mc req;
117 	mc_init   (&req, t->buf, t->buf_size, NULL, NULL);
118 	mc_op_set (&req, key->data, key->size, t->value, t->value_size, 0, 0);
119 	uint32_t size = mc_used(&req);
120 	assert(size <= t->buf_size);
121 	int rc = tb_sessend(&t->s, t->buf, size);
122 	if (rc == -1)
123 		return -1;
124 	return 0;
125 }
126 
db_memcached_bin_delete(struct nb_db * db,struct nb_key * key)127 static int db_memcached_bin_delete(struct nb_db *db, struct nb_key *key)
128 {
129 	struct db_memcached_bin *t = db->priv;
130 	struct mc req;
131 	mc_init      (&req, t->buf, t->buf_size, NULL, NULL);
132 	mc_op_delete (&req, key->data, key->size);
133 	uint32_t size = mc_used(&req);
134 	assert(size <= t->buf_size);
135 	int rc = tb_sessend(&t->s, t->buf, size);
136 	if (rc == -1)
137 		return -1;
138 	return 0;
139 }
140 
db_memcached_bin_update(struct nb_db * db,struct nb_key * key)141 static int db_memcached_bin_update(struct nb_db *db, struct nb_key *key)
142 {
143 	(void) db;
144 	(void) key;
145 
146 	/* update is meaningless for memcached, because it is
147 	 * equivalent to replace */
148 	printf("memcached update is not supported\n");
149 	return -1;
150 }
151 
db_memcached_bin_select(struct nb_db * db,struct nb_key * key)152 static int db_memcached_bin_select(struct nb_db *db, struct nb_key *key)
153 {
154 	struct db_memcached_bin *t = db->priv;
155 	struct mc req;
156 	mc_init   (&req, t->buf, t->buf_size, NULL, NULL);
157 	mc_op_get (&req, key->data, key->size);
158 	uint32_t size = mc_used(&req);
159 	assert(size <= t->buf_size);
160 	int rc = tb_sessend(&t->s, t->buf, size);
161 	if (rc == -1)
162 		return -1;
163 	return 0;
164 }
165 
db_memcached_bin_recv(struct nb_db * db,int count,int * missed)166 static int db_memcached_bin_recv(struct nb_db *db, int count, int *missed)
167 {
168 	struct db_memcached_bin *t = db->priv;
169 	int rc = tb_sessync(&t->s);
170 	if (rc == -1) {
171 		printf("sync failed\n");
172 		return -1;
173 	}
174 	while (count > 0)
175 	{
176 		struct mc_response resp;
177 		ssize_t len = tb_sesrecv(&t->s, t->buf,
178 					 sizeof(struct mc_hdr), 1);
179 		if (len < (ssize_t)sizeof(struct mc_hdr)) {
180 			printf("recv failed\n");
181 			return 0;
182 		}
183 		const char *p = t->buf;
184 		ssize_t rv = mc_reply(&resp, &p, p + sizeof(struct mc_hdr));
185 		if (rv == -1) {
186 			printf("failed to parse response\n");
187 			return -1;
188 		} if (rv == 0) {
189 			goto get;
190 		}
191 		len = tb_sesrecv(&t->s, t->buf + sizeof(struct mc_hdr),
192 				 rv, 1);
193 		if (len < rv) {
194 			printf("recv failed\n");
195 			return -1;
196 		}
197 		const char *end = p + sizeof(struct mc_hdr) + rv;
198 		int64_t r = mc_reply(&resp, &p, end);
199 		if (r == -1) {
200 			printf("failed to parse response\n");
201 			return -1;
202 		}
203 get:
204 		assert(p == end);
205 		if (resp.hdr.cmd == MC_BIN_CMD_GET && resp.hdr.status)
206 			*missed = *missed + 1;
207 		else if (resp.hdr.cmd != MC_BIN_CMD_GET && resp.hdr.status)
208 			printf("server respond: %d\n", resp.hdr.status);
209 		count--;
210 	}
211 	return 0;
212 }
213 
214 struct nb_db_if nb_db_memcached_bin =
215 {
216 	.name    = "memcached_bin",
217 	.init    = db_memcached_bin_init,
218 	.free    = db_memcached_bin_free,
219 	.connect = db_memcached_bin_connect,
220 	.close   = db_memcached_bin_close,
221 	.insert  = db_memcached_bin_insert,
222 	.replace = db_memcached_bin_replace,
223 	.del     = db_memcached_bin_delete,
224 	.update  = db_memcached_bin_update,
225 	.select  = db_memcached_bin_select,
226 	.recv    = db_memcached_bin_recv
227 };
228