package sarama

import (
	"fmt"
	"math"
	"os"
	"sort"
	"sync"
	"testing"
	"time"

	"github.com/stretchr/testify/require"
)

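// TestFuncConsumerOffsetOutOfRange ensures that requesting a partition
// consumer at an offset below the oldest or above the newest available
// offset fails with ErrOffsetOutOfRange.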
func TestFuncConsumerOffsetOutOfRange(t *testing.T) {
	setupFunctionalTest(t)
	defer teardownFunctionalTest(t)

	consumer, err := NewConsumer(kafkaBrokers, nil)
	if err != nil {
		t.Fatal(err)
	}

	if _, err := consumer.ConsumePartition("test.1", 0, -10); err != ErrOffsetOutOfRange {
		t.Error("Expected ErrOffsetOutOfRange, got:", err)
	}

	if _, err := consumer.ConsumePartition("test.1", 0, math.MaxInt64); err != ErrOffsetOutOfRange {
		t.Error("Expected ErrOffsetOutOfRange, got:", err)
	}

	safeClose(t, consumer)
}

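// TestConsumerHighWaterMarkOffset produces a single message and checks that
// the partition consumer reports a high water mark one past the offset of
// the last produced message.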
func TestConsumerHighWaterMarkOffset(t *testing.T) {
	setupFunctionalTest(t)
	defer teardownFunctionalTest(t)

	p, err := NewSyncProducer(kafkaBrokers, nil)
	if err != nil {
		t.Fatal(err)
	}
	defer safeClose(t, p)

	_, offset, err := p.SendMessage(&ProducerMessage{Topic: "test.1", Value: StringEncoder("Test")})
	if err != nil {
		t.Fatal(err)
	}

	c, err := NewConsumer(kafkaBrokers, nil)
	if err != nil {
		t.Fatal(err)
	}
	defer safeClose(t, c)

	pc, err := c.ConsumePartition("test.1", 0, offset)
	if err != nil {
		t.Fatal(err)
	}

	<-pc.Messages()

	if hwmo := pc.HighWaterMarkOffset(); hwmo != offset+1 {
		t.Logf("Last produced offset %d; high water mark should be one higher but found %d.", offset, hwmo)
	}

	safeClose(t, pc)
}

// Makes sure that messages produced with all supported combinations of
// client version and compression codec (except LZ4) can be consumed by all
// supported consumer versions. It relies on the KAFKA_VERSION environment
// variable to provide the version of the test Kafka cluster.
//
// Note that the LZ4 codec was introduced in v0.10.0.0 and is therefore
// excluded from this test case. A similar version matrix test case below
// checks only versions from v0.10.0.0 up to KAFKA_VERSION.
func TestVersionMatrix(t *testing.T) {
	setupFunctionalTest(t)
	defer teardownFunctionalTest(t)

	// Produce lots of messages with all possible combinations of supported
	// protocol versions and compression codecs, except for LZ4.
	testVersions := versionRange(V0_8_2_0)
	allCodecsButLZ4 := []CompressionCodec{CompressionNone, CompressionGZIP, CompressionSnappy}
	producedMessages := produceMsgs(t, testVersions, allCodecsButLZ4, 17, 100, false)

	// When/Then
	consumeMsgs(t, testVersions, producedMessages)
}

// Support for the LZ4 codec was introduced in v0.10.0.0, so a version matrix
// to test LZ4 should start with v0.10.0.0.
func TestVersionMatrixLZ4(t *testing.T) {
	setupFunctionalTest(t)
	defer teardownFunctionalTest(t)

	// Produce lots of messages with all possible combinations of supported
	// protocol versions starting with v0.10 (the first where LZ4 was
	// supported) and all possible compression codecs.
	testVersions := versionRange(V0_10_0_0)
	allCodecs := []CompressionCodec{CompressionNone, CompressionGZIP, CompressionSnappy, CompressionLZ4}
	producedMessages := produceMsgs(t, testVersions, allCodecs, 17, 100, false)

	// When/Then
	consumeMsgs(t, testVersions, producedMessages)
}

// Support for the zstd codec was introduced in v2.1.0.0.
func TestVersionMatrixZstd(t *testing.T) {
	setupFunctionalTest(t)
	defer teardownFunctionalTest(t)

	// Produce lots of messages with all supported protocol versions starting
	// with v2.1.0.0 (the first where zstd was supported).
	testVersions := versionRange(V2_1_0_0)
	allCodecs := []CompressionCodec{CompressionZSTD}
	producedMessages := produceMsgs(t, testVersions, allCodecs, 17, 100, false)

	// When/Then
	consumeMsgs(t, testVersions, producedMessages)
}

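// Idempotent production was introduced in v0.11.0.0, so this version matrix
// starts there.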
func TestVersionMatrixIdempotent(t *testing.T) {
	setupFunctionalTest(t)
	defer teardownFunctionalTest(t)

	// Produce lots of messages with all supported protocol versions starting
	// with v0.11 (the first where idempotent production was supported).
	testVersions := versionRange(V0_11_0_0)
	producedMessages := produceMsgs(t, testVersions, []CompressionCodec{CompressionNone}, 17, 100, true)

	// When/Then
	consumeMsgs(t, testVersions, producedMessages)
}

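// TestReadOnlyAndAllCommittedMessages ensures that a consumer configured
// with the ReadCommitted isolation level only receives messages from
// committed transactions.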
func TestReadOnlyAndAllCommittedMessages(t *testing.T) {
	checkKafkaVersion(t, "0.11.0")
	setupFunctionalTest(t)
	defer teardownFunctionalTest(t)

	config := NewConfig()
	config.Consumer.IsolationLevel = ReadCommitted
	config.Version = V0_11_0_0

	consumer, err := NewConsumer(kafkaBrokers, config)
	if err != nil {
		t.Fatal(err)
	}
	defer safeClose(t, consumer)

	pc, err := consumer.ConsumePartition("uncommitted-topic-test-4", 0, OffsetOldest)
	require.NoError(t, err)
	defer safeClose(t, pc)

	msgChannel := pc.Messages()
	for i := 1; i <= 6; i++ {
		msg := <-msgChannel
		require.Equal(t, fmt.Sprintf("Committed %v", i), string(msg.Value))
	}
}

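// prodMsg2Str formats a produced message as a short string for test output.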
func prodMsg2Str(prodMsg *ProducerMessage) string {
	return fmt.Sprintf("{offset: %d, value: %s}", prodMsg.Offset, string(prodMsg.Value.(StringEncoder)))
}

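// consMsg2Str formats a consumed message as a short string for test output.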
func consMsg2Str(consMsg *ConsumerMessage) string {
	return fmt.Sprintf("{offset: %d, value: %s}", consMsg.Offset, string(consMsg.Value))
}

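// versionRange returns the subset of SupportedVersions that is at least
// lower and at most the test cluster version taken from the KAFKA_VERSION
// environment variable (assumed to be MaxVersion when unset or unparsable).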
func versionRange(lower KafkaVersion) []KafkaVersion {
	// Get the test cluster version from the environment. If there is nothing
	// there then assume the highest.
	upper, err := ParseKafkaVersion(os.Getenv("KAFKA_VERSION"))
	if err != nil {
		upper = MaxVersion
	}

	versions := make([]KafkaVersion, 0, len(SupportedVersions))
	for _, v := range SupportedVersions {
		if !v.IsAtLeast(lower) {
			continue
		}
		if !upper.IsAtLeast(v) {
			return versions
		}
		versions = append(versions, v)
	}
	return versions
}

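// produceMsgs produces countPerVerCodec messages to topic test.1 for every
// combination of the given client versions and compression codecs, and
// returns the successfully produced messages sorted by ascending offset.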
func produceMsgs(t *testing.T, clientVersions []KafkaVersion, codecs []CompressionCodec, flush int, countPerVerCodec int, idempotent bool) []*ProducerMessage {
	var wg sync.WaitGroup
	var producedMessagesMu sync.Mutex
	var producedMessages []*ProducerMessage
	for _, prodVer := range clientVersions {
		for _, codec := range codecs {
			prodCfg := NewConfig()
			prodCfg.Version = prodVer
			prodCfg.Producer.Return.Successes = true
			prodCfg.Producer.Return.Errors = true
			prodCfg.Producer.Flush.MaxMessages = flush
			prodCfg.Producer.Compression = codec
			prodCfg.Producer.Idempotent = idempotent
			if idempotent {
				prodCfg.Producer.RequiredAcks = WaitForAll
				prodCfg.Net.MaxOpenRequests = 1
			}

			p, err := NewSyncProducer(kafkaBrokers, prodCfg)
			if err != nil {
				t.Errorf("Failed to create producer: version=%s, compression=%s, err=%v", prodVer, codec, err)
				continue
			}
			defer safeClose(t, p)
			for i := 0; i < countPerVerCodec; i++ {
				msg := &ProducerMessage{
					Topic: "test.1",
					Value: StringEncoder(fmt.Sprintf("msg:%s:%s:%d", prodVer, codec, i)),
				}
				wg.Add(1)
				go func() {
					defer wg.Done()
					_, _, err := p.SendMessage(msg)
					if err != nil {
						t.Errorf("Failed to produce message: %s, err=%v", msg.Value, err)
						// Skip messages that failed to produce: they have no
						// valid offset and would corrupt the expected order.
						return
					}
					producedMessagesMu.Lock()
					producedMessages = append(producedMessages, msg)
					producedMessagesMu.Unlock()
				}()
			}
		}
	}
	wg.Wait()

	// Sort produced messages in ascending offset order.
	sort.Slice(producedMessages, func(i, j int) bool {
		return producedMessages[i].Offset < producedMessages[j].Offset
	})
	// Guard against indexing into an empty slice if every send failed.
	if len(producedMessages) == 0 {
		t.Fatal("no messages were produced")
	}
	t.Logf("*** Total produced %d, firstOffset=%d, lastOffset=%d\n",
		len(producedMessages), producedMessages[0].Offset, producedMessages[len(producedMessages)-1].Offset)
	return producedMessages
}

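// consumeMsgs consumes all produced messages from topic test.1 with every
// given client version and verifies that offsets, values, and ordering match
// what was produced.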
func consumeMsgs(t *testing.T, clientVersions []KafkaVersion, producedMessages []*ProducerMessage) {
	// Consume all produced messages with all client versions supported by the
	// cluster.
consumerVersionLoop:
	for _, consVer := range clientVersions {
		t.Logf("*** Consuming with client version %s\n", consVer)
		// Create a partition consumer that should start from the first produced
		// message.
		consCfg := NewConfig()
		consCfg.Version = consVer
		c, err := NewConsumer(kafkaBrokers, consCfg)
		if err != nil {
			t.Fatal(err)
		}
		defer safeClose(t, c)
		pc, err := c.ConsumePartition("test.1", 0, producedMessages[0].Offset)
		if err != nil {
			t.Fatal(err)
		}
		defer safeClose(t, pc)

		// Consume as many messages as were produced and make sure that
		// order is preserved.
		for i, prodMsg := range producedMessages {
			select {
			case consMsg := <-pc.Messages():
				if consMsg.Offset != prodMsg.Offset {
					t.Errorf("Consumed unexpected offset: version=%s, index=%d, want=%s, got=%s",
						consVer, i, prodMsg2Str(prodMsg), consMsg2Str(consMsg))
					continue consumerVersionLoop
				}
				if string(consMsg.Value) != string(prodMsg.Value.(StringEncoder)) {
					t.Errorf("Consumed unexpected msg: version=%s, index=%d, want=%s, got=%s",
						consVer, i, prodMsg2Str(prodMsg), consMsg2Str(consMsg))
					continue consumerVersionLoop
				}
			case <-time.After(3 * time.Second):
				t.Fatalf("Timeout waiting for: index=%d, offset=%d, msg=%s", i, prodMsg.Offset, prodMsg.Value)
			}
		}
	}
}