1 /* Copyright (c) 2003-2008 MySQL AB
2    Use is subject to license terms
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation; version 2 of the License.
7 
8    This program is distributed in the hope that it will be useful,
9    but WITHOUT ANY WARRANTY; without even the implied warranty of
10    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11    GNU General Public License for more details.
12 
13    You should have received a copy of the GNU General Public License
14    along with this program; if not, write to the Free Software
15    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA */
16 
17 
18 #define DBTUP_C
19 #include <Dblqh.hpp>
20 #include "Dbtup.hpp"
21 #include <RefConvert.hpp>
22 #include <ndb_limits.h>
23 #include <pc.hpp>
24 #include <AttributeDescriptor.hpp>
25 #include "AttributeOffset.hpp"
26 #include <AttributeHeader.hpp>
27 #include <Interpreter.hpp>
28 #include <signaldata/TupKey.hpp>
29 #include <signaldata/AttrInfo.hpp>
30 #include <NdbSqlUtil.hpp>
31 
32 /* ----------------------------------------------------------------- */
33 /* -----------       INIT_STORED_OPERATIONREC         -------------- */
34 /* ----------------------------------------------------------------- */
int Dbtup::initStoredOperationrec(Operationrec* regOperPtr,
                                  KeyReqStruct* req_struct,
                                  Uint32 storedId)
38 {
39   jam();
40   StoredProcPtr storedPtr;
41   c_storedProcPool.getPtr(storedPtr, storedId);
42   if (storedPtr.i != RNIL) {
43     if (storedPtr.p->storedCode == ZSCAN_PROCEDURE) {
44       storedPtr.p->storedCounter++;
45       regOperPtr->firstAttrinbufrec= storedPtr.p->storedLinkFirst;
46       regOperPtr->lastAttrinbufrec= storedPtr.p->storedLinkLast;
47       regOperPtr->currentAttrinbufLen= storedPtr.p->storedProcLength;
48       req_struct->attrinfo_len= storedPtr.p->storedProcLength;
49       return ZOK;
50     }
51   }
52   terrorCode= ZSTORED_PROC_ID_ERROR;
53   return terrorCode;
54 }
55 
void Dbtup::copyAttrinfo(Operationrec* regOperPtr,
                         Uint32* inBuffer)
58 {
59   AttrbufrecPtr copyAttrBufPtr;
60   Uint32 RnoOfAttrBufrec= cnoOfAttrbufrec;
61   int RbufLen;
62   Uint32 RinBufIndex= 0;
63   Uint32 Rnext;
64   Uint32 Rfirst;
65   Uint32 TstoredProcedure= (regOperPtr->storedProcedureId != ZNIL);
66   Uint32 RnoFree= cnoFreeAttrbufrec;
67 
//-------------------------------------------------------------------------
// As a prelude to the execution of the TUPKEYREQ we copy the program
// into the inBuffer to enable easy execution without any complex jumping
// between the buffers. In particular this makes the interpreter less
// complex. It should also improve performance.
//-------------------------------------------------------------------------
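//-------------------------------------------------------------------------
// A sketch of the resulting layout (illustrative only): with two chained
// ATTRINFO buffers holding a and b data words, the loop below leaves
// inBuffer[0 .. a-1] = first buffer and inBuffer[a .. a+b-1] = second
// buffer, so readAttributes()/updateAttributes() can walk one flat array
// instead of following ZBUF_NEXT links.
//-------------------------------------------------------------------------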
74   copyAttrBufPtr.i= regOperPtr->firstAttrinbufrec;
75   while (copyAttrBufPtr.i != RNIL) {
76     jam();
77     ndbrequire(copyAttrBufPtr.i < RnoOfAttrBufrec);
78     ptrAss(copyAttrBufPtr, attrbufrec);
79     RbufLen = copyAttrBufPtr.p->attrbuf[ZBUF_DATA_LEN];
80     Rnext = copyAttrBufPtr.p->attrbuf[ZBUF_NEXT];
81     Rfirst = cfirstfreeAttrbufrec;
    /*
     * ATTRINFO comes from 2 mutually exclusive places:
     * 1) TUPKEYREQ (also the interpreted part)
     * 2) STORED_PROCREQ before scan start
     * This check guards both paths against overflowing the buffer.
     * The "<" instead of "<=" is intentional.
     */
89     ndbrequire(RinBufIndex + RbufLen < ZATTR_BUFFER_SIZE);
90     MEMCOPY_NO_WORDS(&inBuffer[RinBufIndex],
91                      &copyAttrBufPtr.p->attrbuf[0],
92                      RbufLen);
93     RinBufIndex += RbufLen;
94     if (!TstoredProcedure) {
95       copyAttrBufPtr.p->attrbuf[ZBUF_NEXT]= Rfirst;
96       cfirstfreeAttrbufrec= copyAttrBufPtr.i;
97       RnoFree++;
98     }
99     copyAttrBufPtr.i= Rnext;
100   }
101   cnoFreeAttrbufrec= RnoFree;
102   if (TstoredProcedure) {
103     jam();
104     StoredProcPtr storedPtr;
105     c_storedProcPool.getPtr(storedPtr, (Uint32)regOperPtr->storedProcedureId);
106     ndbrequire(storedPtr.p->storedCode == ZSCAN_PROCEDURE);
107     storedPtr.p->storedCounter--;
108   }
  // Drop the operation's references to the ATTRINFO buffers
110   regOperPtr->storedProcedureId= RNIL;
111   regOperPtr->firstAttrinbufrec= RNIL;
112   regOperPtr->lastAttrinbufrec= RNIL;
113   regOperPtr->m_any_value= 0;
114 }
115 
void Dbtup::handleATTRINFOforTUPKEYREQ(Signal* signal,
                                       const Uint32* data,
                                       Uint32 len,
                                       Operationrec* regOperPtr)
120 {
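  // Split the incoming data into ATTRINFO-sized chunks, take one buffer
  // record from the free list per chunk and append it to this operation's
  // chain.  Overflowing ZATTR_BUFFER_SIZE flags the operation with
  // TRANS_TOO_MUCH_AI; running out of free buffers flags it with
  // TRANS_ERROR_WAIT_TUPKEYREQ.  In both cases later ATTRINFO is ignored.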
121   while(len)
122   {
123     Uint32 length = len > AttrInfo::DataLength ? AttrInfo::DataLength : len;
124 
125     AttrbufrecPtr TAttrinbufptr;
126     TAttrinbufptr.i= cfirstfreeAttrbufrec;
127     if ((cfirstfreeAttrbufrec < cnoOfAttrbufrec) &&
128 	(cnoFreeAttrbufrec > MIN_ATTRBUF)) {
129       ptrAss(TAttrinbufptr, attrbufrec);
130       MEMCOPY_NO_WORDS(&TAttrinbufptr.p->attrbuf[0],
131 		       data,
132 		       length);
133       Uint32 RnoFree= cnoFreeAttrbufrec;
134       Uint32 Rnext= TAttrinbufptr.p->attrbuf[ZBUF_NEXT];
135       TAttrinbufptr.p->attrbuf[ZBUF_DATA_LEN]= length;
136       TAttrinbufptr.p->attrbuf[ZBUF_NEXT]= RNIL;
137 
138       AttrbufrecPtr locAttrinbufptr;
139       Uint32 RnewLen= regOperPtr->currentAttrinbufLen;
140 
141       locAttrinbufptr.i= regOperPtr->lastAttrinbufrec;
142       cfirstfreeAttrbufrec= Rnext;
143       cnoFreeAttrbufrec= RnoFree - 1;
144       RnewLen += length;
145       regOperPtr->lastAttrinbufrec= TAttrinbufptr.i;
146       regOperPtr->currentAttrinbufLen= RnewLen;
147       if (locAttrinbufptr.i == RNIL) {
148 	regOperPtr->firstAttrinbufrec= TAttrinbufptr.i;
149       } else {
150 	jam();
151 	ptrCheckGuard(locAttrinbufptr, cnoOfAttrbufrec, attrbufrec);
152 	locAttrinbufptr.p->attrbuf[ZBUF_NEXT]= TAttrinbufptr.i;
153       }
154       if (RnewLen < ZATTR_BUFFER_SIZE) {
155       } else {
156 	jam();
157 	set_trans_state(regOperPtr, TRANS_TOO_MUCH_AI);
158 	return;
159       }
160     } else if (cnoFreeAttrbufrec <= MIN_ATTRBUF) {
161       jam();
162       set_trans_state(regOperPtr, TRANS_ERROR_WAIT_TUPKEYREQ);
163     } else {
164       ndbrequire(false);
165     }
166 
167     len -= length;
168     data += length;
169   }
170 }
171 
void Dbtup::execATTRINFO(Signal* signal)
173 {
174   Uint32 Rsig0= signal->theData[0];
175   Uint32 Rlen= signal->length();
176   jamEntry();
177 
178   receive_attrinfo(signal, Rsig0, signal->theData+3, Rlen-3);
179 }
180 
void
Dbtup::receive_attrinfo(Signal* signal, Uint32 op,
                        const Uint32* data, Uint32 Rlen)
184 {
185   OperationrecPtr regOpPtr;
186   regOpPtr.i= op;
187   c_operation_pool.getPtr(regOpPtr, op);
188   TransState trans_state= get_trans_state(regOpPtr.p);
189   if (trans_state == TRANS_IDLE) {
190     handleATTRINFOforTUPKEYREQ(signal, data, Rlen, regOpPtr.p);
191     return;
192   } else if (trans_state == TRANS_WAIT_STORED_PROCEDURE_ATTR_INFO) {
193     storedProcedureAttrInfo(signal, regOpPtr.p, data, Rlen, false);
194     return;
195   }
196   switch (trans_state) {
197   case TRANS_ERROR_WAIT_STORED_PROCREQ:
198     jam();
199   case TRANS_TOO_MUCH_AI:
200     jam();
201   case TRANS_ERROR_WAIT_TUPKEYREQ:
202     jam();
203     return;	/* IGNORE ATTRINFO IN THOSE STATES, WAITING FOR ABORT SIGNAL */
204   case TRANS_DISCONNECTED:
205     jam();
206   case TRANS_STARTED:
207     jam();
208   default:
209     ndbrequire(false);
210   }
211 }
212 
void
Dbtup::setChecksum(Tuple_header* tuple_ptr,
                   Tablerec* regTabPtr)
216 {
217   tuple_ptr->m_checksum= 0;
218   tuple_ptr->m_checksum= calculateChecksum(tuple_ptr, regTabPtr);
219 }
220 
Uint32
Dbtup::calculateChecksum(Tuple_header* tuple_ptr,
                         Tablerec* regTabPtr)
224 {
225   Uint32 checksum;
226   Uint32 i, rec_size, *tuple_header;
227   rec_size= regTabPtr->m_offsets[MM].m_fix_header_size;
228   tuple_header= tuple_ptr->m_data;
229   checksum= 0;
230   // includes tupVersion
231   //printf("%p - ", tuple_ptr);
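  // Note: the stored checksum word itself is part of the XOR'd range
  // (setChecksum() clears it before recomputing), so an intact tuple
  // XORs to zero and callers simply test calculateChecksum(...) != 0.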
232 
233   for (i= 0; i < rec_size-Tuple_header::HeaderSize; i++) {
234     checksum ^= tuple_header[i];
235     //printf("%.8x ", tuple_header[i]);
236   }
237 
238   //printf("-> %.8x\n", checksum);
239 
240 #if 0
241   if (var_sized) {
242     /*
243     if (! req_struct->fix_var_together) {
244       jam();
245       checksum ^= tuple_header[rec_size];
246     }
247     */
248     jam();
249     var_data_part= req_struct->var_data_start;
250     vsize_words= calculate_total_var_size(req_struct->var_len_array,
251                                           regTabPtr->no_var_attr);
252     ndbassert(req_struct->var_data_end >= &var_data_part[vsize_words]);
253     for (i= 0; i < vsize_words; i++) {
254       checksum ^= var_data_part[i];
255     }
256   }
257 #endif
258   return checksum;
259 }
260 
261 /* ----------------------------------------------------------------- */
262 /* -----------       INSERT_ACTIVE_OP_LIST            -------------- */
263 /* ----------------------------------------------------------------- */
bool
Dbtup::insertActiveOpList(OperationrecPtr regOperPtr,
                          KeyReqStruct* req_struct)
267 {
268   OperationrecPtr prevOpPtr;
269   ndbrequire(!regOperPtr.p->op_struct.in_active_list);
270   regOperPtr.p->op_struct.in_active_list= true;
271   req_struct->prevOpPtr.i=
272     prevOpPtr.i= req_struct->m_tuple_ptr->m_operation_ptr_i;
273   regOperPtr.p->prevActiveOp= prevOpPtr.i;
274   regOperPtr.p->nextActiveOp= RNIL;
275   regOperPtr.p->m_undo_buffer_space= 0;
276   req_struct->m_tuple_ptr->m_operation_ptr_i= regOperPtr.i;
277   if (prevOpPtr.i == RNIL) {
278     set_change_mask_state(regOperPtr.p, USE_SAVED_CHANGE_MASK);
279     regOperPtr.p->saved_change_mask[0] = 0;
280     regOperPtr.p->saved_change_mask[1] = 0;
281     return true;
282   } else {
283     req_struct->prevOpPtr.p= prevOpPtr.p= c_operation_pool.getPtr(prevOpPtr.i);
284     prevOpPtr.p->nextActiveOp= regOperPtr.i;
285 
286     regOperPtr.p->op_struct.m_wait_log_buffer=
287       prevOpPtr.p->op_struct.m_wait_log_buffer;
288     regOperPtr.p->op_struct.m_load_diskpage_on_commit=
289       prevOpPtr.p->op_struct.m_load_diskpage_on_commit;
290     regOperPtr.p->m_undo_buffer_space= prevOpPtr.p->m_undo_buffer_space;
    // start with the previous op's mask (only matters for UPDATE after UPDATE)
292     set_change_mask_state(regOperPtr.p, get_change_mask_state(prevOpPtr.p));
293     regOperPtr.p->saved_change_mask[0] = prevOpPtr.p->saved_change_mask[0];
294     regOperPtr.p->saved_change_mask[1] = prevOpPtr.p->saved_change_mask[1];
295 
296     regOperPtr.p->m_any_value = prevOpPtr.p->m_any_value;
297 
298     prevOpPtr.p->op_struct.m_wait_log_buffer= 0;
299     prevOpPtr.p->op_struct.m_load_diskpage_on_commit= 0;
300 
301     if(prevOpPtr.p->op_struct.tuple_state == TUPLE_PREPARED)
302     {
303       Uint32 op= regOperPtr.p->op_struct.op_type;
304       Uint32 prevOp= prevOpPtr.p->op_struct.op_type;
305       if (prevOp == ZDELETE)
306       {
307 	if(op == ZINSERT)
308 	{
309 	  // mark both
310 	  prevOpPtr.p->op_struct.delete_insert_flag= true;
311 	  regOperPtr.p->op_struct.delete_insert_flag= true;
312 	  return true;
313 	} else {
314 	  terrorCode= ZTUPLE_DELETED_ERROR;
315 	  return false;
316 	}
317       }
318       else if(op == ZINSERT && prevOp != ZDELETE)
319       {
320 	terrorCode= ZINSERT_ERROR;
321 	return false;
322       }
323       return true;
324     }
325     else
326     {
327       terrorCode= ZMUST_BE_ABORTED_ERROR;
328       return false;
329     }
330   }
331 }
332 
bool
Dbtup::setup_read(KeyReqStruct* req_struct,
                  Operationrec* regOperPtr,
                  Fragrecord* regFragPtr,
                  Tablerec* regTabPtr,
                  bool disk)
339 {
340   OperationrecPtr currOpPtr;
341   currOpPtr.i= req_struct->m_tuple_ptr->m_operation_ptr_i;
342   if (currOpPtr.i == RNIL)
343   {
344     if (regTabPtr->need_expand(disk))
345       prepare_read(req_struct, regTabPtr, disk);
346     return true;
347   }
348 
349   do {
350     Uint32 savepointId= regOperPtr->savepointId;
351     bool dirty= req_struct->dirty_op;
352 
353     c_operation_pool.getPtr(currOpPtr);
354     bool sameTrans= c_lqh->is_same_trans(currOpPtr.p->userpointer,
355 					 req_struct->trans_id1,
356 					 req_struct->trans_id2);
357     /**
358      * Read committed in same trans reads latest copy
359      */
360     if(dirty && !sameTrans)
361     {
362       savepointId= 0;
363     }
364     else if(sameTrans)
365     {
366       // Use savepoint even in read committed mode
367       dirty= false;
368     }
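    /*
     * Net effect of the branches above:
     *   dirty read from another transaction  -> savepointId 0 (committed row)
     *   any read within the same transaction -> honour the savepoint
     *   locked read from another transaction -> caller's savepointId as given
     */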
369 
370     bool found= find_savepoint(currOpPtr, savepointId);
371 
372     Uint32 currOp= currOpPtr.p->op_struct.op_type;
373 
374     if((found && currOp == ZDELETE) ||
375        ((dirty || !found) && currOp == ZINSERT))
376     {
377       terrorCode= ZTUPLE_DELETED_ERROR;
378       break;
379     }
380 
381     if(dirty || !found)
382     {
383 
384     }
385     else
386     {
387       req_struct->m_tuple_ptr= (Tuple_header*)
388 	c_undo_buffer.get_ptr(&currOpPtr.p->m_copy_tuple_location);
389     }
390 
391     if (regTabPtr->need_expand(disk))
392       prepare_read(req_struct, regTabPtr, disk);
393 
394 #if 0
395     ndbout_c("reading copy");
396     Uint32 *var_ptr = fixed_ptr+regTabPtr->var_offset;
397     req_struct->m_tuple_ptr= fixed_ptr;
398     req_struct->fix_var_together= true;
399     req_struct->var_len_array= (Uint16*)var_ptr;
400     req_struct->var_data_start= var_ptr+regTabPtr->var_array_wsize;
401     Uint32 var_sz32= init_var_pos_array((Uint16*)var_ptr,
402 					req_struct->var_pos_array,
403 					regTabPtr->no_var_attr);
404     req_struct->var_data_end= var_ptr+regTabPtr->var_array_wsize + var_sz32;
405 #endif
406     return true;
407   } while(0);
408 
409   return false;
410 }
411 
int
Dbtup::load_diskpage(Signal* signal,
                     Uint32 opRec, Uint32 fragPtrI,
                     Uint32 local_key, Uint32 flags)
416 {
417   c_operation_pool.getPtr(operPtr, opRec);
418   fragptr.i= fragPtrI;
419   ptrCheckGuard(fragptr, cnoOfFragrec, fragrecord);
420 
421   Operationrec *  regOperPtr= operPtr.p;
422   Fragrecord * regFragPtr= fragptr.p;
423 
424   tabptr.i = regFragPtr->fragTableId;
425   ptrCheckGuard(tabptr, cnoOfTablerec, tablerec);
426   Tablerec* regTabPtr = tabptr.p;
427 
428   if(local_key == ~(Uint32)0)
429   {
430     jam();
431     regOperPtr->op_struct.m_wait_log_buffer= 1;
432     regOperPtr->op_struct.m_load_diskpage_on_commit= 1;
433     return 1;
434   }
435 
436   jam();
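  // local_key packs the fragment page id in its upper bits and the tuple's
  // index within that page in its low MAX_TUPLES_BITS bits, roughly
  // local_key = (frag_page_id << MAX_TUPLES_BITS) | page_idx; the two
  // lines below just unpack it.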
437   Uint32 page_idx= local_key & MAX_TUPLES_PER_PAGE;
438   Uint32 frag_page_id= local_key >> MAX_TUPLES_BITS;
439   regOperPtr->m_tuple_location.m_page_no= getRealpid(regFragPtr,
440 						     frag_page_id);
441   regOperPtr->m_tuple_location.m_page_idx= page_idx;
442 
443   PagePtr page_ptr;
444   Uint32* tmp= get_ptr(&page_ptr, &regOperPtr->m_tuple_location, regTabPtr);
445   Tuple_header* ptr= (Tuple_header*)tmp;
446 
447   int res= 1;
448   if(ptr->m_header_bits & Tuple_header::DISK_PART)
449   {
450     Page_cache_client::Request req;
451     memcpy(&req.m_page, ptr->get_disk_ref_ptr(regTabPtr), sizeof(Local_key));
452     req.m_callback.m_callbackData= opRec;
453     req.m_callback.m_callbackFunction=
454       safe_cast(&Dbtup::disk_page_load_callback);
455 
456 #ifdef ERROR_INSERT
457     if (ERROR_INSERTED(4022))
458     {
459       flags |= Page_cache_client::DELAY_REQ;
460       req.m_delay_until_time = NdbTick_CurrentMillisecond()+(Uint64)3000;
461     }
462 #endif
463 
464     if((res= m_pgman.get_page(signal, req, flags)) > 0)
465     {
466       //ndbout_c("in cache");
467       // In cache
468     }
469     else if(res == 0)
470     {
471       //ndbout_c("waiting for callback");
472       // set state
473     }
474     else
475     {
476       // Error
477     }
478   }
479 
480   switch(flags & 7)
481   {
482   case ZREAD:
483   case ZREAD_EX:
484     break;
485   case ZDELETE:
486   case ZUPDATE:
487   case ZINSERT:
488   case ZWRITE:
489     regOperPtr->op_struct.m_wait_log_buffer= 1;
490     regOperPtr->op_struct.m_load_diskpage_on_commit= 1;
491   }
492   return res;
493 }
494 
void
Dbtup::disk_page_load_callback(Signal* signal, Uint32 opRec, Uint32 page_id)
497 {
498   c_operation_pool.getPtr(operPtr, opRec);
499   c_lqh->acckeyconf_load_diskpage_callback(signal,
500 					   operPtr.p->userpointer, page_id);
501 }
502 
int
Dbtup::load_diskpage_scan(Signal* signal,
                          Uint32 opRec, Uint32 fragPtrI,
                          Uint32 local_key, Uint32 flags)
507 {
508   c_operation_pool.getPtr(operPtr, opRec);
509   fragptr.i= fragPtrI;
510   ptrCheckGuard(fragptr, cnoOfFragrec, fragrecord);
511 
512   Operationrec *  regOperPtr= operPtr.p;
513   Fragrecord * regFragPtr= fragptr.p;
514 
515   tabptr.i = regFragPtr->fragTableId;
516   ptrCheckGuard(tabptr, cnoOfTablerec, tablerec);
517   Tablerec* regTabPtr = tabptr.p;
518 
519   jam();
520   Uint32 page_idx= local_key & MAX_TUPLES_PER_PAGE;
521   Uint32 frag_page_id= local_key >> MAX_TUPLES_BITS;
522   regOperPtr->m_tuple_location.m_page_no= getRealpid(regFragPtr,
523 						     frag_page_id);
524   regOperPtr->m_tuple_location.m_page_idx= page_idx;
525   regOperPtr->op_struct.m_load_diskpage_on_commit= 0;
526 
527   PagePtr page_ptr;
528   Uint32* tmp= get_ptr(&page_ptr, &regOperPtr->m_tuple_location, regTabPtr);
529   Tuple_header* ptr= (Tuple_header*)tmp;
530 
531   int res= 1;
532   if(ptr->m_header_bits & Tuple_header::DISK_PART)
533   {
534     Page_cache_client::Request req;
535     memcpy(&req.m_page, ptr->get_disk_ref_ptr(regTabPtr), sizeof(Local_key));
536     req.m_callback.m_callbackData= opRec;
537     req.m_callback.m_callbackFunction=
538       safe_cast(&Dbtup::disk_page_load_scan_callback);
539 
540     if((res= m_pgman.get_page(signal, req, flags)) > 0)
541     {
542       // ndbout_c("in cache");
543       // In cache
544     }
545     else if(res == 0)
546     {
547       //ndbout_c("waiting for callback");
548       // set state
549     }
550     else
551     {
552       // Error
553     }
554   }
555   return res;
556 }
557 
void
Dbtup::disk_page_load_scan_callback(Signal* signal,
                                    Uint32 opRec, Uint32 page_id)
561 {
562   c_operation_pool.getPtr(operPtr, opRec);
563   c_lqh->next_scanconf_load_diskpage_callback(signal,
564 					      operPtr.p->userpointer, page_id);
565 }
566 
void Dbtup::execTUPKEYREQ(Signal* signal)
568 {
569    TupKeyReq * tupKeyReq= (TupKeyReq *)signal->getDataPtr();
570    KeyReqStruct req_struct;
571    Uint32 sig1, sig2, sig3, sig4;
572 
573    Uint32 RoperPtr= tupKeyReq->connectPtr;
574    Uint32 Rfragptr= tupKeyReq->fragPtr;
575 
576    Uint32 RnoOfFragrec= cnoOfFragrec;
577    Uint32 RnoOfTablerec= cnoOfTablerec;
578 
579    jamEntry();
580    fragptr.i= Rfragptr;
581 
582    ndbrequire(Rfragptr < RnoOfFragrec);
583 
584    c_operation_pool.getPtr(operPtr, RoperPtr);
585    ptrAss(fragptr, fragrecord);
586 
587    Uint32 TrequestInfo= tupKeyReq->request;
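   // Request-info bits, as decoded further down: bit 0 = dirty op,
   // bits 6..9 = operation type, bit 10 = interpreted execution,
   // bit 11 = caller-supplied rowid.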
588 
589    Operationrec *  regOperPtr= operPtr.p;
590    Fragrecord * regFragPtr= fragptr.p;
591 
592    tabptr.i = regFragPtr->fragTableId;
593    ptrCheckGuard(tabptr, RnoOfTablerec, tablerec);
594    Tablerec* regTabPtr = tabptr.p;
595 
596    req_struct.signal= signal;
597    req_struct.dirty_op= TrequestInfo & 1;
598    req_struct.interpreted_exec= (TrequestInfo >> 10) & 1;
599    req_struct.no_fired_triggers= 0;
600    req_struct.read_length= 0;
601    req_struct.max_attr_id_updated= 0;
602    req_struct.no_changed_attrs= 0;
603    req_struct.last_row= false;
604    req_struct.changeMask.clear();
605 
606    if (unlikely(get_trans_state(regOperPtr) != TRANS_IDLE))
607    {
608      TUPKEY_abort(signal, 39);
609      return;
610    }
611 
612  /* ----------------------------------------------------------------- */
613  // Operation is ZREAD when we arrive here so no need to worry about the
614  // abort process.
615  /* ----------------------------------------------------------------- */
616  /* -----------    INITIATE THE OPERATION RECORD       -------------- */
617  /* ----------------------------------------------------------------- */
618    Uint32 Rstoredid= tupKeyReq->storedProcedure;
619 
620    regOperPtr->fragmentPtr= Rfragptr;
621    regOperPtr->op_struct.op_type= (TrequestInfo >> 6) & 0xf;
622    regOperPtr->op_struct.delete_insert_flag = false;
623    regOperPtr->storedProcedureId= Rstoredid;
624 
625    regOperPtr->m_copy_tuple_location.setNull();
626    regOperPtr->tupVersion= ZNIL;
627 
628    sig1= tupKeyReq->savePointId;
629    sig2= tupKeyReq->primaryReplica;
630    sig3= tupKeyReq->keyRef2;
631 
632    regOperPtr->savepointId= sig1;
633    regOperPtr->op_struct.primary_replica= sig2;
634    Uint32 pageidx = regOperPtr->m_tuple_location.m_page_idx= sig3;
635 
636    sig1= tupKeyReq->opRef;
637    sig2= tupKeyReq->tcOpIndex;
638    sig3= tupKeyReq->coordinatorTC;
639    sig4= tupKeyReq->keyRef1;
640 
641    req_struct.tc_operation_ptr= sig1;
642    req_struct.TC_index= sig2;
643    req_struct.TC_ref= sig3;
644    Uint32 pageid = req_struct.frag_page_id= sig4;
645    req_struct.m_use_rowid = (TrequestInfo >> 11) & 1;
646 
647    sig1= tupKeyReq->attrBufLen;
648    sig2= tupKeyReq->applRef;
649    sig3= tupKeyReq->transId1;
650    sig4= tupKeyReq->transId2;
651 
652    Uint32 disk_page= tupKeyReq->disk_page;
653 
654    req_struct.log_size= sig1;
655    req_struct.attrinfo_len= sig1;
656    req_struct.rec_blockref= sig2;
657    req_struct.trans_id1= sig3;
658    req_struct.trans_id2= sig4;
659    req_struct.m_disk_page_ptr.i= disk_page;
660 
661    sig1 = tupKeyReq->m_row_id_page_no;
662    sig2 = tupKeyReq->m_row_id_page_idx;
663 
664    req_struct.m_row_id.m_page_no = sig1;
665    req_struct.m_row_id.m_page_idx = sig2;
666 
667    Uint32 Roptype = regOperPtr->op_struct.op_type;
668 
669    if (Rstoredid != ZNIL) {
670      ndbrequire(initStoredOperationrec(regOperPtr,
671 				       &req_struct,
672 				       Rstoredid) == ZOK);
673    }
674 
675    copyAttrinfo(regOperPtr, &cinBuffer[0]);
676 
677    Uint32 localkey = (pageid << MAX_TUPLES_BITS) + pageidx;
678    if (Roptype == ZINSERT && localkey == ~ (Uint32) 0)
679    {
     // No tuple allocated yet
681      goto do_insert;
682    }
683 
684    /**
685     * Get pointer to tuple
686     */
687    regOperPtr->m_tuple_location.m_page_no= getRealpid(regFragPtr,
688 						      req_struct.frag_page_id);
689 
690    setup_fixed_part(&req_struct, regOperPtr, regTabPtr);
691 
692    /**
693     * Check operation
694     */
695    if (Roptype == ZREAD) {
696      jam();
697 
698      if (setup_read(&req_struct, regOperPtr, regFragPtr, regTabPtr,
699 		    disk_page != RNIL))
700      {
701        if(handleReadReq(signal, regOperPtr, regTabPtr, &req_struct) != -1)
702        {
703 	 req_struct.log_size= 0;
704 	 sendTUPKEYCONF(signal, &req_struct, regOperPtr);
	 /* ---------------------------------------------------------------- */
	 // Read operations need not be taken out of any lists.
	 // We also do not need to wait for commit since there are no changes
	 // to commit. Thus we prepare the operation record already now for
	 // the next operation.
	 // Write operations have set the state to STARTED above, indicating
	 // that they are waiting for the Commit or Abort decision.
	 /* ---------------------------------------------------------------- */
713 	 set_trans_state(regOperPtr, TRANS_IDLE);
714 	 regOperPtr->currentAttrinbufLen= 0;
715        }
716        return;
717      }
718      tupkeyErrorLab(signal);
719      return;
720    }
721 
722    if(insertActiveOpList(operPtr, &req_struct))
723    {
724      if(Roptype == ZINSERT)
725      {
726        jam();
727    do_insert:
728        if (handleInsertReq(signal, operPtr,
729 			   fragptr, regTabPtr, &req_struct) == -1)
730        {
731 	 return;
732        }
733        if (!regTabPtr->tuxCustomTriggers.isEmpty())
734        {
735 	 jam();
736 	 if (executeTuxInsertTriggers(signal,
737 				      regOperPtr,
738 				      regFragPtr,
739 				      regTabPtr) != 0) {
740 	   jam();
741            /*
742             * TUP insert succeeded but add of TUX entries failed.  All
743             * TUX changes have been rolled back at this point.
744             *
745             * We will abort via tupkeyErrorLab() as usual.  This routine
746             * however resets the operation to ZREAD.  The TUP_ABORTREQ
747             * arriving later cannot then undo the insert.
748             *
749             * Therefore we call TUP_ABORTREQ already now.  Diskdata etc
750             * should be in memory and timeslicing cannot occur.  We must
751             * skip TUX abort triggers since TUX is already aborted.
752             */
753            signal->theData[0] = operPtr.i;
754            do_tup_abortreq(signal, ZSKIP_TUX_TRIGGERS);
755 	   tupkeyErrorLab(signal);
756 	   return;
757 	 }
758        }
759        checkImmediateTriggersAfterInsert(&req_struct,
760 					 regOperPtr,
761 					 regTabPtr,
762                                          disk_page != RNIL);
763        set_change_mask_state(regOperPtr, SET_ALL_MASK);
764        sendTUPKEYCONF(signal, &req_struct, regOperPtr);
765        return;
766      }
767 
768      if (Roptype == ZUPDATE) {
769        jam();
770        if (handleUpdateReq(signal, regOperPtr,
771 			   regFragPtr, regTabPtr, &req_struct, disk_page != RNIL) == -1) {
772 	 return;
773        }
774        // If update operation is done on primary,
775        // check any after op triggers
776        terrorCode= 0;
777        if (!regTabPtr->tuxCustomTriggers.isEmpty()) {
778 	 jam();
779 	 if (executeTuxUpdateTriggers(signal,
780 				      regOperPtr,
781 				      regFragPtr,
782 				      regTabPtr) != 0) {
783 	   jam();
784            /*
785             * See insert case.
786             */
787            signal->theData[0] = operPtr.i;
788            do_tup_abortreq(signal, ZSKIP_TUX_TRIGGERS);
789 	   tupkeyErrorLab(signal);
790 	   return;
791 	 }
792        }
793        checkImmediateTriggersAfterUpdate(&req_struct,
794 					 regOperPtr,
795 					 regTabPtr,
796                                          disk_page != RNIL);
797        // XXX use terrorCode for now since all methods are void
798        if (terrorCode != 0)
799        {
800 	 tupkeyErrorLab(signal);
801 	 return;
802        }
803        update_change_mask_info(&req_struct, regOperPtr);
804        sendTUPKEYCONF(signal, &req_struct, regOperPtr);
805        return;
806      }
807      else if(Roptype == ZDELETE)
808      {
809        jam();
810        req_struct.log_size= 0;
811        if (handleDeleteReq(signal, regOperPtr,
812 			   regFragPtr, regTabPtr,
813 			   &req_struct,
814 			   disk_page != RNIL) == -1) {
815 	 return;
816        }
817        /*
818 	* TUX doesn't need to check for triggers at delete since entries in
819 	* the index are kept until commit time.
820 	*/
821 
822        /*
823 	* Secondary index triggers fire on the primary after a delete.
824 	*/
825        checkImmediateTriggersAfterDelete(&req_struct,
826 					 regOperPtr,
827 					 regTabPtr,
828                                          disk_page != RNIL);
829        set_change_mask_state(regOperPtr, DELETE_CHANGES);
830        sendTUPKEYCONF(signal, &req_struct, regOperPtr);
831        return;
832      }
833      else
834      {
835        ndbrequire(false); // Invalid op type
836      }
837    }
838 
839    tupkeyErrorLab(signal);
840  }
841 
void
Dbtup::setup_fixed_part(KeyReqStruct* req_struct,
                        Operationrec* regOperPtr,
                        Tablerec* regTabPtr)
846 {
847   PagePtr page_ptr;
848   Uint32* ptr= get_ptr(&page_ptr, &regOperPtr->m_tuple_location, regTabPtr);
849   req_struct->m_page_ptr = page_ptr;
850   req_struct->m_tuple_ptr = (Tuple_header*)ptr;
851 
852   ndbassert(regOperPtr->op_struct.op_type == ZINSERT || (! (req_struct->m_tuple_ptr->m_header_bits & Tuple_header::FREE)));
853 
854   req_struct->check_offset[MM]= regTabPtr->get_check_offset(MM);
855   req_struct->check_offset[DD]= regTabPtr->get_check_offset(DD);
856 
857   Uint32 num_attr= regTabPtr->m_no_of_attributes;
858   Uint32 descr_start= regTabPtr->tabDescriptor;
859   TableDescriptor *tab_descr= &tableDescriptor[descr_start];
860   ndbrequire(descr_start + (num_attr << ZAD_LOG_SIZE) <= cnoOfTabDescrRec);
861   req_struct->attr_descr= tab_descr;
862 }
863 
864  /* ---------------------------------------------------------------- */
865  /* ------------------------ CONFIRM REQUEST ----------------------- */
866  /* ---------------------------------------------------------------- */
void Dbtup::sendTUPKEYCONF(Signal* signal,
                           KeyReqStruct* req_struct,
                           Operationrec* regOperPtr)
870 {
871   TupKeyConf * tupKeyConf= (TupKeyConf *)signal->getDataPtrSend();
872 
873   Uint32 Rcreate_rowid = req_struct->m_use_rowid;
874   Uint32 RuserPointer= regOperPtr->userpointer;
875   Uint32 RnoFiredTriggers= req_struct->no_fired_triggers;
876   Uint32 log_size= req_struct->log_size;
877   Uint32 read_length= req_struct->read_length;
878   Uint32 last_row= req_struct->last_row;
879 
880   set_trans_state(regOperPtr, TRANS_STARTED);
881   set_tuple_state(regOperPtr, TUPLE_PREPARED);
882   tupKeyConf->userPtr= RuserPointer;
883   tupKeyConf->readLength= read_length;
884   tupKeyConf->writeLength= log_size;
885   tupKeyConf->noFiredTriggers= RnoFiredTriggers;
886   tupKeyConf->lastRow= last_row;
887   tupKeyConf->rowid = Rcreate_rowid;
888 
889   EXECUTE_DIRECT(DBLQH, GSN_TUPKEYCONF, signal,
890 		 TupKeyConf::SignalLength);
891 
892 }
893 
894 
895 #define MAX_READ (sizeof(signal->theData) > MAX_MESSAGE_SIZE ? MAX_MESSAGE_SIZE : sizeof(signal->theData))
896 
897 /* ---------------------------------------------------------------- */
898 /* ----------------------------- READ  ---------------------------- */
899 /* ---------------------------------------------------------------- */
int Dbtup::handleReadReq(Signal* signal,
                         Operationrec* regOperPtr,
                         Tablerec* regTabPtr,
                         KeyReqStruct* req_struct)
904 {
905   Uint32 *dst;
906   Uint32 dstLen, start_index;
907   const BlockReference sendBref= req_struct->rec_blockref;
908   if ((regTabPtr->m_bits & Tablerec::TR_Checksum) &&
909       (calculateChecksum(req_struct->m_tuple_ptr, regTabPtr) != 0)) {
910     jam();
911     ndbout_c("here2");
912     terrorCode= ZTUPLE_CORRUPTED_ERROR;
913     tupkeyErrorLab(signal);
914     return -1;
915   }
916 
917   const Uint32 node = refToNode(sendBref);
918   if(node != 0 && node != getOwnNodeId()) {
919     start_index= 25;
920   } else {
921     jam();
922     /**
923      * execute direct
924      */
925     start_index= 3;
926   }
927   dst= &signal->theData[start_index];
928   dstLen= (MAX_READ / 4) - start_index;
929   if (!req_struct->interpreted_exec) {
930     jam();
931     int ret = readAttributes(req_struct,
932 			     &cinBuffer[0],
933 			     req_struct->attrinfo_len,
934 			     dst,
935 			     dstLen,
936 			     false);
937     if (likely(ret != -1)) {
/* ------------------------------------------------------------------------- */
// We have read all data into the signal buffer. Now send it to the API.
/* ------------------------------------------------------------------------- */
941       jam();
942       Uint32 TnoOfDataRead= (Uint32) ret;
943       req_struct->read_length= TnoOfDataRead;
944       sendReadAttrinfo(signal, req_struct, TnoOfDataRead, regOperPtr);
945       return 0;
946     }
947   } else {
948     jam();
949     if (likely(interpreterStartLab(signal, req_struct) != -1)) {
950       return 0;
951     }
952     return -1;
953   }
954 
955   jam();
956   tupkeyErrorLab(signal);
957   return -1;
958 }
959 
960 /* ---------------------------------------------------------------- */
961 /* ---------------------------- UPDATE ---------------------------- */
962 /* ---------------------------------------------------------------- */
int Dbtup::handleUpdateReq(Signal* signal,
                           Operationrec* operPtrP,
                           Fragrecord* regFragPtr,
                           Tablerec* regTabPtr,
                           KeyReqStruct* req_struct,
                           bool disk)
969 {
970   Uint32 *dst;
971   Tuple_header *base= req_struct->m_tuple_ptr, *org;
972   if ((dst= c_undo_buffer.alloc_copy_tuple(&operPtrP->m_copy_tuple_location,
973 					   regTabPtr->total_rec_size)) == 0)
974   {
975     terrorCode= ZMEM_NOMEM_ERROR;
976     goto error;
977   }
978 
979   Uint32 tup_version;
980   if(operPtrP->is_first_operation())
981   {
982     org= req_struct->m_tuple_ptr;
983     tup_version= org->get_tuple_version();
984   }
985   else
986   {
987     Operationrec* prevOp= req_struct->prevOpPtr.p;
988     tup_version= prevOp->tupVersion;
989     org= (Tuple_header*)c_undo_buffer.get_ptr(&prevOp->m_copy_tuple_location);
990   }
991 
992   /**
993    * Check consistency before update/delete
994    */
995   req_struct->m_tuple_ptr= org;
996   if ((regTabPtr->m_bits & Tablerec::TR_Checksum) &&
997       (calculateChecksum(req_struct->m_tuple_ptr, regTabPtr) != 0))
998   {
999     terrorCode= ZTUPLE_CORRUPTED_ERROR;
1000     goto error;
1001   }
1002 
1003   req_struct->m_tuple_ptr= (Tuple_header*)dst;
1004 
1005   union {
1006     Uint32 sizes[4];
1007     Uint64 cmp[2];
1008   };
1009 
1010   disk = disk || (org->m_header_bits & Tuple_header::DISK_INLINE);
1011   if (regTabPtr->need_expand(disk))
1012   {
1013     expand_tuple(req_struct, sizes, org, regTabPtr, disk);
1014     if(disk && operPtrP->m_undo_buffer_space == 0)
1015     {
1016       operPtrP->op_struct.m_wait_log_buffer = 1;
1017       operPtrP->op_struct.m_load_diskpage_on_commit = 1;
1018       Uint32 sz= operPtrP->m_undo_buffer_space=
1019 	(sizeof(Dbtup::Disk_undo::Update) >> 2) + sizes[DD] - 1;
1020 
1021       terrorCode= c_lgman->alloc_log_space(regFragPtr->m_logfile_group_id,
1022 					   sz);
1023       if(unlikely(terrorCode))
1024       {
1025 	operPtrP->m_undo_buffer_space= 0;
1026 	goto error;
1027       }
1028     }
1029   }
1030   else
1031   {
1032     memcpy(dst, org, 4*regTabPtr->m_offsets[MM].m_fix_header_size);
1033   }
1034 
1035   tup_version= (tup_version + 1) & ZTUP_VERSION_MASK;
1036   operPtrP->tupVersion= tup_version;
1037 
1038   if (!req_struct->interpreted_exec) {
1039     jam();
1040     int retValue = updateAttributes(req_struct,
1041 				    &cinBuffer[0],
1042 				    req_struct->attrinfo_len);
1043     if (unlikely(retValue == -1))
1044       goto error;
1045   } else {
1046     jam();
1047     if (unlikely(interpreterStartLab(signal, req_struct) == -1))
1048       return -1;
1049   }
1050 
1051   if (regTabPtr->need_shrink())
1052   {
1053     shrink_tuple(req_struct, sizes+2, regTabPtr, disk);
1054     if (cmp[0] != cmp[1] && handle_size_change_after_update(req_struct,
1055 							    base,
1056 							    operPtrP,
1057 							    regFragPtr,
1058 							    regTabPtr,
1059 							    sizes)) {
1060       goto error;
1061     }
1062   }
1063 
1064   req_struct->m_tuple_ptr->set_tuple_version(tup_version);
1065   if (regTabPtr->m_bits & Tablerec::TR_Checksum) {
1066     jam();
1067     setChecksum(req_struct->m_tuple_ptr, regTabPtr);
1068   }
1069   return 0;
1070 
1071 error:
1072   tupkeyErrorLab(signal);
1073   return -1;
1074 }
1075 
1076 /* ---------------------------------------------------------------- */
1077 /* ----------------------------- INSERT --------------------------- */
1078 /* ---------------------------------------------------------------- */
void
Dbtup::prepare_initial_insert(KeyReqStruct* req_struct,
                              Operationrec* regOperPtr,
                              Tablerec* regTabPtr)
1083 {
1084   Uint32 disk_undo = regTabPtr->m_no_of_disk_attributes ?
1085     sizeof(Dbtup::Disk_undo::Alloc) >> 2 : 0;
1086   regOperPtr->nextActiveOp= RNIL;
1087   regOperPtr->prevActiveOp= RNIL;
1088   regOperPtr->op_struct.in_active_list= true;
1089   regOperPtr->m_undo_buffer_space= disk_undo;
1090 
1091   req_struct->check_offset[MM]= regTabPtr->get_check_offset(MM);
1092   req_struct->check_offset[DD]= regTabPtr->get_check_offset(DD);
1093 
1094   Uint32 num_attr= regTabPtr->m_no_of_attributes;
1095   Uint32 descr_start= regTabPtr->tabDescriptor;
1096   Uint32 order_desc= regTabPtr->m_real_order_descriptor;
1097   TableDescriptor *tab_descr= &tableDescriptor[descr_start];
1098   ndbrequire(descr_start + (num_attr << ZAD_LOG_SIZE) <= cnoOfTabDescrRec);
1099   req_struct->attr_descr= tab_descr;
1100   Uint16* order= (Uint16*)&tableDescriptor[order_desc];
1101 
1102   const Uint32 cnt1= regTabPtr->m_attributes[MM].m_no_of_varsize;
1103   const Uint32 cnt2= regTabPtr->m_attributes[DD].m_no_of_varsize;
1104   Uint32 *ptr= req_struct->m_tuple_ptr->get_end_of_fix_part_ptr(regTabPtr);
1105   Var_part_ref* ref = req_struct->m_tuple_ptr->get_var_part_ref_ptr(regTabPtr);
1106 
1107   if (regTabPtr->m_bits & Tablerec::TR_ForceVarPart)
1108   {
1109     ref->m_page_no = RNIL;
1110     ref->m_page_idx = Tup_varsize_page::END_OF_FREE_LIST;
1111   }
1112 
1113   if(cnt1)
1114   {
1115     KeyReqStruct::Var_data* dst= &req_struct->m_var_data[MM];
1116     dst->m_data_ptr= (char*)(((Uint16*)ptr)+cnt1+1);
1117     dst->m_offset_array_ptr= req_struct->var_pos_array;
1118     dst->m_var_len_offset= cnt1;
1119     dst->m_max_var_offset= regTabPtr->m_offsets[MM].m_max_var_offset;
1120     // Disk part is 32-bit aligned
1121     ptr= ALIGN_WORD(dst->m_data_ptr+regTabPtr->m_offsets[MM].m_max_var_offset);
1122     order += regTabPtr->m_attributes[MM].m_no_of_fixsize;
1123     Uint32 pos= 0;
1124     Uint16 *pos_ptr = req_struct->var_pos_array;
1125     Uint16 *len_ptr = pos_ptr + cnt1;
1126     for(Uint32 i= 0; i<cnt1; i++)
1127     {
1128       * pos_ptr++ = pos;
1129       * len_ptr++ = pos;
1130       pos += AttributeDescriptor::getSizeInBytes(tab_descr[*order++].tabDescr);
1131     }
1132   }
1133 
1134   req_struct->m_disk_ptr= (Tuple_header*)ptr;
1135 
1136   ndbrequire(cnt2 == 0);
1137 
1138   // Set all null bits
1139   memset(req_struct->m_tuple_ptr->m_null_bits+
1140 	 regTabPtr->m_offsets[MM].m_null_offset, 0xFF,
1141 	 4*regTabPtr->m_offsets[MM].m_null_words);
1142   memset(req_struct->m_disk_ptr->m_null_bits+
1143 	 regTabPtr->m_offsets[DD].m_null_offset, 0xFF,
1144 	 4*regTabPtr->m_offsets[DD].m_null_words);
1145   req_struct->m_tuple_ptr->m_header_bits=
1146     disk_undo ? (Tuple_header::DISK_ALLOC | Tuple_header::DISK_INLINE) : 0;
1147 }
1148 
int Dbtup::handleInsertReq(Signal* signal,
                           Ptr<Operationrec> regOperPtr,
                           Ptr<Fragrecord> fragPtr,
                           Tablerec* regTabPtr,
                           KeyReqStruct* req_struct)
1154 {
1155   Uint32 tup_version = 1;
1156   Fragrecord* regFragPtr = fragPtr.p;
1157   Uint32 *dst, *ptr= 0;
1158   Tuple_header *base= req_struct->m_tuple_ptr, *org= base;
1159   Tuple_header *tuple_ptr;
1160 
1161   bool disk = regTabPtr->m_no_of_disk_attributes > 0;
1162   bool mem_insert = regOperPtr.p->is_first_operation();
1163   bool disk_insert = mem_insert && disk;
1164   bool varsize = regTabPtr->m_attributes[MM].m_no_of_varsize;
1165   bool rowid = req_struct->m_use_rowid;
1166   Uint32 real_page_id = regOperPtr.p->m_tuple_location.m_page_no;
1167   Uint32 frag_page_id = req_struct->frag_page_id;
1168 
1169   union {
1170     Uint32 sizes[4];
1171     Uint64 cmp[2];
1172   };
1173 
1174   if (ERROR_INSERTED(4014))
1175   {
1176     dst = 0;
1177     goto undo_buffer_error;
1178   }
1179 
1180   dst= c_undo_buffer.alloc_copy_tuple(&regOperPtr.p->m_copy_tuple_location,
1181 				      regTabPtr->total_rec_size);
1182   if (unlikely(dst == 0))
1183   {
1184     goto undo_buffer_error;
1185   }
1186   tuple_ptr= req_struct->m_tuple_ptr= (Tuple_header*)dst;
1187 
1188   if(mem_insert)
1189   {
1190     jam();
1191     prepare_initial_insert(req_struct, regOperPtr.p, regTabPtr);
1192   }
1193   else
1194   {
1195     Operationrec* prevOp= req_struct->prevOpPtr.p;
1196     ndbassert(prevOp->op_struct.op_type == ZDELETE);
1197     tup_version= prevOp->tupVersion + 1;
1198 
1199     if(!prevOp->is_first_operation())
1200       org= (Tuple_header*)c_undo_buffer.get_ptr(&prevOp->m_copy_tuple_location);
1201     if (regTabPtr->need_expand())
1202     {
1203       expand_tuple(req_struct, sizes, org, regTabPtr, !disk_insert);
1204       memset(req_struct->m_disk_ptr->m_null_bits+
1205              regTabPtr->m_offsets[DD].m_null_offset, 0xFF,
1206              4*regTabPtr->m_offsets[DD].m_null_words);
1207     }
1208     else
1209     {
1210       memcpy(dst, org, 4*regTabPtr->m_offsets[MM].m_fix_header_size);
1211     }
1212     memset(tuple_ptr->m_null_bits+
1213            regTabPtr->m_offsets[MM].m_null_offset, 0xFF,
1214            4*regTabPtr->m_offsets[MM].m_null_words);
1215   }
1216 
1217   if (disk_insert)
1218   {
1219     int res;
1220 
1221     if (ERROR_INSERTED(4015))
1222     {
1223       terrorCode = 1501;
1224       goto log_space_error;
1225     }
1226 
1227     res= c_lgman->alloc_log_space(regFragPtr->m_logfile_group_id,
1228 				  regOperPtr.p->m_undo_buffer_space);
1229     if(unlikely(res))
1230     {
1231       terrorCode= res;
1232       goto log_space_error;
1233     }
1234   }
1235 
1236   regOperPtr.p->tupVersion= tup_version & ZTUP_VERSION_MASK;
1237   tuple_ptr->set_tuple_version(tup_version);
1238 
1239   if (ERROR_INSERTED(4016))
1240   {
1241     terrorCode = ZAI_INCONSISTENCY_ERROR;
1242     goto update_error;
1243   }
1244 
1245   if(unlikely(updateAttributes(req_struct, &cinBuffer[0],
1246 			       req_struct->attrinfo_len) == -1))
1247   {
1248     goto update_error;
1249   }
1250 
1251   if (ERROR_INSERTED(4017))
1252   {
1253     goto null_check_error;
1254   }
1255   if (unlikely(checkNullAttributes(req_struct, regTabPtr) == false))
1256   {
1257     goto null_check_error;
1258   }
1259 
1260   if (regTabPtr->need_shrink())
1261   {
1262     shrink_tuple(req_struct, sizes+2, regTabPtr, true);
1263   }
1264 
1265   if (ERROR_INSERTED(4025))
1266   {
1267     goto mem_error;
1268   }
1269 
1270   if (ERROR_INSERTED(4026))
1271   {
1272     CLEAR_ERROR_INSERT_VALUE;
1273     goto mem_error;
1274   }
1275 
1276   if (ERROR_INSERTED(4027) && (rand() % 100) > 25)
1277   {
1278     goto mem_error;
1279   }
1280 
1281   if (ERROR_INSERTED(4028) && (rand() % 100) > 25)
1282   {
1283     CLEAR_ERROR_INSERT_VALUE;
1284     goto mem_error;
1285   }
1286 
1287   /**
1288    * Alloc memory
1289    */
1290   if(mem_insert)
1291   {
1292     if (!rowid)
1293     {
1294       if (ERROR_INSERTED(4018))
1295       {
1296 	goto mem_error;
1297       }
1298 
1299       if (!varsize)
1300       {
1301 	jam();
1302 	ptr= alloc_fix_rec(regFragPtr,
1303 			   regTabPtr,
1304 			   &regOperPtr.p->m_tuple_location,
1305 			   &frag_page_id);
1306       }
1307       else
1308       {
1309 	jam();
1310 	regOperPtr.p->m_tuple_location.m_file_no= sizes[2+MM];
1311 	ptr= alloc_var_rec(regFragPtr, regTabPtr,
1312 			   sizes[2+MM],
1313 			   &regOperPtr.p->m_tuple_location,
1314 			   &frag_page_id);
1315       }
1316       if (unlikely(ptr == 0))
1317       {
1318 	goto mem_error;
1319       }
1320       req_struct->m_use_rowid = true;
1321     }
1322     else
1323     {
1324       regOperPtr.p->m_tuple_location = req_struct->m_row_id;
1325       if (ERROR_INSERTED(4019))
1326       {
1327 	terrorCode = ZROWID_ALLOCATED;
1328 	goto alloc_rowid_error;
1329       }
1330 
1331       if (!varsize)
1332       {
1333 	jam();
1334 	ptr= alloc_fix_rowid(regFragPtr,
1335 			     regTabPtr,
1336 			     &regOperPtr.p->m_tuple_location,
1337 			     &frag_page_id);
1338       }
1339       else
1340       {
1341 	jam();
1342 	regOperPtr.p->m_tuple_location.m_file_no= sizes[2+MM];
1343 	ptr= alloc_var_rowid(regFragPtr, regTabPtr,
1344 			     sizes[2+MM],
1345 			     &regOperPtr.p->m_tuple_location,
1346 			     &frag_page_id);
1347       }
1348       if (unlikely(ptr == 0))
1349       {
1350 	jam();
1351 	goto alloc_rowid_error;
1352       }
1353     }
1354     real_page_id = regOperPtr.p->m_tuple_location.m_page_no;
1355     regOperPtr.p->m_tuple_location.m_page_no= frag_page_id;
1356     c_lqh->accminupdate(signal,
1357 			regOperPtr.p->userpointer,
1358 			&regOperPtr.p->m_tuple_location);
1359 
1360     base = (Tuple_header*)ptr;
1361     base->m_operation_ptr_i= regOperPtr.i;
1362     base->m_header_bits= Tuple_header::ALLOC |
1363       (varsize ? Tuple_header::CHAINED_ROW : 0);
1364     regOperPtr.p->m_tuple_location.m_page_no = real_page_id;
1365   }
1366   else
1367   {
1368     int ret;
1369     if (ERROR_INSERTED(4020))
1370     {
1371       goto size_change_error;
1372     }
1373 
1374     if (regTabPtr->need_shrink() && cmp[0] != cmp[1] &&
1375 	unlikely(ret = handle_size_change_after_update(req_struct,
1376 						       base,
1377 						       regOperPtr.p,
1378 						       regFragPtr,
1379 						       regTabPtr,
1380 						       sizes)))
1381     {
1382       goto size_change_error;
1383     }
1384     req_struct->m_use_rowid = false;
1385     base->m_header_bits &= ~(Uint32)Tuple_header::FREE;
1386   }
1387 
1388   base->m_header_bits |= Tuple_header::ALLOC &
1389     (regOperPtr.p->is_first_operation() ? ~0 : 1);
1390 
1391   if (disk_insert)
1392   {
1393     Local_key tmp;
1394     Uint32 size= regTabPtr->m_attributes[DD].m_no_of_varsize == 0 ?
1395       1 : sizes[2+DD];
1396 
1397     if (ERROR_INSERTED(4021))
1398     {
1399       terrorCode = 1601;
1400       goto disk_prealloc_error;
1401     }
1402 
1403     int ret= disk_page_prealloc(signal, fragPtr, &tmp, size);
1404     if (unlikely(ret < 0))
1405     {
1406       terrorCode = -ret;
1407       goto disk_prealloc_error;
1408     }
1409 
1410     regOperPtr.p->op_struct.m_disk_preallocated= 1;
1411     tmp.m_page_idx= size;
1412     memcpy(tuple_ptr->get_disk_ref_ptr(regTabPtr), &tmp, sizeof(tmp));
1413 
1414     /**
1415      * Set ref from disk to mm
1416      */
1417     Local_key ref = regOperPtr.p->m_tuple_location;
1418     ref.m_page_no = frag_page_id;
1419 
1420     Tuple_header* disk_ptr= req_struct->m_disk_ptr;
1421     disk_ptr->m_header_bits = 0;
1422     disk_ptr->m_base_record_ref= ref.ref();
1423   }
1424 
1425   if (regTabPtr->m_bits & Tablerec::TR_Checksum)
1426   {
1427     jam();
1428     setChecksum(req_struct->m_tuple_ptr, regTabPtr);
1429   }
1430   return 0;
1431 
1432 size_change_error:
1433   jam();
1434   terrorCode = ZMEM_NOMEM_ERROR;
1435   goto exit_error;
1436 
1437 undo_buffer_error:
1438   jam();
1439   terrorCode= ZMEM_NOMEM_ERROR;
1440   regOperPtr.p->m_undo_buffer_space = 0;
1441   if (mem_insert)
1442     regOperPtr.p->m_tuple_location.setNull();
1443   regOperPtr.p->m_copy_tuple_location.setNull();
1444   tupkeyErrorLab(signal);
1445   return -1;
1446 
1447 null_check_error:
1448   jam();
1449   terrorCode= ZNO_ILLEGAL_NULL_ATTR;
1450   goto update_error;
1451 
1452 mem_error:
1453   jam();
1454   terrorCode= ZMEM_NOMEM_ERROR;
1455   goto update_error;
1456 
1457 log_space_error:
1458   jam();
1459   regOperPtr.p->m_undo_buffer_space = 0;
1460 alloc_rowid_error:
1461   jam();
1462 update_error:
1463   jam();
1464   if (mem_insert)
1465   {
1466     regOperPtr.p->op_struct.in_active_list = false;
1467     regOperPtr.p->m_tuple_location.setNull();
1468   }
1469 exit_error:
1470   tupkeyErrorLab(signal);
1471   return -1;
1472 
1473 disk_prealloc_error:
1474   base->m_header_bits |= Tuple_header::FREED;
1475   goto exit_error;
1476 }
1477 
1478 /* ---------------------------------------------------------------- */
1479 /* ---------------------------- DELETE ---------------------------- */
1480 /* ---------------------------------------------------------------- */
int Dbtup::handleDeleteReq(Signal* signal,
                           Operationrec* regOperPtr,
                           Fragrecord* regFragPtr,
                           Tablerec* regTabPtr,
                           KeyReqStruct* req_struct,
                           bool disk)
1487 {
1488   // delete must set but not increment tupVersion
1489   if (!regOperPtr->is_first_operation())
1490   {
1491     Operationrec* prevOp= req_struct->prevOpPtr.p;
1492     regOperPtr->tupVersion= prevOp->tupVersion;
1493     // make copy since previous op is committed before this one
1494     const Uint32* org = c_undo_buffer.get_ptr(&prevOp->m_copy_tuple_location);
1495     Uint32* dst = c_undo_buffer.alloc_copy_tuple(
1496         &regOperPtr->m_copy_tuple_location, regTabPtr->total_rec_size);
1497     if (dst == 0) {
1498       terrorCode = ZMEM_NOMEM_ERROR;
1499       goto error;
1500     }
1501     memcpy(dst, org, regTabPtr->total_rec_size << 2);
1502     req_struct->m_tuple_ptr = (Tuple_header*)dst;
1503   }
1504   else
1505   {
1506     regOperPtr->tupVersion= req_struct->m_tuple_ptr->get_tuple_version();
1507   }
1508 
1509   if(disk && regOperPtr->m_undo_buffer_space == 0)
1510   {
1511     regOperPtr->op_struct.m_wait_log_buffer = 1;
1512     regOperPtr->op_struct.m_load_diskpage_on_commit = 1;
1513     Uint32 sz= regOperPtr->m_undo_buffer_space=
1514       (sizeof(Dbtup::Disk_undo::Free) >> 2) +
1515       regTabPtr->m_offsets[DD].m_fix_header_size - 1;
1516 
1517     terrorCode= c_lgman->alloc_log_space(regFragPtr->m_logfile_group_id,
1518                                          sz);
1519     if(unlikely(terrorCode))
1520     {
1521       regOperPtr->m_undo_buffer_space= 0;
1522       goto error;
1523     }
1524   }
1525   if (req_struct->attrinfo_len == 0)
1526   {
1527     return 0;
1528   }
1529 
1530   if (regTabPtr->need_expand(disk))
1531   {
1532     prepare_read(req_struct, regTabPtr, disk);
1533   }
1534 
1535   {
1536     Uint32 RlogSize;
1537     int ret= handleReadReq(signal, regOperPtr, regTabPtr, req_struct);
1538     if (ret == 0 && (RlogSize= req_struct->log_size))
1539     {
1540       jam();
1541       sendLogAttrinfo(signal, RlogSize, regOperPtr);
1542     }
1543     return ret;
1544   }
1545 
1546 error:
1547   tupkeyErrorLab(signal);
1548   return -1;
1549 }
1550 
bool
Dbtup::checkNullAttributes(KeyReqStruct* req_struct,
                           Tablerec* regTabPtr)
1554 {
// Check here that an insert has assigned every NOT NULL attribute.
1556   Bitmask<MAXNROFATTRIBUTESINWORDS> attributeMask;
1557   /*
1558    * The idea here is maybe that changeMask is not-null attributes
1559    * and must contain notNullAttributeMask.  But:
1560    *
1561    * 1. changeMask has all bits set on insert
1562    * 2. not-null is checked in each UpdateFunction
1563    * 3. the code below does not work except trivially due to 1.
1564    *
1565    * XXX remove or fix
1566    */
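  /*
   * Worked example with made-up 4-bit masks: notNullAttributeMask = 0111,
   * changeMask = 0101.  The AND gives 0101, the XOR against 0111 leaves
   * 0010, i.e. attribute 1 is NOT NULL but was never assigned, so the
   * check fails.
   */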
1567   attributeMask.clear();
1568   attributeMask.bitOR(req_struct->changeMask);
1569   attributeMask.bitAND(regTabPtr->notNullAttributeMask);
1570   attributeMask.bitXOR(regTabPtr->notNullAttributeMask);
1571   if (!attributeMask.isclear()) {
1572     return false;
1573   }
1574   return true;
1575 }
1576 
1577 /* ---------------------------------------------------------------- */
1578 /* THIS IS THE START OF THE INTERPRETED EXECUTION OF UPDATES. WE    */
1579 /* START BY LINKING ALL ATTRINFO'S IN A DOUBLY LINKED LIST (THEY ARE*/
1580 /* ALREADY IN A LINKED LIST). WE ALLOCATE A REGISTER MEMORY (EQUAL  */
1581 /* TO AN ATTRINFO RECORD). THE INTERPRETER GOES THROUGH FOUR  PHASES*/
1582 /* DURING THE FIRST PHASE IT IS ONLY ALLOWED TO READ ATTRIBUTES THAT*/
1583 /* ARE SENT TO THE CLIENT APPLICATION. DURING THE SECOND PHASE IT IS*/
1584 /* ALLOWED TO READ FROM ATTRIBUTES INTO REGISTERS, TO UPDATE        */
1585 /* ATTRIBUTES BASED ON EITHER A CONSTANT VALUE OR A REGISTER VALUE, */
1586 /* A DIVERSE SET OF OPERATIONS ON REGISTERS ARE AVAILABLE AS WELL.  */
1587 /* IT IS ALSO POSSIBLE TO PERFORM JUMPS WITHIN THE INSTRUCTIONS THAT*/
1588 /* BELONGS TO THE SECOND PHASE. ALSO SUBROUTINES CAN BE CALLED IN   */
1589 /* THIS PHASE. THE THIRD PHASE IS TO AGAIN READ ATTRIBUTES AND      */
1590 /* FINALLY THE FOURTH PHASE READS SELECTED REGISTERS AND SEND THEM  */
1591 /* TO THE CLIENT APPLICATION.                                       */
1592 /* THERE IS A FIFTH REGION WHICH CONTAINS SUBROUTINES CALLABLE FROM */
1593 /* THE INTERPRETER EXECUTION REGION.                                */
/* THE FIRST FIVE WORDS WILL GIVE THE LENGTH OF THE FIVE REGIONS    */
1595 /*                                                                  */
1596 /* THIS MEANS THAT FROM THE APPLICATIONS POINT OF VIEW THE DATABASE */
1597 /* CAN HANDLE SUBROUTINE CALLS WHERE THE CODE IS SENT IN THE REQUEST*/
1598 /* THE RETURN PARAMETERS ARE FIXED AND CAN EITHER BE GENERATED      */
1599 /* BEFORE THE EXECUTION OF THE ROUTINE OR AFTER.                    */
1600 /*                                                                  */
1601 /* IN LATER VERSIONS WE WILL ADD MORE THINGS LIKE THE POSSIBILITY   */
1602 /* TO ALLOCATE MEMORY AND USE THIS AS LOCAL STORAGE. IT IS ALSO     */
1603 /* IMAGINABLE TO HAVE SPECIAL ROUTINES THAT CAN PERFORM CERTAIN     */
1604 /* OPERATIONS ON BLOB'S DEPENDENT ON WHAT THE BLOB REPRESENTS.      */
1605 /*                                                                  */
1606 /*                                                                  */
1607 /*       -----------------------------------------                  */
1608 /*       +   INITIAL READ REGION                 +                  */
1609 /*       -----------------------------------------                  */
1610 /*       +   INTERPRETED EXECUTE  REGION         +                  */
1611 /*       -----------------------------------------                  */
1612 /*       +   FINAL UPDATE REGION                 +                  */
1613 /*       -----------------------------------------                  */
1614 /*       +   FINAL READ REGION                   +                  */
1615 /*       -----------------------------------------                  */
1616 /*       +   SUBROUTINE REGION                   +                  */
1617 /*       -----------------------------------------                  */
1618 /* ---------------------------------------------------------------- */
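/* ---------------------------------------------------------------- */
/* EXAMPLE (ILLUSTRATIVE ONLY): AN ATTRINFO STREAM WHOSE FIRST FIVE  */
/* WORDS ARE { 2, 0, 3, 1, 0 } HAS AN INITIAL READ REGION OF 2 WORDS,*/
/* NO INTERPRETED PROGRAM, A FINAL UPDATE REGION OF 3 WORDS, A FINAL */
/* READ REGION OF 1 WORD AND NO SUBROUTINES. ITS TOTAL LENGTH MUST   */
/* THEREFORE BE 5 + 2 + 0 + 3 + 1 + 0 = 11 WORDS, WHICH IS EXACTLY   */
/* THE CONSISTENCY CHECK MADE AT THE START OF interpreterStartLab(). */
/* ---------------------------------------------------------------- */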
1619 /* ---------------------------------------------------------------- */
1620 /* ----------------- INTERPRETED EXECUTION  ----------------------- */
1621 /* ---------------------------------------------------------------- */
1622 int Dbtup::interpreterStartLab(Signal* signal,
1623                                KeyReqStruct *req_struct)
1624 {
1625   Operationrec *  const regOperPtr= operPtr.p;
1626   int TnoDataRW;
1627   Uint32 RtotalLen, start_index, dstLen;
1628   Uint32 *dst;
1629 
1630   Uint32 RinitReadLen= cinBuffer[0];
1631   Uint32 RexecRegionLen= cinBuffer[1];
1632   Uint32 RfinalUpdateLen= cinBuffer[2];
1633   Uint32 RfinalRLen= cinBuffer[3];
1634   Uint32 RsubLen= cinBuffer[4];
1635 
1636   Uint32 RattrinbufLen= req_struct->attrinfo_len;
1637   const BlockReference sendBref= req_struct->rec_blockref;
1638 
1639   const Uint32 node = refToNode(sendBref);
1640   if(node != 0 && node != getOwnNodeId()) {
1641     start_index= 25;
1642   } else {
1643     jam();
1644     /**
1645      * execute direct
1646      */
1647     start_index= 3;
1648   }
1649   dst= &signal->theData[start_index];
1650   dstLen= (MAX_READ / 4) - start_index;
1651 
1652   RtotalLen= RinitReadLen;
1653   RtotalLen += RexecRegionLen;
1654   RtotalLen += RfinalUpdateLen;
1655   RtotalLen += RfinalRLen;
1656   RtotalLen += RsubLen;
1657 
1658   Uint32 RattroutCounter= 0;
1659   Uint32 RinstructionCounter= 5;
1660   Uint32 RlogSize= 0;
1661   if (((RtotalLen + 5) == RattrinbufLen) &&
1662       (RattrinbufLen >= 5) &&
1663       (RattrinbufLen < ZATTR_BUFFER_SIZE)) {
1664     /* ---------------------------------------------------------------- */
1665     // We start by checking consistency. We must have the first five
1666     // words of the ATTRINFO to give us the length of the regions. The
1667     // size of these regions must be the same as the total ATTRINFO
1668     // length and finally the total length must be within the limits.
1669     /* ---------------------------------------------------------------- */
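    // Continuing the hypothetical example above: region lengths
    // {2, 6, 0, 1, 3} give RtotalLen = 12, so RattrinbufLen must be
    // exactly 12 + 5 = 17 words and stay below ZATTR_BUFFER_SIZE;
    // any other length is rejected with TUPKEY_abort(signal, 22).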
1670 
1671     if (RinitReadLen > 0) {
1672       jam();
1673       /* ---------------------------------------------------------------- */
1674       // The first step that can be taken in the interpreter is to read
1675       // data of the tuple before any updates have been applied.
1676       /* ---------------------------------------------------------------- */
1677       TnoDataRW= readAttributes(req_struct,
1678 				 &cinBuffer[5],
1679 				 RinitReadLen,
1680 				 &dst[0],
1681 				 dstLen,
1682                                  false);
1683       if (TnoDataRW != -1) {
1684 	RattroutCounter= TnoDataRW;
1685 	RinstructionCounter += RinitReadLen;
1686       } else {
1687 	jam();
1688 	tupkeyErrorLab(signal);
1689 	return -1;
1690       }
1691     }
1692     if (RexecRegionLen > 0) {
1693       jam();
1694       /* ---------------------------------------------------------------- */
1695       // The next step is the actual interpreted execution. This executes
1696       // a register-based virtual machine which can read and write attributes
1697       // to and from registers.
1698       /* ---------------------------------------------------------------- */
1699       Uint32 RsubPC= RinstructionCounter + RfinalUpdateLen + RfinalRLen;
1700       TnoDataRW= interpreterNextLab(signal,
1701                                      req_struct,
1702 				     &clogMemBuffer[0],
1703 				     &cinBuffer[RinstructionCounter],
1704 				     RexecRegionLen,
1705 				     &cinBuffer[RsubPC],
1706 				     RsubLen,
1707 				     &coutBuffer[0],
1708 				     sizeof(coutBuffer) / 4);
1709       if (TnoDataRW != -1) {
1710 	RinstructionCounter += RexecRegionLen;
1711 	RlogSize= TnoDataRW;
1712       } else {
1713 	jam();
1714 	/**
1715 	 * TUPKEY REF is sent from within interpreter
1716 	 */
1717 	return -1;
1718       }
1719     }
1720     if (RfinalUpdateLen > 0) {
1721       jam();
1722       /* ---------------------------------------------------------------- */
1723       // We can also apply a set of updates without any conditions as part
1724       // of the interpreted execution.
1725       /* ---------------------------------------------------------------- */
1726       if (regOperPtr->op_struct.op_type == ZUPDATE) {
1727 	TnoDataRW= updateAttributes(req_struct,
1728 				     &cinBuffer[RinstructionCounter],
1729 				     RfinalUpdateLen);
1730 	if (TnoDataRW != -1) {
1731 	  MEMCOPY_NO_WORDS(&clogMemBuffer[RlogSize],
1732 			   &cinBuffer[RinstructionCounter],
1733 			   RfinalUpdateLen);
1734 	  RinstructionCounter += RfinalUpdateLen;
1735 	  RlogSize += RfinalUpdateLen;
1736 	} else {
1737 	  jam();
1738 	  tupkeyErrorLab(signal);
1739 	  return -1;
1740 	}
1741       } else {
1742 	return TUPKEY_abort(signal, 19);
1743       }
1744     }
1745     if (RfinalRLen > 0) {
1746       jam();
1747       /* ---------------------------------------------------------------- */
1748       // The final action is that we can also read the tuple after it has
1749       // been updated.
1750       /* ---------------------------------------------------------------- */
1751       TnoDataRW= readAttributes(req_struct,
1752 				 &cinBuffer[RinstructionCounter],
1753 				 RfinalRLen,
1754 				 &dst[RattroutCounter],
1755 				 (dstLen - RattroutCounter),
1756                                  false);
1757       if (TnoDataRW != -1) {
1758 	RattroutCounter += TnoDataRW;
1759       } else {
1760 	jam();
1761 	tupkeyErrorLab(signal);
1762 	return -1;
1763       }
1764     }
1765     req_struct->log_size= RlogSize;
1766     req_struct->read_length= RattroutCounter;
1767     sendReadAttrinfo(signal, req_struct, RattroutCounter, regOperPtr);
1768     if (RlogSize > 0) {
1769       sendLogAttrinfo(signal, RlogSize, regOperPtr);
1770     }
1771     return 0;
1772   } else {
1773     return TUPKEY_abort(signal, 22);
1774   }
1775 }
1776 
1777 /* ---------------------------------------------------------------- */
1778 /*       WHEN EXECUTION IS INTERPRETED WE NEED TO SEND SOME ATTRINFO*/
1779 /*       BACK TO LQH FOR LOGGING AND SENDING TO BACKUP AND STANDBY  */
1780 /*       NODES.                                                     */
1781 /*       INPUT:  LOG_ATTRINFOPTR         WHERE TO FETCH DATA FROM   */
1782 /*               TLOG_START              FIRST INDEX TO LOG         */
1783 /*               TLOG_END                LAST INDEX + 1 TO LOG      */
1784 /* ---------------------------------------------------------------- */
1785 void Dbtup::sendLogAttrinfo(Signal* signal,
1786                             Uint32 TlogSize,
1787                             Operationrec *  const regOperPtr)
1788 
1789 {
1790   Uint32 TbufferIndex= 0;
1791   signal->theData[0]= regOperPtr->userpointer;
1792   while (TlogSize > 22) {
1793     MEMCOPY_NO_WORDS(&signal->theData[3],
1794                      &clogMemBuffer[TbufferIndex],
1795                      22);
1796     EXECUTE_DIRECT(DBLQH, GSN_TUP_ATTRINFO, signal, 25);
1797     TbufferIndex += 22;
1798     TlogSize -= 22;
1799   }
1800   MEMCOPY_NO_WORDS(&signal->theData[3],
1801                    &clogMemBuffer[TbufferIndex],
1802                    TlogSize);
1803   EXECUTE_DIRECT(DBLQH, GSN_TUP_ATTRINFO, signal, 3 + TlogSize);
1804 }
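/* ---------------------------------------------------------------- */
// Example of the chunking above (hypothetical size): a 50-word log
// is shipped as three TUP_ATTRINFO signals carrying 22 + 22 + 6 data
// words. Each signal also carries three header words (theData[0]
// holds the operation user pointer set above), so the EXECUTE_DIRECT
// lengths become 3 + 22 = 25, 25 and 3 + 6 = 9 words respectively.
/* ---------------------------------------------------------------- */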
1805 
1806 inline
1807 Uint32
1808 brancher(Uint32 TheInstruction, Uint32 TprogramCounter)
1809 {
1810   Uint32 TbranchDirection= TheInstruction >> 31;
1811   Uint32 TbranchLength= (TheInstruction >> 16) & 0x7fff;
1812   TprogramCounter--;
1813   if (TbranchDirection == 1) {
1814     jam();
1815     /* ---------------------------------------------------------------- */
1816     /*       WE JUMP BACKWARDS.                                         */
1817     /* ---------------------------------------------------------------- */
1818     return (TprogramCounter - TbranchLength);
1819   } else {
1820     jam();
1821     /* ---------------------------------------------------------------- */
1822     /*       WE JUMP FORWARD.                                           */
1823     /* ---------------------------------------------------------------- */
1824     return (TprogramCounter + TbranchLength);
1825   }
1826 }
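/* ---------------------------------------------------------------- */
// Branch word layout as used above: bit 31 is the direction (1 means
// jump backwards) and bits 16..30 hold the distance in words. The
// program counter passed in has already been advanced past the branch
// instruction, hence the initial decrement. Hypothetical example: a
// branch located at word 9 is invoked with TprogramCounter == 10, so
// a backward distance of 3 yields 10 - 1 - 3 = 6 and a forward
// distance of 3 yields 10 - 1 + 3 = 12.
/* ---------------------------------------------------------------- */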
1827 
1828 int Dbtup::interpreterNextLab(Signal* signal,
1829                               KeyReqStruct* req_struct,
1830                               Uint32* logMemory,
1831                               Uint32* mainProgram,
1832                               Uint32 TmainProgLen,
1833                               Uint32* subroutineProg,
1834                               Uint32 TsubroutineLen,
1835 			      Uint32 * tmpArea,
1836 			      Uint32 tmpAreaSz)
1837 {
1838   register Uint32* TcurrentProgram= mainProgram;
1839   register Uint32 TcurrentSize= TmainProgLen;
1840   register Uint32 RnoOfInstructions= 0;
1841   register Uint32 TprogramCounter= 0;
1842   register Uint32 theInstruction;
1843   register Uint32 theRegister;
1844   Uint32 TdataWritten= 0;
1845   Uint32 RstackPtr= 0;
1846   union {
1847     Uint32 TregMemBuffer[32];
1848     Uint64 align[16];
1849   };
1850   Uint32 TstackMemBuffer[32];
1851 
1852   /* ---------------------------------------------------------------- */
1853   // Initialise all 8 registers to contain the NULL value.
1854   // In this version we can handle 32 and 64 bit unsigned integers.
1855   // They are handled as 64 bit values. Thus the 32 most significant
1856   // bits are zeroed for 32 bit values.
1857   /* ---------------------------------------------------------------- */
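  /* ---------------------------------------------------------------- */
  // Register layout assumed by the code below: register r occupies
  // TregMemBuffer[4*r .. 4*r + 3]. Word 0 is a type tag (0 = NULL,
  // 0x50 = 32 bit value, 0x60 = 64 bit value), words 2..3 hold the
  // value as a 64 bit quantity and word 1 is scratch space used while
  // reading attributes.
  /* ---------------------------------------------------------------- */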
1858   TregMemBuffer[0]= 0;
1859   TregMemBuffer[4]= 0;
1860   TregMemBuffer[8]= 0;
1861   TregMemBuffer[12]= 0;
1862   TregMemBuffer[16]= 0;
1863   TregMemBuffer[20]= 0;
1864   TregMemBuffer[24]= 0;
1865   TregMemBuffer[28]= 0;
1866   Uint32 tmpHabitant= ~0;
1867 
1868   while (RnoOfInstructions < 8000) {
1869     /* ---------------------------------------------------------------- */
1870     /* EXECUTE THE NEXT INTERPRETER INSTRUCTION.                        */
1871     /* ---------------------------------------------------------------- */
1872     RnoOfInstructions++;
1873     theInstruction= TcurrentProgram[TprogramCounter];
1874     theRegister= Interpreter::getReg1(theInstruction) << 2;
1875     if (TprogramCounter < TcurrentSize) {
1876       TprogramCounter++;
1877       switch (Interpreter::getOpCode(theInstruction)) {
1878       case Interpreter::READ_ATTR_INTO_REG:
1879 	jam();
1880 	/* ---------------------------------------------------------------- */
1881 	// Read an attribute from the tuple into a register.
1882 	// While reading an attribute we allow the attribute to be an array
1883 	// as long as it fits in the 64 bits of the register.
1884 	/* ---------------------------------------------------------------- */
1885 	{
1886 	  Uint32 theAttrinfo= theInstruction;
1887 	  int TnoDataRW= readAttributes(req_struct,
1888 				     &theAttrinfo,
1889 				     (Uint32)1,
1890 				     &TregMemBuffer[theRegister],
1891 				     (Uint32)3,
1892                                      false);
1893 	  if (TnoDataRW == 2) {
1894 	    /* ------------------------------------------------------------- */
1895 	    // Two words read means that we get the instruction plus one 32
1896 	    // word read. Thus we set the register to be a 32 bit register.
1897 	    /* ------------------------------------------------------------- */
1898 	    TregMemBuffer[theRegister]= 0x50;
1899             // arithmetic conversion if big-endian
1900             * (Int64*)(TregMemBuffer+theRegister+2)= TregMemBuffer[theRegister+1];
1901 	  } else if (TnoDataRW == 3) {
1902 	    /* ------------------------------------------------------------- */
1903 	    // Three words read means that we get the instruction plus two
1904 	    // 32 words read. Thus we set the register to be a 64 bit register.
1905 	    /* ------------------------------------------------------------- */
1906 	    TregMemBuffer[theRegister]= 0x60;
1907             TregMemBuffer[theRegister+3]= TregMemBuffer[theRegister+2];
1908             TregMemBuffer[theRegister+2]= TregMemBuffer[theRegister+1];
1909 	  } else if (TnoDataRW == 1) {
1910 	    /* ------------------------------------------------------------- */
1911 	    // One word read means that we must have read a NULL value. We set
1912 	    // the register to indicate a NULL value.
1913 	    /* ------------------------------------------------------------- */
1914 	    TregMemBuffer[theRegister]= 0;
1915 	    TregMemBuffer[theRegister + 2]= 0;
1916 	    TregMemBuffer[theRegister + 3]= 0;
1917 	  } else if (TnoDataRW == -1) {
1918 	    jam();
1919 	    tupkeyErrorLab(signal);
1920 	    return -1;
1921 	  } else {
1922 	    /* ------------------------------------------------------------- */
1923 	    // Any other return value from the read attribute here is not
1924 	    // allowed and will lead to a system crash.
1925 	    /* ------------------------------------------------------------- */
1926 	    ndbrequire(false);
1927 	  }
1928 	  break;
1929 	}
1930 
1931       case Interpreter::WRITE_ATTR_FROM_REG:
1932 	jam();
1933 	{
1934 	  Uint32 TattrId= theInstruction >> 16;
1935 	  Uint32 TattrDescrIndex= tabptr.p->tabDescriptor +
1936 	    (TattrId << ZAD_LOG_SIZE);
1937 	  Uint32 TattrDesc1= tableDescriptor[TattrDescrIndex].tabDescr;
1938 	  Uint32 TregType= TregMemBuffer[theRegister];
1939 
1940 	  /* --------------------------------------------------------------- */
1941 	  // Calculate the number of words of this attribute.
1942 	  // We allow writes into arrays as long as they fit into the 64 bit
1943 	  // register size.
1944 	  /* --------------------------------------------------------------- */
1945           Uint32 TattrNoOfWords = AttributeDescriptor::getSizeInWords(TattrDesc1);
1946 	  Uint32 Toptype = operPtr.p->op_struct.op_type;
1947 	  Uint32 TdataForUpdate[3];
1948 	  Uint32 Tlen;
1949 
1950 	  AttributeHeader ah(TattrId, TattrNoOfWords << 2);
1951           TdataForUpdate[0]= ah.m_value;
1952 	  TdataForUpdate[1]= TregMemBuffer[theRegister + 2];
1953 	  TdataForUpdate[2]= TregMemBuffer[theRegister + 3];
1954 	  Tlen= TattrNoOfWords + 1;
1955 	  if (Toptype == ZUPDATE) {
1956 	    if (TattrNoOfWords <= 2) {
1957               if (TattrNoOfWords == 1) {
1958                 // arithmetic conversion if big-endian
1959                 TdataForUpdate[1] = *(Int64*)&TregMemBuffer[theRegister + 2];
1960                 TdataForUpdate[2] = 0;
1961               }
1962 	      if (TregType == 0) {
1963 		/* --------------------------------------------------------- */
1964 		// Write a NULL value into the attribute
1965 		/* --------------------------------------------------------- */
1966 		ah.setNULL();
1967                 TdataForUpdate[0]= ah.m_value;
1968 		Tlen= 1;
1969 	      }
1970 	      int TnoDataRW= updateAttributes(req_struct,
1971 					   &TdataForUpdate[0],
1972 					   Tlen);
1973 	      if (TnoDataRW != -1) {
1974 		/* --------------------------------------------------------- */
1975 		// Write the written data also into the log buffer so that it
1976 		// will be logged.
1977 		/* --------------------------------------------------------- */
1978 		logMemory[TdataWritten + 0]= TdataForUpdate[0];
1979 		logMemory[TdataWritten + 1]= TdataForUpdate[1];
1980 		logMemory[TdataWritten + 2]= TdataForUpdate[2];
1981 		TdataWritten += Tlen;
1982 	      } else {
1983 		tupkeyErrorLab(signal);
1984 		return -1;
1985 	      }
1986 	    } else {
1987 	      return TUPKEY_abort(signal, 15);
1988 	    }
1989 	  } else {
1990 	    return TUPKEY_abort(signal, 16);
1991 	  }
1992 	  break;
1993 	}
1994 
1995       case Interpreter::LOAD_CONST_NULL:
1996 	jam();
1997 	TregMemBuffer[theRegister]= 0;	/* NULL INDICATOR */
1998 	break;
1999 
2000       case Interpreter::LOAD_CONST16:
2001 	jam();
2002 	TregMemBuffer[theRegister]= 0x50;	/* 32 BIT UNSIGNED CONSTANT */
2003 	* (Int64*)(TregMemBuffer+theRegister+2)= theInstruction >> 16;
2004 	break;
2005 
2006       case Interpreter::LOAD_CONST32:
2007 	jam();
2008 	TregMemBuffer[theRegister]= 0x50;	/* 32 BIT UNSIGNED CONSTANT */
2009 	* (Int64*)(TregMemBuffer+theRegister+2)= *
2010 	  (TcurrentProgram+TprogramCounter);
2011 	TprogramCounter++;
2012 	break;
2013 
2014       case Interpreter::LOAD_CONST64:
2015 	jam();
2016 	TregMemBuffer[theRegister]= 0x60;	/* 64 BIT UNSIGNED CONSTANT */
2017         TregMemBuffer[theRegister + 2 ]= * (TcurrentProgram +
2018                                              TprogramCounter++);
2019         TregMemBuffer[theRegister + 3 ]= * (TcurrentProgram +
2020                                              TprogramCounter++);
2021 	break;
2022 
2023       case Interpreter::ADD_REG_REG:
2024 	jam();
2025 	{
2026 	  Uint32 TrightRegister= Interpreter::getReg2(theInstruction) << 2;
2027 	  Uint32 TdestRegister= Interpreter::getReg3(theInstruction) << 2;
2028 
2029 	  Uint32 TrightType= TregMemBuffer[TrightRegister];
2030 	  Int64 Tright0= * (Int64*)(TregMemBuffer + TrightRegister + 2);
2031 
2032 
2033 	  Uint32 TleftType= TregMemBuffer[theRegister];
2034 	  Int64 Tleft0= * (Int64*)(TregMemBuffer + theRegister + 2);
2035 
2036 	  if ((TleftType | TrightType) != 0) {
2037 	    Uint64 Tdest0= Tleft0 + Tright0;
2038 	    * (Int64*)(TregMemBuffer+TdestRegister+2)= Tdest0;
2039 	    TregMemBuffer[TdestRegister]= 0x60;
2040 	  } else {
2041 	    return TUPKEY_abort(signal, 20);
2042 	  }
2043 	  break;
2044 	}
2045 
2046       case Interpreter::SUB_REG_REG:
2047 	jam();
2048 	{
2049 	  Uint32 TrightRegister= Interpreter::getReg2(theInstruction) << 2;
2050 	  Uint32 TdestRegister= Interpreter::getReg3(theInstruction) << 2;
2051 
2052 	  Uint32 TrightType= TregMemBuffer[TrightRegister];
2053 	  Int64 Tright0= * (Int64*)(TregMemBuffer + TrightRegister + 2);
2054 
2055 	  Uint32 TleftType= TregMemBuffer[theRegister];
2056 	  Int64 Tleft0= * (Int64*)(TregMemBuffer + theRegister + 2);
2057 
2058 	  if ((TleftType | TrightType) != 0) {
2059 	    Int64 Tdest0= Tleft0 - Tright0;
2060 	    * (Int64*)(TregMemBuffer+TdestRegister+2)= Tdest0;
2061 	    TregMemBuffer[TdestRegister]= 0x60;
2062 	  } else {
2063 	    return TUPKEY_abort(signal, 20);
2064 	  }
2065 	  break;
2066 	}
2067 
2068       case Interpreter::BRANCH:
2069 	TprogramCounter= brancher(theInstruction, TprogramCounter);
2070 	break;
2071 
2072       case Interpreter::BRANCH_REG_EQ_NULL:
2073 	if (TregMemBuffer[theRegister] != 0) {
2074 	  jam();
2075 	  continue;
2076 	} else {
2077 	  jam();
2078 	  TprogramCounter= brancher(theInstruction, TprogramCounter);
2079 	}
2080 	break;
2081 
2082       case Interpreter::BRANCH_REG_NE_NULL:
2083 	if (TregMemBuffer[theRegister] == 0) {
2084 	  jam();
2085 	  continue;
2086 	} else {
2087 	  jam();
2088 	  TprogramCounter= brancher(theInstruction, TprogramCounter);
2089 	}
2090 	break;
2091 
2092 
2093       case Interpreter::BRANCH_EQ_REG_REG:
2094 	{
2095 	  Uint32 TrightRegister= Interpreter::getReg2(theInstruction) << 2;
2096 
2097 	  Uint32 TleftType= TregMemBuffer[theRegister];
2098 	  Uint32 Tleft0= TregMemBuffer[theRegister + 2];
2099 	  Uint32 Tleft1= TregMemBuffer[theRegister + 3];
2100 
2101 	  Uint32 TrightType= TregMemBuffer[TrightRegister];
2102 	  Uint32 Tright0= TregMemBuffer[TrightRegister + 2];
2103 	  Uint32 Tright1= TregMemBuffer[TrightRegister + 3];
2104 	  if ((TrightType | TleftType) != 0) {
2105 	    jam();
2106 	    if ((Tleft0 == Tright0) && (Tleft1 == Tright1)) {
2107 	      TprogramCounter= brancher(theInstruction, TprogramCounter);
2108 	    }
2109 	  } else {
2110 	    return TUPKEY_abort(signal, 23);
2111 	  }
2112 	  break;
2113 	}
2114 
2115       case Interpreter::BRANCH_NE_REG_REG:
2116 	{
2117 	  Uint32 TrightRegister= Interpreter::getReg2(theInstruction) << 2;
2118 
2119 	  Uint32 TleftType= TregMemBuffer[theRegister];
2120 	  Uint32 Tleft0= TregMemBuffer[theRegister + 2];
2121 	  Uint32 Tleft1= TregMemBuffer[theRegister + 3];
2122 
2123 	  Uint32 TrightType= TregMemBuffer[TrightRegister];
2124 	  Uint32 Tright0= TregMemBuffer[TrightRegister + 2];
2125 	  Uint32 Tright1= TregMemBuffer[TrightRegister + 3];
2126 	  if ((TrightType | TleftType) != 0) {
2127 	    jam();
2128 	    if ((Tleft0 != Tright0) || (Tleft1 != Tright1)) {
2129 	      TprogramCounter= brancher(theInstruction, TprogramCounter);
2130 	    }
2131 	  } else {
2132 	    return TUPKEY_abort(signal, 24);
2133 	  }
2134 	  break;
2135 	}
2136 
2137       case Interpreter::BRANCH_LT_REG_REG:
2138 	{
2139 	  Uint32 TrightRegister= Interpreter::getReg2(theInstruction) << 2;
2140 
2141 	  Uint32 TrightType= TregMemBuffer[TrightRegister];
2142 	  Int64 Tright0= * (Int64*)(TregMemBuffer + TrightRegister + 2);
2143 
2144 	  Uint32 TleftType= TregMemBuffer[theRegister];
2145 	  Int64 Tleft0= * (Int64*)(TregMemBuffer + theRegister + 2);
2146 
2147 
2148 	  if ((TrightType | TleftType) != 0) {
2149 	    jam();
2150 	    if (Tleft0 < Tright0) {
2151 	      TprogramCounter= brancher(theInstruction, TprogramCounter);
2152 	    }
2153 	  } else {
2154 	    return TUPKEY_abort(signal, 24);
2155 	  }
2156 	  break;
2157 	}
2158 
2159       case Interpreter::BRANCH_LE_REG_REG:
2160 	{
2161 	  Uint32 TrightRegister= Interpreter::getReg2(theInstruction) << 2;
2162 
2163 	  Uint32 TrightType= TregMemBuffer[TrightRegister];
2164 	  Int64 Tright0= * (Int64*)(TregMemBuffer + TrightRegister + 2);
2165 
2166 	  Uint32 TleftType= TregMemBuffer[theRegister];
2167 	  Int64 Tleft0= * (Int64*)(TregMemBuffer + theRegister + 2);
2168 
2169 
2170 	  if ((TrightType | TleftType) != 0) {
2171 	    jam();
2172 	    if (Tleft0 <= Tright0) {
2173 	      TprogramCounter= brancher(theInstruction, TprogramCounter);
2174 	    }
2175 	  } else {
2176 	    return TUPKEY_abort(signal, 26);
2177 	  }
2178 	  break;
2179 	}
2180 
2181       case Interpreter::BRANCH_GT_REG_REG:
2182 	{
2183 	  Uint32 TrightRegister= Interpreter::getReg2(theInstruction) << 2;
2184 
2185 	  Uint32 TrightType= TregMemBuffer[TrightRegister];
2186 	  Int64 Tright0= * (Int64*)(TregMemBuffer + TrightRegister + 2);
2187 
2188 	  Uint32 TleftType= TregMemBuffer[theRegister];
2189 	  Int64 Tleft0= * (Int64*)(TregMemBuffer + theRegister + 2);
2190 
2191 
2192 	  if ((TrightType | TleftType) != 0) {
2193 	    jam();
2194 	    if (Tleft0 > Tright0){
2195 	      TprogramCounter= brancher(theInstruction, TprogramCounter);
2196 	    }
2197 	  } else {
2198 	    return TUPKEY_abort(signal, 27);
2199 	  }
2200 	  break;
2201 	}
2202 
2203       case Interpreter::BRANCH_GE_REG_REG:
2204 	{
2205 	  Uint32 TrightRegister= Interpreter::getReg2(theInstruction) << 2;
2206 
2207 	  Uint32 TrightType= TregMemBuffer[TrightRegister];
2208 	  Int64 Tright0= * (Int64*)(TregMemBuffer + TrightRegister + 2);
2209 
2210 	  Uint32 TleftType= TregMemBuffer[theRegister];
2211 	  Int64 Tleft0= * (Int64*)(TregMemBuffer + theRegister + 2);
2212 
2213 
2214 	  if ((TrightType | TleftType) != 0) {
2215 	    jam();
2216 	    if (Tleft0 >= Tright0){
2217 	      TprogramCounter= brancher(theInstruction, TprogramCounter);
2218 	    }
2219 	  } else {
2220 	    return TUPKEY_abort(signal, 28);
2221 	  }
2222 	  break;
2223 	}
2224 
2225       case Interpreter::BRANCH_ATTR_OP_ARG:{
2226 	jam();
2227 	Uint32 cond = Interpreter::getBinaryCondition(theInstruction);
2228 	Uint32 ins2 = TcurrentProgram[TprogramCounter];
2229 	Uint32 attrId = Interpreter::getBranchCol_AttrId(ins2) << 16;
2230 	Uint32 argLen = Interpreter::getBranchCol_Len(ins2);
2231 
2232 	if(tmpHabitant != attrId){
2233 	  Int32 TnoDataR = readAttributes(req_struct,
2234 					  &attrId, 1,
2235 					  tmpArea, tmpAreaSz,
2236                                           false);
2237 
2238 	  if (TnoDataR == -1) {
2239 	    jam();
2240 	    tupkeyErrorLab(signal);
2241 	    return -1;
2242 	  }
2243 	  tmpHabitant= attrId;
2244 	}
2245 
2246         // get type
2247 	attrId >>= 16;
2248 	Uint32 TattrDescrIndex = tabptr.p->tabDescriptor +
2249 	  (attrId << ZAD_LOG_SIZE);
2250 	Uint32 TattrDesc1 = tableDescriptor[TattrDescrIndex].tabDescr;
2251 	Uint32 TattrDesc2 = tableDescriptor[TattrDescrIndex+1].tabDescr;
2252 	Uint32 typeId = AttributeDescriptor::getType(TattrDesc1);
2253 	void * cs = 0;
2254 	if(AttributeOffset::getCharsetFlag(TattrDesc2))
2255 	{
2256 	  Uint32 pos = AttributeOffset::getCharsetPos(TattrDesc2);
2257 	  cs = tabptr.p->charsetArray[pos];
2258 	}
2259 	const NdbSqlUtil::Type& sqlType = NdbSqlUtil::getType(typeId);
2260 
2261         // get data
2262 	AttributeHeader ah(tmpArea[0]);
2263         const char* s1 = (char*)&tmpArea[1];
2264         const char* s2 = (char*)&TcurrentProgram[TprogramCounter+1];
2265         // fixed length in 5.0
2266 	Uint32 attrLen = AttributeDescriptor::getSizeInBytes(TattrDesc1);
2267 
2268 	bool r1_null = ah.isNULL();
2269 	bool r2_null = argLen == 0;
2270 	int res1;
2271         if (cond != Interpreter::LIKE &&
2272             cond != Interpreter::NOT_LIKE) {
2273           if (r1_null || r2_null) {
2274             // NULL==NULL and NULL<not-NULL
2275             res1 = r1_null && r2_null ? 0 : r1_null ? -1 : 1;
2276           } else {
2277 	    jam();
2278 	    if (unlikely(sqlType.m_cmp == 0))
2279 	    {
2280 	      return TUPKEY_abort(signal, 40);
2281 	    }
2282             res1 = (*sqlType.m_cmp)(cs, s1, attrLen, s2, argLen, true);
2283           }
2284 	} else {
2285           if (r1_null || r2_null) {
2286             // NULL like NULL is true (has no practical use)
2287             res1 =  r1_null && r2_null ? 0 : -1;
2288           } else {
2289 	    jam();
2290 	    if (unlikely(sqlType.m_like == 0))
2291 	    {
2292 	      return TUPKEY_abort(signal, 40);
2293 	    }
2294             res1 = (*sqlType.m_like)(cs, s1, attrLen, s2, argLen);
2295           }
2296         }
2297 
2298         int res = 0;
2299         switch ((Interpreter::BinaryCondition)cond) {
2300         case Interpreter::EQ:
2301           res = (res1 == 0);
2302           break;
2303         case Interpreter::NE:
2304           res = (res1 != 0);
2305           break;
2306         // note the condition is backwards
2307         case Interpreter::LT:
2308           res = (res1 > 0);
2309           break;
2310         case Interpreter::LE:
2311           res = (res1 >= 0);
2312           break;
2313         case Interpreter::GT:
2314           res = (res1 < 0);
2315           break;
2316         case Interpreter::GE:
2317           res = (res1 <= 0);
2318           break;
2319         case Interpreter::LIKE:
2320           res = (res1 == 0);
2321           break;
2322         case Interpreter::NOT_LIKE:
2323           res = (res1 == 1);
2324           break;
2325 	  // XXX handle invalid value
2326         }
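        // The "backwards" mapping above comes from res1 being
        // cmp(column value, supplied argument); assuming the usual
        // sign convention (negative when the first operand is the
        // smaller one), e.g. Interpreter::LT takes the branch when
        // res1 > 0, i.e. when the supplied argument is less than the
        // column value, so each condition reads as
        // <argument> <op> <column value>.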
2327 #ifdef TRACE_INTERPRETER
2328 	ndbout_c("cond=%u attr(%d)='%.*s'(%d) str='%.*s'(%d) res1=%d res=%d",
2329 		 cond, attrId >> 16,
2330                  attrLen, s1, attrLen, argLen, s2, argLen, res1, res);
2331 #endif
2332         if (res)
2333           TprogramCounter = brancher(theInstruction, TprogramCounter);
2334         else
2335 	{
2336           Uint32 tmp = ((argLen + 3) >> 2) + 1;
2337           TprogramCounter += tmp;
2338         }
2339 	break;
2340       }
2341 
2342       case Interpreter::BRANCH_ATTR_EQ_NULL:{
2343 	jam();
2344 	Uint32 ins2= TcurrentProgram[TprogramCounter];
2345 	Uint32 attrId= Interpreter::getBranchCol_AttrId(ins2) << 16;
2346 
2347 	if (tmpHabitant != attrId){
2348 	  Int32 TnoDataR= readAttributes(req_struct,
2349 					  &attrId, 1,
2350 					  tmpArea, tmpAreaSz,
2351                                           false);
2352 
2353 	  if (TnoDataR == -1) {
2354 	    jam();
2355 	    tupkeyErrorLab(signal);
2356 	    return -1;
2357 	  }
2358 	  tmpHabitant= attrId;
2359 	}
2360 
2361 	AttributeHeader ah(tmpArea[0]);
2362 	if (ah.isNULL()){
2363 	  TprogramCounter= brancher(theInstruction, TprogramCounter);
2364 	} else {
2365 	  TprogramCounter ++;
2366 	}
2367 	break;
2368       }
2369 
2370       case Interpreter::BRANCH_ATTR_NE_NULL:{
2371 	jam();
2372 	Uint32 ins2= TcurrentProgram[TprogramCounter];
2373 	Uint32 attrId= Interpreter::getBranchCol_AttrId(ins2) << 16;
2374 
2375 	if (tmpHabitant != attrId){
2376 	  Int32 TnoDataR= readAttributes(req_struct,
2377 					  &attrId, 1,
2378 					  tmpArea, tmpAreaSz,
2379                                           false);
2380 
2381 	  if (TnoDataR == -1) {
2382 	    jam();
2383 	    tupkeyErrorLab(signal);
2384 	    return -1;
2385 	  }
2386 	  tmpHabitant= attrId;
2387 	}
2388 
2389 	AttributeHeader ah(tmpArea[0]);
2390 	if (ah.isNULL()){
2391 	  TprogramCounter ++;
2392 	} else {
2393 	  TprogramCounter= brancher(theInstruction, TprogramCounter);
2394 	}
2395 	break;
2396       }
2397 
2398       case Interpreter::EXIT_OK:
2399 	jam();
2400 #ifdef TRACE_INTERPRETER
2401 	ndbout_c(" - exit_ok");
2402 #endif
2403 	return TdataWritten;
2404 
2405       case Interpreter::EXIT_OK_LAST:
2406 	jam();
2407 #ifdef TRACE_INTERPRETER
2408 	ndbout_c(" - exit_ok_last");
2409 #endif
2410 	req_struct->last_row= true;
2411 	return TdataWritten;
2412 
2413       case Interpreter::EXIT_REFUSE:
2414 	jam();
2415 #ifdef TRACE_INTERPRETER
2416 	ndbout_c(" - exit_nok");
2417 #endif
2418 	terrorCode= theInstruction >> 16;
2419 	return TUPKEY_abort(signal, 29);
2420 
2421       case Interpreter::CALL:
2422 	jam();
2423 	RstackPtr++;
2424 	if (RstackPtr < 32) {
2425 	  TstackMemBuffer[RstackPtr]= TprogramCounter + 1;
2426 	  TprogramCounter= theInstruction >> 16;
2427 	  if (TprogramCounter < TsubroutineLen) {
2428 	    TcurrentProgram= subroutineProg;
2429 	    TcurrentSize= TsubroutineLen;
2430 	  } else {
2431 	    return TUPKEY_abort(signal, 30);
2432 	  }
2433 	} else {
2434 	  return TUPKEY_abort(signal, 31);
2435 	}
2436 	break;
2437 
2438       case Interpreter::RETURN:
2439 	jam();
2440 	if (RstackPtr > 0) {
2441 	  TprogramCounter= TstackMemBuffer[RstackPtr];
2442 	  RstackPtr--;
2443 	  if (RstackPtr == 0) {
2444 	    jam();
2445 	    /* ------------------------------------------------------------- */
2446 	    // We are back to the main program.
2447 	    /* ------------------------------------------------------------- */
2448 	    TcurrentProgram= mainProgram;
2449 	    TcurrentSize= TmainProgLen;
2450 	  }
2451 	} else {
2452 	  return TUPKEY_abort(signal, 32);
2453 	}
2454 	break;
2455 
2456       default:
2457 	return TUPKEY_abort(signal, 33);
2458       }
2459     } else {
2460       return TUPKEY_abort(signal, 34);
2461     }
2462   }
2463   return TUPKEY_abort(signal, 35);
2464 }
2465 
2466 /**
2467  * expand_var_part - copy packed variable attributes to fully expanded size
2468  *
2469  * dst         Var_data descriptor: where to write expanded attribute data
2470  *             and offsets (no of attributes = dst->m_var_len_offset)
2471  * src         pointer to packed attributes
2472  * tabDesc     array of attribute descriptors (used for getting max size)
2473  * order       per-attribute index into tabDesc
2474  */
2475 Uint32*
2476 expand_var_part(Dbtup::KeyReqStruct::Var_data *dst,
2477 		const Uint32* src,
2478 		const Uint32 * tabDesc,
2479 		const Uint16* order)
2480 {
2481   char* dst_ptr= dst->m_data_ptr;
2482   Uint32 no_attr= dst->m_var_len_offset;
2483   Uint16* dst_off_ptr= dst->m_offset_array_ptr;
2484   Uint16* dst_len_ptr= dst_off_ptr + no_attr;
2485   const Uint16* src_off_ptr= (const Uint16*)src;
2486   const char* src_ptr= (const char*)(src_off_ptr + no_attr + 1);
2487 
2488   Uint16 tmp= *src_off_ptr++, next_pos, len, max_len, dst_off= 0;
2489   for(Uint32 i = 0; i<no_attr; i++)
2490   {
2491     next_pos= *src_off_ptr++;
2492     len= next_pos - tmp;
2493 
2494     *dst_off_ptr++ = dst_off;
2495     *dst_len_ptr++ = dst_off + len;
2496     memcpy(dst_ptr, src_ptr, len);
2497     src_ptr += len;
2498 
2499     max_len= AttributeDescriptor::getSizeInBytes(tabDesc[* order++]);
2500     dst_ptr += max_len; // Max size
2501     dst_off += max_len;
2502 
2503     tmp= next_pos;
2504   }
2505 
2506   return ALIGN_WORD(dst_ptr);
2507 }
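/* ---------------------------------------------------------------- */
// Worked example (hypothetical sizes): two varsize attributes with
// declared max sizes 8 and 4 bytes and packed offset array {0, 3, 5}
// (i.e. stored lengths 3 and 2 bytes). The loop copies 3 bytes to
// expanded offset 0 and 2 bytes to expanded offset 8, producing
// offset array {0, 8} and end-offset array {3, 10}, and returns
// ALIGN_WORD(dst->m_data_ptr + 12).
/* ---------------------------------------------------------------- */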
2508 
2509 void
2510 Dbtup::expand_tuple(KeyReqStruct* req_struct,
2511 		    Uint32 sizes[2],
2512 		    Tuple_header* src,
2513 		    const Tablerec* tabPtrP,
2514 		    bool disk)
2515 {
2516   Uint32 bits= src->m_header_bits;
2517   Tuple_header* ptr= req_struct->m_tuple_ptr;
2518 
2519   Uint16 dd_tot= tabPtrP->m_no_of_disk_attributes;
2520   Uint16 mm_vars= tabPtrP->m_attributes[MM].m_no_of_varsize;
2521   Uint32 fix_size= tabPtrP->m_offsets[MM].m_fix_header_size;
2522   Uint32 order_desc= tabPtrP->m_real_order_descriptor;
2523 
2524   Uint32 *dst_ptr= ptr->get_end_of_fix_part_ptr(tabPtrP);
2525   const Uint32 *disk_ref= src->get_disk_ref_ptr(tabPtrP);
2526   const Uint32 *src_ptr= src->get_end_of_fix_part_ptr(tabPtrP);
2527   const Var_part_ref* var_ref = src->get_var_part_ref_ptr(tabPtrP);
2528   const Uint32 *desc= (Uint32*)req_struct->attr_descr;
2529   const Uint16 *order = (Uint16*)(&tableDescriptor[order_desc]);
2530   order += tabPtrP->m_attributes[MM].m_no_of_fixsize;
2531 
2532   if(mm_vars)
2533   {
2534 
2535     Uint32 step; // in bytes
2536     const Uint32 *src_data= src_ptr;
2537     KeyReqStruct::Var_data* dst= &req_struct->m_var_data[MM];
2538     if(bits & Tuple_header::CHAINED_ROW)
2539     {
2540       Ptr<Page> var_page;
2541       src_data= get_ptr(&var_page, *var_ref);
2542       step= 4;
2543       sizes[MM]= (2 + (mm_vars << 1) + ((Uint16*)src_data)[mm_vars] + 3) >> 2;
2544       req_struct->m_varpart_page_ptr = var_page;
2545     }
2546     else
2547     {
2548       step= (2 + (mm_vars << 1) + ((Uint16*)src_ptr)[mm_vars]);
2549       sizes[MM]= (step + 3) >> 2;
2550       req_struct->m_varpart_page_ptr = req_struct->m_page_ptr;
2551     }
2552     dst->m_data_ptr= (char*)(((Uint16*)dst_ptr)+mm_vars+1);
2553     dst->m_offset_array_ptr= req_struct->var_pos_array;
2554     dst->m_var_len_offset= mm_vars;
2555     dst->m_max_var_offset= tabPtrP->m_offsets[MM].m_max_var_offset;
2556 
2557     dst_ptr= expand_var_part(dst, src_data, desc, order);
2558     ndbassert(dst_ptr == ALIGN_WORD(dst->m_data_ptr + dst->m_max_var_offset));
2559     ndbassert((UintPtr(src_ptr) & 3) == 0);
2560     src_ptr = ALIGN_WORD(((char*)src_ptr)+step);
2561 
2562     sizes[MM] += fix_size;
2563     memcpy(ptr, src, 4*fix_size);
2564   }
2565   else
2566   {
2567     sizes[MM]= 1;
2568     memcpy(ptr, src, 4*fix_size);
2569   }
2570 
2571   src->m_header_bits= bits &
2572     ~(Uint32)(Tuple_header::MM_SHRINK | Tuple_header::MM_GROWN);
2573 
2574   sizes[DD]= 0;
2575   if(disk && dd_tot)
2576   {
2577     const Uint16 dd_vars= tabPtrP->m_attributes[DD].m_no_of_varsize;
2578     order += mm_vars;
2579 
2580     if(bits & Tuple_header::DISK_INLINE)
2581     {
2582       // Only on copy tuple
2583       ndbassert((bits & Tuple_header::CHAINED_ROW) == 0);
2584     }
2585     else
2586     {
2587       Local_key key;
2588       memcpy(&key, disk_ref, sizeof(key));
2589       key.m_page_no= req_struct->m_disk_page_ptr.i;
2590       src_ptr= get_dd_ptr(&req_struct->m_disk_page_ptr, &key, tabPtrP);
2591     }
2592     bits |= Tuple_header::DISK_INLINE;
2593 
2594     // Fix diskpart
2595     req_struct->m_disk_ptr= (Tuple_header*)dst_ptr;
2596     memcpy(dst_ptr, src_ptr, 4*tabPtrP->m_offsets[DD].m_fix_header_size);
2597     sizes[DD] = tabPtrP->m_offsets[DD].m_fix_header_size;
2598 
2599     ndbassert(! (req_struct->m_disk_ptr->m_header_bits & Tuple_header::FREE));
2600 
2601     ndbrequire(dd_vars == 0);
2602   }
2603 
2604   ptr->m_header_bits= (bits & ~(Uint32)(Tuple_header::CHAINED_ROW));
2605 }
2606 
2607 void
2608 Dbtup::prepare_read(KeyReqStruct* req_struct,
2609 		    Tablerec* tabPtrP, bool disk)
2610 {
2611   Tuple_header* ptr= req_struct->m_tuple_ptr;
2612 
2613   Uint32 bits= ptr->m_header_bits;
2614   Uint16 dd_tot= tabPtrP->m_no_of_disk_attributes;
2615   Uint16 mm_vars= tabPtrP->m_attributes[MM].m_no_of_varsize;
2616 
2617   const Uint32 *src_ptr= ptr->get_end_of_fix_part_ptr(tabPtrP);
2618   const Uint32 *disk_ref= ptr->get_disk_ref_ptr(tabPtrP);
2619   const Var_part_ref* var_ref = ptr->get_var_part_ref_ptr(tabPtrP);
2620   if(mm_vars)
2621   {
2622     const Uint32 *src_data= src_ptr;
2623     KeyReqStruct::Var_data* dst= &req_struct->m_var_data[MM];
2624     if(bits & Tuple_header::CHAINED_ROW)
2625     {
2626 #if VM_TRACE
2627 
2628 #endif
2629       src_data= get_ptr(* var_ref);
2630     }
2631     dst->m_data_ptr= (char*)(((Uint16*)src_data)+mm_vars+1);
2632     dst->m_offset_array_ptr= (Uint16*)src_data;
2633     dst->m_var_len_offset= 1;
2634     dst->m_max_var_offset= ((Uint16*)src_data)[mm_vars];
2635 
2636     // disk part start after varsize (aligned)
2637     src_ptr = ALIGN_WORD(dst->m_data_ptr + dst->m_max_var_offset);
2638   }
2639 
2640   if(disk && dd_tot)
2641   {
2642     const Uint16 dd_vars= tabPtrP->m_attributes[DD].m_no_of_varsize;
2643 
2644     if(bits & Tuple_header::DISK_INLINE)
2645     {
2646       // Only on copy tuple
2647       ndbassert((bits & Tuple_header::CHAINED_ROW) == 0);
2648     }
2649     else
2650     {
2651       // XXX
2652       Local_key key;
2653       memcpy(&key, disk_ref, sizeof(key));
2654       key.m_page_no= req_struct->m_disk_page_ptr.i;
2655       src_ptr= get_dd_ptr(&req_struct->m_disk_page_ptr, &key, tabPtrP);
2656     }
2657     // Fix diskpart
2658     req_struct->m_disk_ptr= (Tuple_header*)src_ptr;
2659     ndbassert(! (req_struct->m_disk_ptr->m_header_bits & Tuple_header::FREE));
2660     ndbrequire(dd_vars == 0);
2661   }
2662 }
2663 
2664 void
2665 Dbtup::shrink_tuple(KeyReqStruct* req_struct, Uint32 sizes[2],
2666 		    const Tablerec* tabPtrP, bool disk)
2667 {
2668   ndbassert(tabPtrP->need_shrink());
2669   Tuple_header* ptr= req_struct->m_tuple_ptr;
2670 
2671   Uint16 dd_tot= tabPtrP->m_no_of_disk_attributes;
2672   Uint16 mm_vars= tabPtrP->m_attributes[MM].m_no_of_varsize;
2673   Uint16 dd_vars= tabPtrP->m_attributes[DD].m_no_of_varsize;
2674 
2675   Uint32 *dst_ptr= ptr->get_end_of_fix_part_ptr(tabPtrP);
2676   Uint16* src_off_ptr= req_struct->var_pos_array;
2677 
2678   sizes[MM] = 1;
2679   sizes[DD] = 0;
2680   if(mm_vars)
2681   {
2682     Uint16* dst_off_ptr= (Uint16*)dst_ptr;
2683     char*  dst_data_ptr= (char*)(dst_off_ptr + mm_vars + 1);
2684     char*  src_data_ptr= dst_data_ptr;
2685     Uint32 off= 0;
2686     for(Uint32 i= 0; i<mm_vars; i++)
2687     {
2688       const char* data_ptr= src_data_ptr + *src_off_ptr;
2689       Uint32 len= src_off_ptr[mm_vars] - *src_off_ptr;
2690       * dst_off_ptr++= off;
2691       memmove(dst_data_ptr, data_ptr, len);
2692       off += len;
2693       src_off_ptr++;
2694       dst_data_ptr += len;
2695     }
2696     *dst_off_ptr= off;
2697     ndbassert(dst_data_ptr <= ((char*)ptr) + 8192);
2698     ndbassert((UintPtr(ptr) & 3) == 0);
2699     sizes[MM]= (dst_data_ptr + 3 - ((char*)ptr)) >> 2;
2700 
2701     dst_ptr = ALIGN_WORD(dst_data_ptr);
2702   }
2703 
2704   if(disk && dd_tot)
2705   {
2706     Uint32 * src_ptr = (Uint32*)req_struct->m_disk_ptr;
2707     req_struct->m_disk_ptr = (Tuple_header*)dst_ptr;
2708     ndbrequire(dd_vars == 0);
2709     sizes[DD] = tabPtrP->m_offsets[DD].m_fix_header_size;
2710     memmove(dst_ptr, src_ptr, 4*tabPtrP->m_offsets[DD].m_fix_header_size);
2711   }
2712 }
2713 
2714 void
2715 Dbtup::validate_page(Tablerec* regTabPtr, Var_page* p)
2716 {
2717   Uint32 mm_vars= regTabPtr->m_attributes[MM].m_no_of_varsize;
2718   Uint32 fix_sz= regTabPtr->m_offsets[MM].m_fix_header_size +
2719     Tuple_header::HeaderSize;
2720 
2721   if(mm_vars == 0)
2722     return;
2723 
2724   for(Uint32 F= 0; F<MAX_FRAG_PER_NODE; F++)
2725   {
2726     FragrecordPtr fragPtr;
2727 
2728     if((fragPtr.i = regTabPtr->fragrec[F]) == RNIL)
2729       continue;
2730 
2731     ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
2732     for(Uint32 P= 0; P<fragPtr.p->noOfPages; P++)
2733     {
2734       Uint32 real= getRealpid(fragPtr.p, P);
2735       Var_page* page= (Var_page*)c_page_pool.getPtr(real);
2736 
2737       for(Uint32 i=1; i<page->high_index; i++)
2738       {
2739 	Uint32 idx= page->get_index_word(i);
2740 	Uint32 len = (idx & Var_page::LEN_MASK) >> Var_page::LEN_SHIFT;
2741 	if(!(idx & Var_page::FREE) && !(idx & Var_page::CHAIN))
2742 	{
2743 	  Tuple_header *ptr= (Tuple_header*)page->get_ptr(i);
2744 	  Uint32 *part= ptr->get_end_of_fix_part_ptr(regTabPtr);
2745 	  if(ptr->m_header_bits & Tuple_header::CHAINED_ROW)
2746 	  {
2747 	    ndbassert(len == fix_sz + 1);
2748 	    Local_key tmp; tmp.assref(*part);
2749 	    Ptr<Page> tmpPage;
2750 	    part= get_ptr(&tmpPage, *(Var_part_ref*)part);
2751 	    len= ((Var_page*)tmpPage.p)->get_entry_len(tmp.m_page_idx);
2752 	    Uint32 sz= ((mm_vars + 1) << 1) + (((Uint16*)part)[mm_vars]);
2753 	    ndbassert(len >= ((sz + 3) >> 2));
2754 	  }
2755 	  else
2756 	  {
2757 	    Uint32 sz= ((mm_vars + 1) << 1) + (((Uint16*)part)[mm_vars]);
2758 	    ndbassert(len >= ((sz+3)>>2)+fix_sz);
2759 	  }
2760 	  if(ptr->m_operation_ptr_i != RNIL)
2761 	  {
2762 	    c_operation_pool.getPtr(ptr->m_operation_ptr_i);
2763 	  }
2764 	}
2765 	else if(!(idx & Var_page::FREE))
2766 	{
2767 	  /**
2768 	   * Chain
2769 	   */
2770 	  Uint32 *part= page->get_ptr(i);
2771 	  Uint32 sz= ((mm_vars + 1) << 1) + (((Uint16*)part)[mm_vars]);
2772 	  ndbassert(len >= ((sz + 3) >> 2));
2773 	}
2774 	else
2775 	{
2776 
2777 	}
2778       }
2779       if(p == 0 && page->high_index > 1)
2780 	page->reorg((Var_page*)ctemp_page);
2781     }
2782   }
2783 
2784   if(p == 0)
2785   {
2786     validate_page(regTabPtr, (Var_page*)1);
2787   }
2788 }
2789 
2790 int
2791 Dbtup::handle_size_change_after_update(KeyReqStruct* req_struct,
2792 				       Tuple_header* org,
2793 				       Operationrec* regOperPtr,
2794 				       Fragrecord* regFragPtr,
2795 				       Tablerec* regTabPtr,
2796 				       Uint32 sizes[4])
2797 {
2798   ndbrequire(sizes[1] == sizes[3]);
2799   //ndbout_c("%d %d %d %d", sizes[0], sizes[1], sizes[2], sizes[3]);
2800   if(0)
2801     printf("%p %d %d - handle_size_change_after_update ",
2802 	   req_struct->m_tuple_ptr,
2803 	   regOperPtr->m_tuple_location.m_page_no,
2804 	   regOperPtr->m_tuple_location.m_page_idx);
2805 
2806   Uint32 bits= org->m_header_bits;
2807   Uint32 copy_bits= req_struct->m_tuple_ptr->m_header_bits;
2808   Uint32 fix_sz = regTabPtr->m_offsets[MM].m_fix_header_size;
2809 
2810   if(sizes[MM] == sizes[2+MM])
2811     ;
2812   else if(sizes[MM] > sizes[2+MM])
2813   {
2814     if(0) ndbout_c("shrink");
2815     copy_bits |= Tuple_header::MM_SHRINK;
2816   }
2817   else
2818   {
2819     if(0) printf("grow - ");
2820     Ptr<Page> pagePtr = req_struct->m_varpart_page_ptr;
2821     Var_page* pageP= (Var_page*)pagePtr.p;
2822     Uint32 idx, alloc, needed;
2823     Var_part_ref *refptr = org->get_var_part_ref_ptr(regTabPtr);
2824     ndbassert(bits & Tuple_header::CHAINED_ROW);
2825 
2826     Local_key ref;
2827     refptr->copyout(&ref);
2828     idx= ref.m_page_idx;
2829     if (! (copy_bits & Tuple_header::CHAINED_ROW))
2830     {
2831       c_page_pool.getPtr(pagePtr, ref.m_page_no);
2832       pageP = (Var_page*)pagePtr.p;
2833     }
2834     alloc= pageP->get_entry_len(idx);
2835 #ifdef VM_TRACE
2836     if(!pageP->get_entry_chain(idx))
2837       ndbout << *pageP << endl;
2838 #endif
2839     ndbassert(pageP->get_entry_chain(idx));
2840     needed= sizes[2+MM] - fix_sz;
2841 
2842     if(needed <= alloc)
2843     {
2844       //ndbassert(!regOperPtr->is_first_operation());
2845       if (0) ndbout_c(" no grow");
2846       return 0;
2847     }
2848     copy_bits |= Tuple_header::MM_GROWN;
2849     if (unlikely(realloc_var_part(regFragPtr, regTabPtr, pagePtr,
2850 				  refptr, alloc, needed)))
2851       return -1;
2852 
2853     if (regTabPtr->m_bits & Tablerec::TR_Checksum)
2854     {
2855       jam();
2856       setChecksum(org, regTabPtr);
2857     }
2858   }
2859   req_struct->m_tuple_ptr->m_header_bits = copy_bits;
2860   return 0;
2861 }
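/* ---------------------------------------------------------------- */
// Grow path example (hypothetical word counts): if the chained var
// part currently holds alloc = 3 words but the updated row needs
// needed = sizes[2+MM] - fix_sz = 5 words, realloc_var_part() is
// asked to move/extend the entry, MM_GROWN is recorded in the copy
// tuple's header bits and the checksum of the original row is
// refreshed when the table uses checksums.
/* ---------------------------------------------------------------- */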
2862 
2863 int
2864 Dbtup::nr_update_gci(Uint32 fragPtrI, const Local_key* key, Uint32 gci)
2865 {
2866   FragrecordPtr fragPtr;
2867   fragPtr.i= fragPtrI;
2868   ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
2869   TablerecPtr tablePtr;
2870   tablePtr.i= fragPtr.p->fragTableId;
2871   ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
2872 
2873   if (tablePtr.p->m_bits & Tablerec::TR_RowGCI)
2874   {
2875     Local_key tmp = *key;
2876     PagePtr page_ptr;
2877 
2878     int ret = alloc_page(tablePtr.p, fragPtr.p, &page_ptr, tmp.m_page_no);
2879 
2880     if (ret)
2881       return -1;
2882 
2883     Tuple_header* ptr = (Tuple_header*)
2884       ((Fix_page*)page_ptr.p)->get_ptr(tmp.m_page_idx, 0);
2885 
2886     ndbrequire(ptr->m_header_bits & Tuple_header::FREE);
2887     *ptr->get_mm_gci(tablePtr.p) = gci;
2888   }
2889   return 0;
2890 }
2891 
2892 int
2893 Dbtup::nr_read_pk(Uint32 fragPtrI,
2894 		  const Local_key* key, Uint32* dst, bool& copy)
2895 {
2896 
2897   FragrecordPtr fragPtr;
2898   fragPtr.i= fragPtrI;
2899   ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
2900   TablerecPtr tablePtr;
2901   tablePtr.i= fragPtr.p->fragTableId;
2902   ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
2903 
2904   Local_key tmp = *key;
2905 
2906 
2907   PagePtr page_ptr;
2908   int ret = alloc_page(tablePtr.p, fragPtr.p, &page_ptr, tmp.m_page_no);
2909   if (ret)
2910     return -1;
2911 
2912   KeyReqStruct req_struct;
2913   Uint32* ptr= ((Fix_page*)page_ptr.p)->get_ptr(key->m_page_idx, 0);
2914 
2915   req_struct.m_page_ptr = page_ptr;
2916   req_struct.m_tuple_ptr = (Tuple_header*)ptr;
2917   Uint32 bits = req_struct.m_tuple_ptr->m_header_bits;
2918 
2919   ret = 0;
2920   copy = false;
2921   if (! (bits & Tuple_header::FREE))
2922   {
2923     if (bits & Tuple_header::ALLOC)
2924     {
2925       Uint32 opPtrI= req_struct.m_tuple_ptr->m_operation_ptr_i;
2926       Operationrec* opPtrP= c_operation_pool.getPtr(opPtrI);
2927       ndbassert(!opPtrP->m_copy_tuple_location.isNull());
2928       req_struct.m_tuple_ptr= (Tuple_header*)
2929 	c_undo_buffer.get_ptr(&opPtrP->m_copy_tuple_location);
2930       copy = true;
2931     }
2932     req_struct.check_offset[MM]= tablePtr.p->get_check_offset(MM);
2933     req_struct.check_offset[DD]= tablePtr.p->get_check_offset(DD);
2934 
2935     Uint32 num_attr= tablePtr.p->m_no_of_attributes;
2936     Uint32 descr_start= tablePtr.p->tabDescriptor;
2937     TableDescriptor *tab_descr= &tableDescriptor[descr_start];
2938     ndbrequire(descr_start + (num_attr << ZAD_LOG_SIZE) <= cnoOfTabDescrRec);
2939     req_struct.attr_descr= tab_descr;
2940 
2941     if (tablePtr.p->need_expand())
2942       prepare_read(&req_struct, tablePtr.p, false);
2943 
2944     const Uint32* attrIds= &tableDescriptor[tablePtr.p->readKeyArray].tabDescr;
2945     const Uint32 numAttrs= tablePtr.p->noOfKeyAttr;
2946     // read pk attributes from original tuple
2947 
2948     // new globals
2949     tabptr= tablePtr;
2950     fragptr= fragPtr;
2951     operPtr.i= RNIL;
2952     operPtr.p= NULL;
2953 
2954     // do it
2955     ret = readAttributes(&req_struct,
2956 			 attrIds,
2957 			 numAttrs,
2958 			 dst,
2959 			 ZNIL, false);
2960 
2961     // done
2962     if (likely(ret != -1)) {
2963       // remove headers
2964       Uint32 n= 0;
2965       Uint32 i= 0;
2966       while (n < numAttrs) {
2967 	const AttributeHeader ah(dst[i]);
2968 	Uint32 size= ah.getDataSize();
2969 	ndbrequire(size != 0);
2970 	for (Uint32 j= 0; j < size; j++) {
2971 	  dst[i + j - n]= dst[i + j + 1];
2972 	}
2973 	n+= 1;
2974 	i+= 1 + size;
2975       }
2976       ndbrequire((int)i == ret);
2977       ret -= numAttrs;
2978     } else {
2979       return terrorCode ? (-(int)terrorCode) : -1;
2980     }
2981   }
2982 
2983   if (tablePtr.p->m_bits & Tablerec::TR_RowGCI)
2984   {
2985     dst[ret] = *req_struct.m_tuple_ptr->get_mm_gci(tablePtr.p);
2986   }
2987   else
2988   {
2989     dst[ret] = 0;
2990   }
2991   return ret;
2992 }
2993 
2994 #include <signaldata/TuxMaint.hpp>
2995 
2996 int
2997 Dbtup::nr_delete(Signal* signal, Uint32 senderData,
2998 		 Uint32 fragPtrI, const Local_key* key, Uint32 gci)
2999 {
3000   FragrecordPtr fragPtr;
3001   fragPtr.i= fragPtrI;
3002   ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
3003   TablerecPtr tablePtr;
3004   tablePtr.i= fragPtr.p->fragTableId;
3005   ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
3006 
3007   Local_key tmp = * key;
3008   tmp.m_page_no= getRealpid(fragPtr.p, tmp.m_page_no);
3009 
3010   PagePtr pagePtr;
3011   Tuple_header* ptr= (Tuple_header*)get_ptr(&pagePtr, &tmp, tablePtr.p);
3012 
3013   if (!tablePtr.p->tuxCustomTriggers.isEmpty())
3014   {
3015     jam();
3016     TuxMaintReq* req = (TuxMaintReq*)signal->getDataPtrSend();
3017     req->tableId = fragPtr.p->fragTableId;
3018     req->fragId = fragPtr.p->fragmentId;
3019     req->pageId = tmp.m_page_no;
3020     req->pageIndex = tmp.m_page_idx;
3021     req->tupVersion = ptr->get_tuple_version();
3022     req->opInfo = TuxMaintReq::OpRemove;
3023     removeTuxEntries(signal, tablePtr.p);
3024   }
3025 
3026   Local_key disk;
3027   memcpy(&disk, ptr->get_disk_ref_ptr(tablePtr.p), sizeof(disk));
3028 
3029   if (tablePtr.p->m_attributes[MM].m_no_of_varsize)
3030   {
3031     jam();
3032     free_var_rec(fragPtr.p, tablePtr.p, &tmp, pagePtr);
3033   } else {
3034     jam();
3035     free_fix_rec(fragPtr.p, tablePtr.p, &tmp, (Fix_page*)pagePtr.p);
3036   }
3037 
3038   if (tablePtr.p->m_no_of_disk_attributes)
3039   {
3040     jam();
3041 
3042     Uint32 sz = (sizeof(Dbtup::Disk_undo::Free) >> 2) +
3043       tablePtr.p->m_offsets[DD].m_fix_header_size - 1;
3044 
3045     int res = c_lgman->alloc_log_space(fragPtr.p->m_logfile_group_id, sz);
3046     ndbrequire(res == 0);
3047 
3048     /**
3049      * 1) alloc log buffer
3050      * 2) get page
3051      * 3) get log buffer
3052      * 4) delete tuple
3053      */
3054     Page_cache_client::Request preq;
3055     preq.m_page = disk;
3056     preq.m_callback.m_callbackData = senderData;
3057     preq.m_callback.m_callbackFunction =
3058       safe_cast(&Dbtup::nr_delete_page_callback);
3059     int flags = Page_cache_client::COMMIT_REQ;
3060 
3061 #ifdef ERROR_INSERT
3062     if (ERROR_INSERTED(4023) || ERROR_INSERTED(4024))
3063     {
3064       int rnd = rand() % 100;
3065       int slp = 0;
3066       if (ERROR_INSERTED(4024))
3067       {
3068 	slp = 3000;
3069       }
3070       else if (rnd > 90)
3071       {
3072 	slp = 3000;
3073       }
3074       else if (rnd > 70)
3075       {
3076 	slp = 100;
3077       }
3078 
3079       ndbout_c("rnd: %d slp: %d", rnd, slp);
3080 
3081       if (slp)
3082       {
3083 	flags |= Page_cache_client::DELAY_REQ;
3084 	preq.m_delay_until_time = NdbTick_CurrentMillisecond()+(Uint64)slp;
3085       }
3086     }
3087 #endif
3088 
3089     res = m_pgman.get_page(signal, preq, flags);
3090     if (res == 0)
3091     {
3092       goto timeslice;
3093     }
3094     else if (unlikely(res == -1))
3095     {
3096       return -1;
3097     }
3098 
3099     PagePtr disk_page = *(PagePtr*)&m_pgman.m_ptr;
3100     disk_page_set_dirty(disk_page);
3101 
3102     preq.m_callback.m_callbackFunction =
3103       safe_cast(&Dbtup::nr_delete_log_buffer_callback);
3104     Logfile_client lgman(this, c_lgman, fragPtr.p->m_logfile_group_id);
3105     res= lgman.get_log_buffer(signal, sz, &preq.m_callback);
3106     switch(res){
3107     case 0:
3108       signal->theData[2] = disk_page.i;
3109       goto timeslice;
3110     case -1:
3111       ndbrequire("NOT YET IMPLEMENTED" == 0);
3112       break;
3113     }
3114 
3115     if (0) ndbout << "DIRECT DISK DELETE: " << disk << endl;
3116     disk_page_free(signal, tablePtr.p, fragPtr.p,
3117 		   &disk, *(PagePtr*)&disk_page, gci);
3118     return 0;
3119   }
3120 
3121   return 0;
3122 
3123 timeslice:
3124   memcpy(signal->theData, &disk, sizeof(disk));
3125   return 1;
3126 }
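/* ---------------------------------------------------------------- */
// When get_page() or get_log_buffer() above cannot complete
// immediately the function returns 1 with the disk ref copied into
// signal->theData (the "timeslice" path); the delete is then finished
// asynchronously by nr_delete_page_callback() and/or
// nr_delete_log_buffer_callback() below.
/* ---------------------------------------------------------------- */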
3127 
3128 void
3129 Dbtup::nr_delete_page_callback(Signal* signal,
3130 			       Uint32 userpointer, Uint32 page_id)
3131 {
3132   Ptr<GlobalPage> gpage;
3133   m_global_page_pool.getPtr(gpage, page_id);
3134   PagePtr pagePtr= *(PagePtr*)&gpage;
3135   disk_page_set_dirty(pagePtr);
3136   Dblqh::Nr_op_info op;
3137   op.m_ptr_i = userpointer;
3138   op.m_disk_ref.m_page_no = pagePtr.p->m_page_no;
3139   op.m_disk_ref.m_file_no = pagePtr.p->m_file_no;
3140   c_lqh->get_nr_op_info(&op, page_id);
3141 
3142   Ptr<Fragrecord> fragPtr;
3143   fragPtr.i= op.m_tup_frag_ptr_i;
3144   ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
3145 
3146   Ptr<Tablerec> tablePtr;
3147   tablePtr.i = fragPtr.p->fragTableId;
3148   ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
3149 
3150   Uint32 sz = (sizeof(Dbtup::Disk_undo::Free) >> 2) +
3151     tablePtr.p->m_offsets[DD].m_fix_header_size - 1;
3152 
3153   Callback cb;
3154   cb.m_callbackData = userpointer;
3155   cb.m_callbackFunction =
3156     safe_cast(&Dbtup::nr_delete_log_buffer_callback);
3157   Logfile_client lgman(this, c_lgman, fragPtr.p->m_logfile_group_id);
3158   int res= lgman.get_log_buffer(signal, sz, &cb);
3159   switch(res){
3160   case 0:
3161     return;
3162   case -1:
3163     ndbrequire("NOT YET IMPLEMENTED" == 0);
3164     break;
3165   }
3166 
3167   if (0) ndbout << "PAGE CALLBACK DISK DELETE: " << op.m_disk_ref << endl;
3168   disk_page_free(signal, tablePtr.p, fragPtr.p,
3169 		 &op.m_disk_ref, pagePtr, op.m_gci);
3170 
3171   c_lqh->nr_delete_complete(signal, &op);
3172   return;
3173 }
3174 
3175 void
3176 Dbtup::nr_delete_log_buffer_callback(Signal* signal,
3177 				    Uint32 userpointer,
3178 				    Uint32 unused)
3179 {
3180   Dblqh::Nr_op_info op;
3181   op.m_ptr_i = userpointer;
3182   c_lqh->get_nr_op_info(&op, RNIL);
3183 
3184   Ptr<Fragrecord> fragPtr;
3185   fragPtr.i= op.m_tup_frag_ptr_i;
3186   ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
3187 
3188   Ptr<Tablerec> tablePtr;
3189   tablePtr.i = fragPtr.p->fragTableId;
3190   ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
3191 
3192   Ptr<GlobalPage> gpage;
3193   m_global_page_pool.getPtr(gpage, op.m_page_id);
3194   PagePtr pagePtr= *(PagePtr*)&gpage;
3195 
3196   /**
3197    * reset page no
3198    */
3199   if (0) ndbout << "LOGBUFFER CALLBACK DISK DELETE: " << op.m_disk_ref << endl;
3200 
3201   disk_page_free(signal, tablePtr.p, fragPtr.p,
3202 		 &op.m_disk_ref, pagePtr, op.m_gci);
3203 
3204   c_lqh->nr_delete_complete(signal, &op);
3205 }
3206