1defmodule KafkaEx.Protocol.OffsetFetch do 2 alias KafkaEx.Protocol 3 import KafkaEx.Protocol.Common 4 5 @moduledoc """ 6 Implementation of the Kafka OffsetFetch request and response APIs 7 """ 8 9 defmodule Request do 10 @moduledoc false 11 defstruct consumer_group: nil, topic: nil, partition: nil 12 13 @type t :: %Request{ 14 consumer_group: nil | binary, 15 topic: binary, 16 partition: integer 17 } 18 end 19 20 defmodule Response do 21 @moduledoc false 22 defstruct topic: nil, partitions: [] 23 @type t :: %Response{topic: binary, partitions: list} 24 25 def last_offset(:topic_not_found) do 26 0 27 end 28 29 def last_offset(offset_fetch_data) do 30 case offset_fetch_data do 31 [] -> 32 0 33 34 _ -> 35 partitions = offset_fetch_data |> hd |> Map.get(:partitions, []) 36 37 case partitions do 38 [] -> 0 39 _ -> partitions |> hd |> Map.get(:offset, 0) 40 end 41 end 42 end 43 end 44 45 def create_request(correlation_id, client_id, offset_fetch_request) do 46 KafkaEx.Protocol.create_request(:offset_fetch, correlation_id, client_id) <> 47 <<byte_size(offset_fetch_request.consumer_group)::16-signed, 48 offset_fetch_request.consumer_group::binary, 1::32-signed, 49 byte_size(offset_fetch_request.topic)::16-signed, 50 offset_fetch_request.topic::binary, 1::32-signed, 51 offset_fetch_request.partition::32>> 52 end 53 54 def parse_response( 55 <<_correlation_id::32-signed, topics_size::32-signed, 56 topics_data::binary>> 57 ) do 58 parse_topics(topics_size, topics_data, __MODULE__) 59 end 60 61 def parse_partitions(0, rest, partitions), do: {partitions, rest} 62 63 def parse_partitions( 64 partitions_size, 65 <<partition::32-signed, offset::64-signed, metadata_size::16-signed, 66 metadata::size(metadata_size)-binary, error_code::16-signed, 67 rest::binary>>, 68 partitions 69 ) do 70 parse_partitions(partitions_size - 1, rest, [ 71 %{ 72 partition: partition, 73 offset: offset, 74 metadata: metadata, 75 error_code: Protocol.error(error_code) 76 } 77 | partitions 78 ]) 79 end 80end 81