1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, 13 * software distributed under the License is distributed on an 14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 15 * KIND, either express or implied. See the License for the 16 * specific language governing permissions and limitations 17 * under the License. 18 */ 19 20 using System; 21 using System.Collections.Generic; 22 using System.Threading; 23 using Thrift.Collections; 24 using Thrift.Protocol; 25 using Thrift.Transport; 26 27 namespace Thrift.Server 28 { 29 /// <summary> 30 /// Server that uses C# threads (as opposed to the ThreadPool) when handling requests. 31 /// </summary> 32 public class TThreadedServer : TServer 33 { 34 private const int DEFAULT_MAX_THREADS = 100; 35 private volatile bool stop = false; 36 private readonly int maxThreads; 37 38 private Queue<TTransport> clientQueue; 39 private THashSet<Thread> clientThreads; 40 private object clientLock; 41 private Thread workerThread; 42 43 public int ClientThreadsCount 44 { 45 get { return clientThreads.Count; } 46 } 47 TThreadedServer(TProcessor processor, TServerTransport serverTransport)48 public TThreadedServer(TProcessor processor, TServerTransport serverTransport) 49 : this(new TSingletonProcessorFactory(processor), serverTransport, 50 new TTransportFactory(), new TTransportFactory(), 51 new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(), 52 DEFAULT_MAX_THREADS, DefaultLogDelegate) 53 { 54 } 55 TThreadedServer(TProcessor processor, TServerTransport serverTransport, LogDelegate logDelegate)56 public TThreadedServer(TProcessor processor, TServerTransport serverTransport, LogDelegate logDelegate) 57 : this(new TSingletonProcessorFactory(processor), serverTransport, 58 new TTransportFactory(), new TTransportFactory(), 59 new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(), 60 DEFAULT_MAX_THREADS, logDelegate) 61 { 62 } 63 64 TThreadedServer(TProcessor processor, TServerTransport serverTransport, TTransportFactory transportFactory, TProtocolFactory protocolFactory)65 public TThreadedServer(TProcessor processor, 66 TServerTransport serverTransport, 67 TTransportFactory transportFactory, 68 TProtocolFactory protocolFactory) 69 : this(new TSingletonProcessorFactory(processor), serverTransport, 70 transportFactory, transportFactory, 71 protocolFactory, protocolFactory, 72 DEFAULT_MAX_THREADS, DefaultLogDelegate) 73 { 74 } 75 TThreadedServer(TProcessorFactory processorFactory, TServerTransport serverTransport, TTransportFactory transportFactory, TProtocolFactory protocolFactory)76 public TThreadedServer(TProcessorFactory processorFactory, 77 TServerTransport serverTransport, 78 TTransportFactory transportFactory, 79 TProtocolFactory protocolFactory) 80 : this(processorFactory, serverTransport, 81 transportFactory, transportFactory, 82 protocolFactory, protocolFactory, 83 DEFAULT_MAX_THREADS, DefaultLogDelegate) 84 { 85 } TThreadedServer(TProcessorFactory processorFactory, TServerTransport serverTransport, TTransportFactory inputTransportFactory, TTransportFactory outputTransportFactory, TProtocolFactory inputProtocolFactory, TProtocolFactory outputProtocolFactory, int maxThreads, LogDelegate logDel)86 public TThreadedServer(TProcessorFactory processorFactory, 87 TServerTransport serverTransport, 88 TTransportFactory inputTransportFactory, 89 TTransportFactory outputTransportFactory, 90 TProtocolFactory inputProtocolFactory, 91 TProtocolFactory outputProtocolFactory, 92 int maxThreads, LogDelegate logDel) 93 : base(processorFactory, serverTransport, inputTransportFactory, outputTransportFactory, 94 inputProtocolFactory, outputProtocolFactory, logDel) 95 { 96 this.maxThreads = maxThreads; 97 clientQueue = new Queue<TTransport>(); 98 clientLock = new object(); 99 clientThreads = new THashSet<Thread>(); 100 } 101 102 /// <summary> 103 /// Use new Thread for each new client connection. block until numConnections < maxThreads. 104 /// </summary> Serve()105 public override void Serve() 106 { 107 try 108 { 109 //start worker thread 110 workerThread = new Thread(new ThreadStart(Execute)); 111 workerThread.Start(); 112 serverTransport.Listen(); 113 } 114 catch (TTransportException ttx) 115 { 116 logDelegate("Error, could not listen on ServerTransport: " + ttx); 117 return; 118 } 119 120 //Fire the preServe server event when server is up but before any client connections 121 if (serverEventHandler != null) 122 serverEventHandler.preServe(); 123 124 while (!stop) 125 { 126 int failureCount = 0; 127 try 128 { 129 TTransport client = serverTransport.Accept(); 130 lock (clientLock) 131 { 132 clientQueue.Enqueue(client); 133 Monitor.Pulse(clientLock); 134 } 135 } 136 catch (TTransportException ttx) 137 { 138 if (!stop || ttx.Type != TTransportException.ExceptionType.Interrupted) 139 { 140 ++failureCount; 141 logDelegate(ttx.ToString()); 142 } 143 144 } 145 } 146 147 if (stop) 148 { 149 try 150 { 151 serverTransport.Close(); 152 } 153 catch (TTransportException ttx) 154 { 155 logDelegate("TServeTransport failed on close: " + ttx.Message); 156 } 157 stop = false; 158 } 159 } 160 161 /// <summary> 162 /// Loops on processing a client forever 163 /// </summary> Execute()164 private void Execute() 165 { 166 while (!stop) 167 { 168 TTransport client; 169 Thread t; 170 lock (clientLock) 171 { 172 //don't dequeue if too many connections 173 while (clientThreads.Count >= maxThreads) 174 { 175 Monitor.Wait(clientLock); 176 } 177 178 while (clientQueue.Count == 0) 179 { 180 Monitor.Wait(clientLock); 181 } 182 183 client = clientQueue.Dequeue(); 184 t = new Thread(new ParameterizedThreadStart(ClientWorker)); 185 clientThreads.Add(t); 186 } 187 //start processing requests from client on new thread 188 t.Start(client); 189 } 190 } 191 ClientWorker(object context)192 private void ClientWorker(object context) 193 { 194 using (TTransport client = (TTransport)context) 195 { 196 TProcessor processor = processorFactory.GetProcessor(client); 197 TTransport inputTransport = null; 198 TTransport outputTransport = null; 199 TProtocol inputProtocol = null; 200 TProtocol outputProtocol = null; 201 object connectionContext = null; 202 try 203 { 204 try 205 { 206 inputTransport = inputTransportFactory.GetTransport(client); 207 outputTransport = outputTransportFactory.GetTransport(client); 208 inputProtocol = inputProtocolFactory.GetProtocol(inputTransport); 209 outputProtocol = outputProtocolFactory.GetProtocol(outputTransport); 210 211 //Recover event handler (if any) and fire createContext server event when a client connects 212 if (serverEventHandler != null) 213 connectionContext = serverEventHandler.createContext(inputProtocol, outputProtocol); 214 215 //Process client requests until client disconnects 216 while (!stop) 217 { 218 if (!inputTransport.Peek()) 219 break; 220 221 //Fire processContext server event 222 //N.B. This is the pattern implemented in C++ and the event fires provisionally. 223 //That is to say it may be many minutes between the event firing and the client request 224 //actually arriving or the client may hang up without ever makeing a request. 225 if (serverEventHandler != null) 226 serverEventHandler.processContext(connectionContext, inputTransport); 227 //Process client request (blocks until transport is readable) 228 if (!processor.Process(inputProtocol, outputProtocol)) 229 break; 230 } 231 } 232 catch (TTransportException) 233 { 234 //Usually a client disconnect, expected 235 } 236 catch (Exception x) 237 { 238 //Unexpected 239 logDelegate("Error: " + x); 240 } 241 242 //Fire deleteContext server event after client disconnects 243 if (serverEventHandler != null) 244 serverEventHandler.deleteContext(connectionContext, inputProtocol, outputProtocol); 245 246 lock (clientLock) 247 { 248 clientThreads.Remove(Thread.CurrentThread); 249 Monitor.Pulse(clientLock); 250 } 251 252 } 253 finally 254 { 255 //Close transports 256 if (inputTransport != null) 257 inputTransport.Close(); 258 if (outputTransport != null) 259 outputTransport.Close(); 260 261 // disposable stuff should be disposed 262 if (inputProtocol != null) 263 inputProtocol.Dispose(); 264 if (outputProtocol != null) 265 outputProtocol.Dispose(); 266 } 267 } 268 } 269 Stop()270 public override void Stop() 271 { 272 stop = true; 273 serverTransport.Close(); 274 //clean up all the threads myself 275 workerThread.Abort(); 276 foreach (Thread t in clientThreads) 277 { 278 t.Abort(); 279 } 280 } 281 } 282 } 283