1 /*
2 * Copyright (c) 2015, 2021, Oracle and/or its affiliates.
3 *
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License, version 2.0,
6 * as published by the Free Software Foundation.
7 *
8 * This program is also distributed with certain software (including
9 * but not limited to OpenSSL) that is licensed under separate terms,
10 * as designated in a particular file or component or in included license
11 * documentation. The authors of MySQL hereby grant you an additional
12 * permission to link the program and your derivative works with the
13 * separately licensed software that they have included with MySQL.
14 *
15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 * GNU General Public License, version 2.0, for more details.
19 *
20 * You should have received a copy of the GNU General Public License
21 * along with this program; if not, write to the Free Software
22 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
23 * 02110-1301 USA
24 */
25
26 #include "ngs/interface/connection_acceptor_interface.h"
27 #include "ngs_common/connection_vio.h"
28 #include "ngs_common/operations_factory.h"
29 #include <algorithm>
30 #include "ngs/socket_events.h"
31
32 // Surpressing numerous warnings generated by libevent on Windows.
33 #ifdef WIN32
34 #pragma warning(push)
35 #pragma warning(disable: 4005)
36 #endif // WIN32
37 #include <event.h> // libevent
38 #ifdef WIN32
39 #pragma warning(pop)
40 #endif // WIN32
41
42 namespace ngs {
43
44 class Connection_acceptor_socket : public Connection_acceptor_interface {
45 public:
46 typedef Socket_interface::Shared_ptr Socket_ptr;
47
Connection_acceptor_socket(Socket_ptr listener,System_interface & system_interface)48 Connection_acceptor_socket(Socket_ptr listener, System_interface &system_interface)
49 : m_socket_listener(listener),
50 m_system_interface(system_interface) {
51 }
52
accept()53 Vio *accept() {
54 Vio *vio;
55 sockaddr_storage accept_address;
56 MYSQL_SOCKET sock = MYSQL_INVALID_SOCKET;
57
58 for (int i = 0; i < MAX_ACCEPT_REATTEMPT; ++i) {
59 socklen_t accept_len = sizeof(accept_address);
60
61 sock = m_socket_listener->accept(KEY_socket_x_client_connection, (struct sockaddr*)&accept_address, &accept_len);
62
63 if (INVALID_SOCKET != mysql_socket_getfd(sock))
64 break;
65
66 const int error_code = m_system_interface.get_socket_errno();
67 if (error_code != SOCKET_EINTR &&
68 error_code != SOCKET_EAGAIN)
69 return NULL;
70 }
71
72 const bool is_tcpip = (accept_address.ss_family == AF_INET || accept_address.ss_family == AF_INET6);
73 vio = mysql_socket_vio_new(sock, is_tcpip ? VIO_TYPE_TCPIP : VIO_TYPE_SOCKET, 0);
74 if (!vio)
75 throw std::bad_alloc();
76
77 // enable TCP_NODELAY
78 vio_fastsend(vio);
79 vio_keepalive(vio, TRUE);
80
81 return vio;
82 }
83
84 private:
85 Socket_ptr m_socket_listener;
86 System_interface &m_system_interface;
87 static const int MAX_ACCEPT_REATTEMPT = 10;
88 };
89
90
91 struct Socket_events::Timer_data {
92 ngs::function<bool ()> callback;
93 event ev;
94 timeval tv;
95 Socket_events *self;
96
freengs::Socket_events::Timer_data97 static void free(Timer_data *data) {
98 evtimer_del(&data->ev);
99 ngs::free_object(data);
100 }
101 };
102
103
104 struct Socket_events::Socket_data {
105 ngs::function<void (Connection_acceptor_interface &)> callback;
106 event ev;
107 Socket_interface::Shared_ptr socket;
108
freengs::Socket_events::Socket_data109 static void free(Socket_data *data) {
110 event_del(&data->ev);
111 ngs::free_object(data);
112 }
113 };
114
115
Socket_events()116 Socket_events::Socket_events() {
117 m_evbase = event_base_new();
118
119 if (!m_evbase)
120 throw std::bad_alloc();
121 }
122
~Socket_events()123 Socket_events::~Socket_events() {
124 std::for_each(m_timer_events.begin(),
125 m_timer_events.end(),
126 &Timer_data::free);
127
128 std::for_each(m_socket_events.begin(),
129 m_socket_events.end(),
130 &Socket_data::free);
131
132 event_base_free(m_evbase);
133 }
134
listen(Socket_interface::Shared_ptr sock,ngs::function<void (Connection_acceptor_interface &)> callback)135 bool Socket_events::listen(Socket_interface::Shared_ptr sock, ngs::function<void (Connection_acceptor_interface &)> callback) {
136 m_socket_events.push_back(ngs::allocate_object<Socket_data>());
137 Socket_data *socket_event = m_socket_events.back();
138
139 socket_event->callback = callback;
140 socket_event->socket = sock;
141
142 event_set(&socket_event->ev, static_cast<int>(sock->get_socket_fd()), EV_READ|EV_PERSIST, &Socket_events::socket_data_avaiable, socket_event);
143 event_base_set(m_evbase, &socket_event->ev);
144
145 return 0 == event_add(&socket_event->ev, NULL);
146 }
147
148 /** Register a callback to be executed in a fixed time interval.
149
150 The callback is called from the server's event loop thread, until either
151 the server is stopped or the callback returns false.
152
153 NOTE: This method may only be called from the same thread as the event loop.
154 */
add_timer(const std::size_t delay_ms,ngs::function<bool ()> callback)155 void Socket_events::add_timer(const std::size_t delay_ms, ngs::function<bool ()> callback) {
156 Timer_data *data = ngs::allocate_object<Timer_data>();
157 data->tv.tv_sec = static_cast<long>(delay_ms / 1000);
158 data->tv.tv_usec = (delay_ms % 1000) * 1000;
159 data->callback = callback;
160 data->self = this;
161 //XXX use persistent timer events after switch to libevent2
162 evtimer_set(&data->ev, timeout_call, data);
163 event_base_set(m_evbase, &data->ev);
164 evtimer_add(&data->ev, &data->tv);
165
166 Mutex_lock lock(m_timers_mutex);
167 m_timer_events.push_back(data);
168 }
169
loop()170 void Socket_events::loop() {
171 event_base_loop(m_evbase, 0);
172 }
173
break_loop()174 void Socket_events::break_loop() {
175 event_base_loopbreak(m_evbase);
176 }
177
timeout_call(socket_type sock,short which,void * arg)178 void Socket_events::timeout_call(socket_type sock, short which, void *arg) {
179 Timer_data *data = (Timer_data*)arg;
180
181 if (!data->callback()) {
182 evtimer_del(&data->ev);
183
184 {
185 Mutex_lock timer_lock(data->self->m_timers_mutex);
186 data->self->m_timer_events.erase(std::remove(data->self->m_timer_events.begin(), data->self->m_timer_events.end(), data),
187 data->self->m_timer_events.end());
188 }
189
190 ngs::free_object(data);
191 }
192 else {
193 // schedule for another round
194 evtimer_add(&data->ev, &data->tv);
195 }
196 }
197
socket_data_avaiable(socket_type sock,short which,void * arg)198 void Socket_events::socket_data_avaiable(socket_type sock, short which, void *arg) {
199 Socket_data *data = (Socket_data*)arg;
200 Operations_factory operations_factory;
201 System_interface::Shared_ptr system_interface(operations_factory.create_system_interface());
202 Connection_acceptor_socket acceptor(data->socket, *system_interface);
203
204 data->callback(acceptor);
205 }
206
207 } // namespace ngs
208