1package sarama
2
3import (
4	"errors"
5	"strings"
6	"testing"
7)
8
9func TestClusterAdmin(t *testing.T) {
10	seedBroker := NewMockBroker(t, 1)
11	defer seedBroker.Close()
12
13	seedBroker.SetHandlerByMap(map[string]MockResponse{
14		"MetadataRequest": NewMockMetadataResponse(t).
15			SetController(seedBroker.BrokerID()).
16			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
17	})
18
19	config := NewConfig()
20	config.Version = V1_0_0_0
21	admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
22	if err != nil {
23		t.Fatal(err)
24	}
25
26	err = admin.Close()
27	if err != nil {
28		t.Fatal(err)
29	}
30}
31
32func TestClusterAdminInvalidController(t *testing.T) {
33	seedBroker := NewMockBroker(t, 1)
34	defer seedBroker.Close()
35
36	seedBroker.SetHandlerByMap(map[string]MockResponse{
37		"MetadataRequest": NewMockMetadataResponse(t).
38			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
39	})
40
41	config := NewConfig()
42	config.Version = V1_0_0_0
43	_, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
44	if err == nil {
45		t.Fatal(errors.New("controller not set still cluster admin was created"))
46	}
47
48	if err != ErrControllerNotAvailable {
49		t.Fatal(err)
50	}
51}
52
53func TestClusterAdminCreateTopic(t *testing.T) {
54	seedBroker := NewMockBroker(t, 1)
55	defer seedBroker.Close()
56
57	seedBroker.SetHandlerByMap(map[string]MockResponse{
58		"MetadataRequest": NewMockMetadataResponse(t).
59			SetController(seedBroker.BrokerID()).
60			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
61		"CreateTopicsRequest": NewMockCreateTopicsResponse(t),
62	})
63
64	config := NewConfig()
65	config.Version = V0_10_2_0
66	admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
67	if err != nil {
68		t.Fatal(err)
69	}
70	err = admin.CreateTopic("my_topic", &TopicDetail{NumPartitions: 1, ReplicationFactor: 1}, false)
71	if err != nil {
72		t.Fatal(err)
73	}
74
75	err = admin.Close()
76	if err != nil {
77		t.Fatal(err)
78	}
79}
80
81func TestClusterAdminCreateTopicWithInvalidTopicDetail(t *testing.T) {
82	seedBroker := NewMockBroker(t, 1)
83	defer seedBroker.Close()
84
85	seedBroker.SetHandlerByMap(map[string]MockResponse{
86		"MetadataRequest": NewMockMetadataResponse(t).
87			SetController(seedBroker.BrokerID()).
88			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
89		"CreateTopicsRequest": NewMockCreateTopicsResponse(t),
90	})
91
92	config := NewConfig()
93	config.Version = V0_10_2_0
94	admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
95	if err != nil {
96		t.Fatal(err)
97	}
98
99	err = admin.CreateTopic("my_topic", nil, false)
100	if err.Error() != "you must specify topic details" {
101		t.Fatal(err)
102	}
103	err = admin.Close()
104	if err != nil {
105		t.Fatal(err)
106	}
107}
108
109func TestClusterAdminCreateTopicWithoutAuthorization(t *testing.T) {
110	seedBroker := NewMockBroker(t, 1)
111	defer seedBroker.Close()
112
113	seedBroker.SetHandlerByMap(map[string]MockResponse{
114		"MetadataRequest": NewMockMetadataResponse(t).
115			SetController(seedBroker.BrokerID()).
116			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
117		"CreateTopicsRequest": NewMockCreateTopicsResponse(t),
118	})
119
120	config := NewConfig()
121	config.Version = V0_11_0_0
122
123	admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
124	if err != nil {
125		t.Fatal(err)
126	}
127
128	err = admin.CreateTopic("_internal_topic", &TopicDetail{NumPartitions: 1, ReplicationFactor: 1}, false)
129	want := "insufficient permissions to create topic with reserved prefix"
130	if !strings.HasSuffix(err.Error(), want) {
131		t.Fatal(err)
132	}
133	err = admin.Close()
134	if err != nil {
135		t.Fatal(err)
136	}
137}
138
139func TestClusterAdminListTopics(t *testing.T) {
140	seedBroker := NewMockBroker(t, 1)
141	defer seedBroker.Close()
142
143	seedBroker.SetHandlerByMap(map[string]MockResponse{
144		"MetadataRequest": NewMockMetadataResponse(t).
145			SetController(seedBroker.BrokerID()).
146			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
147			SetLeader("my_topic", 0, seedBroker.BrokerID()),
148		"DescribeConfigsRequest": NewMockDescribeConfigsResponse(t),
149	})
150
151	config := NewConfig()
152	config.Version = V1_1_0_0
153	admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
154	if err != nil {
155		t.Fatal(err)
156	}
157
158	entries, err := admin.ListTopics()
159	if err != nil {
160		t.Fatal(err)
161	}
162
163	if len(entries) <= 0 {
164		t.Fatal(errors.New("no resource present"))
165	}
166
167	topic, found := entries["my_topic"]
168	if !found {
169		t.Fatal(errors.New("topic not found in response"))
170	}
171	_, found = topic.ConfigEntries["max.message.bytes"]
172	if found {
173		t.Fatal(errors.New("default topic config entry incorrectly found in response"))
174	}
175	value := topic.ConfigEntries["retention.ms"]
176	if value == nil || *value != "5000" {
177		t.Fatal(errors.New("non-default topic config entry not found in response"))
178	}
179
180	err = admin.Close()
181	if err != nil {
182		t.Fatal(err)
183	}
184
185	if topic.ReplicaAssignment == nil || topic.ReplicaAssignment[0][0] != 1 {
186		t.Fatal(errors.New("replica assignment not found in response"))
187	}
188}
189
190func TestClusterAdminDeleteTopic(t *testing.T) {
191	seedBroker := NewMockBroker(t, 1)
192	defer seedBroker.Close()
193
194	seedBroker.SetHandlerByMap(map[string]MockResponse{
195		"MetadataRequest": NewMockMetadataResponse(t).
196			SetController(seedBroker.BrokerID()).
197			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
198		"DeleteTopicsRequest": NewMockDeleteTopicsResponse(t),
199	})
200
201	config := NewConfig()
202	config.Version = V0_10_2_0
203	admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
204	if err != nil {
205		t.Fatal(err)
206	}
207
208	err = admin.DeleteTopic("my_topic")
209	if err != nil {
210		t.Fatal(err)
211	}
212
213	err = admin.Close()
214	if err != nil {
215		t.Fatal(err)
216	}
217}
218
219func TestClusterAdminDeleteEmptyTopic(t *testing.T) {
220	seedBroker := NewMockBroker(t, 1)
221	defer seedBroker.Close()
222
223	seedBroker.SetHandlerByMap(map[string]MockResponse{
224		"MetadataRequest": NewMockMetadataResponse(t).
225			SetController(seedBroker.BrokerID()).
226			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
227		"DeleteTopicsRequest": NewMockDeleteTopicsResponse(t),
228	})
229
230	config := NewConfig()
231	config.Version = V0_10_2_0
232	admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
233	if err != nil {
234		t.Fatal(err)
235	}
236
237	err = admin.DeleteTopic("")
238	if err != ErrInvalidTopic {
239		t.Fatal(err)
240	}
241
242	err = admin.Close()
243	if err != nil {
244		t.Fatal(err)
245	}
246}
247
248func TestClusterAdminCreatePartitions(t *testing.T) {
249	seedBroker := NewMockBroker(t, 1)
250	defer seedBroker.Close()
251
252	seedBroker.SetHandlerByMap(map[string]MockResponse{
253		"MetadataRequest": NewMockMetadataResponse(t).
254			SetController(seedBroker.BrokerID()).
255			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
256		"CreatePartitionsRequest": NewMockCreatePartitionsResponse(t),
257	})
258
259	config := NewConfig()
260	config.Version = V1_0_0_0
261	admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
262	if err != nil {
263		t.Fatal(err)
264	}
265
266	err = admin.CreatePartitions("my_topic", 3, nil, false)
267	if err != nil {
268		t.Fatal(err)
269	}
270
271	err = admin.Close()
272	if err != nil {
273		t.Fatal(err)
274	}
275}
276
277func TestClusterAdminCreatePartitionsWithDiffVersion(t *testing.T) {
278	seedBroker := NewMockBroker(t, 1)
279	defer seedBroker.Close()
280
281	seedBroker.SetHandlerByMap(map[string]MockResponse{
282		"MetadataRequest": NewMockMetadataResponse(t).
283			SetController(seedBroker.BrokerID()).
284			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
285		"CreatePartitionsRequest": NewMockCreatePartitionsResponse(t),
286	})
287
288	config := NewConfig()
289	config.Version = V0_10_2_0
290	admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
291	if err != nil {
292		t.Fatal(err)
293	}
294
295	err = admin.CreatePartitions("my_topic", 3, nil, false)
296	if err != ErrUnsupportedVersion {
297		t.Fatal(err)
298	}
299
300	err = admin.Close()
301	if err != nil {
302		t.Fatal(err)
303	}
304}
305
306func TestClusterAdminCreatePartitionsWithoutAuthorization(t *testing.T) {
307	seedBroker := NewMockBroker(t, 1)
308	defer seedBroker.Close()
309
310	seedBroker.SetHandlerByMap(map[string]MockResponse{
311		"MetadataRequest": NewMockMetadataResponse(t).
312			SetController(seedBroker.BrokerID()).
313			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
314		"CreatePartitionsRequest": NewMockCreatePartitionsResponse(t),
315	})
316
317	config := NewConfig()
318	config.Version = V1_0_0_0
319	admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
320	if err != nil {
321		t.Fatal(err)
322	}
323
324	err = admin.CreatePartitions("_internal_topic", 3, nil, false)
325	want := "insufficient permissions to create partition on topic with reserved prefix"
326	if !strings.HasSuffix(err.Error(), want) {
327		t.Fatal(err)
328	}
329	err = admin.Close()
330	if err != nil {
331		t.Fatal(err)
332	}
333}
334
335func TestClusterAdminAlterPartitionReassignments(t *testing.T) {
336	seedBroker := NewMockBroker(t, 1)
337	defer seedBroker.Close()
338
339	secondBroker := NewMockBroker(t, 2)
340	defer secondBroker.Close()
341
342	seedBroker.SetHandlerByMap(map[string]MockResponse{
343		"MetadataRequest": NewMockMetadataResponse(t).
344			SetController(secondBroker.BrokerID()).
345			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
346			SetBroker(secondBroker.Addr(), secondBroker.BrokerID()),
347	})
348
349	secondBroker.SetHandlerByMap(map[string]MockResponse{
350		"AlterPartitionReassignmentsRequest": NewMockAlterPartitionReassignmentsResponse(t),
351	})
352
353	config := NewConfig()
354	config.Version = V2_4_0_0
355	admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
356	if err != nil {
357		t.Fatal(err)
358	}
359
360	var topicAssignment = make([][]int32, 0, 3)
361
362	err = admin.AlterPartitionReassignments("my_topic", topicAssignment)
363	if err != nil {
364		t.Fatal(err)
365	}
366
367	err = admin.Close()
368	if err != nil {
369		t.Fatal(err)
370	}
371}
372
373func TestClusterAdminAlterPartitionReassignmentsWithDiffVersion(t *testing.T) {
374	seedBroker := NewMockBroker(t, 1)
375	defer seedBroker.Close()
376
377	secondBroker := NewMockBroker(t, 2)
378	defer secondBroker.Close()
379
380	seedBroker.SetHandlerByMap(map[string]MockResponse{
381		"MetadataRequest": NewMockMetadataResponse(t).
382			SetController(secondBroker.BrokerID()).
383			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
384			SetBroker(secondBroker.Addr(), secondBroker.BrokerID()),
385	})
386
387	secondBroker.SetHandlerByMap(map[string]MockResponse{
388		"AlterPartitionReassignmentsRequest": NewMockAlterPartitionReassignmentsResponse(t),
389	})
390
391	config := NewConfig()
392	config.Version = V2_3_0_0
393	admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
394	if err != nil {
395		t.Fatal(err)
396	}
397
398	var topicAssignment = make([][]int32, 0, 3)
399
400	err = admin.AlterPartitionReassignments("my_topic", topicAssignment)
401
402	if !strings.ContainsAny(err.Error(), ErrUnsupportedVersion.Error()) {
403		t.Fatal(err)
404	}
405
406	err = admin.Close()
407	if err != nil {
408		t.Fatal(err)
409	}
410}
411
412func TestClusterAdminListPartitionReassignments(t *testing.T) {
413	seedBroker := NewMockBroker(t, 1)
414	defer seedBroker.Close()
415
416	secondBroker := NewMockBroker(t, 2)
417	defer secondBroker.Close()
418
419	seedBroker.SetHandlerByMap(map[string]MockResponse{
420		"MetadataRequest": NewMockMetadataResponse(t).
421			SetController(secondBroker.BrokerID()).
422			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
423			SetBroker(secondBroker.Addr(), secondBroker.BrokerID()),
424	})
425
426	secondBroker.SetHandlerByMap(map[string]MockResponse{
427		"ListPartitionReassignmentsRequest": NewMockListPartitionReassignmentsResponse(t),
428	})
429
430	config := NewConfig()
431	config.Version = V2_4_0_0
432	admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
433	if err != nil {
434		t.Fatal(err)
435	}
436
437	response, err := admin.ListPartitionReassignments("my_topic", []int32{0, 1})
438	if err != nil {
439		t.Fatal(err)
440	}
441
442	partitionStatus, ok := response["my_topic"]
443	if !ok {
444		t.Fatalf("topic missing in response")
445	} else {
446		if len(partitionStatus) != 2 {
447			t.Fatalf("partition missing in response")
448		}
449	}
450
451	err = admin.Close()
452	if err != nil {
453		t.Fatal(err)
454	}
455}
456
457func TestClusterAdminListPartitionReassignmentsWithDiffVersion(t *testing.T) {
458	seedBroker := NewMockBroker(t, 1)
459	defer seedBroker.Close()
460
461	secondBroker := NewMockBroker(t, 2)
462	defer secondBroker.Close()
463
464	seedBroker.SetHandlerByMap(map[string]MockResponse{
465		"MetadataRequest": NewMockMetadataResponse(t).
466			SetController(secondBroker.BrokerID()).
467			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
468			SetBroker(secondBroker.Addr(), secondBroker.BrokerID()),
469	})
470
471	secondBroker.SetHandlerByMap(map[string]MockResponse{
472		"ListPartitionReassignmentsRequest": NewMockListPartitionReassignmentsResponse(t),
473	})
474
475	config := NewConfig()
476	config.Version = V2_3_0_0
477	admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
478	if err != nil {
479		t.Fatal(err)
480	}
481
482	var partitions = make([]int32, 0)
483
484	_, err = admin.ListPartitionReassignments("my_topic", partitions)
485
486	if !strings.ContainsAny(err.Error(), ErrUnsupportedVersion.Error()) {
487		t.Fatal(err)
488	}
489
490	err = admin.Close()
491	if err != nil {
492		t.Fatal(err)
493	}
494}
495
496func TestClusterAdminDeleteRecords(t *testing.T) {
497	topicName := "my_topic"
498	seedBroker := NewMockBroker(t, 1)
499	defer seedBroker.Close()
500
501	seedBroker.SetHandlerByMap(map[string]MockResponse{
502		"MetadataRequest": NewMockMetadataResponse(t).
503			SetController(seedBroker.BrokerID()).
504			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
505			SetLeader(topicName, 1, 1).
506			SetLeader(topicName, 2, 1).
507			SetLeader(topicName, 3, 1),
508		"DeleteRecordsRequest": NewMockDeleteRecordsResponse(t),
509	})
510
511	config := NewConfig()
512	config.Version = V1_0_0_0
513	admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
514	if err != nil {
515		t.Fatal(err)
516	}
517
518	partitionOffsetFake := make(map[int32]int64)
519	partitionOffsetFake[4] = 1000
520	errFake := admin.DeleteRecords(topicName, partitionOffsetFake)
521	if errFake == nil {
522		t.Fatal(err)
523	}
524
525	partitionOffset := make(map[int32]int64)
526	partitionOffset[1] = 1000
527	partitionOffset[2] = 1000
528	partitionOffset[3] = 1000
529
530	err = admin.DeleteRecords(topicName, partitionOffset)
531	if err != nil {
532		t.Fatal(err)
533	}
534
535	err = admin.Close()
536	if err != nil {
537		t.Fatal(err)
538	}
539}
540
541func TestClusterAdminDeleteRecordsWithInCorrectBroker(t *testing.T) {
542	topicName := "my_topic"
543	seedBroker := NewMockBroker(t, 1)
544	secondBroker := NewMockBroker(t, 2)
545	defer seedBroker.Close()
546	defer secondBroker.Close()
547
548	seedBroker.SetHandlerByMap(map[string]MockResponse{
549		"MetadataRequest": NewMockMetadataResponse(t).
550			SetController(seedBroker.BrokerID()).
551			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
552			SetBroker(secondBroker.Addr(), secondBroker.brokerID).
553			SetLeader(topicName, 1, 1).
554			SetLeader(topicName, 2, 1).
555			SetLeader(topicName, 3, 2),
556		"DeleteRecordsRequest": NewMockDeleteRecordsResponse(t),
557	})
558
559	secondBroker.SetHandlerByMap(map[string]MockResponse{
560		"MetadataRequest": NewMockMetadataResponse(t).
561			SetController(seedBroker.BrokerID()).
562			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
563			SetBroker(secondBroker.Addr(), secondBroker.brokerID).
564			SetLeader(topicName, 1, 1).
565			SetLeader(topicName, 2, 1).
566			SetLeader(topicName, 3, 2),
567		"DeleteRecordsRequest": NewMockDeleteRecordsResponse(t),
568	})
569
570	config := NewConfig()
571	config.Version = V1_0_0_0
572	admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
573	if err != nil {
574		t.Fatal(err)
575	}
576	partitionOffset := make(map[int32]int64)
577	partitionOffset[1] = 1000
578	partitionOffset[2] = 1000
579	partitionOffset[3] = 1000
580
581	err = admin.DeleteRecords(topicName, partitionOffset)
582	if err != nil {
583		t.Fatal(err)
584	}
585
586	err = admin.Close()
587	if err != nil {
588		t.Fatal(err)
589	}
590}
591
592func TestClusterAdminDeleteRecordsWithDiffVersion(t *testing.T) {
593	topicName := "my_topic"
594	seedBroker := NewMockBroker(t, 1)
595	defer seedBroker.Close()
596
597	seedBroker.SetHandlerByMap(map[string]MockResponse{
598		"MetadataRequest": NewMockMetadataResponse(t).
599			SetController(seedBroker.BrokerID()).
600			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
601			SetLeader(topicName, 1, 1).
602			SetLeader(topicName, 2, 1).
603			SetLeader(topicName, 3, 1),
604		"DeleteRecordsRequest": NewMockDeleteRecordsResponse(t),
605	})
606
607	config := NewConfig()
608	config.Version = V0_10_2_0
609	admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
610	if err != nil {
611		t.Fatal(err)
612	}
613
614	partitionOffset := make(map[int32]int64)
615	partitionOffset[1] = 1000
616	partitionOffset[2] = 1000
617	partitionOffset[3] = 1000
618
619	err = admin.DeleteRecords(topicName, partitionOffset)
620	if !strings.HasPrefix(err.Error(), "kafka server: failed to delete records") {
621		t.Fatal(err)
622	}
623	deleteRecordsError, ok := err.(ErrDeleteRecords)
624
625	if !ok {
626		t.Fatal(err)
627	}
628
629	for _, err := range *deleteRecordsError.Errors {
630		if err != ErrUnsupportedVersion {
631			t.Fatal(err)
632		}
633	}
634
635	err = admin.Close()
636	if err != nil {
637		t.Fatal(err)
638	}
639}
640
641func TestClusterAdminDescribeConfig(t *testing.T) {
642	seedBroker := NewMockBroker(t, 1)
643	defer seedBroker.Close()
644
645	seedBroker.SetHandlerByMap(map[string]MockResponse{
646		"MetadataRequest": NewMockMetadataResponse(t).
647			SetController(seedBroker.BrokerID()).
648			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
649		"DescribeConfigsRequest": NewMockDescribeConfigsResponse(t),
650	})
651
652	var tests = []struct {
653		saramaVersion   KafkaVersion
654		requestVersion  int16
655		includeSynonyms bool
656	}{
657		{V1_0_0_0, 0, false},
658		{V1_1_0_0, 1, true},
659		{V1_1_1_0, 1, true},
660		{V2_0_0_0, 2, true},
661	}
662	for _, tt := range tests {
663		config := NewConfig()
664		config.Version = tt.saramaVersion
665		admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
666		if err != nil {
667			t.Fatal(err)
668		}
669		defer func() {
670			_ = admin.Close()
671		}()
672
673		resource := ConfigResource{
674			Name:        "r1",
675			Type:        TopicResource,
676			ConfigNames: []string{"my_topic"},
677		}
678
679		entries, err := admin.DescribeConfig(resource)
680		if err != nil {
681			t.Fatal(err)
682		}
683
684		history := seedBroker.History()
685		describeReq, ok := history[len(history)-1].Request.(*DescribeConfigsRequest)
686		if !ok {
687			t.Fatal("failed to find DescribeConfigsRequest in mockBroker history")
688		}
689
690		if describeReq.Version != tt.requestVersion {
691			t.Fatalf(
692				"requestVersion %v did not match expected %v",
693				describeReq.Version, tt.requestVersion)
694		}
695
696		if len(entries) <= 0 {
697			t.Fatal(errors.New("no resource present"))
698		}
699		if tt.includeSynonyms {
700			if len(entries[0].Synonyms) == 0 {
701				t.Fatal("expected synonyms to have been included")
702			}
703		}
704	}
705}
706
707func TestClusterAdminDescribeConfigWithErrorCode(t *testing.T) {
708	seedBroker := NewMockBroker(t, 1)
709	defer seedBroker.Close()
710
711	seedBroker.SetHandlerByMap(map[string]MockResponse{
712		"MetadataRequest": NewMockMetadataResponse(t).
713			SetController(seedBroker.BrokerID()).
714			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
715		"DescribeConfigsRequest": NewMockDescribeConfigsResponseWithErrorCode(t),
716	})
717
718	config := NewConfig()
719	config.Version = V1_1_0_0
720	admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
721	if err != nil {
722		t.Fatal(err)
723	}
724	defer func() {
725		_ = admin.Close()
726	}()
727
728	resource := ConfigResource{
729		Name:        "r1",
730		Type:        TopicResource,
731		ConfigNames: []string{"my_topic"},
732	}
733
734	_, err = admin.DescribeConfig(resource)
735	if err == nil {
736		t.Fatal(errors.New("ErrorCode present but no Error returned"))
737	}
738}
739
740// TestClusterAdminDescribeBrokerConfig ensures that a describe broker config
741// is sent to the broker in the resource struct, _not_ the controller
742func TestClusterAdminDescribeBrokerConfig(t *testing.T) {
743	controllerBroker := NewMockBroker(t, 1)
744	defer controllerBroker.Close()
745	configBroker := NewMockBroker(t, 2)
746	defer configBroker.Close()
747
748	controllerBroker.SetHandlerByMap(map[string]MockResponse{
749		"MetadataRequest": NewMockMetadataResponse(t).
750			SetController(controllerBroker.BrokerID()).
751			SetBroker(controllerBroker.Addr(), controllerBroker.BrokerID()).
752			SetBroker(configBroker.Addr(), configBroker.BrokerID()),
753	})
754
755	configBroker.SetHandlerByMap(map[string]MockResponse{
756		"MetadataRequest": NewMockMetadataResponse(t).
757			SetController(controllerBroker.BrokerID()).
758			SetBroker(controllerBroker.Addr(), controllerBroker.BrokerID()).
759			SetBroker(configBroker.Addr(), configBroker.BrokerID()),
760		"DescribeConfigsRequest": NewMockDescribeConfigsResponse(t),
761	})
762
763	config := NewConfig()
764	config.Version = V1_0_0_0
765	admin, err := NewClusterAdmin(
766		[]string{
767			controllerBroker.Addr(),
768			configBroker.Addr(),
769		}, config)
770	if err != nil {
771		t.Fatal(err)
772	}
773
774	for _, resourceType := range []ConfigResourceType{BrokerResource, BrokerLoggerResource} {
775		resource := ConfigResource{Name: "2", Type: resourceType}
776		entries, err := admin.DescribeConfig(resource)
777		if err != nil {
778			t.Fatal(err)
779		}
780
781		if len(entries) <= 0 {
782			t.Fatal(errors.New("no resource present"))
783		}
784	}
785
786	err = admin.Close()
787	if err != nil {
788		t.Fatal(err)
789	}
790}
791
792func TestClusterAdminAlterConfig(t *testing.T) {
793	seedBroker := NewMockBroker(t, 1)
794	defer seedBroker.Close()
795
796	seedBroker.SetHandlerByMap(map[string]MockResponse{
797		"MetadataRequest": NewMockMetadataResponse(t).
798			SetController(seedBroker.BrokerID()).
799			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
800		"AlterConfigsRequest": NewMockAlterConfigsResponse(t),
801	})
802
803	config := NewConfig()
804	config.Version = V1_0_0_0
805	admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
806	if err != nil {
807		t.Fatal(err)
808	}
809
810	var value string
811	entries := make(map[string]*string)
812	value = "60000"
813	entries["retention.ms"] = &value
814	err = admin.AlterConfig(TopicResource, "my_topic", entries, false)
815	if err != nil {
816		t.Fatal(err)
817	}
818
819	err = admin.Close()
820	if err != nil {
821		t.Fatal(err)
822	}
823}
824
825func TestClusterAdminAlterConfigWithErrorCode(t *testing.T) {
826	seedBroker := NewMockBroker(t, 1)
827	defer seedBroker.Close()
828
829	seedBroker.SetHandlerByMap(map[string]MockResponse{
830		"MetadataRequest": NewMockMetadataResponse(t).
831			SetController(seedBroker.BrokerID()).
832			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
833		"AlterConfigsRequest": NewMockAlterConfigsResponseWithErrorCode(t),
834	})
835
836	config := NewConfig()
837	config.Version = V1_0_0_0
838	admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
839	if err != nil {
840		t.Fatal(err)
841	}
842	defer func() {
843		_ = admin.Close()
844	}()
845
846	var value string
847	entries := make(map[string]*string)
848	value = "60000"
849	entries["retention.ms"] = &value
850	err = admin.AlterConfig(TopicResource, "my_topic", entries, false)
851	if err == nil {
852		t.Fatal(errors.New("ErrorCode present but no Error returned"))
853	}
854}
855
856func TestClusterAdminAlterBrokerConfig(t *testing.T) {
857	controllerBroker := NewMockBroker(t, 1)
858	defer controllerBroker.Close()
859	configBroker := NewMockBroker(t, 2)
860	defer configBroker.Close()
861
862	controllerBroker.SetHandlerByMap(map[string]MockResponse{
863		"MetadataRequest": NewMockMetadataResponse(t).
864			SetController(controllerBroker.BrokerID()).
865			SetBroker(controllerBroker.Addr(), controllerBroker.BrokerID()).
866			SetBroker(configBroker.Addr(), configBroker.BrokerID()),
867	})
868	configBroker.SetHandlerByMap(map[string]MockResponse{
869		"MetadataRequest": NewMockMetadataResponse(t).
870			SetController(controllerBroker.BrokerID()).
871			SetBroker(controllerBroker.Addr(), controllerBroker.BrokerID()).
872			SetBroker(configBroker.Addr(), configBroker.BrokerID()),
873		"AlterConfigsRequest": NewMockAlterConfigsResponse(t),
874	})
875
876	config := NewConfig()
877	config.Version = V1_0_0_0
878	admin, err := NewClusterAdmin(
879		[]string{
880			controllerBroker.Addr(),
881			configBroker.Addr(),
882		}, config)
883	if err != nil {
884		t.Fatal(err)
885	}
886
887	var value string
888	entries := make(map[string]*string)
889	value = "3"
890	entries["min.insync.replicas"] = &value
891
892	for _, resourceType := range []ConfigResourceType{BrokerResource, BrokerLoggerResource} {
893		resource := ConfigResource{Name: "2", Type: resourceType}
894		err = admin.AlterConfig(
895			resource.Type,
896			resource.Name,
897			entries,
898			false)
899		if err != nil {
900			t.Fatal(err)
901		}
902	}
903
904	err = admin.Close()
905	if err != nil {
906		t.Fatal(err)
907	}
908}
909
910func TestClusterAdminCreateAcl(t *testing.T) {
911	seedBroker := NewMockBroker(t, 1)
912	defer seedBroker.Close()
913
914	seedBroker.SetHandlerByMap(map[string]MockResponse{
915		"MetadataRequest": NewMockMetadataResponse(t).
916			SetController(seedBroker.BrokerID()).
917			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
918		"CreateAclsRequest": NewMockCreateAclsResponse(t),
919	})
920
921	config := NewConfig()
922	config.Version = V1_0_0_0
923	admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
924	if err != nil {
925		t.Fatal(err)
926	}
927
928	r := Resource{ResourceType: AclResourceTopic, ResourceName: "my_topic"}
929	a := Acl{Host: "localhost", Operation: AclOperationAlter, PermissionType: AclPermissionAny}
930
931	err = admin.CreateACL(r, a)
932	if err != nil {
933		t.Fatal(err)
934	}
935
936	err = admin.Close()
937	if err != nil {
938		t.Fatal(err)
939	}
940}
941
942func TestClusterAdminListAcls(t *testing.T) {
943	seedBroker := NewMockBroker(t, 1)
944	defer seedBroker.Close()
945
946	seedBroker.SetHandlerByMap(map[string]MockResponse{
947		"MetadataRequest": NewMockMetadataResponse(t).
948			SetController(seedBroker.BrokerID()).
949			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
950		"DescribeAclsRequest": NewMockListAclsResponse(t),
951		"CreateAclsRequest":   NewMockCreateAclsResponse(t),
952	})
953
954	config := NewConfig()
955	config.Version = V1_0_0_0
956	admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
957	if err != nil {
958		t.Fatal(err)
959	}
960
961	r := Resource{ResourceType: AclResourceTopic, ResourceName: "my_topic"}
962	a := Acl{Host: "localhost", Operation: AclOperationAlter, PermissionType: AclPermissionAny}
963
964	err = admin.CreateACL(r, a)
965	if err != nil {
966		t.Fatal(err)
967	}
968	resourceName := "my_topic"
969	filter := AclFilter{
970		ResourceType: AclResourceTopic,
971		Operation:    AclOperationRead,
972		ResourceName: &resourceName,
973	}
974
975	rAcls, err := admin.ListAcls(filter)
976	if err != nil {
977		t.Fatal(err)
978	}
979	if len(rAcls) <= 0 {
980		t.Fatal("no acls present")
981	}
982
983	err = admin.Close()
984	if err != nil {
985		t.Fatal(err)
986	}
987}
988
989func TestClusterAdminDeleteAcl(t *testing.T) {
990	seedBroker := NewMockBroker(t, 1)
991	defer seedBroker.Close()
992
993	seedBroker.SetHandlerByMap(map[string]MockResponse{
994		"MetadataRequest": NewMockMetadataResponse(t).
995			SetController(seedBroker.BrokerID()).
996			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
997		"DeleteAclsRequest": NewMockDeleteAclsResponse(t),
998	})
999
1000	config := NewConfig()
1001	config.Version = V1_0_0_0
1002	admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
1003	if err != nil {
1004		t.Fatal(err)
1005	}
1006
1007	resourceName := "my_topic"
1008	filter := AclFilter{
1009		ResourceType: AclResourceTopic,
1010		Operation:    AclOperationAlter,
1011		ResourceName: &resourceName,
1012	}
1013
1014	_, err = admin.DeleteACL(filter, false)
1015	if err != nil {
1016		t.Fatal(err)
1017	}
1018
1019	err = admin.Close()
1020	if err != nil {
1021		t.Fatal(err)
1022	}
1023}
1024
1025func TestDescribeTopic(t *testing.T) {
1026	seedBroker := NewMockBroker(t, 1)
1027	defer seedBroker.Close()
1028
1029	seedBroker.SetHandlerByMap(map[string]MockResponse{
1030		"MetadataRequest": NewMockMetadataResponse(t).
1031			SetController(seedBroker.BrokerID()).
1032			SetLeader("my_topic", 0, seedBroker.BrokerID()).
1033			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
1034	})
1035
1036	config := NewConfig()
1037	config.Version = V1_0_0_0
1038
1039	admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
1040	if err != nil {
1041		t.Fatal(err)
1042	}
1043
1044	topics, err := admin.DescribeTopics([]string{"my_topic"})
1045	if err != nil {
1046		t.Fatal(err)
1047	}
1048
1049	if len(topics) != 1 {
1050		t.Fatalf("Expected 1 result, got %v", len(topics))
1051	}
1052
1053	if topics[0].Name != "my_topic" {
1054		t.Fatalf("Incorrect topic name: %v", topics[0].Name)
1055	}
1056
1057	err = admin.Close()
1058	if err != nil {
1059		t.Fatal(err)
1060	}
1061}
1062
1063func TestDescribeTopicWithVersion0_11(t *testing.T) {
1064	seedBroker := NewMockBroker(t, 1)
1065	defer seedBroker.Close()
1066
1067	seedBroker.SetHandlerByMap(map[string]MockResponse{
1068		"MetadataRequest": NewMockMetadataResponse(t).
1069			SetController(seedBroker.BrokerID()).
1070			SetLeader("my_topic", 0, seedBroker.BrokerID()).
1071			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
1072	})
1073
1074	config := NewConfig()
1075	config.Version = V0_11_0_0
1076
1077	admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
1078	if err != nil {
1079		t.Fatal(err)
1080	}
1081
1082	topics, err := admin.DescribeTopics([]string{"my_topic"})
1083	if err != nil {
1084		t.Fatal(err)
1085	}
1086
1087	if len(topics) != 1 {
1088		t.Fatalf("Expected 1 result, got %v", len(topics))
1089	}
1090
1091	if topics[0].Name != "my_topic" {
1092		t.Fatalf("Incorrect topic name: %v", topics[0].Name)
1093	}
1094
1095	err = admin.Close()
1096	if err != nil {
1097		t.Fatal(err)
1098	}
1099}
1100
1101func TestDescribeConsumerGroup(t *testing.T) {
1102	seedBroker := NewMockBroker(t, 1)
1103	defer seedBroker.Close()
1104
1105	expectedGroupID := "my-group"
1106
1107	seedBroker.SetHandlerByMap(map[string]MockResponse{
1108		"DescribeGroupsRequest": NewMockDescribeGroupsResponse(t).AddGroupDescription(expectedGroupID, &GroupDescription{
1109			GroupId: expectedGroupID,
1110		}),
1111		"MetadataRequest": NewMockMetadataResponse(t).
1112			SetController(seedBroker.BrokerID()).
1113			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
1114		"FindCoordinatorRequest": NewMockFindCoordinatorResponse(t).SetCoordinator(CoordinatorGroup, expectedGroupID, seedBroker),
1115	})
1116
1117	config := NewConfig()
1118	config.Version = V1_0_0_0
1119
1120	admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
1121	if err != nil {
1122		t.Fatal(err)
1123	}
1124
1125	result, err := admin.DescribeConsumerGroups([]string{expectedGroupID})
1126	if err != nil {
1127		t.Fatal(err)
1128	}
1129
1130	if len(result) != 1 {
1131		t.Fatalf("Expected 1 result, got %v", len(result))
1132	}
1133
1134	if result[0].GroupId != expectedGroupID {
1135		t.Fatalf("Expected groupID %v, got %v", expectedGroupID, result[0].GroupId)
1136	}
1137
1138	err = admin.Close()
1139	if err != nil {
1140		t.Fatal(err)
1141	}
1142}
1143
1144func TestListConsumerGroups(t *testing.T) {
1145	seedBroker := NewMockBroker(t, 1)
1146	defer seedBroker.Close()
1147
1148	seedBroker.SetHandlerByMap(map[string]MockResponse{
1149		"MetadataRequest": NewMockMetadataResponse(t).
1150			SetController(seedBroker.BrokerID()).
1151			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
1152		"ListGroupsRequest": NewMockListGroupsResponse(t).
1153			AddGroup("my-group", "consumer"),
1154	})
1155
1156	config := NewConfig()
1157	config.Version = V1_0_0_0
1158
1159	admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
1160	if err != nil {
1161		t.Fatal(err)
1162	}
1163
1164	groups, err := admin.ListConsumerGroups()
1165	if err != nil {
1166		t.Fatal(err)
1167	}
1168
1169	if len(groups) != 1 {
1170		t.Fatalf("Expected %v results, got %v", 1, len(groups))
1171	}
1172
1173	protocolType, ok := groups["my-group"]
1174
1175	if !ok {
1176		t.Fatal("Expected group to be returned, but it did not")
1177	}
1178
1179	if protocolType != "consumer" {
1180		t.Fatalf("Expected protocolType %v, got %v", "consumer", protocolType)
1181	}
1182
1183	err = admin.Close()
1184	if err != nil {
1185		t.Fatal(err)
1186	}
1187}
1188
1189func TestListConsumerGroupsMultiBroker(t *testing.T) {
1190	seedBroker := NewMockBroker(t, 1)
1191	defer seedBroker.Close()
1192
1193	secondBroker := NewMockBroker(t, 2)
1194	defer secondBroker.Close()
1195
1196	firstGroup := "first"
1197	secondGroup := "second"
1198	nonExistingGroup := "non-existing-group"
1199
1200	seedBroker.SetHandlerByMap(map[string]MockResponse{
1201		"MetadataRequest": NewMockMetadataResponse(t).
1202			SetController(seedBroker.BrokerID()).
1203			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
1204			SetBroker(secondBroker.Addr(), secondBroker.BrokerID()),
1205		"ListGroupsRequest": NewMockListGroupsResponse(t).
1206			AddGroup(firstGroup, "consumer"),
1207	})
1208
1209	secondBroker.SetHandlerByMap(map[string]MockResponse{
1210		"MetadataRequest": NewMockMetadataResponse(t).
1211			SetController(seedBroker.BrokerID()).
1212			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
1213			SetBroker(secondBroker.Addr(), secondBroker.BrokerID()),
1214		"ListGroupsRequest": NewMockListGroupsResponse(t).
1215			AddGroup(secondGroup, "consumer"),
1216	})
1217
1218	config := NewConfig()
1219	config.Version = V1_0_0_0
1220
1221	admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
1222	if err != nil {
1223		t.Fatal(err)
1224	}
1225
1226	groups, err := admin.ListConsumerGroups()
1227	if err != nil {
1228		t.Fatal(err)
1229	}
1230
1231	if len(groups) != 2 {
1232		t.Fatalf("Expected %v results, got %v", 1, len(groups))
1233	}
1234
1235	if _, found := groups[firstGroup]; !found {
1236		t.Fatalf("Expected group %v to be present in result set, but it isn't", firstGroup)
1237	}
1238
1239	if _, found := groups[secondGroup]; !found {
1240		t.Fatalf("Expected group %v to be present in result set, but it isn't", secondGroup)
1241	}
1242
1243	if _, found := groups[nonExistingGroup]; found {
1244		t.Fatalf("Expected group %v to not exist, but it exists", nonExistingGroup)
1245	}
1246
1247	err = admin.Close()
1248	if err != nil {
1249		t.Fatal(err)
1250	}
1251}
1252
1253func TestListConsumerGroupOffsets(t *testing.T) {
1254	seedBroker := NewMockBroker(t, 1)
1255	defer seedBroker.Close()
1256
1257	group := "my-group"
1258	topic := "my-topic"
1259	partition := int32(0)
1260	expectedOffset := int64(0)
1261
1262	seedBroker.SetHandlerByMap(map[string]MockResponse{
1263		"OffsetFetchRequest": NewMockOffsetFetchResponse(t).SetOffset(group, "my-topic", partition, expectedOffset, "", ErrNoError).SetError(ErrNoError),
1264		"MetadataRequest": NewMockMetadataResponse(t).
1265			SetController(seedBroker.BrokerID()).
1266			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
1267		"FindCoordinatorRequest": NewMockFindCoordinatorResponse(t).SetCoordinator(CoordinatorGroup, group, seedBroker),
1268	})
1269
1270	config := NewConfig()
1271	config.Version = V1_0_0_0
1272
1273	admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
1274	if err != nil {
1275		t.Fatal(err)
1276	}
1277
1278	response, err := admin.ListConsumerGroupOffsets(group, map[string][]int32{
1279		topic: {0},
1280	})
1281	if err != nil {
1282		t.Fatalf("ListConsumerGroupOffsets failed with error %v", err)
1283	}
1284
1285	block := response.GetBlock(topic, partition)
1286	if block == nil {
1287		t.Fatalf("Expected block for topic %v and partition %v to exist, but it doesn't", topic, partition)
1288	}
1289
1290	if block.Offset != expectedOffset {
1291		t.Fatalf("Expected offset %v, got %v", expectedOffset, block.Offset)
1292	}
1293
1294	err = admin.Close()
1295	if err != nil {
1296		t.Fatal(err)
1297	}
1298}
1299
1300func TestDeleteConsumerGroup(t *testing.T) {
1301	seedBroker := NewMockBroker(t, 1)
1302	defer seedBroker.Close()
1303
1304	group := "my-group"
1305
1306	seedBroker.SetHandlerByMap(map[string]MockResponse{
1307		// "OffsetFetchRequest":  NewMockOffsetFetchResponse(t).SetOffset(group, "my-topic", partition, expectedOffset, "", ErrNoError),
1308		"DeleteGroupsRequest": NewMockDeleteGroupsRequest(t).SetDeletedGroups([]string{group}),
1309		"MetadataRequest": NewMockMetadataResponse(t).
1310			SetController(seedBroker.BrokerID()).
1311			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
1312		"FindCoordinatorRequest": NewMockFindCoordinatorResponse(t).SetCoordinator(CoordinatorGroup, group, seedBroker),
1313	})
1314
1315	config := NewConfig()
1316	config.Version = V1_1_0_0
1317
1318	admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
1319	if err != nil {
1320		t.Fatal(err)
1321	}
1322
1323	err = admin.DeleteConsumerGroup(group)
1324	if err != nil {
1325		t.Fatalf("DeleteConsumerGroup failed with error %v", err)
1326	}
1327}
1328
1329// TestRefreshMetaDataWithDifferentController ensures that the cached
1330// controller can be forcibly updated from Metadata by the admin client
1331func TestRefreshMetaDataWithDifferentController(t *testing.T) {
1332	seedBroker1 := NewMockBroker(t, 1)
1333	seedBroker2 := NewMockBroker(t, 2)
1334	defer seedBroker1.Close()
1335	defer seedBroker2.Close()
1336
1337	seedBroker1.SetHandlerByMap(map[string]MockResponse{
1338		"MetadataRequest": NewMockMetadataResponse(t).
1339			SetController(seedBroker1.BrokerID()).
1340			SetBroker(seedBroker1.Addr(), seedBroker1.BrokerID()).
1341			SetBroker(seedBroker2.Addr(), seedBroker2.BrokerID()),
1342	})
1343
1344	config := NewConfig()
1345	config.Version = V1_1_0_0
1346
1347	client, err := NewClient([]string{seedBroker1.Addr()}, config)
1348	if err != nil {
1349		t.Fatal(err)
1350	}
1351
1352	ca := clusterAdmin{client: client, conf: config}
1353
1354	if b, _ := ca.Controller(); seedBroker1.BrokerID() != b.ID() {
1355		t.Fatalf("expected cached controller to be %d rather than %d",
1356			seedBroker1.BrokerID(), b.ID())
1357	}
1358
1359	seedBroker1.SetHandlerByMap(map[string]MockResponse{
1360		"MetadataRequest": NewMockMetadataResponse(t).
1361			SetController(seedBroker2.BrokerID()).
1362			SetBroker(seedBroker1.Addr(), seedBroker1.BrokerID()).
1363			SetBroker(seedBroker2.Addr(), seedBroker2.BrokerID()),
1364	})
1365
1366	if b, _ := ca.refreshController(); seedBroker2.BrokerID() != b.ID() {
1367		t.Fatalf("expected refreshed controller to be %d rather than %d",
1368			seedBroker2.BrokerID(), b.ID())
1369	}
1370
1371	if b, _ := ca.Controller(); seedBroker2.BrokerID() != b.ID() {
1372		t.Fatalf("expected cached controller to be %d rather than %d",
1373			seedBroker2.BrokerID(), b.ID())
1374	}
1375}
1376
1377func TestDescribeLogDirs(t *testing.T) {
1378	seedBroker := NewMockBroker(t, 1)
1379	defer seedBroker.Close()
1380
1381	seedBroker.SetHandlerByMap(map[string]MockResponse{
1382		"MetadataRequest": NewMockMetadataResponse(t).
1383			SetController(seedBroker.BrokerID()).
1384			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
1385		"DescribeLogDirsRequest": NewMockDescribeLogDirsResponse(t).
1386			SetLogDirs("/tmp/logs", map[string]int{"topic1": 2, "topic2": 2}),
1387	})
1388
1389	config := NewConfig()
1390	config.Version = V1_0_0_0
1391
1392	admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
1393	if err != nil {
1394		t.Fatal(err)
1395	}
1396
1397	logDirsPerBroker, err := admin.DescribeLogDirs([]int32{seedBroker.BrokerID()})
1398	if err != nil {
1399		t.Fatal(err)
1400	}
1401
1402	if len(logDirsPerBroker) != 1 {
1403		t.Fatalf("Expected %v results, got %v", 1, len(logDirsPerBroker))
1404	}
1405	logDirs := logDirsPerBroker[seedBroker.BrokerID()]
1406	if len(logDirs) != 1 {
1407		t.Fatalf("Expected log dirs for broker %v to be returned, but it did not, got %v", seedBroker.BrokerID(), len(logDirs))
1408	}
1409	logDirsBroker := logDirs[0]
1410	if logDirsBroker.ErrorCode != ErrNoError {
1411		t.Fatalf("Expected no error for broker %v, but it was %v", seedBroker.BrokerID(), logDirsBroker.ErrorCode)
1412	}
1413	if logDirsBroker.Path != "/tmp/logs" {
1414		t.Fatalf("Expected log dirs for broker %v to be '/tmp/logs', but it was %v", seedBroker.BrokerID(), logDirsBroker.Path)
1415	}
1416	if len(logDirsBroker.Topics) != 2 {
1417		t.Fatalf("Expected log dirs for broker %v to have 2 topics, but it had %v", seedBroker.BrokerID(), len(logDirsBroker.Topics))
1418	}
1419	err = admin.Close()
1420	if err != nil {
1421		t.Fatal(err)
1422	}
1423}
1424