1 /*
2    Copyright (c) 2003, 2019, Oracle and/or its affiliates. All rights reserved.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 #define DBTUP_C
26 #define DBTUP_COMMIT_CPP
27 #include "Dbtup.hpp"
28 #include <RefConvert.hpp>
29 #include <ndb_limits.h>
30 #include <pc.hpp>
31 #include <signaldata/TupCommit.hpp>
32 #include <EventLogger.hpp>
33 #include "../dblqh/Dblqh.hpp"
34 
35 #define JAM_FILE_ID 416
36 
37 extern EventLogger *g_eventLogger;
38 
39 #if (defined(VM_TRACE) || defined(ERROR_INSERT))
40 //#define DEBUG_LCP 1
41 //#define DEBUG_LCP_SKIP_DELETE_EXTRA 1
42 //#define DEBUG_INSERT_EXTRA 1
43 //#define DEBUG_LCP_SCANNED_BIT 1
44 //#define DEBUG_PGMAN 1
45 //#define DEBUG_ROW_COUNT_DEL 1
46 //#define DEBUG_ROW_COUNT_INS 1
47 //#define DEBUG_DELETE 1
48 //#define DEBUG_DELETE_EXTRA 1
49 //#define DEBUG_LCP_SKIP_DELETE2 1
50 //#define DEBUG_LCP_DEL 1
51 //#define DEBUG_LCP_SKIP 1
52 //#define DEBUG_LCP_SKIP_DELETE 1
53 #endif
54 
55 #ifdef DEBUG_LCP
56 #define DEB_LCP(arglist) do { g_eventLogger->info arglist ; } while (0)
57 #else
58 #define DEB_LCP(arglist) do { } while (0)
59 #endif
60 
61 #ifdef DEBUG_DELETE_EXTRA
62 #define DEB_DELETE_EXTRA(arglist) do { g_eventLogger->info arglist ; } while (0)
63 #else
64 #define DEB_DELETE_EXTRA(arglist) do { } while (0)
65 #endif
66 
67 #ifdef DEBUG_INSERT_EXTRA
68 #define DEB_INSERT_EXTRA(arglist) do { g_eventLogger->info arglist ; } while (0)
69 #else
70 #define DEB_INSERT_EXTRA(arglist) do { } while (0)
71 #endif
72 
73 #ifdef DEBUG_LCP_DEL
74 #define DEB_LCP_DEL(arglist) do { g_eventLogger->info arglist ; } while (0)
75 #else
76 #define DEB_LCP_DEL(arglist) do { } while (0)
77 #endif
78 
79 #ifdef DEBUG_LCP_SKIP
80 #define DEB_LCP_SKIP(arglist) do { g_eventLogger->info arglist ; } while (0)
81 #else
82 #define DEB_LCP_SKIP(arglist) do { } while (0)
83 #endif
84 
85 #ifdef DEBUG_LCP_SKIP_DELETE
86 #define DEB_LCP_SKIP_DELETE(arglist) do { g_eventLogger->info arglist ; } while (0)
87 #else
88 #define DEB_LCP_SKIP_DELETE(arglist) do { } while (0)
89 #endif
90 
91 #ifdef DEBUG_LCP_SKIP_DELETE2
92 #define DEB_LCP_SKIP_DELETE2(arglist) do { g_eventLogger->info arglist ; } while (0)
93 #else
94 #define DEB_LCP_SKIP_DELETE2(arglist) do { } while (0)
95 #endif
96 
97 #ifdef DEBUG_LCP_SCANNED_BIT
98 #define DEB_LCP_SCANNED_BIT(arglist) do { g_eventLogger->info arglist ; } while (0)
99 #else
100 #define DEB_LCP_SCANNED_BIT(arglist) do { } while (0)
101 #endif
102 
103 #ifdef DEBUG_PGMAN
104 #define DEB_PGMAN(arglist) do { g_eventLogger->info arglist ; } while (0)
105 #else
106 #define DEB_PGMAN(arglist) do { } while (0)
107 #endif
108 
109 #ifdef DEBUG_DELETE
110 #define DEB_DELETE(arglist) do { g_eventLogger->info arglist ; } while (0)
111 #else
112 #define DEB_DELETE(arglist) do { } while (0)
113 #endif
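/**
 * Usage sketch (added commentary, not part of the original file): the DEB_*
 * macros take a double-parenthesised printf-style argument list so that the
 * whole statement compiles away when the matching DEBUG_* define is off.
 * The table/fragment ids below are made-up example values.
 *
 *   DEB_LCP(("(%u)LCP handling tab(%u,%u)", instance(), tableId, fragId));
 *
 * expands, when DEBUG_LCP is defined, to
 *
 *   do { g_eventLogger->info("(%u)LCP handling tab(%u,%u)",
 *                            instance(), tableId, fragId); } while (0)
 */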
114 
115 void Dbtup::execTUP_DEALLOCREQ(Signal* signal)
116 {
117   TablerecPtr regTabPtr;
118   FragrecordPtr regFragPtr;
119   Uint32 frag_page_id, frag_id;
120 
121   jamEntry();
122 
123   frag_id= signal->theData[0];
124   regTabPtr.i= signal->theData[1];
125   frag_page_id= signal->theData[2];
126   Uint32 page_index= signal->theData[3];
127 
128   ptrCheckGuard(regTabPtr, cnoOfTablerec, tablerec);
129 
130   getFragmentrec(regFragPtr, frag_id, regTabPtr.p);
131   ndbassert(regFragPtr.p != NULL);
132 
133   if (! Local_key::isInvalid(frag_page_id, page_index))
134   {
135     Local_key tmp;
136     tmp.m_page_no= getRealpid(regFragPtr.p, frag_page_id);
137     tmp.m_page_idx= page_index;
138     PagePtr pagePtr;
139     Tuple_header* ptr= (Tuple_header*)get_ptr(&pagePtr, &tmp, regTabPtr.p);
140 
141     DEB_DELETE(("(%u)dealloc tab(%u,%u), row(%u,%u), header: %x",
142                  instance(),
143                  regTabPtr.i,
144                  frag_id,
145                  frag_page_id,
146                  page_index,
147                  ptr->m_header_bits));
148 
149     ndbrequire(ptr->m_header_bits & Tuple_header::FREE);
150 
151     if (regTabPtr.p->m_attributes[MM].m_no_of_varsize +
152         regTabPtr.p->m_attributes[MM].m_no_of_dynamic)
153     {
154       jam();
155       free_var_rec(regFragPtr.p, regTabPtr.p, &tmp, pagePtr);
156     } else {
157       free_fix_rec(regFragPtr.p, regTabPtr.p, &tmp, (Fix_page*)pagePtr.p);
158     }
159   }
160   else
161   {
162     jam();
163   }
164 }
165 
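/**
 * Added summary (not part of the original file): execTUP_WRITELOG_REQ first
 * rewinds to the first prepared operation on the tuple via prevActiveOp and
 * then asks DBLQH to log every operation in prepare order, passing the
 * commit GCI. Conceptually:
 *
 *   while (op->prevActiveOp != RNIL) op = fetch(op->prevActiveOp);  // rewind
 *   for (;;) {
 *     send LQH_WRITELOG_REQ(op->userpointer, gci_hi, gci_lo);
 *     if (op->nextActiveOp == RNIL) break;
 *     op = fetch(op->nextActiveOp);
 *   }
 *
 * fetch() above is just shorthand for looking the operation up in
 * c_operation_pool as the real code below does.
 */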
166 void Dbtup::execTUP_WRITELOG_REQ(Signal* signal)
167 {
168   jamEntry();
169   OperationrecPtr loopOpPtr;
170   loopOpPtr.i= signal->theData[0];
171   Uint32 gci_hi = signal->theData[1];
172   Uint32 gci_lo = signal->theData[2];
173   ndbrequire(c_operation_pool.getValidPtr(loopOpPtr));
174   while (loopOpPtr.p->prevActiveOp != RNIL) {
175     jam();
176     loopOpPtr.i= loopOpPtr.p->prevActiveOp;
177     ndbrequire(c_operation_pool.getValidPtr(loopOpPtr));
178   }
179   do {
180     ndbrequire(get_trans_state(loopOpPtr.p) == TRANS_STARTED);
181     signal->theData[0] = loopOpPtr.p->userpointer;
182     signal->theData[1] = gci_hi;
183     signal->theData[2] = gci_lo;
184     if (loopOpPtr.p->nextActiveOp == RNIL) {
185       jam();
186       EXECUTE_DIRECT(DBLQH, GSN_LQH_WRITELOG_REQ, signal, 3);
187       return;
188     }
189     jam();
190     EXECUTE_DIRECT(DBLQH, GSN_LQH_WRITELOG_REQ, signal, 3);
191     jamEntry();
192     loopOpPtr.i= loopOpPtr.p->nextActiveOp;
193     ndbrequire(c_operation_pool.getValidPtr(loopOpPtr));
194   } while (true);
195 }
196 
197 /* ---------------------------------------------------------------- */
198 /* INITIALIZATION OF ONE CONNECTION RECORD TO PREPARE FOR NEXT OP.  */
199 /* ---------------------------------------------------------------- */
200 void Dbtup::initOpConnection(Operationrec* regOperPtr)
201 {
202   set_tuple_state(regOperPtr, TUPLE_ALREADY_ABORTED);
203   set_trans_state(regOperPtr, TRANS_IDLE);
204   regOperPtr->op_type= ZREAD;
205   regOperPtr->op_struct.bit_field.m_disk_preallocated= 0;
206   regOperPtr->op_struct.bit_field.m_load_diskpage_on_commit= 0;
207   regOperPtr->op_struct.bit_field.m_wait_log_buffer= 0;
208   regOperPtr->op_struct.bit_field.in_active_list = false;
209   regOperPtr->m_undo_buffer_space= 0;
210 }
211 
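/**
 * Added summary (commentary, not from the original source) of the decision
 * made below: does the given rowid still remain to be scanned by the
 * ongoing LCP?
 *
 *   page marked to-skip-lcp, or lcp-scanned bit set   -> false (already done)
 *   backup block says page fully scanned (+1)         -> false (already done)
 *   backup block says page not yet reached (-1)       -> true  (remaining)
 *   otherwise this is the page currently being scanned and the scan
 *   position decides:
 *     scan not yet started (First)                    -> true
 *     real-time break while switching page            -> false (already done)
 *     row index <  current scan index                 -> false (already done)
 *     row index >  current scan index                 -> true  (remaining)
 *     row index == current scan index                 -> false (already done)
 *     scan finished (Last/Aborting)                   -> false
 */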
212 bool
213 Dbtup::is_rowid_in_remaining_lcp_set(const Page* page,
214 		                     Fragrecord* regFragPtr,
215                                      const Local_key& key1,
216                                      const Dbtup::ScanOp& op,
217                                      Uint32 check_lcp_scanned_state_reversed)
218 {
219 
220   if (page->is_page_to_skip_lcp() ||
221       (check_lcp_scanned_state_reversed == 0 &&
222         get_lcp_scanned_bit(regFragPtr, key1.m_page_no)))
223   {
224     /**
225      * We have to check whether the page has already been scanned by
226      * the LCP. We have two different flags for this. The first one
227      * is checked by is_page_to_skip_lcp(). This is set when a page
228      * is allocated during an LCP scan and not previously released
229      * in the same LCP scan.
230      *
231      * If a page is released during the LCP scan we set the lcp
232      * scanned bit in the page map. We need to check both of those to
233      * see if the page has been LCP scanned.
234      *
235      * When check_lcp_scanned_state_reversed is != 0 we are not interested
236      * in the lcp scanned state and will ignore checking this. We can
237      * call it with check_lcp_scanned_state_reversed set to 0 even if we
238      * know that the lcp scanned bit isn't set. The reason is that
239      * check_lcp_scanned_state_reversed is also used for debug printouts.
240      */
241     jam();
242     return false; /* Page already scanned for skipped pages */
243   }
244   bool dummy;
245   int ret_val = c_backup->is_page_lcp_scanned(key1.m_page_no, dummy);
246   if (ret_val == +1)
247   {
248     jam();
249     return false;
250   }
251   else if (ret_val == -1)
252   {
253     jam();
254     if (check_lcp_scanned_state_reversed != 0)
255     {
256       DEB_LCP_SCANNED_BIT(("(%u)Line: %u, page: %u, debug_val: %u",
257                            instance(),
258                            __LINE__,
259                            key1.m_page_no,
260                            check_lcp_scanned_state_reversed));
261     }
262     return true;
263   }
264   /* We are scanning the given page */
265   Local_key key2 = op.m_scanPos.m_key;
266   switch (op.m_state) {
267   case Dbtup::ScanOp::First:
268   {
269     jam();
270     ndbrequire(key2.isNull());
271     if (check_lcp_scanned_state_reversed != 0)
272     {
273       DEB_LCP_SCANNED_BIT(("(%u)Line: %u, page: %u, debug_val: %u",
274                            instance(),
275                            __LINE__,
276                            key1.m_page_no,
277                            check_lcp_scanned_state_reversed));
278     }
279     return true; /* Already checked page id above, so will scan the page */
280   }
281   case Dbtup::ScanOp::Current:
282   {
283     /* Impossible state for LCP scans */
284     ndbabort();
285   }
286   case Dbtup::ScanOp::Next:
287   {
288     ndbrequire(key1.m_page_no == key2.m_page_no);
289     ndbrequire(!key2.isNull());
290     if (op.m_scanPos.m_get == ScanPos::Get_next_page_mm)
291     {
292       jam();
293       /**
294        * We got a real-time break while switching to a new page.
295        * In this case we can skip the page since it is already
296        * LCP:ed.
297        */
298       return false;
299     }
300     if (key1.m_page_idx < key2.m_page_idx)
301     {
302       jam();
303       /* Ignore rows already LCP:ed */
304       return false;
305     }
306     if (key1.m_page_idx > key2.m_page_idx)
307     {
308       jam();
309       /* Include rows not LCP:ed yet */
310       if (check_lcp_scanned_state_reversed != 0)
311       {
312         DEB_LCP_SCANNED_BIT(("(%u)Line: %u, page: %u, debug_val: %u",
313                              instance(),
314                              __LINE__,
315                              key1.m_page_no,
316                              check_lcp_scanned_state_reversed));
317       }
318       return true;
319     }
320     ndbrequire(key1.m_page_idx == key2.m_page_idx);
321     /* keys are equal */
322     jam();
323     /* Ignore current row that has already been LCP:ed. */
324     return false;
325   }
326   case Dbtup::ScanOp::Last:
327   case Dbtup::ScanOp::Aborting:
328   {
329     jam();
330     return false; /* Everything scanned already */
331   }
332   default:
333     break;
334   }
335   /* Will never arrive here */
336   jamLine(Uint16(op.m_state));
337   ndbabort();
338   return true;
339 }
340 
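/**
 * Added summary (not part of the original file): dealloc_tuple() is the
 * commit path of a DELETE. As the code below shows, it frees the disk part
 * (when one exists), copies the old row onto the LCP keep list if the LCP
 * scan has not yet passed this rowid, marks the row FREE, updates the GCI,
 * change maps and max GCI of the page, and decrements the fragment's row
 * count for rows that existed when the transaction started.
 */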
341 void
342 Dbtup::dealloc_tuple(Signal* signal,
343 		     Uint32 gci_hi,
344                      Uint32 gci_lo,
345 		     Page* page,
346 		     Tuple_header* ptr,
347                      KeyReqStruct * req_struct,
348 		     Operationrec* regOperPtr,
349 		     Fragrecord* regFragPtr,
350 		     Tablerec* regTabPtr,
351                      Ptr<GlobalPage> pagePtr)
352 {
353   Uint32 lcpScan_ptr_i= regFragPtr->m_lcp_scan_op;
354   Uint32 average_row_size = regFragPtr->m_average_row_size;
355 
356   Uint32 bits = ptr->m_header_bits;
357   Uint32 extra_bits = Tuple_header::FREE;
358   c_lqh->add_delete_size(average_row_size);
359   if (bits & Tuple_header::DISK_PART)
360   {
361     if (likely(pagePtr.i != RNIL))
362     {
363       jam();
364       ndbrequire(c_lqh->is_restore_phase_done());
365       Local_key disk;
366       memcpy(&disk, ptr->get_disk_ref_ptr(regTabPtr), sizeof(disk));
367       PagePtr tmpptr;
368       Local_key rowid = regOperPtr->m_tuple_location;
369       rowid.m_page_no = page->frag_page_id;
370       tmpptr.i = pagePtr.i;
371       tmpptr.p = reinterpret_cast<Page*>(pagePtr.p);
372       disk_page_free(signal,
373                      regTabPtr,
374                      regFragPtr,
375 		     &disk,
376                      tmpptr,
377                      gci_hi,
378                      &rowid,
379                      regOperPtr->m_undo_buffer_space);
380     }
381     else
382     {
383       ndbrequire(!c_lqh->is_restore_phase_done());
384     }
385   }
386 
387   if (! (bits & (Tuple_header::LCP_SKIP |
388                  Tuple_header::ALLOC |
389                  Tuple_header::LCP_DELETE)) &&
390       lcpScan_ptr_i != RNIL)
391   {
392     jam();
393     ScanOpPtr scanOp;
394     scanOp.i = lcpScan_ptr_i;
395     ndbrequire(c_scanOpPool.getValidPtr(scanOp));
396     Local_key rowid = regOperPtr->m_tuple_location;
397     rowid.m_page_no = page->frag_page_id;
398     if (is_rowid_in_remaining_lcp_set(page, regFragPtr, rowid, *scanOp.p, 0))
399     {
400       jam();
401 
402       /**
403       * We're committing a delete on a row that should
404        *   be part of LCP. Copy original row into copy-tuple
405        *   and add this copy-tuple to lcp-keep-list
406        *
407       * We also need to set the LCP_SKIP bit in the tuple header to prevent
408       * the LCP scan from finding this row and recording it as a deleted
409        * rowid before the LCP scan start. This can happen on CHANGED ROW
410        * pages only.
411        *
412        */
413       /* Coverage tested */
414       extra_bits |= Tuple_header::LCP_SKIP;
415       DEB_LCP_SKIP_DELETE(("(%u)tab(%u,%u), row(%u,%u),"
416                            " handle_lcp_keep_commit"
417                            ", set LCP_SKIP, bits: %x",
418                            instance(),
419                            regFragPtr->fragTableId,
420                            regFragPtr->fragmentId,
421                            rowid.m_page_no,
422                            rowid.m_page_idx,
423                            bits | extra_bits));
424       handle_lcp_keep_commit(&rowid,
425                              req_struct,
426                              regOperPtr,
427                              regFragPtr,
428                              regTabPtr);
429     }
430     else
431     {
432       /* Coverage tested */
433       DEB_LCP_SKIP_DELETE2(("(%u)tab(%u,%u), row(%u,%u) DELETE"
434                            " already LCP:ed",
435                            instance(),
436                            regFragPtr->fragTableId,
437                            regFragPtr->fragmentId,
438                            rowid.m_page_no,
439                            rowid.m_page_idx));
440     }
441   }
442   else
443   {
444 #ifdef DEBUG_LCP_SKIP_DELETE_EXTRA
445     Local_key rowid = regOperPtr->m_tuple_location;
446     rowid.m_page_no = page->frag_page_id;
447     g_eventLogger->info("(%u)tab(%u,%u)row(%u,%u),"
448                         ", skip LCP, bits: %x"
449                         ", lcpScan_ptr: %u",
450                         instance(),
451                         regFragPtr->fragTableId,
452                         regFragPtr->fragmentId,
453                         rowid.m_page_no,
454                         rowid.m_page_idx,
455                         bits,
456                         lcpScan_ptr_i);
457 #endif
458   }
459 
460 
461 #ifdef DEBUG_DELETE_EXTRA
462   if (c_started)
463   {
464     Local_key rowid = regOperPtr->m_tuple_location;
465     rowid.m_page_no = page->frag_page_id;
466     DEB_DELETE_EXTRA(("(%u)tab(%u,%u),DELETE row(%u,%u)",
467                       instance(),
468                       regFragPtr->fragTableId,
469                       regFragPtr->fragmentId,
470                       rowid.m_page_no,
471                       rowid.m_page_idx));
472   }
473 #endif
474   ptr->m_header_bits = bits | extra_bits;
475 
476   if (regTabPtr->m_bits & Tablerec::TR_RowGCI)
477   {
478     jam();
479     update_gci(regFragPtr, regTabPtr, ptr, gci_hi);
480     if (regTabPtr->m_bits & Tablerec::TR_ExtraRowGCIBits)
481     {
482       Uint32 attrId = regTabPtr->getExtraAttrId<Tablerec::TR_ExtraRowGCIBits>();
483       store_extra_row_bits(attrId, regTabPtr, ptr, gci_lo, /* truncate */true);
484     }
485   }
486   else
487   {
488     /**
489      * This should be dead code, but we ensure that we don't miss those
490      * updates even for those tables.
491      */
492     jam();
493     regFragPtr->m_lcp_changed_rows++;
494   }
495   Tup_fixsize_page *fix_page = (Tup_fixsize_page*)page;
496   fix_page->set_change_maps(regOperPtr->m_tuple_location.m_page_idx);
497   ndbassert(fix_page->verify_change_maps(jamBuffer()));
498   fix_page->set_max_gci(gci_hi);
499   setInvalidChecksum(ptr, regTabPtr);
500   if (regOperPtr->op_struct.bit_field.m_tuple_existed_at_start)
501   {
502     ndbrequire(regFragPtr->m_row_count > 0);
503     regFragPtr->m_row_count--;
504 #ifdef DEBUG_ROW_COUNT_DEL
505     Local_key rowid = regOperPtr->m_tuple_location;
506     rowid.m_page_no = page->frag_page_id;
507     g_eventLogger->info("(%u) tab(%u,%u) Deleted row(%u,%u)"
508                         ", bits: %x, row_count = %llu"
509                         ", tuple_header_ptr: %p, gci: %u",
510                         instance(),
511                         regFragPtr->fragTableId,
512                         regFragPtr->fragmentId,
513                         rowid.m_page_no,
514                         rowid.m_page_idx,
515                         ptr->m_header_bits,
516                         regFragPtr->m_row_count,
517                         ptr,
518                         gci_hi);
519 #endif
520   }
521 }
522 
523 void
524 Dbtup::update_gci(Fragrecord * regFragPtr,
525                   Tablerec * regTabPtr,
526 		  Tuple_header* ptr,
527                   Uint32 new_gci)
528 {
529   /**
530    * Update the GCI on the row, and also update the statistics used by the LCP.
531    */
532   Uint32 *gci_ptr = ptr->get_mm_gci(regTabPtr);
533   Uint32 old_gci = *gci_ptr;
534   *gci_ptr = new_gci;
535   if (old_gci <= regFragPtr->m_lcp_start_gci)
536   {
537     jam();
538     regFragPtr->m_lcp_changed_rows++;
539   }
540 }
541 
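/**
 * Added summary (not part of the original file): when a committed DELETE
 * removes a row that the ongoing LCP still needs, the original row content
 * is materialised into the operation's copy tuple (expanded/shrunk if the
 * table has var-size or dynamic attributes) and that copy is linked onto
 * the fragment's lcp-keep-list so the LCP scan can still record it.
 */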
542 void
543 Dbtup::handle_lcp_keep_commit(const Local_key* rowid,
544                               KeyReqStruct * req_struct,
545                               Operationrec * opPtrP,
546                               Fragrecord * regFragPtr,
547                               Tablerec * regTabPtr)
548 {
549   bool disk = false;
550   /* Coverage tested */
551   Uint32 sizes[4];
552   Uint32 * copytuple = get_copy_tuple_raw(&opPtrP->m_copy_tuple_location);
553   Tuple_header * dst = get_copy_tuple(copytuple);
554   Tuple_header * org = req_struct->m_tuple_ptr;
555   if (regTabPtr->need_expand(disk))
556   {
557     jam();
558     req_struct->fragPtrP = regFragPtr;
559     req_struct->m_row_id = opPtrP->m_tuple_location;
560     req_struct->operPtrP = opPtrP;
561     setup_fixed_tuple_ref(req_struct, opPtrP, regTabPtr);
562     setup_fixed_part(req_struct, opPtrP, regTabPtr);
563     req_struct->m_tuple_ptr = dst;
564     expand_tuple(req_struct, sizes, org, regTabPtr, disk, true);
565     shrink_tuple(req_struct, sizes+2, regTabPtr, disk);
566   }
567   else
568   {
569     jam();
570     memcpy(dst, org, 4*regTabPtr->m_offsets[MM].m_fix_header_size);
571   }
572   dst->m_header_bits |= Tuple_header::COPY_TUPLE;
573 
574   setChecksum(dst, regTabPtr);
575   /**
576    * Link it to list
577    */
578   insert_lcp_keep_list(regFragPtr,
579                        opPtrP->m_copy_tuple_location,
580                        copytuple,
581                        rowid);
582   /**
583    * And finally clear m_copy_tuple_location so that it won't be freed
584    */
585   opPtrP->m_copy_tuple_location.setNull();
586 }
587 
588 #if 0
589 static void dump_buf_hex(unsigned char *p, Uint32 bytes)
590 {
591   char buf[3001];
592   char *q= buf;
593   buf[0]= '\0';
594 
595   for(Uint32 i=0; i<bytes; i++)
596   {
597     if(i==((sizeof(buf)/3)-1))
598     {
599       sprintf(q, "...");
600       break;
601     }
602     sprintf(q+3*i, " %02X", p[i]);
603   }
604   ndbout_c("%8p: %s", p, buf);
605 }
606 #endif
607 
608 /**
609  * Handling COMMIT
610  * ---------------
611  * The most complex part of our operations on a tuple is when we have
612  * multiple row operations on the same tuple within the same transaction.
613  * There might even be an insert followed by a delete followed by a
614  * new insert followed by an update! The only sequences that aren't
615  * allowed are a DELETE followed by a DELETE, an INSERT followed by
616  * an INSERT, and a DELETE followed by an UPDATE.
617  *
618  * Each operation carries with it a copy row. This makes it easy to
619  * commit and abort multi-operations on one tuple within one
620  * transaction.
621  *
622  * At the time of the commit we can have multiple operations in a list
623  * linked from the row. The "surviving" operation is the one which is
624  * last in the list. This is the only operation that will be truly
625  * committed. All other copy rows simply represent intermediate states
626  * in getting to the committed state. The transaction itself can have
627  * seen these uncommitted intermediate states, but no other transaction
628  * has the ability to see those intermediate row states.
629  *
630  * The last operation in the list is the operation linked from the
631  * tuple header. The "last" operation in the list was also the last
632  * operation prepared.
633  *
634  * The last operation in the list will be committed for "real". This means
635  * that the copy row for the last operation will be copied to the rowid of
636  * the row. However the TUX commit triggers are fired on the first operation
637  * in the operation list.
638  *
639  * COMMIT handling of shrinking varparts
640  * --------------------------------------
641  * The varpart entry header contains the actual length of the varpart
642  * allocated from the page. This size might be equal to or bigger than
643  * the size of the varpart to be committed. At COMMIT time we always
644  * ensure that we shrink it to the minimum size. It might even be
645  * shrunk to 0, in which case we free the varpart entirely.
646  *
647  * Handling ABORT
648  * --------------
649  * Given that we have a copy tuple for each row it means that it is very
650  * easy to abort operations without aborting the entire transaction. Abort
651  * can happen at any time before the commit has started and abort can
652  * happen either on the entire transaction or on a subset of the transaction.
653  *
654  * One example when we can abort a subset of the transaction is when we get
655  * an LQHKEYREF returned from the backup replica. In this case we did a
656  * successful operation at the primary replica, but at the backup replica
657  * we failed for some reason. There might actually even be multiple operations
658  * outstanding at the same time since we allow for multiple operations within
659  * the same batch to execute in parallel. It is not defined what the end
660  * result will be if such a batch has multiple updates on the same row, but
661  * we still have to ensure that we can handle those cases in a secure manner.
662  *
663  * This also means that the code is prepared to allow for aborting to a
664  * savepoint. However the functionality that handles this will be in DBTC and
665  * is independent of the code here in DBTUP.
666  *
667  * When aborting an operation we simply drop it from the list of operations
668  * on the row and if it is the last then we also restore the header.
669  * This means that an abort operation for a row with multiple changes to it
670  * is really easy, it needs only to drop the operation and drop the copy
671  * row attached to it.
672  *
673  * If we increase the size of the varpart for a row we need to extend the
674  * size. This means that the header of the varpart will contain the new
675  * length. So in order to restore we need to store the original varpart
676  * length somewhere.
677  *
678  * The MM_GROWN bit and its meaning
679  * --------------------------------
680  * During an operation that increases the size of the varpart we might actually
681  * change the location of the committed varpart of the row. To ensure that any
682  * readers of the row that does a COMMITTED READ can still see the original
683  * readers of the row that do a COMMITTED READ can still see the original
684  * MM_GROWN bit in the tuple header to indicate this.
685  *
686  * The consequence of this is that an aborted transaction can not have changed
687  * the row content, but it can have changed the place the row is stored. The
688  * actual row content is however only changed when we commit the transaction,
689  * until then the new data is always stored in the copy rows.
690  *
691  * When aborting we need to care about MM_GROWN since then we have to restore
692  * the varpart size by shrinking it. If MM_GROWN is set we might have attempted
693  * to shrink the tuple, but this information is only represented by a smaller
694  * size of the copy row and thus when the copy row is free'd we have done
695  * everything needed to abort this operation.
696  *
697  * Acceptable order of ABORT and COMMIT and WRITE operations
698  * ---------------------------------------------------------
699  * The acceptable order of COMMITs is that once a COMMIT has arrived on a row
700  * then no ABORT is allowed AND no new WRITE operation on the row in the same
701  * transaction is allowed. When the commit is complete then the row is
702  * unlocked and ready for a new transaction again. COMMIT operations can
703  * arrive in any order.
704  *
705  * Before any operation on the row has received COMMIT we can receive ABORT
706  * operations in any order. TUP has no ability to verify that the upper level
707  * ABORT operations are executed correctly. However since ABORTs can happen in
708  * any order it is only vital that the correct operations are ABORTed; it
709  * doesn't matter in which order they are ABORTed.
710  *
711  * The upper level (mainly TC and LQH) will maintain the correctness when it
712  * comes to transaction concepts.
713  */
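/**
 * Illustration (added, not part of the original comment): with three
 * operations on the same row in one transaction the operation list looks
 * roughly like this; only the copy row of the last prepared operation ends
 * up in the main row at commit:
 *
 *   Tuple_header::m_operation_ptr_i
 *        |
 *        v
 *   [op3 UPDATE] --prevActiveOp--> [op2 UPDATE] --prevActiveOp--> [op1 INSERT]
 *    copy row C3                    copy row C2                    copy row C1
 *
 * commit_operation() copies C3 into the rowid, while the TUX commit
 * triggers are fired starting from op1 (see findFirstOp() and
 * execTUP_COMMITREQ() below).
 */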
714 void
715 Dbtup::commit_operation(Signal* signal,
716 			Uint32 gci_hi,
717                         Uint32 gci_lo,
718 			Tuple_header* tuple_ptr,
719 			PagePtr pagePtr,
720 			Operationrec* regOperPtr,
721 			Fragrecord* regFragPtr,
722 			Tablerec* regTabPtr,
723                         Ptr<GlobalPage> globDiskPagePtr)
724 {
725   ndbassert(regOperPtr->op_type != ZDELETE);
726 
727   Uint32 lcpScan_ptr_i= regFragPtr->m_lcp_scan_op;
728   Uint32 save= tuple_ptr->m_operation_ptr_i;
729   Uint32 bits= tuple_ptr->m_header_bits;
730 
731   Tuple_header *disk_ptr= 0;
732   Tuple_header *copy= get_copy_tuple(&regOperPtr->m_copy_tuple_location);
733 
734   Uint32 copy_bits= copy->m_header_bits;
735 
736   Uint32 fixsize= regTabPtr->m_offsets[MM].m_fix_header_size;
737   Uint32 mm_vars= regTabPtr->m_attributes[MM].m_no_of_varsize;
738   Uint32 mm_dyns= regTabPtr->m_attributes[MM].m_no_of_dynamic;
739   bool update_gci_at_commit = ! regOperPtr->op_struct.bit_field.m_gci_written;
740   if((mm_vars+mm_dyns) == 0)
741   {
742     jam();
743     memcpy(tuple_ptr, copy, 4*fixsize);
744     disk_ptr= (Tuple_header*)(((Uint32*)copy)+fixsize);
745   }
746   else
747   {
748     jam();
749     /**
750      * Var_part_ref is only stored in *allocated* tuple
751      * so a memcpy from the copy would overwrite it...
752      * hence subtle copyout/assign...
753      */
754     Local_key tmp;
755     Var_part_ref *ref= tuple_ptr->get_var_part_ref_ptr(regTabPtr);
756     ref->copyout(&tmp);
757 
758     memcpy(tuple_ptr, copy, 4*fixsize);
759     ref->assign(&tmp);
760 
761     PagePtr vpagePtr;
762     if (copy_bits & Tuple_header::VAR_PART)
763     {
764       jam();
765       ndbassert(bits & Tuple_header::VAR_PART);
766       ndbassert(tmp.m_page_no != RNIL);
767       ndbassert(copy_bits & Tuple_header::COPY_TUPLE);
768 
769       Uint32 *dst= get_ptr(&vpagePtr, *ref);
770       Var_page* vpagePtrP = (Var_page*)vpagePtr.p;
771       Varpart_copy*vp =(Varpart_copy*)copy->get_end_of_fix_part_ptr(regTabPtr);
772       /* The first word of shrunken tuple holds the length in words. */
773       Uint32 len = vp->m_len;
774       memcpy(dst, vp->m_data, 4*len);
775 
776       /**
777        * When we come here we will commit a varpart with length specified in
778        * the copy tuple.
779        *
780        * The length in the page entry specifies the length we have allocated.
781        * This means that the page entry length either specifies the original
782        * length or the length that we allocated when growing the varsize part
783        * of the tuple.
784        *
785        * The following cases exists:
786        * 1) MM_GROWN not set
787        *    Since MM_GROWN is never set then we have never extended the length
788        *    of the varpart. We might however have executed one operation that
789        *    shrunk the varpart size followed by an operation that grew the
790        *    varpart again. It can however not have grown to be bigger than the
791        *    original size since then MM_GROWN would be set.
792        *
793        *    The new varpart length might thus in this case be smaller than the
794        *    page entry length.
795        *
796        * 2) MM_GROWN set
797        *    In this case we have extended the varpart size in some operation.
798        *
799        *    If no more operation was performed after that then the page entry
800        *    length and the committed varpart length will be equal. However if
801        *    more operations are executed after this operation then they might
802        *    decrease the varpart length without updating the page entry length.
803        *    So also in this case we might actually have a smaller committed
804        *    varpart length compared to the current page entry length.
805        *
806        * So the conclusion is that when we arrive here we can always have a
807        * smaller committed varpart length compared to the page entry length.
808        * So we always need to check whether we should shrink the varpart
809        * entry to the committed length. The new committed length might even
810        * be zero in which case we should release the varpart entirely.
811        *
812        * We need to check this independent of if MM_GROWN is set or not as
813        * there might be multiple row operations both increasing and
814        * shrinking the tuple.
815        */
816       ndbassert(vpagePtrP->get_entry_len(tmp.m_page_idx) >= len);
817       if (vpagePtrP->get_entry_len(tmp.m_page_idx) > len)
818       {
819         /**
820          * Page entry is now bigger than it needs to be, we are committing
821          * and can thus shrink the entry to its correct size now.
822          */
823         jam();
824         if (len)
825         {
826           jam();
827           ndbassert(regFragPtr->m_varWordsFree >= vpagePtrP->free_space);
828           regFragPtr->m_varWordsFree -= vpagePtrP->free_space;
829           vpagePtrP->shrink_entry(tmp.m_page_idx, len);
830           // Adds the new free space value for the page to the fragment total.
831           update_free_page_list(regFragPtr, vpagePtr);
832         }
833         else
834         {
835           jam();
836           /**
837            * We have shrunk the varsize part down to zero, so instead of
838            * shrinking it we simply free it.
839            */
840           free_var_part(regFragPtr, vpagePtr, tmp.m_page_idx);
841           tmp.m_page_no = RNIL;
842           ref->assign(&tmp);
843           copy_bits &= ~(Uint32)Tuple_header::VAR_PART;
844         }
845       }
846       /**
847        * Find disk part after
848        * header + fixed MM part + length word + varsize part.
849        */
850       disk_ptr = (Tuple_header*)(vp->m_data + len);
851     }
852     else
853     {
854       jam();
855       ndbassert(tmp.m_page_no == RNIL);
856       disk_ptr = (Tuple_header*)copy->get_end_of_fix_part_ptr(regTabPtr);
857     }
858   }
859 
860   if (regTabPtr->m_no_of_disk_attributes &&
861       (copy_bits & Tuple_header::DISK_INLINE))
862   {
863     jam();
864     Local_key key;
865     memcpy(&key, copy->get_disk_ref_ptr(regTabPtr), sizeof(Local_key));
866     Uint32 logfile_group_id= regFragPtr->m_logfile_group_id;
867 
868     PagePtr diskPagePtr((Tup_page*)globDiskPagePtr.p, globDiskPagePtr.i);
869     ndbassert(diskPagePtr.p->m_page_no == key.m_page_no);
870     ndbassert(diskPagePtr.p->m_file_no == key.m_file_no);
871     Uint32 sz, *dst;
872     if(copy_bits & Tuple_header::DISK_ALLOC)
873     {
874       jam();
875       Local_key rowid = regOperPtr->m_tuple_location;
876       rowid.m_page_no = pagePtr.p->frag_page_id;
877       disk_page_alloc(signal,
878                       regTabPtr,
879                       regFragPtr,
880                       &key,
881                       diskPagePtr,
882                       gci_hi,
883                       &rowid,
884                       regOperPtr->m_undo_buffer_space);
885     }
886 
887     if(regTabPtr->m_attributes[DD].m_no_of_varsize == 0)
888     {
889       jam();
890       sz= regTabPtr->m_offsets[DD].m_fix_header_size;
891       dst= ((Fix_page*)diskPagePtr.p)->get_ptr(key.m_page_idx, sz);
892     }
893     else
894     {
895       jam();
896       dst= ((Var_page*)diskPagePtr.p)->get_ptr(key.m_page_idx);
897       sz= ((Var_page*)diskPagePtr.p)->get_entry_len(key.m_page_idx);
898     }
899 
900     if(! (copy_bits & Tuple_header::DISK_ALLOC))
901     {
902       jam();
903 #ifdef DEBUG_PGMAN
904       Uint64 lsn =
905 #endif
906         disk_page_undo_update(signal,
907                               diskPagePtr.p,
908                               &key,
909                               dst,
910                               sz,
911                               gci_hi,
912                               logfile_group_id,
913                               regOperPtr->m_undo_buffer_space);
914       DEB_PGMAN(("disk_page_undo_update: page(%u,%u,%u).%u, LSN(%u,%u), gci: %u",
915                 instance(),
916                 key.m_file_no,
917                 key.m_page_no,
918                 key.m_page_idx,
919                 Uint32(Uint64(lsn >> 32)),
920                 Uint32(Uint64(lsn & 0xFFFFFFFF)),
921                 gci_hi));
922     }
923 
924     memcpy(dst, disk_ptr, 4*sz);
925     memcpy(tuple_ptr->get_disk_ref_ptr(regTabPtr), &key, sizeof(Local_key));
926 
927     ndbassert(! (disk_ptr->m_header_bits & Tuple_header::FREE));
928     copy_bits |= Tuple_header::DISK_PART;
929   }
930 
931 #ifdef DEBUG_INSERT_EXTRA
932   if (c_started)
933   {
934     Local_key rowid = regOperPtr->m_tuple_location;
935     rowid.m_page_no = pagePtr.p->frag_page_id;
936     g_eventLogger->info("(%u)tab(%u,%u) commit row(%u,%u)",
937                         instance(),
938                         regFragPtr->fragTableId,
939                         regFragPtr->fragmentId,
940                         rowid.m_page_no,
941                         rowid.m_page_idx);
942   }
943 #endif
944   Uint32 lcp_bits = 0;
945   if (lcpScan_ptr_i != RNIL &&
946       (bits & Tuple_header::ALLOC) &&
947       !(bits & (Tuple_header::LCP_SKIP | Tuple_header::LCP_DELETE)))
948   {
949     jam();
950     ScanOpPtr scanOp;
951     scanOp.i = lcpScan_ptr_i;
952     ndbrequire(c_scanOpPool.getValidPtr(scanOp));
953     Local_key rowid = regOperPtr->m_tuple_location;
954     rowid.m_page_no = pagePtr.p->frag_page_id;
955     if (is_rowid_in_remaining_lcp_set(pagePtr.p,
956                                       regFragPtr,
957                                       rowid,
958                                       *scanOp.p,
959                                       0))
960     {
961       bool all_part;
962       ndbrequire(c_backup->is_page_lcp_scanned(rowid.m_page_no,
963                                                all_part) != +1);
964       if (all_part)
965       {
966         /**
967          * Rows that are inserted during LCPs are never required to be
968          * recorded as part of the LCP. This can be avoided in multiple ways;
969          * in this case we avoid it by setting a bit in the tuple header.
970          */
971         jam();
972         /* Coverage tested */
973         lcp_bits |= Tuple_header::LCP_SKIP;
974         DEB_LCP_SKIP(("(%u)Set LCP_SKIP on tab(%u,%u), row(%u,%u)",
975                       instance(),
976                       regFragPtr->fragTableId,
977                       regFragPtr->fragmentId,
978                       rowid.m_page_no,
979                       rowid.m_page_idx));
980       }
981       else
982       {
983         jam();
984         /**
985          * The row state at start of LCP was deleted, so we need to record
986          * this to ensure that it doesn't disappear with a later insert
987          * operation.
988          */
989         /* Coverage tested */
990         DEB_LCP_DEL(("(%u)Set LCP_DELETE on tab(%u,%u), row(%u,%u)",
991                      instance(),
992                      regFragPtr->fragTableId,
993                      regFragPtr->fragmentId,
994                      rowid.m_page_no,
995                      rowid.m_page_idx));
996         ndbrequire(c_backup->is_partial_lcp_enabled());
997         lcp_bits |= Tuple_header::LCP_DELETE;
998       }
999     }
1000   }
1001 
1002   /**
1003    * Here we are copying header bits from the copy row to the main row.
1004    * We need to ensure that a few bits are retained from the main row
1005    * that are not necessarily set in the copy row.
1006    *
1007    * For example a row could have its LCP_SKIP set when it is updated
1008    * or deleted before the LCP reaches it. After deleting it is important
1009    * not to clear these when starting a new insert on the same row id.
1010    * This is handled in DbtupExecQuery.cpp. Here we can be committing the
1011    * same insert, so again it is important to not lose the LCP bits
1012    * on the main row. The LCP bits are never needed on the copy row since
1013    * the LCP only cares about the main rows. The LCP can even change
1014    * the LCP bits between prepare and commit of a row change. Thus it is
1015    * important to not lose the LCP_SKIP bit here.
1016    *
1017    * Similarly for LCP_DELETE we might lose the state after coming here
1018    * again before the LCP has had time to come and reset the bits.
1019    *
1020    * Similarly it is very important to not transport those bits from the
1021    * copy row back to the main row. These bits should only be used in the
1022    * main row and we should never take those bits from the copy row back
1023    * to the main row.
1024    */
1025 
1026   Uint32 clear=
1027     Tuple_header::ALLOC | Tuple_header::FREE | Tuple_header::COPY_TUPLE |
1028     Tuple_header::DISK_ALLOC | Tuple_header::DISK_INLINE |
1029     Tuple_header::MM_GROWN | Tuple_header::LCP_SKIP |
1030     Tuple_header::LCP_DELETE;
1031   copy_bits &= ~(Uint32)clear;
1032   lcp_bits |= (bits & (Tuple_header::LCP_SKIP | Tuple_header::LCP_DELETE));
1033 
1034   tuple_ptr->m_header_bits= copy_bits | lcp_bits;
1035   tuple_ptr->m_operation_ptr_i= save;
1036 
1037   Tup_fixsize_page *fix_page = (Tup_fixsize_page*)pagePtr.p;
1038   fix_page->set_change_maps(regOperPtr->m_tuple_location.m_page_idx);
1039   fix_page->set_max_gci(gci_hi);
1040   ndbassert(fix_page->verify_change_maps(jamBuffer()));
1041 
1042   if (regTabPtr->m_bits & Tablerec::TR_RowGCI &&
1043       update_gci_at_commit)
1044   {
1045     jam();
1046     update_gci(regFragPtr, regTabPtr, tuple_ptr, gci_hi);
1047     if (regTabPtr->m_bits & Tablerec::TR_ExtraRowGCIBits)
1048     {
1049       jam();
1050       Uint32 attrId = regTabPtr->getExtraAttrId<Tablerec::TR_ExtraRowGCIBits>();
1051       store_extra_row_bits(attrId, regTabPtr, tuple_ptr, gci_lo,
1052                            /* truncate */true);
1053     }
1054   }
1055   else
1056   {
1057     /**
1058      * This should be dead code, but we ensure that we don't miss those
1059      * updates even for those tables.
1060      *
1061      * In case of an explicit GCI update we always increment the number of changed rows
1062      * to ensure we don't miss any updates.
1063      */
1064     jam();
1065     regFragPtr->m_lcp_changed_rows++;
1066   }
1067   setChecksum(tuple_ptr, regTabPtr);
1068   Uint32 average_row_size = regFragPtr->m_average_row_size;
1069   if (!regOperPtr->op_struct.bit_field.m_tuple_existed_at_start)
1070   {
1071     regFragPtr->m_row_count++;
1072     c_lqh->add_insert_size(average_row_size);
1073 #ifdef DEBUG_ROW_COUNT_INS
1074     Local_key rowid = regOperPtr->m_tuple_location;
1075     rowid.m_page_no = pagePtr.p->frag_page_id;
1076     g_eventLogger->info("(%u) tab(%u,%u) Inserted row(%u,%u)"
1077                         ", bits: %x, row_count = %llu, tuple_ptr: %p, gci: %u",
1078                         instance(),
1079                         regFragPtr->fragTableId,
1080                         regFragPtr->fragmentId,
1081                         rowid.m_page_no,
1082                         rowid.m_page_idx,
1083                         tuple_ptr->m_header_bits,
1084                         regFragPtr->m_row_count,
1085                         tuple_ptr,
1086                         gci_hi);
1087 #endif
1088   }
1089   else
1090   {
1091     c_lqh->add_update_size(average_row_size);
1092   }
1093 }
1094 
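/**
 * Added note (not part of the original file): the two callbacks below are
 * continuations used when execTUP_COMMITREQ had to time-slice while waiting
 * for a disk page or for UNDO log buffer space. They rebuild the
 * TupCommitReq signal from the operation state saved in DBLQH and simply
 * re-enter execTUP_COMMITREQ, roughly:
 *
 *   tupCommitReq->opPtr  = opPtrI;    // same operation as before
 *   tupCommitReq->gci_hi = gci_hi;    // commit GCI fetched via get_op_info()
 *   ...
 *   execTUP_COMMITREQ(signal);        // retry the commit
 */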
1095 void
1096 Dbtup::disk_page_commit_callback(Signal* signal,
1097 				 Uint32 opPtrI, Uint32 page_id)
1098 {
1099   Uint32 hash_value;
1100   Uint32 gci_hi, gci_lo;
1101   Uint32 transId1, transId2;
1102   OperationrecPtr regOperPtr;
1103   Ptr<GlobalPage> diskPagePtr;
1104 
1105   jamEntry();
1106 
1107   regOperPtr.i = opPtrI;
1108   ndbrequire(c_operation_pool.getValidPtr(regOperPtr));
1109   c_lqh->get_op_info(regOperPtr.p->userpointer, &hash_value, &gci_hi, &gci_lo,
1110                      &transId1, &transId2);
1111 
1112   TupCommitReq * const tupCommitReq= (TupCommitReq *)signal->getDataPtr();
1113 
1114   tupCommitReq->opPtr= opPtrI;
1115   tupCommitReq->hashValue= hash_value;
1116   tupCommitReq->gci_hi= gci_hi;
1117   tupCommitReq->gci_lo= gci_lo;
1118   tupCommitReq->diskpage = page_id;
1119   tupCommitReq->transId1 = transId1;
1120   tupCommitReq->transId2 = transId2;
1121 
1122   regOperPtr.p->op_struct.bit_field.m_load_diskpage_on_commit= 0;
1123   regOperPtr.p->m_commit_disk_callback_page= page_id;
1124   m_global_page_pool.getPtr(diskPagePtr, page_id);
1125 
1126   {
1127     PagePtr tmp;
1128     tmp.i = diskPagePtr.i;
1129     tmp.p = reinterpret_cast<Page*>(diskPagePtr.p);
1130     disk_page_set_dirty(tmp);
1131   }
1132 
1133   execTUP_COMMITREQ(signal);
1134   if(signal->theData[0] == 0)
1135   {
1136     jam();
1137     c_lqh->tupcommit_conf_callback(signal, regOperPtr.p->userpointer);
1138   }
1139 }
1140 
1141 void
1142 Dbtup::disk_page_log_buffer_callback(Signal* signal,
1143 				     Uint32 opPtrI,
1144 				     Uint32 unused)
1145 {
1146   Uint32 hash_value;
1147   Uint32 gci_hi, gci_lo;
1148   Uint32 transId1, transId2;
1149   OperationrecPtr regOperPtr;
1150 
1151   jamEntry();
1152 
1153   regOperPtr.i = opPtrI;
1154   ndbrequire(c_operation_pool.getValidPtr(regOperPtr));
1155   c_lqh->get_op_info(regOperPtr.p->userpointer, &hash_value, &gci_hi, &gci_lo,
1156                      &transId1, &transId2);
1157   Uint32 page= regOperPtr.p->m_commit_disk_callback_page;
1158 
1159   TupCommitReq * const tupCommitReq= (TupCommitReq *)signal->getDataPtr();
1160 
1161   tupCommitReq->opPtr= opPtrI;
1162   tupCommitReq->hashValue= hash_value;
1163   tupCommitReq->gci_hi= gci_hi;
1164   tupCommitReq->gci_lo= gci_lo;
1165   tupCommitReq->diskpage = page;
1166   tupCommitReq->transId1 = transId1;
1167   tupCommitReq->transId2 = transId2;
1168 
1169   ndbassert(regOperPtr.p->op_struct.bit_field.m_load_diskpage_on_commit == 0);
1170   regOperPtr.p->op_struct.bit_field.m_wait_log_buffer= 0;
1171 
1172   execTUP_COMMITREQ(signal);
1173   ndbassert(signal->theData[0] == 0);
1174 
1175   c_lqh->tupcommit_conf_callback(signal, regOperPtr.p->userpointer);
1176 }
1177 
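/**
 * Added note (not part of the original file) on the page cache contract
 * assumed by the code below: pgman.get_page() returns 0 when the request
 * was queued (the commit must wait for disk_page_commit_callback), a
 * positive value when the page is already available (used here as the
 * cached page id), and -1 on an error path that is currently not handled
 * and stops the node via ndbrequire. Callers therefore use the pattern:
 *
 *   if (retrieve_data_page(...) == 0)
 *     return;   // real-time break, the callback continues the commit later
 */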
1178 int Dbtup::retrieve_data_page(Signal *signal,
1179                               Page_cache_client::Request req,
1180                               OperationrecPtr regOperPtr,
1181                               Ptr<GlobalPage> &diskPagePtr,
1182                               Fragrecord *fragPtrP)
1183 {
1184   req.m_callback.m_callbackData= regOperPtr.i;
1185   req.m_table_id = fragPtrP->fragTableId;
1186   req.m_fragment_id = fragPtrP->fragmentId;
1187   req.m_callback.m_callbackFunction =
1188     safe_cast(&Dbtup::disk_page_commit_callback);
1189 
1190   /*
1191    * Consider commit to be correlated.  Otherwise pk op + commit makes
1192    * the page hot.   XXX move to TUP which knows better.
1193    */
1194   int flags= regOperPtr.p->op_type |
1195     Page_cache_client::COMMIT_REQ | Page_cache_client::CORR_REQ;
1196   Page_cache_client pgman(this, c_pgman);
1197   int res= pgman.get_page(signal, req, flags);
1198   diskPagePtr = pgman.m_ptr;
1199 
1200   switch(res){
1201   case 0:
1202     /**
1203      * Timeslice
1204      */
1205     jam();
1206     signal->theData[0] = 1;
1207     return res;
1208   case -1:
1209     ndbrequire("NOT YET IMPLEMENTED" == 0);
1210     break;
1211   default:
1212     ndbrequire(res > 0);
1213     jam();
1214   }
1215   {
1216     PagePtr tmpptr;
1217     tmpptr.i = diskPagePtr.i;
1218     tmpptr.p = reinterpret_cast<Page*>(diskPagePtr.p);
1219 
1220     disk_page_set_dirty(tmpptr);
1221   }
1222   regOperPtr.p->m_commit_disk_callback_page= res;
1223   regOperPtr.p->op_struct.bit_field.m_load_diskpage_on_commit= 0;
1224 
1225   return res;
1226 }
1227 
1228 int Dbtup::retrieve_log_page(Signal *signal,
1229                              FragrecordPtr regFragPtr,
1230                              OperationrecPtr regOperPtr)
1231 {
1232   jam();
1233   /**
1234    * Only last op on tuple needs "real" commit,
1235    *   hence only this one should have m_wait_log_buffer
1236    */
1237 
1238   CallbackPtr cb;
1239   cb.m_callbackData= regOperPtr.i;
1240   cb.m_callbackIndex = DISK_PAGE_LOG_BUFFER_CALLBACK;
1241   Uint32 sz= regOperPtr.p->m_undo_buffer_space;
1242 
1243   int res;
1244   {
1245     D("Logfile_client - execTUP_COMMITREQ");
1246     Logfile_client lgman(this, c_lgman, regFragPtr.p->m_logfile_group_id);
1247     res= lgman.get_log_buffer(signal, sz, &cb);
1248   }
1249   jamEntry();
1250   switch(res){
1251   case 0:
1252     jam();
1253     signal->theData[0] = 1;
1254     return res;
1255   case -1:
1256     g_eventLogger->warning("Out of space in RG_TRANSACTION_MEMORY resource,"
1257                            " increase config parameter GlobalSharedMemory");
1258     ndbrequire("NOT YET IMPLEMENTED" == 0);
1259     break;
1260   default:
1261     jam();
1262   }
1263   regOperPtr.p->op_struct.bit_field.m_wait_log_buffer= 0;
1264 
1265   return res;
1266 }
1267 
1268 /**
1269  * Move to the first operation performed on this tuple
1270  */
1271 void
1272 Dbtup::findFirstOp(OperationrecPtr & firstPtr)
1273 {
1274   jam();
1275   printf("Detect out-of-order commit(%u) -> ", firstPtr.i);
1276   ndbassert(!firstPtr.p->is_first_operation());
1277   while(firstPtr.p->prevActiveOp != RNIL)
1278   {
1279     firstPtr.i = firstPtr.p->prevActiveOp;
1280     ndbrequire(c_operation_pool.getValidPtr(firstPtr));
1281   }
1282   ndbout_c("%u", firstPtr.i);
1283 }
1284 
1285 /* ----------------------------------------------------------------- */
1286 /* --------------- COMMIT THIS PART OF A TRANSACTION --------------- */
1287 /* ----------------------------------------------------------------- */
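/**
 * Rough outline (added commentary, not part of the original file) of the
 * commit below:
 *  1) run the TUX commit triggers over the whole operation list before any
 *     potential disk wait,
 *  2) if needed, fetch the disk page and UNDO log buffer space, possibly
 *     time-slicing via the callbacks above,
 *  3) if this is the last operation on the tuple, perform the real commit:
 *     dealloc_tuple() for ZDELETE, commit_operation() for inserts/updates,
 *     and commit_refresh() for ZREFRESH.
 */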
1288 void Dbtup::execTUP_COMMITREQ(Signal* signal)
1289 {
1290   FragrecordPtr regFragPtr;
1291   OperationrecPtr regOperPtr;
1292   TablerecPtr regTabPtr;
1293   KeyReqStruct req_struct(this, KRS_COMMIT);
1294   TransState trans_state;
1295   Ptr<GlobalPage> diskPagePtr;
1296   Uint32 no_of_fragrec, no_of_tablerec;
1297 
1298   TupCommitReq * const tupCommitReq= (TupCommitReq *)signal->getDataPtr();
1299 
1300   regOperPtr.i= tupCommitReq->opPtr;
1301   Uint32 hash_value= tupCommitReq->hashValue;
1302   Uint32 gci_hi = tupCommitReq->gci_hi;
1303   Uint32 gci_lo = tupCommitReq->gci_lo;
1304   Uint32 transId1 = tupCommitReq->transId1;
1305   Uint32 transId2 = tupCommitReq->transId2;
1306 
1307   jamEntry();
1308 
1309   ndbrequire(c_operation_pool.getUncheckedPtrRW(regOperPtr));
1310 
1311   diskPagePtr.i = tupCommitReq->diskpage;
1312   regFragPtr.i= regOperPtr.p->fragmentPtr;
1313   no_of_fragrec= cnoOfFragrec;
1314   no_of_tablerec= cnoOfTablerec;
1315 
1316   req_struct.signal= signal;
1317   req_struct.hash_value= hash_value;
1318   req_struct.gci_hi = gci_hi;
1319   req_struct.gci_lo = gci_lo;
1320 
1321   ndbrequire(Magic::check_ptr(regOperPtr.p));
1322   trans_state= get_trans_state(regOperPtr.p);
1323 
1324 
1325   ndbrequire(trans_state == TRANS_STARTED);
1326   ptrCheckGuard(regFragPtr, no_of_fragrec, fragrecord);
1327 
1328   regTabPtr.i= regFragPtr.p->fragTableId;
1329 
1330   /* Put transid in req_struct, so detached triggers can access it */
1331   req_struct.trans_id1 = transId1;
1332   req_struct.trans_id2 = transId2;
1333   req_struct.m_reorg = regOperPtr.p->op_struct.bit_field.m_reorg;
1334   regOperPtr.p->m_commit_disk_callback_page = tupCommitReq->diskpage;
1335 
1336   ptrCheckGuard(regTabPtr, no_of_tablerec, tablerec);
1337   PagePtr page;
1338   Tuple_header* tuple_ptr= (Tuple_header*)
1339     get_ptr(&page, &regOperPtr.p->m_tuple_location, regTabPtr.p);
1340 
1341   Tup_fixsize_page *fix_page = (Tup_fixsize_page*)page.p;
1342   fix_page->prefetch_change_map();
1343   NDB_PREFETCH_WRITE(tuple_ptr);
1344 
1345   if (diskPagePtr.i == RNIL)
1346   {
1347     jam();
1348     diskPagePtr.p = 0;
1349     req_struct.m_disk_page_ptr.i = RNIL;
1350     req_struct.m_disk_page_ptr.p = 0;
1351   }
1352   else
1353   {
1354     m_global_page_pool.getPtr(diskPagePtr, diskPagePtr.i);
1355   }
1356 
1357   ptrCheckGuard(regTabPtr, no_of_tablerec, tablerec);
1358 
1359   prepare_fragptr = regFragPtr;
1360   prepare_tabptr = regTabPtr;
1361 
1362   /**
1363    * NOTE: This has to be run before potential time-slice when
1364    *       waiting for disk, as otherwise the "other-ops" in a multi-op
1365    *       commit might run while we're waiting for disk
1366    *
1367    */
1368   if (!regTabPtr.p->tuxCustomTriggers.isEmpty())
1369   {
1370     if(get_tuple_state(regOperPtr.p) == TUPLE_PREPARED)
1371     {
1372       jam();
1373 
1374       OperationrecPtr loopPtr = regOperPtr;
1375       if (unlikely(!regOperPtr.p->is_first_operation()))
1376       {
1377         findFirstOp(loopPtr);
1378       }
1379 
1380       /**
1381        * Execute all tux triggers at first commit
1382        *   since previous tuple is otherwise removed...
1383        */
1384       jam();
1385       goto first;
1386       while(loopPtr.i != RNIL)
1387       {
1388         ndbrequire(c_operation_pool.getValidPtr(loopPtr));
1389     first:
1390 	executeTuxCommitTriggers(signal,
1391 				 loopPtr.p,
1392 				 regFragPtr.p,
1393 				 regTabPtr.p);
1394 	set_tuple_state(loopPtr.p, TUPLE_TO_BE_COMMITTED);
1395 	loopPtr.i = loopPtr.p->nextActiveOp;
1396       }
1397     }
1398   }
1399 
1400   bool get_page = false;
1401   bool initial_delete = false;
1402   if(regOperPtr.p->op_struct.bit_field.m_load_diskpage_on_commit)
1403   {
1404     jam();
1405     Page_cache_client::Request req;
1406 
1407     /**
1408      * Only last op on tuple needs "real" commit,
1409      *   hence only this one should have m_load_diskpage_on_commit
1410      */
1411     ndbassert(tuple_ptr->m_operation_ptr_i == regOperPtr.i);
1412 
1413     /**
1414      * Check for page
1415      */
1416     if(!regOperPtr.p->m_copy_tuple_location.isNull())
1417     {
1418       jam();
1419       Tuple_header* tmp= get_copy_tuple(&regOperPtr.p->m_copy_tuple_location);
1420 
1421       memcpy(&req.m_page,
1422 	     tmp->get_disk_ref_ptr(regTabPtr.p), sizeof(Local_key));
1423 
1424       if (unlikely(regOperPtr.p->op_type == ZDELETE &&
1425 		   tmp->m_header_bits & Tuple_header::DISK_ALLOC))
1426       {
1427         jam();
1428 	/**
1429 	 * Insert+Delete
1430          * In this case we want to release the Copy page tuple that was
1431          * allocated for the insert operation since the commit of the
1432          * delete operation here makes it unnecessary to save the
1433          * new record.
1434 	 */
1435         regOperPtr.p->op_struct.bit_field.m_load_diskpage_on_commit = 0;
1436         regOperPtr.p->op_struct.bit_field.m_wait_log_buffer = 0;
1437         disk_page_abort_prealloc(signal, regFragPtr.p,
1438 				 &req.m_page, req.m_page.m_page_idx);
1439 
1440         {
1441           D("Logfile_client - execTUP_COMMITREQ");
1442           Logfile_client lgman(this,
1443                                c_lgman,
1444                                regFragPtr.p->m_logfile_group_id);
1445           lgman.free_log_space(regOperPtr.p->m_undo_buffer_space,
1446                                jamBuffer());
1447         }
1448 	goto skip_disk;
1449       }
1450     }
1451     else
1452     {
1453       jam();
1454       // initial delete
1455       initial_delete = true;
1456       ndbassert(regOperPtr.p->op_type == ZDELETE);
1457       memcpy(&req.m_page,
1458 	     tuple_ptr->get_disk_ref_ptr(regTabPtr.p), sizeof(Local_key));
1459 
1460       ndbassert(tuple_ptr->m_header_bits & Tuple_header::DISK_PART);
1461     }
1462 
1463     if (retrieve_data_page(signal,
1464                            req,
1465                            regOperPtr,
1466                            diskPagePtr,
1467                            regFragPtr.p) == 0)
1468     {
1469       if (!initial_delete)
1470       {
1471         jam();
1472       }
1473       else
1474       {
1475         jam();
1476         /* Set bit to indicate the tuple is already deleted */
1477         Uint32 old_header = tuple_ptr->m_header_bits;
1478         Uint32 new_header = tuple_ptr->m_header_bits =
1479           old_header | Tuple_header::DELETE_WAIT;
1480         updateChecksum(tuple_ptr, regTabPtr.p, old_header, new_header);
1481       }
1482       signal->theData[0] = 1; //Ensure we report real-time break
1483       return; // Data page has not been retrieved yet.
1484     }
1485     get_page = true;
1486   }
1487 
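  /**
   * Secure space in the disk data log buffer before the real commit;
   * retrieve_log_page() returning 0 means the space is not yet
   * available, so an initial delete is marked with DELETE_WAIT as above
   * and we return, presumably to be re-invoked via the log buffer
   * callback.
   */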
1488   if(regOperPtr.p->op_struct.bit_field.m_wait_log_buffer)
1489   {
1490     jam();
1491     /**
1492      * Only last op on tuple needs "real" commit,
1493      *   hence only this one should have m_wait_log_buffer
1494      */
1495     ndbassert(tuple_ptr->m_operation_ptr_i == regOperPtr.i);
1496 
1497     if (retrieve_log_page(signal, regFragPtr, regOperPtr) == 0)
1498     {
1499       if (!initial_delete)
1500       {
1501         jam();
1502       }
1503       else
1504       {
1505         jam();
1506         /* Set bit to indicate the tuple is already deleted */
1507         Uint32 old_header = tuple_ptr->m_header_bits;
1508         Uint32 new_header = tuple_ptr->m_header_bits =
1509           old_header | Tuple_header::DELETE_WAIT;
1510         updateChecksum(tuple_ptr, regTabPtr.p, old_header, new_header);
1511       }
1512       signal->theData[0] = 1; //Ensure we report real-time break
1513       return; // Log page has not been retrieved yet.
1514     }
1515   }
1516 
1517   assert(tuple_ptr);
1518 skip_disk:
1519   req_struct.m_tuple_ptr = tuple_ptr;
1520 
1521   Uint32 nextOp = regOperPtr.p->nextActiveOp;
1522   Uint32 prevOp = regOperPtr.p->prevActiveOp;
1523   /**
1524    * The trigger code (which is shared between detached/immediate triggers)
1525    *   checks the op-list to decide where to read before-values from.
1526    *   Detached triggers should always read the original tuple value
1527    *   from before the transaction started, not from any intermediate update.
1528    *
1529    * Clearing the op-list pointers below has this effect.
1530    */
1531   regOperPtr.p->nextActiveOp = RNIL;
1532   regOperPtr.p->prevActiveOp = RNIL;
1533   if(tuple_ptr->m_operation_ptr_i == regOperPtr.i)
1534   {
1535     jam();
1536     /**
1537      * Perform "real" commit
1538      */
1539     Uint32 disk = regOperPtr.p->m_commit_disk_callback_page;
1540     set_commit_change_mask_info(regTabPtr.p, &req_struct, regOperPtr.p);
1541     checkDetachedTriggers(&req_struct,
1542                           regOperPtr.p,
1543                           regTabPtr.p,
1544                           disk != RNIL,
1545                           diskPagePtr.i);
1546 
1547     tuple_ptr->m_operation_ptr_i = RNIL;
1548 
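    /**
     * Apply the committed change to the row itself:
     *   ZDELETE        -> dealloc_tuple() releases the row,
     *   ZREFRESH       -> commit_refresh() (see below),
     *   anything else  -> commit_operation() (insert/update/write).
     */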
1549     if (regOperPtr.p->op_type == ZDELETE)
1550     {
1551       jam();
1552       if (get_page)
1553       {
1554         ndbassert(tuple_ptr->m_header_bits & Tuple_header::DISK_PART);
1555       }
1556       dealloc_tuple(signal,
1557                     gci_hi,
1558                     gci_lo,
1559                     page.p,
1560                     tuple_ptr,
1561                     &req_struct,
1562                     regOperPtr.p,
1563                     regFragPtr.p,
1564                     regTabPtr.p,
1565                     diskPagePtr);
1566     }
1567     else if(regOperPtr.p->op_type != ZREFRESH)
1568     {
1569       jam();
1570       commit_operation(signal,
1571                        gci_hi,
1572                        gci_lo,
1573                        tuple_ptr,
1574                        page,
1575                        regOperPtr.p,
1576                        regFragPtr.p,
1577                        regTabPtr.p,
1578                        diskPagePtr);
1579     }
1580     else
1581     {
1582       jam();
1583       commit_refresh(signal,
1584                      gci_hi,
1585                      gci_lo,
1586                      tuple_ptr,
1587                      page,
1588                      &req_struct,
1589                      regOperPtr.p,
1590                      regFragPtr.p,
1591                      regTabPtr.p,
1592                      diskPagePtr);
1593     }
1594   }
1595 
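  /**
   * Unlink this operation from the tuple's doubly linked operation list
   * by connecting the neighbours saved in nextOp/prevOp above.
   */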
1596   if (nextOp != RNIL)
1597   {
1598     OperationrecPtr opPtr;
1599     opPtr.i = nextOp;
1600     ndbrequire(c_operation_pool.getValidPtr(opPtr));
1601     opPtr.p->prevActiveOp = prevOp;
1602   }
1603 
1604   if (prevOp != RNIL)
1605   {
1606     OperationrecPtr opPtr;
1607     opPtr.i = prevOp;
1608     ndbrequire(c_operation_pool.getValidPtr(opPtr));
1609     opPtr.p->nextActiveOp = nextOp;
1610   }
1611 
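  /**
   * If this operation had a copy tuple (its working row version), it is
   * no longer needed once the commit is done; release it here.
   */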
1612   if(!regOperPtr.p->m_copy_tuple_location.isNull())
1613   {
1614     jam();
1615     c_undo_buffer.free_copy_tuple(&regOperPtr.p->m_copy_tuple_location);
1616   }
1617 
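  /**
   * Final bookkeeping: count the committed change on the fragment, reset
   * the operation record for reuse and report that no real-time break is
   * needed (theData[0] = 0).
   */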
1618   regFragPtr.p->m_committed_changes++;
1619 
1620   initOpConnection(regOperPtr.p);
1621   signal->theData[0] = 0;
1622 }
1623 
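/**
 * Build the change mask passed to detached triggers for this commit.
 * A delete has no copy tuple, so every column is marked as changed.
 * Otherwise the mask stored with the copy tuple is copied; if columns
 * have been added online since that mask was created (maskptr->m_cols is
 * then smaller than the table's current attribute count), the bits for
 * the added columns are set as well.
 *
 * Worked example (hypothetical numbers): a table with 70 attributes
 * gives masklen = (70 + 31) >> 5 = 3 words.  If the copy tuple's mask
 * was created when the table had 64 attributes, the first
 * (64 + 31) >> 5 = 2 words are copied and setRange(64, 6) marks the six
 * added columns as changed.
 */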
1624 void
1625 Dbtup::set_commit_change_mask_info(const Tablerec* regTabPtr,
1626                                    KeyReqStruct * req_struct,
1627                                    const Operationrec * regOperPtr)
1628 {
1629   Uint32 masklen = (regTabPtr->m_no_of_attributes + 31) >> 5;
1630   if (regOperPtr->m_copy_tuple_location.isNull())
1631   {
1632     ndbassert(regOperPtr->op_type == ZDELETE);
1633     req_struct->changeMask.set();
1634   }
1635   else
1636   {
1637     Uint32 * dst = req_struct->changeMask.rep.data;
1638     Uint32 * rawptr = get_copy_tuple_raw(&regOperPtr->m_copy_tuple_location);
1639     ChangeMask * maskptr = get_change_mask_ptr(rawptr);
1640     Uint32 cols = maskptr->m_cols;
1641     if (cols == regTabPtr->m_no_of_attributes)
1642     {
1643       memcpy(dst, maskptr->m_mask, 4*masklen);
1644     }
1645     else
1646     {
1647       ndbassert(regTabPtr->m_no_of_attributes > cols); // no drop column
1648       memcpy(dst, maskptr->m_mask, 4*((cols + 31) >> 5));
1649       req_struct->changeMask.setRange(cols,
1650                                       regTabPtr->m_no_of_attributes - cols);
1651     }
1652   }
1653 }
1654 
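/**
 * The row-existence case decided at refresh prepare time is apparently
 * encoded in m_copy_tuple_location.m_file_no as one of the RF_*
 * constants tested in the switch below.
 */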
1655 void
1656 Dbtup::commit_refresh(Signal* signal,
1657                       Uint32 gci_hi,
1658                       Uint32 gci_lo,
1659                       Tuple_header* tuple_ptr,
1660                       PagePtr pagePtr,
1661                       KeyReqStruct * req_struct,
1662                       Operationrec* regOperPtr,
1663                       Fragrecord* regFragPtr,
1664                       Tablerec* regTabPtr,
1665                       Ptr<GlobalPage> diskPagePtr)
1666 {
1667   /* Committing a refresh operation.
1668    * Refresh of an existing row looks like an update
1669    * and can commit normally.
1670    * Refresh of a non-existing row looks like an Insert which
1671    * is 'undone' at commit time.
1672    * This is achieved by making special calls to ACC to get
1673    * it to forget, before deallocating the tuple locally.
1674    */
1675   switch(regOperPtr->m_copy_tuple_location.m_file_no){
1676   case Operationrec::RF_SINGLE_NOT_EXIST:
1677   case Operationrec::RF_MULTI_NOT_EXIST:
1678     break;
1679   case Operationrec::RF_SINGLE_EXIST:
1680   case Operationrec::RF_MULTI_EXIST:
1681     // "Normal" update
1682     commit_operation(signal,
1683                      gci_hi,
1684                      gci_lo,
1685                      tuple_ptr,
1686                      pagePtr,
1687                      regOperPtr,
1688                      regFragPtr,
1689                      regTabPtr,
1690                      diskPagePtr);
1691     return;
1692 
1693   default:
1694     ndbabort();
1695   }
1696 
1697   Local_key key = regOperPtr->m_tuple_location;
1698   key.m_page_no = pagePtr.p->frag_page_id;
1699 
1700   /**
1701    * Tell ACC to delete
1702    */
1703   c_lqh->accremoverow(signal, regOperPtr->userpointer, &key);
1704   dealloc_tuple(signal,
1705                 gci_hi,
1706                 gci_lo,
1707                 pagePtr.p,
1708                 tuple_ptr,
1709                 req_struct,
1710                 regOperPtr,
1711                 regFragPtr,
1712                 regTabPtr,
1713                 diskPagePtr);
1714 }
1715