1 /*
2    Copyright (c) 2004, 2021, Oracle and/or its affiliates.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 #include <NDBT.hpp>
26 #include <NdbApi.hpp>
27 #include <NdbRestarter.hpp>
28 #include <HugoOperations.hpp>
29 #include <HugoTransactions.hpp>
30 #include <UtilTransactions.hpp>
31 #include <signaldata/DumpStateOrd.hpp>
32 
33 #include <getarg.h>
34 #include <InputStream.hpp>
35 
/**
 * Scenario for a single row: up to three operations that are applied to the
 * row inside one transaction, plus the bookkeeping used to check the result.
 */
struct CASE
{
  bool start_row;    // load_table() inserts the row up front when this is set
  bool end_row;      // validate() requires curr_row to equal this afterwards
  bool curr_row;     // run-time tracking of whether the row currently exists
  const char * op1;  // first operation: "INS", "UPD" or "DEL"
  const char * op2;  // second operation, or 0 when the sequence has only one
  const char * op3;  // third operation, or 0 when the sequence has at most two
  int val;           // value handed to the most recent insert/update of the row
};
46 
47 static CASE g_op_types[] =
48 {
49   { false, true,  false, "INS", 0,     0,     0 }, // 0x001 a
50   { true,  true,  false, "UPD", 0,     0,     0 }, // 0x002 d
51   { true,  false, false, "DEL", 0,     0,     0 }, // 0x004 g
52 
53   { false, true,  false, "INS", "UPD", 0,     0 }, // 0x008 b
54   { false, false, false, "INS", "DEL", 0,     0 }, // 0x010 c
55   { true,  true,  false, "UPD", "UPD", 0,     0 }, // 0x020 e
56   { true,  false, false, "UPD", "DEL", 0,     0 }, // 0x040 f
57   { true,  true,  false, "DEL", "INS", 0,     0 }, // 0x080 h
58 
59   { false, true,  false, "INS", "DEL", "INS", 0 }, // 0x100 i
60   { true,  false, false, "DEL", "INS", "DEL", 0 }  // 0x200 j
61 };
62 const size_t OP_COUNT = (sizeof(g_op_types)/sizeof(g_op_types[0]));
63 
64 static Ndb* g_ndb = 0;
65 static CASE* g_ops;
66 static Ndb_cluster_connection *g_cluster_connection= 0;
67 static HugoOperations* g_hugo_ops;
68 static int g_use_ops = 1 | 2 | 4;
69 static int g_cases = 0x1;
70 static int g_case_loop = 2;
71 static int g_rows = 10;
72 static int g_setup_tables = 1;
73 static int g_one_op_at_a_time = 0;
74 static const char * g_tablename = "T1";
75 static const NdbDictionary::Table* g_table = 0;
76 static NdbRestarter g_restarter;
77 
78 static int init_ndb(int argc, char** argv);
79 static int parse_args(int argc, char** argv);
80 static int connect_ndb();
81 static int drop_all_tables();
82 static int load_table();
83 static int pause_lcp(int error);
84 static int do_op(int row);
85 static int continue_lcp(int error = 0);
86 static int commit();
87 static int restart();
88 static int validate();
89 
90 int
main(int argc,char ** argv)91 main(int argc, char ** argv){
92   ndb_init();
93   require(!init_ndb(argc, argv));
94   if(parse_args(argc, argv))
95     return -1;
96   require(!connect_ndb());
97 
98   if(g_setup_tables){
99     require(!drop_all_tables());
100 
101     if(NDBT_Tables::createTable(g_ndb, g_tablename) != 0){
102       exit(-1);
103     }
104   }
105 
106   g_table = g_ndb->getDictionary()->getTable(g_tablename);
107   if(g_table == 0){
108     g_err << "Failed to retreive table: " << g_tablename << endl;
109     exit(-1);
110   }
111   require((g_hugo_ops = new HugoOperations(* g_table)) != 0);
112   require(!g_hugo_ops->startTransaction(g_ndb));
113 
114   g_ops= new CASE[g_rows];
115 
116   const int use_ops = g_use_ops;
117   for(size_t i = 0; i<OP_COUNT; i++)
118   {
119     if(g_one_op_at_a_time){
120       while(i < OP_COUNT && (use_ops & (1 << i)) == 0) i++;
121       if(i == OP_COUNT)
122 	break;
123       ndbout_c("-- loop\noperation: %c use_ops: %x", int('a'+i), use_ops);
124       g_use_ops = (1 << i);
125     } else {
126       i = OP_COUNT - 1;
127     }
128 
129     size_t test_case = 0;
130     if((1 << test_case++) & g_cases)
131     {
132       for(size_t tl = 0; tl<(size_t)g_case_loop; tl++){
133 	g_info << "Performing all ops wo/ inteference of LCP" << endl;
134 
135 	g_info << "Testing pre LCP operations, ZLCP_OP_WRITE_RT_BREAK" << endl;
136 	g_info << "  where ZLCP_OP_WRITE_RT_BREAK is "
137 	  " finished before SAVE_PAGES" << endl;
138 	require(!load_table());
139 	require(!pause_lcp(5900));
140         for(int j = 0; j < g_rows; j++){
141 	  require(!do_op(j));
142 	}
143 	require(!continue_lcp(5900));
144 	require(!commit());
145 	require(!pause_lcp(5900));
146 	require(!restart());
147 	require(!validate());
148       }
149     }
150 
151     if((1 << test_case++) & g_cases)
152     {
153       for(int tl = 0; tl<g_case_loop; tl++){
154 	g_info << "Testing pre LCP operations, ZLCP_OP_WRITE_RT_BREAK" << endl;
155 	g_info << "  where ZLCP_OP_WRITE_RT_BREAK is finished after SAVE_PAGES"
156 	       << endl;
157 	require(!load_table());
158 	require(!pause_lcp(5901));
159         for(int j = 0; j < g_rows; j++){
160 	  require(!do_op(j));
161 	}
162 	require(!continue_lcp(5901));
163 	require(!commit());
164 	require(!pause_lcp(5900));
165 	require(!restart());
166 	require(!validate());
167       }
168     }
169 
170     if((1 << test_case++) & g_cases)
171     {
172       for(int tl = 0; tl<g_case_loop; tl++){
173 	g_info << "Testing pre LCP operations, undo-ed at commit" << endl;
174 	require(!load_table());
175 	require(!pause_lcp(5902));
176         for(int j = 0; j < g_rows; j++){
177 	  require(!do_op(j));
178 	}
179 	require(!continue_lcp(5902));
180 	require(!commit());
181 	require(!continue_lcp(5903));
182 	require(!pause_lcp(5900));
183 	require(!restart());
184 	require(!validate());
185       }
186     }
187 
188     if((1 << test_case++) & g_cases)
189     {
190       for(int tl = 0; tl < g_case_loop; tl++){
191 	g_info << "Testing prepared during LCP and committed after" << endl;
192 	require(!load_table());
193 	require(!pause_lcp(5904));    // Start LCP, but don't save pages
194         for(int j = 0; j < g_rows; j++){
195 	  require(!do_op(j));
196 	}
197 	require(!continue_lcp(5904)); // Start ACC save pages
198 	require(!pause_lcp(5900));    // Next LCP
199 	require(!commit());
200 	require(!restart());
201 	require(!validate());
202       }
203     }
204   }
205 }
206 
init_ndb(int argc,char ** argv)207 static int init_ndb(int argc, char** argv)
208 {
209   ndb_init();
210   return 0;
211 }
212 
parse_args(int argc,char ** argv)213 static int parse_args(int argc, char** argv)
214 {
215   size_t i;
216   char * ops= 0, *cases=0;
217   struct getargs args[] = {
218     { "records", 0, arg_integer, &g_rows, "Number of records", "records" },
219     { "operations", 'o', arg_string, &ops, "Operations [a-h]", 0 },
220     { "1", '1', arg_flag, &g_one_op_at_a_time, "One op at a time", 0 },
221     { "0", '0', arg_negative_flag, &g_one_op_at_a_time, "All ops at once", 0 },
222     { "cases", 'c', arg_string, &cases, "Cases [a-c]", 0 },
223     { 0, 't', arg_flag, &g_setup_tables, "Create table", 0 },
224     { 0, 'u', arg_negative_flag, &g_setup_tables, "Dont create table", 0 }
225   };
226 
227   int optind= 0;
228   const int num_args = sizeof(args)/sizeof(args[0]);
229   if(getarg(args, num_args, argc, (const char**)argv, &optind)) {
230     arg_printusage(args, num_args, argv[0], " tabname1\n");
231     ndbout_c("\n -- Operations [a-%c] = ", int('a'+OP_COUNT-1));
232     for(i = 0; i<OP_COUNT; i++){
233       ndbout_c("\t%c = %s %s",
234 	       int('a'+i), g_op_types[i].op1,
235 	       g_op_types[i].op2 ? g_op_types[i].op2 : "");
236     }
237     return -1;
238   }
239 
240   if(ops != 0){
241     g_use_ops = 0;
242     char * s = ops;
243     while(* s)
244       g_use_ops |= (1 << ((* s++) - 'a'));
245   }
246 
247   if(cases != 0){
248     g_cases = 0;
249     char * s = cases;
250     while(* s)
251       g_cases |= (1 << ((* s++) - 'a'));
252   }
253 
254   ndbout_c("table: %s", g_tablename);
255   printf("operations: ");
256   for(i = 0; i<OP_COUNT; i++)
257     if(g_use_ops & (1 << i))
258       printf("%c", int('a'+i));
259   printf("\n");
260 
261   printf("test cases: ");
262   for(i = 0; i<3; i++)
263     if(g_cases & (1 << i))
264       printf("%c", int('1'+i));
265   printf("\n");
266   printf("-------------\n");
267   return 0;
268 }
269 
connect_ndb()270 static int connect_ndb()
271 {
272   g_cluster_connection = new Ndb_cluster_connection();
273   if(g_cluster_connection->connect(12, 5, 1) != 0)
274   {
275     return 1;
276   }
277 
278   g_ndb = new Ndb(g_cluster_connection, "TEST_DB");
279   g_ndb->init(256);
280   if(g_ndb->waitUntilReady(30) == 0){
281     return 0;
282 //    int args[] = { DumpStateOrd::DihMaxTimeBetweenLCP };
283 //    return g_restarter.dumpStateAllNodes(args, 1);
284   }
285   return -1;
286 }
287 
disconnect_ndb()288 static int disconnect_ndb()
289 {
290   delete g_ndb;
291   delete g_cluster_connection;
292   g_ndb = 0;
293   g_table = 0;
294   g_cluster_connection= 0;
295   return 0;
296 }
297 
drop_all_tables()298 static int drop_all_tables()
299 {
300   NdbDictionary::Dictionary * dict = g_ndb->getDictionary();
301   require(dict);
302 
303   BaseString db = g_ndb->getDatabaseName();
304   BaseString schema = g_ndb->getSchemaName();
305 
306   NdbDictionary::Dictionary::List list;
307   if (dict->listObjects(list, NdbDictionary::Object::TypeUndefined) == -1){
308       g_err << "Failed to list tables: " << endl
309 	    << dict->getNdbError() << endl;
310       return -1;
311   }
312   for (unsigned i = 0; i < list.count; i++) {
313     NdbDictionary::Dictionary::List::Element& elt = list.elements[i];
314     switch (elt.type) {
315     case NdbDictionary::Object::SystemTable:
316     case NdbDictionary::Object::UserTable:
317       g_ndb->setDatabaseName(elt.database);
318       g_ndb->setSchemaName(elt.schema);
319       if(dict->dropTable(elt.name) != 0){
320 	g_err << "Failed to drop table: "
321 	      << elt.database << "/" << elt.schema << "/" << elt.name <<endl;
322 	g_err << dict->getNdbError() << endl;
323 	return -1;
324       }
325       break;
326     case NdbDictionary::Object::UniqueHashIndex:
327     case NdbDictionary::Object::OrderedIndex:
328     case NdbDictionary::Object::HashIndexTrigger:
329     case NdbDictionary::Object::IndexTrigger:
330     case NdbDictionary::Object::SubscriptionTrigger:
331     case NdbDictionary::Object::ReadOnlyConstraint:
332     default:
333       break;
334     }
335   }
336 
337   g_ndb->setDatabaseName(db.c_str());
338   g_ndb->setSchemaName(schema.c_str());
339 
340   return 0;
341 }
342 
load_table()343 static int load_table()
344 {
345   UtilTransactions clear(* g_table);
346   require(!clear.clearTable(g_ndb));
347 
348   HugoOperations ops(* g_table);
349   require(!ops.startTransaction(g_ndb));
350   size_t op = 0;
351   size_t rows = 0;
352   size_t uncommitted = 0;
353   //bool prepared = false;
354   for(int i = 0; i < g_rows; i++){
355     for(op %= OP_COUNT; !((1 << op) & g_use_ops); op = (op + 1) % OP_COUNT);
356     g_ops[i] = g_op_types[op++];
357     if(g_ops[i].start_row){
358       g_ops[i].curr_row = true;
359       g_ops[i].val = rand();
360       require(!ops.pkInsertRecord(g_ndb, i, 1, g_ops[i].val));
361       uncommitted++;
362     } else {
363       g_ops[i].curr_row = false;
364     }
365     if(uncommitted >= 100){
366       require(!ops.execute_Commit(g_ndb));
367       require(!ops.getTransaction()->restart());
368       rows += uncommitted;
369       uncommitted = 0;
370     }
371   }
372   if(uncommitted)
373     require(!ops.execute_Commit(g_ndb));
374 
375   require(!ops.closeTransaction(g_ndb));
376   rows += uncommitted;
377   g_info << "Inserted " << rows << " rows" << endl;
378   return 0;
379 }
380 
pause_lcp(int error)381 static int pause_lcp(int error)
382 {
383   int nodes = g_restarter.getNumDbNodes();
384 
385   int filter[] = { 15, NDB_MGM_EVENT_CATEGORY_INFO, 0 };
386 
387   NDB_SOCKET_TYPE my_fd;
388 #ifdef NDB_WIN
389   SOCKET fd= ndb_mgm_listen_event(g_restarter.handle, filter);
390   my_fd.s= fd;
391 #else
392   int fd = ndb_mgm_listen_event(g_restarter.handle, filter);
393   my_fd.fd= fd;
394 #endif
395 
396   require(my_socket_valid(my_fd));
397   require(!g_restarter.insertErrorInAllNodes(error));
398   int dump[] = { DumpStateOrd::DihStartLcpImmediately };
399   require(!g_restarter.dumpStateAllNodes(dump, 1));
400 
401   char *tmp;
402   char buf[1024];
403   SocketInputStream in(my_fd, 1000);
404   int count = 0;
405   do {
406     tmp = in.gets(buf, 1024);
407     if(tmp)
408     {
409       int id;
410       if(sscanf(tmp, "%*[^:]: LCP: %d ", &id) == 1 && id == error &&
411 	 --nodes == 0){
412 	my_socket_close(my_fd);
413 	return 0;
414       }
415     }
416   } while(count++ < 30);
417 
418   my_socket_close(my_fd);
419   return -1;
420 }
421 
do_op(int row)422 static int do_op(int row)
423 {
424   HugoOperations & ops = * g_hugo_ops;
425   if(strcmp(g_ops[row].op1, "INS") == 0){
426     require(!g_ops[row].curr_row);
427     g_ops[row].curr_row = true;
428     g_ops[row].val = rand();
429     require(!ops.pkInsertRecord(g_ndb, row, 1, g_ops[row].val));
430   } else if(strcmp(g_ops[row].op1, "UPD") == 0){
431     require(g_ops[row].curr_row);
432     g_ops[row].val = rand();
433     require(!ops.pkUpdateRecord(g_ndb, row, 1, g_ops[row].val));
434   } else if(strcmp(g_ops[row].op1, "DEL") == 0){
435     require(g_ops[row].curr_row);
436     g_ops[row].curr_row = false;
437     require(!ops.pkDeleteRecord(g_ndb, row, 1));
438   }
439 
440   require(!ops.execute_NoCommit(g_ndb));
441 
442   if(g_ops[row].op2 == 0){
443   } else if(strcmp(g_ops[row].op2, "INS") == 0){
444     require(!g_ops[row].curr_row);
445     g_ops[row].curr_row = true;
446     g_ops[row].val = rand();
447     require(!ops.pkInsertRecord(g_ndb, row, 1, g_ops[row].val));
448   } else if(strcmp(g_ops[row].op2, "UPD") == 0){
449     require(g_ops[row].curr_row);
450     g_ops[row].val = rand();
451     require(!ops.pkUpdateRecord(g_ndb, row, 1, g_ops[row].val));
452   } else if(strcmp(g_ops[row].op2, "DEL") == 0){
453     require(g_ops[row].curr_row);
454     g_ops[row].curr_row = false;
455     require(!ops.pkDeleteRecord(g_ndb, row, 1));
456   }
457 
458   if(g_ops[row].op2 != 0)
459     require(!ops.execute_NoCommit(g_ndb));
460 
461   if(g_ops[row].op3 == 0){
462   } else if(strcmp(g_ops[row].op3, "INS") == 0){
463     require(!g_ops[row].curr_row);
464     g_ops[row].curr_row = true;
465     g_ops[row].val = rand();
466     require(!ops.pkInsertRecord(g_ndb, row, 1, g_ops[row].val));
467   } else if(strcmp(g_ops[row].op3, "UPD") == 0){
468     require(g_ops[row].curr_row);
469     g_ops[row].val = rand();
470     require(!ops.pkUpdateRecord(g_ndb, row, 1, g_ops[row].val));
471   } else if(strcmp(g_ops[row].op3, "DEL") == 0){
472     require(g_ops[row].curr_row);
473     g_ops[row].curr_row = false;
474     require(!ops.pkDeleteRecord(g_ndb, row, 1));
475   }
476 
477   if(g_ops[row].op3 != 0)
478     require(!ops.execute_NoCommit(g_ndb));
479 
480   return 0;
481 }
482 
continue_lcp(int error)483 static int continue_lcp(int error)
484 {
485   int filter[] = { 15, NDB_MGM_EVENT_CATEGORY_INFO, 0 };
486   NDB_SOCKET_TYPE my_fd;
487   my_socket_invalidate(&my_fd);
488 #ifdef NDB_WIN
489   SOCKET fd;
490 #else
491   int fd;
492 #endif
493 
494   if(error){
495     fd = ndb_mgm_listen_event(g_restarter.handle, filter);
496 #ifdef NDB_WIN
497     my_fd.s= fd;
498 #else
499     my_fd.fd= fd;
500 #endif
501     require(my_socket_valid(my_fd));
502   }
503 
504   int args[] = { DumpStateOrd::LCPContinue };
505   if(g_restarter.dumpStateAllNodes(args, 1) != 0)
506     return -1;
507 
508   if(error){
509     char *tmp;
510     char buf[1024];
511     SocketInputStream in(my_fd, 1000);
512     int count = 0;
513     int nodes = g_restarter.getNumDbNodes();
514     do {
515       tmp = in.gets(buf, 1024);
516       if(tmp)
517       {
518 	int id;
519 	if(sscanf(tmp, "%*[^:]: LCP: %d ", &id) == 1 && id == error &&
520 	   --nodes == 0){
521 	  my_socket_close(my_fd);
522 	  return 0;
523 	}
524       }
525     } while(count++ < 30);
526 
527     my_socket_close(my_fd);
528   }
529   return 0;
530 }
531 
commit()532 static int commit()
533 {
534   HugoOperations & ops = * g_hugo_ops;
535   int res = ops.execute_Commit(g_ndb);
536   if(res == 0){
537     return ops.getTransaction()->restart();
538   }
539   return res;
540 }
541 
restart()542 static int restart()
543 {
544   g_info << "Restarting cluster" << endl;
545   g_hugo_ops->closeTransaction(g_ndb);
546   disconnect_ndb();
547   delete g_hugo_ops;
548 
549   require(!g_restarter.restartAll());
550   require(!g_restarter.waitClusterStarted(30));
551   require(!connect_ndb());
552 
553   g_table = g_ndb->getDictionary()->getTable(g_tablename);
554   require(g_table);
555   require((g_hugo_ops = new HugoOperations(* g_table)) != 0);
556   require(!g_hugo_ops->startTransaction(g_ndb));
557   return 0;
558 }
559 
validate()560 static int validate()
561 {
562   HugoOperations ops(* g_table);
563   for(int i = 0; i < g_rows; i++){
564     require(g_ops[i].curr_row == g_ops[i].end_row);
565     require(!ops.startTransaction(g_ndb));
566     ops.pkReadRecord(g_ndb, i, 1);
567     int res = ops.execute_Commit(g_ndb);
568     if(g_ops[i].curr_row){
569       require(res == 0 && ops.verifyUpdatesValue(g_ops[i].val) == 0);
570     } else {
571       require(res == 626);
572     }
573     ops.closeTransaction(g_ndb);
574   }
575 
576   for(int j = 0; j<10; j++){
577     UtilTransactions clear(* g_table);
578     require(!clear.clearTable(g_ndb));
579 
580     HugoTransactions trans(* g_table);
581     require(trans.loadTable(g_ndb, 1024) == 0);
582   }
583   return 0;
584 }
585 
586