1// Copyright (c) 2012, Sean Treadway, SoundCloud Ltd.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4// Source code and contact info at http://github.com/streadway/amqp
5
6package amqp
7
8import (
9	"bufio"
10	"crypto/tls"
11	"io"
12	"net"
13	"reflect"
14	"strconv"
15	"strings"
16	"sync"
17	"sync/atomic"
18	"time"
19)
20
21const (
22	maxChannelMax = (2 << 15) - 1
23
24	defaultHeartbeat         = 10 * time.Second
25	defaultConnectionTimeout = 30 * time.Second
26	defaultProduct           = "https://github.com/streadway/amqp"
27	defaultVersion           = "β"
28	defaultChannelMax        = maxChannelMax
29	defaultLocale            = "en_US"
30)
31
32// Config is used in DialConfig and Open to specify the desired tuning
33// parameters used during a connection open handshake.  The negotiated tuning
34// will be stored in the returned connection's Config field.
35type Config struct {
36	// The SASL mechanisms to try in the client request, and the successful
37	// mechanism used on the Connection object.
38	// If SASL is nil, PlainAuth from the URL is used.
39	SASL []Authentication
40
41	// Vhost specifies the namespace of permissions, exchanges, queues and
42	// bindings on the server.  Dial sets this to the path parsed from the URL.
43	Vhost string
44
45	ChannelMax int           // 0 max channels means 2^16 - 1
46	FrameSize  int           // 0 max bytes means unlimited
47	Heartbeat  time.Duration // less than 1s uses the server's interval
48
49	// TLSClientConfig specifies the client configuration of the TLS connection
50	// when establishing a tls transport.
51	// If the URL uses an amqps scheme, then an empty tls.Config with the
52	// ServerName from the URL is used.
53	TLSClientConfig *tls.Config
54
55	// Properties is table of properties that the client advertises to the server.
56	// This is an optional setting - if the application does not set this,
57	// the underlying library will use a generic set of client properties.
58	Properties Table
59
60	// Connection locale that we expect to always be en_US
61	// Even though servers must return it as per the AMQP 0-9-1 spec,
62	// we are not aware of it being used other than to satisfy the spec requirements
63	Locale string
64
65	// Dial returns a net.Conn prepared for a TLS handshake with TSLClientConfig,
66	// then an AMQP connection handshake.
67	// If Dial is nil, net.DialTimeout with a 30s connection and 30s deadline is
68	// used during TLS and AMQP handshaking.
69	Dial func(network, addr string) (net.Conn, error)
70}
71
72// Connection manages the serialization and deserialization of frames from IO
73// and dispatches the frames to the appropriate channel.  All RPC methods and
74// asyncronous Publishing, Delivery, Ack, Nack and Return messages are
75// multiplexed on this channel.  There must always be active receivers for
76// every asynchronous message on this connection.
77type Connection struct {
78	destructor sync.Once  // shutdown once
79	sendM      sync.Mutex // conn writer mutex
80	m          sync.Mutex // struct field mutex
81
82	conn io.ReadWriteCloser
83
84	rpc       chan message
85	writer    *writer
86	sends     chan time.Time     // timestamps of each frame sent
87	deadlines chan readDeadliner // heartbeater updates read deadlines
88
89	allocator *allocator // id generator valid after openTune
90	channels  map[uint16]*Channel
91
92	noNotify bool // true when we will never notify again
93	closes   []chan *Error
94	blocks   []chan Blocking
95
96	errors chan *Error
97
98	Config Config // The negotiated Config after connection.open
99
100	Major      int      // Server's major version
101	Minor      int      // Server's minor version
102	Properties Table    // Server properties
103	Locales    []string // Server locales
104
105	closed int32 // Will be 1 if the connection is closed, 0 otherwise. Should only be accessed as atomic
106}
107
108type readDeadliner interface {
109	SetReadDeadline(time.Time) error
110}
111
112// defaultDial establishes a connection when config.Dial is not provided
113func defaultDial(network, addr string) (net.Conn, error) {
114	conn, err := net.DialTimeout(network, addr, defaultConnectionTimeout)
115	if err != nil {
116		return nil, err
117	}
118
119	// Heartbeating hasn't started yet, don't stall forever on a dead server.
120	// A deadline is set for TLS and AMQP handshaking. After AMQP is established,
121	// the deadline is cleared in openComplete.
122	if err := conn.SetDeadline(time.Now().Add(defaultConnectionTimeout)); err != nil {
123		return nil, err
124	}
125
126	return conn, nil
127}
128
129// Dial accepts a string in the AMQP URI format and returns a new Connection
130// over TCP using PlainAuth.  Defaults to a server heartbeat interval of 10
131// seconds and sets the handshake deadline to 30 seconds. After handshake,
132// deadlines are cleared.
133//
134// Dial uses the zero value of tls.Config when it encounters an amqps://
135// scheme.  It is equivalent to calling DialTLS(amqp, nil).
136func Dial(url string) (*Connection, error) {
137	return DialConfig(url, Config{
138		Heartbeat: defaultHeartbeat,
139		Locale:    defaultLocale,
140	})
141}
142
143// DialTLS accepts a string in the AMQP URI format and returns a new Connection
144// over TCP using PlainAuth.  Defaults to a server heartbeat interval of 10
145// seconds and sets the initial read deadline to 30 seconds.
146//
147// DialTLS uses the provided tls.Config when encountering an amqps:// scheme.
148func DialTLS(url string, amqps *tls.Config) (*Connection, error) {
149	return DialConfig(url, Config{
150		Heartbeat:       defaultHeartbeat,
151		TLSClientConfig: amqps,
152		Locale:          defaultLocale,
153	})
154}
155
156// DialConfig accepts a string in the AMQP URI format and a configuration for
157// the transport and connection setup, returning a new Connection.  Defaults to
158// a server heartbeat interval of 10 seconds and sets the initial read deadline
159// to 30 seconds.
160func DialConfig(url string, config Config) (*Connection, error) {
161	var err error
162	var conn net.Conn
163
164	uri, err := ParseURI(url)
165	if err != nil {
166		return nil, err
167	}
168
169	if config.SASL == nil {
170		config.SASL = []Authentication{uri.PlainAuth()}
171	}
172
173	if config.Vhost == "" {
174		config.Vhost = uri.Vhost
175	}
176
177	addr := net.JoinHostPort(uri.Host, strconv.FormatInt(int64(uri.Port), 10))
178
179	dialer := config.Dial
180	if dialer == nil {
181		dialer = defaultDial
182	}
183
184	conn, err = dialer("tcp", addr)
185	if err != nil {
186		return nil, err
187	}
188
189	if uri.Scheme == "amqps" {
190		if config.TLSClientConfig == nil {
191			config.TLSClientConfig = new(tls.Config)
192		}
193
194		// If ServerName has not been specified in TLSClientConfig,
195		// set it to the URI host used for this connection.
196		if config.TLSClientConfig.ServerName == "" {
197			config.TLSClientConfig.ServerName = uri.Host
198		}
199
200		client := tls.Client(conn, config.TLSClientConfig)
201		if err := client.Handshake(); err != nil {
202			conn.Close()
203			return nil, err
204		}
205
206		conn = client
207	}
208
209	return Open(conn, config)
210}
211
212/*
213Open accepts an already established connection, or other io.ReadWriteCloser as
214a transport.  Use this method if you have established a TLS connection or wish
215to use your own custom transport.
216
217*/
218func Open(conn io.ReadWriteCloser, config Config) (*Connection, error) {
219	c := &Connection{
220		conn:      conn,
221		writer:    &writer{bufio.NewWriter(conn)},
222		channels:  make(map[uint16]*Channel),
223		rpc:       make(chan message),
224		sends:     make(chan time.Time),
225		errors:    make(chan *Error, 1),
226		deadlines: make(chan readDeadliner, 1),
227	}
228	go c.reader(conn)
229	return c, c.open(config)
230}
231
232/*
233LocalAddr returns the local TCP peer address, or ":0" (the zero value of net.TCPAddr)
234as a fallback default value if the underlying transport does not support LocalAddr().
235*/
236func (c *Connection) LocalAddr() net.Addr {
237	if conn, ok := c.conn.(interface {
238		LocalAddr() net.Addr
239	}); ok {
240		return conn.LocalAddr()
241	}
242	return &net.TCPAddr{}
243}
244
245// ConnectionState returns basic TLS details of the underlying transport.
246// Returns a zero value when the underlying connection does not implement
247// ConnectionState() tls.ConnectionState.
248func (c *Connection) ConnectionState() tls.ConnectionState {
249	if conn, ok := c.conn.(interface {
250		ConnectionState() tls.ConnectionState
251	}); ok {
252		return conn.ConnectionState()
253	}
254	return tls.ConnectionState{}
255}
256
257/*
258NotifyClose registers a listener for close events either initiated by an error
259accompaning a connection.close method or by a normal shutdown.
260
261On normal shutdowns, the chan will be closed.
262
263To reconnect after a transport or protocol error, register a listener here and
264re-run your setup process.
265
266*/
267func (c *Connection) NotifyClose(receiver chan *Error) chan *Error {
268	c.m.Lock()
269	defer c.m.Unlock()
270
271	if c.noNotify {
272		close(receiver)
273	} else {
274		c.closes = append(c.closes, receiver)
275	}
276
277	return receiver
278}
279
280/*
281NotifyBlocked registers a listener for RabbitMQ specific TCP flow control
282method extensions connection.blocked and connection.unblocked.  Flow control is
283active with a reason when Blocking.Blocked is true.  When a Connection is
284blocked, all methods will block across all connections until server resources
285become free again.
286
287This optional extension is supported by the server when the
288"connection.blocked" server capability key is true.
289
290*/
291func (c *Connection) NotifyBlocked(receiver chan Blocking) chan Blocking {
292	c.m.Lock()
293	defer c.m.Unlock()
294
295	if c.noNotify {
296		close(receiver)
297	} else {
298		c.blocks = append(c.blocks, receiver)
299	}
300
301	return receiver
302}
303
304/*
305Close requests and waits for the response to close the AMQP connection.
306
307It's advisable to use this message when publishing to ensure all kernel buffers
308have been flushed on the server and client before exiting.
309
310An error indicates that server may not have received this request to close but
311the connection should be treated as closed regardless.
312
313After returning from this call, all resources associated with this connection,
314including the underlying io, Channels, Notify listeners and Channel consumers
315will also be closed.
316*/
317func (c *Connection) Close() error {
318	if c.isClosed() {
319		return ErrClosed
320	}
321
322	defer c.shutdown(nil)
323	return c.call(
324		&connectionClose{
325			ReplyCode: replySuccess,
326			ReplyText: "kthxbai",
327		},
328		&connectionCloseOk{},
329	)
330}
331
332func (c *Connection) closeWith(err *Error) error {
333	if c.isClosed() {
334		return ErrClosed
335	}
336
337	defer c.shutdown(err)
338	return c.call(
339		&connectionClose{
340			ReplyCode: uint16(err.Code),
341			ReplyText: err.Reason,
342		},
343		&connectionCloseOk{},
344	)
345}
346
347func (c *Connection) isClosed() bool {
348	return (atomic.LoadInt32(&c.closed) == 1)
349}
350
351func (c *Connection) send(f frame) error {
352	if c.isClosed() {
353		return ErrClosed
354	}
355
356	c.sendM.Lock()
357	err := c.writer.WriteFrame(f)
358	c.sendM.Unlock()
359
360	if err != nil {
361		// shutdown could be re-entrant from signaling notify chans
362		go c.shutdown(&Error{
363			Code:   FrameError,
364			Reason: err.Error(),
365		})
366	} else {
367		// Broadcast we sent a frame, reducing heartbeats, only
368		// if there is something that can receive - like a non-reentrant
369		// call or if the heartbeater isn't running
370		select {
371		case c.sends <- time.Now():
372		default:
373		}
374	}
375
376	return err
377}
378
379func (c *Connection) shutdown(err *Error) {
380	atomic.StoreInt32(&c.closed, 1)
381
382	c.destructor.Do(func() {
383		c.m.Lock()
384		defer c.m.Unlock()
385
386		if err != nil {
387			for _, c := range c.closes {
388				c <- err
389			}
390		}
391
392		if err != nil {
393			c.errors <- err
394		}
395		// Shutdown handler goroutine can still receive the result.
396		close(c.errors)
397
398		for _, c := range c.closes {
399			close(c)
400		}
401
402		for _, c := range c.blocks {
403			close(c)
404		}
405
406		// Shutdown the channel, but do not use closeChannel() as it calls
407		// releaseChannel() which requires the connection lock.
408		//
409		// Ranging over c.channels and calling releaseChannel() that mutates
410		// c.channels is racy - see commit 6063341 for an example.
411		for _, ch := range c.channels {
412			ch.shutdown(err)
413		}
414
415		c.conn.Close()
416
417		c.channels = map[uint16]*Channel{}
418		c.allocator = newAllocator(1, c.Config.ChannelMax)
419		c.noNotify = true
420	})
421}
422
423// All methods sent to the connection channel should be synchronous so we
424// can handle them directly without a framing component
425func (c *Connection) demux(f frame) {
426	if f.channel() == 0 {
427		c.dispatch0(f)
428	} else {
429		c.dispatchN(f)
430	}
431}
432
433func (c *Connection) dispatch0(f frame) {
434	switch mf := f.(type) {
435	case *methodFrame:
436		switch m := mf.Method.(type) {
437		case *connectionClose:
438			// Send immediately as shutdown will close our side of the writer.
439			c.send(&methodFrame{
440				ChannelId: 0,
441				Method:    &connectionCloseOk{},
442			})
443
444			c.shutdown(newError(m.ReplyCode, m.ReplyText))
445		case *connectionBlocked:
446			for _, c := range c.blocks {
447				c <- Blocking{Active: true, Reason: m.Reason}
448			}
449		case *connectionUnblocked:
450			for _, c := range c.blocks {
451				c <- Blocking{Active: false}
452			}
453		default:
454			c.rpc <- m
455		}
456	case *heartbeatFrame:
457		// kthx - all reads reset our deadline.  so we can drop this
458	default:
459		// lolwat - channel0 only responds to methods and heartbeats
460		c.closeWith(ErrUnexpectedFrame)
461	}
462}
463
464func (c *Connection) dispatchN(f frame) {
465	c.m.Lock()
466	channel := c.channels[f.channel()]
467	c.m.Unlock()
468
469	if channel != nil {
470		channel.recv(channel, f)
471	} else {
472		c.dispatchClosed(f)
473	}
474}
475
476// section 2.3.7: "When a peer decides to close a channel or connection, it
477// sends a Close method.  The receiving peer MUST respond to a Close with a
478// Close-Ok, and then both parties can close their channel or connection.  Note
479// that if peers ignore Close, deadlock can happen when both peers send Close
480// at the same time."
481//
482// When we don't have a channel, so we must respond with close-ok on a close
483// method.  This can happen between a channel exception on an asynchronous
484// method like basic.publish and a synchronous close with channel.close.
485// In that case, we'll get both a channel.close and channel.close-ok in any
486// order.
487func (c *Connection) dispatchClosed(f frame) {
488	// Only consider method frames, drop content/header frames
489	if mf, ok := f.(*methodFrame); ok {
490		switch mf.Method.(type) {
491		case *channelClose:
492			c.send(&methodFrame{
493				ChannelId: f.channel(),
494				Method:    &channelCloseOk{},
495			})
496		case *channelCloseOk:
497			// we are already closed, so do nothing
498		default:
499			// unexpected method on closed channel
500			c.closeWith(ErrClosed)
501		}
502	}
503}
504
505// Reads each frame off the IO and hand off to the connection object that
506// will demux the streams and dispatch to one of the opened channels or
507// handle on channel 0 (the connection channel).
508func (c *Connection) reader(r io.Reader) {
509	buf := bufio.NewReader(r)
510	frames := &reader{buf}
511	conn, haveDeadliner := r.(readDeadliner)
512
513	for {
514		frame, err := frames.ReadFrame()
515
516		if err != nil {
517			c.shutdown(&Error{Code: FrameError, Reason: err.Error()})
518			return
519		}
520
521		c.demux(frame)
522
523		if haveDeadliner {
524			c.deadlines <- conn
525		}
526	}
527}
528
529// Ensures that at least one frame is being sent at the tuned interval with a
530// jitter tolerance of 1s
531func (c *Connection) heartbeater(interval time.Duration, done chan *Error) {
532	const maxServerHeartbeatsInFlight = 3
533
534	var sendTicks <-chan time.Time
535	if interval > 0 {
536		ticker := time.NewTicker(interval)
537		defer ticker.Stop()
538		sendTicks = ticker.C
539	}
540
541	lastSent := time.Now()
542
543	for {
544		select {
545		case at, stillSending := <-c.sends:
546			// When actively sending, depend on sent frames to reset server timer
547			if stillSending {
548				lastSent = at
549			} else {
550				return
551			}
552
553		case at := <-sendTicks:
554			// When idle, fill the space with a heartbeat frame
555			if at.Sub(lastSent) > interval-time.Second {
556				if err := c.send(&heartbeatFrame{}); err != nil {
557					// send heartbeats even after close/closeOk so we
558					// tick until the connection starts erroring
559					return
560				}
561			}
562
563		case conn := <-c.deadlines:
564			// When reading, reset our side of the deadline, if we've negotiated one with
565			// a deadline that covers at least 2 server heartbeats
566			if interval > 0 {
567				conn.SetReadDeadline(time.Now().Add(maxServerHeartbeatsInFlight * interval))
568			}
569
570		case <-done:
571			return
572		}
573	}
574}
575
576// Convenience method to inspect the Connection.Properties["capabilities"]
577// Table for server identified capabilities like "basic.ack" or
578// "confirm.select".
579func (c *Connection) isCapable(featureName string) bool {
580	capabilities, _ := c.Properties["capabilities"].(Table)
581	hasFeature, _ := capabilities[featureName].(bool)
582	return hasFeature
583}
584
585// allocateChannel records but does not open a new channel with a unique id.
586// This method is the initial part of the channel lifecycle and paired with
587// releaseChannel
588func (c *Connection) allocateChannel() (*Channel, error) {
589	c.m.Lock()
590	defer c.m.Unlock()
591
592	if c.isClosed() {
593		return nil, ErrClosed
594	}
595
596	id, ok := c.allocator.next()
597	if !ok {
598		return nil, ErrChannelMax
599	}
600
601	ch := newChannel(c, uint16(id))
602	c.channels[uint16(id)] = ch
603
604	return ch, nil
605}
606
607// releaseChannel removes a channel from the registry as the final part of the
608// channel lifecycle
609func (c *Connection) releaseChannel(id uint16) {
610	c.m.Lock()
611	defer c.m.Unlock()
612
613	delete(c.channels, id)
614	c.allocator.release(int(id))
615}
616
617// openChannel allocates and opens a channel, must be paired with closeChannel
618func (c *Connection) openChannel() (*Channel, error) {
619	ch, err := c.allocateChannel()
620	if err != nil {
621		return nil, err
622	}
623
624	if err := ch.open(); err != nil {
625		c.releaseChannel(ch.id)
626		return nil, err
627	}
628	return ch, nil
629}
630
631// closeChannel releases and initiates a shutdown of the channel.  All channel
632// closures should be initiated here for proper channel lifecycle management on
633// this connection.
634func (c *Connection) closeChannel(ch *Channel, e *Error) {
635	ch.shutdown(e)
636	c.releaseChannel(ch.id)
637}
638
639/*
640Channel opens a unique, concurrent server channel to process the bulk of AMQP
641messages.  Any error from methods on this receiver will render the receiver
642invalid and a new Channel should be opened.
643
644*/
645func (c *Connection) Channel() (*Channel, error) {
646	return c.openChannel()
647}
648
649func (c *Connection) call(req message, res ...message) error {
650	// Special case for when the protocol header frame is sent insted of a
651	// request method
652	if req != nil {
653		if err := c.send(&methodFrame{ChannelId: 0, Method: req}); err != nil {
654			return err
655		}
656	}
657
658	select {
659	case err, ok := <-c.errors:
660		if !ok {
661			return ErrClosed
662		}
663		return err
664
665	case msg := <-c.rpc:
666		// Try to match one of the result types
667		for _, try := range res {
668			if reflect.TypeOf(msg) == reflect.TypeOf(try) {
669				// *res = *msg
670				vres := reflect.ValueOf(try).Elem()
671				vmsg := reflect.ValueOf(msg).Elem()
672				vres.Set(vmsg)
673				return nil
674			}
675		}
676		return ErrCommandInvalid
677	}
678	// unreachable
679}
680
681//    Connection          = open-Connection *use-Connection close-Connection
682//    open-Connection     = C:protocol-header
683//                          S:START C:START-OK
684//                          *challenge
685//                          S:TUNE C:TUNE-OK
686//                          C:OPEN S:OPEN-OK
687//    challenge           = S:SECURE C:SECURE-OK
688//    use-Connection      = *channel
689//    close-Connection    = C:CLOSE S:CLOSE-OK
690//                        / S:CLOSE C:CLOSE-OK
691func (c *Connection) open(config Config) error {
692	if err := c.send(&protocolHeader{}); err != nil {
693		return err
694	}
695
696	return c.openStart(config)
697}
698
699func (c *Connection) openStart(config Config) error {
700	start := &connectionStart{}
701
702	if err := c.call(nil, start); err != nil {
703		return err
704	}
705
706	c.Major = int(start.VersionMajor)
707	c.Minor = int(start.VersionMinor)
708	c.Properties = Table(start.ServerProperties)
709	c.Locales = strings.Split(start.Locales, " ")
710
711	// eventually support challenge/response here by also responding to
712	// connectionSecure.
713	auth, ok := pickSASLMechanism(config.SASL, strings.Split(start.Mechanisms, " "))
714	if !ok {
715		return ErrSASL
716	}
717
718	// Save this mechanism off as the one we chose
719	c.Config.SASL = []Authentication{auth}
720
721	// Set the connection locale to client locale
722	c.Config.Locale = config.Locale
723
724	return c.openTune(config, auth)
725}
726
727func (c *Connection) openTune(config Config, auth Authentication) error {
728	if len(config.Properties) == 0 {
729		config.Properties = Table{
730			"product": defaultProduct,
731			"version": defaultVersion,
732		}
733	}
734
735	config.Properties["capabilities"] = Table{
736		"connection.blocked":     true,
737		"consumer_cancel_notify": true,
738	}
739
740	ok := &connectionStartOk{
741		ClientProperties: config.Properties,
742		Mechanism:        auth.Mechanism(),
743		Response:         auth.Response(),
744		Locale:           config.Locale,
745	}
746	tune := &connectionTune{}
747
748	if err := c.call(ok, tune); err != nil {
749		// per spec, a connection can only be closed when it has been opened
750		// so at this point, we know it's an auth error, but the socket
751		// was closed instead.  Return a meaningful error.
752		return ErrCredentials
753	}
754
755	// When the server and client both use default 0, then the max channel is
756	// only limited by uint16.
757	c.Config.ChannelMax = pick(config.ChannelMax, int(tune.ChannelMax))
758	if c.Config.ChannelMax == 0 {
759		c.Config.ChannelMax = defaultChannelMax
760	}
761	c.Config.ChannelMax = min(c.Config.ChannelMax, maxChannelMax)
762
763	// Frame size includes headers and end byte (len(payload)+8), even if
764	// this is less than FrameMinSize, use what the server sends because the
765	// alternative is to stop the handshake here.
766	c.Config.FrameSize = pick(config.FrameSize, int(tune.FrameMax))
767
768	// Save this off for resetDeadline()
769	c.Config.Heartbeat = time.Second * time.Duration(pick(
770		int(config.Heartbeat/time.Second),
771		int(tune.Heartbeat)))
772
773	// "The client should start sending heartbeats after receiving a
774	// Connection.Tune method"
775	go c.heartbeater(c.Config.Heartbeat, c.NotifyClose(make(chan *Error, 1)))
776
777	if err := c.send(&methodFrame{
778		ChannelId: 0,
779		Method: &connectionTuneOk{
780			ChannelMax: uint16(c.Config.ChannelMax),
781			FrameMax:   uint32(c.Config.FrameSize),
782			Heartbeat:  uint16(c.Config.Heartbeat / time.Second),
783		},
784	}); err != nil {
785		return err
786	}
787
788	return c.openVhost(config)
789}
790
791func (c *Connection) openVhost(config Config) error {
792	req := &connectionOpen{VirtualHost: config.Vhost}
793	res := &connectionOpenOk{}
794
795	if err := c.call(req, res); err != nil {
796		// Cannot be closed yet, but we know it's a vhost problem
797		return ErrVhost
798	}
799
800	c.Config.Vhost = config.Vhost
801
802	return c.openComplete()
803}
804
805// openComplete performs any final Connection initialization dependent on the
806// connection handshake and clears any state needed for TLS and AMQP handshaking.
807func (c *Connection) openComplete() error {
808	// We clear the deadlines and let the heartbeater reset the read deadline if requested.
809	// RabbitMQ uses TCP flow control at this point for pushback so Writes can
810	// intentionally block.
811	if deadliner, ok := c.conn.(interface {
812		SetDeadline(time.Time) error
813	}); ok {
814		_ = deadliner.SetDeadline(time.Time{})
815	}
816
817	c.allocator = newAllocator(1, c.Config.ChannelMax)
818	return nil
819}
820
821func max(a, b int) int {
822	if a > b {
823		return a
824	}
825	return b
826}
827
828func min(a, b int) int {
829	if a < b {
830		return a
831	}
832	return b
833}
834
835func pick(client, server int) int {
836	if client == 0 || server == 0 {
837		return max(client, server)
838	}
839	return min(client, server)
840}
841