1
2 /*
3 * Redistribution and use in source and binary forms, with or
4 * without modification, are permitted provided that the following
5 * conditions are met:
6 *
7 * 1. Redistributions of source code must retain the above
8 * copyright notice, this list of conditions and the
9 * following disclaimer.
10 *
11 * 2. Redistributions in binary form must reproduce the above
12 * copyright notice, this list of conditions and the following
13 * disclaimer in the documentation and/or other materials
14 * provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
17 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
18 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
19 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
20 * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
21 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
22 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
24 * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
25 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
26 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
27 * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
28 * SUCH DAMAGE.
29 */
30
31 #include <stdio.h>
32 #include <stdlib.h>
33 #include <stdarg.h>
34 #include <string.h>
35 #include <stdint.h>
36 #include <stdbool.h>
37
38 #include <sys/uio.h>
39
40 #include <uri.h>
41
42 #include <tarantool/tnt_proto.h>
43 #include <tarantool/tnt_reply.h>
44 #include <tarantool/tnt_stream.h>
45 #include <tarantool/tnt_object.h>
46 #include <tarantool/tnt_mem.h>
47 #include <tarantool/tnt_schema.h>
48 #include <tarantool/tnt_select.h>
49 #include <tarantool/tnt_iter.h>
50 #include <tarantool/tnt_auth.h>
51
52 #include <tarantool/tnt_net.h>
53 #include <tarantool/tnt_io.h>
54
55 #include "pmatomic.h"
56
tnt_net_free(struct tnt_stream * s)57 static void tnt_net_free(struct tnt_stream *s) {
58 struct tnt_stream_net *sn = TNT_SNET_CAST(s);
59 tnt_io_close(sn);
60 tnt_mem_free(sn->greeting);
61 tnt_iob_free(&sn->sbuf);
62 tnt_iob_free(&sn->rbuf);
63 tnt_opt_free(&sn->opt);
64 tnt_schema_free(sn->schema);
65 tnt_mem_free(sn->schema);
66 tnt_mem_free(s->data);
67 s->data = NULL;
68 }
69
70 static ssize_t
tnt_net_read(struct tnt_stream * s,char * buf,size_t size)71 tnt_net_read(struct tnt_stream *s, char *buf, size_t size) {
72 struct tnt_stream_net *sn = TNT_SNET_CAST(s);
73 /* read doesn't touches wrcnt */
74 return tnt_io_recv(sn, buf, size);
75 }
76
77 static ssize_t
tnt_net_write(struct tnt_stream * s,const char * buf,size_t size)78 tnt_net_write(struct tnt_stream *s, const char *buf, size_t size) {
79 struct tnt_stream_net *sn = TNT_SNET_CAST(s);
80 ssize_t rc = tnt_io_send(sn, buf, size);
81 if (rc != -1)
82 pm_atomic_fetch_add(&s->wrcnt, 1);
83 return rc;
84 }
85
86 static ssize_t
tnt_net_writev(struct tnt_stream * s,struct iovec * iov,int count)87 tnt_net_writev(struct tnt_stream *s, struct iovec *iov, int count) {
88 struct tnt_stream_net *sn = TNT_SNET_CAST(s);
89 ssize_t rc = tnt_io_sendv(sn, iov, count);
90 if (rc != -1)
91 pm_atomic_fetch_add(&s->wrcnt, 1);
92 return rc;
93 }
94
95 static ssize_t
tnt_net_recv_cb(struct tnt_stream * s,char * buf,ssize_t size)96 tnt_net_recv_cb(struct tnt_stream *s, char *buf, ssize_t size) {
97 struct tnt_stream_net *sn = TNT_SNET_CAST(s);
98 return tnt_io_recv(sn, buf, size);
99 }
100
101 static int
tnt_net_reply(struct tnt_stream * s,struct tnt_reply * r)102 tnt_net_reply(struct tnt_stream *s, struct tnt_reply *r) {
103 if (pm_atomic_load(&s->wrcnt) == 0)
104 return 1;
105 int rv = tnt_reply_from(r, (tnt_reply_t)tnt_net_recv_cb, s);
106 if (r->error || (r->code & TNT_CHUNK) == 0) {
107 pm_atomic_fetch_sub(&s->wrcnt, 1);
108 }
109 return rv;
110 }
111
tnt_net(struct tnt_stream * s)112 struct tnt_stream *tnt_net(struct tnt_stream *s) {
113 s = tnt_stream_init(s);
114 if (s == NULL)
115 return NULL;
116 /* allocating stream data */
117 s->data = tnt_mem_alloc(sizeof(struct tnt_stream_net));
118 if (s->data == NULL) {
119 tnt_stream_free(s);
120 return NULL;
121 }
122 memset(s->data, 0, sizeof(struct tnt_stream_net));
123 /* initializing interfaces */
124 s->read = tnt_net_read;
125 s->read_reply = tnt_net_reply;
126 s->write = tnt_net_write;
127 s->writev = tnt_net_writev;
128 s->free = tnt_net_free;
129 /* initializing internal data */
130 struct tnt_stream_net *sn = TNT_SNET_CAST(s);
131 sn->fd = -1;
132 sn->greeting = tnt_mem_alloc(TNT_GREETING_SIZE);
133 if (sn->greeting == NULL) {
134 tnt_stream_free(s);
135 }
136 if (tnt_opt_init(&sn->opt) == -1) {
137 tnt_stream_free(s);
138 }
139 return s;
140 }
141
tnt_set(struct tnt_stream * s,int opt,...)142 int tnt_set(struct tnt_stream *s, int opt, ...) {
143 struct tnt_stream_net *sn = TNT_SNET_CAST(s);
144 va_list args;
145 va_start(args, opt);
146 sn->error = tnt_opt_set(&sn->opt, opt, args);
147 va_end(args);
148 return (sn->error == TNT_EOK) ? 0 : -1;
149 }
150
tnt_init(struct tnt_stream * s)151 int tnt_init(struct tnt_stream *s) {
152 struct tnt_stream_net *sn = TNT_SNET_CAST(s);
153 if ((sn->schema = tnt_schema_new(NULL)) == NULL) {
154 sn->error = TNT_EMEMORY;
155 return -1;
156 }
157 if (tnt_iob_init(&sn->sbuf, sn->opt.send_buf, sn->opt.send_cb,
158 sn->opt.send_cbv, sn->opt.send_cb_arg) == -1) {
159 sn->error = TNT_EMEMORY;
160 return -1;
161 }
162 if (tnt_iob_init(&sn->rbuf, sn->opt.recv_buf, sn->opt.recv_cb, NULL,
163 sn->opt.recv_cb_arg) == -1) {
164 sn->error = TNT_EMEMORY;
165 return -1;
166 }
167 sn->inited = 1;
168 return 0;
169 }
170
tnt_reload_schema(struct tnt_stream * s)171 int tnt_reload_schema(struct tnt_stream *s)
172 {
173 struct tnt_stream_net *sn = TNT_SNET_CAST(s);
174 if (!sn->connected || pm_atomic_load(&s->wrcnt) != 0)
175 return -1;
176 uint64_t oldsync = tnt_stream_reqid(s, 127);
177 tnt_get_space(s);
178 tnt_get_index(s);
179 tnt_stream_reqid(s, oldsync);
180 tnt_flush(s);
181 struct tnt_iter it; tnt_iter_reply(&it, s);
182 struct tnt_reply bkp; tnt_reply_init(&bkp);
183 int sloaded = 0;
184 while (tnt_next(&it)) {
185 struct tnt_reply *r = TNT_IREPLY_PTR(&it);
186 switch (r->sync) {
187 case(127):
188 if (r->error)
189 goto error;
190 tnt_schema_add_spaces(sn->schema, r);
191 sloaded += 1;
192 break;
193 case(128):
194 if (r->error)
195 goto error;
196 if (!(sloaded & 1)) {
197 memcpy(&bkp, r, sizeof(struct tnt_reply));
198 r->buf = NULL;
199 break;
200 }
201 sloaded += 2;
202 tnt_schema_add_indexes(sn->schema, r);
203 break;
204 default:
205 goto error;
206 }
207 }
208 if (bkp.buf) {
209 tnt_schema_add_indexes(sn->schema, &bkp);
210 sloaded += 2;
211 tnt_reply_free(&bkp);
212 }
213 if (sloaded != 3) goto error;
214
215 tnt_iter_free(&it);
216 return 0;
217 error:
218 tnt_iter_free(&it);
219 return -1;
220 }
221
222 static int
tnt_authenticate(struct tnt_stream * s)223 tnt_authenticate(struct tnt_stream *s)
224 {
225 struct tnt_stream_net *sn = TNT_SNET_CAST(s);
226 if (!sn->connected || pm_atomic_load(&s->wrcnt) != 0)
227 return -1;
228 struct uri *uri = sn->opt.uri;
229 tnt_auth(s, uri->login, uri->login_len, uri->password,
230 uri->password_len);
231 tnt_flush(s);
232 struct tnt_reply rep;
233 tnt_reply_init(&rep);
234 if (s->read_reply(s, &rep) == -1)
235 return -1;
236 if (rep.error != NULL) {
237 if (TNT_REPLY_ERR(&rep) == TNT_ER_PASSWORD_MISMATCH)
238 sn->error = TNT_ELOGIN;
239 return -1;
240 }
241 tnt_reply_free(&rep);
242 tnt_reload_schema(s);
243 return 0;
244 }
245
tnt_connect(struct tnt_stream * s)246 int tnt_connect(struct tnt_stream *s)
247 {
248 struct tnt_stream_net *sn = TNT_SNET_CAST(s);
249 if (!sn->inited) tnt_init(s);
250 if (sn->connected)
251 tnt_close(s);
252 sn->error = tnt_io_connect(sn);
253 if (sn->error != TNT_EOK)
254 return -1;
255 if (s->read(s, sn->greeting, TNT_GREETING_SIZE) == -1 ||
256 sn->error != TNT_EOK)
257 return -1;
258 if (sn->opt.uri->login && sn->opt.uri->password)
259 if (tnt_authenticate(s) == -1)
260 return -1;
261 return 0;
262 }
263
tnt_close(struct tnt_stream * s)264 void tnt_close(struct tnt_stream *s) {
265 struct tnt_stream_net *sn = TNT_SNET_CAST(s);
266 tnt_iob_clear(&sn->sbuf);
267 tnt_iob_clear(&sn->rbuf);
268 tnt_io_close(sn);
269 s->wrcnt = 0;
270 s->reqid = 0;
271 }
272
tnt_flush(struct tnt_stream * s)273 ssize_t tnt_flush(struct tnt_stream *s) {
274 struct tnt_stream_net *sn = TNT_SNET_CAST(s);
275 return tnt_io_flush(sn);
276 }
277
tnt_fd(struct tnt_stream * s)278 int tnt_fd(struct tnt_stream *s) {
279 struct tnt_stream_net *sn = TNT_SNET_CAST(s);
280 return sn->fd;
281 }
282
tnt_error(struct tnt_stream * s)283 enum tnt_error tnt_error(struct tnt_stream *s) {
284 struct tnt_stream_net *sn = TNT_SNET_CAST(s);
285 return sn->error;
286 }
287
288 /* must be in sync with enum tnt_error */
289
290 struct tnt_error_desc {
291 enum tnt_error type;
292 char *desc;
293 };
294
295 static struct tnt_error_desc tnt_error_list[] =
296 {
297 { TNT_EOK, "ok" },
298 { TNT_EFAIL, "fail" },
299 { TNT_EMEMORY, "memory allocation failed" },
300 { TNT_ESYSTEM, "system error" },
301 { TNT_EBIG, "buffer is too big" },
302 { TNT_ESIZE, "bad buffer size" },
303 { TNT_ERESOLVE, "gethostbyname(2) failed" },
304 { TNT_ETMOUT, "operation timeout" },
305 { TNT_EBADVAL, "bad argument" },
306 { TNT_ELOGIN, "failed to login" },
307 { TNT_LAST, NULL }
308 };
309
tnt_strerror(struct tnt_stream * s)310 char *tnt_strerror(struct tnt_stream *s)
311 {
312 struct tnt_stream_net *sn = TNT_SNET_CAST(s);
313 if (sn->error == TNT_ESYSTEM) {
314 static char msg[256];
315 snprintf(msg, sizeof(msg), "%s (errno: %d)",
316 strerror(sn->errno_), sn->errno_);
317 return msg;
318 }
319 return tnt_error_list[(int)sn->error].desc;
320 }
321
tnt_errno(struct tnt_stream * s)322 int tnt_errno(struct tnt_stream *s) {
323 struct tnt_stream_net *sn = TNT_SNET_CAST(s);
324 return sn->errno_;
325 }
326
tnt_get_spaceno(struct tnt_stream * s,const char * space,size_t space_len)327 int tnt_get_spaceno(struct tnt_stream *s, const char *space,
328 size_t space_len)
329 {
330 struct tnt_schema *sch = (TNT_SNET_CAST(s))->schema;
331 return tnt_schema_stosid(sch, space, space_len);
332 }
333
tnt_get_indexno(struct tnt_stream * s,int spaceno,const char * index,size_t index_len)334 int tnt_get_indexno(struct tnt_stream *s, int spaceno, const char *index,
335 size_t index_len)
336 {
337 struct tnt_schema *sch = TNT_SNET_CAST(s)->schema;
338 return tnt_schema_stoiid(sch, spaceno, index, index_len);
339 }
340