1 /*
2  * Copyright (C) 2018-2021 Codership Oy <info@codership.com>
3  *
4  * This file is part of wsrep-lib.
5  *
6  * Wsrep-lib is free software: you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation, either version 2 of the License, or
9  * (at your option) any later version.
10  *
11  * Wsrep-lib is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with wsrep-lib.  If not, see <https://www.gnu.org/licenses/>.
18  */
19 
20 #include "wsrep_provider_v26.hpp"
21 
22 #include "wsrep/encryption_service.hpp"
23 #include "wsrep/server_state.hpp"
24 #include "wsrep/high_priority_service.hpp"
25 #include "wsrep/view.hpp"
26 #include "wsrep/exception.hpp"
27 #include "wsrep/logger.hpp"
28 #include "wsrep/thread_service.hpp"
29 #include "wsrep/tls_service.hpp"
30 #include "wsrep/allowlist_service.hpp"
31 
32 #include "thread_service_v1.hpp"
33 #include "tls_service_v1.hpp"
34 #include "allowlist_service_v1.hpp"
35 #include "event_service_v1.hpp"
36 #include "v26/wsrep_api.h"
37 
38 
39 #include <dlfcn.h>
40 #include <cassert>
41 #include <climits>
42 
43 #include <iostream>
44 #include <sstream>
45 #include <cstring> // strerror()
46 
47 namespace
48 {
49     /////////////////////////////////////////////////////////////////////
50     //                           Helpers                               //
51     /////////////////////////////////////////////////////////////////////
52 
map_return_value(wsrep_status_t status)53     enum wsrep::provider::status map_return_value(wsrep_status_t status)
54     {
55         switch (status)
56         {
57         case WSREP_OK:
58             return wsrep::provider::success;
59         case WSREP_WARNING:
60             return wsrep::provider::error_warning;
61         case WSREP_TRX_MISSING:
62             return wsrep::provider::error_transaction_missing;
63         case WSREP_TRX_FAIL:
64             return wsrep::provider::error_certification_failed;
65         case WSREP_BF_ABORT:
66             return wsrep::provider::error_bf_abort;
67         case WSREP_SIZE_EXCEEDED:
68             return wsrep::provider::error_size_exceeded;
69         case WSREP_CONN_FAIL:
70             return wsrep::provider::error_connection_failed;
71         case WSREP_NODE_FAIL:
72             return wsrep::provider::error_provider_failed;
73         case WSREP_FATAL:
74             return wsrep::provider::error_fatal;
75         case WSREP_NOT_IMPLEMENTED:
76             return wsrep::provider::error_not_implemented;
77         case WSREP_NOT_ALLOWED:
78             return wsrep::provider::error_not_allowed;
79         }
80 
81         wsrep::log_warning() << "Unexpected value for wsrep_status_t: "
82                              << status << " ("
83                              << (status < 0 ? strerror(-status) : "") << ')';
84 
85         return wsrep::provider::error_unknown;
86     }
87 
map_key_type(enum wsrep::key::type type)88     wsrep_key_type_t map_key_type(enum wsrep::key::type type)
89     {
90         switch (type)
91         {
92         case wsrep::key::shared:        return WSREP_KEY_SHARED;
93         case wsrep::key::reference:     return WSREP_KEY_REFERENCE;
94         case wsrep::key::update:        return WSREP_KEY_UPDATE;
95         case wsrep::key::exclusive:     return WSREP_KEY_EXCLUSIVE;
96         }
97         assert(0);
98         throw wsrep::runtime_error("Invalid key type");
99     }
100 
seqno_to_native(wsrep::seqno seqno)101     static inline wsrep_seqno_t seqno_to_native(wsrep::seqno seqno)
102     {
103         return seqno.get();
104     }
105 
seqno_from_native(wsrep_seqno_t seqno)106     static inline wsrep::seqno seqno_from_native(wsrep_seqno_t seqno)
107     {
108         return wsrep::seqno(seqno);
109     }
110 
111     template <typename F, typename T>
map_one(const int flags,const F from,const T to)112     inline uint32_t map_one(const int flags, const F from,
113                             const T to)
114     {
115         // INT_MAX because GCC 4.4 does not eat numeric_limits<int>::max()
116         // in static_assert
117         static_assert(WSREP_FLAGS_LAST < INT_MAX,
118                       "WSREP_FLAGS_LAST < INT_MAX");
119         return static_cast<uint32_t>((flags & static_cast<int>(from)) ?
120                                      static_cast<int>(to) : 0);
121     }
122 
map_flags_to_native(int flags)123     uint32_t map_flags_to_native(int flags)
124     {
125       using wsrep::provider;
126       return static_cast<uint32_t>(
127           map_one(flags, provider::flag::start_transaction,
128                   WSREP_FLAG_TRX_START) |
129           map_one(flags, provider::flag::commit, WSREP_FLAG_TRX_END) |
130           map_one(flags, provider::flag::rollback, WSREP_FLAG_ROLLBACK) |
131           map_one(flags, provider::flag::isolation, WSREP_FLAG_ISOLATION) |
132           map_one(flags, provider::flag::pa_unsafe, WSREP_FLAG_PA_UNSAFE) |
133           // map_one(flags, provider::flag::commutative, WSREP_FLAG_COMMUTATIVE)
134           // |
135           // map_one(flags, provider::flag::native, WSREP_FLAG_NATIVE) |
136           map_one(flags, provider::flag::prepare, WSREP_FLAG_TRX_PREPARE) |
137           map_one(flags, provider::flag::snapshot, WSREP_FLAG_SNAPSHOT) |
138           map_one(flags, provider::flag::implicit_deps,
139                   WSREP_FLAG_IMPLICIT_DEPS));
140     }
141 
map_flags_from_native(uint32_t flags_u32)142     int map_flags_from_native(uint32_t flags_u32)
143     {
144       using wsrep::provider;
145       const int flags(static_cast<int>(flags_u32));
146       return static_cast<int>(
147           map_one(flags, WSREP_FLAG_TRX_START,
148                   provider::flag::start_transaction) |
149           map_one(flags, WSREP_FLAG_TRX_END, provider::flag::commit) |
150           map_one(flags, WSREP_FLAG_ROLLBACK, provider::flag::rollback) |
151           map_one(flags, WSREP_FLAG_ISOLATION, provider::flag::isolation) |
152           map_one(flags, WSREP_FLAG_PA_UNSAFE, provider::flag::pa_unsafe) |
153           // map_one(flags, provider::flag::commutative, WSREP_FLAG_COMMUTATIVE)
154           // |
155           // map_one(flags, provider::flag::native, WSREP_FLAG_NATIVE) |
156           map_one(flags, WSREP_FLAG_TRX_PREPARE, provider::flag::prepare) |
157           map_one(flags, WSREP_FLAG_SNAPSHOT, provider::flag::snapshot) |
158           map_one(flags, WSREP_FLAG_IMPLICIT_DEPS,
159                   provider::flag::implicit_deps));
160     }
161 
162     class mutable_ws_handle
163     {
164     public:
mutable_ws_handle(wsrep::ws_handle & ws_handle)165         mutable_ws_handle(wsrep::ws_handle& ws_handle)
166             : ws_handle_(ws_handle)
167             , native_((wsrep_ws_handle_t)
168                       {
169                           ws_handle_.transaction_id().get(),
170                           ws_handle_.opaque()
171                       })
172         { }
173 
~mutable_ws_handle()174         ~mutable_ws_handle()
175         {
176             ws_handle_ = wsrep::ws_handle(
177                 wsrep::transaction_id(native_.trx_id), native_.opaque);
178         }
179 
native()180         wsrep_ws_handle_t* native()
181         {
182             return &native_;
183         }
184     private:
185         wsrep::ws_handle& ws_handle_;
186         wsrep_ws_handle_t native_;
187     };
188 
189     class const_ws_handle
190     {
191     public:
const_ws_handle(const wsrep::ws_handle & ws_handle)192         const_ws_handle(const wsrep::ws_handle& ws_handle)
193             : ws_handle_(ws_handle)
194             , native_((wsrep_ws_handle_t)
195                       {
196                           ws_handle_.transaction_id().get(),
197                           ws_handle_.opaque()
198                       })
199         { }
200 
~const_ws_handle()201         ~const_ws_handle()
202         {
203             assert(ws_handle_.transaction_id().get() == native_.trx_id);
204             assert(ws_handle_.opaque() == native_.opaque);
205         }
206 
native() const207         const wsrep_ws_handle_t* native() const
208         {
209             return &native_;
210         }
211     private:
212         const wsrep::ws_handle& ws_handle_;
213         const wsrep_ws_handle_t native_;
214     };
215 
216     class mutable_ws_meta
217     {
218     public:
mutable_ws_meta(wsrep::ws_meta & ws_meta,int flags)219         mutable_ws_meta(wsrep::ws_meta& ws_meta, int flags)
220             : ws_meta_(ws_meta)
221             , trx_meta_()
222             , flags_(flags)
223         {
224             std::memcpy(trx_meta_.gtid.uuid.data, ws_meta.group_id().data(),
225                         sizeof(trx_meta_.gtid.uuid.data));
226             trx_meta_.gtid.seqno = seqno_to_native(ws_meta.seqno());
227             std::memcpy(trx_meta_.stid.node.data, ws_meta.server_id().data(),
228                         sizeof(trx_meta_.stid.node.data));
229             trx_meta_.stid.conn = ws_meta.client_id().get();
230             trx_meta_.stid.trx = ws_meta.transaction_id().get();
231             trx_meta_.depends_on = seqno_to_native(ws_meta.depends_on());
232         }
233 
~mutable_ws_meta()234         ~mutable_ws_meta()
235         {
236             ws_meta_ = wsrep::ws_meta(
237                 wsrep::gtid(
238                     wsrep::id(trx_meta_.gtid.uuid.data,
239                               sizeof(trx_meta_.gtid.uuid.data)),
240                     seqno_from_native(trx_meta_.gtid.seqno)),
241                 wsrep::stid(wsrep::id(trx_meta_.stid.node.data,
242                                       sizeof(trx_meta_.stid.node.data)),
243                             wsrep::transaction_id(trx_meta_.stid.trx),
244                             wsrep::client_id(trx_meta_.stid.conn)),
245                 seqno_from_native(trx_meta_.depends_on), flags_);
246         }
247 
native()248         wsrep_trx_meta* native() { return &trx_meta_; }
native_flags() const249         uint32_t native_flags() const { return map_flags_to_native(flags_); }
250     private:
251         wsrep::ws_meta& ws_meta_;
252         wsrep_trx_meta_t trx_meta_;
253         int flags_;
254     };
255 
256     class const_ws_meta
257     {
258     public:
const_ws_meta(const wsrep::ws_meta & ws_meta)259         const_ws_meta(const wsrep::ws_meta& ws_meta)
260             : trx_meta_()
261         {
262             std::memcpy(trx_meta_.gtid.uuid.data, ws_meta.group_id().data(),
263                         sizeof(trx_meta_.gtid.uuid.data));
264             trx_meta_.gtid.seqno = seqno_to_native(ws_meta.seqno());
265             std::memcpy(trx_meta_.stid.node.data, ws_meta.server_id().data(),
266                         sizeof(trx_meta_.stid.node.data));
267             trx_meta_.stid.conn = ws_meta.client_id().get();
268             trx_meta_.stid.trx = ws_meta.transaction_id().get();
269             trx_meta_.depends_on = seqno_to_native(ws_meta.depends_on());
270         }
271 
~const_ws_meta()272         ~const_ws_meta()
273         {
274         }
275 
native() const276         const wsrep_trx_meta* native() const { return &trx_meta_; }
277     private:
278         wsrep_trx_meta_t trx_meta_;
279     };
280 
map_view_status_from_native(wsrep_view_status_t status)281     enum wsrep::view::status map_view_status_from_native(
282         wsrep_view_status_t status)
283     {
284         switch (status)
285         {
286         case WSREP_VIEW_PRIMARY: return wsrep::view::primary;
287         case WSREP_VIEW_NON_PRIMARY: return wsrep::view::non_primary;
288         case WSREP_VIEW_DISCONNECTED: return wsrep::view::disconnected;
289         default: throw wsrep::runtime_error("Unknown view status");
290         }
291     }
292 
293     /** @todo Currently capabilities defined in provider.hpp
294      * are one to one with wsrep_api.h. However, the mapping should
295      * be made explicit. */
map_capabilities_from_native(wsrep_cap_t capabilities)296     int map_capabilities_from_native(wsrep_cap_t capabilities)
297     {
298         return static_cast<int>(capabilities);
299     }
view_from_native(const wsrep_view_info & view_info,const wsrep::id & own_id)300     wsrep::view view_from_native(const wsrep_view_info& view_info,
301                                  const wsrep::id& own_id)
302     {
303         std::vector<wsrep::view::member> members;
304         for (int i(0); i < view_info.memb_num; ++i)
305         {
306             wsrep::id id(view_info.members[i].id.data, sizeof(view_info.members[i].id.data));
307             std::string name(
308                 view_info.members[i].name,
309                 strnlen(view_info.members[i].name,
310                         sizeof(view_info.members[i].name)));
311             std::string incoming(
312                 view_info.members[i].incoming,
313                 strnlen(view_info.members[i].incoming,
314                         sizeof(view_info.members[i].incoming)));
315             members.push_back(wsrep::view::member(id, name, incoming));
316         }
317 
318         int own_idx(-1);
319         if (own_id.is_undefined())
320         {
321             // If own ID is undefined, obtain it from the view. This is
322             // the case on the initial connect to cluster.
323             own_idx = view_info.my_idx;
324         }
325         else
326         {
327             // If the node has already obtained its ID from cluster,
328             // its position in the view (or lack thereof) must be determined
329             // by the ID.
330             for (size_t i(0); i < members.size(); ++i)
331             {
332                 if (own_id == members[i].id())
333                 {
334                     own_idx = static_cast<int>(i);
335                     break;
336                 }
337             }
338         }
339 
340         return wsrep::view(
341             wsrep::gtid(
342                 wsrep::id(view_info.state_id.uuid.data,
343                           sizeof(view_info.state_id.uuid.data)),
344                 wsrep::seqno(view_info.state_id.seqno)),
345             wsrep::seqno(view_info.view),
346             map_view_status_from_native(view_info.status),
347             map_capabilities_from_native(view_info.capabilities),
348             own_idx,
349             view_info.proto_ver,
350             members);
351     }
352 
353     /////////////////////////////////////////////////////////////////////
354     //                         Callbacks                               //
355     /////////////////////////////////////////////////////////////////////
356 
connected_cb(void * app_ctx,const wsrep_view_info_t * view_info)357     wsrep_cb_status_t connected_cb(
358         void* app_ctx,
359         const wsrep_view_info_t* view_info)
360     {
361         assert(app_ctx);
362         wsrep::server_state& server_state(
363             *reinterpret_cast<wsrep::server_state*>(app_ctx));
364         wsrep::view view(view_from_native(*view_info, server_state.id()));
365         const ssize_t own_index(view.own_index());
366         assert(own_index >= 0);
367         if (own_index < 0)
368         {
369             wsrep::log_error() << "Connected without being in reported view";
370             return WSREP_CB_FAILURE;
371         }
372         assert(// first connect
373             server_state.id().is_undefined() ||
374             // reconnect to primary component
375             server_state.id() ==
376             view.members()[static_cast<size_t>(own_index)].id());
377         try
378         {
379             server_state.on_connect(view);
380             return WSREP_CB_SUCCESS;
381         }
382         catch (const wsrep::runtime_error& e)
383         {
384             wsrep::log_error() << "Exception: " << e.what();
385             return WSREP_CB_FAILURE;
386         }
387     }
388 
view_cb(void * app_ctx,void * recv_ctx,const wsrep_view_info_t * view_info,const char *,size_t)389     wsrep_cb_status_t view_cb(void* app_ctx,
390                               void* recv_ctx,
391                               const wsrep_view_info_t* view_info,
392                               const char*,
393                               size_t)
394     {
395         assert(app_ctx);
396         assert(view_info);
397         wsrep::server_state& server_state(
398             *reinterpret_cast<wsrep::server_state*>(app_ctx));
399         wsrep::high_priority_service* high_priority_service(
400             reinterpret_cast<wsrep::high_priority_service*>(recv_ctx));
401         try
402         {
403             wsrep::view view(view_from_native(*view_info, server_state.id()));
404             server_state.on_view(view, high_priority_service);
405             return WSREP_CB_SUCCESS;
406         }
407         catch (const wsrep::runtime_error& e)
408         {
409             wsrep::log_error() << "Exception: " << e.what();
410             return WSREP_CB_FAILURE;
411         }
412     }
413 
sst_request_cb(void * app_ctx,void ** sst_req,size_t * sst_req_len)414     wsrep_cb_status_t sst_request_cb(void* app_ctx,
415                                      void **sst_req, size_t* sst_req_len)
416     {
417         assert(app_ctx);
418         wsrep::server_state& server_state(
419             *reinterpret_cast<wsrep::server_state*>(app_ctx));
420 
421         try
422         {
423             std::string req(server_state.prepare_for_sst());
424             if (req.size() > 0)
425             {
426                 *sst_req = ::malloc(req.size() + 1);
427                 memcpy(*sst_req, req.data(), req.size() + 1);
428                 *sst_req_len = req.size() + 1;
429             }
430             else
431             {
432                 *sst_req = 0;
433                 *sst_req_len = 0;
434             }
435             return WSREP_CB_SUCCESS;
436         }
437         catch (const wsrep::runtime_error& e)
438         {
439             return WSREP_CB_FAILURE;
440         }
441     }
442 
encrypt_cb(void * app_ctx,wsrep_enc_ctx_t * enc_ctx,const wsrep_buf_t * input,void * output,wsrep_enc_direction_t direction,bool last)443     int encrypt_cb(void*                 app_ctx,
444                    wsrep_enc_ctx_t*      enc_ctx,
445                    const wsrep_buf_t*    input,
446                    void*                 output,
447                    wsrep_enc_direction_t direction,
448                    bool                  last)
449     {
450         assert(app_ctx);
451         wsrep::server_state& server_state(
452             *static_cast<wsrep::server_state*>(app_ctx));
453 
454         assert(server_state.encryption_service());
455         if (server_state.encryption_service() == 0)
456         {
457             wsrep::log_error() << "Encryption service not defined in encrypt_cb()";
458             return -1;
459         }
460 
461         wsrep::const_buffer key(enc_ctx->key->ptr, enc_ctx->key->len);
462         wsrep::const_buffer in(input->ptr, input->len);
463         try
464         {
465             return server_state.encryption_service()->do_crypt(&enc_ctx->ctx,
466                                                               key,
467                                                               enc_ctx->iv,
468                                                               in,
469                                                               output,
470                                                               direction == WSREP_ENC,
471                                                               last);
472         }
473         catch (const wsrep::runtime_error& e)
474         {
475             free(enc_ctx->ctx);
476             // Return negative value in case of callback error
477             return -1;
478         }
479     }
480 
apply_cb(void * ctx,const wsrep_ws_handle_t * wsh,uint32_t flags,const wsrep_buf_t * buf,const wsrep_trx_meta_t * meta,wsrep_bool_t * exit_loop)481     wsrep_cb_status_t apply_cb(void* ctx,
482                                const wsrep_ws_handle_t* wsh,
483                                uint32_t flags,
484                                const wsrep_buf_t* buf,
485                                const wsrep_trx_meta_t* meta,
486                                wsrep_bool_t* exit_loop)
487     {
488         wsrep::high_priority_service* high_priority_service(
489             reinterpret_cast<wsrep::high_priority_service*>(ctx));
490         assert(high_priority_service);
491 
492         wsrep::const_buffer data(buf->ptr, buf->len);
493         wsrep::ws_handle ws_handle(wsrep::transaction_id(wsh->trx_id),
494                                    wsh->opaque);
495         wsrep::ws_meta ws_meta(
496             wsrep::gtid(wsrep::id(meta->gtid.uuid.data,
497                                   sizeof(meta->gtid.uuid.data)),
498                         wsrep::seqno(meta->gtid.seqno)),
499             wsrep::stid(wsrep::id(meta->stid.node.data,
500                                   sizeof(meta->stid.node.data)),
501                         wsrep::transaction_id(meta->stid.trx),
502                         wsrep::client_id(meta->stid.conn)),
503             wsrep::seqno(seqno_from_native(meta->depends_on)),
504             map_flags_from_native(flags));
505         try
506         {
507             if (high_priority_service->apply(ws_handle, ws_meta, data))
508             {
509                 return WSREP_CB_FAILURE;
510             }
511             *exit_loop = high_priority_service->must_exit();
512             return WSREP_CB_SUCCESS;
513         }
514         catch (const wsrep::runtime_error& e)
515         {
516             wsrep::log_error() << "Caught runtime error while applying "
517                                << ws_meta.flags() << ": "
518                                << e.what();
519             return WSREP_CB_FAILURE;
520         }
521     }
522 
synced_cb(void * app_ctx)523     wsrep_cb_status_t synced_cb(void* app_ctx)
524     {
525         assert(app_ctx);
526         wsrep::server_state& server_state(
527             *reinterpret_cast<wsrep::server_state*>(app_ctx));
528         try
529         {
530             server_state.on_sync();
531             return WSREP_CB_SUCCESS;
532         }
533         catch (const wsrep::runtime_error& e)
534         {
535             std::cerr << "On sync failed: " << e.what() << "\n";
536             return WSREP_CB_FAILURE;
537         }
538     }
539 
540 
sst_donate_cb(void * app_ctx,void *,const wsrep_buf_t * req_buf,const wsrep_gtid_t * req_gtid,const wsrep_buf_t *,bool bypass)541     wsrep_cb_status_t sst_donate_cb(void* app_ctx,
542                                     void* ,
543                                     const wsrep_buf_t* req_buf,
544                                     const wsrep_gtid_t* req_gtid,
545                                     const wsrep_buf_t*,
546                                     bool bypass)
547     {
548         assert(app_ctx);
549         wsrep::server_state& server_state(
550             *reinterpret_cast<wsrep::server_state*>(app_ctx));
551         try
552         {
553             std::string req(reinterpret_cast<const char*>(req_buf->ptr),
554                             req_buf->len);
555             wsrep::gtid gtid(wsrep::id(req_gtid->uuid.data,
556                                        sizeof(req_gtid->uuid.data)),
557                              wsrep::seqno(req_gtid->seqno));
558             if (server_state.start_sst(req, gtid, bypass))
559             {
560                 return WSREP_CB_FAILURE;
561             }
562             return WSREP_CB_SUCCESS;
563         }
564         catch (const wsrep::runtime_error& e)
565         {
566             return WSREP_CB_FAILURE;
567         }
568     }
569 
logger_cb(wsrep_log_level_t level,const char * msg)570     void logger_cb(wsrep_log_level_t level, const char* msg)
571     {
572         static const char* const pfx("P:"); // "provider"
573         wsrep::log::level ll(wsrep::log::unknown);
574         switch (level)
575         {
576         case WSREP_LOG_FATAL:
577         case WSREP_LOG_ERROR:
578             ll = wsrep::log::error;
579             break;
580         case WSREP_LOG_WARN:
581             ll = wsrep::log::warning;
582             break;
583         case WSREP_LOG_INFO:
584             ll = wsrep::log::info;
585             break;
586         case WSREP_LOG_DEBUG:
587             ll = wsrep::log::debug;
588             break;
589         }
590         wsrep::log(ll, pfx) << msg;
591     }
592 
init_thread_service(void * dlh,wsrep::thread_service * thread_service)593     static int init_thread_service(void* dlh,
594                                    wsrep::thread_service* thread_service)
595     {
596         assert(thread_service);
597         if (wsrep::thread_service_v1_probe(dlh))
598         {
599             // No support in library.
600             return 1;
601         }
602         else
603         {
604             if (thread_service->before_init())
605             {
606                 wsrep::log_error() << "Thread service before init failed";
607                 return 1;
608             }
609             wsrep::thread_service_v1_init(dlh, thread_service);
610             if (thread_service->after_init())
611             {
612                 wsrep::log_error() << "Thread service after init failed";
613                 return 1;
614             }
615         }
616         return 0;
617     }
618 
deinit_thread_service(void * dlh)619     static void deinit_thread_service(void* dlh)
620     {
621         // assert(not wsrep::thread_service_v1_probe(dlh));
622         wsrep::thread_service_v1_deinit(dlh);
623     }
624 
init_tls_service(void * dlh,wsrep::tls_service * tls_service)625     static int init_tls_service(void* dlh,
626                                 wsrep::tls_service* tls_service)
627     {
628         assert(tls_service);
629         if (not wsrep::tls_service_v1_probe(dlh))
630         {
631             return wsrep::tls_service_v1_init(dlh, tls_service);
632         }
633         return 1;
634     }
635 
deinit_tls_service(void * dlh)636     static void deinit_tls_service(void* dlh)
637     {
638         // assert(not wsrep::tls_service_v1_probe(dlh));
639         wsrep::tls_service_v1_deinit(dlh);
640     }
641 
init_allowlist_service(void * dlh,wsrep::allowlist_service * allowlist_service)642     static int init_allowlist_service(void* dlh,
643                                       wsrep::allowlist_service* allowlist_service)
644     {
645         assert(allowlist_service);
646         if (not wsrep::allowlist_service_v1_probe(dlh))
647         {
648             return wsrep::allowlist_service_v1_init(dlh, allowlist_service);
649         }
650         return 1;
651     }
652 
deinit_allowlist_service(void * dlh)653     static void deinit_allowlist_service(void* dlh)
654     {
655         // assert(not wsrep::allowlist_service_v1_probe(dlh));
656         wsrep::allowlist_service_v1_deinit(dlh);
657     }
658 
init_event_service(void * dlh,wsrep::event_service * service)659     static int init_event_service(void* dlh,
660                                   wsrep::event_service* service)
661     {
662         assert(service);
663         if (not wsrep::event_service_v1_probe(dlh))
664         {
665             return wsrep::event_service_v1_init(dlh, service);
666         }
667         return 1;
668     }
669 
deinit_event_service(void * dlh)670     static void deinit_event_service(void* dlh)
671     {
672         wsrep::event_service_v1_deinit(dlh);
673     }
674 }
675 
676 
677 
init_services(const wsrep::provider::services & services)678 void wsrep::wsrep_provider_v26::init_services(
679     const wsrep::provider::services& services)
680 {
681     if (services.thread_service)
682     {
683         if (init_thread_service(wsrep_->dlh, services.thread_service))
684         {
685             throw wsrep::runtime_error("Failed to initialize thread service");
686         }
687         services_enabled_.thread_service = services.thread_service;
688     }
689     if (services.tls_service)
690     {
691         if (init_tls_service(wsrep_->dlh, services.tls_service))
692         {
693             throw wsrep::runtime_error("Failed to initialze TLS service");
694         }
695         services_enabled_.tls_service = services.tls_service;
696     }
697     if (services.allowlist_service)
698     {
699         if (init_allowlist_service(wsrep_->dlh, services.allowlist_service))
700         {
701             throw wsrep::runtime_error("Failed to initialze allowlist service");
702         }
703         services_enabled_.allowlist_service = services.allowlist_service;
704     }
705     if (services.event_service)
706     {
707         if (init_event_service(wsrep_->dlh, services.event_service))
708         {
709             wsrep::log_warning() << "Failed to initialize event service";
710             // provider does not produce events, ignore
711         }
712         else
713         {
714             services_enabled_.event_service = services.event_service;
715         }
716     }
717 }
718 
deinit_services()719 void wsrep::wsrep_provider_v26::deinit_services()
720 {
721     if (services_enabled_.event_service)
722         deinit_event_service(wsrep_->dlh);
723     if (services_enabled_.tls_service)
724         deinit_tls_service(wsrep_->dlh);
725     if (services_enabled_.thread_service)
726         deinit_thread_service(wsrep_->dlh);
727     if (services_enabled_.allowlist_service)
728         deinit_allowlist_service(wsrep_->dlh);
729 }
730 
wsrep_provider_v26(wsrep::server_state & server_state,const std::string & provider_options,const std::string & provider_spec,const wsrep::provider::services & services)731 wsrep::wsrep_provider_v26::wsrep_provider_v26(
732     wsrep::server_state& server_state,
733     const std::string& provider_options,
734     const std::string& provider_spec,
735     const wsrep::provider::services& services)
736     : provider(server_state)
737     , wsrep_()
738     , services_enabled_()
739 {
740     wsrep_gtid_t state_id;
741     bool encryption_enabled = server_state.encryption_service() &&
742                               server_state.encryption_service()->encryption_enabled();
743     std::memcpy(state_id.uuid.data,
744                 server_state.initial_position().id().data(),
745                 sizeof(state_id.uuid.data));
746     state_id.seqno = server_state.initial_position().seqno().get();
747     struct wsrep_init_args init_args;
748     memset(&init_args, 0, sizeof(init_args));
749     init_args.app_ctx = &server_state;
750     init_args.node_name = server_state_.name().c_str();
751     init_args.node_address = server_state_.address().c_str();
752     init_args.node_incoming = server_state_.incoming_address().c_str();
753     init_args.data_dir = server_state_.working_dir().c_str();
754     init_args.options = provider_options.c_str();
755     init_args.proto_ver = server_state.max_protocol_version();
756     init_args.state_id = &state_id;
757     init_args.state = 0;
758     init_args.logger_cb = &logger_cb;
759     init_args.connected_cb = &connected_cb;
760     init_args.view_cb = &view_cb;
761     init_args.sst_request_cb = &sst_request_cb;
762     init_args.encrypt_cb = encryption_enabled ? encrypt_cb : NULL;
763     init_args.apply_cb = &apply_cb;
764     init_args.unordered_cb = 0;
765     init_args.sst_donate_cb = &sst_donate_cb;
766     init_args.synced_cb = &synced_cb;
767 
768     if (wsrep_load(provider_spec.c_str(), &wsrep_, logger_cb))
769     {
770         throw wsrep::runtime_error("Failed to load wsrep library");
771     }
772 
773     init_services(services);
774 
775     if (wsrep_->init(wsrep_, &init_args) != WSREP_OK)
776     {
777         throw wsrep::runtime_error("Failed to initialize wsrep provider");
778     }
779 
780     if (encryption_enabled)
781     {
782         const std::vector<unsigned char>& key = server_state.get_encryption_key();
783         if (key.size())
784         {
785             wsrep::const_buffer const_key(key.data(), key.size());
786             enum status const retval(enc_set_key(const_key));
787             if (retval != success)
788             {
789                 std::string msg("Failed to set encryption key: ");
790                 msg += to_string(retval);
791                 throw wsrep::runtime_error(msg);
792             }
793         }
794     }
795 }
796 
~wsrep_provider_v26()797 wsrep::wsrep_provider_v26::~wsrep_provider_v26()
798 {
799     wsrep_->free(wsrep_);
800     deinit_services();
801     wsrep_unload(wsrep_);
802 }
803 
connect(const std::string & cluster_name,const std::string & cluster_url,const std::string & state_donor,bool bootstrap)804 enum wsrep::provider::status wsrep::wsrep_provider_v26::connect(
805     const std::string& cluster_name,
806     const std::string& cluster_url,
807     const std::string& state_donor,
808     bool bootstrap)
809 {
810     return map_return_value(wsrep_->connect(wsrep_,
811                                             cluster_name.c_str(),
812                                             cluster_url.c_str(),
813                                             state_donor.c_str(),
814                                             bootstrap));
815 }
816 
disconnect()817 int wsrep::wsrep_provider_v26::disconnect()
818 {
819     int ret(0);
820     wsrep_status_t wret;
821     if ((wret = wsrep_->disconnect(wsrep_)) != WSREP_OK)
822     {
823         std::cerr << "Failed to disconnect from cluster: "
824                   << wret << "\n";
825         ret = 1;
826     }
827     return ret;
828 }
829 
capabilities() const830 int wsrep::wsrep_provider_v26::capabilities() const
831 {
832     return map_capabilities_from_native(wsrep_->capabilities(wsrep_));
833 }
desync()834 int wsrep::wsrep_provider_v26::desync()
835 {
836     return (wsrep_->desync(wsrep_) != WSREP_OK);
837 }
838 
resync()839 int wsrep::wsrep_provider_v26::resync()
840 {
841     return (wsrep_->resync(wsrep_) != WSREP_OK);
842 }
843 
pause()844 wsrep::seqno wsrep::wsrep_provider_v26::pause()
845 {
846     return wsrep::seqno(wsrep_->pause(wsrep_));
847 }
848 
resume()849 int wsrep::wsrep_provider_v26::resume()
850 {
851     return (wsrep_->resume(wsrep_) != WSREP_OK);
852 }
853 
854 enum wsrep::provider::status
run_applier(wsrep::high_priority_service * applier_ctx)855 wsrep::wsrep_provider_v26::run_applier(
856     wsrep::high_priority_service *applier_ctx)
857 {
858     return map_return_value(wsrep_->recv(wsrep_, applier_ctx));
859 }
860 
861 enum wsrep::provider::status
assign_read_view(wsrep::ws_handle & ws_handle,const wsrep::gtid * gtid)862 wsrep::wsrep_provider_v26::assign_read_view(wsrep::ws_handle& ws_handle,
863                                             const wsrep::gtid* gtid)
864 {
865     const wsrep_gtid_t* gtid_ptr(NULL);
866     wsrep_gtid_t tmp;
867 
868     if (gtid)
869     {
870         ::memcpy(&tmp.uuid, gtid->id().data(), sizeof(tmp.uuid));
871         tmp.seqno = gtid->seqno().get();
872         gtid_ptr = &tmp;
873     }
874 
875     mutable_ws_handle mwsh(ws_handle);
876     return map_return_value(wsrep_->assign_read_view(wsrep_, mwsh.native(),
877                                                      gtid_ptr));
878 }
879 
append_key(wsrep::ws_handle & ws_handle,const wsrep::key & key)880 int wsrep::wsrep_provider_v26::append_key(wsrep::ws_handle& ws_handle,
881                                           const wsrep::key& key)
882 {
883     if (key.size() > 3)
884     {
885         assert(0);
886         return 1;
887     }
888     wsrep_buf_t key_parts[3];
889     for (size_t i(0); i < key.size(); ++i)
890     {
891         key_parts[i].ptr = key.key_parts()[i].ptr();
892         key_parts[i].len = key.key_parts()[i].size();
893     }
894     wsrep_key_t wsrep_key = {key_parts, key.size()};
895     mutable_ws_handle mwsh(ws_handle);
896     return (wsrep_->append_key(
897                 wsrep_, mwsh.native(),
898                 &wsrep_key, 1, map_key_type(key.type()), true)
899             != WSREP_OK);
900 }
901 
902 enum wsrep::provider::status
append_data(wsrep::ws_handle & ws_handle,const wsrep::const_buffer & data)903 wsrep::wsrep_provider_v26::append_data(wsrep::ws_handle& ws_handle,
904                                        const wsrep::const_buffer& data)
905 {
906     const wsrep_buf_t wsrep_buf = {data.data(), data.size()};
907     mutable_ws_handle mwsh(ws_handle);
908     return map_return_value(
909         wsrep_->append_data(wsrep_, mwsh.native(), &wsrep_buf,
910                             1, WSREP_DATA_ORDERED, true));
911 }
912 
913 enum wsrep::provider::status
certify(wsrep::client_id client_id,wsrep::ws_handle & ws_handle,int flags,wsrep::ws_meta & ws_meta)914 wsrep::wsrep_provider_v26::certify(wsrep::client_id client_id,
915                                    wsrep::ws_handle& ws_handle,
916                                    int flags,
917                                    wsrep::ws_meta& ws_meta)
918 {
919     mutable_ws_handle mwsh(ws_handle);
920     mutable_ws_meta mmeta(ws_meta, flags);
921     return map_return_value(
922         wsrep_->certify(wsrep_, client_id.get(), mwsh.native(),
923                         mmeta.native_flags(),
924                         mmeta.native()));
925 }
926 
927 enum wsrep::provider::status
bf_abort(wsrep::seqno bf_seqno,wsrep::transaction_id victim_id,wsrep::seqno & victim_seqno)928 wsrep::wsrep_provider_v26::bf_abort(
929     wsrep::seqno bf_seqno,
930     wsrep::transaction_id victim_id,
931     wsrep::seqno& victim_seqno)
932 {
933     wsrep_seqno_t wsrep_victim_seqno;
934     wsrep_status_t ret(
935         wsrep_->abort_certification(
936             wsrep_, seqno_to_native(bf_seqno),
937             victim_id.get(), &wsrep_victim_seqno));
938     victim_seqno = seqno_from_native(wsrep_victim_seqno);
939     return map_return_value(ret);
940 }
941 
942 enum wsrep::provider::status
rollback(wsrep::transaction_id id)943 wsrep::wsrep_provider_v26::rollback(wsrep::transaction_id id)
944 {
945     return map_return_value(wsrep_->rollback(wsrep_, id.get(), 0));
946 }
947 
948 enum wsrep::provider::status
commit_order_enter(const wsrep::ws_handle & ws_handle,const wsrep::ws_meta & ws_meta)949 wsrep::wsrep_provider_v26::commit_order_enter(
950     const wsrep::ws_handle& ws_handle,
951     const wsrep::ws_meta& ws_meta)
952 {
953     const_ws_handle cwsh(ws_handle);
954     const_ws_meta cwsm(ws_meta);
955     return map_return_value(
956         wsrep_->commit_order_enter(wsrep_, cwsh.native(), cwsm.native()));
957 }
958 
959 int
commit_order_leave(const wsrep::ws_handle & ws_handle,const wsrep::ws_meta & ws_meta,const wsrep::mutable_buffer & err)960 wsrep::wsrep_provider_v26::commit_order_leave(
961     const wsrep::ws_handle& ws_handle,
962     const wsrep::ws_meta& ws_meta,
963     const wsrep::mutable_buffer& err)
964 {
965     const_ws_handle cwsh(ws_handle);
966     const_ws_meta cwsm(ws_meta);
967     wsrep_buf_t const err_buf = { err.data(), err.size() };
968     int ret(wsrep_->commit_order_leave(
969          wsrep_, cwsh.native(), cwsm.native(), &err_buf) != WSREP_OK);
970     return ret;
971 }
972 
release(wsrep::ws_handle & ws_handle)973 int wsrep::wsrep_provider_v26::release(wsrep::ws_handle& ws_handle)
974 {
975     mutable_ws_handle mwsh(ws_handle);
976     return (wsrep_->release(wsrep_, mwsh.native()) != WSREP_OK);
977 }
978 
979 enum wsrep::provider::status
replay(const wsrep::ws_handle & ws_handle,wsrep::high_priority_service * reply_service)980 wsrep::wsrep_provider_v26::replay(const wsrep::ws_handle& ws_handle,
981                                   wsrep::high_priority_service* reply_service)
982 {
983     const_ws_handle mwsh(ws_handle);
984     return map_return_value(
985         wsrep_->replay_trx(wsrep_, mwsh.native(), reply_service));
986 }
987 
988 enum wsrep::provider::status
enter_toi(wsrep::client_id client_id,const wsrep::key_array & keys,const wsrep::const_buffer & buffer,wsrep::ws_meta & ws_meta,int flags)989 wsrep::wsrep_provider_v26::enter_toi(
990     wsrep::client_id client_id,
991     const wsrep::key_array& keys,
992     const wsrep::const_buffer& buffer,
993     wsrep::ws_meta& ws_meta,
994     int flags)
995 {
996     mutable_ws_meta mmeta(ws_meta, flags);
997     std::vector<std::vector<wsrep_buf_t> > key_parts;
998     std::vector<wsrep_key_t> wsrep_keys;
999     wsrep_buf_t wsrep_buf = {buffer.data(), buffer.size()};
1000     for (size_t i(0); i < keys.size(); ++i)
1001     {
1002         key_parts.push_back(std::vector<wsrep_buf_t>());
1003         for (size_t kp(0); kp < keys[i].size(); ++kp)
1004         {
1005             wsrep_buf_t buf = {keys[i].key_parts()[kp].data(),
1006                                keys[i].key_parts()[kp].size()};
1007             key_parts[i].push_back(buf);
1008         }
1009     }
1010     for (size_t i(0); i < key_parts.size(); ++i)
1011     {
1012         wsrep_key_t key = {key_parts[i].data(), key_parts[i].size()};
1013         wsrep_keys.push_back(key);
1014     }
1015     return map_return_value(wsrep_->to_execute_start(
1016                                 wsrep_,
1017                                 client_id.get(),
1018                                 wsrep_keys.data(),
1019                                 wsrep_keys.size(),
1020                                 &wsrep_buf,
1021                                 buffer.size() ? 1 : 0,
1022                                 mmeta.native_flags(),
1023                                 mmeta.native()));
1024 }
1025 
1026 enum wsrep::provider::status
leave_toi(wsrep::client_id client_id,const wsrep::mutable_buffer & err)1027 wsrep::wsrep_provider_v26::leave_toi(wsrep::client_id client_id,
1028                                      const wsrep::mutable_buffer& err)
1029 {
1030     const wsrep_buf_t err_buf = { err.data(), err.size() };
1031     return map_return_value(wsrep_->to_execute_end(
1032                                 wsrep_, client_id.get(), &err_buf));
1033 }
1034 
1035 std::pair<wsrep::gtid, enum wsrep::provider::status>
causal_read(int timeout) const1036 wsrep::wsrep_provider_v26::causal_read(int timeout) const
1037 {
1038     wsrep_gtid_t wsrep_gtid;
1039     wsrep_status_t ret(wsrep_->sync_wait(wsrep_, 0, timeout, &wsrep_gtid));
1040     wsrep::gtid gtid(ret == WSREP_OK ?
1041                      wsrep::gtid(wsrep::id(wsrep_gtid.uuid.data,
1042                                            sizeof(wsrep_gtid.uuid.data)),
1043                                  wsrep::seqno(wsrep_gtid.seqno)) :
1044                      wsrep::gtid::undefined());
1045     return std::make_pair(gtid, map_return_value(ret));
1046 }
1047 
1048 enum wsrep::provider::status
wait_for_gtid(const wsrep::gtid & gtid,int timeout) const1049 wsrep::wsrep_provider_v26::wait_for_gtid(const wsrep::gtid& gtid, int timeout)
1050     const
1051 {
1052     wsrep_gtid_t wsrep_gtid;
1053     std::memcpy(wsrep_gtid.uuid.data, gtid.id().data(),
1054                 sizeof(wsrep_gtid.uuid.data));
1055     wsrep_gtid.seqno = gtid.seqno().get();
1056     return map_return_value(wsrep_->sync_wait(wsrep_, &wsrep_gtid, timeout, 0));
1057 }
1058 
last_committed_gtid() const1059 wsrep::gtid wsrep::wsrep_provider_v26::last_committed_gtid() const
1060 {
1061     wsrep_gtid_t wsrep_gtid;
1062     if (wsrep_->last_committed_id(wsrep_, &wsrep_gtid) != WSREP_OK)
1063     {
1064         throw wsrep::runtime_error("Failed to read last committed id");
1065     }
1066     return wsrep::gtid(
1067         wsrep::id(wsrep_gtid.uuid.data, sizeof(wsrep_gtid.uuid.data)),
1068         wsrep::seqno(wsrep_gtid.seqno));
1069 }
1070 
1071 enum wsrep::provider::status
sst_sent(const wsrep::gtid & gtid,int err)1072 wsrep::wsrep_provider_v26::sst_sent(const wsrep::gtid& gtid, int err)
1073 {
1074     wsrep_gtid_t wsrep_gtid;
1075     std::memcpy(wsrep_gtid.uuid.data, gtid.id().data(),
1076                 sizeof(wsrep_gtid.uuid.data));
1077     wsrep_gtid.seqno = gtid.seqno().get();
1078     return map_return_value(wsrep_->sst_sent(wsrep_, &wsrep_gtid, err));
1079 }
1080 
1081 enum wsrep::provider::status
sst_received(const wsrep::gtid & gtid,int err)1082 wsrep::wsrep_provider_v26::sst_received(const wsrep::gtid& gtid, int err)
1083 {
1084     wsrep_gtid_t wsrep_gtid;
1085     std::memcpy(wsrep_gtid.uuid.data, gtid.id().data(),
1086                 sizeof(wsrep_gtid.uuid.data));
1087     wsrep_gtid.seqno = gtid.seqno().get();
1088     return map_return_value(wsrep_->sst_received(wsrep_, &wsrep_gtid, 0, err));
1089 }
1090 
1091 enum wsrep::provider::status
enc_set_key(const wsrep::const_buffer & key)1092 wsrep::wsrep_provider_v26::enc_set_key(const wsrep::const_buffer& key)
1093 {
1094     wsrep_enc_key_t enc_key = {key.data(), key.size()};
1095     return map_return_value(wsrep_->enc_set_key(wsrep_, &enc_key));
1096 }
1097 
1098 std::vector<wsrep::provider::status_variable>
status() const1099 wsrep::wsrep_provider_v26::status() const
1100 {
1101     std::vector<status_variable> ret;
1102     wsrep_stats_var* const stats(wsrep_->stats_get(wsrep_));
1103     wsrep_stats_var* i(stats);
1104     if (i)
1105     {
1106         while (i->name)
1107         {
1108             switch (i->type)
1109             {
1110             case WSREP_VAR_STRING:
1111                 ret.push_back(status_variable(i->name, i->value._string));
1112                 break;
1113             case WSREP_VAR_INT64:
1114             {
1115                 std::ostringstream os;
1116                 os << i->value._int64;
1117                 ret.push_back(status_variable(i->name, os.str()));
1118                 break;
1119             }
1120             case WSREP_VAR_DOUBLE:
1121             {
1122                 std::ostringstream os;
1123                 os << i->value._double;
1124                 ret.push_back(status_variable(i->name, os.str()));
1125                 break;
1126             }
1127             default:
1128                 assert(0);
1129                 break;
1130             }
1131             ++i;
1132         }
1133         wsrep_->stats_free(wsrep_, stats);
1134     }
1135     return ret;
1136 }
1137 
reset_status()1138 void wsrep::wsrep_provider_v26::reset_status()
1139 {
1140     wsrep_->stats_reset(wsrep_);
1141 }
1142 
options() const1143 std::string wsrep::wsrep_provider_v26::options() const
1144 {
1145     std::string ret;
1146     char* opts;
1147     if ((opts = wsrep_->options_get(wsrep_)))
1148     {
1149         ret = opts;
1150         free(opts);
1151     }
1152     else
1153     {
1154         throw wsrep::runtime_error("Failed to get provider options");
1155     }
1156     return ret;
1157 }
1158 
1159 enum wsrep::provider::status
options(const std::string & opts)1160 wsrep::wsrep_provider_v26::options(const std::string& opts)
1161 {
1162     return map_return_value(wsrep_->options_set(wsrep_, opts.c_str()));
1163 }
1164 
name() const1165 std::string wsrep::wsrep_provider_v26::name() const
1166 {
1167     return (wsrep_->provider_name ? wsrep_->provider_name : "unknown");
1168 }
1169 
version() const1170 std::string wsrep::wsrep_provider_v26::version() const
1171 {
1172     return (wsrep_->provider_version ? wsrep_->provider_version : "unknown");
1173 }
1174 
vendor() const1175 std::string wsrep::wsrep_provider_v26::vendor() const
1176 {
1177     return (wsrep_->provider_vendor ? wsrep_->provider_vendor : "unknown");
1178 }
1179 
native() const1180 void* wsrep::wsrep_provider_v26::native() const
1181 {
1182     return wsrep_;
1183 }
1184