1 /*
2    Copyright (c) 2011, 2019, Oracle and/or its affiliates. All rights reserved.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 #include "storage/ndb/plugin/ndb_share.h"
26 
27 #include <iostream>
28 #include <sstream>
29 #include <unordered_set>
30 
31 #include "m_string.h"
32 #include "sql/sql_class.h"
33 #include "sql/strfunc.h"
34 #include "sql/table.h"
35 #include "storage/ndb/include/ndbapi/NdbEventOperation.hpp"
36 #include "storage/ndb/plugin/ndb_conflict.h"
37 #include "storage/ndb/plugin/ndb_event_data.h"
38 #include "storage/ndb/plugin/ndb_log.h"
39 #include "storage/ndb/plugin/ndb_name_util.h"
40 #include "storage/ndb/plugin/ndb_require.h"
41 #include "storage/ndb/plugin/ndb_table_map.h"
42 
// Global Ndb object defined elsewhere, passed to teardown_conflict_fn() when
// a NDB_SHARE is destroyed.
extern Ndb *g_ndb;
// Mutex defined elsewhere, the functions in this file either lock it or
// assert that it is owned while working with the NDB_SHARE lists below.
extern mysql_mutex_t ndbcluster_mutex;

// Map of NDB_SHARE's which correspond to an open table, the NDB_SHARE's are
// inserted and looked up using their key string.
std::unique_ptr<collation_unordered_map<std::string, NDB_SHARE *>>
    ndbcluster_open_tables;

// Set of NDB_SHARE's which have been dropped, they are kept in this set
// until all references to them have been released.
static std::unordered_set<NDB_SHARE *> dropped_shares;
53 
create(const char * key)54 NDB_SHARE *NDB_SHARE::create(const char *key) {
55   if (DBUG_EVALUATE_IF("ndb_share_create_fail1", true, false)) {
56     // Simulate failure to create NDB_SHARE
57     return nullptr;
58   }
59 
60   NDB_SHARE *share;
61   if (!(share = (NDB_SHARE *)my_malloc(PSI_INSTRUMENT_ME, sizeof(*share),
62                                        MYF(MY_WME | MY_ZEROFILL))))
63     return nullptr;
64 
65   share->flags = 0;
66   share->state = NSS_INITIAL;
67 
68   /* Allocates enough space for key, db, and table_name */
69   share->key = NDB_SHARE::create_key(key);
70 
71   share->db = NDB_SHARE::key_get_db_name(share->key);
72   share->table_name = NDB_SHARE::key_get_table_name(share->key);
73 
74   thr_lock_init(&share->lock);
75   mysql_mutex_init(PSI_INSTRUMENT_ME, &share->mutex, MY_MUTEX_INIT_FAST);
76 
77   share->m_cfn_share = nullptr;
78 
79   share->op = 0;
80 
81 #ifndef DBUG_OFF
82   DBUG_ASSERT(share->m_use_count == 0);
83   share->refs = new Ndb_share_references();
84 #endif
85 
86   share->inplace_alter_new_table_def = nullptr;
87 
88   return share;
89 }
90 
destroy(NDB_SHARE * share)91 void NDB_SHARE::destroy(NDB_SHARE *share) {
92   thr_lock_delete(&share->lock);
93   mysql_mutex_destroy(&share->mutex);
94 
95   // ndb_index_stat_free() should have cleaned up:
96   assert(share->index_stat_list == NULL);
97 
98   teardown_conflict_fn(g_ndb, share->m_cfn_share);
99 
100 #ifndef DBUG_OFF
101   DBUG_ASSERT(share->m_use_count == 0);
102   DBUG_ASSERT(share->refs->check_empty());
103   delete share->refs;
104 #endif
105 
106   // Release memory for the variable length strings held by
107   // key but also referenced by db, table_name and shadow_table->db etc.
108   free_key(share->key);
109   my_free(share);
110 }
111 
/*
  Struct holding dynamic length strings for NDB_SHARE. The type is
  opaque to the user of NDB_SHARE and should
  only be accessed using NDB_SHARE accessor functions.

  All the strings are zero terminated.

  Layout:
  size_t key_length
  "key\0"
  "db\0"
  "table_name\0"
*/
struct NDB_SHARE_KEY {
  // Length of the key string, not counting its zero terminator
  size_t m_key_length;
  // First byte of the buffer where the key, db and table_name strings are
  // stored back to back, the struct is allocated with extra space for them
  char m_buffer[1];
};
129 
create_key(const char * new_key)130 NDB_SHARE_KEY *NDB_SHARE::create_key(const char *new_key) {
131   const size_t new_key_length = strlen(new_key);
132 
133   char db_name_buf[FN_HEADLEN];
134   ndb_set_dbname(new_key, db_name_buf);
135   const size_t db_name_len = strlen(db_name_buf);
136 
137   char table_name_buf[FN_HEADLEN];
138   ndb_set_tabname(new_key, table_name_buf);
139   const size_t table_name_len = strlen(table_name_buf);
140 
141   // Calculate total size needed for the variable length strings
142   const size_t size = sizeof(NDB_SHARE_KEY) + new_key_length + db_name_len + 1 +
143                       table_name_len + 1;
144 
145   NDB_SHARE_KEY *allocated_key = (NDB_SHARE_KEY *)my_malloc(
146       PSI_INSTRUMENT_ME, size, MYF(MY_WME | ME_FATALERROR));
147 
148   allocated_key->m_key_length = new_key_length;
149 
150   // Copy key into the buffer
151   char *buf_ptr = allocated_key->m_buffer;
152   my_stpcpy(buf_ptr, new_key);
153   buf_ptr += new_key_length + 1;
154 
155   // Copy db_name into the buffer
156   my_stpcpy(buf_ptr, db_name_buf);
157   buf_ptr += db_name_len + 1;
158 
159   // Copy table_name into the buffer
160   my_stpcpy(buf_ptr, table_name_buf);
161   buf_ptr += table_name_len;
162 
163   // Check that writing has not occurred beyond end of allocated memory
164   assert(buf_ptr < reinterpret_cast<char *>(allocated_key) + size);
165 
166   DBUG_PRINT("info", ("size: %lu", (unsigned long)size));
167   DBUG_PRINT("info",
168              ("new_key: '%s', %lu", new_key, (unsigned long)new_key_length));
169   DBUG_PRINT("info",
170              ("db_name: '%s', %lu", db_name_buf, (unsigned long)db_name_len));
171   DBUG_PRINT("info", ("table_name: '%s', %lu", table_name_buf,
172                       (unsigned long)table_name_len));
173   DBUG_DUMP("NDB_SHARE_KEY: ", (const uchar *)allocated_key->m_buffer, size);
174 
175   return allocated_key;
176 }
177 
free_key(NDB_SHARE_KEY * key)178 void NDB_SHARE::free_key(NDB_SHARE_KEY *key) { my_free(key); }
179 
key_get_key(NDB_SHARE_KEY * key)180 std::string NDB_SHARE::key_get_key(NDB_SHARE_KEY *key) {
181   assert(key->m_key_length == strlen(key->m_buffer));
182   return key->m_buffer;
183 }
184 
key_get_db_name(NDB_SHARE_KEY * key)185 char *NDB_SHARE::key_get_db_name(NDB_SHARE_KEY *key) {
186   char *buf_ptr = key->m_buffer;
187   // Step past the key string and it's zero terminator
188   buf_ptr += key->m_key_length + 1;
189   return buf_ptr;
190 }
191 
key_get_table_name(NDB_SHARE_KEY * key)192 char *NDB_SHARE::key_get_table_name(NDB_SHARE_KEY *key) {
193   char *buf_ptr = key_get_db_name(key);
194   const size_t db_name_len = strlen(buf_ptr);
195   // Step past the db name string and it's zero terminator
196   buf_ptr += db_name_len + 1;
197   return buf_ptr;
198 }
199 
key_length() const200 size_t NDB_SHARE::key_length() const {
201   assert(key->m_key_length == strlen(key->m_buffer));
202   return key->m_key_length;
203 }
204 
key_string() const205 const char *NDB_SHARE::key_string() const {
206   assert(strlen(key->m_buffer) == key->m_key_length);
207   return key->m_buffer;
208 }
209 
share_state_string(void) const210 const char *NDB_SHARE::share_state_string(void) const {
211   switch (state) {
212     case NSS_INITIAL:
213       return "NSS_INITIAL";
214     case NSS_DROPPED:
215       return "NSS_DROPPED";
216   }
217   assert(false);
218   return "<unknown>";
219 }
220 
free_share(NDB_SHARE ** share)221 void NDB_SHARE::free_share(NDB_SHARE **share) {
222   DBUG_TRACE;
223   mysql_mutex_assert_owner(&ndbcluster_mutex);
224 
225   if (!(*share)->decrement_use_count()) {
226     // Noone is using the NDB_SHARE anymore, release it
227     NDB_SHARE::real_free_share(share);
228   }
229 }
230 
create_and_acquire_reference(const char * key,const char * reference)231 NDB_SHARE *NDB_SHARE::create_and_acquire_reference(const char *key,
232                                                    const char *reference) {
233   DBUG_TRACE;
234   DBUG_PRINT("enter", ("key: '%s'", key));
235 
236   mysql_mutex_assert_owner(&ndbcluster_mutex);
237 
238   // Make sure that the SHARE does not already exist
239   DBUG_ASSERT(!acquire_reference_impl(key));
240 
241   NDB_SHARE *share = NDB_SHARE::create(key);
242   if (share == nullptr) {
243     DBUG_PRINT("error", ("failed to create NDB_SHARE"));
244     return nullptr;
245   }
246 
247   // Insert the new share in list of open shares
248   ndbcluster_open_tables->emplace(key, share);
249 
250   // Add share refcount from 'ndbcluster_open_tables'
251   share->increment_use_count();
252   share->refs_insert("ndbcluster_open_tables");
253 
254   // Add refcount for returned 'share'.
255   share->increment_use_count();
256   share->refs_insert(reference);
257 
258   return share;
259 }
260 
create_and_acquire_reference(const char * key,const class ha_ndbcluster * reference)261 NDB_SHARE *NDB_SHARE::create_and_acquire_reference(
262     const char *key, const class ha_ndbcluster *reference) {
263   mysql_mutex_lock(&ndbcluster_mutex);
264 
265   NDB_SHARE *share = NDB_SHARE::create(key);
266   if (share == nullptr)
267     ndb_log_error("failed to create NDB_SHARE for key: %s", key);
268   else {
269     // Insert the new share in list of open shares
270     ndbcluster_open_tables->emplace(key, share);
271 
272     // Add share refcount from 'ndbcluster_open_tables'
273     share->increment_use_count();
274     share->refs_insert("ndbcluster_open_tables");
275 
276     // Add refcount for returned 'share'.
277     share->increment_use_count();
278     share->refs_insert(reference);
279   }
280 
281   mysql_mutex_unlock(&ndbcluster_mutex);
282   return share;
283 }
284 
acquire_for_handler(const char * key,const class ha_ndbcluster * reference)285 NDB_SHARE *NDB_SHARE::acquire_for_handler(
286     const char *key, const class ha_ndbcluster *reference) {
287   DBUG_TRACE;
288 
289   mysql_mutex_lock(&ndbcluster_mutex);
290   NDB_SHARE *share = acquire_reference_impl(key);
291   if (share) {
292     share->refs_insert(reference);
293     DBUG_PRINT("NDB_SHARE",
294                ("'%s' reference: 'ha_ndbcluster(%p)', "
295                 "use_count: %u",
296                 share->key_string(), reference, share->use_count()));
297   }
298   mysql_mutex_unlock(&ndbcluster_mutex);
299 
300   return share;
301 }
302 
release_for_handler(NDB_SHARE * share,const ha_ndbcluster * reference)303 void NDB_SHARE::release_for_handler(NDB_SHARE *share,
304                                     const ha_ndbcluster *reference) {
305   DBUG_TRACE;
306 
307   mysql_mutex_lock(&ndbcluster_mutex);
308 
309   DBUG_PRINT("NDB_SHARE", ("release '%s', reference: 'ha_ndbcluster(%p)', "
310                            "use_count: %u",
311                            share->key_string(), reference, share->use_count()));
312 
313   share->refs_erase(reference);
314   NDB_SHARE::free_share(&share);
315   mysql_mutex_unlock(&ndbcluster_mutex);
316 }
317 
318 /*
319   Acquire another reference using existing share reference.
320 */
321 
acquire_reference_on_existing(NDB_SHARE * share,const char * reference)322 NDB_SHARE *NDB_SHARE::acquire_reference_on_existing(NDB_SHARE *share,
323                                                     const char *reference) {
324   mysql_mutex_lock(&ndbcluster_mutex);
325 
326   // Should already be referenced
327   DBUG_ASSERT(share->use_count() > 0);
328   // Number of references should match use_count
329   DBUG_ASSERT(share->use_count() == share->refs->size());
330 
331   share->increment_use_count();
332   share->refs_insert(reference);
333 
334   DBUG_PRINT("NDB_SHARE", ("'%s', reference: '%s', use_count: %u",
335                            share->key_string(), reference, share->use_count()));
336 
337   mysql_mutex_unlock(&ndbcluster_mutex);
338   return share;
339 }
340 
341 /*
342   Acquire reference using key.
343 */
344 
acquire_reference_by_key(const char * key,const char * reference)345 NDB_SHARE *NDB_SHARE::acquire_reference_by_key(const char *key,
346                                                const char *reference) {
347   mysql_mutex_lock(&ndbcluster_mutex);
348 
349   NDB_SHARE *share = acquire_reference_impl(key);
350   if (share) {
351     share->refs_insert(reference);
352     DBUG_PRINT("NDB_SHARE",
353                ("'%s', reference: '%s', use_count: %u", share->key_string(),
354                 reference, share->use_count()));
355   }
356 
357   mysql_mutex_unlock(&ndbcluster_mutex);
358   return share;
359 }
360 
acquire_reference_by_key_have_lock(const char * key,const char * reference)361 NDB_SHARE *NDB_SHARE::acquire_reference_by_key_have_lock(
362     const char *key, const char *reference) {
363   mysql_mutex_assert_owner(&ndbcluster_mutex);
364 
365   NDB_SHARE *share = acquire_reference_impl(key);
366   if (share) {
367     share->refs_insert(reference);
368     DBUG_PRINT("NDB_SHARE",
369                ("'%s', reference: '%s', use_count: %u", share->key_string(),
370                 reference, share->use_count()));
371   }
372 
373   return share;
374 }
375 
release_reference(NDB_SHARE * share,const char * reference)376 void NDB_SHARE::release_reference(NDB_SHARE *share, const char *reference) {
377   mysql_mutex_lock(&ndbcluster_mutex);
378 
379   DBUG_PRINT("NDB_SHARE", ("release '%s', reference: '%s', use_count: %u",
380                            share->key_string(), reference, share->use_count()));
381 
382   share->refs_erase(reference);
383   NDB_SHARE::free_share(&share);
384 
385   mysql_mutex_unlock(&ndbcluster_mutex);
386 }
387 
release_reference_have_lock(NDB_SHARE * share,const char * reference)388 void NDB_SHARE::release_reference_have_lock(NDB_SHARE *share,
389                                             const char *reference) {
390   mysql_mutex_assert_owner(&ndbcluster_mutex);
391 
392   DBUG_PRINT("NDB_SHARE", ("release '%s', reference: '%s', use_count: %u",
393                            share->key_string(), reference, share->use_count()));
394 
395   share->refs_erase(reference);
396   NDB_SHARE::free_share(&share);
397 }
398 
399 #ifndef DBUG_OFF
400 
check_empty() const401 bool NDB_SHARE::Ndb_share_references::check_empty() const {
402   if (size() == 0) {
403     // There are no references, all OK
404     return true;
405   }
406 
407   ndb_log_error(
408       "Consistency check of NDB_SHARE references failed, the list "
409       "of references should be empty at this time");
410 
411   std::string s;
412   debug_print(s);
413   ndb_log_error("%s", s.c_str());
414   abort();
415   return false;
416 }
417 
debug_print(std::string & out,const char * line_separator) const418 void NDB_SHARE::Ndb_share_references::debug_print(
419     std::string &out, const char *line_separator) const {
420   std::stringstream ss;
421 
422   // Print the handler list
423   {
424     const char *separator = "";
425     ss << "  handlers: " << handlers.size() << " [ ";
426     for (const auto &key : handlers) {
427       ss << separator << "'" << key << "'";
428       separator = ",";
429     }
430     ss << " ]";
431   }
432   ss << ", " << line_separator;
433 
434   // Print the strings list
435   {
436     const char *separator = "";
437     ss << "  strings: " << strings.size() << " [ ";
438     for (const auto &key : strings) {
439       ss << separator << "'" << key.c_str() << "'";
440       separator = ",";
441     }
442     ss << " ]";
443   }
444   ss << ", " << line_separator;
445 
446   out = ss.str();
447 }
448 
449 #endif
450 
debug_print(std::string & out,const char * line_separator) const451 void NDB_SHARE::debug_print(std::string &out,
452                             const char *line_separator) const {
453   std::stringstream ss;
454   ss << "NDB_SHARE { " << line_separator << "  db: '" << db << "',"
455      << line_separator << "  table_name: '" << table_name << "', "
456      << line_separator << "  key: '" << key_string() << "', " << line_separator
457      << "  use_count: " << use_count() << ", " << line_separator
458      << "  state: " << share_state_string() << ", " << line_separator
459      << "  op: " << op << ", " << line_separator;
460 
461 #ifndef DBUG_OFF
462   std::string refs_string;
463   refs->debug_print(refs_string, line_separator);
464   ss << refs_string.c_str();
465 
466   // There should be as many refs as the use_count says
467   DBUG_ASSERT(use_count() == refs->size());
468 #endif
469 
470   ss << "}";
471 
472   out = ss.str();
473 }
474 
debug_print_shares(std::string & out)475 void NDB_SHARE::debug_print_shares(std::string &out) {
476   std::stringstream ss;
477   ss << "ndbcluster_open_tables {"
478      << "\n";
479 
480   for (const auto &key_and_value : *ndbcluster_open_tables) {
481     const NDB_SHARE *share = key_and_value.second;
482     std::string s;
483     share->debug_print(s, "\n");
484     ss << s << "\n";
485   }
486 
487   ss << "}"
488      << "\n";
489 
490   out = ss.str();
491 }
492 
decrement_use_count()493 uint NDB_SHARE::decrement_use_count() {
494   ndbcluster::ndbrequire(m_use_count > 0);
495   return --m_use_count;
496 }
497 
print_remaining_open_tables(void)498 void NDB_SHARE::print_remaining_open_tables(void) {
499   mysql_mutex_lock(&ndbcluster_mutex);
500   if (!ndbcluster_open_tables->empty()) {
501     std::string s;
502     NDB_SHARE::debug_print_shares(s);
503     std::cerr << s << std::endl;
504   }
505   mysql_mutex_unlock(&ndbcluster_mutex);
506 }
507 
rename_share(NDB_SHARE * share,NDB_SHARE_KEY * new_key)508 int NDB_SHARE::rename_share(NDB_SHARE *share, NDB_SHARE_KEY *new_key) {
509   DBUG_TRACE;
510   DBUG_PRINT("enter", ("share->key: '%s'", share->key_string()));
511   DBUG_PRINT("enter",
512              ("new_key: '%s'", NDB_SHARE::key_get_key(new_key).c_str()));
513 
514   mysql_mutex_lock(&ndbcluster_mutex);
515 
516   // Make sure that no NDB_SHARE with new_key already exists
517   if (find_or_nullptr(*ndbcluster_open_tables,
518                       NDB_SHARE::key_get_key(new_key))) {
519     // Dump the list of open NDB_SHARE's since new_key already exists
520     ndb_log_error(
521         "INTERNAL ERROR: Found existing NDB_SHARE for "
522         "new key: '%s' while renaming: '%s'",
523         NDB_SHARE::key_get_key(new_key).c_str(), share->key_string());
524     std::string s;
525     NDB_SHARE::debug_print_shares(s);
526     std::cerr << s << std::endl;
527     abort();
528   }
529 
530   /* Update the share hash key. */
531   NDB_SHARE_KEY *old_key = share->key;
532   share->key = new_key;
533   ndbcluster_open_tables->erase(NDB_SHARE::key_get_key(old_key));
534   ndbcluster_open_tables->emplace(NDB_SHARE::key_get_key(new_key), share);
535 
536   // Make sure that NDB_SHARE with old key does not exist
537   DBUG_ASSERT(find_or_nullptr(*ndbcluster_open_tables,
538                               NDB_SHARE::key_get_key(old_key)) == nullptr);
539   // Make sure that NDB_SHARE with new key does exist
540   DBUG_ASSERT(find_or_nullptr(*ndbcluster_open_tables,
541                               NDB_SHARE::key_get_key(new_key)));
542 
543   DBUG_PRINT("info", ("setting db and table_name to point at new key"));
544   share->db = NDB_SHARE::key_get_db_name(share->key);
545   share->table_name = NDB_SHARE::key_get_table_name(share->key);
546 
547   if (share->op) {
548     Ndb_event_data *event_data =
549         static_cast<Ndb_event_data *>(share->op->getCustomData());
550     if (event_data && event_data->shadow_table) {
551       if (!ndb_name_is_temp(share->table_name)) {
552         DBUG_PRINT("info", ("Renaming shadow table"));
553         // Allocate new strings for db and table_name for shadow_table
554         // in event_data's MEM_ROOT(where the shadow_table itself is allocated)
555         // NOTE! This causes a slight memory leak since the already existing
556         // strings are not release until the mem_root is eventually
557         // released.
558         lex_string_strmake(&event_data->mem_root,
559                            &event_data->shadow_table->s->db, share->db,
560                            strlen(share->db));
561         lex_string_strmake(&event_data->mem_root,
562                            &event_data->shadow_table->s->table_name,
563                            share->table_name, strlen(share->table_name));
564       } else {
565         DBUG_PRINT("info", ("Name is temporary, skip rename of shadow table"));
566         // don't rename the shadow table here, it's used by injector and all
567         // events might not have been processed. It will be dropped anyway
568       }
569     }
570   }
571   mysql_mutex_unlock(&ndbcluster_mutex);
572   return 0;
573 }
574 
575 /**
576   Acquire NDB_SHARE for key
577 
578   Returns share for key, and increases the refcount on the share.
579 
580   @param key      The key for NDB_SHARE to acquire
581 */
582 
acquire_reference_impl(const char * key)583 NDB_SHARE *NDB_SHARE::acquire_reference_impl(const char *key) {
584   DBUG_TRACE;
585   DBUG_PRINT("enter", ("key: '%s'", key));
586 
587   if (DBUG_EVALUATE_IF("ndb_share_acquire_fail1", true, false)) {
588     // Simulate failure to acquire NDB_SHARE
589     return nullptr;
590   }
591 
592   mysql_mutex_assert_owner(&ndbcluster_mutex);
593 
594   auto it = ndbcluster_open_tables->find(key);
595   if (it == ndbcluster_open_tables->end()) {
596     DBUG_PRINT("error", ("%s does not exist", key));
597     return nullptr;
598   }
599 
600   NDB_SHARE *share = it->second;
601 
602   // Add refcount for returned 'share'.
603   share->increment_use_count();
604 
605   return share;
606 }
607 
initialize(CHARSET_INFO * charset)608 void NDB_SHARE::initialize(CHARSET_INFO *charset) {
609   ndbcluster_open_tables.reset(
610       new collation_unordered_map<std::string, NDB_SHARE *>(charset,
611                                                             PSI_INSTRUMENT_ME));
612 }
613 
deinitialize(void)614 void NDB_SHARE::deinitialize(void) {
615   mysql_mutex_lock(&ndbcluster_mutex);
616 
617   // There should not be any NDB_SHARE's left -> crash after logging in debug
618   const class Debug_require {
619     const bool m_required_val;
620 
621    public:
622     Debug_require(bool val) : m_required_val(val) {}
623     ~Debug_require() { DBUG_ASSERT(m_required_val); }
624   } shares_remaining(ndbcluster_open_tables->empty() && dropped_shares.empty());
625 
626   // Drop remaining open shares, drop one NDB_SHARE after the other
627   // until open tables list is empty
628   while (!ndbcluster_open_tables->empty()) {
629     NDB_SHARE *share = ndbcluster_open_tables->begin()->second;
630     ndb_log_error("Still open NDB_SHARE '%s', use_count: %d, state: %s",
631                   share->key_string(), share->use_count(),
632                   share->share_state_string());
633     // If last ref, share is destroyed immediately, else moved to list of
634     // dropped shares
635     NDB_SHARE::mark_share_dropped(&share);
636   }
637 
638   // Release remaining dropped shares, release one NDB_SHARE after the other
639   // until dropped list is empty
640   while (!dropped_shares.empty()) {
641     NDB_SHARE *share = *dropped_shares.begin();
642     ndb_log_error("Not freed NDB_SHARE '%s', use_count: %d, state: %s",
643                   share->key_string(), share->use_count(),
644                   share->share_state_string());
645     NDB_SHARE::real_free_share(&share);
646   }
647 
648   mysql_mutex_unlock(&ndbcluster_mutex);
649 }
650 
release_extra_share_references(void)651 void NDB_SHARE::release_extra_share_references(void) {
652   mysql_mutex_lock(&ndbcluster_mutex);
653   while (!ndbcluster_open_tables->empty()) {
654     NDB_SHARE *share = ndbcluster_open_tables->begin()->second;
655     /*
656       The share kept by the server has not been freed, free it
657       Will also take it out of _open_tables list
658     */
659     DBUG_ASSERT(share->use_count() > 0);
660     DBUG_ASSERT(share->state != NSS_DROPPED);
661     NDB_SHARE::mark_share_dropped(&share);
662   }
663   mysql_mutex_unlock(&ndbcluster_mutex);
664 }
665 
real_free_share(NDB_SHARE ** share_ptr)666 void NDB_SHARE::real_free_share(NDB_SHARE **share_ptr) {
667   NDB_SHARE *share = *share_ptr;
668   DBUG_TRACE;
669   mysql_mutex_assert_owner(&ndbcluster_mutex);
670 
671   // Share must already be marked as dropped
672   ndbcluster::ndbrequire(share->state == NSS_DROPPED);
673 
674   // Share must be in dropped list
675   ndbcluster::ndbrequire(dropped_shares.find(share) != dropped_shares.end());
676 
677   // Remove share from dropped list
678   ndbcluster::ndbrequire(dropped_shares.erase(share));
679 
680   // Remove shares reference from 'dropped_shares'
681   share->refs_erase("dropped_shares");
682 
683   NDB_SHARE::destroy(share);
684 }
685 
686 extern void ndb_index_stat_free(NDB_SHARE *);
687 
mark_share_dropped(NDB_SHARE ** share_ptr)688 void NDB_SHARE::mark_share_dropped(NDB_SHARE **share_ptr) {
689   NDB_SHARE *share = *share_ptr;
690   DBUG_TRACE;
691   mysql_mutex_assert_owner(&ndbcluster_mutex);
692 
693   // The NDB_SHARE should not have any event operations, those
694   // should have been removed already _before_ marking the NDB_SHARE
695   // as dropped.
696   DBUG_ASSERT(share->op == nullptr);
697 
698   if (share->state == NSS_DROPPED) {
699     // The NDB_SHARE was already marked as dropped
700     return;
701   }
702 
703   // The index_stat is not needed anymore, free it.
704   ndb_index_stat_free(share);
705 
706   // Mark share as dropped
707   share->state = NSS_DROPPED;
708 
709   // Remove share from list of open shares
710   ndbcluster::ndbrequire(ndbcluster_open_tables->erase(share->key_string()));
711 
712   // Remove reference from list of open shares and decrement use count
713   share->refs_erase("ndbcluster_open_tables");
714   share->decrement_use_count();
715 
716   // Destroy the NDB_SHARE if noone is using it, this is normally a special
717   // case for shutdown code path. In all other cases the caller will hold
718   // reference to the share.
719   if (share->use_count() == 0) {
720     NDB_SHARE::destroy(share);
721     return;
722   }
723 
724   // Someone is still using the NDB_SHARE, insert it into the list of dropped
725   // to keep track of it until all references has been released
726   dropped_shares.emplace(share);
727 
728 #ifndef DBUG_OFF
729   std::string s;
730   share->debug_print(s, "\n");
731   std::cerr << "dropped_share: " << s << std::endl;
732 #endif
733 
734   // Share is referenced by 'dropped_shares'
735   share->refs_insert("dropped_shares");
736   // NOTE! The refcount has not been incremented
737 }
738 
739 #ifndef DBUG_OFF
dbg_check_shares_update()740 void NDB_SHARE::dbg_check_shares_update() {
741   ndb_log_info("dbug_check_shares open:");
742   for (const auto &key_and_value : *ndbcluster_open_tables) {
743     const NDB_SHARE *share = key_and_value.second;
744     ndb_log_info("  %s.%s: state: %s(%u) use_count: %u", share->db,
745                  share->table_name, share->share_state_string(),
746                  (unsigned)share->state, share->use_count());
747     assert(share->state != NSS_DROPPED);
748   }
749 
750   ndb_log_info("dbug_check_shares dropped:");
751   for (const NDB_SHARE *share : dropped_shares) {
752     ndb_log_info("  %s.%s: state: %s(%u) use_count: %u", share->db,
753                  share->table_name, share->share_state_string(),
754                  (unsigned)share->state, share->use_count());
755     assert(share->state == NSS_DROPPED);
756   }
757 
758   /**
759    * Only shares in mysql database may be open...
760    */
761   for (const auto &key_and_value : *ndbcluster_open_tables) {
762     const NDB_SHARE *share = key_and_value.second;
763     assert(strcmp(share->db, "mysql") == 0);
764   }
765 
766   /**
767    * Only shares in mysql database may be in dropped list...
768    */
769   for (const NDB_SHARE *share : dropped_shares) {
770     assert(strcmp(share->db, "mysql") == 0);
771   }
772 }
773 #endif
774