1 /* Copyright (c) 2003-2006 MySQL AB
2    Use is subject to license terms
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation; version 2 of the License.
7 
8    This program is distributed in the hope that it will be useful,
9    but WITHOUT ANY WARRANTY; without even the implied warranty of
10    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11    GNU General Public License for more details.
12 
13    You should have received a copy of the GNU General Public License
14    along with this program; if not, write to the Free Software
15    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA */
16 
17 #define DBTUX_NODE_CPP
18 #include "Dbtux.hpp"
19 
20 /*
21  * Allocate index node in TUP.
22  */
23 int
allocNode(Signal * signal,NodeHandle & node)24 Dbtux::allocNode(Signal* signal, NodeHandle& node)
25 {
26   if (ERROR_INSERTED(12007)) {
27     jam();
28     CLEAR_ERROR_INSERT_VALUE;
29     return TuxMaintReq::NoMemError;
30   }
31   Frag& frag = node.m_frag;
32   Uint32 pageId = NullTupLoc.getPageId();
33   Uint32 pageOffset = NullTupLoc.getPageOffset();
34   Uint32* node32 = 0;
35   int errorCode = c_tup->tuxAllocNode(signal, frag.m_tupIndexFragPtrI, pageId, pageOffset, node32);
36   jamEntry();
37   if (errorCode == 0) {
38     jam();
39     node.m_loc = TupLoc(pageId, pageOffset);
40     node.m_node = reinterpret_cast<TreeNode*>(node32);
41     ndbrequire(node.m_loc != NullTupLoc && node.m_node != 0);
42   } else {
43     switch (errorCode) {
44     case 827:
45       errorCode = TuxMaintReq::NoMemError;
46       break;
47     }
48   }
49   return errorCode;
50 }
51 
52 /*
53  * Set handle to point to existing node.
54  */
55 void
selectNode(NodeHandle & node,TupLoc loc)56 Dbtux::selectNode(NodeHandle& node, TupLoc loc)
57 {
58   Frag& frag = node.m_frag;
59   ndbrequire(loc != NullTupLoc);
60   Uint32 pageId = loc.getPageId();
61   Uint32 pageOffset = loc.getPageOffset();
62   Uint32* node32 = 0;
63   c_tup->tuxGetNode(frag.m_tupIndexFragPtrI, pageId, pageOffset, node32);
64   jamEntry();
65   node.m_loc = loc;
66   node.m_node = reinterpret_cast<TreeNode*>(node32);
67   ndbrequire(node.m_loc != NullTupLoc && node.m_node != 0);
68 }
69 
70 /*
71  * Set handle to point to new node.  Uses a pre-allocated node.
72  */
73 void
insertNode(NodeHandle & node)74 Dbtux::insertNode(NodeHandle& node)
75 {
76   Frag& frag = node.m_frag;
77   // unlink from freelist
78   selectNode(node, frag.m_freeLoc);
79   frag.m_freeLoc = node.getLink(0);
80   new (node.m_node) TreeNode();
81 #ifdef VM_TRACE
82   TreeHead& tree = frag.m_tree;
83   memset(node.getPref(), DataFillByte, tree.m_prefSize << 2);
84   TreeEnt* entList = tree.getEntList(node.m_node);
85   memset(entList, NodeFillByte, (tree.m_maxOccup + 1) * (TreeEntSize << 2));
86 #endif
87 }
88 
89 /*
90  * Delete existing node.  Simply put it on the freelist.
91  */
92 void
deleteNode(NodeHandle & node)93 Dbtux::deleteNode(NodeHandle& node)
94 {
95   Frag& frag = node.m_frag;
96   ndbrequire(node.getOccup() == 0);
97   // link to freelist
98   node.setLink(0, frag.m_freeLoc);
99   frag.m_freeLoc = node.m_loc;
100   // invalidate the handle
101   node.m_loc = NullTupLoc;
102   node.m_node = 0;
103 }
104 
105 /*
106  * Set prefix.  Copies the number of words that fits.  Includes
107  * attribute headers for now.  XXX use null mask instead
108  */
109 void
setNodePref(NodeHandle & node)110 Dbtux::setNodePref(NodeHandle& node)
111 {
112   const Frag& frag = node.m_frag;
113   const TreeHead& tree = frag.m_tree;
114   readKeyAttrs(frag, node.getMinMax(0), 0, c_entryKey);
115   copyAttrs(frag, c_entryKey, node.getPref(), tree.m_prefSize);
116 }
117 
118 // node operations
119 
120 /*
121  * Add entry at position.  Move entries greater than or equal to the old
122  * one (if any) to the right.
123  *
124  *            X
125  *            v
126  *      A B C D E _ _  =>  A B C X D E _
127  *      0 1 2 3 4 5 6      0 1 2 3 4 5 6
128  *
129  * Add list of scans at the new entry.
130  */
131 void
nodePushUp(NodeHandle & node,unsigned pos,const TreeEnt & ent,Uint32 scanList)132 Dbtux::nodePushUp(NodeHandle& node, unsigned pos, const TreeEnt& ent, Uint32 scanList)
133 {
134   Frag& frag = node.m_frag;
135   TreeHead& tree = frag.m_tree;
136   const unsigned occup = node.getOccup();
137   ndbrequire(occup < tree.m_maxOccup && pos <= occup);
138   // fix old scans
139   if (node.getNodeScan() != RNIL)
140     nodePushUpScans(node, pos);
141   // fix node
142   TreeEnt* const entList = tree.getEntList(node.m_node);
143   entList[occup] = entList[0];
144   TreeEnt* const tmpList = entList + 1;
145   for (unsigned i = occup; i > pos; i--) {
146     jam();
147     tmpList[i] = tmpList[i - 1];
148   }
149   tmpList[pos] = ent;
150   entList[0] = entList[occup + 1];
151   node.setOccup(occup + 1);
152   // add new scans
153   if (scanList != RNIL)
154     addScanList(node, pos, scanList);
155   // fix prefix
156   if (occup == 0 || pos == 0)
157     setNodePref(node);
158 }
159 
160 void
nodePushUpScans(NodeHandle & node,unsigned pos)161 Dbtux::nodePushUpScans(NodeHandle& node, unsigned pos)
162 {
163   const unsigned occup = node.getOccup();
164   ScanOpPtr scanPtr;
165   scanPtr.i = node.getNodeScan();
166   do {
167     jam();
168     c_scanOpPool.getPtr(scanPtr);
169     TreePos& scanPos = scanPtr.p->m_scanPos;
170     ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup);
171     if (scanPos.m_pos >= pos) {
172       jam();
173 #ifdef VM_TRACE
174       if (debugFlags & DebugScan) {
175         debugOut << "Fix scan " << scanPtr.i << " " << *scanPtr.p << endl;
176         debugOut << "At pushUp pos=" << pos << " " << node << endl;
177       }
178 #endif
179       scanPos.m_pos++;
180     }
181     scanPtr.i = scanPtr.p->m_nodeScan;
182   } while (scanPtr.i != RNIL);
183 }
184 
185 /*
186  * Remove and return entry at position.  Move entries greater than the
187  * removed one to the left.  This is the opposite of nodePushUp.
188  *
189  *                               D
190  *            ^                  ^
191  *      A B C D E F _  =>  A B C E F _ _
192  *      0 1 2 3 4 5 6      0 1 2 3 4 5 6
193  *
194  * Scans at removed entry are returned if non-zero location is passed or
195  * else moved forward.
196  */
197 void
nodePopDown(NodeHandle & node,unsigned pos,TreeEnt & ent,Uint32 * scanList)198 Dbtux::nodePopDown(NodeHandle& node, unsigned pos, TreeEnt& ent, Uint32* scanList)
199 {
200   Frag& frag = node.m_frag;
201   TreeHead& tree = frag.m_tree;
202   const unsigned occup = node.getOccup();
203   ndbrequire(occup <= tree.m_maxOccup && pos < occup);
204   if (node.getNodeScan() != RNIL) {
205     // remove or move scans at this position
206     if (scanList == 0)
207       moveScanList(node, pos);
208     else
209       removeScanList(node, pos, *scanList);
210     // fix other scans
211     if (node.getNodeScan() != RNIL)
212       nodePopDownScans(node, pos);
213   }
214   // fix node
215   TreeEnt* const entList = tree.getEntList(node.m_node);
216   entList[occup] = entList[0];
217   TreeEnt* const tmpList = entList + 1;
218   ent = tmpList[pos];
219   for (unsigned i = pos; i < occup - 1; i++) {
220     jam();
221     tmpList[i] = tmpList[i + 1];
222   }
223   entList[0] = entList[occup - 1];
224   node.setOccup(occup - 1);
225   // fix prefix
226   if (occup != 1 && pos == 0)
227     setNodePref(node);
228 }
229 
230 void
nodePopDownScans(NodeHandle & node,unsigned pos)231 Dbtux::nodePopDownScans(NodeHandle& node, unsigned pos)
232 {
233   const unsigned occup = node.getOccup();
234   ScanOpPtr scanPtr;
235   scanPtr.i = node.getNodeScan();
236   do {
237     jam();
238     c_scanOpPool.getPtr(scanPtr);
239     TreePos& scanPos = scanPtr.p->m_scanPos;
240     ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup);
241     // handled before
242     ndbrequire(scanPos.m_pos != pos);
243     if (scanPos.m_pos > pos) {
244       jam();
245 #ifdef VM_TRACE
246       if (debugFlags & DebugScan) {
247         debugOut << "Fix scan " << scanPtr.i << " " << *scanPtr.p << endl;
248         debugOut << "At popDown pos=" << pos << " " << node << endl;
249       }
250 #endif
251       scanPos.m_pos--;
252     }
253     scanPtr.i = scanPtr.p->m_nodeScan;
254   } while (scanPtr.i != RNIL);
255 }
256 
257 /*
258  * Add entry at existing position.  Move entries less than or equal to
259  * the old one to the left.  Remove and return old min entry.
260  *
261  *            X            A
262  *      ^     v            ^
263  *      A B C D E _ _  =>  B C D X E _ _
264  *      0 1 2 3 4 5 6      0 1 2 3 4 5 6
265  *
266  * Return list of scans at the removed position 0.
267  */
268 void
nodePushDown(NodeHandle & node,unsigned pos,TreeEnt & ent,Uint32 & scanList)269 Dbtux::nodePushDown(NodeHandle& node, unsigned pos, TreeEnt& ent, Uint32& scanList)
270 {
271   Frag& frag = node.m_frag;
272   TreeHead& tree = frag.m_tree;
273   const unsigned occup = node.getOccup();
274   ndbrequire(occup <= tree.m_maxOccup && pos < occup);
275   if (node.getNodeScan() != RNIL) {
276     // remove scans at 0
277     removeScanList(node, 0, scanList);
278     // fix other scans
279     if (node.getNodeScan() != RNIL)
280       nodePushDownScans(node, pos);
281   }
282   // fix node
283   TreeEnt* const entList = tree.getEntList(node.m_node);
284   entList[occup] = entList[0];
285   TreeEnt* const tmpList = entList + 1;
286   TreeEnt oldMin = tmpList[0];
287   for (unsigned i = 0; i < pos; i++) {
288     jam();
289     tmpList[i] = tmpList[i + 1];
290   }
291   tmpList[pos] = ent;
292   ent = oldMin;
293   entList[0] = entList[occup];
294   // fix prefix
295   if (true)
296     setNodePref(node);
297 }
298 
299 void
nodePushDownScans(NodeHandle & node,unsigned pos)300 Dbtux::nodePushDownScans(NodeHandle& node, unsigned pos)
301 {
302   const unsigned occup = node.getOccup();
303   ScanOpPtr scanPtr;
304   scanPtr.i = node.getNodeScan();
305   do {
306     jam();
307     c_scanOpPool.getPtr(scanPtr);
308     TreePos& scanPos = scanPtr.p->m_scanPos;
309     ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup);
310     // handled before
311     ndbrequire(scanPos.m_pos != 0);
312     if (scanPos.m_pos <= pos) {
313       jam();
314 #ifdef VM_TRACE
315       if (debugFlags & DebugScan) {
316         debugOut << "Fix scan " << scanPtr.i << " " << *scanPtr.p << endl;
317         debugOut << "At pushDown pos=" << pos << " " << node << endl;
318       }
319 #endif
320       scanPos.m_pos--;
321     }
322     scanPtr.i = scanPtr.p->m_nodeScan;
323   } while (scanPtr.i != RNIL);
324 }
325 
326 /*
327  * Remove and return entry at position.  Move entries less than the
328  * removed one to the right.  Replace min entry by the input entry.
329  * This is the opposite of nodePushDown.
330  *
331  *      X                        D
332  *      v     ^                  ^
333  *      A B C D E _ _  =>  X A B C E _ _
334  *      0 1 2 3 4 5 6      0 1 2 3 4 5 6
335  *
336  * Move scans at removed entry and add scans at the new entry.
337  */
338 void
nodePopUp(NodeHandle & node,unsigned pos,TreeEnt & ent,Uint32 scanList)339 Dbtux::nodePopUp(NodeHandle& node, unsigned pos, TreeEnt& ent, Uint32 scanList)
340 {
341   Frag& frag = node.m_frag;
342   TreeHead& tree = frag.m_tree;
343   const unsigned occup = node.getOccup();
344   ndbrequire(occup <= tree.m_maxOccup && pos < occup);
345   if (node.getNodeScan() != RNIL) {
346     // move scans whose entry disappears
347     moveScanList(node, pos);
348     // fix other scans
349     if (node.getNodeScan() != RNIL)
350       nodePopUpScans(node, pos);
351   }
352   // fix node
353   TreeEnt* const entList = tree.getEntList(node.m_node);
354   entList[occup] = entList[0];
355   TreeEnt* const tmpList = entList + 1;
356   TreeEnt newMin = ent;
357   ent = tmpList[pos];
358   for (unsigned i = pos; i > 0; i--) {
359     jam();
360     tmpList[i] = tmpList[i - 1];
361   }
362   tmpList[0] = newMin;
363   entList[0] = entList[occup];
364   // add scans
365   if (scanList != RNIL)
366     addScanList(node, 0, scanList);
367   // fix prefix
368   if (true)
369     setNodePref(node);
370 }
371 
372 void
nodePopUpScans(NodeHandle & node,unsigned pos)373 Dbtux::nodePopUpScans(NodeHandle& node, unsigned pos)
374 {
375   const unsigned occup = node.getOccup();
376   ScanOpPtr scanPtr;
377   scanPtr.i = node.getNodeScan();
378   do {
379     jam();
380     c_scanOpPool.getPtr(scanPtr);
381     TreePos& scanPos = scanPtr.p->m_scanPos;
382     ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup);
383     ndbrequire(scanPos.m_pos != pos);
384     if (scanPos.m_pos < pos) {
385       jam();
386 #ifdef VM_TRACE
387       if (debugFlags & DebugScan) {
388         debugOut << "Fix scan " << scanPtr.i << " " << *scanPtr.p << endl;
389         debugOut << "At popUp pos=" << pos << " " << node << endl;
390       }
391 #endif
392       scanPos.m_pos++;
393     }
394     scanPtr.i = scanPtr.p->m_nodeScan;
395   } while (scanPtr.i != RNIL);
396 }
397 
398 /*
399  * Move number of entries from another node to this node before the min
400  * (i=0) or after the max (i=1).  Expensive but not often used.
401  */
402 void
nodeSlide(NodeHandle & dstNode,NodeHandle & srcNode,unsigned cnt,unsigned i)403 Dbtux::nodeSlide(NodeHandle& dstNode, NodeHandle& srcNode, unsigned cnt, unsigned i)
404 {
405   ndbrequire(i <= 1);
406   while (cnt != 0) {
407     TreeEnt ent;
408     Uint32 scanList = RNIL;
409     nodePopDown(srcNode, i == 0 ? srcNode.getOccup() - 1 : 0, ent, &scanList);
410     nodePushUp(dstNode, i == 0 ? 0 : dstNode.getOccup(), ent, scanList);
411     cnt--;
412   }
413 }
414 
415 // scans linked to node
416 
417 
418 /*
419  * Add list of scans to node at given position.
420  */
421 void
addScanList(NodeHandle & node,unsigned pos,Uint32 scanList)422 Dbtux::addScanList(NodeHandle& node, unsigned pos, Uint32 scanList)
423 {
424   ScanOpPtr scanPtr;
425   scanPtr.i = scanList;
426   do {
427     jam();
428     c_scanOpPool.getPtr(scanPtr);
429 #ifdef VM_TRACE
430       if (debugFlags & DebugScan) {
431         debugOut << "Add scan " << scanPtr.i << " " << *scanPtr.p << endl;
432         debugOut << "To pos=" << pos << " " << node << endl;
433       }
434 #endif
435     const Uint32 nextPtrI = scanPtr.p->m_nodeScan;
436     scanPtr.p->m_nodeScan = RNIL;
437     linkScan(node, scanPtr);
438     TreePos& scanPos = scanPtr.p->m_scanPos;
439     // set position but leave direction alone
440     scanPos.m_loc = node.m_loc;
441     scanPos.m_pos = pos;
442     scanPtr.i = nextPtrI;
443   } while (scanPtr.i != RNIL);
444 }
445 
446 /*
447  * Remove list of scans from node at given position.  The return
448  * location must point to existing list (in fact RNIL always).
449  */
450 void
removeScanList(NodeHandle & node,unsigned pos,Uint32 & scanList)451 Dbtux::removeScanList(NodeHandle& node, unsigned pos, Uint32& scanList)
452 {
453   ScanOpPtr scanPtr;
454   scanPtr.i = node.getNodeScan();
455   do {
456     jam();
457     c_scanOpPool.getPtr(scanPtr);
458     const Uint32 nextPtrI = scanPtr.p->m_nodeScan;
459     TreePos& scanPos = scanPtr.p->m_scanPos;
460     ndbrequire(scanPos.m_loc == node.m_loc);
461     if (scanPos.m_pos == pos) {
462       jam();
463 #ifdef VM_TRACE
464       if (debugFlags & DebugScan) {
465         debugOut << "Remove scan " << scanPtr.i << " " << *scanPtr.p << endl;
466         debugOut << "Fron pos=" << pos << " " << node << endl;
467       }
468 #endif
469       unlinkScan(node, scanPtr);
470       scanPtr.p->m_nodeScan = scanList;
471       scanList = scanPtr.i;
472       // unset position but leave direction alone
473       scanPos.m_loc = NullTupLoc;
474       scanPos.m_pos = ZNIL;
475     }
476     scanPtr.i = nextPtrI;
477   } while (scanPtr.i != RNIL);
478 }
479 
480 /*
481  * Move list of scans away from entry about to be removed.  Uses scan
482  * method scanNext().
483  */
484 void
moveScanList(NodeHandle & node,unsigned pos)485 Dbtux::moveScanList(NodeHandle& node, unsigned pos)
486 {
487   ScanOpPtr scanPtr;
488   scanPtr.i = node.getNodeScan();
489   do {
490     jam();
491     c_scanOpPool.getPtr(scanPtr);
492     TreePos& scanPos = scanPtr.p->m_scanPos;
493     const Uint32 nextPtrI = scanPtr.p->m_nodeScan;
494     ndbrequire(scanPos.m_loc == node.m_loc);
495     if (scanPos.m_pos == pos) {
496       jam();
497 #ifdef VM_TRACE
498       if (debugFlags & DebugScan) {
499         debugOut << "Move scan " << scanPtr.i << " " << *scanPtr.p << endl;
500         debugOut << "At pos=" << pos << " " << node << endl;
501       }
502 #endif
503       scanNext(scanPtr, true);
504       ndbrequire(! (scanPos.m_loc == node.m_loc && scanPos.m_pos == pos));
505     }
506     scanPtr.i = nextPtrI;
507   } while (scanPtr.i != RNIL);
508 }
509 
510 /*
511  * Link scan to the list under the node.  The list is single-linked and
512  * ordering does not matter.
513  */
514 void
linkScan(NodeHandle & node,ScanOpPtr scanPtr)515 Dbtux::linkScan(NodeHandle& node, ScanOpPtr scanPtr)
516 {
517 #ifdef VM_TRACE
518   if (debugFlags & DebugScan) {
519     debugOut << "Link scan " << scanPtr.i << " " << *scanPtr.p << endl;
520     debugOut << "To node " << node << endl;
521   }
522 #endif
523   ndbrequire(! islinkScan(node, scanPtr) && scanPtr.p->m_nodeScan == RNIL);
524   scanPtr.p->m_nodeScan = node.getNodeScan();
525   node.setNodeScan(scanPtr.i);
526 }
527 
528 /*
529  * Unlink a scan from the list under the node.
530  */
531 void
unlinkScan(NodeHandle & node,ScanOpPtr scanPtr)532 Dbtux::unlinkScan(NodeHandle& node, ScanOpPtr scanPtr)
533 {
534 #ifdef VM_TRACE
535   if (debugFlags & DebugScan) {
536     debugOut << "Unlink scan " << scanPtr.i << " " << *scanPtr.p << endl;
537     debugOut << "From node " << node << endl;
538   }
539 #endif
540   ScanOpPtr currPtr;
541   currPtr.i = node.getNodeScan();
542   ScanOpPtr prevPtr;
543   prevPtr.i = RNIL;
544   while (true) {
545     jam();
546     c_scanOpPool.getPtr(currPtr);
547     Uint32 nextPtrI = currPtr.p->m_nodeScan;
548     if (currPtr.i == scanPtr.i) {
549       jam();
550       if (prevPtr.i == RNIL) {
551         node.setNodeScan(nextPtrI);
552       } else {
553         jam();
554         prevPtr.p->m_nodeScan = nextPtrI;
555       }
556       scanPtr.p->m_nodeScan = RNIL;
557       // check for duplicates
558       ndbrequire(! islinkScan(node, scanPtr));
559       return;
560     }
561     prevPtr = currPtr;
562     currPtr.i = nextPtrI;
563   }
564 }
565 
566 /*
567  * Check if a scan is linked to this node.  Only for ndbrequire.
568  */
569 bool
islinkScan(NodeHandle & node,ScanOpPtr scanPtr)570 Dbtux::islinkScan(NodeHandle& node, ScanOpPtr scanPtr)
571 {
572   ScanOpPtr currPtr;
573   currPtr.i = node.getNodeScan();
574   while (currPtr.i != RNIL) {
575     jam();
576     c_scanOpPool.getPtr(currPtr);
577     if (currPtr.i == scanPtr.i) {
578       jam();
579       return true;
580     }
581     currPtr.i = currPtr.p->m_nodeScan;
582   }
583   return false;
584 }
585 
586 void
progError(int line,int cause,const char * file)587 Dbtux::NodeHandle::progError(int line, int cause, const char* file)
588 {
589   ErrorReporter::handleAssert("Dbtux::NodeHandle: assert failed", file, line);
590 }
591