1defmodule Ecto.Repo.Queryable do
2  # The module invoked by user defined repos
3  # for query related functionality.
4  @moduledoc false
5
6  @dialyzer {:no_opaque, transaction: 4}
7
8  alias Ecto.Query
9  alias Ecto.Queryable
10  alias Ecto.Query.Planner
11  alias Ecto.Query.SelectExpr
12
13  require Ecto.Query
14
15  def transaction(adapter, repo, fun, opts) when is_function(fun, 0) do
16    adapter.transaction(repo, opts, fun)
17  end
18
19  def transaction(adapter, repo, %Ecto.Multi{} = multi, opts) do
20    wrap   = &adapter.transaction(repo, opts, &1)
21    return = &adapter.rollback(repo, &1)
22
23    case Ecto.Multi.__apply__(multi, repo, wrap, return) do
24      {:ok, values} ->
25        {:ok, values}
26      {:error, {key, error_value, values}} ->
27        {:error, key, error_value, values}
28    end
29  end
30
31  def all(repo, adapter, queryable, opts) when is_list(opts) do
32    query =
33      queryable
34      |> Ecto.Queryable.to_query
35      |> Ecto.Query.Planner.returning(true)
36      |> attach_prefix(opts)
37    execute(:all, repo, adapter, query, opts) |> elem(1)
38  end
39
40  def stream(repo, adapter, queryable, opts) when is_list(opts) do
41    query =
42      queryable
43      |> Ecto.Queryable.to_query
44      |> Ecto.Query.Planner.returning(true)
45      |> attach_prefix(opts)
46    stream(:all, repo, adapter, query, opts)
47  end
48
49  def get(repo, adapter, queryable, id, opts) do
50    one(repo, adapter, query_for_get(repo, queryable, id), opts)
51  end
52
53  def get!(repo, adapter, queryable, id, opts) do
54    one!(repo, adapter, query_for_get(repo, queryable, id), opts)
55  end
56
57  def get_by(repo, adapter, queryable, clauses, opts) do
58    one(repo, adapter, query_for_get_by(repo, queryable, clauses), opts)
59  end
60
61  def get_by!(repo, adapter, queryable, clauses, opts) do
62    one!(repo, adapter, query_for_get_by(repo, queryable, clauses), opts)
63  end
64
65  def aggregate(repo, adapter, queryable, aggregate, field, opts) do
66    one!(repo, adapter, query_for_aggregate(queryable, aggregate, field), opts)
67  end
68
69  def one(repo, adapter, queryable, opts) do
70    case all(repo, adapter, queryable, opts) do
71      [one] -> one
72      []    -> nil
73      other -> raise Ecto.MultipleResultsError, queryable: queryable, count: length(other)
74    end
75  end
76
77  def one!(repo, adapter, queryable, opts) do
78    case all(repo, adapter, queryable, opts) do
79      [one] -> one
80      []    -> raise Ecto.NoResultsError, queryable: queryable
81      other -> raise Ecto.MultipleResultsError, queryable: queryable, count: length(other)
82    end
83  end
84
85  def update_all(repo, adapter, queryable, [], opts) when is_list(opts) do
86    update_all(repo, adapter, queryable, opts)
87  end
88
89  def update_all(repo, adapter, queryable, updates, opts) when is_list(opts) do
90    query = Query.from queryable, update: ^updates
91    update_all(repo, adapter, query, opts)
92  end
93
94  defp update_all(repo, adapter, queryable, opts) do
95    query =
96      queryable
97      |> Ecto.Queryable.to_query
98      |> Ecto.Query.Planner.assert_no_select!(:update_all)
99      |> Ecto.Query.Planner.returning(opts[:returning] || false)
100      |> attach_prefix(opts)
101    execute(:update_all, repo, adapter, query, opts)
102  end
103
104  def delete_all(repo, adapter, queryable, opts) when is_list(opts) do
105    query =
106      queryable
107      |> Ecto.Queryable.to_query
108      |> Ecto.Query.Planner.assert_no_select!(:delete_all)
109      |> Ecto.Query.Planner.returning(opts[:returning] || false)
110      |> attach_prefix(opts)
111    execute(:delete_all, repo, adapter, query, opts)
112  end
113
114  ## Helpers
115
116  defp attach_prefix(query, opts) do
117    case Keyword.fetch(opts, :prefix) do
118      {:ok, prefix} -> %{query | prefix: prefix}
119      :error -> query
120    end
121  end
122
123  defp execute(operation, repo, adapter, query, opts) when is_list(opts) do
124    {meta, prepared, params} = Planner.query(query, operation, repo, adapter, 0)
125
126    case meta do
127      %{select: nil} ->
128        adapter.execute(repo, meta, prepared, params, nil, opts)
129      %{select: select, prefix: prefix, sources: sources, preloads: preloads} ->
130        %{preprocess: preprocess, postprocess: postprocess, take: take, assocs: assocs} = select
131        all_nil? = tuple_size(sources) != 1
132        preprocessor = &preprocess(&1, preprocess, all_nil?, prefix, adapter)
133        {count, rows} = adapter.execute(repo, meta, prepared, params, preprocessor, opts)
134        postprocessor = postprocessor(postprocess, take, prefix, adapter)
135
136        {count,
137          rows
138          |> Ecto.Repo.Assoc.query(assocs, sources)
139          |> Ecto.Repo.Preloader.query(repo, preloads, take, postprocessor, opts)}
140    end
141  end
142
143  defp stream(operation, repo, adapter, query, opts) do
144    {meta, prepared, params} = Planner.query(query, operation, repo, adapter, 0)
145
146    case meta do
147      %{select: nil} ->
148        repo
149        |> adapter.stream(meta, prepared, params, nil, opts)
150        |> Stream.flat_map(fn {_, nil} -> [] end)
151      %{select: select, prefix: prefix, sources: sources, preloads: preloads} ->
152        %{preprocess: preprocess, postprocess: postprocess, take: take, assocs: assocs} = select
153        all_nil? = tuple_size(sources) != 1
154        preprocessor = &preprocess(&1, preprocess, all_nil?, prefix, adapter)
155        stream = adapter.stream(repo, meta, prepared, params, preprocessor, opts)
156        postprocessor = postprocessor(postprocess, take, prefix, adapter)
157
158        Stream.flat_map(stream, fn {_, rows} ->
159          rows
160          |> Ecto.Repo.Assoc.query(assocs, sources)
161          |> Ecto.Repo.Preloader.query(repo, preloads, take, postprocessor, opts)
162        end)
163    end
164  end
165
166  defp preprocess(row, [], _all_nil?, _prefix, _adapter) do
167    row
168  end
169  defp preprocess(row, [{:source, source_schema, fields} | sources], all_nil?, prefix, adapter) do
170    {entry, rest} = process_source(source_schema, fields, row, all_nil?, prefix, adapter)
171    [entry | preprocess(rest, sources, true, prefix, adapter)]
172  end
173  defp preprocess(row, [source | sources], all_nil?, prefix, adapter) do
174    {entry, rest} = process(row, source, nil, prefix, adapter)
175    [entry | preprocess(rest, sources, all_nil?, prefix, adapter)]
176  end
177
178  defp postprocessor({:from, :any, postprocess}, _take, prefix, adapter) do
179    fn [from | row] ->
180      row |> process(postprocess, from, prefix, adapter) |> elem(0)
181    end
182  end
183  defp postprocessor({:from, :map, postprocess}, take, prefix, adapter) do
184    fn [from | row] ->
185      row |> process(postprocess, to_map(from, take), prefix, adapter) |> elem(0)
186    end
187  end
188  defp postprocessor(postprocess, _take, prefix, adapter) do
189    fn row -> row |> process(postprocess, nil, prefix, adapter) |> elem(0) end
190  end
191
192  defp process(row, {:merge, left, right}, from, prefix, adapter) do
193    {left, row} = process(row, left, from, prefix, adapter)
194    {right, row} = process(row, right, from, prefix, adapter)
195
196    data =
197      case {left, right} do
198        {%{__struct__: struct}, %{__struct__: struct}} ->
199          right
200          |> Map.from_struct()
201          |> Enum.reduce(left, fn {key, value}, acc -> %{acc | key => value} end)
202        {_, %{__struct__: _}} ->
203          raise ArgumentError, "can only merge with a struct on the right side when both sides " <>
204                               "represent the same struct. Left side is #{inspect left} and " <>
205                               "right side is #{inspect right}"
206        {%{__struct__: _}, %{}} ->
207          Enum.reduce(right, left, fn {key, value}, acc -> %{acc | key => value} end)
208        {%{}, %{}} ->
209          Map.merge(left, right)
210        {_, %{}} ->
211          raise ArgumentError, "cannot merge because the left side is not a map, got: #{inspect left}"
212        {%{}, _} ->
213          raise ArgumentError, "cannot merge because the right side is not a map, got: #{inspect right}"
214      end
215
216    {data, row}
217  end
218  defp process(row, {:struct, struct, data, args}, from, prefix, adapter) do
219    case process(row, data, from, prefix, adapter) do
220      {%{__struct__: ^struct} = data, row} ->
221        process_update(data, args, row, from, prefix, adapter)
222      {data, _row} ->
223        raise BadStructError, struct: struct, term: data
224    end
225  end
226  defp process(row, {:struct, struct, args}, from, prefix, adapter) do
227    {fields, row} = process_kv(args, row, from, prefix, adapter)
228
229    case Map.merge(struct.__struct__(), Map.new(fields)) do
230      %{__meta__: %Ecto.Schema.Metadata{source: {_, source}} = metadata} = struct ->
231        metadata = %{metadata | state: :loaded, source: {prefix, source}}
232        {Map.put(struct, :__meta__, metadata), row}
233      map ->
234        {map, row}
235    end
236  end
237  defp process(row, {:map, data, args}, from, prefix, adapter) do
238    {data, row} = process(row, data, from, prefix, adapter)
239    process_update(data, args, row, from, prefix, adapter)
240  end
241  defp process(row, {:map, args}, from, prefix, adapter) do
242    {args, row} = process_kv(args, row, from, prefix, adapter)
243    {Map.new(args), row}
244  end
245  defp process(row, {:list, args}, from, prefix, adapter) do
246    process_args(args, row, from, prefix, adapter)
247  end
248  defp process(row, {:tuple, args}, from, prefix, adapter) do
249    {args, row} = process_args(args, row, from, prefix, adapter)
250    {List.to_tuple(args), row}
251  end
252  defp process(row, {:source, :from}, from, _prefix, _adapter) do
253    {from, row}
254  end
255  defp process(row, {:source, source_schema, fields}, _from, prefix, adapter) do
256    process_source(source_schema, fields, row, true, prefix, adapter)
257  end
258  defp process([value | row], {:value, :any}, _from, _prefix, _adapter) do
259    {value, row}
260  end
261  defp process([value | row], {:value, type}, _from, _prefix, adapter) do
262    {load!(type, value, nil, nil, adapter), row}
263  end
264  defp process(row, value, _from, _prefix, _adapter)
265       when is_binary(value) or is_number(value) or is_atom(value) do
266    {value, row}
267  end
268
269  defp process_update(data, args, row, from, prefix, adapter) do
270    {args, row} = process_kv(args, row, from, prefix, adapter)
271    data = Enum.reduce(args, data, fn {key, value}, acc -> %{acc | key => value} end)
272    {data, row}
273  end
274
275  defp process_source({source, schema}, types, row, all_nil?, prefix, adapter) do
276    case split_values(types, row, [], all_nil?) do
277      {nil, row} ->
278        {nil, row}
279      {values, row} ->
280        struct = if schema, do: schema.__struct__(), else: %{}
281        loader = &Ecto.Type.adapter_load(adapter, &1, &2)
282        {Ecto.Schema.__safe_load__(struct, types, values, prefix, source, loader), row}
283    end
284  end
285
286  defp split_values([_ | types], [nil | values], acc, all_nil?) do
287    split_values(types, values, [nil | acc], all_nil?)
288  end
289  defp split_values([_ | types], [value | values], acc, _all_nil?) do
290    split_values(types, values, [value | acc], false)
291  end
292  defp split_values([], values, _acc, true) do
293    {nil, values}
294  end
295  defp split_values([], values, acc, false) do
296    {Enum.reverse(acc), values}
297  end
298
299  defp process_args(args, row, from, prefix, adapter) do
300    Enum.map_reduce(args, row, fn arg, row ->
301      process(row, arg, from, prefix, adapter)
302    end)
303  end
304
305  defp process_kv(kv, row, from, prefix, adapter) do
306    Enum.map_reduce(kv, row, fn {key, value}, row ->
307      {key, row} = process(row, key, from, prefix, adapter)
308      {value, row} = process(row, value, from, prefix, adapter)
309      {{key, value}, row}
310    end)
311  end
312
313  defp load!(type, value, field, struct, adapter) do
314    case Ecto.Type.adapter_load(adapter, type, value) do
315      {:ok, value} ->
316        value
317      :error ->
318        field = field && " for field #{inspect field}"
319        struct = struct && " in #{inspect struct}"
320        raise ArgumentError, "cannot load `#{inspect value}` as type #{inspect type}#{field}#{struct}"
321    end
322  end
323
324  defp to_map(nil, _fields) do
325    nil
326  end
327  defp to_map(value, fields) when is_list(value) do
328    Enum.map(value, &to_map(&1, fields))
329  end
330  defp to_map(value, fields) do
331    for field <- fields, into: %{} do
332      case field do
333        {k, v} -> {k, to_map(Map.fetch!(value, k), List.wrap(v))}
334        k -> {k, Map.fetch!(value, k)}
335      end
336    end
337  end
338
339  defp query_for_get(repo, _queryable, nil) do
340    raise ArgumentError, "cannot perform #{inspect repo}.get/2 because the given value is nil"
341  end
342
343  defp query_for_get(repo, queryable, id) do
344    query  = Queryable.to_query(queryable)
345    schema = assert_schema!(query)
346    case schema.__schema__(:primary_key) do
347      [pk] ->
348        Query.from(x in query, where: field(x, ^pk) == ^id)
349      pks ->
350        raise ArgumentError,
351          "#{inspect repo}.get/2 requires the schema #{inspect schema} " <>
352          "to have exactly one primary key, got: #{inspect pks}"
353    end
354  end
355
356  defp query_for_get_by(_repo, queryable, clauses) do
357    Query.where(queryable, [], ^Enum.to_list(clauses))
358  end
359
360  defp query_for_aggregate(queryable, aggregate, field) do
361    query = %{Queryable.to_query(queryable) | preloads: [], assocs: []}
362    ast   = field(0, field)
363
364    query =
365      case query do
366        %{group_bys: [_|_]} ->
367          raise Ecto.QueryError, message: "cannot aggregate on query with group_by", query: query
368        %{distinct: nil, limit: nil, offset: nil} ->
369          %{query | order_bys: []}
370        _ ->
371          select = %SelectExpr{expr: ast, file: __ENV__.file, line: __ENV__.line}
372          %{query | select: select}
373          |> Query.subquery()
374          |> Queryable.Ecto.SubQuery.to_query()
375      end
376
377    %{query | select: %SelectExpr{expr: {aggregate, [], [ast]},
378                                  file: __ENV__.file, line: __ENV__.line}}
379  end
380
381  defp field(ix, field) when is_integer(ix) and is_atom(field) do
382    {{:., [], [{:&, [], [ix]}, field]}, [], []}
383  end
384
385  defp assert_schema!(%{from: {_source, schema}}) when schema != nil, do: schema
386  defp assert_schema!(query) do
387    raise Ecto.QueryError,
388      query: query,
389      message: "expected a from expression with a schema"
390  end
391end
392