1 
2 // vim:sw=2:ai
3 
4 #include <signal.h>
5 #include <sys/time.h>
6 #include <stdio.h>
7 #include <string.h>
8 #include <vector>
9 #include <map>
10 #include <stdlib.h>
11 #include <memory>
12 #include <errno.h>
13 #include <mysql.h>
14 #include <time.h>
15 #include <sys/types.h>
16 #include <sys/stat.h>
17 #include <fcntl.h>
18 
19 #include "util.hpp"
20 #include "auto_ptrcontainer.hpp"
21 #include "socket.hpp"
22 #include "hstcpcli.hpp"
23 #include "string_util.hpp"
24 #include "mutex.hpp"
25 
26 namespace dena {
27 
28 struct auto_mysql : private noncopyable {
auto_mysqldena::auto_mysql29   auto_mysql() : db(0) {
30     reset();
31   }
~auto_mysqldena::auto_mysql32   ~auto_mysql() {
33     if (db) {
34       mysql_close(db);
35     }
36   }
resetdena::auto_mysql37   void reset() {
38     if (db) {
39       mysql_close(db);
40     }
41     if ((db = mysql_init(0)) == 0) {
42       fatal_exit("failed to initialize mysql client");
43     }
44   }
operator MYSQL*dena::auto_mysql45   operator MYSQL *() const { return db; }
46  private:
47   MYSQL *db;
48 };
49 
50 struct auto_mysql_res : private noncopyable {
auto_mysql_resdena::auto_mysql_res51   auto_mysql_res(MYSQL *db) {
52     res = mysql_store_result(db);
53   }
~auto_mysql_resdena::auto_mysql_res54   ~auto_mysql_res() {
55     if (res) {
56       mysql_free_result(res);
57     }
58   }
operator MYSQL_RES*dena::auto_mysql_res59   operator MYSQL_RES *() const { return res; }
60  private:
61   MYSQL_RES *res;
62 };
63 
64 struct auto_mysql_stmt : private noncopyable {
auto_mysql_stmtdena::auto_mysql_stmt65   auto_mysql_stmt(MYSQL *db) {
66     stmt = mysql_stmt_init(db);
67   }
~auto_mysql_stmtdena::auto_mysql_stmt68   ~auto_mysql_stmt() {
69     if (stmt) {
70       mysql_stmt_close(stmt);
71     }
72   }
operator MYSQL_STMT*dena::auto_mysql_stmt73   operator MYSQL_STMT *() const { return stmt; }
74  private:
75   MYSQL_STMT *stmt;
76 };
77 
78 double
gettimeofday_double()79 gettimeofday_double()
80 {
81   struct timeval tv = { };
82   if (gettimeofday(&tv, 0) != 0) {
83     fatal_abort("gettimeofday");
84   }
85   return static_cast<double>(tv.tv_usec) / 1000000 + tv.tv_sec;
86 }
87 
88 struct record_value {
89   mutex lock;
90   bool deleted;
91   bool unknown_state;
92   std::string key;
93   std::vector<std::string> values;
record_valuedena::record_value94   record_value() : deleted(true), unknown_state(false) { }
95 };
96 
97 struct hs_longrun_shared {
98   config conf;
99   socket_args arg;
100   int verbose;
101   long num_threads;
102   int usleep;
103   volatile mutable int running;
104   auto_ptrcontainer< std::vector<record_value *> > records;
hs_longrun_shareddena::hs_longrun_shared105   hs_longrun_shared() : verbose(0), num_threads(0), usleep(0), running(1) { }
106 };
107 
108 struct thread_base {
thread_basedena::thread_base109   thread_base() : need_join(false), stack_size(256 * 1024) { }
~thread_basedena::thread_base110   virtual ~thread_base() {
111     join();
112   }
113   virtual void run() = 0;
startdena::thread_base114   void start() {
115     if (!start_nothrow()) {
116       fatal_abort("thread::start");
117     }
118   }
start_nothrowdena::thread_base119   bool start_nothrow() {
120     if (need_join) {
121       return need_join; /* true */
122     }
123     void *const arg = this;
124     pthread_attr_t attr;
125     if (pthread_attr_init(&attr) != 0) {
126       fatal_abort("pthread_attr_init");
127     }
128     if (pthread_attr_setstacksize(&attr, stack_size) != 0) {
129       fatal_abort("pthread_attr_setstacksize");
130     }
131     const int r = pthread_create(&thr, &attr, thread_main, arg);
132     if (pthread_attr_destroy(&attr) != 0) {
133       fatal_abort("pthread_attr_destroy");
134     }
135     if (r != 0) {
136       return need_join; /* false */
137     }
138     need_join = true;
139     return need_join; /* true */
140   }
joindena::thread_base141   void join() {
142     if (!need_join) {
143       return;
144     }
145     int e = 0;
146     if ((e = pthread_join(thr, 0)) != 0) {
147       fatal_abort("pthread_join");
148     }
149     need_join = false;
150   }
151  private:
thread_maindena::thread_base152   static void *thread_main(void *arg) {
153     thread_base *p = static_cast<thread_base *>(arg);
154     p->run();
155     return 0;
156   }
157  private:
158   pthread_t thr;
159   bool need_join;
160   size_t stack_size;
161 };
162 
163 struct hs_longrun_stat {
164   unsigned long long verify_error_count;
165   unsigned long long runtime_error_count;
166   unsigned long long unknown_count;
167   unsigned long long success_count;
hs_longrun_statdena::hs_longrun_stat168   hs_longrun_stat()
169     : verify_error_count(0), runtime_error_count(0),
170       unknown_count(0), success_count(0) { }
adddena::hs_longrun_stat171   void add(const hs_longrun_stat& x) {
172     verify_error_count += x.verify_error_count;
173     runtime_error_count += x.runtime_error_count;
174     unknown_count += x.unknown_count;
175     success_count += x.success_count;
176   }
177 };
178 
179 struct hs_longrun_thread_base : public thread_base {
180   struct arg_type {
181     int id;
182     std::string worker_type;
183     char op;
184     int lock_flag;
185     const hs_longrun_shared& sh;
arg_typedena::hs_longrun_thread_base::arg_type186     arg_type(int id, const std::string& worker_type, char op, int lock_flag,
187       const hs_longrun_shared& sh)
188       : id(id), worker_type(worker_type), op(op), lock_flag(lock_flag),
189 	sh(sh) { }
190   };
191   arg_type arg;
192   hs_longrun_stat stat;
193   drand48_data randbuf;
194   unsigned int seed;
hs_longrun_thread_basedena::hs_longrun_thread_base195   hs_longrun_thread_base(const arg_type& arg)
196     : arg(arg), seed(0) {
197     seed = time(0) + arg.id + 1;
198     srand48_r(seed, &randbuf);
199   }
~hs_longrun_thread_basedena::hs_longrun_thread_base200   virtual ~hs_longrun_thread_base() { }
201   virtual void run() = 0;
rand_recorddena::hs_longrun_thread_base202   size_t rand_record() {
203     double v = 0;
204     drand48_r(&randbuf, &v);
205     const size_t sz = arg.sh.records.size();
206     size_t r = size_t(v * sz);
207     if (r >= sz) {
208       r = 0;
209     }
210     return r;
211   }
212   int verify_update(const std::string& k, const std::string& v1,
213     const std::string& v2, const std::string& v3, record_value& rec,
214     uint32_t num_rows, bool cur_unknown_state);
215   int verify_read(const std::string& k, uint32_t num_rows, uint32_t num_flds,
216     const std::string rrec[4], record_value& rec);
217   int verify_readnolock(const std::string& k, uint32_t num_rows,
218     uint32_t num_flds, const std::string rrec[4]);
219 };
220 
221 int
verify_update(const std::string & k,const std::string & v1,const std::string & v2,const std::string & v3,record_value & rec,uint32_t num_rows,bool cur_unknown_state)222 hs_longrun_thread_base::verify_update(const std::string& k,
223   const std::string& v1, const std::string& v2, const std::string& v3,
224   record_value& rec, uint32_t num_rows, bool cur_unknown_state)
225 {
226   const bool op_success = num_rows == 1;
227   int ret = 0;
228   if (!rec.unknown_state) {
229     if (!rec.deleted && !op_success) {
230       ++stat.verify_error_count;
231       if (arg.sh.verbose > 0) {
232 	fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s "
233 	  "unexpected_update_failure\n",
234 	  arg.worker_type.c_str(), arg.id, k.c_str());
235       }
236       ret = 1;
237     } else if (rec.deleted && op_success) {
238       ++stat.verify_error_count;
239       if (arg.sh.verbose > 0) {
240 	fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s "
241 	  "unexpected_update_success\n",
242 	  arg.worker_type.c_str(), arg.id, k.c_str());
243       }
244       ret = 1;
245     }
246   }
247   if (op_success) {
248     rec.values.resize(4);
249     rec.values[0] = k;
250     rec.values[1] = v1;
251     rec.values[2] = v2;
252     rec.values[3] = v3;
253     if (ret == 0 && !rec.unknown_state) {
254       ++stat.success_count;
255     }
256   }
257   rec.unknown_state = cur_unknown_state;
258   if (arg.sh.verbose >= 100 && ret == 0) {
259     fprintf(stderr, "%s %s %s %s %s\n", arg.worker_type.c_str(),
260       k.c_str(), v1.c_str(), v2.c_str(), v3.c_str());
261   }
262   return ret;
263 }
264 
265 int
verify_read(const std::string & k,uint32_t num_rows,uint32_t num_flds,const std::string rrec[4],record_value & rec)266 hs_longrun_thread_base::verify_read(const std::string& k,
267   uint32_t num_rows, uint32_t num_flds, const std::string rrec[4],
268   record_value& rec)
269 {
270   const bool op_success = num_rows != 0;
271   int ret = 0;
272   if (!rec.unknown_state) {
273     if (!rec.deleted && !op_success) {
274       ++stat.verify_error_count;
275       if (arg.sh.verbose > 0) {
276 	fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s "
277 	  "unexpected_read_failure\n",
278 	  arg.worker_type.c_str(), arg.id, k.c_str());
279       }
280       ret = 1;
281     } else if (rec.deleted && op_success) {
282       ++stat.verify_error_count;
283       if (arg.sh.verbose > 0) {
284 	fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s "
285 	  "unexpected_read_success\n",
286 	  arg.worker_type.c_str(), arg.id, k.c_str());
287       }
288       ret = 1;
289     } else if (num_flds != 4) {
290       ++stat.verify_error_count;
291       if (arg.sh.verbose > 0) {
292 	fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s "
293 	  "unexpected_read_fldnum %d\n",
294 	  arg.worker_type.c_str(), arg.id, k.c_str(),
295 	  static_cast<int>(num_flds));
296       }
297       ret = 1;
298     } else if (rec.deleted) {
299       /* nothing to verify */
300     } else {
301       int diff = 0;
302       for (size_t i = 0; i < 4; ++i) {
303 	if (rec.values[i] == rrec[i]) {
304 	  /* ok */
305 	} else {
306 	  diff = 1;
307 	}
308       }
309       if (diff) {
310 	std::string mess;
311 	for (size_t i = 0; i < 4; ++i) {
312 	  const std::string& expected = rec.values[i];
313 	  const std::string& val = rrec[i];
314 	  mess += " " + val + "/" + expected;
315 	}
316 	if (arg.sh.verbose > 0) {
317 	  fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s "
318 	    "unexpected_read_value %s\n",
319 	    arg.worker_type.c_str(), arg.id, k.c_str(), mess.c_str());
320 	}
321 	ret = 1;
322       }
323     }
324   }
325   if (arg.sh.verbose >= 100 && ret == 0) {
326     fprintf(stderr, "%s %s\n", arg.worker_type.c_str(), k.c_str());
327   }
328   if (ret == 0 && !rec.unknown_state) {
329     ++stat.success_count;
330   }
331   return ret;
332 }
333 
334 int
verify_readnolock(const std::string & k,uint32_t num_rows,uint32_t num_flds,const std::string rrec[4])335 hs_longrun_thread_base::verify_readnolock(const std::string& k,
336   uint32_t num_rows, uint32_t num_flds, const std::string rrec[4])
337 {
338   int ret = 0;
339   if (num_rows != 1 || num_flds != 4) {
340     ++stat.verify_error_count;
341     if (arg.sh.verbose > 0) {
342       fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s "
343 	"unexpected_read_failure\n",
344 	arg.worker_type.c_str(), arg.id, k.c_str());
345     }
346     ret = 1;
347   }
348   if (arg.sh.verbose >= 100 && ret == 0) {
349     fprintf(stderr, "%s -> %s %s %s %s %s\n", arg.worker_type.c_str(),
350       k.c_str(), rrec[0].c_str(), rrec[1].c_str(), rrec[2].c_str(),
351       rrec[3].c_str());
352   }
353   if (ret == 0) {
354     ++stat.success_count;
355   }
356   return ret;
357 }
358 
359 struct hs_longrun_thread_hs : public hs_longrun_thread_base {
hs_longrun_thread_hsdena::hs_longrun_thread_hs360   hs_longrun_thread_hs(const arg_type& arg)
361     : hs_longrun_thread_base(arg) { }
362   void run();
363   int check_hs_error(const char *mess, record_value *rec);
364   int op_insert(record_value& rec);
365   int op_delete(record_value& rec);
366   int op_update(record_value& rec);
367   int op_read(record_value& rec);
368   int op_readnolock(int k);
369   hstcpcli_ptr cli;
370   socket_args sockargs;
371 };
372 
373 string_ref
to_string_ref(const std::string & s)374 to_string_ref(const std::string& s)
375 {
376   return string_ref(s.data(), s.size());
377 }
378 
379 std::string
to_string(const string_ref & s)380 to_string(const string_ref& s)
381 {
382   return std::string(s.begin(), s.size());
383 }
384 
385 void
run()386 hs_longrun_thread_hs::run()
387 {
388   config c = arg.sh.conf;
389   if (arg.op == 'R' || arg.op == 'N') {
390     c["port"] = to_stdstring(arg.sh.conf.get_int("hsport", 9998));
391   } else {
392     c["port"] = to_stdstring(arg.sh.conf.get_int("hsport_wr", 9999));
393   }
394   sockargs.set(c);
395 
396   while (arg.sh.running) {
397     if (cli.get() == 0 || !cli->stable_point()) {
398       cli = hstcpcli_i::create(sockargs);
399       if (check_hs_error("connect", 0) != 0) {
400 	cli.reset();
401 	continue;
402       }
403       cli->request_buf_open_index(0, "hstestdb", "hstesttbl", "PRIMARY",
404 	"k,v1,v2,v3", "k,v1,v2,v3");
405       cli->request_send();
406       if (check_hs_error("openindex_send", 0) != 0) {
407 	cli.reset();
408 	continue;
409       }
410       size_t num_flds = 0;
411       cli->response_recv(num_flds);
412       if (check_hs_error("openindex_recv", 0) != 0) {
413 	cli.reset();
414 	continue;
415       }
416       cli->response_buf_remove();
417     }
418     const size_t rec_id = rand_record();
419     if (arg.lock_flag) {
420       record_value& rec = *arg.sh.records[rec_id];
421       lock_guard g(rec.lock);
422       int e = 0;
423       switch (arg.op) {
424       case 'I':
425 	e = op_insert(rec);
426 	break;
427       case 'D':
428 	e = op_delete(rec);
429 	break;
430       case 'U':
431 	e = op_update(rec);
432 	break;
433       case 'R':
434 	e = op_read(rec);
435 	break;
436       default:
437 	break;
438       }
439     } else {
440       int e = 0;
441       switch (arg.op) {
442       case 'N':
443 	e = op_readnolock(rec_id);
444 	break;
445       default:
446 	break;
447       }
448     }
449   }
450 }
451 
452 int
op_insert(record_value & rec)453 hs_longrun_thread_hs::op_insert(record_value& rec)
454 {
455   const std::string k = rec.key;
456   const std::string v1 = "iv1_" + k + "_" + to_stdstring(arg.id);
457   const std::string v2 = "iv2_" + k + "_" + to_stdstring(arg.id);
458   const std::string v3 = "iv3_" + k + "_" + to_stdstring(arg.id);
459   const string_ref op_ref("+", 1);
460   const string_ref op_args[4] = {
461     to_string_ref(k),
462     to_string_ref(v1),
463     to_string_ref(v2),
464     to_string_ref(v3)
465   };
466   cli->request_buf_exec_generic(0, op_ref, op_args, 4, 1, 0,
467     string_ref(), 0, 0, 0, 0);
468   cli->request_send();
469   if (check_hs_error("op_insert_send", &rec) != 0) { return 1; }
470   size_t numflds = 0;
471   cli->response_recv(numflds);
472   if (arg.sh.verbose > 10) {
473     const string_ref *row = cli->get_next_row();
474     fprintf(stderr, "HS op=+ errrcode=%d errmess=[%s]\n", cli->get_error_code(),
475       row ? to_string(row[0]).c_str() : "");
476   }
477   const bool op_success = cli->get_error_code() == 0;
478   int ret = 0;
479   if (!rec.unknown_state) {
480     if (rec.deleted && !op_success) {
481       ++stat.verify_error_count;
482       if (arg.sh.verbose > 0) {
483 	fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s "
484 	  "unexpected_insert_failure\n",
485 	  arg.worker_type.c_str(), arg.id, k.c_str());
486       }
487       ret = 1;
488     } else if (!rec.deleted && op_success) {
489       ++stat.verify_error_count;
490       if (arg.sh.verbose > 0) {
491 	fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s "
492 	  "unexpected_insert_success\n",
493 	  arg.worker_type.c_str(), arg.id, k.c_str());
494       }
495       ret = 1;
496     }
497   } else {
498     ++stat.unknown_count;
499   }
500   if (op_success) {
501     rec.values.resize(4);
502     rec.values[0] = k;
503     rec.values[1] = v1;
504     rec.values[2] = v2;
505     rec.values[3] = v3;
506     rec.deleted = false;
507     if (arg.sh.verbose >= 100 && ret == 0) {
508       fprintf(stderr, "HS_INSERT %s %s %s %s\n", k.c_str(), v1.c_str(),
509 	v2.c_str(), v3.c_str());
510     }
511     if (ret == 0 && !rec.unknown_state) {
512       ++stat.success_count;
513     }
514     rec.unknown_state = false;
515   }
516   cli->response_buf_remove();
517   return ret;
518 }
519 
520 int
op_delete(record_value & rec)521 hs_longrun_thread_hs::op_delete(record_value& rec)
522 {
523   const std::string k = rec.key;
524   const string_ref op_ref("=", 1);
525   const string_ref op_args[1] = {
526     to_string_ref(k),
527   };
528   const string_ref modop_ref("D", 1);
529   cli->request_buf_exec_generic(0, op_ref, op_args, 1, 1, 0,
530     modop_ref, 0, 0, 0, 0);
531   cli->request_send();
532   if (check_hs_error("op_delete_send", &rec) != 0) { return 1; }
533   size_t numflds = 0;
534   cli->response_recv(numflds);
535   if (check_hs_error("op_delete_recv", &rec) != 0) { return 1; }
536   const string_ref *row = cli->get_next_row();
537   const bool op_success = (numflds > 0 && row != 0 &&
538     to_string(row[0]) == "1");
539   int ret = 0;
540   if (!rec.unknown_state) {
541     if (!rec.deleted && !op_success) {
542       ++stat.verify_error_count;
543       if (arg.sh.verbose > 0) {
544 	fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s "
545 	  "unexpected_delete_failure\n",
546 	  arg.worker_type.c_str(), arg.id, k.c_str());
547       }
548       ret = 1;
549     } else if (rec.deleted && op_success) {
550       ++stat.verify_error_count;
551       if (arg.sh.verbose > 0) {
552 	fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s "
553 	  "unexpected_delete_success\n",
554 	  arg.worker_type.c_str(), arg.id, k.c_str());
555       }
556       ret = 1;
557     }
558   }
559   cli->response_buf_remove();
560   if (op_success) {
561     rec.deleted = true;
562     if (ret == 0 && !rec.unknown_state) {
563       ++stat.success_count;
564     }
565     rec.unknown_state = false;
566   }
567   if (arg.sh.verbose >= 100 && ret == 0) {
568     fprintf(stderr, "HS_DELETE %s\n", k.c_str());
569   }
570   return ret;
571 }
572 
573 int
op_update(record_value & rec)574 hs_longrun_thread_hs::op_update(record_value& rec)
575 {
576   const std::string k = rec.key;
577   const std::string v1 = "uv1_" + k + "_" + to_stdstring(arg.id);
578   const std::string v2 = "uv2_" + k + "_" + to_stdstring(arg.id);
579   const std::string v3 = "uv3_" + k + "_" + to_stdstring(arg.id);
580   const string_ref op_ref("=", 1);
581   const string_ref op_args[1] = {
582     to_string_ref(k),
583   };
584   const string_ref modop_ref("U", 1);
585   const string_ref modop_args[4] = {
586     to_string_ref(k),
587     to_string_ref(v1),
588     to_string_ref(v2),
589     to_string_ref(v3)
590   };
591   cli->request_buf_exec_generic(0, op_ref, op_args, 1, 1, 0,
592     modop_ref, modop_args, 4, 0, 0);
593   cli->request_send();
594   if (check_hs_error("op_update_send", &rec) != 0) { return 1; }
595   size_t numflds = 0;
596   cli->response_recv(numflds);
597   if (check_hs_error("op_update_recv", &rec) != 0) { return 1; }
598   const string_ref *row = cli->get_next_row();
599   uint32_t num_rows = row
600     ? atoi_uint32_nocheck(row[0].begin(), row[0].end()) : 0;
601   cli->response_buf_remove();
602   const bool cur_unknown_state = (num_rows == 1);
603   return verify_update(k, v1, v2, v3, rec, num_rows, cur_unknown_state);
604 }
605 
606 int
op_read(record_value & rec)607 hs_longrun_thread_hs::op_read(record_value& rec)
608 {
609   const std::string k = rec.key;
610   const string_ref op_ref("=", 1);
611   const string_ref op_args[1] = {
612     to_string_ref(k),
613   };
614   cli->request_buf_exec_generic(0, op_ref, op_args, 1, 1, 0,
615     string_ref(), 0, 0, 0, 0);
616   cli->request_send();
617   if (check_hs_error("op_read_send", 0) != 0) { return 1; }
618   size_t num_flds = 0;
619   size_t num_rows = 0;
620   cli->response_recv(num_flds);
621   if (check_hs_error("op_read_recv", 0) != 0) { return 1; }
622   const string_ref *row = cli->get_next_row();
623   std::string rrec[4];
624   if (row != 0 && num_flds == 4) {
625     for (int i = 0; i < 4; ++i) {
626       rrec[i] = to_string(row[i]);
627     }
628     ++num_rows;
629   }
630   row = cli->get_next_row();
631   if (row != 0) {
632     ++num_rows;
633   }
634   cli->response_buf_remove();
635   return verify_read(k, num_rows, num_flds, rrec, rec);
636 }
637 
638 int
op_readnolock(int key)639 hs_longrun_thread_hs::op_readnolock(int key)
640 {
641   const std::string k = to_stdstring(key);
642   const string_ref op_ref("=", 1);
643   const string_ref op_args[1] = {
644     to_string_ref(k),
645   };
646   cli->request_buf_exec_generic(0, op_ref, op_args, 1, 1, 0,
647     string_ref(), 0, 0, 0, 0);
648   cli->request_send();
649   if (check_hs_error("op_read_send", 0) != 0) { return 1; }
650   size_t num_flds = 0;
651   size_t num_rows = 0;
652   cli->response_recv(num_flds);
653   if (check_hs_error("op_read_recv", 0) != 0) { return 1; }
654   const string_ref *row = cli->get_next_row();
655   std::string rrec[4];
656   if (row != 0 && num_flds == 4) {
657     for (int i = 0; i < 4; ++i) {
658       rrec[i] = to_string(row[i]);
659     }
660     ++num_rows;
661   }
662   row = cli->get_next_row();
663   if (row != 0) {
664     ++num_rows;
665   }
666   cli->response_buf_remove();
667   return verify_readnolock(k, num_rows, num_flds, rrec);
668 }
669 
670 int
check_hs_error(const char * mess,record_value * rec)671 hs_longrun_thread_hs::check_hs_error(const char *mess, record_value *rec)
672 {
673   const int err = cli->get_error_code();
674   if (err == 0) {
675     return 0;
676   }
677   ++stat.runtime_error_count;
678   if (arg.sh.verbose > 0) {
679     const std::string estr = cli->get_error();
680     fprintf(stderr, "RUNTIME_ERROR: op=%c wid=%d %s: %d %s\n",
681       arg.op, arg.id, mess, err, estr.c_str());
682   }
683   if (rec) {
684     rec->unknown_state = true;
685   }
686   return 1;
687 }
688 
689 struct hs_longrun_thread_my : public hs_longrun_thread_base {
hs_longrun_thread_mydena::hs_longrun_thread_my690   hs_longrun_thread_my(const arg_type& arg)
691     : hs_longrun_thread_base(arg), connected(false) { }
692   void run();
693   void show_mysql_error(const char *mess, record_value *rec);
694   int op_insert(record_value& rec);
695   int op_delete(record_value& rec);
696   int op_update(record_value& rec);
697   int op_delins(record_value& rec);
698   int op_read(record_value& rec);
699   auto_mysql db;
700   bool connected;
701 };
702 
703 void
run()704 hs_longrun_thread_my::run()
705 {
706   const std::string mysql_host = arg.sh.conf.get_str("host", "localhost");
707   const std::string mysql_user = arg.sh.conf.get_str("mysqluser", "root");
708   const std::string mysql_passwd = arg.sh.conf.get_str("mysqlpass", "");
709   const std::string mysql_dbname = "hstestdb";
710 
711   while (arg.sh.running) {
712     if (!connected) {
713       if (!mysql_real_connect(db, mysql_host.c_str(), mysql_user.c_str(),
714 	mysql_passwd.c_str(), mysql_dbname.c_str(), mysql_port, 0, 0)) {
715 	show_mysql_error("mysql_real_connect", 0);
716 	continue;
717       }
718     }
719     connected = true;
720     const size_t rec_id = rand_record();
721     record_value& rec = *arg.sh.records[rec_id];
722     lock_guard g(rec.lock);
723     int e = 0;
724     switch (arg.op) {
725     #if 0
726     case 'I':
727       e = op_insert(rec);
728       break;
729     case 'D':
730       e = op_delete(rec);
731       break;
732     case 'U':
733       e = op_update(rec);
734       break;
735     #endif
736     case 'T':
737       e = op_delins(rec);
738       break;
739     case 'R':
740       e = op_read(rec);
741       break;
742     default:
743       break;
744     }
745   }
746 }
747 
748 int
op_delins(record_value & rec)749 hs_longrun_thread_my::op_delins(record_value& rec)
750 {
751   const std::string k = rec.key;
752   const std::string v1 = "div1_" + k + "_" + to_stdstring(arg.id);
753   const std::string v2 = "div2_" + k + "_" + to_stdstring(arg.id);
754   const std::string v3 = "div3_" + k + "_" + to_stdstring(arg.id);
755   int success = 0;
756   bool cur_unknown_state = false;
757   do {
758     char query[1024];
759     #if 1
760     if (mysql_query(db, "begin") != 0) {
761       if (arg.sh.verbose >= 20) {
762 	fprintf(stderr, "mysql: e=[%s] q=[%s]\n", mysql_error(db), "begin");
763       }
764       break;
765     }
766     #endif
767     cur_unknown_state = true;
768     snprintf(query, 1024,
769       "delete from hstesttbl where k = '%s'", k.c_str());
770     if (mysql_query(db, query) != 0) {
771       if (arg.sh.verbose >= 20) {
772 	fprintf(stderr, "mysql: e=[%s] q=[%s]\n", mysql_error(db), query);
773       }
774       break;
775     }
776     if (mysql_affected_rows(db) != 1) {
777       if (arg.sh.verbose >= 20) {
778 	fprintf(stderr, "mysql: notfound: [%s]\n", query);
779       }
780       break;
781     }
782     snprintf(query, 1024,
783       "insert into hstesttbl values ('%s', '%s', '%s', '%s')",
784       k.c_str(), v1.c_str(), v2.c_str(), v3.c_str());
785     if (mysql_query(db, query) != 0) {
786       if (arg.sh.verbose >= 20) {
787 	fprintf(stderr, "mysql: e=[%s] q=[%s]\n", mysql_error(db), query);
788       }
789       break;
790     }
791     #if 1
792     if (mysql_query(db, "commit") != 0) {
793       if (arg.sh.verbose >= 20) {
794 	fprintf(stderr, "mysql: e=[%s] q=[%s]\n", mysql_error(db), "commit");
795       }
796       break;
797     }
798     #endif
799     success = true;
800     cur_unknown_state = false;
801   } while (false);
802   return verify_update(k, v1, v2, v3, rec, (success != 0), cur_unknown_state);
803 }
804 
805 int
op_read(record_value & rec)806 hs_longrun_thread_my::op_read(record_value& rec)
807 {
808   const std::string k = rec.key;
809   char query[1024] = { 0 };
810   const int len = snprintf(query, 1024,
811     "select k,v1,v2,v3 from hstesttbl where k='%s'", k.c_str());
812   const int r = mysql_real_query(db, query, len > 0 ? len : 0);
813   if (r != 0) {
814     show_mysql_error(query, 0);
815     return 1;
816   }
817   MYSQL_ROW row = 0;
818   unsigned long *lengths = 0;
819   unsigned int num_rows = 0;
820   unsigned int num_flds = 0;
821   auto_mysql_res res(db);
822   std::string rrec[4];
823   if (res != 0) {
824     num_flds = mysql_num_fields(res);
825     row = mysql_fetch_row(res);
826     if (row != 0) {
827       lengths = mysql_fetch_lengths(res);
828       if (num_flds == 4) {
829 	for (int i = 0; i < 4; ++i) {
830 	  rrec[i] = std::string(row[i], lengths[i]);
831 	}
832       }
833       ++num_rows;
834       row = mysql_fetch_row(res);
835       if (row != 0) {
836 	++num_rows;
837       }
838     }
839   }
840   return verify_read(k, num_rows, num_flds, rrec, rec);
841 }
842 
843 void
show_mysql_error(const char * mess,record_value * rec)844 hs_longrun_thread_my::show_mysql_error(const char *mess, record_value *rec)
845 {
846   ++stat.runtime_error_count;
847   if (arg.sh.verbose > 0) {
848     fprintf(stderr, "RUNTIME_ERROR: op=%c wid=%d [%s]: %s\n",
849       arg.op, arg.id, mess, mysql_error(db));
850   }
851   if (rec) {
852     rec->unknown_state = true;
853   }
854   db.reset();
855   connected = false;
856 }
857 
858 void
mysql_do(MYSQL * db,const char * query)859 mysql_do(MYSQL *db, const char *query)
860 {
861   if (mysql_real_query(db, query, strlen(query)) != 0) {
862     fprintf(stderr, "mysql: e=[%s] q=[%s]\n", mysql_error(db), query);
863     fatal_exit("mysql_do");
864   }
865 }
866 
867 void
hs_longrun_init_table(const config & conf,int num_prepare,hs_longrun_shared & shared)868 hs_longrun_init_table(const config& conf, int num_prepare,
869   hs_longrun_shared& shared)
870 {
871   const std::string mysql_host = conf.get_str("host", "localhost");
872   const std::string mysql_user = conf.get_str("mysqluser", "root");
873   const std::string mysql_passwd = conf.get_str("mysqlpass", "");
874   const std::string mysql_dbname = "";
875   auto_mysql db;
876   if (!mysql_real_connect(db, mysql_host.c_str(), mysql_user.c_str(),
877     mysql_passwd.c_str(), mysql_dbname.c_str(), mysql_port, 0, 0)) {
878     fprintf(stderr, "mysql: error=[%s]\n", mysql_error(db));
879     fatal_exit("hs_longrun_init_table");
880   }
881   mysql_do(db, "drop database if exists hstestdb");
882   mysql_do(db, "create database hstestdb");
883   mysql_do(db, "use hstestdb");
884   mysql_do(db,
885     "create table hstesttbl ("
886     "k int primary key,"
887     "v1 varchar(32) not null,"
888     "v2 varchar(32) not null,"
889     "v3 varchar(32) not null"
890     ") character set utf8 collate utf8_bin engine = innodb");
891   for (int i = 0; i < num_prepare; ++i) {
892     const std::string i_str = to_stdstring(i);
893     const std::string v1 = "pv1_" + i_str;
894     const std::string v2 = "pv2_" + i_str;
895     const std::string v3 = "pv3_" + i_str;
896     char buf[1024];
897     snprintf(buf, 1024, "insert into hstesttbl(k, v1, v2, v3) values"
898       "(%d, '%s', '%s', '%s')", i, v1.c_str(), v2.c_str(), v3.c_str());
899     mysql_do(db, buf);
900     record_value *rec = shared.records[i];
901     rec->key = i_str;
902     rec->values.resize(4);
903     rec->values[0] = i_str;
904     rec->values[1] = v1;
905     rec->values[2] = v2;
906     rec->values[3] = v3;
907     rec->deleted = false;
908   }
909 }
910 
911 int
hs_longrun_main(int argc,char ** argv)912 hs_longrun_main(int argc, char **argv)
913 {
914   hs_longrun_shared shared;
915   parse_args(argc, argv, shared.conf);
916   shared.conf["host"] = shared.conf.get_str("host", "localhost");
917   shared.verbose = shared.conf.get_int("verbose", 1);
918   const int table_size = shared.conf.get_int("table_size", 10000);
919   for (int i = 0; i < table_size; ++i) {
920     std::auto_ptr<record_value> rec(new record_value());
921     rec->key = to_stdstring(i);
922     shared.records.push_back_ptr(rec);
923   }
924   mysql_library_init(0, 0, 0);
925   const int duration = shared.conf.get_int("duration", 10);
926   const int num_hsinsert = shared.conf.get_int("num_hsinsert", 10);
927   const int num_hsdelete = shared.conf.get_int("num_hsdelete", 10);
928   const int num_hsupdate = shared.conf.get_int("num_hsupdate", 10);
929   const int num_hsread = shared.conf.get_int("num_hsread", 10);
930   const int num_myread = shared.conf.get_int("num_myread", 10);
931   const int num_mydelins = shared.conf.get_int("num_mydelins", 10);
932   int num_hsreadnolock = shared.conf.get_int("num_hsreadnolock", 10);
933   const bool always_filled = (num_hsinsert == 0 && num_hsdelete == 0);
934   if (!always_filled) {
935     num_hsreadnolock = 0;
936   }
937   hs_longrun_init_table(shared.conf, always_filled ? table_size : 0,
938     shared);
939   /* create worker threads */
940   static const struct thrtmpl_type {
941     const char *type; char op; int num; int hs; int lock;
942   } thrtmpl[] = {
943     { "hsinsert", 'I', num_hsinsert, 1, 1 },
944     { "hsdelete", 'D', num_hsdelete, 1, 1 },
945     { "hsupdate", 'U', num_hsupdate, 1, 1 },
946     { "hsread", 'R', num_hsread, 1, 1 },
947     { "hsreadnolock", 'N', num_hsreadnolock, 1, 0 },
948     { "myread", 'R', num_myread, 0, 1 },
949     { "mydelins", 'T', num_mydelins, 0, 1 },
950   };
951   typedef auto_ptrcontainer< std::vector<hs_longrun_thread_base *> > thrs_type;
952   thrs_type thrs;
953   for (size_t i = 0; i < sizeof(thrtmpl)/sizeof(thrtmpl[0]); ++i) {
954     const thrtmpl_type& e = thrtmpl[i];
955     for (int j = 0; j < e.num; ++j) {
956       int id = thrs.size();
957       const hs_longrun_thread_hs::arg_type arg(id, e.type, e.op, e.lock,
958 	shared);
959       std::auto_ptr<hs_longrun_thread_base> thr;
960       if (e.hs) {
961       	thr.reset(new hs_longrun_thread_hs(arg));
962       } else {
963 	thr.reset(new hs_longrun_thread_my(arg));
964       }
965       thrs.push_back_ptr(thr);
966     }
967   }
968   shared.num_threads = thrs.size();
969   /* start threads */
970   fprintf(stderr, "START\n");
971   shared.running = 1;
972   for (size_t i = 0; i < thrs.size(); ++i) {
973     thrs[i]->start();
974   }
975   /* wait */
976   sleep(duration);
977   /* stop thread */
978   shared.running = 0;
979   for (size_t i = 0; i < thrs.size(); ++i) {
980     thrs[i]->join();
981   }
982   fprintf(stderr, "DONE\n");
983   /* summary */
984   typedef std::map<std::string, hs_longrun_stat> stat_map;
985   stat_map sm;
986   for (size_t i = 0; i < thrs.size(); ++i) {
987     hs_longrun_thread_base *const thr = thrs[i];
988     const std::string wt = thr->arg.worker_type;
989     hs_longrun_stat& v = sm[wt];
990     v.add(thr->stat);
991   }
992   hs_longrun_stat total;
993   for (stat_map::const_iterator i = sm.begin(); i != sm.end(); ++i) {
994     if (i->second.verify_error_count != 0) {
995       fprintf(stderr, "%s verify_error %llu\n", i->first.c_str(),
996 	i->second.verify_error_count);
997     }
998     if (i->second.runtime_error_count) {
999       fprintf(stderr, "%s runtime_error %llu\n", i->first.c_str(),
1000 	i->second.runtime_error_count);
1001     }
1002     if (i->second.unknown_count) {
1003       fprintf(stderr, "%s unknown %llu\n", i->first.c_str(),
1004 	i->second.unknown_count);
1005     }
1006     fprintf(stderr, "%s success %llu\n", i->first.c_str(),
1007       i->second.success_count);
1008     total.add(i->second);
1009   }
1010   if (total.verify_error_count != 0) {
1011     fprintf(stderr, "TOTAL verify_error %llu\n", total.verify_error_count);
1012   }
1013   if (total.runtime_error_count != 0) {
1014     fprintf(stderr, "TOTAL runtime_error %llu\n", total.runtime_error_count);
1015   }
1016   if (total.unknown_count != 0) {
1017     fprintf(stderr, "TOTAL unknown %llu\n", total.unknown_count);
1018   }
1019   fprintf(stderr, "TOTAL success %llu\n", total.success_count);
1020   mysql_library_end();
1021   return 0;
1022 }
1023 
1024 };
1025 
1026 int
main(int argc,char ** argv)1027 main(int argc, char **argv)
1028 {
1029   return dena::hs_longrun_main(argc, argv);
1030 }
1031 
1032