1 /*
2   Copyright (c) 2015, 2020, Oracle and/or its affiliates.
3 
4   This program is free software; you can redistribute it and/or modify
5   it under the terms of the GNU General Public License, version 2.0,
6   as published by the Free Software Foundation.
7 
8   This program is also distributed with certain software (including
9   but not limited to OpenSSL) that is licensed under separate terms,
10   as designated in a particular file or component or in included license
11   documentation.  The authors of MySQL hereby grant you an additional
12   permission to link the program and your derivative works with the
13   separately licensed software that they have included with MySQL.
14 
15   This program is distributed in the hope that it will be useful,
16   but WITHOUT ANY WARRANTY; without even the implied warranty of
17   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18   GNU General Public License for more details.
19 
20   You should have received a copy of the GNU General Public License
21   along with this program; if not, write to the Free Software
22   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
23 */
24 
25 #define MYSQL_ROUTER_LOG_DOMAIN \
26   ::mysql_harness::logging::kMainLogger  // must precede #include "logging.h"
27 #include "mysql/harness/loader.h"
28 
29 #include <algorithm>
30 #include <array>
31 #include <atomic>
32 #include <cassert>
33 #include <cctype>
34 #include <cstdarg>
35 #include <cstring>
36 #include <exception>
37 #include <map>
38 #include <sstream>
39 #include <stdexcept>
40 #include <string>
41 #include <system_error>
42 #include <thread>
43 
44 #ifndef _WIN32
45 #include <dlfcn.h>
46 #include <pthread.h>
47 #include <unistd.h>
48 #endif
49 
50 ////////////////////////////////////////
51 // Package include files
52 #include "builtin_plugins.h"
53 #include "common.h"  // mysql_harness::rename_thread()
54 #include "designator.h"
55 #include "dim.h"
56 #include "exception.h"
57 #include "harness_assert.h"
58 #include "my_stacktrace.h"
59 #include "mysql/harness/dynamic_loader.h"
60 #include "mysql/harness/filesystem.h"
61 #include "mysql/harness/logging/logging.h"
62 #include "mysql/harness/logging/registry.h"
63 #include "mysql/harness/plugin.h"
64 #include "utilities.h"
65 IMPORT_LOG_FUNCTIONS()
66 
67 #include "my_compiler.h"
68 
69 using mysql_harness::utility::find_range_first;
70 using mysql_harness::utility::make_range;
71 using mysql_harness::utility::reverse;
72 
73 using mysql_harness::Config;
74 using mysql_harness::Path;
75 
76 using std::ostringstream;
77 
78 #if !defined(_WIN32)
79 #define USE_POSIX_SIGNALS
80 #endif
81 
82 /**
83  * @defgroup Loader Plugin loader
84  *
85  * Plugin loader for loading and working with plugins.
86  */
87 
88 ////////////////////////////////////////////////////////////////////////////////
89 //
90 // Signal handling
91 //
92 ////////////////////////////////////////////////////////////////////////////////
93 
94 std::mutex we_might_shutdown_cond_mutex;
95 std::condition_variable we_might_shutdown_cond;
96 
97 // set when the Router receives a signal to shut down or some fatal error
98 // condition occurred
99 static std::atomic<ShutdownReason> g_shutdown_pending{SHUTDOWN_NONE};
100 
101 // the thread that is setting the g_shutdown_pending to SHUTDOWN_FATAL_ERROR is
102 // supposed to set this error message so that it bubbles up and ends up on the
103 // console
104 static std::string shutdown_fatal_error_message;
105 
106 std::mutex log_reopen_cond_mutex;
107 std::condition_variable log_reopen_cond;
108 mysql_harness::LogReopenThread *g_reopen_thread{nullptr};
109 
110 // application defined pointer to function called at log rename completion
111 static log_reopen_callback g_log_reopen_complete_callback_fp =
112     default_log_reopen_complete_cb;
113 
114 /**
115  * request application shutdown.
116  *
117  * @throws std::system_error same as std::unique_lock::lock does
118  */
request_application_shutdown(const ShutdownReason reason)119 void request_application_shutdown(const ShutdownReason reason) {
120   {
121     std::unique_lock<std::mutex> lk(we_might_shutdown_cond_mutex);
122     std::unique_lock<std::mutex> lk2(log_reopen_cond_mutex);
123     g_shutdown_pending = reason;
124   }
125 
126   we_might_shutdown_cond.notify_one();
127   // let's wake the log_reopen_thread too
128   log_reopen_cond.notify_one();
129 }
130 
131 /**
132  * notify a "log_reopen" is requested with optional filename for old logfile.
133  *
134  * @param dst rename old logfile to filename before reopen
135  * @throws std::system_error same as std::unique_lock::lock does
136  */
request_log_reopen(const std::string dst)137 void request_log_reopen(const std::string dst) {
138   if (g_reopen_thread) g_reopen_thread->request_reopen(dst);
139 }
140 
141 /**
142  * check reopen completed
143  */
log_reopen_completed()144 bool log_reopen_completed() {
145   if (g_reopen_thread) return g_reopen_thread->is_completed();
146 
147   return true;
148 }
149 
150 /**
151  * get last log reopen error
152  */
log_reopen_get_error()153 std::string log_reopen_get_error() {
154   if (g_reopen_thread) return g_reopen_thread->get_last_error();
155 
156   return std::string("");
157 }
158 
159 namespace {
160 #ifdef USE_POSIX_SIGNALS
161 const std::array<int, 6> g_fatal_signals{SIGSEGV, SIGABRT, SIGBUS,
162                                          SIGILL,  SIGFPE,  SIGTRAP};
163 #endif
164 }  // namespace
165 
block_all_nonfatal_signals()166 static void block_all_nonfatal_signals() {
167 #ifdef USE_POSIX_SIGNALS
168   sigset_t ss;
169   sigfillset(&ss);
170   // we can't block those signals globally and rely on our handler thread, as
171   // these are only received by the offending thread itself.
172   // see "man signal" for more details
173   for (const auto &sig : g_fatal_signals) {
174     sigdelset(&ss, sig);
175   }
176   if (0 != pthread_sigmask(SIG_SETMASK, &ss, nullptr)) {
177     throw std::runtime_error("pthread_sigmask() failed: " +
178                              std::string(std::strerror(errno)));
179   }
180 #endif
181 }
182 
183 #if !defined(__has_feature)
184 #define __has_feature(x) 0
185 #endif
186 
187 // GCC defines __SANITIZE_ADDRESS
188 // clang has __has_feature and 'address_sanitizer'
189 #if defined(__SANITIZE_ADDRESS__) || (__has_feature(address_sanitizer))
190 #define HAS_FEATURE_ASAN
191 #endif
192 
register_fatal_signal_handler()193 static void register_fatal_signal_handler() {
194   // enable a crash handler on POSIX systems if not built with ASAN
195 #if defined(USE_POSIX_SIGNALS) && !defined(HAS_FEATURE_ASAN)
196 #if defined(HAVE_STACKTRACE)
197   my_init_stacktrace();
198 #endif  // HAVE_STACKTRACE
199 
200   struct sigaction sa;
201   (void)sigemptyset(&sa.sa_mask);
202   sa.sa_flags = SA_RESETHAND;
203   sa.sa_handler = [](int sig) {
204     my_safe_printf_stderr("Application got fatal signal: %d\n", sig);
205 #ifdef HAVE_STACKTRACE
206     my_print_stacktrace(nullptr, 0);
207 #endif  // HAVE_STACKTRACE
208   };
209 
210   for (const auto &sig : g_fatal_signals) {
211     (void)sigaction(sig, &sa, nullptr);
212   }
213 #endif
214 }
215 
216 /**
217  * Set the log reopen completion callback function pointer.
218  *
219  * @param cb Function to call at completion.
220  */
set_log_reopen_complete_callback(log_reopen_callback cb)221 void set_log_reopen_complete_callback(log_reopen_callback cb) {
222   g_log_reopen_complete_callback_fp = cb;
223 }
224 
225 /**
226  * The default implementation for log reopen thread completion callback
227  * function.
228  *
229  * @param errmsg Error message. Empty string assumes successful completion.
230  */
default_log_reopen_complete_cb(const std::string errmsg)231 void default_log_reopen_complete_cb(const std::string errmsg) {
232   if (!errmsg.empty()) {
233     shutdown_fatal_error_message = errmsg;
234     request_application_shutdown(SHUTDOWN_FATAL_ERROR);
235   }
236 }
237 
238 #ifdef _WIN32
ctrl_c_handler(DWORD ctrl_type)239 static BOOL WINAPI ctrl_c_handler(DWORD ctrl_type) {
240   if (ctrl_type == CTRL_C_EVENT || ctrl_type == CTRL_BREAK_EVENT) {
241     // user presed Ctrl+C or we got Ctrl+Break request
242     request_application_shutdown();
243     return TRUE;  // don't pass this event to further handlers
244   } else {
245     // some other event
246     return FALSE;  // let the default Windows handler deal with it
247   }
248 }
249 
register_ctrl_c_handler()250 void register_ctrl_c_handler() {
251   if (!SetConsoleCtrlHandler(ctrl_c_handler, TRUE)) {
252     std::cerr << "Could not install Ctrl+C handler, exiting.\n";
253     exit(1);
254   }
255 }
256 #endif
257 
258 namespace mysql_harness {
259 
260 ////////////////////////////////////////////////////////////////////////////////
261 //
262 // PluginFuncEnv
263 //
264 ////////////////////////////////////////////////////////////////////////////////
265 
PluginFuncEnv(const AppInfo * info,const ConfigSection * section,bool running)266 PluginFuncEnv::PluginFuncEnv(const AppInfo *info, const ConfigSection *section,
267                              bool running /*= false*/)
268     : app_info_(info), config_section_(section), running_(running) {}
269 
270 //----[ further config getters ]----------------------------------------------
271 
get_config_section() const272 const ConfigSection *PluginFuncEnv::get_config_section() const noexcept {
273   std::lock_guard<std::mutex> lock(mutex_);
274   assert(config_section_);
275   return config_section_;
276 }
277 
get_app_info() const278 const AppInfo *PluginFuncEnv::get_app_info() const noexcept {
279   std::lock_guard<std::mutex> lock(mutex_);
280   assert(app_info_);
281   return app_info_;
282 }
283 
284 //----[ running flag ]--------------------------------------------------------
285 
set_running()286 void PluginFuncEnv::set_running() noexcept {
287   std::lock_guard<std::mutex> lock(mutex_);
288   running_ = true;
289 }
290 
clear_running()291 void PluginFuncEnv::clear_running() noexcept {
292   std::unique_lock<std::mutex> lock(mutex_);
293   running_ = false;
294   lock.unlock();
295   cond_.notify_all();  // for wait_for_stop()
296 }
297 
is_running() const298 bool PluginFuncEnv::is_running() const noexcept {
299   std::lock_guard<std::mutex> lock(mutex_);
300   return running_;
301 }
302 
wait_for_stop(uint32_t milliseconds) const303 bool PluginFuncEnv::wait_for_stop(uint32_t milliseconds) const noexcept {
304   auto pred = [this]() noexcept -> bool { return !running_; };
305 
306   std::unique_lock<std::mutex> lock(mutex_);
307   if (milliseconds)  // 0 = wait forever
308     cond_.wait_for(lock, std::chrono::milliseconds(milliseconds), pred);
309   else
310     cond_.wait(lock, pred);
311   return pred();
312 }
313 
314 //----[ error handling ]------------------------------------------------------
315 
exit_ok() const316 bool PluginFuncEnv::exit_ok() const noexcept {
317   std::lock_guard<std::mutex> lock(mutex_);
318   return error_type_ == kNoError;
319 }
320 
set_error(ErrorType error_type,const char * fmt,va_list ap)321 void PluginFuncEnv::set_error(ErrorType error_type, const char *fmt,
322                               va_list ap) noexcept {
323   std::lock_guard<std::mutex> lock(mutex_);
324 
325   assert(error_message_.empty());   // \_ previous message wasn't consumed
326   assert(error_type_ == kNoError);  // /
327   assert(error_type != kNoError);   // what would be the purpose of that?
328 
329   error_type_ = error_type;
330   if (fmt) {
331     char buf[1024] = {0};
332     vsnprintf(buf, sizeof(buf), fmt, ap);
333     error_message_ = buf;
334   } else {
335     error_message_ = "<empty message>";
336   }
337 }
338 
339 ////////////////////////////////////////////////////////////////////////////////
340 //
341 // Harness API
342 //
343 ////////////////////////////////////////////////////////////////////////////////
344 
345 //----[ further config getters ]----------------------------------------------
346 
get_app_info(const PluginFuncEnv * env)347 const AppInfo *get_app_info(const PluginFuncEnv *env) noexcept {
348   return env->get_app_info();
349 }
350 
get_config_section(const PluginFuncEnv * env)351 const ConfigSection *get_config_section(const PluginFuncEnv *env) noexcept {
352   return env->get_config_section();
353 }
354 
355 //----[ running flag ]--------------------------------------------------------
356 
is_running(const PluginFuncEnv * env)357 bool is_running(const PluginFuncEnv *env) noexcept { return env->is_running(); }
358 
wait_for_stop(const PluginFuncEnv * env,uint32_t milliseconds)359 bool wait_for_stop(const PluginFuncEnv *env, uint32_t milliseconds) noexcept {
360   return env->wait_for_stop(milliseconds);
361 }
362 
clear_running(PluginFuncEnv * env)363 void clear_running(PluginFuncEnv *env) noexcept { return env->clear_running(); }
364 
365 //----[ error handling ]------------------------------------------------------
366 MY_ATTRIBUTE((format(printf, 3, 4)))
set_error(PluginFuncEnv * env,ErrorType error_type,const char * fmt,...)367 void set_error(PluginFuncEnv *env, ErrorType error_type, const char *fmt,
368                ...) noexcept {
369   va_list args;
370   va_start(args, fmt);
371   env->set_error(error_type, fmt, args);
372   va_end(args);
373 }
374 
375 std::tuple<std::string, std::exception_ptr>
pop_error()376 PluginFuncEnv::pop_error() noexcept {
377   std::lock_guard<std::mutex> lock(mutex_);
378 
379   // At the time of writing, the exception type was used in Router's main.cc
380   // to discriminate between error types, to give the user a hint of what
381   // caused the problem (configuration error, runtime error, etc).
382   std::tuple<std::string, std::exception_ptr> ret;
383   switch (error_type_) {
384     case kRuntimeError:
385       ret = std::make_tuple(
386           error_message_,
387           std::make_exception_ptr(std::runtime_error(error_message_)));
388       break;
389 
390     case kConfigInvalidArgument:
391       ret = std::make_tuple(
392           error_message_,
393           std::make_exception_ptr(std::invalid_argument(error_message_)));
394       break;
395 
396     case kConfigSyntaxError:
397       ret = std::make_tuple(
398           error_message_,
399           std::make_exception_ptr(mysql_harness::syntax_error(error_message_)));
400       break;
401 
402     case kUndefinedError:
403       ret = std::make_tuple(
404           error_message_,
405           std::make_exception_ptr(std::runtime_error(error_message_)));
406       break;
407 
408     case kNoError:
409       assert(0);  // this function shouldn't be called in such case
410 
411       // defensive programming:
412       // on production systems, default to runtime_error and go on
413       ret = std::make_tuple(
414           error_message_,
415           std::make_exception_ptr(std::runtime_error(error_message_)));
416       break;
417   }
418 
419   error_type_ = kNoError;
420   error_message_.clear();
421 
422   return ret;
423 }
424 
425 // PluginThreads
426 
427 /**
428  * join all threads.
429  *
430  * @throws std::system_error from std::thread::join()
431  */
join()432 void PluginThreads::join() {
433   // wait for all plugin-threads to join
434   for (auto &thr : threads_) {
435     if (thr.joinable()) thr.join();
436   }
437 }
438 
push_back(std::thread && thr)439 void PluginThreads::push_back(std::thread &&thr) {
440   // if push-back throws it won't inc' 'running_' which is good.
441   threads_.push_back(std::move(thr));
442   ++running_;
443 }
444 
try_stopped(std::exception_ptr & first_exc)445 void PluginThreads::try_stopped(std::exception_ptr &first_exc) {
446   std::exception_ptr exc;
447   while (running_ > 0 && plugin_stopped_events_.try_pop(exc)) {
448     --running_;
449 
450     if (exc) {
451       first_exc = exc;
452       return;
453     }
454   }
455 }
456 
wait_all_stopped(std::exception_ptr & first_exc)457 void PluginThreads::wait_all_stopped(std::exception_ptr &first_exc) {
458   // wait until all plugins signaled their return value
459   for (; running_ > 0; --running_) {
460     auto exc = plugin_stopped_events_.pop();
461     if (!first_exc) first_exc = exc;
462   }
463 }
464 
465 ////////////////////////////////////////////////////////////////////////////////
466 //
467 // Loader
468 //
469 ////////////////////////////////////////////////////////////////////////////////
470 
~Loader()471 Loader::~Loader() {
472   if (signal_thread_.joinable()) {
473 #ifdef USE_POSIX_SIGNALS
474     // as the signal thread is blocked on sigwait(), interrupt it with a SIGTERM
475     pthread_kill(signal_thread_.native_handle(), SIGTERM);
476 #endif
477     signal_thread_.join();
478   }
479 }
480 
spawn_signal_handler_thread()481 void Loader::spawn_signal_handler_thread() {
482 #ifdef USE_POSIX_SIGNALS
483   std::promise<void> signal_handler_thread_setup_done;
484 
485   signal_thread_ = std::thread([&signal_handler_thread_setup_done] {
486     mysql_harness::rename_thread("sig handler");
487 
488     sigset_t ss;
489     sigemptyset(&ss);
490     sigaddset(&ss, SIGINT);
491     sigaddset(&ss, SIGTERM);
492     sigaddset(&ss, SIGHUP);
493 
494     signal_handler_thread_setup_done.set_value();
495     int sig = 0;
496 
497     while (true) {
498       if (0 == sigwait(&ss, &sig)) {
499         if (sig == SIGHUP) {
500           request_log_reopen();
501         } else {
502           harness_assert(sig == SIGINT || sig == SIGTERM);
503           request_application_shutdown();
504           return;
505         }
506       } else {
507         // man sigwait() says, it should only fail if we provided invalid
508         // signals.
509         harness_assert_this_should_not_execute();
510       }
511     }
512   });
513 
514   // wait until the signal handler is setup
515   signal_handler_thread_setup_done.get_future().wait();
516 #endif
517 }
518 
PluginInfo(const std::string & folder,const std::string & libname)519 Loader::PluginInfo::PluginInfo(const std::string &folder,
520                                const std::string &libname) {
521   DynamicLoader dyn_loader(folder);
522 
523   auto res = dyn_loader.load(libname);
524   if (!res) {
525     /* dlerror() from glibc returns:
526      *
527      * ```
528      * {filename}: cannot open shared object file: No such file or directory
529      * {filename}: cannot open shared object file: Permission denied
530      * {filename}: file too short
531      * {filename}: invalid ELF header
532      * ```
533      *
534      * msvcrt returns:
535      *
536      * ```
537      * Module not found.
538      * Access denied.
539      * Bad EXE format for %1
540      * ```
541      */
542     throw bad_plugin(
543 #ifdef _WIN32
544         // prepend filename on windows too, as it is done by glibc too
545         folder + "/" + libname + ".dll: " +
546 #endif
547         (res.error() == make_error_code(DynamicLoaderErrc::kDlError)
548              ? dyn_loader.error_msg()
549              : res.error().message()));
550   }
551 
552   module_ = std::move(res.value());
553 }
554 
load_plugin_descriptor(const std::string & name)555 void Loader::PluginInfo::load_plugin_descriptor(const std::string &name) {
556   const std::string symbol = "harness_plugin_" + name;
557 
558   const auto res = module_.symbol(symbol);
559   if (!res) {
560     /* dlerror() from glibc returns:
561      *
562      * ```
563      * {filename}: undefined symbol: {symbol}
564      * ```
565      *
566      * msvcrt returns:
567      *
568      * ```
569      * Procedure not found.
570      * ```
571      */
572     throw bad_plugin(
573 #ifdef _WIN32
574         module_.filename() + ": " +
575 #endif
576         (res.error() == make_error_code(DynamicLoaderErrc::kDlError)
577              ? module_.error_msg()
578              : res.error().message())
579 #ifdef _WIN32
580         + ": " + symbol
581 #endif
582     );
583   }
584 
585   plugin_ = reinterpret_cast<const Plugin *>(res.value());
586 }
587 
load_from(const std::string & plugin_name,const std::string & library_name)588 const Plugin *Loader::load_from(const std::string &plugin_name,
589                                 const std::string &library_name) {
590   std::string error;
591   setup_info();
592 
593   // We always load the library (even if it is already loaded) to
594   // honor potential dynamic library open/close reference counts. It
595   // is up to the platform implementation to ensure that multiple
596   // instances of a library can be handled.
597 
598   PluginInfo info(plugin_folder_, library_name);  // throws bad_plugin
599 
600   info.load_plugin_descriptor(plugin_name);  // throws bad_plugin
601 
602   // Check that ABI version and architecture match
603   auto plugin = info.plugin();
604   if ((plugin->abi_version & 0xFF00) != (PLUGIN_ABI_VERSION & 0xFF00) ||
605       (plugin->abi_version & 0xFF) > (PLUGIN_ABI_VERSION & 0xFF)) {
606     ostringstream buffer;
607     buffer.setf(std::ios::hex, std::ios::basefield);
608     buffer.setf(std::ios::showbase);
609     buffer << "Bad ABI version - plugin version: " << plugin->abi_version
610            << ", loader version: " << PLUGIN_ABI_VERSION;
611     throw bad_plugin(buffer.str());
612   }
613 
614   // Recursively load dependent modules, we skip NULL entries since
615   // the user might have added these by accident (for example, he
616   // assumed that the array was NULL-terminated) and they can safely
617   // be ignored instead of raising an error.
618   for (auto req : make_range(plugin->requires, plugin->requires_length)) {
619     if (req != nullptr) {
620       // Parse the designator to extract the plugin and constraints.
621       Designator designator(req);
622 
623       // Load the plugin using the plugin name.
624       const Plugin *dep_plugin{nullptr};
625 
626       try {
627         dep_plugin =
628             load(designator.plugin);  // throws bad_plugin and bad_section
629       } catch (const bad_section &) {
630         log_error(
631             "Plugin '%s' needs plugin '%s' which is missing in the "
632             "configuration",
633             plugin_name.c_str(), designator.plugin.c_str());
634         throw;
635       }
636 
637       // Check that the version of the plugin match what the
638       // designator expected and raise an exception if they don't
639       // match.
640       if (!designator.version_good(Version(dep_plugin->plugin_version))) {
641         Version version(dep_plugin->plugin_version);
642         std::ostringstream buffer;
643         buffer << designator.plugin << ": plugin version was " << version
644                << ", expected " << designator.constraint;
645         throw bad_plugin(buffer.str());
646       }
647     }
648   }
649 
650   // If all went well, we register the plugin and return a
651   // pointer to it.
652   plugins_.emplace(plugin_name, std::move(info));
653   log_debug("  plugin '%s' loaded ok", plugin_name.c_str());
654   return plugin;
655 }
656 
load(const std::string & plugin_name,const std::string & key)657 const Plugin *Loader::load(const std::string &plugin_name,
658                            const std::string &key) {
659   log_debug("  plugin '%s:%s' loading", plugin_name.c_str(), key.c_str());
660 
661   if (BuiltinPlugins::instance().has(plugin_name)) {
662     Plugin *plugin = BuiltinPlugins::instance().get_plugin(plugin_name);
663     // if plugin isn't registered yet, add it
664     if (plugins_.find(plugin_name) == plugins_.end()) {
665       plugins_.emplace(plugin_name, plugin);
666     }
667     return plugin;
668   } else {
669     ConfigSection &plugin =
670         config_.get(plugin_name, key);  // throws bad_section
671     const std::string &library_name = plugin.get("library");
672     return load_from(plugin_name,
673                      library_name);  // throws bad_plugin and bad_section
674   }
675 }
676 
load(const std::string & plugin_name)677 const Plugin *Loader::load(const std::string &plugin_name) {
678   log_debug("  plugin '%s' loading", plugin_name.c_str());
679 
680   if (BuiltinPlugins::instance().has(plugin_name)) {
681     Plugin *plugin = BuiltinPlugins::instance().get_plugin(plugin_name);
682     if (plugins_.find(plugin_name) == plugins_.end()) {
683       plugins_.emplace(plugin_name, plugin);
684 
685       // add config-section for builtin plugins, in case it isn't there yet
686       // as the the "start()" function otherwise isn't called by load_all()
687       if (!config_.has_any(plugin_name)) {
688         config_.add(plugin_name);
689       }
690     }
691     return plugin;
692   }
693 
694   if (!config_.has_any(plugin_name)) {
695     // if no section for the plugin exists, try to load it anyway with an empty
696     // key-less section
697     //
698     // in case the plugin fails to load with bad_plugin, return bad_section to
699     // be consistent with existing behaviour
700     config_.add(plugin_name).add("library", plugin_name);
701 
702     try {
703       return load_from(plugin_name, plugin_name);  // throws bad_plugin
704     } catch (const bad_plugin &e) {
705       std::ostringstream buffer;
706       buffer << "Section name '" << plugin_name << "' does not exist";
707       throw bad_section(buffer.str());
708     }
709   }
710 
711   Config::SectionList plugins = config_.get(plugin_name);  // throws bad_section
712   if (plugins.size() > 1) {
713     std::ostringstream buffer;
714     buffer << "Section name '" << plugin_name
715            << "' is ambiguous. Alternatives are:";
716     for (const ConfigSection *plugin : plugins) buffer << " " << plugin->key;
717     throw bad_section(buffer.str());
718   } else if (plugins.size() == 0) {
719     std::ostringstream buffer;
720     buffer << "Section name '" << plugin_name << "' does not exist";
721     throw bad_section(buffer.str());
722   }
723 
724   assert(plugins.size() == 1);
725   const ConfigSection *section = plugins.front();
726   const std::string &library_name = section->get("library");
727   return load_from(plugin_name, library_name);  // throws bad_plugin
728 }
729 
start()730 void Loader::start() {
731   // unload plugins on exit
732   std::shared_ptr<void> exit_guard(nullptr, [this](void *) { unload_all(); });
733 
734   // check if there is anything to load; if not we currently treat is as an
735   // error, not letting the user to run "idle" router that would close right
736   // away
737   if (external_plugins_to_load_count() == 0) {
738     throw std::runtime_error(
739         "Error: MySQL Router not configured to load or start any plugin. "
740         "Exiting.");
741   }
742 
743   // load plugins
744   load_all();  // throws bad_plugin on load error, causing an early return
745 
746   // init and run plugins
747   std::exception_ptr first_eptr = run();
748   if (first_eptr) {
749     std::rethrow_exception(first_eptr);
750   }
751 }
752 
external_plugins_to_load_count()753 size_t Loader::external_plugins_to_load_count() {
754   size_t result = 0;
755   for (std::pair<const std::string &, std::string> name : available()) {
756     if (!BuiltinPlugins::instance().has(name.first)) {
757       result++;
758     }
759   }
760 
761   return result;
762 }
763 
load_all()764 void Loader::load_all() {
765   log_debug("Loading all plugins.");
766 
767   std::string section_name;
768   std::string section_key;
769 
770   for (auto const &section : available()) {
771     try {
772       std::tie(section_name, section_key) = section;
773       load(section_name, section_key);
774     } catch (const bad_plugin &e) {
775       throw bad_plugin(utility::string_format(
776           "Loading plugin for config-section '[%s%s%s]' failed: %s",
777           section_name.c_str(), !section_key.empty() ? ":" : "",
778           section_key.c_str(), e.what()));
779     }
780   }
781 }
782 
unload_all()783 void Loader::unload_all() {
784   // this stage has no implementation so far; however, we want to flag that we
785   // reached this stage
786   log_debug("Unloading all plugins.");
787   // If that ever gets implemented make sure to not attempt unloading
788   // built-in plugins
789 }
790 
791 /**
792  * If a isn't set, return b.
793  *
794  * like ?:, but ensures that b is _always_ evaluated first.
795  */
796 template <class T>
value_or(T a,T b)797 T value_or(T a, T b) {
798   return a ? a : b;
799 }
800 
run()801 std::exception_ptr Loader::run() {
802   // initialize plugins
803   std::exception_ptr first_eptr = init_all();
804 
805   // run plugins if initialization didn't fail
806   if (!first_eptr) {
807     try {
808       std::shared_ptr<void> exit_guard(
809           nullptr, [](void *) { g_reopen_thread = nullptr; });
810 
811       start_all();  // if start() throws, exception is forwarded to
812                     // main_loop()
813 
814       // may throw std::system_error
815       LogReopenThread log_reopen_thread;
816       g_reopen_thread = &log_reopen_thread;
817 
818       first_eptr = main_loop();
819     } catch (const std::exception &e) {
820       log_error("failed running start/main: %s", e.what());
821       first_eptr = stop_and_wait_all();
822     }
823   }
824 
825   // not strict requiremnt, just good measure (they're no longer needed at
826   // this point)
827   assert(plugin_start_env_.empty());
828 
829   // deinitialize plugins
830   first_eptr = value_or(first_eptr, deinit_all());
831 
832   // return the first exception that was triggered by an error returned from
833   // any plugin function
834   return first_eptr;
835 }
836 
available() const837 std::list<Config::SectionKey> Loader::available() const {
838   return config_.section_names();
839 }
840 
setup_info()841 void Loader::setup_info() {
842   logging_folder_ = config_.get_default("logging_folder");
843   plugin_folder_ = config_.get_default("plugin_folder");
844   runtime_folder_ = config_.get_default("runtime_folder");
845   config_folder_ = config_.get_default("config_folder");
846   data_folder_ = config_.get_default("data_folder");
847 
848   appinfo_.logging_folder = logging_folder_.c_str();
849   appinfo_.plugin_folder = plugin_folder_.c_str();
850   appinfo_.runtime_folder = runtime_folder_.c_str();
851   appinfo_.config_folder = config_folder_.c_str();
852   appinfo_.data_folder = data_folder_.c_str();
853   appinfo_.config = &config_;
854   appinfo_.program = program_.c_str();
855 }
856 
call_plugin_function(PluginFuncEnv * env,std::exception_ptr & eptr,void (* fptr)(PluginFuncEnv *),const char * fnc_name,const char * plugin_name,const char * plugin_key=nullptr)857 static void call_plugin_function(PluginFuncEnv *env, std::exception_ptr &eptr,
858                                  void (*fptr)(PluginFuncEnv *),
859                                  const char *fnc_name, const char *plugin_name,
860                                  const char *plugin_key = nullptr) noexcept {
861   auto handle_plugin_exception = [](std::exception_ptr &first_eptr,
862                                     const std::string &func_name,
863                                     const char *plug_name, const char *plug_key,
864                                     const std::exception *e) noexcept -> void {
865     // Plugins are not allowed to throw, so let's alert the devs. But in
866     // production, we want to be robust and try to handle this gracefully
867     assert(0);
868 
869     if (!first_eptr) first_eptr = std::current_exception();
870 
871     std::string what = e ? (std::string(": ") + e->what()) : ".";
872     if (plug_key)
873       log_error(
874           "  plugin '%s:%s' %s threw unexpected exception "
875           "- please contact plugin developers for more information%s",
876           plug_name, plug_key, func_name.c_str(), what.c_str());
877     else
878       log_error(
879           "  plugin '%s' %s threw unexpected exception "
880           "- please contact plugin developers for more information%s",
881           plug_name, func_name.c_str(), what.c_str());
882   };
883 
884   // This try/catch block is about defensive programming - plugins are not
885   // allowed to throw. But if the exception is caught anyway, we have to
886   // handle it somehow. In debug builds, we throw an assertion. In release
887   // builds, we whine about it in logs, but otherwise handle it like a
888   // normal error. This behavior is officially undefined, thus we are free
889   // to change this at our discretion.
890   try {
891     // call the plugin
892     fptr(env);
893 
894     // error handling
895     if (env->exit_ok()) {
896       if (plugin_key)
897         log_debug("  plugin '%s:%s' %s exit ok", plugin_name, plugin_key,
898                   fnc_name);
899       else
900         log_debug("  plugin '%s' %s exit ok", plugin_name, fnc_name);
901     } else {
902       std::string message;
903       if (!eptr) {
904         std::tie(message, eptr) = env->pop_error();
905       } else {
906         std::tie(message, std::ignore) = env->pop_error();
907       }
908       if (plugin_key)
909         log_error("  plugin '%s:%s' %s failed: %s", plugin_name, plugin_key,
910                   fnc_name, message.c_str());
911       else
912         log_error("  plugin '%s' %s failed: %s", plugin_name, fnc_name,
913                   message.c_str());
914     }
915 
916   } catch (const std::exception &e) {
917     handle_plugin_exception(eptr, fnc_name, plugin_name, plugin_key, &e);
918   } catch (...) {
919     handle_plugin_exception(eptr, fnc_name, plugin_name, plugin_key, nullptr);
920   }
921 }
922 
923 // returns first exception triggered by init()
init_all()924 std::exception_ptr Loader::init_all() {
925   // block non-fatal signal handling for all threads
926   //
927   // - no other thread than the signal-handler thread should receive signals
928   // - syscalls should not get interrupted by signals either
929   //
930   // on windows, this is a no-op
931   block_all_nonfatal_signals();
932 
933   // for the fatal signals we want to have a handler that prints the stack-trace
934   // if possible
935   register_fatal_signal_handler();
936 
937   log_debug("Initializing all plugins.");
938 
939   if (!topsort()) throw std::logic_error("Circular dependencies in plugins");
940   order_.reverse();  // we need reverse-topo order for non-built-in plugins
941 
942   for (auto it = order_.begin(); it != order_.end(); ++it) {
943     const std::string &plugin_name = *it;
944     PluginInfo &info = plugins_.at(plugin_name);
945 
946     if (!info.plugin()->init) {
947       log_debug("  plugin '%s' doesn't implement init()", plugin_name.c_str());
948       continue;
949     }
950 
951     log_debug("  plugin '%s' initializing", plugin_name.c_str());
952     PluginFuncEnv env(&appinfo_, nullptr);
953 
954     std::exception_ptr eptr;
955     call_plugin_function(&env, eptr, info.plugin()->init, "init",
956                          plugin_name.c_str());
957     if (eptr) {
958       // erase this and all remaining plugins from the list, so that
959       // deinit_all() will not try to run deinit() on them
960       order_.erase(it, order_.end());
961       return eptr;
962     }
963 
964   }  // for (auto it = order_.begin(); it != order_.end(); ++it)
965 
966   return nullptr;
967 }
968 
969 // forwards first exception triggered by start() to main_loop()
start_all()970 void Loader::start_all() {
971   log_debug("Starting all plugins.");
972 
973   try {
974     // start all the plugins (call plugin's start() function)
975     for (const ConfigSection *section : config_.sections()) {
976       PluginInfo &plugin = plugins_.at(section->name);
977       void (*fptr)(PluginFuncEnv *) = plugin.plugin()->start;
978 
979       if (!fptr) {
980         log_debug("  plugin '%s:%s' doesn't implement start()",
981                   section->name.c_str(), section->key.c_str());
982 
983         // create a env object for later
984         assert(plugin_start_env_.count(section) == 0);
985         plugin_start_env_[section] =
986             std::make_shared<PluginFuncEnv>(nullptr, section, false);
987 
988         continue;
989       }
990 
991       // future will remain valid even after promise is destructed
992       std::promise<std::shared_ptr<PluginFuncEnv>> env_promise;
993 
994       // plugin start() will run in this new thread
995       std::thread plugin_thread([fptr, section, &env_promise, this]() {
996         log_debug("  plugin '%s:%s' starting", section->name.c_str(),
997                   section->key.c_str());
998 
999         // init env object and unblock harness thread
1000         std::shared_ptr<PluginFuncEnv> this_thread_env =
1001             std::make_shared<PluginFuncEnv>(nullptr, section, true);
1002         env_promise.set_value(this_thread_env);  // shared_ptr gets copied here
1003                                                  // (future will own a copy)
1004 
1005         std::exception_ptr eptr;
1006         call_plugin_function(this_thread_env.get(), eptr, fptr, "start",
1007                              section->name.c_str(), section->key.c_str());
1008 
1009         {
1010           std::lock_guard<std::mutex> lock(we_might_shutdown_cond_mutex);
1011           plugin_threads_.push_exit_status(std::move(eptr));
1012         }
1013         we_might_shutdown_cond.notify_one();
1014       });
1015 
1016       // we could combine the thread creation with emplace_back
1017       // but that sometimes leads to a crash on ASAN build (when the thread
1018       // limit is reached apparently sometimes half-baked thread object gets
1019       // added to the vector and its destructor crashes later on when the vector
1020       // gets destroyed)
1021       plugin_threads_.push_back(std::move(plugin_thread));
1022 
1023       // block until starter thread is started
1024       // then save the env object for later
1025       assert(plugin_start_env_.count(section) == 0);
1026       plugin_start_env_[section] =
1027           env_promise.get_future()
1028               .get();  // returns shared_ptr to PluginFuncEnv;
1029                        // PluginFuncEnv exists on heap
1030 
1031     }  // for (const ConfigSection* section: config_.sections())
1032   } catch (const std::system_error &e) {
1033     throw std::system_error(e.code(), "starting plugin-threads failed");
1034   }
1035 
1036   try {
1037     // We wait with this until after we launch all plugin threads, to avoid
1038     // a potential race if a signal was received while plugins were still
1039     // launching.
1040     spawn_signal_handler_thread();
1041   } catch (const std::system_error &e) {
1042     // should we unblock the signals again?
1043     throw std::system_error(e.code(), "starting signal-handler-thread failed");
1044   }
1045 }
1046 
1047 /**
1048  * wait for shutdown signal or plugins exit.
1049  *
1050  * blocks until one of the following happens:
1051  *
1052  * - shutdown signal is received
1053  * - one plugin return an exception
1054  * - all plugins finished
1055  *
1056  * @returns first exception returned by any of the plugins start() or stop()
1057  * functions
1058  * @retval nullptr if no exception was returned
1059  */
main_loop()1060 std::exception_ptr Loader::main_loop() {
1061   // RouterRoutingTest::RoutingPluginCantSpawnMoreThreads is waiting for this
1062   // log-message to appear in the log to get a predictible test-scenario.
1063   //
1064   // Changing or moving this message, will break that test.
1065   log_debug("Running.");
1066 
1067   std::exception_ptr first_eptr;
1068   // wait for a reason to shutdown
1069   {
1070     std::unique_lock<std::mutex> lk(we_might_shutdown_cond_mutex);
1071 
1072     we_might_shutdown_cond.wait(lk, [&first_eptr, this] {
1073       // external shutdown
1074       if (g_shutdown_pending == SHUTDOWN_REQUESTED) return true;
1075 
1076       // shutdown due to a fatal error originating from Loader and its callees
1077       // (but NOT from plugins)
1078       if (g_shutdown_pending == SHUTDOWN_FATAL_ERROR) {
1079         // there is a request to shut down due to a fatal error; generate an
1080         // exception with requested message so that it bubbles up and ends up on
1081         // the console as an error message
1082         try {
1083           throw std::runtime_error(shutdown_fatal_error_message);
1084         } catch (const std::exception &) {
1085           first_eptr = std::current_exception();  // capture
1086         }
1087         return true;
1088       }
1089 
1090       plugin_threads_.try_stopped(first_eptr);
1091       if (first_eptr) return true;
1092 
1093       // all plugins stop successfully
1094       if (plugin_threads_.running() == 0) return true;
1095 
1096       return false;
1097     });
1098   }
1099 
1100   return value_or(first_eptr, stop_and_wait_all());
1101 }
1102 
stop_and_wait_all()1103 std::exception_ptr Loader::stop_and_wait_all() {
1104   std::exception_ptr first_eptr;
1105 
1106   // stop all plugins
1107   first_eptr = value_or(first_eptr, stop_all());
1108 
1109   plugin_threads_.wait_all_stopped(first_eptr);
1110   try {
1111     plugin_threads_.join();
1112   } catch (...) {
1113     // may throw due to deadlocks and other system-related reasons.
1114     if (!first_eptr) {
1115       first_eptr = std::current_exception();
1116     }
1117   }
1118 
1119   // we will no longer need the env objects for start(), might as well
1120   // clean them up now for good measure
1121   plugin_start_env_.clear();
1122 
1123   // We just return the first exception that was raised (if any). If there
1124   // are other exceptions, they are ignored.
1125   return first_eptr;
1126 }
1127 
1128 // returns first exception triggered by stop()
stop_all()1129 std::exception_ptr Loader::stop_all() {
1130   // This function runs exactly once - it will be called even if all plugins
1131   // exit by themselves (thus there's nothing to stop).
1132   log_debug("Shutting down. Stopping all plugins.");
1133 
1134   // iterate over all plugin instances
1135   std::exception_ptr first_eptr;
1136   for (const ConfigSection *section : config_.sections()) {
1137     PluginInfo &plugin = plugins_.at(section->name);
1138     void (*fptr)(PluginFuncEnv *) = plugin.plugin()->stop;
1139 
1140     assert(plugin_start_env_.count(section));
1141     assert(plugin_start_env_[section]->get_config_section() == section);
1142 
1143     // flag plugin::start() to exit (if one exists and it's running)
1144     plugin_start_env_[section]->clear_running();
1145 
1146     if (!fptr) {
1147       log_debug("  plugin '%s:%s' doesn't implement stop()",
1148                 section->name.c_str(), section->key.c_str());
1149       continue;
1150     }
1151 
1152     log_debug("  plugin '%s:%s' stopping", section->name.c_str(),
1153               section->key.c_str());
1154 
1155     PluginFuncEnv stop_env(nullptr, section);
1156     call_plugin_function(&stop_env, first_eptr, fptr, "stop",
1157                          section->name.c_str(), section->key.c_str());
1158 
1159   }  // for (const ConfigSection* section: config_.sections())
1160 
1161   return first_eptr;
1162 }
1163 
1164 // returns first exception triggered by deinit()
deinit_all()1165 std::exception_ptr Loader::deinit_all() {
1166   log_debug("Deinitializing all plugins.");
1167 
1168   // we could just reverse order_ and that would work too,
1169   // but by leaving it intact it's easier to unit-test it
1170   std::list<std::string> deinit_order = order_;
1171   deinit_order.reverse();
1172 
1173   // call deinit() on all plugins that support the call
1174   std::exception_ptr first_eptr;
1175   for (const std::string &plugin_name : deinit_order) {
1176     const PluginInfo &info = plugins_.at(plugin_name);
1177 
1178     if (!info.plugin()->deinit) {
1179       log_debug("  plugin '%s' doesn't implement deinit()",
1180                 plugin_name.c_str());
1181       continue;
1182     }
1183 
1184     log_debug("  plugin '%s' deinitializing", plugin_name.c_str());
1185     PluginFuncEnv env(&appinfo_, nullptr);
1186 
1187     call_plugin_function(&env, first_eptr, info.plugin()->deinit, "deinit",
1188                          plugin_name.c_str());
1189   }
1190 
1191   return first_eptr;
1192 }
1193 
topsort()1194 bool Loader::topsort() {
1195   std::map<std::string, Loader::Status> status;
1196   std::list<std::string> order;
1197 
1198   for (std::pair<const std::string, PluginInfo> &plugin : plugins_) {
1199     bool succeeded = visit(plugin.first, &status, &order);
1200     if (!succeeded) return false;
1201   }
1202 
1203   order_.swap(order);
1204   return true;
1205 }
1206 
visit(const std::string & designator,std::map<std::string,Loader::Status> * status,std::list<std::string> * order)1207 bool Loader::visit(const std::string &designator,
1208                    std::map<std::string, Loader::Status> *status,
1209                    std::list<std::string> *order) {
1210   Designator info(designator);
1211   switch ((*status)[info.plugin]) {
1212     case Status::VISITED:
1213       return true;
1214 
1215     case Status::ONGOING:
1216       // If we see a node we are processing, it's not a DAG and cannot
1217       // be topologically sorted.
1218       return false;
1219 
1220     case Status::UNVISITED: {
1221       (*status)[info.plugin] = Status::ONGOING;
1222       if (const Plugin *plugin = plugins_.at(info.plugin).plugin()) {
1223         for (auto required :
1224              make_range(plugin->requires, plugin->requires_length)) {
1225           assert(required != nullptr);
1226           bool succeeded = visit(required, status, order);
1227           if (!succeeded) return false;
1228         }
1229       }
1230       (*status)[info.plugin] = Status::VISITED;
1231       order->push_front(info.plugin);
1232       return true;
1233     }
1234   }
1235   return true;
1236 }
1237 
1238 ////////////////////////////////////////////////////////////////////////////////
1239 //
1240 // LogReopenThread
1241 //
1242 ////////////////////////////////////////////////////////////////////////////////
1243 
1244 /**
1245  * stop the log_reopen_thread_function.
1246  */
stop()1247 void LogReopenThread::stop() { request_application_shutdown(); }
1248 
1249 /**
1250  * join the log_reopen thread.
1251  */
join()1252 void LogReopenThread::join() { reopen_thr_.join(); }
1253 
1254 /**
1255  * destruct the thread.
1256  */
~LogReopenThread()1257 LogReopenThread::~LogReopenThread() {
1258   // if it didn't throw in the constructor, it is joinable and we have to
1259   // signal its shutdown
1260   if (reopen_thr_.joinable()) {
1261     try {
1262       // if stop throws ... the join will block
1263       stop();
1264 
1265       // if join throws, log it and expect std::thread::~thread to call
1266       // std::terminate
1267       join();
1268     } catch (const std::exception &e) {
1269       try {
1270         log_error("~LogReopenThread failed to join its thread: %s", e.what());
1271       } catch (...) {
1272         // ignore it, we did our best to tell the user why std::terminate will
1273         // be called in a bit
1274       }
1275     }
1276   }
1277 }
1278 
1279 /**
1280  * thread function
1281  */
log_reopen_thread_function(LogReopenThread * t)1282 void LogReopenThread::log_reopen_thread_function(LogReopenThread *t) {
1283   auto &logging_registry = mysql_harness::DIM::instance().get_LoggingRegistry();
1284 
1285   while (true) {
1286     {
1287       std::unique_lock<std::mutex> lk(log_reopen_cond_mutex);
1288       if (g_shutdown_pending) {
1289         break;
1290       }
1291       log_reopen_cond.wait(lk);
1292       if (g_shutdown_pending) {
1293         break;
1294       }
1295       if (!t->is_requested()) {
1296         continue;
1297       }
1298       t->state_ = REOPEN_ACTIVE;
1299       t->errmsg_ = "";
1300       try {
1301         logging_registry.flush_all_loggers(t->dst_);
1302         t->dst_ = "";
1303       } catch (const std::exception &e) {
1304         // leave actions on error to the defined callback function
1305         t->errmsg_ = e.what();
1306       }
1307     }
1308     // trigger the completion callback once mutex is not locked
1309     g_log_reopen_complete_callback_fp(t->errmsg_);
1310     {
1311       std::unique_lock<std::mutex> lk(log_reopen_cond_mutex);
1312       t->state_ = REOPEN_NONE;
1313     }
1314   }
1315 }
1316 
1317 /*
1318  * request reopen
1319  */
request_reopen(const std::string dst)1320 void LogReopenThread::request_reopen(const std::string dst) {
1321   std::unique_lock<std::mutex> lk(log_reopen_cond_mutex, std::defer_lock);
1322 
1323   if (!lk.try_lock()) return;
1324 
1325   state_ = REOPEN_REQUESTED;
1326   dst_ = dst;
1327 
1328   log_reopen_cond.notify_one();
1329 }
1330 
1331 }  // namespace mysql_harness
1332 
1333 // unit test access - DON'T USE IN PRODUCTION CODE!
1334 // (unfortunately we cannot guard this with #ifdef FRIEND_TEST)
1335 namespace unittest_backdoor {
1336 HARNESS_EXPORT
set_shutdown_pending(bool shutdown_pending)1337 void set_shutdown_pending(bool shutdown_pending) {
1338   g_shutdown_pending = shutdown_pending ? SHUTDOWN_REQUESTED : SHUTDOWN_NONE;
1339 }
1340 }  // namespace unittest_backdoor
1341