1package sarama 2 3import ( 4 "errors" 5 "strings" 6 "testing" 7) 8 9func TestClusterAdmin(t *testing.T) { 10 seedBroker := NewMockBroker(t, 1) 11 defer seedBroker.Close() 12 13 seedBroker.SetHandlerByMap(map[string]MockResponse{ 14 "MetadataRequest": NewMockMetadataResponse(t). 15 SetController(seedBroker.BrokerID()). 16 SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), 17 }) 18 19 config := NewConfig() 20 config.Version = V1_0_0_0 21 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) 22 if err != nil { 23 t.Fatal(err) 24 } 25 26 err = admin.Close() 27 if err != nil { 28 t.Fatal(err) 29 } 30} 31 32func TestClusterAdminInvalidController(t *testing.T) { 33 seedBroker := NewMockBroker(t, 1) 34 defer seedBroker.Close() 35 36 seedBroker.SetHandlerByMap(map[string]MockResponse{ 37 "MetadataRequest": NewMockMetadataResponse(t). 38 SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), 39 }) 40 41 config := NewConfig() 42 config.Version = V1_0_0_0 43 _, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) 44 if err == nil { 45 t.Fatal(errors.New("controller not set still cluster admin was created")) 46 } 47 48 if err != ErrControllerNotAvailable { 49 t.Fatal(err) 50 } 51} 52 53func TestClusterAdminCreateTopic(t *testing.T) { 54 seedBroker := NewMockBroker(t, 1) 55 defer seedBroker.Close() 56 57 seedBroker.SetHandlerByMap(map[string]MockResponse{ 58 "MetadataRequest": NewMockMetadataResponse(t). 59 SetController(seedBroker.BrokerID()). 
60 SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), 61 "CreateTopicsRequest": NewMockCreateTopicsResponse(t), 62 }) 63 64 config := NewConfig() 65 config.Version = V0_10_2_0 66 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) 67 if err != nil { 68 t.Fatal(err) 69 } 70 err = admin.CreateTopic("my_topic", &TopicDetail{NumPartitions: 1, ReplicationFactor: 1}, false) 71 if err != nil { 72 t.Fatal(err) 73 } 74 75 err = admin.Close() 76 if err != nil { 77 t.Fatal(err) 78 } 79} 80 81func TestClusterAdminCreateTopicWithInvalidTopicDetail(t *testing.T) { 82 seedBroker := NewMockBroker(t, 1) 83 defer seedBroker.Close() 84 85 seedBroker.SetHandlerByMap(map[string]MockResponse{ 86 "MetadataRequest": NewMockMetadataResponse(t). 87 SetController(seedBroker.BrokerID()). 88 SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), 89 "CreateTopicsRequest": NewMockCreateTopicsResponse(t), 90 }) 91 92 config := NewConfig() 93 config.Version = V0_10_2_0 94 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) 95 if err != nil { 96 t.Fatal(err) 97 } 98 99 err = admin.CreateTopic("my_topic", nil, false) 100 if err.Error() != "you must specify topic details" { 101 t.Fatal(err) 102 } 103 err = admin.Close() 104 if err != nil { 105 t.Fatal(err) 106 } 107} 108 109func TestClusterAdminCreateTopicWithoutAuthorization(t *testing.T) { 110 seedBroker := NewMockBroker(t, 1) 111 defer seedBroker.Close() 112 113 seedBroker.SetHandlerByMap(map[string]MockResponse{ 114 "MetadataRequest": NewMockMetadataResponse(t). 115 SetController(seedBroker.BrokerID()). 
116 SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), 117 "CreateTopicsRequest": NewMockCreateTopicsResponse(t), 118 }) 119 120 config := NewConfig() 121 config.Version = V0_11_0_0 122 123 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) 124 if err != nil { 125 t.Fatal(err) 126 } 127 128 err = admin.CreateTopic("_internal_topic", &TopicDetail{NumPartitions: 1, ReplicationFactor: 1}, false) 129 want := "insufficient permissions to create topic with reserved prefix" 130 if !strings.HasSuffix(err.Error(), want) { 131 t.Fatal(err) 132 } 133 err = admin.Close() 134 if err != nil { 135 t.Fatal(err) 136 } 137} 138 139func TestClusterAdminListTopics(t *testing.T) { 140 seedBroker := NewMockBroker(t, 1) 141 defer seedBroker.Close() 142 143 seedBroker.SetHandlerByMap(map[string]MockResponse{ 144 "MetadataRequest": NewMockMetadataResponse(t). 145 SetController(seedBroker.BrokerID()). 146 SetBroker(seedBroker.Addr(), seedBroker.BrokerID()). 147 SetLeader("my_topic", 0, seedBroker.BrokerID()), 148 "DescribeConfigsRequest": NewMockDescribeConfigsResponse(t), 149 }) 150 151 config := NewConfig() 152 config.Version = V1_1_0_0 153 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) 154 if err != nil { 155 t.Fatal(err) 156 } 157 158 entries, err := admin.ListTopics() 159 if err != nil { 160 t.Fatal(err) 161 } 162 163 if len(entries) <= 0 { 164 t.Fatal(errors.New("no resource present")) 165 } 166 167 topic, found := entries["my_topic"] 168 if !found { 169 t.Fatal(errors.New("topic not found in response")) 170 } 171 _, found = topic.ConfigEntries["max.message.bytes"] 172 if found { 173 t.Fatal(errors.New("default topic config entry incorrectly found in response")) 174 } 175 value := topic.ConfigEntries["retention.ms"] 176 if value == nil || *value != "5000" { 177 t.Fatal(errors.New("non-default topic config entry not found in response")) 178 } 179 180 err = admin.Close() 181 if err != nil { 182 t.Fatal(err) 183 } 184 185 if topic.ReplicaAssignment 
== nil || topic.ReplicaAssignment[0][0] != 1 { 186 t.Fatal(errors.New("replica assignment not found in response")) 187 } 188} 189 190func TestClusterAdminDeleteTopic(t *testing.T) { 191 seedBroker := NewMockBroker(t, 1) 192 defer seedBroker.Close() 193 194 seedBroker.SetHandlerByMap(map[string]MockResponse{ 195 "MetadataRequest": NewMockMetadataResponse(t). 196 SetController(seedBroker.BrokerID()). 197 SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), 198 "DeleteTopicsRequest": NewMockDeleteTopicsResponse(t), 199 }) 200 201 config := NewConfig() 202 config.Version = V0_10_2_0 203 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) 204 if err != nil { 205 t.Fatal(err) 206 } 207 208 err = admin.DeleteTopic("my_topic") 209 if err != nil { 210 t.Fatal(err) 211 } 212 213 err = admin.Close() 214 if err != nil { 215 t.Fatal(err) 216 } 217} 218 219func TestClusterAdminDeleteEmptyTopic(t *testing.T) { 220 seedBroker := NewMockBroker(t, 1) 221 defer seedBroker.Close() 222 223 seedBroker.SetHandlerByMap(map[string]MockResponse{ 224 "MetadataRequest": NewMockMetadataResponse(t). 225 SetController(seedBroker.BrokerID()). 226 SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), 227 "DeleteTopicsRequest": NewMockDeleteTopicsResponse(t), 228 }) 229 230 config := NewConfig() 231 config.Version = V0_10_2_0 232 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) 233 if err != nil { 234 t.Fatal(err) 235 } 236 237 err = admin.DeleteTopic("") 238 if err != ErrInvalidTopic { 239 t.Fatal(err) 240 } 241 242 err = admin.Close() 243 if err != nil { 244 t.Fatal(err) 245 } 246} 247 248func TestClusterAdminCreatePartitions(t *testing.T) { 249 seedBroker := NewMockBroker(t, 1) 250 defer seedBroker.Close() 251 252 seedBroker.SetHandlerByMap(map[string]MockResponse{ 253 "MetadataRequest": NewMockMetadataResponse(t). 254 SetController(seedBroker.BrokerID()). 
255 SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), 256 "CreatePartitionsRequest": NewMockCreatePartitionsResponse(t), 257 }) 258 259 config := NewConfig() 260 config.Version = V1_0_0_0 261 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) 262 if err != nil { 263 t.Fatal(err) 264 } 265 266 err = admin.CreatePartitions("my_topic", 3, nil, false) 267 if err != nil { 268 t.Fatal(err) 269 } 270 271 err = admin.Close() 272 if err != nil { 273 t.Fatal(err) 274 } 275} 276 277func TestClusterAdminCreatePartitionsWithDiffVersion(t *testing.T) { 278 seedBroker := NewMockBroker(t, 1) 279 defer seedBroker.Close() 280 281 seedBroker.SetHandlerByMap(map[string]MockResponse{ 282 "MetadataRequest": NewMockMetadataResponse(t). 283 SetController(seedBroker.BrokerID()). 284 SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), 285 "CreatePartitionsRequest": NewMockCreatePartitionsResponse(t), 286 }) 287 288 config := NewConfig() 289 config.Version = V0_10_2_0 290 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) 291 if err != nil { 292 t.Fatal(err) 293 } 294 295 err = admin.CreatePartitions("my_topic", 3, nil, false) 296 if err != ErrUnsupportedVersion { 297 t.Fatal(err) 298 } 299 300 err = admin.Close() 301 if err != nil { 302 t.Fatal(err) 303 } 304} 305 306func TestClusterAdminCreatePartitionsWithoutAuthorization(t *testing.T) { 307 seedBroker := NewMockBroker(t, 1) 308 defer seedBroker.Close() 309 310 seedBroker.SetHandlerByMap(map[string]MockResponse{ 311 "MetadataRequest": NewMockMetadataResponse(t). 312 SetController(seedBroker.BrokerID()). 
313 SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), 314 "CreatePartitionsRequest": NewMockCreatePartitionsResponse(t), 315 }) 316 317 config := NewConfig() 318 config.Version = V1_0_0_0 319 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) 320 if err != nil { 321 t.Fatal(err) 322 } 323 324 err = admin.CreatePartitions("_internal_topic", 3, nil, false) 325 want := "insufficient permissions to create partition on topic with reserved prefix" 326 if !strings.HasSuffix(err.Error(), want) { 327 t.Fatal(err) 328 } 329 err = admin.Close() 330 if err != nil { 331 t.Fatal(err) 332 } 333} 334 335func TestClusterAdminAlterPartitionReassignments(t *testing.T) { 336 seedBroker := NewMockBroker(t, 1) 337 defer seedBroker.Close() 338 339 secondBroker := NewMockBroker(t, 2) 340 defer secondBroker.Close() 341 342 seedBroker.SetHandlerByMap(map[string]MockResponse{ 343 "MetadataRequest": NewMockMetadataResponse(t). 344 SetController(secondBroker.BrokerID()). 345 SetBroker(seedBroker.Addr(), seedBroker.BrokerID()). 
346 SetBroker(secondBroker.Addr(), secondBroker.BrokerID()), 347 }) 348 349 secondBroker.SetHandlerByMap(map[string]MockResponse{ 350 "AlterPartitionReassignmentsRequest": NewMockAlterPartitionReassignmentsResponse(t), 351 }) 352 353 config := NewConfig() 354 config.Version = V2_4_0_0 355 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) 356 if err != nil { 357 t.Fatal(err) 358 } 359 360 var topicAssignment = make([][]int32, 0, 3) 361 362 err = admin.AlterPartitionReassignments("my_topic", topicAssignment) 363 if err != nil { 364 t.Fatal(err) 365 } 366 367 err = admin.Close() 368 if err != nil { 369 t.Fatal(err) 370 } 371} 372 373func TestClusterAdminAlterPartitionReassignmentsWithDiffVersion(t *testing.T) { 374 seedBroker := NewMockBroker(t, 1) 375 defer seedBroker.Close() 376 377 secondBroker := NewMockBroker(t, 2) 378 defer secondBroker.Close() 379 380 seedBroker.SetHandlerByMap(map[string]MockResponse{ 381 "MetadataRequest": NewMockMetadataResponse(t). 382 SetController(secondBroker.BrokerID()). 383 SetBroker(seedBroker.Addr(), seedBroker.BrokerID()). 
384 SetBroker(secondBroker.Addr(), secondBroker.BrokerID()), 385 }) 386 387 secondBroker.SetHandlerByMap(map[string]MockResponse{ 388 "AlterPartitionReassignmentsRequest": NewMockAlterPartitionReassignmentsResponse(t), 389 }) 390 391 config := NewConfig() 392 config.Version = V2_3_0_0 393 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) 394 if err != nil { 395 t.Fatal(err) 396 } 397 398 var topicAssignment = make([][]int32, 0, 3) 399 400 err = admin.AlterPartitionReassignments("my_topic", topicAssignment) 401 402 if !strings.ContainsAny(err.Error(), ErrUnsupportedVersion.Error()) { 403 t.Fatal(err) 404 } 405 406 err = admin.Close() 407 if err != nil { 408 t.Fatal(err) 409 } 410} 411 412func TestClusterAdminListPartitionReassignments(t *testing.T) { 413 seedBroker := NewMockBroker(t, 1) 414 defer seedBroker.Close() 415 416 secondBroker := NewMockBroker(t, 2) 417 defer secondBroker.Close() 418 419 seedBroker.SetHandlerByMap(map[string]MockResponse{ 420 "MetadataRequest": NewMockMetadataResponse(t). 421 SetController(secondBroker.BrokerID()). 422 SetBroker(seedBroker.Addr(), seedBroker.BrokerID()). 
423 SetBroker(secondBroker.Addr(), secondBroker.BrokerID()), 424 }) 425 426 secondBroker.SetHandlerByMap(map[string]MockResponse{ 427 "ListPartitionReassignmentsRequest": NewMockListPartitionReassignmentsResponse(t), 428 }) 429 430 config := NewConfig() 431 config.Version = V2_4_0_0 432 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) 433 if err != nil { 434 t.Fatal(err) 435 } 436 437 response, err := admin.ListPartitionReassignments("my_topic", []int32{0, 1}) 438 if err != nil { 439 t.Fatal(err) 440 } 441 442 partitionStatus, ok := response["my_topic"] 443 if !ok { 444 t.Fatalf("topic missing in response") 445 } else { 446 if len(partitionStatus) != 2 { 447 t.Fatalf("partition missing in response") 448 } 449 } 450 451 err = admin.Close() 452 if err != nil { 453 t.Fatal(err) 454 } 455} 456 457func TestClusterAdminListPartitionReassignmentsWithDiffVersion(t *testing.T) { 458 seedBroker := NewMockBroker(t, 1) 459 defer seedBroker.Close() 460 461 secondBroker := NewMockBroker(t, 2) 462 defer secondBroker.Close() 463 464 seedBroker.SetHandlerByMap(map[string]MockResponse{ 465 "MetadataRequest": NewMockMetadataResponse(t). 466 SetController(secondBroker.BrokerID()). 467 SetBroker(seedBroker.Addr(), seedBroker.BrokerID()). 
468 SetBroker(secondBroker.Addr(), secondBroker.BrokerID()), 469 }) 470 471 secondBroker.SetHandlerByMap(map[string]MockResponse{ 472 "ListPartitionReassignmentsRequest": NewMockListPartitionReassignmentsResponse(t), 473 }) 474 475 config := NewConfig() 476 config.Version = V2_3_0_0 477 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) 478 if err != nil { 479 t.Fatal(err) 480 } 481 482 var partitions = make([]int32, 0) 483 484 _, err = admin.ListPartitionReassignments("my_topic", partitions) 485 486 if !strings.ContainsAny(err.Error(), ErrUnsupportedVersion.Error()) { 487 t.Fatal(err) 488 } 489 490 err = admin.Close() 491 if err != nil { 492 t.Fatal(err) 493 } 494} 495 496func TestClusterAdminDeleteRecords(t *testing.T) { 497 topicName := "my_topic" 498 seedBroker := NewMockBroker(t, 1) 499 defer seedBroker.Close() 500 501 seedBroker.SetHandlerByMap(map[string]MockResponse{ 502 "MetadataRequest": NewMockMetadataResponse(t). 503 SetController(seedBroker.BrokerID()). 504 SetBroker(seedBroker.Addr(), seedBroker.BrokerID()). 505 SetLeader(topicName, 1, 1). 506 SetLeader(topicName, 2, 1). 
507 SetLeader(topicName, 3, 1), 508 "DeleteRecordsRequest": NewMockDeleteRecordsResponse(t), 509 }) 510 511 config := NewConfig() 512 config.Version = V1_0_0_0 513 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) 514 if err != nil { 515 t.Fatal(err) 516 } 517 518 partitionOffsetFake := make(map[int32]int64) 519 partitionOffsetFake[4] = 1000 520 errFake := admin.DeleteRecords(topicName, partitionOffsetFake) 521 if errFake == nil { 522 t.Fatal(err) 523 } 524 525 partitionOffset := make(map[int32]int64) 526 partitionOffset[1] = 1000 527 partitionOffset[2] = 1000 528 partitionOffset[3] = 1000 529 530 err = admin.DeleteRecords(topicName, partitionOffset) 531 if err != nil { 532 t.Fatal(err) 533 } 534 535 err = admin.Close() 536 if err != nil { 537 t.Fatal(err) 538 } 539} 540 541func TestClusterAdminDeleteRecordsWithInCorrectBroker(t *testing.T) { 542 topicName := "my_topic" 543 seedBroker := NewMockBroker(t, 1) 544 secondBroker := NewMockBroker(t, 2) 545 defer seedBroker.Close() 546 defer secondBroker.Close() 547 548 seedBroker.SetHandlerByMap(map[string]MockResponse{ 549 "MetadataRequest": NewMockMetadataResponse(t). 550 SetController(seedBroker.BrokerID()). 551 SetBroker(seedBroker.Addr(), seedBroker.BrokerID()). 552 SetBroker(secondBroker.Addr(), secondBroker.brokerID). 553 SetLeader(topicName, 1, 1). 554 SetLeader(topicName, 2, 1). 555 SetLeader(topicName, 3, 2), 556 "DeleteRecordsRequest": NewMockDeleteRecordsResponse(t), 557 }) 558 559 secondBroker.SetHandlerByMap(map[string]MockResponse{ 560 "MetadataRequest": NewMockMetadataResponse(t). 561 SetController(seedBroker.BrokerID()). 562 SetBroker(seedBroker.Addr(), seedBroker.BrokerID()). 563 SetBroker(secondBroker.Addr(), secondBroker.brokerID). 564 SetLeader(topicName, 1, 1). 565 SetLeader(topicName, 2, 1). 
566 SetLeader(topicName, 3, 2), 567 "DeleteRecordsRequest": NewMockDeleteRecordsResponse(t), 568 }) 569 570 config := NewConfig() 571 config.Version = V1_0_0_0 572 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) 573 if err != nil { 574 t.Fatal(err) 575 } 576 partitionOffset := make(map[int32]int64) 577 partitionOffset[1] = 1000 578 partitionOffset[2] = 1000 579 partitionOffset[3] = 1000 580 581 err = admin.DeleteRecords(topicName, partitionOffset) 582 if err != nil { 583 t.Fatal(err) 584 } 585 586 err = admin.Close() 587 if err != nil { 588 t.Fatal(err) 589 } 590} 591 592func TestClusterAdminDeleteRecordsWithDiffVersion(t *testing.T) { 593 topicName := "my_topic" 594 seedBroker := NewMockBroker(t, 1) 595 defer seedBroker.Close() 596 597 seedBroker.SetHandlerByMap(map[string]MockResponse{ 598 "MetadataRequest": NewMockMetadataResponse(t). 599 SetController(seedBroker.BrokerID()). 600 SetBroker(seedBroker.Addr(), seedBroker.BrokerID()). 601 SetLeader(topicName, 1, 1). 602 SetLeader(topicName, 2, 1). 
603 SetLeader(topicName, 3, 1), 604 "DeleteRecordsRequest": NewMockDeleteRecordsResponse(t), 605 }) 606 607 config := NewConfig() 608 config.Version = V0_10_2_0 609 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) 610 if err != nil { 611 t.Fatal(err) 612 } 613 614 partitionOffset := make(map[int32]int64) 615 partitionOffset[1] = 1000 616 partitionOffset[2] = 1000 617 partitionOffset[3] = 1000 618 619 err = admin.DeleteRecords(topicName, partitionOffset) 620 if !strings.HasPrefix(err.Error(), "kafka server: failed to delete records") { 621 t.Fatal(err) 622 } 623 deleteRecordsError, ok := err.(ErrDeleteRecords) 624 625 if !ok { 626 t.Fatal(err) 627 } 628 629 for _, err := range *deleteRecordsError.Errors { 630 if err != ErrUnsupportedVersion { 631 t.Fatal(err) 632 } 633 } 634 635 err = admin.Close() 636 if err != nil { 637 t.Fatal(err) 638 } 639} 640 641func TestClusterAdminDescribeConfig(t *testing.T) { 642 seedBroker := NewMockBroker(t, 1) 643 defer seedBroker.Close() 644 645 seedBroker.SetHandlerByMap(map[string]MockResponse{ 646 "MetadataRequest": NewMockMetadataResponse(t). 647 SetController(seedBroker.BrokerID()). 
648 SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), 649 "DescribeConfigsRequest": NewMockDescribeConfigsResponse(t), 650 }) 651 652 var tests = []struct { 653 saramaVersion KafkaVersion 654 requestVersion int16 655 includeSynonyms bool 656 }{ 657 {V1_0_0_0, 0, false}, 658 {V1_1_0_0, 1, true}, 659 {V1_1_1_0, 1, true}, 660 {V2_0_0_0, 2, true}, 661 } 662 for _, tt := range tests { 663 config := NewConfig() 664 config.Version = tt.saramaVersion 665 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) 666 if err != nil { 667 t.Fatal(err) 668 } 669 defer func() { 670 _ = admin.Close() 671 }() 672 673 resource := ConfigResource{ 674 Name: "r1", 675 Type: TopicResource, 676 ConfigNames: []string{"my_topic"}, 677 } 678 679 entries, err := admin.DescribeConfig(resource) 680 if err != nil { 681 t.Fatal(err) 682 } 683 684 history := seedBroker.History() 685 describeReq, ok := history[len(history)-1].Request.(*DescribeConfigsRequest) 686 if !ok { 687 t.Fatal("failed to find DescribeConfigsRequest in mockBroker history") 688 } 689 690 if describeReq.Version != tt.requestVersion { 691 t.Fatalf( 692 "requestVersion %v did not match expected %v", 693 describeReq.Version, tt.requestVersion) 694 } 695 696 if len(entries) <= 0 { 697 t.Fatal(errors.New("no resource present")) 698 } 699 if tt.includeSynonyms { 700 if len(entries[0].Synonyms) == 0 { 701 t.Fatal("expected synonyms to have been included") 702 } 703 } 704 } 705} 706 707func TestClusterAdminDescribeConfigWithErrorCode(t *testing.T) { 708 seedBroker := NewMockBroker(t, 1) 709 defer seedBroker.Close() 710 711 seedBroker.SetHandlerByMap(map[string]MockResponse{ 712 "MetadataRequest": NewMockMetadataResponse(t). 713 SetController(seedBroker.BrokerID()). 
714 SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), 715 "DescribeConfigsRequest": NewMockDescribeConfigsResponseWithErrorCode(t), 716 }) 717 718 config := NewConfig() 719 config.Version = V1_1_0_0 720 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) 721 if err != nil { 722 t.Fatal(err) 723 } 724 defer func() { 725 _ = admin.Close() 726 }() 727 728 resource := ConfigResource{ 729 Name: "r1", 730 Type: TopicResource, 731 ConfigNames: []string{"my_topic"}, 732 } 733 734 _, err = admin.DescribeConfig(resource) 735 if err == nil { 736 t.Fatal(errors.New("ErrorCode present but no Error returned")) 737 } 738} 739 740// TestClusterAdminDescribeBrokerConfig ensures that a describe broker config 741// is sent to the broker in the resource struct, _not_ the controller 742func TestClusterAdminDescribeBrokerConfig(t *testing.T) { 743 controllerBroker := NewMockBroker(t, 1) 744 defer controllerBroker.Close() 745 configBroker := NewMockBroker(t, 2) 746 defer configBroker.Close() 747 748 controllerBroker.SetHandlerByMap(map[string]MockResponse{ 749 "MetadataRequest": NewMockMetadataResponse(t). 750 SetController(controllerBroker.BrokerID()). 751 SetBroker(controllerBroker.Addr(), controllerBroker.BrokerID()). 752 SetBroker(configBroker.Addr(), configBroker.BrokerID()), 753 }) 754 755 configBroker.SetHandlerByMap(map[string]MockResponse{ 756 "MetadataRequest": NewMockMetadataResponse(t). 757 SetController(controllerBroker.BrokerID()). 758 SetBroker(controllerBroker.Addr(), controllerBroker.BrokerID()). 
759 SetBroker(configBroker.Addr(), configBroker.BrokerID()), 760 "DescribeConfigsRequest": NewMockDescribeConfigsResponse(t), 761 }) 762 763 config := NewConfig() 764 config.Version = V1_0_0_0 765 admin, err := NewClusterAdmin( 766 []string{ 767 controllerBroker.Addr(), 768 configBroker.Addr(), 769 }, config) 770 if err != nil { 771 t.Fatal(err) 772 } 773 774 for _, resourceType := range []ConfigResourceType{BrokerResource, BrokerLoggerResource} { 775 resource := ConfigResource{Name: "2", Type: resourceType} 776 entries, err := admin.DescribeConfig(resource) 777 if err != nil { 778 t.Fatal(err) 779 } 780 781 if len(entries) <= 0 { 782 t.Fatal(errors.New("no resource present")) 783 } 784 } 785 786 err = admin.Close() 787 if err != nil { 788 t.Fatal(err) 789 } 790} 791 792func TestClusterAdminAlterConfig(t *testing.T) { 793 seedBroker := NewMockBroker(t, 1) 794 defer seedBroker.Close() 795 796 seedBroker.SetHandlerByMap(map[string]MockResponse{ 797 "MetadataRequest": NewMockMetadataResponse(t). 798 SetController(seedBroker.BrokerID()). 799 SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), 800 "AlterConfigsRequest": NewMockAlterConfigsResponse(t), 801 }) 802 803 config := NewConfig() 804 config.Version = V1_0_0_0 805 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) 806 if err != nil { 807 t.Fatal(err) 808 } 809 810 var value string 811 entries := make(map[string]*string) 812 value = "60000" 813 entries["retention.ms"] = &value 814 err = admin.AlterConfig(TopicResource, "my_topic", entries, false) 815 if err != nil { 816 t.Fatal(err) 817 } 818 819 err = admin.Close() 820 if err != nil { 821 t.Fatal(err) 822 } 823} 824 825func TestClusterAdminAlterConfigWithErrorCode(t *testing.T) { 826 seedBroker := NewMockBroker(t, 1) 827 defer seedBroker.Close() 828 829 seedBroker.SetHandlerByMap(map[string]MockResponse{ 830 "MetadataRequest": NewMockMetadataResponse(t). 831 SetController(seedBroker.BrokerID()). 
832 SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), 833 "AlterConfigsRequest": NewMockAlterConfigsResponseWithErrorCode(t), 834 }) 835 836 config := NewConfig() 837 config.Version = V1_0_0_0 838 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) 839 if err != nil { 840 t.Fatal(err) 841 } 842 defer func() { 843 _ = admin.Close() 844 }() 845 846 var value string 847 entries := make(map[string]*string) 848 value = "60000" 849 entries["retention.ms"] = &value 850 err = admin.AlterConfig(TopicResource, "my_topic", entries, false) 851 if err == nil { 852 t.Fatal(errors.New("ErrorCode present but no Error returned")) 853 } 854} 855 856func TestClusterAdminAlterBrokerConfig(t *testing.T) { 857 controllerBroker := NewMockBroker(t, 1) 858 defer controllerBroker.Close() 859 configBroker := NewMockBroker(t, 2) 860 defer configBroker.Close() 861 862 controllerBroker.SetHandlerByMap(map[string]MockResponse{ 863 "MetadataRequest": NewMockMetadataResponse(t). 864 SetController(controllerBroker.BrokerID()). 865 SetBroker(controllerBroker.Addr(), controllerBroker.BrokerID()). 866 SetBroker(configBroker.Addr(), configBroker.BrokerID()), 867 }) 868 configBroker.SetHandlerByMap(map[string]MockResponse{ 869 "MetadataRequest": NewMockMetadataResponse(t). 870 SetController(controllerBroker.BrokerID()). 871 SetBroker(controllerBroker.Addr(), controllerBroker.BrokerID()). 
872 SetBroker(configBroker.Addr(), configBroker.BrokerID()), 873 "AlterConfigsRequest": NewMockAlterConfigsResponse(t), 874 }) 875 876 config := NewConfig() 877 config.Version = V1_0_0_0 878 admin, err := NewClusterAdmin( 879 []string{ 880 controllerBroker.Addr(), 881 configBroker.Addr(), 882 }, config) 883 if err != nil { 884 t.Fatal(err) 885 } 886 887 var value string 888 entries := make(map[string]*string) 889 value = "3" 890 entries["min.insync.replicas"] = &value 891 892 for _, resourceType := range []ConfigResourceType{BrokerResource, BrokerLoggerResource} { 893 resource := ConfigResource{Name: "2", Type: resourceType} 894 err = admin.AlterConfig( 895 resource.Type, 896 resource.Name, 897 entries, 898 false) 899 if err != nil { 900 t.Fatal(err) 901 } 902 } 903 904 err = admin.Close() 905 if err != nil { 906 t.Fatal(err) 907 } 908} 909 910func TestClusterAdminCreateAcl(t *testing.T) { 911 seedBroker := NewMockBroker(t, 1) 912 defer seedBroker.Close() 913 914 seedBroker.SetHandlerByMap(map[string]MockResponse{ 915 "MetadataRequest": NewMockMetadataResponse(t). 916 SetController(seedBroker.BrokerID()). 917 SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), 918 "CreateAclsRequest": NewMockCreateAclsResponse(t), 919 }) 920 921 config := NewConfig() 922 config.Version = V1_0_0_0 923 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) 924 if err != nil { 925 t.Fatal(err) 926 } 927 928 r := Resource{ResourceType: AclResourceTopic, ResourceName: "my_topic"} 929 a := Acl{Host: "localhost", Operation: AclOperationAlter, PermissionType: AclPermissionAny} 930 931 err = admin.CreateACL(r, a) 932 if err != nil { 933 t.Fatal(err) 934 } 935 936 err = admin.Close() 937 if err != nil { 938 t.Fatal(err) 939 } 940} 941 942func TestClusterAdminListAcls(t *testing.T) { 943 seedBroker := NewMockBroker(t, 1) 944 defer seedBroker.Close() 945 946 seedBroker.SetHandlerByMap(map[string]MockResponse{ 947 "MetadataRequest": NewMockMetadataResponse(t). 
948 SetController(seedBroker.BrokerID()). 949 SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), 950 "DescribeAclsRequest": NewMockListAclsResponse(t), 951 "CreateAclsRequest": NewMockCreateAclsResponse(t), 952 }) 953 954 config := NewConfig() 955 config.Version = V1_0_0_0 956 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) 957 if err != nil { 958 t.Fatal(err) 959 } 960 961 r := Resource{ResourceType: AclResourceTopic, ResourceName: "my_topic"} 962 a := Acl{Host: "localhost", Operation: AclOperationAlter, PermissionType: AclPermissionAny} 963 964 err = admin.CreateACL(r, a) 965 if err != nil { 966 t.Fatal(err) 967 } 968 resourceName := "my_topic" 969 filter := AclFilter{ 970 ResourceType: AclResourceTopic, 971 Operation: AclOperationRead, 972 ResourceName: &resourceName, 973 } 974 975 rAcls, err := admin.ListAcls(filter) 976 if err != nil { 977 t.Fatal(err) 978 } 979 if len(rAcls) <= 0 { 980 t.Fatal("no acls present") 981 } 982 983 err = admin.Close() 984 if err != nil { 985 t.Fatal(err) 986 } 987} 988 989func TestClusterAdminDeleteAcl(t *testing.T) { 990 seedBroker := NewMockBroker(t, 1) 991 defer seedBroker.Close() 992 993 seedBroker.SetHandlerByMap(map[string]MockResponse{ 994 "MetadataRequest": NewMockMetadataResponse(t). 995 SetController(seedBroker.BrokerID()). 
996 SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), 997 "DeleteAclsRequest": NewMockDeleteAclsResponse(t), 998 }) 999 1000 config := NewConfig() 1001 config.Version = V1_0_0_0 1002 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) 1003 if err != nil { 1004 t.Fatal(err) 1005 } 1006 1007 resourceName := "my_topic" 1008 filter := AclFilter{ 1009 ResourceType: AclResourceTopic, 1010 Operation: AclOperationAlter, 1011 ResourceName: &resourceName, 1012 } 1013 1014 _, err = admin.DeleteACL(filter, false) 1015 if err != nil { 1016 t.Fatal(err) 1017 } 1018 1019 err = admin.Close() 1020 if err != nil { 1021 t.Fatal(err) 1022 } 1023} 1024 1025func TestDescribeTopic(t *testing.T) { 1026 seedBroker := NewMockBroker(t, 1) 1027 defer seedBroker.Close() 1028 1029 seedBroker.SetHandlerByMap(map[string]MockResponse{ 1030 "MetadataRequest": NewMockMetadataResponse(t). 1031 SetController(seedBroker.BrokerID()). 1032 SetLeader("my_topic", 0, seedBroker.BrokerID()). 1033 SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), 1034 }) 1035 1036 config := NewConfig() 1037 config.Version = V1_0_0_0 1038 1039 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) 1040 if err != nil { 1041 t.Fatal(err) 1042 } 1043 1044 topics, err := admin.DescribeTopics([]string{"my_topic"}) 1045 if err != nil { 1046 t.Fatal(err) 1047 } 1048 1049 if len(topics) != 1 { 1050 t.Fatalf("Expected 1 result, got %v", len(topics)) 1051 } 1052 1053 if topics[0].Name != "my_topic" { 1054 t.Fatalf("Incorrect topic name: %v", topics[0].Name) 1055 } 1056 1057 err = admin.Close() 1058 if err != nil { 1059 t.Fatal(err) 1060 } 1061} 1062 1063func TestDescribeTopicWithVersion0_11(t *testing.T) { 1064 seedBroker := NewMockBroker(t, 1) 1065 defer seedBroker.Close() 1066 1067 seedBroker.SetHandlerByMap(map[string]MockResponse{ 1068 "MetadataRequest": NewMockMetadataResponse(t). 1069 SetController(seedBroker.BrokerID()). 1070 SetLeader("my_topic", 0, seedBroker.BrokerID()). 
1071 SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), 1072 }) 1073 1074 config := NewConfig() 1075 config.Version = V0_11_0_0 1076 1077 admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) 1078 if err != nil { 1079 t.Fatal(err) 1080 } 1081 1082 topics, err := admin.DescribeTopics([]string{"my_topic"}) 1083 if err != nil { 1084 t.Fatal(err) 1085 } 1086 1087 if len(topics) != 1 { 1088 t.Fatalf("Expected 1 result, got %v", len(topics)) 1089 } 1090 1091 if topics[0].Name != "my_topic" { 1092 t.Fatalf("Incorrect topic name: %v", topics[0].Name) 1093 } 1094 1095 err = admin.Close() 1096 if err != nil { 1097 t.Fatal(err) 1098 } 1099} 1100

// TestDescribeConsumerGroup checks that DescribeConsumerGroups returns exactly
// one description, carrying the requested group ID, when the mock broker is
// configured to describe that group.
func TestDescribeConsumerGroup(t *testing.T) {
	seedBroker := NewMockBroker(t, 1)
	defer seedBroker.Close()

	expectedGroupID := "my-group"

	seedBroker.SetHandlerByMap(map[string]MockResponse{
		"DescribeGroupsRequest": NewMockDescribeGroupsResponse(t).AddGroupDescription(expectedGroupID, &GroupDescription{
			GroupId: expectedGroupID,
		}),
		"MetadataRequest": NewMockMetadataResponse(t).
			SetController(seedBroker.BrokerID()).
			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
		"FindCoordinatorRequest": NewMockFindCoordinatorResponse(t).SetCoordinator(CoordinatorGroup, expectedGroupID, seedBroker),
	})

	config := NewConfig()
	config.Version = V1_0_0_0

	admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
	if err != nil {
		t.Fatal(err)
	}

	result, err := admin.DescribeConsumerGroups([]string{expectedGroupID})
	if err != nil {
		t.Fatal(err)
	}

	if len(result) != 1 {
		t.Fatalf("Expected 1 result, got %v", len(result))
	}

	if result[0].GroupId != expectedGroupID {
		t.Fatalf("Expected groupID %v, got %v", expectedGroupID, result[0].GroupId)
	}

	err = admin.Close()
	if err != nil {
		t.Fatal(err)
	}
}

// TestListConsumerGroups checks that ListConsumerGroups returns the single
// group served by the mock broker, keyed by group name and mapped to its
// protocol type.
func TestListConsumerGroups(t *testing.T) {
	seedBroker := NewMockBroker(t, 1)
	defer seedBroker.Close()

	seedBroker.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": NewMockMetadataResponse(t).
			SetController(seedBroker.BrokerID()).
			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
		"ListGroupsRequest": NewMockListGroupsResponse(t).
			AddGroup("my-group", "consumer"),
	})

	config := NewConfig()
	config.Version = V1_0_0_0

	admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
	if err != nil {
		t.Fatal(err)
	}

	groups, err := admin.ListConsumerGroups()
	if err != nil {
		t.Fatal(err)
	}

	if len(groups) != 1 {
		t.Fatalf("Expected %v results, got %v", 1, len(groups))
	}

	protocolType, ok := groups["my-group"]

	if !ok {
		t.Fatal("Expected group to be returned, but it did not")
	}

	if protocolType != "consumer" {
		t.Fatalf("Expected protocolType %v, got %v", "consumer", protocolType)
	}

	err = admin.Close()
	if err != nil {
		t.Fatal(err)
	}
}

// TestListConsumerGroupsMultiBroker sets up two mock brokers that each report
// a different consumer group and expects ListConsumerGroups to return both
// groups, and nothing for a group that neither broker reports.
func TestListConsumerGroupsMultiBroker(t *testing.T) {
	seedBroker := NewMockBroker(t, 1)
	defer seedBroker.Close()

	secondBroker := NewMockBroker(t, 2)
	defer secondBroker.Close()

	firstGroup := "first"
	secondGroup := "second"
	nonExistingGroup := "non-existing-group"

	seedBroker.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": NewMockMetadataResponse(t).
			SetController(seedBroker.BrokerID()).
			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
			SetBroker(secondBroker.Addr(), secondBroker.BrokerID()),
		"ListGroupsRequest": NewMockListGroupsResponse(t).
			AddGroup(firstGroup, "consumer"),
	})

	secondBroker.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": NewMockMetadataResponse(t).
			SetController(seedBroker.BrokerID()).
			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
			SetBroker(secondBroker.Addr(), secondBroker.BrokerID()),
		"ListGroupsRequest": NewMockListGroupsResponse(t).
			AddGroup(secondGroup, "consumer"),
	})

	config := NewConfig()
	config.Version = V1_0_0_0

	admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
	if err != nil {
		t.Fatal(err)
	}

	groups, err := admin.ListConsumerGroups()
	if err != nil {
		t.Fatal(err)
	}

	// One group per broker, so two results are expected; the failure message
	// must report the same count the condition checks.
	if len(groups) != 2 {
		t.Fatalf("Expected %v results, got %v", 2, len(groups))
	}

	if _, found := groups[firstGroup]; !found {
		t.Fatalf("Expected group %v to be present in result set, but it isn't", firstGroup)
	}

	if _, found := groups[secondGroup]; !found {
		t.Fatalf("Expected group %v to be present in result set, but it isn't", secondGroup)
	}

	if _, found := groups[nonExistingGroup]; found {
		t.Fatalf("Expected group %v to not exist, but it exists", nonExistingGroup)
	}

	err = admin.Close()
	if err != nil {
		t.Fatal(err)
	}
}

// TestListConsumerGroupOffsets checks that ListConsumerGroupOffsets returns a
// block for the requested topic/partition whose offset matches the one the
// mock broker was configured with.
func TestListConsumerGroupOffsets(t *testing.T) {
	seedBroker := NewMockBroker(t, 1)
	defer seedBroker.Close()

	group := "my-group"
	topic := "my-topic"
	partition := int32(0)
	expectedOffset := int64(0)

	seedBroker.SetHandlerByMap(map[string]MockResponse{
		"OffsetFetchRequest": NewMockOffsetFetchResponse(t).SetOffset(group, topic, partition, expectedOffset, "", ErrNoError).SetError(ErrNoError),
		"MetadataRequest": NewMockMetadataResponse(t).
			SetController(seedBroker.BrokerID()).
			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
		"FindCoordinatorRequest": NewMockFindCoordinatorResponse(t).SetCoordinator(CoordinatorGroup, group, seedBroker),
	})

	config := NewConfig()
	config.Version = V1_0_0_0

	admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
	if err != nil {
		t.Fatal(err)
	}

	response, err := admin.ListConsumerGroupOffsets(group, map[string][]int32{
		topic: {partition},
	})
	if err != nil {
		t.Fatalf("ListConsumerGroupOffsets failed with error %v", err)
	}

	block := response.GetBlock(topic, partition)
	if block == nil {
		t.Fatalf("Expected block for topic %v and partition %v to exist, but it doesn't", topic, partition)
	}

	if block.Offset != expectedOffset {
		t.Fatalf("Expected offset %v, got %v", expectedOffset, block.Offset)
	}

	err = admin.Close()
	if err != nil {
		t.Fatal(err)
	}
}

// TestDeleteConsumerGroup checks that DeleteConsumerGroup succeeds when the
// mock broker reports the group as deleted.
func TestDeleteConsumerGroup(t *testing.T) {
	seedBroker := NewMockBroker(t, 1)
	defer seedBroker.Close()

	group := "my-group"

	seedBroker.SetHandlerByMap(map[string]MockResponse{
		"DeleteGroupsRequest": NewMockDeleteGroupsRequest(t).SetDeletedGroups([]string{group}),
		"MetadataRequest": NewMockMetadataResponse(t).
			SetController(seedBroker.BrokerID()).
			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
		"FindCoordinatorRequest": NewMockFindCoordinatorResponse(t).SetCoordinator(CoordinatorGroup, group, seedBroker),
	})

	config := NewConfig()
	config.Version = V1_1_0_0

	admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
	if err != nil {
		t.Fatal(err)
	}

	err = admin.DeleteConsumerGroup(group)
	if err != nil {
		t.Fatalf("DeleteConsumerGroup failed with error %v", err)
	}

	// Close the admin like the other tests in this file do, so the test does
	// not leave its client open.
	err = admin.Close()
	if err != nil {
		t.Fatal(err)
	}
}

// TestRefreshMetaDataWithDifferentController ensures that the cached
// controller can be forcibly updated from Metadata by the admin client
func TestRefreshMetaDataWithDifferentController(t *testing.T) {
	seedBroker1 := NewMockBroker(t, 1)
	seedBroker2 := NewMockBroker(t, 2)
	defer seedBroker1.Close()
	defer seedBroker2.Close()

	seedBroker1.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": NewMockMetadataResponse(t).
			SetController(seedBroker1.BrokerID()).
			SetBroker(seedBroker1.Addr(), seedBroker1.BrokerID()).
			SetBroker(seedBroker2.Addr(), seedBroker2.BrokerID()),
	})

	config := NewConfig()
	config.Version = V1_1_0_0

	client, err := NewClient([]string{seedBroker1.Addr()}, config)
	if err != nil {
		t.Fatal(err)
	}
	// Deferred last, so it runs before the brokers are shut down.
	defer func() {
		if err := client.Close(); err != nil {
			t.Error(err)
		}
	}()

	ca := clusterAdmin{client: client, conf: config}

	// Check the error before touching the broker: on failure the broker is
	// nil and calling ID() on it would panic instead of failing the test.
	b, err := ca.Controller()
	if err != nil {
		t.Fatal(err)
	}
	if seedBroker1.BrokerID() != b.ID() {
		t.Fatalf("expected cached controller to be %d rather than %d",
			seedBroker1.BrokerID(), b.ID())
	}

	// Re-point the metadata response at the second broker as controller.
	seedBroker1.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": NewMockMetadataResponse(t).
			SetController(seedBroker2.BrokerID()).
			SetBroker(seedBroker1.Addr(), seedBroker1.BrokerID()).
			SetBroker(seedBroker2.Addr(), seedBroker2.BrokerID()),
	})

	b, err = ca.refreshController()
	if err != nil {
		t.Fatal(err)
	}
	if seedBroker2.BrokerID() != b.ID() {
		t.Fatalf("expected refreshed controller to be %d rather than %d",
			seedBroker2.BrokerID(), b.ID())
	}

	b, err = ca.Controller()
	if err != nil {
		t.Fatal(err)
	}
	if seedBroker2.BrokerID() != b.ID() {
		t.Fatalf("expected cached controller to be %d rather than %d",
			seedBroker2.BrokerID(), b.ID())
	}
}

// TestDescribeLogDirs checks that DescribeLogDirs returns, for the single mock
// broker, one log directory with no error code, the configured path and the
// two configured topics.
func TestDescribeLogDirs(t *testing.T) {
	seedBroker := NewMockBroker(t, 1)
	defer seedBroker.Close()

	seedBroker.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": NewMockMetadataResponse(t).
			SetController(seedBroker.BrokerID()).
			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
		"DescribeLogDirsRequest": NewMockDescribeLogDirsResponse(t).
			SetLogDirs("/tmp/logs", map[string]int{"topic1": 2, "topic2": 2}),
	})

	config := NewConfig()
	config.Version = V1_0_0_0

	admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
	if err != nil {
		t.Fatal(err)
	}

	logDirsPerBroker, err := admin.DescribeLogDirs([]int32{seedBroker.BrokerID()})
	if err != nil {
		t.Fatal(err)
	}

	if len(logDirsPerBroker) != 1 {
		t.Fatalf("Expected %v results, got %v", 1, len(logDirsPerBroker))
	}
	logDirs := logDirsPerBroker[seedBroker.BrokerID()]
	if len(logDirs) != 1 {
		t.Fatalf("Expected log dirs for broker %v to be returned, but it did not, got %v", seedBroker.BrokerID(), len(logDirs))
	}
	logDirsBroker := logDirs[0]
	if logDirsBroker.ErrorCode != ErrNoError {
		t.Fatalf("Expected no error for broker %v, but it was %v", seedBroker.BrokerID(), logDirsBroker.ErrorCode)
	}
	if logDirsBroker.Path != "/tmp/logs" {
		t.Fatalf("Expected log dirs for broker %v to be '/tmp/logs', but it was %v", seedBroker.BrokerID(), logDirsBroker.Path)
	}
	if len(logDirsBroker.Topics) != 2 {
		t.Fatalf("Expected log dirs for broker %v to have 2 topics, but it had %v", seedBroker.BrokerID(), len(logDirsBroker.Topics))
	}
	err = admin.Close()
	if err != nil {
		t.Fatal(err)
	}
}