1// Copyright (c) 2012, Sean Treadway, SoundCloud Ltd. 2// Use of this source code is governed by a BSD-style 3// license that can be found in the LICENSE file. 4// Source code and contact info at http://github.com/streadway/amqp 5 6package amqp 7 8import ( 9 "bufio" 10 "crypto/tls" 11 "io" 12 "net" 13 "reflect" 14 "strconv" 15 "strings" 16 "sync" 17 "sync/atomic" 18 "time" 19) 20 21const ( 22 maxChannelMax = (2 << 15) - 1 23 24 defaultHeartbeat = 10 * time.Second 25 defaultConnectionTimeout = 30 * time.Second 26 defaultProduct = "https://github.com/streadway/amqp" 27 defaultVersion = "β" 28 defaultChannelMax = maxChannelMax 29 defaultLocale = "en_US" 30) 31 32// Config is used in DialConfig and Open to specify the desired tuning 33// parameters used during a connection open handshake. The negotiated tuning 34// will be stored in the returned connection's Config field. 35type Config struct { 36 // The SASL mechanisms to try in the client request, and the successful 37 // mechanism used on the Connection object. 38 // If SASL is nil, PlainAuth from the URL is used. 39 SASL []Authentication 40 41 // Vhost specifies the namespace of permissions, exchanges, queues and 42 // bindings on the server. Dial sets this to the path parsed from the URL. 43 Vhost string 44 45 ChannelMax int // 0 max channels means 2^16 - 1 46 FrameSize int // 0 max bytes means unlimited 47 Heartbeat time.Duration // less than 1s uses the server's interval 48 49 // TLSClientConfig specifies the client configuration of the TLS connection 50 // when establishing a tls transport. 51 // If the URL uses an amqps scheme, then an empty tls.Config with the 52 // ServerName from the URL is used. 53 TLSClientConfig *tls.Config 54 55 // Properties is table of properties that the client advertises to the server. 56 // This is an optional setting - if the application does not set this, 57 // the underlying library will use a generic set of client properties. 58 Properties Table 59 60 // Connection locale that we expect to always be en_US 61 // Even though servers must return it as per the AMQP 0-9-1 spec, 62 // we are not aware of it being used other than to satisfy the spec requirements 63 Locale string 64 65 // Dial returns a net.Conn prepared for a TLS handshake with TSLClientConfig, 66 // then an AMQP connection handshake. 67 // If Dial is nil, net.DialTimeout with a 30s connection and 30s deadline is 68 // used during TLS and AMQP handshaking. 69 Dial func(network, addr string) (net.Conn, error) 70} 71 72// Connection manages the serialization and deserialization of frames from IO 73// and dispatches the frames to the appropriate channel. All RPC methods and 74// asyncronous Publishing, Delivery, Ack, Nack and Return messages are 75// multiplexed on this channel. There must always be active receivers for 76// every asynchronous message on this connection. 77type Connection struct { 78 destructor sync.Once // shutdown once 79 sendM sync.Mutex // conn writer mutex 80 m sync.Mutex // struct field mutex 81 82 conn io.ReadWriteCloser 83 84 rpc chan message 85 writer *writer 86 sends chan time.Time // timestamps of each frame sent 87 deadlines chan readDeadliner // heartbeater updates read deadlines 88 89 allocator *allocator // id generator valid after openTune 90 channels map[uint16]*Channel 91 92 noNotify bool // true when we will never notify again 93 closes []chan *Error 94 blocks []chan Blocking 95 96 errors chan *Error 97 98 Config Config // The negotiated Config after connection.open 99 100 Major int // Server's major version 101 Minor int // Server's minor version 102 Properties Table // Server properties 103 Locales []string // Server locales 104 105 closed int32 // Will be 1 if the connection is closed, 0 otherwise. Should only be accessed as atomic 106} 107 108type readDeadliner interface { 109 SetReadDeadline(time.Time) error 110} 111 112// defaultDial establishes a connection when config.Dial is not provided 113func defaultDial(network, addr string) (net.Conn, error) { 114 conn, err := net.DialTimeout(network, addr, defaultConnectionTimeout) 115 if err != nil { 116 return nil, err 117 } 118 119 // Heartbeating hasn't started yet, don't stall forever on a dead server. 120 // A deadline is set for TLS and AMQP handshaking. After AMQP is established, 121 // the deadline is cleared in openComplete. 122 if err := conn.SetDeadline(time.Now().Add(defaultConnectionTimeout)); err != nil { 123 return nil, err 124 } 125 126 return conn, nil 127} 128 129// Dial accepts a string in the AMQP URI format and returns a new Connection 130// over TCP using PlainAuth. Defaults to a server heartbeat interval of 10 131// seconds and sets the handshake deadline to 30 seconds. After handshake, 132// deadlines are cleared. 133// 134// Dial uses the zero value of tls.Config when it encounters an amqps:// 135// scheme. It is equivalent to calling DialTLS(amqp, nil). 136func Dial(url string) (*Connection, error) { 137 return DialConfig(url, Config{ 138 Heartbeat: defaultHeartbeat, 139 Locale: defaultLocale, 140 }) 141} 142 143// DialTLS accepts a string in the AMQP URI format and returns a new Connection 144// over TCP using PlainAuth. Defaults to a server heartbeat interval of 10 145// seconds and sets the initial read deadline to 30 seconds. 146// 147// DialTLS uses the provided tls.Config when encountering an amqps:// scheme. 148func DialTLS(url string, amqps *tls.Config) (*Connection, error) { 149 return DialConfig(url, Config{ 150 Heartbeat: defaultHeartbeat, 151 TLSClientConfig: amqps, 152 Locale: defaultLocale, 153 }) 154} 155 156// DialConfig accepts a string in the AMQP URI format and a configuration for 157// the transport and connection setup, returning a new Connection. Defaults to 158// a server heartbeat interval of 10 seconds and sets the initial read deadline 159// to 30 seconds. 160func DialConfig(url string, config Config) (*Connection, error) { 161 var err error 162 var conn net.Conn 163 164 uri, err := ParseURI(url) 165 if err != nil { 166 return nil, err 167 } 168 169 if config.SASL == nil { 170 config.SASL = []Authentication{uri.PlainAuth()} 171 } 172 173 if config.Vhost == "" { 174 config.Vhost = uri.Vhost 175 } 176 177 addr := net.JoinHostPort(uri.Host, strconv.FormatInt(int64(uri.Port), 10)) 178 179 dialer := config.Dial 180 if dialer == nil { 181 dialer = defaultDial 182 } 183 184 conn, err = dialer("tcp", addr) 185 if err != nil { 186 return nil, err 187 } 188 189 if uri.Scheme == "amqps" { 190 if config.TLSClientConfig == nil { 191 config.TLSClientConfig = new(tls.Config) 192 } 193 194 // If ServerName has not been specified in TLSClientConfig, 195 // set it to the URI host used for this connection. 196 if config.TLSClientConfig.ServerName == "" { 197 config.TLSClientConfig.ServerName = uri.Host 198 } 199 200 client := tls.Client(conn, config.TLSClientConfig) 201 if err := client.Handshake(); err != nil { 202 conn.Close() 203 return nil, err 204 } 205 206 conn = client 207 } 208 209 return Open(conn, config) 210} 211 212/* 213Open accepts an already established connection, or other io.ReadWriteCloser as 214a transport. Use this method if you have established a TLS connection or wish 215to use your own custom transport. 216 217*/ 218func Open(conn io.ReadWriteCloser, config Config) (*Connection, error) { 219 c := &Connection{ 220 conn: conn, 221 writer: &writer{bufio.NewWriter(conn)}, 222 channels: make(map[uint16]*Channel), 223 rpc: make(chan message), 224 sends: make(chan time.Time), 225 errors: make(chan *Error, 1), 226 deadlines: make(chan readDeadliner, 1), 227 } 228 go c.reader(conn) 229 return c, c.open(config) 230} 231 232/* 233LocalAddr returns the local TCP peer address, or ":0" (the zero value of net.TCPAddr) 234as a fallback default value if the underlying transport does not support LocalAddr(). 235*/ 236func (c *Connection) LocalAddr() net.Addr { 237 if conn, ok := c.conn.(interface { 238 LocalAddr() net.Addr 239 }); ok { 240 return conn.LocalAddr() 241 } 242 return &net.TCPAddr{} 243} 244 245// ConnectionState returns basic TLS details of the underlying transport. 246// Returns a zero value when the underlying connection does not implement 247// ConnectionState() tls.ConnectionState. 248func (c *Connection) ConnectionState() tls.ConnectionState { 249 if conn, ok := c.conn.(interface { 250 ConnectionState() tls.ConnectionState 251 }); ok { 252 return conn.ConnectionState() 253 } 254 return tls.ConnectionState{} 255} 256 257/* 258NotifyClose registers a listener for close events either initiated by an error 259accompaning a connection.close method or by a normal shutdown. 260 261On normal shutdowns, the chan will be closed. 262 263To reconnect after a transport or protocol error, register a listener here and 264re-run your setup process. 265 266*/ 267func (c *Connection) NotifyClose(receiver chan *Error) chan *Error { 268 c.m.Lock() 269 defer c.m.Unlock() 270 271 if c.noNotify { 272 close(receiver) 273 } else { 274 c.closes = append(c.closes, receiver) 275 } 276 277 return receiver 278} 279 280/* 281NotifyBlocked registers a listener for RabbitMQ specific TCP flow control 282method extensions connection.blocked and connection.unblocked. Flow control is 283active with a reason when Blocking.Blocked is true. When a Connection is 284blocked, all methods will block across all connections until server resources 285become free again. 286 287This optional extension is supported by the server when the 288"connection.blocked" server capability key is true. 289 290*/ 291func (c *Connection) NotifyBlocked(receiver chan Blocking) chan Blocking { 292 c.m.Lock() 293 defer c.m.Unlock() 294 295 if c.noNotify { 296 close(receiver) 297 } else { 298 c.blocks = append(c.blocks, receiver) 299 } 300 301 return receiver 302} 303 304/* 305Close requests and waits for the response to close the AMQP connection. 306 307It's advisable to use this message when publishing to ensure all kernel buffers 308have been flushed on the server and client before exiting. 309 310An error indicates that server may not have received this request to close but 311the connection should be treated as closed regardless. 312 313After returning from this call, all resources associated with this connection, 314including the underlying io, Channels, Notify listeners and Channel consumers 315will also be closed. 316*/ 317func (c *Connection) Close() error { 318 if c.isClosed() { 319 return ErrClosed 320 } 321 322 defer c.shutdown(nil) 323 return c.call( 324 &connectionClose{ 325 ReplyCode: replySuccess, 326 ReplyText: "kthxbai", 327 }, 328 &connectionCloseOk{}, 329 ) 330} 331 332func (c *Connection) closeWith(err *Error) error { 333 if c.isClosed() { 334 return ErrClosed 335 } 336 337 defer c.shutdown(err) 338 return c.call( 339 &connectionClose{ 340 ReplyCode: uint16(err.Code), 341 ReplyText: err.Reason, 342 }, 343 &connectionCloseOk{}, 344 ) 345} 346 347func (c *Connection) isClosed() bool { 348 return (atomic.LoadInt32(&c.closed) == 1) 349} 350 351func (c *Connection) send(f frame) error { 352 if c.isClosed() { 353 return ErrClosed 354 } 355 356 c.sendM.Lock() 357 err := c.writer.WriteFrame(f) 358 c.sendM.Unlock() 359 360 if err != nil { 361 // shutdown could be re-entrant from signaling notify chans 362 go c.shutdown(&Error{ 363 Code: FrameError, 364 Reason: err.Error(), 365 }) 366 } else { 367 // Broadcast we sent a frame, reducing heartbeats, only 368 // if there is something that can receive - like a non-reentrant 369 // call or if the heartbeater isn't running 370 select { 371 case c.sends <- time.Now(): 372 default: 373 } 374 } 375 376 return err 377} 378 379func (c *Connection) shutdown(err *Error) { 380 atomic.StoreInt32(&c.closed, 1) 381 382 c.destructor.Do(func() { 383 c.m.Lock() 384 defer c.m.Unlock() 385 386 if err != nil { 387 for _, c := range c.closes { 388 c <- err 389 } 390 } 391 392 if err != nil { 393 c.errors <- err 394 } 395 // Shutdown handler goroutine can still receive the result. 396 close(c.errors) 397 398 for _, c := range c.closes { 399 close(c) 400 } 401 402 for _, c := range c.blocks { 403 close(c) 404 } 405 406 // Shutdown the channel, but do not use closeChannel() as it calls 407 // releaseChannel() which requires the connection lock. 408 // 409 // Ranging over c.channels and calling releaseChannel() that mutates 410 // c.channels is racy - see commit 6063341 for an example. 411 for _, ch := range c.channels { 412 ch.shutdown(err) 413 } 414 415 c.conn.Close() 416 417 c.channels = map[uint16]*Channel{} 418 c.allocator = newAllocator(1, c.Config.ChannelMax) 419 c.noNotify = true 420 }) 421} 422 423// All methods sent to the connection channel should be synchronous so we 424// can handle them directly without a framing component 425func (c *Connection) demux(f frame) { 426 if f.channel() == 0 { 427 c.dispatch0(f) 428 } else { 429 c.dispatchN(f) 430 } 431} 432 433func (c *Connection) dispatch0(f frame) { 434 switch mf := f.(type) { 435 case *methodFrame: 436 switch m := mf.Method.(type) { 437 case *connectionClose: 438 // Send immediately as shutdown will close our side of the writer. 439 c.send(&methodFrame{ 440 ChannelId: 0, 441 Method: &connectionCloseOk{}, 442 }) 443 444 c.shutdown(newError(m.ReplyCode, m.ReplyText)) 445 case *connectionBlocked: 446 for _, c := range c.blocks { 447 c <- Blocking{Active: true, Reason: m.Reason} 448 } 449 case *connectionUnblocked: 450 for _, c := range c.blocks { 451 c <- Blocking{Active: false} 452 } 453 default: 454 c.rpc <- m 455 } 456 case *heartbeatFrame: 457 // kthx - all reads reset our deadline. so we can drop this 458 default: 459 // lolwat - channel0 only responds to methods and heartbeats 460 c.closeWith(ErrUnexpectedFrame) 461 } 462} 463 464func (c *Connection) dispatchN(f frame) { 465 c.m.Lock() 466 channel := c.channels[f.channel()] 467 c.m.Unlock() 468 469 if channel != nil { 470 channel.recv(channel, f) 471 } else { 472 c.dispatchClosed(f) 473 } 474} 475 476// section 2.3.7: "When a peer decides to close a channel or connection, it 477// sends a Close method. The receiving peer MUST respond to a Close with a 478// Close-Ok, and then both parties can close their channel or connection. Note 479// that if peers ignore Close, deadlock can happen when both peers send Close 480// at the same time." 481// 482// When we don't have a channel, so we must respond with close-ok on a close 483// method. This can happen between a channel exception on an asynchronous 484// method like basic.publish and a synchronous close with channel.close. 485// In that case, we'll get both a channel.close and channel.close-ok in any 486// order. 487func (c *Connection) dispatchClosed(f frame) { 488 // Only consider method frames, drop content/header frames 489 if mf, ok := f.(*methodFrame); ok { 490 switch mf.Method.(type) { 491 case *channelClose: 492 c.send(&methodFrame{ 493 ChannelId: f.channel(), 494 Method: &channelCloseOk{}, 495 }) 496 case *channelCloseOk: 497 // we are already closed, so do nothing 498 default: 499 // unexpected method on closed channel 500 c.closeWith(ErrClosed) 501 } 502 } 503} 504 505// Reads each frame off the IO and hand off to the connection object that 506// will demux the streams and dispatch to one of the opened channels or 507// handle on channel 0 (the connection channel). 508func (c *Connection) reader(r io.Reader) { 509 buf := bufio.NewReader(r) 510 frames := &reader{buf} 511 conn, haveDeadliner := r.(readDeadliner) 512 513 for { 514 frame, err := frames.ReadFrame() 515 516 if err != nil { 517 c.shutdown(&Error{Code: FrameError, Reason: err.Error()}) 518 return 519 } 520 521 c.demux(frame) 522 523 if haveDeadliner { 524 c.deadlines <- conn 525 } 526 } 527} 528 529// Ensures that at least one frame is being sent at the tuned interval with a 530// jitter tolerance of 1s 531func (c *Connection) heartbeater(interval time.Duration, done chan *Error) { 532 const maxServerHeartbeatsInFlight = 3 533 534 var sendTicks <-chan time.Time 535 if interval > 0 { 536 ticker := time.NewTicker(interval) 537 defer ticker.Stop() 538 sendTicks = ticker.C 539 } 540 541 lastSent := time.Now() 542 543 for { 544 select { 545 case at, stillSending := <-c.sends: 546 // When actively sending, depend on sent frames to reset server timer 547 if stillSending { 548 lastSent = at 549 } else { 550 return 551 } 552 553 case at := <-sendTicks: 554 // When idle, fill the space with a heartbeat frame 555 if at.Sub(lastSent) > interval-time.Second { 556 if err := c.send(&heartbeatFrame{}); err != nil { 557 // send heartbeats even after close/closeOk so we 558 // tick until the connection starts erroring 559 return 560 } 561 } 562 563 case conn := <-c.deadlines: 564 // When reading, reset our side of the deadline, if we've negotiated one with 565 // a deadline that covers at least 2 server heartbeats 566 if interval > 0 { 567 conn.SetReadDeadline(time.Now().Add(maxServerHeartbeatsInFlight * interval)) 568 } 569 570 case <-done: 571 return 572 } 573 } 574} 575 576// Convenience method to inspect the Connection.Properties["capabilities"] 577// Table for server identified capabilities like "basic.ack" or 578// "confirm.select". 579func (c *Connection) isCapable(featureName string) bool { 580 capabilities, _ := c.Properties["capabilities"].(Table) 581 hasFeature, _ := capabilities[featureName].(bool) 582 return hasFeature 583} 584 585// allocateChannel records but does not open a new channel with a unique id. 586// This method is the initial part of the channel lifecycle and paired with 587// releaseChannel 588func (c *Connection) allocateChannel() (*Channel, error) { 589 c.m.Lock() 590 defer c.m.Unlock() 591 592 if c.isClosed() { 593 return nil, ErrClosed 594 } 595 596 id, ok := c.allocator.next() 597 if !ok { 598 return nil, ErrChannelMax 599 } 600 601 ch := newChannel(c, uint16(id)) 602 c.channels[uint16(id)] = ch 603 604 return ch, nil 605} 606 607// releaseChannel removes a channel from the registry as the final part of the 608// channel lifecycle 609func (c *Connection) releaseChannel(id uint16) { 610 c.m.Lock() 611 defer c.m.Unlock() 612 613 delete(c.channels, id) 614 c.allocator.release(int(id)) 615} 616 617// openChannel allocates and opens a channel, must be paired with closeChannel 618func (c *Connection) openChannel() (*Channel, error) { 619 ch, err := c.allocateChannel() 620 if err != nil { 621 return nil, err 622 } 623 624 if err := ch.open(); err != nil { 625 c.releaseChannel(ch.id) 626 return nil, err 627 } 628 return ch, nil 629} 630 631// closeChannel releases and initiates a shutdown of the channel. All channel 632// closures should be initiated here for proper channel lifecycle management on 633// this connection. 634func (c *Connection) closeChannel(ch *Channel, e *Error) { 635 ch.shutdown(e) 636 c.releaseChannel(ch.id) 637} 638 639/* 640Channel opens a unique, concurrent server channel to process the bulk of AMQP 641messages. Any error from methods on this receiver will render the receiver 642invalid and a new Channel should be opened. 643 644*/ 645func (c *Connection) Channel() (*Channel, error) { 646 return c.openChannel() 647} 648 649func (c *Connection) call(req message, res ...message) error { 650 // Special case for when the protocol header frame is sent insted of a 651 // request method 652 if req != nil { 653 if err := c.send(&methodFrame{ChannelId: 0, Method: req}); err != nil { 654 return err 655 } 656 } 657 658 select { 659 case err, ok := <-c.errors: 660 if !ok { 661 return ErrClosed 662 } 663 return err 664 665 case msg := <-c.rpc: 666 // Try to match one of the result types 667 for _, try := range res { 668 if reflect.TypeOf(msg) == reflect.TypeOf(try) { 669 // *res = *msg 670 vres := reflect.ValueOf(try).Elem() 671 vmsg := reflect.ValueOf(msg).Elem() 672 vres.Set(vmsg) 673 return nil 674 } 675 } 676 return ErrCommandInvalid 677 } 678 // unreachable 679} 680 681// Connection = open-Connection *use-Connection close-Connection 682// open-Connection = C:protocol-header 683// S:START C:START-OK 684// *challenge 685// S:TUNE C:TUNE-OK 686// C:OPEN S:OPEN-OK 687// challenge = S:SECURE C:SECURE-OK 688// use-Connection = *channel 689// close-Connection = C:CLOSE S:CLOSE-OK 690// / S:CLOSE C:CLOSE-OK 691func (c *Connection) open(config Config) error { 692 if err := c.send(&protocolHeader{}); err != nil { 693 return err 694 } 695 696 return c.openStart(config) 697} 698 699func (c *Connection) openStart(config Config) error { 700 start := &connectionStart{} 701 702 if err := c.call(nil, start); err != nil { 703 return err 704 } 705 706 c.Major = int(start.VersionMajor) 707 c.Minor = int(start.VersionMinor) 708 c.Properties = Table(start.ServerProperties) 709 c.Locales = strings.Split(start.Locales, " ") 710 711 // eventually support challenge/response here by also responding to 712 // connectionSecure. 713 auth, ok := pickSASLMechanism(config.SASL, strings.Split(start.Mechanisms, " ")) 714 if !ok { 715 return ErrSASL 716 } 717 718 // Save this mechanism off as the one we chose 719 c.Config.SASL = []Authentication{auth} 720 721 // Set the connection locale to client locale 722 c.Config.Locale = config.Locale 723 724 return c.openTune(config, auth) 725} 726 727func (c *Connection) openTune(config Config, auth Authentication) error { 728 if len(config.Properties) == 0 { 729 config.Properties = Table{ 730 "product": defaultProduct, 731 "version": defaultVersion, 732 } 733 } 734 735 config.Properties["capabilities"] = Table{ 736 "connection.blocked": true, 737 "consumer_cancel_notify": true, 738 } 739 740 ok := &connectionStartOk{ 741 ClientProperties: config.Properties, 742 Mechanism: auth.Mechanism(), 743 Response: auth.Response(), 744 Locale: config.Locale, 745 } 746 tune := &connectionTune{} 747 748 if err := c.call(ok, tune); err != nil { 749 // per spec, a connection can only be closed when it has been opened 750 // so at this point, we know it's an auth error, but the socket 751 // was closed instead. Return a meaningful error. 752 return ErrCredentials 753 } 754 755 // When the server and client both use default 0, then the max channel is 756 // only limited by uint16. 757 c.Config.ChannelMax = pick(config.ChannelMax, int(tune.ChannelMax)) 758 if c.Config.ChannelMax == 0 { 759 c.Config.ChannelMax = defaultChannelMax 760 } 761 c.Config.ChannelMax = min(c.Config.ChannelMax, maxChannelMax) 762 763 // Frame size includes headers and end byte (len(payload)+8), even if 764 // this is less than FrameMinSize, use what the server sends because the 765 // alternative is to stop the handshake here. 766 c.Config.FrameSize = pick(config.FrameSize, int(tune.FrameMax)) 767 768 // Save this off for resetDeadline() 769 c.Config.Heartbeat = time.Second * time.Duration(pick( 770 int(config.Heartbeat/time.Second), 771 int(tune.Heartbeat))) 772 773 // "The client should start sending heartbeats after receiving a 774 // Connection.Tune method" 775 go c.heartbeater(c.Config.Heartbeat, c.NotifyClose(make(chan *Error, 1))) 776 777 if err := c.send(&methodFrame{ 778 ChannelId: 0, 779 Method: &connectionTuneOk{ 780 ChannelMax: uint16(c.Config.ChannelMax), 781 FrameMax: uint32(c.Config.FrameSize), 782 Heartbeat: uint16(c.Config.Heartbeat / time.Second), 783 }, 784 }); err != nil { 785 return err 786 } 787 788 return c.openVhost(config) 789} 790 791func (c *Connection) openVhost(config Config) error { 792 req := &connectionOpen{VirtualHost: config.Vhost} 793 res := &connectionOpenOk{} 794 795 if err := c.call(req, res); err != nil { 796 // Cannot be closed yet, but we know it's a vhost problem 797 return ErrVhost 798 } 799 800 c.Config.Vhost = config.Vhost 801 802 return c.openComplete() 803} 804 805// openComplete performs any final Connection initialization dependent on the 806// connection handshake and clears any state needed for TLS and AMQP handshaking. 807func (c *Connection) openComplete() error { 808 // We clear the deadlines and let the heartbeater reset the read deadline if requested. 809 // RabbitMQ uses TCP flow control at this point for pushback so Writes can 810 // intentionally block. 811 if deadliner, ok := c.conn.(interface { 812 SetDeadline(time.Time) error 813 }); ok { 814 _ = deadliner.SetDeadline(time.Time{}) 815 } 816 817 c.allocator = newAllocator(1, c.Config.ChannelMax) 818 return nil 819} 820 821func max(a, b int) int { 822 if a > b { 823 return a 824 } 825 return b 826} 827 828func min(a, b int) int { 829 if a < b { 830 return a 831 } 832 return b 833} 834 835func pick(client, server int) int { 836 if client == 0 || server == 0 { 837 return max(client, server) 838 } 839 return min(client, server) 840} 841