1package sarama
2
3import (
4	"log"
5	"os"
6	"os/signal"
7	"sync"
8	"testing"
9	"time"
10)
11
12var testMsg = StringEncoder("Foo")
13
14// If a particular offset is provided then messages are consumed starting from
15// that offset.
16func TestConsumerOffsetManual(t *testing.T) {
17	// Given
18	broker0 := NewMockBroker(t, 0)
19
20	mockFetchResponse := NewMockFetchResponse(t, 1)
21	for i := 0; i < 10; i++ {
22		mockFetchResponse.SetMessage("my_topic", 0, int64(i+1234), testMsg)
23	}
24
25	broker0.SetHandlerByMap(map[string]MockResponse{
26		"MetadataRequest": NewMockMetadataResponse(t).
27			SetBroker(broker0.Addr(), broker0.BrokerID()).
28			SetLeader("my_topic", 0, broker0.BrokerID()),
29		"OffsetRequest": NewMockOffsetResponse(t).
30			SetOffset("my_topic", 0, OffsetOldest, 0).
31			SetOffset("my_topic", 0, OffsetNewest, 2345),
32		"FetchRequest": mockFetchResponse,
33	})
34
35	// When
36	master, err := NewConsumer([]string{broker0.Addr()}, nil)
37	if err != nil {
38		t.Fatal(err)
39	}
40
41	consumer, err := master.ConsumePartition("my_topic", 0, 1234)
42	if err != nil {
43		t.Fatal(err)
44	}
45
46	// Then: messages starting from offset 1234 are consumed.
47	for i := 0; i < 10; i++ {
48		select {
49		case message := <-consumer.Messages():
50			assertMessageOffset(t, message, int64(i+1234))
51		case err := <-consumer.Errors():
52			t.Error(err)
53		}
54	}
55
56	safeClose(t, consumer)
57	safeClose(t, master)
58	broker0.Close()
59}
60
// If `OffsetNewest` is passed as the initial offset then the first consumed
// message indeed corresponds to the offset that the broker claims to be the
// newest in its offset response.
64func TestConsumerOffsetNewest(t *testing.T) {
65	// Given
66	broker0 := NewMockBroker(t, 0)
67	broker0.SetHandlerByMap(map[string]MockResponse{
68		"MetadataRequest": NewMockMetadataResponse(t).
69			SetBroker(broker0.Addr(), broker0.BrokerID()).
70			SetLeader("my_topic", 0, broker0.BrokerID()),
71		"OffsetRequest": NewMockOffsetResponse(t).
72			SetOffset("my_topic", 0, OffsetNewest, 10).
73			SetOffset("my_topic", 0, OffsetOldest, 7),
74		"FetchRequest": NewMockFetchResponse(t, 1).
75			SetMessage("my_topic", 0, 9, testMsg).
76			SetMessage("my_topic", 0, 10, testMsg).
77			SetMessage("my_topic", 0, 11, testMsg).
78			SetHighWaterMark("my_topic", 0, 14),
79	})
80
81	master, err := NewConsumer([]string{broker0.Addr()}, nil)
82	if err != nil {
83		t.Fatal(err)
84	}
85
86	// When
87	consumer, err := master.ConsumePartition("my_topic", 0, OffsetNewest)
88	if err != nil {
89		t.Fatal(err)
90	}
91
92	// Then
93	assertMessageOffset(t, <-consumer.Messages(), 10)
94	if hwmo := consumer.HighWaterMarkOffset(); hwmo != 14 {
95		t.Errorf("Expected high water mark offset 14, found %d", hwmo)
96	}
97
98	safeClose(t, consumer)
99	safeClose(t, master)
100	broker0.Close()
101}
102
103// It is possible to close a partition consumer and create the same anew.
104func TestConsumerRecreate(t *testing.T) {
105	// Given
106	broker0 := NewMockBroker(t, 0)
107	broker0.SetHandlerByMap(map[string]MockResponse{
108		"MetadataRequest": NewMockMetadataResponse(t).
109			SetBroker(broker0.Addr(), broker0.BrokerID()).
110			SetLeader("my_topic", 0, broker0.BrokerID()),
111		"OffsetRequest": NewMockOffsetResponse(t).
112			SetOffset("my_topic", 0, OffsetOldest, 0).
113			SetOffset("my_topic", 0, OffsetNewest, 1000),
114		"FetchRequest": NewMockFetchResponse(t, 1).
115			SetMessage("my_topic", 0, 10, testMsg),
116	})
117
118	c, err := NewConsumer([]string{broker0.Addr()}, nil)
119	if err != nil {
120		t.Fatal(err)
121	}
122
123	pc, err := c.ConsumePartition("my_topic", 0, 10)
124	if err != nil {
125		t.Fatal(err)
126	}
127	assertMessageOffset(t, <-pc.Messages(), 10)
128
129	// When
130	safeClose(t, pc)
131	pc, err = c.ConsumePartition("my_topic", 0, 10)
132	if err != nil {
133		t.Fatal(err)
134	}
135
136	// Then
137	assertMessageOffset(t, <-pc.Messages(), 10)
138
139	safeClose(t, pc)
140	safeClose(t, c)
141	broker0.Close()
142}
143
144// An attempt to consume the same partition twice should fail.
145func TestConsumerDuplicate(t *testing.T) {
146	// Given
147	broker0 := NewMockBroker(t, 0)
148	broker0.SetHandlerByMap(map[string]MockResponse{
149		"MetadataRequest": NewMockMetadataResponse(t).
150			SetBroker(broker0.Addr(), broker0.BrokerID()).
151			SetLeader("my_topic", 0, broker0.BrokerID()),
152		"OffsetRequest": NewMockOffsetResponse(t).
153			SetOffset("my_topic", 0, OffsetOldest, 0).
154			SetOffset("my_topic", 0, OffsetNewest, 1000),
155		"FetchRequest": NewMockFetchResponse(t, 1),
156	})
157
158	config := NewConfig()
159	config.ChannelBufferSize = 0
160	c, err := NewConsumer([]string{broker0.Addr()}, config)
161	if err != nil {
162		t.Fatal(err)
163	}
164
165	pc1, err := c.ConsumePartition("my_topic", 0, 0)
166	if err != nil {
167		t.Fatal(err)
168	}
169
170	// When
171	pc2, err := c.ConsumePartition("my_topic", 0, 0)
172
173	// Then
174	if pc2 != nil || err != ConfigurationError("That topic/partition is already being consumed") {
175		t.Fatal("A partition cannot be consumed twice at the same time")
176	}
177
178	safeClose(t, pc1)
179	safeClose(t, c)
180	broker0.Close()
181}
182
183// If consumer fails to refresh metadata it keeps retrying with frequency
184// specified by `Config.Consumer.Retry.Backoff`.
func TestConsumerLeaderRefreshError(t *testing.T) {
	// Given
	broker0 := NewMockBroker(t, 100)

	// Stage 1: my_topic/0 served by broker0
	Logger.Printf("    STAGE 1")

	broker0.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": NewMockMetadataResponse(t).
			SetBroker(broker0.Addr(), broker0.BrokerID()).
			SetLeader("my_topic", 0, broker0.BrokerID()),
		"OffsetRequest": NewMockOffsetResponse(t).
			SetOffset("my_topic", 0, OffsetOldest, 123).
			SetOffset("my_topic", 0, OffsetNewest, 1000),
		"FetchRequest": NewMockFetchResponse(t, 1).
			SetMessage("my_topic", 0, 123, testMsg),
	})

	config := NewConfig()
	// Short read timeout so that requests with no registered handler
	// (stage 2 removes MetadataRequest) fail quickly instead of stalling.
	config.Net.ReadTimeout = 100 * time.Millisecond
	config.Consumer.Retry.Backoff = 200 * time.Millisecond
	// Surface consumer errors on the Errors() channel so stage 2 can
	// observe the ErrOutOfBrokers failure.
	config.Consumer.Return.Errors = true
	// No client-level metadata retries: each refresh attempt fails at once,
	// leaving the retry cadence to Consumer.Retry.Backoff.
	config.Metadata.Retry.Max = 0
	c, err := NewConsumer([]string{broker0.Addr()}, config)
	if err != nil {
		t.Fatal(err)
	}

	pc, err := c.ConsumePartition("my_topic", 0, OffsetOldest)
	if err != nil {
		t.Fatal(err)
	}

	assertMessageOffset(t, <-pc.Messages(), 123)

	// Stage 2: broker0 says that it is no longer the leader for my_topic/0,
	// but the requests to retrieve metadata fail with network timeout.
	// (The replacement handler map deliberately omits MetadataRequest.)
	Logger.Printf("    STAGE 2")

	fetchResponse2 := &FetchResponse{}
	fetchResponse2.AddError("my_topic", 0, ErrNotLeaderForPartition)

	broker0.SetHandlerByMap(map[string]MockResponse{
		"FetchRequest": NewMockWrapper(fetchResponse2),
	})

	// The consumer reports that it ran out of brokers while looking for
	// the new leader, then keeps retrying in the background.
	if consErr := <-pc.Errors(); consErr.Err != ErrOutOfBrokers {
		t.Errorf("Unexpected error: %v", consErr.Err)
	}

	// Stage 3: finally the metadata returned by broker0 tells that broker1 is
	// a new leader for my_topic/0. Consumption resumes.

	Logger.Printf("    STAGE 3")

	broker1 := NewMockBroker(t, 101)

	broker1.SetHandlerByMap(map[string]MockResponse{
		"FetchRequest": NewMockFetchResponse(t, 1).
			SetMessage("my_topic", 0, 124, testMsg),
	})
	broker0.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": NewMockMetadataResponse(t).
			SetBroker(broker0.Addr(), broker0.BrokerID()).
			SetBroker(broker1.Addr(), broker1.BrokerID()).
			SetLeader("my_topic", 0, broker1.BrokerID()),
	})

	// The next message is fetched from the new leader, broker1.
	assertMessageOffset(t, <-pc.Messages(), 124)

	safeClose(t, pc)
	safeClose(t, c)
	broker1.Close()
	broker0.Close()
}
260
261func TestConsumerInvalidTopic(t *testing.T) {
262	// Given
263	broker0 := NewMockBroker(t, 100)
264	broker0.SetHandlerByMap(map[string]MockResponse{
265		"MetadataRequest": NewMockMetadataResponse(t).
266			SetBroker(broker0.Addr(), broker0.BrokerID()),
267	})
268
269	c, err := NewConsumer([]string{broker0.Addr()}, nil)
270	if err != nil {
271		t.Fatal(err)
272	}
273
274	// When
275	pc, err := c.ConsumePartition("my_topic", 0, OffsetOldest)
276
277	// Then
278	if pc != nil || err != ErrUnknownTopicOrPartition {
279		t.Errorf("Should fail with, err=%v", err)
280	}
281
282	safeClose(t, c)
283	broker0.Close()
284}
285
286// Nothing bad happens if a partition consumer that has no leader assigned at
287// the moment is closed.
func TestConsumerClosePartitionWithoutLeader(t *testing.T) {
	// Given: broker0 leads my_topic/0 and serves one message at offset 123.
	broker0 := NewMockBroker(t, 100)
	broker0.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": NewMockMetadataResponse(t).
			SetBroker(broker0.Addr(), broker0.BrokerID()).
			SetLeader("my_topic", 0, broker0.BrokerID()),
		"OffsetRequest": NewMockOffsetResponse(t).
			SetOffset("my_topic", 0, OffsetOldest, 123).
			SetOffset("my_topic", 0, OffsetNewest, 1000),
		"FetchRequest": NewMockFetchResponse(t, 1).
			SetMessage("my_topic", 0, 123, testMsg),
	})

	config := NewConfig()
	// Fast timeouts/backoff keep the leaderless retry loop short; errors
	// are delivered on the Errors() channel so the test can observe them.
	config.Net.ReadTimeout = 100 * time.Millisecond
	config.Consumer.Retry.Backoff = 100 * time.Millisecond
	config.Consumer.Return.Errors = true
	// Disable client-level metadata retries so leader lookup fails promptly.
	config.Metadata.Retry.Max = 0
	c, err := NewConsumer([]string{broker0.Addr()}, config)
	if err != nil {
		t.Fatal(err)
	}

	pc, err := c.ConsumePartition("my_topic", 0, OffsetOldest)
	if err != nil {
		t.Fatal(err)
	}

	assertMessageOffset(t, <-pc.Messages(), 123)

	// broker0 says that it is no longer the leader for my_topic/0, but the
	// requests to retrieve metadata fail with network timeout.
	// (The replacement handler map has no MetadataRequest entry, so metadata
	// requests hang until Net.ReadTimeout fires.)
	fetchResponse2 := &FetchResponse{}
	fetchResponse2.AddError("my_topic", 0, ErrNotLeaderForPartition)

	broker0.SetHandlerByMap(map[string]MockResponse{
		"FetchRequest": NewMockWrapper(fetchResponse2),
	})

	// When: wait until the consumer reports that it ran out of brokers,
	// i.e. the partition is now leaderless.
	if consErr := <-pc.Errors(); consErr.Err != ErrOutOfBrokers {
		t.Errorf("Unexpected error: %v", consErr.Err)
	}

	// Then: the partition consumer can be closed without any problem.
	safeClose(t, pc)
	safeClose(t, c)
	broker0.Close()
}
338
339// If the initial offset passed on partition consumer creation is out of the
340// actual offset range for the partition, then the partition consumer stops
341// immediately closing its output channels.
342func TestConsumerShutsDownOutOfRange(t *testing.T) {
343	// Given
344	broker0 := NewMockBroker(t, 0)
345	fetchResponse := new(FetchResponse)
346	fetchResponse.AddError("my_topic", 0, ErrOffsetOutOfRange)
347	broker0.SetHandlerByMap(map[string]MockResponse{
348		"MetadataRequest": NewMockMetadataResponse(t).
349			SetBroker(broker0.Addr(), broker0.BrokerID()).
350			SetLeader("my_topic", 0, broker0.BrokerID()),
351		"OffsetRequest": NewMockOffsetResponse(t).
352			SetOffset("my_topic", 0, OffsetNewest, 1234).
353			SetOffset("my_topic", 0, OffsetOldest, 7),
354		"FetchRequest": NewMockWrapper(fetchResponse),
355	})
356
357	master, err := NewConsumer([]string{broker0.Addr()}, nil)
358	if err != nil {
359		t.Fatal(err)
360	}
361
362	// When
363	consumer, err := master.ConsumePartition("my_topic", 0, 101)
364	if err != nil {
365		t.Fatal(err)
366	}
367
368	// Then: consumer should shut down closing its messages and errors channels.
369	if _, ok := <-consumer.Messages(); ok {
370		t.Error("Expected the consumer to shut down")
371	}
372	safeClose(t, consumer)
373
374	safeClose(t, master)
375	broker0.Close()
376}
377
// If a fetch response contains messages with offsets that are smaller than
// requested, then such messages are ignored.
func TestConsumerExtraOffsets(t *testing.T) {
	// Given: fetch responses (one legacy message-set format, one v4 record
	// batch) that carry offsets 1..4, even though consumption will start
	// at offset 3 — so offsets 1 and 2 are "extra".
	legacyFetchResponse := &FetchResponse{}
	legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 1)
	legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 2)
	legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 3)
	legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 4)
	newFetchResponse := &FetchResponse{Version: 4}
	newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 1)
	newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 2)
	newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 3)
	newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 4)
	newFetchResponse.SetLastOffsetDelta("my_topic", 0, 4)
	newFetchResponse.SetLastStableOffset("my_topic", 0, 4)
	// Run the same scenario once per wire format.
	for _, fetchResponse1 := range []*FetchResponse{legacyFetchResponse, newFetchResponse} {
		var offsetResponseVersion int16
		cfg := NewConfig()
		cfg.Consumer.Return.Errors = true
		// The v4 fetch format requires a 0.11 client and v1 offset responses.
		if fetchResponse1.Version >= 4 {
			cfg.Version = V0_11_0_0
			offsetResponseVersion = 1
		}

		broker0 := NewMockBroker(t, 0)
		// An empty follow-up response so the mock sequence does not keep
		// re-serving the same messages after the first fetch.
		fetchResponse2 := &FetchResponse{}
		fetchResponse2.Version = fetchResponse1.Version
		fetchResponse2.AddError("my_topic", 0, ErrNoError)
		broker0.SetHandlerByMap(map[string]MockResponse{
			"MetadataRequest": NewMockMetadataResponse(t).
				SetBroker(broker0.Addr(), broker0.BrokerID()).
				SetLeader("my_topic", 0, broker0.BrokerID()),
			"OffsetRequest": NewMockOffsetResponse(t).
				SetVersion(offsetResponseVersion).
				SetOffset("my_topic", 0, OffsetNewest, 1234).
				SetOffset("my_topic", 0, OffsetOldest, 0),
			"FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2),
		})

		master, err := NewConsumer([]string{broker0.Addr()}, cfg)
		if err != nil {
			t.Fatal(err)
		}

		// When: consumption starts at offset 3.
		consumer, err := master.ConsumePartition("my_topic", 0, 3)
		if err != nil {
			t.Fatal(err)
		}

		// Then: messages with offsets 1 and 2 are not returned even though they
		// are present in the response.
		select {
		case msg := <-consumer.Messages():
			assertMessageOffset(t, msg, 3)
		case err := <-consumer.Errors():
			t.Fatal(err)
		}

		select {
		case msg := <-consumer.Messages():
			assertMessageOffset(t, msg, 4)
		case err := <-consumer.Errors():
			t.Fatal(err)
		}

		safeClose(t, consumer)
		safeClose(t, master)
		broker0.Close()
	}
}
450
// In some situations a broker may return a block containing only
// messages older than requested, even though there would be
// more messages if a higher offset was requested.
454func TestConsumerReceivingFetchResponseWithTooOldRecords(t *testing.T) {
455	// Given
456	fetchResponse1 := &FetchResponse{Version: 4}
457	fetchResponse1.AddRecord("my_topic", 0, nil, testMsg, 1)
458
459	fetchResponse2 := &FetchResponse{Version: 4}
460	fetchResponse2.AddRecord("my_topic", 0, nil, testMsg, 1000000)
461
462	cfg := NewConfig()
463	cfg.Consumer.Return.Errors = true
464	cfg.Version = V1_1_0_0
465
466	broker0 := NewMockBroker(t, 0)
467
468	broker0.SetHandlerByMap(map[string]MockResponse{
469		"MetadataRequest": NewMockMetadataResponse(t).
470			SetBroker(broker0.Addr(), broker0.BrokerID()).
471			SetLeader("my_topic", 0, broker0.BrokerID()),
472		"OffsetRequest": NewMockOffsetResponse(t).
473			SetVersion(1).
474			SetOffset("my_topic", 0, OffsetNewest, 1234).
475			SetOffset("my_topic", 0, OffsetOldest, 0),
476		"FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2),
477	})
478
479	master, err := NewConsumer([]string{broker0.Addr()}, cfg)
480	if err != nil {
481		t.Fatal(err)
482	}
483
484	// When
485	consumer, err := master.ConsumePartition("my_topic", 0, 2)
486	if err != nil {
487		t.Fatal(err)
488	}
489
490	select {
491	case msg := <-consumer.Messages():
492		assertMessageOffset(t, msg, 1000000)
493	case err := <-consumer.Errors():
494		t.Fatal(err)
495	}
496
497	safeClose(t, consumer)
498	safeClose(t, master)
499	broker0.Close()
500}
501
502func TestConsumeMessageWithNewerFetchAPIVersion(t *testing.T) {
503	// Given
504	fetchResponse1 := &FetchResponse{Version: 4}
505	fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 1)
506	fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 2)
507
508	cfg := NewConfig()
509	cfg.Version = V0_11_0_0
510
511	broker0 := NewMockBroker(t, 0)
512	fetchResponse2 := &FetchResponse{}
513	fetchResponse2.Version = 4
514	fetchResponse2.AddError("my_topic", 0, ErrNoError)
515	broker0.SetHandlerByMap(map[string]MockResponse{
516		"MetadataRequest": NewMockMetadataResponse(t).
517			SetBroker(broker0.Addr(), broker0.BrokerID()).
518			SetLeader("my_topic", 0, broker0.BrokerID()),
519		"OffsetRequest": NewMockOffsetResponse(t).
520			SetVersion(1).
521			SetOffset("my_topic", 0, OffsetNewest, 1234).
522			SetOffset("my_topic", 0, OffsetOldest, 0),
523		"FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2),
524	})
525
526	master, err := NewConsumer([]string{broker0.Addr()}, cfg)
527	if err != nil {
528		t.Fatal(err)
529	}
530
531	// When
532	consumer, err := master.ConsumePartition("my_topic", 0, 1)
533	if err != nil {
534		t.Fatal(err)
535	}
536
537	assertMessageOffset(t, <-consumer.Messages(), 1)
538	assertMessageOffset(t, <-consumer.Messages(), 2)
539
540	safeClose(t, consumer)
541	safeClose(t, master)
542	broker0.Close()
543}
544
545// It is fine if offsets of fetched messages are not sequential (although
546// strictly increasing!).
func TestConsumerNonSequentialOffsets(t *testing.T) {
	// Given: fetch responses (legacy and v4 formats) whose message offsets
	// are 5, 7 and 11 — increasing, but with gaps.
	legacyFetchResponse := &FetchResponse{}
	legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 5)
	legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 7)
	legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 11)
	newFetchResponse := &FetchResponse{Version: 4}
	newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 5)
	newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 7)
	newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 11)
	newFetchResponse.SetLastOffsetDelta("my_topic", 0, 11)
	newFetchResponse.SetLastStableOffset("my_topic", 0, 11)
	// Run the same scenario once per wire format.
	for _, fetchResponse1 := range []*FetchResponse{legacyFetchResponse, newFetchResponse} {
		var offsetResponseVersion int16
		cfg := NewConfig()
		// The v4 fetch format requires a 0.11 client and v1 offset responses.
		if fetchResponse1.Version >= 4 {
			cfg.Version = V0_11_0_0
			offsetResponseVersion = 1
		}

		broker0 := NewMockBroker(t, 0)
		// An empty follow-up response terminates the mock sequence cleanly.
		fetchResponse2 := &FetchResponse{Version: fetchResponse1.Version}
		fetchResponse2.AddError("my_topic", 0, ErrNoError)
		broker0.SetHandlerByMap(map[string]MockResponse{
			"MetadataRequest": NewMockMetadataResponse(t).
				SetBroker(broker0.Addr(), broker0.BrokerID()).
				SetLeader("my_topic", 0, broker0.BrokerID()),
			"OffsetRequest": NewMockOffsetResponse(t).
				SetVersion(offsetResponseVersion).
				SetOffset("my_topic", 0, OffsetNewest, 1234).
				SetOffset("my_topic", 0, OffsetOldest, 0),
			"FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2),
		})

		master, err := NewConsumer([]string{broker0.Addr()}, cfg)
		if err != nil {
			t.Fatal(err)
		}

		// When: consumption starts at offset 3.
		consumer, err := master.ConsumePartition("my_topic", 0, 3)
		if err != nil {
			t.Fatal(err)
		}

		// Then: all three messages are delivered in order even though their
		// offsets (5, 7, 11) are not consecutive.
		assertMessageOffset(t, <-consumer.Messages(), 5)
		assertMessageOffset(t, <-consumer.Messages(), 7)
		assertMessageOffset(t, <-consumer.Messages(), 11)

		safeClose(t, consumer)
		safeClose(t, master)
		broker0.Close()
	}
}
603
604// If leadership for a partition is changing then consumer resolves the new
605// leader and switches to it.
606func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
607	// initial setup
608	seedBroker := NewMockBroker(t, 10)
609	leader0 := NewMockBroker(t, 0)
610	leader1 := NewMockBroker(t, 1)
611
612	seedBroker.SetHandlerByMap(map[string]MockResponse{
613		"MetadataRequest": NewMockMetadataResponse(t).
614			SetBroker(leader0.Addr(), leader0.BrokerID()).
615			SetBroker(leader1.Addr(), leader1.BrokerID()).
616			SetLeader("my_topic", 0, leader0.BrokerID()).
617			SetLeader("my_topic", 1, leader1.BrokerID()),
618	})
619
620	mockOffsetResponse1 := NewMockOffsetResponse(t).
621		SetOffset("my_topic", 0, OffsetOldest, 0).
622		SetOffset("my_topic", 0, OffsetNewest, 1000).
623		SetOffset("my_topic", 1, OffsetOldest, 0).
624		SetOffset("my_topic", 1, OffsetNewest, 1000)
625	leader0.SetHandlerByMap(map[string]MockResponse{
626		"OffsetRequest": mockOffsetResponse1,
627		"FetchRequest":  NewMockFetchResponse(t, 1),
628	})
629	leader1.SetHandlerByMap(map[string]MockResponse{
630		"OffsetRequest": mockOffsetResponse1,
631		"FetchRequest":  NewMockFetchResponse(t, 1),
632	})
633
634	// launch test goroutines
635	config := NewConfig()
636	config.Consumer.Retry.Backoff = 50
637	master, err := NewConsumer([]string{seedBroker.Addr()}, config)
638	if err != nil {
639		t.Fatal(err)
640	}
641
642	// we expect to end up (eventually) consuming exactly ten messages on each partition
643	var wg sync.WaitGroup
644	for i := int32(0); i < 2; i++ {
645		consumer, err := master.ConsumePartition("my_topic", i, 0)
646		if err != nil {
647			t.Error(err)
648		}
649
650		go func(c PartitionConsumer) {
651			for err := range c.Errors() {
652				t.Error(err)
653			}
654		}(consumer)
655
656		wg.Add(1)
657		go func(partition int32, c PartitionConsumer) {
658			for i := 0; i < 10; i++ {
659				message := <-consumer.Messages()
660				if message.Offset != int64(i) {
661					t.Error("Incorrect message offset!", i, partition, message.Offset)
662				}
663				if message.Partition != partition {
664					t.Error("Incorrect message partition!")
665				}
666			}
667			safeClose(t, consumer)
668			wg.Done()
669		}(i, consumer)
670	}
671
672	time.Sleep(50 * time.Millisecond)
673	Logger.Printf("    STAGE 1")
674	// Stage 1:
675	//   * my_topic/0 -> leader0 serves 4 messages
676	//   * my_topic/1 -> leader1 serves 0 messages
677
678	mockFetchResponse := NewMockFetchResponse(t, 1)
679	for i := 0; i < 4; i++ {
680		mockFetchResponse.SetMessage("my_topic", 0, int64(i), testMsg)
681	}
682	leader0.SetHandlerByMap(map[string]MockResponse{
683		"FetchRequest": mockFetchResponse,
684	})
685
686	time.Sleep(50 * time.Millisecond)
687	Logger.Printf("    STAGE 2")
688	// Stage 2:
689	//   * leader0 says that it is no longer serving my_topic/0
690	//   * seedBroker tells that leader1 is serving my_topic/0 now
691
692	// seed broker tells that the new partition 0 leader is leader1
693	seedBroker.SetHandlerByMap(map[string]MockResponse{
694		"MetadataRequest": NewMockMetadataResponse(t).
695			SetLeader("my_topic", 0, leader1.BrokerID()).
696			SetLeader("my_topic", 1, leader1.BrokerID()),
697	})
698
699	// leader0 says no longer leader of partition 0
700	fetchResponse := new(FetchResponse)
701	fetchResponse.AddError("my_topic", 0, ErrNotLeaderForPartition)
702	leader0.SetHandlerByMap(map[string]MockResponse{
703		"FetchRequest": NewMockWrapper(fetchResponse),
704	})
705
706	time.Sleep(50 * time.Millisecond)
707	Logger.Printf("    STAGE 3")
708	// Stage 3:
709	//   * my_topic/0 -> leader1 serves 3 messages
710	//   * my_topic/1 -> leader1 server 8 messages
711
712	// leader1 provides 3 message on partition 0, and 8 messages on partition 1
713	mockFetchResponse2 := NewMockFetchResponse(t, 2)
714	for i := 4; i < 7; i++ {
715		mockFetchResponse2.SetMessage("my_topic", 0, int64(i), testMsg)
716	}
717	for i := 0; i < 8; i++ {
718		mockFetchResponse2.SetMessage("my_topic", 1, int64(i), testMsg)
719	}
720	leader1.SetHandlerByMap(map[string]MockResponse{
721		"FetchRequest": mockFetchResponse2,
722	})
723
724	time.Sleep(50 * time.Millisecond)
725	Logger.Printf("    STAGE 4")
726	// Stage 4:
727	//   * my_topic/0 -> leader1 serves 3 messages
728	//   * my_topic/1 -> leader1 tells that it is no longer the leader
729	//   * seedBroker tells that leader0 is a new leader for my_topic/1
730
731	// metadata assigns 0 to leader1 and 1 to leader0
732	seedBroker.SetHandlerByMap(map[string]MockResponse{
733		"MetadataRequest": NewMockMetadataResponse(t).
734			SetLeader("my_topic", 0, leader1.BrokerID()).
735			SetLeader("my_topic", 1, leader0.BrokerID()),
736	})
737
738	// leader1 provides three more messages on partition0, says no longer leader of partition1
739	mockFetchResponse3 := NewMockFetchResponse(t, 3).
740		SetMessage("my_topic", 0, int64(7), testMsg).
741		SetMessage("my_topic", 0, int64(8), testMsg).
742		SetMessage("my_topic", 0, int64(9), testMsg)
743	fetchResponse4 := new(FetchResponse)
744	fetchResponse4.AddError("my_topic", 1, ErrNotLeaderForPartition)
745	leader1.SetHandlerByMap(map[string]MockResponse{
746		"FetchRequest": NewMockSequence(mockFetchResponse3, fetchResponse4),
747	})
748
749	// leader0 provides two messages on partition 1
750	mockFetchResponse4 := NewMockFetchResponse(t, 2)
751	for i := 8; i < 10; i++ {
752		mockFetchResponse4.SetMessage("my_topic", 1, int64(i), testMsg)
753	}
754	leader0.SetHandlerByMap(map[string]MockResponse{
755		"FetchRequest": mockFetchResponse4,
756	})
757
758	wg.Wait()
759	safeClose(t, master)
760	leader1.Close()
761	leader0.Close()
762	seedBroker.Close()
763}
764
765// When two partitions have the same broker as the leader, if one partition
766// consumer channel buffer is full then that does not affect the ability to
767// read messages by the other consumer.
768func TestConsumerInterleavedClose(t *testing.T) {
769	// Given
770	broker0 := NewMockBroker(t, 0)
771	broker0.SetHandlerByMap(map[string]MockResponse{
772		"MetadataRequest": NewMockMetadataResponse(t).
773			SetBroker(broker0.Addr(), broker0.BrokerID()).
774			SetLeader("my_topic", 0, broker0.BrokerID()).
775			SetLeader("my_topic", 1, broker0.BrokerID()),
776		"OffsetRequest": NewMockOffsetResponse(t).
777			SetOffset("my_topic", 0, OffsetOldest, 1000).
778			SetOffset("my_topic", 0, OffsetNewest, 1100).
779			SetOffset("my_topic", 1, OffsetOldest, 2000).
780			SetOffset("my_topic", 1, OffsetNewest, 2100),
781		"FetchRequest": NewMockFetchResponse(t, 1).
782			SetMessage("my_topic", 0, 1000, testMsg).
783			SetMessage("my_topic", 0, 1001, testMsg).
784			SetMessage("my_topic", 0, 1002, testMsg).
785			SetMessage("my_topic", 1, 2000, testMsg),
786	})
787
788	config := NewConfig()
789	config.ChannelBufferSize = 0
790	master, err := NewConsumer([]string{broker0.Addr()}, config)
791	if err != nil {
792		t.Fatal(err)
793	}
794
795	c0, err := master.ConsumePartition("my_topic", 0, 1000)
796	if err != nil {
797		t.Fatal(err)
798	}
799
800	c1, err := master.ConsumePartition("my_topic", 1, 2000)
801	if err != nil {
802		t.Fatal(err)
803	}
804
805	// When/Then: we can read from partition 0 even if nobody reads from partition 1
806	assertMessageOffset(t, <-c0.Messages(), 1000)
807	assertMessageOffset(t, <-c0.Messages(), 1001)
808	assertMessageOffset(t, <-c0.Messages(), 1002)
809
810	safeClose(t, c1)
811	safeClose(t, c0)
812	safeClose(t, master)
813	broker0.Close()
814}
815
// A broker bounce (shutdown and restart on the same address) while a
// partition consumer still references it must be survived: the sibling
// partition keeps flowing, and the affected consumer recovers and reports
// at least one error.
func TestConsumerBounceWithReferenceOpen(t *testing.T) {
	broker0 := NewMockBroker(t, 0)
	// Remember the address so broker0 can be restarted on the same one below.
	broker0Addr := broker0.Addr()
	broker1 := NewMockBroker(t, 1)

	mockMetadataResponse := NewMockMetadataResponse(t).
		SetBroker(broker0.Addr(), broker0.BrokerID()).
		SetBroker(broker1.Addr(), broker1.BrokerID()).
		SetLeader("my_topic", 0, broker0.BrokerID()).
		SetLeader("my_topic", 1, broker1.BrokerID())

	mockOffsetResponse := NewMockOffsetResponse(t).
		SetOffset("my_topic", 0, OffsetOldest, 1000).
		SetOffset("my_topic", 0, OffsetNewest, 1100).
		SetOffset("my_topic", 1, OffsetOldest, 2000).
		SetOffset("my_topic", 1, OffsetNewest, 2100)

	// Ten messages per partition: 1000+ on partition 0, 2000+ on partition 1.
	mockFetchResponse := NewMockFetchResponse(t, 1)
	for i := 0; i < 10; i++ {
		mockFetchResponse.SetMessage("my_topic", 0, int64(1000+i), testMsg)
		mockFetchResponse.SetMessage("my_topic", 1, int64(2000+i), testMsg)
	}

	broker0.SetHandlerByMap(map[string]MockResponse{
		"OffsetRequest": mockOffsetResponse,
		"FetchRequest":  mockFetchResponse,
	})
	// Only broker1 serves metadata, so it is used as the bootstrap broker.
	broker1.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": mockMetadataResponse,
		"OffsetRequest":   mockOffsetResponse,
		"FetchRequest":    mockFetchResponse,
	})

	config := NewConfig()
	config.Consumer.Return.Errors = true
	config.Consumer.Retry.Backoff = 100 * time.Millisecond
	config.ChannelBufferSize = 1
	master, err := NewConsumer([]string{broker1.Addr()}, config)
	if err != nil {
		t.Fatal(err)
	}

	c0, err := master.ConsumePartition("my_topic", 0, 1000)
	if err != nil {
		t.Fatal(err)
	}

	c1, err := master.ConsumePartition("my_topic", 1, 2000)
	if err != nil {
		t.Fatal(err)
	}

	// read messages from both partition to make sure that both brokers operate
	// normally.
	assertMessageOffset(t, <-c0.Messages(), 1000)
	assertMessageOffset(t, <-c1.Messages(), 2000)

	// Simulate broker shutdown. Note that metadata response does not change,
	// that is the leadership does not move to another broker. So partition
	// consumer will keep retrying to restore the connection with the broker.
	broker0.Close()

	// Make sure that while the partition/0 leader is down, consumer/partition/1
	// is capable of pulling messages from broker1.
	for i := 1; i < 7; i++ {
		offset := (<-c1.Messages()).Offset
		if offset != int64(2000+i) {
			t.Errorf("Expected offset %d from consumer/partition/1", int64(2000+i))
		}
	}

	// Bring broker0 back to service.
	broker0 = NewMockBrokerAddr(t, 0, broker0Addr)
	broker0.SetHandlerByMap(map[string]MockResponse{
		"FetchRequest": mockFetchResponse,
	})

	// Read the rest of messages from both partitions.
	for i := 7; i < 10; i++ {
		assertMessageOffset(t, <-c1.Messages(), int64(2000+i))
	}
	for i := 1; i < 10; i++ {
		assertMessageOffset(t, <-c0.Messages(), int64(1000+i))
	}

	// The partition/0 consumer must have reported at least one error while
	// its leader was down.
	select {
	case <-c0.Errors():
	default:
		t.Errorf("Partition consumer should have detected broker restart")
	}

	safeClose(t, c1)
	safeClose(t, c0)
	safeClose(t, master)
	broker0.Close()
	broker1.Close()
}
913
914func TestConsumerOffsetOutOfRange(t *testing.T) {
915	// Given
916	broker0 := NewMockBroker(t, 2)
917	broker0.SetHandlerByMap(map[string]MockResponse{
918		"MetadataRequest": NewMockMetadataResponse(t).
919			SetBroker(broker0.Addr(), broker0.BrokerID()).
920			SetLeader("my_topic", 0, broker0.BrokerID()),
921		"OffsetRequest": NewMockOffsetResponse(t).
922			SetOffset("my_topic", 0, OffsetNewest, 1234).
923			SetOffset("my_topic", 0, OffsetOldest, 2345),
924	})
925
926	master, err := NewConsumer([]string{broker0.Addr()}, nil)
927	if err != nil {
928		t.Fatal(err)
929	}
930
931	// When/Then
932	if _, err := master.ConsumePartition("my_topic", 0, 0); err != ErrOffsetOutOfRange {
933		t.Fatal("Should return ErrOffsetOutOfRange, got:", err)
934	}
935	if _, err := master.ConsumePartition("my_topic", 0, 3456); err != ErrOffsetOutOfRange {
936		t.Fatal("Should return ErrOffsetOutOfRange, got:", err)
937	}
938	if _, err := master.ConsumePartition("my_topic", 0, -3); err != ErrOffsetOutOfRange {
939		t.Fatal("Should return ErrOffsetOutOfRange, got:", err)
940	}
941
942	safeClose(t, master)
943	broker0.Close()
944}
945
946func TestConsumerExpiryTicker(t *testing.T) {
947	// Given
948	broker0 := NewMockBroker(t, 0)
949	fetchResponse1 := &FetchResponse{}
950	for i := 1; i <= 8; i++ {
951		fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, int64(i))
952	}
953	broker0.SetHandlerByMap(map[string]MockResponse{
954		"MetadataRequest": NewMockMetadataResponse(t).
955			SetBroker(broker0.Addr(), broker0.BrokerID()).
956			SetLeader("my_topic", 0, broker0.BrokerID()),
957		"OffsetRequest": NewMockOffsetResponse(t).
958			SetOffset("my_topic", 0, OffsetNewest, 1234).
959			SetOffset("my_topic", 0, OffsetOldest, 1),
960		"FetchRequest": NewMockSequence(fetchResponse1),
961	})
962
963	config := NewConfig()
964	config.ChannelBufferSize = 0
965	config.Consumer.MaxProcessingTime = 10 * time.Millisecond
966	master, err := NewConsumer([]string{broker0.Addr()}, config)
967	if err != nil {
968		t.Fatal(err)
969	}
970
971	// When
972	consumer, err := master.ConsumePartition("my_topic", 0, 1)
973	if err != nil {
974		t.Fatal(err)
975	}
976
977	// Then: messages with offsets 1 through 8 are read
978	for i := 1; i <= 8; i++ {
979		assertMessageOffset(t, <-consumer.Messages(), int64(i))
980		time.Sleep(2 * time.Millisecond)
981	}
982
983	safeClose(t, consumer)
984	safeClose(t, master)
985	broker0.Close()
986}
987
988func assertMessageOffset(t *testing.T, msg *ConsumerMessage, expectedOffset int64) {
989	if msg.Offset != expectedOffset {
990		t.Errorf("Incorrect message offset: expected=%d, actual=%d", expectedOffset, msg.Offset)
991	}
992}
993
994// This example shows how to use the consumer to read messages
995// from a single partition.
996func ExampleConsumer() {
997	consumer, err := NewConsumer([]string{"localhost:9092"}, nil)
998	if err != nil {
999		panic(err)
1000	}
1001
1002	defer func() {
1003		if err := consumer.Close(); err != nil {
1004			log.Fatalln(err)
1005		}
1006	}()
1007
1008	partitionConsumer, err := consumer.ConsumePartition("my_topic", 0, OffsetNewest)
1009	if err != nil {
1010		panic(err)
1011	}
1012
1013	defer func() {
1014		if err := partitionConsumer.Close(); err != nil {
1015			log.Fatalln(err)
1016		}
1017	}()
1018
1019	// Trap SIGINT to trigger a shutdown.
1020	signals := make(chan os.Signal, 1)
1021	signal.Notify(signals, os.Interrupt)
1022
1023	consumed := 0
1024ConsumerLoop:
1025	for {
1026		select {
1027		case msg := <-partitionConsumer.Messages():
1028			log.Printf("Consumed message offset %d\n", msg.Offset)
1029			consumed++
1030		case <-signals:
1031			break ConsumerLoop
1032		}
1033	}
1034
1035	log.Printf("Consumed: %d\n", consumed)
1036}
1037