defmodule File.Stream do
  @moduledoc """
  Defines a `File.Stream` struct returned by `File.stream!/3`.

  The following fields are public:

    * `path`          - the file path
    * `modes`         - the file modes
    * `raw`           - a boolean indicating if bin functions should be used
    * `line_or_bytes` - if reading should read lines or a given number of bytes

  """

  defstruct path: nil, modes: [], line_or_bytes: :line, raw: true

  @type t :: %__MODULE__{}

  # Builds the stream struct.
  #
  # The stream is "raw" when no `{:encoding, _}` entry is present in `modes`.
  # For raw streams `:raw` is prepended to the modes and read-ahead is enabled
  # unless the caller configured it explicitly; `{:read_ahead, false}` removes
  # the option altogether. Non-raw streams keep their modes untouched.
  @doc false
  def __build__(path, modes, line_or_bytes) do
    raw = :lists.keyfind(:encoding, 1, modes) == false

    modes =
      case raw do
        true ->
          case :lists.keyfind(:read_ahead, 1, modes) do
            {:read_ahead, false} -> [:raw | :lists.keydelete(:read_ahead, 1, modes)]
            {:read_ahead, _} -> [:raw | modes]
            false -> [:raw, :read_ahead | modes]
          end

        false ->
          modes
      end

    %File.Stream{path: path, modes: modes, raw: raw, line_or_bytes: line_or_bytes}
  end

  defimpl Collectable do
    # Opens `path` for writing (dropping `:read` from the stream modes) and
    # returns the initial accumulator together with the collector function.
    # Raises `File.Error` when the file cannot be opened.
    def into(%{path: path, modes: modes, raw: raw} = stream) do
      modes = for mode <- modes, mode not in [:read], do: mode

      case :file.open(path, [:write | modes]) do
        {:ok, device} ->
          {:ok, into(device, stream, raw)}

        {:error, reason} ->
          raise File.Error, reason: reason, action: "stream", path: path
      end
    end

    # Collector function: writes each element to `device` and closes the
    # device once collection finishes (`:done`) or is aborted (`:halt`).
    defp into(device, stream, raw) do
      fn
        :ok, {:cont, x} ->
          case raw do
            true -> IO.binwrite(device, x)
            false -> IO.write(device, x)
          end

        :ok, :done ->
          # If the delayed_write option is used and the last write failed,
          # close returns {:error, _} and this match raises a MatchError.
          :ok = :file.close(device)
          stream

        :ok, :halt ->
          # Same as above: a failed delayed write surfaces here as a MatchError.
          :ok = :file.close(device)
      end
    end
  end

  defimpl Enumerable do
    # Chunk size used when scanning a file to count its lines.
    @read_ahead_size 64 * 1024

    # Lazily reads the file by lines or by fixed-size chunks. The device is
    # opened when enumeration starts and closed when it ends. Raises
    # `File.Error` if the file cannot be opened.
    def reduce(%{path: path, modes: modes, line_or_bytes: line_or_bytes, raw: raw}, acc, fun) do
      start_fun = fn ->
        case :file.open(path, read_modes(modes)) do
          {:ok, device} ->
            skip_bom(device, modes, raw)

          {:error, reason} ->
            raise File.Error, reason: reason, action: "stream", path: path
        end
      end

      next_fun =
        case raw do
          true -> &IO.each_binstream(&1, line_or_bytes)
          false -> &IO.each_stream(&1, line_or_bytes)
        end

      Stream.resource(start_fun, next_fun, &:file.close/1).(acc, fun)
    end

    # Line mode: counts the lines by scanning the file in chunks.
    #
    # The result has to agree with what `reduce/3` emits, therefore:
    #   * the BOM is skipped first when `:trim_bom` is set, and
    #   * a final line that is not terminated by "\n" still counts as a line.
    def count(%{path: path, modes: modes, line_or_bytes: :line, raw: raw} = stream) do
      pattern = :binary.compile_pattern("\n")
      read = read_function(stream)

      counter = fn device ->
        device
        |> skip_bom(modes, raw)
        |> count_lines(path, pattern, read, 0, false)
      end

      case File.open(path, read_modes(modes), counter) do
        {:ok, count} ->
          {:ok, count}

        {:error, reason} ->
          raise File.Error, reason: reason, action: "stream", path: path
      end
    end

    # Raw byte mode: derives the number of chunks from the file size.
    #
    # The BOM length (in bytes) is removed from the size *before* dividing by
    # the chunk size; subtracting it from the chunk count would mix units.
    def count(%{path: path, line_or_bytes: bytes, raw: true, modes: modes}) do
      case File.stat(path) do
        {:ok, %{size: 0}} ->
          # A reported size of 0 is not trusted; returning the module name
          # makes the caller fall back to counting through reduce/3.
          {:error, __MODULE__}

        {:ok, %{size: size}} ->
          payload = max(size - count_raw_bom(path, modes), 0)
          remainder = if rem(payload, bytes) == 0, do: 0, else: 1
          {:ok, div(payload, bytes) + remainder}

        {:error, reason} ->
          raise File.Error, reason: reason, action: "stream", path: path
      end
    end

    # Non-raw byte mode: no shortcut available, let the caller use reduce/3.
    def count(_stream) do
      {:error, __MODULE__}
    end

    def member?(_stream, _term) do
      {:error, __MODULE__}
    end

    def slice(_stream) do
      {:error, __MODULE__}
    end

    # Returns the BOM length in bytes when `:trim_bom` is requested, else 0.
    defp count_raw_bom(path, modes) do
      if :trim_bom in modes do
        File.open!(path, read_modes(modes), fn device ->
          device |> trim_bom(true) |> elem(1)
        end)
      else
        0
      end
    end

    # Positions `device` right after the BOM when `:trim_bom` is requested.
    defp skip_bom(device, modes, raw) do
      if :trim_bom in modes do
        device |> trim_bom(raw) |> elem(0)
      else
        device
      end
    end

    # Raw devices: peek at the first four bytes, then seek just past the BOM
    # (or back to the start when there is none). Returns `{device, position}`.
    defp trim_bom(device, true) do
      bom_length = device |> IO.binread(4) |> bom_length()
      {:ok, new_pos} = :file.position(device, bom_length)
      {device, new_pos}
    end

    # Non-raw devices: a single read of one character either yields the BOM
    # or it is not there, in which case the device is rewound to the start.
    defp trim_bom(device, false) do
      case bom_length(IO.read(device, 1)) do
        0 ->
          {:ok, _} = :file.position(device, 0)
          {device, 0}

        _ ->
          {device, 1}
      end
    end

    # BOM length in bytes. The 4-byte UTF-32 marks are tested before the
    # 2-byte UTF-16 ones because the UTF-32LE mark (FF FE 00 00) starts with
    # the UTF-16LE mark (FF FE) and would otherwise never be reached.
    # Anything else, including `:eof` and error tuples, has no BOM.
    defp bom_length(<<239, 187, 191, _rest::binary>>), do: 3
    defp bom_length(<<0, 0, 254, 255, _rest::binary>>), do: 4
    defp bom_length(<<255, 254, 0, 0, _rest::binary>>), do: 4
    defp bom_length(<<254, 255, _rest::binary>>), do: 2
    defp bom_length(<<255, 254, _rest::binary>>), do: 2
    defp bom_length(_other), do: 0

    # Modes that only make sense for writing, plus our own `:trim_bom` flag,
    # must not be handed to the file driver when opening for reading.
    defp read_modes(modes) do
      for mode <- modes, mode not in [:write, :append, :trim_bom], do: mode
    end

    # Reads the device chunk by chunk, summing newline occurrences.
    # `unterminated?` tracks whether the data seen so far ends without "\n";
    # if so, the trailing partial line is counted once end of file is reached.
    defp count_lines(device, path, pattern, read, count, unterminated?) do
      case read.(device) do
        data when is_binary(data) ->
          count_lines(
            device,
            path,
            pattern,
            read,
            count + count_newlines(data, pattern),
            unterminated?(data, unterminated?)
          )

        :eof ->
          if unterminated?, do: count + 1, else: count

        {:error, reason} ->
          raise File.Error, reason: reason, action: "stream", path: path
      end
    end

    defp count_newlines(data, pattern), do: length(:binary.matches(data, pattern))

    # An empty chunk carries no information, so the previous state is kept.
    defp unterminated?(<<>>, previous), do: previous
    defp unterminated?(data, _previous), do: :binary.last(data) != ?\n

    defp read_function(%{raw: true}), do: &IO.binread(&1, @read_ahead_size)
    defp read_function(%{raw: false}), do: &IO.read(&1, @read_ahead_size)
  end
end