1 /*
2 Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25
26 #include <ndb_global.h>
27
28 #include <SocketServer.hpp>
29
30 #include <NdbTCP.h>
31 #include <NdbOut.hpp>
32 #include <NdbThread.h>
33 #include <NdbSleep.h>
34 #include <NdbTick.h>
35
SocketServer(unsigned maxSessions)36 SocketServer::SocketServer(unsigned maxSessions) :
37 m_sessions(10),
38 m_services(5),
39 m_maxSessions(maxSessions),
40 m_stopThread(false),
41 m_thread(0)
42 {
43 }
44
~SocketServer()45 SocketServer::~SocketServer() {
46 unsigned i;
47 for(i = 0; i<m_sessions.size(); i++){
48 Session* session= m_sessions[i].m_session;
49 assert(session->m_refCount == 0);
50 delete session;
51 }
52 for(i = 0; i<m_services.size(); i++){
53 if(my_socket_valid(m_services[i].m_socket))
54 my_socket_close(m_services[i].m_socket);
55 delete m_services[i].m_service;
56 }
57 }
58
59 bool
tryBind(unsigned short port,const char * intface)60 SocketServer::tryBind(unsigned short port, const char * intface) {
61 struct sockaddr_in servaddr;
62 memset(&servaddr, 0, sizeof(servaddr));
63 servaddr.sin_family = AF_INET;
64 servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
65 servaddr.sin_port = htons(port);
66
67 if(intface != 0){
68 if(Ndb_getInAddr(&servaddr.sin_addr, intface))
69 return false;
70 }
71
72 const NDB_SOCKET_TYPE sock = my_socket_create(AF_INET, SOCK_STREAM, 0);
73 if (!my_socket_valid(sock))
74 return false;
75
76 DBUG_PRINT("info",("NDB_SOCKET: " MY_SOCKET_FORMAT,
77 MY_SOCKET_FORMAT_VALUE(sock)));
78
79 if (my_socket_reuseaddr(sock, true) == -1)
80 {
81 NDB_CLOSE_SOCKET(sock);
82 return false;
83 }
84
85 if (my_bind_inet(sock, &servaddr) == -1) {
86 NDB_CLOSE_SOCKET(sock);
87 return false;
88 }
89
90 NDB_CLOSE_SOCKET(sock);
91 return true;
92 }
93
94 bool
setup(SocketServer::Service * service,unsigned short * port,const char * intface)95 SocketServer::setup(SocketServer::Service * service,
96 unsigned short * port,
97 const char * intface){
98 DBUG_ENTER("SocketServer::setup");
99 DBUG_PRINT("enter",("interface=%s, port=%u", intface, *port));
100 struct sockaddr_in servaddr;
101 memset(&servaddr, 0, sizeof(servaddr));
102 servaddr.sin_family = AF_INET;
103 servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
104 servaddr.sin_port = htons(*port);
105
106 if(intface != 0){
107 if(Ndb_getInAddr(&servaddr.sin_addr, intface))
108 DBUG_RETURN(false);
109 }
110
111 const NDB_SOCKET_TYPE sock = my_socket_create(AF_INET, SOCK_STREAM, 0);
112 if (!my_socket_valid(sock))
113 {
114 DBUG_PRINT("error",("socket() - %d - %s",
115 socket_errno, strerror(socket_errno)));
116 DBUG_RETURN(false);
117 }
118
119 DBUG_PRINT("info",("NDB_SOCKET: " MY_SOCKET_FORMAT,
120 MY_SOCKET_FORMAT_VALUE(sock)));
121
122 if (my_socket_reuseaddr(sock, true) == -1)
123 {
124 DBUG_PRINT("error",("setsockopt() - %d - %s",
125 errno, strerror(errno)));
126 NDB_CLOSE_SOCKET(sock);
127 DBUG_RETURN(false);
128 }
129
130 if (my_bind_inet(sock, &servaddr) == -1) {
131 DBUG_PRINT("error",("bind() - %d - %s",
132 socket_errno, strerror(socket_errno)));
133 NDB_CLOSE_SOCKET(sock);
134 DBUG_RETURN(false);
135 }
136
137 /* Get the port we bound to */
138 if(my_socket_get_port(sock, port))
139 {
140 ndbout_c("An error occurred while trying to find out what"
141 " port we bound to. Error: %d - %s",
142 socket_errno, strerror(socket_errno));
143 my_socket_close(sock);
144 DBUG_RETURN(false);
145 }
146
147 DBUG_PRINT("info",("bound to %u", *port));
148
149 if (my_listen(sock, m_maxSessions > 32 ? 32 : m_maxSessions) == -1)
150 {
151 DBUG_PRINT("error",("listen() - %d - %s",
152 socket_errno, strerror(socket_errno)));
153 my_socket_close(sock);
154 DBUG_RETURN(false);
155 }
156
157 ServiceInstance i;
158 i.m_socket = sock;
159 i.m_service = service;
160 m_services.push_back(i);
161
162 // Increase size to allow polling all listening ports
163 m_services_poller.set_max_count(m_services.size());
164
165 DBUG_RETURN(true);
166 }
167
168
169 bool
doAccept()170 SocketServer::doAccept()
171 {
172 m_services.lock();
173
174 m_services_poller.clear();
175 for (unsigned i = 0; i < m_services.size(); i++)
176 {
177 m_services_poller.add(m_services[i].m_socket, true, false, true);
178 }
179 assert(m_services.size() == m_services_poller.count());
180
181 const int accept_timeout_ms = 1000;
182 const int ret = m_services_poller.poll(accept_timeout_ms);
183 if (ret < 0)
184 {
185 // Error occured, indicate error to caller by returning false
186 m_services.unlock();
187 return false;
188 }
189
190 if (ret == 0)
191 {
192 // Timeout occured
193 m_services.unlock();
194 return true;
195 }
196
197 bool result = true;
198 for (unsigned i = 0; i < m_services_poller.count(); i++)
199 {
200 const bool has_read = m_services_poller.has_read(i);
201
202 if (!has_read)
203 continue; // Ignore events where read flag wasn't set
204
205 ServiceInstance & si = m_services[i];
206 assert(m_services_poller.is_socket_equal(i, si.m_socket));
207
208 const NDB_SOCKET_TYPE childSock = my_accept(si.m_socket, 0, 0);
209 if (!my_socket_valid(childSock))
210 {
211 // Could not 'accept' socket(maybe at max fds), indicate error
212 // to caller by returning false
213 result = false;
214 continue;
215 }
216
217 SessionInstance s;
218 s.m_service = si.m_service;
219 s.m_session = si.m_service->newSession(childSock);
220 if (s.m_session != 0)
221 {
222 m_session_mutex.lock();
223 m_sessions.push_back(s);
224 startSession(m_sessions.back());
225 m_session_mutex.unlock();
226 }
227 }
228
229 m_services.unlock();
230 return result;
231 }
232
233 extern "C"
234 void*
socketServerThread_C(void * _ss)235 socketServerThread_C(void* _ss){
236 SocketServer * ss = (SocketServer *)_ss;
237 ss->doRun();
238 return 0;
239 }
240
241 struct NdbThread*
startServer()242 SocketServer::startServer()
243 {
244 m_threadLock.lock();
245 if(m_thread == 0 && m_stopThread == false)
246 {
247 m_thread = NdbThread_Create(socketServerThread_C,
248 (void**)this,
249 0, // default stack size
250 "NdbSockServ",
251 NDB_THREAD_PRIO_LOW);
252 }
253 m_threadLock.unlock();
254 return m_thread;
255 }
256
257 void
stopServer()258 SocketServer::stopServer(){
259 m_threadLock.lock();
260 if(m_thread != 0){
261 m_stopThread = true;
262
263 void * res;
264 NdbThread_WaitFor(m_thread, &res);
265 NdbThread_Destroy(&m_thread);
266 m_thread = 0;
267 }
268 m_threadLock.unlock();
269 }
270
271 void
doRun()272 SocketServer::doRun(){
273
274 while(!m_stopThread){
275 m_session_mutex.lock();
276 checkSessionsImpl();
277 m_session_mutex.unlock();
278
279 if(m_sessions.size() >= m_maxSessions){
280 // Don't accept more connections yet
281 NdbSleep_MilliSleep(200);
282 continue;
283 }
284
285 if (!doAccept()){
286 // accept failed, step back
287 NdbSleep_MilliSleep(200);
288 }
289 }
290 }
291
292 void
startSession(SessionInstance & si)293 SocketServer::startSession(SessionInstance & si){
294 si.m_thread = NdbThread_Create(sessionThread_C,
295 (void**)si.m_session,
296 0, // default stack size
297 "NdbSock_Session",
298 NDB_THREAD_PRIO_LOW);
299 }
300
301 void
foreachSession(void (* func)(SocketServer::Session *,void *),void * data)302 SocketServer::foreachSession(void (*func)(SocketServer::Session*, void *),
303 void *data)
304 {
305 // Build a list of pointers to all active sessions
306 // and increase refcount on the sessions
307 m_session_mutex.lock();
308 Vector<Session*> session_pointers(m_sessions.size());
309 for(unsigned i= 0; i < m_sessions.size(); i++){
310 Session* session= m_sessions[i].m_session;
311 session_pointers.push_back(session);
312 session->m_refCount++;
313 }
314 m_session_mutex.unlock();
315
316 // Call the function on each session
317 for(unsigned i= 0; i < session_pointers.size(); i++){
318 (*func)(session_pointers[i], data);
319 }
320
321 // Release the sessions pointers and any stopped sessions
322 m_session_mutex.lock();
323 for(unsigned i= 0; i < session_pointers.size(); i++){
324 Session* session= session_pointers[i];
325 assert(session->m_refCount > 0);
326 session->m_refCount--;
327 }
328 checkSessionsImpl();
329 m_session_mutex.unlock();
330 }
331
332 void
checkSessions()333 SocketServer::checkSessions()
334 {
335 m_session_mutex.lock();
336 checkSessionsImpl();
337 m_session_mutex.unlock();
338 }
339
340 void
checkSessionsImpl()341 SocketServer::checkSessionsImpl()
342 {
343 for(int i = m_sessions.size() - 1; i >= 0; i--)
344 {
345 if(m_sessions[i].m_session->m_thread_stopped &&
346 (m_sessions[i].m_session->m_refCount == 0))
347 {
348 if(m_sessions[i].m_thread != 0)
349 {
350 void* ret;
351 NdbThread_WaitFor(m_sessions[i].m_thread, &ret);
352 NdbThread_Destroy(&m_sessions[i].m_thread);
353 }
354 m_sessions[i].m_session->stopSession();
355 delete m_sessions[i].m_session;
356 m_sessions.erase(i);
357 }
358 }
359 }
360
361 bool
stopSessions(bool wait,unsigned wait_timeout)362 SocketServer::stopSessions(bool wait, unsigned wait_timeout){
363 int i;
364 m_session_mutex.lock();
365 for(i = m_sessions.size() - 1; i>=0; i--)
366 {
367 m_sessions[i].m_session->stopSession();
368 }
369 m_session_mutex.unlock();
370
371 for(i = m_services.size() - 1; i>=0; i--)
372 m_services[i].m_service->stopSessions();
373
374 if(!wait)
375 return false; // No wait
376
377 NDB_TICKS start = NdbTick_CurrentMillisecond();
378 m_session_mutex.lock();
379 while(m_sessions.size() > 0){
380 checkSessionsImpl();
381 m_session_mutex.unlock();
382
383 if (wait_timeout > 0 &&
384 (NdbTick_CurrentMillisecond() - start) > wait_timeout)
385 return false; // Wait abandoned
386
387 NdbSleep_MilliSleep(100);
388 m_session_mutex.lock();
389 }
390 m_session_mutex.unlock();
391 return true; // All sessions gone
392 }
393
394
395 /***** Session code ******/
396
397 extern "C"
398 void*
sessionThread_C(void * _sc)399 sessionThread_C(void* _sc){
400 SocketServer::Session * si = (SocketServer::Session *)_sc;
401
402 assert(si->m_thread_stopped == false);
403
404 if(!si->m_stop)
405 si->runSession();
406 else
407 NDB_CLOSE_SOCKET(si->m_socket);
408
409 // Mark the thread as stopped to allow the
410 // session resources to be released
411 si->m_thread_stopped = true;
412 return 0;
413 }
414
415 template class MutexVector<SocketServer::ServiceInstance>;
416 template class Vector<SocketServer::SessionInstance>;
417 template class Vector<SocketServer::Session*>;
418