1package sarama 2 3//ConsumerGroupMemberMetadata holds the metadata for consumer group 4type ConsumerGroupMemberMetadata struct { 5 Version int16 6 Topics []string 7 UserData []byte 8} 9 10func (m *ConsumerGroupMemberMetadata) encode(pe packetEncoder) error { 11 pe.putInt16(m.Version) 12 13 if err := pe.putStringArray(m.Topics); err != nil { 14 return err 15 } 16 17 if err := pe.putBytes(m.UserData); err != nil { 18 return err 19 } 20 21 return nil 22} 23 24func (m *ConsumerGroupMemberMetadata) decode(pd packetDecoder) (err error) { 25 if m.Version, err = pd.getInt16(); err != nil { 26 return 27 } 28 29 if m.Topics, err = pd.getStringArray(); err != nil { 30 return 31 } 32 33 if m.UserData, err = pd.getBytes(); err != nil { 34 return 35 } 36 37 return nil 38} 39 40//ConsumerGroupMemberAssignment holds the member assignment for a consume group 41type ConsumerGroupMemberAssignment struct { 42 Version int16 43 Topics map[string][]int32 44 UserData []byte 45} 46 47func (m *ConsumerGroupMemberAssignment) encode(pe packetEncoder) error { 48 pe.putInt16(m.Version) 49 50 if err := pe.putArrayLength(len(m.Topics)); err != nil { 51 return err 52 } 53 54 for topic, partitions := range m.Topics { 55 if err := pe.putString(topic); err != nil { 56 return err 57 } 58 if err := pe.putInt32Array(partitions); err != nil { 59 return err 60 } 61 } 62 63 if err := pe.putBytes(m.UserData); err != nil { 64 return err 65 } 66 67 return nil 68} 69 70func (m *ConsumerGroupMemberAssignment) decode(pd packetDecoder) (err error) { 71 if m.Version, err = pd.getInt16(); err != nil { 72 return 73 } 74 75 var topicLen int 76 if topicLen, err = pd.getArrayLength(); err != nil { 77 return 78 } 79 80 m.Topics = make(map[string][]int32, topicLen) 81 for i := 0; i < topicLen; i++ { 82 var topic string 83 if topic, err = pd.getString(); err != nil { 84 return 85 } 86 if m.Topics[topic], err = pd.getInt32Array(); err != nil { 87 return 88 } 89 } 90 91 if m.UserData, err = pd.getBytes(); err != nil { 92 return 93 } 94 95 return nil 96} 97