1// Copyright (c) 2012, Sean Treadway, SoundCloud Ltd. 2// Use of this source code is governed by a BSD-style 3// license that can be found in the LICENSE file. 4// Source code and contact info at http://github.com/streadway/amqp 5 6package amqp 7 8import ( 9 "fmt" 10 "io" 11 "time" 12) 13 14// Constants for standard AMQP 0-9-1 exchange types. 15const ( 16 ExchangeDirect = "direct" 17 ExchangeFanout = "fanout" 18 ExchangeTopic = "topic" 19 ExchangeHeaders = "headers" 20) 21 22var ( 23 // ErrClosed is returned when the channel or connection is not open 24 ErrClosed = &Error{Code: ChannelError, Reason: "channel/connection is not open"} 25 26 // ErrChannelMax is returned when Connection.Channel has been called enough 27 // times that all channel IDs have been exhausted in the client or the 28 // server. 29 ErrChannelMax = &Error{Code: ChannelError, Reason: "channel id space exhausted"} 30 31 // ErrSASL is returned from Dial when the authentication mechanism could not 32 // be negoated. 33 ErrSASL = &Error{Code: AccessRefused, Reason: "SASL could not negotiate a shared mechanism"} 34 35 // ErrCredentials is returned when the authenticated client is not authorized 36 // to any vhost. 37 ErrCredentials = &Error{Code: AccessRefused, Reason: "username or password not allowed"} 38 39 // ErrVhost is returned when the authenticated user is not permitted to 40 // access the requested Vhost. 41 ErrVhost = &Error{Code: AccessRefused, Reason: "no access to this vhost"} 42 43 // ErrSyntax is hard protocol error, indicating an unsupported protocol, 44 // implementation or encoding. 45 ErrSyntax = &Error{Code: SyntaxError, Reason: "invalid field or value inside of a frame"} 46 47 // ErrFrame is returned when the protocol frame cannot be read from the 48 // server, indicating an unsupported protocol or unsupported frame type. 49 ErrFrame = &Error{Code: FrameError, Reason: "frame could not be parsed"} 50 51 // ErrCommandInvalid is returned when the server sends an unexpected response 52 // to this requested message type. This indicates a bug in this client. 53 ErrCommandInvalid = &Error{Code: CommandInvalid, Reason: "unexpected command received"} 54 55 // ErrUnexpectedFrame is returned when something other than a method or 56 // heartbeat frame is delivered to the Connection, indicating a bug in the 57 // client. 58 ErrUnexpectedFrame = &Error{Code: UnexpectedFrame, Reason: "unexpected frame received"} 59 60 // ErrFieldType is returned when writing a message containing a Go type unsupported by AMQP. 61 ErrFieldType = &Error{Code: SyntaxError, Reason: "unsupported table field type"} 62) 63 64// Error captures the code and reason a channel or connection has been closed 65// by the server. 66type Error struct { 67 Code int // constant code from the specification 68 Reason string // description of the error 69 Server bool // true when initiated from the server, false when from this library 70 Recover bool // true when this error can be recovered by retrying later or with different parameters 71} 72 73func newError(code uint16, text string) *Error { 74 return &Error{ 75 Code: int(code), 76 Reason: text, 77 Recover: isSoftExceptionCode(int(code)), 78 Server: true, 79 } 80} 81 82func (e Error) Error() string { 83 return fmt.Sprintf("Exception (%d) Reason: %q", e.Code, e.Reason) 84} 85 86// Used by header frames to capture routing and header information 87type properties struct { 88 ContentType string // MIME content type 89 ContentEncoding string // MIME content encoding 90 Headers Table // Application or header exchange table 91 DeliveryMode uint8 // queue implementation use - Transient (1) or Persistent (2) 92 Priority uint8 // queue implementation use - 0 to 9 93 CorrelationId string // application use - correlation identifier 94 ReplyTo string // application use - address to to reply to (ex: RPC) 95 Expiration string // implementation use - message expiration spec 96 MessageId string // application use - message identifier 97 Timestamp time.Time // application use - message timestamp 98 Type string // application use - message type name 99 UserId string // application use - creating user id 100 AppId string // application use - creating application 101 reserved1 string // was cluster-id - process for buffer consumption 102} 103 104// DeliveryMode. Transient means higher throughput but messages will not be 105// restored on broker restart. The delivery mode of publishings is unrelated 106// to the durability of the queues they reside on. Transient messages will 107// not be restored to durable queues, persistent messages will be restored to 108// durable queues and lost on non-durable queues during server restart. 109// 110// This remains typed as uint8 to match Publishing.DeliveryMode. Other 111// delivery modes specific to custom queue implementations are not enumerated 112// here. 113const ( 114 Transient uint8 = 1 115 Persistent uint8 = 2 116) 117 118// The property flags are an array of bits that indicate the presence or 119// absence of each property value in sequence. The bits are ordered from most 120// high to low - bit 15 indicates the first property. 121const ( 122 flagContentType = 0x8000 123 flagContentEncoding = 0x4000 124 flagHeaders = 0x2000 125 flagDeliveryMode = 0x1000 126 flagPriority = 0x0800 127 flagCorrelationId = 0x0400 128 flagReplyTo = 0x0200 129 flagExpiration = 0x0100 130 flagMessageId = 0x0080 131 flagTimestamp = 0x0040 132 flagType = 0x0020 133 flagUserId = 0x0010 134 flagAppId = 0x0008 135 flagReserved1 = 0x0004 136) 137 138// Queue captures the current server state of the queue on the server returned 139// from Channel.QueueDeclare or Channel.QueueInspect. 140type Queue struct { 141 Name string // server confirmed or generated name 142 Messages int // count of messages not awaiting acknowledgment 143 Consumers int // number of consumers receiving deliveries 144} 145 146// Publishing captures the client message sent to the server. The fields 147// outside of the Headers table included in this struct mirror the underlying 148// fields in the content frame. They use native types for convenience and 149// efficiency. 150type Publishing struct { 151 // Application or exchange specific fields, 152 // the headers exchange will inspect this field. 153 Headers Table 154 155 // Properties 156 ContentType string // MIME content type 157 ContentEncoding string // MIME content encoding 158 DeliveryMode uint8 // Transient (0 or 1) or Persistent (2) 159 Priority uint8 // 0 to 9 160 CorrelationId string // correlation identifier 161 ReplyTo string // address to to reply to (ex: RPC) 162 Expiration string // message expiration spec 163 MessageId string // message identifier 164 Timestamp time.Time // message timestamp 165 Type string // message type name 166 UserId string // creating user id - ex: "guest" 167 AppId string // creating application id 168 169 // The application specific payload of the message 170 Body []byte 171} 172 173// Blocking notifies the server's TCP flow control of the Connection. When a 174// server hits a memory or disk alarm it will block all connections until the 175// resources are reclaimed. Use NotifyBlock on the Connection to receive these 176// events. 177type Blocking struct { 178 Active bool // TCP pushback active/inactive on server 179 Reason string // Server reason for activation 180} 181 182// Confirmation notifies the acknowledgment or negative acknowledgement of a 183// publishing identified by its delivery tag. Use NotifyPublish on the Channel 184// to consume these events. 185type Confirmation struct { 186 DeliveryTag uint64 // A 1 based counter of publishings from when the channel was put in Confirm mode 187 Ack bool // True when the server successfully received the publishing 188} 189 190// Decimal matches the AMQP decimal type. Scale is the number of decimal 191// digits Scale == 2, Value == 12345, Decimal == 123.45 192type Decimal struct { 193 Scale uint8 194 Value int32 195} 196 197// Table stores user supplied fields of the following types: 198// 199// bool 200// byte 201// float32 202// float64 203// int16 204// int32 205// int64 206// nil 207// string 208// time.Time 209// amqp.Decimal 210// amqp.Table 211// []byte 212// []interface{} - containing above types 213// 214// Functions taking a table will immediately fail when the table contains a 215// value of an unsupported type. 216// 217// The caller must be specific in which precision of integer it wishes to 218// encode. 219// 220// Use a type assertion when reading values from a table for type conversion. 221// 222// RabbitMQ expects int32 for integer values. 223// 224type Table map[string]interface{} 225 226func validateField(f interface{}) error { 227 switch fv := f.(type) { 228 case nil, bool, byte, int16, int32, int64, float32, float64, string, []byte, Decimal, time.Time: 229 return nil 230 231 case []interface{}: 232 for _, v := range fv { 233 if err := validateField(v); err != nil { 234 return fmt.Errorf("in array %s", err) 235 } 236 } 237 return nil 238 239 case Table: 240 for k, v := range fv { 241 if err := validateField(v); err != nil { 242 return fmt.Errorf("table field %q %s", k, err) 243 } 244 } 245 return nil 246 } 247 248 return fmt.Errorf("value %t not supported", f) 249} 250 251// Validate returns and error if any Go types in the table are incompatible with AMQP types. 252func (t Table) Validate() error { 253 return validateField(t) 254} 255 256// Heap interface for maintaining delivery tags 257type tagSet []uint64 258 259func (set tagSet) Len() int { return len(set) } 260func (set tagSet) Less(i, j int) bool { return (set)[i] < (set)[j] } 261func (set tagSet) Swap(i, j int) { (set)[i], (set)[j] = (set)[j], (set)[i] } 262func (set *tagSet) Push(tag interface{}) { *set = append(*set, tag.(uint64)) } 263func (set *tagSet) Pop() interface{} { 264 val := (*set)[len(*set)-1] 265 *set = (*set)[:len(*set)-1] 266 return val 267} 268 269type message interface { 270 id() (uint16, uint16) 271 wait() bool 272 read(io.Reader) error 273 write(io.Writer) error 274} 275 276type messageWithContent interface { 277 message 278 getContent() (properties, []byte) 279 setContent(properties, []byte) 280} 281 282/* 283The base interface implemented as: 284 2852.3.5 frame Details 286 287All frames consist of a header (7 octets), a payload of arbitrary size, and a 'frame-end' octet that detects 288malformed frames: 289 290 0 1 3 7 size+7 size+8 291 +------+---------+-------------+ +------------+ +-----------+ 292 | type | channel | size | | payload | | frame-end | 293 +------+---------+-------------+ +------------+ +-----------+ 294 octet short long size octets octet 295 296To read a frame, we: 297 298 1. Read the header and check the frame type and channel. 299 2. Depending on the frame type, we read the payload and process it. 300 3. Read the frame end octet. 301 302In realistic implementations where performance is a concern, we would use 303“read-ahead buffering” or “gathering reads” to avoid doing three separate 304system calls to read a frame. 305 306*/ 307type frame interface { 308 write(io.Writer) error 309 channel() uint16 310} 311 312type reader struct { 313 r io.Reader 314} 315 316type writer struct { 317 w io.Writer 318} 319 320// Implements the frame interface for Connection RPC 321type protocolHeader struct{} 322 323func (protocolHeader) write(w io.Writer) error { 324 _, err := w.Write([]byte{'A', 'M', 'Q', 'P', 0, 0, 9, 1}) 325 return err 326} 327 328func (protocolHeader) channel() uint16 { 329 panic("only valid as initial handshake") 330} 331 332/* 333Method frames carry the high-level protocol commands (which we call "methods"). 334One method frame carries one command. The method frame payload has this format: 335 336 0 2 4 337 +----------+-----------+-------------- - - 338 | class-id | method-id | arguments... 339 +----------+-----------+-------------- - - 340 short short ... 341 342To process a method frame, we: 343 1. Read the method frame payload. 344 2. Unpack it into a structure. A given method always has the same structure, 345 so we can unpack the method rapidly. 3. Check that the method is allowed in 346 the current context. 347 4. Check that the method arguments are valid. 348 5. Execute the method. 349 350Method frame bodies are constructed as a list of AMQP data fields (bits, 351integers, strings and string tables). The marshalling code is trivially 352generated directly from the protocol specifications, and can be very rapid. 353*/ 354type methodFrame struct { 355 ChannelId uint16 356 ClassId uint16 357 MethodId uint16 358 Method message 359} 360 361func (f *methodFrame) channel() uint16 { return f.ChannelId } 362 363/* 364Heartbeating is a technique designed to undo one of TCP/IP's features, namely 365its ability to recover from a broken physical connection by closing only after 366a quite long time-out. In some scenarios we need to know very rapidly if a 367peer is disconnected or not responding for other reasons (e.g. it is looping). 368Since heartbeating can be done at a low level, we implement this as a special 369type of frame that peers exchange at the transport level, rather than as a 370class method. 371*/ 372type heartbeatFrame struct { 373 ChannelId uint16 374} 375 376func (f *heartbeatFrame) channel() uint16 { return f.ChannelId } 377 378/* 379Certain methods (such as Basic.Publish, Basic.Deliver, etc.) are formally 380defined as carrying content. When a peer sends such a method frame, it always 381follows it with a content header and zero or more content body frames. 382 383A content header frame has this format: 384 385 0 2 4 12 14 386 +----------+--------+-----------+----------------+------------- - - 387 | class-id | weight | body size | property flags | property list... 388 +----------+--------+-----------+----------------+------------- - - 389 short short long long short remainder... 390 391We place content body in distinct frames (rather than including it in the 392method) so that AMQP may support "zero copy" techniques in which content is 393never marshalled or encoded. We place the content properties in their own 394frame so that recipients can selectively discard contents they do not want to 395process 396*/ 397type headerFrame struct { 398 ChannelId uint16 399 ClassId uint16 400 weight uint16 401 Size uint64 402 Properties properties 403} 404 405func (f *headerFrame) channel() uint16 { return f.ChannelId } 406 407/* 408Content is the application data we carry from client-to-client via the AMQP 409server. Content is, roughly speaking, a set of properties plus a binary data 410part. The set of allowed properties are defined by the Basic class, and these 411form the "content header frame". The data can be any size, and MAY be broken 412into several (or many) chunks, each forming a "content body frame". 413 414Looking at the frames for a specific channel, as they pass on the wire, we 415might see something like this: 416 417 [method] 418 [method] [header] [body] [body] 419 [method] 420 ... 421*/ 422type bodyFrame struct { 423 ChannelId uint16 424 Body []byte 425} 426 427func (f *bodyFrame) channel() uint16 { return f.ChannelId } 428