1 /*
2    Copyright (c) 2003, 2021, Oracle and/or its affiliates.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 
26 #define DBTUP_C
27 #include <dblqh/Dblqh.hpp>
28 #include "Dbtup.hpp"
29 #include <RefConvert.hpp>
30 #include <ndb_limits.h>
31 #include <pc.hpp>
32 #include <AttributeDescriptor.hpp>
33 #include "AttributeOffset.hpp"
34 #include <AttributeHeader.hpp>
35 #include <Interpreter.hpp>
36 #include <signaldata/TupKey.hpp>
37 #include <signaldata/AttrInfo.hpp>
38 #include <signaldata/TuxMaint.hpp>
39 #include <signaldata/ScanFrag.hpp>
40 #include <NdbSqlUtil.hpp>
41 #include <Checksum.hpp>
42 #include <portlib/ndb_prefetch.h>
43 
44 #define JAM_FILE_ID 422
45 
46 
47 // #define TRACE_INTERPRETER
48 
49 /* For debugging */
50 static void
dump_hex(const Uint32 * p,Uint32 len)51 dump_hex(const Uint32 *p, Uint32 len)
52 {
53   if(len > 2560)
54     len= 160;
55   if(len==0)
56     return;
57   for(;;)
58   {
59     if(len>=4)
60       ndbout_c("%8p %08X %08X %08X %08X", p, p[0], p[1], p[2], p[3]);
61     else if(len>=3)
62       ndbout_c("%8p %08X %08X %08X", p, p[0], p[1], p[2]);
63     else if(len>=2)
64       ndbout_c("%8p %08X %08X", p, p[0], p[1]);
65     else
66       ndbout_c("%8p %08X", p, p[0]);
67     if(len <= 4)
68       break;
69     len-= 4;
70     p+= 4;
71   }
72 }
73 
74 /**
75  * getStoredProcAttrInfo
76  *
77  * Get the I-Val of the supplied stored procedure's
78  * AttrInfo section
79  * Initialise the AttrInfo length in the request
80  */
getStoredProcAttrInfo(Uint32 storedId,KeyReqStruct * req_struct,Uint32 & attrInfoIVal)81 int Dbtup::getStoredProcAttrInfo(Uint32 storedId,
82                                  KeyReqStruct* req_struct,
83                                  Uint32& attrInfoIVal)
84 {
85   jam();
86   StoredProcPtr storedPtr;
87   c_storedProcPool.getPtr(storedPtr, storedId);
88   if (storedPtr.i != RNIL) {
89     if ((storedPtr.p->storedCode == ZSCAN_PROCEDURE) ||
90         (storedPtr.p->storedCode == ZCOPY_PROCEDURE)) {
91       /* Setup OperationRec with stored procedure AttrInfo section */
92       SegmentedSectionPtr sectionPtr;
93       getSection(sectionPtr, storedPtr.p->storedProcIVal);
94       Uint32 storedProcLen= sectionPtr.sz;
95 
96       ndbassert( attrInfoIVal == RNIL );
97       attrInfoIVal= storedPtr.p->storedProcIVal;
98       req_struct->attrinfo_len= storedProcLen;
99       return ZOK;
100     }
101   }
102   terrorCode= ZSTORED_PROC_ID_ERROR;
103   return terrorCode;
104 }
105 
copyAttrinfo(Operationrec * regOperPtr,Uint32 * inBuffer,Uint32 expectedLen,Uint32 attrInfoIVal)106 void Dbtup::copyAttrinfo(Operationrec * regOperPtr,
107                          Uint32* inBuffer,
108                          Uint32 expectedLen,
109                          Uint32 attrInfoIVal)
110 {
111   ndbassert( expectedLen > 0 || attrInfoIVal == RNIL );
112 
113   if (expectedLen > 0)
114   {
115     ndbassert( attrInfoIVal != RNIL );
116 
117     /* Check length in section is as we expect */
118     SegmentedSectionPtr sectionPtr;
119     getSection(sectionPtr, attrInfoIVal);
120 
121     ndbrequire(sectionPtr.sz == expectedLen);
122     ndbrequire(sectionPtr.sz < ZATTR_BUFFER_SIZE);
123 
124     /* Copy attrInfo data into linear buffer */
125     // TODO : Consider operating TUP out of first segment where
126     // appropriate
127     copy(inBuffer, attrInfoIVal);
128   }
129 
130   regOperPtr->m_any_value= 0;
131 
132   return;
133 }
134 
135 void
setInvalidChecksum(Tuple_header * tuple_ptr,const Tablerec * regTabPtr)136 Dbtup::setInvalidChecksum(Tuple_header *tuple_ptr,
137                           const Tablerec * regTabPtr)
138 {
139   if (regTabPtr->m_bits & Tablerec::TR_Checksum)
140   {
141     jam();
142     /**
143      * Set a magic checksum when tuple isn't supposed to be read.
144      */
145     tuple_ptr->m_checksum = 0x87654321;
146   }
147 }
148 
149 void
updateChecksum(Tuple_header * tuple_ptr,const Tablerec * regTabPtr,Uint32 old_header,Uint32 new_header)150 Dbtup::updateChecksum(Tuple_header *tuple_ptr,
151                       const Tablerec *regTabPtr,
152                       Uint32 old_header,
153                       Uint32 new_header)
154 {
155   /**
156    * This function is used when only updating the header bits in row.
157    * We start by XOR:ing the old header, this negates the impact of the
158    * old header since old_header ^ old_header = 0. Next we XOR with new
159    * header to get the new checksum and finally we store the new checksum.
160    */
161   if (regTabPtr->m_bits & Tablerec::TR_Checksum)
162   {
163     Uint32 checksum = tuple_ptr->m_checksum;
164     jam();
165     checksum ^= old_header;
166     checksum ^= new_header;
167     tuple_ptr->m_checksum = checksum;
168   }
169 }
170 
171 void
setChecksum(Tuple_header * tuple_ptr,const Tablerec * regTabPtr)172 Dbtup::setChecksum(Tuple_header* tuple_ptr,
173                    const Tablerec* regTabPtr)
174 {
175   if (regTabPtr->m_bits & Tablerec::TR_Checksum)
176   {
177     jam();
178     tuple_ptr->m_checksum= 0;
179     tuple_ptr->m_checksum= calculateChecksum(tuple_ptr, regTabPtr);
180   }
181 }
182 
Uint32
Dbtup::calculateChecksum(Tuple_header* tuple_ptr,
                         const Tablerec* regTabPtr)
{
  Uint32 checksum;
  Uint32 rec_size, *tuple_header;
  // Fixed-part size in words for the main-memory part of the row
  rec_size= regTabPtr->m_offsets[MM].m_fix_header_size;
  tuple_header= &tuple_ptr->m_header_bits;
  // includes tupVersion
  //printf("%p - ", tuple_ptr);

  /**
   * We include every word except the first word of the Tuple header
   * which is only used on copy tuples. We do however include
   * the header bits.
   */
  checksum = computeXorChecksum(
               tuple_header, (rec_size-Tuple_header::HeaderSize) + 1);

  //printf("-> %.8x\n", checksum);

#if 0
  if (var_sized) {
    /*
    if (! req_struct->fix_var_together) {
      jam();
      checksum ^= tuple_header[rec_size];
    }
    */
    jam();
    var_data_part= req_struct->var_data_start;
    vsize_words= calculate_total_var_size(req_struct->var_len_array,
                                          regTabPtr->no_var_attr);
    ndbassert(req_struct->var_data_end >= &var_data_part[vsize_words]);
    checksum = computeXorChecksum(var_data_part,vsize_words,checksum);
  }
#endif
  return checksum;
}
222 
223 int
corruptedTupleDetected(KeyReqStruct * req_struct,Tablerec * regTabPtr)224 Dbtup::corruptedTupleDetected(KeyReqStruct *req_struct, Tablerec *regTabPtr)
225 {
226   Uint32 checksum = calculateChecksum(req_struct->m_tuple_ptr, regTabPtr);
227   Uint32 header_bits = req_struct->m_tuple_ptr->m_header_bits;
228   ndbout_c("Tuple corruption detected, checksum: 0x%x, header_bits: 0x%x"
229            ", checksum word: 0x%x",
230            checksum, header_bits, req_struct->m_tuple_ptr->m_checksum);
231   if (c_crashOnCorruptedTuple && !ERROR_INSERTED(4036))
232   {
233     ndbout_c(" Exiting.");
234     ndbrequire(false);
235   }
236   (void)ERROR_INSERTED_CLEAR(4036);
237   terrorCode= ZTUPLE_CORRUPTED_ERROR;
238   tupkeyErrorLab(req_struct);
239   return -1;
240 }
241 
/* ----------------------------------------------------------------- */
/* -----------       INSERT_ACTIVE_OP_LIST            -------------- */
/* ----------------------------------------------------------------- */
/**
 * Link a new operation at the head of the tuple's list of active
 * (uncommitted) operations, and validate that the new operation type is
 * legal given the previous operation on the same tuple.
 *
 * Returns true if the operation was inserted and is allowed; returns
 * false (with terrorCode set) when the operation sequence is invalid.
 */
bool
Dbtup::insertActiveOpList(OperationrecPtr regOperPtr,
                          KeyReqStruct* req_struct)
{
  OperationrecPtr prevOpPtr;
  /* The operation must not already be linked into an active list */
  ndbrequire(!regOperPtr.p->op_struct.bit_field.in_active_list);
  regOperPtr.p->op_struct.bit_field.in_active_list= true;
  /* New operation becomes the head; previous head becomes its 'prev' */
  req_struct->prevOpPtr.i=
    prevOpPtr.i= req_struct->m_tuple_ptr->m_operation_ptr_i;
  regOperPtr.p->prevActiveOp= prevOpPtr.i;
  regOperPtr.p->nextActiveOp= RNIL;
  regOperPtr.p->m_undo_buffer_space= 0;
  req_struct->m_tuple_ptr->m_operation_ptr_i= regOperPtr.i;
  if (prevOpPtr.i == RNIL) {
    /* First operation on this tuple: nothing more to check */
    return true;
  } else {
    jam();
    req_struct->prevOpPtr.p= prevOpPtr.p= c_operation_pool.getPtr(prevOpPtr.i);
    prevOpPtr.p->nextActiveOp= regOperPtr.i;

    /*
     * Move disk-commit responsibilities from the previous operation to
     * this (newest) one: only the head of the list should wait for the
     * log buffer and load the disk page at commit time.
     */
    regOperPtr.p->op_struct.bit_field.m_wait_log_buffer=
      prevOpPtr.p->op_struct.bit_field.m_wait_log_buffer;
    regOperPtr.p->op_struct.bit_field.m_load_diskpage_on_commit=
      prevOpPtr.p->op_struct.bit_field.m_load_diskpage_on_commit;
    regOperPtr.p->op_struct.bit_field.m_gci_written=
      prevOpPtr.p->op_struct.bit_field.m_gci_written;
    regOperPtr.p->m_undo_buffer_space= prevOpPtr.p->m_undo_buffer_space;
    // start with prev mask (matters only for UPD o UPD)

    regOperPtr.p->m_any_value = prevOpPtr.p->m_any_value;

    prevOpPtr.p->op_struct.bit_field.m_wait_log_buffer= 0;
    prevOpPtr.p->op_struct.bit_field.m_load_diskpage_on_commit= 0;

    if(prevOpPtr.p->tuple_state == TUPLE_PREPARED)
    {
      Uint32 op= regOperPtr.p->op_type;
      Uint32 prevOp= prevOpPtr.p->op_type;
      if (prevOp == ZDELETE)
      {
        if(op == ZINSERT)
        {
          // mark both
          prevOpPtr.p->op_struct.bit_field.delete_insert_flag= true;
          regOperPtr.p->op_struct.bit_field.delete_insert_flag= true;
          return true;
        }
        else if (op == ZREFRESH)
        {
          /* ZREFRESH after Delete - ok */
          return true;
        }
        else
        {
          /* Any other op after a DELETE sees a non-existing tuple */
          terrorCode= ZTUPLE_DELETED_ERROR;
          return false;
        }
      }
      else if(op == ZINSERT && prevOp != ZDELETE)
      {
        /* INSERT on a tuple that already exists within the transaction */
        terrorCode= ZINSERT_ERROR;
        return false;
      }
      else if (prevOp == ZREFRESH)
      {
        /* No operation after a ZREFRESH */
        terrorCode= ZOP_AFTER_REFRESH_ERROR;
        return false;
      }
      return true;
    }
    else
    {
      /* Previous operation not in PREPARED state: must abort first */
      terrorCode= ZMUST_BE_ABORTED_ERROR;
      return false;
    }
  }
}
323 
/**
 * Decide which version of the tuple a read operation should see (the
 * committed row or an uncommitted copy tuple from the tuple's operation
 * list), applying savepoint and dirty-read semantics, and prepare the
 * row for reading (expansion) when needed.
 *
 * Returns true when the read can proceed; false (with terrorCode set)
 * when the tuple is not visible to this read.
 */
bool
Dbtup::setup_read(KeyReqStruct *req_struct,
                  Operationrec* regOperPtr,
                  Fragrecord* regFragPtr,
                  Tablerec* regTabPtr,
                  bool disk)
{
  OperationrecPtr currOpPtr;
  currOpPtr.i= req_struct->m_tuple_ptr->m_operation_ptr_i;
  Uint32 bits = req_struct->m_tuple_ptr->m_header_bits;

  /* Reorg scans only accept rows matching their requested move state */
  if (unlikely(req_struct->m_reorg != ScanFragReq::REORG_ALL))
  {
    Uint32 moved = bits & Tuple_header::REORG_MOVE;
    if (! ((req_struct->m_reorg == ScanFragReq::REORG_NOT_MOVED && moved == 0) ||
           (req_struct->m_reorg == ScanFragReq::REORG_MOVED && moved != 0)))
    {
      terrorCode= ZTUPLE_DELETED_ERROR;
      return false;
    }
  }
  /* No uncommitted operations on the tuple: read the committed row */
  if (currOpPtr.i == RNIL)
  {
    if (regTabPtr->need_expand(disk))
      prepare_read(req_struct, regTabPtr, disk);
    return true;
  }

  do {
    Uint32 savepointId= regOperPtr->savepointId;
    bool dirty= req_struct->dirty_op;

    c_operation_pool.getPtr(currOpPtr);
    bool sameTrans= c_lqh->is_same_trans(currOpPtr.p->userpointer,
                                         req_struct->trans_id1,
                                         req_struct->trans_id2);
    /**
     * Read committed in same trans reads latest copy
     */
    if(dirty && !sameTrans)
    {
      savepointId= 0;
    }
    else if(sameTrans)
    {
      // Use savepoint even in read committed mode
      dirty= false;
    }

    /* found == true indicates that savepoint is some state
     * within tuple's current transaction's uncommitted operations
     */
    bool found= find_savepoint(currOpPtr, savepointId);

    Uint32 currOp= currOpPtr.p->op_type;

    /* is_insert==true if tuple did not exist before its current
     * transaction
     */
    bool is_insert = (bits & Tuple_header::ALLOC);

    /* If savepoint is in transaction, and post-delete-op
     *   OR
     * Tuple didn't exist before
     *      AND
     *   Read is dirty
     *           OR
     *   Savepoint is before-transaction
     *
     * Tuple does not exist in read's view
     */
    if((found && currOp == ZDELETE) ||
       ((dirty || !found) && is_insert))
    {
      /* Tuple not visible to this read operation */
      terrorCode= ZTUPLE_DELETED_ERROR;
      break;
    }

    if(dirty || !found)
    {
      /* Read existing committed tuple */
    }
    else
    {
      /* Read the uncommitted copy tuple of the found operation */
      req_struct->m_tuple_ptr=
        get_copy_tuple(&currOpPtr.p->m_copy_tuple_location);
    }

    if (regTabPtr->need_expand(disk))
      prepare_read(req_struct, regTabPtr, disk);

#if 0
    ndbout_c("reading copy");
    Uint32 *var_ptr = fixed_ptr+regTabPtr->var_offset;
    req_struct->m_tuple_ptr= fixed_ptr;
    req_struct->fix_var_together= true;
    req_struct->var_len_array= (Uint16*)var_ptr;
    req_struct->var_data_start= var_ptr+regTabPtr->var_array_wsize;
    Uint32 var_sz32= init_var_pos_array((Uint16*)var_ptr,
                                        req_struct->var_pos_array,
                                        regTabPtr->no_var_attr);
    req_struct->var_data_end= var_ptr+regTabPtr->var_array_wsize + var_sz32;
#endif
    return true;
  } while(0);

  return false;
}
433 
/**
 * Ensure the disk part of the tuple addressed by (lkey1, lkey2) is
 * available in the page cache before the operation executes.
 *
 * Returns the result of Page_cache_client::get_page (1 means 'proceed
 * immediately'; other values indicate the caller must wait for the
 * disk_page_load_callback).
 */
int
Dbtup::load_diskpage(Signal* signal,
                     Uint32 opRec, Uint32 fragPtrI,
                     Uint32 lkey1, Uint32 lkey2, Uint32 flags)
{
  Ptr<Operationrec> operPtr;

  c_operation_pool.getPtr(operPtr, opRec);

  Operationrec *  regOperPtr= operPtr.p;
  Fragrecord * regFragPtr= prepare_fragptr.p;
  Tablerec* regTabPtr = prepare_tabptr.p;

  /* An all-ones local key means the row does not exist yet (insert) */
  if (Local_key::ref(lkey1, lkey2) == ~(Uint32)0)
  {
    jam();
    regOperPtr->op_struct.bit_field.m_wait_log_buffer= 1;
    regOperPtr->op_struct.bit_field.m_load_diskpage_on_commit= 1;
    if (unlikely((flags & 7) == ZREFRESH))
    {
      jam();
      /* Refresh of previously nonexistant DD tuple.
       * No diskpage to load at commit time
       */
      regOperPtr->op_struct.bit_field.m_wait_log_buffer= 0;
      regOperPtr->op_struct.bit_field.m_load_diskpage_on_commit= 0;
    }

    /* In either case return 1 for 'proceed' */
    return 1;
  }

  jam();
  Uint32 page_idx= lkey2;
  Uint32 frag_page_id= lkey1;
  regOperPtr->m_tuple_location.m_page_no= getRealpid(regFragPtr,
                                                     frag_page_id);
  regOperPtr->m_tuple_location.m_page_idx= page_idx;

  PagePtr page_ptr;
  Uint32* tmp= get_ptr(&page_ptr, &regOperPtr->m_tuple_location, regTabPtr);
  Tuple_header* ptr= (Tuple_header*)tmp;

  int res= 1;
  /* Only request the disk page when the tuple actually has a disk part */
  if(ptr->m_header_bits & Tuple_header::DISK_PART)
  {
    jam();
    Page_cache_client::Request req;
    memcpy(&req.m_page, ptr->get_disk_ref_ptr(regTabPtr), sizeof(Local_key));
    req.m_callback.m_callbackData= opRec;
    req.m_callback.m_callbackFunction=
      safe_cast(&Dbtup::disk_page_load_callback);

#ifdef ERROR_INSERT
    if (ERROR_INSERTED(4022))
    {
      flags |= Page_cache_client::DELAY_REQ;
      const NDB_TICKS now = NdbTick_getCurrentTicks();
      req.m_delay_until_time = NdbTick_AddMilliseconds(now,(Uint64)3000);
    }
    if (ERROR_INSERTED(4035) && (rand() % 13) == 0)
    {
      // Disk access have to randomly wait max 16ms for a diskpage
      Uint64 delay = (Uint64)(rand() % 16) + 1;
      flags |= Page_cache_client::DELAY_REQ;
      const NDB_TICKS now = NdbTick_getCurrentTicks();
      req.m_delay_until_time = NdbTick_AddMilliseconds(now,delay);
    }
#endif

    Page_cache_client pgman(this, c_pgman);
    res= pgman.get_page(signal, req, flags);
  }

  /* Writing operations must also sync the log and disk page at commit */
  switch(flags & 7)
  {
  case ZREAD:
  case ZREAD_EX:
    break;
  case ZDELETE:
  case ZUPDATE:
  case ZINSERT:
  case ZWRITE:
  case ZREFRESH:
    jam();
    regOperPtr->op_struct.bit_field.m_wait_log_buffer= 1;
    regOperPtr->op_struct.bit_field.m_load_diskpage_on_commit= 1;
  }
  return res;
}
524 
525 void
disk_page_load_callback(Signal * signal,Uint32 opRec,Uint32 page_id)526 Dbtup::disk_page_load_callback(Signal* signal, Uint32 opRec, Uint32 page_id)
527 {
528   Ptr<Operationrec> operPtr;
529   c_operation_pool.getPtr(operPtr, opRec);
530   c_lqh->acckeyconf_load_diskpage_callback(signal,
531 					   operPtr.p->userpointer, page_id);
532 }
533 
/**
 * Scan variant of load_diskpage: ensure the disk part of the scanned
 * tuple is available in the page cache. Unlike load_diskpage, scans
 * never set commit-time disk flags.
 *
 * Returns the result of Page_cache_client::get_page (1 means 'proceed
 * immediately'; otherwise wait for disk_page_load_scan_callback).
 */
int
Dbtup::load_diskpage_scan(Signal* signal,
                          Uint32 opRec, Uint32 fragPtrI,
                          Uint32 lkey1, Uint32 lkey2, Uint32 flags)
{
  Ptr<Operationrec> operPtr;

  c_operation_pool.getPtr(operPtr, opRec);

  Operationrec *  regOperPtr= operPtr.p;
  Fragrecord * regFragPtr= prepare_fragptr.p;
  Tablerec* regTabPtr = prepare_tabptr.p;

  jam();
  Uint32 page_idx= lkey2;
  Uint32 frag_page_id= lkey1;
  regOperPtr->m_tuple_location.m_page_no= getRealpid(regFragPtr,
                                                     frag_page_id);
  regOperPtr->m_tuple_location.m_page_idx= page_idx;
  regOperPtr->op_struct.bit_field.m_load_diskpage_on_commit= 0;

  PagePtr page_ptr;
  Uint32* tmp= get_ptr(&page_ptr, &regOperPtr->m_tuple_location, regTabPtr);
  Tuple_header* ptr= (Tuple_header*)tmp;

  int res= 1;
  /* Only request the disk page when the tuple actually has a disk part */
  if(ptr->m_header_bits & Tuple_header::DISK_PART)
  {
    jam();
    Page_cache_client::Request req;
    memcpy(&req.m_page, ptr->get_disk_ref_ptr(regTabPtr), sizeof(Local_key));
    req.m_callback.m_callbackData= opRec;
    req.m_callback.m_callbackFunction=
      safe_cast(&Dbtup::disk_page_load_scan_callback);

    Page_cache_client pgman(this, c_pgman);
    res= pgman.get_page(signal, req, flags);
  }
  return res;
}
574 
575 void
disk_page_load_scan_callback(Signal * signal,Uint32 opRec,Uint32 page_id)576 Dbtup::disk_page_load_scan_callback(Signal* signal,
577 				    Uint32 opRec, Uint32 page_id)
578 {
579   Ptr<Operationrec> operPtr;
580   c_operation_pool.getPtr(operPtr, opRec);
581   c_lqh->next_scanconf_load_diskpage_callback(signal,
582 					      operPtr.p->userpointer, page_id);
583 }
584 
585 /**
586   This method is used to prepare for faster execution of TUPKEYREQ.
587   It prepares the pointers to the fragment record, the table record,
588   the page for the record and the tuple pointer to the record. In
589   addition it also prefetches the cache lines of the fixed size part
590   of the tuple.
591 
592   The calculations performed here have to be done when we arrive in
593   execTUPKEYREQ, we perform them here to enable prefetching the
594   cache lines of the fixed part of the tuple storage. In order to not
595   do the same work twice we store the calculated information in
596   block variables. Given that we can arrive in execTUPKEYREQ from
597   multiple directions, we have added debug-code that verifies that we
598   have passed through prepareTUPKEYREQ always before we reach
599   execTUPKEYREQ.
600 
601   The access of the fixed size part of the tuple is an almost certain
602   CPU cache miss and so performing this as early as possible will
603   decrease the time for cache misses later in the process. Tests using
604   Sysbench indicates that this prefetch gains about 5% in performance.
605 */
prepareTUPKEYREQ(Uint32 page_id,Uint32 page_idx,Uint32 frag_id)606 void Dbtup::prepareTUPKEYREQ(Uint32 page_id,
607                              Uint32 page_idx,
608                              Uint32 frag_id)
609 {
610   FragrecordPtr fragptr;
611   TablerecPtr tabptr;
612   PagePtr pagePtr;
613   Local_key key;
614 
615   fragptr.i = frag_id;
616   const Uint32 RnoOfFragrec= cnoOfFragrec;
617   const Uint32 RnoOfTablerec= cnoOfTablerec;
618   Fragrecord * Rfragrecord = fragrecord;
619   Tablerec * Rtablerec = tablerec;
620 
621   jamEntry();
622   ndbrequire(fragptr.i < RnoOfFragrec);
623   ptrAss(fragptr, Rfragrecord);
624   tabptr.i = fragptr.p->fragTableId;
625 #ifdef VM_TRACE
626   prepare_orig_local_key.m_page_no = page_id;
627   prepare_orig_local_key.m_page_idx = page_idx;
628 #endif
629   bool is_page_key = (!(Local_key::isInvalid(page_id, page_idx) ||
630                         isCopyTuple(page_id, page_idx)));
631   ptrCheckGuard(tabptr, RnoOfTablerec, Rtablerec);
632 
633   if (is_page_key)
634   {
635     register Uint32 fixed_part_size_in_words =
636       tabptr.p->m_offsets[MM].m_fix_header_size;
637     page_id = getRealpid(fragptr.p, page_id);
638     key.m_page_no = page_id;
639     key.m_page_idx = page_idx;
640     register Uint32 *tuple_ptr = get_ptr(&pagePtr,
641                                          &key,
642                                          tabptr.p);
643     jam();
644     prepare_pageptr = pagePtr;
645     prepare_tuple_ptr = tuple_ptr;
646     prepare_page_no = page_id;
647     for (Uint32 i = 0; i < fixed_part_size_in_words; i+= 16)
648     {
649       NDB_PREFETCH_WRITE(tuple_ptr + i);
650     }
651   }
652   prepare_tabptr = tabptr;
653   prepare_fragptr = fragptr;
654 }
655 
execTUPKEYREQ(Signal * signal)656 bool Dbtup::execTUPKEYREQ(Signal* signal)
657 {
658    TupKeyReq * tupKeyReq= (TupKeyReq *)signal->getDataPtr();
659    Ptr<Operationrec> operPtr;
660    KeyReqStruct req_struct(this);
661 
662    Uint32 RoperPtr= tupKeyReq->connectPtr;
663 
664    jamEntry();
665 
666    c_operation_pool.getPtr(operPtr, RoperPtr);
667 
668 #ifdef VM_TRACE
669    {
670      bool error_found = false;
671      Local_key key;
672      key.m_page_no = tupKeyReq->keyRef1;
673      key.m_page_idx = tupKeyReq->keyRef2;
674      if (key.m_page_no != prepare_orig_local_key.m_page_no)
675      {
676        ndbout << "page_no = " << prepare_orig_local_key.m_page_no;
677        ndbout << " keyRef1 = " << key.m_page_no << endl;
678        error_found = true;
679      }
680      if (key.m_page_idx != prepare_orig_local_key.m_page_idx)
681      {
682        ndbout << "page_idx = " << prepare_orig_local_key.m_page_idx;
683        ndbout << " keyRef2 = " << key.m_page_idx << endl;
684        error_found = true;
685      }
686      if (prepare_fragptr.i != tupKeyReq->fragPtr)
687      {
688        ndbout << "fragptr.i = " << prepare_fragptr.i;
689        ndbout << " keyRef1 = " << tupKeyReq->fragPtr << endl;
690        error_found = true;
691      }
692      if (error_found)
693      {
694        ndbout << flush;
695      }
696      ndbassert(prepare_orig_local_key.m_page_no == key.m_page_no);
697      ndbassert(prepare_orig_local_key.m_page_idx == key.m_page_idx);
698      ndbassert(prepare_fragptr.i == tupKeyReq->fragPtr);
699    }
700 #endif
701 
702    const Uint32 fragPtrI = prepare_fragptr.i;
703    /**
704     * DESIGN PATTERN DESCRIPTION
705     * --------------------------
706     * The variable operPtr.p is located on the block object, it is located
707     * there to ensure that we can easily access it in many methods such
708     * that we don't have to transport it through method calls. There are
709     * a number of references to structs that we store in this manner.
710     * Oftentimes they refer to the operation object, the table object,
711     * the fragment object and sometimes also a transaction object.
712     *
713     * Given that we both need access to the .i-value and the .p-value
714     * of all of those objects we store them on the block object to
715     * avoid the need of transporting them from function to function.
716     * This is an optimisation and obviously requires that one keeps
717     * track of which variables are alive and which are not.
718     * The function clear_global_variables used in debug mode ensures
719     * that all pointer variables are cleared before an asynchronous
720     * signal is executed.
721     *
722     * When we need to access data through the .p-value many times
723     * (more than one time), then it often pays off to declare a
724     * stack variable such as below regOperPtr. This helps the compiler
725     * to avoid having to constantly reload the .p-value from the
726     * block object after each store operation through a pointer.
727     *
728     * One has to take care though when doing this to ensure that
729     * one doesn't create a stack variable that creates too much
730     * pressure on the register allocation in the method. This is
731     * particularly important in large methods.
732     *
733     * The pattern is to define the variable as:
734     * Operationrec * const regOperPtr = operPtr.p;
735     * This helps the compiler to understand that we won't change the
736     * pointer here.
737     */
738    Operationrec * const regOperPtr= operPtr.p;
739 
740    Dbtup::TransState trans_state = get_trans_state(regOperPtr);
741 
742    req_struct.signal= signal;
743    req_struct.num_fired_triggers= 0;
744    req_struct.no_exec_instructions = 0;
745    req_struct.read_length= 0;
746    req_struct.last_row= false;
747    req_struct.changeMask.clear();
748    req_struct.m_is_lcp = false;
749    req_struct.operPtrP = regOperPtr;
750    regOperPtr->fragmentPtr= fragPtrI;
751 
752    if (unlikely(trans_state != TRANS_IDLE))
753    {
754      TUPKEY_abort(&req_struct, 39);
755      return false;
756    }
757 
758  /* ----------------------------------------------------------------- */
759  // Operation is ZREAD when we arrive here so no need to worry about the
760  // abort process.
761  /* ----------------------------------------------------------------- */
762  /* -----------    INITIATE THE OPERATION RECORD       -------------- */
763  /* ----------------------------------------------------------------- */
764    {
765      register Operationrec::OpStruct op_struct;
766      op_struct.op_bit_fields = regOperPtr->op_struct.op_bit_fields;
767      const Uint32 TrequestInfo= tupKeyReq->request;
768      const Uint32 disable_fk_checks = tupKeyReq->disable_fk_checks;
769      const Uint32 primaryReplica = tupKeyReq->primaryReplica;
770 
771      regOperPtr->m_copy_tuple_location.setNull();
772      op_struct.bit_field.delete_insert_flag = false;
773      op_struct.bit_field.tupVersion= ZNIL;
774      op_struct.bit_field.m_physical_only_op = 0;
775      op_struct.bit_field.m_gci_written = 0;
776      op_struct.bit_field.m_disable_fk_checks = disable_fk_checks;
777      op_struct.bit_field.primary_replica= primaryReplica;
778      op_struct.bit_field.m_reorg = TupKeyReq::getReorgFlag(TrequestInfo);
779      req_struct.m_prio_a_flag = TupKeyReq::getPrioAFlag(TrequestInfo);
780      req_struct.m_reorg = TupKeyReq::getReorgFlag(TrequestInfo);
781      regOperPtr->op_struct.op_bit_fields = op_struct.op_bit_fields;
782      regOperPtr->op_type= TupKeyReq::getOperation(TrequestInfo);
783      req_struct.m_disable_fk_checks = disable_fk_checks;
784      req_struct.m_use_rowid = TupKeyReq::getRowidFlag(TrequestInfo);
785      req_struct.interpreted_exec= TupKeyReq::getInterpretedFlag(TrequestInfo);
786      req_struct.dirty_op= TupKeyReq::getDirtyFlag(TrequestInfo);
787    }
788 
789    {
790      /**
791       * DESIGN PATTERN DESCRIPTION
792       * --------------------------
793       * This code segment is using a common design pattern in the
794       * signal reception and signal sending code of performance
795       * critical functions such as execTUPKEYREQ.
796       * The idea is that at signal reception we need to transfer
797       * data from the signal object to state variables relating to
798       * the operation we are about to execute.
799       * The normal manner to do this would be to write:
800       * regOperPtr->savePointId = tupKeyReq->savePointId;
801       *
802       * This normal manner would however not work so well due to
803       * that the compiler has to issue assembler code that does
804       * a load operation immediately followed by a store operation.
805       * Many modern CPUs can hide parts of this deficiency in the
806       * code, but only to a certain extent.
807       *
808       * What we want to do here is instead to perform a series of
809       * six loads followed by six stores. The delay after a load
810       * is ready for a store operation is oftentimes 3 cycles. Many
811       * CPUs can handle two loads per cycle. So by using 6 loads
812       * we ensure that we execute at full speed as long as the data
813       * is available in the first level CPU cache.
814       *
815       * The reason we don't want to use more than 6 loads before
816       * we start storing is that CPUs have a limited amount of
817       * CPU registers. The x86 have 16 CPU registers available.
818       * Here is a short description of commonly used registers:
819       * RIP: Instruction pointer, not available
820       * RSP: Top of Stack pointer, not available for L/S
821       * RBP: Current Stack frame pointer, not available for L/S
822       * RDI: Usually this-pointer, reference to Dbtup object here
823       *
824       * In this particular example we also need to have a register
825       * for storing:
826       * tupKeyReq, req_struct, regOperPtr.
827       *
828       * The compiler also needs a few more registers to track some
829       * of the other live variables such that not all of the live
830       * variables have to be spilled to the stack.
831       *
832       * Thus the design pattern uses between 4 to 6 variables loaded
833       * before storing them. Another commonly used manner is to locate
834       * all initialisations to constants in one or more of those
835       * initialisation code blocks as well.
836       *
837       * The naming pattern is to define the temporary variable as
838       * const Uint32 name_of_variable_to_assign = x->name;
839       * y->name_of_variable_to_assign = name_of_variable_to_assign.
840       *
841       * In the case where the receiver of the data is a signal object
842       * we use the pattern:
843       * const Uint32 sig0 = x->name;
844       * signal->theData[0] = sig0;
845       *
846       * Finally if possible we should place this initialisation in a
847       * separate code block by surrounding it with brackets, this is
848       * to assist the compiler to understand that the variables used
849       * are not needed after storing its value. Most compilers will
850       * handle this well anyways, but it helps the compiler avoid
851       * doing mistakes and it also clarifies for the reader of the
852       * source code. As can be seen in code below this rule is
853       * however not followed if it will remove other possibilities.
854       */
855      const Uint32 savePointId= tupKeyReq->savePointId;
856      const Uint32 attrBufLen = tupKeyReq->attrBufLen;
857      const Uint32 opRef = tupKeyReq->opRef;
858      const Uint32 tcOpIndex = tupKeyReq->tcOpIndex;
859      const Uint32 coordinatorTC= tupKeyReq->coordinatorTC;
860      const Uint32 applRef = tupKeyReq->applRef;
861 
862      regOperPtr->savepointId= savePointId;
863      req_struct.log_size= attrBufLen;
864      req_struct.attrinfo_len= attrBufLen;
865      req_struct.tc_operation_ptr= opRef;
866      req_struct.TC_index= tcOpIndex;
867      req_struct.TC_ref= coordinatorTC;
868      req_struct.rec_blockref= applRef;
869    }
870 
871    const Uint32 disk_page= tupKeyReq->disk_page;
872    const Uint32 row_id_page_no = tupKeyReq->m_row_id_page_no;
873    const Uint32 row_id_page_idx = tupKeyReq->m_row_id_page_idx;
874    const Uint32 deferred_constraints = tupKeyReq->deferred_constraints;
875    const Uint32 keyRef1= tupKeyReq->keyRef1;
876    const Uint32 keyRef2 = tupKeyReq->keyRef2;
877 
878    req_struct.m_disk_page_ptr.i= disk_page;
879    req_struct.m_row_id.m_page_no = row_id_page_no;
880    req_struct.m_row_id.m_page_idx = row_id_page_idx;
881    req_struct.m_deferred_constraints = deferred_constraints;
882    Uint32 pageid = req_struct.frag_page_id= keyRef1;
883    Uint32 pageidx = regOperPtr->m_tuple_location.m_page_idx= keyRef2;
884 
885    const Uint32 transId1 = tupKeyReq->transId1;
886    const Uint32 transId2 = tupKeyReq->transId2;
887    Tablerec * const regTabPtr = prepare_tabptr.p;
888    Fragrecord * const regFragPtr = prepare_fragptr.p;
889 
890    /* Get AttrInfo section if this is a long TUPKEYREQ */
891    Uint32 attrInfoIVal= tupKeyReq->attrInfoIVal;
892    const Uint32 Rstoredid= tupKeyReq->storedProcedure;
893 
894    req_struct.trans_id1= transId1;
895    req_struct.trans_id2= transId2;
896    req_struct.tablePtrP = regTabPtr;
897    req_struct.fragPtrP = regFragPtr;
898 
899    /* If we have AttrInfo, check we expected it, and
900     * that we don't have AttrInfo by another means
901     */
902    ndbassert( (attrInfoIVal == RNIL) ||
903               (tupKeyReq->attrBufLen > 0));
904 
905    const Uint32 Roptype = regOperPtr->op_type;
906 
907    if (Rstoredid != ZNIL) {
908      /* This is part of a scan, get attrInfoIVal for
909       * given stored procedure
910       */
911      ndbrequire(getStoredProcAttrInfo(Rstoredid,
912                                       &req_struct,
913                                       attrInfoIVal) == ZOK);
914    }
915 
916    /* Copy AttrInfo from section into linear in-buffer */
917    copyAttrinfo(regOperPtr,
918                 &cinBuffer[0],
919                 req_struct.attrinfo_len,
920                 attrInfoIVal);
921 
922 
923    const Uint32 loc_prepare_page_id = prepare_page_no;
924    if (Roptype == ZINSERT && Local_key::isInvalid(pageid, pageidx))
925    {
926      // No tuple allocated yet
927      goto do_insert;
928    }
929 
930    if (Roptype == ZREFRESH && Local_key::isInvalid(pageid, pageidx))
931    {
932      // No tuple allocated yet
933      goto do_refresh;
934    }
935 
936    if (unlikely(isCopyTuple(pageid, pageidx)))
937    {
938      /**
939       * Only LCP reads a copy-tuple "directly"
940       */
941      ndbassert(Roptype == ZREAD);
942      ndbassert(disk_page == RNIL);
943      setup_lcp_read_copy_tuple(&req_struct, regOperPtr, regFragPtr, regTabPtr);
944      goto do_read;
945    }
946 
947    /**
948     * Get pointer to tuple
949     */
950    regOperPtr->m_tuple_location.m_page_no = loc_prepare_page_id;
951    setup_fixed_tuple_ref_opt(&req_struct);
952    setup_fixed_part(&req_struct, regOperPtr, regTabPtr);
953 
954    /**
955     * Check operation
956     */
957    if (Roptype == ZREAD) {
958      jam();
959 
960      if (setup_read(&req_struct, regOperPtr, regFragPtr, regTabPtr,
961 		    disk_page != RNIL))
962      {
963    do_read:
964        if(handleReadReq(signal, regOperPtr, regTabPtr, &req_struct) != -1)
965        {
966 	 req_struct.log_size= 0;
967 	 /* ---------------------------------------------------------------- */
968 	 // Read Operations need not to be taken out of any lists.
969 	 // We also do not need to wait for commit since there is no changes
970 	 // to commit. Thus we
971 	 // prepare the operation record already now for the next operation.
972 	 // Write operations set the state to STARTED indicating that they
973 	 // are waiting for the Commit or Abort decision.
974 	 /* ---------------------------------------------------------------- */
975          returnTUPKEYCONF(signal, &req_struct, regOperPtr, TRANS_IDLE);
976          return true;
977        }
978        return false;
979      }
980      tupkeyErrorLab(&req_struct);
981      return false;
982    }
983 
984    if(insertActiveOpList(operPtr, &req_struct))
985    {
986      if(Roptype == ZINSERT)
987      {
988        jam();
989    do_insert:
990        Local_key accminupdate;
991        Local_key * accminupdateptr = &accminupdate;
992        if (unlikely(handleInsertReq(signal, operPtr,
993                                     prepare_fragptr, regTabPtr, &req_struct,
994                                     &accminupdateptr) == -1))
995        {
996          return false;
997        }
998 
999        terrorCode = 0;
1000        checkImmediateTriggersAfterInsert(&req_struct,
1001                                          regOperPtr,
1002                                          regTabPtr,
1003                                          disk_page != RNIL);
1004 
1005        if (unlikely(terrorCode != 0))
1006        {
1007          tupkeyErrorLab(&req_struct);
1008          return false;
1009        }
1010 
1011        if (!regTabPtr->tuxCustomTriggers.isEmpty())
1012        {
1013          jam();
1014          if (unlikely(executeTuxInsertTriggers(signal,
1015                                                regOperPtr,
1016                                                regFragPtr,
1017                                                regTabPtr) != 0))
1018          {
1019            jam();
1020            /*
1021             * TUP insert succeeded but add of TUX entries failed.  All
1022             * TUX changes have been rolled back at this point.
1023             *
1024             * We will abort via tupkeyErrorLab() as usual.  This routine
1025             * however resets the operation to ZREAD.  The TUP_ABORTREQ
1026             * arriving later cannot then undo the insert.
1027             *
1028             * Therefore we call TUP_ABORTREQ already now.  Diskdata etc
1029             * should be in memory and timeslicing cannot occur.  We must
1030             * skip TUX abort triggers since TUX is already aborted.  We
1031             * will dealloc the fixed and var parts if necessary.
1032             */
1033            signal->theData[0] = operPtr.i;
1034            do_tup_abortreq(signal, ZSKIP_TUX_TRIGGERS | ZABORT_DEALLOC);
1035            tupkeyErrorLab(&req_struct);
1036            return false;
1037          }
1038        }
1039 
1040        if (accminupdateptr)
1041        {
1042          /**
         * Update ACC local-key, once *everything* has completed successfully
1044           */
1045          c_lqh->accminupdate(signal,
1046                              regOperPtr->userpointer,
1047                              accminupdateptr);
1048        }
1049 
1050        returnTUPKEYCONF(signal, &req_struct, regOperPtr, TRANS_STARTED);
1051        return true;
1052      }
1053 
1054      if (Roptype == ZUPDATE) {
1055        jam();
1056        if (unlikely(handleUpdateReq(signal, regOperPtr,
1057                                     regFragPtr, regTabPtr,
1058                                     &req_struct, disk_page != RNIL) == -1))
1059        {
1060          return false;
1061        }
1062 
1063        terrorCode = 0;
1064        checkImmediateTriggersAfterUpdate(&req_struct,
1065                                          regOperPtr,
1066                                          regTabPtr,
1067                                          disk_page != RNIL);
1068 
1069        if (unlikely(terrorCode != 0))
1070        {
1071          tupkeyErrorLab(&req_struct);
1072          return false;
1073        }
1074 
1075        if (!regTabPtr->tuxCustomTriggers.isEmpty())
1076        {
1077          jam();
1078          if (unlikely(executeTuxUpdateTriggers(signal,
1079                                                regOperPtr,
1080                                                regFragPtr,
1081                                                regTabPtr) != 0))
1082          {
1083            jam();
1084            /*
1085             * See insert case.
1086             */
1087            signal->theData[0] = operPtr.i;
1088            do_tup_abortreq(signal, ZSKIP_TUX_TRIGGERS);
1089            tupkeyErrorLab(&req_struct);
1090            return false;
1091          }
1092        }
1093 
1094        returnTUPKEYCONF(signal, &req_struct, regOperPtr, TRANS_STARTED);
1095        return true;
1096      }
1097      else if(Roptype == ZDELETE)
1098      {
1099        jam();
1100        req_struct.log_size= 0;
1101        if (unlikely(handleDeleteReq(signal, regOperPtr,
1102                                     regFragPtr, regTabPtr,
1103                                     &req_struct,
1104                                     disk_page != RNIL) == -1))
1105        {
1106          return false;
1107        }
1108 
1109        terrorCode = 0;
1110        checkImmediateTriggersAfterDelete(&req_struct,
1111                                          regOperPtr,
1112                                          regTabPtr,
1113                                          disk_page != RNIL);
1114 
1115        if (unlikely(terrorCode != 0))
1116        {
1117          tupkeyErrorLab(&req_struct);
1118          return false;
1119        }
1120 
1121        /*
1122         * TUX doesn't need to check for triggers at delete since entries in
1123         * the index are kept until commit time.
1124         */
1125 
1126        returnTUPKEYCONF(signal, &req_struct, regOperPtr, TRANS_STARTED);
1127        return true;
1128      }
1129      else if (Roptype == ZREFRESH)
1130      {
1131        /**
1132         * No TUX or immediate triggers, just detached triggers
1133         */
1134    do_refresh:
1135        if (unlikely(handleRefreshReq(signal, operPtr,
1136                                      prepare_fragptr, regTabPtr,
1137                                      &req_struct, disk_page != RNIL) == -1))
1138        {
1139          return false;
1140        }
1141 
1142        returnTUPKEYCONF(signal, &req_struct, regOperPtr, TRANS_STARTED);
1143        return true;
1144 
1145      }
1146      else
1147      {
1148        ndbrequire(false); // Invalid op type
1149      }
1150    }
1151 
1152    tupkeyErrorLab(&req_struct);
1153    return false;
1154 }
1155 
1156 void
setup_fixed_part(KeyReqStruct * req_struct,Operationrec * regOperPtr,Tablerec * regTabPtr)1157 Dbtup::setup_fixed_part(KeyReqStruct* req_struct,
1158 			Operationrec* regOperPtr,
1159 			Tablerec* regTabPtr)
1160 {
1161   ndbassert(regOperPtr->op_type == ZINSERT ||
1162             (! (req_struct->m_tuple_ptr->m_header_bits & Tuple_header::FREE)));
1163 
1164   req_struct->check_offset[MM]= regTabPtr->get_check_offset(MM);
1165   req_struct->check_offset[DD]= regTabPtr->get_check_offset(DD);
1166 
1167   Uint32 num_attr= regTabPtr->m_no_of_attributes;
1168   Uint32 descr_start= regTabPtr->tabDescriptor;
1169   TableDescriptor *tab_descr= &tableDescriptor[descr_start];
1170   ndbrequire(descr_start + (num_attr << ZAD_LOG_SIZE) <= cnoOfTabDescrRec);
1171   req_struct->attr_descr= tab_descr;
1172 }
1173 
void
Dbtup::setup_lcp_read_copy_tuple(KeyReqStruct* req_struct,
                                 Operationrec* regOperPtr,
                                 Fragrecord* regFragPtr,
                                 Tablerec* regTabPtr)
{
  /**
   * Prepare req_struct for reading a copy tuple directly.  Per the
   * caller, only LCP reads a copy tuple this way.  The incoming key
   * (frag_page_id / m_page_idx) encodes the copy tuple's location;
   * we translate it to the rowid stored in the copy tuple header and
   * point req_struct at the copy tuple data itself.
   */
  Local_key tmp;
  tmp.m_page_no = req_struct->frag_page_id;
  tmp.m_page_idx = regOperPtr->m_tuple_location.m_page_idx;
  /* Strip the copy-tuple marker bits (see isCopyTuple() check in the
   * caller) so tmp becomes a plain local key. */
  clearCopyTuple(tmp.m_page_no, tmp.m_page_idx);

  /* The copy tuple starts with the original rowid, followed by the
   * tuple header (see get_copy_tuple()). */
  Uint32 * copytuple = get_copy_tuple_raw(&tmp);
  Local_key rowid;
  memcpy(&rowid, copytuple+0, sizeof(Local_key));

  /* Report the original rowid back through req_struct/operation rec. */
  req_struct->frag_page_id = rowid.m_page_no;
  regOperPtr->m_tuple_location.m_page_idx = rowid.m_page_idx;

  Tuple_header * th = get_copy_tuple(copytuple);
  /* No real page backs a copy tuple; null the page pointer so nothing
   * downstream tries to use it. */
  req_struct->m_page_ptr.setNull();
  req_struct->m_tuple_ptr = (Tuple_header*)th;
  th->m_operation_ptr_i = RNIL;
  ndbassert((th->m_header_bits & Tuple_header::COPY_TUPLE) != 0);

  /* Same attribute-descriptor setup as setup_fixed_part(): verify the
   * descriptor array lies within the pool before caching the pointer. */
  Uint32 num_attr= regTabPtr->m_no_of_attributes;
  Uint32 descr_start= regTabPtr->tabDescriptor;
  TableDescriptor *tab_descr= &tableDescriptor[descr_start];
  ndbrequire(descr_start + (num_attr << ZAD_LOG_SIZE) <= cnoOfTabDescrRec);
  req_struct->attr_descr= tab_descr;

  /* Copy tuples never carry inline disk data here (disk = false);
   * expand var/dyn parts if the table layout requires it. */
  bool disk = false;
  if (regTabPtr->need_expand(disk))
  {
    jam();
    prepare_read(req_struct, regTabPtr, disk);
  }
}
1211 
1212  /* ---------------------------------------------------------------- */
1213  /* ------------------------ CONFIRM REQUEST ----------------------- */
1214  /* ---------------------------------------------------------------- */
1215  inline
returnTUPKEYCONF(Signal * signal,KeyReqStruct * req_struct,Operationrec * regOperPtr,TransState trans_state)1216  void Dbtup::returnTUPKEYCONF(Signal* signal,
1217 			    KeyReqStruct *req_struct,
1218 			    Operationrec * regOperPtr,
1219                             TransState trans_state)
1220 {
1221   /**
1222    * When we arrive here we have been executing read code and/or write
1223    * code to read/write the tuple. During this execution path we have
1224    * not accessed the regOperPtr object for a long time and we have
1225    * accessed lots of other data in the meantime. This prefetch was
1226    * shown useful by using the perf tool. So not an obvious prefetch.
1227    */
1228   NDB_PREFETCH_WRITE(regOperPtr);
1229   TupKeyConf * tupKeyConf= (TupKeyConf *)signal->getDataPtrSend();
1230 
1231   Uint32 Rcreate_rowid = req_struct->m_use_rowid;
1232   Uint32 RuserPointer= regOperPtr->userpointer;
1233   Uint32 RnumFiredTriggers= req_struct->num_fired_triggers;
1234   const Uint32 RnoExecInstructions = req_struct->no_exec_instructions;
1235   Uint32 log_size= req_struct->log_size;
1236   Uint32 read_length= req_struct->read_length;
1237   Uint32 last_row= req_struct->last_row;
1238 
1239   tupKeyConf->userPtr= RuserPointer;
1240   tupKeyConf->readLength= read_length;
1241   tupKeyConf->writeLength= log_size;
1242   tupKeyConf->numFiredTriggers= RnumFiredTriggers;
1243   tupKeyConf->lastRow= last_row;
1244   tupKeyConf->rowid = Rcreate_rowid;
1245   tupKeyConf->noExecInstructions = RnoExecInstructions;
1246   set_tuple_state(regOperPtr, TUPLE_PREPARED);
1247   set_trans_state(regOperPtr, trans_state);
1248 }
1249 
1250 
1251 #define MAX_READ (MIN(sizeof(signal->theData), MAX_SEND_MESSAGE_BYTESIZE))
1252 
1253 /* ---------------------------------------------------------------- */
1254 /* ----------------------------- READ  ---------------------------- */
1255 /* ---------------------------------------------------------------- */
handleReadReq(Signal * signal,Operationrec * regOperPtr,Tablerec * regTabPtr,KeyReqStruct * req_struct)1256 int Dbtup::handleReadReq(Signal* signal,
1257                          Operationrec* regOperPtr,
1258                          Tablerec* regTabPtr,
1259                          KeyReqStruct* req_struct)
1260 {
1261   Uint32 *dst;
1262   Uint32 dstLen, start_index;
1263   const BlockReference sendBref= req_struct->rec_blockref;
1264   if (((regTabPtr->m_bits & Tablerec::TR_Checksum) &&
1265        (calculateChecksum(req_struct->m_tuple_ptr, regTabPtr) != 0)) ||
1266       ERROR_INSERTED(4036)) {
1267     jam();
1268     return corruptedTupleDetected(req_struct, regTabPtr);
1269   }
1270 
1271   const Uint32 node = refToNode(sendBref);
1272   if(node != 0 && node != getOwnNodeId()) {
1273     start_index= 25;
1274   } else {
1275     jam();
1276     /**
1277      * execute direct
1278      */
1279     start_index= 3;
1280   }
1281   dst= &signal->theData[start_index];
1282   dstLen= (MAX_READ / 4) - start_index;
1283   if (!req_struct->interpreted_exec) {
1284     jam();
1285     int ret = readAttributes(req_struct,
1286 			     &cinBuffer[0],
1287 			     req_struct->attrinfo_len,
1288 			     dst,
1289 			     dstLen,
1290 			     false);
1291     if (likely(ret >= 0)) {
1292 /* ------------------------------------------------------------------------- */
1293 // We have read all data into coutBuffer. Now send it to the API.
1294 /* ------------------------------------------------------------------------- */
1295       jam();
1296       Uint32 TnoOfDataRead= (Uint32) ret;
1297       req_struct->read_length += TnoOfDataRead;
1298       sendReadAttrinfo(signal, req_struct, TnoOfDataRead);
1299       return 0;
1300     }
1301     else
1302     {
1303       terrorCode = Uint32(-ret);
1304     }
1305   } else {
1306     return interpreterStartLab(signal, req_struct);
1307   }
1308 
1309   jam();
1310   tupkeyErrorLab(req_struct);
1311   return -1;
1312 }
1313 
1314 static
1315 void
handle_reorg(Dbtup::KeyReqStruct * req_struct,Dbtup::Fragrecord::FragState state)1316 handle_reorg(Dbtup::KeyReqStruct * req_struct,
1317              Dbtup::Fragrecord::FragState state)
1318 {
1319   Uint32 reorg = req_struct->m_reorg;
1320   switch(state){
1321   case Dbtup::Fragrecord::FS_FREE:
1322   case Dbtup::Fragrecord::FS_REORG_NEW:
1323   case Dbtup::Fragrecord::FS_REORG_COMMIT_NEW:
1324   case Dbtup::Fragrecord::FS_REORG_COMPLETE_NEW:
1325     return;
1326   case Dbtup::Fragrecord::FS_REORG_COMMIT:
1327   case Dbtup::Fragrecord::FS_REORG_COMPLETE:
1328     if (reorg != ScanFragReq::REORG_NOT_MOVED)
1329       return;
1330     break;
1331   case Dbtup::Fragrecord::FS_ONLINE:
1332     if (reorg != ScanFragReq::REORG_MOVED)
1333       return;
1334     break;
1335   default:
1336     return;
1337   }
1338   req_struct->m_tuple_ptr->m_header_bits |= Dbtup::Tuple_header::REORG_MOVE;
1339 }
1340 
/* ---------------------------------------------------------------- */
/* ---------------------------- UPDATE ---------------------------- */
/* ---------------------------------------------------------------- */
/**
 * Execute an UPDATE on a tuple.  The update is applied to a freshly
 * allocated copy tuple; the original row is only replaced later (at
 * commit).  Returns 0 on success, -1 on error with terrorCode set and
 * the error already reported via tupkeyErrorLab().
 */
int Dbtup::handleUpdateReq(Signal* signal,
                           Operationrec* operPtrP,
                           Fragrecord* regFragPtr,
                           Tablerec* regTabPtr,
                           KeyReqStruct* req_struct,
			   bool disk)
{
  Tuple_header *dst;
  Tuple_header *base= req_struct->m_tuple_ptr, *org;
  ChangeMask * change_mask_ptr;
  /* Allocate the copy tuple the update will be applied to. */
  if ((dst= alloc_copy_tuple(regTabPtr, &operPtrP->m_copy_tuple_location))== 0)
  {
    terrorCode= ZMEM_NOMEM_ERROR;
    goto error;
  }

  Uint32 tup_version;
  change_mask_ptr = get_change_mask_ptr(regTabPtr, dst);
  if(operPtrP->is_first_operation())
  {
    /* First operation on this row in the transaction: start from the
     * committed row and an empty change mask. */
    org= req_struct->m_tuple_ptr;
    tup_version= org->get_tuple_version();
    clear_change_mask_info(regTabPtr, change_mask_ptr);
  }
  else
  {
    jam();
    /* Chained operation: start from the previous operation's copy
     * tuple and inherit its accumulated change mask. */
    Operationrec* prevOp= req_struct->prevOpPtr.p;
    tup_version= prevOp->op_struct.bit_field.tupVersion;
    Uint32 * rawptr = get_copy_tuple_raw(&prevOp->m_copy_tuple_location);
    org= get_copy_tuple(rawptr);
    copy_change_mask_info(regTabPtr,
                          change_mask_ptr,
                          get_change_mask_ptr(rawptr));
  }

  /**
   * Check consistency before update/delete
   */
  req_struct->m_tuple_ptr= org;
  if ((regTabPtr->m_bits & Tablerec::TR_Checksum) &&
      (calculateChecksum(req_struct->m_tuple_ptr, regTabPtr) != 0))
  {
    jam();
    return corruptedTupleDetected(req_struct, regTabPtr);
  }

  /* From here on, all reads/writes go to the copy tuple. */
  req_struct->m_tuple_ptr= dst;

  /* sizes[] aliases cmp[] so that the MM/DD size pairs written by
   * expand_tuple (sizes[0..1]) and shrink_tuple (sizes[2..3]) can be
   * compared with two 64-bit compares (see cmp[0] != cmp[1] below). */
  union {
    Uint32 sizes[4];
    Uint64 cmp[2];
  };

  disk = disk || (org->m_header_bits & Tuple_header::DISK_INLINE);
  if (regTabPtr->need_expand(disk))
  {
    /* Expand var/dyn parts to maximum size so attributes can be
     * updated in place in the copy tuple. */
    expand_tuple(req_struct, sizes, org, regTabPtr, disk);
    if(disk && operPtrP->m_undo_buffer_space == 0)
    {
      jam();
      /* First disk update for this operation: reserve undo log space
       * now and arrange for the disk page to be loaded at commit. */
      operPtrP->op_struct.bit_field.m_wait_log_buffer = 1;
      operPtrP->op_struct.bit_field.m_load_diskpage_on_commit = 1;
      Uint32 sz= operPtrP->m_undo_buffer_space=
	(sizeof(Dbtup::Disk_undo::Update) >> 2) + sizes[DD] - 1;

      D("Logfile_client - handleUpdateReq");
      Logfile_client lgman(this, c_lgman, regFragPtr->m_logfile_group_id);
      terrorCode= lgman.alloc_log_space(sz, jamBuffer());
      if(unlikely(terrorCode))
      {
        jam();
	operPtrP->m_undo_buffer_space= 0;
	goto error;
      }
    }
  }
  else
  {
    /* Fixed-size rows need no expansion: plain copy of the fixed part,
     * then mark the destination as a copy tuple. */
    memcpy(dst, org, 4*regTabPtr->m_offsets[MM].m_fix_header_size);
    req_struct->m_tuple_ptr->m_header_bits |= Tuple_header::COPY_TUPLE;
  }

  /* Each update bumps the tuple version (wrapping modulo the mask). */
  tup_version= (tup_version + 1) & ZTUP_VERSION_MASK;
  operPtrP->op_struct.bit_field.tupVersion= tup_version;

  req_struct->optimize_options = 0;

  if (!req_struct->interpreted_exec) {
    jam();

    if (regTabPtr->m_bits & Tablerec::TR_ExtraRowAuthorBits)
    {
      jam();
      Uint32 attrId =
        regTabPtr->getExtraAttrId<Tablerec::TR_ExtraRowAuthorBits>();

      store_extra_row_bits(attrId, regTabPtr, dst, /* default */ 0, false);
    }
    /* Apply the AttrInfo program directly; negative return is a
     * negated error code. */
    int retValue = updateAttributes(req_struct,
				    &cinBuffer[0],
				    req_struct->attrinfo_len);
    if (unlikely(retValue < 0))
    {
      terrorCode = Uint32(-retValue);
      goto error;
    }
  } else {
    /* Interpreted update: interpreterStartLab() reports errors itself. */
    if (unlikely(interpreterStartLab(signal, req_struct) == -1))
      return -1;
  }

  /* Merge the attributes changed by this update into the accumulated
   * change mask. */
  update_change_mask_info(regTabPtr,
                          change_mask_ptr,
                          req_struct->changeMask.rep.data);

  /* optimize_options may have been set by an OPTIMIZE pseudo-column
   * write during attribute update. */
  switch (req_struct->optimize_options) {
    case AttributeHeader::OPTIMIZE_MOVE_VARPART:
      /**
       * optimize varpart of tuple,  move varpart of tuple from
       * big-free-size page list into small-free-size page list
       */
      if(base->m_header_bits & Tuple_header::VAR_PART)
        optimize_var_part(req_struct, base, operPtrP,
                          regFragPtr, regTabPtr);
      break;
    case AttributeHeader::OPTIMIZE_MOVE_FIXPART:
      //TODO: move fix part of tuple
      break;
    default:
      break;
  }

  if (regTabPtr->need_shrink())
  {
    /* Shrink back to actual size; if the before/after size pairs differ
     * (64-bit compare via the union) the row may need to be moved. */
    shrink_tuple(req_struct, sizes+2, regTabPtr, disk);
    if (cmp[0] != cmp[1] && handle_size_change_after_update(req_struct,
							    base,
							    operPtrP,
							    regFragPtr,
							    regTabPtr,
							    sizes)) {
      goto error;
    }
  }

  if (req_struct->m_reorg != ScanFragReq::REORG_ALL)
  {
    handle_reorg(req_struct, regFragPtr->fragStatus);
  }

  req_struct->m_tuple_ptr->set_tuple_version(tup_version);

  /* Checksum covers the copy tuple just built. */
  setChecksum(req_struct->m_tuple_ptr, regTabPtr);

  set_tuple_state(operPtrP, TUPLE_PREPARED);

  return 0;

error:
  tupkeyErrorLab(req_struct);
  return -1;
}
1507 
/*
  expand_dyn_part - copy dynamic attributes to fully expanded size.

  Both variable-sized and fixed-size attributes are stored in the same way
  in the expanded form as variable-sized attributes (in expand_var_part()).

  This method is used for both mem and disk dynamic data.

    dst         Destination for expanded data
    src         Pointer to the start of dynamic bitmap in source row
    row_len     Total number of 32-bit words in dynamic part of row
    tabDesc     Array of table descriptors
    order       Array of indexes into tabDesc, dynfix followed by dynvar
    dynvar      Number of dynamic variable-sized attributes
    dynfix      Number of dynamic fixed-size attributes
    max_bmlen   Maximum null-bitmap length in words for this row part

  Returns a pointer just past the last expanded byte written.
*/
static
Uint32*
expand_dyn_part(Dbtup::KeyReqStruct::Var_data *dst,
		const Uint32* src,
		Uint32 row_len,
		const Uint32 * tabDesc,
		const Uint16* order,
		Uint32 dynvar,
		Uint32 dynfix,
		Uint32 max_bmlen)
{
  /* Copy the bitmap, zeroing out any words not stored in the row. */
  Uint32 *dst_bm_ptr= (Uint32*)dst->m_dyn_data_ptr;
  /* A zero row_len means no dynamic part stored at all. */
  Uint32 bm_len = row_len ? (* src & Dbtup::DYN_BM_LEN_MASK) : 0;

  assert(bm_len <= max_bmlen);

  if(bm_len > 0)
    memcpy(dst_bm_ptr, src, 4*bm_len);
  if(bm_len < max_bmlen)
    bzero(dst_bm_ptr + bm_len, 4 * (max_bmlen - bm_len));

  /**
   * Store max_bmlen in the length bits so that the code in
   * DbtupRoutines can treat all expanded rows uniformly.
   */
  Uint32 tmp = (* dst_bm_ptr);
  * dst_bm_ptr = (tmp & ~(Uint32)Dbtup::DYN_BM_LEN_MASK) | max_bmlen;

  /* The Uint16 offset array follows directly after the bitmap. */
  char *src_off_start= (char*)(src + bm_len);
  assert((UintPtr(src_off_start)&3) == 0);
  Uint16 *src_off_ptr= (Uint16*)src_off_start;

  /*
    Prepare the variable-sized dynamic attributes, copying out data from the
    source row for any that are not NULL.
  */
  Uint32 no_attr= dst->m_dyn_len_offset;
  Uint16* dst_off_ptr= dst->m_dyn_offset_arr_ptr;
  Uint16* dst_len_ptr= dst_off_ptr + no_attr;
  Uint16 this_src_off= row_len ? * src_off_ptr++ : 0;
  /* We need to reserve room for the offsets written by shrink_tuple+padding.*/
  Uint16 dst_off= 4 * (max_bmlen + ((dynvar+2)>>1));
  char *dst_ptr= (char*)dst_bm_ptr + dst_off;
  for(Uint32 i= 0; i<dynvar; i++)
  {
    /* order[] lists dynfix attributes first, then dynvar. */
    Uint16 j= order[dynfix+i];
    Uint32 max_len= 4 *AttributeDescriptor::getSizeInWords(tabDesc[j]);
    Uint32 len;
    Uint32 pos = AttributeOffset::getNullFlagPos(tabDesc[j+1]);
    /* A bit set in the bitmap means the attribute is present (not NULL);
     * bits beyond the stored bitmap length are implicitly clear. */
    if(bm_len > (pos >> 5) && BitmaskImpl::get(bm_len, src, pos))
    {
      /* Stored length is the distance between consecutive offsets. */
      Uint16 next_src_off= *src_off_ptr++;
      len= next_src_off - this_src_off;
      memcpy(dst_ptr, src_off_start+this_src_off, len);
      this_src_off= next_src_off;
    }
    else
    {
      len= 0;
    }
    /* Expanded layout stores offset and end-offset (off+len) per attr,
     * always advancing by the attribute's maximum length. */
    dst_off_ptr[i]= dst_off;
    dst_len_ptr[i]= dst_off+len;
    dst_off+= max_len;
    dst_ptr+= max_len;
  }
  /*
    The fixed-size data is stored 32-bit aligned after the variable-sized
    data.
  */
  char *src_ptr= src_off_start+this_src_off;
  src_ptr= (char *)(ALIGN_WORD(src_ptr));

  /*
    Prepare the fixed-size dynamic attributes, copying out data from the
    source row for any that are not NULL.
    Note that the fixed-size data is stored in reverse from the end of the
    dynamic part of the row. This is true both for the stored/shrunken and
    for the expanded form.
  */
  for(Uint32 i= dynfix; i>0; )
  {
    i--;
    Uint16 j= order[i];
    Uint32 fix_size= 4*AttributeDescriptor::getSizeInWords(tabDesc[j]);
    dst_off_ptr[dynvar+i]= dst_off;
    /* len offset array is not used for fixed size. */
    Uint32 pos = AttributeOffset::getNullFlagPos(tabDesc[j+1]);
    if(bm_len > (pos >> 5) && BitmaskImpl::get(bm_len, src, pos))
    {
      assert((UintPtr(dst_ptr)&3) == 0);
      memcpy(dst_ptr, src_ptr, fix_size);
      src_ptr+= fix_size;
    }
    dst_off+= fix_size;
    dst_ptr+= fix_size;
  }

  return (Uint32 *)dst_ptr;
}
1622 
1623 static
1624 Uint32*
shrink_dyn_part(Dbtup::KeyReqStruct::Var_data * dst,Uint32 * dst_ptr,const Dbtup::Tablerec * tabPtrP,const Uint32 * tabDesc,const Uint16 * order,Uint32 dynvar,Uint32 dynfix,Uint32 ind)1625 shrink_dyn_part(Dbtup::KeyReqStruct::Var_data *dst,
1626                 Uint32 *dst_ptr,
1627                 const Dbtup::Tablerec* tabPtrP,
1628                 const Uint32 * tabDesc,
1629                 const Uint16* order,
1630                 Uint32 dynvar,
1631                 Uint32 dynfix,
1632                 Uint32 ind)
1633 {
1634   /**
1635    * Now build the dynamic part, if any.
1636    * First look for any trailing all-NULL words of the bitmap; we do
1637    * not need to store those.
1638    */
1639   assert((UintPtr(dst->m_dyn_data_ptr)&3) == 0);
1640   char *dyn_src_ptr= dst->m_dyn_data_ptr;
1641   Uint32 bm_len = tabPtrP->m_offsets[ind].m_dyn_null_words; // In words
1642 
1643   /* If no dynamic variables, store nothing. */
1644   assert(bm_len);
1645   {
1646     /**
     * clear bm-len bits, so they won't incorrectly indicate
1648      *   a non-zero map
1649      */
1650     * ((Uint32 *)dyn_src_ptr) &= ~Uint32(Dbtup::DYN_BM_LEN_MASK);
1651 
1652     Uint32 *bm_ptr= (Uint32 *)dyn_src_ptr + bm_len - 1;
1653     while(*bm_ptr == 0)
1654     {
1655       bm_ptr--;
1656       bm_len--;
1657       if(bm_len == 0)
1658         break;
1659     }
1660   }
1661 
1662   if (bm_len)
1663   {
1664     /**
1665      * Copy the bitmap, counting the number of variable sized
1666      * attributes that are not NULL on the way.
1667      */
1668     Uint32 *dyn_dst_ptr= dst_ptr;
1669     Uint32 dyn_var_count= 0;
1670     const Uint32 *src_bm_ptr= (Uint32 *)(dyn_src_ptr);
1671     Uint32 *dst_bm_ptr= (Uint32 *)dyn_dst_ptr;
1672 
1673     /* ToDo: Put all of the dynattr code inside if(bm_len>0) { ... },
1674      * split to separate function. */
1675     Uint16 dyn_dst_data_offset= 0;
1676     const Uint32 *dyn_bm_var_mask_ptr= tabPtrP->dynVarSizeMask[ind];
1677     for(Uint16 i= 0; i< bm_len; i++)
1678     {
1679       Uint32 v= src_bm_ptr[i];
1680       dyn_var_count+= BitmaskImpl::count_bits(v & *dyn_bm_var_mask_ptr++);
1681       dst_bm_ptr[i]= v;
1682     }
1683 
1684     Uint32 tmp = *dyn_dst_ptr;
1685     assert(bm_len <= Dbtup::DYN_BM_LEN_MASK);
1686     * dyn_dst_ptr = (tmp & ~(Uint32)Dbtup::DYN_BM_LEN_MASK) | bm_len;
1687     dyn_dst_ptr+= bm_len;
1688     dyn_dst_data_offset= 2*dyn_var_count + 2;
1689 
1690     Uint16 *dyn_src_off_array= dst->m_dyn_offset_arr_ptr;
1691     Uint16 *dyn_src_lenoff_array=
1692       dyn_src_off_array + dst->m_dyn_len_offset;
1693     Uint16* dyn_dst_off_array = (Uint16*)dyn_dst_ptr;
1694 
1695     /**
1696      * Copy over the variable sized not-NULL attributes.
1697      * Data offsets are counted from the start of the offset array, and
1698      * we store one additional offset to be able to easily compute the
1699      * data length as the difference between offsets.
1700      */
1701     Uint16 off_idx= 0;
1702     for(Uint32 i= 0; i<dynvar; i++)
1703     {
1704       /**
1705        * Note that we must use the destination (shrunken) bitmap here,
1706        * as the source (expanded) bitmap may have been already clobbered
1707        * (by offset data).
1708        */
1709       Uint32 attrDesc2 = tabDesc[order[dynfix+i]+1];
1710       Uint32 pos = AttributeOffset::getNullFlagPos(attrDesc2);
1711       if (bm_len > (pos >> 5) && BitmaskImpl::get(bm_len, dst_bm_ptr, pos))
1712       {
1713         dyn_dst_off_array[off_idx++]= dyn_dst_data_offset;
1714         Uint32 dyn_src_off= dyn_src_off_array[i];
1715         Uint32 dyn_len= dyn_src_lenoff_array[i] - dyn_src_off;
1716         memmove(((char *)dyn_dst_ptr) + dyn_dst_data_offset,
1717                 dyn_src_ptr + dyn_src_off,
1718                 dyn_len);
1719         dyn_dst_data_offset+= dyn_len;
1720       }
1721     }
1722     /* If all dynamic attributes are NULL, we store nothing. */
1723     dyn_dst_off_array[off_idx]= dyn_dst_data_offset;
1724     assert(dyn_dst_off_array + off_idx == (Uint16*)dyn_dst_ptr+dyn_var_count);
1725 
1726     char *dynvar_end_ptr= ((char *)dyn_dst_ptr) + dyn_dst_data_offset;
1727     char *dyn_dst_data_ptr= (char *)(ALIGN_WORD(dynvar_end_ptr));
1728 
1729     /**
1730      * Zero out any padding bytes. Might not be strictly necessary,
1731      * but seems cleaner than leaving random stuff in there.
1732      */
1733     bzero(dynvar_end_ptr, dyn_dst_data_ptr-dynvar_end_ptr);
1734 
1735     /* *
1736      * Copy over the fixed-sized not-NULL attributes.
1737      * Note that attributes are copied in reverse order; this is to avoid
1738      * overwriting not-yet-copied data, as the data is also stored in
1739      * reverse order.
1740      */
1741     for(Uint32 i= dynfix; i > 0; )
1742     {
1743       i--;
1744       Uint16 j= order[i];
1745       Uint32 attrDesc2 = tabDesc[j+1];
1746       Uint32 pos = AttributeOffset::getNullFlagPos(attrDesc2);
1747       if(bm_len > (pos >>5 ) && BitmaskImpl::get(bm_len, dst_bm_ptr, pos))
1748       {
1749         Uint32 fixsize=
1750           4*AttributeDescriptor::getSizeInWords(tabDesc[j]);
1751         memmove(dyn_dst_data_ptr,
1752                 dyn_src_ptr + dyn_src_off_array[dynvar+i],
1753                 fixsize);
1754         dyn_dst_data_ptr += fixsize;
1755       }
1756     }
1757     dst_ptr = (Uint32*)dyn_dst_data_ptr;
1758     assert((UintPtr(dst_ptr) & 3) == 0);
1759   }
1760   return (Uint32 *)dst_ptr;
1761 }
1762 
1763 /* ---------------------------------------------------------------- */
1764 /* ----------------------------- INSERT --------------------------- */
1765 /* ---------------------------------------------------------------- */
/**
 * Prepare the operation record and the freshly allocated copy tuple for
 * the first INSERT on a row in a transaction.
 *
 * Links the operation as the (only) active operation, reserves undo-log
 * buffer space when the table has disk attributes, and lays out the
 * variable-size, dynamic and disk parts of the copy tuple so that the
 * subsequent updateAttributes() calls can store attribute data into it.
 * Finally all null bits are set (all attributes start out NULL).
 */
void
Dbtup::prepare_initial_insert(KeyReqStruct *req_struct,
			      Operationrec* regOperPtr,
			      Tablerec* regTabPtr)
{
  /* Undo log space is only needed when the table has disk attributes. */
  Uint32 disk_undo = regTabPtr->m_no_of_disk_attributes ?
    sizeof(Dbtup::Disk_undo::Alloc) >> 2 : 0;
  regOperPtr->nextActiveOp= RNIL;
  regOperPtr->prevActiveOp= RNIL;
  regOperPtr->op_struct.bit_field.in_active_list= true;
  regOperPtr->m_undo_buffer_space= disk_undo;

  req_struct->check_offset[MM]= regTabPtr->get_check_offset(MM);
  req_struct->check_offset[DD]= regTabPtr->get_check_offset(DD);

  Uint32 num_attr= regTabPtr->m_no_of_attributes;
  Uint32 descr_start= regTabPtr->tabDescriptor;
  Uint32 order_desc= regTabPtr->m_real_order_descriptor;
  TableDescriptor *tab_descr= &tableDescriptor[descr_start];
  ndbrequire(descr_start + (num_attr << ZAD_LOG_SIZE) <= cnoOfTabDescrRec);
  req_struct->attr_descr= tab_descr;
  Uint16* order= (Uint16*)&tableDescriptor[order_desc];
  /* Skip the fixed-size attributes; 'order' then points at the varsize ones */
  order += regTabPtr->m_attributes[MM].m_no_of_fixsize;

  Uint32 bits = Tuple_header::COPY_TUPLE;
  bits |= disk_undo ? (Tuple_header::DISK_ALLOC|Tuple_header::DISK_INLINE) : 0;

  const Uint32 mm_vars= regTabPtr->m_attributes[MM].m_no_of_varsize;
  const Uint32 mm_dyns= regTabPtr->m_attributes[MM].m_no_of_dynamic;
  const Uint32 mm_dynvar= regTabPtr->m_attributes[MM].m_no_of_dyn_var;
  const Uint32 mm_dynfix= regTabPtr->m_attributes[MM].m_no_of_dyn_fix;
  const Uint32 dd_vars= regTabPtr->m_attributes[DD].m_no_of_varsize;
  Uint32 *ptr= req_struct->m_tuple_ptr->get_end_of_fix_part_ptr(regTabPtr);
  Var_part_ref* ref = req_struct->m_tuple_ptr->get_var_part_ref_ptr(regTabPtr);

  if (regTabPtr->m_bits & Tablerec::TR_ForceVarPart)
  {
    /* Mark the var part reference as 'not yet allocated' */
    ref->m_page_no = RNIL;
    ref->m_page_idx = Tup_varsize_page::END_OF_FREE_LIST;
  }

  if(mm_vars || mm_dyns)
  {
    jam();
    /* Init Varpart_copy struct */
    Varpart_copy * cp = (Varpart_copy*)ptr;
    cp->m_len = 0;
    ptr += Varpart_copy::SZ32;

    /* Prepare empty varsize part. */
    KeyReqStruct::Var_data* dst= &req_struct->m_var_data[MM];

    if (mm_vars)
    {
      dst->m_data_ptr= (char*)(((Uint16*)ptr)+mm_vars+1);
      dst->m_offset_array_ptr= req_struct->var_pos_array;
      dst->m_var_len_offset= mm_vars;
      dst->m_max_var_offset= regTabPtr->m_offsets[MM].m_max_var_offset;

      /**
       * Initialize the offset/length arrays: each varsize attribute
       * starts out empty (len == pos) with offsets spaced at the
       * attribute's maximum byte size.
       */
      Uint32 pos= 0;
      Uint16 *pos_ptr = req_struct->var_pos_array;
      Uint16 *len_ptr = pos_ptr + mm_vars;
      for(Uint32 i= 0; i<mm_vars; i++)
      {
        * pos_ptr++ = pos;
        * len_ptr++ = pos;
        pos += AttributeDescriptor::getSizeInBytes(tab_descr[*order++].tabDescr);
      }

      // Disk/dynamic part is 32-bit aligned
      ptr = ALIGN_WORD(dst->m_data_ptr+pos);
      ndbassert(ptr == ALIGN_WORD(dst->m_data_ptr +
                                  regTabPtr->m_offsets[MM].m_max_var_offset));
    }

    if (mm_dyns)
    {
      jam();
      /* Prepare empty dynamic part. */
      dst->m_dyn_data_ptr= (char *)ptr;
      dst->m_dyn_offset_arr_ptr= req_struct->var_pos_array+2*mm_vars;
      dst->m_dyn_len_offset= mm_dynvar+mm_dynfix;
      dst->m_max_dyn_offset= regTabPtr->m_offsets[MM].m_max_dyn_offset;

      /* Expand with a NULL source => an all-NULL dynamic part layout */
      ptr = expand_dyn_part(dst, 0, 0,
                            (Uint32*)tab_descr, order,
                            mm_dynvar, mm_dynfix,
                            regTabPtr->m_offsets[MM].m_dyn_null_words);
    }

    ndbassert((UintPtr(ptr)&3) == 0);
  }

  /* Disk part (if any) follows directly after the in-memory parts */
  req_struct->m_disk_ptr= (Tuple_header*)ptr;

  /* Varsize disk attributes are not supported */
  ndbrequire(dd_vars == 0);

  req_struct->m_tuple_ptr->m_header_bits= bits;

  // Set all null bits
  memset(req_struct->m_tuple_ptr->m_null_bits+
	 regTabPtr->m_offsets[MM].m_null_offset, 0xFF,
	 4*regTabPtr->m_offsets[MM].m_null_words);
  memset(req_struct->m_disk_ptr->m_null_bits+
	 regTabPtr->m_offsets[DD].m_null_offset, 0xFF,
	 4*regTabPtr->m_offsets[DD].m_null_words);
}
1873 
handleInsertReq(Signal * signal,Ptr<Operationrec> regOperPtr,Ptr<Fragrecord> fragPtr,Tablerec * regTabPtr,KeyReqStruct * req_struct,Local_key ** accminupdateptr)1874 int Dbtup::handleInsertReq(Signal* signal,
1875                            Ptr<Operationrec> regOperPtr,
1876                            Ptr<Fragrecord> fragPtr,
1877                            Tablerec* regTabPtr,
1878                            KeyReqStruct *req_struct,
1879                            Local_key ** accminupdateptr)
1880 {
1881   Uint32 tup_version = 1;
1882   Fragrecord* regFragPtr = fragPtr.p;
1883   Uint32 *ptr= 0;
1884   Tuple_header *dst;
1885   Tuple_header *base= req_struct->m_tuple_ptr, *org= base;
1886   Tuple_header *tuple_ptr;
1887 
1888   bool disk = regTabPtr->m_no_of_disk_attributes > 0;
1889   bool mem_insert = regOperPtr.p->is_first_operation();
1890   bool disk_insert = mem_insert && disk;
1891   bool vardynsize = (regTabPtr->m_attributes[MM].m_no_of_varsize ||
1892                      regTabPtr->m_attributes[MM].m_no_of_dynamic);
1893   bool varalloc = vardynsize || regTabPtr->m_bits & Tablerec::TR_ForceVarPart;
1894   bool rowid = req_struct->m_use_rowid;
1895   bool update_acc = false;
1896   Uint32 real_page_id = regOperPtr.p->m_tuple_location.m_page_no;
1897   Uint32 frag_page_id = req_struct->frag_page_id;
1898 
1899   union {
1900     Uint32 sizes[4];
1901     Uint64 cmp[2];
1902   };
1903   cmp[0] = cmp[1] = 0;
1904 
1905   if (ERROR_INSERTED(4014))
1906   {
1907     dst = 0;
1908     goto undo_buffer_error;
1909   }
1910 
1911   dst= alloc_copy_tuple(regTabPtr, &regOperPtr.p->m_copy_tuple_location);
1912 
1913   if (unlikely(dst == 0))
1914   {
1915     goto undo_buffer_error;
1916   }
1917   tuple_ptr= req_struct->m_tuple_ptr= dst;
1918   set_change_mask_info(regTabPtr, get_change_mask_ptr(regTabPtr, dst));
1919 
1920   if(mem_insert)
1921   {
1922     jam();
1923     prepare_initial_insert(req_struct, regOperPtr.p, regTabPtr);
1924   }
1925   else
1926   {
1927     Operationrec* prevOp= req_struct->prevOpPtr.p;
1928     ndbassert(prevOp->op_type == ZDELETE);
1929     tup_version= prevOp->op_struct.bit_field.tupVersion + 1;
1930 
1931     if(!prevOp->is_first_operation())
1932     {
1933       jam();
1934       org= get_copy_tuple(&prevOp->m_copy_tuple_location);
1935     }
1936     if (regTabPtr->need_expand())
1937     {
1938       expand_tuple(req_struct, sizes, org, regTabPtr, !disk_insert);
1939       memset(req_struct->m_disk_ptr->m_null_bits+
1940              regTabPtr->m_offsets[DD].m_null_offset, 0xFF,
1941              4*regTabPtr->m_offsets[DD].m_null_words);
1942 
1943       Uint32 bm_size_in_bytes= 4*(regTabPtr->m_offsets[MM].m_dyn_null_words);
1944       if (bm_size_in_bytes)
1945       {
1946         Uint32* ptr =
1947           (Uint32*)req_struct->m_var_data[MM].m_dyn_data_ptr;
1948         bzero(ptr, bm_size_in_bytes);
1949         * ptr = bm_size_in_bytes >> 2;
1950       }
1951     }
1952     else
1953     {
1954       memcpy(dst, org, 4*regTabPtr->m_offsets[MM].m_fix_header_size);
1955       tuple_ptr->m_header_bits |= Tuple_header::COPY_TUPLE;
1956     }
1957     memset(tuple_ptr->m_null_bits+
1958            regTabPtr->m_offsets[MM].m_null_offset, 0xFF,
1959            4*regTabPtr->m_offsets[MM].m_null_words);
1960   }
1961 
1962   int res;
1963   if (disk_insert)
1964   {
1965     jam();
1966     if (ERROR_INSERTED(4015))
1967     {
1968       terrorCode = 1501;
1969       goto log_space_error;
1970     }
1971 
1972     D("Logfile_client - handleInsertReq");
1973     Logfile_client lgman(this, c_lgman, regFragPtr->m_logfile_group_id);
1974     res= lgman.alloc_log_space(regOperPtr.p->m_undo_buffer_space, jamBuffer());
1975     if(unlikely(res))
1976     {
1977       jam();
1978       terrorCode= res;
1979       goto log_space_error;
1980     }
1981   }
1982 
1983   regOperPtr.p->op_struct.bit_field.tupVersion=
1984     tup_version & ZTUP_VERSION_MASK;
1985   tuple_ptr->set_tuple_version(tup_version);
1986 
1987   if (ERROR_INSERTED(4016))
1988   {
1989     terrorCode = ZAI_INCONSISTENCY_ERROR;
1990     goto update_error;
1991   }
1992 
1993   if (regTabPtr->m_bits & Tablerec::TR_ExtraRowAuthorBits)
1994   {
1995     Uint32 attrId =
1996       regTabPtr->getExtraAttrId<Tablerec::TR_ExtraRowAuthorBits>();
1997 
1998     store_extra_row_bits(attrId, regTabPtr, tuple_ptr, /* default */ 0, false);
1999   }
2000 
2001   if (!regTabPtr->m_default_value_location.isNull())
2002   {
2003     jam();
2004     Uint32 default_values_len;
2005     /* Get default values ptr + len for this table */
2006     Uint32* default_values = get_default_ptr(regTabPtr, default_values_len);
2007     ndbrequire(default_values_len != 0 && default_values != NULL);
2008     /*
2009      * Update default values into row first,
2010      * next update with data received from the client.
2011      */
2012     if(unlikely((res = updateAttributes(req_struct, default_values,
2013                                         default_values_len)) < 0))
2014     {
2015       jam();
2016       terrorCode = Uint32(-res);
2017       goto update_error;
2018     }
2019   }
2020 
2021   if(unlikely((res = updateAttributes(req_struct, &cinBuffer[0],
2022                                       req_struct->attrinfo_len)) < 0))
2023   {
2024     terrorCode = Uint32(-res);
2025     goto update_error;
2026   }
2027 
2028   if (ERROR_INSERTED(4017))
2029   {
2030     goto null_check_error;
2031   }
2032   if (unlikely(checkNullAttributes(req_struct, regTabPtr) == false))
2033   {
2034     goto null_check_error;
2035   }
2036 
2037   if (req_struct->m_is_lcp)
2038   {
2039     jam();
2040     sizes[2+MM] = req_struct->m_lcp_varpart_len;
2041   }
2042   else if (regTabPtr->need_shrink())
2043   {
2044     shrink_tuple(req_struct, sizes+2, regTabPtr, true);
2045   }
2046 
2047   if (ERROR_INSERTED(4025))
2048   {
2049     goto mem_error;
2050   }
2051 
2052   if (ERROR_INSERTED(4026))
2053   {
2054     CLEAR_ERROR_INSERT_VALUE;
2055     goto mem_error;
2056   }
2057 
2058   if (ERROR_INSERTED(4027) && (rand() % 100) > 25)
2059   {
2060     goto mem_error;
2061   }
2062 
2063   if (ERROR_INSERTED(4028) && (rand() % 100) > 25)
2064   {
2065     CLEAR_ERROR_INSERT_VALUE;
2066     goto mem_error;
2067   }
2068 
2069   /**
2070    * Alloc memory
2071    */
2072   if(mem_insert)
2073   {
2074     terrorCode = 0;
2075     if (!rowid)
2076     {
2077       if (ERROR_INSERTED(4018))
2078       {
2079 	goto mem_error;
2080       }
2081 
2082       if (!varalloc)
2083       {
2084 	jam();
2085 	ptr= alloc_fix_rec(jamBuffer(),
2086                            &terrorCode,
2087                            regFragPtr,
2088 			   regTabPtr,
2089 			   &regOperPtr.p->m_tuple_location,
2090 			   &frag_page_id);
2091       }
2092       else
2093       {
2094 	jam();
2095 	regOperPtr.p->m_tuple_location.m_file_no= sizes[2+MM];
2096 	ptr= alloc_var_rec(&terrorCode,
2097                            regFragPtr, regTabPtr,
2098 			   sizes[2+MM],
2099 			   &regOperPtr.p->m_tuple_location,
2100 			   &frag_page_id);
2101       }
2102       if (unlikely(ptr == 0))
2103       {
2104 	goto mem_error;
2105       }
2106       req_struct->m_use_rowid = true;
2107     }
2108     else
2109     {
2110       regOperPtr.p->m_tuple_location = req_struct->m_row_id;
2111       if (ERROR_INSERTED(4019))
2112       {
2113 	terrorCode = ZROWID_ALLOCATED;
2114 	goto alloc_rowid_error;
2115       }
2116 
2117       if (!varalloc)
2118       {
2119 	jam();
2120 	ptr= alloc_fix_rowid(&terrorCode,
2121                              regFragPtr,
2122 			     regTabPtr,
2123 			     &regOperPtr.p->m_tuple_location,
2124 			     &frag_page_id);
2125       }
2126       else
2127       {
2128 	jam();
2129 	regOperPtr.p->m_tuple_location.m_file_no= sizes[2+MM];
2130 	ptr= alloc_var_rowid(&terrorCode,
2131                              regFragPtr, regTabPtr,
2132 			     sizes[2+MM],
2133 			     &regOperPtr.p->m_tuple_location,
2134 			     &frag_page_id);
2135       }
2136       if (unlikely(ptr == 0))
2137       {
2138 	jam();
2139 	goto alloc_rowid_error;
2140       }
2141     }
2142     real_page_id = regOperPtr.p->m_tuple_location.m_page_no;
2143     update_acc = true; /* Will be updated later once success is known */
2144 
2145     base = (Tuple_header*)ptr;
2146     base->m_operation_ptr_i= regOperPtr.i;
2147     base->m_header_bits= Tuple_header::ALLOC |
2148       (sizes[2+MM] > 0 ? Tuple_header::VAR_PART : 0);
2149     /**
2150      * No need to set checksum here, the tuple is allocated, but contains
2151      * no data, so if we attempt to read it in this state we even want the
2152      * checksum to be wrong since it is not allowed to read the tuple in
2153      * this state.
2154      */
2155   }
2156   else
2157   {
2158     if (ERROR_INSERTED(4020))
2159     {
2160       goto size_change_error;
2161     }
2162 
2163     if (regTabPtr->need_shrink() && cmp[0] != cmp[1] &&
2164 	unlikely(handle_size_change_after_update(req_struct,
2165                                                  base,
2166                                                  regOperPtr.p,
2167                                                  regFragPtr,
2168                                                  regTabPtr,
2169                                                  sizes) != 0))
2170     {
2171       goto size_change_error;
2172     }
2173     req_struct->m_use_rowid = false;
2174     Uint32 old_header = base->m_header_bits;
2175     base->m_header_bits &= ~(Uint32)Tuple_header::FREE;
2176     Uint32 new_header = base->m_header_bits;
2177     if (old_header != new_header)
2178     {
2179       jam();
2180       updateChecksum(base, regTabPtr, old_header, new_header);
2181     }
2182   }
2183 
2184   if (disk_insert)
2185   {
2186     jam();
2187     Local_key tmp;
2188     Uint32 size= regTabPtr->m_attributes[DD].m_no_of_varsize == 0 ?
2189       1 : sizes[2+DD];
2190 
2191     if (ERROR_INSERTED(4021))
2192     {
2193       terrorCode = 1601;
2194       goto disk_prealloc_error;
2195     }
2196 
2197     if (!Local_key::isShort(frag_page_id))
2198     {
2199       jam();
2200       terrorCode = 1603;
2201       goto disk_prealloc_error;
2202     }
2203 
2204     int ret= disk_page_prealloc(signal, fragPtr, &tmp, size);
2205     if (unlikely(ret < 0))
2206     {
2207       jam();
2208       terrorCode = -ret;
2209       goto disk_prealloc_error;
2210     }
2211 
2212     regOperPtr.p->op_struct.bit_field.m_disk_preallocated= 1;
2213     tmp.m_page_idx= size;
2214     memcpy(tuple_ptr->get_disk_ref_ptr(regTabPtr), &tmp, sizeof(tmp));
2215 
2216     /**
2217      * Set ref from disk to mm
2218      */
2219     Local_key ref = regOperPtr.p->m_tuple_location;
2220     ref.m_page_no = frag_page_id;
2221 
2222     Tuple_header* disk_ptr= req_struct->m_disk_ptr;
2223     disk_ptr->m_header_bits = 0;
2224     disk_ptr->m_base_record_ref= ref.ref();
2225   }
2226 
2227   if (req_struct->m_reorg != ScanFragReq::REORG_ALL)
2228   {
2229     handle_reorg(req_struct, regFragPtr->fragStatus);
2230   }
2231 
2232   /* Have been successful with disk + mem, update ACC to point to
2233    * new record if necessary
2234    * Failures in disk alloc will skip this part
2235    */
2236   if (update_acc)
2237   {
2238     /* Acc stores the local key with the frag_page_id rather
2239      * than the real_page_id
2240      */
2241     ndbassert(regOperPtr.p->m_tuple_location.m_page_no == real_page_id);
2242 
2243     Local_key accKey = regOperPtr.p->m_tuple_location;
2244     accKey.m_page_no = frag_page_id;
2245     ** accminupdateptr = accKey;
2246   }
2247   else
2248   {
2249     * accminupdateptr = 0; // No accminupdate should be performed
2250   }
2251 
2252   setChecksum(req_struct->m_tuple_ptr, regTabPtr);
2253 
2254   set_tuple_state(regOperPtr.p, TUPLE_PREPARED);
2255 
2256   return 0;
2257 
2258 size_change_error:
2259   jam();
2260   terrorCode = ZMEM_NOMEM_ERROR;
2261   goto exit_error;
2262 
2263 undo_buffer_error:
2264   jam();
2265   terrorCode= ZMEM_NOMEM_ERROR;
2266   regOperPtr.p->m_undo_buffer_space = 0;
2267   if (mem_insert)
2268     regOperPtr.p->m_tuple_location.setNull();
2269   regOperPtr.p->m_copy_tuple_location.setNull();
2270   tupkeyErrorLab(req_struct);
2271   return -1;
2272 
2273 null_check_error:
2274   jam();
2275   terrorCode= ZNO_ILLEGAL_NULL_ATTR;
2276   goto update_error;
2277 
2278 mem_error:
2279   jam();
2280   if (terrorCode == 0)
2281   {
2282     terrorCode= ZMEM_NOMEM_ERROR;
2283   }
2284   goto update_error;
2285 
2286 log_space_error:
2287   jam();
2288   regOperPtr.p->m_undo_buffer_space = 0;
2289 alloc_rowid_error:
2290   jam();
2291 update_error:
2292   jam();
2293   if (mem_insert)
2294   {
2295     regOperPtr.p->op_struct.bit_field.in_active_list = false;
2296     regOperPtr.p->m_tuple_location.setNull();
2297   }
2298 exit_error:
2299   if (!regOperPtr.p->m_tuple_location.isNull())
2300   {
2301     jam();
2302     /* Memory allocated, abort insert, releasing memory if appropriate */
2303     signal->theData[0] = regOperPtr.i;
2304     do_tup_abortreq(signal, ZSKIP_TUX_TRIGGERS | ZABORT_DEALLOC);
2305   }
2306   tupkeyErrorLab(req_struct);
2307   return -1;
2308 
2309 disk_prealloc_error:
2310   jam();
2311   base->m_header_bits |= Tuple_header::FREED;
2312   setInvalidChecksum(base, regTabPtr);
2313   goto exit_error;
2314 }
2315 
2316 /* ---------------------------------------------------------------- */
2317 /* ---------------------------- DELETE ---------------------------- */
2318 /* ---------------------------------------------------------------- */
handleDeleteReq(Signal * signal,Operationrec * regOperPtr,Fragrecord * regFragPtr,Tablerec * regTabPtr,KeyReqStruct * req_struct,bool disk)2319 int Dbtup::handleDeleteReq(Signal* signal,
2320                            Operationrec* regOperPtr,
2321                            Fragrecord* regFragPtr,
2322                            Tablerec* regTabPtr,
2323                            KeyReqStruct *req_struct,
2324 			   bool disk)
2325 {
2326   Tuple_header* dst = alloc_copy_tuple(regTabPtr,
2327                                        &regOperPtr->m_copy_tuple_location);
2328   if (dst == 0) {
2329     terrorCode = ZMEM_NOMEM_ERROR;
2330     goto error;
2331   }
2332 
2333   // delete must set but not increment tupVersion
2334   if (!regOperPtr->is_first_operation())
2335   {
2336     jam();
2337     Operationrec* prevOp= req_struct->prevOpPtr.p;
2338     regOperPtr->op_struct.bit_field.tupVersion=
2339       prevOp->op_struct.bit_field.tupVersion;
2340     // make copy since previous op is committed before this one
2341     const Tuple_header* org = get_copy_tuple(&prevOp->m_copy_tuple_location);
2342     Uint32 len = regTabPtr->total_rec_size -
2343       Uint32(((Uint32*)dst) -
2344              get_copy_tuple_raw(&regOperPtr->m_copy_tuple_location));
2345     memcpy(dst, org, 4 * len);
2346     req_struct->m_tuple_ptr = dst;
2347   }
2348   else
2349   {
2350     regOperPtr->op_struct.bit_field.tupVersion=
2351       req_struct->m_tuple_ptr->get_tuple_version();
2352     if (regTabPtr->m_no_of_disk_attributes)
2353     {
2354       dst->m_header_bits = req_struct->m_tuple_ptr->m_header_bits;
2355       memcpy(dst->get_disk_ref_ptr(regTabPtr),
2356 	     req_struct->m_tuple_ptr->get_disk_ref_ptr(regTabPtr),
2357              sizeof(Local_key));
2358     }
2359   }
2360   req_struct->changeMask.set();
2361   set_change_mask_info(regTabPtr, get_change_mask_ptr(regTabPtr, dst));
2362 
2363   if(disk && regOperPtr->m_undo_buffer_space == 0)
2364   {
2365     jam();
2366     regOperPtr->op_struct.bit_field.m_wait_log_buffer = 1;
2367     regOperPtr->op_struct.bit_field.m_load_diskpage_on_commit = 1;
2368     Uint32 sz= regOperPtr->m_undo_buffer_space=
2369       (sizeof(Dbtup::Disk_undo::Free) >> 2) +
2370       regTabPtr->m_offsets[DD].m_fix_header_size - 1;
2371 
2372     D("Logfile_client - handleDeleteReq");
2373     Logfile_client lgman(this, c_lgman, regFragPtr->m_logfile_group_id);
2374     terrorCode= lgman.alloc_log_space(sz, jamBuffer());
2375     if(unlikely(terrorCode))
2376     {
2377       jam();
2378       regOperPtr->m_undo_buffer_space= 0;
2379       goto error;
2380     }
2381   }
2382 
2383   set_tuple_state(regOperPtr, TUPLE_PREPARED);
2384 
2385   if (req_struct->attrinfo_len == 0)
2386   {
2387     return 0;
2388   }
2389 
2390   if (regTabPtr->need_expand(disk))
2391   {
2392     prepare_read(req_struct, regTabPtr, disk);
2393   }
2394 
2395   {
2396     Uint32 RlogSize;
2397     int ret= handleReadReq(signal, regOperPtr, regTabPtr, req_struct);
2398     if (ret == 0 && (RlogSize= req_struct->log_size))
2399     {
2400       jam();
2401       sendLogAttrinfo(signal, req_struct, RlogSize, regOperPtr);
2402     }
2403     return ret;
2404   }
2405 
2406 error:
2407   tupkeyErrorLab(req_struct);
2408   return -1;
2409 }
2410 
/**
 * Handle a REFRESH request on a tuple.
 *
 * Reuses the insert/update code paths to fabricate a row transition that
 * SUMA's detached triggers can observe, temporarily patching the shared
 * Tablerec metadata (disk attribute count, default value location,
 * not-null mask) around the nested call and restoring it afterwards.
 * The chosen scenario is recorded in the copy tuple location's file_no.
 *
 * Returns 0 on success, -1 on error.
 */
int
Dbtup::handleRefreshReq(Signal* signal,
                        Ptr<Operationrec> regOperPtr,
                        Ptr<Fragrecord>  regFragPtr,
                        Tablerec* regTabPtr,
                        KeyReqStruct *req_struct,
                        bool disk)
{
  /* Here we setup the tuple so that a transition to its current
   * state can be observed by SUMA's detached triggers.
   *
   * If the tuple does not exist then we fabricate a tuple
   * so that it can appear to be 'deleted'.
   *   The fabricated tuple may have invalid NULL values etc.
   * If the tuple does exist then we fabricate a null-change
   * update to the tuple.
   *
   * The logic differs depending on whether there are already
   * other operations on the tuple in this transaction.
   * No other operations (including Refresh) are allowed after
   * a refresh.
   */
  Uint32 refresh_case;
  if (regOperPtr.p->is_first_operation())
  {
    jam();
    if (Local_key::isInvalid(req_struct->frag_page_id,
                             regOperPtr.p->m_tuple_location.m_page_idx))
    {
      jam();
      refresh_case = Operationrec::RF_SINGLE_NOT_EXIST;
      //ndbout_c("case 1");
      /**
       * This is refresh of non-existing tuple...
       *   i.e "delete", reuse initial insert
       */
       Local_key accminupdate;
       Local_key * accminupdateptr = &accminupdate;

       /**
        * We don't need ...in this scenario
        * - disk
        * - default values
        */
       Uint32 save_disk = regTabPtr->m_no_of_disk_attributes;
       Local_key save_defaults = regTabPtr->m_default_value_location;
       Bitmask<MAXNROFATTRIBUTESINWORDS> save_mask =
         regTabPtr->notNullAttributeMask;

       /* Temporarily patch table metadata; restored after the insert */
       regTabPtr->m_no_of_disk_attributes = 0;
       regTabPtr->m_default_value_location.setNull();
       regOperPtr.p->op_type = ZINSERT;

       /**
        * Update notNullAttributeMask  to only include primary keys
        */
       regTabPtr->notNullAttributeMask.clear();
       const Uint32 * primarykeys =
         (Uint32*)&tableDescriptor[regTabPtr->readKeyArray].tabDescr;
       for (Uint32 i = 0; i<regTabPtr->noOfKeyAttr; i++)
         regTabPtr->notNullAttributeMask.set(primarykeys[i] >> 16);

       int res = handleInsertReq(signal, regOperPtr,
                                 regFragPtr, regTabPtr, req_struct,
                                 &accminupdateptr);

       /* Restore the patched table metadata */
       regTabPtr->m_no_of_disk_attributes = save_disk;
       regTabPtr->m_default_value_location = save_defaults;
       regTabPtr->notNullAttributeMask = save_mask;

       if (unlikely(res == -1))
       {
         return -1;
       }

       regOperPtr.p->op_type = ZREFRESH;

       if (accminupdateptr)
       {
       /**
          * Update ACC local-key, once *everything* has completed succesfully
          */
         c_lqh->accminupdate(signal,
                             regOperPtr.p->userpointer,
                             accminupdateptr);
       }
    }
    else
    {
      refresh_case = Operationrec::RF_SINGLE_EXIST;
      //ndbout_c("case 2");
      jam();

      Tuple_header* origTuple = req_struct->m_tuple_ptr;
      Uint32 tup_version_save = origTuple->get_tuple_version();
      {
        /* Set new row version and update the tuple header */
        Uint32 old_header = origTuple->m_header_bits;
        Uint32 new_tup_version = decr_tup_version(tup_version_save);
        origTuple->set_tuple_version(new_tup_version);
        Uint32 new_header = origTuple->m_header_bits;
        updateChecksum(origTuple, regTabPtr, old_header, new_header);
      }
      int res = handleUpdateReq(signal, regOperPtr.p, regFragPtr.p,
                                regTabPtr, req_struct, disk);

      /* Now we must reset the original tuple header back
       * to the original version.
       * The copy tuple will have the correct version due to
       * the update incrementing it.
       * On commit, the tuple becomes the copy tuple.
       * On abort, the original tuple remains.  If we don't
       * reset it here, then aborts cause the version to
       * decrease
       *
       * We also need to recalculate checksum since we're changing the
       * row here.
       */
      {
        Uint32 old_header = origTuple->m_header_bits;
        origTuple->set_tuple_version(tup_version_save);
        Uint32 new_header = origTuple->m_header_bits;
        updateChecksum(origTuple, regTabPtr, old_header, new_header);
      }
      if (res == -1)
        return -1;
    }
  }
  else
  {
    /* Not first operation on tuple in transaction */
    jam();

    /* Decrement the version seen by the nested op; restored at the end */
    Uint32 tup_version_save =
      req_struct->prevOpPtr.p->op_struct.bit_field.tupVersion;
    Uint32 new_tup_version = decr_tup_version(tup_version_save);
    req_struct->prevOpPtr.p->op_struct.bit_field.tupVersion = new_tup_version;

    int res;
    if (req_struct->prevOpPtr.p->op_type == ZDELETE)
    {
      refresh_case = Operationrec::RF_MULTI_NOT_EXIST;
      //ndbout_c("case 3");

      jam();
      /**
       * We don't need ...in this scenario
       * - default values
       *
       * We keep disk attributes to avoid issues with 'insert'
       */
      Local_key save_defaults = regTabPtr->m_default_value_location;
      Bitmask<MAXNROFATTRIBUTESINWORDS> save_mask =
        regTabPtr->notNullAttributeMask;

      regTabPtr->m_default_value_location.setNull();
      regOperPtr.p->op_type = ZINSERT;

      /**
       * Update notNullAttributeMask  to only include primary keys
       */
      regTabPtr->notNullAttributeMask.clear();
      const Uint32 * primarykeys =
        (Uint32*)&tableDescriptor[regTabPtr->readKeyArray].tabDescr;
      for (Uint32 i = 0; i<regTabPtr->noOfKeyAttr; i++)
        regTabPtr->notNullAttributeMask.set(primarykeys[i] >> 16);

      /**
       * This is multi-update + DELETE + REFRESH
       */
      Local_key * accminupdateptr = 0;
      res = handleInsertReq(signal, regOperPtr,
                            regFragPtr, regTabPtr, req_struct,
                            &accminupdateptr);

      /* Restore the patched table metadata */
      regTabPtr->m_default_value_location = save_defaults;
      regTabPtr->notNullAttributeMask = save_mask;

      if (unlikely(res == -1))
      {
        return -1;
      }

      regOperPtr.p->op_type = ZREFRESH;
    }
    else
    {
      jam();
      refresh_case = Operationrec::RF_MULTI_EXIST;
      //ndbout_c("case 4");
      /**
       * This is multi-update + INSERT/UPDATE + REFRESH
       */
      res = handleUpdateReq(signal, regOperPtr.p, regFragPtr.p,
                            regTabPtr, req_struct, disk);
    }
    req_struct->prevOpPtr.p->op_struct.bit_field.tupVersion = tup_version_save;
    if (res == -1)
      return -1;
  }

  /* Store the refresh scenario in the copy tuple location */
  // TODO : Verify this is never used as a copy tuple location!
  regOperPtr.p->m_copy_tuple_location.m_file_no = refresh_case;
  return 0;
}
2617 
2618 bool
checkNullAttributes(KeyReqStruct * req_struct,Tablerec * regTabPtr)2619 Dbtup::checkNullAttributes(KeyReqStruct * req_struct,
2620                            Tablerec* regTabPtr)
2621 {
2622 // Implement checking of updating all not null attributes in an insert here.
2623   Bitmask<MAXNROFATTRIBUTESINWORDS> attributeMask;
2624   /*
2625    * The idea here is maybe that changeMask is not-null attributes
2626    * and must contain notNullAttributeMask.  But:
2627    *
2628    * 1. changeMask has all bits set on insert
2629    * 2. not-null is checked in each UpdateFunction
2630    * 3. the code below does not work except trivially due to 1.
2631    *
2632    * XXX remove or fix
2633    */
2634   attributeMask.clear();
2635   attributeMask.bitOR(req_struct->changeMask);
2636   attributeMask.bitAND(regTabPtr->notNullAttributeMask);
2637   attributeMask.bitXOR(regTabPtr->notNullAttributeMask);
2638   if (!attributeMask.isclear()) {
2639     return false;
2640   }
2641   return true;
2642 }
2643 
2644 /* ---------------------------------------------------------------- */
2645 /* THIS IS THE START OF THE INTERPRETED EXECUTION OF UPDATES. WE    */
2646 /* START BY LINKING ALL ATTRINFO'S IN A DOUBLY LINKED LIST (THEY ARE*/
2647 /* ALREADY IN A LINKED LIST). WE ALLOCATE A REGISTER MEMORY (EQUAL  */
2648 /* TO AN ATTRINFO RECORD). THE INTERPRETER GOES THROUGH FOUR  PHASES*/
2649 /* DURING THE FIRST PHASE IT IS ONLY ALLOWED TO READ ATTRIBUTES THAT*/
2650 /* ARE SENT TO THE CLIENT APPLICATION. DURING THE SECOND PHASE IT IS*/
2651 /* ALLOWED TO READ FROM ATTRIBUTES INTO REGISTERS, TO UPDATE        */
2652 /* ATTRIBUTES BASED ON EITHER A CONSTANT VALUE OR A REGISTER VALUE, */
2653 /* A DIVERSE SET OF OPERATIONS ON REGISTERS ARE AVAILABLE AS WELL.  */
2654 /* IT IS ALSO POSSIBLE TO PERFORM JUMPS WITHIN THE INSTRUCTIONS THAT*/
2655 /* BELONGS TO THE SECOND PHASE. ALSO SUBROUTINES CAN BE CALLED IN   */
2656 /* THIS PHASE. THE THIRD PHASE IS TO AGAIN READ ATTRIBUTES AND      */
2657 /* FINALLY THE FOURTH PHASE READS SELECTED REGISTERS AND SEND THEM  */
2658 /* TO THE CLIENT APPLICATION.                                       */
2659 /* THERE IS A FIFTH REGION WHICH CONTAINS SUBROUTINES CALLABLE FROM */
2660 /* THE INTERPRETER EXECUTION REGION.                                */
/* THE FIRST FIVE WORDS WILL GIVE THE LENGTH OF THE FIVE REGIONS    */
2662 /*                                                                  */
2663 /* THIS MEANS THAT FROM THE APPLICATIONS POINT OF VIEW THE DATABASE */
2664 /* CAN HANDLE SUBROUTINE CALLS WHERE THE CODE IS SENT IN THE REQUEST*/
2665 /* THE RETURN PARAMETERS ARE FIXED AND CAN EITHER BE GENERATED      */
2666 /* BEFORE THE EXECUTION OF THE ROUTINE OR AFTER.                    */
2667 /*                                                                  */
2668 /* IN LATER VERSIONS WE WILL ADD MORE THINGS LIKE THE POSSIBILITY   */
2669 /* TO ALLOCATE MEMORY AND USE THIS AS LOCAL STORAGE. IT IS ALSO     */
2670 /* IMAGINABLE TO HAVE SPECIAL ROUTINES THAT CAN PERFORM CERTAIN     */
2671 /* OPERATIONS ON BLOB'S DEPENDENT ON WHAT THE BLOB REPRESENTS.      */
2672 /*                                                                  */
2673 /*                                                                  */
2674 /*       -----------------------------------------                  */
2675 /*       +   INITIAL READ REGION                 +                  */
2676 /*       -----------------------------------------                  */
2677 /*       +   INTERPRETED EXECUTE  REGION         +                  */
2678 /*       -----------------------------------------                  */
2679 /*       +   FINAL UPDATE REGION                 +                  */
2680 /*       -----------------------------------------                  */
2681 /*       +   FINAL READ REGION                   +                  */
2682 /*       -----------------------------------------                  */
2683 /*       +   SUBROUTINE REGION                   +                  */
2684 /*       -----------------------------------------                  */
2685 /* ---------------------------------------------------------------- */
2686 /* ---------------------------------------------------------------- */
2687 /* ----------------- INTERPRETED EXECUTION  ----------------------- */
2688 /* ---------------------------------------------------------------- */
interpreterStartLab(Signal * signal,KeyReqStruct * req_struct)2689 int Dbtup::interpreterStartLab(Signal* signal,
2690                                KeyReqStruct *req_struct)
2691 {
2692   Operationrec * const regOperPtr = req_struct->operPtrP;
2693   int TnoDataRW;
2694   Uint32 RtotalLen, start_index, dstLen;
2695   Uint32 *dst;
2696 
2697   Uint32 RinitReadLen= cinBuffer[0];
2698   Uint32 RexecRegionLen= cinBuffer[1];
2699   Uint32 RfinalUpdateLen= cinBuffer[2];
2700   Uint32 RfinalRLen= cinBuffer[3];
2701   Uint32 RsubLen= cinBuffer[4];
2702 
2703   jam();
2704 
2705   Uint32 RattrinbufLen= req_struct->attrinfo_len;
2706   const BlockReference sendBref= req_struct->rec_blockref;
2707 
2708   const Uint32 node = refToNode(sendBref);
2709   if(node != 0 && node != getOwnNodeId()) {
2710     start_index= 25;
2711   } else {
2712     jam();
2713     /**
2714      * execute direct
2715      */
2716     start_index= 3;
2717   }
2718   dst= &signal->theData[start_index];
2719   dstLen= (MAX_READ / 4) - start_index;
2720 
2721   RtotalLen= RinitReadLen;
2722   RtotalLen += RexecRegionLen;
2723   RtotalLen += RfinalUpdateLen;
2724   RtotalLen += RfinalRLen;
2725   RtotalLen += RsubLen;
2726 
2727   Uint32 RattroutCounter= 0;
2728   Uint32 RinstructionCounter= 5;
2729 
2730   /* All information to be logged/propagated to replicas
2731    * is generated from here on so reset the log word count
2732    */
2733   Uint32 RlogSize= req_struct->log_size= 0;
2734   if (((RtotalLen + 5) == RattrinbufLen) &&
2735       (RattrinbufLen >= 5) &&
2736       (RattrinbufLen < ZATTR_BUFFER_SIZE)) {
2737     /* ---------------------------------------------------------------- */
2738     // We start by checking consistency. We must have the first five
2739     // words of the ATTRINFO to give us the length of the regions. The
2740     // size of these regions must be the same as the total ATTRINFO
2741     // length and finally the total length must be within the limits.
2742     /* ---------------------------------------------------------------- */
2743 
2744     if (RinitReadLen > 0) {
2745       jam();
2746       /* ---------------------------------------------------------------- */
2747       // The first step that can be taken in the interpreter is to read
2748       // data of the tuple before any updates have been applied.
2749       /* ---------------------------------------------------------------- */
2750       TnoDataRW= readAttributes(req_struct,
2751 				 &cinBuffer[5],
2752 				 RinitReadLen,
2753 				 &dst[0],
2754 				 dstLen,
2755                                  false);
2756       if (TnoDataRW >= 0) {
2757 	RattroutCounter= TnoDataRW;
2758 	RinstructionCounter += RinitReadLen;
2759       } else {
2760 	jam();
2761         terrorCode = Uint32(-TnoDataRW);
2762 	tupkeyErrorLab(req_struct);
2763 	return -1;
2764       }
2765     }
2766     if (RexecRegionLen > 0) {
2767       jam();
2768       /* ---------------------------------------------------------------- */
2769       // The next step is the actual interpreted execution. This executes
2770       // a register-based virtual machine which can read and write attributes
2771       // to and from registers.
2772       /* ---------------------------------------------------------------- */
2773       Uint32 RsubPC= RinstructionCounter + RexecRegionLen
2774         + RfinalUpdateLen + RfinalRLen;
2775       TnoDataRW= interpreterNextLab(signal,
2776                                      req_struct,
2777 				     &clogMemBuffer[0],
2778 				     &cinBuffer[RinstructionCounter],
2779 				     RexecRegionLen,
2780 				     &cinBuffer[RsubPC],
2781 				     RsubLen,
2782 				     &coutBuffer[0],
2783 				     sizeof(coutBuffer) / 4);
2784       if (TnoDataRW != -1) {
2785 	RinstructionCounter += RexecRegionLen;
2786 	RlogSize= TnoDataRW;
2787       } else {
2788 	jam();
2789 	/**
2790 	 * TUPKEY REF is sent from within interpreter
2791 	 */
2792 	return -1;
2793       }
2794     }
2795 
2796     if ((RlogSize > 0) ||
2797         (RfinalUpdateLen > 0))
2798     {
2799       /* Operation updates row,
2800        * reset author pseudo-col before update takes effect
2801        * This should probably occur only if the interpreted program
2802        * did not explicitly write the value, but that requires a bit
2803        * to record whether the value has been written.
2804        */
2805       Tablerec* regTabPtr = req_struct->tablePtrP;
2806       Tuple_header* dst = req_struct->m_tuple_ptr;
2807 
2808       if (regTabPtr->m_bits & Tablerec::TR_ExtraRowAuthorBits)
2809       {
2810         Uint32 attrId =
2811           regTabPtr->getExtraAttrId<Tablerec::TR_ExtraRowAuthorBits>();
2812 
2813         store_extra_row_bits(attrId, regTabPtr, dst, /* default */ 0, false);
2814       }
2815     }
2816 
2817     if (RfinalUpdateLen > 0) {
2818       jam();
2819       /* ---------------------------------------------------------------- */
2820       // We can also apply a set of updates without any conditions as part
2821       // of the interpreted execution.
2822       /* ---------------------------------------------------------------- */
2823       if (regOperPtr->op_type == ZUPDATE) {
2824 	TnoDataRW= updateAttributes(req_struct,
2825 				     &cinBuffer[RinstructionCounter],
2826 				     RfinalUpdateLen);
2827 	if (TnoDataRW >= 0) {
2828 	  MEMCOPY_NO_WORDS(&clogMemBuffer[RlogSize],
2829 			   &cinBuffer[RinstructionCounter],
2830 			   RfinalUpdateLen);
2831 	  RinstructionCounter += RfinalUpdateLen;
2832 	  RlogSize += RfinalUpdateLen;
2833 	} else {
2834 	  jam();
2835           terrorCode = Uint32(-TnoDataRW);
2836 	  tupkeyErrorLab(req_struct);
2837 	  return -1;
2838 	}
2839       } else {
2840 	return TUPKEY_abort(req_struct, 19);
2841       }
2842     }
2843     if (RfinalRLen > 0) {
2844       jam();
2845       /* ---------------------------------------------------------------- */
2846       // The final action is that we can also read the tuple after it has
2847       // been updated.
2848       /* ---------------------------------------------------------------- */
2849       TnoDataRW= readAttributes(req_struct,
2850 				 &cinBuffer[RinstructionCounter],
2851 				 RfinalRLen,
2852 				 &dst[RattroutCounter],
2853 				 (dstLen - RattroutCounter),
2854                                  false);
2855       if (TnoDataRW >= 0) {
2856 	RattroutCounter += TnoDataRW;
2857       } else {
2858 	jam();
2859         terrorCode = Uint32(-TnoDataRW);
2860 	tupkeyErrorLab(req_struct);
2861 	return -1;
2862       }
2863     }
2864     /* Add log words explicitly generated here to existing log size
2865      *  - readAttributes can generate log for ANYVALUE column
2866      *    It adds the words directly to req_struct->log_size
2867      *    This is used for ANYVALUE and interpreted delete.
2868      */
2869     req_struct->log_size+= RlogSize;
2870     req_struct->read_length += RattroutCounter;
2871     sendReadAttrinfo(signal, req_struct, RattroutCounter);
2872     if (RlogSize > 0) {
2873       return sendLogAttrinfo(signal, req_struct, RlogSize, regOperPtr);
2874     }
2875     return 0;
2876   } else {
2877     return TUPKEY_abort(req_struct, 22);
2878   }
2879 }
2880 
2881 /* ---------------------------------------------------------------- */
2882 /*       WHEN EXECUTION IS INTERPRETED WE NEED TO SEND SOME ATTRINFO*/
2883 /*       BACK TO LQH FOR LOGGING AND SENDING TO BACKUP AND STANDBY  */
2884 /*       NODES.                                                     */
2885 /*       INPUT:  LOG_ATTRINFOPTR         WHERE TO FETCH DATA FROM   */
2886 /*               TLOG_START              FIRST INDEX TO LOG         */
2887 /*               TLOG_END                LAST INDEX + 1 TO LOG      */
2888 /* ---------------------------------------------------------------- */
sendLogAttrinfo(Signal * signal,KeyReqStruct * req_struct,Uint32 TlogSize,Operationrec * const regOperPtr)2889 int Dbtup::sendLogAttrinfo(Signal* signal,
2890                            KeyReqStruct * req_struct,
2891                            Uint32 TlogSize,
2892                            Operationrec *  const regOperPtr)
2893 
2894 {
2895   /* Copy from Log buffer to segmented section,
2896    * then attach to ATTRINFO and execute direct
2897    * to LQH
2898    */
2899   ndbrequire( TlogSize > 0 );
2900   Uint32 longSectionIVal= RNIL;
2901   bool ok= appendToSection(longSectionIVal,
2902                            &clogMemBuffer[0],
2903                            TlogSize);
2904   if (unlikely(!ok))
2905   {
2906     /* Resource error, abort transaction */
2907     terrorCode = ZSEIZE_ATTRINBUFREC_ERROR;
2908     tupkeyErrorLab(req_struct);
2909     return -1;
2910   }
2911 
2912   /* Send a TUP_ATTRINFO signal to LQH, which contains
2913    * the relevant user pointer and the attrinfo section's
2914    * IVAL
2915    */
2916   signal->theData[0]= regOperPtr->userpointer;
2917   signal->theData[1]= TlogSize;
2918   signal->theData[2]= longSectionIVal;
2919 
2920   EXECUTE_DIRECT(DBLQH,
2921                  GSN_TUP_ATTRINFO,
2922                  signal,
2923                  3);
2924   return 0;
2925 }
2926 
2927 inline
2928 Uint32
brancher(Uint32 TheInstruction,Uint32 TprogramCounter)2929 Dbtup::brancher(Uint32 TheInstruction, Uint32 TprogramCounter)
2930 {
2931   Uint32 TbranchDirection= TheInstruction >> 31;
2932   Uint32 TbranchLength= (TheInstruction >> 16) & 0x7fff;
2933   TprogramCounter--;
2934   if (TbranchDirection == 1) {
2935     jam();
2936     /* ---------------------------------------------------------------- */
2937     /*       WE JUMP BACKWARDS.                                         */
2938     /* ---------------------------------------------------------------- */
2939     return (TprogramCounter - TbranchLength);
2940   } else {
2941     jam();
2942     /* ---------------------------------------------------------------- */
2943     /*       WE JUMP FORWARD.                                           */
2944     /* ---------------------------------------------------------------- */
2945     return (TprogramCounter + TbranchLength);
2946   }
2947 }
2948 
2949 const Uint32 *
lookupInterpreterParameter(Uint32 paramNo,const Uint32 * subptr,Uint32 sublen) const2950 Dbtup::lookupInterpreterParameter(Uint32 paramNo,
2951                                   const Uint32 * subptr,
2952                                   Uint32 sublen) const
2953 {
2954   /**
2955    * The parameters...are stored in the subroutine section
2956    *
2957    * WORD2         WORD3       WORD4         WORD5
2958    * [ P0 HEADER ] [ P0 DATA ] [ P1 HEADER ] [ P1 DATA ]
2959    *
2960    *
2961    * len=4 <=> 1 word
2962    */
2963   Uint32 pos = 0;
2964   while (paramNo)
2965   {
2966     const Uint32 * head = subptr + pos;
2967     Uint32 len = AttributeHeader::getDataSize(* head);
2968     paramNo --;
2969     pos += 1 + len;
2970     if (unlikely(pos >= sublen))
2971       return 0;
2972   }
2973 
2974   const Uint32 * head = subptr + pos;
2975   Uint32 len = AttributeHeader::getDataSize(* head);
2976   if (unlikely(pos + 1 + len > sublen))
2977     return 0;
2978 
2979   return head;
2980 }
2981 
interpreterNextLab(Signal * signal,KeyReqStruct * req_struct,Uint32 * logMemory,Uint32 * mainProgram,Uint32 TmainProgLen,Uint32 * subroutineProg,Uint32 TsubroutineLen,Uint32 * tmpArea,Uint32 tmpAreaSz)2982 int Dbtup::interpreterNextLab(Signal* signal,
2983                               KeyReqStruct* req_struct,
2984                               Uint32* logMemory,
2985                               Uint32* mainProgram,
2986                               Uint32 TmainProgLen,
2987                               Uint32* subroutineProg,
2988                               Uint32 TsubroutineLen,
2989 			      Uint32 * tmpArea,
2990 			      Uint32 tmpAreaSz)
2991 {
2992   register Uint32* TcurrentProgram= mainProgram;
2993   register Uint32 TcurrentSize= TmainProgLen;
2994   register Uint32 TprogramCounter= 0;
2995   register Uint32 theInstruction;
2996   register Uint32 theRegister;
2997   Uint32 TdataWritten= 0;
2998   Uint32 RstackPtr= 0;
2999   union {
3000     Uint32 TregMemBuffer[32];
3001     Uint64 align[16];
3002   };
3003   (void)align; // kill warning
3004   Uint32 TstackMemBuffer[32];
3005 
3006   Uint32& RnoOfInstructions = req_struct->no_exec_instructions;
3007   ndbassert(RnoOfInstructions == 0);
3008   /* ---------------------------------------------------------------- */
3009   // Initialise all 8 registers to contain the NULL value.
3010   // In this version we can handle 32 and 64 bit unsigned integers.
3011   // They are handled as 64 bit values. Thus the 32 most significant
3012   // bits are zeroed for 32 bit values.
3013   /* ---------------------------------------------------------------- */
3014   TregMemBuffer[0]= 0;
3015   TregMemBuffer[4]= 0;
3016   TregMemBuffer[8]= 0;
3017   TregMemBuffer[12]= 0;
3018   TregMemBuffer[16]= 0;
3019   TregMemBuffer[20]= 0;
3020   TregMemBuffer[24]= 0;
3021   TregMemBuffer[28]= 0;
3022   Uint32 tmpHabitant= ~0;
3023 
3024   while (RnoOfInstructions < 8000) {
3025     /* ---------------------------------------------------------------- */
3026     /* EXECUTE THE NEXT INTERPRETER INSTRUCTION.                        */
3027     /* ---------------------------------------------------------------- */
3028     RnoOfInstructions++;
3029     theInstruction= TcurrentProgram[TprogramCounter];
3030     theRegister= Interpreter::getReg1(theInstruction) << 2;
3031 #ifdef TRACE_INTERPRETER
3032     ndbout_c("Interpreter : RnoOfInstructions : %u.  TprogramCounter : %u.  Opcode : %u",
3033              RnoOfInstructions, TprogramCounter, Interpreter::getOpCode(theInstruction));
3034 #endif
3035     if (TprogramCounter < TcurrentSize) {
3036       TprogramCounter++;
3037       switch (Interpreter::getOpCode(theInstruction)) {
3038       case Interpreter::READ_ATTR_INTO_REG:
3039 	jam();
3040 	/* ---------------------------------------------------------------- */
3041 	// Read an attribute from the tuple into a register.
3042 	// While reading an attribute we allow the attribute to be an array
3043 	// as long as it fits in the 64 bits of the register.
3044 	/* ---------------------------------------------------------------- */
3045 	{
3046 	  Uint32 theAttrinfo= theInstruction;
3047 	  int TnoDataRW= readAttributes(req_struct,
3048 				     &theAttrinfo,
3049 				     (Uint32)1,
3050 				     &TregMemBuffer[theRegister],
3051 				     (Uint32)3,
3052                                      false);
3053 	  if (TnoDataRW == 2) {
3054 	    /* ------------------------------------------------------------- */
3055 	    // Two words read means that we get the instruction plus one 32
3056 	    // word read. Thus we set the register to be a 32 bit register.
3057 	    /* ------------------------------------------------------------- */
3058 	    TregMemBuffer[theRegister]= 0x50;
3059             // arithmetic conversion if big-endian
3060             * (Int64*)(TregMemBuffer+theRegister+2)= TregMemBuffer[theRegister+1];
3061 	  } else if (TnoDataRW == 3) {
3062 	    /* ------------------------------------------------------------- */
3063 	    // Three words read means that we get the instruction plus two
3064 	    // 32 words read. Thus we set the register to be a 64 bit register.
3065 	    /* ------------------------------------------------------------- */
3066 	    TregMemBuffer[theRegister]= 0x60;
3067             TregMemBuffer[theRegister+3]= TregMemBuffer[theRegister+2];
3068             TregMemBuffer[theRegister+2]= TregMemBuffer[theRegister+1];
3069 	  } else if (TnoDataRW == 1) {
3070 	    /* ------------------------------------------------------------- */
3071 	    // One word read means that we must have read a NULL value. We set
3072 	    // the register to indicate a NULL value.
3073 	    /* ------------------------------------------------------------- */
3074 	    TregMemBuffer[theRegister]= 0;
3075 	    TregMemBuffer[theRegister + 2]= 0;
3076 	    TregMemBuffer[theRegister + 3]= 0;
3077 	  } else if (TnoDataRW < 0) {
3078 	    jam();
3079             terrorCode = Uint32(-TnoDataRW);
3080 	    tupkeyErrorLab(req_struct);
3081 	    return -1;
3082 	  } else {
3083 	    /* ------------------------------------------------------------- */
3084 	    // Any other return value from the read attribute here is not
3085 	    // allowed and will lead to a system crash.
3086 	    /* ------------------------------------------------------------- */
3087 	    ndbrequire(false);
3088 	  }
3089 	  break;
3090 	}
3091 
3092       case Interpreter::WRITE_ATTR_FROM_REG:
3093 	jam();
3094 	{
3095 	  Uint32 TattrId= theInstruction >> 16;
3096 	  Uint32 TattrDescrIndex= req_struct->tablePtrP->tabDescriptor +
3097 	    (TattrId << ZAD_LOG_SIZE);
3098 	  Uint32 TattrDesc1= tableDescriptor[TattrDescrIndex].tabDescr;
3099 	  Uint32 TregType= TregMemBuffer[theRegister];
3100 
3101 	  /* --------------------------------------------------------------- */
3102 	  // Calculate the number of words of this attribute.
3103 	  // We allow writes into arrays as long as they fit into the 64 bit
3104 	  // register size.
3105 	  /* --------------------------------------------------------------- */
3106           Uint32 TattrNoOfWords = AttributeDescriptor::getSizeInWords(TattrDesc1);
3107 	  Uint32 Toptype = req_struct->operPtrP->op_type;
3108 	  Uint32 TdataForUpdate[3];
3109 	  Uint32 Tlen;
3110 
3111 	  AttributeHeader ah(TattrId, TattrNoOfWords << 2);
3112           TdataForUpdate[0]= ah.m_value;
3113 	  TdataForUpdate[1]= TregMemBuffer[theRegister + 2];
3114 	  TdataForUpdate[2]= TregMemBuffer[theRegister + 3];
3115 	  Tlen= TattrNoOfWords + 1;
3116 	  if (Toptype == ZUPDATE) {
3117 	    if (TattrNoOfWords <= 2) {
3118               if (TattrNoOfWords == 1) {
3119                 // arithmetic conversion if big-endian
3120                 Int64 * tmp = new (&TregMemBuffer[theRegister + 2]) Int64;
3121                 TdataForUpdate[1] = Uint32(* tmp);
3122                 TdataForUpdate[2] = 0;
3123               }
3124 	      if (TregType == 0) {
3125 		/* --------------------------------------------------------- */
3126 		// Write a NULL value into the attribute
3127 		/* --------------------------------------------------------- */
3128 		ah.setNULL();
3129                 TdataForUpdate[0]= ah.m_value;
3130 		Tlen= 1;
3131 	      }
3132 	      int TnoDataRW= updateAttributes(req_struct,
3133 					   &TdataForUpdate[0],
3134 					   Tlen);
3135 	      if (TnoDataRW >= 0) {
3136 		/* --------------------------------------------------------- */
3137 		// Write the written data also into the log buffer so that it
3138 		// will be logged.
3139 		/* --------------------------------------------------------- */
3140 		logMemory[TdataWritten + 0]= TdataForUpdate[0];
3141 		logMemory[TdataWritten + 1]= TdataForUpdate[1];
3142 		logMemory[TdataWritten + 2]= TdataForUpdate[2];
3143 		TdataWritten += Tlen;
3144 	      } else {
3145                 terrorCode = Uint32(-TnoDataRW);
3146 		tupkeyErrorLab(req_struct);
3147 		return -1;
3148 	      }
3149 	    } else {
3150 	      return TUPKEY_abort(req_struct, 15);
3151 	    }
3152 	  } else {
3153 	    return TUPKEY_abort(req_struct, 16);
3154 	  }
3155 	  break;
3156 	}
3157 
3158       case Interpreter::LOAD_CONST_NULL:
3159 	jam();
3160 	TregMemBuffer[theRegister]= 0;	/* NULL INDICATOR */
3161 	break;
3162 
3163       case Interpreter::LOAD_CONST16:
3164 	jam();
3165 	TregMemBuffer[theRegister]= 0x50;	/* 32 BIT UNSIGNED CONSTANT */
3166 	* (Int64*)(TregMemBuffer+theRegister+2)= theInstruction >> 16;
3167 	break;
3168 
3169       case Interpreter::LOAD_CONST32:
3170 	jam();
3171 	TregMemBuffer[theRegister]= 0x50;	/* 32 BIT UNSIGNED CONSTANT */
3172 	* (Int64*)(TregMemBuffer+theRegister+2)= *
3173 	  (TcurrentProgram+TprogramCounter);
3174 	TprogramCounter++;
3175 	break;
3176 
3177       case Interpreter::LOAD_CONST64:
3178 	jam();
3179 	TregMemBuffer[theRegister]= 0x60;	/* 64 BIT UNSIGNED CONSTANT */
3180         TregMemBuffer[theRegister + 2 ]= * (TcurrentProgram +
3181                                              TprogramCounter++);
3182         TregMemBuffer[theRegister + 3 ]= * (TcurrentProgram +
3183                                              TprogramCounter++);
3184 	break;
3185 
3186       case Interpreter::ADD_REG_REG:
3187 	jam();
3188 	{
3189 	  Uint32 TrightRegister= Interpreter::getReg2(theInstruction) << 2;
3190 	  Uint32 TdestRegister= Interpreter::getReg3(theInstruction) << 2;
3191 
3192 	  Uint32 TrightType= TregMemBuffer[TrightRegister];
3193 	  Int64 Tright0= * (Int64*)(TregMemBuffer + TrightRegister + 2);
3194 
3195 
3196 	  Uint32 TleftType= TregMemBuffer[theRegister];
3197 	  Int64 Tleft0= * (Int64*)(TregMemBuffer + theRegister + 2);
3198 
3199 	  if ((TleftType | TrightType) != 0) {
3200 	    Uint64 Tdest0= Tleft0 + Tright0;
3201 	    * (Int64*)(TregMemBuffer+TdestRegister+2)= Tdest0;
3202 	    TregMemBuffer[TdestRegister]= 0x60;
3203 	  } else {
3204 	    return TUPKEY_abort(req_struct, 20);
3205 	  }
3206 	  break;
3207 	}
3208 
3209       case Interpreter::SUB_REG_REG:
3210 	jam();
3211 	{
3212 	  Uint32 TrightRegister= Interpreter::getReg2(theInstruction) << 2;
3213 	  Uint32 TdestRegister= Interpreter::getReg3(theInstruction) << 2;
3214 
3215 	  Uint32 TrightType= TregMemBuffer[TrightRegister];
3216 	  Int64 Tright0= * (Int64*)(TregMemBuffer + TrightRegister + 2);
3217 
3218 	  Uint32 TleftType= TregMemBuffer[theRegister];
3219 	  Int64 Tleft0= * (Int64*)(TregMemBuffer + theRegister + 2);
3220 
3221 	  if ((TleftType | TrightType) != 0) {
3222 	    Int64 Tdest0= Tleft0 - Tright0;
3223 	    * (Int64*)(TregMemBuffer+TdestRegister+2)= Tdest0;
3224 	    TregMemBuffer[TdestRegister]= 0x60;
3225 	  } else {
3226 	    return TUPKEY_abort(req_struct, 20);
3227 	  }
3228 	  break;
3229 	}
3230 
3231       case Interpreter::BRANCH:
3232 	TprogramCounter= brancher(theInstruction, TprogramCounter);
3233 	break;
3234 
3235       case Interpreter::BRANCH_REG_EQ_NULL:
3236 	if (TregMemBuffer[theRegister] != 0) {
3237 	  jam();
3238 	  continue;
3239 	} else {
3240 	  jam();
3241 	  TprogramCounter= brancher(theInstruction, TprogramCounter);
3242 	}
3243 	break;
3244 
3245       case Interpreter::BRANCH_REG_NE_NULL:
3246 	if (TregMemBuffer[theRegister] == 0) {
3247 	  jam();
3248 	  continue;
3249 	} else {
3250 	  jam();
3251 	  TprogramCounter= brancher(theInstruction, TprogramCounter);
3252 	}
3253 	break;
3254 
3255 
3256       case Interpreter::BRANCH_EQ_REG_REG:
3257 	{
3258 	  Uint32 TrightRegister= Interpreter::getReg2(theInstruction) << 2;
3259 
3260 	  Uint32 TleftType= TregMemBuffer[theRegister];
3261 	  Uint32 Tleft0= TregMemBuffer[theRegister + 2];
3262 	  Uint32 Tleft1= TregMemBuffer[theRegister + 3];
3263 
3264 	  Uint32 TrightType= TregMemBuffer[TrightRegister];
3265 	  Uint32 Tright0= TregMemBuffer[TrightRegister + 2];
3266 	  Uint32 Tright1= TregMemBuffer[TrightRegister + 3];
3267 	  if ((TrightType | TleftType) != 0) {
3268 	    jam();
3269 	    if ((Tleft0 == Tright0) && (Tleft1 == Tright1)) {
3270 	      TprogramCounter= brancher(theInstruction, TprogramCounter);
3271 	    }
3272 	  } else {
3273 	    return TUPKEY_abort(req_struct, 23);
3274 	  }
3275 	  break;
3276 	}
3277 
3278       case Interpreter::BRANCH_NE_REG_REG:
3279 	{
3280 	  Uint32 TrightRegister= Interpreter::getReg2(theInstruction) << 2;
3281 
3282 	  Uint32 TleftType= TregMemBuffer[theRegister];
3283 	  Uint32 Tleft0= TregMemBuffer[theRegister + 2];
3284 	  Uint32 Tleft1= TregMemBuffer[theRegister + 3];
3285 
3286 	  Uint32 TrightType= TregMemBuffer[TrightRegister];
3287 	  Uint32 Tright0= TregMemBuffer[TrightRegister + 2];
3288 	  Uint32 Tright1= TregMemBuffer[TrightRegister + 3];
3289 	  if ((TrightType | TleftType) != 0) {
3290 	    jam();
3291 	    if ((Tleft0 != Tright0) || (Tleft1 != Tright1)) {
3292 	      TprogramCounter= brancher(theInstruction, TprogramCounter);
3293 	    }
3294 	  } else {
3295 	    return TUPKEY_abort(req_struct, 24);
3296 	  }
3297 	  break;
3298 	}
3299 
3300       case Interpreter::BRANCH_LT_REG_REG:
3301 	{
3302 	  Uint32 TrightRegister= Interpreter::getReg2(theInstruction) << 2;
3303 
3304 	  Uint32 TrightType= TregMemBuffer[TrightRegister];
3305 	  Int64 Tright0= * (Int64*)(TregMemBuffer + TrightRegister + 2);
3306 
3307 	  Uint32 TleftType= TregMemBuffer[theRegister];
3308 	  Int64 Tleft0= * (Int64*)(TregMemBuffer + theRegister + 2);
3309 
3310 
3311 	  if ((TrightType | TleftType) != 0) {
3312 	    jam();
3313 	    if (Tleft0 < Tright0) {
3314 	      TprogramCounter= brancher(theInstruction, TprogramCounter);
3315 	    }
3316 	  } else {
3317 	    return TUPKEY_abort(req_struct, 24);
3318 	  }
3319 	  break;
3320 	}
3321 
3322       case Interpreter::BRANCH_LE_REG_REG:
3323 	{
3324 	  Uint32 TrightRegister= Interpreter::getReg2(theInstruction) << 2;
3325 
3326 	  Uint32 TrightType= TregMemBuffer[TrightRegister];
3327 	  Int64 Tright0= * (Int64*)(TregMemBuffer + TrightRegister + 2);
3328 
3329 	  Uint32 TleftType= TregMemBuffer[theRegister];
3330 	  Int64 Tleft0= * (Int64*)(TregMemBuffer + theRegister + 2);
3331 
3332 
3333 	  if ((TrightType | TleftType) != 0) {
3334 	    jam();
3335 	    if (Tleft0 <= Tright0) {
3336 	      TprogramCounter= brancher(theInstruction, TprogramCounter);
3337 	    }
3338 	  } else {
3339 	    return TUPKEY_abort(req_struct, 26);
3340 	  }
3341 	  break;
3342 	}
3343 
3344       case Interpreter::BRANCH_GT_REG_REG:
3345 	{
3346 	  Uint32 TrightRegister= Interpreter::getReg2(theInstruction) << 2;
3347 
3348 	  Uint32 TrightType= TregMemBuffer[TrightRegister];
3349 	  Int64 Tright0= * (Int64*)(TregMemBuffer + TrightRegister + 2);
3350 
3351 	  Uint32 TleftType= TregMemBuffer[theRegister];
3352 	  Int64 Tleft0= * (Int64*)(TregMemBuffer + theRegister + 2);
3353 
3354 
3355 	  if ((TrightType | TleftType) != 0) {
3356 	    jam();
3357 	    if (Tleft0 > Tright0){
3358 	      TprogramCounter= brancher(theInstruction, TprogramCounter);
3359 	    }
3360 	  } else {
3361 	    return TUPKEY_abort(req_struct, 27);
3362 	  }
3363 	  break;
3364 	}
3365 
3366       case Interpreter::BRANCH_GE_REG_REG:
3367 	{
3368 	  Uint32 TrightRegister= Interpreter::getReg2(theInstruction) << 2;
3369 
3370 	  Uint32 TrightType= TregMemBuffer[TrightRegister];
3371 	  Int64 Tright0= * (Int64*)(TregMemBuffer + TrightRegister + 2);
3372 
3373 	  Uint32 TleftType= TregMemBuffer[theRegister];
3374 	  Int64 Tleft0= * (Int64*)(TregMemBuffer + theRegister + 2);
3375 
3376 
3377 	  if ((TrightType | TleftType) != 0) {
3378 	    jam();
3379 	    if (Tleft0 >= Tright0){
3380 	      TprogramCounter= brancher(theInstruction, TprogramCounter);
3381 	    }
3382 	  } else {
3383 	    return TUPKEY_abort(req_struct, 28);
3384 	  }
3385 	  break;
3386 	}
3387 
3388       case Interpreter::BRANCH_ATTR_OP_ARG_2:
3389       case Interpreter::BRANCH_ATTR_OP_ARG:{
3390 	jam();
3391 	Uint32 cond = Interpreter::getBinaryCondition(theInstruction);
3392 	Uint32 ins2 = TcurrentProgram[TprogramCounter];
3393 	Uint32 attrId = Interpreter::getBranchCol_AttrId(ins2) << 16;
3394 	Uint32 argLen = Interpreter::getBranchCol_Len(ins2);
3395         Uint32 step = argLen;
3396 
3397 	if(tmpHabitant != attrId){
3398 	  Int32 TnoDataR = readAttributes(req_struct,
3399 					  &attrId, 1,
3400 					  tmpArea, tmpAreaSz,
3401                                           false);
3402 
3403 	  if (TnoDataR < 0) {
3404 	    jam();
3405             terrorCode = Uint32(-TnoDataR);
3406 	    tupkeyErrorLab(req_struct);
3407 	    return -1;
3408 	  }
3409 	  tmpHabitant= attrId;
3410 	}
3411 
3412         // get type
3413 	attrId >>= 16;
3414 	Uint32 TattrDescrIndex = req_struct->tablePtrP->tabDescriptor +
3415 	  (attrId << ZAD_LOG_SIZE);
3416 	Uint32 TattrDesc1 = tableDescriptor[TattrDescrIndex].tabDescr;
3417 	Uint32 TattrDesc2 = tableDescriptor[TattrDescrIndex+1].tabDescr;
3418 	Uint32 typeId = AttributeDescriptor::getType(TattrDesc1);
3419 	void * cs = 0;
3420 	if(AttributeOffset::getCharsetFlag(TattrDesc2))
3421 	{
3422 	  Uint32 pos = AttributeOffset::getCharsetPos(TattrDesc2);
3423 	  cs = req_struct->tablePtrP->charsetArray[pos];
3424 	}
3425 	const NdbSqlUtil::Type& sqlType = NdbSqlUtil::getType(typeId);
3426 
3427         // get data
3428 	AttributeHeader ah(tmpArea[0]);
3429         const char* s1 = (char*)&tmpArea[1];
3430         const char* s2 = (char*)&TcurrentProgram[TprogramCounter+1];
3431         // fixed length in 5.0
3432 	Uint32 attrLen = AttributeDescriptor::getSizeInBytes(TattrDesc1);
3433 
3434         if (Interpreter::getOpCode(theInstruction) ==
3435             Interpreter::BRANCH_ATTR_OP_ARG_2)
3436         {
3437           jam();
3438           Uint32 paramNo = Interpreter::getBranchCol_ParamNo(ins2);
3439           const Uint32 * paramptr = lookupInterpreterParameter(paramNo,
3440                                                                subroutineProg,
3441                                                                TsubroutineLen);
3442           if (unlikely(paramptr == 0))
3443           {
3444             jam();
3445             terrorCode = 99; // TODO
3446             tupkeyErrorLab(req_struct);
3447             return -1;
3448           }
3449 
3450           argLen = AttributeHeader::getByteSize(* paramptr);
3451           step = 0;
3452           s2 = (char*)(paramptr + 1);
3453         }
3454 
3455         if (typeId == NDB_TYPE_BIT)
3456         {
3457           /* Size in bytes for bit fields can be incorrect due to
3458            * rounding down
3459            */
3460           Uint32 bitFieldAttrLen= (AttributeDescriptor::getArraySize(TattrDesc1)
3461                                    + 7) / 8;
3462           attrLen= bitFieldAttrLen;
3463         }
3464 
3465 	bool r1_null = ah.isNULL();
3466 	bool r2_null = argLen == 0;
3467 	int res1;
3468         if (cond <= Interpreter::GE)
3469         {
3470           /* Inequality - EQ, NE, LT, LE, GT, GE */
3471           if (r1_null || r2_null) {
3472             // NULL==NULL and NULL<not-NULL
3473             res1 = r1_null && r2_null ? 0 : r1_null ? -1 : 1;
3474           } else {
3475 	    jam();
3476 	    if (unlikely(sqlType.m_cmp == 0))
3477 	    {
3478 	      return TUPKEY_abort(req_struct, 40);
3479 	    }
3480             res1 = (*sqlType.m_cmp)(cs, s1, attrLen, s2, argLen);
3481           }
3482 	} else {
3483           if ((cond == Interpreter::LIKE) ||
3484               (cond == Interpreter::NOT_LIKE))
3485           {
3486             if (r1_null || r2_null) {
3487               // NULL like NULL is true (has no practical use)
3488               res1 =  r1_null && r2_null ? 0 : -1;
3489             } else {
3490               jam();
3491               if (unlikely(sqlType.m_like == 0))
3492               {
3493                 return TUPKEY_abort(req_struct, 40);
3494               }
3495               res1 = (*sqlType.m_like)(cs, s1, attrLen, s2, argLen);
3496             }
3497           }
3498           else
3499           {
3500             /* AND_XX_MASK condition */
3501             ndbassert(cond <= Interpreter::AND_NE_ZERO);
3502             if (unlikely(sqlType.m_mask == 0))
3503             {
3504               return TUPKEY_abort(req_struct,40);
3505             }
3506             /* If either arg is NULL, we say COL AND MASK
3507              * NE_ZERO and NE_MASK.
3508              */
3509             if (r1_null || r2_null) {
3510               res1= 1;
3511             } else {
3512 
3513               bool cmpZero=
3514                 (cond == Interpreter::AND_EQ_ZERO) ||
3515                 (cond == Interpreter::AND_NE_ZERO);
3516 
3517               res1 = (*sqlType.m_mask)(s1, attrLen, s2, argLen, cmpZero);
3518             }
3519           }
3520         }
3521 
3522         int res = 0;
3523         switch ((Interpreter::BinaryCondition)cond) {
3524         case Interpreter::EQ:
3525           res = (res1 == 0);
3526           break;
3527         case Interpreter::NE:
3528           res = (res1 != 0);
3529           break;
3530         // note the condition is backwards
3531         case Interpreter::LT:
3532           res = (res1 > 0);
3533           break;
3534         case Interpreter::LE:
3535           res = (res1 >= 0);
3536           break;
3537         case Interpreter::GT:
3538           res = (res1 < 0);
3539           break;
3540         case Interpreter::GE:
3541           res = (res1 <= 0);
3542           break;
3543         case Interpreter::LIKE:
3544           res = (res1 == 0);
3545           break;
3546         case Interpreter::NOT_LIKE:
3547           res = (res1 == 1);
3548           break;
3549         case Interpreter::AND_EQ_MASK:
3550           res = (res1 == 0);
3551           break;
3552         case Interpreter::AND_NE_MASK:
3553           res = (res1 != 0);
3554           break;
3555         case Interpreter::AND_EQ_ZERO:
3556           res = (res1 == 0);
3557           break;
3558         case Interpreter::AND_NE_ZERO:
3559           res = (res1 != 0);
3560           break;
3561 	  // XXX handle invalid value
3562         }
3563 #ifdef TRACE_INTERPRETER
3564 	ndbout_c("cond=%u attr(%d)='%.*s'(%d) str='%.*s'(%d) res1=%d res=%d",
3565 		 cond, attrId >> 16,
3566                  attrLen, s1, attrLen, argLen, s2, argLen, res1, res);
3567 #endif
3568         if (res)
3569           TprogramCounter = brancher(theInstruction, TprogramCounter);
3570         else
3571 	{
3572           Uint32 tmp = ((step + 3) >> 2) + 1;
3573           TprogramCounter += tmp;
3574         }
3575 	break;
3576       }
3577 
3578       case Interpreter::BRANCH_ATTR_EQ_NULL:{
3579 	jam();
3580 	Uint32 ins2= TcurrentProgram[TprogramCounter];
3581 	Uint32 attrId= Interpreter::getBranchCol_AttrId(ins2) << 16;
3582 
3583 	if (tmpHabitant != attrId){
3584 	  Int32 TnoDataR= readAttributes(req_struct,
3585 					  &attrId, 1,
3586 					  tmpArea, tmpAreaSz,
3587                                           false);
3588 
3589 	  if (TnoDataR < 0) {
3590 	    jam();
3591             terrorCode = Uint32(-TnoDataR);
3592 	    tupkeyErrorLab(req_struct);
3593 	    return -1;
3594 	  }
3595 	  tmpHabitant= attrId;
3596 	}
3597 
3598 	AttributeHeader ah(tmpArea[0]);
3599 	if (ah.isNULL()){
3600 	  TprogramCounter= brancher(theInstruction, TprogramCounter);
3601 	} else {
3602 	  TprogramCounter ++;
3603 	}
3604 	break;
3605       }
3606 
3607       case Interpreter::BRANCH_ATTR_NE_NULL:{
3608 	jam();
3609 	Uint32 ins2= TcurrentProgram[TprogramCounter];
3610 	Uint32 attrId= Interpreter::getBranchCol_AttrId(ins2) << 16;
3611 
3612 	if (tmpHabitant != attrId){
3613 	  Int32 TnoDataR= readAttributes(req_struct,
3614 					  &attrId, 1,
3615 					  tmpArea, tmpAreaSz,
3616                                           false);
3617 
3618 	  if (TnoDataR < 0) {
3619 	    jam();
3620             terrorCode = Uint32(-TnoDataR);
3621 	    tupkeyErrorLab(req_struct);
3622 	    return -1;
3623 	  }
3624 	  tmpHabitant= attrId;
3625 	}
3626 
3627 	AttributeHeader ah(tmpArea[0]);
3628 	if (ah.isNULL()){
3629 	  TprogramCounter ++;
3630 	} else {
3631 	  TprogramCounter= brancher(theInstruction, TprogramCounter);
3632 	}
3633 	break;
3634       }
3635 
3636       case Interpreter::EXIT_OK:
3637 	jam();
3638 #ifdef TRACE_INTERPRETER
3639 	ndbout_c(" - exit_ok");
3640 #endif
3641 	return TdataWritten;
3642 
3643       case Interpreter::EXIT_OK_LAST:
3644 	jam();
3645 #ifdef TRACE_INTERPRETER
3646 	ndbout_c(" - exit_ok_last");
3647 #endif
3648 	req_struct->last_row= true;
3649 	return TdataWritten;
3650 
3651       case Interpreter::EXIT_REFUSE:
3652 	jam();
3653 #ifdef TRACE_INTERPRETER
3654 	ndbout_c(" - exit_nok");
3655 #endif
3656 	terrorCode= theInstruction >> 16;
3657 	return TUPKEY_abort(req_struct, 29);
3658 
3659       case Interpreter::CALL:
3660 	jam();
3661 #ifdef TRACE_INTERPRETER
3662         ndbout_c(" - call addr=%u, subroutine len=%u ret addr=%u",
3663                  theInstruction >> 16, TsubroutineLen, TprogramCounter);
3664 #endif
3665 	RstackPtr++;
3666 	if (RstackPtr < 32) {
3667           TstackMemBuffer[RstackPtr]= TprogramCounter;
3668           TprogramCounter= theInstruction >> 16;
3669 	  if (TprogramCounter < TsubroutineLen) {
3670 	    TcurrentProgram= subroutineProg;
3671 	    TcurrentSize= TsubroutineLen;
3672 	  } else {
3673 	    return TUPKEY_abort(req_struct, 30);
3674 	  }
3675 	} else {
3676 	  return TUPKEY_abort(req_struct, 31);
3677 	}
3678 	break;
3679 
3680       case Interpreter::RETURN:
3681 	jam();
3682 #ifdef TRACE_INTERPRETER
3683         ndbout_c(" - return to %u from stack level %u",
3684                  TstackMemBuffer[RstackPtr],
3685                  RstackPtr);
3686 #endif
3687 	if (RstackPtr > 0) {
3688 	  TprogramCounter= TstackMemBuffer[RstackPtr];
3689 	  RstackPtr--;
3690 	  if (RstackPtr == 0) {
3691 	    jam();
3692 	    /* ------------------------------------------------------------- */
3693 	    // We are back to the main program.
3694 	    /* ------------------------------------------------------------- */
3695 	    TcurrentProgram= mainProgram;
3696 	    TcurrentSize= TmainProgLen;
3697 	  }
3698 	} else {
3699 	  return TUPKEY_abort(req_struct, 32);
3700 	}
3701 	break;
3702 
3703       default:
3704 	return TUPKEY_abort(req_struct, 33);
3705       }
3706     } else {
3707       return TUPKEY_abort(req_struct, 34);
3708     }
3709   }
3710   return TUPKEY_abort(req_struct, 35);
3711 }
3712 
/**
 * expand_var_part - copy packed variable-size attributes to fully expanded size
 *
 * dst      Var_data descriptor for the destination: data pointer, offset and
 *          length arrays, and attribute count (held in m_var_len_offset)
 * src      pointer to packed attributes (offset array followed by data)
 * tabDesc  array of attribute descriptors (used for getting max size)
 * order    attribute order array; each entry indexes into tabDesc
 */
3722 static
3723 Uint32*
expand_var_part(Dbtup::KeyReqStruct::Var_data * dst,const Uint32 * src,const Uint32 * tabDesc,const Uint16 * order)3724 expand_var_part(Dbtup::KeyReqStruct::Var_data *dst,
3725 		const Uint32* src,
3726 		const Uint32 * tabDesc,
3727 		const Uint16* order)
3728 {
3729   char* dst_ptr= dst->m_data_ptr;
3730   Uint32 no_attr= dst->m_var_len_offset;
3731   Uint16* dst_off_ptr= dst->m_offset_array_ptr;
3732   Uint16* dst_len_ptr= dst_off_ptr + no_attr;
3733   const Uint16* src_off_ptr= (const Uint16*)src;
3734   const char* src_ptr= (const char*)(src_off_ptr + no_attr + 1);
3735 
3736   Uint16 tmp= *src_off_ptr++, next_pos, len, max_len, dst_off= 0;
3737   for(Uint32 i = 0; i<no_attr; i++)
3738   {
3739     next_pos= *src_off_ptr++;
3740     len= next_pos - tmp;
3741 
3742     *dst_off_ptr++ = dst_off;
3743     *dst_len_ptr++ = dst_off + len;
3744     memcpy(dst_ptr, src_ptr, len);
3745     src_ptr += len;
3746 
3747     max_len= AttributeDescriptor::getSizeInBytes(tabDesc[* order++]);
3748     dst_ptr += max_len; // Max size
3749     dst_off += max_len;
3750 
3751     tmp= next_pos;
3752   }
3753 
3754   return ALIGN_WORD(dst_ptr);
3755 }
3756 
3757 void
expand_tuple(KeyReqStruct * req_struct,Uint32 sizes[2],Tuple_header * src,const Tablerec * tabPtrP,bool disk)3758 Dbtup::expand_tuple(KeyReqStruct* req_struct,
3759 		    Uint32 sizes[2],
3760 		    Tuple_header* src,
3761 		    const Tablerec* tabPtrP,
3762 		    bool disk)
3763 {
3764   Uint32 bits= src->m_header_bits;
3765   Uint32 extra_bits = bits;
3766   Tuple_header* ptr= req_struct->m_tuple_ptr;
3767 
3768   Uint16 dd_tot= tabPtrP->m_no_of_disk_attributes;
3769   Uint16 mm_vars= tabPtrP->m_attributes[MM].m_no_of_varsize;
3770   Uint16 mm_dynvar= tabPtrP->m_attributes[MM].m_no_of_dyn_var;
3771   Uint16 mm_dynfix= tabPtrP->m_attributes[MM].m_no_of_dyn_fix;
3772   Uint16 mm_dyns= tabPtrP->m_attributes[MM].m_no_of_dynamic;
3773   Uint32 fix_size= tabPtrP->m_offsets[MM].m_fix_header_size;
3774   Uint32 order_desc= tabPtrP->m_real_order_descriptor;
3775 
3776   Uint32 *dst_ptr= ptr->get_end_of_fix_part_ptr(tabPtrP);
3777   const Uint32 *disk_ref= src->get_disk_ref_ptr(tabPtrP);
3778   const Uint32 *src_ptr= src->get_end_of_fix_part_ptr(tabPtrP);
3779   const Var_part_ref* var_ref = src->get_var_part_ref_ptr(tabPtrP);
3780   const Uint32 *desc= (Uint32*)req_struct->attr_descr;
3781   const Uint16 *order = (Uint16*)(&tableDescriptor[order_desc]);
3782   order += tabPtrP->m_attributes[MM].m_no_of_fixsize;
3783 
3784   // Copy fix part
3785   sizes[MM]= 1;
3786   memcpy(ptr, src, 4*fix_size);
3787   if(mm_vars || mm_dyns)
3788   {
3789     /*
3790      * Reserve place for initial length word and offset array (with one extra
3791      * offset). This will be filled-in in later, in shrink_tuple().
3792      */
3793     dst_ptr += Varpart_copy::SZ32;
3794 
3795     KeyReqStruct::Var_data* dst= &req_struct->m_var_data[MM];
3796     Uint32 step; // in bytes
3797     Uint32 src_len;
3798     const Uint32 *src_data;
3799     if (bits & Tuple_header::VAR_PART)
3800     {
3801       KeyReqStruct::Var_data* dst= &req_struct->m_var_data[MM];
3802       if(! (bits & Tuple_header::COPY_TUPLE))
3803       {
3804         /* This is for the initial expansion of a stored row. */
3805         Ptr<Page> var_page;
3806         src_data= get_ptr(&var_page, *var_ref);
3807         src_len= get_len(&var_page, *var_ref);
3808         sizes[MM]= src_len;
3809         step= 0;
3810         req_struct->m_varpart_page_ptr = var_page;
3811 
3812         /* An original tuple cant have grown as we're expanding it...
3813          * else we would be "re-expand"*/
3814         ndbassert(! (bits & Tuple_header::MM_GROWN));
3815       }
3816       else
3817       {
3818         /* This is for the re-expansion of a shrunken row (update2 ...) */
3819 
3820         Varpart_copy* vp = (Varpart_copy*)src_ptr;
3821         src_len = vp->m_len;
3822         src_data= vp->m_data;
3823         step= (Varpart_copy::SZ32 + src_len); // 1+ is for extra word
3824         req_struct->m_varpart_page_ptr = req_struct->m_page_ptr;
3825         sizes[MM]= src_len;
3826       }
3827 
3828       if (mm_vars)
3829       {
3830         dst->m_data_ptr= (char*)(((Uint16*)dst_ptr)+mm_vars+1);
3831         dst->m_offset_array_ptr= req_struct->var_pos_array;
3832         dst->m_var_len_offset= mm_vars;
3833         dst->m_max_var_offset= tabPtrP->m_offsets[MM].m_max_var_offset;
3834 
3835         dst_ptr= expand_var_part(dst, src_data, desc, order);
3836         ndbassert(dst_ptr == ALIGN_WORD(dst->m_data_ptr + dst->m_max_var_offset));
3837         /**
3838          * Move to end of fix varpart
3839          */
3840         char* varstart = (char*)(((Uint16*)src_data)+mm_vars+1);
3841         Uint32 varlen = ((Uint16*)src_data)[mm_vars];
3842         Uint32 *dynstart = ALIGN_WORD(varstart + varlen);
3843 
3844         ndbassert(src_len >= (dynstart - src_data));
3845         src_len -= Uint32(dynstart - src_data);
3846         src_data = dynstart;
3847       }
3848     }
3849     else
3850     {
3851       /**
3852        * No varpart...only allowed for dynattr
3853        */
3854       ndbassert(mm_vars == 0);
3855       src_len = step = sizes[MM] = 0;
3856       src_data = 0;
3857     }
3858 
3859     if (mm_dyns)
3860     {
3861       /**
3862        * dynattr needs to be expanded even if no varpart existed before
3863        */
3864       dst->m_dyn_offset_arr_ptr= req_struct->var_pos_array+2*mm_vars;
3865       dst->m_dyn_len_offset= mm_dynvar+mm_dynfix;
3866       dst->m_max_dyn_offset= tabPtrP->m_offsets[MM].m_max_dyn_offset;
3867       dst->m_dyn_data_ptr= (char*)dst_ptr;
3868       dst_ptr= expand_dyn_part(dst, src_data,
3869                                src_len,
3870                                desc, order + mm_vars,
3871                                mm_dynvar, mm_dynfix,
3872                                tabPtrP->m_offsets[MM].m_dyn_null_words);
3873     }
3874 
3875     ndbassert((UintPtr(src_ptr) & 3) == 0);
3876     src_ptr = src_ptr + step;
3877   }
3878 
3879   src->m_header_bits= bits &
3880     ~(Uint32)(Tuple_header::MM_SHRINK | Tuple_header::MM_GROWN);
3881 
3882   /**
3883    * The source tuple only touches the header parts. The updates of the
3884    * tuple is applied on the new copy tuple. We still need to ensure that
3885    * the checksum is correct on the tuple even after changing the header
3886    * parts since the header is part of the checksum. This is not covered
3887    * by setting checksum normally since mostly we don't touch the
3888    * original tuple.
3889    */
3890   updateChecksum(src, tabPtrP, bits, src->m_header_bits);
3891 
3892   sizes[DD]= 0;
3893   if(disk && dd_tot)
3894   {
3895     const Uint16 dd_vars= tabPtrP->m_attributes[DD].m_no_of_varsize;
3896     order+= mm_vars+mm_dynvar+mm_dynfix;
3897 
3898     if(bits & Tuple_header::DISK_INLINE)
3899     {
3900       // Only on copy tuple
3901       ndbassert(bits & Tuple_header::COPY_TUPLE);
3902     }
3903     else
3904     {
3905       Local_key key;
3906       memcpy(&key, disk_ref, sizeof(key));
3907       key.m_page_no= req_struct->m_disk_page_ptr.i;
3908       src_ptr= get_dd_ptr(&req_struct->m_disk_page_ptr, &key, tabPtrP);
3909     }
3910     extra_bits |= Tuple_header::DISK_INLINE;
3911 
3912     // Fix diskpart
3913     req_struct->m_disk_ptr= (Tuple_header*)dst_ptr;
3914     memcpy(dst_ptr, src_ptr, 4*tabPtrP->m_offsets[DD].m_fix_header_size);
3915     sizes[DD] = tabPtrP->m_offsets[DD].m_fix_header_size;
3916 
3917     ndbassert(! (req_struct->m_disk_ptr->m_header_bits & Tuple_header::FREE));
3918 
3919     ndbrequire(dd_vars == 0);
3920   }
3921 
3922   ptr->m_header_bits= (extra_bits | Tuple_header::COPY_TUPLE);
3923   req_struct->is_expanded= true;
3924 }
3925 
3926 void
dump_tuple(const KeyReqStruct * req_struct,const Tablerec * tabPtrP)3927 Dbtup::dump_tuple(const KeyReqStruct* req_struct, const Tablerec* tabPtrP)
3928 {
3929   Uint16 mm_vars= tabPtrP->m_attributes[MM].m_no_of_varsize;
3930   Uint16 mm_dyns= tabPtrP->m_attributes[MM].m_no_of_dynamic;
3931   //Uint16 dd_tot= tabPtrP->m_no_of_disk_attributes;
3932   const Tuple_header* ptr= req_struct->m_tuple_ptr;
3933   Uint32 bits= ptr->m_header_bits;
3934   const Uint32 *tuple_words= (Uint32 *)ptr;
3935   const Uint32 *fix_p;
3936   Uint32 fix_len;
3937   const Uint32 *var_p;
3938   Uint32 var_len;
3939   //const Uint32 *disk_p;
3940   //Uint32 disk_len;
3941   const char *typ;
3942 
3943   fix_p= tuple_words;
3944   fix_len= tabPtrP->m_offsets[MM].m_fix_header_size;
3945   if(req_struct->is_expanded)
3946   {
3947     typ= "expanded";
3948     var_p= ptr->get_end_of_fix_part_ptr(tabPtrP);
3949     var_len= 0;                                 // No dump of varpart in expanded
3950 #if 0
3951     disk_p= (Uint32 *)req_struct->m_disk_ptr;
3952     disk_len= (dd_tot ? tabPtrP->m_offsets[DD].m_fix_header_size : 0);
3953 #endif
3954   }
3955   else if(! (bits & Tuple_header::COPY_TUPLE))
3956   {
3957     typ= "stored";
3958     if(mm_vars+mm_dyns)
3959     {
3960       //const KeyReqStruct::Var_data* dst= &req_struct->m_var_data[MM];
3961       const Var_part_ref *varref= ptr->get_var_part_ref_ptr(tabPtrP);
3962       Ptr<Page> tmp;
3963       var_p= get_ptr(&tmp, * varref);
3964       var_len= get_len(&tmp, * varref);
3965     }
3966     else
3967     {
3968       var_p= 0;
3969       var_len= 0;
3970     }
3971 #if 0
3972     if(dd_tot)
3973     {
3974       Local_key key;
3975       memcpy(&key, ptr->get_disk_ref_ptr(tabPtrP), sizeof(key));
3976       key.m_page_no= req_struct->m_disk_page_ptr.i;
3977       disk_p= get_dd_ptr(&req_struct->m_disk_page_ptr, &key, tabPtrP);
3978       disk_len= tabPtrP->m_offsets[DD].m_fix_header_size;
3979     }
3980     else
3981     {
3982       disk_p= var_p;
3983       disk_len= 0;
3984     }
3985 #endif
3986   }
3987   else
3988   {
3989     typ= "shrunken";
3990     if(mm_vars+mm_dyns)
3991     {
3992       var_p= ptr->get_end_of_fix_part_ptr(tabPtrP);
3993       var_len= *((Uint16 *)var_p) + 1;
3994     }
3995     else
3996     {
3997       var_p= 0;
3998       var_len= 0;
3999     }
4000 #if 0
4001     disk_p= (Uint32 *)(req_struct->m_disk_ptr);
4002     disk_len= (dd_tot ? tabPtrP->m_offsets[DD].m_fix_header_size : 0);
4003 #endif
4004   }
4005   ndbout_c("Fixed part[%s](%p len=%u words)",typ, fix_p, fix_len);
4006   dump_hex(fix_p, fix_len);
4007   ndbout_c("Varpart part[%s](%p len=%u words)", typ , var_p, var_len);
4008   dump_hex(var_p, var_len);
4009 #if 0
4010   ndbout_c("Disk part[%s](%p len=%u words)", typ, disk_p, disk_len);
4011   dump_hex(disk_p, disk_len);
4012 #endif
4013 }
4014 
/**
 * prepare_read - set up KeyReqStruct pointers for reading a row in place.
 *
 * Unlike expand_tuple(), no copy is made: the Var_data descriptor in
 * req_struct is pointed straight into the stored (or shrunken copy) row so
 * attributes can be read in their packed format.
 *
 * req_struct  request context; m_var_data[MM] and m_disk_ptr are filled in
 * tabPtrP     table descriptor
 * disk        whether to also locate the disk part of the row
 */
void
Dbtup::prepare_read(KeyReqStruct* req_struct,
		    Tablerec* tabPtrP, bool disk)
{
  Tuple_header* ptr= req_struct->m_tuple_ptr;

  Uint32 bits= ptr->m_header_bits;
  Uint16 dd_tot= tabPtrP->m_no_of_disk_attributes;
  Uint16 mm_vars= tabPtrP->m_attributes[MM].m_no_of_varsize;
  Uint16 mm_dyns= tabPtrP->m_attributes[MM].m_no_of_dynamic;

  const Uint32 *src_ptr= ptr->get_end_of_fix_part_ptr(tabPtrP);
  const Uint32 *disk_ref= ptr->get_disk_ref_ptr(tabPtrP);
  const Var_part_ref* var_ref = ptr->get_var_part_ref_ptr(tabPtrP);
  if(mm_vars || mm_dyns)
  {
    const Uint32 *src_data= src_ptr;
    Uint32 src_len;
    KeyReqStruct::Var_data* dst= &req_struct->m_var_data[MM];
    if (bits & Tuple_header::VAR_PART)
    {
      if(! (bits & Tuple_header::COPY_TUPLE))
      {
        // Stored row: the var part lives in a separate page; follow the ref.
        Ptr<Page> tmp;
        src_data= get_ptr(&tmp, * var_ref);
        src_len= get_len(&tmp, * var_ref);

        /* If the original tuple was grown,
         * the old size is stored at the end. */
        if(bits & Tuple_header::MM_GROWN)
        {
          /**
           * This is when triggers read before value of update
           *   when original has been reallocated due to grow
           */
          ndbassert(src_len>0);
          src_len= src_data[src_len-1];
        }
      }
      else
      {
        // Shrunken copy tuple: var part is inline as a Varpart_copy.
        Varpart_copy* vp = (Varpart_copy*)src_ptr;
        src_len = vp->m_len;
        src_data = vp->m_data;
        src_ptr++;
      }

      // Split the var part into the varsize section and the dynamic section.
      char* varstart;
      Uint32 varlen;
      const Uint32* dynstart;
      if (mm_vars)
      {
        // Layout: (mm_vars+1) Uint16 offsets, then the packed attribute data.
        varstart = (char*)(((Uint16*)src_data)+mm_vars+1);
        varlen = ((Uint16*)src_data)[mm_vars];
        dynstart = ALIGN_WORD(varstart + varlen);
      }
      else
      {
        varstart = 0;
        varlen = 0;
        dynstart = src_data;
      }

      dst->m_data_ptr= varstart;
      dst->m_offset_array_ptr= (Uint16*)src_data;
      // Packed format: length array immediately follows the offset array.
      dst->m_var_len_offset= 1;
      dst->m_max_var_offset= varlen;

      Uint32 dynlen = Uint32(src_len - (dynstart - src_data));
      ndbassert(src_len >= (dynstart - src_data));
      dst->m_dyn_data_ptr= (char*)dynstart;
      dst->m_dyn_part_len= dynlen;
      // Left disabled; see note below on why it is not needed for reads.
      // dst->m_dyn_offset_arr_ptr = dynlen ? (Uint16*)(dynstart + *(Uint8*)dynstart) : 0;

      /*
        dst->m_dyn_offset_arr_ptr and dst->m_dyn_len_offset are not used for
        reading the stored/shrunken format.
      */
    }
    else
    {
      // No var part allocated for this row yet.
      src_len = 0;
      dst->m_max_var_offset = 0;
      dst->m_dyn_part_len = 0;
#if defined VM_TRACE || defined ERROR_INSERT
      bzero(dst, sizeof(* dst));
#endif
    }

    // disk part start after dynamic part.
    src_ptr+= src_len;
  }

  if(disk && dd_tot)
  {
    const Uint16 dd_vars= tabPtrP->m_attributes[DD].m_no_of_varsize;

    if(bits & Tuple_header::DISK_INLINE)
    {
      // Only on copy tuple
      ndbassert(bits & Tuple_header::COPY_TUPLE);
    }
    else
    {
      // XXX
      // Locate the disk part via the disk reference stored in the row.
      Local_key key;
      memcpy(&key, disk_ref, sizeof(key));
      key.m_page_no= req_struct->m_disk_page_ptr.i;
      src_ptr= get_dd_ptr(&req_struct->m_disk_page_ptr, &key, tabPtrP);
    }
    // Fix diskpart
    req_struct->m_disk_ptr= (Tuple_header*)src_ptr;
    ndbassert(! (req_struct->m_disk_ptr->m_header_bits & Tuple_header::FREE));
    // Var-size disk attributes are not supported.
    ndbrequire(dd_vars == 0);
  }

  req_struct->is_expanded= false;
}
4134 
/**
 * shrink_tuple - pack an expanded copy tuple back to its stored format.
 *
 * Inverse of expand_tuple(): the var-size, dynamic and (optionally) disk
 * parts are compacted in place so each attribute occupies only its actual
 * length. The packing is done with memmove since source and destination
 * overlap within the same buffer.
 *
 * req_struct  request context holding the expanded copy tuple
 * sizes       out: sizes[MM] = packed var part length (words), sizes[DD] =
 *             disk part length (words)
 * tabPtrP     table descriptor
 * disk        whether to also move the disk part into place
 */
void
Dbtup::shrink_tuple(KeyReqStruct* req_struct, Uint32 sizes[2],
		    const Tablerec* tabPtrP, bool disk)
{
  ndbassert(tabPtrP->need_shrink());
  Tuple_header* ptr= req_struct->m_tuple_ptr;
  // Only copy tuples are ever shrunken; stored rows are already packed.
  ndbassert(ptr->m_header_bits & Tuple_header::COPY_TUPLE);

  KeyReqStruct::Var_data* dst= &req_struct->m_var_data[MM];
  Uint32 order_desc= tabPtrP->m_real_order_descriptor;
  const Uint32 * tabDesc= (Uint32*)req_struct->attr_descr;
  const Uint16 *order = (Uint16*)(&tableDescriptor[order_desc]);
  Uint16 dd_tot= tabPtrP->m_no_of_disk_attributes;
  Uint16 mm_fix= tabPtrP->m_attributes[MM].m_no_of_fixsize;
  Uint16 mm_vars= tabPtrP->m_attributes[MM].m_no_of_varsize;
  Uint16 mm_dyns= tabPtrP->m_attributes[MM].m_no_of_dynamic;
  Uint16 mm_dynvar= tabPtrP->m_attributes[MM].m_no_of_dyn_var;
  Uint16 mm_dynfix= tabPtrP->m_attributes[MM].m_no_of_dyn_fix;
  Uint16 dd_vars= tabPtrP->m_attributes[DD].m_no_of_varsize;

  Uint32 *dst_ptr= ptr->get_end_of_fix_part_ptr(tabPtrP);
  Uint16* src_off_ptr= req_struct->var_pos_array;
  // Skip fixed-size attributes; order now points at the varsize entries.
  order += mm_fix;

  sizes[MM] = 1;
  sizes[DD] = 0;
  if(mm_vars || mm_dyns)
  {
    // The packed var part is written as a Varpart_copy (length word + data).
    Varpart_copy* vp = (Varpart_copy*)dst_ptr;
    Uint32* varstart = dst_ptr = vp->m_data;

    if (mm_vars)
    {
      // Build the packed offset array, then compact the attribute data.
      Uint16* dst_off_ptr= (Uint16*)dst_ptr;
      char*  dst_data_ptr= (char*)(dst_off_ptr + mm_vars + 1);
      char*  src_data_ptr= dst_data_ptr;
      Uint32 off= 0;
      for(Uint32 i= 0; i<mm_vars; i++)
      {
        // var_pos_array holds offsets followed by end positions (expanded
        // layout); the difference gives the attribute's actual length.
        const char* data_ptr= src_data_ptr + *src_off_ptr;
        Uint32 len= src_off_ptr[mm_vars] - *src_off_ptr;
        * dst_off_ptr++= off;
        // memmove: source and destination regions may overlap.
        memmove(dst_data_ptr, data_ptr, len);
        off += len;
        src_off_ptr++;
        dst_data_ptr += len;
      }
      // Final entry is the total packed length.
      *dst_off_ptr= off;
      dst_ptr = ALIGN_WORD(dst_data_ptr);
      order += mm_vars; // Point to first dynfix entry
    }

    if (mm_dyns)
    {
      // Pack the dynamic attributes directly after the varsize section.
      dst_ptr = shrink_dyn_part(dst, dst_ptr, tabPtrP, tabDesc,
                                order, mm_dynvar, mm_dynfix, MM);
      ndbassert((char*)dst_ptr <= ((char*)ptr) + 8192);
      order += mm_dynfix + mm_dynvar;
    }

    Uint32 varpart_len= Uint32(dst_ptr - varstart);
    vp->m_len = varpart_len;
    sizes[MM] = varpart_len;
    ptr->m_header_bits |= (varpart_len) ? Tuple_header::VAR_PART : 0;

    ndbassert((UintPtr(ptr) & 3) == 0);
    ndbassert(varpart_len < 0x10000);
  }

  if(disk && dd_tot)
  {
    // Move the disk part down so it directly follows the packed var part.
    Uint32 * src_ptr = (Uint32*)req_struct->m_disk_ptr;
    req_struct->m_disk_ptr = (Tuple_header*)dst_ptr;
    // Var-size disk attributes are not supported.
    ndbrequire(dd_vars == 0);
    sizes[DD] = tabPtrP->m_offsets[DD].m_fix_header_size;
    memmove(dst_ptr, src_ptr, 4*tabPtrP->m_offsets[DD].m_fix_header_size);
  }

  req_struct->is_expanded= false;

}
4216 
/**
 * validate_page - debug check of var-part consistency for a table's pages.
 *
 * Walks every fragment and page of regTabPtr and verifies that each used
 * page entry is at least large enough for the var-part data it references.
 * When called with p == 0 it additionally reorganizes each page and then
 * recurses once (with p == (Var_page*)1 as a "second pass" marker) to
 * re-validate after reorganization.
 *
 * regTabPtr  table to validate
 * p          0 for the first (reorganizing) pass, non-zero for the re-check
 */
void
Dbtup::validate_page(Tablerec* regTabPtr, Var_page* p)
{
  /* ToDo: We could also do some checks here for any dynamic part. */
  Uint32 mm_vars= regTabPtr->m_attributes[MM].m_no_of_varsize;
  Uint32 fix_sz= regTabPtr->m_offsets[MM].m_fix_header_size +
    Tuple_header::HeaderSize;

  // Only tables with var-size attributes have anything to validate here.
  if(mm_vars == 0)
    return;

  for(Uint32 F= 0; F<NDB_ARRAY_SIZE(regTabPtr->fragrec); F++)
  {
    FragrecordPtr fragPtr;

    if((fragPtr.i = regTabPtr->fragrec[F]) == RNIL)
      continue;

    ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
    for(Uint32 P= 0; P<fragPtr.p->noOfPages; P++)
    {
      Uint32 real= getRealpid(fragPtr.p, P);
      Var_page* page= (Var_page*)c_page_pool.getPtr(real);

      // Index 0 is not used; walk all allocated entries on the page.
      for(Uint32 i=1; i<page->high_index; i++)
      {
	Uint32 idx= page->get_index_word(i);
	Uint32 len = (idx & Var_page::LEN_MASK) >> Var_page::LEN_SHIFT;
	if(!(idx & Var_page::FREE) && !(idx & Var_page::CHAIN))
	{
	  // Main tuple entry: fixed part plus a reference to the var part.
	  Tuple_header *ptr= (Tuple_header*)page->get_ptr(i);
	  Uint32 *part= ptr->get_end_of_fix_part_ptr(regTabPtr);
	  if(! (ptr->m_header_bits & Tuple_header::COPY_TUPLE))
	  {
	    // Stored row: entry is exactly fix part + var-part reference.
	    ndbrequire(len == fix_sz + 1);
	    Local_key tmp; tmp.assref(*part);
	    Ptr<Page> tmpPage;
	    part= get_ptr(&tmpPage, *(Var_part_ref*)part);
	    len= ((Var_page*)tmpPage.p)->get_entry_len(tmp.m_page_idx);
	    // Needed bytes: (mm_vars+1) Uint16 offsets + packed data length.
	    Uint32 sz= ((mm_vars + 1) << 1) + (((Uint16*)part)[mm_vars]);
	    ndbrequire(len >= ((sz + 3) >> 2));
	  }
	  else
	  {
	    // Copy tuple: var part stored inline after the fix part.
	    Uint32 sz= ((mm_vars + 1) << 1) + (((Uint16*)part)[mm_vars]);
	    ndbrequire(len >= ((sz+3)>>2)+fix_sz);
	  }
	  if(ptr->m_operation_ptr_i != RNIL)
	  {
	    // Pool getPtr acts as a guard that the operation record is valid.
	    c_operation_pool.getPtr(ptr->m_operation_ptr_i);
	  }
	}
	else if(!(idx & Var_page::FREE))
	{
	  /**
	   * Chain
	   */
	  Uint32 *part= page->get_ptr(i);
	  Uint32 sz= ((mm_vars + 1) << 1) + (((Uint16*)part)[mm_vars]);
	  ndbrequire(len >= ((sz + 3) >> 2));
	}
	else
	{

	}
      }
      // First pass only: compact the page, then re-validate below.
      if(p == 0 && page->high_index > 1)
	page->reorg((Var_page*)ctemp_page);
    }
  }

  if(p == 0)
  {
    validate_page(regTabPtr, (Var_page*)1);
  }
}
4293 
/**
 * Adjust the variable-sized part of a tuple after an UPDATE has changed
 * its size.
 *
 * sizes[MM] is the current var-part size and sizes[2+MM] the size required
 * by the updated copy tuple (sizes[2+MM] is what is passed as the new
 * length to realloc_var_part below).  Three cases:
 *  - equal size: nothing to do.
 *  - shrink: only mark the copy tuple with MM_SHRINK; actual shrinking is
 *    deferred (presumably to commit time — the shrink itself is not done
 *    here).
 *  - grow: reallocate the var part now, remember the original length in
 *    the last word of the new allocation and set MM_GROWN on the original
 *    tuple header.
 *
 * @param req_struct  request context; provides the copy tuple and var-part page
 * @param org         the original (committed) tuple header
 * @param regOperPtr  operation record (only used for debug printout here)
 * @param regFragPtr  fragment record, needed for var-part reallocation
 * @param regTabPtr   table record, needed for var-part ref and checksum
 * @param sizes       [MM]=old MM size, [DD]=old DD size, [2+MM]/[2+DD]=new sizes
 * @return 0 on success, -1 if the var part could not be reallocated
 *         (terrorCode is set by realloc_var_part)
 */
int
Dbtup::handle_size_change_after_update(KeyReqStruct* req_struct,
                                       Tuple_header* org,
                                       Operationrec* regOperPtr,
                                       Fragrecord* regFragPtr,
                                       Tablerec* regTabPtr,
                                       Uint32 sizes[4])
{
  /* The disk part (DD) never changes size in an update. */
  ndbrequire(sizes[1] == sizes[3]);
  //ndbout_c("%d %d %d %d", sizes[0], sizes[1], sizes[2], sizes[3]);
  if(0)
    printf("%p %d %d - handle_size_change_after_update ",
	   req_struct->m_tuple_ptr,
	   regOperPtr->m_tuple_location.m_page_no,
	   regOperPtr->m_tuple_location.m_page_idx);

  Uint32 bits= org->m_header_bits;
  Uint32 copy_bits= req_struct->m_tuple_ptr->m_header_bits;

  if(sizes[2+MM] == sizes[MM])
    ;   /* Same size: no action needed. */
  else if(sizes[2+MM] < sizes[MM])
  {
    if(0) ndbout_c("shrink");
    /* Shrink: just flag the copy tuple; no memory is released here. */
    req_struct->m_tuple_ptr->m_header_bits= copy_bits|Tuple_header::MM_SHRINK;
  }
  else
  {
    if(0) printf("grow - ");
    Ptr<Page> pagePtr = req_struct->m_varpart_page_ptr;
    Var_page* pageP= (Var_page*)pagePtr.p;
    Var_part_ref *refptr= org->get_var_part_ref_ptr(regTabPtr);
    /* Growing operates on the original tuple, never on a copy tuple. */
    ndbassert(! (bits & Tuple_header::COPY_TUPLE));

    Local_key ref;
    refptr->copyout(&ref);
    Uint32 alloc;
    Uint32 idx= ref.m_page_idx;
    if (bits & Tuple_header::VAR_PART)
    {
      if (copy_bits & Tuple_header::COPY_TUPLE)
      {
        /* m_varpart_page_ptr is not set up for a copy tuple; map the
         * var-part page from the reference stored in the original tuple. */
        c_page_pool.getPtr(pagePtr, ref.m_page_no);
        pageP = (Var_page*)pagePtr.p;
      }
      alloc = pageP->get_entry_len(idx);
    }
    else
    {
      /* No var part yet allocated for this tuple. */
      alloc = 0;
    }
    Uint32 orig_size= alloc;
    if(bits & Tuple_header::MM_GROWN)
    {
      /* Was grown before, so must fetch real original size from last word. */
      Uint32 *old_var_part= pageP->get_ptr(idx);
      ndbassert(alloc>0);
      orig_size= old_var_part[alloc-1];
    }

    if (alloc)
    {
#ifdef VM_TRACE
      if(!pageP->get_entry_chain(idx))
        ndbout << *pageP << endl;
#endif
      /* An existing var part must be chained to its fix part entry. */
      ndbassert(pageP->get_entry_chain(idx));
    }

    Uint32 needed= sizes[2+MM];

    if(needed <= alloc)
    {
      //ndbassert(!regOperPtr->is_first_operation());
      /* Current allocation already large enough (e.g. grown by an earlier
       * operation in the same transaction). */
      if (0) ndbout_c(" no grow");
      return 0;
    }
    Uint32 *new_var_part=realloc_var_part(&terrorCode,
                                          regFragPtr, regTabPtr, pagePtr,
                                          refptr, alloc, needed);
    if (unlikely(new_var_part==NULL))
      return -1;
    /* Mark the tuple grown, store the original length at the end. */
    org->m_header_bits= bits | Tuple_header::MM_GROWN | Tuple_header::VAR_PART;
    new_var_part[needed-1]= orig_size;

    /**
     * Here we can change both header bits and the reference to the varpart,
     * this means that we need to completely recalculate the checksum here.
     */
    setChecksum(org, regTabPtr);
  }
  return 0;
}
4388 
4389 int
optimize_var_part(KeyReqStruct * req_struct,Tuple_header * org,Operationrec * regOperPtr,Fragrecord * regFragPtr,Tablerec * regTabPtr)4390 Dbtup::optimize_var_part(KeyReqStruct* req_struct,
4391                          Tuple_header* org,
4392                          Operationrec* regOperPtr,
4393                          Fragrecord* regFragPtr,
4394                          Tablerec* regTabPtr)
4395 {
4396   jam();
4397   Var_part_ref* refptr = org->get_var_part_ref_ptr(regTabPtr);
4398 
4399   Local_key ref;
4400   refptr->copyout(&ref);
4401   Uint32 idx = ref.m_page_idx;
4402 
4403   Ptr<Page> pagePtr;
4404   c_page_pool.getPtr(pagePtr, ref.m_page_no);
4405 
4406   Var_page* pageP = (Var_page*)pagePtr.p;
4407   Uint32 var_part_size = pageP->get_entry_len(idx);
4408 
4409   /**
4410    * if the size of page list_index is MAX_FREE_LIST,
4411    * we think it as full page, then need not optimize
4412    */
4413   if(pageP->list_index != MAX_FREE_LIST)
4414   {
4415     jam();
4416     /*
4417      * optimize var part of tuple by moving varpart,
4418      * then we possibly reclaim free pages
4419      */
4420     move_var_part(regFragPtr, regTabPtr, pagePtr,
4421                   refptr, var_part_size);
4422     setChecksum(org, regTabPtr);
4423   }
4424 
4425   return 0;
4426 }
4427 
4428 int
nr_update_gci(Uint32 fragPtrI,const Local_key * key,Uint32 gci)4429 Dbtup::nr_update_gci(Uint32 fragPtrI, const Local_key* key, Uint32 gci)
4430 {
4431   FragrecordPtr fragPtr;
4432   fragPtr.i= fragPtrI;
4433   ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
4434   TablerecPtr tablePtr;
4435   tablePtr.i= fragPtr.p->fragTableId;
4436   ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
4437 
4438   if (tablePtr.p->m_bits & Tablerec::TR_RowGCI)
4439   {
4440     Local_key tmp = *key;
4441     PagePtr pagePtr;
4442 
4443     pagePtr.i = getRealpidCheck(fragPtr.p, tmp.m_page_no);
4444     if (unlikely(pagePtr.i == RNIL))
4445     {
4446       jam();
4447       return 0;
4448     }
4449 
4450     c_page_pool.getPtr(pagePtr);
4451 
4452     Tuple_header* ptr = (Tuple_header*)
4453       ((Fix_page*)pagePtr.p)->get_ptr(tmp.m_page_idx, 0);
4454 
4455     ndbrequire(ptr->m_header_bits & Tuple_header::FREE);
4456     *ptr->get_mm_gci(tablePtr.p) = gci;
4457   }
4458   return 0;
4459 }
4460 
/**
 * Node-recovery helper: read the primary-key attribute values of the row
 * at (fragment, local key) into dst, stripping the attribute headers so
 * that dst holds the packed key data, followed by one GCI word.
 *
 * @param fragPtrI  fragment record index
 * @param key       local key (logical page no + page index) of the row
 * @param dst       output buffer; receives packed PK words, then at
 *                  dst[ret] the row GCI (0 if the table has no row GCI);
 *                  dst[0] is set to 0 if the page is not mapped
 * @param copy      set to true when the values were read from the copy
 *                  tuple of an ongoing (ALLOC) operation instead of the
 *                  committed row
 * @return number of packed key words (ret), or the negative error from
 *         readAttributes; 0 with ret==0 if the row slot is FREE or the
 *         page is unmapped
 */
int
Dbtup::nr_read_pk(Uint32 fragPtrI,
                  const Local_key* key, Uint32* dst, bool& copy)
{

  FragrecordPtr fragPtr;
  fragPtr.i= fragPtrI;
  ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
  TablerecPtr tablePtr;
  tablePtr.i= fragPtr.p->fragTableId;
  ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);

  Local_key tmp = *key;

  PagePtr pagePtr;
  /* Logical -> real page id; the page may not be mapped during recovery. */
  pagePtr.i = getRealpidCheck(fragPtr.p, tmp.m_page_no);
  if (unlikely(pagePtr.i == RNIL))
  {
    jam();
    dst[0] = 0;
    return 0;
  }

  c_page_pool.getPtr(pagePtr);
  KeyReqStruct req_struct(this);
  Uint32* ptr= ((Fix_page*)pagePtr.p)->get_ptr(key->m_page_idx, 0);

  req_struct.m_page_ptr = pagePtr;
  req_struct.m_tuple_ptr = (Tuple_header*)ptr;
  Uint32 bits = req_struct.m_tuple_ptr->m_header_bits;

  int ret = 0;
  copy = false;
  if (! (bits & Tuple_header::FREE))
  {
    if (bits & Tuple_header::ALLOC)
    {
      /* Row has an uncommitted operation; read from its copy tuple. */
      Uint32 opPtrI= req_struct.m_tuple_ptr->m_operation_ptr_i;
      Operationrec* opPtrP= c_operation_pool.getPtr(opPtrI);
      ndbassert(!opPtrP->m_copy_tuple_location.isNull());
      req_struct.m_tuple_ptr=
        get_copy_tuple(&opPtrP->m_copy_tuple_location);
      copy = true;
    }
    req_struct.check_offset[MM]= tablePtr.p->get_check_offset(MM);
    req_struct.check_offset[DD]= tablePtr.p->get_check_offset(DD);

    /* Set up the attribute descriptor view for this table. */
    Uint32 num_attr= tablePtr.p->m_no_of_attributes;
    Uint32 descr_start= tablePtr.p->tabDescriptor;
    TableDescriptor *tab_descr= &tableDescriptor[descr_start];
    ndbrequire(descr_start + (num_attr << ZAD_LOG_SIZE) <= cnoOfTabDescrRec);
    req_struct.attr_descr= tab_descr;

    if (tablePtr.p->need_expand())
      prepare_read(&req_struct, tablePtr.p, false);

    const Uint32* attrIds= &tableDescriptor[tablePtr.p->readKeyArray].tabDescr;
    const Uint32 numAttrs= tablePtr.p->noOfKeyAttr;
    // read pk attributes from original tuple

    req_struct.tablePtrP = tablePtr.p;
    req_struct.fragPtrP = fragPtr.p;

    // do it
    ret = readAttributes(&req_struct,
                         attrIds,
                         numAttrs,
                         dst,
                         ZNIL, false);

    // done
    if (likely(ret >= 0)) {
      // remove headers
      /* readAttributes produced [AttributeHeader, data...] per attribute;
       * compact dst in place so only the data words remain.  n counts
       * headers removed so far, i walks the original layout. */
      Uint32 n= 0;
      Uint32 i= 0;
      while (n < numAttrs) {
        const AttributeHeader ah(dst[i]);
        Uint32 size= ah.getDataSize();
        /* PK attributes are NOT NULL, so each must carry data. */
        ndbrequire(size != 0);
        for (Uint32 j= 0; j < size; j++) {
          dst[i + j - n]= dst[i + j + 1];
        }
        n+= 1;
        i+= 1 + size;
      }
      /* All returned words consumed; drop one word per stripped header. */
      ndbrequire((int)i == ret);
      ret -= numAttrs;
    } else {
      return ret;
    }
  }

  /* Append the row GCI after the key data (0 when not maintained). */
  if (tablePtr.p->m_bits & Tablerec::TR_RowGCI)
  {
    dst[ret] = *req_struct.m_tuple_ptr->get_mm_gci(tablePtr.p);
  }
  else
  {
    dst[ret] = 0;
  }
  return ret;
}
4563 
/**
 * Node-recovery delete of the row at (fragment, local key).
 *
 * Removes any ordered-index (TUX) entries, frees the in-memory row
 * storage (var-sized or fixed), and — if the table has disk attributes —
 * allocates undo-log space, fetches the disk page and frees the disk
 * part as well.  The disk path may need to wait for the page or for log
 * buffer space; in that case the routine returns 1 ("timeslice") after
 * saving the disk reference in signal->theData, and one of the
 * nr_delete_*_callback routines continues the work later.
 *
 * @param signal      signal object, reused for TUX requests and callbacks
 * @param senderData  callback cookie handed back to DBLQH on completion
 * @param fragPtrI    fragment record index
 * @param key         local key of the row to delete
 * @param gci         GCI to record with the disk free
 * @return 0 when fully done, 1 when suspended waiting on a callback,
 *         -1 on disk page request error
 */
int
Dbtup::nr_delete(Signal* signal, Uint32 senderData,
                 Uint32 fragPtrI, const Local_key* key, Uint32 gci)
{
  FragrecordPtr fragPtr;
  fragPtr.i= fragPtrI;
  ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
  TablerecPtr tablePtr;
  tablePtr.i= fragPtr.p->fragTableId;
  ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);

  Local_key tmp = * key;
  /* Work on the real page id from here on. */
  tmp.m_page_no= getRealpid(fragPtr.p, tmp.m_page_no);

  PagePtr pagePtr;
  Tuple_header* ptr= (Tuple_header*)get_ptr(&pagePtr, &tmp, tablePtr.p);

  if (!tablePtr.p->tuxCustomTriggers.isEmpty())
  {
    jam();
    /* Remove the row from all ordered indexes before freeing it. */
    TuxMaintReq* req = (TuxMaintReq*)signal->getDataPtrSend();
    req->tableId = fragPtr.p->fragTableId;
    req->fragId = fragPtr.p->fragmentId;
    req->pageId = tmp.m_page_no;
    req->pageIndex = tmp.m_page_idx;
    req->tupVersion = ptr->get_tuple_version();
    req->opInfo = TuxMaintReq::OpRemove;
    removeTuxEntries(signal, tablePtr.p);
  }

  /* Save the disk reference before the in-memory row is freed. */
  Local_key disk;
  memcpy(&disk, ptr->get_disk_ref_ptr(tablePtr.p), sizeof(disk));

  if (tablePtr.p->m_attributes[MM].m_no_of_varsize +
      tablePtr.p->m_attributes[MM].m_no_of_dynamic)
  {
    jam();
    free_var_rec(fragPtr.p, tablePtr.p, &tmp, pagePtr);
  } else {
    jam();
    free_fix_rec(fragPtr.p, tablePtr.p, &tmp, (Fix_page*)pagePtr.p);
  }

  if (tablePtr.p->m_no_of_disk_attributes)
  {
    jam();

    /* Undo-log space needed for a Free record covering the disk row. */
    Uint32 sz = (sizeof(Dbtup::Disk_undo::Free) >> 2) +
      tablePtr.p->m_offsets[DD].m_fix_header_size - 1;

    D("Logfile_client - nr_delete");
    Logfile_client lgman(this, c_lgman, fragPtr.p->m_logfile_group_id);
    int res = lgman.alloc_log_space(sz, jamBuffer());
    ndbrequire(res == 0);

    /**
     * 1) alloc log buffer
     * 2) get page
     * 3) get log buffer
     * 4) delete tuple
     */
    Page_cache_client::Request preq;
    preq.m_page = disk;
    preq.m_callback.m_callbackData = senderData;
    preq.m_callback.m_callbackFunction =
      safe_cast(&Dbtup::nr_delete_page_callback);
    int flags = Page_cache_client::COMMIT_REQ;

#ifdef ERROR_INSERT
    /* Test hooks 4023/4024: randomly (or always, for 4024) delay the
     * page request to exercise the timeslice/callback path. */
    if (ERROR_INSERTED(4023) || ERROR_INSERTED(4024))
    {
      int rnd = rand() % 100;
      int slp = 0;
      if (ERROR_INSERTED(4024))
      {
        slp = 3000;
      }
      else if (rnd > 90)
      {
        slp = 3000;
      }
      else if (rnd > 70)
      {
        slp = 100;
      }

      ndbout_c("rnd: %d slp: %d", rnd, slp);

      if (slp)
      {
        flags |= Page_cache_client::DELAY_REQ;
        const NDB_TICKS now = NdbTick_getCurrentTicks();
        preq.m_delay_until_time = NdbTick_AddMilliseconds(now,(Uint64)slp);
      }
    }
#endif
    Ptr<GlobalPage> diskPagePtr;
    Page_cache_client pgman(this, c_pgman);
    res = pgman.get_page(signal, preq, flags);
    diskPagePtr = pgman.m_ptr;
    if (res == 0)
    {
      /* Page not in cache yet; nr_delete_page_callback will continue. */
      goto timeslice;
    }
    else if (unlikely(res == -1))
    {
      return -1;
    }

    /* Page available immediately. */
    PagePtr disk_page((Tup_page*)diskPagePtr.p, diskPagePtr.i);
    disk_page_set_dirty(disk_page);

    CallbackPtr cptr;
    cptr.m_callbackIndex = NR_DELETE_LOG_BUFFER_CALLBACK;
    cptr.m_callbackData = senderData;
    res= lgman.get_log_buffer(signal, sz, &cptr);
    switch(res){
    case 0:
      /* Log buffer full; callback will continue. Pass the page along. */
      signal->theData[2] = disk_page.i;
      goto timeslice;
    case -1:
      ndbrequire("NOT YET IMPLEMENTED" == 0);
      break;
    }

    /* Page and log buffer both available: free the disk part now. */
    if (0) ndbout << "DIRECT DISK DELETE: " << disk << endl;
    disk_page_free(signal, tablePtr.p, fragPtr.p,
                   &disk, *(PagePtr*)&disk_page, gci);
    return 0;
  }

  return 0;

timeslice:
  /* Save the disk ref for the continuation and signal "not done yet". */
  memcpy(signal->theData, &disk, sizeof(disk));
  return 1;
}
4701 
/**
 * Continuation of nr_delete() once the requested disk page has been
 * read into the page cache.
 *
 * Marks the page dirty, looks up the pending recovery-delete operation
 * in DBLQH, and requests undo-log buffer space.  If the buffer is not
 * immediately available, nr_delete_log_buffer_callback finishes the job
 * later; otherwise the disk part is freed and DBLQH is notified here.
 *
 * @param signal       signal object
 * @param userpointer  DBLQH operation cookie (senderData from nr_delete)
 * @param page_id      page cache id of the now-available disk page
 */
void
Dbtup::nr_delete_page_callback(Signal* signal,
                               Uint32 userpointer, Uint32 page_id)//unused
{
  Ptr<GlobalPage> gpage;
  m_global_page_pool.getPtr(gpage, page_id);
  PagePtr pagePtr((Tup_page*)gpage.p, gpage.i);
  disk_page_set_dirty(pagePtr);
  /* Fetch the pending operation state from DBLQH. */
  Dblqh::Nr_op_info op;
  op.m_ptr_i = userpointer;
  op.m_disk_ref.m_page_no = pagePtr.p->m_page_no;
  op.m_disk_ref.m_file_no = pagePtr.p->m_file_no;
  c_lqh->get_nr_op_info(&op, page_id);

  Ptr<Fragrecord> fragPtr;
  fragPtr.i= op.m_tup_frag_ptr_i;
  ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);

  Ptr<Tablerec> tablePtr;
  tablePtr.i = fragPtr.p->fragTableId;
  ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);

  /* Same undo-log size computation as in nr_delete(). */
  Uint32 sz = (sizeof(Dbtup::Disk_undo::Free) >> 2) +
    tablePtr.p->m_offsets[DD].m_fix_header_size - 1;

  CallbackPtr cb;
  cb.m_callbackData = userpointer;
  cb.m_callbackIndex = NR_DELETE_LOG_BUFFER_CALLBACK;
  D("Logfile_client - nr_delete_page_callback");
  Logfile_client lgman(this, c_lgman, fragPtr.p->m_logfile_group_id);
  int res= lgman.get_log_buffer(signal, sz, &cb);
  switch(res){
  case 0:
    jam();
    /* No log buffer yet; the log-buffer callback will continue. */
    return;
  case -1:
    ndbrequire("NOT YET IMPLEMENTED" == 0);
    break;
  }
  jam();

  /* Log buffer available: free the disk part and report completion. */
  if (0) ndbout << "PAGE CALLBACK DISK DELETE: " << op.m_disk_ref << endl;
  disk_page_free(signal, tablePtr.p, fragPtr.p,
                 &op.m_disk_ref, pagePtr, op.m_gci_hi);

  c_lqh->nr_delete_complete(signal, &op);
  return;
}
4750 
4751 void
nr_delete_log_buffer_callback(Signal * signal,Uint32 userpointer,Uint32 unused)4752 Dbtup::nr_delete_log_buffer_callback(Signal* signal,
4753 				    Uint32 userpointer,
4754 				    Uint32 unused)
4755 {
4756   Dblqh::Nr_op_info op;
4757   op.m_ptr_i = userpointer;
4758   c_lqh->get_nr_op_info(&op, RNIL);
4759 
4760   Ptr<Fragrecord> fragPtr;
4761   fragPtr.i= op.m_tup_frag_ptr_i;
4762   ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
4763 
4764   Ptr<Tablerec> tablePtr;
4765   tablePtr.i = fragPtr.p->fragTableId;
4766   ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
4767 
4768   Ptr<GlobalPage> gpage;
4769   m_global_page_pool.getPtr(gpage, op.m_page_id);
4770   PagePtr pagePtr((Tup_page*)gpage.p, gpage.i);
4771 
4772   /**
4773    * reset page no
4774    */
4775   if (0) ndbout << "LOGBUFFER CALLBACK DISK DELETE: " << op.m_disk_ref << endl;
4776   jam();
4777   disk_page_free(signal, tablePtr.p, fragPtr.p,
4778 		 &op.m_disk_ref, pagePtr, op.m_gci_hi);
4779 
4780   c_lqh->nr_delete_complete(signal, &op);
4781 }
4782