1defmodule Ecto.Repo.Preloader do
2  # The module invoked by user defined repos
3  # for preload related functionality.
4  @moduledoc false
5
6  require Ecto.Query
7
8  @doc """
9  Transforms a result set based on query preloads, loading
10  the associations onto their parent schema.
11  """
12  @spec query([list], Ecto.Repo.t, list, Access.t, fun, Keyword.t) :: [list]
13  def query([], _repo, _preloads, _take, _fun, _opts), do: []
14  def query(rows, _repo, [], _take, fun, _opts), do: Enum.map(rows, fun)
15
16  def query(rows, repo, preloads, take, fun, opts) do
17    rows
18    |> extract
19    |> normalize_and_preload_each(repo, preloads, take, opts)
20    |> unextract(rows, fun)
21  end
22
23  defp extract([[nil|_]|t2]), do: extract(t2)
24  defp extract([[h|_]|t2]), do: [h|extract(t2)]
25  defp extract([]), do: []
26
27  defp unextract(structs, [[nil|_] = h2|t2], fun), do: [fun.(h2)|unextract(structs, t2, fun)]
28  defp unextract([h1|structs], [[_|t1]|t2], fun), do: [fun.([h1|t1])|unextract(structs, t2, fun)]
29  defp unextract([], [], _fun), do: []
30
31  @doc """
32  Implementation for `Ecto.Repo.preload/2`.
33  """
34  @spec preload(structs, atom, atom | list, Keyword.t) ::
35                structs when structs: [Ecto.Schema.t] | Ecto.Schema.t | nil
36  def preload(nil, _repo, _preloads, _opts) do
37    nil
38  end
39
40  def preload(structs, repo, preloads, opts) when is_list(structs) do
41    normalize_and_preload_each(structs, repo, preloads, opts[:take], opts)
42  end
43
44  def preload(struct, repo, preloads, opts) when is_map(struct) do
45    normalize_and_preload_each([struct], repo, preloads, opts[:take], opts) |> hd()
46  end
47
48  defp normalize_and_preload_each(structs, repo, preloads, take, opts) do
49    preloads = normalize(preloads, take, preloads)
50    preload_each(structs, repo, preloads, opts)
51  rescue
52    e ->
53      # Reraise errors so we ignore the preload inner stacktrace
54      reraise e
55  end
56
57  ## Preloading
58
59  defp preload_each(structs, _repo, [], _opts),   do: structs
60  defp preload_each([], _repo, _preloads, _opts), do: []
61  defp preload_each(structs, repo, preloads, opts) do
62    if sample = Enum.find(structs, & &1) do
63      module = sample.__struct__
64      prefix = preload_prefix(opts, sample)
65      {assocs, throughs} = expand(module, preloads, {%{}, %{}})
66
67      assocs =
68        maybe_pmap Map.values(assocs), repo, opts, fn
69          {{:assoc, assoc, related_key}, take, query, sub_preloads}, opts ->
70            preload_assoc(structs, module, repo, prefix, assoc, related_key,
71                          query, sub_preloads, take, opts)
72        end
73
74      throughs =
75        Map.values(throughs)
76
77      for struct <- structs do
78        struct = Enum.reduce assocs, struct, &load_assoc/2
79        struct = Enum.reduce throughs, struct, &load_through/2
80        struct
81      end
82    else
83      structs
84    end
85  end
86
87  defp preload_prefix(opts, sample) do
88    case Keyword.fetch(opts, :prefix) do
89      {:ok, prefix} ->
90        prefix
91      :error ->
92        %{__meta__: %{source: {prefix, _}}} = sample
93        prefix
94    end
95  end
96
97  ## Association preloading
98
99  defp maybe_pmap(assocs, repo, opts, fun) do
100    if match?([_,_|_], assocs) and not repo.in_transaction? and
101         Keyword.get(opts, :in_parallel, true) do
102      # We pass caller: self() so pools like the ownership
103      # pool knows where to fetch the connection from and
104      # set the proper timeouts.
105      opts = Keyword.put_new(opts, :caller, self())
106      assocs
107      |> Enum.map(&Task.async(:erlang, :apply, [fun, [&1, opts]]))
108      |> Enum.map(&Task.await(&1, :infinity))
109    else
110      Enum.map(assocs, &fun.(&1, opts))
111    end
112  end
113
114  defp preload_assoc(structs, module, repo, prefix, %{cardinality: card} = assoc,
115                     related_key, query, preloads, take, opts) do
116    {fetch_ids, loaded_ids, loaded_structs} =
117      fetch_ids(structs, module, assoc, opts)
118    {fetch_ids, fetch_structs} =
119      fetch_query(fetch_ids, assoc, repo, query, prefix, related_key, take, opts)
120
121    all = preload_each(Enum.reverse(loaded_structs, fetch_structs), repo, preloads, opts)
122    {:assoc, assoc, assoc_map(card, Enum.reverse(loaded_ids, fetch_ids), all)}
123  end
124
125  defp fetch_ids(structs, module, assoc, opts) do
126    %{field: field, owner_key: owner_key, cardinality: card} = assoc
127    force? = Keyword.get(opts, :force, false)
128
129    Enum.reduce structs, {[], [], []}, fn
130      nil, acc ->
131        acc
132      struct, {fetch_ids, loaded_ids, loaded_structs} ->
133        assert_struct!(module, struct)
134        %{^owner_key => id, ^field => value} = struct
135
136        cond do
137          card == :one and Ecto.assoc_loaded?(value) and not force? ->
138            {fetch_ids, [id|loaded_ids], [value|loaded_structs]}
139          card == :many and Ecto.assoc_loaded?(value) and not force? ->
140            {fetch_ids,
141             List.duplicate(id, length(value)) ++ loaded_ids,
142             value ++ loaded_structs}
143          is_nil(id) ->
144            {fetch_ids, loaded_ids, loaded_structs}
145          true ->
146            {[id|fetch_ids], loaded_ids, loaded_structs}
147        end
148    end
149  end
150
151  defp fetch_query([], _assoc, _repo, _query, _prefix, _related_key, _take, _opts) do
152    {[], []}
153  end
154
155  defp fetch_query(ids, _assoc, _repo, query, _prefix, {_, key}, _take, _opts) when is_function(query, 1) do
156    data = ids |> Enum.uniq |> query.() |> Enum.map(&{Map.fetch!(&1, key), &1}) |> Enum.sort
157    unzip_ids data, [], []
158  end
159
160  defp fetch_query(ids, %{cardinality: card} = assoc, repo, query, prefix, related_key, take, opts) do
161    query = assoc.__struct__.assoc_query(assoc, query, Enum.uniq(ids))
162    field = related_key_to_field(query, related_key)
163
164    # Normalize query
165    query = %{Ecto.Query.Planner.returning(query, take || true) | prefix: prefix}
166
167    # Add the related key to the query results
168    query = update_in query.select.expr, &{:{}, [], [field, &1]}
169
170    # If we are returning many results, we must sort by the key too
171    query =
172      case card do
173        :many ->
174          update_in query.order_bys, fn order_bys ->
175            [%Ecto.Query.QueryExpr{expr: [asc: field], params: [],
176                                   file: __ENV__.file, line: __ENV__.line}|order_bys]
177          end
178        :one ->
179          query
180      end
181
182    unzip_ids repo.all(query, opts), [], []
183  end
184
185  defp related_key_to_field(query, {pos, key}) do
186    {{:., [], [{:&, [], [related_key_pos(query, pos)]}, key]}, [], []}
187  end
188
189  defp related_key_pos(_query, pos) when pos >= 0, do: pos
190  defp related_key_pos(query, pos), do: Ecto.Query.Builder.count_binds(query) + pos
191
192  defp unzip_ids([{k, v}|t], acc1, acc2), do: unzip_ids(t, [k|acc1], [v|acc2])
193  defp unzip_ids([], acc1, acc2), do: {acc1, acc2}
194
195  defp assert_struct!(mod, %{__struct__: mod}), do: true
196  defp assert_struct!(mod, %{__struct__: struct}) do
197    raise ArgumentError, "expected a homogeneous list containing the same struct, " <>
198                         "got: #{inspect mod} and #{inspect struct}"
199  end
200
201  defp assoc_map(:one, ids, structs) do
202    one_assoc_map(ids, structs, %{})
203  end
204  defp assoc_map(:many, ids, structs) do
205    many_assoc_map(ids, structs, %{})
206  end
207
208  defp one_assoc_map([id|ids], [struct|structs], map) do
209    one_assoc_map(ids, structs, Map.put(map, id, struct))
210  end
211  defp one_assoc_map([], [], map) do
212    map
213  end
214
215  defp many_assoc_map([id|ids], [struct|structs], map) do
216    {ids, structs, acc} = split_while(ids, structs, id, [struct])
217    many_assoc_map(ids, structs, Map.put(map, id, acc))
218  end
219  defp many_assoc_map([], [], map) do
220    map
221  end
222
223  defp split_while([id|ids], [struct|structs], id, acc),
224    do: split_while(ids, structs, id, [struct|acc])
225  defp split_while(ids, structs, _id, acc),
226    do: {ids, structs, acc}
227
228  ## Load preloaded data
229
230  defp load_assoc({:assoc, _assoc, _ids}, nil) do
231    nil
232  end
233
234  defp load_assoc({:assoc, assoc, ids}, struct) do
235    %{field: field, owner_key: owner_key, cardinality: cardinality} = assoc
236    key = Map.fetch!(struct, owner_key)
237
238    loaded =
239      case ids do
240        %{^key => value} -> value
241        _ when cardinality == :many -> []
242        _ -> nil
243      end
244
245    Map.put(struct, field, loaded)
246  end
247
248  defp load_through({:through, assoc, throughs}, struct) do
249    %{cardinality: cardinality, field: field, owner: owner} = assoc
250    {loaded, _} = Enum.reduce(throughs, {[struct], owner}, &recur_through/2)
251    Map.put(struct, field, maybe_first(loaded, cardinality))
252  end
253
254  defp maybe_first(list, :one), do: List.first(list)
255  defp maybe_first(list, _), do: list
256
257  defp recur_through(field, {structs, owner}) do
258    assoc = owner.__schema__(:association, field)
259    case assoc.__struct__.preload_info(assoc) do
260      {:assoc, %{related: related}, _} ->
261        pks = related.__schema__(:primary_key)
262
263        {children, _} =
264          Enum.reduce(structs, {[], %{}}, fn struct, acc ->
265            children = struct |> Map.fetch!(field) |> List.wrap
266
267            Enum.reduce children, acc, fn child, {fresh, set} ->
268              keys = through_pks(child, pks, assoc)
269              case set do
270                %{^keys => true} ->
271                  {fresh, set}
272                _ ->
273                  {[child|fresh], Map.put(set, keys, true)}
274              end
275            end
276          end)
277
278        {Enum.reverse(children), related}
279      {:through, _, through} ->
280        Enum.reduce(through, {structs, owner}, &recur_through/2)
281    end
282  end
283
284  defp through_pks(map, pks, assoc) do
285    Enum.map pks, fn pk ->
286      case map do
287        %{^pk => value} -> value
288        _ ->
289          raise ArgumentError,
290            "cannot preload through association `#{assoc.field}` on `#{inspect assoc.owner}`. " <>
291            "Ecto expected a map/struct with the key `#{pk}` but got: #{inspect map}"
292      end
293    end
294  end
295
296  ## Normalizer
297
298  def normalize(preload, take, original) do
299    normalize_each(wrap(preload, original), [], take, original)
300  end
301
302  defp normalize_each({atom, {query, list}}, acc, take, original)
303       when is_atom(atom) and (is_map(query) or is_function(query, 1)) do
304    fields = take(take, atom)
305    [{atom, {fields, query!(query), normalize_each(wrap(list, original), [], fields, original)}}|acc]
306  end
307
308  defp normalize_each({atom, query}, acc, take, _original)
309       when is_atom(atom) and (is_map(query) or is_function(query, 1)) do
310    [{atom, {take(take, atom), query!(query), []}}|acc]
311  end
312
313  defp normalize_each({atom, list}, acc, take, original) when is_atom(atom) do
314    fields = take(take, atom)
315    [{atom, {fields, nil, normalize_each(wrap(list, original), [], fields, original)}}|acc]
316  end
317
318  defp normalize_each(atom, acc, take, _original) when is_atom(atom) do
319    [{atom, {take(take, atom), nil, []}}|acc]
320  end
321
322  defp normalize_each(other, acc, take, original) do
323    Enum.reduce(wrap(other, original), acc, &normalize_each(&1, &2, take, original))
324  end
325
326  defp query!(query) when is_function(query, 1), do: query
327  defp query!(%Ecto.Query{} = query), do: query
328
329  defp take(take, field) do
330    case Access.fetch(take, field) do
331      {:ok, fields} -> List.wrap(fields)
332      :error -> nil
333    end
334  end
335
336  defp wrap(list, _original) when is_list(list),
337    do: list
338  defp wrap(atom, _original) when is_atom(atom),
339    do: atom
340  defp wrap(other, original) do
341    raise ArgumentError, "invalid preload `#{inspect other}` in `#{inspect original}`. " <>
342                         "preload expects an atom, a (nested) keyword or a (nested) list of atoms"
343  end
344
345  ## Expand
346
347  def expand(schema, preloads, acc) do
348    Enum.reduce(preloads, acc, fn {preload, {fields, query, sub_preloads}}, {assocs, throughs} ->
349      assoc = association_from_schema!(schema, preload)
350      info  = assoc.__struct__.preload_info(assoc)
351
352      case info do
353        {:assoc, _, _} ->
354          value  = {info, fields, query, sub_preloads}
355          assocs = Map.update(assocs, preload, value, &merge_preloads(preload, value, &1))
356          {assocs, throughs}
357        {:through, _, through} ->
358          through =
359            through
360            |> Enum.reverse()
361            |> Enum.reduce({fields, query, sub_preloads}, &{nil, nil, [{&1, &2}]})
362            |> elem(2)
363          expand(schema, through, {assocs, Map.put(throughs, preload, info)})
364      end
365    end)
366  end
367
368  defp merge_preloads(_preload, {info, _, nil, left}, {info, take, query, right}),
369    do: {info, take, query, left ++ right}
370  defp merge_preloads(_preload, {info, take, query, left}, {info, _, nil, right}),
371    do: {info, take, query, left ++ right}
372  defp merge_preloads(preload, {info, _, left, _}, {info, _, right, _}) do
373    raise ArgumentError, "cannot preload `#{preload}` as it has been supplied more than once " <>
374                         "with different queries: #{inspect left} and #{inspect right}"
375  end
376
377  # Since there is some ambiguity between assoc and queries.
378  # We reimplement this function here for nice error messages.
379  defp association_from_schema!(schema, assoc) do
380    schema.__schema__(:association, assoc) ||
381      raise ArgumentError,
382            "schema #{inspect schema} does not have association #{inspect assoc}#{maybe_module(assoc)}"
383  end
384
385  defp maybe_module(assoc) do
386    case Atom.to_string(assoc) do
387      "Elixir." <> _ ->
388        " (if you were trying to pass a schema as a query to preload, " <>
389          "you have to explicitly convert it to a query by doing `from x in #{inspect assoc}` " <>
390          "or by calling Ecto.Queryable.to_query/1)"
391
392      _ ->
393        ""
394    end
395  end
396
397  defp reraise(exception) do
398    reraise exception, Enum.reject(System.stacktrace, &match?({__MODULE__, _, _, _}, &1))
399  end
400end
401