1 /*
2 Copyright (c) 2011, 2020, Oracle and/or its affiliates. All rights reserved.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25 // Implements the functions declared in ndb_schema_object.h
26 #include "storage/ndb/plugin/ndb_schema_object.h"
27
28 #include <atomic>
29 #include <mutex>
30 #include <sstream>
31 #include <string>
32 #include <unordered_map>
33
34 #include "storage/ndb/plugin/ndb_bitmap.h"
35 #include "storage/ndb/plugin/ndb_require.h"
36
37 // List keeping track of active NDB_SCHEMA_OBJECTs. The list is used
38 // by the schema distribution coordinator to find the correct NDB_SCHEMA_OBJECT
39 // in order to communicate with the schema dist client.
40 class Ndb_schema_objects {
41 public:
42 // Nodeid of this node
43 uint32 m_own_nodeid{0};
44
45 // Mutex protecting the unordered map
46 std::mutex m_lock;
47 std::unordered_map<std::string, NDB_SCHEMA_OBJECT *> m_hash;
Ndb_schema_objects()48 Ndb_schema_objects() {}
49
find(std::string key) const50 NDB_SCHEMA_OBJECT *find(std::string key) const {
51 const auto it = m_hash.find(key);
52 if (it == m_hash.end()) return nullptr;
53 return it->second;
54 }
55
56 /**
57 @brief Find NDB_SCHEMA_OBJECT with corresponding nodeid and schema_op_id
58
59 @param nodeid Nodeid to find
60 @param schema_op_id Schema operation id to find
61 @return Pointer to NDB_SCHEMA_OBJECT or nullptr
62
63 @note Searches by iterating over the list until an entry is found. This is
64 ok as normally only one schema operation at a time is supported and thus
65 there is only one entry in the hash.
66 */
find(uint32 nodeid,uint32 schema_op_id) const67 NDB_SCHEMA_OBJECT *find(uint32 nodeid, uint32 schema_op_id) const {
68 DBUG_ASSERT(nodeid);
69
70 // Make sure that own nodeid has been set
71 DBUG_ASSERT(m_own_nodeid);
72
73 if (nodeid != m_own_nodeid) {
74 // Looking for a schema operation started in another node, the
75 // schema_op_id is only valid in the node which started
76 return nullptr;
77 }
78
79 for (const auto &entry : m_hash) {
80 NDB_SCHEMA_OBJECT *schema_object = entry.second;
81 if (schema_object->schema_op_id() == schema_op_id) return schema_object;
82 }
83 return nullptr;
84 }
85 } active_schema_clients;
86
init(uint32 nodeid)87 void NDB_SCHEMA_OBJECT::init(uint32 nodeid) {
88 DBUG_ASSERT(nodeid);
89 std::lock_guard<std::mutex> lock_hash(active_schema_clients.m_lock);
90 // Make sure that no active schema clients exist when function is called
91 DBUG_ASSERT(active_schema_clients.m_hash.size() == 0);
92 active_schema_clients.m_own_nodeid = nodeid;
93 }
94
get_schema_op_ids(std::vector<uint32> & ids)95 void NDB_SCHEMA_OBJECT::get_schema_op_ids(std::vector<uint32> &ids) {
96 std::lock_guard<std::mutex> lock_hash(active_schema_clients.m_lock);
97 for (const auto &entry : active_schema_clients.m_hash) {
98 NDB_SCHEMA_OBJECT *schema_object = entry.second;
99 ids.push_back(schema_object->schema_op_id());
100 }
101 }
102
next_schema_op_id()103 static uint32 next_schema_op_id() {
104 static std::atomic<uint32> schema_op_id_sequence{1};
105 uint32 id = schema_op_id_sequence++;
106 // Handle wraparound
107 if (id == 0) {
108 id = schema_op_id_sequence++;
109 }
110 DBUG_ASSERT(id != 0);
111 return id;
112 }
113
decremement_use_count() const114 uint NDB_SCHEMA_OBJECT::decremement_use_count() const {
115 std::lock_guard<std::mutex> lock_state(state.m_lock);
116 ndbcluster::ndbrequire(state.m_use_count > 0);
117 state.m_use_count--;
118 DBUG_PRINT("info", ("use_count: %d", state.m_use_count));
119 return state.m_use_count;
120 }
121
increment_use_count() const122 uint NDB_SCHEMA_OBJECT::increment_use_count() const {
123 std::lock_guard<std::mutex> lock_state(state.m_lock);
124 state.m_use_count++;
125 DBUG_PRINT("info", ("use_count: %d", state.m_use_count));
126 return state.m_use_count;
127 }
128
NDB_SCHEMA_OBJECT(const char * key,const char * db,const char * name,uint32 id,uint32 version)129 NDB_SCHEMA_OBJECT::NDB_SCHEMA_OBJECT(const char *key, const char *db,
130 const char *name, uint32 id,
131 uint32 version)
132 : m_key(key),
133 m_db(db),
134 m_name(name),
135 m_id(id),
136 m_version(version),
137 m_schema_op_id(next_schema_op_id()),
138 m_started(std::chrono::steady_clock::now()) {}
139
~NDB_SCHEMA_OBJECT()140 NDB_SCHEMA_OBJECT::~NDB_SCHEMA_OBJECT() {
141 DBUG_ASSERT(state.m_use_count == 0);
142 // Check that all participants have completed
143 DBUG_ASSERT(state.m_participants.size() == count_completed_participants());
144 // Coordinator should have completed
145 DBUG_ASSERT(state.m_coordinator_completed);
146 }
147
get(const char * db,const char * table_name,uint32 id,uint32 version,bool create)148 NDB_SCHEMA_OBJECT *NDB_SCHEMA_OBJECT::get(const char *db,
149 const char *table_name, uint32 id,
150 uint32 version, bool create) {
151 DBUG_TRACE;
152 DBUG_PRINT("enter", ("db: '%s', table_name: '%s', id: %u, version: %u", db,
153 table_name, id, version));
154
155 // Build a key on the form "./<db>/<name>_<id>_<version>"
156 const std::string key = std::string("./") + db + "/" + table_name + "_" +
157 std::to_string(id) + "_" + std::to_string(version);
158 DBUG_PRINT("info", ("key: '%s'", key.c_str()));
159
160 std::lock_guard<std::mutex> lock_hash(active_schema_clients.m_lock);
161
162 NDB_SCHEMA_OBJECT *ndb_schema_object = active_schema_clients.find(key);
163 if (ndb_schema_object) {
164 // Don't allow reuse of existing NDB_SCHEMA_OBJECT when requesting to
165 // create, only the Ndb_schema_dist_client will create NDB_SCHEMA_OBJECT
166 // and it should wait until previous schema operation with
167 // same key has completed.
168 ndbcluster::ndbrequire(!create);
169
170 (void)ndb_schema_object->increment_use_count();
171 return ndb_schema_object;
172 }
173
174 if (!create) {
175 DBUG_PRINT("info", ("does not exist"));
176 return nullptr;
177 }
178
179 ndb_schema_object = new (std::nothrow)
180 NDB_SCHEMA_OBJECT(key.c_str(), db, table_name, id, version);
181 if (!ndb_schema_object) {
182 DBUG_PRINT("info", ("failed to allocate"));
183 return nullptr;
184 }
185
186 // Add to list of NDB_SCHEMA_OBJECTs
187 active_schema_clients.m_hash.emplace(key, ndb_schema_object);
188 return ndb_schema_object;
189 }
190
get(uint32 nodeid,uint32 schema_op_id)191 NDB_SCHEMA_OBJECT *NDB_SCHEMA_OBJECT::get(uint32 nodeid, uint32 schema_op_id) {
192 DBUG_TRACE;
193 DBUG_PRINT("enter", ("nodeid: %d, schema_op_id: %u", nodeid, schema_op_id));
194
195 std::lock_guard<std::mutex> lock_hash(active_schema_clients.m_lock);
196
197 NDB_SCHEMA_OBJECT *ndb_schema_object =
198 active_schema_clients.find(nodeid, schema_op_id);
199 if (ndb_schema_object) {
200 (void)ndb_schema_object->increment_use_count();
201 return ndb_schema_object;
202 }
203
204 DBUG_PRINT("info", ("No NDB_SCHEMA_OBJECT found"));
205 return nullptr;
206 }
207
get(NDB_SCHEMA_OBJECT * schema_object)208 NDB_SCHEMA_OBJECT *NDB_SCHEMA_OBJECT::get(NDB_SCHEMA_OBJECT *schema_object) {
209 DBUG_TRACE;
210 DBUG_PRINT("enter", ("schema_object: %p", schema_object));
211
212 ndbcluster::ndbrequire(schema_object);
213
214 const uint use_count = schema_object->increment_use_count();
215 // Should already have been used before calling this function
216 ndbcluster::ndbrequire(use_count > 1);
217
218 return schema_object;
219 }
220
release(NDB_SCHEMA_OBJECT * ndb_schema_object)221 void NDB_SCHEMA_OBJECT::release(NDB_SCHEMA_OBJECT *ndb_schema_object) {
222 DBUG_TRACE;
223 DBUG_PRINT("enter", ("key: '%s'", ndb_schema_object->m_key.c_str()));
224
225 const uint use_count = ndb_schema_object->decremement_use_count();
226 if (use_count != 0) {
227 // Not the last user
228 if (use_count == 1) {
229 // Only one user left, must be the Client, signal it to wakeup
230 ndb_schema_object->state.m_cond.notify_one();
231 }
232 return;
233 }
234
235 // Last user, remove from list of NDB_SCHEMA_OBJECTS and delete instance
236 std::lock_guard<std::mutex> lock_hash(active_schema_clients.m_lock);
237 active_schema_clients.m_hash.erase(ndb_schema_object->m_key);
238 delete ndb_schema_object;
239 }
240
count_active_schema_ops()241 size_t NDB_SCHEMA_OBJECT::count_active_schema_ops() {
242 std::lock_guard<std::mutex> lock_hash(active_schema_clients.m_lock);
243 return active_schema_clients.m_hash.size();
244 }
245
waiting_participants_to_string() const246 std::string NDB_SCHEMA_OBJECT::waiting_participants_to_string() const {
247 std::lock_guard<std::mutex> lock_state(state.m_lock);
248 const char *separator = "";
249 std::string participants("[");
250 for (const auto &it : state.m_participants) {
251 if (it.second.m_completed == true) continue; // Don't show completed
252 participants.append(separator).append(std::to_string(it.first));
253 separator = ",";
254 }
255 participants.append("]");
256 return participants;
257 }
258
to_string(const char * line_separator) const259 std::string NDB_SCHEMA_OBJECT::to_string(const char *line_separator) const {
260 std::stringstream ss;
261 ss << "NDB_SCHEMA_OBJECT { " << line_separator << " '" << m_db << "'.'"
262 << m_name << "', " << line_separator << " id: " << m_id
263 << ", version: " << m_version << ", " << line_separator
264 << " schema_op_id: " << m_schema_op_id << ", " << line_separator;
265
266 // Dump state
267 std::lock_guard<std::mutex> lock_state(state.m_lock);
268 {
269 ss << " use_count: " << state.m_use_count << ", " << line_separator;
270 // Print the participant list
271 ss << " participants: " << state.m_participants.size() << " [ "
272 << line_separator;
273 for (const auto &it : state.m_participants) {
274 const uint32 nodeid = it.first;
275 const State::Participant &participant = it.second;
276 ss << " { nodeid: " << nodeid << ", "
277 << "completed: " << participant.m_completed << ", "
278 << "result: " << participant.m_result << ", "
279 << "message: '" << participant.m_message << "'"
280 << "}," << line_separator;
281 }
282 ss << " ]," << line_separator;
283 ss << " coordinator_completed: " << state.m_coordinator_completed << ", "
284 << line_separator;
285 }
286 ss << "}";
287 return ss.str();
288 }
289
count_completed_participants() const290 size_t NDB_SCHEMA_OBJECT::count_completed_participants() const {
291 size_t count = 0;
292 for (const auto &it : state.m_participants) {
293 const State::Participant &participant = it.second;
294 if (participant.m_completed) count++;
295 }
296 return count;
297 }
298
register_participants(const std::unordered_set<uint32> & nodes) const299 void NDB_SCHEMA_OBJECT::register_participants(
300 const std::unordered_set<uint32> &nodes) const {
301 std::lock_guard<std::mutex> lock_state(state.m_lock);
302
303 // Assume the list of participants is empty
304 ndbcluster::ndbrequire(state.m_participants.size() == 0);
305 // Assume coordinator have not completed
306 ndbcluster::ndbrequire(!state.m_coordinator_completed);
307
308 // Insert new participants as specified by nodes list
309 for (const uint32 node : nodes) state.m_participants[node];
310
311 // Double check that there are as many participants as nodes
312 ndbcluster::ndbrequire(nodes.size() == state.m_participants.size());
313 }
314
result_received_from_node(uint32 participant_node_id,uint32 result,const std::string & message) const315 void NDB_SCHEMA_OBJECT::result_received_from_node(
316 uint32 participant_node_id, uint32 result,
317 const std::string &message) const {
318 std::lock_guard<std::mutex> lock_state(state.m_lock);
319
320 const auto it = state.m_participants.find(participant_node_id);
321 if (it == state.m_participants.end()) {
322 // Received reply from node not registered as participant, may happen
323 // when a node hears the schema op but this node hasn't registered it as
324 // subscriber yet.
325 return;
326 }
327
328 // Mark participant as completed and save result
329 State::Participant &participant = it->second;
330 participant.m_completed = true;
331 participant.m_result = result;
332 participant.m_message = message;
333 }
334
result_received_from_nodes(const std::unordered_set<uint32> & nodes) const335 void NDB_SCHEMA_OBJECT::result_received_from_nodes(
336 const std::unordered_set<uint32> &nodes) const {
337 std::unique_lock<std::mutex> lock_state(state.m_lock);
338
339 // Mark the listed nodes as completed
340 for (auto node : nodes) {
341 const auto it = state.m_participants.find(node);
342 if (it == state.m_participants.end()) {
343 // Received reply from node not registered as participant, may happen
344 // when a node hears the schema op but this node hasn't registered it as
345 // subscriber yet.
346 return;
347 }
348
349 // Participant is not in list, mark it as failed
350 State::Participant &participant = it->second;
351 participant.m_completed = true;
352 // No result or message provided in old protocol
353 }
354 }
355
check_all_participants_completed() const356 bool NDB_SCHEMA_OBJECT::check_all_participants_completed() const {
357 std::lock_guard<std::mutex> lock_state(state.m_lock);
358 return state.m_participants.size() == count_completed_participants();
359 }
360
fail_participants_not_in_list(const std::unordered_set<uint32> & nodes,uint32 result,const char * message) const361 void NDB_SCHEMA_OBJECT::fail_participants_not_in_list(
362 const std::unordered_set<uint32> &nodes, uint32 result,
363 const char *message) const {
364 for (auto &it : state.m_participants) {
365 if (nodes.find(it.first) != nodes.end()) {
366 // Participant still exist in list
367 continue;
368 }
369
370 // Participant is not in list, mark it as failed
371 State::Participant &participant = it.second;
372 participant.m_completed = true;
373 participant.m_result = result;
374 participant.m_message = message;
375 }
376 }
377
check_for_failed_subscribers(const std::unordered_set<uint32> & new_subscribers,uint32 result,const char * message) const378 bool NDB_SCHEMA_OBJECT::check_for_failed_subscribers(
379 const std::unordered_set<uint32> &new_subscribers, uint32 result,
380 const char *message) const {
381 std::unique_lock<std::mutex> lock_state(state.m_lock);
382
383 // Fail participants not in list of nodes
384 fail_participants_not_in_list(new_subscribers, result, message);
385
386 if (state.m_participants.size() != count_completed_participants()) {
387 // Not all participants have completed yet
388 return false;
389 }
390
391 // All participants have replied
392 return true;
393 }
394
check_timeout(int timeout_seconds,uint32 result,const char * message) const395 bool NDB_SCHEMA_OBJECT::check_timeout(int timeout_seconds, uint32 result,
396 const char *message) const {
397 std::unique_lock<std::mutex> lock_state(state.m_lock);
398
399 if (m_started + std::chrono::seconds(timeout_seconds) >
400 std::chrono::steady_clock::now())
401 return false; // Timeout has not occured
402
403 // Mark all participants who hasn't already completed as timedout
404 for (auto &it : state.m_participants) {
405 State::Participant &participant = it.second;
406 if (participant.m_completed) continue;
407
408 participant.m_completed = true;
409 participant.m_result = result;
410 participant.m_message = message;
411 }
412
413 // All participant should now have been marked as completed
414 ndbcluster::ndbrequire(state.m_participants.size() ==
415 count_completed_participants());
416 return true;
417 }
418
fail_schema_op(uint32 result,const char * message) const419 void NDB_SCHEMA_OBJECT::fail_schema_op(uint32 result,
420 const char *message) const {
421 std::unique_lock<std::mutex> lock_state(state.m_lock);
422
423 if (state.m_participants.size() == 0) {
424 // Participants hasn't been registered yet since the coordinator
425 // hasn't heard about schema operation, add own node as participant
426 state.m_participants[active_schema_clients.m_own_nodeid];
427 }
428
429 // Mark all participants who hasn't already completed as failed
430 for (auto &it : state.m_participants) {
431 State::Participant &participant = it.second;
432 if (participant.m_completed) continue;
433
434 participant.m_completed = true;
435 participant.m_result = result;
436 participant.m_message = message;
437 }
438
439 // All participant should now have been marked as completed
440 ndbcluster::ndbrequire(state.m_participants.size() ==
441 count_completed_participants());
442 // Mark als coordinator as completed
443 state.m_coordinator_completed = true;
444 }
445
fail_all_schema_ops(uint32 result,const char * message)446 void NDB_SCHEMA_OBJECT::fail_all_schema_ops(uint32 result,
447 const char *message) {
448 std::lock_guard<std::mutex> lock_hash(active_schema_clients.m_lock);
449 for (const auto &entry : active_schema_clients.m_hash) {
450 const NDB_SCHEMA_OBJECT *schema_object = entry.second;
451 schema_object->fail_schema_op(result, message);
452 }
453 }
454
check_coordinator_completed() const455 bool NDB_SCHEMA_OBJECT::check_coordinator_completed() const {
456 std::unique_lock<std::mutex> lock_state(state.m_lock);
457 // Don't set completed unless all participants have replied
458 if (state.m_participants.size() != count_completed_participants())
459 return false;
460
461 state.m_coordinator_completed = true;
462 return true;
463 }
464
client_wait_completed(uint max_wait_seconds) const465 bool NDB_SCHEMA_OBJECT::client_wait_completed(uint max_wait_seconds) const {
466 const auto timeout_time = std::chrono::seconds(max_wait_seconds);
467 std::unique_lock<std::mutex> lock_state(state.m_lock);
468
469 const bool completed =
470 state.m_cond.wait_for(lock_state, timeout_time, [this]() {
471 return state.m_use_count == 1 && // Only the Client left
472 state.m_coordinator_completed &&
473 state.m_participants.size() == count_completed_participants();
474 });
475
476 return completed;
477 }
478
client_get_schema_op_results(std::vector<Result> & results) const479 void NDB_SCHEMA_OBJECT::client_get_schema_op_results(
480 std::vector<Result> &results) const {
481 std::unique_lock<std::mutex> lock_state(state.m_lock);
482 // Make sure that coordinator has completed
483 ndbcluster::ndbrequire(state.m_coordinator_completed);
484
485 for (const auto &it : state.m_participants) {
486 const State::Participant &participant = it.second;
487 if (participant.m_result)
488 results.push_back({
489 it.first, // nodeid
490 participant.m_result, // result
491 participant.m_message // message
492 });
493 }
494 }
495