1 /*
2  * File:   ms_task.c
3  * Author: Mingqiang Zhuang
4  *
5  * Created on February 10, 2009
6  *
7  * (c) Copyright 2009, Schooner Information Technology, Inc.
8  * http://www.schoonerinfotech.com/
9  *
10  */
11 
12 #include "mem_config.h"
13 
14 #if defined(HAVE_SYS_TIME_H)
15 # include <sys/time.h>
16 #endif
17 
18 #if defined(HAVE_TIME_H)
19 # include <time.h>
20 #endif
21 
22 #include "ms_thread.h"
23 #include "ms_setting.h"
24 #include "ms_atomic.h"
25 
26 /* command distribution adjustment cycle */
27 #define CMD_DISTR_ADJUST_CYCLE    1000
28 #define DISADJUST_FACTOR          0.03 /**
29                                  * In one adjustment cycle, if undo set or get
30                                  * operations proportion is more than 3% , means
31                                  * there are too many new item or need more new
32                                  * item in the window. This factor shows it.
33                                  */
34 
35 /* get item from task window */
36 static ms_task_item_t *ms_get_cur_opt_item(ms_conn_t *c);
37 static ms_task_item_t *ms_get_next_get_item(ms_conn_t *c);
38 static ms_task_item_t *ms_get_next_set_item(ms_conn_t *c);
39 static ms_task_item_t *ms_get_random_overwrite_item(ms_conn_t *c);
40 
41 
42 /* select next operation to do */
43 static void ms_select_opt(ms_conn_t *c, ms_task_t *task);
44 
45 
46 /* set and get speed estimate for controlling and adjustment */
47 static bool ms_is_set_too_fast(ms_task_t *task);
48 static bool ms_is_get_too_fast(ms_task_t *task);
49 static void ms_kick_out_item(ms_task_item_t *item);
50 
51 
52 /* miss rate adjustment */
53 static bool ms_need_overwrite_item(ms_task_t *task);
54 static bool ms_adjust_opt(ms_conn_t *c, ms_task_t *task);
55 
56 
57 /* deal with data verification initialization */
58 static void ms_task_data_verify_init(ms_task_t *task);
59 static void ms_task_expire_verify_init(ms_task_t *task);
60 
61 
62 /* select a new task to do */
63 static ms_task_t *ms_get_task(ms_conn_t *c, bool warmup);
64 
65 
66 /* run the selected task */
67 static void ms_update_set_result(ms_conn_t *c, ms_task_item_t *item);
68 static void ms_update_stat_result(ms_conn_t *c);
69 static void ms_update_multi_get_result(ms_conn_t *c);
70 static void ms_update_single_get_result(ms_conn_t *c, ms_task_item_t *item);
71 static void ms_update_task_result(ms_conn_t *c);
72 static void ms_single_getset_task_sch(ms_conn_t *c);
73 static void ms_multi_getset_task_sch(ms_conn_t *c);
74 static void ms_send_signal(ms_sync_lock_t *sync_lock);
75 static void ms_warmup_server(ms_conn_t *c);
76 static int ms_run_getset_task(ms_conn_t *c);
77 
78 
79 /**
80  * used to get the current operation item(object)
81  *
82  * @param c, pointer of the concurrency
83  *
84  * @return ms_task_item_t*, current operating item
85  */
ms_get_cur_opt_item(ms_conn_t * c)86 static ms_task_item_t *ms_get_cur_opt_item(ms_conn_t *c)
87 {
88   return c->curr_task.item;
89 }
90 
91 
92 /**
93  * used to get the next item to do get operation
94  *
95  * @param c, pointer of the concurrency
96  *
97  * @return ms_task_item_t*, the pointer of the next item to do
98  *         get operation
99  */
ms_get_next_get_item(ms_conn_t * c)100 static ms_task_item_t *ms_get_next_get_item(ms_conn_t *c)
101 {
102   ms_task_item_t *item= NULL;
103 
104   if (c->set_cursor <= 0)
105   {
106     /* the first item in the window */
107     item= &c->item_win[0];
108   }
109   else if (c->set_cursor > 0 && c->set_cursor < (uint32_t)c->win_size)
110   {
111     /* random get one item set before */
112     item= &c->item_win[random() % (int64_t)c->set_cursor];
113   }
114   else
115   {
116     /* random get one item from the window */
117     item= &c->item_win[random() % c->win_size];
118   }
119 
120   return item;
121 } /* ms_get_next_get_item */
122 
123 
124 /**
125  * used to get the next item to do set operation
126  *
127  * @param c, pointer of the concurrency
128  *
129  * @return ms_task_item_t*, the pointer of the next item to do
130  *         set operation
131  */
ms_get_next_set_item(ms_conn_t * c)132 static ms_task_item_t *ms_get_next_set_item(ms_conn_t *c)
133 {
134   /**
135    *  when a set command successes, the cursor will plus 1. If set
136    *  fails, the cursor doesn't change. it isn't necessary to
137    *  increase the cursor here.
138    */
139   return &c->item_win[(int64_t)c->set_cursor % c->win_size];
140 }
141 
142 
143 /**
144  * If we need do overwrite, we could select a item set before.
145  * This function is used to get a item set before to do
146  * overwrite.
147  *
148  * @param c, pointer of the concurrency
149  *
150  * @return ms_task_item_t*, the pointer of the previous item of
151  *         set operation
152  */
ms_get_random_overwrite_item(ms_conn_t * c)153 static ms_task_item_t *ms_get_random_overwrite_item(ms_conn_t *c)
154 {
155     return ms_get_next_get_item(c);
156 } /* ms_get_random_overwrite_item */
157 
158 /**
159  * According to the proportion of operations(get or set), select
160  * an operation to do.
161  *
162  * @param c, pointer of the concurrency
163  * @param task, pointer of current task in the concurrency
164  */
ms_select_opt(ms_conn_t * c,ms_task_t * task)165 static void ms_select_opt(ms_conn_t *c, ms_task_t *task)
166 {
167   double get_prop= ms_setting.cmd_distr[CMD_GET].cmd_prop;
168   double set_prop= ms_setting.cmd_distr[CMD_SET].cmd_prop;
169 
170   /* update cycle operation number if necessary */
171   if ((task->cycle_undo_get == 0) || (task->cycle_undo_set == 0))
172   {
173     task->cycle_undo_get+= (int)(CMD_DISTR_ADJUST_CYCLE * get_prop);
174     task->cycle_undo_set+= (int)(CMD_DISTR_ADJUST_CYCLE * set_prop);
175   }
176 
177   /**
178    *  According to operation distribution to choose doing which
179    *  operation. If it can't set new object to sever, just change
180    *  to do get operation.
181    */
182   if ((set_prop > PROP_ERROR)
183       && ((double)task->get_opt * set_prop >= (double)task->set_opt
184           * get_prop))
185   {
186     task->cmd= CMD_SET;
187     task->item= ms_get_next_set_item(c);
188   }
189   else
190   {
191     task->cmd= CMD_GET;
192     task->item= ms_get_next_get_item(c);
193   }
194 } /* ms_select_opt */
195 
196 
197 /**
198  * used to judge whether the number of get operations done is
199  * more than expected number of get operations to do right now.
200  *
201  * @param task, pointer of current task in the concurrency
202  *
203  * @return bool, if get too fast, return true, else return false
204  */
ms_is_get_too_fast(ms_task_t * task)205 static bool ms_is_get_too_fast(ms_task_t *task)
206 {
207   double get_prop= ms_setting.cmd_distr[CMD_GET].cmd_prop;
208   double set_prop= ms_setting.cmd_distr[CMD_SET].cmd_prop;
209 
210   /* no get operation */
211   if (get_prop < PROP_ERROR)
212   {
213     return false;
214   }
215 
216   int max_undo_set= (int)(set_prop / get_prop * (1.0 + DISADJUST_FACTOR))
217                     * task->cycle_undo_get;
218 
219   if (((double)task->get_opt * set_prop > (double)task->set_opt * get_prop)
220       && (task->cycle_undo_set > max_undo_set))
221   {
222     return true;
223   }
224 
225   return false;
226 } /* ms_is_get_too_fast */
227 
228 
229 /**
230  * used to judge whether the number of set operations done is
231  * more than expected number of set operations to do right now.
232  *
233  * @param task, pointer of current task in the concurrency
234  *
235  * @return bool, if set too fast, return true, else return false
236  */
ms_is_set_too_fast(ms_task_t * task)237 static bool ms_is_set_too_fast(ms_task_t *task)
238 {
239   double get_prop= ms_setting.cmd_distr[CMD_GET].cmd_prop;
240   double set_prop= ms_setting.cmd_distr[CMD_SET].cmd_prop;
241 
242   /* no set operation */
243   if (set_prop < PROP_ERROR)
244   {
245     return false;
246   }
247 
248   /* If it does set operation too fast, skip some */
249   int max_undo_get= (int)((get_prop / set_prop * (1.0 + DISADJUST_FACTOR))
250                           * (double)task->cycle_undo_set);
251 
252   if (((double)task->get_opt * set_prop < (double)task->set_opt * get_prop)
253       && (task->cycle_undo_get > max_undo_get))
254   {
255     return true;
256   }
257 
258   return false;
259 } /* ms_is_set_too_fast */
260 
261 
262 /**
263  * kick out the old item in the window, and add a new item to
264  * overwrite the old item. When we don't want to do overwrite
265  * object, and the current item to do set operation is an old
266  * item, we could kick out the old item and add a new item. Then
267  * we can ensure we set new object every time.
268  *
269  * @param item, pointer of task item which includes the object
270  *            information
271  */
ms_kick_out_item(ms_task_item_t * item)272 static void ms_kick_out_item(ms_task_item_t *item)
273 {
274   /* allocate a new item */
275   item->key_prefix= ms_get_key_prefix();
276 
277   item->key_suffix_offset++;
278   item->value_offset= INVALID_OFFSET;       /* new item use invalid value offset */
279   item->client_time= 0;
280 } /* ms_kick_out_item */
281 
282 
283 /**
284  *  used to judge whether we need overwrite object based on the
285  *  options user specified
286  *
287  * @param task, pointer of current task in the concurrency
288  *
289  * @return bool, if need overwrite, return true, else return
290  *         false
291  */
ms_need_overwrite_item(ms_task_t * task)292 static bool ms_need_overwrite_item(ms_task_t *task)
293 {
294   ms_task_item_t *item= task->item;
295 
296   assert(item != NULL);
297   assert(task->cmd == CMD_SET);
298 
299   /**
300    *  according to data overwrite percent to determine if do data
301    *  overwrite.
302    */
303   if (task->overwrite_set < (double)task->set_opt
304       * ms_setting.overwrite_percent)
305   {
306     return true;
307   }
308 
309   return false;
310 } /* ms_need_overwirte_item */
311 
312 
313 /**
314  * used to adjust operation. the function must be called after
315  * select operation. the function change get operation to set
316  * operation, or set operation to get operation based on the
317  * current case.
318  *
319  * @param c, pointer of the concurrency
320  * @param task, pointer of current task in the concurrency
321  *
322  * @return bool, if success, return true, else return false
323  */
ms_adjust_opt(ms_conn_t * c,ms_task_t * task)324 static bool ms_adjust_opt(ms_conn_t *c, ms_task_t *task)
325 {
326   ms_task_item_t *item= task->item;
327 
328   assert(item != NULL);
329 
330   if (task->cmd == CMD_SET)
331   {
332     /* If did set operation too fast, skip some */
333     if (ms_is_set_too_fast(task))
334     {
335       /* get the item instead */
336       if (item->value_offset != INVALID_OFFSET)
337       {
338         task->cmd= CMD_GET;
339         return true;
340       }
341     }
342 
343     /* If the current item is not a new item, kick it out */
344     if (item->value_offset != INVALID_OFFSET)
345     {
346       if (ms_need_overwrite_item(task))
347       {
348         /* overwrite */
349         task->overwrite_set++;
350       }
351       else
352       {
353         /* kick out the current item to do set operation */
354         ms_kick_out_item(item);
355       }
356     }
357     else            /* it's a new item */
358     {
359       /* need overwrite */
360       if (ms_need_overwrite_item(task))
361       {
362         /**
363          *  overwrite not use the item with current set cursor, revert
364          *  set cursor.
365          */
366         c->set_cursor--;
367 
368         item= ms_get_random_overwrite_item(c);
369         if (item->value_offset != INVALID_OFFSET)
370         {
371           task->item= item;
372           task->overwrite_set++;
373         }
374         else                /* item is a new item */
375         {
376           /* select the item to run, and cancel overwrite */
377           task->item= item;
378         }
379       }
380     }
381     task->cmd= CMD_SET;
382     return true;
383   }
384   else
385   {
386     if (item->value_offset == INVALID_OFFSET)
387     {
388       task->cmd= CMD_SET;
389       return true;
390     }
391 
392     /**
393      *  If It does get operation too fast, it will change the
394      *  operation to set.
395      */
396     if (ms_is_get_too_fast(task))
397     {
398       /* don't kick out the first item in the window */
399       if (! ms_is_set_too_fast(task))
400       {
401         ms_kick_out_item(item);
402         task->cmd= CMD_SET;
403         return true;
404       }
405       else
406       {
407         return false;
408       }
409     }
410 
411     assert(item->value_offset != INVALID_OFFSET);
412 
413     task->cmd= CMD_GET;
414     return true;
415   }
416 } /* ms_adjust_opt */
417 
418 
419 /**
420  * used to initialize the task which need verify data.
421  *
422  * @param task, pointer of current task in the concurrency
423  */
ms_task_data_verify_init(ms_task_t * task)424 static void ms_task_data_verify_init(ms_task_t *task)
425 {
426   ms_task_item_t *item= task->item;
427 
428   assert(item != NULL);
429   assert(task->cmd == CMD_GET);
430 
431   /**
432    *  according to data verification percent to determine if do
433    *  data verification.
434    */
435   if (task->verified_get < (double)task->get_opt
436       * ms_setting.verify_percent)
437   {
438     /**
439      *  currently it doesn't do verify, just increase the counter,
440      *  and do verification next proper get command
441      */
442     if ((task->item->value_offset != INVALID_OFFSET)
443         && (item->exp_time == 0))
444     {
445       task->verify= true;
446       task->finish_verify= false;
447       task->verified_get++;
448     }
449   }
450 } /* ms_task_data_verify_init */
451 
452 
453 /**
454  * used to initialize the task which need verify expire time.
455  *
456  * @param task, pointer of current task in the concurrency
457  */
ms_task_expire_verify_init(ms_task_t * task)458 static void ms_task_expire_verify_init(ms_task_t *task)
459 {
460   ms_task_item_t *item= task->item;
461 
462   assert(item != NULL);
463   assert(task->cmd == CMD_GET);
464   assert(item->exp_time > 0);
465 
466   task->verify= true;
467   task->finish_verify= false;
468 } /* ms_task_expire_verify_init */
469 
470 
471 /**
472  * used to get one task, the function initializes the task
473  * structure.
474  *
475  * @param c, pointer of the concurrency
476  * @param warmup, whether it need warmup
477  *
478  * @return ms_task_t*, pointer of current task in the
479  *         concurrency
480  */
ms_get_task(ms_conn_t * c,bool warmup)481 static ms_task_t *ms_get_task(ms_conn_t *c, bool warmup)
482 {
483   ms_task_t *task= &c->curr_task;
484 
485   while (1)
486   {
487     task->verify= false;
488     task->finish_verify= true;
489     task->get_miss= true;
490 
491     if (warmup)
492     {
493       task->cmd= CMD_SET;
494       task->item= ms_get_next_set_item(c);
495 
496       return task;
497     }
498 
499     /* according to operation distribution to choose doing which operation */
500     ms_select_opt(c, task);
501 
502     if (! ms_adjust_opt(c, task))
503     {
504       continue;
505     }
506 
507     if ((ms_setting.verify_percent > 0) && (task->cmd == CMD_GET))
508     {
509       ms_task_data_verify_init(task);
510     }
511 
512     if ((ms_setting.exp_ver_per > 0) && (task->cmd == CMD_GET)
513         && (task->item->exp_time > 0))
514     {
515       ms_task_expire_verify_init(task);
516     }
517 
518     break;
519   }
520 
521   /**
522    *  Only update get and delete counter, set counter will be
523    *  updated after set operation successes.
524    */
525   if (task->cmd == CMD_GET)
526   {
527     task->get_opt++;
528     task->cycle_undo_get--;
529   }
530 
531   return task;
532 } /* ms_get_task */
533 
534 
535 /**
536  * send a signal to the main monitor thread
537  *
538  * @param sync_lock, pointer of the lock
539  */
ms_send_signal(ms_sync_lock_t * sync_lock)540 static void ms_send_signal(ms_sync_lock_t *sync_lock)
541 {
542   pthread_mutex_lock(&sync_lock->lock);
543   sync_lock->count++;
544   pthread_cond_signal(&sync_lock->cond);
545   pthread_mutex_unlock(&sync_lock->lock);
546 } /* ms_send_signal */
547 
548 
549 /**
550  * If user only want to do get operation, but there is no object
551  * in server , so we use this function to warmup the server, and
552  * set some objects to server. It runs at the beginning of task.
553  *
554  * @param c, pointer of the concurrency
555  */
ms_warmup_server(ms_conn_t * c)556 static void ms_warmup_server(ms_conn_t *c)
557 {
558   ms_task_t *task;
559   ms_task_item_t *item;
560 
561   /**
562    * Extra one loop to get the last command returned state.
563    * Normally it gets the previous command returned state.
564    */
565   if ((c->remain_warmup_num >= 0)
566       && (c->remain_warmup_num != c->warmup_num))
567   {
568     item= ms_get_cur_opt_item(c);
569     /* only update the set command result state for data verification */
570     if ((c->precmd.cmd == CMD_SET) && (c->precmd.retstat == MCD_STORED))
571     {
572       item->value_offset= item->key_suffix_offset;
573       /* set success, update counter */
574       c->set_cursor++;
575     }
576     else if (c->precmd.cmd == CMD_SET && c->precmd.retstat != MCD_STORED)
577     {
578       printf("key: %" PRIx64 " didn't set success\n", item->key_prefix);
579     }
580   }
581 
582   /* the last time don't run a task */
583   if (c->remain_warmup_num-- > 0)
584   {
585     /* operate next task item */
586     task= ms_get_task(c, true);
587     item= task->item;
588     ms_mcd_set(c, item);
589   }
590 
591   /**
592    *  finish warming up server, wait all connects initialize
593    *  complete. Then all connects can start do task at the same
594    *  time.
595    */
596   if (c->remain_warmup_num == -1)
597   {
598     ms_send_signal(&ms_global.warmup_lock);
599     c->remain_warmup_num--;       /* never run the if branch */
600   }
601 } /* ms_warmup_server */
602 
603 
604 /**
605  * dispatch single get and set task
606  *
607  * @param c, pointer of the concurrency
608  */
ms_single_getset_task_sch(ms_conn_t * c)609 static void ms_single_getset_task_sch(ms_conn_t *c)
610 {
611   ms_task_t *task;
612   ms_task_item_t *item;
613 
614   /* the last time don't run a task */
615   if (c->remain_exec_num-- > 0)
616   {
617     task= ms_get_task(c, false);
618     item= task->item;
619     if (task->cmd == CMD_SET)
620     {
621       ms_mcd_set(c, item);
622     }
623     else if (task->cmd == CMD_GET)
624     {
625       assert(task->cmd == CMD_GET);
626       ms_mcd_get(c, item);
627     }
628   }
629 } /* ms_single_getset_task_sch */
630 
631 
632 /**
633  * dispatch multi-get and set task
634  *
635  * @param c, pointer of the concurrency
636  */
ms_multi_getset_task_sch(ms_conn_t * c)637 static void ms_multi_getset_task_sch(ms_conn_t *c)
638 {
639   ms_task_t *task;
640   ms_mlget_task_item_t *mlget_item;
641 
642   while (1)
643   {
644     if (c->remain_exec_num-- > 0)
645     {
646       task= ms_get_task(c, false);
647       if (task->cmd == CMD_SET)             /* just do it */
648       {
649         ms_mcd_set(c, task->item);
650         break;
651       }
652       else
653       {
654         assert(task->cmd == CMD_GET);
655         mlget_item= &c->mlget_task.mlget_item[c->mlget_task.mlget_num];
656         mlget_item->item= task->item;
657         mlget_item->verify= task->verify;
658         mlget_item->finish_verify= task->finish_verify;
659         mlget_item->get_miss= task->get_miss;
660         c->mlget_task.mlget_num++;
661 
662         /* enough multi-get task items can be done */
663         if ((c->mlget_task.mlget_num >= ms_setting.mult_key_num)
664             || ((c->remain_exec_num == 0) && (c->mlget_task.mlget_num > 0)))
665         {
666           ms_mcd_mlget(c);
667           break;
668         }
669       }
670     }
671     else
672     {
673       if ((c->remain_exec_num <= 0) && (c->mlget_task.mlget_num > 0))
674       {
675         ms_mcd_mlget(c);
676       }
677       break;
678     }
679   }
680 } /* ms_multi_getset_task_sch */
681 
682 
683 /**
684  * calculate the difference value of two time points
685  *
686  * @param start_time, the start time
687  * @param end_time, the end time
688  *
689  * @return uint64_t, the difference value between start_time and end_time in us
690  */
ms_time_diff(struct timeval * start_time,struct timeval * end_time)691 int64_t ms_time_diff(struct timeval *start_time, struct timeval *end_time)
692 {
693   int64_t endtime= end_time->tv_sec * 1000000 + end_time->tv_usec;
694   int64_t starttime= start_time->tv_sec * 1000000 + start_time->tv_usec;
695 
696   assert(endtime >= starttime);
697 
698   return endtime - starttime;
699 } /* ms_time_diff */
700 
701 
702 /**
703  * after get the response from server for multi-get, the
704  * function update the state of the task and do data verify if
705  * necessary.
706  *
707  * @param c, pointer of the concurrency
708  */
ms_update_multi_get_result(ms_conn_t * c)709 static void ms_update_multi_get_result(ms_conn_t *c)
710 {
711   ms_mlget_task_item_t *mlget_item;
712   ms_task_item_t *item;
713   char *orignval= NULL;
714   char *orignkey= NULL;
715 
716   if (c == NULL)
717   {
718     return;
719   }
720   assert(c != NULL);
721 
722   for (int i= 0; i < c->mlget_task.mlget_num; i++)
723   {
724     mlget_item= &c->mlget_task.mlget_item[i];
725     item= mlget_item->item;
726     orignval= &ms_setting.char_block[item->value_offset];
727     orignkey= &ms_setting.char_block[item->key_suffix_offset];
728 
729     /* update get miss counter */
730     if (mlget_item->get_miss)
731     {
732       atomic_add_size(&ms_stats.get_misses, 1);
733     }
734 
735     /* get nothing from server for this task item */
736     if (mlget_item->verify && ! mlget_item->finish_verify)
737     {
738       /* verify expire time if necessary */
739       if (item->exp_time > 0)
740       {
741         struct timeval curr_time;
742         gettimeofday(&curr_time, NULL);
743 
744         /* object doesn't expire but can't get it now */
745         if (curr_time.tv_sec - item->client_time
746             < item->exp_time - EXPIRE_TIME_ERROR)
747         {
748           atomic_add_size(&ms_stats.unexp_unget, 1);
749 
750           if (ms_setting.verbose)
751           {
752             char set_time[64];
753             char cur_time[64];
754             strftime(set_time, 64, "%Y-%m-%d %H:%M:%S",
755                      localtime(&item->client_time));
756             strftime(cur_time, 64, "%Y-%m-%d %H:%M:%S",
757                      localtime(&curr_time.tv_sec));
758             fprintf(stderr,
759                     "\n\t<%d expire time verification failed, object "
760                     "doesn't expire but can't get it now\n"
761                     "\tkey len: %d\n"
762                     "\tkey: %" PRIx64 " %.*s\n"
763                     "\tset time: %s current time: %s "
764                     "diff time: %d expire time: %d\n"
765                     "\texpected data len: %d\n"
766                     "\texpected data: %.*s\n"
767                     "\treceived data: \n",
768                     c->sfd,
769                     item->key_size,
770                     item->key_prefix,
771                     item->key_size - (int)KEY_PREFIX_SIZE,
772                     orignkey,
773                     set_time,
774                     cur_time,
775                     (int)(curr_time.tv_sec - item->client_time),
776                     item->exp_time,
777                     item->value_size,
778                     item->value_size,
779                     orignval);
780             fflush(stderr);
781           }
782         }
783       }
784       else
785       {
786         atomic_add_size(&ms_stats.vef_miss, 1);
787 
788         if (ms_setting.verbose)
789         {
790           fprintf(stderr, "\n<%d data verification failed\n"
791                           "\tkey len: %d\n"
792                           "\tkey: %" PRIx64 " %.*s\n"
793                           "\texpected data len: %d\n"
794                           "\texpected data: %.*s\n"
795                           "\treceived data: \n",
796                   c->sfd, item->key_size, item->key_prefix,
797                   item->key_size - (int)KEY_PREFIX_SIZE,
798                   orignkey, item->value_size, item->value_size, orignval);
799           fflush(stderr);
800         }
801       }
802     }
803   }
804   c->mlget_task.mlget_num= 0;
805   c->mlget_task.value_index= INVALID_OFFSET;
806 } /* ms_update_multi_get_result */
807 
808 
809 /**
810  * after get the response from server for single get, the
811  * function update the state of the task and do data verify if
812  * necessary.
813  *
814  * @param c, pointer of the concurrency
815  * @param item, pointer of task item which includes the object
816  *            information
817  */
ms_update_single_get_result(ms_conn_t * c,ms_task_item_t * item)818 static void ms_update_single_get_result(ms_conn_t *c, ms_task_item_t *item)
819 {
820   char *orignval= NULL;
821   char *orignkey= NULL;
822 
823   if ((c == NULL) || (item == NULL))
824   {
825     return;
826   }
827   assert(c != NULL);
828   assert(item != NULL);
829 
830   orignval= &ms_setting.char_block[item->value_offset];
831   orignkey= &ms_setting.char_block[item->key_suffix_offset];
832 
833   /* update get miss counter */
834   if ((c->precmd.cmd == CMD_GET) && c->curr_task.get_miss)
835   {
836     atomic_add_size(&ms_stats.get_misses, 1);
837   }
838 
839   /* get nothing from server for this task item */
840   if ((c->precmd.cmd == CMD_GET) && c->curr_task.verify
841       && ! c->curr_task.finish_verify)
842   {
843     /* verify expire time if necessary */
844     if (item->exp_time > 0)
845     {
846       struct timeval curr_time;
847       gettimeofday(&curr_time, NULL);
848 
849       /* object doesn't expire but can't get it now */
850       if (curr_time.tv_sec - item->client_time
851           < item->exp_time - EXPIRE_TIME_ERROR)
852       {
853         atomic_add_size(&ms_stats.unexp_unget, 1);
854 
855         if (ms_setting.verbose)
856         {
857           char set_time[64];
858           char cur_time[64];
859           strftime(set_time, 64, "%Y-%m-%d %H:%M:%S",
860                    localtime(&item->client_time));
861           strftime(cur_time, 64, "%Y-%m-%d %H:%M:%S",
862                    localtime(&curr_time.tv_sec));
863           fprintf(stderr,
864                   "\n\t<%d expire time verification failed, object "
865                   "doesn't expire but can't get it now\n"
866                   "\tkey len: %d\n"
867                   "\tkey: %" PRIx64 " %.*s\n"
868                   "\tset time: %s current time: %s "
869                   "diff time: %d expire time: %d\n"
870                   "\texpected data len: %d\n"
871                   "\texpected data: %.*s\n"
872                   "\treceived data: \n",
873                   c->sfd,
874                   item->key_size,
875                   item->key_prefix,
876                   item->key_size - (int)KEY_PREFIX_SIZE,
877                   orignkey,
878                   set_time,
879                   cur_time,
880                   (int)(curr_time.tv_sec - item->client_time),
881                   item->exp_time,
882                   item->value_size,
883                   item->value_size,
884                   orignval);
885           fflush(stderr);
886         }
887       }
888     }
889     else
890     {
891       atomic_add_size(&ms_stats.vef_miss, 1);
892 
893       if (ms_setting.verbose)
894       {
895         fprintf(stderr, "\n<%d data verification failed\n"
896                         "\tkey len: %d\n"
897                         "\tkey: %" PRIx64 " %.*s\n"
898                         "\texpected data len: %d\n"
899                         "\texpected data: %.*s\n"
900                         "\treceived data: \n",
901                 c->sfd, item->key_size, item->key_prefix,
902                 item->key_size - (int)KEY_PREFIX_SIZE,
903                 orignkey, item->value_size, item->value_size, orignval);
904         fflush(stderr);
905       }
906     }
907   }
908 } /* ms_update_single_get_result */
909 
910 
911 /**
912  * after get the response from server for set the function
913  * update the state of the task and do data verify if necessary.
914  *
915  * @param c, pointer of the concurrency
916  * @param item, pointer of task item which includes the object
917  *            information
918  */
ms_update_set_result(ms_conn_t * c,ms_task_item_t * item)919 static void ms_update_set_result(ms_conn_t *c, ms_task_item_t *item)
920 {
921   if ((c == NULL) || (item == NULL))
922   {
923     return;
924   }
925   assert(c != NULL);
926   assert(item != NULL);
927 
928   if (c->precmd.cmd == CMD_SET)
929   {
930     switch (c->precmd.retstat)
931     {
932     case MCD_STORED:
933       if (item->value_offset == INVALID_OFFSET)
934       {
935         /* first set with the same offset of key suffix */
936         item->value_offset= item->key_suffix_offset;
937       }
938       else
939       {
940         /* not first set, just increase the value offset */
941         item->value_offset+= 1;
942       }
943 
944       /* set successes, update counter */
945       c->set_cursor++;
946       c->curr_task.set_opt++;
947       c->curr_task.cycle_undo_set--;
948       break;
949 
950     case MCD_SERVER_ERROR:
951     default:
952       break;
953     } /* switch */
954   }
955 } /* ms_update_set_result */
956 
957 
958 /**
959  * update the response time result
960  *
961  * @param c, pointer of the concurrency
962  */
ms_update_stat_result(ms_conn_t * c)963 static void ms_update_stat_result(ms_conn_t *c)
964 {
965   bool get_miss= false;
966 
967   if (c == NULL)
968   {
969     return;
970   }
971   assert(c != NULL);
972 
973   gettimeofday(&c->end_time, NULL);
974   uint64_t time_diff= (uint64_t)ms_time_diff(&c->start_time, &c->end_time);
975 
976   pthread_mutex_lock(&ms_statistic.stat_mutex);
977 
978   switch (c->precmd.cmd)
979   {
980   case CMD_SET:
981     ms_record_event(&ms_statistic.set_stat, time_diff, false);
982     break;
983 
984   case CMD_GET:
985     if (c->curr_task.get_miss)
986     {
987       get_miss= true;
988     }
989     ms_record_event(&ms_statistic.get_stat, time_diff, get_miss);
990     break;
991 
992   default:
993     break;
994   } /* switch */
995 
996   ms_record_event(&ms_statistic.total_stat, time_diff, get_miss);
997   pthread_mutex_unlock(&ms_statistic.stat_mutex);
998 } /* ms_update_stat_result */
999 
1000 
1001 /**
1002  * after get response from server for the current operation, and
1003  * before doing the next operation, update the state of the
1004  * current operation.
1005  *
1006  * @param c, pointer of the concurrency
1007  */
ms_update_task_result(ms_conn_t * c)1008 static void ms_update_task_result(ms_conn_t *c)
1009 {
1010   ms_task_item_t *item;
1011 
1012   if (c == NULL)
1013   {
1014     return;
1015   }
1016   assert(c != NULL);
1017 
1018   item= ms_get_cur_opt_item(c);
1019   if (item == NULL)
1020   {
1021     return;
1022   }
1023   assert(item != NULL);
1024 
1025   ms_update_set_result(c, item);
1026 
1027   if ((ms_setting.stat_freq > 0)
1028       && ((c->precmd.cmd == CMD_SET) || (c->precmd.cmd == CMD_GET)))
1029   {
1030     ms_update_stat_result(c);
1031   }
1032 
1033   /* update multi-get task item */
1034   if (((ms_setting.mult_key_num > 1)
1035        && (c->mlget_task.mlget_num >= ms_setting.mult_key_num))
1036       || ((c->remain_exec_num == 0) && (c->mlget_task.mlget_num > 0)))
1037   {
1038     ms_update_multi_get_result(c);
1039   }
1040   else
1041   {
1042     ms_update_single_get_result(c, item);
1043   }
1044 } /* ms_update_task_result */
1045 
1046 
1047 /**
1048  * run get and set operation
1049  *
1050  * @param c, pointer of the concurrency
1051  *
1052  * @return int, if success, return EXIT_SUCCESS, else return -1
1053  */
ms_run_getset_task(ms_conn_t * c)1054 static int ms_run_getset_task(ms_conn_t *c)
1055 {
1056   /**
1057    * extra one loop to get the last command return state. get the
1058    * last command return state.
1059    */
1060   if ((c->remain_exec_num >= 0)
1061       && (c->remain_exec_num != c->exec_num))
1062   {
1063     ms_update_task_result(c);
1064   }
1065 
1066   /* multi-get */
1067   if (ms_setting.mult_key_num > 1)
1068   {
1069     /* operate next task item */
1070     ms_multi_getset_task_sch(c);
1071   }
1072   else
1073   {
1074     /* operate next task item */
1075     ms_single_getset_task_sch(c);
1076   }
1077 
1078   /* no task to do, exit */
1079   if ((c->remain_exec_num == -1) || ms_global.time_out)
1080   {
1081     return -1;
1082   }
1083 
1084   return EXIT_SUCCESS;
1085 } /* ms_run_getset_task */
1086 
1087 
1088 /**
1089  * the state machine call the function to execute task.
1090  *
1091  * @param c, pointer of the concurrency
1092  *
1093  * @return int, if success, return EXIT_SUCCESS, else return -1
1094  */
ms_exec_task(struct conn * c)1095 int ms_exec_task(struct conn *c)
1096 {
1097   if (! ms_global.finish_warmup)
1098   {
1099     ms_warmup_server(c);
1100   }
1101   else
1102   {
1103     if (ms_run_getset_task(c) != 0)
1104     {
1105       return -1;
1106     }
1107   }
1108 
1109   return EXIT_SUCCESS;
1110 } /* ms_exec_task */
1111