1defmodule File.Stream do
2  @moduledoc """
3  Defines a `File.Stream` struct returned by `File.stream!/3`.
4
5  The following fields are public:
6
7    * `path`          - the file path
8    * `modes`         - the file modes
9    * `raw`           - a boolean indicating if bin functions should be used
10    * `line_or_bytes` - if reading should read lines or a given number of bytes
11
12  """
13
14  defstruct path: nil, modes: [], line_or_bytes: :line, raw: true
15
16  @type t :: %__MODULE__{}
17
18  @doc false
19  def __build__(path, modes, line_or_bytes) do
20    raw = :lists.keyfind(:encoding, 1, modes) == false
21
22    modes =
23      case raw do
24        true ->
25          case :lists.keyfind(:read_ahead, 1, modes) do
26            {:read_ahead, false} -> [:raw | :lists.keydelete(:read_ahead, 1, modes)]
27            {:read_ahead, _} -> [:raw | modes]
28            false -> [:raw, :read_ahead | modes]
29          end
30
31        false ->
32          modes
33      end
34
35    %File.Stream{path: path, modes: modes, raw: raw, line_or_bytes: line_or_bytes}
36  end
37
38  defimpl Collectable do
39    def into(%{path: path, modes: modes, raw: raw} = stream) do
40      modes = for mode <- modes, mode not in [:read], do: mode
41
42      case :file.open(path, [:write | modes]) do
43        {:ok, device} ->
44          {:ok, into(device, stream, raw)}
45
46        {:error, reason} ->
47          raise File.Error, reason: reason, action: "stream", path: path
48      end
49    end
50
51    defp into(device, stream, raw) do
52      fn
53        :ok, {:cont, x} ->
54          case raw do
55            true -> IO.binwrite(device, x)
56            false -> IO.write(device, x)
57          end
58
59        :ok, :done ->
60          # If delayed_write option is used and the last write failed will
61          # MatchError here as {:error, _} is returned.
62          :ok = :file.close(device)
63          stream
64
65        :ok, :halt ->
66          # If delayed_write option is used and the last write failed will
67          # MatchError here as {:error, _} is returned.
68          :ok = :file.close(device)
69      end
70    end
71  end
72
73  defimpl Enumerable do
74    @read_ahead_size 64 * 1024
75
76    def reduce(%{path: path, modes: modes, line_or_bytes: line_or_bytes, raw: raw}, acc, fun) do
77      start_fun = fn ->
78        case :file.open(path, read_modes(modes)) do
79          {:ok, device} ->
80            if :trim_bom in modes, do: trim_bom(device, raw) |> elem(0), else: device
81
82          {:error, reason} ->
83            raise File.Error, reason: reason, action: "stream", path: path
84        end
85      end
86
87      next_fun =
88        case raw do
89          true -> &IO.each_binstream(&1, line_or_bytes)
90          false -> &IO.each_stream(&1, line_or_bytes)
91        end
92
93      Stream.resource(start_fun, next_fun, &:file.close/1).(acc, fun)
94    end
95
96    def count(%{path: path, modes: modes, line_or_bytes: :line} = stream) do
97      pattern = :binary.compile_pattern("\n")
98      counter = &count_lines(&1, path, pattern, read_function(stream), 0)
99
100      case File.open(path, read_modes(modes), counter) do
101        {:ok, count} ->
102          {:ok, count}
103
104        {:error, reason} ->
105          raise File.Error, reason: reason, action: "stream", path: path
106      end
107    end
108
109    def count(%{path: path, line_or_bytes: bytes, raw: true, modes: modes}) do
110      case File.stat(path) do
111        {:ok, %{size: 0}} ->
112          {:error, __MODULE__}
113
114        {:ok, %{size: size}} ->
115          remainder = if rem(size, bytes) == 0, do: 0, else: 1
116          {:ok, div(size, bytes) + remainder - count_raw_bom(path, modes)}
117
118        {:error, reason} ->
119          raise File.Error, reason: reason, action: "stream", path: path
120      end
121    end
122
123    def count(_stream) do
124      {:error, __MODULE__}
125    end
126
127    def member?(_stream, _term) do
128      {:error, __MODULE__}
129    end
130
131    def slice(_stream) do
132      {:error, __MODULE__}
133    end
134
135    defp count_raw_bom(path, modes) do
136      if :trim_bom in modes do
137        File.open!(path, read_modes(modes), &(&1 |> trim_bom(true) |> elem(1)))
138      else
139        0
140      end
141    end
142
143    defp trim_bom(device, true) do
144      bom_length = device |> IO.binread(4) |> bom_length()
145      {:ok, new_pos} = :file.position(device, bom_length)
146      {device, new_pos}
147    end
148
149    defp trim_bom(device, false) do
150      # Or we read the bom in the correct amount or it isn't there
151      case bom_length(IO.read(device, 1)) do
152        0 ->
153          {:ok, _} = :file.position(device, 0)
154          {device, 0}
155
156        _ ->
157          {device, 1}
158      end
159    end
160
161    defp bom_length(<<239, 187, 191, _rest::binary>>), do: 3
162    defp bom_length(<<254, 255, _rest::binary>>), do: 2
163    defp bom_length(<<255, 254, _rest::binary>>), do: 2
164    defp bom_length(<<0, 0, 254, 255, _rest::binary>>), do: 4
165    defp bom_length(<<254, 255, 0, 0, _rest::binary>>), do: 4
166    defp bom_length(_binary), do: 0
167
168    defp read_modes(modes) do
169      for mode <- modes, mode not in [:write, :append, :trim_bom], do: mode
170    end
171
172    defp count_lines(device, path, pattern, read, count) do
173      case read.(device) do
174        data when is_binary(data) ->
175          count_lines(device, path, pattern, read, count + count_lines(data, pattern))
176
177        :eof ->
178          count
179
180        {:error, reason} ->
181          raise File.Error, reason: reason, action: "stream", path: path
182      end
183    end
184
185    defp count_lines(data, pattern), do: length(:binary.matches(data, pattern))
186
187    defp read_function(%{raw: true}), do: &IO.binread(&1, @read_ahead_size)
188    defp read_function(%{raw: false}), do: &IO.read(&1, @read_ahead_size)
189  end
190end
191