1 /* Copyright (c) 2003-2005 MySQL AB
2    Use is subject to license terms
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation; version 2 of the License.
7 
8    This program is distributed in the hope that it will be useful,
9    but WITHOUT ANY WARRANTY; without even the implied warranty of
10    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11    GNU General Public License for more details.
12 
13    You should have received a copy of the GNU General Public License
14    along with this program; if not, write to the Free Software
15    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA */
16 
17 #include <ndb_global.h>
18 #include <NdbMain.h>
19 #include <NdbApi.hpp>
20 #include <NdbOut.hpp>
21 #include <NdbMutex.h>
22 #include <NdbCondition.h>
23 #include <NdbThread.h>
24 #include <NdbTest.hpp>
25 
26 struct Opt {
27   bool m_dbg;
28   const char* m_scan;
29   const char* m_tname;
30   const char* m_xname;
OptOpt31   Opt() :
32     m_dbg(true),
33     m_scan("tx"),
34     m_tname("T"),
35     m_xname("X")
36     {}
37 };
38 
39 static void
printusage()40 printusage()
41 {
42   Opt d;
43   ndbout
44     << "usage: testDeadlock" << endl
45     << "-scan tx        scan table, index [" << d.m_scan << "]" << endl
46     ;
47 }
48 
49 static Opt g_opt;
50 
51 static NdbMutex *ndbout_mutex= NULL;
52 static Ndb_cluster_connection *g_cluster_connection= 0;
53 #define DBG(x) \
54   do { \
55     if (! g_opt.m_dbg) break; \
56     NdbMutex_Lock(ndbout_mutex); \
57     ndbout << "line " << __LINE__ << " " << x << endl; \
58     NdbMutex_Unlock(ndbout_mutex); \
59   } while (0)
60 
61 #define CHK(x) \
62   do { \
63     if (x) break; \
64     ndbout << "line " << __LINE__ << ": " << #x << " failed" << endl; \
65     return -1; \
66   } while (0)
67 
68 #define CHN(p, x) \
69   do { \
70     if (x) break; \
71     ndbout << "line " << __LINE__ << ": " << #x << " failed" << endl; \
72     ndbout << (p)->getNdbError() << endl; \
73     return -1; \
74   } while (0)
75 
76 // threads
77 
78 typedef int (*Runstep)(struct Thr& thr);
79 
80 struct Thr {
81   enum State { Wait, Start, Stop, Stopped, Exit };
82   State m_state;
83   int m_no;
84   Runstep m_runstep;
85   int m_ret;
86   NdbMutex* m_mutex;
87   NdbCondition* m_cond;
88   NdbThread* m_thread;
89   void* m_status;
90   Ndb* m_ndb;
91   NdbConnection* m_con;
92   NdbScanOperation* m_scanop;
93   NdbIndexScanOperation* m_indexscanop;
94   //
95   Thr(int no);
96   ~Thr();
97   int run();
98   void start(Runstep runstep);
99   void stop();
100   void stopped();
lockThr101   void lock() { NdbMutex_Lock(m_mutex); }
unlockThr102   void unlock() { NdbMutex_Unlock(m_mutex); }
waitThr103   void wait() { NdbCondition_Wait(m_cond, m_mutex); }
signalThr104   void signal() { NdbCondition_Signal(m_cond); }
105   void exit();
joinThr106   void join() { NdbThread_WaitFor(m_thread, &m_status); }
107 };
108 
109 static NdbOut&
operator <<(NdbOut & out,const Thr & thr)110 operator<<(NdbOut& out, const Thr& thr) {
111   out << "thr " << thr.m_no;
112   return out;
113 }
114 
115 extern "C" { static void* runthread(void* arg); }
116 
Thr(int no)117 Thr::Thr(int no)
118 {
119   m_state = Wait;
120   m_no = no;
121   m_runstep = 0;
122   m_ret = 0;
123   m_mutex = NdbMutex_Create();
124   m_cond = NdbCondition_Create();
125   assert(m_mutex != 0 && m_cond != 0);
126   const unsigned stacksize = 256 * 1024;
127   const NDB_THREAD_PRIO prio = NDB_THREAD_PRIO_LOW;
128   m_thread = NdbThread_Create(runthread, (void**)this, stacksize, "me", prio);
129   if (m_thread == 0) {
130     DBG("create thread failed: errno=" << errno);
131     m_ret = -1;
132   }
133   m_status = 0;
134   m_ndb = 0;
135   m_con = 0;
136   m_scanop = 0;
137   m_indexscanop = 0;
138 }
139 
~Thr()140 Thr::~Thr()
141 {
142   if (m_thread != 0)
143     NdbThread_Destroy(&m_thread);
144   if (m_cond != 0)
145     NdbCondition_Destroy(m_cond);
146   if (m_mutex != 0)
147     NdbMutex_Destroy(m_mutex);
148 }
149 
150 static void*
runthread(void * arg)151 runthread(void* arg) {
152   Thr& thr = *(Thr*)arg;
153   thr.run();
154   return 0;
155 }
156 
157 int
run()158 Thr::run()
159 {
160   DBG(*this << " run");
161   while (true) {
162     lock();
163     while (m_state != Start && m_state != Exit) {
164       wait();
165     }
166     if (m_state == Exit) {
167       DBG(*this << " exit");
168       unlock();
169       break;
170     }
171     m_ret = (*m_runstep)(*this);
172     m_state = Stopped;
173     signal();
174     unlock();
175     if (m_ret != 0) {
176       DBG(*this << " error exit");
177       break;
178     }
179   }
180   delete m_ndb;
181   m_ndb = 0;
182   return 0;
183 }
184 
185 void
start(Runstep runstep)186 Thr::start(Runstep runstep)
187 {
188   lock();
189   m_state = Start;
190   m_runstep = runstep;
191   signal();
192   unlock();
193 }
194 
195 void
stopped()196 Thr::stopped()
197 {
198   lock();
199   while (m_state != Stopped) {
200     wait();
201   }
202   m_state = Wait;
203   unlock();
204 }
205 
206 void
exit()207 Thr::exit()
208 {
209   lock();
210   m_state = Exit;
211   signal();
212   unlock();
213 }
214 
215 // general
216 
217 static int
runstep_connect(Thr & thr)218 runstep_connect(Thr& thr)
219 {
220   Ndb* ndb = thr.m_ndb = new Ndb(g_cluster_connection, "TEST_DB");
221   CHN(ndb, ndb->init() == 0);
222   CHN(ndb, ndb->waitUntilReady() == 0);
223   DBG(thr << " connected");
224   return 0;
225 }
226 
227 static int
runstep_starttx(Thr & thr)228 runstep_starttx(Thr& thr)
229 {
230   Ndb* ndb = thr.m_ndb;
231   assert(ndb != 0);
232   CHN(ndb, (thr.m_con = ndb->startTransaction()) != 0);
233   DBG("thr " << thr.m_no << " tx started");
234   return 0;
235 }
236 
237 /*
238  * WL1822 flush locks
239  *
240  * Table T with 3 tuples X, Y, Z.
241  * Two transactions (* = lock wait).
242  *
243  * - tx1 reads and locks Z
244  * - tx2 scans X, Y, *Z
245  * - tx2 returns X, Y before lock wait on Z
246  * - tx1 reads and locks *X
247  * - api asks for next tx2 result
248  * - LQH unlocks X via ACC or TUX [*]
249  * - tx1 gets lock on X
250  * - tx1 returns X to api
251  * - api commits tx1
252  * - tx2 gets lock on Z
253  * - tx2 returs Z to api
254  *
255  * The point is deadlock is avoided due to [*].
256  * The test is for 1 db node and 1 fragment table.
257  */
258 
259 static char wl1822_scantx = 0;
260 
261 static const Uint32 wl1822_valA[3] = { 0, 1, 2 };
262 static const Uint32 wl1822_valB[3] = { 3, 4, 5 };
263 
264 static Uint32 wl1822_bufA = ~0;
265 static Uint32 wl1822_bufB = ~0;
266 
267 // map scan row to key (A) and reverse
268 static unsigned wl1822_r2k[3] = { 0, 0, 0 };
269 static unsigned wl1822_k2r[3] = { 0, 0, 0 };
270 
271 static int
wl1822_createtable(Thr & thr)272 wl1822_createtable(Thr& thr)
273 {
274   Ndb* ndb = thr.m_ndb;
275   assert(ndb != 0);
276   NdbDictionary::Dictionary* dic = ndb->getDictionary();
277   // drop T
278   if (dic->getTable(g_opt.m_tname) != 0)
279     CHN(dic, dic->dropTable(g_opt.m_tname) == 0);
280   // create T
281   NdbDictionary::Table tab(g_opt.m_tname);
282   tab.setFragmentType(NdbDictionary::Object::FragAllSmall);
283   { NdbDictionary::Column col("A");
284     col.setType(NdbDictionary::Column::Unsigned);
285     col.setPrimaryKey(true);
286     tab.addColumn(col);
287   }
288   { NdbDictionary::Column col("B");
289     col.setType(NdbDictionary::Column::Unsigned);
290     col.setPrimaryKey(false);
291     tab.addColumn(col);
292   }
293   CHN(dic, dic->createTable(tab) == 0);
294   // create X
295   NdbDictionary::Index ind(g_opt.m_xname);
296   ind.setTable(g_opt.m_tname);
297   ind.setType(NdbDictionary::Index::OrderedIndex);
298   ind.setLogging(false);
299   ind.addColumn("B");
300   CHN(dic, dic->createIndex(ind) == 0);
301   DBG("created " << g_opt.m_tname << ", " << g_opt.m_xname);
302   return 0;
303 }
304 
305 static int
wl1822_insertrows(Thr & thr)306 wl1822_insertrows(Thr& thr)
307 {
308   // insert X, Y, Z
309   Ndb* ndb = thr.m_ndb;
310   assert(ndb != 0);
311   NdbConnection* con;
312   NdbOperation* op;
313   for (unsigned k = 0; k < 3; k++) {
314     CHN(ndb, (con = ndb->startTransaction()) != 0);
315     CHN(con, (op = con->getNdbOperation(g_opt.m_tname)) != 0);
316     CHN(op, op->insertTuple() == 0);
317     CHN(op, op->equal("A", (char*)&wl1822_valA[k]) == 0);
318     CHN(op, op->setValue("B", (char*)&wl1822_valB[k]) == 0);
319     CHN(con, con->execute(Commit) == 0);
320     ndb->closeTransaction(con);
321   }
322   DBG("inserted X, Y, Z");
323   return 0;
324 }
325 
326 static int
wl1822_getscanorder(Thr & thr)327 wl1822_getscanorder(Thr& thr)
328 {
329   // cheat, table order happens to be key order in my test
330   wl1822_r2k[0] = 0;
331   wl1822_r2k[1] = 1;
332   wl1822_r2k[2] = 2;
333   wl1822_k2r[0] = 0;
334   wl1822_k2r[1] = 1;
335   wl1822_k2r[2] = 2;
336   DBG("scan order determined");
337   return 0;
338 }
339 
340 static int
wl1822_tx1_readZ(Thr & thr)341 wl1822_tx1_readZ(Thr& thr)
342 {
343   // tx1 read Z with exclusive lock
344   NdbConnection* con = thr.m_con;
345   assert(con != 0);
346   NdbOperation* op;
347   CHN(con, (op = con->getNdbOperation(g_opt.m_tname)) != 0);
348   CHN(op, op->readTupleExclusive() == 0);
349   CHN(op, op->equal("A", wl1822_valA[wl1822_r2k[2]]) == 0);
350   wl1822_bufB = ~0;
351   CHN(op, op->getValue("B", (char*)&wl1822_bufB) != 0);
352   CHN(con, con->execute(NoCommit) == 0);
353   CHK(wl1822_bufB == wl1822_valB[wl1822_r2k[2]]);
354   DBG("tx1 locked Z");
355   return 0;
356 }
357 
358 static int
wl1822_tx2_scanXY(Thr & thr)359 wl1822_tx2_scanXY(Thr& thr)
360 {
361   // tx2 scan X, Y with exclusive lock
362   NdbConnection* con = thr.m_con;
363   assert(con != 0);
364   NdbScanOperation* scanop;
365   NdbIndexScanOperation* indexscanop;
366   NdbResultSet* rs;
367   if (wl1822_scantx == 't') {
368     CHN(con, (scanop = thr.m_scanop = con->getNdbScanOperation(g_opt.m_tname)) != 0);
369     DBG("tx2 scan exclusive " << g_opt.m_tname);
370   }
371   if (wl1822_scantx == 'x') {
372     CHN(con, (scanop = thr.m_scanop = indexscanop = thr.m_indexscanop = con->getNdbIndexScanOperation(g_opt.m_xname, g_opt.m_tname)) != 0);
373     DBG("tx2 scan exclusive " << g_opt.m_xname);
374   }
375   CHN(scanop, scanop->readTuplesExclusive(16) == 0);
376   CHN(scanop, scanop->getValue("A", (char*)&wl1822_bufA) != 0);
377   CHN(scanop, scanop->getValue("B", (char*)&wl1822_bufB) != 0);
378   CHN(con, con->execute(NoCommit) == 0);
379   unsigned row = 0;
380   while (row < 2) {
381     DBG("before row " << row);
382     int ret;
383     wl1822_bufA = wl1822_bufB = ~0;
384     CHN(con, (ret = scanop->nextResult(true)) == 0);
385     DBG("got row " << row << " a=" << wl1822_bufA << " b=" << wl1822_bufB);
386     CHK(wl1822_bufA == wl1822_valA[wl1822_r2k[row]]);
387     CHK(wl1822_bufB == wl1822_valB[wl1822_r2k[row]]);
388     row++;
389   }
390   return 0;
391 }
392 
393 static int
wl1822_tx1_readX_commit(Thr & thr)394 wl1822_tx1_readX_commit(Thr& thr)
395 {
396   // tx1 read X with exclusive lock and commit
397   NdbConnection* con = thr.m_con;
398   assert(con != 0);
399   NdbOperation* op;
400   CHN(con, (op = con->getNdbOperation(g_opt.m_tname)) != 0);
401   CHN(op, op->readTupleExclusive() == 0);
402   CHN(op, op->equal("A", wl1822_valA[wl1822_r2k[2]]) == 0);
403   wl1822_bufB = ~0;
404   CHN(op, op->getValue("B", (char*)&wl1822_bufB) != 0);
405   CHN(con, con->execute(NoCommit) == 0);
406   CHK(wl1822_bufB == wl1822_valB[wl1822_r2k[2]]);
407   DBG("tx1 locked X");
408   CHN(con, con->execute(Commit) == 0);
409   DBG("tx1 commit");
410   return 0;
411 }
412 
413 static int
wl1822_tx2_scanZ_close(Thr & thr)414 wl1822_tx2_scanZ_close(Thr& thr)
415 {
416   // tx2 scan Z with exclusive lock and close scan
417   Ndb* ndb = thr.m_ndb;
418   NdbConnection* con = thr.m_con;
419   NdbScanOperation* scanop = thr.m_scanop;
420   assert(ndb != 0 && con != 0 && scanop != 0);
421   unsigned row = 2;
422   while (true) {
423     DBG("before row " << row);
424     int ret;
425     wl1822_bufA = wl1822_bufB = ~0;
426     CHN(con, (ret = scanop->nextResult(true)) == 0 || ret == 1);
427     if (ret == 1)
428       break;
429     DBG("got row " << row << " a=" << wl1822_bufA << " b=" << wl1822_bufB);
430     CHK(wl1822_bufA == wl1822_valA[wl1822_r2k[row]]);
431     CHK(wl1822_bufB == wl1822_valB[wl1822_r2k[row]]);
432     row++;
433   }
434   ndb->closeTransaction(con);
435   CHK(row == 3);
436   return 0;
437 }
438 
439 // threads are synced between each step
440 static Runstep wl1822_step[][2] = {
441   { runstep_connect, runstep_connect },
442   { wl1822_createtable, 0 },
443   { wl1822_insertrows, 0 },
444   { wl1822_getscanorder, 0 },
445   { runstep_starttx, runstep_starttx },
446   { wl1822_tx1_readZ, 0 },
447   { 0, wl1822_tx2_scanXY },
448   { wl1822_tx1_readX_commit, wl1822_tx2_scanZ_close }
449 };
450 const unsigned wl1822_stepcount = sizeof(wl1822_step)/sizeof(wl1822_step[0]);
451 
452 static int
wl1822_main(char scantx)453 wl1822_main(char scantx)
454 {
455   wl1822_scantx = scantx;
456   static const unsigned thrcount = 2;
457   // create threads for tx1 and tx2
458   Thr* thrlist[2];
459   int n;
460   for (n = 0; n < thrcount; n++) {
461     Thr& thr = *(thrlist[n] = new Thr(1 + n));
462     CHK(thr.m_ret == 0);
463   }
464   // run the steps
465   for (unsigned i = 0; i < wl1822_stepcount; i++) {
466     DBG("step " << i << " start");
467     for (n = 0; n < thrcount; n++) {
468       Thr& thr = *thrlist[n];
469       Runstep runstep = wl1822_step[i][n];
470       if (runstep != 0)
471         thr.start(runstep);
472     }
473     for (n = 0; n < thrcount; n++) {
474       Thr& thr = *thrlist[n];
475       Runstep runstep = wl1822_step[i][n];
476       if (runstep != 0)
477         thr.stopped();
478     }
479   }
480   // delete threads
481   for (n = 0; n < thrcount; n++) {
482     Thr& thr = *thrlist[n];
483     thr.exit();
484     thr.join();
485     delete &thr;
486   }
487   return 0;
488 }
489 
490 NDB_COMMAND(testOdbcDriver, "testDeadlock", "testDeadlock", "testDeadlock", 65535)
491 {
492   ndb_init();
493   if (ndbout_mutex == NULL)
494     ndbout_mutex= NdbMutex_Create();
495   while (++argv, --argc > 0) {
496     const char* arg = argv[0];
497     if (strcmp(arg, "-scan") == 0) {
498       if (++argv, --argc > 0) {
499         g_opt.m_scan = strdup(argv[0]);
500         continue;
501       }
502     }
503     printusage();
504     return NDBT_ProgramExit(NDBT_WRONGARGS);
505   }
506 
507   Ndb_cluster_connection con;
508   if(con.connect(12, 5, 1) != 0)
509   {
510     return NDBT_ProgramExit(NDBT_FAILED);
511   }
512   g_cluster_connection= &con;
513 
514   if (
515       strchr(g_opt.m_scan, 't') != 0 && wl1822_main('t') == -1 ||
516       strchr(g_opt.m_scan, 'x') != 0 && wl1822_main('x') == -1
517       ) {
518     return NDBT_ProgramExit(NDBT_FAILED);
519   }
520   return NDBT_ProgramExit(NDBT_OK);
521 }
522 
523 // vim: set sw=2 et:
524