1 
2 // vim:sw=2:ai
3 
4 #include <signal.h>
5 #include <sys/time.h>
6 #include <stdio.h>
7 #include <string.h>
8 #include <vector>
9 #include <stdlib.h>
10 #include <memory>
11 #include <errno.h>
12 #include <mysql.h>
13 #include <time.h>
14 #include <sys/types.h>
15 #include <sys/stat.h>
16 #include <fcntl.h>
17 
18 #include "util.hpp"
19 #include "auto_ptrcontainer.hpp"
20 #include "socket.hpp"
21 #include "thread.hpp"
22 #include "hstcpcli.hpp"
23 
#if !(__GNUC__ >= 4)
#include <bits/atomicity.h>
using namespace __gnu_cxx;
#endif

/*
 * Atomically adds `c` to `*valp` and returns the value `*valp` held
 * before the addition.
 *
 * GCC >= 4 uses the __sync_fetch_and_add builtin; older compilers fall back
 * to __exchange_and_add from <bits/atomicity.h>.
 */
long atomic_exchange_and_add(volatile long *valp, long c)
{
#if __GNUC__ >= 4
  const long previous = __sync_fetch_and_add(valp, c);
  return previous;
#else
  return __exchange_and_add((volatile _Atomic_word *)valp, c);
#endif
}
37 
38 namespace dena {
39 
40 struct auto_mysql : private noncopyable {
auto_mysqldena::auto_mysql41   auto_mysql() : db(0) {
42     reset();
43   }
~auto_mysqldena::auto_mysql44   ~auto_mysql() {
45     if (db) {
46       mysql_close(db);
47     }
48   }
resetdena::auto_mysql49   void reset() {
50     if (db) {
51       mysql_close(db);
52     }
53     if ((db = mysql_init(0)) == 0) {
54       fatal_abort("failed to initialize mysql client");
55     }
56   }
operator MYSQL*dena::auto_mysql57   operator MYSQL *() const { return db; }
58  private:
59   MYSQL *db;
60 };
61 
62 struct auto_mysql_res : private noncopyable {
auto_mysql_resdena::auto_mysql_res63   auto_mysql_res(MYSQL *db) {
64     res = mysql_store_result(db);
65   }
~auto_mysql_resdena::auto_mysql_res66   ~auto_mysql_res() {
67     if (res) {
68       mysql_free_result(res);
69     }
70   }
operator MYSQL_RES*dena::auto_mysql_res71   operator MYSQL_RES *() const { return res; }
72  private:
73   MYSQL_RES *res;
74 };
75 
76 struct auto_mysql_stmt : private noncopyable {
auto_mysql_stmtdena::auto_mysql_stmt77   auto_mysql_stmt(MYSQL *db) {
78     stmt = mysql_stmt_init(db);
79   }
~auto_mysql_stmtdena::auto_mysql_stmt80   ~auto_mysql_stmt() {
81     if (stmt) {
82       mysql_stmt_close(stmt);
83     }
84   }
operator MYSQL_STMT*dena::auto_mysql_stmt85   operator MYSQL_STMT *() const { return stmt; }
86  private:
87   MYSQL_STMT *stmt;
88 };
89 
90 namespace {
91 
92 double
gettimeofday_double()93 gettimeofday_double()
94 {
95   struct timeval tv;
96   if (gettimeofday(&tv, 0) != 0) {
97     fatal_abort("gettimeofday");
98   }
99   return static_cast<double>(tv.tv_usec) / 1000000 + tv.tv_sec;
100 }
101 
// unused
/*
 * Reads and discards data from `fd` until read() reports end-of-file (0)
 * or an error.
 *
 * The read() result is kept in ssize_t (its declared return type), and a
 * read interrupted by a signal (EINTR) is retried rather than being treated
 * as the end of the stream.
 */
void
wait_close(int fd)
{
  char buf[1024];
  while (true) {
    const ssize_t r = read(fd, buf, sizeof(buf));
    if (r > 0) {
      continue; // data discarded, keep draining
    }
    if (r < 0 && errno == EINTR) {
      continue; // interrupted before any data arrived: try again
    }
    break; // EOF or a real error
  }
}
114 
115 // unused
116 void
gentle_close(int fd)117 gentle_close(int fd)
118 {
119   int r = shutdown(fd, SHUT_WR);
120   if (r != 0) {
121     return;
122   }
123   wait_close(fd);
124 }
125 
126 };
127 
128 struct hstest_shared {
129   config conf;
130   socket_args arg;
131   int verbose;
132   size_t loop;
133   size_t pipe;
134   char op;
135   long num_threads;
136   mutable volatile long count;
137   mutable volatile long conn_count;
138   long wait_conn;
139   volatile char *keygen;
140   long keygen_size;
141   mutable volatile int enable_timing;
142   int usleep;
143   int dump;
hstest_shareddena::hstest_shared144   hstest_shared() : verbose(0), loop(0), pipe(0), op('G'), num_threads(0),
145     count(0), conn_count(0), wait_conn(0), keygen(0), keygen_size(0),
146     enable_timing(0), usleep(0), dump(0) { }
increment_countdena::hstest_shared147   void increment_count(unsigned int c = 1) const volatile {
148     atomic_exchange_and_add(&count, c);
149   }
increment_conndena::hstest_shared150   void increment_conn(unsigned int c) const volatile {
151     atomic_exchange_and_add(&conn_count, c);
152     while (wait_conn != 0 && conn_count < wait_conn) {
153       sleep(1);
154     }
155     // fprintf(stderr, "wait_conn=%ld done\n", wait_conn);
156   }
157 };
158 
159 struct hstest_thread {
160   struct arg_type {
161     size_t id;
162     const hstest_shared& sh;
163     bool watch_flag;
arg_typedena::hstest_thread::arg_type164     arg_type(size_t i, const hstest_shared& s, bool w)
165       : id(i), sh(s), watch_flag(w) { }
166   };
hstest_threaddena::hstest_thread167   hstest_thread(const arg_type& a) : arg(a), io_success_count(0),
168     op_success_count(0), response_min(99999), response_max(0),
169     response_sum(0), response_avg(0) { }
170   void operator ()();
171   void test_1();
172   void test_2_3(int test_num);
173   void test_4_5(int test_num);
174   void test_6(int test_num);
175   void test_7(int test_num);
176   void test_8(int test_num);
177   void test_9(int test_num);
178   void test_10(int test_num);
179   void test_11(int test_num);
180   void test_12(int test_num);
181   void test_21(int test_num);
182   void test_22(int test_num);
183   void test_watch();
184   void sleep_if();
185   void set_timing(double time_spent);
186   arg_type arg;
187   auto_file fd;
188   size_t io_success_count;
189   size_t op_success_count;
190   double response_min, response_max, response_sum, response_avg;
191 };
192 
193 void
test_1()194 hstest_thread::test_1()
195 {
196   char buf[1024];
197   unsigned int seed = arg.id;
198   seed ^= arg.sh.conf.get_int("seed_xor", 0);
199   std::string err;
200   if (socket_connect(fd, arg.sh.arg, err) != 0) {
201     fprintf(stderr, "connect: %d %s\n", errno, strerror(errno));
202     return;
203   }
204   const char op = arg.sh.op;
205   const int tablesize = arg.sh.conf.get_int("tablesize", 0);
206   for (size_t i = 0; i < arg.sh.loop; ++i) {
207     for (size_t j = 0; j < arg.sh.pipe; ++j) {
208       int k = 0, v = 0, len = 0;
209       if (op == 'G') {
210 	k = rand_r(&seed);
211 	v = rand_r(&seed); /* unused */
212 	if (tablesize != 0) {
213 	  k &= tablesize;
214 	}
215 	len = snprintf(buf, sizeof(buf), "%c\tk%d\n", op, k);
216       } else {
217 	k = rand_r(&seed);
218 	v = rand_r(&seed);
219 	if (tablesize != 0) {
220 	  k &= tablesize;
221 	}
222 	len = snprintf(buf, sizeof(buf), "%c\tk%d\tv%d\n", op, k, v);
223       }
224       const int wlen = write(fd.get(), buf, len);
225       if (wlen != len) {
226 	return;
227       }
228     }
229     size_t read_cnt = 0;
230     size_t read_pos = 0;
231     while (read_cnt < arg.sh.pipe) {
232       const int rlen = read(fd.get(), buf + read_pos, sizeof(buf) - read_pos);
233       if (rlen <= 0) {
234 	return;
235       }
236       read_pos += rlen;
237       while (true) {
238 	const char *const p = static_cast<const char *>(memchr(buf, '\n',
239 	  read_pos));
240 	if (p == 0) {
241 	  break;
242 	}
243 	++read_cnt;
244 	++io_success_count;
245 	arg.sh.increment_count();
246 	if (p != buf && buf[0] == '=') {
247 	  ++op_success_count;
248 	}
249 	const size_t rest_size = buf + read_pos - (p + 1);
250 	if (rest_size != 0) {
251 	  memmove(buf, p + 1, rest_size);
252 	}
253 	read_pos = rest_size;
254       }
255     }
256   }
257 }
258 
/*
 * Disabled workload: the whole body is compiled out by `#if 0`, so this
 * function currently does nothing and ignores `test_num`.
 *
 * The disabled code refers to micli_i, op_base_t and pstrarr_ptr, none of
 * which are declared in the visible part of this file.
 */
void
hstest_thread::test_2_3(int test_num)
{
#if 0
  char buf_k[128], buf_v[128];
  unsigned int seed = arg.id;
  op_base_t op = static_cast<op_base_t>(arg.sh.op);
  micli_ptr hnd;
  if (test_num == 2) {
    hnd = micli_i::create_remote(arg.sh.conf);
  } else if (test_num == 3) {
    // hnd = micli_i::create_inproc(arg.sh.localdb);
  }
  if (hnd.get() == 0) {
    return;
  }
  for (size_t i = 0; i < arg.sh.loop; ++i) {
    for (size_t j = 0; j < arg.sh.pipe; ++j) {
      int k = 0, v = 0, klen = 0, vlen = 0;
      k = rand_r(&seed);
      klen = snprintf(buf_k, sizeof(buf_k), "k%d", k);
      v = rand_r(&seed); /* unused */
      vlen = snprintf(buf_v, sizeof(buf_v), "v%d", v);
      string_ref arr[2];
      arr[0] = string_ref(buf_k, klen);
      arr[1] = string_ref(buf_v, vlen);
      pstrarr_ptr rec(arr, 2);
      if (hnd->execute(op, 0, 0, rec.get_const())) {
	++io_success_count;
	arg.sh.increment_count();
	const dataset& res = hnd->get_result_ref();
	if (res.size() == 1) {
	  ++op_success_count;
	}
      }
    }
  }
#endif
}
298 
/*
 * Disabled workload: the whole body is compiled out by `#if 0`, so this
 * function currently does nothing and ignores `test_num`.
 *
 * The disabled code refers to micli_i, op_base_t, pstrarr_ptr and
 * arg.sh.localdb; hstest_shared as defined in this file has no `localdb`
 * member.
 */
void
hstest_thread::test_4_5(int test_num)
{
#if 0
  char buf_k[128], buf_v[8192];
  memset(buf_v, ' ', sizeof(buf_v));
  unsigned int seed = arg.id;
  op_base_t op = static_cast<op_base_t>(arg.sh.op);
  micli_ptr hnd;
  if (test_num == 4) {
    hnd = micli_i::create_remote(arg.sh.conf);
  } else if (test_num == 5) {
    hnd = micli_i::create_inproc(arg.sh.localdb);
  }
  if (hnd.get() == 0) {
    return;
  }
  for (size_t i = 0; i < arg.sh.loop; ++i) {
    for (size_t j = 0; j < arg.sh.pipe; ++j) {
      int k = 0, klen = 0, vlen = 0;
      k = i & 0x0000ffffUL;
      if (k == 0) {
	fprintf(stderr, "k=0\n");
      }
      klen = snprintf(buf_k, sizeof(buf_k), "k%d", k);
      vlen = rand_r(&seed) % 8192;
      string_ref arr[2];
      arr[0] = string_ref(buf_k, klen);
      arr[1] = string_ref(buf_v, vlen);
      pstrarr_ptr rec(arr, 2);
      if (hnd->execute(op, 0, 0, rec.get_const())) {
	++io_success_count;
	const dataset& res = hnd->get_result_ref();
	if (res.size() == 1) {
	  ++op_success_count;
	}
      }
    }
  }
#endif
}
340 
341 void
test_6(int test_num)342 hstest_thread::test_6(int test_num)
343 {
344   int count = arg.sh.conf.get_int("count", 1);
345   auto_file fds[count];
346   for (int i = 0; i < count; ++i) {
347     const double t1 = gettimeofday_double();
348     std::string err;
349     if (socket_connect(fds[i], arg.sh.arg, err) != 0) {
350       fprintf(stderr, "id=%zu i=%d err=%s\n", arg.id, i, err.c_str());
351     }
352     const double t2 = gettimeofday_double();
353     if (t2 - t1 > 1) {
354       fprintf(stderr, "id=%zu i=%d time %f\n", arg.id, i, t2 - t1);
355     }
356   }
357 }
358 
359 void
test_7(int num)360 hstest_thread::test_7(int num)
361 {
362   /*
363     set foo 0 0 10
364     0123456789
365     STORED
366     get foo
367     VALUE foo 0 10
368     0123456789
369     END
370     get var
371     END
372    */
373   char buf[1024];
374   const int keep_connection = arg.sh.conf.get_int("keep_connection", 1);
375   unsigned int seed = arg.id;
376   seed ^= arg.sh.conf.get_int("seed_xor", 0);
377   const int tablesize = arg.sh.conf.get_int("tablesize", 0);
378   const char op = arg.sh.op;
379   for (size_t i = 0; i < arg.sh.loop; ++i) {
380     const double tm1 = gettimeofday_double();
381     std::string err;
382     if (fd.get() < 0 && socket_connect(fd, arg.sh.arg, err) != 0) {
383       fprintf(stderr, "connect: %d %s\n", errno, strerror(errno));
384       return;
385     }
386     for (size_t j = 0; j < arg.sh.pipe; ++j) {
387       int k = 0, v = 0, len = 0;
388       if (op == 'G') {
389 	k = rand_r(&seed);
390 	v = rand_r(&seed); /* unused */
391 	if (tablesize != 0) {
392 	  k &= tablesize;
393 	}
394 	len = snprintf(buf, sizeof(buf), "get k%d\r\n", k);
395       } else {
396 	k = rand_r(&seed);
397 	v = rand_r(&seed);
398 	if (tablesize != 0) {
399 	  k &= tablesize;
400 	}
401 	char vbuf[1024];
402 	int vlen = snprintf(vbuf, sizeof(vbuf),
403 	  "v%d"
404 	  // "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
405 	  // "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
406 	  // "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
407 	  // "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
408 	  // "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
409 	  , v);
410 	len = snprintf(buf, sizeof(buf), "set k%d 0 0 %d\r\n%s\r\n",
411 	  k, vlen, vbuf);
412       }
413       const int wlen = write(fd.get(), buf, len);
414       if (wlen != len) {
415 	return;
416       }
417     }
418     size_t read_cnt = 0;
419     size_t read_pos = 0;
420     bool read_response_done = false;
421     bool expect_value = false;
422     while (!read_response_done) {
423       const int rlen = read(fd.get(), buf + read_pos, sizeof(buf) - read_pos);
424       if (rlen <= 0) {
425 	return;
426       }
427       read_pos += rlen;
428       while (true) {
429 	const char *const p = static_cast<const char *>(memchr(buf, '\n',
430 	  read_pos));
431 	if (p == 0) {
432 	  break;
433 	}
434 	++read_cnt;
435 	if (expect_value) {
436 	  expect_value = false;
437 	} else if (p >= buf + 6 && memcmp(buf, "VALUE ", 6) == 0) {
438 	  expect_value = true;
439 	  ++op_success_count;
440 	} else {
441 	  if (p == buf + 7 && memcmp(buf, "STORED\r", 7) == 0) {
442 	    ++op_success_count;
443 	  }
444 	  read_response_done = true;
445 	}
446 	const size_t rest_size = buf + read_pos - (p + 1);
447 	if (rest_size != 0) {
448 	  memmove(buf, p + 1, rest_size);
449 	}
450 	read_pos = rest_size;
451       }
452       ++io_success_count;
453     }
454     arg.sh.increment_count();
455     if (!keep_connection) {
456       fd.close();
457     }
458     const double tm2 = gettimeofday_double();
459     set_timing(tm2 - tm1);
460     sleep_if();
461   }
462 }
463 
/* Key/value record; only referenced by the disabled (#if 0) body of
 * hstest_thread::test_8 in the visible part of this file. */
struct rec {
  std::string key;    // lookup key
  std::string value;  // payload stored under `key`
};
468 
/*
 * Disabled workload: the whole body is compiled out by `#if 0`, so this
 * function currently does nothing and ignores `test_num`.
 *
 * The disabled code inserts `rec` objects into a boost::multi_index
 * container; no boost header is included in the visible part of this file.
 */
void
hstest_thread::test_8(int test_num)
{
#if 0
  char buf_k[128], buf_v[128];
  unsigned int seed = arg.id;
  // op_base_t op = static_cast<op_base_t>(arg.sh.op);
  using namespace boost::multi_index;
  typedef member<rec, std::string, &rec::key> rec_get_key;
  typedef ordered_unique<rec_get_key> oui;
  typedef multi_index_container< rec, indexed_by<oui> > mic;
  #if 0
  typedef std::map<std::string, std::string> m_type;
  m_type m;
  #endif
  mic m;
  for (size_t i = 0; i < arg.sh.loop; ++i) {
    for (size_t j = 0; j < arg.sh.pipe; ++j) {
      int k = 0, v = 0, klen = 0, vlen = 0;
      k = rand_r(&seed);
      klen = snprintf(buf_k, sizeof(buf_k), "k%d", k);
      v = rand_r(&seed); /* unused */
      vlen = snprintf(buf_v, sizeof(buf_v), "v%d", v);
      const std::string ks(buf_k, klen);
      const std::string vs(buf_v, vlen);
      rec r;
      r.key = ks;
      r.value = vs;
      m.insert(r);
      // m.insert(std::make_pair(ks, vs));
      ++io_success_count;
      ++op_success_count;
      arg.sh.increment_count();
    }
  }
#endif
}
506 
507 struct mysqltest_thread_initobj : private noncopyable {
mysqltest_thread_initobjdena::mysqltest_thread_initobj508   mysqltest_thread_initobj() {
509     mysql_thread_init();
510   }
~mysqltest_thread_initobjdena::mysqltest_thread_initobj511   ~mysqltest_thread_initobj() {
512     mysql_thread_end();
513   }
514 };
515 
516 void
test_9(int test_num)517 hstest_thread::test_9(int test_num)
518 {
519   /* create table hstest
520    * ( k varchar(255) not null, v varchar(255) not null, primary key(k))
521    * engine = innodb; */
522   auto_mysql db;
523   // mysqltest_thread_initobj initobj;
524   std::string err;
525   const char op = arg.sh.op;
526   const std::string suffix = arg.sh.conf.get_str("value_suffix", "upd");
527   unsigned long long err_cnt = 0;
528   unsigned long long query_cnt = 0;
529   #if 0
530   my_bool reconnect = 0;
531   if (mysql_options(db, MYSQL_OPT_RECONNECT, &reconnect) != 0) {
532     err = "mysql_options() failed";
533     ++err_cnt;
534     return;
535   }
536   #endif
537   unsigned int seed = time(0) + arg.id + 1;
538   seed ^= arg.sh.conf.get_int("seed_xor", 0);
539   drand48_data randbuf;
540   srand48_r(seed, &randbuf);
541   const std::string mysql_host = arg.sh.conf.get_str("host", "localhost");
542   const int mysql_port = arg.sh.conf.get_int("mysqlport", 3306);
543   const int num = arg.sh.loop;
544   const std::string mysql_user = arg.sh.conf.get_str("mysqluser", "root");
545   const std::string mysql_passwd = arg.sh.conf.get_str("mysqlpass", "");
546   const std::string mysql_dbname = arg.sh.conf.get_str("dbname", "hstest");
547   const int keep_connection = arg.sh.conf.get_int("keep_connection", 1);
548   const int verbose = arg.sh.conf.get_int("verbose", 1);
549   const int tablesize = arg.sh.conf.get_int("tablesize", 10000);
550   const int moreflds = arg.sh.conf.get_int("moreflds", 0);
551   const std::string moreflds_prefix = arg.sh.conf.get_str(
552     "moreflds_prefix", "column0123456789_");
553   const int use_handler = arg.sh.conf.get_int("handler", 0);
554   const int sched_flag = arg.sh.conf.get_int("sched", 0);
555   const int use_in = arg.sh.conf.get_int("in", 0);
556   const int ssps = use_in ? 0 : arg.sh.conf.get_int("ssps", 0);
557   std::string flds = "v";
558   for (int i = 0; i < moreflds; ++i) {
559     char buf[1024];
560     snprintf(buf, sizeof(buf), ",%s%d", moreflds_prefix.c_str(), i);
561     flds += std::string(buf);
562   }
563   int connected = 0;
564   std::auto_ptr<auto_mysql_stmt> stmt;
565   string_buffer wbuf;
566   for (int i = 0; i < num; ++i) {
567     const double tm1 = gettimeofday_double();
568     const int flags = 0;
569     if (connected == 0) {
570       if (!mysql_real_connect(db, mysql_host.c_str(),
571 	mysql_user.c_str(), mysql_user.empty() ? 0 : mysql_passwd.c_str(),
572 	mysql_dbname.c_str(), mysql_port, 0, flags)) {
573 	err = "failed to connect: " + std::string(mysql_error(db));
574 	if (verbose >= 1) {
575 	  fprintf(stderr, "e=[%s]\n", err.c_str());
576 	}
577 	++err_cnt;
578 	return;
579       }
580       arg.sh.increment_conn(1);
581     }
582     int r = 0;
583     if (connected == 0 && use_handler) {
584       const char *const q = "handler hstest_table1 open";
585       r = mysql_real_query(db, q, strlen(q));
586       if (r != 0) {
587 	err = 1;
588       }
589     }
590     if (connected == 0 && ssps) {
591       stmt.reset(new auto_mysql_stmt(db));
592       const char *const q = "select v from hstest_table1 where k = ?";
593       r = mysql_stmt_prepare(*stmt, q, strlen(q));
594       if (r != 0) {
595 	fprintf(stderr, "ssps err\n");
596 	++err_cnt;
597 	return;
598       }
599     }
600     connected = 1;
601     std::string result_str;
602     unsigned int err = 0;
603     unsigned int num_flds = 0, num_affected_rows = 0;
604     int got_data = 0;
605     char buf_query[16384];
606     int buf_query_len = 0;
607     int k = 0, v = 0;
608     {
609       double kf = 0, vf = 0;
610       drand48_r(&randbuf, &kf);
611       drand48_r(&randbuf, &vf);
612       k = int(kf * tablesize);
613       v = int(vf * tablesize);
614       #if 0
615       k = rand_r(&seed);
616       v = rand_r(&seed);
617       if (tablesize != 0) {
618 	k %= tablesize;
619       }
620       #endif
621       if (op == 'G') {
622 	if (use_handler) {
623 	  buf_query_len = snprintf(buf_query, sizeof(buf_query),
624 	    "handler hstest_table1 read `primary` = ( '%d' )", k);
625 	    // TODO: moreflds
626 	} else if (ssps) {
627 	    //
628 	} else if (use_in) {
629 	  wbuf.clear();
630 	  char *p = wbuf.make_space(1024);
631 	  int len = snprintf(p, 1024, "select %s from hstest_table1 where k in ('%d'", flds.c_str(), k);
632 	  wbuf.space_wrote(len);
633 	  for (int j = 1; j < use_in; ++j) {
634 	    /* generate more key */
635 	    drand48_r(&randbuf, &kf);
636 	    k = int(kf * tablesize);
637 	    p = wbuf.make_space(1024);
638 	    int len = snprintf(p, 1024, ", '%d'", k);
639 	    wbuf.space_wrote(len);
640 	  }
641 	  wbuf.append_literal(")");
642 	} else {
643 	  buf_query_len = snprintf(buf_query, sizeof(buf_query),
644 	    "select %s from hstest_table1 where k = '%d'", flds.c_str(), k);
645 	}
646       } else if (op == 'U') {
647 	buf_query_len = snprintf(buf_query, sizeof(buf_query),
648 	  "update hstest_table1 set v = '%d_%d%s' where k = '%d'",
649 	    v, k, suffix.c_str(), k);
650       } else if (op == 'R') {
651 	buf_query_len = snprintf(buf_query, sizeof(buf_query),
652 	  "replace into hstest_table1 values ('%d', 'v%d')", k, v);
653 	  // TODO: moreflds
654       }
655     }
656     if (r == 0) {
657       if (ssps) {
658 	MYSQL_BIND bind[1] = { };
659 	bind[0].buffer_type = MYSQL_TYPE_LONG;
660 	bind[0].buffer = (char *)&k;
661 	bind[0].is_null = 0;
662 	bind[0].length = 0;
663 	if (mysql_stmt_bind_param(*stmt, bind)) {
664 	  fprintf(stderr, "err: %s\n", mysql_stmt_error(*stmt));
665 	  ++err_cnt;
666 	  return;
667 	}
668 	r = mysql_stmt_execute(*stmt);
669 	// fprintf(stderr, "stmt exec\n");
670       } else if (use_in) {
671 	r = mysql_real_query(db, wbuf.begin(), wbuf.size());
672       } else {
673 	r = mysql_real_query(db, buf_query, buf_query_len);
674 	// fprintf(stderr, "real query\n");
675       }
676       ++query_cnt;
677     }
678     if (r != 0) {
679       err = 1;
680     } else if (ssps) {
681       if (verbose >= 0) {
682 	char resbuf[1024];
683 	unsigned long res_len = 0;
684 	MYSQL_BIND bind[1] = { };
685 	bind[0].buffer_type = MYSQL_TYPE_STRING;
686 	bind[0].buffer = resbuf;
687 	bind[0].buffer_length = sizeof(resbuf);
688 	bind[0].length = &res_len;
689 	if (mysql_stmt_bind_result(*stmt, bind)) {
690 	  fprintf(stderr, "err: %s\n", mysql_stmt_error(*stmt));
691 	  ++err_cnt;
692 	  return;
693 	}
694 	if (mysql_stmt_fetch(*stmt)) {
695 	  fprintf(stderr, "err: %s\n", mysql_stmt_error(*stmt));
696 	  ++err_cnt;
697 	  return;
698 	}
699 	if (!result_str.empty()) {
700 	  result_str += " ";
701 	}
702 	result_str += std::string(resbuf, res_len);
703 	// fprintf(stderr, "SSPS RES: %s\n", result_str.c_str());
704 	got_data = 1;
705       } else {
706 	got_data = 1;
707       }
708     } else {
709       auto_mysql_res res(db);
710       if (res != 0) {
711 	if (verbose >= 0) {
712 	  num_flds = mysql_num_fields(res);
713 	  MYSQL_ROW row = 0;
714 	  while ((row = mysql_fetch_row(res)) != 0) {
715 	    got_data += 1;
716 	    unsigned long *const lengths = mysql_fetch_lengths(res);
717 	    if (verbose >= 2) {
718 	      for (unsigned int i = 0; i < num_flds; ++i) {
719 		if (!result_str.empty()) {
720 		  result_str += " ";
721 		}
722 		result_str += std::string(row[i], lengths[i]);
723 	      }
724 	    }
725 	  }
726 	} else {
727 	  MYSQL_ROW row = 0;
728 	  while ((row = mysql_fetch_row(res)) != 0) {
729 	    got_data += 1;
730 	  }
731 	}
732       } else {
733 	if (mysql_field_count(db) == 0) {
734 	  num_affected_rows = mysql_affected_rows(db);
735 	} else {
736 	  err = 1;
737 	}
738       }
739     }
740     if (verbose >= 2 || (verbose >= 1 && err != 0)) {
741       if (err) {
742 	++err_cnt;
743 	const char *const errstr = mysql_error(db);
744 	fprintf(stderr, "e=[%s] a=%u q=[%s]\n", errstr,
745 	  num_affected_rows, buf_query);
746       } else {
747 	fprintf(stderr, "a=%u q=[%s] r=[%s]\n", num_affected_rows, buf_query,
748 	  result_str.c_str());
749       }
750     }
751     if (err == 0) {
752       ++io_success_count;
753       if (num_affected_rows > 0 || got_data > 0) {
754 	op_success_count += got_data;
755       } else {
756 	if (verbose >= 1) {
757 	  fprintf(stderr, "k=%d numaff=%u gotdata=%d\n",
758 	    k, num_affected_rows, got_data);
759 	}
760       }
761       arg.sh.increment_count();
762     }
763     if (!keep_connection) {
764       if (stmt.get() != 0) {
765 	stmt.reset();
766       }
767       db.reset();
768       connected = 0;
769     }
770     const double tm2 = gettimeofday_double();
771     set_timing(tm2 - tm1);
772     sleep_if();
773     if (sched_flag) {
774       sched_yield();
775     }
776   }
777   if (verbose >= 1) {
778     fprintf(stderr, "thread finished (error_count=%llu)\n", err_cnt);
779   }
780 }
781 
782 void
test_10(int test_num)783 hstest_thread::test_10(int test_num)
784 {
785   const int keep_connection = arg.sh.conf.get_int("keep_connection", 1);
786   unsigned int seed = time(0) + arg.id + 1;
787   seed ^= arg.sh.conf.get_int("seed_xor", 0);
788   drand48_data randbuf;
789   srand48_r(seed, &randbuf);
790   std::string err;
791   int keepconn_count = 0;
792   const char op = arg.sh.op;
793   const int verbose = arg.sh.conf.get_int("verbose", 1);
794   const std::string suffix = arg.sh.conf.get_str("value_suffix", "upd");
795   const int tablesize = arg.sh.conf.get_int("tablesize", 10000);
796   const int firstkey = arg.sh.conf.get_int("firstkey", 0);
797   const int sched_flag = arg.sh.conf.get_int("sched", 0);
798   const int moreflds = arg.sh.conf.get_int("moreflds", 0);
799   const std::string dbname = arg.sh.conf.get_str("dbname", "hstest");
800   const std::string table = arg.sh.conf.get_str("table", "hstest_table1");
801   const std::string index = arg.sh.conf.get_str("index", "PRIMARY");
802   const std::string field = arg.sh.conf.get_str("field", "v");
803   const int use_in = arg.sh.conf.get_int("in", 0);
804   const std::string moreflds_prefix = arg.sh.conf.get_str(
805     "moreflds_prefix", "column0123456789_");
806   const int dump = arg.sh.dump;
807   const int nodup = arg.sh.conf.get_int("nodup", 0);
808   std::string moreflds_str;
809   for (int i = 0; i < moreflds; ++i) {
810     char sbuf[1024];
811     snprintf(sbuf, sizeof(sbuf), ",%s%d", moreflds_prefix.c_str(), i);
812     moreflds_str += std::string(sbuf);
813   }
814   string_buffer wbuf;
815   char rbuf[16384];
816   for (size_t i = 0; i < arg.sh.loop; ++i) {
817     int len = 0, rlen = 0, wlen = 0;
818     #if 0
819     const double tm1 = gettimeofday_double();
820     #endif
821     if (fd.get() < 0) {
822       if (socket_connect(fd, arg.sh.arg, err) != 0) {
823 	fprintf(stderr, "connect: %d %s\n", errno, strerror(errno));
824 	return;
825       }
826       char *wp = wbuf.make_space(1024);
827       len = snprintf(wp, 1024,
828 	"P\t1\t%s\t%s\tPRIMARY\t%s%s\n", dbname.c_str(), table.c_str(),
829 	  field.c_str(), moreflds_str.c_str());
830 	/* pst_num, db, table, index, retflds */
831       wbuf.space_wrote(len);
832       wlen = write(fd.get(), wbuf.begin(), len);
833       if (len != wlen) {
834 	fprintf(stderr, "write: %d %d\n", len, wlen);
835 	return;
836       }
837       wbuf.clear();
838       rlen = read(fd.get(), rbuf, sizeof(rbuf));
839       if (rlen <= 0 || rbuf[rlen - 1] != '\n') {
840 	fprintf(stderr, "read: rlen=%d errno=%d\n", rlen, errno);
841 	return;
842       }
843       if (rbuf[0] != '0') {
844 	fprintf(stderr, "failed to open table\n");
845 	return;
846       }
847       arg.sh.increment_conn(1);
848     }
849     const double tm1 = gettimeofday_double();
850     for (size_t j = 0; j < arg.sh.pipe; ++j) {
851       int k = 0, v = 0;
852       {
853 	while (true) {
854 	  double kf = 0, vf = 0;
855 	  drand48_r(&randbuf, &kf);
856 	  drand48_r(&randbuf, &vf);
857 	  k = int(kf * tablesize) + firstkey;
858 	  v = int(vf * tablesize) + firstkey;
859 	  if (k - firstkey < arg.sh.keygen_size) {
860 	    volatile char *const ptr = arg.sh.keygen + (k - firstkey);
861 	    // int oldv = __sync_fetch_and_or(ptr, 1);
862 	    int oldv = *ptr;
863 	    *ptr += 1;
864 	    if (nodup && oldv != 0) {
865 	      if (dump) {
866 		fprintf(stderr, "retry\n");
867 	      }
868 	      continue;
869 	    }
870 	  } else {
871 	    if (nodup) {
872 	      if (dump) {
873 		fprintf(stderr, "retry2\n");
874 	      }
875 	      continue;
876 	    }
877 	  }
878 	  size_t len = 0;
879 	  if (op == 'G') {
880 	    if (use_in) {
881 	      char *wp = wbuf.make_space(1024);
882 	      len = snprintf(wp, 1024, "1\t=\t1\t\t%d\t0\t@\t0\t%d\t%d",
883 		use_in, use_in, k);
884 	      wbuf.space_wrote(len);
885 	      for (int j = 1; j < use_in; ++j) {
886 		drand48_r(&randbuf, &kf);
887 		k = int(kf * tablesize) + firstkey;
888 		char *wp = wbuf.make_space(1024);
889 		len = snprintf(wp, 1024, "\t%d", k);
890 		wbuf.space_wrote(len);
891 	      }
892 	      wbuf.append_literal("\n");
893 	    } else {
894 	      char *wp = wbuf.make_space(1024);
895 	      len = snprintf(wp, 1024, "1\t=\t1\t%d\n", k);
896 	      wbuf.space_wrote(len);
897 	    }
898 	  } else if (op == 'U') {
899 	    char *wp = wbuf.make_space(1024);
900 	    len = snprintf(wp, 1024,
901 	      "1\t=\t1\t%d\t1\t0\tU\t%d_%d%s\n", k, v, k, suffix.c_str());
902 	    wbuf.space_wrote(len);
903 	  }
904 	  break;
905 	}
906       }
907     }
908     wlen = write(fd.get(), wbuf.begin(), wbuf.size());
909     if ((size_t) wlen != wbuf.size()) {
910       fprintf(stderr, "write: %d %d\n", (int)wbuf.size(), wlen);
911       return;
912     }
913     wbuf.clear();
914     size_t read_cnt = 0;
915     size_t read_pos = 0;
916     while (read_cnt < arg.sh.pipe) {
917       rlen = read(fd.get(), rbuf + read_pos, sizeof(rbuf) - read_pos);
918       if (rlen <= 0) {
919 	fprintf(stderr, "read: %d\n", rlen);
920 	return;
921       }
922       read_pos += rlen;
923       while (true) {
924 	const char *const nl = static_cast<const char *>(memchr(rbuf, '\n',
925 	  read_pos));
926 	if (nl == 0) {
927 	  break;
928 	}
929 	++read_cnt;
930 	++io_success_count;
931 	const char *t1 = static_cast<const char *>(memchr(rbuf, '\t',
932 	  nl - rbuf));
933 	if (t1 == 0) {
934 	  fprintf(stderr, "error \n");
935 	  break;
936 	}
937 	++t1;
938 	const char *t2 = static_cast<const char *>(memchr(t1, '\t',
939 	  nl - t1));
940 	if (t2 == 0) {
941 	  if (verbose > 1) {
942 	    fprintf(stderr, "key: notfound \n");
943 	  }
944 	  break;
945 	}
946 	++t2;
947 	if (t1 == rbuf + 2 && rbuf[0] == '0') {
948 	  if (op == 'G') {
949 	    ++op_success_count;
950 	    arg.sh.increment_count();
951 	  } else if (op == 'U') {
952 	    const char *t3 = t2;
953 	    while (t3 != nl && t3[0] >= 0x10) {
954 	      ++t3;
955 	    }
956 	    if (t3 != t2 + 1 || t2[0] != '1') {
957 	      const std::string mess(t2, t3);
958 	      fprintf(stderr, "mod: %s\n", mess.c_str());
959 	    } else {
960 	      ++op_success_count;
961 	      arg.sh.increment_count();
962 	      if (arg.sh.dump && arg.sh.pipe == 1) {
963 		fwrite(wbuf.begin(), wbuf.size(), 1, stderr);
964 	      }
965 	    }
966 	  }
967 	} else {
968 	  const char *t3 = t2;
969 	  while (t3 != nl && t3[0] >= 0x10) {
970 	    ++t3;
971 	  }
972 	  const std::string mess(t2, t3);
973 	  fprintf(stderr, "err: %s\n", mess.c_str());
974 	}
975 	const size_t rest_size = rbuf + read_pos - (nl + 1);
976 	if (rest_size != 0) {
977 	  memmove(rbuf, nl + 1, rest_size);
978 	}
979 	read_pos = rest_size;
980       }
981     }
982     if (!keep_connection) {
983       fd.reset();
984       arg.sh.increment_conn(-1);
985     } else if (keep_connection > 1 && ++keepconn_count > keep_connection) {
986       keepconn_count = 0;
987       fd.reset();
988       arg.sh.increment_conn(-1);
989     }
990     const double tm2 = gettimeofday_double();
991     set_timing(tm2 - tm1);
992     sleep_if();
993     if (sched_flag) {
994       sched_yield();
995     }
996   }
997   if (dump) {
998     fprintf(stderr, "done\n");
999   }
1000 }
1001 
1002 void
sleep_if()1003 hstest_thread::sleep_if()
1004 {
1005   if (arg.sh.usleep) {
1006     struct timespec ts = {
1007       arg.sh.usleep / 1000000,
1008       (arg.sh.usleep % 1000000) * 1000
1009     };
1010     nanosleep(&ts, 0);
1011   }
1012 }
1013 
1014 void
set_timing(double time_spent)1015 hstest_thread::set_timing(double time_spent)
1016 {
1017   response_min = std::min(response_min, time_spent);
1018   response_max = std::max(response_max, time_spent);
1019   response_sum += time_spent;
1020   if (op_success_count != 0) {
1021     response_avg = response_sum / op_success_count;
1022   }
1023 }
1024 
1025 void
test_11(int test_num)1026 hstest_thread::test_11(int test_num)
1027 {
1028   const int keep_connection = arg.sh.conf.get_int("keep_connection", 1);
1029   const int tablesize = arg.sh.conf.get_int("tablesize", 0);
1030   unsigned int seed = arg.id;
1031   seed ^= arg.sh.conf.get_int("seed_xor", 0);
1032   std::string err;
1033   hstcpcli_ptr cli;
1034   for (size_t i = 0; i < arg.sh.loop; ++i) {
1035     if (cli.get() == 0) {
1036       cli = hstcpcli_i::create(arg.sh.arg);
1037       cli->request_buf_open_index(0, "hstest", "hstest_table1", "", "v");
1038 	/* pst_num, db, table, index, retflds */
1039       if (cli->request_send() != 0) {
1040 	fprintf(stderr, "reuqest_send: %s\n", cli->get_error().c_str());
1041 	return;
1042       }
1043       size_t num_flds = 0;
1044       if (cli->response_recv(num_flds) != 0) {
1045 	fprintf(stderr, "reuqest_recv: %s\n", cli->get_error().c_str());
1046 	return;
1047       }
1048       cli->response_buf_remove();
1049     }
1050     for (size_t j = 0; j < arg.sh.pipe; ++j) {
1051       char buf[256];
1052       int k = 0, v = 0, len = 0;
1053       {
1054 	k = rand_r(&seed);
1055 	v = rand_r(&seed); /* unused */
1056 	if (tablesize != 0) {
1057 	  k &= tablesize;
1058 	}
1059 	len = snprintf(buf, sizeof(buf), "%d", k);
1060       }
1061       const string_ref key(buf, len);
1062       const string_ref op("=", 1);
1063       cli->request_buf_exec_generic(0, op, &key, 1, 1, 0, string_ref(), 0, 0);
1064     }
1065     if (cli->request_send() != 0) {
1066       fprintf(stderr, "reuqest_send: %s\n", cli->get_error().c_str());
1067       return;
1068     }
1069     size_t read_cnt = 0;
1070     for (size_t j = 0; j < arg.sh.pipe; ++j) {
1071       size_t num_flds = 0;
1072       if (cli->response_recv(num_flds) != 0) {
1073 	fprintf(stderr, "reuqest_recv: %s\n", cli->get_error().c_str());
1074 	return;
1075       }
1076       {
1077 	++read_cnt;
1078 	++io_success_count;
1079 	arg.sh.increment_count();
1080 	{
1081 	  ++op_success_count;
1082 	}
1083       }
1084       cli->response_buf_remove();
1085     }
1086     if (!keep_connection) {
1087       cli.reset();
1088     }
1089   }
1090 }
1091 
1092 void
test_watch()1093 hstest_thread::test_watch()
1094 {
1095   const int timelimit = arg.sh.conf.get_int("timelimit", 0);
1096   const int timelimit_offset = timelimit / 2;
1097   int loop = 0;
1098   double t1 = 0, t2 = 0;
1099   size_t cnt_t1 = 0, cnt_t2 = 0;
1100   size_t prev_cnt = 0;
1101   double now_f = 0;
1102   while (true) {
1103     sleep(1);
1104     const size_t cnt = arg.sh.count;
1105     const size_t df = cnt - prev_cnt;
1106     prev_cnt = cnt;
1107     const double now_prev = now_f;
1108     now_f = gettimeofday_double();
1109     if (now_prev != 0) {
1110       const double rps = static_cast<double>(df) / (now_f - now_prev);
1111       fprintf(stderr, "now: %zu cntdiff: %zu tdiff: %f rps: %f\n",
1112         static_cast<size_t>(now_f), df, now_f - now_prev, rps);
1113     }
1114     if (timelimit != 0) {
1115       if (arg.sh.wait_conn == 0 || arg.sh.conn_count >= arg.sh.wait_conn) {
1116 	++loop;
1117       }
1118       if (loop == timelimit_offset) {
1119         t1 = gettimeofday_double();
1120         cnt_t1 = cnt;
1121 	arg.sh.enable_timing = 1;
1122 	fprintf(stderr, "start timing\n");
1123       } else if (loop == timelimit_offset + timelimit) {
1124         t2 = gettimeofday_double();
1125         cnt_t2 = cnt;
1126         const size_t cnt_diff = cnt_t2 - cnt_t1;
1127         const double tdiff = t2 - t1;
1128         const double qps = cnt_diff / (tdiff != 0 ? tdiff : 1);
1129         fprintf(stderr, "(%f: %zu, %f: %zu), %10.5f qps\n",
1130           t1, cnt_t1, t2, cnt_t2, qps);
1131 	size_t keycnt = 0;
1132 	for (int i = 0; i < arg.sh.keygen_size; ++i) {
1133 	  if (arg.sh.keygen[i]) {
1134 	    ++keycnt;
1135 	  }
1136 	}
1137 	fprintf(stderr, "keygen=%zu\n", keycnt);
1138 	break;
1139       }
1140     }
1141   }
1142 #if 0
1143   int loop = 0;
1144   double t1 = 0, t2 = 0;
1145   size_t cnt_t1 = 0, cnt_t2 = 0;
1146   size_t prev_cnt = 0;
1147   while (true) {
1148     sleep(1);
1149     const size_t cnt = arg.sh.count;
1150     const size_t df = cnt - prev_cnt;
1151     prev_cnt = cnt;
1152     const size_t now = time(0);
1153     fprintf(stderr, "%zu %zu\n", now, df);
1154     if (timelimit != 0) {
1155       ++loop;
1156       if (loop == timelimit_offset) {
1157 	t1 = gettimeofday_double();
1158 	cnt_t1 = cnt;
1159       } else if (loop == timelimit_offset + timelimit) {
1160 	t2 = gettimeofday_double();
1161 	cnt_t2 = cnt;
1162 	const size_t cnt_diff = cnt_t2 - cnt_t1;
1163 	const double tdiff = t2 - t1;
1164 	const double qps = cnt_diff / (tdiff != 0 ? tdiff : 1);
1165 	fprintf(stderr, "(%f: %zu, %f: %zu), %10.5f qps\n",
1166 	  t1, cnt_t1, t2, cnt_t2, qps);
1167 	size_t keycnt = 0;
1168 	for (int i = 0; i < arg.sh.keygen_size; ++i) {
1169 	  if (arg.sh.keygen[i]) {
1170 	    ++keycnt;
1171 	  }
1172 	}
1173 	fprintf(stderr, "keygen=%zu\n", keycnt);
1174 	_exit(0);
1175       }
1176     }
1177   }
1178 #endif
1179 }
1180 
1181 void
test_12(int test_num)1182 hstest_thread::test_12(int test_num)
1183 {
1184   /* NOTE: num_threads should be 1 */
1185   /* create table hstest
1186    * ( k varchar(255) not null, v varchar(255) not null, primary key(k))
1187    * engine = innodb; */
1188   mysqltest_thread_initobj initobj;
1189   auto_mysql db;
1190   std::string err;
1191   unsigned long long err_cnt = 0;
1192   unsigned long long query_cnt = 0;
1193   #if 0
1194   my_bool reconnect = 0;
1195   if (mysql_options(db, MYSQL_OPT_RECONNECT, &reconnect) != 0) {
1196     err = "mysql_options() failed";
1197     ++err_cnt;
1198     return;
1199   }
1200   #endif
1201   const std::string mysql_host = arg.sh.conf.get_str("host", "localhost");
1202   const int mysql_port = arg.sh.conf.get_int("mysqlport", 3306);
1203   const unsigned int num = arg.sh.loop;
1204   const size_t pipe = arg.sh.pipe;
1205   const std::string mysql_user = arg.sh.conf.get_str("mysqluser", "root");
1206   const std::string mysql_passwd = arg.sh.conf.get_str("mysqlpass", "");
1207   const std::string mysql_dbname = arg.sh.conf.get_str("db", "hstest");
1208   const int keep_connection = arg.sh.conf.get_int("keep_connection", 1);
1209   const int verbose = arg.sh.conf.get_int("verbose", 1);
1210   const int use_handler = arg.sh.conf.get_int("handler", 0);
1211   int connected = 0;
1212   unsigned int k = 0;
1213   string_buffer buf;
1214   for (unsigned int i = 0; i < num; ++i) {
1215     const int flags = 0;
1216     if (connected == 0 && !mysql_real_connect(db, mysql_host.c_str(),
1217       mysql_user.c_str(), mysql_user.empty() ? 0 : mysql_passwd.c_str(),
1218       mysql_dbname.c_str(), mysql_port, 0, flags)) {
1219       err = "failed to connect: " + std::string(mysql_error(db));
1220       if (verbose >= 1) {
1221 	fprintf(stderr, "e=[%s]\n", err.c_str());
1222       }
1223       ++err_cnt;
1224       return;
1225     }
1226     int r = 0;
1227     if (connected == 0 && use_handler) {
1228       const char *const q = "handler hstest open";
1229       r = mysql_real_query(db, q, strlen(q));
1230       if (r != 0) {
1231 	err = 1;
1232       }
1233     }
1234     connected = 1;
1235     std::string result_str;
1236     unsigned int err = 0;
1237     unsigned int num_flds = 0, num_affected_rows = 0;
1238     int got_data = 0;
1239     buf.clear();
1240     buf.append_literal("insert into hstest values ");
1241     for (size_t j = 0; j < pipe; ++j) {
1242       const unsigned int v = ~k;
1243       if (j != 0) {
1244 	buf.append_literal(",");
1245       }
1246       char *wp = buf.make_space(64);
1247       int buf_query_len = snprintf(wp, 64, "('k%u', 'v%u')", k, v);
1248       buf.space_wrote(buf_query_len);
1249       ++k;
1250     }
1251     if (r == 0) {
1252       r = mysql_real_query(db, buf.begin(), buf.size());
1253       ++query_cnt;
1254     }
1255     if (r != 0) {
1256       err = 1;
1257     } else {
1258       auto_mysql_res res(db);
1259       if (res != 0) {
1260 	if (verbose >= 0) {
1261 	  num_flds = mysql_num_fields(res);
1262 	  MYSQL_ROW row = 0;
1263 	  while ((row = mysql_fetch_row(res)) != 0) {
1264 	    got_data = 1;
1265 	    unsigned long *const lengths = mysql_fetch_lengths(res);
1266 	    if (verbose >= 2) {
1267 	      for (unsigned int i = 0; i < num_flds; ++i) {
1268 		if (!result_str.empty()) {
1269 		  result_str += " ";
1270 		}
1271 		result_str += std::string(row[i], lengths[i]);
1272 	      }
1273 	    }
1274 	  }
1275 	}
1276       } else {
1277 	if (mysql_field_count(db) == 0) {
1278 	  num_affected_rows = mysql_affected_rows(db);
1279 	} else {
1280 	  err = 1;
1281 	}
1282       }
1283     }
1284     if (verbose >= 2 || (verbose >= 1 && err != 0)) {
1285       if (err) {
1286 	++err_cnt;
1287 	const char *const errstr = mysql_error(db);
1288 	fprintf(stderr, "e=[%s] a=%u q=[%s]\n", errstr,
1289 	  num_affected_rows, std::string(buf.begin(), buf.size()).c_str());
1290       } else {
1291 	fprintf(stderr, "a=%u q=[%s] r=[%s]\n", num_affected_rows,
1292 	  std::string(buf.begin(), buf.size()).c_str(),
1293 	  result_str.c_str());
1294       }
1295     }
1296     if (err == 0) {
1297       ++io_success_count;
1298       if (num_affected_rows > 0 || got_data > 0) {
1299 	++op_success_count;
1300       }
1301       arg.sh.increment_count(pipe);
1302     }
1303     if (!keep_connection) {
1304       db.reset();
1305       connected = 0;
1306     }
1307   }
1308   if (verbose >= 1) {
1309     fprintf(stderr, "thread finished (error_count=%llu)\n", err_cnt);
1310   }
1311 }
1312 
1313 void
test_21(int num)1314 hstest_thread::test_21(int num)
1315 {
1316   /* fsync test */
1317   unsigned int id = arg.id;
1318   std::string err;
1319   #if 0
1320   if (socket_connect(fd, arg.sh.arg, err) != 0) {
1321     fprintf(stderr, "connect: %d %s\n", errno, strerror(errno));
1322     return;
1323   }
1324   #endif
1325   auto_file logfd;
1326   char fname[1024];
1327   snprintf(fname, sizeof(fname), "synctest_%u", id);
1328   int open_flags = O_WRONLY | O_CREAT | O_TRUNC | O_APPEND;
1329   logfd.reset(open(fname, open_flags, 0644));
1330   if (logfd.get() < 0) {
1331     fprintf(stderr, "open: %s: %d %s\n", fname, errno, strerror(errno));
1332     return;
1333   }
1334   char buf[1024];
1335   unsigned long long count = 0;
1336   while (true) {
1337     snprintf(buf, sizeof(buf), "%u %llu\n", id, count);
1338     const size_t len = strlen(buf);
1339     if (write(logfd.get(), buf, len) != (ssize_t)len) {
1340       fprintf(stderr, "write: %s: %d %s\n", fname, errno, strerror(errno));
1341       return;
1342     }
1343     #if 0
1344     if (write(fd.get(), buf, len) != (ssize_t)len) {
1345       fprintf(stderr, "write(sock): %d %s\n", errno, strerror(errno));
1346       return;
1347     }
1348     #endif
1349     if (fdatasync(logfd.get()) != 0) {
1350       fprintf(stderr, "fsync: %s: %d %s\n", fname, errno, strerror(errno));
1351       return;
1352     }
1353     ++count;
1354     ++op_success_count;
1355     arg.sh.increment_count();
1356   }
1357 }
1358 
1359 void
test_22(int num)1360 hstest_thread::test_22(int num)
1361 {
1362   /* dd if=/dev/zero of=dummy.dat bs=1024M count=100 */
1363   unsigned int id = arg.id;
1364   std::string err;
1365   auto_file filefd;
1366   char fname[1024];
1367   snprintf(fname, sizeof(fname), "dummy.dat");
1368   int open_flags = O_RDONLY | O_DIRECT;
1369   filefd.reset(open(fname, open_flags, 0644));
1370   if (filefd.get() < 0) {
1371     fprintf(stderr, "open: %s: %d %s\n", fname, errno, strerror(errno));
1372     return;
1373   }
1374   char buf_x[4096 * 2];
1375   char *const buf = (char *)(size_t(buf_x + 4096) / 4096 * 4096);
1376   unsigned long long count = 0;
1377   drand48_data randbuf;
1378   unsigned long long seed = time(0);
1379   seed *= 10;
1380   seed += id;
1381   srand48_r(seed, &randbuf);
1382   for (unsigned int i = 0; i < arg.sh.loop; ++i) {
1383     double kf = 0;
1384     drand48_r(&randbuf, &kf);
1385     kf *= (209715200 / 1);
1386     // fprintf(stderr, "v=%f\n", kf);
1387     off_t v = static_cast<off_t>(kf);
1388     v %= (209715200 / 1);
1389     v *= (512 * 1);
1390     const double tm1 = gettimeofday_double();
1391     const ssize_t r = pread(filefd.get(), buf, (512 * 1), v);
1392     const double tm2 = gettimeofday_double();
1393     if (r < 0) {
1394       fprintf(stderr, "pread: %s: %d %s\n", fname, errno, strerror(errno));
1395       return;
1396     }
1397     ++count;
1398     ++op_success_count;
1399     arg.sh.increment_count();
1400     set_timing(tm2 - tm1);
1401   }
1402 }
1403 
1404 void
operator ()()1405 hstest_thread::operator ()()
1406 {
1407   if (arg.watch_flag) {
1408     return test_watch();
1409   }
1410   int test_num = arg.sh.conf.get_int("test", 1);
1411   if (test_num == 1) {
1412     test_1();
1413   } else if (test_num == 2 || test_num == 3) {
1414     test_2_3(test_num);
1415   } else if (test_num == 4 || test_num == 5) {
1416     test_4_5(test_num);
1417   } else if (test_num == 6) {
1418     test_6(test_num);
1419   } else if (test_num == 7) {
1420     test_7(test_num);
1421   } else if (test_num == 8) {
1422     test_8(test_num);
1423   } else if (test_num == 9) {
1424     test_9(test_num);
1425   } else if (test_num == 10) {
1426     test_10(test_num);
1427   } else if (test_num == 11) {
1428     test_11(test_num);
1429   } else if (test_num == 12) {
1430     test_12(test_num);
1431   } else if (test_num == 21) {
1432     test_21(test_num);
1433   } else if (test_num == 22) {
1434     test_22(test_num);
1435   }
1436   const int halt = arg.sh.conf.get_int("halt", 0);
1437   if (halt) {
1438     fprintf(stderr, "thread halted\n");
1439     while (true) {
1440       sleep(100000);
1441     }
1442   }
1443   fprintf(stderr, "thread finished\n");
1444 }
1445 
1446 int
hstest_main(int argc,char ** argv)1447 hstest_main(int argc, char **argv)
1448 {
1449   ignore_sigpipe();
1450   hstest_shared shared;
1451   parse_args(argc, argv, shared.conf);
1452   shared.conf["port"] = shared.conf["hsport"];
1453   shared.arg.set(shared.conf);
1454   shared.loop = shared.conf.get_int("num", 1000);
1455   shared.pipe = shared.conf.get_int("pipe", 1);
1456   shared.verbose = shared.conf.get_int("verbose", 1);
1457   const int tablesize = shared.conf.get_int("tablesize", 0);
1458   std::vector<char> keygen(tablesize);
1459   shared.keygen = &keygen[0];
1460   shared.keygen_size = tablesize;
1461   shared.usleep = shared.conf.get_int("usleep", 0);
1462   shared.dump = shared.conf.get_int("dump", 0);
1463   shared.num_threads = shared.conf.get_int("num_threads", 10);
1464   shared.wait_conn = shared.conf.get_int("wait_conn", 0);
1465   const std::string op = shared.conf.get_str("op", "G");
1466   if (op.size() > 0) {
1467     shared.op = op[0];
1468   }
1469   #if 0
1470   const int localdb_flag = shared.conf.get_int("local", 0);
1471   if (localdb_flag) {
1472     shared.localdb = database_i::create(shared.conf);
1473   }
1474   #endif
1475   const int num_thrs = shared.num_threads;
1476   typedef thread<hstest_thread> thread_type;
1477   typedef std::auto_ptr<thread_type> thread_ptr;
1478   typedef auto_ptrcontainer< std::vector<thread_type *> > thrs_type;
1479   thrs_type thrs;
1480   for (int i = 0; i < num_thrs; ++i) {
1481     const hstest_thread::arg_type arg(i, shared, false);
1482     thread_ptr thr(new thread<hstest_thread>(arg));
1483     thrs.push_back_ptr(thr);
1484   }
1485   for (size_t i = 0; i < thrs.size(); ++i) {
1486     thrs[i]->start();
1487   }
1488   thread_ptr watch_thread;
1489   const int timelimit = shared.conf.get_int("timelimit", 0);
1490   {
1491     const hstest_thread::arg_type arg(0, shared, true);
1492     watch_thread = thread_ptr(new thread<hstest_thread>(arg));
1493     watch_thread->start();
1494   }
1495   size_t iocnt = 0, opcnt = 0;
1496   double respmin = 999999, respmax = 0;
1497   double respsum = 0;
1498   if (timelimit != 0) {
1499     watch_thread->join();
1500   }
1501   for (size_t i = 0; i < thrs.size(); ++i) {
1502     if (timelimit == 0) {
1503       thrs[i]->join();
1504     }
1505     iocnt += (*thrs[i])->io_success_count;
1506     opcnt += (*thrs[i])->op_success_count;
1507     respmin = std::min(respmin, (*thrs[i])->response_min);
1508     respmax = std::max(respmax, (*thrs[i])->response_max);
1509     respsum += (*thrs[i])->response_sum;
1510   }
1511   fprintf(stderr, "io_success_count=%zu op_success_count=%zu\n", iocnt, opcnt);
1512   fprintf(stderr, "respmin=%f respmax=%f respsum=%f respavg=%f\n",
1513     respmin, respmax, respsum, respsum / opcnt);
1514   size_t keycnt = 0;
1515   for (size_t i = 0; i < keygen.size(); ++i) {
1516     if (keygen[i]) {
1517       ++keycnt;
1518     }
1519   }
1520   fprintf(stderr, "keycnt=%zu\n", keycnt);
1521   _exit(0);
1522   return 0;
1523 }
1524 
1525 };
1526 
1527 int
main(int argc,char ** argv)1528 main(int argc, char **argv)
1529 {
1530   return dena::hstest_main(argc, argv);
1531 }
1532 
1533