1
2 // vim:sw=2:ai
3
4 #include <signal.h>
5 #include <sys/time.h>
6 #include <stdio.h>
7 #include <string.h>
8 #include <vector>
9 #include <map>
10 #include <stdlib.h>
11 #include <memory>
12 #include <errno.h>
13 #include <mysql.h>
14 #include <time.h>
15 #include <sys/types.h>
16 #include <sys/stat.h>
17 #include <fcntl.h>
18
19 #include "util.hpp"
20 #include "auto_ptrcontainer.hpp"
21 #include "socket.hpp"
22 #include "hstcpcli.hpp"
23 #include "string_util.hpp"
24 #include "mutex.hpp"
25
26 namespace dena {
27
28 struct auto_mysql : private noncopyable {
auto_mysqldena::auto_mysql29 auto_mysql() : db(0) {
30 reset();
31 }
~auto_mysqldena::auto_mysql32 ~auto_mysql() {
33 if (db) {
34 mysql_close(db);
35 }
36 }
resetdena::auto_mysql37 void reset() {
38 if (db) {
39 mysql_close(db);
40 }
41 if ((db = mysql_init(0)) == 0) {
42 fatal_exit("failed to initialize mysql client");
43 }
44 }
operator MYSQL*dena::auto_mysql45 operator MYSQL *() const { return db; }
46 private:
47 MYSQL *db;
48 };
49
50 struct auto_mysql_res : private noncopyable {
auto_mysql_resdena::auto_mysql_res51 auto_mysql_res(MYSQL *db) {
52 res = mysql_store_result(db);
53 }
~auto_mysql_resdena::auto_mysql_res54 ~auto_mysql_res() {
55 if (res) {
56 mysql_free_result(res);
57 }
58 }
operator MYSQL_RES*dena::auto_mysql_res59 operator MYSQL_RES *() const { return res; }
60 private:
61 MYSQL_RES *res;
62 };
63
64 struct auto_mysql_stmt : private noncopyable {
auto_mysql_stmtdena::auto_mysql_stmt65 auto_mysql_stmt(MYSQL *db) {
66 stmt = mysql_stmt_init(db);
67 }
~auto_mysql_stmtdena::auto_mysql_stmt68 ~auto_mysql_stmt() {
69 if (stmt) {
70 mysql_stmt_close(stmt);
71 }
72 }
operator MYSQL_STMT*dena::auto_mysql_stmt73 operator MYSQL_STMT *() const { return stmt; }
74 private:
75 MYSQL_STMT *stmt;
76 };
77
78 double
gettimeofday_double()79 gettimeofday_double()
80 {
81 struct timeval tv = { };
82 if (gettimeofday(&tv, 0) != 0) {
83 fatal_abort("gettimeofday");
84 }
85 return static_cast<double>(tv.tv_usec) / 1000000 + tv.tv_sec;
86 }
87
88 struct record_value {
89 mutex lock;
90 bool deleted;
91 bool unknown_state;
92 std::string key;
93 std::vector<std::string> values;
record_valuedena::record_value94 record_value() : deleted(true), unknown_state(false) { }
95 };
96
97 struct hs_longrun_shared {
98 config conf;
99 socket_args arg;
100 int verbose;
101 long num_threads;
102 int usleep;
103 volatile mutable int running;
104 auto_ptrcontainer< std::vector<record_value *> > records;
hs_longrun_shareddena::hs_longrun_shared105 hs_longrun_shared() : verbose(0), num_threads(0), usleep(0), running(1) { }
106 };
107
108 struct thread_base {
thread_basedena::thread_base109 thread_base() : need_join(false), stack_size(256 * 1024) { }
~thread_basedena::thread_base110 virtual ~thread_base() {
111 join();
112 }
113 virtual void run() = 0;
startdena::thread_base114 void start() {
115 if (!start_nothrow()) {
116 fatal_abort("thread::start");
117 }
118 }
start_nothrowdena::thread_base119 bool start_nothrow() {
120 if (need_join) {
121 return need_join; /* true */
122 }
123 void *const arg = this;
124 pthread_attr_t attr;
125 if (pthread_attr_init(&attr) != 0) {
126 fatal_abort("pthread_attr_init");
127 }
128 if (pthread_attr_setstacksize(&attr, stack_size) != 0) {
129 fatal_abort("pthread_attr_setstacksize");
130 }
131 const int r = pthread_create(&thr, &attr, thread_main, arg);
132 if (pthread_attr_destroy(&attr) != 0) {
133 fatal_abort("pthread_attr_destroy");
134 }
135 if (r != 0) {
136 return need_join; /* false */
137 }
138 need_join = true;
139 return need_join; /* true */
140 }
joindena::thread_base141 void join() {
142 if (!need_join) {
143 return;
144 }
145 int e = 0;
146 if ((e = pthread_join(thr, 0)) != 0) {
147 fatal_abort("pthread_join");
148 }
149 need_join = false;
150 }
151 private:
thread_maindena::thread_base152 static void *thread_main(void *arg) {
153 thread_base *p = static_cast<thread_base *>(arg);
154 p->run();
155 return 0;
156 }
157 private:
158 pthread_t thr;
159 bool need_join;
160 size_t stack_size;
161 };
162
163 struct hs_longrun_stat {
164 unsigned long long verify_error_count;
165 unsigned long long runtime_error_count;
166 unsigned long long unknown_count;
167 unsigned long long success_count;
hs_longrun_statdena::hs_longrun_stat168 hs_longrun_stat()
169 : verify_error_count(0), runtime_error_count(0),
170 unknown_count(0), success_count(0) { }
adddena::hs_longrun_stat171 void add(const hs_longrun_stat& x) {
172 verify_error_count += x.verify_error_count;
173 runtime_error_count += x.runtime_error_count;
174 unknown_count += x.unknown_count;
175 success_count += x.success_count;
176 }
177 };
178
179 struct hs_longrun_thread_base : public thread_base {
180 struct arg_type {
181 int id;
182 std::string worker_type;
183 char op;
184 int lock_flag;
185 const hs_longrun_shared& sh;
arg_typedena::hs_longrun_thread_base::arg_type186 arg_type(int id, const std::string& worker_type, char op, int lock_flag,
187 const hs_longrun_shared& sh)
188 : id(id), worker_type(worker_type), op(op), lock_flag(lock_flag),
189 sh(sh) { }
190 };
191 arg_type arg;
192 hs_longrun_stat stat;
193 drand48_data randbuf;
194 unsigned int seed;
hs_longrun_thread_basedena::hs_longrun_thread_base195 hs_longrun_thread_base(const arg_type& arg)
196 : arg(arg), seed(0) {
197 seed = time(0) + arg.id + 1;
198 srand48_r(seed, &randbuf);
199 }
~hs_longrun_thread_basedena::hs_longrun_thread_base200 virtual ~hs_longrun_thread_base() { }
201 virtual void run() = 0;
rand_recorddena::hs_longrun_thread_base202 size_t rand_record() {
203 double v = 0;
204 drand48_r(&randbuf, &v);
205 const size_t sz = arg.sh.records.size();
206 size_t r = size_t(v * sz);
207 if (r >= sz) {
208 r = 0;
209 }
210 return r;
211 }
212 int verify_update(const std::string& k, const std::string& v1,
213 const std::string& v2, const std::string& v3, record_value& rec,
214 uint32_t num_rows, bool cur_unknown_state);
215 int verify_read(const std::string& k, uint32_t num_rows, uint32_t num_flds,
216 const std::string rrec[4], record_value& rec);
217 int verify_readnolock(const std::string& k, uint32_t num_rows,
218 uint32_t num_flds, const std::string rrec[4]);
219 };
220
221 int
verify_update(const std::string & k,const std::string & v1,const std::string & v2,const std::string & v3,record_value & rec,uint32_t num_rows,bool cur_unknown_state)222 hs_longrun_thread_base::verify_update(const std::string& k,
223 const std::string& v1, const std::string& v2, const std::string& v3,
224 record_value& rec, uint32_t num_rows, bool cur_unknown_state)
225 {
226 const bool op_success = num_rows == 1;
227 int ret = 0;
228 if (!rec.unknown_state) {
229 if (!rec.deleted && !op_success) {
230 ++stat.verify_error_count;
231 if (arg.sh.verbose > 0) {
232 fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s "
233 "unexpected_update_failure\n",
234 arg.worker_type.c_str(), arg.id, k.c_str());
235 }
236 ret = 1;
237 } else if (rec.deleted && op_success) {
238 ++stat.verify_error_count;
239 if (arg.sh.verbose > 0) {
240 fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s "
241 "unexpected_update_success\n",
242 arg.worker_type.c_str(), arg.id, k.c_str());
243 }
244 ret = 1;
245 }
246 }
247 if (op_success) {
248 rec.values.resize(4);
249 rec.values[0] = k;
250 rec.values[1] = v1;
251 rec.values[2] = v2;
252 rec.values[3] = v3;
253 if (ret == 0 && !rec.unknown_state) {
254 ++stat.success_count;
255 }
256 }
257 rec.unknown_state = cur_unknown_state;
258 if (arg.sh.verbose >= 100 && ret == 0) {
259 fprintf(stderr, "%s %s %s %s %s\n", arg.worker_type.c_str(),
260 k.c_str(), v1.c_str(), v2.c_str(), v3.c_str());
261 }
262 return ret;
263 }
264
265 int
verify_read(const std::string & k,uint32_t num_rows,uint32_t num_flds,const std::string rrec[4],record_value & rec)266 hs_longrun_thread_base::verify_read(const std::string& k,
267 uint32_t num_rows, uint32_t num_flds, const std::string rrec[4],
268 record_value& rec)
269 {
270 const bool op_success = num_rows != 0;
271 int ret = 0;
272 if (!rec.unknown_state) {
273 if (!rec.deleted && !op_success) {
274 ++stat.verify_error_count;
275 if (arg.sh.verbose > 0) {
276 fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s "
277 "unexpected_read_failure\n",
278 arg.worker_type.c_str(), arg.id, k.c_str());
279 }
280 ret = 1;
281 } else if (rec.deleted && op_success) {
282 ++stat.verify_error_count;
283 if (arg.sh.verbose > 0) {
284 fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s "
285 "unexpected_read_success\n",
286 arg.worker_type.c_str(), arg.id, k.c_str());
287 }
288 ret = 1;
289 } else if (num_flds != 4) {
290 ++stat.verify_error_count;
291 if (arg.sh.verbose > 0) {
292 fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s "
293 "unexpected_read_fldnum %d\n",
294 arg.worker_type.c_str(), arg.id, k.c_str(),
295 static_cast<int>(num_flds));
296 }
297 ret = 1;
298 } else if (rec.deleted) {
299 /* nothing to verify */
300 } else {
301 int diff = 0;
302 for (size_t i = 0; i < 4; ++i) {
303 if (rec.values[i] == rrec[i]) {
304 /* ok */
305 } else {
306 diff = 1;
307 }
308 }
309 if (diff) {
310 std::string mess;
311 for (size_t i = 0; i < 4; ++i) {
312 const std::string& expected = rec.values[i];
313 const std::string& val = rrec[i];
314 mess += " " + val + "/" + expected;
315 }
316 if (arg.sh.verbose > 0) {
317 fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s "
318 "unexpected_read_value %s\n",
319 arg.worker_type.c_str(), arg.id, k.c_str(), mess.c_str());
320 }
321 ret = 1;
322 }
323 }
324 }
325 if (arg.sh.verbose >= 100 && ret == 0) {
326 fprintf(stderr, "%s %s\n", arg.worker_type.c_str(), k.c_str());
327 }
328 if (ret == 0 && !rec.unknown_state) {
329 ++stat.success_count;
330 }
331 return ret;
332 }
333
334 int
verify_readnolock(const std::string & k,uint32_t num_rows,uint32_t num_flds,const std::string rrec[4])335 hs_longrun_thread_base::verify_readnolock(const std::string& k,
336 uint32_t num_rows, uint32_t num_flds, const std::string rrec[4])
337 {
338 int ret = 0;
339 if (num_rows != 1 || num_flds != 4) {
340 ++stat.verify_error_count;
341 if (arg.sh.verbose > 0) {
342 fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s "
343 "unexpected_read_failure\n",
344 arg.worker_type.c_str(), arg.id, k.c_str());
345 }
346 ret = 1;
347 }
348 if (arg.sh.verbose >= 100 && ret == 0) {
349 fprintf(stderr, "%s -> %s %s %s %s %s\n", arg.worker_type.c_str(),
350 k.c_str(), rrec[0].c_str(), rrec[1].c_str(), rrec[2].c_str(),
351 rrec[3].c_str());
352 }
353 if (ret == 0) {
354 ++stat.success_count;
355 }
356 return ret;
357 }
358
359 struct hs_longrun_thread_hs : public hs_longrun_thread_base {
hs_longrun_thread_hsdena::hs_longrun_thread_hs360 hs_longrun_thread_hs(const arg_type& arg)
361 : hs_longrun_thread_base(arg) { }
362 void run();
363 int check_hs_error(const char *mess, record_value *rec);
364 int op_insert(record_value& rec);
365 int op_delete(record_value& rec);
366 int op_update(record_value& rec);
367 int op_read(record_value& rec);
368 int op_readnolock(int k);
369 hstcpcli_ptr cli;
370 socket_args sockargs;
371 };
372
373 string_ref
to_string_ref(const std::string & s)374 to_string_ref(const std::string& s)
375 {
376 return string_ref(s.data(), s.size());
377 }
378
379 std::string
to_string(const string_ref & s)380 to_string(const string_ref& s)
381 {
382 return std::string(s.begin(), s.size());
383 }
384
385 void
run()386 hs_longrun_thread_hs::run()
387 {
388 config c = arg.sh.conf;
389 if (arg.op == 'R' || arg.op == 'N') {
390 c["port"] = to_stdstring(arg.sh.conf.get_int("hsport", 9998));
391 } else {
392 c["port"] = to_stdstring(arg.sh.conf.get_int("hsport_wr", 9999));
393 }
394 sockargs.set(c);
395
396 while (arg.sh.running) {
397 if (cli.get() == 0 || !cli->stable_point()) {
398 cli = hstcpcli_i::create(sockargs);
399 if (check_hs_error("connect", 0) != 0) {
400 cli.reset();
401 continue;
402 }
403 cli->request_buf_open_index(0, "hstestdb", "hstesttbl", "PRIMARY",
404 "k,v1,v2,v3", "k,v1,v2,v3");
405 cli->request_send();
406 if (check_hs_error("openindex_send", 0) != 0) {
407 cli.reset();
408 continue;
409 }
410 size_t num_flds = 0;
411 cli->response_recv(num_flds);
412 if (check_hs_error("openindex_recv", 0) != 0) {
413 cli.reset();
414 continue;
415 }
416 cli->response_buf_remove();
417 }
418 const size_t rec_id = rand_record();
419 if (arg.lock_flag) {
420 record_value& rec = *arg.sh.records[rec_id];
421 lock_guard g(rec.lock);
422 int e = 0;
423 switch (arg.op) {
424 case 'I':
425 e = op_insert(rec);
426 break;
427 case 'D':
428 e = op_delete(rec);
429 break;
430 case 'U':
431 e = op_update(rec);
432 break;
433 case 'R':
434 e = op_read(rec);
435 break;
436 default:
437 break;
438 }
439 } else {
440 int e = 0;
441 switch (arg.op) {
442 case 'N':
443 e = op_readnolock(rec_id);
444 break;
445 default:
446 break;
447 }
448 }
449 }
450 }
451
452 int
op_insert(record_value & rec)453 hs_longrun_thread_hs::op_insert(record_value& rec)
454 {
455 const std::string k = rec.key;
456 const std::string v1 = "iv1_" + k + "_" + to_stdstring(arg.id);
457 const std::string v2 = "iv2_" + k + "_" + to_stdstring(arg.id);
458 const std::string v3 = "iv3_" + k + "_" + to_stdstring(arg.id);
459 const string_ref op_ref("+", 1);
460 const string_ref op_args[4] = {
461 to_string_ref(k),
462 to_string_ref(v1),
463 to_string_ref(v2),
464 to_string_ref(v3)
465 };
466 cli->request_buf_exec_generic(0, op_ref, op_args, 4, 1, 0,
467 string_ref(), 0, 0, 0, 0);
468 cli->request_send();
469 if (check_hs_error("op_insert_send", &rec) != 0) { return 1; }
470 size_t numflds = 0;
471 cli->response_recv(numflds);
472 if (arg.sh.verbose > 10) {
473 const string_ref *row = cli->get_next_row();
474 fprintf(stderr, "HS op=+ errrcode=%d errmess=[%s]\n", cli->get_error_code(),
475 row ? to_string(row[0]).c_str() : "");
476 }
477 const bool op_success = cli->get_error_code() == 0;
478 int ret = 0;
479 if (!rec.unknown_state) {
480 if (rec.deleted && !op_success) {
481 ++stat.verify_error_count;
482 if (arg.sh.verbose > 0) {
483 fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s "
484 "unexpected_insert_failure\n",
485 arg.worker_type.c_str(), arg.id, k.c_str());
486 }
487 ret = 1;
488 } else if (!rec.deleted && op_success) {
489 ++stat.verify_error_count;
490 if (arg.sh.verbose > 0) {
491 fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s "
492 "unexpected_insert_success\n",
493 arg.worker_type.c_str(), arg.id, k.c_str());
494 }
495 ret = 1;
496 }
497 } else {
498 ++stat.unknown_count;
499 }
500 if (op_success) {
501 rec.values.resize(4);
502 rec.values[0] = k;
503 rec.values[1] = v1;
504 rec.values[2] = v2;
505 rec.values[3] = v3;
506 rec.deleted = false;
507 if (arg.sh.verbose >= 100 && ret == 0) {
508 fprintf(stderr, "HS_INSERT %s %s %s %s\n", k.c_str(), v1.c_str(),
509 v2.c_str(), v3.c_str());
510 }
511 if (ret == 0 && !rec.unknown_state) {
512 ++stat.success_count;
513 }
514 rec.unknown_state = false;
515 }
516 cli->response_buf_remove();
517 return ret;
518 }
519
520 int
op_delete(record_value & rec)521 hs_longrun_thread_hs::op_delete(record_value& rec)
522 {
523 const std::string k = rec.key;
524 const string_ref op_ref("=", 1);
525 const string_ref op_args[1] = {
526 to_string_ref(k),
527 };
528 const string_ref modop_ref("D", 1);
529 cli->request_buf_exec_generic(0, op_ref, op_args, 1, 1, 0,
530 modop_ref, 0, 0, 0, 0);
531 cli->request_send();
532 if (check_hs_error("op_delete_send", &rec) != 0) { return 1; }
533 size_t numflds = 0;
534 cli->response_recv(numflds);
535 if (check_hs_error("op_delete_recv", &rec) != 0) { return 1; }
536 const string_ref *row = cli->get_next_row();
537 const bool op_success = (numflds > 0 && row != 0 &&
538 to_string(row[0]) == "1");
539 int ret = 0;
540 if (!rec.unknown_state) {
541 if (!rec.deleted && !op_success) {
542 ++stat.verify_error_count;
543 if (arg.sh.verbose > 0) {
544 fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s "
545 "unexpected_delete_failure\n",
546 arg.worker_type.c_str(), arg.id, k.c_str());
547 }
548 ret = 1;
549 } else if (rec.deleted && op_success) {
550 ++stat.verify_error_count;
551 if (arg.sh.verbose > 0) {
552 fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s "
553 "unexpected_delete_success\n",
554 arg.worker_type.c_str(), arg.id, k.c_str());
555 }
556 ret = 1;
557 }
558 }
559 cli->response_buf_remove();
560 if (op_success) {
561 rec.deleted = true;
562 if (ret == 0 && !rec.unknown_state) {
563 ++stat.success_count;
564 }
565 rec.unknown_state = false;
566 }
567 if (arg.sh.verbose >= 100 && ret == 0) {
568 fprintf(stderr, "HS_DELETE %s\n", k.c_str());
569 }
570 return ret;
571 }
572
573 int
op_update(record_value & rec)574 hs_longrun_thread_hs::op_update(record_value& rec)
575 {
576 const std::string k = rec.key;
577 const std::string v1 = "uv1_" + k + "_" + to_stdstring(arg.id);
578 const std::string v2 = "uv2_" + k + "_" + to_stdstring(arg.id);
579 const std::string v3 = "uv3_" + k + "_" + to_stdstring(arg.id);
580 const string_ref op_ref("=", 1);
581 const string_ref op_args[1] = {
582 to_string_ref(k),
583 };
584 const string_ref modop_ref("U", 1);
585 const string_ref modop_args[4] = {
586 to_string_ref(k),
587 to_string_ref(v1),
588 to_string_ref(v2),
589 to_string_ref(v3)
590 };
591 cli->request_buf_exec_generic(0, op_ref, op_args, 1, 1, 0,
592 modop_ref, modop_args, 4, 0, 0);
593 cli->request_send();
594 if (check_hs_error("op_update_send", &rec) != 0) { return 1; }
595 size_t numflds = 0;
596 cli->response_recv(numflds);
597 if (check_hs_error("op_update_recv", &rec) != 0) { return 1; }
598 const string_ref *row = cli->get_next_row();
599 uint32_t num_rows = row
600 ? atoi_uint32_nocheck(row[0].begin(), row[0].end()) : 0;
601 cli->response_buf_remove();
602 const bool cur_unknown_state = (num_rows == 1);
603 return verify_update(k, v1, v2, v3, rec, num_rows, cur_unknown_state);
604 }
605
606 int
op_read(record_value & rec)607 hs_longrun_thread_hs::op_read(record_value& rec)
608 {
609 const std::string k = rec.key;
610 const string_ref op_ref("=", 1);
611 const string_ref op_args[1] = {
612 to_string_ref(k),
613 };
614 cli->request_buf_exec_generic(0, op_ref, op_args, 1, 1, 0,
615 string_ref(), 0, 0, 0, 0);
616 cli->request_send();
617 if (check_hs_error("op_read_send", 0) != 0) { return 1; }
618 size_t num_flds = 0;
619 size_t num_rows = 0;
620 cli->response_recv(num_flds);
621 if (check_hs_error("op_read_recv", 0) != 0) { return 1; }
622 const string_ref *row = cli->get_next_row();
623 std::string rrec[4];
624 if (row != 0 && num_flds == 4) {
625 for (int i = 0; i < 4; ++i) {
626 rrec[i] = to_string(row[i]);
627 }
628 ++num_rows;
629 }
630 row = cli->get_next_row();
631 if (row != 0) {
632 ++num_rows;
633 }
634 cli->response_buf_remove();
635 return verify_read(k, num_rows, num_flds, rrec, rec);
636 }
637
638 int
op_readnolock(int key)639 hs_longrun_thread_hs::op_readnolock(int key)
640 {
641 const std::string k = to_stdstring(key);
642 const string_ref op_ref("=", 1);
643 const string_ref op_args[1] = {
644 to_string_ref(k),
645 };
646 cli->request_buf_exec_generic(0, op_ref, op_args, 1, 1, 0,
647 string_ref(), 0, 0, 0, 0);
648 cli->request_send();
649 if (check_hs_error("op_read_send", 0) != 0) { return 1; }
650 size_t num_flds = 0;
651 size_t num_rows = 0;
652 cli->response_recv(num_flds);
653 if (check_hs_error("op_read_recv", 0) != 0) { return 1; }
654 const string_ref *row = cli->get_next_row();
655 std::string rrec[4];
656 if (row != 0 && num_flds == 4) {
657 for (int i = 0; i < 4; ++i) {
658 rrec[i] = to_string(row[i]);
659 }
660 ++num_rows;
661 }
662 row = cli->get_next_row();
663 if (row != 0) {
664 ++num_rows;
665 }
666 cli->response_buf_remove();
667 return verify_readnolock(k, num_rows, num_flds, rrec);
668 }
669
670 int
check_hs_error(const char * mess,record_value * rec)671 hs_longrun_thread_hs::check_hs_error(const char *mess, record_value *rec)
672 {
673 const int err = cli->get_error_code();
674 if (err == 0) {
675 return 0;
676 }
677 ++stat.runtime_error_count;
678 if (arg.sh.verbose > 0) {
679 const std::string estr = cli->get_error();
680 fprintf(stderr, "RUNTIME_ERROR: op=%c wid=%d %s: %d %s\n",
681 arg.op, arg.id, mess, err, estr.c_str());
682 }
683 if (rec) {
684 rec->unknown_state = true;
685 }
686 return 1;
687 }
688
689 struct hs_longrun_thread_my : public hs_longrun_thread_base {
hs_longrun_thread_mydena::hs_longrun_thread_my690 hs_longrun_thread_my(const arg_type& arg)
691 : hs_longrun_thread_base(arg), connected(false) { }
692 void run();
693 void show_mysql_error(const char *mess, record_value *rec);
694 int op_insert(record_value& rec);
695 int op_delete(record_value& rec);
696 int op_update(record_value& rec);
697 int op_delins(record_value& rec);
698 int op_read(record_value& rec);
699 auto_mysql db;
700 bool connected;
701 };
702
703 void
run()704 hs_longrun_thread_my::run()
705 {
706 const std::string mysql_host = arg.sh.conf.get_str("host", "localhost");
707 const std::string mysql_user = arg.sh.conf.get_str("mysqluser", "root");
708 const std::string mysql_passwd = arg.sh.conf.get_str("mysqlpass", "");
709 const std::string mysql_dbname = "hstestdb";
710
711 while (arg.sh.running) {
712 if (!connected) {
713 if (!mysql_real_connect(db, mysql_host.c_str(), mysql_user.c_str(),
714 mysql_passwd.c_str(), mysql_dbname.c_str(), mysql_port, 0, 0)) {
715 show_mysql_error("mysql_real_connect", 0);
716 continue;
717 }
718 }
719 connected = true;
720 const size_t rec_id = rand_record();
721 record_value& rec = *arg.sh.records[rec_id];
722 lock_guard g(rec.lock);
723 int e = 0;
724 switch (arg.op) {
725 #if 0
726 case 'I':
727 e = op_insert(rec);
728 break;
729 case 'D':
730 e = op_delete(rec);
731 break;
732 case 'U':
733 e = op_update(rec);
734 break;
735 #endif
736 case 'T':
737 e = op_delins(rec);
738 break;
739 case 'R':
740 e = op_read(rec);
741 break;
742 default:
743 break;
744 }
745 }
746 }
747
748 int
op_delins(record_value & rec)749 hs_longrun_thread_my::op_delins(record_value& rec)
750 {
751 const std::string k = rec.key;
752 const std::string v1 = "div1_" + k + "_" + to_stdstring(arg.id);
753 const std::string v2 = "div2_" + k + "_" + to_stdstring(arg.id);
754 const std::string v3 = "div3_" + k + "_" + to_stdstring(arg.id);
755 int success = 0;
756 bool cur_unknown_state = false;
757 do {
758 char query[1024];
759 #if 1
760 if (mysql_query(db, "begin") != 0) {
761 if (arg.sh.verbose >= 20) {
762 fprintf(stderr, "mysql: e=[%s] q=[%s]\n", mysql_error(db), "begin");
763 }
764 break;
765 }
766 #endif
767 cur_unknown_state = true;
768 snprintf(query, 1024,
769 "delete from hstesttbl where k = '%s'", k.c_str());
770 if (mysql_query(db, query) != 0) {
771 if (arg.sh.verbose >= 20) {
772 fprintf(stderr, "mysql: e=[%s] q=[%s]\n", mysql_error(db), query);
773 }
774 break;
775 }
776 if (mysql_affected_rows(db) != 1) {
777 if (arg.sh.verbose >= 20) {
778 fprintf(stderr, "mysql: notfound: [%s]\n", query);
779 }
780 break;
781 }
782 snprintf(query, 1024,
783 "insert into hstesttbl values ('%s', '%s', '%s', '%s')",
784 k.c_str(), v1.c_str(), v2.c_str(), v3.c_str());
785 if (mysql_query(db, query) != 0) {
786 if (arg.sh.verbose >= 20) {
787 fprintf(stderr, "mysql: e=[%s] q=[%s]\n", mysql_error(db), query);
788 }
789 break;
790 }
791 #if 1
792 if (mysql_query(db, "commit") != 0) {
793 if (arg.sh.verbose >= 20) {
794 fprintf(stderr, "mysql: e=[%s] q=[%s]\n", mysql_error(db), "commit");
795 }
796 break;
797 }
798 #endif
799 success = true;
800 cur_unknown_state = false;
801 } while (false);
802 return verify_update(k, v1, v2, v3, rec, (success != 0), cur_unknown_state);
803 }
804
805 int
op_read(record_value & rec)806 hs_longrun_thread_my::op_read(record_value& rec)
807 {
808 const std::string k = rec.key;
809 char query[1024] = { 0 };
810 const int len = snprintf(query, 1024,
811 "select k,v1,v2,v3 from hstesttbl where k='%s'", k.c_str());
812 const int r = mysql_real_query(db, query, len > 0 ? len : 0);
813 if (r != 0) {
814 show_mysql_error(query, 0);
815 return 1;
816 }
817 MYSQL_ROW row = 0;
818 unsigned long *lengths = 0;
819 unsigned int num_rows = 0;
820 unsigned int num_flds = 0;
821 auto_mysql_res res(db);
822 std::string rrec[4];
823 if (res != 0) {
824 num_flds = mysql_num_fields(res);
825 row = mysql_fetch_row(res);
826 if (row != 0) {
827 lengths = mysql_fetch_lengths(res);
828 if (num_flds == 4) {
829 for (int i = 0; i < 4; ++i) {
830 rrec[i] = std::string(row[i], lengths[i]);
831 }
832 }
833 ++num_rows;
834 row = mysql_fetch_row(res);
835 if (row != 0) {
836 ++num_rows;
837 }
838 }
839 }
840 return verify_read(k, num_rows, num_flds, rrec, rec);
841 }
842
843 void
show_mysql_error(const char * mess,record_value * rec)844 hs_longrun_thread_my::show_mysql_error(const char *mess, record_value *rec)
845 {
846 ++stat.runtime_error_count;
847 if (arg.sh.verbose > 0) {
848 fprintf(stderr, "RUNTIME_ERROR: op=%c wid=%d [%s]: %s\n",
849 arg.op, arg.id, mess, mysql_error(db));
850 }
851 if (rec) {
852 rec->unknown_state = true;
853 }
854 db.reset();
855 connected = false;
856 }
857
858 void
mysql_do(MYSQL * db,const char * query)859 mysql_do(MYSQL *db, const char *query)
860 {
861 if (mysql_real_query(db, query, strlen(query)) != 0) {
862 fprintf(stderr, "mysql: e=[%s] q=[%s]\n", mysql_error(db), query);
863 fatal_exit("mysql_do");
864 }
865 }
866
867 void
hs_longrun_init_table(const config & conf,int num_prepare,hs_longrun_shared & shared)868 hs_longrun_init_table(const config& conf, int num_prepare,
869 hs_longrun_shared& shared)
870 {
871 const std::string mysql_host = conf.get_str("host", "localhost");
872 const std::string mysql_user = conf.get_str("mysqluser", "root");
873 const std::string mysql_passwd = conf.get_str("mysqlpass", "");
874 const std::string mysql_dbname = "";
875 auto_mysql db;
876 if (!mysql_real_connect(db, mysql_host.c_str(), mysql_user.c_str(),
877 mysql_passwd.c_str(), mysql_dbname.c_str(), mysql_port, 0, 0)) {
878 fprintf(stderr, "mysql: error=[%s]\n", mysql_error(db));
879 fatal_exit("hs_longrun_init_table");
880 }
881 mysql_do(db, "drop database if exists hstestdb");
882 mysql_do(db, "create database hstestdb");
883 mysql_do(db, "use hstestdb");
884 mysql_do(db,
885 "create table hstesttbl ("
886 "k int primary key,"
887 "v1 varchar(32) not null,"
888 "v2 varchar(32) not null,"
889 "v3 varchar(32) not null"
890 ") character set utf8 collate utf8_bin engine = innodb");
891 for (int i = 0; i < num_prepare; ++i) {
892 const std::string i_str = to_stdstring(i);
893 const std::string v1 = "pv1_" + i_str;
894 const std::string v2 = "pv2_" + i_str;
895 const std::string v3 = "pv3_" + i_str;
896 char buf[1024];
897 snprintf(buf, 1024, "insert into hstesttbl(k, v1, v2, v3) values"
898 "(%d, '%s', '%s', '%s')", i, v1.c_str(), v2.c_str(), v3.c_str());
899 mysql_do(db, buf);
900 record_value *rec = shared.records[i];
901 rec->key = i_str;
902 rec->values.resize(4);
903 rec->values[0] = i_str;
904 rec->values[1] = v1;
905 rec->values[2] = v2;
906 rec->values[3] = v3;
907 rec->deleted = false;
908 }
909 }
910
911 int
hs_longrun_main(int argc,char ** argv)912 hs_longrun_main(int argc, char **argv)
913 {
914 hs_longrun_shared shared;
915 parse_args(argc, argv, shared.conf);
916 shared.conf["host"] = shared.conf.get_str("host", "localhost");
917 shared.verbose = shared.conf.get_int("verbose", 1);
918 const int table_size = shared.conf.get_int("table_size", 10000);
919 for (int i = 0; i < table_size; ++i) {
920 std::auto_ptr<record_value> rec(new record_value());
921 rec->key = to_stdstring(i);
922 shared.records.push_back_ptr(rec);
923 }
924 mysql_library_init(0, 0, 0);
925 const int duration = shared.conf.get_int("duration", 10);
926 const int num_hsinsert = shared.conf.get_int("num_hsinsert", 10);
927 const int num_hsdelete = shared.conf.get_int("num_hsdelete", 10);
928 const int num_hsupdate = shared.conf.get_int("num_hsupdate", 10);
929 const int num_hsread = shared.conf.get_int("num_hsread", 10);
930 const int num_myread = shared.conf.get_int("num_myread", 10);
931 const int num_mydelins = shared.conf.get_int("num_mydelins", 10);
932 int num_hsreadnolock = shared.conf.get_int("num_hsreadnolock", 10);
933 const bool always_filled = (num_hsinsert == 0 && num_hsdelete == 0);
934 if (!always_filled) {
935 num_hsreadnolock = 0;
936 }
937 hs_longrun_init_table(shared.conf, always_filled ? table_size : 0,
938 shared);
939 /* create worker threads */
940 static const struct thrtmpl_type {
941 const char *type; char op; int num; int hs; int lock;
942 } thrtmpl[] = {
943 { "hsinsert", 'I', num_hsinsert, 1, 1 },
944 { "hsdelete", 'D', num_hsdelete, 1, 1 },
945 { "hsupdate", 'U', num_hsupdate, 1, 1 },
946 { "hsread", 'R', num_hsread, 1, 1 },
947 { "hsreadnolock", 'N', num_hsreadnolock, 1, 0 },
948 { "myread", 'R', num_myread, 0, 1 },
949 { "mydelins", 'T', num_mydelins, 0, 1 },
950 };
951 typedef auto_ptrcontainer< std::vector<hs_longrun_thread_base *> > thrs_type;
952 thrs_type thrs;
953 for (size_t i = 0; i < sizeof(thrtmpl)/sizeof(thrtmpl[0]); ++i) {
954 const thrtmpl_type& e = thrtmpl[i];
955 for (int j = 0; j < e.num; ++j) {
956 int id = thrs.size();
957 const hs_longrun_thread_hs::arg_type arg(id, e.type, e.op, e.lock,
958 shared);
959 std::auto_ptr<hs_longrun_thread_base> thr;
960 if (e.hs) {
961 thr.reset(new hs_longrun_thread_hs(arg));
962 } else {
963 thr.reset(new hs_longrun_thread_my(arg));
964 }
965 thrs.push_back_ptr(thr);
966 }
967 }
968 shared.num_threads = thrs.size();
969 /* start threads */
970 fprintf(stderr, "START\n");
971 shared.running = 1;
972 for (size_t i = 0; i < thrs.size(); ++i) {
973 thrs[i]->start();
974 }
975 /* wait */
976 sleep(duration);
977 /* stop thread */
978 shared.running = 0;
979 for (size_t i = 0; i < thrs.size(); ++i) {
980 thrs[i]->join();
981 }
982 fprintf(stderr, "DONE\n");
983 /* summary */
984 typedef std::map<std::string, hs_longrun_stat> stat_map;
985 stat_map sm;
986 for (size_t i = 0; i < thrs.size(); ++i) {
987 hs_longrun_thread_base *const thr = thrs[i];
988 const std::string wt = thr->arg.worker_type;
989 hs_longrun_stat& v = sm[wt];
990 v.add(thr->stat);
991 }
992 hs_longrun_stat total;
993 for (stat_map::const_iterator i = sm.begin(); i != sm.end(); ++i) {
994 if (i->second.verify_error_count != 0) {
995 fprintf(stderr, "%s verify_error %llu\n", i->first.c_str(),
996 i->second.verify_error_count);
997 }
998 if (i->second.runtime_error_count) {
999 fprintf(stderr, "%s runtime_error %llu\n", i->first.c_str(),
1000 i->second.runtime_error_count);
1001 }
1002 if (i->second.unknown_count) {
1003 fprintf(stderr, "%s unknown %llu\n", i->first.c_str(),
1004 i->second.unknown_count);
1005 }
1006 fprintf(stderr, "%s success %llu\n", i->first.c_str(),
1007 i->second.success_count);
1008 total.add(i->second);
1009 }
1010 if (total.verify_error_count != 0) {
1011 fprintf(stderr, "TOTAL verify_error %llu\n", total.verify_error_count);
1012 }
1013 if (total.runtime_error_count != 0) {
1014 fprintf(stderr, "TOTAL runtime_error %llu\n", total.runtime_error_count);
1015 }
1016 if (total.unknown_count != 0) {
1017 fprintf(stderr, "TOTAL unknown %llu\n", total.unknown_count);
1018 }
1019 fprintf(stderr, "TOTAL success %llu\n", total.success_count);
1020 mysql_library_end();
1021 return 0;
1022 }
1023
1024 };
1025
1026 int
main(int argc,char ** argv)1027 main(int argc, char **argv)
1028 {
1029 return dena::hs_longrun_main(argc, argv);
1030 }
1031
1032