1package sarama 2 3import ( 4 "log" 5 "os" 6 "os/signal" 7 "reflect" 8 "sync" 9 "sync/atomic" 10 "testing" 11 "time" 12) 13 14var testMsg = StringEncoder("Foo") 15 16// If a particular offset is provided then messages are consumed starting from 17// that offset. 18func TestConsumerOffsetManual(t *testing.T) { 19 // Given 20 broker0 := NewMockBroker(t, 0) 21 22 mockFetchResponse := NewMockFetchResponse(t, 1) 23 for i := 0; i < 10; i++ { 24 mockFetchResponse.SetMessage("my_topic", 0, int64(i+1234), testMsg) 25 } 26 27 broker0.SetHandlerByMap(map[string]MockResponse{ 28 "MetadataRequest": NewMockMetadataResponse(t). 29 SetBroker(broker0.Addr(), broker0.BrokerID()). 30 SetLeader("my_topic", 0, broker0.BrokerID()), 31 "OffsetRequest": NewMockOffsetResponse(t). 32 SetOffset("my_topic", 0, OffsetOldest, 0). 33 SetOffset("my_topic", 0, OffsetNewest, 2345), 34 "FetchRequest": mockFetchResponse, 35 }) 36 37 // When 38 master, err := NewConsumer([]string{broker0.Addr()}, nil) 39 if err != nil { 40 t.Fatal(err) 41 } 42 43 consumer, err := master.ConsumePartition("my_topic", 0, 1234) 44 if err != nil { 45 t.Fatal(err) 46 } 47 48 // Then: messages starting from offset 1234 are consumed. 49 for i := 0; i < 10; i++ { 50 select { 51 case message := <-consumer.Messages(): 52 assertMessageOffset(t, message, int64(i+1234)) 53 case err := <-consumer.Errors(): 54 t.Error(err) 55 } 56 } 57 58 safeClose(t, consumer) 59 safeClose(t, master) 60 broker0.Close() 61} 62 63// If `OffsetNewest` is passed as the initial offset then the first consumed 64// message is indeed corresponds to the offset that broker claims to be the 65// newest in its metadata response. 66func TestConsumerOffsetNewest(t *testing.T) { 67 // Given 68 broker0 := NewMockBroker(t, 0) 69 broker0.SetHandlerByMap(map[string]MockResponse{ 70 "MetadataRequest": NewMockMetadataResponse(t). 71 SetBroker(broker0.Addr(), broker0.BrokerID()). 72 SetLeader("my_topic", 0, broker0.BrokerID()), 73 "OffsetRequest": NewMockOffsetResponse(t). 74 SetOffset("my_topic", 0, OffsetNewest, 10). 75 SetOffset("my_topic", 0, OffsetOldest, 7), 76 "FetchRequest": NewMockFetchResponse(t, 1). 77 SetMessage("my_topic", 0, 9, testMsg). 78 SetMessage("my_topic", 0, 10, testMsg). 79 SetMessage("my_topic", 0, 11, testMsg). 80 SetHighWaterMark("my_topic", 0, 14), 81 }) 82 83 master, err := NewConsumer([]string{broker0.Addr()}, nil) 84 if err != nil { 85 t.Fatal(err) 86 } 87 88 // When 89 consumer, err := master.ConsumePartition("my_topic", 0, OffsetNewest) 90 if err != nil { 91 t.Fatal(err) 92 } 93 94 // Then 95 assertMessageOffset(t, <-consumer.Messages(), 10) 96 if hwmo := consumer.HighWaterMarkOffset(); hwmo != 14 { 97 t.Errorf("Expected high water mark offset 14, found %d", hwmo) 98 } 99 100 safeClose(t, consumer) 101 safeClose(t, master) 102 broker0.Close() 103} 104 105// It is possible to close a partition consumer and create the same anew. 106func TestConsumerRecreate(t *testing.T) { 107 // Given 108 broker0 := NewMockBroker(t, 0) 109 broker0.SetHandlerByMap(map[string]MockResponse{ 110 "MetadataRequest": NewMockMetadataResponse(t). 111 SetBroker(broker0.Addr(), broker0.BrokerID()). 112 SetLeader("my_topic", 0, broker0.BrokerID()), 113 "OffsetRequest": NewMockOffsetResponse(t). 114 SetOffset("my_topic", 0, OffsetOldest, 0). 115 SetOffset("my_topic", 0, OffsetNewest, 1000), 116 "FetchRequest": NewMockFetchResponse(t, 1). 117 SetMessage("my_topic", 0, 10, testMsg), 118 }) 119 120 c, err := NewConsumer([]string{broker0.Addr()}, nil) 121 if err != nil { 122 t.Fatal(err) 123 } 124 125 pc, err := c.ConsumePartition("my_topic", 0, 10) 126 if err != nil { 127 t.Fatal(err) 128 } 129 assertMessageOffset(t, <-pc.Messages(), 10) 130 131 // When 132 safeClose(t, pc) 133 pc, err = c.ConsumePartition("my_topic", 0, 10) 134 if err != nil { 135 t.Fatal(err) 136 } 137 138 // Then 139 assertMessageOffset(t, <-pc.Messages(), 10) 140 141 safeClose(t, pc) 142 safeClose(t, c) 143 broker0.Close() 144} 145 146// An attempt to consume the same partition twice should fail. 147func TestConsumerDuplicate(t *testing.T) { 148 // Given 149 broker0 := NewMockBroker(t, 0) 150 broker0.SetHandlerByMap(map[string]MockResponse{ 151 "MetadataRequest": NewMockMetadataResponse(t). 152 SetBroker(broker0.Addr(), broker0.BrokerID()). 153 SetLeader("my_topic", 0, broker0.BrokerID()), 154 "OffsetRequest": NewMockOffsetResponse(t). 155 SetOffset("my_topic", 0, OffsetOldest, 0). 156 SetOffset("my_topic", 0, OffsetNewest, 1000), 157 "FetchRequest": NewMockFetchResponse(t, 1), 158 }) 159 160 config := NewConfig() 161 config.ChannelBufferSize = 0 162 c, err := NewConsumer([]string{broker0.Addr()}, config) 163 if err != nil { 164 t.Fatal(err) 165 } 166 167 pc1, err := c.ConsumePartition("my_topic", 0, 0) 168 if err != nil { 169 t.Fatal(err) 170 } 171 172 // When 173 pc2, err := c.ConsumePartition("my_topic", 0, 0) 174 175 // Then 176 if pc2 != nil || err != ConfigurationError("That topic/partition is already being consumed") { 177 t.Fatal("A partition cannot be consumed twice at the same time") 178 } 179 180 safeClose(t, pc1) 181 safeClose(t, c) 182 broker0.Close() 183} 184 185func runConsumerLeaderRefreshErrorTestWithConfig(t *testing.T, config *Config) { 186 // Given 187 broker0 := NewMockBroker(t, 100) 188 189 // Stage 1: my_topic/0 served by broker0 190 Logger.Printf(" STAGE 1") 191 192 broker0.SetHandlerByMap(map[string]MockResponse{ 193 "MetadataRequest": NewMockMetadataResponse(t). 194 SetBroker(broker0.Addr(), broker0.BrokerID()). 195 SetLeader("my_topic", 0, broker0.BrokerID()), 196 "OffsetRequest": NewMockOffsetResponse(t). 197 SetOffset("my_topic", 0, OffsetOldest, 123). 198 SetOffset("my_topic", 0, OffsetNewest, 1000), 199 "FetchRequest": NewMockFetchResponse(t, 1). 200 SetMessage("my_topic", 0, 123, testMsg), 201 }) 202 203 c, err := NewConsumer([]string{broker0.Addr()}, config) 204 if err != nil { 205 t.Fatal(err) 206 } 207 208 pc, err := c.ConsumePartition("my_topic", 0, OffsetOldest) 209 if err != nil { 210 t.Fatal(err) 211 } 212 213 assertMessageOffset(t, <-pc.Messages(), 123) 214 215 // Stage 2: broker0 says that it is no longer the leader for my_topic/0, 216 // but the requests to retrieve metadata fail with network timeout. 217 Logger.Printf(" STAGE 2") 218 219 fetchResponse2 := &FetchResponse{} 220 fetchResponse2.AddError("my_topic", 0, ErrNotLeaderForPartition) 221 222 broker0.SetHandlerByMap(map[string]MockResponse{ 223 "FetchRequest": NewMockWrapper(fetchResponse2), 224 }) 225 226 if consErr := <-pc.Errors(); consErr.Err != ErrOutOfBrokers { 227 t.Errorf("Unexpected error: %v", consErr.Err) 228 } 229 230 // Stage 3: finally the metadata returned by broker0 tells that broker1 is 231 // a new leader for my_topic/0. Consumption resumes. 232 233 Logger.Printf(" STAGE 3") 234 235 broker1 := NewMockBroker(t, 101) 236 237 broker1.SetHandlerByMap(map[string]MockResponse{ 238 "FetchRequest": NewMockFetchResponse(t, 1). 239 SetMessage("my_topic", 0, 124, testMsg), 240 }) 241 broker0.SetHandlerByMap(map[string]MockResponse{ 242 "MetadataRequest": NewMockMetadataResponse(t). 243 SetBroker(broker0.Addr(), broker0.BrokerID()). 244 SetBroker(broker1.Addr(), broker1.BrokerID()). 245 SetLeader("my_topic", 0, broker1.BrokerID()), 246 }) 247 248 assertMessageOffset(t, <-pc.Messages(), 124) 249 250 safeClose(t, pc) 251 safeClose(t, c) 252 broker1.Close() 253 broker0.Close() 254} 255 256// If consumer fails to refresh metadata it keeps retrying with frequency 257// specified by `Config.Consumer.Retry.Backoff`. 258func TestConsumerLeaderRefreshError(t *testing.T) { 259 config := NewConfig() 260 config.Net.ReadTimeout = 100 * time.Millisecond 261 config.Consumer.Retry.Backoff = 200 * time.Millisecond 262 config.Consumer.Return.Errors = true 263 config.Metadata.Retry.Max = 0 264 265 runConsumerLeaderRefreshErrorTestWithConfig(t, config) 266} 267 268func TestConsumerLeaderRefreshErrorWithBackoffFunc(t *testing.T) { 269 var calls int32 = 0 270 271 config := NewConfig() 272 config.Net.ReadTimeout = 100 * time.Millisecond 273 config.Consumer.Retry.BackoffFunc = func(retries int) time.Duration { 274 atomic.AddInt32(&calls, 1) 275 return 200 * time.Millisecond 276 } 277 config.Consumer.Return.Errors = true 278 config.Metadata.Retry.Max = 0 279 280 runConsumerLeaderRefreshErrorTestWithConfig(t, config) 281 282 // we expect at least one call to our backoff function 283 if calls == 0 { 284 t.Fail() 285 } 286} 287 288func TestConsumerInvalidTopic(t *testing.T) { 289 // Given 290 broker0 := NewMockBroker(t, 100) 291 broker0.SetHandlerByMap(map[string]MockResponse{ 292 "MetadataRequest": NewMockMetadataResponse(t). 293 SetBroker(broker0.Addr(), broker0.BrokerID()), 294 }) 295 296 c, err := NewConsumer([]string{broker0.Addr()}, nil) 297 if err != nil { 298 t.Fatal(err) 299 } 300 301 // When 302 pc, err := c.ConsumePartition("my_topic", 0, OffsetOldest) 303 304 // Then 305 if pc != nil || err != ErrUnknownTopicOrPartition { 306 t.Errorf("Should fail with, err=%v", err) 307 } 308 309 safeClose(t, c) 310 broker0.Close() 311} 312 313// Nothing bad happens if a partition consumer that has no leader assigned at 314// the moment is closed. 315func TestConsumerClosePartitionWithoutLeader(t *testing.T) { 316 // Given 317 broker0 := NewMockBroker(t, 100) 318 broker0.SetHandlerByMap(map[string]MockResponse{ 319 "MetadataRequest": NewMockMetadataResponse(t). 320 SetBroker(broker0.Addr(), broker0.BrokerID()). 321 SetLeader("my_topic", 0, broker0.BrokerID()), 322 "OffsetRequest": NewMockOffsetResponse(t). 323 SetOffset("my_topic", 0, OffsetOldest, 123). 324 SetOffset("my_topic", 0, OffsetNewest, 1000), 325 "FetchRequest": NewMockFetchResponse(t, 1). 326 SetMessage("my_topic", 0, 123, testMsg), 327 }) 328 329 config := NewConfig() 330 config.Net.ReadTimeout = 100 * time.Millisecond 331 config.Consumer.Retry.Backoff = 100 * time.Millisecond 332 config.Consumer.Return.Errors = true 333 config.Metadata.Retry.Max = 0 334 c, err := NewConsumer([]string{broker0.Addr()}, config) 335 if err != nil { 336 t.Fatal(err) 337 } 338 339 pc, err := c.ConsumePartition("my_topic", 0, OffsetOldest) 340 if err != nil { 341 t.Fatal(err) 342 } 343 344 assertMessageOffset(t, <-pc.Messages(), 123) 345 346 // broker0 says that it is no longer the leader for my_topic/0, but the 347 // requests to retrieve metadata fail with network timeout. 348 fetchResponse2 := &FetchResponse{} 349 fetchResponse2.AddError("my_topic", 0, ErrNotLeaderForPartition) 350 351 broker0.SetHandlerByMap(map[string]MockResponse{ 352 "FetchRequest": NewMockWrapper(fetchResponse2), 353 }) 354 355 // When 356 if consErr := <-pc.Errors(); consErr.Err != ErrOutOfBrokers { 357 t.Errorf("Unexpected error: %v", consErr.Err) 358 } 359 360 // Then: the partition consumer can be closed without any problem. 361 safeClose(t, pc) 362 safeClose(t, c) 363 broker0.Close() 364} 365 366// If the initial offset passed on partition consumer creation is out of the 367// actual offset range for the partition, then the partition consumer stops 368// immediately closing its output channels. 369func TestConsumerShutsDownOutOfRange(t *testing.T) { 370 // Given 371 broker0 := NewMockBroker(t, 0) 372 fetchResponse := new(FetchResponse) 373 fetchResponse.AddError("my_topic", 0, ErrOffsetOutOfRange) 374 broker0.SetHandlerByMap(map[string]MockResponse{ 375 "MetadataRequest": NewMockMetadataResponse(t). 376 SetBroker(broker0.Addr(), broker0.BrokerID()). 377 SetLeader("my_topic", 0, broker0.BrokerID()), 378 "OffsetRequest": NewMockOffsetResponse(t). 379 SetOffset("my_topic", 0, OffsetNewest, 1234). 380 SetOffset("my_topic", 0, OffsetOldest, 7), 381 "FetchRequest": NewMockWrapper(fetchResponse), 382 }) 383 384 master, err := NewConsumer([]string{broker0.Addr()}, nil) 385 if err != nil { 386 t.Fatal(err) 387 } 388 389 // When 390 consumer, err := master.ConsumePartition("my_topic", 0, 101) 391 if err != nil { 392 t.Fatal(err) 393 } 394 395 // Then: consumer should shut down closing its messages and errors channels. 396 if _, ok := <-consumer.Messages(); ok { 397 t.Error("Expected the consumer to shut down") 398 } 399 safeClose(t, consumer) 400 401 safeClose(t, master) 402 broker0.Close() 403} 404 405// If a fetch response contains messages with offsets that are smaller then 406// requested, then such messages are ignored. 407func TestConsumerExtraOffsets(t *testing.T) { 408 // Given 409 legacyFetchResponse := &FetchResponse{} 410 legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 1) 411 legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 2) 412 legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 3) 413 legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 4) 414 newFetchResponse := &FetchResponse{Version: 4} 415 newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 1) 416 newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 2) 417 newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 3) 418 newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 4) 419 newFetchResponse.SetLastOffsetDelta("my_topic", 0, 4) 420 newFetchResponse.SetLastStableOffset("my_topic", 0, 4) 421 for _, fetchResponse1 := range []*FetchResponse{legacyFetchResponse, newFetchResponse} { 422 var offsetResponseVersion int16 423 cfg := NewConfig() 424 cfg.Consumer.Return.Errors = true 425 if fetchResponse1.Version >= 4 { 426 cfg.Version = V0_11_0_0 427 offsetResponseVersion = 1 428 } 429 430 broker0 := NewMockBroker(t, 0) 431 fetchResponse2 := &FetchResponse{} 432 fetchResponse2.Version = fetchResponse1.Version 433 fetchResponse2.AddError("my_topic", 0, ErrNoError) 434 broker0.SetHandlerByMap(map[string]MockResponse{ 435 "MetadataRequest": NewMockMetadataResponse(t). 436 SetBroker(broker0.Addr(), broker0.BrokerID()). 437 SetLeader("my_topic", 0, broker0.BrokerID()), 438 "OffsetRequest": NewMockOffsetResponse(t). 439 SetVersion(offsetResponseVersion). 440 SetOffset("my_topic", 0, OffsetNewest, 1234). 441 SetOffset("my_topic", 0, OffsetOldest, 0), 442 "FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2), 443 }) 444 445 master, err := NewConsumer([]string{broker0.Addr()}, cfg) 446 if err != nil { 447 t.Fatal(err) 448 } 449 450 // When 451 consumer, err := master.ConsumePartition("my_topic", 0, 3) 452 if err != nil { 453 t.Fatal(err) 454 } 455 456 // Then: messages with offsets 1 and 2 are not returned even though they 457 // are present in the response. 458 select { 459 case msg := <-consumer.Messages(): 460 assertMessageOffset(t, msg, 3) 461 case err := <-consumer.Errors(): 462 t.Fatal(err) 463 } 464 465 select { 466 case msg := <-consumer.Messages(): 467 assertMessageOffset(t, msg, 4) 468 case err := <-consumer.Errors(): 469 t.Fatal(err) 470 } 471 472 safeClose(t, consumer) 473 safeClose(t, master) 474 broker0.Close() 475 } 476} 477 478// In some situations broker may return a block containing only 479// messages older then requested, even though there would be 480// more messages if higher offset was requested. 481func TestConsumerReceivingFetchResponseWithTooOldRecords(t *testing.T) { 482 // Given 483 fetchResponse1 := &FetchResponse{Version: 4} 484 fetchResponse1.AddRecord("my_topic", 0, nil, testMsg, 1) 485 486 fetchResponse2 := &FetchResponse{Version: 4} 487 fetchResponse2.AddRecord("my_topic", 0, nil, testMsg, 1000000) 488 489 cfg := NewConfig() 490 cfg.Consumer.Return.Errors = true 491 cfg.Version = V0_11_0_0 492 493 broker0 := NewMockBroker(t, 0) 494 495 broker0.SetHandlerByMap(map[string]MockResponse{ 496 "MetadataRequest": NewMockMetadataResponse(t). 497 SetBroker(broker0.Addr(), broker0.BrokerID()). 498 SetLeader("my_topic", 0, broker0.BrokerID()), 499 "OffsetRequest": NewMockOffsetResponse(t). 500 SetVersion(1). 501 SetOffset("my_topic", 0, OffsetNewest, 1234). 502 SetOffset("my_topic", 0, OffsetOldest, 0), 503 "FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2), 504 }) 505 506 master, err := NewConsumer([]string{broker0.Addr()}, cfg) 507 if err != nil { 508 t.Fatal(err) 509 } 510 511 // When 512 consumer, err := master.ConsumePartition("my_topic", 0, 2) 513 if err != nil { 514 t.Fatal(err) 515 } 516 517 select { 518 case msg := <-consumer.Messages(): 519 assertMessageOffset(t, msg, 1000000) 520 case err := <-consumer.Errors(): 521 t.Fatal(err) 522 } 523 524 safeClose(t, consumer) 525 safeClose(t, master) 526 broker0.Close() 527} 528 529func TestConsumeMessageWithNewerFetchAPIVersion(t *testing.T) { 530 // Given 531 fetchResponse1 := &FetchResponse{Version: 4} 532 fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 1) 533 fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 2) 534 535 cfg := NewConfig() 536 cfg.Version = V0_11_0_0 537 538 broker0 := NewMockBroker(t, 0) 539 fetchResponse2 := &FetchResponse{} 540 fetchResponse2.Version = 4 541 fetchResponse2.AddError("my_topic", 0, ErrNoError) 542 broker0.SetHandlerByMap(map[string]MockResponse{ 543 "MetadataRequest": NewMockMetadataResponse(t). 544 SetBroker(broker0.Addr(), broker0.BrokerID()). 545 SetLeader("my_topic", 0, broker0.BrokerID()), 546 "OffsetRequest": NewMockOffsetResponse(t). 547 SetVersion(1). 548 SetOffset("my_topic", 0, OffsetNewest, 1234). 549 SetOffset("my_topic", 0, OffsetOldest, 0), 550 "FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2), 551 }) 552 553 master, err := NewConsumer([]string{broker0.Addr()}, cfg) 554 if err != nil { 555 t.Fatal(err) 556 } 557 558 // When 559 consumer, err := master.ConsumePartition("my_topic", 0, 1) 560 if err != nil { 561 t.Fatal(err) 562 } 563 564 assertMessageOffset(t, <-consumer.Messages(), 1) 565 assertMessageOffset(t, <-consumer.Messages(), 2) 566 567 safeClose(t, consumer) 568 safeClose(t, master) 569 broker0.Close() 570} 571 572func TestConsumeMessageWithSessionIDs(t *testing.T) { 573 // Given 574 fetchResponse1 := &FetchResponse{Version: 7} 575 fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 1) 576 fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 2) 577 578 cfg := NewConfig() 579 cfg.Version = V1_1_0_0 580 581 broker0 := NewMockBroker(t, 0) 582 fetchResponse2 := &FetchResponse{} 583 fetchResponse2.Version = 7 584 fetchResponse2.AddError("my_topic", 0, ErrNoError) 585 586 broker0.SetHandlerByMap(map[string]MockResponse{ 587 "MetadataRequest": NewMockMetadataResponse(t). 588 SetBroker(broker0.Addr(), broker0.BrokerID()). 589 SetLeader("my_topic", 0, broker0.BrokerID()), 590 "OffsetRequest": NewMockOffsetResponse(t). 591 SetVersion(1). 592 SetOffset("my_topic", 0, OffsetNewest, 1234). 593 SetOffset("my_topic", 0, OffsetOldest, 0), 594 "FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2), 595 }) 596 597 master, err := NewConsumer([]string{broker0.Addr()}, cfg) 598 if err != nil { 599 t.Fatal(err) 600 } 601 602 // When 603 consumer, err := master.ConsumePartition("my_topic", 0, 1) 604 if err != nil { 605 t.Fatal(err) 606 } 607 608 assertMessageOffset(t, <-consumer.Messages(), 1) 609 assertMessageOffset(t, <-consumer.Messages(), 2) 610 611 safeClose(t, consumer) 612 safeClose(t, master) 613 broker0.Close() 614 615 fetchReq := broker0.History()[3].Request.(*FetchRequest) 616 if fetchReq.SessionID != 0 || fetchReq.SessionEpoch != -1 { 617 t.Error("Expected session ID to be zero & Epoch to be -1") 618 } 619} 620 621// It is fine if offsets of fetched messages are not sequential (although 622// strictly increasing!). 623func TestConsumerNonSequentialOffsets(t *testing.T) { 624 // Given 625 legacyFetchResponse := &FetchResponse{} 626 legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 5) 627 legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 7) 628 legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 11) 629 newFetchResponse := &FetchResponse{Version: 4} 630 newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 5) 631 newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 7) 632 newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 11) 633 newFetchResponse.SetLastOffsetDelta("my_topic", 0, 11) 634 newFetchResponse.SetLastStableOffset("my_topic", 0, 11) 635 for _, fetchResponse1 := range []*FetchResponse{legacyFetchResponse, newFetchResponse} { 636 var offsetResponseVersion int16 637 cfg := NewConfig() 638 if fetchResponse1.Version >= 4 { 639 cfg.Version = V0_11_0_0 640 offsetResponseVersion = 1 641 } 642 643 broker0 := NewMockBroker(t, 0) 644 fetchResponse2 := &FetchResponse{Version: fetchResponse1.Version} 645 fetchResponse2.AddError("my_topic", 0, ErrNoError) 646 broker0.SetHandlerByMap(map[string]MockResponse{ 647 "MetadataRequest": NewMockMetadataResponse(t). 648 SetBroker(broker0.Addr(), broker0.BrokerID()). 649 SetLeader("my_topic", 0, broker0.BrokerID()), 650 "OffsetRequest": NewMockOffsetResponse(t). 651 SetVersion(offsetResponseVersion). 652 SetOffset("my_topic", 0, OffsetNewest, 1234). 653 SetOffset("my_topic", 0, OffsetOldest, 0), 654 "FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2), 655 }) 656 657 master, err := NewConsumer([]string{broker0.Addr()}, cfg) 658 if err != nil { 659 t.Fatal(err) 660 } 661 662 // When 663 consumer, err := master.ConsumePartition("my_topic", 0, 3) 664 if err != nil { 665 t.Fatal(err) 666 } 667 668 // Then: messages with offsets 1 and 2 are not returned even though they 669 // are present in the response. 670 assertMessageOffset(t, <-consumer.Messages(), 5) 671 assertMessageOffset(t, <-consumer.Messages(), 7) 672 assertMessageOffset(t, <-consumer.Messages(), 11) 673 674 safeClose(t, consumer) 675 safeClose(t, master) 676 broker0.Close() 677 } 678} 679 680// If leadership for a partition is changing then consumer resolves the new 681// leader and switches to it. 682func TestConsumerRebalancingMultiplePartitions(t *testing.T) { 683 // initial setup 684 seedBroker := NewMockBroker(t, 10) 685 leader0 := NewMockBroker(t, 0) 686 leader1 := NewMockBroker(t, 1) 687 688 seedBroker.SetHandlerByMap(map[string]MockResponse{ 689 "MetadataRequest": NewMockMetadataResponse(t). 690 SetBroker(leader0.Addr(), leader0.BrokerID()). 691 SetBroker(leader1.Addr(), leader1.BrokerID()). 692 SetBroker(seedBroker.Addr(), seedBroker.BrokerID()). 693 SetLeader("my_topic", 0, leader0.BrokerID()). 694 SetLeader("my_topic", 1, leader1.BrokerID()), 695 }) 696 697 mockOffsetResponse1 := NewMockOffsetResponse(t). 698 SetOffset("my_topic", 0, OffsetOldest, 0). 699 SetOffset("my_topic", 0, OffsetNewest, 1000). 700 SetOffset("my_topic", 1, OffsetOldest, 0). 701 SetOffset("my_topic", 1, OffsetNewest, 1000) 702 leader0.SetHandlerByMap(map[string]MockResponse{ 703 "OffsetRequest": mockOffsetResponse1, 704 "FetchRequest": NewMockFetchResponse(t, 1), 705 }) 706 leader1.SetHandlerByMap(map[string]MockResponse{ 707 "OffsetRequest": mockOffsetResponse1, 708 "FetchRequest": NewMockFetchResponse(t, 1), 709 }) 710 711 // launch test goroutines 712 config := NewConfig() 713 config.Consumer.Retry.Backoff = 50 714 master, err := NewConsumer([]string{seedBroker.Addr()}, config) 715 if err != nil { 716 t.Fatal(err) 717 } 718 719 // we expect to end up (eventually) consuming exactly ten messages on each partition 720 var wg sync.WaitGroup 721 for i := int32(0); i < 2; i++ { 722 consumer, err := master.ConsumePartition("my_topic", i, 0) 723 if err != nil { 724 t.Error(err) 725 } 726 727 go func(c PartitionConsumer) { 728 for err := range c.Errors() { 729 t.Error(err) 730 } 731 }(consumer) 732 733 wg.Add(1) 734 go func(partition int32, c PartitionConsumer) { 735 for i := 0; i < 10; i++ { 736 message := <-consumer.Messages() 737 if message.Offset != int64(i) { 738 t.Error("Incorrect message offset!", i, partition, message.Offset) 739 } 740 if message.Partition != partition { 741 t.Error("Incorrect message partition!") 742 } 743 } 744 safeClose(t, consumer) 745 wg.Done() 746 }(i, consumer) 747 } 748 749 time.Sleep(50 * time.Millisecond) 750 Logger.Printf(" STAGE 1") 751 // Stage 1: 752 // * my_topic/0 -> leader0 serves 4 messages 753 // * my_topic/1 -> leader1 serves 0 messages 754 755 mockFetchResponse := NewMockFetchResponse(t, 1) 756 for i := 0; i < 4; i++ { 757 mockFetchResponse.SetMessage("my_topic", 0, int64(i), testMsg) 758 } 759 leader0.SetHandlerByMap(map[string]MockResponse{ 760 "FetchRequest": mockFetchResponse, 761 }) 762 763 time.Sleep(50 * time.Millisecond) 764 Logger.Printf(" STAGE 2") 765 // Stage 2: 766 // * leader0 says that it is no longer serving my_topic/0 767 // * seedBroker tells that leader1 is serving my_topic/0 now 768 769 // seed broker tells that the new partition 0 leader is leader1 770 seedBroker.SetHandlerByMap(map[string]MockResponse{ 771 "MetadataRequest": NewMockMetadataResponse(t). 772 SetLeader("my_topic", 0, leader1.BrokerID()). 773 SetLeader("my_topic", 1, leader1.BrokerID()). 774 SetBroker(leader0.Addr(), leader0.BrokerID()). 775 SetBroker(leader1.Addr(), leader1.BrokerID()). 776 SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), 777 }) 778 779 // leader0 says no longer leader of partition 0 780 fetchResponse := new(FetchResponse) 781 fetchResponse.AddError("my_topic", 0, ErrNotLeaderForPartition) 782 leader0.SetHandlerByMap(map[string]MockResponse{ 783 "FetchRequest": NewMockWrapper(fetchResponse), 784 }) 785 786 time.Sleep(50 * time.Millisecond) 787 Logger.Printf(" STAGE 3") 788 // Stage 3: 789 // * my_topic/0 -> leader1 serves 3 messages 790 // * my_topic/1 -> leader1 server 8 messages 791 792 // leader1 provides 3 message on partition 0, and 8 messages on partition 1 793 mockFetchResponse2 := NewMockFetchResponse(t, 2) 794 for i := 4; i < 7; i++ { 795 mockFetchResponse2.SetMessage("my_topic", 0, int64(i), testMsg) 796 } 797 for i := 0; i < 8; i++ { 798 mockFetchResponse2.SetMessage("my_topic", 1, int64(i), testMsg) 799 } 800 leader1.SetHandlerByMap(map[string]MockResponse{ 801 "FetchRequest": mockFetchResponse2, 802 }) 803 804 time.Sleep(50 * time.Millisecond) 805 Logger.Printf(" STAGE 4") 806 // Stage 4: 807 // * my_topic/0 -> leader1 serves 3 messages 808 // * my_topic/1 -> leader1 tells that it is no longer the leader 809 // * seedBroker tells that leader0 is a new leader for my_topic/1 810 811 // metadata assigns 0 to leader1 and 1 to leader0 812 seedBroker.SetHandlerByMap(map[string]MockResponse{ 813 "MetadataRequest": NewMockMetadataResponse(t). 814 SetLeader("my_topic", 0, leader1.BrokerID()). 815 SetLeader("my_topic", 1, leader0.BrokerID()). 816 SetBroker(leader0.Addr(), leader0.BrokerID()). 817 SetBroker(leader1.Addr(), leader1.BrokerID()). 818 SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), 819 }) 820 821 // leader1 provides three more messages on partition0, says no longer leader of partition1 822 mockFetchResponse3 := NewMockFetchResponse(t, 3). 823 SetMessage("my_topic", 0, int64(7), testMsg). 824 SetMessage("my_topic", 0, int64(8), testMsg). 825 SetMessage("my_topic", 0, int64(9), testMsg) 826 fetchResponse4 := new(FetchResponse) 827 fetchResponse4.AddError("my_topic", 1, ErrNotLeaderForPartition) 828 leader1.SetHandlerByMap(map[string]MockResponse{ 829 "FetchRequest": NewMockSequence(mockFetchResponse3, fetchResponse4), 830 }) 831 832 // leader0 provides two messages on partition 1 833 mockFetchResponse4 := NewMockFetchResponse(t, 2) 834 for i := 8; i < 10; i++ { 835 mockFetchResponse4.SetMessage("my_topic", 1, int64(i), testMsg) 836 } 837 leader0.SetHandlerByMap(map[string]MockResponse{ 838 "FetchRequest": mockFetchResponse4, 839 }) 840 841 wg.Wait() 842 safeClose(t, master) 843 leader1.Close() 844 leader0.Close() 845 seedBroker.Close() 846} 847 848// When two partitions have the same broker as the leader, if one partition 849// consumer channel buffer is full then that does not affect the ability to 850// read messages by the other consumer. 851func TestConsumerInterleavedClose(t *testing.T) { 852 // Given 853 broker0 := NewMockBroker(t, 0) 854 broker0.SetHandlerByMap(map[string]MockResponse{ 855 "MetadataRequest": NewMockMetadataResponse(t). 856 SetBroker(broker0.Addr(), broker0.BrokerID()). 857 SetLeader("my_topic", 0, broker0.BrokerID()). 858 SetLeader("my_topic", 1, broker0.BrokerID()), 859 "OffsetRequest": NewMockOffsetResponse(t). 860 SetOffset("my_topic", 0, OffsetOldest, 1000). 861 SetOffset("my_topic", 0, OffsetNewest, 1100). 862 SetOffset("my_topic", 1, OffsetOldest, 2000). 863 SetOffset("my_topic", 1, OffsetNewest, 2100), 864 "FetchRequest": NewMockFetchResponse(t, 1). 865 SetMessage("my_topic", 0, 1000, testMsg). 866 SetMessage("my_topic", 0, 1001, testMsg). 867 SetMessage("my_topic", 0, 1002, testMsg). 868 SetMessage("my_topic", 1, 2000, testMsg), 869 }) 870 871 config := NewConfig() 872 config.ChannelBufferSize = 0 873 master, err := NewConsumer([]string{broker0.Addr()}, config) 874 if err != nil { 875 t.Fatal(err) 876 } 877 878 c0, err := master.ConsumePartition("my_topic", 0, 1000) 879 if err != nil { 880 t.Fatal(err) 881 } 882 883 c1, err := master.ConsumePartition("my_topic", 1, 2000) 884 if err != nil { 885 t.Fatal(err) 886 } 887 888 // When/Then: we can read from partition 0 even if nobody reads from partition 1 889 assertMessageOffset(t, <-c0.Messages(), 1000) 890 assertMessageOffset(t, <-c0.Messages(), 1001) 891 assertMessageOffset(t, <-c0.Messages(), 1002) 892 893 safeClose(t, c1) 894 safeClose(t, c0) 895 safeClose(t, master) 896 broker0.Close() 897} 898 899func TestConsumerBounceWithReferenceOpen(t *testing.T) { 900 broker0 := NewMockBroker(t, 0) 901 broker0Addr := broker0.Addr() 902 broker1 := NewMockBroker(t, 1) 903 904 mockMetadataResponse := NewMockMetadataResponse(t). 905 SetBroker(broker0.Addr(), broker0.BrokerID()). 906 SetBroker(broker1.Addr(), broker1.BrokerID()). 907 SetLeader("my_topic", 0, broker0.BrokerID()). 908 SetLeader("my_topic", 1, broker1.BrokerID()) 909 910 mockOffsetResponse := NewMockOffsetResponse(t). 911 SetOffset("my_topic", 0, OffsetOldest, 1000). 912 SetOffset("my_topic", 0, OffsetNewest, 1100). 913 SetOffset("my_topic", 1, OffsetOldest, 2000). 914 SetOffset("my_topic", 1, OffsetNewest, 2100) 915 916 mockFetchResponse := NewMockFetchResponse(t, 1) 917 for i := 0; i < 10; i++ { 918 mockFetchResponse.SetMessage("my_topic", 0, int64(1000+i), testMsg) 919 mockFetchResponse.SetMessage("my_topic", 1, int64(2000+i), testMsg) 920 } 921 922 broker0.SetHandlerByMap(map[string]MockResponse{ 923 "OffsetRequest": mockOffsetResponse, 924 "FetchRequest": mockFetchResponse, 925 }) 926 broker1.SetHandlerByMap(map[string]MockResponse{ 927 "MetadataRequest": mockMetadataResponse, 928 "OffsetRequest": mockOffsetResponse, 929 "FetchRequest": mockFetchResponse, 930 }) 931 932 config := NewConfig() 933 config.Consumer.Return.Errors = true 934 config.Consumer.Retry.Backoff = 100 * time.Millisecond 935 config.ChannelBufferSize = 1 936 master, err := NewConsumer([]string{broker1.Addr()}, config) 937 if err != nil { 938 t.Fatal(err) 939 } 940 941 c0, err := master.ConsumePartition("my_topic", 0, 1000) 942 if err != nil { 943 t.Fatal(err) 944 } 945 946 c1, err := master.ConsumePartition("my_topic", 1, 2000) 947 if err != nil { 948 t.Fatal(err) 949 } 950 951 // read messages from both partition to make sure that both brokers operate 952 // normally. 953 assertMessageOffset(t, <-c0.Messages(), 1000) 954 assertMessageOffset(t, <-c1.Messages(), 2000) 955 956 // Simulate broker shutdown. Note that metadata response does not change, 957 // that is the leadership does not move to another broker. So partition 958 // consumer will keep retrying to restore the connection with the broker. 959 broker0.Close() 960 961 // Make sure that while the partition/0 leader is down, consumer/partition/1 962 // is capable of pulling messages from broker1. 963 for i := 1; i < 7; i++ { 964 offset := (<-c1.Messages()).Offset 965 if offset != int64(2000+i) { 966 t.Errorf("Expected offset %d from consumer/partition/1", int64(2000+i)) 967 } 968 } 969 970 // Bring broker0 back to service. 971 broker0 = NewMockBrokerAddr(t, 0, broker0Addr) 972 broker0.SetHandlerByMap(map[string]MockResponse{ 973 "FetchRequest": mockFetchResponse, 974 }) 975 976 // Read the rest of messages from both partitions. 977 for i := 7; i < 10; i++ { 978 assertMessageOffset(t, <-c1.Messages(), int64(2000+i)) 979 } 980 for i := 1; i < 10; i++ { 981 assertMessageOffset(t, <-c0.Messages(), int64(1000+i)) 982 } 983 984 select { 985 case <-c0.Errors(): 986 default: 987 t.Errorf("Partition consumer should have detected broker restart") 988 } 989 990 safeClose(t, c1) 991 safeClose(t, c0) 992 safeClose(t, master) 993 broker0.Close() 994 broker1.Close() 995} 996 997func TestConsumerOffsetOutOfRange(t *testing.T) { 998 // Given 999 broker0 := NewMockBroker(t, 2) 1000 broker0.SetHandlerByMap(map[string]MockResponse{ 1001 "MetadataRequest": NewMockMetadataResponse(t). 1002 SetBroker(broker0.Addr(), broker0.BrokerID()). 1003 SetLeader("my_topic", 0, broker0.BrokerID()), 1004 "OffsetRequest": NewMockOffsetResponse(t). 1005 SetOffset("my_topic", 0, OffsetNewest, 1234). 1006 SetOffset("my_topic", 0, OffsetOldest, 2345), 1007 }) 1008 1009 master, err := NewConsumer([]string{broker0.Addr()}, nil) 1010 if err != nil { 1011 t.Fatal(err) 1012 } 1013 1014 // When/Then 1015 if _, err := master.ConsumePartition("my_topic", 0, 0); err != ErrOffsetOutOfRange { 1016 t.Fatal("Should return ErrOffsetOutOfRange, got:", err) 1017 } 1018 if _, err := master.ConsumePartition("my_topic", 0, 3456); err != ErrOffsetOutOfRange { 1019 t.Fatal("Should return ErrOffsetOutOfRange, got:", err) 1020 } 1021 if _, err := master.ConsumePartition("my_topic", 0, -3); err != ErrOffsetOutOfRange { 1022 t.Fatal("Should return ErrOffsetOutOfRange, got:", err) 1023 } 1024 1025 safeClose(t, master) 1026 broker0.Close() 1027} 1028 1029func TestConsumerExpiryTicker(t *testing.T) { 1030 // Given 1031 broker0 := NewMockBroker(t, 0) 1032 fetchResponse1 := &FetchResponse{} 1033 for i := 1; i <= 8; i++ { 1034 fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, int64(i)) 1035 } 1036 broker0.SetHandlerByMap(map[string]MockResponse{ 1037 "MetadataRequest": NewMockMetadataResponse(t). 1038 SetBroker(broker0.Addr(), broker0.BrokerID()). 1039 SetLeader("my_topic", 0, broker0.BrokerID()), 1040 "OffsetRequest": NewMockOffsetResponse(t). 1041 SetOffset("my_topic", 0, OffsetNewest, 1234). 1042 SetOffset("my_topic", 0, OffsetOldest, 1), 1043 "FetchRequest": NewMockSequence(fetchResponse1), 1044 }) 1045 1046 config := NewConfig() 1047 config.ChannelBufferSize = 0 1048 config.Consumer.MaxProcessingTime = 10 * time.Millisecond 1049 master, err := NewConsumer([]string{broker0.Addr()}, config) 1050 if err != nil { 1051 t.Fatal(err) 1052 } 1053 1054 // When 1055 consumer, err := master.ConsumePartition("my_topic", 0, 1) 1056 if err != nil { 1057 t.Fatal(err) 1058 } 1059 1060 // Then: messages with offsets 1 through 8 are read 1061 for i := 1; i <= 8; i++ { 1062 assertMessageOffset(t, <-consumer.Messages(), int64(i)) 1063 time.Sleep(2 * time.Millisecond) 1064 } 1065 1066 safeClose(t, consumer) 1067 safeClose(t, master) 1068 broker0.Close() 1069} 1070 1071func TestConsumerTimestamps(t *testing.T) { 1072 now := time.Now().Truncate(time.Millisecond) 1073 type testMessage struct { 1074 key Encoder 1075 offset int64 1076 timestamp time.Time 1077 } 1078 for _, d := range []struct { 1079 kversion KafkaVersion 1080 logAppendTime bool 1081 messages []testMessage 1082 expectedTimestamp []time.Time 1083 }{ 1084 {MinVersion, false, []testMessage{ 1085 {testMsg, 1, now}, 1086 {testMsg, 2, now}, 1087 }, []time.Time{{}, {}}}, 1088 {V0_9_0_0, false, []testMessage{ 1089 {testMsg, 1, now}, 1090 {testMsg, 2, now}, 1091 }, []time.Time{{}, {}}}, 1092 {V0_10_0_0, false, []testMessage{ 1093 {testMsg, 1, now}, 1094 {testMsg, 2, now}, 1095 }, []time.Time{{}, {}}}, 1096 {V0_10_2_1, false, []testMessage{ 1097 {testMsg, 1, now.Add(time.Second)}, 1098 {testMsg, 2, now.Add(2 * time.Second)}, 1099 }, []time.Time{now.Add(time.Second), now.Add(2 * time.Second)}}, 1100 {V0_10_2_1, true, []testMessage{ 1101 {testMsg, 1, now.Add(time.Second)}, 1102 {testMsg, 2, now.Add(2 * time.Second)}, 1103 }, []time.Time{now, now}}, 1104 {V0_11_0_0, false, []testMessage{ 1105 {testMsg, 1, now.Add(time.Second)}, 1106 {testMsg, 2, now.Add(2 * time.Second)}, 1107 }, []time.Time{now.Add(time.Second), now.Add(2 * time.Second)}}, 1108 {V0_11_0_0, true, []testMessage{ 1109 {testMsg, 1, now.Add(time.Second)}, 1110 {testMsg, 2, now.Add(2 * time.Second)}, 1111 }, []time.Time{now, now}}, 1112 } { 1113 var fr *FetchResponse 1114 var offsetResponseVersion int16 1115 cfg := NewConfig() 1116 cfg.Version = d.kversion 1117 switch { 1118 case d.kversion.IsAtLeast(V0_11_0_0): 1119 offsetResponseVersion = 1 1120 fr = &FetchResponse{Version: 4, LogAppendTime: d.logAppendTime, Timestamp: now} 1121 for _, m := range d.messages { 1122 fr.AddRecordWithTimestamp("my_topic", 0, m.key, testMsg, m.offset, m.timestamp) 1123 } 1124 fr.SetLastOffsetDelta("my_topic", 0, 2) 1125 fr.SetLastStableOffset("my_topic", 0, 2) 1126 case d.kversion.IsAtLeast(V0_10_1_0): 1127 offsetResponseVersion = 1 1128 fr = &FetchResponse{Version: 3, LogAppendTime: d.logAppendTime, Timestamp: now} 1129 for _, m := range d.messages { 1130 fr.AddMessageWithTimestamp("my_topic", 0, m.key, testMsg, m.offset, m.timestamp, 1) 1131 } 1132 default: 1133 var version int16 1134 switch { 1135 case d.kversion.IsAtLeast(V0_10_0_0): 1136 version = 2 1137 case d.kversion.IsAtLeast(V0_9_0_0): 1138 version = 1 1139 } 1140 fr = &FetchResponse{Version: version} 1141 for _, m := range d.messages { 1142 fr.AddMessageWithTimestamp("my_topic", 0, m.key, testMsg, m.offset, m.timestamp, 0) 1143 } 1144 } 1145 1146 broker0 := NewMockBroker(t, 0) 1147 broker0.SetHandlerByMap(map[string]MockResponse{ 1148 "MetadataRequest": NewMockMetadataResponse(t). 1149 SetBroker(broker0.Addr(), broker0.BrokerID()). 1150 SetLeader("my_topic", 0, broker0.BrokerID()), 1151 "OffsetRequest": NewMockOffsetResponse(t). 1152 SetVersion(offsetResponseVersion). 1153 SetOffset("my_topic", 0, OffsetNewest, 1234). 1154 SetOffset("my_topic", 0, OffsetOldest, 0), 1155 "FetchRequest": NewMockSequence(fr), 1156 }) 1157 1158 master, err := NewConsumer([]string{broker0.Addr()}, cfg) 1159 if err != nil { 1160 t.Fatal(err) 1161 } 1162 1163 consumer, err := master.ConsumePartition("my_topic", 0, 1) 1164 if err != nil { 1165 t.Fatal(err) 1166 } 1167 1168 for i, ts := range d.expectedTimestamp { 1169 select { 1170 case msg := <-consumer.Messages(): 1171 assertMessageOffset(t, msg, int64(i)+1) 1172 if msg.Timestamp != ts { 1173 t.Errorf("Wrong timestamp (kversion:%v, logAppendTime:%v): got: %v, want: %v", 1174 d.kversion, d.logAppendTime, msg.Timestamp, ts) 1175 } 1176 case err := <-consumer.Errors(): 1177 t.Fatal(err) 1178 } 1179 } 1180 1181 safeClose(t, consumer) 1182 safeClose(t, master) 1183 broker0.Close() 1184 } 1185} 1186 1187// When set to ReadCommitted, no uncommitted message should be available in messages channel 1188func TestExcludeUncommitted(t *testing.T) { 1189 // Given 1190 broker0 := NewMockBroker(t, 0) 1191 1192 fetchResponse := &FetchResponse{ 1193 Version: 4, 1194 Blocks: map[string]map[int32]*FetchResponseBlock{"my_topic": {0: { 1195 AbortedTransactions: []*AbortedTransaction{{ProducerID: 7, FirstOffset: 1235}}, 1196 }}}, 1197 } 1198 fetchResponse.AddRecordBatch("my_topic", 0, nil, testMsg, 1234, 7, true) // committed msg 1199 fetchResponse.AddRecordBatch("my_topic", 0, nil, testMsg, 1235, 7, true) // uncommitted msg 1200 fetchResponse.AddRecordBatch("my_topic", 0, nil, testMsg, 1236, 7, true) // uncommitted msg 1201 fetchResponse.AddControlRecord("my_topic", 0, 1237, 7, ControlRecordAbort) // abort control record 1202 fetchResponse.AddRecordBatch("my_topic", 0, nil, testMsg, 1238, 7, true) // committed msg 1203 1204 broker0.SetHandlerByMap(map[string]MockResponse{ 1205 "MetadataRequest": NewMockMetadataResponse(t). 1206 SetBroker(broker0.Addr(), broker0.BrokerID()). 1207 SetLeader("my_topic", 0, broker0.BrokerID()), 1208 "OffsetRequest": NewMockOffsetResponse(t). 1209 SetVersion(1). 1210 SetOffset("my_topic", 0, OffsetOldest, 0). 1211 SetOffset("my_topic", 0, OffsetNewest, 1237), 1212 "FetchRequest": NewMockWrapper(fetchResponse), 1213 }) 1214 1215 cfg := NewConfig() 1216 cfg.Consumer.Return.Errors = true 1217 cfg.Version = V0_11_0_0 1218 cfg.Consumer.IsolationLevel = ReadCommitted 1219 1220 // When 1221 master, err := NewConsumer([]string{broker0.Addr()}, cfg) 1222 if err != nil { 1223 t.Fatal(err) 1224 } 1225 1226 consumer, err := master.ConsumePartition("my_topic", 0, 1234) 1227 if err != nil { 1228 t.Fatal(err) 1229 } 1230 1231 // Then: only the 2 committed messages are returned 1232 select { 1233 case message := <-consumer.Messages(): 1234 assertMessageOffset(t, message, int64(1234)) 1235 case err := <-consumer.Errors(): 1236 t.Error(err) 1237 } 1238 select { 1239 case message := <-consumer.Messages(): 1240 assertMessageOffset(t, message, int64(1238)) 1241 case err := <-consumer.Errors(): 1242 t.Error(err) 1243 } 1244 1245 safeClose(t, consumer) 1246 safeClose(t, master) 1247 broker0.Close() 1248} 1249 1250func assertMessageOffset(t *testing.T, msg *ConsumerMessage, expectedOffset int64) { 1251 if msg.Offset != expectedOffset { 1252 t.Errorf("Incorrect message offset: expected=%d, actual=%d", expectedOffset, msg.Offset) 1253 } 1254} 1255 1256// This example shows how to use the consumer to read messages 1257// from a single partition. 1258func ExampleConsumer() { 1259 consumer, err := NewConsumer([]string{"localhost:9092"}, nil) 1260 if err != nil { 1261 panic(err) 1262 } 1263 1264 defer func() { 1265 if err := consumer.Close(); err != nil { 1266 log.Fatalln(err) 1267 } 1268 }() 1269 1270 partitionConsumer, err := consumer.ConsumePartition("my_topic", 0, OffsetNewest) 1271 if err != nil { 1272 panic(err) 1273 } 1274 1275 defer func() { 1276 if err := partitionConsumer.Close(); err != nil { 1277 log.Fatalln(err) 1278 } 1279 }() 1280 1281 // Trap SIGINT to trigger a shutdown. 1282 signals := make(chan os.Signal, 1) 1283 signal.Notify(signals, os.Interrupt) 1284 1285 consumed := 0 1286ConsumerLoop: 1287 for { 1288 select { 1289 case msg := <-partitionConsumer.Messages(): 1290 log.Printf("Consumed message offset %d\n", msg.Offset) 1291 consumed++ 1292 case <-signals: 1293 break ConsumerLoop 1294 } 1295 } 1296 1297 log.Printf("Consumed: %d\n", consumed) 1298} 1299 1300func Test_partitionConsumer_parseResponse(t *testing.T) { 1301 type args struct { 1302 response *FetchResponse 1303 } 1304 tests := []struct { 1305 name string 1306 args args 1307 want []*ConsumerMessage 1308 wantErr bool 1309 }{ 1310 { 1311 name: "empty but throttled FetchResponse is not considered an error", 1312 args: args{ 1313 response: &FetchResponse{ 1314 ThrottleTime: time.Millisecond, 1315 }, 1316 }, 1317 }, 1318 { 1319 name: "empty FetchResponse is considered an incomplete response by default", 1320 args: args{ 1321 response: &FetchResponse{}, 1322 }, 1323 wantErr: true, 1324 }, 1325 } 1326 for _, tt := range tests { 1327 t.Run(tt.name, func(t *testing.T) { 1328 child := &partitionConsumer{ 1329 broker: &brokerConsumer{ 1330 broker: &Broker{}, 1331 }, 1332 conf: &Config{}, 1333 } 1334 got, err := child.parseResponse(tt.args.response) 1335 if (err != nil) != tt.wantErr { 1336 t.Errorf("partitionConsumer.parseResponse() error = %v, wantErr %v", err, tt.wantErr) 1337 return 1338 } 1339 if !reflect.DeepEqual(got, tt.want) { 1340 t.Errorf("partitionConsumer.parseResponse() = %v, want %v", got, tt.want) 1341 } 1342 }) 1343 } 1344} 1345