1 /* Copyright (c) 2003-2005 MySQL AB
2 Use is subject to license terms
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; version 2 of the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA */
16
17 #include <ndb_global.h>
18 #include <NdbMain.h>
19 #include <NdbApi.hpp>
20 #include <NdbOut.hpp>
21 #include <NdbMutex.h>
22 #include <NdbCondition.h>
23 #include <NdbThread.h>
24 #include <NdbTest.hpp>
25
// Command-line options with their defaults.
struct Opt {
  bool m_dbg;           // emit DBG trace output
  const char* m_scan;   // which scans to run: 't' = table, 'x' = index
  const char* m_tname;  // test table name
  const char* m_xname;  // ordered index name
  Opt() :
    m_dbg(true),
    m_scan("tx"),
    m_tname("T"),
    m_xname("X")
  {}
};
38
39 static void
printusage()40 printusage()
41 {
42 Opt d;
43 ndbout
44 << "usage: testDeadlock" << endl
45 << "-scan tx scan table, index [" << d.m_scan << "]" << endl
46 ;
47 }
48
static Opt g_opt;

// Mutex serializing ndbout access from multiple threads.
static NdbMutex *ndbout_mutex= NULL;
static Ndb_cluster_connection *g_cluster_connection= 0;

// Debug trace: prints x with source line, serialized via ndbout_mutex.
// No-op when -dbg output is disabled.
#define DBG(x) \
  do { \
    if (! g_opt.m_dbg) break; \
    NdbMutex_Lock(ndbout_mutex); \
    ndbout << "line " << __LINE__ << " " << x << endl; \
    NdbMutex_Unlock(ndbout_mutex); \
  } while (0)

// Check condition x; on failure print it and return -1 from the
// enclosing function (so it is only usable in int-returning functions).
#define CHK(x) \
  do { \
    if (x) break; \
    ndbout << "line " << __LINE__ << ": " << #x << " failed" << endl; \
    return -1; \
  } while (0)

// Like CHK, but additionally prints the NdbError from object p
// (an Ndb, NdbConnection, NdbOperation, or Dictionary).
#define CHN(p, x) \
  do { \
    if (x) break; \
    ndbout << "line " << __LINE__ << ": " << #x << " failed" << endl; \
    ndbout << (p)->getNdbError() << endl; \
    return -1; \
  } while (0)
75
// threads

// A runstep is one unit of work executed by a worker thread;
// it returns 0 on success, -1 on failure.
typedef int (*Runstep)(struct Thr& thr);

// Worker thread wrapper.  The thread sits in run() waiting (on m_cond)
// for the controller to post a runstep via start(); after executing it
// the thread reports completion to the controller blocked in stopped().
struct Thr {
  enum State { Wait, Start, Stop, Stopped, Exit };
  State m_state;                        // handshake state; protected by m_mutex
  int m_no;                             // thread number (1-based), for trace output
  Runstep m_runstep;                    // step to execute when m_state == Start
  int m_ret;                            // result of last runstep (0 = ok)
  NdbMutex* m_mutex;
  NdbCondition* m_cond;
  NdbThread* m_thread;
  void* m_status;                       // filled by NdbThread_WaitFor in join()
  Ndb* m_ndb;                           // per-thread Ndb object (owned; freed in run())
  NdbConnection* m_con;                 // current transaction, if any
  NdbScanOperation* m_scanop;           // current scan, if any
  NdbIndexScanOperation* m_indexscanop; // current index scan, if any
  //
  Thr(int no);
  ~Thr();
  int run();
  void start(Runstep runstep);
  void stop();
  void stopped();
  void lock() { NdbMutex_Lock(m_mutex); }
  void unlock() { NdbMutex_Unlock(m_mutex); }
  void wait() { NdbCondition_Wait(m_cond, m_mutex); }
  void signal() { NdbCondition_Signal(m_cond); }
  void exit();
  void join() { NdbThread_WaitFor(m_thread, &m_status); }
};
108
109 static NdbOut&
operator <<(NdbOut & out,const Thr & thr)110 operator<<(NdbOut& out, const Thr& thr) {
111 out << "thr " << thr.m_no;
112 return out;
113 }
114
115 extern "C" { static void* runthread(void* arg); }
116
Thr(int no)117 Thr::Thr(int no)
118 {
119 m_state = Wait;
120 m_no = no;
121 m_runstep = 0;
122 m_ret = 0;
123 m_mutex = NdbMutex_Create();
124 m_cond = NdbCondition_Create();
125 assert(m_mutex != 0 && m_cond != 0);
126 const unsigned stacksize = 256 * 1024;
127 const NDB_THREAD_PRIO prio = NDB_THREAD_PRIO_LOW;
128 m_thread = NdbThread_Create(runthread, (void**)this, stacksize, "me", prio);
129 if (m_thread == 0) {
130 DBG("create thread failed: errno=" << errno);
131 m_ret = -1;
132 }
133 m_status = 0;
134 m_ndb = 0;
135 m_con = 0;
136 m_scanop = 0;
137 m_indexscanop = 0;
138 }
139
~Thr()140 Thr::~Thr()
141 {
142 if (m_thread != 0)
143 NdbThread_Destroy(&m_thread);
144 if (m_cond != 0)
145 NdbCondition_Destroy(m_cond);
146 if (m_mutex != 0)
147 NdbMutex_Destroy(m_mutex);
148 }
149
150 static void*
runthread(void * arg)151 runthread(void* arg) {
152 Thr& thr = *(Thr*)arg;
153 thr.run();
154 return 0;
155 }
156
157 int
run()158 Thr::run()
159 {
160 DBG(*this << " run");
161 while (true) {
162 lock();
163 while (m_state != Start && m_state != Exit) {
164 wait();
165 }
166 if (m_state == Exit) {
167 DBG(*this << " exit");
168 unlock();
169 break;
170 }
171 m_ret = (*m_runstep)(*this);
172 m_state = Stopped;
173 signal();
174 unlock();
175 if (m_ret != 0) {
176 DBG(*this << " error exit");
177 break;
178 }
179 }
180 delete m_ndb;
181 m_ndb = 0;
182 return 0;
183 }
184
185 void
start(Runstep runstep)186 Thr::start(Runstep runstep)
187 {
188 lock();
189 m_state = Start;
190 m_runstep = runstep;
191 signal();
192 unlock();
193 }
194
195 void
stopped()196 Thr::stopped()
197 {
198 lock();
199 while (m_state != Stopped) {
200 wait();
201 }
202 m_state = Wait;
203 unlock();
204 }
205
206 void
exit()207 Thr::exit()
208 {
209 lock();
210 m_state = Exit;
211 signal();
212 unlock();
213 }
214
215 // general
216
217 static int
runstep_connect(Thr & thr)218 runstep_connect(Thr& thr)
219 {
220 Ndb* ndb = thr.m_ndb = new Ndb(g_cluster_connection, "TEST_DB");
221 CHN(ndb, ndb->init() == 0);
222 CHN(ndb, ndb->waitUntilReady() == 0);
223 DBG(thr << " connected");
224 return 0;
225 }
226
227 static int
runstep_starttx(Thr & thr)228 runstep_starttx(Thr& thr)
229 {
230 Ndb* ndb = thr.m_ndb;
231 assert(ndb != 0);
232 CHN(ndb, (thr.m_con = ndb->startTransaction()) != 0);
233 DBG("thr " << thr.m_no << " tx started");
234 return 0;
235 }
236
237 /*
238 * WL1822 flush locks
239 *
240 * Table T with 3 tuples X, Y, Z.
241 * Two transactions (* = lock wait).
242 *
243 * - tx1 reads and locks Z
244 * - tx2 scans X, Y, *Z
245 * - tx2 returns X, Y before lock wait on Z
246 * - tx1 reads and locks *X
247 * - api asks for next tx2 result
248 * - LQH unlocks X via ACC or TUX [*]
249 * - tx1 gets lock on X
250 * - tx1 returns X to api
251 * - api commits tx1
252 * - tx2 gets lock on Z
 * - tx2 returns Z to api
254 *
255 * The point is deadlock is avoided due to [*].
256 * The test is for 1 db node and 1 fragment table.
257 */
258
// which scan tx2 uses: 't' = table scan, 'x' = ordered index scan
static char wl1822_scantx = 0;

// key (A) and attribute (B) values of the 3 tuples X, Y, Z, key order
static const Uint32 wl1822_valA[3] = { 0, 1, 2 };
static const Uint32 wl1822_valB[3] = { 3, 4, 5 };

// receive buffers for getValue results
static Uint32 wl1822_bufA = ~0;
static Uint32 wl1822_bufB = ~0;

// map scan row to key (A) and reverse
static unsigned wl1822_r2k[3] = { 0, 0, 0 };
static unsigned wl1822_k2r[3] = { 0, 0, 0 };
270
271 static int
wl1822_createtable(Thr & thr)272 wl1822_createtable(Thr& thr)
273 {
274 Ndb* ndb = thr.m_ndb;
275 assert(ndb != 0);
276 NdbDictionary::Dictionary* dic = ndb->getDictionary();
277 // drop T
278 if (dic->getTable(g_opt.m_tname) != 0)
279 CHN(dic, dic->dropTable(g_opt.m_tname) == 0);
280 // create T
281 NdbDictionary::Table tab(g_opt.m_tname);
282 tab.setFragmentType(NdbDictionary::Object::FragAllSmall);
283 { NdbDictionary::Column col("A");
284 col.setType(NdbDictionary::Column::Unsigned);
285 col.setPrimaryKey(true);
286 tab.addColumn(col);
287 }
288 { NdbDictionary::Column col("B");
289 col.setType(NdbDictionary::Column::Unsigned);
290 col.setPrimaryKey(false);
291 tab.addColumn(col);
292 }
293 CHN(dic, dic->createTable(tab) == 0);
294 // create X
295 NdbDictionary::Index ind(g_opt.m_xname);
296 ind.setTable(g_opt.m_tname);
297 ind.setType(NdbDictionary::Index::OrderedIndex);
298 ind.setLogging(false);
299 ind.addColumn("B");
300 CHN(dic, dic->createIndex(ind) == 0);
301 DBG("created " << g_opt.m_tname << ", " << g_opt.m_xname);
302 return 0;
303 }
304
305 static int
wl1822_insertrows(Thr & thr)306 wl1822_insertrows(Thr& thr)
307 {
308 // insert X, Y, Z
309 Ndb* ndb = thr.m_ndb;
310 assert(ndb != 0);
311 NdbConnection* con;
312 NdbOperation* op;
313 for (unsigned k = 0; k < 3; k++) {
314 CHN(ndb, (con = ndb->startTransaction()) != 0);
315 CHN(con, (op = con->getNdbOperation(g_opt.m_tname)) != 0);
316 CHN(op, op->insertTuple() == 0);
317 CHN(op, op->equal("A", (char*)&wl1822_valA[k]) == 0);
318 CHN(op, op->setValue("B", (char*)&wl1822_valB[k]) == 0);
319 CHN(con, con->execute(Commit) == 0);
320 ndb->closeTransaction(con);
321 }
322 DBG("inserted X, Y, Z");
323 return 0;
324 }
325
326 static int
wl1822_getscanorder(Thr & thr)327 wl1822_getscanorder(Thr& thr)
328 {
329 // cheat, table order happens to be key order in my test
330 wl1822_r2k[0] = 0;
331 wl1822_r2k[1] = 1;
332 wl1822_r2k[2] = 2;
333 wl1822_k2r[0] = 0;
334 wl1822_k2r[1] = 1;
335 wl1822_k2r[2] = 2;
336 DBG("scan order determined");
337 return 0;
338 }
339
340 static int
wl1822_tx1_readZ(Thr & thr)341 wl1822_tx1_readZ(Thr& thr)
342 {
343 // tx1 read Z with exclusive lock
344 NdbConnection* con = thr.m_con;
345 assert(con != 0);
346 NdbOperation* op;
347 CHN(con, (op = con->getNdbOperation(g_opt.m_tname)) != 0);
348 CHN(op, op->readTupleExclusive() == 0);
349 CHN(op, op->equal("A", wl1822_valA[wl1822_r2k[2]]) == 0);
350 wl1822_bufB = ~0;
351 CHN(op, op->getValue("B", (char*)&wl1822_bufB) != 0);
352 CHN(con, con->execute(NoCommit) == 0);
353 CHK(wl1822_bufB == wl1822_valB[wl1822_r2k[2]]);
354 DBG("tx1 locked Z");
355 return 0;
356 }
357
358 static int
wl1822_tx2_scanXY(Thr & thr)359 wl1822_tx2_scanXY(Thr& thr)
360 {
361 // tx2 scan X, Y with exclusive lock
362 NdbConnection* con = thr.m_con;
363 assert(con != 0);
364 NdbScanOperation* scanop;
365 NdbIndexScanOperation* indexscanop;
366 NdbResultSet* rs;
367 if (wl1822_scantx == 't') {
368 CHN(con, (scanop = thr.m_scanop = con->getNdbScanOperation(g_opt.m_tname)) != 0);
369 DBG("tx2 scan exclusive " << g_opt.m_tname);
370 }
371 if (wl1822_scantx == 'x') {
372 CHN(con, (scanop = thr.m_scanop = indexscanop = thr.m_indexscanop = con->getNdbIndexScanOperation(g_opt.m_xname, g_opt.m_tname)) != 0);
373 DBG("tx2 scan exclusive " << g_opt.m_xname);
374 }
375 CHN(scanop, scanop->readTuplesExclusive(16) == 0);
376 CHN(scanop, scanop->getValue("A", (char*)&wl1822_bufA) != 0);
377 CHN(scanop, scanop->getValue("B", (char*)&wl1822_bufB) != 0);
378 CHN(con, con->execute(NoCommit) == 0);
379 unsigned row = 0;
380 while (row < 2) {
381 DBG("before row " << row);
382 int ret;
383 wl1822_bufA = wl1822_bufB = ~0;
384 CHN(con, (ret = scanop->nextResult(true)) == 0);
385 DBG("got row " << row << " a=" << wl1822_bufA << " b=" << wl1822_bufB);
386 CHK(wl1822_bufA == wl1822_valA[wl1822_r2k[row]]);
387 CHK(wl1822_bufB == wl1822_valB[wl1822_r2k[row]]);
388 row++;
389 }
390 return 0;
391 }
392
393 static int
wl1822_tx1_readX_commit(Thr & thr)394 wl1822_tx1_readX_commit(Thr& thr)
395 {
396 // tx1 read X with exclusive lock and commit
397 NdbConnection* con = thr.m_con;
398 assert(con != 0);
399 NdbOperation* op;
400 CHN(con, (op = con->getNdbOperation(g_opt.m_tname)) != 0);
401 CHN(op, op->readTupleExclusive() == 0);
402 CHN(op, op->equal("A", wl1822_valA[wl1822_r2k[2]]) == 0);
403 wl1822_bufB = ~0;
404 CHN(op, op->getValue("B", (char*)&wl1822_bufB) != 0);
405 CHN(con, con->execute(NoCommit) == 0);
406 CHK(wl1822_bufB == wl1822_valB[wl1822_r2k[2]]);
407 DBG("tx1 locked X");
408 CHN(con, con->execute(Commit) == 0);
409 DBG("tx1 commit");
410 return 0;
411 }
412
413 static int
wl1822_tx2_scanZ_close(Thr & thr)414 wl1822_tx2_scanZ_close(Thr& thr)
415 {
416 // tx2 scan Z with exclusive lock and close scan
417 Ndb* ndb = thr.m_ndb;
418 NdbConnection* con = thr.m_con;
419 NdbScanOperation* scanop = thr.m_scanop;
420 assert(ndb != 0 && con != 0 && scanop != 0);
421 unsigned row = 2;
422 while (true) {
423 DBG("before row " << row);
424 int ret;
425 wl1822_bufA = wl1822_bufB = ~0;
426 CHN(con, (ret = scanop->nextResult(true)) == 0 || ret == 1);
427 if (ret == 1)
428 break;
429 DBG("got row " << row << " a=" << wl1822_bufA << " b=" << wl1822_bufB);
430 CHK(wl1822_bufA == wl1822_valA[wl1822_r2k[row]]);
431 CHK(wl1822_bufB == wl1822_valB[wl1822_r2k[row]]);
432 row++;
433 }
434 ndb->closeTransaction(con);
435 CHK(row == 3);
436 return 0;
437 }
438
// threads are synced between each step
// Step table: column 0 runs on thread 1 (tx1), column 1 on thread 2
// (tx2); a 0 entry means the thread idles during that step.
static Runstep wl1822_step[][2] = {
  { runstep_connect, runstep_connect },
  { wl1822_createtable, 0 },
  { wl1822_insertrows, 0 },
  { wl1822_getscanorder, 0 },
  { runstep_starttx, runstep_starttx },
  { wl1822_tx1_readZ, 0 },
  { 0, wl1822_tx2_scanXY },
  { wl1822_tx1_readX_commit, wl1822_tx2_scanZ_close }
};
const unsigned wl1822_stepcount = sizeof(wl1822_step)/sizeof(wl1822_step[0]);
451
452 static int
wl1822_main(char scantx)453 wl1822_main(char scantx)
454 {
455 wl1822_scantx = scantx;
456 static const unsigned thrcount = 2;
457 // create threads for tx1 and tx2
458 Thr* thrlist[2];
459 int n;
460 for (n = 0; n < thrcount; n++) {
461 Thr& thr = *(thrlist[n] = new Thr(1 + n));
462 CHK(thr.m_ret == 0);
463 }
464 // run the steps
465 for (unsigned i = 0; i < wl1822_stepcount; i++) {
466 DBG("step " << i << " start");
467 for (n = 0; n < thrcount; n++) {
468 Thr& thr = *thrlist[n];
469 Runstep runstep = wl1822_step[i][n];
470 if (runstep != 0)
471 thr.start(runstep);
472 }
473 for (n = 0; n < thrcount; n++) {
474 Thr& thr = *thrlist[n];
475 Runstep runstep = wl1822_step[i][n];
476 if (runstep != 0)
477 thr.stopped();
478 }
479 }
480 // delete threads
481 for (n = 0; n < thrcount; n++) {
482 Thr& thr = *thrlist[n];
483 thr.exit();
484 thr.join();
485 delete &thr;
486 }
487 return 0;
488 }
489
490 NDB_COMMAND(testOdbcDriver, "testDeadlock", "testDeadlock", "testDeadlock", 65535)
491 {
492 ndb_init();
493 if (ndbout_mutex == NULL)
494 ndbout_mutex= NdbMutex_Create();
495 while (++argv, --argc > 0) {
496 const char* arg = argv[0];
497 if (strcmp(arg, "-scan") == 0) {
498 if (++argv, --argc > 0) {
499 g_opt.m_scan = strdup(argv[0]);
500 continue;
501 }
502 }
503 printusage();
504 return NDBT_ProgramExit(NDBT_WRONGARGS);
505 }
506
507 Ndb_cluster_connection con;
508 if(con.connect(12, 5, 1) != 0)
509 {
510 return NDBT_ProgramExit(NDBT_FAILED);
511 }
512 g_cluster_connection= &con;
513
514 if (
515 strchr(g_opt.m_scan, 't') != 0 && wl1822_main('t') == -1 ||
516 strchr(g_opt.m_scan, 'x') != 0 && wl1822_main('x') == -1
517 ) {
518 return NDBT_ProgramExit(NDBT_FAILED);
519 }
520 return NDBT_ProgramExit(NDBT_OK);
521 }
522
523 // vim: set sw=2 et:
524