1// Copyright (c) 2012, Sean Treadway, SoundCloud Ltd.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4// Source code and contact info at http://github.com/streadway/amqp
5
6package amqp
7
8import (
9	"fmt"
10	"io"
11	"time"
12)
13
// Constants for standard AMQP 0-9-1 exchange types.  Each constant holds the
// exchange type name as an untyped string.
const (
	ExchangeDirect  = "direct"
	ExchangeFanout  = "fanout"
	ExchangeTopic   = "topic"
	ExchangeHeaders = "headers"
)
21
// Predeclared *Error values returned by this package.  Each pairs a
// specification code with a fixed reason string.
var (
	// ErrClosed is returned when the channel or connection is not open.
	ErrClosed = &Error{Code: ChannelError, Reason: "channel/connection is not open"}

	// ErrChannelMax is returned when Connection.Channel has been called enough
	// times that all channel IDs have been exhausted in the client or the
	// server.
	ErrChannelMax = &Error{Code: ChannelError, Reason: "channel id space exhausted"}

	// ErrSASL is returned from Dial when the authentication mechanism could not
	// be negotiated.
	ErrSASL = &Error{Code: AccessRefused, Reason: "SASL could not negotiate a shared mechanism"}

	// ErrCredentials is returned when the authenticated client is not authorized
	// to any vhost.
	ErrCredentials = &Error{Code: AccessRefused, Reason: "username or password not allowed"}

	// ErrVhost is returned when the authenticated user is not permitted to
	// access the requested Vhost.
	ErrVhost = &Error{Code: AccessRefused, Reason: "no access to this vhost"}

	// ErrSyntax is a hard protocol error, indicating an unsupported protocol,
	// implementation or encoding.
	ErrSyntax = &Error{Code: SyntaxError, Reason: "invalid field or value inside of a frame"}

	// ErrFrame is returned when the protocol frame cannot be read from the
	// server, indicating an unsupported protocol or unsupported frame type.
	ErrFrame = &Error{Code: FrameError, Reason: "frame could not be parsed"}

	// ErrCommandInvalid is returned when the server sends an unexpected response
	// to this requested message type. This indicates a bug in this client.
	ErrCommandInvalid = &Error{Code: CommandInvalid, Reason: "unexpected command received"}

	// ErrUnexpectedFrame is returned when something other than a method or
	// heartbeat frame is delivered to the Connection, indicating a bug in the
	// client.
	ErrUnexpectedFrame = &Error{Code: UnexpectedFrame, Reason: "unexpected frame received"}

	// ErrFieldType is returned when writing a message containing a Go type
	// unsupported by AMQP.
	ErrFieldType = &Error{Code: SyntaxError, Reason: "unsupported table field type"}
)
63
// Error captures the code and reason a channel or connection has been closed
// by the server.  It satisfies the error interface through its Error method.
type Error struct {
	Code    int    // constant code from the specification
	Reason  string // description of the error
	Server  bool   // true when initiated from the server, false when from this library
	Recover bool   // true when this error can be recovered by retrying later or with different parameters
}
72
73func newError(code uint16, text string) *Error {
74	return &Error{
75		Code:    int(code),
76		Reason:  text,
77		Recover: isSoftExceptionCode(int(code)),
78		Server:  true,
79	}
80}
81
82func (e Error) Error() string {
83	return fmt.Sprintf("Exception (%d) Reason: %q", e.Code, e.Reason)
84}
85
// properties is used by header frames to capture routing and header
// information.
type properties struct {
	ContentType     string    // MIME content type
	ContentEncoding string    // MIME content encoding
	Headers         Table     // Application or header exchange table
	DeliveryMode    uint8     // queue implementation use - Transient (1) or Persistent (2)
	Priority        uint8     // queue implementation use - 0 to 9
	CorrelationId   string    // application use - correlation identifier
	ReplyTo         string    // application use - address to reply to (ex: RPC)
	Expiration      string    // implementation use - message expiration spec
	MessageId       string    // application use - message identifier
	Timestamp       time.Time // application use - message timestamp
	Type            string    // application use - message type name
	UserId          string    // application use - creating user id
	AppId           string    // application use - creating application
	reserved1       string    // was cluster-id - process for buffer consumption
}
103
// DeliveryMode.  Transient means higher throughput but messages will not be
// restored on broker restart.  The delivery mode of publishings is unrelated
// to the durability of the queues they reside on.  Transient messages will
// not be restored to durable queues.  Persistent messages will be restored to
// durable queues and lost on non-durable queues during server restart.
//
// This remains typed as uint8 to match Publishing.DeliveryMode.  Other
// delivery modes specific to custom queue implementations are not enumerated
// here.
const (
	Transient  uint8 = 1
	Persistent uint8 = 2
)
117
// The property flags form a 16 bit mask in which each set bit marks the
// presence of one property value.  Properties are assigned from the most
// significant bit downwards: bit 15 is the first property and bit 2 is the
// last one declared here.
const (
	flagContentType     = 1 << 15
	flagContentEncoding = 1 << 14
	flagHeaders         = 1 << 13
	flagDeliveryMode    = 1 << 12
	flagPriority        = 1 << 11
	flagCorrelationId   = 1 << 10
	flagReplyTo         = 1 << 9
	flagExpiration      = 1 << 8
	flagMessageId       = 1 << 7
	flagTimestamp       = 1 << 6
	flagType            = 1 << 5
	flagUserId          = 1 << 4
	flagAppId           = 1 << 3
	flagReserved1       = 1 << 2
)
137
// Queue captures the state of a queue as reported by the server in response
// to Channel.QueueDeclare or Channel.QueueInspect.
type Queue struct {
	Name      string // server confirmed or generated name
	Messages  int    // count of messages not awaiting acknowledgment
	Consumers int    // number of consumers receiving deliveries
}
145
// Publishing captures the client message sent to the server.  The fields
// outside of the Headers table included in this struct mirror the underlying
// fields in the content frame.  They use native types for convenience and
// efficiency.
type Publishing struct {
	// Application or exchange specific fields,
	// the headers exchange will inspect this field.
	Headers Table

	// Properties
	ContentType     string    // MIME content type
	ContentEncoding string    // MIME content encoding
	DeliveryMode    uint8     // Transient (0 or 1) or Persistent (2)
	Priority        uint8     // 0 to 9
	CorrelationId   string    // correlation identifier
	ReplyTo         string    // address to reply to (ex: RPC)
	Expiration      string    // message expiration spec
	MessageId       string    // message identifier
	Timestamp       time.Time // message timestamp
	Type            string    // message type name
	UserId          string    // creating user id - ex: "guest"
	AppId           string    // creating application id

	// The application specific payload of the message
	Body []byte
}
172
// Blocking describes a change in the server's TCP flow control of the
// Connection.  When a server hits a memory or disk alarm it will block all
// connections until the resources are reclaimed.  Use NotifyBlock on the
// Connection to receive these events.
type Blocking struct {
	Active bool   // TCP pushback active/inactive on server
	Reason string // Server reason for activation
}
181
// Confirmation reports the acknowledgment or negative acknowledgment of a
// publishing identified by its delivery tag.  Use NotifyPublish on the Channel
// to consume these events.
type Confirmation struct {
	DeliveryTag uint64 // A 1 based counter of publishings from when the channel was put in Confirm mode
	Ack         bool   // True when the server successfully received the publishing
}
189
// Decimal matches the AMQP decimal type.  Scale is the number of decimal
// digits: Scale == 2 with Value == 12345 represents the decimal 123.45.
type Decimal struct {
	Scale uint8
	Value int32
}

// Table stores user supplied fields of the following types:
//
//   bool
//   byte
//   float32
//   float64
//   int16
//   int32
//   int64
//   nil
//   string
//   time.Time
//   amqp.Decimal
//   amqp.Table
//   []byte
//   []interface{} - containing above types
//
// Functions taking a table will immediately fail when the table contains a
// value of an unsupported type.
//
// The caller must be specific in which precision of integer it wishes to
// encode.
//
// Use a type assertion when reading values from a table for type conversion.
//
// RabbitMQ expects int32 for integer values.
type Table map[string]interface{}

// validateField reports whether f is a value that may be stored in a Table.
//
// Scalars of the supported types are accepted as-is.  Arrays ([]interface{})
// and nested Tables are walked recursively and rejected on the first
// unsupported element.  The returned error names the Go type of the offending
// value, prefixed with the path ("in array", "table field <key>") that led to
// it.  A nil error means every value is supported.
func validateField(f interface{}) error {
	switch fv := f.(type) {
	case nil, bool, byte, int16, int32, int64, float32, float64, string, []byte, Decimal, time.Time:
		return nil

	case []interface{}:
		for _, v := range fv {
			if err := validateField(v); err != nil {
				return fmt.Errorf("in array %s", err)
			}
		}
		return nil

	case Table:
		for k, v := range fv {
			if err := validateField(v); err != nil {
				return fmt.Errorf("table field %q %s", k, err)
			}
		}
		return nil
	}

	// %T prints the dynamic Go type of the rejected value.  The bool verb %t
	// would render as "%!t(int=5)" for anything that is not a bool.
	return fmt.Errorf("value %T not supported", f)
}

// Validate returns an error if any Go types in the table are incompatible
// with AMQP types.
func (t Table) Validate() error {
	return validateField(t)
}
255
// tagSet is a slice of delivery tags providing the Len, Less, Swap, Push and
// Pop methods needed to maintain it as a heap ordered by ascending tag value.
type tagSet []uint64

// Len returns the number of tags held.
func (s tagSet) Len() int {
	return len(s)
}

// Less reports whether the tag at index i is smaller than the tag at index j.
func (s tagSet) Less(i, j int) bool {
	return s[i] < s[j]
}

// Swap exchanges the tags at indexes i and j.
func (s tagSet) Swap(i, j int) {
	s[i], s[j] = s[j], s[i]
}

// Push appends tag, which must hold a uint64, to the end of the set.
func (s *tagSet) Push(tag interface{}) {
	id := tag.(uint64)
	*s = append(*s, id)
}

// Pop removes the last tag from the set and returns it.
func (s *tagSet) Pop() interface{} {
	tags := *s
	last := len(tags) - 1
	tag := tags[last]
	*s = tags[:last]
	return tag
}
268
// message is the interface stored in methodFrame.Method.  An implementation
// can identify itself, report a wait flag, and read or write itself against
// an io stream.
type message interface {
	// id returns two identifiers.
	// NOTE(review): presumably the class id and method id, matching
	// methodFrame.ClassId and methodFrame.MethodId - confirm against
	// implementations.
	id() (uint16, uint16)
	// wait reports a per-message flag.
	// NOTE(review): presumably whether a response is expected - confirm.
	wait() bool
	// read populates the message from the reader.
	read(io.Reader) error
	// write emits the message to the writer.
	write(io.Writer) error
}
275
// messageWithContent is a message that additionally carries content: a set
// of properties together with a body.
type messageWithContent interface {
	message
	// getContent returns the properties and body held by the message.
	getContent() (properties, []byte)
	// setContent stores the given properties and body on the message.
	setContent(properties, []byte)
}
281
/*
frame is the base interface shared by all frame types.  The wire layout, as
described by the specification:

2.3.5  frame Details

All frames consist of a header (7 octets), a payload of arbitrary size, and a 'frame-end' octet that detects
malformed frames:

  0      1         3             7                  size+7 size+8
  +------+---------+-------------+  +------------+  +-----------+
  | type | channel |     size    |  |  payload   |  | frame-end |
  +------+---------+-------------+  +------------+  +-----------+
   octet   short         long         size octets       octet

To read a frame, we:

 1. Read the header and check the frame type and channel.
 2. Depending on the frame type, we read the payload and process it.
 3. Read the frame end octet.

In realistic implementations where performance is a concern, we would use
“read-ahead buffering” or “gathering reads” to avoid doing three separate
system calls to read a frame.

*/
type frame interface {
	// write emits the frame to the writer.
	write(io.Writer) error
	// channel returns the id of the channel the frame belongs to.
	channel() uint16
}
311
// reader wraps an io.Reader.
// NOTE(review): presumably the source frames are decoded from; the methods
// are not defined in this part of the file - confirm.
type reader struct {
	r io.Reader
}

// writer wraps an io.Writer.
// NOTE(review): presumably the sink frames are encoded to; the methods are
// not defined in this part of the file - confirm.
type writer struct {
	w io.Writer
}
319
// protocolHeader implements the frame interface for Connection RPC.  It
// carries no data; writing it emits the fixed protocol preamble.
type protocolHeader struct{}

// write sends the eight octet preamble "AMQP" 0 0 9 1 to w and returns any
// error reported by the writer.
func (protocolHeader) write(w io.Writer) error {
	preamble := [8]byte{'A', 'M', 'Q', 'P', 0, 0, 9, 1}

	if _, err := w.Write(preamble[:]); err != nil {
		return err
	}
	return nil
}

// channel always panics: the protocol header is not associated with a
// channel.
func (protocolHeader) channel() uint16 {
	panic("only valid as initial handshake")
}
331
332/*
333Method frames carry the high-level protocol commands (which we call "methods").
334One method frame carries one command.  The method frame payload has this format:
335
336  0          2           4
337  +----------+-----------+-------------- - -
338  | class-id | method-id | arguments...
339  +----------+-----------+-------------- - -
340     short      short    ...
341
342To process a method frame, we:
343 1. Read the method frame payload.
344 2. Unpack it into a structure.  A given method always has the same structure,
345 so we can unpack the method rapidly.  3. Check that the method is allowed in
346 the current context.
347 4. Check that the method arguments are valid.
348 5. Execute the method.
349
350Method frame bodies are constructed as a list of AMQP data fields (bits,
351integers, strings and string tables).  The marshalling code is trivially
352generated directly from the protocol specifications, and can be very rapid.
353*/
354type methodFrame struct {
355	ChannelId uint16
356	ClassId   uint16
357	MethodId  uint16
358	Method    message
359}
360
361func (f *methodFrame) channel() uint16 { return f.ChannelId }
362
/*
Heartbeating is a technique designed to undo one of TCP/IP's features, namely
its ability to recover from a broken physical connection by closing only after
a quite long time-out.  In some scenarios we need to know very rapidly if a
peer is disconnected or not responding for other reasons (e.g. it is looping).
Since heartbeating can be done at a low level, it is implemented as a special
type of frame that peers exchange at the transport level, rather than as a
class method.
*/
type heartbeatFrame struct {
	ChannelId uint16
}

// channel returns the channel id recorded on this heartbeat frame.
func (hb *heartbeatFrame) channel() uint16 {
	return hb.ChannelId
}
377
378/*
379Certain methods (such as Basic.Publish, Basic.Deliver, etc.) are formally
380defined as carrying content.  When a peer sends such a method frame, it always
381follows it with a content header and zero or more content body frames.
382
383A content header frame has this format:
384
385    0          2        4           12               14
386    +----------+--------+-----------+----------------+------------- - -
387    | class-id | weight | body size | property flags | property list...
388    +----------+--------+-----------+----------------+------------- - -
389      short     short    long long       short        remainder...
390
391We place content body in distinct frames (rather than including it in the
392method) so that AMQP may support "zero copy" techniques in which content is
393never marshalled or encoded.  We place the content properties in their own
394frame so that recipients can selectively discard contents they do not want to
395process
396*/
397type headerFrame struct {
398	ChannelId  uint16
399	ClassId    uint16
400	weight     uint16
401	Size       uint64
402	Properties properties
403}
404
405func (f *headerFrame) channel() uint16 { return f.ChannelId }
406
/*
Content is the application data carried from client to client via the AMQP
server.  Content is, roughly speaking, a set of properties plus a binary data
part.  The set of allowed properties are defined by the Basic class, and these
form the "content header frame".  The data can be any size, and MAY be broken
into several (or many) chunks, each forming a "content body frame".

Looking at the frames for a specific channel, as they pass on the wire, we
might see something like this:

		[method]
		[method] [header] [body] [body]
		[method]
		...
*/
type bodyFrame struct {
	ChannelId uint16
	Body      []byte
}

// channel returns the id of the channel this body chunk belongs to.
func (bf *bodyFrame) channel() uint16 {
	return bf.ChannelId
}
428