1 /*
2    Copyright (c) 2011, 2021, Oracle and/or its affiliates.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 #include "ndb_conflict_trans.h"
26 
27 #ifdef HAVE_NDB_BINLOG
28 #include "my_sys.h"
29 #include "my_base.h"
30 
31 /* Whether to track all transactions, or just
32  * 'interesting' ones
33  */
34 #define TRACK_ALL_TRANSACTIONS 0
35 
36 /* Whether to check the transaction graph for
37  * correctness at runtime
38  */
39 #define CHECK_TRANS_GRAPH 0
40 
41 /* st_row_event_key_info implementation */
42 
43 st_row_event_key_info::
st_row_event_key_info(const NdbDictionary::Table * _table,const uchar * _key_buff,Uint32 _key_buff_len,Uint64 _transaction_id)44 st_row_event_key_info(const NdbDictionary::Table* _table,
45                       const uchar* _key_buff,
46                       Uint32 _key_buff_len,
47                       Uint64 _transaction_id):
48   tableObj(_table),
49   packed_key(_key_buff),
50   packed_key_len(_key_buff_len),
51   transaction_id(_transaction_id),
52   hash_next(NULL)
53 {
54 }
55 
56 Uint64
getTransactionId() const57 st_row_event_key_info::getTransactionId() const
58 {
59   return transaction_id;
60 }
61 
62 void
updateRowTransactionId(Uint64 mostRecentTransId)63 st_row_event_key_info::updateRowTransactionId(Uint64 mostRecentTransId)
64 {
65   transaction_id = mostRecentTransId;
66 }
67 
68 Uint32
hashValue() const69 st_row_event_key_info::hashValue() const
70 {
71   /* Include Table Object Id + primary key */
72   Uint32 h = (17 * 37) + tableObj->getObjectId();
73   for (Uint32 i=0; i < packed_key_len; i++)
74     h = (37 * h) + packed_key[i];
75   return h;
76 }
77 
78 bool
equal(const st_row_event_key_info * other) const79 st_row_event_key_info::equal(const st_row_event_key_info* other) const
80 {
81   /* Check same table + same PK */
82   return
83     ((tableObj == other->tableObj) &&
84      (packed_key_len == other->packed_key_len) &&
85      (memcmp(packed_key, other->packed_key, packed_key_len) == 0));
86 };
87 
88 st_row_event_key_info*
getNext() const89 st_row_event_key_info::getNext() const
90 {
91   return hash_next;
92 };
93 
94 void
setNext(st_row_event_key_info * _next)95 st_row_event_key_info::setNext(st_row_event_key_info* _next)
96 {
97   hash_next = _next;
98 };
99 
100 
101 /* st_trans_dependency implementation */
102 
103 st_trans_dependency::
st_trans_dependency(st_transaction * _target_transaction,st_transaction * _dependent_transaction,const st_trans_dependency * _next)104 st_trans_dependency(st_transaction* _target_transaction,
105                     st_transaction* _dependent_transaction,
106                     const st_trans_dependency* _next)
107   : target_transaction(_target_transaction),
108     dependent_transaction(_dependent_transaction),
109     next_entry(_next),
110     hash_next(NULL)
111 {
112 };
113 
114 st_transaction*
getTargetTransaction() const115 st_trans_dependency::getTargetTransaction() const
116 {
117   return target_transaction;
118 }
119 
120 st_transaction*
getDependentTransaction() const121 st_trans_dependency::getDependentTransaction() const
122 {
123   return dependent_transaction;
124 }
125 
126 const st_trans_dependency*
getNextDependency() const127 st_trans_dependency::getNextDependency() const
128 {
129   return next_entry;
130 }
131 
132 Uint32
hashValue() const133 st_trans_dependency::hashValue() const
134 {
135   /* Hash the ptrs in a rather nasty way */
136   UintPtr p =
137     ((UintPtr) target_transaction) ^
138     ((UintPtr) dependent_transaction);;
139 
140   if (sizeof(p) == 8)
141   {
142     /* Xor two words of 64 bit ptr */
143     p =
144       (p & 0xffffffff) ^
145       ((((Uint64) p) >> 32) & 0xffffffff);
146   }
147 
148   return 17 + (37 * (Uint32)p);
149 };
150 
151 bool
equal(const st_trans_dependency * other) const152 st_trans_dependency::equal(const st_trans_dependency* other) const
153 {
154   return ((target_transaction == other->target_transaction) &&
155           (dependent_transaction == other->dependent_transaction));
156 };
157 
158 st_trans_dependency*
getNext() const159 st_trans_dependency::getNext() const
160 {
161   return hash_next;
162 };
163 
164 void
setNext(st_trans_dependency * _next)165 st_trans_dependency::setNext(st_trans_dependency* _next)
166 {
167   hash_next = _next;
168 };
169 
170 
171 /* st_transaction implementation */
172 
st_transaction(Uint64 _transaction_id)173 st_transaction::st_transaction(Uint64 _transaction_id)
174   : transaction_id(_transaction_id),
175     in_conflict(false),
176     dependency_list_head(NULL),
177     hash_next(NULL)
178 {
179 };
180 
181 Uint64
getTransactionId() const182 st_transaction::getTransactionId() const
183 {
184   return transaction_id;
185 }
186 
187 bool
getInConflict() const188 st_transaction::getInConflict() const
189 {
190   return in_conflict;
191 }
192 
193 void
setInConflict()194 st_transaction::setInConflict()
195 {
196   in_conflict = true;
197 }
198 
199 const st_trans_dependency*
getDependencyListHead() const200 st_transaction::getDependencyListHead() const
201 {
202   return dependency_list_head;
203 }
204 
205 void
setDependencyListHead(st_trans_dependency * head)206 st_transaction::setDependencyListHead(st_trans_dependency* head)
207 {
208   dependency_list_head = head;
209 }
210 
211 /* Hash Api */
212 Uint32
hashValue() const213 st_transaction::hashValue() const
214 {
215   return 17 + (37 * ((transaction_id & 0xffffffff) ^
216                      (transaction_id >> 32 & 0xffffffff)));
217 };
218 
219 bool
equal(const st_transaction * other) const220 st_transaction::equal(const st_transaction* other) const
221 {
222   return transaction_id == other->transaction_id;
223 };
224 
225 st_transaction*
getNext() const226 st_transaction::getNext() const
227 {
228     return hash_next;
229 };
230 
231 void
setNext(st_transaction * _next)232 st_transaction::setNext(st_transaction* _next)
233 {
234   hash_next = _next;
235 };
236 
237 
238 
239 /*
240    Unique HashMap(Set) of st_row_event_key_info ptrs, with bucket storage
241    allocated from st_mem_root_allocator
242 */
243 template class HashMap2<st_row_event_key_info, true, st_mem_root_allocator>;
244 template class HashMap2<st_transaction, true, st_mem_root_allocator>;
245 template class HashMap2<st_trans_dependency, true, st_mem_root_allocator>;
246 template class LinkedStack<Uint64, st_mem_root_allocator>;
247 
248 
249 /**
250  * pack_key_to_buffer
251  *
252  * Given a table, key_record and record, this method will
253  * determine how many significant bytes the key contains,
254  * and if a buffer is passed, will copy the bytes into the
255  * buffer.
256  */
257 static
258 int
pack_key_to_buffer(const NdbDictionary::Table * table,const NdbRecord * key_rec,const uchar * record,uchar * buffer,Uint32 & buff_len)259 pack_key_to_buffer(const NdbDictionary::Table* table,
260                    const NdbRecord* key_rec,
261                    const uchar* record,
262                    uchar* buffer,
263                    Uint32& buff_len)
264 {
265   /* Loop over attributes in key record, determining their actual
266    * size based on column type and contents of record
267    * If buffer supplied, copy them contiguously to buffer
268    * return total length
269    */
270   Uint32 attr_id;
271   Uint32 buff_offset = 0;
272   NdbDictionary::getFirstAttrId(key_rec, attr_id);
273 
274   do
275   {
276     Uint32 from_offset = 0;
277     Uint32 byte_len = 0;
278     const NdbDictionary::Column* key_col = table->getColumn(attr_id);
279     NdbDictionary::getOffset(key_rec, attr_id, from_offset);
280     assert( ! NdbDictionary::isNull(key_rec, (const char*) record, attr_id));
281 
282     switch(key_col->getArrayType())
283     {
284     case NDB_ARRAYTYPE_FIXED:
285       byte_len = key_col->getSizeInBytes();
286       break;
287     case NDB_ARRAYTYPE_SHORT_VAR:
288       byte_len = record[from_offset];
289       from_offset++;
290       break;
291     case NDB_ARRAYTYPE_MEDIUM_VAR:
292       byte_len = uint2korr(&record[from_offset]);
293       from_offset+= 2;
294       break;
295     };
296     assert( (buff_offset + byte_len) <= buff_len );
297 
298     if (buffer)
299       memcpy(&buffer[buff_offset], &record[from_offset], byte_len);
300 
301     buff_offset+= byte_len;
302   } while (NdbDictionary::getNextAttrId(key_rec, attr_id));
303 
304   buff_len = buff_offset;
305   return 0;
306 };
307 
308 static
determine_packed_key_size(const NdbDictionary::Table * table,const NdbRecord * key_rec,const uchar * record)309 Uint32 determine_packed_key_size(const NdbDictionary::Table* table,
310                                  const NdbRecord* key_rec,
311                                  const uchar* record)
312 {
313   Uint32 key_size = ~Uint32(0);
314   /* Use pack_key_to_buffer to calculate length required */
315   pack_key_to_buffer(table,
316                      key_rec,
317                      record,
318                      NULL,
319                      key_size);
320   return key_size;
321 };
322 
323 /* st_mem_root_allocator implementation */
324 void*
alloc(void * ctx,size_t bytes)325 st_mem_root_allocator::alloc(void* ctx, size_t bytes)
326 {
327   st_mem_root_allocator* a = (st_mem_root_allocator*) ctx;
328   return alloc_root(a->mem_root, bytes);
329 };
330 
331 void*
mem_calloc(void * ctx,size_t nelem,size_t bytes)332 st_mem_root_allocator::mem_calloc(void* ctx, size_t nelem, size_t bytes)
333 {
334   st_mem_root_allocator* a = (st_mem_root_allocator*) ctx;
335   return alloc_root(a->mem_root,
336                     nelem * bytes);
337 }
338 
339 void
mem_free(void * ctx,void * mem)340 st_mem_root_allocator::mem_free(void* ctx, void* mem)
341 {
342   /* Do nothing, will be globally freed when arena (mem_root)
343    * released
344    */
345 };
346 
st_mem_root_allocator(MEM_ROOT * _mem_root)347 st_mem_root_allocator::st_mem_root_allocator(MEM_ROOT* _mem_root)
348   : mem_root(_mem_root)
349 {
350 };
351 
352 
353 /* DependencyTracker implementation */
354 
355 DependencyTracker*
newDependencyTracker(MEM_ROOT * mem_root)356 DependencyTracker::newDependencyTracker(MEM_ROOT* mem_root)
357 {
358   DependencyTracker* dt = NULL;
359   void* mem = alloc_root(mem_root,
360                          sizeof(DependencyTracker));
361   if (mem)
362   {
363     dt = new (mem) DependencyTracker(mem_root);
364   }
365 
366   return dt;
367 }
368 
369 
370 DependencyTracker::
DependencyTracker(MEM_ROOT * mem_root)371 DependencyTracker(MEM_ROOT* mem_root)
372   : mra(mem_root), key_hash(&mra), trans_hash(&mra),
373     dependency_hash(&mra),
374     iteratorTodo(ITERATOR_STACK_BLOCKSIZE, &mra),
375     conflicting_trans_count(0),
376     error_text(NULL)
377 {
378   /* TODO Get sizes from somewhere */
379   key_hash.setSize(1024);
380   trans_hash.setSize(100);
381   dependency_hash.setSize(100);
382 };
383 
384 
385 int
386 DependencyTracker::
track_operation(const NdbDictionary::Table * table,const NdbRecord * key_rec,const uchar * row,Uint64 transaction_id)387 track_operation(const NdbDictionary::Table* table,
388                 const NdbRecord* key_rec,
389                 const uchar* row,
390                 Uint64 transaction_id)
391 {
392   DBUG_ENTER("track_operation");
393 
394   Uint32 required_buff_size = determine_packed_key_size(table,
395                                                         key_rec,
396                                                         row);
397   DBUG_PRINT("info", ("Required length for key : %u", required_buff_size));
398 
399   /* Alloc space for packed key and struct*/
400   uchar* packed_key_buff = (uchar*) alloc_root(mra.mem_root,
401                                                required_buff_size);
402   void* element_mem = alloc_root(mra.mem_root,
403                                  sizeof(st_row_event_key_info));
404 
405   if (pack_key_to_buffer(table,
406                          key_rec,
407                          row,
408                          packed_key_buff,
409                          required_buff_size))
410   {
411     if (!error_text)
412       error_text="track_operation : Failed packing key";
413     DBUG_RETURN(-1);
414   }
415 
416   if (TRACK_ALL_TRANSACTIONS)
417   {
418     st_transaction* transEntry = get_or_create_transaction(transaction_id);
419     if (!transEntry)
420     {
421       error_text = "track_operation : Failed to get or create transaction";
422       DBUG_RETURN(HA_ERR_OUT_OF_MEM);
423     }
424   }
425 
426   st_row_event_key_info* key_info = new (element_mem)
427     st_row_event_key_info(table,
428                           packed_key_buff,
429                           required_buff_size,
430                           transaction_id);
431 
432   /* Now try to add element to hash */
433   if (! key_hash.add(key_info))
434   {
435     /*
436       Already an element in the keyhash with this primary key
437       If it's for the same transaction then ignore, otherwise
438       it's an inter-transaction dependency
439     */
440     st_row_event_key_info* existing = key_hash.get(key_info);
441 
442     Uint64 existingTransIdOnRow = existing->getTransactionId();
443     Uint64 newTransIdOnRow = key_info->getTransactionId();
444 
445     if (existingTransIdOnRow != newTransIdOnRow)
446     {
447       int res = add_dependency(existingTransIdOnRow,
448                                newTransIdOnRow);
449       /*
450         Update stored transaction_id to be latest for key.
451         Further key operations on this row will depend on this
452         transaction, and transitively on the previous
453         transaction.
454       */
455       existing->updateRowTransactionId(newTransIdOnRow);
456 
457       assert(res == 0 || error_text != NULL);
458 
459       DBUG_RETURN(res);
460     }
461     else
462     {
463       /*
464          How can we have two updates to the same row with the
465          same transaction id?  Only if the transaction id
466          is invalid (e.g. not set)
467          In normal cases with only one upstream master, each
468          distinct master user transaction will have a unique
469          id, and all operations on a row in that transaction
470          will be merged in TUP prior to emitting a SUMA
471          event.
472          This could be relaxed for more complex upstream
473          topologies, but acts as a sanity guard currently.
474       */
475       if (existingTransIdOnRow != InvalidTransactionId)
476       {
477         assert(false);
478         error_text= "Two row operations to same key sharing user transaction id";
479         DBUG_RETURN(-1);
480       }
481     }
482   }
483 
484   DBUG_RETURN(0);
485 };
486 
487 int
488 DependencyTracker::
mark_conflict(Uint64 trans_id)489 mark_conflict(Uint64 trans_id)
490 {
491   DBUG_ENTER("mark_conflict");
492   DBUG_PRINT("info", ("trans_id : %llu", trans_id));
493 
494   st_transaction* entry = get_or_create_transaction(trans_id);
495   if (!entry)
496   {
497     error_text = "mark_conflict : get_or_create_transaction() failure";
498     DBUG_RETURN(HA_ERR_OUT_OF_MEM);
499   }
500 
501   if (entry->getInConflict())
502   {
503     /* Nothing to do here */
504     DBUG_RETURN(0);
505   }
506 
507   /* Have entry, mark it, and any dependents */
508   bool fetch_node_dependents;
509   st_transaction* dependent = entry;
510   reset_dependency_iterator();
511   do
512   {
513     DBUG_PRINT("info", ("Visiting transaction %llu, conflict : %u",
514                         dependent->getTransactionId(),
515                         dependent->getInConflict()));
516     /*
517       If marked already, don't fetch dependents, as
518       they will also be marked already
519     */
520     fetch_node_dependents = false;
521 
522     if (!dependent->getInConflict())
523     {
524       dependent->setInConflict();
525       conflicting_trans_count++;
526       fetch_node_dependents = true;
527     }
528   } while ((dependent = get_next_dependency(dependent, fetch_node_dependents)));
529 
530   assert( verify_graph() );
531 
532   DBUG_RETURN(0);
533 }
534 
535 bool
in_conflict(Uint64 trans_id)536 DependencyTracker::in_conflict(Uint64 trans_id)
537 {
538   DBUG_ENTER("in_conflict");
539   DBUG_PRINT("info", ("trans_id %llu", trans_id));
540   st_transaction key(trans_id);
541   const st_transaction* entry = NULL;
542 
543   /*
544     If transaction hash entry exists, check it for
545     conflicts.  If it doesn't exist, no conflict
546   */
547   if ((entry = trans_hash.get(&key)))
548   {
549     DBUG_PRINT("info", ("in_conflict : %u", entry->getInConflict()));
550     DBUG_RETURN(entry->getInConflict());
551   }
552   else
553   {
554     assert(! TRACK_ALL_TRANSACTIONS);
555   }
556   DBUG_RETURN(false);
557 };
558 
559 st_transaction*
560 DependencyTracker::
get_or_create_transaction(Uint64 trans_id)561 get_or_create_transaction(Uint64 trans_id)
562 {
563   DBUG_ENTER("get_or_create_transaction");
564   st_transaction transKey(trans_id);
565   st_transaction* transEntry = NULL;
566 
567   if (! (transEntry = trans_hash.get(&transKey)))
568   {
569     /*
570        Transaction does not exist.  Allocate it
571        and add to the hash
572     */
573     DBUG_PRINT("info", ("Creating new hash entry for transaction (%llu)",
574                         trans_id));
575 
576     transEntry = (st_transaction*)
577       st_mem_root_allocator::alloc(&mra, sizeof(st_transaction));
578 
579     if (transEntry)
580     {
581       new (transEntry) st_transaction(trans_id);
582 
583       if (!trans_hash.add(transEntry))
584       {
585         st_mem_root_allocator::mem_free(&mra, transEntry); /* For show */
586         transEntry = NULL;
587       }
588     }
589   }
590 
591   DBUG_RETURN(transEntry);
592 }
593 
594 int
595 DependencyTracker::
add_dependency(Uint64 trans_id,Uint64 dependent_trans_id)596 add_dependency(Uint64 trans_id, Uint64 dependent_trans_id)
597 {
598   DBUG_ENTER("add_dependency");
599   DBUG_PRINT("info", ("Recording dependency of %llu on %llu",
600                       dependent_trans_id, trans_id));
601   st_transaction* targetEntry = get_or_create_transaction(trans_id);
602   if (!targetEntry)
603   {
604     error_text = "add_dependency : Failed get_or_create_transaction";
605     DBUG_RETURN(HA_ERR_OUT_OF_MEM);
606   }
607 
608   st_transaction* dependentEntry = get_or_create_transaction(dependent_trans_id);
609   if (!dependentEntry)
610   {
611     error_text = "add_dependency : Failed get_or_create_transaction";
612     DBUG_RETURN(HA_ERR_OUT_OF_MEM);
613   }
614 
615   /* Now lookup dependency.  Add it if not already present */
616   st_trans_dependency depKey(targetEntry, dependentEntry, NULL);
617   st_trans_dependency* dep = NULL;
618   if (! (dep = dependency_hash.get(&depKey)))
619   {
620     DBUG_PRINT("info", ("Creating new dependency hash entry for "
621                         "dependency of %llu on %llu.",
622                         dependentEntry->getTransactionId(),
623                         targetEntry->getTransactionId()));
624 
625     dep = (st_trans_dependency*)
626       st_mem_root_allocator::alloc(&mra, sizeof(st_trans_dependency));
627 
628     new (dep) st_trans_dependency(targetEntry, dependentEntry,
629                                   targetEntry->getDependencyListHead());
630 
631     targetEntry->setDependencyListHead(dep);
632 
633     /* New dependency, propagate in_conflict if necessary */
634     if (targetEntry->getInConflict())
635     {
636       DBUG_PRINT("info", ("Marking new dependent as in-conflict"));
637       DBUG_RETURN(mark_conflict(dependentEntry->getTransactionId()));
638     }
639   }
640 
641   assert(verify_graph());
642 
643   DBUG_RETURN(0);
644 };
645 
646 void
647 DependencyTracker::
reset_dependency_iterator()648 reset_dependency_iterator()
649 {
650   iteratorTodo.reset();
651 };
652 
653 st_transaction*
654 DependencyTracker::
get_next_dependency(const st_transaction * current,bool include_dependents_of_current)655 get_next_dependency(const st_transaction* current,
656                     bool include_dependents_of_current)
657 {
658   DBUG_ENTER("get_next_dependency");
659   DBUG_PRINT("info", ("node : %llu", current->getTransactionId()));
660   /*
661     Depth first traverse, with option to ignore sub graphs.
662   */
663   if (include_dependents_of_current)
664   {
665     /* Add dependents to stack */
666     const st_trans_dependency* dependency = current->getDependencyListHead();
667 
668     while (dependency)
669     {
670       assert(dependency->getTargetTransaction() == current);
671       DBUG_PRINT("info", ("Adding dependency %llu->%llu",
672                           dependency->getDependentTransaction()->getTransactionId(),
673                           dependency->getTargetTransaction()->getTransactionId()));
674 
675       Uint64 dependentTransactionId =
676         dependency->getDependentTransaction()->getTransactionId();
677       iteratorTodo.push(dependentTransactionId);
678       dependency= dependency->getNextDependency();
679     }
680   }
681 
682   Uint64 nextId;
683   if (iteratorTodo.pop(nextId))
684   {
685     DBUG_PRINT("info", ("Returning transaction id %llu",
686                         nextId));
687     st_transaction key(nextId);
688     st_transaction* dependent = trans_hash.get(&key);
689     assert(dependent);
690     DBUG_RETURN(dependent);
691   }
692 
693   assert(iteratorTodo.size() == 0);
694   DBUG_PRINT("info", ("No more dependencies to visit"));
695   DBUG_RETURN(NULL);
696 };
697 
698 void
699 DependencyTracker::
dump_dependents(Uint64 trans_id)700 dump_dependents(Uint64 trans_id)
701 {
702   fprintf(stderr, "Dumping dependents of transid %llu : ", trans_id);
703 
704   st_transaction key(trans_id);
705   const st_transaction* dependent = NULL;
706 
707   if ((dependent = trans_hash.get(&key)))
708   {
709     reset_dependency_iterator();
710     const char* comma = ", ";
711     const char* sep = "";
712     do
713     {
714       {
715         fprintf(stderr, "%s%llu%s", sep, dependent->getTransactionId(),
716                 (dependent->getInConflict()?"-C":""));
717         sep = comma;
718       }
719     } while ((dependent = get_next_dependency(dependent)));
720     fprintf(stderr, "\n");
721   }
722   else
723   {
724     fprintf(stderr, "None\n");
725   }
726 };
727 
728 bool
729 DependencyTracker::
verify_graph()730 verify_graph()
731 {
732   if (! CHECK_TRANS_GRAPH)
733     return true;
734 
735   /*
736      Check the graph structure obeys its invariants
737 
738      1) There are no cycles in the graph such that
739         a transaction is a dependent of itself
740 
741      2) If a transaction is marked in_conflict, all
742         of its dependents (transitively), are also
743         marked in conflict
744 
745      This is expensive to verify, so not always on
746   */
747   HashMap2<st_transaction, true, st_mem_root_allocator>::Iterator it(trans_hash);
748 
749   st_transaction* root = NULL;
750 
751   while ((root = it.next()))
752   {
753     bool in_conflict = root->getInConflict();
754 
755     /* Now visit all dependents */
756     st_transaction* dependent = root;
757     reset_dependency_iterator();
758 
759     while((dependent = get_next_dependency(dependent, true)))
760     {
761       if (dependent == root)
762       {
763         /* Must exit, or we'll be here forever */
764         fprintf(stderr, "Error : Cycle discovered in graph\n");
765         abort();
766         return false;
767       }
768 
769       if (in_conflict &&
770           ! dependent->getInConflict())
771       {
772         fprintf(stderr, "Error : Dependent transaction not marked in-conflict\n");
773         abort();
774         return false;
775       }
776     }
777   }
778 
779   return true;
780 }
781 
782 
783 const char*
get_error_text() const784 DependencyTracker::get_error_text() const
785 {
786   return error_text;
787 };
788 
789 Uint32
get_conflict_count() const790 DependencyTracker::get_conflict_count() const
791 {
792   return conflicting_trans_count;
793 }
794 
795 /* #ifdef HAVE_NDB_BINLOG */
796 
797 #endif
798