1defmodule Ecto.Repo.Schema do
2  # The module invoked by user defined repos
3  # for schema related functionality.
4  @moduledoc false
5
6  alias Ecto.Changeset
7  alias Ecto.Changeset.Relation
8  require Ecto.Query
9
10  @doc """
11  Implementation for `Ecto.Repo.insert_all/3`.
12  """
13  def insert_all(repo, adapter, schema, rows, opts) when is_atom(schema) do
14    do_insert_all(repo, adapter, schema, schema.__schema__(:prefix),
15                  schema.__schema__(:source), rows, opts)
16  end
17
18  def insert_all(repo, adapter, table, rows, opts) when is_binary(table) do
19    do_insert_all(repo, adapter, nil, nil, table, rows, opts)
20  end
21
22  def insert_all(repo, adapter, {source, schema}, rows, opts) when is_atom(schema) do
23    do_insert_all(repo, adapter, schema, schema.__schema__(:prefix), source, rows, opts)
24  end
25
  # No rows to insert: the adapter is not called at all. The second element
  # of the result follows the `:returning` option, `[]` when it is truthy
  # and `nil` otherwise.
  defp do_insert_all(_repo, _adapter, _schema, _prefix, _source, [], opts) do
    if opts[:returning] do
      {0, []}
    else
      {0, nil}
    end
  end

  # Dumps every row, resolves the conflict options and delegates to
  # `adapter.insert_all/7`. Returns `{count, rows}`, where `rows` is the
  # adapter result after going through `postprocess/5`.
  defp do_insert_all(repo, adapter, schema, prefix, source, rows, opts) when is_list(rows) do
    # For schemaless inserts (`schema == nil`) both values below are `nil`.
    autogen_id = schema && schema.__schema__(:autogenerate_id)
    dumper = schema && schema.__schema__(:dump)
    {return_fields_or_types, return_sources} =
      schema
      |> returning(opts)
      |> fields_to_sources(dumper)

    # `header` is a map whose keys are all the columns seen across the rows.
    {rows, header} = extract_header_and_fields(rows, schema, dumper, autogen_id, adapter)
    # Lazily counts the values across all rows; it is only invoked when
    # `:on_conflict` is turned into a query (see `on_conflict_query/6`).
    counter = fn -> Enum.reduce(rows, 0, &length(&1) + &2) end
    metadata = metadata(schema, prefix, source, autogen_id, nil, opts)

    # The conflict options are consumed here and not forwarded in `opts`.
    {on_conflict, opts} = Keyword.pop(opts, :on_conflict, :raise)
    {conflict_target, opts} = Keyword.pop(opts, :conflict_target, [])
    conflict_target = conflict_target(conflict_target, dumper)
    on_conflict = on_conflict(on_conflict, conflict_target, metadata, counter, adapter)

    {count, rows} =
      adapter.insert_all(repo, metadata, Map.keys(header), rows, on_conflict, return_sources, opts)
    {count, postprocess(rows, return_fields_or_types, adapter, schema, metadata)}
  end
55
56  defp postprocess(nil, [], _adapter, _schema, _metadata) do
57    nil
58  end
59  defp postprocess(rows, fields, _adapter, nil, _metadata) do
60    for row <- rows, do: Map.new(Enum.zip(fields, row))
61  end
62  defp postprocess(rows, types, adapter, schema, %{source: {prefix, source}}) do
63    struct = schema.__struct__()
64    for row <- rows do
65      Ecto.Schema.__safe_load__(struct, types, row, prefix, source,
66                                &Ecto.Type.adapter_load(adapter, &1, &2))
67    end
68  end
69
70  defp extract_header_and_fields(rows, schema, dumper, autogen_id, adapter) do
71    header = init_header(autogen_id)
72    mapper = init_mapper(schema, dumper, adapter)
73
74    Enum.map_reduce(rows, header, fn fields, header ->
75      {fields, header} = Enum.map_reduce(fields, header, mapper)
76      {autogenerate_id(autogen_id, fields, adapter), header}
77    end)
78  end
79
80  defp init_header(nil), do: %{}
81  defp init_header({_, source, _}), do: %{source => true}
82
83  defp init_mapper(nil, _dumper, _adapter) do
84    fn {field, value}, acc ->
85      case Ecto.DataType.dump(value) do
86        {:ok, value} ->
87          {{field, value}, Map.put(acc, field, true)}
88        :error ->
89          raise Ecto.ChangeError,
90            message: "value `#{inspect value}` cannot be dumped with Ecto.DataType"
91      end
92    end
93  end
94  defp init_mapper(schema, dumper, adapter) do
95    fn {field, value}, acc ->
96      case dumper do
97        %{^field => {source, type}} ->
98          value = dump_field!(:insert_all, schema, field, type, value, adapter)
99          {{source, value}, Map.put(acc, source, true)}
100        %{} ->
101          raise ArgumentError, "unknown field `#{field}` in schema #{inspect schema} given to " <>
102                               "insert_all. Note virtual fields and associations are not supported"
103      end
104    end
105  end
106
107  defp autogenerate_id(nil, fields, _adapter), do: fields
108  defp autogenerate_id({key, source, type}, fields, adapter) do
109    case :lists.keyfind(key, 1, fields) do
110      {^key, _} ->
111        fields
112      false ->
113        if value = adapter.autogenerate(type) do
114          [{source, value} | fields]
115        else
116          fields
117        end
118    end
119  end
120
121  @doc """
122  Implementation for `Ecto.Repo.insert!/2`.
123  """
124  def insert!(repo, adapter, struct_or_changeset, opts) do
125    case insert(repo, adapter, struct_or_changeset, opts) do
126      {:ok, struct} -> struct
127      {:error, changeset} ->
128        raise Ecto.InvalidChangesetError, action: :insert, changeset: changeset
129    end
130  end
131
132  @doc """
133  Implementation for `Ecto.Repo.update!/2`.
134  """
135  def update!(repo, adapter, struct_or_changeset, opts) do
136    case update(repo, adapter, struct_or_changeset, opts) do
137      {:ok, struct} -> struct
138      {:error, changeset} ->
139        raise Ecto.InvalidChangesetError, action: :update, changeset: changeset
140    end
141  end
142
143  @doc """
144  Implementation for `Ecto.Repo.delete!/2`.
145  """
146  def delete!(repo, adapter, struct_or_changeset, opts) do
147    case delete(repo, adapter, struct_or_changeset, opts) do
148      {:ok, struct} -> struct
149      {:error, changeset} ->
150        raise Ecto.InvalidChangesetError, action: :delete, changeset: changeset
151    end
152  end
153
154  @doc """
155  Implementation for `Ecto.Repo.insert/2`.
156  """
157  def insert(repo, adapter, %Changeset{} = changeset, opts) when is_list(opts) do
158    do_insert(repo, adapter, changeset, opts)
159  end
160
161  def insert(repo, adapter, %{__struct__: _} = struct, opts) when is_list(opts) do
162    changeset = Ecto.Changeset.change(struct)
163    do_insert(repo, adapter, changeset, opts)
164  end
165
  # Inserts a valid changeset.
  #
  # Returns `{:ok, struct}` on success or `{:error, changeset}` when a parent
  # or child association fails, or when the adapter reports a constraint
  # violation that maps to a constraint declared on the changeset.
  defp do_insert(repo, adapter, %Changeset{valid?: true} = changeset, opts) do
    %{prepare: prepare, repo_opts: repo_opts} = changeset
    # Options given to the repo call win over the ones stored in the changeset.
    opts = Keyword.merge(repo_opts, opts)

    struct = struct_from_changeset!(:insert, changeset)
    schema = struct.__struct__
    dumper = schema.__schema__(:dump)
    fields = schema.__schema__(:fields)
    assocs = schema.__schema__(:associations)

    # Fields to read back: the `:returning` option plus the schema's
    # read-after-writes fields, split into types and source columns.
    {return_types, return_sources} =
      schema
      |> returning(opts)
      |> add_read_after_writes(schema)
      |> fields_to_sources(dumper)

    # The conflict options are consumed here and not forwarded in `opts`.
    {on_conflict, opts} = Keyword.pop(opts, :on_conflict, :raise)
    {conflict_target, opts} = Keyword.pop(opts, :conflict_target, [])
    conflict_target = conflict_target(conflict_target, dumper)

    # On insert, we always merge the whole struct into the changeset as
    # changes. Fields whose value in the struct is nil are left out unless
    # they were explicitly changed (see `surface_changes/3`).
    changeset = put_repo_and_action(changeset, :insert, repo)
    changeset = surface_changes(changeset, struct, fields ++ assocs)

    wrap_in_transaction(repo, adapter, opts, assocs, prepare, fn ->
      # Nested repo calls made for associations see `skip_transaction: true`,
      # which makes `wrap_in_transaction/6` run them without a new transaction.
      opts = Keyword.put(opts, :skip_transaction, true)
      user_changeset = run_prepare(changeset, prepare)

      # Parents are processed before the row itself, children after it.
      {changeset, parents, children} = pop_assocs(user_changeset, assocs)
      changeset = process_parents(changeset, parents, opts)

      if changeset.valid? do
        embeds = Ecto.Embedded.prepare(changeset, adapter, :insert)

        autogen_id = schema.__schema__(:autogenerate_id)
        metadata = metadata(struct, autogen_id, opts)

        changes = Map.merge(changeset.changes, embeds)
        # `extra` holds an id generated right now by the adapter, if any.
        {changes, extra, return_types, return_sources} =
          autogenerate_id(autogen_id, changes, return_types, return_sources, adapter)
        # From here on `changes` is a list of dumped `{source, value}` pairs.
        {changes, autogen} =
          dump_changes!(:insert, Map.take(changes, fields), schema, extra, dumper, adapter)
        on_conflict =
          on_conflict(on_conflict, conflict_target, metadata, fn -> length(changes) end, adapter)

        args = [repo, metadata, changes, on_conflict, return_sources, opts]
        case apply(changeset, adapter, :insert, args) do
          {:ok, values} ->
            # Keep `extra` first so the values line up with `return_types`,
            # where the id type was prepended by `autogenerate_id/5`.
            values = extra ++ values
            changeset
            |> load_changes(:loaded, return_types, values, embeds, autogen, adapter, metadata)
            |> process_children(children, user_changeset, opts)
          {:error, _} = error ->
            error
          {:invalid, constraints} ->
            {:error, constraints_to_errors(user_changeset, :insert, constraints)}
        end
      else
        {:error, changeset}
      end
    end)
  end

  # Invalid changesets never reach the adapter; they are returned tagged
  # with the repo and the `:insert` action.
  defp do_insert(repo, _adapter, %Changeset{valid?: false} = changeset, _opts) do
    {:error, put_repo_and_action(changeset, :insert, repo)}
  end
233
234  @doc """
235  Implementation for `Ecto.Repo.update/2`.
236  """
237  def update(repo, adapter, %Changeset{} = changeset, opts) when is_list(opts) do
238    do_update(repo, adapter, changeset, opts)
239  end
240
241  def update(repo, _adapter, %{__struct__: _}, opts) when is_list(opts) do
242    raise ArgumentError, "giving a struct to #{inspect repo}.update/2 is not supported. " <>
243                         "Ecto is unable to properly track changes when a struct is given, " <>
244                         "an Ecto.Changeset must be given instead"
245  end
246
  # Updates the row behind a valid changeset.
  #
  # Returns `{:ok, struct}` on success or `{:error, changeset}` when an
  # association fails or a declared constraint is violated. When the
  # changeset has no changes and `:force` is not set, nothing is executed
  # and the changeset data is returned as is.
  defp do_update(repo, adapter, %Changeset{valid?: true} = changeset, opts) do
    %{prepare: prepare, repo_opts: repo_opts} = changeset
    # Options given to the repo call win over the ones stored in the changeset.
    opts = Keyword.merge(repo_opts, opts)

    struct = struct_from_changeset!(:update, changeset)
    schema = struct.__struct__
    dumper = schema.__schema__(:dump)
    fields = schema.__schema__(:fields)
    assocs = schema.__schema__(:associations)
    force? = !!opts[:force]
    # Raises if any primary key value is nil.
    filters = add_pk_filter!(changeset.filters, struct)

    {return_types, return_sources} =
      schema.__schema__(:read_after_writes)
      |> fields_to_sources(dumper)

    # Differently from insert, update does not copy the struct
    # fields into the changeset. All changes must be in the
    # changeset beforehand.
    changeset = put_repo_and_action(changeset, :update, repo)

    if changeset.changes != %{} or force? do
      wrap_in_transaction(repo, adapter, opts, assocs, prepare, fn ->
        # Nested repo calls made for associations must not open a transaction.
        opts = Keyword.put(opts, :skip_transaction, true)
        user_changeset = run_prepare(changeset, prepare)

        # Parents are processed before the row itself, children after it.
        {changeset, parents, children} = pop_assocs(user_changeset, assocs)
        changeset = process_parents(changeset, parents, opts)

        if changeset.valid? do
          embeds = Ecto.Embedded.prepare(changeset, adapter, :update)

          # `original` only keeps changes to the schema's own fields.
          original = changeset.changes |> Map.merge(embeds) |> Map.take(fields)
          {changes, autogen} = dump_changes!(:update, original, schema, [], dumper, adapter)

          metadata = metadata(struct, schema.__schema__(:autogenerate_id), opts)
          filters = dump_fields!(:update, schema, filters, dumper, adapter)
          args = [repo, metadata, changes, filters, return_sources, opts]

          # If there are no changes or all the changes were autogenerated but not forced, we skip
          {action, autogen} =
            if original != %{} or (autogen != [] and force?),
               do: {:update, autogen},
               else: {:noop, []}

          case apply(changeset, adapter, action, args) do
            {:ok, values} ->
              changeset
              |> load_changes(:loaded, return_types, values, embeds, autogen, adapter, metadata)
              |> process_children(children, user_changeset, opts)
            {:error, _} = error ->
              error
            {:invalid, constraints} ->
              {:error, constraints_to_errors(user_changeset, :update, constraints)}
          end
        else
          {:error, changeset}
        end
      end)
    else
      {:ok, changeset.data}
    end
  end

  # Invalid changesets never reach the adapter; they are returned tagged
  # with the repo and the `:update` action.
  defp do_update(repo, _adapter, %Changeset{valid?: false} = changeset, _opts) do
    {:error, put_repo_and_action(changeset, :update, repo)}
  end
314
315  @doc """
316  Implementation for `Ecto.Repo.insert_or_update/2`.
317  """
318  def insert_or_update(repo, adapter, changeset, opts) do
319    case get_state(changeset) do
320      :built  -> insert repo, adapter, changeset, opts
321      :loaded -> update repo, adapter, changeset, opts
322      state   -> raise ArgumentError, "the changeset has an invalid state " <>
323                                      "for Repo.insert_or_update/2: #{state}"
324    end
325  end
326
327  @doc """
328  Implementation for `Ecto.Repo.insert_or_update!/2`.
329  """
330  def insert_or_update!(repo, adapter, changeset, opts) do
331    case get_state(changeset) do
332      :built  -> insert! repo, adapter, changeset, opts
333      :loaded -> update! repo, adapter, changeset, opts
334      state   -> raise ArgumentError, "the changeset has an invalid state " <>
335                                      "for Repo.insert_or_update!/2: #{state}"
336    end
337  end
338
339  defp get_state(%Changeset{data: %{__meta__: %{state: state}}}), do: state
340  defp get_state(%{__struct__: _}) do
341    raise ArgumentError, "giving a struct to Repo.insert_or_update/2 or " <>
342                         "Repo.insert_or_update!/2 is not supported. " <>
343                         "Please use an Ecto.Changeset"
344  end
345
346  @doc """
347  Implementation for `Ecto.Repo.delete/2`.
348  """
349  def delete(repo, adapter, %Changeset{} = changeset, opts) when is_list(opts) do
350    do_delete(repo, adapter, changeset, opts)
351  end
352
353  def delete(repo, adapter, %{__struct__: _} = struct, opts) when is_list(opts) do
354    changeset = Ecto.Changeset.change(struct)
355    do_delete(repo, adapter, changeset, opts)
356  end
357
  # Deletes the row behind a valid changeset.
  #
  # Returns `{:ok, struct}` on success or `{:error, changeset}` when the
  # adapter reports a constraint violation that maps to a constraint
  # declared on the changeset.
  defp do_delete(repo, adapter, %Changeset{valid?: true} = changeset, opts) do
    %{prepare: prepare, repo_opts: repo_opts} = changeset
    # Options given to the repo call win over the ones stored in the changeset.
    opts = Keyword.merge(repo_opts, opts)

    struct = struct_from_changeset!(:delete, changeset)
    schema = struct.__struct__
    assocs = schema.__schema__(:associations)
    dumper = schema.__schema__(:dump)

    changeset = put_repo_and_action(changeset, :delete, repo)
    # Any pending changes are discarded: a delete only needs the filters.
    changeset = %{changeset | changes: %{}}

    wrap_in_transaction(repo, adapter, opts, assocs, prepare, fn ->
      changeset = run_prepare(changeset, prepare)

      # Raises if any primary key value is nil.
      filters = add_pk_filter!(changeset.filters, struct)
      filters = dump_fields!(:delete, schema, filters, dumper, adapter)

      # Associations with an `on_delete` other than `:nothing` are handled
      # before the row itself is deleted.
      delete_assocs(changeset, repo, schema, assocs, opts)
      metadata = metadata(struct, schema.__schema__(:autogenerate_id), opts)
      args = [repo, metadata, filters, opts]
      case apply(changeset, adapter, :delete, args) do
        {:ok, values} ->
          # Marks the struct metadata as `:deleted`.
          changeset = load_changes(changeset, :deleted, [], values, %{}, [], adapter, metadata)
          {:ok, changeset.data}
        {:error, _} = error ->
          error
        {:invalid, constraints} ->
          {:error, constraints_to_errors(changeset, :delete, constraints)}
      end
    end)
  end

  # Invalid changesets never reach the adapter; they are returned tagged
  # with the repo and the `:delete` action.
  defp do_delete(repo, _adapter, %Changeset{valid?: false} = changeset, _opts) do
    {:error, put_repo_and_action(changeset, :delete, repo)}
  end
394
395  def load(adapter, schema_or_types, data) do
396    do_load(schema_or_types, data, &Ecto.Type.adapter_load(adapter, &1, &2))
397  end
398
399  defp do_load(schema, data, loader) when is_list(data),
400    do: do_load(schema, Map.new(data), loader)
401  defp do_load(schema, {fields, values}, loader) when is_list(fields) and is_list(values),
402    do: do_load(schema, Enum.zip(fields, values), loader)
403  defp do_load(schema, data, loader) when is_atom(schema),
404    do: Ecto.Schema.__unsafe_load__(schema, data, loader)
405  defp do_load(types, data, loader) when is_map(types),
406    do: Ecto.Schema.__unsafe_load__(%{}, types, data, loader)
407
408  ## Helpers
409
410  defp returning(schema, opts) do
411    case Keyword.get(opts, :returning, false) do
412      [_ | _] = fields ->
413        fields
414      [] ->
415        raise ArgumentError, ":returning expects at least one field to be given, got an empty list"
416      true when is_nil(schema) ->
417        raise ArgumentError, ":returning option can only be set to true if a schema is given"
418      true ->
419        schema.__schema__(:fields)
420      false ->
421        []
422    end
423  end
424
425  defp add_read_after_writes(return, schema) do
426    Enum.uniq(return ++ schema.__schema__(:read_after_writes))
427  end
428
429  defp fields_to_sources(fields, nil) do
430    {fields, fields}
431  end
432  defp fields_to_sources(fields, dumper) do
433    Enum.reduce(fields, {[], []}, fn field, {types, sources} ->
434      {source, type} = Map.fetch!(dumper, field)
435      {[{field, type} | types], [source | sources]}
436    end)
437  end
438
439  defp struct_from_changeset!(action, %{data: nil}),
440    do: raise(ArgumentError, "cannot #{action} a changeset without :data")
441  defp struct_from_changeset!(_action, %{data: struct}),
442    do: struct
443
444  defp put_repo_and_action(%{action: :ignore, valid?: valid?} = changeset, action, repo) do
445    if valid? do
446      raise ArgumentError, "a valid changeset with action :ignore was given to " <>
447                           "#{inspect repo}.#{action}/2. Changesets can only be ignored " <>
448                           "in a repository action if they are also invalid"
449    else
450      %{changeset | action: action, repo: repo}
451    end
452  end
453  defp put_repo_and_action(%{action: given}, action, repo) when given != nil and given != action,
454    do: raise ArgumentError, "a changeset with action #{inspect given} was given to #{inspect repo}.#{action}/2"
455  defp put_repo_and_action(changeset, action, repo),
456    do: %{changeset | action: action, repo: repo}
457
458  defp run_prepare(changeset, prepare) do
459    Enum.reduce(Enum.reverse(prepare), changeset, fn fun, acc ->
460      case fun.(acc) do
461        %Ecto.Changeset{} = acc -> acc
462        other ->
463          raise "expected function #{inspect fun} given to Ecto.Changeset.prepare_changes/2 " <>
464                "to return an Ecto.Changeset, got: `#{inspect other}`"
465      end
466    end)
467  end
468
469  defp metadata(schema, prefix, source, autogen_id, context, opts) do
470    %{
471      autogenerate_id: case autogen_id do nil -> nil; {_, source, type} -> {source, type} end,
472      context: context,
473      schema: schema,
474      source: {Keyword.get(opts, :prefix, prefix), source}
475    }
476  end
477  defp metadata(%{__struct__: schema, __meta__: %{context: context, source: {prefix, source}}},
478                autogen_id, opts) do
479    metadata(schema, prefix, source, autogen_id, context, opts)
480  end
481
482  defp conflict_target({:constraint, constraint}, _dumper) when is_atom(constraint) do
483    {:constraint, constraint}
484  end
485  defp conflict_target(conflict_target, dumper) do
486    for target <- List.wrap(conflict_target) do
487      case dumper do
488        %{^target => {alias, _}} ->
489          alias
490        %{} when is_atom(target) ->
491          raise ArgumentError, "unknown field `#{target}` in conflict_target"
492        _ ->
493          target
494      end
495    end
496  end
497
  # Normalizes the `:on_conflict` option into the
  # `{kind_or_query, params, conflict_target}` tuple handed to the adapter.
  #
  # `changes` is a zero-arity function; it is only called (inside
  # `on_conflict_query/6`) when the option is a keyword list or a query.
  defp on_conflict(on_conflict, conflict_target,
                   %{source: {prefix, source}, schema: schema}, changes, adapter) do
    case on_conflict do
      :raise when conflict_target == [] ->
        {:raise, [], []}
      :raise ->
        raise ArgumentError, ":conflict_target option is forbidden when :on_conflict is :raise"
      :nothing ->
        {:nothing, [], conflict_target}
      :replace_all ->
        {:replace_all, [], conflict_target}
      [_ | _] = on_conflict ->
        # A non-empty list is used as the `:update` clause of a query built
        # on the same source.
        from = if schema, do: {source, schema}, else: source
        query = Ecto.Query.from from, update: ^on_conflict
        on_conflict_query(query, {source, schema}, prefix, changes, adapter, conflict_target)
      %Ecto.Query{} = query ->
        on_conflict_query(query, {source, schema}, prefix, changes, adapter, conflict_target)
      other ->
        raise ArgumentError, "unknown value for :on_conflict, got: #{inspect other}"
    end
  end
519
  # Prepares and normalizes an `:on_conflict` query as an `:update_all`
  # operation. Returns `{query, params, conflict_target}`.
  #
  # Raises if the query's `from` differs from the `{source, schema}` pair of
  # the data being inserted.
  defp on_conflict_query(query, from, prefix, changes, adapter, conflict_target) do
    # The number of values being inserted is passed to the planner as the
    # counter argument of `prepare` and `normalize`.
    counter = changes.()

    {query, params, _} =
      %{query | prefix: prefix}
      |> Ecto.Query.Planner.assert_no_select!(:update_all)
      |> Ecto.Query.Planner.returning(false)
      |> Ecto.Query.Planner.prepare(:update_all, adapter, counter)

    unless query.from == from do
      raise ArgumentError, "cannot run on_conflict: query because the query " <>
                           "has a different {source, schema} pair than the " <>
                           "original struct/changeset/query. Got #{inspect query.from} " <>
                           "and #{inspect from} respectively"
    end

    {query, _} = Ecto.Query.Planner.normalize(query, :update_all, adapter, counter)
    {query, params, conflict_target}
  end
539
540  defp apply(%{valid?: false} = changeset, _adapter, _action, _args) do
541    {:error, changeset}
542  end
543  defp apply(_changeset, _adapter, :noop, _args) do
544    {:ok, []}
545  end
546  defp apply(changeset, adapter, action, args) do
547    case apply(adapter, action, args) do
548      {:ok, values} ->
549        {:ok, values}
550      {:invalid, _} = constraints ->
551        constraints
552      {:error, :stale} ->
553        raise Ecto.StaleEntryError, struct: changeset.data, action: action
554    end
555  end
556
557  defp constraints_to_errors(%{constraints: user_constraints, errors: errors} = changeset, action, constraints) do
558    constraint_errors =
559      Enum.map constraints, fn {type, constraint} ->
560        user_constraint =
561          Enum.find(user_constraints, fn c ->
562            case {c.type, c.constraint,  c.match} do
563              {^type, ^constraint, :exact} -> true
564              {^type, cc, :suffix} -> String.ends_with?(constraint, cc)
565              {^type, cc, :prefix} -> String.starts_with?(constraint, cc)
566              _ -> false
567            end
568          end)
569
570        case user_constraint do
571          %{field: field, error: error} ->
572            {field, error}
573          nil ->
574            raise Ecto.ConstraintError, action: action, type: type,
575                                        constraint: constraint, changeset: changeset
576        end
577      end
578
579    %{changeset | errors: constraint_errors ++ errors, valid?: false}
580  end
581
582  defp surface_changes(%{changes: changes, types: types} = changeset, struct, fields) do
583    {changes, errors} =
584      Enum.reduce fields, {changes, []}, fn field, {changes, errors} ->
585        case {struct, changes, types} do
586          # User has explicitly changed it
587          {_, %{^field => _}, _} ->
588            {changes, errors}
589
590          # Handle associations specially
591          {_, _, %{^field => {tag, embed_or_assoc}}} when tag in [:assoc, :embed] ->
592            # This is partly reimplementing the logic behind put_relation
593            # in Ecto.Changeset but we need to do it in a way where we have
594            # control over the current value.
595            value = Relation.load!(struct, Map.get(struct, field))
596            empty = Relation.empty(embed_or_assoc)
597            case Relation.change(embed_or_assoc, value, empty) do
598              {:ok, change, _} when change != empty ->
599                {Map.put(changes, field, change), errors}
600              :error ->
601                {changes, [{field, "is invalid"}]}
602              _ -> # :ignore or ok with change == empty
603                {changes, errors}
604            end
605
606          # Struct has a non nil value
607          {%{^field => value}, _, %{^field => _}} when value != nil ->
608            {Map.put(changes, field, value), errors}
609
610          {_, _, _} ->
611            {changes, errors}
612        end
613      end
614
615    case errors do
616      [] -> %{changeset | changes: changes}
617      _  -> %{changeset | errors: errors ++ changeset.errors, valid?: false, changes: changes}
618    end
619  end
620
621  defp load_changes(changeset, state, types, values, embeds, autogen, adapter, metadata) do
622    %{changes: changes} = changeset
623    %{source: source} = metadata
624
625    # It is ok to use types from changeset because we have
626    # already filtered the results to be only about fields.
627    data =
628      changeset.data
629      |> Map.merge(changes)
630      |> Map.merge(embeds)
631      |> merge_autogen(autogen)
632      |> apply_metadata(state, source)
633      |> load_each(values, types, adapter)
634    Map.put(changeset, :data, data)
635  end
636
637  defp merge_autogen(data, autogen) do
638    Enum.reduce(autogen, data, fn {k, v}, acc -> %{acc | k => v} end)
639  end
640
641  defp apply_metadata(%{__meta__: meta} = data, state, source) do
642    %{data | __meta__: %{meta | state: state, source: source}}
643  end
644
645  defp load_each(struct, [{_, value} | kv], [{key, type} | types], adapter) do
646    case Ecto.Type.adapter_load(adapter, type, value) do
647      {:ok, value} ->
648        load_each(%{struct | key => value}, kv, types, adapter)
649      :error ->
650        raise ArgumentError, "cannot load `#{inspect value}` as type #{inspect type} " <>
651                             "for field `#{key}` in schema #{inspect struct.__struct__}"
652    end
653  end
654  defp load_each(struct, [], _types, _adapter) do
655    struct
656  end
657
658  defp pop_assocs(changeset, []) do
659    {changeset, [], []}
660  end
661  defp pop_assocs(%{changes: changes, types: types} = changeset, assocs) do
662    {changes, parent, child} =
663      Enum.reduce assocs, {changes, [], []}, fn assoc, {changes, parent, child} ->
664        case Map.fetch(changes, assoc) do
665          {:ok, value} ->
666            changes = Map.delete(changes, assoc)
667
668            case Map.fetch!(types, assoc) do
669              {:assoc, %{relationship: :parent} = refl} ->
670                {changes, [{refl, value}|parent], child}
671              {:assoc, %{relationship: :child} = refl} ->
672                {changes, parent, [{refl, value}|child]}
673            end
674          :error ->
675            {changes, parent, child}
676        end
677      end
678    {%{changeset | changes: changes}, parent, child}
679  end
680
681  defp process_parents(%{changes: changes} = changeset, assocs, opts) do
682    case Ecto.Association.on_repo_change(changeset, assocs, opts) do
683      {:ok, struct} ->
684        changes = change_parents(changes, struct, assocs)
685        %{changeset | changes: changes, data: struct}
686      {:error, changes} ->
687        %{changeset | changes: changes, valid?: false}
688    end
689  end
690
691  defp change_parents(changes, struct, assocs) do
692    Enum.reduce assocs, changes, fn {refl, _}, acc ->
693      %{field: field, owner_key: owner_key, related_key: related_key} = refl
694      related = Map.get(struct, field)
695      value = related && Map.fetch!(related, related_key)
696      case Map.fetch(changes, owner_key) do
697        {:ok, current} when current != value ->
698          raise ArgumentError,
699            "cannot change belongs_to association `#{field}` because there is " <>
700            "already a change setting its foreign key `#{owner_key}` to `#{inspect current}`"
701        _ ->
702          Map.put(acc, owner_key, value)
703      end
704    end
705  end
706
707  defp process_children(changeset, assocs, user_changeset, opts) do
708    case Ecto.Association.on_repo_change(changeset, assocs, opts) do
709      {:ok, struct} -> {:ok, struct}
710      {:error, changes} ->
711        {:error, %{user_changeset | valid?: false, changes: changes}}
712    end
713  end
714
715  defp delete_assocs(%{data: struct}, repo, schema, assocs, opts) do
716    for assoc_name <- assocs do
717      case schema.__schema__(:association, assoc_name) do
718        %{__struct__: mod, on_delete: on_delete} = reflection when on_delete != :nothing ->
719          apply(mod, on_delete, [reflection, struct, repo, opts])
720        _ ->
721          :ok
722      end
723    end
724    :ok
725  end
726
727  defp autogenerate_id(nil, changes, return_types, return_sources, _adapter) do
728    {changes, [], return_types, return_sources}
729  end
730
731  defp autogenerate_id({key, source, type}, changes, return_types, return_sources, adapter) do
732    cond do
733      Map.has_key?(changes, key) -> # Set by user
734        {changes, [], return_types, return_sources}
735      value = adapter.autogenerate(type) -> # Autogenerated now
736        {changes, [{source, value}], [{key, type} | return_types], return_sources}
737      true -> # Autogenerated in storage
738        {changes, [], [{key, type} | return_types], [source | List.delete(return_sources, source)]}
739    end
740  end
741
742  defp dump_changes!(action, changes, schema, extra, dumper, adapter) do
743    autogen = autogenerate_changes(schema, action, changes)
744    dumped  =
745      dump_fields!(action, schema, changes, dumper, adapter) ++
746      dump_fields!(action, schema, autogen, dumper, adapter) ++
747      extra
748    {dumped, autogen}
749  end
750
751  defp autogenerate_changes(schema, action, changes) do
752    for {k, {mod, fun, args}} <- schema.__schema__(action_to_auto(action)),
753        not Map.has_key?(changes, k),
754        do: {k, apply(mod, fun, args)}
755  end
756
757  defp action_to_auto(:insert), do: :autogenerate
758  defp action_to_auto(:update), do: :autoupdate
759
760  defp add_pk_filter!(filters, struct) do
761    Enum.reduce Ecto.primary_key!(struct), filters, fn
762      {_k, nil}, _acc ->
763        raise Ecto.NoPrimaryKeyValueError, struct: struct
764      {k, v}, acc ->
765        Map.put(acc, k, v)
766    end
767  end
768
769  defp wrap_in_transaction(repo, adapter, opts, assocs, prepare, fun) do
770    if (assocs != [] or prepare != []) and
771       Keyword.get(opts, :skip_transaction) != true and
772       function_exported?(adapter, :transaction, 3) do
773      adapter.transaction(repo, opts, fn ->
774        case fun.() do
775          {:ok, struct} -> struct
776          {:error, changeset} -> adapter.rollback(repo, changeset)
777        end
778      end)
779    else
780      fun.()
781    end
782  end
783
784  defp dump_field!(action, schema, field, type, value, adapter) do
785    case Ecto.Type.adapter_dump(adapter, type, value) do
786      {:ok, value} ->
787        value
788      :error ->
789        raise Ecto.ChangeError,
790              "value `#{inspect value}` for `#{inspect schema}.#{field}` " <>
791              "in `#{action}` does not match type #{inspect type}"
792    end
793  end
794
795  defp dump_fields!(action, schema, kw, dumper, adapter) do
796    for {field, value} <- kw do
797      {alias, type} = Map.fetch!(dumper, field)
798      {alias, dump_field!(action, schema, field, type, value, adapter)}
799    end
800  end
801end
802