1 /*
2 Copyright (c) 2003, 2019, Oracle and/or its affiliates. All rights reserved.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25
26 #include <ndb_global.h>
27
28 #include <SocketServer.hpp>
29
30 #include <NdbTCP.h>
31 #include <NdbOut.hpp>
32 #include <NdbThread.h>
33 #include <NdbSleep.h>
34 #include <NdbTick.h>
35 #include "ndb_socket.h"
36 #include <OwnProcessInfo.hpp>
37
/*
  Local debug tracing switch.

  With "#if 0" (the default) DEBUG_FPRINTF(...) expands to nothing.
  Changing it to "#if 1" makes the macro expand to an fprintf call.
  The caller supplies the complete fprintf argument list wrapped in an
  extra pair of parentheses, e.g. DEBUG_FPRINTF((stderr, "text\n")).
*/
#if 0
#define DEBUG_FPRINTF(arglist) do { fprintf arglist ; } while (0)
#else
#define DEBUG_FPRINTF(a)
#endif
43
/**
 * Construct a server with no accept thread running and no stop requested.
 *
 * @param maxSessions  Stored in m_maxSessions. doRun() stops accepting new
 *                     connections while the number of sessions is at or
 *                     above this value, and setup() uses it (capped) as the
 *                     listen backlog.
 */
SocketServer::SocketServer(unsigned maxSessions) :
  // NOTE(review): 10 and 5 look like initial sizes for the two vectors;
  // confirm against the Vector / MutexVector constructors.
  m_sessions(10),
  m_services(5),
  m_maxSessions(maxSessions),
  m_stopThread(false),
  m_thread(0)
{
}
52
~SocketServer()53 SocketServer::~SocketServer() {
54 unsigned i;
55 for(i = 0; i<m_sessions.size(); i++){
56 Session* session= m_sessions[i].m_session;
57 assert(session->m_refCount == 0);
58 delete session;
59 }
60 for(i = 0; i<m_services.size(); i++){
61 if(ndb_socket_valid(m_services[i].m_socket))
62 ndb_socket_close(m_services[i].m_socket);
63 delete m_services[i].m_service;
64 }
65 }
66
67 bool
tryBind(unsigned short port,const char * intface)68 SocketServer::tryBind(unsigned short port, const char * intface) {
69 struct sockaddr_in servaddr;
70 memset(&servaddr, 0, sizeof(servaddr));
71 servaddr.sin_family = AF_INET;
72 servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
73 servaddr.sin_port = htons(port);
74
75 if(intface != 0){
76 if(Ndb_getInAddr(&servaddr.sin_addr, intface))
77 return false;
78 }
79
80 const NDB_SOCKET_TYPE sock = ndb_socket_create(AF_INET, SOCK_STREAM, 0);
81 if (!ndb_socket_valid(sock))
82 return false;
83
84 DBUG_PRINT("info",("NDB_SOCKET: " MY_SOCKET_FORMAT,
85 MY_SOCKET_FORMAT_VALUE(sock)));
86
87 if (ndb_socket_reuseaddr(sock, true) == -1)
88 {
89 ndb_socket_close(sock);
90 return false;
91 }
92
93 if (ndb_bind_inet(sock, &servaddr) == -1) {
94 ndb_socket_close(sock);
95 return false;
96 }
97
98 ndb_socket_close(sock);
99 return true;
100 }
101
102 #define MAX_SOCKET_SERVER_TCP_BACKLOG 64
103 bool
setup(SocketServer::Service * service,unsigned short * port,const char * intface)104 SocketServer::setup(SocketServer::Service * service,
105 unsigned short * port,
106 const char * intface){
107 DBUG_ENTER("SocketServer::setup");
108 DBUG_PRINT("enter",("interface=%s, port=%u", intface, *port));
109 struct sockaddr_in servaddr;
110 memset(&servaddr, 0, sizeof(servaddr));
111 servaddr.sin_family = AF_INET;
112 servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
113 servaddr.sin_port = htons(*port);
114
115 if(intface != 0){
116 if(Ndb_getInAddr(&servaddr.sin_addr, intface))
117 DBUG_RETURN(false);
118 }
119
120 const NDB_SOCKET_TYPE sock = ndb_socket_create(AF_INET, SOCK_STREAM, 0);
121 if (!ndb_socket_valid(sock))
122 {
123 DBUG_PRINT("error",("socket() - %d - %s",
124 socket_errno, strerror(socket_errno)));
125 DBUG_RETURN(false);
126 }
127
128 DBUG_PRINT("info",("NDB_SOCKET: " MY_SOCKET_FORMAT,
129 MY_SOCKET_FORMAT_VALUE(sock)));
130
131 if (ndb_socket_reuseaddr(sock, true) == -1)
132 {
133 DBUG_PRINT("error",("setsockopt() - %d - %s",
134 errno, strerror(errno)));
135 ndb_socket_close(sock);
136 DBUG_RETURN(false);
137 }
138
139 if (ndb_bind_inet(sock, &servaddr) == -1) {
140 DBUG_PRINT("error",("bind() - %d - %s",
141 socket_errno, strerror(socket_errno)));
142 ndb_socket_close(sock);
143 DBUG_RETURN(false);
144 }
145
146 /* Get the address and port we bound to */
147 struct sockaddr_in serv_addr;
148 ndb_socket_len_t addr_len = sizeof(serv_addr);
149 if(ndb_getsockname(sock, (struct sockaddr *) &serv_addr, &addr_len))
150 {
151 ndbout_c("An error occurred while trying to find out what"
152 " port we bound to. Error: %d - %s",
153 ndb_socket_errno(), strerror(ndb_socket_errno()));
154 ndb_socket_close(sock);
155 DBUG_RETURN(false);
156 }
157 *port = ntohs(serv_addr.sin_port);
158 setOwnProcessInfoServerAddress(& serv_addr.sin_addr);
159
160 DBUG_PRINT("info",("bound to %u", *port));
161
162 if (ndb_listen(sock, m_maxSessions > MAX_SOCKET_SERVER_TCP_BACKLOG ?
163 MAX_SOCKET_SERVER_TCP_BACKLOG : m_maxSessions) == -1)
164 {
165 DBUG_PRINT("error",("listen() - %d - %s",
166 socket_errno, strerror(socket_errno)));
167 ndb_socket_close(sock);
168 DBUG_RETURN(false);
169 }
170
171 DEBUG_FPRINTF((stderr, "Listening on port: %u\n",
172 (Uint32)*port));
173
174 ServiceInstance i;
175 i.m_socket = sock;
176 i.m_service = service;
177 m_services.push_back(i);
178
179 // Increase size to allow polling all listening ports
180 m_services_poller.set_max_count(m_services.size());
181
182 DBUG_RETURN(true);
183 }
184
185 bool
doAccept()186 SocketServer::doAccept()
187 {
188 m_services.lock();
189
190 m_services_poller.clear();
191 for (unsigned i = 0; i < m_services.size(); i++)
192 {
193 m_services_poller.add(m_services[i].m_socket, true, false, true);
194 }
195 assert(m_services.size() == m_services_poller.count());
196
197 const int accept_timeout_ms = 1000;
198 const int ret = m_services_poller.poll(accept_timeout_ms);
199 if (ret < 0)
200 {
201 // Error occurred, indicate error to caller by returning false
202 m_services.unlock();
203 return false;
204 }
205
206 if (ret == 0)
207 {
208 // Timeout occurred
209 m_services.unlock();
210 return true;
211 }
212
213 bool result = true;
214 for (unsigned i = 0; i < m_services_poller.count(); i++)
215 {
216 const bool has_read = m_services_poller.has_read(i);
217
218 if (!has_read)
219 continue; // Ignore events where read flag wasn't set
220
221 ServiceInstance & si = m_services[i];
222 assert(m_services_poller.is_socket_equal(i, si.m_socket));
223
224 const NDB_SOCKET_TYPE childSock = ndb_accept(si.m_socket, 0, 0);
225 if (!ndb_socket_valid(childSock))
226 {
227 // Could not 'accept' socket(maybe at max fds), indicate error
228 // to caller by returning false
229 result = false;
230 continue;
231 }
232
233 SessionInstance s;
234 s.m_service = si.m_service;
235 s.m_session = si.m_service->newSession(childSock);
236 if (s.m_session != 0)
237 {
238 m_session_mutex.lock();
239 m_sessions.push_back(s);
240 startSession(m_sessions.back());
241 m_session_mutex.unlock();
242 }
243 }
244
245 m_services.unlock();
246 return result;
247 }
248
249 extern "C"
250 void*
socketServerThread_C(void * _ss)251 socketServerThread_C(void* _ss){
252 SocketServer * ss = (SocketServer *)_ss;
253 ss->doRun();
254 return 0;
255 }
256
257 struct NdbThread*
startServer()258 SocketServer::startServer()
259 {
260 m_threadLock.lock();
261 if(m_thread == 0 && m_stopThread == false)
262 {
263 m_thread = NdbThread_Create(socketServerThread_C,
264 (void**)this,
265 0, // default stack size
266 "NdbSockServ",
267 NDB_THREAD_PRIO_LOW);
268 }
269 m_threadLock.unlock();
270 return m_thread;
271 }
272
273 void
stopServer()274 SocketServer::stopServer(){
275 m_threadLock.lock();
276 if(m_thread != 0){
277 m_stopThread = true;
278
279 void * res;
280 NdbThread_WaitFor(m_thread, &res);
281 NdbThread_Destroy(&m_thread);
282 m_thread = 0;
283 }
284 m_threadLock.unlock();
285 }
286
287 void
doRun()288 SocketServer::doRun(){
289
290 while(!m_stopThread){
291 m_session_mutex.lock();
292 checkSessionsImpl();
293 m_session_mutex.unlock();
294
295 if(m_sessions.size() >= m_maxSessions){
296 // Don't accept more connections yet
297 DEBUG_FPRINTF((stderr, "Too many connections\n"));
298 NdbSleep_MilliSleep(200);
299 continue;
300 }
301
302 if (!doAccept()){
303 // accept failed, step back
304 DEBUG_FPRINTF((stderr, "Accept failed\n"));
305 NdbSleep_MilliSleep(200);
306 }
307 }
308 }
309
310 void
startSession(SessionInstance & si)311 SocketServer::startSession(SessionInstance & si){
312 si.m_thread = NdbThread_Create(sessionThread_C,
313 (void**)si.m_session,
314 0, // default stack size
315 "NdbSock_Session",
316 NDB_THREAD_PRIO_LOW);
317 }
318
319 void
foreachSession(void (* func)(SocketServer::Session *,void *),void * data)320 SocketServer::foreachSession(void (*func)(SocketServer::Session*, void *),
321 void *data)
322 {
323 // Build a list of pointers to all active sessions
324 // and increase refcount on the sessions
325 m_session_mutex.lock();
326 Vector<Session*> session_pointers(m_sessions.size());
327 for(unsigned i= 0; i < m_sessions.size(); i++){
328 Session* session= m_sessions[i].m_session;
329 session_pointers.push_back(session);
330 session->m_refCount++;
331 }
332 m_session_mutex.unlock();
333
334 // Call the function on each session
335 for(unsigned i= 0; i < session_pointers.size(); i++){
336 (*func)(session_pointers[i], data);
337 }
338
339 // Release the sessions pointers and any stopped sessions
340 m_session_mutex.lock();
341 for(unsigned i= 0; i < session_pointers.size(); i++){
342 Session* session= session_pointers[i];
343 assert(session->m_refCount > 0);
344 session->m_refCount--;
345 }
346 checkSessionsImpl();
347 m_session_mutex.unlock();
348 }
349
350 void
checkSessions()351 SocketServer::checkSessions()
352 {
353 m_session_mutex.lock();
354 checkSessionsImpl();
355 m_session_mutex.unlock();
356 }
357
358 void
checkSessionsImpl()359 SocketServer::checkSessionsImpl()
360 {
361 for(int i = m_sessions.size() - 1; i >= 0; i--)
362 {
363 if(m_sessions[i].m_session->m_thread_stopped &&
364 (m_sessions[i].m_session->m_refCount == 0))
365 {
366 if(m_sessions[i].m_thread != 0)
367 {
368 void* ret;
369 NdbThread_WaitFor(m_sessions[i].m_thread, &ret);
370 NdbThread_Destroy(&m_sessions[i].m_thread);
371 }
372 m_sessions[i].m_session->stopSession();
373 delete m_sessions[i].m_session;
374 m_sessions.erase(i);
375 }
376 }
377 }
378
379 bool
stopSessions(bool wait,unsigned wait_timeout)380 SocketServer::stopSessions(bool wait, unsigned wait_timeout){
381 int i;
382 m_session_mutex.lock();
383 for(i = m_sessions.size() - 1; i>=0; i--)
384 {
385 m_sessions[i].m_session->stopSession();
386 }
387 m_session_mutex.unlock();
388
389 for(i = m_services.size() - 1; i>=0; i--)
390 m_services[i].m_service->stopSessions();
391
392 if(!wait)
393 return false; // No wait
394
395 const NDB_TICKS start = NdbTick_getCurrentTicks();
396 m_session_mutex.lock();
397 while(m_sessions.size() > 0){
398 checkSessionsImpl();
399 m_session_mutex.unlock();
400
401 if (wait_timeout > 0 &&
402 NdbTick_Elapsed(start,NdbTick_getCurrentTicks()).milliSec() > wait_timeout)
403 return false; // Wait abandoned
404
405 NdbSleep_MilliSleep(100);
406 m_session_mutex.lock();
407 }
408 m_session_mutex.unlock();
409 return true; // All sessions gone
410 }
411
412
413 /***** Session code ******/
414
415 extern "C"
416 void*
sessionThread_C(void * _sc)417 sessionThread_C(void* _sc){
418 SocketServer::Session * si = (SocketServer::Session *)_sc;
419
420 assert(si->m_thread_stopped == false);
421
422 if(!si->m_stop)
423 si->runSession();
424 else
425 {
426 ndb_socket_close(si->m_socket);
427 ndb_socket_invalidate(&si->m_socket);
428 }
429
430 // Mark the thread as stopped to allow the
431 // session resources to be released
432 si->m_thread_stopped = true;
433 return 0;
434 }
435
// Explicit instantiations of the container templates for the element
// types used with them in this file.
template class MutexVector<SocketServer::ServiceInstance>;
template class Vector<SocketServer::SessionInstance>;
template class Vector<SocketServer::Session*>;
439