1package sarama 2 3import ( 4 "testing" 5) 6 7func TestFuncOffsetManager(t *testing.T) { 8 checkKafkaVersion(t, "0.8.2") 9 setupFunctionalTest(t) 10 defer teardownFunctionalTest(t) 11 12 client, err := NewClient(kafkaBrokers, nil) 13 if err != nil { 14 t.Fatal(err) 15 } 16 17 offsetManager, err := NewOffsetManagerFromClient("sarama.TestFuncOffsetManager", client) 18 if err != nil { 19 t.Fatal(err) 20 } 21 22 pom1, err := offsetManager.ManagePartition("test.1", 0) 23 if err != nil { 24 t.Fatal(err) 25 } 26 27 pom1.MarkOffset(10, "test metadata") 28 safeClose(t, pom1) 29 30 pom2, err := offsetManager.ManagePartition("test.1", 0) 31 if err != nil { 32 t.Fatal(err) 33 } 34 35 offset, metadata := pom2.NextOffset() 36 37 if offset != 10 { 38 t.Errorf("Expected the next offset to be 10, found %d.", offset) 39 } 40 if metadata != "test metadata" { 41 t.Errorf("Expected metadata to be 'test metadata', found %s.", metadata) 42 } 43 44 safeClose(t, pom2) 45 safeClose(t, offsetManager) 46 safeClose(t, client) 47} 48