1 /*
2 * Copyright (C) 2018-2021 Codership Oy <info@codership.com>
3 *
4 * This file is part of wsrep-lib.
5 *
6 * Wsrep-lib is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 2 of the License, or
9 * (at your option) any later version.
10 *
11 * Wsrep-lib is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>.
18 */
19
20 #include "wsrep_provider_v26.hpp"
21
22 #include "wsrep/encryption_service.hpp"
23 #include "wsrep/server_state.hpp"
24 #include "wsrep/high_priority_service.hpp"
25 #include "wsrep/view.hpp"
26 #include "wsrep/exception.hpp"
27 #include "wsrep/logger.hpp"
28 #include "wsrep/thread_service.hpp"
29 #include "wsrep/tls_service.hpp"
30 #include "wsrep/allowlist_service.hpp"
31
32 #include "thread_service_v1.hpp"
33 #include "tls_service_v1.hpp"
34 #include "allowlist_service_v1.hpp"
35 #include "event_service_v1.hpp"
36 #include "v26/wsrep_api.h"
37
38
39 #include <dlfcn.h>
40 #include <cassert>
41 #include <climits>
42
43 #include <iostream>
44 #include <sstream>
45 #include <cstring> // strerror()
46
47 namespace
48 {
49 /////////////////////////////////////////////////////////////////////
50 // Helpers //
51 /////////////////////////////////////////////////////////////////////
52
map_return_value(wsrep_status_t status)53 enum wsrep::provider::status map_return_value(wsrep_status_t status)
54 {
55 switch (status)
56 {
57 case WSREP_OK:
58 return wsrep::provider::success;
59 case WSREP_WARNING:
60 return wsrep::provider::error_warning;
61 case WSREP_TRX_MISSING:
62 return wsrep::provider::error_transaction_missing;
63 case WSREP_TRX_FAIL:
64 return wsrep::provider::error_certification_failed;
65 case WSREP_BF_ABORT:
66 return wsrep::provider::error_bf_abort;
67 case WSREP_SIZE_EXCEEDED:
68 return wsrep::provider::error_size_exceeded;
69 case WSREP_CONN_FAIL:
70 return wsrep::provider::error_connection_failed;
71 case WSREP_NODE_FAIL:
72 return wsrep::provider::error_provider_failed;
73 case WSREP_FATAL:
74 return wsrep::provider::error_fatal;
75 case WSREP_NOT_IMPLEMENTED:
76 return wsrep::provider::error_not_implemented;
77 case WSREP_NOT_ALLOWED:
78 return wsrep::provider::error_not_allowed;
79 }
80
81 wsrep::log_warning() << "Unexpected value for wsrep_status_t: "
82 << status << " ("
83 << (status < 0 ? strerror(-status) : "") << ')';
84
85 return wsrep::provider::error_unknown;
86 }
87
map_key_type(enum wsrep::key::type type)88 wsrep_key_type_t map_key_type(enum wsrep::key::type type)
89 {
90 switch (type)
91 {
92 case wsrep::key::shared: return WSREP_KEY_SHARED;
93 case wsrep::key::reference: return WSREP_KEY_REFERENCE;
94 case wsrep::key::update: return WSREP_KEY_UPDATE;
95 case wsrep::key::exclusive: return WSREP_KEY_EXCLUSIVE;
96 }
97 assert(0);
98 throw wsrep::runtime_error("Invalid key type");
99 }
100
seqno_to_native(wsrep::seqno seqno)101 static inline wsrep_seqno_t seqno_to_native(wsrep::seqno seqno)
102 {
103 return seqno.get();
104 }
105
seqno_from_native(wsrep_seqno_t seqno)106 static inline wsrep::seqno seqno_from_native(wsrep_seqno_t seqno)
107 {
108 return wsrep::seqno(seqno);
109 }
110
111 template <typename F, typename T>
map_one(const int flags,const F from,const T to)112 inline uint32_t map_one(const int flags, const F from,
113 const T to)
114 {
115 // INT_MAX because GCC 4.4 does not eat numeric_limits<int>::max()
116 // in static_assert
117 static_assert(WSREP_FLAGS_LAST < INT_MAX,
118 "WSREP_FLAGS_LAST < INT_MAX");
119 return static_cast<uint32_t>((flags & static_cast<int>(from)) ?
120 static_cast<int>(to) : 0);
121 }
122
map_flags_to_native(int flags)123 uint32_t map_flags_to_native(int flags)
124 {
125 using wsrep::provider;
126 return static_cast<uint32_t>(
127 map_one(flags, provider::flag::start_transaction,
128 WSREP_FLAG_TRX_START) |
129 map_one(flags, provider::flag::commit, WSREP_FLAG_TRX_END) |
130 map_one(flags, provider::flag::rollback, WSREP_FLAG_ROLLBACK) |
131 map_one(flags, provider::flag::isolation, WSREP_FLAG_ISOLATION) |
132 map_one(flags, provider::flag::pa_unsafe, WSREP_FLAG_PA_UNSAFE) |
133 // map_one(flags, provider::flag::commutative, WSREP_FLAG_COMMUTATIVE)
134 // |
135 // map_one(flags, provider::flag::native, WSREP_FLAG_NATIVE) |
136 map_one(flags, provider::flag::prepare, WSREP_FLAG_TRX_PREPARE) |
137 map_one(flags, provider::flag::snapshot, WSREP_FLAG_SNAPSHOT) |
138 map_one(flags, provider::flag::implicit_deps,
139 WSREP_FLAG_IMPLICIT_DEPS));
140 }
141
map_flags_from_native(uint32_t flags_u32)142 int map_flags_from_native(uint32_t flags_u32)
143 {
144 using wsrep::provider;
145 const int flags(static_cast<int>(flags_u32));
146 return static_cast<int>(
147 map_one(flags, WSREP_FLAG_TRX_START,
148 provider::flag::start_transaction) |
149 map_one(flags, WSREP_FLAG_TRX_END, provider::flag::commit) |
150 map_one(flags, WSREP_FLAG_ROLLBACK, provider::flag::rollback) |
151 map_one(flags, WSREP_FLAG_ISOLATION, provider::flag::isolation) |
152 map_one(flags, WSREP_FLAG_PA_UNSAFE, provider::flag::pa_unsafe) |
153 // map_one(flags, provider::flag::commutative, WSREP_FLAG_COMMUTATIVE)
154 // |
155 // map_one(flags, provider::flag::native, WSREP_FLAG_NATIVE) |
156 map_one(flags, WSREP_FLAG_TRX_PREPARE, provider::flag::prepare) |
157 map_one(flags, WSREP_FLAG_SNAPSHOT, provider::flag::snapshot) |
158 map_one(flags, WSREP_FLAG_IMPLICIT_DEPS,
159 provider::flag::implicit_deps));
160 }
161
162 class mutable_ws_handle
163 {
164 public:
mutable_ws_handle(wsrep::ws_handle & ws_handle)165 mutable_ws_handle(wsrep::ws_handle& ws_handle)
166 : ws_handle_(ws_handle)
167 , native_((wsrep_ws_handle_t)
168 {
169 ws_handle_.transaction_id().get(),
170 ws_handle_.opaque()
171 })
172 { }
173
~mutable_ws_handle()174 ~mutable_ws_handle()
175 {
176 ws_handle_ = wsrep::ws_handle(
177 wsrep::transaction_id(native_.trx_id), native_.opaque);
178 }
179
native()180 wsrep_ws_handle_t* native()
181 {
182 return &native_;
183 }
184 private:
185 wsrep::ws_handle& ws_handle_;
186 wsrep_ws_handle_t native_;
187 };
188
189 class const_ws_handle
190 {
191 public:
const_ws_handle(const wsrep::ws_handle & ws_handle)192 const_ws_handle(const wsrep::ws_handle& ws_handle)
193 : ws_handle_(ws_handle)
194 , native_((wsrep_ws_handle_t)
195 {
196 ws_handle_.transaction_id().get(),
197 ws_handle_.opaque()
198 })
199 { }
200
~const_ws_handle()201 ~const_ws_handle()
202 {
203 assert(ws_handle_.transaction_id().get() == native_.trx_id);
204 assert(ws_handle_.opaque() == native_.opaque);
205 }
206
native() const207 const wsrep_ws_handle_t* native() const
208 {
209 return &native_;
210 }
211 private:
212 const wsrep::ws_handle& ws_handle_;
213 const wsrep_ws_handle_t native_;
214 };
215
216 class mutable_ws_meta
217 {
218 public:
mutable_ws_meta(wsrep::ws_meta & ws_meta,int flags)219 mutable_ws_meta(wsrep::ws_meta& ws_meta, int flags)
220 : ws_meta_(ws_meta)
221 , trx_meta_()
222 , flags_(flags)
223 {
224 std::memcpy(trx_meta_.gtid.uuid.data, ws_meta.group_id().data(),
225 sizeof(trx_meta_.gtid.uuid.data));
226 trx_meta_.gtid.seqno = seqno_to_native(ws_meta.seqno());
227 std::memcpy(trx_meta_.stid.node.data, ws_meta.server_id().data(),
228 sizeof(trx_meta_.stid.node.data));
229 trx_meta_.stid.conn = ws_meta.client_id().get();
230 trx_meta_.stid.trx = ws_meta.transaction_id().get();
231 trx_meta_.depends_on = seqno_to_native(ws_meta.depends_on());
232 }
233
~mutable_ws_meta()234 ~mutable_ws_meta()
235 {
236 ws_meta_ = wsrep::ws_meta(
237 wsrep::gtid(
238 wsrep::id(trx_meta_.gtid.uuid.data,
239 sizeof(trx_meta_.gtid.uuid.data)),
240 seqno_from_native(trx_meta_.gtid.seqno)),
241 wsrep::stid(wsrep::id(trx_meta_.stid.node.data,
242 sizeof(trx_meta_.stid.node.data)),
243 wsrep::transaction_id(trx_meta_.stid.trx),
244 wsrep::client_id(trx_meta_.stid.conn)),
245 seqno_from_native(trx_meta_.depends_on), flags_);
246 }
247
native()248 wsrep_trx_meta* native() { return &trx_meta_; }
native_flags() const249 uint32_t native_flags() const { return map_flags_to_native(flags_); }
250 private:
251 wsrep::ws_meta& ws_meta_;
252 wsrep_trx_meta_t trx_meta_;
253 int flags_;
254 };
255
256 class const_ws_meta
257 {
258 public:
const_ws_meta(const wsrep::ws_meta & ws_meta)259 const_ws_meta(const wsrep::ws_meta& ws_meta)
260 : trx_meta_()
261 {
262 std::memcpy(trx_meta_.gtid.uuid.data, ws_meta.group_id().data(),
263 sizeof(trx_meta_.gtid.uuid.data));
264 trx_meta_.gtid.seqno = seqno_to_native(ws_meta.seqno());
265 std::memcpy(trx_meta_.stid.node.data, ws_meta.server_id().data(),
266 sizeof(trx_meta_.stid.node.data));
267 trx_meta_.stid.conn = ws_meta.client_id().get();
268 trx_meta_.stid.trx = ws_meta.transaction_id().get();
269 trx_meta_.depends_on = seqno_to_native(ws_meta.depends_on());
270 }
271
~const_ws_meta()272 ~const_ws_meta()
273 {
274 }
275
native() const276 const wsrep_trx_meta* native() const { return &trx_meta_; }
277 private:
278 wsrep_trx_meta_t trx_meta_;
279 };
280
map_view_status_from_native(wsrep_view_status_t status)281 enum wsrep::view::status map_view_status_from_native(
282 wsrep_view_status_t status)
283 {
284 switch (status)
285 {
286 case WSREP_VIEW_PRIMARY: return wsrep::view::primary;
287 case WSREP_VIEW_NON_PRIMARY: return wsrep::view::non_primary;
288 case WSREP_VIEW_DISCONNECTED: return wsrep::view::disconnected;
289 default: throw wsrep::runtime_error("Unknown view status");
290 }
291 }
292
293 /** @todo Currently capabilities defined in provider.hpp
294 * are one to one with wsrep_api.h. However, the mapping should
295 * be made explicit. */
map_capabilities_from_native(wsrep_cap_t capabilities)296 int map_capabilities_from_native(wsrep_cap_t capabilities)
297 {
298 return static_cast<int>(capabilities);
299 }
/**
 * Build a wsrep::view from a native wsrep_view_info.
 *
 * @param view_info Native view description.
 * @param own_id    ID of this node. If undefined, the own index is
 *                  taken from view_info.my_idx; otherwise it is the
 *                  position of own_id in the member list, or -1 if
 *                  the ID is not among the members.
 * @throws wsrep::runtime_error if the native view status is unknown.
 */
wsrep::view view_from_native(const wsrep_view_info& view_info,
                             const wsrep::id& own_id)
{
    // Copy the member list. Name and incoming address are bounded by
    // the size of their fixed native buffers.
    std::vector<wsrep::view::member> members;
    for (int idx(0); idx < view_info.memb_num; ++idx)
    {
        const wsrep::id member_id(view_info.members[idx].id.data,
                                  sizeof(view_info.members[idx].id.data));
        const size_t name_len(
            strnlen(view_info.members[idx].name,
                    sizeof(view_info.members[idx].name)));
        const std::string member_name(view_info.members[idx].name,
                                      name_len);
        const size_t incoming_len(
            strnlen(view_info.members[idx].incoming,
                    sizeof(view_info.members[idx].incoming)));
        const std::string member_incoming(view_info.members[idx].incoming,
                                          incoming_len);
        members.push_back(
            wsrep::view::member(member_id, member_name, member_incoming));
    }

    int own_idx(-1);
    if (own_id.is_undefined())
    {
        // If own ID is undefined, obtain it from the view. This is
        // the case on the initial connect to cluster.
        own_idx = view_info.my_idx;
    }
    else
    {
        // If the node has already obtained its ID from cluster,
        // its position in the view (or lack thereof) must be determined
        // by the ID. The scan stops at the first match.
        for (size_t pos(0); own_idx < 0 && pos < members.size(); ++pos)
        {
            if (own_id == members[pos].id())
            {
                own_idx = static_cast<int>(pos);
            }
        }
    }

    const wsrep::gtid state_id(
        wsrep::id(view_info.state_id.uuid.data,
                  sizeof(view_info.state_id.uuid.data)),
        wsrep::seqno(view_info.state_id.seqno));
    return wsrep::view(state_id,
                       wsrep::seqno(view_info.view),
                       map_view_status_from_native(view_info.status),
                       map_capabilities_from_native(view_info.capabilities),
                       own_idx,
                       view_info.proto_ver,
                       members);
}
352
353 /////////////////////////////////////////////////////////////////////
354 // Callbacks //
355 /////////////////////////////////////////////////////////////////////
356
connected_cb(void * app_ctx,const wsrep_view_info_t * view_info)357 wsrep_cb_status_t connected_cb(
358 void* app_ctx,
359 const wsrep_view_info_t* view_info)
360 {
361 assert(app_ctx);
362 wsrep::server_state& server_state(
363 *reinterpret_cast<wsrep::server_state*>(app_ctx));
364 wsrep::view view(view_from_native(*view_info, server_state.id()));
365 const ssize_t own_index(view.own_index());
366 assert(own_index >= 0);
367 if (own_index < 0)
368 {
369 wsrep::log_error() << "Connected without being in reported view";
370 return WSREP_CB_FAILURE;
371 }
372 assert(// first connect
373 server_state.id().is_undefined() ||
374 // reconnect to primary component
375 server_state.id() ==
376 view.members()[static_cast<size_t>(own_index)].id());
377 try
378 {
379 server_state.on_connect(view);
380 return WSREP_CB_SUCCESS;
381 }
382 catch (const wsrep::runtime_error& e)
383 {
384 wsrep::log_error() << "Exception: " << e.what();
385 return WSREP_CB_FAILURE;
386 }
387 }
388
view_cb(void * app_ctx,void * recv_ctx,const wsrep_view_info_t * view_info,const char *,size_t)389 wsrep_cb_status_t view_cb(void* app_ctx,
390 void* recv_ctx,
391 const wsrep_view_info_t* view_info,
392 const char*,
393 size_t)
394 {
395 assert(app_ctx);
396 assert(view_info);
397 wsrep::server_state& server_state(
398 *reinterpret_cast<wsrep::server_state*>(app_ctx));
399 wsrep::high_priority_service* high_priority_service(
400 reinterpret_cast<wsrep::high_priority_service*>(recv_ctx));
401 try
402 {
403 wsrep::view view(view_from_native(*view_info, server_state.id()));
404 server_state.on_view(view, high_priority_service);
405 return WSREP_CB_SUCCESS;
406 }
407 catch (const wsrep::runtime_error& e)
408 {
409 wsrep::log_error() << "Exception: " << e.what();
410 return WSREP_CB_FAILURE;
411 }
412 }
413
sst_request_cb(void * app_ctx,void ** sst_req,size_t * sst_req_len)414 wsrep_cb_status_t sst_request_cb(void* app_ctx,
415 void **sst_req, size_t* sst_req_len)
416 {
417 assert(app_ctx);
418 wsrep::server_state& server_state(
419 *reinterpret_cast<wsrep::server_state*>(app_ctx));
420
421 try
422 {
423 std::string req(server_state.prepare_for_sst());
424 if (req.size() > 0)
425 {
426 *sst_req = ::malloc(req.size() + 1);
427 memcpy(*sst_req, req.data(), req.size() + 1);
428 *sst_req_len = req.size() + 1;
429 }
430 else
431 {
432 *sst_req = 0;
433 *sst_req_len = 0;
434 }
435 return WSREP_CB_SUCCESS;
436 }
437 catch (const wsrep::runtime_error& e)
438 {
439 return WSREP_CB_FAILURE;
440 }
441 }
442
encrypt_cb(void * app_ctx,wsrep_enc_ctx_t * enc_ctx,const wsrep_buf_t * input,void * output,wsrep_enc_direction_t direction,bool last)443 int encrypt_cb(void* app_ctx,
444 wsrep_enc_ctx_t* enc_ctx,
445 const wsrep_buf_t* input,
446 void* output,
447 wsrep_enc_direction_t direction,
448 bool last)
449 {
450 assert(app_ctx);
451 wsrep::server_state& server_state(
452 *static_cast<wsrep::server_state*>(app_ctx));
453
454 assert(server_state.encryption_service());
455 if (server_state.encryption_service() == 0)
456 {
457 wsrep::log_error() << "Encryption service not defined in encrypt_cb()";
458 return -1;
459 }
460
461 wsrep::const_buffer key(enc_ctx->key->ptr, enc_ctx->key->len);
462 wsrep::const_buffer in(input->ptr, input->len);
463 try
464 {
465 return server_state.encryption_service()->do_crypt(&enc_ctx->ctx,
466 key,
467 enc_ctx->iv,
468 in,
469 output,
470 direction == WSREP_ENC,
471 last);
472 }
473 catch (const wsrep::runtime_error& e)
474 {
475 free(enc_ctx->ctx);
476 // Return negative value in case of callback error
477 return -1;
478 }
479 }
480
apply_cb(void * ctx,const wsrep_ws_handle_t * wsh,uint32_t flags,const wsrep_buf_t * buf,const wsrep_trx_meta_t * meta,wsrep_bool_t * exit_loop)481 wsrep_cb_status_t apply_cb(void* ctx,
482 const wsrep_ws_handle_t* wsh,
483 uint32_t flags,
484 const wsrep_buf_t* buf,
485 const wsrep_trx_meta_t* meta,
486 wsrep_bool_t* exit_loop)
487 {
488 wsrep::high_priority_service* high_priority_service(
489 reinterpret_cast<wsrep::high_priority_service*>(ctx));
490 assert(high_priority_service);
491
492 wsrep::const_buffer data(buf->ptr, buf->len);
493 wsrep::ws_handle ws_handle(wsrep::transaction_id(wsh->trx_id),
494 wsh->opaque);
495 wsrep::ws_meta ws_meta(
496 wsrep::gtid(wsrep::id(meta->gtid.uuid.data,
497 sizeof(meta->gtid.uuid.data)),
498 wsrep::seqno(meta->gtid.seqno)),
499 wsrep::stid(wsrep::id(meta->stid.node.data,
500 sizeof(meta->stid.node.data)),
501 wsrep::transaction_id(meta->stid.trx),
502 wsrep::client_id(meta->stid.conn)),
503 wsrep::seqno(seqno_from_native(meta->depends_on)),
504 map_flags_from_native(flags));
505 try
506 {
507 if (high_priority_service->apply(ws_handle, ws_meta, data))
508 {
509 return WSREP_CB_FAILURE;
510 }
511 *exit_loop = high_priority_service->must_exit();
512 return WSREP_CB_SUCCESS;
513 }
514 catch (const wsrep::runtime_error& e)
515 {
516 wsrep::log_error() << "Caught runtime error while applying "
517 << ws_meta.flags() << ": "
518 << e.what();
519 return WSREP_CB_FAILURE;
520 }
521 }
522
synced_cb(void * app_ctx)523 wsrep_cb_status_t synced_cb(void* app_ctx)
524 {
525 assert(app_ctx);
526 wsrep::server_state& server_state(
527 *reinterpret_cast<wsrep::server_state*>(app_ctx));
528 try
529 {
530 server_state.on_sync();
531 return WSREP_CB_SUCCESS;
532 }
533 catch (const wsrep::runtime_error& e)
534 {
535 std::cerr << "On sync failed: " << e.what() << "\n";
536 return WSREP_CB_FAILURE;
537 }
538 }
539
540
sst_donate_cb(void * app_ctx,void *,const wsrep_buf_t * req_buf,const wsrep_gtid_t * req_gtid,const wsrep_buf_t *,bool bypass)541 wsrep_cb_status_t sst_donate_cb(void* app_ctx,
542 void* ,
543 const wsrep_buf_t* req_buf,
544 const wsrep_gtid_t* req_gtid,
545 const wsrep_buf_t*,
546 bool bypass)
547 {
548 assert(app_ctx);
549 wsrep::server_state& server_state(
550 *reinterpret_cast<wsrep::server_state*>(app_ctx));
551 try
552 {
553 std::string req(reinterpret_cast<const char*>(req_buf->ptr),
554 req_buf->len);
555 wsrep::gtid gtid(wsrep::id(req_gtid->uuid.data,
556 sizeof(req_gtid->uuid.data)),
557 wsrep::seqno(req_gtid->seqno));
558 if (server_state.start_sst(req, gtid, bypass))
559 {
560 return WSREP_CB_FAILURE;
561 }
562 return WSREP_CB_SUCCESS;
563 }
564 catch (const wsrep::runtime_error& e)
565 {
566 return WSREP_CB_FAILURE;
567 }
568 }
569
logger_cb(wsrep_log_level_t level,const char * msg)570 void logger_cb(wsrep_log_level_t level, const char* msg)
571 {
572 static const char* const pfx("P:"); // "provider"
573 wsrep::log::level ll(wsrep::log::unknown);
574 switch (level)
575 {
576 case WSREP_LOG_FATAL:
577 case WSREP_LOG_ERROR:
578 ll = wsrep::log::error;
579 break;
580 case WSREP_LOG_WARN:
581 ll = wsrep::log::warning;
582 break;
583 case WSREP_LOG_INFO:
584 ll = wsrep::log::info;
585 break;
586 case WSREP_LOG_DEBUG:
587 ll = wsrep::log::debug;
588 break;
589 }
590 wsrep::log(ll, pfx) << msg;
591 }
592
init_thread_service(void * dlh,wsrep::thread_service * thread_service)593 static int init_thread_service(void* dlh,
594 wsrep::thread_service* thread_service)
595 {
596 assert(thread_service);
597 if (wsrep::thread_service_v1_probe(dlh))
598 {
599 // No support in library.
600 return 1;
601 }
602 else
603 {
604 if (thread_service->before_init())
605 {
606 wsrep::log_error() << "Thread service before init failed";
607 return 1;
608 }
609 wsrep::thread_service_v1_init(dlh, thread_service);
610 if (thread_service->after_init())
611 {
612 wsrep::log_error() << "Thread service after init failed";
613 return 1;
614 }
615 }
616 return 0;
617 }
618
deinit_thread_service(void * dlh)619 static void deinit_thread_service(void* dlh)
620 {
621 // assert(not wsrep::thread_service_v1_probe(dlh));
622 wsrep::thread_service_v1_deinit(dlh);
623 }
624
/**
 * Initialize the TLS service in the provider library.
 *
 * @return 1 if the probe returns non-zero, otherwise the result of
 *         wsrep::tls_service_v1_init().
 */
static int init_tls_service(void* dlh,
                            wsrep::tls_service* tls_service)
{
    assert(tls_service);
    if (wsrep::tls_service_v1_probe(dlh))
    {
        return 1;
    }
    return wsrep::tls_service_v1_init(dlh, tls_service);
}
635
deinit_tls_service(void * dlh)636 static void deinit_tls_service(void* dlh)
637 {
638 // assert(not wsrep::tls_service_v1_probe(dlh));
639 wsrep::tls_service_v1_deinit(dlh);
640 }
641
/**
 * Initialize the allowlist service in the provider library.
 *
 * @return 1 if the probe returns non-zero, otherwise the result of
 *         wsrep::allowlist_service_v1_init().
 */
static int init_allowlist_service(void* dlh,
                                  wsrep::allowlist_service* allowlist_service)
{
    assert(allowlist_service);
    if (wsrep::allowlist_service_v1_probe(dlh))
    {
        return 1;
    }
    return wsrep::allowlist_service_v1_init(dlh, allowlist_service);
}
652
deinit_allowlist_service(void * dlh)653 static void deinit_allowlist_service(void* dlh)
654 {
655 // assert(not wsrep::allowlist_service_v1_probe(dlh));
656 wsrep::allowlist_service_v1_deinit(dlh);
657 }
658
init_event_service(void * dlh,wsrep::event_service * service)659 static int init_event_service(void* dlh,
660 wsrep::event_service* service)
661 {
662 assert(service);
663 if (not wsrep::event_service_v1_probe(dlh))
664 {
665 return wsrep::event_service_v1_init(dlh, service);
666 }
667 return 1;
668 }
669
deinit_event_service(void * dlh)670 static void deinit_event_service(void* dlh)
671 {
672 wsrep::event_service_v1_deinit(dlh);
673 }
674 }
675
676
677
init_services(const wsrep::provider::services & services)678 void wsrep::wsrep_provider_v26::init_services(
679 const wsrep::provider::services& services)
680 {
681 if (services.thread_service)
682 {
683 if (init_thread_service(wsrep_->dlh, services.thread_service))
684 {
685 throw wsrep::runtime_error("Failed to initialize thread service");
686 }
687 services_enabled_.thread_service = services.thread_service;
688 }
689 if (services.tls_service)
690 {
691 if (init_tls_service(wsrep_->dlh, services.tls_service))
692 {
693 throw wsrep::runtime_error("Failed to initialze TLS service");
694 }
695 services_enabled_.tls_service = services.tls_service;
696 }
697 if (services.allowlist_service)
698 {
699 if (init_allowlist_service(wsrep_->dlh, services.allowlist_service))
700 {
701 throw wsrep::runtime_error("Failed to initialze allowlist service");
702 }
703 services_enabled_.allowlist_service = services.allowlist_service;
704 }
705 if (services.event_service)
706 {
707 if (init_event_service(wsrep_->dlh, services.event_service))
708 {
709 wsrep::log_warning() << "Failed to initialize event service";
710 // provider does not produce events, ignore
711 }
712 else
713 {
714 services_enabled_.event_service = services.event_service;
715 }
716 }
717 }
718
deinit_services()719 void wsrep::wsrep_provider_v26::deinit_services()
720 {
721 if (services_enabled_.event_service)
722 deinit_event_service(wsrep_->dlh);
723 if (services_enabled_.tls_service)
724 deinit_tls_service(wsrep_->dlh);
725 if (services_enabled_.thread_service)
726 deinit_thread_service(wsrep_->dlh);
727 if (services_enabled_.allowlist_service)
728 deinit_allowlist_service(wsrep_->dlh);
729 }
730
wsrep_provider_v26(wsrep::server_state & server_state,const std::string & provider_options,const std::string & provider_spec,const wsrep::provider::services & services)731 wsrep::wsrep_provider_v26::wsrep_provider_v26(
732 wsrep::server_state& server_state,
733 const std::string& provider_options,
734 const std::string& provider_spec,
735 const wsrep::provider::services& services)
736 : provider(server_state)
737 , wsrep_()
738 , services_enabled_()
739 {
740 wsrep_gtid_t state_id;
741 bool encryption_enabled = server_state.encryption_service() &&
742 server_state.encryption_service()->encryption_enabled();
743 std::memcpy(state_id.uuid.data,
744 server_state.initial_position().id().data(),
745 sizeof(state_id.uuid.data));
746 state_id.seqno = server_state.initial_position().seqno().get();
747 struct wsrep_init_args init_args;
748 memset(&init_args, 0, sizeof(init_args));
749 init_args.app_ctx = &server_state;
750 init_args.node_name = server_state_.name().c_str();
751 init_args.node_address = server_state_.address().c_str();
752 init_args.node_incoming = server_state_.incoming_address().c_str();
753 init_args.data_dir = server_state_.working_dir().c_str();
754 init_args.options = provider_options.c_str();
755 init_args.proto_ver = server_state.max_protocol_version();
756 init_args.state_id = &state_id;
757 init_args.state = 0;
758 init_args.logger_cb = &logger_cb;
759 init_args.connected_cb = &connected_cb;
760 init_args.view_cb = &view_cb;
761 init_args.sst_request_cb = &sst_request_cb;
762 init_args.encrypt_cb = encryption_enabled ? encrypt_cb : NULL;
763 init_args.apply_cb = &apply_cb;
764 init_args.unordered_cb = 0;
765 init_args.sst_donate_cb = &sst_donate_cb;
766 init_args.synced_cb = &synced_cb;
767
768 if (wsrep_load(provider_spec.c_str(), &wsrep_, logger_cb))
769 {
770 throw wsrep::runtime_error("Failed to load wsrep library");
771 }
772
773 init_services(services);
774
775 if (wsrep_->init(wsrep_, &init_args) != WSREP_OK)
776 {
777 throw wsrep::runtime_error("Failed to initialize wsrep provider");
778 }
779
780 if (encryption_enabled)
781 {
782 const std::vector<unsigned char>& key = server_state.get_encryption_key();
783 if (key.size())
784 {
785 wsrep::const_buffer const_key(key.data(), key.size());
786 enum status const retval(enc_set_key(const_key));
787 if (retval != success)
788 {
789 std::string msg("Failed to set encryption key: ");
790 msg += to_string(retval);
791 throw wsrep::runtime_error(msg);
792 }
793 }
794 }
795 }
796
~wsrep_provider_v26()797 wsrep::wsrep_provider_v26::~wsrep_provider_v26()
798 {
799 wsrep_->free(wsrep_);
800 deinit_services();
801 wsrep_unload(wsrep_);
802 }
803
connect(const std::string & cluster_name,const std::string & cluster_url,const std::string & state_donor,bool bootstrap)804 enum wsrep::provider::status wsrep::wsrep_provider_v26::connect(
805 const std::string& cluster_name,
806 const std::string& cluster_url,
807 const std::string& state_donor,
808 bool bootstrap)
809 {
810 return map_return_value(wsrep_->connect(wsrep_,
811 cluster_name.c_str(),
812 cluster_url.c_str(),
813 state_donor.c_str(),
814 bootstrap));
815 }
816
disconnect()817 int wsrep::wsrep_provider_v26::disconnect()
818 {
819 int ret(0);
820 wsrep_status_t wret;
821 if ((wret = wsrep_->disconnect(wsrep_)) != WSREP_OK)
822 {
823 std::cerr << "Failed to disconnect from cluster: "
824 << wret << "\n";
825 ret = 1;
826 }
827 return ret;
828 }
829
capabilities() const830 int wsrep::wsrep_provider_v26::capabilities() const
831 {
832 return map_capabilities_from_native(wsrep_->capabilities(wsrep_));
833 }
desync()834 int wsrep::wsrep_provider_v26::desync()
835 {
836 return (wsrep_->desync(wsrep_) != WSREP_OK);
837 }
838
resync()839 int wsrep::wsrep_provider_v26::resync()
840 {
841 return (wsrep_->resync(wsrep_) != WSREP_OK);
842 }
843
pause()844 wsrep::seqno wsrep::wsrep_provider_v26::pause()
845 {
846 return wsrep::seqno(wsrep_->pause(wsrep_));
847 }
848
resume()849 int wsrep::wsrep_provider_v26::resume()
850 {
851 return (wsrep_->resume(wsrep_) != WSREP_OK);
852 }
853
854 enum wsrep::provider::status
run_applier(wsrep::high_priority_service * applier_ctx)855 wsrep::wsrep_provider_v26::run_applier(
856 wsrep::high_priority_service *applier_ctx)
857 {
858 return map_return_value(wsrep_->recv(wsrep_, applier_ctx));
859 }
860
861 enum wsrep::provider::status
assign_read_view(wsrep::ws_handle & ws_handle,const wsrep::gtid * gtid)862 wsrep::wsrep_provider_v26::assign_read_view(wsrep::ws_handle& ws_handle,
863 const wsrep::gtid* gtid)
864 {
865 const wsrep_gtid_t* gtid_ptr(NULL);
866 wsrep_gtid_t tmp;
867
868 if (gtid)
869 {
870 ::memcpy(&tmp.uuid, gtid->id().data(), sizeof(tmp.uuid));
871 tmp.seqno = gtid->seqno().get();
872 gtid_ptr = &tmp;
873 }
874
875 mutable_ws_handle mwsh(ws_handle);
876 return map_return_value(wsrep_->assign_read_view(wsrep_, mwsh.native(),
877 gtid_ptr));
878 }
879
append_key(wsrep::ws_handle & ws_handle,const wsrep::key & key)880 int wsrep::wsrep_provider_v26::append_key(wsrep::ws_handle& ws_handle,
881 const wsrep::key& key)
882 {
883 if (key.size() > 3)
884 {
885 assert(0);
886 return 1;
887 }
888 wsrep_buf_t key_parts[3];
889 for (size_t i(0); i < key.size(); ++i)
890 {
891 key_parts[i].ptr = key.key_parts()[i].ptr();
892 key_parts[i].len = key.key_parts()[i].size();
893 }
894 wsrep_key_t wsrep_key = {key_parts, key.size()};
895 mutable_ws_handle mwsh(ws_handle);
896 return (wsrep_->append_key(
897 wsrep_, mwsh.native(),
898 &wsrep_key, 1, map_key_type(key.type()), true)
899 != WSREP_OK);
900 }
901
902 enum wsrep::provider::status
append_data(wsrep::ws_handle & ws_handle,const wsrep::const_buffer & data)903 wsrep::wsrep_provider_v26::append_data(wsrep::ws_handle& ws_handle,
904 const wsrep::const_buffer& data)
905 {
906 const wsrep_buf_t wsrep_buf = {data.data(), data.size()};
907 mutable_ws_handle mwsh(ws_handle);
908 return map_return_value(
909 wsrep_->append_data(wsrep_, mwsh.native(), &wsrep_buf,
910 1, WSREP_DATA_ORDERED, true));
911 }
912
913 enum wsrep::provider::status
certify(wsrep::client_id client_id,wsrep::ws_handle & ws_handle,int flags,wsrep::ws_meta & ws_meta)914 wsrep::wsrep_provider_v26::certify(wsrep::client_id client_id,
915 wsrep::ws_handle& ws_handle,
916 int flags,
917 wsrep::ws_meta& ws_meta)
918 {
919 mutable_ws_handle mwsh(ws_handle);
920 mutable_ws_meta mmeta(ws_meta, flags);
921 return map_return_value(
922 wsrep_->certify(wsrep_, client_id.get(), mwsh.native(),
923 mmeta.native_flags(),
924 mmeta.native()));
925 }
926
927 enum wsrep::provider::status
bf_abort(wsrep::seqno bf_seqno,wsrep::transaction_id victim_id,wsrep::seqno & victim_seqno)928 wsrep::wsrep_provider_v26::bf_abort(
929 wsrep::seqno bf_seqno,
930 wsrep::transaction_id victim_id,
931 wsrep::seqno& victim_seqno)
932 {
933 wsrep_seqno_t wsrep_victim_seqno;
934 wsrep_status_t ret(
935 wsrep_->abort_certification(
936 wsrep_, seqno_to_native(bf_seqno),
937 victim_id.get(), &wsrep_victim_seqno));
938 victim_seqno = seqno_from_native(wsrep_victim_seqno);
939 return map_return_value(ret);
940 }
941
942 enum wsrep::provider::status
rollback(wsrep::transaction_id id)943 wsrep::wsrep_provider_v26::rollback(wsrep::transaction_id id)
944 {
945 return map_return_value(wsrep_->rollback(wsrep_, id.get(), 0));
946 }
947
948 enum wsrep::provider::status
commit_order_enter(const wsrep::ws_handle & ws_handle,const wsrep::ws_meta & ws_meta)949 wsrep::wsrep_provider_v26::commit_order_enter(
950 const wsrep::ws_handle& ws_handle,
951 const wsrep::ws_meta& ws_meta)
952 {
953 const_ws_handle cwsh(ws_handle);
954 const_ws_meta cwsm(ws_meta);
955 return map_return_value(
956 wsrep_->commit_order_enter(wsrep_, cwsh.native(), cwsm.native()));
957 }
958
959 int
commit_order_leave(const wsrep::ws_handle & ws_handle,const wsrep::ws_meta & ws_meta,const wsrep::mutable_buffer & err)960 wsrep::wsrep_provider_v26::commit_order_leave(
961 const wsrep::ws_handle& ws_handle,
962 const wsrep::ws_meta& ws_meta,
963 const wsrep::mutable_buffer& err)
964 {
965 const_ws_handle cwsh(ws_handle);
966 const_ws_meta cwsm(ws_meta);
967 wsrep_buf_t const err_buf = { err.data(), err.size() };
968 int ret(wsrep_->commit_order_leave(
969 wsrep_, cwsh.native(), cwsm.native(), &err_buf) != WSREP_OK);
970 return ret;
971 }
972
release(wsrep::ws_handle & ws_handle)973 int wsrep::wsrep_provider_v26::release(wsrep::ws_handle& ws_handle)
974 {
975 mutable_ws_handle mwsh(ws_handle);
976 return (wsrep_->release(wsrep_, mwsh.native()) != WSREP_OK);
977 }
978
979 enum wsrep::provider::status
replay(const wsrep::ws_handle & ws_handle,wsrep::high_priority_service * reply_service)980 wsrep::wsrep_provider_v26::replay(const wsrep::ws_handle& ws_handle,
981 wsrep::high_priority_service* reply_service)
982 {
983 const_ws_handle mwsh(ws_handle);
984 return map_return_value(
985 wsrep_->replay_trx(wsrep_, mwsh.native(), reply_service));
986 }
987
988 enum wsrep::provider::status
enter_toi(wsrep::client_id client_id,const wsrep::key_array & keys,const wsrep::const_buffer & buffer,wsrep::ws_meta & ws_meta,int flags)989 wsrep::wsrep_provider_v26::enter_toi(
990 wsrep::client_id client_id,
991 const wsrep::key_array& keys,
992 const wsrep::const_buffer& buffer,
993 wsrep::ws_meta& ws_meta,
994 int flags)
995 {
996 mutable_ws_meta mmeta(ws_meta, flags);
997 std::vector<std::vector<wsrep_buf_t> > key_parts;
998 std::vector<wsrep_key_t> wsrep_keys;
999 wsrep_buf_t wsrep_buf = {buffer.data(), buffer.size()};
1000 for (size_t i(0); i < keys.size(); ++i)
1001 {
1002 key_parts.push_back(std::vector<wsrep_buf_t>());
1003 for (size_t kp(0); kp < keys[i].size(); ++kp)
1004 {
1005 wsrep_buf_t buf = {keys[i].key_parts()[kp].data(),
1006 keys[i].key_parts()[kp].size()};
1007 key_parts[i].push_back(buf);
1008 }
1009 }
1010 for (size_t i(0); i < key_parts.size(); ++i)
1011 {
1012 wsrep_key_t key = {key_parts[i].data(), key_parts[i].size()};
1013 wsrep_keys.push_back(key);
1014 }
1015 return map_return_value(wsrep_->to_execute_start(
1016 wsrep_,
1017 client_id.get(),
1018 wsrep_keys.data(),
1019 wsrep_keys.size(),
1020 &wsrep_buf,
1021 buffer.size() ? 1 : 0,
1022 mmeta.native_flags(),
1023 mmeta.native()));
1024 }
1025
1026 enum wsrep::provider::status
leave_toi(wsrep::client_id client_id,const wsrep::mutable_buffer & err)1027 wsrep::wsrep_provider_v26::leave_toi(wsrep::client_id client_id,
1028 const wsrep::mutable_buffer& err)
1029 {
1030 const wsrep_buf_t err_buf = { err.data(), err.size() };
1031 return map_return_value(wsrep_->to_execute_end(
1032 wsrep_, client_id.get(), &err_buf));
1033 }
1034
1035 std::pair<wsrep::gtid, enum wsrep::provider::status>
causal_read(int timeout) const1036 wsrep::wsrep_provider_v26::causal_read(int timeout) const
1037 {
1038 wsrep_gtid_t wsrep_gtid;
1039 wsrep_status_t ret(wsrep_->sync_wait(wsrep_, 0, timeout, &wsrep_gtid));
1040 wsrep::gtid gtid(ret == WSREP_OK ?
1041 wsrep::gtid(wsrep::id(wsrep_gtid.uuid.data,
1042 sizeof(wsrep_gtid.uuid.data)),
1043 wsrep::seqno(wsrep_gtid.seqno)) :
1044 wsrep::gtid::undefined());
1045 return std::make_pair(gtid, map_return_value(ret));
1046 }
1047
1048 enum wsrep::provider::status
wait_for_gtid(const wsrep::gtid & gtid,int timeout) const1049 wsrep::wsrep_provider_v26::wait_for_gtid(const wsrep::gtid& gtid, int timeout)
1050 const
1051 {
1052 wsrep_gtid_t wsrep_gtid;
1053 std::memcpy(wsrep_gtid.uuid.data, gtid.id().data(),
1054 sizeof(wsrep_gtid.uuid.data));
1055 wsrep_gtid.seqno = gtid.seqno().get();
1056 return map_return_value(wsrep_->sync_wait(wsrep_, &wsrep_gtid, timeout, 0));
1057 }
1058
last_committed_gtid() const1059 wsrep::gtid wsrep::wsrep_provider_v26::last_committed_gtid() const
1060 {
1061 wsrep_gtid_t wsrep_gtid;
1062 if (wsrep_->last_committed_id(wsrep_, &wsrep_gtid) != WSREP_OK)
1063 {
1064 throw wsrep::runtime_error("Failed to read last committed id");
1065 }
1066 return wsrep::gtid(
1067 wsrep::id(wsrep_gtid.uuid.data, sizeof(wsrep_gtid.uuid.data)),
1068 wsrep::seqno(wsrep_gtid.seqno));
1069 }
1070
1071 enum wsrep::provider::status
sst_sent(const wsrep::gtid & gtid,int err)1072 wsrep::wsrep_provider_v26::sst_sent(const wsrep::gtid& gtid, int err)
1073 {
1074 wsrep_gtid_t wsrep_gtid;
1075 std::memcpy(wsrep_gtid.uuid.data, gtid.id().data(),
1076 sizeof(wsrep_gtid.uuid.data));
1077 wsrep_gtid.seqno = gtid.seqno().get();
1078 return map_return_value(wsrep_->sst_sent(wsrep_, &wsrep_gtid, err));
1079 }
1080
1081 enum wsrep::provider::status
sst_received(const wsrep::gtid & gtid,int err)1082 wsrep::wsrep_provider_v26::sst_received(const wsrep::gtid& gtid, int err)
1083 {
1084 wsrep_gtid_t wsrep_gtid;
1085 std::memcpy(wsrep_gtid.uuid.data, gtid.id().data(),
1086 sizeof(wsrep_gtid.uuid.data));
1087 wsrep_gtid.seqno = gtid.seqno().get();
1088 return map_return_value(wsrep_->sst_received(wsrep_, &wsrep_gtid, 0, err));
1089 }
1090
1091 enum wsrep::provider::status
enc_set_key(const wsrep::const_buffer & key)1092 wsrep::wsrep_provider_v26::enc_set_key(const wsrep::const_buffer& key)
1093 {
1094 wsrep_enc_key_t enc_key = {key.data(), key.size()};
1095 return map_return_value(wsrep_->enc_set_key(wsrep_, &enc_key));
1096 }
1097
1098 std::vector<wsrep::provider::status_variable>
status() const1099 wsrep::wsrep_provider_v26::status() const
1100 {
1101 std::vector<status_variable> ret;
1102 wsrep_stats_var* const stats(wsrep_->stats_get(wsrep_));
1103 wsrep_stats_var* i(stats);
1104 if (i)
1105 {
1106 while (i->name)
1107 {
1108 switch (i->type)
1109 {
1110 case WSREP_VAR_STRING:
1111 ret.push_back(status_variable(i->name, i->value._string));
1112 break;
1113 case WSREP_VAR_INT64:
1114 {
1115 std::ostringstream os;
1116 os << i->value._int64;
1117 ret.push_back(status_variable(i->name, os.str()));
1118 break;
1119 }
1120 case WSREP_VAR_DOUBLE:
1121 {
1122 std::ostringstream os;
1123 os << i->value._double;
1124 ret.push_back(status_variable(i->name, os.str()));
1125 break;
1126 }
1127 default:
1128 assert(0);
1129 break;
1130 }
1131 ++i;
1132 }
1133 wsrep_->stats_free(wsrep_, stats);
1134 }
1135 return ret;
1136 }
1137
reset_status()1138 void wsrep::wsrep_provider_v26::reset_status()
1139 {
1140 wsrep_->stats_reset(wsrep_);
1141 }
1142
options() const1143 std::string wsrep::wsrep_provider_v26::options() const
1144 {
1145 std::string ret;
1146 char* opts;
1147 if ((opts = wsrep_->options_get(wsrep_)))
1148 {
1149 ret = opts;
1150 free(opts);
1151 }
1152 else
1153 {
1154 throw wsrep::runtime_error("Failed to get provider options");
1155 }
1156 return ret;
1157 }
1158
1159 enum wsrep::provider::status
options(const std::string & opts)1160 wsrep::wsrep_provider_v26::options(const std::string& opts)
1161 {
1162 return map_return_value(wsrep_->options_set(wsrep_, opts.c_str()));
1163 }
1164
name() const1165 std::string wsrep::wsrep_provider_v26::name() const
1166 {
1167 return (wsrep_->provider_name ? wsrep_->provider_name : "unknown");
1168 }
1169
version() const1170 std::string wsrep::wsrep_provider_v26::version() const
1171 {
1172 return (wsrep_->provider_version ? wsrep_->provider_version : "unknown");
1173 }
1174
vendor() const1175 std::string wsrep::wsrep_provider_v26::vendor() const
1176 {
1177 return (wsrep_->provider_vendor ? wsrep_->provider_vendor : "unknown");
1178 }
1179
native() const1180 void* wsrep::wsrep_provider_v26::native() const
1181 {
1182 return wsrep_;
1183 }
1184