1 /*
2    Copyright (c) 2011, 2020, Oracle and/or its affiliates. All rights reserved.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 #include "storage/ndb/plugin/ndb_global_schema_lock.h"
26 
27 #include <mutex>
28 
29 #include "my_dbug.h"
30 #include "mysql/plugin.h"
31 #include "sql/debug_sync.h"
32 #include "sql/sql_class.h"
33 #include "sql/sql_thd_internal_api.h"  // thd_query_unsafe
34 #include "storage/ndb/include/ndbapi/NdbApi.hpp"
35 #include "storage/ndb/plugin/ndb_schema_dist.h"
36 #include "storage/ndb/plugin/ndb_sleep.h"
37 #include "storage/ndb/plugin/ndb_table_guard.h"
38 
39 /**
40  * There is a potential for deadlocks between MDL and GSL locks:
41  *
42  * A client thread might have acquired an MDL_INTENTIONAL_EXCLUSIVE (IX)
43  * lock, and attempt to upgrade this to a MDL_EXCLUSIVE (X) locks, which
44  * requires the GSL lock to be taken.
45  *
46  * However, the GSL lock may already be held by the binlog schema-change
47  * coordinator on another mysqld. All participants has to complete
48  * the schema change op before the coordinator will release the GSL.
49  * As part of that, the participants will request a MDL-X-lock which blocks
50  * due to the other client thread holding an MDL-IX-lock. Thus, we
51  * have effectively a deadlock between the client thread and the
52  * schema change participant.
53  *
54  * We detect, and break, such deadlock by recording whether we
55  * have an active 'IS_SCHEMA_DIST_PARTICIPANT' on this mysqld.
56  * Iff another GSL request times-out while there are active
57  * schema dist participants, we *assume* we were involved in
58  * a deadlock.
59  *
60  * The MDL code is able to handle such deadlocks by releasing the
61  * locks and retry later
62  */
63 
64 static class Ndb_thd_gsl_participant {
65   std::mutex m_mutex;
66   const THD *m_thd{nullptr};
67 
68  public:
operator =(const THD * thd)69   Ndb_thd_gsl_participant &operator=(const THD *thd) {
70     std::lock_guard<std::mutex> lock_thd(m_mutex);
71     m_thd = thd;
72     return *this;
73   }
operator !=(const THD * thd)74   bool operator!=(const THD *thd) {
75     std::lock_guard<std::mutex> lock_thd(m_mutex);
76     return m_thd != thd;
77   }
78 } thd_gsl_participant;
79 
ndb_set_gsl_participant(THD * thd)80 static void ndb_set_gsl_participant(THD *thd) { thd_gsl_participant = thd; }
81 
ndb_is_gsl_participant_active()82 static bool ndb_is_gsl_participant_active() {
83   return (thd_gsl_participant != nullptr);
84 }
85 
86 /**
87  * Another potential scenario for a deadlock between MDL and GSL locks is as
88  * follows:
89  *
90  * A disk data table DDL will try and acquire the following -
91  *  - Global read lock of type INTENTION EXCLUSIVE (IX)
92  *  - IX lock on the schema
93  *  - Shared lock on the table
94  *  - Backup lock of type IX
95  *  - IX lock on the tablespace
96  *  - Upgrade the previously acquired shared lock on the table to an EXCLUSIVE
97  *    (X) lock
98  *  - The X lock is granted only after the GSL has been acquired
99  *
100  * A tablespace DDL will try and acquire the following -
101  *  - Global read lock of type IX
102  *  - X lock on the 'ts1' tablespace
103  *  - The X lock is granted only after the GSL has been acquired
104  *  - Backup lock of type IX
105  *
106  * Assume that the table DDL has acquired an IX lock on the tablespace and is
107  * waiting for the GSL in order to acquire an X lock on the table. At the same
108  * time the tablespace DDL has acquired the GSL and is waiting to acquire an X
109  * lock on the tablespace - Deadlock!
110  *
111  * We detect such a deadlock by tracking when the GSL is acquired (and released)
112  * during an attempt to obtain an X lock on a tablespace. When this condition
113  * holds true (along with the other 2 conditions specified in gsl_lock_ext()
114  * below), we assume that a deadlock has occurred
115  */
116 
117 static class Ndb_tablespace_gsl_guard {
118   std::mutex m_tablespace_gsl_acquired_mutex;  // for m_tablespace_gsl_acquired
119   bool m_tablespace_gsl_acquired{false};
120 
121  public:
tablespace_gsl_acquired()122   void tablespace_gsl_acquired() {
123     std::lock_guard<std::mutex> lock_gsl_acquired(
124         m_tablespace_gsl_acquired_mutex);
125     m_tablespace_gsl_acquired = true;
126   }
127 
tablespace_gsl_released()128   void tablespace_gsl_released() {
129     std::lock_guard<std::mutex> lock_gsl_acquired(
130         m_tablespace_gsl_acquired_mutex);
131     m_tablespace_gsl_acquired = false;
132   }
133 
is_tablespace_gsl_acquired()134   bool is_tablespace_gsl_acquired() {
135     std::lock_guard<std::mutex> lock_gsl_acquired(
136         m_tablespace_gsl_acquired_mutex);
137     return m_tablespace_gsl_acquired;
138   }
139 } tablespace_gsl_guard;
140 
141 /*
142   The lock/unlock functions use the BACKUP_SEQUENCE row in SYSTAB_0
143 
144   In case retry = true, the function will retry infinitely or until the THD
145   is killed or a GSL / MDL deadlock is detected/assumed. In the last case a
146   timeout error (266) is returned. If retry = false, then the function attempts
147   to acquire GSL only once and returns.
148 
149   Returns a NdbTransaction owning the gsl-lock if it was taken. NULL is returned
150   if failed to take lock. Returned NdbError will then contain the error code if
151   lock failed due to some NdbError. If there is no error code set, lock was
152   rejected by lock manager, likely due to deadlock.
153 */
gsl_lock_ext(THD * thd,Ndb * ndb,NdbError & ndb_error,bool retry,bool no_wait)154 static NdbTransaction *gsl_lock_ext(THD *thd, Ndb *ndb, NdbError &ndb_error,
155                                     bool retry, bool no_wait) {
156   while (true) {
157     /*
158       while loop to control the behaviour of the attempt to lock the row.
159       - Temporary errors are dealt with by closing the transaction (if
160         applicable) and continuing from the beginning of the loop if retry is
161         set to true. A fresh attempt to acquire the GSL occurs after a random
162         sleep. If retry = false, even temporary errors are handled as described
163         in the next point
164       - Other errors are handled by setting ndb_error, closing the transaction
165         (if applicable), and returning nullptr
166       - A pointer to the NdbTransaction is returned in case of success
167     */
168 
169     // Get table from dictionary
170     Ndb_table_guard ndbtab_g(ndb, "sys", "SYSTAB_0");
171     const NdbDictionary::Table *ndbtab = ndbtab_g.get_table();
172     if (ndbtab == nullptr) {
173       if (ndb->getDictionary()->getNdbError().status ==
174               NdbError::TemporaryError &&
175           retry) {
176         ndb_trans_retry_sleep();
177         continue;
178       }
179       ndb_error = ndb->getDictionary()->getNdbError();
180       return nullptr;
181     }
182 
183     // Start NDB transaction
184     NdbTransaction *trans = ndb->startTransaction();
185     if (trans == nullptr) {
186       ndb_error = ndb->getNdbError();
187       return nullptr;
188     }
189 
190     // Get NDB operation
191     NdbOperation *op = trans->getNdbOperation(ndbtab);
192     if (op == nullptr) {
193       if (trans->getNdbError().status == NdbError::TemporaryError && retry) {
194         ndb->closeTransaction(trans);
195         ndb_trans_retry_sleep();
196         continue;
197       }
198       ndb_error = trans->getNdbError();
199       ndb->closeTransaction(trans);
200       return nullptr;
201     }
202 
203     // Read the tuple
204     if (op->readTuple(NdbOperation::LM_Exclusive)) {
205       ndb_error = trans->getNdbError();
206       ndb->closeTransaction(trans);
207       return nullptr;
208     }
209 
210     // Set the 'NoWait' option if the caller has requested to do so
211     if (no_wait && op->setNoWait()) {
212       ndb_error = trans->getNdbError();
213       ndb->closeTransaction(trans);
214       return nullptr;
215     }
216 
217     // Attempt to lock the tuple where SYSKEY_0 = NDB_BACKUP_SEQUENCE
218     if (op->equal("SYSKEY_0", NDB_BACKUP_SEQUENCE)) {
219       ndb_error = trans->getNdbError();
220       ndb->closeTransaction(trans);
221       return nullptr;
222     }
223 
224     // Execute transaction
225     if (trans->execute(NdbTransaction::NoCommit) == 0) {
226       /*
227         The transaction is successful but still check if the operation has
228         failed since the abort mode is set to AO_IgnoreError. Error 635
229         is the expected error when no_wait has been set and the row could not
230         be locked immediately
231       */
232       if (trans->getNdbError().code == 635) {
233         ndb_error = trans->getNdbError();
234         ndb->closeTransaction(trans);
235         return nullptr;
236       }
237       /*
238         Transaction executed successfully i.e. GSL has been obtained. The
239         transaction will eventually be closed in the gsl_unlock_ext() function
240       */
241       return trans;
242     }
243 
244     if (trans->getNdbError().status != NdbError::TemporaryError ||
245         thd_killed(thd)) {
246       ndb_error = trans->getNdbError();
247       ndb->closeTransaction(trans);
248       return nullptr;
249     }
250 
251     /*
252       Check for MDL / GSL deadlock. A deadlock is assumed if:
253       1)  ::execute failed with a timeout error.
254       2a) There already is another THD being an participant in a schema distr.
255           operation (which implies that the coordinator already held the GSL
256                                   OR
257       2b) The GSL has already been acquired for a pending exclusive MDL on a
258           tablespace. It's highly likely that there are two DDL statements
259           competing for a lock on the same tablespace
260       3)  This THD holds a lock being waited for by another THD
261 
262       Note: If we incorrectly assume a deadlock above, the caller
263       will still either retry indefinitely as today, (notify_alter),
264       or now be able to release locks gotten so far and retry later.
265     */
266     if (trans->getNdbError().code == 266 &&                     // 1)
267         (ndb_is_gsl_participant_active() ||                     // 2a)
268          tablespace_gsl_guard.is_tablespace_gsl_acquired()) &&  // 2b)
269         thd->mdl_context.has_locks_waited_for()) {              // 3)
270       ndb_error = trans->getNdbError();
271       ndb->closeTransaction(trans);
272       return nullptr;
273     }
274 
275     DBUG_ASSERT(trans->getNdbError().status == NdbError::TemporaryError);
276     if (!retry) {
277       ndb_error = trans->getNdbError();
278       ndb->closeTransaction(trans);
279       return nullptr;
280     }
281     // Sleep and then retry
282     ndb->closeTransaction(trans);
283     ndb_trans_retry_sleep();
284   }
285 
286   // This should be unreachable code
287   DBUG_ASSERT(false);
288   return nullptr;
289 }
290 
gsl_unlock_ext(Ndb * ndb,NdbTransaction * trans,NdbError & ndb_error)291 static bool gsl_unlock_ext(Ndb *ndb, NdbTransaction *trans,
292                            NdbError &ndb_error) {
293   if (trans->execute(NdbTransaction::Commit)) {
294     ndb_error = trans->getNdbError();
295     ndb->closeTransaction(trans);
296     return false;
297   }
298   ndb->closeTransaction(trans);
299   return true;
300 }
301 
302 class Thd_proc_info_guard {
303  public:
Thd_proc_info_guard(THD * thd)304   Thd_proc_info_guard(THD *thd) : m_thd(thd), m_proc_info(NULL) {}
set(const char * message)305   void set(const char *message) {
306     const char *old = thd_proc_info(m_thd, message);
307     if (!m_proc_info) {
308       // Save the original on first change
309       m_proc_info = old;
310     }
311   }
~Thd_proc_info_guard()312   ~Thd_proc_info_guard() {
313     if (m_proc_info) thd_proc_info(m_thd, m_proc_info);
314   }
315 
316  private:
317   THD *const m_thd;
318   const char *m_proc_info;
319 };
320 
321 #include "storage/ndb/plugin/ndb_log.h"
322 #include "storage/ndb/plugin/ndb_thd.h"
323 #include "storage/ndb/plugin/ndb_thd_ndb.h"
324 
325 /*
326   lock/unlock calls are reference counted, so calls to lock
327   must be matched to a call to unlock if the lock call succeeded
328 */
ndbcluster_global_schema_lock(THD * thd,bool report_cluster_disconnected,bool is_tablespace,bool * victimized)329 static int ndbcluster_global_schema_lock(THD *thd,
330                                          bool report_cluster_disconnected,
331                                          bool is_tablespace, bool *victimized) {
332   Ndb *ndb = check_ndb_in_thd(thd);
333   Thd_ndb *thd_ndb = get_thd_ndb(thd);
334   NdbError ndb_error;
335   *victimized = false;
336 
337   if (thd_ndb->check_option(Thd_ndb::IS_SCHEMA_DIST_PARTICIPANT)) {
338     ndb_set_gsl_participant(thd);
339     return 0;
340   }
341   DBUG_TRACE;
342 
343   if (thd_ndb->global_schema_lock_count) {
344     // Remember that GSL was locked for tablespace
345     if (is_tablespace) tablespace_gsl_guard.tablespace_gsl_acquired();
346 
347     if (thd_ndb->global_schema_lock_trans)
348       thd_ndb->global_schema_lock_trans->refresh();
349     else
350       DBUG_ASSERT(thd_ndb->global_schema_lock_error != 0);
351     thd_ndb->global_schema_lock_count++;
352     DBUG_PRINT("exit", ("global_schema_lock_count: %d",
353                         thd_ndb->global_schema_lock_count));
354     return 0;
355   }
356   DBUG_ASSERT(thd_ndb->global_schema_lock_count == 0);
357   thd_ndb->global_schema_lock_count = 1;
358   thd_ndb->global_schema_lock_error = 0;
359   DBUG_PRINT("exit", ("global_schema_lock_count: %d",
360                       thd_ndb->global_schema_lock_count));
361 
362   /*
363     Take the lock
364   */
365   Thd_proc_info_guard proc_info(thd);
366   proc_info.set("Waiting for ndbcluster global schema lock");
367   thd_ndb->global_schema_lock_trans =
368       gsl_lock_ext(thd, ndb, ndb_error, true /* retry */, false /* no_wait */);
369 
370   if (DBUG_EVALUATE_IF("sleep_after_global_schema_lock", true, false)) {
371     ndb_milli_sleep(6000);
372   }
373 
374   if (thd_ndb->global_schema_lock_trans) {
375     ndb_log_verbose(19, "Global schema lock acquired");
376 
377     // Count number of global schema locks taken by this thread
378     thd_ndb->schema_locks_count++;
379     thd_ndb->global_schema_lock_count = 1;
380     DBUG_PRINT("info", ("schema_locks_count: %d", thd_ndb->schema_locks_count));
381 
382     // Remember that GSL was locked for tablespace
383     if (is_tablespace) tablespace_gsl_guard.tablespace_gsl_acquired();
384 
385     // Sync point used when testing global schema lock concurrency
386     DEBUG_SYNC(thd, "ndb_global_schema_lock_acquired");
387 
388     return 0;
389   }
390   // Else, didn't get GSL: Deadlock or failure from NDB
391 
392   /**
393    * If GSL request failed due to no cluster connection (4009),
394    * we consider the lock granted, else GSL request failed.
395    */
396   if (ndb_error.code != 4009)  // No cluster connection
397   {
398     DBUG_ASSERT(thd_ndb->global_schema_lock_count == 1);
399     // This reset triggers the special case in ndbcluster_global_schema_unlock()
400     thd_ndb->global_schema_lock_count = 0;
401   }
402 
403   if (ndb_error.code == 266)  // Deadlock resolution
404   {
405     ndb_log_info(
406         "Failed to acquire global schema lock due to deadlock resolution");
407     *victimized = true;
408   } else if (ndb_error.code != 4009 || report_cluster_disconnected) {
409     if (ndb_thd_is_background_thread(thd)) {
410       // Don't push any warning when background thread fail to acquire GSL
411     } else {
412       thd_ndb->push_ndb_error_warning(ndb_error);
413       thd_ndb->push_warning("Could not acquire global schema lock");
414     }
415   }
416   thd_ndb->global_schema_lock_error = ndb_error.code ? ndb_error.code : -1;
417   return -1;
418 }
419 
ndbcluster_global_schema_unlock(THD * thd,bool is_tablespace)420 static int ndbcluster_global_schema_unlock(THD *thd, bool is_tablespace) {
421   Thd_ndb *thd_ndb = get_thd_ndb(thd);
422   if (unlikely(thd_ndb == NULL)) {
423     return 0;
424   }
425 
426   if (thd_ndb->check_option(Thd_ndb::IS_SCHEMA_DIST_PARTICIPANT)) {
427     ndb_set_gsl_participant(NULL);
428     return 0;
429   }
430 
431   if (thd_ndb->global_schema_lock_error != 4009 &&
432       thd_ndb->global_schema_lock_count == 0) {
433     // Special case to handle unlock after failure to acquire GSL due to
434     // any error other than 4009.
435     // - when error 4009 occurs the lock is granted anyway and the lock count is
436     // not reset, thus unlock() should be called.
437     // - for other errors the lock is not granted, lock count is reset and
438     // the exact same error code is returned. Thus it's impossible to know
439     // that there is actually no need to call unlock. Fix by allowing unlock
440     // without doing anything since the trans is already closed.
441     DBUG_ASSERT(thd_ndb->global_schema_lock_trans == NULL);
442     thd_ndb->global_schema_lock_count++;
443   }
444 
445   Ndb *ndb = thd_ndb->ndb;
446   DBUG_TRACE;
447   NdbTransaction *trans = thd_ndb->global_schema_lock_trans;
448   // Don't allow decrementing from zero
449   DBUG_ASSERT(thd_ndb->global_schema_lock_count > 0);
450   thd_ndb->global_schema_lock_count--;
451   DBUG_PRINT("exit", ("global_schema_lock_count: %d",
452                       thd_ndb->global_schema_lock_count));
453   DBUG_ASSERT(ndb != NULL);
454   if (ndb == NULL) {
455     return 0;
456   }
457   DBUG_ASSERT(trans != NULL || thd_ndb->global_schema_lock_error != 0);
458   if (thd_ndb->global_schema_lock_count != 0) {
459     return 0;
460   }
461   thd_ndb->global_schema_lock_error = 0;
462 
463   if (trans) {
464     thd_ndb->global_schema_lock_trans = NULL;
465 
466     // Remember GSL for tablespace released
467     if (is_tablespace) tablespace_gsl_guard.tablespace_gsl_released();
468 
469     NdbError ndb_error;
470     if (!gsl_unlock_ext(ndb, trans, ndb_error)) {
471       ndb_log_warning("Failed to release global schema lock, error: (%d)%s",
472                       ndb_error.code, ndb_error.message);
473       thd_ndb->push_ndb_error_warning(ndb_error);
474       thd_ndb->push_warning("Failed to release global schema lock");
475       return -1;
476     }
477 
478     ndb_log_verbose(19, "Global schema lock release");
479   }
480   return 0;
481 }
482 
ndb_gsl_lock(THD * thd,bool lock,bool is_tablespace,bool * victimized)483 bool ndb_gsl_lock(THD *thd, bool lock, bool is_tablespace, bool *victimized) {
484   DBUG_TRACE;
485 
486   if (lock) {
487     if (ndbcluster_global_schema_lock(thd, true, is_tablespace, victimized) !=
488         0) {
489       DBUG_PRINT("error", ("Failed to lock global schema lock"));
490       return true;  // Error
491     }
492 
493     return false;  // OK
494   }
495 
496   *victimized = false;
497   if (ndbcluster_global_schema_unlock(thd, is_tablespace) != 0) {
498     DBUG_PRINT("error", ("Failed to unlock global schema lock"));
499     return true;  // Error
500   }
501 
502   return false;  // OK
503 }
504 
has_required_global_schema_lock(const char * func) const505 bool Thd_ndb::has_required_global_schema_lock(const char *func) const {
506   if (global_schema_lock_error) {
507     // An error occurred while locking, either because
508     // no connection to cluster or another user has locked
509     // the lock -> ok, but caller should not allow to continue
510     return false;
511   }
512 
513   if (global_schema_lock_trans) {
514     global_schema_lock_trans->refresh();
515     return true;  // All OK
516   }
517 
518   // No attempt at taking global schema lock has been done, neither
519   // error or trans set -> programming error
520   LEX_CSTRING query = thd_query_unsafe(m_thd);
521   ndb_log_error(
522       "programming error, no lock taken while running "
523       "query '%*s' in function '%s'",
524       (int)query.length, query.str, func);
525   abort();
526   return false;
527 }
528 
529 #include "storage/ndb/plugin/ndb_global_schema_lock_guard.h"
530 
Ndb_global_schema_lock_guard(THD * thd)531 Ndb_global_schema_lock_guard::Ndb_global_schema_lock_guard(THD *thd)
532     : m_thd(thd), m_locked(false), m_try_locked(false) {}
533 
~Ndb_global_schema_lock_guard()534 Ndb_global_schema_lock_guard::~Ndb_global_schema_lock_guard() {
535   if (m_try_locked)
536     unlock();
537   else if (m_locked)
538     ndbcluster_global_schema_unlock(m_thd, false /* is_tablespace */);
539 }
540 
541 /**
542  * Set a Global Schema Lock.
543  * May fail due to either Ndb Cluster failure, or due to being
544  * 'victimized' as part of deadlock resolution. In the later case we
545  * retry the GSL locking.
546  */
lock(void)547 int Ndb_global_schema_lock_guard::lock(void) {
548   /* only one lock call allowed */
549   assert(!m_locked);
550 
551   /*
552     Always set m_locked, even if lock fails. Since the
553     lock/unlock calls are reference counted, the number
554     of calls to lock and unlock need to match up.
555   */
556   m_locked = true;
557   bool victimized = false;
558   bool ret;
559   do {
560     ret = ndbcluster_global_schema_lock(m_thd, false, false /* is_tablespace */,
561                                         &victimized);
562     if (ret && thd_killed(m_thd)) {
563       // Failed to acuire GSL and THD is killed -> give up!
564       break;  // Terminate loop
565     }
566   } while (victimized);
567 
568   return ret;
569 }
570 
try_lock(void)571 bool Ndb_global_schema_lock_guard::try_lock(void) {
572   /*
573     Always set m_locked, even if lock fails. Since the lock/unlock calls are
574     reference counted, the number of calls to lock and unlock need to match up.
575   */
576   m_locked = true;
577   m_try_locked = true;
578   Thd_ndb *thd_ndb = get_thd_ndb(m_thd);
579   // Check if this thd has acquired GSL already
580   if (thd_ndb->global_schema_lock_count) return false;
581 
582   thd_ndb->global_schema_lock_error = 0;
583 
584   Ndb *ndb = check_ndb_in_thd(m_thd);
585   NdbError ndb_error;
586   // Attempt to take the GSL with no retry and no waiting
587   thd_ndb->global_schema_lock_trans =
588       gsl_lock_ext(m_thd, ndb, ndb_error, false, /* retry */
589                    true /* no_wait */);
590 
591   if (thd_ndb->global_schema_lock_trans != nullptr) {
592     ndb_log_verbose(19, "Global schema lock acquired");
593 
594     // Count number of global schema locks taken by this thread
595     thd_ndb->schema_locks_count++;
596     thd_ndb->global_schema_lock_count = 1;
597     DBUG_PRINT("info", ("schema_locks_count: %d", thd_ndb->schema_locks_count));
598 
599     return true;
600   }
601   thd_ndb->global_schema_lock_error = ndb_error.code ? ndb_error.code : -1;
602   return false;
603 }
604 
unlock()605 bool Ndb_global_schema_lock_guard::unlock() {
606   // This function should only be called in conjunction with try_lock()
607   DBUG_ASSERT(m_try_locked);
608 
609   Thd_ndb *thd_ndb = get_thd_ndb(m_thd);
610   if (unlikely(thd_ndb == nullptr)) {
611     return true;
612   }
613 
614   Ndb *ndb = thd_ndb->ndb;
615   if (ndb == nullptr) {
616     return true;
617   }
618   NdbTransaction *trans = thd_ndb->global_schema_lock_trans;
619   thd_ndb->global_schema_lock_error = 0;
620   if (trans != nullptr) {
621     thd_ndb->global_schema_lock_trans = nullptr;
622     thd_ndb->global_schema_lock_count = 0;
623 
624     NdbError ndb_error;
625     if (!gsl_unlock_ext(ndb, trans, ndb_error)) {
626       ndb_log_warning("Failed to release global schema lock, error: (%d)%s",
627                       ndb_error.code, ndb_error.message);
628       thd_ndb->push_ndb_error_warning(ndb_error);
629       thd_ndb->push_warning("Failed to release global schema lock");
630       return false;
631     }
632     ndb_log_verbose(19, "Global schema lock release");
633   }
634   return true;
635 }
636