1 /*
2 Copyright (c) 2000, 2016, Oracle and/or its affiliates.
3 Copyright (c) 2009, 2020, MariaDB Corporation.
4
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation; version 2 of the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA
17 */
18
19 #include "mariadb.h"
20 #include "sql_class.h"
21 #include "transaction.h"
22 #include "my_cpu.h"
23 #include <pfs_transaction_provider.h>
24 #include <mysql/psi/mysql_transaction.h>
25
26 static bool slave_applier_reset_xa_trans(THD *thd);
27
28 /***************************************************************************
29 Handling of XA id caching
30 ***************************************************************************/
31 struct XID_cache_insert_element
32 {
33 enum xa_states xa_state;
34 XID *xid;
35 XID_cache_element *xid_cache_element;
36
XID_cache_insert_elementXID_cache_insert_element37 XID_cache_insert_element(enum xa_states xa_state_arg, XID *xid_arg):
38 xa_state(xa_state_arg), xid(xid_arg) {}
39 };
40
41
42 class XID_cache_element
43 {
44 /*
45 m_state is used to prevent elements from being deleted while XA RECOVER
46 iterates xid cache and to prevent recovered elments from being acquired by
47 multiple threads.
48
49 bits 1..29 are reference counter
50 bit 30 is RECOVERED flag
51 bit 31 is ACQUIRED flag (thread owns this xid)
52 bit 32 is unused
53
54 Newly allocated and deleted elements have m_state set to 0.
55
56 On lock() m_state is atomically incremented. It also creates load-ACQUIRE
57 memory barrier to make sure m_state is actually updated before furhter
58 memory accesses. Attempting to lock an element that has neither ACQUIRED
59 nor RECOVERED flag set returns failure and further accesses to element
60 memory are forbidden.
61
62 On unlock() m_state is decremented. It also creates store-RELEASE memory
63 barrier to make sure m_state is actually updated after preceding memory
64 accesses.
65
66 ACQUIRED flag is set when thread registers it's xid or when thread acquires
67 recovered xid.
68
69 RECOVERED flag is set for elements found during crash recovery.
70
71 ACQUIRED and RECOVERED flags are cleared before element is deleted from
72 hash in a spin loop, after last reference is released.
73 */
74 std::atomic<int32_t> m_state;
75 public:
76 static const int32 ACQUIRED= 1 << 30;
77 static const int32 RECOVERED= 1 << 29;
78 /* Error reported by the Resource Manager (RM) to the Transaction Manager. */
79 uint rm_error;
80 enum xa_states xa_state;
81 XID xid;
is_set(int32_t flag)82 bool is_set(int32_t flag)
83 { return m_state.load(std::memory_order_relaxed) & flag; }
set(int32_t flag)84 void set(int32_t flag)
85 {
86 DBUG_ASSERT(!is_set(ACQUIRED | RECOVERED));
87 m_state.fetch_add(flag, std::memory_order_relaxed);
88 }
lock()89 bool lock()
90 {
91 int32_t old= m_state.fetch_add(1, std::memory_order_acquire);
92 if (old & (ACQUIRED | RECOVERED))
93 return true;
94 unlock();
95 return false;
96 }
unlock()97 void unlock()
98 { m_state.fetch_sub(1, std::memory_order_release); }
mark_uninitialized()99 void mark_uninitialized()
100 {
101 int32_t old= ACQUIRED;
102 while (!m_state.compare_exchange_weak(old, 0,
103 std::memory_order_relaxed,
104 std::memory_order_relaxed))
105 {
106 old&= ACQUIRED | RECOVERED;
107 (void) LF_BACKOFF();
108 }
109 }
acquired_to_recovered()110 void acquired_to_recovered()
111 {
112 m_state.fetch_or(RECOVERED, std::memory_order_relaxed);
113 m_state.fetch_and(~ACQUIRED, std::memory_order_release);
114 }
acquire_recovered()115 bool acquire_recovered()
116 {
117 int32_t old= RECOVERED;
118 while (!m_state.compare_exchange_weak(old, ACQUIRED | RECOVERED,
119 std::memory_order_acquire,
120 std::memory_order_relaxed))
121 {
122 if (!(old & RECOVERED) || (old & ACQUIRED))
123 return false;
124 old= RECOVERED;
125 (void) LF_BACKOFF();
126 }
127 return true;
128 }
lf_hash_initializer(LF_HASH * hash,XID_cache_element * element,XID_cache_insert_element * new_element)129 static void lf_hash_initializer(LF_HASH *hash __attribute__((unused)),
130 XID_cache_element *element,
131 XID_cache_insert_element *new_element)
132 {
133 DBUG_ASSERT(!element->is_set(ACQUIRED | RECOVERED));
134 element->rm_error= 0;
135 element->xa_state= new_element->xa_state;
136 element->xid.set(new_element->xid);
137 new_element->xid_cache_element= element;
138 }
lf_alloc_constructor(uchar * ptr)139 static void lf_alloc_constructor(uchar *ptr)
140 {
141 XID_cache_element *element= (XID_cache_element*) (ptr + LF_HASH_OVERHEAD);
142 element->m_state= 0;
143 }
lf_alloc_destructor(uchar * ptr)144 static void lf_alloc_destructor(uchar *ptr)
145 {
146 DBUG_ASSERT(!reinterpret_cast<XID_cache_element*>(ptr + LF_HASH_OVERHEAD)
147 ->is_set(ACQUIRED));
148 }
key(const XID_cache_element * element,size_t * length,my_bool not_used)149 static uchar *key(const XID_cache_element *element, size_t *length,
150 my_bool not_used __attribute__((unused)))
151 {
152 *length= element->xid.key_length();
153 return element->xid.key();
154 }
155 };
156
157
158 static LF_HASH xid_cache;
159 static bool xid_cache_inited;
160
161
get_state_code() const162 enum xa_states XID_STATE::get_state_code() const
163 {
164 return xid_cache_element ? xid_cache_element->xa_state : XA_NO_STATE;
165 }
166
167
fix_xid_hash_pins()168 bool THD::fix_xid_hash_pins()
169 {
170 if (!xid_hash_pins)
171 xid_hash_pins= lf_hash_get_pins(&xid_cache);
172 return !xid_hash_pins;
173 }
174
175
set_error(uint error)176 void XID_STATE::set_error(uint error)
177 {
178 if (is_explicit_XA())
179 xid_cache_element->rm_error= error;
180 }
181
182
er_xaer_rmfail() const183 void XID_STATE::er_xaer_rmfail() const
184 {
185 static const char *xa_state_names[]=
186 { "ACTIVE", "IDLE", "PREPARED", "ROLLBACK ONLY", "NON-EXISTING"};
187 my_error(ER_XAER_RMFAIL, MYF(0), xa_state_names[get_state_code()]);
188 }
189
190
191 /**
192 Check that XA transaction has an uncommitted work. Report an error
193 to the user in case when there is an uncommitted work for XA transaction.
194
195 @return result of check
196 @retval false XA transaction is NOT in state IDLE, PREPARED
197 or ROLLBACK_ONLY.
198 @retval true XA transaction is in state IDLE or PREPARED
199 or ROLLBACK_ONLY.
200 */
201
check_has_uncommitted_xa() const202 bool XID_STATE::check_has_uncommitted_xa() const
203 {
204 if (is_explicit_XA() && xid_cache_element->xa_state != XA_ACTIVE)
205 {
206 er_xaer_rmfail();
207 return true;
208 }
209 return false;
210 }
211
212
get_xid() const213 XID *XID_STATE::get_xid() const
214 {
215 DBUG_ASSERT(is_explicit_XA());
216 return &xid_cache_element->xid;
217 }
218
219
xid_cache_init()220 void xid_cache_init()
221 {
222 xid_cache_inited= true;
223 lf_hash_init(&xid_cache, sizeof(XID_cache_element), LF_HASH_UNIQUE, 0, 0,
224 (my_hash_get_key) XID_cache_element::key, &my_charset_bin);
225 xid_cache.alloc.constructor= XID_cache_element::lf_alloc_constructor;
226 xid_cache.alloc.destructor= XID_cache_element::lf_alloc_destructor;
227 xid_cache.initializer=
228 (lf_hash_initializer) XID_cache_element::lf_hash_initializer;
229 }
230
231
xid_cache_free()232 void xid_cache_free()
233 {
234 if (xid_cache_inited)
235 {
236 lf_hash_destroy(&xid_cache);
237 xid_cache_inited= false;
238 }
239 }
240
241
242 /**
243 Find recovered XA transaction by XID.
244 */
245
xid_cache_search(THD * thd,XID * xid)246 static XID_cache_element *xid_cache_search(THD *thd, XID *xid)
247 {
248 DBUG_ASSERT(thd->xid_hash_pins);
249 XID_cache_element *element=
250 (XID_cache_element*) lf_hash_search(&xid_cache, thd->xid_hash_pins,
251 xid->key(), xid->key_length());
252 if (element)
253 {
254 if (!element->acquire_recovered())
255 element= 0;
256 lf_hash_search_unpin(thd->xid_hash_pins);
257 DEBUG_SYNC(thd, "xa_after_search");
258 }
259 return element;
260 }
261
262
xid_cache_insert(XID * xid)263 bool xid_cache_insert(XID *xid)
264 {
265 XID_cache_insert_element new_element(XA_PREPARED, xid);
266 LF_PINS *pins;
267
268 if (!(pins= lf_hash_get_pins(&xid_cache)))
269 return true;
270
271 int res= lf_hash_insert(&xid_cache, pins, &new_element);
272 switch (res)
273 {
274 case 0:
275 new_element.xid_cache_element->set(XID_cache_element::RECOVERED);
276 break;
277 case 1:
278 res= 0;
279 }
280 lf_hash_put_pins(pins);
281 return res;
282 }
283
284
xid_cache_insert(THD * thd,XID_STATE * xid_state,XID * xid)285 bool xid_cache_insert(THD *thd, XID_STATE *xid_state, XID *xid)
286 {
287 XID_cache_insert_element new_element(XA_ACTIVE, xid);
288
289 if (thd->fix_xid_hash_pins())
290 return true;
291
292 int res= lf_hash_insert(&xid_cache, thd->xid_hash_pins, &new_element);
293 switch (res)
294 {
295 case 0:
296 xid_state->xid_cache_element= new_element.xid_cache_element;
297 xid_state->xid_cache_element->set(XID_cache_element::ACQUIRED);
298 break;
299 case 1:
300 my_error(ER_XAER_DUPID, MYF(0));
301 }
302 return res;
303 }
304
305
xid_cache_delete(THD * thd,XID_cache_element * & element)306 static void xid_cache_delete(THD *thd, XID_cache_element *&element)
307 {
308 DBUG_ASSERT(thd->xid_hash_pins);
309 element->mark_uninitialized();
310 lf_hash_delete(&xid_cache, thd->xid_hash_pins,
311 element->xid.key(), element->xid.key_length());
312 }
313
314
xid_cache_delete(THD * thd,XID_STATE * xid_state)315 void xid_cache_delete(THD *thd, XID_STATE *xid_state)
316 {
317 DBUG_ASSERT(xid_state->is_explicit_XA());
318 xid_cache_delete(thd, xid_state->xid_cache_element);
319 xid_state->xid_cache_element= 0;
320 }
321
322
323 struct xid_cache_iterate_arg
324 {
325 my_hash_walk_action action;
326 void *argument;
327 };
328
xid_cache_iterate_callback(XID_cache_element * element,xid_cache_iterate_arg * arg)329 static my_bool xid_cache_iterate_callback(XID_cache_element *element,
330 xid_cache_iterate_arg *arg)
331 {
332 my_bool res= FALSE;
333 if (element->lock())
334 {
335 res= arg->action(element, arg->argument);
336 element->unlock();
337 }
338 return res;
339 }
340
xid_cache_iterate(THD * thd,my_hash_walk_action action,void * arg)341 static int xid_cache_iterate(THD *thd, my_hash_walk_action action, void *arg)
342 {
343 xid_cache_iterate_arg argument= { action, arg };
344 return thd->fix_xid_hash_pins() ? -1 :
345 lf_hash_iterate(&xid_cache, thd->xid_hash_pins,
346 (my_hash_walk_action) xid_cache_iterate_callback,
347 &argument);
348 }
349
350
351 /**
352 Mark a XA transaction as rollback-only if the RM unilaterally
353 rolled back the transaction branch.
354
355 @note If a rollback was requested by the RM, this function sets
356 the appropriate rollback error code and transits the state
357 to XA_ROLLBACK_ONLY.
358
359 @return TRUE if transaction was rolled back or if the transaction
360 state is XA_ROLLBACK_ONLY. FALSE otherwise.
361 */
xa_trans_rolled_back(XID_cache_element * element)362 static bool xa_trans_rolled_back(XID_cache_element *element)
363 {
364 if (element->rm_error)
365 {
366 switch (element->rm_error) {
367 case ER_LOCK_WAIT_TIMEOUT:
368 my_error(ER_XA_RBTIMEOUT, MYF(0));
369 break;
370 case ER_LOCK_DEADLOCK:
371 my_error(ER_XA_RBDEADLOCK, MYF(0));
372 break;
373 default:
374 my_error(ER_XA_RBROLLBACK, MYF(0));
375 }
376 element->xa_state= XA_ROLLBACK_ONLY;
377 }
378
379 return element->xa_state == XA_ROLLBACK_ONLY;
380 }
381
382
383 /**
384 Rollback the active XA transaction.
385
386 @return TRUE if the rollback failed, FALSE otherwise.
387 */
388
xa_trans_force_rollback(THD * thd)389 bool xa_trans_force_rollback(THD *thd)
390 {
391 bool rc= false;
392
393 if (ha_rollback_trans(thd, true))
394 {
395 my_error(ER_XAER_RMERR, MYF(0));
396 rc= true;
397 }
398 thd->variables.option_bits&=
399 ~(OPTION_BEGIN | OPTION_KEEP_LOG | OPTION_GTID_BEGIN);
400 thd->transaction->all.reset();
401 thd->server_status&=
402 ~(SERVER_STATUS_IN_TRANS | SERVER_STATUS_IN_TRANS_READONLY);
403 DBUG_PRINT("info", ("clearing SERVER_STATUS_IN_TRANS"));
404 xid_cache_delete(thd, &thd->transaction->xid_state);
405
406 trans_track_end_trx(thd);
407 thd->mdl_context.release_transactional_locks(thd);
408
409 return rc;
410 }
411
412
413 /**
414 Starts an XA transaction with the given xid value.
415
416 @param thd Current thread
417
418 @retval FALSE Success
419 @retval TRUE Failure
420 */
421
trans_xa_start(THD * thd)422 bool trans_xa_start(THD *thd)
423 {
424 DBUG_ENTER("trans_xa_start");
425
426 if (thd->transaction->xid_state.is_explicit_XA() &&
427 thd->transaction->xid_state.xid_cache_element->xa_state == XA_IDLE &&
428 thd->lex->xa_opt == XA_RESUME)
429 {
430 bool not_equal=
431 !thd->transaction->xid_state.xid_cache_element->xid.eq(thd->lex->xid);
432 if (not_equal)
433 my_error(ER_XAER_NOTA, MYF(0));
434 else
435 {
436 thd->transaction->xid_state.xid_cache_element->xa_state= XA_ACTIVE;
437 MYSQL_SET_TRANSACTION_XA_STATE(thd->m_transaction_psi, XA_ACTIVE);
438 }
439 DBUG_RETURN(not_equal);
440 }
441
442 /* TODO: JOIN is not supported yet. */
443 if (thd->lex->xa_opt != XA_NONE)
444 my_error(ER_XAER_INVAL, MYF(0));
445 else if (!thd->lex->xid->gtrid_length)
446 my_error(ER_XAER_INVAL, MYF(0));
447 else if (thd->transaction->xid_state.is_explicit_XA())
448 thd->transaction->xid_state.er_xaer_rmfail();
449 else if (thd->locked_tables_mode || thd->in_active_multi_stmt_transaction())
450 my_error(ER_XAER_OUTSIDE, MYF(0));
451 else if (!trans_begin(thd))
452 {
453 MYSQL_SET_TRANSACTION_XID(thd->m_transaction_psi, thd->lex->xid, XA_ACTIVE);
454 if (xid_cache_insert(thd, &thd->transaction->xid_state, thd->lex->xid))
455 {
456 trans_rollback(thd);
457 DBUG_RETURN(true);
458 }
459 DBUG_RETURN(FALSE);
460 }
461
462 DBUG_RETURN(TRUE);
463 }
464
465
466 /**
467 Put a XA transaction in the IDLE state.
468
469 @param thd Current thread
470
471 @retval FALSE Success
472 @retval TRUE Failure
473 */
474
trans_xa_end(THD * thd)475 bool trans_xa_end(THD *thd)
476 {
477 DBUG_ENTER("trans_xa_end");
478
479 /* TODO: SUSPEND and FOR MIGRATE are not supported yet. */
480 if (thd->lex->xa_opt != XA_NONE)
481 my_error(ER_XAER_INVAL, MYF(0));
482 else if (!thd->transaction->xid_state.is_explicit_XA() ||
483 thd->transaction->xid_state.xid_cache_element->xa_state != XA_ACTIVE)
484 thd->transaction->xid_state.er_xaer_rmfail();
485 else if (!thd->transaction->xid_state.xid_cache_element->xid.eq(thd->lex->xid))
486 my_error(ER_XAER_NOTA, MYF(0));
487 else if (!xa_trans_rolled_back(thd->transaction->xid_state.xid_cache_element))
488 {
489 thd->transaction->xid_state.xid_cache_element->xa_state= XA_IDLE;
490 MYSQL_SET_TRANSACTION_XA_STATE(thd->m_transaction_psi, XA_IDLE);
491 }
492
493 DBUG_RETURN(thd->is_error() ||
494 thd->transaction->xid_state.xid_cache_element->xa_state != XA_IDLE);
495 }
496
497
498 /**
499 Put a XA transaction in the PREPARED state.
500
501 @param thd Current thread
502
503 @retval FALSE Success
504 @retval TRUE Failure
505 */
506
trans_xa_prepare(THD * thd)507 bool trans_xa_prepare(THD *thd)
508 {
509 int res= 1;
510
511 DBUG_ENTER("trans_xa_prepare");
512
513 if (!thd->transaction->xid_state.is_explicit_XA() ||
514 thd->transaction->xid_state.xid_cache_element->xa_state != XA_IDLE)
515 thd->transaction->xid_state.er_xaer_rmfail();
516 else if (!thd->transaction->xid_state.xid_cache_element->xid.eq(thd->lex->xid))
517 my_error(ER_XAER_NOTA, MYF(0));
518 else
519 {
520 /*
521 Acquire metadata lock which will ensure that COMMIT is blocked
522 by active FLUSH TABLES WITH READ LOCK (and vice versa COMMIT in
523 progress blocks FTWRL).
524
525 We allow FLUSHer to COMMIT; we assume FLUSHer knows what it does.
526 */
527 MDL_request mdl_request;
528 MDL_REQUEST_INIT(&mdl_request, MDL_key::BACKUP, "", "", MDL_BACKUP_COMMIT,
529 MDL_STATEMENT);
530 if (thd->mdl_context.acquire_lock(&mdl_request,
531 thd->variables.lock_wait_timeout) ||
532 ha_prepare(thd))
533 {
534 if (!mdl_request.ticket)
535 ha_rollback_trans(thd, TRUE);
536 thd->variables.option_bits&= ~(OPTION_BEGIN | OPTION_KEEP_LOG);
537 thd->transaction->all.reset();
538 thd->server_status&=
539 ~(SERVER_STATUS_IN_TRANS | SERVER_STATUS_IN_TRANS_READONLY);
540 xid_cache_delete(thd, &thd->transaction->xid_state);
541 my_error(ER_XA_RBROLLBACK, MYF(0));
542 }
543 else
544 {
545 thd->transaction->xid_state.xid_cache_element->xa_state= XA_PREPARED;
546 MYSQL_SET_TRANSACTION_XA_STATE(thd->m_transaction_psi, XA_PREPARED);
547 res= thd->variables.pseudo_slave_mode || thd->slave_thread ?
548 slave_applier_reset_xa_trans(thd) : 0;
549 }
550 }
551
552 DBUG_RETURN(res);
553 }
554
555
556 /**
557 Commit and terminate the a XA transaction.
558 Transactional locks are released if transaction ended
559
560 @param thd Current thread
561
562 @retval FALSE Success
563 @retval TRUE Failure
564
565 */
566
trans_xa_commit(THD * thd)567 bool trans_xa_commit(THD *thd)
568 {
569 bool res= true;
570 XID_STATE &xid_state= thd->transaction->xid_state;
571
572 DBUG_ENTER("trans_xa_commit");
573
574 if (!xid_state.is_explicit_XA() ||
575 !xid_state.xid_cache_element->xid.eq(thd->lex->xid))
576 {
577 if (thd->in_multi_stmt_transaction_mode())
578 {
579 /*
580 Not allow to commit from inside an not-"native" to xid
581 ongoing transaction: the commit effect can't be reversed.
582 */
583 my_error(ER_XAER_OUTSIDE, MYF(0));
584 DBUG_RETURN(TRUE);
585 }
586 if (thd->lex->xa_opt != XA_NONE)
587 {
588 /*
589 Not allow to commit with one phase a prepared xa out of compatibility
590 with the native commit branch's error out.
591 */
592 my_error(ER_XAER_INVAL, MYF(0));
593 DBUG_RETURN(TRUE);
594 }
595 if (thd->fix_xid_hash_pins())
596 {
597 my_error(ER_OUT_OF_RESOURCES, MYF(0));
598 DBUG_RETURN(TRUE);
599 }
600
601 if (auto xs= xid_cache_search(thd, thd->lex->xid))
602 {
603 res= xa_trans_rolled_back(xs);
604 /*
605 Acquire metadata lock which will ensure that COMMIT is blocked
606 by active FLUSH TABLES WITH READ LOCK (and vice versa COMMIT in
607 progress blocks FTWRL).
608
609 We allow FLUSHer to COMMIT; we assume FLUSHer knows what it does.
610 */
611 MDL_request mdl_request;
612 MDL_REQUEST_INIT(&mdl_request, MDL_key::BACKUP, "", "", MDL_BACKUP_COMMIT,
613 MDL_STATEMENT);
614 if (thd->mdl_context.acquire_lock(&mdl_request,
615 thd->variables.lock_wait_timeout))
616 {
617 /*
618 We can't rollback an XA transaction on lock failure due to
619 Innodb redo log and bin log update is involved in rollback.
620 Return error to user for a retry.
621 */
622 DBUG_ASSERT(thd->is_error());
623
624 xs->acquired_to_recovered();
625 DBUG_RETURN(true);
626 }
627 DBUG_ASSERT(!xid_state.xid_cache_element);
628
629 if (thd->wait_for_prior_commit())
630 {
631 DBUG_ASSERT(thd->is_error());
632
633 xs->acquired_to_recovered();
634 DBUG_RETURN(true);
635 }
636
637 xid_state.xid_cache_element= xs;
638 ha_commit_or_rollback_by_xid(thd->lex->xid, !res);
639 xid_state.xid_cache_element= 0;
640
641 res= res || thd->is_error();
642 xid_cache_delete(thd, xs);
643 }
644 else
645 my_error(ER_XAER_NOTA, MYF(0));
646 DBUG_RETURN(res);
647 }
648
649 if (xa_trans_rolled_back(xid_state.xid_cache_element))
650 {
651 xa_trans_force_rollback(thd);
652 DBUG_RETURN(thd->is_error());
653 }
654 else if (xid_state.xid_cache_element->xa_state == XA_IDLE &&
655 thd->lex->xa_opt == XA_ONE_PHASE)
656 {
657 int r= ha_commit_trans(thd, TRUE);
658 if ((res= MY_TEST(r)))
659 my_error(r == 1 ? ER_XA_RBROLLBACK : ER_XAER_RMERR, MYF(0));
660 }
661 else if (thd->transaction->xid_state.xid_cache_element->xa_state == XA_PREPARED)
662 {
663 MDL_request mdl_request;
664 if (thd->lex->xa_opt != XA_NONE)
665 {
666 my_error(ER_XAER_INVAL, MYF(0));
667 DBUG_RETURN(TRUE);
668 }
669
670 /*
671 Acquire metadata lock which will ensure that COMMIT is blocked
672 by active FLUSH TABLES WITH READ LOCK (and vice versa COMMIT in
673 progress blocks FTWRL).
674
675 We allow FLUSHer to COMMIT; we assume FLUSHer knows what it does.
676 */
677 MDL_REQUEST_INIT(&mdl_request, MDL_key::BACKUP, "", "", MDL_BACKUP_COMMIT,
678 MDL_TRANSACTION);
679
680 if (thd->mdl_context.acquire_lock(&mdl_request,
681 thd->variables.lock_wait_timeout))
682 {
683 /*
684 We can't rollback an XA transaction on lock failure due to
685 Innodb redo log and bin log update is involved in rollback.
686 Return error to user for a retry.
687 */
688 my_error(ER_XAER_RMERR, MYF(0));
689 DBUG_RETURN(true);
690 }
691 else
692 {
693 DEBUG_SYNC(thd, "trans_xa_commit_after_acquire_commit_lock");
694
695 res= MY_TEST(ha_commit_one_phase(thd, 1));
696 if (res)
697 my_error(ER_XAER_RMERR, MYF(0));
698 else
699 {
700 /*
701 Since we don't call ha_commit_trans() for prepared transactions,
702 we need to explicitly mark the transaction as committed.
703 */
704 MYSQL_COMMIT_TRANSACTION(thd->m_transaction_psi);
705 }
706
707 thd->m_transaction_psi= NULL;
708 }
709 }
710 else
711 {
712 xid_state.er_xaer_rmfail();
713 DBUG_RETURN(TRUE);
714 }
715
716 thd->variables.option_bits&= ~(OPTION_BEGIN | OPTION_KEEP_LOG);
717 thd->transaction->all.reset();
718 thd->server_status&=
719 ~(SERVER_STATUS_IN_TRANS | SERVER_STATUS_IN_TRANS_READONLY);
720 DBUG_PRINT("info", ("clearing SERVER_STATUS_IN_TRANS"));
721 xid_cache_delete(thd, &xid_state);
722
723 trans_track_end_trx(thd);
724 thd->mdl_context.release_transactional_locks(thd);
725
726 /* The transaction should be marked as complete in P_S. */
727 DBUG_ASSERT(thd->m_transaction_psi == NULL || res);
728 DBUG_RETURN(res);
729 }
730
731
732 /**
733 Roll back and terminate a XA transaction.
734 Transactional locks are released if transaction ended
735
736 @param thd Current thread
737
738 @retval FALSE Success
739 @retval TRUE Failure
740 */
741
trans_xa_rollback(THD * thd)742 bool trans_xa_rollback(THD *thd)
743 {
744 XID_STATE &xid_state= thd->transaction->xid_state;
745
746 DBUG_ENTER("trans_xa_rollback");
747
748 if (!xid_state.is_explicit_XA() ||
749 !xid_state.xid_cache_element->xid.eq(thd->lex->xid))
750 {
751 if (thd->in_multi_stmt_transaction_mode())
752 {
753 my_error(ER_XAER_OUTSIDE, MYF(0));
754 DBUG_RETURN(TRUE);
755 }
756 if (thd->fix_xid_hash_pins())
757 {
758 my_error(ER_OUT_OF_RESOURCES, MYF(0));
759 DBUG_RETURN(TRUE);
760 }
761
762 if (auto xs= xid_cache_search(thd, thd->lex->xid))
763 {
764 MDL_request mdl_request;
765 MDL_REQUEST_INIT(&mdl_request, MDL_key::BACKUP, "", "", MDL_BACKUP_COMMIT,
766 MDL_STATEMENT);
767 if (thd->mdl_context.acquire_lock(&mdl_request,
768 thd->variables.lock_wait_timeout))
769 {
770 /*
771 We can't rollback an XA transaction on lock failure due to
772 Innodb redo log and bin log update is involved in rollback.
773 Return error to user for a retry.
774 */
775 DBUG_ASSERT(thd->is_error());
776
777 xs->acquired_to_recovered();
778 DBUG_RETURN(true);
779 }
780 xa_trans_rolled_back(xs);
781 DBUG_ASSERT(!xid_state.xid_cache_element);
782
783 if (thd->wait_for_prior_commit())
784 {
785 DBUG_ASSERT(thd->is_error());
786 xs->acquired_to_recovered();
787 DBUG_RETURN(true);
788 }
789
790 xid_state.xid_cache_element= xs;
791 ha_commit_or_rollback_by_xid(thd->lex->xid, 0);
792 xid_state.xid_cache_element= 0;
793 xid_cache_delete(thd, xs);
794 }
795 else
796 my_error(ER_XAER_NOTA, MYF(0));
797 DBUG_RETURN(thd->get_stmt_da()->is_error());
798 }
799
800 if (xid_state.xid_cache_element->xa_state == XA_ACTIVE)
801 {
802 xid_state.er_xaer_rmfail();
803 DBUG_RETURN(TRUE);
804 }
805
806 MDL_request mdl_request;
807 MDL_REQUEST_INIT(&mdl_request, MDL_key::BACKUP, "", "", MDL_BACKUP_COMMIT,
808 MDL_STATEMENT);
809 if (thd->mdl_context.acquire_lock(&mdl_request,
810 thd->variables.lock_wait_timeout))
811 {
812 /*
813 We can't rollback an XA transaction on lock failure due to
814 Innodb redo log and bin log update is involved in rollback.
815 Return error to user for a retry.
816 */
817 my_error(ER_XAER_RMERR, MYF(0));
818 DBUG_RETURN(true);
819 }
820
821 DBUG_RETURN(xa_trans_force_rollback(thd));
822 }
823
824
trans_xa_detach(THD * thd)825 bool trans_xa_detach(THD *thd)
826 {
827 DBUG_ASSERT(thd->transaction->xid_state.is_explicit_XA());
828
829 if (thd->transaction->xid_state.xid_cache_element->xa_state != XA_PREPARED)
830 return xa_trans_force_rollback(thd);
831 else if (!thd->transaction->all.is_trx_read_write())
832 {
833 thd->transaction->xid_state.set_error(ER_XA_RBROLLBACK);
834 ha_rollback_trans(thd, true);
835 }
836
837 thd->transaction->xid_state.xid_cache_element->acquired_to_recovered();
838 thd->transaction->xid_state.xid_cache_element= 0;
839 thd->transaction->cleanup();
840
841 Ha_trx_info *ha_info, *ha_info_next;
842 for (ha_info= thd->transaction->all.ha_list;
843 ha_info;
844 ha_info= ha_info_next)
845 {
846 ha_info_next= ha_info->next();
847 ha_info->reset(); /* keep it conveniently zero-filled */
848 }
849
850 thd->transaction->all.ha_list= 0;
851 thd->transaction->all.no_2pc= 0;
852 thd->m_transaction_psi= 0;
853 thd->server_status&= ~(SERVER_STATUS_IN_TRANS |
854 SERVER_STATUS_IN_TRANS_READONLY);
855 thd->mdl_context.release_transactional_locks(thd);
856
857 return false;
858 }
859
860
861 /**
862 return the XID as it appears in the SQL function's arguments.
863 So this string can be passed to XA START, XA PREPARE etc...
864
865 @note
866 the 'buf' has to have space for at least SQL_XIDSIZE bytes.
867 */
868
869
/*
  Per-character flag for the 7-bit range: 0 means the character can be
  printed as is, 1 means it has to be converted.

  'a'..'z', 'A'..'Z', '0'..'9'
  and '-' '_' ' ' symbols don't have to be
  converted.
*/

static const char xid_needs_conv[128]=
{
  1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,      /* 0x00 - 0x0f                    */
  1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,      /* 0x10 - 0x1f                    */
  0,1,1,1,1,1,1,1,1,1,1,1,1,0,1,1,      /* 0x20 - 0x2f: ' ' and '-' pass  */
  0,0,0,0,0,0,0,0,0,0,1,1,1,1,1,1,      /* 0x30 - 0x3f: '0'..'9' pass     */
  1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,      /* 0x40 - 0x4f: 'A'..'O' pass     */
  0,0,0,0,0,0,0,0,0,0,0,1,1,1,1,0,      /* 0x50 - 0x5f: 'P'..'Z', '_'     */
  1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,      /* 0x60 - 0x6f: 'a'..'o' pass     */
  0,0,0,0,0,0,0,0,0,0,0,1,1,1,1,1       /* 0x70 - 0x7f: 'p'..'z' pass     */
};
887
888 /*
889 The size of XID string representation in the form
890 'gtrid', 'bqual', formatID
891 see xid_t::get_sql_string() for details.
892 */
893 #define SQL_XIDSIZE (XIDDATASIZE * 2 + 8 + MY_INT64_NUM_DECIMAL_DIGITS)
894
895 /* The 'buf' has to have space for at least SQL_XIDSIZE bytes. */
get_sql_xid(XID * xid,char * buf)896 static uint get_sql_xid(XID *xid, char *buf)
897 {
898 int tot_len= xid->gtrid_length + xid->bqual_length;
899 int i;
900 const char *orig_buf= buf;
901
902 for (i=0; i<tot_len; i++)
903 {
904 uchar c= ((uchar *) xid->data)[i];
905 if (c >= 128 || xid_needs_conv[c])
906 break;
907 }
908
909 if (i >= tot_len)
910 {
911 /* No need to convert characters to hexadecimals. */
912 *buf++= '\'';
913 memcpy(buf, xid->data, xid->gtrid_length);
914 buf+= xid->gtrid_length;
915 *buf++= '\'';
916 if (xid->bqual_length > 0 || xid->formatID != 1)
917 {
918 *buf++= ',';
919 *buf++= '\'';
920 memcpy(buf, xid->data+xid->gtrid_length, xid->bqual_length);
921 buf+= xid->bqual_length;
922 *buf++= '\'';
923 }
924 }
925 else
926 {
927 *buf++= 'X';
928 *buf++= '\'';
929 for (i= 0; i < xid->gtrid_length; i++)
930 {
931 *buf++=_dig_vec_lower[((uchar*) xid->data)[i] >> 4];
932 *buf++=_dig_vec_lower[((uchar*) xid->data)[i] & 0x0f];
933 }
934 *buf++= '\'';
935 if (xid->bqual_length > 0 || xid->formatID != 1)
936 {
937 *buf++= ',';
938 *buf++= 'X';
939 *buf++= '\'';
940 for (; i < tot_len; i++)
941 {
942 *buf++=_dig_vec_lower[((uchar*) xid->data)[i] >> 4];
943 *buf++=_dig_vec_lower[((uchar*) xid->data)[i] & 0x0f];
944 }
945 *buf++= '\'';
946 }
947 }
948
949 if (xid->formatID != 1)
950 {
951 *buf++= ',';
952 buf+= my_longlong10_to_str_8bit(&my_charset_bin, buf,
953 MY_INT64_NUM_DECIMAL_DIGITS, -10, xid->formatID);
954 }
955
956 return (uint)(buf - orig_buf);
957 }
958
959
960 /**
961 return the list of XID's to a client, the same way SHOW commands do.
962
963 @note
964 I didn't find in XA specs that an RM cannot return the same XID twice,
965 so mysql_xa_recover does not filter XID's to ensure uniqueness.
966 It can be easily fixed later, if necessary.
967 */
968
xa_recover_callback(XID_cache_element * xs,Protocol * protocol,char * data,uint data_len,CHARSET_INFO * data_cs)969 static my_bool xa_recover_callback(XID_cache_element *xs, Protocol *protocol,
970 char *data, uint data_len, CHARSET_INFO *data_cs)
971 {
972 if (xs->xa_state == XA_PREPARED)
973 {
974 protocol->prepare_for_resend();
975 protocol->store_longlong((longlong) xs->xid.formatID, FALSE);
976 protocol->store_longlong((longlong) xs->xid.gtrid_length, FALSE);
977 protocol->store_longlong((longlong) xs->xid.bqual_length, FALSE);
978 protocol->store(data, data_len, data_cs);
979 if (protocol->write())
980 return TRUE;
981 }
982 return FALSE;
983 }
984
985
xa_recover_callback_short(XID_cache_element * xs,Protocol * protocol)986 static my_bool xa_recover_callback_short(XID_cache_element *xs,
987 Protocol *protocol)
988 {
989 return xa_recover_callback(xs, protocol, xs->xid.data,
990 xs->xid.gtrid_length + xs->xid.bqual_length, &my_charset_bin);
991 }
992
993
xa_recover_callback_verbose(XID_cache_element * xs,Protocol * protocol)994 static my_bool xa_recover_callback_verbose(XID_cache_element *xs,
995 Protocol *protocol)
996 {
997 char buf[SQL_XIDSIZE];
998 uint len= get_sql_xid(&xs->xid, buf);
999 return xa_recover_callback(xs, protocol, buf, len,
1000 &my_charset_utf8mb3_general_ci);
1001 }
1002
1003
mysql_xa_recover(THD * thd)1004 bool mysql_xa_recover(THD *thd)
1005 {
1006 List<Item> field_list;
1007 Protocol *protocol= thd->protocol;
1008 MEM_ROOT *mem_root= thd->mem_root;
1009 my_hash_walk_action action;
1010 DBUG_ENTER("mysql_xa_recover");
1011
1012 field_list.push_back(new (mem_root)
1013 Item_int(thd, "formatID", 0,
1014 MY_INT32_NUM_DECIMAL_DIGITS), mem_root);
1015 field_list.push_back(new (mem_root)
1016 Item_int(thd, "gtrid_length", 0,
1017 MY_INT32_NUM_DECIMAL_DIGITS), mem_root);
1018 field_list.push_back(new (mem_root)
1019 Item_int(thd, "bqual_length", 0,
1020 MY_INT32_NUM_DECIMAL_DIGITS), mem_root);
1021 {
1022 uint len;
1023 CHARSET_INFO *cs;
1024
1025 if (thd->lex->verbose)
1026 {
1027 len= SQL_XIDSIZE;
1028 cs= &my_charset_utf8mb3_general_ci;
1029 action= (my_hash_walk_action) xa_recover_callback_verbose;
1030 }
1031 else
1032 {
1033 len= XIDDATASIZE;
1034 cs= &my_charset_bin;
1035 action= (my_hash_walk_action) xa_recover_callback_short;
1036 }
1037
1038 field_list.push_back(new (mem_root)
1039 Item_empty_string(thd, "data", len, cs), mem_root);
1040 }
1041
1042 if (protocol->send_result_set_metadata(&field_list,
1043 Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
1044 DBUG_RETURN(1);
1045
1046 if (xid_cache_iterate(thd, action, protocol))
1047 DBUG_RETURN(1);
1048 my_eof(thd);
1049 DBUG_RETURN(0);
1050 }
1051
1052
1053 /**
1054 This is a specific to (pseudo-) slave applier collection of standard cleanup
1055 actions to reset XA transaction state sim to @c ha_commit_one_phase.
1056 THD of the slave applier is dissociated from a transaction object in engine
1057 that continues to exist there.
1058
1059 @param THD current thread
1060 @return the value of is_error()
1061 */
1062
slave_applier_reset_xa_trans(THD * thd)1063 static bool slave_applier_reset_xa_trans(THD *thd)
1064 {
1065 thd->variables.option_bits&= ~(OPTION_BEGIN | OPTION_KEEP_LOG);
1066 thd->server_status&=
1067 ~(SERVER_STATUS_IN_TRANS | SERVER_STATUS_IN_TRANS_READONLY);
1068 DBUG_PRINT("info", ("clearing SERVER_STATUS_IN_TRANS"));
1069
1070 thd->transaction->xid_state.xid_cache_element->acquired_to_recovered();
1071 thd->transaction->xid_state.xid_cache_element= 0;
1072
1073 for (Ha_trx_info *ha_info= thd->transaction->all.ha_list, *ha_info_next;
1074 ha_info; ha_info= ha_info_next)
1075 {
1076 ha_info_next= ha_info->next();
1077 ha_info->reset();
1078 }
1079 thd->transaction->all.ha_list= 0;
1080
1081 ha_close_connection(thd);
1082 thd->transaction->cleanup();
1083 thd->transaction->all.reset();
1084
1085 DBUG_ASSERT(!thd->transaction->all.ha_list);
1086 DBUG_ASSERT(!thd->transaction->all.no_2pc);
1087
1088 thd->has_waiter= false;
1089 MYSQL_COMMIT_TRANSACTION(thd->m_transaction_psi); // TODO/Fixme: commit?
1090 thd->m_transaction_psi= NULL;
1091 return thd->is_error();
1092 }
1093