1
2 // vim:sw=2:ai
3
4 #include <signal.h>
5 #include <sys/time.h>
6 #include <stdio.h>
7 #include <string.h>
8 #include <vector>
9 #include <stdlib.h>
10 #include <memory>
11 #include <errno.h>
12 #include <mysql.h>
13 #include <time.h>
14 #include <sys/types.h>
15 #include <sys/stat.h>
16 #include <fcntl.h>
17
18 #include "util.hpp"
19 #include "auto_ptrcontainer.hpp"
20 #include "socket.hpp"
21 #include "thread.hpp"
22 #include "hstcpcli.hpp"
23
#if __GNUC__ < 4
/* Compilers older than GCC 4 use the libstdc++ atomicity helper below. */
#include <bits/atomicity.h>
using namespace __gnu_cxx;
#endif

/*
 * Atomically adds c to *valp.
 * With GCC >= 4 this is __sync_fetch_and_add(), so the value *valp held
 * before the addition is returned; older compilers go through
 * __exchange_and_add() (presumably the same contract -- confirm if that
 * path is still built).
 */
long atomic_exchange_and_add(volatile long *valp, long c)
{
#if __GNUC__ >= 4
  return __sync_fetch_and_add(valp, c);
#else
  return __exchange_and_add((volatile _Atomic_word *)valp, c);
#endif
}
37
38 namespace dena {
39
40 struct auto_mysql : private noncopyable {
auto_mysqldena::auto_mysql41 auto_mysql() : db(0) {
42 reset();
43 }
~auto_mysqldena::auto_mysql44 ~auto_mysql() {
45 if (db) {
46 mysql_close(db);
47 }
48 }
resetdena::auto_mysql49 void reset() {
50 if (db) {
51 mysql_close(db);
52 }
53 if ((db = mysql_init(0)) == 0) {
54 fatal_abort("failed to initialize mysql client");
55 }
56 }
operator MYSQL*dena::auto_mysql57 operator MYSQL *() const { return db; }
58 private:
59 MYSQL *db;
60 };
61
62 struct auto_mysql_res : private noncopyable {
auto_mysql_resdena::auto_mysql_res63 auto_mysql_res(MYSQL *db) {
64 res = mysql_store_result(db);
65 }
~auto_mysql_resdena::auto_mysql_res66 ~auto_mysql_res() {
67 if (res) {
68 mysql_free_result(res);
69 }
70 }
operator MYSQL_RES*dena::auto_mysql_res71 operator MYSQL_RES *() const { return res; }
72 private:
73 MYSQL_RES *res;
74 };
75
76 struct auto_mysql_stmt : private noncopyable {
auto_mysql_stmtdena::auto_mysql_stmt77 auto_mysql_stmt(MYSQL *db) {
78 stmt = mysql_stmt_init(db);
79 }
~auto_mysql_stmtdena::auto_mysql_stmt80 ~auto_mysql_stmt() {
81 if (stmt) {
82 mysql_stmt_close(stmt);
83 }
84 }
operator MYSQL_STMT*dena::auto_mysql_stmt85 operator MYSQL_STMT *() const { return stmt; }
86 private:
87 MYSQL_STMT *stmt;
88 };
89
90 namespace {
91
92 double
gettimeofday_double()93 gettimeofday_double()
94 {
95 struct timeval tv = { };
96 if (gettimeofday(&tv, 0) != 0) {
97 fatal_abort("gettimeofday");
98 }
99 return static_cast<double>(tv.tv_usec) / 1000000 + tv.tv_sec;
100 }
101
// unused
/*
 * Reads from fd and discards the data, returning as soon as read() yields
 * a value <= 0 (end of file or an error).
 */
void
wait_close(int fd)
{
  char scratch[1024];
  for (;;) {
    const int got = read(fd, scratch, sizeof(scratch));
    if (got <= 0) {
      return;
    }
  }
}
114
115 // unused
116 void
gentle_close(int fd)117 gentle_close(int fd)
118 {
119 int r = shutdown(fd, SHUT_WR);
120 if (r != 0) {
121 return;
122 }
123 wait_close(fd);
124 }
125
126 };
127
128 struct hstest_shared {
129 config conf;
130 socket_args arg;
131 int verbose;
132 size_t loop;
133 size_t pipe;
134 char op;
135 long num_threads;
136 mutable volatile long count;
137 mutable volatile long conn_count;
138 long wait_conn;
139 volatile char *keygen;
140 long keygen_size;
141 mutable volatile int enable_timing;
142 int usleep;
143 int dump;
hstest_shareddena::hstest_shared144 hstest_shared() : verbose(0), loop(0), pipe(0), op('G'), num_threads(0),
145 count(0), conn_count(0), wait_conn(0), keygen(0), keygen_size(0),
146 enable_timing(0), usleep(0), dump(0) { }
increment_countdena::hstest_shared147 void increment_count(unsigned int c = 1) const volatile {
148 atomic_exchange_and_add(&count, c);
149 }
increment_conndena::hstest_shared150 void increment_conn(unsigned int c) const volatile {
151 atomic_exchange_and_add(&conn_count, c);
152 while (wait_conn != 0 && conn_count < wait_conn) {
153 sleep(1);
154 }
155 // fprintf(stderr, "wait_conn=%ld done\n", wait_conn);
156 }
157 };
158
159 struct hstest_thread {
160 struct arg_type {
161 size_t id;
162 const hstest_shared& sh;
163 bool watch_flag;
arg_typedena::hstest_thread::arg_type164 arg_type(size_t i, const hstest_shared& s, bool w)
165 : id(i), sh(s), watch_flag(w) { }
166 };
hstest_threaddena::hstest_thread167 hstest_thread(const arg_type& a) : arg(a), io_success_count(0),
168 op_success_count(0), response_min(99999), response_max(0),
169 response_sum(0), response_avg(0) { }
170 void operator ()();
171 void test_1();
172 void test_2_3(int test_num);
173 void test_4_5(int test_num);
174 void test_6(int test_num);
175 void test_7(int test_num);
176 void test_8(int test_num);
177 void test_9(int test_num);
178 void test_10(int test_num);
179 void test_11(int test_num);
180 void test_12(int test_num);
181 void test_21(int test_num);
182 void test_22(int test_num);
183 void test_watch();
184 void sleep_if();
185 void set_timing(double time_spent);
186 arg_type arg;
187 auto_file fd;
188 size_t io_success_count;
189 size_t op_success_count;
190 double response_min, response_max, response_sum, response_avg;
191 };
192
193 void
test_1()194 hstest_thread::test_1()
195 {
196 char buf[1024];
197 unsigned int seed = arg.id;
198 seed ^= arg.sh.conf.get_int("seed_xor", 0);
199 std::string err;
200 if (socket_connect(fd, arg.sh.arg, err) != 0) {
201 fprintf(stderr, "connect: %d %s\n", errno, strerror(errno));
202 return;
203 }
204 const char op = arg.sh.op;
205 const int tablesize = arg.sh.conf.get_int("tablesize", 0);
206 for (size_t i = 0; i < arg.sh.loop; ++i) {
207 for (size_t j = 0; j < arg.sh.pipe; ++j) {
208 int k = 0, v = 0, len = 0;
209 if (op == 'G') {
210 k = rand_r(&seed);
211 v = rand_r(&seed); /* unused */
212 if (tablesize != 0) {
213 k &= tablesize;
214 }
215 len = snprintf(buf, sizeof(buf), "%c\tk%d\n", op, k);
216 } else {
217 k = rand_r(&seed);
218 v = rand_r(&seed);
219 if (tablesize != 0) {
220 k &= tablesize;
221 }
222 len = snprintf(buf, sizeof(buf), "%c\tk%d\tv%d\n", op, k, v);
223 }
224 const int wlen = write(fd.get(), buf, len);
225 if (wlen != len) {
226 return;
227 }
228 }
229 size_t read_cnt = 0;
230 size_t read_pos = 0;
231 while (read_cnt < arg.sh.pipe) {
232 const int rlen = read(fd.get(), buf + read_pos, sizeof(buf) - read_pos);
233 if (rlen <= 0) {
234 return;
235 }
236 read_pos += rlen;
237 while (true) {
238 const char *const p = static_cast<const char *>(memchr(buf, '\n',
239 read_pos));
240 if (p == 0) {
241 break;
242 }
243 ++read_cnt;
244 ++io_success_count;
245 arg.sh.increment_count();
246 if (p != buf && buf[0] == '=') {
247 ++op_success_count;
248 }
249 const size_t rest_size = buf + read_pos - (p + 1);
250 if (rest_size != 0) {
251 memmove(buf, p + 1, rest_size);
252 }
253 read_pos = rest_size;
254 }
255 }
256 }
257 }
258
/*
 * Tests 2 and 3. The whole body is compiled out with #if 0, so calling this
 * function currently does nothing and test_num is unused.
 * The disabled code drove a micli handle (remote for test 2; the in-process
 * variant for test 3 is itself commented out) with random "k<n>"/"v<n>"
 * records.
 */
void
hstest_thread::test_2_3(int test_num)
{
#if 0
  char buf_k[128], buf_v[128];
  unsigned int seed = arg.id;
  op_base_t op = static_cast<op_base_t>(arg.sh.op);
  micli_ptr hnd;
  if (test_num == 2) {
    hnd = micli_i::create_remote(arg.sh.conf);
  } else if (test_num == 3) {
    // hnd = micli_i::create_inproc(arg.sh.localdb);
  }
  if (hnd.get() == 0) {
    return;
  }
  for (size_t i = 0; i < arg.sh.loop; ++i) {
    for (size_t j = 0; j < arg.sh.pipe; ++j) {
      int k = 0, v = 0, klen = 0, vlen = 0;
      k = rand_r(&seed);
      klen = snprintf(buf_k, sizeof(buf_k), "k%d", k);
      v = rand_r(&seed); /* unused */
      vlen = snprintf(buf_v, sizeof(buf_v), "v%d", v);
      string_ref arr[2];
      arr[0] = string_ref(buf_k, klen);
      arr[1] = string_ref(buf_v, vlen);
      pstrarr_ptr rec(arr, 2);
      if (hnd->execute(op, 0, 0, rec.get_const())) {
        ++io_success_count;
        arg.sh.increment_count();
        const dataset& res = hnd->get_result_ref();
        if (res.size() == 1) {
          ++op_success_count;
        }
      }
    }
  }
#endif
}
298
/*
 * Tests 4 and 5. The whole body is compiled out with #if 0, so calling this
 * function currently does nothing and test_num is unused.
 * The disabled code drove a micli handle (remote for test 4, in-process for
 * test 5) with keys derived from the loop index and space-filled values of
 * random length below 8192.
 */
void
hstest_thread::test_4_5(int test_num)
{
#if 0
  char buf_k[128], buf_v[8192];
  memset(buf_v, ' ', sizeof(buf_v));
  unsigned int seed = arg.id;
  op_base_t op = static_cast<op_base_t>(arg.sh.op);
  micli_ptr hnd;
  if (test_num == 4) {
    hnd = micli_i::create_remote(arg.sh.conf);
  } else if (test_num == 5) {
    hnd = micli_i::create_inproc(arg.sh.localdb);
  }
  if (hnd.get() == 0) {
    return;
  }
  for (size_t i = 0; i < arg.sh.loop; ++i) {
    for (size_t j = 0; j < arg.sh.pipe; ++j) {
      int k = 0, klen = 0, vlen = 0;
      k = i & 0x0000ffffUL;
      if (k == 0) {
        fprintf(stderr, "k=0\n");
      }
      klen = snprintf(buf_k, sizeof(buf_k), "k%d", k);
      vlen = rand_r(&seed) % 8192;
      string_ref arr[2];
      arr[0] = string_ref(buf_k, klen);
      arr[1] = string_ref(buf_v, vlen);
      pstrarr_ptr rec(arr, 2);
      if (hnd->execute(op, 0, 0, rec.get_const())) {
        ++io_success_count;
        const dataset& res = hnd->get_result_ref();
        if (res.size() == 1) {
          ++op_success_count;
        }
      }
    }
  }
#endif
}
340
341 void
test_6(int test_num)342 hstest_thread::test_6(int test_num)
343 {
344 int count = arg.sh.conf.get_int("count", 1);
345 auto_file fds[count];
346 for (int i = 0; i < count; ++i) {
347 const double t1 = gettimeofday_double();
348 std::string err;
349 if (socket_connect(fds[i], arg.sh.arg, err) != 0) {
350 fprintf(stderr, "id=%zu i=%d err=%s\n", arg.id, i, err.c_str());
351 }
352 const double t2 = gettimeofday_double();
353 if (t2 - t1 > 1) {
354 fprintf(stderr, "id=%zu i=%d time %f\n", arg.id, i, t2 - t1);
355 }
356 }
357 }
358
359 void
test_7(int num)360 hstest_thread::test_7(int num)
361 {
362 /*
363 set foo 0 0 10
364 0123456789
365 STORED
366 get foo
367 VALUE foo 0 10
368 0123456789
369 END
370 get var
371 END
372 */
373 char buf[1024];
374 const int keep_connection = arg.sh.conf.get_int("keep_connection", 1);
375 unsigned int seed = arg.id;
376 seed ^= arg.sh.conf.get_int("seed_xor", 0);
377 const int tablesize = arg.sh.conf.get_int("tablesize", 0);
378 const char op = arg.sh.op;
379 for (size_t i = 0; i < arg.sh.loop; ++i) {
380 const double tm1 = gettimeofday_double();
381 std::string err;
382 if (fd.get() < 0 && socket_connect(fd, arg.sh.arg, err) != 0) {
383 fprintf(stderr, "connect: %d %s\n", errno, strerror(errno));
384 return;
385 }
386 for (size_t j = 0; j < arg.sh.pipe; ++j) {
387 int k = 0, v = 0, len = 0;
388 if (op == 'G') {
389 k = rand_r(&seed);
390 v = rand_r(&seed); /* unused */
391 if (tablesize != 0) {
392 k &= tablesize;
393 }
394 len = snprintf(buf, sizeof(buf), "get k%d\r\n", k);
395 } else {
396 k = rand_r(&seed);
397 v = rand_r(&seed);
398 if (tablesize != 0) {
399 k &= tablesize;
400 }
401 char vbuf[1024];
402 int vlen = snprintf(vbuf, sizeof(vbuf),
403 "v%d"
404 // "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
405 // "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
406 // "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
407 // "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
408 // "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
409 , v);
410 len = snprintf(buf, sizeof(buf), "set k%d 0 0 %d\r\n%s\r\n",
411 k, vlen, vbuf);
412 }
413 const int wlen = write(fd.get(), buf, len);
414 if (wlen != len) {
415 return;
416 }
417 }
418 size_t read_cnt = 0;
419 size_t read_pos = 0;
420 bool read_response_done = false;
421 bool expect_value = false;
422 while (!read_response_done) {
423 const int rlen = read(fd.get(), buf + read_pos, sizeof(buf) - read_pos);
424 if (rlen <= 0) {
425 return;
426 }
427 read_pos += rlen;
428 while (true) {
429 const char *const p = static_cast<const char *>(memchr(buf, '\n',
430 read_pos));
431 if (p == 0) {
432 break;
433 }
434 ++read_cnt;
435 if (expect_value) {
436 expect_value = false;
437 } else if (p >= buf + 6 && memcmp(buf, "VALUE ", 6) == 0) {
438 expect_value = true;
439 ++op_success_count;
440 } else {
441 if (p == buf + 7 && memcmp(buf, "STORED\r", 7) == 0) {
442 ++op_success_count;
443 }
444 read_response_done = true;
445 }
446 const size_t rest_size = buf + read_pos - (p + 1);
447 if (rest_size != 0) {
448 memmove(buf, p + 1, rest_size);
449 }
450 read_pos = rest_size;
451 }
452 ++io_success_count;
453 }
454 arg.sh.increment_count();
455 if (!keep_connection) {
456 fd.close();
457 }
458 const double tm2 = gettimeofday_double();
459 set_timing(tm2 - tm1);
460 sleep_if();
461 }
462 }
463
/* A key/value pair of strings. Its only visible use is in the disabled
 * (#if 0) body of test_8, where it is the element of an in-memory
 * container keyed by "key". */
struct rec {
  std::string key;    /* lookup key */
  std::string value;  /* associated value */
};
468
/*
 * Test 8. The whole body is compiled out with #if 0, so calling this
 * function currently does nothing and test_num is unused.
 * The disabled code inserted random "k<n>"/"v<n>" records into an
 * in-process boost::multi_index container (no network I/O) and counted
 * every insert as a success.
 */
void
hstest_thread::test_8(int test_num)
{
#if 0
  char buf_k[128], buf_v[128];
  unsigned int seed = arg.id;
  // op_base_t op = static_cast<op_base_t>(arg.sh.op);
  using namespace boost::multi_index;
  typedef member<rec, std::string, &rec::key> rec_get_key;
  typedef ordered_unique<rec_get_key> oui;
  typedef multi_index_container< rec, indexed_by<oui> > mic;
  #if 0
  typedef std::map<std::string, std::string> m_type;
  m_type m;
  #endif
  mic m;
  for (size_t i = 0; i < arg.sh.loop; ++i) {
    for (size_t j = 0; j < arg.sh.pipe; ++j) {
      int k = 0, v = 0, klen = 0, vlen = 0;
      k = rand_r(&seed);
      klen = snprintf(buf_k, sizeof(buf_k), "k%d", k);
      v = rand_r(&seed); /* unused */
      vlen = snprintf(buf_v, sizeof(buf_v), "v%d", v);
      const std::string ks(buf_k, klen);
      const std::string vs(buf_v, vlen);
      rec r;
      r.key = ks;
      r.value = vs;
      m.insert(r);
      // m.insert(std::make_pair(ks, vs));
      ++io_success_count;
      ++op_success_count;
      arg.sh.increment_count();
    }
  }
#endif
}
506
/*
 * Scope guard for the MySQL client library's per-thread state: calls
 * mysql_thread_init() on construction and mysql_thread_end() on
 * destruction. The return value of mysql_thread_init() is not checked.
 */
struct mysqltest_thread_initobj : private noncopyable {
  mysqltest_thread_initobj() {
    mysql_thread_init();
  }
  ~mysqltest_thread_initobj() {
    mysql_thread_end();
  }
};
515
516 void
test_9(int test_num)517 hstest_thread::test_9(int test_num)
518 {
519 /* create table hstest
520 * ( k varchar(255) not null, v varchar(255) not null, primary key(k))
521 * engine = innodb; */
522 auto_mysql db;
523 // mysqltest_thread_initobj initobj;
524 std::string err;
525 const char op = arg.sh.op;
526 const std::string suffix = arg.sh.conf.get_str("value_suffix", "upd");
527 unsigned long long err_cnt = 0;
528 unsigned long long query_cnt = 0;
529 #if 0
530 my_bool reconnect = 0;
531 if (mysql_options(db, MYSQL_OPT_RECONNECT, &reconnect) != 0) {
532 err = "mysql_options() failed";
533 ++err_cnt;
534 return;
535 }
536 #endif
537 unsigned int seed = time(0) + arg.id + 1;
538 seed ^= arg.sh.conf.get_int("seed_xor", 0);
539 drand48_data randbuf;
540 srand48_r(seed, &randbuf);
541 const std::string mysql_host = arg.sh.conf.get_str("host", "localhost");
542 const int mysql_port = arg.sh.conf.get_int("mysqlport", 3306);
543 const int num = arg.sh.loop;
544 const std::string mysql_user = arg.sh.conf.get_str("mysqluser", "root");
545 const std::string mysql_passwd = arg.sh.conf.get_str("mysqlpass", "");
546 const std::string mysql_dbname = arg.sh.conf.get_str("dbname", "hstest");
547 const int keep_connection = arg.sh.conf.get_int("keep_connection", 1);
548 const int verbose = arg.sh.conf.get_int("verbose", 1);
549 const int tablesize = arg.sh.conf.get_int("tablesize", 10000);
550 const int moreflds = arg.sh.conf.get_int("moreflds", 0);
551 const std::string moreflds_prefix = arg.sh.conf.get_str(
552 "moreflds_prefix", "column0123456789_");
553 const int use_handler = arg.sh.conf.get_int("handler", 0);
554 const int sched_flag = arg.sh.conf.get_int("sched", 0);
555 const int use_in = arg.sh.conf.get_int("in", 0);
556 const int ssps = use_in ? 0 : arg.sh.conf.get_int("ssps", 0);
557 std::string flds = "v";
558 for (int i = 0; i < moreflds; ++i) {
559 char buf[1024];
560 snprintf(buf, sizeof(buf), ",%s%d", moreflds_prefix.c_str(), i);
561 flds += std::string(buf);
562 }
563 int connected = 0;
564 std::auto_ptr<auto_mysql_stmt> stmt;
565 string_buffer wbuf;
566 for (int i = 0; i < num; ++i) {
567 const double tm1 = gettimeofday_double();
568 const int flags = 0;
569 if (connected == 0) {
570 if (!mysql_real_connect(db, mysql_host.c_str(),
571 mysql_user.c_str(), mysql_user.empty() ? 0 : mysql_passwd.c_str(),
572 mysql_dbname.c_str(), mysql_port, 0, flags)) {
573 err = "failed to connect: " + std::string(mysql_error(db));
574 if (verbose >= 1) {
575 fprintf(stderr, "e=[%s]\n", err.c_str());
576 }
577 ++err_cnt;
578 return;
579 }
580 arg.sh.increment_conn(1);
581 }
582 int r = 0;
583 if (connected == 0 && use_handler) {
584 const char *const q = "handler hstest_table1 open";
585 r = mysql_real_query(db, q, strlen(q));
586 if (r != 0) {
587 err = 1;
588 }
589 }
590 if (connected == 0 && ssps) {
591 stmt.reset(new auto_mysql_stmt(db));
592 const char *const q = "select v from hstest_table1 where k = ?";
593 r = mysql_stmt_prepare(*stmt, q, strlen(q));
594 if (r != 0) {
595 fprintf(stderr, "ssps err\n");
596 ++err_cnt;
597 return;
598 }
599 }
600 connected = 1;
601 std::string result_str;
602 unsigned int err = 0;
603 unsigned int num_flds = 0, num_affected_rows = 0;
604 int got_data = 0;
605 char buf_query[16384];
606 int buf_query_len = 0;
607 int k = 0, v = 0;
608 {
609 double kf = 0, vf = 0;
610 drand48_r(&randbuf, &kf);
611 drand48_r(&randbuf, &vf);
612 k = int(kf * tablesize);
613 v = int(vf * tablesize);
614 #if 0
615 k = rand_r(&seed);
616 v = rand_r(&seed);
617 if (tablesize != 0) {
618 k %= tablesize;
619 }
620 #endif
621 if (op == 'G') {
622 if (use_handler) {
623 buf_query_len = snprintf(buf_query, sizeof(buf_query),
624 "handler hstest_table1 read `primary` = ( '%d' )", k);
625 // TODO: moreflds
626 } else if (ssps) {
627 //
628 } else if (use_in) {
629 wbuf.clear();
630 char *p = wbuf.make_space(1024);
631 int len = snprintf(p, 1024, "select %s from hstest_table1 where k in ('%d'", flds.c_str(), k);
632 wbuf.space_wrote(len);
633 for (int j = 1; j < use_in; ++j) {
634 /* generate more key */
635 drand48_r(&randbuf, &kf);
636 k = int(kf * tablesize);
637 p = wbuf.make_space(1024);
638 int len = snprintf(p, 1024, ", '%d'", k);
639 wbuf.space_wrote(len);
640 }
641 wbuf.append_literal(")");
642 } else {
643 buf_query_len = snprintf(buf_query, sizeof(buf_query),
644 "select %s from hstest_table1 where k = '%d'", flds.c_str(), k);
645 }
646 } else if (op == 'U') {
647 buf_query_len = snprintf(buf_query, sizeof(buf_query),
648 "update hstest_table1 set v = '%d_%d%s' where k = '%d'",
649 v, k, suffix.c_str(), k);
650 } else if (op == 'R') {
651 buf_query_len = snprintf(buf_query, sizeof(buf_query),
652 "replace into hstest_table1 values ('%d', 'v%d')", k, v);
653 // TODO: moreflds
654 }
655 }
656 if (r == 0) {
657 if (ssps) {
658 MYSQL_BIND bind[1] = { };
659 bind[0].buffer_type = MYSQL_TYPE_LONG;
660 bind[0].buffer = (char *)&k;
661 bind[0].is_null = 0;
662 bind[0].length = 0;
663 if (mysql_stmt_bind_param(*stmt, bind)) {
664 fprintf(stderr, "err: %s\n", mysql_stmt_error(*stmt));
665 ++err_cnt;
666 return;
667 }
668 r = mysql_stmt_execute(*stmt);
669 // fprintf(stderr, "stmt exec\n");
670 } else if (use_in) {
671 r = mysql_real_query(db, wbuf.begin(), wbuf.size());
672 } else {
673 r = mysql_real_query(db, buf_query, buf_query_len);
674 // fprintf(stderr, "real query\n");
675 }
676 ++query_cnt;
677 }
678 if (r != 0) {
679 err = 1;
680 } else if (ssps) {
681 if (verbose >= 0) {
682 char resbuf[1024];
683 unsigned long res_len = 0;
684 MYSQL_BIND bind[1] = { };
685 bind[0].buffer_type = MYSQL_TYPE_STRING;
686 bind[0].buffer = resbuf;
687 bind[0].buffer_length = sizeof(resbuf);
688 bind[0].length = &res_len;
689 if (mysql_stmt_bind_result(*stmt, bind)) {
690 fprintf(stderr, "err: %s\n", mysql_stmt_error(*stmt));
691 ++err_cnt;
692 return;
693 }
694 if (mysql_stmt_fetch(*stmt)) {
695 fprintf(stderr, "err: %s\n", mysql_stmt_error(*stmt));
696 ++err_cnt;
697 return;
698 }
699 if (!result_str.empty()) {
700 result_str += " ";
701 }
702 result_str += std::string(resbuf, res_len);
703 // fprintf(stderr, "SSPS RES: %s\n", result_str.c_str());
704 got_data = 1;
705 } else {
706 got_data = 1;
707 }
708 } else {
709 auto_mysql_res res(db);
710 if (res != 0) {
711 if (verbose >= 0) {
712 num_flds = mysql_num_fields(res);
713 MYSQL_ROW row = 0;
714 while ((row = mysql_fetch_row(res)) != 0) {
715 got_data += 1;
716 unsigned long *const lengths = mysql_fetch_lengths(res);
717 if (verbose >= 2) {
718 for (unsigned int i = 0; i < num_flds; ++i) {
719 if (!result_str.empty()) {
720 result_str += " ";
721 }
722 result_str += std::string(row[i], lengths[i]);
723 }
724 }
725 }
726 } else {
727 MYSQL_ROW row = 0;
728 while ((row = mysql_fetch_row(res)) != 0) {
729 got_data += 1;
730 }
731 }
732 } else {
733 if (mysql_field_count(db) == 0) {
734 num_affected_rows = mysql_affected_rows(db);
735 } else {
736 err = 1;
737 }
738 }
739 }
740 if (verbose >= 2 || (verbose >= 1 && err != 0)) {
741 if (err) {
742 ++err_cnt;
743 const char *const errstr = mysql_error(db);
744 fprintf(stderr, "e=[%s] a=%u q=[%s]\n", errstr,
745 num_affected_rows, buf_query);
746 } else {
747 fprintf(stderr, "a=%u q=[%s] r=[%s]\n", num_affected_rows, buf_query,
748 result_str.c_str());
749 }
750 }
751 if (err == 0) {
752 ++io_success_count;
753 if (num_affected_rows > 0 || got_data > 0) {
754 op_success_count += got_data;
755 } else {
756 if (verbose >= 1) {
757 fprintf(stderr, "k=%d numaff=%u gotdata=%d\n",
758 k, num_affected_rows, got_data);
759 }
760 }
761 arg.sh.increment_count();
762 }
763 if (!keep_connection) {
764 if (stmt.get() != 0) {
765 stmt.reset();
766 }
767 db.reset();
768 connected = 0;
769 }
770 const double tm2 = gettimeofday_double();
771 set_timing(tm2 - tm1);
772 sleep_if();
773 if (sched_flag) {
774 sched_yield();
775 }
776 }
777 if (verbose >= 1) {
778 fprintf(stderr, "thread finished (error_count=%llu)\n", err_cnt);
779 }
780 }
781
782 void
test_10(int test_num)783 hstest_thread::test_10(int test_num)
784 {
785 const int keep_connection = arg.sh.conf.get_int("keep_connection", 1);
786 unsigned int seed = time(0) + arg.id + 1;
787 seed ^= arg.sh.conf.get_int("seed_xor", 0);
788 drand48_data randbuf;
789 srand48_r(seed, &randbuf);
790 std::string err;
791 int keepconn_count = 0;
792 const char op = arg.sh.op;
793 const int verbose = arg.sh.conf.get_int("verbose", 1);
794 const std::string suffix = arg.sh.conf.get_str("value_suffix", "upd");
795 const int tablesize = arg.sh.conf.get_int("tablesize", 10000);
796 const int firstkey = arg.sh.conf.get_int("firstkey", 0);
797 const int sched_flag = arg.sh.conf.get_int("sched", 0);
798 const int moreflds = arg.sh.conf.get_int("moreflds", 0);
799 const std::string dbname = arg.sh.conf.get_str("dbname", "hstest");
800 const std::string table = arg.sh.conf.get_str("table", "hstest_table1");
801 const std::string index = arg.sh.conf.get_str("index", "PRIMARY");
802 const std::string field = arg.sh.conf.get_str("field", "v");
803 const int use_in = arg.sh.conf.get_int("in", 0);
804 const std::string moreflds_prefix = arg.sh.conf.get_str(
805 "moreflds_prefix", "column0123456789_");
806 const int dump = arg.sh.dump;
807 const int nodup = arg.sh.conf.get_int("nodup", 0);
808 std::string moreflds_str;
809 for (int i = 0; i < moreflds; ++i) {
810 char sbuf[1024];
811 snprintf(sbuf, sizeof(sbuf), ",%s%d", moreflds_prefix.c_str(), i);
812 moreflds_str += std::string(sbuf);
813 }
814 string_buffer wbuf;
815 char rbuf[16384];
816 for (size_t i = 0; i < arg.sh.loop; ++i) {
817 int len = 0, rlen = 0, wlen = 0;
818 #if 0
819 const double tm1 = gettimeofday_double();
820 #endif
821 if (fd.get() < 0) {
822 if (socket_connect(fd, arg.sh.arg, err) != 0) {
823 fprintf(stderr, "connect: %d %s\n", errno, strerror(errno));
824 return;
825 }
826 char *wp = wbuf.make_space(1024);
827 len = snprintf(wp, 1024,
828 "P\t1\t%s\t%s\tPRIMARY\t%s%s\n", dbname.c_str(), table.c_str(),
829 field.c_str(), moreflds_str.c_str());
830 /* pst_num, db, table, index, retflds */
831 wbuf.space_wrote(len);
832 wlen = write(fd.get(), wbuf.begin(), len);
833 if (len != wlen) {
834 fprintf(stderr, "write: %d %d\n", len, wlen);
835 return;
836 }
837 wbuf.clear();
838 rlen = read(fd.get(), rbuf, sizeof(rbuf));
839 if (rlen <= 0 || rbuf[rlen - 1] != '\n') {
840 fprintf(stderr, "read: rlen=%d errno=%d\n", rlen, errno);
841 return;
842 }
843 if (rbuf[0] != '0') {
844 fprintf(stderr, "failed to open table\n");
845 return;
846 }
847 arg.sh.increment_conn(1);
848 }
849 const double tm1 = gettimeofday_double();
850 for (size_t j = 0; j < arg.sh.pipe; ++j) {
851 int k = 0, v = 0;
852 {
853 while (true) {
854 double kf = 0, vf = 0;
855 drand48_r(&randbuf, &kf);
856 drand48_r(&randbuf, &vf);
857 k = int(kf * tablesize) + firstkey;
858 v = int(vf * tablesize) + firstkey;
859 if (k - firstkey < arg.sh.keygen_size) {
860 volatile char *const ptr = arg.sh.keygen + (k - firstkey);
861 // int oldv = __sync_fetch_and_or(ptr, 1);
862 int oldv = *ptr;
863 *ptr += 1;
864 if (nodup && oldv != 0) {
865 if (dump) {
866 fprintf(stderr, "retry\n");
867 }
868 continue;
869 }
870 } else {
871 if (nodup) {
872 if (dump) {
873 fprintf(stderr, "retry2\n");
874 }
875 continue;
876 }
877 }
878 size_t len = 0;
879 if (op == 'G') {
880 if (use_in) {
881 char *wp = wbuf.make_space(1024);
882 len = snprintf(wp, 1024, "1\t=\t1\t\t%d\t0\t@\t0\t%d\t%d",
883 use_in, use_in, k);
884 wbuf.space_wrote(len);
885 for (int j = 1; j < use_in; ++j) {
886 drand48_r(&randbuf, &kf);
887 k = int(kf * tablesize) + firstkey;
888 char *wp = wbuf.make_space(1024);
889 len = snprintf(wp, 1024, "\t%d", k);
890 wbuf.space_wrote(len);
891 }
892 wbuf.append_literal("\n");
893 } else {
894 char *wp = wbuf.make_space(1024);
895 len = snprintf(wp, 1024, "1\t=\t1\t%d\n", k);
896 wbuf.space_wrote(len);
897 }
898 } else if (op == 'U') {
899 char *wp = wbuf.make_space(1024);
900 len = snprintf(wp, 1024,
901 "1\t=\t1\t%d\t1\t0\tU\t%d_%d%s\n", k, v, k, suffix.c_str());
902 wbuf.space_wrote(len);
903 }
904 break;
905 }
906 }
907 }
908 wlen = write(fd.get(), wbuf.begin(), wbuf.size());
909 if (wlen != wbuf.size()) {
910 fprintf(stderr, "write: %d %d\n", (int)wbuf.size(), wlen);
911 return;
912 }
913 wbuf.clear();
914 size_t read_cnt = 0;
915 size_t read_pos = 0;
916 while (read_cnt < arg.sh.pipe) {
917 rlen = read(fd.get(), rbuf + read_pos, sizeof(rbuf) - read_pos);
918 if (rlen <= 0) {
919 fprintf(stderr, "read: %d\n", rlen);
920 return;
921 }
922 read_pos += rlen;
923 while (true) {
924 const char *const nl = static_cast<const char *>(memchr(rbuf, '\n',
925 read_pos));
926 if (nl == 0) {
927 break;
928 }
929 ++read_cnt;
930 ++io_success_count;
931 const char *t1 = static_cast<const char *>(memchr(rbuf, '\t',
932 nl - rbuf));
933 if (t1 == 0) {
934 fprintf(stderr, "error \n");
935 break;
936 }
937 ++t1;
938 const char *t2 = static_cast<const char *>(memchr(t1, '\t',
939 nl - t1));
940 if (t2 == 0) {
941 if (verbose > 1) {
942 fprintf(stderr, "key: notfound \n");
943 }
944 break;
945 }
946 ++t2;
947 if (t1 == rbuf + 2 && rbuf[0] == '0') {
948 if (op == 'G') {
949 ++op_success_count;
950 arg.sh.increment_count();
951 } else if (op == 'U') {
952 const char *t3 = t2;
953 while (t3 != nl && t3[0] >= 0x10) {
954 ++t3;
955 }
956 if (t3 != t2 + 1 || t2[0] != '1') {
957 const std::string mess(t2, t3);
958 fprintf(stderr, "mod: %s\n", mess.c_str());
959 } else {
960 ++op_success_count;
961 arg.sh.increment_count();
962 if (arg.sh.dump && arg.sh.pipe == 1) {
963 fwrite(wbuf.begin(), wbuf.size(), 1, stderr);
964 }
965 }
966 }
967 } else {
968 const char *t3 = t2;
969 while (t3 != nl && t3[0] >= 0x10) {
970 ++t3;
971 }
972 const std::string mess(t2, t3);
973 fprintf(stderr, "err: %s\n", mess.c_str());
974 }
975 const size_t rest_size = rbuf + read_pos - (nl + 1);
976 if (rest_size != 0) {
977 memmove(rbuf, nl + 1, rest_size);
978 }
979 read_pos = rest_size;
980 }
981 }
982 if (!keep_connection) {
983 fd.reset();
984 arg.sh.increment_conn(-1);
985 } else if (keep_connection > 1 && ++keepconn_count > keep_connection) {
986 keepconn_count = 0;
987 fd.reset();
988 arg.sh.increment_conn(-1);
989 }
990 const double tm2 = gettimeofday_double();
991 set_timing(tm2 - tm1);
992 sleep_if();
993 if (sched_flag) {
994 sched_yield();
995 }
996 }
997 if (dump) {
998 fprintf(stderr, "done\n");
999 }
1000 }
1001
1002 void
sleep_if()1003 hstest_thread::sleep_if()
1004 {
1005 if (arg.sh.usleep) {
1006 struct timespec ts = {
1007 arg.sh.usleep / 1000000,
1008 (arg.sh.usleep % 1000000) * 1000
1009 };
1010 nanosleep(&ts, 0);
1011 }
1012 }
1013
1014 void
set_timing(double time_spent)1015 hstest_thread::set_timing(double time_spent)
1016 {
1017 response_min = std::min(response_min, time_spent);
1018 response_max = std::max(response_max, time_spent);
1019 response_sum += time_spent;
1020 if (op_success_count != 0) {
1021 response_avg = response_sum / op_success_count;
1022 }
1023 }
1024
1025 void
test_11(int test_num)1026 hstest_thread::test_11(int test_num)
1027 {
1028 const int keep_connection = arg.sh.conf.get_int("keep_connection", 1);
1029 const int tablesize = arg.sh.conf.get_int("tablesize", 0);
1030 unsigned int seed = arg.id;
1031 seed ^= arg.sh.conf.get_int("seed_xor", 0);
1032 std::string err;
1033 hstcpcli_ptr cli;
1034 for (size_t i = 0; i < arg.sh.loop; ++i) {
1035 if (cli.get() == 0) {
1036 cli = hstcpcli_i::create(arg.sh.arg);
1037 cli->request_buf_open_index(0, "hstest", "hstest_table1", "", "v");
1038 /* pst_num, db, table, index, retflds */
1039 if (cli->request_send() != 0) {
1040 fprintf(stderr, "reuqest_send: %s\n", cli->get_error().c_str());
1041 return;
1042 }
1043 size_t num_flds = 0;
1044 if (cli->response_recv(num_flds) != 0) {
1045 fprintf(stderr, "reuqest_recv: %s\n", cli->get_error().c_str());
1046 return;
1047 }
1048 cli->response_buf_remove();
1049 }
1050 for (size_t j = 0; j < arg.sh.pipe; ++j) {
1051 char buf[256];
1052 int k = 0, v = 0, len = 0;
1053 {
1054 k = rand_r(&seed);
1055 v = rand_r(&seed); /* unused */
1056 if (tablesize != 0) {
1057 k &= tablesize;
1058 }
1059 len = snprintf(buf, sizeof(buf), "%d", k);
1060 }
1061 const string_ref key(buf, len);
1062 const string_ref op("=", 1);
1063 cli->request_buf_exec_generic(0, op, &key, 1, 1, 0, string_ref(), 0, 0);
1064 }
1065 if (cli->request_send() != 0) {
1066 fprintf(stderr, "reuqest_send: %s\n", cli->get_error().c_str());
1067 return;
1068 }
1069 size_t read_cnt = 0;
1070 for (size_t j = 0; j < arg.sh.pipe; ++j) {
1071 size_t num_flds = 0;
1072 if (cli->response_recv(num_flds) != 0) {
1073 fprintf(stderr, "reuqest_recv: %s\n", cli->get_error().c_str());
1074 return;
1075 }
1076 {
1077 ++read_cnt;
1078 ++io_success_count;
1079 arg.sh.increment_count();
1080 {
1081 ++op_success_count;
1082 }
1083 }
1084 cli->response_buf_remove();
1085 }
1086 if (!keep_connection) {
1087 cli.reset();
1088 }
1089 }
1090 }
1091
1092 void
test_watch()1093 hstest_thread::test_watch()
1094 {
1095 const int timelimit = arg.sh.conf.get_int("timelimit", 0);
1096 const int timelimit_offset = timelimit / 2;
1097 int loop = 0;
1098 double t1 = 0, t2 = 0;
1099 size_t cnt_t1 = 0, cnt_t2 = 0;
1100 size_t prev_cnt = 0;
1101 double now_f = 0;
1102 while (true) {
1103 sleep(1);
1104 const size_t cnt = arg.sh.count;
1105 const size_t df = cnt - prev_cnt;
1106 prev_cnt = cnt;
1107 const double now_prev = now_f;
1108 now_f = gettimeofday_double();
1109 if (now_prev != 0) {
1110 const double rps = static_cast<double>(df) / (now_f - now_prev);
1111 fprintf(stderr, "now: %zu cntdiff: %zu tdiff: %f rps: %f\n",
1112 static_cast<size_t>(now_f), df, now_f - now_prev, rps);
1113 }
1114 if (timelimit != 0) {
1115 if (arg.sh.wait_conn == 0 || arg.sh.conn_count >= arg.sh.wait_conn) {
1116 ++loop;
1117 }
1118 if (loop == timelimit_offset) {
1119 t1 = gettimeofday_double();
1120 cnt_t1 = cnt;
1121 arg.sh.enable_timing = 1;
1122 fprintf(stderr, "start timing\n");
1123 } else if (loop == timelimit_offset + timelimit) {
1124 t2 = gettimeofday_double();
1125 cnt_t2 = cnt;
1126 const size_t cnt_diff = cnt_t2 - cnt_t1;
1127 const double tdiff = t2 - t1;
1128 const double qps = cnt_diff / (tdiff != 0 ? tdiff : 1);
1129 fprintf(stderr, "(%f: %zu, %f: %zu), %10.5f qps\n",
1130 t1, cnt_t1, t2, cnt_t2, qps);
1131 size_t keycnt = 0;
1132 for (int i = 0; i < arg.sh.keygen_size; ++i) {
1133 if (arg.sh.keygen[i]) {
1134 ++keycnt;
1135 }
1136 }
1137 fprintf(stderr, "keygen=%zu\n", keycnt);
1138 break;
1139 }
1140 }
1141 }
1142 #if 0
1143 int loop = 0;
1144 double t1 = 0, t2 = 0;
1145 size_t cnt_t1 = 0, cnt_t2 = 0;
1146 size_t prev_cnt = 0;
1147 while (true) {
1148 sleep(1);
1149 const size_t cnt = arg.sh.count;
1150 const size_t df = cnt - prev_cnt;
1151 prev_cnt = cnt;
1152 const size_t now = time(0);
1153 fprintf(stderr, "%zu %zu\n", now, df);
1154 if (timelimit != 0) {
1155 ++loop;
1156 if (loop == timelimit_offset) {
1157 t1 = gettimeofday_double();
1158 cnt_t1 = cnt;
1159 } else if (loop == timelimit_offset + timelimit) {
1160 t2 = gettimeofday_double();
1161 cnt_t2 = cnt;
1162 const size_t cnt_diff = cnt_t2 - cnt_t1;
1163 const double tdiff = t2 - t1;
1164 const double qps = cnt_diff / (tdiff != 0 ? tdiff : 1);
1165 fprintf(stderr, "(%f: %zu, %f: %zu), %10.5f qps\n",
1166 t1, cnt_t1, t2, cnt_t2, qps);
1167 size_t keycnt = 0;
1168 for (int i = 0; i < arg.sh.keygen_size; ++i) {
1169 if (arg.sh.keygen[i]) {
1170 ++keycnt;
1171 }
1172 }
1173 fprintf(stderr, "keygen=%zu\n", keycnt);
1174 _exit(0);
1175 }
1176 }
1177 }
1178 #endif
1179 }
1180
1181 void
test_12(int test_num)1182 hstest_thread::test_12(int test_num)
1183 {
1184 /* NOTE: num_threads should be 1 */
1185 /* create table hstest
1186 * ( k varchar(255) not null, v varchar(255) not null, primary key(k))
1187 * engine = innodb; */
1188 mysqltest_thread_initobj initobj;
1189 auto_mysql db;
1190 std::string err;
1191 unsigned long long err_cnt = 0;
1192 unsigned long long query_cnt = 0;
1193 #if 0
1194 my_bool reconnect = 0;
1195 if (mysql_options(db, MYSQL_OPT_RECONNECT, &reconnect) != 0) {
1196 err = "mysql_options() failed";
1197 ++err_cnt;
1198 return;
1199 }
1200 #endif
1201 const std::string mysql_host = arg.sh.conf.get_str("host", "localhost");
1202 const int mysql_port = arg.sh.conf.get_int("mysqlport", 3306);
1203 const unsigned int num = arg.sh.loop;
1204 const size_t pipe = arg.sh.pipe;
1205 const std::string mysql_user = arg.sh.conf.get_str("mysqluser", "root");
1206 const std::string mysql_passwd = arg.sh.conf.get_str("mysqlpass", "");
1207 const std::string mysql_dbname = arg.sh.conf.get_str("db", "hstest");
1208 const int keep_connection = arg.sh.conf.get_int("keep_connection", 1);
1209 const int verbose = arg.sh.conf.get_int("verbose", 1);
1210 const int use_handler = arg.sh.conf.get_int("handler", 0);
1211 int connected = 0;
1212 unsigned int k = 0;
1213 string_buffer buf;
1214 for (unsigned int i = 0; i < num; ++i) {
1215 const int flags = 0;
1216 if (connected == 0 && !mysql_real_connect(db, mysql_host.c_str(),
1217 mysql_user.c_str(), mysql_user.empty() ? 0 : mysql_passwd.c_str(),
1218 mysql_dbname.c_str(), mysql_port, 0, flags)) {
1219 err = "failed to connect: " + std::string(mysql_error(db));
1220 if (verbose >= 1) {
1221 fprintf(stderr, "e=[%s]\n", err.c_str());
1222 }
1223 ++err_cnt;
1224 return;
1225 }
1226 int r = 0;
1227 if (connected == 0 && use_handler) {
1228 const char *const q = "handler hstest open";
1229 r = mysql_real_query(db, q, strlen(q));
1230 if (r != 0) {
1231 err = 1;
1232 }
1233 }
1234 connected = 1;
1235 std::string result_str;
1236 unsigned int err = 0;
1237 unsigned int num_flds = 0, num_affected_rows = 0;
1238 int got_data = 0;
1239 buf.clear();
1240 buf.append_literal("insert into hstest values ");
1241 for (size_t j = 0; j < pipe; ++j) {
1242 const unsigned int v = ~k;
1243 if (j != 0) {
1244 buf.append_literal(",");
1245 }
1246 char *wp = buf.make_space(64);
1247 int buf_query_len = snprintf(wp, 64, "('k%u', 'v%u')", k, v);
1248 buf.space_wrote(buf_query_len);
1249 ++k;
1250 }
1251 if (r == 0) {
1252 r = mysql_real_query(db, buf.begin(), buf.size());
1253 ++query_cnt;
1254 }
1255 if (r != 0) {
1256 err = 1;
1257 } else {
1258 auto_mysql_res res(db);
1259 if (res != 0) {
1260 if (verbose >= 0) {
1261 num_flds = mysql_num_fields(res);
1262 MYSQL_ROW row = 0;
1263 while ((row = mysql_fetch_row(res)) != 0) {
1264 got_data = 1;
1265 unsigned long *const lengths = mysql_fetch_lengths(res);
1266 if (verbose >= 2) {
1267 for (unsigned int i = 0; i < num_flds; ++i) {
1268 if (!result_str.empty()) {
1269 result_str += " ";
1270 }
1271 result_str += std::string(row[i], lengths[i]);
1272 }
1273 }
1274 }
1275 }
1276 } else {
1277 if (mysql_field_count(db) == 0) {
1278 num_affected_rows = mysql_affected_rows(db);
1279 } else {
1280 err = 1;
1281 }
1282 }
1283 }
1284 if (verbose >= 2 || (verbose >= 1 && err != 0)) {
1285 if (err) {
1286 ++err_cnt;
1287 const char *const errstr = mysql_error(db);
1288 fprintf(stderr, "e=[%s] a=%u q=[%s]\n", errstr,
1289 num_affected_rows, std::string(buf.begin(), buf.size()).c_str());
1290 } else {
1291 fprintf(stderr, "a=%u q=[%s] r=[%s]\n", num_affected_rows,
1292 std::string(buf.begin(), buf.size()).c_str(),
1293 result_str.c_str());
1294 }
1295 }
1296 if (err == 0) {
1297 ++io_success_count;
1298 if (num_affected_rows > 0 || got_data > 0) {
1299 ++op_success_count;
1300 }
1301 arg.sh.increment_count(pipe);
1302 }
1303 if (!keep_connection) {
1304 db.reset();
1305 connected = 0;
1306 }
1307 }
1308 if (verbose >= 1) {
1309 fprintf(stderr, "thread finished (error_count=%llu)\n", err_cnt);
1310 }
1311 }
1312
1313 void
test_21(int num)1314 hstest_thread::test_21(int num)
1315 {
1316 /* fsync test */
1317 unsigned int id = arg.id;
1318 std::string err;
1319 #if 0
1320 if (socket_connect(fd, arg.sh.arg, err) != 0) {
1321 fprintf(stderr, "connect: %d %s\n", errno, strerror(errno));
1322 return;
1323 }
1324 #endif
1325 auto_file logfd;
1326 char fname[1024];
1327 snprintf(fname, sizeof(fname), "synctest_%u", id);
1328 int open_flags = O_WRONLY | O_CREAT | O_TRUNC | O_APPEND;
1329 logfd.reset(open(fname, open_flags, 0644));
1330 if (logfd.get() < 0) {
1331 fprintf(stderr, "open: %s: %d %s\n", fname, errno, strerror(errno));
1332 return;
1333 }
1334 char buf[1024];
1335 unsigned long long count = 0;
1336 while (true) {
1337 snprintf(buf, sizeof(buf), "%u %llu\n", id, count);
1338 const size_t len = strlen(buf);
1339 if (write(logfd.get(), buf, len) != (ssize_t)len) {
1340 fprintf(stderr, "write: %s: %d %s\n", fname, errno, strerror(errno));
1341 return;
1342 }
1343 #if 0
1344 if (write(fd.get(), buf, len) != (ssize_t)len) {
1345 fprintf(stderr, "write(sock): %d %s\n", errno, strerror(errno));
1346 return;
1347 }
1348 #endif
1349 if (fdatasync(logfd.get()) != 0) {
1350 fprintf(stderr, "fsync: %s: %d %s\n", fname, errno, strerror(errno));
1351 return;
1352 }
1353 ++count;
1354 ++op_success_count;
1355 arg.sh.increment_count();
1356 }
1357 }
1358
1359 void
test_22(int num)1360 hstest_thread::test_22(int num)
1361 {
1362 /* dd if=/dev/zero of=dummy.dat bs=1024M count=100 */
1363 unsigned int id = arg.id;
1364 std::string err;
1365 auto_file filefd;
1366 char fname[1024];
1367 snprintf(fname, sizeof(fname), "dummy.dat");
1368 int open_flags = O_RDONLY | O_DIRECT;
1369 filefd.reset(open(fname, open_flags, 0644));
1370 if (filefd.get() < 0) {
1371 fprintf(stderr, "open: %s: %d %s\n", fname, errno, strerror(errno));
1372 return;
1373 }
1374 char buf_x[4096 * 2];
1375 char *const buf = (char *)(size_t(buf_x + 4096) / 4096 * 4096);
1376 unsigned long long count = 0;
1377 drand48_data randbuf;
1378 unsigned long long seed = time(0);
1379 seed *= 10;
1380 seed += id;
1381 srand48_r(seed, &randbuf);
1382 for (unsigned int i = 0; i < arg.sh.loop; ++i) {
1383 double kf = 0;
1384 drand48_r(&randbuf, &kf);
1385 kf *= (209715200 / 1);
1386 // fprintf(stderr, "v=%f\n", kf);
1387 off_t v = static_cast<off_t>(kf);
1388 v %= (209715200 / 1);
1389 v *= (512 * 1);
1390 const double tm1 = gettimeofday_double();
1391 const ssize_t r = pread(filefd.get(), buf, (512 * 1), v);
1392 const double tm2 = gettimeofday_double();
1393 if (r < 0) {
1394 fprintf(stderr, "pread: %s: %d %s\n", fname, errno, strerror(errno));
1395 return;
1396 }
1397 ++count;
1398 ++op_success_count;
1399 arg.sh.increment_count();
1400 set_timing(tm2 - tm1);
1401 }
1402 }
1403
1404 void
operator ()()1405 hstest_thread::operator ()()
1406 {
1407 if (arg.watch_flag) {
1408 return test_watch();
1409 }
1410 int test_num = arg.sh.conf.get_int("test", 1);
1411 if (test_num == 1) {
1412 test_1();
1413 } else if (test_num == 2 || test_num == 3) {
1414 test_2_3(test_num);
1415 } else if (test_num == 4 || test_num == 5) {
1416 test_4_5(test_num);
1417 } else if (test_num == 6) {
1418 test_6(test_num);
1419 } else if (test_num == 7) {
1420 test_7(test_num);
1421 } else if (test_num == 8) {
1422 test_8(test_num);
1423 } else if (test_num == 9) {
1424 test_9(test_num);
1425 } else if (test_num == 10) {
1426 test_10(test_num);
1427 } else if (test_num == 11) {
1428 test_11(test_num);
1429 } else if (test_num == 12) {
1430 test_12(test_num);
1431 } else if (test_num == 21) {
1432 test_21(test_num);
1433 } else if (test_num == 22) {
1434 test_22(test_num);
1435 }
1436 const int halt = arg.sh.conf.get_int("halt", 0);
1437 if (halt) {
1438 fprintf(stderr, "thread halted\n");
1439 while (true) {
1440 sleep(100000);
1441 }
1442 }
1443 fprintf(stderr, "thread finished\n");
1444 }
1445
1446 int
hstest_main(int argc,char ** argv)1447 hstest_main(int argc, char **argv)
1448 {
1449 ignore_sigpipe();
1450 hstest_shared shared;
1451 parse_args(argc, argv, shared.conf);
1452 shared.conf["port"] = shared.conf["hsport"];
1453 shared.arg.set(shared.conf);
1454 shared.loop = shared.conf.get_int("num", 1000);
1455 shared.pipe = shared.conf.get_int("pipe", 1);
1456 shared.verbose = shared.conf.get_int("verbose", 1);
1457 const int tablesize = shared.conf.get_int("tablesize", 0);
1458 std::vector<char> keygen(tablesize);
1459 shared.keygen = &keygen[0];
1460 shared.keygen_size = tablesize;
1461 shared.usleep = shared.conf.get_int("usleep", 0);
1462 shared.dump = shared.conf.get_int("dump", 0);
1463 shared.num_threads = shared.conf.get_int("num_threads", 10);
1464 shared.wait_conn = shared.conf.get_int("wait_conn", 0);
1465 const std::string op = shared.conf.get_str("op", "G");
1466 if (op.size() > 0) {
1467 shared.op = op[0];
1468 }
1469 #if 0
1470 const int localdb_flag = shared.conf.get_int("local", 0);
1471 if (localdb_flag) {
1472 shared.localdb = database_i::create(shared.conf);
1473 }
1474 #endif
1475 const int num_thrs = shared.num_threads;
1476 typedef thread<hstest_thread> thread_type;
1477 typedef std::auto_ptr<thread_type> thread_ptr;
1478 typedef auto_ptrcontainer< std::vector<thread_type *> > thrs_type;
1479 thrs_type thrs;
1480 for (int i = 0; i < num_thrs; ++i) {
1481 const hstest_thread::arg_type arg(i, shared, false);
1482 thread_ptr thr(new thread<hstest_thread>(arg));
1483 thrs.push_back_ptr(thr);
1484 }
1485 for (size_t i = 0; i < thrs.size(); ++i) {
1486 thrs[i]->start();
1487 }
1488 thread_ptr watch_thread;
1489 const int timelimit = shared.conf.get_int("timelimit", 0);
1490 {
1491 const hstest_thread::arg_type arg(0, shared, true);
1492 watch_thread = thread_ptr(new thread<hstest_thread>(arg));
1493 watch_thread->start();
1494 }
1495 size_t iocnt = 0, opcnt = 0;
1496 double respmin = 999999, respmax = 0;
1497 double respsum = 0;
1498 if (timelimit != 0) {
1499 watch_thread->join();
1500 }
1501 for (size_t i = 0; i < thrs.size(); ++i) {
1502 if (timelimit == 0) {
1503 thrs[i]->join();
1504 }
1505 iocnt += (*thrs[i])->io_success_count;
1506 opcnt += (*thrs[i])->op_success_count;
1507 respmin = std::min(respmin, (*thrs[i])->response_min);
1508 respmax = std::max(respmax, (*thrs[i])->response_max);
1509 respsum += (*thrs[i])->response_sum;
1510 }
1511 fprintf(stderr, "io_success_count=%zu op_success_count=%zu\n", iocnt, opcnt);
1512 fprintf(stderr, "respmin=%f respmax=%f respsum=%f respavg=%f\n",
1513 respmin, respmax, respsum, respsum / opcnt);
1514 size_t keycnt = 0;
1515 for (size_t i = 0; i < keygen.size(); ++i) {
1516 if (keygen[i]) {
1517 ++keycnt;
1518 }
1519 }
1520 fprintf(stderr, "keycnt=%zu\n", keycnt);
1521 _exit(0);
1522 return 0;
1523 }
1524
1525 };
1526
1527 int
main(int argc,char ** argv)1528 main(int argc, char **argv)
1529 {
1530 return dena::hstest_main(argc, argv);
1531 }
1532
1533