1 /*
2   Copyright (C) 2007-2010 Tomash Brechko.  All rights reserved.
3 
4   When used to build Perl module:
5 
6   This library is free software; you can redistribute it and/or modify
7   it under the same terms as Perl itself, either Perl version 5.8.8
8   or, at your option, any later version of Perl 5 you may have
9   available.
10 
11   When used as a standalone library:
12 
13   This library is free software; you can redistribute it and/or modify
14   it under the terms of the GNU Lesser General Public License as
15   published by the Free Software Foundation; either version 2.1 of the
16   License, or (at your option) any later version.
17 
18   This library is distributed in the hope that it will be useful, but
19   WITHOUT ANY WARRANTY; without even the implied warranty of
20   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21   Lesser General Public License for more details.
22 */
23 
24 #include "client.h"
25 #include "array.h"
26 #include "connect.h"
27 #include "parse_keyword.h"
28 #include "dispatch_key.h"
29 #include <stdlib.h>
30 #include <string.h>
31 #include <stdio.h>
32 #ifndef WIN32
33 #include "socket_posix.h"
34 #include <sys/uio.h>
35 #include <signal.h>
36 #include <time.h>
37 #include <netinet/in.h>
38 #include <netinet/tcp.h>
39 #else  /* WIN32 */
40 #include "socket_win32.h"
41 #endif  /* WIN32 */
42 
43 
/* REPLY_BUF_SIZE should be large enough to contain first reply line.  */
#define REPLY_BUF_SIZE  1536


/*
  Decimal spellings of the largest values of the corresponding fields:
  2^32-1 for flags and delay, 2^31-1 for exptime, 2^64-1 for value
  size, CAS and arithmetic operands.  They are not used in the visible
  part of the file; presumably they reserve room for formatted numbers
  when building requests -- confirm.
*/
#define FLAGS_STUB  "4294967295"
#define EXPTIME_STUB  "2147483647"
#define DELAY_STUB  "4294967295"
#define VALUE_SIZE_STUB  "18446744073709551615"
#define CAS_STUB  "18446744073709551615"
#define ARITH_STUB  "18446744073709551615"
#define NOREPLY  "noreply"


/*
  Protocol line terminator.  The array has exactly two elements, so it
  is NOT NUL-terminated; code compares against it with sizeof(eol) and
  memcmp(), never with string functions.
*/
static const char eol[2] = "\r\n";
58 
59 
/*
  Generation counter.  A command state is active while its generation
  equals its client's generation (see is_active() and deactivate()).
*/
typedef unsigned long long generation_type;


/* State of a value body being received (parse_get_reply(), read_value()).  */
struct value_state
{
  void *opaque;                 /* Handle filled in by object->alloc().  */
  void *ptr;                    /* Where the next value bytes are copied.  */
  value_size_type size;         /* Value bytes still to be read.  */
  struct meta_object meta;      /* Flags, and CAS when meta.use_cas is set.  */
};


/* Result built directly from a reply line (arithmetic and version replies).  */
struct embedded_state
{
  void *opaque;                 /* Handle filled in by object->alloc().  */
  void *ptr;                    /* Buffer returned by object->alloc().  */
};


struct command_state;
/* Parser for one reply line of a command; returns a MEMCACHED_* code.  */
typedef int (*parse_reply_func)(struct command_state *state);
81 
82 
/* Processing phase of a command_state (stored in its int "phase" field).  */
enum command_phase
{
  PHASE_RECEIVE,                /* Initial phase, set by command_state_reset().  */
  PHASE_PARSE,                  /* Not assigned in the visible code.  */
  PHASE_VALUE,                  /* A value body follows (parse_get_reply()).  */
  PHASE_DONE                    /* Reply finished (swallow_eol() with done set).  */
};


/*
  Socket mode of a server connection.  The meaning of the two TCP
  modes is not visible here; presumably latency- vs throughput-oriented
  socket tuning (netinet/tcp.h is included) -- confirm.
*/
enum socket_mode_e { NOT_TCP = -1, TCP_LATENCY, TCP_THROUGHPUT };


struct client;
96 
97 
/* Per-server state of the command being sent and of its replies.  */
struct command_state
{
  struct client *client;        /* Owning client.  */
  int fd;                       /* Server socket, -1 when not connected.  */
  struct pollfd *pollfd;        /* Presumably this server's entry in client->pollfds -- confirm.  */
  enum socket_mode_e socket_mode;
  int noreply;                  /* noreply setting given to server_init().  */
  int prepared_last_cmd_noreply;
  int last_cmd_noreply;

  struct array iov_buf;         /* Request buffer; cleared by command_state_reset().  */
  int str_step;                 /* Per-command parameter from command_state_reset().  */

  generation_type generation;   /* Equals client->generation while active.  */

  int phase;                    /* An enum command_phase value.  */
  int prepared_nowait_count;
  int nowait_count;
  int reply_count;              /* When 0, send_request() deactivates after sending.  */

  /* Reply buffer of REPLY_BUF_SIZE bytes and cursors into it.  */
  char *buf;
  char *pos;                    /* Current parse position.  */
  char *end;                    /* End of received data.  */
  char *eol;                    /* Line-end scan position; just past "\r\n" when a line is parsed.  */
  int match;                    /* Keyword matched at the start of the line (MATCH_*).  */

  struct iovec *iov;            /* Next request iovec to write.  */
  int iov_count;                /* Request iovecs left to write.  */
  int write_offset;             /* Bytes of *iov already written.  */
  struct iovec *key;            /* Current requested key; keys are two iovecs apart.  */
  int key_count;                /* Requested keys not yet consumed by parse_key().  */
  int index;                    /* Result index of the reply being parsed.  */
  int index_head;               /* Head of this state's nodes in client->index_list (-1 after reset).  */
  int index_tail;

  parse_reply_func parse_reply; /* Parser for this command's reply lines.  */
  struct result_object *object; /* Result callbacks (alloc/store/free).  */

  union
  {
    struct value_state value;         /* Used for get-style replies.  */
    struct embedded_state embedded;   /* Used for arithmetic and version replies.  */
  } u;
};
142 
143 
144 static inline
145 int
command_state_init(struct command_state * state,struct client * c,int noreply)146 command_state_init(struct command_state *state,
147                    struct client *c, int noreply)
148 {
149   state->client = c;
150   state->fd = -1;
151   state->noreply = noreply;
152   state->last_cmd_noreply = 0;
153 
154   array_init(&state->iov_buf);
155 
156   state->generation = 0;
157   state->nowait_count = 0;
158   state->buf = (char *) malloc(REPLY_BUF_SIZE);
159   if (! state->buf)
160     return -1;
161 
162   state->pos = state->end = state->eol = state->buf;
163 
164   return 0;
165 }
166 
167 
168 static inline
169 void
command_state_destroy(struct command_state * state)170 command_state_destroy(struct command_state *state)
171 {
172   free(state->buf);
173 
174   array_destroy(&state->iov_buf);
175 
176   if (state->fd != -1)
177     close(state->fd);
178 }
179 
180 
181 static inline
182 void
command_state_reinit(struct command_state * state)183 command_state_reinit(struct command_state *state)
184 {
185   if (state->fd != -1)
186     close(state->fd);
187 
188   state->fd = -1;
189   state->last_cmd_noreply = 0;
190 
191   array_clear(state->iov_buf);
192 
193   state->generation = 0;
194   state->nowait_count = 0;
195 
196   state->pos = state->end = state->eol = state->buf;
197 }
198 
199 
/* A configured server together with its connection/command state.  */
struct server
{
  char *host;                   /* NUL-terminated copy; the allocation also holds port.  */
  size_t host_len;
  char *port;                   /* Points into the host allocation, or NULL.  */
  int failure_count;            /* Failures counted by client_mark_failed().  */
  time_t failure_expires;       /* End of the current failure window.  */
  struct command_state cmd_state;
};
209 
210 
211 static inline
212 int
server_init(struct server * s,struct client * c,const char * host,size_t host_len,const char * port,size_t port_len,int noreply)213 server_init(struct server *s, struct client *c,
214             const char *host, size_t host_len,
215             const char *port, size_t port_len, int noreply)
216 {
217   if (port)
218     s->host = (char *) malloc(host_len + 1 + port_len + 1);
219   else
220     s->host = (char *) malloc(host_len + 1);
221 
222   if (! s->host)
223     return MEMCACHED_FAILURE;
224 
225   memcpy(s->host, host, host_len);
226   s->host[host_len] = '\0';
227   s->host_len = host_len;
228 
229   if (port)
230     {
231       s->port = s->host + host_len + 1;
232       memcpy(s->port, port, port_len);
233       s->port[port_len] = '\0';
234     }
235   else
236     {
237       s->port = NULL;
238     }
239 
240   s->failure_count = 0;
241   s->failure_expires = 0;
242 
243   if (command_state_init(&s->cmd_state, c, noreply) != 0)
244     return MEMCACHED_FAILURE;
245 
246   return MEMCACHED_SUCCESS;
247 }
248 
249 
250 static inline
251 void
server_destroy(struct server * s)252 server_destroy(struct server *s)
253 {
254   free(s->host); /* This also frees port string.  */
255   command_state_destroy(&s->cmd_state);
256 }
257 
258 
259 static inline
260 void
server_reinit(struct server * s)261 server_reinit(struct server *s)
262 {
263   s->failure_count = 0;
264   s->failure_expires = 0;
265 
266   command_state_reinit(&s->cmd_state);
267 }
268 
269 
/*
  Node of the singly linked index lists stored in client->index_list.
  A command state walks its list with get_index() and next_index().
*/
struct index_node
{
  int index;                    /* Result index returned by get_index().  */
  int next;                     /* Position of the next node in index_list.  */
};
275 
276 
/* Client instance: servers, dispatch state and settings.  */
struct client
{
  struct array pollfds;         /* One struct pollfd slot per server.  */
  struct array servers;         /* struct server elements.  */

  struct dispatch_state dispatch;  /* Server selection (ketama points, hashing prefix).  */

  char *prefix;                 /* " " + namespace, or the " " literal when none.  */
  size_t prefix_len;            /* Length of prefix; 1 means no namespace.  */

  int connect_timeout;          /* 1/1000 sec.  */
  int io_timeout;               /* 1/1000 sec.  */
  int max_failures;             /* <= 0 disables failure accounting.  */
  int failure_timeout;          /* 1 sec.  */
  int close_on_error;
  int nowait;
  int hash_namespace;           /* Pass the namespace to dispatch_set_prefix().  */

  struct array index_list;      /* struct index_node elements.  */
  struct array str_buf;
  int iov_max;                  /* Max iovecs per write call, from get_iov_max().  */

  generation_type generation;   /* Current generation; see is_active().  */

  struct result_object *object;
  int noreply;
};
304 
305 
306 static inline
307 void
command_state_reset(struct command_state * state,int str_step,parse_reply_func parse_reply)308 command_state_reset(struct command_state *state, int str_step,
309                     parse_reply_func parse_reply)
310 {
311   state->prepared_nowait_count = 0;
312   state->reply_count = 0;
313   state->str_step = str_step;
314   state->key_count = 0;
315   state->parse_reply = parse_reply;
316 
317   state->phase = PHASE_RECEIVE;
318 
319   array_clear(state->iov_buf);
320 
321   state->write_offset = 0;
322   state->index_head = state->index_tail = -1;
323   state->generation = state->client->generation;
324 
325 #if 0 /* No need to initialize the following.  */
326   state->key = NULL;
327   state->index = 0;
328   state->match = NO_MATCH;
329   state->iov_count = 0;
330   state->iov = NULL;
331 #endif
332 }
333 
334 
335 static inline
336 int
is_active(struct command_state * state)337 is_active(struct command_state *state)
338 {
339   return (state->generation == state->client->generation);
340 }
341 
342 
343 static inline
344 void
deactivate(struct command_state * state)345 deactivate(struct command_state *state)
346 {
347   state->generation = state->client->generation - 1;
348 }
349 
350 
351 static inline
352 int
get_index(struct command_state * state)353 get_index(struct command_state *state)
354 {
355   struct index_node *node = array_elem(state->client->index_list,
356                                        struct index_node, state->index_head);
357   return node->index;
358 }
359 
360 
361 static inline
362 void
next_index(struct command_state * state)363 next_index(struct command_state *state)
364 {
365   struct index_node *node = array_elem(state->client->index_list,
366                                        struct index_node, state->index_head);
367   state->index_head = node->next;
368 }
369 
370 
371 struct client *
client_init()372 client_init()
373 {
374   struct client *c;
375 
376 #ifdef WIN32
377   if (win32_socket_library_acquire() != 0)
378     return NULL;
379 #endif  /* WIN32 */
380 
381   c = malloc(sizeof(struct client));
382   if (! c)
383     return NULL;
384 
385   array_init(&c->pollfds);
386   array_init(&c->servers);
387   array_init(&c->index_list);
388   array_init(&c->str_buf);
389 
390   dispatch_init(&c->dispatch);
391 
392   c->connect_timeout = 250;
393   c->io_timeout = 1000;
394   c->prefix = " ";
395   c->prefix_len = 1;
396   c->max_failures = 0;
397   c->failure_timeout = 10;
398   c->close_on_error = 1;
399   c->nowait = 0;
400   c->hash_namespace = 0;
401 
402   c->iov_max = get_iov_max();
403 
404   c->generation = 1;            /* Different from initial command state.  */
405 
406   c->object = NULL;
407   c->noreply = 0;
408 
409   return c;
410 }
411 
412 
/*
  Forward declaration: client_destroy() below calls this static
  function, which is defined later in this translation unit.
*/
static
int
client_noreply_push(struct client *c);
416 
417 
418 void
client_destroy(struct client * c)419 client_destroy(struct client *c)
420 {
421   struct server *s;
422 
423   client_nowait_push(c);
424   client_noreply_push(c);
425 
426   for (array_each(c->servers, struct server, s))
427     server_destroy(s);
428 
429   dispatch_destroy(&c->dispatch);
430 
431   array_destroy(&c->servers);
432   array_destroy(&c->pollfds);
433   array_destroy(&c->index_list);
434   array_destroy(&c->str_buf);
435 
436   if (c->prefix_len > 1)
437     free(c->prefix);
438   free(c);
439 
440 #ifdef WIN32
441   win32_socket_library_release();
442 #endif  /* WIN32 */
443 }
444 
445 
446 void
client_reinit(struct client * c)447 client_reinit(struct client *c)
448 {
449   struct server *s;
450 
451   for (array_each(c->servers, struct server, s))
452     server_reinit(s);
453 
454   array_clear(c->str_buf);
455   array_clear(c->index_list);
456 
457   c->generation = 1;            /* Different from initial command state.  */
458   c->object = NULL;
459 }
460 
461 
462 int
client_set_ketama_points(struct client * c,int ketama_points)463 client_set_ketama_points(struct client *c, int ketama_points)
464 {
465   /* Should be called before we added any server.  */
466   if (! array_empty(c->servers) || ketama_points < 0)
467     return MEMCACHED_FAILURE;
468 
469   dispatch_set_ketama_points(&c->dispatch, ketama_points);
470 
471   return MEMCACHED_SUCCESS;
472 }
473 
474 
475 void
client_set_connect_timeout(struct client * c,int to)476 client_set_connect_timeout(struct client *c, int to)
477 {
478   c->connect_timeout = (to > 0 ? to : -1);
479 }
480 
481 
482 void
client_set_io_timeout(struct client * c,int to)483 client_set_io_timeout(struct client *c, int to)
484 {
485   c->io_timeout = (to > 0 ? to : -1);
486 }
487 
488 
489 void
client_set_max_failures(struct client * c,int f)490 client_set_max_failures(struct client *c, int f)
491 {
492   c->max_failures = f;
493 }
494 
495 
496 void
client_set_failure_timeout(struct client * c,int to)497 client_set_failure_timeout(struct client *c, int to)
498 {
499   c->failure_timeout = to;
500 }
501 
502 
503 void
client_set_close_on_error(struct client * c,int enable)504 client_set_close_on_error(struct client *c, int enable)
505 {
506   c->close_on_error = enable;
507 }
508 
509 
510 void
client_set_nowait(struct client * c,int enable)511 client_set_nowait(struct client *c, int enable)
512 {
513   c->nowait = enable;
514 }
515 
516 
517 void
client_set_hash_namespace(struct client * c,int enable)518 client_set_hash_namespace(struct client *c, int enable)
519 {
520   c->hash_namespace = enable;
521 }
522 
523 
524 int
client_add_server(struct client * c,const char * host,size_t host_len,const char * port,size_t port_len,double weight,int noreply)525 client_add_server(struct client *c, const char *host, size_t host_len,
526                   const char *port, size_t port_len, double weight,
527                   int noreply)
528 {
529   int res;
530 
531   if (weight <= 0.0)
532     return MEMCACHED_FAILURE;
533 
534   if (array_extend(c->pollfds, struct pollfd, 1, ARRAY_EXTEND_EXACT) == -1)
535     return MEMCACHED_FAILURE;
536 
537   if (array_extend(c->servers, struct server, 1, ARRAY_EXTEND_EXACT) == -1)
538     return MEMCACHED_FAILURE;
539 
540   res = server_init(array_end(c->servers, struct server), c,
541                     host, host_len, port, port_len, noreply);
542   if (res != MEMCACHED_SUCCESS)
543     return res;
544 
545   res = dispatch_add_server(&c->dispatch, host, host_len, port, port_len,
546                             weight, array_size(c->servers));
547   if (res == -1)
548     return MEMCACHED_FAILURE;
549 
550   array_push(c->pollfds);
551   array_push(c->servers);
552 
553   return MEMCACHED_SUCCESS;
554 }
555 
556 
557 int
client_set_prefix(struct client * c,const char * ns,size_t ns_len)558 client_set_prefix(struct client *c, const char *ns, size_t ns_len)
559 {
560   char *s;
561 
562   if (ns_len == 0)
563     {
564       if (c->prefix_len > 1)
565         {
566           free(c->prefix);
567           c->prefix = " ";
568           c->prefix_len = 1;
569         }
570 
571       if (c->hash_namespace)
572         dispatch_set_prefix(&c->dispatch, "", 0);
573 
574       return MEMCACHED_SUCCESS;
575     }
576 
577   if (c->prefix_len == 1)
578     c->prefix = NULL;
579   s = (char *) realloc(c->prefix, 1 + ns_len + 1);
580   if (! s)
581     return MEMCACHED_FAILURE;
582 
583   s[0] = ' ';
584   memcpy(s + 1, ns, ns_len);
585   s[ns_len + 1] = '\0';
586 
587   c->prefix = s;
588   c->prefix_len = 1 + ns_len;
589 
590   if (c->hash_namespace)
591     dispatch_set_prefix(&c->dispatch, ns, ns_len);
592 
593   return MEMCACHED_SUCCESS;
594 }
595 
596 
597 const char *
client_get_prefix(struct client * c,size_t * ns_len)598 client_get_prefix(struct client *c, size_t *ns_len)
599 {
600   *ns_len = c->prefix_len - 1;
601 
602   return (c->prefix + 1);
603 }
604 
605 
/*
  read() from FD into BUF (up to SIZE bytes), retrying while the call
  is interrupted by a signal (EINTR).  Returns read()'s result.
*/
static inline
ssize_t
read_restart(int fd, void *buf, size_t size)
{
  for (;;)
    {
      ssize_t got = read(fd, buf, size);

      if (got != -1 || errno != EINTR)
        return got;
    }
}
618 
619 
/*
  readv() from FD into the COUNT iovecs at IOV, retrying while the call
  is interrupted by a signal (EINTR).  Returns readv()'s result.
*/
static inline
ssize_t
readv_restart(int fd, const struct iovec *iov, int count)
{
  for (;;)
    {
      ssize_t got = readv(fd, iov, count);

      if (got != -1 || errno != EINTR)
        return got;
    }
}
632 
633 
#ifndef MSG_NOSIGNAL

/*
  writev() the COUNT iovecs at IOV to FD, retrying while the call is
  interrupted by a signal (EINTR).  Returns writev()'s result.
*/
static inline
ssize_t
writev_restart(int fd, const struct iovec *iov, int count)
{
  for (;;)
    {
      ssize_t written = writev(fd, iov, count);

      if (written != -1 || errno != EINTR)
        return written;
    }
}

#else  /* MSG_NOSIGNAL */

/*
  Same contract as the writev() variant, implemented with sendmsg() and
  MSG_NOSIGNAL.  Writing to a connection closed by the peer then fails
  with EPIPE instead of raising SIGPIPE.  Retries on EINTR.
*/
static inline
ssize_t
writev_restart(int fd, const struct iovec *iov, int count)
{
  struct msghdr hdr;

  memset(&hdr, 0, sizeof(hdr));
  /* msg_iov is not const-qualified, but sendmsg() only reads it.  */
  hdr.msg_iov = (struct iovec *) iov;
  hdr.msg_iovlen = count;

  for (;;)
    {
      ssize_t sent = sendmsg(fd, &hdr, MSG_NOSIGNAL);

      if (sent != -1 || errno != EINTR)
        return sent;
    }
}

#endif /* MSG_NOSIGNAL */
670 
671 
/*
  parse_key() assumes that one key definitely matches.

  Match the key at state->pos in a reply line against the requested
  keys, starting at state->key.  Requested keys that do not match are
  dropped, together with their result indices (next_index()).  On
  return, state->pos is at the space following the key, state->index
  holds the matched key's result index, and state->key, key_count and
  index_head have moved past the matched key.

  Requested keys are two iovecs apart (state->key += 2); presumably
  each key iovec is followed by a separator iovec -- confirm.  The
  scans below are unbounded and rely on a ' ' following the key in the
  buffered line.
*/
static
int
parse_key(struct command_state *state)
{
  char *key_pos;

  /*
    Skip over the prefix.  prefix_len counts the prefix's leading
    space, which the caller is expected to have skipped already.
  */
  state->pos += state->client->prefix_len - 1;

  key_pos = (char *) state->key->iov_base;
  while (state->key_count > 1)
    {
      char *key_end, *prefix_key;
      size_t prefix_len;

      /* Advance over the part shared by the reply and the current key.  */
      key_end = (char *) state->key->iov_base + state->key->iov_len;
      while (key_pos != key_end && *state->pos == *key_pos)
        {
          ++key_pos;
          ++state->pos;
        }

      /* Whole key matched and the reply key ends here: found it.  */
      if (key_pos == key_end && *state->pos == ' ')
        break;

      /*
        Mismatch: drop requested keys until one starts with the part
        already matched, or only the last key is left.
      */
      prefix_key = (char *) state->key->iov_base;
      prefix_len = key_pos - prefix_key;
      /*
        TODO: Below it might be faster to compare the tail of the key
        before comparing the head.
      */
      do
        {
          next_index(state);
          state->key += 2;
        }
      while (--state->key_count > 1
             && (state->key->iov_len < prefix_len
                 || memcmp(state->key->iov_base,
                           prefix_key, prefix_len) != 0));

      /* Resume comparing right after the already-matched part.  */
      key_pos = (char *) state->key->iov_base + prefix_len;
    }

  /* The last remaining key is assumed to match; skip to its end.  */
  if (state->key_count == 1)
    {
      while (*state->pos != ' ')
        ++state->pos;
    }

  /* Consume the matched key and take its result index.  */
  --state->key_count;
  state->key += 2;
  state->index = get_index(state);
  next_index(state);

  return MEMCACHED_SUCCESS;
}
732 
733 
734 static
735 int
read_value(struct command_state * state)736 read_value(struct command_state *state)
737 {
738   value_size_type size;
739   size_t remains;
740 
741   size = state->end - state->pos;
742   if (size > state->u.value.size)
743     size = state->u.value.size;
744   if (size > 0)
745     {
746       memcpy(state->u.value.ptr, state->pos, size);
747       state->u.value.size -= size;
748       state->u.value.ptr = (char *) state->u.value.ptr + size;
749       state->pos += size;
750     }
751 
752   remains = state->end - state->pos;
753   if (remains < sizeof(eol))
754     {
755       struct iovec iov[2], *piov;
756 
757       state->pos = memmove(state->buf, state->pos, remains);
758       state->end = state->buf + remains;
759 
760       iov[0].iov_base = state->u.value.ptr;
761       iov[0].iov_len = state->u.value.size;
762       iov[1].iov_base = state->end;
763       iov[1].iov_len = REPLY_BUF_SIZE - remains;
764       piov = &iov[state->u.value.size > 0 ? 0 : 1];
765 
766       do
767         {
768           ssize_t res;
769 
770           res = readv_restart(state->fd, piov, iov + 2 - piov);
771           if (res <= 0)
772             {
773               state->u.value.ptr = iov[0].iov_base;
774               state->u.value.size = iov[0].iov_len;
775               state->end = iov[1].iov_base;
776 
777               if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK))
778                 return MEMCACHED_EAGAIN;
779 
780               state->object->free(state->u.value.opaque);
781               return MEMCACHED_CLOSED;
782             }
783 
784           if ((size_t) res >= piov->iov_len)
785             {
786               piov->iov_base = (char *) piov->iov_base + piov->iov_len;
787               res -= piov->iov_len;
788               piov->iov_len = 0;
789               ++piov;
790             }
791 
792           piov->iov_len -= res;
793           piov->iov_base = (char *) piov->iov_base + res;
794         }
795       while ((size_t) ((char *) iov[1].iov_base - state->pos) < sizeof(eol));
796 
797       state->end = iov[1].iov_base;
798     }
799 
800   if (memcmp(state->pos, eol, sizeof(eol)) != 0)
801     {
802       state->object->free(state->u.value.opaque);
803       return MEMCACHED_UNKNOWN;
804     }
805   state->pos += sizeof(eol);
806   state->eol = state->pos;
807 
808   state->object->store(state->object->arg, state->u.value.opaque,
809                        state->index, &state->u.value.meta);
810 
811   return MEMCACHED_SUCCESS;
812 }
813 
814 
815 static inline
816 int
swallow_eol(struct command_state * state,int skip,int done)817 swallow_eol(struct command_state *state, int skip, int done)
818 {
819   if (! skip && state->eol - state->pos != sizeof(eol))
820     return MEMCACHED_UNKNOWN;
821 
822   state->pos = state->eol;
823 
824   if (done)
825     state->phase = PHASE_DONE;
826 
827   return MEMCACHED_SUCCESS;
828 }
829 
830 
831 static
832 int
parse_ull(struct command_state * state,unsigned long long * result)833 parse_ull(struct command_state *state, unsigned long long *result)
834 {
835   unsigned long long res = 0;
836   const char *beg;
837 
838   while (*state->pos == ' ')
839     ++state->pos;
840 
841   beg = state->pos;
842 
843   while (1)
844     {
845       switch (*state->pos)
846         {
847         case '0': case '1': case '2': case '3': case '4':
848         case '5': case '6': case '7': case '8': case '9':
849           res = res * 10 + (*state->pos - '0');
850           ++state->pos;
851           break;
852 
853         default:
854           *result = res;
855           return (beg != state->pos ? MEMCACHED_SUCCESS : MEMCACHED_UNKNOWN);
856         }
857     }
858 }
859 
860 
861 static
862 int
parse_get_reply(struct command_state * state)863 parse_get_reply(struct command_state *state)
864 {
865   unsigned long long num;
866   int res;
867 
868   switch (state->match)
869     {
870     case MATCH_END:
871       return swallow_eol(state, 0, 1);
872 
873     default:
874       return MEMCACHED_UNKNOWN;
875 
876     case MATCH_VALUE:
877       break;
878     }
879 
880   while (*state->pos == ' ')
881     ++state->pos;
882 
883   res = parse_key(state);
884   if (res != MEMCACHED_SUCCESS)
885     return res;
886 
887   res = parse_ull(state, &num);
888   if (res != MEMCACHED_SUCCESS)
889     return res;
890   state->u.value.meta.flags = num;
891 
892   res = parse_ull(state, &num);
893   if (res != MEMCACHED_SUCCESS)
894     return res;
895   state->u.value.size = num;
896 
897   if (state->u.value.meta.use_cas)
898     {
899       res = parse_ull(state, &num);
900       if (res != MEMCACHED_SUCCESS)
901         return res;
902       state->u.value.meta.cas = num;
903     }
904 
905   res = swallow_eol(state, 0, 0);
906   if (res != MEMCACHED_SUCCESS)
907     return res;
908 
909   state->u.value.ptr = state->object->alloc(state->u.value.size,
910                                             &state->u.value.opaque);
911   if (! state->u.value.ptr)
912     return MEMCACHED_FAILURE;
913 
914   state->phase = PHASE_VALUE;
915 
916   return MEMCACHED_SUCCESS;
917 }
918 
919 
920 static inline
921 void
store_result(struct command_state * state,int res)922 store_result(struct command_state *state, int res)
923 {
924   int index = get_index(state);
925   next_index(state);
926   state->object->store(state->object->arg, (void *) (ptrdiff_t) res,
927                        index, NULL);
928 }
929 
930 
931 static
932 int
parse_set_reply(struct command_state * state)933 parse_set_reply(struct command_state *state)
934 {
935   switch (state->match)
936     {
937     case MATCH_STORED:
938       store_result(state, 1);
939       break;
940 
941     case MATCH_NOT_STORED:
942     case MATCH_NOT_FOUND:
943     case MATCH_EXISTS:
944       store_result(state, 0);
945       break;
946 
947     default:
948       return MEMCACHED_UNKNOWN;
949     }
950 
951   return swallow_eol(state, 0, 1);
952 }
953 
954 
955 static
956 int
parse_delete_reply(struct command_state * state)957 parse_delete_reply(struct command_state *state)
958 {
959   switch (state->match)
960     {
961     case MATCH_DELETED:
962       store_result(state, 1);
963       break;
964 
965     case MATCH_NOT_FOUND:
966       store_result(state, 0);
967       break;
968 
969     default:
970       return MEMCACHED_UNKNOWN;
971     }
972 
973   return swallow_eol(state, 0, 1);
974 }
975 
976 
977 static
978 int
parse_touch_reply(struct command_state * state)979 parse_touch_reply(struct command_state *state)
980 {
981   switch (state->match)
982     {
983     case MATCH_TOUCHED:
984       store_result(state, 1);
985       break;
986 
987     case MATCH_NOT_FOUND:
988       store_result(state, 0);
989       break;
990 
991     default:
992       return MEMCACHED_UNKNOWN;
993     }
994 
995   return swallow_eol(state, 0, 1);
996 }
997 
998 
999 
1000 static
1001 int
parse_arith_reply(struct command_state * state)1002 parse_arith_reply(struct command_state *state)
1003 {
1004   char *beg;
1005   size_t len;
1006   int zero;
1007 
1008   state->index = get_index(state);
1009   next_index(state);
1010 
1011   switch (state->match)
1012     {
1013     case MATCH_NOT_FOUND:
1014       /* On NOT_FOUND we store the defined empty string.  */
1015       state->u.embedded.ptr =
1016         state->object->alloc(0, &state->u.embedded.opaque);
1017       if (! state->u.embedded.ptr)
1018         return MEMCACHED_FAILURE;
1019 
1020       state->object->store(state->object->arg, state->u.embedded.opaque,
1021                            state->index, NULL);
1022 
1023       return swallow_eol(state, 0, 1);
1024 
1025     default:
1026       return MEMCACHED_UNKNOWN;
1027 
1028     case MATCH_0: case MATCH_1: case MATCH_2: case MATCH_3: case MATCH_4:
1029     case MATCH_5: case MATCH_6: case MATCH_7: case MATCH_8: case MATCH_9:
1030       break;
1031     }
1032 
1033   beg = state->pos - 1;
1034   len = 0;
1035   while (len == 0)
1036     {
1037       switch (*state->pos)
1038         {
1039         case '0': case '1': case '2': case '3': case '4':
1040         case '5': case '6': case '7': case '8': case '9':
1041           ++state->pos;
1042           break;
1043 
1044         default:
1045           len = state->pos - beg;
1046           break;
1047         }
1048     }
1049 
1050   zero = (*beg == '0' && len == 1);
1051   if (zero)
1052     len = 3;
1053 
1054   state->u.embedded.ptr = state->object->alloc(len, &state->u.embedded.opaque);
1055   if (! state->u.embedded.ptr)
1056     return MEMCACHED_FAILURE;
1057 
1058   if (! zero)
1059     memcpy(state->u.embedded.ptr, beg, len);
1060   else
1061     memcpy(state->u.embedded.ptr, "0E0", 3);
1062 
1063   state->object->store(state->object->arg, state->u.embedded.opaque,
1064                        state->index, NULL);
1065 
1066   /* Value may be space padded.  */
1067   return swallow_eol(state, 1, 1);
1068 }
1069 
1070 
1071 static
1072 int
parse_ok_reply(struct command_state * state)1073 parse_ok_reply(struct command_state *state)
1074 {
1075   switch (state->match)
1076     {
1077     case MATCH_OK:
1078       store_result(state, 1);
1079       return swallow_eol(state, 0, 1);
1080 
1081     default:
1082       return MEMCACHED_UNKNOWN;
1083     }
1084 }
1085 
1086 
1087 static
1088 int
parse_version_reply(struct command_state * state)1089 parse_version_reply(struct command_state *state)
1090 {
1091   const char *beg;
1092   size_t len;
1093   int res;
1094 
1095   state->index = get_index(state);
1096   next_index(state);
1097 
1098   switch (state->match)
1099     {
1100     default:
1101       return MEMCACHED_UNKNOWN;
1102 
1103     case MATCH_VERSION:
1104       break;
1105     }
1106 
1107   while (*state->pos == ' ')
1108     ++state->pos;
1109 
1110   beg = state->pos;
1111 
1112   res = swallow_eol(state, 1, 1);
1113   if (res != MEMCACHED_SUCCESS)
1114     return res;
1115 
1116   len = state->pos - sizeof(eol) - beg;
1117 
1118   state->u.embedded.ptr = state->object->alloc(len, &state->u.embedded.opaque);
1119   if (! state->u.embedded.ptr)
1120     return MEMCACHED_FAILURE;
1121 
1122   memcpy(state->u.embedded.ptr, beg, len);
1123 
1124   state->object->store(state->object->arg, state->u.embedded.opaque,
1125                        state->index, NULL);
1126 
1127   return MEMCACHED_SUCCESS;
1128 }
1129 
1130 
1131 static
1132 int
parse_nowait_reply(struct command_state * state)1133 parse_nowait_reply(struct command_state *state)
1134 {
1135   int res;
1136 
1137   /*
1138     Cast to enum parse_keyword_e to get compiler warning when some
1139     match result is not handled.
1140   */
1141   switch ((enum parse_keyword_e) state->match)
1142     {
1143     case MATCH_DELETED:
1144     case MATCH_OK:
1145     case MATCH_STORED:
1146     case MATCH_EXISTS:
1147     case MATCH_NOT_FOUND:
1148     case MATCH_NOT_STORED:
1149     case MATCH_TOUCHED:
1150       return swallow_eol(state, 0, 1);
1151 
1152     case MATCH_0: case MATCH_1: case MATCH_2: case MATCH_3: case MATCH_4:
1153     case MATCH_5: case MATCH_6: case MATCH_7: case MATCH_8: case MATCH_9:
1154     case MATCH_VERSION: /* see client_noreply_push().  */
1155       return swallow_eol(state, 1, 1);
1156 
1157     case MATCH_ERROR:
1158       res = swallow_eol(state, 0, 1);
1159       return (res == MEMCACHED_SUCCESS ? MEMCACHED_ERROR : res);
1160 
1161     case MATCH_CLIENT_ERROR:
1162     case MATCH_SERVER_ERROR:
1163       res = swallow_eol(state, 1, 1);
1164       return (res == MEMCACHED_SUCCESS ? MEMCACHED_ERROR : res);
1165 
1166     case NO_MATCH:
1167     case MATCH_VALUE:
1168     case MATCH_END:
1169     case MATCH_STAT:
1170       return MEMCACHED_UNKNOWN;
1171     }
1172 
1173   /* Never reach here.  */
1174   return MEMCACHED_UNKNOWN;
1175 }
1176 
1177 
1178 static
1179 void
client_mark_failed(struct client * c,struct server * s)1180 client_mark_failed(struct client *c, struct server *s)
1181 {
1182   if (s->cmd_state.fd != -1)
1183     {
1184       close(s->cmd_state.fd);
1185       s->cmd_state.fd = -1;
1186       s->cmd_state.nowait_count = 0;
1187       s->cmd_state.pos = s->cmd_state.end = s->cmd_state.eol =
1188         s->cmd_state.buf;
1189     }
1190 
1191   if (c->max_failures > 0)
1192     {
1193       time_t now = time(NULL);
1194       if (s->failure_expires < now)
1195         s->failure_count = 0;
1196       ++s->failure_count;
1197       /*
1198         Set timeout on first failure, and on max_failures.  The idea
1199         is that if max_failures had happened during failure_timeout,
1200         we do not retry in another failure_timeout seconds.  This is
1201         not entirely true: we remember the time of the first failure,
1202         but for exact accounting we would have to keep time of each
1203         failure.  However such exact measurement is not necessary.
1204       */
1205       if (s->failure_count == 1 || s->failure_count == c->max_failures)
1206         s->failure_expires = now + c->failure_timeout;
1207     }
1208 }
1209 
1210 
/*
  Write as much of STATE's pending request as the socket accepts.

  STATE->iov points at the first iovec not yet completely written,
  STATE->iov_count is the number of iovecs left, and
  STATE->write_offset is how many bytes of *STATE->iov were already
  sent by an earlier call.  At most client->iov_max iovecs are handed
  to a single writev_restart() call.

  Returns:
    MEMCACHED_SUCCESS - everything was sent; STATE is deactivated when
                        no replies are expected (reply_count == 0).
    MEMCACHED_EAGAIN  - the socket would block; call again later.
    MEMCACHED_CLOSED  - the write failed or wrote nothing; STATE has
                        been deactivated and server S marked failed.
*/
static
int
send_request(struct command_state *state, struct server *s)
{
  while (state->iov_count > 0)
    {
      int count;
      ssize_t res;
      size_t len;

      /* Never pass more than iov_max vectors to one writev.  */
      count = (state->iov_count < state->client->iov_max
               ? state->iov_count : state->client->iov_max);

      /*
        Temporarily hide the already-sent prefix of the first iovec so
        the write resumes where the previous one stopped.  LEN is the
        number of bytes still to send from that iovec.
      */
      state->iov->iov_base =
        (char *) state->iov->iov_base + state->write_offset;
      state->iov->iov_len -= state->write_offset;
      len = state->iov->iov_len;

      res = writev_restart(state->fd, state->iov, count);

      /* Undo the adjustment so the iovec keeps its original contents.  */
      state->iov->iov_base =
        (char *) state->iov->iov_base - state->write_offset;
      state->iov->iov_len += state->write_offset;

      if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK))
        return MEMCACHED_EAGAIN;
      if (res <= 0)
        {
          deactivate(state);
          client_mark_failed(state->client, s);

          return MEMCACHED_CLOSED;
        }

      /*
        Skip every iovec that was written completely.  Afterwards RES is
        the number of bytes written from the new first iovec, which is
        added to write_offset.
        NOTE(review): when the final iovec completes, the loop breaks
        before write_offset is reset, so it may keep a stale value;
        presumably command_state_reset() clears it before the next
        request.  Confirm.
      */
      while ((size_t) res >= len)
        {
          res -= len;
          ++state->iov;
          if (--state->iov_count == 0)
            break;
          len = state->iov->iov_len;
          state->write_offset = 0;
        }
      state->write_offset += res;
    }

  /* Nothing to read back: this state is finished for this command.  */
  if (state->reply_count == 0)
    deactivate(state);

  return MEMCACHED_SUCCESS;
}
1262 
1263 
/*
  Make sure STATE's reply buffer holds one complete line, reading from
  STATE->fd as needed.

  Buffer layout: [buf, end) holds received bytes (at most
  REPLY_BUF_SIZE).  Parsing resumes at pos.  Data up to eol has already
  been scanned for the line terminator.

  Returns:
    MEMCACHED_SUCCESS - a '\n' preceded by '\r' was found; eol now
                        points just past the '\n'.
    MEMCACHED_EAGAIN  - the socket would block; call again later.
    MEMCACHED_CLOSED  - read_restart() returned 0 or failed.
    MEMCACHED_UNKNOWN - the line does not fit into REPLY_BUF_SIZE, or
                        the '\n' is not preceded by '\r'.
*/
static
int
receive_reply(struct command_state *state)
{
  /* Continue the search for '\n' (last byte of eol[]) where we stopped.  */
  while (state->eol != state->end && *state->eol != eol[sizeof(eol) - 1])
    ++state->eol;

  /*
    When buffer is empty, move to the beginning of it for better CPU
    cache utilization.
  */
  if (state->pos == state->end)
    state->pos = state->end = state->eol = state->buf;

  /* Keep reading until a '\n' appears in [eol, end).  */
  while (state->eol == state->end)
    {
      size_t size;
      ssize_t res;

      size = REPLY_BUF_SIZE - (state->end - state->buf);
      if (size == 0)
        {
          /*
            Buffer is full.  If a prefix was already consumed, slide the
            unconsumed bytes [pos, end) to the front and move end and eol
            back by the same distance (pos - buf).  Otherwise the current
            line is longer than the whole buffer.
          */
          if (state->pos != state->buf)
            {
              size_t len = state->end - state->pos;
              state->pos = memmove(state->buf, state->pos, len);
              state->end -= REPLY_BUF_SIZE - len;
              state->eol -= REPLY_BUF_SIZE - len;
              size = REPLY_BUF_SIZE - len;
            }
          else
            {
              return MEMCACHED_UNKNOWN;
            }
        }

      res = read_restart(state->fd, state->end, size);
      if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK))
        return MEMCACHED_EAGAIN;
      /* Presumably read_restart() wraps read(): 0 means EOF.  */
      if (res <= 0)
        return MEMCACHED_CLOSED;

      state->end += res;

      while (state->eol != state->end && *state->eol != eol[sizeof(eol) - 1])
        ++state->eol;
    }

  /*
    The '\n' must be preceded by '\r'.
    NOTE(review): the bounds check is against buf rather than pos.  For a
    bare "\n" line starting at pos, the byte examined belongs to
    already-consumed data.  Confirm whether that case can matter.
  */
  if ((size_t) (state->eol - state->buf) < sizeof(eol) - 1
      || memcmp(state->eol - (sizeof(eol) - 1), eol, sizeof(eol) - 1) != 0)
    return MEMCACHED_UNKNOWN;

  /* Step past the '\n': the line is [pos, eol).  */
  ++state->eol;

  return MEMCACHED_SUCCESS;
}
1320 
1321 
1322 static
1323 int
parse_reply(struct command_state * state)1324 parse_reply(struct command_state *state)
1325 {
1326   int res, skip;
1327 
1328   switch (state->match)
1329     {
1330     case MATCH_ERROR:
1331     case MATCH_CLIENT_ERROR:
1332     case MATCH_SERVER_ERROR:
1333       skip = (state->match != MATCH_ERROR);
1334       res = swallow_eol(state, skip, 1);
1335 
1336       return (res == MEMCACHED_SUCCESS ? MEMCACHED_ERROR : res);
1337 
1338     default:
1339       if (state->nowait_count)
1340         return parse_nowait_reply(state);
1341       else
1342         return state->parse_reply(state);
1343 
1344     case NO_MATCH:
1345       return MEMCACHED_UNKNOWN;
1346     }
1347 }
1348 
1349 
/*
  Drive STATE's reply state machine for server S until it needs more
  input, fails, or every expected reply has been handled.

  Phases:
    PHASE_RECEIVE - read one line (receive_reply()) and classify its
                    first keyword with parse_keyword().
    PHASE_PARSE   - hand the line to parse_reply(), which may move on to
                    another phase (e.g. PHASE_VALUE) or to PHASE_DONE.
    PHASE_VALUE   - read value data with read_value(), then go back to
                    PHASE_RECEIVE.
    PHASE_DONE    - one complete reply has been processed.

  After each complete reply, a pending nowait reply (nowait_count) is
  accounted first.  Otherwise reply_count is decremented, and the
  function returns once it reaches zero, deactivating STATE if nothing
  is left to send.

  On MEMCACHED_UNKNOWN or MEMCACHED_CLOSED, STATE is deactivated and S
  marked failed.  MEMCACHED_ERROR gets the same treatment when
  client->close_on_error or state->noreply is set; otherwise it counts
  as a completed reply.  MEMCACHED_EAGAIN is returned unchanged.
*/
static
int
process_reply(struct command_state *state, struct server *s)
{
  int res = 0;

  while (1)
    {
      switch (state->phase)
        {
        case PHASE_RECEIVE:
          res = receive_reply(state);
          if (res != MEMCACHED_SUCCESS)
            break;

          state->match = parse_keyword(&state->pos);

          state->phase = PHASE_PARSE;

          /* Fall into below.  */

        case PHASE_PARSE:
          res = parse_reply(state);
          if (res != MEMCACHED_SUCCESS)
            break;

          /* The parser chose the next phase: restart the outer loop.  */
          if (state->phase != PHASE_DONE)
            continue;

          /* Fall into below.  */

        case PHASE_DONE:
          res = MEMCACHED_SUCCESS;

          break;

        case PHASE_VALUE:
          res = read_value(state);
          if (res != MEMCACHED_SUCCESS)
            break;

          state->phase = PHASE_RECEIVE;
          continue;
        }

      switch (res)
        {
        case MEMCACHED_ERROR:
          /*
            A plain error reply is tolerated unless configured otherwise.
            Presumably in noreply mode the stream can't be resynchronised
            after an error.  Confirm.
          */
          if (! (state->client->close_on_error || state->noreply))
            break;

          /* else fall into below.  */

        case MEMCACHED_UNKNOWN:
        case MEMCACHED_CLOSED:
          deactivate(state);
          client_mark_failed(state->client, s);

          /* Fall into below.  */

        case MEMCACHED_EAGAIN:
          return res;
        }

      /* One reply finished: account for it.  */
      if (state->nowait_count > 0)
        {
          --state->nowait_count;
        }
      else if (--state->reply_count == 0)
        {
          if (state->iov_count == 0)
            deactivate(state);

          return res;
        }

      state->phase = PHASE_RECEIVE;
    }
}
1429 
1430 
1431 static inline
1432 void
state_prepare(struct command_state * state,int key_index)1433 state_prepare(struct command_state *state, int key_index)
1434 {
1435   state->last_cmd_noreply = state->prepared_last_cmd_noreply;
1436   state->nowait_count += state->prepared_nowait_count;
1437 
1438   state->key = array_elem(state->iov_buf, struct iovec, key_index);
1439   state->iov = array_beg(state->iov_buf, struct iovec);
1440   state->iov_count = array_size(state->iov_buf);
1441 
1442   if (state->str_step > 0)
1443     {
1444       struct iovec *iov = state->iov;
1445       char *buf = array_beg(state->client->str_buf, char);
1446       int count = state->iov_count, step = state->str_step;
1447 
1448       if (state->key_count > 0)
1449         {
1450           iov += 3;
1451           count -= 3;
1452         }
1453 
1454       while (count > 0)
1455         {
1456           iov->iov_base = (void *) (buf + (ptrdiff_t) (iov->iov_base));
1457           iov += step;
1458           count -= step;
1459         }
1460     }
1461 }
1462 
1463 
/*
  Send every prepared request and process the replies, multiplexing all
  active server connections with poll().

  First iteration (no polling yet):
  - Every active state is prepared with state_prepare(KEY_INDEX).
  - A write is attempted.
  - A read is attempted when replies or nowait replies are expected.

  Later iterations act on the revents of each state's pollfd.  The loop
  ends when no state needs more I/O.  It also ends when poll() fails or
  times out after c->io_timeout ms, in which case every still-active
  server is marked failed.

  Where MSG_NOSIGNAL is unavailable (and not on WIN32), SIGPIPE is
  ignored for the duration of the call, so that writing to a connection
  closed by the peer doesn't terminate the process.  The previous
  disposition is restored afterwards.

  Returns MEMCACHED_FAILURE only if SIGPIPE could not be ignored,
  otherwise MEMCACHED_SUCCESS.  Per-request outcomes are not reflected
  in the return value.
*/
int
client_execute(struct client *c, int key_index)
{
  int first_iter = 1;

#if ! defined(MSG_NOSIGNAL) && ! defined(WIN32)
  struct sigaction orig, ignore;
  int res;

  ignore.sa_handler = SIG_IGN;
  sigemptyset(&ignore.sa_mask);
  ignore.sa_flags = 0;
  res = sigaction(SIGPIPE, &ignore, &orig);
  if (res == -1)
    return MEMCACHED_FAILURE;
#endif /* ! defined(MSG_NOSIGNAL) && ! defined(WIN32) */

  while (1)
    {
      struct server *s;
      struct pollfd *pollfd_beg, *pollfd;
      int res;

      pollfd_beg = array_beg(c->pollfds, struct pollfd);
      pollfd = pollfd_beg;

      for (array_each(c->servers, struct server, s))
        {
          int may_write, may_read;
          struct command_state *state = &s->cmd_state;

          /* Skip states that have no work for the current command.  */
          if (! is_active(state))
            continue;

          if (first_iter)
            {
              state_prepare(state, key_index);

              may_write = 1;
              may_read = (state->reply_count > 0
                          || state->nowait_count > 0);
            }
          else
            {
              const short revents = state->pollfd->revents;

              /* Errors/hangups are reported through the I/O calls.  */
              may_write = revents & (POLLOUT | POLLERR | POLLHUP);
              may_read = revents & (POLLIN | POLLERR | POLLHUP);
            }

          if (may_read || may_write)
            {
              if (may_write)
                {
                  int res;

                  res = send_request(state, s);
                  /*
                    send_request() already deactivated the state and
                    marked the server failed: don't try to read.
                  */
                  if (res == MEMCACHED_CLOSED)
                    may_read = 0;
                }

              if (may_read)
                process_reply(state, s);

              /* Finished or failed: don't poll this state again.  */
              if (! is_active(state))
                continue;
            }

          /*
            Poll only for what is still pending: iovecs left to write
            and/or replies still expected.
          */
          pollfd->events = 0;

          if (state->iov_count > 0)
            pollfd->events |= POLLOUT;
          if (state->reply_count > 0 || state->nowait_count > 0)
            pollfd->events |= POLLIN;

          if (pollfd->events != 0)
            {
              pollfd->fd = state->fd;
              state->pollfd = pollfd;
              ++pollfd;
            }
        }

      /* No descriptor left to wait on: all done.  */
      if (pollfd == pollfd_beg)
        break;

      /* Retry on EINTR; each retry waits the full io_timeout again.  */
      do
        res = poll(pollfd_beg, pollfd - pollfd_beg, c->io_timeout);
      while (res == -1 && errno == EINTR);

      /*
        On error or timeout close all active connections.  Otherwise
        we might receive garbage on them later.
      */
      if (res <= 0)
        {
          for (array_each(c->servers, struct server, s))
            {
              struct command_state *state = &s->cmd_state;

              if (is_active(state))
                {
                  /*
                    Ugly fix for possible memory leak.  FIXME:
                    requires redesign.  A value being read (PHASE_VALUE)
                    has its opaque storage released through the result
                    object's free callback.
                  */
                  if (state->phase == PHASE_VALUE)
                    state->object->free(state->u.value.opaque);

                  client_mark_failed(c, s);
                }
            }

          break;
        }

      first_iter = 0;
    }

#if ! defined(MSG_NOSIGNAL) && ! defined(WIN32)
  /*
    Ignore return value of sigaction(), there's nothing we can do in
    the case of error.
  */
  sigaction(SIGPIPE, &orig, NULL);
#endif /* ! defined(MSG_NOSIGNAL) && ! defined(WIN32) */

  return MEMCACHED_SUCCESS;
}
1593 
1594 
/*
  The setsockopt() calls below use IPPROTO_TCP as the option level for
  TCP_NODELAY.  Where only SOL_TCP (an alternative name for that level)
  is defined, use it instead.  It's unclear whether any supported
  platform actually needs this fallback.
*/
#if (! defined(IPPROTO_TCP) && defined(SOL_TCP))
#define IPPROTO_TCP  SOL_TCP
#endif
1599 
1600 
1601 static inline
1602 void
tcp_optimize_latency(struct command_state * state)1603 tcp_optimize_latency(struct command_state *state)
1604 {
1605 #ifdef TCP_NODELAY
1606   if (state->socket_mode == TCP_THROUGHPUT)
1607     {
1608       static const int enable = 1;
1609       setsockopt(state->fd, IPPROTO_TCP, TCP_NODELAY,
1610                  (void *) &enable, sizeof(enable));
1611       state->socket_mode = TCP_LATENCY;
1612     }
1613 #endif /* TCP_NODELAY */
1614 }
1615 
1616 
1617 static inline
1618 void
tcp_optimize_throughput(struct command_state * state)1619 tcp_optimize_throughput(struct command_state *state)
1620 {
1621 #ifdef TCP_NODELAY
1622   if (state->socket_mode == TCP_LATENCY)
1623     {
1624       static const int disable = 0;
1625       setsockopt(state->fd, IPPROTO_TCP, TCP_NODELAY,
1626                  (void *) &disable, sizeof(disable));
1627       state->socket_mode = TCP_THROUGHPUT;
1628     }
1629 #endif /* TCP_NODELAY */
1630 }
1631 
1632 
1633 static
1634 int
get_server_fd(struct client * c,struct server * s)1635 get_server_fd(struct client *c, struct server *s)
1636 {
1637   struct command_state *state;
1638 
1639   /*
1640     Do not try to try reconnect if had max_failures and
1641     failure_expires time is not reached yet.
1642   */
1643   if (c->max_failures > 0 && s->failure_count >= c->max_failures)
1644     {
1645       if (time(NULL) <= s->failure_expires)
1646         return -1;
1647       else
1648         s->failure_count = 0;
1649     }
1650 
1651   state = &s->cmd_state;
1652   if (state->fd == -1)
1653     {
1654       if (s->port)
1655         {
1656           state->fd = client_connect_inet(s->host, s->port,
1657                                           c->connect_timeout);
1658           /* This is to trigger actual reset.  */
1659           state->socket_mode = TCP_THROUGHPUT;
1660           if (state->fd != -1)
1661             tcp_optimize_latency(state);
1662         }
1663       else
1664         {
1665           state->fd = client_connect_unix(s->host, s->host_len);
1666           state->socket_mode = NOT_TCP;
1667         }
1668     }
1669 
1670   if (state->fd == -1)
1671     client_mark_failed(c, s);
1672 
1673   return state->fd;
1674 }
1675 
1676 
1677 static inline
1678 void
iov_push(struct command_state * state,const void * buf,size_t buf_size)1679 iov_push(struct command_state *state, const void *buf, size_t buf_size)
1680 {
1681   struct iovec *iov = array_end(state->iov_buf, struct iovec);
1682   iov->iov_base = (void *) buf;
1683   iov->iov_len = buf_size;
1684   array_push(state->iov_buf);
1685 }
1686 
1687 
1688 static
1689 int
push_index(struct command_state * state,int index)1690 push_index(struct command_state *state, int index)
1691 {
1692   struct index_node *node;
1693   struct client *c;
1694 
1695   c = state->client;
1696   if (array_extend(c->index_list, struct index_node,
1697                    1, ARRAY_EXTEND_TWICE) == -1)
1698     return MEMCACHED_FAILURE;
1699 
1700   if (state->index_tail != -1)
1701     array_elem(c->index_list, struct index_node, state->index_tail)->next =
1702       array_size(c->index_list);
1703   else
1704     state->index_head = array_size(c->index_list);
1705 
1706   state->index_tail = array_size(c->index_list);
1707 
1708   node = array_elem(c->index_list, struct index_node, state->index_tail);
1709   node->index = index;
1710   node->next = -1;
1711 
1712   array_push(c->index_list);
1713 
1714   return MEMCACHED_SUCCESS;
1715 }
1716 
1717 
1718 static
1719 struct command_state *
init_state(struct command_state * state,int index,size_t request_size,size_t str_size,parse_reply_func parse_reply)1720 init_state(struct command_state *state, int index, size_t request_size,
1721            size_t str_size, parse_reply_func parse_reply)
1722 {
1723   if (! is_active(state))
1724     {
1725       if (state->client->noreply)
1726         {
1727           if (state->client->nowait || state->noreply)
1728             {
1729               parse_reply = NULL;
1730               tcp_optimize_throughput(state);
1731             }
1732 
1733           state->prepared_last_cmd_noreply = state->noreply;
1734         }
1735       else
1736         {
1737           state->prepared_last_cmd_noreply = 0;
1738           tcp_optimize_latency(state);
1739         }
1740 
1741       state->object = state->client->object;
1742       command_state_reset(state, (str_size > 0 ? request_size : 0),
1743                           parse_reply);
1744     }
1745 
1746   if (array_extend(state->iov_buf, struct iovec,
1747                    request_size, ARRAY_EXTEND_EXACT) == -1)
1748     {
1749       deactivate(state);
1750       return NULL;
1751     }
1752 
1753   if (str_size > 0
1754       && array_extend(state->client->str_buf, char,
1755                       str_size, ARRAY_EXTEND_TWICE) == -1)
1756     {
1757       deactivate(state);
1758       return NULL;
1759     }
1760 
1761   if (push_index(state, index) != MEMCACHED_SUCCESS)
1762     {
1763       deactivate(state);
1764       return NULL;
1765     }
1766 
1767   if (state->parse_reply)
1768     ++state->reply_count;
1769   else if (! state->prepared_last_cmd_noreply)
1770     ++state->prepared_nowait_count;
1771 
1772   return state;
1773 }
1774 
1775 
1776 static
1777 struct command_state *
get_state(struct client * c,int index,const char * key,size_t key_len,size_t request_size,size_t str_size,parse_reply_func parse_reply)1778 get_state(struct client *c, int index, const char *key, size_t key_len,
1779           size_t request_size, size_t str_size,
1780           parse_reply_func parse_reply)
1781 {
1782   struct server *s;
1783   int server_index, fd;
1784 
1785   server_index = dispatch_key(&c->dispatch, key, key_len);
1786   if (server_index == -1)
1787     return NULL;
1788 
1789   s = array_elem(c->servers, struct server, server_index);
1790 
1791   fd = get_server_fd(c, s);
1792   if (fd == -1)
1793     return NULL;
1794 
1795   return init_state(&s->cmd_state, index, request_size, str_size,
1796                     parse_reply);
1797 }
1798 
1799 
1800 static inline
1801 const char *
get_noreply(struct command_state * state)1802 get_noreply(struct command_state *state)
1803 {
1804   if (state->noreply && state->client->noreply)
1805     return " " NOREPLY;
1806   else
1807     return "";
1808 }
1809 
1810 
1811 inline
1812 void
client_reset(struct client * c,struct result_object * o,int noreply)1813 client_reset(struct client *c, struct result_object *o, int noreply)
1814 {
1815   array_clear(c->index_list);
1816   array_clear(c->str_buf);
1817 
1818   ++c->generation;
1819   c->object = o;
1820   c->noreply = noreply;
1821 }
1822 
1823 
/*
  Expand a string literal into the two arguments "pointer, length"
  (length excluding the terminating NUL), e.g. for iov_push().  STR must
  be an array such as a string literal, not a pointer, because sizeof
  is applied to it.
*/
#define STR_WITH_LEN(str) (str), (sizeof(str) - 1)
1825 
1826 
1827 int
client_prepare_set(struct client * c,enum set_cmd_e cmd,int key_index,const char * key,size_t key_len,flags_type flags,exptime_type exptime,const void * value,value_size_type value_size)1828 client_prepare_set(struct client *c, enum set_cmd_e cmd, int key_index,
1829                    const char *key, size_t key_len,
1830                    flags_type flags, exptime_type exptime,
1831                    const void *value, value_size_type value_size)
1832 {
1833   static const size_t request_size = 6;
1834   static const size_t str_size =
1835     sizeof(" " FLAGS_STUB " " EXPTIME_STUB " " VALUE_SIZE_STUB
1836            " " NOREPLY "\r\n");
1837 
1838   struct command_state *state;
1839 
1840   state = get_state(c, key_index, key, key_len, request_size, str_size,
1841                     parse_set_reply);
1842   if (! state)
1843     return MEMCACHED_FAILURE;
1844 
1845   ++state->key_count;
1846 
1847   switch (cmd)
1848     {
1849     case CMD_SET:
1850       iov_push(state, STR_WITH_LEN("set"));
1851       break;
1852 
1853     case CMD_ADD:
1854       iov_push(state, STR_WITH_LEN("add"));
1855       break;
1856 
1857     case CMD_REPLACE:
1858       iov_push(state, STR_WITH_LEN("replace"));
1859       break;
1860 
1861     case CMD_APPEND:
1862       iov_push(state, STR_WITH_LEN("append"));
1863       break;
1864 
1865     case CMD_PREPEND:
1866       iov_push(state, STR_WITH_LEN("prepend"));
1867       break;
1868 
1869     case CMD_CAS:
1870       /* This can't happen.  */
1871       return MEMCACHED_FAILURE;
1872     }
1873   iov_push(state, c->prefix, c->prefix_len);
1874   iov_push(state, key, key_len);
1875 
1876   {
1877     char *buf = array_end(c->str_buf, char);
1878     size_t str_size =
1879       sprintf(buf, " " FMT_FLAGS " " FMT_EXPTIME " " FMT_VALUE_SIZE "%s\r\n",
1880               flags, exptime, value_size, get_noreply(state));
1881     iov_push(state, (void *) (ptrdiff_t) array_size(c->str_buf), str_size);
1882     array_append(c->str_buf, str_size);
1883   }
1884 
1885   iov_push(state, value, value_size);
1886   iov_push(state, STR_WITH_LEN("\r\n"));
1887 
1888   return MEMCACHED_SUCCESS;
1889 }
1890 
1891 
1892 int
client_prepare_cas(struct client * c,int key_index,const char * key,size_t key_len,cas_type cas,flags_type flags,exptime_type exptime,const void * value,value_size_type value_size)1893 client_prepare_cas(struct client *c, int key_index,
1894                    const char *key, size_t key_len,
1895                    cas_type cas, flags_type flags, exptime_type exptime,
1896                    const void *value, value_size_type value_size)
1897 {
1898   static const size_t request_size = 6;
1899   static const size_t str_size =
1900     sizeof(" " FLAGS_STUB " " EXPTIME_STUB " " VALUE_SIZE_STUB
1901            " " CAS_STUB " " NOREPLY "\r\n");
1902 
1903   struct command_state *state;
1904 
1905   state = get_state(c, key_index, key, key_len, request_size, str_size,
1906                     parse_set_reply);
1907   if (! state)
1908     return MEMCACHED_FAILURE;
1909 
1910   ++state->key_count;
1911 
1912   iov_push(state, STR_WITH_LEN("cas"));
1913   iov_push(state, c->prefix, c->prefix_len);
1914   iov_push(state, key, key_len);
1915 
1916   {
1917     char *buf = array_end(c->str_buf, char);
1918     size_t str_size =
1919       sprintf(buf, " " FMT_FLAGS " " FMT_EXPTIME " " FMT_VALUE_SIZE
1920               " " FMT_CAS "%s\r\n", flags, exptime, value_size, cas,
1921               get_noreply(state));
1922     iov_push(state, (void *) (ptrdiff_t) array_size(c->str_buf), str_size);
1923     array_append(c->str_buf, str_size);
1924   }
1925 
1926   iov_push(state, value, value_size);
1927   iov_push(state, STR_WITH_LEN("\r\n"));
1928 
1929   return MEMCACHED_SUCCESS;
1930 }
1931 
1932 
1933 int
client_prepare_get(struct client * c,enum get_cmd_e cmd,int key_index,const char * key,size_t key_len)1934 client_prepare_get(struct client *c, enum get_cmd_e cmd, int key_index,
1935                    const char *key, size_t key_len)
1936 {
1937   static const size_t request_size = 4;
1938 
1939   struct command_state *state;
1940 
1941   state = get_state(c, key_index, key, key_len, request_size, 0,
1942                     parse_get_reply);
1943   if (! state)
1944     return MEMCACHED_FAILURE;
1945 
1946   ++state->key_count;
1947 
1948   if (! array_empty(state->iov_buf))
1949     {
1950       /* Pop off trailing \r\n because we are about to add another key.  */
1951       array_pop(state->iov_buf);
1952 
1953       /* get can't be in noreply mode, so reply_count is positive.  */
1954       --state->reply_count;
1955     }
1956   else
1957     {
1958       switch (cmd)
1959         {
1960         case CMD_GET:
1961           state->u.value.meta.use_cas = 0;
1962           iov_push(state, STR_WITH_LEN("get"));
1963           break;
1964 
1965         case CMD_GETS:
1966           state->u.value.meta.use_cas = 1;
1967           iov_push(state, STR_WITH_LEN("gets"));
1968           break;
1969         }
1970     }
1971 
1972   iov_push(state, c->prefix, c->prefix_len);
1973   iov_push(state, key, key_len);
1974   iov_push(state, STR_WITH_LEN("\r\n"));
1975 
1976   return MEMCACHED_SUCCESS;
1977 }
1978 
1979 
1980 int
client_prepare_gat(struct client * c,enum gat_cmd_e cmd,int key_index,const char * key,size_t key_len,const char * exptime,size_t exptime_len)1981 client_prepare_gat(struct client *c, enum gat_cmd_e cmd,
1982                    int key_index, const char *key, size_t key_len, const char *exptime, size_t exptime_len)
1983 {
1984   static const size_t request_size = 6;
1985 
1986   struct command_state *state;
1987 
1988   state = get_state(c, key_index, key, key_len, request_size, 0,
1989                     parse_get_reply);
1990   if (! state)
1991     return MEMCACHED_FAILURE;
1992 
1993   ++state->key_count;
1994 
1995   if (! array_empty(state->iov_buf))
1996     {
1997       /* Pop off trailing \r\n because we are about to add another key.  */
1998       array_pop(state->iov_buf);
1999 
2000       /* get can't be in noreply mode, so reply_count is positive.  */
2001       --state->reply_count;
2002     }
2003   else
2004     {
2005       switch (cmd)
2006         {
2007         case CMD_GAT:
2008           state->u.value.meta.use_cas = 0;
2009           iov_push(state, STR_WITH_LEN("gat"));
2010           break;
2011 
2012         case CMD_GATS:
2013           state->u.value.meta.use_cas = 1;
2014           iov_push(state, STR_WITH_LEN("gats"));
2015           break;
2016         }
2017 
2018       iov_push(state, STR_WITH_LEN(" "));
2019       iov_push(state, exptime, exptime_len);
2020     }
2021 
2022   iov_push(state, c->prefix, c->prefix_len);
2023   iov_push(state, key, key_len);
2024   iov_push(state, STR_WITH_LEN("\r\n"));
2025 
2026   return MEMCACHED_SUCCESS;
2027 }
2028 
2029 
2030 int
client_prepare_incr(struct client * c,enum arith_cmd_e cmd,int key_index,const char * key,size_t key_len,arith_type arg)2031 client_prepare_incr(struct client *c, enum arith_cmd_e cmd, int key_index,
2032                     const char *key, size_t key_len, arith_type arg)
2033 {
2034   static const size_t request_size = 4;
2035   static const size_t str_size = sizeof(" " ARITH_STUB " " NOREPLY "\r\n");
2036 
2037   struct command_state *state;
2038 
2039   state = get_state(c, key_index, key, key_len, request_size, str_size,
2040                     parse_arith_reply);
2041   if (! state)
2042     return MEMCACHED_FAILURE;
2043 
2044   ++state->key_count;
2045 
2046   switch (cmd)
2047     {
2048     case CMD_INCR:
2049       iov_push(state, STR_WITH_LEN("incr"));
2050       break;
2051 
2052     case CMD_DECR:
2053       iov_push(state, STR_WITH_LEN("decr"));
2054       break;
2055     }
2056   iov_push(state, c->prefix, c->prefix_len);
2057   iov_push(state, key, key_len);
2058 
2059   {
2060     char *buf = array_end(c->str_buf, char);
2061     size_t str_size =
2062       sprintf(buf, " " FMT_ARITH "%s\r\n", arg, get_noreply(state));
2063     iov_push(state, (void *) (ptrdiff_t) array_size(c->str_buf), str_size);
2064     array_append(c->str_buf, str_size);
2065   }
2066 
2067   return MEMCACHED_SUCCESS;
2068 }
2069 
2070 
2071 int
client_prepare_delete(struct client * c,int key_index,const char * key,size_t key_len)2072 client_prepare_delete(struct client *c, int key_index,
2073                       const char *key, size_t key_len)
2074 {
2075   static const size_t request_size = 4;
2076   static const size_t str_size = sizeof(" " NOREPLY "\r\n");
2077 
2078   struct command_state *state;
2079 
2080   state = get_state(c, key_index, key, key_len, request_size, str_size,
2081                     parse_delete_reply);
2082   if (! state)
2083     return MEMCACHED_FAILURE;
2084 
2085   ++state->key_count;
2086 
2087   iov_push(state, STR_WITH_LEN("delete"));
2088   iov_push(state, c->prefix, c->prefix_len);
2089   iov_push(state, key, key_len);
2090 
2091   {
2092     char *buf = array_end(c->str_buf, char);
2093     size_t str_size = sprintf(buf, "%s\r\n", get_noreply(state));
2094     iov_push(state, (void *) (ptrdiff_t) array_size(c->str_buf), str_size);
2095     array_append(c->str_buf, str_size);
2096   }
2097 
2098   return MEMCACHED_SUCCESS;
2099 }
2100 
2101 
2102 int
client_prepare_touch(struct client * c,int key_index,const char * key,size_t key_len,exptime_type exptime)2103 client_prepare_touch(struct client *c, int key_index,
2104                       const char *key, size_t key_len,
2105                       exptime_type exptime)
2106 {
2107   static const size_t request_size = 4;
2108   static const size_t str_size = sizeof(" " EXPTIME_STUB " " NOREPLY "\r\n");
2109 
2110   struct command_state *state;
2111 
2112   state = get_state(c, key_index, key, key_len, request_size, str_size,
2113                     parse_touch_reply);
2114   if (! state)
2115     return MEMCACHED_FAILURE;
2116 
2117   ++state->key_count;
2118 
2119   iov_push(state, STR_WITH_LEN("touch"));
2120   iov_push(state, c->prefix, c->prefix_len);
2121   iov_push(state, key, key_len);
2122 
2123   {
2124     char *buf = array_end(c->str_buf, char);
2125     size_t str_size = sprintf(buf, " " FMT_EXPTIME "%s\r\n", exptime, get_noreply(state));
2126     iov_push(state, (void *) (ptrdiff_t) array_size(c->str_buf), str_size);
2127     array_append(c->str_buf, str_size);
2128   }
2129 
2130   return MEMCACHED_SUCCESS;
2131 }
2132 
2133 
2134 int
client_flush_all(struct client * c,delay_type delay,struct result_object * o,int noreply)2135 client_flush_all(struct client *c, delay_type delay,
2136                  struct result_object *o, int noreply)
2137 {
2138   static const size_t request_size = 1;
2139   static const size_t str_size =
2140     sizeof("flush_all " DELAY_STUB " " NOREPLY "\r\n");
2141 
2142   struct server *s;
2143   double ddelay = delay, delay_step = 0.0;
2144   int i;
2145 
2146   client_reset(c, o, noreply);
2147 
2148   if (array_size(c->servers) > 1)
2149     delay_step = ddelay / (array_size(c->servers) - 1);
2150   ddelay += delay_step;
2151 
2152   for (i = 0, array_each(c->servers, struct server, s), ++i)
2153     {
2154       struct command_state *state;
2155       int fd;
2156 
2157       ddelay -= delay_step;
2158 
2159       fd = get_server_fd(c, s);
2160       if (fd == -1)
2161         continue;
2162 
2163       state = init_state(&s->cmd_state, i, request_size, str_size,
2164                          parse_ok_reply);
2165       if (! state)
2166         continue;
2167 
2168       {
2169         char *buf = array_end(c->str_buf, char);
2170         size_t str_size =
2171           sprintf(buf, "flush_all " FMT_DELAY "%s\r\n",
2172                   (delay_type) (ddelay + 0.5), get_noreply(state));
2173         iov_push(state, (void *) (ptrdiff_t) array_size(c->str_buf), str_size);
2174         array_append(c->str_buf, str_size);
2175       }
2176     }
2177 
2178   return client_execute(c, 2);
2179 }
2180 
2181 
2182 int
client_nowait_push(struct client * c)2183 client_nowait_push(struct client *c)
2184 {
2185   struct server *s;
2186 
2187   if (! c->nowait)
2188     return MEMCACHED_SUCCESS;
2189 
2190   client_reset(c, NULL, 0);
2191 
2192   for (array_each(c->servers, struct server, s))
2193     {
2194       struct command_state *state;
2195       int fd;
2196 
2197       state = &s->cmd_state;
2198       if (state->nowait_count == 0)
2199         continue;
2200 
2201       fd = get_server_fd(c, s);
2202       if (fd == -1)
2203         continue;
2204 
2205       /*
2206         In order to wait the final pending reply we pretend that one
2207         command was never a nowait command, and set parse function to
2208         parse_nowait_reply.
2209       */
2210       --state->nowait_count;
2211       command_state_reset(state, 0, parse_nowait_reply);
2212       tcp_optimize_latency(state);
2213       ++state->reply_count;
2214     }
2215 
2216   return client_execute(c, 2);
2217 }
2218 
2219 
2220 int
client_server_versions(struct client * c,struct result_object * o)2221 client_server_versions(struct client *c, struct result_object *o)
2222 {
2223   static const size_t request_size = 1;
2224 
2225   struct server *s;
2226   int i;
2227 
2228   client_reset(c, o, 0);
2229 
2230   for (i = 0, array_each(c->servers, struct server, s), ++i)
2231     {
2232       struct command_state *state;
2233       int fd;
2234 
2235       fd = get_server_fd(c, s);
2236       if (fd == -1)
2237         continue;
2238 
2239       state = init_state(&s->cmd_state, i, request_size, 0,
2240                          parse_version_reply);
2241       if (! state)
2242         continue;
2243 
2244       iov_push(state, STR_WITH_LEN("version\r\n"));
2245     }
2246 
2247   return client_execute(c, 2);
2248 }
2249 
2250 
2251 /*
2252   When noreply mode is enabled the client may send the last noreply
2253   request and close the connection.  The server will see that the
2254   connection is closed, and will discard all previously read data
2255   without processing it.  To avoid this, we send "version" command and
2256   wait for the reply (discarding it).
2257 */
2258 static
2259 int
client_noreply_push(struct client * c)2260 client_noreply_push(struct client *c)
2261 {
2262   static const size_t request_size = 1;
2263 
2264   struct server *s;
2265   int i;
2266 
2267   client_reset(c, NULL, 0);
2268 
2269   for (i = 0, array_each(c->servers, struct server, s), ++i)
2270     {
2271       struct command_state *state = &s->cmd_state;
2272       int fd;
2273 
2274       if (! state->last_cmd_noreply)
2275         continue;
2276 
2277       fd = get_server_fd(c, s);
2278       if (fd == -1)
2279         continue;
2280 
2281       state = init_state(state, i, request_size, 0, parse_nowait_reply);
2282       if (! state)
2283         continue;
2284 
2285       iov_push(state, STR_WITH_LEN("version\r\n"));
2286     }
2287 
2288   return client_execute(c, 2);
2289 }
2290