1 /*
2    Copyright (c) 2011, 2019, Oracle and/or its affiliates. All rights reserved.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 #include "storage/ndb/plugin/ndb_ndbapi_util.h"
26 
27 #include <string.h>  // memcpy
28 
29 #include "my_byteorder.h"
30 #include "my_dbug.h"
31 #include "storage/ndb/plugin/ndb_name_util.h"  // ndb_name_is_temp
32 
ndb_pack_varchar(const NdbDictionary::Table * ndbtab,unsigned column_index,char (& buf)[512],const char * str,size_t sz)33 void ndb_pack_varchar(const NdbDictionary::Table *ndbtab, unsigned column_index,
34                       char (&buf)[512], const char *str, size_t sz) {
35   // Get the column, cast to int to help compiler choose
36   // the "const int" overload rather than "const char*"
37   const NdbDictionary::Column *col =
38       ndbtab->getColumn(static_cast<int>(column_index));
39 
40   assert(col->getLength() <= (int)sizeof(buf));
41 
42   switch (col->getArrayType()) {
43     case NdbDictionary::Column::ArrayTypeFixed:
44       memcpy(buf, str, sz);
45       break;
46     case NdbDictionary::Column::ArrayTypeShortVar:
47       *(uchar *)buf = (uchar)sz;
48       memcpy(buf + 1, str, sz);
49       break;
50     case NdbDictionary::Column::ArrayTypeMediumVar:
51       int2store(buf, (uint16)sz);
52       memcpy(buf + 2, str, sz);
53       break;
54   }
55 }
56 
ndb_pack_varchar(const NdbDictionary::Column * col,size_t offset,const char * str,size_t str_length,char * buf)57 void ndb_pack_varchar(const NdbDictionary::Column *col, size_t offset,
58                       const char *str, size_t str_length, char *buf) {
59   buf += offset;
60   switch (col->getArrayType()) {
61     case NdbDictionary::Column::ArrayTypeFixed:
62       memcpy(buf, str, str_length);
63       break;
64     case NdbDictionary::Column::ArrayTypeShortVar:
65       *(uchar *)buf = (uchar)str_length;
66       memcpy(buf + 1, str, str_length);
67       break;
68     case NdbDictionary::Column::ArrayTypeMediumVar:
69       int2store(buf, (uint16)str_length);
70       memcpy(buf + 2, str, str_length);
71       break;
72   }
73 }
74 
ndb_unpack_varchar(const NdbDictionary::Column * col,size_t offset,const char ** str,size_t * str_length,const char * buf)75 void ndb_unpack_varchar(const NdbDictionary::Column *col, size_t offset,
76                         const char **str, size_t *str_length, const char *buf) {
77   buf += offset;
78 
79   switch (col->getArrayType()) {
80     case NdbDictionary::Column::ArrayTypeFixed:
81       *str_length = col->getLength();
82       *str = buf;
83       break;
84     case NdbDictionary::Column::ArrayTypeShortVar: {
85       const unsigned char len1byte = static_cast<unsigned char>(buf[0]);
86       *str_length = len1byte;
87       *str = buf + 1;
88     } break;
89     case NdbDictionary::Column::ArrayTypeMediumVar: {
90       const unsigned short len2byte = uint2korr(buf);
91       *str_length = len2byte;
92       *str = buf + 2;
93     } break;
94   }
95 }
96 
ndb_get_extra_metadata_version(const NdbDictionary::Table * ndbtab)97 Uint32 ndb_get_extra_metadata_version(const NdbDictionary::Table *ndbtab) {
98   DBUG_TRACE;
99 
100   Uint32 version;
101   void *unpacked_data;
102   Uint32 unpacked_length;
103   const int get_result =
104       ndbtab->getExtraMetadata(version, &unpacked_data, &unpacked_length);
105   if (get_result != 0) {
106     // Could not get extra metadata, return 0
107     return 0;
108   }
109 
110   free(unpacked_data);
111 
112   return version;
113 }
114 
ndb_table_get_serialized_metadata(const NdbDictionary::Table * ndbtab,std::string & serialized_metadata)115 bool ndb_table_get_serialized_metadata(const NdbDictionary::Table *ndbtab,
116                                        std::string &serialized_metadata) {
117   Uint32 version;
118   void *unpacked_data;
119   Uint32 unpacked_len;
120   const int get_result =
121       ndbtab->getExtraMetadata(version, &unpacked_data, &unpacked_len);
122   if (get_result != 0) return false;
123 
124   if (version != 2) {
125     free(unpacked_data);
126     return false;
127   }
128 
129   serialized_metadata.assign(static_cast<const char *>(unpacked_data),
130                              unpacked_len);
131   free(unpacked_data);
132   return true;
133 }
134 
ndb_table_has_blobs(const NdbDictionary::Table * ndbtab)135 bool ndb_table_has_blobs(const NdbDictionary::Table *ndbtab) {
136   const int num_columns = ndbtab->getNoOfColumns();
137   for (int i = 0; i < num_columns; i++) {
138     const NdbDictionary::Column::Type column_type =
139         ndbtab->getColumn(i)->getType();
140     if (column_type == NdbDictionary::Column::Blob ||
141         column_type == NdbDictionary::Column::Text) {
142       // Found at least one blob column, the table has blobs
143       return true;
144     }
145   }
146   return false;
147 }
148 
ndb_table_has_hidden_pk(const NdbDictionary::Table * ndbtab)149 bool ndb_table_has_hidden_pk(const NdbDictionary::Table *ndbtab) {
150   const char *hidden_pk_name = "$PK";
151   if (ndbtab->getNoOfPrimaryKeys() == 1) {
152     const NdbDictionary::Column *ndbcol = ndbtab->getColumn(hidden_pk_name);
153     if (ndbcol && ndbcol->getType() == NdbDictionary::Column::Bigunsigned &&
154         ndbcol->getLength() == 1 && ndbcol->getNullable() == false &&
155         ndbcol->getPrimaryKey() == true && ndbcol->getAutoIncrement() == true &&
156         ndbcol->getDefaultValue() == nullptr) {
157       return true;
158     }
159   }
160   return false;
161 }
162 
ndb_table_has_tablespace(const NdbDictionary::Table * ndbtab)163 bool ndb_table_has_tablespace(const NdbDictionary::Table *ndbtab) {
164   // NOTE! There is a slight ambiguity in the NdbDictionary::Table.
165   // Depending on wheter it has been retrieved from NDB or created
166   // by user as part of defining a new table in NDB, different methods
167   // need to be used for determining if table has tablespace
168 
169   if (ndb_table_tablespace_name(ndbtab) != nullptr) {
170     // Has tablespace
171     return true;
172   }
173 
174   if (ndbtab->getTablespace()) {
175     // Retrieved from NDB, the tablespace id and version
176     // are avaliable in the table definition -> has tablespace.
177     // NOTE! Fetching the name would require another roundtrip to NDB
178     return true;
179   }
180 
181   // Neither name or id of tablespace is set -> no tablespace
182   return false;
183 }
184 
ndb_table_tablespace_name(const NdbDictionary::Table * ndbtab)185 const char *ndb_table_tablespace_name(const NdbDictionary::Table *ndbtab) {
186   // NOTE! The getTablespaceName() returns zero length string
187   // to indicate no tablespace
188   const char *tablespace_name = ndbtab->getTablespaceName();
189   if (strlen(tablespace_name) == 0) {
190     // Just the zero length name, no tablespace name
191     return nullptr;
192   }
193   return tablespace_name;
194 }
195 
ndb_table_tablespace_name(NdbDictionary::Dictionary * dict,const NdbDictionary::Table * ndbtab)196 std::string ndb_table_tablespace_name(NdbDictionary::Dictionary *dict,
197                                       const NdbDictionary::Table *ndbtab) {
198   // NOTE! The getTablespaceName() returns zero length string
199   // to indicate no tablespace
200   std::string tablespace_name = ndbtab->getTablespaceName();
201   if (tablespace_name.empty()) {
202     // Just the zero length name, no tablespace name
203     // Try and retrieve it using the id as a fallback mechanism
204     Uint32 tablespace_id;
205     if (ndbtab->getTablespace(&tablespace_id)) {
206       const NdbDictionary::Tablespace ts = dict->getTablespace(tablespace_id);
207       if (!ndb_dict_check_NDB_error(dict)) {
208         tablespace_name = ts.getName();
209       }
210     }
211   }
212   return tablespace_name;
213 }
214 
ndb_dict_check_NDB_error(NdbDictionary::Dictionary * dict)215 bool ndb_dict_check_NDB_error(NdbDictionary::Dictionary *dict) {
216   return (dict->getNdbError().code != 0);
217 }
218 
ndb_get_logfile_group_names(const NdbDictionary::Dictionary * dict,std::unordered_set<std::string> & lfg_names)219 bool ndb_get_logfile_group_names(const NdbDictionary::Dictionary *dict,
220                                  std::unordered_set<std::string> &lfg_names) {
221   NdbDictionary::Dictionary::List lfg_list;
222   if (dict->listObjects(lfg_list, NdbDictionary::Object::LogfileGroup) != 0) {
223     return false;
224   }
225 
226   for (uint i = 0; i < lfg_list.count; i++) {
227     NdbDictionary::Dictionary::List::Element &elmt = lfg_list.elements[i];
228     lfg_names.insert(elmt.name);
229   }
230   return true;
231 }
232 
ndb_get_tablespace_names(const NdbDictionary::Dictionary * dict,std::unordered_set<std::string> & tablespace_names)233 bool ndb_get_tablespace_names(
234     const NdbDictionary::Dictionary *dict,
235     std::unordered_set<std::string> &tablespace_names) {
236   NdbDictionary::Dictionary::List tablespace_list;
237   if (dict->listObjects(tablespace_list, NdbDictionary::Object::Tablespace) !=
238       0) {
239     return false;
240   }
241 
242   for (uint i = 0; i < tablespace_list.count; i++) {
243     NdbDictionary::Dictionary::List::Element &elmt =
244         tablespace_list.elements[i];
245     tablespace_names.insert(elmt.name);
246   }
247   return true;
248 }
249 
ndb_get_table_names_in_schema(const NdbDictionary::Dictionary * dict,const std::string & schema_name,std::unordered_set<std::string> * table_names,bool skip_util_tables)250 bool ndb_get_table_names_in_schema(const NdbDictionary::Dictionary *dict,
251                                    const std::string &schema_name,
252                                    std::unordered_set<std::string> *table_names,
253                                    bool skip_util_tables) {
254   NdbDictionary::Dictionary::List list;
255   if (dict->listObjects(list, NdbDictionary::Object::UserTable) != 0) {
256     return false;
257   }
258 
259   for (uint i = 0; i < list.count; i++) {
260     NdbDictionary::Dictionary::List::Element &elmt = list.elements[i];
261 
262     if (schema_name != elmt.database) {
263       continue;
264     }
265 
266     if (ndb_name_is_temp(elmt.name) || ndb_name_is_blob_prefix(elmt.name) ||
267         ndb_name_is_index_stat(elmt.name)) {
268       continue;
269     }
270 
271     if (skip_util_tables && schema_name == "mysql" &&
272         (strcmp(elmt.name, "ndb_schema") == 0 ||
273          strcmp(elmt.name, "ndb_schema_result") == 0 ||
274          strcmp(elmt.name, "ndb_sql_metadata") == 0)) {
275       // Skip NDB utility tables. These tables and marked as hidden in the DD
276       // and are handled specifically by the binlog thread
277       continue;
278     }
279 
280     if (elmt.state == NdbDictionary::Object::StateOnline ||
281         elmt.state == NdbDictionary::Object::ObsoleteStateBackup ||
282         elmt.state == NdbDictionary::Object::StateBuilding) {
283       // Only return the table if they're already usable i.e. StateOnline or
284       // StateBackup or if they're expected to be usable soon which is denoted
285       // by StateBuilding
286       table_names->insert(elmt.name);
287     }
288   }
289   return true;
290 }
291 
ndb_get_undofile_names(NdbDictionary::Dictionary * dict,const std::string & logfile_group_name,std::vector<std::string> * undofile_names)292 bool ndb_get_undofile_names(NdbDictionary::Dictionary *dict,
293                             const std::string &logfile_group_name,
294                             std::vector<std::string> *undofile_names) {
295   NdbDictionary::Dictionary::List undofile_list;
296   if (dict->listObjects(undofile_list, NdbDictionary::Object::Undofile) != 0) {
297     return false;
298   }
299 
300   for (uint i = 0; i < undofile_list.count; i++) {
301     NdbDictionary::Dictionary::List::Element &elmt = undofile_list.elements[i];
302     NdbDictionary::Undofile uf = dict->getUndofile(-1, elmt.name);
303     if (logfile_group_name.compare(uf.getLogfileGroup()) == 0) {
304       undofile_names->push_back(elmt.name);
305     }
306   }
307   return true;
308 }
309 
ndb_get_datafile_names(NdbDictionary::Dictionary * dict,const std::string & tablespace_name,std::vector<std::string> * datafile_names)310 bool ndb_get_datafile_names(NdbDictionary::Dictionary *dict,
311                             const std::string &tablespace_name,
312                             std::vector<std::string> *datafile_names) {
313   NdbDictionary::Dictionary::List datafile_list;
314   if (dict->listObjects(datafile_list, NdbDictionary::Object::Datafile) != 0) {
315     return false;
316   }
317 
318   for (uint i = 0; i < datafile_list.count; i++) {
319     NdbDictionary::Dictionary::List::Element &elmt = datafile_list.elements[i];
320     NdbDictionary::Datafile df = dict->getDatafile(-1, elmt.name);
321     if (tablespace_name.compare(df.getTablespace()) == 0) {
322       datafile_names->push_back(elmt.name);
323     }
324   }
325   return true;
326 }
327 
ndb_get_database_names_in_dictionary(NdbDictionary::Dictionary * dict,std::unordered_set<std::string> * database_names)328 bool ndb_get_database_names_in_dictionary(
329     NdbDictionary::Dictionary *dict,
330     std::unordered_set<std::string> *database_names) {
331   DBUG_TRACE;
332 
333   /* Get all the list of tables from NDB and read the database names */
334   NdbDictionary::Dictionary::List list;
335   if (dict->listObjects(list, NdbDictionary::Object::UserTable) != 0)
336     return false;
337 
338   for (uint i = 0; i < list.count; i++) {
339     NdbDictionary::Dictionary::List::Element &elmt = list.elements[i];
340 
341     /* Skip the table if it is not in an expected state
342        or if it is a temporary or blob table.*/
343     if ((elmt.state != NdbDictionary::Object::StateOnline &&
344          elmt.state != NdbDictionary::Object::StateBuilding) ||
345         ndb_name_is_temp(elmt.name) || ndb_name_is_blob_prefix(elmt.name) ||
346         ndb_name_is_fk_mock_prefix(elmt.name)) {
347       DBUG_PRINT("debug", ("Skipping table %s.%s", elmt.database, elmt.name));
348       continue;
349     }
350     DBUG_PRINT("debug", ("Found %s.%s in NDB", elmt.database, elmt.name));
351 
352     database_names->insert(elmt.database);
353   }
354   return true;
355 }
356 
ndb_database_exists(NdbDictionary::Dictionary * dict,const std::string & database_name,bool & exists)357 bool ndb_database_exists(NdbDictionary::Dictionary *dict,
358                          const std::string &database_name, bool &exists) {
359   // Get list of tables from NDB and read database names
360   NdbDictionary::Dictionary::List list;
361   if (dict->listObjects(list, NdbDictionary::Object::UserTable) != 0)
362     return false;
363   for (uint i = 0; i < list.count; i++) {
364     NdbDictionary::Dictionary::List::Element &elmt = list.elements[i];
365     // Skip the table if it is not in an expected state or if it is a temporary
366     // or blob table
367     if ((elmt.state != NdbDictionary::Object::StateOnline &&
368          elmt.state != NdbDictionary::Object::StateBuilding) ||
369         ndb_name_is_temp(elmt.name) || ndb_name_is_blob_prefix(elmt.name) ||
370         ndb_name_is_fk_mock_prefix(elmt.name)) {
371       continue;
372     }
373     if (database_name == elmt.database) {
374       exists = true;
375       return true;
376     }
377   }
378   exists = false;
379   return true;
380 }
381 
ndb_logfile_group_exists(NdbDictionary::Dictionary * dict,const std::string & logfile_group_name,bool & exists)382 bool ndb_logfile_group_exists(NdbDictionary::Dictionary *dict,
383                               const std::string &logfile_group_name,
384                               bool &exists) {
385   NdbDictionary::LogfileGroup lfg =
386       dict->getLogfileGroup(logfile_group_name.c_str());
387   const int dict_error_code = dict->getNdbError().code;
388   if (dict_error_code == 0) {
389     exists = true;
390     return true;
391   }
392   if (dict_error_code == 723) {
393     exists = false;
394     return true;
395   }
396   return false;
397 }
398 
ndb_tablespace_exists(NdbDictionary::Dictionary * dict,const std::string & tablespace_name,bool & exists)399 bool ndb_tablespace_exists(NdbDictionary::Dictionary *dict,
400                            const std::string &tablespace_name, bool &exists) {
401   NdbDictionary::Tablespace tablespace =
402       dict->getTablespace(tablespace_name.c_str());
403   const int dict_error_code = dict->getNdbError().code;
404   if (dict_error_code == 0) {
405     exists = true;
406     return true;
407   }
408   if (dict_error_code == 723) {
409     exists = false;
410     return true;
411   }
412   return false;
413 }
414 
ndb_table_exists(NdbDictionary::Dictionary * dict,const std::string & db_name,const std::string & table_name,bool & exists)415 bool ndb_table_exists(NdbDictionary::Dictionary *dict,
416                       const std::string &db_name, const std::string &table_name,
417                       bool &exists) {
418   NdbDictionary::Dictionary::List list;
419   if (dict->listObjects(list, NdbDictionary::Object::UserTable) != 0) {
420     // List objects failed
421     return false;
422   }
423   for (unsigned int i = 0; i < list.count; i++) {
424     NdbDictionary::Dictionary::List::Element &elmt = list.elements[i];
425     if (db_name == elmt.database && table_name == elmt.name &&
426         (elmt.state == NdbDictionary::Object::StateOnline ||
427          elmt.state == NdbDictionary::Object::ObsoleteStateBackup ||
428          elmt.state == NdbDictionary::Object::StateBuilding)) {
429       exists = true;
430       return true;
431     }
432   }
433   exists = false;
434   return true;
435 }
436 
ndb_get_logfile_group_id_and_version(NdbDictionary::Dictionary * dict,const std::string & logfile_group_name,int & id,int & version)437 bool ndb_get_logfile_group_id_and_version(NdbDictionary::Dictionary *dict,
438                                           const std::string &logfile_group_name,
439                                           int &id, int &version) {
440   NdbDictionary::LogfileGroup lfg =
441       dict->getLogfileGroup(logfile_group_name.c_str());
442   if (dict->getNdbError().code != 0) {
443     return false;
444   }
445   id = lfg.getObjectId();
446   version = lfg.getObjectVersion();
447   return true;
448 }
449 
ndb_get_tablespace_id_and_version(NdbDictionary::Dictionary * dict,const std::string & tablespace_name,int & id,int & version)450 bool ndb_get_tablespace_id_and_version(NdbDictionary::Dictionary *dict,
451                                        const std::string &tablespace_name,
452                                        int &id, int &version) {
453   NdbDictionary::Tablespace ts = dict->getTablespace(tablespace_name.c_str());
454   if (dict->getNdbError().code != 0) {
455     return false;
456   }
457   id = ts.getObjectId();
458   version = ts.getObjectVersion();
459   return true;
460 }
461 
ndb_table_index_count(const NdbDictionary::Dictionary * dict,const NdbDictionary::Table * ndbtab,unsigned int & index_count)462 bool ndb_table_index_count(const NdbDictionary::Dictionary *dict,
463                            const NdbDictionary::Table *ndbtab,
464                            unsigned int &index_count) {
465   NdbDictionary::Dictionary::List list;
466   if (dict->listIndexes(list, *ndbtab) != 0) {
467     // List indexes failed
468     return false;
469   }
470   // Separate indexes into ordered and unique indexes
471   std::unordered_set<std::string> ordered_indexes;
472   std::unordered_set<std::string> unique_indexes;
473   for (uint i = 0; i < list.count; i++) {
474     NdbDictionary::Dictionary::List::Element &elmt = list.elements[i];
475     switch (elmt.type) {
476       case NdbDictionary::Object::UniqueHashIndex:
477         unique_indexes.insert(elmt.name);
478         break;
479       case NdbDictionary::Object::OrderedIndex:
480         ordered_indexes.insert(elmt.name);
481         break;
482       default:
483         // Unexpected object type
484         return false;
485     }
486   }
487   index_count = ordered_indexes.size();
488   // Iterate through the ordered indexes and check if any of them are
489   // "companion" ordered indexes. This is required since creating a unique key
490   // leads to 2 indexes being created - a unique hash index (of the form
491   // <index_name>$unique) and a companion ordered index. Note that this is not
492   // the case for hash based unique indexes which have no companion ordered
493   // index
494   for (auto &ordered_index : ordered_indexes) {
495     const std::string unique_index = ordered_index + "$unique";
496     if (unique_indexes.find(unique_index) != unique_indexes.end()) {
497       index_count--;
498     }
499   }
500   index_count += unique_indexes.size();
501   return true;
502 }
503