1package sarama 2 3import ( 4 "bufio" 5 "fmt" 6 "net" 7 "regexp" 8) 9 10type none struct{} 11 12// make []int32 sortable so we can sort partition numbers 13type int32Slice []int32 14 15func (slice int32Slice) Len() int { 16 return len(slice) 17} 18 19func (slice int32Slice) Less(i, j int) bool { 20 return slice[i] < slice[j] 21} 22 23func (slice int32Slice) Swap(i, j int) { 24 slice[i], slice[j] = slice[j], slice[i] 25} 26 27func dupInt32Slice(input []int32) []int32 { 28 ret := make([]int32, 0, len(input)) 29 ret = append(ret, input...) 30 return ret 31} 32 33func withRecover(fn func()) { 34 defer func() { 35 handler := PanicHandler 36 if handler != nil { 37 if err := recover(); err != nil { 38 handler(err) 39 } 40 } 41 }() 42 43 fn() 44} 45 46func safeAsyncClose(b *Broker) { 47 tmp := b // local var prevents clobbering in goroutine 48 go withRecover(func() { 49 if connected, _ := tmp.Connected(); connected { 50 if err := tmp.Close(); err != nil { 51 Logger.Println("Error closing broker", tmp.ID(), ":", err) 52 } 53 } 54 }) 55} 56 57// Encoder is a simple interface for any type that can be encoded as an array of bytes 58// in order to be sent as the key or value of a Kafka message. Length() is provided as an 59// optimization, and must return the same as len() on the result of Encode(). 60type Encoder interface { 61 Encode() ([]byte, error) 62 Length() int 63} 64 65// make strings and byte slices encodable for convenience so they can be used as keys 66// and/or values in kafka messages 67 68// StringEncoder implements the Encoder interface for Go strings so that they can be used 69// as the Key or Value in a ProducerMessage. 70type StringEncoder string 71 72func (s StringEncoder) Encode() ([]byte, error) { 73 return []byte(s), nil 74} 75 76func (s StringEncoder) Length() int { 77 return len(s) 78} 79 80// ByteEncoder implements the Encoder interface for Go byte slices so that they can be used 81// as the Key or Value in a ProducerMessage. 82type ByteEncoder []byte 83 84func (b ByteEncoder) Encode() ([]byte, error) { 85 return b, nil 86} 87 88func (b ByteEncoder) Length() int { 89 return len(b) 90} 91 92// bufConn wraps a net.Conn with a buffer for reads to reduce the number of 93// reads that trigger syscalls. 94type bufConn struct { 95 net.Conn 96 buf *bufio.Reader 97} 98 99func newBufConn(conn net.Conn) *bufConn { 100 return &bufConn{ 101 Conn: conn, 102 buf: bufio.NewReader(conn), 103 } 104} 105 106func (bc *bufConn) Read(b []byte) (n int, err error) { 107 return bc.buf.Read(b) 108} 109 110// KafkaVersion instances represent versions of the upstream Kafka broker. 111type KafkaVersion struct { 112 // it's a struct rather than just typing the array directly to make it opaque and stop people 113 // generating their own arbitrary versions 114 version [4]uint 115} 116 117func newKafkaVersion(major, minor, veryMinor, patch uint) KafkaVersion { 118 return KafkaVersion{ 119 version: [4]uint{major, minor, veryMinor, patch}, 120 } 121} 122 123// IsAtLeast return true if and only if the version it is called on is 124// greater than or equal to the version passed in: 125// V1.IsAtLeast(V2) // false 126// V2.IsAtLeast(V1) // true 127func (v KafkaVersion) IsAtLeast(other KafkaVersion) bool { 128 for i := range v.version { 129 if v.version[i] > other.version[i] { 130 return true 131 } else if v.version[i] < other.version[i] { 132 return false 133 } 134 } 135 return true 136} 137 138// Effective constants defining the supported kafka versions. 139var ( 140 V0_8_2_0 = newKafkaVersion(0, 8, 2, 0) 141 V0_8_2_1 = newKafkaVersion(0, 8, 2, 1) 142 V0_8_2_2 = newKafkaVersion(0, 8, 2, 2) 143 V0_9_0_0 = newKafkaVersion(0, 9, 0, 0) 144 V0_9_0_1 = newKafkaVersion(0, 9, 0, 1) 145 V0_10_0_0 = newKafkaVersion(0, 10, 0, 0) 146 V0_10_0_1 = newKafkaVersion(0, 10, 0, 1) 147 V0_10_1_0 = newKafkaVersion(0, 10, 1, 0) 148 V0_10_1_1 = newKafkaVersion(0, 10, 1, 1) 149 V0_10_2_0 = newKafkaVersion(0, 10, 2, 0) 150 V0_10_2_1 = newKafkaVersion(0, 10, 2, 1) 151 V0_11_0_0 = newKafkaVersion(0, 11, 0, 0) 152 V0_11_0_1 = newKafkaVersion(0, 11, 0, 1) 153 V0_11_0_2 = newKafkaVersion(0, 11, 0, 2) 154 V1_0_0_0 = newKafkaVersion(1, 0, 0, 0) 155 V1_1_0_0 = newKafkaVersion(1, 1, 0, 0) 156 V1_1_1_0 = newKafkaVersion(1, 1, 1, 0) 157 V2_0_0_0 = newKafkaVersion(2, 0, 0, 0) 158 V2_0_1_0 = newKafkaVersion(2, 0, 1, 0) 159 V2_1_0_0 = newKafkaVersion(2, 1, 0, 0) 160 V2_2_0_0 = newKafkaVersion(2, 2, 0, 0) 161 V2_3_0_0 = newKafkaVersion(2, 3, 0, 0) 162 V2_4_0_0 = newKafkaVersion(2, 4, 0, 0) 163 V2_5_0_0 = newKafkaVersion(2, 5, 0, 0) 164 V2_6_0_0 = newKafkaVersion(2, 6, 0, 0) 165 V2_7_0_0 = newKafkaVersion(2, 7, 0, 0) 166 V2_8_0_0 = newKafkaVersion(2, 8, 0, 0) 167 168 SupportedVersions = []KafkaVersion{ 169 V0_8_2_0, 170 V0_8_2_1, 171 V0_8_2_2, 172 V0_9_0_0, 173 V0_9_0_1, 174 V0_10_0_0, 175 V0_10_0_1, 176 V0_10_1_0, 177 V0_10_1_1, 178 V0_10_2_0, 179 V0_10_2_1, 180 V0_11_0_0, 181 V0_11_0_1, 182 V0_11_0_2, 183 V1_0_0_0, 184 V1_1_0_0, 185 V1_1_1_0, 186 V2_0_0_0, 187 V2_0_1_0, 188 V2_1_0_0, 189 V2_2_0_0, 190 V2_3_0_0, 191 V2_4_0_0, 192 V2_5_0_0, 193 V2_6_0_0, 194 V2_7_0_0, 195 V2_8_0_0, 196 } 197 MinVersion = V0_8_2_0 198 MaxVersion = V2_8_0_0 199 DefaultVersion = V1_0_0_0 200) 201 202// ParseKafkaVersion parses and returns kafka version or error from a string 203func ParseKafkaVersion(s string) (KafkaVersion, error) { 204 if len(s) < 5 { 205 return DefaultVersion, fmt.Errorf("invalid version `%s`", s) 206 } 207 var major, minor, veryMinor, patch uint 208 var err error 209 if s[0] == '0' { 210 err = scanKafkaVersion(s, `^0\.\d+\.\d+\.\d+$`, "0.%d.%d.%d", [3]*uint{&minor, &veryMinor, &patch}) 211 } else { 212 err = scanKafkaVersion(s, `^\d+\.\d+\.\d+$`, "%d.%d.%d", [3]*uint{&major, &minor, &veryMinor}) 213 } 214 if err != nil { 215 return DefaultVersion, err 216 } 217 return newKafkaVersion(major, minor, veryMinor, patch), nil 218} 219 220func scanKafkaVersion(s string, pattern string, format string, v [3]*uint) error { 221 if !regexp.MustCompile(pattern).MatchString(s) { 222 return fmt.Errorf("invalid version `%s`", s) 223 } 224 _, err := fmt.Sscanf(s, format, v[0], v[1], v[2]) 225 return err 226} 227 228func (v KafkaVersion) String() string { 229 if v.version[0] == 0 { 230 return fmt.Sprintf("0.%d.%d.%d", v.version[1], v.version[2], v.version[3]) 231 } 232 233 return fmt.Sprintf("%d.%d.%d", v.version[0], v.version[1], v.version[2]) 234} 235