1 /*
2  * File:   ms_conn.c
3  * Author: Mingqiang Zhuang
4  *
5  * Created on February 10, 2009
6  *
7  * (c) Copyright 2009, Schooner Information Technology, Inc.
8  * http://www.schoonerinfotech.com/
9  *
10  */
11 
12 #include "mem_config.h"
13 
14 #include <stdio.h>
15 #include <inttypes.h>
16 #include <limits.h>
17 #include <sys/uio.h>
18 #include <event.h>
19 #include <fcntl.h>
20 #include <netinet/tcp.h>
21 #include <netinet/in.h>
22 #include <arpa/inet.h>
23 
24 #if defined(HAVE_SYS_TIME_H)
25 # include <sys/time.h>
26 #endif
27 
28 #if defined(HAVE_TIME_H)
29 # include <time.h>
30 #endif
31 
32 #include "ms_setting.h"
33 #include "ms_thread.h"
34 #include "ms_atomic.h"
35 
36 #ifdef linux
37 /* /usr/include/netinet/in.h defines macros from ntohs() to _bswap_nn to
38  * optimize the conversion functions, but the prototypes generate warnings
 * from gcc. The conversion methods aren't the bottleneck for my app, so
40  * just remove the warnings by undef'ing the optimization ..
41  */
42 #undef ntohs
43 #undef ntohl
44 #undef htons
45 #undef htonl
46 #endif
47 
48 /* for network write */
49 #define TRANSMIT_COMPLETE      0
50 #define TRANSMIT_INCOMPLETE    1
51 #define TRANSMIT_SOFT_ERROR    2
52 #define TRANSMIT_HARD_ERROR    3
53 
54 /* for generating key */
55 #define KEY_PREFIX_BASE        0x1010101010101010 /* not include ' ' '\r' '\n' '\0' */
56 #define KEY_PREFIX_MASK        0x1010101010101010
57 
/* For parsing the value length returned by the server */
59 #define KEY_TOKEN              1
60 #define VALUELEN_TOKEN         3
61 
/* global increasing counter, ensures every key prefix is unique */
63 static uint64_t key_prefix_seq= KEY_PREFIX_BASE;
64 
65 /* global increasing counter, generating request id for UDP */
66 static volatile uint32_t udp_request_id= 0;
67 
68 extern pthread_key_t ms_thread_key;
69 
/* generate udp request id */
71 static uint32_t ms_get_udp_request_id(void);
72 
73 
74 /* connect initialize */
75 static void ms_task_init(ms_conn_t *c);
76 static int ms_conn_udp_init(ms_conn_t *c, const bool is_udp);
77 static int ms_conn_sock_init(ms_conn_t *c);
78 static int ms_conn_event_init(ms_conn_t *c);
79 static int ms_conn_init(ms_conn_t *c,
80                         const int init_state,
81                         const int read_buffer_size,
82                         const bool is_udp);
83 static void ms_warmup_num_init(ms_conn_t *c);
84 static int ms_item_win_init(ms_conn_t *c);
85 
86 
87 /* connection close */
88 void ms_conn_free(ms_conn_t *c);
89 static void ms_conn_close(ms_conn_t *c);
90 
91 
92 /* create network connection */
93 static int ms_new_socket(struct addrinfo *ai);
94 static void ms_maximize_sndbuf(const int sfd);
95 static int ms_network_connect(ms_conn_t *c,
96                               char *srv_host_name,
97                               const int srv_port,
98                               const bool is_udp,
99                               int *ret_sfd);
100 static int ms_reconn(ms_conn_t *c);
101 
102 
103 /* read and parse */
104 static int ms_tokenize_command(char *command,
105                                token_t *tokens,
106                                const int max_tokens);
107 static int ms_ascii_process_line(ms_conn_t *c, char *command);
108 static int ms_try_read_line(ms_conn_t *c);
109 static int ms_sort_udp_packet(ms_conn_t *c, char *buf, int rbytes);
110 static int ms_udp_read(ms_conn_t *c, char *buf, int len);
111 static int ms_try_read_network(ms_conn_t *c);
112 static void ms_verify_value(ms_conn_t *c,
113                             ms_mlget_task_item_t *mlget_item,
114                             char *value,
115                             int vlen);
116 static void ms_ascii_complete_nread(ms_conn_t *c);
117 static void ms_bin_complete_nread(ms_conn_t *c);
118 static void ms_complete_nread(ms_conn_t *c);
119 
120 
121 /* send functions */
122 static int ms_add_msghdr(ms_conn_t *c);
123 static int ms_ensure_iov_space(ms_conn_t *c);
124 static int ms_add_iov(ms_conn_t *c, const void *buf, int len);
125 static int ms_build_udp_headers(ms_conn_t *c);
126 static int ms_transmit(ms_conn_t *c);
127 
128 
129 /* status adjustment */
130 static void ms_conn_shrink(ms_conn_t *c);
131 static void ms_conn_set_state(ms_conn_t *c, int state);
132 static bool ms_update_event(ms_conn_t *c, const int new_flags);
133 static uint32_t ms_get_rep_sock_index(ms_conn_t *c, int cmd);
134 static uint32_t ms_get_next_sock_index(ms_conn_t *c);
135 static int ms_update_conn_sock_event(ms_conn_t *c);
136 static bool ms_need_yield(ms_conn_t *c);
137 static void ms_update_start_time(ms_conn_t *c);
138 
139 
140 /* main loop */
141 static void ms_drive_machine(ms_conn_t *c);
142 void ms_event_handler(const int fd, const short which, void *arg);
143 
144 
145 /* ascii protocol */
146 static int ms_build_ascii_write_buf_set(ms_conn_t *c, ms_task_item_t *item);
147 static int ms_build_ascii_write_buf_get(ms_conn_t *c, ms_task_item_t *item);
148 static int ms_build_ascii_write_buf_mlget(ms_conn_t *c);
149 
150 
151 /* binary protocol */
152 static int ms_bin_process_response(ms_conn_t *c);
153 static void ms_add_bin_header(ms_conn_t *c,
154                               uint8_t opcode,
155                               uint8_t hdr_len,
156                               uint16_t key_len,
157                               uint32_t body_len);
158 static void ms_add_key_to_iov(ms_conn_t *c, ms_task_item_t *item);
159 static int ms_build_bin_write_buf_set(ms_conn_t *c, ms_task_item_t *item);
160 static int ms_build_bin_write_buf_get(ms_conn_t *c, ms_task_item_t *item);
161 static int ms_build_bin_write_buf_mlget(ms_conn_t *c);
162 
163 
164 /**
165  * each key has two parts, prefix and suffix. The suffix is a
166  * string random get form the character table. The prefix is a
167  * uint64_t variable. And the prefix must be unique. we use the
168  * prefix to identify a key. And the prefix can't include
169  * character ' ' '\r' '\n' '\0'.
170  *
171  * @return uint64_t
172  */
ms_get_key_prefix(void)173 uint64_t ms_get_key_prefix(void)
174 {
175   uint64_t key_prefix;
176 
177   pthread_mutex_lock(&ms_global.seq_mutex);
178   key_prefix_seq|= KEY_PREFIX_MASK;
179   key_prefix= key_prefix_seq;
180   key_prefix_seq++;
181   pthread_mutex_unlock(&ms_global.seq_mutex);
182 
183   return key_prefix;
184 } /* ms_get_key_prefix */
185 
186 
187 /**
188  * get an unique udp request id
189  *
190  * @return an unique UDP request id
191  */
ms_get_udp_request_id(void)192 static uint32_t ms_get_udp_request_id(void)
193 {
194   return atomic_add_32_nv(&udp_request_id, 1);
195 }
196 
197 
198 /**
199  * initialize current task structure
200  *
201  * @param c, pointer of the concurrency
202  */
ms_task_init(ms_conn_t * c)203 static void ms_task_init(ms_conn_t *c)
204 {
205   c->curr_task.cmd= CMD_NULL;
206   c->curr_task.item= 0;
207   c->curr_task.verify= false;
208   c->curr_task.finish_verify= true;
209   c->curr_task.get_miss= true;
210 
211   c->curr_task.get_opt= 0;
212   c->curr_task.set_opt= 0;
213   c->curr_task.cycle_undo_get= 0;
214   c->curr_task.cycle_undo_set= 0;
215   c->curr_task.verified_get= 0;
216   c->curr_task.overwrite_set= 0;
217 } /* ms_task_init */
218 
219 
220 /**
221  * initialize udp for the connection structure
222  *
223  * @param c, pointer of the concurrency
224  * @param is_udp, whether it's udp
225  *
226  * @return int, if success, return EXIT_SUCCESS, else return -1
227  */
ms_conn_udp_init(ms_conn_t * c,const bool is_udp)228 static int ms_conn_udp_init(ms_conn_t *c, const bool is_udp)
229 {
230   c->hdrbuf= 0;
231   c->rudpbuf= 0;
232   c->udppkt= 0;
233 
234   c->rudpsize= UDP_DATA_BUFFER_SIZE;
235   c->hdrsize= 0;
236 
237   c->rudpbytes= 0;
238   c->packets= 0;
239   c->recvpkt= 0;
240   c->pktcurr= 0;
241   c->ordcurr= 0;
242 
243   c->udp= is_udp;
244 
245   if (c->udp || (! c->udp && ms_setting.facebook_test))
246   {
247     c->rudpbuf= (char *)malloc((size_t)c->rudpsize);
248     c->udppkt= (ms_udppkt_t *)malloc(MAX_UDP_PACKET * sizeof(ms_udppkt_t));
249 
250     if ((c->rudpbuf == NULL) || (c->udppkt == NULL))
251     {
252       if (c->rudpbuf != NULL)
253         free(c->rudpbuf);
254       if (c->udppkt != NULL)
255         free(c->udppkt);
256       fprintf(stderr, "malloc()\n");
257       return -1;
258     }
259     memset(c->udppkt, 0, MAX_UDP_PACKET * sizeof(ms_udppkt_t));
260   }
261 
262   return EXIT_SUCCESS;
263 } /* ms_conn_udp_init */
264 
265 
266 /**
267  * initialize the connection structure
268  *
269  * @param c, pointer of the concurrency
270  * @param init_state, (conn_read, conn_write, conn_closing)
271  * @param read_buffer_size
272  * @param is_udp, whether it's udp
273  *
274  * @return int, if success, return EXIT_SUCCESS, else return -1
275  */
ms_conn_init(ms_conn_t * c,const int init_state,const int read_buffer_size,const bool is_udp)276 static int ms_conn_init(ms_conn_t *c,
277                         const int init_state,
278                         const int read_buffer_size,
279                         const bool is_udp)
280 {
281   assert(c != NULL);
282 
283   c->rbuf= c->wbuf= 0;
284   c->iov= 0;
285   c->msglist= 0;
286 
287   c->rsize= read_buffer_size;
288   c->wsize= WRITE_BUFFER_SIZE;
289   c->iovsize= IOV_LIST_INITIAL;
290   c->msgsize= MSG_LIST_INITIAL;
291 
292   /* for replication, each connection need connect all the server */
293   if (ms_setting.rep_write_srv > 0)
294   {
295     c->total_sfds= ms_setting.srv_cnt * ms_setting.sock_per_conn;
296   }
297   else
298   {
299     c->total_sfds= ms_setting.sock_per_conn;
300   }
301   c->alive_sfds= 0;
302 
303   c->rbuf= (char *)malloc((size_t)c->rsize);
304   c->wbuf= (char *)malloc((size_t)c->wsize);
305   c->iov= (struct iovec *)malloc(sizeof(struct iovec) * (size_t)c->iovsize);
306   c->msglist= (struct msghdr *)malloc(
307     sizeof(struct msghdr) * (size_t)c->msgsize);
308   if (ms_setting.mult_key_num > 1)
309   {
310     c->mlget_task.mlget_item= (ms_mlget_task_item_t *)
311                               malloc(
312       sizeof(ms_mlget_task_item_t) * (size_t)ms_setting.mult_key_num);
313   }
314   c->tcpsfd= (int *)malloc((size_t)c->total_sfds * sizeof(int));
315 
316   if ((c->rbuf == NULL) || (c->wbuf == NULL) || (c->iov == NULL)
317       || (c->msglist == NULL) || (c->tcpsfd == NULL)
318       || ((ms_setting.mult_key_num > 1)
319           && (c->mlget_task.mlget_item == NULL)))
320   {
321     if (c->rbuf != NULL)
322       free(c->rbuf);
323     if (c->wbuf != NULL)
324       free(c->wbuf);
325     if (c->iov != NULL)
326       free(c->iov);
327     if (c->msglist != NULL)
328       free(c->msglist);
329     if (c->mlget_task.mlget_item != NULL)
330       free(c->mlget_task.mlget_item);
331     if (c->tcpsfd != NULL)
332       free(c->tcpsfd);
333     fprintf(stderr, "malloc()\n");
334     return -1;
335   }
336 
337   c->state= init_state;
338   c->rvbytes= 0;
339   c->rbytes= 0;
340   c->rcurr= c->rbuf;
341   c->wcurr= c->wbuf;
342   c->iovused= 0;
343   c->msgcurr= 0;
344   c->msgused= 0;
345   c->cur_idx= c->total_sfds;       /* default index is a invalid value */
346 
347   c->ctnwrite= false;
348   c->readval= false;
349   c->change_sfd= false;
350 
351   c->precmd.cmd= c->currcmd.cmd= CMD_NULL;
352   c->precmd.isfinish= true;         /* default the previous command finished */
353   c->currcmd.isfinish= false;
354   c->precmd.retstat= c->currcmd.retstat= MCD_FAILURE;
355   c->precmd.key_prefix= c->currcmd.key_prefix= 0;
356 
357   c->mlget_task.mlget_num= 0;
358   c->mlget_task.value_index= -1;         /* default invalid value */
359 
360   if (ms_setting.binary_prot_)
361   {
362     c->protocol= binary_prot;
363   }
364   else
365   {
366     c->protocol= ascii_prot;
367   }
368 
369   /* initialize udp */
370   if (ms_conn_udp_init(c, is_udp) != 0)
371   {
372     return -1;
373   }
374 
375   /* initialize task */
376   ms_task_init(c);
377 
378   if (! (ms_setting.facebook_test && is_udp))
379   {
380     atomic_add_32(&ms_stats.active_conns, 1);
381   }
382 
383   return EXIT_SUCCESS;
384 } /* ms_conn_init */
385 
386 
387 /**
388  * when doing 100% get operation, it could preset some objects
389  * to warmup the server. this function is used to initialize the
390  * number of the objects to preset.
391  *
392  * @param c, pointer of the concurrency
393  */
ms_warmup_num_init(ms_conn_t * c)394 static void ms_warmup_num_init(ms_conn_t *c)
395 {
396   /* no set operation, preset all the items in the window  */
397   if (ms_setting.cmd_distr[CMD_SET].cmd_prop < PROP_ERROR)
398   {
399     c->warmup_num= c->win_size;
400     c->remain_warmup_num= c->warmup_num;
401   }
402   else
403   {
404     c->warmup_num= 0;
405     c->remain_warmup_num= c->warmup_num;
406   }
407 } /* ms_warmup_num_init */
408 
409 
410 /**
411  * each connection has an item window, this function initialize
412  * the window. The window is used to generate task.
413  *
414  * @param c, pointer of the concurrency
415  *
416  * @return int, if success, return EXIT_SUCCESS, else return -1
417  */
ms_item_win_init(ms_conn_t * c)418 static int ms_item_win_init(ms_conn_t *c)
419 {
420   ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
421   int exp_cnt= 0;
422 
423   c->win_size= (int)ms_setting.win_size;
424   c->set_cursor= 0;
425   c->exec_num= ms_thread->thread_ctx->exec_num_perconn;
426   c->remain_exec_num= c->exec_num;
427 
428   c->item_win= (ms_task_item_t *)malloc(
429     sizeof(ms_task_item_t) * (size_t)c->win_size);
430   if (c->item_win == NULL)
431   {
432     fprintf(stderr, "Can't allocate task item array for conn.\n");
433     return -1;
434   }
435   memset(c->item_win, 0, sizeof(ms_task_item_t) * (size_t)c->win_size);
436 
437   for (int i= 0; i < c->win_size; i++)
438   {
439     c->item_win[i].key_size= (int)ms_setting.distr[i].key_size;
440     c->item_win[i].key_prefix= ms_get_key_prefix();
441     c->item_win[i].key_suffix_offset= ms_setting.distr[i].key_offset;
442     c->item_win[i].value_size= (int)ms_setting.distr[i].value_size;
443     c->item_win[i].value_offset= INVALID_OFFSET;         /* default in invalid offset */
444     c->item_win[i].client_time= 0;
445 
446     /* set expire time base on the proportion */
447     if (exp_cnt < ms_setting.exp_ver_per * i)
448     {
449       c->item_win[i].exp_time= FIXED_EXPIRE_TIME;
450       exp_cnt++;
451     }
452     else
453     {
454       c->item_win[i].exp_time= 0;
455     }
456   }
457 
458   ms_warmup_num_init(c);
459 
460   return EXIT_SUCCESS;
461 } /* ms_item_win_init */
462 
463 
464 /**
465  * each connection structure can include one or more sock
466  * handlers. this function create these socks and connect the
467  * server(s).
468  *
469  * @param c, pointer of the concurrency
470  *
471  * @return int, if success, return EXIT_SUCCESS, else return -1
472  */
ms_conn_sock_init(ms_conn_t * c)473 static int ms_conn_sock_init(ms_conn_t *c)
474 {
475   ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
476   uint32_t i;
477   int ret_sfd;
478   uint32_t srv_idx= 0;
479 
480   assert(c != NULL);
481   assert(c->tcpsfd != NULL);
482 
483   for (i= 0; i < c->total_sfds; i++)
484   {
485     ret_sfd= 0;
486     if (ms_setting.rep_write_srv > 0)
487     {
488       /* for replication, each connection need connect all the server */
489       srv_idx= i % ms_setting.srv_cnt;
490     }
491     else
492     {
493       /* all the connections in a thread connects the same server */
494       srv_idx= ms_thread->thread_ctx->srv_idx;
495     }
496 
497     if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name,
498                            ms_setting.servers[srv_idx].srv_port,
499                            ms_setting.udp, &ret_sfd) != 0)
500     {
501       break;
502     }
503 
504     if (i == 0)
505     {
506       c->sfd= ret_sfd;
507     }
508 
509     if (! ms_setting.udp)
510     {
511       c->tcpsfd[i]= ret_sfd;
512     }
513 
514     c->alive_sfds++;
515   }
516 
517   /* initialize udp sock handler if necessary */
518   if (ms_setting.facebook_test)
519   {
520     ret_sfd= 0;
521     if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name,
522                            ms_setting.servers[srv_idx].srv_port,
523                            true, &ret_sfd) != 0)
524     {
525       c->udpsfd= 0;
526     }
527     else
528     {
529       c->udpsfd= ret_sfd;
530     }
531   }
532 
533   if ((i != c->total_sfds) || (ms_setting.facebook_test && (c->udpsfd == 0)))
534   {
535     if (ms_setting.udp)
536     {
537       close(c->sfd);
538     }
539     else
540     {
541       for (uint32_t j= 0; j < i; j++)
542       {
543         close(c->tcpsfd[j]);
544       }
545     }
546 
547     if (c->udpsfd != 0)
548     {
549       close(c->udpsfd);
550     }
551 
552     return -1;
553   }
554 
555   return EXIT_SUCCESS;
556 } /* ms_conn_sock_init */
557 
558 
559 /**
560  * each connection is managed by libevent, this function
561  * initialize the event of the connection structure.
562  *
563  * @param c, pointer of the concurrency
564  *
565  * @return int, if success, return EXIT_SUCCESS, else return -1
566  */
ms_conn_event_init(ms_conn_t * c)567 static int ms_conn_event_init(ms_conn_t *c)
568 {
569   ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
570   short event_flags= EV_WRITE | EV_PERSIST;
571 
572   event_set(&c->event, c->sfd, event_flags, ms_event_handler, (void *)c);
573   event_base_set(ms_thread->base, &c->event);
574   c->ev_flags= event_flags;
575 
576   if (event_add(&c->event, NULL) == -1)
577   {
578     return -1;
579   }
580 
581   return EXIT_SUCCESS;
582 } /* ms_conn_event_init */
583 
584 
585 /**
586  * setup a connection, each connection structure of each
587  * thread must call this function to initialize.
588  *
589  * @param c, pointer of the concurrency
590  *
591  * @return int, if success, return EXIT_SUCCESS, else return -1
592  */
ms_setup_conn(ms_conn_t * c)593 int ms_setup_conn(ms_conn_t *c)
594 {
595   if (ms_item_win_init(c) != 0)
596   {
597     return -1;
598   }
599 
600   if (ms_conn_init(c, conn_write, DATA_BUFFER_SIZE, ms_setting.udp) != 0)
601   {
602     return -1;
603   }
604 
605   if (ms_conn_sock_init(c) != 0)
606   {
607     return -1;
608   }
609 
610   if (ms_conn_event_init(c) != 0)
611   {
612     return -1;
613   }
614 
615   return EXIT_SUCCESS;
616 } /* ms_setup_conn */
617 
618 
619 /**
620  * Frees a connection.
621  *
622  * @param c, pointer of the concurrency
623  */
ms_conn_free(ms_conn_t * c)624 void ms_conn_free(ms_conn_t *c)
625 {
626   ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
627   if (c != NULL)
628   {
629     if (c->hdrbuf != NULL)
630       free(c->hdrbuf);
631     if (c->msglist != NULL)
632       free(c->msglist);
633     if (c->rbuf != NULL)
634       free(c->rbuf);
635     if (c->wbuf != NULL)
636       free(c->wbuf);
637     if (c->iov != NULL)
638       free(c->iov);
639     if (c->mlget_task.mlget_item != NULL)
640       free(c->mlget_task.mlget_item);
641     if (c->rudpbuf != NULL)
642       free(c->rudpbuf);
643     if (c->udppkt != NULL)
644       free(c->udppkt);
645     if (c->item_win != NULL)
646       free(c->item_win);
647     if (c->tcpsfd != NULL)
648       free(c->tcpsfd);
649 
650     if (--ms_thread->nactive_conn == 0)
651     {
652       free(ms_thread->conn);
653     }
654   }
655 } /* ms_conn_free */
656 
657 
658 /**
659  * close a connection
660  *
661  * @param c, pointer of the concurrency
662  */
ms_conn_close(ms_conn_t * c)663 static void ms_conn_close(ms_conn_t *c)
664 {
665   ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
666   assert(c != NULL);
667 
668   /* delete the event, the socket and the connection */
669   event_del(&c->event);
670 
671   for (uint32_t i= 0; i < c->total_sfds; i++)
672   {
673     if (c->tcpsfd[i] > 0)
674     {
675       close(c->tcpsfd[i]);
676     }
677   }
678   c->sfd= 0;
679 
680   if (ms_setting.facebook_test)
681   {
682     close(c->udpsfd);
683   }
684 
685   atomic_dec_32(&ms_stats.active_conns);
686 
687   ms_conn_free(c);
688 
689   if (ms_setting.run_time == 0)
690   {
691     pthread_mutex_lock(&ms_global.run_lock.lock);
692     ms_global.run_lock.count++;
693     pthread_cond_signal(&ms_global.run_lock.cond);
694     pthread_mutex_unlock(&ms_global.run_lock.lock);
695   }
696 
697   if (ms_thread->nactive_conn == 0)
698   {
699     pthread_exit(NULL);
700   }
701 } /* ms_conn_close */
702 
703 
704 /**
705  * create a new sock
706  *
707  * @param ai, server address information
708  *
709  * @return int, if success, return EXIT_SUCCESS, else return -1
710  */
ms_new_socket(struct addrinfo * ai)711 static int ms_new_socket(struct addrinfo *ai)
712 {
713   int sfd;
714 
715   if ((sfd= socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol)) == -1)
716   {
717     fprintf(stderr, "socket() error: %s.\n", strerror(errno));
718     return -1;
719   }
720 
721   return sfd;
722 } /* ms_new_socket */
723 
724 
725 /**
726  * Sets a socket's send buffer size to the maximum allowed by the system.
727  *
728  * @param sfd, file descriptor of socket
729  */
ms_maximize_sndbuf(const int sfd)730 static void ms_maximize_sndbuf(const int sfd)
731 {
732   socklen_t intsize= sizeof(int);
733   unsigned int last_good= 0;
734   unsigned int min, max, avg;
735   unsigned int old_size;
736 
737   /* Start with the default size. */
738   if (getsockopt(sfd, SOL_SOCKET, SO_SNDBUF, &old_size, &intsize) != 0)
739   {
740     fprintf(stderr, "getsockopt(SO_SNDBUF)\n");
741     return;
742   }
743 
744   /* Binary-search for the real maximum. */
745   min= old_size;
746   max= MAX_SENDBUF_SIZE;
747 
748   while (min <= max)
749   {
750     avg= ((unsigned int)(min + max)) / 2;
751     if (setsockopt(sfd, SOL_SOCKET, SO_SNDBUF, (void *)&avg, intsize) == 0)
752     {
753       last_good= avg;
754       min= avg + 1;
755     }
756     else
757     {
758       max= avg - 1;
759     }
760   }
761   (void)last_good;
762 } /* ms_maximize_sndbuf */
763 
764 
765 /**
766  * socket connects the server
767  *
768  * @param c, pointer of the concurrency
769  * @param srv_host_name, the host name of the server
770  * @param srv_port, port of server
771  * @param is_udp, whether it's udp
772  * @param ret_sfd, the connected socket file descriptor
773  *
774  * @return int, if success, return EXIT_SUCCESS, else return -1
775  */
ms_network_connect(ms_conn_t * c,char * srv_host_name,const int srv_port,const bool is_udp,int * ret_sfd)776 static int ms_network_connect(ms_conn_t *c,
777                               char *srv_host_name,
778                               const int srv_port,
779                               const bool is_udp,
780                               int *ret_sfd)
781 {
782   int sfd;
783   struct linger ling=
784   {
785     0, 0
786   };
787   struct addrinfo *ai;
788   struct addrinfo *next;
789   struct addrinfo hints;
790   char port_buf[NI_MAXSERV];
791   int  error;
792   int  success= 0;
793 
794   int flags= 1;
795 
796   /*
797    * the memset call clears nonstandard fields in some impementations
798    * that otherwise mess things up.
799    */
800   memset(&hints, 0, sizeof(hints));
801 #ifdef AI_ADDRCONFIG
802   hints.ai_flags= AI_PASSIVE | AI_ADDRCONFIG;
803 #else
804   hints.ai_flags= AI_PASSIVE;
805 #endif /* AI_ADDRCONFIG */
806   if (is_udp)
807   {
808     hints.ai_protocol= IPPROTO_UDP;
809     hints.ai_socktype= SOCK_DGRAM;
810     hints.ai_family= AF_INET;      /* This left here because of issues with OSX 10.5 */
811   }
812   else
813   {
814     hints.ai_family= AF_UNSPEC;
815     hints.ai_protocol= IPPROTO_TCP;
816     hints.ai_socktype= SOCK_STREAM;
817   }
818 
819   snprintf(port_buf, NI_MAXSERV, "%d", srv_port);
820   error= getaddrinfo(srv_host_name, port_buf, &hints, &ai);
821   if (error != 0)
822   {
823     if (error != EAI_SYSTEM)
824       fprintf(stderr, "getaddrinfo(): %s.\n", gai_strerror(error));
825     else
826       perror("getaddrinfo()\n");
827 
828     return -1;
829   }
830 
831   for (next= ai; next; next= next->ai_next)
832   {
833     if ((sfd= ms_new_socket(next)) == -1)
834     {
835       freeaddrinfo(ai);
836       return -1;
837     }
838 
839     setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));
840     if (is_udp)
841     {
842       ms_maximize_sndbuf(sfd);
843     }
844     else
845     {
846       setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags,
847                  sizeof(flags));
848       setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling));
849       setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags,
850                  sizeof(flags));
851     }
852 
853     if (is_udp)
854     {
855       c->srv_recv_addr_size= sizeof(struct sockaddr);
856       memcpy(&c->srv_recv_addr, next->ai_addr, c->srv_recv_addr_size);
857     }
858     else
859     {
860       if (connect(sfd, next->ai_addr, next->ai_addrlen) == -1)
861       {
862         close(sfd);
863         freeaddrinfo(ai);
864         return -1;
865       }
866     }
867 
868     if (((flags= fcntl(sfd, F_GETFL, 0)) < 0)
869         || (fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0))
870     {
871       fprintf(stderr, "setting O_NONBLOCK\n");
872       close(sfd);
873       freeaddrinfo(ai);
874       return -1;
875     }
876 
877     if (ret_sfd != NULL)
878     {
879       *ret_sfd= sfd;
880     }
881 
882     success++;
883   }
884 
885   freeaddrinfo(ai);
886 
887   /* Return zero if we detected no errors in starting up connections */
888   return success == 0;
889 } /* ms_network_connect */
890 
891 
892 /**
893  * reconnect a disconnected sock
894  *
895  * @param c, pointer of the concurrency
896  *
897  * @return int, if success, return EXIT_SUCCESS, else return -1
898  */
ms_reconn(ms_conn_t * c)899 static int ms_reconn(ms_conn_t *c)
900 {
901   ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
902   uint32_t srv_idx= 0;
903   uint32_t srv_conn_cnt= 0;
904 
905   if (ms_setting.rep_write_srv > 0)
906   {
907     srv_idx= c->cur_idx % ms_setting.srv_cnt;
908     srv_conn_cnt= ms_setting.sock_per_conn  * ms_setting.nconns;
909   }
910   else
911   {
912     srv_idx= ms_thread->thread_ctx->srv_idx;
913     srv_conn_cnt= ms_setting.nconns / ms_setting.srv_cnt;
914   }
915 
916   /* close the old socket handler */
917   close(c->sfd);
918   c->tcpsfd[c->cur_idx]= 0;
919 
920   if (atomic_add_32_nv(&ms_setting.servers[srv_idx].disconn_cnt, 1)
921       % srv_conn_cnt == 0)
922   {
923     gettimeofday(&ms_setting.servers[srv_idx].disconn_time, NULL);
924     fprintf(stderr, "Server %s:%d disconnect\n",
925             ms_setting.servers[srv_idx].srv_host_name,
926             ms_setting.servers[srv_idx].srv_port);
927   }
928 
929   if (ms_setting.rep_write_srv > 0)
930   {
931     uint32_t i= 0;
932 
933     for (i= 0; i < c->total_sfds; i++)
934     {
935       if (c->tcpsfd[i] != 0)
936       {
937         break;
938       }
939     }
940 
941     /* all socks disconnect */
942     if (i == c->total_sfds)
943     {
944       return -1;
945     }
946   }
947   else
948   {
949     do
950     {
951       /* reconnect success, break the loop */
952       if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name,
953                              ms_setting.servers[srv_idx].srv_port,
954                              ms_setting.udp, &c->sfd) == 0)
955       {
956         c->tcpsfd[c->cur_idx]= c->sfd;
957         if (atomic_add_32_nv(&ms_setting.servers[srv_idx].reconn_cnt, 1)
958             % (uint32_t)srv_conn_cnt == 0)
959         {
960           gettimeofday(&ms_setting.servers[srv_idx].reconn_time, NULL);
961           int reconn_time=
962             (int)(ms_setting.servers[srv_idx].reconn_time.tv_sec
963                   - ms_setting.servers[srv_idx].disconn_time
964                      .tv_sec);
965           fprintf(stderr, "Server %s:%d reconnect after %ds\n",
966                   ms_setting.servers[srv_idx].srv_host_name,
967                   ms_setting.servers[srv_idx].srv_port, reconn_time);
968         }
969         break;
970       }
971 
972       if (ms_setting.rep_write_srv == 0 && c->total_sfds > 0)
973       {
974         /* wait a second and reconnect */
975         sleep(1);
976       }
977     }
978     while (ms_setting.rep_write_srv == 0 && c->total_sfds > 0);
979   }
980 
981   if ((c->total_sfds > 1) && (c->tcpsfd[c->cur_idx] == 0))
982   {
983     c->sfd= 0;
984     c->alive_sfds--;
985   }
986 
987   return EXIT_SUCCESS;
988 } /* ms_reconn */
989 
990 
991 /**
992  *  reconnect several disconnected socks in the connection
993  *  structure, the ever-1-second timer of the thread will check
994  *  whether some socks in the connections disconnect. if
995  *  disconnect, reconnect the sock.
996  *
997  * @param c, pointer of the concurrency
998  *
999  * @return int, if success, return EXIT_SUCCESS, else return -1
1000  */
ms_reconn_socks(ms_conn_t * c)1001 int ms_reconn_socks(ms_conn_t *c)
1002 {
1003   ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
1004   uint32_t srv_idx= 0;
1005   int ret_sfd= 0;
1006   uint32_t srv_conn_cnt= 0;
1007   struct timeval cur_time;
1008 
1009   assert(c != NULL);
1010 
1011   if ((c->total_sfds == 1) || (c->total_sfds == c->alive_sfds))
1012   {
1013     return EXIT_SUCCESS;
1014   }
1015 
1016   for (uint32_t i= 0; i < c->total_sfds; i++)
1017   {
1018     if (c->tcpsfd[i] == 0)
1019     {
1020       gettimeofday(&cur_time, NULL);
1021 
1022       /**
1023        *  For failover test of replication, reconnect the socks after
1024        *  it disconnects more than 5 seconds, Otherwise memslap will
1025        *  block at connect() function and the work threads can't work
1026        *  in this interval.
1027        */
1028       if (cur_time.tv_sec
1029           - ms_setting.servers[srv_idx].disconn_time.tv_sec < 5)
1030       {
1031         break;
1032       }
1033 
1034       if (ms_setting.rep_write_srv > 0)
1035       {
1036         srv_idx= i % ms_setting.srv_cnt;
1037         srv_conn_cnt= ms_setting.sock_per_conn * ms_setting.nconns;
1038       }
1039       else
1040       {
1041         srv_idx= ms_thread->thread_ctx->srv_idx;
1042         srv_conn_cnt= ms_setting.nconns / ms_setting.srv_cnt;
1043       }
1044 
1045       if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name,
1046                              ms_setting.servers[srv_idx].srv_port,
1047                              ms_setting.udp, &ret_sfd) == 0)
1048       {
1049         c->tcpsfd[i]= ret_sfd;
1050         c->alive_sfds++;
1051 
1052         if (atomic_add_32_nv(&ms_setting.servers[srv_idx].reconn_cnt, 1)
1053             % (uint32_t)srv_conn_cnt == 0)
1054         {
1055           gettimeofday(&ms_setting.servers[srv_idx].reconn_time, NULL);
1056           int reconn_time=
1057             (int)(ms_setting.servers[srv_idx].reconn_time.tv_sec
1058                   - ms_setting.servers[srv_idx].disconn_time
1059                      .tv_sec);
1060           fprintf(stderr, "Server %s:%d reconnect after %ds\n",
1061                   ms_setting.servers[srv_idx].srv_host_name,
1062                   ms_setting.servers[srv_idx].srv_port, reconn_time);
1063         }
1064       }
1065     }
1066   }
1067 
1068   return EXIT_SUCCESS;
1069 } /* ms_reconn_socks */
1070 
1071 
1072 /**
1073  * Tokenize the command string by replacing whitespace with '\0' and update
1074  * the token array tokens with pointer to start of each token and length.
1075  * Returns total number of tokens.  The last valid token is the terminal
1076  * token (value points to the first unprocessed character of the string and
1077  * length zero).
1078  *
1079  * Usage example:
1080  *
1081  *  while(ms_tokenize_command(command, ncommand, tokens, max_tokens) > 0) {
1082  *      for(int ix = 0; tokens[ix].length != 0; ix++) {
1083  *          ...
1084  *      }
1085  *      ncommand = tokens[ix].value - command;
1086  *      command  = tokens[ix].value;
1087  *   }
1088  *
1089  * @param command, the command string to token
1090  * @param tokens, array to store tokens
1091  * @param max_tokens, maximum tokens number
1092  *
1093  * @return int, the number of tokens
1094  */
ms_tokenize_command(char * command,token_t * tokens,const int max_tokens)1095 static int ms_tokenize_command(char *command,
1096                                token_t *tokens,
1097                                const int max_tokens)
1098 {
1099   char *s, *e;
1100   int  ntokens= 0;
1101 
1102   assert(command != NULL && tokens != NULL && max_tokens > 1);
1103 
1104   for (s= e= command; ntokens < max_tokens - 1; ++e)
1105   {
1106     if (*e == ' ')
1107     {
1108       if (s != e)
1109       {
1110         tokens[ntokens].value= s;
1111         tokens[ntokens].length= (size_t)(e - s);
1112         ntokens++;
1113         *e= '\0';
1114       }
1115       s= e + 1;
1116     }
1117     else if (*e == '\0')
1118     {
1119       if (s != e)
1120       {
1121         tokens[ntokens].value= s;
1122         tokens[ntokens].length= (size_t)(e - s);
1123         ntokens++;
1124       }
1125 
1126       break;       /* string end */
1127     }
1128   }
1129 
1130   return ntokens;
1131 } /* ms_tokenize_command */
1132 
1133 
1134 /**
1135  * parse the response of server.
1136  *
1137  * @param c, pointer of the concurrency
1138  * @param command, the string responded by server
1139  *
1140  * @return int, if the command completed return EXIT_SUCCESS, else return
1141  *         -1
1142  */
ms_ascii_process_line(ms_conn_t * c,char * command)1143 static int ms_ascii_process_line(ms_conn_t *c, char *command)
1144 {
1145   int ret= 0;
1146   int64_t value_len;
1147   char *buffer= command;
1148 
1149   assert(c != NULL);
1150 
1151   /**
1152    * for command get, we store the returned value into local buffer
1153    * then continue in ms_complete_nread().
1154    */
1155 
1156   switch (buffer[0])
1157   {
1158   case 'V':                     /* VALUE || VERSION */
1159     if (buffer[1] == 'A')       /* VALUE */
1160     {
1161       token_t tokens[MAX_TOKENS];
1162       ms_tokenize_command(command, tokens, MAX_TOKENS);
1163       errno= 0;
1164       value_len= strtol(tokens[VALUELEN_TOKEN].value, NULL, 10);
1165       if (errno != 0)
1166       {
1167         printf("<%d ERROR %s\n", c->sfd, strerror(errno));
1168       }
1169       c->currcmd.key_prefix= *(uint64_t *)tokens[KEY_TOKEN].value;
1170 
1171       /*
1172        *  We read the \r\n into the string since not doing so is more
1173        *  cycles then the waster of memory to do so.
1174        *
1175        *  We are null terminating through, which will most likely make
1176        *  some people lazy about using the return length.
1177        */
1178       c->rvbytes= (int)(value_len + 2);
1179       c->readval= true;
1180       ret= -1;
1181     }
1182 
1183     break;
1184 
1185   case 'O':   /* OK */
1186     c->currcmd.retstat= MCD_SUCCESS;
1187 
1188   case 'S':                    /* STORED STATS SERVER_ERROR */
1189     if (buffer[2] == 'A')      /* STORED STATS */
1190     {       /* STATS*/
1191       c->currcmd.retstat= MCD_STAT;
1192     }
1193     else if (buffer[1] == 'E')
1194     {
1195       /* SERVER_ERROR */
1196       printf("<%d %s\n", c->sfd, buffer);
1197 
1198       c->currcmd.retstat= MCD_SERVER_ERROR;
1199     }
1200     else if (buffer[1] == 'T')
1201     {
1202       /* STORED */
1203       c->currcmd.retstat= MCD_STORED;
1204     }
1205     else
1206     {
1207       c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
1208     }
1209     break;
1210 
1211   case 'D':   /* DELETED DATA */
1212     if (buffer[1] == 'E')
1213     {
1214       c->currcmd.retstat= MCD_DELETED;
1215     }
1216     else
1217     {
1218       c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
1219     }
1220 
1221     break;
1222 
1223   case 'N':   /* NOT_FOUND NOT_STORED*/
1224     if (buffer[4] == 'F')
1225     {
1226       c->currcmd.retstat= MCD_NOTFOUND;
1227     }
1228     else if (buffer[4] == 'S')
1229     {
1230       printf("<%d %s\n", c->sfd, buffer);
1231       c->currcmd.retstat= MCD_NOTSTORED;
1232     }
1233     else
1234     {
1235       c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
1236     }
1237     break;
1238 
1239   case 'E':   /* PROTOCOL ERROR or END */
1240     if (buffer[1] == 'N')
1241     {
1242       /* END */
1243       c->currcmd.retstat= MCD_END;
1244     }
1245     else if (buffer[1] == 'R')
1246     {
1247       printf("<%d ERROR\n", c->sfd);
1248       c->currcmd.retstat= MCD_PROTOCOL_ERROR;
1249     }
1250     else if (buffer[1] == 'X')
1251     {
1252       c->currcmd.retstat= MCD_DATA_EXISTS;
1253       printf("<%d %s\n", c->sfd, buffer);
1254     }
1255     else
1256     {
1257       c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
1258     }
1259     break;
1260 
1261   case 'C':   /* CLIENT ERROR */
1262     printf("<%d %s\n", c->sfd, buffer);
1263     c->currcmd.retstat= MCD_CLIENT_ERROR;
1264     break;
1265 
1266   default:
1267     c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
1268     break;
1269   } /* switch */
1270 
1271   return ret;
1272 } /* ms_ascii_process_line */
1273 
1274 
1275 /**
1276  * after one operation completes, reset the concurrency
1277  *
1278  * @param c, pointer of the concurrency
1279  * @param timeout, whether it's timeout
1280  */
ms_reset_conn(ms_conn_t * c,bool timeout)1281 void ms_reset_conn(ms_conn_t *c, bool timeout)
1282 {
1283   assert(c != NULL);
1284 
1285   if (c->udp)
1286   {
1287     if ((c->packets > 0) && (c->packets < MAX_UDP_PACKET))
1288     {
1289       memset(c->udppkt, 0, sizeof(ms_udppkt_t) * (size_t)c->packets);
1290     }
1291 
1292     c->packets= 0;
1293     c->recvpkt= 0;
1294     c->pktcurr= 0;
1295     c->ordcurr= 0;
1296     c->rudpbytes= 0;
1297   }
1298   c->currcmd.isfinish= true;
1299   c->ctnwrite= false;
1300   c->rbytes= 0;
1301   c->rcurr= c->rbuf;
1302   c->msgcurr = 0;
1303   c->msgused = 0;
1304   c->iovused = 0;
1305   ms_conn_set_state(c, conn_write);
1306   memcpy(&c->precmd, &c->currcmd, sizeof(ms_cmdstat_t));    /* replicate command state */
1307 
1308   if (timeout)
1309   {
1310     ms_drive_machine(c);
1311   }
1312 } /* ms_reset_conn */
1313 
1314 
1315 /**
1316  * if we have a complete line in the buffer, process it.
1317  *
1318  * @param c, pointer of the concurrency
1319  *
1320  * @return int, if success, return EXIT_SUCCESS, else return -1
1321  */
ms_try_read_line(ms_conn_t * c)1322 static int ms_try_read_line(ms_conn_t *c)
1323 {
1324   if (c->protocol == binary_prot)
1325   {
1326     /* Do we have the complete packet header? */
1327     if ((uint64_t)c->rbytes < sizeof(c->binary_header))
1328     {
1329       /* need more data! */
1330       return EXIT_SUCCESS;
1331     }
1332     else
1333     {
1334 #ifdef NEED_ALIGN
1335       if (((long)(c->rcurr)) % 8 != 0)
1336       {
1337         /* must realign input buffer */
1338         memmove(c->rbuf, c->rcurr, c->rbytes);
1339         c->rcurr= c->rbuf;
1340         if (settings.verbose)
1341         {
1342           fprintf(stderr, "%d: Realign input buffer.\n", c->sfd);
1343         }
1344       }
1345 #endif
1346       protocol_binary_response_header *rsp;
1347       rsp= (protocol_binary_response_header *)c->rcurr;
1348 
1349       c->binary_header= *rsp;
1350       c->binary_header.response.extlen= rsp->response.extlen;
1351       c->binary_header.response.keylen= ntohs(rsp->response.keylen);
1352       c->binary_header.response.bodylen= ntohl(rsp->response.bodylen);
1353       c->binary_header.response.status= ntohs(rsp->response.status);
1354 
1355       if (c->binary_header.response.magic != PROTOCOL_BINARY_RES)
1356       {
1357         fprintf(stderr, "Invalid magic:  %x\n",
1358                 c->binary_header.response.magic);
1359         ms_conn_set_state(c, conn_closing);
1360         return EXIT_SUCCESS;
1361       }
1362 
1363       /* process this complete response */
1364       if (ms_bin_process_response(c) == 0)
1365       {
1366         /* current operation completed */
1367         ms_reset_conn(c, false);
1368         return -1;
1369       }
1370       else
1371       {
1372         c->rbytes-= (int32_t)sizeof(c->binary_header);
1373         c->rcurr+= sizeof(c->binary_header);
1374       }
1375     }
1376   }
1377   else
1378   {
1379     char *el, *cont;
1380 
1381     assert(c != NULL);
1382     assert(c->rcurr <= (c->rbuf + c->rsize));
1383 
1384     if (c->rbytes == 0)
1385       return EXIT_SUCCESS;
1386 
1387     el= memchr(c->rcurr, '\n', (size_t)c->rbytes);
1388     if (! el)
1389       return EXIT_SUCCESS;
1390 
1391     cont= el + 1;
1392     if (((el - c->rcurr) > 1) && (*(el - 1) == '\r'))
1393     {
1394       el--;
1395     }
1396     *el= '\0';
1397 
1398     assert(cont <= (c->rcurr + c->rbytes));
1399 
1400     /* process this complete line */
1401     if (ms_ascii_process_line(c, c->rcurr) == 0)
1402     {
1403       /* current operation completed */
1404       ms_reset_conn(c, false);
1405       return -1;
1406     }
1407     else
1408     {
1409       /* current operation didn't complete */
1410       c->rbytes-= (int32_t)(cont - c->rcurr);
1411       c->rcurr= cont;
1412     }
1413 
1414     assert(c->rcurr <= (c->rbuf + c->rsize));
1415   }
1416 
1417   return -1;
1418 } /* ms_try_read_line */
1419 
1420 
1421 /**
1422  *  because the packet of UDP can't ensure the order, the
1423  *  function is used to sort the received udp packet.
1424  *
1425  * @param c, pointer of the concurrency
1426  * @param buf, the buffer to store the ordered packages data
1427  * @param rbytes, the maximum capacity of the buffer
1428  *
1429  * @return int, if success, return the copy bytes, else return
1430  *         -1
1431  */
ms_sort_udp_packet(ms_conn_t * c,char * buf,int rbytes)1432 static int ms_sort_udp_packet(ms_conn_t *c, char *buf, int rbytes)
1433 {
1434   int len= 0;
1435   int wbytes= 0;
1436   uint16_t req_id= 0;
1437   uint16_t seq_num= 0;
1438   uint16_t packets= 0;
1439   unsigned char *header= NULL;
1440 
1441   /* no enough data */
1442   assert(c != NULL);
1443   assert(buf != NULL);
1444   assert(c->rudpbytes >= UDP_HEADER_SIZE);
1445 
1446   /* calculate received packets count */
1447   if (c->rudpbytes % UDP_MAX_PAYLOAD_SIZE >= UDP_HEADER_SIZE)
1448   {
1449     /* the last packet has some data */
1450     c->recvpkt= c->rudpbytes / UDP_MAX_PAYLOAD_SIZE + 1;
1451   }
1452   else
1453   {
1454     c->recvpkt= c->rudpbytes / UDP_MAX_PAYLOAD_SIZE;
1455   }
1456 
1457   /* get the total packets count if necessary */
1458   if (c->packets == 0)
1459   {
1460     c->packets= HEADER_TO_PACKETS((unsigned char *)c->rudpbuf);
1461   }
1462 
1463   /* build the ordered packet array */
1464   for (int i= c->pktcurr; i < c->recvpkt; i++)
1465   {
1466     header= (unsigned char *)c->rudpbuf + i * UDP_MAX_PAYLOAD_SIZE;
1467     req_id= (uint16_t)HEADER_TO_REQID(header);
1468     assert(req_id == c->request_id % (1 << 16));
1469 
1470     packets= (uint16_t)HEADER_TO_PACKETS(header);
1471     assert(c->packets == HEADER_TO_PACKETS(header));
1472 
1473     seq_num= (uint16_t)HEADER_TO_SEQNUM(header);
1474     c->udppkt[seq_num].header= header;
1475     c->udppkt[seq_num].data= (char *)header + UDP_HEADER_SIZE;
1476 
1477     if (i == c->recvpkt - 1)
1478     {
1479       /* last received packet */
1480       if (c->rudpbytes % UDP_MAX_PAYLOAD_SIZE == 0)
1481       {
1482         c->udppkt[seq_num].rbytes= UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE;
1483         c->pktcurr++;
1484       }
1485       else
1486       {
1487         c->udppkt[seq_num].rbytes= c->rudpbytes % UDP_MAX_PAYLOAD_SIZE
1488                                    - UDP_HEADER_SIZE;
1489       }
1490     }
1491     else
1492     {
1493       c->udppkt[seq_num].rbytes= UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE;
1494       c->pktcurr++;
1495     }
1496   }
1497 
1498   for (int i= c->ordcurr; i < c->recvpkt; i++)
1499   {
1500     /* there is some data to copy */
1501     if ((c->udppkt[i].data != NULL)
1502         && (c->udppkt[i].copybytes < c->udppkt[i].rbytes))
1503     {
1504       header= c->udppkt[i].header;
1505       len= c->udppkt[i].rbytes - c->udppkt[i].copybytes;
1506       if (len > rbytes - wbytes)
1507       {
1508         len= rbytes - wbytes;
1509       }
1510 
1511       assert(len <= rbytes - wbytes);
1512       assert(i == HEADER_TO_SEQNUM(header));
1513 
1514       memcpy(buf + wbytes, c->udppkt[i].data + c->udppkt[i].copybytes,
1515              (size_t)len);
1516       wbytes+= len;
1517       c->udppkt[i].copybytes+= len;
1518 
1519       if ((c->udppkt[i].copybytes == c->udppkt[i].rbytes)
1520           && (c->udppkt[i].rbytes == UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE))
1521       {
1522         /* finish copying all the data of this packet, next */
1523         c->ordcurr++;
1524       }
1525 
1526       /* last received packet, and finish copying all the data */
1527       if ((c->recvpkt == c->packets) && (i == c->recvpkt - 1)
1528           && (c->udppkt[i].copybytes == c->udppkt[i].rbytes))
1529       {
1530         break;
1531       }
1532 
1533       /* no space to copy data */
1534       if (wbytes >= rbytes)
1535       {
1536         break;
1537       }
1538 
1539       /* it doesn't finish reading all the data of the packet from network */
1540       if ((i != c->recvpkt - 1)
1541           && (c->udppkt[i].rbytes < UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE))
1542       {
1543         break;
1544       }
1545     }
1546     else
1547     {
1548       /* no data to copy */
1549       break;
1550     }
1551   }
1552   (void)packets;
1553 
1554   return wbytes == 0 ? -1 : wbytes;
1555 } /* ms_sort_udp_packet */
1556 
1557 
1558 /**
1559  * encapsulate upd read like tcp read
1560  *
1561  * @param c, pointer of the concurrency
1562  * @param buf, read buffer
1563  * @param len, length to read
1564  *
1565  * @return int, if success, return the read bytes, else return
1566  *         -1
1567  */
ms_udp_read(ms_conn_t * c,char * buf,int len)1568 static int ms_udp_read(ms_conn_t *c, char *buf, int len)
1569 {
1570   int res= 0;
1571   int avail= 0;
1572   int rbytes= 0;
1573   int copybytes= 0;
1574 
1575   assert(c->udp);
1576 
1577   while (1)
1578   {
1579     if (c->rudpbytes + UDP_MAX_PAYLOAD_SIZE > c->rudpsize)
1580     {
1581       char *new_rbuf= realloc(c->rudpbuf, (size_t)c->rudpsize * 2);
1582       if (! new_rbuf)
1583       {
1584         fprintf(stderr, "Couldn't realloc input buffer.\n");
1585         c->rudpbytes= 0;          /* ignore what we read */
1586         return -1;
1587       }
1588       c->rudpbuf= new_rbuf;
1589       c->rudpsize*= 2;
1590     }
1591 
1592     avail= c->rudpsize - c->rudpbytes;
1593     /* UDP each time read a packet, 1400 bytes */
1594     res= (int)read(c->sfd, c->rudpbuf + c->rudpbytes, (size_t)avail);
1595 
1596     if (res > 0)
1597     {
1598       atomic_add_size(&ms_stats.bytes_read, res);
1599       c->rudpbytes+= res;
1600       rbytes+= res;
1601       if (res == avail)
1602       {
1603         continue;
1604       }
1605       else
1606       {
1607         break;
1608       }
1609     }
1610 
1611     if (res == 0)
1612     {
1613       /* "connection" closed */
1614       return res;
1615     }
1616 
1617     if (res == -1)
1618     {
1619       /* no data to read */
1620       return res;
1621     }
1622   }
1623 
1624   /* copy data to read buffer */
1625   if (rbytes > 0)
1626   {
1627     copybytes= ms_sort_udp_packet(c, buf, len);
1628   }
1629 
1630   if (copybytes == -1)
1631   {
1632     atomic_add_size(&ms_stats.pkt_disorder, 1);
1633   }
1634 
1635   return copybytes;
1636 } /* ms_udp_read */
1637 
1638 
1639 /*
1640  * read from network as much as we can, handle buffer overflow and connection
1641  * close.
1642  * before reading, move the remaining incomplete fragment of a command
1643  * (if any) to the beginning of the buffer.
1644  * return EXIT_SUCCESS if there's nothing to read on the first read.
1645  */
1646 
1647 /**
1648  * read from network as much as we can, handle buffer overflow and connection
1649  * close. before reading, move the remaining incomplete fragment of a command
1650  * (if any) to the beginning of the buffer.
1651  *
1652  * @param c, pointer of the concurrency
1653  *
1654  * @return int,
1655  *         return EXIT_SUCCESS if there's nothing to read on the first read.
1656  *         return EXIT_FAILURE if get data
1657  *         return -1 if error happens
1658  */
ms_try_read_network(ms_conn_t * c)1659 static int ms_try_read_network(ms_conn_t *c)
1660 {
1661   int gotdata= 0;
1662   int res;
1663   int64_t avail;
1664 
1665   assert(c != NULL);
1666 
1667   if ((c->rcurr != c->rbuf)
1668       && (! c->readval || (c->rvbytes > c->rsize - (c->rcurr - c->rbuf))
1669           || (c->readval && (c->rcurr - c->rbuf > c->rbytes))))
1670   {
1671     if (c->rbytes != 0)     /* otherwise there's nothing to copy */
1672       memmove(c->rbuf, c->rcurr, (size_t)c->rbytes);
1673     c->rcurr= c->rbuf;
1674   }
1675 
1676   while (1)
1677   {
1678     if (c->rbytes >= c->rsize)
1679     {
1680       char *new_rbuf= realloc(c->rbuf, (size_t)c->rsize * 2);
1681       if (! new_rbuf)
1682       {
1683         fprintf(stderr, "Couldn't realloc input buffer.\n");
1684         c->rbytes= 0;          /* ignore what we read */
1685         return -1;
1686       }
1687       c->rcurr= c->rbuf= new_rbuf;
1688       c->rsize*= 2;
1689     }
1690 
1691     avail= c->rsize - c->rbytes - (c->rcurr - c->rbuf);
1692     if (avail == 0)
1693     {
1694       break;
1695     }
1696 
1697     if (c->udp)
1698     {
1699       res= (int32_t)ms_udp_read(c, c->rcurr + c->rbytes, (int32_t)avail);
1700     }
1701     else
1702     {
1703       res= (int)read(c->sfd, c->rcurr + c->rbytes, (size_t)avail);
1704     }
1705 
1706     if (res > 0)
1707     {
1708       if (! c->udp)
1709       {
1710         atomic_add_size(&ms_stats.bytes_read, res);
1711       }
1712       gotdata= 1;
1713       c->rbytes+= res;
1714       if (res == avail)
1715       {
1716         continue;
1717       }
1718       else
1719       {
1720         break;
1721       }
1722     }
1723     if (res == 0)
1724     {
1725       /* connection closed */
1726       ms_conn_set_state(c, conn_closing);
1727       return -1;
1728     }
1729     if (res == -1)
1730     {
1731       if ((errno == EAGAIN) || (errno == EWOULDBLOCK))
1732         break;
1733       /* Should close on unhandled errors. */
1734       ms_conn_set_state(c, conn_closing);
1735       return -1;
1736     }
1737   }
1738 
1739   return gotdata;
1740 } /* ms_try_read_network */
1741 
1742 
1743 /**
1744  * after get the object from server, verify the value if
1745  * necessary.
1746  *
1747  * @param c, pointer of the concurrency
1748  * @param mlget_item, pointer of mulit-get task item structure
1749  * @param value, received value string
1750  * @param vlen, received value string length
1751  */
ms_verify_value(ms_conn_t * c,ms_mlget_task_item_t * mlget_item,char * value,int vlen)1752 static void ms_verify_value(ms_conn_t *c,
1753                             ms_mlget_task_item_t *mlget_item,
1754                             char *value,
1755                             int vlen)
1756 {
1757   if (c->curr_task.verify)
1758   {
1759     assert(c->curr_task.item->value_offset != INVALID_OFFSET);
1760     char *orignval= &ms_setting.char_block[c->curr_task.item->value_offset];
1761     char *orignkey=
1762       &ms_setting.char_block[c->curr_task.item->key_suffix_offset];
1763 
1764     /* verify expire time if necessary */
1765     if (c->curr_task.item->exp_time > 0)
1766     {
1767       struct timeval curr_time;
1768       gettimeofday(&curr_time, NULL);
1769 
1770       /* object expired but get it now */
1771       if (curr_time.tv_sec - c->curr_task.item->client_time
1772           > c->curr_task.item->exp_time + EXPIRE_TIME_ERROR)
1773       {
1774         atomic_add_size(&ms_stats.exp_get, 1);
1775 
1776         if (ms_setting.verbose)
1777         {
1778           char set_time[64];
1779           char cur_time[64];
1780           strftime(set_time, 64, "%Y-%m-%d %H:%M:%S",
1781                    localtime(&c->curr_task.item->client_time));
1782           strftime(cur_time, 64, "%Y-%m-%d %H:%M:%S",
1783                    localtime(&curr_time.tv_sec));
1784           fprintf(stderr,
1785                   "\n<%d expire time verification failed, "
1786                   "object expired but get it now\n"
1787                   "\tkey len: %d\n"
1788                   "\tkey: %" PRIx64 " %.*s\n"
1789                   "\tset time: %s current time: %s "
1790                   "diff time: %d expire time: %d\n"
1791                   "\texpected data: \n"
1792                   "\treceived data len: %d\n"
1793                   "\treceived data: %.*s\n",
1794                   c->sfd,
1795                   c->curr_task.item->key_size,
1796                   c->curr_task.item->key_prefix,
1797                   c->curr_task.item->key_size - (int)KEY_PREFIX_SIZE,
1798                   orignkey,
1799                   set_time,
1800                   cur_time,
1801                   (int)(curr_time.tv_sec - c->curr_task.item->client_time),
1802                   c->curr_task.item->exp_time,
1803                   vlen,
1804                   vlen,
1805                   value);
1806           fflush(stderr);
1807         }
1808       }
1809     }
1810     else
1811     {
1812       if ((c->curr_task.item->value_size != vlen)
1813           || (memcmp(orignval, value, (size_t)vlen) != 0))
1814       {
1815         atomic_add_size(&ms_stats.vef_failed, 1);
1816 
1817         if (ms_setting.verbose)
1818         {
1819           fprintf(stderr,
1820                   "\n<%d data verification failed\n"
1821                   "\tkey len: %d\n"
1822                   "\tkey: %" PRIx64" %.*s\n"
1823                   "\texpected data len: %d\n"
1824                   "\texpected data: %.*s\n"
1825                   "\treceived data len: %d\n"
1826                   "\treceived data: %.*s\n",
1827                   c->sfd,
1828                   c->curr_task.item->key_size,
1829                   c->curr_task.item->key_prefix,
1830                   c->curr_task.item->key_size - (int)KEY_PREFIX_SIZE,
1831                   orignkey,
1832                   c->curr_task.item->value_size,
1833                   c->curr_task.item->value_size,
1834                   orignval,
1835                   vlen,
1836                   vlen,
1837                   value);
1838           fflush(stderr);
1839         }
1840       }
1841     }
1842 
1843     c->curr_task.finish_verify= true;
1844 
1845     if (mlget_item != NULL)
1846     {
1847       mlget_item->finish_verify= true;
1848     }
1849   }
1850 } /* ms_verify_value */
1851 
1852 
1853 /**
1854  * For ASCII protocol, after store the data into the local
1855  * buffer, run this function to handle the data.
1856  *
1857  * @param c, pointer of the concurrency
1858  */
ms_ascii_complete_nread(ms_conn_t * c)1859 static void ms_ascii_complete_nread(ms_conn_t *c)
1860 {
1861   assert(c != NULL);
1862   assert(c->rbytes >= c->rvbytes);
1863   assert(c->protocol == ascii_prot);
1864   if (c->rvbytes > 2)
1865   {
1866     assert(
1867       c->rcurr[c->rvbytes - 1] == '\n' && c->rcurr[c->rvbytes - 2] == '\r');
1868   }
1869 
1870   /* multi-get */
1871   ms_mlget_task_item_t *mlget_item= NULL;
1872   if (((ms_setting.mult_key_num > 1)
1873        && (c->mlget_task.mlget_num >= ms_setting.mult_key_num))
1874       || ((c->remain_exec_num == 0) && (c->mlget_task.mlget_num > 0)))
1875   {
1876     c->mlget_task.value_index++;
1877     mlget_item= &c->mlget_task.mlget_item[c->mlget_task.value_index];
1878 
1879     if (mlget_item->item->key_prefix == c->currcmd.key_prefix)
1880     {
1881       c->curr_task.item= mlget_item->item;
1882       c->curr_task.verify= mlget_item->verify;
1883       c->curr_task.finish_verify= mlget_item->finish_verify;
1884       mlget_item->get_miss= false;
1885     }
1886     else
1887     {
1888       /* Try to find the task item in multi-get task array */
1889       for (int i= 0; i < c->mlget_task.mlget_num; i++)
1890       {
1891         mlget_item= &c->mlget_task.mlget_item[i];
1892         if (mlget_item->item->key_prefix == c->currcmd.key_prefix)
1893         {
1894           c->curr_task.item= mlget_item->item;
1895           c->curr_task.verify= mlget_item->verify;
1896           c->curr_task.finish_verify= mlget_item->finish_verify;
1897           mlget_item->get_miss= false;
1898 
1899           break;
1900         }
1901       }
1902     }
1903   }
1904 
1905   ms_verify_value(c, mlget_item, c->rcurr, c->rvbytes - 2);
1906 
1907   c->curr_task.get_miss= false;
1908   c->rbytes-= c->rvbytes;
1909   c->rcurr= c->rcurr + c->rvbytes;
1910   assert(c->rcurr <= (c->rbuf + c->rsize));
1911   c->readval= false;
1912   c->rvbytes= 0;
1913 } /* ms_ascii_complete_nread */
1914 
1915 
1916 /**
1917  * For binary protocol, after store the data into the local
1918  * buffer, run this function to handle the data.
1919  *
1920  * @param c, pointer of the concurrency
1921  */
ms_bin_complete_nread(ms_conn_t * c)1922 static void ms_bin_complete_nread(ms_conn_t *c)
1923 {
1924   assert(c != NULL);
1925   assert(c->rbytes >= c->rvbytes);
1926   assert(c->protocol == binary_prot);
1927 
1928   int extlen= c->binary_header.response.extlen;
1929   int keylen= c->binary_header.response.keylen;
1930   uint8_t opcode= c->binary_header.response.opcode;
1931 
1932   /* not get command or not include value, just return */
1933   if (((opcode != PROTOCOL_BINARY_CMD_GET)
1934        && (opcode != PROTOCOL_BINARY_CMD_GETQ))
1935       || (c->rvbytes <= extlen + keylen))
1936   {
1937     /* get miss */
1938     if (c->binary_header.response.opcode == PROTOCOL_BINARY_CMD_GET)
1939     {
1940       c->currcmd.retstat= MCD_END;
1941       c->curr_task.get_miss= true;
1942     }
1943 
1944     c->readval= false;
1945     c->rvbytes= 0;
1946     ms_reset_conn(c, false);
1947     return;
1948   }
1949 
1950   /* multi-get */
1951   ms_mlget_task_item_t *mlget_item= NULL;
1952   if (((ms_setting.mult_key_num > 1)
1953        && (c->mlget_task.mlget_num >= ms_setting.mult_key_num))
1954       || ((c->remain_exec_num == 0) && (c->mlget_task.mlget_num > 0)))
1955   {
1956     c->mlget_task.value_index++;
1957     mlget_item= &c->mlget_task.mlget_item[c->mlget_task.value_index];
1958 
1959     c->curr_task.item= mlget_item->item;
1960     c->curr_task.verify= mlget_item->verify;
1961     c->curr_task.finish_verify= mlget_item->finish_verify;
1962     mlget_item->get_miss= false;
1963   }
1964 
1965   ms_verify_value(c,
1966                   mlget_item,
1967                   c->rcurr + extlen + keylen,
1968                   c->rvbytes - extlen - keylen);
1969 
1970   c->currcmd.retstat= MCD_END;
1971   c->curr_task.get_miss= false;
1972   c->rbytes-= c->rvbytes;
1973   c->rcurr= c->rcurr + c->rvbytes;
1974   assert(c->rcurr <= (c->rbuf + c->rsize));
1975   c->readval= false;
1976   c->rvbytes= 0;
1977 
1978   if (ms_setting.mult_key_num > 1)
1979   {
1980     /* multi-get have check all the item */
1981     if (c->mlget_task.value_index == c->mlget_task.mlget_num - 1)
1982     {
1983       ms_reset_conn(c, false);
1984     }
1985   }
1986   else
1987   {
1988     /* single get */
1989     ms_reset_conn(c, false);
1990   }
1991 } /* ms_bin_complete_nread */
1992 
1993 
1994 /**
1995  * we get here after reading the value of get commands.
1996  *
1997  * @param c, pointer of the concurrency
1998  */
ms_complete_nread(ms_conn_t * c)1999 static void ms_complete_nread(ms_conn_t *c)
2000 {
2001   assert(c != NULL);
2002   assert(c->rbytes >= c->rvbytes);
2003   assert(c->protocol == ascii_prot
2004          || c->protocol == binary_prot);
2005 
2006   if (c->protocol == binary_prot)
2007   {
2008     ms_bin_complete_nread(c);
2009   }
2010   else
2011   {
2012     ms_ascii_complete_nread(c);
2013   }
2014 } /* ms_complete_nread */
2015 
2016 
2017 /**
2018  * Adds a message header to a connection.
2019  *
2020  * @param c, pointer of the concurrency
2021  *
2022  * @return int, if success, return EXIT_SUCCESS, else return -1
2023  */
ms_add_msghdr(ms_conn_t * c)2024 static int ms_add_msghdr(ms_conn_t *c)
2025 {
2026   struct msghdr *msg;
2027 
2028   assert(c != NULL);
2029 
2030   if (c->msgsize == c->msgused)
2031   {
2032     msg=
2033       realloc(c->msglist, (size_t)c->msgsize * 2 * sizeof(struct msghdr));
2034     if (! msg)
2035       return -1;
2036 
2037     c->msglist= msg;
2038     c->msgsize*= 2;
2039   }
2040 
2041   msg= c->msglist + c->msgused;
2042 
2043   /**
2044    *  this wipes msg_iovlen, msg_control, msg_controllen, and
2045    *  msg_flags, the last 3 of which aren't defined on solaris:
2046    */
2047   memset(msg, 0, sizeof(struct msghdr));
2048 
2049   msg->msg_iov= &c->iov[c->iovused];
2050 
2051   if (c->udp && (c->srv_recv_addr_size > 0))
2052   {
2053     msg->msg_name= &c->srv_recv_addr;
2054     msg->msg_namelen= c->srv_recv_addr_size;
2055   }
2056 
2057   c->msgbytes= 0;
2058   c->msgused++;
2059 
2060   if (c->udp)
2061   {
2062     /* Leave room for the UDP header, which we'll fill in later. */
2063     return ms_add_iov(c, NULL, UDP_HEADER_SIZE);
2064   }
2065 
2066   return EXIT_SUCCESS;
2067 } /* ms_add_msghdr */
2068 
2069 
2070 /**
2071  * Ensures that there is room for another structure iovec in a connection's
2072  * iov list.
2073  *
2074  * @param c, pointer of the concurrency
2075  *
2076  * @return int, if success, return EXIT_SUCCESS, else return -1
2077  */
ms_ensure_iov_space(ms_conn_t * c)2078 static int ms_ensure_iov_space(ms_conn_t *c)
2079 {
2080   assert(c != NULL);
2081 
2082   if (c->iovused >= c->iovsize)
2083   {
2084     int i, iovnum;
2085     struct iovec *new_iov= (struct iovec *)realloc(c->iov,
2086                                                    ((size_t)c->iovsize
2087                                                     * 2)
2088                                                    * sizeof(struct iovec));
2089     if (! new_iov)
2090       return -1;
2091 
2092     c->iov= new_iov;
2093     c->iovsize*= 2;
2094 
2095     /* Point all the msghdr structures at the new list. */
2096     for (i= 0, iovnum= 0; i < c->msgused; i++)
2097     {
2098       c->msglist[i].msg_iov= &c->iov[iovnum];
2099       iovnum+= (int)c->msglist[i].msg_iovlen;
2100     }
2101   }
2102 
2103   return EXIT_SUCCESS;
2104 } /* ms_ensure_iov_space */
2105 
2106 
2107 /**
2108  * Adds data to the list of pending data that will be written out to a
2109  * connection.
2110  *
2111  * @param c, pointer of the concurrency
2112  * @param buf, the buffer includes data to send
2113  * @param len, the data length in the buffer
2114  *
2115  * @return int, if success, return EXIT_SUCCESS, else return -1
2116  */
ms_add_iov(ms_conn_t * c,const void * buf,int len)2117 static int ms_add_iov(ms_conn_t *c, const void *buf, int len)
2118 {
2119   struct msghdr *m;
2120   int  leftover;
2121   bool limit_to_mtu;
2122 
2123   assert(c != NULL);
2124 
2125   do
2126   {
2127     m= &c->msglist[c->msgused - 1];
2128 
2129     /*
2130      * Limit UDP packets, to UDP_MAX_PAYLOAD_SIZE bytes.
2131      */
2132     limit_to_mtu= c->udp;
2133 
2134 #ifdef IOV_MAX
2135     /* We may need to start a new msghdr if this one is full. */
2136     if ((m->msg_iovlen == IOV_MAX)
2137         || (limit_to_mtu && (c->msgbytes >= UDP_MAX_SEND_PAYLOAD_SIZE)))
2138     {
2139       ms_add_msghdr(c);
2140       m= &c->msglist[c->msgused - 1];
2141     }
2142 #endif
2143 
2144     if (ms_ensure_iov_space(c) != 0)
2145       return -1;
2146 
2147     /* If the fragment is too big to fit in the datagram, split it up */
2148     if (limit_to_mtu && (len + c->msgbytes > UDP_MAX_SEND_PAYLOAD_SIZE))
2149     {
2150       leftover= len + c->msgbytes - UDP_MAX_SEND_PAYLOAD_SIZE;
2151       len-= leftover;
2152     }
2153     else
2154     {
2155       leftover= 0;
2156     }
2157 
2158     m= &c->msglist[c->msgused - 1];
2159     m->msg_iov[m->msg_iovlen].iov_base= (void *)buf;
2160     m->msg_iov[m->msg_iovlen].iov_len= (size_t)len;
2161 
2162     c->msgbytes+= len;
2163     c->iovused++;
2164     m->msg_iovlen++;
2165 
2166     buf= ((char *)buf) + len;
2167     len= leftover;
2168   }
2169   while (leftover > 0);
2170 
2171   return EXIT_SUCCESS;
2172 } /* ms_add_iov */
2173 
2174 
2175 /**
2176  * Constructs a set of UDP headers and attaches them to the outgoing messages.
2177  *
2178  * @param c, pointer of the concurrency
2179  *
2180  * @return int, if success, return EXIT_SUCCESS, else return -1
2181  */
ms_build_udp_headers(ms_conn_t * c)2182 static int ms_build_udp_headers(ms_conn_t *c)
2183 {
2184   int i;
2185   unsigned char *hdr;
2186 
2187   assert(c != NULL);
2188 
2189   c->request_id= ms_get_udp_request_id();
2190 
2191   if (c->msgused > c->hdrsize)
2192   {
2193     void *new_hdrbuf;
2194     if (c->hdrbuf)
2195       new_hdrbuf= realloc(c->hdrbuf,
2196                           (size_t)c->msgused * 2 * UDP_HEADER_SIZE);
2197     else
2198       new_hdrbuf= malloc((size_t)c->msgused * 2 * UDP_HEADER_SIZE);
2199     if (! new_hdrbuf)
2200       return -1;
2201 
2202     c->hdrbuf= (unsigned char *)new_hdrbuf;
2203     c->hdrsize= c->msgused * 2;
2204   }
2205 
2206   /* If this is a multi-packet request, drop it. */
2207   if (c->udp && (c->msgused > 1))
2208   {
2209     fprintf(stderr, "multi-packet request for UDP not supported.\n");
2210     return -1;
2211   }
2212 
2213   hdr= c->hdrbuf;
2214   for (i= 0; i < c->msgused; i++)
2215   {
2216     c->msglist[i].msg_iov[0].iov_base= (void *)hdr;
2217     c->msglist[i].msg_iov[0].iov_len= UDP_HEADER_SIZE;
2218     *hdr++= (unsigned char)(c->request_id / 256);
2219     *hdr++= (unsigned char)(c->request_id % 256);
2220     *hdr++= (unsigned char)(i / 256);
2221     *hdr++= (unsigned char)(i % 256);
2222     *hdr++= (unsigned char)(c->msgused / 256);
2223     *hdr++= (unsigned char)(c->msgused % 256);
2224     *hdr++= (unsigned char)1;          /* support facebook memcached */
2225     *hdr++= (unsigned char)0;
2226     assert(hdr ==
2227            ((unsigned char *)c->msglist[i].msg_iov[0].iov_base
2228             + UDP_HEADER_SIZE));
2229   }
2230 
2231   return EXIT_SUCCESS;
2232 } /* ms_build_udp_headers */
2233 
2234 
2235 /**
2236  * Transmit the next chunk of data from our list of msgbuf structures.
2237  *
2238  * @param c, pointer of the concurrency
2239  *
2240  * @return  TRANSMIT_COMPLETE   All done writing.
2241  *          TRANSMIT_INCOMPLETE More data remaining to write.
2242  *          TRANSMIT_SOFT_ERROR Can't write any more right now.
2243  *          TRANSMIT_HARD_ERROR Can't write (c->state is set to conn_closing)
2244  */
ms_transmit(ms_conn_t * c)2245 static int ms_transmit(ms_conn_t *c)
2246 {
2247   assert(c != NULL);
2248 
2249   if ((c->msgcurr < c->msgused)
2250       && (c->msglist[c->msgcurr].msg_iovlen == 0))
2251   {
2252     /* Finished writing the current msg; advance to the next. */
2253     c->msgcurr++;
2254   }
2255 
2256   if (c->msgcurr < c->msgused)
2257   {
2258     ssize_t res;
2259     struct msghdr *m= &c->msglist[c->msgcurr];
2260 
2261     res= sendmsg(c->sfd, m, 0);
2262     if (res > 0)
2263     {
2264       atomic_add_size(&ms_stats.bytes_written, res);
2265 
2266       /* We've written some of the data. Remove the completed
2267        *  iovec entries from the list of pending writes. */
2268       while (m->msg_iovlen > 0 && res >= (ssize_t)m->msg_iov->iov_len)
2269       {
2270         res-= (ssize_t)m->msg_iov->iov_len;
2271         m->msg_iovlen--;
2272         m->msg_iov++;
2273       }
2274 
2275       /* Might have written just part of the last iovec entry;
2276        *  adjust it so the next write will do the rest. */
2277       if (res > 0)
2278       {
2279         m->msg_iov->iov_base= (void *)((unsigned char *)m->msg_iov->iov_base + res);
2280         m->msg_iov->iov_len-= (size_t)res;
2281       }
2282       return TRANSMIT_INCOMPLETE;
2283     }
2284     if ((res == -1) && ((errno == EAGAIN) || (errno == EWOULDBLOCK)))
2285     {
2286       if (! ms_update_event(c, EV_WRITE | EV_PERSIST))
2287       {
2288         fprintf(stderr, "Couldn't update event.\n");
2289         ms_conn_set_state(c, conn_closing);
2290         return TRANSMIT_HARD_ERROR;
2291       }
2292       return TRANSMIT_SOFT_ERROR;
2293     }
2294 
2295     /* if res==0 or res==-1 and error is not EAGAIN or EWOULDBLOCK,
2296      *  we have a real error, on which we close the connection */
2297     fprintf(stderr, "Failed to write, and not due to blocking.\n");
2298 
2299     ms_conn_set_state(c, conn_closing);
2300     return TRANSMIT_HARD_ERROR;
2301   }
2302   else
2303   {
2304     return TRANSMIT_COMPLETE;
2305   }
2306 } /* ms_transmit */
2307 
2308 
2309 /**
2310  * Shrinks a connection's buffers if they're too big.  This prevents
2311  * periodic large "mget" response from server chewing lots of client
2312  * memory.
2313  *
2314  * This should only be called in between requests since it can wipe output
2315  * buffers!
2316  *
2317  * @param c, pointer of the concurrency
2318  */
ms_conn_shrink(ms_conn_t * c)2319 static void ms_conn_shrink(ms_conn_t *c)
2320 {
2321   assert(c != NULL);
2322 
2323   if (c->udp)
2324     return;
2325 
2326   if ((c->rsize > READ_BUFFER_HIGHWAT) && (c->rbytes < DATA_BUFFER_SIZE))
2327   {
2328     char *newbuf;
2329 
2330     if (c->rcurr != c->rbuf)
2331       memmove(c->rbuf, c->rcurr, (size_t)c->rbytes);
2332 
2333     newbuf= (char *)realloc((void *)c->rbuf, DATA_BUFFER_SIZE);
2334 
2335     if (newbuf)
2336     {
2337       c->rbuf= newbuf;
2338       c->rsize= DATA_BUFFER_SIZE;
2339     }
2340     c->rcurr= c->rbuf;
2341   }
2342 
2343   if (c->udp && (c->rudpsize > UDP_DATA_BUFFER_HIGHWAT)
2344       && (c->rudpbytes + UDP_MAX_PAYLOAD_SIZE < UDP_DATA_BUFFER_SIZE))
2345   {
2346     char *new_rbuf= (char *)realloc(c->rudpbuf, (size_t)c->rudpsize * 2);
2347     if (! new_rbuf)
2348     {
2349       c->rudpbuf= new_rbuf;
2350       c->rudpsize= UDP_DATA_BUFFER_SIZE;
2351     }
2352     /* TODO check error condition? */
2353   }
2354 
2355   if (c->msgsize > MSG_LIST_HIGHWAT)
2356   {
2357     struct msghdr *newbuf= (struct msghdr *)realloc(
2358       (void *)c->msglist,
2359       MSG_LIST_INITIAL
2360       * sizeof(c->msglist[0]));
2361     if (newbuf)
2362     {
2363       c->msglist= newbuf;
2364       c->msgsize= MSG_LIST_INITIAL;
2365     }
2366     /* TODO check error condition? */
2367   }
2368 
2369   if (c->iovsize > IOV_LIST_HIGHWAT)
2370   {
2371     struct iovec *newbuf= (struct iovec *)realloc((void *)c->iov,
2372                                                   IOV_LIST_INITIAL
2373                                                   * sizeof(c->iov[0]));
2374     if (newbuf)
2375     {
2376       c->iov= newbuf;
2377       c->iovsize= IOV_LIST_INITIAL;
2378     }
2379     /* TODO check return value */
2380   }
2381 } /* ms_conn_shrink */
2382 
2383 
2384 /**
2385  * Sets a connection's current state in the state machine. Any special
2386  * processing that needs to happen on certain state transitions can
2387  * happen here.
2388  *
2389  * @param c, pointer of the concurrency
2390  * @param state, connection state
2391  */
ms_conn_set_state(ms_conn_t * c,int state)2392 static void ms_conn_set_state(ms_conn_t *c, int state)
2393 {
2394   assert(c != NULL);
2395 
2396   if (state != c->state)
2397   {
2398     if (state == conn_read)
2399     {
2400       ms_conn_shrink(c);
2401     }
2402     c->state= state;
2403   }
2404 } /* ms_conn_set_state */
2405 
2406 
2407 /**
2408  * update the event if socks change state. for example: when
2409  * change the listen scoket read event to sock write event, or
2410  * change socket handler, we could call this function.
2411  *
2412  * @param c, pointer of the concurrency
2413  * @param new_flags, new event flags
2414  *
2415  * @return bool, if success, return true, else return false
2416  */
ms_update_event(ms_conn_t * c,const int new_flags)2417 static bool ms_update_event(ms_conn_t *c, const int new_flags)
2418 {
2419   assert(c != NULL);
2420 
2421   struct event_base *base= c->event.ev_base;
2422   if ((c->ev_flags == new_flags) && (ms_setting.rep_write_srv == 0)
2423       && (! ms_setting.facebook_test || (c->total_sfds == 1)))
2424   {
2425     return true;
2426   }
2427 
2428   if (event_del(&c->event) == -1)
2429   {
2430     /* try to delete the event again */
2431     if (event_del(&c->event) == -1)
2432     {
2433       return false;
2434     }
2435   }
2436 
2437   event_set(&c->event,
2438             c->sfd,
2439             (short)new_flags,
2440             ms_event_handler,
2441             (void *)c);
2442   event_base_set(base, &c->event);
2443   c->ev_flags= (short)new_flags;
2444 
2445   if (event_add(&c->event, NULL) == -1)
2446   {
2447     return false;
2448   }
2449 
2450   return true;
2451 } /* ms_update_event */
2452 
2453 
2454 /**
2455  * If user want to get the expected throughput, we could limit
2456  * the performance of memslap. we could give up some work and
2457  * just wait a short time. The function is used to check this
2458  * case.
2459  *
2460  * @param c, pointer of the concurrency
2461  *
2462  * @return bool, if success, return true, else return false
2463  */
ms_need_yield(ms_conn_t * c)2464 static bool ms_need_yield(ms_conn_t *c)
2465 {
2466   ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
2467   int64_t tps= 0;
2468   int64_t time_diff= 0;
2469   struct timeval curr_time;
2470   ms_task_t *task= &c->curr_task;
2471 
2472   if (ms_setting.expected_tps > 0)
2473   {
2474     gettimeofday(&curr_time, NULL);
2475     time_diff= ms_time_diff(&ms_thread->startup_time, &curr_time);
2476     tps= (int64_t)(((task->get_opt + task->set_opt) / (uint64_t)time_diff) * 1000000);
2477 
2478     /* current throughput is greater than expected throughput */
2479     if (tps > ms_thread->thread_ctx->tps_perconn)
2480     {
2481       return true;
2482     }
2483   }
2484 
2485   return false;
2486 } /* ms_need_yield */
2487 
2488 
2489 /**
2490  * used to update the start time of each operation
2491  *
2492  * @param c, pointer of the concurrency
2493  */
ms_update_start_time(ms_conn_t * c)2494 static void ms_update_start_time(ms_conn_t *c)
2495 {
2496   ms_task_item_t *item= c->curr_task.item;
2497 
2498   if ((ms_setting.stat_freq > 0) || c->udp
2499       || ((c->currcmd.cmd == CMD_SET) && (item->exp_time > 0)))
2500   {
2501     gettimeofday(&c->start_time, NULL);
2502     if ((c->currcmd.cmd == CMD_SET) && (item->exp_time > 0))
2503     {
2504       /* record the current time */
2505       item->client_time= c->start_time.tv_sec;
2506     }
2507   }
2508 } /* ms_update_start_time */
2509 
2510 
2511 /**
2512  * run the state machine
2513  *
2514  * @param c, pointer of the concurrency
2515  */
ms_drive_machine(ms_conn_t * c)2516 static void ms_drive_machine(ms_conn_t *c)
2517 {
2518   bool stop= false;
2519 
2520   assert(c != NULL);
2521 
2522   while (! stop)
2523   {
2524     switch (c->state)
2525     {
2526     case conn_read:
2527       if (c->readval)
2528       {
2529         if (c->rbytes >= c->rvbytes)
2530         {
2531           ms_complete_nread(c);
2532           break;
2533         }
2534       }
2535       else
2536       {
2537         if (ms_try_read_line(c) != 0)
2538         {
2539           break;
2540         }
2541       }
2542 
2543       if (ms_try_read_network(c) != 0)
2544       {
2545         break;
2546       }
2547 
2548       /* doesn't read all the response data, wait event wake up */
2549       if (! c->currcmd.isfinish)
2550       {
2551         if (! ms_update_event(c, EV_READ | EV_PERSIST))
2552         {
2553           fprintf(stderr, "Couldn't update event.\n");
2554           ms_conn_set_state(c, conn_closing);
2555           break;
2556         }
2557         stop= true;
2558         break;
2559       }
2560 
2561       /* we have no command line and no data to read from network, next write */
2562       ms_conn_set_state(c, conn_write);
2563       memcpy(&c->precmd, &c->currcmd, sizeof(ms_cmdstat_t));        /* replicate command state */
2564 
2565       break;
2566 
2567     case conn_write:
2568       if (! c->ctnwrite && ms_need_yield(c))
2569       {
2570         usleep(10);
2571 
2572         if (! ms_update_event(c, EV_WRITE | EV_PERSIST))
2573         {
2574           fprintf(stderr, "Couldn't update event.\n");
2575           ms_conn_set_state(c, conn_closing);
2576           break;
2577         }
2578         stop= true;
2579         break;
2580       }
2581 
2582       if (! c->ctnwrite && (ms_exec_task(c) != 0))
2583       {
2584         ms_conn_set_state(c, conn_closing);
2585         break;
2586       }
2587 
2588       /* record the start time before starting to send data if necessary */
2589       if (! c->ctnwrite || (c->change_sfd && c->ctnwrite))
2590       {
2591         if (c->change_sfd)
2592         {
2593           c->change_sfd= false;
2594         }
2595         ms_update_start_time(c);
2596       }
2597 
2598       /* change sfd if necessary */
2599       if (c->change_sfd)
2600       {
2601         c->ctnwrite= true;
2602         stop= true;
2603         break;
2604       }
2605 
2606       /* execute task until nothing need be written to network */
2607       if (! c->ctnwrite && (c->msgcurr == c->msgused))
2608       {
2609         if (! ms_update_event(c, EV_WRITE | EV_PERSIST))
2610         {
2611           fprintf(stderr, "Couldn't update event.\n");
2612           ms_conn_set_state(c, conn_closing);
2613           break;
2614         }
2615         stop= true;
2616         break;
2617       }
2618 
2619       switch (ms_transmit(c))
2620       {
2621       case TRANSMIT_COMPLETE:
2622         /* we have no data to write to network, next wait repose */
2623         if (! ms_update_event(c, EV_READ | EV_PERSIST))
2624         {
2625           fprintf(stderr, "Couldn't update event.\n");
2626           ms_conn_set_state(c, conn_closing);
2627           c->ctnwrite= false;
2628           break;
2629         }
2630         ms_conn_set_state(c, conn_read);
2631         c->ctnwrite= false;
2632         stop= true;
2633         break;
2634 
2635       case TRANSMIT_INCOMPLETE:
2636         c->ctnwrite= true;
2637         break;                           /* Continue in state machine. */
2638 
2639       case TRANSMIT_HARD_ERROR:
2640         c->ctnwrite= false;
2641         break;
2642 
2643       case TRANSMIT_SOFT_ERROR:
2644         c->ctnwrite= true;
2645         stop= true;
2646         break;
2647 
2648       default:
2649         break;
2650       } /* switch */
2651 
2652       break;
2653 
2654     case conn_closing:
2655       /* recovery mode, need reconnect if connection close */
2656       if (ms_setting.reconnect && (! ms_global.time_out
2657                                    || ((ms_setting.run_time == 0)
2658                                        && (c->remain_exec_num > 0))))
2659       {
2660         if (ms_reconn(c) != 0)
2661         {
2662           ms_conn_close(c);
2663           stop= true;
2664           break;
2665         }
2666 
2667         ms_reset_conn(c, false);
2668 
2669         if (c->total_sfds == 1)
2670         {
2671           if (! ms_update_event(c, EV_WRITE | EV_PERSIST))
2672           {
2673             fprintf(stderr, "Couldn't update event.\n");
2674             ms_conn_set_state(c, conn_closing);
2675             break;
2676           }
2677         }
2678 
2679         break;
2680       }
2681       else
2682       {
2683         ms_conn_close(c);
2684         stop= true;
2685         break;
2686       }
2687 
2688     default:
2689       assert(0);
2690     } /* switch */
2691   }
2692 } /* ms_drive_machine */
2693 
2694 
2695 /**
2696  * the event handler of each thread
2697  *
2698  * @param fd, the file descriptor of socket
2699  * @param which, event flag
2700  * @param arg, argument
2701  */
ms_event_handler(const int fd,const short which,void * arg)2702 void ms_event_handler(const int fd, const short which, void *arg)
2703 {
2704   ms_conn_t *c= (ms_conn_t *)arg;
2705 
2706   assert(c != NULL);
2707 
2708   c->which= which;
2709 
2710   /* sanity */
2711   if (fd != c->sfd)
2712   {
2713     fprintf(stderr,
2714             "Catastrophic: event fd: %d doesn't match conn fd: %d\n",
2715             fd,
2716             c->sfd);
2717     ms_conn_close(c);
2718     exit(1);
2719   }
2720   assert(fd == c->sfd);
2721 
2722   ms_drive_machine(c);
2723 
2724   /* wait for next event */
2725 } /* ms_event_handler */
2726 
2727 
2728 /**
2729  * get the next socket descriptor index to run for replication
2730  *
2731  * @param c, pointer of the concurrency
2732  * @param cmd, command(get or set )
2733  *
2734  * @return int, if success, return the index, else return EXIT_SUCCESS
2735  */
ms_get_rep_sock_index(ms_conn_t * c,int cmd)2736 static uint32_t ms_get_rep_sock_index(ms_conn_t *c, int cmd)
2737 {
2738   uint32_t sock_index= 0;
2739   uint32_t i= 0;
2740 
2741   if (c->total_sfds == 1)
2742   {
2743     return EXIT_SUCCESS;
2744   }
2745 
2746   if (ms_setting.rep_write_srv == 0)
2747   {
2748     return sock_index;
2749   }
2750 
2751   do
2752   {
2753     if (cmd == CMD_SET)
2754     {
2755       for (i= 0; i < ms_setting.rep_write_srv; i++)
2756       {
2757         if (c->tcpsfd[i] > 0)
2758         {
2759           break;
2760         }
2761       }
2762 
2763       if (i == ms_setting.rep_write_srv)
2764       {
2765         /* random get one replication server to read */
2766         sock_index= (uint32_t)random() % c->total_sfds;
2767       }
2768       else
2769       {
2770         /* random get one replication writing server to write */
2771         sock_index= (uint32_t)random() % ms_setting.rep_write_srv;
2772       }
2773     }
2774     else if (cmd == CMD_GET)
2775     {
2776       /* random get one replication server to read */
2777       sock_index= (uint32_t)random() % c->total_sfds;
2778     }
2779   }
2780   while (c->tcpsfd[sock_index] == 0);
2781 
2782   return sock_index;
2783 } /* ms_get_rep_sock_index */
2784 
2785 
2786 /**
2787  * get the next socket descriptor index to run
2788  *
2789  * @param c, pointer of the concurrency
2790  *
2791  * @return int, return the index
2792  */
ms_get_next_sock_index(ms_conn_t * c)2793 static uint32_t ms_get_next_sock_index(ms_conn_t *c)
2794 {
2795   uint32_t sock_index= 0;
2796 
2797   do
2798   {
2799     sock_index= (++c->cur_idx == c->total_sfds) ? 0 : c->cur_idx;
2800   }
2801   while (c->tcpsfd[sock_index] == 0);
2802 
2803   return sock_index;
2804 } /* ms_get_next_sock_index */
2805 
2806 
2807 /**
2808  * update socket event of the connections
2809  *
2810  * @param c, pointer of the concurrency
2811  *
2812  * @return int, if success, return EXIT_SUCCESS, else return -1
2813  */
ms_update_conn_sock_event(ms_conn_t * c)2814 static int ms_update_conn_sock_event(ms_conn_t *c)
2815 {
2816   assert(c != NULL);
2817 
2818   switch (c->currcmd.cmd)
2819   {
2820   case CMD_SET:
2821     if (ms_setting.facebook_test && c->udp)
2822     {
2823       c->sfd= c->tcpsfd[0];
2824       c->udp= false;
2825       c->change_sfd= true;
2826     }
2827     break;
2828 
2829   case CMD_GET:
2830     if (ms_setting.facebook_test && ! c->udp)
2831     {
2832       c->sfd= c->udpsfd;
2833       c->udp= true;
2834       c->change_sfd= true;
2835     }
2836     break;
2837 
2838   default:
2839     break;
2840   } /* switch */
2841 
2842   if (! c->udp && (c->total_sfds > 1))
2843   {
2844     if (c->cur_idx != c->total_sfds)
2845     {
2846       if (ms_setting.rep_write_srv == 0)
2847       {
2848         c->cur_idx= ms_get_next_sock_index(c);
2849       }
2850       else
2851       {
2852         c->cur_idx= ms_get_rep_sock_index(c, c->currcmd.cmd);
2853       }
2854     }
2855     else
2856     {
2857       /* must select the first sock of the connection at the beginning */
2858       c->cur_idx= 0;
2859     }
2860 
2861     c->sfd= c->tcpsfd[c->cur_idx];
2862     assert(c->sfd != 0);
2863     c->change_sfd= true;
2864   }
2865 
2866   if (c->change_sfd)
2867   {
2868     if (! ms_update_event(c, EV_WRITE | EV_PERSIST))
2869     {
2870       fprintf(stderr, "Couldn't update event.\n");
2871       ms_conn_set_state(c, conn_closing);
2872       return -1;
2873     }
2874   }
2875 
2876   return EXIT_SUCCESS;
2877 } /* ms_update_conn_sock_event */
2878 
2879 
2880 /**
2881  * for ASCII protocol, this function build the set command
2882  * string and send the command.
2883  *
2884  * @param c, pointer of the concurrency
2885  * @param item, pointer of task item which includes the object
2886  *            information
2887  *
2888  * @return int, if success, return EXIT_SUCCESS, else return -1
2889  */
ms_build_ascii_write_buf_set(ms_conn_t * c,ms_task_item_t * item)2890 static int ms_build_ascii_write_buf_set(ms_conn_t *c, ms_task_item_t *item)
2891 {
2892   int value_offset;
2893   int write_len;
2894   char *buffer= c->wbuf;
2895 
2896   write_len= snprintf(buffer,
2897                       c->wsize,
2898                       " %u %d %d\r\n",
2899                       0,
2900                       item->exp_time,
2901                       item->value_size);
2902 
2903   if (write_len > c->wsize || write_len < 0)
2904   {
2905     /* ought to be always enough. just fail for simplicity */
2906     fprintf(stderr, "output command line too long.\n");
2907     return -1;
2908   }
2909 
2910   if (item->value_offset == INVALID_OFFSET)
2911   {
2912     value_offset= item->key_suffix_offset;
2913   }
2914   else
2915   {
2916     value_offset= item->value_offset;
2917   }
2918 
2919   if ((ms_add_iov(c, "set ", 4) != 0)
2920       || (ms_add_iov(c, (char *)&item->key_prefix,
2921                      (int)KEY_PREFIX_SIZE) != 0)
2922       || (ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset],
2923                      item->key_size - (int)KEY_PREFIX_SIZE) != 0)
2924       || (ms_add_iov(c, buffer, write_len) != 0)
2925       || (ms_add_iov(c, &ms_setting.char_block[value_offset],
2926                      item->value_size) != 0)
2927       || (ms_add_iov(c, "\r\n", 2) != 0)
2928       || (c->udp && (ms_build_udp_headers(c) != 0)))
2929   {
2930     return -1;
2931   }
2932 
2933   return EXIT_SUCCESS;
2934 } /* ms_build_ascii_write_buf_set */
2935 
2936 
2937 /**
2938  * used to send set command to server
2939  *
2940  * @param c, pointer of the concurrency
2941  * @param item, pointer of task item which includes the object
2942  *            information
2943  *
2944  * @return int, if success, return EXIT_SUCCESS, else return -1
2945  */
ms_mcd_set(ms_conn_t * c,ms_task_item_t * item)2946 int ms_mcd_set(ms_conn_t *c, ms_task_item_t *item)
2947 {
2948   assert(c != NULL);
2949 
2950   c->currcmd.cmd= CMD_SET;
2951   c->currcmd.isfinish= false;
2952   c->currcmd.retstat= MCD_FAILURE;
2953 
2954   if (ms_update_conn_sock_event(c) != 0)
2955   {
2956     return -1;
2957   }
2958 
2959   c->msgcurr= 0;
2960   c->msgused= 0;
2961   c->iovused= 0;
2962   if (ms_add_msghdr(c) != 0)
2963   {
2964     fprintf(stderr, "Out of memory preparing request.");
2965     return -1;
2966   }
2967 
2968   /* binary protocol */
2969   if (c->protocol == binary_prot)
2970   {
2971     if (ms_build_bin_write_buf_set(c, item) != 0)
2972     {
2973       return -1;
2974     }
2975   }
2976   else
2977   {
2978     if (ms_build_ascii_write_buf_set(c, item) != 0)
2979     {
2980       return -1;
2981     }
2982   }
2983 
2984   atomic_add_size(&ms_stats.obj_bytes,
2985                   item->key_size + item->value_size);
2986   atomic_add_size(&ms_stats.cmd_set, 1);
2987 
2988   return EXIT_SUCCESS;
2989 } /* ms_mcd_set */
2990 
2991 
2992 /**
2993  * for ASCII protocol, this function build the get command
2994  * string and send the command.
2995  *
2996  * @param c, pointer of the concurrency
2997  * @param item, pointer of task item which includes the object
2998  *            information
2999  *
3000  * @return int, if success, return EXIT_SUCCESS, else return -1
3001  */
ms_build_ascii_write_buf_get(ms_conn_t * c,ms_task_item_t * item)3002 static int ms_build_ascii_write_buf_get(ms_conn_t *c, ms_task_item_t *item)
3003 {
3004   if ((ms_add_iov(c, "get ", 4) != 0)
3005       || (ms_add_iov(c, (char *)&item->key_prefix,
3006                      (int)KEY_PREFIX_SIZE) != 0)
3007       || (ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset],
3008                      item->key_size - (int)KEY_PREFIX_SIZE) != 0)
3009       || (ms_add_iov(c, "\r\n", 2) != 0)
3010       || (c->udp && (ms_build_udp_headers(c) != 0)))
3011   {
3012     return -1;
3013   }
3014 
3015   return EXIT_SUCCESS;
3016 } /* ms_build_ascii_write_buf_get */
3017 
3018 
3019 /**
3020  * used to send the get command to server
3021  *
3022  * @param c, pointer of the concurrency
3023  * @param item, pointer of task item which includes the object
3024  *            information
3025  *
3026  * @return int, if success, return EXIT_SUCCESS, else return -1
3027  */
ms_mcd_get(ms_conn_t * c,ms_task_item_t * item)3028 int ms_mcd_get(ms_conn_t *c, ms_task_item_t *item)
3029 {
3030   assert(c != NULL);
3031 
3032   c->currcmd.cmd= CMD_GET;
3033   c->currcmd.isfinish= false;
3034   c->currcmd.retstat= MCD_FAILURE;
3035 
3036   if (ms_update_conn_sock_event(c) != 0)
3037   {
3038     return -1;
3039   }
3040 
3041   c->msgcurr= 0;
3042   c->msgused= 0;
3043   c->iovused= 0;
3044   if (ms_add_msghdr(c) != 0)
3045   {
3046     fprintf(stderr, "Out of memory preparing request.");
3047     return -1;
3048   }
3049 
3050   /* binary protocol */
3051   if (c->protocol == binary_prot)
3052   {
3053     if (ms_build_bin_write_buf_get(c, item) != 0)
3054     {
3055       return -1;
3056     }
3057   }
3058   else
3059   {
3060     if (ms_build_ascii_write_buf_get(c, item) != 0)
3061     {
3062       return -1;
3063     }
3064   }
3065 
3066   atomic_add_size(&ms_stats.cmd_get, 1);
3067 
3068   return EXIT_SUCCESS;
3069 } /* ms_mcd_get */
3070 
3071 
3072 /**
3073  * for ASCII protocol, this function build the multi-get command
3074  * string and send the command.
3075  *
3076  * @param c, pointer of the concurrency
3077  *
3078  * @return int, if success, return EXIT_SUCCESS, else return -1
3079  */
ms_build_ascii_write_buf_mlget(ms_conn_t * c)3080 static int ms_build_ascii_write_buf_mlget(ms_conn_t *c)
3081 {
3082   ms_task_item_t *item;
3083 
3084   if (ms_add_iov(c, "get", 3) != 0)
3085   {
3086     return -1;
3087   }
3088 
3089   for (int i= 0; i < c->mlget_task.mlget_num; i++)
3090   {
3091     item= c->mlget_task.mlget_item[i].item;
3092     assert(item != NULL);
3093     if ((ms_add_iov(c, " ", 1) != 0)
3094         || (ms_add_iov(c, (char *)&item->key_prefix,
3095                        (int)KEY_PREFIX_SIZE) != 0)
3096         || (ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset],
3097                        item->key_size - (int)KEY_PREFIX_SIZE) != 0))
3098     {
3099       return -1;
3100     }
3101   }
3102 
3103   if ((ms_add_iov(c, "\r\n", 2) != 0)
3104       || (c->udp && (ms_build_udp_headers(c) != 0)))
3105   {
3106     return -1;
3107   }
3108 
3109   return EXIT_SUCCESS;
3110 } /* ms_build_ascii_write_buf_mlget */
3111 
3112 
3113 /**
3114  * used to send the multi-get command to server
3115  *
3116  * @param c, pointer of the concurrency
3117  *
3118  * @return int, if success, return EXIT_SUCCESS, else return -1
3119  */
ms_mcd_mlget(ms_conn_t * c)3120 int ms_mcd_mlget(ms_conn_t *c)
3121 {
3122   ms_task_item_t *item;
3123 
3124   assert(c != NULL);
3125   assert(c->mlget_task.mlget_num >= 1);
3126 
3127   c->currcmd.cmd= CMD_GET;
3128   c->currcmd.isfinish= false;
3129   c->currcmd.retstat= MCD_FAILURE;
3130 
3131   if (ms_update_conn_sock_event(c) != 0)
3132   {
3133     return -1;
3134   }
3135 
3136   c->msgcurr= 0;
3137   c->msgused= 0;
3138   c->iovused= 0;
3139   if (ms_add_msghdr(c) != 0)
3140   {
3141     fprintf(stderr, "Out of memory preparing request.");
3142     return -1;
3143   }
3144 
3145   /* binary protocol */
3146   if (c->protocol == binary_prot)
3147   {
3148     if (ms_build_bin_write_buf_mlget(c) != 0)
3149     {
3150       return -1;
3151     }
3152   }
3153   else
3154   {
3155     if (ms_build_ascii_write_buf_mlget(c) != 0)
3156     {
3157       return -1;
3158     }
3159   }
3160 
3161   /* decrease operation time of each item */
3162   for (int i= 0; i < c->mlget_task.mlget_num; i++)
3163   {
3164     item= c->mlget_task.mlget_item[i].item;
3165     atomic_add_size(&ms_stats.cmd_get, 1);
3166   }
3167 
3168   (void)item;
3169 
3170   return EXIT_SUCCESS;
3171 } /* ms_mcd_mlget */
3172 
3173 
3174 /**
3175  * binary protocol support
3176  */
3177 
3178 /**
3179  * for binary protocol, parse the response of server
3180  *
3181  * @param c, pointer of the concurrency
3182  *
3183  * @return int, if success, return EXIT_SUCCESS, else return -1
3184  */
ms_bin_process_response(ms_conn_t * c)3185 static int ms_bin_process_response(ms_conn_t *c)
3186 {
3187   const char *errstr= NULL;
3188 
3189   assert(c != NULL);
3190 
3191   uint32_t bodylen= c->binary_header.response.bodylen;
3192   uint8_t  opcode= c->binary_header.response.opcode;
3193   uint16_t status= c->binary_header.response.status;
3194 
3195   if (bodylen > 0)
3196   {
3197     c->rvbytes= (int32_t)bodylen;
3198     c->readval= true;
3199     return EXIT_FAILURE;
3200   }
3201   else
3202   {
3203     switch (status)
3204     {
3205     case PROTOCOL_BINARY_RESPONSE_SUCCESS:
3206       if (opcode == PROTOCOL_BINARY_CMD_SET)
3207       {
3208         c->currcmd.retstat= MCD_STORED;
3209       }
3210       else if (opcode == PROTOCOL_BINARY_CMD_DELETE)
3211       {
3212         c->currcmd.retstat= MCD_DELETED;
3213       }
3214       else if (opcode == PROTOCOL_BINARY_CMD_GET)
3215       {
3216         c->currcmd.retstat= MCD_END;
3217       }
3218       break;
3219 
3220     case PROTOCOL_BINARY_RESPONSE_ENOMEM:
3221       errstr= "Out of memory";
3222       c->currcmd.retstat= MCD_SERVER_ERROR;
3223       break;
3224 
3225     case PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND:
3226       errstr= "Unknown command";
3227       c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
3228       break;
3229 
3230     case PROTOCOL_BINARY_RESPONSE_KEY_ENOENT:
3231       errstr= "Not found";
3232       c->currcmd.retstat= MCD_NOTFOUND;
3233       break;
3234 
3235     case PROTOCOL_BINARY_RESPONSE_EINVAL:
3236       errstr= "Invalid arguments";
3237       c->currcmd.retstat= MCD_PROTOCOL_ERROR;
3238       break;
3239 
3240     case PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS:
3241       errstr= "Data exists for key.";
3242       break;
3243 
3244     case PROTOCOL_BINARY_RESPONSE_E2BIG:
3245       errstr= "Too large.";
3246       c->currcmd.retstat= MCD_SERVER_ERROR;
3247       break;
3248 
3249     case PROTOCOL_BINARY_RESPONSE_NOT_STORED:
3250       errstr= "Not stored.";
3251       c->currcmd.retstat= MCD_NOTSTORED;
3252       break;
3253 
3254     default:
3255       errstr= "Unknown error";
3256       c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
3257       break;
3258     } /* switch */
3259 
3260     if (errstr != NULL)
3261     {
3262       fprintf(stderr, "%s\n", errstr);
3263     }
3264   }
3265 
3266   return EXIT_SUCCESS;
3267 } /* ms_bin_process_response */
3268 
3269 
3270 /* build binary header and add the header to the buffer to send */
3271 
3272 /**
3273  * build binary header and add the header to the buffer to send
3274  *
3275  * @param c, pointer of the concurrency
3276  * @param opcode, operation code
3277  * @param hdr_len, length of header
3278  * @param key_len, length of key
3279  * @param body_len. length of body
3280  */
ms_add_bin_header(ms_conn_t * c,uint8_t opcode,uint8_t hdr_len,uint16_t key_len,uint32_t body_len)3281 static void ms_add_bin_header(ms_conn_t *c,
3282                               uint8_t opcode,
3283                               uint8_t hdr_len,
3284                               uint16_t key_len,
3285                               uint32_t body_len)
3286 {
3287   protocol_binary_request_header *header;
3288 
3289   assert(c != NULL);
3290 
3291   header= (protocol_binary_request_header *)c->wcurr;
3292 
3293   header->request.magic= (uint8_t)PROTOCOL_BINARY_REQ;
3294   header->request.opcode= (uint8_t)opcode;
3295   header->request.keylen= htons(key_len);
3296 
3297   header->request.extlen= (uint8_t)hdr_len;
3298   header->request.datatype= (uint8_t)PROTOCOL_BINARY_RAW_BYTES;
3299   header->request.vbucket= 0;
3300 
3301   header->request.bodylen= htonl(body_len);
3302   header->request.opaque= 0;
3303   header->request.cas= 0;
3304 
3305   ms_add_iov(c, c->wcurr, sizeof(header->request));
3306 } /* ms_add_bin_header */
3307 
3308 
3309 /**
3310  * add the key to the socket write buffer array
3311  *
3312  * @param c, pointer of the concurrency
3313  * @param item, pointer of task item which includes the object
3314  *            information
3315  */
ms_add_key_to_iov(ms_conn_t * c,ms_task_item_t * item)3316 static void ms_add_key_to_iov(ms_conn_t *c, ms_task_item_t *item)
3317 {
3318   ms_add_iov(c, (char *)&item->key_prefix, (int)KEY_PREFIX_SIZE);
3319   ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset],
3320              item->key_size - (int)KEY_PREFIX_SIZE);
3321 }
3322 
3323 
3324 /**
3325  * for binary protocol, this function build the set command
3326  * and add the command to send buffer array.
3327  *
3328  * @param c, pointer of the concurrency
3329  * @param item, pointer of task item which includes the object
3330  *            information
3331  *
3332  * @return int, if success, return EXIT_SUCCESS, else return -1
3333  */
ms_build_bin_write_buf_set(ms_conn_t * c,ms_task_item_t * item)3334 static int ms_build_bin_write_buf_set(ms_conn_t *c, ms_task_item_t *item)
3335 {
3336   assert(c->wbuf == c->wcurr);
3337 
3338   int value_offset;
3339   protocol_binary_request_set *rep= (protocol_binary_request_set *)c->wcurr;
3340   uint16_t keylen= (uint16_t)item->key_size;
3341   uint32_t bodylen= (uint32_t)sizeof(rep->message.body)
3342                     + (uint32_t)keylen + (uint32_t)item->value_size;
3343 
3344   ms_add_bin_header(c,
3345                     PROTOCOL_BINARY_CMD_SET,
3346                     sizeof(rep->message.body),
3347                     keylen,
3348                     bodylen);
3349   rep->message.body.flags= 0;
3350   rep->message.body.expiration= htonl((uint32_t)item->exp_time);
3351   ms_add_iov(c, &rep->message.body, sizeof(rep->message.body));
3352   ms_add_key_to_iov(c, item);
3353 
3354   if (item->value_offset == INVALID_OFFSET)
3355   {
3356     value_offset= item->key_suffix_offset;
3357   }
3358   else
3359   {
3360     value_offset= item->value_offset;
3361   }
3362   ms_add_iov(c, &ms_setting.char_block[value_offset], item->value_size);
3363 
3364   return EXIT_SUCCESS;
3365 } /* ms_build_bin_write_buf_set */
3366 
3367 
3368 /**
3369  * for binary protocol, this function build the get command and
3370  * add the command to send buffer array.
3371  *
3372  * @param c, pointer of the concurrency
3373  * @param item, pointer of task item which includes the object
3374  *            information
3375  *
3376  * @return int, if success, return EXIT_SUCCESS, else return -1
3377  */
ms_build_bin_write_buf_get(ms_conn_t * c,ms_task_item_t * item)3378 static int ms_build_bin_write_buf_get(ms_conn_t *c, ms_task_item_t *item)
3379 {
3380   assert(c->wbuf == c->wcurr);
3381 
3382   ms_add_bin_header(c, PROTOCOL_BINARY_CMD_GET, 0, (uint16_t)item->key_size,
3383                     (uint32_t)item->key_size);
3384   ms_add_key_to_iov(c, item);
3385 
3386   return EXIT_SUCCESS;
3387 } /* ms_build_bin_write_buf_get */
3388 
3389 
3390 /**
3391  * for binary protocol, this function build the multi-get
3392  * command and add the command to send buffer array.
3393  *
3394  * @param c, pointer of the concurrency
3395  * @param item, pointer of task item which includes the object
3396  *            information
3397  *
3398  * @return int, if success, return EXIT_SUCCESS, else return -1
3399  */
ms_build_bin_write_buf_mlget(ms_conn_t * c)3400 static int ms_build_bin_write_buf_mlget(ms_conn_t *c)
3401 {
3402   ms_task_item_t *item;
3403 
3404   assert(c->wbuf == c->wcurr);
3405 
3406   for (int i= 0; i < c->mlget_task.mlget_num; i++)
3407   {
3408     item= c->mlget_task.mlget_item[i].item;
3409     assert(item != NULL);
3410 
3411     ms_add_bin_header(c,
3412                       PROTOCOL_BINARY_CMD_GET,
3413                       0,
3414                       (uint16_t)item->key_size,
3415                       (uint32_t)item->key_size);
3416     ms_add_key_to_iov(c, item);
3417     c->wcurr+= sizeof(protocol_binary_request_get);
3418   }
3419 
3420   c->wcurr= c->wbuf;
3421 
3422   return EXIT_SUCCESS;
3423 } /* ms_build_bin_write_buf_mlget */
3424