1package sarama
2
3import (
4	"bufio"
5	"net"
6)
7
// none is a zero-byte empty-struct type, typically used as a map value to
// get set semantics without per-entry storage — NOTE(review): confirm
// against usages elsewhere in the package.
type none struct{}
9
// int32Slice attaches the methods of sort.Interface to []int32 so that
// partition numbers can be sorted.
type int32Slice []int32

// Len reports the number of partitions in the slice.
func (s int32Slice) Len() int {
	return len(s)
}

// Less orders partitions by their numeric value, ascending.
func (s int32Slice) Less(i, j int) bool {
	return s[i] < s[j]
}

// Swap exchanges the elements at indices i and j in place.
func (s int32Slice) Swap(i, j int) {
	s[i], s[j] = s[j], s[i]
}
24
// dupInt32Slice returns a freshly allocated copy of input, so the caller can
// retain or mutate the result without aliasing the original backing array.
// A nil input yields an empty (non-nil) slice, matching the previous
// append-based behavior.
func dupInt32Slice(input []int32) []int32 {
	// make+copy lowers to a single memmove instead of an element-by-element
	// append loop, and allocates exactly once at the final size.
	ret := make([]int32, len(input))
	copy(ret, input)
	return ret
}
32
33func withRecover(fn func()) {
34	defer func() {
35		handler := PanicHandler
36		if handler != nil {
37			if err := recover(); err != nil {
38				handler(err)
39			}
40		}
41	}()
42
43	fn()
44}
45
46func safeAsyncClose(b *Broker) {
47	tmp := b // local var prevents clobbering in goroutine
48	go withRecover(func() {
49		if connected, _ := tmp.Connected(); connected {
50			if err := tmp.Close(); err != nil {
51				Logger.Println("Error closing broker", tmp.ID(), ":", err)
52			}
53		}
54	})
55}
56
// Encoder is a simple interface for any type that can be encoded as an array of bytes
// in order to be sent as the key or value of a Kafka message. Length() is provided as an
// optimization, and must return the same as len() on the result of Encode().
type Encoder interface {
	// Encode returns the byte representation of the value, or an error if it
	// cannot be produced.
	Encode() ([]byte, error)
	// Length returns the exact byte length that Encode will produce, without
	// performing the encoding.
	Length() int
}
64
// make strings and byte slices encodable for convenience so they can be used as keys
// and/or values in kafka messages

// StringEncoder implements the Encoder interface for Go strings so that they can be used
// as the Key or Value in a ProducerMessage.
type StringEncoder string

// Encode converts the string to its byte representation; it never fails.
func (e StringEncoder) Encode() ([]byte, error) {
	return []byte(e), nil
}

// Length reports the byte length of the string without encoding it.
func (e StringEncoder) Length() int {
	return len(e)
}
79
// ByteEncoder implements the Encoder interface for Go byte slices so that they can be used
// as the Key or Value in a ProducerMessage.
type ByteEncoder []byte

// Encode returns the underlying byte slice as-is (no copy); it never fails.
func (e ByteEncoder) Encode() ([]byte, error) {
	return e, nil
}

// Length reports the length of the underlying byte slice.
func (e ByteEncoder) Length() int {
	return len(e)
}
91
92// bufConn wraps a net.Conn with a buffer for reads to reduce the number of
93// reads that trigger syscalls.
94type bufConn struct {
95	net.Conn
96	buf *bufio.Reader
97}
98
99func newBufConn(conn net.Conn) *bufConn {
100	return &bufConn{
101		Conn: conn,
102		buf:  bufio.NewReader(conn),
103	}
104}
105
106func (bc *bufConn) Read(b []byte) (n int, err error) {
107	return bc.buf.Read(b)
108}
109
// KafkaVersion instances represent versions of the upstream Kafka broker.
type KafkaVersion struct {
	// it's a struct rather than just typing the array directly to make it opaque and stop people
	// generating their own arbitrary versions
	version [4]uint
}

// newKafkaVersion builds a KafkaVersion from its four numeric components.
func newKafkaVersion(major, minor, veryMinor, patch uint) KafkaVersion {
	var v KafkaVersion
	v.version[0] = major
	v.version[1] = minor
	v.version[2] = veryMinor
	v.version[3] = patch
	return v
}

// IsAtLeast return true if and only if the version it is called on is
// greater than or equal to the version passed in:
//    V1.IsAtLeast(V2) // false
//    V2.IsAtLeast(V1) // true
func (v KafkaVersion) IsAtLeast(other KafkaVersion) bool {
	// Lexicographic comparison, most-significant component first.
	for i, component := range v.version {
		switch {
		case component > other.version[i]:
			return true
		case component < other.version[i]:
			return false
		}
	}
	// All four components are equal.
	return true
}
137
// Effective constants defining the supported kafka versions.
var (
	V0_8_2_0   = newKafkaVersion(0, 8, 2, 0)
	V0_8_2_1   = newKafkaVersion(0, 8, 2, 1)
	V0_8_2_2   = newKafkaVersion(0, 8, 2, 2)
	V0_9_0_0   = newKafkaVersion(0, 9, 0, 0)
	V0_9_0_1   = newKafkaVersion(0, 9, 0, 1)
	V0_10_0_0  = newKafkaVersion(0, 10, 0, 0)
	V0_10_0_1  = newKafkaVersion(0, 10, 0, 1)
	V0_10_1_0  = newKafkaVersion(0, 10, 1, 0)
	V0_10_2_0  = newKafkaVersion(0, 10, 2, 0)
	// minVersion aliases the lowest entry above; presumably the oldest broker
	// version this library supports — NOTE(review): confirm against callers.
	minVersion = V0_8_2_0
)
151