1 /*
2    Copyright (c) 2003, 2011, Oracle and/or its affiliates. All rights reserved.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 
26 #define DBTUP_C
27 #include <dblqh/Dblqh.hpp>
28 #include "Dbtup.hpp"
29 #include <RefConvert.hpp>
30 #include <ndb_limits.h>
31 #include <pc.hpp>
32 #include <AttributeDescriptor.hpp>
33 #include "AttributeOffset.hpp"
34 #include <AttributeHeader.hpp>
35 #include <Interpreter.hpp>
36 #include <signaldata/TupKey.hpp>
37 #include <signaldata/AttrInfo.hpp>
38 #include <NdbSqlUtil.hpp>
39 
40 // #define TRACE_INTERPRETER
41 
42 /* For debugging */
43 static void
dump_hex(const Uint32 *p, Uint32 len)
45 {
46   if(len > 2560)
47     len= 160;
48   if(len==0)
49     return;
50   for(;;)
51   {
52     if(len>=4)
53       ndbout_c("%8p %08X %08X %08X %08X", p, p[0], p[1], p[2], p[3]);
54     else if(len>=3)
55       ndbout_c("%8p %08X %08X %08X", p, p[0], p[1], p[2]);
56     else if(len>=2)
57       ndbout_c("%8p %08X %08X", p, p[0], p[1]);
58     else
59       ndbout_c("%8p %08X", p, p[0]);
60     if(len <= 4)
61       break;
62     len-= 4;
63     p+= 4;
64   }
65 }
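
/* Example (illustrative): dump_hex(p, 6) prints two lines, the first with
 * the four words at p and the second with the remaining two at p + 4:
 *   <p>     11223344 55667788 99AABBCC DDEEFF00
 *   <p + 4> 01020304 05060708
 * Lengths above 2560 words are clamped to 160 to bound the output.
 */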
66 
67 /**
68  * getStoredProcAttrInfo
69  *
70  * Get the I-Val of the supplied stored procedure's
71  * AttrInfo section
72  * Initialise the AttrInfo length in the request
73  */
int Dbtup::getStoredProcAttrInfo(Uint32 storedId,
75                                  KeyReqStruct* req_struct,
76                                  Uint32& attrInfoIVal)
77 {
78   jam();
79   StoredProcPtr storedPtr;
80   c_storedProcPool.getPtr(storedPtr, storedId);
81   if (storedPtr.i != RNIL) {
82     if ((storedPtr.p->storedCode == ZSCAN_PROCEDURE) ||
83         (storedPtr.p->storedCode == ZCOPY_PROCEDURE)) {
84       /* Setup OperationRec with stored procedure AttrInfo section */
85       SegmentedSectionPtr sectionPtr;
86       getSection(sectionPtr, storedPtr.p->storedProcIVal);
87       Uint32 storedProcLen= sectionPtr.sz;
88 
89       ndbassert( attrInfoIVal == RNIL );
90       attrInfoIVal= storedPtr.p->storedProcIVal;
91       req_struct->attrinfo_len= storedProcLen;
92       return ZOK;
93     }
94   }
95   terrorCode= ZSTORED_PROC_ID_ERROR;
96   return terrorCode;
97 }
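
/* Used from execTUPKEYREQ() below: for scan and copy operations
 * (storedProcedure != ZNIL) the AttrInfo comes from the stored procedure's
 * section rather than from the TUPKEYREQ signal itself, and
 * req_struct->attrinfo_len is set from that section's size before
 * copyAttrinfo() linearises it into cinBuffer.
 */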
98 
void Dbtup::copyAttrinfo(Operationrec * regOperPtr,
100                          Uint32* inBuffer,
101                          Uint32 expectedLen,
102                          Uint32 attrInfoIVal)
103 {
104   ndbassert( expectedLen > 0 || attrInfoIVal == RNIL );
105 
106   if (expectedLen > 0)
107   {
108     ndbassert( attrInfoIVal != RNIL );
109 
110     /* Check length in section is as we expect */
111     SegmentedSectionPtr sectionPtr;
112     getSection(sectionPtr, attrInfoIVal);
113 
114     ndbrequire(sectionPtr.sz == expectedLen);
115     ndbrequire(sectionPtr.sz < ZATTR_BUFFER_SIZE);
116 
117     /* Copy attrInfo data into linear buffer */
118     // TODO : Consider operating TUP out of first segment where
119     // appropriate
120     copy(inBuffer, attrInfoIVal);
121   }
122 
123   regOperPtr->m_any_value= 0;
124 
125   return;
126 }
127 
128 void
Dbtup::setChecksum(Tuple_header* tuple_ptr,
130                    Tablerec* regTabPtr)
131 {
132   tuple_ptr->m_checksum= 0;
133   tuple_ptr->m_checksum= calculateChecksum(tuple_ptr, regTabPtr);
134 }
135 
136 Uint32
Dbtup::calculateChecksum(Tuple_header* tuple_ptr,
138                          Tablerec* regTabPtr)
139 {
140   Uint32 checksum;
141   Uint32 i, rec_size, *tuple_header;
142   rec_size= regTabPtr->m_offsets[MM].m_fix_header_size;
143   tuple_header= tuple_ptr->m_data;
144   checksum= 0;
145   // includes tupVersion
146   //printf("%p - ", tuple_ptr);
147 
148   for (i= 0; i < rec_size-Tuple_header::HeaderSize; i++) {
149     checksum ^= tuple_header[i];
150     //printf("%.8x ", tuple_header[i]);
151   }
152 
153   //printf("-> %.8x\n", checksum);
154 
155 #if 0
156   if (var_sized) {
157     /*
158     if (! req_struct->fix_var_together) {
159       jam();
160       checksum ^= tuple_header[rec_size];
161     }
162     */
163     jam();
164     var_data_part= req_struct->var_data_start;
165     vsize_words= calculate_total_var_size(req_struct->var_len_array,
166                                           regTabPtr->no_var_attr);
167     ndbassert(req_struct->var_data_end >= &var_data_part[vsize_words]);
168     for (i= 0; i < vsize_words; i++) {
169       checksum ^= var_data_part[i];
170     }
171   }
172 #endif
173   return checksum;
174 }
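
/* The row checksum is a word-wise XOR over the fixed part of the row.
 * setChecksum() zeroes the stored checksum word before recomputing it, and
 * the readers below (handleReadReq(), handleUpdateReq()) treat a non-zero
 * result from calculateChecksum() on a stored row as ZTUPLE_CORRUPTED_ERROR,
 * i.e. an intact row XORs to zero.
 */
#if 0
/* Minimal sketch of the XOR-checksum idea (illustrative only, not NDB code):
 * storing the XOR of the other words in a reserved slot makes the XOR of the
 * whole buffer zero, which is the property the readers check.
 */
static void xor_checksum_sketch()
{
  Uint32 buf[4] = { 0x11111111, 0x22222222, 0x33333333, 0 };
  buf[3] = buf[0] ^ buf[1] ^ buf[2];                    // cf. setChecksum()
  ndbassert((buf[0] ^ buf[1] ^ buf[2] ^ buf[3]) == 0);  // cf. the != 0 checks
}
#endif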
175 
176 /* ----------------------------------------------------------------- */
177 /* -----------       INSERT_ACTIVE_OP_LIST            -------------- */
178 /* ----------------------------------------------------------------- */
179 bool
Dbtup::insertActiveOpList(OperationrecPtr regOperPtr,
181 			  KeyReqStruct* req_struct)
182 {
183   OperationrecPtr prevOpPtr;
184   ndbrequire(!regOperPtr.p->op_struct.in_active_list);
185   regOperPtr.p->op_struct.in_active_list= true;
186   req_struct->prevOpPtr.i=
187     prevOpPtr.i= req_struct->m_tuple_ptr->m_operation_ptr_i;
188   regOperPtr.p->prevActiveOp= prevOpPtr.i;
189   regOperPtr.p->nextActiveOp= RNIL;
190   regOperPtr.p->m_undo_buffer_space= 0;
191   req_struct->m_tuple_ptr->m_operation_ptr_i= regOperPtr.i;
192   if (prevOpPtr.i == RNIL) {
193     return true;
194   } else {
195     req_struct->prevOpPtr.p= prevOpPtr.p= c_operation_pool.getPtr(prevOpPtr.i);
196     prevOpPtr.p->nextActiveOp= regOperPtr.i;
197 
198     regOperPtr.p->op_struct.m_wait_log_buffer=
199       prevOpPtr.p->op_struct.m_wait_log_buffer;
200     regOperPtr.p->op_struct.m_load_diskpage_on_commit=
201       prevOpPtr.p->op_struct.m_load_diskpage_on_commit;
202     regOperPtr.p->op_struct.m_gci_written=
203       prevOpPtr.p->op_struct.m_gci_written;
204     regOperPtr.p->m_undo_buffer_space= prevOpPtr.p->m_undo_buffer_space;
205     // start with prev mask (matters only for UPD o UPD)
206 
207     regOperPtr.p->m_any_value = prevOpPtr.p->m_any_value;
208 
209     prevOpPtr.p->op_struct.m_wait_log_buffer= 0;
210     prevOpPtr.p->op_struct.m_load_diskpage_on_commit= 0;
211 
212     if(prevOpPtr.p->op_struct.tuple_state == TUPLE_PREPARED)
213     {
214       Uint32 op= regOperPtr.p->op_struct.op_type;
215       Uint32 prevOp= prevOpPtr.p->op_struct.op_type;
216       if (prevOp == ZDELETE)
217       {
218 	if(op == ZINSERT)
219 	{
220 	  // mark both
221 	  prevOpPtr.p->op_struct.delete_insert_flag= true;
222 	  regOperPtr.p->op_struct.delete_insert_flag= true;
223 	  return true;
224 	}
225         else if (op == ZREFRESH)
226         {
227           /* ZREFRESH after Delete - ok */
228           return true;
229         }
230         else
231         {
232 	  terrorCode= ZTUPLE_DELETED_ERROR;
233 	  return false;
234 	}
235       }
236       else if(op == ZINSERT && prevOp != ZDELETE)
237       {
238 	terrorCode= ZINSERT_ERROR;
239 	return false;
240       }
241       else if (prevOp == ZREFRESH)
242       {
243         /* No operation after a ZREFRESH */
244         terrorCode= ZOP_AFTER_REFRESH_ERROR;
245         return false;
246       }
247       return true;
248     }
249     else
250     {
251       terrorCode= ZMUST_BE_ABORTED_ERROR;
252       return false;
253     }
254   }
255 }
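
/* Summary of the sequencing rules enforced above when a prepared operation
 * already exists on the tuple:
 *   prev DELETE,  new INSERT     : allowed, both get delete_insert_flag
 *   prev DELETE,  new REFRESH    : allowed
 *   prev DELETE,  new other      : ZTUPLE_DELETED_ERROR
 *   prev other,   new INSERT     : ZINSERT_ERROR
 *   prev REFRESH, new non-INSERT : ZOP_AFTER_REFRESH_ERROR
 *   anything else                : allowed
 * If the previous operation is not in TUPLE_PREPARED state the new operation
 * is refused with ZMUST_BE_ABORTED_ERROR.
 */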
256 
257 bool
Dbtup::setup_read(KeyReqStruct *req_struct,
259 		  Operationrec* regOperPtr,
260 		  Fragrecord* regFragPtr,
261 		  Tablerec* regTabPtr,
262 		  bool disk)
263 {
264   OperationrecPtr currOpPtr;
265   currOpPtr.i= req_struct->m_tuple_ptr->m_operation_ptr_i;
266   Uint32 bits = req_struct->m_tuple_ptr->m_header_bits;
267 
268   if (unlikely(req_struct->m_reorg))
269   {
270     Uint32 moved = bits & Tuple_header::REORG_MOVE;
271     if (! ((req_struct->m_reorg == 1 && moved == 0) ||
272            (req_struct->m_reorg == 2 && moved != 0)))
273     {
274       terrorCode= ZTUPLE_DELETED_ERROR;
275       return false;
276     }
277   }
278   if (currOpPtr.i == RNIL)
279   {
280     if (regTabPtr->need_expand(disk))
281       prepare_read(req_struct, regTabPtr, disk);
282     return true;
283   }
284 
285   do {
286     Uint32 savepointId= regOperPtr->savepointId;
287     bool dirty= req_struct->dirty_op;
288 
289     c_operation_pool.getPtr(currOpPtr);
290     bool sameTrans= c_lqh->is_same_trans(currOpPtr.p->userpointer,
291 					 req_struct->trans_id1,
292 					 req_struct->trans_id2);
293     /**
294      * Read committed in same trans reads latest copy
295      */
296     if(dirty && !sameTrans)
297     {
298       savepointId= 0;
299     }
300     else if(sameTrans)
301     {
302       // Use savepoint even in read committed mode
303       dirty= false;
304     }
305 
306     /* found == true indicates that the savepoint is some state
307      * within the tuple's current transaction's uncommitted operations
308      */
309     bool found= find_savepoint(currOpPtr, savepointId);
310 
311     Uint32 currOp= currOpPtr.p->op_struct.op_type;
312 
313     /* is_insert==true if tuple did not exist before its current
314      * transaction
315      */
316     bool is_insert = (bits & Tuple_header::ALLOC);
317 
318     /* If savepoint is in transaction, and post-delete-op
319      *   OR
320      * Tuple didn't exist before
321      *      AND
322      *   Read is dirty
323      *           OR
324      *   Savepoint is before-transaction
325      *
326      * Tuple does not exist in read's view
327      */
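    /* Example: a dirty read issued by another transaction on a row whose
     * ALLOC bit is set (the row was created by a still uncommitted
     * transaction) matches the second disjunct below and is rejected with
     * ZTUPLE_DELETED_ERROR - the row is not yet visible to it.
     */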
328     if((found && currOp == ZDELETE) ||
329        ((dirty || !found) && is_insert))
330     {
331       /* Tuple not visible to this read operation */
332       terrorCode= ZTUPLE_DELETED_ERROR;
333       break;
334     }
335 
336     if(dirty || !found)
337     {
338       /* Read existing committed tuple */
339     }
340     else
341     {
342       req_struct->m_tuple_ptr=
343         get_copy_tuple(&currOpPtr.p->m_copy_tuple_location);
344     }
345 
346     if (regTabPtr->need_expand(disk))
347       prepare_read(req_struct, regTabPtr, disk);
348 
349 #if 0
350     ndbout_c("reading copy");
351     Uint32 *var_ptr = fixed_ptr+regTabPtr->var_offset;
352     req_struct->m_tuple_ptr= fixed_ptr;
353     req_struct->fix_var_together= true;
354     req_struct->var_len_array= (Uint16*)var_ptr;
355     req_struct->var_data_start= var_ptr+regTabPtr->var_array_wsize;
356     Uint32 var_sz32= init_var_pos_array((Uint16*)var_ptr,
357 					req_struct->var_pos_array,
358 					regTabPtr->no_var_attr);
359     req_struct->var_data_end= var_ptr+regTabPtr->var_array_wsize + var_sz32;
360 #endif
361     return true;
362   } while(0);
363 
364   return false;
365 }
366 
367 int
Dbtup::load_diskpage(Signal* signal,
369 		     Uint32 opRec, Uint32 fragPtrI,
370 		     Uint32 lkey1, Uint32 lkey2, Uint32 flags)
371 {
372   Ptr<Tablerec> tabptr;
373   Ptr<Fragrecord> fragptr;
374   Ptr<Operationrec> operPtr;
375 
376   c_operation_pool.getPtr(operPtr, opRec);
377   fragptr.i= fragPtrI;
378   ptrCheckGuard(fragptr, cnoOfFragrec, fragrecord);
379 
380   Operationrec *  regOperPtr= operPtr.p;
381   Fragrecord * regFragPtr= fragptr.p;
382 
383   tabptr.i = regFragPtr->fragTableId;
384   ptrCheckGuard(tabptr, cnoOfTablerec, tablerec);
385   Tablerec* regTabPtr = tabptr.p;
386 
387   if (Local_key::ref(lkey1, lkey2) == ~(Uint32)0)
388   {
389     jam();
390     regOperPtr->op_struct.m_wait_log_buffer= 1;
391     regOperPtr->op_struct.m_load_diskpage_on_commit= 1;
392     if (unlikely((flags & 7) == ZREFRESH))
393     {
394       jam();
395      /* Refresh of previously nonexistent DD tuple.
396        * No diskpage to load at commit time
397        */
398       regOperPtr->op_struct.m_wait_log_buffer= 0;
399       regOperPtr->op_struct.m_load_diskpage_on_commit= 0;
400     }
401 
402     /* In either case return 1 for 'proceed' */
403     return 1;
404   }
405 
406   jam();
407   Uint32 page_idx= lkey2;
408   Uint32 frag_page_id= lkey1;
409   regOperPtr->m_tuple_location.m_page_no= getRealpid(regFragPtr,
410 						     frag_page_id);
411   regOperPtr->m_tuple_location.m_page_idx= page_idx;
412 
413   PagePtr page_ptr;
414   Uint32* tmp= get_ptr(&page_ptr, &regOperPtr->m_tuple_location, regTabPtr);
415   Tuple_header* ptr= (Tuple_header*)tmp;
416 
417   int res= 1;
418   if(ptr->m_header_bits & Tuple_header::DISK_PART)
419   {
420     Page_cache_client::Request req;
421     memcpy(&req.m_page, ptr->get_disk_ref_ptr(regTabPtr), sizeof(Local_key));
422     req.m_callback.m_callbackData= opRec;
423     req.m_callback.m_callbackFunction=
424       safe_cast(&Dbtup::disk_page_load_callback);
425 
426 #ifdef ERROR_INSERT
427     if (ERROR_INSERTED(4022))
428     {
429       flags |= Page_cache_client::DELAY_REQ;
430       req.m_delay_until_time = NdbTick_CurrentMillisecond()+(Uint64)3000;
431     }
432 #endif
433 
434     Page_cache_client pgman(this, c_pgman);
435     res= pgman.get_page(signal, req, flags);
436     m_pgman_ptr = pgman.m_ptr;
437     if(res > 0)
438     {
439       //ndbout_c("in cache");
440       // In cache
441     }
442     else if(res == 0)
443     {
444       //ndbout_c("waiting for callback");
445       // set state
446     }
447     else
448     {
449       // Error
450     }
451   }
452 
453   switch(flags & 7)
454   {
455   case ZREAD:
456   case ZREAD_EX:
457     break;
458   case ZDELETE:
459   case ZUPDATE:
460   case ZINSERT:
461   case ZWRITE:
462   case ZREFRESH:
463     regOperPtr->op_struct.m_wait_log_buffer= 1;
464     regOperPtr->op_struct.m_load_diskpage_on_commit= 1;
465   }
466   return res;
467 }
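
/* Return value of load_diskpage():
 *   > 0 : proceed immediately - either the tuple has no DISK_PART or the
 *         disk page is already in the page cache,
 *     0 : the page is being fetched, disk_page_load_callback() resumes the
 *         operation via LQH when it arrives,
 *   < 0 : error returned by the page cache (get_page).
 * For the writing operation types the m_wait_log_buffer and
 * m_load_diskpage_on_commit flags are set so that log buffer space and the
 * disk page are made available again at commit time.
 */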
468 
469 void
Dbtup::disk_page_load_callback(Signal* signal, Uint32 opRec, Uint32 page_id)
471 {
472   Ptr<Operationrec> operPtr;
473   c_operation_pool.getPtr(operPtr, opRec);
474   c_lqh->acckeyconf_load_diskpage_callback(signal,
475 					   operPtr.p->userpointer, page_id);
476 }
477 
478 int
Dbtup::load_diskpage_scan(Signal* signal,
480 			  Uint32 opRec, Uint32 fragPtrI,
481 			  Uint32 lkey1, Uint32 lkey2, Uint32 flags)
482 {
483   Ptr<Tablerec> tabptr;
484   Ptr<Fragrecord> fragptr;
485   Ptr<Operationrec> operPtr;
486 
487   c_operation_pool.getPtr(operPtr, opRec);
488   fragptr.i= fragPtrI;
489   ptrCheckGuard(fragptr, cnoOfFragrec, fragrecord);
490 
491   Operationrec *  regOperPtr= operPtr.p;
492   Fragrecord * regFragPtr= fragptr.p;
493 
494   tabptr.i = regFragPtr->fragTableId;
495   ptrCheckGuard(tabptr, cnoOfTablerec, tablerec);
496   Tablerec* regTabPtr = tabptr.p;
497 
498   jam();
499   Uint32 page_idx= lkey2;
500   Uint32 frag_page_id= lkey1;
501   regOperPtr->m_tuple_location.m_page_no= getRealpid(regFragPtr,
502 						     frag_page_id);
503   regOperPtr->m_tuple_location.m_page_idx= page_idx;
504   regOperPtr->op_struct.m_load_diskpage_on_commit= 0;
505 
506   PagePtr page_ptr;
507   Uint32* tmp= get_ptr(&page_ptr, &regOperPtr->m_tuple_location, regTabPtr);
508   Tuple_header* ptr= (Tuple_header*)tmp;
509 
510   int res= 1;
511   if(ptr->m_header_bits & Tuple_header::DISK_PART)
512   {
513     Page_cache_client::Request req;
514     memcpy(&req.m_page, ptr->get_disk_ref_ptr(regTabPtr), sizeof(Local_key));
515     req.m_callback.m_callbackData= opRec;
516     req.m_callback.m_callbackFunction=
517       safe_cast(&Dbtup::disk_page_load_scan_callback);
518 
519     Page_cache_client pgman(this, c_pgman);
520     res= pgman.get_page(signal, req, flags);
521     m_pgman_ptr = pgman.m_ptr;
522     if(res > 0)
523     {
524       // ndbout_c("in cache");
525       // In cache
526     }
527     else if(res == 0)
528     {
529       //ndbout_c("waiting for callback");
530       // set state
531     }
532     else
533     {
534       // Error
535     }
536   }
537   return res;
538 }
539 
540 void
Dbtup::disk_page_load_scan_callback(Signal* signal,
542 				    Uint32 opRec, Uint32 page_id)
543 {
544   Ptr<Operationrec> operPtr;
545   c_operation_pool.getPtr(operPtr, opRec);
546   c_lqh->next_scanconf_load_diskpage_callback(signal,
547 					      operPtr.p->userpointer, page_id);
548 }
549 
void Dbtup::execTUPKEYREQ(Signal* signal)
551 {
552    TupKeyReq * tupKeyReq= (TupKeyReq *)signal->getDataPtr();
553    Ptr<Tablerec> tabptr;
554    Ptr<Fragrecord> fragptr;
555    Ptr<Operationrec> operPtr;
556    KeyReqStruct req_struct(this);
557    Uint32 sig1, sig2, sig3, sig4;
558 
559    Uint32 RoperPtr= tupKeyReq->connectPtr;
560    Uint32 Rfragptr= tupKeyReq->fragPtr;
561 
562    Uint32 RnoOfFragrec= cnoOfFragrec;
563    Uint32 RnoOfTablerec= cnoOfTablerec;
564 
565    jamEntry();
566    fragptr.i= Rfragptr;
567 
568    ndbrequire(Rfragptr < RnoOfFragrec);
569 
570    c_operation_pool.getPtr(operPtr, RoperPtr);
571    ptrAss(fragptr, fragrecord);
572 
573    Uint32 TrequestInfo= tupKeyReq->request;
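   /* Bits of TrequestInfo as decoded further down in this routine:
    *   bit  0     : dirty_op
    *   bits 6-8   : op_type (ZREAD/ZUPDATE/ZINSERT/ZDELETE/ZREFRESH/...)
    *   bit  10    : interpreted_exec
    *   bit  11    : m_use_rowid
    *   bits 12-13 : m_reorg
    */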
574 
575    Operationrec *  regOperPtr= operPtr.p;
576    Fragrecord * regFragPtr= fragptr.p;
577 
578    tabptr.i = regFragPtr->fragTableId;
579    ptrCheckGuard(tabptr, RnoOfTablerec, tablerec);
580    Tablerec* regTabPtr = tabptr.p;
581 
582    req_struct.tablePtrP = tabptr.p;
583    req_struct.fragPtrP = fragptr.p;
584    req_struct.operPtrP = operPtr.p;
585    req_struct.signal= signal;
586    req_struct.dirty_op= TrequestInfo & 1;
587    req_struct.interpreted_exec= (TrequestInfo >> 10) & 1;
588    req_struct.no_fired_triggers= 0;
589    req_struct.read_length= 0;
590    req_struct.last_row= false;
591    req_struct.changeMask.clear();
592    req_struct.m_is_lcp = false;
593 
594    if (unlikely(get_trans_state(regOperPtr) != TRANS_IDLE))
595    {
596      TUPKEY_abort(&req_struct, 39);
597      return;
598    }
599 
600  /* ----------------------------------------------------------------- */
601  // Operation is ZREAD when we arrive here so no need to worry about the
602  // abort process.
603  /* ----------------------------------------------------------------- */
604  /* -----------    INITIATE THE OPERATION RECORD       -------------- */
605  /* ----------------------------------------------------------------- */
606    Uint32 Rstoredid= tupKeyReq->storedProcedure;
607 
608    regOperPtr->fragmentPtr= Rfragptr;
609    regOperPtr->op_struct.op_type= (TrequestInfo >> 6) & 0x7;
610    regOperPtr->op_struct.delete_insert_flag = false;
611    regOperPtr->op_struct.m_reorg = (TrequestInfo >> 12) & 3;
612 
613    regOperPtr->m_copy_tuple_location.setNull();
614    regOperPtr->tupVersion= ZNIL;
615 
616    sig1= tupKeyReq->savePointId;
617    sig2= tupKeyReq->primaryReplica;
618    sig3= tupKeyReq->keyRef2;
619 
620    regOperPtr->savepointId= sig1;
621    regOperPtr->op_struct.primary_replica= sig2;
622    Uint32 pageidx = regOperPtr->m_tuple_location.m_page_idx= sig3;
623 
624    sig1= tupKeyReq->opRef;
625    sig2= tupKeyReq->tcOpIndex;
626    sig3= tupKeyReq->coordinatorTC;
627    sig4= tupKeyReq->keyRef1;
628 
629    req_struct.tc_operation_ptr= sig1;
630    req_struct.TC_index= sig2;
631    req_struct.TC_ref= sig3;
632    Uint32 pageid = req_struct.frag_page_id= sig4;
633    req_struct.m_use_rowid = (TrequestInfo >> 11) & 1;
634    req_struct.m_reorg = (TrequestInfo >> 12) & 3;
635 
636    sig1= tupKeyReq->attrBufLen;
637    sig2= tupKeyReq->applRef;
638    sig3= tupKeyReq->transId1;
639    sig4= tupKeyReq->transId2;
640 
641    Uint32 disk_page= tupKeyReq->disk_page;
642 
643    req_struct.log_size= sig1;
644    req_struct.attrinfo_len= sig1;
645    req_struct.rec_blockref= sig2;
646    req_struct.trans_id1= sig3;
647    req_struct.trans_id2= sig4;
648    req_struct.m_disk_page_ptr.i= disk_page;
649 
650    sig1 = tupKeyReq->m_row_id_page_no;
651    sig2 = tupKeyReq->m_row_id_page_idx;
652    sig3 = tupKeyReq->deferred_constraints;
653 
654    req_struct.m_row_id.m_page_no = sig1;
655    req_struct.m_row_id.m_page_idx = sig2;
656    req_struct.m_deferred_constraints = sig3;
657 
658    /* Get AttrInfo section if this is a long TUPKEYREQ */
659    Uint32 attrInfoIVal= tupKeyReq->attrInfoIVal;
660 
661    /* If we have AttrInfo, check we expected it, and
662     * that we don't have AttrInfo by another means
663     */
664    ndbassert( (attrInfoIVal == RNIL) ||
665               (tupKeyReq->attrBufLen > 0));
666 
667    Uint32 Roptype = regOperPtr->op_struct.op_type;
668 
669    if (Rstoredid != ZNIL) {
670      /* This is part of a scan, get attrInfoIVal for
671       * given stored procedure
672       */
673      ndbrequire(getStoredProcAttrInfo(Rstoredid,
674                                       &req_struct,
675                                       attrInfoIVal) == ZOK);
676    }
677 
678    /* Copy AttrInfo from section into linear in-buffer */
679    copyAttrinfo(regOperPtr,
680                 &cinBuffer[0],
681                 req_struct.attrinfo_len,
682                 attrInfoIVal);
683 
684    regOperPtr->op_struct.m_gci_written = 0;
685 
686    if (Roptype == ZINSERT && Local_key::isInvalid(pageid, pageidx))
687    {
688      // No tuple allocated yet
689      goto do_insert;
690    }
691 
692    if (Roptype == ZREFRESH && Local_key::isInvalid(pageid, pageidx))
693    {
694      // No tuple allocated yet
695      goto do_refresh;
696    }
697 
698    if (unlikely(isCopyTuple(pageid, pageidx)))
699    {
700      /**
701       * Only LCP reads a copy-tuple "directly"
702       */
703      ndbassert(Roptype == ZREAD);
704      ndbassert(disk_page == RNIL);
705      setup_lcp_read_copy_tuple(&req_struct, regOperPtr, regFragPtr, regTabPtr);
706      goto do_read;
707    }
708 
709    /**
710     * Get pointer to tuple
711     */
712    regOperPtr->m_tuple_location.m_page_no= getRealpid(regFragPtr,
713 						      req_struct.frag_page_id);
714 
715    setup_fixed_part(&req_struct, regOperPtr, regTabPtr);
716 
717    /**
718     * Check operation
719     */
720    if (Roptype == ZREAD) {
721      jam();
722 
723      if (setup_read(&req_struct, regOperPtr, regFragPtr, regTabPtr,
724 		    disk_page != RNIL))
725      {
726    do_read:
727        if(handleReadReq(signal, regOperPtr, regTabPtr, &req_struct) != -1)
728        {
729 	 req_struct.log_size= 0;
730 	 sendTUPKEYCONF(signal, &req_struct, regOperPtr);
731 	 /* ---------------------------------------------------------------- */
732 	 // Read operations need not be taken out of any lists.
733 	 // We also do not need to wait for commit since there are no changes
734 	 // to commit. Thus we
735 	 // prepare the operation record already now for the next operation.
736 	 // Write operations have set the state to STARTED above indicating
737 	 // that they are waiting for the Commit or Abort decision.
738 	 /* ---------------------------------------------------------------- */
739 	 set_trans_state(regOperPtr, TRANS_IDLE);
740        }
741        return;
742      }
743      tupkeyErrorLab(&req_struct);
744      return;
745    }
746 
747    if(insertActiveOpList(operPtr, &req_struct))
748    {
749      if(Roptype == ZINSERT)
750      {
751        jam();
752    do_insert:
753        Local_key accminupdate;
754        Local_key * accminupdateptr = &accminupdate;
755        if (unlikely(handleInsertReq(signal, operPtr,
756                                     fragptr, regTabPtr, &req_struct,
757                                     &accminupdateptr) == -1))
758        {
759          return;
760        }
761 
762        terrorCode = 0;
763        checkImmediateTriggersAfterInsert(&req_struct,
764                                          regOperPtr,
765                                          regTabPtr,
766                                          disk_page != RNIL);
767 
768        if (unlikely(terrorCode != 0))
769        {
770          tupkeyErrorLab(&req_struct);
771          return;
772        }
773 
774        if (!regTabPtr->tuxCustomTriggers.isEmpty())
775        {
776          jam();
777          if (unlikely(executeTuxInsertTriggers(signal,
778                                                regOperPtr,
779                                                regFragPtr,
780                                                regTabPtr) != 0))
781          {
782            jam();
783            /*
784             * TUP insert succeeded but add of TUX entries failed.  All
785             * TUX changes have been rolled back at this point.
786             *
787             * We will abort via tupkeyErrorLab() as usual.  This routine
788             * however resets the operation to ZREAD.  The TUP_ABORTREQ
789             * arriving later cannot then undo the insert.
790             *
791             * Therefore we call TUP_ABORTREQ already now.  Diskdata etc
792             * should be in memory and timeslicing cannot occur.  We must
793             * skip TUX abort triggers since TUX is already aborted.
794             */
795            signal->theData[0] = operPtr.i;
796            do_tup_abortreq(signal, ZSKIP_TUX_TRIGGERS);
797            tupkeyErrorLab(&req_struct);
798            return;
799          }
800        }
801 
802        if (accminupdateptr)
803        {
804          /**
805           * Update ACC local-key, once *everything* has completed successfully
806           */
807          c_lqh->accminupdate(signal,
808                              regOperPtr->userpointer,
809                              accminupdateptr);
810        }
811 
812        sendTUPKEYCONF(signal, &req_struct, regOperPtr);
813        return;
814      }
815 
816      if (Roptype == ZUPDATE) {
817        jam();
818        if (unlikely(handleUpdateReq(signal, regOperPtr,
819                                     regFragPtr, regTabPtr,
820                                     &req_struct, disk_page != RNIL) == -1))
821        {
822          return;
823        }
824 
825        terrorCode = 0;
826        checkImmediateTriggersAfterUpdate(&req_struct,
827                                          regOperPtr,
828                                          regTabPtr,
829                                          disk_page != RNIL);
830 
831        if (unlikely(terrorCode != 0))
832        {
833          tupkeyErrorLab(&req_struct);
834          return;
835        }
836 
837        if (!regTabPtr->tuxCustomTriggers.isEmpty())
838        {
839          jam();
840          if (unlikely(executeTuxUpdateTriggers(signal,
841                                                regOperPtr,
842                                                regFragPtr,
843                                                regTabPtr) != 0))
844          {
845            jam();
846            /*
847             * See insert case.
848             */
849            signal->theData[0] = operPtr.i;
850            do_tup_abortreq(signal, ZSKIP_TUX_TRIGGERS);
851            tupkeyErrorLab(&req_struct);
852            return;
853          }
854        }
855 
856        sendTUPKEYCONF(signal, &req_struct, regOperPtr);
857        return;
858      }
859      else if(Roptype == ZDELETE)
860      {
861        jam();
862        req_struct.log_size= 0;
863        if (unlikely(handleDeleteReq(signal, regOperPtr,
864                                     regFragPtr, regTabPtr,
865                                     &req_struct,
866                                     disk_page != RNIL) == -1))
867        {
868          return;
869        }
870 
871        terrorCode = 0;
872        checkImmediateTriggersAfterDelete(&req_struct,
873                                          regOperPtr,
874                                          regTabPtr,
875                                          disk_page != RNIL);
876 
877        if (unlikely(terrorCode != 0))
878        {
879          tupkeyErrorLab(&req_struct);
880          return;
881        }
882 
883        /*
884         * TUX doesn't need to check for triggers at delete since entries in
885         * the index are kept until commit time.
886         */
887 
888        sendTUPKEYCONF(signal, &req_struct, regOperPtr);
889        return;
890      }
891      else if (Roptype == ZREFRESH)
892      {
893        /**
894         * No TUX or immediate triggers, just detached triggers
895         */
896    do_refresh:
897        if (unlikely(handleRefreshReq(signal, operPtr,
898                                      fragptr, regTabPtr,
899                                      &req_struct, disk_page != RNIL) == -1))
900        {
901          return;
902        }
903 
904        sendTUPKEYCONF(signal, &req_struct, regOperPtr);
905        return;
906 
907      }
908      else
909      {
910        ndbrequire(false); // Invalid op type
911      }
912    }
913 
914    tupkeyErrorLab(&req_struct);
915 }
916 
917 void
Dbtup::setup_fixed_part(KeyReqStruct* req_struct,
919 			Operationrec* regOperPtr,
920 			Tablerec* regTabPtr)
921 {
922   PagePtr page_ptr;
923   Uint32* ptr= get_ptr(&page_ptr, &regOperPtr->m_tuple_location, regTabPtr);
924   req_struct->m_page_ptr = page_ptr;
925   req_struct->m_tuple_ptr = (Tuple_header*)ptr;
926 
927   ndbassert(regOperPtr->op_struct.op_type == ZINSERT || (! (req_struct->m_tuple_ptr->m_header_bits & Tuple_header::FREE)));
928 
929   req_struct->check_offset[MM]= regTabPtr->get_check_offset(MM);
930   req_struct->check_offset[DD]= regTabPtr->get_check_offset(DD);
931 
932   Uint32 num_attr= regTabPtr->m_no_of_attributes;
933   Uint32 descr_start= regTabPtr->tabDescriptor;
934   TableDescriptor *tab_descr= &tableDescriptor[descr_start];
935   ndbrequire(descr_start + (num_attr << ZAD_LOG_SIZE) <= cnoOfTabDescrRec);
936   req_struct->attr_descr= tab_descr;
937 }
938 
939 void
Dbtup::setup_lcp_read_copy_tuple(KeyReqStruct* req_struct,
941                                  Operationrec* regOperPtr,
942                                  Fragrecord* regFragPtr,
943                                  Tablerec* regTabPtr)
944 {
945   Local_key tmp;
946   tmp.m_page_no = req_struct->frag_page_id;
947   tmp.m_page_idx = regOperPtr->m_tuple_location.m_page_idx;
948   clearCopyTuple(tmp.m_page_no, tmp.m_page_idx);
949 
950   Uint32 * copytuple = get_copy_tuple_raw(&tmp);
951   Local_key rowid;
952   memcpy(&rowid, copytuple+0, sizeof(Local_key));
953 
954   req_struct->frag_page_id = rowid.m_page_no;
955   regOperPtr->m_tuple_location.m_page_idx = rowid.m_page_idx;
956 
957   Tuple_header * th = get_copy_tuple(copytuple);
958   req_struct->m_page_ptr.setNull();
959   req_struct->m_tuple_ptr = (Tuple_header*)th;
960   th->m_operation_ptr_i = RNIL;
961   ndbassert((th->m_header_bits & Tuple_header::COPY_TUPLE) != 0);
962 
963   Uint32 num_attr= regTabPtr->m_no_of_attributes;
964   Uint32 descr_start= regTabPtr->tabDescriptor;
965   TableDescriptor *tab_descr= &tableDescriptor[descr_start];
966   ndbrequire(descr_start + (num_attr << ZAD_LOG_SIZE) <= cnoOfTabDescrRec);
967   req_struct->attr_descr= tab_descr;
968 
969   bool disk = false;
970   if (regTabPtr->need_expand(disk))
971   {
972     jam();
973     prepare_read(req_struct, regTabPtr, disk);
974   }
975 }
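
/* For LCP reads the key in TUPKEYREQ addresses a copy tuple directly.  The
 * first words of the raw copy tuple hold the row's original rowid (restored
 * into req_struct / the operation record above) and the tuple data itself is
 * found via get_copy_tuple(copytuple).
 */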
976 
977  /* ---------------------------------------------------------------- */
978  /* ------------------------ CONFIRM REQUEST ----------------------- */
979  /* ---------------------------------------------------------------- */
 void Dbtup::sendTUPKEYCONF(Signal* signal,
981 			    KeyReqStruct *req_struct,
982 			    Operationrec * regOperPtr)
983 {
984   TupKeyConf * tupKeyConf= (TupKeyConf *)signal->getDataPtrSend();
985 
986   Uint32 Rcreate_rowid = req_struct->m_use_rowid;
987   Uint32 RuserPointer= regOperPtr->userpointer;
988   Uint32 RnoFiredTriggers= req_struct->no_fired_triggers;
989   Uint32 log_size= req_struct->log_size;
990   Uint32 read_length= req_struct->read_length;
991   Uint32 last_row= req_struct->last_row;
992 
993   set_trans_state(regOperPtr, TRANS_STARTED);
994   set_tuple_state(regOperPtr, TUPLE_PREPARED);
995   tupKeyConf->userPtr= RuserPointer;
996   tupKeyConf->readLength= read_length;
997   tupKeyConf->writeLength= log_size;
998   tupKeyConf->noFiredTriggers= RnoFiredTriggers;
999   tupKeyConf->lastRow= last_row;
1000   tupKeyConf->rowid = Rcreate_rowid;
1001 
1002   EXECUTE_DIRECT(DBLQH, GSN_TUPKEYCONF, signal,
1003 		 TupKeyConf::SignalLength);
1004 
1005 }
1006 
1007 
1008 #define MAX_READ (MIN(sizeof(signal->theData), MAX_SEND_MESSAGE_BYTESIZE))
1009 
1010 /* ---------------------------------------------------------------- */
1011 /* ----------------------------- READ  ---------------------------- */
1012 /* ---------------------------------------------------------------- */
int Dbtup::handleReadReq(Signal* signal,
1014                          Operationrec* regOperPtr,
1015                          Tablerec* regTabPtr,
1016                          KeyReqStruct* req_struct)
1017 {
1018   Uint32 *dst;
1019   Uint32 dstLen, start_index;
1020   const BlockReference sendBref= req_struct->rec_blockref;
1021   if ((regTabPtr->m_bits & Tablerec::TR_Checksum) &&
1022       (calculateChecksum(req_struct->m_tuple_ptr, regTabPtr) != 0)) {
1023     jam();
1024     ndbout_c("here2");
1025     terrorCode= ZTUPLE_CORRUPTED_ERROR;
1026     tupkeyErrorLab(req_struct);
1027     return -1;
1028   }
1029 
1030   const Uint32 node = refToNode(sendBref);
1031   if(node != 0 && node != getOwnNodeId()) {
1032     start_index= 25;
1033   } else {
1034     jam();
1035     /**
1036      * execute direct
1037      */
1038     start_index= 3;
1039   }
1040   dst= &signal->theData[start_index];
1041   dstLen= (MAX_READ / 4) - start_index;
1042   if (!req_struct->interpreted_exec) {
1043     jam();
1044     int ret = readAttributes(req_struct,
1045 			     &cinBuffer[0],
1046 			     req_struct->attrinfo_len,
1047 			     dst,
1048 			     dstLen,
1049 			     false);
1050     if (likely(ret >= 0)) {
1051 /* ------------------------------------------------------------------------- */
1052 // We have read all data into coutBuffer. Now send it to the API.
1053 /* ------------------------------------------------------------------------- */
1054       jam();
1055       Uint32 TnoOfDataRead= (Uint32) ret;
1056       req_struct->read_length += TnoOfDataRead;
1057       sendReadAttrinfo(signal, req_struct, TnoOfDataRead, regOperPtr);
1058       return 0;
1059     }
1060     else
1061     {
1062       terrorCode = Uint32(-ret);
1063     }
1064   } else {
1065     jam();
1066     if (likely(interpreterStartLab(signal, req_struct) != -1)) {
1067       return 0;
1068     }
1069     return -1;
1070   }
1071 
1072   jam();
1073   tupkeyErrorLab(req_struct);
1074   return -1;
1075 }
1076 
1077 static
1078 void
handle_reorg(Dbtup::KeyReqStruct * req_struct,
1080              Dbtup::Fragrecord::FragState state)
1081 {
1082   Uint32 reorg = req_struct->m_reorg;
1083   switch(state){
1084   case Dbtup::Fragrecord::FS_FREE:
1085   case Dbtup::Fragrecord::FS_REORG_NEW:
1086   case Dbtup::Fragrecord::FS_REORG_COMMIT_NEW:
1087   case Dbtup::Fragrecord::FS_REORG_COMPLETE_NEW:
1088     return;
1089   case Dbtup::Fragrecord::FS_REORG_COMMIT:
1090   case Dbtup::Fragrecord::FS_REORG_COMPLETE:
1091     if (reorg != 1)
1092       return;
1093     break;
1094   case Dbtup::Fragrecord::FS_ONLINE:
1095     if (reorg != 2)
1096       return;
1097     break;
1098   default:
1099     return;
1100   }
1101   req_struct->m_tuple_ptr->m_header_bits |= Dbtup::Tuple_header::REORG_MOVE;
1102 }
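
/* handle_reorg() tags the row with REORG_MOVE when an update carries a reorg
 * flag matching the fragment's reorg state.  setup_read() above uses the same
 * bit to filter rows during table reorganisation: m_reorg == 1 only sees rows
 * without REORG_MOVE, m_reorg == 2 only rows with it.
 */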
1103 
1104 /* ---------------------------------------------------------------- */
1105 /* ---------------------------- UPDATE ---------------------------- */
1106 /* ---------------------------------------------------------------- */
int Dbtup::handleUpdateReq(Signal* signal,
1108                            Operationrec* operPtrP,
1109                            Fragrecord* regFragPtr,
1110                            Tablerec* regTabPtr,
1111                            KeyReqStruct* req_struct,
1112 			   bool disk)
1113 {
1114   Tuple_header *dst;
1115   Tuple_header *base= req_struct->m_tuple_ptr, *org;
1116   ChangeMask * change_mask_ptr;
1117   if ((dst= alloc_copy_tuple(regTabPtr, &operPtrP->m_copy_tuple_location))== 0)
1118   {
1119     terrorCode= ZMEM_NOMEM_ERROR;
1120     goto error;
1121   }
1122 
1123   Uint32 tup_version;
1124   change_mask_ptr = get_change_mask_ptr(regTabPtr, dst);
1125   if(operPtrP->is_first_operation())
1126   {
1127     org= req_struct->m_tuple_ptr;
1128     tup_version= org->get_tuple_version();
1129     clear_change_mask_info(regTabPtr, change_mask_ptr);
1130   }
1131   else
1132   {
1133     Operationrec* prevOp= req_struct->prevOpPtr.p;
1134     tup_version= prevOp->tupVersion;
1135     Uint32 * rawptr = get_copy_tuple_raw(&prevOp->m_copy_tuple_location);
1136     org= get_copy_tuple(rawptr);
1137     copy_change_mask_info(regTabPtr,
1138                           change_mask_ptr,
1139                           get_change_mask_ptr(rawptr));
1140   }
1141 
1142   /**
1143    * Check consistency before update/delete
1144    */
1145   req_struct->m_tuple_ptr= org;
1146   if ((regTabPtr->m_bits & Tablerec::TR_Checksum) &&
1147       (calculateChecksum(req_struct->m_tuple_ptr, regTabPtr) != 0))
1148   {
1149     terrorCode= ZTUPLE_CORRUPTED_ERROR;
1150     goto error;
1151   }
1152 
1153   req_struct->m_tuple_ptr= dst;
1154 
1155   union {
1156     Uint32 sizes[4];
1157     Uint64 cmp[2];
1158   };
1159 
1160   disk = disk || (org->m_header_bits & Tuple_header::DISK_INLINE);
1161   if (regTabPtr->need_expand(disk))
1162   {
1163     expand_tuple(req_struct, sizes, org, regTabPtr, disk);
1164     if(disk && operPtrP->m_undo_buffer_space == 0)
1165     {
1166       operPtrP->op_struct.m_wait_log_buffer = 1;
1167       operPtrP->op_struct.m_load_diskpage_on_commit = 1;
1168       Uint32 sz= operPtrP->m_undo_buffer_space=
1169 	(sizeof(Dbtup::Disk_undo::Update) >> 2) + sizes[DD] - 1;
1170 
1171       D("Logfile_client - handleUpdateReq");
1172       Logfile_client lgman(this, c_lgman, regFragPtr->m_logfile_group_id);
1173       terrorCode= lgman.alloc_log_space(sz);
1174       if(unlikely(terrorCode))
1175       {
1176 	operPtrP->m_undo_buffer_space= 0;
1177 	goto error;
1178       }
1179     }
1180   }
1181   else
1182   {
1183     memcpy(dst, org, 4*regTabPtr->m_offsets[MM].m_fix_header_size);
1184     req_struct->m_tuple_ptr->m_header_bits |= Tuple_header::COPY_TUPLE;
1185   }
1186 
1187   tup_version= (tup_version + 1) & ZTUP_VERSION_MASK;
1188   operPtrP->tupVersion= tup_version;
1189 
1190   req_struct->optimize_options = 0;
1191 
1192   if (!req_struct->interpreted_exec) {
1193     jam();
1194 
1195     if (regTabPtr->m_bits & Tablerec::TR_ExtraRowAuthorBits)
1196     {
1197       jam();
1198       Uint32 attrId =
1199         regTabPtr->getExtraAttrId<Tablerec::TR_ExtraRowAuthorBits>();
1200 
1201       store_extra_row_bits(attrId, regTabPtr, dst, /* default */ 0, false);
1202     }
1203     int retValue = updateAttributes(req_struct,
1204 				    &cinBuffer[0],
1205 				    req_struct->attrinfo_len);
1206     if (unlikely(retValue < 0))
1207     {
1208       terrorCode = Uint32(-retValue);
1209       goto error;
1210     }
1211   } else {
1212     jam();
1213     if (unlikely(interpreterStartLab(signal, req_struct) == -1))
1214       return -1;
1215   }
1216 
1217   update_change_mask_info(regTabPtr,
1218                           change_mask_ptr,
1219                           req_struct->changeMask.rep.data);
1220 
1221   switch (req_struct->optimize_options) {
1222     case AttributeHeader::OPTIMIZE_MOVE_VARPART:
1223       /**
1224        * optimize varpart of tuple,  move varpart of tuple from
1225        * big-free-size page list into small-free-size page list
1226        */
1227       if(base->m_header_bits & Tuple_header::VAR_PART)
1228         optimize_var_part(req_struct, base, operPtrP,
1229                           regFragPtr, regTabPtr);
1230       break;
1231     case AttributeHeader::OPTIMIZE_MOVE_FIXPART:
1232       //TODO: move fix part of tuple
1233       break;
1234     default:
1235       break;
1236   }
1237 
1238   if (regTabPtr->need_shrink())
1239   {
1240     shrink_tuple(req_struct, sizes+2, regTabPtr, disk);
1241     if (cmp[0] != cmp[1] && handle_size_change_after_update(req_struct,
1242 							    base,
1243 							    operPtrP,
1244 							    regFragPtr,
1245 							    regTabPtr,
1246 							    sizes)) {
1247       goto error;
1248     }
1249   }
1250 
1251   if (req_struct->m_reorg)
1252   {
1253     handle_reorg(req_struct, regFragPtr->fragStatus);
1254   }
1255 
1256   req_struct->m_tuple_ptr->set_tuple_version(tup_version);
1257   if (regTabPtr->m_bits & Tablerec::TR_Checksum) {
1258     jam();
1259     setChecksum(req_struct->m_tuple_ptr, regTabPtr);
1260   }
1261 
1262   set_tuple_state(operPtrP, TUPLE_PREPARED);
1263 
1264   return 0;
1265 
1266 error:
1267   tupkeyErrorLab(req_struct);
1268   return -1;
1269 }
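
/* handleUpdateReq() in short: allocate a copy tuple, verify the checksum of
 * the stored row, expand it into the copy (allocating undo log space for the
 * disk part on the first disk update of the operation chain), apply either
 * the plain attribute updates or the interpreted program, then shrink the
 * copy again, bump the tuple version and recompute the checksum.
 */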
1270 
1271 /*
1272   expand_dyn_part - copy dynamic attributes to fully expanded size.
1273 
1274   Both variable-sized and fixed-size attributes are stored in the same way
1275   in the expanded form as variable-sized attributes (in expand_var_part()).
1276 
1277   This method is used for both mem and disk dynamic data.
1278 
1279     dst         Destination for expanded data
1280     tabPtrP     Table descriptor
1281     src         Pointer to the start of dynamic bitmap in source row
1282     row_len     Total number of 32-bit words in dynamic part of row
1283     tabDesc     Array of table descriptors
1284     order       Array of indexes into tabDesc, dynfix followed by dynvar
1285 */
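/*
  Expanded layout produced below (sketch; byte offsets are counted from the
  start of the destination bitmap):

    [ bitmap, max_bmlen words                                    ]
    [ room reserved for shrink_tuple's offset words + padding    ]
    [ dynvar variable-sized attributes, each at its maximum size ]
    [ dynfix fixed-sized attributes, written in reverse 'order'  ]

  The per-attribute offset and length arrays are written to
  dst->m_dyn_offset_arr_ptr (and the len array following it), not into the
  destination row itself.
*/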
1286 static
1287 Uint32*
expand_dyn_part(Dbtup::KeyReqStruct::Var_data *dst,
1289 		const Uint32* src,
1290 		Uint32 row_len,
1291 		const Uint32 * tabDesc,
1292 		const Uint16* order,
1293 		Uint32 dynvar,
1294 		Uint32 dynfix,
1295 		Uint32 max_bmlen)
1296 {
1297   /* Copy the bitmap, zeroing out any words not stored in the row. */
1298   Uint32 *dst_bm_ptr= (Uint32*)dst->m_dyn_data_ptr;
1299   Uint32 bm_len = row_len ? (* src & Dbtup::DYN_BM_LEN_MASK) : 0;
1300 
1301   assert(bm_len <= max_bmlen);
1302 
1303   if(bm_len > 0)
1304     memcpy(dst_bm_ptr, src, 4*bm_len);
1305   if(bm_len < max_bmlen)
1306     bzero(dst_bm_ptr + bm_len, 4 * (max_bmlen - bm_len));
1307 
1308   /**
1309    * Store max_bmlen for homogeneous code in DbtupRoutines
1310    */
1311   Uint32 tmp = (* dst_bm_ptr);
1312   * dst_bm_ptr = (tmp & ~(Uint32)Dbtup::DYN_BM_LEN_MASK) | max_bmlen;
1313 
1314   char *src_off_start= (char*)(src + bm_len);
1315   assert((UintPtr(src_off_start)&3) == 0);
1316   Uint16 *src_off_ptr= (Uint16*)src_off_start;
1317 
1318   /*
1319     Prepare the variable-sized dynamic attributes, copying out data from the
1320     source row for any that are not NULL.
1321   */
1322   Uint32 no_attr= dst->m_dyn_len_offset;
1323   Uint16* dst_off_ptr= dst->m_dyn_offset_arr_ptr;
1324   Uint16* dst_len_ptr= dst_off_ptr + no_attr;
1325   Uint16 this_src_off= row_len ? * src_off_ptr++ : 0;
1326   /* We need to reserve room for the offsets written by shrink_tuple+padding.*/
1327   Uint16 dst_off= 4 * (max_bmlen + ((dynvar+2)>>1));
1328   char *dst_ptr= (char*)dst_bm_ptr + dst_off;
1329   for(Uint32 i= 0; i<dynvar; i++)
1330   {
1331     Uint16 j= order[dynfix+i];
1332     Uint32 max_len= 4 *AttributeDescriptor::getSizeInWords(tabDesc[j]);
1333     Uint32 len;
1334     Uint32 pos = AttributeOffset::getNullFlagPos(tabDesc[j+1]);
1335     if(bm_len > (pos >> 5) && BitmaskImpl::get(bm_len, src, pos))
1336     {
1337       Uint16 next_src_off= *src_off_ptr++;
1338       len= next_src_off - this_src_off;
1339       memcpy(dst_ptr, src_off_start+this_src_off, len);
1340       this_src_off= next_src_off;
1341     }
1342     else
1343     {
1344       len= 0;
1345     }
1346     dst_off_ptr[i]= dst_off;
1347     dst_len_ptr[i]= dst_off+len;
1348     dst_off+= max_len;
1349     dst_ptr+= max_len;
1350   }
1351   /*
1352     The fixed-size data is stored 32-bit aligned after the variable-sized
1353     data.
1354   */
1355   char *src_ptr= src_off_start+this_src_off;
1356   src_ptr= (char *)(ALIGN_WORD(src_ptr));
1357 
1358   /*
1359     Prepare the fixed-size dynamic attributes, copying out data from the
1360     source row for any that are not NULL.
1361     Note that the fixed-size data is stored in reverse from the end of the
1362     dynamic part of the row. This is true both for the stored/shrunken and
1363     for the expanded form.
1364   */
1365   for(Uint32 i= dynfix; i>0; )
1366   {
1367     i--;
1368     Uint16 j= order[i];
1369     Uint32 fix_size= 4*AttributeDescriptor::getSizeInWords(tabDesc[j]);
1370     dst_off_ptr[dynvar+i]= dst_off;
1371     /* len offset array is not used for fixed size. */
1372     Uint32 pos = AttributeOffset::getNullFlagPos(tabDesc[j+1]);
1373     if(bm_len > (pos >> 5) && BitmaskImpl::get(bm_len, src, pos))
1374     {
1375       assert((UintPtr(dst_ptr)&3) == 0);
1376       memcpy(dst_ptr, src_ptr, fix_size);
1377       src_ptr+= fix_size;
1378     }
1379     dst_off+= fix_size;
1380     dst_ptr+= fix_size;
1381   }
1382 
1383   return (Uint32 *)dst_ptr;
1384 }
1385 
1386 static
1387 Uint32*
shrink_dyn_part(Dbtup::KeyReqStruct::Var_data *dst,
1389                 Uint32 *dst_ptr,
1390                 const Dbtup::Tablerec* tabPtrP,
1391                 const Uint32 * tabDesc,
1392                 const Uint16* order,
1393                 Uint32 dynvar,
1394                 Uint32 dynfix,
1395                 Uint32 ind)
1396 {
1397   /**
1398    * Now build the dynamic part, if any.
1399    * First look for any trailing all-NULL words of the bitmap; we do
1400    * not need to store those.
1401    */
1402   assert((UintPtr(dst->m_dyn_data_ptr)&3) == 0);
1403   char *dyn_src_ptr= dst->m_dyn_data_ptr;
1404   Uint32 bm_len = tabPtrP->m_offsets[ind].m_dyn_null_words; // In words
1405 
1406   /* If no dynamic variables, store nothing. */
1407   assert(bm_len);
1408   {
1409     /**
1410      * clear bm-len bits, so they won't incorrectly indicate
1411      *   a non-zero map
1412      */
1413     * ((Uint32 *)dyn_src_ptr) &= ~Uint32(Dbtup::DYN_BM_LEN_MASK);
1414 
1415     Uint32 *bm_ptr= (Uint32 *)dyn_src_ptr + bm_len - 1;
1416     while(*bm_ptr == 0)
1417     {
1418       bm_ptr--;
1419       bm_len--;
1420       if(bm_len == 0)
1421         break;
1422     }
1423   }
1424 
1425   if (bm_len)
1426   {
1427     /**
1428      * Copy the bitmap, counting the number of variable sized
1429      * attributes that are not NULL on the way.
1430      */
1431     Uint32 *dyn_dst_ptr= dst_ptr;
1432     Uint32 dyn_var_count= 0;
1433     const Uint32 *src_bm_ptr= (Uint32 *)(dyn_src_ptr);
1434     Uint32 *dst_bm_ptr= (Uint32 *)dyn_dst_ptr;
1435 
1436     /* ToDo: Put all of the dynattr code inside if(bm_len>0) { ... },
1437      * split to separate function. */
1438     Uint16 dyn_dst_data_offset= 0;
1439     const Uint32 *dyn_bm_var_mask_ptr= tabPtrP->dynVarSizeMask[ind];
1440     for(Uint16 i= 0; i< bm_len; i++)
1441     {
1442       Uint32 v= src_bm_ptr[i];
1443       dyn_var_count+= BitmaskImpl::count_bits(v & *dyn_bm_var_mask_ptr++);
1444       dst_bm_ptr[i]= v;
1445     }
1446 
1447     Uint32 tmp = *dyn_dst_ptr;
1448     assert(bm_len <= Dbtup::DYN_BM_LEN_MASK);
1449     * dyn_dst_ptr = (tmp & ~(Uint32)Dbtup::DYN_BM_LEN_MASK) | bm_len;
1450     dyn_dst_ptr+= bm_len;
1451     dyn_dst_data_offset= 2*dyn_var_count + 2;
1452 
1453     Uint16 *dyn_src_off_array= dst->m_dyn_offset_arr_ptr;
1454     Uint16 *dyn_src_lenoff_array=
1455       dyn_src_off_array + dst->m_dyn_len_offset;
1456     Uint16* dyn_dst_off_array = (Uint16*)dyn_dst_ptr;
1457 
1458     /**
1459      * Copy over the variable sized not-NULL attributes.
1460      * Data offsets are counted from the start of the offset array, and
1461      * we store one additional offset to be able to easily compute the
1462      * data length as the difference between offsets.
1463      */
1464     Uint16 off_idx= 0;
1465     for(Uint32 i= 0; i<dynvar; i++)
1466     {
1467       /**
1468        * Note that we must use the destination (shrunken) bitmap here,
1469        * as the source (expanded) bitmap may have been already clobbered
1470        * (by offset data).
1471        */
1472       Uint32 attrDesc2 = tabDesc[order[dynfix+i]+1];
1473       Uint32 pos = AttributeOffset::getNullFlagPos(attrDesc2);
1474       if (bm_len > (pos >> 5) && BitmaskImpl::get(bm_len, dst_bm_ptr, pos))
1475       {
1476         dyn_dst_off_array[off_idx++]= dyn_dst_data_offset;
1477         Uint32 dyn_src_off= dyn_src_off_array[i];
1478         Uint32 dyn_len= dyn_src_lenoff_array[i] - dyn_src_off;
1479         memmove(((char *)dyn_dst_ptr) + dyn_dst_data_offset,
1480                 dyn_src_ptr + dyn_src_off,
1481                 dyn_len);
1482         dyn_dst_data_offset+= dyn_len;
1483       }
1484     }
1485     /* If all dynamic attributes are NULL, we store nothing. */
1486     dyn_dst_off_array[off_idx]= dyn_dst_data_offset;
1487     assert(dyn_dst_off_array + off_idx == (Uint16*)dyn_dst_ptr+dyn_var_count);
1488 
1489     char *dynvar_end_ptr= ((char *)dyn_dst_ptr) + dyn_dst_data_offset;
1490     char *dyn_dst_data_ptr= (char *)(ALIGN_WORD(dynvar_end_ptr));
1491 
1492     /**
1493      * Zero out any padding bytes. Might not be strictly necessary,
1494      * but seems cleaner than leaving random stuff in there.
1495      */
1496     bzero(dynvar_end_ptr, dyn_dst_data_ptr-dynvar_end_ptr);
1497 
1498     /* *
1499      * Copy over the fixed-sized not-NULL attributes.
1500      * Note that attributes are copied in reverse order; this is to avoid
1501      * overwriting not-yet-copied data, as the data is also stored in
1502      * reverse order.
1503      */
1504     for(Uint32 i= dynfix; i > 0; )
1505     {
1506       i--;
1507       Uint16 j= order[i];
1508       Uint32 attrDesc2 = tabDesc[j+1];
1509       Uint32 pos = AttributeOffset::getNullFlagPos(attrDesc2);
1510       if(bm_len > (pos >>5 ) && BitmaskImpl::get(bm_len, dst_bm_ptr, pos))
1511       {
1512         Uint32 fixsize=
1513           4*AttributeDescriptor::getSizeInWords(tabDesc[j]);
1514         memmove(dyn_dst_data_ptr,
1515                 dyn_src_ptr + dyn_src_off_array[dynvar+i],
1516                 fixsize);
1517         dyn_dst_data_ptr += fixsize;
1518       }
1519     }
1520     dst_ptr = (Uint32*)dyn_dst_data_ptr;
1521     assert((UintPtr(dst_ptr) & 3) == 0);
1522   }
1523   return (Uint32 *)dst_ptr;
1524 }
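
/* Stored (shrunken) dynamic part produced above:
 *   [ bitmap with trailing all-zero words trimmed; bm_len kept in the low
 *     bits of the first word (DYN_BM_LEN_MASK)                          ]
 *   [ 16-bit offset array: one entry per not-NULL varsize attribute plus
 *     one terminator, so lengths are differences of adjacent offsets    ]
 *   [ varsize attribute data, padded up to a word boundary              ]
 *   [ fixed-size attribute data, copied in reverse order                ]
 * If the bitmap is entirely zero, nothing at all is stored.
 */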
1525 
1526 /* ---------------------------------------------------------------- */
1527 /* ----------------------------- INSERT --------------------------- */
1528 /* ---------------------------------------------------------------- */
1529 void
Dbtup::prepare_initial_insert(KeyReqStruct *req_struct,
1531 			      Operationrec* regOperPtr,
1532 			      Tablerec* regTabPtr)
1533 {
1534   Uint32 disk_undo = regTabPtr->m_no_of_disk_attributes ?
1535     sizeof(Dbtup::Disk_undo::Alloc) >> 2 : 0;
1536   regOperPtr->nextActiveOp= RNIL;
1537   regOperPtr->prevActiveOp= RNIL;
1538   regOperPtr->op_struct.in_active_list= true;
1539   regOperPtr->m_undo_buffer_space= disk_undo;
1540 
1541   req_struct->check_offset[MM]= regTabPtr->get_check_offset(MM);
1542   req_struct->check_offset[DD]= regTabPtr->get_check_offset(DD);
1543 
1544   Uint32 num_attr= regTabPtr->m_no_of_attributes;
1545   Uint32 descr_start= regTabPtr->tabDescriptor;
1546   Uint32 order_desc= regTabPtr->m_real_order_descriptor;
1547   TableDescriptor *tab_descr= &tableDescriptor[descr_start];
1548   ndbrequire(descr_start + (num_attr << ZAD_LOG_SIZE) <= cnoOfTabDescrRec);
1549   req_struct->attr_descr= tab_descr;
1550   Uint16* order= (Uint16*)&tableDescriptor[order_desc];
1551   order += regTabPtr->m_attributes[MM].m_no_of_fixsize;
1552 
1553   Uint32 bits = Tuple_header::COPY_TUPLE;
1554   bits |= disk_undo ? (Tuple_header::DISK_ALLOC|Tuple_header::DISK_INLINE) : 0;
1555 
1556   const Uint32 mm_vars= regTabPtr->m_attributes[MM].m_no_of_varsize;
1557   const Uint32 mm_dyns= regTabPtr->m_attributes[MM].m_no_of_dynamic;
1558   const Uint32 mm_dynvar= regTabPtr->m_attributes[MM].m_no_of_dyn_var;
1559   const Uint32 mm_dynfix= regTabPtr->m_attributes[MM].m_no_of_dyn_fix;
1560   const Uint32 dd_vars= regTabPtr->m_attributes[DD].m_no_of_varsize;
1561   Uint32 *ptr= req_struct->m_tuple_ptr->get_end_of_fix_part_ptr(regTabPtr);
1562   Var_part_ref* ref = req_struct->m_tuple_ptr->get_var_part_ref_ptr(regTabPtr);
1563 
1564   if (regTabPtr->m_bits & Tablerec::TR_ForceVarPart)
1565   {
1566     ref->m_page_no = RNIL;
1567     ref->m_page_idx = Tup_varsize_page::END_OF_FREE_LIST;
1568   }
1569 
1570   if(mm_vars || mm_dyns)
1571   {
1572     jam();
1573     /* Init Varpart_copy struct */
1574     Varpart_copy * cp = (Varpart_copy*)ptr;
1575     cp->m_len = 0;
1576     ptr += Varpart_copy::SZ32;
1577 
1578     /* Prepare empty varsize part. */
1579     KeyReqStruct::Var_data* dst= &req_struct->m_var_data[MM];
1580 
1581     if (mm_vars)
1582     {
1583       dst->m_data_ptr= (char*)(((Uint16*)ptr)+mm_vars+1);
1584       dst->m_offset_array_ptr= req_struct->var_pos_array;
1585       dst->m_var_len_offset= mm_vars;
1586       dst->m_max_var_offset= regTabPtr->m_offsets[MM].m_max_var_offset;
1587 
1588       Uint32 pos= 0;
1589       Uint16 *pos_ptr = req_struct->var_pos_array;
1590       Uint16 *len_ptr = pos_ptr + mm_vars;
1591       for(Uint32 i= 0; i<mm_vars; i++)
1592       {
1593         * pos_ptr++ = pos;
1594         * len_ptr++ = pos;
1595         pos += AttributeDescriptor::getSizeInBytes(tab_descr[*order++].tabDescr);
1596       }
1597 
1598       // Disk/dynamic part is 32-bit aligned
1599       ptr = ALIGN_WORD(dst->m_data_ptr+pos);
1600       ndbassert(ptr == ALIGN_WORD(dst->m_data_ptr +
1601                                   regTabPtr->m_offsets[MM].m_max_var_offset));
1602     }
1603 
1604     if (mm_dyns)
1605     {
1606       jam();
1607       /* Prepare empty dynamic part. */
1608       dst->m_dyn_data_ptr= (char *)ptr;
1609       dst->m_dyn_offset_arr_ptr= req_struct->var_pos_array+2*mm_vars;
1610       dst->m_dyn_len_offset= mm_dynvar+mm_dynfix;
1611       dst->m_max_dyn_offset= regTabPtr->m_offsets[MM].m_max_dyn_offset;
1612 
1613       ptr = expand_dyn_part(dst, 0, 0,
1614                             (Uint32*)tab_descr, order,
1615                             mm_dynvar, mm_dynfix,
1616                             regTabPtr->m_offsets[MM].m_dyn_null_words);
1617     }
1618 
1619     ndbassert((UintPtr(ptr)&3) == 0);
1620   }
1621 
1622   req_struct->m_disk_ptr= (Tuple_header*)ptr;
1623 
1624   ndbrequire(dd_vars == 0);
1625 
1626   req_struct->m_tuple_ptr->m_header_bits= bits;
1627 
1628   // Set all null bits
1629   memset(req_struct->m_tuple_ptr->m_null_bits+
1630 	 regTabPtr->m_offsets[MM].m_null_offset, 0xFF,
1631 	 4*regTabPtr->m_offsets[MM].m_null_words);
1632   memset(req_struct->m_disk_ptr->m_null_bits+
1633 	 regTabPtr->m_offsets[DD].m_null_offset, 0xFF,
1634 	 4*regTabPtr->m_offsets[DD].m_null_words);
1635 }
1636 
1637 int Dbtup::handleInsertReq(Signal* signal,
1638                            Ptr<Operationrec> regOperPtr,
1639                            Ptr<Fragrecord> fragPtr,
1640                            Tablerec* regTabPtr,
1641                            KeyReqStruct *req_struct,
1642                            Local_key ** accminupdateptr)
1643 {
1644   Uint32 tup_version = 1;
1645   Fragrecord* regFragPtr = fragPtr.p;
1646   Uint32 *ptr= 0;
1647   Tuple_header *dst;
1648   Tuple_header *base= req_struct->m_tuple_ptr, *org= base;
1649   Tuple_header *tuple_ptr;
1650 
1651   bool disk = regTabPtr->m_no_of_disk_attributes > 0;
1652   bool mem_insert = regOperPtr.p->is_first_operation();
1653   bool disk_insert = mem_insert && disk;
1654   bool vardynsize = (regTabPtr->m_attributes[MM].m_no_of_varsize ||
1655                      regTabPtr->m_attributes[MM].m_no_of_dynamic);
1656   bool varalloc = vardynsize || regTabPtr->m_bits & Tablerec::TR_ForceVarPart;
1657   bool rowid = req_struct->m_use_rowid;
1658   bool update_acc = false;
1659   Uint32 real_page_id = regOperPtr.p->m_tuple_location.m_page_no;
1660   Uint32 frag_page_id = req_struct->frag_page_id;
1661 
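  /*
   * sizes[0..1] receive the expanded MM/DD sizes (from expand_tuple) and
   * sizes[2..3] the shrunken sizes (from shrink_tuple). Aliasing the array
   * as two Uint64s lets the size-change check below compare both halves
   * with a single 64-bit comparison (cmp[0] != cmp[1]).
   */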
1662   union {
1663     Uint32 sizes[4];
1664     Uint64 cmp[2];
1665   };
1666   cmp[0] = cmp[1] = 0;
1667 
1668   if (ERROR_INSERTED(4014))
1669   {
1670     dst = 0;
1671     goto undo_buffer_error;
1672   }
1673 
1674   dst= alloc_copy_tuple(regTabPtr, &regOperPtr.p->m_copy_tuple_location);
1675 
1676   if (unlikely(dst == 0))
1677   {
1678     goto undo_buffer_error;
1679   }
1680   tuple_ptr= req_struct->m_tuple_ptr= dst;
1681   set_change_mask_info(regTabPtr, get_change_mask_ptr(regTabPtr, dst));
1682 
1683   if(mem_insert)
1684   {
1685     jam();
1686     prepare_initial_insert(req_struct, regOperPtr.p, regTabPtr);
1687   }
1688   else
1689   {
1690     Operationrec* prevOp= req_struct->prevOpPtr.p;
1691     ndbassert(prevOp->op_struct.op_type == ZDELETE);
1692     tup_version= prevOp->tupVersion + 1;
1693 
1694     if(!prevOp->is_first_operation())
1695       org= get_copy_tuple(&prevOp->m_copy_tuple_location);
1696     if (regTabPtr->need_expand())
1697     {
1698       expand_tuple(req_struct, sizes, org, regTabPtr, !disk_insert);
1699       memset(req_struct->m_disk_ptr->m_null_bits+
1700              regTabPtr->m_offsets[DD].m_null_offset, 0xFF,
1701              4*regTabPtr->m_offsets[DD].m_null_words);
1702 
1703       Uint32 bm_size_in_bytes= 4*(regTabPtr->m_offsets[MM].m_dyn_null_words);
1704       if (bm_size_in_bytes)
1705       {
1706         Uint32* ptr =
1707           (Uint32*)req_struct->m_var_data[MM].m_dyn_data_ptr;
1708         bzero(ptr, bm_size_in_bytes);
1709         * ptr = bm_size_in_bytes >> 2;
1710       }
1711     }
1712     else
1713     {
1714       memcpy(dst, org, 4*regTabPtr->m_offsets[MM].m_fix_header_size);
1715       tuple_ptr->m_header_bits |= Tuple_header::COPY_TUPLE;
1716     }
1717     memset(tuple_ptr->m_null_bits+
1718            regTabPtr->m_offsets[MM].m_null_offset, 0xFF,
1719            4*regTabPtr->m_offsets[MM].m_null_words);
1720   }
1721 
1722   int res;
1723   if (disk_insert)
1724   {
1725     if (ERROR_INSERTED(4015))
1726     {
1727       terrorCode = 1501;
1728       goto log_space_error;
1729     }
1730 
1731     D("Logfile_client - handleInsertReq");
1732     Logfile_client lgman(this, c_lgman, regFragPtr->m_logfile_group_id);
1733     res= lgman.alloc_log_space(regOperPtr.p->m_undo_buffer_space);
1734     if(unlikely(res))
1735     {
1736       terrorCode= res;
1737       goto log_space_error;
1738     }
1739   }
1740 
1741   regOperPtr.p->tupVersion= tup_version & ZTUP_VERSION_MASK;
1742   tuple_ptr->set_tuple_version(tup_version);
1743 
1744   if (ERROR_INSERTED(4016))
1745   {
1746     terrorCode = ZAI_INCONSISTENCY_ERROR;
1747     goto update_error;
1748   }
1749 
1750   if (regTabPtr->m_bits & Tablerec::TR_ExtraRowAuthorBits)
1751   {
1752     Uint32 attrId =
1753       regTabPtr->getExtraAttrId<Tablerec::TR_ExtraRowAuthorBits>();
1754 
1755     store_extra_row_bits(attrId, regTabPtr, tuple_ptr, /* default */ 0, false);
1756   }
1757 
1758   if (!regTabPtr->m_default_value_location.isNull())
1759   {
1760     jam();
1761     Uint32 default_values_len;
1762     /* Get default values ptr + len for this table */
1763     Uint32* default_values = get_default_ptr(regTabPtr, default_values_len);
1764     ndbrequire(default_values_len != 0 && default_values != NULL);
1765     /*
1766      * Update default values into row first,
1767      * next update with data received from the client.
1768      */
1769     if(unlikely((res = updateAttributes(req_struct, default_values,
1770                                         default_values_len)) < 0))
1771     {
1772       jam();
1773       terrorCode = Uint32(-res);
1774       goto update_error;
1775     }
1776   }
1777 
1778   if(unlikely((res = updateAttributes(req_struct, &cinBuffer[0],
1779                                       req_struct->attrinfo_len)) < 0))
1780   {
1781     terrorCode = Uint32(-res);
1782     goto update_error;
1783   }
1784 
1785   if (ERROR_INSERTED(4017))
1786   {
1787     goto null_check_error;
1788   }
1789   if (unlikely(checkNullAttributes(req_struct, regTabPtr) == false))
1790   {
1791     goto null_check_error;
1792   }
1793 
1794   if (req_struct->m_is_lcp)
1795   {
1796     jam();
1797     sizes[2+MM] = req_struct->m_lcp_varpart_len;
1798   }
1799   else if (regTabPtr->need_shrink())
1800   {
1801     shrink_tuple(req_struct, sizes+2, regTabPtr, true);
1802   }
1803 
1804   if (ERROR_INSERTED(4025))
1805   {
1806     goto mem_error;
1807   }
1808 
1809   if (ERROR_INSERTED(4026))
1810   {
1811     CLEAR_ERROR_INSERT_VALUE;
1812     goto mem_error;
1813   }
1814 
1815   if (ERROR_INSERTED(4027) && (rand() % 100) > 25)
1816   {
1817     goto mem_error;
1818   }
1819 
1820   if (ERROR_INSERTED(4028) && (rand() % 100) > 25)
1821   {
1822     CLEAR_ERROR_INSERT_VALUE;
1823     goto mem_error;
1824   }
1825 
1826   /**
1827    * Alloc memory
1828    */
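  /*
   * Four allocation paths below:
   *   alloc_fix_rec / alloc_var_rec     - TUP chooses the rowid
   *   alloc_fix_rowid / alloc_var_rowid - caller-supplied rowid (m_use_rowid)
   * Variable-sized rows are allocated with sizes[2+MM], i.e. the size after
   * shrink_tuple().
   */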
1829   if(mem_insert)
1830   {
1831     if (!rowid)
1832     {
1833       if (ERROR_INSERTED(4018))
1834       {
1835 	goto mem_error;
1836       }
1837 
1838       if (!varalloc)
1839       {
1840 	jam();
1841 	ptr= alloc_fix_rec(&terrorCode,
1842                            regFragPtr,
1843 			   regTabPtr,
1844 			   &regOperPtr.p->m_tuple_location,
1845 			   &frag_page_id);
1846       }
1847       else
1848       {
1849 	jam();
1850 	regOperPtr.p->m_tuple_location.m_file_no= sizes[2+MM];
1851 	ptr= alloc_var_rec(&terrorCode,
1852                            regFragPtr, regTabPtr,
1853 			   sizes[2+MM],
1854 			   &regOperPtr.p->m_tuple_location,
1855 			   &frag_page_id);
1856       }
1857       if (unlikely(ptr == 0))
1858       {
1859 	goto mem_error;
1860       }
1861       req_struct->m_use_rowid = true;
1862     }
1863     else
1864     {
1865       regOperPtr.p->m_tuple_location = req_struct->m_row_id;
1866       if (ERROR_INSERTED(4019))
1867       {
1868 	terrorCode = ZROWID_ALLOCATED;
1869 	goto alloc_rowid_error;
1870       }
1871 
1872       if (!varalloc)
1873       {
1874 	jam();
1875 	ptr= alloc_fix_rowid(&terrorCode,
1876                              regFragPtr,
1877 			     regTabPtr,
1878 			     &regOperPtr.p->m_tuple_location,
1879 			     &frag_page_id);
1880       }
1881       else
1882       {
1883 	jam();
1884 	regOperPtr.p->m_tuple_location.m_file_no= sizes[2+MM];
1885 	ptr= alloc_var_rowid(&terrorCode,
1886                              regFragPtr, regTabPtr,
1887 			     sizes[2+MM],
1888 			     &regOperPtr.p->m_tuple_location,
1889 			     &frag_page_id);
1890       }
1891       if (unlikely(ptr == 0))
1892       {
1893 	jam();
1894 	goto alloc_rowid_error;
1895       }
1896     }
1897     real_page_id = regOperPtr.p->m_tuple_location.m_page_no;
1898     update_acc = true; /* Will be updated later once success is known */
1899 
1900     base = (Tuple_header*)ptr;
1901     base->m_operation_ptr_i= regOperPtr.i;
1902     base->m_header_bits= Tuple_header::ALLOC |
1903       (sizes[2+MM] > 0 ? Tuple_header::VAR_PART : 0);
1904   }
1905   else
1906   {
1907     if (ERROR_INSERTED(4020))
1908     {
1909       goto size_change_error;
1910     }
1911 
1912     if (regTabPtr->need_shrink() && cmp[0] != cmp[1] &&
1913 	unlikely(handle_size_change_after_update(req_struct,
1914                                                  base,
1915                                                  regOperPtr.p,
1916                                                  regFragPtr,
1917                                                  regTabPtr,
1918                                                  sizes) != 0))
1919     {
1920       goto size_change_error;
1921     }
1922     req_struct->m_use_rowid = false;
1923     base->m_header_bits &= ~(Uint32)Tuple_header::FREE;
1924   }
1925 
1926   if (disk_insert)
1927   {
1928     Local_key tmp;
1929     Uint32 size= regTabPtr->m_attributes[DD].m_no_of_varsize == 0 ?
1930       1 : sizes[2+DD];
1931 
1932     if (ERROR_INSERTED(4021))
1933     {
1934       terrorCode = 1601;
1935       goto disk_prealloc_error;
1936     }
1937 
1938     int ret= disk_page_prealloc(signal, fragPtr, &tmp, size);
1939     if (unlikely(ret < 0))
1940     {
1941       terrorCode = -ret;
1942       goto disk_prealloc_error;
1943     }
1944 
1945     regOperPtr.p->op_struct.m_disk_preallocated= 1;
1946     tmp.m_page_idx= size;
1947     memcpy(tuple_ptr->get_disk_ref_ptr(regTabPtr), &tmp, sizeof(tmp));
1948 
1949     /**
1950      * Set ref from disk to mm
1951      */
1952     Local_key ref = regOperPtr.p->m_tuple_location;
1953     ref.m_page_no = frag_page_id;
1954 
1955     Tuple_header* disk_ptr= req_struct->m_disk_ptr;
1956     disk_ptr->m_header_bits = 0;
1957     disk_ptr->m_base_record_ref= ref.ref();
1958   }
1959 
1960   if (req_struct->m_reorg)
1961   {
1962     handle_reorg(req_struct, regFragPtr->fragStatus);
1963   }
1964 
1965   /* Have been successful with disk + mem, update ACC to point to
1966    * new record if necessary
1967    * Failures in disk alloc will skip this part
1968    */
1969   if (update_acc)
1970   {
1971     /* Acc stores the local key with the frag_page_id rather
1972      * than the real_page_id
1973      */
1974     ndbassert(regOperPtr.p->m_tuple_location.m_page_no == real_page_id);
1975 
1976     Local_key accKey = regOperPtr.p->m_tuple_location;
1977     accKey.m_page_no = frag_page_id;
1978     ** accminupdateptr = accKey;
1979   }
1980   else
1981   {
1982     * accminupdateptr = 0; // No accminupdate should be performed
1983   }
1984 
1985   if (regTabPtr->m_bits & Tablerec::TR_Checksum)
1986   {
1987     jam();
1988     setChecksum(req_struct->m_tuple_ptr, regTabPtr);
1989   }
1990 
1991   set_tuple_state(regOperPtr.p, TUPLE_PREPARED);
1992 
1993   return 0;
1994 
1995 size_change_error:
1996   jam();
1997   terrorCode = ZMEM_NOMEM_ERROR;
1998   goto exit_error;
1999 
2000 undo_buffer_error:
2001   jam();
2002   terrorCode= ZMEM_NOMEM_ERROR;
2003   regOperPtr.p->m_undo_buffer_space = 0;
2004   if (mem_insert)
2005     regOperPtr.p->m_tuple_location.setNull();
2006   regOperPtr.p->m_copy_tuple_location.setNull();
2007   tupkeyErrorLab(req_struct);
2008   return -1;
2009 
2010 null_check_error:
2011   jam();
2012   terrorCode= ZNO_ILLEGAL_NULL_ATTR;
2013   goto update_error;
2014 
2015 mem_error:
2016   jam();
2017   terrorCode= ZMEM_NOMEM_ERROR;
2018   goto update_error;
2019 
2020 log_space_error:
2021   jam();
2022   regOperPtr.p->m_undo_buffer_space = 0;
2023 alloc_rowid_error:
2024   jam();
2025 update_error:
2026   jam();
2027   if (mem_insert)
2028   {
2029     regOperPtr.p->op_struct.in_active_list = false;
2030     regOperPtr.p->m_tuple_location.setNull();
2031   }
2032 exit_error:
2033   tupkeyErrorLab(req_struct);
2034   return -1;
2035 
2036 disk_prealloc_error:
2037   base->m_header_bits |= Tuple_header::FREED;
2038   goto exit_error;
2039 }
2040 
2041 /* ---------------------------------------------------------------- */
2042 /* ---------------------------- DELETE ---------------------------- */
2043 /* ---------------------------------------------------------------- */
2044 int Dbtup::handleDeleteReq(Signal* signal,
2045                            Operationrec* regOperPtr,
2046                            Fragrecord* regFragPtr,
2047                            Tablerec* regTabPtr,
2048                            KeyReqStruct *req_struct,
2049 			   bool disk)
2050 {
2051   Tuple_header* dst = alloc_copy_tuple(regTabPtr,
2052                                        &regOperPtr->m_copy_tuple_location);
2053   if (dst == 0) {
2054     terrorCode = ZMEM_NOMEM_ERROR;
2055     goto error;
2056   }
2057 
2058   // delete must set but not increment tupVersion
2059   if (!regOperPtr->is_first_operation())
2060   {
2061     Operationrec* prevOp= req_struct->prevOpPtr.p;
2062     regOperPtr->tupVersion= prevOp->tupVersion;
2063     // make copy since previous op is committed before this one
2064     const Tuple_header* org = get_copy_tuple(&prevOp->m_copy_tuple_location);
2065     Uint32 len = regTabPtr->total_rec_size -
2066       Uint32(((Uint32*)dst) -
2067              get_copy_tuple_raw(&regOperPtr->m_copy_tuple_location));
2068     memcpy(dst, org, 4 * len);
2069     req_struct->m_tuple_ptr = dst;
2070   }
2071   else
2072   {
2073     regOperPtr->tupVersion= req_struct->m_tuple_ptr->get_tuple_version();
2074     if (regTabPtr->m_no_of_disk_attributes)
2075     {
2076       dst->m_header_bits = req_struct->m_tuple_ptr->m_header_bits;
2077       memcpy(dst->get_disk_ref_ptr(regTabPtr),
2078 	     req_struct->m_tuple_ptr->get_disk_ref_ptr(regTabPtr),
2079              sizeof(Local_key));
2080     }
2081   }
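  // Report every column as changed for the delete (all bits set in changeMask).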
2082   req_struct->changeMask.set();
2083   set_change_mask_info(regTabPtr, get_change_mask_ptr(regTabPtr, dst));
2084 
2085   if(disk && regOperPtr->m_undo_buffer_space == 0)
2086   {
2087     regOperPtr->op_struct.m_wait_log_buffer = 1;
2088     regOperPtr->op_struct.m_load_diskpage_on_commit = 1;
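    // Reserve undo log space for the disk delete: the Disk_undo::Free record
    // plus the fixed-size disk part of the row (sizes in 32-bit words).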
2089     Uint32 sz= regOperPtr->m_undo_buffer_space=
2090       (sizeof(Dbtup::Disk_undo::Free) >> 2) +
2091       regTabPtr->m_offsets[DD].m_fix_header_size - 1;
2092 
2093     D("Logfile_client - handleDeleteReq");
2094     Logfile_client lgman(this, c_lgman, regFragPtr->m_logfile_group_id);
2095     terrorCode= lgman.alloc_log_space(sz);
2096     if(unlikely(terrorCode))
2097     {
2098       regOperPtr->m_undo_buffer_space= 0;
2099       goto error;
2100     }
2101   }
2102 
2103   set_tuple_state(regOperPtr, TUPLE_PREPARED);
2104 
2105   if (req_struct->attrinfo_len == 0)
2106   {
2107     return 0;
2108   }
2109 
2110   if (regTabPtr->need_expand(disk))
2111   {
2112     prepare_read(req_struct, regTabPtr, disk);
2113   }
2114 
2115   {
2116     Uint32 RlogSize;
2117     int ret= handleReadReq(signal, regOperPtr, regTabPtr, req_struct);
2118     if (ret == 0 && (RlogSize= req_struct->log_size))
2119     {
2120       jam();
2121       sendLogAttrinfo(signal, req_struct, RlogSize, regOperPtr);
2122     }
2123     return ret;
2124   }
2125 
2126 error:
2127   tupkeyErrorLab(req_struct);
2128   return -1;
2129 }
2130 
2131 int
2132 Dbtup::handleRefreshReq(Signal* signal,
2133                         Ptr<Operationrec> regOperPtr,
2134                         Ptr<Fragrecord>  regFragPtr,
2135                         Tablerec* regTabPtr,
2136                         KeyReqStruct *req_struct,
2137                         bool disk)
2138 {
2139   /* Here we setup the tuple so that a transition to its current
2140    * state can be observed by SUMA's detached triggers.
2141    *
2142    * If the tuple does not exist then we fabricate a tuple
2143    * so that it can appear to be 'deleted'.
2144    *   The fabricated tuple may have invalid NULL values etc.
2145    * If the tuple does exist then we fabricate a null-change
2146    * update to the tuple.
2147    *
2148    * The logic differs depending on whether there are already
2149    * other operations on the tuple in this transaction.
2150    * No other operations (including Refresh) are allowed after
2151    * a refresh.
2152    */
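  /*
   * Four cases, recorded in refresh_case below:
   *   RF_SINGLE_NOT_EXIST : first op, row does not exist -> fake insert (PK only)
   *   RF_SINGLE_EXIST     : first op, row exists         -> null-change update
   *   RF_MULTI_NOT_EXIST  : previous op was a delete     -> fake insert (PK only)
   *   RF_MULTI_EXIST      : previous op insert/update    -> null-change update
   */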
2153   Uint32 refresh_case;
2154   if (regOperPtr.p->is_first_operation())
2155   {
2156     jam();
2157     if (Local_key::isInvalid(req_struct->frag_page_id,
2158                              regOperPtr.p->m_tuple_location.m_page_idx))
2159     {
2160       jam();
2161       refresh_case = Operationrec::RF_SINGLE_NOT_EXIST;
2162       //ndbout_c("case 1");
2163       /**
2164        * This is refresh of non-existing tuple...
2165        *   i.e "delete", reuse initial insert
2166        */
2167        Local_key accminupdate;
2168        Local_key * accminupdateptr = &accminupdate;
2169 
2170        /**
2171         * We don't need ...in this scenario
2172         * - disk
2173         * - default values
2174         */
2175        Uint32 save_disk = regTabPtr->m_no_of_disk_attributes;
2176        Local_key save_defaults = regTabPtr->m_default_value_location;
2177        Bitmask<MAXNROFATTRIBUTESINWORDS> save_mask =
2178          regTabPtr->notNullAttributeMask;
2179 
2180        regTabPtr->m_no_of_disk_attributes = 0;
2181        regTabPtr->m_default_value_location.setNull();
2182        regOperPtr.p->op_struct.op_type = ZINSERT;
2183 
2184        /**
2185         * Update notNullAttributeMask  to only include primary keys
2186         */
2187        regTabPtr->notNullAttributeMask.clear();
2188        const Uint32 * primarykeys =
2189          (Uint32*)&tableDescriptor[regTabPtr->readKeyArray].tabDescr;
2190        for (Uint32 i = 0; i<regTabPtr->noOfKeyAttr; i++)
2191          regTabPtr->notNullAttributeMask.set(primarykeys[i] >> 16);
2192 
2193        int res = handleInsertReq(signal, regOperPtr,
2194                                  regFragPtr, regTabPtr, req_struct,
2195                                  &accminupdateptr);
2196 
2197        regTabPtr->m_no_of_disk_attributes = save_disk;
2198        regTabPtr->m_default_value_location = save_defaults;
2199        regTabPtr->notNullAttributeMask = save_mask;
2200 
2201        if (unlikely(res == -1))
2202        {
2203          return -1;
2204        }
2205 
2206        regOperPtr.p->op_struct.op_type = ZREFRESH;
2207 
2208        if (accminupdateptr)
2209        {
2210          /**
2211           * Update ACC local-key, once *everything* has completed successfully
2212           */
2213          c_lqh->accminupdate(signal,
2214                              regOperPtr.p->userpointer,
2215                              accminupdateptr);
2216        }
2217     }
2218     else
2219     {
2220       refresh_case = Operationrec::RF_SINGLE_EXIST;
2221       //ndbout_c("case 2");
2222       jam();
2223 
2224       Uint32 tup_version_save = req_struct->m_tuple_ptr->get_tuple_version();
2225       Uint32 new_tup_version = decr_tup_version(tup_version_save);
2226       Tuple_header* origTuple = req_struct->m_tuple_ptr;
2227       origTuple->set_tuple_version(new_tup_version);
2228       int res = handleUpdateReq(signal, regOperPtr.p, regFragPtr.p,
2229                                 regTabPtr, req_struct, disk);
2230       /* Now we must reset the original tuple header back
2231        * to the original version.
2232        * The copy tuple will have the correct version due to
2233        * the update incrementing it.
2234        * On commit, the tuple becomes the copy tuple.
2235        * On abort, the original tuple remains.  If we don't
2236        * reset it here, then aborts cause the version to
2237        * decrease
2238        */
2239       origTuple->set_tuple_version(tup_version_save);
2240       if (res == -1)
2241         return -1;
2242     }
2243   }
2244   else
2245   {
2246     /* Not first operation on tuple in transaction */
2247     jam();
2248 
2249     Uint32 tup_version_save = req_struct->prevOpPtr.p->tupVersion;
2250     Uint32 new_tup_version = decr_tup_version(tup_version_save);
2251     req_struct->prevOpPtr.p->tupVersion = new_tup_version;
2252 
2253     int res;
2254     if (req_struct->prevOpPtr.p->op_struct.op_type == ZDELETE)
2255     {
2256       refresh_case = Operationrec::RF_MULTI_NOT_EXIST;
2257       //ndbout_c("case 3");
2258 
2259       jam();
2260       /**
2261        * We don't need ...in this scenario
2262        * - default values
2263        *
2264        * We keep disk attributes to avoid issues with 'insert'
2265        */
2266       Local_key save_defaults = regTabPtr->m_default_value_location;
2267       Bitmask<MAXNROFATTRIBUTESINWORDS> save_mask =
2268         regTabPtr->notNullAttributeMask;
2269 
2270       regTabPtr->m_default_value_location.setNull();
2271       regOperPtr.p->op_struct.op_type = ZINSERT;
2272 
2273       /**
2274        * Update notNullAttributeMask  to only include primary keys
2275        */
2276       regTabPtr->notNullAttributeMask.clear();
2277       const Uint32 * primarykeys =
2278         (Uint32*)&tableDescriptor[regTabPtr->readKeyArray].tabDescr;
2279       for (Uint32 i = 0; i<regTabPtr->noOfKeyAttr; i++)
2280         regTabPtr->notNullAttributeMask.set(primarykeys[i] >> 16);
2281 
2282       /**
2283        * This is multi-update + DELETE + REFRESH
2284        */
2285       Local_key * accminupdateptr = 0;
2286       res = handleInsertReq(signal, regOperPtr,
2287                             regFragPtr, regTabPtr, req_struct,
2288                             &accminupdateptr);
2289 
2290       regTabPtr->m_default_value_location = save_defaults;
2291       regTabPtr->notNullAttributeMask = save_mask;
2292 
2293       if (unlikely(res == -1))
2294       {
2295         return -1;
2296       }
2297 
2298       regOperPtr.p->op_struct.op_type = ZREFRESH;
2299     }
2300     else
2301     {
2302       jam();
2303       refresh_case = Operationrec::RF_MULTI_EXIST;
2304       //ndbout_c("case 4");
2305       /**
2306        * This is multi-update + INSERT/UPDATE + REFRESH
2307        */
2308       res = handleUpdateReq(signal, regOperPtr.p, regFragPtr.p,
2309                             regTabPtr, req_struct, disk);
2310     }
2311     req_struct->prevOpPtr.p->tupVersion = tup_version_save;
2312     if (res == -1)
2313       return -1;
2314   }
2315 
2316   /* Store the refresh scenario in the copy tuple location */
2317   // TODO : Verify this is never used as a copy tuple location!
2318   regOperPtr.p->m_copy_tuple_location.m_file_no = refresh_case;
2319   return 0;
2320 }
2321 
2322 bool
2323 Dbtup::checkNullAttributes(KeyReqStruct * req_struct,
2324                            Tablerec* regTabPtr)
2325 {
2326 // Implement checking of updating all not null attributes in an insert here.
2327   Bitmask<MAXNROFATTRIBUTESINWORDS> attributeMask;
2328   /*
2329    * The idea here is maybe that changeMask is not-null attributes
2330    * and must contain notNullAttributeMask.  But:
2331    *
2332    * 1. changeMask has all bits set on insert
2333    * 2. not-null is checked in each UpdateFunction
2334    * 3. the code below does not work except trivially due to 1.
2335    *
2336    * XXX remove or fix
2337    */
2338   attributeMask.clear();
2339   attributeMask.bitOR(req_struct->changeMask);
2340   attributeMask.bitAND(regTabPtr->notNullAttributeMask);
2341   attributeMask.bitXOR(regTabPtr->notNullAttributeMask);
2342   if (!attributeMask.isclear()) {
2343     return false;
2344   }
2345   return true;
2346 }
2347 
2348 /* ---------------------------------------------------------------- */
2349 /* THIS IS THE START OF THE INTERPRETED EXECUTION OF UPDATES. WE    */
2350 /* START BY LINKING ALL ATTRINFO'S IN A DOUBLY LINKED LIST (THEY ARE*/
2351 /* ALREADY IN A LINKED LIST). WE ALLOCATE A REGISTER MEMORY (EQUAL  */
2352 /* TO AN ATTRINFO RECORD). THE INTERPRETER GOES THROUGH FOUR  PHASES*/
2353 /* DURING THE FIRST PHASE IT IS ONLY ALLOWED TO READ ATTRIBUTES THAT*/
2354 /* ARE SENT TO THE CLIENT APPLICATION. DURING THE SECOND PHASE IT IS*/
2355 /* ALLOWED TO READ FROM ATTRIBUTES INTO REGISTERS, TO UPDATE        */
2356 /* ATTRIBUTES BASED ON EITHER A CONSTANT VALUE OR A REGISTER VALUE, */
2357 /* A DIVERSE SET OF OPERATIONS ON REGISTERS ARE AVAILABLE AS WELL.  */
2358 /* IT IS ALSO POSSIBLE TO PERFORM JUMPS WITHIN THE INSTRUCTIONS THAT*/
2359 /* BELONGS TO THE SECOND PHASE. ALSO SUBROUTINES CAN BE CALLED IN   */
2360 /* THIS PHASE. THE THIRD PHASE IS TO AGAIN READ ATTRIBUTES AND      */
2361 /* FINALLY THE FOURTH PHASE READS SELECTED REGISTERS AND SEND THEM  */
2362 /* TO THE CLIENT APPLICATION.                                       */
2363 /* THERE IS A FIFTH REGION WHICH CONTAINS SUBROUTINES CALLABLE FROM */
2364 /* THE INTERPRETER EXECUTION REGION.                                */
2365 /* THE FIRST FIVE WORDS WILL GIVE THE LENGTH OF THE FIVE REGIONS    */
2366 /*                                                                  */
2367 /* THIS MEANS THAT FROM THE APPLICATIONS POINT OF VIEW THE DATABASE */
2368 /* CAN HANDLE SUBROUTINE CALLS WHERE THE CODE IS SENT IN THE REQUEST*/
2369 /* THE RETURN PARAMETERS ARE FIXED AND CAN EITHER BE GENERATED      */
2370 /* BEFORE THE EXECUTION OF THE ROUTINE OR AFTER.                    */
2371 /*                                                                  */
2372 /* IN LATER VERSIONS WE WILL ADD MORE THINGS LIKE THE POSSIBILITY   */
2373 /* TO ALLOCATE MEMORY AND USE THIS AS LOCAL STORAGE. IT IS ALSO     */
2374 /* IMAGINABLE TO HAVE SPECIAL ROUTINES THAT CAN PERFORM CERTAIN     */
2375 /* OPERATIONS ON BLOB'S DEPENDENT ON WHAT THE BLOB REPRESENTS.      */
2376 /*                                                                  */
2377 /*                                                                  */
2378 /*       -----------------------------------------                  */
2379 /*       +   INITIAL READ REGION                 +                  */
2380 /*       -----------------------------------------                  */
2381 /*       +   INTERPRETED EXECUTE  REGION         +                  */
2382 /*       -----------------------------------------                  */
2383 /*       +   FINAL UPDATE REGION                 +                  */
2384 /*       -----------------------------------------                  */
2385 /*       +   FINAL READ REGION                   +                  */
2386 /*       -----------------------------------------                  */
2387 /*       +   SUBROUTINE REGION                   +                  */
2388 /*       -----------------------------------------                  */
2389 /* ---------------------------------------------------------------- */
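/*
 * The five header words are read in interpreterStartLab() below as
 *   cinBuffer[0..4] = { initReadLen, execRegionLen, finalUpdateLen,
 *                       finalReadLen, subroutineLen }
 * and the five regions follow in that order; their sum plus the five header
 * words must equal the total ATTRINFO length.
 */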
2390 /* ---------------------------------------------------------------- */
2391 /* ----------------- INTERPRETED EXECUTION  ----------------------- */
2392 /* ---------------------------------------------------------------- */
2393 int Dbtup::interpreterStartLab(Signal* signal,
2394                                KeyReqStruct *req_struct)
2395 {
2396   Operationrec * const regOperPtr = req_struct->operPtrP;
2397   int TnoDataRW;
2398   Uint32 RtotalLen, start_index, dstLen;
2399   Uint32 *dst;
2400 
2401   Uint32 RinitReadLen= cinBuffer[0];
2402   Uint32 RexecRegionLen= cinBuffer[1];
2403   Uint32 RfinalUpdateLen= cinBuffer[2];
2404   Uint32 RfinalRLen= cinBuffer[3];
2405   Uint32 RsubLen= cinBuffer[4];
2406 
2407   Uint32 RattrinbufLen= req_struct->attrinfo_len;
2408   const BlockReference sendBref= req_struct->rec_blockref;
2409 
2410   const Uint32 node = refToNode(sendBref);
2411   if(node != 0 && node != getOwnNodeId()) {
2412     start_index= 25;
2413   } else {
2414     jam();
2415     /**
2416      * execute direct
2417      */
2418     start_index= 3;
2419   }
2420   dst= &signal->theData[start_index];
2421   dstLen= (MAX_READ / 4) - start_index;
2422 
2423   RtotalLen= RinitReadLen;
2424   RtotalLen += RexecRegionLen;
2425   RtotalLen += RfinalUpdateLen;
2426   RtotalLen += RfinalRLen;
2427   RtotalLen += RsubLen;
2428 
2429   Uint32 RattroutCounter= 0;
2430   Uint32 RinstructionCounter= 5;
2431 
2432   /* All information to be logged/propagated to replicas
2433    * is generated from here on so reset the log word count
2434    */
2435   Uint32 RlogSize= req_struct->log_size= 0;
2436   if (((RtotalLen + 5) == RattrinbufLen) &&
2437       (RattrinbufLen >= 5) &&
2438       (RattrinbufLen < ZATTR_BUFFER_SIZE)) {
2439     /* ---------------------------------------------------------------- */
2440     // We start by checking consistency. We must have the first five
2441     // words of the ATTRINFO to give us the length of the regions. The
2442     // size of these regions must be the same as the total ATTRINFO
2443     // length and finally the total length must be within the limits.
2444     /* ---------------------------------------------------------------- */
2445 
2446     if (RinitReadLen > 0) {
2447       jam();
2448       /* ---------------------------------------------------------------- */
2449       // The first step that can be taken in the interpreter is to read
2450       // data of the tuple before any updates have been applied.
2451       /* ---------------------------------------------------------------- */
2452       TnoDataRW= readAttributes(req_struct,
2453 				 &cinBuffer[5],
2454 				 RinitReadLen,
2455 				 &dst[0],
2456 				 dstLen,
2457                                  false);
2458       if (TnoDataRW >= 0) {
2459 	RattroutCounter= TnoDataRW;
2460 	RinstructionCounter += RinitReadLen;
2461       } else {
2462 	jam();
2463         terrorCode = Uint32(-TnoDataRW);
2464 	tupkeyErrorLab(req_struct);
2465 	return -1;
2466       }
2467     }
2468     if (RexecRegionLen > 0) {
2469       jam();
2470       /* ---------------------------------------------------------------- */
2471       // The next step is the actual interpreted execution. This executes
2472       // a register-based virtual machine which can read and write attributes
2473       // to and from registers.
2474       /* ---------------------------------------------------------------- */
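      // The subroutine region starts after the interpreted program, the final
      // update region and the final read region, hence the RsubPC offset.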
2475       Uint32 RsubPC= RinstructionCounter + RexecRegionLen
2476         + RfinalUpdateLen + RfinalRLen;
2477       TnoDataRW= interpreterNextLab(signal,
2478                                      req_struct,
2479 				     &clogMemBuffer[0],
2480 				     &cinBuffer[RinstructionCounter],
2481 				     RexecRegionLen,
2482 				     &cinBuffer[RsubPC],
2483 				     RsubLen,
2484 				     &coutBuffer[0],
2485 				     sizeof(coutBuffer) / 4);
2486       if (TnoDataRW != -1) {
2487 	RinstructionCounter += RexecRegionLen;
2488 	RlogSize= TnoDataRW;
2489       } else {
2490 	jam();
2491 	/**
2492 	 * TUPKEY REF is sent from within interpreter
2493 	 */
2494 	return -1;
2495       }
2496     }
2497 
2498     if ((RlogSize > 0) ||
2499         (RfinalUpdateLen > 0))
2500     {
2501       /* Operation updates row,
2502        * reset author pseudo-col before update takes effect
2503        * This should probably occur only if the interpreted program
2504        * did not explicitly write the value, but that requires a bit
2505        * to record whether the value has been written.
2506        */
2507       Tablerec* regTabPtr = req_struct->tablePtrP;
2508       Tuple_header* dst = req_struct->m_tuple_ptr;
2509 
2510       if (regTabPtr->m_bits & Tablerec::TR_ExtraRowAuthorBits)
2511       {
2512         Uint32 attrId =
2513           regTabPtr->getExtraAttrId<Tablerec::TR_ExtraRowAuthorBits>();
2514 
2515         store_extra_row_bits(attrId, regTabPtr, dst, /* default */ 0, false);
2516       }
2517     }
2518 
2519     if (RfinalUpdateLen > 0) {
2520       jam();
2521       /* ---------------------------------------------------------------- */
2522       // We can also apply a set of updates without any conditions as part
2523       // of the interpreted execution.
2524       /* ---------------------------------------------------------------- */
2525       if (regOperPtr->op_struct.op_type == ZUPDATE) {
2526 	TnoDataRW= updateAttributes(req_struct,
2527 				     &cinBuffer[RinstructionCounter],
2528 				     RfinalUpdateLen);
2529 	if (TnoDataRW >= 0) {
2530 	  MEMCOPY_NO_WORDS(&clogMemBuffer[RlogSize],
2531 			   &cinBuffer[RinstructionCounter],
2532 			   RfinalUpdateLen);
2533 	  RinstructionCounter += RfinalUpdateLen;
2534 	  RlogSize += RfinalUpdateLen;
2535 	} else {
2536 	  jam();
2537           terrorCode = Uint32(-TnoDataRW);
2538 	  tupkeyErrorLab(req_struct);
2539 	  return -1;
2540 	}
2541       } else {
2542 	return TUPKEY_abort(req_struct, 19);
2543       }
2544     }
2545     if (RfinalRLen > 0) {
2546       jam();
2547       /* ---------------------------------------------------------------- */
2548       // The final action is that we can also read the tuple after it has
2549       // been updated.
2550       /* ---------------------------------------------------------------- */
2551       TnoDataRW= readAttributes(req_struct,
2552 				 &cinBuffer[RinstructionCounter],
2553 				 RfinalRLen,
2554 				 &dst[RattroutCounter],
2555 				 (dstLen - RattroutCounter),
2556                                  false);
2557       if (TnoDataRW >= 0) {
2558 	RattroutCounter += TnoDataRW;
2559       } else {
2560 	jam();
2561         terrorCode = Uint32(-TnoDataRW);
2562 	tupkeyErrorLab(req_struct);
2563 	return -1;
2564       }
2565     }
2566     /* Add log words explicitly generated here to existing log size
2567      *  - readAttributes can generate log for ANYVALUE column
2568      *    It adds the words directly to req_struct->log_size
2569      *    This is used for ANYVALUE and interpreted delete.
2570      */
2571     req_struct->log_size+= RlogSize;
2572     req_struct->read_length += RattroutCounter;
2573     sendReadAttrinfo(signal, req_struct, RattroutCounter, regOperPtr);
2574     if (RlogSize > 0) {
2575       return sendLogAttrinfo(signal, req_struct, RlogSize, regOperPtr);
2576     }
2577     return 0;
2578   } else {
2579     return TUPKEY_abort(req_struct, 22);
2580   }
2581 }
2582 
2583 /* ---------------------------------------------------------------- */
2584 /*       WHEN EXECUTION IS INTERPRETED WE NEED TO SEND SOME ATTRINFO*/
2585 /*       BACK TO LQH FOR LOGGING AND SENDING TO BACKUP AND STANDBY  */
2586 /*       NODES.                                                     */
2587 /*       INPUT:  LOG_ATTRINFOPTR         WHERE TO FETCH DATA FROM   */
2588 /*               TLOG_START              FIRST INDEX TO LOG         */
2589 /*               TLOG_END                LAST INDEX + 1 TO LOG      */
2590 /* ---------------------------------------------------------------- */
2591 int Dbtup::sendLogAttrinfo(Signal* signal,
2592                            KeyReqStruct * req_struct,
2593                            Uint32 TlogSize,
2594                            Operationrec *  const regOperPtr)
2595 
2596 {
2597   /* Copy from Log buffer to segmented section,
2598    * then attach to ATTRINFO and execute direct
2599    * to LQH
2600    */
2601   ndbrequire( TlogSize > 0 );
2602   Uint32 longSectionIVal= RNIL;
2603   bool ok= appendToSection(longSectionIVal,
2604                            &clogMemBuffer[0],
2605                            TlogSize);
2606   if (unlikely(!ok))
2607   {
2608     /* Resource error, abort transaction */
2609     terrorCode = ZSEIZE_ATTRINBUFREC_ERROR;
2610     tupkeyErrorLab(req_struct);
2611     return -1;
2612   }
2613 
2614   /* Send a TUP_ATTRINFO signal to LQH, which contains
2615    * the relevant user pointer and the attrinfo section's
2616    * IVAL
2617    */
2618   signal->theData[0]= regOperPtr->userpointer;
2619   signal->theData[1]= TlogSize;
2620   signal->theData[2]= longSectionIVal;
2621 
2622   EXECUTE_DIRECT(DBLQH,
2623                  GSN_TUP_ATTRINFO,
2624                  signal,
2625                  3);
2626   return 0;
2627 }
2628 
2629 inline
2630 Uint32
2631 Dbtup::brancher(Uint32 TheInstruction, Uint32 TprogramCounter)
2632 {
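  // The branch target is encoded in the upper half of the instruction word:
  //   bit 31     : direction (1 = backwards, 0 = forwards)
  //   bits 16-30 : branch distance in words
  // TprogramCounter has already been advanced past the branch instruction,
  // hence the decrement before the offset is applied.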
2633   Uint32 TbranchDirection= TheInstruction >> 31;
2634   Uint32 TbranchLength= (TheInstruction >> 16) & 0x7fff;
2635   TprogramCounter--;
2636   if (TbranchDirection == 1) {
2637     jam();
2638     /* ---------------------------------------------------------------- */
2639     /*       WE JUMP BACKWARDS.                                         */
2640     /* ---------------------------------------------------------------- */
2641     return (TprogramCounter - TbranchLength);
2642   } else {
2643     jam();
2644     /* ---------------------------------------------------------------- */
2645     /*       WE JUMP FORWARD.                                           */
2646     /* ---------------------------------------------------------------- */
2647     return (TprogramCounter + TbranchLength);
2648   }
2649 }
2650 
2651 const Uint32 *
2652 Dbtup::lookupInterpreterParameter(Uint32 paramNo,
2653                                   const Uint32 * subptr,
2654                                   Uint32 sublen) const
2655 {
2656   /**
2657    * The parameters...are stored in the subroutine section
2658    *
2659    * WORD2         WORD3       WORD4         WORD5
2660    * [ P0 HEADER ] [ P0 DATA ] [ P1 HEADER ] [ P1 DATA ]
2661    *
2662    *
2663    * len=4 <=> 1 word
2664    */
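  /*
   * Each parameter is one AttributeHeader word followed by its data words;
   * parameter N is located by skipping N (header + data) groups. Every
   * position is checked against sublen, so a malformed program returns 0
   * instead of reading outside the subroutine section.
   */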
2665   Uint32 pos = 0;
2666   while (paramNo)
2667   {
2668     const Uint32 * head = subptr + pos;
2669     Uint32 len = AttributeHeader::getDataSize(* head);
2670     paramNo --;
2671     pos += 1 + len;
2672     if (unlikely(pos >= sublen))
2673       return 0;
2674   }
2675 
2676   const Uint32 * head = subptr + pos;
2677   Uint32 len = AttributeHeader::getDataSize(* head);
2678   if (unlikely(pos + 1 + len > sublen))
2679     return 0;
2680 
2681   return head;
2682 }
2683 
2684 int Dbtup::interpreterNextLab(Signal* signal,
2685                               KeyReqStruct* req_struct,
2686                               Uint32* logMemory,
2687                               Uint32* mainProgram,
2688                               Uint32 TmainProgLen,
2689                               Uint32* subroutineProg,
2690                               Uint32 TsubroutineLen,
2691 			      Uint32 * tmpArea,
2692 			      Uint32 tmpAreaSz)
2693 {
2694   register Uint32* TcurrentProgram= mainProgram;
2695   register Uint32 TcurrentSize= TmainProgLen;
2696   register Uint32 RnoOfInstructions= 0;
2697   register Uint32 TprogramCounter= 0;
2698   register Uint32 theInstruction;
2699   register Uint32 theRegister;
2700   Uint32 TdataWritten= 0;
2701   Uint32 RstackPtr= 0;
2702   union {
2703     Uint32 TregMemBuffer[32];
2704     Uint64 align[16];
2705   };
2706   (void)align; // kill warning
2707   Uint32 TstackMemBuffer[32];
2708 
2709   /* ---------------------------------------------------------------- */
2710   // Initialise all 8 registers to contain the NULL value.
2711   // In this version we can handle 32 and 64 bit unsigned integers.
2712   // They are handled as 64 bit values. Thus the 32 most significant
2713   // bits are zeroed for 32 bit values.
2714   /* ---------------------------------------------------------------- */
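  // Each register occupies four words of TregMemBuffer: word 0 is a type tag
  // (0 = NULL, 0x50 = 32-bit value, 0x60 = 64-bit value) and words 2..3 hold
  // the value, accessed as an Int64 at (theRegister + 2). Register r is
  // addressed as TregMemBuffer[r << 2].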
2715   TregMemBuffer[0]= 0;
2716   TregMemBuffer[4]= 0;
2717   TregMemBuffer[8]= 0;
2718   TregMemBuffer[12]= 0;
2719   TregMemBuffer[16]= 0;
2720   TregMemBuffer[20]= 0;
2721   TregMemBuffer[24]= 0;
2722   TregMemBuffer[28]= 0;
2723   Uint32 tmpHabitant= ~0;
2724 
2725   while (RnoOfInstructions < 8000) {
2726     /* ---------------------------------------------------------------- */
2727     /* EXECUTE THE NEXT INTERPRETER INSTRUCTION.                        */
2728     /* ---------------------------------------------------------------- */
2729     RnoOfInstructions++;
2730     theInstruction= TcurrentProgram[TprogramCounter];
2731     theRegister= Interpreter::getReg1(theInstruction) << 2;
2732 #ifdef TRACE_INTERPRETER
2733     ndbout_c("Interpreter : RnoOfInstructions : %u.  TprogramCounter : %u.  Opcode : %u",
2734              RnoOfInstructions, TprogramCounter, Interpreter::getOpCode(theInstruction));
2735 #endif
2736     if (TprogramCounter < TcurrentSize) {
2737       TprogramCounter++;
2738       switch (Interpreter::getOpCode(theInstruction)) {
2739       case Interpreter::READ_ATTR_INTO_REG:
2740 	jam();
2741 	/* ---------------------------------------------------------------- */
2742 	// Read an attribute from the tuple into a register.
2743 	// While reading an attribute we allow the attribute to be an array
2744 	// as long as it fits in the 64 bits of the register.
2745 	/* ---------------------------------------------------------------- */
2746 	{
2747 	  Uint32 theAttrinfo= theInstruction;
2748 	  int TnoDataRW= readAttributes(req_struct,
2749 				     &theAttrinfo,
2750 				     (Uint32)1,
2751 				     &TregMemBuffer[theRegister],
2752 				     (Uint32)3,
2753                                      false);
2754 	  if (TnoDataRW == 2) {
2755 	    /* ------------------------------------------------------------- */
2756 	    // Two words read means that we get the instruction plus one 32
2757 	    // word read. Thus we set the register to be a 32 bit register.
2758 	    /* ------------------------------------------------------------- */
2759 	    TregMemBuffer[theRegister]= 0x50;
2760             // arithmetic conversion if big-endian
2761             * (Int64*)(TregMemBuffer+theRegister+2)= TregMemBuffer[theRegister+1];
2762 	  } else if (TnoDataRW == 3) {
2763 	    /* ------------------------------------------------------------- */
2764 	    // Three words read means that we get the instruction plus two
2765 	    // 32 words read. Thus we set the register to be a 64 bit register.
2766 	    /* ------------------------------------------------------------- */
2767 	    TregMemBuffer[theRegister]= 0x60;
2768             TregMemBuffer[theRegister+3]= TregMemBuffer[theRegister+2];
2769             TregMemBuffer[theRegister+2]= TregMemBuffer[theRegister+1];
2770 	  } else if (TnoDataRW == 1) {
2771 	    /* ------------------------------------------------------------- */
2772 	    // One word read means that we must have read a NULL value. We set
2773 	    // the register to indicate a NULL value.
2774 	    /* ------------------------------------------------------------- */
2775 	    TregMemBuffer[theRegister]= 0;
2776 	    TregMemBuffer[theRegister + 2]= 0;
2777 	    TregMemBuffer[theRegister + 3]= 0;
2778 	  } else if (TnoDataRW < 0) {
2779 	    jam();
2780             terrorCode = Uint32(-TnoDataRW);
2781 	    tupkeyErrorLab(req_struct);
2782 	    return -1;
2783 	  } else {
2784 	    /* ------------------------------------------------------------- */
2785 	    // Any other return value from the read attribute here is not
2786 	    // allowed and will lead to a system crash.
2787 	    /* ------------------------------------------------------------- */
2788 	    ndbrequire(false);
2789 	  }
2790 	  break;
2791 	}
2792 
2793       case Interpreter::WRITE_ATTR_FROM_REG:
2794 	jam();
2795 	{
2796 	  Uint32 TattrId= theInstruction >> 16;
2797 	  Uint32 TattrDescrIndex= req_struct->tablePtrP->tabDescriptor +
2798 	    (TattrId << ZAD_LOG_SIZE);
2799 	  Uint32 TattrDesc1= tableDescriptor[TattrDescrIndex].tabDescr;
2800 	  Uint32 TregType= TregMemBuffer[theRegister];
2801 
2802 	  /* --------------------------------------------------------------- */
2803 	  // Calculate the number of words of this attribute.
2804 	  // We allow writes into arrays as long as they fit into the 64 bit
2805 	  // register size.
2806 	  /* --------------------------------------------------------------- */
2807           Uint32 TattrNoOfWords = AttributeDescriptor::getSizeInWords(TattrDesc1);
2808 	  Uint32 Toptype = req_struct->operPtrP->op_struct.op_type;
2809 	  Uint32 TdataForUpdate[3];
2810 	  Uint32 Tlen;
2811 
2812 	  AttributeHeader ah(TattrId, TattrNoOfWords << 2);
2813           TdataForUpdate[0]= ah.m_value;
2814 	  TdataForUpdate[1]= TregMemBuffer[theRegister + 2];
2815 	  TdataForUpdate[2]= TregMemBuffer[theRegister + 3];
2816 	  Tlen= TattrNoOfWords + 1;
2817 	  if (Toptype == ZUPDATE) {
2818 	    if (TattrNoOfWords <= 2) {
2819               if (TattrNoOfWords == 1) {
2820                 // arithmetic conversion if big-endian
2821                 Int64 * tmp = new (&TregMemBuffer[theRegister + 2]) Int64;
2822                 TdataForUpdate[1] = Uint32(* tmp);
2823                 TdataForUpdate[2] = 0;
2824               }
2825 	      if (TregType == 0) {
2826 		/* --------------------------------------------------------- */
2827 		// Write a NULL value into the attribute
2828 		/* --------------------------------------------------------- */
2829 		ah.setNULL();
2830                 TdataForUpdate[0]= ah.m_value;
2831 		Tlen= 1;
2832 	      }
2833 	      int TnoDataRW= updateAttributes(req_struct,
2834 					   &TdataForUpdate[0],
2835 					   Tlen);
2836 	      if (TnoDataRW >= 0) {
2837 		/* --------------------------------------------------------- */
2838 		// Write the written data also into the log buffer so that it
2839 		// will be logged.
2840 		/* --------------------------------------------------------- */
2841 		logMemory[TdataWritten + 0]= TdataForUpdate[0];
2842 		logMemory[TdataWritten + 1]= TdataForUpdate[1];
2843 		logMemory[TdataWritten + 2]= TdataForUpdate[2];
2844 		TdataWritten += Tlen;
2845 	      } else {
2846                 terrorCode = Uint32(-TnoDataRW);
2847 		tupkeyErrorLab(req_struct);
2848 		return -1;
2849 	      }
2850 	    } else {
2851 	      return TUPKEY_abort(req_struct, 15);
2852 	    }
2853 	  } else {
2854 	    return TUPKEY_abort(req_struct, 16);
2855 	  }
2856 	  break;
2857 	}
2858 
2859       case Interpreter::LOAD_CONST_NULL:
2860 	jam();
2861 	TregMemBuffer[theRegister]= 0;	/* NULL INDICATOR */
2862 	break;
2863 
2864       case Interpreter::LOAD_CONST16:
2865 	jam();
2866 	TregMemBuffer[theRegister]= 0x50;	/* 32 BIT UNSIGNED CONSTANT */
2867 	* (Int64*)(TregMemBuffer+theRegister+2)= theInstruction >> 16;
2868 	break;
2869 
2870       case Interpreter::LOAD_CONST32:
2871 	jam();
2872 	TregMemBuffer[theRegister]= 0x50;	/* 32 BIT UNSIGNED CONSTANT */
2873 	* (Int64*)(TregMemBuffer+theRegister+2)= *
2874 	  (TcurrentProgram+TprogramCounter);
2875 	TprogramCounter++;
2876 	break;
2877 
2878       case Interpreter::LOAD_CONST64:
2879 	jam();
2880 	TregMemBuffer[theRegister]= 0x60;	/* 64 BIT UNSIGNED CONSTANT */
2881         TregMemBuffer[theRegister + 2 ]= * (TcurrentProgram +
2882                                              TprogramCounter++);
2883         TregMemBuffer[theRegister + 3 ]= * (TcurrentProgram +
2884                                              TprogramCounter++);
2885 	break;
2886 
2887       case Interpreter::ADD_REG_REG:
2888 	jam();
2889 	{
2890 	  Uint32 TrightRegister= Interpreter::getReg2(theInstruction) << 2;
2891 	  Uint32 TdestRegister= Interpreter::getReg3(theInstruction) << 2;
2892 
2893 	  Uint32 TrightType= TregMemBuffer[TrightRegister];
2894 	  Int64 Tright0= * (Int64*)(TregMemBuffer + TrightRegister + 2);
2895 
2896 
2897 	  Uint32 TleftType= TregMemBuffer[theRegister];
2898 	  Int64 Tleft0= * (Int64*)(TregMemBuffer + theRegister + 2);
2899 
2900 	  if ((TleftType | TrightType) != 0) {
2901 	    Uint64 Tdest0= Tleft0 + Tright0;
2902 	    * (Int64*)(TregMemBuffer+TdestRegister+2)= Tdest0;
2903 	    TregMemBuffer[TdestRegister]= 0x60;
2904 	  } else {
2905 	    return TUPKEY_abort(req_struct, 20);
2906 	  }
2907 	  break;
2908 	}
2909 
2910       case Interpreter::SUB_REG_REG:
2911 	jam();
2912 	{
2913 	  Uint32 TrightRegister= Interpreter::getReg2(theInstruction) << 2;
2914 	  Uint32 TdestRegister= Interpreter::getReg3(theInstruction) << 2;
2915 
2916 	  Uint32 TrightType= TregMemBuffer[TrightRegister];
2917 	  Int64 Tright0= * (Int64*)(TregMemBuffer + TrightRegister + 2);
2918 
2919 	  Uint32 TleftType= TregMemBuffer[theRegister];
2920 	  Int64 Tleft0= * (Int64*)(TregMemBuffer + theRegister + 2);
2921 
2922 	  if ((TleftType | TrightType) != 0) {
2923 	    Int64 Tdest0= Tleft0 - Tright0;
2924 	    * (Int64*)(TregMemBuffer+TdestRegister+2)= Tdest0;
2925 	    TregMemBuffer[TdestRegister]= 0x60;
2926 	  } else {
2927 	    return TUPKEY_abort(req_struct, 20);
2928 	  }
2929 	  break;
2930 	}
2931 
2932       case Interpreter::BRANCH:
2933 	TprogramCounter= brancher(theInstruction, TprogramCounter);
2934 	break;
2935 
2936       case Interpreter::BRANCH_REG_EQ_NULL:
2937 	if (TregMemBuffer[theRegister] != 0) {
2938 	  jam();
2939 	  continue;
2940 	} else {
2941 	  jam();
2942 	  TprogramCounter= brancher(theInstruction, TprogramCounter);
2943 	}
2944 	break;
2945 
2946       case Interpreter::BRANCH_REG_NE_NULL:
2947 	if (TregMemBuffer[theRegister] == 0) {
2948 	  jam();
2949 	  continue;
2950 	} else {
2951 	  jam();
2952 	  TprogramCounter= brancher(theInstruction, TprogramCounter);
2953 	}
2954 	break;
2955 
2956 
2957       case Interpreter::BRANCH_EQ_REG_REG:
2958 	{
2959 	  Uint32 TrightRegister= Interpreter::getReg2(theInstruction) << 2;
2960 
2961 	  Uint32 TleftType= TregMemBuffer[theRegister];
2962 	  Uint32 Tleft0= TregMemBuffer[theRegister + 2];
2963 	  Uint32 Tleft1= TregMemBuffer[theRegister + 3];
2964 
2965 	  Uint32 TrightType= TregMemBuffer[TrightRegister];
2966 	  Uint32 Tright0= TregMemBuffer[TrightRegister + 2];
2967 	  Uint32 Tright1= TregMemBuffer[TrightRegister + 3];
2968 	  if ((TrightType | TleftType) != 0) {
2969 	    jam();
2970 	    if ((Tleft0 == Tright0) && (Tleft1 == Tright1)) {
2971 	      TprogramCounter= brancher(theInstruction, TprogramCounter);
2972 	    }
2973 	  } else {
2974 	    return TUPKEY_abort(req_struct, 23);
2975 	  }
2976 	  break;
2977 	}
2978 
2979       case Interpreter::BRANCH_NE_REG_REG:
2980 	{
2981 	  Uint32 TrightRegister= Interpreter::getReg2(theInstruction) << 2;
2982 
2983 	  Uint32 TleftType= TregMemBuffer[theRegister];
2984 	  Uint32 Tleft0= TregMemBuffer[theRegister + 2];
2985 	  Uint32 Tleft1= TregMemBuffer[theRegister + 3];
2986 
2987 	  Uint32 TrightType= TregMemBuffer[TrightRegister];
2988 	  Uint32 Tright0= TregMemBuffer[TrightRegister + 2];
2989 	  Uint32 Tright1= TregMemBuffer[TrightRegister + 3];
2990 	  if ((TrightType | TleftType) != 0) {
2991 	    jam();
2992 	    if ((Tleft0 != Tright0) || (Tleft1 != Tright1)) {
2993 	      TprogramCounter= brancher(theInstruction, TprogramCounter);
2994 	    }
2995 	  } else {
2996 	    return TUPKEY_abort(req_struct, 24);
2997 	  }
2998 	  break;
2999 	}
3000 
3001       case Interpreter::BRANCH_LT_REG_REG:
3002 	{
3003 	  Uint32 TrightRegister= Interpreter::getReg2(theInstruction) << 2;
3004 
3005 	  Uint32 TrightType= TregMemBuffer[TrightRegister];
3006 	  Int64 Tright0= * (Int64*)(TregMemBuffer + TrightRegister + 2);
3007 
3008 	  Uint32 TleftType= TregMemBuffer[theRegister];
3009 	  Int64 Tleft0= * (Int64*)(TregMemBuffer + theRegister + 2);
3010 
3011 
3012 	  if ((TrightType | TleftType) != 0) {
3013 	    jam();
3014 	    if (Tleft0 < Tright0) {
3015 	      TprogramCounter= brancher(theInstruction, TprogramCounter);
3016 	    }
3017 	  } else {
3018 	    return TUPKEY_abort(req_struct, 24);
3019 	  }
3020 	  break;
3021 	}
3022 
3023       case Interpreter::BRANCH_LE_REG_REG:
3024 	{
3025 	  Uint32 TrightRegister= Interpreter::getReg2(theInstruction) << 2;
3026 
3027 	  Uint32 TrightType= TregMemBuffer[TrightRegister];
3028 	  Int64 Tright0= * (Int64*)(TregMemBuffer + TrightRegister + 2);
3029 
3030 	  Uint32 TleftType= TregMemBuffer[theRegister];
3031 	  Int64 Tleft0= * (Int64*)(TregMemBuffer + theRegister + 2);
3032 
3033 
3034 	  if ((TrightType | TleftType) != 0) {
3035 	    jam();
3036 	    if (Tleft0 <= Tright0) {
3037 	      TprogramCounter= brancher(theInstruction, TprogramCounter);
3038 	    }
3039 	  } else {
3040 	    return TUPKEY_abort(req_struct, 26);
3041 	  }
3042 	  break;
3043 	}
3044 
3045       case Interpreter::BRANCH_GT_REG_REG:
3046 	{
3047 	  Uint32 TrightRegister= Interpreter::getReg2(theInstruction) << 2;
3048 
3049 	  Uint32 TrightType= TregMemBuffer[TrightRegister];
3050 	  Int64 Tright0= * (Int64*)(TregMemBuffer + TrightRegister + 2);
3051 
3052 	  Uint32 TleftType= TregMemBuffer[theRegister];
3053 	  Int64 Tleft0= * (Int64*)(TregMemBuffer + theRegister + 2);
3054 
3055 
3056 	  if ((TrightType | TleftType) != 0) {
3057 	    jam();
3058 	    if (Tleft0 > Tright0){
3059 	      TprogramCounter= brancher(theInstruction, TprogramCounter);
3060 	    }
3061 	  } else {
3062 	    return TUPKEY_abort(req_struct, 27);
3063 	  }
3064 	  break;
3065 	}
3066 
3067       case Interpreter::BRANCH_GE_REG_REG:
3068 	{
3069 	  Uint32 TrightRegister= Interpreter::getReg2(theInstruction) << 2;
3070 
3071 	  Uint32 TrightType= TregMemBuffer[TrightRegister];
3072 	  Int64 Tright0= * (Int64*)(TregMemBuffer + TrightRegister + 2);
3073 
3074 	  Uint32 TleftType= TregMemBuffer[theRegister];
3075 	  Int64 Tleft0= * (Int64*)(TregMemBuffer + theRegister + 2);
3076 
3077 
3078 	  if ((TrightType | TleftType) != 0) {
3079 	    jam();
3080 	    if (Tleft0 >= Tright0){
3081 	      TprogramCounter= brancher(theInstruction, TprogramCounter);
3082 	    }
3083 	  } else {
3084 	    return TUPKEY_abort(req_struct, 28);
3085 	  }
3086 	  break;
3087 	}
3088 
3089       case Interpreter::BRANCH_ATTR_OP_ARG_2:
3090       case Interpreter::BRANCH_ATTR_OP_ARG:{
3091 	jam();
3092 	Uint32 cond = Interpreter::getBinaryCondition(theInstruction);
3093 	Uint32 ins2 = TcurrentProgram[TprogramCounter];
3094 	Uint32 attrId = Interpreter::getBranchCol_AttrId(ins2) << 16;
3095 	Uint32 argLen = Interpreter::getBranchCol_Len(ins2);
3096         Uint32 step = argLen;
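        // theInstruction holds the branch condition; ins2 (the next program
        // word) holds the attribute id and the inline argument length in
        // bytes. For BRANCH_ATTR_OP_ARG the argument value follows in the
        // program stream; for BRANCH_ATTR_OP_ARG_2 it is taken from parameter
        // 'paramNo' in the subroutine section instead (and step becomes 0).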
3097 
3098 	if(tmpHabitant != attrId){
3099 	  Int32 TnoDataR = readAttributes(req_struct,
3100 					  &attrId, 1,
3101 					  tmpArea, tmpAreaSz,
3102                                           false);
3103 
3104 	  if (TnoDataR < 0) {
3105 	    jam();
3106             terrorCode = Uint32(-TnoDataR);
3107 	    tupkeyErrorLab(req_struct);
3108 	    return -1;
3109 	  }
3110 	  tmpHabitant= attrId;
3111 	}
3112 
3113         // get type
3114 	attrId >>= 16;
3115 	Uint32 TattrDescrIndex = req_struct->tablePtrP->tabDescriptor +
3116 	  (attrId << ZAD_LOG_SIZE);
3117 	Uint32 TattrDesc1 = tableDescriptor[TattrDescrIndex].tabDescr;
3118 	Uint32 TattrDesc2 = tableDescriptor[TattrDescrIndex+1].tabDescr;
3119 	Uint32 typeId = AttributeDescriptor::getType(TattrDesc1);
3120 	void * cs = 0;
3121 	if(AttributeOffset::getCharsetFlag(TattrDesc2))
3122 	{
3123 	  Uint32 pos = AttributeOffset::getCharsetPos(TattrDesc2);
3124 	  cs = req_struct->tablePtrP->charsetArray[pos];
3125 	}
3126 	const NdbSqlUtil::Type& sqlType = NdbSqlUtil::getType(typeId);
3127 
3128         // get data
3129 	AttributeHeader ah(tmpArea[0]);
3130         const char* s1 = (char*)&tmpArea[1];
3131         const char* s2 = (char*)&TcurrentProgram[TprogramCounter+1];
3132         // fixed length in 5.0
3133 	Uint32 attrLen = AttributeDescriptor::getSizeInBytes(TattrDesc1);
3134 
3135         if (Interpreter::getOpCode(theInstruction) ==
3136             Interpreter::BRANCH_ATTR_OP_ARG_2)
3137         {
3138           jam();
3139           Uint32 paramNo = Interpreter::getBranchCol_ParamNo(ins2);
3140           const Uint32 * paramptr = lookupInterpreterParameter(paramNo,
3141                                                                subroutineProg,
3142                                                                TsubroutineLen);
3143           if (unlikely(paramptr == 0))
3144           {
3145             jam();
3146             terrorCode = 99; // TODO
3147             tupkeyErrorLab(req_struct);
3148             return -1;
3149           }
3150 
3151           argLen = AttributeHeader::getByteSize(* paramptr);
3152           step = 0;
3153           s2 = (char*)(paramptr + 1);
3154         }
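        /*
         * For BRANCH_ATTR_OP_ARG_2 the comparison argument comes from an
         * attached parameter (selected by paramNo) rather than being inlined
         * in the program, so step is set to 0: there are no inlined argument
         * words to skip on fall-through.
         */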
3155 
3156         if (typeId == NDB_TYPE_BIT)
3157         {
3158           /* Size in bytes for bit fields can be incorrect due to
3159            * rounding down
3160            */
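          /* e.g. a 12-bit column could be reported as 1 byte, while
           * (12 + 7) / 8 = 2 bytes are needed for the comparison.
           */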
3161           Uint32 bitFieldAttrLen= (AttributeDescriptor::getArraySize(TattrDesc1)
3162                                    + 7) / 8;
3163           attrLen= bitFieldAttrLen;
3164         }
3165 
3166 	bool r1_null = ah.isNULL();
3167 	bool r2_null = argLen == 0;
3168 	int res1;
3169         if (cond <= Interpreter::GE)
3170         {
3171           /* Comparison condition - EQ, NE, LT, LE, GT, GE */
3172           if (r1_null || r2_null) {
3173             // NULL==NULL and NULL<not-NULL
3174             res1 = r1_null && r2_null ? 0 : r1_null ? -1 : 1;
3175           } else {
3176 	    jam();
3177 	    if (unlikely(sqlType.m_cmp == 0))
3178 	    {
3179 	      return TUPKEY_abort(req_struct, 40);
3180 	    }
3181             res1 = (*sqlType.m_cmp)(cs, s1, attrLen, s2, argLen);
3182           }
3183 	} else {
3184           if ((cond == Interpreter::LIKE) ||
3185               (cond == Interpreter::NOT_LIKE))
3186           {
3187             if (r1_null || r2_null) {
3188               // NULL like NULL is true (has no practical use)
3189               res1 =  r1_null && r2_null ? 0 : -1;
3190             } else {
3191               jam();
3192               if (unlikely(sqlType.m_like == 0))
3193               {
3194                 return TUPKEY_abort(req_struct, 40);
3195               }
3196               res1 = (*sqlType.m_like)(cs, s1, attrLen, s2, argLen);
3197             }
3198           }
3199           else
3200           {
3201             /* AND_XX_MASK condition */
3202             ndbassert(cond <= Interpreter::AND_NE_ZERO);
3203             if (unlikely(sqlType.m_mask == 0))
3204             {
3205               return TUPKEY_abort(req_struct,40);
3206             }
3207             /* If either arg is NULL, we treat COL AND MASK as "not equal":
3208              * the NE_ZERO / NE_MASK conditions hold, the EQ variants do not.
3209              */
3210             if (r1_null || r2_null) {
3211               res1= 1;
3212             } else {
3213 
3214               bool cmpZero=
3215                 (cond == Interpreter::AND_EQ_ZERO) ||
3216                 (cond == Interpreter::AND_NE_ZERO);
3217 
3218               res1 = (*sqlType.m_mask)(s1, attrLen, s2, argLen, cmpZero);
3219             }
3220           }
3221         }
3222 
3223         int res = 0;
3224         switch ((Interpreter::BinaryCondition)cond) {
3225         case Interpreter::EQ:
3226           res = (res1 == 0);
3227           break;
3228         case Interpreter::NE:
3229           res = (res1 != 0);
3230           break;
3231         // note the condition is backwards
3232         case Interpreter::LT:
3233           res = (res1 > 0);
3234           break;
3235         case Interpreter::LE:
3236           res = (res1 >= 0);
3237           break;
3238         case Interpreter::GT:
3239           res = (res1 < 0);
3240           break;
3241         case Interpreter::GE:
3242           res = (res1 <= 0);
3243           break;
3244         case Interpreter::LIKE:
3245           res = (res1 == 0);
3246           break;
3247         case Interpreter::NOT_LIKE:
3248           res = (res1 == 1);
3249           break;
3250         case Interpreter::AND_EQ_MASK:
3251           res = (res1 == 0);
3252           break;
3253         case Interpreter::AND_NE_MASK:
3254           res = (res1 != 0);
3255           break;
3256         case Interpreter::AND_EQ_ZERO:
3257           res = (res1 == 0);
3258           break;
3259         case Interpreter::AND_NE_ZERO:
3260           res = (res1 != 0);
3261           break;
3262 	  // XXX handle invalid value
3263         }
3264 #ifdef TRACE_INTERPRETER
3265 	ndbout_c("cond=%u attr(%d)='%.*s'(%d) str='%.*s'(%d) res1=%d res=%d",
3266 		 cond, attrId >> 16,
3267                  attrLen, s1, attrLen, argLen, s2, argLen, res1, res);
3268 #endif
3269         if (res)
3270           TprogramCounter = brancher(theInstruction, TprogramCounter);
3271         else
3272 	{
3273           Uint32 tmp = ((step + 3) >> 2) + 1;
3274           TprogramCounter += tmp;
3275         }
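        /*
         * On fall-through the program counter skips the inlined argument:
         * ((step + 3) >> 2) rounds the argument byte length up to words and
         * the +1 skips the word holding attrId/argLen, e.g. step = 5 bytes
         * gives 2 + 1 = 3 words.  For BRANCH_ATTR_OP_ARG_2 step is 0.
         */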
3276 	break;
3277       }
3278 
3279       case Interpreter::BRANCH_ATTR_EQ_NULL:{
3280 	jam();
3281 	Uint32 ins2= TcurrentProgram[TprogramCounter];
3282 	Uint32 attrId= Interpreter::getBranchCol_AttrId(ins2) << 16;
3283 
3284 	if (tmpHabitant != attrId){
3285 	  Int32 TnoDataR= readAttributes(req_struct,
3286 					  &attrId, 1,
3287 					  tmpArea, tmpAreaSz,
3288                                           false);
3289 
3290 	  if (TnoDataR < 0) {
3291 	    jam();
3292             terrorCode = Uint32(-TnoDataR);
3293 	    tupkeyErrorLab(req_struct);
3294 	    return -1;
3295 	  }
3296 	  tmpHabitant= attrId;
3297 	}
3298 
3299 	AttributeHeader ah(tmpArea[0]);
3300 	if (ah.isNULL()){
3301 	  TprogramCounter= brancher(theInstruction, TprogramCounter);
3302 	} else {
3303 	  TprogramCounter ++;
3304 	}
3305 	break;
3306       }
3307 
3308       case Interpreter::BRANCH_ATTR_NE_NULL:{
3309 	jam();
3310 	Uint32 ins2= TcurrentProgram[TprogramCounter];
3311 	Uint32 attrId= Interpreter::getBranchCol_AttrId(ins2) << 16;
3312 
3313 	if (tmpHabitant != attrId){
3314 	  Int32 TnoDataR= readAttributes(req_struct,
3315 					  &attrId, 1,
3316 					  tmpArea, tmpAreaSz,
3317                                           false);
3318 
3319 	  if (TnoDataR < 0) {
3320 	    jam();
3321             terrorCode = Uint32(-TnoDataR);
3322 	    tupkeyErrorLab(req_struct);
3323 	    return -1;
3324 	  }
3325 	  tmpHabitant= attrId;
3326 	}
3327 
3328 	AttributeHeader ah(tmpArea[0]);
3329 	if (ah.isNULL()){
3330 	  TprogramCounter ++;
3331 	} else {
3332 	  TprogramCounter= brancher(theInstruction, TprogramCounter);
3333 	}
3334 	break;
3335       }
3336 
3337       case Interpreter::EXIT_OK:
3338 	jam();
3339 #ifdef TRACE_INTERPRETER
3340 	ndbout_c(" - exit_ok");
3341 #endif
3342 	return TdataWritten;
3343 
3344       case Interpreter::EXIT_OK_LAST:
3345 	jam();
3346 #ifdef TRACE_INTERPRETER
3347 	ndbout_c(" - exit_ok_last");
3348 #endif
3349 	req_struct->last_row= true;
3350 	return TdataWritten;
3351 
3352       case Interpreter::EXIT_REFUSE:
3353 	jam();
3354 #ifdef TRACE_INTERPRETER
3355 	ndbout_c(" - exit_nok");
3356 #endif
3357 	terrorCode= theInstruction >> 16;
3358 	return TUPKEY_abort(req_struct, 29);
3359 
3360       case Interpreter::CALL:
3361 	jam();
3362 #ifdef TRACE_INTERPRETER
3363         ndbout_c(" - call addr=%u, subroutine len=%u ret addr=%u",
3364                  theInstruction >> 16, TsubroutineLen, TprogramCounter);
3365 #endif
3366 	RstackPtr++;
3367 	if (RstackPtr < 32) {
3368           TstackMemBuffer[RstackPtr]= TprogramCounter;
3369           TprogramCounter= theInstruction >> 16;
3370 	  if (TprogramCounter < TsubroutineLen) {
3371 	    TcurrentProgram= subroutineProg;
3372 	    TcurrentSize= TsubroutineLen;
3373 	  } else {
3374 	    return TUPKEY_abort(req_struct, 30);
3375 	  }
3376 	} else {
3377 	  return TUPKEY_abort(req_struct, 31);
3378 	}
3379 	break;
3380 
3381       case Interpreter::RETURN:
3382 	jam();
3383 #ifdef TRACE_INTERPRETER
3384         ndbout_c(" - return to %u from stack level %u",
3385                  TstackMemBuffer[RstackPtr],
3386                  RstackPtr);
3387 #endif
3388 	if (RstackPtr > 0) {
3389 	  TprogramCounter= TstackMemBuffer[RstackPtr];
3390 	  RstackPtr--;
3391 	  if (RstackPtr == 0) {
3392 	    jam();
3393 	    /* ------------------------------------------------------------- */
3394 	    // We are back to the main program.
3395 	    /* ------------------------------------------------------------- */
3396 	    TcurrentProgram= mainProgram;
3397 	    TcurrentSize= TmainProgLen;
3398 	  }
3399 	} else {
3400 	  return TUPKEY_abort(req_struct, 32);
3401 	}
3402 	break;
3403 
3404       default:
3405 	return TUPKEY_abort(req_struct, 33);
3406       }
3407     } else {
3408       return TUPKEY_abort(req_struct, 34);
3409     }
3410   }
3411   return TUPKEY_abort(req_struct, 35);
3412 }
3413 
3414 /**
3415  * expand_var_part - copy packed variable attributes to fully expanded size
3416  *
3417  * dst      Var_data describing where to write attribute data and offsets,
3418  *          and how many attributes to expand (m_var_len_offset)
3419  * src      pointer to packed attributes
3420  * tabDesc  array of attribute descriptors (used for getting max size)
3421  * order    attribute order array (indexes into tabDesc)
3422  */
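/*
 * Packed source layout walked by expand_var_part:
 *   Uint16 off[no_attr + 1]   accumulating end offsets, off[0] = start
 *   char   data[]             concatenated attribute data
 * so the length of attribute i is off[i+1] - off[i].  In the expanded
 * destination each attribute is placed at its maximum size; e.g. two
 * attributes with max sizes 8 and 4 bytes and actual lengths 3 and 2 give
 * offset_array = {0, 8} and len_array = {3, 10} (end offsets).
 */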
3423 static
3424 Uint32*
3425 expand_var_part(Dbtup::KeyReqStruct::Var_data *dst,
3426 		const Uint32* src,
3427 		const Uint32 * tabDesc,
3428 		const Uint16* order)
3429 {
3430   char* dst_ptr= dst->m_data_ptr;
3431   Uint32 no_attr= dst->m_var_len_offset;
3432   Uint16* dst_off_ptr= dst->m_offset_array_ptr;
3433   Uint16* dst_len_ptr= dst_off_ptr + no_attr;
3434   const Uint16* src_off_ptr= (const Uint16*)src;
3435   const char* src_ptr= (const char*)(src_off_ptr + no_attr + 1);
3436 
3437   Uint16 tmp= *src_off_ptr++, next_pos, len, max_len, dst_off= 0;
3438   for(Uint32 i = 0; i<no_attr; i++)
3439   {
3440     next_pos= *src_off_ptr++;
3441     len= next_pos - tmp;
3442 
3443     *dst_off_ptr++ = dst_off;
3444     *dst_len_ptr++ = dst_off + len;
3445     memcpy(dst_ptr, src_ptr, len);
3446     src_ptr += len;
3447 
3448     max_len= AttributeDescriptor::getSizeInBytes(tabDesc[* order++]);
3449     dst_ptr += max_len; // Max size
3450     dst_off += max_len;
3451 
3452     tmp= next_pos;
3453   }
3454 
3455   return ALIGN_WORD(dst_ptr);
3456 }
3457 
3458 void
3459 Dbtup::expand_tuple(KeyReqStruct* req_struct,
3460 		    Uint32 sizes[2],
3461 		    Tuple_header* src,
3462 		    const Tablerec* tabPtrP,
3463 		    bool disk)
3464 {
3465   Uint32 bits= src->m_header_bits;
3466   Uint32 extra_bits = bits;
3467   Tuple_header* ptr= req_struct->m_tuple_ptr;
3468 
3469   Uint16 dd_tot= tabPtrP->m_no_of_disk_attributes;
3470   Uint16 mm_vars= tabPtrP->m_attributes[MM].m_no_of_varsize;
3471   Uint16 mm_dynvar= tabPtrP->m_attributes[MM].m_no_of_dyn_var;
3472   Uint16 mm_dynfix= tabPtrP->m_attributes[MM].m_no_of_dyn_fix;
3473   Uint16 mm_dyns= tabPtrP->m_attributes[MM].m_no_of_dynamic;
3474   Uint32 fix_size= tabPtrP->m_offsets[MM].m_fix_header_size;
3475   Uint32 order_desc= tabPtrP->m_real_order_descriptor;
3476 
3477   Uint32 *dst_ptr= ptr->get_end_of_fix_part_ptr(tabPtrP);
3478   const Uint32 *disk_ref= src->get_disk_ref_ptr(tabPtrP);
3479   const Uint32 *src_ptr= src->get_end_of_fix_part_ptr(tabPtrP);
3480   const Var_part_ref* var_ref = src->get_var_part_ref_ptr(tabPtrP);
3481   const Uint32 *desc= (Uint32*)req_struct->attr_descr;
3482   const Uint16 *order = (Uint16*)(&tableDescriptor[order_desc]);
3483   order += tabPtrP->m_attributes[MM].m_no_of_fixsize;
3484 
3485   // Copy fix part
3486   sizes[MM]= 1;
3487   memcpy(ptr, src, 4*fix_size);
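  /*
   * Layout of the expanded copy tuple built below:
   *   [ fixed part | Varpart_copy length word | expanded varsize attrs |
   *     expanded dynamic attrs | disk fixed part (when DISK_INLINE) ]
   * dst_ptr is advanced over each section in turn.
   */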
3488   if(mm_vars || mm_dyns)
3489   {
3490     /*
3491      * Reserve space for the initial length word and the offset array (with
3492      * one extra offset). These are filled in later, in shrink_tuple().
3493      */
3494     dst_ptr += Varpart_copy::SZ32;
3495 
3496     KeyReqStruct::Var_data* dst= &req_struct->m_var_data[MM];
3497     Uint32 step; // in bytes
3498     Uint32 src_len;
3499     const Uint32 *src_data;
3500     if (bits & Tuple_header::VAR_PART)
3501     {
3502       KeyReqStruct::Var_data* dst= &req_struct->m_var_data[MM];
3503       if(! (bits & Tuple_header::COPY_TUPLE))
3504       {
3505         /* This is for the initial expansion of a stored row. */
3506         Ptr<Page> var_page;
3507         src_data= get_ptr(&var_page, *var_ref);
3508         src_len= get_len(&var_page, *var_ref);
3509         sizes[MM]= src_len;
3510         step= 0;
3511         req_struct->m_varpart_page_ptr = var_page;
3512 
3513         /* An original tuple can't have grown as we're expanding it...
3514          * otherwise we would be "re-expanding" it. */
3515         ndbassert(! (bits & Tuple_header::MM_GROWN));
3516       }
3517       else
3518       {
3519         /* This is for the re-expansion of a shrunken row (update2 ...) */
3520 
3521         Varpart_copy* vp = (Varpart_copy*)src_ptr;
3522         src_len = vp->m_len;
3523         src_data= vp->m_data;
3524         step= (Varpart_copy::SZ32 + src_len); // SZ32 accounts for the extra length word
3525         req_struct->m_varpart_page_ptr = req_struct->m_page_ptr;
3526         sizes[MM]= src_len;
3527       }
3528 
3529       if (mm_vars)
3530       {
3531         dst->m_data_ptr= (char*)(((Uint16*)dst_ptr)+mm_vars+1);
3532         dst->m_offset_array_ptr= req_struct->var_pos_array;
3533         dst->m_var_len_offset= mm_vars;
3534         dst->m_max_var_offset= tabPtrP->m_offsets[MM].m_max_var_offset;
3535 
3536         dst_ptr= expand_var_part(dst, src_data, desc, order);
3537         ndbassert(dst_ptr == ALIGN_WORD(dst->m_data_ptr + dst->m_max_var_offset));
3538         /**
3539          * Advance src_data past the varsize data to the start of the dynamic part
3540          */
3541         char* varstart = (char*)(((Uint16*)src_data)+mm_vars+1);
3542         Uint32 varlen = ((Uint16*)src_data)[mm_vars];
3543         Uint32 *dynstart = ALIGN_WORD(varstart + varlen);
3544 
3545         ndbassert(src_len >= (dynstart - src_data));
3546         src_len -= Uint32(dynstart - src_data);
3547         src_data = dynstart;
3548       }
3549     }
3550     else
3551     {
3552       /**
3553       * No varpart stored...this is only allowed when all varying attributes are dynamic
3554        */
3555       ndbassert(mm_vars == 0);
3556       src_len = step = sizes[MM] = 0;
3557       src_data = 0;
3558     }
3559 
3560     if (mm_dyns)
3561     {
3562       /**
3563        * dynattr needs to be expanded even if no varpart existed before
3564        */
3565       dst->m_dyn_offset_arr_ptr= req_struct->var_pos_array+2*mm_vars;
3566       dst->m_dyn_len_offset= mm_dynvar+mm_dynfix;
3567       dst->m_max_dyn_offset= tabPtrP->m_offsets[MM].m_max_dyn_offset;
3568       dst->m_dyn_data_ptr= (char*)dst_ptr;
3569       dst_ptr= expand_dyn_part(dst, src_data,
3570                                src_len,
3571                                desc, order + mm_vars,
3572                                mm_dynvar, mm_dynfix,
3573                                tabPtrP->m_offsets[MM].m_dyn_null_words);
3574     }
3575 
3576     ndbassert((UintPtr(src_ptr) & 3) == 0);
3577     src_ptr = src_ptr + step;
3578   }
3579 
3580   src->m_header_bits= bits &
3581     ~(Uint32)(Tuple_header::MM_SHRINK | Tuple_header::MM_GROWN);
3582 
3583   sizes[DD]= 0;
3584   if(disk && dd_tot)
3585   {
3586     const Uint16 dd_vars= tabPtrP->m_attributes[DD].m_no_of_varsize;
3587     order+= mm_vars+mm_dynvar+mm_dynfix;
3588 
3589     if(bits & Tuple_header::DISK_INLINE)
3590     {
3591       // Only on copy tuple
3592       ndbassert(bits & Tuple_header::COPY_TUPLE);
3593     }
3594     else
3595     {
3596       Local_key key;
3597       memcpy(&key, disk_ref, sizeof(key));
3598       key.m_page_no= req_struct->m_disk_page_ptr.i;
3599       src_ptr= get_dd_ptr(&req_struct->m_disk_page_ptr, &key, tabPtrP);
3600     }
3601     extra_bits |= Tuple_header::DISK_INLINE;
3602 
3603     // Fix diskpart
3604     req_struct->m_disk_ptr= (Tuple_header*)dst_ptr;
3605     memcpy(dst_ptr, src_ptr, 4*tabPtrP->m_offsets[DD].m_fix_header_size);
3606     sizes[DD] = tabPtrP->m_offsets[DD].m_fix_header_size;
3607 
3608     ndbassert(! (req_struct->m_disk_ptr->m_header_bits & Tuple_header::FREE));
3609 
3610     ndbrequire(dd_vars == 0);
3611   }
3612 
3613   ptr->m_header_bits= (extra_bits | Tuple_header::COPY_TUPLE);
3614   req_struct->is_expanded= true;
3615 }
3616 
3617 void
3618 Dbtup::dump_tuple(const KeyReqStruct* req_struct, const Tablerec* tabPtrP)
3619 {
3620   Uint16 mm_vars= tabPtrP->m_attributes[MM].m_no_of_varsize;
3621   Uint16 mm_dyns= tabPtrP->m_attributes[MM].m_no_of_dynamic;
3622   //Uint16 dd_tot= tabPtrP->m_no_of_disk_attributes;
3623   const Tuple_header* ptr= req_struct->m_tuple_ptr;
3624   Uint32 bits= ptr->m_header_bits;
3625   const Uint32 *tuple_words= (Uint32 *)ptr;
3626   const Uint32 *fix_p;
3627   Uint32 fix_len;
3628   const Uint32 *var_p;
3629   Uint32 var_len;
3630   //const Uint32 *disk_p;
3631   //Uint32 disk_len;
3632   const char *typ;
3633 
3634   fix_p= tuple_words;
3635   fix_len= tabPtrP->m_offsets[MM].m_fix_header_size;
3636   if(req_struct->is_expanded)
3637   {
3638     typ= "expanded";
3639     var_p= ptr->get_end_of_fix_part_ptr(tabPtrP);
3640     var_len= 0;                                 // No dump of varpart in expanded
3641 #if 0
3642     disk_p= (Uint32 *)req_struct->m_disk_ptr;
3643     disk_len= (dd_tot ? tabPtrP->m_offsets[DD].m_fix_header_size : 0);
3644 #endif
3645   }
3646   else if(! (bits & Tuple_header::COPY_TUPLE))
3647   {
3648     typ= "stored";
3649     if(mm_vars+mm_dyns)
3650     {
3651       //const KeyReqStruct::Var_data* dst= &req_struct->m_var_data[MM];
3652       const Var_part_ref *varref= ptr->get_var_part_ref_ptr(tabPtrP);
3653       Ptr<Page> tmp;
3654       var_p= get_ptr(&tmp, * varref);
3655       var_len= get_len(&tmp, * varref);
3656     }
3657     else
3658     {
3659       var_p= 0;
3660       var_len= 0;
3661     }
3662 #if 0
3663     if(dd_tot)
3664     {
3665       Local_key key;
3666       memcpy(&key, ptr->get_disk_ref_ptr(tabPtrP), sizeof(key));
3667       key.m_page_no= req_struct->m_disk_page_ptr.i;
3668       disk_p= get_dd_ptr(&req_struct->m_disk_page_ptr, &key, tabPtrP);
3669       disk_len= tabPtrP->m_offsets[DD].m_fix_header_size;
3670     }
3671     else
3672     {
3673       disk_p= var_p;
3674       disk_len= 0;
3675     }
3676 #endif
3677   }
3678   else
3679   {
3680     typ= "shrunken";
3681     if(mm_vars+mm_dyns)
3682     {
3683       var_p= ptr->get_end_of_fix_part_ptr(tabPtrP);
3684       var_len= *((Uint16 *)var_p) + 1;
3685     }
3686     else
3687     {
3688       var_p= 0;
3689       var_len= 0;
3690     }
3691 #if 0
3692     disk_p= (Uint32 *)(req_struct->m_disk_ptr);
3693     disk_len= (dd_tot ? tabPtrP->m_offsets[DD].m_fix_header_size : 0);
3694 #endif
3695   }
3696   ndbout_c("Fixed part[%s](%p len=%u words)",typ, fix_p, fix_len);
3697   dump_hex(fix_p, fix_len);
3698   ndbout_c("Varpart part[%s](%p len=%u words)", typ , var_p, var_len);
3699   dump_hex(var_p, var_len);
3700 #if 0
3701   ndbout_c("Disk part[%s](%p len=%u words)", typ, disk_p, disk_len);
3702   dump_hex(disk_p, disk_len);
3703 #endif
3704 }
3705 
3706 void
3707 Dbtup::prepare_read(KeyReqStruct* req_struct,
3708 		    Tablerec* tabPtrP, bool disk)
3709 {
3710   Tuple_header* ptr= req_struct->m_tuple_ptr;
3711 
3712   Uint32 bits= ptr->m_header_bits;
3713   Uint16 dd_tot= tabPtrP->m_no_of_disk_attributes;
3714   Uint16 mm_vars= tabPtrP->m_attributes[MM].m_no_of_varsize;
3715   Uint16 mm_dyns= tabPtrP->m_attributes[MM].m_no_of_dynamic;
3716 
3717   const Uint32 *src_ptr= ptr->get_end_of_fix_part_ptr(tabPtrP);
3718   const Uint32 *disk_ref= ptr->get_disk_ref_ptr(tabPtrP);
3719   const Var_part_ref* var_ref = ptr->get_var_part_ref_ptr(tabPtrP);
3720   if(mm_vars || mm_dyns)
3721   {
3722     const Uint32 *src_data= src_ptr;
3723     Uint32 src_len;
3724     KeyReqStruct::Var_data* dst= &req_struct->m_var_data[MM];
3725     if (bits & Tuple_header::VAR_PART)
3726     {
3727       if(! (bits & Tuple_header::COPY_TUPLE))
3728       {
3729         Ptr<Page> tmp;
3730         src_data= get_ptr(&tmp, * var_ref);
3731         src_len= get_len(&tmp, * var_ref);
3732 
3733         /* If the original tuple was grown,
3734          * the old size is stored at the end. */
3735         if(bits & Tuple_header::MM_GROWN)
3736         {
3737           /**
3738            * This is when triggers read before value of update
3739            *   when original has been reallocated due to grow
3740            */
3741           ndbassert(src_len>0);
3742           src_len= src_data[src_len-1];
3743         }
3744       }
3745       else
3746       {
3747         Varpart_copy* vp = (Varpart_copy*)src_ptr;
3748         src_len = vp->m_len;
3749         src_data = vp->m_data;
3750         src_ptr++;
3751       }
3752 
3753       char* varstart;
3754       Uint32 varlen;
3755       const Uint32* dynstart;
3756       if (mm_vars)
3757       {
3758         varstart = (char*)(((Uint16*)src_data)+mm_vars+1);
3759         varlen = ((Uint16*)src_data)[mm_vars];
3760         dynstart = ALIGN_WORD(varstart + varlen);
3761       }
3762       else
3763       {
3764         varstart = 0;
3765         varlen = 0;
3766         dynstart = src_data;
3767       }
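      /*
       * Stored varpart layout as decoded above:
       *   Uint16 off[mm_vars + 1]  end offsets, off[mm_vars] = total varsize bytes
       *   char   vardata[]         varsize attribute data
       *   <word aligned>           dynamic attribute data, up to src_len words
       */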
3768 
3769       dst->m_data_ptr= varstart;
3770       dst->m_offset_array_ptr= (Uint16*)src_data;
3771       dst->m_var_len_offset= 1;
3772       dst->m_max_var_offset= varlen;
3773 
3774       Uint32 dynlen = Uint32(src_len - (dynstart - src_data));
3775       ndbassert(src_len >= (dynstart - src_data));
3776       dst->m_dyn_data_ptr= (char*)dynstart;
3777       dst->m_dyn_part_len= dynlen;
3778       // To do or not to do:
3779       // dst->m_dyn_offset_arr_ptr = dynlen ? (Uint16*)(dynstart + *(Uint8*)dynstart) : 0;
3780 
3781       /*
3782         dst->m_dyn_offset_arr_ptr and dst->m_dyn_len_offset are not used for
3783         reading the stored/shrunken format.
3784       */
3785     }
3786     else
3787     {
3788       src_len = 0;
3789       dst->m_max_var_offset = 0;
3790       dst->m_dyn_part_len = 0;
3791 #if defined VM_TRACE || defined ERROR_INSERT
3792       bzero(dst, sizeof(* dst));
3793 #endif
3794     }
3795 
3796     // The disk part starts after the dynamic part.
3797     src_ptr+= src_len;
3798   }
3799 
3800   if(disk && dd_tot)
3801   {
3802     const Uint16 dd_vars= tabPtrP->m_attributes[DD].m_no_of_varsize;
3803 
3804     if(bits & Tuple_header::DISK_INLINE)
3805     {
3806       // Only on copy tuple
3807       ndbassert(bits & Tuple_header::COPY_TUPLE);
3808     }
3809     else
3810     {
3811       // XXX
3812       Local_key key;
3813       memcpy(&key, disk_ref, sizeof(key));
3814       key.m_page_no= req_struct->m_disk_page_ptr.i;
3815       src_ptr= get_dd_ptr(&req_struct->m_disk_page_ptr, &key, tabPtrP);
3816     }
3817     // Fix diskpart
3818     req_struct->m_disk_ptr= (Tuple_header*)src_ptr;
3819     ndbassert(! (req_struct->m_disk_ptr->m_header_bits & Tuple_header::FREE));
3820     ndbrequire(dd_vars == 0);
3821   }
3822 
3823   req_struct->is_expanded= false;
3824 }
3825 
3826 void
3827 Dbtup::shrink_tuple(KeyReqStruct* req_struct, Uint32 sizes[2],
3828 		    const Tablerec* tabPtrP, bool disk)
3829 {
3830   ndbassert(tabPtrP->need_shrink());
3831   Tuple_header* ptr= req_struct->m_tuple_ptr;
3832   ndbassert(ptr->m_header_bits & Tuple_header::COPY_TUPLE);
3833 
3834   KeyReqStruct::Var_data* dst= &req_struct->m_var_data[MM];
3835   Uint32 order_desc= tabPtrP->m_real_order_descriptor;
3836   const Uint32 * tabDesc= (Uint32*)req_struct->attr_descr;
3837   const Uint16 *order = (Uint16*)(&tableDescriptor[order_desc]);
3838   Uint16 dd_tot= tabPtrP->m_no_of_disk_attributes;
3839   Uint16 mm_fix= tabPtrP->m_attributes[MM].m_no_of_fixsize;
3840   Uint16 mm_vars= tabPtrP->m_attributes[MM].m_no_of_varsize;
3841   Uint16 mm_dyns= tabPtrP->m_attributes[MM].m_no_of_dynamic;
3842   Uint16 mm_dynvar= tabPtrP->m_attributes[MM].m_no_of_dyn_var;
3843   Uint16 mm_dynfix= tabPtrP->m_attributes[MM].m_no_of_dyn_fix;
3844   Uint16 dd_vars= tabPtrP->m_attributes[DD].m_no_of_varsize;
3845 
3846   Uint32 *dst_ptr= ptr->get_end_of_fix_part_ptr(tabPtrP);
3847   Uint16* src_off_ptr= req_struct->var_pos_array;
3848   order += mm_fix;
3849 
3850   sizes[MM] = 1;
3851   sizes[DD] = 0;
3852   if(mm_vars || mm_dyns)
3853   {
3854     Varpart_copy* vp = (Varpart_copy*)dst_ptr;
3855     Uint32* varstart = dst_ptr = vp->m_data;
3856 
3857     if (mm_vars)
3858     {
3859       Uint16* dst_off_ptr= (Uint16*)dst_ptr;
3860       char*  dst_data_ptr= (char*)(dst_off_ptr + mm_vars + 1);
3861       char*  src_data_ptr= dst_data_ptr;
3862       Uint32 off= 0;
3863       for(Uint32 i= 0; i<mm_vars; i++)
3864       {
3865         const char* data_ptr= src_data_ptr + *src_off_ptr;
3866         Uint32 len= src_off_ptr[mm_vars] - *src_off_ptr;
3867         * dst_off_ptr++= off;
3868         memmove(dst_data_ptr, data_ptr, len);
3869         off += len;
3870         src_off_ptr++;
3871         dst_data_ptr += len;
3872       }
3873       *dst_off_ptr= off;
3874       dst_ptr = ALIGN_WORD(dst_data_ptr);
3875       order += mm_vars; // Point to first dynfix entry
3876     }
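    /*
     * The loop above packs the expanded attributes back together:
     * var_pos_array[i] and var_pos_array[mm_vars + i] hold the expanded
     * start and end offsets of attribute i, so len = end - start.  E.g.
     * expanded offsets {0, 8} with ends {3, 10} pack to offsets {0, 3}
     * and a total varsize length of 5 bytes.
     */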
3877 
3878     if (mm_dyns)
3879     {
3880       dst_ptr = shrink_dyn_part(dst, dst_ptr, tabPtrP, tabDesc,
3881                                 order, mm_dynvar, mm_dynfix, MM);
3882       ndbassert((char*)dst_ptr <= ((char*)ptr) + 8192);
3883       order += mm_dynfix + mm_dynvar;
3884     }
3885 
3886     Uint32 varpart_len= Uint32(dst_ptr - varstart);
3887     vp->m_len = varpart_len;
3888     sizes[MM] = varpart_len;
3889     ptr->m_header_bits |= (varpart_len) ? Tuple_header::VAR_PART : 0;
3890 
3891     ndbassert((UintPtr(ptr) & 3) == 0);
3892     ndbassert(varpart_len < 0x10000);
3893   }
3894 
3895   if(disk && dd_tot)
3896   {
3897     Uint32 * src_ptr = (Uint32*)req_struct->m_disk_ptr;
3898     req_struct->m_disk_ptr = (Tuple_header*)dst_ptr;
3899     ndbrequire(dd_vars == 0);
3900     sizes[DD] = tabPtrP->m_offsets[DD].m_fix_header_size;
3901     memmove(dst_ptr, src_ptr, 4*tabPtrP->m_offsets[DD].m_fix_header_size);
3902   }
3903 
3904   req_struct->is_expanded= false;
3905 
3906 }
3907 
3908 void
3909 Dbtup::validate_page(Tablerec* regTabPtr, Var_page* p)
3910 {
3911   /* ToDo: We could also do some checks here for any dynamic part. */
3912   Uint32 mm_vars= regTabPtr->m_attributes[MM].m_no_of_varsize;
3913   Uint32 fix_sz= regTabPtr->m_offsets[MM].m_fix_header_size +
3914     Tuple_header::HeaderSize;
3915 
3916   if(mm_vars == 0)
3917     return;
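  /*
   * Walk every page of every fragment of this table and check that each
   * non-free entry's stored length is consistent with its varsize offset
   * array.  When p == 0 the pages are also reorganised and the whole check
   * is repeated once (with p != 0) without the reorg step.
   */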
3918 
3919   for(Uint32 F= 0; F<MAX_FRAG_PER_NODE; F++)
3920   {
3921     FragrecordPtr fragPtr;
3922 
3923     if((fragPtr.i = regTabPtr->fragrec[F]) == RNIL)
3924       continue;
3925 
3926     ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
3927     for(Uint32 P= 0; P<fragPtr.p->noOfPages; P++)
3928     {
3929       Uint32 real= getRealpid(fragPtr.p, P);
3930       Var_page* page= (Var_page*)c_page_pool.getPtr(real);
3931 
3932       for(Uint32 i=1; i<page->high_index; i++)
3933       {
3934 	Uint32 idx= page->get_index_word(i);
3935 	Uint32 len = (idx & Var_page::LEN_MASK) >> Var_page::LEN_SHIFT;
3936 	if(!(idx & Var_page::FREE) && !(idx & Var_page::CHAIN))
3937 	{
3938 	  Tuple_header *ptr= (Tuple_header*)page->get_ptr(i);
3939 	  Uint32 *part= ptr->get_end_of_fix_part_ptr(regTabPtr);
3940 	  if(! (ptr->m_header_bits & Tuple_header::COPY_TUPLE))
3941 	  {
3942 	    ndbassert(len == fix_sz + 1);
3943 	    Local_key tmp; tmp.assref(*part);
3944 	    Ptr<Page> tmpPage;
3945 	    part= get_ptr(&tmpPage, *(Var_part_ref*)part);
3946 	    len= ((Var_page*)tmpPage.p)->get_entry_len(tmp.m_page_idx);
3947 	    Uint32 sz= ((mm_vars + 1) << 1) + (((Uint16*)part)[mm_vars]);
3948 	    ndbassert(len >= ((sz + 3) >> 2));
3949 	  }
3950 	  else
3951 	  {
3952 	    Uint32 sz= ((mm_vars + 1) << 1) + (((Uint16*)part)[mm_vars]);
3953 	    ndbassert(len >= ((sz+3)>>2)+fix_sz);
3954 	  }
3955 	  if(ptr->m_operation_ptr_i != RNIL)
3956 	  {
3957 	    c_operation_pool.getPtr(ptr->m_operation_ptr_i);
3958 	  }
3959 	}
3960 	else if(!(idx & Var_page::FREE))
3961 	{
3962 	  /**
3963 	   * Chain
3964 	   */
3965 	  Uint32 *part= page->get_ptr(i);
3966 	  Uint32 sz= ((mm_vars + 1) << 1) + (((Uint16*)part)[mm_vars]);
3967 	  ndbassert(len >= ((sz + 3) >> 2));
3968 	}
3969 	else
3970 	{
3971 
3972 	}
3973       }
3974       if(p == 0 && page->high_index > 1)
3975 	page->reorg((Var_page*)ctemp_page);
3976     }
3977   }
3978 
3979   if(p == 0)
3980   {
3981     validate_page(regTabPtr, (Var_page*)1);
3982   }
3983 }
3984 
3985 int
3986 Dbtup::handle_size_change_after_update(KeyReqStruct* req_struct,
3987 				       Tuple_header* org,
3988 				       Operationrec* regOperPtr,
3989 				       Fragrecord* regFragPtr,
3990 				       Tablerec* regTabPtr,
3991 				       Uint32 sizes[4])
3992 {
3993   ndbrequire(sizes[1] == sizes[3]);
3994   //ndbout_c("%d %d %d %d", sizes[0], sizes[1], sizes[2], sizes[3]);
3995   if(0)
3996     printf("%p %d %d - handle_size_change_after_update ",
3997 	   req_struct->m_tuple_ptr,
3998 	   regOperPtr->m_tuple_location.m_page_no,
3999 	   regOperPtr->m_tuple_location.m_page_idx);
4000 
4001   Uint32 bits= org->m_header_bits;
4002   Uint32 copy_bits= req_struct->m_tuple_ptr->m_header_bits;
4003 
4004   if(sizes[2+MM] == sizes[MM])
4005     ;
4006   else if(sizes[2+MM] < sizes[MM])
4007   {
4008     if(0) ndbout_c("shrink");
4009     req_struct->m_tuple_ptr->m_header_bits= copy_bits|Tuple_header::MM_SHRINK;
4010   }
4011   else
4012   {
4013     if(0) printf("grow - ");
4014     Ptr<Page> pagePtr = req_struct->m_varpart_page_ptr;
4015     Var_page* pageP= (Var_page*)pagePtr.p;
4016     Var_part_ref *refptr= org->get_var_part_ref_ptr(regTabPtr);
4017     ndbassert(! (bits & Tuple_header::COPY_TUPLE));
4018 
4019     Local_key ref;
4020     refptr->copyout(&ref);
4021     Uint32 alloc;
4022     Uint32 idx= ref.m_page_idx;
4023     if (bits & Tuple_header::VAR_PART)
4024     {
4025       if (copy_bits & Tuple_header::COPY_TUPLE)
4026       {
4027         c_page_pool.getPtr(pagePtr, ref.m_page_no);
4028         pageP = (Var_page*)pagePtr.p;
4029       }
4030       alloc = pageP->get_entry_len(idx);
4031     }
4032     else
4033     {
4034       alloc = 0;
4035     }
4036     Uint32 orig_size= alloc;
4037     if(bits & Tuple_header::MM_GROWN)
4038     {
4039       /* Was grown before, so must fetch real original size from last word. */
4040       Uint32 *old_var_part= pageP->get_ptr(idx);
4041       ndbassert(alloc>0);
4042       orig_size= old_var_part[alloc-1];
4043     }
4044 
4045     if (alloc)
4046     {
4047 #ifdef VM_TRACE
4048       if(!pageP->get_entry_chain(idx))
4049         ndbout << *pageP << endl;
4050 #endif
4051       ndbassert(pageP->get_entry_chain(idx));
4052     }
4053 
4054     Uint32 needed= sizes[2+MM];
4055 
4056     if(needed <= alloc)
4057     {
4058       //ndbassert(!regOperPtr->is_first_operation());
4059       if (0) ndbout_c(" no grow");
4060       return 0;
4061     }
4062     Uint32 *new_var_part=realloc_var_part(&terrorCode,
4063                                           regFragPtr, regTabPtr, pagePtr,
4064                                           refptr, alloc, needed);
4065     if (unlikely(new_var_part==NULL))
4066       return -1;
4067     /* Mark the tuple grown, store the original length at the end. */
4068     org->m_header_bits= bits | Tuple_header::MM_GROWN | Tuple_header::VAR_PART;
4069     new_var_part[needed-1]= orig_size;
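    /*
     * The grown varpart now ends with the pre-grow length:
     *   [ current varpart data ... | orig_size ]
     * prepare_read() (MM_GROWN case) and any later grow read it back from
     * the last word.
     */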
4070 
4071     if (regTabPtr->m_bits & Tablerec::TR_Checksum)
4072     {
4073       jam();
4074       setChecksum(org, regTabPtr);
4075     }
4076   }
4077   return 0;
4078 }
4079 
4080 int
4081 Dbtup::optimize_var_part(KeyReqStruct* req_struct,
4082                          Tuple_header* org,
4083                          Operationrec* regOperPtr,
4084                          Fragrecord* regFragPtr,
4085                          Tablerec* regTabPtr)
4086 {
4087   jam();
4088   Var_part_ref* refptr = org->get_var_part_ref_ptr(regTabPtr);
4089 
4090   Local_key ref;
4091   refptr->copyout(&ref);
4092   Uint32 idx = ref.m_page_idx;
4093 
4094   Ptr<Page> pagePtr;
4095   c_page_pool.getPtr(pagePtr, ref.m_page_no);
4096 
4097   Var_page* pageP = (Var_page*)pagePtr.p;
4098   Uint32 var_part_size = pageP->get_entry_len(idx);
4099 
4100  /**
4101   * If the page's list_index is MAX_FREE_LIST, the page is considered
4102   * full, so there is nothing to optimize.
4103   */
4104   if(pageP->list_index != MAX_FREE_LIST)
4105   {
4106     jam();
4107     /*
4108      * Optimize the var part of the tuple by moving the varpart;
4109      * this may allow free pages to be reclaimed.
4110      */
4111     move_var_part(regFragPtr, regTabPtr, pagePtr,
4112                   refptr, var_part_size);
4113 
4114     if (regTabPtr->m_bits & Tablerec::TR_Checksum)
4115     {
4116       jam();
4117       setChecksum(org, regTabPtr);
4118     }
4119   }
4120 
4121   return 0;
4122 }
4123 
4124 int
4125 Dbtup::nr_update_gci(Uint32 fragPtrI, const Local_key* key, Uint32 gci)
4126 {
4127   FragrecordPtr fragPtr;
4128   fragPtr.i= fragPtrI;
4129   ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
4130   TablerecPtr tablePtr;
4131   tablePtr.i= fragPtr.p->fragTableId;
4132   ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
4133 
4134   if (tablePtr.p->m_bits & Tablerec::TR_RowGCI)
4135   {
4136     Local_key tmp = *key;
4137     PagePtr pagePtr;
4138 
4139     Uint32 err;
4140     pagePtr.i = allocFragPage(&err, tablePtr.p, fragPtr.p, tmp.m_page_no);
4141     if (unlikely(pagePtr.i == RNIL))
4142     {
4143       return -(int)err;
4144     }
4145     c_page_pool.getPtr(pagePtr);
4146 
4147     Tuple_header* ptr = (Tuple_header*)
4148       ((Fix_page*)pagePtr.p)->get_ptr(tmp.m_page_idx, 0);
4149 
4150     ndbrequire(ptr->m_header_bits & Tuple_header::FREE);
4151     *ptr->get_mm_gci(tablePtr.p) = gci;
4152   }
4153   return 0;
4154 }
4155 
4156 int
4157 Dbtup::nr_read_pk(Uint32 fragPtrI,
4158 		  const Local_key* key, Uint32* dst, bool& copy)
4159 {
4160 
4161   FragrecordPtr fragPtr;
4162   fragPtr.i= fragPtrI;
4163   ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
4164   TablerecPtr tablePtr;
4165   tablePtr.i= fragPtr.p->fragTableId;
4166   ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
4167 
4168   Local_key tmp = *key;
4169 
4170   Uint32 err;
4171   PagePtr pagePtr;
4172   pagePtr.i = allocFragPage(&err, tablePtr.p, fragPtr.p, tmp.m_page_no);
4173   if (unlikely(pagePtr.i == RNIL))
4174     return -(int)err;
4175 
4176   c_page_pool.getPtr(pagePtr);
4177   KeyReqStruct req_struct(this);
4178   Uint32* ptr= ((Fix_page*)pagePtr.p)->get_ptr(key->m_page_idx, 0);
4179 
4180   req_struct.m_page_ptr = pagePtr;
4181   req_struct.m_tuple_ptr = (Tuple_header*)ptr;
4182   Uint32 bits = req_struct.m_tuple_ptr->m_header_bits;
4183 
4184   int ret = 0;
4185   copy = false;
4186   if (! (bits & Tuple_header::FREE))
4187   {
4188     if (bits & Tuple_header::ALLOC)
4189     {
4190       Uint32 opPtrI= req_struct.m_tuple_ptr->m_operation_ptr_i;
4191       Operationrec* opPtrP= c_operation_pool.getPtr(opPtrI);
4192       ndbassert(!opPtrP->m_copy_tuple_location.isNull());
4193       req_struct.m_tuple_ptr=
4194         get_copy_tuple(&opPtrP->m_copy_tuple_location);
4195       copy = true;
4196     }
4197     req_struct.check_offset[MM]= tablePtr.p->get_check_offset(MM);
4198     req_struct.check_offset[DD]= tablePtr.p->get_check_offset(DD);
4199 
4200     Uint32 num_attr= tablePtr.p->m_no_of_attributes;
4201     Uint32 descr_start= tablePtr.p->tabDescriptor;
4202     TableDescriptor *tab_descr= &tableDescriptor[descr_start];
4203     ndbrequire(descr_start + (num_attr << ZAD_LOG_SIZE) <= cnoOfTabDescrRec);
4204     req_struct.attr_descr= tab_descr;
4205 
4206     if (tablePtr.p->need_expand())
4207       prepare_read(&req_struct, tablePtr.p, false);
4208 
4209     const Uint32* attrIds= &tableDescriptor[tablePtr.p->readKeyArray].tabDescr;
4210     const Uint32 numAttrs= tablePtr.p->noOfKeyAttr;
4211     // read pk attributes from original tuple
4212 
4213     req_struct.tablePtrP = tablePtr.p;
4214     req_struct.fragPtrP = fragPtr.p;
4215 
4216     // do it
4217     ret = readAttributes(&req_struct,
4218 			 attrIds,
4219 			 numAttrs,
4220 			 dst,
4221 			 ZNIL, false);
4222 
4223     // done
4224     if (likely(ret >= 0)) {
4225       // remove headers
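      /*
       * dst currently holds [AttributeHeader, data...] per attribute; the
       * loop below strips the headers in place, e.g. two attributes of 2
       * and 1 words: {AH0, d0, d0, AH1, d1} -> {d0, d0, d1}, and ret is
       * then reduced by numAttrs.
       */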
4226       Uint32 n= 0;
4227       Uint32 i= 0;
4228       while (n < numAttrs) {
4229 	const AttributeHeader ah(dst[i]);
4230 	Uint32 size= ah.getDataSize();
4231 	ndbrequire(size != 0);
4232 	for (Uint32 j= 0; j < size; j++) {
4233 	  dst[i + j - n]= dst[i + j + 1];
4234 	}
4235 	n+= 1;
4236 	i+= 1 + size;
4237       }
4238       ndbrequire((int)i == ret);
4239       ret -= numAttrs;
4240     } else {
4241       return ret;
4242     }
4243   }
4244 
4245   if (tablePtr.p->m_bits & Tablerec::TR_RowGCI)
4246   {
4247     dst[ret] = *req_struct.m_tuple_ptr->get_mm_gci(tablePtr.p);
4248   }
4249   else
4250   {
4251     dst[ret] = 0;
4252   }
4253   return ret;
4254 }
4255 
4256 #include <signaldata/TuxMaint.hpp>
4257 
4258 int
4259 Dbtup::nr_delete(Signal* signal, Uint32 senderData,
4260 		 Uint32 fragPtrI, const Local_key* key, Uint32 gci)
4261 {
4262   FragrecordPtr fragPtr;
4263   fragPtr.i= fragPtrI;
4264   ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
4265   TablerecPtr tablePtr;
4266   tablePtr.i= fragPtr.p->fragTableId;
4267   ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
4268 
4269   Local_key tmp = * key;
4270   tmp.m_page_no= getRealpid(fragPtr.p, tmp.m_page_no);
4271 
4272   PagePtr pagePtr;
4273   Tuple_header* ptr= (Tuple_header*)get_ptr(&pagePtr, &tmp, tablePtr.p);
4274 
4275   if (!tablePtr.p->tuxCustomTriggers.isEmpty())
4276   {
4277     jam();
4278     TuxMaintReq* req = (TuxMaintReq*)signal->getDataPtrSend();
4279     req->tableId = fragPtr.p->fragTableId;
4280     req->fragId = fragPtr.p->fragmentId;
4281     req->pageId = tmp.m_page_no;
4282     req->pageIndex = tmp.m_page_idx;
4283     req->tupVersion = ptr->get_tuple_version();
4284     req->opInfo = TuxMaintReq::OpRemove;
4285     removeTuxEntries(signal, tablePtr.p);
4286   }
4287 
4288   Local_key disk;
4289   memcpy(&disk, ptr->get_disk_ref_ptr(tablePtr.p), sizeof(disk));
4290 
4291   if (tablePtr.p->m_attributes[MM].m_no_of_varsize +
4292       tablePtr.p->m_attributes[MM].m_no_of_dynamic)
4293   {
4294     jam();
4295     free_var_rec(fragPtr.p, tablePtr.p, &tmp, pagePtr);
4296   } else {
4297     jam();
4298     free_fix_rec(fragPtr.p, tablePtr.p, &tmp, (Fix_page*)pagePtr.p);
4299   }
4300 
4301   if (tablePtr.p->m_no_of_disk_attributes)
4302   {
4303     jam();
4304 
4305     Uint32 sz = (sizeof(Dbtup::Disk_undo::Free) >> 2) +
4306       tablePtr.p->m_offsets[DD].m_fix_header_size - 1;
4307 
4308     D("Logfile_client - nr_delete");
4309     Logfile_client lgman(this, c_lgman, fragPtr.p->m_logfile_group_id);
4310     int res = lgman.alloc_log_space(sz);
4311     ndbrequire(res == 0);
4312 
4313     /**
4314      * 1) alloc log space (done above)
4315      * 2) get page
4316      * 3) get log buffer
4317      * 4) delete tuple
4318      */
4319     Page_cache_client::Request preq;
4320     preq.m_page = disk;
4321     preq.m_callback.m_callbackData = senderData;
4322     preq.m_callback.m_callbackFunction =
4323       safe_cast(&Dbtup::nr_delete_page_callback);
4324     int flags = Page_cache_client::COMMIT_REQ;
4325 
4326 #ifdef ERROR_INSERT
4327     if (ERROR_INSERTED(4023) || ERROR_INSERTED(4024))
4328     {
4329       int rnd = rand() % 100;
4330       int slp = 0;
4331       if (ERROR_INSERTED(4024))
4332       {
4333 	slp = 3000;
4334       }
4335       else if (rnd > 90)
4336       {
4337 	slp = 3000;
4338       }
4339       else if (rnd > 70)
4340       {
4341 	slp = 100;
4342       }
4343 
4344       ndbout_c("rnd: %d slp: %d", rnd, slp);
4345 
4346       if (slp)
4347       {
4348 	flags |= Page_cache_client::DELAY_REQ;
4349 	preq.m_delay_until_time = NdbTick_CurrentMillisecond()+(Uint64)slp;
4350       }
4351     }
4352 #endif
4353 
4354     Page_cache_client pgman(this, c_pgman);
4355     res = pgman.get_page(signal, preq, flags);
4356     m_pgman_ptr = pgman.m_ptr;
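    /*
     * get_page() result handling: 0 means the page is not yet available and
     * nr_delete_page_callback will continue the delete later, -1 means an
     * error, and a positive result means the page is already available via
     * m_pgman_ptr.
     */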
4357     if (res == 0)
4358     {
4359       goto timeslice;
4360     }
4361     else if (unlikely(res == -1))
4362     {
4363       return -1;
4364     }
4365 
4366     PagePtr disk_page = { (Tup_page*)m_pgman_ptr.p, m_pgman_ptr.i };
4367     disk_page_set_dirty(disk_page);
4368 
4369     CallbackPtr cptr;
4370     cptr.m_callbackIndex = NR_DELETE_LOG_BUFFER_CALLBACK;
4371     cptr.m_callbackData = senderData;
4372     res= lgman.get_log_buffer(signal, sz, &cptr);
4373     switch(res){
4374     case 0:
4375       signal->theData[2] = disk_page.i;
4376       goto timeslice;
4377     case -1:
4378       ndbrequire("NOT YET IMPLEMENTED" == 0);
4379       break;
4380     }
4381 
4382     if (0) ndbout << "DIRECT DISK DELETE: " << disk << endl;
4383     disk_page_free(signal, tablePtr.p, fragPtr.p,
4384 		   &disk, *(PagePtr*)&disk_page, gci);
4385     return 0;
4386   }
4387 
4388   return 0;
4389 
4390 timeslice:
4391   memcpy(signal->theData, &disk, sizeof(disk));
4392   return 1;
4393 }
4394 
4395 void
4396 Dbtup::nr_delete_page_callback(Signal* signal,
4397 			       Uint32 userpointer, Uint32 page_id)//unused
4398 {
4399   Ptr<GlobalPage> gpage;
4400   m_global_page_pool.getPtr(gpage, page_id);
4401   PagePtr pagePtr= { (Tup_page*)gpage.p, gpage.i };
4402   disk_page_set_dirty(pagePtr);
4403   Dblqh::Nr_op_info op;
4404   op.m_ptr_i = userpointer;
4405   op.m_disk_ref.m_page_no = pagePtr.p->m_page_no;
4406   op.m_disk_ref.m_file_no = pagePtr.p->m_file_no;
4407   c_lqh->get_nr_op_info(&op, page_id);
4408 
4409   Ptr<Fragrecord> fragPtr;
4410   fragPtr.i= op.m_tup_frag_ptr_i;
4411   ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
4412 
4413   Ptr<Tablerec> tablePtr;
4414   tablePtr.i = fragPtr.p->fragTableId;
4415   ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
4416 
4417   Uint32 sz = (sizeof(Dbtup::Disk_undo::Free) >> 2) +
4418     tablePtr.p->m_offsets[DD].m_fix_header_size - 1;
4419 
4420   CallbackPtr cb;
4421   cb.m_callbackData = userpointer;
4422   cb.m_callbackIndex = NR_DELETE_LOG_BUFFER_CALLBACK;
4423   D("Logfile_client - nr_delete_page_callback");
4424   Logfile_client lgman(this, c_lgman, fragPtr.p->m_logfile_group_id);
4425   int res= lgman.get_log_buffer(signal, sz, &cb);
4426   switch(res){
4427   case 0:
4428     return;
4429   case -1:
4430     ndbrequire("NOT YET IMPLEMENTED" == 0);
4431     break;
4432   }
4433 
4434   if (0) ndbout << "PAGE CALLBACK DISK DELETE: " << op.m_disk_ref << endl;
4435   disk_page_free(signal, tablePtr.p, fragPtr.p,
4436 		 &op.m_disk_ref, pagePtr, op.m_gci_hi);
4437 
4438   c_lqh->nr_delete_complete(signal, &op);
4439   return;
4440 }
4441 
4442 void
4443 Dbtup::nr_delete_log_buffer_callback(Signal* signal,
4444 				    Uint32 userpointer,
4445 				    Uint32 unused)
4446 {
4447   Dblqh::Nr_op_info op;
4448   op.m_ptr_i = userpointer;
4449   c_lqh->get_nr_op_info(&op, RNIL);
4450 
4451   Ptr<Fragrecord> fragPtr;
4452   fragPtr.i= op.m_tup_frag_ptr_i;
4453   ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
4454 
4455   Ptr<Tablerec> tablePtr;
4456   tablePtr.i = fragPtr.p->fragTableId;
4457   ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
4458 
4459   Ptr<GlobalPage> gpage;
4460   m_global_page_pool.getPtr(gpage, op.m_page_id);
4461   PagePtr pagePtr = { (Tup_page*)gpage.p, gpage.i };
4462 
4463   /**
4464    * reset page no
4465    */
4466   if (0) ndbout << "LOGBUFFER CALLBACK DISK DELETE: " << op.m_disk_ref << endl;
4467 
4468   disk_page_free(signal, tablePtr.p, fragPtr.p,
4469 		 &op.m_disk_ref, pagePtr, op.m_gci_hi);
4470 
4471   c_lqh->nr_delete_complete(signal, &op);
4472 }
4473