// +build go1.9

package sarama

import (
	"context"
	"fmt"
	"reflect"
	"sync"
	"sync/atomic"
	"testing"
	"time"
)

func TestFuncConsumerGroupPartitioning(t *testing.T) {
	checkKafkaVersion(t, "0.10.2")
	setupFunctionalTest(t)
	defer teardownFunctionalTest(t)

	groupID := testFuncConsumerGroupID(t)

	// start M1
	m1 := runTestFuncConsumerGroupMember(t, groupID, "M1", 0, nil)
	defer m1.Stop()
	m1.WaitForState(2)
	m1.WaitForClaims(map[string]int{"test.4": 4})
	m1.WaitForHandlers(4)

	// start M2
	m2 := runTestFuncConsumerGroupMember(t, groupID, "M2", 0, nil, "test.1", "test.4")
	defer m2.Stop()
	m2.WaitForState(2)

	// assert that claims are shared among both members
	m1.WaitForClaims(map[string]int{"test.4": 2})
	m1.WaitForHandlers(2)
	m2.WaitForClaims(map[string]int{"test.1": 1, "test.4": 2})
	m2.WaitForHandlers(3)

	// shutdown M1, wait for M2 to take over
	m1.AssertCleanShutdown()
	m2.WaitForClaims(map[string]int{"test.1": 1, "test.4": 4})
	m2.WaitForHandlers(5)

	// shutdown M2
	m2.AssertCleanShutdown()
}

func TestFuncConsumerGroupExcessConsumers(t *testing.T) {
	checkKafkaVersion(t, "0.10.2")
	setupFunctionalTest(t)
	defer teardownFunctionalTest(t)

	groupID := testFuncConsumerGroupID(t)

	// start members
	m1 := runTestFuncConsumerGroupMember(t, groupID, "M1", 0, nil)
	defer m1.Stop()
	m2 := runTestFuncConsumerGroupMember(t, groupID, "M2", 0, nil)
	defer m2.Stop()
	m3 := runTestFuncConsumerGroupMember(t, groupID, "M3", 0, nil)
	defer m3.Stop()
	m4 := runTestFuncConsumerGroupMember(t, groupID, "M4", 0, nil)
	defer m4.Stop()

	m1.WaitForClaims(map[string]int{"test.4": 1})
	m2.WaitForClaims(map[string]int{"test.4": 1})
	m3.WaitForClaims(map[string]int{"test.4": 1})
	m4.WaitForClaims(map[string]int{"test.4": 1})

	// start M5
	m5 := runTestFuncConsumerGroupMember(t, groupID, "M5", 0, nil)
	defer m5.Stop()
	m5.WaitForState(1)
	m5.AssertNoErrs()

	// shutdown M4, wait for M5 to take over its claim
	m4.AssertCleanShutdown()
	m5.WaitForState(2)
	m5.WaitForClaims(map[string]int{"test.4": 1})

	// shutdown everything
	m1.AssertCleanShutdown()
	m2.AssertCleanShutdown()
	m3.AssertCleanShutdown()
	m5.AssertCleanShutdown()
}

func TestFuncConsumerGroupFuzzy(t *testing.T) {
	checkKafkaVersion(t, "0.10.2")
	setupFunctionalTest(t)
	defer teardownFunctionalTest(t)

	if err := testFuncConsumerGroupFuzzySeed("test.4"); err != nil {
		t.Fatal(err)
	}

	groupID := testFuncConsumerGroupID(t)
	sink := &testFuncConsumerGroupSink{msgs: make(chan testFuncConsumerGroupMessage, 20000)}
	waitForMessages := func(t *testing.T, n int) {
		t.Helper()

		for i := 0; i < 600; i++ {
			if sink.Len() >= n {
				break
			}
			time.Sleep(100 * time.Millisecond)
		}
		if sz := sink.Len(); sz < n {
			t.Fatalf("expected to consume %d messages, but consumed %d", n, sz)
		}
	}

	defer runTestFuncConsumerGroupMember(t, groupID, "M1", 1500, sink).Stop()
	defer runTestFuncConsumerGroupMember(t, groupID, "M2", 3000, sink).Stop()
	defer runTestFuncConsumerGroupMember(t, groupID, "M3", 1500, sink).Stop()
	defer runTestFuncConsumerGroupMember(t, groupID, "M4", 200, sink).Stop()
	defer runTestFuncConsumerGroupMember(t, groupID, "M5", 100, sink).Stop()
	waitForMessages(t, 3000)

	defer runTestFuncConsumerGroupMember(t, groupID, "M6", 300, sink).Stop()
	defer runTestFuncConsumerGroupMember(t, groupID, "M7", 400, sink).Stop()
	defer runTestFuncConsumerGroupMember(t, groupID, "M8", 500, sink).Stop()
	defer runTestFuncConsumerGroupMember(t, groupID, "M9", 2000, sink).Stop()
	waitForMessages(t, 8000)

	defer runTestFuncConsumerGroupMember(t, groupID, "M10", 1000, sink).Stop()
	waitForMessages(t, 10000)

	defer runTestFuncConsumerGroupMember(t, groupID, "M11", 1000, sink).Stop()
	defer runTestFuncConsumerGroupMember(t, groupID, "M12", 2500, sink).Stop()
	waitForMessages(t, 12000)

	defer runTestFuncConsumerGroupMember(t, groupID, "M13", 1000, sink).Stop()
	waitForMessages(t, 15000)

	if umap := sink.Close(); len(umap) != 15000 {
		dupes := make(map[string][]string)
		for k, v := range umap {
			if len(v) > 1 {
				dupes[k] = v
			}
		}
		t.Fatalf("expected %d unique messages to be consumed but got %d, including %d duplicates:\n%v", 15000, len(umap), len(dupes), dupes)
	}
}

// --------------------------------------------------------------------

func testFuncConsumerGroupID(t *testing.T) string {
	return fmt.Sprintf("sarama.%s%d", t.Name(), time.Now().UnixNano())
}

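// testFuncConsumerGroupFuzzySeed ensures the topic holds at least 21,000
// messages across its four partitions, producing additional test records
// if the existing offsets fall short.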
func testFuncConsumerGroupFuzzySeed(topic string) error {
	client, err := NewClient(kafkaBrokers, nil)
	if err != nil {
		return err
	}
	defer func() { _ = client.Close() }()

	total := int64(0)
	for pn := int32(0); pn < 4; pn++ {
		newest, err := client.GetOffset(topic, pn, OffsetNewest)
		if err != nil {
			return err
		}
		oldest, err := client.GetOffset(topic, pn, OffsetOldest)
		if err != nil {
			return err
		}
		total = total + newest - oldest
	}
	if total >= 21000 {
		return nil
	}

	producer, err := NewAsyncProducerFromClient(client)
	if err != nil {
		return err
	}
	for i := total; i < 21000; i++ {
		producer.Input() <- &ProducerMessage{Topic: topic, Value: ByteEncoder([]byte("testdata"))}
	}
	return producer.Close()
}

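// testFuncConsumerGroupMessage pairs a consumed message with the client ID
// of the group member that received it.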
type testFuncConsumerGroupMessage struct {
	ClientID string
	*ConsumerMessage
}

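// testFuncConsumerGroupSink collects messages from all group members.
// Close drains the sink and returns a map of "topic-partition:offset" keys
// to the client IDs that consumed them, so duplicates can be detected.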
type testFuncConsumerGroupSink struct {
	msgs  chan testFuncConsumerGroupMessage
	count int32
}

func (s *testFuncConsumerGroupSink) Len() int {
	if s == nil {
		return -1
	}
	return int(atomic.LoadInt32(&s.count))
}

func (s *testFuncConsumerGroupSink) Push(clientID string, m *ConsumerMessage) {
	if s != nil {
		s.msgs <- testFuncConsumerGroupMessage{ClientID: clientID, ConsumerMessage: m}
		atomic.AddInt32(&s.count, 1)
	}
}

func (s *testFuncConsumerGroupSink) Close() map[string][]string {
	close(s.msgs)

	res := make(map[string][]string)
	for msg := range s.msgs {
		key := fmt.Sprintf("%s-%d:%d", msg.Topic, msg.Partition, msg.Offset)
		res[key] = append(res[key], msg.ClientID)
	}
	return res
}

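// testFuncConsumerGroupMember wraps a ConsumerGroup and acts as its own
// ConsumerGroupHandler. The state field tracks the member's lifecycle:
// 1 = about to consume, 2 = session set up, 3 = session cleaned up,
// 4 = consume loop exited.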
type testFuncConsumerGroupMember struct {
	ConsumerGroup
	clientID    string
	claims      map[string]int
	state       int32
	handlers    int32
	errs        []error
	maxMessages int32
	isCapped    bool
	sink        *testFuncConsumerGroupSink

	t  *testing.T
	mu sync.RWMutex
}

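// runTestFuncConsumerGroupMember starts a group member consuming the given
// topics (defaulting to "test.4"). A non-zero maxMessages caps how many
// messages the member will consume before it stops.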
func runTestFuncConsumerGroupMember(t *testing.T, groupID, clientID string, maxMessages int32, sink *testFuncConsumerGroupSink, topics ...string) *testFuncConsumerGroupMember {
	t.Helper()

	config := NewConfig()
	config.ClientID = clientID
	config.Version = V0_10_2_0
	config.Consumer.Return.Errors = true
	config.Consumer.Offsets.Initial = OffsetOldest
	config.Consumer.Group.Rebalance.Timeout = 10 * time.Second

	group, err := NewConsumerGroup(kafkaBrokers, groupID, config)
	if err != nil {
		t.Fatal(err)
		return nil
	}

	if len(topics) == 0 {
		topics = []string{"test.4"}
	}

	member := &testFuncConsumerGroupMember{
		ConsumerGroup: group,
		clientID:      clientID,
		claims:        make(map[string]int),
		maxMessages:   maxMessages,
		isCapped:      maxMessages != 0,
		sink:          sink,
		t:             t,
	}
	go member.loop(topics)
	return member
}

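// AssertCleanShutdown closes the member and verifies that it reaches the
// stopped state, releases all claim handlers and reports no errors.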
func (m *testFuncConsumerGroupMember) AssertCleanShutdown() {
	m.t.Helper()

	if err := m.Close(); err != nil {
		m.t.Fatalf("unexpected error on Close(): %v", err)
	}
	m.WaitForState(4)
	m.WaitForHandlers(0)
	m.AssertNoErrs()
}

func (m *testFuncConsumerGroupMember) AssertNoErrs() {
	m.t.Helper()

	var errs []error
	m.mu.RLock()
	errs = append(errs, m.errs...)
	m.mu.RUnlock()

	if len(errs) != 0 {
		m.t.Fatalf("unexpected consumer errors: %v", errs)
	}
}

func (m *testFuncConsumerGroupMember) WaitForState(expected int32) {
	m.t.Helper()

	m.waitFor("state", expected, func() (interface{}, error) {
		return atomic.LoadInt32(&m.state), nil
	})
}

func (m *testFuncConsumerGroupMember) WaitForHandlers(expected int) {
	m.t.Helper()

	m.waitFor("handlers", expected, func() (interface{}, error) {
		return int(atomic.LoadInt32(&m.handlers)), nil
	})
}

func (m *testFuncConsumerGroupMember) WaitForClaims(expected map[string]int) {
	m.t.Helper()

	m.waitFor("claims", expected, func() (interface{}, error) {
		m.mu.RLock()
		claims := m.claims
		m.mu.RUnlock()
		return claims, nil
	})
}

func (m *testFuncConsumerGroupMember) Stop() { _ = m.Close() }

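// Setup, Cleanup and ConsumeClaim implement ConsumerGroupHandler: Setup
// records the number of claimed partitions per topic and marks the member
// as set up.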
func (m *testFuncConsumerGroupMember) Setup(s ConsumerGroupSession) error {
	// store claims
	claims := make(map[string]int)
	for topic, partitions := range s.Claims() {
		claims[topic] = len(partitions)
	}
	m.mu.Lock()
	m.claims = claims
	m.mu.Unlock()

	// enter post-setup state
	atomic.StoreInt32(&m.state, 2)
	return nil
}

func (m *testFuncConsumerGroupMember) Cleanup(s ConsumerGroupSession) error {
	// enter post-cleanup state
	atomic.StoreInt32(&m.state, 3)
	return nil
}

func (m *testFuncConsumerGroupMember) ConsumeClaim(s ConsumerGroupSession, c ConsumerGroupClaim) error {
	atomic.AddInt32(&m.handlers, 1)
	defer atomic.AddInt32(&m.handlers, -1)

	for msg := range c.Messages() {
		if n := atomic.AddInt32(&m.maxMessages, -1); m.isCapped && n < 0 {
			break
		}
		s.MarkMessage(msg, "")
		m.sink.Push(m.clientID, msg)
	}
	return nil
}

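// waitFor polls factory every 100ms until its result deep-equals expected,
// failing the test if the 60s deadline passes first.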
func (m *testFuncConsumerGroupMember) waitFor(kind string, expected interface{}, factory func() (interface{}, error)) {
	m.t.Helper()

	deadline := time.NewTimer(60 * time.Second)
	defer deadline.Stop()

	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()

	var actual interface{}
	for {
		var err error
		if actual, err = factory(); err != nil {
			m.t.Errorf("failed to retrieve value, expected %s %#v but received error %v", kind, expected, err)
		}

		if reflect.DeepEqual(expected, actual) {
			return
		}

		select {
		case <-deadline.C:
			m.t.Fatalf("ttl exceeded, expected %s %#v but got %#v", kind, expected, actual)
			return
		case <-ticker.C:
		}
	}
}

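// loop drains the group's error channel (closing the member on the first
// error) and repeatedly re-joins the group until the member is closed or
// its message cap is exhausted.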
func (m *testFuncConsumerGroupMember) loop(topics []string) {
	defer atomic.StoreInt32(&m.state, 4)

	go func() {
		for err := range m.Errors() {
			_ = m.Close()

			m.mu.Lock()
			m.errs = append(m.errs, err)
			m.mu.Unlock()
		}
	}()

	ctx := context.Background()
	for {
		// set state to pre-consume
		atomic.StoreInt32(&m.state, 1)

		if err := m.Consume(ctx, topics, m); err == ErrClosedConsumerGroup {
			return
		} else if err != nil {
			m.mu.Lock()
			m.errs = append(m.errs, err)
			m.mu.Unlock()
			return
		}

		// return if capped
		if n := atomic.LoadInt32(&m.maxMessages); m.isCapped && n < 0 {
			return
		}
	}
}