1package sarama
2
3import (
4	"errors"
5	"log"
6	"os"
7	"os/signal"
8	"sync"
9	"testing"
10	"time"
11)
12
13const TestMessage = "ABC THE MESSAGE"
14
15func closeProducer(t *testing.T, p AsyncProducer) {
16	var wg sync.WaitGroup
17	p.AsyncClose()
18
19	wg.Add(2)
20	go func() {
21		for range p.Successes() {
22			t.Error("Unexpected message on Successes()")
23		}
24		wg.Done()
25	}()
26	go func() {
27		for msg := range p.Errors() {
28			t.Error(msg.Err)
29		}
30		wg.Done()
31	}()
32	wg.Wait()
33}
34
35func expectResults(t *testing.T, p AsyncProducer, successes, errors int) {
36	expect := successes + errors
37	for expect > 0 {
38		select {
39		case msg := <-p.Errors():
40			if msg.Msg.flags != 0 {
41				t.Error("Message had flags set")
42			}
43			errors--
44			expect--
45			if errors < 0 {
46				t.Error(msg.Err)
47			}
48		case msg := <-p.Successes():
49			if msg.flags != 0 {
50				t.Error("Message had flags set")
51			}
52			successes--
53			expect--
54			if successes < 0 {
55				t.Error("Too many successes")
56			}
57		}
58	}
59	if successes != 0 || errors != 0 {
60		t.Error("Unexpected successes", successes, "or errors", errors)
61	}
62}
63
64type testPartitioner chan *int32
65
66func (p testPartitioner) Partition(msg *ProducerMessage, numPartitions int32) (int32, error) {
67	part := <-p
68	if part == nil {
69		return 0, errors.New("BOOM")
70	}
71
72	return *part, nil
73}
74
75func (p testPartitioner) RequiresConsistency() bool {
76	return true
77}
78
79func (p testPartitioner) feed(partition int32) {
80	p <- &partition
81}
82
83type flakyEncoder bool
84
85func (f flakyEncoder) Length() int {
86	return len(TestMessage)
87}
88
89func (f flakyEncoder) Encode() ([]byte, error) {
90	if !bool(f) {
91		return nil, errors.New("flaky encoding error")
92	}
93	return []byte(TestMessage), nil
94}
95
96func TestAsyncProducer(t *testing.T) {
97	seedBroker := NewMockBroker(t, 1)
98	leader := NewMockBroker(t, 2)
99
100	metadataResponse := new(MetadataResponse)
101	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
102	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
103	seedBroker.Returns(metadataResponse)
104
105	prodSuccess := new(ProduceResponse)
106	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
107	leader.Returns(prodSuccess)
108
109	config := NewConfig()
110	config.Producer.Flush.Messages = 10
111	config.Producer.Return.Successes = true
112	producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
113	if err != nil {
114		t.Fatal(err)
115	}
116
117	for i := 0; i < 10; i++ {
118		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Metadata: i}
119	}
120	for i := 0; i < 10; i++ {
121		select {
122		case msg := <-producer.Errors():
123			t.Error(msg.Err)
124			if msg.Msg.flags != 0 {
125				t.Error("Message had flags set")
126			}
127		case msg := <-producer.Successes():
128			if msg.flags != 0 {
129				t.Error("Message had flags set")
130			}
131			if msg.Metadata.(int) != i {
132				t.Error("Message metadata did not match")
133			}
134		case <-time.After(time.Second):
135			t.Errorf("Timeout waiting for msg #%d", i)
136			goto done
137		}
138	}
139done:
140	closeProducer(t, producer)
141	leader.Close()
142	seedBroker.Close()
143}
144
145func TestAsyncProducerMultipleFlushes(t *testing.T) {
146	seedBroker := NewMockBroker(t, 1)
147	leader := NewMockBroker(t, 2)
148
149	metadataResponse := new(MetadataResponse)
150	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
151	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
152	seedBroker.Returns(metadataResponse)
153
154	prodSuccess := new(ProduceResponse)
155	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
156	leader.Returns(prodSuccess)
157	leader.Returns(prodSuccess)
158	leader.Returns(prodSuccess)
159
160	config := NewConfig()
161	config.Producer.Flush.Messages = 5
162	config.Producer.Return.Successes = true
163	producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
164	if err != nil {
165		t.Fatal(err)
166	}
167
168	for flush := 0; flush < 3; flush++ {
169		for i := 0; i < 5; i++ {
170			producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
171		}
172		expectResults(t, producer, 5, 0)
173	}
174
175	closeProducer(t, producer)
176	leader.Close()
177	seedBroker.Close()
178}
179
180func TestAsyncProducerMultipleBrokers(t *testing.T) {
181	seedBroker := NewMockBroker(t, 1)
182	leader0 := NewMockBroker(t, 2)
183	leader1 := NewMockBroker(t, 3)
184
185	metadataResponse := new(MetadataResponse)
186	metadataResponse.AddBroker(leader0.Addr(), leader0.BrokerID())
187	metadataResponse.AddBroker(leader1.Addr(), leader1.BrokerID())
188	metadataResponse.AddTopicPartition("my_topic", 0, leader0.BrokerID(), nil, nil, ErrNoError)
189	metadataResponse.AddTopicPartition("my_topic", 1, leader1.BrokerID(), nil, nil, ErrNoError)
190	seedBroker.Returns(metadataResponse)
191
192	prodResponse0 := new(ProduceResponse)
193	prodResponse0.AddTopicPartition("my_topic", 0, ErrNoError)
194	leader0.Returns(prodResponse0)
195
196	prodResponse1 := new(ProduceResponse)
197	prodResponse1.AddTopicPartition("my_topic", 1, ErrNoError)
198	leader1.Returns(prodResponse1)
199
200	config := NewConfig()
201	config.Producer.Flush.Messages = 5
202	config.Producer.Return.Successes = true
203	config.Producer.Partitioner = NewRoundRobinPartitioner
204	producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
205	if err != nil {
206		t.Fatal(err)
207	}
208
209	for i := 0; i < 10; i++ {
210		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
211	}
212	expectResults(t, producer, 10, 0)
213
214	closeProducer(t, producer)
215	leader1.Close()
216	leader0.Close()
217	seedBroker.Close()
218}
219
220func TestAsyncProducerCustomPartitioner(t *testing.T) {
221	seedBroker := NewMockBroker(t, 1)
222	leader := NewMockBroker(t, 2)
223
224	metadataResponse := new(MetadataResponse)
225	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
226	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
227	seedBroker.Returns(metadataResponse)
228
229	prodResponse := new(ProduceResponse)
230	prodResponse.AddTopicPartition("my_topic", 0, ErrNoError)
231	leader.Returns(prodResponse)
232
233	config := NewConfig()
234	config.Producer.Flush.Messages = 2
235	config.Producer.Return.Successes = true
236	config.Producer.Partitioner = func(topic string) Partitioner {
237		p := make(testPartitioner)
238		go func() {
239			p.feed(0)
240			p <- nil
241			p <- nil
242			p <- nil
243			p.feed(0)
244		}()
245		return p
246	}
247	producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
248	if err != nil {
249		t.Fatal(err)
250	}
251
252	for i := 0; i < 5; i++ {
253		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
254	}
255	expectResults(t, producer, 2, 3)
256
257	closeProducer(t, producer)
258	leader.Close()
259	seedBroker.Close()
260}
261
262func TestAsyncProducerFailureRetry(t *testing.T) {
263	seedBroker := NewMockBroker(t, 1)
264	leader1 := NewMockBroker(t, 2)
265	leader2 := NewMockBroker(t, 3)
266
267	metadataLeader1 := new(MetadataResponse)
268	metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
269	metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
270	seedBroker.Returns(metadataLeader1)
271
272	config := NewConfig()
273	config.Producer.Flush.Messages = 10
274	config.Producer.Return.Successes = true
275	config.Producer.Retry.Backoff = 0
276	producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
277	if err != nil {
278		t.Fatal(err)
279	}
280	seedBroker.Close()
281
282	for i := 0; i < 10; i++ {
283		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
284	}
285	prodNotLeader := new(ProduceResponse)
286	prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
287	leader1.Returns(prodNotLeader)
288
289	metadataLeader2 := new(MetadataResponse)
290	metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
291	metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, ErrNoError)
292	leader1.Returns(metadataLeader2)
293
294	prodSuccess := new(ProduceResponse)
295	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
296	leader2.Returns(prodSuccess)
297	expectResults(t, producer, 10, 0)
298	leader1.Close()
299
300	for i := 0; i < 10; i++ {
301		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
302	}
303	leader2.Returns(prodSuccess)
304	expectResults(t, producer, 10, 0)
305
306	leader2.Close()
307	closeProducer(t, producer)
308}
309
310func TestAsyncProducerEncoderFailures(t *testing.T) {
311	seedBroker := NewMockBroker(t, 1)
312	leader := NewMockBroker(t, 2)
313
314	metadataResponse := new(MetadataResponse)
315	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
316	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
317	seedBroker.Returns(metadataResponse)
318
319	prodSuccess := new(ProduceResponse)
320	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
321	leader.Returns(prodSuccess)
322	leader.Returns(prodSuccess)
323	leader.Returns(prodSuccess)
324
325	config := NewConfig()
326	config.Producer.Flush.Messages = 1
327	config.Producer.Return.Successes = true
328	config.Producer.Partitioner = NewManualPartitioner
329	producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
330	if err != nil {
331		t.Fatal(err)
332	}
333
334	for flush := 0; flush < 3; flush++ {
335		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: flakyEncoder(true), Value: flakyEncoder(false)}
336		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: flakyEncoder(false), Value: flakyEncoder(true)}
337		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: flakyEncoder(true), Value: flakyEncoder(true)}
338		expectResults(t, producer, 1, 2)
339	}
340
341	closeProducer(t, producer)
342	leader.Close()
343	seedBroker.Close()
344}
345
346// If a Kafka broker becomes unavailable and then returns back in service, then
347// producer reconnects to it and continues sending messages.
348func TestAsyncProducerBrokerBounce(t *testing.T) {
349	// Given
350	seedBroker := NewMockBroker(t, 1)
351	leader := NewMockBroker(t, 2)
352	leaderAddr := leader.Addr()
353
354	metadataResponse := new(MetadataResponse)
355	metadataResponse.AddBroker(leaderAddr, leader.BrokerID())
356	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
357	seedBroker.Returns(metadataResponse)
358
359	prodSuccess := new(ProduceResponse)
360	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
361
362	config := NewConfig()
363	config.Producer.Flush.Messages = 1
364	config.Producer.Return.Successes = true
365	config.Producer.Retry.Backoff = 0
366	producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
367	if err != nil {
368		t.Fatal(err)
369	}
370	producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
371	leader.Returns(prodSuccess)
372	expectResults(t, producer, 1, 0)
373
374	// When: a broker connection gets reset by a broker (network glitch, restart, you name it).
375	leader.Close()                               // producer should get EOF
376	leader = NewMockBrokerAddr(t, 2, leaderAddr) // start it up again right away for giggles
377	seedBroker.Returns(metadataResponse)         // tell it to go to broker 2 again
378
379	// Then: a produced message goes through the new broker connection.
380	producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
381	leader.Returns(prodSuccess)
382	expectResults(t, producer, 1, 0)
383
384	closeProducer(t, producer)
385	seedBroker.Close()
386	leader.Close()
387}
388
389func TestAsyncProducerBrokerBounceWithStaleMetadata(t *testing.T) {
390	seedBroker := NewMockBroker(t, 1)
391	leader1 := NewMockBroker(t, 2)
392	leader2 := NewMockBroker(t, 3)
393
394	metadataLeader1 := new(MetadataResponse)
395	metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
396	metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
397	seedBroker.Returns(metadataLeader1)
398
399	config := NewConfig()
400	config.Producer.Flush.Messages = 10
401	config.Producer.Return.Successes = true
402	config.Producer.Retry.Max = 3
403	config.Producer.Retry.Backoff = 0
404	producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
405	if err != nil {
406		t.Fatal(err)
407	}
408
409	for i := 0; i < 10; i++ {
410		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
411	}
412	leader1.Close()                     // producer should get EOF
413	seedBroker.Returns(metadataLeader1) // tell it to go to leader1 again even though it's still down
414	seedBroker.Returns(metadataLeader1) // tell it to go to leader1 again even though it's still down
415
416	// ok fine, tell it to go to leader2 finally
417	metadataLeader2 := new(MetadataResponse)
418	metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
419	metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, ErrNoError)
420	seedBroker.Returns(metadataLeader2)
421
422	prodSuccess := new(ProduceResponse)
423	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
424	leader2.Returns(prodSuccess)
425	expectResults(t, producer, 10, 0)
426	seedBroker.Close()
427	leader2.Close()
428
429	closeProducer(t, producer)
430}
431
432func TestAsyncProducerMultipleRetries(t *testing.T) {
433	seedBroker := NewMockBroker(t, 1)
434	leader1 := NewMockBroker(t, 2)
435	leader2 := NewMockBroker(t, 3)
436
437	metadataLeader1 := new(MetadataResponse)
438	metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
439	metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
440	seedBroker.Returns(metadataLeader1)
441
442	config := NewConfig()
443	config.Producer.Flush.Messages = 10
444	config.Producer.Return.Successes = true
445	config.Producer.Retry.Max = 4
446	config.Producer.Retry.Backoff = 0
447	producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
448	if err != nil {
449		t.Fatal(err)
450	}
451
452	for i := 0; i < 10; i++ {
453		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
454	}
455	prodNotLeader := new(ProduceResponse)
456	prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
457	leader1.Returns(prodNotLeader)
458
459	metadataLeader2 := new(MetadataResponse)
460	metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
461	metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, ErrNoError)
462	seedBroker.Returns(metadataLeader2)
463	leader2.Returns(prodNotLeader)
464	seedBroker.Returns(metadataLeader1)
465	leader1.Returns(prodNotLeader)
466	seedBroker.Returns(metadataLeader1)
467	leader1.Returns(prodNotLeader)
468	seedBroker.Returns(metadataLeader2)
469
470	prodSuccess := new(ProduceResponse)
471	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
472	leader2.Returns(prodSuccess)
473	expectResults(t, producer, 10, 0)
474
475	for i := 0; i < 10; i++ {
476		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
477	}
478	leader2.Returns(prodSuccess)
479	expectResults(t, producer, 10, 0)
480
481	seedBroker.Close()
482	leader1.Close()
483	leader2.Close()
484	closeProducer(t, producer)
485}
486
487func TestAsyncProducerOutOfRetries(t *testing.T) {
488	t.Skip("Enable once bug #294 is fixed.")
489
490	seedBroker := NewMockBroker(t, 1)
491	leader := NewMockBroker(t, 2)
492
493	metadataResponse := new(MetadataResponse)
494	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
495	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
496	seedBroker.Returns(metadataResponse)
497
498	config := NewConfig()
499	config.Producer.Flush.Messages = 10
500	config.Producer.Return.Successes = true
501	config.Producer.Retry.Backoff = 0
502	config.Producer.Retry.Max = 0
503	producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
504	if err != nil {
505		t.Fatal(err)
506	}
507
508	for i := 0; i < 10; i++ {
509		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
510	}
511
512	prodNotLeader := new(ProduceResponse)
513	prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
514	leader.Returns(prodNotLeader)
515
516	for i := 0; i < 10; i++ {
517		select {
518		case msg := <-producer.Errors():
519			if msg.Err != ErrNotLeaderForPartition {
520				t.Error(msg.Err)
521			}
522		case <-producer.Successes():
523			t.Error("Unexpected success")
524		}
525	}
526
527	seedBroker.Returns(metadataResponse)
528
529	for i := 0; i < 10; i++ {
530		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
531	}
532
533	prodSuccess := new(ProduceResponse)
534	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
535	leader.Returns(prodSuccess)
536
537	expectResults(t, producer, 10, 0)
538
539	leader.Close()
540	seedBroker.Close()
541	safeClose(t, producer)
542}
543
544func TestAsyncProducerRetryWithReferenceOpen(t *testing.T) {
545	seedBroker := NewMockBroker(t, 1)
546	leader := NewMockBroker(t, 2)
547	leaderAddr := leader.Addr()
548
549	metadataResponse := new(MetadataResponse)
550	metadataResponse.AddBroker(leaderAddr, leader.BrokerID())
551	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
552	metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, ErrNoError)
553	seedBroker.Returns(metadataResponse)
554
555	config := NewConfig()
556	config.Producer.Return.Successes = true
557	config.Producer.Retry.Backoff = 0
558	config.Producer.Retry.Max = 1
559	config.Producer.Partitioner = NewRoundRobinPartitioner
560	producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
561	if err != nil {
562		t.Fatal(err)
563	}
564
565	// prime partition 0
566	producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
567	prodSuccess := new(ProduceResponse)
568	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
569	leader.Returns(prodSuccess)
570	expectResults(t, producer, 1, 0)
571
572	// prime partition 1
573	producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
574	prodSuccess = new(ProduceResponse)
575	prodSuccess.AddTopicPartition("my_topic", 1, ErrNoError)
576	leader.Returns(prodSuccess)
577	expectResults(t, producer, 1, 0)
578
579	// reboot the broker (the producer will get EOF on its existing connection)
580	leader.Close()
581	leader = NewMockBrokerAddr(t, 2, leaderAddr)
582
583	// send another message on partition 0 to trigger the EOF and retry
584	producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
585
586	// tell partition 0 to go to that broker again
587	seedBroker.Returns(metadataResponse)
588
589	// succeed this time
590	prodSuccess = new(ProduceResponse)
591	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
592	leader.Returns(prodSuccess)
593	expectResults(t, producer, 1, 0)
594
595	// shutdown
596	closeProducer(t, producer)
597	seedBroker.Close()
598	leader.Close()
599}
600
601func TestAsyncProducerFlusherRetryCondition(t *testing.T) {
602	seedBroker := NewMockBroker(t, 1)
603	leader := NewMockBroker(t, 2)
604
605	metadataResponse := new(MetadataResponse)
606	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
607	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
608	metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, ErrNoError)
609	seedBroker.Returns(metadataResponse)
610
611	config := NewConfig()
612	config.Producer.Flush.Messages = 5
613	config.Producer.Return.Successes = true
614	config.Producer.Retry.Backoff = 0
615	config.Producer.Retry.Max = 1
616	config.Producer.Partitioner = NewManualPartitioner
617	producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
618	if err != nil {
619		t.Fatal(err)
620	}
621
622	// prime partitions
623	for p := int32(0); p < 2; p++ {
624		for i := 0; i < 5; i++ {
625			producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: p}
626		}
627		prodSuccess := new(ProduceResponse)
628		prodSuccess.AddTopicPartition("my_topic", p, ErrNoError)
629		leader.Returns(prodSuccess)
630		expectResults(t, producer, 5, 0)
631	}
632
633	// send more messages on partition 0
634	for i := 0; i < 5; i++ {
635		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0}
636	}
637	prodNotLeader := new(ProduceResponse)
638	prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
639	leader.Returns(prodNotLeader)
640
641	time.Sleep(50 * time.Millisecond)
642
643	leader.SetHandlerByMap(map[string]MockResponse{
644		"ProduceRequest": NewMockProduceResponse(t).
645			SetVersion(0).
646			SetError("my_topic", 0, ErrNoError),
647	})
648
649	// tell partition 0 to go to that broker again
650	seedBroker.Returns(metadataResponse)
651
652	// succeed this time
653	expectResults(t, producer, 5, 0)
654
655	// put five more through
656	for i := 0; i < 5; i++ {
657		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0}
658	}
659	expectResults(t, producer, 5, 0)
660
661	// shutdown
662	closeProducer(t, producer)
663	seedBroker.Close()
664	leader.Close()
665}
666
667func TestAsyncProducerRetryShutdown(t *testing.T) {
668	seedBroker := NewMockBroker(t, 1)
669	leader := NewMockBroker(t, 2)
670
671	metadataLeader := new(MetadataResponse)
672	metadataLeader.AddBroker(leader.Addr(), leader.BrokerID())
673	metadataLeader.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
674	seedBroker.Returns(metadataLeader)
675
676	config := NewConfig()
677	config.Producer.Flush.Messages = 10
678	config.Producer.Return.Successes = true
679	config.Producer.Retry.Backoff = 0
680	producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
681	if err != nil {
682		t.Fatal(err)
683	}
684
685	for i := 0; i < 10; i++ {
686		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
687	}
688	producer.AsyncClose()
689	time.Sleep(5 * time.Millisecond) // let the shutdown goroutine kick in
690
691	producer.Input() <- &ProducerMessage{Topic: "FOO"}
692	if err := <-producer.Errors(); err.Err != ErrShuttingDown {
693		t.Error(err)
694	}
695
696	prodNotLeader := new(ProduceResponse)
697	prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
698	leader.Returns(prodNotLeader)
699
700	seedBroker.Returns(metadataLeader)
701
702	prodSuccess := new(ProduceResponse)
703	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
704	leader.Returns(prodSuccess)
705	expectResults(t, producer, 10, 0)
706
707	seedBroker.Close()
708	leader.Close()
709
710	// wait for the async-closed producer to shut down fully
711	for err := range producer.Errors() {
712		t.Error(err)
713	}
714}
715
716func TestAsyncProducerNoReturns(t *testing.T) {
717	seedBroker := NewMockBroker(t, 1)
718	leader := NewMockBroker(t, 2)
719
720	metadataLeader := new(MetadataResponse)
721	metadataLeader.AddBroker(leader.Addr(), leader.BrokerID())
722	metadataLeader.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
723	seedBroker.Returns(metadataLeader)
724
725	config := NewConfig()
726	config.Producer.Flush.Messages = 10
727	config.Producer.Return.Successes = false
728	config.Producer.Return.Errors = false
729	config.Producer.Retry.Backoff = 0
730	producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
731	if err != nil {
732		t.Fatal(err)
733	}
734
735	for i := 0; i < 10; i++ {
736		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
737	}
738
739	wait := make(chan bool)
740	go func() {
741		if err := producer.Close(); err != nil {
742			t.Error(err)
743		}
744		close(wait)
745	}()
746
747	prodSuccess := new(ProduceResponse)
748	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
749	leader.Returns(prodSuccess)
750
751	<-wait
752	seedBroker.Close()
753	leader.Close()
754}
755
756// This example shows how to use the producer while simultaneously
757// reading the Errors channel to know about any failures.
758func ExampleAsyncProducer_select() {
759	producer, err := NewAsyncProducer([]string{"localhost:9092"}, nil)
760	if err != nil {
761		panic(err)
762	}
763
764	defer func() {
765		if err := producer.Close(); err != nil {
766			log.Fatalln(err)
767		}
768	}()
769
770	// Trap SIGINT to trigger a shutdown.
771	signals := make(chan os.Signal, 1)
772	signal.Notify(signals, os.Interrupt)
773
774	var enqueued, errors int
775ProducerLoop:
776	for {
777		select {
778		case producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder("testing 123")}:
779			enqueued++
780		case err := <-producer.Errors():
781			log.Println("Failed to produce message", err)
782			errors++
783		case <-signals:
784			break ProducerLoop
785		}
786	}
787
788	log.Printf("Enqueued: %d; errors: %d\n", enqueued, errors)
789}
790
791// This example shows how to use the producer with separate goroutines
792// reading from the Successes and Errors channels. Note that in order
793// for the Successes channel to be populated, you have to set
794// config.Producer.Return.Successes to true.
795func ExampleAsyncProducer_goroutines() {
796	config := NewConfig()
797	config.Producer.Return.Successes = true
798	producer, err := NewAsyncProducer([]string{"localhost:9092"}, config)
799	if err != nil {
800		panic(err)
801	}
802
803	// Trap SIGINT to trigger a graceful shutdown.
804	signals := make(chan os.Signal, 1)
805	signal.Notify(signals, os.Interrupt)
806
807	var (
808		wg                          sync.WaitGroup
809		enqueued, successes, errors int
810	)
811
812	wg.Add(1)
813	go func() {
814		defer wg.Done()
815		for range producer.Successes() {
816			successes++
817		}
818	}()
819
820	wg.Add(1)
821	go func() {
822		defer wg.Done()
823		for err := range producer.Errors() {
824			log.Println(err)
825			errors++
826		}
827	}()
828
829ProducerLoop:
830	for {
831		message := &ProducerMessage{Topic: "my_topic", Value: StringEncoder("testing 123")}
832		select {
833		case producer.Input() <- message:
834			enqueued++
835
836		case <-signals:
837			producer.AsyncClose() // Trigger a shutdown of the producer.
838			break ProducerLoop
839		}
840	}
841
842	wg.Wait()
843
844	log.Printf("Successfully produced: %d; errors: %d\n", successes, errors)
845}
846