1package sarama
2
3import (
4	"log"
5	"os"
6	"os/signal"
7	"reflect"
8	"sync"
9	"sync/atomic"
10	"testing"
11	"time"
12)
13
// testMsg is the payload used by all fixture messages produced in these tests.
var testMsg = StringEncoder("Foo")
15
// If a particular offset is provided then messages are consumed starting from
// that offset.
func TestConsumerOffsetManual(t *testing.T) {
	// Given
	broker0 := NewMockBroker(t, 0)

	// Ten messages at offsets 1234..1243 are made available for fetching.
	mockFetchResponse := NewMockFetchResponse(t, 1)
	for i := 0; i < 10; i++ {
		mockFetchResponse.SetMessage("my_topic", 0, int64(i+1234), testMsg)
	}

	broker0.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": NewMockMetadataResponse(t).
			SetBroker(broker0.Addr(), broker0.BrokerID()).
			SetLeader("my_topic", 0, broker0.BrokerID()),
		"OffsetRequest": NewMockOffsetResponse(t).
			SetOffset("my_topic", 0, OffsetOldest, 0).
			SetOffset("my_topic", 0, OffsetNewest, 2345),
		"FetchRequest": mockFetchResponse,
	})

	// When: a partition consumer is created with the explicit offset 1234.
	master, err := NewConsumer([]string{broker0.Addr()}, nil)
	if err != nil {
		t.Fatal(err)
	}

	consumer, err := master.ConsumePartition("my_topic", 0, 1234)
	if err != nil {
		t.Fatal(err)
	}

	// Then: messages starting from offset 1234 are consumed.
	for i := 0; i < 10; i++ {
		select {
		case message := <-consumer.Messages():
			assertMessageOffset(t, message, int64(i+1234))
		case err := <-consumer.Errors():
			t.Error(err)
		}
	}

	safeClose(t, consumer)
	safeClose(t, master)
	broker0.Close()
}
62
// If `OffsetNewest` is passed as the initial offset then the first consumed
// message indeed corresponds to the offset that the broker claims to be the
// newest in its metadata response.
func TestConsumerOffsetNewest(t *testing.T) {
	// Given: the broker reports 10 as the newest offset, serves messages at
	// offsets 9 through 11, and advertises a high water mark of 14.
	broker0 := NewMockBroker(t, 0)
	broker0.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": NewMockMetadataResponse(t).
			SetBroker(broker0.Addr(), broker0.BrokerID()).
			SetLeader("my_topic", 0, broker0.BrokerID()),
		"OffsetRequest": NewMockOffsetResponse(t).
			SetOffset("my_topic", 0, OffsetNewest, 10).
			SetOffset("my_topic", 0, OffsetOldest, 7),
		"FetchRequest": NewMockFetchResponse(t, 1).
			SetMessage("my_topic", 0, 9, testMsg).
			SetMessage("my_topic", 0, 10, testMsg).
			SetMessage("my_topic", 0, 11, testMsg).
			SetHighWaterMark("my_topic", 0, 14),
	})

	master, err := NewConsumer([]string{broker0.Addr()}, nil)
	if err != nil {
		t.Fatal(err)
	}

	// When
	consumer, err := master.ConsumePartition("my_topic", 0, OffsetNewest)
	if err != nil {
		t.Fatal(err)
	}

	// Then: the first message delivered is at offset 10 (the stale message at
	// offset 9 is skipped) and the fetch response's high water mark is exposed.
	assertMessageOffset(t, <-consumer.Messages(), 10)
	if hwmo := consumer.HighWaterMarkOffset(); hwmo != 14 {
		t.Errorf("Expected high water mark offset 14, found %d", hwmo)
	}

	safeClose(t, consumer)
	safeClose(t, master)
	broker0.Close()
}
104
// It is possible to close a partition consumer and create the same anew.
func TestConsumerRecreate(t *testing.T) {
	// Given: a broker that keeps serving the message at offset 10.
	broker0 := NewMockBroker(t, 0)
	broker0.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": NewMockMetadataResponse(t).
			SetBroker(broker0.Addr(), broker0.BrokerID()).
			SetLeader("my_topic", 0, broker0.BrokerID()),
		"OffsetRequest": NewMockOffsetResponse(t).
			SetOffset("my_topic", 0, OffsetOldest, 0).
			SetOffset("my_topic", 0, OffsetNewest, 1000),
		"FetchRequest": NewMockFetchResponse(t, 1).
			SetMessage("my_topic", 0, 10, testMsg),
	})

	c, err := NewConsumer([]string{broker0.Addr()}, nil)
	if err != nil {
		t.Fatal(err)
	}

	pc, err := c.ConsumePartition("my_topic", 0, 10)
	if err != nil {
		t.Fatal(err)
	}
	assertMessageOffset(t, <-pc.Messages(), 10)

	// When: the partition consumer is closed and an identical one is created.
	safeClose(t, pc)
	pc, err = c.ConsumePartition("my_topic", 0, 10)
	if err != nil {
		t.Fatal(err)
	}

	// Then: the recreated consumer works just like the first one.
	assertMessageOffset(t, <-pc.Messages(), 10)

	safeClose(t, pc)
	safeClose(t, c)
	broker0.Close()
}
145
146// An attempt to consume the same partition twice should fail.
147func TestConsumerDuplicate(t *testing.T) {
148	// Given
149	broker0 := NewMockBroker(t, 0)
150	broker0.SetHandlerByMap(map[string]MockResponse{
151		"MetadataRequest": NewMockMetadataResponse(t).
152			SetBroker(broker0.Addr(), broker0.BrokerID()).
153			SetLeader("my_topic", 0, broker0.BrokerID()),
154		"OffsetRequest": NewMockOffsetResponse(t).
155			SetOffset("my_topic", 0, OffsetOldest, 0).
156			SetOffset("my_topic", 0, OffsetNewest, 1000),
157		"FetchRequest": NewMockFetchResponse(t, 1),
158	})
159
160	config := NewConfig()
161	config.ChannelBufferSize = 0
162	c, err := NewConsumer([]string{broker0.Addr()}, config)
163	if err != nil {
164		t.Fatal(err)
165	}
166
167	pc1, err := c.ConsumePartition("my_topic", 0, 0)
168	if err != nil {
169		t.Fatal(err)
170	}
171
172	// When
173	pc2, err := c.ConsumePartition("my_topic", 0, 0)
174
175	// Then
176	if pc2 != nil || err != ConfigurationError("That topic/partition is already being consumed") {
177		t.Fatal("A partition cannot be consumed twice at the same time")
178	}
179
180	safeClose(t, pc1)
181	safeClose(t, c)
182	broker0.Close()
183}
184
// runConsumerLeaderRefreshErrorTestWithConfig drives a partition consumer
// through a leader loss during which metadata refreshes keep failing, and
// verifies that consumption resumes once a new leader is finally advertised.
// The supplied config controls the retry/backoff behavior under test.
func runConsumerLeaderRefreshErrorTestWithConfig(t *testing.T, config *Config) {
	// Given
	broker0 := NewMockBroker(t, 100)

	// Stage 1: my_topic/0 served by broker0
	Logger.Printf("    STAGE 1")

	broker0.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": NewMockMetadataResponse(t).
			SetBroker(broker0.Addr(), broker0.BrokerID()).
			SetLeader("my_topic", 0, broker0.BrokerID()),
		"OffsetRequest": NewMockOffsetResponse(t).
			SetOffset("my_topic", 0, OffsetOldest, 123).
			SetOffset("my_topic", 0, OffsetNewest, 1000),
		"FetchRequest": NewMockFetchResponse(t, 1).
			SetMessage("my_topic", 0, 123, testMsg),
	})

	c, err := NewConsumer([]string{broker0.Addr()}, config)
	if err != nil {
		t.Fatal(err)
	}

	pc, err := c.ConsumePartition("my_topic", 0, OffsetOldest)
	if err != nil {
		t.Fatal(err)
	}

	assertMessageOffset(t, <-pc.Messages(), 123)

	// Stage 2: broker0 says that it is no longer the leader for my_topic/0,
	// but the requests to retrieve metadata fail with network timeout.
	Logger.Printf("    STAGE 2")

	fetchResponse2 := &FetchResponse{}
	fetchResponse2.AddError("my_topic", 0, ErrNotLeaderForPartition)

	// No "MetadataRequest" handler is installed here on purpose: metadata
	// refresh attempts time out until stage 3 swaps the handler map again.
	broker0.SetHandlerByMap(map[string]MockResponse{
		"FetchRequest": NewMockWrapper(fetchResponse2),
	})

	// The consumer exhausts its brokers while refreshing metadata and reports
	// ErrOutOfBrokers on the error channel.
	if consErr := <-pc.Errors(); consErr.Err != ErrOutOfBrokers {
		t.Errorf("Unexpected error: %v", consErr.Err)
	}

	// Stage 3: finally the metadata returned by broker0 tells that broker1 is
	// a new leader for my_topic/0. Consumption resumes.

	Logger.Printf("    STAGE 3")

	broker1 := NewMockBroker(t, 101)

	broker1.SetHandlerByMap(map[string]MockResponse{
		"FetchRequest": NewMockFetchResponse(t, 1).
			SetMessage("my_topic", 0, 124, testMsg),
	})
	broker0.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": NewMockMetadataResponse(t).
			SetBroker(broker0.Addr(), broker0.BrokerID()).
			SetBroker(broker1.Addr(), broker1.BrokerID()).
			SetLeader("my_topic", 0, broker1.BrokerID()),
	})

	// The next message is fetched from the new leader.
	assertMessageOffset(t, <-pc.Messages(), 124)

	safeClose(t, pc)
	safeClose(t, c)
	broker1.Close()
	broker0.Close()
}
255
256// If consumer fails to refresh metadata it keeps retrying with frequency
257// specified by `Config.Consumer.Retry.Backoff`.
258func TestConsumerLeaderRefreshError(t *testing.T) {
259	config := NewConfig()
260	config.Net.ReadTimeout = 100 * time.Millisecond
261	config.Consumer.Retry.Backoff = 200 * time.Millisecond
262	config.Consumer.Return.Errors = true
263	config.Metadata.Retry.Max = 0
264
265	runConsumerLeaderRefreshErrorTestWithConfig(t, config)
266}
267
268func TestConsumerLeaderRefreshErrorWithBackoffFunc(t *testing.T) {
269	var calls int32 = 0
270
271	config := NewConfig()
272	config.Net.ReadTimeout = 100 * time.Millisecond
273	config.Consumer.Retry.BackoffFunc = func(retries int) time.Duration {
274		atomic.AddInt32(&calls, 1)
275		return 200 * time.Millisecond
276	}
277	config.Consumer.Return.Errors = true
278	config.Metadata.Retry.Max = 0
279
280	runConsumerLeaderRefreshErrorTestWithConfig(t, config)
281
282	// we expect at least one call to our backoff function
283	if calls == 0 {
284		t.Fail()
285	}
286}
287
288func TestConsumerInvalidTopic(t *testing.T) {
289	// Given
290	broker0 := NewMockBroker(t, 100)
291	broker0.SetHandlerByMap(map[string]MockResponse{
292		"MetadataRequest": NewMockMetadataResponse(t).
293			SetBroker(broker0.Addr(), broker0.BrokerID()),
294	})
295
296	c, err := NewConsumer([]string{broker0.Addr()}, nil)
297	if err != nil {
298		t.Fatal(err)
299	}
300
301	// When
302	pc, err := c.ConsumePartition("my_topic", 0, OffsetOldest)
303
304	// Then
305	if pc != nil || err != ErrUnknownTopicOrPartition {
306		t.Errorf("Should fail with, err=%v", err)
307	}
308
309	safeClose(t, c)
310	broker0.Close()
311}
312
// Nothing bad happens if a partition consumer that has no leader assigned at
// the moment is closed.
func TestConsumerClosePartitionWithoutLeader(t *testing.T) {
	// Given: a broker serving my_topic/0 starting at offset 123.
	broker0 := NewMockBroker(t, 100)
	broker0.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": NewMockMetadataResponse(t).
			SetBroker(broker0.Addr(), broker0.BrokerID()).
			SetLeader("my_topic", 0, broker0.BrokerID()),
		"OffsetRequest": NewMockOffsetResponse(t).
			SetOffset("my_topic", 0, OffsetOldest, 123).
			SetOffset("my_topic", 0, OffsetNewest, 1000),
		"FetchRequest": NewMockFetchResponse(t, 1).
			SetMessage("my_topic", 0, 123, testMsg),
	})

	// Short timeouts/backoff and no metadata retries keep the test fast.
	config := NewConfig()
	config.Net.ReadTimeout = 100 * time.Millisecond
	config.Consumer.Retry.Backoff = 100 * time.Millisecond
	config.Consumer.Return.Errors = true
	config.Metadata.Retry.Max = 0
	c, err := NewConsumer([]string{broker0.Addr()}, config)
	if err != nil {
		t.Fatal(err)
	}

	pc, err := c.ConsumePartition("my_topic", 0, OffsetOldest)
	if err != nil {
		t.Fatal(err)
	}

	assertMessageOffset(t, <-pc.Messages(), 123)

	// broker0 says that it is no longer the leader for my_topic/0, but the
	// requests to retrieve metadata fail with network timeout.
	fetchResponse2 := &FetchResponse{}
	fetchResponse2.AddError("my_topic", 0, ErrNotLeaderForPartition)

	// No "MetadataRequest" handler: leader lookups now time out, leaving the
	// partition consumer without an assigned leader.
	broker0.SetHandlerByMap(map[string]MockResponse{
		"FetchRequest": NewMockWrapper(fetchResponse2),
	})

	// When
	if consErr := <-pc.Errors(); consErr.Err != ErrOutOfBrokers {
		t.Errorf("Unexpected error: %v", consErr.Err)
	}

	// Then: the partition consumer can be closed without any problem.
	safeClose(t, pc)
	safeClose(t, c)
	broker0.Close()
}
365
366// If the initial offset passed on partition consumer creation is out of the
367// actual offset range for the partition, then the partition consumer stops
368// immediately closing its output channels.
369func TestConsumerShutsDownOutOfRange(t *testing.T) {
370	// Given
371	broker0 := NewMockBroker(t, 0)
372	fetchResponse := new(FetchResponse)
373	fetchResponse.AddError("my_topic", 0, ErrOffsetOutOfRange)
374	broker0.SetHandlerByMap(map[string]MockResponse{
375		"MetadataRequest": NewMockMetadataResponse(t).
376			SetBroker(broker0.Addr(), broker0.BrokerID()).
377			SetLeader("my_topic", 0, broker0.BrokerID()),
378		"OffsetRequest": NewMockOffsetResponse(t).
379			SetOffset("my_topic", 0, OffsetNewest, 1234).
380			SetOffset("my_topic", 0, OffsetOldest, 7),
381		"FetchRequest": NewMockWrapper(fetchResponse),
382	})
383
384	master, err := NewConsumer([]string{broker0.Addr()}, nil)
385	if err != nil {
386		t.Fatal(err)
387	}
388
389	// When
390	consumer, err := master.ConsumePartition("my_topic", 0, 101)
391	if err != nil {
392		t.Fatal(err)
393	}
394
395	// Then: consumer should shut down closing its messages and errors channels.
396	if _, ok := <-consumer.Messages(); ok {
397		t.Error("Expected the consumer to shut down")
398	}
399	safeClose(t, consumer)
400
401	safeClose(t, master)
402	broker0.Close()
403}
404
// If a fetch response contains messages with offsets that are smaller than
// requested, then such messages are ignored.
func TestConsumerExtraOffsets(t *testing.T) {
	// Given: responses holding messages 1..4 in both the legacy message-set
	// format and the record-batch format used from fetch API v4 onwards.
	legacyFetchResponse := &FetchResponse{}
	legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 1)
	legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 2)
	legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 3)
	legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 4)
	newFetchResponse := &FetchResponse{Version: 4}
	newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 1)
	newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 2)
	newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 3)
	newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 4)
	newFetchResponse.SetLastOffsetDelta("my_topic", 0, 4)
	newFetchResponse.SetLastStableOffset("my_topic", 0, 4)
	// Run the same scenario once per response format.
	for _, fetchResponse1 := range []*FetchResponse{legacyFetchResponse, newFetchResponse} {
		var offsetResponseVersion int16
		cfg := NewConfig()
		cfg.Consumer.Return.Errors = true
		if fetchResponse1.Version >= 4 {
			// Record batches require a 0.11 client and offset API v1.
			cfg.Version = V0_11_0_0
			offsetResponseVersion = 1
		}

		broker0 := NewMockBroker(t, 0)
		// An empty follow-up response keeps the consumer's fetch loop fed.
		fetchResponse2 := &FetchResponse{}
		fetchResponse2.Version = fetchResponse1.Version
		fetchResponse2.AddError("my_topic", 0, ErrNoError)
		broker0.SetHandlerByMap(map[string]MockResponse{
			"MetadataRequest": NewMockMetadataResponse(t).
				SetBroker(broker0.Addr(), broker0.BrokerID()).
				SetLeader("my_topic", 0, broker0.BrokerID()),
			"OffsetRequest": NewMockOffsetResponse(t).
				SetVersion(offsetResponseVersion).
				SetOffset("my_topic", 0, OffsetNewest, 1234).
				SetOffset("my_topic", 0, OffsetOldest, 0),
			"FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2),
		})

		master, err := NewConsumer([]string{broker0.Addr()}, cfg)
		if err != nil {
			t.Fatal(err)
		}

		// When: consumption starts at offset 3.
		consumer, err := master.ConsumePartition("my_topic", 0, 3)
		if err != nil {
			t.Fatal(err)
		}

		// Then: messages with offsets 1 and 2 are not returned even though they
		// are present in the response.
		select {
		case msg := <-consumer.Messages():
			assertMessageOffset(t, msg, 3)
		case err := <-consumer.Errors():
			t.Fatal(err)
		}

		select {
		case msg := <-consumer.Messages():
			assertMessageOffset(t, msg, 4)
		case err := <-consumer.Errors():
			t.Fatal(err)
		}

		safeClose(t, consumer)
		safeClose(t, master)
		broker0.Close()
	}
}
477
// In some situations broker may return a block containing only
// messages older than requested, even though there would be
// more messages if higher offset was requested.
func TestConsumerReceivingFetchResponseWithTooOldRecords(t *testing.T) {
	// Given: the first response only carries a record at offset 1 — older
	// than the requested offset 2 — while the second carries a newer one.
	fetchResponse1 := &FetchResponse{Version: 4}
	fetchResponse1.AddRecord("my_topic", 0, nil, testMsg, 1)

	fetchResponse2 := &FetchResponse{Version: 4}
	fetchResponse2.AddRecord("my_topic", 0, nil, testMsg, 1000000)

	cfg := NewConfig()
	cfg.Consumer.Return.Errors = true
	cfg.Version = V0_11_0_0

	broker0 := NewMockBroker(t, 0)

	broker0.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": NewMockMetadataResponse(t).
			SetBroker(broker0.Addr(), broker0.BrokerID()).
			SetLeader("my_topic", 0, broker0.BrokerID()),
		"OffsetRequest": NewMockOffsetResponse(t).
			SetVersion(1).
			SetOffset("my_topic", 0, OffsetNewest, 1234).
			SetOffset("my_topic", 0, OffsetOldest, 0),
		"FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2),
	})

	master, err := NewConsumer([]string{broker0.Addr()}, cfg)
	if err != nil {
		t.Fatal(err)
	}

	// When: consumption starts at offset 2.
	consumer, err := master.ConsumePartition("my_topic", 0, 2)
	if err != nil {
		t.Fatal(err)
	}

	// Then: the too-old record is skipped and the record from the follow-up
	// fetch is delivered instead of an error.
	select {
	case msg := <-consumer.Messages():
		assertMessageOffset(t, msg, 1000000)
	case err := <-consumer.Errors():
		t.Fatal(err)
	}

	safeClose(t, consumer)
	safeClose(t, master)
	broker0.Close()
}
528
// TestConsumeMessageWithNewerFetchAPIVersion checks that messages carried in
// fetch API v4 responses are consumed correctly when the client is configured
// for Kafka 0.11.
func TestConsumeMessageWithNewerFetchAPIVersion(t *testing.T) {
	// Given: a v4 response with messages at offsets 1 and 2.
	fetchResponse1 := &FetchResponse{Version: 4}
	fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 1)
	fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 2)

	cfg := NewConfig()
	cfg.Version = V0_11_0_0

	broker0 := NewMockBroker(t, 0)
	// An empty second response keeps the mock fetch sequence well-formed.
	fetchResponse2 := &FetchResponse{}
	fetchResponse2.Version = 4
	fetchResponse2.AddError("my_topic", 0, ErrNoError)
	broker0.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": NewMockMetadataResponse(t).
			SetBroker(broker0.Addr(), broker0.BrokerID()).
			SetLeader("my_topic", 0, broker0.BrokerID()),
		"OffsetRequest": NewMockOffsetResponse(t).
			SetVersion(1).
			SetOffset("my_topic", 0, OffsetNewest, 1234).
			SetOffset("my_topic", 0, OffsetOldest, 0),
		"FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2),
	})

	master, err := NewConsumer([]string{broker0.Addr()}, cfg)
	if err != nil {
		t.Fatal(err)
	}

	// When
	consumer, err := master.ConsumePartition("my_topic", 0, 1)
	if err != nil {
		t.Fatal(err)
	}

	// Then: both messages arrive in order.
	assertMessageOffset(t, <-consumer.Messages(), 1)
	assertMessageOffset(t, <-consumer.Messages(), 2)

	safeClose(t, consumer)
	safeClose(t, master)
	broker0.Close()
}
571
// TestConsumeMessageWithSessionIDs checks that a client configured for
// Kafka 1.1 (fetch API v7) sends a zero fetch session ID and a session epoch
// of -1 in its fetch requests.
func TestConsumeMessageWithSessionIDs(t *testing.T) {
	// Given: a v7 response with messages at offsets 1 and 2.
	fetchResponse1 := &FetchResponse{Version: 7}
	fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 1)
	fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 2)

	cfg := NewConfig()
	cfg.Version = V1_1_0_0

	broker0 := NewMockBroker(t, 0)
	fetchResponse2 := &FetchResponse{}
	fetchResponse2.Version = 7
	fetchResponse2.AddError("my_topic", 0, ErrNoError)

	broker0.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": NewMockMetadataResponse(t).
			SetBroker(broker0.Addr(), broker0.BrokerID()).
			SetLeader("my_topic", 0, broker0.BrokerID()),
		"OffsetRequest": NewMockOffsetResponse(t).
			SetVersion(1).
			SetOffset("my_topic", 0, OffsetNewest, 1234).
			SetOffset("my_topic", 0, OffsetOldest, 0),
		"FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2),
	})

	master, err := NewConsumer([]string{broker0.Addr()}, cfg)
	if err != nil {
		t.Fatal(err)
	}

	// When
	consumer, err := master.ConsumePartition("my_topic", 0, 1)
	if err != nil {
		t.Fatal(err)
	}

	assertMessageOffset(t, <-consumer.Messages(), 1)
	assertMessageOffset(t, <-consumer.Messages(), 2)

	safeClose(t, consumer)
	safeClose(t, master)
	broker0.Close()

	// History()[3] is assumed to be a FetchRequest recorded after the earlier
	// metadata/offset requests — NOTE(review): this index is brittle if the
	// mock broker's request ordering ever changes; verify against MockBroker.
	fetchReq := broker0.History()[3].Request.(*FetchRequest)
	if fetchReq.SessionID != 0 || fetchReq.SessionEpoch != -1 {
		t.Error("Expected session ID to be zero & Epoch to be -1")
	}
}
620
// It is fine if offsets of fetched messages are not sequential (although
// strictly increasing!).
func TestConsumerNonSequentialOffsets(t *testing.T) {
	// Given: messages at non-sequential offsets 5, 7 and 11 in both the
	// legacy message-set format and the v4 record-batch format.
	legacyFetchResponse := &FetchResponse{}
	legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 5)
	legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 7)
	legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 11)
	newFetchResponse := &FetchResponse{Version: 4}
	newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 5)
	newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 7)
	newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 11)
	newFetchResponse.SetLastOffsetDelta("my_topic", 0, 11)
	newFetchResponse.SetLastStableOffset("my_topic", 0, 11)
	// Run the same scenario once per response format.
	for _, fetchResponse1 := range []*FetchResponse{legacyFetchResponse, newFetchResponse} {
		var offsetResponseVersion int16
		cfg := NewConfig()
		if fetchResponse1.Version >= 4 {
			// Record batches require a 0.11 client and offset API v1.
			cfg.Version = V0_11_0_0
			offsetResponseVersion = 1
		}

		broker0 := NewMockBroker(t, 0)
		fetchResponse2 := &FetchResponse{Version: fetchResponse1.Version}
		fetchResponse2.AddError("my_topic", 0, ErrNoError)
		broker0.SetHandlerByMap(map[string]MockResponse{
			"MetadataRequest": NewMockMetadataResponse(t).
				SetBroker(broker0.Addr(), broker0.BrokerID()).
				SetLeader("my_topic", 0, broker0.BrokerID()),
			"OffsetRequest": NewMockOffsetResponse(t).
				SetVersion(offsetResponseVersion).
				SetOffset("my_topic", 0, OffsetNewest, 1234).
				SetOffset("my_topic", 0, OffsetOldest, 0),
			"FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2),
		})

		master, err := NewConsumer([]string{broker0.Addr()}, cfg)
		if err != nil {
			t.Fatal(err)
		}

		// When: consumption starts at offset 3.
		consumer, err := master.ConsumePartition("my_topic", 0, 3)
		if err != nil {
			t.Fatal(err)
		}

		// Then: all three messages are delivered despite the gaps in their
		// offsets (5, 7, 11); nothing is skipped or duplicated.
		assertMessageOffset(t, <-consumer.Messages(), 5)
		assertMessageOffset(t, <-consumer.Messages(), 7)
		assertMessageOffset(t, <-consumer.Messages(), 11)

		safeClose(t, consumer)
		safeClose(t, master)
		broker0.Close()
	}
}
679
680// If leadership for a partition is changing then consumer resolves the new
681// leader and switches to it.
682func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
683	// initial setup
684	seedBroker := NewMockBroker(t, 10)
685	leader0 := NewMockBroker(t, 0)
686	leader1 := NewMockBroker(t, 1)
687
688	seedBroker.SetHandlerByMap(map[string]MockResponse{
689		"MetadataRequest": NewMockMetadataResponse(t).
690			SetBroker(leader0.Addr(), leader0.BrokerID()).
691			SetBroker(leader1.Addr(), leader1.BrokerID()).
692			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
693			SetLeader("my_topic", 0, leader0.BrokerID()).
694			SetLeader("my_topic", 1, leader1.BrokerID()),
695	})
696
697	mockOffsetResponse1 := NewMockOffsetResponse(t).
698		SetOffset("my_topic", 0, OffsetOldest, 0).
699		SetOffset("my_topic", 0, OffsetNewest, 1000).
700		SetOffset("my_topic", 1, OffsetOldest, 0).
701		SetOffset("my_topic", 1, OffsetNewest, 1000)
702	leader0.SetHandlerByMap(map[string]MockResponse{
703		"OffsetRequest": mockOffsetResponse1,
704		"FetchRequest":  NewMockFetchResponse(t, 1),
705	})
706	leader1.SetHandlerByMap(map[string]MockResponse{
707		"OffsetRequest": mockOffsetResponse1,
708		"FetchRequest":  NewMockFetchResponse(t, 1),
709	})
710
711	// launch test goroutines
712	config := NewConfig()
713	config.Consumer.Retry.Backoff = 50
714	master, err := NewConsumer([]string{seedBroker.Addr()}, config)
715	if err != nil {
716		t.Fatal(err)
717	}
718
719	// we expect to end up (eventually) consuming exactly ten messages on each partition
720	var wg sync.WaitGroup
721	for i := int32(0); i < 2; i++ {
722		consumer, err := master.ConsumePartition("my_topic", i, 0)
723		if err != nil {
724			t.Error(err)
725		}
726
727		go func(c PartitionConsumer) {
728			for err := range c.Errors() {
729				t.Error(err)
730			}
731		}(consumer)
732
733		wg.Add(1)
734		go func(partition int32, c PartitionConsumer) {
735			for i := 0; i < 10; i++ {
736				message := <-consumer.Messages()
737				if message.Offset != int64(i) {
738					t.Error("Incorrect message offset!", i, partition, message.Offset)
739				}
740				if message.Partition != partition {
741					t.Error("Incorrect message partition!")
742				}
743			}
744			safeClose(t, consumer)
745			wg.Done()
746		}(i, consumer)
747	}
748
749	time.Sleep(50 * time.Millisecond)
750	Logger.Printf("    STAGE 1")
751	// Stage 1:
752	//   * my_topic/0 -> leader0 serves 4 messages
753	//   * my_topic/1 -> leader1 serves 0 messages
754
755	mockFetchResponse := NewMockFetchResponse(t, 1)
756	for i := 0; i < 4; i++ {
757		mockFetchResponse.SetMessage("my_topic", 0, int64(i), testMsg)
758	}
759	leader0.SetHandlerByMap(map[string]MockResponse{
760		"FetchRequest": mockFetchResponse,
761	})
762
763	time.Sleep(50 * time.Millisecond)
764	Logger.Printf("    STAGE 2")
765	// Stage 2:
766	//   * leader0 says that it is no longer serving my_topic/0
767	//   * seedBroker tells that leader1 is serving my_topic/0 now
768
769	// seed broker tells that the new partition 0 leader is leader1
770	seedBroker.SetHandlerByMap(map[string]MockResponse{
771		"MetadataRequest": NewMockMetadataResponse(t).
772			SetLeader("my_topic", 0, leader1.BrokerID()).
773			SetLeader("my_topic", 1, leader1.BrokerID()).
774			SetBroker(leader0.Addr(), leader0.BrokerID()).
775			SetBroker(leader1.Addr(), leader1.BrokerID()).
776			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
777	})
778
779	// leader0 says no longer leader of partition 0
780	fetchResponse := new(FetchResponse)
781	fetchResponse.AddError("my_topic", 0, ErrNotLeaderForPartition)
782	leader0.SetHandlerByMap(map[string]MockResponse{
783		"FetchRequest": NewMockWrapper(fetchResponse),
784	})
785
786	time.Sleep(50 * time.Millisecond)
787	Logger.Printf("    STAGE 3")
788	// Stage 3:
789	//   * my_topic/0 -> leader1 serves 3 messages
790	//   * my_topic/1 -> leader1 server 8 messages
791
792	// leader1 provides 3 message on partition 0, and 8 messages on partition 1
793	mockFetchResponse2 := NewMockFetchResponse(t, 2)
794	for i := 4; i < 7; i++ {
795		mockFetchResponse2.SetMessage("my_topic", 0, int64(i), testMsg)
796	}
797	for i := 0; i < 8; i++ {
798		mockFetchResponse2.SetMessage("my_topic", 1, int64(i), testMsg)
799	}
800	leader1.SetHandlerByMap(map[string]MockResponse{
801		"FetchRequest": mockFetchResponse2,
802	})
803
804	time.Sleep(50 * time.Millisecond)
805	Logger.Printf("    STAGE 4")
806	// Stage 4:
807	//   * my_topic/0 -> leader1 serves 3 messages
808	//   * my_topic/1 -> leader1 tells that it is no longer the leader
809	//   * seedBroker tells that leader0 is a new leader for my_topic/1
810
811	// metadata assigns 0 to leader1 and 1 to leader0
812	seedBroker.SetHandlerByMap(map[string]MockResponse{
813		"MetadataRequest": NewMockMetadataResponse(t).
814			SetLeader("my_topic", 0, leader1.BrokerID()).
815			SetLeader("my_topic", 1, leader0.BrokerID()).
816			SetBroker(leader0.Addr(), leader0.BrokerID()).
817			SetBroker(leader1.Addr(), leader1.BrokerID()).
818			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
819	})
820
821	// leader1 provides three more messages on partition0, says no longer leader of partition1
822	mockFetchResponse3 := NewMockFetchResponse(t, 3).
823		SetMessage("my_topic", 0, int64(7), testMsg).
824		SetMessage("my_topic", 0, int64(8), testMsg).
825		SetMessage("my_topic", 0, int64(9), testMsg)
826	fetchResponse4 := new(FetchResponse)
827	fetchResponse4.AddError("my_topic", 1, ErrNotLeaderForPartition)
828	leader1.SetHandlerByMap(map[string]MockResponse{
829		"FetchRequest": NewMockSequence(mockFetchResponse3, fetchResponse4),
830	})
831
832	// leader0 provides two messages on partition 1
833	mockFetchResponse4 := NewMockFetchResponse(t, 2)
834	for i := 8; i < 10; i++ {
835		mockFetchResponse4.SetMessage("my_topic", 1, int64(i), testMsg)
836	}
837	leader0.SetHandlerByMap(map[string]MockResponse{
838		"FetchRequest": mockFetchResponse4,
839	})
840
841	wg.Wait()
842	safeClose(t, master)
843	leader1.Close()
844	leader0.Close()
845	seedBroker.Close()
846}
847
// When two partitions have the same broker as the leader, if one partition
// consumer channel buffer is full then that does not affect the ability to
// read messages by the other consumer.
func TestConsumerInterleavedClose(t *testing.T) {
	// Given: one broker leads both my_topic/0 and my_topic/1.
	broker0 := NewMockBroker(t, 0)
	broker0.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": NewMockMetadataResponse(t).
			SetBroker(broker0.Addr(), broker0.BrokerID()).
			SetLeader("my_topic", 0, broker0.BrokerID()).
			SetLeader("my_topic", 1, broker0.BrokerID()),
		"OffsetRequest": NewMockOffsetResponse(t).
			SetOffset("my_topic", 0, OffsetOldest, 1000).
			SetOffset("my_topic", 0, OffsetNewest, 1100).
			SetOffset("my_topic", 1, OffsetOldest, 2000).
			SetOffset("my_topic", 1, OffsetNewest, 2100),
		"FetchRequest": NewMockFetchResponse(t, 1).
			SetMessage("my_topic", 0, 1000, testMsg).
			SetMessage("my_topic", 0, 1001, testMsg).
			SetMessage("my_topic", 0, 1002, testMsg).
			SetMessage("my_topic", 1, 2000, testMsg),
	})

	// A zero channel buffer makes the unread partition block immediately.
	config := NewConfig()
	config.ChannelBufferSize = 0
	master, err := NewConsumer([]string{broker0.Addr()}, config)
	if err != nil {
		t.Fatal(err)
	}

	c0, err := master.ConsumePartition("my_topic", 0, 1000)
	if err != nil {
		t.Fatal(err)
	}

	c1, err := master.ConsumePartition("my_topic", 1, 2000)
	if err != nil {
		t.Fatal(err)
	}

	// When/Then: we can read from partition 0 even if nobody reads from partition 1
	assertMessageOffset(t, <-c0.Messages(), 1000)
	assertMessageOffset(t, <-c0.Messages(), 1001)
	assertMessageOffset(t, <-c0.Messages(), 1002)

	safeClose(t, c1)
	safeClose(t, c0)
	safeClose(t, master)
	broker0.Close()
}
898
899func TestConsumerBounceWithReferenceOpen(t *testing.T) {
900	broker0 := NewMockBroker(t, 0)
901	broker0Addr := broker0.Addr()
902	broker1 := NewMockBroker(t, 1)
903
904	mockMetadataResponse := NewMockMetadataResponse(t).
905		SetBroker(broker0.Addr(), broker0.BrokerID()).
906		SetBroker(broker1.Addr(), broker1.BrokerID()).
907		SetLeader("my_topic", 0, broker0.BrokerID()).
908		SetLeader("my_topic", 1, broker1.BrokerID())
909
910	mockOffsetResponse := NewMockOffsetResponse(t).
911		SetOffset("my_topic", 0, OffsetOldest, 1000).
912		SetOffset("my_topic", 0, OffsetNewest, 1100).
913		SetOffset("my_topic", 1, OffsetOldest, 2000).
914		SetOffset("my_topic", 1, OffsetNewest, 2100)
915
916	mockFetchResponse := NewMockFetchResponse(t, 1)
917	for i := 0; i < 10; i++ {
918		mockFetchResponse.SetMessage("my_topic", 0, int64(1000+i), testMsg)
919		mockFetchResponse.SetMessage("my_topic", 1, int64(2000+i), testMsg)
920	}
921
922	broker0.SetHandlerByMap(map[string]MockResponse{
923		"OffsetRequest": mockOffsetResponse,
924		"FetchRequest":  mockFetchResponse,
925	})
926	broker1.SetHandlerByMap(map[string]MockResponse{
927		"MetadataRequest": mockMetadataResponse,
928		"OffsetRequest":   mockOffsetResponse,
929		"FetchRequest":    mockFetchResponse,
930	})
931
932	config := NewConfig()
933	config.Consumer.Return.Errors = true
934	config.Consumer.Retry.Backoff = 100 * time.Millisecond
935	config.ChannelBufferSize = 1
936	master, err := NewConsumer([]string{broker1.Addr()}, config)
937	if err != nil {
938		t.Fatal(err)
939	}
940
941	c0, err := master.ConsumePartition("my_topic", 0, 1000)
942	if err != nil {
943		t.Fatal(err)
944	}
945
946	c1, err := master.ConsumePartition("my_topic", 1, 2000)
947	if err != nil {
948		t.Fatal(err)
949	}
950
951	// read messages from both partition to make sure that both brokers operate
952	// normally.
953	assertMessageOffset(t, <-c0.Messages(), 1000)
954	assertMessageOffset(t, <-c1.Messages(), 2000)
955
956	// Simulate broker shutdown. Note that metadata response does not change,
957	// that is the leadership does not move to another broker. So partition
958	// consumer will keep retrying to restore the connection with the broker.
959	broker0.Close()
960
961	// Make sure that while the partition/0 leader is down, consumer/partition/1
962	// is capable of pulling messages from broker1.
963	for i := 1; i < 7; i++ {
964		offset := (<-c1.Messages()).Offset
965		if offset != int64(2000+i) {
966			t.Errorf("Expected offset %d from consumer/partition/1", int64(2000+i))
967		}
968	}
969
970	// Bring broker0 back to service.
971	broker0 = NewMockBrokerAddr(t, 0, broker0Addr)
972	broker0.SetHandlerByMap(map[string]MockResponse{
973		"FetchRequest": mockFetchResponse,
974	})
975
976	// Read the rest of messages from both partitions.
977	for i := 7; i < 10; i++ {
978		assertMessageOffset(t, <-c1.Messages(), int64(2000+i))
979	}
980	for i := 1; i < 10; i++ {
981		assertMessageOffset(t, <-c0.Messages(), int64(1000+i))
982	}
983
984	select {
985	case <-c0.Errors():
986	default:
987		t.Errorf("Partition consumer should have detected broker restart")
988	}
989
990	safeClose(t, c1)
991	safeClose(t, c0)
992	safeClose(t, master)
993	broker0.Close()
994	broker1.Close()
995}
996
997func TestConsumerOffsetOutOfRange(t *testing.T) {
998	// Given
999	broker0 := NewMockBroker(t, 2)
1000	broker0.SetHandlerByMap(map[string]MockResponse{
1001		"MetadataRequest": NewMockMetadataResponse(t).
1002			SetBroker(broker0.Addr(), broker0.BrokerID()).
1003			SetLeader("my_topic", 0, broker0.BrokerID()),
1004		"OffsetRequest": NewMockOffsetResponse(t).
1005			SetOffset("my_topic", 0, OffsetNewest, 1234).
1006			SetOffset("my_topic", 0, OffsetOldest, 2345),
1007	})
1008
1009	master, err := NewConsumer([]string{broker0.Addr()}, nil)
1010	if err != nil {
1011		t.Fatal(err)
1012	}
1013
1014	// When/Then
1015	if _, err := master.ConsumePartition("my_topic", 0, 0); err != ErrOffsetOutOfRange {
1016		t.Fatal("Should return ErrOffsetOutOfRange, got:", err)
1017	}
1018	if _, err := master.ConsumePartition("my_topic", 0, 3456); err != ErrOffsetOutOfRange {
1019		t.Fatal("Should return ErrOffsetOutOfRange, got:", err)
1020	}
1021	if _, err := master.ConsumePartition("my_topic", 0, -3); err != ErrOffsetOutOfRange {
1022		t.Fatal("Should return ErrOffsetOutOfRange, got:", err)
1023	}
1024
1025	safeClose(t, master)
1026	broker0.Close()
1027}
1028
1029func TestConsumerExpiryTicker(t *testing.T) {
1030	// Given
1031	broker0 := NewMockBroker(t, 0)
1032	fetchResponse1 := &FetchResponse{}
1033	for i := 1; i <= 8; i++ {
1034		fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, int64(i))
1035	}
1036	broker0.SetHandlerByMap(map[string]MockResponse{
1037		"MetadataRequest": NewMockMetadataResponse(t).
1038			SetBroker(broker0.Addr(), broker0.BrokerID()).
1039			SetLeader("my_topic", 0, broker0.BrokerID()),
1040		"OffsetRequest": NewMockOffsetResponse(t).
1041			SetOffset("my_topic", 0, OffsetNewest, 1234).
1042			SetOffset("my_topic", 0, OffsetOldest, 1),
1043		"FetchRequest": NewMockSequence(fetchResponse1),
1044	})
1045
1046	config := NewConfig()
1047	config.ChannelBufferSize = 0
1048	config.Consumer.MaxProcessingTime = 10 * time.Millisecond
1049	master, err := NewConsumer([]string{broker0.Addr()}, config)
1050	if err != nil {
1051		t.Fatal(err)
1052	}
1053
1054	// When
1055	consumer, err := master.ConsumePartition("my_topic", 0, 1)
1056	if err != nil {
1057		t.Fatal(err)
1058	}
1059
1060	// Then: messages with offsets 1 through 8 are read
1061	for i := 1; i <= 8; i++ {
1062		assertMessageOffset(t, <-consumer.Messages(), int64(i))
1063		time.Sleep(2 * time.Millisecond)
1064	}
1065
1066	safeClose(t, consumer)
1067	safeClose(t, master)
1068	broker0.Close()
1069}
1070
// TestConsumerTimestamps checks the message timestamps the consumer surfaces
// across Kafka protocol versions: versions whose wire format carries no
// timestamp yield the zero time.Time, CreateTime responses yield each
// message's own timestamp, and LogAppendTime responses stamp every message
// with the broker-side time (`now` in this fixture).
func TestConsumerTimestamps(t *testing.T) {
	now := time.Now().Truncate(time.Millisecond)
	type testMessage struct {
		key       Encoder
		offset    int64
		timestamp time.Time
	}
	for _, d := range []struct {
		kversion          KafkaVersion // protocol version configured on the client
		logAppendTime     bool         // broker stamps messages with append time
		messages          []testMessage
		expectedTimestamp []time.Time // expected per consumed message, in order
	}{
		// Pre-0.10 message format has no timestamp field: expect zero values.
		{MinVersion, false, []testMessage{
			{testMsg, 1, now},
			{testMsg, 2, now},
		}, []time.Time{{}, {}}},
		{V0_9_0_0, false, []testMessage{
			{testMsg, 1, now},
			{testMsg, 2, now},
		}, []time.Time{{}, {}}},
		// 0.10.0 falls into the default branch below, which writes version-0
		// messages, so timestamps are still absent here.
		{V0_10_0_0, false, []testMessage{
			{testMsg, 1, now},
			{testMsg, 2, now},
		}, []time.Time{{}, {}}},
		// CreateTime: each message keeps its own timestamp.
		{V0_10_2_1, false, []testMessage{
			{testMsg, 1, now.Add(time.Second)},
			{testMsg, 2, now.Add(2 * time.Second)},
		}, []time.Time{now.Add(time.Second), now.Add(2 * time.Second)}},
		// LogAppendTime: all messages get the broker's append time (`now`).
		{V0_10_2_1, true, []testMessage{
			{testMsg, 1, now.Add(time.Second)},
			{testMsg, 2, now.Add(2 * time.Second)},
		}, []time.Time{now, now}},
		{V0_11_0_0, false, []testMessage{
			{testMsg, 1, now.Add(time.Second)},
			{testMsg, 2, now.Add(2 * time.Second)},
		}, []time.Time{now.Add(time.Second), now.Add(2 * time.Second)}},
		{V0_11_0_0, true, []testMessage{
			{testMsg, 1, now.Add(time.Second)},
			{testMsg, 2, now.Add(2 * time.Second)},
		}, []time.Time{now, now}},
	} {
		var fr *FetchResponse
		var offsetResponseVersion int16
		cfg := NewConfig()
		cfg.Version = d.kversion
		// Build the mock fetch response in the wire format matching the
		// configured protocol version.
		switch {
		case d.kversion.IsAtLeast(V0_11_0_0):
			// 0.11+: fetch v4 record batches, which support LogAppendTime.
			offsetResponseVersion = 1
			fr = &FetchResponse{Version: 4, LogAppendTime: d.logAppendTime, Timestamp: now}
			for _, m := range d.messages {
				fr.AddRecordWithTimestamp("my_topic", 0, m.key, testMsg, m.offset, m.timestamp)
			}
			fr.SetLastOffsetDelta("my_topic", 0, 2)
			fr.SetLastStableOffset("my_topic", 0, 2)
		case d.kversion.IsAtLeast(V0_10_1_0):
			// 0.10.1+: fetch v3 with version-1 messages, which carry timestamps.
			offsetResponseVersion = 1
			fr = &FetchResponse{Version: 3, LogAppendTime: d.logAppendTime, Timestamp: now}
			for _, m := range d.messages {
				fr.AddMessageWithTimestamp("my_topic", 0, m.key, testMsg, m.offset, m.timestamp, 1)
			}
		default:
			// Older brokers: version-0 messages, which have no timestamp field
			// (the fetch response version still tracks the protocol version).
			var version int16
			switch {
			case d.kversion.IsAtLeast(V0_10_0_0):
				version = 2
			case d.kversion.IsAtLeast(V0_9_0_0):
				version = 1
			}
			fr = &FetchResponse{Version: version}
			for _, m := range d.messages {
				fr.AddMessageWithTimestamp("my_topic", 0, m.key, testMsg, m.offset, m.timestamp, 0)
			}
		}

		broker0 := NewMockBroker(t, 0)
		broker0.SetHandlerByMap(map[string]MockResponse{
			"MetadataRequest": NewMockMetadataResponse(t).
				SetBroker(broker0.Addr(), broker0.BrokerID()).
				SetLeader("my_topic", 0, broker0.BrokerID()),
			"OffsetRequest": NewMockOffsetResponse(t).
				SetVersion(offsetResponseVersion).
				SetOffset("my_topic", 0, OffsetNewest, 1234).
				SetOffset("my_topic", 0, OffsetOldest, 0),
			"FetchRequest": NewMockSequence(fr),
		})

		master, err := NewConsumer([]string{broker0.Addr()}, cfg)
		if err != nil {
			t.Fatal(err)
		}

		consumer, err := master.ConsumePartition("my_topic", 0, 1)
		if err != nil {
			t.Fatal(err)
		}

		// Consume each message and compare its timestamp to the expectation
		// for this protocol-version/logAppendTime combination.
		for i, ts := range d.expectedTimestamp {
			select {
			case msg := <-consumer.Messages():
				assertMessageOffset(t, msg, int64(i)+1)
				if msg.Timestamp != ts {
					t.Errorf("Wrong timestamp (kversion:%v, logAppendTime:%v): got: %v, want: %v",
						d.kversion, d.logAppendTime, msg.Timestamp, ts)
				}
			case err := <-consumer.Errors():
				t.Fatal(err)
			}
		}

		safeClose(t, consumer)
		safeClose(t, master)
		broker0.Close()
	}
}
1186
1187// When set to ReadCommitted, no uncommitted message should be available in messages channel
1188func TestExcludeUncommitted(t *testing.T) {
1189	// Given
1190	broker0 := NewMockBroker(t, 0)
1191
1192	fetchResponse := &FetchResponse{
1193		Version: 4,
1194		Blocks: map[string]map[int32]*FetchResponseBlock{"my_topic": {0: {
1195			AbortedTransactions: []*AbortedTransaction{{ProducerID: 7, FirstOffset: 1235}},
1196		}}},
1197	}
1198	fetchResponse.AddRecordBatch("my_topic", 0, nil, testMsg, 1234, 7, true)   // committed msg
1199	fetchResponse.AddRecordBatch("my_topic", 0, nil, testMsg, 1235, 7, true)   // uncommitted msg
1200	fetchResponse.AddRecordBatch("my_topic", 0, nil, testMsg, 1236, 7, true)   // uncommitted msg
1201	fetchResponse.AddControlRecord("my_topic", 0, 1237, 7, ControlRecordAbort) // abort control record
1202	fetchResponse.AddRecordBatch("my_topic", 0, nil, testMsg, 1238, 7, true)   // committed msg
1203
1204	broker0.SetHandlerByMap(map[string]MockResponse{
1205		"MetadataRequest": NewMockMetadataResponse(t).
1206			SetBroker(broker0.Addr(), broker0.BrokerID()).
1207			SetLeader("my_topic", 0, broker0.BrokerID()),
1208		"OffsetRequest": NewMockOffsetResponse(t).
1209			SetVersion(1).
1210			SetOffset("my_topic", 0, OffsetOldest, 0).
1211			SetOffset("my_topic", 0, OffsetNewest, 1237),
1212		"FetchRequest": NewMockWrapper(fetchResponse),
1213	})
1214
1215	cfg := NewConfig()
1216	cfg.Consumer.Return.Errors = true
1217	cfg.Version = V0_11_0_0
1218	cfg.Consumer.IsolationLevel = ReadCommitted
1219
1220	// When
1221	master, err := NewConsumer([]string{broker0.Addr()}, cfg)
1222	if err != nil {
1223		t.Fatal(err)
1224	}
1225
1226	consumer, err := master.ConsumePartition("my_topic", 0, 1234)
1227	if err != nil {
1228		t.Fatal(err)
1229	}
1230
1231	// Then: only the 2 committed messages are returned
1232	select {
1233	case message := <-consumer.Messages():
1234		assertMessageOffset(t, message, int64(1234))
1235	case err := <-consumer.Errors():
1236		t.Error(err)
1237	}
1238	select {
1239	case message := <-consumer.Messages():
1240		assertMessageOffset(t, message, int64(1238))
1241	case err := <-consumer.Errors():
1242		t.Error(err)
1243	}
1244
1245	safeClose(t, consumer)
1246	safeClose(t, master)
1247	broker0.Close()
1248}
1249
1250func assertMessageOffset(t *testing.T, msg *ConsumerMessage, expectedOffset int64) {
1251	if msg.Offset != expectedOffset {
1252		t.Errorf("Incorrect message offset: expected=%d, actual=%d", expectedOffset, msg.Offset)
1253	}
1254}
1255
// This example shows how to use the consumer to read messages
// from a single partition.
func ExampleConsumer() {
	consumer, err := NewConsumer([]string{"localhost:9092"}, nil)
	if err != nil {
		panic(err)
	}

	// Ensure the parent consumer is shut down when the example returns.
	defer func() {
		if err := consumer.Close(); err != nil {
			log.Fatalln(err)
		}
	}()

	// Start reading partition 0 at the newest available offset.
	partitionConsumer, err := consumer.ConsumePartition("my_topic", 0, OffsetNewest)
	if err != nil {
		panic(err)
	}

	// Deferred after the parent's Close, so (LIFO) the partition consumer
	// is closed first.
	defer func() {
		if err := partitionConsumer.Close(); err != nil {
			log.Fatalln(err)
		}
	}()

	// Trap SIGINT to trigger a shutdown.
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)

	// Consume until interrupted, counting the messages received.
	consumed := 0
ConsumerLoop:
	for {
		select {
		case msg := <-partitionConsumer.Messages():
			log.Printf("Consumed message offset %d\n", msg.Offset)
			consumed++
		case <-signals:
			break ConsumerLoop
		}
	}

	log.Printf("Consumed: %d\n", consumed)
}
1299
1300func Test_partitionConsumer_parseResponse(t *testing.T) {
1301	type args struct {
1302		response *FetchResponse
1303	}
1304	tests := []struct {
1305		name    string
1306		args    args
1307		want    []*ConsumerMessage
1308		wantErr bool
1309	}{
1310		{
1311			name: "empty but throttled FetchResponse is not considered an error",
1312			args: args{
1313				response: &FetchResponse{
1314					ThrottleTime: time.Millisecond,
1315				},
1316			},
1317		},
1318		{
1319			name: "empty FetchResponse is considered an incomplete response by default",
1320			args: args{
1321				response: &FetchResponse{},
1322			},
1323			wantErr: true,
1324		},
1325	}
1326	for _, tt := range tests {
1327		t.Run(tt.name, func(t *testing.T) {
1328			child := &partitionConsumer{
1329				broker: &brokerConsumer{
1330					broker: &Broker{},
1331				},
1332				conf: &Config{},
1333			}
1334			got, err := child.parseResponse(tt.args.response)
1335			if (err != nil) != tt.wantErr {
1336				t.Errorf("partitionConsumer.parseResponse() error = %v, wantErr %v", err, tt.wantErr)
1337				return
1338			}
1339			if !reflect.DeepEqual(got, tt.want) {
1340				t.Errorf("partitionConsumer.parseResponse() = %v, want %v", got, tt.want)
1341			}
1342		})
1343	}
1344}
1345