1 /*
2  * Copyright (c) 2015, 2021, Oracle and/or its affiliates.
3  *
4  * This program is free software; you can redistribute it and/or modify
5  * it under the terms of the GNU General Public License, version 2.0,
6  * as published by the Free Software Foundation.
7  *
8  * This program is also distributed with certain software (including
9  * but not limited to OpenSSL) that is licensed under separate terms,
10  * as designated in a particular file or component or in included license
11  * documentation.  The authors of MySQL hereby grant you an additional
12  * permission to link the program and your derivative works with the
13  * separately licensed software that they have included with MySQL.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  * GNU General Public License, version 2.0, for more details.
19  *
20  * You should have received a copy of the GNU General Public License
21  * along with this program; if not, write to the Free Software
22  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
23  * 02110-1301  USA
24  */
25 
26 #include "ngs/interface/connection_acceptor_interface.h"
27 #include "ngs_common/connection_vio.h"
28 #include "ngs_common/operations_factory.h"
29 #include <algorithm>
30 #include "ngs/socket_events.h"
31 
32 // Surpressing numerous warnings generated by libevent on Windows.
33 #ifdef WIN32
34 #pragma warning(push)
35 #pragma warning(disable: 4005)
36 #endif // WIN32
37 #include <event.h> // libevent
38 #ifdef WIN32
39 #pragma warning(pop)
40 #endif // WIN32
41 
42 namespace ngs {
43 
44 class Connection_acceptor_socket : public Connection_acceptor_interface {
45 public:
46   typedef Socket_interface::Shared_ptr Socket_ptr;
47 
Connection_acceptor_socket(Socket_ptr listener,System_interface & system_interface)48   Connection_acceptor_socket(Socket_ptr listener, System_interface &system_interface)
49   : m_socket_listener(listener),
50     m_system_interface(system_interface) {
51   }
52 
accept()53   Vio *accept() {
54     Vio *vio;
55     sockaddr_storage accept_address;
56     MYSQL_SOCKET sock = MYSQL_INVALID_SOCKET;
57 
58     for (int i = 0; i < MAX_ACCEPT_REATTEMPT; ++i) {
59       socklen_t accept_len = sizeof(accept_address);
60 
61       sock = m_socket_listener->accept(KEY_socket_x_client_connection, (struct sockaddr*)&accept_address, &accept_len);
62 
63       if (INVALID_SOCKET != mysql_socket_getfd(sock))
64         break;
65 
66       const int error_code  = m_system_interface.get_socket_errno();
67       if (error_code != SOCKET_EINTR &&
68           error_code != SOCKET_EAGAIN)
69         return NULL;
70     }
71 
72     const bool is_tcpip = (accept_address.ss_family == AF_INET || accept_address.ss_family == AF_INET6);
73     vio = mysql_socket_vio_new(sock, is_tcpip ? VIO_TYPE_TCPIP : VIO_TYPE_SOCKET, 0);
74     if (!vio)
75       throw std::bad_alloc();
76 
77     // enable TCP_NODELAY
78     vio_fastsend(vio);
79     vio_keepalive(vio, TRUE);
80 
81     return vio;
82   }
83 
84 private:
85   Socket_ptr m_socket_listener;
86   System_interface &m_system_interface;
87   static const int MAX_ACCEPT_REATTEMPT = 10;
88 };
89 
90 
91 struct Socket_events::Timer_data {
92   ngs::function<bool ()> callback;
93   event ev;
94   timeval tv;
95   Socket_events *self;
96 
freengs::Socket_events::Timer_data97   static void free(Timer_data *data) {
98     evtimer_del(&data->ev);
99     ngs::free_object(data);
100   }
101 };
102 
103 
104 struct Socket_events::Socket_data {
105   ngs::function<void (Connection_acceptor_interface &)> callback;
106   event ev;
107   Socket_interface::Shared_ptr socket;
108 
freengs::Socket_events::Socket_data109   static void free(Socket_data *data) {
110     event_del(&data->ev);
111     ngs::free_object(data);
112   }
113 };
114 
115 
Socket_events()116 Socket_events::Socket_events() {
117   m_evbase = event_base_new();
118 
119   if (!m_evbase)
120     throw std::bad_alloc();
121 }
122 
~Socket_events()123 Socket_events::~Socket_events() {
124   std::for_each(m_timer_events.begin(),
125                 m_timer_events.end(),
126                 &Timer_data::free);
127 
128   std::for_each(m_socket_events.begin(),
129                 m_socket_events.end(),
130                 &Socket_data::free);
131 
132   event_base_free(m_evbase);
133 }
134 
listen(Socket_interface::Shared_ptr sock,ngs::function<void (Connection_acceptor_interface &)> callback)135 bool Socket_events::listen(Socket_interface::Shared_ptr sock, ngs::function<void (Connection_acceptor_interface &)> callback) {
136   m_socket_events.push_back(ngs::allocate_object<Socket_data>());
137   Socket_data *socket_event = m_socket_events.back();
138 
139   socket_event->callback = callback;
140   socket_event->socket = sock;
141 
142   event_set(&socket_event->ev, static_cast<int>(sock->get_socket_fd()), EV_READ|EV_PERSIST, &Socket_events::socket_data_avaiable, socket_event);
143   event_base_set(m_evbase, &socket_event->ev);
144 
145   return 0 == event_add(&socket_event->ev, NULL);
146 }
147 
148 /** Register a callback to be executed in a fixed time interval.
149 
150 The callback is called from the server's event loop thread, until either
151 the server is stopped or the callback returns false.
152 
153 NOTE: This method may only be called from the same thread as the event loop.
154 */
add_timer(const std::size_t delay_ms,ngs::function<bool ()> callback)155 void Socket_events::add_timer(const std::size_t delay_ms, ngs::function<bool ()> callback) {
156   Timer_data *data = ngs::allocate_object<Timer_data>();
157   data->tv.tv_sec = static_cast<long>(delay_ms / 1000);
158   data->tv.tv_usec = (delay_ms % 1000) * 1000;
159   data->callback = callback;
160   data->self = this;
161   //XXX use persistent timer events after switch to libevent2
162   evtimer_set(&data->ev, timeout_call, data);
163   event_base_set(m_evbase, &data->ev);
164   evtimer_add(&data->ev, &data->tv);
165 
166   Mutex_lock lock(m_timers_mutex);
167   m_timer_events.push_back(data);
168 }
169 
loop()170 void Socket_events::loop() {
171   event_base_loop(m_evbase, 0);
172 }
173 
break_loop()174 void Socket_events::break_loop() {
175   event_base_loopbreak(m_evbase);
176 }
177 
timeout_call(socket_type sock,short which,void * arg)178 void Socket_events::timeout_call(socket_type sock, short which, void *arg) {
179   Timer_data *data = (Timer_data*)arg;
180 
181   if (!data->callback()) {
182     evtimer_del(&data->ev);
183 
184     {
185       Mutex_lock timer_lock(data->self->m_timers_mutex);
186       data->self->m_timer_events.erase(std::remove(data->self->m_timer_events.begin(), data->self->m_timer_events.end(), data),
187                 data->self->m_timer_events.end());
188     }
189 
190     ngs::free_object(data);
191   }
192   else {
193     // schedule for another round
194     evtimer_add(&data->ev, &data->tv);
195   }
196 }
197 
socket_data_avaiable(socket_type sock,short which,void * arg)198 void Socket_events::socket_data_avaiable(socket_type sock, short which, void *arg) {
199   Socket_data *data = (Socket_data*)arg;
200   Operations_factory operations_factory;
201   System_interface::Shared_ptr system_interface(operations_factory.create_system_interface());
202   Connection_acceptor_socket acceptor(data->socket, *system_interface);
203 
204   data->callback(acceptor);
205 }
206 
207 }  // namespace ngs
208