1 
2 /*
3  * Redistribution and use in source and binary forms, with or
4  * without modification, are permitted provided that the following
5  * conditions are met:
6  *
7  * 1. Redistributions of source code must retain the above
8  *    copyright notice, this list of conditions and the
9  *    following disclaimer.
10  *
11  * 2. Redistributions in binary form must reproduce the above
12  *    copyright notice, this list of conditions and the following
13  *    disclaimer in the documentation and/or other materials
14  *    provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
17  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
18  * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
19  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
20  * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
21  * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
22  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
24  * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
25  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
26  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
27  * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
28  * SUCH DAMAGE.
29  */
30 
31 #include <stdio.h>
32 #include <stdlib.h>
33 #include <stdarg.h>
34 #include <string.h>
35 #include <stdint.h>
36 #include <stdbool.h>
37 
38 #include <sys/uio.h>
39 
40 #include <uri.h>
41 
42 #include <tarantool/tnt_proto.h>
43 #include <tarantool/tnt_reply.h>
44 #include <tarantool/tnt_stream.h>
45 #include <tarantool/tnt_object.h>
46 #include <tarantool/tnt_mem.h>
47 #include <tarantool/tnt_schema.h>
48 #include <tarantool/tnt_select.h>
49 #include <tarantool/tnt_iter.h>
50 #include <tarantool/tnt_auth.h>
51 
52 #include <tarantool/tnt_net.h>
53 #include <tarantool/tnt_io.h>
54 
55 #include "pmatomic.h"
56 
tnt_net_free(struct tnt_stream * s)57 static void tnt_net_free(struct tnt_stream *s) {
58 	struct tnt_stream_net *sn = TNT_SNET_CAST(s);
59 	tnt_io_close(sn);
60 	tnt_mem_free(sn->greeting);
61 	tnt_iob_free(&sn->sbuf);
62 	tnt_iob_free(&sn->rbuf);
63 	tnt_opt_free(&sn->opt);
64 	tnt_schema_free(sn->schema);
65 	tnt_mem_free(sn->schema);
66 	tnt_mem_free(s->data);
67 	s->data = NULL;
68 }
69 
70 static ssize_t
tnt_net_read(struct tnt_stream * s,char * buf,size_t size)71 tnt_net_read(struct tnt_stream *s, char *buf, size_t size) {
72 	struct tnt_stream_net *sn = TNT_SNET_CAST(s);
73 	/* read doesn't touches wrcnt */
74 	return tnt_io_recv(sn, buf, size);
75 }
76 
77 static ssize_t
tnt_net_write(struct tnt_stream * s,const char * buf,size_t size)78 tnt_net_write(struct tnt_stream *s, const char *buf, size_t size) {
79 	struct tnt_stream_net *sn = TNT_SNET_CAST(s);
80 	ssize_t rc = tnt_io_send(sn, buf, size);
81 	if (rc != -1)
82 		pm_atomic_fetch_add(&s->wrcnt, 1);
83 	return rc;
84 }
85 
86 static ssize_t
tnt_net_writev(struct tnt_stream * s,struct iovec * iov,int count)87 tnt_net_writev(struct tnt_stream *s, struct iovec *iov, int count) {
88 	struct tnt_stream_net *sn = TNT_SNET_CAST(s);
89 	ssize_t rc = tnt_io_sendv(sn, iov, count);
90 	if (rc != -1)
91 		pm_atomic_fetch_add(&s->wrcnt, 1);
92 	return rc;
93 }
94 
95 static ssize_t
tnt_net_recv_cb(struct tnt_stream * s,char * buf,ssize_t size)96 tnt_net_recv_cb(struct tnt_stream *s, char *buf, ssize_t size) {
97 	struct tnt_stream_net *sn = TNT_SNET_CAST(s);
98 	return tnt_io_recv(sn, buf, size);
99 }
100 
101 static int
tnt_net_reply(struct tnt_stream * s,struct tnt_reply * r)102 tnt_net_reply(struct tnt_stream *s, struct tnt_reply *r) {
103 	if (pm_atomic_load(&s->wrcnt) == 0)
104 		return 1;
105 	int rv = tnt_reply_from(r, (tnt_reply_t)tnt_net_recv_cb, s);
106 	if (r->error || (r->code & TNT_CHUNK) == 0) {
107 		pm_atomic_fetch_sub(&s->wrcnt, 1);
108 	}
109 	return rv;
110 }
111 
tnt_net(struct tnt_stream * s)112 struct tnt_stream *tnt_net(struct tnt_stream *s) {
113 	s = tnt_stream_init(s);
114 	if (s == NULL)
115 		return NULL;
116 	/* allocating stream data */
117 	s->data = tnt_mem_alloc(sizeof(struct tnt_stream_net));
118 	if (s->data == NULL) {
119 		tnt_stream_free(s);
120 		return NULL;
121 	}
122 	memset(s->data, 0, sizeof(struct tnt_stream_net));
123 	/* initializing interfaces */
124 	s->read = tnt_net_read;
125 	s->read_reply = tnt_net_reply;
126 	s->write = tnt_net_write;
127 	s->writev = tnt_net_writev;
128 	s->free = tnt_net_free;
129 	/* initializing internal data */
130 	struct tnt_stream_net *sn = TNT_SNET_CAST(s);
131 	sn->fd = -1;
132 	sn->greeting = tnt_mem_alloc(TNT_GREETING_SIZE);
133 	if (sn->greeting == NULL) {
134 		tnt_stream_free(s);
135 	}
136 	if (tnt_opt_init(&sn->opt) == -1) {
137 		tnt_stream_free(s);
138 	}
139 	return s;
140 }
141 
tnt_set(struct tnt_stream * s,int opt,...)142 int tnt_set(struct tnt_stream *s, int opt, ...) {
143 	struct tnt_stream_net *sn = TNT_SNET_CAST(s);
144 	va_list args;
145 	va_start(args, opt);
146 	sn->error = tnt_opt_set(&sn->opt, opt, args);
147 	va_end(args);
148 	return (sn->error == TNT_EOK) ? 0 : -1;
149 }
150 
tnt_init(struct tnt_stream * s)151 int tnt_init(struct tnt_stream *s) {
152 	struct tnt_stream_net *sn = TNT_SNET_CAST(s);
153 	if ((sn->schema = tnt_schema_new(NULL)) == NULL) {
154 		sn->error = TNT_EMEMORY;
155 		return -1;
156 	}
157 	if (tnt_iob_init(&sn->sbuf, sn->opt.send_buf, sn->opt.send_cb,
158 		sn->opt.send_cbv, sn->opt.send_cb_arg) == -1) {
159 		sn->error = TNT_EMEMORY;
160 		return -1;
161 	}
162 	if (tnt_iob_init(&sn->rbuf, sn->opt.recv_buf, sn->opt.recv_cb, NULL,
163 		sn->opt.recv_cb_arg) == -1) {
164 		sn->error = TNT_EMEMORY;
165 		return -1;
166 	}
167 	sn->inited = 1;
168 	return 0;
169 }
170 
tnt_reload_schema(struct tnt_stream * s)171 int tnt_reload_schema(struct tnt_stream *s)
172 {
173 	struct tnt_stream_net *sn = TNT_SNET_CAST(s);
174 	if (!sn->connected || pm_atomic_load(&s->wrcnt) != 0)
175 		return -1;
176 	uint64_t oldsync = tnt_stream_reqid(s, 127);
177 	tnt_get_space(s);
178 	tnt_get_index(s);
179 	tnt_stream_reqid(s, oldsync);
180 	tnt_flush(s);
181 	struct tnt_iter it; tnt_iter_reply(&it, s);
182 	struct tnt_reply bkp; tnt_reply_init(&bkp);
183 	int sloaded = 0;
184 	while (tnt_next(&it)) {
185 		struct tnt_reply *r = TNT_IREPLY_PTR(&it);
186 		switch (r->sync) {
187 		case(127):
188 			if (r->error)
189 				goto error;
190 			tnt_schema_add_spaces(sn->schema, r);
191 			sloaded += 1;
192 			break;
193 		case(128):
194 			if (r->error)
195 				goto error;
196 			if (!(sloaded & 1)) {
197 				memcpy(&bkp, r, sizeof(struct tnt_reply));
198 				r->buf = NULL;
199 				break;
200 			}
201 			sloaded += 2;
202 			tnt_schema_add_indexes(sn->schema, r);
203 			break;
204 		default:
205 			goto error;
206 		}
207 	}
208 	if (bkp.buf) {
209 		tnt_schema_add_indexes(sn->schema, &bkp);
210 		sloaded += 2;
211 		tnt_reply_free(&bkp);
212 	}
213 	if (sloaded != 3) goto error;
214 
215 	tnt_iter_free(&it);
216 	return 0;
217 error:
218 	tnt_iter_free(&it);
219 	return -1;
220 }
221 
222 static int
tnt_authenticate(struct tnt_stream * s)223 tnt_authenticate(struct tnt_stream *s)
224 {
225 	struct tnt_stream_net *sn = TNT_SNET_CAST(s);
226 	if (!sn->connected || pm_atomic_load(&s->wrcnt) != 0)
227 		return -1;
228 	struct uri *uri = sn->opt.uri;
229 	tnt_auth(s, uri->login, uri->login_len, uri->password,
230 		 uri->password_len);
231 	tnt_flush(s);
232 	struct tnt_reply rep;
233 	tnt_reply_init(&rep);
234 	if (s->read_reply(s, &rep) == -1)
235 		return -1;
236 	if (rep.error != NULL) {
237 		if (TNT_REPLY_ERR(&rep) == TNT_ER_PASSWORD_MISMATCH)
238 			sn->error = TNT_ELOGIN;
239 		return -1;
240 	}
241 	tnt_reply_free(&rep);
242 	tnt_reload_schema(s);
243 	return 0;
244 }
245 
tnt_connect(struct tnt_stream * s)246 int tnt_connect(struct tnt_stream *s)
247 {
248 	struct tnt_stream_net *sn = TNT_SNET_CAST(s);
249 	if (!sn->inited) tnt_init(s);
250 	if (sn->connected)
251 		tnt_close(s);
252 	sn->error = tnt_io_connect(sn);
253 	if (sn->error != TNT_EOK)
254 		return -1;
255 	if (s->read(s, sn->greeting, TNT_GREETING_SIZE) == -1 ||
256 	    sn->error != TNT_EOK)
257 		return -1;
258 	if (sn->opt.uri->login && sn->opt.uri->password)
259 		if (tnt_authenticate(s) == -1)
260 			return -1;
261 	return 0;
262 }
263 
tnt_close(struct tnt_stream * s)264 void tnt_close(struct tnt_stream *s) {
265 	struct tnt_stream_net *sn = TNT_SNET_CAST(s);
266 	tnt_iob_clear(&sn->sbuf);
267 	tnt_iob_clear(&sn->rbuf);
268 	tnt_io_close(sn);
269 	s->wrcnt = 0;
270 	s->reqid = 0;
271 }
272 
tnt_flush(struct tnt_stream * s)273 ssize_t tnt_flush(struct tnt_stream *s) {
274 	struct tnt_stream_net *sn = TNT_SNET_CAST(s);
275 	return tnt_io_flush(sn);
276 }
277 
tnt_fd(struct tnt_stream * s)278 int tnt_fd(struct tnt_stream *s) {
279 	struct tnt_stream_net *sn = TNT_SNET_CAST(s);
280 	return sn->fd;
281 }
282 
tnt_error(struct tnt_stream * s)283 enum tnt_error tnt_error(struct tnt_stream *s) {
284 	struct tnt_stream_net *sn = TNT_SNET_CAST(s);
285 	return sn->error;
286 }
287 
288 /* must be in sync with enum tnt_error */
289 
290 struct tnt_error_desc {
291 	enum tnt_error type;
292 	char *desc;
293 };
294 
295 static struct tnt_error_desc tnt_error_list[] =
296 {
297 	{ TNT_EOK,      "ok"                       },
298 	{ TNT_EFAIL,    "fail"                     },
299 	{ TNT_EMEMORY,  "memory allocation failed" },
300 	{ TNT_ESYSTEM,  "system error"             },
301 	{ TNT_EBIG,     "buffer is too big"        },
302 	{ TNT_ESIZE,    "bad buffer size"          },
303 	{ TNT_ERESOLVE, "gethostbyname(2) failed"  },
304 	{ TNT_ETMOUT,   "operation timeout"        },
305 	{ TNT_EBADVAL,  "bad argument"             },
306 	{ TNT_ELOGIN,   "failed to login"          },
307 	{ TNT_LAST,      NULL                      }
308 };
309 
tnt_strerror(struct tnt_stream * s)310 char *tnt_strerror(struct tnt_stream *s)
311 {
312 	struct tnt_stream_net *sn = TNT_SNET_CAST(s);
313 	if (sn->error == TNT_ESYSTEM) {
314 		static char msg[256];
315 		snprintf(msg, sizeof(msg), "%s (errno: %d)",
316 			 strerror(sn->errno_), sn->errno_);
317 		return msg;
318 	}
319 	return tnt_error_list[(int)sn->error].desc;
320 }
321 
tnt_errno(struct tnt_stream * s)322 int tnt_errno(struct tnt_stream *s) {
323 	struct tnt_stream_net *sn = TNT_SNET_CAST(s);
324 	return sn->errno_;
325 }
326 
tnt_get_spaceno(struct tnt_stream * s,const char * space,size_t space_len)327 int tnt_get_spaceno(struct tnt_stream *s, const char *space,
328 		    size_t space_len)
329 {
330 	struct tnt_schema *sch = (TNT_SNET_CAST(s))->schema;
331 	return tnt_schema_stosid(sch, space, space_len);
332 }
333 
tnt_get_indexno(struct tnt_stream * s,int spaceno,const char * index,size_t index_len)334 int tnt_get_indexno(struct tnt_stream *s, int spaceno, const char *index,
335 		    size_t index_len)
336 {
337 	struct tnt_schema *sch = TNT_SNET_CAST(s)->schema;
338 	return tnt_schema_stoiid(sch, spaceno, index, index_len);
339 }
340