1 /*
2 Copyright (c) 2003, 2021, Oracle and/or its affiliates.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25
26 #include <NdbOut.hpp>
27 #include <NdbApi.hpp>
28 #include <NdbSleep.h>
29 #include <NDBT.hpp>
30 #include <HugoTransactions.hpp>
31 #include <getarg.h>
32
33
34 #define BATCH_SIZE 128
35 struct Table_info
36 {
37 Uint32 id;
38 };
39
40 struct Trans_arg
41 {
42 Ndb *ndb;
43 NdbTransaction *trans;
44 Uint32 bytes_batched;
45 };
46
47 Vector< Vector<NdbRecAttr*> > event_values;
48 Vector< Vector<NdbRecAttr*> > event_pre_values;
49 Vector<struct Table_info> table_infos;
50
event_name(uint etype,char * buf)51 static char* event_name(uint etype, char * buf)
52 {
53 switch(etype){
54 case NdbDictionary::Event::TE_INSERT:
55 strcpy(buf, "TE_INSERT");
56 break;
57 case NdbDictionary::Event::TE_DELETE:
58 strcpy(buf, "TE_DELETE");
59 break;
60 case NdbDictionary::Event::TE_UPDATE:
61 strcpy(buf, "TE_UPDATE");
62 break;
63 case NdbDictionary::Event::TE_CLUSTER_FAILURE:
64 strcpy(buf, "TE_CLUSTER_FAILURE");
65 break;
66 case NdbDictionary::Event::TE_ALTER:
67 strcpy(buf, "TE_ALTER");
68 break;
69 case NdbDictionary::Event::TE_DROP:
70 strcpy(buf, "TE_DROP");
71 break;
72 case NdbDictionary::Event::TE_NODE_FAILURE:
73 strcpy(buf, "TE_NODE_FAILURE");
74 break;
75 case NdbDictionary::Event::TE_SUBSCRIBE:
76 strcpy(buf, "TE_SUBSCRIBE");
77 break;
78 case NdbDictionary::Event::TE_UNSUBSCRIBE:
79 strcpy(buf, "TE_UNSUBSCRIBE");
80 break;
81 default:
82 strcpy(buf, "unknown");
83 }
84 return buf;
85 }
86
do_begin(Ndb * ndb,struct Trans_arg & trans_arg)87 static void do_begin(Ndb *ndb, struct Trans_arg &trans_arg)
88 {
89 trans_arg.ndb = ndb;
90 trans_arg.trans = ndb->startTransaction();
91 trans_arg.bytes_batched = 0;
92 }
93
do_equal(NdbOperation * op,NdbEventOperation * pOp)94 static void do_equal(NdbOperation *op,
95 NdbEventOperation *pOp)
96 {
97 struct Table_info *ti = (struct Table_info *)pOp->getCustomData();
98 Vector<NdbRecAttr*> &ev = event_values[ti->id];
99 const NdbDictionary::Table *tab= pOp->getTable();
100 unsigned i, n_columns = tab->getNoOfColumns();
101 for (i= 0; i < n_columns; i++)
102 {
103 if (tab->getColumn(i)->getPrimaryKey() &&
104 op->equal(i, ev[i]->aRef()))
105 {
106 abort();
107 }
108 }
109 }
110
do_set_value(NdbOperation * op,NdbEventOperation * pOp)111 static void do_set_value(NdbOperation *op,
112 NdbEventOperation *pOp)
113 {
114 struct Table_info *ti = (struct Table_info *)pOp->getCustomData();
115 Vector<NdbRecAttr*> &ev = event_values[ti->id];
116 const NdbDictionary::Table *tab= pOp->getTable();
117 unsigned i, n_columns = tab->getNoOfColumns();
118 for (i= 0; i < n_columns; i++)
119 {
120 if (!tab->getColumn(i)->getPrimaryKey() &&
121 op->setValue(i, ev[i]->aRef()))
122 {
123 abort();
124 }
125 }
126 }
127
do_insert(struct Trans_arg & trans_arg,NdbEventOperation * pOp)128 static void do_insert(struct Trans_arg &trans_arg, NdbEventOperation *pOp)
129 {
130 if (!trans_arg.trans)
131 return;
132
133 NdbOperation *op =
134 trans_arg.trans->getNdbOperation(pOp->getEvent()->getTableName());
135 op->writeTuple();
136
137 do_equal(op, pOp);
138 do_set_value(op, pOp);
139
140 trans_arg.bytes_batched++;
141 if (trans_arg.bytes_batched > BATCH_SIZE)
142 {
143 trans_arg.trans->execute(NdbTransaction::NoCommit);
144 trans_arg.bytes_batched = 0;
145 }
146 }
do_update(struct Trans_arg & trans_arg,NdbEventOperation * pOp)147 static void do_update(struct Trans_arg &trans_arg, NdbEventOperation *pOp)
148 {
149 if (!trans_arg.trans)
150 return;
151
152 NdbOperation *op =
153 trans_arg.trans->getNdbOperation(pOp->getEvent()->getTableName());
154 op->writeTuple();
155
156 do_equal(op, pOp);
157 do_set_value(op, pOp);
158
159 trans_arg.bytes_batched++;
160 if (trans_arg.bytes_batched > BATCH_SIZE)
161 {
162 trans_arg.trans->execute(NdbTransaction::NoCommit);
163 trans_arg.bytes_batched = 0;
164 }
165 }
do_delete(struct Trans_arg & trans_arg,NdbEventOperation * pOp)166 static void do_delete(struct Trans_arg &trans_arg, NdbEventOperation *pOp)
167 {
168 if (!trans_arg.trans)
169 return;
170
171 NdbOperation *op =
172 trans_arg.trans->getNdbOperation(pOp->getEvent()->getTableName());
173 op->deleteTuple();
174
175 do_equal(op, pOp);
176
177 trans_arg.bytes_batched++;
178 if (trans_arg.bytes_batched > BATCH_SIZE)
179 {
180 trans_arg.trans->execute(NdbTransaction::NoCommit);
181 trans_arg.bytes_batched = 0;
182 }
183 }
do_commit(struct Trans_arg & trans_arg)184 static void do_commit(struct Trans_arg &trans_arg)
185 {
186 if (!trans_arg.trans)
187 return;
188 trans_arg.trans->execute(NdbTransaction::Commit);
189 trans_arg.ndb->closeTransaction(trans_arg.trans);
190 }
191
192 int
main(int argc,const char ** argv)193 main(int argc, const char** argv){
194 ndb_init();
195
196
197 int _help = 0;
198 const char* db = 0;
199 const char* connectstring1 = 0;
200 const char* connectstring2 = 0;
201
202 struct getargs args[] = {
203 { "connectstring1", 'c',
204 arg_string, &connectstring1, "connectstring1", "" },
205 { "connectstring2", 'C',
206 arg_string, &connectstring2, "connectstring2", "" },
207 { "database", 'd', arg_string, &db, "Database", "" },
208 { "usage", '?', arg_flag, &_help, "Print help", "" }
209 };
210 int num_args = sizeof(args) / sizeof(args[0]);
211 int optind = 0, i;
212 char desc[] =
213 "<tabname>+ \nThis program listen to events on specified tables\n";
214
215 if(getarg(args, num_args, argc, argv, &optind) ||
216 argv[optind] == NULL || _help) {
217 arg_printusage(args, num_args, argv[0], desc);
218 return NDBT_ProgramExit(NDBT_WRONGARGS);
219 }
220
221 // Connect to Ndb
222 Ndb_cluster_connection con(connectstring1);
223 if(con.connect(12, 5, 1) != 0)
224 {
225 return NDBT_ProgramExit(NDBT_FAILED);
226 }
227 Ndb MyNdb( &con, db ? db : "TEST_DB" );
228
229 if(MyNdb.init() != 0){
230 NDB_ERR(MyNdb.getNdbError());
231 return NDBT_ProgramExit(NDBT_FAILED);
232 }
233
234 // Connect to Ndb and wait for it to become ready
235 while(MyNdb.waitUntilReady() != 0)
236 ndbout << "Waiting for ndb to become ready..." << endl;
237
238 Ndb_cluster_connection *con2 = NULL;
239 Ndb *ndb2 = NULL;
240 if (connectstring2)
241 {
242 con2 = new Ndb_cluster_connection(connectstring2);
243
244 if(con2->connect(12, 5, 1) != 0)
245 {
246 return NDBT_ProgramExit(NDBT_FAILED);
247 }
248 ndb2 = new Ndb( con2, db ? db : "TEST_DB" );
249
250 if(ndb2->init() != 0){
251 NDB_ERR(ndb2->getNdbError());
252 return NDBT_ProgramExit(NDBT_FAILED);
253 }
254
255 // Connect to Ndb and wait for it to become ready
256 while(ndb2->waitUntilReady() != 0)
257 ndbout << "Waiting for ndb to become ready..." << endl;
258 }
259
260 int result = 0;
261
262 NdbDictionary::Dictionary *myDict = MyNdb.getDictionary();
263 Vector<NdbDictionary::Event*> events;
264 Vector<NdbEventOperation*> event_ops;
265 int sz = 0;
266 for(i= optind; i<argc; i++)
267 {
268 const NdbDictionary::Table* table= myDict->getTable(argv[i]);
269 if(!table)
270 {
271 ndbout_c("Could not find table: %s, skipping", argv[i]);
272 continue;
273 }
274
275 BaseString name;
276 name.appfmt("EV-%s", argv[i]);
277 NdbDictionary::Event *myEvent= new NdbDictionary::Event(name.c_str());
278 myEvent->setTable(table->getName());
279 myEvent->addTableEvent(NdbDictionary::Event::TE_ALL);
280 for(int a = 0; a < table->getNoOfColumns(); a++){
281 myEvent->addEventColumn(a);
282 }
283 myEvent->setReport((NdbDictionary::Event::EventReport)
284 (NdbDictionary::Event::ER_UPDATED |
285 NdbDictionary::Event::ER_DDL));
286
287 if (myDict->createEvent(* myEvent))
288 {
289 if(myDict->getNdbError().classification == NdbError::SchemaObjectExists)
290 {
291 g_info << "Event creation failed event exists. Removing...\n";
292 if (myDict->dropEvent(name.c_str()))
293 {
294 g_err << "Failed to drop event: " << myDict->getNdbError() << endl;
295 result = 1;
296 goto end;
297 }
298 // try again
299 if (myDict->createEvent(* myEvent))
300 {
301 g_err << "Failed to create event: " << myDict->getNdbError() << endl;
302 result = 1;
303 goto end;
304 }
305 }
306 else
307 {
308 g_err << "Failed to create event: " << myDict->getNdbError() << endl;
309 result = 1;
310 goto end;
311 }
312 }
313
314 events.push_back(myEvent);
315
316 NdbEventOperation* pOp = MyNdb.createEventOperation(name.c_str());
317 if ( pOp == NULL ) {
318 g_err << "Event operation creation failed" << endl;
319 result = 1;
320 goto end;
321 }
322
323 event_values.push_back(Vector<NdbRecAttr *>());
324 event_pre_values.push_back(Vector<NdbRecAttr *>());
325 for (int a = 0; a < table->getNoOfColumns(); a++)
326 {
327 event_values[sz].
328 push_back(pOp->getValue(table->getColumn(a)->getName()));
329 event_pre_values[sz].
330 push_back(pOp->getPreValue(table->getColumn(a)->getName()));
331 }
332 event_ops.push_back(pOp);
333 {
334 struct Table_info ti;
335 ti.id = sz;
336 table_infos.push_back(ti);
337 }
338 pOp->setCustomData((void *)&table_infos[sz]);
339 sz++;
340 }
341
342 for(i= 0; i<(int)event_ops.size(); i++)
343 {
344 if (event_ops[i]->execute())
345 {
346 g_err << "operation execution failed: " << event_ops[i]->getNdbError()
347 << endl;
348 result = 1;
349 goto end;
350 }
351 }
352
353 struct Trans_arg trans_arg;
354 char buf[64];
355
356 while(true)
357 {
358 while(MyNdb.pollEvents(100) == 0);
359
360 NdbEventOperation* pOp= MyNdb.nextEvent();
361 while(pOp)
362 {
363 Uint64 gci= pOp->getGCI();
364 Uint64 cnt_i= 0, cnt_u= 0, cnt_d= 0;
365 if (ndb2)
366 do_begin(ndb2, trans_arg);
367 do
368 {
369 switch(pOp->getEventType())
370 {
371 case NdbDictionary::Event::TE_INSERT:
372 cnt_i++;
373 if (ndb2)
374 do_insert(trans_arg, pOp);
375 break;
376 case NdbDictionary::Event::TE_DELETE:
377 cnt_d++;
378 if (ndb2)
379 do_delete(trans_arg, pOp);
380 break;
381 case NdbDictionary::Event::TE_UPDATE:
382 cnt_u++;
383 if (ndb2)
384 do_update(trans_arg, pOp);
385 break;
386 case NdbDictionary::Event::TE_CLUSTER_FAILURE:
387 ndbout_c("Received event: %s", event_name(pOp->getEventType(), buf));
388 break;
389 case NdbDictionary::Event::TE_ALTER:
390 ndbout_c("Received event: %s", event_name(pOp->getEventType(), buf));
391 break;
392 case NdbDictionary::Event::TE_DROP:
393 ndbout_c("Received event: %s", event_name(pOp->getEventType(), buf));
394 break;
395 case NdbDictionary::Event::TE_NODE_FAILURE:
396 ndbout_c("Received event: %s", event_name(pOp->getEventType(), buf));
397 break;
398 case NdbDictionary::Event::TE_SUBSCRIBE:
399 case NdbDictionary::Event::TE_UNSUBSCRIBE:
400 ndbout_c("Received event: %s", event_name(pOp->getEventType(), buf));
401 break;
402 default:
403 /* We should REALLY never get here. */
404 ndbout_c("Error: unknown event type: %u",
405 (Uint32)pOp->getEventType());
406 abort();
407 }
408 } while ((pOp= MyNdb.nextEvent()) && gci == pOp->getGCI());
409 if (ndb2)
410 do_commit(trans_arg);
411 ndbout_c("GCI: %u/%u events: %lld(I) %lld(U) %lld(D)",
412 Uint32(gci >> 32), Uint32(gci), cnt_i, cnt_u, cnt_d);
413 }
414 }
415 end:
416 for(i= 0; i<(int)event_ops.size(); i++)
417 MyNdb.dropEventOperation(event_ops[i]);
418
419 if (ndb2)
420 delete ndb2;
421 if (con2)
422 delete con2;
423 return NDBT_ProgramExit(NDBT_OK);
424 }
425
426 template class Vector<struct Table_info>;
427 template class Vector<NdbRecAttr*>;
428 template class Vector< Vector<NdbRecAttr*> >;
429 template class Vector<NdbDictionary::Event*>;
430 template class Vector<NdbEventOperation*>;
431