1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 //   http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17 
18 use log::warn;
19 
20 use std::net::{TcpListener, TcpStream, ToSocketAddrs};
21 use std::sync::Arc;
22 use threadpool::ThreadPool;
23 
24 use crate::protocol::{TInputProtocol, TInputProtocolFactory, TOutputProtocol, TOutputProtocolFactory};
25 use crate::transport::{TIoChannel, TReadTransportFactory, TTcpChannel, TWriteTransportFactory};
26 use crate::{ApplicationError, ApplicationErrorKind};
27 
28 use super::TProcessor;
29 use crate::TransportErrorKind;
30 
31 /// Fixed-size thread-pool blocking Thrift server.
32 ///
33 /// A `TServer` listens on a given address and submits accepted connections
34 /// to an **unbounded** queue. Connections from this queue are serviced by
35 /// the first available worker thread from a **fixed-size** thread pool. Each
36 /// accepted connection is handled by that worker thread, and communication
37 /// over this thread occurs sequentially and synchronously (i.e. calls block).
38 /// Accepted connections have an input half and an output half, each of which
39 /// uses a `TTransport` and `TInputProtocol`/`TOutputProtocol` to translate
40 /// messages to and from byes. Any combination of `TInputProtocol`, `TOutputProtocol`
41 /// and `TTransport` may be used.
42 ///
43 /// # Examples
44 ///
45 /// Creating and running a `TServer` using Thrift-compiler-generated
46 /// service code.
47 ///
48 /// ```no_run
49 /// use thrift::protocol::{TInputProtocolFactory, TOutputProtocolFactory};
50 /// use thrift::protocol::{TBinaryInputProtocolFactory, TBinaryOutputProtocolFactory};
51 /// use thrift::protocol::{TInputProtocol, TOutputProtocol};
52 /// use thrift::transport::{TBufferedReadTransportFactory, TBufferedWriteTransportFactory,
53 ///                         TReadTransportFactory, TWriteTransportFactory};
54 /// use thrift::server::{TProcessor, TServer};
55 ///
56 /// //
57 /// // auto-generated
58 /// //
59 ///
60 /// // processor for `SimpleService`
61 /// struct SimpleServiceSyncProcessor;
62 /// impl SimpleServiceSyncProcessor {
63 ///     fn new<H: SimpleServiceSyncHandler>(processor: H) -> SimpleServiceSyncProcessor {
64 ///         unimplemented!();
65 ///     }
66 /// }
67 ///
68 /// // `TProcessor` implementation for `SimpleService`
69 /// impl TProcessor for SimpleServiceSyncProcessor {
70 ///     fn process(&self, i: &mut dyn TInputProtocol, o: &mut dyn TOutputProtocol) -> thrift::Result<()> {
71 ///         unimplemented!();
72 ///     }
73 /// }
74 ///
75 /// // service functions for SimpleService
76 /// trait SimpleServiceSyncHandler {
77 ///     fn service_call(&self) -> thrift::Result<()>;
78 /// }
79 ///
80 /// //
81 /// // user-code follows
82 /// //
83 ///
84 /// // define a handler that will be invoked when `service_call` is received
85 /// struct SimpleServiceHandlerImpl;
86 /// impl SimpleServiceSyncHandler for SimpleServiceHandlerImpl {
87 ///     fn service_call(&self) -> thrift::Result<()> {
88 ///         unimplemented!();
89 ///     }
90 /// }
91 ///
92 /// // instantiate the processor
93 /// let processor = SimpleServiceSyncProcessor::new(SimpleServiceHandlerImpl {});
94 ///
95 /// // instantiate the server
96 /// let i_tr_fact: Box<dyn TReadTransportFactory> = Box::new(TBufferedReadTransportFactory::new());
97 /// let i_pr_fact: Box<dyn TInputProtocolFactory> = Box::new(TBinaryInputProtocolFactory::new());
98 /// let o_tr_fact: Box<dyn TWriteTransportFactory> = Box::new(TBufferedWriteTransportFactory::new());
99 /// let o_pr_fact: Box<dyn TOutputProtocolFactory> = Box::new(TBinaryOutputProtocolFactory::new());
100 ///
101 /// let mut server = TServer::new(
102 ///     i_tr_fact,
103 ///     i_pr_fact,
104 ///     o_tr_fact,
105 ///     o_pr_fact,
106 ///     processor,
107 ///     10
108 /// );
109 ///
110 /// // start listening for incoming connections
111 /// match server.listen("127.0.0.1:8080") {
112 ///   Ok(_)  => println!("listen completed"),
113 ///   Err(e) => println!("listen failed with error {:?}", e),
114 /// }
115 /// ```
116 #[derive(Debug)]
117 pub struct TServer<PRC, RTF, IPF, WTF, OPF>
118 where
119     PRC: TProcessor + Send + Sync + 'static,
120     RTF: TReadTransportFactory + 'static,
121     IPF: TInputProtocolFactory + 'static,
122     WTF: TWriteTransportFactory + 'static,
123     OPF: TOutputProtocolFactory + 'static,
124 {
125     r_trans_factory: RTF,
126     i_proto_factory: IPF,
127     w_trans_factory: WTF,
128     o_proto_factory: OPF,
129     processor: Arc<PRC>,
130     worker_pool: ThreadPool,
131 }
132 
133 impl<PRC, RTF, IPF, WTF, OPF> TServer<PRC, RTF, IPF, WTF, OPF>
134 where
135     PRC: TProcessor + Send + Sync + 'static,
136     RTF: TReadTransportFactory + 'static,
137     IPF: TInputProtocolFactory + 'static,
138     WTF: TWriteTransportFactory + 'static,
139     OPF: TOutputProtocolFactory + 'static,
140 {
141     /// Create a `TServer`.
142     ///
143     /// Each accepted connection has an input and output half, each of which
144     /// requires a `TTransport` and `TProtocol`. `TServer` uses
145     /// `read_transport_factory` and `input_protocol_factory` to create
146     /// implementations for the input, and `write_transport_factory` and
147     /// `output_protocol_factory` to create implementations for the output.
new( read_transport_factory: RTF, input_protocol_factory: IPF, write_transport_factory: WTF, output_protocol_factory: OPF, processor: PRC, num_workers: usize, ) -> TServer<PRC, RTF, IPF, WTF, OPF>148     pub fn new(
149         read_transport_factory: RTF,
150         input_protocol_factory: IPF,
151         write_transport_factory: WTF,
152         output_protocol_factory: OPF,
153         processor: PRC,
154         num_workers: usize,
155     ) -> TServer<PRC, RTF, IPF, WTF, OPF> {
156         TServer {
157             r_trans_factory: read_transport_factory,
158             i_proto_factory: input_protocol_factory,
159             w_trans_factory: write_transport_factory,
160             o_proto_factory: output_protocol_factory,
161             processor: Arc::new(processor),
162             worker_pool: ThreadPool::with_name("Thrift service processor".to_owned(), num_workers),
163         }
164     }
165 
166     /// Listen for incoming connections on `listen_address`.
167     ///
168     /// `listen_address` should implement `ToSocketAddrs` trait.
169     ///
170     /// Return `()` if successful.
171     ///
172     /// Return `Err` when the server cannot bind to `listen_address` or there
173     /// is an unrecoverable error.
listen<A: ToSocketAddrs>(&mut self, listen_address: A) -> crate::Result<()>174     pub fn listen<A: ToSocketAddrs>(&mut self, listen_address: A) -> crate::Result<()> {
175         let listener = TcpListener::bind(listen_address)?;
176         for stream in listener.incoming() {
177             match stream {
178                 Ok(s) => {
179                     let (i_prot, o_prot) = self.new_protocols_for_connection(s)?;
180                     let processor = self.processor.clone();
181                     self.worker_pool
182                         .execute(move || handle_incoming_connection(processor, i_prot, o_prot));
183                 }
184                 Err(e) => {
185                     warn!("failed to accept remote connection with error {:?}", e);
186                 }
187             }
188         }
189 
190         Err(crate::Error::Application(ApplicationError {
191             kind: ApplicationErrorKind::Unknown,
192             message: "aborted listen loop".into(),
193         }))
194     }
195 
new_protocols_for_connection( &mut self, stream: TcpStream, ) -> crate::Result<(Box<dyn TInputProtocol + Send>, Box<dyn TOutputProtocol + Send>)>196     fn new_protocols_for_connection(
197         &mut self,
198         stream: TcpStream,
199     ) -> crate::Result<(Box<dyn TInputProtocol + Send>, Box<dyn TOutputProtocol + Send>)> {
200         // create the shared tcp stream
201         let channel = TTcpChannel::with_stream(stream);
202 
203         // split it into two - one to be owned by the
204         // input tran/proto and the other by the output
205         let (r_chan, w_chan) = channel.split()?;
206 
207         // input protocol and transport
208         let r_tran = self.r_trans_factory.create(Box::new(r_chan));
209         let i_prot = self.i_proto_factory.create(r_tran);
210 
211         // output protocol and transport
212         let w_tran = self.w_trans_factory.create(Box::new(w_chan));
213         let o_prot = self.o_proto_factory.create(w_tran);
214 
215         Ok((i_prot, o_prot))
216     }
217 }
218 
handle_incoming_connection<PRC>( processor: Arc<PRC>, i_prot: Box<dyn TInputProtocol>, o_prot: Box<dyn TOutputProtocol>, ) where PRC: TProcessor,219 fn handle_incoming_connection<PRC>(
220     processor: Arc<PRC>,
221     i_prot: Box<dyn TInputProtocol>,
222     o_prot: Box<dyn TOutputProtocol>,
223 ) where
224     PRC: TProcessor,
225 {
226     let mut i_prot = i_prot;
227     let mut o_prot = o_prot;
228     loop {
229         match processor.process(&mut *i_prot, &mut *o_prot) {
230             Ok(()) => {},
231             Err(err) => {
232                 match err {
233                     crate::Error::Transport(ref transport_err) if transport_err.kind == TransportErrorKind::EndOfFile => {},
234                     other => warn!("processor completed with error: {:?}", other),
235                 }
236                 break;
237             }
238         }
239     }
240 }
241