1package sarama
2
3import (
4	"crypto/tls"
5	"encoding/binary"
6	"fmt"
7	"io"
8	"net"
9	"strconv"
10	"sync"
11	"sync/atomic"
12	"time"
13
14	"github.com/rcrowley/go-metrics"
15)
16
// Broker represents a single Kafka broker connection. All operations on this object are entirely concurrency-safe.
type Broker struct {
	// id is the broker ID; NewBroker initialises it to -1 and decode overwrites it.
	id   int32
	// addr is the host:port address, either passed to NewBroker or rebuilt by decode.
	addr string
	// rack is the optional rack string; decode only reads it for version >= 1.
	rack *string

	// conf is the configuration stored by Open once the connection has been dialed.
	conf          *Config
	// correlationID is stamped on each outgoing request and incremented after a successful write.
	correlationID int32
	// conn is the live connection, or nil when the broker is not connected.
	conn          net.Conn
	// connErr holds the error of the most recent connection attempt, if any.
	connErr       error
	// lock guards the connection state; Open holds it for the whole asynchronous connect.
	lock          sync.Mutex
	// opened is a 0/1 flag manipulated atomically by Open and Close.
	opened        int32

	// responses queues the promises of in-flight requests for responseReceiver.
	responses chan responsePromise
	// done is closed by responseReceiver once the responses channel has been drained.
	done      chan bool

	// Metrics shared between all brokers, followed by the per-broker variants.
	// The per-broker ones are only set by Open when id >= 0.
	incomingByteRate       metrics.Meter
	requestRate            metrics.Meter
	requestSize            metrics.Histogram
	requestLatency         metrics.Histogram
	outgoingByteRate       metrics.Meter
	responseRate           metrics.Meter
	responseSize           metrics.Histogram
	brokerIncomingByteRate metrics.Meter
	brokerRequestRate      metrics.Meter
	brokerRequestSize      metrics.Histogram
	brokerRequestLatency   metrics.Histogram
	brokerOutgoingByteRate metrics.Meter
	brokerResponseRate     metrics.Meter
	brokerResponseSize     metrics.Histogram
}
48
// responsePromise is the record send hands to responseReceiver for every request that
// expects a reply. Exactly one of packets or errors receives a value per promise.
// The field order matters: send builds this struct with a positional literal.
type responsePromise struct {
	// requestTime is taken just before the request is written; used for latency metrics.
	requestTime   time.Time
	// correlationID is the ID the matching response header must carry.
	correlationID int32
	// packets delivers the raw response body on success.
	packets       chan []byte
	// errors delivers the failure when the response could not be read or matched.
	errors        chan error
}
55
56// NewBroker creates and returns a Broker targeting the given host:port address.
57// This does not attempt to actually connect, you have to call Open() for that.
58func NewBroker(addr string) *Broker {
59	return &Broker{id: -1, addr: addr}
60}
61
62// Open tries to connect to the Broker if it is not already connected or connecting, but does not block
63// waiting for the connection to complete. This means that any subsequent operations on the broker will
64// block waiting for the connection to succeed or fail. To get the effect of a fully synchronous Open call,
65// follow it by a call to Connected(). The only errors Open will return directly are ConfigurationError or
66// AlreadyConnected. If conf is nil, the result of NewConfig() is used.
67func (b *Broker) Open(conf *Config) error {
68	if !atomic.CompareAndSwapInt32(&b.opened, 0, 1) {
69		return ErrAlreadyConnected
70	}
71
72	if conf == nil {
73		conf = NewConfig()
74	}
75
76	err := conf.Validate()
77	if err != nil {
78		return err
79	}
80
81	b.lock.Lock()
82
83	go withRecover(func() {
84		defer b.lock.Unlock()
85
86		dialer := net.Dialer{
87			Timeout:   conf.Net.DialTimeout,
88			KeepAlive: conf.Net.KeepAlive,
89			LocalAddr: conf.Net.LocalAddr,
90		}
91
92		if conf.Net.TLS.Enable {
93			b.conn, b.connErr = tls.DialWithDialer(&dialer, "tcp", b.addr, conf.Net.TLS.Config)
94		} else {
95			b.conn, b.connErr = dialer.Dial("tcp", b.addr)
96		}
97		if b.connErr != nil {
98			Logger.Printf("Failed to connect to broker %s: %s\n", b.addr, b.connErr)
99			b.conn = nil
100			atomic.StoreInt32(&b.opened, 0)
101			return
102		}
103		b.conn = newBufConn(b.conn)
104
105		b.conf = conf
106
107		// Create or reuse the global metrics shared between brokers
108		b.incomingByteRate = metrics.GetOrRegisterMeter("incoming-byte-rate", conf.MetricRegistry)
109		b.requestRate = metrics.GetOrRegisterMeter("request-rate", conf.MetricRegistry)
110		b.requestSize = getOrRegisterHistogram("request-size", conf.MetricRegistry)
111		b.requestLatency = getOrRegisterHistogram("request-latency-in-ms", conf.MetricRegistry)
112		b.outgoingByteRate = metrics.GetOrRegisterMeter("outgoing-byte-rate", conf.MetricRegistry)
113		b.responseRate = metrics.GetOrRegisterMeter("response-rate", conf.MetricRegistry)
114		b.responseSize = getOrRegisterHistogram("response-size", conf.MetricRegistry)
115		// Do not gather metrics for seeded broker (only used during bootstrap) because they share
116		// the same id (-1) and are already exposed through the global metrics above
117		if b.id >= 0 {
118			b.brokerIncomingByteRate = getOrRegisterBrokerMeter("incoming-byte-rate", b, conf.MetricRegistry)
119			b.brokerRequestRate = getOrRegisterBrokerMeter("request-rate", b, conf.MetricRegistry)
120			b.brokerRequestSize = getOrRegisterBrokerHistogram("request-size", b, conf.MetricRegistry)
121			b.brokerRequestLatency = getOrRegisterBrokerHistogram("request-latency-in-ms", b, conf.MetricRegistry)
122			b.brokerOutgoingByteRate = getOrRegisterBrokerMeter("outgoing-byte-rate", b, conf.MetricRegistry)
123			b.brokerResponseRate = getOrRegisterBrokerMeter("response-rate", b, conf.MetricRegistry)
124			b.brokerResponseSize = getOrRegisterBrokerHistogram("response-size", b, conf.MetricRegistry)
125		}
126
127		if conf.Net.SASL.Enable {
128			b.connErr = b.sendAndReceiveSASLPlainAuth()
129			if b.connErr != nil {
130				err = b.conn.Close()
131				if err == nil {
132					Logger.Printf("Closed connection to broker %s\n", b.addr)
133				} else {
134					Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err)
135				}
136				b.conn = nil
137				atomic.StoreInt32(&b.opened, 0)
138				return
139			}
140		}
141
142		b.done = make(chan bool)
143		b.responses = make(chan responsePromise, b.conf.Net.MaxOpenRequests-1)
144
145		if b.id >= 0 {
146			Logger.Printf("Connected to broker at %s (registered as #%d)\n", b.addr, b.id)
147		} else {
148			Logger.Printf("Connected to broker at %s (unregistered)\n", b.addr)
149		}
150		go withRecover(b.responseReceiver)
151	})
152
153	return nil
154}
155
156// Connected returns true if the broker is connected and false otherwise. If the broker is not
157// connected but it had tried to connect, the error from that connection attempt is also returned.
158func (b *Broker) Connected() (bool, error) {
159	b.lock.Lock()
160	defer b.lock.Unlock()
161
162	return b.conn != nil, b.connErr
163}
164
165func (b *Broker) Close() error {
166	b.lock.Lock()
167	defer b.lock.Unlock()
168
169	if b.conn == nil {
170		return ErrNotConnected
171	}
172
173	close(b.responses)
174	<-b.done
175
176	err := b.conn.Close()
177
178	b.conn = nil
179	b.connErr = nil
180	b.done = nil
181	b.responses = nil
182
183	if b.id >= 0 {
184		b.conf.MetricRegistry.Unregister(getMetricNameForBroker("incoming-byte-rate", b))
185		b.conf.MetricRegistry.Unregister(getMetricNameForBroker("request-rate", b))
186		b.conf.MetricRegistry.Unregister(getMetricNameForBroker("outgoing-byte-rate", b))
187		b.conf.MetricRegistry.Unregister(getMetricNameForBroker("response-rate", b))
188	}
189
190	if err == nil {
191		Logger.Printf("Closed connection to broker %s\n", b.addr)
192	} else {
193		Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err)
194	}
195
196	atomic.StoreInt32(&b.opened, 0)
197
198	return err
199}
200
// ID returns the broker ID retrieved from Kafka's metadata, or -1 if that is not known.
// The value is read without taking the broker lock.
func (b *Broker) ID() int32 {
	return b.id
}
205
// Addr returns the broker address as either retrieved from Kafka's metadata or passed to NewBroker.
// The value is read without taking the broker lock.
func (b *Broker) Addr() string {
	return b.addr
}
210
211func (b *Broker) GetMetadata(request *MetadataRequest) (*MetadataResponse, error) {
212	response := new(MetadataResponse)
213
214	err := b.sendAndReceive(request, response)
215
216	if err != nil {
217		return nil, err
218	}
219
220	return response, nil
221}
222
223func (b *Broker) GetConsumerMetadata(request *ConsumerMetadataRequest) (*ConsumerMetadataResponse, error) {
224	response := new(ConsumerMetadataResponse)
225
226	err := b.sendAndReceive(request, response)
227
228	if err != nil {
229		return nil, err
230	}
231
232	return response, nil
233}
234
235func (b *Broker) FindCoordinator(request *FindCoordinatorRequest) (*FindCoordinatorResponse, error) {
236	response := new(FindCoordinatorResponse)
237
238	err := b.sendAndReceive(request, response)
239
240	if err != nil {
241		return nil, err
242	}
243
244	return response, nil
245}
246
247func (b *Broker) GetAvailableOffsets(request *OffsetRequest) (*OffsetResponse, error) {
248	response := new(OffsetResponse)
249
250	err := b.sendAndReceive(request, response)
251
252	if err != nil {
253		return nil, err
254	}
255
256	return response, nil
257}
258
259func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error) {
260	var response *ProduceResponse
261	var err error
262
263	if request.RequiredAcks == NoResponse {
264		err = b.sendAndReceive(request, nil)
265	} else {
266		response = new(ProduceResponse)
267		err = b.sendAndReceive(request, response)
268	}
269
270	if err != nil {
271		return nil, err
272	}
273
274	return response, nil
275}
276
277func (b *Broker) Fetch(request *FetchRequest) (*FetchResponse, error) {
278	response := new(FetchResponse)
279
280	err := b.sendAndReceive(request, response)
281
282	if err != nil {
283		return nil, err
284	}
285
286	return response, nil
287}
288
289func (b *Broker) CommitOffset(request *OffsetCommitRequest) (*OffsetCommitResponse, error) {
290	response := new(OffsetCommitResponse)
291
292	err := b.sendAndReceive(request, response)
293
294	if err != nil {
295		return nil, err
296	}
297
298	return response, nil
299}
300
301func (b *Broker) FetchOffset(request *OffsetFetchRequest) (*OffsetFetchResponse, error) {
302	response := new(OffsetFetchResponse)
303
304	err := b.sendAndReceive(request, response)
305
306	if err != nil {
307		return nil, err
308	}
309
310	return response, nil
311}
312
313func (b *Broker) JoinGroup(request *JoinGroupRequest) (*JoinGroupResponse, error) {
314	response := new(JoinGroupResponse)
315
316	err := b.sendAndReceive(request, response)
317	if err != nil {
318		return nil, err
319	}
320
321	return response, nil
322}
323
324func (b *Broker) SyncGroup(request *SyncGroupRequest) (*SyncGroupResponse, error) {
325	response := new(SyncGroupResponse)
326
327	err := b.sendAndReceive(request, response)
328	if err != nil {
329		return nil, err
330	}
331
332	return response, nil
333}
334
335func (b *Broker) LeaveGroup(request *LeaveGroupRequest) (*LeaveGroupResponse, error) {
336	response := new(LeaveGroupResponse)
337
338	err := b.sendAndReceive(request, response)
339	if err != nil {
340		return nil, err
341	}
342
343	return response, nil
344}
345
346func (b *Broker) Heartbeat(request *HeartbeatRequest) (*HeartbeatResponse, error) {
347	response := new(HeartbeatResponse)
348
349	err := b.sendAndReceive(request, response)
350	if err != nil {
351		return nil, err
352	}
353
354	return response, nil
355}
356
357func (b *Broker) ListGroups(request *ListGroupsRequest) (*ListGroupsResponse, error) {
358	response := new(ListGroupsResponse)
359
360	err := b.sendAndReceive(request, response)
361	if err != nil {
362		return nil, err
363	}
364
365	return response, nil
366}
367
368func (b *Broker) DescribeGroups(request *DescribeGroupsRequest) (*DescribeGroupsResponse, error) {
369	response := new(DescribeGroupsResponse)
370
371	err := b.sendAndReceive(request, response)
372	if err != nil {
373		return nil, err
374	}
375
376	return response, nil
377}
378
379func (b *Broker) ApiVersions(request *ApiVersionsRequest) (*ApiVersionsResponse, error) {
380	response := new(ApiVersionsResponse)
381
382	err := b.sendAndReceive(request, response)
383	if err != nil {
384		return nil, err
385	}
386
387	return response, nil
388}
389
390func (b *Broker) CreateTopics(request *CreateTopicsRequest) (*CreateTopicsResponse, error) {
391	response := new(CreateTopicsResponse)
392
393	err := b.sendAndReceive(request, response)
394	if err != nil {
395		return nil, err
396	}
397
398	return response, nil
399}
400
401func (b *Broker) DeleteTopics(request *DeleteTopicsRequest) (*DeleteTopicsResponse, error) {
402	response := new(DeleteTopicsResponse)
403
404	err := b.sendAndReceive(request, response)
405	if err != nil {
406		return nil, err
407	}
408
409	return response, nil
410}
411
412func (b *Broker) CreatePartitions(request *CreatePartitionsRequest) (*CreatePartitionsResponse, error) {
413	response := new(CreatePartitionsResponse)
414
415	err := b.sendAndReceive(request, response)
416	if err != nil {
417		return nil, err
418	}
419
420	return response, nil
421}
422
423func (b *Broker) DeleteRecords(request *DeleteRecordsRequest) (*DeleteRecordsResponse, error) {
424	response := new(DeleteRecordsResponse)
425
426	err := b.sendAndReceive(request, response)
427	if err != nil {
428		return nil, err
429	}
430
431	return response, nil
432}
433
434func (b *Broker) DescribeAcls(request *DescribeAclsRequest) (*DescribeAclsResponse, error) {
435	response := new(DescribeAclsResponse)
436
437	err := b.sendAndReceive(request, response)
438	if err != nil {
439		return nil, err
440	}
441
442	return response, nil
443}
444
445func (b *Broker) CreateAcls(request *CreateAclsRequest) (*CreateAclsResponse, error) {
446	response := new(CreateAclsResponse)
447
448	err := b.sendAndReceive(request, response)
449	if err != nil {
450		return nil, err
451	}
452
453	return response, nil
454}
455
456func (b *Broker) DeleteAcls(request *DeleteAclsRequest) (*DeleteAclsResponse, error) {
457	response := new(DeleteAclsResponse)
458
459	err := b.sendAndReceive(request, response)
460	if err != nil {
461		return nil, err
462	}
463
464	return response, nil
465}
466
467func (b *Broker) InitProducerID(request *InitProducerIDRequest) (*InitProducerIDResponse, error) {
468	response := new(InitProducerIDResponse)
469
470	err := b.sendAndReceive(request, response)
471	if err != nil {
472		return nil, err
473	}
474
475	return response, nil
476}
477
478func (b *Broker) AddPartitionsToTxn(request *AddPartitionsToTxnRequest) (*AddPartitionsToTxnResponse, error) {
479	response := new(AddPartitionsToTxnResponse)
480
481	err := b.sendAndReceive(request, response)
482	if err != nil {
483		return nil, err
484	}
485
486	return response, nil
487}
488
489func (b *Broker) AddOffsetsToTxn(request *AddOffsetsToTxnRequest) (*AddOffsetsToTxnResponse, error) {
490	response := new(AddOffsetsToTxnResponse)
491
492	err := b.sendAndReceive(request, response)
493	if err != nil {
494		return nil, err
495	}
496
497	return response, nil
498}
499
500func (b *Broker) EndTxn(request *EndTxnRequest) (*EndTxnResponse, error) {
501	response := new(EndTxnResponse)
502
503	err := b.sendAndReceive(request, response)
504	if err != nil {
505		return nil, err
506	}
507
508	return response, nil
509}
510
511func (b *Broker) TxnOffsetCommit(request *TxnOffsetCommitRequest) (*TxnOffsetCommitResponse, error) {
512	response := new(TxnOffsetCommitResponse)
513
514	err := b.sendAndReceive(request, response)
515	if err != nil {
516		return nil, err
517	}
518
519	return response, nil
520}
521
522func (b *Broker) DescribeConfigs(request *DescribeConfigsRequest) (*DescribeConfigsResponse, error) {
523	response := new(DescribeConfigsResponse)
524
525	err := b.sendAndReceive(request, response)
526	if err != nil {
527		return nil, err
528	}
529
530	return response, nil
531}
532
533func (b *Broker) AlterConfigs(request *AlterConfigsRequest) (*AlterConfigsResponse, error) {
534	response := new(AlterConfigsResponse)
535
536	err := b.sendAndReceive(request, response)
537	if err != nil {
538		return nil, err
539	}
540
541	return response, nil
542}
543
544func (b *Broker) DeleteGroups(request *DeleteGroupsRequest) (*DeleteGroupsResponse, error) {
545	response := new(DeleteGroupsResponse)
546
547	if err := b.sendAndReceive(request, response); err != nil {
548		return nil, err
549	}
550
551	return response, nil
552}
553
554func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise, error) {
555	b.lock.Lock()
556	defer b.lock.Unlock()
557
558	if b.conn == nil {
559		if b.connErr != nil {
560			return nil, b.connErr
561		}
562		return nil, ErrNotConnected
563	}
564
565	if !b.conf.Version.IsAtLeast(rb.requiredVersion()) {
566		return nil, ErrUnsupportedVersion
567	}
568
569	req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
570	buf, err := encode(req, b.conf.MetricRegistry)
571	if err != nil {
572		return nil, err
573	}
574
575	err = b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
576	if err != nil {
577		return nil, err
578	}
579
580	requestTime := time.Now()
581	bytes, err := b.conn.Write(buf)
582	b.updateOutgoingCommunicationMetrics(bytes)
583	if err != nil {
584		return nil, err
585	}
586	b.correlationID++
587
588	if !promiseResponse {
589		// Record request latency without the response
590		b.updateRequestLatencyMetrics(time.Since(requestTime))
591		return nil, nil
592	}
593
594	promise := responsePromise{requestTime, req.correlationID, make(chan []byte), make(chan error)}
595	b.responses <- promise
596
597	return &promise, nil
598}
599
600func (b *Broker) sendAndReceive(req protocolBody, res versionedDecoder) error {
601	promise, err := b.send(req, res != nil)
602
603	if err != nil {
604		return err
605	}
606
607	if promise == nil {
608		return nil
609	}
610
611	select {
612	case buf := <-promise.packets:
613		return versionedDecode(buf, res, req.version())
614	case err = <-promise.errors:
615		return err
616	}
617}
618
619func (b *Broker) decode(pd packetDecoder, version int16) (err error) {
620	b.id, err = pd.getInt32()
621	if err != nil {
622		return err
623	}
624
625	host, err := pd.getString()
626	if err != nil {
627		return err
628	}
629
630	port, err := pd.getInt32()
631	if err != nil {
632		return err
633	}
634
635	if version >= 1 {
636		b.rack, err = pd.getNullableString()
637		if err != nil {
638			return err
639		}
640	}
641
642	b.addr = net.JoinHostPort(host, fmt.Sprint(port))
643	if _, _, err := net.SplitHostPort(b.addr); err != nil {
644		return err
645	}
646
647	return nil
648}
649
650func (b *Broker) encode(pe packetEncoder, version int16) (err error) {
651
652	host, portstr, err := net.SplitHostPort(b.addr)
653	if err != nil {
654		return err
655	}
656	port, err := strconv.Atoi(portstr)
657	if err != nil {
658		return err
659	}
660
661	pe.putInt32(b.id)
662
663	err = pe.putString(host)
664	if err != nil {
665		return err
666	}
667
668	pe.putInt32(int32(port))
669
670	if version >= 1 {
671		err = pe.putNullableString(b.rack)
672		if err != nil {
673			return err
674		}
675	}
676
677	return nil
678}
679
680func (b *Broker) responseReceiver() {
681	var dead error
682	header := make([]byte, 8)
683	for response := range b.responses {
684		if dead != nil {
685			response.errors <- dead
686			continue
687		}
688
689		err := b.conn.SetReadDeadline(time.Now().Add(b.conf.Net.ReadTimeout))
690		if err != nil {
691			dead = err
692			response.errors <- err
693			continue
694		}
695
696		bytesReadHeader, err := io.ReadFull(b.conn, header)
697		requestLatency := time.Since(response.requestTime)
698		if err != nil {
699			b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
700			dead = err
701			response.errors <- err
702			continue
703		}
704
705		decodedHeader := responseHeader{}
706		err = decode(header, &decodedHeader)
707		if err != nil {
708			b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
709			dead = err
710			response.errors <- err
711			continue
712		}
713		if decodedHeader.correlationID != response.correlationID {
714			b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
715			// TODO if decoded ID < cur ID, discard until we catch up
716			// TODO if decoded ID > cur ID, save it so when cur ID catches up we have a response
717			dead = PacketDecodingError{fmt.Sprintf("correlation ID didn't match, wanted %d, got %d", response.correlationID, decodedHeader.correlationID)}
718			response.errors <- dead
719			continue
720		}
721
722		buf := make([]byte, decodedHeader.length-4)
723		bytesReadBody, err := io.ReadFull(b.conn, buf)
724		b.updateIncomingCommunicationMetrics(bytesReadHeader+bytesReadBody, requestLatency)
725		if err != nil {
726			dead = err
727			response.errors <- err
728			continue
729		}
730
731		response.packets <- buf
732	}
733	close(b.done)
734}
735
736func (b *Broker) sendAndReceiveSASLPlainHandshake() error {
737	rb := &SaslHandshakeRequest{"PLAIN"}
738	req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
739	buf, err := encode(req, b.conf.MetricRegistry)
740	if err != nil {
741		return err
742	}
743
744	err = b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
745	if err != nil {
746		return err
747	}
748
749	requestTime := time.Now()
750	bytes, err := b.conn.Write(buf)
751	b.updateOutgoingCommunicationMetrics(bytes)
752	if err != nil {
753		Logger.Printf("Failed to send SASL handshake %s: %s\n", b.addr, err.Error())
754		return err
755	}
756	b.correlationID++
757	//wait for the response
758	header := make([]byte, 8) // response header
759	_, err = io.ReadFull(b.conn, header)
760	if err != nil {
761		Logger.Printf("Failed to read SASL handshake header : %s\n", err.Error())
762		return err
763	}
764	length := binary.BigEndian.Uint32(header[:4])
765	payload := make([]byte, length-4)
766	n, err := io.ReadFull(b.conn, payload)
767	if err != nil {
768		Logger.Printf("Failed to read SASL handshake payload : %s\n", err.Error())
769		return err
770	}
771	b.updateIncomingCommunicationMetrics(n+8, time.Since(requestTime))
772	res := &SaslHandshakeResponse{}
773	err = versionedDecode(payload, res, 0)
774	if err != nil {
775		Logger.Printf("Failed to parse SASL handshake : %s\n", err.Error())
776		return err
777	}
778	if res.Err != ErrNoError {
779		Logger.Printf("Invalid SASL Mechanism : %s\n", res.Err.Error())
780		return res.Err
781	}
782	Logger.Print("Successful SASL handshake")
783	return nil
784}
785
786// Kafka 0.10.0 plans to support SASL Plain and Kerberos as per PR #812 (KIP-43)/(JIRA KAFKA-3149)
787// Some hosted kafka services such as IBM Message Hub already offer SASL/PLAIN auth with Kafka 0.9
788//
789// In SASL Plain, Kafka expects the auth header to be in the following format
790// Message format (from https://tools.ietf.org/html/rfc4616):
791//
792//   message   = [authzid] UTF8NUL authcid UTF8NUL passwd
793//   authcid   = 1*SAFE ; MUST accept up to 255 octets
794//   authzid   = 1*SAFE ; MUST accept up to 255 octets
795//   passwd    = 1*SAFE ; MUST accept up to 255 octets
796//   UTF8NUL   = %x00 ; UTF-8 encoded NUL character
797//
798//   SAFE      = UTF1 / UTF2 / UTF3 / UTF4
799//                  ;; any UTF-8 encoded Unicode character except NUL
800//
801// When credentials are valid, Kafka returns a 4 byte array of null characters.
802// When credentials are invalid, Kafka closes the connection. This does not seem to be the ideal way
803// of responding to bad credentials but thats how its being done today.
804func (b *Broker) sendAndReceiveSASLPlainAuth() error {
805	if b.conf.Net.SASL.Handshake {
806		handshakeErr := b.sendAndReceiveSASLPlainHandshake()
807		if handshakeErr != nil {
808			Logger.Printf("Error while performing SASL handshake %s\n", b.addr)
809			return handshakeErr
810		}
811	}
812	length := 1 + len(b.conf.Net.SASL.User) + 1 + len(b.conf.Net.SASL.Password)
813	authBytes := make([]byte, length+4) //4 byte length header + auth data
814	binary.BigEndian.PutUint32(authBytes, uint32(length))
815	copy(authBytes[4:], []byte("\x00"+b.conf.Net.SASL.User+"\x00"+b.conf.Net.SASL.Password))
816
817	err := b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
818	if err != nil {
819		Logger.Printf("Failed to set write deadline when doing SASL auth with broker %s: %s\n", b.addr, err.Error())
820		return err
821	}
822
823	requestTime := time.Now()
824	bytesWritten, err := b.conn.Write(authBytes)
825	b.updateOutgoingCommunicationMetrics(bytesWritten)
826	if err != nil {
827		Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
828		return err
829	}
830
831	header := make([]byte, 4)
832	n, err := io.ReadFull(b.conn, header)
833	b.updateIncomingCommunicationMetrics(n, time.Since(requestTime))
834	// If the credentials are valid, we would get a 4 byte response filled with null characters.
835	// Otherwise, the broker closes the connection and we get an EOF
836	if err != nil {
837		Logger.Printf("Failed to read response while authenticating with SASL to broker %s: %s\n", b.addr, err.Error())
838		return err
839	}
840
841	Logger.Printf("SASL authentication successful with broker %s:%v - %v\n", b.addr, n, header)
842	return nil
843}
844
845func (b *Broker) updateIncomingCommunicationMetrics(bytes int, requestLatency time.Duration) {
846	b.updateRequestLatencyMetrics(requestLatency)
847	b.responseRate.Mark(1)
848	if b.brokerResponseRate != nil {
849		b.brokerResponseRate.Mark(1)
850	}
851	responseSize := int64(bytes)
852	b.incomingByteRate.Mark(responseSize)
853	if b.brokerIncomingByteRate != nil {
854		b.brokerIncomingByteRate.Mark(responseSize)
855	}
856	b.responseSize.Update(responseSize)
857	if b.brokerResponseSize != nil {
858		b.brokerResponseSize.Update(responseSize)
859	}
860}
861
862func (b *Broker) updateRequestLatencyMetrics(requestLatency time.Duration) {
863	requestLatencyInMs := int64(requestLatency / time.Millisecond)
864	b.requestLatency.Update(requestLatencyInMs)
865	if b.brokerRequestLatency != nil {
866		b.brokerRequestLatency.Update(requestLatencyInMs)
867	}
868}
869
870func (b *Broker) updateOutgoingCommunicationMetrics(bytes int) {
871	b.requestRate.Mark(1)
872	if b.brokerRequestRate != nil {
873		b.brokerRequestRate.Mark(1)
874	}
875	requestSize := int64(bytes)
876	b.outgoingByteRate.Mark(requestSize)
877	if b.brokerOutgoingByteRate != nil {
878		b.brokerOutgoingByteRate.Mark(requestSize)
879	}
880	b.requestSize.Update(requestSize)
881	if b.brokerRequestSize != nil {
882		b.brokerRequestSize.Update(requestSize)
883	}
884}
885