1from __future__ import absolute_import
2
3from kafka.protocol.api import Request, Response
4from kafka.protocol.types import Array, Int8, Int16, Int32, Int64, Schema, String
5
6
class OffsetCommitResponse_v0(Response):
    """OffsetCommit response (API key 8), version 0.

    Layout: an array of topics, each holding the topic name and an array
    of (partition, error_code) entries.
    """

    API_KEY = 8
    API_VERSION = 0
    SCHEMA = Schema(
        (
            'topics',
            Array(
                ('topic', String('utf-8')),
                (
                    'partitions',
                    Array(
                        ('partition', Int32),
                        ('error_code', Int16),
                    ),
                ),
            ),
        ),
    )
17
18
class OffsetCommitResponse_v1(Response):
    """OffsetCommit response (API key 8), version 1.

    Shares the version 0 schema object unchanged.
    """

    API_KEY = 8
    API_VERSION = 1

    # Same wire layout as v0, so the schema is reused rather than redefined.
    SCHEMA = OffsetCommitResponse_v0.SCHEMA
23
24
class OffsetCommitResponse_v2(Response):
    """OffsetCommit response (API key 8), version 2.

    Shares the version 1 schema object unchanged.
    """

    API_KEY = 8
    API_VERSION = 2

    # Same wire layout as v1, so the schema is reused rather than redefined.
    SCHEMA = OffsetCommitResponse_v1.SCHEMA
29
30
class OffsetCommitResponse_v3(Response):
    """OffsetCommit response (API key 8), version 3.

    Layout: a leading ``throttle_time_ms`` followed by the array of topics,
    each holding the topic name and an array of (partition, error_code)
    entries.
    """

    API_KEY = 8
    API_VERSION = 3
    SCHEMA = Schema(
        ('throttle_time_ms', Int32),
        (
            'topics',
            Array(
                ('topic', String('utf-8')),
                (
                    'partitions',
                    Array(
                        ('partition', Int32),
                        ('error_code', Int16),
                    ),
                ),
            ),
        ),
    )
42
43
class OffsetCommitRequest_v0(Request):
    """OffsetCommit request (API key 8), version 0.

    Layout: the consumer group, then an array of topics, each holding the
    topic name and an array of (partition, offset, metadata) entries.
    Answered with ``OffsetCommitResponse_v0``.
    """

    API_KEY = 8
    API_VERSION = 0  # Zookeeper-backed storage
    RESPONSE_TYPE = OffsetCommitResponse_v0
    SCHEMA = Schema(
        ('consumer_group', String('utf-8')),
        (
            'topics',
            Array(
                ('topic', String('utf-8')),
                (
                    'partitions',
                    Array(
                        ('partition', Int32),
                        ('offset', Int64),
                        ('metadata', String('utf-8')),
                    ),
                ),
            ),
        ),
    )
57
58
class OffsetCommitRequest_v1(Request):
    """OffsetCommit request (API key 8), version 1.

    Layout: consumer group, group generation id and consumer id, then an
    array of topics, each holding the topic name and an array of
    (partition, offset, timestamp, metadata) entries.
    Answered with ``OffsetCommitResponse_v1``.
    """

    API_KEY = 8
    API_VERSION = 1  # Kafka-backed storage
    RESPONSE_TYPE = OffsetCommitResponse_v1
    SCHEMA = Schema(
        ('consumer_group', String('utf-8')),
        ('consumer_group_generation_id', Int32),
        ('consumer_id', String('utf-8')),
        (
            'topics',
            Array(
                ('topic', String('utf-8')),
                (
                    'partitions',
                    Array(
                        ('partition', Int32),
                        ('offset', Int64),
                        ('timestamp', Int64),
                        ('metadata', String('utf-8')),
                    ),
                ),
            ),
        ),
    )
75
76
class OffsetCommitRequest_v2(Request):
    """OffsetCommit request (API key 8), version 2.

    Layout: consumer group, group generation id, consumer id and a
    request-level ``retention_time``, then an array of topics, each holding
    the topic name and an array of (partition, offset, metadata) entries.
    Answered with ``OffsetCommitResponse_v2``.
    """

    API_KEY = 8
    API_VERSION = 2  # added retention_time, dropped timestamp
    RESPONSE_TYPE = OffsetCommitResponse_v2
    SCHEMA = Schema(
        ('consumer_group', String('utf-8')),
        ('consumer_group_generation_id', Int32),
        ('consumer_id', String('utf-8')),
        ('retention_time', Int64),
        (
            'topics',
            Array(
                ('topic', String('utf-8')),
                (
                    'partitions',
                    Array(
                        ('partition', Int32),
                        ('offset', Int64),
                        ('metadata', String('utf-8')),
                    ),
                ),
            ),
        ),
    )

    # NOTE(review): presumably the values callers send when they have no
    # specific generation id / retention time to supply -- confirm against
    # the call sites.
    DEFAULT_GENERATION_ID = -1
    DEFAULT_RETENTION_TIME = -1
95
96
class OffsetCommitRequest_v3(Request):
    """OffsetCommit request (API key 8), version 3.

    The request body reuses the version 2 schema object; the request is
    answered with ``OffsetCommitResponse_v3``.
    """
    API_KEY = 8
    API_VERSION = 3
    RESPONSE_TYPE = OffsetCommitResponse_v3
    SCHEMA = OffsetCommitRequest_v2.SCHEMA
    # The schema is shared with v2, so expose the same default constants.
    # This lets code that selects the request class by version read
    # ``cls.DEFAULT_GENERATION_ID`` / ``cls.DEFAULT_RETENTION_TIME`` without
    # special-casing v3.
    DEFAULT_GENERATION_ID = OffsetCommitRequest_v2.DEFAULT_GENERATION_ID
    DEFAULT_RETENTION_TIME = OffsetCommitRequest_v2.DEFAULT_RETENTION_TIME
102
103
# Version-indexed tables: the element at index N is the class for API version N.
OffsetCommitRequest = [
    OffsetCommitRequest_v0,
    OffsetCommitRequest_v1,
    OffsetCommitRequest_v2,
    OffsetCommitRequest_v3,
]
OffsetCommitResponse = [
    OffsetCommitResponse_v0,
    OffsetCommitResponse_v1,
    OffsetCommitResponse_v2,
    OffsetCommitResponse_v3,
]
112
113
class OffsetFetchResponse_v0(Response):
    """OffsetFetch response (API key 9), version 0.

    Layout: an array of topics, each holding the topic name and an array
    of (partition, offset, metadata, error_code) entries.
    """

    API_KEY = 9
    API_VERSION = 0
    SCHEMA = Schema(
        (
            'topics',
            Array(
                ('topic', String('utf-8')),
                (
                    'partitions',
                    Array(
                        ('partition', Int32),
                        ('offset', Int64),
                        ('metadata', String('utf-8')),
                        ('error_code', Int16),
                    ),
                ),
            ),
        ),
    )
126
127
class OffsetFetchResponse_v1(Response):
    """OffsetFetch response (API key 9), version 1.

    Shares the version 0 schema object unchanged.
    """

    API_KEY = 9
    API_VERSION = 1

    # Same wire layout as v0, so the schema is reused rather than redefined.
    SCHEMA = OffsetFetchResponse_v0.SCHEMA
132
133
class OffsetFetchResponse_v2(Response):
    """OffsetFetch response (API key 9), version 2 (added in KIP-88).

    Layout: the array of topics (topic name plus an array of
    (partition, offset, metadata, error_code) entries), followed by a
    top-level ``error_code``.
    """

    API_KEY = 9
    API_VERSION = 2
    SCHEMA = Schema(
        (
            'topics',
            Array(
                ('topic', String('utf-8')),
                (
                    'partitions',
                    Array(
                        ('partition', Int32),
                        ('offset', Int64),
                        ('metadata', String('utf-8')),
                        ('error_code', Int16),
                    ),
                ),
            ),
        ),
        ('error_code', Int16),
    )
148
149
class OffsetFetchResponse_v3(Response):
    """OffsetFetch response (API key 9), version 3.

    Layout: a leading ``throttle_time_ms``, the array of topics (topic name
    plus an array of (partition, offset, metadata, error_code) entries),
    and a trailing top-level ``error_code``.
    """

    API_KEY = 9
    API_VERSION = 3
    SCHEMA = Schema(
        ('throttle_time_ms', Int32),
        (
            'topics',
            Array(
                ('topic', String('utf-8')),
                (
                    'partitions',
                    Array(
                        ('partition', Int32),
                        ('offset', Int64),
                        ('metadata', String('utf-8')),
                        ('error_code', Int16),
                    ),
                ),
            ),
        ),
        ('error_code', Int16),
    )
164
165
class OffsetFetchRequest_v0(Request):
    """OffsetFetch request (API key 9), version 0.

    Layout: the consumer group, then an array of topics, each holding the
    topic name and an array of Int32 partition ids.
    Answered with ``OffsetFetchResponse_v0``.
    """

    API_KEY = 9
    API_VERSION = 0  # zookeeper-backed storage
    RESPONSE_TYPE = OffsetFetchResponse_v0
    SCHEMA = Schema(
        ('consumer_group', String('utf-8')),
        (
            'topics',
            Array(
                ('topic', String('utf-8')),
                ('partitions', Array(Int32)),
            ),
        ),
    )
176
177
class OffsetFetchRequest_v1(Request):
    """OffsetFetch request (API key 9), version 1.

    Shares the version 0 schema object; answered with
    ``OffsetFetchResponse_v1``.
    """

    API_KEY = 9
    API_VERSION = 1  # kafka-backed storage
    RESPONSE_TYPE = OffsetFetchResponse_v1

    # Same wire layout as v0, so the schema is reused rather than redefined.
    SCHEMA = OffsetFetchRequest_v0.SCHEMA
183
184
class OffsetFetchRequest_v2(Request):
    """OffsetFetch request (API key 9), version 2.

    KIP-88: Allows passing null topics to return offsets for all partitions
    that the consumer group has a stored offset for, even if no consumer in
    the group is currently consuming that partition.

    Shares the version 1 schema object; answered with
    ``OffsetFetchResponse_v2``.
    """

    API_KEY = 9
    API_VERSION = 2
    RESPONSE_TYPE = OffsetFetchResponse_v2

    # Same schema object as v1, reused rather than redefined.
    SCHEMA = OffsetFetchRequest_v1.SCHEMA
193
194
class OffsetFetchRequest_v3(Request):
    """OffsetFetch request (API key 9), version 3.

    Shares the version 2 schema object; answered with
    ``OffsetFetchResponse_v3``.
    """

    API_KEY = 9
    API_VERSION = 3
    RESPONSE_TYPE = OffsetFetchResponse_v3

    # Same schema object as v2, reused rather than redefined.
    SCHEMA = OffsetFetchRequest_v2.SCHEMA
200
201
# Version-indexed tables: the element at index N is the class for API version N.
OffsetFetchRequest = [
    OffsetFetchRequest_v0,
    OffsetFetchRequest_v1,
    OffsetFetchRequest_v2,
    OffsetFetchRequest_v3,
]
OffsetFetchResponse = [
    OffsetFetchResponse_v0,
    OffsetFetchResponse_v1,
    OffsetFetchResponse_v2,
    OffsetFetchResponse_v3,
]
210
211
class GroupCoordinatorResponse_v0(Response):
    """GroupCoordinator response (API key 10), version 0.

    Layout: ``error_code``, then the coordinator's id, host and port.
    """

    API_KEY = 10
    API_VERSION = 0
    SCHEMA = Schema(
        ('error_code', Int16),
        ('coordinator_id', Int32),
        ('host', String('utf-8')),
        ('port', Int32),
    )
221
222
class GroupCoordinatorResponse_v1(Response):
    """GroupCoordinator (FindCoordinator) response, API key 10, version 1.

    Layout: ``throttle_time_ms``, ``error_code``, ``error_message``, then
    the coordinator's id, host and port.
    """
    API_KEY = 10
    API_VERSION = 1
    SCHEMA = Schema(
        # The v1 wire format begins with throttle_time_ms. Without this
        # field every following value is decoded from the wrong offset.
        ('throttle_time_ms', Int32),
        ('error_code', Int16),
        ('error_message', String('utf-8')),
        ('coordinator_id', Int32),
        ('host', String('utf-8')),
        ('port', Int32)
    )
233
234
class GroupCoordinatorRequest_v0(Request):
    """GroupCoordinator request (API key 10), version 0.

    Carries a single field, the consumer group name. Answered with
    ``GroupCoordinatorResponse_v0``.
    """

    API_KEY = 10
    API_VERSION = 0
    RESPONSE_TYPE = GroupCoordinatorResponse_v0
    SCHEMA = Schema(
        ('consumer_group', String('utf-8')),
    )
242
243
class GroupCoordinatorRequest_v1(Request):
    """GroupCoordinator request (API key 10), version 1.

    Layout: a string ``coordinator_key`` followed by an Int8
    ``coordinator_type``. Answered with ``GroupCoordinatorResponse_v1``.
    """

    API_KEY = 10
    API_VERSION = 1
    RESPONSE_TYPE = GroupCoordinatorResponse_v1
    SCHEMA = Schema(
        ('coordinator_key', String('utf-8')),
        ('coordinator_type', Int8),
    )
252
253
# Version-indexed tables: the element at index N is the class for API version N.
GroupCoordinatorRequest = [
    GroupCoordinatorRequest_v0,
    GroupCoordinatorRequest_v1,
]
GroupCoordinatorResponse = [
    GroupCoordinatorResponse_v0,
    GroupCoordinatorResponse_v1,
]
256