1
2 // vim:sw=2:ai
3
4 #include <signal.h>
5 #include <sys/time.h>
6 #include <stdio.h>
7 #include <string.h>
8 #include <vector>
9 #include <map>
10 #include <stdlib.h>
11 #include <memory>
12 #include <errno.h>
13 #include <mysql.h>
14 #include <time.h>
15 #include <sys/types.h>
16 #include <sys/stat.h>
17 #include <fcntl.h>
18
19 #include "util.hpp"
20 #include "auto_ptrcontainer.hpp"
21 #include "socket.hpp"
22 #include "hstcpcli.hpp"
23 #include "string_util.hpp"
24 #include "mutex.hpp"
25
26 namespace dena {
27
28 struct auto_mysql : private noncopyable {
auto_mysqldena::auto_mysql29 auto_mysql() : db(0) {
30 reset();
31 }
~auto_mysqldena::auto_mysql32 ~auto_mysql() {
33 if (db) {
34 mysql_close(db);
35 }
36 }
resetdena::auto_mysql37 void reset() {
38 if (db) {
39 mysql_close(db);
40 }
41 if ((db = mysql_init(0)) == 0) {
42 fatal_abort("failed to initialize mysql client");
43 }
44 }
operator MYSQL*dena::auto_mysql45 operator MYSQL *() const { return db; }
46 private:
47 MYSQL *db;
48 };
49
50 struct auto_mysql_res : private noncopyable {
auto_mysql_resdena::auto_mysql_res51 auto_mysql_res(MYSQL *db) {
52 res = mysql_store_result(db);
53 }
~auto_mysql_resdena::auto_mysql_res54 ~auto_mysql_res() {
55 if (res) {
56 mysql_free_result(res);
57 }
58 }
operator MYSQL_RES*dena::auto_mysql_res59 operator MYSQL_RES *() const { return res; }
60 private:
61 MYSQL_RES *res;
62 };
63
64 struct auto_mysql_stmt : private noncopyable {
auto_mysql_stmtdena::auto_mysql_stmt65 auto_mysql_stmt(MYSQL *db) {
66 stmt = mysql_stmt_init(db);
67 }
~auto_mysql_stmtdena::auto_mysql_stmt68 ~auto_mysql_stmt() {
69 if (stmt) {
70 mysql_stmt_close(stmt);
71 }
72 }
operator MYSQL_STMT*dena::auto_mysql_stmt73 operator MYSQL_STMT *() const { return stmt; }
74 private:
75 MYSQL_STMT *stmt;
76 };
77
78 double
gettimeofday_double()79 gettimeofday_double()
80 {
81 struct timeval tv = { };
82 if (gettimeofday(&tv, 0) != 0) {
83 fatal_abort("gettimeofday");
84 }
85 return static_cast<double>(tv.tv_usec) / 1000000 + tv.tv_sec;
86 }
87
88 struct record_value {
89 mutex lock;
90 bool deleted;
91 bool unknown_state;
92 std::string key;
93 std::vector<std::string> values;
record_valuedena::record_value94 record_value() : deleted(true), unknown_state(false) { }
95 };
96
97 struct hs_longrun_shared {
98 config conf;
99 socket_args arg;
100 int verbose;
101 long num_threads;
102 int usleep;
103 volatile mutable int running;
104 auto_ptrcontainer< std::vector<record_value *> > records;
hs_longrun_shareddena::hs_longrun_shared105 hs_longrun_shared() : verbose(0), num_threads(0), usleep(0), running(1) { }
106 };
107
108 struct thread_base {
thread_basedena::thread_base109 thread_base() : need_join(false), stack_size(256 * 1024) { }
~thread_basedena::thread_base110 virtual ~thread_base() {
111 join();
112 }
113 virtual void run() = 0;
startdena::thread_base114 void start() {
115 if (!start_nothrow()) {
116 fatal_abort("thread::start");
117 }
118 }
start_nothrowdena::thread_base119 bool start_nothrow() {
120 if (need_join) {
121 return need_join; /* true */
122 }
123 void *const arg = this;
124 pthread_attr_t attr;
125 if (pthread_attr_init(&attr) != 0) {
126 fatal_abort("pthread_attr_init");
127 }
128 if (pthread_attr_setstacksize(&attr, stack_size) != 0) {
129 fatal_abort("pthread_attr_setstacksize");
130 }
131 const int r = pthread_create(&thr, &attr, thread_main, arg);
132 if (pthread_attr_destroy(&attr) != 0) {
133 fatal_abort("pthread_attr_destroy");
134 }
135 if (r != 0) {
136 return need_join; /* false */
137 }
138 need_join = true;
139 return need_join; /* true */
140 }
joindena::thread_base141 void join() {
142 if (!need_join) {
143 return;
144 }
145 int e = 0;
146 if ((e = pthread_join(thr, 0)) != 0) {
147 fatal_abort("pthread_join");
148 }
149 need_join = false;
150 }
151 private:
thread_maindena::thread_base152 static void *thread_main(void *arg) {
153 thread_base *p = static_cast<thread_base *>(arg);
154 p->run();
155 return 0;
156 }
157 private:
158 pthread_t thr;
159 bool need_join;
160 size_t stack_size;
161 };
162
163 struct hs_longrun_stat {
164 unsigned long long verify_error_count;
165 unsigned long long runtime_error_count;
166 unsigned long long unknown_count;
167 unsigned long long success_count;
hs_longrun_statdena::hs_longrun_stat168 hs_longrun_stat()
169 : verify_error_count(0), runtime_error_count(0),
170 unknown_count(0), success_count(0) { }
adddena::hs_longrun_stat171 void add(const hs_longrun_stat& x) {
172 verify_error_count += x.verify_error_count;
173 runtime_error_count += x.runtime_error_count;
174 unknown_count += x.unknown_count;
175 success_count += x.success_count;
176 }
177 };
178
179 struct hs_longrun_thread_base : public thread_base {
180 struct arg_type {
181 int id;
182 std::string worker_type;
183 char op;
184 int lock_flag;
185 const hs_longrun_shared& sh;
arg_typedena::hs_longrun_thread_base::arg_type186 arg_type(int id, const std::string& worker_type, char op, int lock_flag,
187 const hs_longrun_shared& sh)
188 : id(id), worker_type(worker_type), op(op), lock_flag(lock_flag),
189 sh(sh) { }
190 };
191 arg_type arg;
192 hs_longrun_stat stat;
193 drand48_data randbuf;
194 unsigned int seed;
hs_longrun_thread_basedena::hs_longrun_thread_base195 hs_longrun_thread_base(const arg_type& arg)
196 : arg(arg), seed(0) {
197 seed = time(0) + arg.id + 1;
198 srand48_r(seed, &randbuf);
199 }
~hs_longrun_thread_basedena::hs_longrun_thread_base200 virtual ~hs_longrun_thread_base() { }
201 virtual void run() = 0;
rand_recorddena::hs_longrun_thread_base202 size_t rand_record() {
203 double v = 0;
204 drand48_r(&randbuf, &v);
205 const size_t sz = arg.sh.records.size();
206 size_t r = size_t(v * sz);
207 if (r >= sz) {
208 r = 0;
209 }
210 return r;
211 }
212 int verify_update(const std::string& k, const std::string& v1,
213 const std::string& v2, const std::string& v3, record_value& rec,
214 uint32_t num_rows, bool cur_unknown_state);
215 int verify_read(const std::string& k, uint32_t num_rows, uint32_t num_flds,
216 const std::string rrec[4], record_value& rec);
217 int verify_readnolock(const std::string& k, uint32_t num_rows,
218 uint32_t num_flds, const std::string rrec[4]);
219 };
220
221 int
verify_update(const std::string & k,const std::string & v1,const std::string & v2,const std::string & v3,record_value & rec,uint32_t num_rows,bool cur_unknown_state)222 hs_longrun_thread_base::verify_update(const std::string& k,
223 const std::string& v1, const std::string& v2, const std::string& v3,
224 record_value& rec, uint32_t num_rows, bool cur_unknown_state)
225 {
226 const bool op_success = num_rows == 1;
227 int ret = 0;
228 if (!rec.unknown_state) {
229 if (!rec.deleted && !op_success) {
230 ++stat.verify_error_count;
231 if (arg.sh.verbose > 0) {
232 fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s "
233 "unexpected_update_failure\n",
234 arg.worker_type.c_str(), arg.id, k.c_str());
235 }
236 ret = 1;
237 } else if (rec.deleted && op_success) {
238 ++stat.verify_error_count;
239 if (arg.sh.verbose > 0) {
240 fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s "
241 "unexpected_update_success\n",
242 arg.worker_type.c_str(), arg.id, k.c_str());
243 }
244 ret = 1;
245 }
246 }
247 if (op_success) {
248 rec.values.resize(4);
249 rec.values[0] = k;
250 rec.values[1] = v1;
251 rec.values[2] = v2;
252 rec.values[3] = v3;
253 if (ret == 0 && !rec.unknown_state) {
254 ++stat.success_count;
255 }
256 }
257 rec.unknown_state = cur_unknown_state;
258 if (arg.sh.verbose >= 100 && ret == 0) {
259 fprintf(stderr, "%s %s %s %s %s\n", arg.worker_type.c_str(),
260 k.c_str(), v1.c_str(), v2.c_str(), v3.c_str());
261 }
262 return ret;
263 }
264
265 int
verify_read(const std::string & k,uint32_t num_rows,uint32_t num_flds,const std::string rrec[4],record_value & rec)266 hs_longrun_thread_base::verify_read(const std::string& k,
267 uint32_t num_rows, uint32_t num_flds, const std::string rrec[4],
268 record_value& rec)
269 {
270 const bool op_success = num_rows != 0;
271 int ret = 0;
272 if (!rec.unknown_state) {
273 if (!rec.deleted && !op_success) {
274 ++stat.verify_error_count;
275 if (arg.sh.verbose > 0) {
276 fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s "
277 "unexpected_read_failure\n",
278 arg.worker_type.c_str(), arg.id, k.c_str());
279 }
280 ret = 1;
281 } else if (rec.deleted && op_success) {
282 ++stat.verify_error_count;
283 if (arg.sh.verbose > 0) {
284 fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s "
285 "unexpected_read_success\n",
286 arg.worker_type.c_str(), arg.id, k.c_str());
287 }
288 ret = 1;
289 } else if (num_flds != 4) {
290 ++stat.verify_error_count;
291 if (arg.sh.verbose > 0) {
292 fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s "
293 "unexpected_read_fldnum %d\n",
294 arg.worker_type.c_str(), arg.id, k.c_str(),
295 static_cast<int>(num_flds));
296 }
297 ret = 1;
298 } else if (rec.deleted) {
299 /* nothing to verify */
300 } else {
301 int diff = 0;
302 for (size_t i = 0; i < 4; ++i) {
303 if (rec.values[i] == rrec[i]) {
304 /* ok */
305 } else {
306 diff = 1;
307 }
308 }
309 if (diff) {
310 std::string mess;
311 for (size_t i = 0; i < 4; ++i) {
312 const std::string& expected = rec.values[i];
313 const std::string& val = rrec[i];
314 mess += " " + val + "/" + expected;
315 }
316 if (arg.sh.verbose > 0) {
317 fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s "
318 "unexpected_read_value %s\n",
319 arg.worker_type.c_str(), arg.id, k.c_str(), mess.c_str());
320 }
321 ret = 1;
322 }
323 }
324 }
325 if (arg.sh.verbose >= 100 && ret == 0) {
326 fprintf(stderr, "%s %s\n", arg.worker_type.c_str(), k.c_str());
327 }
328 if (ret == 0 && !rec.unknown_state) {
329 ++stat.success_count;
330 }
331 return ret;
332 }
333
334 int
verify_readnolock(const std::string & k,uint32_t num_rows,uint32_t num_flds,const std::string rrec[4])335 hs_longrun_thread_base::verify_readnolock(const std::string& k,
336 uint32_t num_rows, uint32_t num_flds, const std::string rrec[4])
337 {
338 int ret = 0;
339 if (num_rows != 1 || num_flds != 4) {
340 ++stat.verify_error_count;
341 if (arg.sh.verbose > 0) {
342 fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s "
343 "unexpected_read_failure\n",
344 arg.worker_type.c_str(), arg.id, k.c_str());
345 }
346 ret = 1;
347 }
348 if (arg.sh.verbose >= 100 && ret == 0) {
349 fprintf(stderr, "%s -> %s %s %s %s %s\n", arg.worker_type.c_str(),
350 k.c_str(), rrec[0].c_str(), rrec[1].c_str(), rrec[2].c_str(),
351 rrec[3].c_str());
352 }
353 if (ret == 0) {
354 ++stat.success_count;
355 }
356 return ret;
357 }
358
359 struct hs_longrun_thread_hs : public hs_longrun_thread_base {
hs_longrun_thread_hsdena::hs_longrun_thread_hs360 hs_longrun_thread_hs(const arg_type& arg)
361 : hs_longrun_thread_base(arg) { }
362 void run();
363 int check_hs_error(const char *mess, record_value *rec);
364 int op_insert(record_value& rec);
365 int op_delete(record_value& rec);
366 int op_update(record_value& rec);
367 int op_read(record_value& rec);
368 int op_readnolock(int k);
369 hstcpcli_ptr cli;
370 socket_args sockargs;
371 };
372
373 struct lock_guard : noncopyable {
lock_guarddena::lock_guard374 lock_guard(mutex& mtx) : mtx(mtx) {
375 mtx.lock();
376 }
~lock_guarddena::lock_guard377 ~lock_guard() {
378 mtx.unlock();
379 }
380 mutex& mtx;
381 };
382
383 string_ref
to_string_ref(const std::string & s)384 to_string_ref(const std::string& s)
385 {
386 return string_ref(s.data(), s.size());
387 }
388
389 std::string
to_string(const string_ref & s)390 to_string(const string_ref& s)
391 {
392 return std::string(s.begin(), s.size());
393 }
394
395 void
run()396 hs_longrun_thread_hs::run()
397 {
398 config c = arg.sh.conf;
399 if (arg.op == 'R' || arg.op == 'N') {
400 c["port"] = to_stdstring(arg.sh.conf.get_int("hsport", 9998));
401 } else {
402 c["port"] = to_stdstring(arg.sh.conf.get_int("hsport_wr", 9999));
403 }
404 sockargs.set(c);
405
406 while (arg.sh.running) {
407 if (cli.get() == 0 || !cli->stable_point()) {
408 cli = hstcpcli_i::create(sockargs);
409 if (check_hs_error("connect", 0) != 0) {
410 cli.reset();
411 continue;
412 }
413 cli->request_buf_open_index(0, "hstestdb", "hstesttbl", "PRIMARY",
414 "k,v1,v2,v3", "k,v1,v2,v3");
415 cli->request_send();
416 if (check_hs_error("openindex_send", 0) != 0) {
417 cli.reset();
418 continue;
419 }
420 size_t num_flds = 0;
421 cli->response_recv(num_flds);
422 if (check_hs_error("openindex_recv", 0) != 0) {
423 cli.reset();
424 continue;
425 }
426 cli->response_buf_remove();
427 }
428 const size_t rec_id = rand_record();
429 if (arg.lock_flag) {
430 record_value& rec = *arg.sh.records[rec_id];
431 lock_guard g(rec.lock);
432 int e = 0;
433 switch (arg.op) {
434 case 'I':
435 e = op_insert(rec);
436 break;
437 case 'D':
438 e = op_delete(rec);
439 break;
440 case 'U':
441 e = op_update(rec);
442 break;
443 case 'R':
444 e = op_read(rec);
445 break;
446 default:
447 break;
448 }
449 } else {
450 int e = 0;
451 switch (arg.op) {
452 case 'N':
453 e = op_readnolock(rec_id);
454 break;
455 default:
456 break;
457 }
458 }
459 }
460 }
461
462 int
op_insert(record_value & rec)463 hs_longrun_thread_hs::op_insert(record_value& rec)
464 {
465 const std::string k = rec.key;
466 const std::string v1 = "iv1_" + k + "_" + to_stdstring(arg.id);
467 const std::string v2 = "iv2_" + k + "_" + to_stdstring(arg.id);
468 const std::string v3 = "iv3_" + k + "_" + to_stdstring(arg.id);
469 const string_ref op_ref("+", 1);
470 const string_ref op_args[4] = {
471 to_string_ref(k),
472 to_string_ref(v1),
473 to_string_ref(v2),
474 to_string_ref(v3)
475 };
476 cli->request_buf_exec_generic(0, op_ref, op_args, 4, 1, 0,
477 string_ref(), 0, 0, 0, 0);
478 cli->request_send();
479 if (check_hs_error("op_insert_send", &rec) != 0) { return 1; }
480 size_t numflds = 0;
481 cli->response_recv(numflds);
482 if (arg.sh.verbose > 10) {
483 const string_ref *row = cli->get_next_row();
484 fprintf(stderr, "HS op=+ errrcode=%d errmess=[%s]\n", cli->get_error_code(),
485 row ? to_string(row[0]).c_str() : "");
486 }
487 const bool op_success = cli->get_error_code() == 0;
488 int ret = 0;
489 if (!rec.unknown_state) {
490 if (rec.deleted && !op_success) {
491 ++stat.verify_error_count;
492 if (arg.sh.verbose > 0) {
493 fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s "
494 "unexpected_insert_failure\n",
495 arg.worker_type.c_str(), arg.id, k.c_str());
496 }
497 ret = 1;
498 } else if (!rec.deleted && op_success) {
499 ++stat.verify_error_count;
500 if (arg.sh.verbose > 0) {
501 fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s "
502 "unexpected_insert_success\n",
503 arg.worker_type.c_str(), arg.id, k.c_str());
504 }
505 ret = 1;
506 }
507 } else {
508 ++stat.unknown_count;
509 }
510 if (op_success) {
511 rec.values.resize(4);
512 rec.values[0] = k;
513 rec.values[1] = v1;
514 rec.values[2] = v2;
515 rec.values[3] = v3;
516 rec.deleted = false;
517 if (arg.sh.verbose >= 100 && ret == 0) {
518 fprintf(stderr, "HS_INSERT %s %s %s %s\n", k.c_str(), v1.c_str(),
519 v2.c_str(), v3.c_str());
520 }
521 if (ret == 0 && !rec.unknown_state) {
522 ++stat.success_count;
523 }
524 rec.unknown_state = false;
525 }
526 cli->response_buf_remove();
527 return ret;
528 }
529
530 int
op_delete(record_value & rec)531 hs_longrun_thread_hs::op_delete(record_value& rec)
532 {
533 const std::string k = rec.key;
534 const string_ref op_ref("=", 1);
535 const string_ref op_args[1] = {
536 to_string_ref(k),
537 };
538 const string_ref modop_ref("D", 1);
539 cli->request_buf_exec_generic(0, op_ref, op_args, 1, 1, 0,
540 modop_ref, 0, 0, 0, 0);
541 cli->request_send();
542 if (check_hs_error("op_delete_send", &rec) != 0) { return 1; }
543 size_t numflds = 0;
544 cli->response_recv(numflds);
545 if (check_hs_error("op_delete_recv", &rec) != 0) { return 1; }
546 const string_ref *row = cli->get_next_row();
547 const bool op_success = (numflds > 0 && row != 0 &&
548 to_string(row[0]) == "1");
549 int ret = 0;
550 if (!rec.unknown_state) {
551 if (!rec.deleted && !op_success) {
552 ++stat.verify_error_count;
553 if (arg.sh.verbose > 0) {
554 fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s "
555 "unexpected_delete_failure\n",
556 arg.worker_type.c_str(), arg.id, k.c_str());
557 }
558 ret = 1;
559 } else if (rec.deleted && op_success) {
560 ++stat.verify_error_count;
561 if (arg.sh.verbose > 0) {
562 fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s "
563 "unexpected_delete_success\n",
564 arg.worker_type.c_str(), arg.id, k.c_str());
565 }
566 ret = 1;
567 }
568 }
569 cli->response_buf_remove();
570 if (op_success) {
571 rec.deleted = true;
572 if (ret == 0 && !rec.unknown_state) {
573 ++stat.success_count;
574 }
575 rec.unknown_state = false;
576 }
577 if (arg.sh.verbose >= 100 && ret == 0) {
578 fprintf(stderr, "HS_DELETE %s\n", k.c_str());
579 }
580 return ret;
581 }
582
583 int
op_update(record_value & rec)584 hs_longrun_thread_hs::op_update(record_value& rec)
585 {
586 const std::string k = rec.key;
587 const std::string v1 = "uv1_" + k + "_" + to_stdstring(arg.id);
588 const std::string v2 = "uv2_" + k + "_" + to_stdstring(arg.id);
589 const std::string v3 = "uv3_" + k + "_" + to_stdstring(arg.id);
590 const string_ref op_ref("=", 1);
591 const string_ref op_args[1] = {
592 to_string_ref(k),
593 };
594 const string_ref modop_ref("U", 1);
595 const string_ref modop_args[4] = {
596 to_string_ref(k),
597 to_string_ref(v1),
598 to_string_ref(v2),
599 to_string_ref(v3)
600 };
601 cli->request_buf_exec_generic(0, op_ref, op_args, 1, 1, 0,
602 modop_ref, modop_args, 4, 0, 0);
603 cli->request_send();
604 if (check_hs_error("op_update_send", &rec) != 0) { return 1; }
605 size_t numflds = 0;
606 cli->response_recv(numflds);
607 if (check_hs_error("op_update_recv", &rec) != 0) { return 1; }
608 const string_ref *row = cli->get_next_row();
609 uint32_t num_rows = row
610 ? atoi_uint32_nocheck(row[0].begin(), row[0].end()) : 0;
611 cli->response_buf_remove();
612 const bool cur_unknown_state = (num_rows == 1);
613 return verify_update(k, v1, v2, v3, rec, num_rows, cur_unknown_state);
614 }
615
616 int
op_read(record_value & rec)617 hs_longrun_thread_hs::op_read(record_value& rec)
618 {
619 const std::string k = rec.key;
620 const string_ref op_ref("=", 1);
621 const string_ref op_args[1] = {
622 to_string_ref(k),
623 };
624 cli->request_buf_exec_generic(0, op_ref, op_args, 1, 1, 0,
625 string_ref(), 0, 0, 0, 0);
626 cli->request_send();
627 if (check_hs_error("op_read_send", 0) != 0) { return 1; }
628 size_t num_flds = 0;
629 size_t num_rows = 0;
630 cli->response_recv(num_flds);
631 if (check_hs_error("op_read_recv", 0) != 0) { return 1; }
632 const string_ref *row = cli->get_next_row();
633 std::string rrec[4];
634 if (row != 0 && num_flds == 4) {
635 for (int i = 0; i < 4; ++i) {
636 rrec[i] = to_string(row[i]);
637 }
638 ++num_rows;
639 }
640 row = cli->get_next_row();
641 if (row != 0) {
642 ++num_rows;
643 }
644 cli->response_buf_remove();
645 return verify_read(k, num_rows, num_flds, rrec, rec);
646 }
647
648 int
op_readnolock(int key)649 hs_longrun_thread_hs::op_readnolock(int key)
650 {
651 const std::string k = to_stdstring(key);
652 const string_ref op_ref("=", 1);
653 const string_ref op_args[1] = {
654 to_string_ref(k),
655 };
656 cli->request_buf_exec_generic(0, op_ref, op_args, 1, 1, 0,
657 string_ref(), 0, 0, 0, 0);
658 cli->request_send();
659 if (check_hs_error("op_read_send", 0) != 0) { return 1; }
660 size_t num_flds = 0;
661 size_t num_rows = 0;
662 cli->response_recv(num_flds);
663 if (check_hs_error("op_read_recv", 0) != 0) { return 1; }
664 const string_ref *row = cli->get_next_row();
665 std::string rrec[4];
666 if (row != 0 && num_flds == 4) {
667 for (int i = 0; i < 4; ++i) {
668 rrec[i] = to_string(row[i]);
669 }
670 ++num_rows;
671 }
672 row = cli->get_next_row();
673 if (row != 0) {
674 ++num_rows;
675 }
676 cli->response_buf_remove();
677 return verify_readnolock(k, num_rows, num_flds, rrec);
678 }
679
680 int
check_hs_error(const char * mess,record_value * rec)681 hs_longrun_thread_hs::check_hs_error(const char *mess, record_value *rec)
682 {
683 const int err = cli->get_error_code();
684 if (err == 0) {
685 return 0;
686 }
687 ++stat.runtime_error_count;
688 if (arg.sh.verbose > 0) {
689 const std::string estr = cli->get_error();
690 fprintf(stderr, "RUNTIME_ERROR: op=%c wid=%d %s: %d %s\n",
691 arg.op, arg.id, mess, err, estr.c_str());
692 }
693 if (rec) {
694 rec->unknown_state = true;
695 }
696 return 1;
697 }
698
699 struct hs_longrun_thread_my : public hs_longrun_thread_base {
hs_longrun_thread_mydena::hs_longrun_thread_my700 hs_longrun_thread_my(const arg_type& arg)
701 : hs_longrun_thread_base(arg), connected(false) { }
702 void run();
703 void show_mysql_error(const char *mess, record_value *rec);
704 int op_insert(record_value& rec);
705 int op_delete(record_value& rec);
706 int op_update(record_value& rec);
707 int op_delins(record_value& rec);
708 int op_read(record_value& rec);
709 auto_mysql db;
710 bool connected;
711 };
712
713 void
run()714 hs_longrun_thread_my::run()
715 {
716 const std::string mysql_host = arg.sh.conf.get_str("host", "localhost");
717 const std::string mysql_user = arg.sh.conf.get_str("mysqluser", "root");
718 const std::string mysql_passwd = arg.sh.conf.get_str("mysqlpass", "");
719 const std::string mysql_dbname = "hstestdb";
720
721 while (arg.sh.running) {
722 if (!connected) {
723 if (!mysql_real_connect(db, mysql_host.c_str(), mysql_user.c_str(),
724 mysql_passwd.c_str(), mysql_dbname.c_str(), mysql_port, 0, 0)) {
725 show_mysql_error("mysql_real_connect", 0);
726 continue;
727 }
728 }
729 connected = true;
730 const size_t rec_id = rand_record();
731 record_value& rec = *arg.sh.records[rec_id];
732 lock_guard g(rec.lock);
733 int e = 0;
734 switch (arg.op) {
735 #if 0
736 case 'I':
737 e = op_insert(rec);
738 break;
739 case 'D':
740 e = op_delete(rec);
741 break;
742 case 'U':
743 e = op_update(rec);
744 break;
745 #endif
746 case 'T':
747 e = op_delins(rec);
748 break;
749 case 'R':
750 e = op_read(rec);
751 break;
752 default:
753 break;
754 }
755 }
756 }
757
758 int
op_delins(record_value & rec)759 hs_longrun_thread_my::op_delins(record_value& rec)
760 {
761 const std::string k = rec.key;
762 const std::string v1 = "div1_" + k + "_" + to_stdstring(arg.id);
763 const std::string v2 = "div2_" + k + "_" + to_stdstring(arg.id);
764 const std::string v3 = "div3_" + k + "_" + to_stdstring(arg.id);
765 int success = 0;
766 bool cur_unknown_state = false;
767 do {
768 char query[1024];
769 #if 1
770 if (mysql_query(db, "begin") != 0) {
771 if (arg.sh.verbose >= 20) {
772 fprintf(stderr, "mysql: e=[%s] q=[%s]\n", mysql_error(db), "begin");
773 }
774 break;
775 }
776 #endif
777 cur_unknown_state = true;
778 snprintf(query, 1024,
779 "delete from hstesttbl where k = '%s'", k.c_str());
780 if (mysql_query(db, query) != 0) {
781 if (arg.sh.verbose >= 20) {
782 fprintf(stderr, "mysql: e=[%s] q=[%s]\n", mysql_error(db), query);
783 }
784 break;
785 }
786 if (mysql_affected_rows(db) != 1) {
787 if (arg.sh.verbose >= 20) {
788 fprintf(stderr, "mysql: notfound: [%s]\n", query);
789 }
790 break;
791 }
792 snprintf(query, 1024,
793 "insert into hstesttbl values ('%s', '%s', '%s', '%s')",
794 k.c_str(), v1.c_str(), v2.c_str(), v3.c_str());
795 if (mysql_query(db, query) != 0) {
796 if (arg.sh.verbose >= 20) {
797 fprintf(stderr, "mysql: e=[%s] q=[%s]\n", mysql_error(db), query);
798 }
799 break;
800 }
801 #if 1
802 if (mysql_query(db, "commit") != 0) {
803 if (arg.sh.verbose >= 20) {
804 fprintf(stderr, "mysql: e=[%s] q=[%s]\n", mysql_error(db), "commit");
805 }
806 break;
807 }
808 #endif
809 success = true;
810 cur_unknown_state = false;
811 } while (false);
812 return verify_update(k, v1, v2, v3, rec, (success != 0), cur_unknown_state);
813 }
814
815 int
op_read(record_value & rec)816 hs_longrun_thread_my::op_read(record_value& rec)
817 {
818 const std::string k = rec.key;
819 char query[1024] = { 0 };
820 const int len = snprintf(query, 1024,
821 "select k,v1,v2,v3 from hstesttbl where k='%s'", k.c_str());
822 const int r = mysql_real_query(db, query, len > 0 ? len : 0);
823 if (r != 0) {
824 show_mysql_error(query, 0);
825 return 1;
826 }
827 MYSQL_ROW row = 0;
828 unsigned long *lengths = 0;
829 unsigned int num_rows = 0;
830 unsigned int num_flds = 0;
831 auto_mysql_res res(db);
832 std::string rrec[4];
833 if (res != 0) {
834 num_flds = mysql_num_fields(res);
835 row = mysql_fetch_row(res);
836 if (row != 0) {
837 lengths = mysql_fetch_lengths(res);
838 if (num_flds == 4) {
839 for (int i = 0; i < 4; ++i) {
840 rrec[i] = std::string(row[i], lengths[i]);
841 }
842 }
843 ++num_rows;
844 row = mysql_fetch_row(res);
845 if (row != 0) {
846 ++num_rows;
847 }
848 }
849 }
850 return verify_read(k, num_rows, num_flds, rrec, rec);
851 }
852
853 void
show_mysql_error(const char * mess,record_value * rec)854 hs_longrun_thread_my::show_mysql_error(const char *mess, record_value *rec)
855 {
856 ++stat.runtime_error_count;
857 if (arg.sh.verbose > 0) {
858 fprintf(stderr, "RUNTIME_ERROR: op=%c wid=%d [%s]: %s\n",
859 arg.op, arg.id, mess, mysql_error(db));
860 }
861 if (rec) {
862 rec->unknown_state = true;
863 }
864 db.reset();
865 connected = false;
866 }
867
868 void
mysql_do(MYSQL * db,const char * query)869 mysql_do(MYSQL *db, const char *query)
870 {
871 if (mysql_real_query(db, query, strlen(query)) != 0) {
872 fprintf(stderr, "mysql: e=[%s] q=[%s]\n", mysql_error(db), query);
873 fatal_abort("mysql_do");
874 }
875 }
876
877 void
hs_longrun_init_table(const config & conf,int num_prepare,hs_longrun_shared & shared)878 hs_longrun_init_table(const config& conf, int num_prepare,
879 hs_longrun_shared& shared)
880 {
881 const std::string mysql_host = conf.get_str("host", "localhost");
882 const std::string mysql_user = conf.get_str("mysqluser", "root");
883 const std::string mysql_passwd = conf.get_str("mysqlpass", "");
884 const std::string mysql_dbname = "";
885 auto_mysql db;
886 if (!mysql_real_connect(db, mysql_host.c_str(), mysql_user.c_str(),
887 mysql_passwd.c_str(), mysql_dbname.c_str(), mysql_port, 0, 0)) {
888 fprintf(stderr, "mysql: error=[%s]\n", mysql_error(db));
889 fatal_abort("hs_longrun_init_table");
890 }
891 mysql_do(db, "drop database if exists hstestdb");
892 mysql_do(db, "create database hstestdb");
893 mysql_do(db, "use hstestdb");
894 mysql_do(db,
895 "create table hstesttbl ("
896 "k int primary key,"
897 "v1 varchar(32) not null,"
898 "v2 varchar(32) not null,"
899 "v3 varchar(32) not null"
900 ") character set utf8 collate utf8_bin engine = innodb");
901 for (int i = 0; i < num_prepare; ++i) {
902 const std::string i_str = to_stdstring(i);
903 const std::string v1 = "pv1_" + i_str;
904 const std::string v2 = "pv2_" + i_str;
905 const std::string v3 = "pv3_" + i_str;
906 char buf[1024];
907 snprintf(buf, 1024, "insert into hstesttbl(k, v1, v2, v3) values"
908 "(%d, '%s', '%s', '%s')", i, v1.c_str(), v2.c_str(), v3.c_str());
909 mysql_do(db, buf);
910 record_value *rec = shared.records[i];
911 rec->key = i_str;
912 rec->values.resize(4);
913 rec->values[0] = i_str;
914 rec->values[1] = v1;
915 rec->values[2] = v2;
916 rec->values[3] = v3;
917 rec->deleted = false;
918 }
919 }
920
921 int
hs_longrun_main(int argc,char ** argv)922 hs_longrun_main(int argc, char **argv)
923 {
924 hs_longrun_shared shared;
925 parse_args(argc, argv, shared.conf);
926 shared.conf["host"] = shared.conf.get_str("host", "localhost");
927 shared.verbose = shared.conf.get_int("verbose", 1);
928 const int table_size = shared.conf.get_int("table_size", 10000);
929 for (int i = 0; i < table_size; ++i) {
930 std::auto_ptr<record_value> rec(new record_value());
931 rec->key = to_stdstring(i);
932 shared.records.push_back_ptr(rec);
933 }
934 mysql_library_init(0, 0, 0);
935 const int duration = shared.conf.get_int("duration", 10);
936 const int num_hsinsert = shared.conf.get_int("num_hsinsert", 10);
937 const int num_hsdelete = shared.conf.get_int("num_hsdelete", 10);
938 const int num_hsupdate = shared.conf.get_int("num_hsupdate", 10);
939 const int num_hsread = shared.conf.get_int("num_hsread", 10);
940 const int num_myread = shared.conf.get_int("num_myread", 10);
941 const int num_mydelins = shared.conf.get_int("num_mydelins", 10);
942 int num_hsreadnolock = shared.conf.get_int("num_hsreadnolock", 10);
943 const bool always_filled = (num_hsinsert == 0 && num_hsdelete == 0);
944 if (!always_filled) {
945 num_hsreadnolock = 0;
946 }
947 hs_longrun_init_table(shared.conf, always_filled ? table_size : 0,
948 shared);
949 /* create worker threads */
950 static const struct thrtmpl_type {
951 const char *type; char op; int num; int hs; int lock;
952 } thrtmpl[] = {
953 { "hsinsert", 'I', num_hsinsert, 1, 1 },
954 { "hsdelete", 'D', num_hsdelete, 1, 1 },
955 { "hsupdate", 'U', num_hsupdate, 1, 1 },
956 { "hsread", 'R', num_hsread, 1, 1 },
957 { "hsreadnolock", 'N', num_hsreadnolock, 1, 0 },
958 { "myread", 'R', num_myread, 0, 1 },
959 { "mydelins", 'T', num_mydelins, 0, 1 },
960 };
961 typedef auto_ptrcontainer< std::vector<hs_longrun_thread_base *> > thrs_type;
962 thrs_type thrs;
963 for (size_t i = 0; i < sizeof(thrtmpl)/sizeof(thrtmpl[0]); ++i) {
964 const thrtmpl_type& e = thrtmpl[i];
965 for (int j = 0; j < e.num; ++j) {
966 int id = thrs.size();
967 const hs_longrun_thread_hs::arg_type arg(id, e.type, e.op, e.lock,
968 shared);
969 std::auto_ptr<hs_longrun_thread_base> thr;
970 if (e.hs) {
971 thr.reset(new hs_longrun_thread_hs(arg));
972 } else {
973 thr.reset(new hs_longrun_thread_my(arg));
974 }
975 thrs.push_back_ptr(thr);
976 }
977 }
978 shared.num_threads = thrs.size();
979 /* start threads */
980 fprintf(stderr, "START\n");
981 shared.running = 1;
982 for (size_t i = 0; i < thrs.size(); ++i) {
983 thrs[i]->start();
984 }
985 /* wait */
986 sleep(duration);
987 /* stop thread */
988 shared.running = 0;
989 for (size_t i = 0; i < thrs.size(); ++i) {
990 thrs[i]->join();
991 }
992 fprintf(stderr, "DONE\n");
993 /* summary */
994 typedef std::map<std::string, hs_longrun_stat> stat_map;
995 stat_map sm;
996 for (size_t i = 0; i < thrs.size(); ++i) {
997 hs_longrun_thread_base *const thr = thrs[i];
998 const std::string wt = thr->arg.worker_type;
999 hs_longrun_stat& v = sm[wt];
1000 v.add(thr->stat);
1001 }
1002 hs_longrun_stat total;
1003 for (stat_map::const_iterator i = sm.begin(); i != sm.end(); ++i) {
1004 if (i->second.verify_error_count != 0) {
1005 fprintf(stderr, "%s verify_error %llu\n", i->first.c_str(),
1006 i->second.verify_error_count);
1007 }
1008 if (i->second.runtime_error_count) {
1009 fprintf(stderr, "%s runtime_error %llu\n", i->first.c_str(),
1010 i->second.runtime_error_count);
1011 }
1012 if (i->second.unknown_count) {
1013 fprintf(stderr, "%s unknown %llu\n", i->first.c_str(),
1014 i->second.unknown_count);
1015 }
1016 fprintf(stderr, "%s success %llu\n", i->first.c_str(),
1017 i->second.success_count);
1018 total.add(i->second);
1019 }
1020 if (total.verify_error_count != 0) {
1021 fprintf(stderr, "TOTAL verify_error %llu\n", total.verify_error_count);
1022 }
1023 if (total.runtime_error_count != 0) {
1024 fprintf(stderr, "TOTAL runtime_error %llu\n", total.runtime_error_count);
1025 }
1026 if (total.unknown_count != 0) {
1027 fprintf(stderr, "TOTAL unknown %llu\n", total.unknown_count);
1028 }
1029 fprintf(stderr, "TOTAL success %llu\n", total.success_count);
1030 mysql_library_end();
1031 return 0;
1032 }
1033
1034 };
1035
1036 int
main(int argc,char ** argv)1037 main(int argc, char **argv)
1038 {
1039 return dena::hs_longrun_main(argc, argv);
1040 }
1041
1042