1package sarama 2 3import ( 4 "errors" 5 "log" 6 "os" 7 "os/signal" 8 "sync" 9 "testing" 10 "time" 11) 12 13const TestMessage = "ABC THE MESSAGE" 14 15func closeProducer(t *testing.T, p AsyncProducer) { 16 var wg sync.WaitGroup 17 p.AsyncClose() 18 19 wg.Add(2) 20 go func() { 21 for range p.Successes() { 22 t.Error("Unexpected message on Successes()") 23 } 24 wg.Done() 25 }() 26 go func() { 27 for msg := range p.Errors() { 28 t.Error(msg.Err) 29 } 30 wg.Done() 31 }() 32 wg.Wait() 33} 34 35func expectResults(t *testing.T, p AsyncProducer, successes, errors int) { 36 expect := successes + errors 37 for expect > 0 { 38 select { 39 case msg := <-p.Errors(): 40 if msg.Msg.flags != 0 { 41 t.Error("Message had flags set") 42 } 43 errors-- 44 expect-- 45 if errors < 0 { 46 t.Error(msg.Err) 47 } 48 case msg := <-p.Successes(): 49 if msg.flags != 0 { 50 t.Error("Message had flags set") 51 } 52 successes-- 53 expect-- 54 if successes < 0 { 55 t.Error("Too many successes") 56 } 57 } 58 } 59 if successes != 0 || errors != 0 { 60 t.Error("Unexpected successes", successes, "or errors", errors) 61 } 62} 63 64type testPartitioner chan *int32 65 66func (p testPartitioner) Partition(msg *ProducerMessage, numPartitions int32) (int32, error) { 67 part := <-p 68 if part == nil { 69 return 0, errors.New("BOOM") 70 } 71 72 return *part, nil 73} 74 75func (p testPartitioner) RequiresConsistency() bool { 76 return true 77} 78 79func (p testPartitioner) feed(partition int32) { 80 p <- &partition 81} 82 83type flakyEncoder bool 84 85func (f flakyEncoder) Length() int { 86 return len(TestMessage) 87} 88 89func (f flakyEncoder) Encode() ([]byte, error) { 90 if !bool(f) { 91 return nil, errors.New("flaky encoding error") 92 } 93 return []byte(TestMessage), nil 94} 95 96func TestAsyncProducer(t *testing.T) { 97 seedBroker := NewMockBroker(t, 1) 98 leader := NewMockBroker(t, 2) 99 100 metadataResponse := new(MetadataResponse) 101 metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) 102 metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError) 103 seedBroker.Returns(metadataResponse) 104 105 prodSuccess := new(ProduceResponse) 106 prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) 107 leader.Returns(prodSuccess) 108 109 config := NewConfig() 110 config.Producer.Flush.Messages = 10 111 config.Producer.Return.Successes = true 112 producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config) 113 if err != nil { 114 t.Fatal(err) 115 } 116 117 for i := 0; i < 10; i++ { 118 producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Metadata: i} 119 } 120 for i := 0; i < 10; i++ { 121 select { 122 case msg := <-producer.Errors(): 123 t.Error(msg.Err) 124 if msg.Msg.flags != 0 { 125 t.Error("Message had flags set") 126 } 127 case msg := <-producer.Successes(): 128 if msg.flags != 0 { 129 t.Error("Message had flags set") 130 } 131 if msg.Metadata.(int) != i { 132 t.Error("Message metadata did not match") 133 } 134 } 135 } 136 137 closeProducer(t, producer) 138 leader.Close() 139 seedBroker.Close() 140} 141 142func TestAsyncProducerMultipleFlushes(t *testing.T) { 143 seedBroker := NewMockBroker(t, 1) 144 leader := NewMockBroker(t, 2) 145 146 metadataResponse := new(MetadataResponse) 147 metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) 148 metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError) 149 seedBroker.Returns(metadataResponse) 150 151 prodSuccess := new(ProduceResponse) 152 prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) 153 leader.Returns(prodSuccess) 154 leader.Returns(prodSuccess) 155 leader.Returns(prodSuccess) 156 157 config := NewConfig() 158 config.Producer.Flush.Messages = 5 159 config.Producer.Return.Successes = true 160 producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config) 161 if err != nil { 162 t.Fatal(err) 163 } 164 165 for flush := 0; flush < 3; flush++ { 166 for i := 0; i < 5; i++ { 167 producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} 168 } 169 expectResults(t, producer, 5, 0) 170 } 171 172 closeProducer(t, producer) 173 leader.Close() 174 seedBroker.Close() 175} 176 177func TestAsyncProducerMultipleBrokers(t *testing.T) { 178 seedBroker := NewMockBroker(t, 1) 179 leader0 := NewMockBroker(t, 2) 180 leader1 := NewMockBroker(t, 3) 181 182 metadataResponse := new(MetadataResponse) 183 metadataResponse.AddBroker(leader0.Addr(), leader0.BrokerID()) 184 metadataResponse.AddBroker(leader1.Addr(), leader1.BrokerID()) 185 metadataResponse.AddTopicPartition("my_topic", 0, leader0.BrokerID(), nil, nil, ErrNoError) 186 metadataResponse.AddTopicPartition("my_topic", 1, leader1.BrokerID(), nil, nil, ErrNoError) 187 seedBroker.Returns(metadataResponse) 188 189 prodResponse0 := new(ProduceResponse) 190 prodResponse0.AddTopicPartition("my_topic", 0, ErrNoError) 191 leader0.Returns(prodResponse0) 192 193 prodResponse1 := new(ProduceResponse) 194 prodResponse1.AddTopicPartition("my_topic", 1, ErrNoError) 195 leader1.Returns(prodResponse1) 196 197 config := NewConfig() 198 config.Producer.Flush.Messages = 5 199 config.Producer.Return.Successes = true 200 config.Producer.Partitioner = NewRoundRobinPartitioner 201 producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config) 202 if err != nil { 203 t.Fatal(err) 204 } 205 206 for i := 0; i < 10; i++ { 207 producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} 208 } 209 expectResults(t, producer, 10, 0) 210 211 closeProducer(t, producer) 212 leader1.Close() 213 leader0.Close() 214 seedBroker.Close() 215} 216 217func TestAsyncProducerCustomPartitioner(t *testing.T) { 218 seedBroker := NewMockBroker(t, 1) 219 leader := NewMockBroker(t, 2) 220 221 metadataResponse := new(MetadataResponse) 222 metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) 223 metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError) 224 seedBroker.Returns(metadataResponse) 225 226 prodResponse := new(ProduceResponse) 227 prodResponse.AddTopicPartition("my_topic", 0, ErrNoError) 228 leader.Returns(prodResponse) 229 230 config := NewConfig() 231 config.Producer.Flush.Messages = 2 232 config.Producer.Return.Successes = true 233 config.Producer.Partitioner = func(topic string) Partitioner { 234 p := make(testPartitioner) 235 go func() { 236 p.feed(0) 237 p <- nil 238 p <- nil 239 p <- nil 240 p.feed(0) 241 }() 242 return p 243 } 244 producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config) 245 if err != nil { 246 t.Fatal(err) 247 } 248 249 for i := 0; i < 5; i++ { 250 producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} 251 } 252 expectResults(t, producer, 2, 3) 253 254 closeProducer(t, producer) 255 leader.Close() 256 seedBroker.Close() 257} 258 259func TestAsyncProducerFailureRetry(t *testing.T) { 260 seedBroker := NewMockBroker(t, 1) 261 leader1 := NewMockBroker(t, 2) 262 leader2 := NewMockBroker(t, 3) 263 264 metadataLeader1 := new(MetadataResponse) 265 metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID()) 266 metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError) 267 seedBroker.Returns(metadataLeader1) 268 269 config := NewConfig() 270 config.Producer.Flush.Messages = 10 271 config.Producer.Return.Successes = true 272 config.Producer.Retry.Backoff = 0 273 producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config) 274 if err != nil { 275 t.Fatal(err) 276 } 277 seedBroker.Close() 278 279 for i := 0; i < 10; i++ { 280 producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} 281 } 282 prodNotLeader := new(ProduceResponse) 283 prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition) 284 leader1.Returns(prodNotLeader) 285 286 metadataLeader2 := new(MetadataResponse) 287 metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID()) 288 metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, ErrNoError) 289 leader1.Returns(metadataLeader2) 290 291 prodSuccess := new(ProduceResponse) 292 prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) 293 leader2.Returns(prodSuccess) 294 expectResults(t, producer, 10, 0) 295 leader1.Close() 296 297 for i := 0; i < 10; i++ { 298 producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} 299 } 300 leader2.Returns(prodSuccess) 301 expectResults(t, producer, 10, 0) 302 303 leader2.Close() 304 closeProducer(t, producer) 305} 306 307func TestAsyncProducerEncoderFailures(t *testing.T) { 308 seedBroker := NewMockBroker(t, 1) 309 leader := NewMockBroker(t, 2) 310 311 metadataResponse := new(MetadataResponse) 312 metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) 313 metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError) 314 seedBroker.Returns(metadataResponse) 315 316 prodSuccess := new(ProduceResponse) 317 prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) 318 leader.Returns(prodSuccess) 319 leader.Returns(prodSuccess) 320 leader.Returns(prodSuccess) 321 322 config := NewConfig() 323 config.Producer.Flush.Messages = 1 324 config.Producer.Return.Successes = true 325 config.Producer.Partitioner = NewManualPartitioner 326 producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config) 327 if err != nil { 328 t.Fatal(err) 329 } 330 331 for flush := 0; flush < 3; flush++ { 332 producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: flakyEncoder(true), Value: flakyEncoder(false)} 333 producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: flakyEncoder(false), Value: flakyEncoder(true)} 334 producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: flakyEncoder(true), Value: flakyEncoder(true)} 335 expectResults(t, producer, 1, 2) 336 } 337 338 closeProducer(t, producer) 339 leader.Close() 340 seedBroker.Close() 341} 342 343// If a Kafka broker becomes unavailable and then returns back in service, then 344// producer reconnects to it and continues sending messages. 345func TestAsyncProducerBrokerBounce(t *testing.T) { 346 // Given 347 seedBroker := NewMockBroker(t, 1) 348 leader := NewMockBroker(t, 2) 349 leaderAddr := leader.Addr() 350 351 metadataResponse := new(MetadataResponse) 352 metadataResponse.AddBroker(leaderAddr, leader.BrokerID()) 353 metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError) 354 seedBroker.Returns(metadataResponse) 355 356 prodSuccess := new(ProduceResponse) 357 prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) 358 359 config := NewConfig() 360 config.Producer.Flush.Messages = 1 361 config.Producer.Return.Successes = true 362 config.Producer.Retry.Backoff = 0 363 producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config) 364 if err != nil { 365 t.Fatal(err) 366 } 367 producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} 368 leader.Returns(prodSuccess) 369 expectResults(t, producer, 1, 0) 370 371 // When: a broker connection gets reset by a broker (network glitch, restart, you name it). 372 leader.Close() // producer should get EOF 373 leader = NewMockBrokerAddr(t, 2, leaderAddr) // start it up again right away for giggles 374 seedBroker.Returns(metadataResponse) // tell it to go to broker 2 again 375 376 // Then: a produced message goes through the new broker connection. 377 producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} 378 leader.Returns(prodSuccess) 379 expectResults(t, producer, 1, 0) 380 381 closeProducer(t, producer) 382 seedBroker.Close() 383 leader.Close() 384} 385 386func TestAsyncProducerBrokerBounceWithStaleMetadata(t *testing.T) { 387 seedBroker := NewMockBroker(t, 1) 388 leader1 := NewMockBroker(t, 2) 389 leader2 := NewMockBroker(t, 3) 390 391 metadataLeader1 := new(MetadataResponse) 392 metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID()) 393 metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError) 394 seedBroker.Returns(metadataLeader1) 395 396 config := NewConfig() 397 config.Producer.Flush.Messages = 10 398 config.Producer.Return.Successes = true 399 config.Producer.Retry.Max = 3 400 config.Producer.Retry.Backoff = 0 401 producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config) 402 if err != nil { 403 t.Fatal(err) 404 } 405 406 for i := 0; i < 10; i++ { 407 producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} 408 } 409 leader1.Close() // producer should get EOF 410 seedBroker.Returns(metadataLeader1) // tell it to go to leader1 again even though it's still down 411 seedBroker.Returns(metadataLeader1) // tell it to go to leader1 again even though it's still down 412 413 // ok fine, tell it to go to leader2 finally 414 metadataLeader2 := new(MetadataResponse) 415 metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID()) 416 metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, ErrNoError) 417 seedBroker.Returns(metadataLeader2) 418 419 prodSuccess := new(ProduceResponse) 420 prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) 421 leader2.Returns(prodSuccess) 422 expectResults(t, producer, 10, 0) 423 seedBroker.Close() 424 leader2.Close() 425 426 closeProducer(t, producer) 427} 428 429func TestAsyncProducerMultipleRetries(t *testing.T) { 430 seedBroker := NewMockBroker(t, 1) 431 leader1 := NewMockBroker(t, 2) 432 leader2 := NewMockBroker(t, 3) 433 434 metadataLeader1 := new(MetadataResponse) 435 metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID()) 436 metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError) 437 seedBroker.Returns(metadataLeader1) 438 439 config := NewConfig() 440 config.Producer.Flush.Messages = 10 441 config.Producer.Return.Successes = true 442 config.Producer.Retry.Max = 4 443 config.Producer.Retry.Backoff = 0 444 producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config) 445 if err != nil { 446 t.Fatal(err) 447 } 448 449 for i := 0; i < 10; i++ { 450 producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} 451 } 452 prodNotLeader := new(ProduceResponse) 453 prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition) 454 leader1.Returns(prodNotLeader) 455 456 metadataLeader2 := new(MetadataResponse) 457 metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID()) 458 metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, ErrNoError) 459 seedBroker.Returns(metadataLeader2) 460 leader2.Returns(prodNotLeader) 461 seedBroker.Returns(metadataLeader1) 462 leader1.Returns(prodNotLeader) 463 seedBroker.Returns(metadataLeader1) 464 leader1.Returns(prodNotLeader) 465 seedBroker.Returns(metadataLeader2) 466 467 prodSuccess := new(ProduceResponse) 468 prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) 469 leader2.Returns(prodSuccess) 470 expectResults(t, producer, 10, 0) 471 472 for i := 0; i < 10; i++ { 473 producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} 474 } 475 leader2.Returns(prodSuccess) 476 expectResults(t, producer, 10, 0) 477 478 seedBroker.Close() 479 leader1.Close() 480 leader2.Close() 481 closeProducer(t, producer) 482} 483 484func TestAsyncProducerOutOfRetries(t *testing.T) { 485 t.Skip("Enable once bug #294 is fixed.") 486 487 seedBroker := NewMockBroker(t, 1) 488 leader := NewMockBroker(t, 2) 489 490 metadataResponse := new(MetadataResponse) 491 metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) 492 metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError) 493 seedBroker.Returns(metadataResponse) 494 495 config := NewConfig() 496 config.Producer.Flush.Messages = 10 497 config.Producer.Return.Successes = true 498 config.Producer.Retry.Backoff = 0 499 config.Producer.Retry.Max = 0 500 producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config) 501 if err != nil { 502 t.Fatal(err) 503 } 504 505 for i := 0; i < 10; i++ { 506 producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} 507 } 508 509 prodNotLeader := new(ProduceResponse) 510 prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition) 511 leader.Returns(prodNotLeader) 512 513 for i := 0; i < 10; i++ { 514 select { 515 case msg := <-producer.Errors(): 516 if msg.Err != ErrNotLeaderForPartition { 517 t.Error(msg.Err) 518 } 519 case <-producer.Successes(): 520 t.Error("Unexpected success") 521 } 522 } 523 524 seedBroker.Returns(metadataResponse) 525 526 for i := 0; i < 10; i++ { 527 producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} 528 } 529 530 prodSuccess := new(ProduceResponse) 531 prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) 532 leader.Returns(prodSuccess) 533 534 expectResults(t, producer, 10, 0) 535 536 leader.Close() 537 seedBroker.Close() 538 safeClose(t, producer) 539} 540 541func TestAsyncProducerRetryWithReferenceOpen(t *testing.T) { 542 seedBroker := NewMockBroker(t, 1) 543 leader := NewMockBroker(t, 2) 544 leaderAddr := leader.Addr() 545 546 metadataResponse := new(MetadataResponse) 547 metadataResponse.AddBroker(leaderAddr, leader.BrokerID()) 548 metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError) 549 metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, ErrNoError) 550 seedBroker.Returns(metadataResponse) 551 552 config := NewConfig() 553 config.Producer.Return.Successes = true 554 config.Producer.Retry.Backoff = 0 555 config.Producer.Retry.Max = 1 556 config.Producer.Partitioner = NewRoundRobinPartitioner 557 producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config) 558 if err != nil { 559 t.Fatal(err) 560 } 561 562 // prime partition 0 563 producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} 564 prodSuccess := new(ProduceResponse) 565 prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) 566 leader.Returns(prodSuccess) 567 expectResults(t, producer, 1, 0) 568 569 // prime partition 1 570 producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} 571 prodSuccess = new(ProduceResponse) 572 prodSuccess.AddTopicPartition("my_topic", 1, ErrNoError) 573 leader.Returns(prodSuccess) 574 expectResults(t, producer, 1, 0) 575 576 // reboot the broker (the producer will get EOF on its existing connection) 577 leader.Close() 578 leader = NewMockBrokerAddr(t, 2, leaderAddr) 579 580 // send another message on partition 0 to trigger the EOF and retry 581 producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} 582 583 // tell partition 0 to go to that broker again 584 seedBroker.Returns(metadataResponse) 585 586 // succeed this time 587 prodSuccess = new(ProduceResponse) 588 prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) 589 leader.Returns(prodSuccess) 590 expectResults(t, producer, 1, 0) 591 592 // shutdown 593 closeProducer(t, producer) 594 seedBroker.Close() 595 leader.Close() 596} 597 598func TestAsyncProducerFlusherRetryCondition(t *testing.T) { 599 seedBroker := NewMockBroker(t, 1) 600 leader := NewMockBroker(t, 2) 601 602 metadataResponse := new(MetadataResponse) 603 metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) 604 metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError) 605 metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, ErrNoError) 606 seedBroker.Returns(metadataResponse) 607 608 config := NewConfig() 609 config.Producer.Flush.Messages = 5 610 config.Producer.Return.Successes = true 611 config.Producer.Retry.Backoff = 0 612 config.Producer.Retry.Max = 1 613 config.Producer.Partitioner = NewManualPartitioner 614 producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config) 615 if err != nil { 616 t.Fatal(err) 617 } 618 619 // prime partitions 620 for p := int32(0); p < 2; p++ { 621 for i := 0; i < 5; i++ { 622 producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: p} 623 } 624 prodSuccess := new(ProduceResponse) 625 prodSuccess.AddTopicPartition("my_topic", p, ErrNoError) 626 leader.Returns(prodSuccess) 627 expectResults(t, producer, 5, 0) 628 } 629 630 // send more messages on partition 0 631 for i := 0; i < 5; i++ { 632 producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0} 633 } 634 prodNotLeader := new(ProduceResponse) 635 prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition) 636 leader.Returns(prodNotLeader) 637 638 time.Sleep(50 * time.Millisecond) 639 640 leader.SetHandlerByMap(map[string]MockResponse{ 641 "ProduceRequest": NewMockProduceResponse(t). 642 SetError("my_topic", 0, ErrNoError), 643 }) 644 645 // tell partition 0 to go to that broker again 646 seedBroker.Returns(metadataResponse) 647 648 // succeed this time 649 expectResults(t, producer, 5, 0) 650 651 // put five more through 652 for i := 0; i < 5; i++ { 653 producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0} 654 } 655 expectResults(t, producer, 5, 0) 656 657 // shutdown 658 closeProducer(t, producer) 659 seedBroker.Close() 660 leader.Close() 661} 662 663func TestAsyncProducerRetryShutdown(t *testing.T) { 664 seedBroker := NewMockBroker(t, 1) 665 leader := NewMockBroker(t, 2) 666 667 metadataLeader := new(MetadataResponse) 668 metadataLeader.AddBroker(leader.Addr(), leader.BrokerID()) 669 metadataLeader.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError) 670 seedBroker.Returns(metadataLeader) 671 672 config := NewConfig() 673 config.Producer.Flush.Messages = 10 674 config.Producer.Return.Successes = true 675 config.Producer.Retry.Backoff = 0 676 producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config) 677 if err != nil { 678 t.Fatal(err) 679 } 680 681 for i := 0; i < 10; i++ { 682 producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} 683 } 684 producer.AsyncClose() 685 time.Sleep(5 * time.Millisecond) // let the shutdown goroutine kick in 686 687 producer.Input() <- &ProducerMessage{Topic: "FOO"} 688 if err := <-producer.Errors(); err.Err != ErrShuttingDown { 689 t.Error(err) 690 } 691 692 prodNotLeader := new(ProduceResponse) 693 prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition) 694 leader.Returns(prodNotLeader) 695 696 seedBroker.Returns(metadataLeader) 697 698 prodSuccess := new(ProduceResponse) 699 prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) 700 leader.Returns(prodSuccess) 701 expectResults(t, producer, 10, 0) 702 703 seedBroker.Close() 704 leader.Close() 705 706 // wait for the async-closed producer to shut down fully 707 for err := range producer.Errors() { 708 t.Error(err) 709 } 710} 711 712func TestAsyncProducerNoReturns(t *testing.T) { 713 seedBroker := NewMockBroker(t, 1) 714 leader := NewMockBroker(t, 2) 715 716 metadataLeader := new(MetadataResponse) 717 metadataLeader.AddBroker(leader.Addr(), leader.BrokerID()) 718 metadataLeader.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError) 719 seedBroker.Returns(metadataLeader) 720 721 config := NewConfig() 722 config.Producer.Flush.Messages = 10 723 config.Producer.Return.Successes = false 724 config.Producer.Return.Errors = false 725 config.Producer.Retry.Backoff = 0 726 producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config) 727 if err != nil { 728 t.Fatal(err) 729 } 730 731 for i := 0; i < 10; i++ { 732 producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} 733 } 734 735 wait := make(chan bool) 736 go func() { 737 if err := producer.Close(); err != nil { 738 t.Error(err) 739 } 740 close(wait) 741 }() 742 743 prodSuccess := new(ProduceResponse) 744 prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) 745 leader.Returns(prodSuccess) 746 747 <-wait 748 seedBroker.Close() 749 leader.Close() 750} 751 752// This example shows how to use the producer while simultaneously 753// reading the Errors channel to know about any failures. 754func ExampleAsyncProducer_select() { 755 producer, err := NewAsyncProducer([]string{"localhost:9092"}, nil) 756 if err != nil { 757 panic(err) 758 } 759 760 defer func() { 761 if err := producer.Close(); err != nil { 762 log.Fatalln(err) 763 } 764 }() 765 766 // Trap SIGINT to trigger a shutdown. 767 signals := make(chan os.Signal, 1) 768 signal.Notify(signals, os.Interrupt) 769 770 var enqueued, errors int 771ProducerLoop: 772 for { 773 select { 774 case producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder("testing 123")}: 775 enqueued++ 776 case err := <-producer.Errors(): 777 log.Println("Failed to produce message", err) 778 errors++ 779 case <-signals: 780 break ProducerLoop 781 } 782 } 783 784 log.Printf("Enqueued: %d; errors: %d\n", enqueued, errors) 785} 786 787// This example shows how to use the producer with separate goroutines 788// reading from the Successes and Errors channels. Note that in order 789// for the Successes channel to be populated, you have to set 790// config.Producer.Return.Successes to true. 791func ExampleAsyncProducer_goroutines() { 792 config := NewConfig() 793 config.Producer.Return.Successes = true 794 producer, err := NewAsyncProducer([]string{"localhost:9092"}, config) 795 if err != nil { 796 panic(err) 797 } 798 799 // Trap SIGINT to trigger a graceful shutdown. 800 signals := make(chan os.Signal, 1) 801 signal.Notify(signals, os.Interrupt) 802 803 var ( 804 wg sync.WaitGroup 805 enqueued, successes, errors int 806 ) 807 808 wg.Add(1) 809 go func() { 810 defer wg.Done() 811 for range producer.Successes() { 812 successes++ 813 } 814 }() 815 816 wg.Add(1) 817 go func() { 818 defer wg.Done() 819 for err := range producer.Errors() { 820 log.Println(err) 821 errors++ 822 } 823 }() 824 825ProducerLoop: 826 for { 827 message := &ProducerMessage{Topic: "my_topic", Value: StringEncoder("testing 123")} 828 select { 829 case producer.Input() <- message: 830 enqueued++ 831 832 case <-signals: 833 producer.AsyncClose() // Trigger a shutdown of the producer. 834 break ProducerLoop 835 } 836 } 837 838 wg.Wait() 839 840 log.Printf("Successfully produced: %d; errors: %d\n", successes, errors) 841} 842