1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20 #include <algorithm>
21 #include <stdexcept>
22 #include <stdint.h>
23 #include <thrift/server/TServerFramework.h>
24
25 namespace apache {
26 namespace thrift {
27 namespace server {
28
29 using apache::thrift::concurrency::Synchronized;
30 using apache::thrift::protocol::TProtocol;
31 using apache::thrift::protocol::TProtocolFactory;
32 using std::bind;
33 using std::shared_ptr;
34 using apache::thrift::transport::TServerTransport;
35 using apache::thrift::transport::TTransport;
36 using apache::thrift::transport::TTransportException;
37 using apache::thrift::transport::TTransportFactory;
38 using std::string;
39
TServerFramework(const shared_ptr<TProcessorFactory> & processorFactory,const shared_ptr<TServerTransport> & serverTransport,const shared_ptr<TTransportFactory> & transportFactory,const shared_ptr<TProtocolFactory> & protocolFactory)40 TServerFramework::TServerFramework(const shared_ptr<TProcessorFactory>& processorFactory,
41 const shared_ptr<TServerTransport>& serverTransport,
42 const shared_ptr<TTransportFactory>& transportFactory,
43 const shared_ptr<TProtocolFactory>& protocolFactory)
44 : TServer(processorFactory, serverTransport, transportFactory, protocolFactory),
45 clients_(0),
46 hwm_(0),
47 limit_(INT64_MAX) {
48 }
49
TServerFramework(const shared_ptr<TProcessor> & processor,const shared_ptr<TServerTransport> & serverTransport,const shared_ptr<TTransportFactory> & transportFactory,const shared_ptr<TProtocolFactory> & protocolFactory)50 TServerFramework::TServerFramework(const shared_ptr<TProcessor>& processor,
51 const shared_ptr<TServerTransport>& serverTransport,
52 const shared_ptr<TTransportFactory>& transportFactory,
53 const shared_ptr<TProtocolFactory>& protocolFactory)
54 : TServer(processor, serverTransport, transportFactory, protocolFactory),
55 clients_(0),
56 hwm_(0),
57 limit_(INT64_MAX) {
58 }
59
TServerFramework(const shared_ptr<TProcessorFactory> & processorFactory,const shared_ptr<TServerTransport> & serverTransport,const shared_ptr<TTransportFactory> & inputTransportFactory,const shared_ptr<TTransportFactory> & outputTransportFactory,const shared_ptr<TProtocolFactory> & inputProtocolFactory,const shared_ptr<TProtocolFactory> & outputProtocolFactory)60 TServerFramework::TServerFramework(const shared_ptr<TProcessorFactory>& processorFactory,
61 const shared_ptr<TServerTransport>& serverTransport,
62 const shared_ptr<TTransportFactory>& inputTransportFactory,
63 const shared_ptr<TTransportFactory>& outputTransportFactory,
64 const shared_ptr<TProtocolFactory>& inputProtocolFactory,
65 const shared_ptr<TProtocolFactory>& outputProtocolFactory)
66 : TServer(processorFactory,
67 serverTransport,
68 inputTransportFactory,
69 outputTransportFactory,
70 inputProtocolFactory,
71 outputProtocolFactory),
72 clients_(0),
73 hwm_(0),
74 limit_(INT64_MAX) {
75 }
76
TServerFramework(const shared_ptr<TProcessor> & processor,const shared_ptr<TServerTransport> & serverTransport,const shared_ptr<TTransportFactory> & inputTransportFactory,const shared_ptr<TTransportFactory> & outputTransportFactory,const shared_ptr<TProtocolFactory> & inputProtocolFactory,const shared_ptr<TProtocolFactory> & outputProtocolFactory)77 TServerFramework::TServerFramework(const shared_ptr<TProcessor>& processor,
78 const shared_ptr<TServerTransport>& serverTransport,
79 const shared_ptr<TTransportFactory>& inputTransportFactory,
80 const shared_ptr<TTransportFactory>& outputTransportFactory,
81 const shared_ptr<TProtocolFactory>& inputProtocolFactory,
82 const shared_ptr<TProtocolFactory>& outputProtocolFactory)
83 : TServer(processor,
84 serverTransport,
85 inputTransportFactory,
86 outputTransportFactory,
87 inputProtocolFactory,
88 outputProtocolFactory),
89 clients_(0),
90 hwm_(0),
91 limit_(INT64_MAX) {
92 }
93
94 TServerFramework::~TServerFramework() = default;
95
96 template <typename T>
releaseOneDescriptor(const string & name,T & pTransport)97 static void releaseOneDescriptor(const string& name, T& pTransport) {
98 if (pTransport) {
99 try {
100 pTransport->close();
101 } catch (const TTransportException& ttx) {
102 string errStr = string("TServerFramework " + name + " close failed: ") + ttx.what();
103 GlobalOutput(errStr.c_str());
104 }
105 }
106 }
107
serve()108 void TServerFramework::serve() {
109 shared_ptr<TTransport> client;
110 shared_ptr<TTransport> inputTransport;
111 shared_ptr<TTransport> outputTransport;
112 shared_ptr<TProtocol> inputProtocol;
113 shared_ptr<TProtocol> outputProtocol;
114
115 // Start the server listening
116 serverTransport_->listen();
117
118 // Run the preServe event to indicate server is now listening
119 // and that it is safe to connect.
120 if (eventHandler_) {
121 eventHandler_->preServe();
122 }
123
124 // Fetch client from server
125 for (;;) {
126 try {
127 // Dereference any resources from any previous client creation
128 // such that a blocking accept does not hold them indefinitely.
129 outputProtocol.reset();
130 inputProtocol.reset();
131 outputTransport.reset();
132 inputTransport.reset();
133 client.reset();
134
135 // If we have reached the limit on the number of concurrent
136 // clients allowed, wait for one or more clients to drain before
137 // accepting another.
138 {
139 Synchronized sync(mon_);
140 while (clients_ >= limit_) {
141 mon_.wait();
142 }
143 }
144
145 client = serverTransport_->accept();
146
147 inputTransport = inputTransportFactory_->getTransport(client);
148 outputTransport = outputTransportFactory_->getTransport(client);
149 if (!outputProtocolFactory_) {
150 inputProtocol = inputProtocolFactory_->getProtocol(inputTransport, outputTransport);
151 outputProtocol = inputProtocol;
152 } else {
153 inputProtocol = inputProtocolFactory_->getProtocol(inputTransport);
154 outputProtocol = outputProtocolFactory_->getProtocol(outputTransport);
155 }
156
157 newlyConnectedClient(shared_ptr<TConnectedClient>(
158 new TConnectedClient(getProcessor(inputProtocol, outputProtocol, client),
159 inputProtocol,
160 outputProtocol,
161 eventHandler_,
162 client),
163 bind(&TServerFramework::disposeConnectedClient, this, std::placeholders::_1)));
164
165 } catch (TTransportException& ttx) {
166 releaseOneDescriptor("inputTransport", inputTransport);
167 releaseOneDescriptor("outputTransport", outputTransport);
168 releaseOneDescriptor("client", client);
169 if (ttx.getType() == TTransportException::TIMED_OUT
170 || ttx.getType() == TTransportException::CLIENT_DISCONNECT) {
171 // Accept timeout and client disconnect - continue processing.
172 continue;
173 } else if (ttx.getType() == TTransportException::END_OF_FILE
174 || ttx.getType() == TTransportException::INTERRUPTED) {
175 // Server was interrupted. This only happens when stopping.
176 break;
177 } else {
178 // All other transport exceptions are logged.
179 // State of connection is unknown. Done.
180 string errStr = string("TServerTransport died: ") + ttx.what();
181 GlobalOutput(errStr.c_str());
182 break;
183 }
184 }
185 }
186
187 releaseOneDescriptor("serverTransport", serverTransport_);
188 }
189
getConcurrentClientLimit() const190 int64_t TServerFramework::getConcurrentClientLimit() const {
191 Synchronized sync(mon_);
192 return limit_;
193 }
194
getConcurrentClientCount() const195 int64_t TServerFramework::getConcurrentClientCount() const {
196 Synchronized sync(mon_);
197 return clients_;
198 }
199
getConcurrentClientCountHWM() const200 int64_t TServerFramework::getConcurrentClientCountHWM() const {
201 Synchronized sync(mon_);
202 return hwm_;
203 }
204
setConcurrentClientLimit(int64_t newLimit)205 void TServerFramework::setConcurrentClientLimit(int64_t newLimit) {
206 if (newLimit < 1) {
207 throw std::invalid_argument("newLimit must be greater than zero");
208 }
209 Synchronized sync(mon_);
210 limit_ = newLimit;
211 if (limit_ - clients_ > 0) {
212 mon_.notify();
213 }
214 }
215
stop()216 void TServerFramework::stop() {
217 // Order is important because serve() releases serverTransport_ when it is
218 // interrupted, which closes the socket that interruptChildren uses.
219 serverTransport_->interruptChildren();
220 serverTransport_->interrupt();
221 }
222
newlyConnectedClient(const shared_ptr<TConnectedClient> & pClient)223 void TServerFramework::newlyConnectedClient(const shared_ptr<TConnectedClient>& pClient) {
224 {
225 Synchronized sync(mon_);
226 ++clients_;
227 hwm_ = (std::max)(hwm_, clients_);
228 }
229
230 onClientConnected(pClient);
231 }
232
disposeConnectedClient(TConnectedClient * pClient)233 void TServerFramework::disposeConnectedClient(TConnectedClient* pClient) {
234 onClientDisconnected(pClient);
235 delete pClient;
236
237 Synchronized sync(mon_);
238 if (limit_ - --clients_ > 0) {
239 mon_.notify();
240 }
241 }
242
243 }
244 }
245 } // apache::thrift::server
246