1defmodule KafkaEx.NetworkClient do 2 require Logger 3 alias KafkaEx.Protocol.Metadata.Broker 4 alias KafkaEx.Socket 5 6 @moduledoc false 7 @spec create_socket(binary, non_neg_integer, KafkaEx.ssl_options(), boolean) :: 8 nil | Socket.t() 9 def create_socket(host, port, ssl_options \\ [], use_ssl \\ false) do 10 case Socket.create( 11 format_host(host), 12 port, 13 build_socket_options(ssl_options), 14 use_ssl 15 ) do 16 {:ok, socket} -> 17 Logger.log( 18 :debug, 19 "Successfully connected to broker #{inspect(host)}:#{inspect(port)}" 20 ) 21 22 socket 23 24 err -> 25 Logger.log( 26 :error, 27 "Could not connect to broker #{inspect(host)}:#{inspect(port)} because of error #{ 28 inspect(err) 29 }" 30 ) 31 32 nil 33 end 34 end 35 36 @spec close_socket(nil | Socket.t()) :: :ok 37 def close_socket(nil), do: :ok 38 def close_socket(socket), do: Socket.close(socket) 39 40 @spec send_async_request(Broker.t(), iodata) :: 41 :ok | {:error, :closed | :inet.posix()} 42 def send_async_request(broker, data) do 43 socket = broker.socket 44 45 case Socket.send(socket, data) do 46 :ok -> 47 :ok 48 49 {_, reason} -> 50 Logger.log( 51 :error, 52 "Asynchronously sending data to broker #{inspect(broker.host)}:#{ 53 inspect(broker.port) 54 } failed with #{inspect(reason)}" 55 ) 56 57 reason 58 end 59 end 60 61 @spec send_sync_request(Broker.t(), iodata, timeout) :: 62 iodata | {:error, any()} 63 def send_sync_request(%{:socket => socket} = broker, data, timeout) do 64 :ok = Socket.setopts(socket, [:binary, {:packet, 4}, {:active, false}]) 65 66 response = 67 case Socket.send(socket, data) do 68 :ok -> 69 case Socket.recv(socket, 0, timeout) do 70 {:ok, data} -> 71 data 72 73 {:error, reason} -> 74 Logger.log( 75 :error, 76 "Receiving data from broker #{inspect(broker.host)}:#{ 77 inspect(broker.port) 78 } failed with #{inspect(reason)}" 79 ) 80 81 {:error, reason} 82 end 83 84 {_, reason} -> 85 Logger.log( 86 :error, 87 "Sending data to broker #{inspect(broker.host)}:#{ 88 inspect(broker.port) 89 } failed with #{inspect(reason)}" 90 ) 91 92 {:error, reason} 93 end 94 95 :ok = Socket.setopts(socket, [:binary, {:packet, 4}, {:active, true}]) 96 response 97 end 98 99 @spec format_host(binary) :: [char] | :inet.ip_address() 100 def format_host(host) do 101 case Regex.scan(~r/^(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})$/, host) do 102 [match_data] = [[_, _, _, _, _]] -> 103 match_data 104 |> tl 105 |> List.flatten() 106 |> Enum.map(&String.to_integer/1) 107 |> List.to_tuple() 108 109 # to_char_list is deprecated from Elixir 1.3 onward 110 _ -> 111 apply(String, :to_char_list, [host]) 112 end 113 end 114 115 defp build_socket_options([]) do 116 [:binary, {:packet, 4}] 117 end 118 119 defp build_socket_options(ssl_options) do 120 build_socket_options([]) ++ ssl_options 121 end 122end 123