1
2 /*
3 * Redistribution and use in source and binary forms, with or
4 * without modification, are permitted provided that the following
5 * conditions are met:
6 *
7 * 1. Redistributions of source code must retain the above
8 * copyright notice, this list of conditions and the
9 * following disclaimer.
10 *
11 * 2. Redistributions in binary form must reproduce the above
12 * copyright notice, this list of conditions and the following
13 * disclaimer in the documentation and/or other materials
14 * provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
17 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
18 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
19 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
20 * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
21 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
22 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
24 * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
25 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
26 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
27 * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
28 * SUCH DAMAGE.
29 */
30
31 #include <stdlib.h>
32 #include <stdio.h>
33 #include <stdarg.h>
34 #include <stdint.h>
35 #include <string.h>
36 #include <stdbool.h>
37
38 #include <sys/uio.h>
39
40 #include <uri.h>
41
42 #include <tarantool/tnt_proto.h>
43 #include <tarantool/tnt_reply.h>
44 #include <tarantool/tnt_stream.h>
45 #include <tarantool/tnt_object.h>
46 #include <tarantool/tnt_mem.h>
47 #include <tarantool/tnt_schema.h>
48 #include <tarantool/tnt_select.h>
49 #include <tarantool/tnt_iter.h>
50 #include <tarantool/tnt_auth.h>
51
52 #include <tarantool/tnt_net.h>
53 #include <tarantool/tnt_io.h>
54
tnt_net_free(struct tnt_stream * s)55 static void tnt_net_free(struct tnt_stream *s) {
56 struct tnt_stream_net *sn = TNT_SNET_CAST(s);
57 tnt_io_close(sn);
58 tnt_mem_free(sn->greeting);
59 tnt_iob_free(&sn->sbuf);
60 tnt_iob_free(&sn->rbuf);
61 tnt_opt_free(&sn->opt);
62 tnt_schema_free(sn->schema);
63 tnt_mem_free(sn->schema);
64 tnt_mem_free(s->data);
65 s->data = NULL;
66 }
67
/*
 * s->read callback: forwards buf/size to tnt_io_recv() and returns its
 * result. Reading does not touch s->wrcnt.
 */
static ssize_t
tnt_net_read(struct tnt_stream *s, char *buf, size_t size)
{
	return tnt_io_recv(TNT_SNET_CAST(s), buf, size);
}
74
75 static ssize_t
tnt_net_write(struct tnt_stream * s,const char * buf,size_t size)76 tnt_net_write(struct tnt_stream *s, const char *buf, size_t size) {
77 struct tnt_stream_net *sn = TNT_SNET_CAST(s);
78 ssize_t rc = tnt_io_send(sn, buf, size);
79 if (rc != -1)
80 s->wrcnt++;
81 return rc;
82 }
83
84 static ssize_t
tnt_net_writev(struct tnt_stream * s,struct iovec * iov,int count)85 tnt_net_writev(struct tnt_stream *s, struct iovec *iov, int count) {
86 struct tnt_stream_net *sn = TNT_SNET_CAST(s);
87 ssize_t rc = tnt_io_sendv(sn, iov, count);
88 if (rc != -1)
89 s->wrcnt++;
90 return rc;
91 }
92
/*
 * Receive callback handed to tnt_reply_from() by tnt_net_reply()
 * (cast to tnt_reply_t there). Forwards to tnt_io_recv() on the
 * stream's network data and returns its result.
 */
static ssize_t
tnt_net_recv_cb(struct tnt_stream *s, char *buf, ssize_t size)
{
	struct tnt_stream_net *net = TNT_SNET_CAST(s);

	return tnt_io_recv(net, buf, size);
}
98
99 static int
tnt_net_reply(struct tnt_stream * s,struct tnt_reply * r)100 tnt_net_reply(struct tnt_stream *s, struct tnt_reply *r) {
101 if (s->wrcnt == 0)
102 return 1;
103 s->wrcnt--;
104 return tnt_reply_from(r, (tnt_reply_t)tnt_net_recv_cb, s);
105 }
106
tnt_net(struct tnt_stream * s)107 struct tnt_stream *tnt_net(struct tnt_stream *s) {
108 int allocated = s == NULL;
109 s = tnt_stream_init(s);
110 if (s == NULL)
111 return NULL;
112 /* allocating stream data */
113 s->data = tnt_mem_alloc(sizeof(struct tnt_stream_net));
114 if (s->data == NULL) {
115 if (allocated)
116 tnt_stream_free(s);
117 return NULL;
118 }
119 memset(s->data, 0, sizeof(struct tnt_stream_net));
120 /* initializing interfaces */
121 s->read = tnt_net_read;
122 s->read_reply = tnt_net_reply;
123 s->write = tnt_net_write;
124 s->writev = tnt_net_writev;
125 s->free = tnt_net_free;
126 /* initializing internal data */
127 struct tnt_stream_net *sn = TNT_SNET_CAST(s);
128 sn->fd = -1;
129 sn->greeting = tnt_mem_alloc(TNT_GREETING_SIZE);
130 if (sn->greeting == NULL) {
131 s->free(s);
132 if (allocated)
133 tnt_stream_free(s);
134 return NULL;
135 }
136 if (tnt_opt_init(&sn->opt) == -1) {
137 s->free(s);
138 if (allocated)
139 tnt_stream_free(s);
140 }
141 return s;
142 }
143
tnt_set(struct tnt_stream * s,int opt,...)144 int tnt_set(struct tnt_stream *s, int opt, ...) {
145 struct tnt_stream_net *sn = TNT_SNET_CAST(s);
146 va_list args;
147 va_start(args, opt);
148 sn->error = tnt_opt_set(&sn->opt, opt, args);
149 va_end(args);
150 return (sn->error == TNT_EOK) ? 0 : -1;
151 }
152
tnt_init(struct tnt_stream * s)153 int tnt_init(struct tnt_stream *s) {
154 struct tnt_stream_net *sn = TNT_SNET_CAST(s);
155 if ((sn->schema = tnt_schema_new(NULL)) == NULL) {
156 sn->error = TNT_EMEMORY;
157 return -1;
158 }
159 if (tnt_iob_init(&sn->sbuf, sn->opt.send_buf, sn->opt.send_cb,
160 sn->opt.send_cbv, sn->opt.send_cb_arg) == -1) {
161 sn->error = TNT_EMEMORY;
162 return -1;
163 }
164 if (tnt_iob_init(&sn->rbuf, sn->opt.recv_buf, sn->opt.recv_cb, NULL,
165 sn->opt.recv_cb_arg) == -1) {
166 sn->error = TNT_EMEMORY;
167 return -1;
168 }
169 sn->inited = 1;
170 return 0;
171 }
172
tnt_reload_schema(struct tnt_stream * s)173 int tnt_reload_schema(struct tnt_stream *s)
174 {
175 struct tnt_stream_net *sn = TNT_SNET_CAST(s);
176 if (!sn->connected || s->wrcnt != 0)
177 return -1;
178 uint32_t oldsync = tnt_stream_reqid(s, 127);
179 tnt_get_space(s);
180 tnt_get_index(s);
181 tnt_stream_reqid(s, oldsync);
182 tnt_flush(s);
183 struct tnt_iter it; tnt_iter_reply(&it, s);
184 struct tnt_reply bkp; tnt_reply_init(&bkp);
185 int sloaded = 0;
186 while (tnt_next(&it)) {
187 struct tnt_reply *r = TNT_IREPLY_PTR(&it);
188 switch (r->sync) {
189 case(127):
190 if (r->error)
191 goto error;
192 tnt_schema_add_spaces(sn->schema, r);
193 sloaded += 1;
194 break;
195 case(128):
196 if (r->error)
197 goto error;
198 if (!(sloaded & 1)) {
199 memcpy(&bkp, r, sizeof(struct tnt_reply));
200 r->buf = NULL;
201 break;
202 }
203 sloaded += 2;
204 tnt_schema_add_indexes(sn->schema, r);
205 break;
206 default:
207 goto error;
208 }
209 }
210 if (bkp.buf) {
211 tnt_schema_add_indexes(sn->schema, &bkp);
212 sloaded += 2;
213 tnt_reply_free(&bkp);
214 }
215 if (sloaded != 3) goto error;
216
217 tnt_iter_free(&it);
218 return 0;
219 error:
220 tnt_iter_free(&it);
221 return -1;
222 }
223
224 static int
tnt_authenticate(struct tnt_stream * s)225 tnt_authenticate(struct tnt_stream *s)
226 {
227 struct tnt_stream_net *sn = TNT_SNET_CAST(s);
228 if (!sn->connected || s->wrcnt != 0)
229 return -1;
230 struct uri *uri = sn->opt.uri;
231 tnt_auth(s, uri->login, uri->login_len, uri->password,
232 uri->password_len);
233 tnt_flush(s);
234 struct tnt_reply rep; tnt_reply_init(&rep);
235 if (s->read_reply(s, &rep) == -1)
236 return -1;
237 if (rep.error != NULL) {
238 if (TNT_REPLY_ERR(&rep) == TNT_ER_PASSWORD_MISMATCH)
239 sn->error = TNT_ELOGIN;
240 return -1;
241 }
242 tnt_reload_schema(s);
243 return 0;
244 }
245
tnt_connect(struct tnt_stream * s)246 int tnt_connect(struct tnt_stream *s)
247 {
248 struct tnt_stream_net *sn = TNT_SNET_CAST(s);
249 if (!sn->inited) tnt_init(s);
250 if (sn->connected)
251 tnt_close(s);
252 sn->error = tnt_io_connect(sn);
253 if (sn->error != TNT_EOK)
254 return -1;
255 if (s->read(s, sn->greeting, TNT_GREETING_SIZE) == -1 ||
256 sn->error != TNT_EOK)
257 return -1;
258 if (sn->opt.uri->login && sn->opt.uri->password)
259 if (tnt_authenticate(s) == -1)
260 return -1;
261 return 0;
262 }
263
/* Close the stream's connection by handing its network data to tnt_io_close(). */
void tnt_close(struct tnt_stream *s)
{
	tnt_io_close(TNT_SNET_CAST(s));
}
268
/* Flush the stream through tnt_io_flush() and return its result. */
ssize_t tnt_flush(struct tnt_stream *s)
{
	return tnt_io_flush(TNT_SNET_CAST(s));
}
273
tnt_fd(struct tnt_stream * s)274 int tnt_fd(struct tnt_stream *s) {
275 struct tnt_stream_net *sn = TNT_SNET_CAST(s);
276 return sn->fd;
277 }
278
tnt_error(struct tnt_stream * s)279 enum tnt_error tnt_error(struct tnt_stream *s) {
280 struct tnt_stream_net *sn = TNT_SNET_CAST(s);
281 return sn->error;
282 }
283
284 /* must be in sync with enum tnt_error */
285
286 struct tnt_error_desc {
287 enum tnt_error type;
288 char *desc;
289 };
290
291 static struct tnt_error_desc tnt_error_list[] =
292 {
293 { TNT_EOK, "ok" },
294 { TNT_EFAIL, "fail" },
295 { TNT_EMEMORY, "memory allocation failed" },
296 { TNT_ESYSTEM, "system error" },
297 { TNT_EBIG, "buffer is too big" },
298 { TNT_ESIZE, "bad buffer size" },
299 { TNT_ERESOLVE, "gethostbyname(2) failed" },
300 { TNT_ETMOUT, "operation timeout" },
301 { TNT_EBADVAL, "bad argument" },
302 { TNT_ELOGIN, "failed to login" },
303 { TNT_LAST, NULL }
304 };
305
tnt_strerror(struct tnt_stream * s)306 char *tnt_strerror(struct tnt_stream *s)
307 {
308 struct tnt_stream_net *sn = TNT_SNET_CAST(s);
309 if (sn->error == TNT_ESYSTEM) {
310 static char msg[256];
311 snprintf(msg, sizeof(msg), "%s (errno: %d)",
312 strerror(sn->errno_), sn->errno_);
313 return msg;
314 }
315 return tnt_error_list[(int)sn->error].desc;
316 }
317
tnt_errno(struct tnt_stream * s)318 int tnt_errno(struct tnt_stream *s) {
319 struct tnt_stream_net *sn = TNT_SNET_CAST(s);
320 return sn->errno_;
321 }
322
tnt_get_spaceno(struct tnt_stream * s,const char * space,size_t space_len)323 int tnt_get_spaceno(struct tnt_stream *s, const char *space,
324 size_t space_len)
325 {
326 struct tnt_schema *sch = (TNT_SNET_CAST(s))->schema;
327 return tnt_schema_stosid(sch, space, space_len);
328 }
329
tnt_get_indexno(struct tnt_stream * s,int spaceno,const char * index,size_t index_len)330 int tnt_get_indexno(struct tnt_stream *s, int spaceno, const char *index,
331 size_t index_len)
332 {
333 struct tnt_schema *sch = TNT_SNET_CAST(s)->schema;
334 return tnt_schema_stoiid(sch, spaceno, index, index_len);
335 }
336