1 // Licensed to the Apache Software Foundation (ASF) under one 2 // or more contributor license agreements. See the NOTICE file 3 // distributed with this work for additional information 4 // regarding copyright ownership. The ASF licenses this file 5 // to you under the Apache License, Version 2.0 (the 6 // "License"); you may not use this file except in compliance 7 // with the License. You may obtain a copy of the License at 8 // 9 // http://www.apache.org/licenses/LICENSE-2.0 10 // 11 // Unless required by applicable law or agreed to in writing, 12 // software distributed under the License is distributed on an 13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 14 // KIND, either express or implied. See the License for the 15 // specific language governing permissions and limitations 16 // under the License. 17 18 use super::{ 19 TFieldIdentifier, TListIdentifier, TMapIdentifier, TMessageIdentifier, TMessageType, 20 TOutputProtocol, TSetIdentifier, TStructIdentifier, 21 }; 22 23 /// `TOutputProtocol` that prefixes the service name to all outgoing Thrift 24 /// messages. 25 /// 26 /// A `TMultiplexedOutputProtocol` should be used when multiple Thrift services 27 /// send messages over a single I/O channel. By prefixing service identifiers 28 /// to outgoing messages receivers are able to demux them and route them to the 29 /// appropriate service processor. Rust receivers must use a `TMultiplexedProcessor` 30 /// to process incoming messages, while other languages must use their 31 /// corresponding multiplexed processor implementations. 32 /// 33 /// For example, given a service `TestService` and a service call `test_call`, 34 /// this implementation would identify messages as originating from 35 /// `TestService:test_call`. 36 /// 37 /// # Examples 38 /// 39 /// Create and use a `TMultiplexedOutputProtocol`. 40 /// 41 /// ```no_run 42 /// use thrift::protocol::{TMessageIdentifier, TMessageType, TOutputProtocol}; 43 /// use thrift::protocol::{TBinaryOutputProtocol, TMultiplexedOutputProtocol}; 44 /// use thrift::transport::TTcpChannel; 45 /// 46 /// let mut channel = TTcpChannel::new(); 47 /// channel.open("localhost:9090").unwrap(); 48 /// 49 /// let protocol = TBinaryOutputProtocol::new(channel, true); 50 /// let mut protocol = TMultiplexedOutputProtocol::new("service_name", protocol); 51 /// 52 /// let ident = TMessageIdentifier::new("svc_call", TMessageType::Call, 1); 53 /// protocol.write_message_begin(&ident).unwrap(); 54 /// ``` 55 #[derive(Debug)] 56 pub struct TMultiplexedOutputProtocol<P> 57 where 58 P: TOutputProtocol, 59 { 60 service_name: String, 61 inner: P, 62 } 63 64 impl<P> TMultiplexedOutputProtocol<P> 65 where 66 P: TOutputProtocol, 67 { 68 /// Create a `TMultiplexedOutputProtocol` that identifies outgoing messages 69 /// as originating from a service named `service_name` and sends them over 70 /// the `wrapped` `TOutputProtocol`. Outgoing messages are encoded and sent 71 /// by `wrapped`, not by this instance. new(service_name: &str, wrapped: P) -> TMultiplexedOutputProtocol<P>72 pub fn new(service_name: &str, wrapped: P) -> TMultiplexedOutputProtocol<P> { 73 TMultiplexedOutputProtocol { 74 service_name: service_name.to_owned(), 75 inner: wrapped, 76 } 77 } 78 } 79 80 // FIXME: avoid passthrough methods 81 impl<P> TOutputProtocol for TMultiplexedOutputProtocol<P> 82 where 83 P: TOutputProtocol, 84 { write_message_begin(&mut self, identifier: &TMessageIdentifier) -> crate::Result<()>85 fn write_message_begin(&mut self, identifier: &TMessageIdentifier) -> crate::Result<()> { 86 match identifier.message_type { 87 // FIXME: is there a better way to override identifier here? 88 TMessageType::Call | TMessageType::OneWay => { 89 let identifier = TMessageIdentifier { 90 name: format!("{}:{}", self.service_name, identifier.name), 91 ..*identifier 92 }; 93 self.inner.write_message_begin(&identifier) 94 } 95 _ => self.inner.write_message_begin(identifier), 96 } 97 } 98 write_message_end(&mut self) -> crate::Result<()>99 fn write_message_end(&mut self) -> crate::Result<()> { 100 self.inner.write_message_end() 101 } 102 write_struct_begin(&mut self, identifier: &TStructIdentifier) -> crate::Result<()>103 fn write_struct_begin(&mut self, identifier: &TStructIdentifier) -> crate::Result<()> { 104 self.inner.write_struct_begin(identifier) 105 } 106 write_struct_end(&mut self) -> crate::Result<()>107 fn write_struct_end(&mut self) -> crate::Result<()> { 108 self.inner.write_struct_end() 109 } 110 write_field_begin(&mut self, identifier: &TFieldIdentifier) -> crate::Result<()>111 fn write_field_begin(&mut self, identifier: &TFieldIdentifier) -> crate::Result<()> { 112 self.inner.write_field_begin(identifier) 113 } 114 write_field_end(&mut self) -> crate::Result<()>115 fn write_field_end(&mut self) -> crate::Result<()> { 116 self.inner.write_field_end() 117 } 118 write_field_stop(&mut self) -> crate::Result<()>119 fn write_field_stop(&mut self) -> crate::Result<()> { 120 self.inner.write_field_stop() 121 } 122 write_bytes(&mut self, b: &[u8]) -> crate::Result<()>123 fn write_bytes(&mut self, b: &[u8]) -> crate::Result<()> { 124 self.inner.write_bytes(b) 125 } 126 write_bool(&mut self, b: bool) -> crate::Result<()>127 fn write_bool(&mut self, b: bool) -> crate::Result<()> { 128 self.inner.write_bool(b) 129 } 130 write_i8(&mut self, i: i8) -> crate::Result<()>131 fn write_i8(&mut self, i: i8) -> crate::Result<()> { 132 self.inner.write_i8(i) 133 } 134 write_i16(&mut self, i: i16) -> crate::Result<()>135 fn write_i16(&mut self, i: i16) -> crate::Result<()> { 136 self.inner.write_i16(i) 137 } 138 write_i32(&mut self, i: i32) -> crate::Result<()>139 fn write_i32(&mut self, i: i32) -> crate::Result<()> { 140 self.inner.write_i32(i) 141 } 142 write_i64(&mut self, i: i64) -> crate::Result<()>143 fn write_i64(&mut self, i: i64) -> crate::Result<()> { 144 self.inner.write_i64(i) 145 } 146 write_double(&mut self, d: f64) -> crate::Result<()>147 fn write_double(&mut self, d: f64) -> crate::Result<()> { 148 self.inner.write_double(d) 149 } 150 write_string(&mut self, s: &str) -> crate::Result<()>151 fn write_string(&mut self, s: &str) -> crate::Result<()> { 152 self.inner.write_string(s) 153 } 154 write_list_begin(&mut self, identifier: &TListIdentifier) -> crate::Result<()>155 fn write_list_begin(&mut self, identifier: &TListIdentifier) -> crate::Result<()> { 156 self.inner.write_list_begin(identifier) 157 } 158 write_list_end(&mut self) -> crate::Result<()>159 fn write_list_end(&mut self) -> crate::Result<()> { 160 self.inner.write_list_end() 161 } 162 write_set_begin(&mut self, identifier: &TSetIdentifier) -> crate::Result<()>163 fn write_set_begin(&mut self, identifier: &TSetIdentifier) -> crate::Result<()> { 164 self.inner.write_set_begin(identifier) 165 } 166 write_set_end(&mut self) -> crate::Result<()>167 fn write_set_end(&mut self) -> crate::Result<()> { 168 self.inner.write_set_end() 169 } 170 write_map_begin(&mut self, identifier: &TMapIdentifier) -> crate::Result<()>171 fn write_map_begin(&mut self, identifier: &TMapIdentifier) -> crate::Result<()> { 172 self.inner.write_map_begin(identifier) 173 } 174 write_map_end(&mut self) -> crate::Result<()>175 fn write_map_end(&mut self) -> crate::Result<()> { 176 self.inner.write_map_end() 177 } 178 flush(&mut self) -> crate::Result<()>179 fn flush(&mut self) -> crate::Result<()> { 180 self.inner.flush() 181 } 182 183 // utility 184 // 185 write_byte(&mut self, b: u8) -> crate::Result<()>186 fn write_byte(&mut self, b: u8) -> crate::Result<()> { 187 self.inner.write_byte(b) 188 } 189 } 190 191 #[cfg(test)] 192 mod tests { 193 194 use crate::protocol::{TBinaryOutputProtocol, TMessageIdentifier, TMessageType, TOutputProtocol}; 195 use crate::transport::{TBufferChannel, TIoChannel, WriteHalf}; 196 197 use super::*; 198 199 #[test] must_write_message_begin_with_prefixed_service_name()200 fn must_write_message_begin_with_prefixed_service_name() { 201 let mut o_prot = test_objects(); 202 203 let ident = TMessageIdentifier::new("bar", TMessageType::Call, 2); 204 assert_success!(o_prot.write_message_begin(&ident)); 205 206 #[rustfmt::skip] 207 let expected: [u8; 19] = [ 208 0x80, 209 0x01, /* protocol identifier */ 210 0x00, 211 0x01, /* message type */ 212 0x00, 213 0x00, 214 0x00, 215 0x07, 216 0x66, 217 0x6F, 218 0x6F, /* "foo" */ 219 0x3A, /* ":" */ 220 0x62, 221 0x61, 222 0x72, /* "bar" */ 223 0x00, 224 0x00, 225 0x00, 226 0x02 /* sequence number */, 227 ]; 228 229 assert_eq!(o_prot.inner.transport.write_bytes(), expected); 230 } 231 test_objects() -> TMultiplexedOutputProtocol<TBinaryOutputProtocol<WriteHalf<TBufferChannel>>>232 fn test_objects() -> TMultiplexedOutputProtocol<TBinaryOutputProtocol<WriteHalf<TBufferChannel>>> 233 { 234 let c = TBufferChannel::with_capacity(40, 40); 235 let (_, w_chan) = c.split().unwrap(); 236 let prot = TBinaryOutputProtocol::new(w_chan, true); 237 TMultiplexedOutputProtocol::new("foo", prot) 238 } 239 } 240