/* Copyright (c) 2003-2008 MySQL AB
   Use is subject to license terms

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; version 2 of the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA */

#define DBTUP_C
#define DBTUP_INDEX_CPP
#include <Dblqh.hpp>
#include "Dbtup.hpp"
#include <RefConvert.hpp>
#include <ndb_limits.h>
#include <pc.hpp>
#include <AttributeDescriptor.hpp>
#include "AttributeOffset.hpp"
#include <AttributeHeader.hpp>
#include <signaldata/TuxMaint.hpp>

// methods used by ordered index

void
Dbtup::tuxGetTupAddr(Uint32 fragPtrI,
                     Uint32 pageId,
                     Uint32 pageIndex,
                     Uint32& tupAddr)
{
  jamEntry();
  PagePtr pagePtr;
  c_page_pool.getPtr(pagePtr, pageId);
  Uint32 fragPageId= pagePtr.p->frag_page_id;
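  // The tuple address packs the fragment-local page id and the page index
  // into one word: the page id occupies the bits above MAX_TUPLES_BITS.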
  tupAddr= (fragPageId << MAX_TUPLES_BITS) | pageIndex;
}

int
Dbtup::tuxAllocNode(Signal* signal,
                    Uint32 fragPtrI,
                    Uint32& pageId,
                    Uint32& pageOffset,
                    Uint32*& node)
{
  jamEntry();
  FragrecordPtr fragPtr;
  fragPtr.i= fragPtrI;
  ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
  TablerecPtr tablePtr;
  tablePtr.i= fragPtr.p->fragTableId;
  ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
  terrorCode= 0;

  Local_key key;
  Uint32* ptr, frag_page_id;
  if ((ptr= alloc_fix_rec(fragPtr.p, tablePtr.p, &key, &frag_page_id)) == 0)
  {
    jam();
    terrorCode = ZMEM_NOMEM_ERROR; // caller sets error
    return terrorCode;
  }
  pageId= key.m_page_no;
  pageOffset= key.m_page_idx;
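  // The node data lives at the offset of attribute 0 within the fixed-size
  // tuple; fetch that offset from the table descriptor.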
  Uint32 attrDescIndex= tablePtr.p->tabDescriptor + (0 << ZAD_LOG_SIZE);
  Uint32 attrDataOffset= AttributeOffset::getOffset(
                           tableDescriptor[attrDescIndex + 1].tabDescr);
  node= ptr + attrDataOffset;
  return 0;
}

#if 0
void
Dbtup::tuxFreeNode(Signal* signal,
                   Uint32 fragPtrI,
                   Uint32 pageId,
                   Uint32 pageOffset,
                   Uint32* node)
{
  jamEntry();
  FragrecordPtr fragPtr;
  fragPtr.i= fragPtrI;
  ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
  TablerecPtr tablePtr;
  tablePtr.i= fragPtr.p->fragTableId;
  ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
  PagePtr pagePtr;
  pagePtr.i= pageId;
  ptrCheckGuard(pagePtr, cnoOfPage, cpage);
  Uint32 attrDescIndex= tablePtr.p->tabDescriptor + (0 << ZAD_LOG_SIZE);
  Uint32 attrDataOffset= AttributeOffset::getOffset(tableDescriptor[attrDescIndex + 1].tabDescr);
  ndbrequire(node == &pagePtr.p->pageWord[pageOffset] + attrDataOffset);
  freeTh(fragPtr.p, tablePtr.p, signal, pagePtr.p, pageOffset);
}
#endif

void
Dbtup::tuxGetNode(Uint32 fragPtrI,
                  Uint32 pageId,
                  Uint32 pageOffset,
                  Uint32*& node)
{
  jamEntry();
  FragrecordPtr fragPtr;
  fragPtr.i= fragPtrI;
  ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
  TablerecPtr tablePtr;
  tablePtr.i= fragPtr.p->fragTableId;
  ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
  PagePtr pagePtr;
  c_page_pool.getPtr(pagePtr, pageId);
  Uint32 attrDescIndex= tablePtr.p->tabDescriptor + (0 << ZAD_LOG_SIZE);
  Uint32 attrDataOffset= AttributeOffset::getOffset(
                           tableDescriptor[attrDescIndex + 1].tabDescr);
  node= ((Fix_page*)pagePtr.p)->
    get_ptr(pageOffset, tablePtr.p->m_offsets[MM].m_fix_header_size) +
    attrDataOffset;
}

int
Dbtup::tuxReadAttrs(Uint32 fragPtrI,
                    Uint32 pageId,
                    Uint32 pageIndex,
                    Uint32 tupVersion,
                    const Uint32* attrIds,
                    Uint32 numAttrs,
                    Uint32* dataOut)
{
  jamEntry();
  // use own variables instead of globals
  FragrecordPtr fragPtr;
  fragPtr.i= fragPtrI;
  ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
  TablerecPtr tablePtr;
  tablePtr.i= fragPtr.p->fragTableId;
  ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);

  // search for tuple version if not original

  Operationrec tmpOp;
  KeyReqStruct req_struct;
  tmpOp.m_tuple_location.m_page_no= pageId;
  tmpOp.m_tuple_location.m_page_idx= pageIndex;

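  // The temporary Operationrec only carries the tuple location;
  // setup_fixed_part() then points req_struct at the fixed part of the
  // stored tuple.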
  setup_fixed_part(&req_struct, &tmpOp, tablePtr.p);
  Tuple_header *tuple_ptr= req_struct.m_tuple_ptr;
  if (tuple_ptr->get_tuple_version() != tupVersion)
  {
    jam();
    OperationrecPtr opPtr;
    opPtr.i= tuple_ptr->m_operation_ptr_i;
    Uint32 loopGuard= 0;
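    // Walk the chain of active operations backwards (via prevActiveOp)
    // until the requested version is found; if that operation has a copy
    // tuple, read from the copy. The guard bounds the walk at the maximum
    // number of distinct tuple versions.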
    while (opPtr.i != RNIL) {
      c_operation_pool.getPtr(opPtr);
      if (opPtr.p->tupVersion == tupVersion) {
        jam();
        if (!opPtr.p->m_copy_tuple_location.isNull()) {
          req_struct.m_tuple_ptr= (Tuple_header*)
            c_undo_buffer.get_ptr(&opPtr.p->m_copy_tuple_location);
        }
        break;
      }
      jam();
      opPtr.i= opPtr.p->prevActiveOp;
      ndbrequire(++loopGuard < (1 << ZTUP_VERSION_BITS));
    }
  }
  // read key attributes from found tuple version
  // save globals
  TablerecPtr tabptr_old= tabptr;
  FragrecordPtr fragptr_old= fragptr;
  OperationrecPtr operPtr_old= operPtr;
  // new globals
  tabptr= tablePtr;
  fragptr= fragPtr;
  operPtr.i= RNIL;
  operPtr.p= NULL;
  prepare_read(&req_struct, tablePtr.p, false);

  // do it
  int ret = readAttributes(&req_struct,
                           attrIds,
                           numAttrs,
                           dataOut,
                           ZNIL,
                           true);

  // restore globals
  tabptr= tabptr_old;
  fragptr= fragptr_old;
  operPtr= operPtr_old;
  // done
  if (ret == -1) {
    ret = terrorCode ? (-(int)terrorCode) : -1;
  }
  return ret;
}

int
Dbtup::tuxReadPk(Uint32 fragPtrI, Uint32 pageId, Uint32 pageIndex, Uint32* dataOut, bool xfrmFlag)
{
  jamEntry();
  // use own variables instead of globals
  FragrecordPtr fragPtr;
  fragPtr.i= fragPtrI;
  ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
  TablerecPtr tablePtr;
  tablePtr.i= fragPtr.p->fragTableId;
  ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);

  Operationrec tmpOp;
  tmpOp.m_tuple_location.m_page_no= pageId;
  tmpOp.m_tuple_location.m_page_idx= pageIndex;

  KeyReqStruct req_struct;

  PagePtr page_ptr;
  Uint32* ptr= get_ptr(&page_ptr, &tmpOp.m_tuple_location, tablePtr.p);
  req_struct.m_page_ptr = page_ptr;
  req_struct.m_tuple_ptr = (Tuple_header*)ptr;

  int ret = 0;
  if (! (req_struct.m_tuple_ptr->m_header_bits & Tuple_header::FREE))
  {
    req_struct.check_offset[MM]= tablePtr.p->get_check_offset(MM);
    req_struct.check_offset[DD]= tablePtr.p->get_check_offset(DD);

    Uint32 num_attr= tablePtr.p->m_no_of_attributes;
    Uint32 descr_start= tablePtr.p->tabDescriptor;
    TableDescriptor *tab_descr= &tableDescriptor[descr_start];
    ndbrequire(descr_start + (num_attr << ZAD_LOG_SIZE) <= cnoOfTabDescrRec);
    req_struct.attr_descr= tab_descr;

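    // ALLOC indicates the tuple was created by an operation that has not
    // yet committed; its current data is in the copy tuple, so read the
    // PK from there.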
    if (req_struct.m_tuple_ptr->m_header_bits & Tuple_header::ALLOC)
    {
      Uint32 opPtrI= req_struct.m_tuple_ptr->m_operation_ptr_i;
      Operationrec* opPtrP= c_operation_pool.getPtr(opPtrI);
      ndbassert(!opPtrP->m_copy_tuple_location.isNull());
      req_struct.m_tuple_ptr= (Tuple_header*)
        c_undo_buffer.get_ptr(&opPtrP->m_copy_tuple_location);
    }
    prepare_read(&req_struct, tablePtr.p, false);

    const Uint32* attrIds= &tableDescriptor[tablePtr.p->readKeyArray].tabDescr;
    const Uint32 numAttrs= tablePtr.p->noOfKeyAttr;
    // read pk attributes from original tuple

    // save globals
    TablerecPtr tabptr_old= tabptr;
    FragrecordPtr fragptr_old= fragptr;
    OperationrecPtr operPtr_old= operPtr;

    // new globals
    tabptr= tablePtr;
    fragptr= fragPtr;
    operPtr.i= RNIL;
    operPtr.p= NULL;

    // do it
    ret = readAttributes(&req_struct,
                         attrIds,
                         numAttrs,
                         dataOut,
                         ZNIL,
                         xfrmFlag);
    // restore globals
    tabptr= tabptr_old;
    fragptr= fragptr_old;
    operPtr= operPtr_old;
    // done
    if (ret != -1) {
      // remove headers
      Uint32 n= 0;
      Uint32 i= 0;
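      // Compact the output in place: each attribute is returned as an
      // AttributeHeader word followed by its data words; shift the data
      // left over the headers so only raw PK data remains.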
      while (n < numAttrs) {
        const AttributeHeader ah(dataOut[i]);
        Uint32 size= ah.getDataSize();
        ndbrequire(size != 0);
        for (Uint32 j= 0; j < size; j++) {
          dataOut[i + j - n]= dataOut[i + j + 1];
        }
        n+= 1;
        i+= 1 + size;
      }
      ndbrequire((int)i == ret);
      ret -= numAttrs;
    } else {
      ret= terrorCode ? (-(int)terrorCode) : -1;
    }
  }
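  // Append the row GCI (global checkpoint id) after the PK data when the
  // table stores it; otherwise append 0.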
  if (tablePtr.p->m_bits & Tablerec::TR_RowGCI)
  {
    dataOut[ret] = *req_struct.m_tuple_ptr->get_mm_gci(tablePtr.p);
  }
  else
  {
    dataOut[ret] = 0;
  }
  return ret;
}

int
Dbtup::accReadPk(Uint32 tableId, Uint32 fragId, Uint32 fragPageId, Uint32 pageIndex, Uint32* dataOut, bool xfrmFlag)
{
  jamEntry();
  // get table
  TablerecPtr tablePtr;
  tablePtr.i = tableId;
  ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
  // get fragment
  FragrecordPtr fragPtr;
  getFragmentrec(fragPtr, fragId, tablePtr.p);
  // get real page id
  Uint32 pageId = getRealpid(fragPtr.p, fragPageId);
  // use TUX routine - optimize later
  int ret = tuxReadPk(fragPtr.i, pageId, pageIndex, dataOut, xfrmFlag);
  return ret;
}

/*
 * TUX index contains all tuple versions. A scan in TUX has scanned
 * one of them and asks if it can be returned as scan result. This
 * depends on trans id, dirty read flag, and savepoint within trans.
 *
 * Previously this faked a ZREAD operation and used getPage().
 * In TUP getPage() is run after ACC locking, but TUX comes here
 * before ACC access. Instead of modifying getPage() it is clearer
 * to do the full check here.
 */
bool
Dbtup::tuxQueryTh(Uint32 fragPtrI,
                  Uint32 pageId,
                  Uint32 pageIndex,
                  Uint32 tupVersion,
                  Uint32 transId1,
                  Uint32 transId2,
                  bool dirty,
                  Uint32 savepointId)
{
  jamEntry();
  FragrecordPtr fragPtr;
  fragPtr.i= fragPtrI;
  ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
  TablerecPtr tablePtr;
  tablePtr.i= fragPtr.p->fragTableId;
  ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
  PagePtr pagePtr;
  pagePtr.i = pageId;
  c_page_pool.getPtr(pagePtr);

  KeyReqStruct req_struct;

  {
    Operationrec tmpOp;
    tmpOp.m_tuple_location.m_page_no = pageId;
    tmpOp.m_tuple_location.m_page_idx = pageIndex;
    setup_fixed_part(&req_struct, &tmpOp, tablePtr.p);
  }

  Tuple_header* tuple_ptr = req_struct.m_tuple_ptr;

  OperationrecPtr currOpPtr;
  currOpPtr.i = tuple_ptr->m_operation_ptr_i;
  if (currOpPtr.i == RNIL) {
    jam();
    // tuple has no operation, any scan can see it
    return true;
  }
  c_operation_pool.getPtr(currOpPtr);

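  // Compare the scanning transaction's id with the transaction that owns
  // the latest operation on the tuple; the visibility rules below differ
  // for the owning transaction and for other transactions.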
  const bool sameTrans =
    c_lqh->is_same_trans(currOpPtr.p->userpointer, transId1, transId2);

  bool res = false;
  OperationrecPtr loopOpPtr = currOpPtr;

  if (!sameTrans) {
    jam();
    if (!dirty) {
      jam();
      if (currOpPtr.p->nextActiveOp == RNIL) {
        jam();
        // last op - TUX makes ACC lock request in same timeslice
        res = true;
      }
    }
    else {
      // loop to first op (returns false)
      find_savepoint(loopOpPtr, 0);
      const Uint32 op_type = loopOpPtr.p->op_struct.op_type;

      if (op_type != ZINSERT) {
        jam();
        // read committed version
        const Uint32 origVersion = tuple_ptr->get_tuple_version();
        if (origVersion == tupVersion) {
          jam();
          res = true;
        }
      }
    }
  }
  else {
    jam();
    // for own trans, ignore dirty flag

    if (find_savepoint(loopOpPtr, savepointId)) {
      jam();
      const Uint32 op_type = loopOpPtr.p->op_struct.op_type;

      if (op_type != ZDELETE) {
        jam();
        // check if this op has produced the scanned version
        Uint32 loopVersion = loopOpPtr.p->tupVersion;
        if (loopVersion == tupVersion) {
          jam();
          res = true;
        }
      }
    }
  }

  return res;
}

// ordered index build

//#define TIME_MEASUREMENT
#ifdef TIME_MEASUREMENT
static Uint32 time_events;
NDB_TICKS tot_time_passed;
Uint32 number_events;
#endif
void
Dbtup::execBUILDINDXREQ(Signal* signal)
{
  jamEntry();
#ifdef TIME_MEASUREMENT
  time_events= 0;
  tot_time_passed= 0;
  number_events= 1;
#endif
  // get new operation
  BuildIndexPtr buildPtr;
  if (! c_buildIndexList.seize(buildPtr)) {
    jam();
    BuildIndexRec buildRec;
    memcpy(buildRec.m_request, signal->theData, sizeof(buildRec.m_request));
    buildRec.m_errorCode= BuildIndxRef::Busy;
    buildIndexReply(signal, &buildRec);
    return;
  }
  memcpy(buildPtr.p->m_request,
         signal->theData,
         sizeof(buildPtr.p->m_request));
  // check
  buildPtr.p->m_errorCode= BuildIndxRef::NoError;
  do {
    const BuildIndxReq* buildReq= (const BuildIndxReq*)buildPtr.p->m_request;
    if (buildReq->getTableId() >= cnoOfTablerec) {
      jam();
      buildPtr.p->m_errorCode= BuildIndxRef::InvalidPrimaryTable;
      break;
    }
    TablerecPtr tablePtr;
    tablePtr.i= buildReq->getTableId();
    ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
    if (tablePtr.p->tableStatus != DEFINED) {
      jam();
      buildPtr.p->m_errorCode= BuildIndxRef::InvalidPrimaryTable;
      break;
    }
    // memory page format
    buildPtr.p->m_build_vs =
      tablePtr.p->m_attributes[MM].m_no_of_varsize > 0;
    if (DictTabInfo::isOrderedIndex(buildReq->getIndexType())) {
      jam();
      const DLList<TupTriggerData>& triggerList =
        tablePtr.p->tuxCustomTriggers;

      TriggerPtr triggerPtr;
      triggerList.first(triggerPtr);
      while (triggerPtr.i != RNIL) {
        if (triggerPtr.p->indexId == buildReq->getIndexId()) {
          jam();
          break;
        }
        triggerList.next(triggerPtr);
      }
      if (triggerPtr.i == RNIL) {
        jam();
        // trigger was not created
        buildPtr.p->m_errorCode = BuildIndxRef::InternalError;
        break;
      }
      buildPtr.p->m_indexId = buildReq->getIndexId();
      buildPtr.p->m_buildRef = DBTUX;
    } else if (buildReq->getIndexId() == RNIL) {
      jam();
      // REBUILD of acc
      buildPtr.p->m_indexId = RNIL;
      buildPtr.p->m_buildRef = DBACC;
    } else {
      jam();
      buildPtr.p->m_errorCode = BuildIndxRef::InvalidIndexType;
      break;
    }

    // set to first tuple position
    const Uint32 firstTupleNo = 0;
    buildPtr.p->m_fragNo= 0;
    buildPtr.p->m_pageId= 0;
    buildPtr.p->m_tupleNo= firstTupleNo;
    // start build
    buildIndex(signal, buildPtr.i);
    return;
  } while (0);
  // check failed
  buildIndexReply(signal, buildPtr.p);
  c_buildIndexList.release(buildPtr);
}

void
Dbtup::buildIndex(Signal* signal, Uint32 buildPtrI)
{
  // get build record
  BuildIndexPtr buildPtr;
  buildPtr.i= buildPtrI;
  c_buildIndexList.getPtr(buildPtr);
  const BuildIndxReq* buildReq= (const BuildIndxReq*)buildPtr.p->m_request;
  // get table
  TablerecPtr tablePtr;
  tablePtr.i= buildReq->getTableId();
  ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);

  const Uint32 firstTupleNo = 0;
  const Uint32 tupheadsize = tablePtr.p->m_offsets[MM].m_fix_header_size;

#ifdef TIME_MEASUREMENT
  MicroSecondTimer start;
  MicroSecondTimer stop;
  NDB_TICKS time_passed;
#endif
  do {
    // get fragment
    FragrecordPtr fragPtr;
    if (buildPtr.p->m_fragNo == MAX_FRAG_PER_NODE) {
      jam();
      // build ready
      buildIndexReply(signal, buildPtr.p);
      c_buildIndexList.release(buildPtr);
      return;
    }
    ndbrequire(buildPtr.p->m_fragNo < MAX_FRAG_PER_NODE);
    fragPtr.i= tablePtr.p->fragrec[buildPtr.p->m_fragNo];
    if (fragPtr.i == RNIL) {
      jam();
      buildPtr.p->m_fragNo++;
      buildPtr.p->m_pageId= 0;
      buildPtr.p->m_tupleNo= firstTupleNo;
      break;
    }
    ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
    // get page
    PagePtr pagePtr;
    if (buildPtr.p->m_pageId >= fragPtr.p->noOfPages) {
      jam();
      buildPtr.p->m_fragNo++;
      buildPtr.p->m_pageId= 0;
      buildPtr.p->m_tupleNo= firstTupleNo;
      break;
    }
    Uint32 realPageId= getRealpid(fragPtr.p, buildPtr.p->m_pageId);
    c_page_pool.getPtr(pagePtr, realPageId);
    Uint32 pageState= pagePtr.p->page_state;
    // skip empty page
    if (pageState == ZEMPTY_MM) {
      jam();
      buildPtr.p->m_pageId++;
      buildPtr.p->m_tupleNo= firstTupleNo;
      break;
    }
    // get tuple
    Uint32 pageIndex = ~0;
    const Tuple_header* tuple_ptr = 0;
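    // Tuples are fixed size here, so the page index is simply the tuple
    // number times the fixed header size (in words).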
    pageIndex = buildPtr.p->m_tupleNo * tupheadsize;
    if (pageIndex + tupheadsize > Fix_page::DATA_WORDS) {
      jam();
      buildPtr.p->m_pageId++;
      buildPtr.p->m_tupleNo= firstTupleNo;
      break;
    }
    tuple_ptr = (Tuple_header*)&pagePtr.p->m_data[pageIndex];
    // skip over free tuple
    if (tuple_ptr->m_header_bits & Tuple_header::FREE) {
      jam();
      buildPtr.p->m_tupleNo++;
      break;
    }
    Uint32 tupVersion= tuple_ptr->get_tuple_version();
    OperationrecPtr pageOperPtr;
    pageOperPtr.i= tuple_ptr->m_operation_ptr_i;
#ifdef TIME_MEASUREMENT
    NdbTick_getMicroTimer(&start);
#endif
    // add to index
    TuxMaintReq* const req = (TuxMaintReq*)signal->getDataPtrSend();
    req->errorCode = RNIL;
    req->tableId = tablePtr.i;
    req->indexId = buildPtr.p->m_indexId;
    req->fragId = tablePtr.p->fragid[buildPtr.p->m_fragNo];
    req->pageId = realPageId;
    req->tupVersion = tupVersion;
    req->opInfo = TuxMaintReq::OpAdd;
    req->tupFragPtrI = fragPtr.i;
    req->fragPageId = buildPtr.p->m_pageId;
    req->pageIndex = pageIndex;

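    // EXECUTE_DIRECT runs the receiving block's signal handler
    // synchronously in the same timeslice, so the TUX_MAINT_REQ result is
    // available in req->errorCode immediately after the call.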
    if (pageOperPtr.i == RNIL)
    {
      EXECUTE_DIRECT(buildPtr.p->m_buildRef, GSN_TUX_MAINT_REQ,
                     signal, TuxMaintReq::SignalLength+2);
    }
    else
    {
      /*
        If there is an ongoing operation on the tuple then it is either a
        copy tuple or an original tuple with an ongoing transaction. In
        both cases realPageId and pageOffset refer to the original tuple.
        The tuple address stored in TUX will always be the original tuple
        but with the tuple version of the tuple we found.

        This is necessary to avoid having to update TUX at abort of
        update. If an update aborts then the copy tuple is copied to
        the original tuple. The build will however have found that
        tuple as a copy tuple. The original tuple is stable and is thus
        preferable to store in TUX.
      */
      jam();

      /**
       * Since copy tuples can no longer be found on real pages,
       * we build all copies of the tuple here.
       *
       * Note that only "real" tupVersions should be added,
       * i.e. deletes should not be added
       * (unless it is the first op, in which case the "original"
       * should be added).
       */
      do
      {
        c_operation_pool.getPtr(pageOperPtr);
        if (pageOperPtr.p->op_struct.op_type != ZDELETE ||
            pageOperPtr.p->is_first_operation())
        {
          req->errorCode = RNIL;
          req->tupVersion= pageOperPtr.p->tupVersion;
          EXECUTE_DIRECT(buildPtr.p->m_buildRef, GSN_TUX_MAINT_REQ,
                         signal, TuxMaintReq::SignalLength+2);
        }
        else
        {
          req->errorCode= 0;
        }
        pageOperPtr.i= pageOperPtr.p->prevActiveOp;
      } while (req->errorCode == 0 && pageOperPtr.i != RNIL);
    }

    jamEntry();
    if (req->errorCode != 0) {
      switch (req->errorCode) {
      case TuxMaintReq::NoMemError:
        jam();
        buildPtr.p->m_errorCode= BuildIndxRef::AllocationFailure;
        break;
      default:
        ndbrequire(false);
        break;
      }
      buildIndexReply(signal, buildPtr.p);
      c_buildIndexList.release(buildPtr);
      return;
    }
#ifdef TIME_MEASUREMENT
    NdbTick_getMicroTimer(&stop);
    time_passed= NdbTick_getMicrosPassed(start, stop);
    if (time_passed < 1000) {
      time_events++;
      tot_time_passed += time_passed;
      if (time_events == number_events) {
        NDB_TICKS mean_time_passed= tot_time_passed /
          (NDB_TICKS)number_events;
        ndbout << "Number of events= " << number_events;
        ndbout << " Mean time passed= " << mean_time_passed << endl;
        number_events <<= 1;
        tot_time_passed= (NDB_TICKS)0;
        time_events= 0;
      }
    }
#endif
    // next tuple
    buildPtr.p->m_tupleNo++;
    break;
  } while (0);
  signal->theData[0]= ZBUILD_INDEX;
  signal->theData[1]= buildPtr.i;
  sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
}

void
Dbtup::buildIndexReply(Signal* signal, const BuildIndexRec* buildPtrP)
{
  const BuildIndxReq* const buildReq=
    (const BuildIndxReq*)buildPtrP->m_request;
  // conf is subset of ref
  BuildIndxRef* rep= (BuildIndxRef*)signal->getDataPtr();
  rep->setUserRef(buildReq->getUserRef());
  rep->setConnectionPtr(buildReq->getConnectionPtr());
  rep->setRequestType(buildReq->getRequestType());
  rep->setTableId(buildReq->getTableId());
  rep->setIndexType(buildReq->getIndexType());
  rep->setIndexId(buildReq->getIndexId());
  // conf
  if (buildPtrP->m_errorCode == BuildIndxRef::NoError) {
    jam();
    sendSignal(rep->getUserRef(), GSN_BUILDINDXCONF,
               signal, BuildIndxConf::SignalLength, JBB);
    return;
  }
  // ref
  rep->setErrorCode(buildPtrP->m_errorCode);
  sendSignal(rep->getUserRef(), GSN_BUILDINDXREF,
             signal, BuildIndxRef::SignalLength, JBB);
}