1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 
20 #include <algorithm>
21 #include <stdexcept>
22 #include <stdint.h>
23 #include <thrift/server/TServerFramework.h>
24 
25 namespace apache {
26 namespace thrift {
27 namespace server {
28 
29 using apache::thrift::concurrency::Synchronized;
30 using apache::thrift::protocol::TProtocol;
31 using apache::thrift::protocol::TProtocolFactory;
32 using std::bind;
33 using std::shared_ptr;
34 using apache::thrift::transport::TServerTransport;
35 using apache::thrift::transport::TTransport;
36 using apache::thrift::transport::TTransportException;
37 using apache::thrift::transport::TTransportFactory;
38 using std::string;
39 
TServerFramework(const shared_ptr<TProcessorFactory> & processorFactory,const shared_ptr<TServerTransport> & serverTransport,const shared_ptr<TTransportFactory> & transportFactory,const shared_ptr<TProtocolFactory> & protocolFactory)40 TServerFramework::TServerFramework(const shared_ptr<TProcessorFactory>& processorFactory,
41                                    const shared_ptr<TServerTransport>& serverTransport,
42                                    const shared_ptr<TTransportFactory>& transportFactory,
43                                    const shared_ptr<TProtocolFactory>& protocolFactory)
44   : TServer(processorFactory, serverTransport, transportFactory, protocolFactory),
45     clients_(0),
46     hwm_(0),
47     limit_(INT64_MAX) {
48 }
49 
TServerFramework(const shared_ptr<TProcessor> & processor,const shared_ptr<TServerTransport> & serverTransport,const shared_ptr<TTransportFactory> & transportFactory,const shared_ptr<TProtocolFactory> & protocolFactory)50 TServerFramework::TServerFramework(const shared_ptr<TProcessor>& processor,
51                                    const shared_ptr<TServerTransport>& serverTransport,
52                                    const shared_ptr<TTransportFactory>& transportFactory,
53                                    const shared_ptr<TProtocolFactory>& protocolFactory)
54   : TServer(processor, serverTransport, transportFactory, protocolFactory),
55     clients_(0),
56     hwm_(0),
57     limit_(INT64_MAX) {
58 }
59 
TServerFramework(const shared_ptr<TProcessorFactory> & processorFactory,const shared_ptr<TServerTransport> & serverTransport,const shared_ptr<TTransportFactory> & inputTransportFactory,const shared_ptr<TTransportFactory> & outputTransportFactory,const shared_ptr<TProtocolFactory> & inputProtocolFactory,const shared_ptr<TProtocolFactory> & outputProtocolFactory)60 TServerFramework::TServerFramework(const shared_ptr<TProcessorFactory>& processorFactory,
61                                    const shared_ptr<TServerTransport>& serverTransport,
62                                    const shared_ptr<TTransportFactory>& inputTransportFactory,
63                                    const shared_ptr<TTransportFactory>& outputTransportFactory,
64                                    const shared_ptr<TProtocolFactory>& inputProtocolFactory,
65                                    const shared_ptr<TProtocolFactory>& outputProtocolFactory)
66   : TServer(processorFactory,
67             serverTransport,
68             inputTransportFactory,
69             outputTransportFactory,
70             inputProtocolFactory,
71             outputProtocolFactory),
72     clients_(0),
73     hwm_(0),
74     limit_(INT64_MAX) {
75 }
76 
TServerFramework(const shared_ptr<TProcessor> & processor,const shared_ptr<TServerTransport> & serverTransport,const shared_ptr<TTransportFactory> & inputTransportFactory,const shared_ptr<TTransportFactory> & outputTransportFactory,const shared_ptr<TProtocolFactory> & inputProtocolFactory,const shared_ptr<TProtocolFactory> & outputProtocolFactory)77 TServerFramework::TServerFramework(const shared_ptr<TProcessor>& processor,
78                                    const shared_ptr<TServerTransport>& serverTransport,
79                                    const shared_ptr<TTransportFactory>& inputTransportFactory,
80                                    const shared_ptr<TTransportFactory>& outputTransportFactory,
81                                    const shared_ptr<TProtocolFactory>& inputProtocolFactory,
82                                    const shared_ptr<TProtocolFactory>& outputProtocolFactory)
83   : TServer(processor,
84             serverTransport,
85             inputTransportFactory,
86             outputTransportFactory,
87             inputProtocolFactory,
88             outputProtocolFactory),
89     clients_(0),
90     hwm_(0),
91     limit_(INT64_MAX) {
92 }
93 
// Destructor is defaulted here, out of line; it performs no explicit cleanup
// of its own beyond the implicit destruction of members and the base class.
TServerFramework::~TServerFramework() = default;
95 
96 template <typename T>
releaseOneDescriptor(const string & name,T & pTransport)97 static void releaseOneDescriptor(const string& name, T& pTransport) {
98   if (pTransport) {
99     try {
100       pTransport->close();
101     } catch (const TTransportException& ttx) {
102       string errStr = string("TServerFramework " + name + " close failed: ") + ttx.what();
103       GlobalOutput(errStr.c_str());
104     }
105   }
106 }
107 
serve()108 void TServerFramework::serve() {
109   shared_ptr<TTransport> client;
110   shared_ptr<TTransport> inputTransport;
111   shared_ptr<TTransport> outputTransport;
112   shared_ptr<TProtocol> inputProtocol;
113   shared_ptr<TProtocol> outputProtocol;
114 
115   // Start the server listening
116   serverTransport_->listen();
117 
118   // Run the preServe event to indicate server is now listening
119   // and that it is safe to connect.
120   if (eventHandler_) {
121     eventHandler_->preServe();
122   }
123 
124   // Fetch client from server
125   for (;;) {
126     try {
127       // Dereference any resources from any previous client creation
128       // such that a blocking accept does not hold them indefinitely.
129       outputProtocol.reset();
130       inputProtocol.reset();
131       outputTransport.reset();
132       inputTransport.reset();
133       client.reset();
134 
135       // If we have reached the limit on the number of concurrent
136       // clients allowed, wait for one or more clients to drain before
137       // accepting another.
138       {
139         Synchronized sync(mon_);
140         while (clients_ >= limit_) {
141           mon_.wait();
142         }
143       }
144 
145       client = serverTransport_->accept();
146 
147       inputTransport = inputTransportFactory_->getTransport(client);
148       outputTransport = outputTransportFactory_->getTransport(client);
149       if (!outputProtocolFactory_) {
150         inputProtocol = inputProtocolFactory_->getProtocol(inputTransport, outputTransport);
151         outputProtocol = inputProtocol;
152       } else {
153         inputProtocol = inputProtocolFactory_->getProtocol(inputTransport);
154         outputProtocol = outputProtocolFactory_->getProtocol(outputTransport);
155       }
156 
157       newlyConnectedClient(shared_ptr<TConnectedClient>(
158           new TConnectedClient(getProcessor(inputProtocol, outputProtocol, client),
159                                inputProtocol,
160                                outputProtocol,
161                                eventHandler_,
162                                client),
163           bind(&TServerFramework::disposeConnectedClient, this, std::placeholders::_1)));
164 
165     } catch (TTransportException& ttx) {
166       releaseOneDescriptor("inputTransport", inputTransport);
167       releaseOneDescriptor("outputTransport", outputTransport);
168       releaseOneDescriptor("client", client);
169       if (ttx.getType() == TTransportException::TIMED_OUT
170           || ttx.getType() == TTransportException::CLIENT_DISCONNECT) {
171         // Accept timeout and client disconnect - continue processing.
172         continue;
173       } else if (ttx.getType() == TTransportException::END_OF_FILE
174                  || ttx.getType() == TTransportException::INTERRUPTED) {
175         // Server was interrupted.  This only happens when stopping.
176         break;
177       } else {
178         // All other transport exceptions are logged.
179         // State of connection is unknown.  Done.
180         string errStr = string("TServerTransport died: ") + ttx.what();
181         GlobalOutput(errStr.c_str());
182         break;
183       }
184     }
185   }
186 
187   releaseOneDescriptor("serverTransport", serverTransport_);
188 }
189 
getConcurrentClientLimit() const190 int64_t TServerFramework::getConcurrentClientLimit() const {
191   Synchronized sync(mon_);
192   return limit_;
193 }
194 
getConcurrentClientCount() const195 int64_t TServerFramework::getConcurrentClientCount() const {
196   Synchronized sync(mon_);
197   return clients_;
198 }
199 
getConcurrentClientCountHWM() const200 int64_t TServerFramework::getConcurrentClientCountHWM() const {
201   Synchronized sync(mon_);
202   return hwm_;
203 }
204 
setConcurrentClientLimit(int64_t newLimit)205 void TServerFramework::setConcurrentClientLimit(int64_t newLimit) {
206   if (newLimit < 1) {
207     throw std::invalid_argument("newLimit must be greater than zero");
208   }
209   Synchronized sync(mon_);
210   limit_ = newLimit;
211   if (limit_ - clients_ > 0) {
212     mon_.notify();
213   }
214 }
215 
/**
 * Asks the server to stop by interrupting the server transport.
 *
 * Calls interruptChildren() and then interrupt() on serverTransport_; the
 * sequence matters for the reason given below.
 */
void TServerFramework::stop() {
  // Order is important because serve() releases serverTransport_ when it is
  // interrupted, which closes the socket that interruptChildren uses.
  serverTransport_->interruptChildren();
  serverTransport_->interrupt();
}
222 
newlyConnectedClient(const shared_ptr<TConnectedClient> & pClient)223 void TServerFramework::newlyConnectedClient(const shared_ptr<TConnectedClient>& pClient) {
224   {
225     Synchronized sync(mon_);
226     ++clients_;
227     hwm_ = (std::max)(hwm_, clients_);
228   }
229 
230   onClientConnected(pClient);
231 }
232 
disposeConnectedClient(TConnectedClient * pClient)233 void TServerFramework::disposeConnectedClient(TConnectedClient* pClient) {
234   onClientDisconnected(pClient);
235   delete pClient;
236 
237   Synchronized sync(mon_);
238   if (limit_ - --clients_ > 0) {
239     mon_.notify();
240   }
241 }
242 
243 }
244 }
245 } // apache::thrift::server
246