1package sarama 2 3import ( 4 "errors" 5 "log" 6 "os" 7 "os/signal" 8 "sync" 9 "testing" 10 "time" 11) 12 13const TestMessage = "ABC THE MESSAGE" 14 15func closeProducer(t *testing.T, p AsyncProducer) { 16 var wg sync.WaitGroup 17 p.AsyncClose() 18 19 wg.Add(2) 20 go func() { 21 for range p.Successes() { 22 t.Error("Unexpected message on Successes()") 23 } 24 wg.Done() 25 }() 26 go func() { 27 for msg := range p.Errors() { 28 t.Error(msg.Err) 29 } 30 wg.Done() 31 }() 32 wg.Wait() 33} 34 35func expectResults(t *testing.T, p AsyncProducer, successes, errors int) { 36 expect := successes + errors 37 for expect > 0 { 38 select { 39 case msg := <-p.Errors(): 40 if msg.Msg.flags != 0 { 41 t.Error("Message had flags set") 42 } 43 errors-- 44 expect-- 45 if errors < 0 { 46 t.Error(msg.Err) 47 } 48 case msg := <-p.Successes(): 49 if msg.flags != 0 { 50 t.Error("Message had flags set") 51 } 52 successes-- 53 expect-- 54 if successes < 0 { 55 t.Error("Too many successes") 56 } 57 } 58 } 59 if successes != 0 || errors != 0 { 60 t.Error("Unexpected successes", successes, "or errors", errors) 61 } 62} 63 64type testPartitioner chan *int32 65 66func (p testPartitioner) Partition(msg *ProducerMessage, numPartitions int32) (int32, error) { 67 part := <-p 68 if part == nil { 69 return 0, errors.New("BOOM") 70 } 71 72 return *part, nil 73} 74 75func (p testPartitioner) RequiresConsistency() bool { 76 return true 77} 78 79func (p testPartitioner) feed(partition int32) { 80 p <- &partition 81} 82 83type flakyEncoder bool 84 85func (f flakyEncoder) Length() int { 86 return len(TestMessage) 87} 88 89func (f flakyEncoder) Encode() ([]byte, error) { 90 if !bool(f) { 91 return nil, errors.New("flaky encoding error") 92 } 93 return []byte(TestMessage), nil 94} 95 96func TestAsyncProducer(t *testing.T) { 97 seedBroker := NewMockBroker(t, 1) 98 leader := NewMockBroker(t, 2) 99 100 metadataResponse := new(MetadataResponse) 101 metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) 102 metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError) 103 seedBroker.Returns(metadataResponse) 104 105 prodSuccess := new(ProduceResponse) 106 prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) 107 leader.Returns(prodSuccess) 108 109 config := NewConfig() 110 config.Producer.Flush.Messages = 10 111 config.Producer.Return.Successes = true 112 producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config) 113 if err != nil { 114 t.Fatal(err) 115 } 116 117 for i := 0; i < 10; i++ { 118 producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Metadata: i} 119 } 120 for i := 0; i < 10; i++ { 121 select { 122 case msg := <-producer.Errors(): 123 t.Error(msg.Err) 124 if msg.Msg.flags != 0 { 125 t.Error("Message had flags set") 126 } 127 case msg := <-producer.Successes(): 128 if msg.flags != 0 { 129 t.Error("Message had flags set") 130 } 131 if msg.Metadata.(int) != i { 132 t.Error("Message metadata did not match") 133 } 134 case <-time.After(time.Second): 135 t.Errorf("Timeout waiting for msg #%d", i) 136 goto done 137 } 138 } 139done: 140 closeProducer(t, producer) 141 leader.Close() 142 seedBroker.Close() 143} 144 145func TestAsyncProducerMultipleFlushes(t *testing.T) { 146 seedBroker := NewMockBroker(t, 1) 147 leader := NewMockBroker(t, 2) 148 149 metadataResponse := new(MetadataResponse) 150 metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) 151 metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError) 152 seedBroker.Returns(metadataResponse) 153 154 prodSuccess := new(ProduceResponse) 155 prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) 156 leader.Returns(prodSuccess) 157 leader.Returns(prodSuccess) 158 leader.Returns(prodSuccess) 159 160 config := NewConfig() 161 config.Producer.Flush.Messages = 5 162 config.Producer.Return.Successes = true 163 producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config) 164 if err != nil { 165 t.Fatal(err) 166 } 167 168 for flush := 0; flush < 3; flush++ { 169 for i := 0; i < 5; i++ { 170 producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} 171 } 172 expectResults(t, producer, 5, 0) 173 } 174 175 closeProducer(t, producer) 176 leader.Close() 177 seedBroker.Close() 178} 179 180func TestAsyncProducerMultipleBrokers(t *testing.T) { 181 seedBroker := NewMockBroker(t, 1) 182 leader0 := NewMockBroker(t, 2) 183 leader1 := NewMockBroker(t, 3) 184 185 metadataResponse := new(MetadataResponse) 186 metadataResponse.AddBroker(leader0.Addr(), leader0.BrokerID()) 187 metadataResponse.AddBroker(leader1.Addr(), leader1.BrokerID()) 188 metadataResponse.AddTopicPartition("my_topic", 0, leader0.BrokerID(), nil, nil, ErrNoError) 189 metadataResponse.AddTopicPartition("my_topic", 1, leader1.BrokerID(), nil, nil, ErrNoError) 190 seedBroker.Returns(metadataResponse) 191 192 prodResponse0 := new(ProduceResponse) 193 prodResponse0.AddTopicPartition("my_topic", 0, ErrNoError) 194 leader0.Returns(prodResponse0) 195 196 prodResponse1 := new(ProduceResponse) 197 prodResponse1.AddTopicPartition("my_topic", 1, ErrNoError) 198 leader1.Returns(prodResponse1) 199 200 config := NewConfig() 201 config.Producer.Flush.Messages = 5 202 config.Producer.Return.Successes = true 203 config.Producer.Partitioner = NewRoundRobinPartitioner 204 producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config) 205 if err != nil { 206 t.Fatal(err) 207 } 208 209 for i := 0; i < 10; i++ { 210 producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} 211 } 212 expectResults(t, producer, 10, 0) 213 214 closeProducer(t, producer) 215 leader1.Close() 216 leader0.Close() 217 seedBroker.Close() 218} 219 220func TestAsyncProducerCustomPartitioner(t *testing.T) { 221 seedBroker := NewMockBroker(t, 1) 222 leader := NewMockBroker(t, 2) 223 224 metadataResponse := new(MetadataResponse) 225 metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) 226 metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError) 227 seedBroker.Returns(metadataResponse) 228 229 prodResponse := new(ProduceResponse) 230 prodResponse.AddTopicPartition("my_topic", 0, ErrNoError) 231 leader.Returns(prodResponse) 232 233 config := NewConfig() 234 config.Producer.Flush.Messages = 2 235 config.Producer.Return.Successes = true 236 config.Producer.Partitioner = func(topic string) Partitioner { 237 p := make(testPartitioner) 238 go func() { 239 p.feed(0) 240 p <- nil 241 p <- nil 242 p <- nil 243 p.feed(0) 244 }() 245 return p 246 } 247 producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config) 248 if err != nil { 249 t.Fatal(err) 250 } 251 252 for i := 0; i < 5; i++ { 253 producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} 254 } 255 expectResults(t, producer, 2, 3) 256 257 closeProducer(t, producer) 258 leader.Close() 259 seedBroker.Close() 260} 261 262func TestAsyncProducerFailureRetry(t *testing.T) { 263 seedBroker := NewMockBroker(t, 1) 264 leader1 := NewMockBroker(t, 2) 265 leader2 := NewMockBroker(t, 3) 266 267 metadataLeader1 := new(MetadataResponse) 268 metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID()) 269 metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError) 270 seedBroker.Returns(metadataLeader1) 271 272 config := NewConfig() 273 config.Producer.Flush.Messages = 10 274 config.Producer.Return.Successes = true 275 config.Producer.Retry.Backoff = 0 276 producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config) 277 if err != nil { 278 t.Fatal(err) 279 } 280 seedBroker.Close() 281 282 for i := 0; i < 10; i++ { 283 producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} 284 } 285 prodNotLeader := new(ProduceResponse) 286 prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition) 287 leader1.Returns(prodNotLeader) 288 289 metadataLeader2 := new(MetadataResponse) 290 metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID()) 291 metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, ErrNoError) 292 leader1.Returns(metadataLeader2) 293 294 prodSuccess := new(ProduceResponse) 295 prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) 296 leader2.Returns(prodSuccess) 297 expectResults(t, producer, 10, 0) 298 leader1.Close() 299 300 for i := 0; i < 10; i++ { 301 producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} 302 } 303 leader2.Returns(prodSuccess) 304 expectResults(t, producer, 10, 0) 305 306 leader2.Close() 307 closeProducer(t, producer) 308} 309 310func TestAsyncProducerEncoderFailures(t *testing.T) { 311 seedBroker := NewMockBroker(t, 1) 312 leader := NewMockBroker(t, 2) 313 314 metadataResponse := new(MetadataResponse) 315 metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) 316 metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError) 317 seedBroker.Returns(metadataResponse) 318 319 prodSuccess := new(ProduceResponse) 320 prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) 321 leader.Returns(prodSuccess) 322 leader.Returns(prodSuccess) 323 leader.Returns(prodSuccess) 324 325 config := NewConfig() 326 config.Producer.Flush.Messages = 1 327 config.Producer.Return.Successes = true 328 config.Producer.Partitioner = NewManualPartitioner 329 producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config) 330 if err != nil { 331 t.Fatal(err) 332 } 333 334 for flush := 0; flush < 3; flush++ { 335 producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: flakyEncoder(true), Value: flakyEncoder(false)} 336 producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: flakyEncoder(false), Value: flakyEncoder(true)} 337 producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: flakyEncoder(true), Value: flakyEncoder(true)} 338 expectResults(t, producer, 1, 2) 339 } 340 341 closeProducer(t, producer) 342 leader.Close() 343 seedBroker.Close() 344} 345 346// If a Kafka broker becomes unavailable and then returns back in service, then 347// producer reconnects to it and continues sending messages. 348func TestAsyncProducerBrokerBounce(t *testing.T) { 349 // Given 350 seedBroker := NewMockBroker(t, 1) 351 leader := NewMockBroker(t, 2) 352 leaderAddr := leader.Addr() 353 354 metadataResponse := new(MetadataResponse) 355 metadataResponse.AddBroker(leaderAddr, leader.BrokerID()) 356 metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError) 357 seedBroker.Returns(metadataResponse) 358 359 prodSuccess := new(ProduceResponse) 360 prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) 361 362 config := NewConfig() 363 config.Producer.Flush.Messages = 1 364 config.Producer.Return.Successes = true 365 config.Producer.Retry.Backoff = 0 366 producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config) 367 if err != nil { 368 t.Fatal(err) 369 } 370 producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} 371 leader.Returns(prodSuccess) 372 expectResults(t, producer, 1, 0) 373 374 // When: a broker connection gets reset by a broker (network glitch, restart, you name it). 375 leader.Close() // producer should get EOF 376 leader = NewMockBrokerAddr(t, 2, leaderAddr) // start it up again right away for giggles 377 seedBroker.Returns(metadataResponse) // tell it to go to broker 2 again 378 379 // Then: a produced message goes through the new broker connection. 380 producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} 381 leader.Returns(prodSuccess) 382 expectResults(t, producer, 1, 0) 383 384 closeProducer(t, producer) 385 seedBroker.Close() 386 leader.Close() 387} 388 389func TestAsyncProducerBrokerBounceWithStaleMetadata(t *testing.T) { 390 seedBroker := NewMockBroker(t, 1) 391 leader1 := NewMockBroker(t, 2) 392 leader2 := NewMockBroker(t, 3) 393 394 metadataLeader1 := new(MetadataResponse) 395 metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID()) 396 metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError) 397 seedBroker.Returns(metadataLeader1) 398 399 config := NewConfig() 400 config.Producer.Flush.Messages = 10 401 config.Producer.Return.Successes = true 402 config.Producer.Retry.Max = 3 403 config.Producer.Retry.Backoff = 0 404 producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config) 405 if err != nil { 406 t.Fatal(err) 407 } 408 409 for i := 0; i < 10; i++ { 410 producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} 411 } 412 leader1.Close() // producer should get EOF 413 seedBroker.Returns(metadataLeader1) // tell it to go to leader1 again even though it's still down 414 seedBroker.Returns(metadataLeader1) // tell it to go to leader1 again even though it's still down 415 416 // ok fine, tell it to go to leader2 finally 417 metadataLeader2 := new(MetadataResponse) 418 metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID()) 419 metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, ErrNoError) 420 seedBroker.Returns(metadataLeader2) 421 422 prodSuccess := new(ProduceResponse) 423 prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) 424 leader2.Returns(prodSuccess) 425 expectResults(t, producer, 10, 0) 426 seedBroker.Close() 427 leader2.Close() 428 429 closeProducer(t, producer) 430} 431 432func TestAsyncProducerMultipleRetries(t *testing.T) { 433 seedBroker := NewMockBroker(t, 1) 434 leader1 := NewMockBroker(t, 2) 435 leader2 := NewMockBroker(t, 3) 436 437 metadataLeader1 := new(MetadataResponse) 438 metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID()) 439 metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError) 440 seedBroker.Returns(metadataLeader1) 441 442 config := NewConfig() 443 config.Producer.Flush.Messages = 10 444 config.Producer.Return.Successes = true 445 config.Producer.Retry.Max = 4 446 config.Producer.Retry.Backoff = 0 447 producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config) 448 if err != nil { 449 t.Fatal(err) 450 } 451 452 for i := 0; i < 10; i++ { 453 producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} 454 } 455 prodNotLeader := new(ProduceResponse) 456 prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition) 457 leader1.Returns(prodNotLeader) 458 459 metadataLeader2 := new(MetadataResponse) 460 metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID()) 461 metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, ErrNoError) 462 seedBroker.Returns(metadataLeader2) 463 leader2.Returns(prodNotLeader) 464 seedBroker.Returns(metadataLeader1) 465 leader1.Returns(prodNotLeader) 466 seedBroker.Returns(metadataLeader1) 467 leader1.Returns(prodNotLeader) 468 seedBroker.Returns(metadataLeader2) 469 470 prodSuccess := new(ProduceResponse) 471 prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) 472 leader2.Returns(prodSuccess) 473 expectResults(t, producer, 10, 0) 474 475 for i := 0; i < 10; i++ { 476 producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} 477 } 478 leader2.Returns(prodSuccess) 479 expectResults(t, producer, 10, 0) 480 481 seedBroker.Close() 482 leader1.Close() 483 leader2.Close() 484 closeProducer(t, producer) 485} 486 487func TestAsyncProducerOutOfRetries(t *testing.T) { 488 t.Skip("Enable once bug #294 is fixed.") 489 490 seedBroker := NewMockBroker(t, 1) 491 leader := NewMockBroker(t, 2) 492 493 metadataResponse := new(MetadataResponse) 494 metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) 495 metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError) 496 seedBroker.Returns(metadataResponse) 497 498 config := NewConfig() 499 config.Producer.Flush.Messages = 10 500 config.Producer.Return.Successes = true 501 config.Producer.Retry.Backoff = 0 502 config.Producer.Retry.Max = 0 503 producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config) 504 if err != nil { 505 t.Fatal(err) 506 } 507 508 for i := 0; i < 10; i++ { 509 producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} 510 } 511 512 prodNotLeader := new(ProduceResponse) 513 prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition) 514 leader.Returns(prodNotLeader) 515 516 for i := 0; i < 10; i++ { 517 select { 518 case msg := <-producer.Errors(): 519 if msg.Err != ErrNotLeaderForPartition { 520 t.Error(msg.Err) 521 } 522 case <-producer.Successes(): 523 t.Error("Unexpected success") 524 } 525 } 526 527 seedBroker.Returns(metadataResponse) 528 529 for i := 0; i < 10; i++ { 530 producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} 531 } 532 533 prodSuccess := new(ProduceResponse) 534 prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) 535 leader.Returns(prodSuccess) 536 537 expectResults(t, producer, 10, 0) 538 539 leader.Close() 540 seedBroker.Close() 541 safeClose(t, producer) 542} 543 544func TestAsyncProducerRetryWithReferenceOpen(t *testing.T) { 545 seedBroker := NewMockBroker(t, 1) 546 leader := NewMockBroker(t, 2) 547 leaderAddr := leader.Addr() 548 549 metadataResponse := new(MetadataResponse) 550 metadataResponse.AddBroker(leaderAddr, leader.BrokerID()) 551 metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError) 552 metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, ErrNoError) 553 seedBroker.Returns(metadataResponse) 554 555 config := NewConfig() 556 config.Producer.Return.Successes = true 557 config.Producer.Retry.Backoff = 0 558 config.Producer.Retry.Max = 1 559 config.Producer.Partitioner = NewRoundRobinPartitioner 560 producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config) 561 if err != nil { 562 t.Fatal(err) 563 } 564 565 // prime partition 0 566 producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} 567 prodSuccess := new(ProduceResponse) 568 prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) 569 leader.Returns(prodSuccess) 570 expectResults(t, producer, 1, 0) 571 572 // prime partition 1 573 producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} 574 prodSuccess = new(ProduceResponse) 575 prodSuccess.AddTopicPartition("my_topic", 1, ErrNoError) 576 leader.Returns(prodSuccess) 577 expectResults(t, producer, 1, 0) 578 579 // reboot the broker (the producer will get EOF on its existing connection) 580 leader.Close() 581 leader = NewMockBrokerAddr(t, 2, leaderAddr) 582 583 // send another message on partition 0 to trigger the EOF and retry 584 producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} 585 586 // tell partition 0 to go to that broker again 587 seedBroker.Returns(metadataResponse) 588 589 // succeed this time 590 prodSuccess = new(ProduceResponse) 591 prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) 592 leader.Returns(prodSuccess) 593 expectResults(t, producer, 1, 0) 594 595 // shutdown 596 closeProducer(t, producer) 597 seedBroker.Close() 598 leader.Close() 599} 600 601func TestAsyncProducerFlusherRetryCondition(t *testing.T) { 602 seedBroker := NewMockBroker(t, 1) 603 leader := NewMockBroker(t, 2) 604 605 metadataResponse := new(MetadataResponse) 606 metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) 607 metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError) 608 metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, ErrNoError) 609 seedBroker.Returns(metadataResponse) 610 611 config := NewConfig() 612 config.Producer.Flush.Messages = 5 613 config.Producer.Return.Successes = true 614 config.Producer.Retry.Backoff = 0 615 config.Producer.Retry.Max = 1 616 config.Producer.Partitioner = NewManualPartitioner 617 producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config) 618 if err != nil { 619 t.Fatal(err) 620 } 621 622 // prime partitions 623 for p := int32(0); p < 2; p++ { 624 for i := 0; i < 5; i++ { 625 producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: p} 626 } 627 prodSuccess := new(ProduceResponse) 628 prodSuccess.AddTopicPartition("my_topic", p, ErrNoError) 629 leader.Returns(prodSuccess) 630 expectResults(t, producer, 5, 0) 631 } 632 633 // send more messages on partition 0 634 for i := 0; i < 5; i++ { 635 producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0} 636 } 637 prodNotLeader := new(ProduceResponse) 638 prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition) 639 leader.Returns(prodNotLeader) 640 641 time.Sleep(50 * time.Millisecond) 642 643 leader.SetHandlerByMap(map[string]MockResponse{ 644 "ProduceRequest": NewMockProduceResponse(t). 645 SetVersion(0). 646 SetError("my_topic", 0, ErrNoError), 647 }) 648 649 // tell partition 0 to go to that broker again 650 seedBroker.Returns(metadataResponse) 651 652 // succeed this time 653 expectResults(t, producer, 5, 0) 654 655 // put five more through 656 for i := 0; i < 5; i++ { 657 producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0} 658 } 659 expectResults(t, producer, 5, 0) 660 661 // shutdown 662 closeProducer(t, producer) 663 seedBroker.Close() 664 leader.Close() 665} 666 667func TestAsyncProducerRetryShutdown(t *testing.T) { 668 seedBroker := NewMockBroker(t, 1) 669 leader := NewMockBroker(t, 2) 670 671 metadataLeader := new(MetadataResponse) 672 metadataLeader.AddBroker(leader.Addr(), leader.BrokerID()) 673 metadataLeader.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError) 674 seedBroker.Returns(metadataLeader) 675 676 config := NewConfig() 677 config.Producer.Flush.Messages = 10 678 config.Producer.Return.Successes = true 679 config.Producer.Retry.Backoff = 0 680 producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config) 681 if err != nil { 682 t.Fatal(err) 683 } 684 685 for i := 0; i < 10; i++ { 686 producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} 687 } 688 producer.AsyncClose() 689 time.Sleep(5 * time.Millisecond) // let the shutdown goroutine kick in 690 691 producer.Input() <- &ProducerMessage{Topic: "FOO"} 692 if err := <-producer.Errors(); err.Err != ErrShuttingDown { 693 t.Error(err) 694 } 695 696 prodNotLeader := new(ProduceResponse) 697 prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition) 698 leader.Returns(prodNotLeader) 699 700 seedBroker.Returns(metadataLeader) 701 702 prodSuccess := new(ProduceResponse) 703 prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) 704 leader.Returns(prodSuccess) 705 expectResults(t, producer, 10, 0) 706 707 seedBroker.Close() 708 leader.Close() 709 710 // wait for the async-closed producer to shut down fully 711 for err := range producer.Errors() { 712 t.Error(err) 713 } 714} 715 716func TestAsyncProducerNoReturns(t *testing.T) { 717 seedBroker := NewMockBroker(t, 1) 718 leader := NewMockBroker(t, 2) 719 720 metadataLeader := new(MetadataResponse) 721 metadataLeader.AddBroker(leader.Addr(), leader.BrokerID()) 722 metadataLeader.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError) 723 seedBroker.Returns(metadataLeader) 724 725 config := NewConfig() 726 config.Producer.Flush.Messages = 10 727 config.Producer.Return.Successes = false 728 config.Producer.Return.Errors = false 729 config.Producer.Retry.Backoff = 0 730 producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config) 731 if err != nil { 732 t.Fatal(err) 733 } 734 735 for i := 0; i < 10; i++ { 736 producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} 737 } 738 739 wait := make(chan bool) 740 go func() { 741 if err := producer.Close(); err != nil { 742 t.Error(err) 743 } 744 close(wait) 745 }() 746 747 prodSuccess := new(ProduceResponse) 748 prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) 749 leader.Returns(prodSuccess) 750 751 <-wait 752 seedBroker.Close() 753 leader.Close() 754} 755 756// This example shows how to use the producer while simultaneously 757// reading the Errors channel to know about any failures. 758func ExampleAsyncProducer_select() { 759 producer, err := NewAsyncProducer([]string{"localhost:9092"}, nil) 760 if err != nil { 761 panic(err) 762 } 763 764 defer func() { 765 if err := producer.Close(); err != nil { 766 log.Fatalln(err) 767 } 768 }() 769 770 // Trap SIGINT to trigger a shutdown. 771 signals := make(chan os.Signal, 1) 772 signal.Notify(signals, os.Interrupt) 773 774 var enqueued, errors int 775ProducerLoop: 776 for { 777 select { 778 case producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder("testing 123")}: 779 enqueued++ 780 case err := <-producer.Errors(): 781 log.Println("Failed to produce message", err) 782 errors++ 783 case <-signals: 784 break ProducerLoop 785 } 786 } 787 788 log.Printf("Enqueued: %d; errors: %d\n", enqueued, errors) 789} 790 791// This example shows how to use the producer with separate goroutines 792// reading from the Successes and Errors channels. Note that in order 793// for the Successes channel to be populated, you have to set 794// config.Producer.Return.Successes to true. 795func ExampleAsyncProducer_goroutines() { 796 config := NewConfig() 797 config.Producer.Return.Successes = true 798 producer, err := NewAsyncProducer([]string{"localhost:9092"}, config) 799 if err != nil { 800 panic(err) 801 } 802 803 // Trap SIGINT to trigger a graceful shutdown. 804 signals := make(chan os.Signal, 1) 805 signal.Notify(signals, os.Interrupt) 806 807 var ( 808 wg sync.WaitGroup 809 enqueued, successes, errors int 810 ) 811 812 wg.Add(1) 813 go func() { 814 defer wg.Done() 815 for range producer.Successes() { 816 successes++ 817 } 818 }() 819 820 wg.Add(1) 821 go func() { 822 defer wg.Done() 823 for err := range producer.Errors() { 824 log.Println(err) 825 errors++ 826 } 827 }() 828 829ProducerLoop: 830 for { 831 message := &ProducerMessage{Topic: "my_topic", Value: StringEncoder("testing 123")} 832 select { 833 case producer.Input() <- message: 834 enqueued++ 835 836 case <-signals: 837 producer.AsyncClose() // Trigger a shutdown of the producer. 838 break ProducerLoop 839 } 840 } 841 842 wg.Wait() 843 844 log.Printf("Successfully produced: %d; errors: %d\n", successes, errors) 845} 846