1defmodule Ecto.Query.Planner do
2  # Normalizes a query and its parameters.
3  @moduledoc false
4
5  alias Ecto.Query.{BooleanExpr, DynamicExpr, JoinExpr, QueryExpr, SelectExpr}
6
# Compile-time guard: compilation fails when %Ecto.Query{} no longer has
# exactly 17 keys (the struct's own :__struct__ key included), signalling
# that this module has to be revisited after the struct changed.
if map_size(%Ecto.Query{}) != 17 do
  raise "Ecto.Query match out of date in planner"
end
10
11  @doc """
12  Converts a query to a list of joins.
13
14  The from is moved as last join with the where conditions as its "on"
15  in order to keep proper binding order.
16  """
def query_to_joins(qual, %{from: from, wheres: wheres, joins: joins}, position) do
  # Fold every `where` into one condition, starting from a neutral `true`.
  merged_on =
    Enum.reduce(
      wheres,
      %QueryExpr{file: __ENV__.file, line: __ENV__.line, expr: true, params: []},
      fn %BooleanExpr{op: op, expr: expr, params: params}, acc ->
        merge_expr_and_params(op, acc, expr, params)
      end
    )

  from_join =
    %JoinExpr{qual: qual, source: from, file: __ENV__.file, line: __ENV__.line, on: merged_on}

  from_ix = length(joins) + position

  # Binding 0 (the original from) moves to the last slot; every other
  # binding is shifted by `position - 1`.
  remap = fn
    0 -> from_ix
    ix -> ix + position - 1
  end

  for {%{on: on} = join, ix} <- Enum.with_index(joins ++ [from_join]) do
    %{join | on: rewrite_sources(on, remap), ix: ix + position}
  end
end
37
# Merges `right_expr`/`right_params` into the given query expression using `op`.
# The right params are appended after the left ones, so every `^n` placeholder
# on the right is shifted by the number of params already on the left.
defp merge_expr_and_params(op, %QueryExpr{expr: left_expr, params: left_params} = struct,
                           right_expr, right_params) do
  shifted_expr =
    if left_params == [] do
      right_expr
    else
      shift = length(left_params)

      Macro.prewalk(right_expr, fn
        {:^, meta, [counter]} when is_integer(counter) -> {:^, meta, [shift + counter]}
        node -> node
      end)
    end

  merged_expr = merge_expr(op, left_expr, shifted_expr)
  %{struct | expr: merged_expr, params: left_params ++ right_params}
end
53
# Combines two expressions with `op`, treating a literal `true` on either
# side as neutral (the other side is returned unchanged).
defp merge_expr(op, left, right) do
  case {left, right} do
    {_, true} -> left
    {true, _} -> right
    _ -> {op, [], [left, right]}
  end
end
57
58  @doc """
59  Rewrites the given query expression sources using the given mapping.
60  """
def rewrite_sources(%{expr: expr, params: params} = part, mapping) do
  # Rewrites binding indexes in tagged types and in `&ix` nodes.
  rewrite_node = fn
    %Ecto.Query.Tagged{type: type, tag: tag} = tagged ->
      %{tagged | type: rewrite_type(type, mapping), tag: rewrite_type(tag, mapping)}

    {:&, meta, [ix]} ->
      {:&, meta, [mapping.(ix)]}

    node ->
      node
  end

  # Params given as `{value, type}` may carry a binding index inside the type.
  rewrite_param = fn
    {value, type} -> {value, rewrite_type(type, mapping)}
    value -> value
  end

  new_expr = Macro.prewalk(expr, rewrite_node)
  new_params = Enum.map(params, rewrite_param)
  %{part | expr: new_expr, params: new_params}
end
82
# Applies `mapping` to the binding index of `{ix, field}` and
# `{composite, {ix, field}}` types; any other type is returned untouched.
defp rewrite_type(type, mapping) do
  case type do
    {composite, {ix, field}} when is_integer(ix) ->
      {composite, {mapping.(ix), field}}

    {ix, field} when is_integer(ix) ->
      {mapping.(ix), field}

    _ ->
      type
  end
end
94
95  @doc """
96  Plans the query for execution.
97
98  Planning happens in multiple steps:
99
100    1. First the query is prepared by retrieving
101       its cache key, casting and merging parameters
102
103    2. Then a cache lookup is done, if the query is
104       cached, we are done
105
106    3. If there is no cache, we need to actually
107       normalize and validate the query, asking the
108       adapter to prepare it
109
110    4. The query is sent to the adapter to be generated
111
112  ## Cache
113
114  All entries in the query, except the preload and sources
115  field, should be part of the cache key.
116
117  The cache value is the compiled query by the adapter
118  along-side the select expression.
119  """
def query(query, operation, repo, adapter, counter) do
  case prepare(query, operation, adapter, counter) do
    {query, params, :nocache} ->
      # Not cacheable: build the adapter query without touching the repo table.
      {_, select, prepared} = query_without_cache(query, operation, adapter, counter)
      {build_meta(query, select), {:nocache, prepared}, params}

    {query, params, key} ->
      query_with_cache(query, operation, repo, adapter, counter, key, params)
  end
end
129
# Looks the query up under `key` and returns `{meta, cache_instruction, params}`.
# Cached and to-be-cached entries get a one-argument callback that stores a new
# value for the entry.
defp query_with_cache(query, operation, repo, adapter, counter, key, params) do
  {select, instruction} =
    case query_lookup(query, operation, repo, adapter, counter, key) do
      {:nocache, select, prepared} ->
        {select, {:nocache, prepared}}

      {_, :cached, select, cached} ->
        reset = &cache_reset(repo, key, &1)
        {select, {:cached, reset, cached}}

      {_, :cache, select, prepared} ->
        update = &cache_update(repo, key, &1)
        {select, {:cache, update, prepared}}
    end

  {build_meta(query, select), instruction, params}
end
142
# Reads the entry for `key` from the repo's ETS table, preparing the query
# when there is none. An ArgumentError raised by the lookup is reported as
# the repo not being started.
defp query_lookup(query, operation, repo, adapter, counter, key) do
  :ets.lookup(repo, key)
rescue
  ArgumentError ->
    raise ArgumentError,
      "repo #{inspect repo} is not started, please ensure it is part of your supervision tree"
else
  [entry] -> entry
  [] -> query_prepare(query, operation, adapter, counter, repo, key)
end
155
# Builds the query through the adapter and, when the adapter marks it as
# cacheable, stores it in the repo table under `key`.
defp query_prepare(query, operation, adapter, counter, repo, key) do
  result = query_without_cache(query, operation, adapter, counter)

  case result do
    {:nocache, _, _} ->
      result

    {:cache, select, prepared} ->
      cache_insert(repo, key, {key, :cache, select, prepared})
  end
end
165
# Inserts `elem` unless an entry for its key already exists, in which case
# the entry already stored in the table is returned instead.
defp cache_insert(repo, key, elem) do
  if :ets.insert_new(repo, elem) do
    elem
  else
    [existing] = :ets.lookup(repo, key)
    existing
  end
end
175
# Marks the entry under `key` as :cached and stores `cached` as its value.
# The result of the ETS update is ignored; :ok is always returned.
defp cache_update(repo, key, cached) do
  changes = [{2, :cached}, {4, cached}]
  :ets.update_element(repo, key, changes)
  :ok
end
180
# Puts the entry under `key` back into the :cache state with `prepared` as
# its value. The result of the ETS update is ignored; :ok is always returned.
defp cache_reset(repo, key, prepared) do
  changes = [{2, :cache}, {4, prepared}]
  :ets.update_element(repo, key, changes)
  :ok
end
185
# Normalizes the query and hands it to the adapter, returning
# `{cache_tag, select, prepared}`.
defp query_without_cache(query, operation, adapter, counter) do
  {normalized, select} = normalize(query, operation, adapter, counter)
  {cache_tag, prepared} = adapter.prepare(operation, normalized)
  {cache_tag, select, prepared}
end
191
# Builds the meta map from the query's prefix, sources and preloads plus the
# given select.
defp build_meta(%{prefix: _, sources: _, preloads: _} = query, select) do
  query
  |> Map.take([:prefix, :sources, :preloads])
  |> Map.put(:select, select)
end
195
196  @doc """
197  Prepares the query for cache.
198
199  This means all the parameters from query expressions are
200  merged into a single value and their entries are pruned
201  from the query.
202
203  This function is called by the backend before invoking
204  any cache mechanism.
205  """
def prepare(query, operation, adapter, counter) do
  with_sources = prepare_sources(query, adapter)
  with_assocs = prepare_assocs(with_sources)
  prepare_cache(with_assocs, operation, adapter, counter)
rescue
  exception ->
    # Reraise errors so we ignore the planner inner stacktrace
    reraise exception
end
216
217  @doc """
218  Prepare all sources, by traversing and expanding joins.
219  """
def prepare_sources(%{from: from} = query, adapter) do
  source = if from, do: from, else: error!(query, "query must have a from expression")
  prepared_from = prepare_source(query, source, adapter)

  {joins, sources, tail_sources} =
    prepare_joins(query, [prepared_from], length(query.joins), adapter)

  # Joins and sources come back in reverse order; tail sources go last.
  ordered_sources = Enum.reverse(tail_sources ++ sources)

  %{query | from: prepared_from,
            joins: Enum.reverse(joins),
            sources: List.to_tuple(ordered_sources)}
end
227
# Subquery source: prepares the inner query as an :all operation and stores
# the prepared query, its params, cache key and select on the subquery.
# Any exception raised along the way is wrapped in Ecto.SubQueryError.
defp prepare_source(query, %Ecto.SubQuery{query: inner_query} = subquery, adapter) do
  {prepared, params, key} = prepare(inner_query, :all, adapter, 0)
  assert_no_subquery_assocs!(prepared)
  {prepared, select} = prepared |> returning(true) |> subquery_select(adapter)
  %{subquery | query: prepared, params: params, cache: key, select: select}
rescue
  exception -> raise Ecto.SubQueryError, query: query, exception: exception
end

# Schema without an explicit source: the source name is read from the schema.
defp prepare_source(_query, {nil, schema}, _adapter) when is_atom(schema) and schema != nil do
  {schema.__schema__(:source), schema}
end

# Explicit binary source (the schema may be nil): returned as is.
defp prepare_source(_query, {source, schema} = pair, _adapter)
     when is_binary(source) and is_atom(schema) do
  pair
end

# Fragment source: returned as is.
defp prepare_source(_query, {:fragment, _, _} = fragment, _adapter) do
  fragment
end
245
# Raises through error!/2 when the query carries assocs or preloads;
# otherwise returns the query unchanged.
defp assert_no_subquery_assocs!(query) do
  case query do
    %{assocs: [], preloads: []} ->
      query

    %{assocs: _, preloads: _} ->
      error!(query, "cannot preload associations in subquery")

    _ ->
      query
  end
end
253
# Rewrites the subquery's select into an explicit map (or struct-of-map)
# expression and returns `{query, meta}`, where meta is the first element
# returned by collect_fields/5 for the rewritten expression.
defp subquery_select(%{select: %{expr: expr, take: take} = select} = query, adapter) do
  {struct, fields} = subquery_select(expr, take, query)
  map_ast = {:%{}, [], fields}
  select_ast = if is_nil(struct), do: map_ast, else: {:%, [], [struct, map_ast]}

  updated_query = put_in(query.select.expr, select_ast)
  {walked, _counter} = prewalk(select_ast, :select, updated_query, select, 0, adapter)
  {meta, _fields, _from} = collect_fields(walked, [], :error, updated_query, take)
  {updated_query, meta}
end
267
# subquery_select/3 resolves a select expression into `{struct, fields}`:
# `struct` is a struct name or nil for a plain map, `fields` is a keyword
# list of field name => expression. Clause order matters: the map-update
# pattern must come before the generic map pattern.

# merge(left, right): both sides are resolved recursively and their fields
# merged, with the right side winning on duplicate keys (Keyword.merge/2).
defp subquery_select({:merge, _, [left, right]}, take, query) do
  {left_struct, left_fields} = subquery_select(left, take, query)
  {right_struct, right_fields} = subquery_select(right, take, query)
  struct =
    case {left_struct, right_struct} do
      # Same struct (or both nil) on each side.
      {struct, struct} -> struct
      # A map on the right keeps whatever the left side is.
      {_, nil} -> left_struct
      {nil, _} -> error!(query, "cannot merge because the left side is a map " <>
                                "and the right side is a #{inspect right_struct} struct")
      {_, _} -> error!(query, "cannot merge because the left side is a #{inspect left_struct} " <>
                              "and the right side is a #{inspect right_struct} struct")
    end
  {struct, Keyword.merge(left_fields, right_fields)}
end
# %Name{...}: fields come from the inner map, the struct name from the expression.
defp subquery_select({:%, _, [name, map]}, take, query) do
  {_, fields} = subquery_select(map, take, query)
  {name, fields}
end
# Map update on a source: %{&ix | key: value, ...}.
defp subquery_select({:%{}, _, [{:|, _, [{:&, [], [ix]}, pairs]}]} = expr, take, query) do
  assert_subquery_fields!(query, expr, pairs)
  {source, _} = source_take!(:select, query, take, ix, ix)
  {struct, fields} = subquery_struct_and_fields(source)

  # Map updates may contain virtual fields, so we need to consider those
  valid_keys = if struct, do: Map.keys(struct.__struct__), else: fields
  update_keys = Keyword.keys(pairs)

  case update_keys -- valid_keys do
    [] -> :ok
    [key | _] -> error!(query, "invalid key `#{inspect key}` on map update in subquery")
  end

  # In case of map updates, we need to remove duplicated fields
  # at query time because we use the field names as aliases and
  # duplicate aliases will lead to invalid queries.
  kept_keys = fields -- update_keys
  {struct, subquery_fields(kept_keys, ix) ++ pairs}
end
# Plain map literal: validated and returned as a map (no struct).
defp subquery_select({:%{}, _, pairs} = expr, _take, query) do
  assert_subquery_fields!(query, expr, pairs)
  {nil, pairs}
end
# Whole source (&ix): one field access per field of the source.
defp subquery_select({:&, _, [ix]}, take, query) do
  {source, _} = source_take!(:select, query, take, ix, ix)
  {struct, fields} = subquery_struct_and_fields(source)
  {struct, subquery_fields(fields, ix)}
end
# Single field access (&ix.field): a one-entry map keyed by the field name.
defp subquery_select({{:., _, [{:&, _, [ix]}, field]}, _, []}, _take, _query) do
  {nil, subquery_fields([field], ix)}
end
# Anything else is rejected.
defp subquery_select(expr, _take, query) do
  error!(query, "subquery must select a source (t), a field (t.field) or a map, got: `#{Macro.to_string(expr)}`")
end
321
# Returns `{struct_or_nil, field_names}` for a source description.
# A :source uses its schema as the struct; a :map has no struct.
defp subquery_struct_and_fields({:source, {_, schema}, types}),
  do: {schema, Keyword.keys(types)}
defp subquery_struct_and_fields({:struct, name, types}),
  do: {name, Keyword.keys(types)}
defp subquery_struct_and_fields({:map, types}),
  do: {nil, Keyword.keys(types)}
331
# Builds a `{field, &ix.field}` pair (as AST) for each field name.
defp subquery_fields(fields, ix) do
  Enum.map(fields, fn field ->
    access = {{:., [], [{:&, [], [ix]}, field]}, [], []}
    {field, access}
  end)
end
337
# Extracts the types stored in a subquery's select, which is either
# `{:map, types}` or `{:struct, name, types}`.
defp subquery_types(%{select: {:map, field_types}}) do
  field_types
end
defp subquery_types(%{select: {:struct, _module, field_types}}) do
  field_types
end
340
# Checks every `{key, value}` pair of a map selected in a subquery:
# keys must be atoms and values must pass valid_subquery_value?/1.
defp assert_subquery_fields!(query, expr, pairs) do
  Enum.each(pairs, fn {key, value} ->
    cond do
      not is_atom(key) ->
        error!(query, "only atom keys are allowed when selecting a map in subquery, got: `#{Macro.to_string(expr)}`")

      not valid_subquery_value?(value) ->
        error!(query, "maps, lists, tuples and sources are not allowed as map values in subquery, got: `#{Macro.to_string(expr)}`")

      true ->
        {key, value}
    end
  end)
end
353
# Returns false for two-element tuples, lists, and `{}`/`%{}`/`&` AST nodes;
# true for everything else.
defp valid_subquery_value?(value) do
  case value do
    {_, _} -> false
    list when is_list(list) -> false
    {container, _, args} when container in [:{}, :%{}, :&] and is_list(args) -> false
    _ -> true
  end
end
359
# Entry point for join preparation: starts with no joins and no tail sources
# accumulated, and numbers the first join as 1.
defp prepare_joins(query, sources, offset, adapter) do
  initial_joins = []
  initial_tail_sources = []
  first_join_ix = 1

  prepare_joins(query.joins, query, initial_joins, sources, initial_tail_sources,
                first_join_ix, offset, adapter)
end
363
# prepare_joins/8 walks the query joins, accumulating prepared joins and
# sources in reverse order, plus `tail_sources` for the extra sources that
# association joins introduce. `counter` is the index given to the current
# join; `offset` is used when shifting the indexes of expanded joins.

# Association join: expands the association into the joins returned by the
# reflection's `joins_query/1`.
defp prepare_joins([%JoinExpr{assoc: {ix, assoc}, qual: qual, on: on} = join|t],
                   query, joins, sources, tail_sources, counter, offset, adapter) do
  # `sources` is accumulated in reverse, so it is flipped before indexing by `ix`.
  schema = schema_for_association_join!(query, join, Enum.fetch!(Enum.reverse(sources), ix))
  refl = schema.__schema__(:association, assoc)

  unless refl do
    error! query, join, "could not find association `#{assoc}` on schema #{inspect schema}"
  end

  # If we have the following join:
  #
  #     from p in Post,
  #       join: p in assoc(p, :comments)
  #
  # The callback below will return a query that contains only
  # joins in a way it starts with the Post and ends in the
  # Comment.
  #
  # This means we need to rewrite the joins below to properly
  # shift the &... identifier in a way that:
  #
  #    &0         -> becomes assoc ix
  #    &LAST_JOIN -> becomes counter
  #
  # All values in the middle should be shifted by offset,
  # all values after join are already correct.
  child = refl.__struct__.joins_query(refl)
  last_ix = length(child.joins)
  source_ix = counter

  {child_joins, child_sources, child_tail} =
    prepare_joins(child, [child.from], offset + last_ix - 1, adapter)

  # Rewrite joins indexes as mentioned above
  child_joins = Enum.map(child_joins, &rewrite_join(&1, qual, ix, last_ix, source_ix, offset))

  # Drop the last resource which is the association owner (it is reversed)
  child_sources = Enum.drop(child_sources, -1)

  # The head of the reversed child sources takes this join's slot; the rest
  # are carried along as tail sources.
  [current_source|child_sources] = child_sources
  child_sources = child_tail ++ child_sources

  prepare_joins(t, query, attach_on(child_joins, on) ++ joins, [current_source|sources],
                child_sources ++ tail_sources, counter + 1, offset + length(child_sources), adapter)
end

# Join on a query: only allowed when the joined query has nothing but
# `where` conditions; it is converted into a single join via query_to_joins/3.
defp prepare_joins([%JoinExpr{source: %Ecto.Query{from: source} = join_query, qual: qual, on: on} = join|t],
                    query, joins, sources, tail_sources, counter, offset, adapter) do
  case join_query do
    %{order_bys: [], limit: nil, offset: nil, group_bys: [], joins: [],
      havings: [], preloads: [], assocs: [], distinct: nil, lock: nil} ->
      source = prepare_source(query, source, adapter)
      [join] = attach_on(query_to_joins(qual, %{join_query | from: source}, counter), on)
      prepare_joins(t, query, [join|joins], [source|sources], tail_sources, counter + 1, offset, adapter)
    _ ->
      error! query, join, "queries in joins can only have `where` conditions"
  end
end

# Any other join: prepare its source and assign it the current index.
defp prepare_joins([%JoinExpr{source: source} = join|t],
                    query, joins, sources, tail_sources, counter, offset, adapter) do
  source = prepare_source(query, source, adapter)
  join = %{join | source: source, ix: counter}
  prepare_joins(t, query, [join|joins], [source|sources], tail_sources, counter + 1, offset, adapter)
end

# No joins left: return the accumulators (still in reverse order).
defp prepare_joins([], _query, joins, sources, tail_sources, _counter, _offset, _adapter) do
  {joins, sources, tail_sources}
end
433
# ANDs the given expression (and its params) into the `on` of the first join
# in the list, leaving the remaining joins untouched.
defp attach_on([%{on: on} = first | rest], %{expr: expr, params: params}) do
  merged_on = merge_expr_and_params(:and, on, expr, params)
  [%{first | on: merged_on} | rest]
end
437
# Rewrites every `&n` in the join's `on` expression, and the join's own `ix`,
# through rewrite_ix/5, and sets the join qualifier to `qual`.
defp rewrite_join(%{on: on, ix: join_ix} = join, qual, ix, last_ix, source_ix, inc_ix) do
  shift = &rewrite_ix(&1, ix, last_ix, source_ix, inc_ix)

  rewritten_expr =
    Macro.prewalk(on.expr, fn
      {:&, meta, [binding_ix]} -> {:&, meta, [shift.(binding_ix)]}
      node -> node
    end)

  %{join | on: %{on | expr: rewritten_expr}, qual: qual, ix: shift.(join_ix)}
end
451
# Maps a binding index of an expanded association join to its final index.
defp rewrite_ix(join_ix, ix, last_ix, source_ix, inc_ix) do
  cond do
    # The source is replaced by the one from the assoc
    join_ix === 0 -> ix
    # The last entry gets the current source index
    join_ix === last_ix -> source_ix
    # Everything above last is already correct
    join_ix > last_ix -> join_ix
    # All others are incremented by the offset sources
    true -> join_ix + inc_ix
  end
end
463
# Returns the schema an association join should be resolved against, or
# raises through error!/3 when the source has none.
defp schema_for_association_join!(query, join, {source, nil}) do
  error!(query, join, "cannot perform association join on #{inspect source} " <>
                      "because it does not have a schema")
end
defp schema_for_association_join!(_query, _join, {_, schema}) do
  schema
end
defp schema_for_association_join!(_query, _join, %Ecto.SubQuery{select: {:struct, schema, _}}) do
  schema
end
defp schema_for_association_join!(query, join, %Ecto.SubQuery{}) do
  error!(query, join, "can only perform association joins on subqueries " <>
                      "that return a source with schema in select")
end
defp schema_for_association_join!(query, join, _source) do
  error!(query, join, "can only perform association joins on sources with a schema")
end
480
481  @doc """
482  Prepare the parameters by merging and casting them according to sources.
483  """
def prepare_cache(query, operation, adapter, counter) do
  # Each expression is kept as is; only the {cache, params} accumulator changes.
  merger = fn kind, query, expr, acc ->
    {expr, merge_cache(kind, query, expr, acc, adapter)}
  end

  {query, {cache, reversed_params}} = traverse_exprs(query, operation, {[], []}, merger)
  params = Enum.reverse(reversed_params)
  {query, params, finalize_cache(query, operation, cache, counter)}
end
489
# merge_cache/5 folds one kind of query expression into the `{cache, params}`
# accumulator: params are cast and prepended, and the expression is added to
# the cache key unless something made the query non-cacheable.

# :from contributes the key returned by source_cache/2.
defp merge_cache(:from, _query, expr, {cache, params}, _adapter) do
  {source_key, merged_params} = source_cache(expr, params)
  cacheable? = source_key != :nocache
  {merge_cache(source_key, cache, cacheable?), merged_params}
end

# Single-expression kinds; a missing expression leaves the accumulator as is.
defp merge_cache(kind, query, expr, {cache, params} = acc, adapter)
     when kind in [:select, :distinct, :limit, :offset] do
  if expr do
    {merged_params, cacheable?} = cast_and_merge_params(kind, query, expr, params, adapter)
    entry = {kind, expr_to_cache(expr)}
    {merge_cache(entry, cache, cacheable?), merged_params}
  else
    acc
  end
end

# List kinds; an empty list adds nothing to the cache key.
defp merge_cache(kind, query, exprs, {cache, params}, adapter)
     when kind in [:where, :update, :group_by, :having, :order_by] do
  {keys, {merged_params, all_cacheable?}} =
    Enum.map_reduce(exprs, {params, true}, fn expr, {acc_params, cacheable?} ->
      {acc_params, expr_cacheable?} =
        cast_and_merge_params(kind, query, expr, acc_params, adapter)

      {expr_to_cache(expr), {acc_params, cacheable? and expr_cacheable?}}
    end)

  if keys == [] do
    {cache, merged_params}
  else
    {merge_cache({kind, keys}, cache, all_cacheable?), merged_params}
  end
end

# Joins contribute their qualifier, source key and `on` expression.
defp merge_cache(:join, query, exprs, {cache, params}, adapter) do
  {keys, {merged_params, all_cacheable?}} =
    Enum.map_reduce(exprs, {params, true}, fn
      %JoinExpr{on: on, qual: qual, source: source} = join, {acc_params, cacheable?} ->
        {source_key, acc_params} = source_cache(source, acc_params)
        {acc_params, join_cacheable?} =
          cast_and_merge_params(:join, query, join, acc_params, adapter)
        {acc_params, on_cacheable?} =
          cast_and_merge_params(:join, query, on, acc_params, adapter)

        still_cacheable? =
          cacheable? and join_cacheable? and on_cacheable? and source_key != :nocache

        {{qual, source_key, on.expr}, {acc_params, still_cacheable?}}
    end)

  if keys == [] do
    {cache, merged_params}
  else
    {merge_cache({:join, keys}, cache, all_cacheable?), merged_params}
  end
end
535
# Reduces a query expression struct to the part used in the cache key:
# `{op, expr}` for boolean expressions, the bare expr otherwise.
defp expr_to_cache(%BooleanExpr{op: op, expr: inner}) do
  {op, inner}
end
defp expr_to_cache(%QueryExpr{expr: inner}) do
  inner
end
defp expr_to_cache(%SelectExpr{expr: inner}) do
  inner
end
539
# Casts every `{value, type}` param of `expr` and prepends the results to
# `params` (which is kept in reverse order). Returns `{params, cacheable?}`;
# a param cast to `{:in, values}` makes the result non-cacheable.
defp cast_and_merge_params(kind, query, expr, params, adapter) do
  Enum.reduce(expr.params, {params, true}, fn {value, type}, {acc, cacheable?} ->
    case cast_param(kind, query, expr, value, type, adapter) do
      {:in, values} -> {Enum.reverse(values, acc), false}
      cast_value -> {[cast_value | acc], cacheable?}
    end
  end)
end
550
# Prepends `left` to the cache key `right`, or yields :nocache when the
# entry is not cacheable or the key is already :nocache.
defp merge_cache(left, right, cacheable?) when is_boolean(cacheable?) do
  if cacheable? and right != :nocache do
    [left | right]
  else
    :nocache
  end
end
554
# Builds the final cache key. :nocache is passed through; otherwise a
# non-empty take, assocs, prefix and lock are prepended when present,
# followed by the operation and counter at the very front.
defp finalize_cache(_query, _operation, :nocache, _counter), do: :nocache

defp finalize_cache(%{assocs: assocs, prefix: prefix, lock: lock, select: select},
                    operation, cache, counter) do
  take =
    case select do
      %{take: take} -> take
      _ -> %{}
    end

  key =
    cache
    |> prepend_if(take != %{},   [take: take])
    |> prepend_if(assocs != [],  [assocs: assocs])
    |> prepend_if(prefix != nil, [prefix: prefix])
    |> prepend_if(lock != nil,   [lock: lock])

  [operation, counter | key]
end
577
# Prepends `prepend` to `cache` only when `condition` is true.
defp prepend_if(cache, condition, prepend) when is_boolean(condition) do
  if condition, do: prepend ++ cache, else: cache
end
580
# Returns `{cache_key, params}` for a source. Schemaless sources and fragments
# are their own key; schema sources add the schema hash; subqueries use their
# stored cache key and push their params onto `params`.
defp source_cache({_, nil} = schemaless, params) do
  {schemaless, params}
end
defp source_cache({source, schema}, params) do
  {{source, schema, schema.__schema__(:hash)}, params}
end
defp source_cache({:fragment, _, _} = fragment, params) do
  {fragment, params}
end
defp source_cache(%Ecto.SubQuery{params: inner_params, cache: key}, params) do
  {key, Enum.reverse(inner_params, params)}
end
589
# Dynamic expressions are rejected as parameter values.
defp cast_param(_kind, query, expr, %DynamicExpr{}, _type, _adapter) do
  error!(query, expr, "dynamic expressions can only be interpolated inside other " <>
                      "dynamic expressions or at the top level of where, having, update or a join's on")
end

# Keyword lists are rejected as parameter values.
defp cast_param(_kind, query, expr, [{_, _} | _], _type, _adapter) do
  error!(query, expr, "keyword lists can only be interpolated at the top level of " <>
                      "where, having, distinct, order_by, update or a join's on")
end

# Resolves the field type, then casts and dumps the value. A query error
# raised while casting is converted into Ecto.Query.CastError.
defp cast_param(kind, query, expr, value, type, adapter) do
  resolved_type = field_type!(kind, query, expr, type)

  try do
    case cast_param(kind, resolved_type, value, adapter) do
      {:ok, cast_value} -> cast_value
      {:error, reason} -> error!(query, expr, reason)
    end
  catch
    :error, %Ecto.QueryError{} = query_error ->
      raise Ecto.Query.CastError,
        value: value, type: resolved_type, message: Exception.message(query_error)
  end
end
611
# Normalizes the type, casts the value and dumps it through the adapter.
# The first step that does not return `{:ok, _}` is returned as is.
defp cast_param(kind, type, value, adapter) do
  with {:ok, normalized_type} <- normalize_param(kind, type, value),
       {:ok, cast_value} <- cast_param(kind, normalized_type, value) do
    dump_param(adapter, normalized_type, cast_value)
  end
end
617
618  @doc """
619  Prepare association fields found in the query.
620  """
def prepare_assocs(query) do
  # Validation starts at the root source (index 0); the query itself is
  # returned unchanged.
  root_ix = 0
  prepare_assocs(query, root_ix, query.assocs)
  query
end
625
# Validates the assocs tree rooted at source `ix`: every entry must name an
# association of the parent schema and, when it points at a join, that join
# must be inner, left or one of their lateral variants.
defp prepare_assocs(_query, _ix, []), do: :ok

defp prepare_assocs(query, ix, assocs) do
  # We validate the schema exists when preparing joins above
  {_, parent_schema} = get_source!(:preload, query, ix)

  Enum.each(assocs, fn {assoc, {child_ix, child_assocs}} ->
    refl = parent_schema.__schema__(:association, assoc)

    if !refl do
      error!(query, "field `#{inspect parent_schema}.#{assoc}` " <>
                    "in preload is not an association")
    end

    case find_source_expr(query, child_ix) do
      %JoinExpr{qual: qual} when not (qual in [:inner, :left, :inner_lateral, :left_lateral]) ->
        error!(query, "association `#{inspect parent_schema}.#{assoc}` " <>
                      "in preload requires an inner, left or lateral join, got #{qual} join")

      _ ->
        :ok
    end

    prepare_assocs(query, child_ix, child_assocs)
  end)
end
652
# Index 0 is the query's from; any other index is looked up among the joins
# by their `ix` (nil when no join has it).
defp find_source_expr(query, ix) do
  case ix do
    0 -> query.from
    _ -> Enum.find(query.joins, fn join -> join.ix == ix end)
  end
end
660
661  @doc """
662  Used for customizing the query returning result.
663  """
# An existing select always wins over the returning option.
def returning(%{select: select} = query, _fields) when select != nil, do: query

# An empty field list is rejected.
def returning(%{select: nil}, []) do
  raise ArgumentError, ":returning expects at least one field to be given, got an empty list"
end

# A field list selects the root source restricted to those fields.
def returning(%{select: nil} = query, fields) when is_list(fields) do
  select =
    %SelectExpr{expr: {:&, [], [0]}, take: %{0 => {:any, fields}},
                line: __ENV__.line, file: __ENV__.file}

  %{query | select: select}
end

# `true` selects the whole root source.
def returning(%{select: nil} = query, true) do
  select = %SelectExpr{expr: {:&, [], [0]}, line: __ENV__.line, file: __ENV__.file}
  %{query | select: select}
end

# `false` leaves the query without a select.
def returning(%{select: nil} = query, false), do: query
680
681  @doc """
682  Asserts there is no select statement in the given query.
683  """
def assert_no_select!(%{select: select} = query, operation) do
  if is_nil(select) do
    query
  else
    raise Ecto.QueryError,
      query: query,
      message: "`select` clause is not supported in `#{operation}`, " <>
               "please pass the :returning option instead"
  end
end
693
694  @doc """
695  Normalizes the query.
696
697  After the query was prepared and there is no cache
698  entry, we need to update its interpolations and check
699  its fields and associations exist and are valid.
700  """
def normalize(query, operation, adapter, counter) do
  # Only the normalized query is needed here; the final counter is dropped.
  normalized = elem(normalize_query(query, operation, adapter, counter), 0)
  normalize_select(normalized)
rescue
  exception ->
    # Reraise errors so we ignore the planner inner stacktrace
    reraise exception
end
711
# Runs the per-operation assertions, then walks every expression of the
# query through validate_and_increment/6, threading `counter` along.
defp normalize_query(query, operation, adapter, counter) do
  case operation do
    :all ->
      assert_no_update!(query, operation)

    :update_all ->
      assert_update!(query, operation)
      assert_only_filter_expressions!(query, operation)

    :delete_all ->
      assert_no_update!(query, operation)
      assert_only_filter_expressions!(query, operation)
  end

  walker = fn kind, query, expr, acc ->
    validate_and_increment(kind, query, expr, acc, operation, adapter)
  end

  traverse_exprs(query, operation, counter, walker)
end
727
# validate_and_increment/6 normalizes one kind of query expression and
# returns `{normalized, counter}`.

# Subqueries in `from` are only accepted for the :all operation.
defp validate_and_increment(:from, query, %Ecto.SubQuery{}, _counter, operation, _adapter)
     when operation != :all do
  error!(query, "`#{operation}` does not allow subqueries in `from`")
end

defp validate_and_increment(:from, query, expr, counter, _operation, adapter) do
  prewalk_source(expr, :from, query, expr, counter, adapter)
end

# Single-expression kinds: a missing expression yields nil and the same counter.
defp validate_and_increment(kind, query, expr, counter, _operation, adapter)
     when kind in [:select, :distinct, :limit, :offset] do
  if expr, do: prewalk(kind, query, expr, counter, adapter), else: {nil, counter}
end

# List kinds: entries whose expr is an empty list are dropped.
defp validate_and_increment(kind, query, exprs, counter, _operation, adapter)
     when kind in [:where, :group_by, :having, :order_by, :update] do
  Enum.flat_map_reduce(exprs, counter, fn
    %{expr: []}, acc ->
      {[], acc}

    expr, acc ->
      {walked, acc} = prewalk(kind, query, expr, acc, adapter)
      {[walked], acc}
  end)
end

# Joins: the source is walked before the `on`, and the join params are cleared.
defp validate_and_increment(:join, query, exprs, counter, _operation, adapter) do
  Enum.map_reduce(exprs, counter, fn join, acc ->
    {walked_source, acc} = prewalk_source(join.source, :join, query, join, acc, adapter)
    {walked_on, acc} = prewalk(:join, query, join.on, acc, adapter)
    {%{join | on: walked_on, source: walked_source, params: nil}, acc}
  end)
end
764
# Fragment source: only the fragment parts are walked.
defp prewalk_source({:fragment, meta, fragments}, kind, query, expr, acc, adapter) do
  {walked, acc} = prewalk(fragments, kind, query, expr, acc, adapter)
  {{:fragment, meta, walked}, acc}
end

# Subquery source: the inner query is normalized as an :all operation and its
# select fields are zipped with the keys of the subquery's types. Any
# exception raised along the way is wrapped in Ecto.SubQueryError.
defp prewalk_source(%Ecto.SubQuery{query: inner_query} = subquery, _kind, query, _expr, counter, adapter) do
  {normalized, counter} = normalize_query(inner_query, :all, adapter, counter)
  {normalized, _} = normalize_select(normalized)
  keys = Keyword.keys(subquery_types(subquery))
  normalized = update_in(normalized.select.fields, fn fields -> Enum.zip(keys, fields) end)
  {%{subquery | query: normalized}, counter}
rescue
  exception -> raise Ecto.SubQueryError, query: query, exception: exception
end

# Any other source is returned untouched.
defp prewalk_source(source, _kind, _query, _expr, acc, _adapter) do
  {source, acc}
end
783
# Update expressions are a list of `{op, keyword}` entries: every value is
# walked and every field name is passed through field_source/2 for source 0.
defp prewalk(:update, query, expr, counter, adapter) do
  source = get_source!(:update, query, 0)

  walk_pair = fn {field, value}, acc ->
    {walked_value, acc} = prewalk(value, :update, query, expr, acc, adapter)
    {{field_source(source, field), walked_value}, acc}
  end

  {inner, counter} =
    Enum.map_reduce(expr.expr, counter, fn {op, kw}, acc ->
      {walked_kw, acc} = Enum.map_reduce(kw, acc, walk_pair)
      {{op, walked_kw}, acc}
    end)

  {%{expr | expr: inner, params: nil}, counter}
end

# Every other kind: walk the inner expression and clear the params.
defp prewalk(kind, query, expr, counter, adapter) do
  {inner, counter} = prewalk(expr.expr, kind, query, expr, counter, adapter)
  {%{expr | expr: inner, params: nil}, counter}
end
803
# prewalk/6 walks a query AST node, threading the parameter counter `acc`,
# and returns `{node, acc}`. Clause order matters: the specific node shapes
# must be matched before the generic 2-tuple / 3-tuple / list clauses.

# `left in ^param`: the interpolated right side is handled by validate_in/5.
defp prewalk({:in, in_meta, [left, {:^, meta, [param]}]}, kind, query, expr, acc, adapter) do
  {left, acc} = prewalk(left, kind, query, expr, acc, adapter)
  {right, acc} = validate_in(meta, expr, param, acc, adapter)
  {{:in, in_meta, [left, right]}, acc}
end

# Field access (&ix.field): the field name is replaced by field_source/2.
defp prewalk({{:., dot_meta, [{:&, amp_meta, [ix]}, field]}, meta, []},
             kind, query, _expr, acc, _adapter) do
  field = field_source(get_source!(kind, query, ix), field)
  {{{:., dot_meta, [{:&, amp_meta, [ix]}, field]}, meta, []}, acc}
end

# Interpolation (^ix): renumbered with the running counter, which is then bumped.
defp prewalk({:^, meta, [ix]}, _kind, _query, _expr, acc, _adapter) when is_integer(ix) do
  {{:^, meta, [acc]}, acc + 1}
end

# type(arg, type): the argument is walked and wrapped in a Tagged struct.
defp prewalk({:type, _, [arg, type]}, kind, query, expr, acc, adapter) do
  {arg, acc} = prewalk(arg, kind, query, expr, acc, adapter)
  type = field_type!(kind, query, expr, type)
  {%Ecto.Query.Tagged{value: arg, tag: type, type: Ecto.Type.type(type)}, acc}
end

# Tagged values: base types are kept as is, other types are dumped now.
defp prewalk(%Ecto.Query.Tagged{value: v, type: type} = tagged, kind, query, expr, acc, adapter) do
  if Ecto.Type.base?(type) do
    {tagged, acc}
  else
    {dump_param(kind, query, expr, v, type, adapter), acc}
  end
end

# Two-element tuples: walk both sides.
defp prewalk({left, right}, kind, query, expr, acc, adapter) do
  {left, acc} = prewalk(left, kind, query, expr, acc, adapter)
  {right, acc} = prewalk(right, kind, query, expr, acc, adapter)
  {{left, right}, acc}
end

# Generic AST node: walk the head and the arguments, keep the meta.
defp prewalk({left, meta, args}, kind, query, expr, acc, adapter) do
  {left, acc} = prewalk(left, kind, query, expr, acc, adapter)
  {args, acc} = prewalk(args, kind, query, expr, acc, adapter)
  {{left, meta, args}, acc}
end

# Lists: walk every element in order.
defp prewalk(list, kind, query, expr, acc, adapter) when is_list(list) do
  Enum.map_reduce(list, acc, &prewalk(&1, kind, query, expr, &2, adapter))
end

# Anything else is a leaf and is returned untouched.
defp prewalk(other, _kind, _query, _expr, acc, _adapter) do
  {other, acc}
end
853
854  defp dump_param(kind, query, expr, v, type, adapter) do
855    type = field_type!(kind, query, expr, type)
856
857    case dump_param(kind, type, v, adapter) do
858      {:ok, v} ->
859        v
860      {:error, error} ->
861        error = error <> ". Or the value is incompatible or it must be " <>
862                         "interpolated (using ^) so it may be cast accordingly"
863        error! query, expr, error
864    end
865  end
866
867  defp dump_param(kind, type, v, adapter) do
868    with {:ok, type} <- normalize_param(kind, type, v),
869         do: dump_param(adapter, type, v)
870  end
871
872  defp validate_in(meta, expr, param, acc, adapter) do
873    {v, t} = Enum.fetch!(expr.params, param)
874    length = length(v)
875
876    case adapter.dumpers(t, t) do
877      [{:in, _} | _] -> {{:^, meta, [acc, length]}, acc + length}
878      _ -> {{:^, meta, [acc, length]}, acc + 1}
879    end
880  end
881
882  defp normalize_select(%{select: nil} = query) do
883    {query, nil}
884  end
885  defp normalize_select(query) do
886    %{assocs: assocs, preloads: preloads, select: select} = query
887    %{take: take, expr: expr} = select
888    {tag, from_take} = Map.get(take, 0, {:any, []})
889    source = get_source!(:select, query, 0)
890
891    # In from, if there is a schema and we have a map tag with preloads,
892    # it needs to be converted to a map in a later pass.
893    {take, from_tag} =
894      case tag do
895        :map when is_tuple(source) and elem(source, 1) != nil and preloads != [] ->
896          {Map.put(take, 0, {:struct, from_take}), :map}
897        _ ->
898          {take, :any}
899      end
900
901    {postprocess, fields, from} =
902      collect_fields(expr, [], :error, query, take)
903
904    {fields, preprocess, postprocess} =
905      case from do
906        {:ok, from_pre, from_taken} ->
907          {assoc_exprs, assoc_fields} = collect_assocs([], [], query, tag, from_take, assocs)
908          fields = from_taken ++ Enum.reverse(assoc_fields, Enum.reverse(fields))
909          preprocess = [from_pre | Enum.reverse(assoc_exprs)]
910          {fields, preprocess, {:from, from_tag, postprocess}}
911        :error when preloads != [] or assocs != [] ->
912          error! query, "the binding used in `from` must be selected in `select` when using `preload`"
913        :error ->
914          {Enum.reverse(fields), [], postprocess}
915      end
916
917    select = %{preprocess: preprocess, postprocess: postprocess, take: from_take, assocs: assocs}
918    {put_in(query.select.fields, fields), select}
919  end
920
921  # Handling of source
922
923  defp collect_fields({:merge, _, [{:&, _, [0]}, right]}, fields, :error, query, take) do
924    {expr, taken} = source_take!(:select, query, take, 0, 0)
925    {right, right_fields, _from} = collect_fields(right, [], {:source, :from}, query, take)
926    {{:source, :from}, fields, {:ok, {:merge, expr, right}, taken ++ Enum.reverse(right_fields)}}
927  end
928
929  defp collect_fields({:&, _, [0]}, fields, :error, query, take) do
930    {expr, taken} = source_take!(:select, query, take, 0, 0)
931    {{:source, :from}, fields, {:ok, expr, taken}}
932  end
933
934  defp collect_fields({:&, _, [0]}, fields, from, _query, _take) do
935    {{:source, :from}, fields, from}
936  end
937
938  defp collect_fields({:&, _, [ix]}, fields, from, query, take) do
939    {expr, taken} = source_take!(:select, query, take, ix, ix)
940    {expr, Enum.reverse(taken, fields), from}
941  end
942
943  # Expression handling
944
945  defp collect_fields({agg, _, [{{:., _, [{:&, _, [ix]}, field]}, _, []} | _]} = expr,
946                      fields, from, %{select: select} = query, _take)
947       when agg in ~w(count avg min max sum)a do
948    type =
949      # TODO: Support the :number type
950      case agg do
951        :count -> :integer
952        :avg -> :any
953        :sum -> :any
954        _ -> source_type!(:select, query, select, ix, field)
955      end
956    {{:value, type}, [expr | fields], from}
957  end
958
959  defp collect_fields({{:., _, [{:&, _, [ix]}, field]}, _, []} = expr,
960                      fields, from, %{select: select} = query, _take) do
961    type = source_type!(:select, query, select, ix, field)
962    {{:value, type}, [expr | fields], from}
963  end
964
965  defp collect_fields({left, right}, fields, from, query, take) do
966    {args, fields, from} = collect_args([left, right], fields, from, query, take, [])
967    {{:tuple, args}, fields, from}
968  end
969
970  defp collect_fields({:{}, _, args}, fields, from, query, take) do
971    {args, fields, from} = collect_args(args, fields, from, query, take, [])
972    {{:tuple, args}, fields, from}
973  end
974
975  defp collect_fields({:%{}, _, [{:|, _, [data, args]}]}, fields, from, query, take) do
976    {data, fields, from} = collect_fields(data, fields, from, query, take)
977    {args, fields, from} = collect_kv(args, fields, from, query, take, [])
978    {{:map, data, args}, fields, from}
979  end
980
981  defp collect_fields({:%{}, _, args}, fields, from, query, take) do
982    {args, fields, from} = collect_kv(args, fields, from, query, take, [])
983    {{:map, args}, fields, from}
984  end
985
986  defp collect_fields({:%, _, [name, {:%{}, _, [{:|, _, [data, args]}]}]}, fields, from, query, take) do
987    {data, fields, from} = collect_fields(data, fields, from, query, take)
988    {args, fields, from} = collect_kv(args, fields, from, query, take, [])
989    struct!(name, args)
990    {{:struct, name, data, args}, fields, from}
991  end
992
993  defp collect_fields({:%, _, [name, {:%{}, _, args}]}, fields, from, query, take) do
994    {args, fields, from} = collect_kv(args, fields, from, query, take, [])
995    struct!(name, args)
996    {{:struct, name, args}, fields, from}
997  end
998
999  defp collect_fields({:merge, _, args}, fields, from, query, take) do
1000    {[left, right], fields, from} = collect_args(args, fields, from, query, take, [])
1001    {{:merge, left, right}, fields, from}
1002  end
1003
1004  defp collect_fields(args, fields, from, query, take) when is_list(args) do
1005    {args, fields, from} = collect_args(args, fields, from, query, take, [])
1006    {{:list, args}, fields, from}
1007  end
1008
1009  defp collect_fields(expr, fields, from, _query, _take)
1010       when is_atom(expr) or is_binary(expr) or is_number(expr) do
1011    {expr, fields, from}
1012  end
1013
1014  defp collect_fields(%Ecto.Query.Tagged{tag: tag} = expr, fields, from, _query, _take) do
1015    {{:value, tag}, [expr | fields], from}
1016  end
1017
1018  defp collect_fields(expr, fields, from, _query, _take) do
1019    {{:value, :any}, [expr | fields], from}
1020  end
1021
1022  defp collect_kv([{key, value} | elems], fields, from, query, take, acc) do
1023    {key, fields, from} = collect_fields(key, fields, from, query, take)
1024    {value, fields, from} = collect_fields(value, fields, from, query, take)
1025    collect_kv(elems, fields, from, query, take, [{key, value} | acc])
1026  end
1027  defp collect_kv([], fields, from, _query, _take, acc) do
1028    {Enum.reverse(acc), fields, from}
1029  end
1030
1031  defp collect_args([elem | elems], fields, from, query, take, acc) do
1032    {elem, fields, from} = collect_fields(elem, fields, from, query, take)
1033    collect_args(elems, fields, from, query, take, [elem | acc])
1034  end
1035  defp collect_args([], fields, from, _query, _take, acc) do
1036    {Enum.reverse(acc), fields, from}
1037  end
1038
1039  defp collect_assocs(exprs, fields, query, tag, take, [{assoc, {ix, children}}|tail]) do
1040    case get_source!(:preload, query, ix) do
1041      {_, schema} = source when schema != nil ->
1042        {fetch, take_children} = fetch_assoc(tag, take, assoc)
1043        {expr, taken} = take!(source, query, fetch, assoc, ix)
1044        exprs = [expr | exprs]
1045        fields = Enum.reverse(taken, fields)
1046        {exprs, fields} = collect_assocs(exprs, fields, query, tag, take_children, children)
1047        {exprs, fields} = collect_assocs(exprs, fields, query, tag, take, tail)
1048        {exprs, fields}
1049      _ ->
1050        error! query, "can only preload sources with a schema " <>
1051                      "(fragments, binary and subqueries are not supported)"
1052    end
1053  end
1054  defp collect_assocs(exprs, fields, _query, _tag, _take, []) do
1055    {exprs, fields}
1056  end
1057
1058  defp fetch_assoc(tag, take, assoc) do
1059    case Access.fetch(take, assoc) do
1060      {:ok, value} -> {{:ok, {tag, value}}, value}
1061      :error -> {:error, []}
1062    end
1063  end
1064
1065  defp source_take!(kind, query, take, field, ix) do
1066    source = get_source!(kind, query, ix)
1067    take!(source, query, Access.fetch(take, field), field, ix)
1068  end
1069
1070  defp take!(source, query, fetched, field, ix) do
1071    case {fetched, source} do
1072      {{:ok, {_, []}}, {_, _}} ->
1073        error! query, "at least one field must be selected for binding `#{field}`, got an empty list"
1074
1075      {{:ok, {:struct, _}}, {_, nil}} ->
1076        error! query, "struct/2 in select expects a source with a schema"
1077
1078      {{:ok, {kind, fields}}, {source, schema}} ->
1079        dumper = if schema, do: schema.__schema__(:dump), else: %{}
1080        schema = if kind == :map, do: nil, else: schema
1081        {types, fields} = select_dump(List.wrap(fields), dumper, ix)
1082        {{:source, {source, schema}, types}, fields}
1083
1084      {{:ok, {_, _}}, {:fragment, _, _}} ->
1085        error! query, "it is not possible to return a map/struct subset of a fragment, " <>
1086                      "you must explicitly return the desired individual fields"
1087
1088      {{:ok, {_, _}}, %Ecto.SubQuery{}} ->
1089        error! query, "it is not possible to return a map/struct subset of a subquery, " <>
1090                      "you must explicitly select the whole subquery or individual fields only"
1091
1092      {:error, {_, nil}} ->
1093        {{:value, :map}, [{:&, [], [ix]}]}
1094
1095      {:error, {_, schema}} ->
1096        {types, fields} = select_dump(schema.__schema__(:fields), schema.__schema__(:dump), ix)
1097        {{:source, source, types}, fields}
1098
1099      {:error, {:fragment, _, _}} ->
1100        {{:value, :map}, [{:&, [], [ix]}]}
1101
1102      {:error, %Ecto.SubQuery{select: select} = subquery} ->
1103        fields = for {field, _} <- subquery_types(subquery), do: select_field(field, ix)
1104        {select, fields}
1105    end
1106  end
1107
1108  defp select_dump(fields, dumper, ix) do
1109    fields
1110    |> Enum.reverse
1111    |> Enum.reduce({[], []}, fn
1112      field, {types, exprs} when is_atom(field) ->
1113        {source, type} = Map.get(dumper, field, {field, :any})
1114        {[{field, type} | types], [select_field(source, ix) | exprs]}
1115      _field, acc ->
1116        acc
1117    end)
1118  end
1119
1120  defp select_field(field, ix) do
1121    {{:., [], [{:&, [], [ix]}, field]}, [], []}
1122  end
1123
1124  defp get_source!(where, %{sources: sources} = query, ix) do
1125    elem(sources, ix)
1126  rescue
1127    ArgumentError ->
1128      error! query, "cannot prepare query because it has specified more bindings than " <>
1129                    "bindings available in `#{where}` (look for `unknown_binding!` in " <>
1130                    "the printed query below)"
1131  end
1132
1133  ## Helpers
1134
1135  @exprs [distinct: :distinct, select: :select, from: :from, join: :joins,
1136          where: :wheres, group_by: :group_bys, having: :havings,
1137          order_by: :order_bys, limit: :limit, offset: :offset]
1138
1139  # Traverse all query components with expressions.
1140  # Therefore from, preload, assocs and lock are not traversed.
1141  defp traverse_exprs(query, operation, acc, fun) do
1142    extra =
1143      case operation do
1144        :update_all -> [update: :updates]
1145        _ -> []
1146      end
1147
1148    Enum.reduce extra ++ @exprs, {query, acc}, fn {kind, key}, {query, acc} ->
1149      {traversed, acc} = fun.(kind, query, Map.fetch!(query, key), acc)
1150      {Map.put(query, key, traversed), acc}
1151    end
1152  end
1153
1154  defp field_type!(kind, query, expr, {composite, {ix, field}}) when is_integer(ix) do
1155    {composite, type!(kind, :type, query, expr, ix, field)}
1156  end
1157  defp field_type!(kind, query, expr, {ix, field}) when is_integer(ix) do
1158    type!(kind, :type, query, expr, ix, field)
1159  end
1160  defp field_type!(_kind, _query, _expr, type) do
1161    type
1162  end
1163
1164  defp source_type!(kind, query, expr, ix, field) do
1165    type!(kind, :source_type, query, expr, ix, field)
1166  end
1167
1168  defp type!(_kind, _lookup, _query, _expr, nil, _field), do: :any
1169  defp type!(kind, lookup, query, expr, ix, field) when is_integer(ix) do
1170    case get_source!(kind, query, ix) do
1171      {_, schema} ->
1172        type!(kind, lookup, query, expr, schema, field)
1173      {:fragment, _, _} ->
1174        :any
1175      %Ecto.SubQuery{} = subquery ->
1176        case Keyword.fetch(subquery_types(subquery), field) do
1177          {:ok, {:value, type}} ->
1178            type
1179          {:ok, _} ->
1180            :any
1181          :error ->
1182            error!(query, expr, "field `#{field}` does not exist in subquery")
1183        end
1184    end
1185  end
1186  defp type!(kind, lookup, query, expr, schema, field) when is_atom(schema) do
1187    cond do
1188      type = schema.__schema__(lookup, field) ->
1189        type
1190      Map.has_key?(schema.__struct__, field) ->
1191        error! query, expr, "field `#{field}` in `#{kind}` is a virtual field in schema #{inspect schema}"
1192      true ->
1193        error! query, expr, "field `#{field}` in `#{kind}` does not exist in schema #{inspect schema}"
1194    end
1195  end
1196
1197  defp normalize_param(_kind, {:out, {:array, type}}, _value) do
1198    {:ok, type}
1199  end
1200  defp normalize_param(_kind, {:out, :any}, _value) do
1201    {:ok, :any}
1202  end
1203  defp normalize_param(kind, {:out, other}, value) do
1204    {:error, "value `#{inspect value}` in `#{kind}` expected to be part of an array " <>
1205             "but matched type is #{inspect other}"}
1206  end
1207  defp normalize_param(_kind, type, _value) do
1208    {:ok, type}
1209  end
1210
1211  defp cast_param(kind, type, v) do
1212    case Ecto.Type.cast(type, v) do
1213      {:ok, v} ->
1214        {:ok, v}
1215      :error ->
1216        {:error, "value `#{inspect v}` in `#{kind}` cannot be cast to type #{inspect type}"}
1217    end
1218  end
1219
1220  defp dump_param(adapter, type, v) do
1221    case Ecto.Type.adapter_dump(adapter, type, v) do
1222      {:ok, v} ->
1223        {:ok, v}
1224      :error when type == :any ->
1225        {:error, "value `#{inspect v}` cannot be dumped with Ecto.DataType"}
1226      :error ->
1227        {:error, "value `#{inspect v}` cannot be dumped to type #{inspect type}"}
1228    end
1229  end
1230
1231  defp field_source({_, schema}, field) when schema != nil do
1232    # If the field is not found we return the field itself
1233    # which will be checked and raise later.
1234    schema.__schema__(:field_source, field) || field
1235  end
1236  defp field_source(_, field) do
1237    field
1238  end
1239
1240  defp assert_update!(%Ecto.Query{updates: updates} = query, operation) do
1241    changes =
1242      Enum.reduce(updates, %{}, fn update, acc ->
1243        Enum.reduce(update.expr, acc, fn {_op, kw}, acc ->
1244          Enum.reduce(kw, acc, fn {k, v}, acc ->
1245            Map.update(acc, k, v, fn _ ->
1246              error! query, "duplicate field `#{k}` for `#{operation}`"
1247            end)
1248          end)
1249        end)
1250      end)
1251
1252    if changes == %{} do
1253      error! query, "`#{operation}` requires at least one field to be updated"
1254    end
1255  end
1256
1257  defp assert_no_update!(query, operation) do
1258    case query do
1259      %Ecto.Query{updates: []} -> query
1260      _ ->
1261        error! query, "`#{operation}` does not allow `update` expressions"
1262    end
1263  end
1264
1265  defp assert_only_filter_expressions!(query, operation) do
1266    case query do
1267      %Ecto.Query{order_bys: [], limit: nil, offset: nil, group_bys: [],
1268                  havings: [], preloads: [], assocs: [], distinct: nil, lock: nil} ->
1269        query
1270      _ ->
1271        error! query, "`#{operation}` allows only `where` and `join` expressions. " <>
1272                      "You can exclude unwanted expressions from a query by using " <>
1273                      "Ecto.Query.exclude/2. Error found"
1274    end
1275  end
1276
1277  defp reraise(exception) do
1278    reraise exception, Enum.reject(System.stacktrace, &match?({__MODULE__, _, _, _}, &1))
1279  end
1280
1281  defp error!(query, message) do
1282    raise Ecto.QueryError, message: message, query: query
1283  end
1284
1285  defp error!(query, expr, message) do
1286    raise Ecto.QueryError, message: message, query: query, file: expr.file, line: expr.line
1287  end
1288end
1289