1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 
20 #include <thrift/thrift-config.h>
21 
22 #include <algorithm>
23 #include <iostream>
24 #include <memory>
25 #if __cplusplus >= 201703L
26 #include <random>
27 #endif
28 
29 #include <thrift/transport/TSocketPool.h>
30 
31 using std::pair;
32 using std::string;
33 using std::vector;
34 
35 namespace apache {
36 namespace thrift {
37 namespace transport {
38 
39 using std::shared_ptr;
40 
41 /**
42  * TSocketPoolServer implementation
43  *
44  */
TSocketPoolServer()45 TSocketPoolServer::TSocketPoolServer()
46   : host_(""), port_(0), socket_(THRIFT_INVALID_SOCKET), lastFailTime_(0), consecutiveFailures_(0) {
47 }
48 
49 /**
50  * Constructor for TSocketPool server
51  */
TSocketPoolServer(const string & host,int port)52 TSocketPoolServer::TSocketPoolServer(const string& host, int port)
53   : host_(host),
54     port_(port),
55     socket_(THRIFT_INVALID_SOCKET),
56     lastFailTime_(0),
57     consecutiveFailures_(0) {
58 }
59 
60 /**
61  * TSocketPool implementation.
62  *
63  */
64 
TSocketPool()65 TSocketPool::TSocketPool()
66   : TSocket(),
67     numRetries_(1),
68     retryInterval_(60),
69     maxConsecutiveFailures_(1),
70     randomize_(true),
71     alwaysTryLast_(true) {
72 }
73 
TSocketPool(const vector<string> & hosts,const vector<int> & ports)74 TSocketPool::TSocketPool(const vector<string>& hosts, const vector<int>& ports)
75   : TSocket(),
76     numRetries_(1),
77     retryInterval_(60),
78     maxConsecutiveFailures_(1),
79     randomize_(true),
80     alwaysTryLast_(true) {
81   if (hosts.size() != ports.size()) {
82     GlobalOutput("TSocketPool::TSocketPool: hosts.size != ports.size");
83     throw TTransportException(TTransportException::BAD_ARGS);
84   }
85 
86   for (unsigned int i = 0; i < hosts.size(); ++i) {
87     addServer(hosts[i], ports[i]);
88   }
89 }
90 
TSocketPool(const vector<pair<string,int>> & servers)91 TSocketPool::TSocketPool(const vector<pair<string, int> >& servers)
92   : TSocket(),
93     numRetries_(1),
94     retryInterval_(60),
95     maxConsecutiveFailures_(1),
96     randomize_(true),
97     alwaysTryLast_(true) {
98   for (const auto & server : servers) {
99     addServer(server.first, server.second);
100   }
101 }
102 
TSocketPool(const vector<shared_ptr<TSocketPoolServer>> & servers)103 TSocketPool::TSocketPool(const vector<shared_ptr<TSocketPoolServer> >& servers)
104   : TSocket(),
105     servers_(servers),
106     numRetries_(1),
107     retryInterval_(60),
108     maxConsecutiveFailures_(1),
109     randomize_(true),
110     alwaysTryLast_(true) {
111 }
112 
TSocketPool(const string & host,int port)113 TSocketPool::TSocketPool(const string& host, int port)
114   : TSocket(),
115     numRetries_(1),
116     retryInterval_(60),
117     maxConsecutiveFailures_(1),
118     randomize_(true),
119     alwaysTryLast_(true) {
120   addServer(host, port);
121 }
122 
~TSocketPool()123 TSocketPool::~TSocketPool() {
124   vector<shared_ptr<TSocketPoolServer> >::const_iterator iter = servers_.begin();
125   vector<shared_ptr<TSocketPoolServer> >::const_iterator iterEnd = servers_.end();
126   for (; iter != iterEnd; ++iter) {
127     setCurrentServer(*iter);
128     TSocketPool::close();
129   }
130 }
131 
addServer(const string & host,int port)132 void TSocketPool::addServer(const string& host, int port) {
133   servers_.push_back(std::make_shared<TSocketPoolServer>(host, port));
134 }
135 
addServer(shared_ptr<TSocketPoolServer> & server)136 void TSocketPool::addServer(shared_ptr<TSocketPoolServer>& server) {
137   if (server) {
138     servers_.push_back(server);
139   }
140 }
141 
setServers(const vector<shared_ptr<TSocketPoolServer>> & servers)142 void TSocketPool::setServers(const vector<shared_ptr<TSocketPoolServer> >& servers) {
143   servers_ = servers;
144 }
145 
getServers(vector<shared_ptr<TSocketPoolServer>> & servers)146 void TSocketPool::getServers(vector<shared_ptr<TSocketPoolServer> >& servers) {
147   servers = servers_;
148 }
149 
setNumRetries(int numRetries)150 void TSocketPool::setNumRetries(int numRetries) {
151   numRetries_ = numRetries;
152 }
153 
setRetryInterval(int retryInterval)154 void TSocketPool::setRetryInterval(int retryInterval) {
155   retryInterval_ = retryInterval;
156 }
157 
setMaxConsecutiveFailures(int maxConsecutiveFailures)158 void TSocketPool::setMaxConsecutiveFailures(int maxConsecutiveFailures) {
159   maxConsecutiveFailures_ = maxConsecutiveFailures;
160 }
161 
setRandomize(bool randomize)162 void TSocketPool::setRandomize(bool randomize) {
163   randomize_ = randomize;
164 }
165 
setAlwaysTryLast(bool alwaysTryLast)166 void TSocketPool::setAlwaysTryLast(bool alwaysTryLast) {
167   alwaysTryLast_ = alwaysTryLast;
168 }
169 
setCurrentServer(const shared_ptr<TSocketPoolServer> & server)170 void TSocketPool::setCurrentServer(const shared_ptr<TSocketPoolServer>& server) {
171   currentServer_ = server;
172   host_ = server->host_;
173   port_ = server->port_;
174   socket_ = server->socket_;
175 }
176 
177 /**
178  * This function throws an exception if socket open fails. When socket
179  * opens fails, the socket in the current server is reset.
180  */
181 /* TODO: without apcu we ignore a lot of functionality from the php version */
open()182 void TSocketPool::open() {
183 
184   size_t numServers = servers_.size();
185   if (numServers == 0) {
186     socket_ = THRIFT_INVALID_SOCKET;
187     throw TTransportException(TTransportException::NOT_OPEN);
188   }
189 
190   if (isOpen()) {
191     return;
192   }
193 
194   if (randomize_ && numServers > 1) {
195 #if __cplusplus >= 201703L
196     std::random_device rng;
197     std::mt19937 urng(rng());
198     std::shuffle(servers_.begin(), servers_.end(), urng);
199 #else
200     std::random_shuffle(servers_.begin(), servers_.end());
201 #endif
202   }
203 
204   for (size_t i = 0; i < numServers; ++i) {
205 
206     shared_ptr<TSocketPoolServer>& server = servers_[i];
207     // Impersonate the server socket
208     setCurrentServer(server);
209 
210     if (isOpen()) {
211       // already open means we're done
212       return;
213     }
214 
215     bool retryIntervalPassed = (server->lastFailTime_ == 0);
216     bool isLastServer = alwaysTryLast_ ? (i == (numServers - 1)) : false;
217 
218     if (server->lastFailTime_ > 0) {
219       // The server was marked as down, so check if enough time has elapsed to retry
220       time_t elapsedTime = time(nullptr) - server->lastFailTime_;
221       if (elapsedTime > retryInterval_) {
222         retryIntervalPassed = true;
223       }
224     }
225 
226     if (retryIntervalPassed || isLastServer) {
227       for (int j = 0; j < numRetries_; ++j) {
228         try {
229           TSocket::open();
230         } catch (const TException &e) {
231           string errStr = "TSocketPool::open failed " + getSocketInfo() + ": " + e.what();
232           GlobalOutput(errStr.c_str());
233           socket_ = THRIFT_INVALID_SOCKET;
234           continue;
235         }
236 
237         // Copy over the opened socket so that we can keep it persistent
238         server->socket_ = socket_;
239         // reset lastFailTime_ is required
240         server->lastFailTime_ = 0;
241         // success
242         return;
243       }
244 
245       ++server->consecutiveFailures_;
246       if (server->consecutiveFailures_ > maxConsecutiveFailures_) {
247         // Mark server as down
248         server->consecutiveFailures_ = 0;
249         server->lastFailTime_ = time(nullptr);
250       }
251     }
252   }
253 
254   GlobalOutput("TSocketPool::open: all connections failed");
255   throw TTransportException(TTransportException::NOT_OPEN);
256 }
257 
close()258 void TSocketPool::close() {
259   TSocket::close();
260   if (currentServer_) {
261     currentServer_->socket_ = THRIFT_INVALID_SOCKET;
262   }
263 }
264 }
265 }
266 } // apache::thrift::transport
267