/*
   Copyright (c) 2004, 2021, Oracle and/or its affiliates.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License, version 2.0,
   as published by the Free Software Foundation.

   This program is also distributed with certain software (including
   but not limited to OpenSSL) that is licensed under separate terms,
   as designated in a particular file or component or in included license
   documentation.  The authors of MySQL hereby grant you an additional
   permission to link the program and your derivative works with the
   separately licensed software that they have included with MySQL.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License, version 2.0, for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
*/
24
25 #include <NDBT.hpp>
26 #include <NdbApi.hpp>
27 #include <NdbRestarter.hpp>
28 #include <HugoOperations.hpp>
29 #include <HugoTransactions.hpp>
30 #include <UtilTransactions.hpp>
31 #include <signaldata/DumpStateOrd.hpp>
32
33 #include <getarg.h>
34 #include <InputStream.hpp>
35
/**
 * Scripted scenario for a single row.
 *
 * Field order matters: g_op_types[] is filled with positional aggregate
 * initializers.
 */
struct CASE
{
  bool start_row;    // load_table() inserts the row up front when true
  bool end_row;      // validate() requires curr_row to equal this afterwards
  bool curr_row;     // running "row exists" state kept by load_table()/do_op()
  const char *op1;   // first operation name: "INS", "UPD" or "DEL"
  const char *op2;   // second operation name, 0 when the scenario has none
  const char *op3;   // third operation name, 0 when the scenario has none
  int val;           // value last handed to pkInsertRecord()/pkUpdateRecord()
};
46
47 static CASE g_op_types[] =
48 {
49 { false, true, false, "INS", 0, 0, 0 }, // 0x001 a
50 { true, true, false, "UPD", 0, 0, 0 }, // 0x002 d
51 { true, false, false, "DEL", 0, 0, 0 }, // 0x004 g
52
53 { false, true, false, "INS", "UPD", 0, 0 }, // 0x008 b
54 { false, false, false, "INS", "DEL", 0, 0 }, // 0x010 c
55 { true, true, false, "UPD", "UPD", 0, 0 }, // 0x020 e
56 { true, false, false, "UPD", "DEL", 0, 0 }, // 0x040 f
57 { true, true, false, "DEL", "INS", 0, 0 }, // 0x080 h
58
59 { false, true, false, "INS", "DEL", "INS", 0 }, // 0x100 i
60 { true, false, false, "DEL", "INS", "DEL", 0 } // 0x200 j
61 };
62 const size_t OP_COUNT = (sizeof(g_op_types)/sizeof(g_op_types[0]));
63
64 static Ndb* g_ndb = 0;
65 static CASE* g_ops;
66 static Ndb_cluster_connection *g_cluster_connection= 0;
67 static HugoOperations* g_hugo_ops;
68 static int g_use_ops = 1 | 2 | 4;
69 static int g_cases = 0x1;
70 static int g_case_loop = 2;
71 static int g_rows = 10;
72 static int g_setup_tables = 1;
73 static int g_one_op_at_a_time = 0;
74 static const char * g_tablename = "T1";
75 static const NdbDictionary::Table* g_table = 0;
76 static NdbRestarter g_restarter;
77
/*
 * Forward declarations of the test steps.  Each returns 0 on success; main()
 * wraps most of the calls in require(!...).
 */

/* start-up */
static int init_ndb(int argc, char **argv);
static int parse_args(int argc, char **argv);
static int connect_ndb();
static int drop_all_tables();

/* steps of one test round */
static int load_table();
static int pause_lcp(int error);
static int do_op(int row);
static int continue_lcp(int error = 0);
static int commit();
static int restart();
static int validate();
89
90 int
main(int argc,char ** argv)91 main(int argc, char ** argv){
92 ndb_init();
93 require(!init_ndb(argc, argv));
94 if(parse_args(argc, argv))
95 return -1;
96 require(!connect_ndb());
97
98 if(g_setup_tables){
99 require(!drop_all_tables());
100
101 if(NDBT_Tables::createTable(g_ndb, g_tablename) != 0){
102 exit(-1);
103 }
104 }
105
106 g_table = g_ndb->getDictionary()->getTable(g_tablename);
107 if(g_table == 0){
108 g_err << "Failed to retreive table: " << g_tablename << endl;
109 exit(-1);
110 }
111 require((g_hugo_ops = new HugoOperations(* g_table)) != 0);
112 require(!g_hugo_ops->startTransaction(g_ndb));
113
114 g_ops= new CASE[g_rows];
115
116 const int use_ops = g_use_ops;
117 for(size_t i = 0; i<OP_COUNT; i++)
118 {
119 if(g_one_op_at_a_time){
120 while(i < OP_COUNT && (use_ops & (1 << i)) == 0) i++;
121 if(i == OP_COUNT)
122 break;
123 ndbout_c("-- loop\noperation: %c use_ops: %x", int('a'+i), use_ops);
124 g_use_ops = (1 << i);
125 } else {
126 i = OP_COUNT - 1;
127 }
128
129 size_t test_case = 0;
130 if((1 << test_case++) & g_cases)
131 {
132 for(size_t tl = 0; tl<(size_t)g_case_loop; tl++){
133 g_info << "Performing all ops wo/ inteference of LCP" << endl;
134
135 g_info << "Testing pre LCP operations, ZLCP_OP_WRITE_RT_BREAK" << endl;
136 g_info << " where ZLCP_OP_WRITE_RT_BREAK is "
137 " finished before SAVE_PAGES" << endl;
138 require(!load_table());
139 require(!pause_lcp(5900));
140 for(int j = 0; j < g_rows; j++){
141 require(!do_op(j));
142 }
143 require(!continue_lcp(5900));
144 require(!commit());
145 require(!pause_lcp(5900));
146 require(!restart());
147 require(!validate());
148 }
149 }
150
151 if((1 << test_case++) & g_cases)
152 {
153 for(int tl = 0; tl<g_case_loop; tl++){
154 g_info << "Testing pre LCP operations, ZLCP_OP_WRITE_RT_BREAK" << endl;
155 g_info << " where ZLCP_OP_WRITE_RT_BREAK is finished after SAVE_PAGES"
156 << endl;
157 require(!load_table());
158 require(!pause_lcp(5901));
159 for(int j = 0; j < g_rows; j++){
160 require(!do_op(j));
161 }
162 require(!continue_lcp(5901));
163 require(!commit());
164 require(!pause_lcp(5900));
165 require(!restart());
166 require(!validate());
167 }
168 }
169
170 if((1 << test_case++) & g_cases)
171 {
172 for(int tl = 0; tl<g_case_loop; tl++){
173 g_info << "Testing pre LCP operations, undo-ed at commit" << endl;
174 require(!load_table());
175 require(!pause_lcp(5902));
176 for(int j = 0; j < g_rows; j++){
177 require(!do_op(j));
178 }
179 require(!continue_lcp(5902));
180 require(!commit());
181 require(!continue_lcp(5903));
182 require(!pause_lcp(5900));
183 require(!restart());
184 require(!validate());
185 }
186 }
187
188 if((1 << test_case++) & g_cases)
189 {
190 for(int tl = 0; tl < g_case_loop; tl++){
191 g_info << "Testing prepared during LCP and committed after" << endl;
192 require(!load_table());
193 require(!pause_lcp(5904)); // Start LCP, but don't save pages
194 for(int j = 0; j < g_rows; j++){
195 require(!do_op(j));
196 }
197 require(!continue_lcp(5904)); // Start ACC save pages
198 require(!pause_lcp(5900)); // Next LCP
199 require(!commit());
200 require(!restart());
201 require(!validate());
202 }
203 }
204 }
205 }
206
init_ndb(int argc,char ** argv)207 static int init_ndb(int argc, char** argv)
208 {
209 ndb_init();
210 return 0;
211 }
212
parse_args(int argc,char ** argv)213 static int parse_args(int argc, char** argv)
214 {
215 size_t i;
216 char * ops= 0, *cases=0;
217 struct getargs args[] = {
218 { "records", 0, arg_integer, &g_rows, "Number of records", "records" },
219 { "operations", 'o', arg_string, &ops, "Operations [a-h]", 0 },
220 { "1", '1', arg_flag, &g_one_op_at_a_time, "One op at a time", 0 },
221 { "0", '0', arg_negative_flag, &g_one_op_at_a_time, "All ops at once", 0 },
222 { "cases", 'c', arg_string, &cases, "Cases [a-c]", 0 },
223 { 0, 't', arg_flag, &g_setup_tables, "Create table", 0 },
224 { 0, 'u', arg_negative_flag, &g_setup_tables, "Dont create table", 0 }
225 };
226
227 int optind= 0;
228 const int num_args = sizeof(args)/sizeof(args[0]);
229 if(getarg(args, num_args, argc, (const char**)argv, &optind)) {
230 arg_printusage(args, num_args, argv[0], " tabname1\n");
231 ndbout_c("\n -- Operations [a-%c] = ", int('a'+OP_COUNT-1));
232 for(i = 0; i<OP_COUNT; i++){
233 ndbout_c("\t%c = %s %s",
234 int('a'+i), g_op_types[i].op1,
235 g_op_types[i].op2 ? g_op_types[i].op2 : "");
236 }
237 return -1;
238 }
239
240 if(ops != 0){
241 g_use_ops = 0;
242 char * s = ops;
243 while(* s)
244 g_use_ops |= (1 << ((* s++) - 'a'));
245 }
246
247 if(cases != 0){
248 g_cases = 0;
249 char * s = cases;
250 while(* s)
251 g_cases |= (1 << ((* s++) - 'a'));
252 }
253
254 ndbout_c("table: %s", g_tablename);
255 printf("operations: ");
256 for(i = 0; i<OP_COUNT; i++)
257 if(g_use_ops & (1 << i))
258 printf("%c", int('a'+i));
259 printf("\n");
260
261 printf("test cases: ");
262 for(i = 0; i<3; i++)
263 if(g_cases & (1 << i))
264 printf("%c", int('1'+i));
265 printf("\n");
266 printf("-------------\n");
267 return 0;
268 }
269
connect_ndb()270 static int connect_ndb()
271 {
272 g_cluster_connection = new Ndb_cluster_connection();
273 if(g_cluster_connection->connect(12, 5, 1) != 0)
274 {
275 return 1;
276 }
277
278 g_ndb = new Ndb(g_cluster_connection, "TEST_DB");
279 g_ndb->init(256);
280 if(g_ndb->waitUntilReady(30) == 0){
281 return 0;
282 // int args[] = { DumpStateOrd::DihMaxTimeBetweenLCP };
283 // return g_restarter.dumpStateAllNodes(args, 1);
284 }
285 return -1;
286 }
287
disconnect_ndb()288 static int disconnect_ndb()
289 {
290 delete g_ndb;
291 delete g_cluster_connection;
292 g_ndb = 0;
293 g_table = 0;
294 g_cluster_connection= 0;
295 return 0;
296 }
297
drop_all_tables()298 static int drop_all_tables()
299 {
300 NdbDictionary::Dictionary * dict = g_ndb->getDictionary();
301 require(dict);
302
303 BaseString db = g_ndb->getDatabaseName();
304 BaseString schema = g_ndb->getSchemaName();
305
306 NdbDictionary::Dictionary::List list;
307 if (dict->listObjects(list, NdbDictionary::Object::TypeUndefined) == -1){
308 g_err << "Failed to list tables: " << endl
309 << dict->getNdbError() << endl;
310 return -1;
311 }
312 for (unsigned i = 0; i < list.count; i++) {
313 NdbDictionary::Dictionary::List::Element& elt = list.elements[i];
314 switch (elt.type) {
315 case NdbDictionary::Object::SystemTable:
316 case NdbDictionary::Object::UserTable:
317 g_ndb->setDatabaseName(elt.database);
318 g_ndb->setSchemaName(elt.schema);
319 if(dict->dropTable(elt.name) != 0){
320 g_err << "Failed to drop table: "
321 << elt.database << "/" << elt.schema << "/" << elt.name <<endl;
322 g_err << dict->getNdbError() << endl;
323 return -1;
324 }
325 break;
326 case NdbDictionary::Object::UniqueHashIndex:
327 case NdbDictionary::Object::OrderedIndex:
328 case NdbDictionary::Object::HashIndexTrigger:
329 case NdbDictionary::Object::IndexTrigger:
330 case NdbDictionary::Object::SubscriptionTrigger:
331 case NdbDictionary::Object::ReadOnlyConstraint:
332 default:
333 break;
334 }
335 }
336
337 g_ndb->setDatabaseName(db.c_str());
338 g_ndb->setSchemaName(schema.c_str());
339
340 return 0;
341 }
342
load_table()343 static int load_table()
344 {
345 UtilTransactions clear(* g_table);
346 require(!clear.clearTable(g_ndb));
347
348 HugoOperations ops(* g_table);
349 require(!ops.startTransaction(g_ndb));
350 size_t op = 0;
351 size_t rows = 0;
352 size_t uncommitted = 0;
353 //bool prepared = false;
354 for(int i = 0; i < g_rows; i++){
355 for(op %= OP_COUNT; !((1 << op) & g_use_ops); op = (op + 1) % OP_COUNT);
356 g_ops[i] = g_op_types[op++];
357 if(g_ops[i].start_row){
358 g_ops[i].curr_row = true;
359 g_ops[i].val = rand();
360 require(!ops.pkInsertRecord(g_ndb, i, 1, g_ops[i].val));
361 uncommitted++;
362 } else {
363 g_ops[i].curr_row = false;
364 }
365 if(uncommitted >= 100){
366 require(!ops.execute_Commit(g_ndb));
367 require(!ops.getTransaction()->restart());
368 rows += uncommitted;
369 uncommitted = 0;
370 }
371 }
372 if(uncommitted)
373 require(!ops.execute_Commit(g_ndb));
374
375 require(!ops.closeTransaction(g_ndb));
376 rows += uncommitted;
377 g_info << "Inserted " << rows << " rows" << endl;
378 return 0;
379 }
380
pause_lcp(int error)381 static int pause_lcp(int error)
382 {
383 int nodes = g_restarter.getNumDbNodes();
384
385 int filter[] = { 15, NDB_MGM_EVENT_CATEGORY_INFO, 0 };
386
387 NDB_SOCKET_TYPE my_fd;
388 #ifdef NDB_WIN
389 SOCKET fd= ndb_mgm_listen_event(g_restarter.handle, filter);
390 my_fd.s= fd;
391 #else
392 int fd = ndb_mgm_listen_event(g_restarter.handle, filter);
393 my_fd.fd= fd;
394 #endif
395
396 require(my_socket_valid(my_fd));
397 require(!g_restarter.insertErrorInAllNodes(error));
398 int dump[] = { DumpStateOrd::DihStartLcpImmediately };
399 require(!g_restarter.dumpStateAllNodes(dump, 1));
400
401 char *tmp;
402 char buf[1024];
403 SocketInputStream in(my_fd, 1000);
404 int count = 0;
405 do {
406 tmp = in.gets(buf, 1024);
407 if(tmp)
408 {
409 int id;
410 if(sscanf(tmp, "%*[^:]: LCP: %d ", &id) == 1 && id == error &&
411 --nodes == 0){
412 my_socket_close(my_fd);
413 return 0;
414 }
415 }
416 } while(count++ < 30);
417
418 my_socket_close(my_fd);
419 return -1;
420 }
421
do_op(int row)422 static int do_op(int row)
423 {
424 HugoOperations & ops = * g_hugo_ops;
425 if(strcmp(g_ops[row].op1, "INS") == 0){
426 require(!g_ops[row].curr_row);
427 g_ops[row].curr_row = true;
428 g_ops[row].val = rand();
429 require(!ops.pkInsertRecord(g_ndb, row, 1, g_ops[row].val));
430 } else if(strcmp(g_ops[row].op1, "UPD") == 0){
431 require(g_ops[row].curr_row);
432 g_ops[row].val = rand();
433 require(!ops.pkUpdateRecord(g_ndb, row, 1, g_ops[row].val));
434 } else if(strcmp(g_ops[row].op1, "DEL") == 0){
435 require(g_ops[row].curr_row);
436 g_ops[row].curr_row = false;
437 require(!ops.pkDeleteRecord(g_ndb, row, 1));
438 }
439
440 require(!ops.execute_NoCommit(g_ndb));
441
442 if(g_ops[row].op2 == 0){
443 } else if(strcmp(g_ops[row].op2, "INS") == 0){
444 require(!g_ops[row].curr_row);
445 g_ops[row].curr_row = true;
446 g_ops[row].val = rand();
447 require(!ops.pkInsertRecord(g_ndb, row, 1, g_ops[row].val));
448 } else if(strcmp(g_ops[row].op2, "UPD") == 0){
449 require(g_ops[row].curr_row);
450 g_ops[row].val = rand();
451 require(!ops.pkUpdateRecord(g_ndb, row, 1, g_ops[row].val));
452 } else if(strcmp(g_ops[row].op2, "DEL") == 0){
453 require(g_ops[row].curr_row);
454 g_ops[row].curr_row = false;
455 require(!ops.pkDeleteRecord(g_ndb, row, 1));
456 }
457
458 if(g_ops[row].op2 != 0)
459 require(!ops.execute_NoCommit(g_ndb));
460
461 if(g_ops[row].op3 == 0){
462 } else if(strcmp(g_ops[row].op3, "INS") == 0){
463 require(!g_ops[row].curr_row);
464 g_ops[row].curr_row = true;
465 g_ops[row].val = rand();
466 require(!ops.pkInsertRecord(g_ndb, row, 1, g_ops[row].val));
467 } else if(strcmp(g_ops[row].op3, "UPD") == 0){
468 require(g_ops[row].curr_row);
469 g_ops[row].val = rand();
470 require(!ops.pkUpdateRecord(g_ndb, row, 1, g_ops[row].val));
471 } else if(strcmp(g_ops[row].op3, "DEL") == 0){
472 require(g_ops[row].curr_row);
473 g_ops[row].curr_row = false;
474 require(!ops.pkDeleteRecord(g_ndb, row, 1));
475 }
476
477 if(g_ops[row].op3 != 0)
478 require(!ops.execute_NoCommit(g_ndb));
479
480 return 0;
481 }
482
continue_lcp(int error)483 static int continue_lcp(int error)
484 {
485 int filter[] = { 15, NDB_MGM_EVENT_CATEGORY_INFO, 0 };
486 NDB_SOCKET_TYPE my_fd;
487 my_socket_invalidate(&my_fd);
488 #ifdef NDB_WIN
489 SOCKET fd;
490 #else
491 int fd;
492 #endif
493
494 if(error){
495 fd = ndb_mgm_listen_event(g_restarter.handle, filter);
496 #ifdef NDB_WIN
497 my_fd.s= fd;
498 #else
499 my_fd.fd= fd;
500 #endif
501 require(my_socket_valid(my_fd));
502 }
503
504 int args[] = { DumpStateOrd::LCPContinue };
505 if(g_restarter.dumpStateAllNodes(args, 1) != 0)
506 return -1;
507
508 if(error){
509 char *tmp;
510 char buf[1024];
511 SocketInputStream in(my_fd, 1000);
512 int count = 0;
513 int nodes = g_restarter.getNumDbNodes();
514 do {
515 tmp = in.gets(buf, 1024);
516 if(tmp)
517 {
518 int id;
519 if(sscanf(tmp, "%*[^:]: LCP: %d ", &id) == 1 && id == error &&
520 --nodes == 0){
521 my_socket_close(my_fd);
522 return 0;
523 }
524 }
525 } while(count++ < 30);
526
527 my_socket_close(my_fd);
528 }
529 return 0;
530 }
531
commit()532 static int commit()
533 {
534 HugoOperations & ops = * g_hugo_ops;
535 int res = ops.execute_Commit(g_ndb);
536 if(res == 0){
537 return ops.getTransaction()->restart();
538 }
539 return res;
540 }
541
restart()542 static int restart()
543 {
544 g_info << "Restarting cluster" << endl;
545 g_hugo_ops->closeTransaction(g_ndb);
546 disconnect_ndb();
547 delete g_hugo_ops;
548
549 require(!g_restarter.restartAll());
550 require(!g_restarter.waitClusterStarted(30));
551 require(!connect_ndb());
552
553 g_table = g_ndb->getDictionary()->getTable(g_tablename);
554 require(g_table);
555 require((g_hugo_ops = new HugoOperations(* g_table)) != 0);
556 require(!g_hugo_ops->startTransaction(g_ndb));
557 return 0;
558 }
559
validate()560 static int validate()
561 {
562 HugoOperations ops(* g_table);
563 for(int i = 0; i < g_rows; i++){
564 require(g_ops[i].curr_row == g_ops[i].end_row);
565 require(!ops.startTransaction(g_ndb));
566 ops.pkReadRecord(g_ndb, i, 1);
567 int res = ops.execute_Commit(g_ndb);
568 if(g_ops[i].curr_row){
569 require(res == 0 && ops.verifyUpdatesValue(g_ops[i].val) == 0);
570 } else {
571 require(res == 626);
572 }
573 ops.closeTransaction(g_ndb);
574 }
575
576 for(int j = 0; j<10; j++){
577 UtilTransactions clear(* g_table);
578 require(!clear.clearTable(g_ndb));
579
580 HugoTransactions trans(* g_table);
581 require(trans.loadTable(g_ndb, 1024) == 0);
582 }
583 return 0;
584 }
585
586