1 /*
2    Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 
26 #include <ndb_global.h>
27 
28 #include <SocketServer.hpp>
29 
30 #include <NdbTCP.h>
31 #include <NdbOut.hpp>
32 #include <NdbThread.h>
33 #include <NdbSleep.h>
34 #include <NdbTick.h>
35 
SocketServer(unsigned maxSessions)36 SocketServer::SocketServer(unsigned maxSessions) :
37   m_sessions(10),
38   m_services(5),
39   m_maxSessions(maxSessions),
40   m_stopThread(false),
41   m_thread(0)
42 {
43 }
44 
~SocketServer()45 SocketServer::~SocketServer() {
46   unsigned i;
47   for(i = 0; i<m_sessions.size(); i++){
48     Session* session= m_sessions[i].m_session;
49     assert(session->m_refCount == 0);
50     delete session;
51   }
52   for(i = 0; i<m_services.size(); i++){
53     if(my_socket_valid(m_services[i].m_socket))
54       my_socket_close(m_services[i].m_socket);
55     delete m_services[i].m_service;
56   }
57 }
58 
59 bool
tryBind(unsigned short port,const char * intface)60 SocketServer::tryBind(unsigned short port, const char * intface) {
61   struct sockaddr_in servaddr;
62   memset(&servaddr, 0, sizeof(servaddr));
63   servaddr.sin_family = AF_INET;
64   servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
65   servaddr.sin_port = htons(port);
66 
67   if(intface != 0){
68     if(Ndb_getInAddr(&servaddr.sin_addr, intface))
69       return false;
70   }
71 
72   const NDB_SOCKET_TYPE sock = my_socket_create(AF_INET, SOCK_STREAM, 0);
73   if (!my_socket_valid(sock))
74     return false;
75 
76   DBUG_PRINT("info",("NDB_SOCKET: " MY_SOCKET_FORMAT,
77                      MY_SOCKET_FORMAT_VALUE(sock)));
78 
79   if (my_socket_reuseaddr(sock, true) == -1)
80   {
81     NDB_CLOSE_SOCKET(sock);
82     return false;
83   }
84 
85   if (my_bind_inet(sock, &servaddr) == -1) {
86     NDB_CLOSE_SOCKET(sock);
87     return false;
88   }
89 
90   NDB_CLOSE_SOCKET(sock);
91   return true;
92 }
93 
94 bool
setup(SocketServer::Service * service,unsigned short * port,const char * intface)95 SocketServer::setup(SocketServer::Service * service,
96 		    unsigned short * port,
97 		    const char * intface){
98   DBUG_ENTER("SocketServer::setup");
99   DBUG_PRINT("enter",("interface=%s, port=%u", intface, *port));
100   struct sockaddr_in servaddr;
101   memset(&servaddr, 0, sizeof(servaddr));
102   servaddr.sin_family = AF_INET;
103   servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
104   servaddr.sin_port = htons(*port);
105 
106   if(intface != 0){
107     if(Ndb_getInAddr(&servaddr.sin_addr, intface))
108       DBUG_RETURN(false);
109   }
110 
111   const NDB_SOCKET_TYPE sock = my_socket_create(AF_INET, SOCK_STREAM, 0);
112   if (!my_socket_valid(sock))
113   {
114     DBUG_PRINT("error",("socket() - %d - %s",
115 			socket_errno, strerror(socket_errno)));
116     DBUG_RETURN(false);
117   }
118 
119   DBUG_PRINT("info",("NDB_SOCKET: " MY_SOCKET_FORMAT,
120                      MY_SOCKET_FORMAT_VALUE(sock)));
121 
122   if (my_socket_reuseaddr(sock, true) == -1)
123   {
124     DBUG_PRINT("error",("setsockopt() - %d - %s",
125 			errno, strerror(errno)));
126     NDB_CLOSE_SOCKET(sock);
127     DBUG_RETURN(false);
128   }
129 
130   if (my_bind_inet(sock, &servaddr) == -1) {
131     DBUG_PRINT("error",("bind() - %d - %s",
132 			socket_errno, strerror(socket_errno)));
133     NDB_CLOSE_SOCKET(sock);
134     DBUG_RETURN(false);
135   }
136 
137   /* Get the port we bound to */
138   if(my_socket_get_port(sock, port))
139   {
140     ndbout_c("An error occurred while trying to find out what"
141 	     " port we bound to. Error: %d - %s",
142              socket_errno, strerror(socket_errno));
143     my_socket_close(sock);
144     DBUG_RETURN(false);
145   }
146 
147   DBUG_PRINT("info",("bound to %u", *port));
148 
149   if (my_listen(sock, m_maxSessions > 32 ? 32 : m_maxSessions) == -1)
150   {
151     DBUG_PRINT("error",("listen() - %d - %s",
152 			socket_errno, strerror(socket_errno)));
153     my_socket_close(sock);
154     DBUG_RETURN(false);
155   }
156 
157   ServiceInstance i;
158   i.m_socket = sock;
159   i.m_service = service;
160   m_services.push_back(i);
161 
162   // Increase size to allow polling all listening ports
163   m_services_poller.set_max_count(m_services.size());
164 
165   DBUG_RETURN(true);
166 }
167 
168 
169 bool
doAccept()170 SocketServer::doAccept()
171 {
172   m_services.lock();
173 
174   m_services_poller.clear();
175   for (unsigned i = 0; i < m_services.size(); i++)
176   {
177     m_services_poller.add(m_services[i].m_socket, true, false, true);
178   }
179   assert(m_services.size() == m_services_poller.count());
180 
181   const int accept_timeout_ms = 1000;
182   const int ret = m_services_poller.poll(accept_timeout_ms);
183   if (ret < 0)
184   {
185     // Error occured, indicate error to caller by returning false
186     m_services.unlock();
187     return false;
188   }
189 
190   if (ret == 0)
191   {
192     // Timeout occured
193     m_services.unlock();
194     return true;
195   }
196 
197   bool result = true;
198   for (unsigned i = 0; i < m_services_poller.count(); i++)
199   {
200     const bool has_read = m_services_poller.has_read(i);
201 
202     if (!has_read)
203       continue; // Ignore events where read flag wasn't set
204 
205     ServiceInstance & si = m_services[i];
206     assert(m_services_poller.is_socket_equal(i, si.m_socket));
207 
208     const NDB_SOCKET_TYPE childSock = my_accept(si.m_socket, 0, 0);
209     if (!my_socket_valid(childSock))
210     {
211       // Could not 'accept' socket(maybe at max fds), indicate error
212       // to caller by returning false
213       result = false;
214       continue;
215     }
216 
217     SessionInstance s;
218     s.m_service = si.m_service;
219     s.m_session = si.m_service->newSession(childSock);
220     if (s.m_session != 0)
221     {
222       m_session_mutex.lock();
223       m_sessions.push_back(s);
224       startSession(m_sessions.back());
225       m_session_mutex.unlock();
226     }
227   }
228 
229   m_services.unlock();
230   return result;
231 }
232 
233 extern "C"
234 void*
socketServerThread_C(void * _ss)235 socketServerThread_C(void* _ss){
236   SocketServer * ss = (SocketServer *)_ss;
237   ss->doRun();
238   return 0;
239 }
240 
241 struct NdbThread*
startServer()242 SocketServer::startServer()
243 {
244   m_threadLock.lock();
245   if(m_thread == 0 && m_stopThread == false)
246   {
247     m_thread = NdbThread_Create(socketServerThread_C,
248 				(void**)this,
249                                 0, // default stack size
250 				"NdbSockServ",
251 				NDB_THREAD_PRIO_LOW);
252   }
253   m_threadLock.unlock();
254   return m_thread;
255 }
256 
257 void
stopServer()258 SocketServer::stopServer(){
259   m_threadLock.lock();
260   if(m_thread != 0){
261     m_stopThread = true;
262 
263     void * res;
264     NdbThread_WaitFor(m_thread, &res);
265     NdbThread_Destroy(&m_thread);
266     m_thread = 0;
267   }
268   m_threadLock.unlock();
269 }
270 
271 void
doRun()272 SocketServer::doRun(){
273 
274   while(!m_stopThread){
275     m_session_mutex.lock();
276     checkSessionsImpl();
277     m_session_mutex.unlock();
278 
279     if(m_sessions.size() >= m_maxSessions){
280       // Don't accept more connections yet
281       NdbSleep_MilliSleep(200);
282       continue;
283     }
284 
285     if (!doAccept()){
286       // accept failed, step back
287       NdbSleep_MilliSleep(200);
288     }
289   }
290 }
291 
292 void
startSession(SessionInstance & si)293 SocketServer::startSession(SessionInstance & si){
294   si.m_thread = NdbThread_Create(sessionThread_C,
295 				 (void**)si.m_session,
296                                  0, // default stack size
297 				 "NdbSock_Session",
298 				 NDB_THREAD_PRIO_LOW);
299 }
300 
301 void
foreachSession(void (* func)(SocketServer::Session *,void *),void * data)302 SocketServer::foreachSession(void (*func)(SocketServer::Session*, void *),
303                              void *data)
304 {
305   // Build a list of pointers to all active sessions
306   // and increase refcount on the sessions
307   m_session_mutex.lock();
308   Vector<Session*> session_pointers(m_sessions.size());
309   for(unsigned i= 0; i < m_sessions.size(); i++){
310     Session* session= m_sessions[i].m_session;
311     session_pointers.push_back(session);
312     session->m_refCount++;
313   }
314   m_session_mutex.unlock();
315 
316   // Call the function on each session
317   for(unsigned i= 0; i < session_pointers.size(); i++){
318     (*func)(session_pointers[i], data);
319   }
320 
321   // Release the sessions pointers and any stopped sessions
322   m_session_mutex.lock();
323   for(unsigned i= 0; i < session_pointers.size(); i++){
324     Session* session= session_pointers[i];
325     assert(session->m_refCount > 0);
326     session->m_refCount--;
327   }
328   checkSessionsImpl();
329   m_session_mutex.unlock();
330 }
331 
332 void
checkSessions()333 SocketServer::checkSessions()
334 {
335   m_session_mutex.lock();
336   checkSessionsImpl();
337   m_session_mutex.unlock();
338 }
339 
340 void
checkSessionsImpl()341 SocketServer::checkSessionsImpl()
342 {
343   for(int i = m_sessions.size() - 1; i >= 0; i--)
344   {
345     if(m_sessions[i].m_session->m_thread_stopped &&
346        (m_sessions[i].m_session->m_refCount == 0))
347     {
348       if(m_sessions[i].m_thread != 0)
349       {
350 	void* ret;
351 	NdbThread_WaitFor(m_sessions[i].m_thread, &ret);
352 	NdbThread_Destroy(&m_sessions[i].m_thread);
353       }
354       m_sessions[i].m_session->stopSession();
355       delete m_sessions[i].m_session;
356       m_sessions.erase(i);
357     }
358   }
359 }
360 
361 bool
stopSessions(bool wait,unsigned wait_timeout)362 SocketServer::stopSessions(bool wait, unsigned wait_timeout){
363   int i;
364   m_session_mutex.lock();
365   for(i = m_sessions.size() - 1; i>=0; i--)
366   {
367     m_sessions[i].m_session->stopSession();
368   }
369   m_session_mutex.unlock();
370 
371   for(i = m_services.size() - 1; i>=0; i--)
372     m_services[i].m_service->stopSessions();
373 
374   if(!wait)
375     return false; // No wait
376 
377   NDB_TICKS start = NdbTick_CurrentMillisecond();
378   m_session_mutex.lock();
379   while(m_sessions.size() > 0){
380     checkSessionsImpl();
381     m_session_mutex.unlock();
382 
383     if (wait_timeout > 0 &&
384         (NdbTick_CurrentMillisecond() - start) > wait_timeout)
385       return false; // Wait abandoned
386 
387     NdbSleep_MilliSleep(100);
388     m_session_mutex.lock();
389   }
390   m_session_mutex.unlock();
391   return true; // All sessions gone
392 }
393 
394 
395 /***** Session code ******/
396 
397 extern "C"
398 void*
sessionThread_C(void * _sc)399 sessionThread_C(void* _sc){
400   SocketServer::Session * si = (SocketServer::Session *)_sc;
401 
402   assert(si->m_thread_stopped == false);
403 
404   if(!si->m_stop)
405     si->runSession();
406   else
407     NDB_CLOSE_SOCKET(si->m_socket);
408 
409   // Mark the thread as stopped to allow the
410   // session resources to be released
411   si->m_thread_stopped = true;
412   return 0;
413 }
414 
415 template class MutexVector<SocketServer::ServiceInstance>;
416 template class Vector<SocketServer::SessionInstance>;
417 template class Vector<SocketServer::Session*>;
418