1 /*
2 * File: ms_task.c
3 * Author: Mingqiang Zhuang
4 *
5 * Created on February 10, 2009
6 *
7 * (c) Copyright 2009, Schooner Information Technology, Inc.
8 * http://www.schoonerinfotech.com/
9 *
10 */
11
12 #include "mem_config.h"
13
14 #if defined(HAVE_SYS_TIME_H)
15 # include <sys/time.h>
16 #endif
17
18 #if defined(HAVE_TIME_H)
19 # include <time.h>
20 #endif
21
22 #include "ms_thread.h"
23 #include "ms_setting.h"
24 #include "ms_atomic.h"
25
26 /* command distribution adjustment cycle */
27 #define CMD_DISTR_ADJUST_CYCLE 1000
28 #define DISADJUST_FACTOR 0.03 /**
29 * In one adjustment cycle, if undo set or get
30 * operations proportion is more than 3% , means
31 * there are too many new item or need more new
32 * item in the window. This factor shows it.
33 */
34
35 /* get item from task window */
36 static ms_task_item_t *ms_get_cur_opt_item(ms_conn_t *c);
37 static ms_task_item_t *ms_get_next_get_item(ms_conn_t *c);
38 static ms_task_item_t *ms_get_next_set_item(ms_conn_t *c);
39 static ms_task_item_t *ms_get_random_overwrite_item(ms_conn_t *c);
40
41
42 /* select next operation to do */
43 static void ms_select_opt(ms_conn_t *c, ms_task_t *task);
44
45
46 /* set and get speed estimate for controlling and adjustment */
47 static bool ms_is_set_too_fast(ms_task_t *task);
48 static bool ms_is_get_too_fast(ms_task_t *task);
49 static void ms_kick_out_item(ms_task_item_t *item);
50
51
52 /* miss rate adjustment */
53 static bool ms_need_overwrite_item(ms_task_t *task);
54 static bool ms_adjust_opt(ms_conn_t *c, ms_task_t *task);
55
56
57 /* deal with data verification initialization */
58 static void ms_task_data_verify_init(ms_task_t *task);
59 static void ms_task_expire_verify_init(ms_task_t *task);
60
61
62 /* select a new task to do */
63 static ms_task_t *ms_get_task(ms_conn_t *c, bool warmup);
64
65
66 /* run the selected task */
67 static void ms_update_set_result(ms_conn_t *c, ms_task_item_t *item);
68 static void ms_update_stat_result(ms_conn_t *c);
69 static void ms_update_multi_get_result(ms_conn_t *c);
70 static void ms_update_single_get_result(ms_conn_t *c, ms_task_item_t *item);
71 static void ms_update_task_result(ms_conn_t *c);
72 static void ms_single_getset_task_sch(ms_conn_t *c);
73 static void ms_multi_getset_task_sch(ms_conn_t *c);
74 static void ms_send_signal(ms_sync_lock_t *sync_lock);
75 static void ms_warmup_server(ms_conn_t *c);
76 static int ms_run_getset_task(ms_conn_t *c);
77
78
79 /**
80 * used to get the current operation item(object)
81 *
82 * @param c, pointer of the concurrency
83 *
84 * @return ms_task_item_t*, current operating item
85 */
ms_get_cur_opt_item(ms_conn_t * c)86 static ms_task_item_t *ms_get_cur_opt_item(ms_conn_t *c)
87 {
88 return c->curr_task.item;
89 }
90
91
92 /**
93 * used to get the next item to do get operation
94 *
95 * @param c, pointer of the concurrency
96 *
97 * @return ms_task_item_t*, the pointer of the next item to do
98 * get operation
99 */
ms_get_next_get_item(ms_conn_t * c)100 static ms_task_item_t *ms_get_next_get_item(ms_conn_t *c)
101 {
102 ms_task_item_t *item= NULL;
103
104 if (c->set_cursor <= 0)
105 {
106 /* the first item in the window */
107 item= &c->item_win[0];
108 }
109 else if (c->set_cursor > 0 && c->set_cursor < (uint32_t)c->win_size)
110 {
111 /* random get one item set before */
112 item= &c->item_win[random() % (int64_t)c->set_cursor];
113 }
114 else
115 {
116 /* random get one item from the window */
117 item= &c->item_win[random() % c->win_size];
118 }
119
120 return item;
121 } /* ms_get_next_get_item */
122
123
124 /**
125 * used to get the next item to do set operation
126 *
127 * @param c, pointer of the concurrency
128 *
129 * @return ms_task_item_t*, the pointer of the next item to do
130 * set operation
131 */
ms_get_next_set_item(ms_conn_t * c)132 static ms_task_item_t *ms_get_next_set_item(ms_conn_t *c)
133 {
134 /**
135 * when a set command successes, the cursor will plus 1. If set
136 * fails, the cursor doesn't change. it isn't necessary to
137 * increase the cursor here.
138 */
139 return &c->item_win[(int64_t)c->set_cursor % c->win_size];
140 }
141
142
143 /**
144 * If we need do overwrite, we could select a item set before.
145 * This function is used to get a item set before to do
146 * overwrite.
147 *
148 * @param c, pointer of the concurrency
149 *
150 * @return ms_task_item_t*, the pointer of the previous item of
151 * set operation
152 */
ms_get_random_overwrite_item(ms_conn_t * c)153 static ms_task_item_t *ms_get_random_overwrite_item(ms_conn_t *c)
154 {
155 return ms_get_next_get_item(c);
156 } /* ms_get_random_overwrite_item */
157
158 /**
159 * According to the proportion of operations(get or set), select
160 * an operation to do.
161 *
162 * @param c, pointer of the concurrency
163 * @param task, pointer of current task in the concurrency
164 */
ms_select_opt(ms_conn_t * c,ms_task_t * task)165 static void ms_select_opt(ms_conn_t *c, ms_task_t *task)
166 {
167 double get_prop= ms_setting.cmd_distr[CMD_GET].cmd_prop;
168 double set_prop= ms_setting.cmd_distr[CMD_SET].cmd_prop;
169
170 /* update cycle operation number if necessary */
171 if ((task->cycle_undo_get == 0) || (task->cycle_undo_set == 0))
172 {
173 task->cycle_undo_get+= (int)(CMD_DISTR_ADJUST_CYCLE * get_prop);
174 task->cycle_undo_set+= (int)(CMD_DISTR_ADJUST_CYCLE * set_prop);
175 }
176
177 /**
178 * According to operation distribution to choose doing which
179 * operation. If it can't set new object to sever, just change
180 * to do get operation.
181 */
182 if ((set_prop > PROP_ERROR)
183 && ((double)task->get_opt * set_prop >= (double)task->set_opt
184 * get_prop))
185 {
186 task->cmd= CMD_SET;
187 task->item= ms_get_next_set_item(c);
188 }
189 else
190 {
191 task->cmd= CMD_GET;
192 task->item= ms_get_next_get_item(c);
193 }
194 } /* ms_select_opt */
195
196
197 /**
198 * used to judge whether the number of get operations done is
199 * more than expected number of get operations to do right now.
200 *
201 * @param task, pointer of current task in the concurrency
202 *
203 * @return bool, if get too fast, return true, else return false
204 */
ms_is_get_too_fast(ms_task_t * task)205 static bool ms_is_get_too_fast(ms_task_t *task)
206 {
207 double get_prop= ms_setting.cmd_distr[CMD_GET].cmd_prop;
208 double set_prop= ms_setting.cmd_distr[CMD_SET].cmd_prop;
209
210 /* no get operation */
211 if (get_prop < PROP_ERROR)
212 {
213 return false;
214 }
215
216 int max_undo_set= (int)(set_prop / get_prop * (1.0 + DISADJUST_FACTOR))
217 * task->cycle_undo_get;
218
219 if (((double)task->get_opt * set_prop > (double)task->set_opt * get_prop)
220 && (task->cycle_undo_set > max_undo_set))
221 {
222 return true;
223 }
224
225 return false;
226 } /* ms_is_get_too_fast */
227
228
229 /**
230 * used to judge whether the number of set operations done is
231 * more than expected number of set operations to do right now.
232 *
233 * @param task, pointer of current task in the concurrency
234 *
235 * @return bool, if set too fast, return true, else return false
236 */
ms_is_set_too_fast(ms_task_t * task)237 static bool ms_is_set_too_fast(ms_task_t *task)
238 {
239 double get_prop= ms_setting.cmd_distr[CMD_GET].cmd_prop;
240 double set_prop= ms_setting.cmd_distr[CMD_SET].cmd_prop;
241
242 /* no set operation */
243 if (set_prop < PROP_ERROR)
244 {
245 return false;
246 }
247
248 /* If it does set operation too fast, skip some */
249 int max_undo_get= (int)((get_prop / set_prop * (1.0 + DISADJUST_FACTOR))
250 * (double)task->cycle_undo_set);
251
252 if (((double)task->get_opt * set_prop < (double)task->set_opt * get_prop)
253 && (task->cycle_undo_get > max_undo_get))
254 {
255 return true;
256 }
257
258 return false;
259 } /* ms_is_set_too_fast */
260
261
262 /**
263 * kick out the old item in the window, and add a new item to
264 * overwrite the old item. When we don't want to do overwrite
265 * object, and the current item to do set operation is an old
266 * item, we could kick out the old item and add a new item. Then
267 * we can ensure we set new object every time.
268 *
269 * @param item, pointer of task item which includes the object
270 * information
271 */
ms_kick_out_item(ms_task_item_t * item)272 static void ms_kick_out_item(ms_task_item_t *item)
273 {
274 /* allocate a new item */
275 item->key_prefix= ms_get_key_prefix();
276
277 item->key_suffix_offset++;
278 item->value_offset= INVALID_OFFSET; /* new item use invalid value offset */
279 item->client_time= 0;
280 } /* ms_kick_out_item */
281
282
283 /**
284 * used to judge whether we need overwrite object based on the
285 * options user specified
286 *
287 * @param task, pointer of current task in the concurrency
288 *
289 * @return bool, if need overwrite, return true, else return
290 * false
291 */
ms_need_overwrite_item(ms_task_t * task)292 static bool ms_need_overwrite_item(ms_task_t *task)
293 {
294 ms_task_item_t *item= task->item;
295
296 assert(item != NULL);
297 assert(task->cmd == CMD_SET);
298
299 /**
300 * according to data overwrite percent to determine if do data
301 * overwrite.
302 */
303 if (task->overwrite_set < (double)task->set_opt
304 * ms_setting.overwrite_percent)
305 {
306 return true;
307 }
308
309 return false;
310 } /* ms_need_overwirte_item */
311
312
313 /**
314 * used to adjust operation. the function must be called after
315 * select operation. the function change get operation to set
316 * operation, or set operation to get operation based on the
317 * current case.
318 *
319 * @param c, pointer of the concurrency
320 * @param task, pointer of current task in the concurrency
321 *
322 * @return bool, if success, return true, else return false
323 */
ms_adjust_opt(ms_conn_t * c,ms_task_t * task)324 static bool ms_adjust_opt(ms_conn_t *c, ms_task_t *task)
325 {
326 ms_task_item_t *item= task->item;
327
328 assert(item != NULL);
329
330 if (task->cmd == CMD_SET)
331 {
332 /* If did set operation too fast, skip some */
333 if (ms_is_set_too_fast(task))
334 {
335 /* get the item instead */
336 if (item->value_offset != INVALID_OFFSET)
337 {
338 task->cmd= CMD_GET;
339 return true;
340 }
341 }
342
343 /* If the current item is not a new item, kick it out */
344 if (item->value_offset != INVALID_OFFSET)
345 {
346 if (ms_need_overwrite_item(task))
347 {
348 /* overwrite */
349 task->overwrite_set++;
350 }
351 else
352 {
353 /* kick out the current item to do set operation */
354 ms_kick_out_item(item);
355 }
356 }
357 else /* it's a new item */
358 {
359 /* need overwrite */
360 if (ms_need_overwrite_item(task))
361 {
362 /**
363 * overwrite not use the item with current set cursor, revert
364 * set cursor.
365 */
366 c->set_cursor--;
367
368 item= ms_get_random_overwrite_item(c);
369 if (item->value_offset != INVALID_OFFSET)
370 {
371 task->item= item;
372 task->overwrite_set++;
373 }
374 else /* item is a new item */
375 {
376 /* select the item to run, and cancel overwrite */
377 task->item= item;
378 }
379 }
380 }
381 task->cmd= CMD_SET;
382 return true;
383 }
384 else
385 {
386 if (item->value_offset == INVALID_OFFSET)
387 {
388 task->cmd= CMD_SET;
389 return true;
390 }
391
392 /**
393 * If It does get operation too fast, it will change the
394 * operation to set.
395 */
396 if (ms_is_get_too_fast(task))
397 {
398 /* don't kick out the first item in the window */
399 if (! ms_is_set_too_fast(task))
400 {
401 ms_kick_out_item(item);
402 task->cmd= CMD_SET;
403 return true;
404 }
405 else
406 {
407 return false;
408 }
409 }
410
411 assert(item->value_offset != INVALID_OFFSET);
412
413 task->cmd= CMD_GET;
414 return true;
415 }
416 } /* ms_adjust_opt */
417
418
419 /**
420 * used to initialize the task which need verify data.
421 *
422 * @param task, pointer of current task in the concurrency
423 */
ms_task_data_verify_init(ms_task_t * task)424 static void ms_task_data_verify_init(ms_task_t *task)
425 {
426 ms_task_item_t *item= task->item;
427
428 assert(item != NULL);
429 assert(task->cmd == CMD_GET);
430
431 /**
432 * according to data verification percent to determine if do
433 * data verification.
434 */
435 if (task->verified_get < (double)task->get_opt
436 * ms_setting.verify_percent)
437 {
438 /**
439 * currently it doesn't do verify, just increase the counter,
440 * and do verification next proper get command
441 */
442 if ((task->item->value_offset != INVALID_OFFSET)
443 && (item->exp_time == 0))
444 {
445 task->verify= true;
446 task->finish_verify= false;
447 task->verified_get++;
448 }
449 }
450 } /* ms_task_data_verify_init */
451
452
453 /**
454 * used to initialize the task which need verify expire time.
455 *
456 * @param task, pointer of current task in the concurrency
457 */
ms_task_expire_verify_init(ms_task_t * task)458 static void ms_task_expire_verify_init(ms_task_t *task)
459 {
460 ms_task_item_t *item= task->item;
461
462 assert(item != NULL);
463 assert(task->cmd == CMD_GET);
464 assert(item->exp_time > 0);
465
466 task->verify= true;
467 task->finish_verify= false;
468 } /* ms_task_expire_verify_init */
469
470
471 /**
472 * used to get one task, the function initializes the task
473 * structure.
474 *
475 * @param c, pointer of the concurrency
476 * @param warmup, whether it need warmup
477 *
478 * @return ms_task_t*, pointer of current task in the
479 * concurrency
480 */
ms_get_task(ms_conn_t * c,bool warmup)481 static ms_task_t *ms_get_task(ms_conn_t *c, bool warmup)
482 {
483 ms_task_t *task= &c->curr_task;
484
485 while (1)
486 {
487 task->verify= false;
488 task->finish_verify= true;
489 task->get_miss= true;
490
491 if (warmup)
492 {
493 task->cmd= CMD_SET;
494 task->item= ms_get_next_set_item(c);
495
496 return task;
497 }
498
499 /* according to operation distribution to choose doing which operation */
500 ms_select_opt(c, task);
501
502 if (! ms_adjust_opt(c, task))
503 {
504 continue;
505 }
506
507 if ((ms_setting.verify_percent > 0) && (task->cmd == CMD_GET))
508 {
509 ms_task_data_verify_init(task);
510 }
511
512 if ((ms_setting.exp_ver_per > 0) && (task->cmd == CMD_GET)
513 && (task->item->exp_time > 0))
514 {
515 ms_task_expire_verify_init(task);
516 }
517
518 break;
519 }
520
521 /**
522 * Only update get and delete counter, set counter will be
523 * updated after set operation successes.
524 */
525 if (task->cmd == CMD_GET)
526 {
527 task->get_opt++;
528 task->cycle_undo_get--;
529 }
530
531 return task;
532 } /* ms_get_task */
533
534
535 /**
536 * send a signal to the main monitor thread
537 *
538 * @param sync_lock, pointer of the lock
539 */
ms_send_signal(ms_sync_lock_t * sync_lock)540 static void ms_send_signal(ms_sync_lock_t *sync_lock)
541 {
542 pthread_mutex_lock(&sync_lock->lock);
543 sync_lock->count++;
544 pthread_cond_signal(&sync_lock->cond);
545 pthread_mutex_unlock(&sync_lock->lock);
546 } /* ms_send_signal */
547
548
549 /**
550 * If user only want to do get operation, but there is no object
551 * in server , so we use this function to warmup the server, and
552 * set some objects to server. It runs at the beginning of task.
553 *
554 * @param c, pointer of the concurrency
555 */
ms_warmup_server(ms_conn_t * c)556 static void ms_warmup_server(ms_conn_t *c)
557 {
558 ms_task_t *task;
559 ms_task_item_t *item;
560
561 /**
562 * Extra one loop to get the last command returned state.
563 * Normally it gets the previous command returned state.
564 */
565 if ((c->remain_warmup_num >= 0)
566 && (c->remain_warmup_num != c->warmup_num))
567 {
568 item= ms_get_cur_opt_item(c);
569 /* only update the set command result state for data verification */
570 if ((c->precmd.cmd == CMD_SET) && (c->precmd.retstat == MCD_STORED))
571 {
572 item->value_offset= item->key_suffix_offset;
573 /* set success, update counter */
574 c->set_cursor++;
575 }
576 else if (c->precmd.cmd == CMD_SET && c->precmd.retstat != MCD_STORED)
577 {
578 printf("key: %" PRIx64 " didn't set success\n", item->key_prefix);
579 }
580 }
581
582 /* the last time don't run a task */
583 if (c->remain_warmup_num-- > 0)
584 {
585 /* operate next task item */
586 task= ms_get_task(c, true);
587 item= task->item;
588 ms_mcd_set(c, item);
589 }
590
591 /**
592 * finish warming up server, wait all connects initialize
593 * complete. Then all connects can start do task at the same
594 * time.
595 */
596 if (c->remain_warmup_num == -1)
597 {
598 ms_send_signal(&ms_global.warmup_lock);
599 c->remain_warmup_num--; /* never run the if branch */
600 }
601 } /* ms_warmup_server */
602
603
604 /**
605 * dispatch single get and set task
606 *
607 * @param c, pointer of the concurrency
608 */
ms_single_getset_task_sch(ms_conn_t * c)609 static void ms_single_getset_task_sch(ms_conn_t *c)
610 {
611 ms_task_t *task;
612 ms_task_item_t *item;
613
614 /* the last time don't run a task */
615 if (c->remain_exec_num-- > 0)
616 {
617 task= ms_get_task(c, false);
618 item= task->item;
619 if (task->cmd == CMD_SET)
620 {
621 ms_mcd_set(c, item);
622 }
623 else if (task->cmd == CMD_GET)
624 {
625 assert(task->cmd == CMD_GET);
626 ms_mcd_get(c, item);
627 }
628 }
629 } /* ms_single_getset_task_sch */
630
631
632 /**
633 * dispatch multi-get and set task
634 *
635 * @param c, pointer of the concurrency
636 */
ms_multi_getset_task_sch(ms_conn_t * c)637 static void ms_multi_getset_task_sch(ms_conn_t *c)
638 {
639 ms_task_t *task;
640 ms_mlget_task_item_t *mlget_item;
641
642 while (1)
643 {
644 if (c->remain_exec_num-- > 0)
645 {
646 task= ms_get_task(c, false);
647 if (task->cmd == CMD_SET) /* just do it */
648 {
649 ms_mcd_set(c, task->item);
650 break;
651 }
652 else
653 {
654 assert(task->cmd == CMD_GET);
655 mlget_item= &c->mlget_task.mlget_item[c->mlget_task.mlget_num];
656 mlget_item->item= task->item;
657 mlget_item->verify= task->verify;
658 mlget_item->finish_verify= task->finish_verify;
659 mlget_item->get_miss= task->get_miss;
660 c->mlget_task.mlget_num++;
661
662 /* enough multi-get task items can be done */
663 if ((c->mlget_task.mlget_num >= ms_setting.mult_key_num)
664 || ((c->remain_exec_num == 0) && (c->mlget_task.mlget_num > 0)))
665 {
666 ms_mcd_mlget(c);
667 break;
668 }
669 }
670 }
671 else
672 {
673 if ((c->remain_exec_num <= 0) && (c->mlget_task.mlget_num > 0))
674 {
675 ms_mcd_mlget(c);
676 }
677 break;
678 }
679 }
680 } /* ms_multi_getset_task_sch */
681
682
683 /**
684 * calculate the difference value of two time points
685 *
686 * @param start_time, the start time
687 * @param end_time, the end time
688 *
689 * @return uint64_t, the difference value between start_time and end_time in us
690 */
ms_time_diff(struct timeval * start_time,struct timeval * end_time)691 int64_t ms_time_diff(struct timeval *start_time, struct timeval *end_time)
692 {
693 int64_t endtime= end_time->tv_sec * 1000000 + end_time->tv_usec;
694 int64_t starttime= start_time->tv_sec * 1000000 + start_time->tv_usec;
695
696 assert(endtime >= starttime);
697
698 return endtime - starttime;
699 } /* ms_time_diff */
700
701
702 /**
703 * after get the response from server for multi-get, the
704 * function update the state of the task and do data verify if
705 * necessary.
706 *
707 * @param c, pointer of the concurrency
708 */
ms_update_multi_get_result(ms_conn_t * c)709 static void ms_update_multi_get_result(ms_conn_t *c)
710 {
711 ms_mlget_task_item_t *mlget_item;
712 ms_task_item_t *item;
713 char *orignval= NULL;
714 char *orignkey= NULL;
715
716 if (c == NULL)
717 {
718 return;
719 }
720 assert(c != NULL);
721
722 for (int i= 0; i < c->mlget_task.mlget_num; i++)
723 {
724 mlget_item= &c->mlget_task.mlget_item[i];
725 item= mlget_item->item;
726 orignval= &ms_setting.char_block[item->value_offset];
727 orignkey= &ms_setting.char_block[item->key_suffix_offset];
728
729 /* update get miss counter */
730 if (mlget_item->get_miss)
731 {
732 atomic_add_size(&ms_stats.get_misses, 1);
733 }
734
735 /* get nothing from server for this task item */
736 if (mlget_item->verify && ! mlget_item->finish_verify)
737 {
738 /* verify expire time if necessary */
739 if (item->exp_time > 0)
740 {
741 struct timeval curr_time;
742 gettimeofday(&curr_time, NULL);
743
744 /* object doesn't expire but can't get it now */
745 if (curr_time.tv_sec - item->client_time
746 < item->exp_time - EXPIRE_TIME_ERROR)
747 {
748 atomic_add_size(&ms_stats.unexp_unget, 1);
749
750 if (ms_setting.verbose)
751 {
752 char set_time[64];
753 char cur_time[64];
754 strftime(set_time, 64, "%Y-%m-%d %H:%M:%S",
755 localtime(&item->client_time));
756 strftime(cur_time, 64, "%Y-%m-%d %H:%M:%S",
757 localtime(&curr_time.tv_sec));
758 fprintf(stderr,
759 "\n\t<%d expire time verification failed, object "
760 "doesn't expire but can't get it now\n"
761 "\tkey len: %d\n"
762 "\tkey: %" PRIx64 " %.*s\n"
763 "\tset time: %s current time: %s "
764 "diff time: %d expire time: %d\n"
765 "\texpected data len: %d\n"
766 "\texpected data: %.*s\n"
767 "\treceived data: \n",
768 c->sfd,
769 item->key_size,
770 item->key_prefix,
771 item->key_size - (int)KEY_PREFIX_SIZE,
772 orignkey,
773 set_time,
774 cur_time,
775 (int)(curr_time.tv_sec - item->client_time),
776 item->exp_time,
777 item->value_size,
778 item->value_size,
779 orignval);
780 fflush(stderr);
781 }
782 }
783 }
784 else
785 {
786 atomic_add_size(&ms_stats.vef_miss, 1);
787
788 if (ms_setting.verbose)
789 {
790 fprintf(stderr, "\n<%d data verification failed\n"
791 "\tkey len: %d\n"
792 "\tkey: %" PRIx64 " %.*s\n"
793 "\texpected data len: %d\n"
794 "\texpected data: %.*s\n"
795 "\treceived data: \n",
796 c->sfd, item->key_size, item->key_prefix,
797 item->key_size - (int)KEY_PREFIX_SIZE,
798 orignkey, item->value_size, item->value_size, orignval);
799 fflush(stderr);
800 }
801 }
802 }
803 }
804 c->mlget_task.mlget_num= 0;
805 c->mlget_task.value_index= INVALID_OFFSET;
806 } /* ms_update_multi_get_result */
807
808
809 /**
810 * after get the response from server for single get, the
811 * function update the state of the task and do data verify if
812 * necessary.
813 *
814 * @param c, pointer of the concurrency
815 * @param item, pointer of task item which includes the object
816 * information
817 */
ms_update_single_get_result(ms_conn_t * c,ms_task_item_t * item)818 static void ms_update_single_get_result(ms_conn_t *c, ms_task_item_t *item)
819 {
820 char *orignval= NULL;
821 char *orignkey= NULL;
822
823 if ((c == NULL) || (item == NULL))
824 {
825 return;
826 }
827 assert(c != NULL);
828 assert(item != NULL);
829
830 orignval= &ms_setting.char_block[item->value_offset];
831 orignkey= &ms_setting.char_block[item->key_suffix_offset];
832
833 /* update get miss counter */
834 if ((c->precmd.cmd == CMD_GET) && c->curr_task.get_miss)
835 {
836 atomic_add_size(&ms_stats.get_misses, 1);
837 }
838
839 /* get nothing from server for this task item */
840 if ((c->precmd.cmd == CMD_GET) && c->curr_task.verify
841 && ! c->curr_task.finish_verify)
842 {
843 /* verify expire time if necessary */
844 if (item->exp_time > 0)
845 {
846 struct timeval curr_time;
847 gettimeofday(&curr_time, NULL);
848
849 /* object doesn't expire but can't get it now */
850 if (curr_time.tv_sec - item->client_time
851 < item->exp_time - EXPIRE_TIME_ERROR)
852 {
853 atomic_add_size(&ms_stats.unexp_unget, 1);
854
855 if (ms_setting.verbose)
856 {
857 char set_time[64];
858 char cur_time[64];
859 strftime(set_time, 64, "%Y-%m-%d %H:%M:%S",
860 localtime(&item->client_time));
861 strftime(cur_time, 64, "%Y-%m-%d %H:%M:%S",
862 localtime(&curr_time.tv_sec));
863 fprintf(stderr,
864 "\n\t<%d expire time verification failed, object "
865 "doesn't expire but can't get it now\n"
866 "\tkey len: %d\n"
867 "\tkey: %" PRIx64 " %.*s\n"
868 "\tset time: %s current time: %s "
869 "diff time: %d expire time: %d\n"
870 "\texpected data len: %d\n"
871 "\texpected data: %.*s\n"
872 "\treceived data: \n",
873 c->sfd,
874 item->key_size,
875 item->key_prefix,
876 item->key_size - (int)KEY_PREFIX_SIZE,
877 orignkey,
878 set_time,
879 cur_time,
880 (int)(curr_time.tv_sec - item->client_time),
881 item->exp_time,
882 item->value_size,
883 item->value_size,
884 orignval);
885 fflush(stderr);
886 }
887 }
888 }
889 else
890 {
891 atomic_add_size(&ms_stats.vef_miss, 1);
892
893 if (ms_setting.verbose)
894 {
895 fprintf(stderr, "\n<%d data verification failed\n"
896 "\tkey len: %d\n"
897 "\tkey: %" PRIx64 " %.*s\n"
898 "\texpected data len: %d\n"
899 "\texpected data: %.*s\n"
900 "\treceived data: \n",
901 c->sfd, item->key_size, item->key_prefix,
902 item->key_size - (int)KEY_PREFIX_SIZE,
903 orignkey, item->value_size, item->value_size, orignval);
904 fflush(stderr);
905 }
906 }
907 }
908 } /* ms_update_single_get_result */
909
910
911 /**
912 * after get the response from server for set the function
913 * update the state of the task and do data verify if necessary.
914 *
915 * @param c, pointer of the concurrency
916 * @param item, pointer of task item which includes the object
917 * information
918 */
ms_update_set_result(ms_conn_t * c,ms_task_item_t * item)919 static void ms_update_set_result(ms_conn_t *c, ms_task_item_t *item)
920 {
921 if ((c == NULL) || (item == NULL))
922 {
923 return;
924 }
925 assert(c != NULL);
926 assert(item != NULL);
927
928 if (c->precmd.cmd == CMD_SET)
929 {
930 switch (c->precmd.retstat)
931 {
932 case MCD_STORED:
933 if (item->value_offset == INVALID_OFFSET)
934 {
935 /* first set with the same offset of key suffix */
936 item->value_offset= item->key_suffix_offset;
937 }
938 else
939 {
940 /* not first set, just increase the value offset */
941 item->value_offset+= 1;
942 }
943
944 /* set successes, update counter */
945 c->set_cursor++;
946 c->curr_task.set_opt++;
947 c->curr_task.cycle_undo_set--;
948 break;
949
950 case MCD_SERVER_ERROR:
951 default:
952 break;
953 } /* switch */
954 }
955 } /* ms_update_set_result */
956
957
958 /**
959 * update the response time result
960 *
961 * @param c, pointer of the concurrency
962 */
ms_update_stat_result(ms_conn_t * c)963 static void ms_update_stat_result(ms_conn_t *c)
964 {
965 bool get_miss= false;
966
967 if (c == NULL)
968 {
969 return;
970 }
971 assert(c != NULL);
972
973 gettimeofday(&c->end_time, NULL);
974 uint64_t time_diff= (uint64_t)ms_time_diff(&c->start_time, &c->end_time);
975
976 pthread_mutex_lock(&ms_statistic.stat_mutex);
977
978 switch (c->precmd.cmd)
979 {
980 case CMD_SET:
981 ms_record_event(&ms_statistic.set_stat, time_diff, false);
982 break;
983
984 case CMD_GET:
985 if (c->curr_task.get_miss)
986 {
987 get_miss= true;
988 }
989 ms_record_event(&ms_statistic.get_stat, time_diff, get_miss);
990 break;
991
992 default:
993 break;
994 } /* switch */
995
996 ms_record_event(&ms_statistic.total_stat, time_diff, get_miss);
997 pthread_mutex_unlock(&ms_statistic.stat_mutex);
998 } /* ms_update_stat_result */
999
1000
1001 /**
1002 * after get response from server for the current operation, and
1003 * before doing the next operation, update the state of the
1004 * current operation.
1005 *
1006 * @param c, pointer of the concurrency
1007 */
ms_update_task_result(ms_conn_t * c)1008 static void ms_update_task_result(ms_conn_t *c)
1009 {
1010 ms_task_item_t *item;
1011
1012 if (c == NULL)
1013 {
1014 return;
1015 }
1016 assert(c != NULL);
1017
1018 item= ms_get_cur_opt_item(c);
1019 if (item == NULL)
1020 {
1021 return;
1022 }
1023 assert(item != NULL);
1024
1025 ms_update_set_result(c, item);
1026
1027 if ((ms_setting.stat_freq > 0)
1028 && ((c->precmd.cmd == CMD_SET) || (c->precmd.cmd == CMD_GET)))
1029 {
1030 ms_update_stat_result(c);
1031 }
1032
1033 /* update multi-get task item */
1034 if (((ms_setting.mult_key_num > 1)
1035 && (c->mlget_task.mlget_num >= ms_setting.mult_key_num))
1036 || ((c->remain_exec_num == 0) && (c->mlget_task.mlget_num > 0)))
1037 {
1038 ms_update_multi_get_result(c);
1039 }
1040 else
1041 {
1042 ms_update_single_get_result(c, item);
1043 }
1044 } /* ms_update_task_result */
1045
1046
1047 /**
1048 * run get and set operation
1049 *
1050 * @param c, pointer of the concurrency
1051 *
1052 * @return int, if success, return EXIT_SUCCESS, else return -1
1053 */
ms_run_getset_task(ms_conn_t * c)1054 static int ms_run_getset_task(ms_conn_t *c)
1055 {
1056 /**
1057 * extra one loop to get the last command return state. get the
1058 * last command return state.
1059 */
1060 if ((c->remain_exec_num >= 0)
1061 && (c->remain_exec_num != c->exec_num))
1062 {
1063 ms_update_task_result(c);
1064 }
1065
1066 /* multi-get */
1067 if (ms_setting.mult_key_num > 1)
1068 {
1069 /* operate next task item */
1070 ms_multi_getset_task_sch(c);
1071 }
1072 else
1073 {
1074 /* operate next task item */
1075 ms_single_getset_task_sch(c);
1076 }
1077
1078 /* no task to do, exit */
1079 if ((c->remain_exec_num == -1) || ms_global.time_out)
1080 {
1081 return -1;
1082 }
1083
1084 return EXIT_SUCCESS;
1085 } /* ms_run_getset_task */
1086
1087
1088 /**
1089 * the state machine call the function to execute task.
1090 *
1091 * @param c, pointer of the concurrency
1092 *
1093 * @return int, if success, return EXIT_SUCCESS, else return -1
1094 */
ms_exec_task(struct conn * c)1095 int ms_exec_task(struct conn *c)
1096 {
1097 if (! ms_global.finish_warmup)
1098 {
1099 ms_warmup_server(c);
1100 }
1101 else
1102 {
1103 if (ms_run_getset_task(c) != 0)
1104 {
1105 return -1;
1106 }
1107 }
1108
1109 return EXIT_SUCCESS;
1110 } /* ms_exec_task */
1111