1package sarama
2
3import (
4	"testing"
5)
6
7func TestFuncOffsetManager(t *testing.T) {
8	checkKafkaVersion(t, "0.8.2")
9	setupFunctionalTest(t)
10	defer teardownFunctionalTest(t)
11
12	client, err := NewClient(kafkaBrokers, nil)
13	if err != nil {
14		t.Fatal(err)
15	}
16
17	offsetManager, err := NewOffsetManagerFromClient("sarama.TestFuncOffsetManager", client)
18	if err != nil {
19		t.Fatal(err)
20	}
21
22	pom1, err := offsetManager.ManagePartition("test.1", 0)
23	if err != nil {
24		t.Fatal(err)
25	}
26
27	pom1.MarkOffset(10, "test metadata")
28	safeClose(t, pom1)
29
30	pom2, err := offsetManager.ManagePartition("test.1", 0)
31	if err != nil {
32		t.Fatal(err)
33	}
34
35	offset, metadata := pom2.NextOffset()
36
37	if offset != 10 {
38		t.Errorf("Expected the next offset to be 10, found %d.", offset)
39	}
40	if metadata != "test metadata" {
41		t.Errorf("Expected metadata to be 'test metadata', found %s.", metadata)
42	}
43
44	safeClose(t, pom2)
45	safeClose(t, offsetManager)
46	safeClose(t, client)
47}
48