1package sarama 2 3type GroupProtocol struct { 4 Name string 5 Metadata []byte 6} 7 8func (p *GroupProtocol) decode(pd packetDecoder) (err error) { 9 p.Name, err = pd.getString() 10 if err != nil { 11 return err 12 } 13 p.Metadata, err = pd.getBytes() 14 return err 15} 16 17func (p *GroupProtocol) encode(pe packetEncoder) (err error) { 18 if err := pe.putString(p.Name); err != nil { 19 return err 20 } 21 if err := pe.putBytes(p.Metadata); err != nil { 22 return err 23 } 24 return nil 25} 26 27type JoinGroupRequest struct { 28 Version int16 29 GroupId string 30 SessionTimeout int32 31 RebalanceTimeout int32 32 MemberId string 33 ProtocolType string 34 GroupProtocols map[string][]byte // deprecated; use OrderedGroupProtocols 35 OrderedGroupProtocols []*GroupProtocol 36} 37 38func (r *JoinGroupRequest) encode(pe packetEncoder) error { 39 if err := pe.putString(r.GroupId); err != nil { 40 return err 41 } 42 pe.putInt32(r.SessionTimeout) 43 if r.Version >= 1 { 44 pe.putInt32(r.RebalanceTimeout) 45 } 46 if err := pe.putString(r.MemberId); err != nil { 47 return err 48 } 49 if err := pe.putString(r.ProtocolType); err != nil { 50 return err 51 } 52 53 if len(r.GroupProtocols) > 0 { 54 if len(r.OrderedGroupProtocols) > 0 { 55 return PacketDecodingError{"cannot specify both GroupProtocols and OrderedGroupProtocols on JoinGroupRequest"} 56 } 57 58 if err := pe.putArrayLength(len(r.GroupProtocols)); err != nil { 59 return err 60 } 61 for name, metadata := range r.GroupProtocols { 62 if err := pe.putString(name); err != nil { 63 return err 64 } 65 if err := pe.putBytes(metadata); err != nil { 66 return err 67 } 68 } 69 } else { 70 if err := pe.putArrayLength(len(r.OrderedGroupProtocols)); err != nil { 71 return err 72 } 73 for _, protocol := range r.OrderedGroupProtocols { 74 if err := protocol.encode(pe); err != nil { 75 return err 76 } 77 } 78 } 79 80 return nil 81} 82 83func (r *JoinGroupRequest) decode(pd packetDecoder, version int16) (err error) { 84 r.Version = version 85 86 if r.GroupId, err = pd.getString(); err != nil { 87 return 88 } 89 90 if r.SessionTimeout, err = pd.getInt32(); err != nil { 91 return 92 } 93 94 if version >= 1 { 95 if r.RebalanceTimeout, err = pd.getInt32(); err != nil { 96 return err 97 } 98 } 99 100 if r.MemberId, err = pd.getString(); err != nil { 101 return 102 } 103 104 if r.ProtocolType, err = pd.getString(); err != nil { 105 return 106 } 107 108 n, err := pd.getArrayLength() 109 if err != nil { 110 return err 111 } 112 if n == 0 { 113 return nil 114 } 115 116 r.GroupProtocols = make(map[string][]byte) 117 for i := 0; i < n; i++ { 118 protocol := &GroupProtocol{} 119 if err := protocol.decode(pd); err != nil { 120 return err 121 } 122 r.GroupProtocols[protocol.Name] = protocol.Metadata 123 r.OrderedGroupProtocols = append(r.OrderedGroupProtocols, protocol) 124 } 125 126 return nil 127} 128 129func (r *JoinGroupRequest) key() int16 { 130 return 11 131} 132 133func (r *JoinGroupRequest) version() int16 { 134 return r.Version 135} 136 137func (r *JoinGroupRequest) requiredVersion() KafkaVersion { 138 switch r.Version { 139 case 2: 140 return V0_11_0_0 141 case 1: 142 return V0_10_1_0 143 default: 144 return V0_9_0_0 145 } 146} 147 148func (r *JoinGroupRequest) AddGroupProtocol(name string, metadata []byte) { 149 r.OrderedGroupProtocols = append(r.OrderedGroupProtocols, &GroupProtocol{ 150 Name: name, 151 Metadata: metadata, 152 }) 153} 154 155func (r *JoinGroupRequest) AddGroupProtocolMetadata(name string, metadata *ConsumerGroupMemberMetadata) error { 156 bin, err := encode(metadata, nil) 157 if err != nil { 158 return err 159 } 160 161 r.AddGroupProtocol(name, bin) 162 return nil 163} 164