1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 
20 #ifndef _FACEBOOK_THRIFT_SERVER_TCLIENTINFO_H_
21 #define _FACEBOOK_THRIFT_SERVER_TCLIENTINFO_H_ 1
22 
23 // for inet_ntop --
24 #include <arpa/inet.h>
25 #include <thrift/server/TServer.h>
26 #include <thrift/transport/TSocket.h>
27 #include <thrift/concurrency/Mutex.h>
28 
29 namespace apache { namespace thrift { namespace server {
30 
31 using namespace apache::thrift;
32 using namespace apache::thrift::transport;
33 using namespace apache::thrift::concurrency;
34 using boost::shared_ptr;
35 using std::string;
36 using std::vector;
37 
38 /**
39  * StableVector -- a minimal vector class where growth is automatic and
40  * vector elements never move as the vector grows.  Allocates new space
41  * as needed, but does not copy old values.
42  *
43  * A level vector stores a list of storage vectors containing the actual
44  * elements.  Levels are added as needed, doubling in size each time.
45  * Locking is only done when a level is added.  Access is amortized
46  * constant time.
47  */
48 template <typename T>
49 class StableVector {
50   /// The initial allocation as an exponent of 2
51   static const uint32_t kInitialSizePowOf2 = 10;
52   /// The initial allocation size
53   static const uint32_t kInitialVectorSize = 1 << kInitialSizePowOf2;
54   /// This bound is guaranteed not to be exceeded on 64-bit archs
55   static const int kMaxLevels = 64;
56 
57   /// Values are kept in one or more of these
58   typedef vector<T> Vect;
59   /// One or more value vectors are kept in one of these
60   typedef vector<Vect*> LevelVector;
61 
62   Mutex mutex_;
63   /// current size
64   size_t size_;
65   _Atomic_word vectLvl_;
66   LevelVector vects_;
67 
68  public:
69   /**
70    * Constructor -- initialize the level vector and allocate the
71    * initial storage vector
72    */
StableVector()73   StableVector()
74     : size_(0)
75     , vectLvl_(0) {
76     vects_.reserve(kMaxLevels);
77     Vect* storageVector(new Vect(1 << kInitialSizePowOf2));
78     vects_.push_back(storageVector);
79   }
80 
81  private:
82   /**
83    * make sure the requested number of storage levels have been allocated.
84    */
expand(uint32_t level)85   void expand(uint32_t level) {
86     // we need the guard to insure that we only allocate once.
87     Guard g(mutex_);
88     while (level > vectLvl_) {
89       Vect* levelVect(new Vect(1 << (vectLvl_ + kInitialSizePowOf2)));
90       vects_.push_back(levelVect);
91       // we need to make sure this is done after levelVect is inserted
92       // (what we want is effectively a memory barrier here).
93       __gnu_cxx::__atomic_add(&vectLvl_, 1);
94     }
95   }
96 
97   /**
98    * Given an index, determine which level and element of that level is
99    * required.  Grows if needed.
100    */
which(uint32_t n,uint32_t * vno,uint32_t * idx)101   void which(uint32_t n, uint32_t* vno, uint32_t* idx) {
102     if (n >= size_) {
103       size_ = n + 1;
104     }
105     if (n < kInitialVectorSize) {
106       *idx = n;
107       *vno = 0;
108     } else {
109       uint32_t upper = n >> kInitialSizePowOf2;
110       *vno = CHAR_BIT*sizeof(upper) - __builtin_clz(upper);
111       *idx = n - (1 << (*vno + kInitialSizePowOf2 - 1));
112       if (*vno > vectLvl_) {
113         expand(*vno);
114       }
115     }
116   }
117 
118  public:
119   /**
120    * Given an index, return a reference to that element, perhaps after
121    * allocating additional space.
122    *
123    * @param n a positive integer
124    */
125   T& operator[](uint32_t n) {
126     uint32_t vno;
127     uint32_t idx;
128     which(n, &vno, &idx);
129     return (*vects_[vno])[idx];
130   }
131 
132   /**
133    * Return the present size of the vector.
134    */
size()135   size_t size() const { return size_; }
136 };
137 
138 
139 /**
140  * This class embodies the representation of a single connection during
141  * processing.  We'll keep one of these per file descriptor in TClientInfo.
142  */
class TClientInfoConnection {
 public:
  /// Size, in bytes, of the buffer that stores the thrift call name
  static const int kNameLen = 32;

 private:
  /// Storage for either an IPv4 or an IPv6 socket address.
  /// (Plain union declaration: a typedef without a declarator is ignored
  /// by the compiler and only produces a warning.)
  union IPAddrUnion {
    sockaddr_in ipv4;
    sockaddr_in6 ipv6;
  };

  char call_[kNameLen];            ///< The name of the thrift call
  IPAddrUnion addr_;               ///< The client's IP address
  timespec time_;                  ///< Time processing started
  uint64_t ncalls_;                ///< # of calls processed

 public:
  /**
   * Constructor; ensure that no client address or thrift call name is
   * represented.
   */
  TClientInfoConnection();

  /**
   * A connection has been made; record its address.  Since this is the
   * first we'll know of a connection we start the timer here as well.
   *
   * @param addr the peer's socket address
   */
  void recordAddr(const sockaddr* addr);

  /**
   * Mark the address as empty/unknown.
   */
  void eraseAddr();

  /**
   * Return a string representing the present address, or NULL if none.
   * Copies the string into the buffer provided.
   *
   * @param buf destination buffer for the textual address
   * @param len size of buf
   */
  const char* getAddr(char* buf, int len) const;

  /**
   * A call has been made on this connection; record its name.  Since this is
   * called for every thrift call processed, we also do our call count here.
   *
   * @param name the thrift call name
   */
  void recordCall(const char* name);

  /**
   * Invoked when processing has ended to clear the call name.
   */
  void eraseCall();

  /**
   * Return as string the thrift call either currently being processed or
   * most recently processed if the connection is still open for additional
   * calls.  Returns NULL if a call hasn't been made yet or processing
   * has ended.
   */
  const char* getCall() const;

  /**
   * Get the timespec for the start of this connection (specifically, when
   * recordAddr() was first called).
   *
   * @param time receives the start time
   */
  void getTime(timespec* time) const;

  /**
   * Return the number of calls made on this connection.
   */
  uint64_t getNCalls() const;

 private:
  /// Helper for the connection timer; defined out of line.
  void initTime();
};
215 
216 
217 /**
218  * Store for info about a server's clients -- specifically, the client's IP
219  * address and the call it is executing.  This information is indexed by
220  * socket file descriptor and in the present implementation is updated
221  * asynchronously, so it may only approximate reality.
222  */
223 class TClientInfo {
224  private:
225   StableVector<TClientInfoConnection> info_;
226 
227  public:
228   /**
229    * Return the info object for a given file descriptor.  If "grow" is true
230    * extend the info vector if required (such as for a file descriptor not seen
231    * before).  If "grow" is false and the info vector isn't large enough,
232    * or if "fd" is negative, return NULL.
233    */
234   TClientInfoConnection* getConnection(int fd, bool grow);
235 
236   size_t size() const;
237 };
238 
239 /**
240  * This derivation of TServerEventHandler encapsulates the main status vector
241  * and provides context to the server's processing loop via overrides.
242  * Together with TClientInfoCallHandler (derived from TProcessorEventHandler)
243  * it integrates client info collection into the server.
244  */
245 class TClientInfoServerHandler : public TServerEventHandler {
246  private:
247   TClientInfo clientInfo_;
248 
249  public:
250   /**
251    * One of these is constructed for each open connection/descriptor and links
252    * to both the status vector (clientInfo_) and that descriptor's entry
253    * within it.
254    */
255   struct Connect {
256     TClientInfo* clientInfo_;
257     TClientInfoConnection* callInfo_;
258 
ConnectConnect259     explicit Connect(TClientInfo* clientInfo)
260       : clientInfo_(clientInfo)
261       , callInfo_(NULL) {
262     }
263   };
264 
265   /**
266    * Generate processor context; we don't know what descriptor we belong to
267    * yet -- we'll get hooked up in contextProcess().
268    */
269   void* createContext(boost::shared_ptr<TProtocol> input,
270                       boost::shared_ptr<TProtocol> output);
271 
272   /**
273    * Mark our slot as unused and delete the context created in createContext().
274    */
275   void deleteContext(void* processorContext,
276                      boost::shared_ptr<TProtocol> input,
277                      boost::shared_ptr<TProtocol> output);
278 
279   /**
280    * Called in the processing loop just before the server invokes the
281    * processor itself, on the first call we establish which descriptor
282    * we correspond to and set it to that socket's peer IP address.  This
283    * also has the side effect of initializing call counting and connection
284    * timing.  We won't know which call we're handling until the handler
285    * first gets called in TClientInfoCallHandler::getContext().
286    */
287   void processContext(void* processorContext,
288                       shared_ptr<TTransport> transport);
289 
290   /**
291    * Get status report for server in the form of a vector of strings.
292    * Each active client appears as one string in the format:
293    *
294    *     FD IPADDR CALLNAME DURATION NCALLS
295    *
296    * where "FD" is the file descriptor for the client's socket, "IPADDR"
297    * is the IP address (as reported by accept()), "CALLNAME" is the
298    * current or most recent Thrift function name, "DURATION" is the
299    * duration of the connection, while NCALLS is the number of Thrift
300    * calls made since the connection was made.  A single space separates
301    * fields.
302    */
303   void getStatsStrings(vector<string>& result);
304 };
305 
306 /**
307  * This class derives from TProcessorEventHandler to gain access to the
308  * function name for the current Thrift call.  We need two versions of
309  * this -- TClientInfoCallStatsHandler is the other -- since in the latter
310  * case we pass through to TFunctionStatHandler to perform Thrift call
311  * stats.
312  */
313 class TClientInfoCallHandler : public TProcessorEventHandler {
314  public:
315   virtual void* getContext(const char* fn_name, void* serverContext);
316 };
317 
318 } } } // namespace apache::thrift::server
319 
320 #endif // !_FACEBOOK_THRIFT_SERVER_TCLIENTINFO_H_
321