1 /*
2 Copyright (c) 2015, 2020, Oracle and/or its affiliates.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25 #define MYSQL_ROUTER_LOG_DOMAIN \
26 ::mysql_harness::logging::kMainLogger // must precede #include "logging.h"
27 #include "mysql/harness/loader.h"
28
29 #include <algorithm>
30 #include <array>
31 #include <atomic>
32 #include <cassert>
33 #include <cctype>
34 #include <cstdarg>
35 #include <cstring>
36 #include <exception>
37 #include <map>
38 #include <sstream>
39 #include <stdexcept>
40 #include <string>
41 #include <system_error>
42 #include <thread>
43
44 #ifndef _WIN32
45 #include <dlfcn.h>
46 #include <pthread.h>
47 #include <unistd.h>
48 #endif
49
50 ////////////////////////////////////////
51 // Package include files
52 #include "builtin_plugins.h"
53 #include "common.h" // mysql_harness::rename_thread()
54 #include "designator.h"
55 #include "dim.h"
56 #include "exception.h"
57 #include "harness_assert.h"
58 #include "my_stacktrace.h"
59 #include "mysql/harness/dynamic_loader.h"
60 #include "mysql/harness/filesystem.h"
61 #include "mysql/harness/logging/logging.h"
62 #include "mysql/harness/logging/registry.h"
63 #include "mysql/harness/plugin.h"
64 #include "utilities.h"
65 IMPORT_LOG_FUNCTIONS()
66
67 #include "my_compiler.h"
68
69 using mysql_harness::utility::find_range_first;
70 using mysql_harness::utility::make_range;
71 using mysql_harness::utility::reverse;
72
73 using mysql_harness::Config;
74 using mysql_harness::Path;
75
76 using std::ostringstream;
77
78 #if !defined(_WIN32)
79 #define USE_POSIX_SIGNALS
80 #endif
81
82 /**
83 * @defgroup Loader Plugin loader
84 *
85 * Plugin loader for loading and working with plugins.
86 */
87
88 ////////////////////////////////////////////////////////////////////////////////
89 //
90 // Signal handling
91 //
92 ////////////////////////////////////////////////////////////////////////////////
93
94 std::mutex we_might_shutdown_cond_mutex;
95 std::condition_variable we_might_shutdown_cond;
96
97 // set when the Router receives a signal to shut down or some fatal error
98 // condition occurred
99 static std::atomic<ShutdownReason> g_shutdown_pending{SHUTDOWN_NONE};
100
101 // the thread that is setting the g_shutdown_pending to SHUTDOWN_FATAL_ERROR is
102 // supposed to set this error message so that it bubbles up and ends up on the
103 // console
104 static std::string shutdown_fatal_error_message;
105
106 std::mutex log_reopen_cond_mutex;
107 std::condition_variable log_reopen_cond;
108 mysql_harness::LogReopenThread *g_reopen_thread{nullptr};
109
110 // application defined pointer to function called at log rename completion
111 static log_reopen_callback g_log_reopen_complete_callback_fp =
112 default_log_reopen_complete_cb;
113
114 /**
115 * request application shutdown.
116 *
117 * @throws std::system_error same as std::unique_lock::lock does
118 */
request_application_shutdown(const ShutdownReason reason)119 void request_application_shutdown(const ShutdownReason reason) {
120 {
121 std::unique_lock<std::mutex> lk(we_might_shutdown_cond_mutex);
122 std::unique_lock<std::mutex> lk2(log_reopen_cond_mutex);
123 g_shutdown_pending = reason;
124 }
125
126 we_might_shutdown_cond.notify_one();
127 // let's wake the log_reopen_thread too
128 log_reopen_cond.notify_one();
129 }
130
131 /**
132 * notify a "log_reopen" is requested with optional filename for old logfile.
133 *
134 * @param dst rename old logfile to filename before reopen
135 * @throws std::system_error same as std::unique_lock::lock does
136 */
request_log_reopen(const std::string dst)137 void request_log_reopen(const std::string dst) {
138 if (g_reopen_thread) g_reopen_thread->request_reopen(dst);
139 }
140
141 /**
142 * check reopen completed
143 */
log_reopen_completed()144 bool log_reopen_completed() {
145 if (g_reopen_thread) return g_reopen_thread->is_completed();
146
147 return true;
148 }
149
150 /**
151 * get last log reopen error
152 */
log_reopen_get_error()153 std::string log_reopen_get_error() {
154 if (g_reopen_thread) return g_reopen_thread->get_last_error();
155
156 return std::string("");
157 }
158
159 namespace {
160 #ifdef USE_POSIX_SIGNALS
161 const std::array<int, 6> g_fatal_signals{SIGSEGV, SIGABRT, SIGBUS,
162 SIGILL, SIGFPE, SIGTRAP};
163 #endif
164 } // namespace
165
block_all_nonfatal_signals()166 static void block_all_nonfatal_signals() {
167 #ifdef USE_POSIX_SIGNALS
168 sigset_t ss;
169 sigfillset(&ss);
170 // we can't block those signals globally and rely on our handler thread, as
171 // these are only received by the offending thread itself.
172 // see "man signal" for more details
173 for (const auto &sig : g_fatal_signals) {
174 sigdelset(&ss, sig);
175 }
176 if (0 != pthread_sigmask(SIG_SETMASK, &ss, nullptr)) {
177 throw std::runtime_error("pthread_sigmask() failed: " +
178 std::string(std::strerror(errno)));
179 }
180 #endif
181 }
182
183 #if !defined(__has_feature)
184 #define __has_feature(x) 0
185 #endif
186
187 // GCC defines __SANITIZE_ADDRESS
188 // clang has __has_feature and 'address_sanitizer'
189 #if defined(__SANITIZE_ADDRESS__) || (__has_feature(address_sanitizer))
190 #define HAS_FEATURE_ASAN
191 #endif
192
register_fatal_signal_handler()193 static void register_fatal_signal_handler() {
194 // enable a crash handler on POSIX systems if not built with ASAN
195 #if defined(USE_POSIX_SIGNALS) && !defined(HAS_FEATURE_ASAN)
196 #if defined(HAVE_STACKTRACE)
197 my_init_stacktrace();
198 #endif // HAVE_STACKTRACE
199
200 struct sigaction sa;
201 (void)sigemptyset(&sa.sa_mask);
202 sa.sa_flags = SA_RESETHAND;
203 sa.sa_handler = [](int sig) {
204 my_safe_printf_stderr("Application got fatal signal: %d\n", sig);
205 #ifdef HAVE_STACKTRACE
206 my_print_stacktrace(nullptr, 0);
207 #endif // HAVE_STACKTRACE
208 };
209
210 for (const auto &sig : g_fatal_signals) {
211 (void)sigaction(sig, &sa, nullptr);
212 }
213 #endif
214 }
215
216 /**
217 * Set the log reopen completion callback function pointer.
218 *
219 * @param cb Function to call at completion.
220 */
set_log_reopen_complete_callback(log_reopen_callback cb)221 void set_log_reopen_complete_callback(log_reopen_callback cb) {
222 g_log_reopen_complete_callback_fp = cb;
223 }
224
225 /**
226 * The default implementation for log reopen thread completion callback
227 * function.
228 *
229 * @param errmsg Error message. Empty string assumes successful completion.
230 */
default_log_reopen_complete_cb(const std::string errmsg)231 void default_log_reopen_complete_cb(const std::string errmsg) {
232 if (!errmsg.empty()) {
233 shutdown_fatal_error_message = errmsg;
234 request_application_shutdown(SHUTDOWN_FATAL_ERROR);
235 }
236 }
237
238 #ifdef _WIN32
ctrl_c_handler(DWORD ctrl_type)239 static BOOL WINAPI ctrl_c_handler(DWORD ctrl_type) {
240 if (ctrl_type == CTRL_C_EVENT || ctrl_type == CTRL_BREAK_EVENT) {
241 // user presed Ctrl+C or we got Ctrl+Break request
242 request_application_shutdown();
243 return TRUE; // don't pass this event to further handlers
244 } else {
245 // some other event
246 return FALSE; // let the default Windows handler deal with it
247 }
248 }
249
register_ctrl_c_handler()250 void register_ctrl_c_handler() {
251 if (!SetConsoleCtrlHandler(ctrl_c_handler, TRUE)) {
252 std::cerr << "Could not install Ctrl+C handler, exiting.\n";
253 exit(1);
254 }
255 }
256 #endif
257
258 namespace mysql_harness {
259
260 ////////////////////////////////////////////////////////////////////////////////
261 //
262 // PluginFuncEnv
263 //
264 ////////////////////////////////////////////////////////////////////////////////
265
PluginFuncEnv(const AppInfo * info,const ConfigSection * section,bool running)266 PluginFuncEnv::PluginFuncEnv(const AppInfo *info, const ConfigSection *section,
267 bool running /*= false*/)
268 : app_info_(info), config_section_(section), running_(running) {}
269
270 //----[ further config getters ]----------------------------------------------
271
get_config_section() const272 const ConfigSection *PluginFuncEnv::get_config_section() const noexcept {
273 std::lock_guard<std::mutex> lock(mutex_);
274 assert(config_section_);
275 return config_section_;
276 }
277
get_app_info() const278 const AppInfo *PluginFuncEnv::get_app_info() const noexcept {
279 std::lock_guard<std::mutex> lock(mutex_);
280 assert(app_info_);
281 return app_info_;
282 }
283
284 //----[ running flag ]--------------------------------------------------------
285
set_running()286 void PluginFuncEnv::set_running() noexcept {
287 std::lock_guard<std::mutex> lock(mutex_);
288 running_ = true;
289 }
290
clear_running()291 void PluginFuncEnv::clear_running() noexcept {
292 std::unique_lock<std::mutex> lock(mutex_);
293 running_ = false;
294 lock.unlock();
295 cond_.notify_all(); // for wait_for_stop()
296 }
297
is_running() const298 bool PluginFuncEnv::is_running() const noexcept {
299 std::lock_guard<std::mutex> lock(mutex_);
300 return running_;
301 }
302
wait_for_stop(uint32_t milliseconds) const303 bool PluginFuncEnv::wait_for_stop(uint32_t milliseconds) const noexcept {
304 auto pred = [this]() noexcept -> bool { return !running_; };
305
306 std::unique_lock<std::mutex> lock(mutex_);
307 if (milliseconds) // 0 = wait forever
308 cond_.wait_for(lock, std::chrono::milliseconds(milliseconds), pred);
309 else
310 cond_.wait(lock, pred);
311 return pred();
312 }
313
314 //----[ error handling ]------------------------------------------------------
315
exit_ok() const316 bool PluginFuncEnv::exit_ok() const noexcept {
317 std::lock_guard<std::mutex> lock(mutex_);
318 return error_type_ == kNoError;
319 }
320
set_error(ErrorType error_type,const char * fmt,va_list ap)321 void PluginFuncEnv::set_error(ErrorType error_type, const char *fmt,
322 va_list ap) noexcept {
323 std::lock_guard<std::mutex> lock(mutex_);
324
325 assert(error_message_.empty()); // \_ previous message wasn't consumed
326 assert(error_type_ == kNoError); // /
327 assert(error_type != kNoError); // what would be the purpose of that?
328
329 error_type_ = error_type;
330 if (fmt) {
331 char buf[1024] = {0};
332 vsnprintf(buf, sizeof(buf), fmt, ap);
333 error_message_ = buf;
334 } else {
335 error_message_ = "<empty message>";
336 }
337 }
338
339 ////////////////////////////////////////////////////////////////////////////////
340 //
341 // Harness API
342 //
343 ////////////////////////////////////////////////////////////////////////////////
344
345 //----[ further config getters ]----------------------------------------------
346
get_app_info(const PluginFuncEnv * env)347 const AppInfo *get_app_info(const PluginFuncEnv *env) noexcept {
348 return env->get_app_info();
349 }
350
get_config_section(const PluginFuncEnv * env)351 const ConfigSection *get_config_section(const PluginFuncEnv *env) noexcept {
352 return env->get_config_section();
353 }
354
355 //----[ running flag ]--------------------------------------------------------
356
is_running(const PluginFuncEnv * env)357 bool is_running(const PluginFuncEnv *env) noexcept { return env->is_running(); }
358
wait_for_stop(const PluginFuncEnv * env,uint32_t milliseconds)359 bool wait_for_stop(const PluginFuncEnv *env, uint32_t milliseconds) noexcept {
360 return env->wait_for_stop(milliseconds);
361 }
362
clear_running(PluginFuncEnv * env)363 void clear_running(PluginFuncEnv *env) noexcept { return env->clear_running(); }
364
365 //----[ error handling ]------------------------------------------------------
366 MY_ATTRIBUTE((format(printf, 3, 4)))
set_error(PluginFuncEnv * env,ErrorType error_type,const char * fmt,...)367 void set_error(PluginFuncEnv *env, ErrorType error_type, const char *fmt,
368 ...) noexcept {
369 va_list args;
370 va_start(args, fmt);
371 env->set_error(error_type, fmt, args);
372 va_end(args);
373 }
374
375 std::tuple<std::string, std::exception_ptr>
pop_error()376 PluginFuncEnv::pop_error() noexcept {
377 std::lock_guard<std::mutex> lock(mutex_);
378
379 // At the time of writing, the exception type was used in Router's main.cc
380 // to discriminate between error types, to give the user a hint of what
381 // caused the problem (configuration error, runtime error, etc).
382 std::tuple<std::string, std::exception_ptr> ret;
383 switch (error_type_) {
384 case kRuntimeError:
385 ret = std::make_tuple(
386 error_message_,
387 std::make_exception_ptr(std::runtime_error(error_message_)));
388 break;
389
390 case kConfigInvalidArgument:
391 ret = std::make_tuple(
392 error_message_,
393 std::make_exception_ptr(std::invalid_argument(error_message_)));
394 break;
395
396 case kConfigSyntaxError:
397 ret = std::make_tuple(
398 error_message_,
399 std::make_exception_ptr(mysql_harness::syntax_error(error_message_)));
400 break;
401
402 case kUndefinedError:
403 ret = std::make_tuple(
404 error_message_,
405 std::make_exception_ptr(std::runtime_error(error_message_)));
406 break;
407
408 case kNoError:
409 assert(0); // this function shouldn't be called in such case
410
411 // defensive programming:
412 // on production systems, default to runtime_error and go on
413 ret = std::make_tuple(
414 error_message_,
415 std::make_exception_ptr(std::runtime_error(error_message_)));
416 break;
417 }
418
419 error_type_ = kNoError;
420 error_message_.clear();
421
422 return ret;
423 }
424
425 // PluginThreads
426
427 /**
428 * join all threads.
429 *
430 * @throws std::system_error from std::thread::join()
431 */
join()432 void PluginThreads::join() {
433 // wait for all plugin-threads to join
434 for (auto &thr : threads_) {
435 if (thr.joinable()) thr.join();
436 }
437 }
438
push_back(std::thread && thr)439 void PluginThreads::push_back(std::thread &&thr) {
440 // if push-back throws it won't inc' 'running_' which is good.
441 threads_.push_back(std::move(thr));
442 ++running_;
443 }
444
try_stopped(std::exception_ptr & first_exc)445 void PluginThreads::try_stopped(std::exception_ptr &first_exc) {
446 std::exception_ptr exc;
447 while (running_ > 0 && plugin_stopped_events_.try_pop(exc)) {
448 --running_;
449
450 if (exc) {
451 first_exc = exc;
452 return;
453 }
454 }
455 }
456
wait_all_stopped(std::exception_ptr & first_exc)457 void PluginThreads::wait_all_stopped(std::exception_ptr &first_exc) {
458 // wait until all plugins signaled their return value
459 for (; running_ > 0; --running_) {
460 auto exc = plugin_stopped_events_.pop();
461 if (!first_exc) first_exc = exc;
462 }
463 }
464
465 ////////////////////////////////////////////////////////////////////////////////
466 //
467 // Loader
468 //
469 ////////////////////////////////////////////////////////////////////////////////
470
~Loader()471 Loader::~Loader() {
472 if (signal_thread_.joinable()) {
473 #ifdef USE_POSIX_SIGNALS
474 // as the signal thread is blocked on sigwait(), interrupt it with a SIGTERM
475 pthread_kill(signal_thread_.native_handle(), SIGTERM);
476 #endif
477 signal_thread_.join();
478 }
479 }
480
spawn_signal_handler_thread()481 void Loader::spawn_signal_handler_thread() {
482 #ifdef USE_POSIX_SIGNALS
483 std::promise<void> signal_handler_thread_setup_done;
484
485 signal_thread_ = std::thread([&signal_handler_thread_setup_done] {
486 mysql_harness::rename_thread("sig handler");
487
488 sigset_t ss;
489 sigemptyset(&ss);
490 sigaddset(&ss, SIGINT);
491 sigaddset(&ss, SIGTERM);
492 sigaddset(&ss, SIGHUP);
493
494 signal_handler_thread_setup_done.set_value();
495 int sig = 0;
496
497 while (true) {
498 if (0 == sigwait(&ss, &sig)) {
499 if (sig == SIGHUP) {
500 request_log_reopen();
501 } else {
502 harness_assert(sig == SIGINT || sig == SIGTERM);
503 request_application_shutdown();
504 return;
505 }
506 } else {
507 // man sigwait() says, it should only fail if we provided invalid
508 // signals.
509 harness_assert_this_should_not_execute();
510 }
511 }
512 });
513
514 // wait until the signal handler is setup
515 signal_handler_thread_setup_done.get_future().wait();
516 #endif
517 }
518
PluginInfo(const std::string & folder,const std::string & libname)519 Loader::PluginInfo::PluginInfo(const std::string &folder,
520 const std::string &libname) {
521 DynamicLoader dyn_loader(folder);
522
523 auto res = dyn_loader.load(libname);
524 if (!res) {
525 /* dlerror() from glibc returns:
526 *
527 * ```
528 * {filename}: cannot open shared object file: No such file or directory
529 * {filename}: cannot open shared object file: Permission denied
530 * {filename}: file too short
531 * {filename}: invalid ELF header
532 * ```
533 *
534 * msvcrt returns:
535 *
536 * ```
537 * Module not found.
538 * Access denied.
539 * Bad EXE format for %1
540 * ```
541 */
542 throw bad_plugin(
543 #ifdef _WIN32
544 // prepend filename on windows too, as it is done by glibc too
545 folder + "/" + libname + ".dll: " +
546 #endif
547 (res.error() == make_error_code(DynamicLoaderErrc::kDlError)
548 ? dyn_loader.error_msg()
549 : res.error().message()));
550 }
551
552 module_ = std::move(res.value());
553 }
554
load_plugin_descriptor(const std::string & name)555 void Loader::PluginInfo::load_plugin_descriptor(const std::string &name) {
556 const std::string symbol = "harness_plugin_" + name;
557
558 const auto res = module_.symbol(symbol);
559 if (!res) {
560 /* dlerror() from glibc returns:
561 *
562 * ```
563 * {filename}: undefined symbol: {symbol}
564 * ```
565 *
566 * msvcrt returns:
567 *
568 * ```
569 * Procedure not found.
570 * ```
571 */
572 throw bad_plugin(
573 #ifdef _WIN32
574 module_.filename() + ": " +
575 #endif
576 (res.error() == make_error_code(DynamicLoaderErrc::kDlError)
577 ? module_.error_msg()
578 : res.error().message())
579 #ifdef _WIN32
580 + ": " + symbol
581 #endif
582 );
583 }
584
585 plugin_ = reinterpret_cast<const Plugin *>(res.value());
586 }
587
load_from(const std::string & plugin_name,const std::string & library_name)588 const Plugin *Loader::load_from(const std::string &plugin_name,
589 const std::string &library_name) {
590 std::string error;
591 setup_info();
592
593 // We always load the library (even if it is already loaded) to
594 // honor potential dynamic library open/close reference counts. It
595 // is up to the platform implementation to ensure that multiple
596 // instances of a library can be handled.
597
598 PluginInfo info(plugin_folder_, library_name); // throws bad_plugin
599
600 info.load_plugin_descriptor(plugin_name); // throws bad_plugin
601
602 // Check that ABI version and architecture match
603 auto plugin = info.plugin();
604 if ((plugin->abi_version & 0xFF00) != (PLUGIN_ABI_VERSION & 0xFF00) ||
605 (plugin->abi_version & 0xFF) > (PLUGIN_ABI_VERSION & 0xFF)) {
606 ostringstream buffer;
607 buffer.setf(std::ios::hex, std::ios::basefield);
608 buffer.setf(std::ios::showbase);
609 buffer << "Bad ABI version - plugin version: " << plugin->abi_version
610 << ", loader version: " << PLUGIN_ABI_VERSION;
611 throw bad_plugin(buffer.str());
612 }
613
614 // Recursively load dependent modules, we skip NULL entries since
615 // the user might have added these by accident (for example, he
616 // assumed that the array was NULL-terminated) and they can safely
617 // be ignored instead of raising an error.
618 for (auto req : make_range(plugin->requires, plugin->requires_length)) {
619 if (req != nullptr) {
620 // Parse the designator to extract the plugin and constraints.
621 Designator designator(req);
622
623 // Load the plugin using the plugin name.
624 const Plugin *dep_plugin{nullptr};
625
626 try {
627 dep_plugin =
628 load(designator.plugin); // throws bad_plugin and bad_section
629 } catch (const bad_section &) {
630 log_error(
631 "Plugin '%s' needs plugin '%s' which is missing in the "
632 "configuration",
633 plugin_name.c_str(), designator.plugin.c_str());
634 throw;
635 }
636
637 // Check that the version of the plugin match what the
638 // designator expected and raise an exception if they don't
639 // match.
640 if (!designator.version_good(Version(dep_plugin->plugin_version))) {
641 Version version(dep_plugin->plugin_version);
642 std::ostringstream buffer;
643 buffer << designator.plugin << ": plugin version was " << version
644 << ", expected " << designator.constraint;
645 throw bad_plugin(buffer.str());
646 }
647 }
648 }
649
650 // If all went well, we register the plugin and return a
651 // pointer to it.
652 plugins_.emplace(plugin_name, std::move(info));
653 log_debug(" plugin '%s' loaded ok", plugin_name.c_str());
654 return plugin;
655 }
656
load(const std::string & plugin_name,const std::string & key)657 const Plugin *Loader::load(const std::string &plugin_name,
658 const std::string &key) {
659 log_debug(" plugin '%s:%s' loading", plugin_name.c_str(), key.c_str());
660
661 if (BuiltinPlugins::instance().has(plugin_name)) {
662 Plugin *plugin = BuiltinPlugins::instance().get_plugin(plugin_name);
663 // if plugin isn't registered yet, add it
664 if (plugins_.find(plugin_name) == plugins_.end()) {
665 plugins_.emplace(plugin_name, plugin);
666 }
667 return plugin;
668 } else {
669 ConfigSection &plugin =
670 config_.get(plugin_name, key); // throws bad_section
671 const std::string &library_name = plugin.get("library");
672 return load_from(plugin_name,
673 library_name); // throws bad_plugin and bad_section
674 }
675 }
676
load(const std::string & plugin_name)677 const Plugin *Loader::load(const std::string &plugin_name) {
678 log_debug(" plugin '%s' loading", plugin_name.c_str());
679
680 if (BuiltinPlugins::instance().has(plugin_name)) {
681 Plugin *plugin = BuiltinPlugins::instance().get_plugin(plugin_name);
682 if (plugins_.find(plugin_name) == plugins_.end()) {
683 plugins_.emplace(plugin_name, plugin);
684
685 // add config-section for builtin plugins, in case it isn't there yet
686 // as the the "start()" function otherwise isn't called by load_all()
687 if (!config_.has_any(plugin_name)) {
688 config_.add(plugin_name);
689 }
690 }
691 return plugin;
692 }
693
694 if (!config_.has_any(plugin_name)) {
695 // if no section for the plugin exists, try to load it anyway with an empty
696 // key-less section
697 //
698 // in case the plugin fails to load with bad_plugin, return bad_section to
699 // be consistent with existing behaviour
700 config_.add(plugin_name).add("library", plugin_name);
701
702 try {
703 return load_from(plugin_name, plugin_name); // throws bad_plugin
704 } catch (const bad_plugin &e) {
705 std::ostringstream buffer;
706 buffer << "Section name '" << plugin_name << "' does not exist";
707 throw bad_section(buffer.str());
708 }
709 }
710
711 Config::SectionList plugins = config_.get(plugin_name); // throws bad_section
712 if (plugins.size() > 1) {
713 std::ostringstream buffer;
714 buffer << "Section name '" << plugin_name
715 << "' is ambiguous. Alternatives are:";
716 for (const ConfigSection *plugin : plugins) buffer << " " << plugin->key;
717 throw bad_section(buffer.str());
718 } else if (plugins.size() == 0) {
719 std::ostringstream buffer;
720 buffer << "Section name '" << plugin_name << "' does not exist";
721 throw bad_section(buffer.str());
722 }
723
724 assert(plugins.size() == 1);
725 const ConfigSection *section = plugins.front();
726 const std::string &library_name = section->get("library");
727 return load_from(plugin_name, library_name); // throws bad_plugin
728 }
729
start()730 void Loader::start() {
731 // unload plugins on exit
732 std::shared_ptr<void> exit_guard(nullptr, [this](void *) { unload_all(); });
733
734 // check if there is anything to load; if not we currently treat is as an
735 // error, not letting the user to run "idle" router that would close right
736 // away
737 if (external_plugins_to_load_count() == 0) {
738 throw std::runtime_error(
739 "Error: MySQL Router not configured to load or start any plugin. "
740 "Exiting.");
741 }
742
743 // load plugins
744 load_all(); // throws bad_plugin on load error, causing an early return
745
746 // init and run plugins
747 std::exception_ptr first_eptr = run();
748 if (first_eptr) {
749 std::rethrow_exception(first_eptr);
750 }
751 }
752
external_plugins_to_load_count()753 size_t Loader::external_plugins_to_load_count() {
754 size_t result = 0;
755 for (std::pair<const std::string &, std::string> name : available()) {
756 if (!BuiltinPlugins::instance().has(name.first)) {
757 result++;
758 }
759 }
760
761 return result;
762 }
763
load_all()764 void Loader::load_all() {
765 log_debug("Loading all plugins.");
766
767 std::string section_name;
768 std::string section_key;
769
770 for (auto const §ion : available()) {
771 try {
772 std::tie(section_name, section_key) = section;
773 load(section_name, section_key);
774 } catch (const bad_plugin &e) {
775 throw bad_plugin(utility::string_format(
776 "Loading plugin for config-section '[%s%s%s]' failed: %s",
777 section_name.c_str(), !section_key.empty() ? ":" : "",
778 section_key.c_str(), e.what()));
779 }
780 }
781 }
782
unload_all()783 void Loader::unload_all() {
784 // this stage has no implementation so far; however, we want to flag that we
785 // reached this stage
786 log_debug("Unloading all plugins.");
787 // If that ever gets implemented make sure to not attempt unloading
788 // built-in plugins
789 }
790
791 /**
792 * If a isn't set, return b.
793 *
794 * like ?:, but ensures that b is _always_ evaluated first.
795 */
796 template <class T>
value_or(T a,T b)797 T value_or(T a, T b) {
798 return a ? a : b;
799 }
800
run()801 std::exception_ptr Loader::run() {
802 // initialize plugins
803 std::exception_ptr first_eptr = init_all();
804
805 // run plugins if initialization didn't fail
806 if (!first_eptr) {
807 try {
808 std::shared_ptr<void> exit_guard(
809 nullptr, [](void *) { g_reopen_thread = nullptr; });
810
811 start_all(); // if start() throws, exception is forwarded to
812 // main_loop()
813
814 // may throw std::system_error
815 LogReopenThread log_reopen_thread;
816 g_reopen_thread = &log_reopen_thread;
817
818 first_eptr = main_loop();
819 } catch (const std::exception &e) {
820 log_error("failed running start/main: %s", e.what());
821 first_eptr = stop_and_wait_all();
822 }
823 }
824
825 // not strict requiremnt, just good measure (they're no longer needed at
826 // this point)
827 assert(plugin_start_env_.empty());
828
829 // deinitialize plugins
830 first_eptr = value_or(first_eptr, deinit_all());
831
832 // return the first exception that was triggered by an error returned from
833 // any plugin function
834 return first_eptr;
835 }
836
available() const837 std::list<Config::SectionKey> Loader::available() const {
838 return config_.section_names();
839 }
840
setup_info()841 void Loader::setup_info() {
842 logging_folder_ = config_.get_default("logging_folder");
843 plugin_folder_ = config_.get_default("plugin_folder");
844 runtime_folder_ = config_.get_default("runtime_folder");
845 config_folder_ = config_.get_default("config_folder");
846 data_folder_ = config_.get_default("data_folder");
847
848 appinfo_.logging_folder = logging_folder_.c_str();
849 appinfo_.plugin_folder = plugin_folder_.c_str();
850 appinfo_.runtime_folder = runtime_folder_.c_str();
851 appinfo_.config_folder = config_folder_.c_str();
852 appinfo_.data_folder = data_folder_.c_str();
853 appinfo_.config = &config_;
854 appinfo_.program = program_.c_str();
855 }
856
call_plugin_function(PluginFuncEnv * env,std::exception_ptr & eptr,void (* fptr)(PluginFuncEnv *),const char * fnc_name,const char * plugin_name,const char * plugin_key=nullptr)857 static void call_plugin_function(PluginFuncEnv *env, std::exception_ptr &eptr,
858 void (*fptr)(PluginFuncEnv *),
859 const char *fnc_name, const char *plugin_name,
860 const char *plugin_key = nullptr) noexcept {
861 auto handle_plugin_exception = [](std::exception_ptr &first_eptr,
862 const std::string &func_name,
863 const char *plug_name, const char *plug_key,
864 const std::exception *e) noexcept -> void {
865 // Plugins are not allowed to throw, so let's alert the devs. But in
866 // production, we want to be robust and try to handle this gracefully
867 assert(0);
868
869 if (!first_eptr) first_eptr = std::current_exception();
870
871 std::string what = e ? (std::string(": ") + e->what()) : ".";
872 if (plug_key)
873 log_error(
874 " plugin '%s:%s' %s threw unexpected exception "
875 "- please contact plugin developers for more information%s",
876 plug_name, plug_key, func_name.c_str(), what.c_str());
877 else
878 log_error(
879 " plugin '%s' %s threw unexpected exception "
880 "- please contact plugin developers for more information%s",
881 plug_name, func_name.c_str(), what.c_str());
882 };
883
884 // This try/catch block is about defensive programming - plugins are not
885 // allowed to throw. But if the exception is caught anyway, we have to
886 // handle it somehow. In debug builds, we throw an assertion. In release
887 // builds, we whine about it in logs, but otherwise handle it like a
888 // normal error. This behavior is officially undefined, thus we are free
889 // to change this at our discretion.
890 try {
891 // call the plugin
892 fptr(env);
893
894 // error handling
895 if (env->exit_ok()) {
896 if (plugin_key)
897 log_debug(" plugin '%s:%s' %s exit ok", plugin_name, plugin_key,
898 fnc_name);
899 else
900 log_debug(" plugin '%s' %s exit ok", plugin_name, fnc_name);
901 } else {
902 std::string message;
903 if (!eptr) {
904 std::tie(message, eptr) = env->pop_error();
905 } else {
906 std::tie(message, std::ignore) = env->pop_error();
907 }
908 if (plugin_key)
909 log_error(" plugin '%s:%s' %s failed: %s", plugin_name, plugin_key,
910 fnc_name, message.c_str());
911 else
912 log_error(" plugin '%s' %s failed: %s", plugin_name, fnc_name,
913 message.c_str());
914 }
915
916 } catch (const std::exception &e) {
917 handle_plugin_exception(eptr, fnc_name, plugin_name, plugin_key, &e);
918 } catch (...) {
919 handle_plugin_exception(eptr, fnc_name, plugin_name, plugin_key, nullptr);
920 }
921 }
922
923 // returns first exception triggered by init()
init_all()924 std::exception_ptr Loader::init_all() {
925 // block non-fatal signal handling for all threads
926 //
927 // - no other thread than the signal-handler thread should receive signals
928 // - syscalls should not get interrupted by signals either
929 //
930 // on windows, this is a no-op
931 block_all_nonfatal_signals();
932
933 // for the fatal signals we want to have a handler that prints the stack-trace
934 // if possible
935 register_fatal_signal_handler();
936
937 log_debug("Initializing all plugins.");
938
939 if (!topsort()) throw std::logic_error("Circular dependencies in plugins");
940 order_.reverse(); // we need reverse-topo order for non-built-in plugins
941
942 for (auto it = order_.begin(); it != order_.end(); ++it) {
943 const std::string &plugin_name = *it;
944 PluginInfo &info = plugins_.at(plugin_name);
945
946 if (!info.plugin()->init) {
947 log_debug(" plugin '%s' doesn't implement init()", plugin_name.c_str());
948 continue;
949 }
950
951 log_debug(" plugin '%s' initializing", plugin_name.c_str());
952 PluginFuncEnv env(&appinfo_, nullptr);
953
954 std::exception_ptr eptr;
955 call_plugin_function(&env, eptr, info.plugin()->init, "init",
956 plugin_name.c_str());
957 if (eptr) {
958 // erase this and all remaining plugins from the list, so that
959 // deinit_all() will not try to run deinit() on them
960 order_.erase(it, order_.end());
961 return eptr;
962 }
963
964 } // for (auto it = order_.begin(); it != order_.end(); ++it)
965
966 return nullptr;
967 }
968
969 // forwards first exception triggered by start() to main_loop()
start_all()970 void Loader::start_all() {
971 log_debug("Starting all plugins.");
972
973 try {
974 // start all the plugins (call plugin's start() function)
975 for (const ConfigSection *section : config_.sections()) {
976 PluginInfo &plugin = plugins_.at(section->name);
977 void (*fptr)(PluginFuncEnv *) = plugin.plugin()->start;
978
979 if (!fptr) {
980 log_debug(" plugin '%s:%s' doesn't implement start()",
981 section->name.c_str(), section->key.c_str());
982
983 // create a env object for later
984 assert(plugin_start_env_.count(section) == 0);
985 plugin_start_env_[section] =
986 std::make_shared<PluginFuncEnv>(nullptr, section, false);
987
988 continue;
989 }
990
991 // future will remain valid even after promise is destructed
992 std::promise<std::shared_ptr<PluginFuncEnv>> env_promise;
993
994 // plugin start() will run in this new thread
995 std::thread plugin_thread([fptr, section, &env_promise, this]() {
996 log_debug(" plugin '%s:%s' starting", section->name.c_str(),
997 section->key.c_str());
998
999 // init env object and unblock harness thread
1000 std::shared_ptr<PluginFuncEnv> this_thread_env =
1001 std::make_shared<PluginFuncEnv>(nullptr, section, true);
1002 env_promise.set_value(this_thread_env); // shared_ptr gets copied here
1003 // (future will own a copy)
1004
1005 std::exception_ptr eptr;
1006 call_plugin_function(this_thread_env.get(), eptr, fptr, "start",
1007 section->name.c_str(), section->key.c_str());
1008
1009 {
1010 std::lock_guard<std::mutex> lock(we_might_shutdown_cond_mutex);
1011 plugin_threads_.push_exit_status(std::move(eptr));
1012 }
1013 we_might_shutdown_cond.notify_one();
1014 });
1015
1016 // we could combine the thread creation with emplace_back
1017 // but that sometimes leads to a crash on ASAN build (when the thread
1018 // limit is reached apparently sometimes half-baked thread object gets
1019 // added to the vector and its destructor crashes later on when the vector
1020 // gets destroyed)
1021 plugin_threads_.push_back(std::move(plugin_thread));
1022
1023 // block until starter thread is started
1024 // then save the env object for later
1025 assert(plugin_start_env_.count(section) == 0);
1026 plugin_start_env_[section] =
1027 env_promise.get_future()
1028 .get(); // returns shared_ptr to PluginFuncEnv;
1029 // PluginFuncEnv exists on heap
1030
1031 } // for (const ConfigSection* section: config_.sections())
1032 } catch (const std::system_error &e) {
1033 throw std::system_error(e.code(), "starting plugin-threads failed");
1034 }
1035
1036 try {
1037 // We wait with this until after we launch all plugin threads, to avoid
1038 // a potential race if a signal was received while plugins were still
1039 // launching.
1040 spawn_signal_handler_thread();
1041 } catch (const std::system_error &e) {
1042 // should we unblock the signals again?
1043 throw std::system_error(e.code(), "starting signal-handler-thread failed");
1044 }
1045 }
1046
1047 /**
1048 * wait for shutdown signal or plugins exit.
1049 *
1050 * blocks until one of the following happens:
1051 *
1052 * - shutdown signal is received
1053 * - one plugin return an exception
1054 * - all plugins finished
1055 *
1056 * @returns first exception returned by any of the plugins start() or stop()
1057 * functions
1058 * @retval nullptr if no exception was returned
1059 */
main_loop()1060 std::exception_ptr Loader::main_loop() {
1061 // RouterRoutingTest::RoutingPluginCantSpawnMoreThreads is waiting for this
1062 // log-message to appear in the log to get a predictible test-scenario.
1063 //
1064 // Changing or moving this message, will break that test.
1065 log_debug("Running.");
1066
1067 std::exception_ptr first_eptr;
1068 // wait for a reason to shutdown
1069 {
1070 std::unique_lock<std::mutex> lk(we_might_shutdown_cond_mutex);
1071
1072 we_might_shutdown_cond.wait(lk, [&first_eptr, this] {
1073 // external shutdown
1074 if (g_shutdown_pending == SHUTDOWN_REQUESTED) return true;
1075
1076 // shutdown due to a fatal error originating from Loader and its callees
1077 // (but NOT from plugins)
1078 if (g_shutdown_pending == SHUTDOWN_FATAL_ERROR) {
1079 // there is a request to shut down due to a fatal error; generate an
1080 // exception with requested message so that it bubbles up and ends up on
1081 // the console as an error message
1082 try {
1083 throw std::runtime_error(shutdown_fatal_error_message);
1084 } catch (const std::exception &) {
1085 first_eptr = std::current_exception(); // capture
1086 }
1087 return true;
1088 }
1089
1090 plugin_threads_.try_stopped(first_eptr);
1091 if (first_eptr) return true;
1092
1093 // all plugins stop successfully
1094 if (plugin_threads_.running() == 0) return true;
1095
1096 return false;
1097 });
1098 }
1099
1100 return value_or(first_eptr, stop_and_wait_all());
1101 }
1102
stop_and_wait_all()1103 std::exception_ptr Loader::stop_and_wait_all() {
1104 std::exception_ptr first_eptr;
1105
1106 // stop all plugins
1107 first_eptr = value_or(first_eptr, stop_all());
1108
1109 plugin_threads_.wait_all_stopped(first_eptr);
1110 try {
1111 plugin_threads_.join();
1112 } catch (...) {
1113 // may throw due to deadlocks and other system-related reasons.
1114 if (!first_eptr) {
1115 first_eptr = std::current_exception();
1116 }
1117 }
1118
1119 // we will no longer need the env objects for start(), might as well
1120 // clean them up now for good measure
1121 plugin_start_env_.clear();
1122
1123 // We just return the first exception that was raised (if any). If there
1124 // are other exceptions, they are ignored.
1125 return first_eptr;
1126 }
1127
1128 // returns first exception triggered by stop()
stop_all()1129 std::exception_ptr Loader::stop_all() {
1130 // This function runs exactly once - it will be called even if all plugins
1131 // exit by themselves (thus there's nothing to stop).
1132 log_debug("Shutting down. Stopping all plugins.");
1133
1134 // iterate over all plugin instances
1135 std::exception_ptr first_eptr;
1136 for (const ConfigSection *section : config_.sections()) {
1137 PluginInfo &plugin = plugins_.at(section->name);
1138 void (*fptr)(PluginFuncEnv *) = plugin.plugin()->stop;
1139
1140 assert(plugin_start_env_.count(section));
1141 assert(plugin_start_env_[section]->get_config_section() == section);
1142
1143 // flag plugin::start() to exit (if one exists and it's running)
1144 plugin_start_env_[section]->clear_running();
1145
1146 if (!fptr) {
1147 log_debug(" plugin '%s:%s' doesn't implement stop()",
1148 section->name.c_str(), section->key.c_str());
1149 continue;
1150 }
1151
1152 log_debug(" plugin '%s:%s' stopping", section->name.c_str(),
1153 section->key.c_str());
1154
1155 PluginFuncEnv stop_env(nullptr, section);
1156 call_plugin_function(&stop_env, first_eptr, fptr, "stop",
1157 section->name.c_str(), section->key.c_str());
1158
1159 } // for (const ConfigSection* section: config_.sections())
1160
1161 return first_eptr;
1162 }
1163
1164 // returns first exception triggered by deinit()
deinit_all()1165 std::exception_ptr Loader::deinit_all() {
1166 log_debug("Deinitializing all plugins.");
1167
1168 // we could just reverse order_ and that would work too,
1169 // but by leaving it intact it's easier to unit-test it
1170 std::list<std::string> deinit_order = order_;
1171 deinit_order.reverse();
1172
1173 // call deinit() on all plugins that support the call
1174 std::exception_ptr first_eptr;
1175 for (const std::string &plugin_name : deinit_order) {
1176 const PluginInfo &info = plugins_.at(plugin_name);
1177
1178 if (!info.plugin()->deinit) {
1179 log_debug(" plugin '%s' doesn't implement deinit()",
1180 plugin_name.c_str());
1181 continue;
1182 }
1183
1184 log_debug(" plugin '%s' deinitializing", plugin_name.c_str());
1185 PluginFuncEnv env(&appinfo_, nullptr);
1186
1187 call_plugin_function(&env, first_eptr, info.plugin()->deinit, "deinit",
1188 plugin_name.c_str());
1189 }
1190
1191 return first_eptr;
1192 }
1193
topsort()1194 bool Loader::topsort() {
1195 std::map<std::string, Loader::Status> status;
1196 std::list<std::string> order;
1197
1198 for (std::pair<const std::string, PluginInfo> &plugin : plugins_) {
1199 bool succeeded = visit(plugin.first, &status, &order);
1200 if (!succeeded) return false;
1201 }
1202
1203 order_.swap(order);
1204 return true;
1205 }
1206
visit(const std::string & designator,std::map<std::string,Loader::Status> * status,std::list<std::string> * order)1207 bool Loader::visit(const std::string &designator,
1208 std::map<std::string, Loader::Status> *status,
1209 std::list<std::string> *order) {
1210 Designator info(designator);
1211 switch ((*status)[info.plugin]) {
1212 case Status::VISITED:
1213 return true;
1214
1215 case Status::ONGOING:
1216 // If we see a node we are processing, it's not a DAG and cannot
1217 // be topologically sorted.
1218 return false;
1219
1220 case Status::UNVISITED: {
1221 (*status)[info.plugin] = Status::ONGOING;
1222 if (const Plugin *plugin = plugins_.at(info.plugin).plugin()) {
1223 for (auto required :
1224 make_range(plugin->requires, plugin->requires_length)) {
1225 assert(required != nullptr);
1226 bool succeeded = visit(required, status, order);
1227 if (!succeeded) return false;
1228 }
1229 }
1230 (*status)[info.plugin] = Status::VISITED;
1231 order->push_front(info.plugin);
1232 return true;
1233 }
1234 }
1235 return true;
1236 }
1237
1238 ////////////////////////////////////////////////////////////////////////////////
1239 //
1240 // LogReopenThread
1241 //
1242 ////////////////////////////////////////////////////////////////////////////////
1243
1244 /**
1245 * stop the log_reopen_thread_function.
1246 */
stop()1247 void LogReopenThread::stop() { request_application_shutdown(); }
1248
1249 /**
1250 * join the log_reopen thread.
1251 */
join()1252 void LogReopenThread::join() { reopen_thr_.join(); }
1253
1254 /**
1255 * destruct the thread.
1256 */
~LogReopenThread()1257 LogReopenThread::~LogReopenThread() {
1258 // if it didn't throw in the constructor, it is joinable and we have to
1259 // signal its shutdown
1260 if (reopen_thr_.joinable()) {
1261 try {
1262 // if stop throws ... the join will block
1263 stop();
1264
1265 // if join throws, log it and expect std::thread::~thread to call
1266 // std::terminate
1267 join();
1268 } catch (const std::exception &e) {
1269 try {
1270 log_error("~LogReopenThread failed to join its thread: %s", e.what());
1271 } catch (...) {
1272 // ignore it, we did our best to tell the user why std::terminate will
1273 // be called in a bit
1274 }
1275 }
1276 }
1277 }
1278
1279 /**
1280 * thread function
1281 */
log_reopen_thread_function(LogReopenThread * t)1282 void LogReopenThread::log_reopen_thread_function(LogReopenThread *t) {
1283 auto &logging_registry = mysql_harness::DIM::instance().get_LoggingRegistry();
1284
1285 while (true) {
1286 {
1287 std::unique_lock<std::mutex> lk(log_reopen_cond_mutex);
1288 if (g_shutdown_pending) {
1289 break;
1290 }
1291 log_reopen_cond.wait(lk);
1292 if (g_shutdown_pending) {
1293 break;
1294 }
1295 if (!t->is_requested()) {
1296 continue;
1297 }
1298 t->state_ = REOPEN_ACTIVE;
1299 t->errmsg_ = "";
1300 try {
1301 logging_registry.flush_all_loggers(t->dst_);
1302 t->dst_ = "";
1303 } catch (const std::exception &e) {
1304 // leave actions on error to the defined callback function
1305 t->errmsg_ = e.what();
1306 }
1307 }
1308 // trigger the completion callback once mutex is not locked
1309 g_log_reopen_complete_callback_fp(t->errmsg_);
1310 {
1311 std::unique_lock<std::mutex> lk(log_reopen_cond_mutex);
1312 t->state_ = REOPEN_NONE;
1313 }
1314 }
1315 }
1316
1317 /*
1318 * request reopen
1319 */
request_reopen(const std::string dst)1320 void LogReopenThread::request_reopen(const std::string dst) {
1321 std::unique_lock<std::mutex> lk(log_reopen_cond_mutex, std::defer_lock);
1322
1323 if (!lk.try_lock()) return;
1324
1325 state_ = REOPEN_REQUESTED;
1326 dst_ = dst;
1327
1328 log_reopen_cond.notify_one();
1329 }
1330
1331 } // namespace mysql_harness
1332
1333 // unit test access - DON'T USE IN PRODUCTION CODE!
1334 // (unfortunately we cannot guard this with #ifdef FRIEND_TEST)
1335 namespace unittest_backdoor {
1336 HARNESS_EXPORT
set_shutdown_pending(bool shutdown_pending)1337 void set_shutdown_pending(bool shutdown_pending) {
1338 g_shutdown_pending = shutdown_pending ? SHUTDOWN_REQUESTED : SHUTDOWN_NONE;
1339 }
1340 } // namespace unittest_backdoor
1341