1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 
20 using System;
21 using System.Collections.Generic;
22 using System.Threading;
23 using Thrift.Collections;
24 using Thrift.Protocol;
25 using Thrift.Transport;
26 
27 namespace Thrift.Server
28 {
29     /// <summary>
30     /// Server that uses C# threads (as opposed to the ThreadPool) when handling requests.
31     /// </summary>
32     public class TThreadedServer : TServer
33     {
34         private const int DEFAULT_MAX_THREADS = 100;
35         private volatile bool stop = false;
36         private readonly int maxThreads;
37 
38         private Queue<TTransport> clientQueue;
39         private THashSet<Thread> clientThreads;
40         private object clientLock;
41         private Thread workerThread;
42 
43         public int ClientThreadsCount
44         {
45             get { return clientThreads.Count; }
46         }
47 
TThreadedServer(TProcessor processor, TServerTransport serverTransport)48         public TThreadedServer(TProcessor processor, TServerTransport serverTransport)
49             : this(new TSingletonProcessorFactory(processor), serverTransport,
50              new TTransportFactory(), new TTransportFactory(),
51              new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(),
52              DEFAULT_MAX_THREADS, DefaultLogDelegate)
53         {
54         }
55 
TThreadedServer(TProcessor processor, TServerTransport serverTransport, LogDelegate logDelegate)56         public TThreadedServer(TProcessor processor, TServerTransport serverTransport, LogDelegate logDelegate)
57             : this(new TSingletonProcessorFactory(processor), serverTransport,
58              new TTransportFactory(), new TTransportFactory(),
59              new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(),
60              DEFAULT_MAX_THREADS, logDelegate)
61         {
62         }
63 
64 
TThreadedServer(TProcessor processor, TServerTransport serverTransport, TTransportFactory transportFactory, TProtocolFactory protocolFactory)65         public TThreadedServer(TProcessor processor,
66                      TServerTransport serverTransport,
67                      TTransportFactory transportFactory,
68                      TProtocolFactory protocolFactory)
69             : this(new TSingletonProcessorFactory(processor), serverTransport,
70              transportFactory, transportFactory,
71              protocolFactory, protocolFactory,
72              DEFAULT_MAX_THREADS, DefaultLogDelegate)
73         {
74         }
75 
TThreadedServer(TProcessorFactory processorFactory, TServerTransport serverTransport, TTransportFactory transportFactory, TProtocolFactory protocolFactory)76         public TThreadedServer(TProcessorFactory processorFactory,
77                TServerTransport serverTransport,
78                TTransportFactory transportFactory,
79                TProtocolFactory protocolFactory)
80             : this(processorFactory, serverTransport,
81              transportFactory, transportFactory,
82              protocolFactory, protocolFactory,
83              DEFAULT_MAX_THREADS, DefaultLogDelegate)
84         {
85         }
TThreadedServer(TProcessorFactory processorFactory, TServerTransport serverTransport, TTransportFactory inputTransportFactory, TTransportFactory outputTransportFactory, TProtocolFactory inputProtocolFactory, TProtocolFactory outputProtocolFactory, int maxThreads, LogDelegate logDel)86         public TThreadedServer(TProcessorFactory processorFactory,
87                      TServerTransport serverTransport,
88                      TTransportFactory inputTransportFactory,
89                      TTransportFactory outputTransportFactory,
90                      TProtocolFactory inputProtocolFactory,
91                      TProtocolFactory outputProtocolFactory,
92                      int maxThreads, LogDelegate logDel)
93           : base(processorFactory, serverTransport, inputTransportFactory, outputTransportFactory,
94               inputProtocolFactory, outputProtocolFactory, logDel)
95         {
96             this.maxThreads = maxThreads;
97             clientQueue = new Queue<TTransport>();
98             clientLock = new object();
99             clientThreads = new THashSet<Thread>();
100         }
101 
102         /// <summary>
103         /// Use new Thread for each new client connection. block until numConnections &lt; maxThreads.
104         /// </summary>
Serve()105         public override void Serve()
106         {
107             try
108             {
109                 //start worker thread
110                 workerThread = new Thread(new ThreadStart(Execute));
111                 workerThread.Start();
112                 serverTransport.Listen();
113             }
114             catch (TTransportException ttx)
115             {
116                 logDelegate("Error, could not listen on ServerTransport: " + ttx);
117                 return;
118             }
119 
120             //Fire the preServe server event when server is up but before any client connections
121             if (serverEventHandler != null)
122                 serverEventHandler.preServe();
123 
124             while (!stop)
125             {
126                 int failureCount = 0;
127                 try
128                 {
129                     TTransport client = serverTransport.Accept();
130                     lock (clientLock)
131                     {
132                         clientQueue.Enqueue(client);
133                         Monitor.Pulse(clientLock);
134                     }
135                 }
136                 catch (TTransportException ttx)
137                 {
138                     if (!stop || ttx.Type != TTransportException.ExceptionType.Interrupted)
139                     {
140                         ++failureCount;
141                         logDelegate(ttx.ToString());
142                     }
143 
144                 }
145             }
146 
147             if (stop)
148             {
149                 try
150                 {
151                     serverTransport.Close();
152                 }
153                 catch (TTransportException ttx)
154                 {
155                     logDelegate("TServeTransport failed on close: " + ttx.Message);
156                 }
157                 stop = false;
158             }
159         }
160 
161         /// <summary>
162         /// Loops on processing a client forever
163         /// </summary>
Execute()164         private void Execute()
165         {
166             while (!stop)
167             {
168                 TTransport client;
169                 Thread t;
170                 lock (clientLock)
171                 {
172                     //don't dequeue if too many connections
173                     while (clientThreads.Count >= maxThreads)
174                     {
175                         Monitor.Wait(clientLock);
176                     }
177 
178                     while (clientQueue.Count == 0)
179                     {
180                         Monitor.Wait(clientLock);
181                     }
182 
183                     client = clientQueue.Dequeue();
184                     t = new Thread(new ParameterizedThreadStart(ClientWorker));
185                     clientThreads.Add(t);
186                 }
187                 //start processing requests from client on new thread
188                 t.Start(client);
189             }
190         }
191 
ClientWorker(object context)192         private void ClientWorker(object context)
193         {
194             using (TTransport client = (TTransport)context)
195             {
196                 TProcessor processor = processorFactory.GetProcessor(client);
197                 TTransport inputTransport = null;
198                 TTransport outputTransport = null;
199                 TProtocol inputProtocol = null;
200                 TProtocol outputProtocol = null;
201                 object connectionContext = null;
202                 try
203                 {
204                     try
205                     {
206                         inputTransport = inputTransportFactory.GetTransport(client);
207                         outputTransport = outputTransportFactory.GetTransport(client);
208                         inputProtocol = inputProtocolFactory.GetProtocol(inputTransport);
209                         outputProtocol = outputProtocolFactory.GetProtocol(outputTransport);
210 
211                         //Recover event handler (if any) and fire createContext server event when a client connects
212                         if (serverEventHandler != null)
213                             connectionContext = serverEventHandler.createContext(inputProtocol, outputProtocol);
214 
215                         //Process client requests until client disconnects
216                         while (!stop)
217                         {
218                             if (!inputTransport.Peek())
219                                 break;
220 
221                             //Fire processContext server event
222                             //N.B. This is the pattern implemented in C++ and the event fires provisionally.
223                             //That is to say it may be many minutes between the event firing and the client request
224                             //actually arriving or the client may hang up without ever makeing a request.
225                             if (serverEventHandler != null)
226                                 serverEventHandler.processContext(connectionContext, inputTransport);
227                             //Process client request (blocks until transport is readable)
228                             if (!processor.Process(inputProtocol, outputProtocol))
229                                 break;
230                         }
231                     }
232                     catch (TTransportException)
233                     {
234                         //Usually a client disconnect, expected
235                     }
236                     catch (Exception x)
237                     {
238                         //Unexpected
239                         logDelegate("Error: " + x);
240                     }
241 
242                     //Fire deleteContext server event after client disconnects
243                     if (serverEventHandler != null)
244                         serverEventHandler.deleteContext(connectionContext, inputProtocol, outputProtocol);
245 
246                     lock (clientLock)
247                     {
248                         clientThreads.Remove(Thread.CurrentThread);
249                         Monitor.Pulse(clientLock);
250                     }
251 
252                 }
253                 finally
254                 {
255                     //Close transports
256                     if (inputTransport != null)
257                         inputTransport.Close();
258                     if (outputTransport != null)
259                         outputTransport.Close();
260 
261                     // disposable stuff should be disposed
262                     if (inputProtocol != null)
263                         inputProtocol.Dispose();
264                     if (outputProtocol != null)
265                         outputProtocol.Dispose();
266                 }
267             }
268         }
269 
Stop()270         public override void Stop()
271         {
272             stop = true;
273             serverTransport.Close();
274             //clean up all the threads myself
275             workerThread.Abort();
276             foreach (Thread t in clientThreads)
277             {
278                 t.Abort();
279             }
280         }
281     }
282 }
283