1package sarama 2 3import ( 4 "crypto/tls" 5 "encoding/binary" 6 "fmt" 7 "io" 8 "net" 9 "strconv" 10 "sync" 11 "sync/atomic" 12 "time" 13 14 "github.com/rcrowley/go-metrics" 15) 16 17// Broker represents a single Kafka broker connection. All operations on this object are entirely concurrency-safe. 18type Broker struct { 19 id int32 20 addr string 21 rack *string 22 23 conf *Config 24 correlationID int32 25 conn net.Conn 26 connErr error 27 lock sync.Mutex 28 opened int32 29 30 responses chan responsePromise 31 done chan bool 32 33 incomingByteRate metrics.Meter 34 requestRate metrics.Meter 35 requestSize metrics.Histogram 36 requestLatency metrics.Histogram 37 outgoingByteRate metrics.Meter 38 responseRate metrics.Meter 39 responseSize metrics.Histogram 40 brokerIncomingByteRate metrics.Meter 41 brokerRequestRate metrics.Meter 42 brokerRequestSize metrics.Histogram 43 brokerRequestLatency metrics.Histogram 44 brokerOutgoingByteRate metrics.Meter 45 brokerResponseRate metrics.Meter 46 brokerResponseSize metrics.Histogram 47} 48 49type responsePromise struct { 50 requestTime time.Time 51 correlationID int32 52 packets chan []byte 53 errors chan error 54} 55 56// NewBroker creates and returns a Broker targeting the given host:port address. 57// This does not attempt to actually connect, you have to call Open() for that. 58func NewBroker(addr string) *Broker { 59 return &Broker{id: -1, addr: addr} 60} 61 62// Open tries to connect to the Broker if it is not already connected or connecting, but does not block 63// waiting for the connection to complete. This means that any subsequent operations on the broker will 64// block waiting for the connection to succeed or fail. To get the effect of a fully synchronous Open call, 65// follow it by a call to Connected(). The only errors Open will return directly are ConfigurationError or 66// AlreadyConnected. If conf is nil, the result of NewConfig() is used. 67func (b *Broker) Open(conf *Config) error { 68 if !atomic.CompareAndSwapInt32(&b.opened, 0, 1) { 69 return ErrAlreadyConnected 70 } 71 72 if conf == nil { 73 conf = NewConfig() 74 } 75 76 err := conf.Validate() 77 if err != nil { 78 return err 79 } 80 81 b.lock.Lock() 82 83 go withRecover(func() { 84 defer b.lock.Unlock() 85 86 dialer := net.Dialer{ 87 Timeout: conf.Net.DialTimeout, 88 KeepAlive: conf.Net.KeepAlive, 89 LocalAddr: conf.Net.LocalAddr, 90 } 91 92 if conf.Net.TLS.Enable { 93 b.conn, b.connErr = tls.DialWithDialer(&dialer, "tcp", b.addr, conf.Net.TLS.Config) 94 } else { 95 b.conn, b.connErr = dialer.Dial("tcp", b.addr) 96 } 97 if b.connErr != nil { 98 Logger.Printf("Failed to connect to broker %s: %s\n", b.addr, b.connErr) 99 b.conn = nil 100 atomic.StoreInt32(&b.opened, 0) 101 return 102 } 103 b.conn = newBufConn(b.conn) 104 105 b.conf = conf 106 107 // Create or reuse the global metrics shared between brokers 108 b.incomingByteRate = metrics.GetOrRegisterMeter("incoming-byte-rate", conf.MetricRegistry) 109 b.requestRate = metrics.GetOrRegisterMeter("request-rate", conf.MetricRegistry) 110 b.requestSize = getOrRegisterHistogram("request-size", conf.MetricRegistry) 111 b.requestLatency = getOrRegisterHistogram("request-latency-in-ms", conf.MetricRegistry) 112 b.outgoingByteRate = metrics.GetOrRegisterMeter("outgoing-byte-rate", conf.MetricRegistry) 113 b.responseRate = metrics.GetOrRegisterMeter("response-rate", conf.MetricRegistry) 114 b.responseSize = getOrRegisterHistogram("response-size", conf.MetricRegistry) 115 // Do not gather metrics for seeded broker (only used during bootstrap) because they share 116 // the same id (-1) and are already exposed through the global metrics above 117 if b.id >= 0 { 118 b.brokerIncomingByteRate = getOrRegisterBrokerMeter("incoming-byte-rate", b, conf.MetricRegistry) 119 b.brokerRequestRate = getOrRegisterBrokerMeter("request-rate", b, conf.MetricRegistry) 120 b.brokerRequestSize = getOrRegisterBrokerHistogram("request-size", b, conf.MetricRegistry) 121 b.brokerRequestLatency = getOrRegisterBrokerHistogram("request-latency-in-ms", b, conf.MetricRegistry) 122 b.brokerOutgoingByteRate = getOrRegisterBrokerMeter("outgoing-byte-rate", b, conf.MetricRegistry) 123 b.brokerResponseRate = getOrRegisterBrokerMeter("response-rate", b, conf.MetricRegistry) 124 b.brokerResponseSize = getOrRegisterBrokerHistogram("response-size", b, conf.MetricRegistry) 125 } 126 127 if conf.Net.SASL.Enable { 128 b.connErr = b.sendAndReceiveSASLPlainAuth() 129 if b.connErr != nil { 130 err = b.conn.Close() 131 if err == nil { 132 Logger.Printf("Closed connection to broker %s\n", b.addr) 133 } else { 134 Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err) 135 } 136 b.conn = nil 137 atomic.StoreInt32(&b.opened, 0) 138 return 139 } 140 } 141 142 b.done = make(chan bool) 143 b.responses = make(chan responsePromise, b.conf.Net.MaxOpenRequests-1) 144 145 if b.id >= 0 { 146 Logger.Printf("Connected to broker at %s (registered as #%d)\n", b.addr, b.id) 147 } else { 148 Logger.Printf("Connected to broker at %s (unregistered)\n", b.addr) 149 } 150 go withRecover(b.responseReceiver) 151 }) 152 153 return nil 154} 155 156// Connected returns true if the broker is connected and false otherwise. If the broker is not 157// connected but it had tried to connect, the error from that connection attempt is also returned. 158func (b *Broker) Connected() (bool, error) { 159 b.lock.Lock() 160 defer b.lock.Unlock() 161 162 return b.conn != nil, b.connErr 163} 164 165func (b *Broker) Close() error { 166 b.lock.Lock() 167 defer b.lock.Unlock() 168 169 if b.conn == nil { 170 return ErrNotConnected 171 } 172 173 close(b.responses) 174 <-b.done 175 176 err := b.conn.Close() 177 178 b.conn = nil 179 b.connErr = nil 180 b.done = nil 181 b.responses = nil 182 183 if b.id >= 0 { 184 b.conf.MetricRegistry.Unregister(getMetricNameForBroker("incoming-byte-rate", b)) 185 b.conf.MetricRegistry.Unregister(getMetricNameForBroker("request-rate", b)) 186 b.conf.MetricRegistry.Unregister(getMetricNameForBroker("outgoing-byte-rate", b)) 187 b.conf.MetricRegistry.Unregister(getMetricNameForBroker("response-rate", b)) 188 } 189 190 if err == nil { 191 Logger.Printf("Closed connection to broker %s\n", b.addr) 192 } else { 193 Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err) 194 } 195 196 atomic.StoreInt32(&b.opened, 0) 197 198 return err 199} 200 201// ID returns the broker ID retrieved from Kafka's metadata, or -1 if that is not known. 202func (b *Broker) ID() int32 { 203 return b.id 204} 205 206// Addr returns the broker address as either retrieved from Kafka's metadata or passed to NewBroker. 207func (b *Broker) Addr() string { 208 return b.addr 209} 210 211func (b *Broker) GetMetadata(request *MetadataRequest) (*MetadataResponse, error) { 212 response := new(MetadataResponse) 213 214 err := b.sendAndReceive(request, response) 215 216 if err != nil { 217 return nil, err 218 } 219 220 return response, nil 221} 222 223func (b *Broker) GetConsumerMetadata(request *ConsumerMetadataRequest) (*ConsumerMetadataResponse, error) { 224 response := new(ConsumerMetadataResponse) 225 226 err := b.sendAndReceive(request, response) 227 228 if err != nil { 229 return nil, err 230 } 231 232 return response, nil 233} 234 235func (b *Broker) FindCoordinator(request *FindCoordinatorRequest) (*FindCoordinatorResponse, error) { 236 response := new(FindCoordinatorResponse) 237 238 err := b.sendAndReceive(request, response) 239 240 if err != nil { 241 return nil, err 242 } 243 244 return response, nil 245} 246 247func (b *Broker) GetAvailableOffsets(request *OffsetRequest) (*OffsetResponse, error) { 248 response := new(OffsetResponse) 249 250 err := b.sendAndReceive(request, response) 251 252 if err != nil { 253 return nil, err 254 } 255 256 return response, nil 257} 258 259func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error) { 260 var response *ProduceResponse 261 var err error 262 263 if request.RequiredAcks == NoResponse { 264 err = b.sendAndReceive(request, nil) 265 } else { 266 response = new(ProduceResponse) 267 err = b.sendAndReceive(request, response) 268 } 269 270 if err != nil { 271 return nil, err 272 } 273 274 return response, nil 275} 276 277func (b *Broker) Fetch(request *FetchRequest) (*FetchResponse, error) { 278 response := new(FetchResponse) 279 280 err := b.sendAndReceive(request, response) 281 282 if err != nil { 283 return nil, err 284 } 285 286 return response, nil 287} 288 289func (b *Broker) CommitOffset(request *OffsetCommitRequest) (*OffsetCommitResponse, error) { 290 response := new(OffsetCommitResponse) 291 292 err := b.sendAndReceive(request, response) 293 294 if err != nil { 295 return nil, err 296 } 297 298 return response, nil 299} 300 301func (b *Broker) FetchOffset(request *OffsetFetchRequest) (*OffsetFetchResponse, error) { 302 response := new(OffsetFetchResponse) 303 304 err := b.sendAndReceive(request, response) 305 306 if err != nil { 307 return nil, err 308 } 309 310 return response, nil 311} 312 313func (b *Broker) JoinGroup(request *JoinGroupRequest) (*JoinGroupResponse, error) { 314 response := new(JoinGroupResponse) 315 316 err := b.sendAndReceive(request, response) 317 if err != nil { 318 return nil, err 319 } 320 321 return response, nil 322} 323 324func (b *Broker) SyncGroup(request *SyncGroupRequest) (*SyncGroupResponse, error) { 325 response := new(SyncGroupResponse) 326 327 err := b.sendAndReceive(request, response) 328 if err != nil { 329 return nil, err 330 } 331 332 return response, nil 333} 334 335func (b *Broker) LeaveGroup(request *LeaveGroupRequest) (*LeaveGroupResponse, error) { 336 response := new(LeaveGroupResponse) 337 338 err := b.sendAndReceive(request, response) 339 if err != nil { 340 return nil, err 341 } 342 343 return response, nil 344} 345 346func (b *Broker) Heartbeat(request *HeartbeatRequest) (*HeartbeatResponse, error) { 347 response := new(HeartbeatResponse) 348 349 err := b.sendAndReceive(request, response) 350 if err != nil { 351 return nil, err 352 } 353 354 return response, nil 355} 356 357func (b *Broker) ListGroups(request *ListGroupsRequest) (*ListGroupsResponse, error) { 358 response := new(ListGroupsResponse) 359 360 err := b.sendAndReceive(request, response) 361 if err != nil { 362 return nil, err 363 } 364 365 return response, nil 366} 367 368func (b *Broker) DescribeGroups(request *DescribeGroupsRequest) (*DescribeGroupsResponse, error) { 369 response := new(DescribeGroupsResponse) 370 371 err := b.sendAndReceive(request, response) 372 if err != nil { 373 return nil, err 374 } 375 376 return response, nil 377} 378 379func (b *Broker) ApiVersions(request *ApiVersionsRequest) (*ApiVersionsResponse, error) { 380 response := new(ApiVersionsResponse) 381 382 err := b.sendAndReceive(request, response) 383 if err != nil { 384 return nil, err 385 } 386 387 return response, nil 388} 389 390func (b *Broker) CreateTopics(request *CreateTopicsRequest) (*CreateTopicsResponse, error) { 391 response := new(CreateTopicsResponse) 392 393 err := b.sendAndReceive(request, response) 394 if err != nil { 395 return nil, err 396 } 397 398 return response, nil 399} 400 401func (b *Broker) DeleteTopics(request *DeleteTopicsRequest) (*DeleteTopicsResponse, error) { 402 response := new(DeleteTopicsResponse) 403 404 err := b.sendAndReceive(request, response) 405 if err != nil { 406 return nil, err 407 } 408 409 return response, nil 410} 411 412func (b *Broker) CreatePartitions(request *CreatePartitionsRequest) (*CreatePartitionsResponse, error) { 413 response := new(CreatePartitionsResponse) 414 415 err := b.sendAndReceive(request, response) 416 if err != nil { 417 return nil, err 418 } 419 420 return response, nil 421} 422 423func (b *Broker) DeleteRecords(request *DeleteRecordsRequest) (*DeleteRecordsResponse, error) { 424 response := new(DeleteRecordsResponse) 425 426 err := b.sendAndReceive(request, response) 427 if err != nil { 428 return nil, err 429 } 430 431 return response, nil 432} 433 434func (b *Broker) DescribeAcls(request *DescribeAclsRequest) (*DescribeAclsResponse, error) { 435 response := new(DescribeAclsResponse) 436 437 err := b.sendAndReceive(request, response) 438 if err != nil { 439 return nil, err 440 } 441 442 return response, nil 443} 444 445func (b *Broker) CreateAcls(request *CreateAclsRequest) (*CreateAclsResponse, error) { 446 response := new(CreateAclsResponse) 447 448 err := b.sendAndReceive(request, response) 449 if err != nil { 450 return nil, err 451 } 452 453 return response, nil 454} 455 456func (b *Broker) DeleteAcls(request *DeleteAclsRequest) (*DeleteAclsResponse, error) { 457 response := new(DeleteAclsResponse) 458 459 err := b.sendAndReceive(request, response) 460 if err != nil { 461 return nil, err 462 } 463 464 return response, nil 465} 466 467func (b *Broker) InitProducerID(request *InitProducerIDRequest) (*InitProducerIDResponse, error) { 468 response := new(InitProducerIDResponse) 469 470 err := b.sendAndReceive(request, response) 471 if err != nil { 472 return nil, err 473 } 474 475 return response, nil 476} 477 478func (b *Broker) AddPartitionsToTxn(request *AddPartitionsToTxnRequest) (*AddPartitionsToTxnResponse, error) { 479 response := new(AddPartitionsToTxnResponse) 480 481 err := b.sendAndReceive(request, response) 482 if err != nil { 483 return nil, err 484 } 485 486 return response, nil 487} 488 489func (b *Broker) AddOffsetsToTxn(request *AddOffsetsToTxnRequest) (*AddOffsetsToTxnResponse, error) { 490 response := new(AddOffsetsToTxnResponse) 491 492 err := b.sendAndReceive(request, response) 493 if err != nil { 494 return nil, err 495 } 496 497 return response, nil 498} 499 500func (b *Broker) EndTxn(request *EndTxnRequest) (*EndTxnResponse, error) { 501 response := new(EndTxnResponse) 502 503 err := b.sendAndReceive(request, response) 504 if err != nil { 505 return nil, err 506 } 507 508 return response, nil 509} 510 511func (b *Broker) TxnOffsetCommit(request *TxnOffsetCommitRequest) (*TxnOffsetCommitResponse, error) { 512 response := new(TxnOffsetCommitResponse) 513 514 err := b.sendAndReceive(request, response) 515 if err != nil { 516 return nil, err 517 } 518 519 return response, nil 520} 521 522func (b *Broker) DescribeConfigs(request *DescribeConfigsRequest) (*DescribeConfigsResponse, error) { 523 response := new(DescribeConfigsResponse) 524 525 err := b.sendAndReceive(request, response) 526 if err != nil { 527 return nil, err 528 } 529 530 return response, nil 531} 532 533func (b *Broker) AlterConfigs(request *AlterConfigsRequest) (*AlterConfigsResponse, error) { 534 response := new(AlterConfigsResponse) 535 536 err := b.sendAndReceive(request, response) 537 if err != nil { 538 return nil, err 539 } 540 541 return response, nil 542} 543 544func (b *Broker) DeleteGroups(request *DeleteGroupsRequest) (*DeleteGroupsResponse, error) { 545 response := new(DeleteGroupsResponse) 546 547 if err := b.sendAndReceive(request, response); err != nil { 548 return nil, err 549 } 550 551 return response, nil 552} 553 554func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise, error) { 555 b.lock.Lock() 556 defer b.lock.Unlock() 557 558 if b.conn == nil { 559 if b.connErr != nil { 560 return nil, b.connErr 561 } 562 return nil, ErrNotConnected 563 } 564 565 if !b.conf.Version.IsAtLeast(rb.requiredVersion()) { 566 return nil, ErrUnsupportedVersion 567 } 568 569 req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb} 570 buf, err := encode(req, b.conf.MetricRegistry) 571 if err != nil { 572 return nil, err 573 } 574 575 err = b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout)) 576 if err != nil { 577 return nil, err 578 } 579 580 requestTime := time.Now() 581 bytes, err := b.conn.Write(buf) 582 b.updateOutgoingCommunicationMetrics(bytes) 583 if err != nil { 584 return nil, err 585 } 586 b.correlationID++ 587 588 if !promiseResponse { 589 // Record request latency without the response 590 b.updateRequestLatencyMetrics(time.Since(requestTime)) 591 return nil, nil 592 } 593 594 promise := responsePromise{requestTime, req.correlationID, make(chan []byte), make(chan error)} 595 b.responses <- promise 596 597 return &promise, nil 598} 599 600func (b *Broker) sendAndReceive(req protocolBody, res versionedDecoder) error { 601 promise, err := b.send(req, res != nil) 602 603 if err != nil { 604 return err 605 } 606 607 if promise == nil { 608 return nil 609 } 610 611 select { 612 case buf := <-promise.packets: 613 return versionedDecode(buf, res, req.version()) 614 case err = <-promise.errors: 615 return err 616 } 617} 618 619func (b *Broker) decode(pd packetDecoder, version int16) (err error) { 620 b.id, err = pd.getInt32() 621 if err != nil { 622 return err 623 } 624 625 host, err := pd.getString() 626 if err != nil { 627 return err 628 } 629 630 port, err := pd.getInt32() 631 if err != nil { 632 return err 633 } 634 635 if version >= 1 { 636 b.rack, err = pd.getNullableString() 637 if err != nil { 638 return err 639 } 640 } 641 642 b.addr = net.JoinHostPort(host, fmt.Sprint(port)) 643 if _, _, err := net.SplitHostPort(b.addr); err != nil { 644 return err 645 } 646 647 return nil 648} 649 650func (b *Broker) encode(pe packetEncoder, version int16) (err error) { 651 652 host, portstr, err := net.SplitHostPort(b.addr) 653 if err != nil { 654 return err 655 } 656 port, err := strconv.Atoi(portstr) 657 if err != nil { 658 return err 659 } 660 661 pe.putInt32(b.id) 662 663 err = pe.putString(host) 664 if err != nil { 665 return err 666 } 667 668 pe.putInt32(int32(port)) 669 670 if version >= 1 { 671 err = pe.putNullableString(b.rack) 672 if err != nil { 673 return err 674 } 675 } 676 677 return nil 678} 679 680func (b *Broker) responseReceiver() { 681 var dead error 682 header := make([]byte, 8) 683 for response := range b.responses { 684 if dead != nil { 685 response.errors <- dead 686 continue 687 } 688 689 err := b.conn.SetReadDeadline(time.Now().Add(b.conf.Net.ReadTimeout)) 690 if err != nil { 691 dead = err 692 response.errors <- err 693 continue 694 } 695 696 bytesReadHeader, err := io.ReadFull(b.conn, header) 697 requestLatency := time.Since(response.requestTime) 698 if err != nil { 699 b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency) 700 dead = err 701 response.errors <- err 702 continue 703 } 704 705 decodedHeader := responseHeader{} 706 err = decode(header, &decodedHeader) 707 if err != nil { 708 b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency) 709 dead = err 710 response.errors <- err 711 continue 712 } 713 if decodedHeader.correlationID != response.correlationID { 714 b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency) 715 // TODO if decoded ID < cur ID, discard until we catch up 716 // TODO if decoded ID > cur ID, save it so when cur ID catches up we have a response 717 dead = PacketDecodingError{fmt.Sprintf("correlation ID didn't match, wanted %d, got %d", response.correlationID, decodedHeader.correlationID)} 718 response.errors <- dead 719 continue 720 } 721 722 buf := make([]byte, decodedHeader.length-4) 723 bytesReadBody, err := io.ReadFull(b.conn, buf) 724 b.updateIncomingCommunicationMetrics(bytesReadHeader+bytesReadBody, requestLatency) 725 if err != nil { 726 dead = err 727 response.errors <- err 728 continue 729 } 730 731 response.packets <- buf 732 } 733 close(b.done) 734} 735 736func (b *Broker) sendAndReceiveSASLPlainHandshake() error { 737 rb := &SaslHandshakeRequest{"PLAIN"} 738 req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb} 739 buf, err := encode(req, b.conf.MetricRegistry) 740 if err != nil { 741 return err 742 } 743 744 err = b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout)) 745 if err != nil { 746 return err 747 } 748 749 requestTime := time.Now() 750 bytes, err := b.conn.Write(buf) 751 b.updateOutgoingCommunicationMetrics(bytes) 752 if err != nil { 753 Logger.Printf("Failed to send SASL handshake %s: %s\n", b.addr, err.Error()) 754 return err 755 } 756 b.correlationID++ 757 //wait for the response 758 header := make([]byte, 8) // response header 759 _, err = io.ReadFull(b.conn, header) 760 if err != nil { 761 Logger.Printf("Failed to read SASL handshake header : %s\n", err.Error()) 762 return err 763 } 764 length := binary.BigEndian.Uint32(header[:4]) 765 payload := make([]byte, length-4) 766 n, err := io.ReadFull(b.conn, payload) 767 if err != nil { 768 Logger.Printf("Failed to read SASL handshake payload : %s\n", err.Error()) 769 return err 770 } 771 b.updateIncomingCommunicationMetrics(n+8, time.Since(requestTime)) 772 res := &SaslHandshakeResponse{} 773 err = versionedDecode(payload, res, 0) 774 if err != nil { 775 Logger.Printf("Failed to parse SASL handshake : %s\n", err.Error()) 776 return err 777 } 778 if res.Err != ErrNoError { 779 Logger.Printf("Invalid SASL Mechanism : %s\n", res.Err.Error()) 780 return res.Err 781 } 782 Logger.Print("Successful SASL handshake") 783 return nil 784} 785 786// Kafka 0.10.0 plans to support SASL Plain and Kerberos as per PR #812 (KIP-43)/(JIRA KAFKA-3149) 787// Some hosted kafka services such as IBM Message Hub already offer SASL/PLAIN auth with Kafka 0.9 788// 789// In SASL Plain, Kafka expects the auth header to be in the following format 790// Message format (from https://tools.ietf.org/html/rfc4616): 791// 792// message = [authzid] UTF8NUL authcid UTF8NUL passwd 793// authcid = 1*SAFE ; MUST accept up to 255 octets 794// authzid = 1*SAFE ; MUST accept up to 255 octets 795// passwd = 1*SAFE ; MUST accept up to 255 octets 796// UTF8NUL = %x00 ; UTF-8 encoded NUL character 797// 798// SAFE = UTF1 / UTF2 / UTF3 / UTF4 799// ;; any UTF-8 encoded Unicode character except NUL 800// 801// When credentials are valid, Kafka returns a 4 byte array of null characters. 802// When credentials are invalid, Kafka closes the connection. This does not seem to be the ideal way 803// of responding to bad credentials but thats how its being done today. 804func (b *Broker) sendAndReceiveSASLPlainAuth() error { 805 if b.conf.Net.SASL.Handshake { 806 handshakeErr := b.sendAndReceiveSASLPlainHandshake() 807 if handshakeErr != nil { 808 Logger.Printf("Error while performing SASL handshake %s\n", b.addr) 809 return handshakeErr 810 } 811 } 812 length := 1 + len(b.conf.Net.SASL.User) + 1 + len(b.conf.Net.SASL.Password) 813 authBytes := make([]byte, length+4) //4 byte length header + auth data 814 binary.BigEndian.PutUint32(authBytes, uint32(length)) 815 copy(authBytes[4:], []byte("\x00"+b.conf.Net.SASL.User+"\x00"+b.conf.Net.SASL.Password)) 816 817 err := b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout)) 818 if err != nil { 819 Logger.Printf("Failed to set write deadline when doing SASL auth with broker %s: %s\n", b.addr, err.Error()) 820 return err 821 } 822 823 requestTime := time.Now() 824 bytesWritten, err := b.conn.Write(authBytes) 825 b.updateOutgoingCommunicationMetrics(bytesWritten) 826 if err != nil { 827 Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error()) 828 return err 829 } 830 831 header := make([]byte, 4) 832 n, err := io.ReadFull(b.conn, header) 833 b.updateIncomingCommunicationMetrics(n, time.Since(requestTime)) 834 // If the credentials are valid, we would get a 4 byte response filled with null characters. 835 // Otherwise, the broker closes the connection and we get an EOF 836 if err != nil { 837 Logger.Printf("Failed to read response while authenticating with SASL to broker %s: %s\n", b.addr, err.Error()) 838 return err 839 } 840 841 Logger.Printf("SASL authentication successful with broker %s:%v - %v\n", b.addr, n, header) 842 return nil 843} 844 845func (b *Broker) updateIncomingCommunicationMetrics(bytes int, requestLatency time.Duration) { 846 b.updateRequestLatencyMetrics(requestLatency) 847 b.responseRate.Mark(1) 848 if b.brokerResponseRate != nil { 849 b.brokerResponseRate.Mark(1) 850 } 851 responseSize := int64(bytes) 852 b.incomingByteRate.Mark(responseSize) 853 if b.brokerIncomingByteRate != nil { 854 b.brokerIncomingByteRate.Mark(responseSize) 855 } 856 b.responseSize.Update(responseSize) 857 if b.brokerResponseSize != nil { 858 b.brokerResponseSize.Update(responseSize) 859 } 860} 861 862func (b *Broker) updateRequestLatencyMetrics(requestLatency time.Duration) { 863 requestLatencyInMs := int64(requestLatency / time.Millisecond) 864 b.requestLatency.Update(requestLatencyInMs) 865 if b.brokerRequestLatency != nil { 866 b.brokerRequestLatency.Update(requestLatencyInMs) 867 } 868} 869 870func (b *Broker) updateOutgoingCommunicationMetrics(bytes int) { 871 b.requestRate.Mark(1) 872 if b.brokerRequestRate != nil { 873 b.brokerRequestRate.Mark(1) 874 } 875 requestSize := int64(bytes) 876 b.outgoingByteRate.Mark(requestSize) 877 if b.brokerOutgoingByteRate != nil { 878 b.brokerOutgoingByteRate.Mark(requestSize) 879 } 880 b.requestSize.Update(requestSize) 881 if b.brokerRequestSize != nil { 882 b.brokerRequestSize.Update(requestSize) 883 } 884} 885