1package sarama 2 3import ( 4 "bufio" 5 "net" 6) 7 8type none struct{} 9 10// make []int32 sortable so we can sort partition numbers 11type int32Slice []int32 12 13func (slice int32Slice) Len() int { 14 return len(slice) 15} 16 17func (slice int32Slice) Less(i, j int) bool { 18 return slice[i] < slice[j] 19} 20 21func (slice int32Slice) Swap(i, j int) { 22 slice[i], slice[j] = slice[j], slice[i] 23} 24 25func dupInt32Slice(input []int32) []int32 { 26 ret := make([]int32, 0, len(input)) 27 for _, val := range input { 28 ret = append(ret, val) 29 } 30 return ret 31} 32 33func withRecover(fn func()) { 34 defer func() { 35 handler := PanicHandler 36 if handler != nil { 37 if err := recover(); err != nil { 38 handler(err) 39 } 40 } 41 }() 42 43 fn() 44} 45 46func safeAsyncClose(b *Broker) { 47 tmp := b // local var prevents clobbering in goroutine 48 go withRecover(func() { 49 if connected, _ := tmp.Connected(); connected { 50 if err := tmp.Close(); err != nil { 51 Logger.Println("Error closing broker", tmp.ID(), ":", err) 52 } 53 } 54 }) 55} 56 57// Encoder is a simple interface for any type that can be encoded as an array of bytes 58// in order to be sent as the key or value of a Kafka message. Length() is provided as an 59// optimization, and must return the same as len() on the result of Encode(). 60type Encoder interface { 61 Encode() ([]byte, error) 62 Length() int 63} 64 65// make strings and byte slices encodable for convenience so they can be used as keys 66// and/or values in kafka messages 67 68// StringEncoder implements the Encoder interface for Go strings so that they can be used 69// as the Key or Value in a ProducerMessage. 70type StringEncoder string 71 72func (s StringEncoder) Encode() ([]byte, error) { 73 return []byte(s), nil 74} 75 76func (s StringEncoder) Length() int { 77 return len(s) 78} 79 80// ByteEncoder implements the Encoder interface for Go byte slices so that they can be used 81// as the Key or Value in a ProducerMessage. 
type ByteEncoder []byte

// Encode hands back the receiver's own bytes (no copy is made) together with
// a nil error.
func (enc ByteEncoder) Encode() ([]byte, error) {
	return []byte(enc), nil
}

// Length reports how many bytes the encoder holds.
func (enc ByteEncoder) Length() int {
	return len(enc)
}

// bufConn is a net.Conn whose Read calls go through a bufio.Reader, so that
// fewer reads end up as syscalls. All other net.Conn methods come from the
// embedded connection.
type bufConn struct {
	net.Conn
	buf *bufio.Reader
}

// newBufConn wraps conn in a bufConn whose buffered reader is created with
// bufio.NewReader on that same connection.
func newBufConn(conn net.Conn) *bufConn {
	wrapped := new(bufConn)
	wrapped.Conn = conn
	wrapped.buf = bufio.NewReader(conn)
	return wrapped
}

// Read fills b from the buffered reader instead of reading from the embedded
// connection directly, and returns whatever the buffered reader returns.
func (c *bufConn) Read(b []byte) (n int, err error) {
	n, err = c.buf.Read(b)
	return n, err
}

// KafkaVersion instances represent versions of the upstream Kafka broker.
type KafkaVersion struct {
	// The array is kept inside a struct, rather than being the type itself, so
	// that the representation is opaque and users cannot build arbitrary
	// versions of their own.
	version [4]uint
}

// newKafkaVersion assembles a KafkaVersion from its four numeric components,
// stored in the order major, minor, veryMinor, patch.
func newKafkaVersion(major, minor, veryMinor, patch uint) KafkaVersion {
	var kv KafkaVersion
	kv.version = [4]uint{major, minor, veryMinor, patch}
	return kv
}

// IsAtLeast returns true if and only if the version it is called on is
// greater than or equal to the version passed in:
//
//	V1.IsAtLeast(V2) // false
//	V2.IsAtLeast(V1) // true
func (kv KafkaVersion) IsAtLeast(other KafkaVersion) bool {
	// Walk the components from most to least significant; the first one that
	// differs settles the comparison.
	for idx := 0; idx < len(kv.version); idx++ {
		mine, theirs := kv.version[idx], other.version[idx]
		if mine != theirs {
			return mine > theirs
		}
	}
	// Every component matched, so the versions are equal.
	return true
}

// Effective constants defining the supported kafka versions. minVersion is
// set to the lowest version in the list.
var (
	V0_8_2_0   = newKafkaVersion(0, 8, 2, 0)
	V0_8_2_1   = newKafkaVersion(0, 8, 2, 1)
	V0_8_2_2   = newKafkaVersion(0, 8, 2, 2)
	V0_9_0_0   = newKafkaVersion(0, 9, 0, 0)
	V0_9_0_1   = newKafkaVersion(0, 9, 0, 1)
	V0_10_0_0  = newKafkaVersion(0, 10, 0, 0)
	V0_10_0_1  = newKafkaVersion(0, 10, 0, 1)
	V0_10_1_0  = newKafkaVersion(0, 10, 1, 0)
	V0_10_2_0  = newKafkaVersion(0, 10, 2, 0)
	minVersion = V0_8_2_0
)