/*
   Copyright (c) 2003, 2019, Oracle and/or its affiliates. All rights reserved.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License, version 2.0,
   as published by the Free Software Foundation.

   This program is also distributed with certain software (including
   but not limited to OpenSSL) that is licensed under separate terms,
   as designated in a particular file or component or in included license
   documentation. The authors of MySQL hereby grant you an additional
   permission to link the program and your derivative works with the
   separately licensed software that they have included with MySQL.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
   GNU General Public License, version 2.0, for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*/

#define DBTUP_C
#define DBTUP_COMMIT_CPP
#include "Dbtup.hpp"
#include <RefConvert.hpp>
#include <ndb_limits.h>
#include <pc.hpp>
#include <signaldata/TupCommit.hpp>
#include <EventLogger.hpp>
#include "../dblqh/Dblqh.hpp"

#define JAM_FILE_ID 416

extern EventLogger *g_eventLogger;

#if (defined(VM_TRACE) || defined(ERROR_INSERT))
//#define DEBUG_LCP 1
//#define DEBUG_LCP_SKIP_DELETE_EXTRA 1
//#define DEBUG_INSERT_EXTRA 1
//#define DEBUG_LCP_SCANNED_BIT 1
//#define DEBUG_PGMAN 1
//#define DEBUG_ROW_COUNT_DEL 1
//#define DEBUG_ROW_COUNT_INS 1
//#define DEBUG_DELETE 1
//#define DEBUG_DELETE_EXTRA 1
//#define DEBUG_LCP_SKIP_DELETE2 1
//#define DEBUG_LCP_DEL 1
//#define DEBUG_LCP_SKIP 1
//#define DEBUG_LCP_SKIP_DELETE 1
#endif

#ifdef DEBUG_LCP
#define DEB_LCP(arglist) do { g_eventLogger->info arglist ; } while (0)
#else
#define DEB_LCP(arglist) do { } while (0)
#endif

#ifdef DEBUG_DELETE_EXTRA
#define DEB_DELETE_EXTRA(arglist) do { g_eventLogger->info arglist ; } while (0)
#else
#define DEB_DELETE_EXTRA(arglist) do { } while (0)
#endif

#ifdef DEBUG_INSERT_EXTRA
#define DEB_INSERT_EXTRA(arglist) do { g_eventLogger->info arglist ; } while (0)
#else
#define DEB_INSERT_EXTRA(arglist) do { } while (0)
#endif

#ifdef DEBUG_LCP_DEL
#define DEB_LCP_DEL(arglist) do { g_eventLogger->info arglist ; } while (0)
#else
#define DEB_LCP_DEL(arglist) do { } while (0)
#endif

#ifdef DEBUG_LCP_SKIP
#define DEB_LCP_SKIP(arglist) do { g_eventLogger->info arglist ; } while (0)
#else
#define DEB_LCP_SKIP(arglist) do { } while (0)
#endif

#ifdef DEBUG_LCP_SKIP_DELETE
#define DEB_LCP_SKIP_DELETE(arglist) do { g_eventLogger->info arglist ; } while (0)
#else
#define DEB_LCP_SKIP_DELETE(arglist) do { } while (0)
#endif

#ifdef DEBUG_LCP_SKIP_DELETE2
#define DEB_LCP_SKIP_DELETE2(arglist) do { g_eventLogger->info arglist ; } while (0)
#else
#define DEB_LCP_SKIP_DELETE2(arglist) do { } while (0)
#endif

#ifdef DEBUG_LCP_SCANNED_BIT
#define DEB_LCP_SCANNED_BIT(arglist) do { g_eventLogger->info arglist ; } while (0)
#else
#define DEB_LCP_SCANNED_BIT(arglist) do { } while (0)
#endif

#ifdef DEBUG_PGMAN
#define DEB_PGMAN(arglist) do { g_eventLogger->info arglist ; } while (0)
#else
#define DEB_PGMAN(arglist) do { } while (0)
#endif

#ifdef DEBUG_DELETE
#define DEB_DELETE(arglist) do { g_eventLogger->info arglist ; } while (0)
#else
#define DEB_DELETE(arglist) do { } while (0)
#endif
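
/*
 * Usage sketch (not from the original source): each DEB_* macro takes its
 * whole printf-style argument list wrapped in one extra pair of
 * parentheses, so that the single macro parameter `arglist` can carry a
 * variable-length argument list into g_eventLogger->info. The identifiers
 * tabId and fragId below are hypothetical, for illustration only:
 *
 *   DEB_LCP(("(%u) LCP skip tab(%u,%u)", instance(), tabId, fragId));
 *
 * When the corresponding DEBUG_* define is off, the macro expands to the
 * empty do { } while (0), so the arguments are not evaluated at all.
 */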

void Dbtup::execTUP_DEALLOCREQ(Signal* signal)
{
  TablerecPtr regTabPtr;
  FragrecordPtr regFragPtr;
  Uint32 frag_page_id, frag_id;

  jamEntry();

  frag_id= signal->theData[0];
  regTabPtr.i= signal->theData[1];
  frag_page_id= signal->theData[2];
  Uint32 page_index= signal->theData[3];

  ptrCheckGuard(regTabPtr, cnoOfTablerec, tablerec);

  getFragmentrec(regFragPtr, frag_id, regTabPtr.p);
  ndbassert(regFragPtr.p != NULL);

  if (! Local_key::isInvalid(frag_page_id, page_index))
  {
    Local_key tmp;
    tmp.m_page_no= getRealpid(regFragPtr.p, frag_page_id);
    tmp.m_page_idx= page_index;
    PagePtr pagePtr;
    Tuple_header* ptr= (Tuple_header*)get_ptr(&pagePtr, &tmp, regTabPtr.p);

    DEB_DELETE(("(%u)dealloc tab(%u,%u), row(%u,%u), header: %x",
                instance(),
                regTabPtr.i,
                frag_id,
                frag_page_id,
                page_index,
                ptr->m_header_bits));

    ndbrequire(ptr->m_header_bits & Tuple_header::FREE);

    if (regTabPtr.p->m_attributes[MM].m_no_of_varsize +
        regTabPtr.p->m_attributes[MM].m_no_of_dynamic)
    {
      jam();
      free_var_rec(regFragPtr.p, regTabPtr.p, &tmp, pagePtr);
    } else {
      free_fix_rec(regFragPtr.p, regTabPtr.p, &tmp, (Fix_page*)pagePtr.p);
    }
  }
  else
  {
    jam();
  }
}

void Dbtup::execTUP_WRITELOG_REQ(Signal* signal)
{
  jamEntry();
  OperationrecPtr loopOpPtr;
  loopOpPtr.i= signal->theData[0];
  Uint32 gci_hi = signal->theData[1];
  Uint32 gci_lo = signal->theData[2];
  ndbrequire(c_operation_pool.getValidPtr(loopOpPtr));
  while (loopOpPtr.p->prevActiveOp != RNIL) {
    jam();
    loopOpPtr.i= loopOpPtr.p->prevActiveOp;
    ndbrequire(c_operation_pool.getValidPtr(loopOpPtr));
  }
  do {
    ndbrequire(get_trans_state(loopOpPtr.p) == TRANS_STARTED);
    signal->theData[0] = loopOpPtr.p->userpointer;
    signal->theData[1] = gci_hi;
    signal->theData[2] = gci_lo;
    if (loopOpPtr.p->nextActiveOp == RNIL) {
      jam();
      EXECUTE_DIRECT(DBLQH, GSN_LQH_WRITELOG_REQ, signal, 3);
      return;
    }
    jam();
    EXECUTE_DIRECT(DBLQH, GSN_LQH_WRITELOG_REQ, signal, 3);
    jamEntry();
    loopOpPtr.i= loopOpPtr.p->nextActiveOp;
    ndbrequire(c_operation_pool.getValidPtr(loopOpPtr));
  } while (true);
}

/* ---------------------------------------------------------------- */
/* INITIALIZATION OF ONE CONNECTION RECORD TO PREPARE FOR NEXT OP.  */
/* ---------------------------------------------------------------- */
void Dbtup::initOpConnection(Operationrec* regOperPtr)
{
  set_tuple_state(regOperPtr, TUPLE_ALREADY_ABORTED);
  set_trans_state(regOperPtr, TRANS_IDLE);
  regOperPtr->op_type= ZREAD;
  regOperPtr->op_struct.bit_field.m_disk_preallocated= 0;
  regOperPtr->op_struct.bit_field.m_load_diskpage_on_commit= 0;
  regOperPtr->op_struct.bit_field.m_wait_log_buffer= 0;
  regOperPtr->op_struct.bit_field.in_active_list = false;
  regOperPtr->m_undo_buffer_space= 0;
}
bool
Dbtup::is_rowid_in_remaining_lcp_set(const Page* page,
                                     Fragrecord* regFragPtr,
                                     const Local_key& key1,
                                     const Dbtup::ScanOp& op,
                                     Uint32 check_lcp_scanned_state_reversed)
{
  if (page->is_page_to_skip_lcp() ||
      (check_lcp_scanned_state_reversed == 0 &&
       get_lcp_scanned_bit(regFragPtr, key1.m_page_no)))
  {
    /**
     * We have to check whether the page has already been scanned by
     * the LCP. We have two different flags for this. The first one
     * is checked by is_page_to_skip_lcp(). This is set when a page
     * is allocated during an LCP scan and not previously released
     * in the same LCP scan.
     *
     * If a page is released during the LCP scan we set the lcp
     * scanned bit in the page map. We need to check both of those to
     * see if the page has been LCP scanned.
     *
     * When check_lcp_scanned_state_reversed is != 0 we are not interested
     * in the lcp scanned state and will ignore checking it. We can
     * call this function with check_lcp_scanned_state_reversed set to 0
     * even if we know that the lcp scanned bit isn't set, since
     * check_lcp_scanned_state_reversed is also used for debug printouts.
     */
    jam();
    return false; /* Page already scanned for skipped pages */
  }
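  /**
   * Assumed convention, inferred from the checks below rather than stated
   * in the original: c_backup->is_page_lcp_scanned() returns +1 when the
   * page has been fully LCP scanned, -1 when the LCP scan has not reached
   * it yet, and otherwise the LCP scan is currently positioned on this
   * page, in which case the row index is compared against the scan
   * position to decide membership in the remaining LCP set.
   */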
  bool dummy;
  int ret_val = c_backup->is_page_lcp_scanned(key1.m_page_no, dummy);
  if (ret_val == +1)
  {
    jam();
    return false;
  }
  else if (ret_val == -1)
  {
    jam();
    if (check_lcp_scanned_state_reversed != 0)
    {
      DEB_LCP_SCANNED_BIT(("(%u)Line: %u, page: %u, debug_val: %u",
                           instance(),
                           __LINE__,
                           key1.m_page_no,
                           check_lcp_scanned_state_reversed));
    }
    return true;
  }
  /* We are scanning the given page */
  Local_key key2 = op.m_scanPos.m_key;
  switch (op.m_state) {
  case Dbtup::ScanOp::First:
  {
    jam();
    ndbrequire(key2.isNull());
    if (check_lcp_scanned_state_reversed != 0)
    {
      DEB_LCP_SCANNED_BIT(("(%u)Line: %u, page: %u, debug_val: %u",
                           instance(),
                           __LINE__,
                           key1.m_page_no,
                           check_lcp_scanned_state_reversed));
    }
    return true; /* Already checked page id above, so will scan the page */
  }
  case Dbtup::ScanOp::Current:
  {
    /* Impossible state for LCP scans */
    ndbabort();
  }
  case Dbtup::ScanOp::Next:
  {
    ndbrequire(key1.m_page_no == key2.m_page_no);
    ndbrequire(!key2.isNull());
    if (op.m_scanPos.m_get == ScanPos::Get_next_page_mm)
    {
      jam();
      /**
       * We got a real-time break while switching to a new page.
       * In this case we can skip the page since it is already
       * LCP:ed.
       */
      return false;
    }
    if (key1.m_page_idx < key2.m_page_idx)
    {
      jam();
      /* Ignore rows already LCP:ed */
      return false;
    }
    if (key1.m_page_idx > key2.m_page_idx)
    {
      jam();
      /* Include rows not LCP:ed yet */
      if (check_lcp_scanned_state_reversed != 0)
      {
        DEB_LCP_SCANNED_BIT(("(%u)Line: %u, page: %u, debug_val: %u",
                             instance(),
                             __LINE__,
                             key1.m_page_no,
                             check_lcp_scanned_state_reversed));
      }
      return true;
    }
    ndbrequire(key1.m_page_idx == key2.m_page_idx);
    /* keys are equal */
    jam();
    /* Ignore the current row, it has already been LCP:ed. */
    return false;
  }
  case Dbtup::ScanOp::Last:
  case Dbtup::ScanOp::Aborting:
  {
    jam();
    return false; /* Everything scanned already */
  }
  default:
    break;
  }
  /* Will never arrive here */
  jamLine(Uint16(op.m_state));
  ndbabort();
  return true;
}

void
Dbtup::dealloc_tuple(Signal* signal,
                     Uint32 gci_hi,
                     Uint32 gci_lo,
                     Page* page,
                     Tuple_header* ptr,
                     KeyReqStruct * req_struct,
                     Operationrec* regOperPtr,
                     Fragrecord* regFragPtr,
                     Tablerec* regTabPtr,
                     Ptr<GlobalPage> pagePtr)
{
  Uint32 lcpScan_ptr_i= regFragPtr->m_lcp_scan_op;
  Uint32 average_row_size = regFragPtr->m_average_row_size;

  Uint32 bits = ptr->m_header_bits;
  Uint32 extra_bits = Tuple_header::FREE;
  c_lqh->add_delete_size(average_row_size);
  if (bits & Tuple_header::DISK_PART)
  {
    if (likely(pagePtr.i != RNIL))
    {
      jam();
      ndbrequire(c_lqh->is_restore_phase_done());
      Local_key disk;
      memcpy(&disk, ptr->get_disk_ref_ptr(regTabPtr), sizeof(disk));
      PagePtr tmpptr;
      Local_key rowid = regOperPtr->m_tuple_location;
      rowid.m_page_no = page->frag_page_id;
      tmpptr.i = pagePtr.i;
      tmpptr.p = reinterpret_cast<Page*>(pagePtr.p);
      disk_page_free(signal,
                     regTabPtr,
                     regFragPtr,
                     &disk,
                     tmpptr,
                     gci_hi,
                     &rowid,
                     regOperPtr->m_undo_buffer_space);
    }
    else
    {
      ndbrequire(!c_lqh->is_restore_phase_done());
    }
  }

  if (! (bits & (Tuple_header::LCP_SKIP |
                 Tuple_header::ALLOC |
                 Tuple_header::LCP_DELETE)) &&
      lcpScan_ptr_i != RNIL)
  {
    jam();
    ScanOpPtr scanOp;
    scanOp.i = lcpScan_ptr_i;
    ndbrequire(c_scanOpPool.getValidPtr(scanOp));
    Local_key rowid = regOperPtr->m_tuple_location;
    rowid.m_page_no = page->frag_page_id;
    if (is_rowid_in_remaining_lcp_set(page, regFragPtr, rowid, *scanOp.p, 0))
    {
      jam();

      /**
       * We're committing a delete on a row that should
       * be part of the LCP. Copy the original row into a copy-tuple
       * and add this copy-tuple to the lcp-keep-list.
       *
       * We also need to set the LCP_SKIP bit in the tuple header to
       * prevent the LCP scan from finding this row and recording it as
       * a deleted rowid before the LCP scan start. This can happen on
       * CHANGED ROW pages only.
       */
      /* Coverage tested */
      extra_bits |= Tuple_header::LCP_SKIP;
      DEB_LCP_SKIP_DELETE(("(%u)tab(%u,%u), row(%u,%u),"
                           " handle_lcp_keep_commit"
                           ", set LCP_SKIP, bits: %x",
                           instance(),
                           regFragPtr->fragTableId,
                           regFragPtr->fragmentId,
                           rowid.m_page_no,
                           rowid.m_page_idx,
                           bits | extra_bits));
      handle_lcp_keep_commit(&rowid,
                             req_struct,
                             regOperPtr,
                             regFragPtr,
                             regTabPtr);
    }
    else
    {
      /* Coverage tested */
      DEB_LCP_SKIP_DELETE2(("(%u)tab(%u,%u), row(%u,%u) DELETE"
                            " already LCP:ed",
                            instance(),
                            regFragPtr->fragTableId,
                            regFragPtr->fragmentId,
                            rowid.m_page_no,
                            rowid.m_page_idx));
    }
  }
  else
  {
#ifdef DEBUG_LCP_SKIP_DELETE_EXTRA
    Local_key rowid = regOperPtr->m_tuple_location;
    rowid.m_page_no = page->frag_page_id;
    g_eventLogger->info("(%u)tab(%u,%u)row(%u,%u),"
                        ", skip LCP, bits: %x"
                        ", lcpScan_ptr: %u",
                        instance(),
                        regFragPtr->fragTableId,
                        regFragPtr->fragmentId,
                        rowid.m_page_no,
                        rowid.m_page_idx,
                        bits,
                        lcpScan_ptr_i);
#endif
  }

#ifdef DEBUG_DELETE_EXTRA
  if (c_started)
  {
    Local_key rowid = regOperPtr->m_tuple_location;
    rowid.m_page_no = page->frag_page_id;
    DEB_DELETE_EXTRA(("(%u)tab(%u,%u),DELETE row(%u,%u)",
                      instance(),
                      regFragPtr->fragTableId,
                      regFragPtr->fragmentId,
                      rowid.m_page_no,
                      rowid.m_page_idx));
  }
#endif
  ptr->m_header_bits = bits | extra_bits;

  if (regTabPtr->m_bits & Tablerec::TR_RowGCI)
  {
    jam();
    update_gci(regFragPtr, regTabPtr, ptr, gci_hi);
    if (regTabPtr->m_bits & Tablerec::TR_ExtraRowGCIBits)
    {
      Uint32 attrId = regTabPtr->getExtraAttrId<Tablerec::TR_ExtraRowGCIBits>();
      store_extra_row_bits(attrId, regTabPtr, ptr, gci_lo, /* truncate */true);
    }
  }
  else
  {
    /**
     * This should be dead code, but we ensure that we don't miss those
     * updates even for those tables.
     */
    jam();
    regFragPtr->m_lcp_changed_rows++;
  }
  Tup_fixsize_page *fix_page = (Tup_fixsize_page*)page;
  fix_page->set_change_maps(regOperPtr->m_tuple_location.m_page_idx);
  ndbassert(fix_page->verify_change_maps(jamBuffer()));
  fix_page->set_max_gci(gci_hi);
  setInvalidChecksum(ptr, regTabPtr);
  if (regOperPtr->op_struct.bit_field.m_tuple_existed_at_start)
  {
    ndbrequire(regFragPtr->m_row_count > 0);
    regFragPtr->m_row_count--;
#ifdef DEBUG_ROW_COUNT_DEL
    Local_key rowid = regOperPtr->m_tuple_location;
    rowid.m_page_no = page->frag_page_id;
    g_eventLogger->info("(%u) tab(%u,%u) Deleted row(%u,%u)"
                        ", bits: %x, row_count = %llu"
                        ", tuple_header_ptr: %p, gci: %u",
                        instance(),
                        regFragPtr->fragTableId,
                        regFragPtr->fragmentId,
                        rowid.m_page_no,
                        rowid.m_page_idx,
                        ptr->m_header_bits,
                        regFragPtr->m_row_count,
                        ptr,
                        gci_hi);
#endif
  }
}

void
Dbtup::update_gci(Fragrecord * regFragPtr,
                  Tablerec * regTabPtr,
                  Tuple_header* ptr,
                  Uint32 new_gci)
{
  /**
   * Update GCI on the row, also update statistics used by LCP.
   */
  Uint32 *gci_ptr = ptr->get_mm_gci(regTabPtr);
  Uint32 old_gci = *gci_ptr;
  *gci_ptr = new_gci;
  if (old_gci <= regFragPtr->m_lcp_start_gci)
  {
    jam();
    regFragPtr->m_lcp_changed_rows++;
  }
}

void
Dbtup::handle_lcp_keep_commit(const Local_key* rowid,
                              KeyReqStruct * req_struct,
                              Operationrec * opPtrP,
                              Fragrecord * regFragPtr,
                              Tablerec * regTabPtr)
{
  bool disk = false;
  /* Coverage tested */
  Uint32 sizes[4];
  Uint32 * copytuple = get_copy_tuple_raw(&opPtrP->m_copy_tuple_location);
  Tuple_header * dst = get_copy_tuple(copytuple);
  Tuple_header * org = req_struct->m_tuple_ptr;
  if (regTabPtr->need_expand(disk))
  {
    jam();
    req_struct->fragPtrP = regFragPtr;
    req_struct->m_row_id = opPtrP->m_tuple_location;
    req_struct->operPtrP = opPtrP;
    setup_fixed_tuple_ref(req_struct, opPtrP, regTabPtr);
    setup_fixed_part(req_struct, opPtrP, regTabPtr);
    req_struct->m_tuple_ptr = dst;
    expand_tuple(req_struct, sizes, org, regTabPtr, disk, true);
    shrink_tuple(req_struct, sizes+2, regTabPtr, disk);
  }
  else
  {
    jam();
    memcpy(dst, org, 4*regTabPtr->m_offsets[MM].m_fix_header_size);
  }
  dst->m_header_bits |= Tuple_header::COPY_TUPLE;

  setChecksum(dst, regTabPtr);
  /**
   * Link it to list
   */
  insert_lcp_keep_list(regFragPtr,
                       opPtrP->m_copy_tuple_location,
                       copytuple,
                       rowid);
  /**
   * And finally clear m_copy_tuple_location so that it won't be freed
   */
  opPtrP->m_copy_tuple_location.setNull();
}

#if 0
static void dump_buf_hex(unsigned char *p, Uint32 bytes)
{
  char buf[3001];
  char *q= buf;
  buf[0]= '\0';

  for(Uint32 i=0; i<bytes; i++)
  {
    if(i==((sizeof(buf)/3)-1))
    {
      sprintf(q, "...");
      break;
    }
    sprintf(q+3*i, " %02X", p[i]);
  }
  ndbout_c("%8p: %s", p, buf);
}
#endif

/**
 * Handling COMMIT
 * ---------------
 * The most complex part of our operations on a tuple is when we have
 * multiple row operations on the same tuple within the same
 * transaction. There might even be an insert followed by a delete
 * followed by a new insert followed by an update! The only sequences
 * that aren't allowed are a DELETE followed by a DELETE, an INSERT
 * followed by an INSERT, and a DELETE followed by an UPDATE.
 *
 * Each operation carries with it a copy row. This makes it easy to
 * commit and abort multi-operations on one tuple within one
 * transaction.
 *
 * At the time of the commit we can have multiple operations in a list
 * linked from the row. The "surviving" operation is the one which is
 * last in the list. This is the only operation that will be truly
 * committed. All other copy rows simply represent intermediate states
 * in getting to the committed state. The transaction itself can have
 * seen these uncommitted intermediate states, but no other transaction
 * has the ability to see those intermediate row states.
 *
 * The last operation in the list is the operation linked from the
 * tuple header. The "last" operation in the list was also the last
 * operation prepared.
 *
 * The last operation in the list will be committed for "real". This means
 * that the copy row for the last operation will be copied to the rowid of
 * the row. However the TUX commit triggers are fired on the first operation
 * in the operation list.
 *
 * COMMIT handling of shrinking varparts
 * -------------------------------------
 * The varpart entry header contains the actual length of the varpart
 * allocated from the page. This size might be equal to or bigger than
 * the size of the varpart to be committed. We will always at COMMIT time
 * ensure that we shrink it to the minimum size. It might even be
 * shrunk to 0, in which case we free the varpart entirely.
 *
 * Handling ABORT
 * --------------
 * Given that we have a copy tuple for each row it means that it is very
 * easy to abort operations without aborting the entire transaction. Abort
 * can happen at any time before the commit has started and abort can
 * happen either on the entire transaction or on a subset of the transaction.
 *
 * One example where we can abort a subset of the transaction is when we get
 * an LQHKEYREF returned from the backup replica. In this case we did a
 * successful operation at the primary replica, but at the backup replica
 * we failed for some reason. There might actually even be multiple operations
 * outstanding at the same time since we allow for multiple operations within
 * the same batch to execute in parallel. It is not defined what the end
 * result will be if such a batch has multiple updates on the same row, but
 * we still have to ensure that we can handle those cases in a secure manner.
 *
 * This also means that the code is prepared to allow for aborting to a
 * savepoint. However the functionality that handles this will be in DBTC and
 * is independent of the code here in DBTUP.
 *
 * When aborting an operation we simply drop it from the list of operations
 * on the row, and if it is the last one we also restore the header.
 * This means that an abort operation for a row with multiple changes to it
 * is really easy; it only needs to drop the operation and drop the copy
 * row attached to it.
 *
 * If we increase the size of the varpart for a row we need to extend the
 * size. This means that the header of the varpart will contain the new
 * length. So in order to restore we need to store the original varpart
 * length somewhere.
 *
 * The MM_GROWN bit and its meaning
 * --------------------------------
 * During an operation that increases the size of the varpart we might actually
 * change the location of the committed varpart of the row. To ensure that any
 * readers of the row that do a COMMITTED READ can still see the original
 * row size we store this in the last word of the new varpart. We also set the
 * MM_GROWN bit in the tuple header to indicate this.
 *
 * The consequence of this is that an aborted transaction cannot have changed
 * the row content, but it can have changed the place where the row is stored.
 * The actual row content is however only changed when we commit the
 * transaction; until then the new data is always stored in the copy rows.
 *
 * When aborting we need to care about MM_GROWN since then we have to restore
 * the varpart size by shrinking it. If MM_GROWN is set we might have attempted
 * to shrink the tuple, but this information is only represented by a smaller
 * size of the copy row, and thus when the copy row is freed we have done
 * everything needed to abort this operation.
 *
 * Acceptable order of ABORT and COMMIT and WRITE operations
 * ---------------------------------------------------------
 * The acceptable order of COMMITs is that once a COMMIT has arrived on a row,
 * no ABORT is allowed AND no new WRITE operation on the row in the same
 * transaction is allowed. When the commit is complete the row is
 * unlocked and ready for a new transaction again. COMMIT operations can
 * arrive in any order.
 *
 * Before any operation on the row has received COMMIT we can receive ABORT
 * operations in any order. TUP has no ability to verify that the upper-level
 * ABORT operations are executed correctly. However, since ABORTs can happen in
 * any order it is only vital that the correct operations are ABORTed; it
 * doesn't matter in which order they are ABORTed.
 *
 * The upper level (mainly TC and LQH) will maintain the correctness when it
 * comes to transaction concepts.
 */
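
/**
 * Illustrative sketch (not part of the original file and not compiled):
 * how the operation list described above hangs together at commit time.
 * The row header field m_operation_ptr_i references the last prepared
 * operation; prevActiveOp links lead back to the first operation, where
 * the TUX commit triggers are fired, while the copy row of the *last*
 * operation is the one whose content survives the commit. The loop below
 * mirrors findFirstOp()/execTUP_WRITELOG_REQ and is a simplified
 * assumption, not the production commit path.
 */
#if 0
static void
walk_to_first_op_sketch(Dbtup *tup, Uint32 lastOpI)
{
  OperationrecPtr opPtr;
  opPtr.i = lastOpI;  /* taken from Tuple_header::m_operation_ptr_i */
  ndbrequire(tup->c_operation_pool.getValidPtr(opPtr));
  while (opPtr.p->prevActiveOp != RNIL)
  {
    /* Step backwards towards the oldest operation on the row. */
    opPtr.i = opPtr.p->prevActiveOp;
    ndbrequire(tup->c_operation_pool.getValidPtr(opPtr));
  }
  /* opPtr now references the first operation in the list. */
}
#endif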
void
Dbtup::commit_operation(Signal* signal,
                        Uint32 gci_hi,
                        Uint32 gci_lo,
                        Tuple_header* tuple_ptr,
                        PagePtr pagePtr,
                        Operationrec* regOperPtr,
                        Fragrecord* regFragPtr,
                        Tablerec* regTabPtr,
                        Ptr<GlobalPage> globDiskPagePtr)
{
  ndbassert(regOperPtr->op_type != ZDELETE);

  Uint32 lcpScan_ptr_i= regFragPtr->m_lcp_scan_op;
  Uint32 save= tuple_ptr->m_operation_ptr_i;
  Uint32 bits= tuple_ptr->m_header_bits;

  Tuple_header *disk_ptr= 0;
  Tuple_header *copy= get_copy_tuple(&regOperPtr->m_copy_tuple_location);

  Uint32 copy_bits= copy->m_header_bits;

  Uint32 fixsize= regTabPtr->m_offsets[MM].m_fix_header_size;
  Uint32 mm_vars= regTabPtr->m_attributes[MM].m_no_of_varsize;
  Uint32 mm_dyns= regTabPtr->m_attributes[MM].m_no_of_dynamic;
  bool update_gci_at_commit = ! regOperPtr->op_struct.bit_field.m_gci_written;
  if((mm_vars+mm_dyns) == 0)
  {
    jam();
    memcpy(tuple_ptr, copy, 4*fixsize);
    disk_ptr= (Tuple_header*)(((Uint32*)copy)+fixsize);
  }
  else
  {
    jam();
    /**
     * Var_part_ref is only stored in the *allocated* tuple,
     * so a memcpy from the copy would overwrite it...
     * hence the subtle copyout/assign...
     */
    Local_key tmp;
    Var_part_ref *ref= tuple_ptr->get_var_part_ref_ptr(regTabPtr);
    ref->copyout(&tmp);

    memcpy(tuple_ptr, copy, 4*fixsize);
    ref->assign(&tmp);

    PagePtr vpagePtr;
    if (copy_bits & Tuple_header::VAR_PART)
    {
      jam();
      ndbassert(bits & Tuple_header::VAR_PART);
      ndbassert(tmp.m_page_no != RNIL);
      ndbassert(copy_bits & Tuple_header::COPY_TUPLE);

      Uint32 *dst= get_ptr(&vpagePtr, *ref);
      Var_page* vpagePtrP = (Var_page*)vpagePtr.p;
      Varpart_copy* vp = (Varpart_copy*)copy->get_end_of_fix_part_ptr(regTabPtr);
      /* The first word of the shrunken tuple holds the length in words. */
      Uint32 len = vp->m_len;
      memcpy(dst, vp->m_data, 4*len);

      /**
       * When we come here we will commit a varpart with the length
       * specified in the copy tuple.
       *
       * The length in the page entry specifies the length we have allocated.
       * This means that the page entry length either specifies the original
       * length or the length that we allocated when growing the varsize part
       * of the tuple.
       *
       * The following cases exist:
       * 1) MM_GROWN not set
       *    Since MM_GROWN is never set we have never extended the length
       *    of the varpart. We might however have executed one operation that
       *    shrunk the varpart size followed by an operation that grew the
       *    varpart again. It cannot however have grown bigger than the
       *    original size, since then MM_GROWN would be set.
       *
       *    The new varpart length might thus in this case be smaller than
       *    the page entry length.
       *
       * 2) MM_GROWN set
       *    In this case we have extended the varpart size in some operation.
       *
       *    If no more operations were performed after that, the page entry
       *    length and the committed varpart length will be equal. However if
       *    more operations are executed after this operation, they might
       *    decrease the varpart length without updating the page entry
       *    length. So also in this case we might actually have a smaller
       *    committed varpart length compared to the current page entry
       *    length.
       *
       * So the conclusion is that when we arrive here we can always have a
       * smaller committed varpart length compared to the page entry length.
       * So we always need to check whether we should shrink the varpart
       * entry to the committed length. The new committed length might even
       * be zero, in which case we should release the varpart entirely.
       *
       * We need to check this independently of whether MM_GROWN is set, as
       * there might be multiple row operations both increasing and
       * shrinking the tuple.
       */
      ndbassert(vpagePtrP->get_entry_len(tmp.m_page_idx) >= len);
      if (vpagePtrP->get_entry_len(tmp.m_page_idx) > len)
      {
        /**
         * The page entry is now bigger than it needs to be; we are
         * committing and can thus shrink the entry to its correct size now.
         */
        jam();
        if (len)
        {
          jam();
          ndbassert(regFragPtr->m_varWordsFree >= vpagePtrP->free_space);
          regFragPtr->m_varWordsFree -= vpagePtrP->free_space;
          vpagePtrP->shrink_entry(tmp.m_page_idx, len);
          // Adds the new free space value for the page to the fragment total.
          update_free_page_list(regFragPtr, vpagePtr);
        }
        else
        {
          jam();
          /**
           * We have shrunk the varsize part down to zero, so in this case
           * we don't shrink it, we simply free it.
           */
          free_var_part(regFragPtr, vpagePtr, tmp.m_page_idx);
          tmp.m_page_no = RNIL;
          ref->assign(&tmp);
          copy_bits &= ~(Uint32)Tuple_header::VAR_PART;
        }
      }
      /**
       * Find the disk part after
       * header + fixed MM part + length word + varsize part.
       */
      disk_ptr = (Tuple_header*)(vp->m_data + len);
    }
    else
    {
      jam();
      ndbassert(tmp.m_page_no == RNIL);
      disk_ptr = (Tuple_header*)copy->get_end_of_fix_part_ptr(regTabPtr);
    }
  }

  if (regTabPtr->m_no_of_disk_attributes &&
      (copy_bits & Tuple_header::DISK_INLINE))
  {
    jam();
    Local_key key;
    memcpy(&key, copy->get_disk_ref_ptr(regTabPtr), sizeof(Local_key));
    Uint32 logfile_group_id= regFragPtr->m_logfile_group_id;

    PagePtr diskPagePtr((Tup_page*)globDiskPagePtr.p, globDiskPagePtr.i);
    ndbassert(diskPagePtr.p->m_page_no == key.m_page_no);
    ndbassert(diskPagePtr.p->m_file_no == key.m_file_no);
    Uint32 sz, *dst;
    if(copy_bits & Tuple_header::DISK_ALLOC)
    {
      jam();
      Local_key rowid = regOperPtr->m_tuple_location;
      rowid.m_page_no = pagePtr.p->frag_page_id;
      disk_page_alloc(signal,
                      regTabPtr,
                      regFragPtr,
                      &key,
                      diskPagePtr,
                      gci_hi,
                      &rowid,
                      regOperPtr->m_undo_buffer_space);
    }

    if(regTabPtr->m_attributes[DD].m_no_of_varsize == 0)
    {
      jam();
      sz= regTabPtr->m_offsets[DD].m_fix_header_size;
      dst= ((Fix_page*)diskPagePtr.p)->get_ptr(key.m_page_idx, sz);
    }
    else
    {
      jam();
      dst= ((Var_page*)diskPagePtr.p)->get_ptr(key.m_page_idx);
      sz= ((Var_page*)diskPagePtr.p)->get_entry_len(key.m_page_idx);
    }

    if(! (copy_bits & Tuple_header::DISK_ALLOC))
    {
      jam();
#ifdef DEBUG_PGMAN
      Uint64 lsn =
#endif
        disk_page_undo_update(signal,
                              diskPagePtr.p,
                              &key,
                              dst,
                              sz,
                              gci_hi,
                              logfile_group_id,
                              regOperPtr->m_undo_buffer_space);
      DEB_PGMAN(("disk_page_undo_update: page(%u,%u,%u).%u, LSN(%u,%u), gci: %u",
                 instance(),
                 key.m_file_no,
                 key.m_page_no,
                 key.m_page_idx,
                 Uint32(Uint64(lsn >> 32)),
                 Uint32(Uint64(lsn & 0xFFFFFFFF)),
                 gci_hi));
    }

    memcpy(dst, disk_ptr, 4*sz);
    memcpy(tuple_ptr->get_disk_ref_ptr(regTabPtr), &key, sizeof(Local_key));

    ndbassert(! (disk_ptr->m_header_bits & Tuple_header::FREE));
    copy_bits |= Tuple_header::DISK_PART;
  }

#ifdef DEBUG_INSERT_EXTRA
  if (c_started)
  {
    Local_key rowid = regOperPtr->m_tuple_location;
    rowid.m_page_no = pagePtr.p->frag_page_id;
    g_eventLogger->info("(%u)tab(%u,%u) commit row(%u,%u)",
                        instance(),
                        regFragPtr->fragTableId,
                        regFragPtr->fragmentId,
                        rowid.m_page_no,
                        rowid.m_page_idx);
  }
#endif
  Uint32 lcp_bits = 0;
  if (lcpScan_ptr_i != RNIL &&
      (bits & Tuple_header::ALLOC) &&
      !(bits & (Tuple_header::LCP_SKIP | Tuple_header::LCP_DELETE)))
  {
    jam();
    ScanOpPtr scanOp;
    scanOp.i = lcpScan_ptr_i;
    ndbrequire(c_scanOpPool.getValidPtr(scanOp));
    Local_key rowid = regOperPtr->m_tuple_location;
    rowid.m_page_no = pagePtr.p->frag_page_id;
    if (is_rowid_in_remaining_lcp_set(pagePtr.p,
                                      regFragPtr,
                                      rowid,
                                      *scanOp.p,
                                      0))
    {
      bool all_part;
      ndbrequire(c_backup->is_page_lcp_scanned(rowid.m_page_no,
                                               all_part) != +1);
      if (all_part)
      {
        /**
         * Rows that are inserted during LCPs are never required to be
         * recorded as part of the LCP. This can be avoided in multiple
         * ways; in this case we avoid it by setting a bit in the tuple
         * header.
         */
        jam();
        /* Coverage tested */
        lcp_bits |= Tuple_header::LCP_SKIP;
        DEB_LCP_SKIP(("(%u)Set LCP_SKIP on tab(%u,%u), row(%u,%u)",
                      instance(),
                      regFragPtr->fragTableId,
                      regFragPtr->fragmentId,
                      rowid.m_page_no,
                      rowid.m_page_idx));
      }
      else
      {
        jam();
        /**
         * The row state at the start of the LCP was deleted, so we need
         * to record this to ensure that it doesn't disappear with a later
         * insert operation.
         */
        /* Coverage tested */
        DEB_LCP_DEL(("(%u)Set LCP_DELETE on tab(%u,%u), row(%u,%u)",
                     instance(),
                     regFragPtr->fragTableId,
                     regFragPtr->fragmentId,
                     rowid.m_page_no,
                     rowid.m_page_idx));
        ndbrequire(c_backup->is_partial_lcp_enabled());
        lcp_bits |= Tuple_header::LCP_DELETE;
      }
    }
  }

  /**
   * Here we are copying header bits from the copy row to the main row.
   * We need to ensure that a few bits are retained from the main row
   * that are not necessarily set in the copy row.
   *
   * For example a row could have its LCP_SKIP bit set when it is updated
   * or deleted before the LCP reaches it. After deleting it is important
   * not to clear these bits when starting a new insert on the same row id.
   * This is handled in DbtupExecQuery.cpp. Here we can be committing the
   * same insert, so again it is important not to lose the LCP bits
   * on the main row. The LCP bits are never needed on the copy row since
   * the LCP only cares about the main rows. The LCP can even change
   * the LCP bits between prepare and commit of a row change. Thus it is
   * important not to lose the LCP_SKIP bit here.
   *
   * Similarly for LCP_DELETE we might lose the state after coming here
   * again before the LCP has had time to come and reset the bits.
   *
   * Similarly it is very important not to transport those bits from the
   * copy row back to the main row. These bits should only be used in the
   * main row and we should never take those bits from the copy row back
   * to the main row.
   */

  Uint32 clear=
    Tuple_header::ALLOC | Tuple_header::FREE | Tuple_header::COPY_TUPLE |
    Tuple_header::DISK_ALLOC | Tuple_header::DISK_INLINE |
    Tuple_header::MM_GROWN | Tuple_header::LCP_SKIP |
    Tuple_header::LCP_DELETE;
  copy_bits &= ~(Uint32)clear;
  lcp_bits |= (bits & (Tuple_header::LCP_SKIP | Tuple_header::LCP_DELETE));

  tuple_ptr->m_header_bits= copy_bits | lcp_bits;
  tuple_ptr->m_operation_ptr_i= save;

  Tup_fixsize_page *fix_page = (Tup_fixsize_page*)pagePtr.p;
  fix_page->set_change_maps(regOperPtr->m_tuple_location.m_page_idx);
  fix_page->set_max_gci(gci_hi);
  ndbassert(fix_page->verify_change_maps(jamBuffer()));

  if (regTabPtr->m_bits & Tablerec::TR_RowGCI &&
      update_gci_at_commit)
  {
    jam();
    update_gci(regFragPtr, regTabPtr, tuple_ptr, gci_hi);
    if (regTabPtr->m_bits & Tablerec::TR_ExtraRowGCIBits)
    {
      jam();
      Uint32 attrId = regTabPtr->getExtraAttrId<Tablerec::TR_ExtraRowGCIBits>();
      store_extra_row_bits(attrId, regTabPtr, tuple_ptr, gci_lo,
                           /* truncate */true);
    }
  }
  else
  {
    /**
     * This should be dead code, but we ensure that we don't miss those
     * updates even for those tables.
     *
     * In case of an explicit GCI update we always increment the number of
     * changed rows to ensure we don't miss any updates.
     */
    jam();
    regFragPtr->m_lcp_changed_rows++;
  }
  setChecksum(tuple_ptr, regTabPtr);
  Uint32 average_row_size = regFragPtr->m_average_row_size;
  if (!regOperPtr->op_struct.bit_field.m_tuple_existed_at_start)
  {
    regFragPtr->m_row_count++;
    c_lqh->add_insert_size(average_row_size);
#ifdef DEBUG_ROW_COUNT_INS
    Local_key rowid = regOperPtr->m_tuple_location;
    rowid.m_page_no = pagePtr.p->frag_page_id;
    g_eventLogger->info("(%u) tab(%u,%u) Inserted row(%u,%u)"
                        ", bits: %x, row_count = %llu, tuple_ptr: %p, gci: %u",
                        instance(),
                        regFragPtr->fragTableId,
                        regFragPtr->fragmentId,
                        rowid.m_page_no,
                        rowid.m_page_idx,
                        tuple_ptr->m_header_bits,
                        regFragPtr->m_row_count,
                        tuple_ptr,
                        gci_hi);
#endif
  }
  else
  {
    c_lqh->add_update_size(average_row_size);
  }
}

void
Dbtup::disk_page_commit_callback(Signal* signal,
                                 Uint32 opPtrI, Uint32 page_id)
{
  Uint32 hash_value;
  Uint32 gci_hi, gci_lo;
  Uint32 transId1, transId2;
  OperationrecPtr regOperPtr;
  Ptr<GlobalPage> diskPagePtr;

  jamEntry();

  regOperPtr.i = opPtrI;
  ndbrequire(c_operation_pool.getValidPtr(regOperPtr));
  c_lqh->get_op_info(regOperPtr.p->userpointer, &hash_value, &gci_hi, &gci_lo,
                     &transId1, &transId2);

  TupCommitReq * const tupCommitReq= (TupCommitReq *)signal->getDataPtr();

  tupCommitReq->opPtr= opPtrI;
  tupCommitReq->hashValue= hash_value;
  tupCommitReq->gci_hi= gci_hi;
  tupCommitReq->gci_lo= gci_lo;
  tupCommitReq->diskpage = page_id;
  tupCommitReq->transId1 = transId1;
  tupCommitReq->transId2 = transId2;

  regOperPtr.p->op_struct.bit_field.m_load_diskpage_on_commit= 0;
  regOperPtr.p->m_commit_disk_callback_page= page_id;
  m_global_page_pool.getPtr(diskPagePtr, page_id);

  {
    PagePtr tmp;
    tmp.i = diskPagePtr.i;
    tmp.p = reinterpret_cast<Page*>(diskPagePtr.p);
    disk_page_set_dirty(tmp);
  }

  execTUP_COMMITREQ(signal);
  if(signal->theData[0] == 0)
  {
    jam();
    c_lqh->tupcommit_conf_callback(signal, regOperPtr.p->userpointer);
  }
}

void
Dbtup::disk_page_log_buffer_callback(Signal* signal,
                                     Uint32 opPtrI,
                                     Uint32 unused)
{
  Uint32 hash_value;
  Uint32 gci_hi, gci_lo;
  Uint32 transId1, transId2;
  OperationrecPtr regOperPtr;

  jamEntry();

  regOperPtr.i = opPtrI;
  ndbrequire(c_operation_pool.getValidPtr(regOperPtr));
  c_lqh->get_op_info(regOperPtr.p->userpointer, &hash_value, &gci_hi, &gci_lo,
                     &transId1, &transId2);
  Uint32 page= regOperPtr.p->m_commit_disk_callback_page;

  TupCommitReq * const tupCommitReq= (TupCommitReq *)signal->getDataPtr();

  tupCommitReq->opPtr= opPtrI;
  tupCommitReq->hashValue= hash_value;
  tupCommitReq->gci_hi= gci_hi;
  tupCommitReq->gci_lo= gci_lo;
  tupCommitReq->diskpage = page;
  tupCommitReq->transId1 = transId1;
  tupCommitReq->transId2 = transId2;

  ndbassert(regOperPtr.p->op_struct.bit_field.m_load_diskpage_on_commit == 0);
  regOperPtr.p->op_struct.bit_field.m_wait_log_buffer= 0;

  execTUP_COMMITREQ(signal);
  ndbassert(signal->theData[0] == 0);

  c_lqh->tupcommit_conf_callback(signal, regOperPtr.p->userpointer);
}

int Dbtup::retrieve_data_page(Signal *signal,
                              Page_cache_client::Request req,
                              OperationrecPtr regOperPtr,
                              Ptr<GlobalPage> &diskPagePtr,
                              Fragrecord *fragPtrP)
{
  req.m_callback.m_callbackData= regOperPtr.i;
  req.m_table_id = fragPtrP->fragTableId;
  req.m_fragment_id = fragPtrP->fragmentId;
  req.m_callback.m_callbackFunction =
    safe_cast(&Dbtup::disk_page_commit_callback);

  /*
   * Consider commit to be correlated. Otherwise pk op + commit makes
   * the page hot. XXX move to TUP which knows better.
   */
  int flags= regOperPtr.p->op_type |
    Page_cache_client::COMMIT_REQ | Page_cache_client::CORR_REQ;
  Page_cache_client pgman(this, c_pgman);
  int res= pgman.get_page(signal, req, flags);
  diskPagePtr = pgman.m_ptr;

  switch(res){
  case 0:
    /**
     * Timeslice
     */
    jam();
    signal->theData[0] = 1;
    return res;
  case -1:
    ndbrequire("NOT YET IMPLEMENTED" == 0);
    break;
  default:
    ndbrequire(res > 0);
    jam();
  }
  {
    PagePtr tmpptr;
    tmpptr.i = diskPagePtr.i;
    tmpptr.p = reinterpret_cast<Page*>(diskPagePtr.p);

    disk_page_set_dirty(tmpptr);
  }
  regOperPtr.p->m_commit_disk_callback_page= res;
  regOperPtr.p->op_struct.bit_field.m_load_diskpage_on_commit= 0;

  return res;
}

int Dbtup::retrieve_log_page(Signal *signal,
                             FragrecordPtr regFragPtr,
                             OperationrecPtr regOperPtr)
{
  jam();
  /**
   * Only last op on tuple needs "real" commit,
   * hence only this one should have m_wait_log_buffer
   */

  CallbackPtr cb;
  cb.m_callbackData= regOperPtr.i;
  cb.m_callbackIndex = DISK_PAGE_LOG_BUFFER_CALLBACK;
  Uint32 sz= regOperPtr.p->m_undo_buffer_space;

  int res;
  {
    D("Logfile_client - execTUP_COMMITREQ");
    Logfile_client lgman(this, c_lgman, regFragPtr.p->m_logfile_group_id);
    res= lgman.get_log_buffer(signal, sz, &cb);
  }
  jamEntry();
  switch(res){
  case 0:
    jam();
    signal->theData[0] = 1;
    return res;
  case -1:
    g_eventLogger->warning("Out of space in RG_TRANSACTION_MEMORY resource,"
                           " increase config parameter GlobalSharedMemory");
    ndbrequire("NOT YET IMPLEMENTED" == 0);
    break;
  default:
    jam();
  }
  regOperPtr.p->op_struct.bit_field.m_wait_log_buffer= 0;

  return res;
}

/**
 * Move to the first operation performed on this tuple
 */
void
Dbtup::findFirstOp(OperationrecPtr & firstPtr)
{
  jam();
  printf("Detect out-of-order commit(%u) -> ", firstPtr.i);
  ndbassert(!firstPtr.p->is_first_operation());
  while(firstPtr.p->prevActiveOp != RNIL)
  {
    firstPtr.i = firstPtr.p->prevActiveOp;
    ndbrequire(c_operation_pool.getValidPtr(firstPtr));
  }
  ndbout_c("%u", firstPtr.i);
}

/* ----------------------------------------------------------------- */
/* --------------- COMMIT THIS PART OF A TRANSACTION --------------- */
/* ----------------------------------------------------------------- */
void Dbtup::execTUP_COMMITREQ(Signal* signal)
{
  FragrecordPtr regFragPtr;
  OperationrecPtr regOperPtr;
  TablerecPtr regTabPtr;
  KeyReqStruct req_struct(this, KRS_COMMIT);
  TransState trans_state;
  Ptr<GlobalPage> diskPagePtr;
  Uint32 no_of_fragrec, no_of_tablerec;

  TupCommitReq * const tupCommitReq= (TupCommitReq *)signal->getDataPtr();

  regOperPtr.i= tupCommitReq->opPtr;
  Uint32 hash_value= tupCommitReq->hashValue;
  Uint32 gci_hi = tupCommitReq->gci_hi;
  Uint32 gci_lo = tupCommitReq->gci_lo;
  Uint32 transId1 = tupCommitReq->transId1;
  Uint32 transId2 = tupCommitReq->transId2;

  jamEntry();

  ndbrequire(c_operation_pool.getUncheckedPtrRW(regOperPtr));

  diskPagePtr.i = tupCommitReq->diskpage;
  regFragPtr.i= regOperPtr.p->fragmentPtr;
  no_of_fragrec= cnoOfFragrec;
  no_of_tablerec= cnoOfTablerec;

  req_struct.signal= signal;
  req_struct.hash_value= hash_value;
  req_struct.gci_hi = gci_hi;
  req_struct.gci_lo = gci_lo;

  ndbrequire(Magic::check_ptr(regOperPtr.p));
  trans_state= get_trans_state(regOperPtr.p);

  ndbrequire(trans_state == TRANS_STARTED);
  ptrCheckGuard(regFragPtr, no_of_fragrec, fragrecord);

  regTabPtr.i= regFragPtr.p->fragTableId;

  /* Put transid in req_struct, so detached triggers can access it */
  req_struct.trans_id1 = transId1;
  req_struct.trans_id2 = transId2;
  req_struct.m_reorg = regOperPtr.p->op_struct.bit_field.m_reorg;
  regOperPtr.p->m_commit_disk_callback_page = tupCommitReq->diskpage;

  ptrCheckGuard(regTabPtr, no_of_tablerec, tablerec);
  PagePtr page;
  Tuple_header* tuple_ptr= (Tuple_header*)
    get_ptr(&page, &regOperPtr.p->m_tuple_location, regTabPtr.p);

  Tup_fixsize_page *fix_page = (Tup_fixsize_page*)page.p;
  fix_page->prefetch_change_map();
  NDB_PREFETCH_WRITE(tuple_ptr);

  if (diskPagePtr.i == RNIL)
  {
    jam();
    diskPagePtr.p = 0;
    req_struct.m_disk_page_ptr.i = RNIL;
    req_struct.m_disk_page_ptr.p = 0;
  }
  else
  {
    m_global_page_pool.getPtr(diskPagePtr, diskPagePtr.i);
  }

  ptrCheckGuard(regTabPtr, no_of_tablerec, tablerec);

  prepare_fragptr = regFragPtr;
  prepare_tabptr = regTabPtr;

  /**
   * NOTE: This has to be run before a potential time-slice when
   * waiting for disk, as otherwise the "other ops" in a multi-op
   * commit might run while we're waiting for disk.
   */
  if (!regTabPtr.p->tuxCustomTriggers.isEmpty())
  {
    if(get_tuple_state(regOperPtr.p) == TUPLE_PREPARED)
    {
      jam();

      OperationrecPtr loopPtr = regOperPtr;
      if (unlikely(!regOperPtr.p->is_first_operation()))
      {
        findFirstOp(loopPtr);
      }

      /**
       * Execute all tux triggers at the first commit,
       * since the previous tuple is otherwise removed...
       */
      jam();
      goto first;
      while(loopPtr.i != RNIL)
      {
        ndbrequire(c_operation_pool.getValidPtr(loopPtr));
  first:
        executeTuxCommitTriggers(signal,
                                 loopPtr.p,
                                 regFragPtr.p,
                                 regTabPtr.p);
        set_tuple_state(loopPtr.p, TUPLE_TO_BE_COMMITTED);
        loopPtr.i = loopPtr.p->nextActiveOp;
      }
    }
  }

  bool get_page = false;
  bool initial_delete = false;
  if(regOperPtr.p->op_struct.bit_field.m_load_diskpage_on_commit)
  {
    jam();
    Page_cache_client::Request req;

    /**
     * Only the last op on the tuple needs a "real" commit,
     * hence only this one should have m_load_diskpage_on_commit set.
     */
    ndbassert(tuple_ptr->m_operation_ptr_i == regOperPtr.i);

    /**
     * Check for page
     */
    if(!regOperPtr.p->m_copy_tuple_location.isNull())
    {
      jam();
      Tuple_header* tmp= get_copy_tuple(&regOperPtr.p->m_copy_tuple_location);

      memcpy(&req.m_page,
             tmp->get_disk_ref_ptr(regTabPtr.p), sizeof(Local_key));

      if (unlikely(regOperPtr.p->op_type == ZDELETE &&
                   tmp->m_header_bits & Tuple_header::DISK_ALLOC))
      {
        jam();
        /**
         * Insert+Delete
         * In this case we want to release the copy page tuple that was
         * allocated for the insert operation, since the commit of the
         * delete operation here makes it unnecessary to save the
         * new record.
         */
        regOperPtr.p->op_struct.bit_field.m_load_diskpage_on_commit = 0;
        regOperPtr.p->op_struct.bit_field.m_wait_log_buffer = 0;
        disk_page_abort_prealloc(signal, regFragPtr.p,
                                 &req.m_page, req.m_page.m_page_idx);

        {
          D("Logfile_client - execTUP_COMMITREQ");
          Logfile_client lgman(this,
                               c_lgman,
                               regFragPtr.p->m_logfile_group_id);
          lgman.free_log_space(regOperPtr.p->m_undo_buffer_space,
                               jamBuffer());
        }
        goto skip_disk;
      }
    }
    else
    {
      jam();
      // initial delete
      initial_delete = true;
      ndbassert(regOperPtr.p->op_type == ZDELETE);
      memcpy(&req.m_page,
             tuple_ptr->get_disk_ref_ptr(regTabPtr.p), sizeof(Local_key));

      ndbassert(tuple_ptr->m_header_bits & Tuple_header::DISK_PART);
    }

    if (retrieve_data_page(signal,
                           req,
                           regOperPtr,
                           diskPagePtr,
                           regFragPtr.p) == 0)
    {
      if (!initial_delete)
      {
        jam();
      }
      else
      {
        jam();
        /* Set bit to indicate that the tuple is already deleted */
        Uint32 old_header = tuple_ptr->m_header_bits;
        Uint32 new_header = tuple_ptr->m_header_bits =
          old_header | Tuple_header::DELETE_WAIT;
        updateChecksum(tuple_ptr, regTabPtr.p, old_header, new_header);
      }
      signal->theData[0] = 1; // Ensure we report a real-time break
      return; // Data page has not been retrieved yet.
    }
    get_page = true;
  }

  if(regOperPtr.p->op_struct.bit_field.m_wait_log_buffer)
  {
    jam();
    /**
     * Only the last op on the tuple needs a "real" commit,
     * hence only this one should have m_wait_log_buffer set.
     */
    ndbassert(tuple_ptr->m_operation_ptr_i == regOperPtr.i);

    if (retrieve_log_page(signal, regFragPtr, regOperPtr) == 0)
    {
      if (!initial_delete)
      {
        jam();
      }
      else
      {
        jam();
        /* Set bit to indicate that the tuple is already deleted */
        Uint32 old_header = tuple_ptr->m_header_bits;
        Uint32 new_header = tuple_ptr->m_header_bits =
          old_header | Tuple_header::DELETE_WAIT;
        updateChecksum(tuple_ptr, regTabPtr.p, old_header, new_header);
      }
      signal->theData[0] = 1; // Ensure we report a real-time break
      return; // Log page has not been retrieved yet.
    }
  }

  assert(tuple_ptr);
skip_disk:
  req_struct.m_tuple_ptr = tuple_ptr;

  Uint32 nextOp = regOperPtr.p->nextActiveOp;
  Uint32 prevOp = regOperPtr.p->prevActiveOp;
  /**
   * The trigger code (which is shared between detached/immediate
   * triggers) checks the op-list to decide where to read before-values
   * from. Detached triggers should always read the original tuple value
   * from before the transaction start, not from any intermediate update.
   *
   * Clearing the op-list here has this effect.
   */
  regOperPtr.p->nextActiveOp = RNIL;
  regOperPtr.p->prevActiveOp = RNIL;
  if(tuple_ptr->m_operation_ptr_i == regOperPtr.i)
  {
    jam();
    /**
     * Perform "real" commit
     */
    Uint32 disk = regOperPtr.p->m_commit_disk_callback_page;
    set_commit_change_mask_info(regTabPtr.p, &req_struct, regOperPtr.p);
    checkDetachedTriggers(&req_struct,
                          regOperPtr.p,
                          regTabPtr.p,
                          disk != RNIL,
                          diskPagePtr.i);

    tuple_ptr->m_operation_ptr_i = RNIL;

    if (regOperPtr.p->op_type == ZDELETE)
    {
      jam();
      if (get_page)
      {
        ndbassert(tuple_ptr->m_header_bits & Tuple_header::DISK_PART);
      }
      dealloc_tuple(signal,
                    gci_hi,
                    gci_lo,
                    page.p,
                    tuple_ptr,
                    &req_struct,
                    regOperPtr.p,
                    regFragPtr.p,
                    regTabPtr.p,
                    diskPagePtr);
    }
    else if(regOperPtr.p->op_type != ZREFRESH)
    {
      jam();
      commit_operation(signal,
                       gci_hi,
                       gci_lo,
                       tuple_ptr,
                       page,
                       regOperPtr.p,
                       regFragPtr.p,
                       regTabPtr.p,
                       diskPagePtr);
    }
    else
    {
      jam();
      commit_refresh(signal,
                     gci_hi,
                     gci_lo,
                     tuple_ptr,
                     page,
                     &req_struct,
                     regOperPtr.p,
                     regFragPtr.p,
                     regTabPtr.p,
                     diskPagePtr);
    }
  }

  if (nextOp != RNIL)
  {
    OperationrecPtr opPtr;
    opPtr.i = nextOp;
    ndbrequire(c_operation_pool.getValidPtr(opPtr));
    opPtr.p->prevActiveOp = prevOp;
  }

  if (prevOp != RNIL)
  {
    OperationrecPtr opPtr;
    opPtr.i = prevOp;
    ndbrequire(c_operation_pool.getValidPtr(opPtr));
    opPtr.p->nextActiveOp = nextOp;
  }

  if(!regOperPtr.p->m_copy_tuple_location.isNull())
  {
    jam();
    c_undo_buffer.free_copy_tuple(&regOperPtr.p->m_copy_tuple_location);
  }

  regFragPtr.p->m_committed_changes++;

  initOpConnection(regOperPtr.p);
  signal->theData[0] = 0;
}

void
Dbtup::set_commit_change_mask_info(const Tablerec* regTabPtr,
                                   KeyReqStruct * req_struct,
                                   const Operationrec * regOperPtr)
{
  Uint32 masklen = (regTabPtr->m_no_of_attributes + 31) >> 5;
  if (regOperPtr->m_copy_tuple_location.isNull())
  {
    ndbassert(regOperPtr->op_type == ZDELETE);
    req_struct->changeMask.set();
  }
  else
  {
    Uint32 * dst = req_struct->changeMask.rep.data;
    Uint32 * rawptr = get_copy_tuple_raw(&regOperPtr->m_copy_tuple_location);
    ChangeMask * maskptr = get_change_mask_ptr(rawptr);
    Uint32 cols = maskptr->m_cols;
    if (cols == regTabPtr->m_no_of_attributes)
    {
      memcpy(dst, maskptr->m_mask, 4*masklen);
    }
    else
    {
      ndbassert(regTabPtr->m_no_of_attributes > cols); // no drop column
      memcpy(dst, maskptr->m_mask, 4*((cols + 31) >> 5));
      req_struct->changeMask.setRange(cols,
                                      regTabPtr->m_no_of_attributes - cols);
    }
  }
}

void
Dbtup::commit_refresh(Signal* signal,
                      Uint32 gci_hi,
                      Uint32 gci_lo,
                      Tuple_header* tuple_ptr,
                      PagePtr pagePtr,
                      KeyReqStruct * req_struct,
                      Operationrec* regOperPtr,
                      Fragrecord* regFragPtr,
                      Tablerec* regTabPtr,
                      Ptr<GlobalPage> diskPagePtr)
{
  /* Committing a refresh operation.
   * Refresh of an existing row looks like an update
   * and can commit normally.
   * Refresh of a non-existing row looks like an Insert which
   * is 'undone' at commit time.
   * This is achieved by making special calls to ACC to get
   * it to forget, before deallocating the tuple locally.
   */
  switch(regOperPtr->m_copy_tuple_location.m_file_no){
  case Operationrec::RF_SINGLE_NOT_EXIST:
  case Operationrec::RF_MULTI_NOT_EXIST:
    break;
  case Operationrec::RF_SINGLE_EXIST:
  case Operationrec::RF_MULTI_EXIST:
    // "Normal" update
    commit_operation(signal,
                     gci_hi,
                     gci_lo,
                     tuple_ptr,
                     pagePtr,
                     regOperPtr,
                     regFragPtr,
                     regTabPtr,
                     diskPagePtr);
    return;

  default:
    ndbabort();
  }

  Local_key key = regOperPtr->m_tuple_location;
  key.m_page_no = pagePtr.p->frag_page_id;

  /**
   * Tell ACC to delete
   */
  c_lqh->accremoverow(signal, regOperPtr->userpointer, &key);
  dealloc_tuple(signal,
                gci_hi,
                gci_lo,
                pagePtr.p,
                tuple_ptr,
                req_struct,
                regOperPtr,
                regFragPtr,
                regTabPtr,
                diskPagePtr);
}