1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
#include <thrift/thrift-config.h>

#include <algorithm>
#include <iostream>
#include <memory>
#include <random>

#include <thrift/transport/TSocketPool.h>
30
31 using std::pair;
32 using std::string;
33 using std::vector;
34
35 namespace apache {
36 namespace thrift {
37 namespace transport {
38
39 using std::shared_ptr;
40
41 /**
42 * TSocketPoolServer implementation
43 *
44 */
TSocketPoolServer()45 TSocketPoolServer::TSocketPoolServer()
46 : host_(""), port_(0), socket_(THRIFT_INVALID_SOCKET), lastFailTime_(0), consecutiveFailures_(0) {
47 }
48
49 /**
50 * Constructor for TSocketPool server
51 */
TSocketPoolServer(const string & host,int port)52 TSocketPoolServer::TSocketPoolServer(const string& host, int port)
53 : host_(host),
54 port_(port),
55 socket_(THRIFT_INVALID_SOCKET),
56 lastFailTime_(0),
57 consecutiveFailures_(0) {
58 }
59
60 /**
61 * TSocketPool implementation.
62 *
63 */
64
TSocketPool()65 TSocketPool::TSocketPool()
66 : TSocket(),
67 numRetries_(1),
68 retryInterval_(60),
69 maxConsecutiveFailures_(1),
70 randomize_(true),
71 alwaysTryLast_(true) {
72 }
73
TSocketPool(const vector<string> & hosts,const vector<int> & ports)74 TSocketPool::TSocketPool(const vector<string>& hosts, const vector<int>& ports)
75 : TSocket(),
76 numRetries_(1),
77 retryInterval_(60),
78 maxConsecutiveFailures_(1),
79 randomize_(true),
80 alwaysTryLast_(true) {
81 if (hosts.size() != ports.size()) {
82 GlobalOutput("TSocketPool::TSocketPool: hosts.size != ports.size");
83 throw TTransportException(TTransportException::BAD_ARGS);
84 }
85
86 for (unsigned int i = 0; i < hosts.size(); ++i) {
87 addServer(hosts[i], ports[i]);
88 }
89 }
90
TSocketPool(const vector<pair<string,int>> & servers)91 TSocketPool::TSocketPool(const vector<pair<string, int> >& servers)
92 : TSocket(),
93 numRetries_(1),
94 retryInterval_(60),
95 maxConsecutiveFailures_(1),
96 randomize_(true),
97 alwaysTryLast_(true) {
98 for (const auto & server : servers) {
99 addServer(server.first, server.second);
100 }
101 }
102
TSocketPool(const vector<shared_ptr<TSocketPoolServer>> & servers)103 TSocketPool::TSocketPool(const vector<shared_ptr<TSocketPoolServer> >& servers)
104 : TSocket(),
105 servers_(servers),
106 numRetries_(1),
107 retryInterval_(60),
108 maxConsecutiveFailures_(1),
109 randomize_(true),
110 alwaysTryLast_(true) {
111 }
112
TSocketPool(const string & host,int port)113 TSocketPool::TSocketPool(const string& host, int port)
114 : TSocket(),
115 numRetries_(1),
116 retryInterval_(60),
117 maxConsecutiveFailures_(1),
118 randomize_(true),
119 alwaysTryLast_(true) {
120 addServer(host, port);
121 }
122
~TSocketPool()123 TSocketPool::~TSocketPool() {
124 vector<shared_ptr<TSocketPoolServer> >::const_iterator iter = servers_.begin();
125 vector<shared_ptr<TSocketPoolServer> >::const_iterator iterEnd = servers_.end();
126 for (; iter != iterEnd; ++iter) {
127 setCurrentServer(*iter);
128 TSocketPool::close();
129 }
130 }
131
addServer(const string & host,int port)132 void TSocketPool::addServer(const string& host, int port) {
133 servers_.push_back(std::make_shared<TSocketPoolServer>(host, port));
134 }
135
addServer(shared_ptr<TSocketPoolServer> & server)136 void TSocketPool::addServer(shared_ptr<TSocketPoolServer>& server) {
137 if (server) {
138 servers_.push_back(server);
139 }
140 }
141
setServers(const vector<shared_ptr<TSocketPoolServer>> & servers)142 void TSocketPool::setServers(const vector<shared_ptr<TSocketPoolServer> >& servers) {
143 servers_ = servers;
144 }
145
getServers(vector<shared_ptr<TSocketPoolServer>> & servers)146 void TSocketPool::getServers(vector<shared_ptr<TSocketPoolServer> >& servers) {
147 servers = servers_;
148 }
149
setNumRetries(int numRetries)150 void TSocketPool::setNumRetries(int numRetries) {
151 numRetries_ = numRetries;
152 }
153
setRetryInterval(int retryInterval)154 void TSocketPool::setRetryInterval(int retryInterval) {
155 retryInterval_ = retryInterval;
156 }
157
setMaxConsecutiveFailures(int maxConsecutiveFailures)158 void TSocketPool::setMaxConsecutiveFailures(int maxConsecutiveFailures) {
159 maxConsecutiveFailures_ = maxConsecutiveFailures;
160 }
161
setRandomize(bool randomize)162 void TSocketPool::setRandomize(bool randomize) {
163 randomize_ = randomize;
164 }
165
setAlwaysTryLast(bool alwaysTryLast)166 void TSocketPool::setAlwaysTryLast(bool alwaysTryLast) {
167 alwaysTryLast_ = alwaysTryLast;
168 }
169
setCurrentServer(const shared_ptr<TSocketPoolServer> & server)170 void TSocketPool::setCurrentServer(const shared_ptr<TSocketPoolServer>& server) {
171 currentServer_ = server;
172 host_ = server->host_;
173 port_ = server->port_;
174 socket_ = server->socket_;
175 }
176
/**
 * This function throws an exception if the socket open fails. When a socket
 * open fails, the socket in the current server is reset.
 */
/* TODO: without APCu we ignore a lot of functionality from the PHP version */
open()182 void TSocketPool::open() {
183
184 size_t numServers = servers_.size();
185 if (numServers == 0) {
186 socket_ = THRIFT_INVALID_SOCKET;
187 throw TTransportException(TTransportException::NOT_OPEN);
188 }
189
190 if (isOpen()) {
191 return;
192 }
193
194 if (randomize_ && numServers > 1) {
195 #if __cplusplus >= 201703L
196 std::random_device rng;
197 std::mt19937 urng(rng());
198 std::shuffle(servers_.begin(), servers_.end(), urng);
199 #else
200 std::random_shuffle(servers_.begin(), servers_.end());
201 #endif
202 }
203
204 for (size_t i = 0; i < numServers; ++i) {
205
206 shared_ptr<TSocketPoolServer>& server = servers_[i];
207 // Impersonate the server socket
208 setCurrentServer(server);
209
210 if (isOpen()) {
211 // already open means we're done
212 return;
213 }
214
215 bool retryIntervalPassed = (server->lastFailTime_ == 0);
216 bool isLastServer = alwaysTryLast_ ? (i == (numServers - 1)) : false;
217
218 if (server->lastFailTime_ > 0) {
219 // The server was marked as down, so check if enough time has elapsed to retry
220 time_t elapsedTime = time(nullptr) - server->lastFailTime_;
221 if (elapsedTime > retryInterval_) {
222 retryIntervalPassed = true;
223 }
224 }
225
226 if (retryIntervalPassed || isLastServer) {
227 for (int j = 0; j < numRetries_; ++j) {
228 try {
229 TSocket::open();
230 } catch (const TException &e) {
231 string errStr = "TSocketPool::open failed " + getSocketInfo() + ": " + e.what();
232 GlobalOutput(errStr.c_str());
233 socket_ = THRIFT_INVALID_SOCKET;
234 continue;
235 }
236
237 // Copy over the opened socket so that we can keep it persistent
238 server->socket_ = socket_;
239 // reset lastFailTime_ is required
240 server->lastFailTime_ = 0;
241 // success
242 return;
243 }
244
245 ++server->consecutiveFailures_;
246 if (server->consecutiveFailures_ > maxConsecutiveFailures_) {
247 // Mark server as down
248 server->consecutiveFailures_ = 0;
249 server->lastFailTime_ = time(nullptr);
250 }
251 }
252 }
253
254 GlobalOutput("TSocketPool::open: all connections failed");
255 throw TTransportException(TTransportException::NOT_OPEN);
256 }
257
close()258 void TSocketPool::close() {
259 TSocket::close();
260 if (currentServer_) {
261 currentServer_->socket_ = THRIFT_INVALID_SOCKET;
262 }
263 }
264 }
265 }
266 } // apache::thrift::transport
267