1from __future__ import absolute_import 2 3from kafka.protocol.api import Request, Response 4from kafka.protocol.types import Array, Int8, Int16, Int32, Int64, Schema, String 5 6 7class OffsetCommitResponse_v0(Response): 8 API_KEY = 8 9 API_VERSION = 0 10 SCHEMA = Schema( 11 ('topics', Array( 12 ('topic', String('utf-8')), 13 ('partitions', Array( 14 ('partition', Int32), 15 ('error_code', Int16))))) 16 ) 17 18 19class OffsetCommitResponse_v1(Response): 20 API_KEY = 8 21 API_VERSION = 1 22 SCHEMA = OffsetCommitResponse_v0.SCHEMA 23 24 25class OffsetCommitResponse_v2(Response): 26 API_KEY = 8 27 API_VERSION = 2 28 SCHEMA = OffsetCommitResponse_v1.SCHEMA 29 30 31class OffsetCommitResponse_v3(Response): 32 API_KEY = 8 33 API_VERSION = 3 34 SCHEMA = Schema( 35 ('throttle_time_ms', Int32), 36 ('topics', Array( 37 ('topic', String('utf-8')), 38 ('partitions', Array( 39 ('partition', Int32), 40 ('error_code', Int16))))) 41 ) 42 43 44class OffsetCommitRequest_v0(Request): 45 API_KEY = 8 46 API_VERSION = 0 # Zookeeper-backed storage 47 RESPONSE_TYPE = OffsetCommitResponse_v0 48 SCHEMA = Schema( 49 ('consumer_group', String('utf-8')), 50 ('topics', Array( 51 ('topic', String('utf-8')), 52 ('partitions', Array( 53 ('partition', Int32), 54 ('offset', Int64), 55 ('metadata', String('utf-8')))))) 56 ) 57 58 59class OffsetCommitRequest_v1(Request): 60 API_KEY = 8 61 API_VERSION = 1 # Kafka-backed storage 62 RESPONSE_TYPE = OffsetCommitResponse_v1 63 SCHEMA = Schema( 64 ('consumer_group', String('utf-8')), 65 ('consumer_group_generation_id', Int32), 66 ('consumer_id', String('utf-8')), 67 ('topics', Array( 68 ('topic', String('utf-8')), 69 ('partitions', Array( 70 ('partition', Int32), 71 ('offset', Int64), 72 ('timestamp', Int64), 73 ('metadata', String('utf-8')))))) 74 ) 75 76 77class OffsetCommitRequest_v2(Request): 78 API_KEY = 8 79 API_VERSION = 2 # added retention_time, dropped timestamp 80 RESPONSE_TYPE = OffsetCommitResponse_v2 81 SCHEMA = Schema( 82 ('consumer_group', String('utf-8')), 83 ('consumer_group_generation_id', Int32), 84 ('consumer_id', String('utf-8')), 85 ('retention_time', Int64), 86 ('topics', Array( 87 ('topic', String('utf-8')), 88 ('partitions', Array( 89 ('partition', Int32), 90 ('offset', Int64), 91 ('metadata', String('utf-8')))))) 92 ) 93 DEFAULT_GENERATION_ID = -1 94 DEFAULT_RETENTION_TIME = -1 95 96 97class OffsetCommitRequest_v3(Request): 98 API_KEY = 8 99 API_VERSION = 3 100 RESPONSE_TYPE = OffsetCommitResponse_v3 101 SCHEMA = OffsetCommitRequest_v2.SCHEMA 102 103 104OffsetCommitRequest = [ 105 OffsetCommitRequest_v0, OffsetCommitRequest_v1, 106 OffsetCommitRequest_v2, OffsetCommitRequest_v3 107] 108OffsetCommitResponse = [ 109 OffsetCommitResponse_v0, OffsetCommitResponse_v1, 110 OffsetCommitResponse_v2, OffsetCommitResponse_v3 111] 112 113 114class OffsetFetchResponse_v0(Response): 115 API_KEY = 9 116 API_VERSION = 0 117 SCHEMA = Schema( 118 ('topics', Array( 119 ('topic', String('utf-8')), 120 ('partitions', Array( 121 ('partition', Int32), 122 ('offset', Int64), 123 ('metadata', String('utf-8')), 124 ('error_code', Int16))))) 125 ) 126 127 128class OffsetFetchResponse_v1(Response): 129 API_KEY = 9 130 API_VERSION = 1 131 SCHEMA = OffsetFetchResponse_v0.SCHEMA 132 133 134class OffsetFetchResponse_v2(Response): 135 # Added in KIP-88 136 API_KEY = 9 137 API_VERSION = 2 138 SCHEMA = Schema( 139 ('topics', Array( 140 ('topic', String('utf-8')), 141 ('partitions', Array( 142 ('partition', Int32), 143 ('offset', Int64), 144 ('metadata', String('utf-8')), 145 ('error_code', Int16))))), 146 ('error_code', Int16) 147 ) 148 149 150class OffsetFetchResponse_v3(Response): 151 API_KEY = 9 152 API_VERSION = 3 153 SCHEMA = Schema( 154 ('throttle_time_ms', Int32), 155 ('topics', Array( 156 ('topic', String('utf-8')), 157 ('partitions', Array( 158 ('partition', Int32), 159 ('offset', Int64), 160 ('metadata', String('utf-8')), 161 ('error_code', Int16))))), 162 ('error_code', Int16) 163 ) 164 165 166class OffsetFetchRequest_v0(Request): 167 API_KEY = 9 168 API_VERSION = 0 # zookeeper-backed storage 169 RESPONSE_TYPE = OffsetFetchResponse_v0 170 SCHEMA = Schema( 171 ('consumer_group', String('utf-8')), 172 ('topics', Array( 173 ('topic', String('utf-8')), 174 ('partitions', Array(Int32)))) 175 ) 176 177 178class OffsetFetchRequest_v1(Request): 179 API_KEY = 9 180 API_VERSION = 1 # kafka-backed storage 181 RESPONSE_TYPE = OffsetFetchResponse_v1 182 SCHEMA = OffsetFetchRequest_v0.SCHEMA 183 184 185class OffsetFetchRequest_v2(Request): 186 # KIP-88: Allows passing null topics to return offsets for all partitions 187 # that the consumer group has a stored offset for, even if no consumer in 188 # the group is currently consuming that partition. 189 API_KEY = 9 190 API_VERSION = 2 191 RESPONSE_TYPE = OffsetFetchResponse_v2 192 SCHEMA = OffsetFetchRequest_v1.SCHEMA 193 194 195class OffsetFetchRequest_v3(Request): 196 API_KEY = 9 197 API_VERSION = 3 198 RESPONSE_TYPE = OffsetFetchResponse_v3 199 SCHEMA = OffsetFetchRequest_v2.SCHEMA 200 201 202OffsetFetchRequest = [ 203 OffsetFetchRequest_v0, OffsetFetchRequest_v1, 204 OffsetFetchRequest_v2, OffsetFetchRequest_v3, 205] 206OffsetFetchResponse = [ 207 OffsetFetchResponse_v0, OffsetFetchResponse_v1, 208 OffsetFetchResponse_v2, OffsetFetchResponse_v3, 209] 210 211 212class GroupCoordinatorResponse_v0(Response): 213 API_KEY = 10 214 API_VERSION = 0 215 SCHEMA = Schema( 216 ('error_code', Int16), 217 ('coordinator_id', Int32), 218 ('host', String('utf-8')), 219 ('port', Int32) 220 ) 221 222 223class GroupCoordinatorResponse_v1(Response): 224 API_KEY = 10 225 API_VERSION = 1 226 SCHEMA = Schema( 227 ('error_code', Int16), 228 ('error_message', String('utf-8')), 229 ('coordinator_id', Int32), 230 ('host', String('utf-8')), 231 ('port', Int32) 232 ) 233 234 235class GroupCoordinatorRequest_v0(Request): 236 API_KEY = 10 237 API_VERSION = 0 238 RESPONSE_TYPE = GroupCoordinatorResponse_v0 239 SCHEMA = Schema( 240 ('consumer_group', String('utf-8')) 241 ) 242 243 244class GroupCoordinatorRequest_v1(Request): 245 API_KEY = 10 246 API_VERSION = 1 247 RESPONSE_TYPE = GroupCoordinatorResponse_v1 248 SCHEMA = Schema( 249 ('coordinator_key', String('utf-8')), 250 ('coordinator_type', Int8) 251 ) 252 253 254GroupCoordinatorRequest = [GroupCoordinatorRequest_v0, GroupCoordinatorRequest_v1] 255GroupCoordinatorResponse = [GroupCoordinatorResponse_v0, GroupCoordinatorResponse_v1] 256