1 /* Copyright (c) 2003-2008 MySQL AB
2 Use is subject to license terms
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; version 2 of the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA */
16
17
18 #define DBTUP_C
19 #include <Dblqh.hpp>
20 #include "Dbtup.hpp"
21 #include <RefConvert.hpp>
22 #include <ndb_limits.h>
23 #include <pc.hpp>
24 #include <AttributeDescriptor.hpp>
25 #include "AttributeOffset.hpp"
26 #include <AttributeHeader.hpp>
27 #include <Interpreter.hpp>
28 #include <signaldata/TupKey.hpp>
29 #include <signaldata/AttrInfo.hpp>
30 #include <NdbSqlUtil.hpp>
31
32 /* ----------------------------------------------------------------- */
33 /* ----------- INIT_STORED_OPERATIONREC -------------- */
34 /* ----------------------------------------------------------------- */
initStoredOperationrec(Operationrec * regOperPtr,KeyReqStruct * req_struct,Uint32 storedId)35 int Dbtup::initStoredOperationrec(Operationrec* regOperPtr,
36 KeyReqStruct* req_struct,
37 Uint32 storedId)
38 {
39 jam();
40 StoredProcPtr storedPtr;
41 c_storedProcPool.getPtr(storedPtr, storedId);
42 if (storedPtr.i != RNIL) {
43 if (storedPtr.p->storedCode == ZSCAN_PROCEDURE) {
44 storedPtr.p->storedCounter++;
45 regOperPtr->firstAttrinbufrec= storedPtr.p->storedLinkFirst;
46 regOperPtr->lastAttrinbufrec= storedPtr.p->storedLinkLast;
47 regOperPtr->currentAttrinbufLen= storedPtr.p->storedProcLength;
48 req_struct->attrinfo_len= storedPtr.p->storedProcLength;
49 return ZOK;
50 }
51 }
52 terrorCode= ZSTORED_PROC_ID_ERROR;
53 return terrorCode;
54 }
55
copyAttrinfo(Operationrec * regOperPtr,Uint32 * inBuffer)56 void Dbtup::copyAttrinfo(Operationrec * regOperPtr,
57 Uint32* inBuffer)
58 {
59 AttrbufrecPtr copyAttrBufPtr;
60 Uint32 RnoOfAttrBufrec= cnoOfAttrbufrec;
61 int RbufLen;
62 Uint32 RinBufIndex= 0;
63 Uint32 Rnext;
64 Uint32 Rfirst;
65 Uint32 TstoredProcedure= (regOperPtr->storedProcedureId != ZNIL);
66 Uint32 RnoFree= cnoFreeAttrbufrec;
67
68 //-------------------------------------------------------------------------
69 // As a prelude to the execution of the TUPKEYREQ we will copy the program
70 // into the inBuffer to enable easy execution without any complex jumping
71 // between the buffers. In particular this will make the interpreter less
72 // complex. Hopefully it does also improve performance.
73 //-------------------------------------------------------------------------
74 copyAttrBufPtr.i= regOperPtr->firstAttrinbufrec;
75 while (copyAttrBufPtr.i != RNIL) {
76 jam();
77 ndbrequire(copyAttrBufPtr.i < RnoOfAttrBufrec);
78 ptrAss(copyAttrBufPtr, attrbufrec);
79 RbufLen = copyAttrBufPtr.p->attrbuf[ZBUF_DATA_LEN];
80 Rnext = copyAttrBufPtr.p->attrbuf[ZBUF_NEXT];
81 Rfirst = cfirstfreeAttrbufrec;
82 /*
83 * ATTRINFO comes from 2 mutually exclusive places:
84 * 1) TUPKEYREQ (also interpreted part)
85 * 2) STORED_PROCREQ before scan start
86 * Assert here that both have a check for overflow.
87 * The "<" instead of "<=" is intentional.
88 */
89 ndbrequire(RinBufIndex + RbufLen < ZATTR_BUFFER_SIZE);
90 MEMCOPY_NO_WORDS(&inBuffer[RinBufIndex],
91 ©AttrBufPtr.p->attrbuf[0],
92 RbufLen);
93 RinBufIndex += RbufLen;
94 if (!TstoredProcedure) {
95 copyAttrBufPtr.p->attrbuf[ZBUF_NEXT]= Rfirst;
96 cfirstfreeAttrbufrec= copyAttrBufPtr.i;
97 RnoFree++;
98 }
99 copyAttrBufPtr.i= Rnext;
100 }
101 cnoFreeAttrbufrec= RnoFree;
102 if (TstoredProcedure) {
103 jam();
104 StoredProcPtr storedPtr;
105 c_storedProcPool.getPtr(storedPtr, (Uint32)regOperPtr->storedProcedureId);
106 ndbrequire(storedPtr.p->storedCode == ZSCAN_PROCEDURE);
107 storedPtr.p->storedCounter--;
108 }
109 // Release the ATTRINFO buffers
110 regOperPtr->storedProcedureId= RNIL;
111 regOperPtr->firstAttrinbufrec= RNIL;
112 regOperPtr->lastAttrinbufrec= RNIL;
113 regOperPtr->m_any_value= 0;
114 }
115
handleATTRINFOforTUPKEYREQ(Signal * signal,const Uint32 * data,Uint32 len,Operationrec * regOperPtr)116 void Dbtup::handleATTRINFOforTUPKEYREQ(Signal* signal,
117 const Uint32 *data,
118 Uint32 len,
119 Operationrec * regOperPtr)
120 {
121 while(len)
122 {
123 Uint32 length = len > AttrInfo::DataLength ? AttrInfo::DataLength : len;
124
125 AttrbufrecPtr TAttrinbufptr;
126 TAttrinbufptr.i= cfirstfreeAttrbufrec;
127 if ((cfirstfreeAttrbufrec < cnoOfAttrbufrec) &&
128 (cnoFreeAttrbufrec > MIN_ATTRBUF)) {
129 ptrAss(TAttrinbufptr, attrbufrec);
130 MEMCOPY_NO_WORDS(&TAttrinbufptr.p->attrbuf[0],
131 data,
132 length);
133 Uint32 RnoFree= cnoFreeAttrbufrec;
134 Uint32 Rnext= TAttrinbufptr.p->attrbuf[ZBUF_NEXT];
135 TAttrinbufptr.p->attrbuf[ZBUF_DATA_LEN]= length;
136 TAttrinbufptr.p->attrbuf[ZBUF_NEXT]= RNIL;
137
138 AttrbufrecPtr locAttrinbufptr;
139 Uint32 RnewLen= regOperPtr->currentAttrinbufLen;
140
141 locAttrinbufptr.i= regOperPtr->lastAttrinbufrec;
142 cfirstfreeAttrbufrec= Rnext;
143 cnoFreeAttrbufrec= RnoFree - 1;
144 RnewLen += length;
145 regOperPtr->lastAttrinbufrec= TAttrinbufptr.i;
146 regOperPtr->currentAttrinbufLen= RnewLen;
147 if (locAttrinbufptr.i == RNIL) {
148 regOperPtr->firstAttrinbufrec= TAttrinbufptr.i;
149 } else {
150 jam();
151 ptrCheckGuard(locAttrinbufptr, cnoOfAttrbufrec, attrbufrec);
152 locAttrinbufptr.p->attrbuf[ZBUF_NEXT]= TAttrinbufptr.i;
153 }
154 if (RnewLen < ZATTR_BUFFER_SIZE) {
155 } else {
156 jam();
157 set_trans_state(regOperPtr, TRANS_TOO_MUCH_AI);
158 return;
159 }
160 } else if (cnoFreeAttrbufrec <= MIN_ATTRBUF) {
161 jam();
162 set_trans_state(regOperPtr, TRANS_ERROR_WAIT_TUPKEYREQ);
163 } else {
164 ndbrequire(false);
165 }
166
167 len -= length;
168 data += length;
169 }
170 }
171
execATTRINFO(Signal * signal)172 void Dbtup::execATTRINFO(Signal* signal)
173 {
174 Uint32 Rsig0= signal->theData[0];
175 Uint32 Rlen= signal->length();
176 jamEntry();
177
178 receive_attrinfo(signal, Rsig0, signal->theData+3, Rlen-3);
179 }
180
181 void
receive_attrinfo(Signal * signal,Uint32 op,const Uint32 * data,Uint32 Rlen)182 Dbtup::receive_attrinfo(Signal* signal, Uint32 op,
183 const Uint32* data, Uint32 Rlen)
184 {
185 OperationrecPtr regOpPtr;
186 regOpPtr.i= op;
187 c_operation_pool.getPtr(regOpPtr, op);
188 TransState trans_state= get_trans_state(regOpPtr.p);
189 if (trans_state == TRANS_IDLE) {
190 handleATTRINFOforTUPKEYREQ(signal, data, Rlen, regOpPtr.p);
191 return;
192 } else if (trans_state == TRANS_WAIT_STORED_PROCEDURE_ATTR_INFO) {
193 storedProcedureAttrInfo(signal, regOpPtr.p, data, Rlen, false);
194 return;
195 }
196 switch (trans_state) {
197 case TRANS_ERROR_WAIT_STORED_PROCREQ:
198 jam();
199 case TRANS_TOO_MUCH_AI:
200 jam();
201 case TRANS_ERROR_WAIT_TUPKEYREQ:
202 jam();
203 return; /* IGNORE ATTRINFO IN THOSE STATES, WAITING FOR ABORT SIGNAL */
204 case TRANS_DISCONNECTED:
205 jam();
206 case TRANS_STARTED:
207 jam();
208 default:
209 ndbrequire(false);
210 }
211 }
212
213 void
setChecksum(Tuple_header * tuple_ptr,Tablerec * regTabPtr)214 Dbtup::setChecksum(Tuple_header* tuple_ptr,
215 Tablerec* regTabPtr)
216 {
217 tuple_ptr->m_checksum= 0;
218 tuple_ptr->m_checksum= calculateChecksum(tuple_ptr, regTabPtr);
219 }
220
221 Uint32
calculateChecksum(Tuple_header * tuple_ptr,Tablerec * regTabPtr)222 Dbtup::calculateChecksum(Tuple_header* tuple_ptr,
223 Tablerec* regTabPtr)
224 {
225 Uint32 checksum;
226 Uint32 i, rec_size, *tuple_header;
227 rec_size= regTabPtr->m_offsets[MM].m_fix_header_size;
228 tuple_header= tuple_ptr->m_data;
229 checksum= 0;
230 // includes tupVersion
231 //printf("%p - ", tuple_ptr);
232
233 for (i= 0; i < rec_size-Tuple_header::HeaderSize; i++) {
234 checksum ^= tuple_header[i];
235 //printf("%.8x ", tuple_header[i]);
236 }
237
238 //printf("-> %.8x\n", checksum);
239
240 #if 0
241 if (var_sized) {
242 /*
243 if (! req_struct->fix_var_together) {
244 jam();
245 checksum ^= tuple_header[rec_size];
246 }
247 */
248 jam();
249 var_data_part= req_struct->var_data_start;
250 vsize_words= calculate_total_var_size(req_struct->var_len_array,
251 regTabPtr->no_var_attr);
252 ndbassert(req_struct->var_data_end >= &var_data_part[vsize_words]);
253 for (i= 0; i < vsize_words; i++) {
254 checksum ^= var_data_part[i];
255 }
256 }
257 #endif
258 return checksum;
259 }
260
261 /* ----------------------------------------------------------------- */
262 /* ----------- INSERT_ACTIVE_OP_LIST -------------- */
263 /* ----------------------------------------------------------------- */
264 bool
insertActiveOpList(OperationrecPtr regOperPtr,KeyReqStruct * req_struct)265 Dbtup::insertActiveOpList(OperationrecPtr regOperPtr,
266 KeyReqStruct* req_struct)
267 {
268 OperationrecPtr prevOpPtr;
269 ndbrequire(!regOperPtr.p->op_struct.in_active_list);
270 regOperPtr.p->op_struct.in_active_list= true;
271 req_struct->prevOpPtr.i=
272 prevOpPtr.i= req_struct->m_tuple_ptr->m_operation_ptr_i;
273 regOperPtr.p->prevActiveOp= prevOpPtr.i;
274 regOperPtr.p->nextActiveOp= RNIL;
275 regOperPtr.p->m_undo_buffer_space= 0;
276 req_struct->m_tuple_ptr->m_operation_ptr_i= regOperPtr.i;
277 if (prevOpPtr.i == RNIL) {
278 set_change_mask_state(regOperPtr.p, USE_SAVED_CHANGE_MASK);
279 regOperPtr.p->saved_change_mask[0] = 0;
280 regOperPtr.p->saved_change_mask[1] = 0;
281 return true;
282 } else {
283 req_struct->prevOpPtr.p= prevOpPtr.p= c_operation_pool.getPtr(prevOpPtr.i);
284 prevOpPtr.p->nextActiveOp= regOperPtr.i;
285
286 regOperPtr.p->op_struct.m_wait_log_buffer=
287 prevOpPtr.p->op_struct.m_wait_log_buffer;
288 regOperPtr.p->op_struct.m_load_diskpage_on_commit=
289 prevOpPtr.p->op_struct.m_load_diskpage_on_commit;
290 regOperPtr.p->m_undo_buffer_space= prevOpPtr.p->m_undo_buffer_space;
291 // start with prev mask (matters only for UPD o UPD)
292 set_change_mask_state(regOperPtr.p, get_change_mask_state(prevOpPtr.p));
293 regOperPtr.p->saved_change_mask[0] = prevOpPtr.p->saved_change_mask[0];
294 regOperPtr.p->saved_change_mask[1] = prevOpPtr.p->saved_change_mask[1];
295
296 regOperPtr.p->m_any_value = prevOpPtr.p->m_any_value;
297
298 prevOpPtr.p->op_struct.m_wait_log_buffer= 0;
299 prevOpPtr.p->op_struct.m_load_diskpage_on_commit= 0;
300
301 if(prevOpPtr.p->op_struct.tuple_state == TUPLE_PREPARED)
302 {
303 Uint32 op= regOperPtr.p->op_struct.op_type;
304 Uint32 prevOp= prevOpPtr.p->op_struct.op_type;
305 if (prevOp == ZDELETE)
306 {
307 if(op == ZINSERT)
308 {
309 // mark both
310 prevOpPtr.p->op_struct.delete_insert_flag= true;
311 regOperPtr.p->op_struct.delete_insert_flag= true;
312 return true;
313 } else {
314 terrorCode= ZTUPLE_DELETED_ERROR;
315 return false;
316 }
317 }
318 else if(op == ZINSERT && prevOp != ZDELETE)
319 {
320 terrorCode= ZINSERT_ERROR;
321 return false;
322 }
323 return true;
324 }
325 else
326 {
327 terrorCode= ZMUST_BE_ABORTED_ERROR;
328 return false;
329 }
330 }
331 }
332
333 bool
setup_read(KeyReqStruct * req_struct,Operationrec * regOperPtr,Fragrecord * regFragPtr,Tablerec * regTabPtr,bool disk)334 Dbtup::setup_read(KeyReqStruct *req_struct,
335 Operationrec* regOperPtr,
336 Fragrecord* regFragPtr,
337 Tablerec* regTabPtr,
338 bool disk)
339 {
340 OperationrecPtr currOpPtr;
341 currOpPtr.i= req_struct->m_tuple_ptr->m_operation_ptr_i;
342 if (currOpPtr.i == RNIL)
343 {
344 if (regTabPtr->need_expand(disk))
345 prepare_read(req_struct, regTabPtr, disk);
346 return true;
347 }
348
349 do {
350 Uint32 savepointId= regOperPtr->savepointId;
351 bool dirty= req_struct->dirty_op;
352
353 c_operation_pool.getPtr(currOpPtr);
354 bool sameTrans= c_lqh->is_same_trans(currOpPtr.p->userpointer,
355 req_struct->trans_id1,
356 req_struct->trans_id2);
357 /**
358 * Read committed in same trans reads latest copy
359 */
360 if(dirty && !sameTrans)
361 {
362 savepointId= 0;
363 }
364 else if(sameTrans)
365 {
366 // Use savepoint even in read committed mode
367 dirty= false;
368 }
369
370 bool found= find_savepoint(currOpPtr, savepointId);
371
372 Uint32 currOp= currOpPtr.p->op_struct.op_type;
373
374 if((found && currOp == ZDELETE) ||
375 ((dirty || !found) && currOp == ZINSERT))
376 {
377 terrorCode= ZTUPLE_DELETED_ERROR;
378 break;
379 }
380
381 if(dirty || !found)
382 {
383
384 }
385 else
386 {
387 req_struct->m_tuple_ptr= (Tuple_header*)
388 c_undo_buffer.get_ptr(&currOpPtr.p->m_copy_tuple_location);
389 }
390
391 if (regTabPtr->need_expand(disk))
392 prepare_read(req_struct, regTabPtr, disk);
393
394 #if 0
395 ndbout_c("reading copy");
396 Uint32 *var_ptr = fixed_ptr+regTabPtr->var_offset;
397 req_struct->m_tuple_ptr= fixed_ptr;
398 req_struct->fix_var_together= true;
399 req_struct->var_len_array= (Uint16*)var_ptr;
400 req_struct->var_data_start= var_ptr+regTabPtr->var_array_wsize;
401 Uint32 var_sz32= init_var_pos_array((Uint16*)var_ptr,
402 req_struct->var_pos_array,
403 regTabPtr->no_var_attr);
404 req_struct->var_data_end= var_ptr+regTabPtr->var_array_wsize + var_sz32;
405 #endif
406 return true;
407 } while(0);
408
409 return false;
410 }
411
412 int
load_diskpage(Signal * signal,Uint32 opRec,Uint32 fragPtrI,Uint32 local_key,Uint32 flags)413 Dbtup::load_diskpage(Signal* signal,
414 Uint32 opRec, Uint32 fragPtrI,
415 Uint32 local_key, Uint32 flags)
416 {
417 c_operation_pool.getPtr(operPtr, opRec);
418 fragptr.i= fragPtrI;
419 ptrCheckGuard(fragptr, cnoOfFragrec, fragrecord);
420
421 Operationrec * regOperPtr= operPtr.p;
422 Fragrecord * regFragPtr= fragptr.p;
423
424 tabptr.i = regFragPtr->fragTableId;
425 ptrCheckGuard(tabptr, cnoOfTablerec, tablerec);
426 Tablerec* regTabPtr = tabptr.p;
427
428 if(local_key == ~(Uint32)0)
429 {
430 jam();
431 regOperPtr->op_struct.m_wait_log_buffer= 1;
432 regOperPtr->op_struct.m_load_diskpage_on_commit= 1;
433 return 1;
434 }
435
436 jam();
437 Uint32 page_idx= local_key & MAX_TUPLES_PER_PAGE;
438 Uint32 frag_page_id= local_key >> MAX_TUPLES_BITS;
439 regOperPtr->m_tuple_location.m_page_no= getRealpid(regFragPtr,
440 frag_page_id);
441 regOperPtr->m_tuple_location.m_page_idx= page_idx;
442
443 PagePtr page_ptr;
444 Uint32* tmp= get_ptr(&page_ptr, ®OperPtr->m_tuple_location, regTabPtr);
445 Tuple_header* ptr= (Tuple_header*)tmp;
446
447 int res= 1;
448 if(ptr->m_header_bits & Tuple_header::DISK_PART)
449 {
450 Page_cache_client::Request req;
451 memcpy(&req.m_page, ptr->get_disk_ref_ptr(regTabPtr), sizeof(Local_key));
452 req.m_callback.m_callbackData= opRec;
453 req.m_callback.m_callbackFunction=
454 safe_cast(&Dbtup::disk_page_load_callback);
455
456 #ifdef ERROR_INSERT
457 if (ERROR_INSERTED(4022))
458 {
459 flags |= Page_cache_client::DELAY_REQ;
460 req.m_delay_until_time = NdbTick_CurrentMillisecond()+(Uint64)3000;
461 }
462 #endif
463
464 if((res= m_pgman.get_page(signal, req, flags)) > 0)
465 {
466 //ndbout_c("in cache");
467 // In cache
468 }
469 else if(res == 0)
470 {
471 //ndbout_c("waiting for callback");
472 // set state
473 }
474 else
475 {
476 // Error
477 }
478 }
479
480 switch(flags & 7)
481 {
482 case ZREAD:
483 case ZREAD_EX:
484 break;
485 case ZDELETE:
486 case ZUPDATE:
487 case ZINSERT:
488 case ZWRITE:
489 regOperPtr->op_struct.m_wait_log_buffer= 1;
490 regOperPtr->op_struct.m_load_diskpage_on_commit= 1;
491 }
492 return res;
493 }
494
495 void
disk_page_load_callback(Signal * signal,Uint32 opRec,Uint32 page_id)496 Dbtup::disk_page_load_callback(Signal* signal, Uint32 opRec, Uint32 page_id)
497 {
498 c_operation_pool.getPtr(operPtr, opRec);
499 c_lqh->acckeyconf_load_diskpage_callback(signal,
500 operPtr.p->userpointer, page_id);
501 }
502
503 int
load_diskpage_scan(Signal * signal,Uint32 opRec,Uint32 fragPtrI,Uint32 local_key,Uint32 flags)504 Dbtup::load_diskpage_scan(Signal* signal,
505 Uint32 opRec, Uint32 fragPtrI,
506 Uint32 local_key, Uint32 flags)
507 {
508 c_operation_pool.getPtr(operPtr, opRec);
509 fragptr.i= fragPtrI;
510 ptrCheckGuard(fragptr, cnoOfFragrec, fragrecord);
511
512 Operationrec * regOperPtr= operPtr.p;
513 Fragrecord * regFragPtr= fragptr.p;
514
515 tabptr.i = regFragPtr->fragTableId;
516 ptrCheckGuard(tabptr, cnoOfTablerec, tablerec);
517 Tablerec* regTabPtr = tabptr.p;
518
519 jam();
520 Uint32 page_idx= local_key & MAX_TUPLES_PER_PAGE;
521 Uint32 frag_page_id= local_key >> MAX_TUPLES_BITS;
522 regOperPtr->m_tuple_location.m_page_no= getRealpid(regFragPtr,
523 frag_page_id);
524 regOperPtr->m_tuple_location.m_page_idx= page_idx;
525 regOperPtr->op_struct.m_load_diskpage_on_commit= 0;
526
527 PagePtr page_ptr;
528 Uint32* tmp= get_ptr(&page_ptr, ®OperPtr->m_tuple_location, regTabPtr);
529 Tuple_header* ptr= (Tuple_header*)tmp;
530
531 int res= 1;
532 if(ptr->m_header_bits & Tuple_header::DISK_PART)
533 {
534 Page_cache_client::Request req;
535 memcpy(&req.m_page, ptr->get_disk_ref_ptr(regTabPtr), sizeof(Local_key));
536 req.m_callback.m_callbackData= opRec;
537 req.m_callback.m_callbackFunction=
538 safe_cast(&Dbtup::disk_page_load_scan_callback);
539
540 if((res= m_pgman.get_page(signal, req, flags)) > 0)
541 {
542 // ndbout_c("in cache");
543 // In cache
544 }
545 else if(res == 0)
546 {
547 //ndbout_c("waiting for callback");
548 // set state
549 }
550 else
551 {
552 // Error
553 }
554 }
555 return res;
556 }
557
558 void
disk_page_load_scan_callback(Signal * signal,Uint32 opRec,Uint32 page_id)559 Dbtup::disk_page_load_scan_callback(Signal* signal,
560 Uint32 opRec, Uint32 page_id)
561 {
562 c_operation_pool.getPtr(operPtr, opRec);
563 c_lqh->next_scanconf_load_diskpage_callback(signal,
564 operPtr.p->userpointer, page_id);
565 }
566
execTUPKEYREQ(Signal * signal)567 void Dbtup::execTUPKEYREQ(Signal* signal)
568 {
569 TupKeyReq * tupKeyReq= (TupKeyReq *)signal->getDataPtr();
570 KeyReqStruct req_struct;
571 Uint32 sig1, sig2, sig3, sig4;
572
573 Uint32 RoperPtr= tupKeyReq->connectPtr;
574 Uint32 Rfragptr= tupKeyReq->fragPtr;
575
576 Uint32 RnoOfFragrec= cnoOfFragrec;
577 Uint32 RnoOfTablerec= cnoOfTablerec;
578
579 jamEntry();
580 fragptr.i= Rfragptr;
581
582 ndbrequire(Rfragptr < RnoOfFragrec);
583
584 c_operation_pool.getPtr(operPtr, RoperPtr);
585 ptrAss(fragptr, fragrecord);
586
587 Uint32 TrequestInfo= tupKeyReq->request;
588
589 Operationrec * regOperPtr= operPtr.p;
590 Fragrecord * regFragPtr= fragptr.p;
591
592 tabptr.i = regFragPtr->fragTableId;
593 ptrCheckGuard(tabptr, RnoOfTablerec, tablerec);
594 Tablerec* regTabPtr = tabptr.p;
595
596 req_struct.signal= signal;
597 req_struct.dirty_op= TrequestInfo & 1;
598 req_struct.interpreted_exec= (TrequestInfo >> 10) & 1;
599 req_struct.no_fired_triggers= 0;
600 req_struct.read_length= 0;
601 req_struct.max_attr_id_updated= 0;
602 req_struct.no_changed_attrs= 0;
603 req_struct.last_row= false;
604 req_struct.changeMask.clear();
605
606 if (unlikely(get_trans_state(regOperPtr) != TRANS_IDLE))
607 {
608 TUPKEY_abort(signal, 39);
609 return;
610 }
611
612 /* ----------------------------------------------------------------- */
613 // Operation is ZREAD when we arrive here so no need to worry about the
614 // abort process.
615 /* ----------------------------------------------------------------- */
616 /* ----------- INITIATE THE OPERATION RECORD -------------- */
617 /* ----------------------------------------------------------------- */
618 Uint32 Rstoredid= tupKeyReq->storedProcedure;
619
620 regOperPtr->fragmentPtr= Rfragptr;
621 regOperPtr->op_struct.op_type= (TrequestInfo >> 6) & 0xf;
622 regOperPtr->op_struct.delete_insert_flag = false;
623 regOperPtr->storedProcedureId= Rstoredid;
624
625 regOperPtr->m_copy_tuple_location.setNull();
626 regOperPtr->tupVersion= ZNIL;
627
628 sig1= tupKeyReq->savePointId;
629 sig2= tupKeyReq->primaryReplica;
630 sig3= tupKeyReq->keyRef2;
631
632 regOperPtr->savepointId= sig1;
633 regOperPtr->op_struct.primary_replica= sig2;
634 Uint32 pageidx = regOperPtr->m_tuple_location.m_page_idx= sig3;
635
636 sig1= tupKeyReq->opRef;
637 sig2= tupKeyReq->tcOpIndex;
638 sig3= tupKeyReq->coordinatorTC;
639 sig4= tupKeyReq->keyRef1;
640
641 req_struct.tc_operation_ptr= sig1;
642 req_struct.TC_index= sig2;
643 req_struct.TC_ref= sig3;
644 Uint32 pageid = req_struct.frag_page_id= sig4;
645 req_struct.m_use_rowid = (TrequestInfo >> 11) & 1;
646
647 sig1= tupKeyReq->attrBufLen;
648 sig2= tupKeyReq->applRef;
649 sig3= tupKeyReq->transId1;
650 sig4= tupKeyReq->transId2;
651
652 Uint32 disk_page= tupKeyReq->disk_page;
653
654 req_struct.log_size= sig1;
655 req_struct.attrinfo_len= sig1;
656 req_struct.rec_blockref= sig2;
657 req_struct.trans_id1= sig3;
658 req_struct.trans_id2= sig4;
659 req_struct.m_disk_page_ptr.i= disk_page;
660
661 sig1 = tupKeyReq->m_row_id_page_no;
662 sig2 = tupKeyReq->m_row_id_page_idx;
663
664 req_struct.m_row_id.m_page_no = sig1;
665 req_struct.m_row_id.m_page_idx = sig2;
666
667 Uint32 Roptype = regOperPtr->op_struct.op_type;
668
669 if (Rstoredid != ZNIL) {
670 ndbrequire(initStoredOperationrec(regOperPtr,
671 &req_struct,
672 Rstoredid) == ZOK);
673 }
674
675 copyAttrinfo(regOperPtr, &cinBuffer[0]);
676
677 Uint32 localkey = (pageid << MAX_TUPLES_BITS) + pageidx;
678 if (Roptype == ZINSERT && localkey == ~ (Uint32) 0)
679 {
680 // No tuple allocatated yet
681 goto do_insert;
682 }
683
684 /**
685 * Get pointer to tuple
686 */
687 regOperPtr->m_tuple_location.m_page_no= getRealpid(regFragPtr,
688 req_struct.frag_page_id);
689
690 setup_fixed_part(&req_struct, regOperPtr, regTabPtr);
691
692 /**
693 * Check operation
694 */
695 if (Roptype == ZREAD) {
696 jam();
697
698 if (setup_read(&req_struct, regOperPtr, regFragPtr, regTabPtr,
699 disk_page != RNIL))
700 {
701 if(handleReadReq(signal, regOperPtr, regTabPtr, &req_struct) != -1)
702 {
703 req_struct.log_size= 0;
704 sendTUPKEYCONF(signal, &req_struct, regOperPtr);
705 /* ---------------------------------------------------------------- */
706 // Read Operations need not to be taken out of any lists.
707 // We also do not need to wait for commit since there is no changes
708 // to commit. Thus we
709 // prepare the operation record already now for the next operation.
710 // Write operations have set the state to STARTED above indicating
711 // that they are waiting for the Commit or Abort decision.
712 /* ---------------------------------------------------------------- */
713 set_trans_state(regOperPtr, TRANS_IDLE);
714 regOperPtr->currentAttrinbufLen= 0;
715 }
716 return;
717 }
718 tupkeyErrorLab(signal);
719 return;
720 }
721
722 if(insertActiveOpList(operPtr, &req_struct))
723 {
724 if(Roptype == ZINSERT)
725 {
726 jam();
727 do_insert:
728 if (handleInsertReq(signal, operPtr,
729 fragptr, regTabPtr, &req_struct) == -1)
730 {
731 return;
732 }
733 if (!regTabPtr->tuxCustomTriggers.isEmpty())
734 {
735 jam();
736 if (executeTuxInsertTriggers(signal,
737 regOperPtr,
738 regFragPtr,
739 regTabPtr) != 0) {
740 jam();
741 /*
742 * TUP insert succeeded but add of TUX entries failed. All
743 * TUX changes have been rolled back at this point.
744 *
745 * We will abort via tupkeyErrorLab() as usual. This routine
746 * however resets the operation to ZREAD. The TUP_ABORTREQ
747 * arriving later cannot then undo the insert.
748 *
749 * Therefore we call TUP_ABORTREQ already now. Diskdata etc
750 * should be in memory and timeslicing cannot occur. We must
751 * skip TUX abort triggers since TUX is already aborted.
752 */
753 signal->theData[0] = operPtr.i;
754 do_tup_abortreq(signal, ZSKIP_TUX_TRIGGERS);
755 tupkeyErrorLab(signal);
756 return;
757 }
758 }
759 checkImmediateTriggersAfterInsert(&req_struct,
760 regOperPtr,
761 regTabPtr,
762 disk_page != RNIL);
763 set_change_mask_state(regOperPtr, SET_ALL_MASK);
764 sendTUPKEYCONF(signal, &req_struct, regOperPtr);
765 return;
766 }
767
768 if (Roptype == ZUPDATE) {
769 jam();
770 if (handleUpdateReq(signal, regOperPtr,
771 regFragPtr, regTabPtr, &req_struct, disk_page != RNIL) == -1) {
772 return;
773 }
774 // If update operation is done on primary,
775 // check any after op triggers
776 terrorCode= 0;
777 if (!regTabPtr->tuxCustomTriggers.isEmpty()) {
778 jam();
779 if (executeTuxUpdateTriggers(signal,
780 regOperPtr,
781 regFragPtr,
782 regTabPtr) != 0) {
783 jam();
784 /*
785 * See insert case.
786 */
787 signal->theData[0] = operPtr.i;
788 do_tup_abortreq(signal, ZSKIP_TUX_TRIGGERS);
789 tupkeyErrorLab(signal);
790 return;
791 }
792 }
793 checkImmediateTriggersAfterUpdate(&req_struct,
794 regOperPtr,
795 regTabPtr,
796 disk_page != RNIL);
797 // XXX use terrorCode for now since all methods are void
798 if (terrorCode != 0)
799 {
800 tupkeyErrorLab(signal);
801 return;
802 }
803 update_change_mask_info(&req_struct, regOperPtr);
804 sendTUPKEYCONF(signal, &req_struct, regOperPtr);
805 return;
806 }
807 else if(Roptype == ZDELETE)
808 {
809 jam();
810 req_struct.log_size= 0;
811 if (handleDeleteReq(signal, regOperPtr,
812 regFragPtr, regTabPtr,
813 &req_struct,
814 disk_page != RNIL) == -1) {
815 return;
816 }
817 /*
818 * TUX doesn't need to check for triggers at delete since entries in
819 * the index are kept until commit time.
820 */
821
822 /*
823 * Secondary index triggers fire on the primary after a delete.
824 */
825 checkImmediateTriggersAfterDelete(&req_struct,
826 regOperPtr,
827 regTabPtr,
828 disk_page != RNIL);
829 set_change_mask_state(regOperPtr, DELETE_CHANGES);
830 sendTUPKEYCONF(signal, &req_struct, regOperPtr);
831 return;
832 }
833 else
834 {
835 ndbrequire(false); // Invalid op type
836 }
837 }
838
839 tupkeyErrorLab(signal);
840 }
841
842 void
setup_fixed_part(KeyReqStruct * req_struct,Operationrec * regOperPtr,Tablerec * regTabPtr)843 Dbtup::setup_fixed_part(KeyReqStruct* req_struct,
844 Operationrec* regOperPtr,
845 Tablerec* regTabPtr)
846 {
847 PagePtr page_ptr;
848 Uint32* ptr= get_ptr(&page_ptr, ®OperPtr->m_tuple_location, regTabPtr);
849 req_struct->m_page_ptr = page_ptr;
850 req_struct->m_tuple_ptr = (Tuple_header*)ptr;
851
852 ndbassert(regOperPtr->op_struct.op_type == ZINSERT || (! (req_struct->m_tuple_ptr->m_header_bits & Tuple_header::FREE)));
853
854 req_struct->check_offset[MM]= regTabPtr->get_check_offset(MM);
855 req_struct->check_offset[DD]= regTabPtr->get_check_offset(DD);
856
857 Uint32 num_attr= regTabPtr->m_no_of_attributes;
858 Uint32 descr_start= regTabPtr->tabDescriptor;
859 TableDescriptor *tab_descr= &tableDescriptor[descr_start];
860 ndbrequire(descr_start + (num_attr << ZAD_LOG_SIZE) <= cnoOfTabDescrRec);
861 req_struct->attr_descr= tab_descr;
862 }
863
864 /* ---------------------------------------------------------------- */
865 /* ------------------------ CONFIRM REQUEST ----------------------- */
866 /* ---------------------------------------------------------------- */
sendTUPKEYCONF(Signal * signal,KeyReqStruct * req_struct,Operationrec * regOperPtr)867 void Dbtup::sendTUPKEYCONF(Signal* signal,
868 KeyReqStruct *req_struct,
869 Operationrec * regOperPtr)
870 {
871 TupKeyConf * tupKeyConf= (TupKeyConf *)signal->getDataPtrSend();
872
873 Uint32 Rcreate_rowid = req_struct->m_use_rowid;
874 Uint32 RuserPointer= regOperPtr->userpointer;
875 Uint32 RnoFiredTriggers= req_struct->no_fired_triggers;
876 Uint32 log_size= req_struct->log_size;
877 Uint32 read_length= req_struct->read_length;
878 Uint32 last_row= req_struct->last_row;
879
880 set_trans_state(regOperPtr, TRANS_STARTED);
881 set_tuple_state(regOperPtr, TUPLE_PREPARED);
882 tupKeyConf->userPtr= RuserPointer;
883 tupKeyConf->readLength= read_length;
884 tupKeyConf->writeLength= log_size;
885 tupKeyConf->noFiredTriggers= RnoFiredTriggers;
886 tupKeyConf->lastRow= last_row;
887 tupKeyConf->rowid = Rcreate_rowid;
888
889 EXECUTE_DIRECT(DBLQH, GSN_TUPKEYCONF, signal,
890 TupKeyConf::SignalLength);
891
892 }
893
894
/*
 * Upper bound used for read results: the smaller of sizeof(signal->theData)
 * and MAX_MESSAGE_SIZE. Expects a variable named `signal` to be in scope
 * where the macro is expanded.
 */
#define MAX_READ \
  (sizeof(signal->theData) <= MAX_MESSAGE_SIZE ? \
   sizeof(signal->theData) : MAX_MESSAGE_SIZE)
896
897 /* ---------------------------------------------------------------- */
898 /* ----------------------------- READ ---------------------------- */
899 /* ---------------------------------------------------------------- */
handleReadReq(Signal * signal,Operationrec * regOperPtr,Tablerec * regTabPtr,KeyReqStruct * req_struct)900 int Dbtup::handleReadReq(Signal* signal,
901 Operationrec* regOperPtr,
902 Tablerec* regTabPtr,
903 KeyReqStruct* req_struct)
904 {
905 Uint32 *dst;
906 Uint32 dstLen, start_index;
907 const BlockReference sendBref= req_struct->rec_blockref;
908 if ((regTabPtr->m_bits & Tablerec::TR_Checksum) &&
909 (calculateChecksum(req_struct->m_tuple_ptr, regTabPtr) != 0)) {
910 jam();
911 ndbout_c("here2");
912 terrorCode= ZTUPLE_CORRUPTED_ERROR;
913 tupkeyErrorLab(signal);
914 return -1;
915 }
916
917 const Uint32 node = refToNode(sendBref);
918 if(node != 0 && node != getOwnNodeId()) {
919 start_index= 25;
920 } else {
921 jam();
922 /**
923 * execute direct
924 */
925 start_index= 3;
926 }
927 dst= &signal->theData[start_index];
928 dstLen= (MAX_READ / 4) - start_index;
929 if (!req_struct->interpreted_exec) {
930 jam();
931 int ret = readAttributes(req_struct,
932 &cinBuffer[0],
933 req_struct->attrinfo_len,
934 dst,
935 dstLen,
936 false);
937 if (likely(ret != -1)) {
938 /* ------------------------------------------------------------------------- */
939 // We have read all data into coutBuffer. Now send it to the API.
940 /* ------------------------------------------------------------------------- */
941 jam();
942 Uint32 TnoOfDataRead= (Uint32) ret;
943 req_struct->read_length= TnoOfDataRead;
944 sendReadAttrinfo(signal, req_struct, TnoOfDataRead, regOperPtr);
945 return 0;
946 }
947 } else {
948 jam();
949 if (likely(interpreterStartLab(signal, req_struct) != -1)) {
950 return 0;
951 }
952 return -1;
953 }
954
955 jam();
956 tupkeyErrorLab(signal);
957 return -1;
958 }
959
960 /* ---------------------------------------------------------------- */
961 /* ---------------------------- UPDATE ---------------------------- */
962 /* ---------------------------------------------------------------- */
handleUpdateReq(Signal * signal,Operationrec * operPtrP,Fragrecord * regFragPtr,Tablerec * regTabPtr,KeyReqStruct * req_struct,bool disk)963 int Dbtup::handleUpdateReq(Signal* signal,
964 Operationrec* operPtrP,
965 Fragrecord* regFragPtr,
966 Tablerec* regTabPtr,
967 KeyReqStruct* req_struct,
968 bool disk)
969 {
970 Uint32 *dst;
971 Tuple_header *base= req_struct->m_tuple_ptr, *org;
972 if ((dst= c_undo_buffer.alloc_copy_tuple(&operPtrP->m_copy_tuple_location,
973 regTabPtr->total_rec_size)) == 0)
974 {
975 terrorCode= ZMEM_NOMEM_ERROR;
976 goto error;
977 }
978
979 Uint32 tup_version;
980 if(operPtrP->is_first_operation())
981 {
982 org= req_struct->m_tuple_ptr;
983 tup_version= org->get_tuple_version();
984 }
985 else
986 {
987 Operationrec* prevOp= req_struct->prevOpPtr.p;
988 tup_version= prevOp->tupVersion;
989 org= (Tuple_header*)c_undo_buffer.get_ptr(&prevOp->m_copy_tuple_location);
990 }
991
992 /**
993 * Check consistency before update/delete
994 */
995 req_struct->m_tuple_ptr= org;
996 if ((regTabPtr->m_bits & Tablerec::TR_Checksum) &&
997 (calculateChecksum(req_struct->m_tuple_ptr, regTabPtr) != 0))
998 {
999 terrorCode= ZTUPLE_CORRUPTED_ERROR;
1000 goto error;
1001 }
1002
1003 req_struct->m_tuple_ptr= (Tuple_header*)dst;
1004
1005 union {
1006 Uint32 sizes[4];
1007 Uint64 cmp[2];
1008 };
1009
1010 disk = disk || (org->m_header_bits & Tuple_header::DISK_INLINE);
1011 if (regTabPtr->need_expand(disk))
1012 {
1013 expand_tuple(req_struct, sizes, org, regTabPtr, disk);
1014 if(disk && operPtrP->m_undo_buffer_space == 0)
1015 {
1016 operPtrP->op_struct.m_wait_log_buffer = 1;
1017 operPtrP->op_struct.m_load_diskpage_on_commit = 1;
1018 Uint32 sz= operPtrP->m_undo_buffer_space=
1019 (sizeof(Dbtup::Disk_undo::Update) >> 2) + sizes[DD] - 1;
1020
1021 terrorCode= c_lgman->alloc_log_space(regFragPtr->m_logfile_group_id,
1022 sz);
1023 if(unlikely(terrorCode))
1024 {
1025 operPtrP->m_undo_buffer_space= 0;
1026 goto error;
1027 }
1028 }
1029 }
1030 else
1031 {
1032 memcpy(dst, org, 4*regTabPtr->m_offsets[MM].m_fix_header_size);
1033 }
1034
1035 tup_version= (tup_version + 1) & ZTUP_VERSION_MASK;
1036 operPtrP->tupVersion= tup_version;
1037
1038 if (!req_struct->interpreted_exec) {
1039 jam();
1040 int retValue = updateAttributes(req_struct,
1041 &cinBuffer[0],
1042 req_struct->attrinfo_len);
1043 if (unlikely(retValue == -1))
1044 goto error;
1045 } else {
1046 jam();
1047 if (unlikely(interpreterStartLab(signal, req_struct) == -1))
1048 return -1;
1049 }
1050
1051 if (regTabPtr->need_shrink())
1052 {
1053 shrink_tuple(req_struct, sizes+2, regTabPtr, disk);
1054 if (cmp[0] != cmp[1] && handle_size_change_after_update(req_struct,
1055 base,
1056 operPtrP,
1057 regFragPtr,
1058 regTabPtr,
1059 sizes)) {
1060 goto error;
1061 }
1062 }
1063
1064 req_struct->m_tuple_ptr->set_tuple_version(tup_version);
1065 if (regTabPtr->m_bits & Tablerec::TR_Checksum) {
1066 jam();
1067 setChecksum(req_struct->m_tuple_ptr, regTabPtr);
1068 }
1069 return 0;
1070
1071 error:
1072 tupkeyErrorLab(signal);
1073 return -1;
1074 }
1075
1076 /* ---------------------------------------------------------------- */
1077 /* ----------------------------- INSERT --------------------------- */
1078 /* ---------------------------------------------------------------- */
1079 void
prepare_initial_insert(KeyReqStruct * req_struct,Operationrec * regOperPtr,Tablerec * regTabPtr)1080 Dbtup::prepare_initial_insert(KeyReqStruct *req_struct,
1081 Operationrec* regOperPtr,
1082 Tablerec* regTabPtr)
1083 {
1084 Uint32 disk_undo = regTabPtr->m_no_of_disk_attributes ?
1085 sizeof(Dbtup::Disk_undo::Alloc) >> 2 : 0;
1086 regOperPtr->nextActiveOp= RNIL;
1087 regOperPtr->prevActiveOp= RNIL;
1088 regOperPtr->op_struct.in_active_list= true;
1089 regOperPtr->m_undo_buffer_space= disk_undo;
1090
1091 req_struct->check_offset[MM]= regTabPtr->get_check_offset(MM);
1092 req_struct->check_offset[DD]= regTabPtr->get_check_offset(DD);
1093
1094 Uint32 num_attr= regTabPtr->m_no_of_attributes;
1095 Uint32 descr_start= regTabPtr->tabDescriptor;
1096 Uint32 order_desc= regTabPtr->m_real_order_descriptor;
1097 TableDescriptor *tab_descr= &tableDescriptor[descr_start];
1098 ndbrequire(descr_start + (num_attr << ZAD_LOG_SIZE) <= cnoOfTabDescrRec);
1099 req_struct->attr_descr= tab_descr;
1100 Uint16* order= (Uint16*)&tableDescriptor[order_desc];
1101
1102 const Uint32 cnt1= regTabPtr->m_attributes[MM].m_no_of_varsize;
1103 const Uint32 cnt2= regTabPtr->m_attributes[DD].m_no_of_varsize;
1104 Uint32 *ptr= req_struct->m_tuple_ptr->get_end_of_fix_part_ptr(regTabPtr);
1105 Var_part_ref* ref = req_struct->m_tuple_ptr->get_var_part_ref_ptr(regTabPtr);
1106
1107 if (regTabPtr->m_bits & Tablerec::TR_ForceVarPart)
1108 {
1109 ref->m_page_no = RNIL;
1110 ref->m_page_idx = Tup_varsize_page::END_OF_FREE_LIST;
1111 }
1112
1113 if(cnt1)
1114 {
1115 KeyReqStruct::Var_data* dst= &req_struct->m_var_data[MM];
1116 dst->m_data_ptr= (char*)(((Uint16*)ptr)+cnt1+1);
1117 dst->m_offset_array_ptr= req_struct->var_pos_array;
1118 dst->m_var_len_offset= cnt1;
1119 dst->m_max_var_offset= regTabPtr->m_offsets[MM].m_max_var_offset;
1120 // Disk part is 32-bit aligned
1121 ptr= ALIGN_WORD(dst->m_data_ptr+regTabPtr->m_offsets[MM].m_max_var_offset);
1122 order += regTabPtr->m_attributes[MM].m_no_of_fixsize;
1123 Uint32 pos= 0;
1124 Uint16 *pos_ptr = req_struct->var_pos_array;
1125 Uint16 *len_ptr = pos_ptr + cnt1;
1126 for(Uint32 i= 0; i<cnt1; i++)
1127 {
1128 * pos_ptr++ = pos;
1129 * len_ptr++ = pos;
1130 pos += AttributeDescriptor::getSizeInBytes(tab_descr[*order++].tabDescr);
1131 }
1132 }
1133
1134 req_struct->m_disk_ptr= (Tuple_header*)ptr;
1135
1136 ndbrequire(cnt2 == 0);
1137
1138 // Set all null bits
1139 memset(req_struct->m_tuple_ptr->m_null_bits+
1140 regTabPtr->m_offsets[MM].m_null_offset, 0xFF,
1141 4*regTabPtr->m_offsets[MM].m_null_words);
1142 memset(req_struct->m_disk_ptr->m_null_bits+
1143 regTabPtr->m_offsets[DD].m_null_offset, 0xFF,
1144 4*regTabPtr->m_offsets[DD].m_null_words);
1145 req_struct->m_tuple_ptr->m_header_bits=
1146 disk_undo ? (Tuple_header::DISK_ALLOC | Tuple_header::DISK_INLINE) : 0;
1147 }
1148
handleInsertReq(Signal * signal,Ptr<Operationrec> regOperPtr,Ptr<Fragrecord> fragPtr,Tablerec * regTabPtr,KeyReqStruct * req_struct)1149 int Dbtup::handleInsertReq(Signal* signal,
1150 Ptr<Operationrec> regOperPtr,
1151 Ptr<Fragrecord> fragPtr,
1152 Tablerec* regTabPtr,
1153 KeyReqStruct *req_struct)
1154 {
1155 Uint32 tup_version = 1;
1156 Fragrecord* regFragPtr = fragPtr.p;
1157 Uint32 *dst, *ptr= 0;
1158 Tuple_header *base= req_struct->m_tuple_ptr, *org= base;
1159 Tuple_header *tuple_ptr;
1160
1161 bool disk = regTabPtr->m_no_of_disk_attributes > 0;
1162 bool mem_insert = regOperPtr.p->is_first_operation();
1163 bool disk_insert = mem_insert && disk;
1164 bool varsize = regTabPtr->m_attributes[MM].m_no_of_varsize;
1165 bool rowid = req_struct->m_use_rowid;
1166 Uint32 real_page_id = regOperPtr.p->m_tuple_location.m_page_no;
1167 Uint32 frag_page_id = req_struct->frag_page_id;
1168
1169 union {
1170 Uint32 sizes[4];
1171 Uint64 cmp[2];
1172 };
1173
1174 if (ERROR_INSERTED(4014))
1175 {
1176 dst = 0;
1177 goto undo_buffer_error;
1178 }
1179
1180 dst= c_undo_buffer.alloc_copy_tuple(®OperPtr.p->m_copy_tuple_location,
1181 regTabPtr->total_rec_size);
1182 if (unlikely(dst == 0))
1183 {
1184 goto undo_buffer_error;
1185 }
1186 tuple_ptr= req_struct->m_tuple_ptr= (Tuple_header*)dst;
1187
1188 if(mem_insert)
1189 {
1190 jam();
1191 prepare_initial_insert(req_struct, regOperPtr.p, regTabPtr);
1192 }
1193 else
1194 {
1195 Operationrec* prevOp= req_struct->prevOpPtr.p;
1196 ndbassert(prevOp->op_struct.op_type == ZDELETE);
1197 tup_version= prevOp->tupVersion + 1;
1198
1199 if(!prevOp->is_first_operation())
1200 org= (Tuple_header*)c_undo_buffer.get_ptr(&prevOp->m_copy_tuple_location);
1201 if (regTabPtr->need_expand())
1202 {
1203 expand_tuple(req_struct, sizes, org, regTabPtr, !disk_insert);
1204 memset(req_struct->m_disk_ptr->m_null_bits+
1205 regTabPtr->m_offsets[DD].m_null_offset, 0xFF,
1206 4*regTabPtr->m_offsets[DD].m_null_words);
1207 }
1208 else
1209 {
1210 memcpy(dst, org, 4*regTabPtr->m_offsets[MM].m_fix_header_size);
1211 }
1212 memset(tuple_ptr->m_null_bits+
1213 regTabPtr->m_offsets[MM].m_null_offset, 0xFF,
1214 4*regTabPtr->m_offsets[MM].m_null_words);
1215 }
1216
1217 if (disk_insert)
1218 {
1219 int res;
1220
1221 if (ERROR_INSERTED(4015))
1222 {
1223 terrorCode = 1501;
1224 goto log_space_error;
1225 }
1226
1227 res= c_lgman->alloc_log_space(regFragPtr->m_logfile_group_id,
1228 regOperPtr.p->m_undo_buffer_space);
1229 if(unlikely(res))
1230 {
1231 terrorCode= res;
1232 goto log_space_error;
1233 }
1234 }
1235
1236 regOperPtr.p->tupVersion= tup_version & ZTUP_VERSION_MASK;
1237 tuple_ptr->set_tuple_version(tup_version);
1238
1239 if (ERROR_INSERTED(4016))
1240 {
1241 terrorCode = ZAI_INCONSISTENCY_ERROR;
1242 goto update_error;
1243 }
1244
1245 if(unlikely(updateAttributes(req_struct, &cinBuffer[0],
1246 req_struct->attrinfo_len) == -1))
1247 {
1248 goto update_error;
1249 }
1250
1251 if (ERROR_INSERTED(4017))
1252 {
1253 goto null_check_error;
1254 }
1255 if (unlikely(checkNullAttributes(req_struct, regTabPtr) == false))
1256 {
1257 goto null_check_error;
1258 }
1259
1260 if (regTabPtr->need_shrink())
1261 {
1262 shrink_tuple(req_struct, sizes+2, regTabPtr, true);
1263 }
1264
1265 if (ERROR_INSERTED(4025))
1266 {
1267 goto mem_error;
1268 }
1269
1270 if (ERROR_INSERTED(4026))
1271 {
1272 CLEAR_ERROR_INSERT_VALUE;
1273 goto mem_error;
1274 }
1275
1276 if (ERROR_INSERTED(4027) && (rand() % 100) > 25)
1277 {
1278 goto mem_error;
1279 }
1280
1281 if (ERROR_INSERTED(4028) && (rand() % 100) > 25)
1282 {
1283 CLEAR_ERROR_INSERT_VALUE;
1284 goto mem_error;
1285 }
1286
1287 /**
1288 * Alloc memory
1289 */
1290 if(mem_insert)
1291 {
1292 if (!rowid)
1293 {
1294 if (ERROR_INSERTED(4018))
1295 {
1296 goto mem_error;
1297 }
1298
1299 if (!varsize)
1300 {
1301 jam();
1302 ptr= alloc_fix_rec(regFragPtr,
1303 regTabPtr,
1304 ®OperPtr.p->m_tuple_location,
1305 &frag_page_id);
1306 }
1307 else
1308 {
1309 jam();
1310 regOperPtr.p->m_tuple_location.m_file_no= sizes[2+MM];
1311 ptr= alloc_var_rec(regFragPtr, regTabPtr,
1312 sizes[2+MM],
1313 ®OperPtr.p->m_tuple_location,
1314 &frag_page_id);
1315 }
1316 if (unlikely(ptr == 0))
1317 {
1318 goto mem_error;
1319 }
1320 req_struct->m_use_rowid = true;
1321 }
1322 else
1323 {
1324 regOperPtr.p->m_tuple_location = req_struct->m_row_id;
1325 if (ERROR_INSERTED(4019))
1326 {
1327 terrorCode = ZROWID_ALLOCATED;
1328 goto alloc_rowid_error;
1329 }
1330
1331 if (!varsize)
1332 {
1333 jam();
1334 ptr= alloc_fix_rowid(regFragPtr,
1335 regTabPtr,
1336 ®OperPtr.p->m_tuple_location,
1337 &frag_page_id);
1338 }
1339 else
1340 {
1341 jam();
1342 regOperPtr.p->m_tuple_location.m_file_no= sizes[2+MM];
1343 ptr= alloc_var_rowid(regFragPtr, regTabPtr,
1344 sizes[2+MM],
1345 ®OperPtr.p->m_tuple_location,
1346 &frag_page_id);
1347 }
1348 if (unlikely(ptr == 0))
1349 {
1350 jam();
1351 goto alloc_rowid_error;
1352 }
1353 }
1354 real_page_id = regOperPtr.p->m_tuple_location.m_page_no;
1355 regOperPtr.p->m_tuple_location.m_page_no= frag_page_id;
1356 c_lqh->accminupdate(signal,
1357 regOperPtr.p->userpointer,
1358 ®OperPtr.p->m_tuple_location);
1359
1360 base = (Tuple_header*)ptr;
1361 base->m_operation_ptr_i= regOperPtr.i;
1362 base->m_header_bits= Tuple_header::ALLOC |
1363 (varsize ? Tuple_header::CHAINED_ROW : 0);
1364 regOperPtr.p->m_tuple_location.m_page_no = real_page_id;
1365 }
1366 else
1367 {
1368 int ret;
1369 if (ERROR_INSERTED(4020))
1370 {
1371 goto size_change_error;
1372 }
1373
1374 if (regTabPtr->need_shrink() && cmp[0] != cmp[1] &&
1375 unlikely(ret = handle_size_change_after_update(req_struct,
1376 base,
1377 regOperPtr.p,
1378 regFragPtr,
1379 regTabPtr,
1380 sizes)))
1381 {
1382 goto size_change_error;
1383 }
1384 req_struct->m_use_rowid = false;
1385 base->m_header_bits &= ~(Uint32)Tuple_header::FREE;
1386 }
1387
1388 base->m_header_bits |= Tuple_header::ALLOC &
1389 (regOperPtr.p->is_first_operation() ? ~0 : 1);
1390
1391 if (disk_insert)
1392 {
1393 Local_key tmp;
1394 Uint32 size= regTabPtr->m_attributes[DD].m_no_of_varsize == 0 ?
1395 1 : sizes[2+DD];
1396
1397 if (ERROR_INSERTED(4021))
1398 {
1399 terrorCode = 1601;
1400 goto disk_prealloc_error;
1401 }
1402
1403 int ret= disk_page_prealloc(signal, fragPtr, &tmp, size);
1404 if (unlikely(ret < 0))
1405 {
1406 terrorCode = -ret;
1407 goto disk_prealloc_error;
1408 }
1409
1410 regOperPtr.p->op_struct.m_disk_preallocated= 1;
1411 tmp.m_page_idx= size;
1412 memcpy(tuple_ptr->get_disk_ref_ptr(regTabPtr), &tmp, sizeof(tmp));
1413
1414 /**
1415 * Set ref from disk to mm
1416 */
1417 Local_key ref = regOperPtr.p->m_tuple_location;
1418 ref.m_page_no = frag_page_id;
1419
1420 Tuple_header* disk_ptr= req_struct->m_disk_ptr;
1421 disk_ptr->m_header_bits = 0;
1422 disk_ptr->m_base_record_ref= ref.ref();
1423 }
1424
1425 if (regTabPtr->m_bits & Tablerec::TR_Checksum)
1426 {
1427 jam();
1428 setChecksum(req_struct->m_tuple_ptr, regTabPtr);
1429 }
1430 return 0;
1431
1432 size_change_error:
1433 jam();
1434 terrorCode = ZMEM_NOMEM_ERROR;
1435 goto exit_error;
1436
1437 undo_buffer_error:
1438 jam();
1439 terrorCode= ZMEM_NOMEM_ERROR;
1440 regOperPtr.p->m_undo_buffer_space = 0;
1441 if (mem_insert)
1442 regOperPtr.p->m_tuple_location.setNull();
1443 regOperPtr.p->m_copy_tuple_location.setNull();
1444 tupkeyErrorLab(signal);
1445 return -1;
1446
1447 null_check_error:
1448 jam();
1449 terrorCode= ZNO_ILLEGAL_NULL_ATTR;
1450 goto update_error;
1451
1452 mem_error:
1453 jam();
1454 terrorCode= ZMEM_NOMEM_ERROR;
1455 goto update_error;
1456
1457 log_space_error:
1458 jam();
1459 regOperPtr.p->m_undo_buffer_space = 0;
1460 alloc_rowid_error:
1461 jam();
1462 update_error:
1463 jam();
1464 if (mem_insert)
1465 {
1466 regOperPtr.p->op_struct.in_active_list = false;
1467 regOperPtr.p->m_tuple_location.setNull();
1468 }
1469 exit_error:
1470 tupkeyErrorLab(signal);
1471 return -1;
1472
1473 disk_prealloc_error:
1474 base->m_header_bits |= Tuple_header::FREED;
1475 goto exit_error;
1476 }
1477
1478 /* ---------------------------------------------------------------- */
1479 /* ---------------------------- DELETE ---------------------------- */
1480 /* ---------------------------------------------------------------- */
handleDeleteReq(Signal * signal,Operationrec * regOperPtr,Fragrecord * regFragPtr,Tablerec * regTabPtr,KeyReqStruct * req_struct,bool disk)1481 int Dbtup::handleDeleteReq(Signal* signal,
1482 Operationrec* regOperPtr,
1483 Fragrecord* regFragPtr,
1484 Tablerec* regTabPtr,
1485 KeyReqStruct *req_struct,
1486 bool disk)
1487 {
1488 // delete must set but not increment tupVersion
1489 if (!regOperPtr->is_first_operation())
1490 {
1491 Operationrec* prevOp= req_struct->prevOpPtr.p;
1492 regOperPtr->tupVersion= prevOp->tupVersion;
1493 // make copy since previous op is committed before this one
1494 const Uint32* org = c_undo_buffer.get_ptr(&prevOp->m_copy_tuple_location);
1495 Uint32* dst = c_undo_buffer.alloc_copy_tuple(
1496 ®OperPtr->m_copy_tuple_location, regTabPtr->total_rec_size);
1497 if (dst == 0) {
1498 terrorCode = ZMEM_NOMEM_ERROR;
1499 goto error;
1500 }
1501 memcpy(dst, org, regTabPtr->total_rec_size << 2);
1502 req_struct->m_tuple_ptr = (Tuple_header*)dst;
1503 }
1504 else
1505 {
1506 regOperPtr->tupVersion= req_struct->m_tuple_ptr->get_tuple_version();
1507 }
1508
1509 if(disk && regOperPtr->m_undo_buffer_space == 0)
1510 {
1511 regOperPtr->op_struct.m_wait_log_buffer = 1;
1512 regOperPtr->op_struct.m_load_diskpage_on_commit = 1;
1513 Uint32 sz= regOperPtr->m_undo_buffer_space=
1514 (sizeof(Dbtup::Disk_undo::Free) >> 2) +
1515 regTabPtr->m_offsets[DD].m_fix_header_size - 1;
1516
1517 terrorCode= c_lgman->alloc_log_space(regFragPtr->m_logfile_group_id,
1518 sz);
1519 if(unlikely(terrorCode))
1520 {
1521 regOperPtr->m_undo_buffer_space= 0;
1522 goto error;
1523 }
1524 }
1525 if (req_struct->attrinfo_len == 0)
1526 {
1527 return 0;
1528 }
1529
1530 if (regTabPtr->need_expand(disk))
1531 {
1532 prepare_read(req_struct, regTabPtr, disk);
1533 }
1534
1535 {
1536 Uint32 RlogSize;
1537 int ret= handleReadReq(signal, regOperPtr, regTabPtr, req_struct);
1538 if (ret == 0 && (RlogSize= req_struct->log_size))
1539 {
1540 jam();
1541 sendLogAttrinfo(signal, RlogSize, regOperPtr);
1542 }
1543 return ret;
1544 }
1545
1546 error:
1547 tupkeyErrorLab(signal);
1548 return -1;
1549 }
1550
1551 bool
checkNullAttributes(KeyReqStruct * req_struct,Tablerec * regTabPtr)1552 Dbtup::checkNullAttributes(KeyReqStruct * req_struct,
1553 Tablerec* regTabPtr)
1554 {
1555 // Implement checking of updating all not null attributes in an insert here.
1556 Bitmask<MAXNROFATTRIBUTESINWORDS> attributeMask;
1557 /*
1558 * The idea here is maybe that changeMask is not-null attributes
1559 * and must contain notNullAttributeMask. But:
1560 *
1561 * 1. changeMask has all bits set on insert
1562 * 2. not-null is checked in each UpdateFunction
1563 * 3. the code below does not work except trivially due to 1.
1564 *
1565 * XXX remove or fix
1566 */
1567 attributeMask.clear();
1568 attributeMask.bitOR(req_struct->changeMask);
1569 attributeMask.bitAND(regTabPtr->notNullAttributeMask);
1570 attributeMask.bitXOR(regTabPtr->notNullAttributeMask);
1571 if (!attributeMask.isclear()) {
1572 return false;
1573 }
1574 return true;
1575 }
1576
1577 /* ---------------------------------------------------------------- */
1578 /* THIS IS THE START OF THE INTERPRETED EXECUTION OF UPDATES. WE */
1579 /* START BY LINKING ALL ATTRINFO'S IN A DOUBLY LINKED LIST (THEY ARE*/
1580 /* ALREADY IN A LINKED LIST). WE ALLOCATE A REGISTER MEMORY (EQUAL */
1581 /* TO AN ATTRINFO RECORD). THE INTERPRETER GOES THROUGH FOUR PHASES*/
1582 /* DURING THE FIRST PHASE IT IS ONLY ALLOWED TO READ ATTRIBUTES THAT*/
1583 /* ARE SENT TO THE CLIENT APPLICATION. DURING THE SECOND PHASE IT IS*/
1584 /* ALLOWED TO READ FROM ATTRIBUTES INTO REGISTERS, TO UPDATE */
1585 /* ATTRIBUTES BASED ON EITHER A CONSTANT VALUE OR A REGISTER VALUE, */
1586 /* A DIVERSE SET OF OPERATIONS ON REGISTERS ARE AVAILABLE AS WELL. */
1587 /* IT IS ALSO POSSIBLE TO PERFORM JUMPS WITHIN THE INSTRUCTIONS THAT*/
1588 /* BELONGS TO THE SECOND PHASE. ALSO SUBROUTINES CAN BE CALLED IN */
1589 /* THIS PHASE. THE THIRD PHASE IS TO AGAIN READ ATTRIBUTES AND */
1590 /* FINALLY THE FOURTH PHASE READS SELECTED REGISTERS AND SEND THEM */
1591 /* TO THE CLIENT APPLICATION. */
1592 /* THERE IS A FIFTH REGION WHICH CONTAINS SUBROUTINES CALLABLE FROM */
1593 /* THE INTERPRETER EXECUTION REGION. */
/*    THE FIRST FIVE WORDS WILL GIVE THE LENGTH OF THE FIVE REGIONS */
1595 /* */
1596 /* THIS MEANS THAT FROM THE APPLICATIONS POINT OF VIEW THE DATABASE */
1597 /* CAN HANDLE SUBROUTINE CALLS WHERE THE CODE IS SENT IN THE REQUEST*/
1598 /* THE RETURN PARAMETERS ARE FIXED AND CAN EITHER BE GENERATED */
1599 /* BEFORE THE EXECUTION OF THE ROUTINE OR AFTER. */
1600 /* */
1601 /* IN LATER VERSIONS WE WILL ADD MORE THINGS LIKE THE POSSIBILITY */
1602 /* TO ALLOCATE MEMORY AND USE THIS AS LOCAL STORAGE. IT IS ALSO */
1603 /* IMAGINABLE TO HAVE SPECIAL ROUTINES THAT CAN PERFORM CERTAIN */
1604 /* OPERATIONS ON BLOB'S DEPENDENT ON WHAT THE BLOB REPRESENTS. */
1605 /* */
1606 /* */
1607 /* ----------------------------------------- */
1608 /* + INITIAL READ REGION + */
1609 /* ----------------------------------------- */
1610 /* + INTERPRETED EXECUTE REGION + */
1611 /* ----------------------------------------- */
1612 /* + FINAL UPDATE REGION + */
1613 /* ----------------------------------------- */
1614 /* + FINAL READ REGION + */
1615 /* ----------------------------------------- */
1616 /* + SUBROUTINE REGION + */
1617 /* ----------------------------------------- */
1618 /* ---------------------------------------------------------------- */
1619 /* ---------------------------------------------------------------- */
1620 /* ----------------- INTERPRETED EXECUTION ----------------------- */
1621 /* ---------------------------------------------------------------- */
interpreterStartLab(Signal * signal,KeyReqStruct * req_struct)1622 int Dbtup::interpreterStartLab(Signal* signal,
1623 KeyReqStruct *req_struct)
1624 {
1625 Operationrec * const regOperPtr= operPtr.p;
1626 int TnoDataRW;
1627 Uint32 RtotalLen, start_index, dstLen;
1628 Uint32 *dst;
1629
1630 Uint32 RinitReadLen= cinBuffer[0];
1631 Uint32 RexecRegionLen= cinBuffer[1];
1632 Uint32 RfinalUpdateLen= cinBuffer[2];
1633 Uint32 RfinalRLen= cinBuffer[3];
1634 Uint32 RsubLen= cinBuffer[4];
1635
1636 Uint32 RattrinbufLen= req_struct->attrinfo_len;
1637 const BlockReference sendBref= req_struct->rec_blockref;
1638
1639 const Uint32 node = refToNode(sendBref);
1640 if(node != 0 && node != getOwnNodeId()) {
1641 start_index= 25;
1642 } else {
1643 jam();
1644 /**
1645 * execute direct
1646 */
1647 start_index= 3;
1648 }
1649 dst= &signal->theData[start_index];
1650 dstLen= (MAX_READ / 4) - start_index;
1651
1652 RtotalLen= RinitReadLen;
1653 RtotalLen += RexecRegionLen;
1654 RtotalLen += RfinalUpdateLen;
1655 RtotalLen += RfinalRLen;
1656 RtotalLen += RsubLen;
1657
1658 Uint32 RattroutCounter= 0;
1659 Uint32 RinstructionCounter= 5;
1660 Uint32 RlogSize= 0;
1661 if (((RtotalLen + 5) == RattrinbufLen) &&
1662 (RattrinbufLen >= 5) &&
1663 (RattrinbufLen < ZATTR_BUFFER_SIZE)) {
1664 /* ---------------------------------------------------------------- */
1665 // We start by checking consistency. We must have the first five
1666 // words of the ATTRINFO to give us the length of the regions. The
1667 // size of these regions must be the same as the total ATTRINFO
1668 // length and finally the total length must be within the limits.
1669 /* ---------------------------------------------------------------- */
1670
1671 if (RinitReadLen > 0) {
1672 jam();
1673 /* ---------------------------------------------------------------- */
1674 // The first step that can be taken in the interpreter is to read
1675 // data of the tuple before any updates have been applied.
1676 /* ---------------------------------------------------------------- */
1677 TnoDataRW= readAttributes(req_struct,
1678 &cinBuffer[5],
1679 RinitReadLen,
1680 &dst[0],
1681 dstLen,
1682 false);
1683 if (TnoDataRW != -1) {
1684 RattroutCounter= TnoDataRW;
1685 RinstructionCounter += RinitReadLen;
1686 } else {
1687 jam();
1688 tupkeyErrorLab(signal);
1689 return -1;
1690 }
1691 }
1692 if (RexecRegionLen > 0) {
1693 jam();
1694 /* ---------------------------------------------------------------- */
1695 // The next step is the actual interpreted execution. This executes
1696 // a register-based virtual machine which can read and write attributes
1697 // to and from registers.
1698 /* ---------------------------------------------------------------- */
1699 Uint32 RsubPC= RinstructionCounter + RfinalUpdateLen + RfinalRLen;
1700 TnoDataRW= interpreterNextLab(signal,
1701 req_struct,
1702 &clogMemBuffer[0],
1703 &cinBuffer[RinstructionCounter],
1704 RexecRegionLen,
1705 &cinBuffer[RsubPC],
1706 RsubLen,
1707 &coutBuffer[0],
1708 sizeof(coutBuffer) / 4);
1709 if (TnoDataRW != -1) {
1710 RinstructionCounter += RexecRegionLen;
1711 RlogSize= TnoDataRW;
1712 } else {
1713 jam();
1714 /**
1715 * TUPKEY REF is sent from within interpreter
1716 */
1717 return -1;
1718 }
1719 }
1720 if (RfinalUpdateLen > 0) {
1721 jam();
1722 /* ---------------------------------------------------------------- */
1723 // We can also apply a set of updates without any conditions as part
1724 // of the interpreted execution.
1725 /* ---------------------------------------------------------------- */
1726 if (regOperPtr->op_struct.op_type == ZUPDATE) {
1727 TnoDataRW= updateAttributes(req_struct,
1728 &cinBuffer[RinstructionCounter],
1729 RfinalUpdateLen);
1730 if (TnoDataRW != -1) {
1731 MEMCOPY_NO_WORDS(&clogMemBuffer[RlogSize],
1732 &cinBuffer[RinstructionCounter],
1733 RfinalUpdateLen);
1734 RinstructionCounter += RfinalUpdateLen;
1735 RlogSize += RfinalUpdateLen;
1736 } else {
1737 jam();
1738 tupkeyErrorLab(signal);
1739 return -1;
1740 }
1741 } else {
1742 return TUPKEY_abort(signal, 19);
1743 }
1744 }
1745 if (RfinalRLen > 0) {
1746 jam();
1747 /* ---------------------------------------------------------------- */
1748 // The final action is that we can also read the tuple after it has
1749 // been updated.
1750 /* ---------------------------------------------------------------- */
1751 TnoDataRW= readAttributes(req_struct,
1752 &cinBuffer[RinstructionCounter],
1753 RfinalRLen,
1754 &dst[RattroutCounter],
1755 (dstLen - RattroutCounter),
1756 false);
1757 if (TnoDataRW != -1) {
1758 RattroutCounter += TnoDataRW;
1759 } else {
1760 jam();
1761 tupkeyErrorLab(signal);
1762 return -1;
1763 }
1764 }
1765 req_struct->log_size= RlogSize;
1766 req_struct->read_length= RattroutCounter;
1767 sendReadAttrinfo(signal, req_struct, RattroutCounter, regOperPtr);
1768 if (RlogSize > 0) {
1769 sendLogAttrinfo(signal, RlogSize, regOperPtr);
1770 }
1771 return 0;
1772 } else {
1773 return TUPKEY_abort(signal, 22);
1774 }
1775 }
1776
1777 /* ---------------------------------------------------------------- */
1778 /* WHEN EXECUTION IS INTERPRETED WE NEED TO SEND SOME ATTRINFO*/
1779 /* BACK TO LQH FOR LOGGING AND SENDING TO BACKUP AND STANDBY */
1780 /* NODES. */
/*       INPUT:  TLOG_SIZE       NUMBER OF WORDS TO SEND, TAKEN FROM */
/*                               THE START OF CLOGMEMBUFFER          */
/*               REG_OPER_PTR    OPERATION RECORD; ITS USERPOINTER   */
/*                               IS PLACED IN WORD 0 OF THE SIGNAL   */
1784 /* ---------------------------------------------------------------- */
sendLogAttrinfo(Signal * signal,Uint32 TlogSize,Operationrec * const regOperPtr)1785 void Dbtup::sendLogAttrinfo(Signal* signal,
1786 Uint32 TlogSize,
1787 Operationrec * const regOperPtr)
1788
1789 {
1790 Uint32 TbufferIndex= 0;
1791 signal->theData[0]= regOperPtr->userpointer;
1792 while (TlogSize > 22) {
1793 MEMCOPY_NO_WORDS(&signal->theData[3],
1794 &clogMemBuffer[TbufferIndex],
1795 22);
1796 EXECUTE_DIRECT(DBLQH, GSN_TUP_ATTRINFO, signal, 25);
1797 TbufferIndex += 22;
1798 TlogSize -= 22;
1799 }
1800 MEMCOPY_NO_WORDS(&signal->theData[3],
1801 &clogMemBuffer[TbufferIndex],
1802 TlogSize);
1803 EXECUTE_DIRECT(DBLQH, GSN_TUP_ATTRINFO, signal, 3 + TlogSize);
1804 }
1805
1806 inline
1807 Uint32
brancher(Uint32 TheInstruction,Uint32 TprogramCounter)1808 brancher(Uint32 TheInstruction, Uint32 TprogramCounter)
1809 {
1810 Uint32 TbranchDirection= TheInstruction >> 31;
1811 Uint32 TbranchLength= (TheInstruction >> 16) & 0x7fff;
1812 TprogramCounter--;
1813 if (TbranchDirection == 1) {
1814 jam();
1815 /* ---------------------------------------------------------------- */
1816 /* WE JUMP BACKWARDS. */
1817 /* ---------------------------------------------------------------- */
1818 return (TprogramCounter - TbranchLength);
1819 } else {
1820 jam();
1821 /* ---------------------------------------------------------------- */
1822 /* WE JUMP FORWARD. */
1823 /* ---------------------------------------------------------------- */
1824 return (TprogramCounter + TbranchLength);
1825 }
1826 }
1827
interpreterNextLab(Signal * signal,KeyReqStruct * req_struct,Uint32 * logMemory,Uint32 * mainProgram,Uint32 TmainProgLen,Uint32 * subroutineProg,Uint32 TsubroutineLen,Uint32 * tmpArea,Uint32 tmpAreaSz)1828 int Dbtup::interpreterNextLab(Signal* signal,
1829 KeyReqStruct* req_struct,
1830 Uint32* logMemory,
1831 Uint32* mainProgram,
1832 Uint32 TmainProgLen,
1833 Uint32* subroutineProg,
1834 Uint32 TsubroutineLen,
1835 Uint32 * tmpArea,
1836 Uint32 tmpAreaSz)
1837 {
1838 register Uint32* TcurrentProgram= mainProgram;
1839 register Uint32 TcurrentSize= TmainProgLen;
1840 register Uint32 RnoOfInstructions= 0;
1841 register Uint32 TprogramCounter= 0;
1842 register Uint32 theInstruction;
1843 register Uint32 theRegister;
1844 Uint32 TdataWritten= 0;
1845 Uint32 RstackPtr= 0;
1846 union {
1847 Uint32 TregMemBuffer[32];
1848 Uint64 align[16];
1849 };
1850 Uint32 TstackMemBuffer[32];
1851
1852 /* ---------------------------------------------------------------- */
1853 // Initialise all 8 registers to contain the NULL value.
1854 // In this version we can handle 32 and 64 bit unsigned integers.
1855 // They are handled as 64 bit values. Thus the 32 most significant
1856 // bits are zeroed for 32 bit values.
1857 /* ---------------------------------------------------------------- */
1858 TregMemBuffer[0]= 0;
1859 TregMemBuffer[4]= 0;
1860 TregMemBuffer[8]= 0;
1861 TregMemBuffer[12]= 0;
1862 TregMemBuffer[16]= 0;
1863 TregMemBuffer[20]= 0;
1864 TregMemBuffer[24]= 0;
1865 TregMemBuffer[28]= 0;
1866 Uint32 tmpHabitant= ~0;
1867
1868 while (RnoOfInstructions < 8000) {
1869 /* ---------------------------------------------------------------- */
1870 /* EXECUTE THE NEXT INTERPRETER INSTRUCTION. */
1871 /* ---------------------------------------------------------------- */
1872 RnoOfInstructions++;
1873 theInstruction= TcurrentProgram[TprogramCounter];
1874 theRegister= Interpreter::getReg1(theInstruction) << 2;
1875 if (TprogramCounter < TcurrentSize) {
1876 TprogramCounter++;
1877 switch (Interpreter::getOpCode(theInstruction)) {
1878 case Interpreter::READ_ATTR_INTO_REG:
1879 jam();
1880 /* ---------------------------------------------------------------- */
1881 // Read an attribute from the tuple into a register.
1882 // While reading an attribute we allow the attribute to be an array
1883 // as long as it fits in the 64 bits of the register.
1884 /* ---------------------------------------------------------------- */
1885 {
1886 Uint32 theAttrinfo= theInstruction;
1887 int TnoDataRW= readAttributes(req_struct,
1888 &theAttrinfo,
1889 (Uint32)1,
1890 &TregMemBuffer[theRegister],
1891 (Uint32)3,
1892 false);
1893 if (TnoDataRW == 2) {
1894 /* ------------------------------------------------------------- */
1895 // Two words read means that we get the instruction plus one 32
1896 // word read. Thus we set the register to be a 32 bit register.
1897 /* ------------------------------------------------------------- */
1898 TregMemBuffer[theRegister]= 0x50;
1899 // arithmetic conversion if big-endian
1900 * (Int64*)(TregMemBuffer+theRegister+2)= TregMemBuffer[theRegister+1];
1901 } else if (TnoDataRW == 3) {
1902 /* ------------------------------------------------------------- */
1903 // Three words read means that we get the instruction plus two
1904 // 32 words read. Thus we set the register to be a 64 bit register.
1905 /* ------------------------------------------------------------- */
1906 TregMemBuffer[theRegister]= 0x60;
1907 TregMemBuffer[theRegister+3]= TregMemBuffer[theRegister+2];
1908 TregMemBuffer[theRegister+2]= TregMemBuffer[theRegister+1];
1909 } else if (TnoDataRW == 1) {
1910 /* ------------------------------------------------------------- */
1911 // One word read means that we must have read a NULL value. We set
1912 // the register to indicate a NULL value.
1913 /* ------------------------------------------------------------- */
1914 TregMemBuffer[theRegister]= 0;
1915 TregMemBuffer[theRegister + 2]= 0;
1916 TregMemBuffer[theRegister + 3]= 0;
1917 } else if (TnoDataRW == -1) {
1918 jam();
1919 tupkeyErrorLab(signal);
1920 return -1;
1921 } else {
1922 /* ------------------------------------------------------------- */
1923 // Any other return value from the read attribute here is not
1924 // allowed and will lead to a system crash.
1925 /* ------------------------------------------------------------- */
1926 ndbrequire(false);
1927 }
1928 break;
1929 }
1930
1931 case Interpreter::WRITE_ATTR_FROM_REG:
1932 jam();
1933 {
1934 Uint32 TattrId= theInstruction >> 16;
1935 Uint32 TattrDescrIndex= tabptr.p->tabDescriptor +
1936 (TattrId << ZAD_LOG_SIZE);
1937 Uint32 TattrDesc1= tableDescriptor[TattrDescrIndex].tabDescr;
1938 Uint32 TregType= TregMemBuffer[theRegister];
1939
1940 /* --------------------------------------------------------------- */
1941 // Calculate the number of words of this attribute.
1942 // We allow writes into arrays as long as they fit into the 64 bit
1943 // register size.
1944 /* --------------------------------------------------------------- */
1945 Uint32 TattrNoOfWords = AttributeDescriptor::getSizeInWords(TattrDesc1);
1946 Uint32 Toptype = operPtr.p->op_struct.op_type;
1947 Uint32 TdataForUpdate[3];
1948 Uint32 Tlen;
1949
1950 AttributeHeader ah(TattrId, TattrNoOfWords << 2);
1951 TdataForUpdate[0]= ah.m_value;
1952 TdataForUpdate[1]= TregMemBuffer[theRegister + 2];
1953 TdataForUpdate[2]= TregMemBuffer[theRegister + 3];
1954 Tlen= TattrNoOfWords + 1;
1955 if (Toptype == ZUPDATE) {
1956 if (TattrNoOfWords <= 2) {
1957 if (TattrNoOfWords == 1) {
1958 // arithmetic conversion if big-endian
1959 TdataForUpdate[1] = *(Int64*)&TregMemBuffer[theRegister + 2];
1960 TdataForUpdate[2] = 0;
1961 }
1962 if (TregType == 0) {
1963 /* --------------------------------------------------------- */
1964 // Write a NULL value into the attribute
1965 /* --------------------------------------------------------- */
1966 ah.setNULL();
1967 TdataForUpdate[0]= ah.m_value;
1968 Tlen= 1;
1969 }
1970 int TnoDataRW= updateAttributes(req_struct,
1971 &TdataForUpdate[0],
1972 Tlen);
1973 if (TnoDataRW != -1) {
1974 /* --------------------------------------------------------- */
1975 // Write the written data also into the log buffer so that it
1976 // will be logged.
1977 /* --------------------------------------------------------- */
1978 logMemory[TdataWritten + 0]= TdataForUpdate[0];
1979 logMemory[TdataWritten + 1]= TdataForUpdate[1];
1980 logMemory[TdataWritten + 2]= TdataForUpdate[2];
1981 TdataWritten += Tlen;
1982 } else {
1983 tupkeyErrorLab(signal);
1984 return -1;
1985 }
1986 } else {
1987 return TUPKEY_abort(signal, 15);
1988 }
1989 } else {
1990 return TUPKEY_abort(signal, 16);
1991 }
1992 break;
1993 }
1994
1995 case Interpreter::LOAD_CONST_NULL:
1996 jam();
1997 TregMemBuffer[theRegister]= 0; /* NULL INDICATOR */
1998 break;
1999
2000 case Interpreter::LOAD_CONST16:
2001 jam();
2002 TregMemBuffer[theRegister]= 0x50; /* 32 BIT UNSIGNED CONSTANT */
2003 * (Int64*)(TregMemBuffer+theRegister+2)= theInstruction >> 16;
2004 break;
2005
2006 case Interpreter::LOAD_CONST32:
2007 jam();
2008 TregMemBuffer[theRegister]= 0x50; /* 32 BIT UNSIGNED CONSTANT */
2009 * (Int64*)(TregMemBuffer+theRegister+2)= *
2010 (TcurrentProgram+TprogramCounter);
2011 TprogramCounter++;
2012 break;
2013
2014 case Interpreter::LOAD_CONST64:
2015 jam();
2016 TregMemBuffer[theRegister]= 0x60; /* 64 BIT UNSIGNED CONSTANT */
2017 TregMemBuffer[theRegister + 2 ]= * (TcurrentProgram +
2018 TprogramCounter++);
2019 TregMemBuffer[theRegister + 3 ]= * (TcurrentProgram +
2020 TprogramCounter++);
2021 break;
2022
2023 case Interpreter::ADD_REG_REG:
2024 jam();
2025 {
2026 Uint32 TrightRegister= Interpreter::getReg2(theInstruction) << 2;
2027 Uint32 TdestRegister= Interpreter::getReg3(theInstruction) << 2;
2028
2029 Uint32 TrightType= TregMemBuffer[TrightRegister];
2030 Int64 Tright0= * (Int64*)(TregMemBuffer + TrightRegister + 2);
2031
2032
2033 Uint32 TleftType= TregMemBuffer[theRegister];
2034 Int64 Tleft0= * (Int64*)(TregMemBuffer + theRegister + 2);
2035
2036 if ((TleftType | TrightType) != 0) {
2037 Uint64 Tdest0= Tleft0 + Tright0;
2038 * (Int64*)(TregMemBuffer+TdestRegister+2)= Tdest0;
2039 TregMemBuffer[TdestRegister]= 0x60;
2040 } else {
2041 return TUPKEY_abort(signal, 20);
2042 }
2043 break;
2044 }
2045
2046 case Interpreter::SUB_REG_REG:
2047 jam();
2048 {
2049 Uint32 TrightRegister= Interpreter::getReg2(theInstruction) << 2;
2050 Uint32 TdestRegister= Interpreter::getReg3(theInstruction) << 2;
2051
2052 Uint32 TrightType= TregMemBuffer[TrightRegister];
2053 Int64 Tright0= * (Int64*)(TregMemBuffer + TrightRegister + 2);
2054
2055 Uint32 TleftType= TregMemBuffer[theRegister];
2056 Int64 Tleft0= * (Int64*)(TregMemBuffer + theRegister + 2);
2057
2058 if ((TleftType | TrightType) != 0) {
2059 Int64 Tdest0= Tleft0 - Tright0;
2060 * (Int64*)(TregMemBuffer+TdestRegister+2)= Tdest0;
2061 TregMemBuffer[TdestRegister]= 0x60;
2062 } else {
2063 return TUPKEY_abort(signal, 20);
2064 }
2065 break;
2066 }
2067
2068 case Interpreter::BRANCH:
2069 TprogramCounter= brancher(theInstruction, TprogramCounter);
2070 break;
2071
2072 case Interpreter::BRANCH_REG_EQ_NULL:
2073 if (TregMemBuffer[theRegister] != 0) {
2074 jam();
2075 continue;
2076 } else {
2077 jam();
2078 TprogramCounter= brancher(theInstruction, TprogramCounter);
2079 }
2080 break;
2081
2082 case Interpreter::BRANCH_REG_NE_NULL:
2083 if (TregMemBuffer[theRegister] == 0) {
2084 jam();
2085 continue;
2086 } else {
2087 jam();
2088 TprogramCounter= brancher(theInstruction, TprogramCounter);
2089 }
2090 break;
2091
2092
2093 case Interpreter::BRANCH_EQ_REG_REG:
2094 {
2095 Uint32 TrightRegister= Interpreter::getReg2(theInstruction) << 2;
2096
2097 Uint32 TleftType= TregMemBuffer[theRegister];
2098 Uint32 Tleft0= TregMemBuffer[theRegister + 2];
2099 Uint32 Tleft1= TregMemBuffer[theRegister + 3];
2100
2101 Uint32 TrightType= TregMemBuffer[TrightRegister];
2102 Uint32 Tright0= TregMemBuffer[TrightRegister + 2];
2103 Uint32 Tright1= TregMemBuffer[TrightRegister + 3];
2104 if ((TrightType | TleftType) != 0) {
2105 jam();
2106 if ((Tleft0 == Tright0) && (Tleft1 == Tright1)) {
2107 TprogramCounter= brancher(theInstruction, TprogramCounter);
2108 }
2109 } else {
2110 return TUPKEY_abort(signal, 23);
2111 }
2112 break;
2113 }
2114
2115 case Interpreter::BRANCH_NE_REG_REG:
2116 {
2117 Uint32 TrightRegister= Interpreter::getReg2(theInstruction) << 2;
2118
2119 Uint32 TleftType= TregMemBuffer[theRegister];
2120 Uint32 Tleft0= TregMemBuffer[theRegister + 2];
2121 Uint32 Tleft1= TregMemBuffer[theRegister + 3];
2122
2123 Uint32 TrightType= TregMemBuffer[TrightRegister];
2124 Uint32 Tright0= TregMemBuffer[TrightRegister + 2];
2125 Uint32 Tright1= TregMemBuffer[TrightRegister + 3];
2126 if ((TrightType | TleftType) != 0) {
2127 jam();
2128 if ((Tleft0 != Tright0) || (Tleft1 != Tright1)) {
2129 TprogramCounter= brancher(theInstruction, TprogramCounter);
2130 }
2131 } else {
2132 return TUPKEY_abort(signal, 24);
2133 }
2134 break;
2135 }
2136
2137 case Interpreter::BRANCH_LT_REG_REG:
2138 {
2139 Uint32 TrightRegister= Interpreter::getReg2(theInstruction) << 2;
2140
2141 Uint32 TrightType= TregMemBuffer[TrightRegister];
2142 Int64 Tright0= * (Int64*)(TregMemBuffer + TrightRegister + 2);
2143
2144 Uint32 TleftType= TregMemBuffer[theRegister];
2145 Int64 Tleft0= * (Int64*)(TregMemBuffer + theRegister + 2);
2146
2147
2148 if ((TrightType | TleftType) != 0) {
2149 jam();
2150 if (Tleft0 < Tright0) {
2151 TprogramCounter= brancher(theInstruction, TprogramCounter);
2152 }
2153 } else {
2154 return TUPKEY_abort(signal, 24);
2155 }
2156 break;
2157 }
2158
2159 case Interpreter::BRANCH_LE_REG_REG:
2160 {
2161 Uint32 TrightRegister= Interpreter::getReg2(theInstruction) << 2;
2162
2163 Uint32 TrightType= TregMemBuffer[TrightRegister];
2164 Int64 Tright0= * (Int64*)(TregMemBuffer + TrightRegister + 2);
2165
2166 Uint32 TleftType= TregMemBuffer[theRegister];
2167 Int64 Tleft0= * (Int64*)(TregMemBuffer + theRegister + 2);
2168
2169
2170 if ((TrightType | TleftType) != 0) {
2171 jam();
2172 if (Tleft0 <= Tright0) {
2173 TprogramCounter= brancher(theInstruction, TprogramCounter);
2174 }
2175 } else {
2176 return TUPKEY_abort(signal, 26);
2177 }
2178 break;
2179 }
2180
2181 case Interpreter::BRANCH_GT_REG_REG:
2182 {
2183 Uint32 TrightRegister= Interpreter::getReg2(theInstruction) << 2;
2184
2185 Uint32 TrightType= TregMemBuffer[TrightRegister];
2186 Int64 Tright0= * (Int64*)(TregMemBuffer + TrightRegister + 2);
2187
2188 Uint32 TleftType= TregMemBuffer[theRegister];
2189 Int64 Tleft0= * (Int64*)(TregMemBuffer + theRegister + 2);
2190
2191
2192 if ((TrightType | TleftType) != 0) {
2193 jam();
2194 if (Tleft0 > Tright0){
2195 TprogramCounter= brancher(theInstruction, TprogramCounter);
2196 }
2197 } else {
2198 return TUPKEY_abort(signal, 27);
2199 }
2200 break;
2201 }
2202
2203 case Interpreter::BRANCH_GE_REG_REG:
2204 {
2205 Uint32 TrightRegister= Interpreter::getReg2(theInstruction) << 2;
2206
2207 Uint32 TrightType= TregMemBuffer[TrightRegister];
2208 Int64 Tright0= * (Int64*)(TregMemBuffer + TrightRegister + 2);
2209
2210 Uint32 TleftType= TregMemBuffer[theRegister];
2211 Int64 Tleft0= * (Int64*)(TregMemBuffer + theRegister + 2);
2212
2213
2214 if ((TrightType | TleftType) != 0) {
2215 jam();
2216 if (Tleft0 >= Tright0){
2217 TprogramCounter= brancher(theInstruction, TprogramCounter);
2218 }
2219 } else {
2220 return TUPKEY_abort(signal, 28);
2221 }
2222 break;
2223 }
2224
2225 case Interpreter::BRANCH_ATTR_OP_ARG:{
2226 jam();
2227 Uint32 cond = Interpreter::getBinaryCondition(theInstruction);
2228 Uint32 ins2 = TcurrentProgram[TprogramCounter];
2229 Uint32 attrId = Interpreter::getBranchCol_AttrId(ins2) << 16;
2230 Uint32 argLen = Interpreter::getBranchCol_Len(ins2);
2231
2232 if(tmpHabitant != attrId){
2233 Int32 TnoDataR = readAttributes(req_struct,
2234 &attrId, 1,
2235 tmpArea, tmpAreaSz,
2236 false);
2237
2238 if (TnoDataR == -1) {
2239 jam();
2240 tupkeyErrorLab(signal);
2241 return -1;
2242 }
2243 tmpHabitant= attrId;
2244 }
2245
2246 // get type
2247 attrId >>= 16;
2248 Uint32 TattrDescrIndex = tabptr.p->tabDescriptor +
2249 (attrId << ZAD_LOG_SIZE);
2250 Uint32 TattrDesc1 = tableDescriptor[TattrDescrIndex].tabDescr;
2251 Uint32 TattrDesc2 = tableDescriptor[TattrDescrIndex+1].tabDescr;
2252 Uint32 typeId = AttributeDescriptor::getType(TattrDesc1);
2253 void * cs = 0;
2254 if(AttributeOffset::getCharsetFlag(TattrDesc2))
2255 {
2256 Uint32 pos = AttributeOffset::getCharsetPos(TattrDesc2);
2257 cs = tabptr.p->charsetArray[pos];
2258 }
2259 const NdbSqlUtil::Type& sqlType = NdbSqlUtil::getType(typeId);
2260
2261 // get data
2262 AttributeHeader ah(tmpArea[0]);
2263 const char* s1 = (char*)&tmpArea[1];
2264 const char* s2 = (char*)&TcurrentProgram[TprogramCounter+1];
2265 // fixed length in 5.0
2266 Uint32 attrLen = AttributeDescriptor::getSizeInBytes(TattrDesc1);
2267
2268 bool r1_null = ah.isNULL();
2269 bool r2_null = argLen == 0;
2270 int res1;
2271 if (cond != Interpreter::LIKE &&
2272 cond != Interpreter::NOT_LIKE) {
2273 if (r1_null || r2_null) {
2274 // NULL==NULL and NULL<not-NULL
2275 res1 = r1_null && r2_null ? 0 : r1_null ? -1 : 1;
2276 } else {
2277 jam();
2278 if (unlikely(sqlType.m_cmp == 0))
2279 {
2280 return TUPKEY_abort(signal, 40);
2281 }
2282 res1 = (*sqlType.m_cmp)(cs, s1, attrLen, s2, argLen, true);
2283 }
2284 } else {
2285 if (r1_null || r2_null) {
2286 // NULL like NULL is true (has no practical use)
2287 res1 = r1_null && r2_null ? 0 : -1;
2288 } else {
2289 jam();
2290 if (unlikely(sqlType.m_like == 0))
2291 {
2292 return TUPKEY_abort(signal, 40);
2293 }
2294 res1 = (*sqlType.m_like)(cs, s1, attrLen, s2, argLen);
2295 }
2296 }
2297
2298 int res = 0;
2299 switch ((Interpreter::BinaryCondition)cond) {
2300 case Interpreter::EQ:
2301 res = (res1 == 0);
2302 break;
2303 case Interpreter::NE:
2304 res = (res1 != 0);
2305 break;
2306 // note the condition is backwards
2307 case Interpreter::LT:
2308 res = (res1 > 0);
2309 break;
2310 case Interpreter::LE:
2311 res = (res1 >= 0);
2312 break;
2313 case Interpreter::GT:
2314 res = (res1 < 0);
2315 break;
2316 case Interpreter::GE:
2317 res = (res1 <= 0);
2318 break;
2319 case Interpreter::LIKE:
2320 res = (res1 == 0);
2321 break;
2322 case Interpreter::NOT_LIKE:
2323 res = (res1 == 1);
2324 break;
2325 // XXX handle invalid value
2326 }
2327 #ifdef TRACE_INTERPRETER
2328 ndbout_c("cond=%u attr(%d)='%.*s'(%d) str='%.*s'(%d) res1=%d res=%d",
2329 cond, attrId >> 16,
2330 attrLen, s1, attrLen, argLen, s2, argLen, res1, res);
2331 #endif
2332 if (res)
2333 TprogramCounter = brancher(theInstruction, TprogramCounter);
2334 else
2335 {
2336 Uint32 tmp = ((argLen + 3) >> 2) + 1;
2337 TprogramCounter += tmp;
2338 }
2339 break;
2340 }
2341
2342 case Interpreter::BRANCH_ATTR_EQ_NULL:{
2343 jam();
2344 Uint32 ins2= TcurrentProgram[TprogramCounter];
2345 Uint32 attrId= Interpreter::getBranchCol_AttrId(ins2) << 16;
2346
2347 if (tmpHabitant != attrId){
2348 Int32 TnoDataR= readAttributes(req_struct,
2349 &attrId, 1,
2350 tmpArea, tmpAreaSz,
2351 false);
2352
2353 if (TnoDataR == -1) {
2354 jam();
2355 tupkeyErrorLab(signal);
2356 return -1;
2357 }
2358 tmpHabitant= attrId;
2359 }
2360
2361 AttributeHeader ah(tmpArea[0]);
2362 if (ah.isNULL()){
2363 TprogramCounter= brancher(theInstruction, TprogramCounter);
2364 } else {
2365 TprogramCounter ++;
2366 }
2367 break;
2368 }
2369
2370 case Interpreter::BRANCH_ATTR_NE_NULL:{
2371 jam();
2372 Uint32 ins2= TcurrentProgram[TprogramCounter];
2373 Uint32 attrId= Interpreter::getBranchCol_AttrId(ins2) << 16;
2374
2375 if (tmpHabitant != attrId){
2376 Int32 TnoDataR= readAttributes(req_struct,
2377 &attrId, 1,
2378 tmpArea, tmpAreaSz,
2379 false);
2380
2381 if (TnoDataR == -1) {
2382 jam();
2383 tupkeyErrorLab(signal);
2384 return -1;
2385 }
2386 tmpHabitant= attrId;
2387 }
2388
2389 AttributeHeader ah(tmpArea[0]);
2390 if (ah.isNULL()){
2391 TprogramCounter ++;
2392 } else {
2393 TprogramCounter= brancher(theInstruction, TprogramCounter);
2394 }
2395 break;
2396 }
2397
2398 case Interpreter::EXIT_OK:
2399 jam();
2400 #ifdef TRACE_INTERPRETER
2401 ndbout_c(" - exit_ok");
2402 #endif
2403 return TdataWritten;
2404
2405 case Interpreter::EXIT_OK_LAST:
2406 jam();
2407 #ifdef TRACE_INTERPRETER
2408 ndbout_c(" - exit_ok_last");
2409 #endif
2410 req_struct->last_row= true;
2411 return TdataWritten;
2412
2413 case Interpreter::EXIT_REFUSE:
2414 jam();
2415 #ifdef TRACE_INTERPRETER
2416 ndbout_c(" - exit_nok");
2417 #endif
2418 terrorCode= theInstruction >> 16;
2419 return TUPKEY_abort(signal, 29);
2420
2421 case Interpreter::CALL:
2422 jam();
2423 RstackPtr++;
2424 if (RstackPtr < 32) {
2425 TstackMemBuffer[RstackPtr]= TprogramCounter + 1;
2426 TprogramCounter= theInstruction >> 16;
2427 if (TprogramCounter < TsubroutineLen) {
2428 TcurrentProgram= subroutineProg;
2429 TcurrentSize= TsubroutineLen;
2430 } else {
2431 return TUPKEY_abort(signal, 30);
2432 }
2433 } else {
2434 return TUPKEY_abort(signal, 31);
2435 }
2436 break;
2437
2438 case Interpreter::RETURN:
2439 jam();
2440 if (RstackPtr > 0) {
2441 TprogramCounter= TstackMemBuffer[RstackPtr];
2442 RstackPtr--;
2443 if (RstackPtr == 0) {
2444 jam();
2445 /* ------------------------------------------------------------- */
2446 // We are back to the main program.
2447 /* ------------------------------------------------------------- */
2448 TcurrentProgram= mainProgram;
2449 TcurrentSize= TmainProgLen;
2450 }
2451 } else {
2452 return TUPKEY_abort(signal, 32);
2453 }
2454 break;
2455
2456 default:
2457 return TUPKEY_abort(signal, 33);
2458 }
2459 } else {
2460 return TUPKEY_abort(signal, 34);
2461 }
2462 }
2463 return TUPKEY_abort(signal, 35);
2464 }
2465
2466 /**
2467 * expand_var_part - copy packed variable attributes to fully expanded size
2468 *
2469 * dst: where to start writing attribute data
2470 * dst_off_ptr where to write attribute offsets
2471 * src pointer to packed attributes
2472 * tabDesc array of attribute descriptors (used for getting max size)
2473 * no_of_attr no of atributes to expand
2474 */
2475 Uint32*
expand_var_part(Dbtup::KeyReqStruct::Var_data * dst,const Uint32 * src,const Uint32 * tabDesc,const Uint16 * order)2476 expand_var_part(Dbtup::KeyReqStruct::Var_data *dst,
2477 const Uint32* src,
2478 const Uint32 * tabDesc,
2479 const Uint16* order)
2480 {
2481 char* dst_ptr= dst->m_data_ptr;
2482 Uint32 no_attr= dst->m_var_len_offset;
2483 Uint16* dst_off_ptr= dst->m_offset_array_ptr;
2484 Uint16* dst_len_ptr= dst_off_ptr + no_attr;
2485 const Uint16* src_off_ptr= (const Uint16*)src;
2486 const char* src_ptr= (const char*)(src_off_ptr + no_attr + 1);
2487
2488 Uint16 tmp= *src_off_ptr++, next_pos, len, max_len, dst_off= 0;
2489 for(Uint32 i = 0; i<no_attr; i++)
2490 {
2491 next_pos= *src_off_ptr++;
2492 len= next_pos - tmp;
2493
2494 *dst_off_ptr++ = dst_off;
2495 *dst_len_ptr++ = dst_off + len;
2496 memcpy(dst_ptr, src_ptr, len);
2497 src_ptr += len;
2498
2499 max_len= AttributeDescriptor::getSizeInBytes(tabDesc[* order++]);
2500 dst_ptr += max_len; // Max size
2501 dst_off += max_len;
2502
2503 tmp= next_pos;
2504 }
2505
2506 return ALIGN_WORD(dst_ptr);
2507 }
2508
2509 void
expand_tuple(KeyReqStruct * req_struct,Uint32 sizes[2],Tuple_header * src,const Tablerec * tabPtrP,bool disk)2510 Dbtup::expand_tuple(KeyReqStruct* req_struct,
2511 Uint32 sizes[2],
2512 Tuple_header* src,
2513 const Tablerec* tabPtrP,
2514 bool disk)
2515 {
2516 Uint32 bits= src->m_header_bits;
2517 Tuple_header* ptr= req_struct->m_tuple_ptr;
2518
2519 Uint16 dd_tot= tabPtrP->m_no_of_disk_attributes;
2520 Uint16 mm_vars= tabPtrP->m_attributes[MM].m_no_of_varsize;
2521 Uint32 fix_size= tabPtrP->m_offsets[MM].m_fix_header_size;
2522 Uint32 order_desc= tabPtrP->m_real_order_descriptor;
2523
2524 Uint32 *dst_ptr= ptr->get_end_of_fix_part_ptr(tabPtrP);
2525 const Uint32 *disk_ref= src->get_disk_ref_ptr(tabPtrP);
2526 const Uint32 *src_ptr= src->get_end_of_fix_part_ptr(tabPtrP);
2527 const Var_part_ref* var_ref = src->get_var_part_ref_ptr(tabPtrP);
2528 const Uint32 *desc= (Uint32*)req_struct->attr_descr;
2529 const Uint16 *order = (Uint16*)(&tableDescriptor[order_desc]);
2530 order += tabPtrP->m_attributes[MM].m_no_of_fixsize;
2531
2532 if(mm_vars)
2533 {
2534
2535 Uint32 step; // in bytes
2536 const Uint32 *src_data= src_ptr;
2537 KeyReqStruct::Var_data* dst= &req_struct->m_var_data[MM];
2538 if(bits & Tuple_header::CHAINED_ROW)
2539 {
2540 Ptr<Page> var_page;
2541 src_data= get_ptr(&var_page, *var_ref);
2542 step= 4;
2543 sizes[MM]= (2 + (mm_vars << 1) + ((Uint16*)src_data)[mm_vars] + 3) >> 2;
2544 req_struct->m_varpart_page_ptr = var_page;
2545 }
2546 else
2547 {
2548 step= (2 + (mm_vars << 1) + ((Uint16*)src_ptr)[mm_vars]);
2549 sizes[MM]= (step + 3) >> 2;
2550 req_struct->m_varpart_page_ptr = req_struct->m_page_ptr;
2551 }
2552 dst->m_data_ptr= (char*)(((Uint16*)dst_ptr)+mm_vars+1);
2553 dst->m_offset_array_ptr= req_struct->var_pos_array;
2554 dst->m_var_len_offset= mm_vars;
2555 dst->m_max_var_offset= tabPtrP->m_offsets[MM].m_max_var_offset;
2556
2557 dst_ptr= expand_var_part(dst, src_data, desc, order);
2558 ndbassert(dst_ptr == ALIGN_WORD(dst->m_data_ptr + dst->m_max_var_offset));
2559 ndbassert((UintPtr(src_ptr) & 3) == 0);
2560 src_ptr = ALIGN_WORD(((char*)src_ptr)+step);
2561
2562 sizes[MM] += fix_size;
2563 memcpy(ptr, src, 4*fix_size);
2564 }
2565 else
2566 {
2567 sizes[MM]= 1;
2568 memcpy(ptr, src, 4*fix_size);
2569 }
2570
2571 src->m_header_bits= bits &
2572 ~(Uint32)(Tuple_header::MM_SHRINK | Tuple_header::MM_GROWN);
2573
2574 sizes[DD]= 0;
2575 if(disk && dd_tot)
2576 {
2577 const Uint16 dd_vars= tabPtrP->m_attributes[DD].m_no_of_varsize;
2578 order += mm_vars;
2579
2580 if(bits & Tuple_header::DISK_INLINE)
2581 {
2582 // Only on copy tuple
2583 ndbassert((bits & Tuple_header::CHAINED_ROW) == 0);
2584 }
2585 else
2586 {
2587 Local_key key;
2588 memcpy(&key, disk_ref, sizeof(key));
2589 key.m_page_no= req_struct->m_disk_page_ptr.i;
2590 src_ptr= get_dd_ptr(&req_struct->m_disk_page_ptr, &key, tabPtrP);
2591 }
2592 bits |= Tuple_header::DISK_INLINE;
2593
2594 // Fix diskpart
2595 req_struct->m_disk_ptr= (Tuple_header*)dst_ptr;
2596 memcpy(dst_ptr, src_ptr, 4*tabPtrP->m_offsets[DD].m_fix_header_size);
2597 sizes[DD] = tabPtrP->m_offsets[DD].m_fix_header_size;
2598
2599 ndbassert(! (req_struct->m_disk_ptr->m_header_bits & Tuple_header::FREE));
2600
2601 ndbrequire(dd_vars == 0);
2602 }
2603
2604 ptr->m_header_bits= (bits & ~(Uint32)(Tuple_header::CHAINED_ROW));
2605 }
2606
2607 void
prepare_read(KeyReqStruct * req_struct,Tablerec * tabPtrP,bool disk)2608 Dbtup::prepare_read(KeyReqStruct* req_struct,
2609 Tablerec* tabPtrP, bool disk)
2610 {
2611 Tuple_header* ptr= req_struct->m_tuple_ptr;
2612
2613 Uint32 bits= ptr->m_header_bits;
2614 Uint16 dd_tot= tabPtrP->m_no_of_disk_attributes;
2615 Uint16 mm_vars= tabPtrP->m_attributes[MM].m_no_of_varsize;
2616
2617 const Uint32 *src_ptr= ptr->get_end_of_fix_part_ptr(tabPtrP);
2618 const Uint32 *disk_ref= ptr->get_disk_ref_ptr(tabPtrP);
2619 const Var_part_ref* var_ref = ptr->get_var_part_ref_ptr(tabPtrP);
2620 if(mm_vars)
2621 {
2622 const Uint32 *src_data= src_ptr;
2623 KeyReqStruct::Var_data* dst= &req_struct->m_var_data[MM];
2624 if(bits & Tuple_header::CHAINED_ROW)
2625 {
2626 #if VM_TRACE
2627
2628 #endif
2629 src_data= get_ptr(* var_ref);
2630 }
2631 dst->m_data_ptr= (char*)(((Uint16*)src_data)+mm_vars+1);
2632 dst->m_offset_array_ptr= (Uint16*)src_data;
2633 dst->m_var_len_offset= 1;
2634 dst->m_max_var_offset= ((Uint16*)src_data)[mm_vars];
2635
2636 // disk part start after varsize (aligned)
2637 src_ptr = ALIGN_WORD(dst->m_data_ptr + dst->m_max_var_offset);
2638 }
2639
2640 if(disk && dd_tot)
2641 {
2642 const Uint16 dd_vars= tabPtrP->m_attributes[DD].m_no_of_varsize;
2643
2644 if(bits & Tuple_header::DISK_INLINE)
2645 {
2646 // Only on copy tuple
2647 ndbassert((bits & Tuple_header::CHAINED_ROW) == 0);
2648 }
2649 else
2650 {
2651 // XXX
2652 Local_key key;
2653 memcpy(&key, disk_ref, sizeof(key));
2654 key.m_page_no= req_struct->m_disk_page_ptr.i;
2655 src_ptr= get_dd_ptr(&req_struct->m_disk_page_ptr, &key, tabPtrP);
2656 }
2657 // Fix diskpart
2658 req_struct->m_disk_ptr= (Tuple_header*)src_ptr;
2659 ndbassert(! (req_struct->m_disk_ptr->m_header_bits & Tuple_header::FREE));
2660 ndbrequire(dd_vars == 0);
2661 }
2662 }
2663
2664 void
shrink_tuple(KeyReqStruct * req_struct,Uint32 sizes[2],const Tablerec * tabPtrP,bool disk)2665 Dbtup::shrink_tuple(KeyReqStruct* req_struct, Uint32 sizes[2],
2666 const Tablerec* tabPtrP, bool disk)
2667 {
2668 ndbassert(tabPtrP->need_shrink());
2669 Tuple_header* ptr= req_struct->m_tuple_ptr;
2670
2671 Uint16 dd_tot= tabPtrP->m_no_of_disk_attributes;
2672 Uint16 mm_vars= tabPtrP->m_attributes[MM].m_no_of_varsize;
2673 Uint16 dd_vars= tabPtrP->m_attributes[DD].m_no_of_varsize;
2674
2675 Uint32 *dst_ptr= ptr->get_end_of_fix_part_ptr(tabPtrP);
2676 Uint16* src_off_ptr= req_struct->var_pos_array;
2677
2678 sizes[MM] = 1;
2679 sizes[DD] = 0;
2680 if(mm_vars)
2681 {
2682 Uint16* dst_off_ptr= (Uint16*)dst_ptr;
2683 char* dst_data_ptr= (char*)(dst_off_ptr + mm_vars + 1);
2684 char* src_data_ptr= dst_data_ptr;
2685 Uint32 off= 0;
2686 for(Uint32 i= 0; i<mm_vars; i++)
2687 {
2688 const char* data_ptr= src_data_ptr + *src_off_ptr;
2689 Uint32 len= src_off_ptr[mm_vars] - *src_off_ptr;
2690 * dst_off_ptr++= off;
2691 memmove(dst_data_ptr, data_ptr, len);
2692 off += len;
2693 src_off_ptr++;
2694 dst_data_ptr += len;
2695 }
2696 *dst_off_ptr= off;
2697 ndbassert(dst_data_ptr <= ((char*)ptr) + 8192);
2698 ndbassert((UintPtr(ptr) & 3) == 0);
2699 sizes[MM]= (dst_data_ptr + 3 - ((char*)ptr)) >> 2;
2700
2701 dst_ptr = ALIGN_WORD(dst_data_ptr);
2702 }
2703
2704 if(disk && dd_tot)
2705 {
2706 Uint32 * src_ptr = (Uint32*)req_struct->m_disk_ptr;
2707 req_struct->m_disk_ptr = (Tuple_header*)dst_ptr;
2708 ndbrequire(dd_vars == 0);
2709 sizes[DD] = tabPtrP->m_offsets[DD].m_fix_header_size;
2710 memmove(dst_ptr, src_ptr, 4*tabPtrP->m_offsets[DD].m_fix_header_size);
2711 }
2712 }
2713
2714 void
validate_page(Tablerec * regTabPtr,Var_page * p)2715 Dbtup::validate_page(Tablerec* regTabPtr, Var_page* p)
2716 {
2717 Uint32 mm_vars= regTabPtr->m_attributes[MM].m_no_of_varsize;
2718 Uint32 fix_sz= regTabPtr->m_offsets[MM].m_fix_header_size +
2719 Tuple_header::HeaderSize;
2720
2721 if(mm_vars == 0)
2722 return;
2723
2724 for(Uint32 F= 0; F<MAX_FRAG_PER_NODE; F++)
2725 {
2726 FragrecordPtr fragPtr;
2727
2728 if((fragPtr.i = regTabPtr->fragrec[F]) == RNIL)
2729 continue;
2730
2731 ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
2732 for(Uint32 P= 0; P<fragPtr.p->noOfPages; P++)
2733 {
2734 Uint32 real= getRealpid(fragPtr.p, P);
2735 Var_page* page= (Var_page*)c_page_pool.getPtr(real);
2736
2737 for(Uint32 i=1; i<page->high_index; i++)
2738 {
2739 Uint32 idx= page->get_index_word(i);
2740 Uint32 len = (idx & Var_page::LEN_MASK) >> Var_page::LEN_SHIFT;
2741 if(!(idx & Var_page::FREE) && !(idx & Var_page::CHAIN))
2742 {
2743 Tuple_header *ptr= (Tuple_header*)page->get_ptr(i);
2744 Uint32 *part= ptr->get_end_of_fix_part_ptr(regTabPtr);
2745 if(ptr->m_header_bits & Tuple_header::CHAINED_ROW)
2746 {
2747 ndbassert(len == fix_sz + 1);
2748 Local_key tmp; tmp.assref(*part);
2749 Ptr<Page> tmpPage;
2750 part= get_ptr(&tmpPage, *(Var_part_ref*)part);
2751 len= ((Var_page*)tmpPage.p)->get_entry_len(tmp.m_page_idx);
2752 Uint32 sz= ((mm_vars + 1) << 1) + (((Uint16*)part)[mm_vars]);
2753 ndbassert(len >= ((sz + 3) >> 2));
2754 }
2755 else
2756 {
2757 Uint32 sz= ((mm_vars + 1) << 1) + (((Uint16*)part)[mm_vars]);
2758 ndbassert(len >= ((sz+3)>>2)+fix_sz);
2759 }
2760 if(ptr->m_operation_ptr_i != RNIL)
2761 {
2762 c_operation_pool.getPtr(ptr->m_operation_ptr_i);
2763 }
2764 }
2765 else if(!(idx & Var_page::FREE))
2766 {
2767 /**
2768 * Chain
2769 */
2770 Uint32 *part= page->get_ptr(i);
2771 Uint32 sz= ((mm_vars + 1) << 1) + (((Uint16*)part)[mm_vars]);
2772 ndbassert(len >= ((sz + 3) >> 2));
2773 }
2774 else
2775 {
2776
2777 }
2778 }
2779 if(p == 0 && page->high_index > 1)
2780 page->reorg((Var_page*)ctemp_page);
2781 }
2782 }
2783
2784 if(p == 0)
2785 {
2786 validate_page(regTabPtr, (Var_page*)1);
2787 }
2788 }
2789
2790 int
handle_size_change_after_update(KeyReqStruct * req_struct,Tuple_header * org,Operationrec * regOperPtr,Fragrecord * regFragPtr,Tablerec * regTabPtr,Uint32 sizes[4])2791 Dbtup::handle_size_change_after_update(KeyReqStruct* req_struct,
2792 Tuple_header* org,
2793 Operationrec* regOperPtr,
2794 Fragrecord* regFragPtr,
2795 Tablerec* regTabPtr,
2796 Uint32 sizes[4])
2797 {
2798 ndbrequire(sizes[1] == sizes[3]);
2799 //ndbout_c("%d %d %d %d", sizes[0], sizes[1], sizes[2], sizes[3]);
2800 if(0)
2801 printf("%p %d %d - handle_size_change_after_update ",
2802 req_struct->m_tuple_ptr,
2803 regOperPtr->m_tuple_location.m_page_no,
2804 regOperPtr->m_tuple_location.m_page_idx);
2805
2806 Uint32 bits= org->m_header_bits;
2807 Uint32 copy_bits= req_struct->m_tuple_ptr->m_header_bits;
2808 Uint32 fix_sz = regTabPtr->m_offsets[MM].m_fix_header_size;
2809
2810 if(sizes[MM] == sizes[2+MM])
2811 ;
2812 else if(sizes[MM] > sizes[2+MM])
2813 {
2814 if(0) ndbout_c("shrink");
2815 copy_bits |= Tuple_header::MM_SHRINK;
2816 }
2817 else
2818 {
2819 if(0) printf("grow - ");
2820 Ptr<Page> pagePtr = req_struct->m_varpart_page_ptr;
2821 Var_page* pageP= (Var_page*)pagePtr.p;
2822 Uint32 idx, alloc, needed;
2823 Var_part_ref *refptr = org->get_var_part_ref_ptr(regTabPtr);
2824 ndbassert(bits & Tuple_header::CHAINED_ROW);
2825
2826 Local_key ref;
2827 refptr->copyout(&ref);
2828 idx= ref.m_page_idx;
2829 if (! (copy_bits & Tuple_header::CHAINED_ROW))
2830 {
2831 c_page_pool.getPtr(pagePtr, ref.m_page_no);
2832 pageP = (Var_page*)pagePtr.p;
2833 }
2834 alloc= pageP->get_entry_len(idx);
2835 #ifdef VM_TRACE
2836 if(!pageP->get_entry_chain(idx))
2837 ndbout << *pageP << endl;
2838 #endif
2839 ndbassert(pageP->get_entry_chain(idx));
2840 needed= sizes[2+MM] - fix_sz;
2841
2842 if(needed <= alloc)
2843 {
2844 //ndbassert(!regOperPtr->is_first_operation());
2845 if (0) ndbout_c(" no grow");
2846 return 0;
2847 }
2848 copy_bits |= Tuple_header::MM_GROWN;
2849 if (unlikely(realloc_var_part(regFragPtr, regTabPtr, pagePtr,
2850 refptr, alloc, needed)))
2851 return -1;
2852
2853 if (regTabPtr->m_bits & Tablerec::TR_Checksum)
2854 {
2855 jam();
2856 setChecksum(org, regTabPtr);
2857 }
2858 }
2859 req_struct->m_tuple_ptr->m_header_bits = copy_bits;
2860 return 0;
2861 }
2862
2863 int
nr_update_gci(Uint32 fragPtrI,const Local_key * key,Uint32 gci)2864 Dbtup::nr_update_gci(Uint32 fragPtrI, const Local_key* key, Uint32 gci)
2865 {
2866 FragrecordPtr fragPtr;
2867 fragPtr.i= fragPtrI;
2868 ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
2869 TablerecPtr tablePtr;
2870 tablePtr.i= fragPtr.p->fragTableId;
2871 ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
2872
2873 if (tablePtr.p->m_bits & Tablerec::TR_RowGCI)
2874 {
2875 Local_key tmp = *key;
2876 PagePtr page_ptr;
2877
2878 int ret = alloc_page(tablePtr.p, fragPtr.p, &page_ptr, tmp.m_page_no);
2879
2880 if (ret)
2881 return -1;
2882
2883 Tuple_header* ptr = (Tuple_header*)
2884 ((Fix_page*)page_ptr.p)->get_ptr(tmp.m_page_idx, 0);
2885
2886 ndbrequire(ptr->m_header_bits & Tuple_header::FREE);
2887 *ptr->get_mm_gci(tablePtr.p) = gci;
2888 }
2889 return 0;
2890 }
2891
2892 int
nr_read_pk(Uint32 fragPtrI,const Local_key * key,Uint32 * dst,bool & copy)2893 Dbtup::nr_read_pk(Uint32 fragPtrI,
2894 const Local_key* key, Uint32* dst, bool& copy)
2895 {
2896
2897 FragrecordPtr fragPtr;
2898 fragPtr.i= fragPtrI;
2899 ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
2900 TablerecPtr tablePtr;
2901 tablePtr.i= fragPtr.p->fragTableId;
2902 ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
2903
2904 Local_key tmp = *key;
2905
2906
2907 PagePtr page_ptr;
2908 int ret = alloc_page(tablePtr.p, fragPtr.p, &page_ptr, tmp.m_page_no);
2909 if (ret)
2910 return -1;
2911
2912 KeyReqStruct req_struct;
2913 Uint32* ptr= ((Fix_page*)page_ptr.p)->get_ptr(key->m_page_idx, 0);
2914
2915 req_struct.m_page_ptr = page_ptr;
2916 req_struct.m_tuple_ptr = (Tuple_header*)ptr;
2917 Uint32 bits = req_struct.m_tuple_ptr->m_header_bits;
2918
2919 ret = 0;
2920 copy = false;
2921 if (! (bits & Tuple_header::FREE))
2922 {
2923 if (bits & Tuple_header::ALLOC)
2924 {
2925 Uint32 opPtrI= req_struct.m_tuple_ptr->m_operation_ptr_i;
2926 Operationrec* opPtrP= c_operation_pool.getPtr(opPtrI);
2927 ndbassert(!opPtrP->m_copy_tuple_location.isNull());
2928 req_struct.m_tuple_ptr= (Tuple_header*)
2929 c_undo_buffer.get_ptr(&opPtrP->m_copy_tuple_location);
2930 copy = true;
2931 }
2932 req_struct.check_offset[MM]= tablePtr.p->get_check_offset(MM);
2933 req_struct.check_offset[DD]= tablePtr.p->get_check_offset(DD);
2934
2935 Uint32 num_attr= tablePtr.p->m_no_of_attributes;
2936 Uint32 descr_start= tablePtr.p->tabDescriptor;
2937 TableDescriptor *tab_descr= &tableDescriptor[descr_start];
2938 ndbrequire(descr_start + (num_attr << ZAD_LOG_SIZE) <= cnoOfTabDescrRec);
2939 req_struct.attr_descr= tab_descr;
2940
2941 if (tablePtr.p->need_expand())
2942 prepare_read(&req_struct, tablePtr.p, false);
2943
2944 const Uint32* attrIds= &tableDescriptor[tablePtr.p->readKeyArray].tabDescr;
2945 const Uint32 numAttrs= tablePtr.p->noOfKeyAttr;
2946 // read pk attributes from original tuple
2947
2948 // new globals
2949 tabptr= tablePtr;
2950 fragptr= fragPtr;
2951 operPtr.i= RNIL;
2952 operPtr.p= NULL;
2953
2954 // do it
2955 ret = readAttributes(&req_struct,
2956 attrIds,
2957 numAttrs,
2958 dst,
2959 ZNIL, false);
2960
2961 // done
2962 if (likely(ret != -1)) {
2963 // remove headers
2964 Uint32 n= 0;
2965 Uint32 i= 0;
2966 while (n < numAttrs) {
2967 const AttributeHeader ah(dst[i]);
2968 Uint32 size= ah.getDataSize();
2969 ndbrequire(size != 0);
2970 for (Uint32 j= 0; j < size; j++) {
2971 dst[i + j - n]= dst[i + j + 1];
2972 }
2973 n+= 1;
2974 i+= 1 + size;
2975 }
2976 ndbrequire((int)i == ret);
2977 ret -= numAttrs;
2978 } else {
2979 return terrorCode ? (-(int)terrorCode) : -1;
2980 }
2981 }
2982
2983 if (tablePtr.p->m_bits & Tablerec::TR_RowGCI)
2984 {
2985 dst[ret] = *req_struct.m_tuple_ptr->get_mm_gci(tablePtr.p);
2986 }
2987 else
2988 {
2989 dst[ret] = 0;
2990 }
2991 return ret;
2992 }
2993
2994 #include <signaldata/TuxMaint.hpp>
2995
/**
 * nr_delete - delete the row at a given position
 *
 * Removes the TUX index entries (if the table has tuxCustomTriggers),
 * frees the main memory record and, for tables with disk attributes,
 * frees the disk part as well. Freeing the disk part needs the disk page
 * and log buffer space; when get_page() or get_log_buffer() returns 0
 * the method gives up for now and returns 1 (presumably the registered
 * callback continues the work later - confirm against the page cache
 * and log manager interfaces).
 *
 * signal      signal object; used as send buffer for the TUX request and
 *             to hand the disk reference back to the caller
 * senderData  stored as callback data in the page request
 * fragPtrI    index of the fragment record
 * key         position of the row (page number is translated with
 *             getRealpid())
 * gci         passed on to disk_page_free()
 *
 * returns  0  row (and disk part, if any) has been freed
 *          1  disk part not yet freed; the disk reference has been
 *             copied to the start of signal->theData, and if the log
 *             buffer was the cause, signal->theData[2] was first set to
 *             the disk page index
 *         -1  get_page() returned -1
 */
int
Dbtup::nr_delete(Signal* signal, Uint32 senderData,
                 Uint32 fragPtrI, const Local_key* key, Uint32 gci)
{
  FragrecordPtr fragPtr;
  fragPtr.i= fragPtrI;
  ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
  TablerecPtr tablePtr;
  tablePtr.i= fragPtr.p->fragTableId;
  ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);

  // Work on a copy of the key with the page number translated
  Local_key tmp = * key;
  tmp.m_page_no= getRealpid(fragPtr.p, tmp.m_page_no);

  PagePtr pagePtr;
  Tuple_header* ptr= (Tuple_header*)get_ptr(&pagePtr, &tmp, tablePtr.p);

  if (!tablePtr.p->tuxCustomTriggers.isEmpty())
  {
    jam();
    // Build the index maintenance request in the signal's send area
    TuxMaintReq* req = (TuxMaintReq*)signal->getDataPtrSend();
    req->tableId = fragPtr.p->fragTableId;
    req->fragId = fragPtr.p->fragmentId;
    req->pageId = tmp.m_page_no;
    req->pageIndex = tmp.m_page_idx;
    req->tupVersion = ptr->get_tuple_version();
    req->opInfo = TuxMaintReq::OpRemove;
    removeTuxEntries(signal, tablePtr.p);
  }

  // Save the disk reference before the main memory record is freed
  Local_key disk;
  memcpy(&disk, ptr->get_disk_ref_ptr(tablePtr.p), sizeof(disk));

  if (tablePtr.p->m_attributes[MM].m_no_of_varsize)
  {
    jam();
    free_var_rec(fragPtr.p, tablePtr.p, &tmp, pagePtr);
  } else {
    jam();
    free_fix_rec(fragPtr.p, tablePtr.p, &tmp, (Fix_page*)pagePtr.p);
  }

  if (tablePtr.p->m_no_of_disk_attributes)
  {
    jam();

    // Log space requested for this delete, in words
    Uint32 sz = (sizeof(Dbtup::Disk_undo::Free) >> 2) +
      tablePtr.p->m_offsets[DD].m_fix_header_size - 1;

    int res = c_lgman->alloc_log_space(fragPtr.p->m_logfile_group_id, sz);
    ndbrequire(res == 0);

    /**
     * 1) alloc log buffer
     * 2) get page
     * 3) get log buffer
     * 4) delete tuple
     */
    Page_cache_client::Request preq;
    preq.m_page = disk;
    preq.m_callback.m_callbackData = senderData;
    preq.m_callback.m_callbackFunction =
      safe_cast(&Dbtup::nr_delete_page_callback);
    int flags = Page_cache_client::COMMIT_REQ;

#ifdef ERROR_INSERT
    // Test support: with error insert 4023 or 4024 the page request may
    // be delayed (always 3000 with 4024, otherwise 3000, 100 or no
    // delay depending on a random number)
    if (ERROR_INSERTED(4023) || ERROR_INSERTED(4024))
    {
      int rnd = rand() % 100;
      int slp = 0;
      if (ERROR_INSERTED(4024))
      {
        slp = 3000;
      }
      else if (rnd > 90)
      {
        slp = 3000;
      }
      else if (rnd > 70)
      {
        slp = 100;
      }

      ndbout_c("rnd: %d slp: %d", rnd, slp);

      if (slp)
      {
        flags |= Page_cache_client::DELAY_REQ;
        preq.m_delay_until_time = NdbTick_CurrentMillisecond()+(Uint64)slp;
      }
    }
#endif

    res = m_pgman.get_page(signal, preq, flags);
    if (res == 0)
    {
      // Page not delivered now; nr_delete_page_callback is registered
      goto timeslice;
    }
    else if (unlikely(res == -1))
    {
      return -1;
    }

    // Page is available; it is taken from m_pgman.m_ptr
    PagePtr disk_page = *(PagePtr*)&m_pgman.m_ptr;
    disk_page_set_dirty(disk_page);

    preq.m_callback.m_callbackFunction =
      safe_cast(&Dbtup::nr_delete_log_buffer_callback);
    Logfile_client lgman(this, c_lgman, fragPtr.p->m_logfile_group_id);
    res= lgman.get_log_buffer(signal, sz, &preq.m_callback);
    switch(res){
    case 0:
      // Log buffer not delivered now; pass the page index along. Words
      // 0.. of theData are overwritten with the disk reference at the
      // timeslice label.
      signal->theData[2] = disk_page.i;
      goto timeslice;
    case -1:
      ndbrequire("NOT YET IMPLEMENTED" == 0);
      break;
    }

    // Both page and log buffer were available: free the disk part now
    if (0) ndbout << "DIRECT DISK DELETE: " << disk << endl;
    disk_page_free(signal, tablePtr.p, fragPtr.p,
                   &disk, *(PagePtr*)&disk_page, gci);
    return 0;
  }

  return 0;

timeslice:
  // Hand the disk reference back to the caller through the signal
  memcpy(signal->theData, &disk, sizeof(disk));
  return 1;
}
3127
3128 void
nr_delete_page_callback(Signal * signal,Uint32 userpointer,Uint32 page_id)3129 Dbtup::nr_delete_page_callback(Signal* signal,
3130 Uint32 userpointer, Uint32 page_id)
3131 {
3132 Ptr<GlobalPage> gpage;
3133 m_global_page_pool.getPtr(gpage, page_id);
3134 PagePtr pagePtr= *(PagePtr*)&gpage;
3135 disk_page_set_dirty(pagePtr);
3136 Dblqh::Nr_op_info op;
3137 op.m_ptr_i = userpointer;
3138 op.m_disk_ref.m_page_no = pagePtr.p->m_page_no;
3139 op.m_disk_ref.m_file_no = pagePtr.p->m_file_no;
3140 c_lqh->get_nr_op_info(&op, page_id);
3141
3142 Ptr<Fragrecord> fragPtr;
3143 fragPtr.i= op.m_tup_frag_ptr_i;
3144 ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
3145
3146 Ptr<Tablerec> tablePtr;
3147 tablePtr.i = fragPtr.p->fragTableId;
3148 ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
3149
3150 Uint32 sz = (sizeof(Dbtup::Disk_undo::Free) >> 2) +
3151 tablePtr.p->m_offsets[DD].m_fix_header_size - 1;
3152
3153 Callback cb;
3154 cb.m_callbackData = userpointer;
3155 cb.m_callbackFunction =
3156 safe_cast(&Dbtup::nr_delete_log_buffer_callback);
3157 Logfile_client lgman(this, c_lgman, fragPtr.p->m_logfile_group_id);
3158 int res= lgman.get_log_buffer(signal, sz, &cb);
3159 switch(res){
3160 case 0:
3161 return;
3162 case -1:
3163 ndbrequire("NOT YET IMPLEMENTED" == 0);
3164 break;
3165 }
3166
3167 if (0) ndbout << "PAGE CALLBACK DISK DELETE: " << op.m_disk_ref << endl;
3168 disk_page_free(signal, tablePtr.p, fragPtr.p,
3169 &op.m_disk_ref, pagePtr, op.m_gci);
3170
3171 c_lqh->nr_delete_complete(signal, &op);
3172 return;
3173 }
3174
3175 void
nr_delete_log_buffer_callback(Signal * signal,Uint32 userpointer,Uint32 unused)3176 Dbtup::nr_delete_log_buffer_callback(Signal* signal,
3177 Uint32 userpointer,
3178 Uint32 unused)
3179 {
3180 Dblqh::Nr_op_info op;
3181 op.m_ptr_i = userpointer;
3182 c_lqh->get_nr_op_info(&op, RNIL);
3183
3184 Ptr<Fragrecord> fragPtr;
3185 fragPtr.i= op.m_tup_frag_ptr_i;
3186 ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
3187
3188 Ptr<Tablerec> tablePtr;
3189 tablePtr.i = fragPtr.p->fragTableId;
3190 ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
3191
3192 Ptr<GlobalPage> gpage;
3193 m_global_page_pool.getPtr(gpage, op.m_page_id);
3194 PagePtr pagePtr= *(PagePtr*)&gpage;
3195
3196 /**
3197 * reset page no
3198 */
3199 if (0) ndbout << "LOGBUFFER CALLBACK DISK DELETE: " << op.m_disk_ref << endl;
3200
3201 disk_page_free(signal, tablePtr.p, fragPtr.p,
3202 &op.m_disk_ref, pagePtr, op.m_gci);
3203
3204 c_lqh->nr_delete_complete(signal, &op);
3205 }
3206