1package sarama 2 3import ( 4 "log" 5 "os" 6 "os/signal" 7 "sync" 8 "testing" 9 "time" 10) 11 12var testMsg = StringEncoder("Foo") 13 14// If a particular offset is provided then messages are consumed starting from 15// that offset. 16func TestConsumerOffsetManual(t *testing.T) { 17 // Given 18 broker0 := NewMockBroker(t, 0) 19 20 mockFetchResponse := NewMockFetchResponse(t, 1) 21 for i := 0; i < 10; i++ { 22 mockFetchResponse.SetMessage("my_topic", 0, int64(i+1234), testMsg) 23 } 24 25 broker0.SetHandlerByMap(map[string]MockResponse{ 26 "MetadataRequest": NewMockMetadataResponse(t). 27 SetBroker(broker0.Addr(), broker0.BrokerID()). 28 SetLeader("my_topic", 0, broker0.BrokerID()), 29 "OffsetRequest": NewMockOffsetResponse(t). 30 SetOffset("my_topic", 0, OffsetOldest, 0). 31 SetOffset("my_topic", 0, OffsetNewest, 2345), 32 "FetchRequest": mockFetchResponse, 33 }) 34 35 // When 36 master, err := NewConsumer([]string{broker0.Addr()}, nil) 37 if err != nil { 38 t.Fatal(err) 39 } 40 41 consumer, err := master.ConsumePartition("my_topic", 0, 1234) 42 if err != nil { 43 t.Fatal(err) 44 } 45 46 // Then: messages starting from offset 1234 are consumed. 47 for i := 0; i < 10; i++ { 48 select { 49 case message := <-consumer.Messages(): 50 assertMessageOffset(t, message, int64(i+1234)) 51 case err := <-consumer.Errors(): 52 t.Error(err) 53 } 54 } 55 56 safeClose(t, consumer) 57 safeClose(t, master) 58 broker0.Close() 59} 60 61// If `OffsetNewest` is passed as the initial offset then the first consumed 62// message is indeed corresponds to the offset that broker claims to be the 63// newest in its metadata response. 64func TestConsumerOffsetNewest(t *testing.T) { 65 // Given 66 broker0 := NewMockBroker(t, 0) 67 broker0.SetHandlerByMap(map[string]MockResponse{ 68 "MetadataRequest": NewMockMetadataResponse(t). 69 SetBroker(broker0.Addr(), broker0.BrokerID()). 70 SetLeader("my_topic", 0, broker0.BrokerID()), 71 "OffsetRequest": NewMockOffsetResponse(t). 72 SetOffset("my_topic", 0, OffsetNewest, 10). 73 SetOffset("my_topic", 0, OffsetOldest, 7), 74 "FetchRequest": NewMockFetchResponse(t, 1). 75 SetMessage("my_topic", 0, 9, testMsg). 76 SetMessage("my_topic", 0, 10, testMsg). 77 SetMessage("my_topic", 0, 11, testMsg). 78 SetHighWaterMark("my_topic", 0, 14), 79 }) 80 81 master, err := NewConsumer([]string{broker0.Addr()}, nil) 82 if err != nil { 83 t.Fatal(err) 84 } 85 86 // When 87 consumer, err := master.ConsumePartition("my_topic", 0, OffsetNewest) 88 if err != nil { 89 t.Fatal(err) 90 } 91 92 // Then 93 assertMessageOffset(t, <-consumer.Messages(), 10) 94 if hwmo := consumer.HighWaterMarkOffset(); hwmo != 14 { 95 t.Errorf("Expected high water mark offset 14, found %d", hwmo) 96 } 97 98 safeClose(t, consumer) 99 safeClose(t, master) 100 broker0.Close() 101} 102 103// It is possible to close a partition consumer and create the same anew. 104func TestConsumerRecreate(t *testing.T) { 105 // Given 106 broker0 := NewMockBroker(t, 0) 107 broker0.SetHandlerByMap(map[string]MockResponse{ 108 "MetadataRequest": NewMockMetadataResponse(t). 109 SetBroker(broker0.Addr(), broker0.BrokerID()). 110 SetLeader("my_topic", 0, broker0.BrokerID()), 111 "OffsetRequest": NewMockOffsetResponse(t). 112 SetOffset("my_topic", 0, OffsetOldest, 0). 113 SetOffset("my_topic", 0, OffsetNewest, 1000), 114 "FetchRequest": NewMockFetchResponse(t, 1). 115 SetMessage("my_topic", 0, 10, testMsg), 116 }) 117 118 c, err := NewConsumer([]string{broker0.Addr()}, nil) 119 if err != nil { 120 t.Fatal(err) 121 } 122 123 pc, err := c.ConsumePartition("my_topic", 0, 10) 124 if err != nil { 125 t.Fatal(err) 126 } 127 assertMessageOffset(t, <-pc.Messages(), 10) 128 129 // When 130 safeClose(t, pc) 131 pc, err = c.ConsumePartition("my_topic", 0, 10) 132 if err != nil { 133 t.Fatal(err) 134 } 135 136 // Then 137 assertMessageOffset(t, <-pc.Messages(), 10) 138 139 safeClose(t, pc) 140 safeClose(t, c) 141 broker0.Close() 142} 143 144// An attempt to consume the same partition twice should fail. 145func TestConsumerDuplicate(t *testing.T) { 146 // Given 147 broker0 := NewMockBroker(t, 0) 148 broker0.SetHandlerByMap(map[string]MockResponse{ 149 "MetadataRequest": NewMockMetadataResponse(t). 150 SetBroker(broker0.Addr(), broker0.BrokerID()). 151 SetLeader("my_topic", 0, broker0.BrokerID()), 152 "OffsetRequest": NewMockOffsetResponse(t). 153 SetOffset("my_topic", 0, OffsetOldest, 0). 154 SetOffset("my_topic", 0, OffsetNewest, 1000), 155 "FetchRequest": NewMockFetchResponse(t, 1), 156 }) 157 158 config := NewConfig() 159 config.ChannelBufferSize = 0 160 c, err := NewConsumer([]string{broker0.Addr()}, config) 161 if err != nil { 162 t.Fatal(err) 163 } 164 165 pc1, err := c.ConsumePartition("my_topic", 0, 0) 166 if err != nil { 167 t.Fatal(err) 168 } 169 170 // When 171 pc2, err := c.ConsumePartition("my_topic", 0, 0) 172 173 // Then 174 if pc2 != nil || err != ConfigurationError("That topic/partition is already being consumed") { 175 t.Fatal("A partition cannot be consumed twice at the same time") 176 } 177 178 safeClose(t, pc1) 179 safeClose(t, c) 180 broker0.Close() 181} 182 183// If consumer fails to refresh metadata it keeps retrying with frequency 184// specified by `Config.Consumer.Retry.Backoff`. 185func TestConsumerLeaderRefreshError(t *testing.T) { 186 // Given 187 broker0 := NewMockBroker(t, 100) 188 189 // Stage 1: my_topic/0 served by broker0 190 Logger.Printf(" STAGE 1") 191 192 broker0.SetHandlerByMap(map[string]MockResponse{ 193 "MetadataRequest": NewMockMetadataResponse(t). 194 SetBroker(broker0.Addr(), broker0.BrokerID()). 195 SetLeader("my_topic", 0, broker0.BrokerID()), 196 "OffsetRequest": NewMockOffsetResponse(t). 197 SetOffset("my_topic", 0, OffsetOldest, 123). 198 SetOffset("my_topic", 0, OffsetNewest, 1000), 199 "FetchRequest": NewMockFetchResponse(t, 1). 200 SetMessage("my_topic", 0, 123, testMsg), 201 }) 202 203 config := NewConfig() 204 config.Net.ReadTimeout = 100 * time.Millisecond 205 config.Consumer.Retry.Backoff = 200 * time.Millisecond 206 config.Consumer.Return.Errors = true 207 config.Metadata.Retry.Max = 0 208 c, err := NewConsumer([]string{broker0.Addr()}, config) 209 if err != nil { 210 t.Fatal(err) 211 } 212 213 pc, err := c.ConsumePartition("my_topic", 0, OffsetOldest) 214 if err != nil { 215 t.Fatal(err) 216 } 217 218 assertMessageOffset(t, <-pc.Messages(), 123) 219 220 // Stage 2: broker0 says that it is no longer the leader for my_topic/0, 221 // but the requests to retrieve metadata fail with network timeout. 222 Logger.Printf(" STAGE 2") 223 224 fetchResponse2 := &FetchResponse{} 225 fetchResponse2.AddError("my_topic", 0, ErrNotLeaderForPartition) 226 227 broker0.SetHandlerByMap(map[string]MockResponse{ 228 "FetchRequest": NewMockWrapper(fetchResponse2), 229 }) 230 231 if consErr := <-pc.Errors(); consErr.Err != ErrOutOfBrokers { 232 t.Errorf("Unexpected error: %v", consErr.Err) 233 } 234 235 // Stage 3: finally the metadata returned by broker0 tells that broker1 is 236 // a new leader for my_topic/0. Consumption resumes. 237 238 Logger.Printf(" STAGE 3") 239 240 broker1 := NewMockBroker(t, 101) 241 242 broker1.SetHandlerByMap(map[string]MockResponse{ 243 "FetchRequest": NewMockFetchResponse(t, 1). 244 SetMessage("my_topic", 0, 124, testMsg), 245 }) 246 broker0.SetHandlerByMap(map[string]MockResponse{ 247 "MetadataRequest": NewMockMetadataResponse(t). 248 SetBroker(broker0.Addr(), broker0.BrokerID()). 249 SetBroker(broker1.Addr(), broker1.BrokerID()). 250 SetLeader("my_topic", 0, broker1.BrokerID()), 251 }) 252 253 assertMessageOffset(t, <-pc.Messages(), 124) 254 255 safeClose(t, pc) 256 safeClose(t, c) 257 broker1.Close() 258 broker0.Close() 259} 260 261func TestConsumerInvalidTopic(t *testing.T) { 262 // Given 263 broker0 := NewMockBroker(t, 100) 264 broker0.SetHandlerByMap(map[string]MockResponse{ 265 "MetadataRequest": NewMockMetadataResponse(t). 266 SetBroker(broker0.Addr(), broker0.BrokerID()), 267 }) 268 269 c, err := NewConsumer([]string{broker0.Addr()}, nil) 270 if err != nil { 271 t.Fatal(err) 272 } 273 274 // When 275 pc, err := c.ConsumePartition("my_topic", 0, OffsetOldest) 276 277 // Then 278 if pc != nil || err != ErrUnknownTopicOrPartition { 279 t.Errorf("Should fail with, err=%v", err) 280 } 281 282 safeClose(t, c) 283 broker0.Close() 284} 285 286// Nothing bad happens if a partition consumer that has no leader assigned at 287// the moment is closed. 288func TestConsumerClosePartitionWithoutLeader(t *testing.T) { 289 // Given 290 broker0 := NewMockBroker(t, 100) 291 broker0.SetHandlerByMap(map[string]MockResponse{ 292 "MetadataRequest": NewMockMetadataResponse(t). 293 SetBroker(broker0.Addr(), broker0.BrokerID()). 294 SetLeader("my_topic", 0, broker0.BrokerID()), 295 "OffsetRequest": NewMockOffsetResponse(t). 296 SetOffset("my_topic", 0, OffsetOldest, 123). 297 SetOffset("my_topic", 0, OffsetNewest, 1000), 298 "FetchRequest": NewMockFetchResponse(t, 1). 299 SetMessage("my_topic", 0, 123, testMsg), 300 }) 301 302 config := NewConfig() 303 config.Net.ReadTimeout = 100 * time.Millisecond 304 config.Consumer.Retry.Backoff = 100 * time.Millisecond 305 config.Consumer.Return.Errors = true 306 config.Metadata.Retry.Max = 0 307 c, err := NewConsumer([]string{broker0.Addr()}, config) 308 if err != nil { 309 t.Fatal(err) 310 } 311 312 pc, err := c.ConsumePartition("my_topic", 0, OffsetOldest) 313 if err != nil { 314 t.Fatal(err) 315 } 316 317 assertMessageOffset(t, <-pc.Messages(), 123) 318 319 // broker0 says that it is no longer the leader for my_topic/0, but the 320 // requests to retrieve metadata fail with network timeout. 321 fetchResponse2 := &FetchResponse{} 322 fetchResponse2.AddError("my_topic", 0, ErrNotLeaderForPartition) 323 324 broker0.SetHandlerByMap(map[string]MockResponse{ 325 "FetchRequest": NewMockWrapper(fetchResponse2), 326 }) 327 328 // When 329 if consErr := <-pc.Errors(); consErr.Err != ErrOutOfBrokers { 330 t.Errorf("Unexpected error: %v", consErr.Err) 331 } 332 333 // Then: the partition consumer can be closed without any problem. 334 safeClose(t, pc) 335 safeClose(t, c) 336 broker0.Close() 337} 338 339// If the initial offset passed on partition consumer creation is out of the 340// actual offset range for the partition, then the partition consumer stops 341// immediately closing its output channels. 342func TestConsumerShutsDownOutOfRange(t *testing.T) { 343 // Given 344 broker0 := NewMockBroker(t, 0) 345 fetchResponse := new(FetchResponse) 346 fetchResponse.AddError("my_topic", 0, ErrOffsetOutOfRange) 347 broker0.SetHandlerByMap(map[string]MockResponse{ 348 "MetadataRequest": NewMockMetadataResponse(t). 349 SetBroker(broker0.Addr(), broker0.BrokerID()). 350 SetLeader("my_topic", 0, broker0.BrokerID()), 351 "OffsetRequest": NewMockOffsetResponse(t). 352 SetOffset("my_topic", 0, OffsetNewest, 1234). 353 SetOffset("my_topic", 0, OffsetOldest, 7), 354 "FetchRequest": NewMockWrapper(fetchResponse), 355 }) 356 357 master, err := NewConsumer([]string{broker0.Addr()}, nil) 358 if err != nil { 359 t.Fatal(err) 360 } 361 362 // When 363 consumer, err := master.ConsumePartition("my_topic", 0, 101) 364 if err != nil { 365 t.Fatal(err) 366 } 367 368 // Then: consumer should shut down closing its messages and errors channels. 369 if _, ok := <-consumer.Messages(); ok { 370 t.Error("Expected the consumer to shut down") 371 } 372 safeClose(t, consumer) 373 374 safeClose(t, master) 375 broker0.Close() 376} 377 378// If a fetch response contains messages with offsets that are smaller then 379// requested, then such messages are ignored. 380func TestConsumerExtraOffsets(t *testing.T) { 381 // Given 382 legacyFetchResponse := &FetchResponse{} 383 legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 1) 384 legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 2) 385 legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 3) 386 legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 4) 387 newFetchResponse := &FetchResponse{Version: 4} 388 newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 1) 389 newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 2) 390 newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 3) 391 newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 4) 392 newFetchResponse.SetLastOffsetDelta("my_topic", 0, 4) 393 newFetchResponse.SetLastStableOffset("my_topic", 0, 4) 394 for _, fetchResponse1 := range []*FetchResponse{legacyFetchResponse, newFetchResponse} { 395 var offsetResponseVersion int16 396 cfg := NewConfig() 397 cfg.Consumer.Return.Errors = true 398 if fetchResponse1.Version >= 4 { 399 cfg.Version = V0_11_0_0 400 offsetResponseVersion = 1 401 } 402 403 broker0 := NewMockBroker(t, 0) 404 fetchResponse2 := &FetchResponse{} 405 fetchResponse2.Version = fetchResponse1.Version 406 fetchResponse2.AddError("my_topic", 0, ErrNoError) 407 broker0.SetHandlerByMap(map[string]MockResponse{ 408 "MetadataRequest": NewMockMetadataResponse(t). 409 SetBroker(broker0.Addr(), broker0.BrokerID()). 410 SetLeader("my_topic", 0, broker0.BrokerID()), 411 "OffsetRequest": NewMockOffsetResponse(t). 412 SetVersion(offsetResponseVersion). 413 SetOffset("my_topic", 0, OffsetNewest, 1234). 414 SetOffset("my_topic", 0, OffsetOldest, 0), 415 "FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2), 416 }) 417 418 master, err := NewConsumer([]string{broker0.Addr()}, cfg) 419 if err != nil { 420 t.Fatal(err) 421 } 422 423 // When 424 consumer, err := master.ConsumePartition("my_topic", 0, 3) 425 if err != nil { 426 t.Fatal(err) 427 } 428 429 // Then: messages with offsets 1 and 2 are not returned even though they 430 // are present in the response. 431 select { 432 case msg := <-consumer.Messages(): 433 assertMessageOffset(t, msg, 3) 434 case err := <-consumer.Errors(): 435 t.Fatal(err) 436 } 437 438 select { 439 case msg := <-consumer.Messages(): 440 assertMessageOffset(t, msg, 4) 441 case err := <-consumer.Errors(): 442 t.Fatal(err) 443 } 444 445 safeClose(t, consumer) 446 safeClose(t, master) 447 broker0.Close() 448 } 449} 450 451// In some situations broker may return a block containing only 452// messages older then requested, even though there would be 453// more messages if higher offset was requested. 454func TestConsumerReceivingFetchResponseWithTooOldRecords(t *testing.T) { 455 // Given 456 fetchResponse1 := &FetchResponse{Version: 4} 457 fetchResponse1.AddRecord("my_topic", 0, nil, testMsg, 1) 458 459 fetchResponse2 := &FetchResponse{Version: 4} 460 fetchResponse2.AddRecord("my_topic", 0, nil, testMsg, 1000000) 461 462 cfg := NewConfig() 463 cfg.Consumer.Return.Errors = true 464 cfg.Version = V1_1_0_0 465 466 broker0 := NewMockBroker(t, 0) 467 468 broker0.SetHandlerByMap(map[string]MockResponse{ 469 "MetadataRequest": NewMockMetadataResponse(t). 470 SetBroker(broker0.Addr(), broker0.BrokerID()). 471 SetLeader("my_topic", 0, broker0.BrokerID()), 472 "OffsetRequest": NewMockOffsetResponse(t). 473 SetVersion(1). 474 SetOffset("my_topic", 0, OffsetNewest, 1234). 475 SetOffset("my_topic", 0, OffsetOldest, 0), 476 "FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2), 477 }) 478 479 master, err := NewConsumer([]string{broker0.Addr()}, cfg) 480 if err != nil { 481 t.Fatal(err) 482 } 483 484 // When 485 consumer, err := master.ConsumePartition("my_topic", 0, 2) 486 if err != nil { 487 t.Fatal(err) 488 } 489 490 select { 491 case msg := <-consumer.Messages(): 492 assertMessageOffset(t, msg, 1000000) 493 case err := <-consumer.Errors(): 494 t.Fatal(err) 495 } 496 497 safeClose(t, consumer) 498 safeClose(t, master) 499 broker0.Close() 500} 501 502func TestConsumeMessageWithNewerFetchAPIVersion(t *testing.T) { 503 // Given 504 fetchResponse1 := &FetchResponse{Version: 4} 505 fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 1) 506 fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 2) 507 508 cfg := NewConfig() 509 cfg.Version = V0_11_0_0 510 511 broker0 := NewMockBroker(t, 0) 512 fetchResponse2 := &FetchResponse{} 513 fetchResponse2.Version = 4 514 fetchResponse2.AddError("my_topic", 0, ErrNoError) 515 broker0.SetHandlerByMap(map[string]MockResponse{ 516 "MetadataRequest": NewMockMetadataResponse(t). 517 SetBroker(broker0.Addr(), broker0.BrokerID()). 518 SetLeader("my_topic", 0, broker0.BrokerID()), 519 "OffsetRequest": NewMockOffsetResponse(t). 520 SetVersion(1). 521 SetOffset("my_topic", 0, OffsetNewest, 1234). 522 SetOffset("my_topic", 0, OffsetOldest, 0), 523 "FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2), 524 }) 525 526 master, err := NewConsumer([]string{broker0.Addr()}, cfg) 527 if err != nil { 528 t.Fatal(err) 529 } 530 531 // When 532 consumer, err := master.ConsumePartition("my_topic", 0, 1) 533 if err != nil { 534 t.Fatal(err) 535 } 536 537 assertMessageOffset(t, <-consumer.Messages(), 1) 538 assertMessageOffset(t, <-consumer.Messages(), 2) 539 540 safeClose(t, consumer) 541 safeClose(t, master) 542 broker0.Close() 543} 544 545// It is fine if offsets of fetched messages are not sequential (although 546// strictly increasing!). 547func TestConsumerNonSequentialOffsets(t *testing.T) { 548 // Given 549 legacyFetchResponse := &FetchResponse{} 550 legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 5) 551 legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 7) 552 legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 11) 553 newFetchResponse := &FetchResponse{Version: 4} 554 newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 5) 555 newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 7) 556 newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 11) 557 newFetchResponse.SetLastOffsetDelta("my_topic", 0, 11) 558 newFetchResponse.SetLastStableOffset("my_topic", 0, 11) 559 for _, fetchResponse1 := range []*FetchResponse{legacyFetchResponse, newFetchResponse} { 560 var offsetResponseVersion int16 561 cfg := NewConfig() 562 if fetchResponse1.Version >= 4 { 563 cfg.Version = V0_11_0_0 564 offsetResponseVersion = 1 565 } 566 567 broker0 := NewMockBroker(t, 0) 568 fetchResponse2 := &FetchResponse{Version: fetchResponse1.Version} 569 fetchResponse2.AddError("my_topic", 0, ErrNoError) 570 broker0.SetHandlerByMap(map[string]MockResponse{ 571 "MetadataRequest": NewMockMetadataResponse(t). 572 SetBroker(broker0.Addr(), broker0.BrokerID()). 573 SetLeader("my_topic", 0, broker0.BrokerID()), 574 "OffsetRequest": NewMockOffsetResponse(t). 575 SetVersion(offsetResponseVersion). 576 SetOffset("my_topic", 0, OffsetNewest, 1234). 577 SetOffset("my_topic", 0, OffsetOldest, 0), 578 "FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2), 579 }) 580 581 master, err := NewConsumer([]string{broker0.Addr()}, cfg) 582 if err != nil { 583 t.Fatal(err) 584 } 585 586 // When 587 consumer, err := master.ConsumePartition("my_topic", 0, 3) 588 if err != nil { 589 t.Fatal(err) 590 } 591 592 // Then: messages with offsets 1 and 2 are not returned even though they 593 // are present in the response. 594 assertMessageOffset(t, <-consumer.Messages(), 5) 595 assertMessageOffset(t, <-consumer.Messages(), 7) 596 assertMessageOffset(t, <-consumer.Messages(), 11) 597 598 safeClose(t, consumer) 599 safeClose(t, master) 600 broker0.Close() 601 } 602} 603 604// If leadership for a partition is changing then consumer resolves the new 605// leader and switches to it. 606func TestConsumerRebalancingMultiplePartitions(t *testing.T) { 607 // initial setup 608 seedBroker := NewMockBroker(t, 10) 609 leader0 := NewMockBroker(t, 0) 610 leader1 := NewMockBroker(t, 1) 611 612 seedBroker.SetHandlerByMap(map[string]MockResponse{ 613 "MetadataRequest": NewMockMetadataResponse(t). 614 SetBroker(leader0.Addr(), leader0.BrokerID()). 615 SetBroker(leader1.Addr(), leader1.BrokerID()). 616 SetLeader("my_topic", 0, leader0.BrokerID()). 617 SetLeader("my_topic", 1, leader1.BrokerID()), 618 }) 619 620 mockOffsetResponse1 := NewMockOffsetResponse(t). 621 SetOffset("my_topic", 0, OffsetOldest, 0). 622 SetOffset("my_topic", 0, OffsetNewest, 1000). 623 SetOffset("my_topic", 1, OffsetOldest, 0). 624 SetOffset("my_topic", 1, OffsetNewest, 1000) 625 leader0.SetHandlerByMap(map[string]MockResponse{ 626 "OffsetRequest": mockOffsetResponse1, 627 "FetchRequest": NewMockFetchResponse(t, 1), 628 }) 629 leader1.SetHandlerByMap(map[string]MockResponse{ 630 "OffsetRequest": mockOffsetResponse1, 631 "FetchRequest": NewMockFetchResponse(t, 1), 632 }) 633 634 // launch test goroutines 635 config := NewConfig() 636 config.Consumer.Retry.Backoff = 50 637 master, err := NewConsumer([]string{seedBroker.Addr()}, config) 638 if err != nil { 639 t.Fatal(err) 640 } 641 642 // we expect to end up (eventually) consuming exactly ten messages on each partition 643 var wg sync.WaitGroup 644 for i := int32(0); i < 2; i++ { 645 consumer, err := master.ConsumePartition("my_topic", i, 0) 646 if err != nil { 647 t.Error(err) 648 } 649 650 go func(c PartitionConsumer) { 651 for err := range c.Errors() { 652 t.Error(err) 653 } 654 }(consumer) 655 656 wg.Add(1) 657 go func(partition int32, c PartitionConsumer) { 658 for i := 0; i < 10; i++ { 659 message := <-consumer.Messages() 660 if message.Offset != int64(i) { 661 t.Error("Incorrect message offset!", i, partition, message.Offset) 662 } 663 if message.Partition != partition { 664 t.Error("Incorrect message partition!") 665 } 666 } 667 safeClose(t, consumer) 668 wg.Done() 669 }(i, consumer) 670 } 671 672 time.Sleep(50 * time.Millisecond) 673 Logger.Printf(" STAGE 1") 674 // Stage 1: 675 // * my_topic/0 -> leader0 serves 4 messages 676 // * my_topic/1 -> leader1 serves 0 messages 677 678 mockFetchResponse := NewMockFetchResponse(t, 1) 679 for i := 0; i < 4; i++ { 680 mockFetchResponse.SetMessage("my_topic", 0, int64(i), testMsg) 681 } 682 leader0.SetHandlerByMap(map[string]MockResponse{ 683 "FetchRequest": mockFetchResponse, 684 }) 685 686 time.Sleep(50 * time.Millisecond) 687 Logger.Printf(" STAGE 2") 688 // Stage 2: 689 // * leader0 says that it is no longer serving my_topic/0 690 // * seedBroker tells that leader1 is serving my_topic/0 now 691 692 // seed broker tells that the new partition 0 leader is leader1 693 seedBroker.SetHandlerByMap(map[string]MockResponse{ 694 "MetadataRequest": NewMockMetadataResponse(t). 695 SetLeader("my_topic", 0, leader1.BrokerID()). 696 SetLeader("my_topic", 1, leader1.BrokerID()), 697 }) 698 699 // leader0 says no longer leader of partition 0 700 fetchResponse := new(FetchResponse) 701 fetchResponse.AddError("my_topic", 0, ErrNotLeaderForPartition) 702 leader0.SetHandlerByMap(map[string]MockResponse{ 703 "FetchRequest": NewMockWrapper(fetchResponse), 704 }) 705 706 time.Sleep(50 * time.Millisecond) 707 Logger.Printf(" STAGE 3") 708 // Stage 3: 709 // * my_topic/0 -> leader1 serves 3 messages 710 // * my_topic/1 -> leader1 server 8 messages 711 712 // leader1 provides 3 message on partition 0, and 8 messages on partition 1 713 mockFetchResponse2 := NewMockFetchResponse(t, 2) 714 for i := 4; i < 7; i++ { 715 mockFetchResponse2.SetMessage("my_topic", 0, int64(i), testMsg) 716 } 717 for i := 0; i < 8; i++ { 718 mockFetchResponse2.SetMessage("my_topic", 1, int64(i), testMsg) 719 } 720 leader1.SetHandlerByMap(map[string]MockResponse{ 721 "FetchRequest": mockFetchResponse2, 722 }) 723 724 time.Sleep(50 * time.Millisecond) 725 Logger.Printf(" STAGE 4") 726 // Stage 4: 727 // * my_topic/0 -> leader1 serves 3 messages 728 // * my_topic/1 -> leader1 tells that it is no longer the leader 729 // * seedBroker tells that leader0 is a new leader for my_topic/1 730 731 // metadata assigns 0 to leader1 and 1 to leader0 732 seedBroker.SetHandlerByMap(map[string]MockResponse{ 733 "MetadataRequest": NewMockMetadataResponse(t). 734 SetLeader("my_topic", 0, leader1.BrokerID()). 735 SetLeader("my_topic", 1, leader0.BrokerID()), 736 }) 737 738 // leader1 provides three more messages on partition0, says no longer leader of partition1 739 mockFetchResponse3 := NewMockFetchResponse(t, 3). 740 SetMessage("my_topic", 0, int64(7), testMsg). 741 SetMessage("my_topic", 0, int64(8), testMsg). 742 SetMessage("my_topic", 0, int64(9), testMsg) 743 fetchResponse4 := new(FetchResponse) 744 fetchResponse4.AddError("my_topic", 1, ErrNotLeaderForPartition) 745 leader1.SetHandlerByMap(map[string]MockResponse{ 746 "FetchRequest": NewMockSequence(mockFetchResponse3, fetchResponse4), 747 }) 748 749 // leader0 provides two messages on partition 1 750 mockFetchResponse4 := NewMockFetchResponse(t, 2) 751 for i := 8; i < 10; i++ { 752 mockFetchResponse4.SetMessage("my_topic", 1, int64(i), testMsg) 753 } 754 leader0.SetHandlerByMap(map[string]MockResponse{ 755 "FetchRequest": mockFetchResponse4, 756 }) 757 758 wg.Wait() 759 safeClose(t, master) 760 leader1.Close() 761 leader0.Close() 762 seedBroker.Close() 763} 764 765// When two partitions have the same broker as the leader, if one partition 766// consumer channel buffer is full then that does not affect the ability to 767// read messages by the other consumer. 768func TestConsumerInterleavedClose(t *testing.T) { 769 // Given 770 broker0 := NewMockBroker(t, 0) 771 broker0.SetHandlerByMap(map[string]MockResponse{ 772 "MetadataRequest": NewMockMetadataResponse(t). 773 SetBroker(broker0.Addr(), broker0.BrokerID()). 774 SetLeader("my_topic", 0, broker0.BrokerID()). 775 SetLeader("my_topic", 1, broker0.BrokerID()), 776 "OffsetRequest": NewMockOffsetResponse(t). 777 SetOffset("my_topic", 0, OffsetOldest, 1000). 778 SetOffset("my_topic", 0, OffsetNewest, 1100). 779 SetOffset("my_topic", 1, OffsetOldest, 2000). 780 SetOffset("my_topic", 1, OffsetNewest, 2100), 781 "FetchRequest": NewMockFetchResponse(t, 1). 782 SetMessage("my_topic", 0, 1000, testMsg). 783 SetMessage("my_topic", 0, 1001, testMsg). 784 SetMessage("my_topic", 0, 1002, testMsg). 785 SetMessage("my_topic", 1, 2000, testMsg), 786 }) 787 788 config := NewConfig() 789 config.ChannelBufferSize = 0 790 master, err := NewConsumer([]string{broker0.Addr()}, config) 791 if err != nil { 792 t.Fatal(err) 793 } 794 795 c0, err := master.ConsumePartition("my_topic", 0, 1000) 796 if err != nil { 797 t.Fatal(err) 798 } 799 800 c1, err := master.ConsumePartition("my_topic", 1, 2000) 801 if err != nil { 802 t.Fatal(err) 803 } 804 805 // When/Then: we can read from partition 0 even if nobody reads from partition 1 806 assertMessageOffset(t, <-c0.Messages(), 1000) 807 assertMessageOffset(t, <-c0.Messages(), 1001) 808 assertMessageOffset(t, <-c0.Messages(), 1002) 809 810 safeClose(t, c1) 811 safeClose(t, c0) 812 safeClose(t, master) 813 broker0.Close() 814} 815 816func TestConsumerBounceWithReferenceOpen(t *testing.T) { 817 broker0 := NewMockBroker(t, 0) 818 broker0Addr := broker0.Addr() 819 broker1 := NewMockBroker(t, 1) 820 821 mockMetadataResponse := NewMockMetadataResponse(t). 822 SetBroker(broker0.Addr(), broker0.BrokerID()). 823 SetBroker(broker1.Addr(), broker1.BrokerID()). 824 SetLeader("my_topic", 0, broker0.BrokerID()). 825 SetLeader("my_topic", 1, broker1.BrokerID()) 826 827 mockOffsetResponse := NewMockOffsetResponse(t). 828 SetOffset("my_topic", 0, OffsetOldest, 1000). 829 SetOffset("my_topic", 0, OffsetNewest, 1100). 830 SetOffset("my_topic", 1, OffsetOldest, 2000). 831 SetOffset("my_topic", 1, OffsetNewest, 2100) 832 833 mockFetchResponse := NewMockFetchResponse(t, 1) 834 for i := 0; i < 10; i++ { 835 mockFetchResponse.SetMessage("my_topic", 0, int64(1000+i), testMsg) 836 mockFetchResponse.SetMessage("my_topic", 1, int64(2000+i), testMsg) 837 } 838 839 broker0.SetHandlerByMap(map[string]MockResponse{ 840 "OffsetRequest": mockOffsetResponse, 841 "FetchRequest": mockFetchResponse, 842 }) 843 broker1.SetHandlerByMap(map[string]MockResponse{ 844 "MetadataRequest": mockMetadataResponse, 845 "OffsetRequest": mockOffsetResponse, 846 "FetchRequest": mockFetchResponse, 847 }) 848 849 config := NewConfig() 850 config.Consumer.Return.Errors = true 851 config.Consumer.Retry.Backoff = 100 * time.Millisecond 852 config.ChannelBufferSize = 1 853 master, err := NewConsumer([]string{broker1.Addr()}, config) 854 if err != nil { 855 t.Fatal(err) 856 } 857 858 c0, err := master.ConsumePartition("my_topic", 0, 1000) 859 if err != nil { 860 t.Fatal(err) 861 } 862 863 c1, err := master.ConsumePartition("my_topic", 1, 2000) 864 if err != nil { 865 t.Fatal(err) 866 } 867 868 // read messages from both partition to make sure that both brokers operate 869 // normally. 870 assertMessageOffset(t, <-c0.Messages(), 1000) 871 assertMessageOffset(t, <-c1.Messages(), 2000) 872 873 // Simulate broker shutdown. Note that metadata response does not change, 874 // that is the leadership does not move to another broker. So partition 875 // consumer will keep retrying to restore the connection with the broker. 876 broker0.Close() 877 878 // Make sure that while the partition/0 leader is down, consumer/partition/1 879 // is capable of pulling messages from broker1. 880 for i := 1; i < 7; i++ { 881 offset := (<-c1.Messages()).Offset 882 if offset != int64(2000+i) { 883 t.Errorf("Expected offset %d from consumer/partition/1", int64(2000+i)) 884 } 885 } 886 887 // Bring broker0 back to service. 888 broker0 = NewMockBrokerAddr(t, 0, broker0Addr) 889 broker0.SetHandlerByMap(map[string]MockResponse{ 890 "FetchRequest": mockFetchResponse, 891 }) 892 893 // Read the rest of messages from both partitions. 894 for i := 7; i < 10; i++ { 895 assertMessageOffset(t, <-c1.Messages(), int64(2000+i)) 896 } 897 for i := 1; i < 10; i++ { 898 assertMessageOffset(t, <-c0.Messages(), int64(1000+i)) 899 } 900 901 select { 902 case <-c0.Errors(): 903 default: 904 t.Errorf("Partition consumer should have detected broker restart") 905 } 906 907 safeClose(t, c1) 908 safeClose(t, c0) 909 safeClose(t, master) 910 broker0.Close() 911 broker1.Close() 912} 913 914func TestConsumerOffsetOutOfRange(t *testing.T) { 915 // Given 916 broker0 := NewMockBroker(t, 2) 917 broker0.SetHandlerByMap(map[string]MockResponse{ 918 "MetadataRequest": NewMockMetadataResponse(t). 919 SetBroker(broker0.Addr(), broker0.BrokerID()). 920 SetLeader("my_topic", 0, broker0.BrokerID()), 921 "OffsetRequest": NewMockOffsetResponse(t). 922 SetOffset("my_topic", 0, OffsetNewest, 1234). 923 SetOffset("my_topic", 0, OffsetOldest, 2345), 924 }) 925 926 master, err := NewConsumer([]string{broker0.Addr()}, nil) 927 if err != nil { 928 t.Fatal(err) 929 } 930 931 // When/Then 932 if _, err := master.ConsumePartition("my_topic", 0, 0); err != ErrOffsetOutOfRange { 933 t.Fatal("Should return ErrOffsetOutOfRange, got:", err) 934 } 935 if _, err := master.ConsumePartition("my_topic", 0, 3456); err != ErrOffsetOutOfRange { 936 t.Fatal("Should return ErrOffsetOutOfRange, got:", err) 937 } 938 if _, err := master.ConsumePartition("my_topic", 0, -3); err != ErrOffsetOutOfRange { 939 t.Fatal("Should return ErrOffsetOutOfRange, got:", err) 940 } 941 942 safeClose(t, master) 943 broker0.Close() 944} 945 946func TestConsumerExpiryTicker(t *testing.T) { 947 // Given 948 broker0 := NewMockBroker(t, 0) 949 fetchResponse1 := &FetchResponse{} 950 for i := 1; i <= 8; i++ { 951 fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, int64(i)) 952 } 953 broker0.SetHandlerByMap(map[string]MockResponse{ 954 "MetadataRequest": NewMockMetadataResponse(t). 955 SetBroker(broker0.Addr(), broker0.BrokerID()). 956 SetLeader("my_topic", 0, broker0.BrokerID()), 957 "OffsetRequest": NewMockOffsetResponse(t). 958 SetOffset("my_topic", 0, OffsetNewest, 1234). 959 SetOffset("my_topic", 0, OffsetOldest, 1), 960 "FetchRequest": NewMockSequence(fetchResponse1), 961 }) 962 963 config := NewConfig() 964 config.ChannelBufferSize = 0 965 config.Consumer.MaxProcessingTime = 10 * time.Millisecond 966 master, err := NewConsumer([]string{broker0.Addr()}, config) 967 if err != nil { 968 t.Fatal(err) 969 } 970 971 // When 972 consumer, err := master.ConsumePartition("my_topic", 0, 1) 973 if err != nil { 974 t.Fatal(err) 975 } 976 977 // Then: messages with offsets 1 through 8 are read 978 for i := 1; i <= 8; i++ { 979 assertMessageOffset(t, <-consumer.Messages(), int64(i)) 980 time.Sleep(2 * time.Millisecond) 981 } 982 983 safeClose(t, consumer) 984 safeClose(t, master) 985 broker0.Close() 986} 987 988func assertMessageOffset(t *testing.T, msg *ConsumerMessage, expectedOffset int64) { 989 if msg.Offset != expectedOffset { 990 t.Errorf("Incorrect message offset: expected=%d, actual=%d", expectedOffset, msg.Offset) 991 } 992} 993 994// This example shows how to use the consumer to read messages 995// from a single partition. 996func ExampleConsumer() { 997 consumer, err := NewConsumer([]string{"localhost:9092"}, nil) 998 if err != nil { 999 panic(err) 1000 } 1001 1002 defer func() { 1003 if err := consumer.Close(); err != nil { 1004 log.Fatalln(err) 1005 } 1006 }() 1007 1008 partitionConsumer, err := consumer.ConsumePartition("my_topic", 0, OffsetNewest) 1009 if err != nil { 1010 panic(err) 1011 } 1012 1013 defer func() { 1014 if err := partitionConsumer.Close(); err != nil { 1015 log.Fatalln(err) 1016 } 1017 }() 1018 1019 // Trap SIGINT to trigger a shutdown. 1020 signals := make(chan os.Signal, 1) 1021 signal.Notify(signals, os.Interrupt) 1022 1023 consumed := 0 1024ConsumerLoop: 1025 for { 1026 select { 1027 case msg := <-partitionConsumer.Messages(): 1028 log.Printf("Consumed message offset %d\n", msg.Offset) 1029 consumed++ 1030 case <-signals: 1031 break ConsumerLoop 1032 } 1033 } 1034 1035 log.Printf("Consumed: %d\n", consumed) 1036} 1037