1package sarama 2 3import ( 4 "fmt" 5 "math" 6 "os" 7 "sort" 8 "sync" 9 "testing" 10 "time" 11 12 "github.com/stretchr/testify/require" 13) 14 15func TestFuncConsumerOffsetOutOfRange(t *testing.T) { 16 setupFunctionalTest(t) 17 defer teardownFunctionalTest(t) 18 19 consumer, err := NewConsumer(kafkaBrokers, nil) 20 if err != nil { 21 t.Fatal(err) 22 } 23 24 if _, err := consumer.ConsumePartition("test.1", 0, -10); err != ErrOffsetOutOfRange { 25 t.Error("Expected ErrOffsetOutOfRange, got:", err) 26 } 27 28 if _, err := consumer.ConsumePartition("test.1", 0, math.MaxInt64); err != ErrOffsetOutOfRange { 29 t.Error("Expected ErrOffsetOutOfRange, got:", err) 30 } 31 32 safeClose(t, consumer) 33} 34 35func TestConsumerHighWaterMarkOffset(t *testing.T) { 36 setupFunctionalTest(t) 37 defer teardownFunctionalTest(t) 38 39 p, err := NewSyncProducer(kafkaBrokers, nil) 40 if err != nil { 41 t.Fatal(err) 42 } 43 defer safeClose(t, p) 44 45 _, offset, err := p.SendMessage(&ProducerMessage{Topic: "test.1", Value: StringEncoder("Test")}) 46 if err != nil { 47 t.Fatal(err) 48 } 49 50 c, err := NewConsumer(kafkaBrokers, nil) 51 if err != nil { 52 t.Fatal(err) 53 } 54 defer safeClose(t, c) 55 56 pc, err := c.ConsumePartition("test.1", 0, offset) 57 if err != nil { 58 t.Fatal(err) 59 } 60 61 <-pc.Messages() 62 63 if hwmo := pc.HighWaterMarkOffset(); hwmo != offset+1 { 64 t.Logf("Last produced offset %d; high water mark should be one higher but found %d.", offset, hwmo) 65 } 66 67 safeClose(t, pc) 68} 69 70// Makes sure that messages produced by all supported client versions/ 71// compression codecs (except LZ4) combinations can be consumed by all 72// supported consumer versions. It relies on the KAFKA_VERSION environment 73// variable to provide the version of the test Kafka cluster. 74// 75// Note that LZ4 codec was introduced in v0.10.0.0 and therefore is excluded 76// from this test case. It has a similar version matrix test case below that 77// only checks versions from v0.10.0.0 until KAFKA_VERSION. 78func TestVersionMatrix(t *testing.T) { 79 setupFunctionalTest(t) 80 defer teardownFunctionalTest(t) 81 82 // Produce lot's of message with all possible combinations of supported 83 // protocol versions and compressions for the except of LZ4. 84 testVersions := versionRange(V0_8_2_0) 85 allCodecsButLZ4 := []CompressionCodec{CompressionNone, CompressionGZIP, CompressionSnappy} 86 producedMessages := produceMsgs(t, testVersions, allCodecsButLZ4, 17, 100, false) 87 88 // When/Then 89 consumeMsgs(t, testVersions, producedMessages) 90} 91 92// Support for LZ4 codec was introduced in v0.10.0.0 so a version matrix to 93// test LZ4 should start with v0.10.0.0. 94func TestVersionMatrixLZ4(t *testing.T) { 95 setupFunctionalTest(t) 96 defer teardownFunctionalTest(t) 97 98 // Produce lot's of message with all possible combinations of supported 99 // protocol versions starting with v0.10 (first where LZ4 was supported) 100 // and all possible compressions. 101 testVersions := versionRange(V0_10_0_0) 102 allCodecs := []CompressionCodec{CompressionNone, CompressionGZIP, CompressionSnappy, CompressionLZ4} 103 producedMessages := produceMsgs(t, testVersions, allCodecs, 17, 100, false) 104 105 // When/Then 106 consumeMsgs(t, testVersions, producedMessages) 107} 108 109// Support for zstd codec was introduced in v2.1.0.0 110func TestVersionMatrixZstd(t *testing.T) { 111 setupFunctionalTest(t) 112 defer teardownFunctionalTest(t) 113 114 // Produce lot's of message with all possible combinations of supported 115 // protocol versions starting with v2.1.0.0 (first where zstd was supported) 116 testVersions := versionRange(V2_1_0_0) 117 allCodecs := []CompressionCodec{CompressionZSTD} 118 producedMessages := produceMsgs(t, testVersions, allCodecs, 17, 100, false) 119 120 // When/Then 121 consumeMsgs(t, testVersions, producedMessages) 122} 123 124func TestVersionMatrixIdempotent(t *testing.T) { 125 setupFunctionalTest(t) 126 defer teardownFunctionalTest(t) 127 128 // Produce lot's of message with all possible combinations of supported 129 // protocol versions starting with v0.11 (first where idempotent was supported) 130 testVersions := versionRange(V0_11_0_0) 131 producedMessages := produceMsgs(t, testVersions, []CompressionCodec{CompressionNone}, 17, 100, true) 132 133 // When/Then 134 consumeMsgs(t, testVersions, producedMessages) 135} 136 137func TestReadOnlyAndAllCommittedMessages(t *testing.T) { 138 checkKafkaVersion(t, "0.11.0") 139 setupFunctionalTest(t) 140 defer teardownFunctionalTest(t) 141 142 config := NewConfig() 143 config.Consumer.IsolationLevel = ReadCommitted 144 config.Version = V0_11_0_0 145 146 consumer, err := NewConsumer(kafkaBrokers, config) 147 if err != nil { 148 t.Fatal(err) 149 } 150 151 pc, err := consumer.ConsumePartition("uncommitted-topic-test-4", 0, OffsetOldest) 152 require.NoError(t, err) 153 154 msgChannel := pc.Messages() 155 for i := 1; i <= 6; i++ { 156 msg := <-msgChannel 157 require.Equal(t, fmt.Sprintf("Committed %v", i), string(msg.Value)) 158 } 159} 160 161func prodMsg2Str(prodMsg *ProducerMessage) string { 162 return fmt.Sprintf("{offset: %d, value: %s}", prodMsg.Offset, string(prodMsg.Value.(StringEncoder))) 163} 164 165func consMsg2Str(consMsg *ConsumerMessage) string { 166 return fmt.Sprintf("{offset: %d, value: %s}", consMsg.Offset, string(consMsg.Value)) 167} 168 169func versionRange(lower KafkaVersion) []KafkaVersion { 170 // Get the test cluster version from the environment. If there is nothing 171 // there then assume the highest. 172 upper, err := ParseKafkaVersion(os.Getenv("KAFKA_VERSION")) 173 if err != nil { 174 upper = MaxVersion 175 } 176 177 versions := make([]KafkaVersion, 0, len(SupportedVersions)) 178 for _, v := range SupportedVersions { 179 if !v.IsAtLeast(lower) { 180 continue 181 } 182 if !upper.IsAtLeast(v) { 183 return versions 184 } 185 versions = append(versions, v) 186 } 187 return versions 188} 189 190func produceMsgs(t *testing.T, clientVersions []KafkaVersion, codecs []CompressionCodec, flush int, countPerVerCodec int, idempotent bool) []*ProducerMessage { 191 var wg sync.WaitGroup 192 var producedMessagesMu sync.Mutex 193 var producedMessages []*ProducerMessage 194 for _, prodVer := range clientVersions { 195 for _, codec := range codecs { 196 prodCfg := NewConfig() 197 prodCfg.Version = prodVer 198 prodCfg.Producer.Return.Successes = true 199 prodCfg.Producer.Return.Errors = true 200 prodCfg.Producer.Flush.MaxMessages = flush 201 prodCfg.Producer.Compression = codec 202 prodCfg.Producer.Idempotent = idempotent 203 if idempotent { 204 prodCfg.Producer.RequiredAcks = WaitForAll 205 prodCfg.Net.MaxOpenRequests = 1 206 } 207 208 p, err := NewSyncProducer(kafkaBrokers, prodCfg) 209 if err != nil { 210 t.Errorf("Failed to create producer: version=%s, compression=%s, err=%v", prodVer, codec, err) 211 continue 212 } 213 defer safeClose(t, p) 214 for i := 0; i < countPerVerCodec; i++ { 215 msg := &ProducerMessage{ 216 Topic: "test.1", 217 Value: StringEncoder(fmt.Sprintf("msg:%s:%s:%d", prodVer, codec, i)), 218 } 219 wg.Add(1) 220 go func() { 221 defer wg.Done() 222 _, _, err := p.SendMessage(msg) 223 if err != nil { 224 t.Errorf("Failed to produce message: %s, err=%v", msg.Value, err) 225 } 226 producedMessagesMu.Lock() 227 producedMessages = append(producedMessages, msg) 228 producedMessagesMu.Unlock() 229 }() 230 } 231 } 232 } 233 wg.Wait() 234 235 // Sort produced message in ascending offset order. 236 sort.Slice(producedMessages, func(i, j int) bool { 237 return producedMessages[i].Offset < producedMessages[j].Offset 238 }) 239 t.Logf("*** Total produced %d, firstOffset=%d, lastOffset=%d\n", 240 len(producedMessages), producedMessages[0].Offset, producedMessages[len(producedMessages)-1].Offset) 241 return producedMessages 242} 243 244func consumeMsgs(t *testing.T, clientVersions []KafkaVersion, producedMessages []*ProducerMessage) { 245 // Consume all produced messages with all client versions supported by the 246 // cluster. 247consumerVersionLoop: 248 for _, consVer := range clientVersions { 249 t.Logf("*** Consuming with client version %s\n", consVer) 250 // Create a partition consumer that should start from the first produced 251 // message. 252 consCfg := NewConfig() 253 consCfg.Version = consVer 254 c, err := NewConsumer(kafkaBrokers, consCfg) 255 if err != nil { 256 t.Fatal(err) 257 } 258 defer safeClose(t, c) 259 pc, err := c.ConsumePartition("test.1", 0, producedMessages[0].Offset) 260 if err != nil { 261 t.Fatal(err) 262 } 263 defer safeClose(t, pc) 264 265 // Consume as many messages as there have been produced and make sure that 266 // order is preserved. 267 for i, prodMsg := range producedMessages { 268 select { 269 case consMsg := <-pc.Messages(): 270 if consMsg.Offset != prodMsg.Offset { 271 t.Errorf("Consumed unexpected offset: version=%s, index=%d, want=%s, got=%s", 272 consVer, i, prodMsg2Str(prodMsg), consMsg2Str(consMsg)) 273 continue consumerVersionLoop 274 } 275 if string(consMsg.Value) != string(prodMsg.Value.(StringEncoder)) { 276 t.Errorf("Consumed unexpected msg: version=%s, index=%d, want=%s, got=%s", 277 consVer, i, prodMsg2Str(prodMsg), consMsg2Str(consMsg)) 278 continue consumerVersionLoop 279 } 280 case <-time.After(3 * time.Second): 281 t.Fatalf("Timeout waiting for: index=%d, offset=%d, msg=%s", i, prodMsg.Offset, prodMsg.Value) 282 } 283 } 284 } 285} 286