1 /*
2 Copyright (C) 2007-2010 Tomash Brechko. All rights reserved.
3
4 When used to build Perl module:
5
6 This library is free software; you can redistribute it and/or modify
7 it under the same terms as Perl itself, either Perl version 5.8.8
8 or, at your option, any later version of Perl 5 you may have
9 available.
10
11 When used as a standalone library:
12
13 This library is free software; you can redistribute it and/or modify
14 it under the terms of the GNU Lesser General Public License as
15 published by the Free Software Foundation; either version 2.1 of the
16 License, or (at your option) any later version.
17
18 This library is distributed in the hope that it will be useful, but
19 WITHOUT ANY WARRANTY; without even the implied warranty of
20 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
21 Lesser General Public License for more details.
22 */
23
24 #include "client.h"
25 #include "array.h"
26 #include "connect.h"
27 #include "parse_keyword.h"
28 #include "dispatch_key.h"
29 #include <stdlib.h>
30 #include <string.h>
31 #include <stdio.h>
32 #ifndef WIN32
33 #include "socket_posix.h"
34 #include <sys/uio.h>
35 #include <signal.h>
36 #include <time.h>
37 #include <netinet/in.h>
38 #include <netinet/tcp.h>
39 #else /* WIN32 */
40 #include "socket_win32.h"
41 #endif /* WIN32 */
42
43
44 /* REPLY_BUF_SIZE should be large enough to contain first reply line. */
45 #define REPLY_BUF_SIZE 1536
46
47
/*
  Decimal string constants.  FLAGS_STUB and DELAY_STUB spell 2^32-1,
  EXPTIME_STUB spells 2^31-1, and VALUE_SIZE_STUB, CAS_STUB and
  ARITH_STUB spell 2^64-1.
  NOTE(review): presumably these are worst-case placeholders used to
  size request text; their use is not visible in this part of the
  file -- confirm.  NOREPLY is the literal "noreply" keyword.
*/
48 #define FLAGS_STUB "4294967295"
49 #define EXPTIME_STUB "2147483647"
50 #define DELAY_STUB "4294967295"
51 #define VALUE_SIZE_STUB "18446744073709551615"
52 #define CAS_STUB "18446744073709551615"
53 #define ARITH_STUB "18446744073709551615"
54 #define NOREPLY "noreply"
55
56
/*
  Line terminator.  The array has room for exactly the two bytes CR
  and LF (no terminating NUL), so sizeof(eol) is the terminator
  length and eol[sizeof(eol) - 1] is the LF.
*/
57 static const char eol[2] = "\r\n";
58
59
/*
  Type of the generation counters kept in struct client and in
  struct command_state; is_active() compares the two.
*/
60 typedef unsigned long long generation_type;
61
62
/*
  State of a value being received for a "get" style reply; filled by
  parse_get_reply() and consumed by read_value().
*/
63 struct value_state
64 {
65 void *opaque; /* Handle produced by object->alloc(); passed to object->store() or object->free(). */
66 void *ptr; /* Where the next received byte of the value goes. */
67 value_size_type size; /* Number of value bytes still to be received. */
68 struct meta_object meta; /* Flags and (when use_cas is set) cas parsed from the reply line. */
69 };
70
71
/*
  State of a result whose text is taken from the reply line itself
  (see parse_arith_reply() and parse_version_reply()).
*/
72 struct embedded_state
73 {
74 void *opaque; /* Handle produced by object->alloc(); passed to object->store(). */
75 void *ptr; /* Buffer returned by object->alloc() that the text is copied into. */
76 };
77
78
/*
  Command-specific reply parser.  It is stored in
  command_state.parse_reply and invoked from parse_reply(); the
  parsers in this file return one of the MEMCACHED_* result codes.
*/
79 struct command_state;
80 typedef int (*parse_reply_func)(struct command_state *state);
81
82
/*
  Phases of the reply state machine driven by process_reply().
*/
83 enum command_phase
84 {
85 PHASE_RECEIVE, /* Waiting for a complete reply line. */
86 PHASE_PARSE, /* A line was received and its keyword matched; parse the rest. */
87 PHASE_VALUE, /* Reading the data block that follows a VALUE line. */
88 PHASE_DONE /* The current reply has been consumed completely. */
89 };
90
91
/*
  Values for command_state.socket_mode: NOT_TCP is -1, TCP_LATENCY is
  0 and TCP_THROUGHPUT is 1.
  NOTE(review): the code that sets and acts on the mode is not visible
  in this part of the file -- confirm the meaning of each mode there.
*/
92 enum socket_mode_e { NOT_TCP = -1, TCP_LATENCY, TCP_THROUGHPUT };
93
94
/* Forward declaration; struct command_state keeps a pointer to its client. */
95 struct client;
96
97
/*
  Per-server connection and command state: the socket, the reply
  buffer with its parsing cursors, the request iovecs still to be
  sent, and the bookkeeping that ties replies back to requests.
*/
98 struct command_state
99 {
100 struct client *client; /* Owning client; set in command_state_init(). */
101 int fd; /* Socket descriptor, -1 when there is no open connection. */
102 struct pollfd *pollfd; /* NOTE(review): not used in the visible part of the file. */
103 enum socket_mode_e socket_mode; /* NOTE(review): not used in the visible part of the file. */
104 int noreply; /* Value given at init; when set, process_reply() drops the connection on MEMCACHED_ERROR. */
105 int prepared_last_cmd_noreply; /* Copied into last_cmd_noreply by state_prepare(). */
106 int last_cmd_noreply;
107
108 struct array iov_buf; /* Array of struct iovec describing the request. */
109 int str_step; /* When > 0, stride used by state_prepare() to turn iov_base offsets into pointers. */
110
111 generation_type generation; /* Equal to client->generation while the state is active. */
112
113 int phase; /* Current enum command_phase value. */
114 int prepared_nowait_count; /* Added to nowait_count by state_prepare(). */
115 int nowait_count; /* Pending replies that are parsed by parse_nowait_reply(). */
116 int reply_count; /* Replies still expected for the current command. */
117
118 char *buf; /* Reply buffer of REPLY_BUF_SIZE bytes. */
119 char *pos; /* Parsing position inside buf. */
120 char *end; /* End of the data received into buf. */
121 char *eol; /* End-of-line scan position; just past the terminator once a line is complete. */
122 int match; /* Result of parse_keyword() for the current line. */
123
124 struct iovec *iov; /* Next iovec to send. */
125 int iov_count; /* Number of iovecs still to send. */
126 int write_offset; /* Bytes of *iov that were already sent. */
127 struct iovec *key; /* iovec of the next key awaiting a reply (parse_key() advances it by 2). */
128 int key_count; /* Number of keys awaiting a reply. */
129 int index; /* Index handed to object->store() for the current result. */
130 int index_head; /* Head of this state's chain in client->index_list, -1 after reset. */
131 int index_tail; /* Tail of that chain, -1 after reset. */
132
133 parse_reply_func parse_reply; /* Parser for the current command's replies. */
134 struct result_object *object; /* Callbacks (alloc, store, free) and argument receiving results. */
135
136 union
137 {
138 struct value_state value; /* Used while receiving a VALUE reply. */
139 struct embedded_state embedded; /* Used for results embedded in the reply line. */
140 } u;
141 };
142
143
144 static inline
145 int
command_state_init(struct command_state * state,struct client * c,int noreply)146 command_state_init(struct command_state *state,
147 struct client *c, int noreply)
148 {
149 state->client = c;
150 state->fd = -1;
151 state->noreply = noreply;
152 state->last_cmd_noreply = 0;
153
154 array_init(&state->iov_buf);
155
156 state->generation = 0;
157 state->nowait_count = 0;
158 state->buf = (char *) malloc(REPLY_BUF_SIZE);
159 if (! state->buf)
160 return -1;
161
162 state->pos = state->end = state->eol = state->buf;
163
164 return 0;
165 }
166
167
168 static inline
169 void
command_state_destroy(struct command_state * state)170 command_state_destroy(struct command_state *state)
171 {
172 free(state->buf);
173
174 array_destroy(&state->iov_buf);
175
176 if (state->fd != -1)
177 close(state->fd);
178 }
179
180
181 static inline
182 void
command_state_reinit(struct command_state * state)183 command_state_reinit(struct command_state *state)
184 {
185 if (state->fd != -1)
186 close(state->fd);
187
188 state->fd = -1;
189 state->last_cmd_noreply = 0;
190
191 array_clear(state->iov_buf);
192
193 state->generation = 0;
194 state->nowait_count = 0;
195
196 state->pos = state->end = state->eol = state->buf;
197 }
198
199
/*
  One server: its address, failure accounting and the command state
  of its connection.
*/
200 struct server
201 {
202 char *host; /* NUL-terminated host; the same allocation also holds the port string. */
203 size_t host_len; /* Length of host without the terminating NUL. */
204 char *port; /* Points into the host allocation, or NULL when no port was given. */
205 int failure_count; /* Failures counted by client_mark_failed(). */
206 time_t failure_expires; /* Set by client_mark_failed() to the failure time plus failure_timeout. */
207 struct command_state cmd_state; /* Connection and command state for this server. */
208 };
209
210
211 static inline
212 int
server_init(struct server * s,struct client * c,const char * host,size_t host_len,const char * port,size_t port_len,int noreply)213 server_init(struct server *s, struct client *c,
214 const char *host, size_t host_len,
215 const char *port, size_t port_len, int noreply)
216 {
217 if (port)
218 s->host = (char *) malloc(host_len + 1 + port_len + 1);
219 else
220 s->host = (char *) malloc(host_len + 1);
221
222 if (! s->host)
223 return MEMCACHED_FAILURE;
224
225 memcpy(s->host, host, host_len);
226 s->host[host_len] = '\0';
227 s->host_len = host_len;
228
229 if (port)
230 {
231 s->port = s->host + host_len + 1;
232 memcpy(s->port, port, port_len);
233 s->port[port_len] = '\0';
234 }
235 else
236 {
237 s->port = NULL;
238 }
239
240 s->failure_count = 0;
241 s->failure_expires = 0;
242
243 if (command_state_init(&s->cmd_state, c, noreply) != 0)
244 return MEMCACHED_FAILURE;
245
246 return MEMCACHED_SUCCESS;
247 }
248
249
250 static inline
251 void
server_destroy(struct server * s)252 server_destroy(struct server *s)
253 {
254 free(s->host); /* This also frees port string. */
255 command_state_destroy(&s->cmd_state);
256 }
257
258
259 static inline
260 void
server_reinit(struct server * s)261 server_reinit(struct server *s)
262 {
263 s->failure_count = 0;
264 s->failure_expires = 0;
265
266 command_state_reinit(&s->cmd_state);
267 }
268
269
/*
  Element of client->index_list.  Nodes are chained through next;
  get_index() reads index from the node at command_state.index_head
  and next_index() moves index_head to next.
*/
270 struct index_node
271 {
272 int index; /* Value handed to object->store() as the result index. */
273 int next; /* Position in index_list of the following node of the chain. */
274 };
275
276
/*
  The client object: configured servers, key dispatching state,
  settings, and scratch buffers shared by the per-server command
  states.
*/
277 struct client
278 {
279 struct array pollfds; /* Array of struct pollfd; grown together with servers in client_add_server(). */
280 struct array servers; /* Array of struct server. */
281
282 struct dispatch_state dispatch; /* State of the dispatch_*() functions. */
283
284 char *prefix; /* A space followed by the namespace; the literal " " when no namespace is set, heap memory otherwise. */
285 size_t prefix_len; /* Length of prefix: 1 + namespace length. */
286
287 int connect_timeout; /* 1/1000 sec. */
288 int io_timeout; /* 1/1000 sec. */
289 int max_failures; /* client_mark_failed() counts failures only when this is > 0. */
290 int failure_timeout; /* 1 sec. */
291 int close_on_error; /* When set, process_reply() drops the connection on MEMCACHED_ERROR. */
292 int nowait;
293 int hash_namespace; /* When set, client_set_prefix() also passes the namespace to the dispatcher. */
294
295 struct array index_list; /* Array of struct index_node. */
296 struct array str_buf; /* Array of char; state_prepare() resolves iovec offsets against it. */
297 int iov_max; /* Result of get_iov_max(); limit of iovecs per write in send_request(). */
298
299 generation_type generation; /* Command states with an equal generation are active. */
300
301 struct result_object *object;
302 int noreply;
303 };
304
305
306 static inline
307 void
command_state_reset(struct command_state * state,int str_step,parse_reply_func parse_reply)308 command_state_reset(struct command_state *state, int str_step,
309 parse_reply_func parse_reply)
310 {
311 state->prepared_nowait_count = 0;
312 state->reply_count = 0;
313 state->str_step = str_step;
314 state->key_count = 0;
315 state->parse_reply = parse_reply;
316
317 state->phase = PHASE_RECEIVE;
318
319 array_clear(state->iov_buf);
320
321 state->write_offset = 0;
322 state->index_head = state->index_tail = -1;
323 state->generation = state->client->generation;
324
325 #if 0 /* No need to initialize the following. */
326 state->key = NULL;
327 state->index = 0;
328 state->match = NO_MATCH;
329 state->iov_count = 0;
330 state->iov = NULL;
331 #endif
332 }
333
334
335 static inline
336 int
is_active(struct command_state * state)337 is_active(struct command_state *state)
338 {
339 return (state->generation == state->client->generation);
340 }
341
342
343 static inline
344 void
deactivate(struct command_state * state)345 deactivate(struct command_state *state)
346 {
347 state->generation = state->client->generation - 1;
348 }
349
350
351 static inline
352 int
get_index(struct command_state * state)353 get_index(struct command_state *state)
354 {
355 struct index_node *node = array_elem(state->client->index_list,
356 struct index_node, state->index_head);
357 return node->index;
358 }
359
360
361 static inline
362 void
next_index(struct command_state * state)363 next_index(struct command_state *state)
364 {
365 struct index_node *node = array_elem(state->client->index_list,
366 struct index_node, state->index_head);
367 state->index_head = node->next;
368 }
369
370
371 struct client *
client_init()372 client_init()
373 {
374 struct client *c;
375
376 #ifdef WIN32
377 if (win32_socket_library_acquire() != 0)
378 return NULL;
379 #endif /* WIN32 */
380
381 c = malloc(sizeof(struct client));
382 if (! c)
383 return NULL;
384
385 array_init(&c->pollfds);
386 array_init(&c->servers);
387 array_init(&c->index_list);
388 array_init(&c->str_buf);
389
390 dispatch_init(&c->dispatch);
391
392 c->connect_timeout = 250;
393 c->io_timeout = 1000;
394 c->prefix = " ";
395 c->prefix_len = 1;
396 c->max_failures = 0;
397 c->failure_timeout = 10;
398 c->close_on_error = 1;
399 c->nowait = 0;
400 c->hash_namespace = 0;
401
402 c->iov_max = get_iov_max();
403
404 c->generation = 1; /* Different from initial command state. */
405
406 c->object = NULL;
407 c->noreply = 0;
408
409 return c;
410 }
411
412
/*
  Forward declaration, needed because client_destroy() below calls
  the function.  Its definition is not within the visible part of
  the file.
*/
413 static
414 int
415 client_noreply_push(struct client *c);
416
417
418 void
client_destroy(struct client * c)419 client_destroy(struct client *c)
420 {
421 struct server *s;
422
423 client_nowait_push(c);
424 client_noreply_push(c);
425
426 for (array_each(c->servers, struct server, s))
427 server_destroy(s);
428
429 dispatch_destroy(&c->dispatch);
430
431 array_destroy(&c->servers);
432 array_destroy(&c->pollfds);
433 array_destroy(&c->index_list);
434 array_destroy(&c->str_buf);
435
436 if (c->prefix_len > 1)
437 free(c->prefix);
438 free(c);
439
440 #ifdef WIN32
441 win32_socket_library_release();
442 #endif /* WIN32 */
443 }
444
445
446 void
client_reinit(struct client * c)447 client_reinit(struct client *c)
448 {
449 struct server *s;
450
451 for (array_each(c->servers, struct server, s))
452 server_reinit(s);
453
454 array_clear(c->str_buf);
455 array_clear(c->index_list);
456
457 c->generation = 1; /* Different from initial command state. */
458 c->object = NULL;
459 }
460
461
462 int
client_set_ketama_points(struct client * c,int ketama_points)463 client_set_ketama_points(struct client *c, int ketama_points)
464 {
465 /* Should be called before we added any server. */
466 if (! array_empty(c->servers) || ketama_points < 0)
467 return MEMCACHED_FAILURE;
468
469 dispatch_set_ketama_points(&c->dispatch, ketama_points);
470
471 return MEMCACHED_SUCCESS;
472 }
473
474
475 void
client_set_connect_timeout(struct client * c,int to)476 client_set_connect_timeout(struct client *c, int to)
477 {
478 c->connect_timeout = (to > 0 ? to : -1);
479 }
480
481
482 void
client_set_io_timeout(struct client * c,int to)483 client_set_io_timeout(struct client *c, int to)
484 {
485 c->io_timeout = (to > 0 ? to : -1);
486 }
487
488
489 void
client_set_max_failures(struct client * c,int f)490 client_set_max_failures(struct client *c, int f)
491 {
492 c->max_failures = f;
493 }
494
495
496 void
client_set_failure_timeout(struct client * c,int to)497 client_set_failure_timeout(struct client *c, int to)
498 {
499 c->failure_timeout = to;
500 }
501
502
503 void
client_set_close_on_error(struct client * c,int enable)504 client_set_close_on_error(struct client *c, int enable)
505 {
506 c->close_on_error = enable;
507 }
508
509
510 void
client_set_nowait(struct client * c,int enable)511 client_set_nowait(struct client *c, int enable)
512 {
513 c->nowait = enable;
514 }
515
516
517 void
client_set_hash_namespace(struct client * c,int enable)518 client_set_hash_namespace(struct client *c, int enable)
519 {
520 c->hash_namespace = enable;
521 }
522
523
524 int
client_add_server(struct client * c,const char * host,size_t host_len,const char * port,size_t port_len,double weight,int noreply)525 client_add_server(struct client *c, const char *host, size_t host_len,
526 const char *port, size_t port_len, double weight,
527 int noreply)
528 {
529 int res;
530
531 if (weight <= 0.0)
532 return MEMCACHED_FAILURE;
533
534 if (array_extend(c->pollfds, struct pollfd, 1, ARRAY_EXTEND_EXACT) == -1)
535 return MEMCACHED_FAILURE;
536
537 if (array_extend(c->servers, struct server, 1, ARRAY_EXTEND_EXACT) == -1)
538 return MEMCACHED_FAILURE;
539
540 res = server_init(array_end(c->servers, struct server), c,
541 host, host_len, port, port_len, noreply);
542 if (res != MEMCACHED_SUCCESS)
543 return res;
544
545 res = dispatch_add_server(&c->dispatch, host, host_len, port, port_len,
546 weight, array_size(c->servers));
547 if (res == -1)
548 return MEMCACHED_FAILURE;
549
550 array_push(c->pollfds);
551 array_push(c->servers);
552
553 return MEMCACHED_SUCCESS;
554 }
555
556
557 int
client_set_prefix(struct client * c,const char * ns,size_t ns_len)558 client_set_prefix(struct client *c, const char *ns, size_t ns_len)
559 {
560 char *s;
561
562 if (ns_len == 0)
563 {
564 if (c->prefix_len > 1)
565 {
566 free(c->prefix);
567 c->prefix = " ";
568 c->prefix_len = 1;
569 }
570
571 if (c->hash_namespace)
572 dispatch_set_prefix(&c->dispatch, "", 0);
573
574 return MEMCACHED_SUCCESS;
575 }
576
577 if (c->prefix_len == 1)
578 c->prefix = NULL;
579 s = (char *) realloc(c->prefix, 1 + ns_len + 1);
580 if (! s)
581 return MEMCACHED_FAILURE;
582
583 s[0] = ' ';
584 memcpy(s + 1, ns, ns_len);
585 s[ns_len + 1] = '\0';
586
587 c->prefix = s;
588 c->prefix_len = 1 + ns_len;
589
590 if (c->hash_namespace)
591 dispatch_set_prefix(&c->dispatch, ns, ns_len);
592
593 return MEMCACHED_SUCCESS;
594 }
595
596
597 const char *
client_get_prefix(struct client * c,size_t * ns_len)598 client_get_prefix(struct client *c, size_t *ns_len)
599 {
600 *ns_len = c->prefix_len - 1;
601
602 return (c->prefix + 1);
603 }
604
605
/*
  read() that is restarted whenever it is interrupted by a signal.

  Returns whatever the first read() not failing with EINTR returned:
  the number of bytes read, 0 at end of file, or -1 with errno set.
*/
static inline
ssize_t
read_restart(int fd, void *buf, size_t size)
{
  for (;;)
    {
      ssize_t got = read(fd, buf, size);

      if (got != -1 || errno != EINTR)
        return got;
    }
}
618
619
/*
  readv() that is restarted whenever it is interrupted by a signal.

  Returns whatever the first readv() not failing with EINTR
  returned: the number of bytes read, 0 at end of file, or -1 with
  errno set.
*/
static inline
ssize_t
readv_restart(int fd, const struct iovec *iov, int count)
{
  for (;;)
    {
      ssize_t got = readv(fd, iov, count);

      if (got != -1 || errno != EINTR)
        return got;
    }
}
632
633
/*
  Gathering write that is restarted whenever it is interrupted by a
  signal.

  Where MSG_NOSIGNAL is available the data is sent with sendmsg()
  and that flag; otherwise plain writev() is used.

  Returns whatever the first call not failing with EINTR returned:
  the number of bytes written, or -1 with errno set.
*/
static inline
ssize_t
writev_restart(int fd, const struct iovec *iov, int count)
{
  ssize_t sent;
#ifdef MSG_NOSIGNAL
  struct msghdr hdr;

  memset(&hdr, 0, sizeof(hdr));
  hdr.msg_iov = (struct iovec *) iov;
  hdr.msg_iovlen = count;
#endif /* MSG_NOSIGNAL */

  for (;;)
    {
#ifdef MSG_NOSIGNAL
      sent = sendmsg(fd, &hdr, MSG_NOSIGNAL);
#else  /* ! MSG_NOSIGNAL */
      sent = writev(fd, iov, count);
#endif /* ! MSG_NOSIGNAL */

      if (sent != -1 || errno != EINTR)
        return sent;
    }
}
670
671
/*
  Find out which of the requested keys a VALUE reply line is about.

  On entry state->pos is at the key text of the reply (the namespace
  part of the prefix is skipped first), state->key is the iovec of
  the first key still awaiting a reply and state->key_count is the
  number of such keys.  Candidates are tried in request order; each
  rejected candidate is dropped together with its entry in the index
  chain.  When a single candidate is left it is accepted without
  being compared, and pos is simply moved to the next space.

  On return state->index is the result index of the matching key,
  and key, key_count and the index chain have been moved past it.
  Always returns MEMCACHED_SUCCESS.
*/
672 /*
673 parse_key() assumes that one key definitely matches.
674 */
675 static
676 int
parse_key(struct command_state * state)677 parse_key(struct command_state *state)
678 {
679 char *key_pos;
680
681 /* Skip over the prefix. */
682 state->pos += state->client->prefix_len - 1;
683
684 key_pos = (char *) state->key->iov_base;
685 while (state->key_count > 1)
686 {
687 char *key_end, *prefix_key;
688 size_t prefix_len;
689
690 key_end = (char *) state->key->iov_base + state->key->iov_len;
/* Advance over the bytes on which the reply and the candidate agree. */
691 while (key_pos != key_end && *state->pos == *key_pos)
692 {
693 ++key_pos;
694 ++state->pos;
695 }
696
/* Whole candidate consumed and the reply key ends here too: match. */
697 if (key_pos == key_end && *state->pos == ' ')
698 break;
699
/*
  The first prefix_len bytes of the reply key are known to equal the
  start of this candidate, so a later candidate can match only if it
  begins with the same bytes.
*/
700 prefix_key = (char *) state->key->iov_base;
701 prefix_len = key_pos - prefix_key;
702 /*
703 TODO: Below it might be faster to compare the tail of the key
704 before comparing the head.
705 */
706 do
707 {
708 next_index(state);
709 state->key += 2; /* The next key's iovec is two entries further on. */
710 }
711 while (--state->key_count > 1
712 && (state->key->iov_len < prefix_len
713 || memcmp(state->key->iov_base,
714 prefix_key, prefix_len) != 0));
715
/* Resume the comparison after the part that is already verified. */
716 key_pos = (char *) state->key->iov_base + prefix_len;
717 }
718
/* Last candidate: taken on trust, just skip the rest of the reply key. */
719 if (state->key_count == 1)
720 {
721 while (*state->pos != ' ')
722 ++state->pos;
723 }
724
/* Consume the matched key and its index chain entry. */
725 --state->key_count;
726 state->key += 2;
727 state->index = get_index(state);
728 next_index(state);
729
730 return MEMCACHED_SUCCESS;
731 }
732
733
/*
  Receive the data block of a VALUE reply into the buffer obtained
  from object->alloc(), check the "\r\n" that follows it and hand
  the value to object->store().

  Value bytes already present in the reply buffer are copied first.
  The remainder is read from the socket with readv(): the first
  iovec is the rest of the value buffer, the second one is the free
  part of the reply buffer, so the terminator and whatever follows
  it end up in the reply buffer.

  Returns MEMCACHED_SUCCESS when the value has been stored,
  MEMCACHED_EAGAIN when reading would block (the progress is saved
  in state, so the call can simply be repeated), MEMCACHED_CLOSED on
  end of file or a read error, and MEMCACHED_UNKNOWN when the data
  is not followed by "\r\n".  In the last two cases the value is
  released with object->free().
*/
734 static
735 int
read_value(struct command_state * state)736 read_value(struct command_state *state)
737 {
738 value_size_type size;
739 size_t remains;
740
/* Take as much of the value as is already in the reply buffer. */
741 size = state->end - state->pos;
742 if (size > state->u.value.size)
743 size = state->u.value.size;
744 if (size > 0)
745 {
746 memcpy(state->u.value.ptr, state->pos, size);
747 state->u.value.size -= size;
748 state->u.value.ptr = (char *) state->u.value.ptr + size;
749 state->pos += size;
750 }
751
/* Fewer than sizeof(eol) bytes are left in the buffer: read more. */
752 remains = state->end - state->pos;
753 if (remains < sizeof(eol))
754 {
755 struct iovec iov[2], *piov;
756
/* Move the leftover bytes to the start of the reply buffer. */
757 state->pos = memmove(state->buf, state->pos, remains);
758 state->end = state->buf + remains;
759
760 iov[0].iov_base = state->u.value.ptr;
761 iov[0].iov_len = state->u.value.size;
762 iov[1].iov_base = state->end;
763 iov[1].iov_len = REPLY_BUF_SIZE - remains;
/* Start at the reply buffer when the value itself is already complete. */
764 piov = &iov[state->u.value.size > 0 ? 0 : 1];
765
766 do
767 {
768 ssize_t res;
769
770 res = readv_restart(state->fd, piov, iov + 2 - piov);
771 if (res <= 0)
772 {
/* Save the progress so that a later call continues from here. */
773 state->u.value.ptr = iov[0].iov_base;
774 state->u.value.size = iov[0].iov_len;
775 state->end = iov[1].iov_base;
776
777 if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK))
778 return MEMCACHED_EAGAIN;
779
780 state->object->free(state->u.value.opaque);
781 return MEMCACHED_CLOSED;
782 }
783
/* The current iovec was filled completely: continue with the next. */
784 if ((size_t) res >= piov->iov_len)
785 {
786 piov->iov_base = (char *) piov->iov_base + piov->iov_len;
787 res -= piov->iov_len;
788 piov->iov_len = 0;
789 ++piov;
790 }
791
792 piov->iov_len -= res;
793 piov->iov_base = (char *) piov->iov_base + res;
794 }
/* Repeat until the reply buffer holds at least the terminator. */
795 while ((size_t) ((char *) iov[1].iov_base - state->pos) < sizeof(eol));
796
797 state->end = iov[1].iov_base;
798 }
799
800 if (memcmp(state->pos, eol, sizeof(eol)) != 0)
801 {
802 state->object->free(state->u.value.opaque);
803 return MEMCACHED_UNKNOWN;
804 }
805 state->pos += sizeof(eol);
806 state->eol = state->pos;
807
808 state->object->store(state->object->arg, state->u.value.opaque,
809 state->index, &state->u.value.meta);
810
811 return MEMCACHED_SUCCESS;
812 }
813
814
815 static inline
816 int
swallow_eol(struct command_state * state,int skip,int done)817 swallow_eol(struct command_state *state, int skip, int done)
818 {
819 if (! skip && state->eol - state->pos != sizeof(eol))
820 return MEMCACHED_UNKNOWN;
821
822 state->pos = state->eol;
823
824 if (done)
825 state->phase = PHASE_DONE;
826
827 return MEMCACHED_SUCCESS;
828 }
829
830
831 static
832 int
parse_ull(struct command_state * state,unsigned long long * result)833 parse_ull(struct command_state *state, unsigned long long *result)
834 {
835 unsigned long long res = 0;
836 const char *beg;
837
838 while (*state->pos == ' ')
839 ++state->pos;
840
841 beg = state->pos;
842
843 while (1)
844 {
845 switch (*state->pos)
846 {
847 case '0': case '1': case '2': case '3': case '4':
848 case '5': case '6': case '7': case '8': case '9':
849 res = res * 10 + (*state->pos - '0');
850 ++state->pos;
851 break;
852
853 default:
854 *result = res;
855 return (beg != state->pos ? MEMCACHED_SUCCESS : MEMCACHED_UNKNOWN);
856 }
857 }
858 }
859
860
861 static
862 int
parse_get_reply(struct command_state * state)863 parse_get_reply(struct command_state *state)
864 {
865 unsigned long long num;
866 int res;
867
868 switch (state->match)
869 {
870 case MATCH_END:
871 return swallow_eol(state, 0, 1);
872
873 default:
874 return MEMCACHED_UNKNOWN;
875
876 case MATCH_VALUE:
877 break;
878 }
879
880 while (*state->pos == ' ')
881 ++state->pos;
882
883 res = parse_key(state);
884 if (res != MEMCACHED_SUCCESS)
885 return res;
886
887 res = parse_ull(state, &num);
888 if (res != MEMCACHED_SUCCESS)
889 return res;
890 state->u.value.meta.flags = num;
891
892 res = parse_ull(state, &num);
893 if (res != MEMCACHED_SUCCESS)
894 return res;
895 state->u.value.size = num;
896
897 if (state->u.value.meta.use_cas)
898 {
899 res = parse_ull(state, &num);
900 if (res != MEMCACHED_SUCCESS)
901 return res;
902 state->u.value.meta.cas = num;
903 }
904
905 res = swallow_eol(state, 0, 0);
906 if (res != MEMCACHED_SUCCESS)
907 return res;
908
909 state->u.value.ptr = state->object->alloc(state->u.value.size,
910 &state->u.value.opaque);
911 if (! state->u.value.ptr)
912 return MEMCACHED_FAILURE;
913
914 state->phase = PHASE_VALUE;
915
916 return MEMCACHED_SUCCESS;
917 }
918
919
920 static inline
921 void
store_result(struct command_state * state,int res)922 store_result(struct command_state *state, int res)
923 {
924 int index = get_index(state);
925 next_index(state);
926 state->object->store(state->object->arg, (void *) (ptrdiff_t) res,
927 index, NULL);
928 }
929
930
931 static
932 int
parse_set_reply(struct command_state * state)933 parse_set_reply(struct command_state *state)
934 {
935 switch (state->match)
936 {
937 case MATCH_STORED:
938 store_result(state, 1);
939 break;
940
941 case MATCH_NOT_STORED:
942 case MATCH_NOT_FOUND:
943 case MATCH_EXISTS:
944 store_result(state, 0);
945 break;
946
947 default:
948 return MEMCACHED_UNKNOWN;
949 }
950
951 return swallow_eol(state, 0, 1);
952 }
953
954
955 static
956 int
parse_delete_reply(struct command_state * state)957 parse_delete_reply(struct command_state *state)
958 {
959 switch (state->match)
960 {
961 case MATCH_DELETED:
962 store_result(state, 1);
963 break;
964
965 case MATCH_NOT_FOUND:
966 store_result(state, 0);
967 break;
968
969 default:
970 return MEMCACHED_UNKNOWN;
971 }
972
973 return swallow_eol(state, 0, 1);
974 }
975
976
977 static
978 int
parse_touch_reply(struct command_state * state)979 parse_touch_reply(struct command_state *state)
980 {
981 switch (state->match)
982 {
983 case MATCH_TOUCHED:
984 store_result(state, 1);
985 break;
986
987 case MATCH_NOT_FOUND:
988 store_result(state, 0);
989 break;
990
991 default:
992 return MEMCACHED_UNKNOWN;
993 }
994
995 return swallow_eol(state, 0, 1);
996 }
997
998
999
1000 static
1001 int
parse_arith_reply(struct command_state * state)1002 parse_arith_reply(struct command_state *state)
1003 {
1004 char *beg;
1005 size_t len;
1006 int zero;
1007
1008 state->index = get_index(state);
1009 next_index(state);
1010
1011 switch (state->match)
1012 {
1013 case MATCH_NOT_FOUND:
1014 /* On NOT_FOUND we store the defined empty string. */
1015 state->u.embedded.ptr =
1016 state->object->alloc(0, &state->u.embedded.opaque);
1017 if (! state->u.embedded.ptr)
1018 return MEMCACHED_FAILURE;
1019
1020 state->object->store(state->object->arg, state->u.embedded.opaque,
1021 state->index, NULL);
1022
1023 return swallow_eol(state, 0, 1);
1024
1025 default:
1026 return MEMCACHED_UNKNOWN;
1027
1028 case MATCH_0: case MATCH_1: case MATCH_2: case MATCH_3: case MATCH_4:
1029 case MATCH_5: case MATCH_6: case MATCH_7: case MATCH_8: case MATCH_9:
1030 break;
1031 }
1032
1033 beg = state->pos - 1;
1034 len = 0;
1035 while (len == 0)
1036 {
1037 switch (*state->pos)
1038 {
1039 case '0': case '1': case '2': case '3': case '4':
1040 case '5': case '6': case '7': case '8': case '9':
1041 ++state->pos;
1042 break;
1043
1044 default:
1045 len = state->pos - beg;
1046 break;
1047 }
1048 }
1049
1050 zero = (*beg == '0' && len == 1);
1051 if (zero)
1052 len = 3;
1053
1054 state->u.embedded.ptr = state->object->alloc(len, &state->u.embedded.opaque);
1055 if (! state->u.embedded.ptr)
1056 return MEMCACHED_FAILURE;
1057
1058 if (! zero)
1059 memcpy(state->u.embedded.ptr, beg, len);
1060 else
1061 memcpy(state->u.embedded.ptr, "0E0", 3);
1062
1063 state->object->store(state->object->arg, state->u.embedded.opaque,
1064 state->index, NULL);
1065
1066 /* Value may be space padded. */
1067 return swallow_eol(state, 1, 1);
1068 }
1069
1070
1071 static
1072 int
parse_ok_reply(struct command_state * state)1073 parse_ok_reply(struct command_state *state)
1074 {
1075 switch (state->match)
1076 {
1077 case MATCH_OK:
1078 store_result(state, 1);
1079 return swallow_eol(state, 0, 1);
1080
1081 default:
1082 return MEMCACHED_UNKNOWN;
1083 }
1084 }
1085
1086
1087 static
1088 int
parse_version_reply(struct command_state * state)1089 parse_version_reply(struct command_state *state)
1090 {
1091 const char *beg;
1092 size_t len;
1093 int res;
1094
1095 state->index = get_index(state);
1096 next_index(state);
1097
1098 switch (state->match)
1099 {
1100 default:
1101 return MEMCACHED_UNKNOWN;
1102
1103 case MATCH_VERSION:
1104 break;
1105 }
1106
1107 while (*state->pos == ' ')
1108 ++state->pos;
1109
1110 beg = state->pos;
1111
1112 res = swallow_eol(state, 1, 1);
1113 if (res != MEMCACHED_SUCCESS)
1114 return res;
1115
1116 len = state->pos - sizeof(eol) - beg;
1117
1118 state->u.embedded.ptr = state->object->alloc(len, &state->u.embedded.opaque);
1119 if (! state->u.embedded.ptr)
1120 return MEMCACHED_FAILURE;
1121
1122 memcpy(state->u.embedded.ptr, beg, len);
1123
1124 state->object->store(state->object->arg, state->u.embedded.opaque,
1125 state->index, NULL);
1126
1127 return MEMCACHED_SUCCESS;
1128 }
1129
1130
1131 static
1132 int
parse_nowait_reply(struct command_state * state)1133 parse_nowait_reply(struct command_state *state)
1134 {
1135 int res;
1136
1137 /*
1138 Cast to enum parse_keyword_e to get compiler warning when some
1139 match result is not handled.
1140 */
1141 switch ((enum parse_keyword_e) state->match)
1142 {
1143 case MATCH_DELETED:
1144 case MATCH_OK:
1145 case MATCH_STORED:
1146 case MATCH_EXISTS:
1147 case MATCH_NOT_FOUND:
1148 case MATCH_NOT_STORED:
1149 case MATCH_TOUCHED:
1150 return swallow_eol(state, 0, 1);
1151
1152 case MATCH_0: case MATCH_1: case MATCH_2: case MATCH_3: case MATCH_4:
1153 case MATCH_5: case MATCH_6: case MATCH_7: case MATCH_8: case MATCH_9:
1154 case MATCH_VERSION: /* see client_noreply_push(). */
1155 return swallow_eol(state, 1, 1);
1156
1157 case MATCH_ERROR:
1158 res = swallow_eol(state, 0, 1);
1159 return (res == MEMCACHED_SUCCESS ? MEMCACHED_ERROR : res);
1160
1161 case MATCH_CLIENT_ERROR:
1162 case MATCH_SERVER_ERROR:
1163 res = swallow_eol(state, 1, 1);
1164 return (res == MEMCACHED_SUCCESS ? MEMCACHED_ERROR : res);
1165
1166 case NO_MATCH:
1167 case MATCH_VALUE:
1168 case MATCH_END:
1169 case MATCH_STAT:
1170 return MEMCACHED_UNKNOWN;
1171 }
1172
1173 /* Never reach here. */
1174 return MEMCACHED_UNKNOWN;
1175 }
1176
1177
1178 static
1179 void
client_mark_failed(struct client * c,struct server * s)1180 client_mark_failed(struct client *c, struct server *s)
1181 {
1182 if (s->cmd_state.fd != -1)
1183 {
1184 close(s->cmd_state.fd);
1185 s->cmd_state.fd = -1;
1186 s->cmd_state.nowait_count = 0;
1187 s->cmd_state.pos = s->cmd_state.end = s->cmd_state.eol =
1188 s->cmd_state.buf;
1189 }
1190
1191 if (c->max_failures > 0)
1192 {
1193 time_t now = time(NULL);
1194 if (s->failure_expires < now)
1195 s->failure_count = 0;
1196 ++s->failure_count;
1197 /*
1198 Set timeout on first failure, and on max_failures. The idea
1199 is that if max_failures had happened during failure_timeout,
1200 we do not retry in another failure_timeout seconds. This is
1201 not entirely true: we remember the time of the first failure,
1202 but for exact accounting we would have to keep time of each
1203 failure. However such exact measurement is not necessary.
1204 */
1205 if (s->failure_count == 1 || s->failure_count == c->max_failures)
1206 s->failure_expires = now + c->failure_timeout;
1207 }
1208 }
1209
1210
/*
  Write the pending request iovecs (state->iov, state->iov_count) to
  state->fd, passing at most client->iov_max of them to one write
  call.

  state->write_offset is the number of bytes of the first pending
  iovec that were sent by an earlier, partial write.  That iovec is
  adjusted by the offset for the duration of the write call and put
  back afterwards.

  Returns MEMCACHED_SUCCESS when the whole request has been written
  (the state is deactivated then if no replies are expected),
  MEMCACHED_EAGAIN when writing would block, and MEMCACHED_CLOSED on
  any other failure, in which case the state is deactivated and the
  server is marked failed.
*/
1211 static
1212 int
send_request(struct command_state * state,struct server * s)1213 send_request(struct command_state *state, struct server *s)
1214 {
1215 while (state->iov_count > 0)
1216 {
1217 int count;
1218 ssize_t res;
1219 size_t len;
1220
1221 count = (state->iov_count < state->client->iov_max
1222 ? state->iov_count : state->client->iov_max);
1223
/* Skip the part of the first iovec that was already sent. */
1224 state->iov->iov_base =
1225 (char *) state->iov->iov_base + state->write_offset;
1226 state->iov->iov_len -= state->write_offset;
1227 len = state->iov->iov_len;
1228
1229 res = writev_restart(state->fd, state->iov, count);
1230
/* Put the iovec back the way it was. */
1231 state->iov->iov_base =
1232 (char *) state->iov->iov_base - state->write_offset;
1233 state->iov->iov_len += state->write_offset;
1234
1235 if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK))
1236 return MEMCACHED_EAGAIN;
1237 if (res <= 0)
1238 {
1239 deactivate(state);
1240 client_mark_failed(state->client, s);
1241
1242 return MEMCACHED_CLOSED;
1243 }
1244
/* Drop every iovec that has been written completely. */
1245 while ((size_t) res >= len)
1246 {
1247 res -= len;
1248 ++state->iov;
1249 if (--state->iov_count == 0)
1250 break;
1251 len = state->iov->iov_len;
1252 state->write_offset = 0;
1253 }
/* What is left of res went into the iovec that is now first. */
1254 state->write_offset += res;
1255 }
1256
1257 if (state->reply_count == 0)
1258 deactivate(state);
1259
1260 return MEMCACHED_SUCCESS;
1261 }
1262
1263
/*
  Make sure that a complete reply line is present in the reply
  buffer.

  The buffer is scanned for LF starting at state->eol; while none is
  found more data is read from state->fd.  When the buffer is full,
  the data not yet parsed is moved to its beginning to make room.
  On success state->eol points just past the line terminator.

  Returns MEMCACHED_SUCCESS when a line is available,
  MEMCACHED_EAGAIN when reading would block, MEMCACHED_CLOSED on end
  of file or a read error, and MEMCACHED_UNKNOWN when the line does
  not fit into REPLY_BUF_SIZE bytes or the LF is not preceded by CR.
*/
1264 static
1265 int
receive_reply(struct command_state * state)1266 receive_reply(struct command_state *state)
1267 {
/* Look for LF in the data that is already buffered. */
1268 while (state->eol != state->end && *state->eol != eol[sizeof(eol) - 1])
1269 ++state->eol;
1270
1271 /*
1272 When buffer is empty, move to the beginning of it for better CPU
1273 cache utilization.
1274 */
1275 if (state->pos == state->end)
1276 state->pos = state->end = state->eol = state->buf;
1277
1278 while (state->eol == state->end)
1279 {
1280 size_t size;
1281 ssize_t res;
1282
1283 size = REPLY_BUF_SIZE - (state->end - state->buf);
1284 if (size == 0)
1285 {
1286 if (state->pos != state->buf)
1287 {
/* Buffer full: move the unparsed data to its beginning. */
1288 size_t len = state->end - state->pos;
1289 state->pos = memmove(state->buf, state->pos, len);
1290 state->end -= REPLY_BUF_SIZE - len;
1291 state->eol -= REPLY_BUF_SIZE - len;
1292 size = REPLY_BUF_SIZE - len;
1293 }
1294 else
1295 {
/* The line alone fills the whole buffer: give up. */
1296 return MEMCACHED_UNKNOWN;
1297 }
1298 }
1299
1300 res = read_restart(state->fd, state->end, size);
1301 if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK))
1302 return MEMCACHED_EAGAIN;
1303 if (res <= 0)
1304 return MEMCACHED_CLOSED;
1305
1306 state->end += res;
1307
1308 while (state->eol != state->end && *state->eol != eol[sizeof(eol) - 1])
1309 ++state->eol;
1310 }
1311
/* state->eol is at the LF now; the byte before it has to be CR. */
1312 if ((size_t) (state->eol - state->buf) < sizeof(eol) - 1
1313 || memcmp(state->eol - (sizeof(eol) - 1), eol, sizeof(eol) - 1) != 0)
1314 return MEMCACHED_UNKNOWN;
1315
/* Step over the LF. */
1316 ++state->eol;
1317
1318 return MEMCACHED_SUCCESS;
1319 }
1320
1321
1322 static
1323 int
parse_reply(struct command_state * state)1324 parse_reply(struct command_state *state)
1325 {
1326 int res, skip;
1327
1328 switch (state->match)
1329 {
1330 case MATCH_ERROR:
1331 case MATCH_CLIENT_ERROR:
1332 case MATCH_SERVER_ERROR:
1333 skip = (state->match != MATCH_ERROR);
1334 res = swallow_eol(state, skip, 1);
1335
1336 return (res == MEMCACHED_SUCCESS ? MEMCACHED_ERROR : res);
1337
1338 default:
1339 if (state->nowait_count)
1340 return parse_nowait_reply(state);
1341 else
1342 return state->parse_reply(state);
1343
1344 case NO_MATCH:
1345 return MEMCACHED_UNKNOWN;
1346 }
1347 }
1348
1349
/*
  Run the reply state machine of one server until all expected
  replies have been consumed or no further progress is possible.

  Phases: PHASE_RECEIVE obtains a reply line and matches its keyword
  with parse_keyword(); PHASE_PARSE hands the line to parse_reply();
  PHASE_VALUE reads the data block of a VALUE reply and goes back to
  PHASE_RECEIVE; PHASE_DONE completes one reply.

  A completed reply is charged to nowait_count while that is
  positive, and to reply_count otherwise.  When reply_count reaches
  zero the function returns, deactivating the state if nothing is
  left to send.

  Returns the result of the last step.  MEMCACHED_EAGAIN is returned
  as is, leaving the phase in place for the next call.  On
  MEMCACHED_UNKNOWN and MEMCACHED_CLOSED the state is deactivated
  and the server marked failed; the same happens on MEMCACHED_ERROR
  when close_on_error or noreply is set, otherwise the error reply
  is counted like any completed reply.
  NOTE(review): other non-success results (e.g. MEMCACHED_FAILURE
  from a parser) are not listed in the second switch and are thus
  counted as a completed reply too -- confirm this is intended.
*/
1350 static
1351 int
process_reply(struct command_state * state,struct server * s)1352 process_reply(struct command_state *state, struct server *s)
1353 {
1354 int res = 0;
1355
1356 while (1)
1357 {
1358 switch (state->phase)
1359 {
1360 case PHASE_RECEIVE:
1361 res = receive_reply(state);
1362 if (res != MEMCACHED_SUCCESS)
1363 break;
1364
1365 state->match = parse_keyword(&state->pos);
1366
1367 state->phase = PHASE_PARSE;
1368
1369 /* Fall into below. */
1370
1371 case PHASE_PARSE:
1372 res = parse_reply(state);
1373 if (res != MEMCACHED_SUCCESS)
1374 break;
1375
/* The parser asked for another phase (e.g. PHASE_VALUE): run it. */
1376 if (state->phase != PHASE_DONE)
1377 continue;
1378
1379 /* Fall into below. */
1380
1381 case PHASE_DONE:
1382 res = MEMCACHED_SUCCESS;
1383
1384 break;
1385
1386 case PHASE_VALUE:
1387 res = read_value(state);
1388 if (res != MEMCACHED_SUCCESS)
1389 break;
1390
/* More lines of the same reply follow the value. */
1391 state->phase = PHASE_RECEIVE;
1392 continue;
1393 }
1394
/* Decide what the outcome of the step means for the connection. */
1395 switch (res)
1396 {
1397 case MEMCACHED_ERROR:
1398 if (! (state->client->close_on_error || state->noreply))
1399 break;
1400
1401 /* else fall into below. */
1402
1403 case MEMCACHED_UNKNOWN:
1404 case MEMCACHED_CLOSED:
1405 deactivate(state);
1406 client_mark_failed(state->client, s);
1407
1408 /* Fall into below. */
1409
1410 case MEMCACHED_EAGAIN:
1411 return res;
1412 }
1413
/* One reply is complete: charge it to the proper counter. */
1414 if (state->nowait_count > 0)
1415 {
1416 --state->nowait_count;
1417 }
1418 else if (--state->reply_count == 0)
1419 {
1420 if (state->iov_count == 0)
1421 deactivate(state);
1422
1423 return res;
1424 }
1425
1426 state->phase = PHASE_RECEIVE;
1427 }
1428 }
1429
1430
1431 static inline
1432 void
state_prepare(struct command_state * state,int key_index)1433 state_prepare(struct command_state *state, int key_index)
1434 {
1435 state->last_cmd_noreply = state->prepared_last_cmd_noreply;
1436 state->nowait_count += state->prepared_nowait_count;
1437
1438 state->key = array_elem(state->iov_buf, struct iovec, key_index);
1439 state->iov = array_beg(state->iov_buf, struct iovec);
1440 state->iov_count = array_size(state->iov_buf);
1441
1442 if (state->str_step > 0)
1443 {
1444 struct iovec *iov = state->iov;
1445 char *buf = array_beg(state->client->str_buf, char);
1446 int count = state->iov_count, step = state->str_step;
1447
1448 if (state->key_count > 0)
1449 {
1450 iov += 3;
1451 count -= 3;
1452 }
1453
1454 while (count > 0)
1455 {
1456 iov->iov_base = (void *) (buf + (ptrdiff_t) (iov->iov_base));
1457 iov += step;
1458 count -= step;
1459 }
1460 }
1461 }
1462
1463
1464 int
client_execute(struct client * c,int key_index)1465 client_execute(struct client *c, int key_index)
1466 {
1467 int first_iter = 1;
1468
1469 #if ! defined(MSG_NOSIGNAL) && ! defined(WIN32)
1470 struct sigaction orig, ignore;
1471 int res;
1472
1473 ignore.sa_handler = SIG_IGN;
1474 sigemptyset(&ignore.sa_mask);
1475 ignore.sa_flags = 0;
1476 res = sigaction(SIGPIPE, &ignore, &orig);
1477 if (res == -1)
1478 return MEMCACHED_FAILURE;
1479 #endif /* ! defined(MSG_NOSIGNAL) && ! defined(WIN32) */
1480
1481 while (1)
1482 {
1483 struct server *s;
1484 struct pollfd *pollfd_beg, *pollfd;
1485 int res;
1486
1487 pollfd_beg = array_beg(c->pollfds, struct pollfd);
1488 pollfd = pollfd_beg;
1489
1490 for (array_each(c->servers, struct server, s))
1491 {
1492 int may_write, may_read;
1493 struct command_state *state = &s->cmd_state;
1494
1495 if (! is_active(state))
1496 continue;
1497
1498 if (first_iter)
1499 {
1500 state_prepare(state, key_index);
1501
1502 may_write = 1;
1503 may_read = (state->reply_count > 0
1504 || state->nowait_count > 0);
1505 }
1506 else
1507 {
1508 const short revents = state->pollfd->revents;
1509
1510 may_write = revents & (POLLOUT | POLLERR | POLLHUP);
1511 may_read = revents & (POLLIN | POLLERR | POLLHUP);
1512 }
1513
1514 if (may_read || may_write)
1515 {
1516 if (may_write)
1517 {
1518 int res;
1519
1520 res = send_request(state, s);
1521 if (res == MEMCACHED_CLOSED)
1522 may_read = 0;
1523 }
1524
1525 if (may_read)
1526 process_reply(state, s);
1527
1528 if (! is_active(state))
1529 continue;
1530 }
1531
1532 pollfd->events = 0;
1533
1534 if (state->iov_count > 0)
1535 pollfd->events |= POLLOUT;
1536 if (state->reply_count > 0 || state->nowait_count > 0)
1537 pollfd->events |= POLLIN;
1538
1539 if (pollfd->events != 0)
1540 {
1541 pollfd->fd = state->fd;
1542 state->pollfd = pollfd;
1543 ++pollfd;
1544 }
1545 }
1546
1547 if (pollfd == pollfd_beg)
1548 break;
1549
1550 do
1551 res = poll(pollfd_beg, pollfd - pollfd_beg, c->io_timeout);
1552 while (res == -1 && errno == EINTR);
1553
1554 /*
1555 On error or timeout close all active connections. Otherwise
1556 we might receive garbage on them later.
1557 */
1558 if (res <= 0)
1559 {
1560 for (array_each(c->servers, struct server, s))
1561 {
1562 struct command_state *state = &s->cmd_state;
1563
1564 if (is_active(state))
1565 {
1566 /*
1567 Ugly fix for possible memory leak. FIXME:
1568 requires redesign.
1569 */
1570 if (state->phase == PHASE_VALUE)
1571 state->object->free(state->u.value.opaque);
1572
1573 client_mark_failed(c, s);
1574 }
1575 }
1576
1577 break;
1578 }
1579
1580 first_iter = 0;
1581 }
1582
1583 #if ! defined(MSG_NOSIGNAL) && ! defined(WIN32)
1584 /*
1585 Ignore return value of sigaction(), there's nothing we can do in
1586 the case of error.
1587 */
1588 sigaction(SIGPIPE, &orig, NULL);
1589 #endif /* ! defined(MSG_NOSIGNAL) && ! defined(WIN32) */
1590
1591 return MEMCACHED_SUCCESS;
1592 }
1593
1594
/*
  Fall back to SOL_TCP as the setsockopt() level when IPPROTO_TCP is
  not defined.  It is unclear whether any platform actually needs this.
*/
1596 #if (! defined(IPPROTO_TCP) && defined(SOL_TCP))
1597 #define IPPROTO_TCP SOL_TCP
1598 #endif
1599
1600
1601 static inline
1602 void
tcp_optimize_latency(struct command_state * state)1603 tcp_optimize_latency(struct command_state *state)
1604 {
1605 #ifdef TCP_NODELAY
1606 if (state->socket_mode == TCP_THROUGHPUT)
1607 {
1608 static const int enable = 1;
1609 setsockopt(state->fd, IPPROTO_TCP, TCP_NODELAY,
1610 (void *) &enable, sizeof(enable));
1611 state->socket_mode = TCP_LATENCY;
1612 }
1613 #endif /* TCP_NODELAY */
1614 }
1615
1616
1617 static inline
1618 void
tcp_optimize_throughput(struct command_state * state)1619 tcp_optimize_throughput(struct command_state *state)
1620 {
1621 #ifdef TCP_NODELAY
1622 if (state->socket_mode == TCP_LATENCY)
1623 {
1624 static const int disable = 0;
1625 setsockopt(state->fd, IPPROTO_TCP, TCP_NODELAY,
1626 (void *) &disable, sizeof(disable));
1627 state->socket_mode = TCP_THROUGHPUT;
1628 }
1629 #endif /* TCP_NODELAY */
1630 }
1631
1632
1633 static
1634 int
get_server_fd(struct client * c,struct server * s)1635 get_server_fd(struct client *c, struct server *s)
1636 {
1637 struct command_state *state;
1638
1639 /*
1640 Do not try to try reconnect if had max_failures and
1641 failure_expires time is not reached yet.
1642 */
1643 if (c->max_failures > 0 && s->failure_count >= c->max_failures)
1644 {
1645 if (time(NULL) <= s->failure_expires)
1646 return -1;
1647 else
1648 s->failure_count = 0;
1649 }
1650
1651 state = &s->cmd_state;
1652 if (state->fd == -1)
1653 {
1654 if (s->port)
1655 {
1656 state->fd = client_connect_inet(s->host, s->port,
1657 c->connect_timeout);
1658 /* This is to trigger actual reset. */
1659 state->socket_mode = TCP_THROUGHPUT;
1660 if (state->fd != -1)
1661 tcp_optimize_latency(state);
1662 }
1663 else
1664 {
1665 state->fd = client_connect_unix(s->host, s->host_len);
1666 state->socket_mode = NOT_TCP;
1667 }
1668 }
1669
1670 if (state->fd == -1)
1671 client_mark_failed(c, s);
1672
1673 return state->fd;
1674 }
1675
1676
1677 static inline
1678 void
iov_push(struct command_state * state,const void * buf,size_t buf_size)1679 iov_push(struct command_state *state, const void *buf, size_t buf_size)
1680 {
1681 struct iovec *iov = array_end(state->iov_buf, struct iovec);
1682 iov->iov_base = (void *) buf;
1683 iov->iov_len = buf_size;
1684 array_push(state->iov_buf);
1685 }
1686
1687
1688 static
1689 int
push_index(struct command_state * state,int index)1690 push_index(struct command_state *state, int index)
1691 {
1692 struct index_node *node;
1693 struct client *c;
1694
1695 c = state->client;
1696 if (array_extend(c->index_list, struct index_node,
1697 1, ARRAY_EXTEND_TWICE) == -1)
1698 return MEMCACHED_FAILURE;
1699
1700 if (state->index_tail != -1)
1701 array_elem(c->index_list, struct index_node, state->index_tail)->next =
1702 array_size(c->index_list);
1703 else
1704 state->index_head = array_size(c->index_list);
1705
1706 state->index_tail = array_size(c->index_list);
1707
1708 node = array_elem(c->index_list, struct index_node, state->index_tail);
1709 node->index = index;
1710 node->next = -1;
1711
1712 array_push(c->index_list);
1713
1714 return MEMCACHED_SUCCESS;
1715 }
1716
1717
1718 static
1719 struct command_state *
init_state(struct command_state * state,int index,size_t request_size,size_t str_size,parse_reply_func parse_reply)1720 init_state(struct command_state *state, int index, size_t request_size,
1721 size_t str_size, parse_reply_func parse_reply)
1722 {
1723 if (! is_active(state))
1724 {
1725 if (state->client->noreply)
1726 {
1727 if (state->client->nowait || state->noreply)
1728 {
1729 parse_reply = NULL;
1730 tcp_optimize_throughput(state);
1731 }
1732
1733 state->prepared_last_cmd_noreply = state->noreply;
1734 }
1735 else
1736 {
1737 state->prepared_last_cmd_noreply = 0;
1738 tcp_optimize_latency(state);
1739 }
1740
1741 state->object = state->client->object;
1742 command_state_reset(state, (str_size > 0 ? request_size : 0),
1743 parse_reply);
1744 }
1745
1746 if (array_extend(state->iov_buf, struct iovec,
1747 request_size, ARRAY_EXTEND_EXACT) == -1)
1748 {
1749 deactivate(state);
1750 return NULL;
1751 }
1752
1753 if (str_size > 0
1754 && array_extend(state->client->str_buf, char,
1755 str_size, ARRAY_EXTEND_TWICE) == -1)
1756 {
1757 deactivate(state);
1758 return NULL;
1759 }
1760
1761 if (push_index(state, index) != MEMCACHED_SUCCESS)
1762 {
1763 deactivate(state);
1764 return NULL;
1765 }
1766
1767 if (state->parse_reply)
1768 ++state->reply_count;
1769 else if (! state->prepared_last_cmd_noreply)
1770 ++state->prepared_nowait_count;
1771
1772 return state;
1773 }
1774
1775
1776 static
1777 struct command_state *
get_state(struct client * c,int index,const char * key,size_t key_len,size_t request_size,size_t str_size,parse_reply_func parse_reply)1778 get_state(struct client *c, int index, const char *key, size_t key_len,
1779 size_t request_size, size_t str_size,
1780 parse_reply_func parse_reply)
1781 {
1782 struct server *s;
1783 int server_index, fd;
1784
1785 server_index = dispatch_key(&c->dispatch, key, key_len);
1786 if (server_index == -1)
1787 return NULL;
1788
1789 s = array_elem(c->servers, struct server, server_index);
1790
1791 fd = get_server_fd(c, s);
1792 if (fd == -1)
1793 return NULL;
1794
1795 return init_state(&s->cmd_state, index, request_size, str_size,
1796 parse_reply);
1797 }
1798
1799
1800 static inline
1801 const char *
get_noreply(struct command_state * state)1802 get_noreply(struct command_state *state)
1803 {
1804 if (state->noreply && state->client->noreply)
1805 return " " NOREPLY;
1806 else
1807 return "";
1808 }
1809
1810
1811 inline
1812 void
client_reset(struct client * c,struct result_object * o,int noreply)1813 client_reset(struct client *c, struct result_object *o, int noreply)
1814 {
1815 array_clear(c->index_list);
1816 array_clear(c->str_buf);
1817
1818 ++c->generation;
1819 c->object = o;
1820 c->noreply = noreply;
1821 }
1822
1823
1824 #define STR_WITH_LEN(str) (str), (sizeof(str) - 1)
1825
1826
1827 int
client_prepare_set(struct client * c,enum set_cmd_e cmd,int key_index,const char * key,size_t key_len,flags_type flags,exptime_type exptime,const void * value,value_size_type value_size)1828 client_prepare_set(struct client *c, enum set_cmd_e cmd, int key_index,
1829 const char *key, size_t key_len,
1830 flags_type flags, exptime_type exptime,
1831 const void *value, value_size_type value_size)
1832 {
1833 static const size_t request_size = 6;
1834 static const size_t str_size =
1835 sizeof(" " FLAGS_STUB " " EXPTIME_STUB " " VALUE_SIZE_STUB
1836 " " NOREPLY "\r\n");
1837
1838 struct command_state *state;
1839
1840 state = get_state(c, key_index, key, key_len, request_size, str_size,
1841 parse_set_reply);
1842 if (! state)
1843 return MEMCACHED_FAILURE;
1844
1845 ++state->key_count;
1846
1847 switch (cmd)
1848 {
1849 case CMD_SET:
1850 iov_push(state, STR_WITH_LEN("set"));
1851 break;
1852
1853 case CMD_ADD:
1854 iov_push(state, STR_WITH_LEN("add"));
1855 break;
1856
1857 case CMD_REPLACE:
1858 iov_push(state, STR_WITH_LEN("replace"));
1859 break;
1860
1861 case CMD_APPEND:
1862 iov_push(state, STR_WITH_LEN("append"));
1863 break;
1864
1865 case CMD_PREPEND:
1866 iov_push(state, STR_WITH_LEN("prepend"));
1867 break;
1868
1869 case CMD_CAS:
1870 /* This can't happen. */
1871 return MEMCACHED_FAILURE;
1872 }
1873 iov_push(state, c->prefix, c->prefix_len);
1874 iov_push(state, key, key_len);
1875
1876 {
1877 char *buf = array_end(c->str_buf, char);
1878 size_t str_size =
1879 sprintf(buf, " " FMT_FLAGS " " FMT_EXPTIME " " FMT_VALUE_SIZE "%s\r\n",
1880 flags, exptime, value_size, get_noreply(state));
1881 iov_push(state, (void *) (ptrdiff_t) array_size(c->str_buf), str_size);
1882 array_append(c->str_buf, str_size);
1883 }
1884
1885 iov_push(state, value, value_size);
1886 iov_push(state, STR_WITH_LEN("\r\n"));
1887
1888 return MEMCACHED_SUCCESS;
1889 }
1890
1891
1892 int
client_prepare_cas(struct client * c,int key_index,const char * key,size_t key_len,cas_type cas,flags_type flags,exptime_type exptime,const void * value,value_size_type value_size)1893 client_prepare_cas(struct client *c, int key_index,
1894 const char *key, size_t key_len,
1895 cas_type cas, flags_type flags, exptime_type exptime,
1896 const void *value, value_size_type value_size)
1897 {
1898 static const size_t request_size = 6;
1899 static const size_t str_size =
1900 sizeof(" " FLAGS_STUB " " EXPTIME_STUB " " VALUE_SIZE_STUB
1901 " " CAS_STUB " " NOREPLY "\r\n");
1902
1903 struct command_state *state;
1904
1905 state = get_state(c, key_index, key, key_len, request_size, str_size,
1906 parse_set_reply);
1907 if (! state)
1908 return MEMCACHED_FAILURE;
1909
1910 ++state->key_count;
1911
1912 iov_push(state, STR_WITH_LEN("cas"));
1913 iov_push(state, c->prefix, c->prefix_len);
1914 iov_push(state, key, key_len);
1915
1916 {
1917 char *buf = array_end(c->str_buf, char);
1918 size_t str_size =
1919 sprintf(buf, " " FMT_FLAGS " " FMT_EXPTIME " " FMT_VALUE_SIZE
1920 " " FMT_CAS "%s\r\n", flags, exptime, value_size, cas,
1921 get_noreply(state));
1922 iov_push(state, (void *) (ptrdiff_t) array_size(c->str_buf), str_size);
1923 array_append(c->str_buf, str_size);
1924 }
1925
1926 iov_push(state, value, value_size);
1927 iov_push(state, STR_WITH_LEN("\r\n"));
1928
1929 return MEMCACHED_SUCCESS;
1930 }
1931
1932
1933 int
client_prepare_get(struct client * c,enum get_cmd_e cmd,int key_index,const char * key,size_t key_len)1934 client_prepare_get(struct client *c, enum get_cmd_e cmd, int key_index,
1935 const char *key, size_t key_len)
1936 {
1937 static const size_t request_size = 4;
1938
1939 struct command_state *state;
1940
1941 state = get_state(c, key_index, key, key_len, request_size, 0,
1942 parse_get_reply);
1943 if (! state)
1944 return MEMCACHED_FAILURE;
1945
1946 ++state->key_count;
1947
1948 if (! array_empty(state->iov_buf))
1949 {
1950 /* Pop off trailing \r\n because we are about to add another key. */
1951 array_pop(state->iov_buf);
1952
1953 /* get can't be in noreply mode, so reply_count is positive. */
1954 --state->reply_count;
1955 }
1956 else
1957 {
1958 switch (cmd)
1959 {
1960 case CMD_GET:
1961 state->u.value.meta.use_cas = 0;
1962 iov_push(state, STR_WITH_LEN("get"));
1963 break;
1964
1965 case CMD_GETS:
1966 state->u.value.meta.use_cas = 1;
1967 iov_push(state, STR_WITH_LEN("gets"));
1968 break;
1969 }
1970 }
1971
1972 iov_push(state, c->prefix, c->prefix_len);
1973 iov_push(state, key, key_len);
1974 iov_push(state, STR_WITH_LEN("\r\n"));
1975
1976 return MEMCACHED_SUCCESS;
1977 }
1978
1979
1980 int
client_prepare_gat(struct client * c,enum gat_cmd_e cmd,int key_index,const char * key,size_t key_len,const char * exptime,size_t exptime_len)1981 client_prepare_gat(struct client *c, enum gat_cmd_e cmd,
1982 int key_index, const char *key, size_t key_len, const char *exptime, size_t exptime_len)
1983 {
1984 static const size_t request_size = 6;
1985
1986 struct command_state *state;
1987
1988 state = get_state(c, key_index, key, key_len, request_size, 0,
1989 parse_get_reply);
1990 if (! state)
1991 return MEMCACHED_FAILURE;
1992
1993 ++state->key_count;
1994
1995 if (! array_empty(state->iov_buf))
1996 {
1997 /* Pop off trailing \r\n because we are about to add another key. */
1998 array_pop(state->iov_buf);
1999
2000 /* get can't be in noreply mode, so reply_count is positive. */
2001 --state->reply_count;
2002 }
2003 else
2004 {
2005 switch (cmd)
2006 {
2007 case CMD_GAT:
2008 state->u.value.meta.use_cas = 0;
2009 iov_push(state, STR_WITH_LEN("gat"));
2010 break;
2011
2012 case CMD_GATS:
2013 state->u.value.meta.use_cas = 1;
2014 iov_push(state, STR_WITH_LEN("gats"));
2015 break;
2016 }
2017
2018 iov_push(state, STR_WITH_LEN(" "));
2019 iov_push(state, exptime, exptime_len);
2020 }
2021
2022 iov_push(state, c->prefix, c->prefix_len);
2023 iov_push(state, key, key_len);
2024 iov_push(state, STR_WITH_LEN("\r\n"));
2025
2026 return MEMCACHED_SUCCESS;
2027 }
2028
2029
2030 int
client_prepare_incr(struct client * c,enum arith_cmd_e cmd,int key_index,const char * key,size_t key_len,arith_type arg)2031 client_prepare_incr(struct client *c, enum arith_cmd_e cmd, int key_index,
2032 const char *key, size_t key_len, arith_type arg)
2033 {
2034 static const size_t request_size = 4;
2035 static const size_t str_size = sizeof(" " ARITH_STUB " " NOREPLY "\r\n");
2036
2037 struct command_state *state;
2038
2039 state = get_state(c, key_index, key, key_len, request_size, str_size,
2040 parse_arith_reply);
2041 if (! state)
2042 return MEMCACHED_FAILURE;
2043
2044 ++state->key_count;
2045
2046 switch (cmd)
2047 {
2048 case CMD_INCR:
2049 iov_push(state, STR_WITH_LEN("incr"));
2050 break;
2051
2052 case CMD_DECR:
2053 iov_push(state, STR_WITH_LEN("decr"));
2054 break;
2055 }
2056 iov_push(state, c->prefix, c->prefix_len);
2057 iov_push(state, key, key_len);
2058
2059 {
2060 char *buf = array_end(c->str_buf, char);
2061 size_t str_size =
2062 sprintf(buf, " " FMT_ARITH "%s\r\n", arg, get_noreply(state));
2063 iov_push(state, (void *) (ptrdiff_t) array_size(c->str_buf), str_size);
2064 array_append(c->str_buf, str_size);
2065 }
2066
2067 return MEMCACHED_SUCCESS;
2068 }
2069
2070
2071 int
client_prepare_delete(struct client * c,int key_index,const char * key,size_t key_len)2072 client_prepare_delete(struct client *c, int key_index,
2073 const char *key, size_t key_len)
2074 {
2075 static const size_t request_size = 4;
2076 static const size_t str_size = sizeof(" " NOREPLY "\r\n");
2077
2078 struct command_state *state;
2079
2080 state = get_state(c, key_index, key, key_len, request_size, str_size,
2081 parse_delete_reply);
2082 if (! state)
2083 return MEMCACHED_FAILURE;
2084
2085 ++state->key_count;
2086
2087 iov_push(state, STR_WITH_LEN("delete"));
2088 iov_push(state, c->prefix, c->prefix_len);
2089 iov_push(state, key, key_len);
2090
2091 {
2092 char *buf = array_end(c->str_buf, char);
2093 size_t str_size = sprintf(buf, "%s\r\n", get_noreply(state));
2094 iov_push(state, (void *) (ptrdiff_t) array_size(c->str_buf), str_size);
2095 array_append(c->str_buf, str_size);
2096 }
2097
2098 return MEMCACHED_SUCCESS;
2099 }
2100
2101
2102 int
client_prepare_touch(struct client * c,int key_index,const char * key,size_t key_len,exptime_type exptime)2103 client_prepare_touch(struct client *c, int key_index,
2104 const char *key, size_t key_len,
2105 exptime_type exptime)
2106 {
2107 static const size_t request_size = 4;
2108 static const size_t str_size = sizeof(" " EXPTIME_STUB " " NOREPLY "\r\n");
2109
2110 struct command_state *state;
2111
2112 state = get_state(c, key_index, key, key_len, request_size, str_size,
2113 parse_touch_reply);
2114 if (! state)
2115 return MEMCACHED_FAILURE;
2116
2117 ++state->key_count;
2118
2119 iov_push(state, STR_WITH_LEN("touch"));
2120 iov_push(state, c->prefix, c->prefix_len);
2121 iov_push(state, key, key_len);
2122
2123 {
2124 char *buf = array_end(c->str_buf, char);
2125 size_t str_size = sprintf(buf, " " FMT_EXPTIME "%s\r\n", exptime, get_noreply(state));
2126 iov_push(state, (void *) (ptrdiff_t) array_size(c->str_buf), str_size);
2127 array_append(c->str_buf, str_size);
2128 }
2129
2130 return MEMCACHED_SUCCESS;
2131 }
2132
2133
2134 int
client_flush_all(struct client * c,delay_type delay,struct result_object * o,int noreply)2135 client_flush_all(struct client *c, delay_type delay,
2136 struct result_object *o, int noreply)
2137 {
2138 static const size_t request_size = 1;
2139 static const size_t str_size =
2140 sizeof("flush_all " DELAY_STUB " " NOREPLY "\r\n");
2141
2142 struct server *s;
2143 double ddelay = delay, delay_step = 0.0;
2144 int i;
2145
2146 client_reset(c, o, noreply);
2147
2148 if (array_size(c->servers) > 1)
2149 delay_step = ddelay / (array_size(c->servers) - 1);
2150 ddelay += delay_step;
2151
2152 for (i = 0, array_each(c->servers, struct server, s), ++i)
2153 {
2154 struct command_state *state;
2155 int fd;
2156
2157 ddelay -= delay_step;
2158
2159 fd = get_server_fd(c, s);
2160 if (fd == -1)
2161 continue;
2162
2163 state = init_state(&s->cmd_state, i, request_size, str_size,
2164 parse_ok_reply);
2165 if (! state)
2166 continue;
2167
2168 {
2169 char *buf = array_end(c->str_buf, char);
2170 size_t str_size =
2171 sprintf(buf, "flush_all " FMT_DELAY "%s\r\n",
2172 (delay_type) (ddelay + 0.5), get_noreply(state));
2173 iov_push(state, (void *) (ptrdiff_t) array_size(c->str_buf), str_size);
2174 array_append(c->str_buf, str_size);
2175 }
2176 }
2177
2178 return client_execute(c, 2);
2179 }
2180
2181
2182 int
client_nowait_push(struct client * c)2183 client_nowait_push(struct client *c)
2184 {
2185 struct server *s;
2186
2187 if (! c->nowait)
2188 return MEMCACHED_SUCCESS;
2189
2190 client_reset(c, NULL, 0);
2191
2192 for (array_each(c->servers, struct server, s))
2193 {
2194 struct command_state *state;
2195 int fd;
2196
2197 state = &s->cmd_state;
2198 if (state->nowait_count == 0)
2199 continue;
2200
2201 fd = get_server_fd(c, s);
2202 if (fd == -1)
2203 continue;
2204
2205 /*
2206 In order to wait the final pending reply we pretend that one
2207 command was never a nowait command, and set parse function to
2208 parse_nowait_reply.
2209 */
2210 --state->nowait_count;
2211 command_state_reset(state, 0, parse_nowait_reply);
2212 tcp_optimize_latency(state);
2213 ++state->reply_count;
2214 }
2215
2216 return client_execute(c, 2);
2217 }
2218
2219
2220 int
client_server_versions(struct client * c,struct result_object * o)2221 client_server_versions(struct client *c, struct result_object *o)
2222 {
2223 static const size_t request_size = 1;
2224
2225 struct server *s;
2226 int i;
2227
2228 client_reset(c, o, 0);
2229
2230 for (i = 0, array_each(c->servers, struct server, s), ++i)
2231 {
2232 struct command_state *state;
2233 int fd;
2234
2235 fd = get_server_fd(c, s);
2236 if (fd == -1)
2237 continue;
2238
2239 state = init_state(&s->cmd_state, i, request_size, 0,
2240 parse_version_reply);
2241 if (! state)
2242 continue;
2243
2244 iov_push(state, STR_WITH_LEN("version\r\n"));
2245 }
2246
2247 return client_execute(c, 2);
2248 }
2249
2250
2251 /*
2252 When noreply mode is enabled the client may send the last noreply
2253 request and close the connection. The server will see that the
2254 connection is closed, and will discard all previously read data
2255 without processing it. To avoid this, we send "version" command and
2256 wait for the reply (discarding it).
2257 */
2258 static
2259 int
client_noreply_push(struct client * c)2260 client_noreply_push(struct client *c)
2261 {
2262 static const size_t request_size = 1;
2263
2264 struct server *s;
2265 int i;
2266
2267 client_reset(c, NULL, 0);
2268
2269 for (i = 0, array_each(c->servers, struct server, s), ++i)
2270 {
2271 struct command_state *state = &s->cmd_state;
2272 int fd;
2273
2274 if (! state->last_cmd_noreply)
2275 continue;
2276
2277 fd = get_server_fd(c, s);
2278 if (fd == -1)
2279 continue;
2280
2281 state = init_state(state, i, request_size, 0, parse_nowait_reply);
2282 if (! state)
2283 continue;
2284
2285 iov_push(state, STR_WITH_LEN("version\r\n"));
2286 }
2287
2288 return client_execute(c, 2);
2289 }
2290