1package sarama
2
3import (
4	"crypto/rand"
5	"hash/fnv"
6	"log"
7	"testing"
8)
9
10func assertPartitioningConsistent(t *testing.T, partitioner Partitioner, message *ProducerMessage, numPartitions int32) {
11	choice, err := partitioner.Partition(message, numPartitions)
12	if err != nil {
13		t.Error(partitioner, err)
14	}
15	if choice < 0 || choice >= numPartitions {
16		t.Error(partitioner, "returned partition", choice, "outside of range for", message)
17	}
18	for i := 1; i < 50; i++ {
19		newChoice, err := partitioner.Partition(message, numPartitions)
20		if err != nil {
21			t.Error(partitioner, err)
22		}
23		if newChoice != choice {
24			t.Error(partitioner, "returned partition", newChoice, "inconsistent with", choice, ".")
25		}
26	}
27}
28
29func TestRandomPartitioner(t *testing.T) {
30	partitioner := NewRandomPartitioner("mytopic")
31
32	choice, err := partitioner.Partition(nil, 1)
33	if err != nil {
34		t.Error(partitioner, err)
35	}
36	if choice != 0 {
37		t.Error("Returned non-zero partition when only one available.")
38	}
39
40	for i := 1; i < 50; i++ {
41		choice, err := partitioner.Partition(nil, 50)
42		if err != nil {
43			t.Error(partitioner, err)
44		}
45		if choice < 0 || choice >= 50 {
46			t.Error("Returned partition", choice, "outside of range.")
47		}
48	}
49}
50
51func TestRoundRobinPartitioner(t *testing.T) {
52	partitioner := NewRoundRobinPartitioner("mytopic")
53
54	choice, err := partitioner.Partition(nil, 1)
55	if err != nil {
56		t.Error(partitioner, err)
57	}
58	if choice != 0 {
59		t.Error("Returned non-zero partition when only one available.")
60	}
61
62	var i int32
63	for i = 1; i < 50; i++ {
64		choice, err := partitioner.Partition(nil, 7)
65		if err != nil {
66			t.Error(partitioner, err)
67		}
68		if choice != i%7 {
69			t.Error("Returned partition", choice, "expecting", i%7)
70		}
71	}
72}
73
74func TestNewHashPartitionerWithHasher(t *testing.T) {
75	// use the current default hasher fnv.New32a()
76	partitioner := NewCustomHashPartitioner(fnv.New32a)("mytopic")
77
78	choice, err := partitioner.Partition(&ProducerMessage{}, 1)
79	if err != nil {
80		t.Error(partitioner, err)
81	}
82	if choice != 0 {
83		t.Error("Returned non-zero partition when only one available.")
84	}
85
86	for i := 1; i < 50; i++ {
87		choice, err := partitioner.Partition(&ProducerMessage{}, 50)
88		if err != nil {
89			t.Error(partitioner, err)
90		}
91		if choice < 0 || choice >= 50 {
92			t.Error("Returned partition", choice, "outside of range for nil key.")
93		}
94	}
95
96	buf := make([]byte, 256)
97	for i := 1; i < 50; i++ {
98		if _, err := rand.Read(buf); err != nil {
99			t.Error(err)
100		}
101		assertPartitioningConsistent(t, partitioner, &ProducerMessage{Key: ByteEncoder(buf)}, 50)
102	}
103}
104
105func TestHashPartitionerWithHasherMinInt32(t *testing.T) {
106	// use the current default hasher fnv.New32a()
107	partitioner := NewCustomHashPartitioner(fnv.New32a)("mytopic")
108
109	msg := ProducerMessage{}
110	// "1468509572224" generates 2147483648 (uint32) result from Sum32 function
111	// which is -2147483648 or int32's min value
112	msg.Key = StringEncoder("1468509572224")
113
114	choice, err := partitioner.Partition(&msg, 50)
115	if err != nil {
116		t.Error(partitioner, err)
117	}
118	if choice < 0 || choice >= 50 {
119		t.Error("Returned partition", choice, "outside of range for nil key.")
120	}
121}
122
123func TestHashPartitioner(t *testing.T) {
124	partitioner := NewHashPartitioner("mytopic")
125
126	choice, err := partitioner.Partition(&ProducerMessage{}, 1)
127	if err != nil {
128		t.Error(partitioner, err)
129	}
130	if choice != 0 {
131		t.Error("Returned non-zero partition when only one available.")
132	}
133
134	for i := 1; i < 50; i++ {
135		choice, err := partitioner.Partition(&ProducerMessage{}, 50)
136		if err != nil {
137			t.Error(partitioner, err)
138		}
139		if choice < 0 || choice >= 50 {
140			t.Error("Returned partition", choice, "outside of range for nil key.")
141		}
142	}
143
144	buf := make([]byte, 256)
145	for i := 1; i < 50; i++ {
146		if _, err := rand.Read(buf); err != nil {
147			t.Error(err)
148		}
149		assertPartitioningConsistent(t, partitioner, &ProducerMessage{Key: ByteEncoder(buf)}, 50)
150	}
151}
152
153func TestHashPartitionerConsistency(t *testing.T) {
154	partitioner := NewHashPartitioner("mytopic")
155	ep, ok := partitioner.(DynamicConsistencyPartitioner)
156
157	if !ok {
158		t.Error("Hash partitioner does not implement DynamicConsistencyPartitioner")
159	}
160
161	consistency := ep.MessageRequiresConsistency(&ProducerMessage{Key: StringEncoder("hi")})
162	if !consistency {
163		t.Error("Messages with keys should require consistency")
164	}
165	consistency = ep.MessageRequiresConsistency(&ProducerMessage{})
166	if consistency {
167		t.Error("Messages without keys should require consistency")
168	}
169}
170
171func TestHashPartitionerMinInt32(t *testing.T) {
172	partitioner := NewHashPartitioner("mytopic")
173
174	msg := ProducerMessage{}
175	// "1468509572224" generates 2147483648 (uint32) result from Sum32 function
176	// which is -2147483648 or int32's min value
177	msg.Key = StringEncoder("1468509572224")
178
179	choice, err := partitioner.Partition(&msg, 50)
180	if err != nil {
181		t.Error(partitioner, err)
182	}
183	if choice < 0 || choice >= 50 {
184		t.Error("Returned partition", choice, "outside of range for nil key.")
185	}
186}
187
188func TestManualPartitioner(t *testing.T) {
189	partitioner := NewManualPartitioner("mytopic")
190
191	choice, err := partitioner.Partition(&ProducerMessage{}, 1)
192	if err != nil {
193		t.Error(partitioner, err)
194	}
195	if choice != 0 {
196		t.Error("Returned non-zero partition when only one available.")
197	}
198
199	for i := int32(1); i < 50; i++ {
200		choice, err := partitioner.Partition(&ProducerMessage{Partition: i}, 50)
201		if err != nil {
202			t.Error(partitioner, err)
203		}
204		if choice != i {
205			t.Error("Returned partition not the same as the input partition")
206		}
207	}
208}
209
210// By default, Sarama uses the message's key to consistently assign a partition to
211// a message using hashing. If no key is set, a random partition will be chosen.
212// This example shows how you can partition messages randomly, even when a key is set,
213// by overriding Config.Producer.Partitioner.
214func ExamplePartitioner_random() {
215	config := NewConfig()
216	config.Producer.Partitioner = NewRandomPartitioner
217
218	producer, err := NewSyncProducer([]string{"localhost:9092"}, config)
219	if err != nil {
220		log.Fatal(err)
221	}
222	defer func() {
223		if err := producer.Close(); err != nil {
224			log.Println("Failed to close producer:", err)
225		}
226	}()
227
228	msg := &ProducerMessage{Topic: "test", Key: StringEncoder("key is set"), Value: StringEncoder("test")}
229	partition, offset, err := producer.SendMessage(msg)
230	if err != nil {
231		log.Fatalln("Failed to produce message to kafka cluster.")
232	}
233
234	log.Printf("Produced message to partition %d with offset %d", partition, offset)
235}
236
237// This example shows how to assign partitions to your messages manually.
238func ExamplePartitioner_manual() {
239	config := NewConfig()
240
241	// First, we tell the producer that we are going to partition ourselves.
242	config.Producer.Partitioner = NewManualPartitioner
243
244	producer, err := NewSyncProducer([]string{"localhost:9092"}, config)
245	if err != nil {
246		log.Fatal(err)
247	}
248	defer func() {
249		if err := producer.Close(); err != nil {
250			log.Println("Failed to close producer:", err)
251		}
252	}()
253
254	// Now, we set the Partition field of the ProducerMessage struct.
255	msg := &ProducerMessage{Topic: "test", Partition: 6, Value: StringEncoder("test")}
256
257	partition, offset, err := producer.SendMessage(msg)
258	if err != nil {
259		log.Fatalln("Failed to produce message to kafka cluster.")
260	}
261
262	if partition != 6 {
263		log.Fatal("Message should have been produced to partition 6!")
264	}
265
266	log.Printf("Produced message to partition %d with offset %d", partition, offset)
267}
268
269// This example shows how to set a different partitioner depending on the topic.
270func ExamplePartitioner_per_topic() {
271	config := NewConfig()
272	config.Producer.Partitioner = func(topic string) Partitioner {
273		switch topic {
274		case "access_log", "error_log":
275			return NewRandomPartitioner(topic)
276
277		default:
278			return NewHashPartitioner(topic)
279		}
280	}
281
282	// ...
283}
284