1defmodule KafkaEx.Protocol.OffsetFetch do
2  alias KafkaEx.Protocol
3  import KafkaEx.Protocol.Common
4
5  @moduledoc """
6  Implementation of the Kafka OffsetFetch request and response APIs
7  """
8
9  defmodule Request do
10    @moduledoc false
11    defstruct consumer_group: nil, topic: nil, partition: nil
12
13    @type t :: %Request{
14            consumer_group: nil | binary,
15            topic: binary,
16            partition: integer
17          }
18  end
19
20  defmodule Response do
21    @moduledoc false
22    defstruct topic: nil, partitions: []
23    @type t :: %Response{topic: binary, partitions: list}
24
25    def last_offset(:topic_not_found) do
26      0
27    end
28
29    def last_offset(offset_fetch_data) do
30      case offset_fetch_data do
31        [] ->
32          0
33
34        _ ->
35          partitions = offset_fetch_data |> hd |> Map.get(:partitions, [])
36
37          case partitions do
38            [] -> 0
39            _ -> partitions |> hd |> Map.get(:offset, 0)
40          end
41      end
42    end
43  end
44
45  def create_request(correlation_id, client_id, offset_fetch_request) do
46    KafkaEx.Protocol.create_request(:offset_fetch, correlation_id, client_id) <>
47      <<byte_size(offset_fetch_request.consumer_group)::16-signed,
48        offset_fetch_request.consumer_group::binary, 1::32-signed,
49        byte_size(offset_fetch_request.topic)::16-signed,
50        offset_fetch_request.topic::binary, 1::32-signed,
51        offset_fetch_request.partition::32>>
52  end
53
54  def parse_response(
55        <<_correlation_id::32-signed, topics_size::32-signed,
56          topics_data::binary>>
57      ) do
58    parse_topics(topics_size, topics_data, __MODULE__)
59  end
60
61  def parse_partitions(0, rest, partitions), do: {partitions, rest}
62
63  def parse_partitions(
64        partitions_size,
65        <<partition::32-signed, offset::64-signed, metadata_size::16-signed,
66          metadata::size(metadata_size)-binary, error_code::16-signed,
67          rest::binary>>,
68        partitions
69      ) do
70    parse_partitions(partitions_size - 1, rest, [
71      %{
72        partition: partition,
73        offset: offset,
74        metadata: metadata,
75        error_code: Protocol.error(error_code)
76      }
77      | partitions
78    ])
79  end
80end
81