1 //
2 // Copyright (C) 2014-2020 Codership Oy <info@codership.com>
3 //
4 
5 
6 //
7 // Common ASIO methods and configuration options for Galera
8 //
9 
10 #ifndef GU_ASIO_HPP
11 #define GU_ASIO_HPP
12 
13 #include "gu_config.hpp"
14 #include "gu_uri.hpp"
15 #include "gu_signals.hpp"
16 
17 #include <netinet/tcp.h> // tcp_info
18 
19 #include <array>
20 #include <chrono>
21 #include <functional>
22 #include <memory>
23 #include <string>
24 
25 namespace gu
26 {
27     // URI schemes for networking
28     namespace scheme
29     {
30         const std::string tcp("tcp"); /// TCP scheme
31         const std::string udp("udp"); /// UDP scheme
32         const std::string ssl("ssl"); /// SSL scheme
33         const std::string def("tcp"); /// default scheme (TCP)
34     }
35 
36     namespace conf
37     {
38         // Enable dynamic socket support
39         const std::string socket_dynamic("socket.dynamic");
40     }
41 
42 #ifdef GALERA_HAVE_SSL
43     //
44     // SSL
45     //
46 
47     // Configuration options for sockets
48     namespace conf
49     {
50         /// Enable SSL explicitly
51         const std::string use_ssl("socket.ssl");
52         /// SSL cipher list
53         const std::string ssl_cipher("socket.ssl_cipher");
54         /// SSL compression algorithm
55         const std::string ssl_compression("socket.ssl_compression");
56         /// SSL private key file
57         const std::string ssl_key("socket.ssl_key");
58         /// SSL certificate file
59         const std::string ssl_cert("socket.ssl_cert");
60         /// SSL CA file
61         const std::string ssl_ca("socket.ssl_ca");
62         /// SSL password file
63         const std::string ssl_password_file("socket.ssl_password_file");
64         // SSL reload
65         const std::string ssl_reload("socket.ssl_reload");
66     }
67 
68 
69     // register ssl parameters to config
70     void ssl_register_params(gu::Config&);
71 
72     // initialize defaults, verify set options
73     void ssl_init_options(gu::Config&);
74 
75     // update ssl parameters
76     void ssl_param_set(const std::string&, const std::string&, gu::Config&);
77 #else
ssl_register_params(gu::Config &)78     static inline void ssl_register_params(gu::Config&) { }
ssl_init_options(gu::Config &)79     static inline void ssl_init_options(gu::Config&) { }
80 #endif // GALERA_HAVE_SSL
81 
82     //
83     // Address manipulation helpers
84     //
85 
86     /**
87      * @class AsioIpAddressV4
88      *
89      * A wrapper around asio::ip::address_v4
90      */
91     class AsioIpAddressV4
92     {
93     public:
94         AsioIpAddressV4();
95         AsioIpAddressV4(const AsioIpAddressV4&);
96         AsioIpAddressV4& operator=(AsioIpAddressV4);
97         ~AsioIpAddressV4();
98         bool is_multicast() const;
99         class Impl;
100         Impl& impl();
101         const Impl& impl() const;
102     private:
103         std::unique_ptr<Impl> impl_;
104     };
105 
106     /**
107      * @class AsioIpAddressV6
108      *
109      * A wrapper around asio::ip::address_v6
110      */
111     class AsioIpAddressV6
112     {
113     public:
114         AsioIpAddressV6();
115         AsioIpAddressV6(const AsioIpAddressV6&);
116         AsioIpAddressV6& operator=(AsioIpAddressV6);
117         ~AsioIpAddressV6();
118         bool is_link_local() const;
119         unsigned long scope_id() const;
120         bool is_multicast() const;
121         class Impl;
122         Impl& impl();
123         const Impl& impl() const;
124     private:
125         std::unique_ptr<Impl> impl_;
126     };
127 
128     /**
129      * @class AsioIpAddressV6
130      *
131      * A wrapper around asio::ip::address
132      */
133     class AsioIpAddress
134     {
135     public:
136         class Impl;
137         AsioIpAddress();
138         AsioIpAddress(const AsioIpAddress&);
139         AsioIpAddress& operator=(AsioIpAddress);
140         ~AsioIpAddress();
141         bool is_v4() const;
142         bool is_v6() const;
143         AsioIpAddressV4 to_v4() const;
144         AsioIpAddressV6 to_v6() const;
145         Impl& impl();
146         const Impl& impl() const;
147     private:
148         std::unique_ptr<Impl> impl_;
149     };
150 
151     // Return any address string.
152     std::string any_addr(const AsioIpAddress& addr);
153 
154     // Escape address string. Surrounds IPv6 address with [].
155     // IPv4 addresses not affected.
156     std::string escape_addr(const AsioIpAddress& addr);
157 
158     // Unescape address string. Remove [] from around the address if found.
159     std::string unescape_addr(const std::string& addr);
160 
161     // Construct asio::ip::address from address string
162     AsioIpAddress make_address(const std::string& addr);
163 
164 
165     class AsioMutableBuffer
166     {
167     public:
AsioMutableBuffer()168         AsioMutableBuffer() : data_(), size_() { }
AsioMutableBuffer(void * data,size_t size)169         AsioMutableBuffer(void* data, size_t size)
170             : data_(data)
171             , size_(size)
172         { }
data() const173         void* data() const { return  data_; }
size() const174         size_t size() const { return size_; }
175     private:
176         void* data_;
177         size_t size_;
178     };
179 
180     class AsioConstBuffer
181     {
182     public:
AsioConstBuffer()183         AsioConstBuffer()
184             : data_()
185             , size_()
186         { }
AsioConstBuffer(const void * data,size_t size)187         AsioConstBuffer(const void* data, size_t size)
188             : data_(data)
189             , size_(size)
190         { }
data() const191         const void* data() const { return data_; }
size() const192         size_t size() const { return size_; }
193     private:
194         const void* data_;
195         size_t size_;
196     };
197 
198 
199     class AsioErrorCategory;
200     class AsioErrorCode
201     {
202     public:
203         /**
204          * A default constructor. Constructs success error code.
205          */
206         AsioErrorCode();
207         /**
208          * A constructor to generate error codes from system error codes.
209          */
210         AsioErrorCode(int value);
211 
212         /**
213          * A constructor to generate error codes from asio errors.
214          */
AsioErrorCode(int value,const AsioErrorCategory & category)215         AsioErrorCode(int value, const AsioErrorCategory& category)
216             : value_(value)
217             , category_(&category)
218             , error_extra_()
219         { }
220 
AsioErrorCode(int value,const AsioErrorCategory & category,int error_extra)221         AsioErrorCode(int value, const AsioErrorCategory& category,
222                       int error_extra)
223             : value_(value)
224             , category_(&category)
225             , error_extra_(error_extra)
226         { }
227         /**
228          * Return error number.
229          */
value() const230         int value() const { return value_; }
231 
category() const232         const AsioErrorCategory* category() const { return category_; }
233 
234         /**
235          * Return human readable error message.
236          */
237         std::string message() const;
238 
operator bool() const239         operator bool() const { return value_; }
240 
operator !() const241         bool operator!() const { return value_ == 0; }
242 
243         /**
244          * Return true if the error is end of file.
245          */
246         bool is_eof() const;
247 
248         /**
249          * Return true if the error is system error.
250          */
251         bool is_system() const;
252 
253     private:
254         int value_;
255         const AsioErrorCategory* category_;
256         // Extra category specific error information
257         int error_extra_;
258     };
259 
260     std::ostream& operator<<(std::ostream&, const AsioErrorCode&);
261 
262     /*
263      * Helper to determine if the error code originates from an
264      * event which happens often and pollutes logs but for which the error
265      * does not provide any helpful information.
266      *
267      * Errors which happen frequently during cluster configuration changes
268      * and when connections break are considered verbose.
269      *
270      * Certain SSL errors such as 'short read' error are considered verbose.
271      *
272      * All errors which originate from TLS service hooks are considered
273      * verbose, it is up to application report them if appropriate.
274      */
275     bool is_verbose_error(const AsioErrorCode&);
276 
277     // TODO: Hide extra error info from public interface. It should be
278     // called internally by calls which produce human readable error messages.
279 #ifdef GALERA_HAVE_SSL
280     std::string extra_error_info(const gu::AsioErrorCode& ec);
281 #else // GALERA_HAVE_SSL
extra_error_info(const gu::AsioErrorCode &)282     static inline std::string extra_error_info(const gu::AsioErrorCode&)
283     { return ""; }
284 #endif // GALERA_HAVE_SSL
285     class AsioSocket;
286     /**
287      * Abstract interface for asio socket handlers.
288      */
289     class AsioSocketHandler
290     {
291     public:
~AsioSocketHandler()292         virtual ~AsioSocketHandler() { }
293         /**
294          * This will be called after asynchronous connection to the
295          * remote endpoint after call to AsioSocket::async_connect()
296          * completes.
297          *
298          * All internal protocol handshakes (e.g. SSL) will be completed
299          * before this handler is called.
300          *
301          * @param socket Reference to socket which initiated the call.
302          * @param ec     Error code.
303          */
304         virtual void connect_handler(AsioSocket& socket,
305                                      const AsioErrorCode& ec) = 0;
306         virtual void write_handler(AsioSocket&,
307                                    const AsioErrorCode&,
308                                    size_t bytes_transferred) = 0;
309         /**
310          * This call is called every time more data has been written
311          * into receive buffer submitted via async_read() call.
312          * The return value of the call should be maximum number of
313          * bytes to be transferred before the read is considered
314          * complete and read_handler() will be called.
315          *
316          * If the returned value is larger than the available space in
317          * read buffer, the maximum number of bytes to be transferred
318          * will be the available space in read buffer. It is up to application
319          * to resize the read buffer in read_handler() and restart async read
320          * if the available space was not enough to contain the whole message.
321          *
322          * @param socket Stream socket associated to this handler.
323          * @param ec Error code.
324          * @param bytes_transferred Number of bytes transferred so far.
325          *
326          * @return Maximum number of bytes to read to complete the
327          *         read operation.
328          */
329         virtual size_t read_completion_condition(AsioSocket& socket,
330                                                  const AsioErrorCode&,
331                                                  size_t bytes_transferred) = 0;
332         virtual void read_handler(AsioSocket&, const AsioErrorCode&,
333                                   size_t bytes_transferred) = 0;
334 
335     };
336 
337     /**
338      * @class AsioSocket
339      *
340      * Abstract interface for stream socket implementations.
341      *
342      * Although the interface provides both sync and async operations,
343      * those should never be mixed. If the socket is connected
344      * via connect() call (or accepted via AsioAcceptor::accept() call),
345      * the underlying implementation uses blocking calls for
346      * reading and writing.  On the other hand, if async_connect()
347      * or AsioAcceptor::async_accept() is used, the underlying implementation
348      * uses non-blocking operations.
349      */
350     class AsioSocket
351     {
352     public:
AsioSocket()353         AsioSocket() { }
354 
355         AsioSocket(const AsioSocket&) = delete;
356         AsioSocket& operator=(const AsioSocket&) = delete;
357 
~AsioSocket()358         virtual ~AsioSocket() { }
359 
360         /**
361          * Open the socket without connecting.
362          */
363         virtual void open(const gu::URI& uri) = 0;
364 
365         /**
366          * Return true if the socket is open.
367          */
368         virtual bool is_open() const = 0;
369 
370         /**
371          * Close the socket.
372          */
373         virtual void close() = 0;
374 
375         /**
376          * Bind the socket to interface specified by address.
377          */
378         virtual void bind(const gu::AsioIpAddress&) = 0;
379 
380         // Asynchronous operations
381 
382         virtual void async_connect(
383             const gu::URI& uri,
384             const std::shared_ptr<AsioSocketHandler>& handler) = 0;
385         /**
386          * Call once. Next call can be made after socket handler is called
387          * with bytes transferred equal to last write size.
388          */
389         virtual void async_write(
390             const std::array<AsioConstBuffer, 2>&,
391             const std::shared_ptr<AsioSocketHandler>& handler) = 0;
392 
393         /**
394          * Call once. Next call can be done from socket handler
395          * read_handler or read_completion_condition.
396          */
397         virtual void async_read(
398             const AsioMutableBuffer&,
399             const std::shared_ptr<AsioSocketHandler>& handler) = 0;
400 
401         // Synchronous operations
402 
403         /**
404          * Connect to remote endpoint specified by uri.
405          *
406          * @throw gu::Exception in case of error.
407          */
408         virtual void connect(const gu::URI& uri) = 0;
409 
410         /**
411          * Write contents of buffer into socket. This call blocks until
412          * all data has been written or error occurs.
413          *
414          * @throw gu::Exception in case of error.
415          */
416         virtual size_t write(const AsioConstBuffer& buffer) = 0;
417 
418         /**
419          * Read data from socket into buffer. The value returned is the
420          * number of bytes read so far.
421          *
422          * @throw gu::Exception in case of error.
423          */
424         virtual size_t read(const AsioMutableBuffer& buffer) = 0;
425 
426         // Utility operations.
427 
428         /**
429          * Return address URI of local endpoint. Return empty string
430          * if not connected.
431          */
432         virtual std::string local_addr() const = 0;
433 
434         /**
435          * Return address URI of remote endpoint. Returns empty string
436          * if not connected.
437          */
438         virtual std::string remote_addr() const = 0;
439 
440         /**
441          * Set receive buffer size for the socket. This must be called
442          * before the socket is connected/accepted.
443          */
444         virtual void set_receive_buffer_size(size_t) = 0;
445 
446         /**
447          * Return currently effective receive buffer size.
448          */
449         virtual size_t get_receive_buffer_size() = 0;
450 
451         /**
452          * Set send buffer size for the socket. This must be called
453          * before the socket is connected/accepted.
454          */
455         virtual void set_send_buffer_size(size_t) = 0;
456 
457         /**
458          * Return currently effective send buffer size.
459          */
460         virtual size_t get_send_buffer_size() = 0;
461 
462         /**
463          * Read tcp_info struct from the underlying TCP socket.
464          */
465 #ifndef __DragonFly__
466         virtual struct tcp_info get_tcp_info() = 0;
467 #endif
468     };
469 
470     /**
471      * Helper template to write buffer sequences.
472      *
473      * @todo This should probably be optimized by implementing
474      * AsioSocket::write() overload which takes iterators to
475      * buffer sequences.
476      */
477     template <class ConstBufferSequence>
write(AsioSocket & socket,const ConstBufferSequence & bufs)478     size_t write(AsioSocket& socket, const ConstBufferSequence& bufs)
479     {
480         size_t written(0);
481         for (auto b(bufs.begin()); b != bufs.end(); ++b)
482         {
483             if (b->size() > 0)
484             {
485                 written += socket.write(AsioConstBuffer(b->data(), b->size()));
486             }
487         }
488         return written;
489     }
490 
491     class AsioDatagramSocket;
492     class AsioDatagramSocketHandler
493     {
494     public:
~AsioDatagramSocketHandler()495         virtual ~AsioDatagramSocketHandler() { }
496         virtual void read_handler(AsioDatagramSocket&, const AsioErrorCode&,
497                                   size_t bytes_transferred) = 0;
498     };
499 
500     /**
501      * @class AsioDatagramSocket
502      *
503      * Abstract interface for datagram socket implementations.
504      */
505     class AsioDatagramSocket
506     {
507     public:
AsioDatagramSocket()508         AsioDatagramSocket() { }
~AsioDatagramSocket()509         virtual ~AsioDatagramSocket() { }
510 
511         /**
512          * Open the socket.
513          */
514         virtual void open(const URI&) = 0;
515 
516         /**
517          * Connect the socket to desired local endpoint. The socket
518          * will be bound to endpoint specified the uri. If the uri
519          * contains a multicast address, the connect will join the
520          * multicast group automatically.
521          */
522         virtual void connect(const URI& uri) = 0;
523 
524         /**
525          * Close the socket. If the socket was joined to multicast group,
526          * the group is left automatically.
527          */
528         virtual void close() = 0;
529 
530         /**
531          * Performa a write to the socket. The write is best effort only
532          * and the message can be dropped because of various reasons like
533          * kernel send buffer being full, network dropping the packet or
534          * receiving end(s) dropping the packet for whatever reason.
535          *
536          * The socket must be connected before writing into it.
537          * If connect() is not called, send_to() can be used to send
538          * datagram into desired address.
539          *
540          * @param bufs Array of two buffers.
541          *
542          * @throw gu::Exception If an other error than message being dropped
543          *        occurs, an exception containing the error code will be thrown.
544          */
545         virtual void write(const std::array<AsioConstBuffer, 2>& bufs) = 0;
546 
547         /**
548          * Send a datagram to destination given by target. Sending a
549          * message is best effort only, the message may be dropped
550          * because of whatever reason and no error is given if the
551          * target endpoint does not exist.
552          */
553         virtual void send_to(const std::array<AsioConstBuffer, 2>& bufs,
554                              const AsioIpAddress& target_host,
555                              unsigned short target_port) = 0;
556 
557         /**
558          * Start asynchronous read from the socket. The socket handler
559          * read_handler() method will be called for each complete message
560          * which has been received.
561          */
562         virtual void async_read(const AsioMutableBuffer&,
563                                 const std::shared_ptr<AsioDatagramSocketHandler>& handler) = 0;
564 
565         /**
566          * Return address containing the local endpoint where the socket
567          * was bound to.
568          *
569          * @todo Maybe this should be bind addr and corresponding call
570          * connected_addr() should be introduced.
571          */
572         virtual std::string local_addr() const = 0;
573     };
574 
575     class AsioAcceptor;
576     class AsioAcceptorHandler
577     {
578     public:
~AsioAcceptorHandler()579         virtual ~AsioAcceptorHandler() { }
580         virtual void accept_handler(AsioAcceptor&,
581                                     const std::shared_ptr<AsioSocket>&,
582                                     const gu::AsioErrorCode&) = 0;
583     };
584 
585     // Forward declaration for AsioAcceptor and make_socket()
586     class AsioStreamEngine;
587 
588 
589     /** @class AsioAcceptor
590      *
591      * Acceptor interface for stream sockets.
592      */
593     class AsioAcceptor
594     {
595     public:
AsioAcceptor()596         AsioAcceptor() { }
597         AsioAcceptor(const AsioAcceptor&) = delete;
598         AsioAcceptor& operator=(const AsioAcceptor&) = delete;
~AsioAcceptor()599         virtual ~AsioAcceptor() { }
600         virtual void open(const gu::URI& uri) = 0;
601         virtual void listen(const gu::URI& uri) = 0;
602         virtual void close() = 0;
603         virtual void async_accept(const std::shared_ptr<AsioAcceptorHandler>&,
604                                   const std::shared_ptr<AsioStreamEngine>& engine = nullptr) = 0;
605         virtual std::shared_ptr<AsioSocket> accept() = 0;
606         virtual std::string listen_addr() const = 0;
607         virtual unsigned short listen_port() const = 0;
608 
609         /**
610          * Set receive buffer size for the acceptor. This must be called
611          * before listening.
612          */
613         virtual void set_receive_buffer_size(size_t) = 0;
614 
615         /**
616          * Return currently effective receive buffer size.
617          */
618         virtual size_t get_receive_buffer_size() = 0;
619 
620         /**
621          * Set send buffer size for the acceptor. This must be called
622          * before listening.
623          */
624         virtual void set_send_buffer_size(size_t) = 0;
625 
626         virtual size_t get_send_buffer_size() = 0;
627 
628     };
629 
630     class AsioIoService
631     {
632     public:
633         AsioIoService(const gu::Config& conf = gu::Config());
634         ~AsioIoService();
635         AsioIoService(const AsioIoService&) = delete;
636         AsioIoService operator=(const AsioIoService&) = delete;
637 
638         /**
639          * Handle global signals.
640          */
641         void handle_signal(const gu::Signals::SignalType&);
642 
643         /**
644          * Is dynamic socket enabled
645          */
dynamic_socket_enabled() const646         bool dynamic_socket_enabled() const
647         {
648             return dynamic_socket_;
649         }
650 
651         /**
652          * Is SSL enabled and configured
653          */
654         bool ssl_enabled() const;
655 
656         /**
657          * Load crypto context.
658          */
659         void load_crypto_context();
660 
661         /**
662          * Run one IO service handler.
663          */
664         void run_one();
665 
666         /**
667          * Run until IO service is stopped or runs out of work.
668          */
669         void run();
670 
671         /**
672          * Post a function for execution. The function will be invoked
673          * from inside run() or run_one().
674          */
675         void post(std::function<void()>);
676 
677         /**
678          * Stop the IO service processing loop and return from run_one()
679          * or run() calls as soon as possible. Call to reset() is required
680          * to start processing via run_one() or run() after stop() has
681          * been called.
682          */
683         void stop();
684 
685         /**
686          * Reset the IO service for subsequent call to run() or run_one().
687          * This function must not be called from inside run() or run_one().
688          */
689         void reset();
690 
691         /**
692          * Make a new socket. The underlying transport will be
693          * a stream socket (TCP, SSL).
694          *
695          * @param uri An URI describing a desired socket scheme.
696          * @param handler Pointer to socket handler implementation.
697          *
698          * @return Shared pointer to AsioSocket.
699          */
700         std::shared_ptr<AsioSocket> make_socket(
701             const gu::URI& uri,
702             const std::shared_ptr<AsioStreamEngine>& engine = nullptr);
703 
704         /**
705          * Make a new datagram socket. The underlying transport
706          * will be a datagram socket (UDP).
707          *
708          * @param uri An URI describing a desired socket scheme.
709          * @param handler Pointer to socket handler implementation.
710          *
711          * @return Shared pointer to AsioDatagramSocket.
712          */
713         std::shared_ptr<AsioDatagramSocket> make_datagram_socket(
714             const gu::URI& uri);
715 
716         /**
717          * Make a new acceptor.
718          *
719          * @param uri Uri describing a desired socket scheme.
720          * @param acceptor_handler Pointer to acceptor handler implementation.
721          * @param socket_handler Pointer to socket handler implementation.
722          *
723          * @return Shared pointer to AsioSocketAcceptor.
724          */
725         std::shared_ptr<AsioAcceptor> make_acceptor(const gu::URI& uri);
726 
727         class Impl;
728         Impl& impl();
729     private:
730         std::unique_ptr<Impl> impl_;
731         const gu::Config& conf_;
732         gu::Signals::signal_connection signal_connection_;
733         bool dynamic_socket_;
734     };
735 
736     class AsioSteadyTimerHandler
737     {
738     public:
~AsioSteadyTimerHandler()739         virtual ~AsioSteadyTimerHandler() { }
740         virtual void handle_wait(const AsioErrorCode&) = 0;
741     };
742 
743 #if (__GNUC__ == 4 && __GNUC_MINOR__ == 4)
744     typedef std::chrono::monotonic_clock AsioClock;
745 #else
746     typedef std::chrono::steady_clock AsioClock;
747 #endif // (__GNUC__ == 4 && __GNUC_MINOR__ == 4)
748 
749     class AsioSteadyTimer
750     {
751     public:
752         AsioSteadyTimer(AsioIoService& io_service);
753         ~AsioSteadyTimer();
754         AsioSteadyTimer(const AsioSteadyTimer&) = delete;
755         AsioSteadyTimer& operator=(const AsioSteadyTimer&) = delete;
756         void expires_from_now(const AsioClock::duration&);
757         void async_wait(const std::shared_ptr<AsioSteadyTimerHandler>&);
758         void cancel();
759     private:
760         class Impl;
761         std::unique_ptr<Impl> impl_;
762     };
763 }
764 
765 #endif // GU_ASIO_HPP
766