1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20 #include <thrift/thrift-config.h>
21
22 #include <cstring>
23 #include <sys/types.h>
24 #ifdef HAVE_SYS_SOCKET_H
25 #include <sys/socket.h>
26 #endif
27 #ifdef HAVE_SYS_UN_H
28 #include <sys/un.h>
29 #endif
30 #ifdef HAVE_SYS_POLL_H
31 #include <sys/poll.h>
32 #endif
33 #ifdef HAVE_NETINET_IN_H
34 #include <netinet/in.h>
35 #include <netinet/tcp.h>
36 #endif
37 #ifdef HAVE_NETDB_H
38 #include <netdb.h>
39 #endif
40 #include <fcntl.h>
41 #ifdef HAVE_UNISTD_H
42 #include <unistd.h>
43 #endif
44
45 #include <thrift/transport/TSocket.h>
46 #include <thrift/transport/TServerSocket.h>
47 #include <thrift/transport/PlatformSocket.h>
48 #include <boost/shared_ptr.hpp>
49
50 #ifndef AF_LOCAL
51 #define AF_LOCAL AF_UNIX
52 #endif
53
54 #ifndef SOCKOPT_CAST_T
55 # ifndef _WIN32
56 # define SOCKOPT_CAST_T void
57 # else
58 # define SOCKOPT_CAST_T char
59 # endif // _WIN32
60 #endif
61
62 template<class T>
const_cast_sockopt(const T * v)63 inline const SOCKOPT_CAST_T* const_cast_sockopt(const T* v)
64 {
65 return reinterpret_cast<const SOCKOPT_CAST_T*>(v);
66 }
67
68 template<class T>
cast_sockopt(T * v)69 inline SOCKOPT_CAST_T* cast_sockopt(T* v)
70 {
71 return reinterpret_cast<SOCKOPT_CAST_T*>(v);
72 }
73
74 namespace apache
75 {
76 namespace thrift
77 {
78 namespace transport
79 {
80
81 using namespace std;
82 using boost::shared_ptr;
83
TServerSocket(int port)84 TServerSocket::TServerSocket(int port) :
85 port_(port),
86 serverSocket_(THRIFT_INVALID_SOCKET),
87 acceptBacklog_(DEFAULT_BACKLOG),
88 sendTimeout_(0),
89 recvTimeout_(0),
90 accTimeout_(-1),
91 retryLimit_(0),
92 retryDelay_(0),
93 tcpSendBuffer_(0),
94 tcpRecvBuffer_(0),
95 intSock1_(THRIFT_INVALID_SOCKET),
96 intSock2_(THRIFT_INVALID_SOCKET) {}
97
TServerSocket(int port,int sendTimeout,int recvTimeout)98 TServerSocket::TServerSocket(int port, int sendTimeout, int recvTimeout) :
99 port_(port),
100 serverSocket_(THRIFT_INVALID_SOCKET),
101 acceptBacklog_(DEFAULT_BACKLOG),
102 sendTimeout_(sendTimeout),
103 recvTimeout_(recvTimeout),
104 accTimeout_(-1),
105 retryLimit_(0),
106 retryDelay_(0),
107 tcpSendBuffer_(0),
108 tcpRecvBuffer_(0),
109 intSock1_(THRIFT_INVALID_SOCKET),
110 intSock2_(THRIFT_INVALID_SOCKET) {}
111
TServerSocket(string path)112 TServerSocket::TServerSocket(string path) :
113 port_(0),
114 path_(path),
115 serverSocket_(THRIFT_INVALID_SOCKET),
116 acceptBacklog_(DEFAULT_BACKLOG),
117 sendTimeout_(0),
118 recvTimeout_(0),
119 accTimeout_(-1),
120 retryLimit_(0),
121 retryDelay_(0),
122 tcpSendBuffer_(0),
123 tcpRecvBuffer_(0),
124 intSock1_(THRIFT_INVALID_SOCKET),
125 intSock2_(THRIFT_INVALID_SOCKET) {}
126
~TServerSocket()127 TServerSocket::~TServerSocket()
128 {
129 close();
130 }
131
setSendTimeout(int sendTimeout)132 void TServerSocket::setSendTimeout(int sendTimeout)
133 {
134 sendTimeout_ = sendTimeout;
135 }
136
setRecvTimeout(int recvTimeout)137 void TServerSocket::setRecvTimeout(int recvTimeout)
138 {
139 recvTimeout_ = recvTimeout;
140 }
141
setAcceptTimeout(int accTimeout)142 void TServerSocket::setAcceptTimeout(int accTimeout)
143 {
144 accTimeout_ = accTimeout;
145 }
146
setAcceptBacklog(int accBacklog)147 void TServerSocket::setAcceptBacklog(int accBacklog)
148 {
149 acceptBacklog_ = accBacklog;
150 }
151
setRetryLimit(int retryLimit)152 void TServerSocket::setRetryLimit(int retryLimit)
153 {
154 retryLimit_ = retryLimit;
155 }
156
setRetryDelay(int retryDelay)157 void TServerSocket::setRetryDelay(int retryDelay)
158 {
159 retryDelay_ = retryDelay;
160 }
161
setTcpSendBuffer(int tcpSendBuffer)162 void TServerSocket::setTcpSendBuffer(int tcpSendBuffer)
163 {
164 tcpSendBuffer_ = tcpSendBuffer;
165 }
166
setTcpRecvBuffer(int tcpRecvBuffer)167 void TServerSocket::setTcpRecvBuffer(int tcpRecvBuffer)
168 {
169 tcpRecvBuffer_ = tcpRecvBuffer;
170 }
171
/**
 * Creates, configures, binds and starts listening on the server socket.
 *
 * Steps, in order:
 *  - create the socket pair used by interrupt() to wake acceptImpl(); a
 *    failure here is logged but is not fatal;
 *  - create either a Unix domain socket (non-empty path_) or a TCP socket
 *    on the wildcard address for port_, preferring an IPv6 address;
 *  - apply socket options (address reuse, optional buffer sizes, deferred
 *    accept, dual-stack IPv6, no linger, TCP_NODELAY) and make the socket
 *    non-blocking;
 *  - bind, retrying up to retryLimit_ more times with a pause of
 *    retryDelay_ between attempts, then call ::listen() with
 *    acceptBacklog_.
 *
 * On every failure path the sockets opened so far are released through
 * close() and the address list returned by getaddrinfo() is freed.
 *
 * @throws TTransportException (NOT_OPEN) when address resolution, socket
 *         creation, a mandatory socket option, fcntl, bind or listen fails,
 *         or when the Unix domain socket path does not fit in sun_path.
 */
void TServerSocket::listen()
{
  // Frees the getaddrinfo() result on every exit path, including throws.
  struct AddrInfoGuard
  {
    struct addrinfo* head;

    AddrInfoGuard() : head(NULL) {}

    ~AddrInfoGuard()
    {
      if (head != NULL)
      {
        freeaddrinfo(head);
      }
    }
  };

  THRIFT_SOCKET sv[2];

  if (-1 == THRIFT_SOCKETPAIR(AF_LOCAL, SOCK_STREAM, 0, sv))
  {
    GlobalOutput.perror("TServerSocket::listen() socketpair() ", THRIFT_GET_SOCKET_ERROR);
    intSock1_ = THRIFT_INVALID_SOCKET;
    intSock2_ = THRIFT_INVALID_SOCKET;
  }
  else
  {
    intSock1_ = sv[1];
    intSock2_ = sv[0];
  }

  const bool useUnixSocket = !path_.empty();

  AddrInfoGuard resolved;
  struct addrinfo* res = NULL;

  if (useUnixSocket)
  {
    // A Unix domain listener needs no address resolution at all.
    serverSocket_ = socket(PF_UNIX, SOCK_STREAM, IPPROTO_IP);
  }
  else
  {
    struct addrinfo hints;
    struct addrinfo* res0 = NULL;

    // Big enough for the decimal form of any int, so an out-of-range port_
    // cannot overrun the buffer.
    char port[32];

    std::memset(&hints, 0, sizeof(hints));
    hints.ai_family = PF_UNSPEC;
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;

    THRIFT_SNPRINTF(port, sizeof(port), "%d", port_);

    // Wildcard address
    const int error = getaddrinfo(NULL, port, &hints, &res0);

    if (error)
    {
      GlobalOutput.printf("getaddrinfo %d: %s", error, THRIFT_GAI_STRERROR(error));
      close();
      throw TTransportException(TTransportException::NOT_OPEN, "Could not resolve host for server socket.");
    }

    resolved.head = res0;

    // Pick the ipv6 address first since ipv4 addresses can be mapped
    // into ipv6 space.
    for (res = res0; res; res = res->ai_next)
    {
      if (res->ai_family == AF_INET6 || res->ai_next == NULL)
      {
        break;
      }
    }

    serverSocket_ = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
  }

  if (serverSocket_ == THRIFT_INVALID_SOCKET)
  {
    int errno_copy = THRIFT_GET_SOCKET_ERROR;
    GlobalOutput.perror("TServerSocket::listen() socket() ", errno_copy);
    close();
    throw TTransportException(TTransportException::NOT_OPEN, "Could not create server socket.", errno_copy);
  }

  // Set THRIFT_NO_SOCKET_CACHING to prevent 2MSL delay on accept
  int one = 1;

  if (-1 == setsockopt(serverSocket_, SOL_SOCKET, THRIFT_NO_SOCKET_CACHING,
                       cast_sockopt(&one), sizeof(one)))
  {
    //ignore errors coming out of this setsockopt on Windows. This is because
    //SO_EXCLUSIVEADDRUSE requires admin privileges on WinXP, but we don't
    //want to force servers to be an admin.
#ifndef _WIN32
    int errno_copy = THRIFT_GET_SOCKET_ERROR;
    GlobalOutput.perror("TServerSocket::listen() setsockopt() THRIFT_NO_SOCKET_CACHING ", errno_copy);
    close();
    throw TTransportException(TTransportException::NOT_OPEN, "Could not set THRIFT_NO_SOCKET_CACHING", errno_copy);
#endif
  }

  // Set TCP buffer sizes
  if (tcpSendBuffer_ > 0)
  {
    if (-1 == setsockopt(serverSocket_, SOL_SOCKET, SO_SNDBUF,
                         cast_sockopt(&tcpSendBuffer_), sizeof(tcpSendBuffer_)))
    {
      int errno_copy = THRIFT_GET_SOCKET_ERROR;
      GlobalOutput.perror("TServerSocket::listen() setsockopt() SO_SNDBUF ", errno_copy);
      close();
      throw TTransportException(TTransportException::NOT_OPEN, "Could not set SO_SNDBUF", errno_copy);
    }
  }

  if (tcpRecvBuffer_ > 0)
  {
    if (-1 == setsockopt(serverSocket_, SOL_SOCKET, SO_RCVBUF,
                         cast_sockopt(&tcpRecvBuffer_), sizeof(tcpRecvBuffer_)))
    {
      int errno_copy = THRIFT_GET_SOCKET_ERROR;
      GlobalOutput.perror("TServerSocket::listen() setsockopt() SO_RCVBUF ", errno_copy);
      close();
      throw TTransportException(TTransportException::NOT_OPEN, "Could not set SO_RCVBUF", errno_copy);
    }
  }

  // Defer accept
#ifdef TCP_DEFER_ACCEPT

  // TCP_DEFER_ACCEPT is a TCP-level option: it has to be set at the
  // IPPROTO_TCP level (at SOL_SOCKET the same number names a different
  // option) and does not apply to Unix domain sockets.
  if (!useUnixSocket)
  {
    if (-1 == setsockopt(serverSocket_, IPPROTO_TCP, TCP_DEFER_ACCEPT,
                         cast_sockopt(&one), sizeof(one)))
    {
      int errno_copy = THRIFT_GET_SOCKET_ERROR;
      GlobalOutput.perror("TServerSocket::listen() setsockopt() TCP_DEFER_ACCEPT ", errno_copy);
      close();
      throw TTransportException(TTransportException::NOT_OPEN, "Could not set TCP_DEFER_ACCEPT", errno_copy);
    }
  }

#endif // #ifdef TCP_DEFER_ACCEPT

#ifdef IPV6_V6ONLY

  if (!useUnixSocket && res->ai_family == AF_INET6)
  {
    int zero = 0;

    // Best effort only: a failure is logged and listening continues.
    if (-1 == setsockopt(serverSocket_, IPPROTO_IPV6, IPV6_V6ONLY,
                         cast_sockopt(&zero), sizeof(zero)))
    {
      GlobalOutput.perror("TServerSocket::listen() IPV6_V6ONLY ", THRIFT_GET_SOCKET_ERROR);
    }
  }

#endif // #ifdef IPV6_V6ONLY

  // Turn linger off, don't want to block on calls to close
  struct linger ling = {0, 0};

  if (-1 == setsockopt(serverSocket_, SOL_SOCKET, SO_LINGER,
                       cast_sockopt(&ling), sizeof(ling)))
  {
    int errno_copy = THRIFT_GET_SOCKET_ERROR;
    GlobalOutput.perror("TServerSocket::listen() setsockopt() SO_LINGER ", errno_copy);
    close();
    throw TTransportException(TTransportException::NOT_OPEN, "Could not set SO_LINGER", errno_copy);
  }

  // Unix Sockets do not need that
  if (!useUnixSocket)
  {
    // TCP Nodelay, speed over bandwidth
    if (-1 == setsockopt(serverSocket_, IPPROTO_TCP, TCP_NODELAY,
                         cast_sockopt(&one), sizeof(one)))
    {
      int errno_copy = THRIFT_GET_SOCKET_ERROR;
      GlobalOutput.perror("TServerSocket::listen() setsockopt() TCP_NODELAY ", errno_copy);
      close();
      throw TTransportException(TTransportException::NOT_OPEN, "Could not set TCP_NODELAY", errno_copy);
    }
  }

  // Set NONBLOCK on the accept socket
  int flags = THRIFT_FCNTL(serverSocket_, THRIFT_F_GETFL, 0);

  if (flags == -1)
  {
    int errno_copy = THRIFT_GET_SOCKET_ERROR;
    GlobalOutput.perror("TServerSocket::listen() THRIFT_FCNTL() THRIFT_F_GETFL ", errno_copy);
    close();
    throw TTransportException(TTransportException::NOT_OPEN, "THRIFT_FCNTL() failed", errno_copy);
  }

  if (-1 == THRIFT_FCNTL(serverSocket_, THRIFT_F_SETFL, flags | THRIFT_O_NONBLOCK))
  {
    int errno_copy = THRIFT_GET_SOCKET_ERROR;
    GlobalOutput.perror("TServerSocket::listen() THRIFT_FCNTL() THRIFT_O_NONBLOCK ", errno_copy);
    close();
    throw TTransportException(TTransportException::NOT_OPEN, "THRIFT_FCNTL() failed", errno_copy);
  }

  // prepare the address to bind to
  const struct sockaddr* bindAddress = NULL;
  socklen_t bindLength = 0;

#ifndef _WIN32
  // Must outlive the bind loop below, hence declared at function scope.
  struct sockaddr_un unixAddress;
#endif

  if (useUnixSocket)
  {

#ifndef _WIN32

    // Unix Domain Socket. The path needs room for its terminating NUL,
    // otherwise it would be silently truncated when copied into sun_path.
    if (path_.length() >= sizeof(unixAddress.sun_path))
    {
      int errno_copy = THRIFT_GET_SOCKET_ERROR;
      GlobalOutput.perror("TSocket::listen() Unix Domain socket path too long", errno_copy);
      close();
      throw TTransportException(TTransportException::NOT_OPEN, " Unix Domain socket path too long");
    }

    std::memset(&unixAddress, 0, sizeof(unixAddress));
    unixAddress.sun_family = AF_UNIX;
    THRIFT_SNPRINTF(unixAddress.sun_path, sizeof(unixAddress.sun_path), "%s", path_.c_str());

    bindAddress = reinterpret_cast<const struct sockaddr*>(&unixAddress);
    bindLength = sizeof(unixAddress);

#else
    GlobalOutput.perror("TSocket::open() Unix Domain socket path not supported on windows", -99);
    close();
    throw TTransportException(TTransportException::NOT_OPEN, " Unix Domain socket path not supported");
#endif
  }
  else
  {
    bindAddress = res->ai_addr;
    bindLength = static_cast<socklen_t>(res->ai_addrlen);
  }

  // we may want to try to bind more than once, since THRIFT_NO_SOCKET_CACHING doesn't
  // always seem to work. The client can configure the retry variables.
  int retries = 0;
  int bindErrno = 0;
  bool bound = false;

  do
  {
    if (0 == ::bind(serverSocket_, bindAddress, bindLength))
    {
      bound = true;
      break;
    }

    // Remember why bind failed before sleeping, logging or closing can
    // overwrite the error code.
    bindErrno = THRIFT_GET_SOCKET_ERROR;

    // use short circuit evaluation here to only sleep if we need to
  }
  while ((retries++ < retryLimit_) && (THRIFT_SLEEP_SEC(retryDelay_) == 0));

  // throw an error if we failed to bind properly; this also covers the case
  // where the retry sleep was cut short before the limit was reached
  if (!bound)
  {
    char errbuf[1024];

    if (useUnixSocket)
    {
      THRIFT_SNPRINTF(errbuf, sizeof(errbuf), "TServerSocket::listen() PATH %s", path_.c_str());
    }
    else
    {
      THRIFT_SNPRINTF(errbuf, sizeof(errbuf), "TServerSocket::listen() BIND %d", port_);
    }

    GlobalOutput(errbuf);
    close();
    throw TTransportException(TTransportException::NOT_OPEN, "Could not bind", bindErrno);
  }

  // Call listen
  if (-1 == ::listen(serverSocket_, acceptBacklog_))
  {
    int errno_copy = THRIFT_GET_SOCKET_ERROR;
    GlobalOutput.perror("TServerSocket::listen() listen() ", errno_copy);
    close();
    throw TTransportException(TTransportException::NOT_OPEN, "Could not listen", errno_copy);
  }

  // The socket is now listening!
}
442
acceptImpl()443 shared_ptr<TTransport> TServerSocket::acceptImpl()
444 {
445 if (serverSocket_ == THRIFT_INVALID_SOCKET)
446 {
447 throw TTransportException(TTransportException::NOT_OPEN, "TServerSocket not listening");
448 }
449
450 struct THRIFT_POLLFD fds[2];
451
452 int maxEintrs = 5;
453
454 int numEintrs = 0;
455
456 while (true)
457 {
458 std::memset(fds, 0, sizeof(fds));
459 fds[0].fd = serverSocket_;
460 fds[0].events = THRIFT_POLLIN;
461
462 if (intSock2_ != THRIFT_INVALID_SOCKET)
463 {
464 fds[1].fd = intSock2_;
465 fds[1].events = THRIFT_POLLIN;
466 }
467
468 /*
469 TODO: if THRIFT_EINTR is received, we'll restart the timeout.
470 To be accurate, we need to fix this in the future.
471 */
472 int ret = THRIFT_POLL(fds, 2, accTimeout_);
473
474 if (ret < 0)
475 {
476 // error cases
477 if (THRIFT_GET_SOCKET_ERROR == THRIFT_EINTR && (numEintrs++ < maxEintrs))
478 {
479 // THRIFT_EINTR needs to be handled manually and we can tolerate
480 // a certain number
481 continue;
482 }
483
484 int errno_copy = THRIFT_GET_SOCKET_ERROR;
485 GlobalOutput.perror("TServerSocket::acceptImpl() THRIFT_POLL() ", errno_copy);
486 throw TTransportException(TTransportException::UNKNOWN, "Unknown", errno_copy);
487 }
488 else if (ret > 0)
489 {
490 // Check for an interrupt signal
491 if (intSock2_ != THRIFT_INVALID_SOCKET
492 && (fds[1].revents & THRIFT_POLLIN))
493 {
494 int8_t buf;
495
496 if (-1 == recv(intSock2_, cast_sockopt(&buf), sizeof(int8_t), 0))
497 {
498 GlobalOutput.perror("TServerSocket::acceptImpl() recv() interrupt ", THRIFT_GET_SOCKET_ERROR);
499 }
500
501 throw TTransportException(TTransportException::INTERRUPTED);
502 }
503
504 // Check for the actual server socket being ready
505 if (fds[0].revents & THRIFT_POLLIN)
506 {
507 break;
508 }
509 }
510 else
511 {
512 GlobalOutput("TServerSocket::acceptImpl() THRIFT_POLL 0");
513 throw TTransportException(TTransportException::UNKNOWN);
514 }
515 }
516
517 struct sockaddr_storage clientAddress;
518
519 int size = sizeof(clientAddress);
520
521 THRIFT_SOCKET clientSocket = ::accept(serverSocket_,
522 (struct sockaddr*) &clientAddress,
523 (socklen_t*) &size);
524
525 if (clientSocket == -1)
526 {
527 int errno_copy = THRIFT_GET_SOCKET_ERROR;
528 GlobalOutput.perror("TServerSocket::acceptImpl() ::accept() ", errno_copy);
529 throw TTransportException(TTransportException::UNKNOWN, "accept()", errno_copy);
530 }
531
532 // Make sure client socket is blocking
533 int flags = THRIFT_FCNTL(clientSocket, THRIFT_F_GETFL, 0);
534
535 if (flags == -1)
536 {
537 int errno_copy = THRIFT_GET_SOCKET_ERROR;
538 GlobalOutput.perror("TServerSocket::acceptImpl() THRIFT_FCNTL() THRIFT_F_GETFL ", errno_copy);
539 throw TTransportException(TTransportException::UNKNOWN, "THRIFT_FCNTL(THRIFT_F_GETFL)", errno_copy);
540 }
541
542 if (-1 == THRIFT_FCNTL(clientSocket, THRIFT_F_SETFL, flags & ~THRIFT_O_NONBLOCK))
543 {
544 int errno_copy = THRIFT_GET_SOCKET_ERROR;
545 GlobalOutput.perror("TServerSocket::acceptImpl() THRIFT_FCNTL() THRIFT_F_SETFL ~THRIFT_O_NONBLOCK ", errno_copy);
546 throw TTransportException(TTransportException::UNKNOWN, "THRIFT_FCNTL(THRIFT_F_SETFL)", errno_copy);
547 }
548
549 shared_ptr<TSocket> client = createSocket(clientSocket);
550
551 if (sendTimeout_ > 0)
552 {
553 client->setSendTimeout(sendTimeout_);
554 }
555
556 if (recvTimeout_ > 0)
557 {
558 client->setRecvTimeout(recvTimeout_);
559 }
560
561 client->setCachedAddress((sockaddr*) &clientAddress, size);
562
563 return client;
564 }
565
createSocket(THRIFT_SOCKET clientSocket)566 shared_ptr<TSocket> TServerSocket::createSocket(THRIFT_SOCKET clientSocket)
567 {
568 return shared_ptr<TSocket>(new TSocket(clientSocket));
569 }
570
interrupt()571 void TServerSocket::interrupt()
572 {
573 if (intSock1_ != THRIFT_INVALID_SOCKET)
574 {
575 int8_t byte = 0;
576
577 if (-1 == send(intSock1_, cast_sockopt(&byte), sizeof(int8_t), 0))
578 {
579 GlobalOutput.perror("TServerSocket::interrupt() send() ", THRIFT_GET_SOCKET_ERROR);
580 }
581 }
582 }
583
close()584 void TServerSocket::close()
585 {
586 if (serverSocket_ != THRIFT_INVALID_SOCKET)
587 {
588 shutdown(serverSocket_, THRIFT_SHUT_RDWR);
589 ::THRIFT_CLOSESOCKET(serverSocket_);
590 }
591
592 if (intSock1_ != THRIFT_INVALID_SOCKET)
593 {
594 ::THRIFT_CLOSESOCKET(intSock1_);
595 }
596
597 if (intSock2_ != THRIFT_INVALID_SOCKET)
598 {
599 ::THRIFT_CLOSESOCKET(intSock2_);
600 }
601
602 serverSocket_ = THRIFT_INVALID_SOCKET;
603 intSock1_ = THRIFT_INVALID_SOCKET;
604 intSock2_ = THRIFT_INVALID_SOCKET;
605 }
606
607 }
608 }
609 } // apache::thrift::transport
610