1package sarama
2
// GroupProtocol pairs a protocol name with its raw metadata bytes. It is the
// element type of JoinGroupRequest.OrderedGroupProtocols.
type GroupProtocol struct {
	Name     string // protocol name; written with putString, read with getString
	Metadata []byte // metadata payload; written with putBytes, read with getBytes
}
7
8func (p *GroupProtocol) decode(pd packetDecoder) (err error) {
9	p.Name, err = pd.getString()
10	if err != nil {
11		return err
12	}
13	p.Metadata, err = pd.getBytes()
14	return err
15}
16
17func (p *GroupProtocol) encode(pe packetEncoder) (err error) {
18	if err := pe.putString(p.Name); err != nil {
19		return err
20	}
21	if err := pe.putBytes(p.Metadata); err != nil {
22		return err
23	}
24	return nil
25}
26
// JoinGroupRequest is the request type whose key method reports API key 11.
//
// As written by encode, its wire layout is: GroupId, SessionTimeout,
// RebalanceTimeout (only when Version >= 1), MemberId, ProtocolType, then an
// array of group protocols, each a name followed by metadata bytes.
//
// The protocol array can be supplied through either GroupProtocols or
// OrderedGroupProtocols, but not both: encode returns an error when both are
// non-empty. OrderedGroupProtocols is a slice and is encoded in slice order,
// whereas GroupProtocols is a map and is encoded in map iteration order.
//
// NOTE(review): decode fills both GroupProtocols and OrderedGroupProtocols
// whenever the decoded array is non-empty, so a decoded request cannot be
// passed back to encode unless one of the two fields is cleared first.
type JoinGroupRequest struct {
	Version               int16
	GroupId               string
	SessionTimeout        int32
	RebalanceTimeout      int32
	MemberId              string
	ProtocolType          string
	GroupProtocols        map[string][]byte // deprecated; use OrderedGroupProtocols
	OrderedGroupProtocols []*GroupProtocol
}
37
38func (r *JoinGroupRequest) encode(pe packetEncoder) error {
39	if err := pe.putString(r.GroupId); err != nil {
40		return err
41	}
42	pe.putInt32(r.SessionTimeout)
43	if r.Version >= 1 {
44		pe.putInt32(r.RebalanceTimeout)
45	}
46	if err := pe.putString(r.MemberId); err != nil {
47		return err
48	}
49	if err := pe.putString(r.ProtocolType); err != nil {
50		return err
51	}
52
53	if len(r.GroupProtocols) > 0 {
54		if len(r.OrderedGroupProtocols) > 0 {
55			return PacketDecodingError{"cannot specify both GroupProtocols and OrderedGroupProtocols on JoinGroupRequest"}
56		}
57
58		if err := pe.putArrayLength(len(r.GroupProtocols)); err != nil {
59			return err
60		}
61		for name, metadata := range r.GroupProtocols {
62			if err := pe.putString(name); err != nil {
63				return err
64			}
65			if err := pe.putBytes(metadata); err != nil {
66				return err
67			}
68		}
69	} else {
70		if err := pe.putArrayLength(len(r.OrderedGroupProtocols)); err != nil {
71			return err
72		}
73		for _, protocol := range r.OrderedGroupProtocols {
74			if err := protocol.encode(pe); err != nil {
75				return err
76			}
77		}
78	}
79
80	return nil
81}
82
83func (r *JoinGroupRequest) decode(pd packetDecoder, version int16) (err error) {
84	r.Version = version
85
86	if r.GroupId, err = pd.getString(); err != nil {
87		return
88	}
89
90	if r.SessionTimeout, err = pd.getInt32(); err != nil {
91		return
92	}
93
94	if version >= 1 {
95		if r.RebalanceTimeout, err = pd.getInt32(); err != nil {
96			return err
97		}
98	}
99
100	if r.MemberId, err = pd.getString(); err != nil {
101		return
102	}
103
104	if r.ProtocolType, err = pd.getString(); err != nil {
105		return
106	}
107
108	n, err := pd.getArrayLength()
109	if err != nil {
110		return err
111	}
112	if n == 0 {
113		return nil
114	}
115
116	r.GroupProtocols = make(map[string][]byte)
117	for i := 0; i < n; i++ {
118		protocol := &GroupProtocol{}
119		if err := protocol.decode(pd); err != nil {
120			return err
121		}
122		r.GroupProtocols[protocol.Name] = protocol.Metadata
123		r.OrderedGroupProtocols = append(r.OrderedGroupProtocols, protocol)
124	}
125
126	return nil
127}
128
129func (r *JoinGroupRequest) key() int16 {
130	return 11
131}
132
// version returns the request's Version field, the same value that encode and
// decode consult to decide whether RebalanceTimeout is part of the layout.
func (r *JoinGroupRequest) version() int16 {
	return r.Version
}
136
137func (r *JoinGroupRequest) requiredVersion() KafkaVersion {
138	switch r.Version {
139	case 2:
140		return V0_11_0_0
141	case 1:
142		return V0_10_1_0
143	default:
144		return V0_9_0_0
145	}
146}
147
148func (r *JoinGroupRequest) AddGroupProtocol(name string, metadata []byte) {
149	r.OrderedGroupProtocols = append(r.OrderedGroupProtocols, &GroupProtocol{
150		Name:     name,
151		Metadata: metadata,
152	})
153}
154
155func (r *JoinGroupRequest) AddGroupProtocolMetadata(name string, metadata *ConsumerGroupMemberMetadata) error {
156	bin, err := encode(metadata, nil)
157	if err != nil {
158		return err
159	}
160
161	r.AddGroupProtocol(name, bin)
162	return nil
163}
164