1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 //   http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17 
18 use super::{
19     TFieldIdentifier, TListIdentifier, TMapIdentifier, TMessageIdentifier, TMessageType,
20     TOutputProtocol, TSetIdentifier, TStructIdentifier,
21 };
22 
23 /// `TOutputProtocol` that prefixes the service name to all outgoing Thrift
24 /// messages.
25 ///
26 /// A `TMultiplexedOutputProtocol` should be used when multiple Thrift services
27 /// send messages over a single I/O channel. By prefixing service identifiers
28 /// to outgoing messages receivers are able to demux them and route them to the
29 /// appropriate service processor. Rust receivers must use a `TMultiplexedProcessor`
30 /// to process incoming messages, while other languages must use their
31 /// corresponding multiplexed processor implementations.
32 ///
33 /// For example, given a service `TestService` and a service call `test_call`,
34 /// this implementation would identify messages as originating from
35 /// `TestService:test_call`.
36 ///
37 /// # Examples
38 ///
39 /// Create and use a `TMultiplexedOutputProtocol`.
40 ///
41 /// ```no_run
42 /// use thrift::protocol::{TMessageIdentifier, TMessageType, TOutputProtocol};
43 /// use thrift::protocol::{TBinaryOutputProtocol, TMultiplexedOutputProtocol};
44 /// use thrift::transport::TTcpChannel;
45 ///
46 /// let mut channel = TTcpChannel::new();
47 /// channel.open("localhost:9090").unwrap();
48 ///
49 /// let protocol = TBinaryOutputProtocol::new(channel, true);
50 /// let mut protocol = TMultiplexedOutputProtocol::new("service_name", protocol);
51 ///
52 /// let ident = TMessageIdentifier::new("svc_call", TMessageType::Call, 1);
53 /// protocol.write_message_begin(&ident).unwrap();
54 /// ```
55 #[derive(Debug)]
56 pub struct TMultiplexedOutputProtocol<P>
57 where
58     P: TOutputProtocol,
59 {
60     service_name: String,
61     inner: P,
62 }
63 
64 impl<P> TMultiplexedOutputProtocol<P>
65 where
66     P: TOutputProtocol,
67 {
68     /// Create a `TMultiplexedOutputProtocol` that identifies outgoing messages
69     /// as originating from a service named `service_name` and sends them over
70     /// the `wrapped` `TOutputProtocol`. Outgoing messages are encoded and sent
71     /// by `wrapped`, not by this instance.
new(service_name: &str, wrapped: P) -> TMultiplexedOutputProtocol<P>72     pub fn new(service_name: &str, wrapped: P) -> TMultiplexedOutputProtocol<P> {
73         TMultiplexedOutputProtocol {
74             service_name: service_name.to_owned(),
75             inner: wrapped,
76         }
77     }
78 }
79 
80 // FIXME: avoid passthrough methods
81 impl<P> TOutputProtocol for TMultiplexedOutputProtocol<P>
82 where
83     P: TOutputProtocol,
84 {
write_message_begin(&mut self, identifier: &TMessageIdentifier) -> crate::Result<()>85     fn write_message_begin(&mut self, identifier: &TMessageIdentifier) -> crate::Result<()> {
86         match identifier.message_type {
87             // FIXME: is there a better way to override identifier here?
88             TMessageType::Call | TMessageType::OneWay => {
89                 let identifier = TMessageIdentifier {
90                     name: format!("{}:{}", self.service_name, identifier.name),
91                     ..*identifier
92                 };
93                 self.inner.write_message_begin(&identifier)
94             }
95             _ => self.inner.write_message_begin(identifier),
96         }
97     }
98 
write_message_end(&mut self) -> crate::Result<()>99     fn write_message_end(&mut self) -> crate::Result<()> {
100         self.inner.write_message_end()
101     }
102 
write_struct_begin(&mut self, identifier: &TStructIdentifier) -> crate::Result<()>103     fn write_struct_begin(&mut self, identifier: &TStructIdentifier) -> crate::Result<()> {
104         self.inner.write_struct_begin(identifier)
105     }
106 
write_struct_end(&mut self) -> crate::Result<()>107     fn write_struct_end(&mut self) -> crate::Result<()> {
108         self.inner.write_struct_end()
109     }
110 
write_field_begin(&mut self, identifier: &TFieldIdentifier) -> crate::Result<()>111     fn write_field_begin(&mut self, identifier: &TFieldIdentifier) -> crate::Result<()> {
112         self.inner.write_field_begin(identifier)
113     }
114 
write_field_end(&mut self) -> crate::Result<()>115     fn write_field_end(&mut self) -> crate::Result<()> {
116         self.inner.write_field_end()
117     }
118 
write_field_stop(&mut self) -> crate::Result<()>119     fn write_field_stop(&mut self) -> crate::Result<()> {
120         self.inner.write_field_stop()
121     }
122 
write_bytes(&mut self, b: &[u8]) -> crate::Result<()>123     fn write_bytes(&mut self, b: &[u8]) -> crate::Result<()> {
124         self.inner.write_bytes(b)
125     }
126 
write_bool(&mut self, b: bool) -> crate::Result<()>127     fn write_bool(&mut self, b: bool) -> crate::Result<()> {
128         self.inner.write_bool(b)
129     }
130 
write_i8(&mut self, i: i8) -> crate::Result<()>131     fn write_i8(&mut self, i: i8) -> crate::Result<()> {
132         self.inner.write_i8(i)
133     }
134 
write_i16(&mut self, i: i16) -> crate::Result<()>135     fn write_i16(&mut self, i: i16) -> crate::Result<()> {
136         self.inner.write_i16(i)
137     }
138 
write_i32(&mut self, i: i32) -> crate::Result<()>139     fn write_i32(&mut self, i: i32) -> crate::Result<()> {
140         self.inner.write_i32(i)
141     }
142 
write_i64(&mut self, i: i64) -> crate::Result<()>143     fn write_i64(&mut self, i: i64) -> crate::Result<()> {
144         self.inner.write_i64(i)
145     }
146 
write_double(&mut self, d: f64) -> crate::Result<()>147     fn write_double(&mut self, d: f64) -> crate::Result<()> {
148         self.inner.write_double(d)
149     }
150 
write_string(&mut self, s: &str) -> crate::Result<()>151     fn write_string(&mut self, s: &str) -> crate::Result<()> {
152         self.inner.write_string(s)
153     }
154 
write_list_begin(&mut self, identifier: &TListIdentifier) -> crate::Result<()>155     fn write_list_begin(&mut self, identifier: &TListIdentifier) -> crate::Result<()> {
156         self.inner.write_list_begin(identifier)
157     }
158 
write_list_end(&mut self) -> crate::Result<()>159     fn write_list_end(&mut self) -> crate::Result<()> {
160         self.inner.write_list_end()
161     }
162 
write_set_begin(&mut self, identifier: &TSetIdentifier) -> crate::Result<()>163     fn write_set_begin(&mut self, identifier: &TSetIdentifier) -> crate::Result<()> {
164         self.inner.write_set_begin(identifier)
165     }
166 
write_set_end(&mut self) -> crate::Result<()>167     fn write_set_end(&mut self) -> crate::Result<()> {
168         self.inner.write_set_end()
169     }
170 
write_map_begin(&mut self, identifier: &TMapIdentifier) -> crate::Result<()>171     fn write_map_begin(&mut self, identifier: &TMapIdentifier) -> crate::Result<()> {
172         self.inner.write_map_begin(identifier)
173     }
174 
write_map_end(&mut self) -> crate::Result<()>175     fn write_map_end(&mut self) -> crate::Result<()> {
176         self.inner.write_map_end()
177     }
178 
flush(&mut self) -> crate::Result<()>179     fn flush(&mut self) -> crate::Result<()> {
180         self.inner.flush()
181     }
182 
183     // utility
184     //
185 
write_byte(&mut self, b: u8) -> crate::Result<()>186     fn write_byte(&mut self, b: u8) -> crate::Result<()> {
187         self.inner.write_byte(b)
188     }
189 }
190 
191 #[cfg(test)]
192 mod tests {
193 
194     use crate::protocol::{TBinaryOutputProtocol, TMessageIdentifier, TMessageType, TOutputProtocol};
195     use crate::transport::{TBufferChannel, TIoChannel, WriteHalf};
196 
197     use super::*;
198 
199     #[test]
must_write_message_begin_with_prefixed_service_name()200     fn must_write_message_begin_with_prefixed_service_name() {
201         let mut o_prot = test_objects();
202 
203         let ident = TMessageIdentifier::new("bar", TMessageType::Call, 2);
204         assert_success!(o_prot.write_message_begin(&ident));
205 
206         #[rustfmt::skip]
207         let expected: [u8; 19] = [
208             0x80,
209             0x01, /* protocol identifier */
210             0x00,
211             0x01, /* message type */
212             0x00,
213             0x00,
214             0x00,
215             0x07,
216             0x66,
217             0x6F,
218             0x6F, /* "foo" */
219             0x3A, /* ":" */
220             0x62,
221             0x61,
222             0x72, /* "bar" */
223             0x00,
224             0x00,
225             0x00,
226             0x02 /* sequence number */,
227         ];
228 
229         assert_eq!(o_prot.inner.transport.write_bytes(), expected);
230     }
231 
test_objects() -> TMultiplexedOutputProtocol<TBinaryOutputProtocol<WriteHalf<TBufferChannel>>>232     fn test_objects() -> TMultiplexedOutputProtocol<TBinaryOutputProtocol<WriteHalf<TBufferChannel>>>
233     {
234         let c = TBufferChannel::with_capacity(40, 40);
235         let (_, w_chan) = c.split().unwrap();
236         let prot = TBinaryOutputProtocol::new(w_chan, true);
237         TMultiplexedOutputProtocol::new("foo", prot)
238     }
239 }
240