1// +build go1.9 2 3package sarama 4 5import ( 6 "context" 7 "fmt" 8 "log" 9 "reflect" 10 "sync" 11 "sync/atomic" 12 "testing" 13 "time" 14) 15 16func TestFuncConsumerGroupPartitioning(t *testing.T) { 17 checkKafkaVersion(t, "0.10.2") 18 setupFunctionalTest(t) 19 defer teardownFunctionalTest(t) 20 21 groupID := testFuncConsumerGroupID(t) 22 23 // start M1 24 m1 := runTestFuncConsumerGroupMember(t, groupID, "M1", 0, nil) 25 defer m1.Stop() 26 m1.WaitForState(2) 27 m1.WaitForClaims(map[string]int{"test.4": 4}) 28 m1.WaitForHandlers(4) 29 30 // start M2 31 m2 := runTestFuncConsumerGroupMember(t, groupID, "M2", 0, nil, "test.1", "test.4") 32 defer m2.Stop() 33 m2.WaitForState(2) 34 35 // assert that claims are shared among both members 36 m1.WaitForClaims(map[string]int{"test.4": 2}) 37 m1.WaitForHandlers(2) 38 m2.WaitForClaims(map[string]int{"test.1": 1, "test.4": 2}) 39 m2.WaitForHandlers(3) 40 41 // shutdown M1, wait for M2 to take over 42 m1.AssertCleanShutdown() 43 m2.WaitForClaims(map[string]int{"test.1": 1, "test.4": 4}) 44 m2.WaitForHandlers(5) 45 46 // shutdown M2 47 m2.AssertCleanShutdown() 48} 49 50func TestFuncConsumerGroupExcessConsumers(t *testing.T) { 51 checkKafkaVersion(t, "0.10.2") 52 setupFunctionalTest(t) 53 defer teardownFunctionalTest(t) 54 55 groupID := testFuncConsumerGroupID(t) 56 57 // start members 58 m1 := runTestFuncConsumerGroupMember(t, groupID, "M1", 0, nil) 59 defer m1.Stop() 60 m2 := runTestFuncConsumerGroupMember(t, groupID, "M2", 0, nil) 61 defer m2.Stop() 62 m3 := runTestFuncConsumerGroupMember(t, groupID, "M3", 0, nil) 63 defer m3.Stop() 64 m4 := runTestFuncConsumerGroupMember(t, groupID, "M4", 0, nil) 65 defer m4.Stop() 66 67 m1.WaitForClaims(map[string]int{"test.4": 1}) 68 m2.WaitForClaims(map[string]int{"test.4": 1}) 69 m3.WaitForClaims(map[string]int{"test.4": 1}) 70 m4.WaitForClaims(map[string]int{"test.4": 1}) 71 72 // start M5 73 m5 := runTestFuncConsumerGroupMember(t, groupID, "M5", 0, nil) 74 defer m5.Stop() 75 m5.WaitForState(1) 76 m5.AssertNoErrs() 77 78 // assert that claims are shared among both members 79 m4.AssertCleanShutdown() 80 m5.WaitForState(2) 81 m5.WaitForClaims(map[string]int{"test.4": 1}) 82 83 // shutdown everything 84 m1.AssertCleanShutdown() 85 m2.AssertCleanShutdown() 86 m3.AssertCleanShutdown() 87 m5.AssertCleanShutdown() 88} 89 90func TestFuncConsumerGroupFuzzy(t *testing.T) { 91 checkKafkaVersion(t, "0.10.2") 92 setupFunctionalTest(t) 93 defer teardownFunctionalTest(t) 94 95 if err := testFuncConsumerGroupFuzzySeed("test.4"); err != nil { 96 t.Fatal(err) 97 } 98 99 groupID := testFuncConsumerGroupID(t) 100 sink := &testFuncConsumerGroupSink{msgs: make(chan testFuncConsumerGroupMessage, 20000)} 101 waitForMessages := func(t *testing.T, n int) { 102 t.Helper() 103 104 for i := 0; i < 600; i++ { 105 if sink.Len() >= n { 106 break 107 } 108 time.Sleep(100 * time.Millisecond) 109 } 110 if sz := sink.Len(); sz < n { 111 log.Fatalf("expected to consume %d messages, but consumed %d", n, sz) 112 } 113 } 114 115 defer runTestFuncConsumerGroupMember(t, groupID, "M1", 1500, sink).Stop() 116 defer runTestFuncConsumerGroupMember(t, groupID, "M2", 3000, sink).Stop() 117 defer runTestFuncConsumerGroupMember(t, groupID, "M3", 1500, sink).Stop() 118 defer runTestFuncConsumerGroupMember(t, groupID, "M4", 200, sink).Stop() 119 defer runTestFuncConsumerGroupMember(t, groupID, "M5", 100, sink).Stop() 120 waitForMessages(t, 3000) 121 122 defer runTestFuncConsumerGroupMember(t, groupID, "M6", 300, sink).Stop() 123 defer runTestFuncConsumerGroupMember(t, groupID, "M7", 400, sink).Stop() 124 defer runTestFuncConsumerGroupMember(t, groupID, "M8", 500, sink).Stop() 125 defer runTestFuncConsumerGroupMember(t, groupID, "M9", 2000, sink).Stop() 126 waitForMessages(t, 8000) 127 128 defer runTestFuncConsumerGroupMember(t, groupID, "M10", 1000, sink).Stop() 129 waitForMessages(t, 10000) 130 131 defer runTestFuncConsumerGroupMember(t, groupID, "M11", 1000, sink).Stop() 132 defer runTestFuncConsumerGroupMember(t, groupID, "M12", 2500, sink).Stop() 133 waitForMessages(t, 12000) 134 135 defer runTestFuncConsumerGroupMember(t, groupID, "M13", 1000, sink).Stop() 136 waitForMessages(t, 15000) 137 138 if umap := sink.Close(); len(umap) != 15000 { 139 dupes := make(map[string][]string) 140 for k, v := range umap { 141 if len(v) > 1 { 142 dupes[k] = v 143 } 144 } 145 t.Fatalf("expected %d unique messages to be consumed but got %d, including %d duplicates:\n%v", 15000, len(umap), len(dupes), dupes) 146 } 147} 148 149// -------------------------------------------------------------------- 150 151func testFuncConsumerGroupID(t *testing.T) string { 152 return fmt.Sprintf("sarama.%s%d", t.Name(), time.Now().UnixNano()) 153} 154 155func testFuncConsumerGroupFuzzySeed(topic string) error { 156 client, err := NewClient(kafkaBrokers, nil) 157 if err != nil { 158 return err 159 } 160 defer func() { _ = client.Close() }() 161 162 total := int64(0) 163 for pn := int32(0); pn < 4; pn++ { 164 newest, err := client.GetOffset(topic, pn, OffsetNewest) 165 if err != nil { 166 return err 167 } 168 oldest, err := client.GetOffset(topic, pn, OffsetOldest) 169 if err != nil { 170 return err 171 } 172 total = total + newest - oldest 173 } 174 if total >= 21000 { 175 return nil 176 } 177 178 producer, err := NewAsyncProducerFromClient(client) 179 if err != nil { 180 return err 181 } 182 for i := total; i < 21000; i++ { 183 producer.Input() <- &ProducerMessage{Topic: topic, Value: ByteEncoder([]byte("testdata"))} 184 } 185 return producer.Close() 186} 187 188type testFuncConsumerGroupMessage struct { 189 ClientID string 190 *ConsumerMessage 191} 192 193type testFuncConsumerGroupSink struct { 194 msgs chan testFuncConsumerGroupMessage 195 count int32 196} 197 198func (s *testFuncConsumerGroupSink) Len() int { 199 if s == nil { 200 return -1 201 } 202 return int(atomic.LoadInt32(&s.count)) 203} 204 205func (s *testFuncConsumerGroupSink) Push(clientID string, m *ConsumerMessage) { 206 if s != nil { 207 s.msgs <- testFuncConsumerGroupMessage{ClientID: clientID, ConsumerMessage: m} 208 atomic.AddInt32(&s.count, 1) 209 } 210} 211 212func (s *testFuncConsumerGroupSink) Close() map[string][]string { 213 close(s.msgs) 214 215 res := make(map[string][]string) 216 for msg := range s.msgs { 217 key := fmt.Sprintf("%s-%d:%d", msg.Topic, msg.Partition, msg.Offset) 218 res[key] = append(res[key], msg.ClientID) 219 } 220 return res 221} 222 223type testFuncConsumerGroupMember struct { 224 ConsumerGroup 225 clientID string 226 claims map[string]int 227 state int32 228 handlers int32 229 errs []error 230 maxMessages int32 231 isCapped bool 232 sink *testFuncConsumerGroupSink 233 234 t *testing.T 235 mu sync.RWMutex 236} 237 238func runTestFuncConsumerGroupMember(t *testing.T, groupID, clientID string, maxMessages int32, sink *testFuncConsumerGroupSink, topics ...string) *testFuncConsumerGroupMember { 239 t.Helper() 240 241 config := NewConfig() 242 config.ClientID = clientID 243 config.Version = V0_10_2_0 244 config.Consumer.Return.Errors = true 245 config.Consumer.Offsets.Initial = OffsetOldest 246 config.Consumer.Group.Rebalance.Timeout = 10 * time.Second 247 248 group, err := NewConsumerGroup(kafkaBrokers, groupID, config) 249 if err != nil { 250 t.Fatal(err) 251 return nil 252 } 253 254 if len(topics) == 0 { 255 topics = []string{"test.4"} 256 } 257 258 member := &testFuncConsumerGroupMember{ 259 ConsumerGroup: group, 260 clientID: clientID, 261 claims: make(map[string]int), 262 maxMessages: maxMessages, 263 isCapped: maxMessages != 0, 264 sink: sink, 265 t: t, 266 } 267 go member.loop(topics) 268 return member 269} 270 271func (m *testFuncConsumerGroupMember) AssertCleanShutdown() { 272 m.t.Helper() 273 274 if err := m.Close(); err != nil { 275 m.t.Fatalf("unexpected error on Close(): %v", err) 276 } 277 m.WaitForState(4) 278 m.WaitForHandlers(0) 279 m.AssertNoErrs() 280} 281 282func (m *testFuncConsumerGroupMember) AssertNoErrs() { 283 m.t.Helper() 284 285 var errs []error 286 m.mu.RLock() 287 errs = append(errs, m.errs...) 288 m.mu.RUnlock() 289 290 if len(errs) != 0 { 291 m.t.Fatalf("unexpected consumer errors: %v", errs) 292 } 293} 294 295func (m *testFuncConsumerGroupMember) WaitForState(expected int32) { 296 m.t.Helper() 297 298 m.waitFor("state", expected, func() (interface{}, error) { 299 return atomic.LoadInt32(&m.state), nil 300 }) 301} 302 303func (m *testFuncConsumerGroupMember) WaitForHandlers(expected int) { 304 m.t.Helper() 305 306 m.waitFor("handlers", expected, func() (interface{}, error) { 307 return int(atomic.LoadInt32(&m.handlers)), nil 308 }) 309} 310 311func (m *testFuncConsumerGroupMember) WaitForClaims(expected map[string]int) { 312 m.t.Helper() 313 314 m.waitFor("claims", expected, func() (interface{}, error) { 315 m.mu.RLock() 316 claims := m.claims 317 m.mu.RUnlock() 318 return claims, nil 319 }) 320} 321 322func (m *testFuncConsumerGroupMember) Stop() { _ = m.Close() } 323 324func (m *testFuncConsumerGroupMember) Setup(s ConsumerGroupSession) error { 325 // store claims 326 claims := make(map[string]int) 327 for topic, partitions := range s.Claims() { 328 claims[topic] = len(partitions) 329 } 330 m.mu.Lock() 331 m.claims = claims 332 m.mu.Unlock() 333 334 // enter post-setup state 335 atomic.StoreInt32(&m.state, 2) 336 return nil 337} 338func (m *testFuncConsumerGroupMember) Cleanup(s ConsumerGroupSession) error { 339 // enter post-cleanup state 340 atomic.StoreInt32(&m.state, 3) 341 return nil 342} 343func (m *testFuncConsumerGroupMember) ConsumeClaim(s ConsumerGroupSession, c ConsumerGroupClaim) error { 344 atomic.AddInt32(&m.handlers, 1) 345 defer atomic.AddInt32(&m.handlers, -1) 346 347 for msg := range c.Messages() { 348 if n := atomic.AddInt32(&m.maxMessages, -1); m.isCapped && n < 0 { 349 break 350 } 351 s.MarkMessage(msg, "") 352 m.sink.Push(m.clientID, msg) 353 } 354 return nil 355} 356 357func (m *testFuncConsumerGroupMember) waitFor(kind string, expected interface{}, factory func() (interface{}, error)) { 358 m.t.Helper() 359 360 deadline := time.NewTimer(60 * time.Second) 361 defer deadline.Stop() 362 363 ticker := time.NewTicker(100 * time.Millisecond) 364 defer ticker.Stop() 365 366 var actual interface{} 367 for { 368 var err error 369 if actual, err = factory(); err != nil { 370 m.t.Errorf("failed retrieve value, expected %s %#v but received error %v", kind, expected, err) 371 } 372 373 if reflect.DeepEqual(expected, actual) { 374 return 375 } 376 377 select { 378 case <-deadline.C: 379 m.t.Fatalf("ttl exceeded, expected %s %#v but got %#v", kind, expected, actual) 380 return 381 case <-ticker.C: 382 } 383 } 384} 385 386func (m *testFuncConsumerGroupMember) loop(topics []string) { 387 defer atomic.StoreInt32(&m.state, 4) 388 389 go func() { 390 for err := range m.Errors() { 391 _ = m.Close() 392 393 m.mu.Lock() 394 m.errs = append(m.errs, err) 395 m.mu.Unlock() 396 } 397 }() 398 399 ctx := context.Background() 400 for { 401 // set state to pre-consume 402 atomic.StoreInt32(&m.state, 1) 403 404 if err := m.Consume(ctx, topics, m); err == ErrClosedConsumerGroup { 405 return 406 } else if err != nil { 407 m.mu.Lock() 408 m.errs = append(m.errs, err) 409 m.mu.Unlock() 410 return 411 } 412 413 // return if capped 414 if n := atomic.LoadInt32(&m.maxMessages); m.isCapped && n < 0 { 415 return 416 } 417 } 418} 419