1package sarama
2
3import (
4	"errors"
5	"log"
6	"os"
7	"os/signal"
8	"sync"
9	"testing"
10	"time"
11)
12
// TestMessage is the fixed payload used for produced messages throughout the
// producer tests in this file.
const (
	TestMessage = "ABC THE MESSAGE"
)
14
15func closeProducer(t *testing.T, p AsyncProducer) {
16	var wg sync.WaitGroup
17	p.AsyncClose()
18
19	wg.Add(2)
20	go func() {
21		for range p.Successes() {
22			t.Error("Unexpected message on Successes()")
23		}
24		wg.Done()
25	}()
26	go func() {
27		for msg := range p.Errors() {
28			t.Error(msg.Err)
29		}
30		wg.Done()
31	}()
32	wg.Wait()
33}
34
35func expectResults(t *testing.T, p AsyncProducer, successes, errors int) {
36	expect := successes + errors
37	for expect > 0 {
38		select {
39		case msg := <-p.Errors():
40			if msg.Msg.flags != 0 {
41				t.Error("Message had flags set")
42			}
43			errors--
44			expect--
45			if errors < 0 {
46				t.Error(msg.Err)
47			}
48		case msg := <-p.Successes():
49			if msg.flags != 0 {
50				t.Error("Message had flags set")
51			}
52			successes--
53			expect--
54			if successes < 0 {
55				t.Error("Too many successes")
56			}
57		}
58	}
59	if successes != 0 || errors != 0 {
60		t.Error("Unexpected successes", successes, "or errors", errors)
61	}
62}
63
64type testPartitioner chan *int32
65
66func (p testPartitioner) Partition(msg *ProducerMessage, numPartitions int32) (int32, error) {
67	part := <-p
68	if part == nil {
69		return 0, errors.New("BOOM")
70	}
71
72	return *part, nil
73}
74
75func (p testPartitioner) RequiresConsistency() bool {
76	return true
77}
78
79func (p testPartitioner) feed(partition int32) {
80	p <- &partition
81}
82
83type flakyEncoder bool
84
85func (f flakyEncoder) Length() int {
86	return len(TestMessage)
87}
88
89func (f flakyEncoder) Encode() ([]byte, error) {
90	if !bool(f) {
91		return nil, errors.New("flaky encoding error")
92	}
93	return []byte(TestMessage), nil
94}
95
96func TestAsyncProducer(t *testing.T) {
97	seedBroker := NewMockBroker(t, 1)
98	leader := NewMockBroker(t, 2)
99
100	metadataResponse := new(MetadataResponse)
101	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
102	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
103	seedBroker.Returns(metadataResponse)
104
105	prodSuccess := new(ProduceResponse)
106	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
107	leader.Returns(prodSuccess)
108
109	config := NewConfig()
110	config.Producer.Flush.Messages = 10
111	config.Producer.Return.Successes = true
112	producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
113	if err != nil {
114		t.Fatal(err)
115	}
116
117	for i := 0; i < 10; i++ {
118		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Metadata: i}
119	}
120	for i := 0; i < 10; i++ {
121		select {
122		case msg := <-producer.Errors():
123			t.Error(msg.Err)
124			if msg.Msg.flags != 0 {
125				t.Error("Message had flags set")
126			}
127		case msg := <-producer.Successes():
128			if msg.flags != 0 {
129				t.Error("Message had flags set")
130			}
131			if msg.Metadata.(int) != i {
132				t.Error("Message metadata did not match")
133			}
134		}
135	}
136
137	closeProducer(t, producer)
138	leader.Close()
139	seedBroker.Close()
140}
141
142func TestAsyncProducerMultipleFlushes(t *testing.T) {
143	seedBroker := NewMockBroker(t, 1)
144	leader := NewMockBroker(t, 2)
145
146	metadataResponse := new(MetadataResponse)
147	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
148	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
149	seedBroker.Returns(metadataResponse)
150
151	prodSuccess := new(ProduceResponse)
152	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
153	leader.Returns(prodSuccess)
154	leader.Returns(prodSuccess)
155	leader.Returns(prodSuccess)
156
157	config := NewConfig()
158	config.Producer.Flush.Messages = 5
159	config.Producer.Return.Successes = true
160	producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
161	if err != nil {
162		t.Fatal(err)
163	}
164
165	for flush := 0; flush < 3; flush++ {
166		for i := 0; i < 5; i++ {
167			producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
168		}
169		expectResults(t, producer, 5, 0)
170	}
171
172	closeProducer(t, producer)
173	leader.Close()
174	seedBroker.Close()
175}
176
177func TestAsyncProducerMultipleBrokers(t *testing.T) {
178	seedBroker := NewMockBroker(t, 1)
179	leader0 := NewMockBroker(t, 2)
180	leader1 := NewMockBroker(t, 3)
181
182	metadataResponse := new(MetadataResponse)
183	metadataResponse.AddBroker(leader0.Addr(), leader0.BrokerID())
184	metadataResponse.AddBroker(leader1.Addr(), leader1.BrokerID())
185	metadataResponse.AddTopicPartition("my_topic", 0, leader0.BrokerID(), nil, nil, ErrNoError)
186	metadataResponse.AddTopicPartition("my_topic", 1, leader1.BrokerID(), nil, nil, ErrNoError)
187	seedBroker.Returns(metadataResponse)
188
189	prodResponse0 := new(ProduceResponse)
190	prodResponse0.AddTopicPartition("my_topic", 0, ErrNoError)
191	leader0.Returns(prodResponse0)
192
193	prodResponse1 := new(ProduceResponse)
194	prodResponse1.AddTopicPartition("my_topic", 1, ErrNoError)
195	leader1.Returns(prodResponse1)
196
197	config := NewConfig()
198	config.Producer.Flush.Messages = 5
199	config.Producer.Return.Successes = true
200	config.Producer.Partitioner = NewRoundRobinPartitioner
201	producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
202	if err != nil {
203		t.Fatal(err)
204	}
205
206	for i := 0; i < 10; i++ {
207		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
208	}
209	expectResults(t, producer, 10, 0)
210
211	closeProducer(t, producer)
212	leader1.Close()
213	leader0.Close()
214	seedBroker.Close()
215}
216
217func TestAsyncProducerCustomPartitioner(t *testing.T) {
218	seedBroker := NewMockBroker(t, 1)
219	leader := NewMockBroker(t, 2)
220
221	metadataResponse := new(MetadataResponse)
222	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
223	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
224	seedBroker.Returns(metadataResponse)
225
226	prodResponse := new(ProduceResponse)
227	prodResponse.AddTopicPartition("my_topic", 0, ErrNoError)
228	leader.Returns(prodResponse)
229
230	config := NewConfig()
231	config.Producer.Flush.Messages = 2
232	config.Producer.Return.Successes = true
233	config.Producer.Partitioner = func(topic string) Partitioner {
234		p := make(testPartitioner)
235		go func() {
236			p.feed(0)
237			p <- nil
238			p <- nil
239			p <- nil
240			p.feed(0)
241		}()
242		return p
243	}
244	producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
245	if err != nil {
246		t.Fatal(err)
247	}
248
249	for i := 0; i < 5; i++ {
250		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
251	}
252	expectResults(t, producer, 2, 3)
253
254	closeProducer(t, producer)
255	leader.Close()
256	seedBroker.Close()
257}
258
259func TestAsyncProducerFailureRetry(t *testing.T) {
260	seedBroker := NewMockBroker(t, 1)
261	leader1 := NewMockBroker(t, 2)
262	leader2 := NewMockBroker(t, 3)
263
264	metadataLeader1 := new(MetadataResponse)
265	metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
266	metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
267	seedBroker.Returns(metadataLeader1)
268
269	config := NewConfig()
270	config.Producer.Flush.Messages = 10
271	config.Producer.Return.Successes = true
272	config.Producer.Retry.Backoff = 0
273	producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
274	if err != nil {
275		t.Fatal(err)
276	}
277	seedBroker.Close()
278
279	for i := 0; i < 10; i++ {
280		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
281	}
282	prodNotLeader := new(ProduceResponse)
283	prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
284	leader1.Returns(prodNotLeader)
285
286	metadataLeader2 := new(MetadataResponse)
287	metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
288	metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, ErrNoError)
289	leader1.Returns(metadataLeader2)
290
291	prodSuccess := new(ProduceResponse)
292	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
293	leader2.Returns(prodSuccess)
294	expectResults(t, producer, 10, 0)
295	leader1.Close()
296
297	for i := 0; i < 10; i++ {
298		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
299	}
300	leader2.Returns(prodSuccess)
301	expectResults(t, producer, 10, 0)
302
303	leader2.Close()
304	closeProducer(t, producer)
305}
306
307func TestAsyncProducerEncoderFailures(t *testing.T) {
308	seedBroker := NewMockBroker(t, 1)
309	leader := NewMockBroker(t, 2)
310
311	metadataResponse := new(MetadataResponse)
312	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
313	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
314	seedBroker.Returns(metadataResponse)
315
316	prodSuccess := new(ProduceResponse)
317	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
318	leader.Returns(prodSuccess)
319	leader.Returns(prodSuccess)
320	leader.Returns(prodSuccess)
321
322	config := NewConfig()
323	config.Producer.Flush.Messages = 1
324	config.Producer.Return.Successes = true
325	config.Producer.Partitioner = NewManualPartitioner
326	producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
327	if err != nil {
328		t.Fatal(err)
329	}
330
331	for flush := 0; flush < 3; flush++ {
332		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: flakyEncoder(true), Value: flakyEncoder(false)}
333		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: flakyEncoder(false), Value: flakyEncoder(true)}
334		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: flakyEncoder(true), Value: flakyEncoder(true)}
335		expectResults(t, producer, 1, 2)
336	}
337
338	closeProducer(t, producer)
339	leader.Close()
340	seedBroker.Close()
341}
342
343// If a Kafka broker becomes unavailable and then returns back in service, then
344// producer reconnects to it and continues sending messages.
345func TestAsyncProducerBrokerBounce(t *testing.T) {
346	// Given
347	seedBroker := NewMockBroker(t, 1)
348	leader := NewMockBroker(t, 2)
349	leaderAddr := leader.Addr()
350
351	metadataResponse := new(MetadataResponse)
352	metadataResponse.AddBroker(leaderAddr, leader.BrokerID())
353	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
354	seedBroker.Returns(metadataResponse)
355
356	prodSuccess := new(ProduceResponse)
357	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
358
359	config := NewConfig()
360	config.Producer.Flush.Messages = 1
361	config.Producer.Return.Successes = true
362	config.Producer.Retry.Backoff = 0
363	producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
364	if err != nil {
365		t.Fatal(err)
366	}
367	producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
368	leader.Returns(prodSuccess)
369	expectResults(t, producer, 1, 0)
370
371	// When: a broker connection gets reset by a broker (network glitch, restart, you name it).
372	leader.Close()                               // producer should get EOF
373	leader = NewMockBrokerAddr(t, 2, leaderAddr) // start it up again right away for giggles
374	seedBroker.Returns(metadataResponse)         // tell it to go to broker 2 again
375
376	// Then: a produced message goes through the new broker connection.
377	producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
378	leader.Returns(prodSuccess)
379	expectResults(t, producer, 1, 0)
380
381	closeProducer(t, producer)
382	seedBroker.Close()
383	leader.Close()
384}
385
386func TestAsyncProducerBrokerBounceWithStaleMetadata(t *testing.T) {
387	seedBroker := NewMockBroker(t, 1)
388	leader1 := NewMockBroker(t, 2)
389	leader2 := NewMockBroker(t, 3)
390
391	metadataLeader1 := new(MetadataResponse)
392	metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
393	metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
394	seedBroker.Returns(metadataLeader1)
395
396	config := NewConfig()
397	config.Producer.Flush.Messages = 10
398	config.Producer.Return.Successes = true
399	config.Producer.Retry.Max = 3
400	config.Producer.Retry.Backoff = 0
401	producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
402	if err != nil {
403		t.Fatal(err)
404	}
405
406	for i := 0; i < 10; i++ {
407		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
408	}
409	leader1.Close()                     // producer should get EOF
410	seedBroker.Returns(metadataLeader1) // tell it to go to leader1 again even though it's still down
411	seedBroker.Returns(metadataLeader1) // tell it to go to leader1 again even though it's still down
412
413	// ok fine, tell it to go to leader2 finally
414	metadataLeader2 := new(MetadataResponse)
415	metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
416	metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, ErrNoError)
417	seedBroker.Returns(metadataLeader2)
418
419	prodSuccess := new(ProduceResponse)
420	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
421	leader2.Returns(prodSuccess)
422	expectResults(t, producer, 10, 0)
423	seedBroker.Close()
424	leader2.Close()
425
426	closeProducer(t, producer)
427}
428
429func TestAsyncProducerMultipleRetries(t *testing.T) {
430	seedBroker := NewMockBroker(t, 1)
431	leader1 := NewMockBroker(t, 2)
432	leader2 := NewMockBroker(t, 3)
433
434	metadataLeader1 := new(MetadataResponse)
435	metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
436	metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
437	seedBroker.Returns(metadataLeader1)
438
439	config := NewConfig()
440	config.Producer.Flush.Messages = 10
441	config.Producer.Return.Successes = true
442	config.Producer.Retry.Max = 4
443	config.Producer.Retry.Backoff = 0
444	producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
445	if err != nil {
446		t.Fatal(err)
447	}
448
449	for i := 0; i < 10; i++ {
450		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
451	}
452	prodNotLeader := new(ProduceResponse)
453	prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
454	leader1.Returns(prodNotLeader)
455
456	metadataLeader2 := new(MetadataResponse)
457	metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
458	metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, ErrNoError)
459	seedBroker.Returns(metadataLeader2)
460	leader2.Returns(prodNotLeader)
461	seedBroker.Returns(metadataLeader1)
462	leader1.Returns(prodNotLeader)
463	seedBroker.Returns(metadataLeader1)
464	leader1.Returns(prodNotLeader)
465	seedBroker.Returns(metadataLeader2)
466
467	prodSuccess := new(ProduceResponse)
468	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
469	leader2.Returns(prodSuccess)
470	expectResults(t, producer, 10, 0)
471
472	for i := 0; i < 10; i++ {
473		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
474	}
475	leader2.Returns(prodSuccess)
476	expectResults(t, producer, 10, 0)
477
478	seedBroker.Close()
479	leader1.Close()
480	leader2.Close()
481	closeProducer(t, producer)
482}
483
484func TestAsyncProducerOutOfRetries(t *testing.T) {
485	t.Skip("Enable once bug #294 is fixed.")
486
487	seedBroker := NewMockBroker(t, 1)
488	leader := NewMockBroker(t, 2)
489
490	metadataResponse := new(MetadataResponse)
491	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
492	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
493	seedBroker.Returns(metadataResponse)
494
495	config := NewConfig()
496	config.Producer.Flush.Messages = 10
497	config.Producer.Return.Successes = true
498	config.Producer.Retry.Backoff = 0
499	config.Producer.Retry.Max = 0
500	producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
501	if err != nil {
502		t.Fatal(err)
503	}
504
505	for i := 0; i < 10; i++ {
506		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
507	}
508
509	prodNotLeader := new(ProduceResponse)
510	prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
511	leader.Returns(prodNotLeader)
512
513	for i := 0; i < 10; i++ {
514		select {
515		case msg := <-producer.Errors():
516			if msg.Err != ErrNotLeaderForPartition {
517				t.Error(msg.Err)
518			}
519		case <-producer.Successes():
520			t.Error("Unexpected success")
521		}
522	}
523
524	seedBroker.Returns(metadataResponse)
525
526	for i := 0; i < 10; i++ {
527		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
528	}
529
530	prodSuccess := new(ProduceResponse)
531	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
532	leader.Returns(prodSuccess)
533
534	expectResults(t, producer, 10, 0)
535
536	leader.Close()
537	seedBroker.Close()
538	safeClose(t, producer)
539}
540
541func TestAsyncProducerRetryWithReferenceOpen(t *testing.T) {
542	seedBroker := NewMockBroker(t, 1)
543	leader := NewMockBroker(t, 2)
544	leaderAddr := leader.Addr()
545
546	metadataResponse := new(MetadataResponse)
547	metadataResponse.AddBroker(leaderAddr, leader.BrokerID())
548	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
549	metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, ErrNoError)
550	seedBroker.Returns(metadataResponse)
551
552	config := NewConfig()
553	config.Producer.Return.Successes = true
554	config.Producer.Retry.Backoff = 0
555	config.Producer.Retry.Max = 1
556	config.Producer.Partitioner = NewRoundRobinPartitioner
557	producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
558	if err != nil {
559		t.Fatal(err)
560	}
561
562	// prime partition 0
563	producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
564	prodSuccess := new(ProduceResponse)
565	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
566	leader.Returns(prodSuccess)
567	expectResults(t, producer, 1, 0)
568
569	// prime partition 1
570	producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
571	prodSuccess = new(ProduceResponse)
572	prodSuccess.AddTopicPartition("my_topic", 1, ErrNoError)
573	leader.Returns(prodSuccess)
574	expectResults(t, producer, 1, 0)
575
576	// reboot the broker (the producer will get EOF on its existing connection)
577	leader.Close()
578	leader = NewMockBrokerAddr(t, 2, leaderAddr)
579
580	// send another message on partition 0 to trigger the EOF and retry
581	producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
582
583	// tell partition 0 to go to that broker again
584	seedBroker.Returns(metadataResponse)
585
586	// succeed this time
587	prodSuccess = new(ProduceResponse)
588	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
589	leader.Returns(prodSuccess)
590	expectResults(t, producer, 1, 0)
591
592	// shutdown
593	closeProducer(t, producer)
594	seedBroker.Close()
595	leader.Close()
596}
597
598func TestAsyncProducerFlusherRetryCondition(t *testing.T) {
599	seedBroker := NewMockBroker(t, 1)
600	leader := NewMockBroker(t, 2)
601
602	metadataResponse := new(MetadataResponse)
603	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
604	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
605	metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, ErrNoError)
606	seedBroker.Returns(metadataResponse)
607
608	config := NewConfig()
609	config.Producer.Flush.Messages = 5
610	config.Producer.Return.Successes = true
611	config.Producer.Retry.Backoff = 0
612	config.Producer.Retry.Max = 1
613	config.Producer.Partitioner = NewManualPartitioner
614	producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
615	if err != nil {
616		t.Fatal(err)
617	}
618
619	// prime partitions
620	for p := int32(0); p < 2; p++ {
621		for i := 0; i < 5; i++ {
622			producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: p}
623		}
624		prodSuccess := new(ProduceResponse)
625		prodSuccess.AddTopicPartition("my_topic", p, ErrNoError)
626		leader.Returns(prodSuccess)
627		expectResults(t, producer, 5, 0)
628	}
629
630	// send more messages on partition 0
631	for i := 0; i < 5; i++ {
632		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0}
633	}
634	prodNotLeader := new(ProduceResponse)
635	prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
636	leader.Returns(prodNotLeader)
637
638	time.Sleep(50 * time.Millisecond)
639
640	leader.SetHandlerByMap(map[string]MockResponse{
641		"ProduceRequest": NewMockProduceResponse(t).
642			SetError("my_topic", 0, ErrNoError),
643	})
644
645	// tell partition 0 to go to that broker again
646	seedBroker.Returns(metadataResponse)
647
648	// succeed this time
649	expectResults(t, producer, 5, 0)
650
651	// put five more through
652	for i := 0; i < 5; i++ {
653		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0}
654	}
655	expectResults(t, producer, 5, 0)
656
657	// shutdown
658	closeProducer(t, producer)
659	seedBroker.Close()
660	leader.Close()
661}
662
663func TestAsyncProducerRetryShutdown(t *testing.T) {
664	seedBroker := NewMockBroker(t, 1)
665	leader := NewMockBroker(t, 2)
666
667	metadataLeader := new(MetadataResponse)
668	metadataLeader.AddBroker(leader.Addr(), leader.BrokerID())
669	metadataLeader.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
670	seedBroker.Returns(metadataLeader)
671
672	config := NewConfig()
673	config.Producer.Flush.Messages = 10
674	config.Producer.Return.Successes = true
675	config.Producer.Retry.Backoff = 0
676	producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
677	if err != nil {
678		t.Fatal(err)
679	}
680
681	for i := 0; i < 10; i++ {
682		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
683	}
684	producer.AsyncClose()
685	time.Sleep(5 * time.Millisecond) // let the shutdown goroutine kick in
686
687	producer.Input() <- &ProducerMessage{Topic: "FOO"}
688	if err := <-producer.Errors(); err.Err != ErrShuttingDown {
689		t.Error(err)
690	}
691
692	prodNotLeader := new(ProduceResponse)
693	prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
694	leader.Returns(prodNotLeader)
695
696	seedBroker.Returns(metadataLeader)
697
698	prodSuccess := new(ProduceResponse)
699	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
700	leader.Returns(prodSuccess)
701	expectResults(t, producer, 10, 0)
702
703	seedBroker.Close()
704	leader.Close()
705
706	// wait for the async-closed producer to shut down fully
707	for err := range producer.Errors() {
708		t.Error(err)
709	}
710}
711
712func TestAsyncProducerNoReturns(t *testing.T) {
713	seedBroker := NewMockBroker(t, 1)
714	leader := NewMockBroker(t, 2)
715
716	metadataLeader := new(MetadataResponse)
717	metadataLeader.AddBroker(leader.Addr(), leader.BrokerID())
718	metadataLeader.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
719	seedBroker.Returns(metadataLeader)
720
721	config := NewConfig()
722	config.Producer.Flush.Messages = 10
723	config.Producer.Return.Successes = false
724	config.Producer.Return.Errors = false
725	config.Producer.Retry.Backoff = 0
726	producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
727	if err != nil {
728		t.Fatal(err)
729	}
730
731	for i := 0; i < 10; i++ {
732		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
733	}
734
735	wait := make(chan bool)
736	go func() {
737		if err := producer.Close(); err != nil {
738			t.Error(err)
739		}
740		close(wait)
741	}()
742
743	prodSuccess := new(ProduceResponse)
744	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
745	leader.Returns(prodSuccess)
746
747	<-wait
748	seedBroker.Close()
749	leader.Close()
750}
751
752// This example shows how to use the producer while simultaneously
753// reading the Errors channel to know about any failures.
754func ExampleAsyncProducer_select() {
755	producer, err := NewAsyncProducer([]string{"localhost:9092"}, nil)
756	if err != nil {
757		panic(err)
758	}
759
760	defer func() {
761		if err := producer.Close(); err != nil {
762			log.Fatalln(err)
763		}
764	}()
765
766	// Trap SIGINT to trigger a shutdown.
767	signals := make(chan os.Signal, 1)
768	signal.Notify(signals, os.Interrupt)
769
770	var enqueued, errors int
771ProducerLoop:
772	for {
773		select {
774		case producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder("testing 123")}:
775			enqueued++
776		case err := <-producer.Errors():
777			log.Println("Failed to produce message", err)
778			errors++
779		case <-signals:
780			break ProducerLoop
781		}
782	}
783
784	log.Printf("Enqueued: %d; errors: %d\n", enqueued, errors)
785}
786
787// This example shows how to use the producer with separate goroutines
788// reading from the Successes and Errors channels. Note that in order
789// for the Successes channel to be populated, you have to set
790// config.Producer.Return.Successes to true.
791func ExampleAsyncProducer_goroutines() {
792	config := NewConfig()
793	config.Producer.Return.Successes = true
794	producer, err := NewAsyncProducer([]string{"localhost:9092"}, config)
795	if err != nil {
796		panic(err)
797	}
798
799	// Trap SIGINT to trigger a graceful shutdown.
800	signals := make(chan os.Signal, 1)
801	signal.Notify(signals, os.Interrupt)
802
803	var (
804		wg                          sync.WaitGroup
805		enqueued, successes, errors int
806	)
807
808	wg.Add(1)
809	go func() {
810		defer wg.Done()
811		for range producer.Successes() {
812			successes++
813		}
814	}()
815
816	wg.Add(1)
817	go func() {
818		defer wg.Done()
819		for err := range producer.Errors() {
820			log.Println(err)
821			errors++
822		}
823	}()
824
825ProducerLoop:
826	for {
827		message := &ProducerMessage{Topic: "my_topic", Value: StringEncoder("testing 123")}
828		select {
829		case producer.Input() <- message:
830			enqueued++
831
832		case <-signals:
833			producer.AsyncClose() // Trigger a shutdown of the producer.
834			break ProducerLoop
835		}
836	}
837
838	wg.Wait()
839
840	log.Printf("Successfully produced: %d; errors: %d\n", successes, errors)
841}
842