1 /*
2    Copyright (c) 2011, 2020, Oracle and/or its affiliates. All rights reserved.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 // Implements the functions declared in ndb_schema_object.h
26 #include "storage/ndb/plugin/ndb_schema_object.h"
27 
28 #include <atomic>
29 #include <mutex>
30 #include <sstream>
31 #include <string>
32 #include <unordered_map>
33 
34 #include "storage/ndb/plugin/ndb_bitmap.h"
35 #include "storage/ndb/plugin/ndb_require.h"
36 
37 // List keeping track of active NDB_SCHEMA_OBJECTs. The list is used
38 // by the schema distribution coordinator to find the correct NDB_SCHEMA_OBJECT
39 // in order to communicate with the schema dist client.
40 class Ndb_schema_objects {
41  public:
42   // Nodeid of this node
43   uint32 m_own_nodeid{0};
44 
45   // Mutex protecting the unordered map
46   std::mutex m_lock;
47   std::unordered_map<std::string, NDB_SCHEMA_OBJECT *> m_hash;
Ndb_schema_objects()48   Ndb_schema_objects() {}
49 
find(std::string key) const50   NDB_SCHEMA_OBJECT *find(std::string key) const {
51     const auto it = m_hash.find(key);
52     if (it == m_hash.end()) return nullptr;
53     return it->second;
54   }
55 
56   /**
57      @brief Find NDB_SCHEMA_OBJECT with corresponding nodeid and schema_op_id
58 
59      @param nodeid Nodeid to find
60      @param schema_op_id Schema operation id to find
61      @return Pointer to NDB_SCHEMA_OBJECT or nullptr
62 
63      @note Searches by iterating over the list until an entry is found. This is
64      ok as normally only one schema operation at a time is supported and thus
65      there is only one entry in the hash.
66    */
find(uint32 nodeid,uint32 schema_op_id) const67   NDB_SCHEMA_OBJECT *find(uint32 nodeid, uint32 schema_op_id) const {
68     DBUG_ASSERT(nodeid);
69 
70     // Make sure that own nodeid has been set
71     DBUG_ASSERT(m_own_nodeid);
72 
73     if (nodeid != m_own_nodeid) {
74       // Looking for a schema operation started in another node,  the
75       // schema_op_id is only valid in the node which started
76       return nullptr;
77     }
78 
79     for (const auto &entry : m_hash) {
80       NDB_SCHEMA_OBJECT *schema_object = entry.second;
81       if (schema_object->schema_op_id() == schema_op_id) return schema_object;
82     }
83     return nullptr;
84   }
85 } active_schema_clients;
86 
init(uint32 nodeid)87 void NDB_SCHEMA_OBJECT::init(uint32 nodeid) {
88   DBUG_ASSERT(nodeid);
89   std::lock_guard<std::mutex> lock_hash(active_schema_clients.m_lock);
90   // Make sure that no active schema clients exist when function is called
91   DBUG_ASSERT(active_schema_clients.m_hash.size() == 0);
92   active_schema_clients.m_own_nodeid = nodeid;
93 }
94 
get_schema_op_ids(std::vector<uint32> & ids)95 void NDB_SCHEMA_OBJECT::get_schema_op_ids(std::vector<uint32> &ids) {
96   std::lock_guard<std::mutex> lock_hash(active_schema_clients.m_lock);
97   for (const auto &entry : active_schema_clients.m_hash) {
98     NDB_SCHEMA_OBJECT *schema_object = entry.second;
99     ids.push_back(schema_object->schema_op_id());
100   }
101 }
102 
next_schema_op_id()103 static uint32 next_schema_op_id() {
104   static std::atomic<uint32> schema_op_id_sequence{1};
105   uint32 id = schema_op_id_sequence++;
106   // Handle wraparound
107   if (id == 0) {
108     id = schema_op_id_sequence++;
109   }
110   DBUG_ASSERT(id != 0);
111   return id;
112 }
113 
decremement_use_count() const114 uint NDB_SCHEMA_OBJECT::decremement_use_count() const {
115   std::lock_guard<std::mutex> lock_state(state.m_lock);
116   ndbcluster::ndbrequire(state.m_use_count > 0);
117   state.m_use_count--;
118   DBUG_PRINT("info", ("use_count: %d", state.m_use_count));
119   return state.m_use_count;
120 }
121 
increment_use_count() const122 uint NDB_SCHEMA_OBJECT::increment_use_count() const {
123   std::lock_guard<std::mutex> lock_state(state.m_lock);
124   state.m_use_count++;
125   DBUG_PRINT("info", ("use_count: %d", state.m_use_count));
126   return state.m_use_count;
127 }
128 
NDB_SCHEMA_OBJECT(const char * key,const char * db,const char * name,uint32 id,uint32 version)129 NDB_SCHEMA_OBJECT::NDB_SCHEMA_OBJECT(const char *key, const char *db,
130                                      const char *name, uint32 id,
131                                      uint32 version)
132     : m_key(key),
133       m_db(db),
134       m_name(name),
135       m_id(id),
136       m_version(version),
137       m_schema_op_id(next_schema_op_id()),
138       m_started(std::chrono::steady_clock::now()) {}
139 
~NDB_SCHEMA_OBJECT()140 NDB_SCHEMA_OBJECT::~NDB_SCHEMA_OBJECT() {
141   DBUG_ASSERT(state.m_use_count == 0);
142   // Check that all participants have completed
143   DBUG_ASSERT(state.m_participants.size() == count_completed_participants());
144   // Coordinator should have completed
145   DBUG_ASSERT(state.m_coordinator_completed);
146 }
147 
get(const char * db,const char * table_name,uint32 id,uint32 version,bool create)148 NDB_SCHEMA_OBJECT *NDB_SCHEMA_OBJECT::get(const char *db,
149                                           const char *table_name, uint32 id,
150                                           uint32 version, bool create) {
151   DBUG_TRACE;
152   DBUG_PRINT("enter", ("db: '%s', table_name: '%s', id: %u, version: %u", db,
153                        table_name, id, version));
154 
155   // Build a key on the form "./<db>/<name>_<id>_<version>"
156   const std::string key = std::string("./") + db + "/" + table_name + "_" +
157                           std::to_string(id) + "_" + std::to_string(version);
158   DBUG_PRINT("info", ("key: '%s'", key.c_str()));
159 
160   std::lock_guard<std::mutex> lock_hash(active_schema_clients.m_lock);
161 
162   NDB_SCHEMA_OBJECT *ndb_schema_object = active_schema_clients.find(key);
163   if (ndb_schema_object) {
164     // Don't allow reuse of existing NDB_SCHEMA_OBJECT when requesting to
165     // create, only the Ndb_schema_dist_client will create NDB_SCHEMA_OBJECT
166     // and it should wait until previous schema operation with
167     // same key has completed.
168     ndbcluster::ndbrequire(!create);
169 
170     (void)ndb_schema_object->increment_use_count();
171     return ndb_schema_object;
172   }
173 
174   if (!create) {
175     DBUG_PRINT("info", ("does not exist"));
176     return nullptr;
177   }
178 
179   ndb_schema_object = new (std::nothrow)
180       NDB_SCHEMA_OBJECT(key.c_str(), db, table_name, id, version);
181   if (!ndb_schema_object) {
182     DBUG_PRINT("info", ("failed to allocate"));
183     return nullptr;
184   }
185 
186   // Add to list of NDB_SCHEMA_OBJECTs
187   active_schema_clients.m_hash.emplace(key, ndb_schema_object);
188   return ndb_schema_object;
189 }
190 
get(uint32 nodeid,uint32 schema_op_id)191 NDB_SCHEMA_OBJECT *NDB_SCHEMA_OBJECT::get(uint32 nodeid, uint32 schema_op_id) {
192   DBUG_TRACE;
193   DBUG_PRINT("enter", ("nodeid: %d, schema_op_id: %u", nodeid, schema_op_id));
194 
195   std::lock_guard<std::mutex> lock_hash(active_schema_clients.m_lock);
196 
197   NDB_SCHEMA_OBJECT *ndb_schema_object =
198       active_schema_clients.find(nodeid, schema_op_id);
199   if (ndb_schema_object) {
200     (void)ndb_schema_object->increment_use_count();
201     return ndb_schema_object;
202   }
203 
204   DBUG_PRINT("info", ("No NDB_SCHEMA_OBJECT found"));
205   return nullptr;
206 }
207 
get(NDB_SCHEMA_OBJECT * schema_object)208 NDB_SCHEMA_OBJECT *NDB_SCHEMA_OBJECT::get(NDB_SCHEMA_OBJECT *schema_object) {
209   DBUG_TRACE;
210   DBUG_PRINT("enter", ("schema_object: %p", schema_object));
211 
212   ndbcluster::ndbrequire(schema_object);
213 
214   const uint use_count = schema_object->increment_use_count();
215   // Should already have been used before calling this function
216   ndbcluster::ndbrequire(use_count > 1);
217 
218   return schema_object;
219 }
220 
release(NDB_SCHEMA_OBJECT * ndb_schema_object)221 void NDB_SCHEMA_OBJECT::release(NDB_SCHEMA_OBJECT *ndb_schema_object) {
222   DBUG_TRACE;
223   DBUG_PRINT("enter", ("key: '%s'", ndb_schema_object->m_key.c_str()));
224 
225   const uint use_count = ndb_schema_object->decremement_use_count();
226   if (use_count != 0) {
227     // Not the last user
228     if (use_count == 1) {
229       // Only one user left, must be the Client, signal it to wakeup
230       ndb_schema_object->state.m_cond.notify_one();
231     }
232     return;
233   }
234 
235   // Last user, remove from list of NDB_SCHEMA_OBJECTS and delete instance
236   std::lock_guard<std::mutex> lock_hash(active_schema_clients.m_lock);
237   active_schema_clients.m_hash.erase(ndb_schema_object->m_key);
238   delete ndb_schema_object;
239 }
240 
count_active_schema_ops()241 size_t NDB_SCHEMA_OBJECT::count_active_schema_ops() {
242   std::lock_guard<std::mutex> lock_hash(active_schema_clients.m_lock);
243   return active_schema_clients.m_hash.size();
244 }
245 
waiting_participants_to_string() const246 std::string NDB_SCHEMA_OBJECT::waiting_participants_to_string() const {
247   std::lock_guard<std::mutex> lock_state(state.m_lock);
248   const char *separator = "";
249   std::string participants("[");
250   for (const auto &it : state.m_participants) {
251     if (it.second.m_completed == true) continue;  // Don't show completed
252     participants.append(separator).append(std::to_string(it.first));
253     separator = ",";
254   }
255   participants.append("]");
256   return participants;
257 }
258 
to_string(const char * line_separator) const259 std::string NDB_SCHEMA_OBJECT::to_string(const char *line_separator) const {
260   std::stringstream ss;
261   ss << "NDB_SCHEMA_OBJECT { " << line_separator << "  '" << m_db << "'.'"
262      << m_name << "', " << line_separator << "  id: " << m_id
263      << ", version: " << m_version << ", " << line_separator
264      << "  schema_op_id: " << m_schema_op_id << ", " << line_separator;
265 
266   // Dump state
267   std::lock_guard<std::mutex> lock_state(state.m_lock);
268   {
269     ss << "  use_count: " << state.m_use_count << ", " << line_separator;
270     // Print the participant list
271     ss << "  participants: " << state.m_participants.size() << " [ "
272        << line_separator;
273     for (const auto &it : state.m_participants) {
274       const uint32 nodeid = it.first;
275       const State::Participant &participant = it.second;
276       ss << "    { nodeid: " << nodeid << ", "
277          << "completed: " << participant.m_completed << ", "
278          << "result: " << participant.m_result << ", "
279          << "message: '" << participant.m_message << "'"
280          << "}," << line_separator;
281     }
282     ss << "  ]," << line_separator;
283     ss << "  coordinator_completed: " << state.m_coordinator_completed << ", "
284        << line_separator;
285   }
286   ss << "}";
287   return ss.str();
288 }
289 
count_completed_participants() const290 size_t NDB_SCHEMA_OBJECT::count_completed_participants() const {
291   size_t count = 0;
292   for (const auto &it : state.m_participants) {
293     const State::Participant &participant = it.second;
294     if (participant.m_completed) count++;
295   }
296   return count;
297 }
298 
register_participants(const std::unordered_set<uint32> & nodes) const299 void NDB_SCHEMA_OBJECT::register_participants(
300     const std::unordered_set<uint32> &nodes) const {
301   std::lock_guard<std::mutex> lock_state(state.m_lock);
302 
303   // Assume the list of participants is empty
304   ndbcluster::ndbrequire(state.m_participants.size() == 0);
305   // Assume coordinator have not completed
306   ndbcluster::ndbrequire(!state.m_coordinator_completed);
307 
308   // Insert new participants as specified by nodes list
309   for (const uint32 node : nodes) state.m_participants[node];
310 
311   // Double check that there are as many participants as nodes
312   ndbcluster::ndbrequire(nodes.size() == state.m_participants.size());
313 }
314 
result_received_from_node(uint32 participant_node_id,uint32 result,const std::string & message) const315 void NDB_SCHEMA_OBJECT::result_received_from_node(
316     uint32 participant_node_id, uint32 result,
317     const std::string &message) const {
318   std::lock_guard<std::mutex> lock_state(state.m_lock);
319 
320   const auto it = state.m_participants.find(participant_node_id);
321   if (it == state.m_participants.end()) {
322     // Received reply from node not registered as participant, may happen
323     // when a node hears the schema op but this node hasn't registered it as
324     // subscriber yet.
325     return;
326   }
327 
328   // Mark participant as completed and save result
329   State::Participant &participant = it->second;
330   participant.m_completed = true;
331   participant.m_result = result;
332   participant.m_message = message;
333 }
334 
result_received_from_nodes(const std::unordered_set<uint32> & nodes) const335 void NDB_SCHEMA_OBJECT::result_received_from_nodes(
336     const std::unordered_set<uint32> &nodes) const {
337   std::unique_lock<std::mutex> lock_state(state.m_lock);
338 
339   // Mark the listed nodes as completed
340   for (auto node : nodes) {
341     const auto it = state.m_participants.find(node);
342     if (it == state.m_participants.end()) {
343       // Received reply from node not registered as participant, may happen
344       // when a node hears the schema op but this node hasn't registered it as
345       // subscriber yet.
346       return;
347     }
348 
349     // Participant is not in list, mark it as failed
350     State::Participant &participant = it->second;
351     participant.m_completed = true;
352     // No result or message provided in old protocol
353   }
354 }
355 
check_all_participants_completed() const356 bool NDB_SCHEMA_OBJECT::check_all_participants_completed() const {
357   std::lock_guard<std::mutex> lock_state(state.m_lock);
358   return state.m_participants.size() == count_completed_participants();
359 }
360 
fail_participants_not_in_list(const std::unordered_set<uint32> & nodes,uint32 result,const char * message) const361 void NDB_SCHEMA_OBJECT::fail_participants_not_in_list(
362     const std::unordered_set<uint32> &nodes, uint32 result,
363     const char *message) const {
364   for (auto &it : state.m_participants) {
365     if (nodes.find(it.first) != nodes.end()) {
366       // Participant still exist in list
367       continue;
368     }
369 
370     // Participant is not in list, mark it as failed
371     State::Participant &participant = it.second;
372     participant.m_completed = true;
373     participant.m_result = result;
374     participant.m_message = message;
375   }
376 }
377 
check_for_failed_subscribers(const std::unordered_set<uint32> & new_subscribers,uint32 result,const char * message) const378 bool NDB_SCHEMA_OBJECT::check_for_failed_subscribers(
379     const std::unordered_set<uint32> &new_subscribers, uint32 result,
380     const char *message) const {
381   std::unique_lock<std::mutex> lock_state(state.m_lock);
382 
383   // Fail participants not in list of nodes
384   fail_participants_not_in_list(new_subscribers, result, message);
385 
386   if (state.m_participants.size() != count_completed_participants()) {
387     // Not all participants have completed yet
388     return false;
389   }
390 
391   // All participants have replied
392   return true;
393 }
394 
check_timeout(int timeout_seconds,uint32 result,const char * message) const395 bool NDB_SCHEMA_OBJECT::check_timeout(int timeout_seconds, uint32 result,
396                                       const char *message) const {
397   std::unique_lock<std::mutex> lock_state(state.m_lock);
398 
399   if (m_started + std::chrono::seconds(timeout_seconds) >
400       std::chrono::steady_clock::now())
401     return false;  // Timeout has not occured
402 
403   // Mark all participants who hasn't already completed as timedout
404   for (auto &it : state.m_participants) {
405     State::Participant &participant = it.second;
406     if (participant.m_completed) continue;
407 
408     participant.m_completed = true;
409     participant.m_result = result;
410     participant.m_message = message;
411   }
412 
413   // All participant should now have been marked as completed
414   ndbcluster::ndbrequire(state.m_participants.size() ==
415                          count_completed_participants());
416   return true;
417 }
418 
fail_schema_op(uint32 result,const char * message) const419 void NDB_SCHEMA_OBJECT::fail_schema_op(uint32 result,
420                                        const char *message) const {
421   std::unique_lock<std::mutex> lock_state(state.m_lock);
422 
423   if (state.m_participants.size() == 0) {
424     // Participants hasn't been registered yet since the coordinator
425     // hasn't heard about schema operation, add own node as participant
426     state.m_participants[active_schema_clients.m_own_nodeid];
427   }
428 
429   // Mark all participants who hasn't already completed as failed
430   for (auto &it : state.m_participants) {
431     State::Participant &participant = it.second;
432     if (participant.m_completed) continue;
433 
434     participant.m_completed = true;
435     participant.m_result = result;
436     participant.m_message = message;
437   }
438 
439   // All participant should now have been marked as completed
440   ndbcluster::ndbrequire(state.m_participants.size() ==
441                          count_completed_participants());
442   // Mark als coordinator as completed
443   state.m_coordinator_completed = true;
444 }
445 
fail_all_schema_ops(uint32 result,const char * message)446 void NDB_SCHEMA_OBJECT::fail_all_schema_ops(uint32 result,
447                                             const char *message) {
448   std::lock_guard<std::mutex> lock_hash(active_schema_clients.m_lock);
449   for (const auto &entry : active_schema_clients.m_hash) {
450     const NDB_SCHEMA_OBJECT *schema_object = entry.second;
451     schema_object->fail_schema_op(result, message);
452   }
453 }
454 
check_coordinator_completed() const455 bool NDB_SCHEMA_OBJECT::check_coordinator_completed() const {
456   std::unique_lock<std::mutex> lock_state(state.m_lock);
457   // Don't set completed unless all participants have replied
458   if (state.m_participants.size() != count_completed_participants())
459     return false;
460 
461   state.m_coordinator_completed = true;
462   return true;
463 }
464 
client_wait_completed(uint max_wait_seconds) const465 bool NDB_SCHEMA_OBJECT::client_wait_completed(uint max_wait_seconds) const {
466   const auto timeout_time = std::chrono::seconds(max_wait_seconds);
467   std::unique_lock<std::mutex> lock_state(state.m_lock);
468 
469   const bool completed =
470       state.m_cond.wait_for(lock_state, timeout_time, [this]() {
471         return state.m_use_count == 1 &&  // Only the Client left
472                state.m_coordinator_completed &&
473                state.m_participants.size() == count_completed_participants();
474       });
475 
476   return completed;
477 }
478 
client_get_schema_op_results(std::vector<Result> & results) const479 void NDB_SCHEMA_OBJECT::client_get_schema_op_results(
480     std::vector<Result> &results) const {
481   std::unique_lock<std::mutex> lock_state(state.m_lock);
482   // Make sure that coordinator has completed
483   ndbcluster::ndbrequire(state.m_coordinator_completed);
484 
485   for (const auto &it : state.m_participants) {
486     const State::Participant &participant = it.second;
487     if (participant.m_result)
488       results.push_back({
489           it.first,              // nodeid
490           participant.m_result,  // result
491           participant.m_message  // message
492       });
493   }
494 }
495