1 /*
2 Copyright (c) 2003, 2011, Oracle and/or its affiliates. All rights reserved.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25
26 #define DBTUP_C
27 #include <dblqh/Dblqh.hpp>
28 #include "Dbtup.hpp"
29 #include <RefConvert.hpp>
30 #include <ndb_limits.h>
31 #include <pc.hpp>
32 #include <AttributeDescriptor.hpp>
33 #include "AttributeOffset.hpp"
34 #include <AttributeHeader.hpp>
35 #include <Interpreter.hpp>
36 #include <signaldata/TupKey.hpp>
37 #include <signaldata/AttrInfo.hpp>
38 #include <NdbSqlUtil.hpp>
39
40 // #define TRACE_INTERPRETER
41
42 /* For debugging */
43 static void
dump_hex(const Uint32 * p,Uint32 len)44 dump_hex(const Uint32 *p, Uint32 len)
45 {
46 if(len > 2560)
47 len= 160;
48 if(len==0)
49 return;
50 for(;;)
51 {
52 if(len>=4)
53 ndbout_c("%8p %08X %08X %08X %08X", p, p[0], p[1], p[2], p[3]);
54 else if(len>=3)
55 ndbout_c("%8p %08X %08X %08X", p, p[0], p[1], p[2]);
56 else if(len>=2)
57 ndbout_c("%8p %08X %08X", p, p[0], p[1]);
58 else
59 ndbout_c("%8p %08X", p, p[0]);
60 if(len <= 4)
61 break;
62 len-= 4;
63 p+= 4;
64 }
65 }
66
67 /**
68 * getStoredProcAttrInfo
69 *
70 * Get the I-Val of the supplied stored procedure's
71 * AttrInfo section
72 * Initialise the AttrInfo length in the request
73 */
getStoredProcAttrInfo(Uint32 storedId,KeyReqStruct * req_struct,Uint32 & attrInfoIVal)74 int Dbtup::getStoredProcAttrInfo(Uint32 storedId,
75 KeyReqStruct* req_struct,
76 Uint32& attrInfoIVal)
77 {
78 jam();
79 StoredProcPtr storedPtr;
80 c_storedProcPool.getPtr(storedPtr, storedId);
81 if (storedPtr.i != RNIL) {
82 if ((storedPtr.p->storedCode == ZSCAN_PROCEDURE) ||
83 (storedPtr.p->storedCode == ZCOPY_PROCEDURE)) {
84 /* Setup OperationRec with stored procedure AttrInfo section */
85 SegmentedSectionPtr sectionPtr;
86 getSection(sectionPtr, storedPtr.p->storedProcIVal);
87 Uint32 storedProcLen= sectionPtr.sz;
88
89 ndbassert( attrInfoIVal == RNIL );
90 attrInfoIVal= storedPtr.p->storedProcIVal;
91 req_struct->attrinfo_len= storedProcLen;
92 return ZOK;
93 }
94 }
95 terrorCode= ZSTORED_PROC_ID_ERROR;
96 return terrorCode;
97 }
98
copyAttrinfo(Operationrec * regOperPtr,Uint32 * inBuffer,Uint32 expectedLen,Uint32 attrInfoIVal)99 void Dbtup::copyAttrinfo(Operationrec * regOperPtr,
100 Uint32* inBuffer,
101 Uint32 expectedLen,
102 Uint32 attrInfoIVal)
103 {
104 ndbassert( expectedLen > 0 || attrInfoIVal == RNIL );
105
106 if (expectedLen > 0)
107 {
108 ndbassert( attrInfoIVal != RNIL );
109
110 /* Check length in section is as we expect */
111 SegmentedSectionPtr sectionPtr;
112 getSection(sectionPtr, attrInfoIVal);
113
114 ndbrequire(sectionPtr.sz == expectedLen);
115 ndbrequire(sectionPtr.sz < ZATTR_BUFFER_SIZE);
116
117 /* Copy attrInfo data into linear buffer */
118 // TODO : Consider operating TUP out of first segment where
119 // appropriate
120 copy(inBuffer, attrInfoIVal);
121 }
122
123 regOperPtr->m_any_value= 0;
124
125 return;
126 }
127
128 void
setChecksum(Tuple_header * tuple_ptr,Tablerec * regTabPtr)129 Dbtup::setChecksum(Tuple_header* tuple_ptr,
130 Tablerec* regTabPtr)
131 {
132 tuple_ptr->m_checksum= 0;
133 tuple_ptr->m_checksum= calculateChecksum(tuple_ptr, regTabPtr);
134 }
135
136 Uint32
calculateChecksum(Tuple_header * tuple_ptr,Tablerec * regTabPtr)137 Dbtup::calculateChecksum(Tuple_header* tuple_ptr,
138 Tablerec* regTabPtr)
139 {
140 Uint32 checksum;
141 Uint32 i, rec_size, *tuple_header;
142 rec_size= regTabPtr->m_offsets[MM].m_fix_header_size;
143 tuple_header= tuple_ptr->m_data;
144 checksum= 0;
145 // includes tupVersion
146 //printf("%p - ", tuple_ptr);
147
148 for (i= 0; i < rec_size-Tuple_header::HeaderSize; i++) {
149 checksum ^= tuple_header[i];
150 //printf("%.8x ", tuple_header[i]);
151 }
152
153 //printf("-> %.8x\n", checksum);
154
155 #if 0
156 if (var_sized) {
157 /*
158 if (! req_struct->fix_var_together) {
159 jam();
160 checksum ^= tuple_header[rec_size];
161 }
162 */
163 jam();
164 var_data_part= req_struct->var_data_start;
165 vsize_words= calculate_total_var_size(req_struct->var_len_array,
166 regTabPtr->no_var_attr);
167 ndbassert(req_struct->var_data_end >= &var_data_part[vsize_words]);
168 for (i= 0; i < vsize_words; i++) {
169 checksum ^= var_data_part[i];
170 }
171 }
172 #endif
173 return checksum;
174 }
175
176 /* ----------------------------------------------------------------- */
177 /* ----------- INSERT_ACTIVE_OP_LIST -------------- */
178 /* ----------------------------------------------------------------- */
179 bool
insertActiveOpList(OperationrecPtr regOperPtr,KeyReqStruct * req_struct)180 Dbtup::insertActiveOpList(OperationrecPtr regOperPtr,
181 KeyReqStruct* req_struct)
182 {
183 OperationrecPtr prevOpPtr;
184 ndbrequire(!regOperPtr.p->op_struct.in_active_list);
185 regOperPtr.p->op_struct.in_active_list= true;
186 req_struct->prevOpPtr.i=
187 prevOpPtr.i= req_struct->m_tuple_ptr->m_operation_ptr_i;
188 regOperPtr.p->prevActiveOp= prevOpPtr.i;
189 regOperPtr.p->nextActiveOp= RNIL;
190 regOperPtr.p->m_undo_buffer_space= 0;
191 req_struct->m_tuple_ptr->m_operation_ptr_i= regOperPtr.i;
192 if (prevOpPtr.i == RNIL) {
193 return true;
194 } else {
195 req_struct->prevOpPtr.p= prevOpPtr.p= c_operation_pool.getPtr(prevOpPtr.i);
196 prevOpPtr.p->nextActiveOp= regOperPtr.i;
197
198 regOperPtr.p->op_struct.m_wait_log_buffer=
199 prevOpPtr.p->op_struct.m_wait_log_buffer;
200 regOperPtr.p->op_struct.m_load_diskpage_on_commit=
201 prevOpPtr.p->op_struct.m_load_diskpage_on_commit;
202 regOperPtr.p->op_struct.m_gci_written=
203 prevOpPtr.p->op_struct.m_gci_written;
204 regOperPtr.p->m_undo_buffer_space= prevOpPtr.p->m_undo_buffer_space;
205 // start with prev mask (matters only for UPD o UPD)
206
207 regOperPtr.p->m_any_value = prevOpPtr.p->m_any_value;
208
209 prevOpPtr.p->op_struct.m_wait_log_buffer= 0;
210 prevOpPtr.p->op_struct.m_load_diskpage_on_commit= 0;
211
212 if(prevOpPtr.p->op_struct.tuple_state == TUPLE_PREPARED)
213 {
214 Uint32 op= regOperPtr.p->op_struct.op_type;
215 Uint32 prevOp= prevOpPtr.p->op_struct.op_type;
216 if (prevOp == ZDELETE)
217 {
218 if(op == ZINSERT)
219 {
220 // mark both
221 prevOpPtr.p->op_struct.delete_insert_flag= true;
222 regOperPtr.p->op_struct.delete_insert_flag= true;
223 return true;
224 }
225 else if (op == ZREFRESH)
226 {
227 /* ZREFRESH after Delete - ok */
228 return true;
229 }
230 else
231 {
232 terrorCode= ZTUPLE_DELETED_ERROR;
233 return false;
234 }
235 }
236 else if(op == ZINSERT && prevOp != ZDELETE)
237 {
238 terrorCode= ZINSERT_ERROR;
239 return false;
240 }
241 else if (prevOp == ZREFRESH)
242 {
243 /* No operation after a ZREFRESH */
244 terrorCode= ZOP_AFTER_REFRESH_ERROR;
245 return false;
246 }
247 return true;
248 }
249 else
250 {
251 terrorCode= ZMUST_BE_ABORTED_ERROR;
252 return false;
253 }
254 }
255 }
256
257 bool
setup_read(KeyReqStruct * req_struct,Operationrec * regOperPtr,Fragrecord * regFragPtr,Tablerec * regTabPtr,bool disk)258 Dbtup::setup_read(KeyReqStruct *req_struct,
259 Operationrec* regOperPtr,
260 Fragrecord* regFragPtr,
261 Tablerec* regTabPtr,
262 bool disk)
263 {
264 OperationrecPtr currOpPtr;
265 currOpPtr.i= req_struct->m_tuple_ptr->m_operation_ptr_i;
266 Uint32 bits = req_struct->m_tuple_ptr->m_header_bits;
267
268 if (unlikely(req_struct->m_reorg))
269 {
270 Uint32 moved = bits & Tuple_header::REORG_MOVE;
271 if (! ((req_struct->m_reorg == 1 && moved == 0) ||
272 (req_struct->m_reorg == 2 && moved != 0)))
273 {
274 terrorCode= ZTUPLE_DELETED_ERROR;
275 return false;
276 }
277 }
278 if (currOpPtr.i == RNIL)
279 {
280 if (regTabPtr->need_expand(disk))
281 prepare_read(req_struct, regTabPtr, disk);
282 return true;
283 }
284
285 do {
286 Uint32 savepointId= regOperPtr->savepointId;
287 bool dirty= req_struct->dirty_op;
288
289 c_operation_pool.getPtr(currOpPtr);
290 bool sameTrans= c_lqh->is_same_trans(currOpPtr.p->userpointer,
291 req_struct->trans_id1,
292 req_struct->trans_id2);
293 /**
294 * Read committed in same trans reads latest copy
295 */
296 if(dirty && !sameTrans)
297 {
298 savepointId= 0;
299 }
300 else if(sameTrans)
301 {
302 // Use savepoint even in read committed mode
303 dirty= false;
304 }
305
306 /* found == true indicates that savepoint is some state
307 * within tuple's current transaction's uncommitted operations
308 */
309 bool found= find_savepoint(currOpPtr, savepointId);
310
311 Uint32 currOp= currOpPtr.p->op_struct.op_type;
312
313 /* is_insert==true if tuple did not exist before its current
314 * transaction
315 */
316 bool is_insert = (bits & Tuple_header::ALLOC);
317
318 /* If savepoint is in transaction, and post-delete-op
319 * OR
320 * Tuple didn't exist before
321 * AND
322 * Read is dirty
323 * OR
324 * Savepoint is before-transaction
325 *
326 * Tuple does not exist in read's view
327 */
328 if((found && currOp == ZDELETE) ||
329 ((dirty || !found) && is_insert))
330 {
331 /* Tuple not visible to this read operation */
332 terrorCode= ZTUPLE_DELETED_ERROR;
333 break;
334 }
335
336 if(dirty || !found)
337 {
338 /* Read existing committed tuple */
339 }
340 else
341 {
342 req_struct->m_tuple_ptr=
343 get_copy_tuple(&currOpPtr.p->m_copy_tuple_location);
344 }
345
346 if (regTabPtr->need_expand(disk))
347 prepare_read(req_struct, regTabPtr, disk);
348
349 #if 0
350 ndbout_c("reading copy");
351 Uint32 *var_ptr = fixed_ptr+regTabPtr->var_offset;
352 req_struct->m_tuple_ptr= fixed_ptr;
353 req_struct->fix_var_together= true;
354 req_struct->var_len_array= (Uint16*)var_ptr;
355 req_struct->var_data_start= var_ptr+regTabPtr->var_array_wsize;
356 Uint32 var_sz32= init_var_pos_array((Uint16*)var_ptr,
357 req_struct->var_pos_array,
358 regTabPtr->no_var_attr);
359 req_struct->var_data_end= var_ptr+regTabPtr->var_array_wsize + var_sz32;
360 #endif
361 return true;
362 } while(0);
363
364 return false;
365 }
366
367 int
load_diskpage(Signal * signal,Uint32 opRec,Uint32 fragPtrI,Uint32 lkey1,Uint32 lkey2,Uint32 flags)368 Dbtup::load_diskpage(Signal* signal,
369 Uint32 opRec, Uint32 fragPtrI,
370 Uint32 lkey1, Uint32 lkey2, Uint32 flags)
371 {
372 Ptr<Tablerec> tabptr;
373 Ptr<Fragrecord> fragptr;
374 Ptr<Operationrec> operPtr;
375
376 c_operation_pool.getPtr(operPtr, opRec);
377 fragptr.i= fragPtrI;
378 ptrCheckGuard(fragptr, cnoOfFragrec, fragrecord);
379
380 Operationrec * regOperPtr= operPtr.p;
381 Fragrecord * regFragPtr= fragptr.p;
382
383 tabptr.i = regFragPtr->fragTableId;
384 ptrCheckGuard(tabptr, cnoOfTablerec, tablerec);
385 Tablerec* regTabPtr = tabptr.p;
386
387 if (Local_key::ref(lkey1, lkey2) == ~(Uint32)0)
388 {
389 jam();
390 regOperPtr->op_struct.m_wait_log_buffer= 1;
391 regOperPtr->op_struct.m_load_diskpage_on_commit= 1;
392 if (unlikely((flags & 7) == ZREFRESH))
393 {
394 jam();
395 /* Refresh of previously nonexistant DD tuple.
396 * No diskpage to load at commit time
397 */
398 regOperPtr->op_struct.m_wait_log_buffer= 0;
399 regOperPtr->op_struct.m_load_diskpage_on_commit= 0;
400 }
401
402 /* In either case return 1 for 'proceed' */
403 return 1;
404 }
405
406 jam();
407 Uint32 page_idx= lkey2;
408 Uint32 frag_page_id= lkey1;
409 regOperPtr->m_tuple_location.m_page_no= getRealpid(regFragPtr,
410 frag_page_id);
411 regOperPtr->m_tuple_location.m_page_idx= page_idx;
412
413 PagePtr page_ptr;
414 Uint32* tmp= get_ptr(&page_ptr, ®OperPtr->m_tuple_location, regTabPtr);
415 Tuple_header* ptr= (Tuple_header*)tmp;
416
417 int res= 1;
418 if(ptr->m_header_bits & Tuple_header::DISK_PART)
419 {
420 Page_cache_client::Request req;
421 memcpy(&req.m_page, ptr->get_disk_ref_ptr(regTabPtr), sizeof(Local_key));
422 req.m_callback.m_callbackData= opRec;
423 req.m_callback.m_callbackFunction=
424 safe_cast(&Dbtup::disk_page_load_callback);
425
426 #ifdef ERROR_INSERT
427 if (ERROR_INSERTED(4022))
428 {
429 flags |= Page_cache_client::DELAY_REQ;
430 req.m_delay_until_time = NdbTick_CurrentMillisecond()+(Uint64)3000;
431 }
432 #endif
433
434 Page_cache_client pgman(this, c_pgman);
435 res= pgman.get_page(signal, req, flags);
436 m_pgman_ptr = pgman.m_ptr;
437 if(res > 0)
438 {
439 //ndbout_c("in cache");
440 // In cache
441 }
442 else if(res == 0)
443 {
444 //ndbout_c("waiting for callback");
445 // set state
446 }
447 else
448 {
449 // Error
450 }
451 }
452
453 switch(flags & 7)
454 {
455 case ZREAD:
456 case ZREAD_EX:
457 break;
458 case ZDELETE:
459 case ZUPDATE:
460 case ZINSERT:
461 case ZWRITE:
462 case ZREFRESH:
463 regOperPtr->op_struct.m_wait_log_buffer= 1;
464 regOperPtr->op_struct.m_load_diskpage_on_commit= 1;
465 }
466 return res;
467 }
468
469 void
disk_page_load_callback(Signal * signal,Uint32 opRec,Uint32 page_id)470 Dbtup::disk_page_load_callback(Signal* signal, Uint32 opRec, Uint32 page_id)
471 {
472 Ptr<Operationrec> operPtr;
473 c_operation_pool.getPtr(operPtr, opRec);
474 c_lqh->acckeyconf_load_diskpage_callback(signal,
475 operPtr.p->userpointer, page_id);
476 }
477
478 int
load_diskpage_scan(Signal * signal,Uint32 opRec,Uint32 fragPtrI,Uint32 lkey1,Uint32 lkey2,Uint32 flags)479 Dbtup::load_diskpage_scan(Signal* signal,
480 Uint32 opRec, Uint32 fragPtrI,
481 Uint32 lkey1, Uint32 lkey2, Uint32 flags)
482 {
483 Ptr<Tablerec> tabptr;
484 Ptr<Fragrecord> fragptr;
485 Ptr<Operationrec> operPtr;
486
487 c_operation_pool.getPtr(operPtr, opRec);
488 fragptr.i= fragPtrI;
489 ptrCheckGuard(fragptr, cnoOfFragrec, fragrecord);
490
491 Operationrec * regOperPtr= operPtr.p;
492 Fragrecord * regFragPtr= fragptr.p;
493
494 tabptr.i = regFragPtr->fragTableId;
495 ptrCheckGuard(tabptr, cnoOfTablerec, tablerec);
496 Tablerec* regTabPtr = tabptr.p;
497
498 jam();
499 Uint32 page_idx= lkey2;
500 Uint32 frag_page_id= lkey1;
501 regOperPtr->m_tuple_location.m_page_no= getRealpid(regFragPtr,
502 frag_page_id);
503 regOperPtr->m_tuple_location.m_page_idx= page_idx;
504 regOperPtr->op_struct.m_load_diskpage_on_commit= 0;
505
506 PagePtr page_ptr;
507 Uint32* tmp= get_ptr(&page_ptr, ®OperPtr->m_tuple_location, regTabPtr);
508 Tuple_header* ptr= (Tuple_header*)tmp;
509
510 int res= 1;
511 if(ptr->m_header_bits & Tuple_header::DISK_PART)
512 {
513 Page_cache_client::Request req;
514 memcpy(&req.m_page, ptr->get_disk_ref_ptr(regTabPtr), sizeof(Local_key));
515 req.m_callback.m_callbackData= opRec;
516 req.m_callback.m_callbackFunction=
517 safe_cast(&Dbtup::disk_page_load_scan_callback);
518
519 Page_cache_client pgman(this, c_pgman);
520 res= pgman.get_page(signal, req, flags);
521 m_pgman_ptr = pgman.m_ptr;
522 if(res > 0)
523 {
524 // ndbout_c("in cache");
525 // In cache
526 }
527 else if(res == 0)
528 {
529 //ndbout_c("waiting for callback");
530 // set state
531 }
532 else
533 {
534 // Error
535 }
536 }
537 return res;
538 }
539
540 void
disk_page_load_scan_callback(Signal * signal,Uint32 opRec,Uint32 page_id)541 Dbtup::disk_page_load_scan_callback(Signal* signal,
542 Uint32 opRec, Uint32 page_id)
543 {
544 Ptr<Operationrec> operPtr;
545 c_operation_pool.getPtr(operPtr, opRec);
546 c_lqh->next_scanconf_load_diskpage_callback(signal,
547 operPtr.p->userpointer, page_id);
548 }
549
execTUPKEYREQ(Signal * signal)550 void Dbtup::execTUPKEYREQ(Signal* signal)
551 {
552 TupKeyReq * tupKeyReq= (TupKeyReq *)signal->getDataPtr();
553 Ptr<Tablerec> tabptr;
554 Ptr<Fragrecord> fragptr;
555 Ptr<Operationrec> operPtr;
556 KeyReqStruct req_struct(this);
557 Uint32 sig1, sig2, sig3, sig4;
558
559 Uint32 RoperPtr= tupKeyReq->connectPtr;
560 Uint32 Rfragptr= tupKeyReq->fragPtr;
561
562 Uint32 RnoOfFragrec= cnoOfFragrec;
563 Uint32 RnoOfTablerec= cnoOfTablerec;
564
565 jamEntry();
566 fragptr.i= Rfragptr;
567
568 ndbrequire(Rfragptr < RnoOfFragrec);
569
570 c_operation_pool.getPtr(operPtr, RoperPtr);
571 ptrAss(fragptr, fragrecord);
572
573 Uint32 TrequestInfo= tupKeyReq->request;
574
575 Operationrec * regOperPtr= operPtr.p;
576 Fragrecord * regFragPtr= fragptr.p;
577
578 tabptr.i = regFragPtr->fragTableId;
579 ptrCheckGuard(tabptr, RnoOfTablerec, tablerec);
580 Tablerec* regTabPtr = tabptr.p;
581
582 req_struct.tablePtrP = tabptr.p;
583 req_struct.fragPtrP = fragptr.p;
584 req_struct.operPtrP = operPtr.p;
585 req_struct.signal= signal;
586 req_struct.dirty_op= TrequestInfo & 1;
587 req_struct.interpreted_exec= (TrequestInfo >> 10) & 1;
588 req_struct.no_fired_triggers= 0;
589 req_struct.read_length= 0;
590 req_struct.last_row= false;
591 req_struct.changeMask.clear();
592 req_struct.m_is_lcp = false;
593
594 if (unlikely(get_trans_state(regOperPtr) != TRANS_IDLE))
595 {
596 TUPKEY_abort(&req_struct, 39);
597 return;
598 }
599
600 /* ----------------------------------------------------------------- */
601 // Operation is ZREAD when we arrive here so no need to worry about the
602 // abort process.
603 /* ----------------------------------------------------------------- */
604 /* ----------- INITIATE THE OPERATION RECORD -------------- */
605 /* ----------------------------------------------------------------- */
606 Uint32 Rstoredid= tupKeyReq->storedProcedure;
607
608 regOperPtr->fragmentPtr= Rfragptr;
609 regOperPtr->op_struct.op_type= (TrequestInfo >> 6) & 0x7;
610 regOperPtr->op_struct.delete_insert_flag = false;
611 regOperPtr->op_struct.m_reorg = (TrequestInfo >> 12) & 3;
612
613 regOperPtr->m_copy_tuple_location.setNull();
614 regOperPtr->tupVersion= ZNIL;
615
616 sig1= tupKeyReq->savePointId;
617 sig2= tupKeyReq->primaryReplica;
618 sig3= tupKeyReq->keyRef2;
619
620 regOperPtr->savepointId= sig1;
621 regOperPtr->op_struct.primary_replica= sig2;
622 Uint32 pageidx = regOperPtr->m_tuple_location.m_page_idx= sig3;
623
624 sig1= tupKeyReq->opRef;
625 sig2= tupKeyReq->tcOpIndex;
626 sig3= tupKeyReq->coordinatorTC;
627 sig4= tupKeyReq->keyRef1;
628
629 req_struct.tc_operation_ptr= sig1;
630 req_struct.TC_index= sig2;
631 req_struct.TC_ref= sig3;
632 Uint32 pageid = req_struct.frag_page_id= sig4;
633 req_struct.m_use_rowid = (TrequestInfo >> 11) & 1;
634 req_struct.m_reorg = (TrequestInfo >> 12) & 3;
635
636 sig1= tupKeyReq->attrBufLen;
637 sig2= tupKeyReq->applRef;
638 sig3= tupKeyReq->transId1;
639 sig4= tupKeyReq->transId2;
640
641 Uint32 disk_page= tupKeyReq->disk_page;
642
643 req_struct.log_size= sig1;
644 req_struct.attrinfo_len= sig1;
645 req_struct.rec_blockref= sig2;
646 req_struct.trans_id1= sig3;
647 req_struct.trans_id2= sig4;
648 req_struct.m_disk_page_ptr.i= disk_page;
649
650 sig1 = tupKeyReq->m_row_id_page_no;
651 sig2 = tupKeyReq->m_row_id_page_idx;
652 sig3 = tupKeyReq->deferred_constraints;
653
654 req_struct.m_row_id.m_page_no = sig1;
655 req_struct.m_row_id.m_page_idx = sig2;
656 req_struct.m_deferred_constraints = sig3;
657
658 /* Get AttrInfo section if this is a long TUPKEYREQ */
659 Uint32 attrInfoIVal= tupKeyReq->attrInfoIVal;
660
661 /* If we have AttrInfo, check we expected it, and
662 * that we don't have AttrInfo by another means
663 */
664 ndbassert( (attrInfoIVal == RNIL) ||
665 (tupKeyReq->attrBufLen > 0));
666
667 Uint32 Roptype = regOperPtr->op_struct.op_type;
668
669 if (Rstoredid != ZNIL) {
670 /* This is part of a scan, get attrInfoIVal for
671 * given stored procedure
672 */
673 ndbrequire(getStoredProcAttrInfo(Rstoredid,
674 &req_struct,
675 attrInfoIVal) == ZOK);
676 }
677
678 /* Copy AttrInfo from section into linear in-buffer */
679 copyAttrinfo(regOperPtr,
680 &cinBuffer[0],
681 req_struct.attrinfo_len,
682 attrInfoIVal);
683
684 regOperPtr->op_struct.m_gci_written = 0;
685
686 if (Roptype == ZINSERT && Local_key::isInvalid(pageid, pageidx))
687 {
688 // No tuple allocated yet
689 goto do_insert;
690 }
691
692 if (Roptype == ZREFRESH && Local_key::isInvalid(pageid, pageidx))
693 {
694 // No tuple allocated yet
695 goto do_refresh;
696 }
697
698 if (unlikely(isCopyTuple(pageid, pageidx)))
699 {
700 /**
701 * Only LCP reads a copy-tuple "directly"
702 */
703 ndbassert(Roptype == ZREAD);
704 ndbassert(disk_page == RNIL);
705 setup_lcp_read_copy_tuple(&req_struct, regOperPtr, regFragPtr, regTabPtr);
706 goto do_read;
707 }
708
709 /**
710 * Get pointer to tuple
711 */
712 regOperPtr->m_tuple_location.m_page_no= getRealpid(regFragPtr,
713 req_struct.frag_page_id);
714
715 setup_fixed_part(&req_struct, regOperPtr, regTabPtr);
716
717 /**
718 * Check operation
719 */
720 if (Roptype == ZREAD) {
721 jam();
722
723 if (setup_read(&req_struct, regOperPtr, regFragPtr, regTabPtr,
724 disk_page != RNIL))
725 {
726 do_read:
727 if(handleReadReq(signal, regOperPtr, regTabPtr, &req_struct) != -1)
728 {
729 req_struct.log_size= 0;
730 sendTUPKEYCONF(signal, &req_struct, regOperPtr);
731 /* ---------------------------------------------------------------- */
732 // Read Operations need not to be taken out of any lists.
733 // We also do not need to wait for commit since there is no changes
734 // to commit. Thus we
735 // prepare the operation record already now for the next operation.
736 // Write operations have set the state to STARTED above indicating
737 // that they are waiting for the Commit or Abort decision.
738 /* ---------------------------------------------------------------- */
739 set_trans_state(regOperPtr, TRANS_IDLE);
740 }
741 return;
742 }
743 tupkeyErrorLab(&req_struct);
744 return;
745 }
746
747 if(insertActiveOpList(operPtr, &req_struct))
748 {
749 if(Roptype == ZINSERT)
750 {
751 jam();
752 do_insert:
753 Local_key accminupdate;
754 Local_key * accminupdateptr = &accminupdate;
755 if (unlikely(handleInsertReq(signal, operPtr,
756 fragptr, regTabPtr, &req_struct,
757 &accminupdateptr) == -1))
758 {
759 return;
760 }
761
762 terrorCode = 0;
763 checkImmediateTriggersAfterInsert(&req_struct,
764 regOperPtr,
765 regTabPtr,
766 disk_page != RNIL);
767
768 if (unlikely(terrorCode != 0))
769 {
770 tupkeyErrorLab(&req_struct);
771 return;
772 }
773
774 if (!regTabPtr->tuxCustomTriggers.isEmpty())
775 {
776 jam();
777 if (unlikely(executeTuxInsertTriggers(signal,
778 regOperPtr,
779 regFragPtr,
780 regTabPtr) != 0))
781 {
782 jam();
783 /*
784 * TUP insert succeeded but add of TUX entries failed. All
785 * TUX changes have been rolled back at this point.
786 *
787 * We will abort via tupkeyErrorLab() as usual. This routine
788 * however resets the operation to ZREAD. The TUP_ABORTREQ
789 * arriving later cannot then undo the insert.
790 *
791 * Therefore we call TUP_ABORTREQ already now. Diskdata etc
792 * should be in memory and timeslicing cannot occur. We must
793 * skip TUX abort triggers since TUX is already aborted.
794 */
795 signal->theData[0] = operPtr.i;
796 do_tup_abortreq(signal, ZSKIP_TUX_TRIGGERS);
797 tupkeyErrorLab(&req_struct);
798 return;
799 }
800 }
801
802 if (accminupdateptr)
803 {
804 /**
805 * Update ACC local-key, once *everything* has completed succesfully
806 */
807 c_lqh->accminupdate(signal,
808 regOperPtr->userpointer,
809 accminupdateptr);
810 }
811
812 sendTUPKEYCONF(signal, &req_struct, regOperPtr);
813 return;
814 }
815
816 if (Roptype == ZUPDATE) {
817 jam();
818 if (unlikely(handleUpdateReq(signal, regOperPtr,
819 regFragPtr, regTabPtr,
820 &req_struct, disk_page != RNIL) == -1))
821 {
822 return;
823 }
824
825 terrorCode = 0;
826 checkImmediateTriggersAfterUpdate(&req_struct,
827 regOperPtr,
828 regTabPtr,
829 disk_page != RNIL);
830
831 if (unlikely(terrorCode != 0))
832 {
833 tupkeyErrorLab(&req_struct);
834 return;
835 }
836
837 if (!regTabPtr->tuxCustomTriggers.isEmpty())
838 {
839 jam();
840 if (unlikely(executeTuxUpdateTriggers(signal,
841 regOperPtr,
842 regFragPtr,
843 regTabPtr) != 0))
844 {
845 jam();
846 /*
847 * See insert case.
848 */
849 signal->theData[0] = operPtr.i;
850 do_tup_abortreq(signal, ZSKIP_TUX_TRIGGERS);
851 tupkeyErrorLab(&req_struct);
852 return;
853 }
854 }
855
856 sendTUPKEYCONF(signal, &req_struct, regOperPtr);
857 return;
858 }
859 else if(Roptype == ZDELETE)
860 {
861 jam();
862 req_struct.log_size= 0;
863 if (unlikely(handleDeleteReq(signal, regOperPtr,
864 regFragPtr, regTabPtr,
865 &req_struct,
866 disk_page != RNIL) == -1))
867 {
868 return;
869 }
870
871 terrorCode = 0;
872 checkImmediateTriggersAfterDelete(&req_struct,
873 regOperPtr,
874 regTabPtr,
875 disk_page != RNIL);
876
877 if (unlikely(terrorCode != 0))
878 {
879 tupkeyErrorLab(&req_struct);
880 return;
881 }
882
883 /*
884 * TUX doesn't need to check for triggers at delete since entries in
885 * the index are kept until commit time.
886 */
887
888 sendTUPKEYCONF(signal, &req_struct, regOperPtr);
889 return;
890 }
891 else if (Roptype == ZREFRESH)
892 {
893 /**
894 * No TUX or immediate triggers, just detached triggers
895 */
896 do_refresh:
897 if (unlikely(handleRefreshReq(signal, operPtr,
898 fragptr, regTabPtr,
899 &req_struct, disk_page != RNIL) == -1))
900 {
901 return;
902 }
903
904 sendTUPKEYCONF(signal, &req_struct, regOperPtr);
905 return;
906
907 }
908 else
909 {
910 ndbrequire(false); // Invalid op type
911 }
912 }
913
914 tupkeyErrorLab(&req_struct);
915 }
916
917 void
setup_fixed_part(KeyReqStruct * req_struct,Operationrec * regOperPtr,Tablerec * regTabPtr)918 Dbtup::setup_fixed_part(KeyReqStruct* req_struct,
919 Operationrec* regOperPtr,
920 Tablerec* regTabPtr)
921 {
922 PagePtr page_ptr;
923 Uint32* ptr= get_ptr(&page_ptr, ®OperPtr->m_tuple_location, regTabPtr);
924 req_struct->m_page_ptr = page_ptr;
925 req_struct->m_tuple_ptr = (Tuple_header*)ptr;
926
927 ndbassert(regOperPtr->op_struct.op_type == ZINSERT || (! (req_struct->m_tuple_ptr->m_header_bits & Tuple_header::FREE)));
928
929 req_struct->check_offset[MM]= regTabPtr->get_check_offset(MM);
930 req_struct->check_offset[DD]= regTabPtr->get_check_offset(DD);
931
932 Uint32 num_attr= regTabPtr->m_no_of_attributes;
933 Uint32 descr_start= regTabPtr->tabDescriptor;
934 TableDescriptor *tab_descr= &tableDescriptor[descr_start];
935 ndbrequire(descr_start + (num_attr << ZAD_LOG_SIZE) <= cnoOfTabDescrRec);
936 req_struct->attr_descr= tab_descr;
937 }
938
939 void
setup_lcp_read_copy_tuple(KeyReqStruct * req_struct,Operationrec * regOperPtr,Fragrecord * regFragPtr,Tablerec * regTabPtr)940 Dbtup::setup_lcp_read_copy_tuple(KeyReqStruct* req_struct,
941 Operationrec* regOperPtr,
942 Fragrecord* regFragPtr,
943 Tablerec* regTabPtr)
944 {
945 Local_key tmp;
946 tmp.m_page_no = req_struct->frag_page_id;
947 tmp.m_page_idx = regOperPtr->m_tuple_location.m_page_idx;
948 clearCopyTuple(tmp.m_page_no, tmp.m_page_idx);
949
950 Uint32 * copytuple = get_copy_tuple_raw(&tmp);
951 Local_key rowid;
952 memcpy(&rowid, copytuple+0, sizeof(Local_key));
953
954 req_struct->frag_page_id = rowid.m_page_no;
955 regOperPtr->m_tuple_location.m_page_idx = rowid.m_page_idx;
956
957 Tuple_header * th = get_copy_tuple(copytuple);
958 req_struct->m_page_ptr.setNull();
959 req_struct->m_tuple_ptr = (Tuple_header*)th;
960 th->m_operation_ptr_i = RNIL;
961 ndbassert((th->m_header_bits & Tuple_header::COPY_TUPLE) != 0);
962
963 Uint32 num_attr= regTabPtr->m_no_of_attributes;
964 Uint32 descr_start= regTabPtr->tabDescriptor;
965 TableDescriptor *tab_descr= &tableDescriptor[descr_start];
966 ndbrequire(descr_start + (num_attr << ZAD_LOG_SIZE) <= cnoOfTabDescrRec);
967 req_struct->attr_descr= tab_descr;
968
969 bool disk = false;
970 if (regTabPtr->need_expand(disk))
971 {
972 jam();
973 prepare_read(req_struct, regTabPtr, disk);
974 }
975 }
976
977 /* ---------------------------------------------------------------- */
978 /* ------------------------ CONFIRM REQUEST ----------------------- */
979 /* ---------------------------------------------------------------- */
sendTUPKEYCONF(Signal * signal,KeyReqStruct * req_struct,Operationrec * regOperPtr)980 void Dbtup::sendTUPKEYCONF(Signal* signal,
981 KeyReqStruct *req_struct,
982 Operationrec * regOperPtr)
983 {
984 TupKeyConf * tupKeyConf= (TupKeyConf *)signal->getDataPtrSend();
985
986 Uint32 Rcreate_rowid = req_struct->m_use_rowid;
987 Uint32 RuserPointer= regOperPtr->userpointer;
988 Uint32 RnoFiredTriggers= req_struct->no_fired_triggers;
989 Uint32 log_size= req_struct->log_size;
990 Uint32 read_length= req_struct->read_length;
991 Uint32 last_row= req_struct->last_row;
992
993 set_trans_state(regOperPtr, TRANS_STARTED);
994 set_tuple_state(regOperPtr, TUPLE_PREPARED);
995 tupKeyConf->userPtr= RuserPointer;
996 tupKeyConf->readLength= read_length;
997 tupKeyConf->writeLength= log_size;
998 tupKeyConf->noFiredTriggers= RnoFiredTriggers;
999 tupKeyConf->lastRow= last_row;
1000 tupKeyConf->rowid = Rcreate_rowid;
1001
1002 EXECUTE_DIRECT(DBLQH, GSN_TUPKEYCONF, signal,
1003 TupKeyConf::SignalLength);
1004
1005 }
1006
1007
1008 #define MAX_READ (MIN(sizeof(signal->theData), MAX_SEND_MESSAGE_BYTESIZE))
1009
1010 /* ---------------------------------------------------------------- */
1011 /* ----------------------------- READ ---------------------------- */
1012 /* ---------------------------------------------------------------- */
handleReadReq(Signal * signal,Operationrec * regOperPtr,Tablerec * regTabPtr,KeyReqStruct * req_struct)1013 int Dbtup::handleReadReq(Signal* signal,
1014 Operationrec* regOperPtr,
1015 Tablerec* regTabPtr,
1016 KeyReqStruct* req_struct)
1017 {
1018 Uint32 *dst;
1019 Uint32 dstLen, start_index;
1020 const BlockReference sendBref= req_struct->rec_blockref;
1021 if ((regTabPtr->m_bits & Tablerec::TR_Checksum) &&
1022 (calculateChecksum(req_struct->m_tuple_ptr, regTabPtr) != 0)) {
1023 jam();
1024 ndbout_c("here2");
1025 terrorCode= ZTUPLE_CORRUPTED_ERROR;
1026 tupkeyErrorLab(req_struct);
1027 return -1;
1028 }
1029
1030 const Uint32 node = refToNode(sendBref);
1031 if(node != 0 && node != getOwnNodeId()) {
1032 start_index= 25;
1033 } else {
1034 jam();
1035 /**
1036 * execute direct
1037 */
1038 start_index= 3;
1039 }
1040 dst= &signal->theData[start_index];
1041 dstLen= (MAX_READ / 4) - start_index;
1042 if (!req_struct->interpreted_exec) {
1043 jam();
1044 int ret = readAttributes(req_struct,
1045 &cinBuffer[0],
1046 req_struct->attrinfo_len,
1047 dst,
1048 dstLen,
1049 false);
1050 if (likely(ret >= 0)) {
1051 /* ------------------------------------------------------------------------- */
1052 // We have read all data into coutBuffer. Now send it to the API.
1053 /* ------------------------------------------------------------------------- */
1054 jam();
1055 Uint32 TnoOfDataRead= (Uint32) ret;
1056 req_struct->read_length += TnoOfDataRead;
1057 sendReadAttrinfo(signal, req_struct, TnoOfDataRead, regOperPtr);
1058 return 0;
1059 }
1060 else
1061 {
1062 terrorCode = Uint32(-ret);
1063 }
1064 } else {
1065 jam();
1066 if (likely(interpreterStartLab(signal, req_struct) != -1)) {
1067 return 0;
1068 }
1069 return -1;
1070 }
1071
1072 jam();
1073 tupkeyErrorLab(req_struct);
1074 return -1;
1075 }
1076
1077 static
1078 void
handle_reorg(Dbtup::KeyReqStruct * req_struct,Dbtup::Fragrecord::FragState state)1079 handle_reorg(Dbtup::KeyReqStruct * req_struct,
1080 Dbtup::Fragrecord::FragState state)
1081 {
1082 Uint32 reorg = req_struct->m_reorg;
1083 switch(state){
1084 case Dbtup::Fragrecord::FS_FREE:
1085 case Dbtup::Fragrecord::FS_REORG_NEW:
1086 case Dbtup::Fragrecord::FS_REORG_COMMIT_NEW:
1087 case Dbtup::Fragrecord::FS_REORG_COMPLETE_NEW:
1088 return;
1089 case Dbtup::Fragrecord::FS_REORG_COMMIT:
1090 case Dbtup::Fragrecord::FS_REORG_COMPLETE:
1091 if (reorg != 1)
1092 return;
1093 break;
1094 case Dbtup::Fragrecord::FS_ONLINE:
1095 if (reorg != 2)
1096 return;
1097 break;
1098 default:
1099 return;
1100 }
1101 req_struct->m_tuple_ptr->m_header_bits |= Dbtup::Tuple_header::REORG_MOVE;
1102 }
1103
1104 /* ---------------------------------------------------------------- */
1105 /* ---------------------------- UPDATE ---------------------------- */
1106 /* ---------------------------------------------------------------- */
handleUpdateReq(Signal * signal,Operationrec * operPtrP,Fragrecord * regFragPtr,Tablerec * regTabPtr,KeyReqStruct * req_struct,bool disk)1107 int Dbtup::handleUpdateReq(Signal* signal,
1108 Operationrec* operPtrP,
1109 Fragrecord* regFragPtr,
1110 Tablerec* regTabPtr,
1111 KeyReqStruct* req_struct,
1112 bool disk)
1113 {
1114 Tuple_header *dst;
1115 Tuple_header *base= req_struct->m_tuple_ptr, *org;
1116 ChangeMask * change_mask_ptr;
1117 if ((dst= alloc_copy_tuple(regTabPtr, &operPtrP->m_copy_tuple_location))== 0)
1118 {
1119 terrorCode= ZMEM_NOMEM_ERROR;
1120 goto error;
1121 }
1122
1123 Uint32 tup_version;
1124 change_mask_ptr = get_change_mask_ptr(regTabPtr, dst);
1125 if(operPtrP->is_first_operation())
1126 {
1127 org= req_struct->m_tuple_ptr;
1128 tup_version= org->get_tuple_version();
1129 clear_change_mask_info(regTabPtr, change_mask_ptr);
1130 }
1131 else
1132 {
1133 Operationrec* prevOp= req_struct->prevOpPtr.p;
1134 tup_version= prevOp->tupVersion;
1135 Uint32 * rawptr = get_copy_tuple_raw(&prevOp->m_copy_tuple_location);
1136 org= get_copy_tuple(rawptr);
1137 copy_change_mask_info(regTabPtr,
1138 change_mask_ptr,
1139 get_change_mask_ptr(rawptr));
1140 }
1141
1142 /**
1143 * Check consistency before update/delete
1144 */
1145 req_struct->m_tuple_ptr= org;
1146 if ((regTabPtr->m_bits & Tablerec::TR_Checksum) &&
1147 (calculateChecksum(req_struct->m_tuple_ptr, regTabPtr) != 0))
1148 {
1149 terrorCode= ZTUPLE_CORRUPTED_ERROR;
1150 goto error;
1151 }
1152
1153 req_struct->m_tuple_ptr= dst;
1154
1155 union {
1156 Uint32 sizes[4];
1157 Uint64 cmp[2];
1158 };
1159
1160 disk = disk || (org->m_header_bits & Tuple_header::DISK_INLINE);
1161 if (regTabPtr->need_expand(disk))
1162 {
1163 expand_tuple(req_struct, sizes, org, regTabPtr, disk);
1164 if(disk && operPtrP->m_undo_buffer_space == 0)
1165 {
1166 operPtrP->op_struct.m_wait_log_buffer = 1;
1167 operPtrP->op_struct.m_load_diskpage_on_commit = 1;
1168 Uint32 sz= operPtrP->m_undo_buffer_space=
1169 (sizeof(Dbtup::Disk_undo::Update) >> 2) + sizes[DD] - 1;
1170
1171 D("Logfile_client - handleUpdateReq");
1172 Logfile_client lgman(this, c_lgman, regFragPtr->m_logfile_group_id);
1173 terrorCode= lgman.alloc_log_space(sz);
1174 if(unlikely(terrorCode))
1175 {
1176 operPtrP->m_undo_buffer_space= 0;
1177 goto error;
1178 }
1179 }
1180 }
1181 else
1182 {
1183 memcpy(dst, org, 4*regTabPtr->m_offsets[MM].m_fix_header_size);
1184 req_struct->m_tuple_ptr->m_header_bits |= Tuple_header::COPY_TUPLE;
1185 }
1186
1187 tup_version= (tup_version + 1) & ZTUP_VERSION_MASK;
1188 operPtrP->tupVersion= tup_version;
1189
1190 req_struct->optimize_options = 0;
1191
1192 if (!req_struct->interpreted_exec) {
1193 jam();
1194
1195 if (regTabPtr->m_bits & Tablerec::TR_ExtraRowAuthorBits)
1196 {
1197 jam();
1198 Uint32 attrId =
1199 regTabPtr->getExtraAttrId<Tablerec::TR_ExtraRowAuthorBits>();
1200
1201 store_extra_row_bits(attrId, regTabPtr, dst, /* default */ 0, false);
1202 }
1203 int retValue = updateAttributes(req_struct,
1204 &cinBuffer[0],
1205 req_struct->attrinfo_len);
1206 if (unlikely(retValue < 0))
1207 {
1208 terrorCode = Uint32(-retValue);
1209 goto error;
1210 }
1211 } else {
1212 jam();
1213 if (unlikely(interpreterStartLab(signal, req_struct) == -1))
1214 return -1;
1215 }
1216
1217 update_change_mask_info(regTabPtr,
1218 change_mask_ptr,
1219 req_struct->changeMask.rep.data);
1220
1221 switch (req_struct->optimize_options) {
1222 case AttributeHeader::OPTIMIZE_MOVE_VARPART:
1223 /**
1224 * optimize varpart of tuple, move varpart of tuple from
1225 * big-free-size page list into small-free-size page list
1226 */
1227 if(base->m_header_bits & Tuple_header::VAR_PART)
1228 optimize_var_part(req_struct, base, operPtrP,
1229 regFragPtr, regTabPtr);
1230 break;
1231 case AttributeHeader::OPTIMIZE_MOVE_FIXPART:
1232 //TODO: move fix part of tuple
1233 break;
1234 default:
1235 break;
1236 }
1237
1238 if (regTabPtr->need_shrink())
1239 {
1240 shrink_tuple(req_struct, sizes+2, regTabPtr, disk);
1241 if (cmp[0] != cmp[1] && handle_size_change_after_update(req_struct,
1242 base,
1243 operPtrP,
1244 regFragPtr,
1245 regTabPtr,
1246 sizes)) {
1247 goto error;
1248 }
1249 }
1250
1251 if (req_struct->m_reorg)
1252 {
1253 handle_reorg(req_struct, regFragPtr->fragStatus);
1254 }
1255
1256 req_struct->m_tuple_ptr->set_tuple_version(tup_version);
1257 if (regTabPtr->m_bits & Tablerec::TR_Checksum) {
1258 jam();
1259 setChecksum(req_struct->m_tuple_ptr, regTabPtr);
1260 }
1261
1262 set_tuple_state(operPtrP, TUPLE_PREPARED);
1263
1264 return 0;
1265
1266 error:
1267 tupkeyErrorLab(req_struct);
1268 return -1;
1269 }
1270
1271 /*
1272 expand_dyn_part - copy dynamic attributes to fully expanded size.
1273
1274 Both variable-sized and fixed-size attributes are stored in the same way
1275 in the expanded form as variable-sized attributes (in expand_var_part()).
1276
1277 This method is used for both mem and disk dynamic data.
1278
1279 dst Destination for expanded data
1280 tabPtrP Table descriptor
1281 src Pointer to the start of dynamic bitmap in source row
1282 row_len Total number of 32-bit words in dynamic part of row
1283 tabDesc Array of table descriptors
1284 order Array of indexes into tabDesc, dynfix followed by dynvar
1285 */
1286 static
1287 Uint32*
expand_dyn_part(Dbtup::KeyReqStruct::Var_data * dst,const Uint32 * src,Uint32 row_len,const Uint32 * tabDesc,const Uint16 * order,Uint32 dynvar,Uint32 dynfix,Uint32 max_bmlen)1288 expand_dyn_part(Dbtup::KeyReqStruct::Var_data *dst,
1289 const Uint32* src,
1290 Uint32 row_len,
1291 const Uint32 * tabDesc,
1292 const Uint16* order,
1293 Uint32 dynvar,
1294 Uint32 dynfix,
1295 Uint32 max_bmlen)
1296 {
1297 /* Copy the bitmap, zeroing out any words not stored in the row. */
1298 Uint32 *dst_bm_ptr= (Uint32*)dst->m_dyn_data_ptr;
1299 Uint32 bm_len = row_len ? (* src & Dbtup::DYN_BM_LEN_MASK) : 0;
1300
1301 assert(bm_len <= max_bmlen);
1302
1303 if(bm_len > 0)
1304 memcpy(dst_bm_ptr, src, 4*bm_len);
1305 if(bm_len < max_bmlen)
1306 bzero(dst_bm_ptr + bm_len, 4 * (max_bmlen - bm_len));
1307
1308 /**
1309 * Store max_bmlen for homogen code in DbtupRoutines
1310 */
1311 Uint32 tmp = (* dst_bm_ptr);
1312 * dst_bm_ptr = (tmp & ~(Uint32)Dbtup::DYN_BM_LEN_MASK) | max_bmlen;
1313
1314 char *src_off_start= (char*)(src + bm_len);
1315 assert((UintPtr(src_off_start)&3) == 0);
1316 Uint16 *src_off_ptr= (Uint16*)src_off_start;
1317
1318 /*
1319 Prepare the variable-sized dynamic attributes, copying out data from the
1320 source row for any that are not NULL.
1321 */
1322 Uint32 no_attr= dst->m_dyn_len_offset;
1323 Uint16* dst_off_ptr= dst->m_dyn_offset_arr_ptr;
1324 Uint16* dst_len_ptr= dst_off_ptr + no_attr;
1325 Uint16 this_src_off= row_len ? * src_off_ptr++ : 0;
1326 /* We need to reserve room for the offsets written by shrink_tuple+padding.*/
1327 Uint16 dst_off= 4 * (max_bmlen + ((dynvar+2)>>1));
1328 char *dst_ptr= (char*)dst_bm_ptr + dst_off;
1329 for(Uint32 i= 0; i<dynvar; i++)
1330 {
1331 Uint16 j= order[dynfix+i];
1332 Uint32 max_len= 4 *AttributeDescriptor::getSizeInWords(tabDesc[j]);
1333 Uint32 len;
1334 Uint32 pos = AttributeOffset::getNullFlagPos(tabDesc[j+1]);
1335 if(bm_len > (pos >> 5) && BitmaskImpl::get(bm_len, src, pos))
1336 {
1337 Uint16 next_src_off= *src_off_ptr++;
1338 len= next_src_off - this_src_off;
1339 memcpy(dst_ptr, src_off_start+this_src_off, len);
1340 this_src_off= next_src_off;
1341 }
1342 else
1343 {
1344 len= 0;
1345 }
1346 dst_off_ptr[i]= dst_off;
1347 dst_len_ptr[i]= dst_off+len;
1348 dst_off+= max_len;
1349 dst_ptr+= max_len;
1350 }
1351 /*
1352 The fixed-size data is stored 32-bit aligned after the variable-sized
1353 data.
1354 */
1355 char *src_ptr= src_off_start+this_src_off;
1356 src_ptr= (char *)(ALIGN_WORD(src_ptr));
1357
1358 /*
1359 Prepare the fixed-size dynamic attributes, copying out data from the
1360 source row for any that are not NULL.
1361 Note that the fixed-size data is stored in reverse from the end of the
1362 dynamic part of the row. This is true both for the stored/shrunken and
1363 for the expanded form.
1364 */
1365 for(Uint32 i= dynfix; i>0; )
1366 {
1367 i--;
1368 Uint16 j= order[i];
1369 Uint32 fix_size= 4*AttributeDescriptor::getSizeInWords(tabDesc[j]);
1370 dst_off_ptr[dynvar+i]= dst_off;
1371 /* len offset array is not used for fixed size. */
1372 Uint32 pos = AttributeOffset::getNullFlagPos(tabDesc[j+1]);
1373 if(bm_len > (pos >> 5) && BitmaskImpl::get(bm_len, src, pos))
1374 {
1375 assert((UintPtr(dst_ptr)&3) == 0);
1376 memcpy(dst_ptr, src_ptr, fix_size);
1377 src_ptr+= fix_size;
1378 }
1379 dst_off+= fix_size;
1380 dst_ptr+= fix_size;
1381 }
1382
1383 return (Uint32 *)dst_ptr;
1384 }
1385
1386 static
1387 Uint32*
shrink_dyn_part(Dbtup::KeyReqStruct::Var_data * dst,Uint32 * dst_ptr,const Dbtup::Tablerec * tabPtrP,const Uint32 * tabDesc,const Uint16 * order,Uint32 dynvar,Uint32 dynfix,Uint32 ind)1388 shrink_dyn_part(Dbtup::KeyReqStruct::Var_data *dst,
1389 Uint32 *dst_ptr,
1390 const Dbtup::Tablerec* tabPtrP,
1391 const Uint32 * tabDesc,
1392 const Uint16* order,
1393 Uint32 dynvar,
1394 Uint32 dynfix,
1395 Uint32 ind)
1396 {
1397 /**
1398 * Now build the dynamic part, if any.
1399 * First look for any trailing all-NULL words of the bitmap; we do
1400 * not need to store those.
1401 */
1402 assert((UintPtr(dst->m_dyn_data_ptr)&3) == 0);
1403 char *dyn_src_ptr= dst->m_dyn_data_ptr;
1404 Uint32 bm_len = tabPtrP->m_offsets[ind].m_dyn_null_words; // In words
1405
1406 /* If no dynamic variables, store nothing. */
1407 assert(bm_len);
1408 {
1409 /**
1410 * clear bm-len bits, so they won't incorrect indicate
1411 * a non-zero map
1412 */
1413 * ((Uint32 *)dyn_src_ptr) &= ~Uint32(Dbtup::DYN_BM_LEN_MASK);
1414
1415 Uint32 *bm_ptr= (Uint32 *)dyn_src_ptr + bm_len - 1;
1416 while(*bm_ptr == 0)
1417 {
1418 bm_ptr--;
1419 bm_len--;
1420 if(bm_len == 0)
1421 break;
1422 }
1423 }
1424
1425 if (bm_len)
1426 {
1427 /**
1428 * Copy the bitmap, counting the number of variable sized
1429 * attributes that are not NULL on the way.
1430 */
1431 Uint32 *dyn_dst_ptr= dst_ptr;
1432 Uint32 dyn_var_count= 0;
1433 const Uint32 *src_bm_ptr= (Uint32 *)(dyn_src_ptr);
1434 Uint32 *dst_bm_ptr= (Uint32 *)dyn_dst_ptr;
1435
1436 /* ToDo: Put all of the dynattr code inside if(bm_len>0) { ... },
1437 * split to separate function. */
1438 Uint16 dyn_dst_data_offset= 0;
1439 const Uint32 *dyn_bm_var_mask_ptr= tabPtrP->dynVarSizeMask[ind];
1440 for(Uint16 i= 0; i< bm_len; i++)
1441 {
1442 Uint32 v= src_bm_ptr[i];
1443 dyn_var_count+= BitmaskImpl::count_bits(v & *dyn_bm_var_mask_ptr++);
1444 dst_bm_ptr[i]= v;
1445 }
1446
1447 Uint32 tmp = *dyn_dst_ptr;
1448 assert(bm_len <= Dbtup::DYN_BM_LEN_MASK);
1449 * dyn_dst_ptr = (tmp & ~(Uint32)Dbtup::DYN_BM_LEN_MASK) | bm_len;
1450 dyn_dst_ptr+= bm_len;
1451 dyn_dst_data_offset= 2*dyn_var_count + 2;
1452
1453 Uint16 *dyn_src_off_array= dst->m_dyn_offset_arr_ptr;
1454 Uint16 *dyn_src_lenoff_array=
1455 dyn_src_off_array + dst->m_dyn_len_offset;
1456 Uint16* dyn_dst_off_array = (Uint16*)dyn_dst_ptr;
1457
1458 /**
1459 * Copy over the variable sized not-NULL attributes.
1460 * Data offsets are counted from the start of the offset array, and
1461 * we store one additional offset to be able to easily compute the
1462 * data length as the difference between offsets.
1463 */
1464 Uint16 off_idx= 0;
1465 for(Uint32 i= 0; i<dynvar; i++)
1466 {
1467 /**
1468 * Note that we must use the destination (shrunken) bitmap here,
1469 * as the source (expanded) bitmap may have been already clobbered
1470 * (by offset data).
1471 */
1472 Uint32 attrDesc2 = tabDesc[order[dynfix+i]+1];
1473 Uint32 pos = AttributeOffset::getNullFlagPos(attrDesc2);
1474 if (bm_len > (pos >> 5) && BitmaskImpl::get(bm_len, dst_bm_ptr, pos))
1475 {
1476 dyn_dst_off_array[off_idx++]= dyn_dst_data_offset;
1477 Uint32 dyn_src_off= dyn_src_off_array[i];
1478 Uint32 dyn_len= dyn_src_lenoff_array[i] - dyn_src_off;
1479 memmove(((char *)dyn_dst_ptr) + dyn_dst_data_offset,
1480 dyn_src_ptr + dyn_src_off,
1481 dyn_len);
1482 dyn_dst_data_offset+= dyn_len;
1483 }
1484 }
1485 /* If all dynamic attributes are NULL, we store nothing. */
1486 dyn_dst_off_array[off_idx]= dyn_dst_data_offset;
1487 assert(dyn_dst_off_array + off_idx == (Uint16*)dyn_dst_ptr+dyn_var_count);
1488
1489 char *dynvar_end_ptr= ((char *)dyn_dst_ptr) + dyn_dst_data_offset;
1490 char *dyn_dst_data_ptr= (char *)(ALIGN_WORD(dynvar_end_ptr));
1491
1492 /**
1493 * Zero out any padding bytes. Might not be strictly necessary,
1494 * but seems cleaner than leaving random stuff in there.
1495 */
1496 bzero(dynvar_end_ptr, dyn_dst_data_ptr-dynvar_end_ptr);
1497
1498 /* *
1499 * Copy over the fixed-sized not-NULL attributes.
1500 * Note that attributes are copied in reverse order; this is to avoid
1501 * overwriting not-yet-copied data, as the data is also stored in
1502 * reverse order.
1503 */
1504 for(Uint32 i= dynfix; i > 0; )
1505 {
1506 i--;
1507 Uint16 j= order[i];
1508 Uint32 attrDesc2 = tabDesc[j+1];
1509 Uint32 pos = AttributeOffset::getNullFlagPos(attrDesc2);
1510 if(bm_len > (pos >>5 ) && BitmaskImpl::get(bm_len, dst_bm_ptr, pos))
1511 {
1512 Uint32 fixsize=
1513 4*AttributeDescriptor::getSizeInWords(tabDesc[j]);
1514 memmove(dyn_dst_data_ptr,
1515 dyn_src_ptr + dyn_src_off_array[dynvar+i],
1516 fixsize);
1517 dyn_dst_data_ptr += fixsize;
1518 }
1519 }
1520 dst_ptr = (Uint32*)dyn_dst_data_ptr;
1521 assert((UintPtr(dst_ptr) & 3) == 0);
1522 }
1523 return (Uint32 *)dst_ptr;
1524 }
1525
1526 /* ---------------------------------------------------------------- */
1527 /* ----------------------------- INSERT --------------------------- */
1528 /* ---------------------------------------------------------------- */
1529 void
prepare_initial_insert(KeyReqStruct * req_struct,Operationrec * regOperPtr,Tablerec * regTabPtr)1530 Dbtup::prepare_initial_insert(KeyReqStruct *req_struct,
1531 Operationrec* regOperPtr,
1532 Tablerec* regTabPtr)
1533 {
1534 Uint32 disk_undo = regTabPtr->m_no_of_disk_attributes ?
1535 sizeof(Dbtup::Disk_undo::Alloc) >> 2 : 0;
1536 regOperPtr->nextActiveOp= RNIL;
1537 regOperPtr->prevActiveOp= RNIL;
1538 regOperPtr->op_struct.in_active_list= true;
1539 regOperPtr->m_undo_buffer_space= disk_undo;
1540
1541 req_struct->check_offset[MM]= regTabPtr->get_check_offset(MM);
1542 req_struct->check_offset[DD]= regTabPtr->get_check_offset(DD);
1543
1544 Uint32 num_attr= regTabPtr->m_no_of_attributes;
1545 Uint32 descr_start= regTabPtr->tabDescriptor;
1546 Uint32 order_desc= regTabPtr->m_real_order_descriptor;
1547 TableDescriptor *tab_descr= &tableDescriptor[descr_start];
1548 ndbrequire(descr_start + (num_attr << ZAD_LOG_SIZE) <= cnoOfTabDescrRec);
1549 req_struct->attr_descr= tab_descr;
1550 Uint16* order= (Uint16*)&tableDescriptor[order_desc];
1551 order += regTabPtr->m_attributes[MM].m_no_of_fixsize;
1552
1553 Uint32 bits = Tuple_header::COPY_TUPLE;
1554 bits |= disk_undo ? (Tuple_header::DISK_ALLOC|Tuple_header::DISK_INLINE) : 0;
1555
1556 const Uint32 mm_vars= regTabPtr->m_attributes[MM].m_no_of_varsize;
1557 const Uint32 mm_dyns= regTabPtr->m_attributes[MM].m_no_of_dynamic;
1558 const Uint32 mm_dynvar= regTabPtr->m_attributes[MM].m_no_of_dyn_var;
1559 const Uint32 mm_dynfix= regTabPtr->m_attributes[MM].m_no_of_dyn_fix;
1560 const Uint32 dd_vars= regTabPtr->m_attributes[DD].m_no_of_varsize;
1561 Uint32 *ptr= req_struct->m_tuple_ptr->get_end_of_fix_part_ptr(regTabPtr);
1562 Var_part_ref* ref = req_struct->m_tuple_ptr->get_var_part_ref_ptr(regTabPtr);
1563
1564 if (regTabPtr->m_bits & Tablerec::TR_ForceVarPart)
1565 {
1566 ref->m_page_no = RNIL;
1567 ref->m_page_idx = Tup_varsize_page::END_OF_FREE_LIST;
1568 }
1569
1570 if(mm_vars || mm_dyns)
1571 {
1572 jam();
1573 /* Init Varpart_copy struct */
1574 Varpart_copy * cp = (Varpart_copy*)ptr;
1575 cp->m_len = 0;
1576 ptr += Varpart_copy::SZ32;
1577
1578 /* Prepare empty varsize part. */
1579 KeyReqStruct::Var_data* dst= &req_struct->m_var_data[MM];
1580
1581 if (mm_vars)
1582 {
1583 dst->m_data_ptr= (char*)(((Uint16*)ptr)+mm_vars+1);
1584 dst->m_offset_array_ptr= req_struct->var_pos_array;
1585 dst->m_var_len_offset= mm_vars;
1586 dst->m_max_var_offset= regTabPtr->m_offsets[MM].m_max_var_offset;
1587
1588 Uint32 pos= 0;
1589 Uint16 *pos_ptr = req_struct->var_pos_array;
1590 Uint16 *len_ptr = pos_ptr + mm_vars;
1591 for(Uint32 i= 0; i<mm_vars; i++)
1592 {
1593 * pos_ptr++ = pos;
1594 * len_ptr++ = pos;
1595 pos += AttributeDescriptor::getSizeInBytes(tab_descr[*order++].tabDescr);
1596 }
1597
1598 // Disk/dynamic part is 32-bit aligned
1599 ptr = ALIGN_WORD(dst->m_data_ptr+pos);
1600 ndbassert(ptr == ALIGN_WORD(dst->m_data_ptr +
1601 regTabPtr->m_offsets[MM].m_max_var_offset));
1602 }
1603
1604 if (mm_dyns)
1605 {
1606 jam();
1607 /* Prepare empty dynamic part. */
1608 dst->m_dyn_data_ptr= (char *)ptr;
1609 dst->m_dyn_offset_arr_ptr= req_struct->var_pos_array+2*mm_vars;
1610 dst->m_dyn_len_offset= mm_dynvar+mm_dynfix;
1611 dst->m_max_dyn_offset= regTabPtr->m_offsets[MM].m_max_dyn_offset;
1612
1613 ptr = expand_dyn_part(dst, 0, 0,
1614 (Uint32*)tab_descr, order,
1615 mm_dynvar, mm_dynfix,
1616 regTabPtr->m_offsets[MM].m_dyn_null_words);
1617 }
1618
1619 ndbassert((UintPtr(ptr)&3) == 0);
1620 }
1621
1622 req_struct->m_disk_ptr= (Tuple_header*)ptr;
1623
1624 ndbrequire(dd_vars == 0);
1625
1626 req_struct->m_tuple_ptr->m_header_bits= bits;
1627
1628 // Set all null bits
1629 memset(req_struct->m_tuple_ptr->m_null_bits+
1630 regTabPtr->m_offsets[MM].m_null_offset, 0xFF,
1631 4*regTabPtr->m_offsets[MM].m_null_words);
1632 memset(req_struct->m_disk_ptr->m_null_bits+
1633 regTabPtr->m_offsets[DD].m_null_offset, 0xFF,
1634 4*regTabPtr->m_offsets[DD].m_null_words);
1635 }
1636
handleInsertReq(Signal * signal,Ptr<Operationrec> regOperPtr,Ptr<Fragrecord> fragPtr,Tablerec * regTabPtr,KeyReqStruct * req_struct,Local_key ** accminupdateptr)1637 int Dbtup::handleInsertReq(Signal* signal,
1638 Ptr<Operationrec> regOperPtr,
1639 Ptr<Fragrecord> fragPtr,
1640 Tablerec* regTabPtr,
1641 KeyReqStruct *req_struct,
1642 Local_key ** accminupdateptr)
1643 {
1644 Uint32 tup_version = 1;
1645 Fragrecord* regFragPtr = fragPtr.p;
1646 Uint32 *ptr= 0;
1647 Tuple_header *dst;
1648 Tuple_header *base= req_struct->m_tuple_ptr, *org= base;
1649 Tuple_header *tuple_ptr;
1650
1651 bool disk = regTabPtr->m_no_of_disk_attributes > 0;
1652 bool mem_insert = regOperPtr.p->is_first_operation();
1653 bool disk_insert = mem_insert && disk;
1654 bool vardynsize = (regTabPtr->m_attributes[MM].m_no_of_varsize ||
1655 regTabPtr->m_attributes[MM].m_no_of_dynamic);
1656 bool varalloc = vardynsize || regTabPtr->m_bits & Tablerec::TR_ForceVarPart;
1657 bool rowid = req_struct->m_use_rowid;
1658 bool update_acc = false;
1659 Uint32 real_page_id = regOperPtr.p->m_tuple_location.m_page_no;
1660 Uint32 frag_page_id = req_struct->frag_page_id;
1661
1662 union {
1663 Uint32 sizes[4];
1664 Uint64 cmp[2];
1665 };
1666 cmp[0] = cmp[1] = 0;
1667
1668 if (ERROR_INSERTED(4014))
1669 {
1670 dst = 0;
1671 goto undo_buffer_error;
1672 }
1673
1674 dst= alloc_copy_tuple(regTabPtr, ®OperPtr.p->m_copy_tuple_location);
1675
1676 if (unlikely(dst == 0))
1677 {
1678 goto undo_buffer_error;
1679 }
1680 tuple_ptr= req_struct->m_tuple_ptr= dst;
1681 set_change_mask_info(regTabPtr, get_change_mask_ptr(regTabPtr, dst));
1682
1683 if(mem_insert)
1684 {
1685 jam();
1686 prepare_initial_insert(req_struct, regOperPtr.p, regTabPtr);
1687 }
1688 else
1689 {
1690 Operationrec* prevOp= req_struct->prevOpPtr.p;
1691 ndbassert(prevOp->op_struct.op_type == ZDELETE);
1692 tup_version= prevOp->tupVersion + 1;
1693
1694 if(!prevOp->is_first_operation())
1695 org= get_copy_tuple(&prevOp->m_copy_tuple_location);
1696 if (regTabPtr->need_expand())
1697 {
1698 expand_tuple(req_struct, sizes, org, regTabPtr, !disk_insert);
1699 memset(req_struct->m_disk_ptr->m_null_bits+
1700 regTabPtr->m_offsets[DD].m_null_offset, 0xFF,
1701 4*regTabPtr->m_offsets[DD].m_null_words);
1702
1703 Uint32 bm_size_in_bytes= 4*(regTabPtr->m_offsets[MM].m_dyn_null_words);
1704 if (bm_size_in_bytes)
1705 {
1706 Uint32* ptr =
1707 (Uint32*)req_struct->m_var_data[MM].m_dyn_data_ptr;
1708 bzero(ptr, bm_size_in_bytes);
1709 * ptr = bm_size_in_bytes >> 2;
1710 }
1711 }
1712 else
1713 {
1714 memcpy(dst, org, 4*regTabPtr->m_offsets[MM].m_fix_header_size);
1715 tuple_ptr->m_header_bits |= Tuple_header::COPY_TUPLE;
1716 }
1717 memset(tuple_ptr->m_null_bits+
1718 regTabPtr->m_offsets[MM].m_null_offset, 0xFF,
1719 4*regTabPtr->m_offsets[MM].m_null_words);
1720 }
1721
1722 int res;
1723 if (disk_insert)
1724 {
1725 if (ERROR_INSERTED(4015))
1726 {
1727 terrorCode = 1501;
1728 goto log_space_error;
1729 }
1730
1731 D("Logfile_client - handleInsertReq");
1732 Logfile_client lgman(this, c_lgman, regFragPtr->m_logfile_group_id);
1733 res= lgman.alloc_log_space(regOperPtr.p->m_undo_buffer_space);
1734 if(unlikely(res))
1735 {
1736 terrorCode= res;
1737 goto log_space_error;
1738 }
1739 }
1740
1741 regOperPtr.p->tupVersion= tup_version & ZTUP_VERSION_MASK;
1742 tuple_ptr->set_tuple_version(tup_version);
1743
1744 if (ERROR_INSERTED(4016))
1745 {
1746 terrorCode = ZAI_INCONSISTENCY_ERROR;
1747 goto update_error;
1748 }
1749
1750 if (regTabPtr->m_bits & Tablerec::TR_ExtraRowAuthorBits)
1751 {
1752 Uint32 attrId =
1753 regTabPtr->getExtraAttrId<Tablerec::TR_ExtraRowAuthorBits>();
1754
1755 store_extra_row_bits(attrId, regTabPtr, tuple_ptr, /* default */ 0, false);
1756 }
1757
1758 if (!regTabPtr->m_default_value_location.isNull())
1759 {
1760 jam();
1761 Uint32 default_values_len;
1762 /* Get default values ptr + len for this table */
1763 Uint32* default_values = get_default_ptr(regTabPtr, default_values_len);
1764 ndbrequire(default_values_len != 0 && default_values != NULL);
1765 /*
1766 * Update default values into row first,
1767 * next update with data received from the client.
1768 */
1769 if(unlikely((res = updateAttributes(req_struct, default_values,
1770 default_values_len)) < 0))
1771 {
1772 jam();
1773 terrorCode = Uint32(-res);
1774 goto update_error;
1775 }
1776 }
1777
1778 if(unlikely((res = updateAttributes(req_struct, &cinBuffer[0],
1779 req_struct->attrinfo_len)) < 0))
1780 {
1781 terrorCode = Uint32(-res);
1782 goto update_error;
1783 }
1784
1785 if (ERROR_INSERTED(4017))
1786 {
1787 goto null_check_error;
1788 }
1789 if (unlikely(checkNullAttributes(req_struct, regTabPtr) == false))
1790 {
1791 goto null_check_error;
1792 }
1793
1794 if (req_struct->m_is_lcp)
1795 {
1796 jam();
1797 sizes[2+MM] = req_struct->m_lcp_varpart_len;
1798 }
1799 else if (regTabPtr->need_shrink())
1800 {
1801 shrink_tuple(req_struct, sizes+2, regTabPtr, true);
1802 }
1803
1804 if (ERROR_INSERTED(4025))
1805 {
1806 goto mem_error;
1807 }
1808
1809 if (ERROR_INSERTED(4026))
1810 {
1811 CLEAR_ERROR_INSERT_VALUE;
1812 goto mem_error;
1813 }
1814
1815 if (ERROR_INSERTED(4027) && (rand() % 100) > 25)
1816 {
1817 goto mem_error;
1818 }
1819
1820 if (ERROR_INSERTED(4028) && (rand() % 100) > 25)
1821 {
1822 CLEAR_ERROR_INSERT_VALUE;
1823 goto mem_error;
1824 }
1825
1826 /**
1827 * Alloc memory
1828 */
1829 if(mem_insert)
1830 {
1831 if (!rowid)
1832 {
1833 if (ERROR_INSERTED(4018))
1834 {
1835 goto mem_error;
1836 }
1837
1838 if (!varalloc)
1839 {
1840 jam();
1841 ptr= alloc_fix_rec(&terrorCode,
1842 regFragPtr,
1843 regTabPtr,
1844 ®OperPtr.p->m_tuple_location,
1845 &frag_page_id);
1846 }
1847 else
1848 {
1849 jam();
1850 regOperPtr.p->m_tuple_location.m_file_no= sizes[2+MM];
1851 ptr= alloc_var_rec(&terrorCode,
1852 regFragPtr, regTabPtr,
1853 sizes[2+MM],
1854 ®OperPtr.p->m_tuple_location,
1855 &frag_page_id);
1856 }
1857 if (unlikely(ptr == 0))
1858 {
1859 goto mem_error;
1860 }
1861 req_struct->m_use_rowid = true;
1862 }
1863 else
1864 {
1865 regOperPtr.p->m_tuple_location = req_struct->m_row_id;
1866 if (ERROR_INSERTED(4019))
1867 {
1868 terrorCode = ZROWID_ALLOCATED;
1869 goto alloc_rowid_error;
1870 }
1871
1872 if (!varalloc)
1873 {
1874 jam();
1875 ptr= alloc_fix_rowid(&terrorCode,
1876 regFragPtr,
1877 regTabPtr,
1878 ®OperPtr.p->m_tuple_location,
1879 &frag_page_id);
1880 }
1881 else
1882 {
1883 jam();
1884 regOperPtr.p->m_tuple_location.m_file_no= sizes[2+MM];
1885 ptr= alloc_var_rowid(&terrorCode,
1886 regFragPtr, regTabPtr,
1887 sizes[2+MM],
1888 ®OperPtr.p->m_tuple_location,
1889 &frag_page_id);
1890 }
1891 if (unlikely(ptr == 0))
1892 {
1893 jam();
1894 goto alloc_rowid_error;
1895 }
1896 }
1897 real_page_id = regOperPtr.p->m_tuple_location.m_page_no;
1898 update_acc = true; /* Will be updated later once success is known */
1899
1900 base = (Tuple_header*)ptr;
1901 base->m_operation_ptr_i= regOperPtr.i;
1902 base->m_header_bits= Tuple_header::ALLOC |
1903 (sizes[2+MM] > 0 ? Tuple_header::VAR_PART : 0);
1904 }
1905 else
1906 {
1907 if (ERROR_INSERTED(4020))
1908 {
1909 goto size_change_error;
1910 }
1911
1912 if (regTabPtr->need_shrink() && cmp[0] != cmp[1] &&
1913 unlikely(handle_size_change_after_update(req_struct,
1914 base,
1915 regOperPtr.p,
1916 regFragPtr,
1917 regTabPtr,
1918 sizes) != 0))
1919 {
1920 goto size_change_error;
1921 }
1922 req_struct->m_use_rowid = false;
1923 base->m_header_bits &= ~(Uint32)Tuple_header::FREE;
1924 }
1925
1926 if (disk_insert)
1927 {
1928 Local_key tmp;
1929 Uint32 size= regTabPtr->m_attributes[DD].m_no_of_varsize == 0 ?
1930 1 : sizes[2+DD];
1931
1932 if (ERROR_INSERTED(4021))
1933 {
1934 terrorCode = 1601;
1935 goto disk_prealloc_error;
1936 }
1937
1938 int ret= disk_page_prealloc(signal, fragPtr, &tmp, size);
1939 if (unlikely(ret < 0))
1940 {
1941 terrorCode = -ret;
1942 goto disk_prealloc_error;
1943 }
1944
1945 regOperPtr.p->op_struct.m_disk_preallocated= 1;
1946 tmp.m_page_idx= size;
1947 memcpy(tuple_ptr->get_disk_ref_ptr(regTabPtr), &tmp, sizeof(tmp));
1948
1949 /**
1950 * Set ref from disk to mm
1951 */
1952 Local_key ref = regOperPtr.p->m_tuple_location;
1953 ref.m_page_no = frag_page_id;
1954
1955 Tuple_header* disk_ptr= req_struct->m_disk_ptr;
1956 disk_ptr->m_header_bits = 0;
1957 disk_ptr->m_base_record_ref= ref.ref();
1958 }
1959
1960 if (req_struct->m_reorg)
1961 {
1962 handle_reorg(req_struct, regFragPtr->fragStatus);
1963 }
1964
1965 /* Have been successful with disk + mem, update ACC to point to
1966 * new record if necessary
1967 * Failures in disk alloc will skip this part
1968 */
1969 if (update_acc)
1970 {
1971 /* Acc stores the local key with the frag_page_id rather
1972 * than the real_page_id
1973 */
1974 ndbassert(regOperPtr.p->m_tuple_location.m_page_no == real_page_id);
1975
1976 Local_key accKey = regOperPtr.p->m_tuple_location;
1977 accKey.m_page_no = frag_page_id;
1978 ** accminupdateptr = accKey;
1979 }
1980 else
1981 {
1982 * accminupdateptr = 0; // No accminupdate should be performed
1983 }
1984
1985 if (regTabPtr->m_bits & Tablerec::TR_Checksum)
1986 {
1987 jam();
1988 setChecksum(req_struct->m_tuple_ptr, regTabPtr);
1989 }
1990
1991 set_tuple_state(regOperPtr.p, TUPLE_PREPARED);
1992
1993 return 0;
1994
1995 size_change_error:
1996 jam();
1997 terrorCode = ZMEM_NOMEM_ERROR;
1998 goto exit_error;
1999
2000 undo_buffer_error:
2001 jam();
2002 terrorCode= ZMEM_NOMEM_ERROR;
2003 regOperPtr.p->m_undo_buffer_space = 0;
2004 if (mem_insert)
2005 regOperPtr.p->m_tuple_location.setNull();
2006 regOperPtr.p->m_copy_tuple_location.setNull();
2007 tupkeyErrorLab(req_struct);
2008 return -1;
2009
2010 null_check_error:
2011 jam();
2012 terrorCode= ZNO_ILLEGAL_NULL_ATTR;
2013 goto update_error;
2014
2015 mem_error:
2016 jam();
2017 terrorCode= ZMEM_NOMEM_ERROR;
2018 goto update_error;
2019
2020 log_space_error:
2021 jam();
2022 regOperPtr.p->m_undo_buffer_space = 0;
2023 alloc_rowid_error:
2024 jam();
2025 update_error:
2026 jam();
2027 if (mem_insert)
2028 {
2029 regOperPtr.p->op_struct.in_active_list = false;
2030 regOperPtr.p->m_tuple_location.setNull();
2031 }
2032 exit_error:
2033 tupkeyErrorLab(req_struct);
2034 return -1;
2035
2036 disk_prealloc_error:
2037 base->m_header_bits |= Tuple_header::FREED;
2038 goto exit_error;
2039 }
2040
2041 /* ---------------------------------------------------------------- */
2042 /* ---------------------------- DELETE ---------------------------- */
2043 /* ---------------------------------------------------------------- */
handleDeleteReq(Signal * signal,Operationrec * regOperPtr,Fragrecord * regFragPtr,Tablerec * regTabPtr,KeyReqStruct * req_struct,bool disk)2044 int Dbtup::handleDeleteReq(Signal* signal,
2045 Operationrec* regOperPtr,
2046 Fragrecord* regFragPtr,
2047 Tablerec* regTabPtr,
2048 KeyReqStruct *req_struct,
2049 bool disk)
2050 {
2051 Tuple_header* dst = alloc_copy_tuple(regTabPtr,
2052 ®OperPtr->m_copy_tuple_location);
2053 if (dst == 0) {
2054 terrorCode = ZMEM_NOMEM_ERROR;
2055 goto error;
2056 }
2057
2058 // delete must set but not increment tupVersion
2059 if (!regOperPtr->is_first_operation())
2060 {
2061 Operationrec* prevOp= req_struct->prevOpPtr.p;
2062 regOperPtr->tupVersion= prevOp->tupVersion;
2063 // make copy since previous op is committed before this one
2064 const Tuple_header* org = get_copy_tuple(&prevOp->m_copy_tuple_location);
2065 Uint32 len = regTabPtr->total_rec_size -
2066 Uint32(((Uint32*)dst) -
2067 get_copy_tuple_raw(®OperPtr->m_copy_tuple_location));
2068 memcpy(dst, org, 4 * len);
2069 req_struct->m_tuple_ptr = dst;
2070 }
2071 else
2072 {
2073 regOperPtr->tupVersion= req_struct->m_tuple_ptr->get_tuple_version();
2074 if (regTabPtr->m_no_of_disk_attributes)
2075 {
2076 dst->m_header_bits = req_struct->m_tuple_ptr->m_header_bits;
2077 memcpy(dst->get_disk_ref_ptr(regTabPtr),
2078 req_struct->m_tuple_ptr->get_disk_ref_ptr(regTabPtr),
2079 sizeof(Local_key));
2080 }
2081 }
2082 req_struct->changeMask.set();
2083 set_change_mask_info(regTabPtr, get_change_mask_ptr(regTabPtr, dst));
2084
2085 if(disk && regOperPtr->m_undo_buffer_space == 0)
2086 {
2087 regOperPtr->op_struct.m_wait_log_buffer = 1;
2088 regOperPtr->op_struct.m_load_diskpage_on_commit = 1;
2089 Uint32 sz= regOperPtr->m_undo_buffer_space=
2090 (sizeof(Dbtup::Disk_undo::Free) >> 2) +
2091 regTabPtr->m_offsets[DD].m_fix_header_size - 1;
2092
2093 D("Logfile_client - handleDeleteReq");
2094 Logfile_client lgman(this, c_lgman, regFragPtr->m_logfile_group_id);
2095 terrorCode= lgman.alloc_log_space(sz);
2096 if(unlikely(terrorCode))
2097 {
2098 regOperPtr->m_undo_buffer_space= 0;
2099 goto error;
2100 }
2101 }
2102
2103 set_tuple_state(regOperPtr, TUPLE_PREPARED);
2104
2105 if (req_struct->attrinfo_len == 0)
2106 {
2107 return 0;
2108 }
2109
2110 if (regTabPtr->need_expand(disk))
2111 {
2112 prepare_read(req_struct, regTabPtr, disk);
2113 }
2114
2115 {
2116 Uint32 RlogSize;
2117 int ret= handleReadReq(signal, regOperPtr, regTabPtr, req_struct);
2118 if (ret == 0 && (RlogSize= req_struct->log_size))
2119 {
2120 jam();
2121 sendLogAttrinfo(signal, req_struct, RlogSize, regOperPtr);
2122 }
2123 return ret;
2124 }
2125
2126 error:
2127 tupkeyErrorLab(req_struct);
2128 return -1;
2129 }
2130
2131 int
handleRefreshReq(Signal * signal,Ptr<Operationrec> regOperPtr,Ptr<Fragrecord> regFragPtr,Tablerec * regTabPtr,KeyReqStruct * req_struct,bool disk)2132 Dbtup::handleRefreshReq(Signal* signal,
2133 Ptr<Operationrec> regOperPtr,
2134 Ptr<Fragrecord> regFragPtr,
2135 Tablerec* regTabPtr,
2136 KeyReqStruct *req_struct,
2137 bool disk)
2138 {
2139 /* Here we setup the tuple so that a transition to its current
2140 * state can be observed by SUMA's detached triggers.
2141 *
2142 * If the tuple does not exist then we fabricate a tuple
2143 * so that it can appear to be 'deleted'.
2144 * The fabricated tuple may have invalid NULL values etc.
2145 * If the tuple does exist then we fabricate a null-change
2146 * update to the tuple.
2147 *
2148 * The logic differs depending on whether there are already
2149 * other operations on the tuple in this transaction.
2150 * No other operations (including Refresh) are allowed after
2151 * a refresh.
2152 */
2153 Uint32 refresh_case;
2154 if (regOperPtr.p->is_first_operation())
2155 {
2156 jam();
2157 if (Local_key::isInvalid(req_struct->frag_page_id,
2158 regOperPtr.p->m_tuple_location.m_page_idx))
2159 {
2160 jam();
2161 refresh_case = Operationrec::RF_SINGLE_NOT_EXIST;
2162 //ndbout_c("case 1");
2163 /**
2164 * This is refresh of non-existing tuple...
2165 * i.e "delete", reuse initial insert
2166 */
2167 Local_key accminupdate;
2168 Local_key * accminupdateptr = &accminupdate;
2169
2170 /**
2171 * We don't need ...in this scenario
2172 * - disk
2173 * - default values
2174 */
2175 Uint32 save_disk = regTabPtr->m_no_of_disk_attributes;
2176 Local_key save_defaults = regTabPtr->m_default_value_location;
2177 Bitmask<MAXNROFATTRIBUTESINWORDS> save_mask =
2178 regTabPtr->notNullAttributeMask;
2179
2180 regTabPtr->m_no_of_disk_attributes = 0;
2181 regTabPtr->m_default_value_location.setNull();
2182 regOperPtr.p->op_struct.op_type = ZINSERT;
2183
2184 /**
2185 * Update notNullAttributeMask to only include primary keys
2186 */
2187 regTabPtr->notNullAttributeMask.clear();
2188 const Uint32 * primarykeys =
2189 (Uint32*)&tableDescriptor[regTabPtr->readKeyArray].tabDescr;
2190 for (Uint32 i = 0; i<regTabPtr->noOfKeyAttr; i++)
2191 regTabPtr->notNullAttributeMask.set(primarykeys[i] >> 16);
2192
2193 int res = handleInsertReq(signal, regOperPtr,
2194 regFragPtr, regTabPtr, req_struct,
2195 &accminupdateptr);
2196
2197 regTabPtr->m_no_of_disk_attributes = save_disk;
2198 regTabPtr->m_default_value_location = save_defaults;
2199 regTabPtr->notNullAttributeMask = save_mask;
2200
2201 if (unlikely(res == -1))
2202 {
2203 return -1;
2204 }
2205
2206 regOperPtr.p->op_struct.op_type = ZREFRESH;
2207
2208 if (accminupdateptr)
2209 {
2210 /**
2211 * Update ACC local-key, once *everything* has completed succesfully
2212 */
2213 c_lqh->accminupdate(signal,
2214 regOperPtr.p->userpointer,
2215 accminupdateptr);
2216 }
2217 }
2218 else
2219 {
2220 refresh_case = Operationrec::RF_SINGLE_EXIST;
2221 //ndbout_c("case 2");
2222 jam();
2223
2224 Uint32 tup_version_save = req_struct->m_tuple_ptr->get_tuple_version();
2225 Uint32 new_tup_version = decr_tup_version(tup_version_save);
2226 Tuple_header* origTuple = req_struct->m_tuple_ptr;
2227 origTuple->set_tuple_version(new_tup_version);
2228 int res = handleUpdateReq(signal, regOperPtr.p, regFragPtr.p,
2229 regTabPtr, req_struct, disk);
2230 /* Now we must reset the original tuple header back
2231 * to the original version.
2232 * The copy tuple will have the correct version due to
2233 * the update incrementing it.
2234 * On commit, the tuple becomes the copy tuple.
2235 * On abort, the original tuple remains. If we don't
2236 * reset it here, then aborts cause the version to
2237 * decrease
2238 */
2239 origTuple->set_tuple_version(tup_version_save);
2240 if (res == -1)
2241 return -1;
2242 }
2243 }
2244 else
2245 {
2246 /* Not first operation on tuple in transaction */
2247 jam();
2248
2249 Uint32 tup_version_save = req_struct->prevOpPtr.p->tupVersion;
2250 Uint32 new_tup_version = decr_tup_version(tup_version_save);
2251 req_struct->prevOpPtr.p->tupVersion = new_tup_version;
2252
2253 int res;
2254 if (req_struct->prevOpPtr.p->op_struct.op_type == ZDELETE)
2255 {
2256 refresh_case = Operationrec::RF_MULTI_NOT_EXIST;
2257 //ndbout_c("case 3");
2258
2259 jam();
2260 /**
2261 * We don't need ...in this scenario
2262 * - default values
2263 *
2264 * We keep disk attributes to avoid issues with 'insert'
2265 */
2266 Local_key save_defaults = regTabPtr->m_default_value_location;
2267 Bitmask<MAXNROFATTRIBUTESINWORDS> save_mask =
2268 regTabPtr->notNullAttributeMask;
2269
2270 regTabPtr->m_default_value_location.setNull();
2271 regOperPtr.p->op_struct.op_type = ZINSERT;
2272
2273 /**
2274 * Update notNullAttributeMask to only include primary keys
2275 */
2276 regTabPtr->notNullAttributeMask.clear();
2277 const Uint32 * primarykeys =
2278 (Uint32*)&tableDescriptor[regTabPtr->readKeyArray].tabDescr;
2279 for (Uint32 i = 0; i<regTabPtr->noOfKeyAttr; i++)
2280 regTabPtr->notNullAttributeMask.set(primarykeys[i] >> 16);
2281
2282 /**
2283 * This is multi-update + DELETE + REFRESH
2284 */
2285 Local_key * accminupdateptr = 0;
2286 res = handleInsertReq(signal, regOperPtr,
2287 regFragPtr, regTabPtr, req_struct,
2288 &accminupdateptr);
2289
2290 regTabPtr->m_default_value_location = save_defaults;
2291 regTabPtr->notNullAttributeMask = save_mask;
2292
2293 if (unlikely(res == -1))
2294 {
2295 return -1;
2296 }
2297
2298 regOperPtr.p->op_struct.op_type = ZREFRESH;
2299 }
2300 else
2301 {
2302 jam();
2303 refresh_case = Operationrec::RF_MULTI_EXIST;
2304 //ndbout_c("case 4");
2305 /**
2306 * This is multi-update + INSERT/UPDATE + REFRESH
2307 */
2308 res = handleUpdateReq(signal, regOperPtr.p, regFragPtr.p,
2309 regTabPtr, req_struct, disk);
2310 }
2311 req_struct->prevOpPtr.p->tupVersion = tup_version_save;
2312 if (res == -1)
2313 return -1;
2314 }
2315
2316 /* Store the refresh scenario in the copy tuple location */
2317 // TODO : Verify this is never used as a copy tuple location!
2318 regOperPtr.p->m_copy_tuple_location.m_file_no = refresh_case;
2319 return 0;
2320 }
2321
2322 bool
checkNullAttributes(KeyReqStruct * req_struct,Tablerec * regTabPtr)2323 Dbtup::checkNullAttributes(KeyReqStruct * req_struct,
2324 Tablerec* regTabPtr)
2325 {
2326 // Implement checking of updating all not null attributes in an insert here.
2327 Bitmask<MAXNROFATTRIBUTESINWORDS> attributeMask;
2328 /*
2329 * The idea here is maybe that changeMask is not-null attributes
2330 * and must contain notNullAttributeMask. But:
2331 *
2332 * 1. changeMask has all bits set on insert
2333 * 2. not-null is checked in each UpdateFunction
2334 * 3. the code below does not work except trivially due to 1.
2335 *
2336 * XXX remove or fix
2337 */
2338 attributeMask.clear();
2339 attributeMask.bitOR(req_struct->changeMask);
2340 attributeMask.bitAND(regTabPtr->notNullAttributeMask);
2341 attributeMask.bitXOR(regTabPtr->notNullAttributeMask);
2342 if (!attributeMask.isclear()) {
2343 return false;
2344 }
2345 return true;
2346 }
2347
2348 /* ---------------------------------------------------------------- */
2349 /* THIS IS THE START OF THE INTERPRETED EXECUTION OF UPDATES. WE */
2350 /* START BY LINKING ALL ATTRINFO'S IN A DOUBLY LINKED LIST (THEY ARE*/
2351 /* ALREADY IN A LINKED LIST). WE ALLOCATE A REGISTER MEMORY (EQUAL */
2352 /* TO AN ATTRINFO RECORD). THE INTERPRETER GOES THROUGH FOUR PHASES*/
2353 /* DURING THE FIRST PHASE IT IS ONLY ALLOWED TO READ ATTRIBUTES THAT*/
2354 /* ARE SENT TO THE CLIENT APPLICATION. DURING THE SECOND PHASE IT IS*/
2355 /* ALLOWED TO READ FROM ATTRIBUTES INTO REGISTERS, TO UPDATE */
2356 /* ATTRIBUTES BASED ON EITHER A CONSTANT VALUE OR A REGISTER VALUE, */
2357 /* A DIVERSE SET OF OPERATIONS ON REGISTERS ARE AVAILABLE AS WELL. */
2358 /* IT IS ALSO POSSIBLE TO PERFORM JUMPS WITHIN THE INSTRUCTIONS THAT*/
2359 /* BELONGS TO THE SECOND PHASE. ALSO SUBROUTINES CAN BE CALLED IN */
2360 /* THIS PHASE. THE THIRD PHASE IS TO AGAIN READ ATTRIBUTES AND */
2361 /* FINALLY THE FOURTH PHASE READS SELECTED REGISTERS AND SEND THEM */
2362 /* TO THE CLIENT APPLICATION. */
2363 /* THERE IS A FIFTH REGION WHICH CONTAINS SUBROUTINES CALLABLE FROM */
2364 /* THE INTERPRETER EXECUTION REGION. */
2365 /* THE FIRST FIVE WORDS WILL GIVE THE LENGTH OF THE FIVEE REGIONS */
2366 /* */
2367 /* THIS MEANS THAT FROM THE APPLICATIONS POINT OF VIEW THE DATABASE */
2368 /* CAN HANDLE SUBROUTINE CALLS WHERE THE CODE IS SENT IN THE REQUEST*/
2369 /* THE RETURN PARAMETERS ARE FIXED AND CAN EITHER BE GENERATED */
2370 /* BEFORE THE EXECUTION OF THE ROUTINE OR AFTER. */
2371 /* */
2372 /* IN LATER VERSIONS WE WILL ADD MORE THINGS LIKE THE POSSIBILITY */
2373 /* TO ALLOCATE MEMORY AND USE THIS AS LOCAL STORAGE. IT IS ALSO */
2374 /* IMAGINABLE TO HAVE SPECIAL ROUTINES THAT CAN PERFORM CERTAIN */
2375 /* OPERATIONS ON BLOB'S DEPENDENT ON WHAT THE BLOB REPRESENTS. */
2376 /* */
2377 /* */
2378 /* ----------------------------------------- */
2379 /* + INITIAL READ REGION + */
2380 /* ----------------------------------------- */
2381 /* + INTERPRETED EXECUTE REGION + */
2382 /* ----------------------------------------- */
2383 /* + FINAL UPDATE REGION + */
2384 /* ----------------------------------------- */
2385 /* + FINAL READ REGION + */
2386 /* ----------------------------------------- */
2387 /* + SUBROUTINE REGION + */
2388 /* ----------------------------------------- */
2389 /* ---------------------------------------------------------------- */
2390 /* ---------------------------------------------------------------- */
2391 /* ----------------- INTERPRETED EXECUTION ----------------------- */
2392 /* ---------------------------------------------------------------- */
interpreterStartLab(Signal * signal,KeyReqStruct * req_struct)2393 int Dbtup::interpreterStartLab(Signal* signal,
2394 KeyReqStruct *req_struct)
2395 {
2396 Operationrec * const regOperPtr = req_struct->operPtrP;
2397 int TnoDataRW;
2398 Uint32 RtotalLen, start_index, dstLen;
2399 Uint32 *dst;
2400
2401 Uint32 RinitReadLen= cinBuffer[0];
2402 Uint32 RexecRegionLen= cinBuffer[1];
2403 Uint32 RfinalUpdateLen= cinBuffer[2];
2404 Uint32 RfinalRLen= cinBuffer[3];
2405 Uint32 RsubLen= cinBuffer[4];
2406
2407 Uint32 RattrinbufLen= req_struct->attrinfo_len;
2408 const BlockReference sendBref= req_struct->rec_blockref;
2409
2410 const Uint32 node = refToNode(sendBref);
2411 if(node != 0 && node != getOwnNodeId()) {
2412 start_index= 25;
2413 } else {
2414 jam();
2415 /**
2416 * execute direct
2417 */
2418 start_index= 3;
2419 }
2420 dst= &signal->theData[start_index];
2421 dstLen= (MAX_READ / 4) - start_index;
2422
2423 RtotalLen= RinitReadLen;
2424 RtotalLen += RexecRegionLen;
2425 RtotalLen += RfinalUpdateLen;
2426 RtotalLen += RfinalRLen;
2427 RtotalLen += RsubLen;
2428
2429 Uint32 RattroutCounter= 0;
2430 Uint32 RinstructionCounter= 5;
2431
2432 /* All information to be logged/propagated to replicas
2433 * is generated from here on so reset the log word count
2434 */
2435 Uint32 RlogSize= req_struct->log_size= 0;
2436 if (((RtotalLen + 5) == RattrinbufLen) &&
2437 (RattrinbufLen >= 5) &&
2438 (RattrinbufLen < ZATTR_BUFFER_SIZE)) {
2439 /* ---------------------------------------------------------------- */
2440 // We start by checking consistency. We must have the first five
2441 // words of the ATTRINFO to give us the length of the regions. The
2442 // size of these regions must be the same as the total ATTRINFO
2443 // length and finally the total length must be within the limits.
2444 /* ---------------------------------------------------------------- */
2445
2446 if (RinitReadLen > 0) {
2447 jam();
2448 /* ---------------------------------------------------------------- */
2449 // The first step that can be taken in the interpreter is to read
2450 // data of the tuple before any updates have been applied.
2451 /* ---------------------------------------------------------------- */
2452 TnoDataRW= readAttributes(req_struct,
2453 &cinBuffer[5],
2454 RinitReadLen,
2455 &dst[0],
2456 dstLen,
2457 false);
2458 if (TnoDataRW >= 0) {
2459 RattroutCounter= TnoDataRW;
2460 RinstructionCounter += RinitReadLen;
2461 } else {
2462 jam();
2463 terrorCode = Uint32(-TnoDataRW);
2464 tupkeyErrorLab(req_struct);
2465 return -1;
2466 }
2467 }
2468 if (RexecRegionLen > 0) {
2469 jam();
2470 /* ---------------------------------------------------------------- */
2471 // The next step is the actual interpreted execution. This executes
2472 // a register-based virtual machine which can read and write attributes
2473 // to and from registers.
2474 /* ---------------------------------------------------------------- */
2475 Uint32 RsubPC= RinstructionCounter + RexecRegionLen
2476 + RfinalUpdateLen + RfinalRLen;
2477 TnoDataRW= interpreterNextLab(signal,
2478 req_struct,
2479 &clogMemBuffer[0],
2480 &cinBuffer[RinstructionCounter],
2481 RexecRegionLen,
2482 &cinBuffer[RsubPC],
2483 RsubLen,
2484 &coutBuffer[0],
2485 sizeof(coutBuffer) / 4);
2486 if (TnoDataRW != -1) {
2487 RinstructionCounter += RexecRegionLen;
2488 RlogSize= TnoDataRW;
2489 } else {
2490 jam();
2491 /**
2492 * TUPKEY REF is sent from within interpreter
2493 */
2494 return -1;
2495 }
2496 }
2497
2498 if ((RlogSize > 0) ||
2499 (RfinalUpdateLen > 0))
2500 {
2501 /* Operation updates row,
2502 * reset author pseudo-col before update takes effect
2503 * This should probably occur only if the interpreted program
2504 * did not explicitly write the value, but that requires a bit
2505 * to record whether the value has been written.
2506 */
2507 Tablerec* regTabPtr = req_struct->tablePtrP;
2508 Tuple_header* dst = req_struct->m_tuple_ptr;
2509
2510 if (regTabPtr->m_bits & Tablerec::TR_ExtraRowAuthorBits)
2511 {
2512 Uint32 attrId =
2513 regTabPtr->getExtraAttrId<Tablerec::TR_ExtraRowAuthorBits>();
2514
2515 store_extra_row_bits(attrId, regTabPtr, dst, /* default */ 0, false);
2516 }
2517 }
2518
2519 if (RfinalUpdateLen > 0) {
2520 jam();
2521 /* ---------------------------------------------------------------- */
2522 // We can also apply a set of updates without any conditions as part
2523 // of the interpreted execution.
2524 /* ---------------------------------------------------------------- */
2525 if (regOperPtr->op_struct.op_type == ZUPDATE) {
2526 TnoDataRW= updateAttributes(req_struct,
2527 &cinBuffer[RinstructionCounter],
2528 RfinalUpdateLen);
2529 if (TnoDataRW >= 0) {
2530 MEMCOPY_NO_WORDS(&clogMemBuffer[RlogSize],
2531 &cinBuffer[RinstructionCounter],
2532 RfinalUpdateLen);
2533 RinstructionCounter += RfinalUpdateLen;
2534 RlogSize += RfinalUpdateLen;
2535 } else {
2536 jam();
2537 terrorCode = Uint32(-TnoDataRW);
2538 tupkeyErrorLab(req_struct);
2539 return -1;
2540 }
2541 } else {
2542 return TUPKEY_abort(req_struct, 19);
2543 }
2544 }
2545 if (RfinalRLen > 0) {
2546 jam();
2547 /* ---------------------------------------------------------------- */
2548 // The final action is that we can also read the tuple after it has
2549 // been updated.
2550 /* ---------------------------------------------------------------- */
2551 TnoDataRW= readAttributes(req_struct,
2552 &cinBuffer[RinstructionCounter],
2553 RfinalRLen,
2554 &dst[RattroutCounter],
2555 (dstLen - RattroutCounter),
2556 false);
2557 if (TnoDataRW >= 0) {
2558 RattroutCounter += TnoDataRW;
2559 } else {
2560 jam();
2561 terrorCode = Uint32(-TnoDataRW);
2562 tupkeyErrorLab(req_struct);
2563 return -1;
2564 }
2565 }
2566 /* Add log words explicitly generated here to existing log size
2567 * - readAttributes can generate log for ANYVALUE column
2568 * It adds the words directly to req_struct->log_size
2569 * This is used for ANYVALUE and interpreted delete.
2570 */
2571 req_struct->log_size+= RlogSize;
2572 req_struct->read_length += RattroutCounter;
2573 sendReadAttrinfo(signal, req_struct, RattroutCounter, regOperPtr);
2574 if (RlogSize > 0) {
2575 return sendLogAttrinfo(signal, req_struct, RlogSize, regOperPtr);
2576 }
2577 return 0;
2578 } else {
2579 return TUPKEY_abort(req_struct, 22);
2580 }
2581 }
2582
2583 /* ---------------------------------------------------------------- */
2584 /* WHEN EXECUTION IS INTERPRETED WE NEED TO SEND SOME ATTRINFO*/
2585 /* BACK TO LQH FOR LOGGING AND SENDING TO BACKUP AND STANDBY */
2586 /* NODES. */
2587 /* INPUT: LOG_ATTRINFOPTR WHERE TO FETCH DATA FROM */
2588 /* TLOG_START FIRST INDEX TO LOG */
2589 /* TLOG_END LAST INDEX + 1 TO LOG */
2590 /* ---------------------------------------------------------------- */
sendLogAttrinfo(Signal * signal,KeyReqStruct * req_struct,Uint32 TlogSize,Operationrec * const regOperPtr)2591 int Dbtup::sendLogAttrinfo(Signal* signal,
2592 KeyReqStruct * req_struct,
2593 Uint32 TlogSize,
2594 Operationrec * const regOperPtr)
2595
2596 {
2597 /* Copy from Log buffer to segmented section,
2598 * then attach to ATTRINFO and execute direct
2599 * to LQH
2600 */
2601 ndbrequire( TlogSize > 0 );
2602 Uint32 longSectionIVal= RNIL;
2603 bool ok= appendToSection(longSectionIVal,
2604 &clogMemBuffer[0],
2605 TlogSize);
2606 if (unlikely(!ok))
2607 {
2608 /* Resource error, abort transaction */
2609 terrorCode = ZSEIZE_ATTRINBUFREC_ERROR;
2610 tupkeyErrorLab(req_struct);
2611 return -1;
2612 }
2613
2614 /* Send a TUP_ATTRINFO signal to LQH, which contains
2615 * the relevant user pointer and the attrinfo section's
2616 * IVAL
2617 */
2618 signal->theData[0]= regOperPtr->userpointer;
2619 signal->theData[1]= TlogSize;
2620 signal->theData[2]= longSectionIVal;
2621
2622 EXECUTE_DIRECT(DBLQH,
2623 GSN_TUP_ATTRINFO,
2624 signal,
2625 3);
2626 return 0;
2627 }
2628
2629 inline
2630 Uint32
brancher(Uint32 TheInstruction,Uint32 TprogramCounter)2631 Dbtup::brancher(Uint32 TheInstruction, Uint32 TprogramCounter)
2632 {
2633 Uint32 TbranchDirection= TheInstruction >> 31;
2634 Uint32 TbranchLength= (TheInstruction >> 16) & 0x7fff;
2635 TprogramCounter--;
2636 if (TbranchDirection == 1) {
2637 jam();
2638 /* ---------------------------------------------------------------- */
2639 /* WE JUMP BACKWARDS. */
2640 /* ---------------------------------------------------------------- */
2641 return (TprogramCounter - TbranchLength);
2642 } else {
2643 jam();
2644 /* ---------------------------------------------------------------- */
2645 /* WE JUMP FORWARD. */
2646 /* ---------------------------------------------------------------- */
2647 return (TprogramCounter + TbranchLength);
2648 }
2649 }
2650
2651 const Uint32 *
lookupInterpreterParameter(Uint32 paramNo,const Uint32 * subptr,Uint32 sublen) const2652 Dbtup::lookupInterpreterParameter(Uint32 paramNo,
2653 const Uint32 * subptr,
2654 Uint32 sublen) const
2655 {
2656 /**
2657 * The parameters...are stored in the subroutine section
2658 *
2659 * WORD2 WORD3 WORD4 WORD5
2660 * [ P0 HEADER ] [ P0 DATA ] [ P1 HEADER ] [ P1 DATA ]
2661 *
2662 *
2663 * len=4 <=> 1 word
2664 */
2665 Uint32 pos = 0;
2666 while (paramNo)
2667 {
2668 const Uint32 * head = subptr + pos;
2669 Uint32 len = AttributeHeader::getDataSize(* head);
2670 paramNo --;
2671 pos += 1 + len;
2672 if (unlikely(pos >= sublen))
2673 return 0;
2674 }
2675
2676 const Uint32 * head = subptr + pos;
2677 Uint32 len = AttributeHeader::getDataSize(* head);
2678 if (unlikely(pos + 1 + len > sublen))
2679 return 0;
2680
2681 return head;
2682 }
2683
interpreterNextLab(Signal * signal,KeyReqStruct * req_struct,Uint32 * logMemory,Uint32 * mainProgram,Uint32 TmainProgLen,Uint32 * subroutineProg,Uint32 TsubroutineLen,Uint32 * tmpArea,Uint32 tmpAreaSz)2684 int Dbtup::interpreterNextLab(Signal* signal,
2685 KeyReqStruct* req_struct,
2686 Uint32* logMemory,
2687 Uint32* mainProgram,
2688 Uint32 TmainProgLen,
2689 Uint32* subroutineProg,
2690 Uint32 TsubroutineLen,
2691 Uint32 * tmpArea,
2692 Uint32 tmpAreaSz)
2693 {
2694 register Uint32* TcurrentProgram= mainProgram;
2695 register Uint32 TcurrentSize= TmainProgLen;
2696 register Uint32 RnoOfInstructions= 0;
2697 register Uint32 TprogramCounter= 0;
2698 register Uint32 theInstruction;
2699 register Uint32 theRegister;
2700 Uint32 TdataWritten= 0;
2701 Uint32 RstackPtr= 0;
2702 union {
2703 Uint32 TregMemBuffer[32];
2704 Uint64 align[16];
2705 };
2706 (void)align; // kill warning
2707 Uint32 TstackMemBuffer[32];
2708
2709 /* ---------------------------------------------------------------- */
2710 // Initialise all 8 registers to contain the NULL value.
2711 // In this version we can handle 32 and 64 bit unsigned integers.
2712 // They are handled as 64 bit values. Thus the 32 most significant
2713 // bits are zeroed for 32 bit values.
2714 /* ---------------------------------------------------------------- */
2715 TregMemBuffer[0]= 0;
2716 TregMemBuffer[4]= 0;
2717 TregMemBuffer[8]= 0;
2718 TregMemBuffer[12]= 0;
2719 TregMemBuffer[16]= 0;
2720 TregMemBuffer[20]= 0;
2721 TregMemBuffer[24]= 0;
2722 TregMemBuffer[28]= 0;
2723 Uint32 tmpHabitant= ~0;
2724
2725 while (RnoOfInstructions < 8000) {
2726 /* ---------------------------------------------------------------- */
2727 /* EXECUTE THE NEXT INTERPRETER INSTRUCTION. */
2728 /* ---------------------------------------------------------------- */
2729 RnoOfInstructions++;
2730 theInstruction= TcurrentProgram[TprogramCounter];
2731 theRegister= Interpreter::getReg1(theInstruction) << 2;
2732 #ifdef TRACE_INTERPRETER
2733 ndbout_c("Interpreter : RnoOfInstructions : %u. TprogramCounter : %u. Opcode : %u",
2734 RnoOfInstructions, TprogramCounter, Interpreter::getOpCode(theInstruction));
2735 #endif
2736 if (TprogramCounter < TcurrentSize) {
2737 TprogramCounter++;
2738 switch (Interpreter::getOpCode(theInstruction)) {
2739 case Interpreter::READ_ATTR_INTO_REG:
2740 jam();
2741 /* ---------------------------------------------------------------- */
2742 // Read an attribute from the tuple into a register.
2743 // While reading an attribute we allow the attribute to be an array
2744 // as long as it fits in the 64 bits of the register.
2745 /* ---------------------------------------------------------------- */
2746 {
2747 Uint32 theAttrinfo= theInstruction;
2748 int TnoDataRW= readAttributes(req_struct,
2749 &theAttrinfo,
2750 (Uint32)1,
2751 &TregMemBuffer[theRegister],
2752 (Uint32)3,
2753 false);
2754 if (TnoDataRW == 2) {
2755 /* ------------------------------------------------------------- */
2756 // Two words read means that we get the instruction plus one 32
2757 // word read. Thus we set the register to be a 32 bit register.
2758 /* ------------------------------------------------------------- */
2759 TregMemBuffer[theRegister]= 0x50;
2760 // arithmetic conversion if big-endian
2761 * (Int64*)(TregMemBuffer+theRegister+2)= TregMemBuffer[theRegister+1];
2762 } else if (TnoDataRW == 3) {
2763 /* ------------------------------------------------------------- */
2764 // Three words read means that we get the instruction plus two
2765 // 32 words read. Thus we set the register to be a 64 bit register.
2766 /* ------------------------------------------------------------- */
2767 TregMemBuffer[theRegister]= 0x60;
2768 TregMemBuffer[theRegister+3]= TregMemBuffer[theRegister+2];
2769 TregMemBuffer[theRegister+2]= TregMemBuffer[theRegister+1];
2770 } else if (TnoDataRW == 1) {
2771 /* ------------------------------------------------------------- */
2772 // One word read means that we must have read a NULL value. We set
2773 // the register to indicate a NULL value.
2774 /* ------------------------------------------------------------- */
2775 TregMemBuffer[theRegister]= 0;
2776 TregMemBuffer[theRegister + 2]= 0;
2777 TregMemBuffer[theRegister + 3]= 0;
2778 } else if (TnoDataRW < 0) {
2779 jam();
2780 terrorCode = Uint32(-TnoDataRW);
2781 tupkeyErrorLab(req_struct);
2782 return -1;
2783 } else {
2784 /* ------------------------------------------------------------- */
2785 // Any other return value from the read attribute here is not
2786 // allowed and will lead to a system crash.
2787 /* ------------------------------------------------------------- */
2788 ndbrequire(false);
2789 }
2790 break;
2791 }
2792
2793 case Interpreter::WRITE_ATTR_FROM_REG:
2794 jam();
2795 {
2796 Uint32 TattrId= theInstruction >> 16;
2797 Uint32 TattrDescrIndex= req_struct->tablePtrP->tabDescriptor +
2798 (TattrId << ZAD_LOG_SIZE);
2799 Uint32 TattrDesc1= tableDescriptor[TattrDescrIndex].tabDescr;
2800 Uint32 TregType= TregMemBuffer[theRegister];
2801
2802 /* --------------------------------------------------------------- */
2803 // Calculate the number of words of this attribute.
2804 // We allow writes into arrays as long as they fit into the 64 bit
2805 // register size.
2806 /* --------------------------------------------------------------- */
2807 Uint32 TattrNoOfWords = AttributeDescriptor::getSizeInWords(TattrDesc1);
2808 Uint32 Toptype = req_struct->operPtrP->op_struct.op_type;
2809 Uint32 TdataForUpdate[3];
2810 Uint32 Tlen;
2811
2812 AttributeHeader ah(TattrId, TattrNoOfWords << 2);
2813 TdataForUpdate[0]= ah.m_value;
2814 TdataForUpdate[1]= TregMemBuffer[theRegister + 2];
2815 TdataForUpdate[2]= TregMemBuffer[theRegister + 3];
2816 Tlen= TattrNoOfWords + 1;
2817 if (Toptype == ZUPDATE) {
2818 if (TattrNoOfWords <= 2) {
2819 if (TattrNoOfWords == 1) {
2820 // arithmetic conversion if big-endian
2821 Int64 * tmp = new (&TregMemBuffer[theRegister + 2]) Int64;
2822 TdataForUpdate[1] = Uint32(* tmp);
2823 TdataForUpdate[2] = 0;
2824 }
2825 if (TregType == 0) {
2826 /* --------------------------------------------------------- */
2827 // Write a NULL value into the attribute
2828 /* --------------------------------------------------------- */
2829 ah.setNULL();
2830 TdataForUpdate[0]= ah.m_value;
2831 Tlen= 1;
2832 }
2833 int TnoDataRW= updateAttributes(req_struct,
2834 &TdataForUpdate[0],
2835 Tlen);
2836 if (TnoDataRW >= 0) {
2837 /* --------------------------------------------------------- */
2838 // Write the written data also into the log buffer so that it
2839 // will be logged.
2840 /* --------------------------------------------------------- */
2841 logMemory[TdataWritten + 0]= TdataForUpdate[0];
2842 logMemory[TdataWritten + 1]= TdataForUpdate[1];
2843 logMemory[TdataWritten + 2]= TdataForUpdate[2];
2844 TdataWritten += Tlen;
2845 } else {
2846 terrorCode = Uint32(-TnoDataRW);
2847 tupkeyErrorLab(req_struct);
2848 return -1;
2849 }
2850 } else {
2851 return TUPKEY_abort(req_struct, 15);
2852 }
2853 } else {
2854 return TUPKEY_abort(req_struct, 16);
2855 }
2856 break;
2857 }
2858
2859 case Interpreter::LOAD_CONST_NULL:
2860 jam();
2861 TregMemBuffer[theRegister]= 0; /* NULL INDICATOR */
2862 break;
2863
2864 case Interpreter::LOAD_CONST16:
2865 jam();
2866 TregMemBuffer[theRegister]= 0x50; /* 32 BIT UNSIGNED CONSTANT */
2867 * (Int64*)(TregMemBuffer+theRegister+2)= theInstruction >> 16;
2868 break;
2869
2870 case Interpreter::LOAD_CONST32:
2871 jam();
2872 TregMemBuffer[theRegister]= 0x50; /* 32 BIT UNSIGNED CONSTANT */
2873 * (Int64*)(TregMemBuffer+theRegister+2)= *
2874 (TcurrentProgram+TprogramCounter);
2875 TprogramCounter++;
2876 break;
2877
2878 case Interpreter::LOAD_CONST64:
2879 jam();
2880 TregMemBuffer[theRegister]= 0x60; /* 64 BIT UNSIGNED CONSTANT */
2881 TregMemBuffer[theRegister + 2 ]= * (TcurrentProgram +
2882 TprogramCounter++);
2883 TregMemBuffer[theRegister + 3 ]= * (TcurrentProgram +
2884 TprogramCounter++);
2885 break;
2886
2887 case Interpreter::ADD_REG_REG:
2888 jam();
2889 {
2890 Uint32 TrightRegister= Interpreter::getReg2(theInstruction) << 2;
2891 Uint32 TdestRegister= Interpreter::getReg3(theInstruction) << 2;
2892
2893 Uint32 TrightType= TregMemBuffer[TrightRegister];
2894 Int64 Tright0= * (Int64*)(TregMemBuffer + TrightRegister + 2);
2895
2896
2897 Uint32 TleftType= TregMemBuffer[theRegister];
2898 Int64 Tleft0= * (Int64*)(TregMemBuffer + theRegister + 2);
2899
2900 if ((TleftType | TrightType) != 0) {
2901 Uint64 Tdest0= Tleft0 + Tright0;
2902 * (Int64*)(TregMemBuffer+TdestRegister+2)= Tdest0;
2903 TregMemBuffer[TdestRegister]= 0x60;
2904 } else {
2905 return TUPKEY_abort(req_struct, 20);
2906 }
2907 break;
2908 }
2909
2910 case Interpreter::SUB_REG_REG:
2911 jam();
2912 {
2913 Uint32 TrightRegister= Interpreter::getReg2(theInstruction) << 2;
2914 Uint32 TdestRegister= Interpreter::getReg3(theInstruction) << 2;
2915
2916 Uint32 TrightType= TregMemBuffer[TrightRegister];
2917 Int64 Tright0= * (Int64*)(TregMemBuffer + TrightRegister + 2);
2918
2919 Uint32 TleftType= TregMemBuffer[theRegister];
2920 Int64 Tleft0= * (Int64*)(TregMemBuffer + theRegister + 2);
2921
2922 if ((TleftType | TrightType) != 0) {
2923 Int64 Tdest0= Tleft0 - Tright0;
2924 * (Int64*)(TregMemBuffer+TdestRegister+2)= Tdest0;
2925 TregMemBuffer[TdestRegister]= 0x60;
2926 } else {
2927 return TUPKEY_abort(req_struct, 20);
2928 }
2929 break;
2930 }
2931
2932 case Interpreter::BRANCH:
2933 TprogramCounter= brancher(theInstruction, TprogramCounter);
2934 break;
2935
2936 case Interpreter::BRANCH_REG_EQ_NULL:
2937 if (TregMemBuffer[theRegister] != 0) {
2938 jam();
2939 continue;
2940 } else {
2941 jam();
2942 TprogramCounter= brancher(theInstruction, TprogramCounter);
2943 }
2944 break;
2945
2946 case Interpreter::BRANCH_REG_NE_NULL:
2947 if (TregMemBuffer[theRegister] == 0) {
2948 jam();
2949 continue;
2950 } else {
2951 jam();
2952 TprogramCounter= brancher(theInstruction, TprogramCounter);
2953 }
2954 break;
2955
2956
2957 case Interpreter::BRANCH_EQ_REG_REG:
2958 {
2959 Uint32 TrightRegister= Interpreter::getReg2(theInstruction) << 2;
2960
2961 Uint32 TleftType= TregMemBuffer[theRegister];
2962 Uint32 Tleft0= TregMemBuffer[theRegister + 2];
2963 Uint32 Tleft1= TregMemBuffer[theRegister + 3];
2964
2965 Uint32 TrightType= TregMemBuffer[TrightRegister];
2966 Uint32 Tright0= TregMemBuffer[TrightRegister + 2];
2967 Uint32 Tright1= TregMemBuffer[TrightRegister + 3];
2968 if ((TrightType | TleftType) != 0) {
2969 jam();
2970 if ((Tleft0 == Tright0) && (Tleft1 == Tright1)) {
2971 TprogramCounter= brancher(theInstruction, TprogramCounter);
2972 }
2973 } else {
2974 return TUPKEY_abort(req_struct, 23);
2975 }
2976 break;
2977 }
2978
2979 case Interpreter::BRANCH_NE_REG_REG:
2980 {
2981 Uint32 TrightRegister= Interpreter::getReg2(theInstruction) << 2;
2982
2983 Uint32 TleftType= TregMemBuffer[theRegister];
2984 Uint32 Tleft0= TregMemBuffer[theRegister + 2];
2985 Uint32 Tleft1= TregMemBuffer[theRegister + 3];
2986
2987 Uint32 TrightType= TregMemBuffer[TrightRegister];
2988 Uint32 Tright0= TregMemBuffer[TrightRegister + 2];
2989 Uint32 Tright1= TregMemBuffer[TrightRegister + 3];
2990 if ((TrightType | TleftType) != 0) {
2991 jam();
2992 if ((Tleft0 != Tright0) || (Tleft1 != Tright1)) {
2993 TprogramCounter= brancher(theInstruction, TprogramCounter);
2994 }
2995 } else {
2996 return TUPKEY_abort(req_struct, 24);
2997 }
2998 break;
2999 }
3000
3001 case Interpreter::BRANCH_LT_REG_REG:
3002 {
3003 Uint32 TrightRegister= Interpreter::getReg2(theInstruction) << 2;
3004
3005 Uint32 TrightType= TregMemBuffer[TrightRegister];
3006 Int64 Tright0= * (Int64*)(TregMemBuffer + TrightRegister + 2);
3007
3008 Uint32 TleftType= TregMemBuffer[theRegister];
3009 Int64 Tleft0= * (Int64*)(TregMemBuffer + theRegister + 2);
3010
3011
3012 if ((TrightType | TleftType) != 0) {
3013 jam();
3014 if (Tleft0 < Tright0) {
3015 TprogramCounter= brancher(theInstruction, TprogramCounter);
3016 }
3017 } else {
3018 return TUPKEY_abort(req_struct, 24);
3019 }
3020 break;
3021 }
3022
3023 case Interpreter::BRANCH_LE_REG_REG:
3024 {
3025 Uint32 TrightRegister= Interpreter::getReg2(theInstruction) << 2;
3026
3027 Uint32 TrightType= TregMemBuffer[TrightRegister];
3028 Int64 Tright0= * (Int64*)(TregMemBuffer + TrightRegister + 2);
3029
3030 Uint32 TleftType= TregMemBuffer[theRegister];
3031 Int64 Tleft0= * (Int64*)(TregMemBuffer + theRegister + 2);
3032
3033
3034 if ((TrightType | TleftType) != 0) {
3035 jam();
3036 if (Tleft0 <= Tright0) {
3037 TprogramCounter= brancher(theInstruction, TprogramCounter);
3038 }
3039 } else {
3040 return TUPKEY_abort(req_struct, 26);
3041 }
3042 break;
3043 }
3044
3045 case Interpreter::BRANCH_GT_REG_REG:
3046 {
3047 Uint32 TrightRegister= Interpreter::getReg2(theInstruction) << 2;
3048
3049 Uint32 TrightType= TregMemBuffer[TrightRegister];
3050 Int64 Tright0= * (Int64*)(TregMemBuffer + TrightRegister + 2);
3051
3052 Uint32 TleftType= TregMemBuffer[theRegister];
3053 Int64 Tleft0= * (Int64*)(TregMemBuffer + theRegister + 2);
3054
3055
3056 if ((TrightType | TleftType) != 0) {
3057 jam();
3058 if (Tleft0 > Tright0){
3059 TprogramCounter= brancher(theInstruction, TprogramCounter);
3060 }
3061 } else {
3062 return TUPKEY_abort(req_struct, 27);
3063 }
3064 break;
3065 }
3066
3067 case Interpreter::BRANCH_GE_REG_REG:
3068 {
3069 Uint32 TrightRegister= Interpreter::getReg2(theInstruction) << 2;
3070
3071 Uint32 TrightType= TregMemBuffer[TrightRegister];
3072 Int64 Tright0= * (Int64*)(TregMemBuffer + TrightRegister + 2);
3073
3074 Uint32 TleftType= TregMemBuffer[theRegister];
3075 Int64 Tleft0= * (Int64*)(TregMemBuffer + theRegister + 2);
3076
3077
3078 if ((TrightType | TleftType) != 0) {
3079 jam();
3080 if (Tleft0 >= Tright0){
3081 TprogramCounter= brancher(theInstruction, TprogramCounter);
3082 }
3083 } else {
3084 return TUPKEY_abort(req_struct, 28);
3085 }
3086 break;
3087 }
3088
3089 case Interpreter::BRANCH_ATTR_OP_ARG_2:
3090 case Interpreter::BRANCH_ATTR_OP_ARG:{
3091 jam();
3092 Uint32 cond = Interpreter::getBinaryCondition(theInstruction);
3093 Uint32 ins2 = TcurrentProgram[TprogramCounter];
3094 Uint32 attrId = Interpreter::getBranchCol_AttrId(ins2) << 16;
3095 Uint32 argLen = Interpreter::getBranchCol_Len(ins2);
3096 Uint32 step = argLen;
3097
3098 if(tmpHabitant != attrId){
3099 Int32 TnoDataR = readAttributes(req_struct,
3100 &attrId, 1,
3101 tmpArea, tmpAreaSz,
3102 false);
3103
3104 if (TnoDataR < 0) {
3105 jam();
3106 terrorCode = Uint32(-TnoDataR);
3107 tupkeyErrorLab(req_struct);
3108 return -1;
3109 }
3110 tmpHabitant= attrId;
3111 }
3112
3113 // get type
3114 attrId >>= 16;
3115 Uint32 TattrDescrIndex = req_struct->tablePtrP->tabDescriptor +
3116 (attrId << ZAD_LOG_SIZE);
3117 Uint32 TattrDesc1 = tableDescriptor[TattrDescrIndex].tabDescr;
3118 Uint32 TattrDesc2 = tableDescriptor[TattrDescrIndex+1].tabDescr;
3119 Uint32 typeId = AttributeDescriptor::getType(TattrDesc1);
3120 void * cs = 0;
3121 if(AttributeOffset::getCharsetFlag(TattrDesc2))
3122 {
3123 Uint32 pos = AttributeOffset::getCharsetPos(TattrDesc2);
3124 cs = req_struct->tablePtrP->charsetArray[pos];
3125 }
3126 const NdbSqlUtil::Type& sqlType = NdbSqlUtil::getType(typeId);
3127
3128 // get data
3129 AttributeHeader ah(tmpArea[0]);
3130 const char* s1 = (char*)&tmpArea[1];
3131 const char* s2 = (char*)&TcurrentProgram[TprogramCounter+1];
3132 // fixed length in 5.0
3133 Uint32 attrLen = AttributeDescriptor::getSizeInBytes(TattrDesc1);
3134
3135 if (Interpreter::getOpCode(theInstruction) ==
3136 Interpreter::BRANCH_ATTR_OP_ARG_2)
3137 {
3138 jam();
3139 Uint32 paramNo = Interpreter::getBranchCol_ParamNo(ins2);
3140 const Uint32 * paramptr = lookupInterpreterParameter(paramNo,
3141 subroutineProg,
3142 TsubroutineLen);
3143 if (unlikely(paramptr == 0))
3144 {
3145 jam();
3146 terrorCode = 99; // TODO
3147 tupkeyErrorLab(req_struct);
3148 return -1;
3149 }
3150
3151 argLen = AttributeHeader::getByteSize(* paramptr);
3152 step = 0;
3153 s2 = (char*)(paramptr + 1);
3154 }
3155
3156 if (typeId == NDB_TYPE_BIT)
3157 {
3158 /* Size in bytes for bit fields can be incorrect due to
3159 * rounding down
3160 */
3161 Uint32 bitFieldAttrLen= (AttributeDescriptor::getArraySize(TattrDesc1)
3162 + 7) / 8;
3163 attrLen= bitFieldAttrLen;
3164 }
3165
3166 bool r1_null = ah.isNULL();
3167 bool r2_null = argLen == 0;
3168 int res1;
3169 if (cond <= Interpreter::GE)
3170 {
3171 /* Inequality - EQ, NE, LT, LE, GT, GE */
3172 if (r1_null || r2_null) {
3173 // NULL==NULL and NULL<not-NULL
3174 res1 = r1_null && r2_null ? 0 : r1_null ? -1 : 1;
3175 } else {
3176 jam();
3177 if (unlikely(sqlType.m_cmp == 0))
3178 {
3179 return TUPKEY_abort(req_struct, 40);
3180 }
3181 res1 = (*sqlType.m_cmp)(cs, s1, attrLen, s2, argLen);
3182 }
3183 } else {
3184 if ((cond == Interpreter::LIKE) ||
3185 (cond == Interpreter::NOT_LIKE))
3186 {
3187 if (r1_null || r2_null) {
3188 // NULL like NULL is true (has no practical use)
3189 res1 = r1_null && r2_null ? 0 : -1;
3190 } else {
3191 jam();
3192 if (unlikely(sqlType.m_like == 0))
3193 {
3194 return TUPKEY_abort(req_struct, 40);
3195 }
3196 res1 = (*sqlType.m_like)(cs, s1, attrLen, s2, argLen);
3197 }
3198 }
3199 else
3200 {
3201 /* AND_XX_MASK condition */
3202 ndbassert(cond <= Interpreter::AND_NE_ZERO);
3203 if (unlikely(sqlType.m_mask == 0))
3204 {
3205 return TUPKEY_abort(req_struct,40);
3206 }
3207 /* If either arg is NULL, we say COL AND MASK
3208 * NE_ZERO and NE_MASK.
3209 */
3210 if (r1_null || r2_null) {
3211 res1= 1;
3212 } else {
3213
3214 bool cmpZero=
3215 (cond == Interpreter::AND_EQ_ZERO) ||
3216 (cond == Interpreter::AND_NE_ZERO);
3217
3218 res1 = (*sqlType.m_mask)(s1, attrLen, s2, argLen, cmpZero);
3219 }
3220 }
3221 }
3222
3223 int res = 0;
3224 switch ((Interpreter::BinaryCondition)cond) {
3225 case Interpreter::EQ:
3226 res = (res1 == 0);
3227 break;
3228 case Interpreter::NE:
3229 res = (res1 != 0);
3230 break;
3231 // note the condition is backwards
3232 case Interpreter::LT:
3233 res = (res1 > 0);
3234 break;
3235 case Interpreter::LE:
3236 res = (res1 >= 0);
3237 break;
3238 case Interpreter::GT:
3239 res = (res1 < 0);
3240 break;
3241 case Interpreter::GE:
3242 res = (res1 <= 0);
3243 break;
3244 case Interpreter::LIKE:
3245 res = (res1 == 0);
3246 break;
3247 case Interpreter::NOT_LIKE:
3248 res = (res1 == 1);
3249 break;
3250 case Interpreter::AND_EQ_MASK:
3251 res = (res1 == 0);
3252 break;
3253 case Interpreter::AND_NE_MASK:
3254 res = (res1 != 0);
3255 break;
3256 case Interpreter::AND_EQ_ZERO:
3257 res = (res1 == 0);
3258 break;
3259 case Interpreter::AND_NE_ZERO:
3260 res = (res1 != 0);
3261 break;
3262 // XXX handle invalid value
3263 }
3264 #ifdef TRACE_INTERPRETER
3265 ndbout_c("cond=%u attr(%d)='%.*s'(%d) str='%.*s'(%d) res1=%d res=%d",
3266 cond, attrId >> 16,
3267 attrLen, s1, attrLen, argLen, s2, argLen, res1, res);
3268 #endif
3269 if (res)
3270 TprogramCounter = brancher(theInstruction, TprogramCounter);
3271 else
3272 {
3273 Uint32 tmp = ((step + 3) >> 2) + 1;
3274 TprogramCounter += tmp;
3275 }
3276 break;
3277 }
3278
3279 case Interpreter::BRANCH_ATTR_EQ_NULL:{
3280 jam();
3281 Uint32 ins2= TcurrentProgram[TprogramCounter];
3282 Uint32 attrId= Interpreter::getBranchCol_AttrId(ins2) << 16;
3283
3284 if (tmpHabitant != attrId){
3285 Int32 TnoDataR= readAttributes(req_struct,
3286 &attrId, 1,
3287 tmpArea, tmpAreaSz,
3288 false);
3289
3290 if (TnoDataR < 0) {
3291 jam();
3292 terrorCode = Uint32(-TnoDataR);
3293 tupkeyErrorLab(req_struct);
3294 return -1;
3295 }
3296 tmpHabitant= attrId;
3297 }
3298
3299 AttributeHeader ah(tmpArea[0]);
3300 if (ah.isNULL()){
3301 TprogramCounter= brancher(theInstruction, TprogramCounter);
3302 } else {
3303 TprogramCounter ++;
3304 }
3305 break;
3306 }
3307
3308 case Interpreter::BRANCH_ATTR_NE_NULL:{
3309 jam();
3310 Uint32 ins2= TcurrentProgram[TprogramCounter];
3311 Uint32 attrId= Interpreter::getBranchCol_AttrId(ins2) << 16;
3312
3313 if (tmpHabitant != attrId){
3314 Int32 TnoDataR= readAttributes(req_struct,
3315 &attrId, 1,
3316 tmpArea, tmpAreaSz,
3317 false);
3318
3319 if (TnoDataR < 0) {
3320 jam();
3321 terrorCode = Uint32(-TnoDataR);
3322 tupkeyErrorLab(req_struct);
3323 return -1;
3324 }
3325 tmpHabitant= attrId;
3326 }
3327
3328 AttributeHeader ah(tmpArea[0]);
3329 if (ah.isNULL()){
3330 TprogramCounter ++;
3331 } else {
3332 TprogramCounter= brancher(theInstruction, TprogramCounter);
3333 }
3334 break;
3335 }
3336
3337 case Interpreter::EXIT_OK:
3338 jam();
3339 #ifdef TRACE_INTERPRETER
3340 ndbout_c(" - exit_ok");
3341 #endif
3342 return TdataWritten;
3343
3344 case Interpreter::EXIT_OK_LAST:
3345 jam();
3346 #ifdef TRACE_INTERPRETER
3347 ndbout_c(" - exit_ok_last");
3348 #endif
3349 req_struct->last_row= true;
3350 return TdataWritten;
3351
3352 case Interpreter::EXIT_REFUSE:
3353 jam();
3354 #ifdef TRACE_INTERPRETER
3355 ndbout_c(" - exit_nok");
3356 #endif
3357 terrorCode= theInstruction >> 16;
3358 return TUPKEY_abort(req_struct, 29);
3359
3360 case Interpreter::CALL:
3361 jam();
3362 #ifdef TRACE_INTERPRETER
3363 ndbout_c(" - call addr=%u, subroutine len=%u ret addr=%u",
3364 theInstruction >> 16, TsubroutineLen, TprogramCounter);
3365 #endif
3366 RstackPtr++;
3367 if (RstackPtr < 32) {
3368 TstackMemBuffer[RstackPtr]= TprogramCounter;
3369 TprogramCounter= theInstruction >> 16;
3370 if (TprogramCounter < TsubroutineLen) {
3371 TcurrentProgram= subroutineProg;
3372 TcurrentSize= TsubroutineLen;
3373 } else {
3374 return TUPKEY_abort(req_struct, 30);
3375 }
3376 } else {
3377 return TUPKEY_abort(req_struct, 31);
3378 }
3379 break;
3380
3381 case Interpreter::RETURN:
3382 jam();
3383 #ifdef TRACE_INTERPRETER
3384 ndbout_c(" - return to %u from stack level %u",
3385 TstackMemBuffer[RstackPtr],
3386 RstackPtr);
3387 #endif
3388 if (RstackPtr > 0) {
3389 TprogramCounter= TstackMemBuffer[RstackPtr];
3390 RstackPtr--;
3391 if (RstackPtr == 0) {
3392 jam();
3393 /* ------------------------------------------------------------- */
3394 // We are back to the main program.
3395 /* ------------------------------------------------------------- */
3396 TcurrentProgram= mainProgram;
3397 TcurrentSize= TmainProgLen;
3398 }
3399 } else {
3400 return TUPKEY_abort(req_struct, 32);
3401 }
3402 break;
3403
3404 default:
3405 return TUPKEY_abort(req_struct, 33);
3406 }
3407 } else {
3408 return TUPKEY_abort(req_struct, 34);
3409 }
3410 }
3411 return TUPKEY_abort(req_struct, 35);
3412 }
3413
3414 /**
3415 * expand_var_part - copy packed variable attributes to fully expanded size
3416 *
3417 * dst: where to start writing attribute data
3418 * dst_off_ptr where to write attribute offsets
3419 * src pointer to packed attributes
3420 * tabDesc array of attribute descriptors (used for getting max size)
3421 * no_of_attr no of atributes to expand
3422 */
3423 static
3424 Uint32*
expand_var_part(Dbtup::KeyReqStruct::Var_data * dst,const Uint32 * src,const Uint32 * tabDesc,const Uint16 * order)3425 expand_var_part(Dbtup::KeyReqStruct::Var_data *dst,
3426 const Uint32* src,
3427 const Uint32 * tabDesc,
3428 const Uint16* order)
3429 {
3430 char* dst_ptr= dst->m_data_ptr;
3431 Uint32 no_attr= dst->m_var_len_offset;
3432 Uint16* dst_off_ptr= dst->m_offset_array_ptr;
3433 Uint16* dst_len_ptr= dst_off_ptr + no_attr;
3434 const Uint16* src_off_ptr= (const Uint16*)src;
3435 const char* src_ptr= (const char*)(src_off_ptr + no_attr + 1);
3436
3437 Uint16 tmp= *src_off_ptr++, next_pos, len, max_len, dst_off= 0;
3438 for(Uint32 i = 0; i<no_attr; i++)
3439 {
3440 next_pos= *src_off_ptr++;
3441 len= next_pos - tmp;
3442
3443 *dst_off_ptr++ = dst_off;
3444 *dst_len_ptr++ = dst_off + len;
3445 memcpy(dst_ptr, src_ptr, len);
3446 src_ptr += len;
3447
3448 max_len= AttributeDescriptor::getSizeInBytes(tabDesc[* order++]);
3449 dst_ptr += max_len; // Max size
3450 dst_off += max_len;
3451
3452 tmp= next_pos;
3453 }
3454
3455 return ALIGN_WORD(dst_ptr);
3456 }
3457
3458 void
expand_tuple(KeyReqStruct * req_struct,Uint32 sizes[2],Tuple_header * src,const Tablerec * tabPtrP,bool disk)3459 Dbtup::expand_tuple(KeyReqStruct* req_struct,
3460 Uint32 sizes[2],
3461 Tuple_header* src,
3462 const Tablerec* tabPtrP,
3463 bool disk)
3464 {
3465 Uint32 bits= src->m_header_bits;
3466 Uint32 extra_bits = bits;
3467 Tuple_header* ptr= req_struct->m_tuple_ptr;
3468
3469 Uint16 dd_tot= tabPtrP->m_no_of_disk_attributes;
3470 Uint16 mm_vars= tabPtrP->m_attributes[MM].m_no_of_varsize;
3471 Uint16 mm_dynvar= tabPtrP->m_attributes[MM].m_no_of_dyn_var;
3472 Uint16 mm_dynfix= tabPtrP->m_attributes[MM].m_no_of_dyn_fix;
3473 Uint16 mm_dyns= tabPtrP->m_attributes[MM].m_no_of_dynamic;
3474 Uint32 fix_size= tabPtrP->m_offsets[MM].m_fix_header_size;
3475 Uint32 order_desc= tabPtrP->m_real_order_descriptor;
3476
3477 Uint32 *dst_ptr= ptr->get_end_of_fix_part_ptr(tabPtrP);
3478 const Uint32 *disk_ref= src->get_disk_ref_ptr(tabPtrP);
3479 const Uint32 *src_ptr= src->get_end_of_fix_part_ptr(tabPtrP);
3480 const Var_part_ref* var_ref = src->get_var_part_ref_ptr(tabPtrP);
3481 const Uint32 *desc= (Uint32*)req_struct->attr_descr;
3482 const Uint16 *order = (Uint16*)(&tableDescriptor[order_desc]);
3483 order += tabPtrP->m_attributes[MM].m_no_of_fixsize;
3484
3485 // Copy fix part
3486 sizes[MM]= 1;
3487 memcpy(ptr, src, 4*fix_size);
3488 if(mm_vars || mm_dyns)
3489 {
3490 /*
3491 * Reserve place for initial length word and offset array (with one extra
3492 * offset). This will be filled-in in later, in shrink_tuple().
3493 */
3494 dst_ptr += Varpart_copy::SZ32;
3495
3496 KeyReqStruct::Var_data* dst= &req_struct->m_var_data[MM];
3497 Uint32 step; // in bytes
3498 Uint32 src_len;
3499 const Uint32 *src_data;
3500 if (bits & Tuple_header::VAR_PART)
3501 {
3502 KeyReqStruct::Var_data* dst= &req_struct->m_var_data[MM];
3503 if(! (bits & Tuple_header::COPY_TUPLE))
3504 {
3505 /* This is for the initial expansion of a stored row. */
3506 Ptr<Page> var_page;
3507 src_data= get_ptr(&var_page, *var_ref);
3508 src_len= get_len(&var_page, *var_ref);
3509 sizes[MM]= src_len;
3510 step= 0;
3511 req_struct->m_varpart_page_ptr = var_page;
3512
3513 /* An original tuple cant have grown as we're expanding it...
3514 * else we would be "re-expand"*/
3515 ndbassert(! (bits & Tuple_header::MM_GROWN));
3516 }
3517 else
3518 {
3519 /* This is for the re-expansion of a shrunken row (update2 ...) */
3520
3521 Varpart_copy* vp = (Varpart_copy*)src_ptr;
3522 src_len = vp->m_len;
3523 src_data= vp->m_data;
3524 step= (Varpart_copy::SZ32 + src_len); // 1+ is for extra word
3525 req_struct->m_varpart_page_ptr = req_struct->m_page_ptr;
3526 sizes[MM]= src_len;
3527 }
3528
3529 if (mm_vars)
3530 {
3531 dst->m_data_ptr= (char*)(((Uint16*)dst_ptr)+mm_vars+1);
3532 dst->m_offset_array_ptr= req_struct->var_pos_array;
3533 dst->m_var_len_offset= mm_vars;
3534 dst->m_max_var_offset= tabPtrP->m_offsets[MM].m_max_var_offset;
3535
3536 dst_ptr= expand_var_part(dst, src_data, desc, order);
3537 ndbassert(dst_ptr == ALIGN_WORD(dst->m_data_ptr + dst->m_max_var_offset));
3538 /**
3539 * Move to end of fix varpart
3540 */
3541 char* varstart = (char*)(((Uint16*)src_data)+mm_vars+1);
3542 Uint32 varlen = ((Uint16*)src_data)[mm_vars];
3543 Uint32 *dynstart = ALIGN_WORD(varstart + varlen);
3544
3545 ndbassert(src_len >= (dynstart - src_data));
3546 src_len -= Uint32(dynstart - src_data);
3547 src_data = dynstart;
3548 }
3549 }
3550 else
3551 {
3552 /**
3553 * No varpart...only allowed for dynattr
3554 */
3555 ndbassert(mm_vars == 0);
3556 src_len = step = sizes[MM] = 0;
3557 src_data = 0;
3558 }
3559
3560 if (mm_dyns)
3561 {
3562 /**
3563 * dynattr needs to be expanded even if no varpart existed before
3564 */
3565 dst->m_dyn_offset_arr_ptr= req_struct->var_pos_array+2*mm_vars;
3566 dst->m_dyn_len_offset= mm_dynvar+mm_dynfix;
3567 dst->m_max_dyn_offset= tabPtrP->m_offsets[MM].m_max_dyn_offset;
3568 dst->m_dyn_data_ptr= (char*)dst_ptr;
3569 dst_ptr= expand_dyn_part(dst, src_data,
3570 src_len,
3571 desc, order + mm_vars,
3572 mm_dynvar, mm_dynfix,
3573 tabPtrP->m_offsets[MM].m_dyn_null_words);
3574 }
3575
3576 ndbassert((UintPtr(src_ptr) & 3) == 0);
3577 src_ptr = src_ptr + step;
3578 }
3579
3580 src->m_header_bits= bits &
3581 ~(Uint32)(Tuple_header::MM_SHRINK | Tuple_header::MM_GROWN);
3582
3583 sizes[DD]= 0;
3584 if(disk && dd_tot)
3585 {
3586 const Uint16 dd_vars= tabPtrP->m_attributes[DD].m_no_of_varsize;
3587 order+= mm_vars+mm_dynvar+mm_dynfix;
3588
3589 if(bits & Tuple_header::DISK_INLINE)
3590 {
3591 // Only on copy tuple
3592 ndbassert(bits & Tuple_header::COPY_TUPLE);
3593 }
3594 else
3595 {
3596 Local_key key;
3597 memcpy(&key, disk_ref, sizeof(key));
3598 key.m_page_no= req_struct->m_disk_page_ptr.i;
3599 src_ptr= get_dd_ptr(&req_struct->m_disk_page_ptr, &key, tabPtrP);
3600 }
3601 extra_bits |= Tuple_header::DISK_INLINE;
3602
3603 // Fix diskpart
3604 req_struct->m_disk_ptr= (Tuple_header*)dst_ptr;
3605 memcpy(dst_ptr, src_ptr, 4*tabPtrP->m_offsets[DD].m_fix_header_size);
3606 sizes[DD] = tabPtrP->m_offsets[DD].m_fix_header_size;
3607
3608 ndbassert(! (req_struct->m_disk_ptr->m_header_bits & Tuple_header::FREE));
3609
3610 ndbrequire(dd_vars == 0);
3611 }
3612
3613 ptr->m_header_bits= (extra_bits | Tuple_header::COPY_TUPLE);
3614 req_struct->is_expanded= true;
3615 }
3616
3617 void
dump_tuple(const KeyReqStruct * req_struct,const Tablerec * tabPtrP)3618 Dbtup::dump_tuple(const KeyReqStruct* req_struct, const Tablerec* tabPtrP)
3619 {
3620 Uint16 mm_vars= tabPtrP->m_attributes[MM].m_no_of_varsize;
3621 Uint16 mm_dyns= tabPtrP->m_attributes[MM].m_no_of_dynamic;
3622 //Uint16 dd_tot= tabPtrP->m_no_of_disk_attributes;
3623 const Tuple_header* ptr= req_struct->m_tuple_ptr;
3624 Uint32 bits= ptr->m_header_bits;
3625 const Uint32 *tuple_words= (Uint32 *)ptr;
3626 const Uint32 *fix_p;
3627 Uint32 fix_len;
3628 const Uint32 *var_p;
3629 Uint32 var_len;
3630 //const Uint32 *disk_p;
3631 //Uint32 disk_len;
3632 const char *typ;
3633
3634 fix_p= tuple_words;
3635 fix_len= tabPtrP->m_offsets[MM].m_fix_header_size;
3636 if(req_struct->is_expanded)
3637 {
3638 typ= "expanded";
3639 var_p= ptr->get_end_of_fix_part_ptr(tabPtrP);
3640 var_len= 0; // No dump of varpart in expanded
3641 #if 0
3642 disk_p= (Uint32 *)req_struct->m_disk_ptr;
3643 disk_len= (dd_tot ? tabPtrP->m_offsets[DD].m_fix_header_size : 0);
3644 #endif
3645 }
3646 else if(! (bits & Tuple_header::COPY_TUPLE))
3647 {
3648 typ= "stored";
3649 if(mm_vars+mm_dyns)
3650 {
3651 //const KeyReqStruct::Var_data* dst= &req_struct->m_var_data[MM];
3652 const Var_part_ref *varref= ptr->get_var_part_ref_ptr(tabPtrP);
3653 Ptr<Page> tmp;
3654 var_p= get_ptr(&tmp, * varref);
3655 var_len= get_len(&tmp, * varref);
3656 }
3657 else
3658 {
3659 var_p= 0;
3660 var_len= 0;
3661 }
3662 #if 0
3663 if(dd_tot)
3664 {
3665 Local_key key;
3666 memcpy(&key, ptr->get_disk_ref_ptr(tabPtrP), sizeof(key));
3667 key.m_page_no= req_struct->m_disk_page_ptr.i;
3668 disk_p= get_dd_ptr(&req_struct->m_disk_page_ptr, &key, tabPtrP);
3669 disk_len= tabPtrP->m_offsets[DD].m_fix_header_size;
3670 }
3671 else
3672 {
3673 disk_p= var_p;
3674 disk_len= 0;
3675 }
3676 #endif
3677 }
3678 else
3679 {
3680 typ= "shrunken";
3681 if(mm_vars+mm_dyns)
3682 {
3683 var_p= ptr->get_end_of_fix_part_ptr(tabPtrP);
3684 var_len= *((Uint16 *)var_p) + 1;
3685 }
3686 else
3687 {
3688 var_p= 0;
3689 var_len= 0;
3690 }
3691 #if 0
3692 disk_p= (Uint32 *)(req_struct->m_disk_ptr);
3693 disk_len= (dd_tot ? tabPtrP->m_offsets[DD].m_fix_header_size : 0);
3694 #endif
3695 }
3696 ndbout_c("Fixed part[%s](%p len=%u words)",typ, fix_p, fix_len);
3697 dump_hex(fix_p, fix_len);
3698 ndbout_c("Varpart part[%s](%p len=%u words)", typ , var_p, var_len);
3699 dump_hex(var_p, var_len);
3700 #if 0
3701 ndbout_c("Disk part[%s](%p len=%u words)", typ, disk_p, disk_len);
3702 dump_hex(disk_p, disk_len);
3703 #endif
3704 }
3705
3706 void
prepare_read(KeyReqStruct * req_struct,Tablerec * tabPtrP,bool disk)3707 Dbtup::prepare_read(KeyReqStruct* req_struct,
3708 Tablerec* tabPtrP, bool disk)
3709 {
3710 Tuple_header* ptr= req_struct->m_tuple_ptr;
3711
3712 Uint32 bits= ptr->m_header_bits;
3713 Uint16 dd_tot= tabPtrP->m_no_of_disk_attributes;
3714 Uint16 mm_vars= tabPtrP->m_attributes[MM].m_no_of_varsize;
3715 Uint16 mm_dyns= tabPtrP->m_attributes[MM].m_no_of_dynamic;
3716
3717 const Uint32 *src_ptr= ptr->get_end_of_fix_part_ptr(tabPtrP);
3718 const Uint32 *disk_ref= ptr->get_disk_ref_ptr(tabPtrP);
3719 const Var_part_ref* var_ref = ptr->get_var_part_ref_ptr(tabPtrP);
3720 if(mm_vars || mm_dyns)
3721 {
3722 const Uint32 *src_data= src_ptr;
3723 Uint32 src_len;
3724 KeyReqStruct::Var_data* dst= &req_struct->m_var_data[MM];
3725 if (bits & Tuple_header::VAR_PART)
3726 {
3727 if(! (bits & Tuple_header::COPY_TUPLE))
3728 {
3729 Ptr<Page> tmp;
3730 src_data= get_ptr(&tmp, * var_ref);
3731 src_len= get_len(&tmp, * var_ref);
3732
3733 /* If the original tuple was grown,
3734 * the old size is stored at the end. */
3735 if(bits & Tuple_header::MM_GROWN)
3736 {
3737 /**
3738 * This is when triggers read before value of update
3739 * when original has been reallocated due to grow
3740 */
3741 ndbassert(src_len>0);
3742 src_len= src_data[src_len-1];
3743 }
3744 }
3745 else
3746 {
3747 Varpart_copy* vp = (Varpart_copy*)src_ptr;
3748 src_len = vp->m_len;
3749 src_data = vp->m_data;
3750 src_ptr++;
3751 }
3752
3753 char* varstart;
3754 Uint32 varlen;
3755 const Uint32* dynstart;
3756 if (mm_vars)
3757 {
3758 varstart = (char*)(((Uint16*)src_data)+mm_vars+1);
3759 varlen = ((Uint16*)src_data)[mm_vars];
3760 dynstart = ALIGN_WORD(varstart + varlen);
3761 }
3762 else
3763 {
3764 varstart = 0;
3765 varlen = 0;
3766 dynstart = src_data;
3767 }
3768
3769 dst->m_data_ptr= varstart;
3770 dst->m_offset_array_ptr= (Uint16*)src_data;
3771 dst->m_var_len_offset= 1;
3772 dst->m_max_var_offset= varlen;
3773
3774 Uint32 dynlen = Uint32(src_len - (dynstart - src_data));
3775 ndbassert(src_len >= (dynstart - src_data));
3776 dst->m_dyn_data_ptr= (char*)dynstart;
3777 dst->m_dyn_part_len= dynlen;
3778 // Do or not to to do
3779 // dst->m_dyn_offset_arr_ptr = dynlen ? (Uint16*)(dynstart + *(Uint8*)dynstart) : 0;
3780
3781 /*
3782 dst->m_dyn_offset_arr_ptr and dst->m_dyn_len_offset are not used for
3783 reading the stored/shrunken format.
3784 */
3785 }
3786 else
3787 {
3788 src_len = 0;
3789 dst->m_max_var_offset = 0;
3790 dst->m_dyn_part_len = 0;
3791 #if defined VM_TRACE || defined ERROR_INSERT
3792 bzero(dst, sizeof(* dst));
3793 #endif
3794 }
3795
3796 // disk part start after dynamic part.
3797 src_ptr+= src_len;
3798 }
3799
3800 if(disk && dd_tot)
3801 {
3802 const Uint16 dd_vars= tabPtrP->m_attributes[DD].m_no_of_varsize;
3803
3804 if(bits & Tuple_header::DISK_INLINE)
3805 {
3806 // Only on copy tuple
3807 ndbassert(bits & Tuple_header::COPY_TUPLE);
3808 }
3809 else
3810 {
3811 // XXX
3812 Local_key key;
3813 memcpy(&key, disk_ref, sizeof(key));
3814 key.m_page_no= req_struct->m_disk_page_ptr.i;
3815 src_ptr= get_dd_ptr(&req_struct->m_disk_page_ptr, &key, tabPtrP);
3816 }
3817 // Fix diskpart
3818 req_struct->m_disk_ptr= (Tuple_header*)src_ptr;
3819 ndbassert(! (req_struct->m_disk_ptr->m_header_bits & Tuple_header::FREE));
3820 ndbrequire(dd_vars == 0);
3821 }
3822
3823 req_struct->is_expanded= false;
3824 }
3825
3826 void
shrink_tuple(KeyReqStruct * req_struct,Uint32 sizes[2],const Tablerec * tabPtrP,bool disk)3827 Dbtup::shrink_tuple(KeyReqStruct* req_struct, Uint32 sizes[2],
3828 const Tablerec* tabPtrP, bool disk)
3829 {
3830 ndbassert(tabPtrP->need_shrink());
3831 Tuple_header* ptr= req_struct->m_tuple_ptr;
3832 ndbassert(ptr->m_header_bits & Tuple_header::COPY_TUPLE);
3833
3834 KeyReqStruct::Var_data* dst= &req_struct->m_var_data[MM];
3835 Uint32 order_desc= tabPtrP->m_real_order_descriptor;
3836 const Uint32 * tabDesc= (Uint32*)req_struct->attr_descr;
3837 const Uint16 *order = (Uint16*)(&tableDescriptor[order_desc]);
3838 Uint16 dd_tot= tabPtrP->m_no_of_disk_attributes;
3839 Uint16 mm_fix= tabPtrP->m_attributes[MM].m_no_of_fixsize;
3840 Uint16 mm_vars= tabPtrP->m_attributes[MM].m_no_of_varsize;
3841 Uint16 mm_dyns= tabPtrP->m_attributes[MM].m_no_of_dynamic;
3842 Uint16 mm_dynvar= tabPtrP->m_attributes[MM].m_no_of_dyn_var;
3843 Uint16 mm_dynfix= tabPtrP->m_attributes[MM].m_no_of_dyn_fix;
3844 Uint16 dd_vars= tabPtrP->m_attributes[DD].m_no_of_varsize;
3845
3846 Uint32 *dst_ptr= ptr->get_end_of_fix_part_ptr(tabPtrP);
3847 Uint16* src_off_ptr= req_struct->var_pos_array;
3848 order += mm_fix;
3849
3850 sizes[MM] = 1;
3851 sizes[DD] = 0;
3852 if(mm_vars || mm_dyns)
3853 {
3854 Varpart_copy* vp = (Varpart_copy*)dst_ptr;
3855 Uint32* varstart = dst_ptr = vp->m_data;
3856
3857 if (mm_vars)
3858 {
3859 Uint16* dst_off_ptr= (Uint16*)dst_ptr;
3860 char* dst_data_ptr= (char*)(dst_off_ptr + mm_vars + 1);
3861 char* src_data_ptr= dst_data_ptr;
3862 Uint32 off= 0;
3863 for(Uint32 i= 0; i<mm_vars; i++)
3864 {
3865 const char* data_ptr= src_data_ptr + *src_off_ptr;
3866 Uint32 len= src_off_ptr[mm_vars] - *src_off_ptr;
3867 * dst_off_ptr++= off;
3868 memmove(dst_data_ptr, data_ptr, len);
3869 off += len;
3870 src_off_ptr++;
3871 dst_data_ptr += len;
3872 }
3873 *dst_off_ptr= off;
3874 dst_ptr = ALIGN_WORD(dst_data_ptr);
3875 order += mm_vars; // Point to first dynfix entry
3876 }
3877
3878 if (mm_dyns)
3879 {
3880 dst_ptr = shrink_dyn_part(dst, dst_ptr, tabPtrP, tabDesc,
3881 order, mm_dynvar, mm_dynfix, MM);
3882 ndbassert((char*)dst_ptr <= ((char*)ptr) + 8192);
3883 order += mm_dynfix + mm_dynvar;
3884 }
3885
3886 Uint32 varpart_len= Uint32(dst_ptr - varstart);
3887 vp->m_len = varpart_len;
3888 sizes[MM] = varpart_len;
3889 ptr->m_header_bits |= (varpart_len) ? Tuple_header::VAR_PART : 0;
3890
3891 ndbassert((UintPtr(ptr) & 3) == 0);
3892 ndbassert(varpart_len < 0x10000);
3893 }
3894
3895 if(disk && dd_tot)
3896 {
3897 Uint32 * src_ptr = (Uint32*)req_struct->m_disk_ptr;
3898 req_struct->m_disk_ptr = (Tuple_header*)dst_ptr;
3899 ndbrequire(dd_vars == 0);
3900 sizes[DD] = tabPtrP->m_offsets[DD].m_fix_header_size;
3901 memmove(dst_ptr, src_ptr, 4*tabPtrP->m_offsets[DD].m_fix_header_size);
3902 }
3903
3904 req_struct->is_expanded= false;
3905
3906 }
3907
3908 void
validate_page(Tablerec * regTabPtr,Var_page * p)3909 Dbtup::validate_page(Tablerec* regTabPtr, Var_page* p)
3910 {
3911 /* ToDo: We could also do some checks here for any dynamic part. */
3912 Uint32 mm_vars= regTabPtr->m_attributes[MM].m_no_of_varsize;
3913 Uint32 fix_sz= regTabPtr->m_offsets[MM].m_fix_header_size +
3914 Tuple_header::HeaderSize;
3915
3916 if(mm_vars == 0)
3917 return;
3918
3919 for(Uint32 F= 0; F<MAX_FRAG_PER_NODE; F++)
3920 {
3921 FragrecordPtr fragPtr;
3922
3923 if((fragPtr.i = regTabPtr->fragrec[F]) == RNIL)
3924 continue;
3925
3926 ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
3927 for(Uint32 P= 0; P<fragPtr.p->noOfPages; P++)
3928 {
3929 Uint32 real= getRealpid(fragPtr.p, P);
3930 Var_page* page= (Var_page*)c_page_pool.getPtr(real);
3931
3932 for(Uint32 i=1; i<page->high_index; i++)
3933 {
3934 Uint32 idx= page->get_index_word(i);
3935 Uint32 len = (idx & Var_page::LEN_MASK) >> Var_page::LEN_SHIFT;
3936 if(!(idx & Var_page::FREE) && !(idx & Var_page::CHAIN))
3937 {
3938 Tuple_header *ptr= (Tuple_header*)page->get_ptr(i);
3939 Uint32 *part= ptr->get_end_of_fix_part_ptr(regTabPtr);
3940 if(! (ptr->m_header_bits & Tuple_header::COPY_TUPLE))
3941 {
3942 ndbassert(len == fix_sz + 1);
3943 Local_key tmp; tmp.assref(*part);
3944 Ptr<Page> tmpPage;
3945 part= get_ptr(&tmpPage, *(Var_part_ref*)part);
3946 len= ((Var_page*)tmpPage.p)->get_entry_len(tmp.m_page_idx);
3947 Uint32 sz= ((mm_vars + 1) << 1) + (((Uint16*)part)[mm_vars]);
3948 ndbassert(len >= ((sz + 3) >> 2));
3949 }
3950 else
3951 {
3952 Uint32 sz= ((mm_vars + 1) << 1) + (((Uint16*)part)[mm_vars]);
3953 ndbassert(len >= ((sz+3)>>2)+fix_sz);
3954 }
3955 if(ptr->m_operation_ptr_i != RNIL)
3956 {
3957 c_operation_pool.getPtr(ptr->m_operation_ptr_i);
3958 }
3959 }
3960 else if(!(idx & Var_page::FREE))
3961 {
3962 /**
3963 * Chain
3964 */
3965 Uint32 *part= page->get_ptr(i);
3966 Uint32 sz= ((mm_vars + 1) << 1) + (((Uint16*)part)[mm_vars]);
3967 ndbassert(len >= ((sz + 3) >> 2));
3968 }
3969 else
3970 {
3971
3972 }
3973 }
3974 if(p == 0 && page->high_index > 1)
3975 page->reorg((Var_page*)ctemp_page);
3976 }
3977 }
3978
3979 if(p == 0)
3980 {
3981 validate_page(regTabPtr, (Var_page*)1);
3982 }
3983 }
3984
3985 int
handle_size_change_after_update(KeyReqStruct * req_struct,Tuple_header * org,Operationrec * regOperPtr,Fragrecord * regFragPtr,Tablerec * regTabPtr,Uint32 sizes[4])3986 Dbtup::handle_size_change_after_update(KeyReqStruct* req_struct,
3987 Tuple_header* org,
3988 Operationrec* regOperPtr,
3989 Fragrecord* regFragPtr,
3990 Tablerec* regTabPtr,
3991 Uint32 sizes[4])
3992 {
3993 ndbrequire(sizes[1] == sizes[3]);
3994 //ndbout_c("%d %d %d %d", sizes[0], sizes[1], sizes[2], sizes[3]);
3995 if(0)
3996 printf("%p %d %d - handle_size_change_after_update ",
3997 req_struct->m_tuple_ptr,
3998 regOperPtr->m_tuple_location.m_page_no,
3999 regOperPtr->m_tuple_location.m_page_idx);
4000
4001 Uint32 bits= org->m_header_bits;
4002 Uint32 copy_bits= req_struct->m_tuple_ptr->m_header_bits;
4003
4004 if(sizes[2+MM] == sizes[MM])
4005 ;
4006 else if(sizes[2+MM] < sizes[MM])
4007 {
4008 if(0) ndbout_c("shrink");
4009 req_struct->m_tuple_ptr->m_header_bits= copy_bits|Tuple_header::MM_SHRINK;
4010 }
4011 else
4012 {
4013 if(0) printf("grow - ");
4014 Ptr<Page> pagePtr = req_struct->m_varpart_page_ptr;
4015 Var_page* pageP= (Var_page*)pagePtr.p;
4016 Var_part_ref *refptr= org->get_var_part_ref_ptr(regTabPtr);
4017 ndbassert(! (bits & Tuple_header::COPY_TUPLE));
4018
4019 Local_key ref;
4020 refptr->copyout(&ref);
4021 Uint32 alloc;
4022 Uint32 idx= ref.m_page_idx;
4023 if (bits & Tuple_header::VAR_PART)
4024 {
4025 if (copy_bits & Tuple_header::COPY_TUPLE)
4026 {
4027 c_page_pool.getPtr(pagePtr, ref.m_page_no);
4028 pageP = (Var_page*)pagePtr.p;
4029 }
4030 alloc = pageP->get_entry_len(idx);
4031 }
4032 else
4033 {
4034 alloc = 0;
4035 }
4036 Uint32 orig_size= alloc;
4037 if(bits & Tuple_header::MM_GROWN)
4038 {
4039 /* Was grown before, so must fetch real original size from last word. */
4040 Uint32 *old_var_part= pageP->get_ptr(idx);
4041 ndbassert(alloc>0);
4042 orig_size= old_var_part[alloc-1];
4043 }
4044
4045 if (alloc)
4046 {
4047 #ifdef VM_TRACE
4048 if(!pageP->get_entry_chain(idx))
4049 ndbout << *pageP << endl;
4050 #endif
4051 ndbassert(pageP->get_entry_chain(idx));
4052 }
4053
4054 Uint32 needed= sizes[2+MM];
4055
4056 if(needed <= alloc)
4057 {
4058 //ndbassert(!regOperPtr->is_first_operation());
4059 if (0) ndbout_c(" no grow");
4060 return 0;
4061 }
4062 Uint32 *new_var_part=realloc_var_part(&terrorCode,
4063 regFragPtr, regTabPtr, pagePtr,
4064 refptr, alloc, needed);
4065 if (unlikely(new_var_part==NULL))
4066 return -1;
4067 /* Mark the tuple grown, store the original length at the end. */
4068 org->m_header_bits= bits | Tuple_header::MM_GROWN | Tuple_header::VAR_PART;
4069 new_var_part[needed-1]= orig_size;
4070
4071 if (regTabPtr->m_bits & Tablerec::TR_Checksum)
4072 {
4073 jam();
4074 setChecksum(org, regTabPtr);
4075 }
4076 }
4077 return 0;
4078 }
4079
4080 int
optimize_var_part(KeyReqStruct * req_struct,Tuple_header * org,Operationrec * regOperPtr,Fragrecord * regFragPtr,Tablerec * regTabPtr)4081 Dbtup::optimize_var_part(KeyReqStruct* req_struct,
4082 Tuple_header* org,
4083 Operationrec* regOperPtr,
4084 Fragrecord* regFragPtr,
4085 Tablerec* regTabPtr)
4086 {
4087 jam();
4088 Var_part_ref* refptr = org->get_var_part_ref_ptr(regTabPtr);
4089
4090 Local_key ref;
4091 refptr->copyout(&ref);
4092 Uint32 idx = ref.m_page_idx;
4093
4094 Ptr<Page> pagePtr;
4095 c_page_pool.getPtr(pagePtr, ref.m_page_no);
4096
4097 Var_page* pageP = (Var_page*)pagePtr.p;
4098 Uint32 var_part_size = pageP->get_entry_len(idx);
4099
4100 /**
4101 * if the size of page list_index is MAX_FREE_LIST,
4102 * we think it as full page, then need not optimize
4103 */
4104 if(pageP->list_index != MAX_FREE_LIST)
4105 {
4106 jam();
4107 /*
4108 * optimize var part of tuple by moving varpart,
4109 * then we possibly reclaim free pages
4110 */
4111 move_var_part(regFragPtr, regTabPtr, pagePtr,
4112 refptr, var_part_size);
4113
4114 if (regTabPtr->m_bits & Tablerec::TR_Checksum)
4115 {
4116 jam();
4117 setChecksum(org, regTabPtr);
4118 }
4119 }
4120
4121 return 0;
4122 }
4123
4124 int
nr_update_gci(Uint32 fragPtrI,const Local_key * key,Uint32 gci)4125 Dbtup::nr_update_gci(Uint32 fragPtrI, const Local_key* key, Uint32 gci)
4126 {
4127 FragrecordPtr fragPtr;
4128 fragPtr.i= fragPtrI;
4129 ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
4130 TablerecPtr tablePtr;
4131 tablePtr.i= fragPtr.p->fragTableId;
4132 ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
4133
4134 if (tablePtr.p->m_bits & Tablerec::TR_RowGCI)
4135 {
4136 Local_key tmp = *key;
4137 PagePtr pagePtr;
4138
4139 Uint32 err;
4140 pagePtr.i = allocFragPage(&err, tablePtr.p, fragPtr.p, tmp.m_page_no);
4141 if (unlikely(pagePtr.i == RNIL))
4142 {
4143 return -(int)err;
4144 }
4145 c_page_pool.getPtr(pagePtr);
4146
4147 Tuple_header* ptr = (Tuple_header*)
4148 ((Fix_page*)pagePtr.p)->get_ptr(tmp.m_page_idx, 0);
4149
4150 ndbrequire(ptr->m_header_bits & Tuple_header::FREE);
4151 *ptr->get_mm_gci(tablePtr.p) = gci;
4152 }
4153 return 0;
4154 }
4155
4156 int
nr_read_pk(Uint32 fragPtrI,const Local_key * key,Uint32 * dst,bool & copy)4157 Dbtup::nr_read_pk(Uint32 fragPtrI,
4158 const Local_key* key, Uint32* dst, bool& copy)
4159 {
4160
4161 FragrecordPtr fragPtr;
4162 fragPtr.i= fragPtrI;
4163 ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
4164 TablerecPtr tablePtr;
4165 tablePtr.i= fragPtr.p->fragTableId;
4166 ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
4167
4168 Local_key tmp = *key;
4169
4170 Uint32 err;
4171 PagePtr pagePtr;
4172 pagePtr.i = allocFragPage(&err, tablePtr.p, fragPtr.p, tmp.m_page_no);
4173 if (unlikely(pagePtr.i == RNIL))
4174 return -(int)err;
4175
4176 c_page_pool.getPtr(pagePtr);
4177 KeyReqStruct req_struct(this);
4178 Uint32* ptr= ((Fix_page*)pagePtr.p)->get_ptr(key->m_page_idx, 0);
4179
4180 req_struct.m_page_ptr = pagePtr;
4181 req_struct.m_tuple_ptr = (Tuple_header*)ptr;
4182 Uint32 bits = req_struct.m_tuple_ptr->m_header_bits;
4183
4184 int ret = 0;
4185 copy = false;
4186 if (! (bits & Tuple_header::FREE))
4187 {
4188 if (bits & Tuple_header::ALLOC)
4189 {
4190 Uint32 opPtrI= req_struct.m_tuple_ptr->m_operation_ptr_i;
4191 Operationrec* opPtrP= c_operation_pool.getPtr(opPtrI);
4192 ndbassert(!opPtrP->m_copy_tuple_location.isNull());
4193 req_struct.m_tuple_ptr=
4194 get_copy_tuple(&opPtrP->m_copy_tuple_location);
4195 copy = true;
4196 }
4197 req_struct.check_offset[MM]= tablePtr.p->get_check_offset(MM);
4198 req_struct.check_offset[DD]= tablePtr.p->get_check_offset(DD);
4199
4200 Uint32 num_attr= tablePtr.p->m_no_of_attributes;
4201 Uint32 descr_start= tablePtr.p->tabDescriptor;
4202 TableDescriptor *tab_descr= &tableDescriptor[descr_start];
4203 ndbrequire(descr_start + (num_attr << ZAD_LOG_SIZE) <= cnoOfTabDescrRec);
4204 req_struct.attr_descr= tab_descr;
4205
4206 if (tablePtr.p->need_expand())
4207 prepare_read(&req_struct, tablePtr.p, false);
4208
4209 const Uint32* attrIds= &tableDescriptor[tablePtr.p->readKeyArray].tabDescr;
4210 const Uint32 numAttrs= tablePtr.p->noOfKeyAttr;
4211 // read pk attributes from original tuple
4212
4213 req_struct.tablePtrP = tablePtr.p;
4214 req_struct.fragPtrP = fragPtr.p;
4215
4216 // do it
4217 ret = readAttributes(&req_struct,
4218 attrIds,
4219 numAttrs,
4220 dst,
4221 ZNIL, false);
4222
4223 // done
4224 if (likely(ret >= 0)) {
4225 // remove headers
4226 Uint32 n= 0;
4227 Uint32 i= 0;
4228 while (n < numAttrs) {
4229 const AttributeHeader ah(dst[i]);
4230 Uint32 size= ah.getDataSize();
4231 ndbrequire(size != 0);
4232 for (Uint32 j= 0; j < size; j++) {
4233 dst[i + j - n]= dst[i + j + 1];
4234 }
4235 n+= 1;
4236 i+= 1 + size;
4237 }
4238 ndbrequire((int)i == ret);
4239 ret -= numAttrs;
4240 } else {
4241 return ret;
4242 }
4243 }
4244
4245 if (tablePtr.p->m_bits & Tablerec::TR_RowGCI)
4246 {
4247 dst[ret] = *req_struct.m_tuple_ptr->get_mm_gci(tablePtr.p);
4248 }
4249 else
4250 {
4251 dst[ret] = 0;
4252 }
4253 return ret;
4254 }
4255
4256 #include <signaldata/TuxMaint.hpp>
4257
4258 int
nr_delete(Signal * signal,Uint32 senderData,Uint32 fragPtrI,const Local_key * key,Uint32 gci)4259 Dbtup::nr_delete(Signal* signal, Uint32 senderData,
4260 Uint32 fragPtrI, const Local_key* key, Uint32 gci)
4261 {
4262 FragrecordPtr fragPtr;
4263 fragPtr.i= fragPtrI;
4264 ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
4265 TablerecPtr tablePtr;
4266 tablePtr.i= fragPtr.p->fragTableId;
4267 ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
4268
4269 Local_key tmp = * key;
4270 tmp.m_page_no= getRealpid(fragPtr.p, tmp.m_page_no);
4271
4272 PagePtr pagePtr;
4273 Tuple_header* ptr= (Tuple_header*)get_ptr(&pagePtr, &tmp, tablePtr.p);
4274
4275 if (!tablePtr.p->tuxCustomTriggers.isEmpty())
4276 {
4277 jam();
4278 TuxMaintReq* req = (TuxMaintReq*)signal->getDataPtrSend();
4279 req->tableId = fragPtr.p->fragTableId;
4280 req->fragId = fragPtr.p->fragmentId;
4281 req->pageId = tmp.m_page_no;
4282 req->pageIndex = tmp.m_page_idx;
4283 req->tupVersion = ptr->get_tuple_version();
4284 req->opInfo = TuxMaintReq::OpRemove;
4285 removeTuxEntries(signal, tablePtr.p);
4286 }
4287
4288 Local_key disk;
4289 memcpy(&disk, ptr->get_disk_ref_ptr(tablePtr.p), sizeof(disk));
4290
4291 if (tablePtr.p->m_attributes[MM].m_no_of_varsize +
4292 tablePtr.p->m_attributes[MM].m_no_of_dynamic)
4293 {
4294 jam();
4295 free_var_rec(fragPtr.p, tablePtr.p, &tmp, pagePtr);
4296 } else {
4297 jam();
4298 free_fix_rec(fragPtr.p, tablePtr.p, &tmp, (Fix_page*)pagePtr.p);
4299 }
4300
4301 if (tablePtr.p->m_no_of_disk_attributes)
4302 {
4303 jam();
4304
4305 Uint32 sz = (sizeof(Dbtup::Disk_undo::Free) >> 2) +
4306 tablePtr.p->m_offsets[DD].m_fix_header_size - 1;
4307
4308 D("Logfile_client - nr_delete");
4309 Logfile_client lgman(this, c_lgman, fragPtr.p->m_logfile_group_id);
4310 int res = lgman.alloc_log_space(sz);
4311 ndbrequire(res == 0);
4312
4313 /**
4314 * 1) alloc log buffer
4315 * 2) get page
4316 * 3) get log buffer
4317 * 4) delete tuple
4318 */
4319 Page_cache_client::Request preq;
4320 preq.m_page = disk;
4321 preq.m_callback.m_callbackData = senderData;
4322 preq.m_callback.m_callbackFunction =
4323 safe_cast(&Dbtup::nr_delete_page_callback);
4324 int flags = Page_cache_client::COMMIT_REQ;
4325
4326 #ifdef ERROR_INSERT
4327 if (ERROR_INSERTED(4023) || ERROR_INSERTED(4024))
4328 {
4329 int rnd = rand() % 100;
4330 int slp = 0;
4331 if (ERROR_INSERTED(4024))
4332 {
4333 slp = 3000;
4334 }
4335 else if (rnd > 90)
4336 {
4337 slp = 3000;
4338 }
4339 else if (rnd > 70)
4340 {
4341 slp = 100;
4342 }
4343
4344 ndbout_c("rnd: %d slp: %d", rnd, slp);
4345
4346 if (slp)
4347 {
4348 flags |= Page_cache_client::DELAY_REQ;
4349 preq.m_delay_until_time = NdbTick_CurrentMillisecond()+(Uint64)slp;
4350 }
4351 }
4352 #endif
4353
4354 Page_cache_client pgman(this, c_pgman);
4355 res = pgman.get_page(signal, preq, flags);
4356 m_pgman_ptr = pgman.m_ptr;
4357 if (res == 0)
4358 {
4359 goto timeslice;
4360 }
4361 else if (unlikely(res == -1))
4362 {
4363 return -1;
4364 }
4365
4366 PagePtr disk_page = { (Tup_page*)m_pgman_ptr.p, m_pgman_ptr.i };
4367 disk_page_set_dirty(disk_page);
4368
4369 CallbackPtr cptr;
4370 cptr.m_callbackIndex = NR_DELETE_LOG_BUFFER_CALLBACK;
4371 cptr.m_callbackData = senderData;
4372 res= lgman.get_log_buffer(signal, sz, &cptr);
4373 switch(res){
4374 case 0:
4375 signal->theData[2] = disk_page.i;
4376 goto timeslice;
4377 case -1:
4378 ndbrequire("NOT YET IMPLEMENTED" == 0);
4379 break;
4380 }
4381
4382 if (0) ndbout << "DIRECT DISK DELETE: " << disk << endl;
4383 disk_page_free(signal, tablePtr.p, fragPtr.p,
4384 &disk, *(PagePtr*)&disk_page, gci);
4385 return 0;
4386 }
4387
4388 return 0;
4389
4390 timeslice:
4391 memcpy(signal->theData, &disk, sizeof(disk));
4392 return 1;
4393 }
4394
4395 void
nr_delete_page_callback(Signal * signal,Uint32 userpointer,Uint32 page_id)4396 Dbtup::nr_delete_page_callback(Signal* signal,
4397 Uint32 userpointer, Uint32 page_id)//unused
4398 {
4399 Ptr<GlobalPage> gpage;
4400 m_global_page_pool.getPtr(gpage, page_id);
4401 PagePtr pagePtr= { (Tup_page*)gpage.p, gpage.i };
4402 disk_page_set_dirty(pagePtr);
4403 Dblqh::Nr_op_info op;
4404 op.m_ptr_i = userpointer;
4405 op.m_disk_ref.m_page_no = pagePtr.p->m_page_no;
4406 op.m_disk_ref.m_file_no = pagePtr.p->m_file_no;
4407 c_lqh->get_nr_op_info(&op, page_id);
4408
4409 Ptr<Fragrecord> fragPtr;
4410 fragPtr.i= op.m_tup_frag_ptr_i;
4411 ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
4412
4413 Ptr<Tablerec> tablePtr;
4414 tablePtr.i = fragPtr.p->fragTableId;
4415 ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
4416
4417 Uint32 sz = (sizeof(Dbtup::Disk_undo::Free) >> 2) +
4418 tablePtr.p->m_offsets[DD].m_fix_header_size - 1;
4419
4420 CallbackPtr cb;
4421 cb.m_callbackData = userpointer;
4422 cb.m_callbackIndex = NR_DELETE_LOG_BUFFER_CALLBACK;
4423 D("Logfile_client - nr_delete_page_callback");
4424 Logfile_client lgman(this, c_lgman, fragPtr.p->m_logfile_group_id);
4425 int res= lgman.get_log_buffer(signal, sz, &cb);
4426 switch(res){
4427 case 0:
4428 return;
4429 case -1:
4430 ndbrequire("NOT YET IMPLEMENTED" == 0);
4431 break;
4432 }
4433
4434 if (0) ndbout << "PAGE CALLBACK DISK DELETE: " << op.m_disk_ref << endl;
4435 disk_page_free(signal, tablePtr.p, fragPtr.p,
4436 &op.m_disk_ref, pagePtr, op.m_gci_hi);
4437
4438 c_lqh->nr_delete_complete(signal, &op);
4439 return;
4440 }
4441
4442 void
nr_delete_log_buffer_callback(Signal * signal,Uint32 userpointer,Uint32 unused)4443 Dbtup::nr_delete_log_buffer_callback(Signal* signal,
4444 Uint32 userpointer,
4445 Uint32 unused)
4446 {
4447 Dblqh::Nr_op_info op;
4448 op.m_ptr_i = userpointer;
4449 c_lqh->get_nr_op_info(&op, RNIL);
4450
4451 Ptr<Fragrecord> fragPtr;
4452 fragPtr.i= op.m_tup_frag_ptr_i;
4453 ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
4454
4455 Ptr<Tablerec> tablePtr;
4456 tablePtr.i = fragPtr.p->fragTableId;
4457 ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
4458
4459 Ptr<GlobalPage> gpage;
4460 m_global_page_pool.getPtr(gpage, op.m_page_id);
4461 PagePtr pagePtr = { (Tup_page*)gpage.p, gpage.i };
4462
4463 /**
4464 * reset page no
4465 */
4466 if (0) ndbout << "LOGBUFFER CALLBACK DISK DELETE: " << op.m_disk_ref << endl;
4467
4468 disk_page_free(signal, tablePtr.p, fragPtr.p,
4469 &op.m_disk_ref, pagePtr, op.m_gci_hi);
4470
4471 c_lqh->nr_delete_complete(signal, &op);
4472 }
4473