1 // 2 // Copyright (C) 2014-2020 Codership Oy <info@codership.com> 3 // 4 5 6 // 7 // Common ASIO methods and configuration options for Galera 8 // 9 10 #ifndef GU_ASIO_HPP 11 #define GU_ASIO_HPP 12 13 #include "gu_config.hpp" 14 #include "gu_uri.hpp" 15 #include "gu_signals.hpp" 16 17 #include <netinet/tcp.h> // tcp_info 18 19 #include <array> 20 #include <chrono> 21 #include <functional> 22 #include <memory> 23 #include <string> 24 25 namespace gu 26 { 27 // URI schemes for networking 28 namespace scheme 29 { 30 const std::string tcp("tcp"); /// TCP scheme 31 const std::string udp("udp"); /// UDP scheme 32 const std::string ssl("ssl"); /// SSL scheme 33 const std::string def("tcp"); /// default scheme (TCP) 34 } 35 36 namespace conf 37 { 38 // Enable dynamic socket support 39 const std::string socket_dynamic("socket.dynamic"); 40 } 41 42 #ifdef GALERA_HAVE_SSL 43 // 44 // SSL 45 // 46 47 // Configuration options for sockets 48 namespace conf 49 { 50 /// Enable SSL explicitly 51 const std::string use_ssl("socket.ssl"); 52 /// SSL cipher list 53 const std::string ssl_cipher("socket.ssl_cipher"); 54 /// SSL compression algorithm 55 const std::string ssl_compression("socket.ssl_compression"); 56 /// SSL private key file 57 const std::string ssl_key("socket.ssl_key"); 58 /// SSL certificate file 59 const std::string ssl_cert("socket.ssl_cert"); 60 /// SSL CA file 61 const std::string ssl_ca("socket.ssl_ca"); 62 /// SSL password file 63 const std::string ssl_password_file("socket.ssl_password_file"); 64 // SSL reload 65 const std::string ssl_reload("socket.ssl_reload"); 66 } 67 68 69 // register ssl parameters to config 70 void ssl_register_params(gu::Config&); 71 72 // initialize defaults, verify set options 73 void ssl_init_options(gu::Config&); 74 75 // update ssl parameters 76 void ssl_param_set(const std::string&, const std::string&, gu::Config&); 77 #else ssl_register_params(gu::Config &)78 static inline void ssl_register_params(gu::Config&) { } ssl_init_options(gu::Config &)79 static inline void ssl_init_options(gu::Config&) { } 80 #endif // GALERA_HAVE_SSL 81 82 // 83 // Address manipulation helpers 84 // 85 86 /** 87 * @class AsioIpAddressV4 88 * 89 * A wrapper around asio::ip::address_v4 90 */ 91 class AsioIpAddressV4 92 { 93 public: 94 AsioIpAddressV4(); 95 AsioIpAddressV4(const AsioIpAddressV4&); 96 AsioIpAddressV4& operator=(AsioIpAddressV4); 97 ~AsioIpAddressV4(); 98 bool is_multicast() const; 99 class Impl; 100 Impl& impl(); 101 const Impl& impl() const; 102 private: 103 std::unique_ptr<Impl> impl_; 104 }; 105 106 /** 107 * @class AsioIpAddressV6 108 * 109 * A wrapper around asio::ip::address_v6 110 */ 111 class AsioIpAddressV6 112 { 113 public: 114 AsioIpAddressV6(); 115 AsioIpAddressV6(const AsioIpAddressV6&); 116 AsioIpAddressV6& operator=(AsioIpAddressV6); 117 ~AsioIpAddressV6(); 118 bool is_link_local() const; 119 unsigned long scope_id() const; 120 bool is_multicast() const; 121 class Impl; 122 Impl& impl(); 123 const Impl& impl() const; 124 private: 125 std::unique_ptr<Impl> impl_; 126 }; 127 128 /** 129 * @class AsioIpAddressV6 130 * 131 * A wrapper around asio::ip::address 132 */ 133 class AsioIpAddress 134 { 135 public: 136 class Impl; 137 AsioIpAddress(); 138 AsioIpAddress(const AsioIpAddress&); 139 AsioIpAddress& operator=(AsioIpAddress); 140 ~AsioIpAddress(); 141 bool is_v4() const; 142 bool is_v6() const; 143 AsioIpAddressV4 to_v4() const; 144 AsioIpAddressV6 to_v6() const; 145 Impl& impl(); 146 const Impl& impl() const; 147 private: 148 std::unique_ptr<Impl> impl_; 149 }; 150 151 // Return any address string. 152 std::string any_addr(const AsioIpAddress& addr); 153 154 // Escape address string. Surrounds IPv6 address with []. 155 // IPv4 addresses not affected. 156 std::string escape_addr(const AsioIpAddress& addr); 157 158 // Unescape address string. Remove [] from around the address if found. 159 std::string unescape_addr(const std::string& addr); 160 161 // Construct asio::ip::address from address string 162 AsioIpAddress make_address(const std::string& addr); 163 164 165 class AsioMutableBuffer 166 { 167 public: AsioMutableBuffer()168 AsioMutableBuffer() : data_(), size_() { } AsioMutableBuffer(void * data,size_t size)169 AsioMutableBuffer(void* data, size_t size) 170 : data_(data) 171 , size_(size) 172 { } data() const173 void* data() const { return data_; } size() const174 size_t size() const { return size_; } 175 private: 176 void* data_; 177 size_t size_; 178 }; 179 180 class AsioConstBuffer 181 { 182 public: AsioConstBuffer()183 AsioConstBuffer() 184 : data_() 185 , size_() 186 { } AsioConstBuffer(const void * data,size_t size)187 AsioConstBuffer(const void* data, size_t size) 188 : data_(data) 189 , size_(size) 190 { } data() const191 const void* data() const { return data_; } size() const192 size_t size() const { return size_; } 193 private: 194 const void* data_; 195 size_t size_; 196 }; 197 198 199 class AsioErrorCategory; 200 class AsioErrorCode 201 { 202 public: 203 /** 204 * A default constructor. Constructs success error code. 205 */ 206 AsioErrorCode(); 207 /** 208 * A constructor to generate error codes from system error codes. 209 */ 210 AsioErrorCode(int value); 211 212 /** 213 * A constructor to generate error codes from asio errors. 214 */ AsioErrorCode(int value,const AsioErrorCategory & category)215 AsioErrorCode(int value, const AsioErrorCategory& category) 216 : value_(value) 217 , category_(&category) 218 , error_extra_() 219 { } 220 AsioErrorCode(int value,const AsioErrorCategory & category,int error_extra)221 AsioErrorCode(int value, const AsioErrorCategory& category, 222 int error_extra) 223 : value_(value) 224 , category_(&category) 225 , error_extra_(error_extra) 226 { } 227 /** 228 * Return error number. 229 */ value() const230 int value() const { return value_; } 231 category() const232 const AsioErrorCategory* category() const { return category_; } 233 234 /** 235 * Return human readable error message. 236 */ 237 std::string message() const; 238 operator bool() const239 operator bool() const { return value_; } 240 operator !() const241 bool operator!() const { return value_ == 0; } 242 243 /** 244 * Return true if the error is end of file. 245 */ 246 bool is_eof() const; 247 248 /** 249 * Return true if the error is system error. 250 */ 251 bool is_system() const; 252 253 private: 254 int value_; 255 const AsioErrorCategory* category_; 256 // Extra category specific error information 257 int error_extra_; 258 }; 259 260 std::ostream& operator<<(std::ostream&, const AsioErrorCode&); 261 262 /* 263 * Helper to determine if the error code originates from an 264 * event which happens often and pollutes logs but for which the error 265 * does not provide any helpful information. 266 * 267 * Errors which happen frequently during cluster configuration changes 268 * and when connections break are considered verbose. 269 * 270 * Certain SSL errors such as 'short read' error are considered verbose. 271 * 272 * All errors which originate from TLS service hooks are considered 273 * verbose, it is up to application report them if appropriate. 274 */ 275 bool is_verbose_error(const AsioErrorCode&); 276 277 // TODO: Hide extra error info from public interface. It should be 278 // called internally by calls which produce human readable error messages. 279 #ifdef GALERA_HAVE_SSL 280 std::string extra_error_info(const gu::AsioErrorCode& ec); 281 #else // GALERA_HAVE_SSL extra_error_info(const gu::AsioErrorCode &)282 static inline std::string extra_error_info(const gu::AsioErrorCode&) 283 { return ""; } 284 #endif // GALERA_HAVE_SSL 285 class AsioSocket; 286 /** 287 * Abstract interface for asio socket handlers. 288 */ 289 class AsioSocketHandler 290 { 291 public: ~AsioSocketHandler()292 virtual ~AsioSocketHandler() { } 293 /** 294 * This will be called after asynchronous connection to the 295 * remote endpoint after call to AsioSocket::async_connect() 296 * completes. 297 * 298 * All internal protocol handshakes (e.g. SSL) will be completed 299 * before this handler is called. 300 * 301 * @param socket Reference to socket which initiated the call. 302 * @param ec Error code. 303 */ 304 virtual void connect_handler(AsioSocket& socket, 305 const AsioErrorCode& ec) = 0; 306 virtual void write_handler(AsioSocket&, 307 const AsioErrorCode&, 308 size_t bytes_transferred) = 0; 309 /** 310 * This call is called every time more data has been written 311 * into receive buffer submitted via async_read() call. 312 * The return value of the call should be maximum number of 313 * bytes to be transferred before the read is considered 314 * complete and read_handler() will be called. 315 * 316 * If the returned value is larger than the available space in 317 * read buffer, the maximum number of bytes to be transferred 318 * will be the available space in read buffer. It is up to application 319 * to resize the read buffer in read_handler() and restart async read 320 * if the available space was not enough to contain the whole message. 321 * 322 * @param socket Stream socket associated to this handler. 323 * @param ec Error code. 324 * @param bytes_transferred Number of bytes transferred so far. 325 * 326 * @return Maximum number of bytes to read to complete the 327 * read operation. 328 */ 329 virtual size_t read_completion_condition(AsioSocket& socket, 330 const AsioErrorCode&, 331 size_t bytes_transferred) = 0; 332 virtual void read_handler(AsioSocket&, const AsioErrorCode&, 333 size_t bytes_transferred) = 0; 334 335 }; 336 337 /** 338 * @class AsioSocket 339 * 340 * Abstract interface for stream socket implementations. 341 * 342 * Although the interface provides both sync and async operations, 343 * those should never be mixed. If the socket is connected 344 * via connect() call (or accepted via AsioAcceptor::accept() call), 345 * the underlying implementation uses blocking calls for 346 * reading and writing. On the other hand, if async_connect() 347 * or AsioAcceptor::async_accept() is used, the underlying implementation 348 * uses non-blocking operations. 349 */ 350 class AsioSocket 351 { 352 public: AsioSocket()353 AsioSocket() { } 354 355 AsioSocket(const AsioSocket&) = delete; 356 AsioSocket& operator=(const AsioSocket&) = delete; 357 ~AsioSocket()358 virtual ~AsioSocket() { } 359 360 /** 361 * Open the socket without connecting. 362 */ 363 virtual void open(const gu::URI& uri) = 0; 364 365 /** 366 * Return true if the socket is open. 367 */ 368 virtual bool is_open() const = 0; 369 370 /** 371 * Close the socket. 372 */ 373 virtual void close() = 0; 374 375 /** 376 * Bind the socket to interface specified by address. 377 */ 378 virtual void bind(const gu::AsioIpAddress&) = 0; 379 380 // Asynchronous operations 381 382 virtual void async_connect( 383 const gu::URI& uri, 384 const std::shared_ptr<AsioSocketHandler>& handler) = 0; 385 /** 386 * Call once. Next call can be made after socket handler is called 387 * with bytes transferred equal to last write size. 388 */ 389 virtual void async_write( 390 const std::array<AsioConstBuffer, 2>&, 391 const std::shared_ptr<AsioSocketHandler>& handler) = 0; 392 393 /** 394 * Call once. Next call can be done from socket handler 395 * read_handler or read_completion_condition. 396 */ 397 virtual void async_read( 398 const AsioMutableBuffer&, 399 const std::shared_ptr<AsioSocketHandler>& handler) = 0; 400 401 // Synchronous operations 402 403 /** 404 * Connect to remote endpoint specified by uri. 405 * 406 * @throw gu::Exception in case of error. 407 */ 408 virtual void connect(const gu::URI& uri) = 0; 409 410 /** 411 * Write contents of buffer into socket. This call blocks until 412 * all data has been written or error occurs. 413 * 414 * @throw gu::Exception in case of error. 415 */ 416 virtual size_t write(const AsioConstBuffer& buffer) = 0; 417 418 /** 419 * Read data from socket into buffer. The value returned is the 420 * number of bytes read so far. 421 * 422 * @throw gu::Exception in case of error. 423 */ 424 virtual size_t read(const AsioMutableBuffer& buffer) = 0; 425 426 // Utility operations. 427 428 /** 429 * Return address URI of local endpoint. Return empty string 430 * if not connected. 431 */ 432 virtual std::string local_addr() const = 0; 433 434 /** 435 * Return address URI of remote endpoint. Returns empty string 436 * if not connected. 437 */ 438 virtual std::string remote_addr() const = 0; 439 440 /** 441 * Set receive buffer size for the socket. This must be called 442 * before the socket is connected/accepted. 443 */ 444 virtual void set_receive_buffer_size(size_t) = 0; 445 446 /** 447 * Return currently effective receive buffer size. 448 */ 449 virtual size_t get_receive_buffer_size() = 0; 450 451 /** 452 * Set send buffer size for the socket. This must be called 453 * before the socket is connected/accepted. 454 */ 455 virtual void set_send_buffer_size(size_t) = 0; 456 457 /** 458 * Return currently effective send buffer size. 459 */ 460 virtual size_t get_send_buffer_size() = 0; 461 462 /** 463 * Read tcp_info struct from the underlying TCP socket. 464 */ 465 #ifndef __DragonFly__ 466 virtual struct tcp_info get_tcp_info() = 0; 467 #endif 468 }; 469 470 /** 471 * Helper template to write buffer sequences. 472 * 473 * @todo This should probably be optimized by implementing 474 * AsioSocket::write() overload which takes iterators to 475 * buffer sequences. 476 */ 477 template <class ConstBufferSequence> write(AsioSocket & socket,const ConstBufferSequence & bufs)478 size_t write(AsioSocket& socket, const ConstBufferSequence& bufs) 479 { 480 size_t written(0); 481 for (auto b(bufs.begin()); b != bufs.end(); ++b) 482 { 483 if (b->size() > 0) 484 { 485 written += socket.write(AsioConstBuffer(b->data(), b->size())); 486 } 487 } 488 return written; 489 } 490 491 class AsioDatagramSocket; 492 class AsioDatagramSocketHandler 493 { 494 public: ~AsioDatagramSocketHandler()495 virtual ~AsioDatagramSocketHandler() { } 496 virtual void read_handler(AsioDatagramSocket&, const AsioErrorCode&, 497 size_t bytes_transferred) = 0; 498 }; 499 500 /** 501 * @class AsioDatagramSocket 502 * 503 * Abstract interface for datagram socket implementations. 504 */ 505 class AsioDatagramSocket 506 { 507 public: AsioDatagramSocket()508 AsioDatagramSocket() { } ~AsioDatagramSocket()509 virtual ~AsioDatagramSocket() { } 510 511 /** 512 * Open the socket. 513 */ 514 virtual void open(const URI&) = 0; 515 516 /** 517 * Connect the socket to desired local endpoint. The socket 518 * will be bound to endpoint specified the uri. If the uri 519 * contains a multicast address, the connect will join the 520 * multicast group automatically. 521 */ 522 virtual void connect(const URI& uri) = 0; 523 524 /** 525 * Close the socket. If the socket was joined to multicast group, 526 * the group is left automatically. 527 */ 528 virtual void close() = 0; 529 530 /** 531 * Performa a write to the socket. The write is best effort only 532 * and the message can be dropped because of various reasons like 533 * kernel send buffer being full, network dropping the packet or 534 * receiving end(s) dropping the packet for whatever reason. 535 * 536 * The socket must be connected before writing into it. 537 * If connect() is not called, send_to() can be used to send 538 * datagram into desired address. 539 * 540 * @param bufs Array of two buffers. 541 * 542 * @throw gu::Exception If an other error than message being dropped 543 * occurs, an exception containing the error code will be thrown. 544 */ 545 virtual void write(const std::array<AsioConstBuffer, 2>& bufs) = 0; 546 547 /** 548 * Send a datagram to destination given by target. Sending a 549 * message is best effort only, the message may be dropped 550 * because of whatever reason and no error is given if the 551 * target endpoint does not exist. 552 */ 553 virtual void send_to(const std::array<AsioConstBuffer, 2>& bufs, 554 const AsioIpAddress& target_host, 555 unsigned short target_port) = 0; 556 557 /** 558 * Start asynchronous read from the socket. The socket handler 559 * read_handler() method will be called for each complete message 560 * which has been received. 561 */ 562 virtual void async_read(const AsioMutableBuffer&, 563 const std::shared_ptr<AsioDatagramSocketHandler>& handler) = 0; 564 565 /** 566 * Return address containing the local endpoint where the socket 567 * was bound to. 568 * 569 * @todo Maybe this should be bind addr and corresponding call 570 * connected_addr() should be introduced. 571 */ 572 virtual std::string local_addr() const = 0; 573 }; 574 575 class AsioAcceptor; 576 class AsioAcceptorHandler 577 { 578 public: ~AsioAcceptorHandler()579 virtual ~AsioAcceptorHandler() { } 580 virtual void accept_handler(AsioAcceptor&, 581 const std::shared_ptr<AsioSocket>&, 582 const gu::AsioErrorCode&) = 0; 583 }; 584 585 // Forward declaration for AsioAcceptor and make_socket() 586 class AsioStreamEngine; 587 588 589 /** @class AsioAcceptor 590 * 591 * Acceptor interface for stream sockets. 592 */ 593 class AsioAcceptor 594 { 595 public: AsioAcceptor()596 AsioAcceptor() { } 597 AsioAcceptor(const AsioAcceptor&) = delete; 598 AsioAcceptor& operator=(const AsioAcceptor&) = delete; ~AsioAcceptor()599 virtual ~AsioAcceptor() { } 600 virtual void open(const gu::URI& uri) = 0; 601 virtual void listen(const gu::URI& uri) = 0; 602 virtual void close() = 0; 603 virtual void async_accept(const std::shared_ptr<AsioAcceptorHandler>&, 604 const std::shared_ptr<AsioStreamEngine>& engine = nullptr) = 0; 605 virtual std::shared_ptr<AsioSocket> accept() = 0; 606 virtual std::string listen_addr() const = 0; 607 virtual unsigned short listen_port() const = 0; 608 609 /** 610 * Set receive buffer size for the acceptor. This must be called 611 * before listening. 612 */ 613 virtual void set_receive_buffer_size(size_t) = 0; 614 615 /** 616 * Return currently effective receive buffer size. 617 */ 618 virtual size_t get_receive_buffer_size() = 0; 619 620 /** 621 * Set send buffer size for the acceptor. This must be called 622 * before listening. 623 */ 624 virtual void set_send_buffer_size(size_t) = 0; 625 626 virtual size_t get_send_buffer_size() = 0; 627 628 }; 629 630 class AsioIoService 631 { 632 public: 633 AsioIoService(const gu::Config& conf = gu::Config()); 634 ~AsioIoService(); 635 AsioIoService(const AsioIoService&) = delete; 636 AsioIoService operator=(const AsioIoService&) = delete; 637 638 /** 639 * Handle global signals. 640 */ 641 void handle_signal(const gu::Signals::SignalType&); 642 643 /** 644 * Is dynamic socket enabled 645 */ dynamic_socket_enabled() const646 bool dynamic_socket_enabled() const 647 { 648 return dynamic_socket_; 649 } 650 651 /** 652 * Is SSL enabled and configured 653 */ 654 bool ssl_enabled() const; 655 656 /** 657 * Load crypto context. 658 */ 659 void load_crypto_context(); 660 661 /** 662 * Run one IO service handler. 663 */ 664 void run_one(); 665 666 /** 667 * Run until IO service is stopped or runs out of work. 668 */ 669 void run(); 670 671 /** 672 * Post a function for execution. The function will be invoked 673 * from inside run() or run_one(). 674 */ 675 void post(std::function<void()>); 676 677 /** 678 * Stop the IO service processing loop and return from run_one() 679 * or run() calls as soon as possible. Call to reset() is required 680 * to start processing via run_one() or run() after stop() has 681 * been called. 682 */ 683 void stop(); 684 685 /** 686 * Reset the IO service for subsequent call to run() or run_one(). 687 * This function must not be called from inside run() or run_one(). 688 */ 689 void reset(); 690 691 /** 692 * Make a new socket. The underlying transport will be 693 * a stream socket (TCP, SSL). 694 * 695 * @param uri An URI describing a desired socket scheme. 696 * @param handler Pointer to socket handler implementation. 697 * 698 * @return Shared pointer to AsioSocket. 699 */ 700 std::shared_ptr<AsioSocket> make_socket( 701 const gu::URI& uri, 702 const std::shared_ptr<AsioStreamEngine>& engine = nullptr); 703 704 /** 705 * Make a new datagram socket. The underlying transport 706 * will be a datagram socket (UDP). 707 * 708 * @param uri An URI describing a desired socket scheme. 709 * @param handler Pointer to socket handler implementation. 710 * 711 * @return Shared pointer to AsioDatagramSocket. 712 */ 713 std::shared_ptr<AsioDatagramSocket> make_datagram_socket( 714 const gu::URI& uri); 715 716 /** 717 * Make a new acceptor. 718 * 719 * @param uri Uri describing a desired socket scheme. 720 * @param acceptor_handler Pointer to acceptor handler implementation. 721 * @param socket_handler Pointer to socket handler implementation. 722 * 723 * @return Shared pointer to AsioSocketAcceptor. 724 */ 725 std::shared_ptr<AsioAcceptor> make_acceptor(const gu::URI& uri); 726 727 class Impl; 728 Impl& impl(); 729 private: 730 std::unique_ptr<Impl> impl_; 731 const gu::Config& conf_; 732 gu::Signals::signal_connection signal_connection_; 733 bool dynamic_socket_; 734 }; 735 736 class AsioSteadyTimerHandler 737 { 738 public: ~AsioSteadyTimerHandler()739 virtual ~AsioSteadyTimerHandler() { } 740 virtual void handle_wait(const AsioErrorCode&) = 0; 741 }; 742 743 #if (__GNUC__ == 4 && __GNUC_MINOR__ == 4) 744 typedef std::chrono::monotonic_clock AsioClock; 745 #else 746 typedef std::chrono::steady_clock AsioClock; 747 #endif // (__GNUC__ == 4 && __GNUC_MINOR__ == 4) 748 749 class AsioSteadyTimer 750 { 751 public: 752 AsioSteadyTimer(AsioIoService& io_service); 753 ~AsioSteadyTimer(); 754 AsioSteadyTimer(const AsioSteadyTimer&) = delete; 755 AsioSteadyTimer& operator=(const AsioSteadyTimer&) = delete; 756 void expires_from_now(const AsioClock::duration&); 757 void async_wait(const std::shared_ptr<AsioSteadyTimerHandler>&); 758 void cancel(); 759 private: 760 class Impl; 761 std::unique_ptr<Impl> impl_; 762 }; 763 } 764 765 #endif // GU_ASIO_HPP 766