1 
2 /*
3  * Redistribution and use in source and binary forms, with or
4  * without modification, are permitted provided that the following
5  * conditions are met:
6  *
7  * 1. Redistributions of source code must retain the above
8  *    copyright notice, this list of conditions and the
9  *    following disclaimer.
10  *
11  * 2. Redistributions in binary form must reproduce the above
12  *    copyright notice, this list of conditions and the following
13  *    disclaimer in the documentation and/or other materials
14  *    provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
17  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
18  * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
19  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
20  * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
21  * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
22  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
24  * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
25  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
26  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
27  * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
28  * SUCH DAMAGE.
29 */
30 
31 // #include "tarantool.h"
32 
33 #include <sys/time.h>
34 #include <sys/types.h>
35 #include <sys/socket.h>
36 #include <sys/uio.h>
37 #include <netinet/in.h>
38 #include <netinet/tcp.h>
39 #include <netdb.h>
40 #include <unistd.h>
41 #include <fcntl.h>
42 #include <limits.h>
43 
44 #include <string.h>
45 #include <stdlib.h>
46 #include <sys/types.h>
47 #include <errno.h>
48 #include <stdarg.h>
49 
50 #include <memcached/session.h>
51 
52 static int
tb_bufinit(struct tbbuf * b,int size)53 tb_bufinit(struct tbbuf *b, int size)
54 {
55 	b->off = 0;
56 	b->top = 0;
57 	b->size = size;
58 	b->buf = NULL;
59 	if (size == 0)
60 		return 0;
61 	b->buf = malloc(size);
62 	if (b->buf == NULL)
63 		return -1;
64 	return 0;
65 }
66 
67 static void
tb_buffree(struct tbbuf * b)68 tb_buffree(struct tbbuf *b)
69 {
70 	if (b->buf) {
71 		free(b->buf);
72 		b->buf = NULL;
73 	}
74 }
75 
tb_sesinit(struct tbses * s)76 int tb_sesinit(struct tbses *s)
77 {
78 	s->host = strdup("127.0.0.1");
79 	if (s->host == NULL) {
80 		s->errno_ = ENOMEM;
81 		return -1;
82 	}
83 	s->connected = 0;
84 	s->port = 33013;
85 	s->rbuf = 16384;
86 	s->sbuf = 16384;
87 	s->fd = -1;
88 	s->tmc.tv_sec  = 16;
89 	s->tmc.tv_usec = 0;
90 	s->errno_ = 0;
91 	memset(&s->s, 0, sizeof(s->s));
92 	memset(&s->r, 0, sizeof(s->r));
93 	return 0;
94 }
95 
tb_sesfree(struct tbses * s)96 int tb_sesfree(struct tbses *s)
97 {
98 	tb_sesclose(s);
99 	if (s->host) {
100 		free(s->host);
101 		s->host = NULL;
102 	}
103 	tb_buffree(&s->s);
104 	tb_buffree(&s->r);
105 	return 0;
106 }
107 
tb_sesset(struct tbses * s,enum tbsesopt o,...)108 int tb_sesset(struct tbses *s, enum tbsesopt o, ...)
109 {
110 	va_list args;
111 	va_start(args, o);
112 	switch (o) {
113 	case TB_HOST: {
114 		char *p = strdup(va_arg(args, char*));
115 		if (p == NULL) {
116 			va_end(args);
117 			s->errno_ = ENOMEM;
118 			return -1;
119 		}
120 		free(s->host);
121 		s->host = p;
122 		break;
123 	}
124 	case TB_PORT:
125 		s->port = va_arg(args, int);
126 		break;
127 	case TB_CONNECTTM:
128 		s->tmc.tv_sec  = va_arg(args, int);
129 		s->tmc.tv_usec = 0;
130 		break;
131 	case TB_SENDBUF:
132 		s->sbuf = va_arg(args, int);
133 		break;
134 	case TB_READBUF:
135 		s->rbuf = va_arg(args, int);
136 		break;
137 	default:
138 		va_end(args);
139 		s->errno_ = EINVAL;
140 		return -1;
141 	}
142 	va_end(args);
143 	return 0;
144 }
145 
146 static int
tb_sessetbufmax(struct tbses * s,int opt,int min)147 tb_sessetbufmax(struct tbses *s, int opt, int min)
148 {
149 	int max = 128 * 1024 * 1024;
150 	if (min == 0)
151 		min = 16384;
152 	unsigned int avg = 0;
153 	while (min <= max) {
154 		avg = ((unsigned int)(min + max)) / 2;
155 		if (setsockopt(s->fd, SOL_SOCKET, opt, &avg, sizeof(avg)) == 0)
156 			min = avg + 1;
157 		else
158 			max = avg - 1;
159 	}
160 	return 0;
161 }
162 
163 static int
tb_sessetopts(struct tbses * s)164 tb_sessetopts(struct tbses *s)
165 {
166 	int opt = 1;
167 	if (setsockopt(s->fd, IPPROTO_TCP, TCP_NODELAY, &opt, sizeof(opt)) == -1) {
168 		s->errno_ = errno;
169 		return -1;
170 	}
171 	tb_sessetbufmax(s, SO_SNDBUF, s->sbuf);
172 	tb_sessetbufmax(s, SO_RCVBUF, s->rbuf);
173 	return 0;
174 }
175 
/*
 * Resolve s->host to an IPv4 address and fill `addr` with it together
 * with s->port in network byte order.
 *
 * The lookup is restricted to AF_INET: the result is copied into a
 * sockaddr_in, so an IPv6 answer must never be returned.
 *
 * Returns 0 on success, -1 with errno_ set on failure. getaddrinfo()
 * reports errors through its return value, not errno, so errno is only
 * used for EAI_SYSTEM; every other resolver failure is reported as
 * EHOSTUNREACH.
 */
static int
tb_sesresolve(struct tbses *s, struct sockaddr_in *addr)
{
	struct addrinfo hints;
	struct addrinfo *addr_info = NULL;
	int rc;

	memset(addr, 0, sizeof(struct sockaddr_in));
	addr->sin_family = AF_INET;
	addr->sin_port = htons(s->port);

	memset(&hints, 0, sizeof(hints));
	hints.ai_family = AF_INET;
	hints.ai_socktype = SOCK_STREAM;

	rc = getaddrinfo(s->host, NULL, &hints, &addr_info);
	if (rc != 0 || addr_info == NULL || addr_info->ai_addr == NULL) {
		/* read errno before freeaddrinfo() has a chance to change it */
		s->errno_ = (rc == EAI_SYSTEM && errno != 0) ? errno : EHOSTUNREACH;
		if (addr_info)
			freeaddrinfo(addr_info);
		return -1;
	}

	memcpy(&addr->sin_addr,
	       (void*)&((struct sockaddr_in *)addr_info->ai_addr)->sin_addr,
	       sizeof(addr->sin_addr));
	freeaddrinfo(addr_info);
	return 0;
}
195 
196 static int
tb_sesnonblock(struct tbses * s,int set)197 tb_sesnonblock(struct tbses *s, int set)
198 {
199 	int flags = fcntl(s->fd, F_GETFL);
200 	if (flags == -1) {
201 		s->errno_ = errno;
202 		return -1;
203 	}
204 	if (set)
205 		flags |= O_NONBLOCK;
206 	else
207 		flags &= ~O_NONBLOCK;
208 	int rc = fcntl(s->fd, F_SETFL, flags);
209 	if (rc == -1)
210 		s->errno_ = errno;
211 	return rc;
212 }
213 
214 static int
tb_sesconnectdo(struct tbses * s)215 tb_sesconnectdo(struct tbses *s)
216 {
217 	/* resolve address */
218 	struct sockaddr_in addr;
219 	int rc = tb_sesresolve(s, &addr);
220 	if (rc == -1)
221 		return -1;
222 	/* set nonblock */
223 	rc = tb_sesnonblock(s, 1);
224 	if (rc == -1)
225 		return -1;
226 
227 	if (connect(s->fd, (struct sockaddr*)&addr, sizeof(addr)) == -1)
228 	{
229 		if (errno == EINPROGRESS) {
230 			/* wait for connection while handling signal events */
231 			const int64_t micro = 1000000;
232 			int64_t tmout_usec = s->tmc.tv_sec * micro;
233 			/* get start connect time */
234 			struct timeval start_connect;
235 			if (gettimeofday(&start_connect, NULL) == -1) {
236 				s->errno_ = errno;
237 				return -1;
238 			}
239 			/* set initial timer */
240 			struct timeval tmout;
241 			memcpy(&tmout, &s->tmc, sizeof(tmout));
242 			while (1) {
243 				fd_set fds;
244 				FD_ZERO(&fds);
245 				FD_SET(s->fd, &fds);
246 				int ret = select(s->fd + 1, NULL, &fds, NULL, &tmout);
247 				if (ret == -1) {
248 					if (errno == EINTR || errno == EAGAIN) {
249 						/* get current time */
250 						struct timeval curr;
251 						if (gettimeofday(&curr, NULL) == -1) {
252 							s->errno_ = errno;
253 							return -1;
254 						}
255 						/* calculate timeout last time */
256 						int64_t passd_usec = (curr.tv_sec - start_connect.tv_sec) * micro +
257 							(curr.tv_usec - start_connect.tv_usec);
258 						int64_t curr_tmeout = passd_usec - tmout_usec;
259 						if (curr_tmeout <= 0) {
260 							s->errno_ = ETIMEDOUT;
261 							return -1;
262 						}
263 						tmout.tv_sec = curr_tmeout / micro;
264 						tmout.tv_usec = curr_tmeout % micro;
265 					} else {
266 						s->errno_ = errno;
267 						return -1;
268 					}
269 				} else if (ret == 0) {
270 					s->errno_ = ETIMEDOUT;
271 					return -1;
272 				} else {
273 					/* we have a event on socket */
274 					break;
275 				}
276 			}
277 			/* checking error status */
278 			int opt = 0;
279 			socklen_t len = sizeof(opt);
280 			if ((getsockopt(s->fd, SOL_SOCKET, SO_ERROR, &opt, &len) == -1) || opt) {
281 				s->errno_ = (opt) ? opt: errno;
282 				return -1;
283 			}
284 		} else {
285 			s->errno_ = errno;
286 			return -1;
287 		}
288 	}
289 
290 	/* set block */
291 	return tb_sesnonblock(s, 0);
292 }
293 
tb_sesconnect(struct tbses * s)294 int tb_sesconnect(struct tbses *s)
295 {
296 	int rc;
297 	if (s->s.buf == NULL) {
298 		rc = tb_bufinit(&s->s, s->sbuf);
299 		if (rc == -1) {
300 			s->errno_ = ENOMEM;
301 			return -1;
302 		}
303 		rc = tb_bufinit(&s->r, s->rbuf);
304 		if (rc == -1) {
305 			tb_buffree(&s->s);
306 			s->errno_ = ENOMEM;
307 			return -1;
308 		}
309 	}
310 	s->fd = socket(AF_INET, SOCK_STREAM, 0);
311 	if (s->fd < 0) {
312 		s->errno_ = errno;
313 		return -1;
314 	}
315 	rc = tb_sessetopts(s);
316 	if (rc == -1)
317 		return -1;
318 	rc = tb_sesconnectdo(s);
319 	if (rc == -1)
320 		return -1;
321 	s->connected = 1;
322 	return 0;
323 }
324 
tb_sesclose(struct tbses * s)325 int tb_sesclose(struct tbses *s)
326 {
327 	int rc = 0;
328 	if (s->fd != -1) {
329 		rc = close(s->fd);
330 		if (rc == -1)
331 			s->errno_ = errno;
332 		s->fd = -1;
333 	}
334 	s->connected = 0;
335 	return rc;
336 }
337 
338 static ssize_t
tb_sessendraw(struct tbses * s,char * buf,size_t size)339 tb_sessendraw(struct tbses *s, char *buf, size_t size)
340 {
341 	size_t off = 0;
342 	do {
343 		ssize_t r;
344 		do {
345 			r = send(s->fd, buf + off, size - off, 0);
346 		} while (r == -1 && (errno == EINTR));
347 		if (r <= 0) {
348 			s->errno_ = errno;
349 			return -1;
350 		}
351 		off += r;
352 	} while (off != size);
353 
354 	return off;
355 }
356 
357 static ssize_t
tb_sessenddo(struct tbses * s,char * buf,size_t size)358 tb_sessenddo(struct tbses *s, char *buf, size_t size)
359 {
360 	if (s->s.buf == NULL)
361 		return tb_sessendraw(s, buf, size);
362 
363 	if (size > s->s.size) {
364 		s->errno_ = E2BIG;
365 		return -1;
366 	}
367 	if ((s->s.off + size) <= s->s.size) {
368 		memcpy(s->s.buf + s->s.off, buf, size);
369 		s->s.off += size;
370 		return size;
371 	}
372 	ssize_t r = tb_sessendraw(s, s->s.buf, s->s.off);
373 	if (r == -1)
374 		return -1;
375 
376 	s->s.off = size;
377 	memcpy(s->s.buf, buf, size);
378 	return size;
379 }
380 
381 static ssize_t
tb_sesrecvraw(struct tbses * s,char * buf,size_t size,int strict)382 tb_sesrecvraw(struct tbses *s, char *buf, size_t size, int strict)
383 {
384 	size_t off = 0;
385 	do {
386 		ssize_t r;
387 		do {
388 			r = recv(s->fd, buf + off, size - off, 0);
389 		} while (r == -1 && (errno == EINTR));
390 		if (r <= 0) {
391 			s->errno_ = errno;
392 			return -1;
393 		}
394 		off += r;
395 	} while (off != size && strict);
396 
397 	return off;
398 }
399 
400 static ssize_t
tb_sesrecvdo(struct tbses * s,char * buf,size_t size)401 tb_sesrecvdo(struct tbses *s, char *buf, size_t size)
402 {
403 	if (s->r.buf == NULL)
404 		return tb_sesrecvraw(s, buf, size, 1);
405 
406 	size_t lv, rv, off = 0, left = size;
407 	while (1) {
408 		if ((s->r.off + left) <= s->r.top) {
409 			memcpy(buf + off, s->r.buf + s->r.off, left);
410 			s->r.off += left;
411 			return size;
412 		}
413 
414 		lv = s->r.top - s->r.off;
415 		rv = left - lv;
416 		if (lv) {
417 			memcpy(buf + off, s->r.buf + s->r.off, lv);
418 			off += lv;
419 		}
420 
421 		s->r.off = 0;
422 		ssize_t top = tb_sesrecvraw(s, s->r.buf, s->r.size, 0);
423 		if (top <= 0) {
424 			s->errno_ = errno;
425 			return -1;
426 		}
427 
428 		s->r.top = top;
429 		if (rv <= s->r.top) {
430 			memcpy(buf + off, s->r.buf, rv);
431 			s->r.off = rv;
432 			return size;
433 		}
434 		left -= lv;
435 	}
436 	return -1;
437 }
438 
tb_sessync(struct tbses * s)439 int tb_sessync(struct tbses *s)
440 {
441 	if (s->s.off == 0)
442 		return 0;
443 	ssize_t rc = tb_sessendraw(s, s->s.buf, s->s.off);
444 	if (rc == -1)
445 		return -1;
446 	s->s.off = 0;
447 	return rc;
448 }
449 
/*
 * Public send entry point: forwards `size` bytes from `buf` to
 * tb_sessenddo() and returns its result unchanged.
 */
ssize_t
tb_sessend(struct tbses *s, char *buf, size_t size)
{
	ssize_t queued = tb_sessenddo(s, buf, size);
	return queued;
}
455 
456 ssize_t
tb_sesrecv(struct tbses * s,char * buf,size_t size,int strict)457 tb_sesrecv(struct tbses *s, char *buf, size_t size, int strict)
458 {
459 	if (! strict) {
460 		if (s->r.buf) {
461 			size_t v = s->r.top - s->r.off;
462 			if (v > 0) {
463 				if (size < v)
464 					v = size;
465 				memcpy(buf, s->r.buf + s->r.off, v);
466 				s->r.off += v;
467 				return v;
468 			}
469 		}
470 		/* todo: make unstricted read with readahead
471 		 * buffer */
472 		return tb_sesrecvraw(s, buf, size, 0);
473 	}
474 	return tb_sesrecvdo(s, buf, size);
475 }
476