1 /*
2 Copyright (c) 2003, 2021, Oracle and/or its affiliates.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25
26 #define DBTUP_C
27 #include <dblqh/Dblqh.hpp>
28 #include "Dbtup.hpp"
29 #include <RefConvert.hpp>
30 #include <ndb_limits.h>
31 #include <pc.hpp>
32 #include <AttributeDescriptor.hpp>
33 #include "AttributeOffset.hpp"
34 #include <AttributeHeader.hpp>
35 #include <Interpreter.hpp>
36 #include <signaldata/TupKey.hpp>
37 #include <signaldata/AttrInfo.hpp>
38 #include <signaldata/TuxMaint.hpp>
39 #include <signaldata/ScanFrag.hpp>
40 #include <NdbSqlUtil.hpp>
41 #include <Checksum.hpp>
42 #include <portlib/ndb_prefetch.h>
43
44 #define JAM_FILE_ID 422
45
46
47 // #define TRACE_INTERPRETER
48
49 /* For debugging */
50 static void
dump_hex(const Uint32 * p,Uint32 len)51 dump_hex(const Uint32 *p, Uint32 len)
52 {
53 if(len > 2560)
54 len= 160;
55 if(len==0)
56 return;
57 for(;;)
58 {
59 if(len>=4)
60 ndbout_c("%8p %08X %08X %08X %08X", p, p[0], p[1], p[2], p[3]);
61 else if(len>=3)
62 ndbout_c("%8p %08X %08X %08X", p, p[0], p[1], p[2]);
63 else if(len>=2)
64 ndbout_c("%8p %08X %08X", p, p[0], p[1]);
65 else
66 ndbout_c("%8p %08X", p, p[0]);
67 if(len <= 4)
68 break;
69 len-= 4;
70 p+= 4;
71 }
72 }
73
74 /**
75 * getStoredProcAttrInfo
76 *
77 * Get the I-Val of the supplied stored procedure's
78 * AttrInfo section
79 * Initialise the AttrInfo length in the request
80 */
/**
 * Look up the stored procedure identified by storedId and hand back the
 * i-value of its AttrInfo section in attrInfoIVal, also initialising
 * req_struct->attrinfo_len from the section length.
 *
 * Returns ZOK on success.  On failure (unknown id, or a stored procedure
 * that is neither a scan nor a copy procedure) sets terrorCode to
 * ZSTORED_PROC_ID_ERROR and returns that error code.
 */
int Dbtup::getStoredProcAttrInfo(Uint32 storedId,
                                 KeyReqStruct* req_struct,
                                 Uint32& attrInfoIVal)
{
  jam();
  StoredProcPtr storedPtr;
  c_storedProcPool.getPtr(storedPtr, storedId);
  if (storedPtr.i != RNIL) {
    /* Only scan and copy procedures carry an AttrInfo section we can use */
    if ((storedPtr.p->storedCode == ZSCAN_PROCEDURE) ||
        (storedPtr.p->storedCode == ZCOPY_PROCEDURE)) {
      /* Setup OperationRec with stored procedure AttrInfo section */
      SegmentedSectionPtr sectionPtr;
      getSection(sectionPtr, storedPtr.p->storedProcIVal);
      Uint32 storedProcLen= sectionPtr.sz;

      /* Caller must not already have an AttrInfo section attached */
      ndbassert( attrInfoIVal == RNIL );
      attrInfoIVal= storedPtr.p->storedProcIVal;
      req_struct->attrinfo_len= storedProcLen;
      return ZOK;
    }
  }
  terrorCode= ZSTORED_PROC_ID_ERROR;
  return terrorCode;
}
105
/**
 * Copy the AttrInfo segmented section (attrInfoIVal) into the linear
 * buffer inBuffer, verifying that the section length matches what the
 * caller expects (expectedLen) and fits the linear buffer.
 * Also resets the operation's m_any_value to 0.
 *
 * expectedLen == 0 means there is no AttrInfo; then attrInfoIVal must
 * be RNIL and only the m_any_value reset is performed.
 */
void Dbtup::copyAttrinfo(Operationrec * regOperPtr,
                         Uint32* inBuffer,
                         Uint32 expectedLen,
                         Uint32 attrInfoIVal)
{
  /* A non-zero expected length implies a valid section, and vice versa */
  ndbassert( expectedLen > 0 || attrInfoIVal == RNIL );

  if (expectedLen > 0)
  {
    ndbassert( attrInfoIVal != RNIL );

    /* Check length in section is as we expect */
    SegmentedSectionPtr sectionPtr;
    getSection(sectionPtr, attrInfoIVal);

    ndbrequire(sectionPtr.sz == expectedLen);
    ndbrequire(sectionPtr.sz < ZATTR_BUFFER_SIZE);

    /* Copy attrInfo data into linear buffer */
    // TODO : Consider operating TUP out of first segment where
    // appropriate
    copy(inBuffer, attrInfoIVal);
  }

  regOperPtr->m_any_value= 0;

  return;
}
134
/**
 * Poison the row checksum while the tuple must not be read.
 * No-op for tables without the TR_Checksum bit set.
 */
void
Dbtup::setInvalidChecksum(Tuple_header *tuple_ptr,
                          const Tablerec * regTabPtr)
{
  if (regTabPtr->m_bits & Tablerec::TR_Checksum)
  {
    jam();
    /**
     * Set a magic checksum when tuple isn't supposed to be read.
     */
    tuple_ptr->m_checksum = 0x87654321;
  }
}
148
149 void
updateChecksum(Tuple_header * tuple_ptr,const Tablerec * regTabPtr,Uint32 old_header,Uint32 new_header)150 Dbtup::updateChecksum(Tuple_header *tuple_ptr,
151 const Tablerec *regTabPtr,
152 Uint32 old_header,
153 Uint32 new_header)
154 {
155 /**
156 * This function is used when only updating the header bits in row.
157 * We start by XOR:ing the old header, this negates the impact of the
158 * old header since old_header ^ old_header = 0. Next we XOR with new
159 * header to get the new checksum and finally we store the new checksum.
160 */
161 if (regTabPtr->m_bits & Tablerec::TR_Checksum)
162 {
163 Uint32 checksum = tuple_ptr->m_checksum;
164 jam();
165 checksum ^= old_header;
166 checksum ^= new_header;
167 tuple_ptr->m_checksum = checksum;
168 }
169 }
170
/**
 * Recompute and store the full row checksum.
 * The checksum word is zeroed first so that it does not contribute to
 * the value computed by calculateChecksum.
 * No-op for tables without the TR_Checksum bit set.
 */
void
Dbtup::setChecksum(Tuple_header* tuple_ptr,
                   const Tablerec* regTabPtr)
{
  if (regTabPtr->m_bits & Tablerec::TR_Checksum)
  {
    jam();
    tuple_ptr->m_checksum= 0;
    tuple_ptr->m_checksum= calculateChecksum(tuple_ptr, regTabPtr);
  }
}
182
/**
 * Compute the XOR checksum over the fixed-size part of a row.
 * The checksum covers everything from the header-bits word onwards,
 * i.e. the first word of the Tuple_header (used only on copy tuples)
 * is excluded.  The caller (setChecksum) zeroes the checksum word
 * before calling, so the stored checksum itself does not perturb the
 * result.
 */
Uint32
Dbtup::calculateChecksum(Tuple_header* tuple_ptr,
                         const Tablerec* regTabPtr)
{
  Uint32 checksum;
  Uint32 rec_size, *tuple_header;
  rec_size= regTabPtr->m_offsets[MM].m_fix_header_size;
  tuple_header= &tuple_ptr->m_header_bits;
  // includes tupVersion
  //printf("%p - ", tuple_ptr);

  /**
   * We include everything except the first word of the Tuple header
   * which is only used on copy tuples. We do however include
   * the header bits.
   */
  checksum = computeXorChecksum(
               tuple_header, (rec_size-Tuple_header::HeaderSize) + 1);

  //printf("-> %.8x\n", checksum);

#if 0
  if (var_sized) {
    /*
    if (! req_struct->fix_var_together) {
      jam();
      checksum ^= tuple_header[rec_size];
    }
    */
    jam();
    var_data_part= req_struct->var_data_start;
    vsize_words= calculate_total_var_size(req_struct->var_len_array,
                                          regTabPtr->no_var_attr);
    ndbassert(req_struct->var_data_end >= &var_data_part[vsize_words]);
    checksum = computeXorChecksum(var_data_part,vsize_words,checksum);
  }
#endif
  return checksum;
}
222
/**
 * Handle a detected tuple-checksum mismatch: log diagnostics and either
 * crash the node (default, unless error-insert 4036 is active) or fail
 * the current operation with ZTUPLE_CORRUPTED_ERROR.
 * Always returns -1 so callers can propagate the failure directly.
 */
int
Dbtup::corruptedTupleDetected(KeyReqStruct *req_struct, Tablerec *regTabPtr)
{
  Uint32 checksum = calculateChecksum(req_struct->m_tuple_ptr, regTabPtr);
  Uint32 header_bits = req_struct->m_tuple_ptr->m_header_bits;
  ndbout_c("Tuple corruption detected, checksum: 0x%x, header_bits: 0x%x"
           ", checksum word: 0x%x",
           checksum, header_bits, req_struct->m_tuple_ptr->m_checksum);
  /* ERROR_INSERT 4036 is used by tests to exercise the non-crash path */
  if (c_crashOnCorruptedTuple && !ERROR_INSERTED(4036))
  {
    ndbout_c(" Exiting.");
    ndbrequire(false);
  }
  (void)ERROR_INSERTED_CLEAR(4036);
  terrorCode= ZTUPLE_CORRUPTED_ERROR;
  tupkeyErrorLab(req_struct);
  return -1;
}
241
242 /* ----------------------------------------------------------------- */
243 /* ----------- INSERT_ACTIVE_OP_LIST -------------- */
244 /* ----------------------------------------------------------------- */
/**
 * Link a new operation first into the tuple's list of active operations
 * (the tuple header points at the newest operation).
 *
 * If a previous operation exists, the new operation inherits its
 * disk/log bookkeeping flags (m_wait_log_buffer,
 * m_load_diskpage_on_commit, m_gci_written), its undo buffer space and
 * its m_any_value, and the flags are cleared on the previous operation
 * so only the newest operation performs that work at commit.
 *
 * Returns true if the operation may proceed; returns false and sets
 * terrorCode when the operation combination is illegal (e.g. INSERT on
 * an existing row, anything after REFRESH, or the previous operation is
 * not in TUPLE_PREPARED state).
 */
bool
Dbtup::insertActiveOpList(OperationrecPtr regOperPtr,
                          KeyReqStruct* req_struct)
{
  OperationrecPtr prevOpPtr;
  ndbrequire(!regOperPtr.p->op_struct.bit_field.in_active_list);
  regOperPtr.p->op_struct.bit_field.in_active_list= true;
  req_struct->prevOpPtr.i=
    prevOpPtr.i= req_struct->m_tuple_ptr->m_operation_ptr_i;
  regOperPtr.p->prevActiveOp= prevOpPtr.i;
  regOperPtr.p->nextActiveOp= RNIL;
  regOperPtr.p->m_undo_buffer_space= 0;
  /* New operation becomes the head of the tuple's operation list */
  req_struct->m_tuple_ptr->m_operation_ptr_i= regOperPtr.i;
  if (prevOpPtr.i == RNIL) {
    /* First operation on this tuple — nothing to inherit or validate */
    return true;
  } else {
    jam();
    req_struct->prevOpPtr.p= prevOpPtr.p= c_operation_pool.getPtr(prevOpPtr.i);
    prevOpPtr.p->nextActiveOp= regOperPtr.i;

    /* Inherit commit-time bookkeeping from the previous operation ... */
    regOperPtr.p->op_struct.bit_field.m_wait_log_buffer=
      prevOpPtr.p->op_struct.bit_field.m_wait_log_buffer;
    regOperPtr.p->op_struct.bit_field.m_load_diskpage_on_commit=
      prevOpPtr.p->op_struct.bit_field.m_load_diskpage_on_commit;
    regOperPtr.p->op_struct.bit_field.m_gci_written=
      prevOpPtr.p->op_struct.bit_field.m_gci_written;
    regOperPtr.p->m_undo_buffer_space= prevOpPtr.p->m_undo_buffer_space;
    // start with prev mask (matters only for UPD o UPD)

    regOperPtr.p->m_any_value = prevOpPtr.p->m_any_value;

    /* ... and clear it on the previous op so only the newest acts on it */
    prevOpPtr.p->op_struct.bit_field.m_wait_log_buffer= 0;
    prevOpPtr.p->op_struct.bit_field.m_load_diskpage_on_commit= 0;

    if(prevOpPtr.p->tuple_state == TUPLE_PREPARED)
    {
      Uint32 op= regOperPtr.p->op_type;
      Uint32 prevOp= prevOpPtr.p->op_type;
      if (prevOp == ZDELETE)
      {
        if(op == ZINSERT)
        {
          // mark both
          prevOpPtr.p->op_struct.bit_field.delete_insert_flag= true;
          regOperPtr.p->op_struct.bit_field.delete_insert_flag= true;
          return true;
        }
        else if (op == ZREFRESH)
        {
          /* ZREFRESH after Delete - ok */
          return true;
        }
        else
        {
          /* Any other operation after DELETE sees a deleted tuple */
          terrorCode= ZTUPLE_DELETED_ERROR;
          return false;
        }
      }
      else if(op == ZINSERT && prevOp != ZDELETE)
      {
        /* INSERT is only legal on a (logically) deleted tuple */
        terrorCode= ZINSERT_ERROR;
        return false;
      }
      else if (prevOp == ZREFRESH)
      {
        /* No operation after a ZREFRESH */
        terrorCode= ZOP_AFTER_REFRESH_ERROR;
        return false;
      }
      return true;
    }
    else
    {
      /* Previous operation not prepared — transaction must abort */
      terrorCode= ZMUST_BE_ABORTED_ERROR;
      return false;
    }
  }
}
323
/**
 * Prepare req_struct for reading the correct version of a tuple.
 *
 * Applies the reorg-scan filter (REORG_NOT_MOVED / REORG_MOVED must
 * match the tuple's REORG_MOVE header bit), then selects which row
 * version is visible to this read:
 *  - no active operations: read the committed row as-is;
 *  - dirty read from another transaction: read the committed row;
 *  - same transaction / savepoint read: locate the operation covered by
 *    the savepoint and read its copy tuple.
 *
 * Returns true when the read may proceed (req_struct->m_tuple_ptr then
 * points at the row version to read, expanded via prepare_read when the
 * table needs it); returns false and sets terrorCode when the tuple is
 * not visible to this read.
 */
bool
Dbtup::setup_read(KeyReqStruct *req_struct,
                  Operationrec* regOperPtr,
                  Fragrecord* regFragPtr,
                  Tablerec* regTabPtr,
                  bool disk)
{
  OperationrecPtr currOpPtr;
  currOpPtr.i= req_struct->m_tuple_ptr->m_operation_ptr_i;
  Uint32 bits = req_struct->m_tuple_ptr->m_header_bits;

  if (unlikely(req_struct->m_reorg != ScanFragReq::REORG_ALL))
  {
    /* Reorg scans see only the matching half of the fragment */
    Uint32 moved = bits & Tuple_header::REORG_MOVE;
    if (! ((req_struct->m_reorg == ScanFragReq::REORG_NOT_MOVED && moved == 0) ||
           (req_struct->m_reorg == ScanFragReq::REORG_MOVED && moved != 0)))
    {
      terrorCode= ZTUPLE_DELETED_ERROR;
      return false;
    }
  }
  if (currOpPtr.i == RNIL)
  {
    /* No uncommitted operations — read the committed row directly */
    if (regTabPtr->need_expand(disk))
      prepare_read(req_struct, regTabPtr, disk);
    return true;
  }

  do {
    Uint32 savepointId= regOperPtr->savepointId;
    bool dirty= req_struct->dirty_op;

    c_operation_pool.getPtr(currOpPtr);
    bool sameTrans= c_lqh->is_same_trans(currOpPtr.p->userpointer,
                                         req_struct->trans_id1,
                                         req_struct->trans_id2);
    /**
     * Read committed in same trans reads latest copy
     */
    if(dirty && !sameTrans)
    {
      /* Dirty read from a foreign transaction: ignore savepoints */
      savepointId= 0;
    }
    else if(sameTrans)
    {
      // Use savepoint even in read committed mode
      dirty= false;
    }

    /* found == true indicates that savepoint is some state
     * within tuple's current transaction's uncommitted operations
     */
    bool found= find_savepoint(currOpPtr, savepointId);

    Uint32 currOp= currOpPtr.p->op_type;

    /* is_insert==true if tuple did not exist before its current
     * transaction
     */
    bool is_insert = (bits & Tuple_header::ALLOC);

    /* If savepoint is in transaction, and post-delete-op
     *   OR
     * Tuple didn't exist before
     *   AND
     *     Read is dirty
     *       OR
     *     Savepoint is before-transaction
     *
     * Tuple does not exist in read's view
     */
    if((found && currOp == ZDELETE) ||
       ((dirty || !found) && is_insert))
    {
      /* Tuple not visible to this read operation */
      terrorCode= ZTUPLE_DELETED_ERROR;
      break;
    }

    if(dirty || !found)
    {
      /* Read existing committed tuple */
    }
    else
    {
      /* Read the uncommitted version produced by the found operation */
      req_struct->m_tuple_ptr=
        get_copy_tuple(&currOpPtr.p->m_copy_tuple_location);
    }

    if (regTabPtr->need_expand(disk))
      prepare_read(req_struct, regTabPtr, disk);

#if 0
    ndbout_c("reading copy");
    Uint32 *var_ptr = fixed_ptr+regTabPtr->var_offset;
    req_struct->m_tuple_ptr= fixed_ptr;
    req_struct->fix_var_together= true;
    req_struct->var_len_array= (Uint16*)var_ptr;
    req_struct->var_data_start= var_ptr+regTabPtr->var_array_wsize;
    Uint32 var_sz32= init_var_pos_array((Uint16*)var_ptr,
                                        req_struct->var_pos_array,
                                        regTabPtr->no_var_attr);
    req_struct->var_data_end= var_ptr+regTabPtr->var_array_wsize + var_sz32;
#endif
    return true;
  } while(0);

  return false;
}
433
/**
 * Ensure the disk part of a tuple is available in the page cache before
 * a key operation executes.
 *
 * Returns the result of Page_cache_client::get_page when a disk page
 * request was issued, or 1 ('proceed immediately') when no disk access
 * is needed now.  On a pending request, disk_page_load_callback is
 * invoked when the page arrives.
 *
 * NOTE(review): fragPtrI is not referenced; the fragment and table come
 * from prepare_fragptr/prepare_tabptr set up by prepareTUPKEYREQ.
 */
int
Dbtup::load_diskpage(Signal* signal,
                     Uint32 opRec, Uint32 fragPtrI,
                     Uint32 lkey1, Uint32 lkey2, Uint32 flags)
{
  Ptr<Operationrec> operPtr;

  c_operation_pool.getPtr(operPtr, opRec);

  Operationrec * regOperPtr= operPtr.p;
  Fragrecord * regFragPtr= prepare_fragptr.p;
  Tablerec* regTabPtr = prepare_tabptr.p;

  /* An all-ones local key means no tuple has been allocated yet */
  if (Local_key::ref(lkey1, lkey2) == ~(Uint32)0)
  {
    jam();
    regOperPtr->op_struct.bit_field.m_wait_log_buffer= 1;
    regOperPtr->op_struct.bit_field.m_load_diskpage_on_commit= 1;
    if (unlikely((flags & 7) == ZREFRESH))
    {
      jam();
      /* Refresh of previously nonexistent DD tuple.
       * No diskpage to load at commit time
       */
      regOperPtr->op_struct.bit_field.m_wait_log_buffer= 0;
      regOperPtr->op_struct.bit_field.m_load_diskpage_on_commit= 0;
    }

    /* In either case return 1 for 'proceed' */
    return 1;
  }

  jam();
  Uint32 page_idx= lkey2;
  Uint32 frag_page_id= lkey1;
  regOperPtr->m_tuple_location.m_page_no= getRealpid(regFragPtr,
                                                     frag_page_id);
  regOperPtr->m_tuple_location.m_page_idx= page_idx;

  PagePtr page_ptr;
  Uint32* tmp= get_ptr(&page_ptr, &regOperPtr->m_tuple_location, regTabPtr);
  Tuple_header* ptr= (Tuple_header*)tmp;

  int res= 1;
  if(ptr->m_header_bits & Tuple_header::DISK_PART)
  {
    jam();
    /* Tuple has a disk part: request its page from PGMAN */
    Page_cache_client::Request req;
    memcpy(&req.m_page, ptr->get_disk_ref_ptr(regTabPtr), sizeof(Local_key));
    req.m_callback.m_callbackData= opRec;
    req.m_callback.m_callbackFunction=
      safe_cast(&Dbtup::disk_page_load_callback);

#ifdef ERROR_INSERT
    if (ERROR_INSERTED(4022))
    {
      flags |= Page_cache_client::DELAY_REQ;
      const NDB_TICKS now = NdbTick_getCurrentTicks();
      req.m_delay_until_time = NdbTick_AddMilliseconds(now,(Uint64)3000);
    }
    if (ERROR_INSERTED(4035) && (rand() % 13) == 0)
    {
      // Disk access have to randomly wait max 16ms for a diskpage
      Uint64 delay = (Uint64)(rand() % 16) + 1;
      flags |= Page_cache_client::DELAY_REQ;
      const NDB_TICKS now = NdbTick_getCurrentTicks();
      req.m_delay_until_time = NdbTick_AddMilliseconds(now,delay);
    }
#endif

    Page_cache_client pgman(this, c_pgman);
    res= pgman.get_page(signal, req, flags);
  }

  /* Writing operations must also wait for log buffer space at commit */
  switch(flags & 7)
  {
  case ZREAD:
  case ZREAD_EX:
    break;
  case ZDELETE:
  case ZUPDATE:
  case ZINSERT:
  case ZWRITE:
  case ZREFRESH:
    jam();
    regOperPtr->op_struct.bit_field.m_wait_log_buffer= 1;
    regOperPtr->op_struct.bit_field.m_load_diskpage_on_commit= 1;
  }
  return res;
}
524
/**
 * PGMAN callback for load_diskpage: forwards the now-available page_id
 * to LQH so the stalled key operation can continue.
 */
void
Dbtup::disk_page_load_callback(Signal* signal, Uint32 opRec, Uint32 page_id)
{
  Ptr<Operationrec> operPtr;
  c_operation_pool.getPtr(operPtr, opRec);
  c_lqh->acckeyconf_load_diskpage_callback(signal,
                                           operPtr.p->userpointer, page_id);
}
533
/**
 * Scan variant of load_diskpage: ensure the disk part of the scanned
 * tuple is in the page cache.  Unlike the key-operation path no
 * commit-time disk/log flags are set (m_load_diskpage_on_commit is
 * cleared).  Returns get_page's result, or 1 when the tuple has no
 * disk part.
 *
 * NOTE(review): fragPtrI is not referenced; the fragment and table come
 * from prepare_fragptr/prepare_tabptr.
 */
int
Dbtup::load_diskpage_scan(Signal* signal,
                          Uint32 opRec, Uint32 fragPtrI,
                          Uint32 lkey1, Uint32 lkey2, Uint32 flags)
{
  Ptr<Operationrec> operPtr;

  c_operation_pool.getPtr(operPtr, opRec);

  Operationrec * regOperPtr= operPtr.p;
  Fragrecord * regFragPtr= prepare_fragptr.p;
  Tablerec* regTabPtr = prepare_tabptr.p;

  jam();
  Uint32 page_idx= lkey2;
  Uint32 frag_page_id= lkey1;
  regOperPtr->m_tuple_location.m_page_no= getRealpid(regFragPtr,
                                                     frag_page_id);
  regOperPtr->m_tuple_location.m_page_idx= page_idx;
  regOperPtr->op_struct.bit_field.m_load_diskpage_on_commit= 0;

  PagePtr page_ptr;
  Uint32* tmp= get_ptr(&page_ptr, &regOperPtr->m_tuple_location, regTabPtr);
  Tuple_header* ptr= (Tuple_header*)tmp;

  int res= 1;
  if(ptr->m_header_bits & Tuple_header::DISK_PART)
  {
    jam();
    /* Tuple has a disk part: request its page from PGMAN */
    Page_cache_client::Request req;
    memcpy(&req.m_page, ptr->get_disk_ref_ptr(regTabPtr), sizeof(Local_key));
    req.m_callback.m_callbackData= opRec;
    req.m_callback.m_callbackFunction=
      safe_cast(&Dbtup::disk_page_load_scan_callback);

    Page_cache_client pgman(this, c_pgman);
    res= pgman.get_page(signal, req, flags);
  }
  return res;
}
574
/**
 * PGMAN callback for load_diskpage_scan: forwards the now-available
 * page_id to LQH so the stalled scan can continue.
 */
void
Dbtup::disk_page_load_scan_callback(Signal* signal,
                                    Uint32 opRec, Uint32 page_id)
{
  Ptr<Operationrec> operPtr;
  c_operation_pool.getPtr(operPtr, opRec);
  c_lqh->next_scanconf_load_diskpage_callback(signal,
                                              operPtr.p->userpointer, page_id);
}
584
585 /**
586 This method is used to prepare for faster execution of TUPKEYREQ.
587 It prepares the pointers to the fragment record, the table record,
588 the page for the record and the tuple pointer to the record. In
589 addition it also prefetches the cache lines of the fixed size part
590 of the tuple.
591
592 The calculations performed here have to be done when we arrive in
593 execTUPKEYREQ, we perform them here to enable prefetching the
594 cache lines of the fixed part of the tuple storage. In order to not
595 do the same work twice we store the calculated information in
596 block variables. Given that we can arrive in execTUPKEYREQ from
597 multiple directions, we have added debug-code that verifies that we
598 have passed through prepareTUPKEYREQ always before we reach
599 execTUPKEYREQ.
600
  Accessing the fixed size part of the tuple is an almost certain
  CPU cache miss, so performing it as early as possible will
  decrease the time lost to cache misses later in the process. Tests using
  Sysbench indicate that this prefetch gains about 5% in performance.
605 */
void Dbtup::prepareTUPKEYREQ(Uint32 page_id,
                             Uint32 page_idx,
                             Uint32 frag_id)
{
  FragrecordPtr fragptr;
  TablerecPtr tabptr;
  PagePtr pagePtr;
  Local_key key;

  fragptr.i = frag_id;
  const Uint32 RnoOfFragrec= cnoOfFragrec;
  const Uint32 RnoOfTablerec= cnoOfTablerec;
  Fragrecord * Rfragrecord = fragrecord;
  Tablerec * Rtablerec = tablerec;

  jamEntry();
  ndbrequire(fragptr.i < RnoOfFragrec);
  ptrAss(fragptr, Rfragrecord);
  tabptr.i = fragptr.p->fragTableId;
#ifdef VM_TRACE
  /* Remember the original key so execTUPKEYREQ can verify we prepared
   * for the same tuple (debug builds only). */
  prepare_orig_local_key.m_page_no = page_id;
  prepare_orig_local_key.m_page_idx = page_idx;
#endif
  /* Only a real, non-copy tuple key can be resolved to a page now */
  bool is_page_key = (!(Local_key::isInvalid(page_id, page_idx) ||
                        isCopyTuple(page_id, page_idx)));
  ptrCheckGuard(tabptr, RnoOfTablerec, Rtablerec);

  if (is_page_key)
  {
    register Uint32 fixed_part_size_in_words =
      tabptr.p->m_offsets[MM].m_fix_header_size;
    /* Translate fragment-local page id to real page id */
    page_id = getRealpid(fragptr.p, page_id);
    key.m_page_no = page_id;
    key.m_page_idx = page_idx;
    register Uint32 *tuple_ptr = get_ptr(&pagePtr,
                                         &key,
                                         tabptr.p);
    jam();
    /* Cache resolved pointers for execTUPKEYREQ */
    prepare_pageptr = pagePtr;
    prepare_tuple_ptr = tuple_ptr;
    prepare_page_no = page_id;
    /* Prefetch the fixed part, 16 words (64 bytes) per step —
     * assumes 64-byte cache lines; TODO confirm for target HW */
    for (Uint32 i = 0; i < fixed_part_size_in_words; i+= 16)
    {
      NDB_PREFETCH_WRITE(tuple_ptr + i);
    }
  }
  prepare_tabptr = tabptr;
  prepare_fragptr = fragptr;
}
655
execTUPKEYREQ(Signal * signal)656 bool Dbtup::execTUPKEYREQ(Signal* signal)
657 {
658 TupKeyReq * tupKeyReq= (TupKeyReq *)signal->getDataPtr();
659 Ptr<Operationrec> operPtr;
660 KeyReqStruct req_struct(this);
661
662 Uint32 RoperPtr= tupKeyReq->connectPtr;
663
664 jamEntry();
665
666 c_operation_pool.getPtr(operPtr, RoperPtr);
667
668 #ifdef VM_TRACE
669 {
670 bool error_found = false;
671 Local_key key;
672 key.m_page_no = tupKeyReq->keyRef1;
673 key.m_page_idx = tupKeyReq->keyRef2;
674 if (key.m_page_no != prepare_orig_local_key.m_page_no)
675 {
676 ndbout << "page_no = " << prepare_orig_local_key.m_page_no;
677 ndbout << " keyRef1 = " << key.m_page_no << endl;
678 error_found = true;
679 }
680 if (key.m_page_idx != prepare_orig_local_key.m_page_idx)
681 {
682 ndbout << "page_idx = " << prepare_orig_local_key.m_page_idx;
683 ndbout << " keyRef2 = " << key.m_page_idx << endl;
684 error_found = true;
685 }
686 if (prepare_fragptr.i != tupKeyReq->fragPtr)
687 {
688 ndbout << "fragptr.i = " << prepare_fragptr.i;
689 ndbout << " keyRef1 = " << tupKeyReq->fragPtr << endl;
690 error_found = true;
691 }
692 if (error_found)
693 {
694 ndbout << flush;
695 }
696 ndbassert(prepare_orig_local_key.m_page_no == key.m_page_no);
697 ndbassert(prepare_orig_local_key.m_page_idx == key.m_page_idx);
698 ndbassert(prepare_fragptr.i == tupKeyReq->fragPtr);
699 }
700 #endif
701
702 const Uint32 fragPtrI = prepare_fragptr.i;
703 /**
704 * DESIGN PATTERN DESCRIPTION
705 * --------------------------
706 * The variable operPtr.p is located on the block object, it is located
707 * there to ensure that we can easily access it in many methods such
708 * that we don't have to transport it through method calls. There are
709 * a number of references to structs that we store in this manner.
710 * Oftentimes they refer to the operation object, the table object,
711 * the fragment object and sometimes also a transaction object.
712 *
713 * Given that we both need access to the .i-value and the .p-value
714 * of all of those objects we store them on the block object to
715 * avoid the need of transporting them from function to function.
716 * This is an optimisation and obviously requires that one keeps
717 * track of which variables are alive and which are not.
718 * The function clear_global_variables used in debug mode ensures
719 * that all pointer variables are cleared before an asynchronous
720 * signal is executed.
721 *
722 * When we need to access data through the .p-value many times
723 * (more than one time), then it often pays off to declare a
724 * stack variable such as below regOperPtr. This helps the compiler
725 * to avoid having to constantly reload the .p-value from the
726 * block object after each store operation through a pointer.
727 *
728 * One has to take care though when doing this to ensure that
729 * one doesn't create a stack variable that creates too much
730 * pressure on the register allocation in the method. This is
731 * particularly important in large methods.
732 *
733 * The pattern is to define the variable as:
734 * Operationrec * const regOperPtr = operPtr.p;
735 * This helps the compiler to understand that we won't change the
736 * pointer here.
737 */
738 Operationrec * const regOperPtr= operPtr.p;
739
740 Dbtup::TransState trans_state = get_trans_state(regOperPtr);
741
742 req_struct.signal= signal;
743 req_struct.num_fired_triggers= 0;
744 req_struct.no_exec_instructions = 0;
745 req_struct.read_length= 0;
746 req_struct.last_row= false;
747 req_struct.changeMask.clear();
748 req_struct.m_is_lcp = false;
749 req_struct.operPtrP = regOperPtr;
750 regOperPtr->fragmentPtr= fragPtrI;
751
752 if (unlikely(trans_state != TRANS_IDLE))
753 {
754 TUPKEY_abort(&req_struct, 39);
755 return false;
756 }
757
758 /* ----------------------------------------------------------------- */
759 // Operation is ZREAD when we arrive here so no need to worry about the
760 // abort process.
761 /* ----------------------------------------------------------------- */
762 /* ----------- INITIATE THE OPERATION RECORD -------------- */
763 /* ----------------------------------------------------------------- */
764 {
765 register Operationrec::OpStruct op_struct;
766 op_struct.op_bit_fields = regOperPtr->op_struct.op_bit_fields;
767 const Uint32 TrequestInfo= tupKeyReq->request;
768 const Uint32 disable_fk_checks = tupKeyReq->disable_fk_checks;
769 const Uint32 primaryReplica = tupKeyReq->primaryReplica;
770
771 regOperPtr->m_copy_tuple_location.setNull();
772 op_struct.bit_field.delete_insert_flag = false;
773 op_struct.bit_field.tupVersion= ZNIL;
774 op_struct.bit_field.m_physical_only_op = 0;
775 op_struct.bit_field.m_gci_written = 0;
776 op_struct.bit_field.m_disable_fk_checks = disable_fk_checks;
777 op_struct.bit_field.primary_replica= primaryReplica;
778 op_struct.bit_field.m_reorg = TupKeyReq::getReorgFlag(TrequestInfo);
779 req_struct.m_prio_a_flag = TupKeyReq::getPrioAFlag(TrequestInfo);
780 req_struct.m_reorg = TupKeyReq::getReorgFlag(TrequestInfo);
781 regOperPtr->op_struct.op_bit_fields = op_struct.op_bit_fields;
782 regOperPtr->op_type= TupKeyReq::getOperation(TrequestInfo);
783 req_struct.m_disable_fk_checks = disable_fk_checks;
784 req_struct.m_use_rowid = TupKeyReq::getRowidFlag(TrequestInfo);
785 req_struct.interpreted_exec= TupKeyReq::getInterpretedFlag(TrequestInfo);
786 req_struct.dirty_op= TupKeyReq::getDirtyFlag(TrequestInfo);
787 }
788
789 {
790 /**
791 * DESIGN PATTERN DESCRIPTION
792 * --------------------------
793 * This code segment is using a common design pattern in the
794 * signal reception and signal sending code of performance
795 * critical functions such as execTUPKEYREQ.
796 * The idea is that at signal reception we need to transfer
797 * data from the signal object to state variables relating to
798 * the operation we are about to execute.
799 * The normal manner to do this would be to write:
800 * regOperPtr->savePointId = tupKeyReq->savePointId;
801 *
802 * This normal manner would however not work so well due to
803 * that the compiler has to issue assembler code that does
804 * a load operation immediately followed by a store operation.
805 * Many modern CPUs can hide parts of this deficiency in the
806 * code, but only to a certain extent.
807 *
808 * What we want to do here is instead to perform a series of
809 * six loads followed by six stores. The delay after a load
810 * is ready for a store operation is oftentimes 3 cycles. Many
811 * CPUs can handle two loads per cycle. So by using 6 loads
812 * we ensure that we execute at full speed as long as the data
813 * is available in the first level CPU cache.
814 *
815 * The reason we don't want to use more than 6 loads before
816 * we start storing is that CPUs have a limited amount of
817 * CPU registers. The x86 have 16 CPU registers available.
818 * Here is a short description of commonly used registers:
819 * RIP: Instruction pointer, not available
820 * RSP: Top of Stack pointer, not available for L/S
821 * RBP: Current Stack frame pointer, not available for L/S
822 * RDI: Usually this-pointer, reference to Dbtup object here
823 *
824 * In this particular example we also need to have a register
825 * for storing:
826 * tupKeyReq, req_struct, regOperPtr.
827 *
828 * The compiler also needs a few more registers to track some
829 * of the other live variables such that not all of the live
830 * variables have to be spilled to the stack.
831 *
832 * Thus the design pattern uses between 4 to 6 variables loaded
833 * before storing them. Another commonly used manner is to locate
834 * all initialisations to constants in one or more of those
835 * initialisation code blocks as well.
836 *
837 * The naming pattern is to define the temporary variable as
838 * const Uint32 name_of_variable_to_assign = x->name;
839 * y->name_of_variable_to_assign = name_of_variable_to_assign.
840 *
841 * In the case where the receiver of the data is a signal object
842 * we use the pattern:
843 * const Uint32 sig0 = x->name;
844 * signal->theData[0] = sig0;
845 *
846 * Finally if possible we should place this initialisation in a
847 * separate code block by surrounding it with brackets, this is
848 * to assist the compiler to understand that the variables used
849 * are not needed after storing its value. Most compilers will
850 * handle this well anyways, but it helps the compiler avoid
851 * doing mistakes and it also clarifies for the reader of the
852 * source code. As can be seen in code below this rule is
853 * however not followed if it will remove other possibilities.
854 */
855 const Uint32 savePointId= tupKeyReq->savePointId;
856 const Uint32 attrBufLen = tupKeyReq->attrBufLen;
857 const Uint32 opRef = tupKeyReq->opRef;
858 const Uint32 tcOpIndex = tupKeyReq->tcOpIndex;
859 const Uint32 coordinatorTC= tupKeyReq->coordinatorTC;
860 const Uint32 applRef = tupKeyReq->applRef;
861
862 regOperPtr->savepointId= savePointId;
863 req_struct.log_size= attrBufLen;
864 req_struct.attrinfo_len= attrBufLen;
865 req_struct.tc_operation_ptr= opRef;
866 req_struct.TC_index= tcOpIndex;
867 req_struct.TC_ref= coordinatorTC;
868 req_struct.rec_blockref= applRef;
869 }
870
871 const Uint32 disk_page= tupKeyReq->disk_page;
872 const Uint32 row_id_page_no = tupKeyReq->m_row_id_page_no;
873 const Uint32 row_id_page_idx = tupKeyReq->m_row_id_page_idx;
874 const Uint32 deferred_constraints = tupKeyReq->deferred_constraints;
875 const Uint32 keyRef1= tupKeyReq->keyRef1;
876 const Uint32 keyRef2 = tupKeyReq->keyRef2;
877
878 req_struct.m_disk_page_ptr.i= disk_page;
879 req_struct.m_row_id.m_page_no = row_id_page_no;
880 req_struct.m_row_id.m_page_idx = row_id_page_idx;
881 req_struct.m_deferred_constraints = deferred_constraints;
882 Uint32 pageid = req_struct.frag_page_id= keyRef1;
883 Uint32 pageidx = regOperPtr->m_tuple_location.m_page_idx= keyRef2;
884
885 const Uint32 transId1 = tupKeyReq->transId1;
886 const Uint32 transId2 = tupKeyReq->transId2;
887 Tablerec * const regTabPtr = prepare_tabptr.p;
888 Fragrecord * const regFragPtr = prepare_fragptr.p;
889
890 /* Get AttrInfo section if this is a long TUPKEYREQ */
891 Uint32 attrInfoIVal= tupKeyReq->attrInfoIVal;
892 const Uint32 Rstoredid= tupKeyReq->storedProcedure;
893
894 req_struct.trans_id1= transId1;
895 req_struct.trans_id2= transId2;
896 req_struct.tablePtrP = regTabPtr;
897 req_struct.fragPtrP = regFragPtr;
898
899 /* If we have AttrInfo, check we expected it, and
900 * that we don't have AttrInfo by another means
901 */
902 ndbassert( (attrInfoIVal == RNIL) ||
903 (tupKeyReq->attrBufLen > 0));
904
905 const Uint32 Roptype = regOperPtr->op_type;
906
907 if (Rstoredid != ZNIL) {
908 /* This is part of a scan, get attrInfoIVal for
909 * given stored procedure
910 */
911 ndbrequire(getStoredProcAttrInfo(Rstoredid,
912 &req_struct,
913 attrInfoIVal) == ZOK);
914 }
915
916 /* Copy AttrInfo from section into linear in-buffer */
917 copyAttrinfo(regOperPtr,
918 &cinBuffer[0],
919 req_struct.attrinfo_len,
920 attrInfoIVal);
921
922
923 const Uint32 loc_prepare_page_id = prepare_page_no;
924 if (Roptype == ZINSERT && Local_key::isInvalid(pageid, pageidx))
925 {
926 // No tuple allocated yet
927 goto do_insert;
928 }
929
930 if (Roptype == ZREFRESH && Local_key::isInvalid(pageid, pageidx))
931 {
932 // No tuple allocated yet
933 goto do_refresh;
934 }
935
936 if (unlikely(isCopyTuple(pageid, pageidx)))
937 {
938 /**
939 * Only LCP reads a copy-tuple "directly"
940 */
941 ndbassert(Roptype == ZREAD);
942 ndbassert(disk_page == RNIL);
943 setup_lcp_read_copy_tuple(&req_struct, regOperPtr, regFragPtr, regTabPtr);
944 goto do_read;
945 }
946
947 /**
948 * Get pointer to tuple
949 */
950 regOperPtr->m_tuple_location.m_page_no = loc_prepare_page_id;
951 setup_fixed_tuple_ref_opt(&req_struct);
952 setup_fixed_part(&req_struct, regOperPtr, regTabPtr);
953
954 /**
955 * Check operation
956 */
957 if (Roptype == ZREAD) {
958 jam();
959
960 if (setup_read(&req_struct, regOperPtr, regFragPtr, regTabPtr,
961 disk_page != RNIL))
962 {
963 do_read:
964 if(handleReadReq(signal, regOperPtr, regTabPtr, &req_struct) != -1)
965 {
966 req_struct.log_size= 0;
967 /* ---------------------------------------------------------------- */
968 // Read Operations need not to be taken out of any lists.
969 // We also do not need to wait for commit since there is no changes
970 // to commit. Thus we
971 // prepare the operation record already now for the next operation.
972 // Write operations set the state to STARTED indicating that they
973 // are waiting for the Commit or Abort decision.
974 /* ---------------------------------------------------------------- */
975 returnTUPKEYCONF(signal, &req_struct, regOperPtr, TRANS_IDLE);
976 return true;
977 }
978 return false;
979 }
980 tupkeyErrorLab(&req_struct);
981 return false;
982 }
983
984 if(insertActiveOpList(operPtr, &req_struct))
985 {
986 if(Roptype == ZINSERT)
987 {
988 jam();
989 do_insert:
990 Local_key accminupdate;
991 Local_key * accminupdateptr = &accminupdate;
992 if (unlikely(handleInsertReq(signal, operPtr,
993 prepare_fragptr, regTabPtr, &req_struct,
994 &accminupdateptr) == -1))
995 {
996 return false;
997 }
998
999 terrorCode = 0;
1000 checkImmediateTriggersAfterInsert(&req_struct,
1001 regOperPtr,
1002 regTabPtr,
1003 disk_page != RNIL);
1004
1005 if (unlikely(terrorCode != 0))
1006 {
1007 tupkeyErrorLab(&req_struct);
1008 return false;
1009 }
1010
1011 if (!regTabPtr->tuxCustomTriggers.isEmpty())
1012 {
1013 jam();
1014 if (unlikely(executeTuxInsertTriggers(signal,
1015 regOperPtr,
1016 regFragPtr,
1017 regTabPtr) != 0))
1018 {
1019 jam();
1020 /*
1021 * TUP insert succeeded but add of TUX entries failed. All
1022 * TUX changes have been rolled back at this point.
1023 *
1024 * We will abort via tupkeyErrorLab() as usual. This routine
1025 * however resets the operation to ZREAD. The TUP_ABORTREQ
1026 * arriving later cannot then undo the insert.
1027 *
1028 * Therefore we call TUP_ABORTREQ already now. Diskdata etc
1029 * should be in memory and timeslicing cannot occur. We must
1030 * skip TUX abort triggers since TUX is already aborted. We
1031 * will dealloc the fixed and var parts if necessary.
1032 */
1033 signal->theData[0] = operPtr.i;
1034 do_tup_abortreq(signal, ZSKIP_TUX_TRIGGERS | ZABORT_DEALLOC);
1035 tupkeyErrorLab(&req_struct);
1036 return false;
1037 }
1038 }
1039
1040 if (accminupdateptr)
1041 {
1042 /**
1043 * Update ACC local-key, once *everything* has completed succesfully
1044 */
1045 c_lqh->accminupdate(signal,
1046 regOperPtr->userpointer,
1047 accminupdateptr);
1048 }
1049
1050 returnTUPKEYCONF(signal, &req_struct, regOperPtr, TRANS_STARTED);
1051 return true;
1052 }
1053
1054 if (Roptype == ZUPDATE) {
1055 jam();
1056 if (unlikely(handleUpdateReq(signal, regOperPtr,
1057 regFragPtr, regTabPtr,
1058 &req_struct, disk_page != RNIL) == -1))
1059 {
1060 return false;
1061 }
1062
1063 terrorCode = 0;
1064 checkImmediateTriggersAfterUpdate(&req_struct,
1065 regOperPtr,
1066 regTabPtr,
1067 disk_page != RNIL);
1068
1069 if (unlikely(terrorCode != 0))
1070 {
1071 tupkeyErrorLab(&req_struct);
1072 return false;
1073 }
1074
1075 if (!regTabPtr->tuxCustomTriggers.isEmpty())
1076 {
1077 jam();
1078 if (unlikely(executeTuxUpdateTriggers(signal,
1079 regOperPtr,
1080 regFragPtr,
1081 regTabPtr) != 0))
1082 {
1083 jam();
1084 /*
1085 * See insert case.
1086 */
1087 signal->theData[0] = operPtr.i;
1088 do_tup_abortreq(signal, ZSKIP_TUX_TRIGGERS);
1089 tupkeyErrorLab(&req_struct);
1090 return false;
1091 }
1092 }
1093
1094 returnTUPKEYCONF(signal, &req_struct, regOperPtr, TRANS_STARTED);
1095 return true;
1096 }
1097 else if(Roptype == ZDELETE)
1098 {
1099 jam();
1100 req_struct.log_size= 0;
1101 if (unlikely(handleDeleteReq(signal, regOperPtr,
1102 regFragPtr, regTabPtr,
1103 &req_struct,
1104 disk_page != RNIL) == -1))
1105 {
1106 return false;
1107 }
1108
1109 terrorCode = 0;
1110 checkImmediateTriggersAfterDelete(&req_struct,
1111 regOperPtr,
1112 regTabPtr,
1113 disk_page != RNIL);
1114
1115 if (unlikely(terrorCode != 0))
1116 {
1117 tupkeyErrorLab(&req_struct);
1118 return false;
1119 }
1120
1121 /*
1122 * TUX doesn't need to check for triggers at delete since entries in
1123 * the index are kept until commit time.
1124 */
1125
1126 returnTUPKEYCONF(signal, &req_struct, regOperPtr, TRANS_STARTED);
1127 return true;
1128 }
1129 else if (Roptype == ZREFRESH)
1130 {
1131 /**
1132 * No TUX or immediate triggers, just detached triggers
1133 */
1134 do_refresh:
1135 if (unlikely(handleRefreshReq(signal, operPtr,
1136 prepare_fragptr, regTabPtr,
1137 &req_struct, disk_page != RNIL) == -1))
1138 {
1139 return false;
1140 }
1141
1142 returnTUPKEYCONF(signal, &req_struct, regOperPtr, TRANS_STARTED);
1143 return true;
1144
1145 }
1146 else
1147 {
1148 ndbrequire(false); // Invalid op type
1149 }
1150 }
1151
1152 tupkeyErrorLab(&req_struct);
1153 return false;
1154 }
1155
1156 void
setup_fixed_part(KeyReqStruct * req_struct,Operationrec * regOperPtr,Tablerec * regTabPtr)1157 Dbtup::setup_fixed_part(KeyReqStruct* req_struct,
1158 Operationrec* regOperPtr,
1159 Tablerec* regTabPtr)
1160 {
1161 ndbassert(regOperPtr->op_type == ZINSERT ||
1162 (! (req_struct->m_tuple_ptr->m_header_bits & Tuple_header::FREE)));
1163
1164 req_struct->check_offset[MM]= regTabPtr->get_check_offset(MM);
1165 req_struct->check_offset[DD]= regTabPtr->get_check_offset(DD);
1166
1167 Uint32 num_attr= regTabPtr->m_no_of_attributes;
1168 Uint32 descr_start= regTabPtr->tabDescriptor;
1169 TableDescriptor *tab_descr= &tableDescriptor[descr_start];
1170 ndbrequire(descr_start + (num_attr << ZAD_LOG_SIZE) <= cnoOfTabDescrRec);
1171 req_struct->attr_descr= tab_descr;
1172 }
1173
/**
 * Prepare req_struct so that an LCP read operates directly on a
 * copy tuple instead of the live row.
 *
 * The incoming (frag_page_id, page_idx) pair addresses the copy tuple
 * itself; this routine resolves it, restores the original rowid stored
 * in the raw copy-tuple header back into the request, and points
 * m_tuple_ptr at the copy tuple's data.
 *
 * NOTE(review): regFragPtr is not used in this function's body.
 */
void
Dbtup::setup_lcp_read_copy_tuple(KeyReqStruct* req_struct,
                                 Operationrec* regOperPtr,
                                 Fragrecord* regFragPtr,
                                 Tablerec* regTabPtr)
{
  Local_key tmp;
  tmp.m_page_no = req_struct->frag_page_id;
  tmp.m_page_idx = regOperPtr->m_tuple_location.m_page_idx;
  /* Remove the copy-tuple marker bits before resolving the raw pointer. */
  clearCopyTuple(tmp.m_page_no, tmp.m_page_idx);

  Uint32 * copytuple = get_copy_tuple_raw(&tmp);
  /* The first words of the raw copy tuple hold the original rowid. */
  Local_key rowid;
  memcpy(&rowid, copytuple+0, sizeof(Local_key));

  /* Restore the original row identity into the request/operation. */
  req_struct->frag_page_id = rowid.m_page_no;
  regOperPtr->m_tuple_location.m_page_idx = rowid.m_page_idx;

  Tuple_header * th = get_copy_tuple(copytuple);
  /* No real page is involved when reading a copy tuple. */
  req_struct->m_page_ptr.setNull();
  req_struct->m_tuple_ptr = (Tuple_header*)th;
  th->m_operation_ptr_i = RNIL;
  ndbassert((th->m_header_bits & Tuple_header::COPY_TUPLE) != 0);

  /* Attach the table's attribute descriptors, bounds-checked. */
  Uint32 num_attr= regTabPtr->m_no_of_attributes;
  Uint32 descr_start= regTabPtr->tabDescriptor;
  TableDescriptor *tab_descr= &tableDescriptor[descr_start];
  ndbrequire(descr_start + (num_attr << ZAD_LOG_SIZE) <= cnoOfTabDescrRec);
  req_struct->attr_descr= tab_descr;

  /* Copy tuples carry no disk data here; expand in-memory parts only. */
  bool disk = false;
  if (regTabPtr->need_expand(disk))
  {
    jam();
    prepare_read(req_struct, regTabPtr, disk);
  }
}
1211
1212 /* ---------------------------------------------------------------- */
1213 /* ------------------------ CONFIRM REQUEST ----------------------- */
1214 /* ---------------------------------------------------------------- */
1215 inline
returnTUPKEYCONF(Signal * signal,KeyReqStruct * req_struct,Operationrec * regOperPtr,TransState trans_state)1216 void Dbtup::returnTUPKEYCONF(Signal* signal,
1217 KeyReqStruct *req_struct,
1218 Operationrec * regOperPtr,
1219 TransState trans_state)
1220 {
1221 /**
1222 * When we arrive here we have been executing read code and/or write
1223 * code to read/write the tuple. During this execution path we have
1224 * not accessed the regOperPtr object for a long time and we have
1225 * accessed lots of other data in the meantime. This prefetch was
1226 * shown useful by using the perf tool. So not an obvious prefetch.
1227 */
1228 NDB_PREFETCH_WRITE(regOperPtr);
1229 TupKeyConf * tupKeyConf= (TupKeyConf *)signal->getDataPtrSend();
1230
1231 Uint32 Rcreate_rowid = req_struct->m_use_rowid;
1232 Uint32 RuserPointer= regOperPtr->userpointer;
1233 Uint32 RnumFiredTriggers= req_struct->num_fired_triggers;
1234 const Uint32 RnoExecInstructions = req_struct->no_exec_instructions;
1235 Uint32 log_size= req_struct->log_size;
1236 Uint32 read_length= req_struct->read_length;
1237 Uint32 last_row= req_struct->last_row;
1238
1239 tupKeyConf->userPtr= RuserPointer;
1240 tupKeyConf->readLength= read_length;
1241 tupKeyConf->writeLength= log_size;
1242 tupKeyConf->numFiredTriggers= RnumFiredTriggers;
1243 tupKeyConf->lastRow= last_row;
1244 tupKeyConf->rowid = Rcreate_rowid;
1245 tupKeyConf->noExecInstructions = RnoExecInstructions;
1246 set_tuple_state(regOperPtr, TUPLE_PREPARED);
1247 set_trans_state(regOperPtr, trans_state);
1248 }
1249
1250
1251 #define MAX_READ (MIN(sizeof(signal->theData), MAX_SEND_MESSAGE_BYTESIZE))
1252
1253 /* ---------------------------------------------------------------- */
1254 /* ----------------------------- READ ---------------------------- */
1255 /* ---------------------------------------------------------------- */
handleReadReq(Signal * signal,Operationrec * regOperPtr,Tablerec * regTabPtr,KeyReqStruct * req_struct)1256 int Dbtup::handleReadReq(Signal* signal,
1257 Operationrec* regOperPtr,
1258 Tablerec* regTabPtr,
1259 KeyReqStruct* req_struct)
1260 {
1261 Uint32 *dst;
1262 Uint32 dstLen, start_index;
1263 const BlockReference sendBref= req_struct->rec_blockref;
1264 if (((regTabPtr->m_bits & Tablerec::TR_Checksum) &&
1265 (calculateChecksum(req_struct->m_tuple_ptr, regTabPtr) != 0)) ||
1266 ERROR_INSERTED(4036)) {
1267 jam();
1268 return corruptedTupleDetected(req_struct, regTabPtr);
1269 }
1270
1271 const Uint32 node = refToNode(sendBref);
1272 if(node != 0 && node != getOwnNodeId()) {
1273 start_index= 25;
1274 } else {
1275 jam();
1276 /**
1277 * execute direct
1278 */
1279 start_index= 3;
1280 }
1281 dst= &signal->theData[start_index];
1282 dstLen= (MAX_READ / 4) - start_index;
1283 if (!req_struct->interpreted_exec) {
1284 jam();
1285 int ret = readAttributes(req_struct,
1286 &cinBuffer[0],
1287 req_struct->attrinfo_len,
1288 dst,
1289 dstLen,
1290 false);
1291 if (likely(ret >= 0)) {
1292 /* ------------------------------------------------------------------------- */
1293 // We have read all data into coutBuffer. Now send it to the API.
1294 /* ------------------------------------------------------------------------- */
1295 jam();
1296 Uint32 TnoOfDataRead= (Uint32) ret;
1297 req_struct->read_length += TnoOfDataRead;
1298 sendReadAttrinfo(signal, req_struct, TnoOfDataRead);
1299 return 0;
1300 }
1301 else
1302 {
1303 terrorCode = Uint32(-ret);
1304 }
1305 } else {
1306 return interpreterStartLab(signal, req_struct);
1307 }
1308
1309 jam();
1310 tupkeyErrorLab(req_struct);
1311 return -1;
1312 }
1313
1314 static
1315 void
handle_reorg(Dbtup::KeyReqStruct * req_struct,Dbtup::Fragrecord::FragState state)1316 handle_reorg(Dbtup::KeyReqStruct * req_struct,
1317 Dbtup::Fragrecord::FragState state)
1318 {
1319 Uint32 reorg = req_struct->m_reorg;
1320 switch(state){
1321 case Dbtup::Fragrecord::FS_FREE:
1322 case Dbtup::Fragrecord::FS_REORG_NEW:
1323 case Dbtup::Fragrecord::FS_REORG_COMMIT_NEW:
1324 case Dbtup::Fragrecord::FS_REORG_COMPLETE_NEW:
1325 return;
1326 case Dbtup::Fragrecord::FS_REORG_COMMIT:
1327 case Dbtup::Fragrecord::FS_REORG_COMPLETE:
1328 if (reorg != ScanFragReq::REORG_NOT_MOVED)
1329 return;
1330 break;
1331 case Dbtup::Fragrecord::FS_ONLINE:
1332 if (reorg != ScanFragReq::REORG_MOVED)
1333 return;
1334 break;
1335 default:
1336 return;
1337 }
1338 req_struct->m_tuple_ptr->m_header_bits |= Dbtup::Tuple_header::REORG_MOVE;
1339 }
1340
1341 /* ---------------------------------------------------------------- */
1342 /* ---------------------------- UPDATE ---------------------------- */
1343 /* ---------------------------------------------------------------- */
handleUpdateReq(Signal * signal,Operationrec * operPtrP,Fragrecord * regFragPtr,Tablerec * regTabPtr,KeyReqStruct * req_struct,bool disk)1344 int Dbtup::handleUpdateReq(Signal* signal,
1345 Operationrec* operPtrP,
1346 Fragrecord* regFragPtr,
1347 Tablerec* regTabPtr,
1348 KeyReqStruct* req_struct,
1349 bool disk)
1350 {
1351 Tuple_header *dst;
1352 Tuple_header *base= req_struct->m_tuple_ptr, *org;
1353 ChangeMask * change_mask_ptr;
1354 if ((dst= alloc_copy_tuple(regTabPtr, &operPtrP->m_copy_tuple_location))== 0)
1355 {
1356 terrorCode= ZMEM_NOMEM_ERROR;
1357 goto error;
1358 }
1359
1360 Uint32 tup_version;
1361 change_mask_ptr = get_change_mask_ptr(regTabPtr, dst);
1362 if(operPtrP->is_first_operation())
1363 {
1364 org= req_struct->m_tuple_ptr;
1365 tup_version= org->get_tuple_version();
1366 clear_change_mask_info(regTabPtr, change_mask_ptr);
1367 }
1368 else
1369 {
1370 jam();
1371 Operationrec* prevOp= req_struct->prevOpPtr.p;
1372 tup_version= prevOp->op_struct.bit_field.tupVersion;
1373 Uint32 * rawptr = get_copy_tuple_raw(&prevOp->m_copy_tuple_location);
1374 org= get_copy_tuple(rawptr);
1375 copy_change_mask_info(regTabPtr,
1376 change_mask_ptr,
1377 get_change_mask_ptr(rawptr));
1378 }
1379
1380 /**
1381 * Check consistency before update/delete
1382 */
1383 req_struct->m_tuple_ptr= org;
1384 if ((regTabPtr->m_bits & Tablerec::TR_Checksum) &&
1385 (calculateChecksum(req_struct->m_tuple_ptr, regTabPtr) != 0))
1386 {
1387 jam();
1388 return corruptedTupleDetected(req_struct, regTabPtr);
1389 }
1390
1391 req_struct->m_tuple_ptr= dst;
1392
1393 union {
1394 Uint32 sizes[4];
1395 Uint64 cmp[2];
1396 };
1397
1398 disk = disk || (org->m_header_bits & Tuple_header::DISK_INLINE);
1399 if (regTabPtr->need_expand(disk))
1400 {
1401 expand_tuple(req_struct, sizes, org, regTabPtr, disk);
1402 if(disk && operPtrP->m_undo_buffer_space == 0)
1403 {
1404 jam();
1405 operPtrP->op_struct.bit_field.m_wait_log_buffer = 1;
1406 operPtrP->op_struct.bit_field.m_load_diskpage_on_commit = 1;
1407 Uint32 sz= operPtrP->m_undo_buffer_space=
1408 (sizeof(Dbtup::Disk_undo::Update) >> 2) + sizes[DD] - 1;
1409
1410 D("Logfile_client - handleUpdateReq");
1411 Logfile_client lgman(this, c_lgman, regFragPtr->m_logfile_group_id);
1412 terrorCode= lgman.alloc_log_space(sz, jamBuffer());
1413 if(unlikely(terrorCode))
1414 {
1415 jam();
1416 operPtrP->m_undo_buffer_space= 0;
1417 goto error;
1418 }
1419 }
1420 }
1421 else
1422 {
1423 memcpy(dst, org, 4*regTabPtr->m_offsets[MM].m_fix_header_size);
1424 req_struct->m_tuple_ptr->m_header_bits |= Tuple_header::COPY_TUPLE;
1425 }
1426
1427 tup_version= (tup_version + 1) & ZTUP_VERSION_MASK;
1428 operPtrP->op_struct.bit_field.tupVersion= tup_version;
1429
1430 req_struct->optimize_options = 0;
1431
1432 if (!req_struct->interpreted_exec) {
1433 jam();
1434
1435 if (regTabPtr->m_bits & Tablerec::TR_ExtraRowAuthorBits)
1436 {
1437 jam();
1438 Uint32 attrId =
1439 regTabPtr->getExtraAttrId<Tablerec::TR_ExtraRowAuthorBits>();
1440
1441 store_extra_row_bits(attrId, regTabPtr, dst, /* default */ 0, false);
1442 }
1443 int retValue = updateAttributes(req_struct,
1444 &cinBuffer[0],
1445 req_struct->attrinfo_len);
1446 if (unlikely(retValue < 0))
1447 {
1448 terrorCode = Uint32(-retValue);
1449 goto error;
1450 }
1451 } else {
1452 if (unlikely(interpreterStartLab(signal, req_struct) == -1))
1453 return -1;
1454 }
1455
1456 update_change_mask_info(regTabPtr,
1457 change_mask_ptr,
1458 req_struct->changeMask.rep.data);
1459
1460 switch (req_struct->optimize_options) {
1461 case AttributeHeader::OPTIMIZE_MOVE_VARPART:
1462 /**
1463 * optimize varpart of tuple, move varpart of tuple from
1464 * big-free-size page list into small-free-size page list
1465 */
1466 if(base->m_header_bits & Tuple_header::VAR_PART)
1467 optimize_var_part(req_struct, base, operPtrP,
1468 regFragPtr, regTabPtr);
1469 break;
1470 case AttributeHeader::OPTIMIZE_MOVE_FIXPART:
1471 //TODO: move fix part of tuple
1472 break;
1473 default:
1474 break;
1475 }
1476
1477 if (regTabPtr->need_shrink())
1478 {
1479 shrink_tuple(req_struct, sizes+2, regTabPtr, disk);
1480 if (cmp[0] != cmp[1] && handle_size_change_after_update(req_struct,
1481 base,
1482 operPtrP,
1483 regFragPtr,
1484 regTabPtr,
1485 sizes)) {
1486 goto error;
1487 }
1488 }
1489
1490 if (req_struct->m_reorg != ScanFragReq::REORG_ALL)
1491 {
1492 handle_reorg(req_struct, regFragPtr->fragStatus);
1493 }
1494
1495 req_struct->m_tuple_ptr->set_tuple_version(tup_version);
1496
1497 setChecksum(req_struct->m_tuple_ptr, regTabPtr);
1498
1499 set_tuple_state(operPtrP, TUPLE_PREPARED);
1500
1501 return 0;
1502
1503 error:
1504 tupkeyErrorLab(req_struct);
1505 return -1;
1506 }
1507
1508 /*
1509 expand_dyn_part - copy dynamic attributes to fully expanded size.
1510
1511 Both variable-sized and fixed-size attributes are stored in the same way
1512 in the expanded form as variable-sized attributes (in expand_var_part()).
1513
1514 This method is used for both mem and disk dynamic data.
1515
1516 dst Destination for expanded data
1517 tabPtrP Table descriptor
1518 src Pointer to the start of dynamic bitmap in source row
1519 row_len Total number of 32-bit words in dynamic part of row
1520 tabDesc Array of table descriptors
1521 order Array of indexes into tabDesc, dynfix followed by dynvar
1522 */
1523 static
1524 Uint32*
expand_dyn_part(Dbtup::KeyReqStruct::Var_data * dst,const Uint32 * src,Uint32 row_len,const Uint32 * tabDesc,const Uint16 * order,Uint32 dynvar,Uint32 dynfix,Uint32 max_bmlen)1525 expand_dyn_part(Dbtup::KeyReqStruct::Var_data *dst,
1526 const Uint32* src,
1527 Uint32 row_len,
1528 const Uint32 * tabDesc,
1529 const Uint16* order,
1530 Uint32 dynvar,
1531 Uint32 dynfix,
1532 Uint32 max_bmlen)
1533 {
1534 /* Copy the bitmap, zeroing out any words not stored in the row. */
1535 Uint32 *dst_bm_ptr= (Uint32*)dst->m_dyn_data_ptr;
1536 Uint32 bm_len = row_len ? (* src & Dbtup::DYN_BM_LEN_MASK) : 0;
1537
1538 assert(bm_len <= max_bmlen);
1539
1540 if(bm_len > 0)
1541 memcpy(dst_bm_ptr, src, 4*bm_len);
1542 if(bm_len < max_bmlen)
1543 bzero(dst_bm_ptr + bm_len, 4 * (max_bmlen - bm_len));
1544
1545 /**
1546 * Store max_bmlen for homogen code in DbtupRoutines
1547 */
1548 Uint32 tmp = (* dst_bm_ptr);
1549 * dst_bm_ptr = (tmp & ~(Uint32)Dbtup::DYN_BM_LEN_MASK) | max_bmlen;
1550
1551 char *src_off_start= (char*)(src + bm_len);
1552 assert((UintPtr(src_off_start)&3) == 0);
1553 Uint16 *src_off_ptr= (Uint16*)src_off_start;
1554
1555 /*
1556 Prepare the variable-sized dynamic attributes, copying out data from the
1557 source row for any that are not NULL.
1558 */
1559 Uint32 no_attr= dst->m_dyn_len_offset;
1560 Uint16* dst_off_ptr= dst->m_dyn_offset_arr_ptr;
1561 Uint16* dst_len_ptr= dst_off_ptr + no_attr;
1562 Uint16 this_src_off= row_len ? * src_off_ptr++ : 0;
1563 /* We need to reserve room for the offsets written by shrink_tuple+padding.*/
1564 Uint16 dst_off= 4 * (max_bmlen + ((dynvar+2)>>1));
1565 char *dst_ptr= (char*)dst_bm_ptr + dst_off;
1566 for(Uint32 i= 0; i<dynvar; i++)
1567 {
1568 Uint16 j= order[dynfix+i];
1569 Uint32 max_len= 4 *AttributeDescriptor::getSizeInWords(tabDesc[j]);
1570 Uint32 len;
1571 Uint32 pos = AttributeOffset::getNullFlagPos(tabDesc[j+1]);
1572 if(bm_len > (pos >> 5) && BitmaskImpl::get(bm_len, src, pos))
1573 {
1574 Uint16 next_src_off= *src_off_ptr++;
1575 len= next_src_off - this_src_off;
1576 memcpy(dst_ptr, src_off_start+this_src_off, len);
1577 this_src_off= next_src_off;
1578 }
1579 else
1580 {
1581 len= 0;
1582 }
1583 dst_off_ptr[i]= dst_off;
1584 dst_len_ptr[i]= dst_off+len;
1585 dst_off+= max_len;
1586 dst_ptr+= max_len;
1587 }
1588 /*
1589 The fixed-size data is stored 32-bit aligned after the variable-sized
1590 data.
1591 */
1592 char *src_ptr= src_off_start+this_src_off;
1593 src_ptr= (char *)(ALIGN_WORD(src_ptr));
1594
1595 /*
1596 Prepare the fixed-size dynamic attributes, copying out data from the
1597 source row for any that are not NULL.
1598 Note that the fixed-size data is stored in reverse from the end of the
1599 dynamic part of the row. This is true both for the stored/shrunken and
1600 for the expanded form.
1601 */
1602 for(Uint32 i= dynfix; i>0; )
1603 {
1604 i--;
1605 Uint16 j= order[i];
1606 Uint32 fix_size= 4*AttributeDescriptor::getSizeInWords(tabDesc[j]);
1607 dst_off_ptr[dynvar+i]= dst_off;
1608 /* len offset array is not used for fixed size. */
1609 Uint32 pos = AttributeOffset::getNullFlagPos(tabDesc[j+1]);
1610 if(bm_len > (pos >> 5) && BitmaskImpl::get(bm_len, src, pos))
1611 {
1612 assert((UintPtr(dst_ptr)&3) == 0);
1613 memcpy(dst_ptr, src_ptr, fix_size);
1614 src_ptr+= fix_size;
1615 }
1616 dst_off+= fix_size;
1617 dst_ptr+= fix_size;
1618 }
1619
1620 return (Uint32 *)dst_ptr;
1621 }
1622
/*
  shrink_dyn_part - build the stored (shrunken) form of the dynamic part.

  Reads the expanded dynamic data described by dst and writes the compact
  on-row representation to dst_ptr: trimmed bitmap, offset array for the
  not-NULL variable-sized attributes, their data, then the fixed-size
  dynamic attributes stored in reverse order.

  dst      Var_data descriptor holding the expanded source data
  dst_ptr  Destination for the shrunken dynamic part
  tabPtrP  Table record
  tabDesc  Array of table descriptors
  order    Array of indexes into tabDesc, dynfix followed by dynvar
  dynvar   Number of dynamic variable-sized attributes
  dynfix   Number of dynamic fixed-size attributes
  ind      Which part of the row is being shrunk (MM or DD)

  Returns a pointer to the first word following the shrunken data
  (== dst_ptr unchanged if the whole dynamic part is NULL).
*/
static
Uint32*
shrink_dyn_part(Dbtup::KeyReqStruct::Var_data *dst,
                Uint32 *dst_ptr,
                const Dbtup::Tablerec* tabPtrP,
                const Uint32 * tabDesc,
                const Uint16* order,
                Uint32 dynvar,
                Uint32 dynfix,
                Uint32 ind)
{
  /**
   * Now build the dynamic part, if any.
   * First look for any trailing all-NULL words of the bitmap; we do
   * not need to store those.
   */
  assert((UintPtr(dst->m_dyn_data_ptr)&3) == 0);
  char *dyn_src_ptr= dst->m_dyn_data_ptr;
  Uint32 bm_len = tabPtrP->m_offsets[ind].m_dyn_null_words; // In words

  /* If no dynamic variables, store nothing. */
  assert(bm_len);
  {
    /**
     * clear bm-len bits, so they won't incorrect indicate
     * a non-zero map
     */
    * ((Uint32 *)dyn_src_ptr) &= ~Uint32(Dbtup::DYN_BM_LEN_MASK);

    /* Trim trailing zero words off the bitmap. */
    Uint32 *bm_ptr= (Uint32 *)dyn_src_ptr + bm_len - 1;
    while(*bm_ptr == 0)
    {
      bm_ptr--;
      bm_len--;
      if(bm_len == 0)
        break;
    }
  }

  if (bm_len)
  {
    /**
     * Copy the bitmap, counting the number of variable sized
     * attributes that are not NULL on the way.
     */
    Uint32 *dyn_dst_ptr= dst_ptr;
    Uint32 dyn_var_count= 0;
    const Uint32 *src_bm_ptr= (Uint32 *)(dyn_src_ptr);
    Uint32 *dst_bm_ptr= (Uint32 *)dyn_dst_ptr;

    /* ToDo: Put all of the dynattr code inside if(bm_len>0) { ... },
     * split to separate function. */
    Uint16 dyn_dst_data_offset= 0;
    const Uint32 *dyn_bm_var_mask_ptr= tabPtrP->dynVarSizeMask[ind];
    for(Uint16 i= 0; i< bm_len; i++)
    {
      Uint32 v= src_bm_ptr[i];
      dyn_var_count+= BitmaskImpl::count_bits(v & *dyn_bm_var_mask_ptr++);
      dst_bm_ptr[i]= v;
    }

    /* Store the trimmed bitmap length in the first bitmap word. */
    Uint32 tmp = *dyn_dst_ptr;
    assert(bm_len <= Dbtup::DYN_BM_LEN_MASK);
    * dyn_dst_ptr = (tmp & ~(Uint32)Dbtup::DYN_BM_LEN_MASK) | bm_len;
    dyn_dst_ptr+= bm_len;
    dyn_dst_data_offset= 2*dyn_var_count + 2;

    Uint16 *dyn_src_off_array= dst->m_dyn_offset_arr_ptr;
    Uint16 *dyn_src_lenoff_array=
      dyn_src_off_array + dst->m_dyn_len_offset;
    Uint16* dyn_dst_off_array = (Uint16*)dyn_dst_ptr;

    /**
     * Copy over the variable sized not-NULL attributes.
     * Data offsets are counted from the start of the offset array, and
     * we store one additional offset to be able to easily compute the
     * data length as the difference between offsets.
     */
    Uint16 off_idx= 0;
    for(Uint32 i= 0; i<dynvar; i++)
    {
      /**
       * Note that we must use the destination (shrunken) bitmap here,
       * as the source (expanded) bitmap may have been already clobbered
       * (by offset data).
       */
      Uint32 attrDesc2 = tabDesc[order[dynfix+i]+1];
      Uint32 pos = AttributeOffset::getNullFlagPos(attrDesc2);
      if (bm_len > (pos >> 5) && BitmaskImpl::get(bm_len, dst_bm_ptr, pos))
      {
        dyn_dst_off_array[off_idx++]= dyn_dst_data_offset;
        Uint32 dyn_src_off= dyn_src_off_array[i];
        Uint32 dyn_len= dyn_src_lenoff_array[i] - dyn_src_off;
        /* memmove: source and destination ranges may overlap. */
        memmove(((char *)dyn_dst_ptr) + dyn_dst_data_offset,
                dyn_src_ptr + dyn_src_off,
                dyn_len);
        dyn_dst_data_offset+= dyn_len;
      }
    }
    /* If all dynamic attributes are NULL, we store nothing. */
    dyn_dst_off_array[off_idx]= dyn_dst_data_offset;
    assert(dyn_dst_off_array + off_idx == (Uint16*)dyn_dst_ptr+dyn_var_count);

    char *dynvar_end_ptr= ((char *)dyn_dst_ptr) + dyn_dst_data_offset;
    char *dyn_dst_data_ptr= (char *)(ALIGN_WORD(dynvar_end_ptr));

    /**
     * Zero out any padding bytes. Might not be strictly necessary,
     * but seems cleaner than leaving random stuff in there.
     */
    bzero(dynvar_end_ptr, dyn_dst_data_ptr-dynvar_end_ptr);

    /**
     * Copy over the fixed-sized not-NULL attributes.
     * Note that attributes are copied in reverse order; this is to avoid
     * overwriting not-yet-copied data, as the data is also stored in
     * reverse order.
     */
    for(Uint32 i= dynfix; i > 0; )
    {
      i--;
      Uint16 j= order[i];
      Uint32 attrDesc2 = tabDesc[j+1];
      Uint32 pos = AttributeOffset::getNullFlagPos(attrDesc2);
      if(bm_len > (pos >>5 ) && BitmaskImpl::get(bm_len, dst_bm_ptr, pos))
      {
        Uint32 fixsize=
          4*AttributeDescriptor::getSizeInWords(tabDesc[j]);
        memmove(dyn_dst_data_ptr,
                dyn_src_ptr + dyn_src_off_array[dynvar+i],
                fixsize);
        dyn_dst_data_ptr += fixsize;
      }
    }
    dst_ptr = (Uint32*)dyn_dst_data_ptr;
    assert((UintPtr(dst_ptr) & 3) == 0);
  }
  return (Uint32 *)dst_ptr;
}
1762
1763 /* ---------------------------------------------------------------- */
1764 /* ----------------------------- INSERT --------------------------- */
1765 /* ---------------------------------------------------------------- */
1766 void
prepare_initial_insert(KeyReqStruct * req_struct,Operationrec * regOperPtr,Tablerec * regTabPtr)1767 Dbtup::prepare_initial_insert(KeyReqStruct *req_struct,
1768 Operationrec* regOperPtr,
1769 Tablerec* regTabPtr)
1770 {
1771 Uint32 disk_undo = regTabPtr->m_no_of_disk_attributes ?
1772 sizeof(Dbtup::Disk_undo::Alloc) >> 2 : 0;
1773 regOperPtr->nextActiveOp= RNIL;
1774 regOperPtr->prevActiveOp= RNIL;
1775 regOperPtr->op_struct.bit_field.in_active_list= true;
1776 regOperPtr->m_undo_buffer_space= disk_undo;
1777
1778 req_struct->check_offset[MM]= regTabPtr->get_check_offset(MM);
1779 req_struct->check_offset[DD]= regTabPtr->get_check_offset(DD);
1780
1781 Uint32 num_attr= regTabPtr->m_no_of_attributes;
1782 Uint32 descr_start= regTabPtr->tabDescriptor;
1783 Uint32 order_desc= regTabPtr->m_real_order_descriptor;
1784 TableDescriptor *tab_descr= &tableDescriptor[descr_start];
1785 ndbrequire(descr_start + (num_attr << ZAD_LOG_SIZE) <= cnoOfTabDescrRec);
1786 req_struct->attr_descr= tab_descr;
1787 Uint16* order= (Uint16*)&tableDescriptor[order_desc];
1788 order += regTabPtr->m_attributes[MM].m_no_of_fixsize;
1789
1790 Uint32 bits = Tuple_header::COPY_TUPLE;
1791 bits |= disk_undo ? (Tuple_header::DISK_ALLOC|Tuple_header::DISK_INLINE) : 0;
1792
1793 const Uint32 mm_vars= regTabPtr->m_attributes[MM].m_no_of_varsize;
1794 const Uint32 mm_dyns= regTabPtr->m_attributes[MM].m_no_of_dynamic;
1795 const Uint32 mm_dynvar= regTabPtr->m_attributes[MM].m_no_of_dyn_var;
1796 const Uint32 mm_dynfix= regTabPtr->m_attributes[MM].m_no_of_dyn_fix;
1797 const Uint32 dd_vars= regTabPtr->m_attributes[DD].m_no_of_varsize;
1798 Uint32 *ptr= req_struct->m_tuple_ptr->get_end_of_fix_part_ptr(regTabPtr);
1799 Var_part_ref* ref = req_struct->m_tuple_ptr->get_var_part_ref_ptr(regTabPtr);
1800
1801 if (regTabPtr->m_bits & Tablerec::TR_ForceVarPart)
1802 {
1803 ref->m_page_no = RNIL;
1804 ref->m_page_idx = Tup_varsize_page::END_OF_FREE_LIST;
1805 }
1806
1807 if(mm_vars || mm_dyns)
1808 {
1809 jam();
1810 /* Init Varpart_copy struct */
1811 Varpart_copy * cp = (Varpart_copy*)ptr;
1812 cp->m_len = 0;
1813 ptr += Varpart_copy::SZ32;
1814
1815 /* Prepare empty varsize part. */
1816 KeyReqStruct::Var_data* dst= &req_struct->m_var_data[MM];
1817
1818 if (mm_vars)
1819 {
1820 dst->m_data_ptr= (char*)(((Uint16*)ptr)+mm_vars+1);
1821 dst->m_offset_array_ptr= req_struct->var_pos_array;
1822 dst->m_var_len_offset= mm_vars;
1823 dst->m_max_var_offset= regTabPtr->m_offsets[MM].m_max_var_offset;
1824
1825 Uint32 pos= 0;
1826 Uint16 *pos_ptr = req_struct->var_pos_array;
1827 Uint16 *len_ptr = pos_ptr + mm_vars;
1828 for(Uint32 i= 0; i<mm_vars; i++)
1829 {
1830 * pos_ptr++ = pos;
1831 * len_ptr++ = pos;
1832 pos += AttributeDescriptor::getSizeInBytes(tab_descr[*order++].tabDescr);
1833 }
1834
1835 // Disk/dynamic part is 32-bit aligned
1836 ptr = ALIGN_WORD(dst->m_data_ptr+pos);
1837 ndbassert(ptr == ALIGN_WORD(dst->m_data_ptr +
1838 regTabPtr->m_offsets[MM].m_max_var_offset));
1839 }
1840
1841 if (mm_dyns)
1842 {
1843 jam();
1844 /* Prepare empty dynamic part. */
1845 dst->m_dyn_data_ptr= (char *)ptr;
1846 dst->m_dyn_offset_arr_ptr= req_struct->var_pos_array+2*mm_vars;
1847 dst->m_dyn_len_offset= mm_dynvar+mm_dynfix;
1848 dst->m_max_dyn_offset= regTabPtr->m_offsets[MM].m_max_dyn_offset;
1849
1850 ptr = expand_dyn_part(dst, 0, 0,
1851 (Uint32*)tab_descr, order,
1852 mm_dynvar, mm_dynfix,
1853 regTabPtr->m_offsets[MM].m_dyn_null_words);
1854 }
1855
1856 ndbassert((UintPtr(ptr)&3) == 0);
1857 }
1858
1859 req_struct->m_disk_ptr= (Tuple_header*)ptr;
1860
1861 ndbrequire(dd_vars == 0);
1862
1863 req_struct->m_tuple_ptr->m_header_bits= bits;
1864
1865 // Set all null bits
1866 memset(req_struct->m_tuple_ptr->m_null_bits+
1867 regTabPtr->m_offsets[MM].m_null_offset, 0xFF,
1868 4*regTabPtr->m_offsets[MM].m_null_words);
1869 memset(req_struct->m_disk_ptr->m_null_bits+
1870 regTabPtr->m_offsets[DD].m_null_offset, 0xFF,
1871 4*regTabPtr->m_offsets[DD].m_null_words);
1872 }
1873
/**
 * Handle an INSERT operation.
 *
 * Builds the new row image in a freshly allocated "copy tuple", applies
 * the table's default values and then the client-supplied attribute
 * data, and -- when this is the first operation on the row in the
 * transaction -- allocates the real in-memory row (fixed-size or
 * var-size, at a free position or at a caller-supplied rowid).  For
 * disk tables it additionally reserves undo-log space and pre-allocates
 * a disk page.
 *
 * Returns 0 on success, leaving *accminupdateptr either filled in with
 * the local key ACC should be updated with (frag_page_id based) or set
 * to NULL when no ACC minupdate is needed.  Returns -1 on failure after
 * setting terrorCode and calling tupkeyErrorLab(); memory already
 * allocated is released via do_tup_abortreq().
 */
int Dbtup::handleInsertReq(Signal* signal,
                           Ptr<Operationrec> regOperPtr,
                           Ptr<Fragrecord> fragPtr,
                           Tablerec* regTabPtr,
                           KeyReqStruct *req_struct,
                           Local_key ** accminupdateptr)
{
  Uint32 tup_version = 1;
  Fragrecord* regFragPtr = fragPtr.p;
  Uint32 *ptr= 0;
  Tuple_header *dst;
  Tuple_header *base= req_struct->m_tuple_ptr, *org= base;
  Tuple_header *tuple_ptr;

  bool disk = regTabPtr->m_no_of_disk_attributes > 0;
  // First operation on this row in the transaction => real row allocation
  bool mem_insert = regOperPtr.p->is_first_operation();
  bool disk_insert = mem_insert && disk;
  bool vardynsize = (regTabPtr->m_attributes[MM].m_no_of_varsize ||
                     regTabPtr->m_attributes[MM].m_no_of_dynamic);
  bool varalloc = vardynsize || regTabPtr->m_bits & Tablerec::TR_ForceVarPart;
  bool rowid = req_struct->m_use_rowid;
  bool update_acc = false;
  Uint32 real_page_id = regOperPtr.p->m_tuple_location.m_page_no;
  Uint32 frag_page_id = req_struct->frag_page_id;

  /* sizes[] receives MM/DD part sizes from expand/shrink; the aliased
   * cmp[] words give a cheap "did any part size change" test below. */
  union {
    Uint32 sizes[4];
    Uint64 cmp[2];
  };
  cmp[0] = cmp[1] = 0;

  if (ERROR_INSERTED(4014))   /* error injection: copy-tuple alloc failure */
  {
    dst = 0;
    goto undo_buffer_error;
  }

  /* Build the new row in a copy tuple; the real row (if any) is only
   * touched once everything else has succeeded. */
  dst= alloc_copy_tuple(regTabPtr, &regOperPtr.p->m_copy_tuple_location);

  if (unlikely(dst == 0))
  {
    goto undo_buffer_error;
  }
  tuple_ptr= req_struct->m_tuple_ptr= dst;
  set_change_mask_info(regTabPtr, get_change_mask_ptr(regTabPtr, dst));

  if(mem_insert)
  {
    jam();
    prepare_initial_insert(req_struct, regOperPtr.p, regTabPtr);
  }
  else
  {
    /* Insert after a DELETE in the same transaction: bump the version
     * past the deleting op's and start from its row image. */
    Operationrec* prevOp= req_struct->prevOpPtr.p;
    ndbassert(prevOp->op_type == ZDELETE);
    tup_version= prevOp->op_struct.bit_field.tupVersion + 1;

    if(!prevOp->is_first_operation())
    {
      jam();
      org= get_copy_tuple(&prevOp->m_copy_tuple_location);
    }
    if (regTabPtr->need_expand())
    {
      expand_tuple(req_struct, sizes, org, regTabPtr, !disk_insert);
      // Set all disk-part null bits
      memset(req_struct->m_disk_ptr->m_null_bits+
             regTabPtr->m_offsets[DD].m_null_offset, 0xFF,
             4*regTabPtr->m_offsets[DD].m_null_words);

      Uint32 bm_size_in_bytes= 4*(regTabPtr->m_offsets[MM].m_dyn_null_words);
      if (bm_size_in_bytes)
      {
        Uint32* ptr =
          (Uint32*)req_struct->m_var_data[MM].m_dyn_data_ptr;
        bzero(ptr, bm_size_in_bytes);
        /* First word of the dynamic bitmap holds its length in words */
        * ptr = bm_size_in_bytes >> 2;
      }
    }
    else
    {
      memcpy(dst, org, 4*regTabPtr->m_offsets[MM].m_fix_header_size);
      tuple_ptr->m_header_bits |= Tuple_header::COPY_TUPLE;
    }
    // Set all main-memory null bits
    memset(tuple_ptr->m_null_bits+
           regTabPtr->m_offsets[MM].m_null_offset, 0xFF,
           4*regTabPtr->m_offsets[MM].m_null_words);
  }

  int res;
  if (disk_insert)
  {
    /* Reserve undo-log space up front; released again on error paths. */
    jam();
    if (ERROR_INSERTED(4015))
    {
      terrorCode = 1501;
      goto log_space_error;
    }

    D("Logfile_client - handleInsertReq");
    Logfile_client lgman(this, c_lgman, regFragPtr->m_logfile_group_id);
    res= lgman.alloc_log_space(regOperPtr.p->m_undo_buffer_space, jamBuffer());
    if(unlikely(res))
    {
      jam();
      terrorCode= res;
      goto log_space_error;
    }
  }

  regOperPtr.p->op_struct.bit_field.tupVersion=
    tup_version & ZTUP_VERSION_MASK;
  tuple_ptr->set_tuple_version(tup_version);

  if (ERROR_INSERTED(4016))
  {
    terrorCode = ZAI_INCONSISTENCY_ERROR;
    goto update_error;
  }

  if (regTabPtr->m_bits & Tablerec::TR_ExtraRowAuthorBits)
  {
    Uint32 attrId =
      regTabPtr->getExtraAttrId<Tablerec::TR_ExtraRowAuthorBits>();

    store_extra_row_bits(attrId, regTabPtr, tuple_ptr, /* default */ 0, false);
  }

  if (!regTabPtr->m_default_value_location.isNull())
  {
    jam();
    Uint32 default_values_len;
    /* Get default values ptr + len for this table */
    Uint32* default_values = get_default_ptr(regTabPtr, default_values_len);
    ndbrequire(default_values_len != 0 && default_values != NULL);
    /*
     * Update default values into row first,
     * next update with data received from the client.
     */
    if(unlikely((res = updateAttributes(req_struct, default_values,
                                        default_values_len)) < 0))
    {
      jam();
      terrorCode = Uint32(-res);
      goto update_error;
    }
  }

  /* Apply the attribute values received from the client. */
  if(unlikely((res = updateAttributes(req_struct, &cinBuffer[0],
                                      req_struct->attrinfo_len)) < 0))
  {
    terrorCode = Uint32(-res);
    goto update_error;
  }

  if (ERROR_INSERTED(4017))
  {
    goto null_check_error;
  }
  if (unlikely(checkNullAttributes(req_struct, regTabPtr) == false))
  {
    goto null_check_error;
  }

  if (req_struct->m_is_lcp)
  {
    jam();
    /* LCP restore supplies the var-part length directly */
    sizes[2+MM] = req_struct->m_lcp_varpart_len;
  }
  else if (regTabPtr->need_shrink())
  {
    shrink_tuple(req_struct, sizes+2, regTabPtr, true);
  }

  if (ERROR_INSERTED(4025))
  {
    goto mem_error;
  }

  if (ERROR_INSERTED(4026))
  {
    CLEAR_ERROR_INSERT_VALUE;
    goto mem_error;
  }

  /* 4027/4028: intermittent (75%) allocation failures for testing */
  if (ERROR_INSERTED(4027) && (rand() % 100) > 25)
  {
    goto mem_error;
  }

  if (ERROR_INSERTED(4028) && (rand() % 100) > 25)
  {
    CLEAR_ERROR_INSERT_VALUE;
    goto mem_error;
  }

  /**
   * Alloc memory
   */
  if(mem_insert)
  {
    terrorCode = 0;
    if (!rowid)
    {
      /* Allocate the row at any free position */
      if (ERROR_INSERTED(4018))
      {
        goto mem_error;
      }

      if (!varalloc)
      {
        jam();
        ptr= alloc_fix_rec(jamBuffer(),
                           &terrorCode,
                           regFragPtr,
                           regTabPtr,
                           &regOperPtr.p->m_tuple_location,
                           &frag_page_id);
      }
      else
      {
        jam();
        regOperPtr.p->m_tuple_location.m_file_no= sizes[2+MM];
        ptr= alloc_var_rec(&terrorCode,
                           regFragPtr, regTabPtr,
                           sizes[2+MM],
                           &regOperPtr.p->m_tuple_location,
                           &frag_page_id);
      }
      if (unlikely(ptr == 0))
      {
        goto mem_error;
      }
      req_struct->m_use_rowid = true;
    }
    else
    {
      /* Caller dictated the rowid (e.g. replica / restore) */
      regOperPtr.p->m_tuple_location = req_struct->m_row_id;
      if (ERROR_INSERTED(4019))
      {
        terrorCode = ZROWID_ALLOCATED;
        goto alloc_rowid_error;
      }

      if (!varalloc)
      {
        jam();
        ptr= alloc_fix_rowid(&terrorCode,
                             regFragPtr,
                             regTabPtr,
                             &regOperPtr.p->m_tuple_location,
                             &frag_page_id);
      }
      else
      {
        jam();
        regOperPtr.p->m_tuple_location.m_file_no= sizes[2+MM];
        ptr= alloc_var_rowid(&terrorCode,
                             regFragPtr, regTabPtr,
                             sizes[2+MM],
                             &regOperPtr.p->m_tuple_location,
                             &frag_page_id);
      }
      if (unlikely(ptr == 0))
      {
        jam();
        goto alloc_rowid_error;
      }
    }
    real_page_id = regOperPtr.p->m_tuple_location.m_page_no;
    update_acc = true; /* Will be updated later once success is known */

    base = (Tuple_header*)ptr;
    base->m_operation_ptr_i= regOperPtr.i;
    base->m_header_bits= Tuple_header::ALLOC |
      (sizes[2+MM] > 0 ? Tuple_header::VAR_PART : 0);
    /**
     * No need to set checksum here, the tuple is allocated, but contains
     * no data, so if we attempt to read it in this state we even want the
     * checksum to be wrong since it is not allowed to read the tuple in
     * this state.
     */
  }
  else
  {
    /* Re-insert into an already-allocated row: only resize if needed. */
    if (ERROR_INSERTED(4020))
    {
      goto size_change_error;
    }

    if (regTabPtr->need_shrink() && cmp[0] != cmp[1] &&
        unlikely(handle_size_change_after_update(req_struct,
                                                 base,
                                                 regOperPtr.p,
                                                 regFragPtr,
                                                 regTabPtr,
                                                 sizes) != 0))
    {
      goto size_change_error;
    }
    req_struct->m_use_rowid = false;
    Uint32 old_header = base->m_header_bits;
    base->m_header_bits &= ~(Uint32)Tuple_header::FREE;
    Uint32 new_header = base->m_header_bits;
    if (old_header != new_header)
    {
      jam();
      updateChecksum(base, regTabPtr, old_header, new_header);
    }
  }

  if (disk_insert)
  {
    jam();
    Local_key tmp;
    Uint32 size= regTabPtr->m_attributes[DD].m_no_of_varsize == 0 ?
      1 : sizes[2+DD];

    if (ERROR_INSERTED(4021))
    {
      terrorCode = 1601;
      goto disk_prealloc_error;
    }

    if (!Local_key::isShort(frag_page_id))
    {
      jam();
      terrorCode = 1603;
      goto disk_prealloc_error;
    }

    int ret= disk_page_prealloc(signal, fragPtr, &tmp, size);
    if (unlikely(ret < 0))
    {
      jam();
      terrorCode = -ret;
      goto disk_prealloc_error;
    }

    regOperPtr.p->op_struct.bit_field.m_disk_preallocated= 1;
    tmp.m_page_idx= size;
    memcpy(tuple_ptr->get_disk_ref_ptr(regTabPtr), &tmp, sizeof(tmp));

    /**
     * Set ref from disk to mm
     */
    Local_key ref = regOperPtr.p->m_tuple_location;
    ref.m_page_no = frag_page_id;

    Tuple_header* disk_ptr= req_struct->m_disk_ptr;
    disk_ptr->m_header_bits = 0;
    disk_ptr->m_base_record_ref= ref.ref();
  }

  if (req_struct->m_reorg != ScanFragReq::REORG_ALL)
  {
    handle_reorg(req_struct, regFragPtr->fragStatus);
  }

  /* Have been successful with disk + mem, update ACC to point to
   * new record if necessary
   * Failures in disk alloc will skip this part
   */
  if (update_acc)
  {
    /* Acc stores the local key with the frag_page_id rather
     * than the real_page_id
     */
    ndbassert(regOperPtr.p->m_tuple_location.m_page_no == real_page_id);

    Local_key accKey = regOperPtr.p->m_tuple_location;
    accKey.m_page_no = frag_page_id;
    ** accminupdateptr = accKey;
  }
  else
  {
    * accminupdateptr = 0; // No accminupdate should be performed
  }

  setChecksum(req_struct->m_tuple_ptr, regTabPtr);

  set_tuple_state(regOperPtr.p, TUPLE_PREPARED);

  return 0;

size_change_error:
  jam();
  terrorCode = ZMEM_NOMEM_ERROR;
  goto exit_error;

undo_buffer_error:
  jam();
  terrorCode= ZMEM_NOMEM_ERROR;
  regOperPtr.p->m_undo_buffer_space = 0;
  if (mem_insert)
    regOperPtr.p->m_tuple_location.setNull();
  regOperPtr.p->m_copy_tuple_location.setNull();
  tupkeyErrorLab(req_struct);
  return -1;

null_check_error:
  jam();
  terrorCode= ZNO_ILLEGAL_NULL_ATTR;
  goto update_error;

mem_error:
  jam();
  if (terrorCode == 0)
  {
    terrorCode= ZMEM_NOMEM_ERROR;
  }
  goto update_error;

log_space_error:
  jam();
  regOperPtr.p->m_undo_buffer_space = 0;
  /* Fall through: undo space already zeroed, clean up like alloc error */
alloc_rowid_error:
  jam();
update_error:
  jam();
  if (mem_insert)
  {
    regOperPtr.p->op_struct.bit_field.in_active_list = false;
    regOperPtr.p->m_tuple_location.setNull();
  }
exit_error:
  if (!regOperPtr.p->m_tuple_location.isNull())
  {
    jam();
    /* Memory allocated, abort insert, releasing memory if appropriate */
    signal->theData[0] = regOperPtr.i;
    do_tup_abortreq(signal, ZSKIP_TUX_TRIGGERS | ZABORT_DEALLOC);
  }
  tupkeyErrorLab(req_struct);
  return -1;

disk_prealloc_error:
  jam();
  base->m_header_bits |= Tuple_header::FREED;
  setInvalidChecksum(base, regTabPtr);
  goto exit_error;
}
2315
2316 /* ---------------------------------------------------------------- */
2317 /* ---------------------------- DELETE ---------------------------- */
2318 /* ---------------------------------------------------------------- */
/**
 * Handle a DELETE operation.
 *
 * Allocates a copy tuple holding the pre-delete row image (needed
 * because a previous operation in the transaction may commit before
 * this one), reserves undo-log space for disk tables, and -- when the
 * request carries attrinfo -- performs a read of the row being deleted
 * (interpreted "read before delete") whose log data is forwarded to
 * LQH.
 *
 * Note: delete must set but not increment tupVersion.
 *
 * Returns 0 on success (or the result of the embedded read), -1 on
 * error after calling tupkeyErrorLab() with terrorCode set.
 */
int Dbtup::handleDeleteReq(Signal* signal,
                           Operationrec* regOperPtr,
                           Fragrecord* regFragPtr,
                           Tablerec* regTabPtr,
                           KeyReqStruct *req_struct,
                           bool disk)
{
  Tuple_header* dst = alloc_copy_tuple(regTabPtr,
                                       &regOperPtr->m_copy_tuple_location);
  if (dst == 0) {
    terrorCode = ZMEM_NOMEM_ERROR;
    goto error;
  }

  // delete must set but not increment tupVersion
  if (!regOperPtr->is_first_operation())
  {
    jam();
    Operationrec* prevOp= req_struct->prevOpPtr.p;
    regOperPtr->op_struct.bit_field.tupVersion=
      prevOp->op_struct.bit_field.tupVersion;
    // make copy since previous op is committed before this one
    const Tuple_header* org = get_copy_tuple(&prevOp->m_copy_tuple_location);
    /* Copy only the part of the copy-tuple buffer that follows the
     * change-mask header (dst already points past it). */
    Uint32 len = regTabPtr->total_rec_size -
      Uint32(((Uint32*)dst) -
             get_copy_tuple_raw(&regOperPtr->m_copy_tuple_location));
    memcpy(dst, org, 4 * len);
    req_struct->m_tuple_ptr = dst;
  }
  else
  {
    regOperPtr->op_struct.bit_field.tupVersion=
      req_struct->m_tuple_ptr->get_tuple_version();
    if (regTabPtr->m_no_of_disk_attributes)
    {
      /* Preserve header bits and the disk reference so the disk part
       * can be freed at commit time. */
      dst->m_header_bits = req_struct->m_tuple_ptr->m_header_bits;
      memcpy(dst->get_disk_ref_ptr(regTabPtr),
             req_struct->m_tuple_ptr->get_disk_ref_ptr(regTabPtr),
             sizeof(Local_key));
    }
  }
  req_struct->changeMask.set();
  set_change_mask_info(regTabPtr, get_change_mask_ptr(regTabPtr, dst));

  if(disk && regOperPtr->m_undo_buffer_space == 0)
  {
    jam();
    regOperPtr->op_struct.bit_field.m_wait_log_buffer = 1;
    regOperPtr->op_struct.bit_field.m_load_diskpage_on_commit = 1;
    /* Undo record size: Free record header + fixed disk part */
    Uint32 sz= regOperPtr->m_undo_buffer_space=
      (sizeof(Dbtup::Disk_undo::Free) >> 2) +
      regTabPtr->m_offsets[DD].m_fix_header_size - 1;

    D("Logfile_client - handleDeleteReq");
    Logfile_client lgman(this, c_lgman, regFragPtr->m_logfile_group_id);
    terrorCode= lgman.alloc_log_space(sz, jamBuffer());
    if(unlikely(terrorCode))
    {
      jam();
      regOperPtr->m_undo_buffer_space= 0;
      goto error;
    }
  }

  set_tuple_state(regOperPtr, TUPLE_PREPARED);

  if (req_struct->attrinfo_len == 0)
  {
    return 0;
  }

  if (regTabPtr->need_expand(disk))
  {
    prepare_read(req_struct, regTabPtr, disk);
  }

  {
    /* Read-before-delete: execute the read and forward any generated
     * log attrinfo to LQH. */
    Uint32 RlogSize;
    int ret= handleReadReq(signal, regOperPtr, regTabPtr, req_struct);
    if (ret == 0 && (RlogSize= req_struct->log_size))
    {
      jam();
      sendLogAttrinfo(signal, req_struct, RlogSize, regOperPtr);
    }
    return ret;
  }

error:
  tupkeyErrorLab(req_struct);
  return -1;
}
2410
/**
 * Handle a REFRESH operation.
 *
 * Sets up the tuple so that a transition to its current state can be
 * observed by SUMA's detached triggers: a non-existent row is made to
 * appear "deleted" via a fabricated insert, an existing row gets a
 * fabricated null-change update.  Four cases (RF_SINGLE_NOT_EXIST,
 * RF_SINGLE_EXIST, RF_MULTI_NOT_EXIST, RF_MULTI_EXIST) depending on
 * whether this is the first operation on the row in the transaction
 * and whether the row exists.  The case is recorded in
 * m_copy_tuple_location.m_file_no for commit-time processing.
 *
 * Returns 0 on success, -1 on failure (error handling done by the
 * delegated insert/update handler).
 */
int
Dbtup::handleRefreshReq(Signal* signal,
                        Ptr<Operationrec> regOperPtr,
                        Ptr<Fragrecord> regFragPtr,
                        Tablerec* regTabPtr,
                        KeyReqStruct *req_struct,
                        bool disk)
{
  /* Here we setup the tuple so that a transition to its current
   * state can be observed by SUMA's detached triggers.
   *
   * If the tuple does not exist then we fabricate a tuple
   * so that it can appear to be 'deleted'.
   * The fabricated tuple may have invalid NULL values etc.
   * If the tuple does exist then we fabricate a null-change
   * update to the tuple.
   *
   * The logic differs depending on whether there are already
   * other operations on the tuple in this transaction.
   * No other operations (including Refresh) are allowed after
   * a refresh.
   */
  Uint32 refresh_case;
  if (regOperPtr.p->is_first_operation())
  {
    jam();
    if (Local_key::isInvalid(req_struct->frag_page_id,
                             regOperPtr.p->m_tuple_location.m_page_idx))
    {
      jam();
      refresh_case = Operationrec::RF_SINGLE_NOT_EXIST;
      //ndbout_c("case 1");
      /**
       * This is refresh of non-existing tuple...
       * i.e "delete", reuse initial insert
       */
      Local_key accminupdate;
      Local_key * accminupdateptr = &accminupdate;

      /**
       * We don't need ...in this scenario
       * - disk
       * - default values
       */
      /* Temporarily neutralise disk attrs + defaults; restored below. */
      Uint32 save_disk = regTabPtr->m_no_of_disk_attributes;
      Local_key save_defaults = regTabPtr->m_default_value_location;
      Bitmask<MAXNROFATTRIBUTESINWORDS> save_mask =
        regTabPtr->notNullAttributeMask;

      regTabPtr->m_no_of_disk_attributes = 0;
      regTabPtr->m_default_value_location.setNull();
      regOperPtr.p->op_type = ZINSERT;

      /**
       * Update notNullAttributeMask to only include primary keys
       */
      regTabPtr->notNullAttributeMask.clear();
      /* readKeyArray entries hold the attribute id in the upper 16 bits */
      const Uint32 * primarykeys =
        (Uint32*)&tableDescriptor[regTabPtr->readKeyArray].tabDescr;
      for (Uint32 i = 0; i<regTabPtr->noOfKeyAttr; i++)
        regTabPtr->notNullAttributeMask.set(primarykeys[i] >> 16);

      int res = handleInsertReq(signal, regOperPtr,
                                regFragPtr, regTabPtr, req_struct,
                                &accminupdateptr);

      /* Restore the table metadata changed above regardless of result */
      regTabPtr->m_no_of_disk_attributes = save_disk;
      regTabPtr->m_default_value_location = save_defaults;
      regTabPtr->notNullAttributeMask = save_mask;

      if (unlikely(res == -1))
      {
        return -1;
      }

      regOperPtr.p->op_type = ZREFRESH;

      if (accminupdateptr)
      {
        /**
         * Update ACC local-key, once *everything* has completed succesfully
         */
        c_lqh->accminupdate(signal,
                            regOperPtr.p->userpointer,
                            accminupdateptr);
      }
    }
    else
    {
      refresh_case = Operationrec::RF_SINGLE_EXIST;
      //ndbout_c("case 2");
      jam();

      Tuple_header* origTuple = req_struct->m_tuple_ptr;
      Uint32 tup_version_save = origTuple->get_tuple_version();
      {
        /* Set new row version and update the tuple header */
        Uint32 old_header = origTuple->m_header_bits;
        Uint32 new_tup_version = decr_tup_version(tup_version_save);
        origTuple->set_tuple_version(new_tup_version);
        Uint32 new_header = origTuple->m_header_bits;
        updateChecksum(origTuple, regTabPtr, old_header, new_header);
      }
      int res = handleUpdateReq(signal, regOperPtr.p, regFragPtr.p,
                                regTabPtr, req_struct, disk);

      /* Now we must reset the original tuple header back
       * to the original version.
       * The copy tuple will have the correct version due to
       * the update incrementing it.
       * On commit, the tuple becomes the copy tuple.
       * On abort, the original tuple remains. If we don't
       * reset it here, then aborts cause the version to
       * decrease
       *
       * We also need to recalculate checksum since we're changing the
       * row here.
       */
      {
        Uint32 old_header = origTuple->m_header_bits;
        origTuple->set_tuple_version(tup_version_save);
        Uint32 new_header = origTuple->m_header_bits;
        updateChecksum(origTuple, regTabPtr, old_header, new_header);
      }
      if (res == -1)
        return -1;
    }
  }
  else
  {
    /* Not first operation on tuple in transaction */
    jam();

    /* Pre-decrement so the delegated insert/update re-increments back
     * to the saved version; restored unconditionally afterwards. */
    Uint32 tup_version_save =
      req_struct->prevOpPtr.p->op_struct.bit_field.tupVersion;
    Uint32 new_tup_version = decr_tup_version(tup_version_save);
    req_struct->prevOpPtr.p->op_struct.bit_field.tupVersion = new_tup_version;

    int res;
    if (req_struct->prevOpPtr.p->op_type == ZDELETE)
    {
      refresh_case = Operationrec::RF_MULTI_NOT_EXIST;
      //ndbout_c("case 3");

      jam();
      /**
       * We don't need ...in this scenario
       * - default values
       *
       * We keep disk attributes to avoid issues with 'insert'
       */
      Local_key save_defaults = regTabPtr->m_default_value_location;
      Bitmask<MAXNROFATTRIBUTESINWORDS> save_mask =
        regTabPtr->notNullAttributeMask;

      regTabPtr->m_default_value_location.setNull();
      regOperPtr.p->op_type = ZINSERT;

      /**
       * Update notNullAttributeMask to only include primary keys
       */
      regTabPtr->notNullAttributeMask.clear();
      const Uint32 * primarykeys =
        (Uint32*)&tableDescriptor[regTabPtr->readKeyArray].tabDescr;
      for (Uint32 i = 0; i<regTabPtr->noOfKeyAttr; i++)
        regTabPtr->notNullAttributeMask.set(primarykeys[i] >> 16);

      /**
       * This is multi-update + DELETE + REFRESH
       */
      Local_key * accminupdateptr = 0;
      res = handleInsertReq(signal, regOperPtr,
                            regFragPtr, regTabPtr, req_struct,
                            &accminupdateptr);

      regTabPtr->m_default_value_location = save_defaults;
      regTabPtr->notNullAttributeMask = save_mask;

      if (unlikely(res == -1))
      {
        return -1;
      }

      regOperPtr.p->op_type = ZREFRESH;
    }
    else
    {
      jam();
      refresh_case = Operationrec::RF_MULTI_EXIST;
      //ndbout_c("case 4");
      /**
       * This is multi-update + INSERT/UPDATE + REFRESH
       */
      res = handleUpdateReq(signal, regOperPtr.p, regFragPtr.p,
                            regTabPtr, req_struct, disk);
    }
    req_struct->prevOpPtr.p->op_struct.bit_field.tupVersion = tup_version_save;
    if (res == -1)
      return -1;
  }

  /* Store the refresh scenario in the copy tuple location */
  // TODO : Verify this is never used as a copy tuple location!
  regOperPtr.p->m_copy_tuple_location.m_file_no = refresh_case;
  return 0;
}
2617
2618 bool
checkNullAttributes(KeyReqStruct * req_struct,Tablerec * regTabPtr)2619 Dbtup::checkNullAttributes(KeyReqStruct * req_struct,
2620 Tablerec* regTabPtr)
2621 {
2622 // Implement checking of updating all not null attributes in an insert here.
2623 Bitmask<MAXNROFATTRIBUTESINWORDS> attributeMask;
2624 /*
2625 * The idea here is maybe that changeMask is not-null attributes
2626 * and must contain notNullAttributeMask. But:
2627 *
2628 * 1. changeMask has all bits set on insert
2629 * 2. not-null is checked in each UpdateFunction
2630 * 3. the code below does not work except trivially due to 1.
2631 *
2632 * XXX remove or fix
2633 */
2634 attributeMask.clear();
2635 attributeMask.bitOR(req_struct->changeMask);
2636 attributeMask.bitAND(regTabPtr->notNullAttributeMask);
2637 attributeMask.bitXOR(regTabPtr->notNullAttributeMask);
2638 if (!attributeMask.isclear()) {
2639 return false;
2640 }
2641 return true;
2642 }
2643
2644 /* ---------------------------------------------------------------- */
2645 /* THIS IS THE START OF THE INTERPRETED EXECUTION OF UPDATES. WE */
2646 /* START BY LINKING ALL ATTRINFO'S IN A DOUBLY LINKED LIST (THEY ARE*/
2647 /* ALREADY IN A LINKED LIST). WE ALLOCATE A REGISTER MEMORY (EQUAL */
2648 /* TO AN ATTRINFO RECORD). THE INTERPRETER GOES THROUGH FOUR PHASES*/
2649 /* DURING THE FIRST PHASE IT IS ONLY ALLOWED TO READ ATTRIBUTES THAT*/
2650 /* ARE SENT TO THE CLIENT APPLICATION. DURING THE SECOND PHASE IT IS*/
2651 /* ALLOWED TO READ FROM ATTRIBUTES INTO REGISTERS, TO UPDATE */
2652 /* ATTRIBUTES BASED ON EITHER A CONSTANT VALUE OR A REGISTER VALUE, */
2653 /* A DIVERSE SET OF OPERATIONS ON REGISTERS ARE AVAILABLE AS WELL. */
2654 /* IT IS ALSO POSSIBLE TO PERFORM JUMPS WITHIN THE INSTRUCTIONS THAT*/
2655 /* BELONGS TO THE SECOND PHASE. ALSO SUBROUTINES CAN BE CALLED IN */
2656 /* THIS PHASE. THE THIRD PHASE IS TO AGAIN READ ATTRIBUTES AND */
2657 /* FINALLY THE FOURTH PHASE READS SELECTED REGISTERS AND SEND THEM */
2658 /* TO THE CLIENT APPLICATION. */
2659 /* THERE IS A FIFTH REGION WHICH CONTAINS SUBROUTINES CALLABLE FROM */
2660 /* THE INTERPRETER EXECUTION REGION. */
2661 /* THE FIRST FIVE WORDS WILL GIVE THE LENGTH OF THE FIVEE REGIONS */
2662 /* */
2663 /* THIS MEANS THAT FROM THE APPLICATIONS POINT OF VIEW THE DATABASE */
2664 /* CAN HANDLE SUBROUTINE CALLS WHERE THE CODE IS SENT IN THE REQUEST*/
2665 /* THE RETURN PARAMETERS ARE FIXED AND CAN EITHER BE GENERATED */
2666 /* BEFORE THE EXECUTION OF THE ROUTINE OR AFTER. */
2667 /* */
2668 /* IN LATER VERSIONS WE WILL ADD MORE THINGS LIKE THE POSSIBILITY */
2669 /* TO ALLOCATE MEMORY AND USE THIS AS LOCAL STORAGE. IT IS ALSO */
2670 /* IMAGINABLE TO HAVE SPECIAL ROUTINES THAT CAN PERFORM CERTAIN */
2671 /* OPERATIONS ON BLOB'S DEPENDENT ON WHAT THE BLOB REPRESENTS. */
2672 /* */
2673 /* */
2674 /* ----------------------------------------- */
2675 /* + INITIAL READ REGION + */
2676 /* ----------------------------------------- */
2677 /* + INTERPRETED EXECUTE REGION + */
2678 /* ----------------------------------------- */
2679 /* + FINAL UPDATE REGION + */
2680 /* ----------------------------------------- */
2681 /* + FINAL READ REGION + */
2682 /* ----------------------------------------- */
2683 /* + SUBROUTINE REGION + */
2684 /* ----------------------------------------- */
2685 /* ---------------------------------------------------------------- */
2686 /* ---------------------------------------------------------------- */
2687 /* ----------------- INTERPRETED EXECUTION ----------------------- */
2688 /* ---------------------------------------------------------------- */
/**
 * Entry point for interpreted execution of an operation.
 *
 * The ATTRINFO starts with five length words describing five regions:
 * initial read, interpreted execute, final update, final read and
 * subroutines.  After validating that the region lengths sum to the
 * ATTRINFO length, each non-empty region is executed in order:
 * attribute reads go to the client via sendReadAttrinfo(), updates are
 * appended to the log buffer and forwarded to LQH via
 * sendLogAttrinfo() for replication.
 *
 * Returns 0 on success, -1 on error (TUPKEY_REF already sent).
 */
int Dbtup::interpreterStartLab(Signal* signal,
                               KeyReqStruct *req_struct)
{
  Operationrec * const regOperPtr = req_struct->operPtrP;
  int TnoDataRW;
  Uint32 RtotalLen, start_index, dstLen;
  Uint32 *dst;

  /* The five region length words heading the ATTRINFO */
  Uint32 RinitReadLen= cinBuffer[0];
  Uint32 RexecRegionLen= cinBuffer[1];
  Uint32 RfinalUpdateLen= cinBuffer[2];
  Uint32 RfinalRLen= cinBuffer[3];
  Uint32 RsubLen= cinBuffer[4];

  jam();

  Uint32 RattrinbufLen= req_struct->attrinfo_len;
  const BlockReference sendBref= req_struct->rec_blockref;

  /* Remote receiver: leave room for a TRANSID_AI header in theData;
   * local receiver executes direct and needs only 3 header words. */
  const Uint32 node = refToNode(sendBref);
  if(node != 0 && node != getOwnNodeId()) {
    start_index= 25;
  } else {
    jam();
    /**
     * execute direct
     */
    start_index= 3;
  }
  dst= &signal->theData[start_index];
  dstLen= (MAX_READ / 4) - start_index;

  RtotalLen= RinitReadLen;
  RtotalLen += RexecRegionLen;
  RtotalLen += RfinalUpdateLen;
  RtotalLen += RfinalRLen;
  RtotalLen += RsubLen;

  Uint32 RattroutCounter= 0;
  Uint32 RinstructionCounter= 5;

  /* All information to be logged/propagated to replicas
   * is generated from here on so reset the log word count
   */
  Uint32 RlogSize= req_struct->log_size= 0;
  if (((RtotalLen + 5) == RattrinbufLen) &&
      (RattrinbufLen >= 5) &&
      (RattrinbufLen < ZATTR_BUFFER_SIZE)) {
    /* ---------------------------------------------------------------- */
    // We start by checking consistency. We must have the first five
    // words of the ATTRINFO to give us the length of the regions. The
    // size of these regions must be the same as the total ATTRINFO
    // length and finally the total length must be within the limits.
    /* ---------------------------------------------------------------- */

    if (RinitReadLen > 0) {
      jam();
      /* ---------------------------------------------------------------- */
      // The first step that can be taken in the interpreter is to read
      // data of the tuple before any updates have been applied.
      /* ---------------------------------------------------------------- */
      TnoDataRW= readAttributes(req_struct,
                                &cinBuffer[5],
                                RinitReadLen,
                                &dst[0],
                                dstLen,
                                false);
      if (TnoDataRW >= 0) {
        RattroutCounter= TnoDataRW;
        RinstructionCounter += RinitReadLen;
      } else {
        jam();
        terrorCode = Uint32(-TnoDataRW);
        tupkeyErrorLab(req_struct);
        return -1;
      }
    }
    if (RexecRegionLen > 0) {
      jam();
      /* ---------------------------------------------------------------- */
      // The next step is the actual interpreted execution. This executes
      // a register-based virtual machine which can read and write attributes
      // to and from registers.
      /* ---------------------------------------------------------------- */
      Uint32 RsubPC= RinstructionCounter + RexecRegionLen
        + RfinalUpdateLen + RfinalRLen;
      TnoDataRW= interpreterNextLab(signal,
                                    req_struct,
                                    &clogMemBuffer[0],
                                    &cinBuffer[RinstructionCounter],
                                    RexecRegionLen,
                                    &cinBuffer[RsubPC],
                                    RsubLen,
                                    &coutBuffer[0],
                                    sizeof(coutBuffer) / 4);
      if (TnoDataRW != -1) {
        RinstructionCounter += RexecRegionLen;
        RlogSize= TnoDataRW;
      } else {
        jam();
        /**
         * TUPKEY REF is sent from within interpreter
         */
        return -1;
      }
    }

    if ((RlogSize > 0) ||
        (RfinalUpdateLen > 0))
    {
      /* Operation updates row,
       * reset author pseudo-col before update takes effect
       * This should probably occur only if the interpreted program
       * did not explicitly write the value, but that requires a bit
       * to record whether the value has been written.
       */
      Tablerec* regTabPtr = req_struct->tablePtrP;
      Tuple_header* dst = req_struct->m_tuple_ptr;

      if (regTabPtr->m_bits & Tablerec::TR_ExtraRowAuthorBits)
      {
        Uint32 attrId =
          regTabPtr->getExtraAttrId<Tablerec::TR_ExtraRowAuthorBits>();

        store_extra_row_bits(attrId, regTabPtr, dst, /* default */ 0, false);
      }
    }

    if (RfinalUpdateLen > 0) {
      jam();
      /* ---------------------------------------------------------------- */
      // We can also apply a set of updates without any conditions as part
      // of the interpreted execution.
      /* ---------------------------------------------------------------- */
      if (regOperPtr->op_type == ZUPDATE) {
        TnoDataRW= updateAttributes(req_struct,
                                    &cinBuffer[RinstructionCounter],
                                    RfinalUpdateLen);
        if (TnoDataRW >= 0) {
          /* Append the applied update program to the replica log */
          MEMCOPY_NO_WORDS(&clogMemBuffer[RlogSize],
                           &cinBuffer[RinstructionCounter],
                           RfinalUpdateLen);
          RinstructionCounter += RfinalUpdateLen;
          RlogSize += RfinalUpdateLen;
        } else {
          jam();
          terrorCode = Uint32(-TnoDataRW);
          tupkeyErrorLab(req_struct);
          return -1;
        }
      } else {
        // Final update region only valid for ZUPDATE operations
        return TUPKEY_abort(req_struct, 19);
      }
    }
    if (RfinalRLen > 0) {
      jam();
      /* ---------------------------------------------------------------- */
      // The final action is that we can also read the tuple after it has
      // been updated.
      /* ---------------------------------------------------------------- */
      TnoDataRW= readAttributes(req_struct,
                                &cinBuffer[RinstructionCounter],
                                RfinalRLen,
                                &dst[RattroutCounter],
                                (dstLen - RattroutCounter),
                                false);
      if (TnoDataRW >= 0) {
        RattroutCounter += TnoDataRW;
      } else {
        jam();
        terrorCode = Uint32(-TnoDataRW);
        tupkeyErrorLab(req_struct);
        return -1;
      }
    }
    /* Add log words explicitly generated here to existing log size
     * - readAttributes can generate log for ANYVALUE column
     * It adds the words directly to req_struct->log_size
     * This is used for ANYVALUE and interpreted delete.
     */
    req_struct->log_size+= RlogSize;
    req_struct->read_length += RattroutCounter;
    sendReadAttrinfo(signal, req_struct, RattroutCounter);
    if (RlogSize > 0) {
      return sendLogAttrinfo(signal, req_struct, RlogSize, regOperPtr);
    }
    return 0;
  } else {
    // Region lengths inconsistent with total ATTRINFO length
    return TUPKEY_abort(req_struct, 22);
  }
}
2880
2881 /* ---------------------------------------------------------------- */
2882 /* WHEN EXECUTION IS INTERPRETED WE NEED TO SEND SOME ATTRINFO*/
2883 /* BACK TO LQH FOR LOGGING AND SENDING TO BACKUP AND STANDBY */
2884 /* NODES. */
2885 /* INPUT: LOG_ATTRINFOPTR WHERE TO FETCH DATA FROM */
2886 /* TLOG_START FIRST INDEX TO LOG */
2887 /* TLOG_END LAST INDEX + 1 TO LOG */
2888 /* ---------------------------------------------------------------- */
sendLogAttrinfo(Signal * signal,KeyReqStruct * req_struct,Uint32 TlogSize,Operationrec * const regOperPtr)2889 int Dbtup::sendLogAttrinfo(Signal* signal,
2890 KeyReqStruct * req_struct,
2891 Uint32 TlogSize,
2892 Operationrec * const regOperPtr)
2893
2894 {
2895 /* Copy from Log buffer to segmented section,
2896 * then attach to ATTRINFO and execute direct
2897 * to LQH
2898 */
2899 ndbrequire( TlogSize > 0 );
2900 Uint32 longSectionIVal= RNIL;
2901 bool ok= appendToSection(longSectionIVal,
2902 &clogMemBuffer[0],
2903 TlogSize);
2904 if (unlikely(!ok))
2905 {
2906 /* Resource error, abort transaction */
2907 terrorCode = ZSEIZE_ATTRINBUFREC_ERROR;
2908 tupkeyErrorLab(req_struct);
2909 return -1;
2910 }
2911
2912 /* Send a TUP_ATTRINFO signal to LQH, which contains
2913 * the relevant user pointer and the attrinfo section's
2914 * IVAL
2915 */
2916 signal->theData[0]= regOperPtr->userpointer;
2917 signal->theData[1]= TlogSize;
2918 signal->theData[2]= longSectionIVal;
2919
2920 EXECUTE_DIRECT(DBLQH,
2921 GSN_TUP_ATTRINFO,
2922 signal,
2923 3);
2924 return 0;
2925 }
2926
2927 inline
2928 Uint32
brancher(Uint32 TheInstruction,Uint32 TprogramCounter)2929 Dbtup::brancher(Uint32 TheInstruction, Uint32 TprogramCounter)
2930 {
2931 Uint32 TbranchDirection= TheInstruction >> 31;
2932 Uint32 TbranchLength= (TheInstruction >> 16) & 0x7fff;
2933 TprogramCounter--;
2934 if (TbranchDirection == 1) {
2935 jam();
2936 /* ---------------------------------------------------------------- */
2937 /* WE JUMP BACKWARDS. */
2938 /* ---------------------------------------------------------------- */
2939 return (TprogramCounter - TbranchLength);
2940 } else {
2941 jam();
2942 /* ---------------------------------------------------------------- */
2943 /* WE JUMP FORWARD. */
2944 /* ---------------------------------------------------------------- */
2945 return (TprogramCounter + TbranchLength);
2946 }
2947 }
2948
2949 const Uint32 *
lookupInterpreterParameter(Uint32 paramNo,const Uint32 * subptr,Uint32 sublen) const2950 Dbtup::lookupInterpreterParameter(Uint32 paramNo,
2951 const Uint32 * subptr,
2952 Uint32 sublen) const
2953 {
2954 /**
2955 * The parameters...are stored in the subroutine section
2956 *
2957 * WORD2 WORD3 WORD4 WORD5
2958 * [ P0 HEADER ] [ P0 DATA ] [ P1 HEADER ] [ P1 DATA ]
2959 *
2960 *
2961 * len=4 <=> 1 word
2962 */
2963 Uint32 pos = 0;
2964 while (paramNo)
2965 {
2966 const Uint32 * head = subptr + pos;
2967 Uint32 len = AttributeHeader::getDataSize(* head);
2968 paramNo --;
2969 pos += 1 + len;
2970 if (unlikely(pos >= sublen))
2971 return 0;
2972 }
2973
2974 const Uint32 * head = subptr + pos;
2975 Uint32 len = AttributeHeader::getDataSize(* head);
2976 if (unlikely(pos + 1 + len > sublen))
2977 return 0;
2978
2979 return head;
2980 }
2981
/**
 * interpreterNextLab - execute an interpreted program against the
 * current tuple.
 *
 * The interpreter is register based: 8 registers, each occupying 4
 * words of TregMemBuffer.  Word 0 of a register is its type tag
 * (0 = NULL, 0x50 = 32 bit value, 0x60 = 64 bit value) and words 2-3
 * hold the value as a 64-bit quantity.  Execution starts in
 * mainProgram; CALL transfers control into subroutineProg with return
 * addresses kept in TstackMemBuffer (max depth 31).
 *
 * signal          signal object (unused directly here, passed through)
 * req_struct      request context for the current operation
 * logMemory       buffer receiving attribute updates for logging
 * mainProgram     main interpreted program (word array)
 * TmainProgLen    length of main program in words
 * subroutineProg  subroutine section; also holds attached parameters
 * TsubroutineLen  length of subroutine section in words
 * tmpArea         scratch buffer for readAttributes results
 * tmpAreaSz       size of scratch buffer in words
 *
 * Returns the number of words written to logMemory on EXIT_OK /
 * EXIT_OK_LAST, or -1 / the TUPKEY_abort() result on failure.
 * Execution is capped at 8000 instructions to stop runaway programs.
 */
int Dbtup::interpreterNextLab(Signal* signal,
                              KeyReqStruct* req_struct,
                              Uint32* logMemory,
                              Uint32* mainProgram,
                              Uint32 TmainProgLen,
                              Uint32* subroutineProg,
                              Uint32 TsubroutineLen,
                              Uint32 * tmpArea,
                              Uint32 tmpAreaSz)
{
  register Uint32* TcurrentProgram= mainProgram;
  register Uint32 TcurrentSize= TmainProgLen;
  register Uint32 TprogramCounter= 0;
  register Uint32 theInstruction;
  register Uint32 theRegister;
  Uint32 TdataWritten= 0;
  Uint32 RstackPtr= 0;
  // Union guarantees 8-byte alignment of the register file so that
  // 64-bit loads/stores through (Int64*) casts below are aligned.
  union {
    Uint32 TregMemBuffer[32];
    Uint64 align[16];
  };
  (void)align; // kill warning
  Uint32 TstackMemBuffer[32];

  Uint32& RnoOfInstructions = req_struct->no_exec_instructions;
  ndbassert(RnoOfInstructions == 0);
  /* ---------------------------------------------------------------- */
  // Initialise all 8 registers to contain the NULL value.
  // In this version we can handle 32 and 64 bit unsigned integers.
  // They are handled as 64 bit values. Thus the 32 most significant
  // bits are zeroed for 32 bit values.
  /* ---------------------------------------------------------------- */
  TregMemBuffer[0]= 0;
  TregMemBuffer[4]= 0;
  TregMemBuffer[8]= 0;
  TregMemBuffer[12]= 0;
  TregMemBuffer[16]= 0;
  TregMemBuffer[20]= 0;
  TregMemBuffer[24]= 0;
  TregMemBuffer[28]= 0;
  // Attribute id (<<16) currently cached in tmpArea; ~0 = nothing cached.
  Uint32 tmpHabitant= ~0;

  while (RnoOfInstructions < 8000) {
    /* ---------------------------------------------------------------- */
    /* EXECUTE THE NEXT INTERPRETER INSTRUCTION.                        */
    /* ---------------------------------------------------------------- */
    RnoOfInstructions++;
    theInstruction= TcurrentProgram[TprogramCounter];
    // Register operand 1, scaled to its 4-word slot in TregMemBuffer.
    theRegister= Interpreter::getReg1(theInstruction) << 2;
#ifdef TRACE_INTERPRETER
    ndbout_c("Interpreter : RnoOfInstructions : %u. TprogramCounter : %u. Opcode : %u",
             RnoOfInstructions, TprogramCounter, Interpreter::getOpCode(theInstruction));
#endif
    if (TprogramCounter < TcurrentSize) {
      TprogramCounter++;
      switch (Interpreter::getOpCode(theInstruction)) {
      case Interpreter::READ_ATTR_INTO_REG:
	jam();
	/* ---------------------------------------------------------------- */
	// Read an attribute from the tuple into a register.
	// While reading an attribute we allow the attribute to be an array
	// as long as it fits in the 64 bits of the register.
	/* ---------------------------------------------------------------- */
	{
	  Uint32 theAttrinfo= theInstruction;
	  int TnoDataRW= readAttributes(req_struct,
					&theAttrinfo,
					(Uint32)1,
					&TregMemBuffer[theRegister],
					(Uint32)3,
					false);
	  if (TnoDataRW == 2) {
	    /* ------------------------------------------------------------- */
	    // Two words read means that we get the instruction plus one 32
	    // word read. Thus we set the register to be a 32 bit register.
	    /* ------------------------------------------------------------- */
	    TregMemBuffer[theRegister]= 0x50;
            // arithmetic conversion if big-endian
            * (Int64*)(TregMemBuffer+theRegister+2)= TregMemBuffer[theRegister+1];
	  } else if (TnoDataRW == 3) {
	    /* ------------------------------------------------------------- */
	    // Three words read means that we get the instruction plus two
	    // 32 words read. Thus we set the register to be a 64 bit register.
	    /* ------------------------------------------------------------- */
	    TregMemBuffer[theRegister]= 0x60;
            TregMemBuffer[theRegister+3]= TregMemBuffer[theRegister+2];
            TregMemBuffer[theRegister+2]= TregMemBuffer[theRegister+1];
	  } else if (TnoDataRW == 1) {
	    /* ------------------------------------------------------------- */
	    // One word read means that we must have read a NULL value. We set
	    // the register to indicate a NULL value.
	    /* ------------------------------------------------------------- */
	    TregMemBuffer[theRegister]= 0;
	    TregMemBuffer[theRegister + 2]= 0;
	    TregMemBuffer[theRegister + 3]= 0;
	  } else if (TnoDataRW < 0) {
	    jam();
	    terrorCode = Uint32(-TnoDataRW);
	    tupkeyErrorLab(req_struct);
	    return -1;
	  } else {
	    /* ------------------------------------------------------------- */
	    // Any other return value from the read attribute here is not
	    // allowed and will lead to a system crash.
	    /* ------------------------------------------------------------- */
	    ndbrequire(false);
	  }
	  break;
	}

      case Interpreter::WRITE_ATTR_FROM_REG:
	jam();
	{
	  Uint32 TattrId= theInstruction >> 16;
	  Uint32 TattrDescrIndex= req_struct->tablePtrP->tabDescriptor +
	    (TattrId << ZAD_LOG_SIZE);
	  Uint32 TattrDesc1= tableDescriptor[TattrDescrIndex].tabDescr;
	  Uint32 TregType= TregMemBuffer[theRegister];

	  /* --------------------------------------------------------------- */
	  // Calculate the number of words of this attribute.
	  // We allow writes into arrays as long as they fit into the 64 bit
	  // register size.
	  /* --------------------------------------------------------------- */
          Uint32 TattrNoOfWords = AttributeDescriptor::getSizeInWords(TattrDesc1);
	  Uint32 Toptype = req_struct->operPtrP->op_type;
	  Uint32 TdataForUpdate[3];
	  Uint32 Tlen;

	  AttributeHeader ah(TattrId, TattrNoOfWords << 2);
          TdataForUpdate[0]= ah.m_value;
	  TdataForUpdate[1]= TregMemBuffer[theRegister + 2];
	  TdataForUpdate[2]= TregMemBuffer[theRegister + 3];
	  Tlen= TattrNoOfWords + 1;
	  if (Toptype == ZUPDATE) {
	    if (TattrNoOfWords <= 2) {
              if (TattrNoOfWords == 1) {
                // arithmetic conversion if big-endian
                Int64 * tmp = new (&TregMemBuffer[theRegister + 2]) Int64;
                TdataForUpdate[1] = Uint32(* tmp);
                TdataForUpdate[2] = 0;
              }
	      if (TregType == 0) {
		/* --------------------------------------------------------- */
		// Write a NULL value into the attribute
		/* --------------------------------------------------------- */
		ah.setNULL();
		TdataForUpdate[0]= ah.m_value;
		Tlen= 1;
	      }
	      int TnoDataRW= updateAttributes(req_struct,
					      &TdataForUpdate[0],
					      Tlen);
	      if (TnoDataRW >= 0) {
		/* --------------------------------------------------------- */
		// Write the written data also into the log buffer so that it
		// will be logged.
		/* --------------------------------------------------------- */
		logMemory[TdataWritten + 0]= TdataForUpdate[0];
		logMemory[TdataWritten + 1]= TdataForUpdate[1];
		logMemory[TdataWritten + 2]= TdataForUpdate[2];
		TdataWritten += Tlen;
	      } else {
		terrorCode = Uint32(-TnoDataRW);
		tupkeyErrorLab(req_struct);
		return -1;
	      }
	    } else {
	      return TUPKEY_abort(req_struct, 15);
	    }
	  } else {
	    return TUPKEY_abort(req_struct, 16);
	  }
	  break;
	}

      case Interpreter::LOAD_CONST_NULL:
	jam();
	TregMemBuffer[theRegister]= 0;	/* NULL INDICATOR */
	break;

      case Interpreter::LOAD_CONST16:
	jam();
	TregMemBuffer[theRegister]= 0x50;	/* 32 BIT UNSIGNED CONSTANT */
	* (Int64*)(TregMemBuffer+theRegister+2)= theInstruction >> 16;
	break;

      case Interpreter::LOAD_CONST32:
	jam();
	TregMemBuffer[theRegister]= 0x50;	/* 32 BIT UNSIGNED CONSTANT */
	* (Int64*)(TregMemBuffer+theRegister+2)= * 
	  (TcurrentProgram+TprogramCounter);
	TprogramCounter++;
	break;

      case Interpreter::LOAD_CONST64:
	jam();
	TregMemBuffer[theRegister]= 0x60;	/* 64 BIT UNSIGNED CONSTANT */
        TregMemBuffer[theRegister + 2 ]= * (TcurrentProgram +
                                             TprogramCounter++);
        TregMemBuffer[theRegister + 3 ]= * (TcurrentProgram +
                                             TprogramCounter++);
	break;

      case Interpreter::ADD_REG_REG:
	jam();
	{
	  Uint32 TrightRegister= Interpreter::getReg2(theInstruction) << 2;
	  Uint32 TdestRegister= Interpreter::getReg3(theInstruction) << 2;

	  Uint32 TrightType= TregMemBuffer[TrightRegister];
	  Int64 Tright0= * (Int64*)(TregMemBuffer + TrightRegister + 2);
	  

	  Uint32 TleftType= TregMemBuffer[theRegister];
	  Int64 Tleft0= * (Int64*)(TregMemBuffer + theRegister + 2);
         
	  if ((TleftType | TrightType) != 0) {
	    Uint64 Tdest0= Tleft0 + Tright0;
	    * (Int64*)(TregMemBuffer+TdestRegister+2)= Tdest0;
	    TregMemBuffer[TdestRegister]= 0x60;
	  } else {
	    return TUPKEY_abort(req_struct, 20);
	  }
	  break;
	}

      case Interpreter::SUB_REG_REG:
	jam();
	{
	  Uint32 TrightRegister= Interpreter::getReg2(theInstruction) << 2;
	  Uint32 TdestRegister= Interpreter::getReg3(theInstruction) << 2;

	  Uint32 TrightType= TregMemBuffer[TrightRegister];
	  Int64 Tright0= * (Int64*)(TregMemBuffer + TrightRegister + 2);
	  
	  Uint32 TleftType= TregMemBuffer[theRegister];
	  Int64 Tleft0= * (Int64*)(TregMemBuffer + theRegister + 2);
         
	  if ((TleftType | TrightType) != 0) {
	    Int64 Tdest0= Tleft0 - Tright0;
	    * (Int64*)(TregMemBuffer+TdestRegister+2)= Tdest0;
	    TregMemBuffer[TdestRegister]= 0x60;
	  } else {
	    return TUPKEY_abort(req_struct, 20);
	  }
	  break;
	}

      case Interpreter::BRANCH:
	// Unconditional jump; brancher() decodes direction and distance.
	TprogramCounter= brancher(theInstruction, TprogramCounter);
	break;

      case Interpreter::BRANCH_REG_EQ_NULL:
	if (TregMemBuffer[theRegister] != 0) {
	  jam();
	  continue;
	} else {
	  jam();
	  TprogramCounter= brancher(theInstruction, TprogramCounter);
	}
	break;

      case Interpreter::BRANCH_REG_NE_NULL:
	if (TregMemBuffer[theRegister] == 0) {
	  jam();
	  continue;
	} else {
	  jam();
	  TprogramCounter= brancher(theInstruction, TprogramCounter);
	}
	break;


      case Interpreter::BRANCH_EQ_REG_REG:
	{
	  Uint32 TrightRegister= Interpreter::getReg2(theInstruction) << 2;

	  Uint32 TleftType= TregMemBuffer[theRegister];
	  Uint32 Tleft0= TregMemBuffer[theRegister + 2];
	  Uint32 Tleft1= TregMemBuffer[theRegister + 3];

	  Uint32 TrightType= TregMemBuffer[TrightRegister];
	  Uint32 Tright0= TregMemBuffer[TrightRegister + 2];
	  Uint32 Tright1= TregMemBuffer[TrightRegister + 3];
	  if ((TrightType | TleftType) != 0) {
	    jam();
	    if ((Tleft0 == Tright0) && (Tleft1 == Tright1)) {
	      TprogramCounter= brancher(theInstruction, TprogramCounter);
	    }
	  } else {
	    return TUPKEY_abort(req_struct, 23);
	  }
	  break;
	}

      case Interpreter::BRANCH_NE_REG_REG:
	{
	  Uint32 TrightRegister= Interpreter::getReg2(theInstruction) << 2;

	  Uint32 TleftType= TregMemBuffer[theRegister];
	  Uint32 Tleft0= TregMemBuffer[theRegister + 2];
	  Uint32 Tleft1= TregMemBuffer[theRegister + 3];

	  Uint32 TrightType= TregMemBuffer[TrightRegister];
	  Uint32 Tright0= TregMemBuffer[TrightRegister + 2];
	  Uint32 Tright1= TregMemBuffer[TrightRegister + 3];
	  if ((TrightType | TleftType) != 0) {
	    jam();
	    if ((Tleft0 != Tright0) || (Tleft1 != Tright1)) {
	      TprogramCounter= brancher(theInstruction, TprogramCounter);
	    }
	  } else {
	    return TUPKEY_abort(req_struct, 24);
	  }
	  break;
	}

      case Interpreter::BRANCH_LT_REG_REG:
	{
	  Uint32 TrightRegister= Interpreter::getReg2(theInstruction) << 2;

	  Uint32 TrightType= TregMemBuffer[TrightRegister];
	  Int64 Tright0= * (Int64*)(TregMemBuffer + TrightRegister + 2);
	  
	  Uint32 TleftType= TregMemBuffer[theRegister];
	  Int64 Tleft0= * (Int64*)(TregMemBuffer + theRegister + 2);
         

	  if ((TrightType | TleftType) != 0) {
	    jam();
	    if (Tleft0 < Tright0) {
	      TprogramCounter= brancher(theInstruction, TprogramCounter);
	    }
	  } else {
	    return TUPKEY_abort(req_struct, 24);
	  }
	  break;
	}

      case Interpreter::BRANCH_LE_REG_REG:
	{
	  Uint32 TrightRegister= Interpreter::getReg2(theInstruction) << 2;

	  Uint32 TrightType= TregMemBuffer[TrightRegister];
	  Int64 Tright0= * (Int64*)(TregMemBuffer + TrightRegister + 2);
	  
	  Uint32 TleftType= TregMemBuffer[theRegister];
	  Int64 Tleft0= * (Int64*)(TregMemBuffer + theRegister + 2);
	  

	  if ((TrightType | TleftType) != 0) {
	    jam();
	    if (Tleft0 <= Tright0) {
	      TprogramCounter= brancher(theInstruction, TprogramCounter);
	    }
	  } else {
	    return TUPKEY_abort(req_struct, 26);
	  }
	  break;
	}

      case Interpreter::BRANCH_GT_REG_REG:
	{
	  Uint32 TrightRegister= Interpreter::getReg2(theInstruction) << 2;

	  Uint32 TrightType= TregMemBuffer[TrightRegister];
	  Int64 Tright0= * (Int64*)(TregMemBuffer + TrightRegister + 2);
	  
	  Uint32 TleftType= TregMemBuffer[theRegister];
	  Int64 Tleft0= * (Int64*)(TregMemBuffer + theRegister + 2);
	  

	  if ((TrightType | TleftType) != 0) {
	    jam();
	    if (Tleft0 > Tright0){
	      TprogramCounter= brancher(theInstruction, TprogramCounter);
	    }
	  } else {
	    return TUPKEY_abort(req_struct, 27);
	  }
	  break;
	}

      case Interpreter::BRANCH_GE_REG_REG:
	{
	  Uint32 TrightRegister= Interpreter::getReg2(theInstruction) << 2;

	  Uint32 TrightType= TregMemBuffer[TrightRegister];
	  Int64 Tright0= * (Int64*)(TregMemBuffer + TrightRegister + 2);
	  
	  Uint32 TleftType= TregMemBuffer[theRegister];
	  Int64 Tleft0= * (Int64*)(TregMemBuffer + theRegister + 2);
	  

	  if ((TrightType | TleftType) != 0) {
	    jam();
	    if (Tleft0 >= Tright0){
	      TprogramCounter= brancher(theInstruction, TprogramCounter);
	    }
	  } else {
	    return TUPKEY_abort(req_struct, 28);
	  }
	  break;
	}

      // Compare an attribute against an inline argument (ARG) or an
      // attached parameter (ARG_2) and branch on the outcome.  The
      // second program word holds attribute id and argument length.
      case Interpreter::BRANCH_ATTR_OP_ARG_2:
      case Interpreter::BRANCH_ATTR_OP_ARG:{
	jam();
	Uint32 cond = Interpreter::getBinaryCondition(theInstruction);
	Uint32 ins2 = TcurrentProgram[TprogramCounter];
	Uint32 attrId = Interpreter::getBranchCol_AttrId(ins2) << 16;
	Uint32 argLen = Interpreter::getBranchCol_Len(ins2);
        Uint32 step = argLen;

	if(tmpHabitant != attrId){
	  Int32 TnoDataR = readAttributes(req_struct,
					  &attrId, 1,
					  tmpArea, tmpAreaSz,
                                          false);
	  
	  if (TnoDataR < 0) {
	    jam();
	    terrorCode = Uint32(-TnoDataR);
	    tupkeyErrorLab(req_struct);
	    return -1;
	  }
	  tmpHabitant= attrId;
	}

        // get type
	attrId >>= 16;
	Uint32 TattrDescrIndex = req_struct->tablePtrP->tabDescriptor +
	  (attrId << ZAD_LOG_SIZE);
	Uint32 TattrDesc1 = tableDescriptor[TattrDescrIndex].tabDescr;
	Uint32 TattrDesc2 = tableDescriptor[TattrDescrIndex+1].tabDescr;
	Uint32 typeId = AttributeDescriptor::getType(TattrDesc1);
	void * cs = 0;
	if(AttributeOffset::getCharsetFlag(TattrDesc2))
	{
	  Uint32 pos = AttributeOffset::getCharsetPos(TattrDesc2);
	  cs = req_struct->tablePtrP->charsetArray[pos];
	}
	const NdbSqlUtil::Type& sqlType = NdbSqlUtil::getType(typeId);

        // get data
	AttributeHeader ah(tmpArea[0]);
        const char* s1 = (char*)&tmpArea[1];
        const char* s2 = (char*)&TcurrentProgram[TprogramCounter+1];
        // fixed length in 5.0
	Uint32 attrLen = AttributeDescriptor::getSizeInBytes(TattrDesc1);

        if (Interpreter::getOpCode(theInstruction) ==
            Interpreter::BRANCH_ATTR_OP_ARG_2)
        {
          jam();
          // Argument comes from the attached parameter section instead
          // of being inlined in the program (hence step = 0 below).
          Uint32 paramNo = Interpreter::getBranchCol_ParamNo(ins2);
          const Uint32 * paramptr = lookupInterpreterParameter(paramNo,
                                                               subroutineProg,
                                                               TsubroutineLen);
          if (unlikely(paramptr == 0))
          {
            jam();
            terrorCode = 99; // TODO
            tupkeyErrorLab(req_struct);
            return -1;
          }

          argLen = AttributeHeader::getByteSize(* paramptr);
          step = 0;
          s2 = (char*)(paramptr + 1);
        }
        
        if (typeId == NDB_TYPE_BIT)
        {
          /* Size in bytes for bit fields can be incorrect due to
           * rounding down
           */
          Uint32 bitFieldAttrLen= (AttributeDescriptor::getArraySize(TattrDesc1)
                                   + 7) / 8;
          attrLen= bitFieldAttrLen;
        }

	bool r1_null = ah.isNULL();
	bool r2_null = argLen == 0;
	int res1;
        if (cond <= Interpreter::GE)
        {
          /* Inequality - EQ, NE, LT, LE, GT, GE */
          if (r1_null || r2_null) {
            // NULL==NULL and NULL<not-NULL
            res1 = r1_null && r2_null ? 0 : r1_null ? -1 : 1;
          } else {
            jam();
            if (unlikely(sqlType.m_cmp == 0))
            {
              return TUPKEY_abort(req_struct, 40);
            }
            res1 = (*sqlType.m_cmp)(cs, s1, attrLen, s2, argLen);
          }
	} else {
          if ((cond == Interpreter::LIKE) ||
              (cond == Interpreter::NOT_LIKE))
          {
            if (r1_null || r2_null) {
              // NULL like NULL is true (has no practical use)
              res1 =  r1_null && r2_null ? 0 : -1;
            } else {
              jam();
              if (unlikely(sqlType.m_like == 0))
              {
                return TUPKEY_abort(req_struct, 40);
              }
              res1 = (*sqlType.m_like)(cs, s1, attrLen, s2, argLen);
            }
          }
          else
          {
            /* AND_XX_MASK condition */
            ndbassert(cond <= Interpreter::AND_NE_ZERO);
            if (unlikely(sqlType.m_mask == 0))
            {
              return TUPKEY_abort(req_struct,40);
            }
            /* If either arg is NULL, we say COL AND MASK
             * NE_ZERO and NE_MASK.
             */
            if (r1_null || r2_null) {
              res1= 1;
            } else {
              
              bool cmpZero= 
                (cond == Interpreter::AND_EQ_ZERO) ||
                (cond == Interpreter::AND_NE_ZERO);
              
              res1 = (*sqlType.m_mask)(s1, attrLen, s2, argLen, cmpZero);
            }
          }
        }

	int res = 0;
	switch ((Interpreter::BinaryCondition)cond) {
	case Interpreter::EQ:
	  res = (res1 == 0);
	  break;
	case Interpreter::NE:
	  res = (res1 != 0);
	  break;
	// note the condition is backwards
	case Interpreter::LT:
	  res = (res1 > 0);
	  break;
	case Interpreter::LE:
	  res = (res1 >= 0);
	  break;
	case Interpreter::GT:
	  res = (res1 < 0);
	  break;
	case Interpreter::GE:
	  res = (res1 <= 0);
	  break;
	case Interpreter::LIKE:
	  res = (res1 == 0);
	  break;
	case Interpreter::NOT_LIKE:
	  res = (res1 == 1);
	  break;
	case Interpreter::AND_EQ_MASK:
	  res = (res1 == 0);
	  break;
	case Interpreter::AND_NE_MASK:
	  res = (res1 != 0);
	  break;
	case Interpreter::AND_EQ_ZERO:
	  res = (res1 == 0);
	  break;
	case Interpreter::AND_NE_ZERO:
	  res = (res1 != 0);
	  break;
	  // XXX handle invalid value
	}
#ifdef TRACE_INTERPRETER
	ndbout_c("cond=%u attr(%d)='%.*s'(%d) str='%.*s'(%d) res1=%d res=%d",
		 cond, attrId >> 16,
                 attrLen, s1, attrLen, argLen, s2, argLen, res1, res);
#endif
        if (res)
          TprogramCounter = brancher(theInstruction, TprogramCounter);
        else 
	{
          // Fall through: skip the second word and the inline argument
          // (step is 0 for ARG_2, whose argument lives in the params).
          Uint32 tmp = ((step + 3) >> 2) + 1;
          TprogramCounter += tmp;
        }
	break;
      }
	
      case Interpreter::BRANCH_ATTR_EQ_NULL:{
	jam();
	Uint32 ins2= TcurrentProgram[TprogramCounter];
	Uint32 attrId= Interpreter::getBranchCol_AttrId(ins2) << 16;
	
	if (tmpHabitant != attrId){
	  Int32 TnoDataR= readAttributes(req_struct,
					  &attrId, 1,
					  tmpArea, tmpAreaSz,
                                          false);
	  
	  if (TnoDataR < 0) {
	    jam();
	    terrorCode = Uint32(-TnoDataR);
	    tupkeyErrorLab(req_struct);
	    return -1;
	  }
	  tmpHabitant= attrId;
	}
	
	AttributeHeader ah(tmpArea[0]);
	if (ah.isNULL()){
	  TprogramCounter= brancher(theInstruction, TprogramCounter);
	} else {
	  TprogramCounter ++;
	}
	break;
      }

      case Interpreter::BRANCH_ATTR_NE_NULL:{
	jam();
	Uint32 ins2= TcurrentProgram[TprogramCounter];
	Uint32 attrId= Interpreter::getBranchCol_AttrId(ins2) << 16;
	
	if (tmpHabitant != attrId){
	  Int32 TnoDataR= readAttributes(req_struct,
					  &attrId, 1,
					  tmpArea, tmpAreaSz,
                                          false);
	  
	  if (TnoDataR < 0) {
	    jam();
	    terrorCode = Uint32(-TnoDataR);
	    tupkeyErrorLab(req_struct);
	    return -1;
	  }
	  tmpHabitant= attrId;
	}
	
	AttributeHeader ah(tmpArea[0]);
	if (ah.isNULL()){
	  TprogramCounter ++;
	} else {
	  TprogramCounter= brancher(theInstruction, TprogramCounter);
	}
	break;
      }
	
      case Interpreter::EXIT_OK:
	jam();
#ifdef TRACE_INTERPRETER
	ndbout_c(" - exit_ok");
#endif
	return TdataWritten;

      case Interpreter::EXIT_OK_LAST:
	jam();
#ifdef TRACE_INTERPRETER
	ndbout_c(" - exit_ok_last");
#endif
	req_struct->last_row= true;
	return TdataWritten;
	
      case Interpreter::EXIT_REFUSE:
	jam();
#ifdef TRACE_INTERPRETER
	ndbout_c(" - exit_nok");
#endif
	terrorCode= theInstruction >> 16;
	return TUPKEY_abort(req_struct, 29);

      case Interpreter::CALL:
	jam();
#ifdef TRACE_INTERPRETER
        ndbout_c(" - call addr=%u, subroutine len=%u ret addr=%u",
                 theInstruction >> 16, TsubroutineLen, TprogramCounter);
#endif
	RstackPtr++;
	if (RstackPtr < 32) {
	  TstackMemBuffer[RstackPtr]= TprogramCounter;
	  TprogramCounter= theInstruction >> 16;
	  if (TprogramCounter < TsubroutineLen) {
	    TcurrentProgram= subroutineProg;
	    TcurrentSize= TsubroutineLen;
	  } else {
	    return TUPKEY_abort(req_struct, 30);
	  }
	} else {
	  return TUPKEY_abort(req_struct, 31);
	}
	break;

      case Interpreter::RETURN:
	jam();
#ifdef TRACE_INTERPRETER
        ndbout_c(" - return to %u from stack level %u",
                 TstackMemBuffer[RstackPtr],
                 RstackPtr);
#endif
	if (RstackPtr > 0) {
	  TprogramCounter= TstackMemBuffer[RstackPtr];
	  RstackPtr--;
	  if (RstackPtr == 0) {
	    jam();
	    /* ------------------------------------------------------------- */
	    // We are back to the main program.
	    /* ------------------------------------------------------------- */
	    TcurrentProgram= mainProgram;
	    TcurrentSize= TmainProgLen;
	  }
	} else {
	  return TUPKEY_abort(req_struct, 32);
	}
	break;

      default:
	// Unknown opcode.
	return TUPKEY_abort(req_struct, 33);
      }
    } else {
      // Program counter ran off the end of the current program.
      return TUPKEY_abort(req_struct, 34);
    }
  }
  // Instruction budget (8000) exhausted - treat as error.
  return TUPKEY_abort(req_struct, 35);
}
3712
3713 /**
3714 * expand_var_part - copy packed variable attributes to fully expanded size
3715 *
3716 * dst: where to start writing attribute data
3717 * dst_off_ptr where to write attribute offsets
3718 * src pointer to packed attributes
3719 * tabDesc array of attribute descriptors (used for getting max size)
 * no_of_attr  no of attributes to expand
3721 */
static
Uint32*
expand_var_part(Dbtup::KeyReqStruct::Var_data *dst,
		const Uint32* src,
		const Uint32 * tabDesc,
		const Uint16* order)
{
  char* dst_ptr= dst->m_data_ptr;
  // In the expanded layout m_var_len_offset equals the number of
  // varsize attributes (len array follows directly after offsets).
  Uint32 no_attr= dst->m_var_len_offset;
  Uint16* dst_off_ptr= dst->m_offset_array_ptr;
  Uint16* dst_len_ptr= dst_off_ptr + no_attr;
  // Packed source: (no_attr + 1) Uint16 end-offsets, then the byte data.
  const Uint16* src_off_ptr= (const Uint16*)src;
  const char* src_ptr= (const char*)(src_off_ptr + no_attr + 1);

  Uint16 tmp= *src_off_ptr++, next_pos, len, max_len, dst_off= 0;
  for(Uint32 i = 0; i<no_attr; i++)
  {
    // Actual stored length is the delta between consecutive end-offsets.
    next_pos= *src_off_ptr++;
    len= next_pos - tmp;
    
    *dst_off_ptr++ = dst_off;
    *dst_len_ptr++ = dst_off + len;
    memcpy(dst_ptr, src_ptr, len);
    src_ptr += len;
    
    // Destination reserves each attribute's declared maximum size so
    // that an update can grow the value in place.
    max_len= AttributeDescriptor::getSizeInBytes(tabDesc[* order++]);
    dst_ptr += max_len; // Max size
    dst_off += max_len;
    
    tmp= next_pos;
  }
  
  // Return word-aligned pointer just past the expanded var part.
  return ALIGN_WORD(dst_ptr);
}
3756
/**
 * Expand a stored (packed) tuple into the fully-expanded copy-tuple
 * layout used while an operation works on the row.
 *
 * req_struct  request context; receives pointers into the expanded copy
 * sizes       out: sizes[MM] = main-memory varpart size read,
 *                  sizes[DD] = disk fix-part size (words)
 * src         the source tuple (stored row, or an already shrunken copy)
 * tabPtrP     table descriptor
 * disk        whether the disk part should be expanded as well
 */
void
Dbtup::expand_tuple(KeyReqStruct* req_struct, 
		    Uint32 sizes[2],
		    Tuple_header* src, 
		    const Tablerec* tabPtrP,
		    bool disk)
{
  Uint32 bits= src->m_header_bits;
  Uint32 extra_bits = bits;
  Tuple_header* ptr= req_struct->m_tuple_ptr;
  
  Uint16 dd_tot= tabPtrP->m_no_of_disk_attributes;
  Uint16 mm_vars= tabPtrP->m_attributes[MM].m_no_of_varsize;
  Uint16 mm_dynvar= tabPtrP->m_attributes[MM].m_no_of_dyn_var;
  Uint16 mm_dynfix= tabPtrP->m_attributes[MM].m_no_of_dyn_fix;
  Uint16 mm_dyns= tabPtrP->m_attributes[MM].m_no_of_dynamic;
  Uint32 fix_size= tabPtrP->m_offsets[MM].m_fix_header_size;
  Uint32 order_desc= tabPtrP->m_real_order_descriptor;
  
  Uint32 *dst_ptr= ptr->get_end_of_fix_part_ptr(tabPtrP);
  const Uint32 *disk_ref= src->get_disk_ref_ptr(tabPtrP);
  const Uint32 *src_ptr= src->get_end_of_fix_part_ptr(tabPtrP);
  const Var_part_ref* var_ref = src->get_var_part_ref_ptr(tabPtrP);
  const Uint32 *desc= (Uint32*)req_struct->attr_descr;
  const Uint16 *order = (Uint16*)(&tableDescriptor[order_desc]);
  // Skip the fixed-size attributes; order[] then lists varsize attrs.
  order += tabPtrP->m_attributes[MM].m_no_of_fixsize;
  
  // Copy fix part
  sizes[MM]= 1;
  memcpy(ptr, src, 4*fix_size);
  if(mm_vars || mm_dyns)
  { 
    /*
     * Reserve place for initial length word and offset array (with one extra
     * offset). This will be filled-in later, in shrink_tuple().
     */
    dst_ptr += Varpart_copy::SZ32;

    KeyReqStruct::Var_data* dst= &req_struct->m_var_data[MM];
    Uint32 step; // in bytes
    Uint32 src_len;
    const Uint32 *src_data;
    if (bits & Tuple_header::VAR_PART)
    {
      KeyReqStruct::Var_data* dst= &req_struct->m_var_data[MM];
      if(! (bits & Tuple_header::COPY_TUPLE))
      {
        /* This is for the initial expansion of a stored row. */
        // The varpart lives on a separate page, found via var_ref.
        Ptr<Page> var_page;
        src_data= get_ptr(&var_page, *var_ref);
        src_len= get_len(&var_page, *var_ref);
        sizes[MM]= src_len;
        step= 0;
        req_struct->m_varpart_page_ptr = var_page;

        /* An original tuple cant have grown as we're expanding it...
         * else we would be "re-expand" */
        ndbassert(! (bits & Tuple_header::MM_GROWN));
      }
      else
      {
        /* This is for the re-expansion of a shrunken row (update2 ...) */
        // The varpart is inline in the copy tuple, wrapped in a
        // Varpart_copy header that carries its length.
        Varpart_copy* vp = (Varpart_copy*)src_ptr;
        src_len = vp->m_len;
        src_data= vp->m_data;
        step= (Varpart_copy::SZ32 + src_len); // 1+ is for extra word
        req_struct->m_varpart_page_ptr = req_struct->m_page_ptr;
        sizes[MM]= src_len;
      }

      if (mm_vars)
      {
        dst->m_data_ptr= (char*)(((Uint16*)dst_ptr)+mm_vars+1);
        dst->m_offset_array_ptr= req_struct->var_pos_array;
        dst->m_var_len_offset= mm_vars;
        dst->m_max_var_offset= tabPtrP->m_offsets[MM].m_max_var_offset;

        dst_ptr= expand_var_part(dst, src_data, desc, order);
        ndbassert(dst_ptr == ALIGN_WORD(dst->m_data_ptr + dst->m_max_var_offset));
        /**
         * Move to end of fix varpart
         */
        char* varstart = (char*)(((Uint16*)src_data)+mm_vars+1);
        Uint32 varlen = ((Uint16*)src_data)[mm_vars];
        Uint32 *dynstart = ALIGN_WORD(varstart + varlen);

        ndbassert(src_len >= (dynstart - src_data));
        // Remaining words (if any) are the dynamic attribute part.
        src_len -= Uint32(dynstart - src_data);
        src_data = dynstart;
      }
    }
    else
    {
      /**
       * No varpart...only allowed for dynattr
       */
      ndbassert(mm_vars == 0);
      src_len = step = sizes[MM] = 0;
      src_data = 0;
    }

    if (mm_dyns)
    {
      /**
       * dynattr needs to be expanded even if no varpart existed before
       */
      dst->m_dyn_offset_arr_ptr= req_struct->var_pos_array+2*mm_vars;
      dst->m_dyn_len_offset= mm_dynvar+mm_dynfix;
      dst->m_max_dyn_offset= tabPtrP->m_offsets[MM].m_max_dyn_offset;
      dst->m_dyn_data_ptr= (char*)dst_ptr;
      dst_ptr= expand_dyn_part(dst, src_data,
                               src_len,
                               desc, order + mm_vars,
                               mm_dynvar, mm_dynfix,
                               tabPtrP->m_offsets[MM].m_dyn_null_words);
    }
    
    ndbassert((UintPtr(src_ptr) & 3) == 0);
    src_ptr = src_ptr + step;
  }

  src->m_header_bits= bits & 
    ~(Uint32)(Tuple_header::MM_SHRINK | Tuple_header::MM_GROWN);

  /**
   * The source tuple only touches the header parts. The updates of the
   * tuple is applied on the new copy tuple. We still need to ensure that
   * the checksum is correct on the tuple even after changing the header
   * parts since the header is part of the checksum. This is not covered
   * by setting checksum normally since mostly we don't touch the
   * original tuple.
   */
  updateChecksum(src, tabPtrP, bits, src->m_header_bits);

  sizes[DD]= 0;
  if(disk && dd_tot)
  {
    const Uint16 dd_vars= tabPtrP->m_attributes[DD].m_no_of_varsize;
    order+= mm_vars+mm_dynvar+mm_dynfix;
    
    if(bits & Tuple_header::DISK_INLINE)
    {
      // Only on copy tuple
      ndbassert(bits & Tuple_header::COPY_TUPLE);
    }
    else
    {
      // Stored row: locate the disk part via the disk reference.
      Local_key key;
      memcpy(&key, disk_ref, sizeof(key));
      key.m_page_no= req_struct->m_disk_page_ptr.i;
      src_ptr= get_dd_ptr(&req_struct->m_disk_page_ptr, &key, tabPtrP);
    }
    extra_bits |= Tuple_header::DISK_INLINE;

    // Fix diskpart
    req_struct->m_disk_ptr= (Tuple_header*)dst_ptr;
    memcpy(dst_ptr, src_ptr, 4*tabPtrP->m_offsets[DD].m_fix_header_size);
    sizes[DD] = tabPtrP->m_offsets[DD].m_fix_header_size;
    
    ndbassert(! (req_struct->m_disk_ptr->m_header_bits & Tuple_header::FREE));
    
    // Varsize disk attributes are not supported.
    ndbrequire(dd_vars == 0);
  }
  
  ptr->m_header_bits= (extra_bits | Tuple_header::COPY_TUPLE);
  req_struct->is_expanded= true;
}
3925
/**
 * Debug helper: dump the fixed part and varpart of the current tuple
 * in hex, labelled by its state (expanded / stored / shrunken).
 * Disk-part dumping is compiled out (#if 0).
 */
void
Dbtup::dump_tuple(const KeyReqStruct* req_struct, const Tablerec* tabPtrP)
{
  Uint16 mm_vars= tabPtrP->m_attributes[MM].m_no_of_varsize;
  Uint16 mm_dyns= tabPtrP->m_attributes[MM].m_no_of_dynamic;
  //Uint16 dd_tot= tabPtrP->m_no_of_disk_attributes;
  const Tuple_header* ptr= req_struct->m_tuple_ptr;
  Uint32 bits= ptr->m_header_bits;
  const Uint32 *tuple_words= (Uint32 *)ptr;
  const Uint32 *fix_p;
  Uint32 fix_len;
  const Uint32 *var_p;
  Uint32 var_len;
  //const Uint32 *disk_p;
  //Uint32 disk_len;
  const char *typ;

  fix_p= tuple_words;
  fix_len= tabPtrP->m_offsets[MM].m_fix_header_size;
  if(req_struct->is_expanded)
  {
    // Copy tuple in expanded layout; varpart content not dumped.
    typ= "expanded";
    var_p= ptr->get_end_of_fix_part_ptr(tabPtrP);
    var_len= 0; // No dump of varpart in expanded
#if 0
    disk_p= (Uint32 *)req_struct->m_disk_ptr;
    disk_len= (dd_tot ? tabPtrP->m_offsets[DD].m_fix_header_size : 0);
#endif
  }
  else if(! (bits & Tuple_header::COPY_TUPLE))
  {
    // Stored row: varpart (if any) is on a separate page.
    typ= "stored";
    if(mm_vars+mm_dyns)
    {
      //const KeyReqStruct::Var_data* dst= &req_struct->m_var_data[MM];
      const Var_part_ref *varref= ptr->get_var_part_ref_ptr(tabPtrP);
      Ptr<Page> tmp;
      var_p= get_ptr(&tmp, * varref);
      var_len= get_len(&tmp, * varref);
    }
    else
    {
      var_p= 0;
      var_len= 0;
    }
#if 0
    if(dd_tot)
    {
      Local_key key;
      memcpy(&key, ptr->get_disk_ref_ptr(tabPtrP), sizeof(key));
      key.m_page_no= req_struct->m_disk_page_ptr.i;
      disk_p= get_dd_ptr(&req_struct->m_disk_page_ptr, &key, tabPtrP);
      disk_len= tabPtrP->m_offsets[DD].m_fix_header_size;
    }
    else
    {
      disk_p= var_p;
      disk_len= 0;
    }
#endif
  }
  else
  {
    // Shrunken copy tuple: varpart is inline, first Uint16 is length.
    typ= "shrunken";
    if(mm_vars+mm_dyns)
    {
      var_p= ptr->get_end_of_fix_part_ptr(tabPtrP);
      var_len= *((Uint16 *)var_p) + 1;
    }
    else
    {
      var_p= 0;
      var_len= 0;
    }
#if 0
    disk_p= (Uint32 *)(req_struct->m_disk_ptr);
    disk_len= (dd_tot ? tabPtrP->m_offsets[DD].m_fix_header_size : 0);
#endif
  }
  ndbout_c("Fixed part[%s](%p len=%u words)",typ, fix_p, fix_len);
  dump_hex(fix_p, fix_len);
  ndbout_c("Varpart part[%s](%p len=%u words)", typ , var_p, var_len);
  dump_hex(var_p, var_len);
#if 0
  ndbout_c("Disk part[%s](%p len=%u words)", typ, disk_p, disk_len);
  dump_hex(disk_p, disk_len);
#endif
}
4014
void
Dbtup::prepare_read(KeyReqStruct* req_struct,
                    Tablerec* tabPtrP, bool disk)
{
  /**
   * Prepare req_struct for reading the tuple in its stored/shrunken
   * (non-expanded) format: locate the varsize/dynamic data and fill in
   * req_struct->m_var_data[MM], and (optionally, when 'disk' is set and
   * the table has disk attributes) locate the disk part and set
   * req_struct->m_disk_ptr.
   *
   * @param req_struct  request context; m_tuple_ptr must point at the row
   * @param tabPtrP     table descriptor for the row
   * @param disk        whether the disk part should be located as well
   */
  Tuple_header* ptr= req_struct->m_tuple_ptr;

  Uint32 bits= ptr->m_header_bits;
  Uint16 dd_tot= tabPtrP->m_no_of_disk_attributes;
  Uint16 mm_vars= tabPtrP->m_attributes[MM].m_no_of_varsize;
  Uint16 mm_dyns= tabPtrP->m_attributes[MM].m_no_of_dynamic;

  const Uint32 *src_ptr= ptr->get_end_of_fix_part_ptr(tabPtrP);
  const Uint32 *disk_ref= ptr->get_disk_ref_ptr(tabPtrP);
  const Var_part_ref* var_ref = ptr->get_var_part_ref_ptr(tabPtrP);
  if(mm_vars || mm_dyns)
  {
    const Uint32 *src_data= src_ptr;
    Uint32 src_len;
    KeyReqStruct::Var_data* dst= &req_struct->m_var_data[MM];
    if (bits & Tuple_header::VAR_PART)
    {
      if(! (bits & Tuple_header::COPY_TUPLE))
      {
        /* Stored row: varpart lives in a separate page entry. */
        Ptr<Page> tmp;
        src_data= get_ptr(&tmp, * var_ref);
        src_len= get_len(&tmp, * var_ref);

        /* If the original tuple was grown,
         * the old size is stored at the end. */
        if(bits & Tuple_header::MM_GROWN)
        {
          /**
           * This is when triggers read before value of update
           * when original has been reallocated due to grow
           */
          ndbassert(src_len>0);
          src_len= src_data[src_len-1];
        }
      }
      else
      {
        /* Shrunken copy tuple: varpart is inline after the fixed part,
         * prefixed by a Varpart_copy header holding its length. */
        Varpart_copy* vp = (Varpart_copy*)src_ptr;
        src_len = vp->m_len;
        src_data = vp->m_data;
        src_ptr++;  // step past the Varpart_copy length word
      }

      char* varstart;
      Uint32 varlen;
      const Uint32* dynstart;
      if (mm_vars)
      {
        /* Varpart layout: (mm_vars+1) Uint16 offsets, then var data,
         * then (word-aligned) the dynamic part. */
        varstart = (char*)(((Uint16*)src_data)+mm_vars+1);
        varlen = ((Uint16*)src_data)[mm_vars];  // total varsize data length
        dynstart = ALIGN_WORD(varstart + varlen);
      }
      else
      {
        varstart = 0;
        varlen = 0;
        dynstart = src_data;
      }

      dst->m_data_ptr= varstart;
      dst->m_offset_array_ptr= (Uint16*)src_data;
      dst->m_var_len_offset= 1;
      dst->m_max_var_offset= varlen;

      /* Whatever remains after the varsize section is the dynamic part. */
      Uint32 dynlen = Uint32(src_len - (dynstart - src_data));
      ndbassert(src_len >= (dynstart - src_data));
      dst->m_dyn_data_ptr= (char*)dynstart;
      dst->m_dyn_part_len= dynlen;
      // Do or not to to do
      // dst->m_dyn_offset_arr_ptr = dynlen ? (Uint16*)(dynstart + *(Uint8*)dynstart) : 0;

      /*
        dst->m_dyn_offset_arr_ptr and dst->m_dyn_len_offset are not used for
        reading the stored/shrunken format.
      */
    }
    else
    {
      /* No varpart allocated for this row. */
      src_len = 0;
      dst->m_max_var_offset = 0;
      dst->m_dyn_part_len = 0;
#if defined VM_TRACE || defined ERROR_INSERT
      bzero(dst, sizeof(* dst));
#endif
    }

    // disk part start after dynamic part.
    src_ptr+= src_len;
  }

  if(disk && dd_tot)
  {
    const Uint16 dd_vars= tabPtrP->m_attributes[DD].m_no_of_varsize;

    if(bits & Tuple_header::DISK_INLINE)
    {
      // Only on copy tuple
      ndbassert(bits & Tuple_header::COPY_TUPLE);
    }
    else
    {
      // XXX
      /* Resolve the disk part from the disk page via the stored ref. */
      Local_key key;
      memcpy(&key, disk_ref, sizeof(key));
      key.m_page_no= req_struct->m_disk_page_ptr.i;
      src_ptr= get_dd_ptr(&req_struct->m_disk_page_ptr, &key, tabPtrP);
    }
    // Fix diskpart
    req_struct->m_disk_ptr= (Tuple_header*)src_ptr;
    ndbassert(! (req_struct->m_disk_ptr->m_header_bits & Tuple_header::FREE));
    ndbrequire(dd_vars == 0);  // varsize disk attributes not supported
  }

  req_struct->is_expanded= false;
}
4134
void
Dbtup::shrink_tuple(KeyReqStruct* req_struct, Uint32 sizes[2],
                    const Tablerec* tabPtrP, bool disk)
{
  /**
   * Pack an expanded copy tuple back into the compact (shrunken) format:
   * compacts the varsize entries in place (removing slack between them),
   * shrinks the dynamic part, and moves the disk part down so it directly
   * follows the in-memory part.
   *
   * @param req_struct  request context; m_tuple_ptr must be a COPY_TUPLE
   * @param sizes       out: sizes[MM] = varpart length in words (min 1),
   *                         sizes[DD] = disk part length in words
   * @param tabPtrP     table descriptor
   * @param disk        whether the disk part should be shrunk/moved too
   */
  ndbassert(tabPtrP->need_shrink());
  Tuple_header* ptr= req_struct->m_tuple_ptr;
  ndbassert(ptr->m_header_bits & Tuple_header::COPY_TUPLE);

  KeyReqStruct::Var_data* dst= &req_struct->m_var_data[MM];
  Uint32 order_desc= tabPtrP->m_real_order_descriptor;
  const Uint32 * tabDesc= (Uint32*)req_struct->attr_descr;
  const Uint16 *order = (Uint16*)(&tableDescriptor[order_desc]);
  Uint16 dd_tot= tabPtrP->m_no_of_disk_attributes;
  Uint16 mm_fix= tabPtrP->m_attributes[MM].m_no_of_fixsize;
  Uint16 mm_vars= tabPtrP->m_attributes[MM].m_no_of_varsize;
  Uint16 mm_dyns= tabPtrP->m_attributes[MM].m_no_of_dynamic;
  Uint16 mm_dynvar= tabPtrP->m_attributes[MM].m_no_of_dyn_var;
  Uint16 mm_dynfix= tabPtrP->m_attributes[MM].m_no_of_dyn_fix;
  Uint16 dd_vars= tabPtrP->m_attributes[DD].m_no_of_varsize;

  Uint32 *dst_ptr= ptr->get_end_of_fix_part_ptr(tabPtrP);
  Uint16* src_off_ptr= req_struct->var_pos_array;
  order += mm_fix;  // skip fixsize entries; point at first varsize entry

  sizes[MM] = 1;
  sizes[DD] = 0;
  if(mm_vars || mm_dyns)
  {
    /* Varpart is written in place, prefixed by a Varpart_copy header. */
    Varpart_copy* vp = (Varpart_copy*)dst_ptr;
    Uint32* varstart = dst_ptr = vp->m_data;

    if (mm_vars)
    {
      /* Rebuild the offset array and compact the varsize data.
       * Source and destination overlap (shrinking in place), hence
       * memmove rather than memcpy. */
      Uint16* dst_off_ptr= (Uint16*)dst_ptr;
      char* dst_data_ptr= (char*)(dst_off_ptr + mm_vars + 1);
      char* src_data_ptr= dst_data_ptr;
      Uint32 off= 0;
      for(Uint32 i= 0; i<mm_vars; i++)
      {
        const char* data_ptr= src_data_ptr + *src_off_ptr;
        Uint32 len= src_off_ptr[mm_vars] - *src_off_ptr;
        * dst_off_ptr++= off;
        memmove(dst_data_ptr, data_ptr, len);
        off += len;
        src_off_ptr++;
        dst_data_ptr += len;
      }
      *dst_off_ptr= off;  // final entry holds total varsize data length
      dst_ptr = ALIGN_WORD(dst_data_ptr);
      order += mm_vars; // Point to first dynfix entry
    }

    if (mm_dyns)
    {
      dst_ptr = shrink_dyn_part(dst, dst_ptr, tabPtrP, tabDesc,
                                order, mm_dynvar, mm_dynfix, MM);
      ndbassert((char*)dst_ptr <= ((char*)ptr) + 8192);
      order += mm_dynfix + mm_dynvar;
    }

    Uint32 varpart_len= Uint32(dst_ptr - varstart);
    vp->m_len = varpart_len;
    sizes[MM] = varpart_len;
    /* Only flag VAR_PART when there is actually something in it. */
    ptr->m_header_bits |= (varpart_len) ? Tuple_header::VAR_PART : 0;

    ndbassert((UintPtr(ptr) & 3) == 0);
    ndbassert(varpart_len < 0x10000);
  }

  if(disk && dd_tot)
  {
    /* Move the disk part down to follow the shrunk varpart
     * (overlapping move possible, hence memmove). */
    Uint32 * src_ptr = (Uint32*)req_struct->m_disk_ptr;
    req_struct->m_disk_ptr = (Tuple_header*)dst_ptr;
    ndbrequire(dd_vars == 0);  // varsize disk attributes not supported
    sizes[DD] = tabPtrP->m_offsets[DD].m_fix_header_size;
    memmove(dst_ptr, src_ptr, 4*tabPtrP->m_offsets[DD].m_fix_header_size);
  }

  req_struct->is_expanded= false;

}
4216
void
Dbtup::validate_page(Tablerec* regTabPtr, Var_page* p)
{
  /**
   * Debug consistency check of all var-page entries of all fragments of
   * a table: verifies that each entry's stored length is large enough
   * for its offset array plus data, and touches each row's operation
   * record (crashing on a dangling pointer).
   *
   * The 'p' parameter is used as a mode flag, not dereferenced:
   * when p == 0 pages are additionally reorganized (reorg) and the
   * whole check is repeated once with p == (Var_page*)1 to validate
   * the result of the reorg.
   */
  /* ToDo: We could also do some checks here for any dynamic part. */
  Uint32 mm_vars= regTabPtr->m_attributes[MM].m_no_of_varsize;
  Uint32 fix_sz= regTabPtr->m_offsets[MM].m_fix_header_size +
    Tuple_header::HeaderSize;

  if(mm_vars == 0)
    return;

  for(Uint32 F= 0; F<NDB_ARRAY_SIZE(regTabPtr->fragrec); F++)
  {
    FragrecordPtr fragPtr;

    if((fragPtr.i = regTabPtr->fragrec[F]) == RNIL)
      continue;

    ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
    for(Uint32 P= 0; P<fragPtr.p->noOfPages; P++)
    {
      Uint32 real= getRealpid(fragPtr.p, P);
      Var_page* page= (Var_page*)c_page_pool.getPtr(real);

      /* Index 0 is not used; scan all other page-entry slots. */
      for(Uint32 i=1; i<page->high_index; i++)
      {
        Uint32 idx= page->get_index_word(i);
        Uint32 len = (idx & Var_page::LEN_MASK) >> Var_page::LEN_SHIFT;
        if(!(idx & Var_page::FREE) && !(idx & Var_page::CHAIN))
        {
          /* In-use main entry: a tuple header. */
          Tuple_header *ptr= (Tuple_header*)page->get_ptr(i);
          Uint32 *part= ptr->get_end_of_fix_part_ptr(regTabPtr);
          if(! (ptr->m_header_bits & Tuple_header::COPY_TUPLE))
          {
            /* Stored row: fixed entry carries only fix part + var ref. */
            ndbrequire(len == fix_sz + 1);
            Local_key tmp; tmp.assref(*part);
            Ptr<Page> tmpPage;
            part= get_ptr(&tmpPage, *(Var_part_ref*)part);
            len= ((Var_page*)tmpPage.p)->get_entry_len(tmp.m_page_idx);
            /* sz = offset array bytes + varsize data bytes. */
            Uint32 sz= ((mm_vars + 1) << 1) + (((Uint16*)part)[mm_vars]);
            ndbrequire(len >= ((sz + 3) >> 2));
          }
          else
          {
            /* Copy tuple: varpart is inline after the fixed part. */
            Uint32 sz= ((mm_vars + 1) << 1) + (((Uint16*)part)[mm_vars]);
            ndbrequire(len >= ((sz+3)>>2)+fix_sz);
          }
          if(ptr->m_operation_ptr_i != RNIL)
          {
            /* Will crash via pool guard if the op pointer is bogus. */
            c_operation_pool.getPtr(ptr->m_operation_ptr_i);
          }
        }
        else if(!(idx & Var_page::FREE))
        {
          /**
           * Chain
           */
          Uint32 *part= page->get_ptr(i);
          Uint32 sz= ((mm_vars + 1) << 1) + (((Uint16*)part)[mm_vars]);
          ndbrequire(len >= ((sz + 3) >> 2));
        }
        else
        {

        }
      }
      if(p == 0 && page->high_index > 1)
        page->reorg((Var_page*)ctemp_page);
    }
  }

  if(p == 0)
  {
    /* Re-run validation after reorg (p != 0 prevents further recursion). */
    validate_page(regTabPtr, (Var_page*)1);
  }
}
4293
int
Dbtup::handle_size_change_after_update(KeyReqStruct* req_struct,
                                       Tuple_header* org,
                                       Operationrec* regOperPtr,
                                       Fragrecord* regFragPtr,
                                       Tablerec* regTabPtr,
                                       Uint32 sizes[4])
{
  /**
   * After an update the copy tuple's varpart size (sizes[2+MM]) may
   * differ from the original's (sizes[MM]):
   *  - equal:   nothing to do,
   *  - smaller: mark the copy tuple MM_SHRINK (actual shrink at commit),
   *  - larger:  grow the original's varpart allocation now, marking the
   *             original MM_GROWN and stashing its pre-grow size in the
   *             last word of the new allocation (read back by
   *             prepare_read for before-value triggers).
   *
   * @return 0 on success, -1 on varpart reallocation failure
   *         (terrorCode set by realloc_var_part)
   */
  ndbrequire(sizes[1] == sizes[3]);  // disk part size must not change
  //ndbout_c("%d %d %d %d", sizes[0], sizes[1], sizes[2], sizes[3]);
  if(0)
    printf("%p %d %d - handle_size_change_after_update ",
           req_struct->m_tuple_ptr,
           regOperPtr->m_tuple_location.m_page_no,
           regOperPtr->m_tuple_location.m_page_idx);

  Uint32 bits= org->m_header_bits;
  Uint32 copy_bits= req_struct->m_tuple_ptr->m_header_bits;

  if(sizes[2+MM] == sizes[MM])
    ;
  else if(sizes[2+MM] < sizes[MM])
  {
    if(0) ndbout_c("shrink");
    req_struct->m_tuple_ptr->m_header_bits= copy_bits|Tuple_header::MM_SHRINK;
  }
  else
  {
    if(0) printf("grow - ");
    Ptr<Page> pagePtr = req_struct->m_varpart_page_ptr;
    Var_page* pageP= (Var_page*)pagePtr.p;
    Var_part_ref *refptr= org->get_var_part_ref_ptr(regTabPtr);
    ndbassert(! (bits & Tuple_header::COPY_TUPLE));

    Local_key ref;
    refptr->copyout(&ref);
    Uint32 alloc;
    Uint32 idx= ref.m_page_idx;
    if (bits & Tuple_header::VAR_PART)
    {
      /* Find the currently allocated varpart length. */
      if (copy_bits & Tuple_header::COPY_TUPLE)
      {
        /* m_varpart_page_ptr not valid here; fetch page via the ref. */
        c_page_pool.getPtr(pagePtr, ref.m_page_no);
        pageP = (Var_page*)pagePtr.p;
      }
      alloc = pageP->get_entry_len(idx);
    }
    else
    {
      alloc = 0;  // no varpart allocated yet
    }
    Uint32 orig_size= alloc;
    if(bits & Tuple_header::MM_GROWN)
    {
      /* Was grown before, so must fetch real original size from last word. */
      Uint32 *old_var_part= pageP->get_ptr(idx);
      ndbassert(alloc>0);
      orig_size= old_var_part[alloc-1];
    }

    if (alloc)
    {
#ifdef VM_TRACE
      if(!pageP->get_entry_chain(idx))
        ndbout << *pageP << endl;
#endif
      ndbassert(pageP->get_entry_chain(idx));
    }

    Uint32 needed= sizes[2+MM];

    if(needed <= alloc)
    {
      /* Existing allocation is already big enough. */
      //ndbassert(!regOperPtr->is_first_operation());
      if (0) ndbout_c(" no grow");
      return 0;
    }
    Uint32 *new_var_part=realloc_var_part(&terrorCode,
                                          regFragPtr, regTabPtr, pagePtr,
                                          refptr, alloc, needed);
    if (unlikely(new_var_part==NULL))
      return -1;
    /* Mark the tuple grown, store the original length at the end. */
    org->m_header_bits= bits | Tuple_header::MM_GROWN | Tuple_header::VAR_PART;
    new_var_part[needed-1]= orig_size;

    /**
     * Here we can change both header bits and the reference to the varpart,
     * this means that we need to completely recalculate the checksum here.
     */
    setChecksum(org, regTabPtr);
  }
  return 0;
}
4388
4389 int
optimize_var_part(KeyReqStruct * req_struct,Tuple_header * org,Operationrec * regOperPtr,Fragrecord * regFragPtr,Tablerec * regTabPtr)4390 Dbtup::optimize_var_part(KeyReqStruct* req_struct,
4391 Tuple_header* org,
4392 Operationrec* regOperPtr,
4393 Fragrecord* regFragPtr,
4394 Tablerec* regTabPtr)
4395 {
4396 jam();
4397 Var_part_ref* refptr = org->get_var_part_ref_ptr(regTabPtr);
4398
4399 Local_key ref;
4400 refptr->copyout(&ref);
4401 Uint32 idx = ref.m_page_idx;
4402
4403 Ptr<Page> pagePtr;
4404 c_page_pool.getPtr(pagePtr, ref.m_page_no);
4405
4406 Var_page* pageP = (Var_page*)pagePtr.p;
4407 Uint32 var_part_size = pageP->get_entry_len(idx);
4408
4409 /**
4410 * if the size of page list_index is MAX_FREE_LIST,
4411 * we think it as full page, then need not optimize
4412 */
4413 if(pageP->list_index != MAX_FREE_LIST)
4414 {
4415 jam();
4416 /*
4417 * optimize var part of tuple by moving varpart,
4418 * then we possibly reclaim free pages
4419 */
4420 move_var_part(regFragPtr, regTabPtr, pagePtr,
4421 refptr, var_part_size);
4422 setChecksum(org, regTabPtr);
4423 }
4424
4425 return 0;
4426 }
4427
4428 int
nr_update_gci(Uint32 fragPtrI,const Local_key * key,Uint32 gci)4429 Dbtup::nr_update_gci(Uint32 fragPtrI, const Local_key* key, Uint32 gci)
4430 {
4431 FragrecordPtr fragPtr;
4432 fragPtr.i= fragPtrI;
4433 ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
4434 TablerecPtr tablePtr;
4435 tablePtr.i= fragPtr.p->fragTableId;
4436 ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
4437
4438 if (tablePtr.p->m_bits & Tablerec::TR_RowGCI)
4439 {
4440 Local_key tmp = *key;
4441 PagePtr pagePtr;
4442
4443 pagePtr.i = getRealpidCheck(fragPtr.p, tmp.m_page_no);
4444 if (unlikely(pagePtr.i == RNIL))
4445 {
4446 jam();
4447 return 0;
4448 }
4449
4450 c_page_pool.getPtr(pagePtr);
4451
4452 Tuple_header* ptr = (Tuple_header*)
4453 ((Fix_page*)pagePtr.p)->get_ptr(tmp.m_page_idx, 0);
4454
4455 ndbrequire(ptr->m_header_bits & Tuple_header::FREE);
4456 *ptr->get_mm_gci(tablePtr.p) = gci;
4457 }
4458 return 0;
4459 }
4460
int
Dbtup::nr_read_pk(Uint32 fragPtrI,
                  const Local_key* key, Uint32* dst, bool& copy)
{
  /**
   * Node-recovery helper: read the primary-key columns of the row at
   * 'key' into dst, headers stripped, followed by one word holding the
   * row GCI (or 0 when the table has no per-row GCI).
   *
   * @param fragPtrI  fragment record index
   * @param key       local key (logical page no / page idx) of the row
   * @param dst       out: packed PK data words, then GCI word at dst[ret]
   * @param copy      out: true if the PK was read from a copy tuple
   *                  (row had an ongoing ALLOC operation)
   * @return number of PK data words written (excluding the GCI word),
   *         0 if the page is unmapped or the row is FREE,
   *         <0 on readAttributes error
   */

  FragrecordPtr fragPtr;
  fragPtr.i= fragPtrI;
  ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
  TablerecPtr tablePtr;
  tablePtr.i= fragPtr.p->fragTableId;
  ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);

  Local_key tmp = *key;

  PagePtr pagePtr;
  pagePtr.i = getRealpidCheck(fragPtr.p, tmp.m_page_no);
  if (unlikely(pagePtr.i == RNIL))
  {
    jam();
    dst[0] = 0;
    return 0;
  }

  c_page_pool.getPtr(pagePtr);
  KeyReqStruct req_struct(this);
  Uint32* ptr= ((Fix_page*)pagePtr.p)->get_ptr(key->m_page_idx, 0);

  req_struct.m_page_ptr = pagePtr;
  req_struct.m_tuple_ptr = (Tuple_header*)ptr;
  Uint32 bits = req_struct.m_tuple_ptr->m_header_bits;

  int ret = 0;
  copy = false;
  if (! (bits & Tuple_header::FREE))
  {
    if (bits & Tuple_header::ALLOC)
    {
      /* Row has an in-flight insert; read PK from its copy tuple. */
      Uint32 opPtrI= req_struct.m_tuple_ptr->m_operation_ptr_i;
      Operationrec* opPtrP= c_operation_pool.getPtr(opPtrI);
      ndbassert(!opPtrP->m_copy_tuple_location.isNull());
      req_struct.m_tuple_ptr=
        get_copy_tuple(&opPtrP->m_copy_tuple_location);
      copy = true;
    }
    req_struct.check_offset[MM]= tablePtr.p->get_check_offset(MM);
    req_struct.check_offset[DD]= tablePtr.p->get_check_offset(DD);

    Uint32 num_attr= tablePtr.p->m_no_of_attributes;
    Uint32 descr_start= tablePtr.p->tabDescriptor;
    TableDescriptor *tab_descr= &tableDescriptor[descr_start];
    ndbrequire(descr_start + (num_attr << ZAD_LOG_SIZE) <= cnoOfTabDescrRec);
    req_struct.attr_descr= tab_descr;

    if (tablePtr.p->need_expand())
      prepare_read(&req_struct, tablePtr.p, false);

    const Uint32* attrIds= &tableDescriptor[tablePtr.p->readKeyArray].tabDescr;
    const Uint32 numAttrs= tablePtr.p->noOfKeyAttr;
    // read pk attributes from original tuple

    req_struct.tablePtrP = tablePtr.p;
    req_struct.fragPtrP = fragPtr.p;

    // do it
    ret = readAttributes(&req_struct,
                         attrIds,
                         numAttrs,
                         dst,
                         ZNIL, false);

    // done
    if (likely(ret >= 0)) {
      // remove headers
      /* dst currently holds [AttributeHeader, data...] per attribute;
       * compact in place to data-only by shifting each attribute's data
       * left by the number of headers seen so far (n). */
      Uint32 n= 0;
      Uint32 i= 0;
      while (n < numAttrs) {
        const AttributeHeader ah(dst[i]);
        Uint32 size= ah.getDataSize();
        ndbrequire(size != 0);
        for (Uint32 j= 0; j < size; j++) {
          dst[i + j - n]= dst[i + j + 1];
        }
        n+= 1;
        i+= 1 + size;
      }
      ndbrequire((int)i == ret);
      ret -= numAttrs;  // ret now counts data words only
    } else {
      return ret;
    }
  }

  /* Append the row GCI (or 0) after the packed PK data. */
  if (tablePtr.p->m_bits & Tablerec::TR_RowGCI)
  {
    dst[ret] = *req_struct.m_tuple_ptr->get_mm_gci(tablePtr.p);
  }
  else
  {
    dst[ret] = 0;
  }
  return ret;
}
4563
int
Dbtup::nr_delete(Signal* signal, Uint32 senderData,
                 Uint32 fragPtrI, const Local_key* key, Uint32 gci)
{
  /**
   * Node-recovery delete of the row at 'key': removes any TUX index
   * entries, frees the in-memory row, and (for tables with disk
   * attributes) frees the disk part, which may require asynchronous
   * page / log-buffer acquisition.
   *
   * @return 0  delete fully completed,
   *         1  timesliced: a callback (nr_delete_page_callback or
   *            nr_delete_log_buffer_callback) will finish the disk
   *            delete; the disk ref is left in signal->theData,
   *        -1  disk page request failed
   */
  FragrecordPtr fragPtr;
  fragPtr.i= fragPtrI;
  ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
  TablerecPtr tablePtr;
  tablePtr.i= fragPtr.p->fragTableId;
  ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);

  Local_key tmp = * key;
  tmp.m_page_no= getRealpid(fragPtr.p, tmp.m_page_no);

  PagePtr pagePtr;
  Tuple_header* ptr= (Tuple_header*)get_ptr(&pagePtr, &tmp, tablePtr.p);

  if (!tablePtr.p->tuxCustomTriggers.isEmpty())
  {
    /* Remove all ordered-index entries for this row first. */
    jam();
    TuxMaintReq* req = (TuxMaintReq*)signal->getDataPtrSend();
    req->tableId = fragPtr.p->fragTableId;
    req->fragId = fragPtr.p->fragmentId;
    req->pageId = tmp.m_page_no;
    req->pageIndex = tmp.m_page_idx;
    req->tupVersion = ptr->get_tuple_version();
    req->opInfo = TuxMaintReq::OpRemove;
    removeTuxEntries(signal, tablePtr.p);
  }

  /* Save the disk ref before the in-memory row is freed. */
  Local_key disk;
  memcpy(&disk, ptr->get_disk_ref_ptr(tablePtr.p), sizeof(disk));

  if (tablePtr.p->m_attributes[MM].m_no_of_varsize +
      tablePtr.p->m_attributes[MM].m_no_of_dynamic)
  {
    jam();
    free_var_rec(fragPtr.p, tablePtr.p, &tmp, pagePtr);
  } else {
    jam();
    free_fix_rec(fragPtr.p, tablePtr.p, &tmp, (Fix_page*)pagePtr.p);
  }

  if (tablePtr.p->m_no_of_disk_attributes)
  {
    jam();

    /* Undo-log space needed for the disk free record. */
    Uint32 sz = (sizeof(Dbtup::Disk_undo::Free) >> 2) +
      tablePtr.p->m_offsets[DD].m_fix_header_size - 1;

    D("Logfile_client - nr_delete");
    Logfile_client lgman(this, c_lgman, fragPtr.p->m_logfile_group_id);
    int res = lgman.alloc_log_space(sz, jamBuffer());
    ndbrequire(res == 0);

    /**
     * 1) alloc log buffer
     * 2) get page
     * 3) get log buffer
     * 4) delete tuple
     */
    Page_cache_client::Request preq;
    preq.m_page = disk;
    preq.m_callback.m_callbackData = senderData;
    preq.m_callback.m_callbackFunction =
      safe_cast(&Dbtup::nr_delete_page_callback);
    int flags = Page_cache_client::COMMIT_REQ;

#ifdef ERROR_INSERT
    /* Error injection 4023/4024: artificially delay the page request
     * to exercise the timeslice/callback paths. */
    if (ERROR_INSERTED(4023) || ERROR_INSERTED(4024))
    {
      int rnd = rand() % 100;
      int slp = 0;
      if (ERROR_INSERTED(4024))
      {
        slp = 3000;
      }
      else if (rnd > 90)
      {
        slp = 3000;
      }
      else if (rnd > 70)
      {
        slp = 100;
      }

      ndbout_c("rnd: %d slp: %d", rnd, slp);

      if (slp)
      {
        flags |= Page_cache_client::DELAY_REQ;
        const NDB_TICKS now = NdbTick_getCurrentTicks();
        preq.m_delay_until_time = NdbTick_AddMilliseconds(now,(Uint64)slp);
      }
    }
#endif
    Ptr<GlobalPage> diskPagePtr;
    Page_cache_client pgman(this, c_pgman);
    res = pgman.get_page(signal, preq, flags);
    diskPagePtr = pgman.m_ptr;
    if (res == 0)
    {
      /* Page not yet available; callback will continue the delete. */
      goto timeslice;
    }
    else if (unlikely(res == -1))
    {
      return -1;
    }

    /* Page available immediately. */
    PagePtr disk_page((Tup_page*)diskPagePtr.p, diskPagePtr.i);
    disk_page_set_dirty(disk_page);

    CallbackPtr cptr;
    cptr.m_callbackIndex = NR_DELETE_LOG_BUFFER_CALLBACK;
    cptr.m_callbackData = senderData;
    res= lgman.get_log_buffer(signal, sz, &cptr);
    switch(res){
    case 0:
      /* Log buffer not yet available; callback continues the delete. */
      signal->theData[2] = disk_page.i;
      goto timeslice;
    case -1:
      ndbrequire("NOT YET IMPLEMENTED" == 0);
      break;
    }

    /* Both page and log buffer available: free the disk part now. */
    if (0) ndbout << "DIRECT DISK DELETE: " << disk << endl;
    disk_page_free(signal, tablePtr.p, fragPtr.p,
                   &disk, *(PagePtr*)&disk_page, gci);
    return 0;
  }

  return 0;

timeslice:
  /* Leave the disk ref for the continuation in signal->theData[0..1]. */
  memcpy(signal->theData, &disk, sizeof(disk));
  return 1;
}
4701
4702 void
nr_delete_page_callback(Signal * signal,Uint32 userpointer,Uint32 page_id)4703 Dbtup::nr_delete_page_callback(Signal* signal,
4704 Uint32 userpointer, Uint32 page_id)//unused
4705 {
4706 Ptr<GlobalPage> gpage;
4707 m_global_page_pool.getPtr(gpage, page_id);
4708 PagePtr pagePtr((Tup_page*)gpage.p, gpage.i);
4709 disk_page_set_dirty(pagePtr);
4710 Dblqh::Nr_op_info op;
4711 op.m_ptr_i = userpointer;
4712 op.m_disk_ref.m_page_no = pagePtr.p->m_page_no;
4713 op.m_disk_ref.m_file_no = pagePtr.p->m_file_no;
4714 c_lqh->get_nr_op_info(&op, page_id);
4715
4716 Ptr<Fragrecord> fragPtr;
4717 fragPtr.i= op.m_tup_frag_ptr_i;
4718 ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
4719
4720 Ptr<Tablerec> tablePtr;
4721 tablePtr.i = fragPtr.p->fragTableId;
4722 ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
4723
4724 Uint32 sz = (sizeof(Dbtup::Disk_undo::Free) >> 2) +
4725 tablePtr.p->m_offsets[DD].m_fix_header_size - 1;
4726
4727 CallbackPtr cb;
4728 cb.m_callbackData = userpointer;
4729 cb.m_callbackIndex = NR_DELETE_LOG_BUFFER_CALLBACK;
4730 D("Logfile_client - nr_delete_page_callback");
4731 Logfile_client lgman(this, c_lgman, fragPtr.p->m_logfile_group_id);
4732 int res= lgman.get_log_buffer(signal, sz, &cb);
4733 switch(res){
4734 case 0:
4735 jam();
4736 return;
4737 case -1:
4738 ndbrequire("NOT YET IMPLEMENTED" == 0);
4739 break;
4740 }
4741 jam();
4742
4743 if (0) ndbout << "PAGE CALLBACK DISK DELETE: " << op.m_disk_ref << endl;
4744 disk_page_free(signal, tablePtr.p, fragPtr.p,
4745 &op.m_disk_ref, pagePtr, op.m_gci_hi);
4746
4747 c_lqh->nr_delete_complete(signal, &op);
4748 return;
4749 }
4750
4751 void
nr_delete_log_buffer_callback(Signal * signal,Uint32 userpointer,Uint32 unused)4752 Dbtup::nr_delete_log_buffer_callback(Signal* signal,
4753 Uint32 userpointer,
4754 Uint32 unused)
4755 {
4756 Dblqh::Nr_op_info op;
4757 op.m_ptr_i = userpointer;
4758 c_lqh->get_nr_op_info(&op, RNIL);
4759
4760 Ptr<Fragrecord> fragPtr;
4761 fragPtr.i= op.m_tup_frag_ptr_i;
4762 ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
4763
4764 Ptr<Tablerec> tablePtr;
4765 tablePtr.i = fragPtr.p->fragTableId;
4766 ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
4767
4768 Ptr<GlobalPage> gpage;
4769 m_global_page_pool.getPtr(gpage, op.m_page_id);
4770 PagePtr pagePtr((Tup_page*)gpage.p, gpage.i);
4771
4772 /**
4773 * reset page no
4774 */
4775 if (0) ndbout << "LOGBUFFER CALLBACK DISK DELETE: " << op.m_disk_ref << endl;
4776 jam();
4777 disk_page_free(signal, tablePtr.p, fragPtr.p,
4778 &op.m_disk_ref, pagePtr, op.m_gci_hi);
4779
4780 c_lqh->nr_delete_complete(signal, &op);
4781 }
4782