1 /*
2    Copyright (c) 2003, 2019, Oracle and/or its affiliates. All rights reserved.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 
26 #include <ndb_global.h>
27 
28 #include <SocketServer.hpp>
29 
30 #include <NdbTCP.h>
31 #include <NdbOut.hpp>
32 #include <NdbThread.h>
33 #include <NdbSleep.h>
34 #include <NdbTick.h>
35 #include "ndb_socket.h"
36 #include <OwnProcessInfo.hpp>
37 
/*
 * Optional trace output for this file.
 *
 * Usage: DEBUG_FPRINTF((stderr, "format", args...));
 * The argument is a complete, parenthesised fprintf() argument list, which
 * is why call sites use double parentheses.
 *
 * Change the '#if 0' below to '#if 1' to enable the output. Both variants
 * expand to a single do/while(0) statement so the macro behaves the same
 * way (and produces no empty-statement warnings) whether enabled or not.
 */
#if 0
#define DEBUG_FPRINTF(arglist) do { fprintf arglist ; } while (0)
#else
#define DEBUG_FPRINTF(a) do { } while (0)
#endif
43 
SocketServer(unsigned maxSessions)44 SocketServer::SocketServer(unsigned maxSessions) :
45   m_sessions(10),
46   m_services(5),
47   m_maxSessions(maxSessions),
48   m_stopThread(false),
49   m_thread(0)
50 {
51 }
52 
~SocketServer()53 SocketServer::~SocketServer() {
54   unsigned i;
55   for(i = 0; i<m_sessions.size(); i++){
56     Session* session= m_sessions[i].m_session;
57     assert(session->m_refCount == 0);
58     delete session;
59   }
60   for(i = 0; i<m_services.size(); i++){
61     if(ndb_socket_valid(m_services[i].m_socket))
62       ndb_socket_close(m_services[i].m_socket);
63     delete m_services[i].m_service;
64   }
65 }
66 
67 bool
tryBind(unsigned short port,const char * intface)68 SocketServer::tryBind(unsigned short port, const char * intface) {
69   struct sockaddr_in servaddr;
70   memset(&servaddr, 0, sizeof(servaddr));
71   servaddr.sin_family = AF_INET;
72   servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
73   servaddr.sin_port = htons(port);
74 
75   if(intface != 0){
76     if(Ndb_getInAddr(&servaddr.sin_addr, intface))
77       return false;
78   }
79 
80   const NDB_SOCKET_TYPE sock = ndb_socket_create(AF_INET, SOCK_STREAM, 0);
81   if (!ndb_socket_valid(sock))
82     return false;
83 
84   DBUG_PRINT("info",("NDB_SOCKET: " MY_SOCKET_FORMAT,
85                      MY_SOCKET_FORMAT_VALUE(sock)));
86 
87   if (ndb_socket_reuseaddr(sock, true) == -1)
88   {
89     ndb_socket_close(sock);
90     return false;
91   }
92 
93   if (ndb_bind_inet(sock, &servaddr) == -1) {
94     ndb_socket_close(sock);
95     return false;
96   }
97 
98   ndb_socket_close(sock);
99   return true;
100 }
101 
102 #define MAX_SOCKET_SERVER_TCP_BACKLOG 64
103 bool
setup(SocketServer::Service * service,unsigned short * port,const char * intface)104 SocketServer::setup(SocketServer::Service * service,
105 		    unsigned short * port,
106 		    const char * intface){
107   DBUG_ENTER("SocketServer::setup");
108   DBUG_PRINT("enter",("interface=%s, port=%u", intface, *port));
109   struct sockaddr_in servaddr;
110   memset(&servaddr, 0, sizeof(servaddr));
111   servaddr.sin_family = AF_INET;
112   servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
113   servaddr.sin_port = htons(*port);
114 
115   if(intface != 0){
116     if(Ndb_getInAddr(&servaddr.sin_addr, intface))
117       DBUG_RETURN(false);
118   }
119 
120   const NDB_SOCKET_TYPE sock = ndb_socket_create(AF_INET, SOCK_STREAM, 0);
121   if (!ndb_socket_valid(sock))
122   {
123     DBUG_PRINT("error",("socket() - %d - %s",
124 			socket_errno, strerror(socket_errno)));
125     DBUG_RETURN(false);
126   }
127 
128   DBUG_PRINT("info",("NDB_SOCKET: " MY_SOCKET_FORMAT,
129                      MY_SOCKET_FORMAT_VALUE(sock)));
130 
131   if (ndb_socket_reuseaddr(sock, true) == -1)
132   {
133     DBUG_PRINT("error",("setsockopt() - %d - %s",
134 			errno, strerror(errno)));
135     ndb_socket_close(sock);
136     DBUG_RETURN(false);
137   }
138 
139   if (ndb_bind_inet(sock, &servaddr) == -1) {
140     DBUG_PRINT("error",("bind() - %d - %s",
141 			socket_errno, strerror(socket_errno)));
142     ndb_socket_close(sock);
143     DBUG_RETURN(false);
144   }
145 
146   /* Get the address and port we bound to */
147   struct sockaddr_in serv_addr;
148   ndb_socket_len_t addr_len = sizeof(serv_addr);
149   if(ndb_getsockname(sock, (struct sockaddr *) &serv_addr, &addr_len))
150   {
151     ndbout_c("An error occurred while trying to find out what"
152 	     " port we bound to. Error: %d - %s",
153              ndb_socket_errno(), strerror(ndb_socket_errno()));
154     ndb_socket_close(sock);
155     DBUG_RETURN(false);
156   }
157   *port = ntohs(serv_addr.sin_port);
158   setOwnProcessInfoServerAddress(& serv_addr.sin_addr);
159 
160   DBUG_PRINT("info",("bound to %u", *port));
161 
162   if (ndb_listen(sock, m_maxSessions > MAX_SOCKET_SERVER_TCP_BACKLOG ?
163                       MAX_SOCKET_SERVER_TCP_BACKLOG : m_maxSessions) == -1)
164   {
165     DBUG_PRINT("error",("listen() - %d - %s",
166 			socket_errno, strerror(socket_errno)));
167     ndb_socket_close(sock);
168     DBUG_RETURN(false);
169   }
170 
171   DEBUG_FPRINTF((stderr, "Listening on port: %u\n",
172                 (Uint32)*port));
173 
174   ServiceInstance i;
175   i.m_socket = sock;
176   i.m_service = service;
177   m_services.push_back(i);
178 
179   // Increase size to allow polling all listening ports
180   m_services_poller.set_max_count(m_services.size());
181 
182   DBUG_RETURN(true);
183 }
184 
185 bool
doAccept()186 SocketServer::doAccept()
187 {
188   m_services.lock();
189 
190   m_services_poller.clear();
191   for (unsigned i = 0; i < m_services.size(); i++)
192   {
193     m_services_poller.add(m_services[i].m_socket, true, false, true);
194   }
195   assert(m_services.size() == m_services_poller.count());
196 
197   const int accept_timeout_ms = 1000;
198   const int ret = m_services_poller.poll(accept_timeout_ms);
199   if (ret < 0)
200   {
201     // Error occurred, indicate error to caller by returning false
202     m_services.unlock();
203     return false;
204   }
205 
206   if (ret == 0)
207   {
208     // Timeout occurred
209     m_services.unlock();
210     return true;
211   }
212 
213   bool result = true;
214   for (unsigned i = 0; i < m_services_poller.count(); i++)
215   {
216     const bool has_read = m_services_poller.has_read(i);
217 
218     if (!has_read)
219       continue; // Ignore events where read flag wasn't set
220 
221     ServiceInstance & si = m_services[i];
222     assert(m_services_poller.is_socket_equal(i, si.m_socket));
223 
224     const NDB_SOCKET_TYPE childSock = ndb_accept(si.m_socket, 0, 0);
225     if (!ndb_socket_valid(childSock))
226     {
227       // Could not 'accept' socket(maybe at max fds), indicate error
228       // to caller by returning false
229       result = false;
230       continue;
231     }
232 
233     SessionInstance s;
234     s.m_service = si.m_service;
235     s.m_session = si.m_service->newSession(childSock);
236     if (s.m_session != 0)
237     {
238       m_session_mutex.lock();
239       m_sessions.push_back(s);
240       startSession(m_sessions.back());
241       m_session_mutex.unlock();
242     }
243   }
244 
245   m_services.unlock();
246   return result;
247 }
248 
249 extern "C"
250 void*
socketServerThread_C(void * _ss)251 socketServerThread_C(void* _ss){
252   SocketServer * ss = (SocketServer *)_ss;
253   ss->doRun();
254   return 0;
255 }
256 
257 struct NdbThread*
startServer()258 SocketServer::startServer()
259 {
260   m_threadLock.lock();
261   if(m_thread == 0 && m_stopThread == false)
262   {
263     m_thread = NdbThread_Create(socketServerThread_C,
264 				(void**)this,
265                                 0, // default stack size
266 				"NdbSockServ",
267 				NDB_THREAD_PRIO_LOW);
268   }
269   m_threadLock.unlock();
270   return m_thread;
271 }
272 
273 void
stopServer()274 SocketServer::stopServer(){
275   m_threadLock.lock();
276   if(m_thread != 0){
277     m_stopThread = true;
278 
279     void * res;
280     NdbThread_WaitFor(m_thread, &res);
281     NdbThread_Destroy(&m_thread);
282     m_thread = 0;
283   }
284   m_threadLock.unlock();
285 }
286 
287 void
doRun()288 SocketServer::doRun(){
289 
290   while(!m_stopThread){
291     m_session_mutex.lock();
292     checkSessionsImpl();
293     m_session_mutex.unlock();
294 
295     if(m_sessions.size() >= m_maxSessions){
296       // Don't accept more connections yet
297       DEBUG_FPRINTF((stderr, "Too many connections\n"));
298       NdbSleep_MilliSleep(200);
299       continue;
300     }
301 
302     if (!doAccept()){
303       // accept failed, step back
304       DEBUG_FPRINTF((stderr, "Accept failed\n"));
305       NdbSleep_MilliSleep(200);
306     }
307   }
308 }
309 
310 void
startSession(SessionInstance & si)311 SocketServer::startSession(SessionInstance & si){
312   si.m_thread = NdbThread_Create(sessionThread_C,
313 				 (void**)si.m_session,
314                                  0, // default stack size
315 				 "NdbSock_Session",
316 				 NDB_THREAD_PRIO_LOW);
317 }
318 
319 void
foreachSession(void (* func)(SocketServer::Session *,void *),void * data)320 SocketServer::foreachSession(void (*func)(SocketServer::Session*, void *),
321                              void *data)
322 {
323   // Build a list of pointers to all active sessions
324   // and increase refcount on the sessions
325   m_session_mutex.lock();
326   Vector<Session*> session_pointers(m_sessions.size());
327   for(unsigned i= 0; i < m_sessions.size(); i++){
328     Session* session= m_sessions[i].m_session;
329     session_pointers.push_back(session);
330     session->m_refCount++;
331   }
332   m_session_mutex.unlock();
333 
334   // Call the function on each session
335   for(unsigned i= 0; i < session_pointers.size(); i++){
336     (*func)(session_pointers[i], data);
337   }
338 
339   // Release the sessions pointers and any stopped sessions
340   m_session_mutex.lock();
341   for(unsigned i= 0; i < session_pointers.size(); i++){
342     Session* session= session_pointers[i];
343     assert(session->m_refCount > 0);
344     session->m_refCount--;
345   }
346   checkSessionsImpl();
347   m_session_mutex.unlock();
348 }
349 
350 void
checkSessions()351 SocketServer::checkSessions()
352 {
353   m_session_mutex.lock();
354   checkSessionsImpl();
355   m_session_mutex.unlock();
356 }
357 
358 void
checkSessionsImpl()359 SocketServer::checkSessionsImpl()
360 {
361   for(int i = m_sessions.size() - 1; i >= 0; i--)
362   {
363     if(m_sessions[i].m_session->m_thread_stopped &&
364        (m_sessions[i].m_session->m_refCount == 0))
365     {
366       if(m_sessions[i].m_thread != 0)
367       {
368 	void* ret;
369 	NdbThread_WaitFor(m_sessions[i].m_thread, &ret);
370 	NdbThread_Destroy(&m_sessions[i].m_thread);
371       }
372       m_sessions[i].m_session->stopSession();
373       delete m_sessions[i].m_session;
374       m_sessions.erase(i);
375     }
376   }
377 }
378 
379 bool
stopSessions(bool wait,unsigned wait_timeout)380 SocketServer::stopSessions(bool wait, unsigned wait_timeout){
381   int i;
382   m_session_mutex.lock();
383   for(i = m_sessions.size() - 1; i>=0; i--)
384   {
385     m_sessions[i].m_session->stopSession();
386   }
387   m_session_mutex.unlock();
388 
389   for(i = m_services.size() - 1; i>=0; i--)
390     m_services[i].m_service->stopSessions();
391 
392   if(!wait)
393     return false; // No wait
394 
395   const NDB_TICKS start = NdbTick_getCurrentTicks();
396   m_session_mutex.lock();
397   while(m_sessions.size() > 0){
398     checkSessionsImpl();
399     m_session_mutex.unlock();
400 
401     if (wait_timeout > 0 &&
402         NdbTick_Elapsed(start,NdbTick_getCurrentTicks()).milliSec() > wait_timeout)
403       return false; // Wait abandoned
404 
405     NdbSleep_MilliSleep(100);
406     m_session_mutex.lock();
407   }
408   m_session_mutex.unlock();
409   return true; // All sessions gone
410 }
411 
412 
413 /***** Session code ******/
414 
415 extern "C"
416 void*
sessionThread_C(void * _sc)417 sessionThread_C(void* _sc){
418   SocketServer::Session * si = (SocketServer::Session *)_sc;
419 
420   assert(si->m_thread_stopped == false);
421 
422   if(!si->m_stop)
423     si->runSession();
424   else
425   {
426     ndb_socket_close(si->m_socket);
427     ndb_socket_invalidate(&si->m_socket);
428   }
429 
430   // Mark the thread as stopped to allow the
431   // session resources to be released
432   si->m_thread_stopped = true;
433   return 0;
434 }
435 
436 template class MutexVector<SocketServer::ServiceInstance>;
437 template class Vector<SocketServer::SessionInstance>;
438 template class Vector<SocketServer::Session*>;
439