1
2 /*
3 * Redistribution and use in source and binary forms, with or
4 * without modification, are permitted provided that the following
5 * conditions are met:
6 *
7 * 1. Redistributions of source code must retain the above
8 * copyright notice, this list of conditions and the
9 * following disclaimer.
10 *
11 * 2. Redistributions in binary form must reproduce the above
12 * copyright notice, this list of conditions and the following
13 * disclaimer in the documentation and/or other materials
14 * provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
17 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
18 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
19 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
20 * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
21 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
22 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
24 * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
25 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
26 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
27 * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
28 * SUCH DAMAGE.
29 */
30
31 // #include "tarantool.h"
32
33 #include <sys/time.h>
34 #include <sys/types.h>
35 #include <sys/socket.h>
36 #include <sys/uio.h>
37 #include <netinet/in.h>
38 #include <netinet/tcp.h>
39 #include <netdb.h>
40 #include <unistd.h>
41 #include <fcntl.h>
42 #include <limits.h>
43
44 #include <string.h>
45 #include <stdlib.h>
46 #include <sys/types.h>
47 #include <errno.h>
48 #include <stdarg.h>
49
50 #include <memcached/session.h>
51
52 static int
tb_bufinit(struct tbbuf * b,int size)53 tb_bufinit(struct tbbuf *b, int size)
54 {
55 b->off = 0;
56 b->top = 0;
57 b->size = size;
58 b->buf = NULL;
59 if (size == 0)
60 return 0;
61 b->buf = malloc(size);
62 if (b->buf == NULL)
63 return -1;
64 return 0;
65 }
66
67 static void
tb_buffree(struct tbbuf * b)68 tb_buffree(struct tbbuf *b)
69 {
70 if (b->buf) {
71 free(b->buf);
72 b->buf = NULL;
73 }
74 }
75
tb_sesinit(struct tbses * s)76 int tb_sesinit(struct tbses *s)
77 {
78 s->host = strdup("127.0.0.1");
79 if (s->host == NULL) {
80 s->errno_ = ENOMEM;
81 return -1;
82 }
83 s->connected = 0;
84 s->port = 33013;
85 s->rbuf = 16384;
86 s->sbuf = 16384;
87 s->fd = -1;
88 s->tmc.tv_sec = 16;
89 s->tmc.tv_usec = 0;
90 s->errno_ = 0;
91 memset(&s->s, 0, sizeof(s->s));
92 memset(&s->r, 0, sizeof(s->r));
93 return 0;
94 }
95
tb_sesfree(struct tbses * s)96 int tb_sesfree(struct tbses *s)
97 {
98 tb_sesclose(s);
99 if (s->host) {
100 free(s->host);
101 s->host = NULL;
102 }
103 tb_buffree(&s->s);
104 tb_buffree(&s->r);
105 return 0;
106 }
107
tb_sesset(struct tbses * s,enum tbsesopt o,...)108 int tb_sesset(struct tbses *s, enum tbsesopt o, ...)
109 {
110 va_list args;
111 va_start(args, o);
112 switch (o) {
113 case TB_HOST: {
114 char *p = strdup(va_arg(args, char*));
115 if (p == NULL) {
116 va_end(args);
117 s->errno_ = ENOMEM;
118 return -1;
119 }
120 free(s->host);
121 s->host = p;
122 break;
123 }
124 case TB_PORT:
125 s->port = va_arg(args, int);
126 break;
127 case TB_CONNECTTM:
128 s->tmc.tv_sec = va_arg(args, int);
129 s->tmc.tv_usec = 0;
130 break;
131 case TB_SENDBUF:
132 s->sbuf = va_arg(args, int);
133 break;
134 case TB_READBUF:
135 s->rbuf = va_arg(args, int);
136 break;
137 default:
138 va_end(args);
139 s->errno_ = EINVAL;
140 return -1;
141 }
142 va_end(args);
143 return 0;
144 }
145
146 static int
tb_sessetbufmax(struct tbses * s,int opt,int min)147 tb_sessetbufmax(struct tbses *s, int opt, int min)
148 {
149 int max = 128 * 1024 * 1024;
150 if (min == 0)
151 min = 16384;
152 unsigned int avg = 0;
153 while (min <= max) {
154 avg = ((unsigned int)(min + max)) / 2;
155 if (setsockopt(s->fd, SOL_SOCKET, opt, &avg, sizeof(avg)) == 0)
156 min = avg + 1;
157 else
158 max = avg - 1;
159 }
160 return 0;
161 }
162
163 static int
tb_sessetopts(struct tbses * s)164 tb_sessetopts(struct tbses *s)
165 {
166 int opt = 1;
167 if (setsockopt(s->fd, IPPROTO_TCP, TCP_NODELAY, &opt, sizeof(opt)) == -1) {
168 s->errno_ = errno;
169 return -1;
170 }
171 tb_sessetbufmax(s, SO_SNDBUF, s->sbuf);
172 tb_sessetbufmax(s, SO_RCVBUF, s->rbuf);
173 return 0;
174 }
175
/*
 * Fill @addr with the IPv4 address of s->host and the port s->port.
 *
 * The session socket is AF_INET, so resolution is restricted to AF_INET
 * results.  Without that hint getaddrinfo() may return an AF_INET6 entry
 * first, and reinterpreting a sockaddr_in6 as a sockaddr_in copies the
 * wrong bytes (sin6_flowinfo) as the address.
 *
 * Returns 0 on success, or -1 with s->errno_ set on failure.
 */
static int
tb_sesresolve(struct tbses *s, struct sockaddr_in *addr)
{
	memset(addr, 0, sizeof(*addr));
	addr->sin_family = AF_INET;
	addr->sin_port = htons(s->port);

	struct addrinfo hints = {
		.ai_family = AF_INET,
		.ai_socktype = SOCK_STREAM,
	};
	struct addrinfo *res = NULL;
	int rc = getaddrinfo(s->host, NULL, &hints, &res);
	if (rc != 0) {
		/*
		 * getaddrinfo() reports failure through EAI_* return codes;
		 * errno is meaningful only for EAI_SYSTEM.  Other codes are
		 * mapped to errno values so s->errno_ stays an errno.
		 */
		if (rc == EAI_SYSTEM)
			s->errno_ = errno;
		else if (rc == EAI_MEMORY)
			s->errno_ = ENOMEM;
		else
			s->errno_ = EHOSTUNREACH; /* generic "cannot resolve" */
		return -1;
	}

	const struct sockaddr_in *in = (const struct sockaddr_in *)res->ai_addr;
	memcpy(&addr->sin_addr, &in->sin_addr, sizeof(addr->sin_addr));
	freeaddrinfo(res);
	return 0;
}
195
196 static int
tb_sesnonblock(struct tbses * s,int set)197 tb_sesnonblock(struct tbses *s, int set)
198 {
199 int flags = fcntl(s->fd, F_GETFL);
200 if (flags == -1) {
201 s->errno_ = errno;
202 return -1;
203 }
204 if (set)
205 flags |= O_NONBLOCK;
206 else
207 flags &= ~O_NONBLOCK;
208 int rc = fcntl(s->fd, F_SETFL, flags);
209 if (rc == -1)
210 s->errno_ = errno;
211 return rc;
212 }
213
214 static int
tb_sesconnectdo(struct tbses * s)215 tb_sesconnectdo(struct tbses *s)
216 {
217 /* resolve address */
218 struct sockaddr_in addr;
219 int rc = tb_sesresolve(s, &addr);
220 if (rc == -1)
221 return -1;
222 /* set nonblock */
223 rc = tb_sesnonblock(s, 1);
224 if (rc == -1)
225 return -1;
226
227 if (connect(s->fd, (struct sockaddr*)&addr, sizeof(addr)) == -1)
228 {
229 if (errno == EINPROGRESS) {
230 /* wait for connection while handling signal events */
231 const int64_t micro = 1000000;
232 int64_t tmout_usec = s->tmc.tv_sec * micro;
233 /* get start connect time */
234 struct timeval start_connect;
235 if (gettimeofday(&start_connect, NULL) == -1) {
236 s->errno_ = errno;
237 return -1;
238 }
239 /* set initial timer */
240 struct timeval tmout;
241 memcpy(&tmout, &s->tmc, sizeof(tmout));
242 while (1) {
243 fd_set fds;
244 FD_ZERO(&fds);
245 FD_SET(s->fd, &fds);
246 int ret = select(s->fd + 1, NULL, &fds, NULL, &tmout);
247 if (ret == -1) {
248 if (errno == EINTR || errno == EAGAIN) {
249 /* get current time */
250 struct timeval curr;
251 if (gettimeofday(&curr, NULL) == -1) {
252 s->errno_ = errno;
253 return -1;
254 }
255 /* calculate timeout last time */
256 int64_t passd_usec = (curr.tv_sec - start_connect.tv_sec) * micro +
257 (curr.tv_usec - start_connect.tv_usec);
258 int64_t curr_tmeout = passd_usec - tmout_usec;
259 if (curr_tmeout <= 0) {
260 s->errno_ = ETIMEDOUT;
261 return -1;
262 }
263 tmout.tv_sec = curr_tmeout / micro;
264 tmout.tv_usec = curr_tmeout % micro;
265 } else {
266 s->errno_ = errno;
267 return -1;
268 }
269 } else if (ret == 0) {
270 s->errno_ = ETIMEDOUT;
271 return -1;
272 } else {
273 /* we have a event on socket */
274 break;
275 }
276 }
277 /* checking error status */
278 int opt = 0;
279 socklen_t len = sizeof(opt);
280 if ((getsockopt(s->fd, SOL_SOCKET, SO_ERROR, &opt, &len) == -1) || opt) {
281 s->errno_ = (opt) ? opt: errno;
282 return -1;
283 }
284 } else {
285 s->errno_ = errno;
286 return -1;
287 }
288 }
289
290 /* set block */
291 return tb_sesnonblock(s, 0);
292 }
293
tb_sesconnect(struct tbses * s)294 int tb_sesconnect(struct tbses *s)
295 {
296 int rc;
297 if (s->s.buf == NULL) {
298 rc = tb_bufinit(&s->s, s->sbuf);
299 if (rc == -1) {
300 s->errno_ = ENOMEM;
301 return -1;
302 }
303 rc = tb_bufinit(&s->r, s->rbuf);
304 if (rc == -1) {
305 tb_buffree(&s->s);
306 s->errno_ = ENOMEM;
307 return -1;
308 }
309 }
310 s->fd = socket(AF_INET, SOCK_STREAM, 0);
311 if (s->fd < 0) {
312 s->errno_ = errno;
313 return -1;
314 }
315 rc = tb_sessetopts(s);
316 if (rc == -1)
317 return -1;
318 rc = tb_sesconnectdo(s);
319 if (rc == -1)
320 return -1;
321 s->connected = 1;
322 return 0;
323 }
324
tb_sesclose(struct tbses * s)325 int tb_sesclose(struct tbses *s)
326 {
327 int rc = 0;
328 if (s->fd != -1) {
329 rc = close(s->fd);
330 if (rc == -1)
331 s->errno_ = errno;
332 s->fd = -1;
333 }
334 s->connected = 0;
335 return rc;
336 }
337
338 static ssize_t
tb_sessendraw(struct tbses * s,char * buf,size_t size)339 tb_sessendraw(struct tbses *s, char *buf, size_t size)
340 {
341 size_t off = 0;
342 do {
343 ssize_t r;
344 do {
345 r = send(s->fd, buf + off, size - off, 0);
346 } while (r == -1 && (errno == EINTR));
347 if (r <= 0) {
348 s->errno_ = errno;
349 return -1;
350 }
351 off += r;
352 } while (off != size);
353
354 return off;
355 }
356
357 static ssize_t
tb_sessenddo(struct tbses * s,char * buf,size_t size)358 tb_sessenddo(struct tbses *s, char *buf, size_t size)
359 {
360 if (s->s.buf == NULL)
361 return tb_sessendraw(s, buf, size);
362
363 if (size > s->s.size) {
364 s->errno_ = E2BIG;
365 return -1;
366 }
367 if ((s->s.off + size) <= s->s.size) {
368 memcpy(s->s.buf + s->s.off, buf, size);
369 s->s.off += size;
370 return size;
371 }
372 ssize_t r = tb_sessendraw(s, s->s.buf, s->s.off);
373 if (r == -1)
374 return -1;
375
376 s->s.off = size;
377 memcpy(s->s.buf, buf, size);
378 return size;
379 }
380
381 static ssize_t
tb_sesrecvraw(struct tbses * s,char * buf,size_t size,int strict)382 tb_sesrecvraw(struct tbses *s, char *buf, size_t size, int strict)
383 {
384 size_t off = 0;
385 do {
386 ssize_t r;
387 do {
388 r = recv(s->fd, buf + off, size - off, 0);
389 } while (r == -1 && (errno == EINTR));
390 if (r <= 0) {
391 s->errno_ = errno;
392 return -1;
393 }
394 off += r;
395 } while (off != size && strict);
396
397 return off;
398 }
399
400 static ssize_t
tb_sesrecvdo(struct tbses * s,char * buf,size_t size)401 tb_sesrecvdo(struct tbses *s, char *buf, size_t size)
402 {
403 if (s->r.buf == NULL)
404 return tb_sesrecvraw(s, buf, size, 1);
405
406 size_t lv, rv, off = 0, left = size;
407 while (1) {
408 if ((s->r.off + left) <= s->r.top) {
409 memcpy(buf + off, s->r.buf + s->r.off, left);
410 s->r.off += left;
411 return size;
412 }
413
414 lv = s->r.top - s->r.off;
415 rv = left - lv;
416 if (lv) {
417 memcpy(buf + off, s->r.buf + s->r.off, lv);
418 off += lv;
419 }
420
421 s->r.off = 0;
422 ssize_t top = tb_sesrecvraw(s, s->r.buf, s->r.size, 0);
423 if (top <= 0) {
424 s->errno_ = errno;
425 return -1;
426 }
427
428 s->r.top = top;
429 if (rv <= s->r.top) {
430 memcpy(buf + off, s->r.buf, rv);
431 s->r.off = rv;
432 return size;
433 }
434 left -= lv;
435 }
436 return -1;
437 }
438
tb_sessync(struct tbses * s)439 int tb_sessync(struct tbses *s)
440 {
441 if (s->s.off == 0)
442 return 0;
443 ssize_t rc = tb_sessendraw(s, s->s.buf, s->s.off);
444 if (rc == -1)
445 return -1;
446 s->s.off = 0;
447 return rc;
448 }
449
/*
 * Public send entry point: hands @size bytes of @buf to tb_sessenddo()
 * (buffered or direct send) and returns its result unchanged.
 */
ssize_t
tb_sessend(struct tbses *s, char *buf, size_t size)
{
	ssize_t rc = tb_sessenddo(s, buf, size);
	return rc;
}
455
456 ssize_t
tb_sesrecv(struct tbses * s,char * buf,size_t size,int strict)457 tb_sesrecv(struct tbses *s, char *buf, size_t size, int strict)
458 {
459 if (! strict) {
460 if (s->r.buf) {
461 size_t v = s->r.top - s->r.off;
462 if (v > 0) {
463 if (size < v)
464 v = size;
465 memcpy(buf, s->r.buf + s->r.off, v);
466 s->r.off += v;
467 return v;
468 }
469 }
470 /* todo: make unstricted read with readahead
471 * buffer */
472 return tb_sesrecvraw(s, buf, size, 0);
473 }
474 return tb_sesrecvdo(s, buf, size);
475 }
476