1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 //   http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17 
18 use std::net::{TcpListener, TcpStream};
19 use std::sync::Arc;
20 use threadpool::ThreadPool;
21 
22 use {ApplicationError, ApplicationErrorKind};
23 use protocol::{TInputProtocol, TInputProtocolFactory, TOutputProtocol, TOutputProtocolFactory};
24 use transport::{TIoChannel, TReadTransportFactory, TTcpChannel, TWriteTransportFactory};
25 
26 use super::TProcessor;
27 
28 /// Fixed-size thread-pool blocking Thrift server.
29 ///
30 /// A `TServer` listens on a given address and submits accepted connections
31 /// to an **unbounded** queue. Connections from this queue are serviced by
32 /// the first available worker thread from a **fixed-size** thread pool. Each
33 /// accepted connection is handled by that worker thread, and communication
34 /// over this thread occurs sequentially and synchronously (i.e. calls block).
35 /// Accepted connections have an input half and an output half, each of which
36 /// uses a `TTransport` and `TInputProtocol`/`TOutputProtocol` to translate
37 /// messages to and from byes. Any combination of `TInputProtocol`, `TOutputProtocol`
38 /// and `TTransport` may be used.
39 ///
40 /// # Examples
41 ///
42 /// Creating and running a `TServer` using Thrift-compiler-generated
43 /// service code.
44 ///
45 /// ```no_run
46 /// use thrift;
47 /// use thrift::protocol::{TInputProtocolFactory, TOutputProtocolFactory};
48 /// use thrift::protocol::{TBinaryInputProtocolFactory, TBinaryOutputProtocolFactory};
49 /// use thrift::protocol::{TInputProtocol, TOutputProtocol};
50 /// use thrift::transport::{TBufferedReadTransportFactory, TBufferedWriteTransportFactory,
51 ///                         TReadTransportFactory, TWriteTransportFactory};
52 /// use thrift::server::{TProcessor, TServer};
53 ///
54 /// //
55 /// // auto-generated
56 /// //
57 ///
58 /// // processor for `SimpleService`
59 /// struct SimpleServiceSyncProcessor;
60 /// impl SimpleServiceSyncProcessor {
61 ///     fn new<H: SimpleServiceSyncHandler>(processor: H) -> SimpleServiceSyncProcessor {
62 ///         unimplemented!();
63 ///     }
64 /// }
65 ///
66 /// // `TProcessor` implementation for `SimpleService`
67 /// impl TProcessor for SimpleServiceSyncProcessor {
68 ///     fn process(&self, i: &mut TInputProtocol, o: &mut TOutputProtocol) -> thrift::Result<()> {
69 ///         unimplemented!();
70 ///     }
71 /// }
72 ///
73 /// // service functions for SimpleService
74 /// trait SimpleServiceSyncHandler {
75 ///     fn service_call(&self) -> thrift::Result<()>;
76 /// }
77 ///
78 /// //
79 /// // user-code follows
80 /// //
81 ///
82 /// // define a handler that will be invoked when `service_call` is received
83 /// struct SimpleServiceHandlerImpl;
84 /// impl SimpleServiceSyncHandler for SimpleServiceHandlerImpl {
85 ///     fn service_call(&self) -> thrift::Result<()> {
86 ///         unimplemented!();
87 ///     }
88 /// }
89 ///
90 /// // instantiate the processor
91 /// let processor = SimpleServiceSyncProcessor::new(SimpleServiceHandlerImpl {});
92 ///
93 /// // instantiate the server
94 /// let i_tr_fact: Box<TReadTransportFactory> = Box::new(TBufferedReadTransportFactory::new());
95 /// let i_pr_fact: Box<TInputProtocolFactory> = Box::new(TBinaryInputProtocolFactory::new());
96 /// let o_tr_fact: Box<TWriteTransportFactory> = Box::new(TBufferedWriteTransportFactory::new());
97 /// let o_pr_fact: Box<TOutputProtocolFactory> = Box::new(TBinaryOutputProtocolFactory::new());
98 ///
99 /// let mut server = TServer::new(
100 ///     i_tr_fact,
101 ///     i_pr_fact,
102 ///     o_tr_fact,
103 ///     o_pr_fact,
104 ///     processor,
105 ///     10
106 /// );
107 ///
108 /// // start listening for incoming connections
109 /// match server.listen("127.0.0.1:8080") {
110 ///   Ok(_)  => println!("listen completed"),
111 ///   Err(e) => println!("listen failed with error {:?}", e),
112 /// }
113 /// ```
114 #[derive(Debug)]
115 pub struct TServer<PRC, RTF, IPF, WTF, OPF>
116 where
117     PRC: TProcessor + Send + Sync + 'static,
118     RTF: TReadTransportFactory + 'static,
119     IPF: TInputProtocolFactory + 'static,
120     WTF: TWriteTransportFactory + 'static,
121     OPF: TOutputProtocolFactory + 'static,
122 {
123     r_trans_factory: RTF,
124     i_proto_factory: IPF,
125     w_trans_factory: WTF,
126     o_proto_factory: OPF,
127     processor: Arc<PRC>,
128     worker_pool: ThreadPool,
129 }
130 
131 impl<PRC, RTF, IPF, WTF, OPF> TServer<PRC, RTF, IPF, WTF, OPF>
132     where PRC: TProcessor + Send + Sync + 'static,
133           RTF: TReadTransportFactory + 'static,
134           IPF: TInputProtocolFactory + 'static,
135           WTF: TWriteTransportFactory + 'static,
136           OPF: TOutputProtocolFactory + 'static {
137     /// Create a `TServer`.
138     ///
139     /// Each accepted connection has an input and output half, each of which
140     /// requires a `TTransport` and `TProtocol`. `TServer` uses
141     /// `read_transport_factory` and `input_protocol_factory` to create
142     /// implementations for the input, and `write_transport_factory` and
143     /// `output_protocol_factory` to create implementations for the output.
new( read_transport_factory: RTF, input_protocol_factory: IPF, write_transport_factory: WTF, output_protocol_factory: OPF, processor: PRC, num_workers: usize, ) -> TServer<PRC, RTF, IPF, WTF, OPF>144     pub fn new(
145         read_transport_factory: RTF,
146         input_protocol_factory: IPF,
147         write_transport_factory: WTF,
148         output_protocol_factory: OPF,
149         processor: PRC,
150         num_workers: usize,
151     ) -> TServer<PRC, RTF, IPF, WTF, OPF> {
152         TServer {
153             r_trans_factory: read_transport_factory,
154             i_proto_factory: input_protocol_factory,
155             w_trans_factory: write_transport_factory,
156             o_proto_factory: output_protocol_factory,
157             processor: Arc::new(processor),
158             worker_pool: ThreadPool::with_name(
159                 "Thrift service processor".to_owned(),
160                 num_workers,
161             ),
162         }
163     }
164 
165     /// Listen for incoming connections on `listen_address`.
166     ///
167     /// `listen_address` should be in the form `host:port`,
168     /// for example: `127.0.0.1:8080`.
169     ///
170     /// Return `()` if successful.
171     ///
172     /// Return `Err` when the server cannot bind to `listen_address` or there
173     /// is an unrecoverable error.
listen(&mut self, listen_address: &str) -> ::Result<()>174     pub fn listen(&mut self, listen_address: &str) -> ::Result<()> {
175         let listener = TcpListener::bind(listen_address)?;
176         for stream in listener.incoming() {
177             match stream {
178                 Ok(s) => {
179                     let (i_prot, o_prot) = self.new_protocols_for_connection(s)?;
180                     let processor = self.processor.clone();
181                     self.worker_pool
182                         .execute(move || handle_incoming_connection(processor, i_prot, o_prot),);
183                 }
184                 Err(e) => {
185                     warn!("failed to accept remote connection with error {:?}", e);
186                 }
187             }
188         }
189 
190         Err(
191             ::Error::Application(
192                 ApplicationError {
193                     kind: ApplicationErrorKind::Unknown,
194                     message: "aborted listen loop".into(),
195                 },
196             ),
197         )
198     }
199 
200 
new_protocols_for_connection( &mut self, stream: TcpStream, ) -> ::Result<(Box<TInputProtocol + Send>, Box<TOutputProtocol + Send>)>201     fn new_protocols_for_connection(
202         &mut self,
203         stream: TcpStream,
204     ) -> ::Result<(Box<TInputProtocol + Send>, Box<TOutputProtocol + Send>)> {
205         // create the shared tcp stream
206         let channel = TTcpChannel::with_stream(stream);
207 
208         // split it into two - one to be owned by the
209         // input tran/proto and the other by the output
210         let (r_chan, w_chan) = channel.split()?;
211 
212         // input protocol and transport
213         let r_tran = self.r_trans_factory.create(Box::new(r_chan));
214         let i_prot = self.i_proto_factory.create(r_tran);
215 
216         // output protocol and transport
217         let w_tran = self.w_trans_factory.create(Box::new(w_chan));
218         let o_prot = self.o_proto_factory.create(w_tran);
219 
220         Ok((i_prot, o_prot))
221     }
222 }
223 
handle_incoming_connection<PRC>( processor: Arc<PRC>, i_prot: Box<TInputProtocol>, o_prot: Box<TOutputProtocol>, ) where PRC: TProcessor,224 fn handle_incoming_connection<PRC>(
225     processor: Arc<PRC>,
226     i_prot: Box<TInputProtocol>,
227     o_prot: Box<TOutputProtocol>,
228 ) where
229     PRC: TProcessor,
230 {
231     let mut i_prot = i_prot;
232     let mut o_prot = o_prot;
233     loop {
234         let r = processor.process(&mut *i_prot, &mut *o_prot);
235         if let Err(e) = r {
236             warn!("processor completed with error: {:?}", e);
237             break;
238         }
239     }
240 }
241