1 /*
2    Copyright (c) 2000, 2016, Oracle and/or its affiliates.
3    Copyright (c) 2009, 2020, MariaDB Corporation.
4 
5    This program is free software; you can redistribute it and/or modify
6    it under the terms of the GNU General Public License as published by
7    the Free Software Foundation; version 2 of the License.
8 
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13 
14    You should have received a copy of the GNU General Public License
15    along with this program; if not, write to the Free Software
16    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA
17 */
18 
19 #include "mariadb.h"
20 #include "sql_class.h"
21 #include "transaction.h"
22 #include "my_cpu.h"
23 #include <pfs_transaction_provider.h>
24 #include <mysql/psi/mysql_transaction.h>
25 
26 static bool slave_applier_reset_xa_trans(THD *thd);
27 
28 /***************************************************************************
29   Handling of XA id caching
30 ***************************************************************************/
31 struct XID_cache_insert_element
32 {
33   enum xa_states xa_state;
34   XID *xid;
35   XID_cache_element *xid_cache_element;
36 
XID_cache_insert_elementXID_cache_insert_element37   XID_cache_insert_element(enum xa_states xa_state_arg, XID *xid_arg):
38     xa_state(xa_state_arg), xid(xid_arg) {}
39 };
40 
41 
42 class XID_cache_element
43 {
44   /*
45     m_state is used to prevent elements from being deleted while XA RECOVER
46     iterates xid cache and to prevent recovered elments from being acquired by
47     multiple threads.
48 
49     bits 1..29 are reference counter
50     bit 30 is RECOVERED flag
51     bit 31 is ACQUIRED flag (thread owns this xid)
52     bit 32 is unused
53 
54     Newly allocated and deleted elements have m_state set to 0.
55 
56     On lock() m_state is atomically incremented. It also creates load-ACQUIRE
57     memory barrier to make sure m_state is actually updated before furhter
58     memory accesses. Attempting to lock an element that has neither ACQUIRED
59     nor RECOVERED flag set returns failure and further accesses to element
60     memory are forbidden.
61 
62     On unlock() m_state is decremented. It also creates store-RELEASE memory
63     barrier to make sure m_state is actually updated after preceding memory
64     accesses.
65 
66     ACQUIRED flag is set when thread registers it's xid or when thread acquires
67     recovered xid.
68 
69     RECOVERED flag is set for elements found during crash recovery.
70 
71     ACQUIRED and RECOVERED flags are cleared before element is deleted from
72     hash in a spin loop, after last reference is released.
73   */
74   std::atomic<int32_t> m_state;
75 public:
76   static const int32 ACQUIRED= 1 << 30;
77   static const int32 RECOVERED= 1 << 29;
78   /* Error reported by the Resource Manager (RM) to the Transaction Manager. */
79   uint rm_error;
80   enum xa_states xa_state;
81   XID xid;
is_set(int32_t flag)82   bool is_set(int32_t flag)
83   { return m_state.load(std::memory_order_relaxed) & flag; }
set(int32_t flag)84   void set(int32_t flag)
85   {
86     DBUG_ASSERT(!is_set(ACQUIRED | RECOVERED));
87     m_state.fetch_add(flag, std::memory_order_relaxed);
88   }
lock()89   bool lock()
90   {
91     int32_t old= m_state.fetch_add(1, std::memory_order_acquire);
92     if (old & (ACQUIRED | RECOVERED))
93       return true;
94     unlock();
95     return false;
96   }
unlock()97   void unlock()
98   { m_state.fetch_sub(1, std::memory_order_release); }
mark_uninitialized()99   void mark_uninitialized()
100   {
101     int32_t old= ACQUIRED;
102     while (!m_state.compare_exchange_weak(old, 0,
103                                           std::memory_order_relaxed,
104                                           std::memory_order_relaxed))
105     {
106       old&= ACQUIRED | RECOVERED;
107       (void) LF_BACKOFF();
108     }
109   }
acquired_to_recovered()110   void acquired_to_recovered()
111   {
112     m_state.fetch_or(RECOVERED, std::memory_order_relaxed);
113     m_state.fetch_and(~ACQUIRED, std::memory_order_release);
114   }
acquire_recovered()115   bool acquire_recovered()
116   {
117     int32_t old= RECOVERED;
118     while (!m_state.compare_exchange_weak(old, ACQUIRED | RECOVERED,
119                                           std::memory_order_acquire,
120                                           std::memory_order_relaxed))
121     {
122       if (!(old & RECOVERED) || (old & ACQUIRED))
123         return false;
124       old= RECOVERED;
125       (void) LF_BACKOFF();
126     }
127     return true;
128   }
lf_hash_initializer(LF_HASH * hash,XID_cache_element * element,XID_cache_insert_element * new_element)129   static void lf_hash_initializer(LF_HASH *hash __attribute__((unused)),
130                                   XID_cache_element *element,
131                                   XID_cache_insert_element *new_element)
132   {
133     DBUG_ASSERT(!element->is_set(ACQUIRED | RECOVERED));
134     element->rm_error= 0;
135     element->xa_state= new_element->xa_state;
136     element->xid.set(new_element->xid);
137     new_element->xid_cache_element= element;
138   }
lf_alloc_constructor(uchar * ptr)139   static void lf_alloc_constructor(uchar *ptr)
140   {
141     XID_cache_element *element= (XID_cache_element*) (ptr + LF_HASH_OVERHEAD);
142     element->m_state= 0;
143   }
lf_alloc_destructor(uchar * ptr)144   static void lf_alloc_destructor(uchar *ptr)
145   {
146     DBUG_ASSERT(!reinterpret_cast<XID_cache_element*>(ptr + LF_HASH_OVERHEAD)
147 		->is_set(ACQUIRED));
148   }
key(const XID_cache_element * element,size_t * length,my_bool not_used)149   static uchar *key(const XID_cache_element *element, size_t *length,
150                     my_bool not_used __attribute__((unused)))
151   {
152     *length= element->xid.key_length();
153     return element->xid.key();
154   }
155 };
156 
157 
158 static LF_HASH xid_cache;
159 static bool xid_cache_inited;
160 
161 
get_state_code() const162 enum xa_states XID_STATE::get_state_code() const
163 {
164   return xid_cache_element ? xid_cache_element->xa_state : XA_NO_STATE;
165 }
166 
167 
fix_xid_hash_pins()168 bool THD::fix_xid_hash_pins()
169 {
170   if (!xid_hash_pins)
171     xid_hash_pins= lf_hash_get_pins(&xid_cache);
172   return !xid_hash_pins;
173 }
174 
175 
set_error(uint error)176 void XID_STATE::set_error(uint error)
177 {
178   if (is_explicit_XA())
179     xid_cache_element->rm_error= error;
180 }
181 
182 
er_xaer_rmfail() const183 void XID_STATE::er_xaer_rmfail() const
184 {
185   static const char *xa_state_names[]=
186     { "ACTIVE", "IDLE", "PREPARED", "ROLLBACK ONLY", "NON-EXISTING"};
187   my_error(ER_XAER_RMFAIL, MYF(0), xa_state_names[get_state_code()]);
188 }
189 
190 
191 /**
192   Check that XA transaction has an uncommitted work. Report an error
193   to the user in case when there is an uncommitted work for XA transaction.
194 
195   @return  result of check
196     @retval  false  XA transaction is NOT in state IDLE, PREPARED
197                     or ROLLBACK_ONLY.
198     @retval  true   XA transaction is in state IDLE or PREPARED
199                     or ROLLBACK_ONLY.
200 */
201 
check_has_uncommitted_xa() const202 bool XID_STATE::check_has_uncommitted_xa() const
203 {
204   if (is_explicit_XA() && xid_cache_element->xa_state != XA_ACTIVE)
205   {
206     er_xaer_rmfail();
207     return true;
208   }
209   return false;
210 }
211 
212 
get_xid() const213 XID *XID_STATE::get_xid() const
214 {
215   DBUG_ASSERT(is_explicit_XA());
216   return &xid_cache_element->xid;
217 }
218 
219 
xid_cache_init()220 void xid_cache_init()
221 {
222   xid_cache_inited= true;
223   lf_hash_init(&xid_cache, sizeof(XID_cache_element), LF_HASH_UNIQUE, 0, 0,
224                (my_hash_get_key) XID_cache_element::key, &my_charset_bin);
225   xid_cache.alloc.constructor= XID_cache_element::lf_alloc_constructor;
226   xid_cache.alloc.destructor= XID_cache_element::lf_alloc_destructor;
227   xid_cache.initializer=
228     (lf_hash_initializer) XID_cache_element::lf_hash_initializer;
229 }
230 
231 
xid_cache_free()232 void xid_cache_free()
233 {
234   if (xid_cache_inited)
235   {
236     lf_hash_destroy(&xid_cache);
237     xid_cache_inited= false;
238   }
239 }
240 
241 
242 /**
243   Find recovered XA transaction by XID.
244 */
245 
xid_cache_search(THD * thd,XID * xid)246 static XID_cache_element *xid_cache_search(THD *thd, XID *xid)
247 {
248   DBUG_ASSERT(thd->xid_hash_pins);
249   XID_cache_element *element=
250     (XID_cache_element*) lf_hash_search(&xid_cache, thd->xid_hash_pins,
251                                         xid->key(), xid->key_length());
252   if (element)
253   {
254     if (!element->acquire_recovered())
255       element= 0;
256     lf_hash_search_unpin(thd->xid_hash_pins);
257     DEBUG_SYNC(thd, "xa_after_search");
258   }
259   return element;
260 }
261 
262 
xid_cache_insert(XID * xid)263 bool xid_cache_insert(XID *xid)
264 {
265   XID_cache_insert_element new_element(XA_PREPARED, xid);
266   LF_PINS *pins;
267 
268   if (!(pins= lf_hash_get_pins(&xid_cache)))
269     return true;
270 
271   int res= lf_hash_insert(&xid_cache, pins, &new_element);
272   switch (res)
273   {
274   case 0:
275     new_element.xid_cache_element->set(XID_cache_element::RECOVERED);
276     break;
277   case 1:
278     res= 0;
279   }
280   lf_hash_put_pins(pins);
281   return res;
282 }
283 
284 
xid_cache_insert(THD * thd,XID_STATE * xid_state,XID * xid)285 bool xid_cache_insert(THD *thd, XID_STATE *xid_state, XID *xid)
286 {
287   XID_cache_insert_element new_element(XA_ACTIVE, xid);
288 
289   if (thd->fix_xid_hash_pins())
290     return true;
291 
292   int res= lf_hash_insert(&xid_cache, thd->xid_hash_pins, &new_element);
293   switch (res)
294   {
295   case 0:
296     xid_state->xid_cache_element= new_element.xid_cache_element;
297     xid_state->xid_cache_element->set(XID_cache_element::ACQUIRED);
298     break;
299   case 1:
300     my_error(ER_XAER_DUPID, MYF(0));
301   }
302   return res;
303 }
304 
305 
xid_cache_delete(THD * thd,XID_cache_element * & element)306 static void xid_cache_delete(THD *thd, XID_cache_element *&element)
307 {
308   DBUG_ASSERT(thd->xid_hash_pins);
309   element->mark_uninitialized();
310   lf_hash_delete(&xid_cache, thd->xid_hash_pins,
311                  element->xid.key(), element->xid.key_length());
312 }
313 
314 
xid_cache_delete(THD * thd,XID_STATE * xid_state)315 void xid_cache_delete(THD *thd, XID_STATE *xid_state)
316 {
317   DBUG_ASSERT(xid_state->is_explicit_XA());
318   xid_cache_delete(thd, xid_state->xid_cache_element);
319   xid_state->xid_cache_element= 0;
320 }
321 
322 
323 struct xid_cache_iterate_arg
324 {
325   my_hash_walk_action action;
326   void *argument;
327 };
328 
xid_cache_iterate_callback(XID_cache_element * element,xid_cache_iterate_arg * arg)329 static my_bool xid_cache_iterate_callback(XID_cache_element *element,
330                                           xid_cache_iterate_arg *arg)
331 {
332   my_bool res= FALSE;
333   if (element->lock())
334   {
335     res= arg->action(element, arg->argument);
336     element->unlock();
337   }
338   return res;
339 }
340 
xid_cache_iterate(THD * thd,my_hash_walk_action action,void * arg)341 static int xid_cache_iterate(THD *thd, my_hash_walk_action action, void *arg)
342 {
343   xid_cache_iterate_arg argument= { action, arg };
344   return thd->fix_xid_hash_pins() ? -1 :
345          lf_hash_iterate(&xid_cache, thd->xid_hash_pins,
346                          (my_hash_walk_action) xid_cache_iterate_callback,
347                          &argument);
348 }
349 
350 
351 /**
352   Mark a XA transaction as rollback-only if the RM unilaterally
353   rolled back the transaction branch.
354 
355   @note If a rollback was requested by the RM, this function sets
356         the appropriate rollback error code and transits the state
357         to XA_ROLLBACK_ONLY.
358 
359   @return TRUE if transaction was rolled back or if the transaction
360           state is XA_ROLLBACK_ONLY. FALSE otherwise.
361 */
xa_trans_rolled_back(XID_cache_element * element)362 static bool xa_trans_rolled_back(XID_cache_element *element)
363 {
364   if (element->rm_error)
365   {
366     switch (element->rm_error) {
367     case ER_LOCK_WAIT_TIMEOUT:
368       my_error(ER_XA_RBTIMEOUT, MYF(0));
369       break;
370     case ER_LOCK_DEADLOCK:
371       my_error(ER_XA_RBDEADLOCK, MYF(0));
372       break;
373     default:
374       my_error(ER_XA_RBROLLBACK, MYF(0));
375     }
376     element->xa_state= XA_ROLLBACK_ONLY;
377   }
378 
379   return element->xa_state == XA_ROLLBACK_ONLY;
380 }
381 
382 
383 /**
384   Rollback the active XA transaction.
385 
386   @return TRUE if the rollback failed, FALSE otherwise.
387 */
388 
xa_trans_force_rollback(THD * thd)389 bool xa_trans_force_rollback(THD *thd)
390 {
391   bool rc= false;
392 
393   if (ha_rollback_trans(thd, true))
394   {
395     my_error(ER_XAER_RMERR, MYF(0));
396     rc= true;
397   }
398   thd->variables.option_bits&=
399     ~(OPTION_BEGIN | OPTION_KEEP_LOG | OPTION_GTID_BEGIN);
400   thd->transaction->all.reset();
401   thd->server_status&=
402     ~(SERVER_STATUS_IN_TRANS | SERVER_STATUS_IN_TRANS_READONLY);
403   DBUG_PRINT("info", ("clearing SERVER_STATUS_IN_TRANS"));
404   xid_cache_delete(thd, &thd->transaction->xid_state);
405 
406   trans_track_end_trx(thd);
407   thd->mdl_context.release_transactional_locks(thd);
408 
409   return rc;
410 }
411 
412 
413 /**
414   Starts an XA transaction with the given xid value.
415 
416   @param thd    Current thread
417 
418   @retval FALSE  Success
419   @retval TRUE   Failure
420 */
421 
trans_xa_start(THD * thd)422 bool trans_xa_start(THD *thd)
423 {
424   DBUG_ENTER("trans_xa_start");
425 
426   if (thd->transaction->xid_state.is_explicit_XA() &&
427       thd->transaction->xid_state.xid_cache_element->xa_state == XA_IDLE &&
428       thd->lex->xa_opt == XA_RESUME)
429   {
430     bool not_equal=
431       !thd->transaction->xid_state.xid_cache_element->xid.eq(thd->lex->xid);
432     if (not_equal)
433       my_error(ER_XAER_NOTA, MYF(0));
434     else
435     {
436       thd->transaction->xid_state.xid_cache_element->xa_state= XA_ACTIVE;
437       MYSQL_SET_TRANSACTION_XA_STATE(thd->m_transaction_psi, XA_ACTIVE);
438     }
439     DBUG_RETURN(not_equal);
440   }
441 
442   /* TODO: JOIN is not supported yet. */
443   if (thd->lex->xa_opt != XA_NONE)
444     my_error(ER_XAER_INVAL, MYF(0));
445   else if (!thd->lex->xid->gtrid_length)
446     my_error(ER_XAER_INVAL, MYF(0));
447   else if (thd->transaction->xid_state.is_explicit_XA())
448     thd->transaction->xid_state.er_xaer_rmfail();
449   else if (thd->locked_tables_mode || thd->in_active_multi_stmt_transaction())
450     my_error(ER_XAER_OUTSIDE, MYF(0));
451   else if (!trans_begin(thd))
452   {
453     MYSQL_SET_TRANSACTION_XID(thd->m_transaction_psi, thd->lex->xid, XA_ACTIVE);
454     if (xid_cache_insert(thd, &thd->transaction->xid_state, thd->lex->xid))
455     {
456       trans_rollback(thd);
457       DBUG_RETURN(true);
458     }
459     DBUG_RETURN(FALSE);
460   }
461 
462   DBUG_RETURN(TRUE);
463 }
464 
465 
466 /**
467   Put a XA transaction in the IDLE state.
468 
469   @param thd    Current thread
470 
471   @retval FALSE  Success
472   @retval TRUE   Failure
473 */
474 
trans_xa_end(THD * thd)475 bool trans_xa_end(THD *thd)
476 {
477   DBUG_ENTER("trans_xa_end");
478 
479   /* TODO: SUSPEND and FOR MIGRATE are not supported yet. */
480   if (thd->lex->xa_opt != XA_NONE)
481     my_error(ER_XAER_INVAL, MYF(0));
482   else if (!thd->transaction->xid_state.is_explicit_XA() ||
483            thd->transaction->xid_state.xid_cache_element->xa_state != XA_ACTIVE)
484     thd->transaction->xid_state.er_xaer_rmfail();
485   else if (!thd->transaction->xid_state.xid_cache_element->xid.eq(thd->lex->xid))
486     my_error(ER_XAER_NOTA, MYF(0));
487   else if (!xa_trans_rolled_back(thd->transaction->xid_state.xid_cache_element))
488   {
489     thd->transaction->xid_state.xid_cache_element->xa_state= XA_IDLE;
490     MYSQL_SET_TRANSACTION_XA_STATE(thd->m_transaction_psi, XA_IDLE);
491   }
492 
493   DBUG_RETURN(thd->is_error() ||
494     thd->transaction->xid_state.xid_cache_element->xa_state != XA_IDLE);
495 }
496 
497 
498 /**
499   Put a XA transaction in the PREPARED state.
500 
501   @param thd    Current thread
502 
503   @retval FALSE  Success
504   @retval TRUE   Failure
505 */
506 
trans_xa_prepare(THD * thd)507 bool trans_xa_prepare(THD *thd)
508 {
509   int res= 1;
510 
511   DBUG_ENTER("trans_xa_prepare");
512 
513   if (!thd->transaction->xid_state.is_explicit_XA() ||
514       thd->transaction->xid_state.xid_cache_element->xa_state != XA_IDLE)
515     thd->transaction->xid_state.er_xaer_rmfail();
516   else if (!thd->transaction->xid_state.xid_cache_element->xid.eq(thd->lex->xid))
517     my_error(ER_XAER_NOTA, MYF(0));
518   else
519   {
520     /*
521       Acquire metadata lock which will ensure that COMMIT is blocked
522       by active FLUSH TABLES WITH READ LOCK (and vice versa COMMIT in
523       progress blocks FTWRL).
524 
525       We allow FLUSHer to COMMIT; we assume FLUSHer knows what it does.
526     */
527     MDL_request mdl_request;
528     MDL_REQUEST_INIT(&mdl_request, MDL_key::BACKUP, "", "", MDL_BACKUP_COMMIT,
529                      MDL_STATEMENT);
530     if (thd->mdl_context.acquire_lock(&mdl_request,
531                                       thd->variables.lock_wait_timeout) ||
532         ha_prepare(thd))
533     {
534       if (!mdl_request.ticket)
535         ha_rollback_trans(thd, TRUE);
536       thd->variables.option_bits&= ~(OPTION_BEGIN | OPTION_KEEP_LOG);
537       thd->transaction->all.reset();
538       thd->server_status&=
539         ~(SERVER_STATUS_IN_TRANS | SERVER_STATUS_IN_TRANS_READONLY);
540       xid_cache_delete(thd, &thd->transaction->xid_state);
541       my_error(ER_XA_RBROLLBACK, MYF(0));
542     }
543     else
544     {
545       thd->transaction->xid_state.xid_cache_element->xa_state= XA_PREPARED;
546       MYSQL_SET_TRANSACTION_XA_STATE(thd->m_transaction_psi, XA_PREPARED);
547       res= thd->variables.pseudo_slave_mode || thd->slave_thread ?
548         slave_applier_reset_xa_trans(thd) : 0;
549     }
550   }
551 
552   DBUG_RETURN(res);
553 }
554 
555 
556 /**
557   Commit and terminate the a XA transaction.
558   Transactional locks are released if transaction ended
559 
560   @param thd    Current thread
561 
562   @retval FALSE  Success
563   @retval TRUE   Failure
564 
565 */
566 
trans_xa_commit(THD * thd)567 bool trans_xa_commit(THD *thd)
568 {
569   bool res= true;
570   XID_STATE &xid_state= thd->transaction->xid_state;
571 
572   DBUG_ENTER("trans_xa_commit");
573 
574   if (!xid_state.is_explicit_XA() ||
575       !xid_state.xid_cache_element->xid.eq(thd->lex->xid))
576   {
577     if (thd->in_multi_stmt_transaction_mode())
578     {
579       /*
580         Not allow to commit from inside an not-"native" to xid
581         ongoing transaction: the commit effect can't be reversed.
582       */
583       my_error(ER_XAER_OUTSIDE, MYF(0));
584       DBUG_RETURN(TRUE);
585     }
586     if (thd->lex->xa_opt != XA_NONE)
587     {
588       /*
589         Not allow to commit with one phase a prepared xa out of compatibility
590         with the native commit branch's error out.
591       */
592       my_error(ER_XAER_INVAL, MYF(0));
593       DBUG_RETURN(TRUE);
594     }
595     if (thd->fix_xid_hash_pins())
596     {
597       my_error(ER_OUT_OF_RESOURCES, MYF(0));
598       DBUG_RETURN(TRUE);
599     }
600 
601     if (auto xs= xid_cache_search(thd, thd->lex->xid))
602     {
603       res= xa_trans_rolled_back(xs);
604       /*
605         Acquire metadata lock which will ensure that COMMIT is blocked
606         by active FLUSH TABLES WITH READ LOCK (and vice versa COMMIT in
607         progress blocks FTWRL).
608 
609         We allow FLUSHer to COMMIT; we assume FLUSHer knows what it does.
610       */
611       MDL_request mdl_request;
612       MDL_REQUEST_INIT(&mdl_request, MDL_key::BACKUP, "", "", MDL_BACKUP_COMMIT,
613                        MDL_STATEMENT);
614       if (thd->mdl_context.acquire_lock(&mdl_request,
615                                         thd->variables.lock_wait_timeout))
616       {
617         /*
618           We can't rollback an XA transaction on lock failure due to
619           Innodb redo log and bin log update is involved in rollback.
620           Return error to user for a retry.
621         */
622         DBUG_ASSERT(thd->is_error());
623 
624         xs->acquired_to_recovered();
625         DBUG_RETURN(true);
626       }
627       DBUG_ASSERT(!xid_state.xid_cache_element);
628 
629       if (thd->wait_for_prior_commit())
630       {
631         DBUG_ASSERT(thd->is_error());
632 
633         xs->acquired_to_recovered();
634         DBUG_RETURN(true);
635       }
636 
637       xid_state.xid_cache_element= xs;
638       ha_commit_or_rollback_by_xid(thd->lex->xid, !res);
639       xid_state.xid_cache_element= 0;
640 
641       res= res || thd->is_error();
642       xid_cache_delete(thd, xs);
643     }
644     else
645       my_error(ER_XAER_NOTA, MYF(0));
646     DBUG_RETURN(res);
647   }
648 
649   if (xa_trans_rolled_back(xid_state.xid_cache_element))
650   {
651     xa_trans_force_rollback(thd);
652     DBUG_RETURN(thd->is_error());
653   }
654   else if (xid_state.xid_cache_element->xa_state == XA_IDLE &&
655            thd->lex->xa_opt == XA_ONE_PHASE)
656   {
657     int r= ha_commit_trans(thd, TRUE);
658     if ((res= MY_TEST(r)))
659       my_error(r == 1 ? ER_XA_RBROLLBACK : ER_XAER_RMERR, MYF(0));
660   }
661   else if (thd->transaction->xid_state.xid_cache_element->xa_state == XA_PREPARED)
662   {
663     MDL_request mdl_request;
664     if (thd->lex->xa_opt != XA_NONE)
665     {
666       my_error(ER_XAER_INVAL, MYF(0));
667       DBUG_RETURN(TRUE);
668     }
669 
670     /*
671       Acquire metadata lock which will ensure that COMMIT is blocked
672       by active FLUSH TABLES WITH READ LOCK (and vice versa COMMIT in
673       progress blocks FTWRL).
674 
675       We allow FLUSHer to COMMIT; we assume FLUSHer knows what it does.
676     */
677     MDL_REQUEST_INIT(&mdl_request, MDL_key::BACKUP, "", "", MDL_BACKUP_COMMIT,
678                      MDL_TRANSACTION);
679 
680     if (thd->mdl_context.acquire_lock(&mdl_request,
681                                       thd->variables.lock_wait_timeout))
682     {
683       /*
684         We can't rollback an XA transaction on lock failure due to
685         Innodb redo log and bin log update is involved in rollback.
686         Return error to user for a retry.
687       */
688       my_error(ER_XAER_RMERR, MYF(0));
689       DBUG_RETURN(true);
690     }
691     else
692     {
693       DEBUG_SYNC(thd, "trans_xa_commit_after_acquire_commit_lock");
694 
695       res= MY_TEST(ha_commit_one_phase(thd, 1));
696       if (res)
697         my_error(ER_XAER_RMERR, MYF(0));
698       else
699       {
700         /*
701           Since we don't call ha_commit_trans() for prepared transactions,
702           we need to explicitly mark the transaction as committed.
703         */
704         MYSQL_COMMIT_TRANSACTION(thd->m_transaction_psi);
705       }
706 
707       thd->m_transaction_psi= NULL;
708     }
709   }
710   else
711   {
712     xid_state.er_xaer_rmfail();
713     DBUG_RETURN(TRUE);
714   }
715 
716   thd->variables.option_bits&= ~(OPTION_BEGIN | OPTION_KEEP_LOG);
717   thd->transaction->all.reset();
718   thd->server_status&=
719     ~(SERVER_STATUS_IN_TRANS | SERVER_STATUS_IN_TRANS_READONLY);
720   DBUG_PRINT("info", ("clearing SERVER_STATUS_IN_TRANS"));
721   xid_cache_delete(thd, &xid_state);
722 
723   trans_track_end_trx(thd);
724   thd->mdl_context.release_transactional_locks(thd);
725 
726   /* The transaction should be marked as complete in P_S. */
727   DBUG_ASSERT(thd->m_transaction_psi == NULL || res);
728   DBUG_RETURN(res);
729 }
730 
731 
732 /**
733   Roll back and terminate a XA transaction.
734   Transactional locks are released if transaction ended
735 
736   @param thd    Current thread
737 
738   @retval FALSE  Success
739   @retval TRUE   Failure
740 */
741 
trans_xa_rollback(THD * thd)742 bool trans_xa_rollback(THD *thd)
743 {
744   XID_STATE &xid_state= thd->transaction->xid_state;
745 
746   DBUG_ENTER("trans_xa_rollback");
747 
748   if (!xid_state.is_explicit_XA() ||
749       !xid_state.xid_cache_element->xid.eq(thd->lex->xid))
750   {
751     if (thd->in_multi_stmt_transaction_mode())
752     {
753       my_error(ER_XAER_OUTSIDE, MYF(0));
754       DBUG_RETURN(TRUE);
755     }
756     if (thd->fix_xid_hash_pins())
757     {
758       my_error(ER_OUT_OF_RESOURCES, MYF(0));
759       DBUG_RETURN(TRUE);
760     }
761 
762     if (auto xs= xid_cache_search(thd, thd->lex->xid))
763     {
764       MDL_request mdl_request;
765       MDL_REQUEST_INIT(&mdl_request, MDL_key::BACKUP, "", "", MDL_BACKUP_COMMIT,
766                        MDL_STATEMENT);
767       if (thd->mdl_context.acquire_lock(&mdl_request,
768                                         thd->variables.lock_wait_timeout))
769       {
770         /*
771           We can't rollback an XA transaction on lock failure due to
772           Innodb redo log and bin log update is involved in rollback.
773           Return error to user for a retry.
774         */
775         DBUG_ASSERT(thd->is_error());
776 
777         xs->acquired_to_recovered();
778         DBUG_RETURN(true);
779       }
780       xa_trans_rolled_back(xs);
781       DBUG_ASSERT(!xid_state.xid_cache_element);
782 
783       if (thd->wait_for_prior_commit())
784       {
785         DBUG_ASSERT(thd->is_error());
786         xs->acquired_to_recovered();
787         DBUG_RETURN(true);
788       }
789 
790       xid_state.xid_cache_element= xs;
791       ha_commit_or_rollback_by_xid(thd->lex->xid, 0);
792       xid_state.xid_cache_element= 0;
793       xid_cache_delete(thd, xs);
794     }
795     else
796       my_error(ER_XAER_NOTA, MYF(0));
797     DBUG_RETURN(thd->get_stmt_da()->is_error());
798   }
799 
800   if (xid_state.xid_cache_element->xa_state == XA_ACTIVE)
801   {
802     xid_state.er_xaer_rmfail();
803     DBUG_RETURN(TRUE);
804   }
805 
806   MDL_request mdl_request;
807   MDL_REQUEST_INIT(&mdl_request, MDL_key::BACKUP, "", "", MDL_BACKUP_COMMIT,
808       MDL_STATEMENT);
809   if (thd->mdl_context.acquire_lock(&mdl_request,
810         thd->variables.lock_wait_timeout))
811   {
812     /*
813       We can't rollback an XA transaction on lock failure due to
814       Innodb redo log and bin log update is involved in rollback.
815       Return error to user for a retry.
816     */
817     my_error(ER_XAER_RMERR, MYF(0));
818     DBUG_RETURN(true);
819   }
820 
821   DBUG_RETURN(xa_trans_force_rollback(thd));
822 }
823 
824 
trans_xa_detach(THD * thd)825 bool trans_xa_detach(THD *thd)
826 {
827   DBUG_ASSERT(thd->transaction->xid_state.is_explicit_XA());
828 
829   if (thd->transaction->xid_state.xid_cache_element->xa_state != XA_PREPARED)
830     return xa_trans_force_rollback(thd);
831   else if (!thd->transaction->all.is_trx_read_write())
832   {
833     thd->transaction->xid_state.set_error(ER_XA_RBROLLBACK);
834     ha_rollback_trans(thd, true);
835   }
836 
837   thd->transaction->xid_state.xid_cache_element->acquired_to_recovered();
838   thd->transaction->xid_state.xid_cache_element= 0;
839   thd->transaction->cleanup();
840 
841   Ha_trx_info *ha_info, *ha_info_next;
842   for (ha_info= thd->transaction->all.ha_list;
843        ha_info;
844        ha_info= ha_info_next)
845   {
846     ha_info_next= ha_info->next();
847     ha_info->reset(); /* keep it conveniently zero-filled */
848   }
849 
850   thd->transaction->all.ha_list= 0;
851   thd->transaction->all.no_2pc= 0;
852   thd->m_transaction_psi= 0;
853   thd->server_status&= ~(SERVER_STATUS_IN_TRANS |
854                          SERVER_STATUS_IN_TRANS_READONLY);
855   thd->mdl_context.release_transactional_locks(thd);
856 
857   return false;
858 }
859 
860 
861 /**
862   return the XID as it appears in the SQL function's arguments.
863   So this string can be passed to XA START, XA PREPARE etc...
864 
865   @note
866     the 'buf' has to have space for at least SQL_XIDSIZE bytes.
867 */
868 
869 
870 /*
871   'a'..'z' 'A'..'Z', '0'..'9'
872   and '-' '_' ' ' symbols don't have to be
873   converted.
874 */
875 
876 static const char xid_needs_conv[128]=
877 {
878   1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,
879   1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,
880   0,1,1,1,1,1,1,1,1,1,1,1,1,0,1,1,
881   0,0,0,0,0,0,0,0,0,0,1,1,1,1,1,1,
882   1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,
883   0,0,0,0,0,0,0,0,0,0,0,1,1,1,1,0,
884   1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,
885   0,0,0,0,0,0,0,0,0,0,0,1,1,1,1,1
886 };
887 
888 /*
889   The size of XID string representation in the form
890   'gtrid', 'bqual', formatID
891   see xid_t::get_sql_string() for details.
892 */
893 #define SQL_XIDSIZE (XIDDATASIZE * 2 + 8 + MY_INT64_NUM_DECIMAL_DIGITS)
894 
895 /* The 'buf' has to have space for at least SQL_XIDSIZE bytes. */
get_sql_xid(XID * xid,char * buf)896 static uint get_sql_xid(XID *xid, char *buf)
897 {
898   int tot_len= xid->gtrid_length + xid->bqual_length;
899   int i;
900   const char *orig_buf= buf;
901 
902   for (i=0; i<tot_len; i++)
903   {
904     uchar c= ((uchar *) xid->data)[i];
905     if (c >= 128 || xid_needs_conv[c])
906       break;
907   }
908 
909   if (i >= tot_len)
910   {
911     /* No need to convert characters to hexadecimals. */
912     *buf++= '\'';
913     memcpy(buf, xid->data, xid->gtrid_length);
914     buf+= xid->gtrid_length;
915     *buf++= '\'';
916     if (xid->bqual_length > 0 || xid->formatID != 1)
917     {
918       *buf++= ',';
919       *buf++= '\'';
920       memcpy(buf, xid->data+xid->gtrid_length, xid->bqual_length);
921       buf+= xid->bqual_length;
922       *buf++= '\'';
923     }
924   }
925   else
926   {
927     *buf++= 'X';
928     *buf++= '\'';
929     for (i= 0; i < xid->gtrid_length; i++)
930     {
931       *buf++=_dig_vec_lower[((uchar*) xid->data)[i] >> 4];
932       *buf++=_dig_vec_lower[((uchar*) xid->data)[i] & 0x0f];
933     }
934     *buf++= '\'';
935     if (xid->bqual_length > 0 || xid->formatID != 1)
936     {
937       *buf++= ',';
938       *buf++= 'X';
939       *buf++= '\'';
940       for (; i < tot_len; i++)
941       {
942         *buf++=_dig_vec_lower[((uchar*) xid->data)[i] >> 4];
943         *buf++=_dig_vec_lower[((uchar*) xid->data)[i] & 0x0f];
944       }
945       *buf++= '\'';
946     }
947   }
948 
949   if (xid->formatID != 1)
950   {
951     *buf++= ',';
952     buf+= my_longlong10_to_str_8bit(&my_charset_bin, buf,
953             MY_INT64_NUM_DECIMAL_DIGITS, -10, xid->formatID);
954   }
955 
956   return (uint)(buf - orig_buf);
957 }
958 
959 
960 /**
961   return the list of XID's to a client, the same way SHOW commands do.
962 
963   @note
964     I didn't find in XA specs that an RM cannot return the same XID twice,
965     so mysql_xa_recover does not filter XID's to ensure uniqueness.
966     It can be easily fixed later, if necessary.
967 */
968 
xa_recover_callback(XID_cache_element * xs,Protocol * protocol,char * data,uint data_len,CHARSET_INFO * data_cs)969 static my_bool xa_recover_callback(XID_cache_element *xs, Protocol *protocol,
970                   char *data, uint data_len, CHARSET_INFO *data_cs)
971 {
972   if (xs->xa_state == XA_PREPARED)
973   {
974     protocol->prepare_for_resend();
975     protocol->store_longlong((longlong) xs->xid.formatID, FALSE);
976     protocol->store_longlong((longlong) xs->xid.gtrid_length, FALSE);
977     protocol->store_longlong((longlong) xs->xid.bqual_length, FALSE);
978     protocol->store(data, data_len, data_cs);
979     if (protocol->write())
980       return TRUE;
981   }
982   return FALSE;
983 }
984 
985 
xa_recover_callback_short(XID_cache_element * xs,Protocol * protocol)986 static my_bool xa_recover_callback_short(XID_cache_element *xs,
987                                          Protocol *protocol)
988 {
989   return xa_recover_callback(xs, protocol, xs->xid.data,
990       xs->xid.gtrid_length + xs->xid.bqual_length, &my_charset_bin);
991 }
992 
993 
xa_recover_callback_verbose(XID_cache_element * xs,Protocol * protocol)994 static my_bool xa_recover_callback_verbose(XID_cache_element *xs,
995                                            Protocol *protocol)
996 {
997   char buf[SQL_XIDSIZE];
998   uint len= get_sql_xid(&xs->xid, buf);
999   return xa_recover_callback(xs, protocol, buf, len,
1000                              &my_charset_utf8mb3_general_ci);
1001 }
1002 
1003 
mysql_xa_recover(THD * thd)1004 bool mysql_xa_recover(THD *thd)
1005 {
1006   List<Item> field_list;
1007   Protocol *protocol= thd->protocol;
1008   MEM_ROOT *mem_root= thd->mem_root;
1009   my_hash_walk_action action;
1010   DBUG_ENTER("mysql_xa_recover");
1011 
1012   field_list.push_back(new (mem_root)
1013                        Item_int(thd, "formatID", 0,
1014                                 MY_INT32_NUM_DECIMAL_DIGITS), mem_root);
1015   field_list.push_back(new (mem_root)
1016                        Item_int(thd, "gtrid_length", 0,
1017                                 MY_INT32_NUM_DECIMAL_DIGITS), mem_root);
1018   field_list.push_back(new (mem_root)
1019                        Item_int(thd, "bqual_length", 0,
1020                                 MY_INT32_NUM_DECIMAL_DIGITS), mem_root);
1021   {
1022     uint len;
1023     CHARSET_INFO *cs;
1024 
1025     if (thd->lex->verbose)
1026     {
1027       len= SQL_XIDSIZE;
1028       cs= &my_charset_utf8mb3_general_ci;
1029       action= (my_hash_walk_action) xa_recover_callback_verbose;
1030     }
1031     else
1032     {
1033       len= XIDDATASIZE;
1034       cs= &my_charset_bin;
1035       action= (my_hash_walk_action) xa_recover_callback_short;
1036     }
1037 
1038     field_list.push_back(new (mem_root)
1039                          Item_empty_string(thd, "data", len, cs), mem_root);
1040   }
1041 
1042   if (protocol->send_result_set_metadata(&field_list,
1043                             Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
1044     DBUG_RETURN(1);
1045 
1046   if (xid_cache_iterate(thd, action, protocol))
1047     DBUG_RETURN(1);
1048   my_eof(thd);
1049   DBUG_RETURN(0);
1050 }
1051 
1052 
1053 /**
1054   This is a specific to (pseudo-) slave applier collection of standard cleanup
1055   actions to reset XA transaction state sim to @c ha_commit_one_phase.
1056   THD of the slave applier is dissociated from a transaction object in engine
1057   that continues to exist there.
1058 
1059   @param  THD current thread
1060   @return the value of is_error()
1061 */
1062 
slave_applier_reset_xa_trans(THD * thd)1063 static bool slave_applier_reset_xa_trans(THD *thd)
1064 {
1065   thd->variables.option_bits&= ~(OPTION_BEGIN | OPTION_KEEP_LOG);
1066   thd->server_status&=
1067     ~(SERVER_STATUS_IN_TRANS | SERVER_STATUS_IN_TRANS_READONLY);
1068   DBUG_PRINT("info", ("clearing SERVER_STATUS_IN_TRANS"));
1069 
1070   thd->transaction->xid_state.xid_cache_element->acquired_to_recovered();
1071   thd->transaction->xid_state.xid_cache_element= 0;
1072 
1073   for (Ha_trx_info *ha_info= thd->transaction->all.ha_list, *ha_info_next;
1074        ha_info; ha_info= ha_info_next)
1075   {
1076     ha_info_next= ha_info->next();
1077     ha_info->reset();
1078   }
1079   thd->transaction->all.ha_list= 0;
1080 
1081   ha_close_connection(thd);
1082   thd->transaction->cleanup();
1083   thd->transaction->all.reset();
1084 
1085   DBUG_ASSERT(!thd->transaction->all.ha_list);
1086   DBUG_ASSERT(!thd->transaction->all.no_2pc);
1087 
1088   thd->has_waiter= false;
1089   MYSQL_COMMIT_TRANSACTION(thd->m_transaction_psi); // TODO/Fixme: commit?
1090   thd->m_transaction_psi= NULL;
1091   return thd->is_error();
1092 }
1093