1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20 #include <thrift/thrift-config.h>
21
22 #include <cstring>
23 #include <memory>
24 #include <stdexcept>
25 #include <sys/types.h>
26 #ifdef HAVE_SYS_SOCKET_H
27 #include <sys/socket.h>
28 #endif
29 #ifdef HAVE_SYS_UN_H
30 #include <sys/un.h>
31 #endif
32 #ifdef HAVE_SYS_POLL_H
33 #include <sys/poll.h>
34 #endif
35 #ifdef HAVE_NETINET_IN_H
36 #include <netinet/in.h>
37 #include <netinet/tcp.h>
38 #endif
39 #ifdef HAVE_NETDB_H
40 #include <netdb.h>
41 #endif
42 #include <fcntl.h>
43 #ifdef HAVE_UNISTD_H
44 #include <unistd.h>
45 #endif
46
47 #include <thrift/transport/PlatformSocket.h>
48 #include <thrift/transport/TNonblockingServerSocket.h>
49 #include <thrift/transport/TSocket.h>
50 #include <thrift/transport/TSocketUtils.h>
51 #include <thrift/transport/SocketCommon.h>
52
// Fall back to AF_UNIX when the platform headers do not define AF_LOCAL.
#if !defined(AF_LOCAL)
#define AF_LOCAL AF_UNIX
#endif // AF_LOCAL

// Pointee type that the cast_sockopt helpers convert option pointers to
// before they are handed to setsockopt(): char when _WIN32 is defined, void
// otherwise. A definition supplied earlier is left untouched.
#if !defined(SOCKOPT_CAST_T)
#ifdef _WIN32
#define SOCKOPT_CAST_T char
#else
#define SOCKOPT_CAST_T void
#endif // _WIN32
#endif // SOCKOPT_CAST_T
64
65 template <class T>
const_cast_sockopt(const T * v)66 inline const SOCKOPT_CAST_T* const_cast_sockopt(const T* v) {
67 return reinterpret_cast<const SOCKOPT_CAST_T*>(v);
68 }
69
70 template <class T>
cast_sockopt(T * v)71 inline SOCKOPT_CAST_T* cast_sockopt(T* v) {
72 return reinterpret_cast<SOCKOPT_CAST_T*>(v);
73 }
74
75 namespace apache {
76 namespace thrift {
77 namespace transport {
78
79 using std::shared_ptr;
80 using std::string;
81
TNonblockingServerSocket(int port)82 TNonblockingServerSocket::TNonblockingServerSocket(int port)
83 : port_(port),
84 listenPort_(port),
85 serverSocket_(THRIFT_INVALID_SOCKET),
86 acceptBacklog_(DEFAULT_BACKLOG),
87 sendTimeout_(0),
88 recvTimeout_(0),
89 retryLimit_(0),
90 retryDelay_(0),
91 tcpSendBuffer_(0),
92 tcpRecvBuffer_(0),
93 keepAlive_(false),
94 listening_(false) {
95 }
96
TNonblockingServerSocket(int port,int sendTimeout,int recvTimeout)97 TNonblockingServerSocket::TNonblockingServerSocket(int port, int sendTimeout, int recvTimeout)
98 : port_(port),
99 listenPort_(port),
100 serverSocket_(THRIFT_INVALID_SOCKET),
101 acceptBacklog_(DEFAULT_BACKLOG),
102 sendTimeout_(sendTimeout),
103 recvTimeout_(recvTimeout),
104 retryLimit_(0),
105 retryDelay_(0),
106 tcpSendBuffer_(0),
107 tcpRecvBuffer_(0),
108 keepAlive_(false),
109 listening_(false) {
110 }
111
TNonblockingServerSocket(const string & address,int port)112 TNonblockingServerSocket::TNonblockingServerSocket(const string& address, int port)
113 : port_(port),
114 listenPort_(port),
115 address_(address),
116 serverSocket_(THRIFT_INVALID_SOCKET),
117 acceptBacklog_(DEFAULT_BACKLOG),
118 sendTimeout_(0),
119 recvTimeout_(0),
120 retryLimit_(0),
121 retryDelay_(0),
122 tcpSendBuffer_(0),
123 tcpRecvBuffer_(0),
124 keepAlive_(false),
125 listening_(false) {
126 }
127
TNonblockingServerSocket(const string & path)128 TNonblockingServerSocket::TNonblockingServerSocket(const string& path)
129 : port_(0),
130 listenPort_(0),
131 path_(path),
132 serverSocket_(THRIFT_INVALID_SOCKET),
133 acceptBacklog_(DEFAULT_BACKLOG),
134 sendTimeout_(0),
135 recvTimeout_(0),
136 retryLimit_(0),
137 retryDelay_(0),
138 tcpSendBuffer_(0),
139 tcpRecvBuffer_(0),
140 keepAlive_(false),
141 listening_(false) {
142 }
143
~TNonblockingServerSocket()144 TNonblockingServerSocket::~TNonblockingServerSocket() {
145 close();
146 }
147
setSendTimeout(int sendTimeout)148 void TNonblockingServerSocket::setSendTimeout(int sendTimeout) {
149 sendTimeout_ = sendTimeout;
150 }
151
setRecvTimeout(int recvTimeout)152 void TNonblockingServerSocket::setRecvTimeout(int recvTimeout) {
153 recvTimeout_ = recvTimeout;
154 }
155
setAcceptBacklog(int accBacklog)156 void TNonblockingServerSocket::setAcceptBacklog(int accBacklog) {
157 acceptBacklog_ = accBacklog;
158 }
159
setRetryLimit(int retryLimit)160 void TNonblockingServerSocket::setRetryLimit(int retryLimit) {
161 retryLimit_ = retryLimit;
162 }
163
setRetryDelay(int retryDelay)164 void TNonblockingServerSocket::setRetryDelay(int retryDelay) {
165 retryDelay_ = retryDelay;
166 }
167
setTcpSendBuffer(int tcpSendBuffer)168 void TNonblockingServerSocket::setTcpSendBuffer(int tcpSendBuffer) {
169 tcpSendBuffer_ = tcpSendBuffer;
170 }
171
setTcpRecvBuffer(int tcpRecvBuffer)172 void TNonblockingServerSocket::setTcpRecvBuffer(int tcpRecvBuffer) {
173 tcpRecvBuffer_ = tcpRecvBuffer;
174 }
175
_setup_sockopts()176 void TNonblockingServerSocket::_setup_sockopts() {
177
178 // Set THRIFT_NO_SOCKET_CACHING to prevent 2MSL delay on accept
179 int one = 1;
180 if (-1 == setsockopt(serverSocket_,
181 SOL_SOCKET,
182 THRIFT_NO_SOCKET_CACHING,
183 cast_sockopt(&one),
184 sizeof(one))) {
185 // ignore errors coming out of this setsockopt on Windows. This is because
186 // SO_EXCLUSIVEADDRUSE requires admin privileges on WinXP, but we don't
187 // want to force servers to be an admin.
188 #ifndef _WIN32
189 int errno_copy = THRIFT_GET_SOCKET_ERROR;
190 GlobalOutput.perror("TNonblockingServerSocket::listen() setsockopt() THRIFT_NO_SOCKET_CACHING ",
191 errno_copy);
192 close();
193 throw TTransportException(TTransportException::NOT_OPEN,
194 "Could not set THRIFT_NO_SOCKET_CACHING",
195 errno_copy);
196 #endif
197 }
198
199 // Set TCP buffer sizes
200 if (tcpSendBuffer_ > 0) {
201 if (-1 == setsockopt(serverSocket_,
202 SOL_SOCKET,
203 SO_SNDBUF,
204 cast_sockopt(&tcpSendBuffer_),
205 sizeof(tcpSendBuffer_))) {
206 int errno_copy = THRIFT_GET_SOCKET_ERROR;
207 GlobalOutput.perror("TNonblockingServerSocket::listen() setsockopt() SO_SNDBUF ", errno_copy);
208 close();
209 throw TTransportException(TTransportException::NOT_OPEN,
210 "Could not set SO_SNDBUF",
211 errno_copy);
212 }
213 }
214
215 if (tcpRecvBuffer_ > 0) {
216 if (-1 == setsockopt(serverSocket_,
217 SOL_SOCKET,
218 SO_RCVBUF,
219 cast_sockopt(&tcpRecvBuffer_),
220 sizeof(tcpRecvBuffer_))) {
221 int errno_copy = THRIFT_GET_SOCKET_ERROR;
222 GlobalOutput.perror("TNonblockingServerSocket::listen() setsockopt() SO_RCVBUF ", errno_copy);
223 close();
224 throw TTransportException(TTransportException::NOT_OPEN,
225 "Could not set SO_RCVBUF",
226 errno_copy);
227 }
228 }
229
230 // Turn linger off, don't want to block on calls to close
231 struct linger ling = {0, 0};
232 if (-1 == setsockopt(serverSocket_, SOL_SOCKET, SO_LINGER, cast_sockopt(&ling), sizeof(ling))) {
233 int errno_copy = THRIFT_GET_SOCKET_ERROR;
234 GlobalOutput.perror("TNonblockingServerSocket::listen() setsockopt() SO_LINGER ", errno_copy);
235 close();
236 throw TTransportException(TTransportException::NOT_OPEN, "Could not set SO_LINGER", errno_copy);
237 }
238
239 // Keepalive to ensure full result flushing
240 if (-1 == setsockopt(serverSocket_, SOL_SOCKET, SO_KEEPALIVE, const_cast_sockopt(&one), sizeof(one))) {
241 int errno_copy = THRIFT_GET_SOCKET_ERROR;
242 GlobalOutput.perror("TNonblockingServerSocket::listen() setsockopt() SO_KEEPALIVE ", errno_copy);
243 close();
244 throw TTransportException(TTransportException::NOT_OPEN,
245 "Could not set TCP_NODELAY",
246 errno_copy);
247 }
248
249 // Set NONBLOCK on the accept socket
250 int flags = THRIFT_FCNTL(serverSocket_, THRIFT_F_GETFL, 0);
251 if (flags == -1) {
252 int errno_copy = THRIFT_GET_SOCKET_ERROR;
253 GlobalOutput.perror("TNonblockingServerSocket::listen() THRIFT_FCNTL() THRIFT_F_GETFL ", errno_copy);
254 close();
255 throw TTransportException(TTransportException::NOT_OPEN,
256 "THRIFT_FCNTL() THRIFT_F_GETFL failed",
257 errno_copy);
258 }
259
260 if (-1 == THRIFT_FCNTL(serverSocket_, THRIFT_F_SETFL, flags | THRIFT_O_NONBLOCK)) {
261 int errno_copy = THRIFT_GET_SOCKET_ERROR;
262 GlobalOutput.perror("TNonblockingServerSocket::listen() THRIFT_FCNTL() THRIFT_O_NONBLOCK ", errno_copy);
263 close();
264 throw TTransportException(TTransportException::NOT_OPEN,
265 "THRIFT_FCNTL() THRIFT_F_SETFL THRIFT_O_NONBLOCK failed",
266 errno_copy);
267 }
268
269 } // _setup_sockopts()
270
_setup_tcp_sockopts()271 void TNonblockingServerSocket::_setup_tcp_sockopts() {
272 int one = 1;
273
274 // Set TCP nodelay if available, MAC OS X Hack
275 // See http://lists.danga.com/pipermail/memcached/2005-March/001240.html
276 #ifndef TCP_NOPUSH
277 // TCP Nodelay, speed over bandwidth
278 if (-1
279 == setsockopt(serverSocket_, IPPROTO_TCP, TCP_NODELAY, cast_sockopt(&one), sizeof(one))) {
280 int errno_copy = THRIFT_GET_SOCKET_ERROR;
281 GlobalOutput.perror("TNonblockingServerSocket::listen() setsockopt() TCP_NODELAY ", errno_copy);
282 close();
283 throw TTransportException(TTransportException::NOT_OPEN,
284 "Could not set TCP_NODELAY",
285 errno_copy);
286 }
287 #endif
288
289 #ifdef TCP_LOW_MIN_RTO
290 if (TSocket::getUseLowMinRto()) {
291 if (-1 == setsockopt(s, IPPROTO_TCP, TCP_LOW_MIN_RTO, const_cast_sockopt(&one), sizeof(one))) {
292 int errno_copy = THRIFT_GET_SOCKET_ERROR;
293 GlobalOutput.perror("TNonblockingServerSocket::listen() setsockopt() TCP_LOW_MIN_RTO ", errno_copy);
294 close();
295 throw TTransportException(TTransportException::NOT_OPEN,
296 "Could not set TCP_NODELAY",
297 errno_copy);
298 }
299 }
300 #endif
301
302 } // _setup_tcp_sockopts()
303
/**
 * Creates the server socket, configures it, binds it and starts listening.
 *
 * A non-empty path_ selects a Unix domain socket (rejected on Windows);
 * otherwise address_/port_ are resolved and the resolved addresses are tried
 * in turn. A failed bind is retried up to retryLimit_ times with
 * THRIFT_SLEEP_SEC(retryDelay_) between attempts. When port_ is 0, the port
 * actually assigned is read back with getsockname() into listenPort_.
 * listenCallback_, if set, is invoked with the bound socket right before
 * ::listen() is called.
 *
 * @throws TTransportException BAD_ARGS for a port outside [0, 65535];
 *         NOT_OPEN when resolving, creating, configuring, binding or
 *         listening fails (the socket is closed before throwing)
 */
void TNonblockingServerSocket::listen() {
#ifdef _WIN32
  TWinsockSingleton::create();
#endif // _WIN32

  // tcp == false means Unix Domain socket
  const bool tcp = path_.empty();

  // Validate port number
  if (port_ < 0 || port_ > 0xFFFF) {
    throw TTransportException(TTransportException::BAD_ARGS, "Specified port is invalid");
  }

  // Flag the socket as listening only after the arguments were accepted, so
  // the BAD_ARGS exit above does not leave the flag set. Every failure path
  // below goes through close(), which clears it again.
  listening_ = true;

  // Resolve host:port strings into an iterable of struct addrinfo*
  AddressResolutionHelper resolved_addresses;
  if (tcp) {
    try {
      resolved_addresses.resolve(address_, std::to_string(port_), SOCK_STREAM,
                                 AI_PASSIVE | AI_V4MAPPED);
    } catch (const std::system_error& e) {
      GlobalOutput.printf("getaddrinfo() -> %d. %s", e.code().value(), e.what());
      close();
      throw TTransportException(TTransportException::NOT_OPEN,
                                "Could not resolve host for server socket.");
    }
  }

  // we may want to try to bind more than once, since THRIFT_NO_SOCKET_CACHING doesn't
  // always seem to work. The client can configure the retry variables.
  int retries = 0;
  int errno_copy = 0;
  // Explicit success flag. Deriving success from the retry counter is wrong
  // when the retry sleep returns non-zero (the loop ends early with retries
  // still within the limit) or when retryLimit_ is negative.
  bool bound = false;

  if (!tcp) {
    // -- Unix Domain Socket -- //

    serverSocket_ = socket(PF_UNIX, SOCK_STREAM, IPPROTO_IP);

    if (serverSocket_ == THRIFT_INVALID_SOCKET) {
      errno_copy = THRIFT_GET_SOCKET_ERROR;
      GlobalOutput.perror("TNonblockingServerSocket::listen() socket() ", errno_copy);
      close();
      throw TTransportException(TTransportException::NOT_OPEN,
                                "Could not create server socket.",
                                errno_copy);
    }

    _setup_sockopts();
    //_setup_unixdomain_sockopts();

/*
 * TODO: seems that windows now support unix sockets,
 *       see: https://devblogs.microsoft.com/commandline/af_unix-comes-to-windows/
 */
#ifndef _WIN32

    struct sockaddr_un address;
    socklen_t structlen = fillUnixSocketAddr(address, path_);

    do {
      if (0 == ::bind(serverSocket_, reinterpret_cast<struct sockaddr*>(&address), structlen)) {
        bound = true;
        break;
      }
      errno_copy = THRIFT_GET_SOCKET_ERROR;
      // use short circuit evaluation here to only sleep if we need to
    } while ((retries++ < retryLimit_) && (THRIFT_SLEEP_SEC(retryDelay_) == 0));
#else
    GlobalOutput.perror(
        "TNonblockingServerSocket::listen() Unix Domain socket path not supported on windows",
        -99);
    // Release the socket created above instead of leaving it open.
    close();
    throw TTransportException(TTransportException::NOT_OPEN,
                              " Unix Domain socket path not supported");
#endif
  } else {

    // -- TCP socket -- //

    auto addr_iter = AddressResolutionHelper::Iter{};

    // Via DNS or somehow else, single hostname can resolve into many addresses.
    // Results may contain perhaps a mix of IPv4 and IPv6. Here, we iterate
    // over what system gave us, picking the first address that works.
    do {
      if (!addr_iter) {
        // init + recycle over many retries
        addr_iter = resolved_addresses.iterate();
      }
      auto trybind = *addr_iter++;

      // A previous attempt may have left a socket that could not be bound;
      // close it before creating the next one so retries do not leak
      // descriptors. close() is not used here because it would also reset
      // listening_.
      if (serverSocket_ != THRIFT_INVALID_SOCKET) {
        ::THRIFT_CLOSESOCKET(serverSocket_);
        serverSocket_ = THRIFT_INVALID_SOCKET;
      }

      serverSocket_ = socket(trybind->ai_family, trybind->ai_socktype, trybind->ai_protocol);
      if (serverSocket_ == THRIFT_INVALID_SOCKET) {
        errno_copy = THRIFT_GET_SOCKET_ERROR;
        continue;
      }

      _setup_sockopts();
      _setup_tcp_sockopts();

#ifdef IPV6_V6ONLY
      if (trybind->ai_family == AF_INET6) {
        // Clear IPV6_V6ONLY on IPv6 sockets; failure is logged but not fatal.
        int zero = 0;
        if (-1 == setsockopt(serverSocket_,
                             IPPROTO_IPV6,
                             IPV6_V6ONLY,
                             cast_sockopt(&zero),
                             sizeof(zero))) {
          GlobalOutput.perror("TNonblockingServerSocket::listen() IPV6_V6ONLY ",
                              THRIFT_GET_SOCKET_ERROR);
        }
      }
#endif // #ifdef IPV6_V6ONLY

      if (0 == ::bind(serverSocket_, trybind->ai_addr, static_cast<int>(trybind->ai_addrlen))) {
        bound = true;
        break;
      }
      errno_copy = THRIFT_GET_SOCKET_ERROR;

      // use short circuit evaluation here to only sleep if we need to
    } while ((retries++ < retryLimit_) && (THRIFT_SLEEP_SEC(retryDelay_) == 0));

    // retrieve bind info: with port 0 the OS picked the port, so read it back
    if (port_ == 0 && bound) {
      struct sockaddr_storage sa;
      socklen_t len = sizeof(sa);
      std::memset(&sa, 0, len);
      if (::getsockname(serverSocket_, reinterpret_cast<struct sockaddr*>(&sa), &len) < 0) {
        errno_copy = THRIFT_GET_SOCKET_ERROR;
        GlobalOutput.perror("TNonblockingServerSocket::getPort() getsockname() ", errno_copy);
      } else {
        if (sa.ss_family == AF_INET6) {
          const auto* sin = reinterpret_cast<const struct sockaddr_in6*>(&sa);
          listenPort_ = ntohs(sin->sin6_port);
        } else {
          const auto* sin = reinterpret_cast<const struct sockaddr_in*>(&sa);
          listenPort_ = ntohs(sin->sin_port);
        }
      }
    }
  } // TCP socket //

  // throw error if socket still wasn't created successfully
  if (serverSocket_ == THRIFT_INVALID_SOCKET) {
    GlobalOutput.perror("TNonblockingServerSocket::listen() socket() ", errno_copy);
    close();
    throw TTransportException(TTransportException::NOT_OPEN,
                              "Could not create server socket.",
                              errno_copy);
  }

  // throw an error if we failed to bind properly
  if (!bound) {
    char errbuf[1024];
    if (!tcp) {
      THRIFT_SNPRINTF(errbuf, sizeof(errbuf), "TNonblockingServerSocket::listen() PATH %s", path_.c_str());
    } else {
      THRIFT_SNPRINTF(errbuf, sizeof(errbuf), "TNonblockingServerSocket::listen() BIND %d", port_);
    }
    GlobalOutput(errbuf);
    close();
    throw TTransportException(TTransportException::NOT_OPEN,
                              "Could not bind",
                              errno_copy);
  }

  if (listenCallback_)
    listenCallback_(serverSocket_);

  // Call listen
  if (-1 == ::listen(serverSocket_, acceptBacklog_)) {
    errno_copy = THRIFT_GET_SOCKET_ERROR;
    GlobalOutput.perror("TNonblockingServerSocket::listen() listen() ", errno_copy);
    close();
    throw TTransportException(TTransportException::NOT_OPEN, "Could not listen", errno_copy);
  }

  // The socket is now listening!
}
478
getPort()479 int TNonblockingServerSocket::getPort() {
480 return port_;
481 }
482
getListenPort()483 int TNonblockingServerSocket::getListenPort() {
484 return listenPort_;
485 }
486
acceptImpl()487 shared_ptr<TSocket> TNonblockingServerSocket::acceptImpl() {
488 if (serverSocket_ == THRIFT_INVALID_SOCKET) {
489 throw TTransportException(TTransportException::NOT_OPEN,
490 "TNonblockingServerSocket not listening");
491 }
492
493 struct sockaddr_storage clientAddress;
494 int size = sizeof(clientAddress);
495 THRIFT_SOCKET clientSocket
496 = ::accept(serverSocket_, (struct sockaddr*)&clientAddress, (socklen_t*)&size);
497
498 if (clientSocket == THRIFT_INVALID_SOCKET) {
499 int errno_copy = THRIFT_GET_SOCKET_ERROR;
500 GlobalOutput.perror("TNonblockingServerSocket::acceptImpl() ::accept() ", errno_copy);
501 throw TTransportException(TTransportException::UNKNOWN, "accept()", errno_copy);
502 }
503
504 // Explicitly set this socket to NONBLOCK mode
505 int flags = THRIFT_FCNTL(clientSocket, THRIFT_F_GETFL, 0);
506 if (flags == -1) {
507 int errno_copy = THRIFT_GET_SOCKET_ERROR;
508 ::THRIFT_CLOSESOCKET(clientSocket);
509 GlobalOutput.perror("TNonblockingServerSocket::acceptImpl() THRIFT_FCNTL() THRIFT_F_GETFL ", errno_copy);
510 throw TTransportException(TTransportException::UNKNOWN,
511 "THRIFT_FCNTL(THRIFT_F_GETFL)",
512 errno_copy);
513 }
514
515 if (-1 == THRIFT_FCNTL(clientSocket, THRIFT_F_SETFL, flags | THRIFT_O_NONBLOCK)) {
516 int errno_copy = THRIFT_GET_SOCKET_ERROR;
517 ::THRIFT_CLOSESOCKET(clientSocket);
518 GlobalOutput
519 .perror("TNonblockingServerSocket::acceptImpl() THRIFT_FCNTL() THRIFT_F_SETFL ~THRIFT_O_NONBLOCK ",
520 errno_copy);
521 throw TTransportException(TTransportException::UNKNOWN,
522 "THRIFT_FCNTL(THRIFT_F_SETFL)",
523 errno_copy);
524 }
525
526 shared_ptr<TSocket> client = createSocket(clientSocket);
527 if (sendTimeout_ > 0) {
528 client->setSendTimeout(sendTimeout_);
529 }
530 if (recvTimeout_ > 0) {
531 client->setRecvTimeout(recvTimeout_);
532 }
533 if (keepAlive_) {
534 client->setKeepAlive(keepAlive_);
535 }
536 client->setCachedAddress((sockaddr*)&clientAddress, size);
537
538 if (acceptCallback_)
539 acceptCallback_(clientSocket);
540
541 return client;
542 }
543
createSocket(THRIFT_SOCKET clientSocket)544 shared_ptr<TSocket> TNonblockingServerSocket::createSocket(THRIFT_SOCKET clientSocket) {
545 return std::make_shared<TSocket>(clientSocket);
546 }
547
close()548 void TNonblockingServerSocket::close() {
549 if (serverSocket_ != THRIFT_INVALID_SOCKET) {
550 shutdown(serverSocket_, THRIFT_SHUT_RDWR);
551 ::THRIFT_CLOSESOCKET(serverSocket_);
552 }
553 serverSocket_ = THRIFT_INVALID_SOCKET;
554 listening_ = false;
555 }
556 } // namespace transport
557 } // namespace thrift
558 } // namespace apache
559