1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 
20 #include <thrift/thrift-config.h>
21 
22 #include <cstring>
23 #include <sys/types.h>
24 #ifdef HAVE_SYS_SOCKET_H
25 #include <sys/socket.h>
26 #endif
27 #ifdef HAVE_SYS_UN_H
28 #include <sys/un.h>
29 #endif
30 #ifdef HAVE_SYS_POLL_H
31 #include <sys/poll.h>
32 #endif
33 #ifdef HAVE_NETINET_IN_H
34 #include <netinet/in.h>
35 #include <netinet/tcp.h>
36 #endif
37 #ifdef HAVE_NETDB_H
38 #include <netdb.h>
39 #endif
40 #include <fcntl.h>
41 #ifdef HAVE_UNISTD_H
42 #include <unistd.h>
43 #endif
44 
45 #include <thrift/transport/TSocket.h>
46 #include <thrift/transport/TServerSocket.h>
47 #include <thrift/transport/PlatformSocket.h>
48 #include <boost/shared_ptr.hpp>
49 
50 #ifndef AF_LOCAL
51 #define AF_LOCAL AF_UNIX
52 #endif
53 
54 #ifndef SOCKOPT_CAST_T
55 #   ifndef _WIN32
56 #       define SOCKOPT_CAST_T void
57 #   else
58 #       define SOCKOPT_CAST_T char
59 #   endif // _WIN32
60 #endif
61 
62 template<class T>
const_cast_sockopt(const T * v)63 inline const SOCKOPT_CAST_T* const_cast_sockopt(const T* v)
64 {
65     return reinterpret_cast<const SOCKOPT_CAST_T*>(v);
66 }
67 
68 template<class T>
cast_sockopt(T * v)69 inline SOCKOPT_CAST_T* cast_sockopt(T* v)
70 {
71     return reinterpret_cast<SOCKOPT_CAST_T*>(v);
72 }
73 
74 namespace apache
75 {
76 namespace thrift
77 {
78 namespace transport
79 {
80 
81 using namespace std;
82 using boost::shared_ptr;
83 
TServerSocket(int port)84 TServerSocket::TServerSocket(int port) :
85     port_(port),
86     serverSocket_(THRIFT_INVALID_SOCKET),
87     acceptBacklog_(DEFAULT_BACKLOG),
88     sendTimeout_(0),
89     recvTimeout_(0),
90     accTimeout_(-1),
91     retryLimit_(0),
92     retryDelay_(0),
93     tcpSendBuffer_(0),
94     tcpRecvBuffer_(0),
95     intSock1_(THRIFT_INVALID_SOCKET),
96     intSock2_(THRIFT_INVALID_SOCKET) {}
97 
TServerSocket(int port,int sendTimeout,int recvTimeout)98 TServerSocket::TServerSocket(int port, int sendTimeout, int recvTimeout) :
99     port_(port),
100     serverSocket_(THRIFT_INVALID_SOCKET),
101     acceptBacklog_(DEFAULT_BACKLOG),
102     sendTimeout_(sendTimeout),
103     recvTimeout_(recvTimeout),
104     accTimeout_(-1),
105     retryLimit_(0),
106     retryDelay_(0),
107     tcpSendBuffer_(0),
108     tcpRecvBuffer_(0),
109     intSock1_(THRIFT_INVALID_SOCKET),
110     intSock2_(THRIFT_INVALID_SOCKET) {}
111 
TServerSocket(string path)112 TServerSocket::TServerSocket(string path) :
113     port_(0),
114     path_(path),
115     serverSocket_(THRIFT_INVALID_SOCKET),
116     acceptBacklog_(DEFAULT_BACKLOG),
117     sendTimeout_(0),
118     recvTimeout_(0),
119     accTimeout_(-1),
120     retryLimit_(0),
121     retryDelay_(0),
122     tcpSendBuffer_(0),
123     tcpRecvBuffer_(0),
124     intSock1_(THRIFT_INVALID_SOCKET),
125     intSock2_(THRIFT_INVALID_SOCKET) {}
126 
~TServerSocket()127 TServerSocket::~TServerSocket()
128 {
129     close();
130 }
131 
setSendTimeout(int sendTimeout)132 void TServerSocket::setSendTimeout(int sendTimeout)
133 {
134     sendTimeout_ = sendTimeout;
135 }
136 
setRecvTimeout(int recvTimeout)137 void TServerSocket::setRecvTimeout(int recvTimeout)
138 {
139     recvTimeout_ = recvTimeout;
140 }
141 
setAcceptTimeout(int accTimeout)142 void TServerSocket::setAcceptTimeout(int accTimeout)
143 {
144     accTimeout_ = accTimeout;
145 }
146 
setAcceptBacklog(int accBacklog)147 void TServerSocket::setAcceptBacklog(int accBacklog)
148 {
149     acceptBacklog_ = accBacklog;
150 }
151 
setRetryLimit(int retryLimit)152 void TServerSocket::setRetryLimit(int retryLimit)
153 {
154     retryLimit_ = retryLimit;
155 }
156 
setRetryDelay(int retryDelay)157 void TServerSocket::setRetryDelay(int retryDelay)
158 {
159     retryDelay_ = retryDelay;
160 }
161 
setTcpSendBuffer(int tcpSendBuffer)162 void TServerSocket::setTcpSendBuffer(int tcpSendBuffer)
163 {
164     tcpSendBuffer_ = tcpSendBuffer;
165 }
166 
setTcpRecvBuffer(int tcpRecvBuffer)167 void TServerSocket::setTcpRecvBuffer(int tcpRecvBuffer)
168 {
169     tcpRecvBuffer_ = tcpRecvBuffer;
170 }
171 
listen()172 void TServerSocket::listen()
173 {
174     THRIFT_SOCKET sv[2];
175 
176     if (-1 == THRIFT_SOCKETPAIR(AF_LOCAL, SOCK_STREAM, 0, sv))
177     {
178         GlobalOutput.perror("TServerSocket::listen() socketpair() ", THRIFT_GET_SOCKET_ERROR);
179         intSock1_ = THRIFT_INVALID_SOCKET;
180         intSock2_ = THRIFT_INVALID_SOCKET;
181     }
182     else
183     {
184         intSock1_ = sv[1];
185         intSock2_ = sv[0];
186     }
187 
188     struct addrinfo hints, *res, *res0;
189 
190     int error;
191 
192     char port[sizeof("65536") + 1];
193 
194     std::memset(&hints, 0, sizeof(hints));
195 
196     hints.ai_family = PF_UNSPEC;
197 
198     hints.ai_socktype = SOCK_STREAM;
199 
200     hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;
201 
202     sprintf(port, "%d", port_);
203 
204     // Wildcard address
205     error = getaddrinfo(NULL, port, &hints, &res0);
206 
207     if (error)
208     {
209         GlobalOutput.printf("getaddrinfo %d: %s", error, THRIFT_GAI_STRERROR(error));
210         close();
211         throw TTransportException(TTransportException::NOT_OPEN, "Could not resolve host for server socket.");
212     }
213 
214     // Pick the ipv6 address first since ipv4 addresses can be mapped
215     // into ipv6 space.
216     for (res = res0; res; res = res->ai_next)
217     {
218         if (res->ai_family == AF_INET6 || res->ai_next == NULL)
219             break;
220     }
221 
222     if (! path_.empty())
223     {
224         serverSocket_ = socket(PF_UNIX, SOCK_STREAM, IPPROTO_IP);
225     }
226     else
227     {
228         serverSocket_ = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
229     }
230 
231     if (serverSocket_ == THRIFT_INVALID_SOCKET)
232     {
233         int errno_copy = THRIFT_GET_SOCKET_ERROR;
234         GlobalOutput.perror("TServerSocket::listen() socket() ", errno_copy);
235         close();
236         throw TTransportException(TTransportException::NOT_OPEN, "Could not create server socket.", errno_copy);
237     }
238 
239     // Set THRIFT_NO_SOCKET_CACHING to prevent 2MSL delay on accept
240     int one = 1;
241 
242     if (-1 == setsockopt(serverSocket_, SOL_SOCKET, THRIFT_NO_SOCKET_CACHING,
243                          cast_sockopt(&one), sizeof(one)))
244     {
245         //ignore errors coming out of this setsockopt on Windows.  This is because
246         //SO_EXCLUSIVEADDRUSE requires admin privileges on WinXP, but we don't
247         //want to force servers to be an admin.
248 #ifndef _WIN32
249         int errno_copy = THRIFT_GET_SOCKET_ERROR;
250         GlobalOutput.perror("TServerSocket::listen() setsockopt() THRIFT_NO_SOCKET_CACHING ", errno_copy);
251         close();
252         throw TTransportException(TTransportException::NOT_OPEN, "Could not set THRIFT_NO_SOCKET_CACHING", errno_copy);
253 #endif
254     }
255 
256     // Set TCP buffer sizes
257     if (tcpSendBuffer_ > 0)
258     {
259         if (-1 == setsockopt(serverSocket_, SOL_SOCKET, SO_SNDBUF,
260                              cast_sockopt(&tcpSendBuffer_), sizeof(tcpSendBuffer_)))
261         {
262             int errno_copy = THRIFT_GET_SOCKET_ERROR;
263             GlobalOutput.perror("TServerSocket::listen() setsockopt() SO_SNDBUF ", errno_copy);
264             close();
265             throw TTransportException(TTransportException::NOT_OPEN, "Could not set SO_SNDBUF", errno_copy);
266         }
267     }
268 
269     if (tcpRecvBuffer_ > 0)
270     {
271         if (-1 == setsockopt(serverSocket_, SOL_SOCKET, SO_RCVBUF,
272                              cast_sockopt(&tcpRecvBuffer_), sizeof(tcpRecvBuffer_)))
273         {
274             int errno_copy = THRIFT_GET_SOCKET_ERROR;
275             GlobalOutput.perror("TServerSocket::listen() setsockopt() SO_RCVBUF ", errno_copy);
276             close();
277             throw TTransportException(TTransportException::NOT_OPEN, "Could not set SO_RCVBUF", errno_copy);
278         }
279     }
280 
281     // Defer accept
282 #ifdef TCP_DEFER_ACCEPT
283 
284     if (-1 == setsockopt(serverSocket_, SOL_SOCKET, TCP_DEFER_ACCEPT,
285                          &one, sizeof(one)))
286     {
287         int errno_copy = THRIFT_GET_SOCKET_ERROR;
288         GlobalOutput.perror("TServerSocket::listen() setsockopt() TCP_DEFER_ACCEPT ", errno_copy);
289         close();
290         throw TTransportException(TTransportException::NOT_OPEN, "Could not set TCP_DEFER_ACCEPT", errno_copy);
291     }
292 
293 #endif // #ifdef TCP_DEFER_ACCEPT
294 
295 #ifdef IPV6_V6ONLY
296 
297     if (res->ai_family == AF_INET6 && path_.empty())
298     {
299         int zero = 0;
300 
301         if (-1 == setsockopt(serverSocket_, IPPROTO_IPV6, IPV6_V6ONLY,
302                              cast_sockopt(&zero), sizeof(zero)))
303         {
304             GlobalOutput.perror("TServerSocket::listen() IPV6_V6ONLY ", THRIFT_GET_SOCKET_ERROR);
305         }
306     }
307 
308 #endif // #ifdef IPV6_V6ONLY
309 
310     // Turn linger off, don't want to block on calls to close
311     struct linger ling = {0, 0};
312 
313     if (-1 == setsockopt(serverSocket_, SOL_SOCKET, SO_LINGER,
314                          cast_sockopt(&ling), sizeof(ling)))
315     {
316         int errno_copy = THRIFT_GET_SOCKET_ERROR;
317         GlobalOutput.perror("TServerSocket::listen() setsockopt() SO_LINGER ", errno_copy);
318         close();
319         throw TTransportException(TTransportException::NOT_OPEN, "Could not set SO_LINGER", errno_copy);
320     }
321 
322     // Unix Sockets do not need that
323     if (path_.empty())
324     {
325         // TCP Nodelay, speed over bandwidth
326         if (-1 == setsockopt(serverSocket_, IPPROTO_TCP, TCP_NODELAY,
327                              cast_sockopt(&one), sizeof(one)))
328         {
329             int errno_copy = THRIFT_GET_SOCKET_ERROR;
330             GlobalOutput.perror("TServerSocket::listen() setsockopt() TCP_NODELAY ", errno_copy);
331             close();
332             throw TTransportException(TTransportException::NOT_OPEN, "Could not set TCP_NODELAY", errno_copy);
333         }
334     }
335 
336     // Set NONBLOCK on the accept socket
337     int flags = THRIFT_FCNTL(serverSocket_, THRIFT_F_GETFL, 0);
338 
339     if (flags == -1)
340     {
341         int errno_copy = THRIFT_GET_SOCKET_ERROR;
342         GlobalOutput.perror("TServerSocket::listen() THRIFT_FCNTL() THRIFT_F_GETFL ", errno_copy);
343         throw TTransportException(TTransportException::NOT_OPEN, "THRIFT_FCNTL() failed", errno_copy);
344     }
345 
346     if (-1 == THRIFT_FCNTL(serverSocket_, THRIFT_F_SETFL, flags | THRIFT_O_NONBLOCK))
347     {
348         int errno_copy = THRIFT_GET_SOCKET_ERROR;
349         GlobalOutput.perror("TServerSocket::listen() THRIFT_FCNTL() THRIFT_O_NONBLOCK ", errno_copy);
350         throw TTransportException(TTransportException::NOT_OPEN, "THRIFT_FCNTL() failed", errno_copy);
351     }
352 
353     // prepare the port information
354     // we may want to try to bind more than once, since THRIFT_NO_SOCKET_CACHING doesn't
355     // always seem to work. The client can configure the retry variables.
356     int retries = 0;
357 
358     if (! path_.empty())
359     {
360 
361 #ifndef _WIN32
362 
363         // Unix Domain Socket
364         struct sockaddr_un address;
365         socklen_t len;
366 
367         if (path_.length() > sizeof(address.sun_path))
368         {
369             int errno_copy = THRIFT_GET_SOCKET_ERROR;
370             GlobalOutput.perror("TSocket::listen() Unix Domain socket path too long", errno_copy);
371             throw TTransportException(TTransportException::NOT_OPEN, " Unix Domain socket path too long");
372         }
373 
374         address.sun_family = AF_UNIX;
375         THRIFT_SNPRINTF(address.sun_path, sizeof(address.sun_path), "%s", path_.c_str());
376         len = sizeof(address);
377 
378         do
379         {
380             if (0 == ::bind(serverSocket_, (struct sockaddr*) &address, len))
381             {
382                 break;
383             }
384 
385             // use short circuit evaluation here to only sleep if we need to
386         }
387         while ((retries++ < retryLimit_) && (THRIFT_SLEEP_SEC(retryDelay_) == 0));
388 
389 #else
390         GlobalOutput.perror("TSocket::open() Unix Domain socket path not supported on windows", -99);
391         throw TTransportException(TTransportException::NOT_OPEN, " Unix Domain socket path not supported");
392 #endif
393     }
394     else
395     {
396         do
397         {
398             if (0 == ::bind(serverSocket_, res->ai_addr, static_cast<int>(res->ai_addrlen)))
399             {
400                 break;
401             }
402 
403             // use short circuit evaluation here to only sleep if we need to
404         }
405         while ((retries++ < retryLimit_) && (THRIFT_SLEEP_SEC(retryDelay_) == 0));
406 
407         // free addrinfo
408         freeaddrinfo(res0);
409     }
410 
411     // throw an error if we failed to bind properly
412     if (retries > retryLimit_)
413     {
414         char errbuf[1024];
415 
416         if (! path_.empty())
417         {
418             sprintf(errbuf, "TServerSocket::listen() PATH %s", path_.c_str());
419         }
420         else
421         {
422             sprintf(errbuf, "TServerSocket::listen() BIND %d", port_);
423         }
424 
425         GlobalOutput(errbuf);
426         close();
427         throw TTransportException(TTransportException::NOT_OPEN, "Could not bind",
428                                   THRIFT_GET_SOCKET_ERROR);
429     }
430 
431     // Call listen
432     if (-1 == ::listen(serverSocket_, acceptBacklog_))
433     {
434         int errno_copy = THRIFT_GET_SOCKET_ERROR;
435         GlobalOutput.perror("TServerSocket::listen() listen() ", errno_copy);
436         close();
437         throw TTransportException(TTransportException::NOT_OPEN, "Could not listen", errno_copy);
438     }
439 
440     // The socket is now listening!
441 }
442 
acceptImpl()443 shared_ptr<TTransport> TServerSocket::acceptImpl()
444 {
445     if (serverSocket_ == THRIFT_INVALID_SOCKET)
446     {
447         throw TTransportException(TTransportException::NOT_OPEN, "TServerSocket not listening");
448     }
449 
450     struct THRIFT_POLLFD fds[2];
451 
452     int maxEintrs = 5;
453 
454     int numEintrs = 0;
455 
456     while (true)
457     {
458         std::memset(fds, 0, sizeof(fds));
459         fds[0].fd = serverSocket_;
460         fds[0].events = THRIFT_POLLIN;
461 
462         if (intSock2_ != THRIFT_INVALID_SOCKET)
463         {
464             fds[1].fd = intSock2_;
465             fds[1].events = THRIFT_POLLIN;
466         }
467 
468         /*
469           TODO: if THRIFT_EINTR is received, we'll restart the timeout.
470           To be accurate, we need to fix this in the future.
471          */
472         int ret = THRIFT_POLL(fds, 2, accTimeout_);
473 
474         if (ret < 0)
475         {
476             // error cases
477             if (THRIFT_GET_SOCKET_ERROR == THRIFT_EINTR && (numEintrs++ < maxEintrs))
478             {
479                 // THRIFT_EINTR needs to be handled manually and we can tolerate
480                 // a certain number
481                 continue;
482             }
483 
484             int errno_copy = THRIFT_GET_SOCKET_ERROR;
485             GlobalOutput.perror("TServerSocket::acceptImpl() THRIFT_POLL() ", errno_copy);
486             throw TTransportException(TTransportException::UNKNOWN, "Unknown", errno_copy);
487         }
488         else if (ret > 0)
489         {
490             // Check for an interrupt signal
491             if (intSock2_ != THRIFT_INVALID_SOCKET
492                     && (fds[1].revents & THRIFT_POLLIN))
493             {
494                 int8_t buf;
495 
496                 if (-1 == recv(intSock2_, cast_sockopt(&buf), sizeof(int8_t), 0))
497                 {
498                     GlobalOutput.perror("TServerSocket::acceptImpl() recv() interrupt ", THRIFT_GET_SOCKET_ERROR);
499                 }
500 
501                 throw TTransportException(TTransportException::INTERRUPTED);
502             }
503 
504             // Check for the actual server socket being ready
505             if (fds[0].revents & THRIFT_POLLIN)
506             {
507                 break;
508             }
509         }
510         else
511         {
512             GlobalOutput("TServerSocket::acceptImpl() THRIFT_POLL 0");
513             throw TTransportException(TTransportException::UNKNOWN);
514         }
515     }
516 
517     struct sockaddr_storage clientAddress;
518 
519     int size = sizeof(clientAddress);
520 
521     THRIFT_SOCKET clientSocket = ::accept(serverSocket_,
522                                           (struct sockaddr*) &clientAddress,
523                                           (socklen_t*) &size);
524 
525     if (clientSocket == -1)
526     {
527         int errno_copy = THRIFT_GET_SOCKET_ERROR;
528         GlobalOutput.perror("TServerSocket::acceptImpl() ::accept() ", errno_copy);
529         throw TTransportException(TTransportException::UNKNOWN, "accept()", errno_copy);
530     }
531 
532     // Make sure client socket is blocking
533     int flags = THRIFT_FCNTL(clientSocket, THRIFT_F_GETFL, 0);
534 
535     if (flags == -1)
536     {
537         int errno_copy = THRIFT_GET_SOCKET_ERROR;
538         GlobalOutput.perror("TServerSocket::acceptImpl() THRIFT_FCNTL() THRIFT_F_GETFL ", errno_copy);
539         throw TTransportException(TTransportException::UNKNOWN, "THRIFT_FCNTL(THRIFT_F_GETFL)", errno_copy);
540     }
541 
542     if (-1 == THRIFT_FCNTL(clientSocket, THRIFT_F_SETFL, flags & ~THRIFT_O_NONBLOCK))
543     {
544         int errno_copy = THRIFT_GET_SOCKET_ERROR;
545         GlobalOutput.perror("TServerSocket::acceptImpl() THRIFT_FCNTL() THRIFT_F_SETFL ~THRIFT_O_NONBLOCK ", errno_copy);
546         throw TTransportException(TTransportException::UNKNOWN, "THRIFT_FCNTL(THRIFT_F_SETFL)", errno_copy);
547     }
548 
549     shared_ptr<TSocket> client = createSocket(clientSocket);
550 
551     if (sendTimeout_ > 0)
552     {
553         client->setSendTimeout(sendTimeout_);
554     }
555 
556     if (recvTimeout_ > 0)
557     {
558         client->setRecvTimeout(recvTimeout_);
559     }
560 
561     client->setCachedAddress((sockaddr*) &clientAddress, size);
562 
563     return client;
564 }
565 
createSocket(THRIFT_SOCKET clientSocket)566 shared_ptr<TSocket> TServerSocket::createSocket(THRIFT_SOCKET clientSocket)
567 {
568     return shared_ptr<TSocket>(new TSocket(clientSocket));
569 }
570 
interrupt()571 void TServerSocket::interrupt()
572 {
573     if (intSock1_ != THRIFT_INVALID_SOCKET)
574     {
575         int8_t byte = 0;
576 
577         if (-1 == send(intSock1_, cast_sockopt(&byte), sizeof(int8_t), 0))
578         {
579             GlobalOutput.perror("TServerSocket::interrupt() send() ", THRIFT_GET_SOCKET_ERROR);
580         }
581     }
582 }
583 
close()584 void TServerSocket::close()
585 {
586     if (serverSocket_ != THRIFT_INVALID_SOCKET)
587     {
588         shutdown(serverSocket_, THRIFT_SHUT_RDWR);
589         ::THRIFT_CLOSESOCKET(serverSocket_);
590     }
591 
592     if (intSock1_ != THRIFT_INVALID_SOCKET)
593     {
594         ::THRIFT_CLOSESOCKET(intSock1_);
595     }
596 
597     if (intSock2_ != THRIFT_INVALID_SOCKET)
598     {
599         ::THRIFT_CLOSESOCKET(intSock2_);
600     }
601 
602     serverSocket_ = THRIFT_INVALID_SOCKET;
603     intSock1_ = THRIFT_INVALID_SOCKET;
604     intSock2_ = THRIFT_INVALID_SOCKET;
605 }
606 
607 }
608 }
609 } // apache::thrift::transport
610