1package sarama 2 3import ( 4 "crypto/rand" 5 "hash/fnv" 6 "log" 7 "testing" 8) 9 10func assertPartitioningConsistent(t *testing.T, partitioner Partitioner, message *ProducerMessage, numPartitions int32) { 11 choice, err := partitioner.Partition(message, numPartitions) 12 if err != nil { 13 t.Error(partitioner, err) 14 } 15 if choice < 0 || choice >= numPartitions { 16 t.Error(partitioner, "returned partition", choice, "outside of range for", message) 17 } 18 for i := 1; i < 50; i++ { 19 newChoice, err := partitioner.Partition(message, numPartitions) 20 if err != nil { 21 t.Error(partitioner, err) 22 } 23 if newChoice != choice { 24 t.Error(partitioner, "returned partition", newChoice, "inconsistent with", choice, ".") 25 } 26 } 27} 28 29func TestRandomPartitioner(t *testing.T) { 30 partitioner := NewRandomPartitioner("mytopic") 31 32 choice, err := partitioner.Partition(nil, 1) 33 if err != nil { 34 t.Error(partitioner, err) 35 } 36 if choice != 0 { 37 t.Error("Returned non-zero partition when only one available.") 38 } 39 40 for i := 1; i < 50; i++ { 41 choice, err := partitioner.Partition(nil, 50) 42 if err != nil { 43 t.Error(partitioner, err) 44 } 45 if choice < 0 || choice >= 50 { 46 t.Error("Returned partition", choice, "outside of range.") 47 } 48 } 49} 50 51func TestRoundRobinPartitioner(t *testing.T) { 52 partitioner := NewRoundRobinPartitioner("mytopic") 53 54 choice, err := partitioner.Partition(nil, 1) 55 if err != nil { 56 t.Error(partitioner, err) 57 } 58 if choice != 0 { 59 t.Error("Returned non-zero partition when only one available.") 60 } 61 62 var i int32 63 for i = 1; i < 50; i++ { 64 choice, err := partitioner.Partition(nil, 7) 65 if err != nil { 66 t.Error(partitioner, err) 67 } 68 if choice != i%7 { 69 t.Error("Returned partition", choice, "expecting", i%7) 70 } 71 } 72} 73 74func TestNewHashPartitionerWithHasher(t *testing.T) { 75 // use the current default hasher fnv.New32a() 76 partitioner := NewCustomHashPartitioner(fnv.New32a)("mytopic") 77 78 choice, err := partitioner.Partition(&ProducerMessage{}, 1) 79 if err != nil { 80 t.Error(partitioner, err) 81 } 82 if choice != 0 { 83 t.Error("Returned non-zero partition when only one available.") 84 } 85 86 for i := 1; i < 50; i++ { 87 choice, err := partitioner.Partition(&ProducerMessage{}, 50) 88 if err != nil { 89 t.Error(partitioner, err) 90 } 91 if choice < 0 || choice >= 50 { 92 t.Error("Returned partition", choice, "outside of range for nil key.") 93 } 94 } 95 96 buf := make([]byte, 256) 97 for i := 1; i < 50; i++ { 98 if _, err := rand.Read(buf); err != nil { 99 t.Error(err) 100 } 101 assertPartitioningConsistent(t, partitioner, &ProducerMessage{Key: ByteEncoder(buf)}, 50) 102 } 103} 104 105func TestHashPartitionerWithHasherMinInt32(t *testing.T) { 106 // use the current default hasher fnv.New32a() 107 partitioner := NewCustomHashPartitioner(fnv.New32a)("mytopic") 108 109 msg := ProducerMessage{} 110 // "1468509572224" generates 2147483648 (uint32) result from Sum32 function 111 // which is -2147483648 or int32's min value 112 msg.Key = StringEncoder("1468509572224") 113 114 choice, err := partitioner.Partition(&msg, 50) 115 if err != nil { 116 t.Error(partitioner, err) 117 } 118 if choice < 0 || choice >= 50 { 119 t.Error("Returned partition", choice, "outside of range for nil key.") 120 } 121} 122 123func TestHashPartitioner(t *testing.T) { 124 partitioner := NewHashPartitioner("mytopic") 125 126 choice, err := partitioner.Partition(&ProducerMessage{}, 1) 127 if err != nil { 128 t.Error(partitioner, err) 129 } 130 if choice != 0 { 131 t.Error("Returned non-zero partition when only one available.") 132 } 133 134 for i := 1; i < 50; i++ { 135 choice, err := partitioner.Partition(&ProducerMessage{}, 50) 136 if err != nil { 137 t.Error(partitioner, err) 138 } 139 if choice < 0 || choice >= 50 { 140 t.Error("Returned partition", choice, "outside of range for nil key.") 141 } 142 } 143 144 buf := make([]byte, 256) 145 for i := 1; i < 50; i++ { 146 if _, err := rand.Read(buf); err != nil { 147 t.Error(err) 148 } 149 assertPartitioningConsistent(t, partitioner, &ProducerMessage{Key: ByteEncoder(buf)}, 50) 150 } 151} 152 153func TestHashPartitionerConsistency(t *testing.T) { 154 partitioner := NewHashPartitioner("mytopic") 155 ep, ok := partitioner.(DynamicConsistencyPartitioner) 156 157 if !ok { 158 t.Error("Hash partitioner does not implement DynamicConsistencyPartitioner") 159 } 160 161 consistency := ep.MessageRequiresConsistency(&ProducerMessage{Key: StringEncoder("hi")}) 162 if !consistency { 163 t.Error("Messages with keys should require consistency") 164 } 165 consistency = ep.MessageRequiresConsistency(&ProducerMessage{}) 166 if consistency { 167 t.Error("Messages without keys should require consistency") 168 } 169} 170 171func TestHashPartitionerMinInt32(t *testing.T) { 172 partitioner := NewHashPartitioner("mytopic") 173 174 msg := ProducerMessage{} 175 // "1468509572224" generates 2147483648 (uint32) result from Sum32 function 176 // which is -2147483648 or int32's min value 177 msg.Key = StringEncoder("1468509572224") 178 179 choice, err := partitioner.Partition(&msg, 50) 180 if err != nil { 181 t.Error(partitioner, err) 182 } 183 if choice < 0 || choice >= 50 { 184 t.Error("Returned partition", choice, "outside of range for nil key.") 185 } 186} 187 188func TestManualPartitioner(t *testing.T) { 189 partitioner := NewManualPartitioner("mytopic") 190 191 choice, err := partitioner.Partition(&ProducerMessage{}, 1) 192 if err != nil { 193 t.Error(partitioner, err) 194 } 195 if choice != 0 { 196 t.Error("Returned non-zero partition when only one available.") 197 } 198 199 for i := int32(1); i < 50; i++ { 200 choice, err := partitioner.Partition(&ProducerMessage{Partition: i}, 50) 201 if err != nil { 202 t.Error(partitioner, err) 203 } 204 if choice != i { 205 t.Error("Returned partition not the same as the input partition") 206 } 207 } 208} 209 210// By default, Sarama uses the message's key to consistently assign a partition to 211// a message using hashing. If no key is set, a random partition will be chosen. 212// This example shows how you can partition messages randomly, even when a key is set, 213// by overriding Config.Producer.Partitioner. 214func ExamplePartitioner_random() { 215 config := NewConfig() 216 config.Producer.Partitioner = NewRandomPartitioner 217 218 producer, err := NewSyncProducer([]string{"localhost:9092"}, config) 219 if err != nil { 220 log.Fatal(err) 221 } 222 defer func() { 223 if err := producer.Close(); err != nil { 224 log.Println("Failed to close producer:", err) 225 } 226 }() 227 228 msg := &ProducerMessage{Topic: "test", Key: StringEncoder("key is set"), Value: StringEncoder("test")} 229 partition, offset, err := producer.SendMessage(msg) 230 if err != nil { 231 log.Fatalln("Failed to produce message to kafka cluster.") 232 } 233 234 log.Printf("Produced message to partition %d with offset %d", partition, offset) 235} 236 237// This example shows how to assign partitions to your messages manually. 238func ExamplePartitioner_manual() { 239 config := NewConfig() 240 241 // First, we tell the producer that we are going to partition ourselves. 242 config.Producer.Partitioner = NewManualPartitioner 243 244 producer, err := NewSyncProducer([]string{"localhost:9092"}, config) 245 if err != nil { 246 log.Fatal(err) 247 } 248 defer func() { 249 if err := producer.Close(); err != nil { 250 log.Println("Failed to close producer:", err) 251 } 252 }() 253 254 // Now, we set the Partition field of the ProducerMessage struct. 255 msg := &ProducerMessage{Topic: "test", Partition: 6, Value: StringEncoder("test")} 256 257 partition, offset, err := producer.SendMessage(msg) 258 if err != nil { 259 log.Fatalln("Failed to produce message to kafka cluster.") 260 } 261 262 if partition != 6 { 263 log.Fatal("Message should have been produced to partition 6!") 264 } 265 266 log.Printf("Produced message to partition %d with offset %d", partition, offset) 267} 268 269// This example shows how to set a different partitioner depending on the topic. 270func ExamplePartitioner_per_topic() { 271 config := NewConfig() 272 config.Producer.Partitioner = func(topic string) Partitioner { 273 switch topic { 274 case "access_log", "error_log": 275 return NewRandomPartitioner(topic) 276 277 default: 278 return NewHashPartitioner(topic) 279 } 280 } 281 282 // ... 283} 284