1 /* Copyright (c) 2003-2008 MySQL AB
2    Use is subject to license terms
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation; version 2 of the License.
7 
8    This program is distributed in the hope that it will be useful,
9    but WITHOUT ANY WARRANTY; without even the implied warranty of
10    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11    GNU General Public License for more details.
12 
13    You should have received a copy of the GNU General Public License
14    along with this program; if not, write to the Free Software
15    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA */
16 
17 #define DBTUP_C
18 #define DBTUP_INDEX_CPP
19 #include <Dblqh.hpp>
20 #include "Dbtup.hpp"
21 #include <RefConvert.hpp>
22 #include <ndb_limits.h>
23 #include <pc.hpp>
24 #include <AttributeDescriptor.hpp>
25 #include "AttributeOffset.hpp"
26 #include <AttributeHeader.hpp>
27 #include <signaldata/TuxMaint.hpp>
28 
29 // methods used by ordered index
30 
31 void
tuxGetTupAddr(Uint32 fragPtrI,Uint32 pageId,Uint32 pageIndex,Uint32 & tupAddr)32 Dbtup::tuxGetTupAddr(Uint32 fragPtrI,
33                      Uint32 pageId,
34                      Uint32 pageIndex,
35                      Uint32& tupAddr)
36 {
37   jamEntry();
38   PagePtr pagePtr;
39   c_page_pool.getPtr(pagePtr, pageId);
40   Uint32 fragPageId= pagePtr.p->frag_page_id;
41   tupAddr= (fragPageId << MAX_TUPLES_BITS) | pageIndex;
42 }
43 
44 int
tuxAllocNode(Signal * signal,Uint32 fragPtrI,Uint32 & pageId,Uint32 & pageOffset,Uint32 * & node)45 Dbtup::tuxAllocNode(Signal* signal,
46                     Uint32 fragPtrI,
47                     Uint32& pageId,
48                     Uint32& pageOffset,
49                     Uint32*& node)
50 {
51   jamEntry();
52   FragrecordPtr fragPtr;
53   fragPtr.i= fragPtrI;
54   ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
55   TablerecPtr tablePtr;
56   tablePtr.i= fragPtr.p->fragTableId;
57   ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
58   terrorCode= 0;
59 
60   Local_key key;
61   Uint32* ptr, frag_page_id;
62   if ((ptr= alloc_fix_rec(fragPtr.p, tablePtr.p, &key, &frag_page_id)) == 0)
63   {
64     jam();
65     terrorCode = ZMEM_NOMEM_ERROR; // caller sets error
66     return terrorCode;
67   }
68   pageId= key.m_page_no;
69   pageOffset= key.m_page_idx;
70   Uint32 attrDescIndex= tablePtr.p->tabDescriptor + (0 << ZAD_LOG_SIZE);
71   Uint32 attrDataOffset= AttributeOffset::getOffset(
72                               tableDescriptor[attrDescIndex + 1].tabDescr);
73   node= ptr + attrDataOffset;
74   return 0;
75 }
76 
77 #if 0
78 void
79 Dbtup::tuxFreeNode(Signal* signal,
80                    Uint32 fragPtrI,
81                    Uint32 pageId,
82                    Uint32 pageOffset,
83                    Uint32* node)
84 {
85   jamEntry();
86   FragrecordPtr fragPtr;
87   fragPtr.i= fragPtrI;
88   ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
89   TablerecPtr tablePtr;
90   tablePtr.i= fragPtr.p->fragTableId;
91   ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
92   PagePtr pagePtr;
93   pagePtr.i= pageId;
94   ptrCheckGuard(pagePtr, cnoOfPage, cpage);
95   Uint32 attrDescIndex= tablePtr.p->tabDescriptor + (0 << ZAD_LOG_SIZE);
96   Uint32 attrDataOffset= AttributeOffset::getOffset(tableDescriptor[attrDescIndex + 1].tabDescr);
97   ndbrequire(node == &pagePtr.p->pageWord[pageOffset] + attrDataOffset);
98   freeTh(fragPtr.p, tablePtr.p, signal, pagePtr.p, pageOffset);
99 }
100 #endif
101 
102 void
tuxGetNode(Uint32 fragPtrI,Uint32 pageId,Uint32 pageOffset,Uint32 * & node)103 Dbtup::tuxGetNode(Uint32 fragPtrI,
104                   Uint32 pageId,
105                   Uint32 pageOffset,
106                   Uint32*& node)
107 {
108   jamEntry();
109   FragrecordPtr fragPtr;
110   fragPtr.i= fragPtrI;
111   ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
112   TablerecPtr tablePtr;
113   tablePtr.i= fragPtr.p->fragTableId;
114   ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
115   PagePtr pagePtr;
116   c_page_pool.getPtr(pagePtr, pageId);
117   Uint32 attrDescIndex= tablePtr.p->tabDescriptor + (0 << ZAD_LOG_SIZE);
118   Uint32 attrDataOffset= AttributeOffset::getOffset(
119                             tableDescriptor[attrDescIndex + 1].tabDescr);
120   node= ((Fix_page*)pagePtr.p)->
121     get_ptr(pageOffset, tablePtr.p->m_offsets[MM].m_fix_header_size) +
122     attrDataOffset;
123 }
124 int
tuxReadAttrs(Uint32 fragPtrI,Uint32 pageId,Uint32 pageIndex,Uint32 tupVersion,const Uint32 * attrIds,Uint32 numAttrs,Uint32 * dataOut)125 Dbtup::tuxReadAttrs(Uint32 fragPtrI,
126                     Uint32 pageId,
127                     Uint32 pageIndex,
128                     Uint32 tupVersion,
129                     const Uint32* attrIds,
130                     Uint32 numAttrs,
131                     Uint32* dataOut)
132 {
133   jamEntry();
134   // use own variables instead of globals
135   FragrecordPtr fragPtr;
136   fragPtr.i= fragPtrI;
137   ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
138   TablerecPtr tablePtr;
139   tablePtr.i= fragPtr.p->fragTableId;
140   ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
141 
142   // search for tuple version if not original
143 
144   Operationrec tmpOp;
145   KeyReqStruct req_struct;
146   tmpOp.m_tuple_location.m_page_no= pageId;
147   tmpOp.m_tuple_location.m_page_idx= pageIndex;
148 
149   setup_fixed_part(&req_struct, &tmpOp, tablePtr.p);
150   Tuple_header *tuple_ptr= req_struct.m_tuple_ptr;
151   if (tuple_ptr->get_tuple_version() != tupVersion)
152   {
153     jam();
154     OperationrecPtr opPtr;
155     opPtr.i= tuple_ptr->m_operation_ptr_i;
156     Uint32 loopGuard= 0;
157     while (opPtr.i != RNIL) {
158       c_operation_pool.getPtr(opPtr);
159       if (opPtr.p->tupVersion == tupVersion) {
160 	jam();
161 	if (!opPtr.p->m_copy_tuple_location.isNull()) {
162 	  req_struct.m_tuple_ptr= (Tuple_header*)
163 	    c_undo_buffer.get_ptr(&opPtr.p->m_copy_tuple_location);
164         }
165 	break;
166       }
167       jam();
168       opPtr.i= opPtr.p->prevActiveOp;
169       ndbrequire(++loopGuard < (1 << ZTUP_VERSION_BITS));
170     }
171   }
172   // read key attributes from found tuple version
173   // save globals
174   TablerecPtr tabptr_old= tabptr;
175   FragrecordPtr fragptr_old= fragptr;
176   OperationrecPtr operPtr_old= operPtr;
177   // new globals
178   tabptr= tablePtr;
179   fragptr= fragPtr;
180   operPtr.i= RNIL;
181   operPtr.p= NULL;
182   prepare_read(&req_struct, tablePtr.p, false);
183 
184   // do it
185   int ret = readAttributes(&req_struct,
186                            attrIds,
187                            numAttrs,
188                            dataOut,
189                            ZNIL,
190                            true);
191 
192   // restore globals
193   tabptr= tabptr_old;
194   fragptr= fragptr_old;
195   operPtr= operPtr_old;
196   // done
197   if (ret == -1) {
198     ret = terrorCode ? (-(int)terrorCode) : -1;
199   }
200   return ret;
201 }
202 int
tuxReadPk(Uint32 fragPtrI,Uint32 pageId,Uint32 pageIndex,Uint32 * dataOut,bool xfrmFlag)203 Dbtup::tuxReadPk(Uint32 fragPtrI, Uint32 pageId, Uint32 pageIndex, Uint32* dataOut, bool xfrmFlag)
204 {
205   jamEntry();
206   // use own variables instead of globals
207   FragrecordPtr fragPtr;
208   fragPtr.i= fragPtrI;
209   ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
210   TablerecPtr tablePtr;
211   tablePtr.i= fragPtr.p->fragTableId;
212   ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
213 
214   Operationrec tmpOp;
215   tmpOp.m_tuple_location.m_page_no= pageId;
216   tmpOp.m_tuple_location.m_page_idx= pageIndex;
217 
218   KeyReqStruct req_struct;
219 
220   PagePtr page_ptr;
221   Uint32* ptr= get_ptr(&page_ptr, &tmpOp.m_tuple_location, tablePtr.p);
222   req_struct.m_page_ptr = page_ptr;
223   req_struct.m_tuple_ptr = (Tuple_header*)ptr;
224 
225   int ret = 0;
226   if (! (req_struct.m_tuple_ptr->m_header_bits & Tuple_header::FREE))
227   {
228     req_struct.check_offset[MM]= tablePtr.p->get_check_offset(MM);
229     req_struct.check_offset[DD]= tablePtr.p->get_check_offset(DD);
230 
231     Uint32 num_attr= tablePtr.p->m_no_of_attributes;
232     Uint32 descr_start= tablePtr.p->tabDescriptor;
233     TableDescriptor *tab_descr= &tableDescriptor[descr_start];
234     ndbrequire(descr_start + (num_attr << ZAD_LOG_SIZE) <= cnoOfTabDescrRec);
235     req_struct.attr_descr= tab_descr;
236 
237     if(req_struct.m_tuple_ptr->m_header_bits & Tuple_header::ALLOC)
238     {
239       Uint32 opPtrI= req_struct.m_tuple_ptr->m_operation_ptr_i;
240       Operationrec* opPtrP= c_operation_pool.getPtr(opPtrI);
241       ndbassert(!opPtrP->m_copy_tuple_location.isNull());
242       req_struct.m_tuple_ptr= (Tuple_header*)
243 	c_undo_buffer.get_ptr(&opPtrP->m_copy_tuple_location);
244     }
245     prepare_read(&req_struct, tablePtr.p, false);
246 
247     const Uint32* attrIds= &tableDescriptor[tablePtr.p->readKeyArray].tabDescr;
248     const Uint32 numAttrs= tablePtr.p->noOfKeyAttr;
249     // read pk attributes from original tuple
250 
251     // save globals
252     TablerecPtr tabptr_old= tabptr;
253     FragrecordPtr fragptr_old= fragptr;
254     OperationrecPtr operPtr_old= operPtr;
255 
256     // new globals
257     tabptr= tablePtr;
258     fragptr= fragPtr;
259     operPtr.i= RNIL;
260     operPtr.p= NULL;
261 
262     // do it
263     ret = readAttributes(&req_struct,
264 			 attrIds,
265 			 numAttrs,
266 			 dataOut,
267 			 ZNIL,
268 			 xfrmFlag);
269     // restore globals
270     tabptr= tabptr_old;
271     fragptr= fragptr_old;
272     operPtr= operPtr_old;
273     // done
274     if (ret != -1) {
275       // remove headers
276       Uint32 n= 0;
277       Uint32 i= 0;
278       while (n < numAttrs) {
279 	const AttributeHeader ah(dataOut[i]);
280 	Uint32 size= ah.getDataSize();
281 	ndbrequire(size != 0);
282 	for (Uint32 j= 0; j < size; j++) {
283 	  dataOut[i + j - n]= dataOut[i + j + 1];
284 	}
285 	n+= 1;
286 	i+= 1 + size;
287       }
288       ndbrequire((int)i == ret);
289       ret -= numAttrs;
290     } else {
291       ret= terrorCode ? (-(int)terrorCode) : -1;
292     }
293   }
294   if (tablePtr.p->m_bits & Tablerec::TR_RowGCI)
295   {
296     dataOut[ret] = *req_struct.m_tuple_ptr->get_mm_gci(tablePtr.p);
297   }
298   else
299   {
300     dataOut[ret] = 0;
301   }
302   return ret;
303 }
304 
305 int
accReadPk(Uint32 tableId,Uint32 fragId,Uint32 fragPageId,Uint32 pageIndex,Uint32 * dataOut,bool xfrmFlag)306 Dbtup::accReadPk(Uint32 tableId, Uint32 fragId, Uint32 fragPageId, Uint32 pageIndex, Uint32* dataOut, bool xfrmFlag)
307 {
308   jamEntry();
309   // get table
310   TablerecPtr tablePtr;
311   tablePtr.i = tableId;
312   ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
313   // get fragment
314   FragrecordPtr fragPtr;
315   getFragmentrec(fragPtr, fragId, tablePtr.p);
316   // get real page id and tuple offset
317 
318   Uint32 pageId = getRealpid(fragPtr.p, fragPageId);
319   // use TUX routine - optimize later
320   int ret = tuxReadPk(fragPtr.i, pageId, pageIndex, dataOut, xfrmFlag);
321   return ret;
322 }
323 
324 /*
325  * TUX index contains all tuple versions.  A scan in TUX has scanned
326  * one of them and asks if it can be returned as scan result.  This
327  * depends on trans id, dirty read flag, and savepoint within trans.
328  *
329  * Previously this faked a ZREAD operation and used getPage().
330  * In TUP getPage() is run after ACC locking, but TUX comes here
331  * before ACC access.  Instead of modifying getPage() it is more
332  * clear to do the full check here.
333  */
334 bool
tuxQueryTh(Uint32 fragPtrI,Uint32 pageId,Uint32 pageIndex,Uint32 tupVersion,Uint32 transId1,Uint32 transId2,bool dirty,Uint32 savepointId)335 Dbtup::tuxQueryTh(Uint32 fragPtrI,
336                   Uint32 pageId,
337                   Uint32 pageIndex,
338                   Uint32 tupVersion,
339                   Uint32 transId1,
340                   Uint32 transId2,
341                   bool dirty,
342                   Uint32 savepointId)
343 {
344   jamEntry();
345   FragrecordPtr fragPtr;
346   fragPtr.i= fragPtrI;
347   ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
348   TablerecPtr tablePtr;
349   tablePtr.i= fragPtr.p->fragTableId;
350   ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
351   PagePtr pagePtr;
352   pagePtr.i = pageId;
353   c_page_pool.getPtr(pagePtr);
354 
355   KeyReqStruct req_struct;
356 
357   {
358     Operationrec tmpOp;
359     tmpOp.m_tuple_location.m_page_no = pageId;
360     tmpOp.m_tuple_location.m_page_idx = pageIndex;
361     setup_fixed_part(&req_struct, &tmpOp, tablePtr.p);
362   }
363 
364   Tuple_header* tuple_ptr = req_struct.m_tuple_ptr;
365 
366   OperationrecPtr currOpPtr;
367   currOpPtr.i = tuple_ptr->m_operation_ptr_i;
368   if (currOpPtr.i == RNIL) {
369     jam();
370     // tuple has no operation, any scan can see it
371     return true;
372   }
373   c_operation_pool.getPtr(currOpPtr);
374 
375   const bool sameTrans =
376     c_lqh->is_same_trans(currOpPtr.p->userpointer, transId1, transId2);
377 
378   bool res = false;
379   OperationrecPtr loopOpPtr = currOpPtr;
380 
381   if (!sameTrans) {
382     jam();
383     if (!dirty) {
384       jam();
385       if (currOpPtr.p->nextActiveOp == RNIL) {
386         jam();
387         // last op - TUX makes ACC lock request in same timeslice
388         res = true;
389       }
390     }
391     else {
392       // loop to first op (returns false)
393       find_savepoint(loopOpPtr, 0);
394       const Uint32 op_type = loopOpPtr.p->op_struct.op_type;
395 
396       if (op_type != ZINSERT) {
397         jam();
398         // read committed version
399         const Uint32 origVersion = tuple_ptr->get_tuple_version();
400         if (origVersion == tupVersion) {
401           jam();
402           res = true;
403         }
404       }
405     }
406   }
407   else {
408     jam();
409     // for own trans, ignore dirty flag
410 
411     if (find_savepoint(loopOpPtr, savepointId)) {
412       jam();
413       const Uint32 op_type = loopOpPtr.p->op_struct.op_type;
414 
415       if (op_type != ZDELETE) {
416         jam();
417         // check if this op has produced the scanned version
418         Uint32 loopVersion = loopOpPtr.p->tupVersion;
419         if (loopVersion == tupVersion) {
420           jam();
421           res = true;
422         }
423       }
424     }
425   }
426 
427   return res;
428 }
429 
430 // ordered index build
431 
432 //#define TIME_MEASUREMENT
433 #ifdef TIME_MEASUREMENT
434   static Uint32 time_events;
435   NDB_TICKS tot_time_passed;
436   Uint32 number_events;
437 #endif
438 void
execBUILDINDXREQ(Signal * signal)439 Dbtup::execBUILDINDXREQ(Signal* signal)
440 {
441   jamEntry();
442 #ifdef TIME_MEASUREMENT
443   time_events= 0;
444   tot_time_passed= 0;
445   number_events= 1;
446 #endif
447   // get new operation
448   BuildIndexPtr buildPtr;
449   if (! c_buildIndexList.seize(buildPtr)) {
450     jam();
451     BuildIndexRec buildRec;
452     memcpy(buildRec.m_request, signal->theData, sizeof(buildRec.m_request));
453     buildRec.m_errorCode= BuildIndxRef::Busy;
454     buildIndexReply(signal, &buildRec);
455     return;
456   }
457   memcpy(buildPtr.p->m_request,
458          signal->theData,
459          sizeof(buildPtr.p->m_request));
460   // check
461   buildPtr.p->m_errorCode= BuildIndxRef::NoError;
462   do {
463     const BuildIndxReq* buildReq= (const BuildIndxReq*)buildPtr.p->m_request;
464     if (buildReq->getTableId() >= cnoOfTablerec) {
465       jam();
466       buildPtr.p->m_errorCode= BuildIndxRef::InvalidPrimaryTable;
467       break;
468     }
469     TablerecPtr tablePtr;
470     tablePtr.i= buildReq->getTableId();
471     ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
472     if (tablePtr.p->tableStatus != DEFINED) {
473       jam();
474       buildPtr.p->m_errorCode= BuildIndxRef::InvalidPrimaryTable;
475       break;
476     }
477     // memory page format
478     buildPtr.p->m_build_vs =
479       tablePtr.p->m_attributes[MM].m_no_of_varsize > 0;
480     if (DictTabInfo::isOrderedIndex(buildReq->getIndexType())) {
481       jam();
482       const DLList<TupTriggerData>& triggerList =
483 	tablePtr.p->tuxCustomTriggers;
484 
485       TriggerPtr triggerPtr;
486       triggerList.first(triggerPtr);
487       while (triggerPtr.i != RNIL) {
488 	if (triggerPtr.p->indexId == buildReq->getIndexId()) {
489 	  jam();
490 	  break;
491 	}
492 	triggerList.next(triggerPtr);
493       }
494       if (triggerPtr.i == RNIL) {
495 	jam();
496 	// trigger was not created
497 	buildPtr.p->m_errorCode = BuildIndxRef::InternalError;
498 	break;
499       }
500       buildPtr.p->m_indexId = buildReq->getIndexId();
501       buildPtr.p->m_buildRef = DBTUX;
502     } else if(buildReq->getIndexId() == RNIL) {
503       jam();
504       // REBUILD of acc
505       buildPtr.p->m_indexId = RNIL;
506       buildPtr.p->m_buildRef = DBACC;
507     } else {
508       jam();
509       buildPtr.p->m_errorCode = BuildIndxRef::InvalidIndexType;
510       break;
511     }
512 
513     // set to first tuple position
514     const Uint32 firstTupleNo = 0;
515     buildPtr.p->m_fragNo= 0;
516     buildPtr.p->m_pageId= 0;
517     buildPtr.p->m_tupleNo= firstTupleNo;
518     // start build
519     buildIndex(signal, buildPtr.i);
520     return;
521   } while (0);
522   // check failed
523   buildIndexReply(signal, buildPtr.p);
524   c_buildIndexList.release(buildPtr);
525 }
526 
527 void
buildIndex(Signal * signal,Uint32 buildPtrI)528 Dbtup::buildIndex(Signal* signal, Uint32 buildPtrI)
529 {
530   // get build record
531   BuildIndexPtr buildPtr;
532   buildPtr.i= buildPtrI;
533   c_buildIndexList.getPtr(buildPtr);
534   const BuildIndxReq* buildReq= (const BuildIndxReq*)buildPtr.p->m_request;
535   // get table
536   TablerecPtr tablePtr;
537   tablePtr.i= buildReq->getTableId();
538   ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
539 
540   const Uint32 firstTupleNo = 0;
541   const Uint32 tupheadsize = tablePtr.p->m_offsets[MM].m_fix_header_size;
542 
543 #ifdef TIME_MEASUREMENT
544   MicroSecondTimer start;
545   MicroSecondTimer stop;
546   NDB_TICKS time_passed;
547 #endif
548   do {
549     // get fragment
550     FragrecordPtr fragPtr;
551     if (buildPtr.p->m_fragNo == MAX_FRAG_PER_NODE) {
552       jam();
553       // build ready
554       buildIndexReply(signal, buildPtr.p);
555       c_buildIndexList.release(buildPtr);
556       return;
557     }
558     ndbrequire(buildPtr.p->m_fragNo < MAX_FRAG_PER_NODE);
559     fragPtr.i= tablePtr.p->fragrec[buildPtr.p->m_fragNo];
560     if (fragPtr.i == RNIL) {
561       jam();
562       buildPtr.p->m_fragNo++;
563       buildPtr.p->m_pageId= 0;
564       buildPtr.p->m_tupleNo= firstTupleNo;
565       break;
566     }
567     ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
568     // get page
569     PagePtr pagePtr;
570     if (buildPtr.p->m_pageId >= fragPtr.p->noOfPages) {
571       jam();
572       buildPtr.p->m_fragNo++;
573       buildPtr.p->m_pageId= 0;
574       buildPtr.p->m_tupleNo= firstTupleNo;
575       break;
576     }
577     Uint32 realPageId= getRealpid(fragPtr.p, buildPtr.p->m_pageId);
578     c_page_pool.getPtr(pagePtr, realPageId);
579     Uint32 pageState= pagePtr.p->page_state;
580     // skip empty page
581     if (pageState == ZEMPTY_MM) {
582       jam();
583       buildPtr.p->m_pageId++;
584       buildPtr.p->m_tupleNo= firstTupleNo;
585       break;
586     }
587     // get tuple
588     Uint32 pageIndex = ~0;
589     const Tuple_header* tuple_ptr = 0;
590     pageIndex = buildPtr.p->m_tupleNo * tupheadsize;
591     if (pageIndex + tupheadsize > Fix_page::DATA_WORDS) {
592       jam();
593       buildPtr.p->m_pageId++;
594       buildPtr.p->m_tupleNo= firstTupleNo;
595       break;
596     }
597     tuple_ptr = (Tuple_header*)&pagePtr.p->m_data[pageIndex];
598     // skip over free tuple
599     if (tuple_ptr->m_header_bits & Tuple_header::FREE) {
600       jam();
601       buildPtr.p->m_tupleNo++;
602       break;
603     }
604     Uint32 tupVersion= tuple_ptr->get_tuple_version();
605     OperationrecPtr pageOperPtr;
606     pageOperPtr.i= tuple_ptr->m_operation_ptr_i;
607 #ifdef TIME_MEASUREMENT
608     NdbTick_getMicroTimer(&start);
609 #endif
610     // add to index
611     TuxMaintReq* const req = (TuxMaintReq*)signal->getDataPtrSend();
612     req->errorCode = RNIL;
613     req->tableId = tablePtr.i;
614     req->indexId = buildPtr.p->m_indexId;
615     req->fragId = tablePtr.p->fragid[buildPtr.p->m_fragNo];
616     req->pageId = realPageId;
617     req->tupVersion = tupVersion;
618     req->opInfo = TuxMaintReq::OpAdd;
619     req->tupFragPtrI = fragPtr.i;
620     req->fragPageId = buildPtr.p->m_pageId;
621     req->pageIndex = pageIndex;
622 
623     if (pageOperPtr.i == RNIL)
624     {
625       EXECUTE_DIRECT(buildPtr.p->m_buildRef, GSN_TUX_MAINT_REQ,
626 		     signal, TuxMaintReq::SignalLength+2);
627     }
628     else
629     {
630       /*
631       If there is an ongoing operation on the tuple then it is either a
632       copy tuple or an original tuple with an ongoing transaction. In
633       both cases realPageId and pageOffset refer to the original tuple.
634       The tuple address stored in TUX will always be the original tuple
635       but with the tuple version of the tuple we found.
636 
637       This is necessary to avoid having to update TUX at abort of
638       update. If an update aborts then the copy tuple is copied to
639       the original tuple. The build will however have found that
640       tuple as a copy tuple. The original tuple is stable and is thus
641       preferrable to store in TUX.
642       */
643       jam();
644 
645       /**
646        * Since copy tuples now can't be found on real pages.
647        *   we will here build all copies of the tuple
648        *
649        * Note only "real" tupVersion's should be added
650        *      i.e delete's shouldnt be added
651        *      (unless it's the first op, when "original" should be added)
652        */
653       do
654       {
655 	c_operation_pool.getPtr(pageOperPtr);
656 	if(pageOperPtr.p->op_struct.op_type != ZDELETE ||
657 	   pageOperPtr.p->is_first_operation())
658 	{
659 	  req->errorCode = RNIL;
660 	  req->tupVersion= pageOperPtr.p->tupVersion;
661 	  EXECUTE_DIRECT(buildPtr.p->m_buildRef, GSN_TUX_MAINT_REQ,
662 			 signal, TuxMaintReq::SignalLength+2);
663 	}
664 	else
665 	{
666 	  req->errorCode= 0;
667 	}
668 	pageOperPtr.i= pageOperPtr.p->prevActiveOp;
669       } while(req->errorCode == 0 && pageOperPtr.i != RNIL);
670     }
671 
672     jamEntry();
673     if (req->errorCode != 0) {
674       switch (req->errorCode) {
675       case TuxMaintReq::NoMemError:
676         jam();
677         buildPtr.p->m_errorCode= BuildIndxRef::AllocationFailure;
678         break;
679       default:
680         ndbrequire(false);
681         break;
682       }
683       buildIndexReply(signal, buildPtr.p);
684       c_buildIndexList.release(buildPtr);
685       return;
686     }
687 #ifdef TIME_MEASUREMENT
688     NdbTick_getMicroTimer(&stop);
689     time_passed= NdbTick_getMicrosPassed(start, stop);
690     if (time_passed < 1000) {
691       time_events++;
692       tot_time_passed += time_passed;
693       if (time_events == number_events) {
694         NDB_TICKS mean_time_passed= tot_time_passed /
695                                      (NDB_TICKS)number_events;
696         ndbout << "Number of events= " << number_events;
697         ndbout << " Mean time passed= " << mean_time_passed << endl;
698         number_events <<= 1;
699         tot_time_passed= (NDB_TICKS)0;
700         time_events= 0;
701       }
702     }
703 #endif
704     // next tuple
705     buildPtr.p->m_tupleNo++;
706     break;
707   } while (0);
708   signal->theData[0]= ZBUILD_INDEX;
709   signal->theData[1]= buildPtr.i;
710   sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
711 }
712 
713 void
buildIndexReply(Signal * signal,const BuildIndexRec * buildPtrP)714 Dbtup::buildIndexReply(Signal* signal, const BuildIndexRec* buildPtrP)
715 {
716   const BuildIndxReq* const buildReq=
717                     (const BuildIndxReq*)buildPtrP->m_request;
718   // conf is subset of ref
719   BuildIndxRef* rep= (BuildIndxRef*)signal->getDataPtr();
720   rep->setUserRef(buildReq->getUserRef());
721   rep->setConnectionPtr(buildReq->getConnectionPtr());
722   rep->setRequestType(buildReq->getRequestType());
723   rep->setTableId(buildReq->getTableId());
724   rep->setIndexType(buildReq->getIndexType());
725   rep->setIndexId(buildReq->getIndexId());
726   // conf
727   if (buildPtrP->m_errorCode == BuildIndxRef::NoError) {
728     jam();
729     sendSignal(rep->getUserRef(), GSN_BUILDINDXCONF,
730         signal, BuildIndxConf::SignalLength, JBB);
731     return;
732   }
733   // ref
734   rep->setErrorCode(buildPtrP->m_errorCode);
735   sendSignal(rep->getUserRef(), GSN_BUILDINDXREF,
736       signal, BuildIndxRef::SignalLength, JBB);
737 }
738