1 // Copyright (c) 2015 Y. T. Chung <zonyitoo@gmail.com> 2 // Licensed under the Apache License, Version 2.0 3 // <LICENSE-APACHE or 4 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT 5 // license <LICENSE-MIT or http://opensource.org/licenses/MIT>, 6 // at your option. All files in the project carrying such 7 // notice may not be copied, modified, or distributed except 8 // according to those terms. 9 10 //! This module is for serializing binary packet 11 //! 12 //! The protocol specification is defined in 13 //! [BinaryProtocolRevamped](https://code.google.com/p/memcached/wiki/BinaryProtocolRevamped) 14 //! 15 // General format of a packet: 16 // 17 // Byte/ 0 | 1 | 2 | 3 | 18 // / | | | | 19 // |0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7| 20 // +---------------+---------------+---------------+---------------+ 21 // 0/ HEADER / 22 // / / 23 // / / 24 // / / 25 // +---------------+---------------+---------------+---------------+ 26 // 24/ COMMAND-SPECIFIC EXTRAS (as needed) / 27 // +/ (note length in the extras length header field) / 28 // +---------------+---------------+---------------+---------------+ 29 // m/ Key (as needed) / 30 // +/ (note length in key length header field) / 31 // +---------------+---------------+---------------+---------------+ 32 // n/ Value (as needed) / 33 // +/ (note length is total body length header field, minus / 34 // +/ sum of the extras and key length body fields) / 35 // +---------------+---------------+---------------+---------------+ 36 // Total 24 bytes 37 38 #![allow(dead_code)] 39 #![allow(clippy::too_many_arguments)] 40 41 use std::io::{self, Read, Write}; 42 43 use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; 44 45 #[rustfmt::skip] 46 mod consts { 47 pub const MAGIC_REQUEST: u8 = 0x80; 48 pub const MAGIC_RESPONSE: u8 = 0x81; 49 50 pub const STATUS_NO_ERROR: u16 = 0x0000; 51 pub const STATUS_KEY_NOT_FOUND: u16 = 0x0001; 52 pub const STATUS_KEY_EXISTS: u16 = 0x0002; 53 pub const STATUS_VALUE_TOO_LARGE: u16 = 0x0003; 54 pub const STATUS_INVALID_ARGUMENTS: u16 = 0x0004; 55 pub const STATUS_ITEM_NOT_STORED: u16 = 0x0005; 56 pub const STATUS_INCR_OR_DECR_ON_NON_NUMERIC_VALUE: u16 = 0x0006; 57 pub const STATUS_VBUCKET_BELONGS_TO_OTHER_SERVER: u16 = 0x0007; 58 pub const STATUS_AUTHENTICATION_ERROR: u16 = 0x0008; 59 pub const STATUS_AUTHENTICATION_CONTINUE: u16 = 0x0009; 60 pub const STATUS_UNKNOWN_COMMAND: u16 = 0x0081; 61 pub const STATUS_OUT_OF_MEMORY: u16 = 0x0082; 62 pub const STATUS_NOT_SUPPORTED: u16 = 0x0083; 63 pub const STATUS_INTERNAL_ERROR: u16 = 0x0084; 64 pub const STATUS_BUSY: u16 = 0x0085; 65 pub const STATUS_TEMPORARY_FAILURE: u16 = 0x0086; 66 pub const STATUS_AUTHENTICATION_REQUIRED: u16 = 0x0020; 67 pub const STATUS_AUTHENTICATION_FURTHER_STEP_REQUIRED: u16 = 0x0021; 68 69 pub const OPCODE_GET: u8 = 0x00; 70 pub const OPCODE_SET: u8 = 0x01; 71 pub const OPCODE_ADD: u8 = 0x02; 72 pub const OPCODE_REPLACE: u8 = 0x03; 73 pub const OPCODE_DEL: u8 = 0x04; 74 pub const OPCODE_INCR: u8 = 0x05; 75 pub const OPCODE_DECR: u8 = 0x06; 76 pub const OPCODE_QUIT: u8 = 0x07; 77 pub const OPCODE_FLUSH: u8 = 0x08; 78 pub const OPCODE_GETQ: u8 = 0x09; 79 pub const OPCODE_NOP: u8 = 0x0A; 80 pub const OPCODE_VERSION: u8 = 0x0B; 81 pub const OPCODE_GETK: u8 = 0x0C; 82 pub const OPCODE_GETKQ: u8 = 0x0D; 83 pub const OPCODE_APPEND: u8 = 0x0E; 84 pub const OPCODE_PREPEND: u8 = 0x0F; 85 pub const OPCODE_STAT: u8 = 0x10; 86 pub const OPCODE_SETQ: u8 = 0x11; 87 pub const OPCODE_ADDQ: u8 = 0x12; 88 pub const OPCODE_REPLACEQ: u8 = 0x13; 89 pub const OPCODE_DELQ: u8 = 0x14; 90 pub const OPCODE_INCRQ: u8 = 0x15; 91 pub const OPCODE_DECRQ: u8 = 0x16; 92 pub const OPCODE_QUITQ: u8 = 0x17; 93 pub const OPCODE_FLUSHQ: u8 = 0x18; 94 pub const OPCODE_APPENDQ: u8 = 0x19; 95 pub const OPCODE_PREPENDQ: u8 = 0x1A; 96 pub const OPCODE_VERBOSITY: u8 = 0x1B; 97 pub const OPCODE_TOUCH: u8 = 0x1C; 98 pub const OPCODE_GAT: u8 = 0x1D; 99 pub const OPCODE_GATQ: u8 = 0x1E; 100 pub const OPCODE_SASL_LIST_MECHS: u8 = 0x20; 101 pub const OPCODE_SASL_AUTH: u8 = 0x21; 102 pub const OPCODE_SASL_STEP: u8 = 0x22; 103 pub const OPCODE_RGET: u8 = 0x30; 104 pub const OPCODE_RSET: u8 = 0x31; 105 pub const OPCODE_RSETQ: u8 = 0x32; 106 pub const OPCODE_RAPPEND: u8 = 0x33; 107 pub const OPCODE_RAPPENDQ: u8 = 0x34; 108 pub const OPCODE_RPREPEND: u8 = 0x35; 109 pub const OPCODE_RPREPENDQ: u8 = 0x36; 110 pub const OPCODE_RDEL: u8 = 0x37; 111 pub const OPCODE_RDELQ: u8 = 0x38; 112 pub const OPCODE_RINCR: u8 = 0x39; 113 pub const OPCODE_RINCRQ: u8 = 0x3A; 114 pub const OPCODE_RDECR: u8 = 0x3B; 115 pub const OPCODE_RDECRQ: u8 = 0x3C; 116 pub const OPCODE_SET_VBUCKET: u8 = 0x3D; 117 pub const OPCODE_GET_VBUCKET: u8 = 0x3E; 118 pub const OPCODE_DEL_VBUCKET: u8 = 0x3F; 119 pub const OPCODE_TAP_CONNECT: u8 = 0x40; 120 pub const OPCODE_TAP_MUTATION: u8 = 0x41; 121 pub const OPCODE_TAP_DEL: u8 = 0x42; 122 pub const OPCODE_TAP_FLUSH: u8 = 0x43; 123 pub const OPCODE_TAP_OPAQUE: u8 = 0x44; 124 pub const OPCODE_TAP_VBUCKET_SET: u8 = 0x45; 125 pub const OPCODE_TAP_CHECKPOINT_START: u8 = 0x46; 126 pub const OPCODE_TAP_CHECKPOINT_END: u8 = 0x47; 127 128 pub const DATA_TYPE_RAW_BYTES: u8 = 0x00; 129 } 130 131 /// Memcached response status 132 #[derive(Copy, Clone, Debug, Eq, PartialEq)] 133 #[repr(u16)] 134 #[rustfmt::skip] 135 pub enum Status { 136 NoError = consts::STATUS_NO_ERROR, 137 KeyNotFound = consts::STATUS_KEY_NOT_FOUND, 138 KeyExists = consts::STATUS_KEY_EXISTS, 139 ValueTooLarge = consts::STATUS_VALUE_TOO_LARGE, 140 InvalidArguments = consts::STATUS_INVALID_ARGUMENTS, 141 ItemNotStored = consts::STATUS_ITEM_NOT_STORED, 142 IncrDecrOnNonNumericValue = consts::STATUS_INCR_OR_DECR_ON_NON_NUMERIC_VALUE, 143 VBucketBelongsToOtherServer = consts::STATUS_VBUCKET_BELONGS_TO_OTHER_SERVER, 144 AuthenticationError = consts::STATUS_AUTHENTICATION_ERROR, 145 AuthenticationContinue = consts::STATUS_AUTHENTICATION_CONTINUE, 146 UnknownCommand = consts::STATUS_UNKNOWN_COMMAND, 147 OutOfMemory = consts::STATUS_OUT_OF_MEMORY, 148 NotSupported = consts::STATUS_NOT_SUPPORTED, 149 InternalError = consts::STATUS_INTERNAL_ERROR, 150 Busy = consts::STATUS_BUSY, 151 TemporaryFailure = consts::STATUS_TEMPORARY_FAILURE, 152 AuthenticationRequired = consts::STATUS_AUTHENTICATION_REQUIRED, 153 AuthenticationFurtherStepRequired = consts::STATUS_AUTHENTICATION_FURTHER_STEP_REQUIRED, 154 } 155 156 impl Status { 157 /// Get the binary code of the status 158 #[inline] to_u16(self) -> u16159 pub fn to_u16(self) -> u16 { 160 self as u16 161 } 162 163 /// Generate a Status from binary code 164 #[inline] 165 #[rustfmt::skip] from_u16(code: u16) -> Option<Status>166 pub fn from_u16(code: u16) -> Option<Status> { 167 match code { 168 consts::STATUS_NO_ERROR => Some(Status::NoError), 169 consts::STATUS_KEY_NOT_FOUND => Some(Status::KeyNotFound), 170 consts::STATUS_KEY_EXISTS => Some(Status::KeyExists), 171 consts::STATUS_VALUE_TOO_LARGE => Some(Status::ValueTooLarge), 172 consts::STATUS_INVALID_ARGUMENTS => Some(Status::InvalidArguments), 173 consts::STATUS_ITEM_NOT_STORED => Some(Status::ItemNotStored), 174 consts::STATUS_INCR_OR_DECR_ON_NON_NUMERIC_VALUE => Some(Status::IncrDecrOnNonNumericValue), 175 consts::STATUS_VBUCKET_BELONGS_TO_OTHER_SERVER => Some(Status::VBucketBelongsToOtherServer), 176 consts::STATUS_AUTHENTICATION_ERROR => Some(Status::AuthenticationError), 177 consts::STATUS_AUTHENTICATION_CONTINUE => Some(Status::AuthenticationContinue), 178 consts::STATUS_UNKNOWN_COMMAND => Some(Status::UnknownCommand), 179 consts::STATUS_OUT_OF_MEMORY => Some(Status::OutOfMemory), 180 consts::STATUS_NOT_SUPPORTED => Some(Status::NotSupported), 181 consts::STATUS_INTERNAL_ERROR => Some(Status::InternalError), 182 consts::STATUS_BUSY => Some(Status::Busy), 183 consts::STATUS_TEMPORARY_FAILURE => Some(Status::TemporaryFailure), 184 consts::STATUS_AUTHENTICATION_REQUIRED => Some(Status::AuthenticationRequired), 185 consts::STATUS_AUTHENTICATION_FURTHER_STEP_REQUIRED => Some(Status::AuthenticationFurtherStepRequired), 186 _ => None, 187 } 188 } 189 190 /// Get a short description 191 #[inline] 192 #[rustfmt::skip] desc(self) -> &'static str193 pub fn desc(self) -> &'static str { 194 match self { 195 Status::NoError => "no error", 196 Status::KeyNotFound => "key not found", 197 Status::KeyExists => "key exists", 198 Status::ValueTooLarge => "value too large", 199 Status::InvalidArguments => "invalid argument", 200 Status::ItemNotStored => "item not stored", 201 Status::IncrDecrOnNonNumericValue => "incr or decr on non-numeric value", 202 Status::VBucketBelongsToOtherServer => "vbucket belongs to other server", 203 Status::AuthenticationError => "authentication error", 204 Status::AuthenticationContinue => "authentication continue", 205 Status::UnknownCommand => "unknown command", 206 Status::OutOfMemory => "out of memory", 207 Status::NotSupported => "not supported", 208 Status::InternalError => "internal error", 209 Status::Busy => "busy", 210 Status::TemporaryFailure => "temporary failure", 211 Status::AuthenticationRequired => "authentication required/not successful", 212 Status::AuthenticationFurtherStepRequired => "further authentication steps required", 213 } 214 } 215 } 216 217 #[derive(Clone, Copy, Debug, Eq, PartialEq)] 218 #[repr(u8)] 219 #[rustfmt::skip] 220 pub enum Command { 221 Get = consts::OPCODE_GET, 222 Set = consts::OPCODE_SET, 223 Add = consts::OPCODE_ADD, 224 Replace = consts::OPCODE_REPLACE, 225 Delete = consts::OPCODE_DEL, 226 Increment = consts::OPCODE_INCR, 227 Decrement = consts::OPCODE_DECR, 228 Quit = consts::OPCODE_QUIT, 229 Flush = consts::OPCODE_FLUSH, 230 GetQuietly = consts::OPCODE_GETQ, 231 Noop = consts::OPCODE_NOP, 232 Version = consts::OPCODE_VERSION, 233 GetKey = consts::OPCODE_GETK, 234 GetKeyQuietly = consts::OPCODE_GETKQ, 235 Append = consts::OPCODE_APPEND, 236 Prepend = consts::OPCODE_PREPEND, 237 Stat = consts::OPCODE_STAT, 238 SetQuietly = consts::OPCODE_SETQ, 239 AddQuietly = consts::OPCODE_ADDQ, 240 ReplaceQuietly = consts::OPCODE_REPLACEQ, 241 DeleteQuietly = consts::OPCODE_DELQ, 242 IncrementQuietly = consts::OPCODE_INCRQ, 243 DecrementQuietly = consts::OPCODE_DECRQ, 244 QuitQuietly = consts::OPCODE_QUITQ, 245 FlushQuietly = consts::OPCODE_FLUSHQ, 246 AppendQuietly = consts::OPCODE_APPENDQ, 247 PrependQuietly = consts::OPCODE_PREPENDQ, 248 Verbosity = consts::OPCODE_VERBOSITY, 249 Touch = consts::OPCODE_TOUCH, 250 GetAndTouch = consts::OPCODE_GAT, 251 GetAndTouchQuietly = consts::OPCODE_GATQ, 252 SaslListMechanisms = consts::OPCODE_SASL_LIST_MECHS, 253 SaslAuthenticate = consts::OPCODE_SASL_AUTH, 254 SaslStep = consts::OPCODE_SASL_STEP, 255 RGet = consts::OPCODE_RGET, 256 RSet = consts::OPCODE_RSET, 257 RSetQuietly = consts::OPCODE_RSETQ, 258 RAppend = consts::OPCODE_RAPPEND, 259 RAppendQuietly = consts::OPCODE_RAPPENDQ, 260 RPrepend = consts::OPCODE_RPREPEND, 261 RPrependQuietly = consts::OPCODE_RPREPENDQ, 262 RDelete = consts::OPCODE_RDEL, 263 RDeleteQuietly = consts::OPCODE_RDELQ, 264 RIncrement = consts::OPCODE_RINCR, 265 RIncrementQuietly = consts::OPCODE_RINCRQ, 266 RDecrement = consts::OPCODE_RDECR, 267 RDecrementQuietly = consts::OPCODE_RDECRQ, 268 SetVBucket = consts::OPCODE_SET_VBUCKET, 269 GetVBucket = consts::OPCODE_GET_VBUCKET, 270 DelVBucket = consts::OPCODE_DEL_VBUCKET, 271 TapConnect = consts::OPCODE_TAP_CONNECT, 272 TapMutation = consts::OPCODE_TAP_MUTATION, 273 TapDelete = consts::OPCODE_TAP_DEL, 274 TapFlush = consts::OPCODE_TAP_FLUSH, 275 TapOpaque = consts::OPCODE_TAP_OPAQUE, 276 TapVBucketSet = consts::OPCODE_TAP_VBUCKET_SET, 277 TapCheckpointStart = consts::OPCODE_TAP_CHECKPOINT_START, 278 TapCheckpointEnd = consts::OPCODE_TAP_CHECKPOINT_END, 279 } 280 281 impl Command { 282 #[inline] to_u8(self) -> u8283 fn to_u8(self) -> u8 { 284 self as u8 285 } 286 287 #[inline] 288 #[rustfmt::skip] from_u8(code: u8) -> Option<Command>289 fn from_u8(code: u8) -> Option<Command> { 290 match code { 291 consts::OPCODE_GET => Some(Command::Get), 292 consts::OPCODE_SET => Some(Command::Set), 293 consts::OPCODE_ADD => Some(Command::Add), 294 consts::OPCODE_REPLACE => Some(Command::Replace), 295 consts::OPCODE_DEL => Some(Command::Delete), 296 consts::OPCODE_INCR => Some(Command::Increment), 297 consts::OPCODE_DECR => Some(Command::Decrement), 298 consts::OPCODE_QUIT => Some(Command::Quit), 299 consts::OPCODE_FLUSH => Some(Command::Flush), 300 consts::OPCODE_GETQ => Some(Command::GetQuietly), 301 consts::OPCODE_NOP => Some(Command::Noop), 302 consts::OPCODE_VERSION => Some(Command::Version), 303 consts::OPCODE_GETK => Some(Command::GetKey), 304 consts::OPCODE_GETKQ => Some(Command::GetKeyQuietly), 305 consts::OPCODE_APPEND => Some(Command::Append), 306 consts::OPCODE_PREPEND => Some(Command::Prepend), 307 consts::OPCODE_STAT => Some(Command::Stat), 308 consts::OPCODE_SETQ => Some(Command::SetQuietly), 309 consts::OPCODE_ADDQ => Some(Command::AddQuietly), 310 consts::OPCODE_REPLACEQ => Some(Command::ReplaceQuietly), 311 consts::OPCODE_DELQ => Some(Command::DeleteQuietly), 312 consts::OPCODE_INCRQ => Some(Command::IncrementQuietly), 313 consts::OPCODE_DECRQ => Some(Command::DecrementQuietly), 314 consts::OPCODE_QUITQ => Some(Command::QuitQuietly), 315 consts::OPCODE_FLUSHQ => Some(Command::FlushQuietly), 316 consts::OPCODE_APPENDQ => Some(Command::AppendQuietly), 317 consts::OPCODE_PREPENDQ => Some(Command::PrependQuietly), 318 consts::OPCODE_VERBOSITY => Some(Command::Verbosity), 319 consts::OPCODE_TOUCH => Some(Command::Touch), 320 consts::OPCODE_GAT => Some(Command::GetAndTouch), 321 consts::OPCODE_GATQ => Some(Command::GetAndTouchQuietly), 322 consts::OPCODE_SASL_LIST_MECHS => Some(Command::SaslListMechanisms), 323 consts::OPCODE_SASL_AUTH => Some(Command::SaslAuthenticate), 324 consts::OPCODE_SASL_STEP => Some(Command::SaslStep), 325 consts::OPCODE_RGET => Some(Command::RGet), 326 consts::OPCODE_RSET => Some(Command::RSet), 327 consts::OPCODE_RSETQ => Some(Command::RSetQuietly), 328 consts::OPCODE_RAPPEND => Some(Command::RAppend), 329 consts::OPCODE_RAPPENDQ => Some(Command::RAppendQuietly), 330 consts::OPCODE_RPREPEND => Some(Command::RPrepend), 331 consts::OPCODE_RPREPENDQ => Some(Command::RPrependQuietly), 332 consts::OPCODE_RDEL => Some(Command::RDelete), 333 consts::OPCODE_RDELQ => Some(Command::RDeleteQuietly), 334 consts::OPCODE_RINCR => Some(Command::RIncrement), 335 consts::OPCODE_RINCRQ => Some(Command::RIncrementQuietly), 336 consts::OPCODE_RDECR => Some(Command::RDecrement), 337 consts::OPCODE_RDECRQ => Some(Command::RDecrementQuietly), 338 consts::OPCODE_SET_VBUCKET => Some(Command::SetVBucket), 339 consts::OPCODE_GET_VBUCKET => Some(Command::GetVBucket), 340 consts::OPCODE_DEL_VBUCKET => Some(Command::DelVBucket), 341 consts::OPCODE_TAP_CONNECT => Some(Command::TapConnect), 342 consts::OPCODE_TAP_MUTATION => Some(Command::TapMutation), 343 consts::OPCODE_TAP_DEL => Some(Command::TapDelete), 344 consts::OPCODE_TAP_FLUSH => Some(Command::TapFlush), 345 consts::OPCODE_TAP_OPAQUE => Some(Command::TapOpaque), 346 consts::OPCODE_TAP_VBUCKET_SET => Some(Command::TapVBucketSet), 347 consts::OPCODE_TAP_CHECKPOINT_START => Some(Command::TapCheckpointStart), 348 consts::OPCODE_TAP_CHECKPOINT_END => Some(Command::TapCheckpointEnd), 349 _ => None, 350 } 351 } 352 } 353 354 #[derive(Copy, Clone, Debug, Eq, PartialEq)] 355 pub enum DataType { 356 RawBytes, 357 } 358 359 impl DataType { 360 #[inline] to_u8(self) -> u8361 fn to_u8(self) -> u8 { 362 match self { 363 DataType::RawBytes => consts::DATA_TYPE_RAW_BYTES, 364 } 365 } 366 367 #[inline] from_u8(code: u8) -> Option<DataType>368 fn from_u8(code: u8) -> Option<DataType> { 369 match code { 370 consts::DATA_TYPE_RAW_BYTES => Some(DataType::RawBytes), 371 _ => None, 372 } 373 } 374 } 375 376 // Byte/ 0 | 1 | 2 | 3 | 377 // / | | | | 378 // |0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7| 379 // +---------------+---------------+---------------+---------------+ 380 // 0| Magic | Opcode | Key length | 381 // +---------------+---------------+---------------+---------------+ 382 // 4| Extras length | Data type | vbucket id | 383 // +---------------+---------------+---------------+---------------+ 384 // 8| Total body length | 385 // +---------------+---------------+---------------+---------------+ 386 // 12| Opaque | 387 // +---------------+---------------+---------------+---------------+ 388 // 16| CAS | 389 // | | 390 // +---------------+---------------+---------------+---------------+ 391 // Total 24 bytes 392 #[derive(Clone, Debug)] 393 pub struct RequestHeader { 394 pub command: Command, 395 key_len: u16, 396 extra_len: u8, 397 pub data_type: DataType, 398 pub vbucket_id: u16, 399 body_len: u32, 400 pub opaque: u32, 401 pub cas: u64, 402 } 403 404 impl RequestHeader { new( cmd: Command, dtype: DataType, vbid: u16, opaque: u32, cas: u64, key_len: u16, extra_len: u8, body_len: u32, ) -> RequestHeader405 pub fn new( 406 cmd: Command, 407 dtype: DataType, 408 vbid: u16, 409 opaque: u32, 410 cas: u64, 411 key_len: u16, 412 extra_len: u8, 413 body_len: u32, 414 ) -> RequestHeader { 415 RequestHeader { 416 command: cmd, 417 key_len, 418 extra_len, 419 data_type: dtype, 420 vbucket_id: vbid, 421 body_len, 422 opaque, 423 cas, 424 } 425 } 426 from_payload( cmd: Command, dtype: DataType, vbid: u16, opaque: u32, cas: u64, key: &[u8], extra: &[u8], value: &[u8], ) -> RequestHeader427 pub fn from_payload( 428 cmd: Command, 429 dtype: DataType, 430 vbid: u16, 431 opaque: u32, 432 cas: u64, 433 key: &[u8], 434 extra: &[u8], 435 value: &[u8], 436 ) -> RequestHeader { 437 let key_len = key.len() as u16; 438 let extra_len = extra.len() as u8; 439 let body_len = (key.len() + extra.len() + value.len()) as u32; 440 441 RequestHeader::new(cmd, dtype, vbid, opaque, cas, key_len, extra_len, body_len) 442 } 443 444 #[inline] write_to<W: Write>(&self, writer: &mut W) -> io::Result<()>445 pub fn write_to<W: Write>(&self, writer: &mut W) -> io::Result<()> { 446 writer.write_u8(consts::MAGIC_REQUEST)?; 447 writer.write_u8(self.command.to_u8())?; 448 writer.write_u16::<BigEndian>(self.key_len)?; 449 writer.write_u8(self.extra_len)?; 450 writer.write_u8(self.data_type.to_u8())?; 451 writer.write_u16::<BigEndian>(self.vbucket_id)?; 452 writer.write_u32::<BigEndian>(self.body_len)?; 453 writer.write_u32::<BigEndian>(self.opaque)?; 454 writer.write_u64::<BigEndian>(self.cas)?; 455 456 Ok(()) 457 } 458 459 #[inline] read_from<R: Read>(reader: &mut R) -> io::Result<RequestHeader>460 pub fn read_from<R: Read>(reader: &mut R) -> io::Result<RequestHeader> { 461 let magic = reader.read_u8()?; 462 463 if magic != consts::MAGIC_REQUEST { 464 return Err(io::Error::new(io::ErrorKind::Other, "Invalid magic")); 465 } 466 467 Ok(RequestHeader { 468 command: match Command::from_u8(reader.read_u8()?) { 469 Some(c) => c, 470 None => return Err(io::Error::new(io::ErrorKind::Other, "Invalid command")), 471 }, 472 key_len: reader.read_u16::<BigEndian>()?, 473 extra_len: reader.read_u8()?, 474 data_type: match DataType::from_u8(reader.read_u8()?) { 475 Some(d) => d, 476 None => return Err(io::Error::new(io::ErrorKind::Other, "Invalid data type")), 477 }, 478 vbucket_id: reader.read_u16::<BigEndian>()?, 479 body_len: reader.read_u32::<BigEndian>()?, 480 opaque: reader.read_u32::<BigEndian>()?, 481 cas: reader.read_u64::<BigEndian>()?, 482 }) 483 } 484 } 485 486 // Byte/ 0 | 1 | 2 | 3 | 487 // / | | | | 488 // |0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7| 489 // +---------------+---------------+---------------+---------------+ 490 // 0| Magic | Opcode | Key Length | 491 // +---------------+---------------+---------------+---------------+ 492 // 4| Extras length | Data type | Status | 493 // +---------------+---------------+---------------+---------------+ 494 // 8| Total body length | 495 // +---------------+---------------+---------------+---------------+ 496 // 12| Opaque | 497 // +---------------+---------------+---------------+---------------+ 498 // 16| CAS | 499 // | | 500 // +---------------+---------------+---------------+---------------+ 501 // Total 24 bytes 502 #[derive(Clone, Debug)] 503 pub struct ResponseHeader { 504 pub command: Command, 505 key_len: u16, 506 extra_len: u8, 507 pub data_type: DataType, 508 pub status: Status, 509 body_len: u32, 510 pub opaque: u32, 511 pub cas: u64, 512 } 513 514 impl ResponseHeader { new( command: Command, data_type: DataType, status: Status, opaque: u32, cas: u64, key_len: u16, extra_len: u8, body_len: u32, ) -> ResponseHeader515 pub fn new( 516 command: Command, 517 data_type: DataType, 518 status: Status, 519 opaque: u32, 520 cas: u64, 521 key_len: u16, 522 extra_len: u8, 523 body_len: u32, 524 ) -> ResponseHeader { 525 ResponseHeader { 526 command, 527 key_len, 528 extra_len, 529 data_type, 530 status, 531 body_len, 532 opaque, 533 cas, 534 } 535 } 536 from_payload( cmd: Command, dtype: DataType, status: Status, opaque: u32, cas: u64, key: &[u8], extra: &[u8], value: &[u8], ) -> ResponseHeader537 pub fn from_payload( 538 cmd: Command, 539 dtype: DataType, 540 status: Status, 541 opaque: u32, 542 cas: u64, 543 key: &[u8], 544 extra: &[u8], 545 value: &[u8], 546 ) -> ResponseHeader { 547 let key_len = key.len() as u16; 548 let extra_len = extra.len() as u8; 549 let body_len = (key.len() + extra.len() + value.len()) as u32; 550 551 ResponseHeader::new( 552 cmd, dtype, status, opaque, cas, key_len, extra_len, body_len, 553 ) 554 } 555 556 #[inline] write_to<W: Write>(&self, writer: &mut W) -> io::Result<()>557 pub fn write_to<W: Write>(&self, writer: &mut W) -> io::Result<()> { 558 writer.write_u8(consts::MAGIC_RESPONSE)?; 559 writer.write_u8(self.command.to_u8())?; 560 writer.write_u16::<BigEndian>(self.key_len)?; 561 writer.write_u8(self.extra_len)?; 562 writer.write_u8(self.data_type.to_u8())?; 563 writer.write_u16::<BigEndian>(self.status.to_u16())?; 564 writer.write_u32::<BigEndian>(self.body_len)?; 565 writer.write_u32::<BigEndian>(self.opaque)?; 566 writer.write_u64::<BigEndian>(self.cas)?; 567 568 Ok(()) 569 } 570 571 #[inline] read_from<R: Read>(reader: &mut R) -> io::Result<ResponseHeader>572 pub fn read_from<R: Read>(reader: &mut R) -> io::Result<ResponseHeader> { 573 let magic = reader.read_u8()?; 574 575 if magic != consts::MAGIC_RESPONSE { 576 return Err(io::Error::new(io::ErrorKind::Other, "Invalid magic")); 577 } 578 579 Ok(ResponseHeader { 580 command: match Command::from_u8(reader.read_u8()?) { 581 Some(c) => c, 582 None => return Err(io::Error::new(io::ErrorKind::Other, "Invalid command")), 583 }, 584 key_len: reader.read_u16::<BigEndian>()?, 585 extra_len: reader.read_u8()?, 586 data_type: match DataType::from_u8(reader.read_u8()?) { 587 Some(d) => d, 588 None => return Err(io::Error::new(io::ErrorKind::Other, "Invalid data type")), 589 }, 590 status: match Status::from_u16(reader.read_u16::<BigEndian>()?) { 591 Some(s) => s, 592 None => return Err(io::Error::new(io::ErrorKind::Other, "Invalid status")), 593 }, 594 body_len: reader.read_u32::<BigEndian>()?, 595 opaque: reader.read_u32::<BigEndian>()?, 596 cas: reader.read_u64::<BigEndian>()?, 597 }) 598 } 599 } 600 601 #[derive(Clone, Debug)] 602 pub struct RequestPacket { 603 pub header: RequestHeader, 604 pub extra: Vec<u8>, 605 pub key: Vec<u8>, 606 pub value: Vec<u8>, 607 } 608 609 impl RequestPacket { new( cmd: Command, dtype: DataType, vbid: u16, opaque: u32, cas: u64, extra: Vec<u8>, key: Vec<u8>, value: Vec<u8>, ) -> RequestPacket610 pub fn new( 611 cmd: Command, 612 dtype: DataType, 613 vbid: u16, 614 opaque: u32, 615 cas: u64, 616 extra: Vec<u8>, 617 key: Vec<u8>, 618 value: Vec<u8>, 619 ) -> RequestPacket { 620 RequestPacket { 621 header: RequestHeader::from_payload( 622 cmd, 623 dtype, 624 vbid, 625 opaque, 626 cas, 627 &key[..], 628 &extra[..], 629 &value[..], 630 ), 631 extra, 632 key, 633 value, 634 } 635 } 636 637 #[inline] write_to<W: Write>(&self, writer: &mut W) -> io::Result<()>638 pub fn write_to<W: Write>(&self, writer: &mut W) -> io::Result<()> { 639 self.header.write_to(writer)?; 640 writer.write_all(&self.extra[..])?; 641 writer.write_all(&self.key[..])?; 642 writer.write_all(&self.value[..])?; 643 644 Ok(()) 645 } 646 647 #[inline] read_from<R: Read>(reader: &mut R) -> io::Result<RequestPacket>648 pub fn read_from<R: Read>(reader: &mut R) -> io::Result<RequestPacket> { 649 let header = RequestHeader::read_from(reader)?; 650 651 let extra_len = header.extra_len as usize; 652 let key_len = header.key_len as usize; 653 let value_len = header.body_len as usize - extra_len - key_len; 654 655 let extra = { 656 let mut buf = Vec::with_capacity(extra_len as usize); 657 reader.take(extra_len as u64).read_to_end(&mut buf)?; 658 buf 659 }; 660 661 let key = { 662 let mut buf = Vec::with_capacity(key_len as usize); 663 reader.take(key_len as u64).read_to_end(&mut buf)?; 664 buf 665 }; 666 667 let value = { 668 let mut buf = Vec::with_capacity(value_len as usize); 669 reader.take(value_len as u64).read_to_end(&mut buf)?; 670 buf 671 }; 672 673 Ok(RequestPacket { 674 header, 675 extra, 676 key, 677 value, 678 }) 679 } 680 as_ref(&self) -> RequestPacketRef<'_>681 pub fn as_ref(&self) -> RequestPacketRef<'_> { 682 RequestPacketRef::new( 683 &self.header, 684 &self.extra[..], 685 &self.key[..], 686 &self.value[..], 687 ) 688 } 689 } 690 691 #[derive(Debug)] 692 pub struct RequestPacketRef<'a> { 693 pub header: &'a RequestHeader, 694 pub extra: &'a [u8], 695 pub key: &'a [u8], 696 pub value: &'a [u8], 697 } 698 699 impl<'a> RequestPacketRef<'a> { new( header: &'a RequestHeader, extra: &'a [u8], key: &'a [u8], value: &'a [u8], ) -> RequestPacketRef<'a>700 pub fn new( 701 header: &'a RequestHeader, 702 extra: &'a [u8], 703 key: &'a [u8], 704 value: &'a [u8], 705 ) -> RequestPacketRef<'a> { 706 RequestPacketRef { 707 header, 708 extra, 709 key, 710 value, 711 } 712 } 713 714 #[inline] write_to<W: Write>(&self, writer: &mut W) -> io::Result<()>715 pub fn write_to<W: Write>(&self, writer: &mut W) -> io::Result<()> { 716 self.header.write_to(writer)?; 717 writer.write_all(self.extra)?; 718 writer.write_all(self.key)?; 719 writer.write_all(self.value)?; 720 721 Ok(()) 722 } 723 } 724 725 #[derive(Clone, Debug)] 726 pub struct ResponsePacket { 727 pub header: ResponseHeader, 728 pub extra: Vec<u8>, 729 pub key: Vec<u8>, 730 pub value: Vec<u8>, 731 } 732 733 impl ResponsePacket { new( cmd: Command, dtype: DataType, status: Status, opaque: u32, cas: u64, extra: Vec<u8>, key: Vec<u8>, value: Vec<u8>, ) -> ResponsePacket734 pub fn new( 735 cmd: Command, 736 dtype: DataType, 737 status: Status, 738 opaque: u32, 739 cas: u64, 740 extra: Vec<u8>, 741 key: Vec<u8>, 742 value: Vec<u8>, 743 ) -> ResponsePacket { 744 ResponsePacket { 745 header: ResponseHeader::from_payload( 746 cmd, 747 dtype, 748 status, 749 opaque, 750 cas, 751 &key[..], 752 &extra[..], 753 &value[..], 754 ), 755 extra, 756 key, 757 value, 758 } 759 } 760 761 #[inline] write_to<W: Write>(&self, writer: &mut W) -> io::Result<()>762 pub fn write_to<W: Write>(&self, writer: &mut W) -> io::Result<()> { 763 self.header.write_to(writer)?; 764 writer.write_all(&self.extra[..])?; 765 writer.write_all(&self.key[..])?; 766 writer.write_all(&self.value[..])?; 767 768 Ok(()) 769 } 770 771 #[inline] read_from<R: Read>(reader: &mut R) -> io::Result<ResponsePacket>772 pub fn read_from<R: Read>(reader: &mut R) -> io::Result<ResponsePacket> { 773 let header = ResponseHeader::read_from(reader)?; 774 775 let extra_len = header.extra_len as usize; 776 let key_len = header.key_len as usize; 777 let value_len = header.body_len as usize - extra_len - key_len; 778 779 let extra = { 780 let mut buf = Vec::with_capacity(extra_len as usize); 781 reader.take(extra_len as u64).read_to_end(&mut buf)?; 782 buf 783 }; 784 785 let key = { 786 let mut buf = Vec::with_capacity(key_len as usize); 787 reader.take(key_len as u64).read_to_end(&mut buf)?; 788 buf 789 }; 790 791 let value = { 792 let mut buf = Vec::with_capacity(value_len as usize); 793 reader.take(value_len as u64).read_to_end(&mut buf)?; 794 buf 795 }; 796 797 Ok(ResponsePacket { 798 header, 799 extra, 800 key, 801 value, 802 }) 803 } 804 } 805 806 pub struct ResponsePacketRef<'a> { 807 pub header: &'a ResponseHeader, 808 pub extra: &'a [u8], 809 pub key: &'a [u8], 810 pub value: &'a [u8], 811 } 812 813 impl<'a> ResponsePacketRef<'a> { new( header: &'a ResponseHeader, extra: &'a [u8], key: &'a [u8], value: &'a [u8], ) -> ResponsePacketRef<'a>814 pub fn new( 815 header: &'a ResponseHeader, 816 extra: &'a [u8], 817 key: &'a [u8], 818 value: &'a [u8], 819 ) -> ResponsePacketRef<'a> { 820 ResponsePacketRef { 821 header, 822 extra, 823 key, 824 value, 825 } 826 } 827 828 #[inline] write_to<W: Write>(&self, writer: &mut W) -> io::Result<()>829 pub fn write_to<W: Write>(&self, writer: &mut W) -> io::Result<()> { 830 self.header.write_to(writer)?; 831 writer.write_all(self.extra)?; 832 writer.write_all(self.key)?; 833 writer.write_all(self.value)?; 834 835 Ok(()) 836 } 837 } 838 839 #[cfg(test)] 840 mod test { 841 use std::io::Write; 842 use std::net::TcpStream; 843 844 use crate::proto; 845 use crate::proto::binarydef::{Command, DataType, RequestPacket, ResponsePacket}; 846 847 use bufstream::BufStream; 848 test_stream() -> TcpStream849 fn test_stream() -> TcpStream { 850 TcpStream::connect("127.0.0.1:11211").unwrap() 851 } 852 853 #[test] test_binary_protocol()854 fn test_binary_protocol() { 855 let mut stream = BufStream::new(test_stream()); 856 857 { 858 let req_packet = RequestPacket::new( 859 Command::Set, 860 DataType::RawBytes, 861 0, 862 0, 863 0, 864 vec![0xde, 0xad, 0xbe, 0xef, 0x00, 0x00, 0x0e, 0x10], 865 b"test:binary_proto:hello".to_vec(), 866 b"world".to_vec(), 867 ); 868 869 req_packet.write_to(&mut stream).unwrap(); 870 stream.flush().unwrap(); 871 872 let resp_packet = ResponsePacket::read_from(&mut stream).unwrap(); 873 874 assert!(resp_packet.header.status == proto::binary::Status::NoError); 875 } 876 877 { 878 let req_packet = RequestPacket::new( 879 Command::Get, 880 DataType::RawBytes, 881 0, 882 0, 883 0, 884 vec![], 885 b"test:binary_proto:hello".to_vec(), 886 vec![], 887 ); 888 889 req_packet.write_to(&mut stream).unwrap(); 890 stream.flush().unwrap(); 891 892 let resp_packet = ResponsePacket::read_from(&mut stream).unwrap(); 893 894 assert!(resp_packet.header.status == proto::binary::Status::NoError); 895 assert_eq!(&resp_packet.value[..], b"world"); 896 } 897 898 { 899 let req_packet = RequestPacket::new( 900 Command::Delete, 901 DataType::RawBytes, 902 0, 903 0, 904 0, 905 vec![], 906 b"test:binary_proto:hello".to_vec(), 907 vec![], 908 ); 909 910 req_packet.write_to(&mut stream).unwrap(); 911 stream.flush().unwrap(); 912 913 let resp_packet = ResponsePacket::read_from(&mut stream).unwrap(); 914 915 assert!(resp_packet.header.status == proto::binary::Status::NoError); 916 } 917 } 918 } 919