1 
2 // vim:sw=2:ai
3 
4 #include <signal.h>
5 #include <sys/time.h>
6 #include <stdio.h>
7 #include <string.h>
8 #include <vector>
9 #include <map>
10 #include <stdlib.h>
11 #include <memory>
12 #include <errno.h>
13 #include <mysql.h>
14 #include <time.h>
15 #include <sys/types.h>
16 #include <sys/stat.h>
17 #include <fcntl.h>
18 
19 #include "util.hpp"
20 #include "auto_ptrcontainer.hpp"
21 #include "socket.hpp"
22 #include "hstcpcli.hpp"
23 #include "string_util.hpp"
24 #include "mutex.hpp"
25 
26 namespace dena {
27 
28 struct auto_mysql : private noncopyable {
auto_mysqldena::auto_mysql29   auto_mysql() : db(0) {
30     reset();
31   }
~auto_mysqldena::auto_mysql32   ~auto_mysql() {
33     if (db) {
34       mysql_close(db);
35     }
36   }
resetdena::auto_mysql37   void reset() {
38     if (db) {
39       mysql_close(db);
40     }
41     if ((db = mysql_init(0)) == 0) {
42       fatal_abort("failed to initialize mysql client");
43     }
44   }
operator MYSQL*dena::auto_mysql45   operator MYSQL *() const { return db; }
46  private:
47   MYSQL *db;
48 };
49 
50 struct auto_mysql_res : private noncopyable {
auto_mysql_resdena::auto_mysql_res51   auto_mysql_res(MYSQL *db) {
52     res = mysql_store_result(db);
53   }
~auto_mysql_resdena::auto_mysql_res54   ~auto_mysql_res() {
55     if (res) {
56       mysql_free_result(res);
57     }
58   }
operator MYSQL_RES*dena::auto_mysql_res59   operator MYSQL_RES *() const { return res; }
60  private:
61   MYSQL_RES *res;
62 };
63 
64 struct auto_mysql_stmt : private noncopyable {
auto_mysql_stmtdena::auto_mysql_stmt65   auto_mysql_stmt(MYSQL *db) {
66     stmt = mysql_stmt_init(db);
67   }
~auto_mysql_stmtdena::auto_mysql_stmt68   ~auto_mysql_stmt() {
69     if (stmt) {
70       mysql_stmt_close(stmt);
71     }
72   }
operator MYSQL_STMT*dena::auto_mysql_stmt73   operator MYSQL_STMT *() const { return stmt; }
74  private:
75   MYSQL_STMT *stmt;
76 };
77 
78 double
gettimeofday_double()79 gettimeofday_double()
80 {
81   struct timeval tv = { };
82   if (gettimeofday(&tv, 0) != 0) {
83     fatal_abort("gettimeofday");
84   }
85   return static_cast<double>(tv.tv_usec) / 1000000 + tv.tv_sec;
86 }
87 
88 struct record_value {
89   mutex lock;
90   bool deleted;
91   bool unknown_state;
92   std::string key;
93   std::vector<std::string> values;
record_valuedena::record_value94   record_value() : deleted(true), unknown_state(false) { }
95 };
96 
97 struct hs_longrun_shared {
98   config conf;
99   socket_args arg;
100   int verbose;
101   long num_threads;
102   int usleep;
103   volatile mutable int running;
104   auto_ptrcontainer< std::vector<record_value *> > records;
hs_longrun_shareddena::hs_longrun_shared105   hs_longrun_shared() : verbose(0), num_threads(0), usleep(0), running(1) { }
106 };
107 
108 struct thread_base {
thread_basedena::thread_base109   thread_base() : need_join(false), stack_size(256 * 1024) { }
~thread_basedena::thread_base110   virtual ~thread_base() {
111     join();
112   }
113   virtual void run() = 0;
startdena::thread_base114   void start() {
115     if (!start_nothrow()) {
116       fatal_abort("thread::start");
117     }
118   }
start_nothrowdena::thread_base119   bool start_nothrow() {
120     if (need_join) {
121       return need_join; /* true */
122     }
123     void *const arg = this;
124     pthread_attr_t attr;
125     if (pthread_attr_init(&attr) != 0) {
126       fatal_abort("pthread_attr_init");
127     }
128     if (pthread_attr_setstacksize(&attr, stack_size) != 0) {
129       fatal_abort("pthread_attr_setstacksize");
130     }
131     const int r = pthread_create(&thr, &attr, thread_main, arg);
132     if (pthread_attr_destroy(&attr) != 0) {
133       fatal_abort("pthread_attr_destroy");
134     }
135     if (r != 0) {
136       return need_join; /* false */
137     }
138     need_join = true;
139     return need_join; /* true */
140   }
joindena::thread_base141   void join() {
142     if (!need_join) {
143       return;
144     }
145     int e = 0;
146     if ((e = pthread_join(thr, 0)) != 0) {
147       fatal_abort("pthread_join");
148     }
149     need_join = false;
150   }
151  private:
thread_maindena::thread_base152   static void *thread_main(void *arg) {
153     thread_base *p = static_cast<thread_base *>(arg);
154     p->run();
155     return 0;
156   }
157  private:
158   pthread_t thr;
159   bool need_join;
160   size_t stack_size;
161 };
162 
/*
 * Per-worker (or aggregated) outcome counters. All counters start at zero.
 */
struct hs_longrun_stat {
  hs_longrun_stat()
    : verify_error_count(0),
      runtime_error_count(0),
      unknown_count(0),
      success_count(0)
  {
  }
  /* Accumulates every counter of x into this object. */
  void add(const hs_longrun_stat& x) {
    success_count += x.success_count;
    unknown_count += x.unknown_count;
    runtime_error_count += x.runtime_error_count;
    verify_error_count += x.verify_error_count;
  }
  /* results that contradicted the in-memory model */
  unsigned long long verify_error_count;
  /* transport or server errors reported while executing an operation */
  unsigned long long runtime_error_count;
  /* operations performed on a record whose state was unknown */
  unsigned long long unknown_count;
  /* operations whose result matched the model */
  unsigned long long success_count;
};
178 
179 struct hs_longrun_thread_base : public thread_base {
180   struct arg_type {
181     int id;
182     std::string worker_type;
183     char op;
184     int lock_flag;
185     const hs_longrun_shared& sh;
arg_typedena::hs_longrun_thread_base::arg_type186     arg_type(int id, const std::string& worker_type, char op, int lock_flag,
187       const hs_longrun_shared& sh)
188       : id(id), worker_type(worker_type), op(op), lock_flag(lock_flag),
189 	sh(sh) { }
190   };
191   arg_type arg;
192   hs_longrun_stat stat;
193   drand48_data randbuf;
194   unsigned int seed;
hs_longrun_thread_basedena::hs_longrun_thread_base195   hs_longrun_thread_base(const arg_type& arg)
196     : arg(arg), seed(0) {
197     seed = time(0) + arg.id + 1;
198     srand48_r(seed, &randbuf);
199   }
~hs_longrun_thread_basedena::hs_longrun_thread_base200   virtual ~hs_longrun_thread_base() { }
201   virtual void run() = 0;
rand_recorddena::hs_longrun_thread_base202   size_t rand_record() {
203     double v = 0;
204     drand48_r(&randbuf, &v);
205     const size_t sz = arg.sh.records.size();
206     size_t r = size_t(v * sz);
207     if (r >= sz) {
208       r = 0;
209     }
210     return r;
211   }
212   int verify_update(const std::string& k, const std::string& v1,
213     const std::string& v2, const std::string& v3, record_value& rec,
214     uint32_t num_rows, bool cur_unknown_state);
215   int verify_read(const std::string& k, uint32_t num_rows, uint32_t num_flds,
216     const std::string rrec[4], record_value& rec);
217   int verify_readnolock(const std::string& k, uint32_t num_rows,
218     uint32_t num_flds, const std::string rrec[4]);
219 };
220 
221 int
verify_update(const std::string & k,const std::string & v1,const std::string & v2,const std::string & v3,record_value & rec,uint32_t num_rows,bool cur_unknown_state)222 hs_longrun_thread_base::verify_update(const std::string& k,
223   const std::string& v1, const std::string& v2, const std::string& v3,
224   record_value& rec, uint32_t num_rows, bool cur_unknown_state)
225 {
226   const bool op_success = num_rows == 1;
227   int ret = 0;
228   if (!rec.unknown_state) {
229     if (!rec.deleted && !op_success) {
230       ++stat.verify_error_count;
231       if (arg.sh.verbose > 0) {
232 	fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s "
233 	  "unexpected_update_failure\n",
234 	  arg.worker_type.c_str(), arg.id, k.c_str());
235       }
236       ret = 1;
237     } else if (rec.deleted && op_success) {
238       ++stat.verify_error_count;
239       if (arg.sh.verbose > 0) {
240 	fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s "
241 	  "unexpected_update_success\n",
242 	  arg.worker_type.c_str(), arg.id, k.c_str());
243       }
244       ret = 1;
245     }
246   }
247   if (op_success) {
248     rec.values.resize(4);
249     rec.values[0] = k;
250     rec.values[1] = v1;
251     rec.values[2] = v2;
252     rec.values[3] = v3;
253     if (ret == 0 && !rec.unknown_state) {
254       ++stat.success_count;
255     }
256   }
257   rec.unknown_state = cur_unknown_state;
258   if (arg.sh.verbose >= 100 && ret == 0) {
259     fprintf(stderr, "%s %s %s %s %s\n", arg.worker_type.c_str(),
260       k.c_str(), v1.c_str(), v2.c_str(), v3.c_str());
261   }
262   return ret;
263 }
264 
265 int
verify_read(const std::string & k,uint32_t num_rows,uint32_t num_flds,const std::string rrec[4],record_value & rec)266 hs_longrun_thread_base::verify_read(const std::string& k,
267   uint32_t num_rows, uint32_t num_flds, const std::string rrec[4],
268   record_value& rec)
269 {
270   const bool op_success = num_rows != 0;
271   int ret = 0;
272   if (!rec.unknown_state) {
273     if (!rec.deleted && !op_success) {
274       ++stat.verify_error_count;
275       if (arg.sh.verbose > 0) {
276 	fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s "
277 	  "unexpected_read_failure\n",
278 	  arg.worker_type.c_str(), arg.id, k.c_str());
279       }
280       ret = 1;
281     } else if (rec.deleted && op_success) {
282       ++stat.verify_error_count;
283       if (arg.sh.verbose > 0) {
284 	fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s "
285 	  "unexpected_read_success\n",
286 	  arg.worker_type.c_str(), arg.id, k.c_str());
287       }
288       ret = 1;
289     } else if (num_flds != 4) {
290       ++stat.verify_error_count;
291       if (arg.sh.verbose > 0) {
292 	fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s "
293 	  "unexpected_read_fldnum %d\n",
294 	  arg.worker_type.c_str(), arg.id, k.c_str(),
295 	  static_cast<int>(num_flds));
296       }
297       ret = 1;
298     } else if (rec.deleted) {
299       /* nothing to verify */
300     } else {
301       int diff = 0;
302       for (size_t i = 0; i < 4; ++i) {
303 	if (rec.values[i] == rrec[i]) {
304 	  /* ok */
305 	} else {
306 	  diff = 1;
307 	}
308       }
309       if (diff) {
310 	std::string mess;
311 	for (size_t i = 0; i < 4; ++i) {
312 	  const std::string& expected = rec.values[i];
313 	  const std::string& val = rrec[i];
314 	  mess += " " + val + "/" + expected;
315 	}
316 	if (arg.sh.verbose > 0) {
317 	  fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s "
318 	    "unexpected_read_value %s\n",
319 	    arg.worker_type.c_str(), arg.id, k.c_str(), mess.c_str());
320 	}
321 	ret = 1;
322       }
323     }
324   }
325   if (arg.sh.verbose >= 100 && ret == 0) {
326     fprintf(stderr, "%s %s\n", arg.worker_type.c_str(), k.c_str());
327   }
328   if (ret == 0 && !rec.unknown_state) {
329     ++stat.success_count;
330   }
331   return ret;
332 }
333 
334 int
verify_readnolock(const std::string & k,uint32_t num_rows,uint32_t num_flds,const std::string rrec[4])335 hs_longrun_thread_base::verify_readnolock(const std::string& k,
336   uint32_t num_rows, uint32_t num_flds, const std::string rrec[4])
337 {
338   int ret = 0;
339   if (num_rows != 1 || num_flds != 4) {
340     ++stat.verify_error_count;
341     if (arg.sh.verbose > 0) {
342       fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s "
343 	"unexpected_read_failure\n",
344 	arg.worker_type.c_str(), arg.id, k.c_str());
345     }
346     ret = 1;
347   }
348   if (arg.sh.verbose >= 100 && ret == 0) {
349     fprintf(stderr, "%s -> %s %s %s %s %s\n", arg.worker_type.c_str(),
350       k.c_str(), rrec[0].c_str(), rrec[1].c_str(), rrec[2].c_str(),
351       rrec[3].c_str());
352   }
353   if (ret == 0) {
354     ++stat.success_count;
355   }
356   return ret;
357 }
358 
359 struct hs_longrun_thread_hs : public hs_longrun_thread_base {
hs_longrun_thread_hsdena::hs_longrun_thread_hs360   hs_longrun_thread_hs(const arg_type& arg)
361     : hs_longrun_thread_base(arg) { }
362   void run();
363   int check_hs_error(const char *mess, record_value *rec);
364   int op_insert(record_value& rec);
365   int op_delete(record_value& rec);
366   int op_update(record_value& rec);
367   int op_read(record_value& rec);
368   int op_readnolock(int k);
369   hstcpcli_ptr cli;
370   socket_args sockargs;
371 };
372 
373 struct lock_guard : noncopyable {
lock_guarddena::lock_guard374   lock_guard(mutex& mtx) : mtx(mtx) {
375     mtx.lock();
376   }
~lock_guarddena::lock_guard377   ~lock_guard() {
378     mtx.unlock();
379   }
380   mutex& mtx;
381 };
382 
383 string_ref
to_string_ref(const std::string & s)384 to_string_ref(const std::string& s)
385 {
386   return string_ref(s.data(), s.size());
387 }
388 
389 std::string
to_string(const string_ref & s)390 to_string(const string_ref& s)
391 {
392   return std::string(s.begin(), s.size());
393 }
394 
395 void
run()396 hs_longrun_thread_hs::run()
397 {
398   config c = arg.sh.conf;
399   if (arg.op == 'R' || arg.op == 'N') {
400     c["port"] = to_stdstring(arg.sh.conf.get_int("hsport", 9998));
401   } else {
402     c["port"] = to_stdstring(arg.sh.conf.get_int("hsport_wr", 9999));
403   }
404   sockargs.set(c);
405 
406   while (arg.sh.running) {
407     if (cli.get() == 0 || !cli->stable_point()) {
408       cli = hstcpcli_i::create(sockargs);
409       if (check_hs_error("connect", 0) != 0) {
410 	cli.reset();
411 	continue;
412       }
413       cli->request_buf_open_index(0, "hstestdb", "hstesttbl", "PRIMARY",
414 	"k,v1,v2,v3", "k,v1,v2,v3");
415       cli->request_send();
416       if (check_hs_error("openindex_send", 0) != 0) {
417 	cli.reset();
418 	continue;
419       }
420       size_t num_flds = 0;
421       cli->response_recv(num_flds);
422       if (check_hs_error("openindex_recv", 0) != 0) {
423 	cli.reset();
424 	continue;
425       }
426       cli->response_buf_remove();
427     }
428     const size_t rec_id = rand_record();
429     if (arg.lock_flag) {
430       record_value& rec = *arg.sh.records[rec_id];
431       lock_guard g(rec.lock);
432       int e = 0;
433       switch (arg.op) {
434       case 'I':
435 	e = op_insert(rec);
436 	break;
437       case 'D':
438 	e = op_delete(rec);
439 	break;
440       case 'U':
441 	e = op_update(rec);
442 	break;
443       case 'R':
444 	e = op_read(rec);
445 	break;
446       default:
447 	break;
448       }
449     } else {
450       int e = 0;
451       switch (arg.op) {
452       case 'N':
453 	e = op_readnolock(rec_id);
454 	break;
455       default:
456 	break;
457       }
458     }
459   }
460 }
461 
462 int
op_insert(record_value & rec)463 hs_longrun_thread_hs::op_insert(record_value& rec)
464 {
465   const std::string k = rec.key;
466   const std::string v1 = "iv1_" + k + "_" + to_stdstring(arg.id);
467   const std::string v2 = "iv2_" + k + "_" + to_stdstring(arg.id);
468   const std::string v3 = "iv3_" + k + "_" + to_stdstring(arg.id);
469   const string_ref op_ref("+", 1);
470   const string_ref op_args[4] = {
471     to_string_ref(k),
472     to_string_ref(v1),
473     to_string_ref(v2),
474     to_string_ref(v3)
475   };
476   cli->request_buf_exec_generic(0, op_ref, op_args, 4, 1, 0,
477     string_ref(), 0, 0, 0, 0);
478   cli->request_send();
479   if (check_hs_error("op_insert_send", &rec) != 0) { return 1; }
480   size_t numflds = 0;
481   cli->response_recv(numflds);
482   if (arg.sh.verbose > 10) {
483     const string_ref *row = cli->get_next_row();
484     fprintf(stderr, "HS op=+ errrcode=%d errmess=[%s]\n", cli->get_error_code(),
485       row ? to_string(row[0]).c_str() : "");
486   }
487   const bool op_success = cli->get_error_code() == 0;
488   int ret = 0;
489   if (!rec.unknown_state) {
490     if (rec.deleted && !op_success) {
491       ++stat.verify_error_count;
492       if (arg.sh.verbose > 0) {
493 	fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s "
494 	  "unexpected_insert_failure\n",
495 	  arg.worker_type.c_str(), arg.id, k.c_str());
496       }
497       ret = 1;
498     } else if (!rec.deleted && op_success) {
499       ++stat.verify_error_count;
500       if (arg.sh.verbose > 0) {
501 	fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s "
502 	  "unexpected_insert_success\n",
503 	  arg.worker_type.c_str(), arg.id, k.c_str());
504       }
505       ret = 1;
506     }
507   } else {
508     ++stat.unknown_count;
509   }
510   if (op_success) {
511     rec.values.resize(4);
512     rec.values[0] = k;
513     rec.values[1] = v1;
514     rec.values[2] = v2;
515     rec.values[3] = v3;
516     rec.deleted = false;
517     if (arg.sh.verbose >= 100 && ret == 0) {
518       fprintf(stderr, "HS_INSERT %s %s %s %s\n", k.c_str(), v1.c_str(),
519 	v2.c_str(), v3.c_str());
520     }
521     if (ret == 0 && !rec.unknown_state) {
522       ++stat.success_count;
523     }
524     rec.unknown_state = false;
525   }
526   cli->response_buf_remove();
527   return ret;
528 }
529 
530 int
op_delete(record_value & rec)531 hs_longrun_thread_hs::op_delete(record_value& rec)
532 {
533   const std::string k = rec.key;
534   const string_ref op_ref("=", 1);
535   const string_ref op_args[1] = {
536     to_string_ref(k),
537   };
538   const string_ref modop_ref("D", 1);
539   cli->request_buf_exec_generic(0, op_ref, op_args, 1, 1, 0,
540     modop_ref, 0, 0, 0, 0);
541   cli->request_send();
542   if (check_hs_error("op_delete_send", &rec) != 0) { return 1; }
543   size_t numflds = 0;
544   cli->response_recv(numflds);
545   if (check_hs_error("op_delete_recv", &rec) != 0) { return 1; }
546   const string_ref *row = cli->get_next_row();
547   const bool op_success = (numflds > 0 && row != 0 &&
548     to_string(row[0]) == "1");
549   int ret = 0;
550   if (!rec.unknown_state) {
551     if (!rec.deleted && !op_success) {
552       ++stat.verify_error_count;
553       if (arg.sh.verbose > 0) {
554 	fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s "
555 	  "unexpected_delete_failure\n",
556 	  arg.worker_type.c_str(), arg.id, k.c_str());
557       }
558       ret = 1;
559     } else if (rec.deleted && op_success) {
560       ++stat.verify_error_count;
561       if (arg.sh.verbose > 0) {
562 	fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s "
563 	  "unexpected_delete_success\n",
564 	  arg.worker_type.c_str(), arg.id, k.c_str());
565       }
566       ret = 1;
567     }
568   }
569   cli->response_buf_remove();
570   if (op_success) {
571     rec.deleted = true;
572     if (ret == 0 && !rec.unknown_state) {
573       ++stat.success_count;
574     }
575     rec.unknown_state = false;
576   }
577   if (arg.sh.verbose >= 100 && ret == 0) {
578     fprintf(stderr, "HS_DELETE %s\n", k.c_str());
579   }
580   return ret;
581 }
582 
583 int
op_update(record_value & rec)584 hs_longrun_thread_hs::op_update(record_value& rec)
585 {
586   const std::string k = rec.key;
587   const std::string v1 = "uv1_" + k + "_" + to_stdstring(arg.id);
588   const std::string v2 = "uv2_" + k + "_" + to_stdstring(arg.id);
589   const std::string v3 = "uv3_" + k + "_" + to_stdstring(arg.id);
590   const string_ref op_ref("=", 1);
591   const string_ref op_args[1] = {
592     to_string_ref(k),
593   };
594   const string_ref modop_ref("U", 1);
595   const string_ref modop_args[4] = {
596     to_string_ref(k),
597     to_string_ref(v1),
598     to_string_ref(v2),
599     to_string_ref(v3)
600   };
601   cli->request_buf_exec_generic(0, op_ref, op_args, 1, 1, 0,
602     modop_ref, modop_args, 4, 0, 0);
603   cli->request_send();
604   if (check_hs_error("op_update_send", &rec) != 0) { return 1; }
605   size_t numflds = 0;
606   cli->response_recv(numflds);
607   if (check_hs_error("op_update_recv", &rec) != 0) { return 1; }
608   const string_ref *row = cli->get_next_row();
609   uint32_t num_rows = row
610     ? atoi_uint32_nocheck(row[0].begin(), row[0].end()) : 0;
611   cli->response_buf_remove();
612   const bool cur_unknown_state = (num_rows == 1);
613   return verify_update(k, v1, v2, v3, rec, num_rows, cur_unknown_state);
614 }
615 
616 int
op_read(record_value & rec)617 hs_longrun_thread_hs::op_read(record_value& rec)
618 {
619   const std::string k = rec.key;
620   const string_ref op_ref("=", 1);
621   const string_ref op_args[1] = {
622     to_string_ref(k),
623   };
624   cli->request_buf_exec_generic(0, op_ref, op_args, 1, 1, 0,
625     string_ref(), 0, 0, 0, 0);
626   cli->request_send();
627   if (check_hs_error("op_read_send", 0) != 0) { return 1; }
628   size_t num_flds = 0;
629   size_t num_rows = 0;
630   cli->response_recv(num_flds);
631   if (check_hs_error("op_read_recv", 0) != 0) { return 1; }
632   const string_ref *row = cli->get_next_row();
633   std::string rrec[4];
634   if (row != 0 && num_flds == 4) {
635     for (int i = 0; i < 4; ++i) {
636       rrec[i] = to_string(row[i]);
637     }
638     ++num_rows;
639   }
640   row = cli->get_next_row();
641   if (row != 0) {
642     ++num_rows;
643   }
644   cli->response_buf_remove();
645   return verify_read(k, num_rows, num_flds, rrec, rec);
646 }
647 
648 int
op_readnolock(int key)649 hs_longrun_thread_hs::op_readnolock(int key)
650 {
651   const std::string k = to_stdstring(key);
652   const string_ref op_ref("=", 1);
653   const string_ref op_args[1] = {
654     to_string_ref(k),
655   };
656   cli->request_buf_exec_generic(0, op_ref, op_args, 1, 1, 0,
657     string_ref(), 0, 0, 0, 0);
658   cli->request_send();
659   if (check_hs_error("op_read_send", 0) != 0) { return 1; }
660   size_t num_flds = 0;
661   size_t num_rows = 0;
662   cli->response_recv(num_flds);
663   if (check_hs_error("op_read_recv", 0) != 0) { return 1; }
664   const string_ref *row = cli->get_next_row();
665   std::string rrec[4];
666   if (row != 0 && num_flds == 4) {
667     for (int i = 0; i < 4; ++i) {
668       rrec[i] = to_string(row[i]);
669     }
670     ++num_rows;
671   }
672   row = cli->get_next_row();
673   if (row != 0) {
674     ++num_rows;
675   }
676   cli->response_buf_remove();
677   return verify_readnolock(k, num_rows, num_flds, rrec);
678 }
679 
680 int
check_hs_error(const char * mess,record_value * rec)681 hs_longrun_thread_hs::check_hs_error(const char *mess, record_value *rec)
682 {
683   const int err = cli->get_error_code();
684   if (err == 0) {
685     return 0;
686   }
687   ++stat.runtime_error_count;
688   if (arg.sh.verbose > 0) {
689     const std::string estr = cli->get_error();
690     fprintf(stderr, "RUNTIME_ERROR: op=%c wid=%d %s: %d %s\n",
691       arg.op, arg.id, mess, err, estr.c_str());
692   }
693   if (rec) {
694     rec->unknown_state = true;
695   }
696   return 1;
697 }
698 
699 struct hs_longrun_thread_my : public hs_longrun_thread_base {
hs_longrun_thread_mydena::hs_longrun_thread_my700   hs_longrun_thread_my(const arg_type& arg)
701     : hs_longrun_thread_base(arg), connected(false) { }
702   void run();
703   void show_mysql_error(const char *mess, record_value *rec);
704   int op_insert(record_value& rec);
705   int op_delete(record_value& rec);
706   int op_update(record_value& rec);
707   int op_delins(record_value& rec);
708   int op_read(record_value& rec);
709   auto_mysql db;
710   bool connected;
711 };
712 
713 void
run()714 hs_longrun_thread_my::run()
715 {
716   const std::string mysql_host = arg.sh.conf.get_str("host", "localhost");
717   const std::string mysql_user = arg.sh.conf.get_str("mysqluser", "root");
718   const std::string mysql_passwd = arg.sh.conf.get_str("mysqlpass", "");
719   const std::string mysql_dbname = "hstestdb";
720 
721   while (arg.sh.running) {
722     if (!connected) {
723       if (!mysql_real_connect(db, mysql_host.c_str(), mysql_user.c_str(),
724 	mysql_passwd.c_str(), mysql_dbname.c_str(), mysql_port, 0, 0)) {
725 	show_mysql_error("mysql_real_connect", 0);
726 	continue;
727       }
728     }
729     connected = true;
730     const size_t rec_id = rand_record();
731     record_value& rec = *arg.sh.records[rec_id];
732     lock_guard g(rec.lock);
733     int e = 0;
734     switch (arg.op) {
735     #if 0
736     case 'I':
737       e = op_insert(rec);
738       break;
739     case 'D':
740       e = op_delete(rec);
741       break;
742     case 'U':
743       e = op_update(rec);
744       break;
745     #endif
746     case 'T':
747       e = op_delins(rec);
748       break;
749     case 'R':
750       e = op_read(rec);
751       break;
752     default:
753       break;
754     }
755   }
756 }
757 
758 int
op_delins(record_value & rec)759 hs_longrun_thread_my::op_delins(record_value& rec)
760 {
761   const std::string k = rec.key;
762   const std::string v1 = "div1_" + k + "_" + to_stdstring(arg.id);
763   const std::string v2 = "div2_" + k + "_" + to_stdstring(arg.id);
764   const std::string v3 = "div3_" + k + "_" + to_stdstring(arg.id);
765   int success = 0;
766   bool cur_unknown_state = false;
767   do {
768     char query[1024];
769     #if 1
770     if (mysql_query(db, "begin") != 0) {
771       if (arg.sh.verbose >= 20) {
772 	fprintf(stderr, "mysql: e=[%s] q=[%s]\n", mysql_error(db), "begin");
773       }
774       break;
775     }
776     #endif
777     cur_unknown_state = true;
778     snprintf(query, 1024,
779       "delete from hstesttbl where k = '%s'", k.c_str());
780     if (mysql_query(db, query) != 0) {
781       if (arg.sh.verbose >= 20) {
782 	fprintf(stderr, "mysql: e=[%s] q=[%s]\n", mysql_error(db), query);
783       }
784       break;
785     }
786     if (mysql_affected_rows(db) != 1) {
787       if (arg.sh.verbose >= 20) {
788 	fprintf(stderr, "mysql: notfound: [%s]\n", query);
789       }
790       break;
791     }
792     snprintf(query, 1024,
793       "insert into hstesttbl values ('%s', '%s', '%s', '%s')",
794       k.c_str(), v1.c_str(), v2.c_str(), v3.c_str());
795     if (mysql_query(db, query) != 0) {
796       if (arg.sh.verbose >= 20) {
797 	fprintf(stderr, "mysql: e=[%s] q=[%s]\n", mysql_error(db), query);
798       }
799       break;
800     }
801     #if 1
802     if (mysql_query(db, "commit") != 0) {
803       if (arg.sh.verbose >= 20) {
804 	fprintf(stderr, "mysql: e=[%s] q=[%s]\n", mysql_error(db), "commit");
805       }
806       break;
807     }
808     #endif
809     success = true;
810     cur_unknown_state = false;
811   } while (false);
812   return verify_update(k, v1, v2, v3, rec, (success != 0), cur_unknown_state);
813 }
814 
815 int
op_read(record_value & rec)816 hs_longrun_thread_my::op_read(record_value& rec)
817 {
818   const std::string k = rec.key;
819   char query[1024] = { 0 };
820   const int len = snprintf(query, 1024,
821     "select k,v1,v2,v3 from hstesttbl where k='%s'", k.c_str());
822   const int r = mysql_real_query(db, query, len > 0 ? len : 0);
823   if (r != 0) {
824     show_mysql_error(query, 0);
825     return 1;
826   }
827   MYSQL_ROW row = 0;
828   unsigned long *lengths = 0;
829   unsigned int num_rows = 0;
830   unsigned int num_flds = 0;
831   auto_mysql_res res(db);
832   std::string rrec[4];
833   if (res != 0) {
834     num_flds = mysql_num_fields(res);
835     row = mysql_fetch_row(res);
836     if (row != 0) {
837       lengths = mysql_fetch_lengths(res);
838       if (num_flds == 4) {
839 	for (int i = 0; i < 4; ++i) {
840 	  rrec[i] = std::string(row[i], lengths[i]);
841 	}
842       }
843       ++num_rows;
844       row = mysql_fetch_row(res);
845       if (row != 0) {
846 	++num_rows;
847       }
848     }
849   }
850   return verify_read(k, num_rows, num_flds, rrec, rec);
851 }
852 
853 void
show_mysql_error(const char * mess,record_value * rec)854 hs_longrun_thread_my::show_mysql_error(const char *mess, record_value *rec)
855 {
856   ++stat.runtime_error_count;
857   if (arg.sh.verbose > 0) {
858     fprintf(stderr, "RUNTIME_ERROR: op=%c wid=%d [%s]: %s\n",
859       arg.op, arg.id, mess, mysql_error(db));
860   }
861   if (rec) {
862     rec->unknown_state = true;
863   }
864   db.reset();
865   connected = false;
866 }
867 
868 void
mysql_do(MYSQL * db,const char * query)869 mysql_do(MYSQL *db, const char *query)
870 {
871   if (mysql_real_query(db, query, strlen(query)) != 0) {
872     fprintf(stderr, "mysql: e=[%s] q=[%s]\n", mysql_error(db), query);
873     fatal_abort("mysql_do");
874   }
875 }
876 
877 void
hs_longrun_init_table(const config & conf,int num_prepare,hs_longrun_shared & shared)878 hs_longrun_init_table(const config& conf, int num_prepare,
879   hs_longrun_shared& shared)
880 {
881   const std::string mysql_host = conf.get_str("host", "localhost");
882   const std::string mysql_user = conf.get_str("mysqluser", "root");
883   const std::string mysql_passwd = conf.get_str("mysqlpass", "");
884   const std::string mysql_dbname = "";
885   auto_mysql db;
886   if (!mysql_real_connect(db, mysql_host.c_str(), mysql_user.c_str(),
887     mysql_passwd.c_str(), mysql_dbname.c_str(), mysql_port, 0, 0)) {
888     fprintf(stderr, "mysql: error=[%s]\n", mysql_error(db));
889     fatal_abort("hs_longrun_init_table");
890   }
891   mysql_do(db, "drop database if exists hstestdb");
892   mysql_do(db, "create database hstestdb");
893   mysql_do(db, "use hstestdb");
894   mysql_do(db,
895     "create table hstesttbl ("
896     "k int primary key,"
897     "v1 varchar(32) not null,"
898     "v2 varchar(32) not null,"
899     "v3 varchar(32) not null"
900     ") character set utf8 collate utf8_bin engine = innodb");
901   for (int i = 0; i < num_prepare; ++i) {
902     const std::string i_str = to_stdstring(i);
903     const std::string v1 = "pv1_" + i_str;
904     const std::string v2 = "pv2_" + i_str;
905     const std::string v3 = "pv3_" + i_str;
906     char buf[1024];
907     snprintf(buf, 1024, "insert into hstesttbl(k, v1, v2, v3) values"
908       "(%d, '%s', '%s', '%s')", i, v1.c_str(), v2.c_str(), v3.c_str());
909     mysql_do(db, buf);
910     record_value *rec = shared.records[i];
911     rec->key = i_str;
912     rec->values.resize(4);
913     rec->values[0] = i_str;
914     rec->values[1] = v1;
915     rec->values[2] = v2;
916     rec->values[3] = v3;
917     rec->deleted = false;
918   }
919 }
920 
921 int
hs_longrun_main(int argc,char ** argv)922 hs_longrun_main(int argc, char **argv)
923 {
924   hs_longrun_shared shared;
925   parse_args(argc, argv, shared.conf);
926   shared.conf["host"] = shared.conf.get_str("host", "localhost");
927   shared.verbose = shared.conf.get_int("verbose", 1);
928   const int table_size = shared.conf.get_int("table_size", 10000);
929   for (int i = 0; i < table_size; ++i) {
930     std::auto_ptr<record_value> rec(new record_value());
931     rec->key = to_stdstring(i);
932     shared.records.push_back_ptr(rec);
933   }
934   mysql_library_init(0, 0, 0);
935   const int duration = shared.conf.get_int("duration", 10);
936   const int num_hsinsert = shared.conf.get_int("num_hsinsert", 10);
937   const int num_hsdelete = shared.conf.get_int("num_hsdelete", 10);
938   const int num_hsupdate = shared.conf.get_int("num_hsupdate", 10);
939   const int num_hsread = shared.conf.get_int("num_hsread", 10);
940   const int num_myread = shared.conf.get_int("num_myread", 10);
941   const int num_mydelins = shared.conf.get_int("num_mydelins", 10);
942   int num_hsreadnolock = shared.conf.get_int("num_hsreadnolock", 10);
943   const bool always_filled = (num_hsinsert == 0 && num_hsdelete == 0);
944   if (!always_filled) {
945     num_hsreadnolock = 0;
946   }
947   hs_longrun_init_table(shared.conf, always_filled ? table_size : 0,
948     shared);
949   /* create worker threads */
950   static const struct thrtmpl_type {
951     const char *type; char op; int num; int hs; int lock;
952   } thrtmpl[] = {
953     { "hsinsert", 'I', num_hsinsert, 1, 1 },
954     { "hsdelete", 'D', num_hsdelete, 1, 1 },
955     { "hsupdate", 'U', num_hsupdate, 1, 1 },
956     { "hsread", 'R', num_hsread, 1, 1 },
957     { "hsreadnolock", 'N', num_hsreadnolock, 1, 0 },
958     { "myread", 'R', num_myread, 0, 1 },
959     { "mydelins", 'T', num_mydelins, 0, 1 },
960   };
961   typedef auto_ptrcontainer< std::vector<hs_longrun_thread_base *> > thrs_type;
962   thrs_type thrs;
963   for (size_t i = 0; i < sizeof(thrtmpl)/sizeof(thrtmpl[0]); ++i) {
964     const thrtmpl_type& e = thrtmpl[i];
965     for (int j = 0; j < e.num; ++j) {
966       int id = thrs.size();
967       const hs_longrun_thread_hs::arg_type arg(id, e.type, e.op, e.lock,
968 	shared);
969       std::auto_ptr<hs_longrun_thread_base> thr;
970       if (e.hs) {
971       	thr.reset(new hs_longrun_thread_hs(arg));
972       } else {
973 	thr.reset(new hs_longrun_thread_my(arg));
974       }
975       thrs.push_back_ptr(thr);
976     }
977   }
978   shared.num_threads = thrs.size();
979   /* start threads */
980   fprintf(stderr, "START\n");
981   shared.running = 1;
982   for (size_t i = 0; i < thrs.size(); ++i) {
983     thrs[i]->start();
984   }
985   /* wait */
986   sleep(duration);
987   /* stop thread */
988   shared.running = 0;
989   for (size_t i = 0; i < thrs.size(); ++i) {
990     thrs[i]->join();
991   }
992   fprintf(stderr, "DONE\n");
993   /* summary */
994   typedef std::map<std::string, hs_longrun_stat> stat_map;
995   stat_map sm;
996   for (size_t i = 0; i < thrs.size(); ++i) {
997     hs_longrun_thread_base *const thr = thrs[i];
998     const std::string wt = thr->arg.worker_type;
999     hs_longrun_stat& v = sm[wt];
1000     v.add(thr->stat);
1001   }
1002   hs_longrun_stat total;
1003   for (stat_map::const_iterator i = sm.begin(); i != sm.end(); ++i) {
1004     if (i->second.verify_error_count != 0) {
1005       fprintf(stderr, "%s verify_error %llu\n", i->first.c_str(),
1006 	i->second.verify_error_count);
1007     }
1008     if (i->second.runtime_error_count) {
1009       fprintf(stderr, "%s runtime_error %llu\n", i->first.c_str(),
1010 	i->second.runtime_error_count);
1011     }
1012     if (i->second.unknown_count) {
1013       fprintf(stderr, "%s unknown %llu\n", i->first.c_str(),
1014 	i->second.unknown_count);
1015     }
1016     fprintf(stderr, "%s success %llu\n", i->first.c_str(),
1017       i->second.success_count);
1018     total.add(i->second);
1019   }
1020   if (total.verify_error_count != 0) {
1021     fprintf(stderr, "TOTAL verify_error %llu\n", total.verify_error_count);
1022   }
1023   if (total.runtime_error_count != 0) {
1024     fprintf(stderr, "TOTAL runtime_error %llu\n", total.runtime_error_count);
1025   }
1026   if (total.unknown_count != 0) {
1027     fprintf(stderr, "TOTAL unknown %llu\n", total.unknown_count);
1028   }
1029   fprintf(stderr, "TOTAL success %llu\n", total.success_count);
1030   mysql_library_end();
1031   return 0;
1032 }
1033 
1034 };
1035 
1036 int
main(int argc,char ** argv)1037 main(int argc, char **argv)
1038 {
1039   return dena::hs_longrun_main(argc, argv);
1040 }
1041 
1042