1defmodule KafkaEx.NetworkClient do
2  require Logger
3  alias KafkaEx.Protocol.Metadata.Broker
4  alias KafkaEx.Socket
5
6  @moduledoc false
7  @spec create_socket(binary, non_neg_integer, KafkaEx.ssl_options(), boolean) ::
8          nil | Socket.t()
9  def create_socket(host, port, ssl_options \\ [], use_ssl \\ false) do
10    case Socket.create(
11           format_host(host),
12           port,
13           build_socket_options(ssl_options),
14           use_ssl
15         ) do
16      {:ok, socket} ->
17        Logger.log(
18          :debug,
19          "Successfully connected to broker #{inspect(host)}:#{inspect(port)}"
20        )
21
22        socket
23
24      err ->
25        Logger.log(
26          :error,
27          "Could not connect to broker #{inspect(host)}:#{inspect(port)} because of error #{
28            inspect(err)
29          }"
30        )
31
32        nil
33    end
34  end
35
36  @spec close_socket(nil | Socket.t()) :: :ok
37  def close_socket(nil), do: :ok
38  def close_socket(socket), do: Socket.close(socket)
39
40  @spec send_async_request(Broker.t(), iodata) ::
41          :ok | {:error, :closed | :inet.posix()}
42  def send_async_request(broker, data) do
43    socket = broker.socket
44
45    case Socket.send(socket, data) do
46      :ok ->
47        :ok
48
49      {_, reason} ->
50        Logger.log(
51          :error,
52          "Asynchronously sending data to broker #{inspect(broker.host)}:#{
53            inspect(broker.port)
54          } failed with #{inspect(reason)}"
55        )
56
57        reason
58    end
59  end
60
61  @spec send_sync_request(Broker.t(), iodata, timeout) ::
62          iodata | {:error, any()}
63  def send_sync_request(%{:socket => socket} = broker, data, timeout) do
64    :ok = Socket.setopts(socket, [:binary, {:packet, 4}, {:active, false}])
65
66    response =
67      case Socket.send(socket, data) do
68        :ok ->
69          case Socket.recv(socket, 0, timeout) do
70            {:ok, data} ->
71              data
72
73            {:error, reason} ->
74              Logger.log(
75                :error,
76                "Receiving data from broker #{inspect(broker.host)}:#{
77                  inspect(broker.port)
78                } failed with #{inspect(reason)}"
79              )
80
81              {:error, reason}
82          end
83
84        {_, reason} ->
85          Logger.log(
86            :error,
87            "Sending data to broker #{inspect(broker.host)}:#{
88              inspect(broker.port)
89            } failed with #{inspect(reason)}"
90          )
91
92          {:error, reason}
93      end
94
95    :ok = Socket.setopts(socket, [:binary, {:packet, 4}, {:active, true}])
96    response
97  end
98
99  @spec format_host(binary) :: [char] | :inet.ip_address()
100  def format_host(host) do
101    case Regex.scan(~r/^(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})$/, host) do
102      [match_data] = [[_, _, _, _, _]] ->
103        match_data
104        |> tl
105        |> List.flatten()
106        |> Enum.map(&String.to_integer/1)
107        |> List.to_tuple()
108
109      # to_char_list is deprecated from Elixir 1.3 onward
110      _ ->
111        apply(String, :to_char_list, [host])
112    end
113  end
114
115  defp build_socket_options([]) do
116    [:binary, {:packet, 4}]
117  end
118
119  defp build_socket_options(ssl_options) do
120    build_socket_options([]) ++ ssl_options
121  end
122end
123