1 /*
2    Copyright (c) 2003, 2021, Oracle and/or its affiliates.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 
26 #include <NdbOut.hpp>
27 #include <NdbApi.hpp>
28 #include <NdbSleep.h>
29 #include <NDBT.hpp>
30 #include <HugoTransactions.hpp>
31 #include <getarg.h>
32 
33 
34 #define BATCH_SIZE 128
35 struct Table_info
36 {
37   Uint32 id;
38 };
39 
40 struct Trans_arg
41 {
42   Ndb *ndb;
43   NdbTransaction *trans;
44   Uint32 bytes_batched;
45 };
46 
47 Vector< Vector<NdbRecAttr*> > event_values;
48 Vector< Vector<NdbRecAttr*> > event_pre_values;
49 Vector<struct Table_info> table_infos;
50 
event_name(uint etype,char * buf)51 static char* event_name(uint etype, char * buf)
52 {
53   switch(etype){
54   case NdbDictionary::Event::TE_INSERT:
55     strcpy(buf, "TE_INSERT");
56     break;
57   case NdbDictionary::Event::TE_DELETE:
58     strcpy(buf, "TE_DELETE");
59     break;
60   case NdbDictionary::Event::TE_UPDATE:
61     strcpy(buf, "TE_UPDATE");
62     break;
63   case NdbDictionary::Event::TE_CLUSTER_FAILURE:
64     strcpy(buf, "TE_CLUSTER_FAILURE");
65     break;
66   case NdbDictionary::Event::TE_ALTER:
67     strcpy(buf, "TE_ALTER");
68     break;
69   case NdbDictionary::Event::TE_DROP:
70     strcpy(buf, "TE_DROP");
71     break;
72   case NdbDictionary::Event::TE_NODE_FAILURE:
73     strcpy(buf, "TE_NODE_FAILURE");
74     break;
75   case NdbDictionary::Event::TE_SUBSCRIBE:
76     strcpy(buf, "TE_SUBSCRIBE");
77     break;
78   case NdbDictionary::Event::TE_UNSUBSCRIBE:
79     strcpy(buf, "TE_UNSUBSCRIBE");
80     break;
81   default:
82     strcpy(buf, "unknown");
83   }
84   return buf;
85 }
86 
do_begin(Ndb * ndb,struct Trans_arg & trans_arg)87 static void do_begin(Ndb *ndb, struct Trans_arg &trans_arg)
88 {
89   trans_arg.ndb =  ndb;
90   trans_arg.trans = ndb->startTransaction();
91   trans_arg.bytes_batched = 0;
92 }
93 
do_equal(NdbOperation * op,NdbEventOperation * pOp)94 static void do_equal(NdbOperation *op,
95                      NdbEventOperation *pOp)
96 {
97   struct Table_info *ti = (struct Table_info *)pOp->getCustomData();
98   Vector<NdbRecAttr*> &ev = event_values[ti->id];
99   const NdbDictionary::Table *tab= pOp->getTable();
100   unsigned i, n_columns = tab->getNoOfColumns();
101   for (i= 0; i < n_columns; i++)
102   {
103     if (tab->getColumn(i)->getPrimaryKey() &&
104         op->equal(i, ev[i]->aRef()))
105     {
106       abort();
107     }
108   }
109 }
110 
do_set_value(NdbOperation * op,NdbEventOperation * pOp)111 static void do_set_value(NdbOperation *op,
112                          NdbEventOperation *pOp)
113 {
114   struct Table_info *ti = (struct Table_info *)pOp->getCustomData();
115   Vector<NdbRecAttr*> &ev = event_values[ti->id];
116   const NdbDictionary::Table *tab= pOp->getTable();
117   unsigned i, n_columns = tab->getNoOfColumns();
118   for (i= 0; i < n_columns; i++)
119   {
120     if (!tab->getColumn(i)->getPrimaryKey() &&
121         op->setValue(i, ev[i]->aRef()))
122     {
123       abort();
124     }
125   }
126 }
127 
do_insert(struct Trans_arg & trans_arg,NdbEventOperation * pOp)128 static void do_insert(struct Trans_arg &trans_arg, NdbEventOperation *pOp)
129 {
130   if (!trans_arg.trans)
131     return;
132 
133   NdbOperation *op =
134     trans_arg.trans->getNdbOperation(pOp->getEvent()->getTableName());
135   op->writeTuple();
136 
137   do_equal(op, pOp);
138   do_set_value(op, pOp);
139 
140   trans_arg.bytes_batched++;
141   if (trans_arg.bytes_batched > BATCH_SIZE)
142   {
143     trans_arg.trans->execute(NdbTransaction::NoCommit);
144     trans_arg.bytes_batched = 0;
145   }
146 }
do_update(struct Trans_arg & trans_arg,NdbEventOperation * pOp)147 static void do_update(struct Trans_arg &trans_arg, NdbEventOperation *pOp)
148 {
149   if (!trans_arg.trans)
150     return;
151 
152   NdbOperation *op =
153     trans_arg.trans->getNdbOperation(pOp->getEvent()->getTableName());
154   op->writeTuple();
155 
156   do_equal(op, pOp);
157   do_set_value(op, pOp);
158 
159   trans_arg.bytes_batched++;
160   if (trans_arg.bytes_batched > BATCH_SIZE)
161   {
162     trans_arg.trans->execute(NdbTransaction::NoCommit);
163     trans_arg.bytes_batched = 0;
164   }
165 }
do_delete(struct Trans_arg & trans_arg,NdbEventOperation * pOp)166 static void do_delete(struct Trans_arg &trans_arg, NdbEventOperation *pOp)
167 {
168   if (!trans_arg.trans)
169     return;
170 
171   NdbOperation *op =
172     trans_arg.trans->getNdbOperation(pOp->getEvent()->getTableName());
173   op->deleteTuple();
174 
175   do_equal(op, pOp);
176 
177   trans_arg.bytes_batched++;
178   if (trans_arg.bytes_batched > BATCH_SIZE)
179   {
180     trans_arg.trans->execute(NdbTransaction::NoCommit);
181     trans_arg.bytes_batched = 0;
182   }
183 }
do_commit(struct Trans_arg & trans_arg)184 static void do_commit(struct Trans_arg &trans_arg)
185 {
186   if (!trans_arg.trans)
187     return;
188   trans_arg.trans->execute(NdbTransaction::Commit);
189   trans_arg.ndb->closeTransaction(trans_arg.trans);
190 }
191 
192 int
main(int argc,const char ** argv)193 main(int argc, const char** argv){
194   ndb_init();
195 
196 
197   int _help = 0;
198   const char* db = 0;
199   const char* connectstring1 = 0;
200   const char* connectstring2 = 0;
201 
202   struct getargs args[] = {
203     { "connectstring1", 'c',
204       arg_string, &connectstring1, "connectstring1", "" },
205     { "connectstring2", 'C',
206       arg_string, &connectstring2, "connectstring2", "" },
207     { "database", 'd', arg_string, &db, "Database", "" },
208     { "usage", '?', arg_flag, &_help, "Print help", "" }
209   };
210   int num_args = sizeof(args) / sizeof(args[0]);
211   int optind = 0, i;
212   char desc[] =
213     "<tabname>+ \nThis program listen to events on specified tables\n";
214 
215   if(getarg(args, num_args, argc, argv, &optind) ||
216      argv[optind] == NULL || _help) {
217     arg_printusage(args, num_args, argv[0], desc);
218     return NDBT_ProgramExit(NDBT_WRONGARGS);
219   }
220 
221   // Connect to Ndb
222   Ndb_cluster_connection con(connectstring1);
223   if(con.connect(12, 5, 1) != 0)
224   {
225     return NDBT_ProgramExit(NDBT_FAILED);
226   }
227   Ndb MyNdb( &con, db ? db : "TEST_DB" );
228 
229   if(MyNdb.init() != 0){
230     NDB_ERR(MyNdb.getNdbError());
231     return NDBT_ProgramExit(NDBT_FAILED);
232   }
233 
234   // Connect to Ndb and wait for it to become ready
235   while(MyNdb.waitUntilReady() != 0)
236     ndbout << "Waiting for ndb to become ready..." << endl;
237 
238   Ndb_cluster_connection *con2 = NULL;
239   Ndb *ndb2 =  NULL;
240   if (connectstring2)
241   {
242     con2 = new Ndb_cluster_connection(connectstring2);
243 
244     if(con2->connect(12, 5, 1) != 0)
245     {
246       return NDBT_ProgramExit(NDBT_FAILED);
247     }
248     ndb2 = new Ndb( con2, db ? db : "TEST_DB" );
249 
250     if(ndb2->init() != 0){
251       NDB_ERR(ndb2->getNdbError());
252       return NDBT_ProgramExit(NDBT_FAILED);
253     }
254 
255     // Connect to Ndb and wait for it to become ready
256     while(ndb2->waitUntilReady() != 0)
257       ndbout << "Waiting for ndb to become ready..." << endl;
258   }
259 
260   int result = 0;
261 
262   NdbDictionary::Dictionary *myDict = MyNdb.getDictionary();
263   Vector<NdbDictionary::Event*> events;
264   Vector<NdbEventOperation*> event_ops;
265   int sz = 0;
266   for(i= optind; i<argc; i++)
267   {
268     const NdbDictionary::Table* table= myDict->getTable(argv[i]);
269     if(!table)
270     {
271       ndbout_c("Could not find table: %s, skipping", argv[i]);
272       continue;
273     }
274 
275     BaseString name;
276     name.appfmt("EV-%s", argv[i]);
277     NdbDictionary::Event *myEvent= new NdbDictionary::Event(name.c_str());
278     myEvent->setTable(table->getName());
279     myEvent->addTableEvent(NdbDictionary::Event::TE_ALL);
280     for(int a = 0; a < table->getNoOfColumns(); a++){
281       myEvent->addEventColumn(a);
282     }
283     myEvent->setReport((NdbDictionary::Event::EventReport)
284                        (NdbDictionary::Event::ER_UPDATED |
285                         NdbDictionary::Event::ER_DDL));
286 
287     if (myDict->createEvent(* myEvent))
288     {
289       if(myDict->getNdbError().classification == NdbError::SchemaObjectExists)
290       {
291 	g_info << "Event creation failed event exists. Removing...\n";
292 	if (myDict->dropEvent(name.c_str()))
293 	{
294 	  g_err << "Failed to drop event: " << myDict->getNdbError() << endl;
295 	  result = 1;
296 	  goto end;
297 	}
298 	// try again
299 	if (myDict->createEvent(* myEvent))
300 	{
301 	  g_err << "Failed to create event: " << myDict->getNdbError() << endl;
302 	  result = 1;
303 	  goto end;
304 	}
305       }
306       else
307       {
308 	g_err << "Failed to create event: " << myDict->getNdbError() << endl;
309 	result = 1;
310 	goto end;
311       }
312     }
313 
314     events.push_back(myEvent);
315 
316     NdbEventOperation* pOp = MyNdb.createEventOperation(name.c_str());
317     if ( pOp == NULL ) {
318       g_err << "Event operation creation failed" << endl;
319       result = 1;
320       goto end;
321     }
322 
323     event_values.push_back(Vector<NdbRecAttr *>());
324     event_pre_values.push_back(Vector<NdbRecAttr *>());
325     for (int a = 0; a < table->getNoOfColumns(); a++)
326     {
327       event_values[sz].
328         push_back(pOp->getValue(table->getColumn(a)->getName()));
329       event_pre_values[sz].
330         push_back(pOp->getPreValue(table->getColumn(a)->getName()));
331     }
332     event_ops.push_back(pOp);
333     {
334       struct Table_info ti;
335       ti.id = sz;
336       table_infos.push_back(ti);
337     }
338     pOp->setCustomData((void *)&table_infos[sz]);
339     sz++;
340   }
341 
342   for(i= 0; i<(int)event_ops.size(); i++)
343   {
344     if (event_ops[i]->execute())
345     {
346       g_err << "operation execution failed: " << event_ops[i]->getNdbError()
347 	    << endl;
348       result = 1;
349       goto end;
350     }
351   }
352 
353   struct Trans_arg trans_arg;
354   char buf[64];
355 
356   while(true)
357   {
358     while(MyNdb.pollEvents(100) == 0);
359 
360     NdbEventOperation* pOp= MyNdb.nextEvent();
361     while(pOp)
362     {
363       Uint64 gci= pOp->getGCI();
364       Uint64 cnt_i= 0, cnt_u= 0, cnt_d= 0;
365       if (ndb2)
366         do_begin(ndb2, trans_arg);
367       do
368       {
369 	switch(pOp->getEventType())
370 	{
371 	case NdbDictionary::Event::TE_INSERT:
372 	  cnt_i++;
373           if (ndb2)
374             do_insert(trans_arg, pOp);
375 	  break;
376 	case NdbDictionary::Event::TE_DELETE:
377 	  cnt_d++;
378           if (ndb2)
379             do_delete(trans_arg, pOp);
380 	  break;
381 	case NdbDictionary::Event::TE_UPDATE:
382 	  cnt_u++;
383           if (ndb2)
384             do_update(trans_arg, pOp);
385 	  break;
386 	case NdbDictionary::Event::TE_CLUSTER_FAILURE:
387           ndbout_c("Received event: %s", event_name(pOp->getEventType(), buf));
388 	  break;
389 	case NdbDictionary::Event::TE_ALTER:
390           ndbout_c("Received event: %s", event_name(pOp->getEventType(), buf));
391 	  break;
392 	case NdbDictionary::Event::TE_DROP:
393           ndbout_c("Received event: %s", event_name(pOp->getEventType(), buf));
394 	  break;
395 	case NdbDictionary::Event::TE_NODE_FAILURE:
396           ndbout_c("Received event: %s", event_name(pOp->getEventType(), buf));
397 	  break;
398 	case NdbDictionary::Event::TE_SUBSCRIBE:
399 	case NdbDictionary::Event::TE_UNSUBSCRIBE:
400           ndbout_c("Received event: %s", event_name(pOp->getEventType(), buf));
401 	  break;
402 	default:
403 	  /* We should REALLY never get here. */
404 	  ndbout_c("Error: unknown event type: %u",
405 		   (Uint32)pOp->getEventType());
406 	  abort();
407 	}
408       } while ((pOp= MyNdb.nextEvent()) && gci == pOp->getGCI());
409       if (ndb2)
410         do_commit(trans_arg);
411       ndbout_c("GCI: %u/%u events: %lld(I) %lld(U) %lld(D)",
412                Uint32(gci >> 32), Uint32(gci), cnt_i, cnt_u, cnt_d);
413     }
414   }
415 end:
416   for(i= 0; i<(int)event_ops.size(); i++)
417     MyNdb.dropEventOperation(event_ops[i]);
418 
419   if (ndb2)
420     delete ndb2;
421   if (con2)
422     delete con2;
423   return NDBT_ProgramExit(NDBT_OK);
424 }
425 
426 template class Vector<struct Table_info>;
427 template class Vector<NdbRecAttr*>;
428 template class Vector< Vector<NdbRecAttr*> >;
429 template class Vector<NdbDictionary::Event*>;
430 template class Vector<NdbEventOperation*>;
431