defmodule Ecto.Repo.Schema do
  # The module invoked by user defined repos
  # for schema related functionality.
  @moduledoc false

  alias Ecto.Changeset
  alias Ecto.Changeset.Relation
  require Ecto.Query

  @doc """
  Implementation for `Ecto.Repo.insert_all/3`.

  The third argument may be a schema module, a binary table name or a
  `{source, schema}` tuple. Returns `{count, rows}` where `rows` is `nil`
  unless the `:returning` option was given.
  """
  def insert_all(repo, adapter, schema, rows, opts) when is_atom(schema) do
    do_insert_all(repo, adapter, schema, schema.__schema__(:prefix),
                  schema.__schema__(:source), rows, opts)
  end

  def insert_all(repo, adapter, table, rows, opts) when is_binary(table) do
    do_insert_all(repo, adapter, nil, nil, table, rows, opts)
  end

  def insert_all(repo, adapter, {source, schema}, rows, opts) when is_atom(schema) do
    do_insert_all(repo, adapter, schema, schema.__schema__(:prefix), source, rows, opts)
  end

  # Nothing to insert: answer without touching the adapter. The shape of the
  # second element mirrors what a real insert returns for the same options.
  defp do_insert_all(_repo, _adapter, _schema, _prefix, _source, [], opts) do
    if opts[:returning] do
      {0, []}
    else
      {0, nil}
    end
  end

  defp do_insert_all(repo, adapter, schema, prefix, source, rows, opts) when is_list(rows) do
    # `schema` is nil for schemaless inserts, in which case there is neither
    # an autogenerated id nor a dumper.
    autogen_id = schema && schema.__schema__(:autogenerate_id)
    dumper = schema && schema.__schema__(:dump)

    {return_fields_or_types, return_sources} =
      schema
      |> returning(opts)
      |> fields_to_sources(dumper)

    {rows, header} = extract_header_and_fields(rows, schema, dumper, autogen_id, adapter)
    # Lazily counts every value across all rows; only invoked when an
    # on_conflict query has to be prepared.
    counter = fn -> Enum.reduce(rows, 0, &length(&1) + &2) end
    metadata = metadata(schema, prefix, source, autogen_id, nil, opts)

    {on_conflict, opts} = Keyword.pop(opts, :on_conflict, :raise)
    {conflict_target, opts} = Keyword.pop(opts, :conflict_target, [])
    conflict_target = conflict_target(conflict_target, dumper)
    on_conflict = on_conflict(on_conflict, conflict_target, metadata, counter, adapter)

    {count, rows} =
      adapter.insert_all(repo, metadata, Map.keys(header), rows, on_conflict, return_sources, opts)

    {count, postprocess(rows, return_fields_or_types, adapter, schema, metadata)}
  end

  # Converts the rows returned by the adapter into maps (schemaless) or
  # loaded structs (with schema). No rows and no returning fields yields nil.
  defp postprocess(nil, [], _adapter, _schema, _metadata) do
    nil
  end

  defp postprocess(rows, fields, _adapter, nil, _metadata) do
    for row <- rows, do: Map.new(Enum.zip(fields, row))
  end

  defp postprocess(rows, types, adapter, schema, %{source: {prefix, source}}) do
    struct = schema.__struct__()

    for row <- rows do
      Ecto.Schema.__safe_load__(struct, types, row, prefix, source,
                                &Ecto.Type.adapter_load(adapter, &1, &2))
    end
  end

  # Dumps every row and collects, as map keys, the set of sources seen across
  # all rows. Returns `{dumped_rows, header}`.
  defp extract_header_and_fields(rows, schema, dumper, autogen_id, adapter) do
    header = init_header(autogen_id)
    mapper = init_mapper(schema, dumper, adapter)

    Enum.map_reduce(rows, header, fn fields, header ->
      {fields, header} = Enum.map_reduce(fields, header, mapper)
      {autogenerate_id(autogen_id, fields, adapter), header}
    end)
  end

  # The source of the autogenerated id, when there is one, is always part of
  # the header.
  defp init_header(nil), do: %{}
  defp init_header({_, source, _}), do: %{source => true}

  # Builds the per-field function used by extract_header_and_fields/5. It
  # receives `{field, value}` plus the header accumulator and returns the
  # dumped pair plus the updated header.
  defp init_mapper(nil, _dumper, _adapter) do
    fn {field, value}, acc ->
      case Ecto.DataType.dump(value) do
        {:ok, value} ->
          {{field, value}, Map.put(acc, field, true)}
        :error ->
          raise Ecto.ChangeError,
                message: "value `#{inspect value}` cannot be dumped with Ecto.DataType"
      end
    end
  end

  defp init_mapper(schema, dumper, adapter) do
    fn {field, value}, acc ->
      case dumper do
        %{^field => {source, type}} ->
          # From here on the entry is keyed by its source, not its field name.
          value = dump_field!(:insert_all, schema, field, type, value, adapter)
          {{source, value}, Map.put(acc, source, true)}
        %{} ->
          raise ArgumentError, "unknown field `#{field}` in schema #{inspect schema} given to " <>
                               "insert_all. Note virtual fields and associations are not supported"
      end
    end
  end

  # insert_all variant: adds an adapter-generated id to an already dumped row
  # unless the row carries one.
  defp autogenerate_id(nil, fields, _adapter), do: fields

  defp autogenerate_id({_key, source, type}, fields, adapter) do
    cond do
      # The mapper has already re-keyed the row by source, so the lookup must
      # use `source`. Looking up the field name misses a user-given id
      # whenever the two differ and would insert the id column twice.
      List.keymember?(fields, source, 0) ->
        fields
      value = adapter.autogenerate(type) ->
        [{source, value} | fields]
      true ->
        fields
    end
  end

  @doc """
  Implementation for `Ecto.Repo.insert!/2`.

  Returns the struct on success and raises `Ecto.InvalidChangesetError`
  when `insert/4` returns an error.
  """
  def insert!(repo, adapter, struct_or_changeset, opts) do
    case insert(repo, adapter, struct_or_changeset, opts) do
      {:ok, struct} -> struct
      {:error, changeset} ->
        raise Ecto.InvalidChangesetError, action: :insert, changeset: changeset
    end
  end

  @doc """
  Implementation for `Ecto.Repo.update!/2`.

  Returns the struct on success and raises `Ecto.InvalidChangesetError`
  when `update/4` returns an error.
  """
  def update!(repo, adapter, struct_or_changeset, opts) do
    case update(repo, adapter, struct_or_changeset, opts) do
      {:ok, struct} -> struct
      {:error, changeset} ->
        raise Ecto.InvalidChangesetError, action: :update, changeset: changeset
    end
  end

  @doc """
  Implementation for `Ecto.Repo.delete!/2`.

  Returns the struct on success and raises `Ecto.InvalidChangesetError`
  when `delete/4` returns an error.
  """
  def delete!(repo, adapter, struct_or_changeset, opts) do
    case delete(repo, adapter, struct_or_changeset, opts) do
      {:ok, struct} -> struct
      {:error, changeset} ->
        raise Ecto.InvalidChangesetError, action: :delete, changeset: changeset
    end
  end

  @doc """
  Implementation for `Ecto.Repo.insert/2`.

  Accepts a changeset or a bare struct; a struct is first wrapped with
  `Ecto.Changeset.change/1`.
  """
  def insert(repo, adapter, %Changeset{} = changeset, opts) when is_list(opts) do
    do_insert(repo, adapter, changeset, opts)
  end

  def insert(repo, adapter, %{__struct__: _} = struct, opts) when is_list(opts) do
    changeset = Ecto.Changeset.change(struct)
    do_insert(repo, adapter, changeset, opts)
  end

  defp do_insert(repo, adapter, %Changeset{valid?: true} = changeset, opts) do
    %{prepare: prepare, repo_opts: repo_opts} = changeset
    # Options given at call time win over the ones stored in the changeset.
    opts = Keyword.merge(repo_opts, opts)

    struct = struct_from_changeset!(:insert, changeset)
    schema = struct.__struct__
    dumper = schema.__schema__(:dump)
    fields = schema.__schema__(:fields)
    assocs = schema.__schema__(:associations)

    {return_types, return_sources} =
      schema
      |> returning(opts)
      |> add_read_after_writes(schema)
      |> fields_to_sources(dumper)

    {on_conflict, opts} = Keyword.pop(opts, :on_conflict, :raise)
    {conflict_target, opts} = Keyword.pop(opts, :conflict_target, [])
    conflict_target = conflict_target(conflict_target, dumper)

    # On insert, we always merge the whole struct into the
    # changeset as changes, except the primary key if it is nil.
    changeset = put_repo_and_action(changeset, :insert, repo)
    changeset = surface_changes(changeset, struct, fields ++ assocs)

    wrap_in_transaction(repo, adapter, opts, assocs, prepare, fn ->
      # Nested repository calls made for associations must not open
      # their own transaction.
      opts = Keyword.put(opts, :skip_transaction, true)
      user_changeset = run_prepare(changeset, prepare)

      {changeset, parents, children} = pop_assocs(user_changeset, assocs)
      changeset = process_parents(changeset, parents, opts)

      if changeset.valid? do
        embeds = Ecto.Embedded.prepare(changeset, adapter, :insert)

        autogen_id = schema.__schema__(:autogenerate_id)
        metadata = metadata(struct, autogen_id, opts)

        changes = Map.merge(changeset.changes, embeds)
        {changes, extra, return_types, return_sources} =
          autogenerate_id(autogen_id, changes, return_types, return_sources, adapter)
        {changes, autogen} =
          dump_changes!(:insert, Map.take(changes, fields), schema, extra, dumper, adapter)
        on_conflict =
          on_conflict(on_conflict, conflict_target, metadata, fn -> length(changes) end, adapter)

        args = [repo, metadata, changes, on_conflict, return_sources, opts]

        case apply(changeset, adapter, :insert, args) do
          {:ok, values} ->
            # `extra` holds the id generated up front; it goes first to line
            # up with the `{key, type}` entry prepended to return_types.
            values = extra ++ values

            changeset
            |> load_changes(:loaded, return_types, values, embeds, autogen, adapter, metadata)
            |> process_children(children, user_changeset, opts)
          {:error, _} = error ->
            error
          {:invalid, constraints} ->
            {:error, constraints_to_errors(user_changeset, :insert, constraints)}
        end
      else
        {:error, changeset}
      end
    end)
  end

  defp do_insert(repo, _adapter, %Changeset{valid?: false} = changeset, _opts) do
    {:error, put_repo_and_action(changeset, :insert, repo)}
  end

  @doc """
  Implementation for `Ecto.Repo.update/2`.

  Only changesets are accepted; giving a bare struct raises `ArgumentError`.
  """
  def update(repo, adapter, %Changeset{} = changeset, opts) when is_list(opts) do
    do_update(repo, adapter, changeset, opts)
  end

  def update(repo, _adapter, %{__struct__: _}, opts) when is_list(opts) do
    raise ArgumentError, "giving a struct to #{inspect repo}.update/2 is not supported. " <>
                         "Ecto is unable to properly track changes when a struct is given, " <>
                         "an Ecto.Changeset must be given instead"
  end

  defp do_update(repo, adapter, %Changeset{valid?: true} = changeset, opts) do
    %{prepare: prepare, repo_opts: repo_opts} = changeset
    # Options given at call time win over the ones stored in the changeset.
    opts = Keyword.merge(repo_opts, opts)

    struct = struct_from_changeset!(:update, changeset)
    schema = struct.__struct__
    dumper = schema.__schema__(:dump)
    fields = schema.__schema__(:fields)
    assocs = schema.__schema__(:associations)
    force? = !!opts[:force]
    filters = add_pk_filter!(changeset.filters, struct)

    {return_types, return_sources} =
      schema.__schema__(:read_after_writes)
      |> fields_to_sources(dumper)

    # Differently from insert, update does not copy the struct
    # fields into the changeset. All changes must be in the
    # changeset before hand.
    changeset = put_repo_and_action(changeset, :update, repo)

    if changeset.changes != %{} or force? do
      wrap_in_transaction(repo, adapter, opts, assocs, prepare, fn ->
        # Nested repository calls made for associations must not open
        # their own transaction.
        opts = Keyword.put(opts, :skip_transaction, true)
        user_changeset = run_prepare(changeset, prepare)

        {changeset, parents, children} = pop_assocs(user_changeset, assocs)
        changeset = process_parents(changeset, parents, opts)

        if changeset.valid? do
          embeds = Ecto.Embedded.prepare(changeset, adapter, :update)

          original = changeset.changes |> Map.merge(embeds) |> Map.take(fields)
          {changes, autogen} = dump_changes!(:update, original, schema, [], dumper, adapter)

          metadata = metadata(struct, schema.__schema__(:autogenerate_id), opts)
          filters = dump_fields!(:update, schema, filters, dumper, adapter)
          args = [repo, metadata, changes, filters, return_sources, opts]

          # If there are no changes or all the changes were autogenerated but not forced, we skip
          {action, autogen} =
            if original != %{} or (autogen != [] and force?),
              do: {:update, autogen},
              else: {:noop, []}

          case apply(changeset, adapter, action, args) do
            {:ok, values} ->
              changeset
              |> load_changes(:loaded, return_types, values, embeds, autogen, adapter, metadata)
              |> process_children(children, user_changeset, opts)
            {:error, _} = error ->
              error
            {:invalid, constraints} ->
              {:error, constraints_to_errors(user_changeset, :update, constraints)}
          end
        else
          {:error, changeset}
        end
      end)
    else
      # Nothing to write and not forced: succeed without calling the adapter.
      {:ok, changeset.data}
    end
  end

  defp do_update(repo, _adapter, %Changeset{valid?: false} = changeset, _opts) do
    {:error, put_repo_and_action(changeset, :update, repo)}
  end

  @doc """
  Implementation for `Ecto.Repo.insert_or_update/2`.

  Dispatches to `insert/4` when the changeset data is in the `:built` state
  and to `update/4` when it is `:loaded`; any other state raises
  `ArgumentError`.
  """
  def insert_or_update(repo, adapter, changeset, opts) do
    case get_state(changeset) do
      :built  -> insert repo, adapter, changeset, opts
      :loaded -> update repo, adapter, changeset, opts
      state   -> raise ArgumentError, "the changeset has an invalid state " <>
                                      "for Repo.insert_or_update/2: #{state}"
    end
  end

  @doc """
  Implementation for `Ecto.Repo.insert_or_update!/2`.

  Same dispatch as `insert_or_update/4` but through the raising variants.
  """
  def insert_or_update!(repo, adapter, changeset, opts) do
    case get_state(changeset) do
      :built  -> insert! repo, adapter, changeset, opts
      :loaded -> update! repo, adapter, changeset, opts
      state   -> raise ArgumentError, "the changeset has an invalid state " <>
                                      "for Repo.insert_or_update!/2: #{state}"
    end
  end

  # Reads the state stored in the metadata of the changeset data.
  # Bare structs are rejected with an explicit error.
  defp get_state(%Changeset{data: %{__meta__: %{state: state}}}), do: state

  defp get_state(%{__struct__: _}) do
    raise ArgumentError, "giving a struct to Repo.insert_or_update/2 or " <>
                         "Repo.insert_or_update!/2 is not supported. " <>
                         "Please use an Ecto.Changeset"
  end

  @doc """
  Implementation for `Ecto.Repo.delete/2`.

  Accepts a changeset or a bare struct; a struct is first wrapped with
  `Ecto.Changeset.change/1`.
  """
  def delete(repo, adapter, %Changeset{} = changeset, opts) when is_list(opts) do
    do_delete(repo, adapter, changeset, opts)
  end

  def delete(repo, adapter, %{__struct__: _} = struct, opts) when is_list(opts) do
    changeset = Ecto.Changeset.change(struct)
    do_delete(repo, adapter, changeset, opts)
  end

  defp do_delete(repo, adapter, %Changeset{valid?: true} = changeset, opts) do
    %{prepare: prepare, repo_opts: repo_opts} = changeset
    # Options given at call time win over the ones stored in the changeset.
    opts = Keyword.merge(repo_opts, opts)

    struct = struct_from_changeset!(:delete, changeset)
    schema = struct.__struct__
    assocs = schema.__schema__(:associations)
    dumper = schema.__schema__(:dump)

    changeset = put_repo_and_action(changeset, :delete, repo)
    # Changes are dropped before deleting, so none are merged into the
    # returned struct.
    changeset = %{changeset | changes: %{}}

    wrap_in_transaction(repo, adapter, opts, assocs, prepare, fn ->
      changeset = run_prepare(changeset, prepare)

      filters = add_pk_filter!(changeset.filters, struct)
      filters = dump_fields!(:delete, schema, filters, dumper, adapter)

      delete_assocs(changeset, repo, schema, assocs, opts)
      metadata = metadata(struct, schema.__schema__(:autogenerate_id), opts)
      args = [repo, metadata, filters, opts]

      case apply(changeset, adapter, :delete, args) do
        {:ok, values} ->
          changeset = load_changes(changeset, :deleted, [], values, %{}, [], adapter, metadata)
          {:ok, changeset.data}
        {:error, _} = error ->
          error
        {:invalid, constraints} ->
          {:error, constraints_to_errors(changeset, :delete, constraints)}
      end
    end)
  end

  defp do_delete(repo, _adapter, %Changeset{valid?: false} = changeset, _opts) do
    {:error, put_repo_and_action(changeset, :delete, repo)}
  end

  # Loads `data` into a schema struct (when given a schema module) or into a
  # plain map (when given a map of types), using the adapter loaders.
  # `data` may be a map, a list of pairs or a `{fields, values}` tuple.
  def load(adapter, schema_or_types, data) do
    do_load(schema_or_types, data, &Ecto.Type.adapter_load(adapter, &1, &2))
  end

  defp do_load(schema, data, loader) when is_list(data),
    do: do_load(schema, Map.new(data), loader)
  defp do_load(schema, {fields, values}, loader) when is_list(fields) and is_list(values),
    do: do_load(schema, Enum.zip(fields, values), loader)
  defp do_load(schema, data, loader) when is_atom(schema),
    do: Ecto.Schema.__unsafe_load__(schema, data, loader)
  defp do_load(types, data, loader) when is_map(types),
    do: Ecto.Schema.__unsafe_load__(%{}, types, data, loader)

  ## Helpers

  # Resolves the :returning option into a list of fields. `true` means all
  # schema fields and therefore requires a schema.
  defp returning(schema, opts) do
    case Keyword.get(opts, :returning, false) do
      [_ | _] = fields ->
        fields
      [] ->
        raise ArgumentError, ":returning expects at least one field to be given, got an empty list"
      true when is_nil(schema) ->
        raise ArgumentError, ":returning option can only be set to true if a schema is given"
      true ->
        schema.__schema__(:fields)
      false ->
        []
    end
  end

  defp add_read_after_writes(return, schema) do
    Enum.uniq(return ++ schema.__schema__(:read_after_writes))
  end

  # Splits fields into `{[{field, type}], [source]}`. Both lists come out in
  # reverse order of `fields` but stay aligned with each other, which is what
  # load_each/4 relies on. Without a dumper the fields are used as they are.
  defp fields_to_sources(fields, nil) do
    {fields, fields}
  end

  defp fields_to_sources(fields, dumper) do
    Enum.reduce(fields, {[], []}, fn field, {types, sources} ->
      {source, type} = Map.fetch!(dumper, field)
      {[{field, type} | types], [source | sources]}
    end)
  end

  defp struct_from_changeset!(action, %{data: nil}),
    do: raise(ArgumentError, "cannot #{action} a changeset without :data")
  defp struct_from_changeset!(_action, %{data: struct}),
    do: struct

  # Stamps the changeset with the repo and the action being run. A changeset
  # that already carries a different action is rejected; `:ignore` is only
  # tolerated on invalid changesets.
  defp put_repo_and_action(%{action: :ignore, valid?: valid?} = changeset, action, repo) do
    if valid? do
      raise ArgumentError, "a valid changeset with action :ignore was given to " <>
                           "#{inspect repo}.#{action}/2. Changesets can only be ignored " <>
                           "in a repository action if they are also invalid"
    else
      %{changeset | action: action, repo: repo}
    end
  end

  defp put_repo_and_action(%{action: given}, action, repo) when given != nil and given != action,
    do: raise ArgumentError, "a changeset with action #{inspect given} was given to #{inspect repo}.#{action}/2"
  defp put_repo_and_action(changeset, action, repo),
    do: %{changeset | action: action, repo: repo}

  # Runs the prepare functions over the changeset. The list is reversed
  # before being applied.
  defp run_prepare(changeset, prepare) do
    Enum.reduce(Enum.reverse(prepare), changeset, fn fun, acc ->
      case fun.(acc) do
        %Ecto.Changeset{} = acc -> acc
        other ->
          raise "expected function #{inspect fun} given to Ecto.Changeset.prepare_changes/2 " <>
                "to return an Ecto.Changeset, got: `#{inspect other}`"
      end
    end)
  end

  # Builds the metadata map handed to the adapter. A :prefix given in `opts`
  # overrides the prefix coming from the schema or struct.
  defp metadata(schema, prefix, source, autogen_id, context, opts) do
    %{
      autogenerate_id: case autogen_id do nil -> nil; {_, source, type} -> {source, type} end,
      context: context,
      schema: schema,
      source: {Keyword.get(opts, :prefix, prefix), source}
    }
  end

  defp metadata(%{__struct__: schema, __meta__: %{context: context, source: {prefix, source}}},
                autogen_id, opts) do
    metadata(schema, prefix, source, autogen_id, context, opts)
  end

  # Normalises :conflict_target into either `{:constraint, name}` or a list
  # of sources. Atom targets unknown to the dumper are rejected; with no
  # dumper, or for non-atom targets, the value passes through untouched.
  defp conflict_target({:constraint, constraint}, _dumper) when is_atom(constraint) do
    {:constraint, constraint}
  end

  defp conflict_target(conflict_target, dumper) do
    for target <- List.wrap(conflict_target) do
      case dumper do
        %{^target => {alias, _}} ->
          alias
        %{} when is_atom(target) ->
          raise ArgumentError, "unknown field `#{target}` in conflict_target"
        _ ->
          target
      end
    end
  end

  # Translates the :on_conflict option into the `{action_or_query, params,
  # conflict_target}` triple given to the adapter. `changes` is a zero-arity
  # function returning the number of values being inserted.
  defp on_conflict(on_conflict, conflict_target,
                   %{source: {prefix, source}, schema: schema}, changes, adapter) do
    case on_conflict do
      :raise when conflict_target == [] ->
        {:raise, [], []}
      :raise ->
        raise ArgumentError, ":conflict_target option is forbidden when :on_conflict is :raise"
      :nothing ->
        {:nothing, [], conflict_target}
      :replace_all ->
        {:replace_all, [], conflict_target}
      [_ | _] = on_conflict ->
        from = if schema, do: {source, schema}, else: source
        query = Ecto.Query.from from, update: ^on_conflict
        on_conflict_query(query, {source, schema}, prefix, changes, adapter, conflict_target)
      %Ecto.Query{} = query ->
        on_conflict_query(query, {source, schema}, prefix, changes, adapter, conflict_target)
      other ->
        raise ArgumentError, "unknown value for :on_conflict, got: #{inspect other}"
    end
  end

  # Prepares and normalises an on_conflict query as an :update_all, after
  # checking it targets the same {source, schema} pair as the insert.
  defp on_conflict_query(query, from, prefix, changes, adapter, conflict_target) do
    # Passed to the planner as the counter argument.
    # NOTE(review): presumably the offset at which the query's own
    # parameters are numbered, after the inserted values - confirm.
    counter = changes.()

    {query, params, _} =
      %{query | prefix: prefix}
      |> Ecto.Query.Planner.assert_no_select!(:update_all)
      |> Ecto.Query.Planner.returning(false)
      |> Ecto.Query.Planner.prepare(:update_all, adapter, counter)

    if query.from != from do
      raise ArgumentError, "cannot run on_conflict: query because the query " <>
                           "has a different {source, schema} pair than the " <>
                           "original struct/changeset/query. Got #{inspect query.from} " <>
                           "and #{inspect from} respectively"
    end

    {query, _} = Ecto.Query.Planner.normalize(query, :update_all, adapter, counter)
    {query, params, conflict_target}
  end

  # Runs `action` on the adapter unless the changeset became invalid or the
  # action is :noop. A stale entry reported by the adapter raises.
  # (The three-argument apply calls below and elsewhere are Kernel.apply/3.)
  defp apply(%{valid?: false} = changeset, _adapter, _action, _args) do
    {:error, changeset}
  end

  defp apply(_changeset, _adapter, :noop, _args) do
    {:ok, []}
  end

  defp apply(changeset, adapter, action, args) do
    case apply(adapter, action, args) do
      {:ok, values} ->
        {:ok, values}
      {:invalid, _} = constraints ->
        constraints
      {:error, :stale} ->
        raise Ecto.StaleEntryError, struct: changeset.data, action: action
    end
  end

  # Maps constraint violations reported by the adapter onto the constraints
  # declared in the changeset, turning them into changeset errors. A violation
  # with no matching declaration raises Ecto.ConstraintError.
  defp constraints_to_errors(%{constraints: user_constraints, errors: errors} = changeset, action, constraints) do
    constraint_errors =
      Enum.map constraints, fn {type, constraint} ->
        user_constraint =
          Enum.find(user_constraints, fn c ->
            case {c.type, c.constraint, c.match} do
              {^type, ^constraint, :exact} -> true
              {^type, cc, :suffix} -> String.ends_with?(constraint, cc)
              {^type, cc, :prefix} -> String.starts_with?(constraint, cc)
              _ -> false
            end
          end)

        case user_constraint do
          %{field: field, error: error} ->
            {field, error}
          nil ->
            raise Ecto.ConstraintError, action: action, type: type,
                                        constraint: constraint, changeset: changeset
        end
      end

    %{changeset | errors: constraint_errors ++ errors, valid?: false}
  end

  # Copies non-nil struct values for `fields` into the changeset changes
  # unless the changeset already has a change for that field.
  defp surface_changes(%{changes: changes, types: types} = changeset, struct, fields) do
    {changes, errors} =
      Enum.reduce fields, {changes, []}, fn field, {changes, errors} ->
        case {struct, changes, types} do
          # User has explicitly changed it
          {_, %{^field => _}, _} ->
            {changes, errors}

          # Handle associations specially
          {_, _, %{^field => {tag, embed_or_assoc}}} when tag in [:assoc, :embed] ->
            # This is partly reimplementing the logic behind put_relation
            # in Ecto.Changeset but we need to do it in a way where we have
            # control over the current value.
            value = Relation.load!(struct, Map.get(struct, field))
            empty = Relation.empty(embed_or_assoc)

            case Relation.change(embed_or_assoc, value, empty) do
              {:ok, change, _} when change != empty ->
                {Map.put(changes, field, change), errors}
              :error ->
                # Keep the errors collected for earlier fields instead of
                # replacing them with this one.
                {changes, [{field, "is invalid"} | errors]}
              _ -> # :ignore or ok with change == empty
                {changes, errors}
            end

          # Struct has a non nil value
          {%{^field => value}, _, %{^field => _}} when value != nil ->
            {Map.put(changes, field, value), errors}

          {_, _, _} ->
            {changes, errors}
        end
      end

    case errors do
      [] -> %{changeset | changes: changes}
      _  -> %{changeset | errors: errors ++ changeset.errors, valid?: false, changes: changes}
    end
  end

  # Folds changes, embeds, autogenerated values and the values returned by
  # the adapter into the changeset data, and updates its metadata.
  defp load_changes(changeset, state, types, values, embeds, autogen, adapter, metadata) do
    %{changes: changes} = changeset
    %{source: source} = metadata

    # It is ok to use types from changeset because we have
    # already filtered the results to be only about fields.
    data =
      changeset.data
      |> Map.merge(changes)
      |> Map.merge(embeds)
      |> merge_autogen(autogen)
      |> apply_metadata(state, source)
      |> load_each(values, types, adapter)

    Map.put(changeset, :data, data)
  end

  defp merge_autogen(data, autogen) do
    Enum.reduce(autogen, data, fn {k, v}, acc -> %{acc | k => v} end)
  end

  defp apply_metadata(%{__meta__: meta} = data, state, source) do
    %{data | __meta__: %{meta | state: state, source: source}}
  end

  # Pairs returned values with types purely by position (the key in each
  # returned pair is ignored) and stops once the values run out.
  defp load_each(struct, [{_, value} | kv], [{key, type} | types], adapter) do
    case Ecto.Type.adapter_load(adapter, type, value) do
      {:ok, value} ->
        load_each(%{struct | key => value}, kv, types, adapter)
      :error ->
        raise ArgumentError, "cannot load `#{inspect value}` as type #{inspect type} " <>
                             "for field `#{key}` in schema #{inspect struct.__struct__}"
    end
  end

  defp load_each(struct, [], _types, _adapter) do
    struct
  end

  # Removes association changes from the changeset, returning them split by
  # relationship as `{changeset, parents, children}`.
  defp pop_assocs(changeset, []) do
    {changeset, [], []}
  end

  defp pop_assocs(%{changes: changes, types: types} = changeset, assocs) do
    {changes, parent, child} =
      Enum.reduce assocs, {changes, [], []}, fn assoc, {changes, parent, child} ->
        case Map.fetch(changes, assoc) do
          {:ok, value} ->
            changes = Map.delete(changes, assoc)

            case Map.fetch!(types, assoc) do
              {:assoc, %{relationship: :parent} = refl} ->
                {changes, [{refl, value} | parent], child}
              {:assoc, %{relationship: :child} = refl} ->
                {changes, parent, [{refl, value} | child]}
            end
          :error ->
            {changes, parent, child}
        end
      end

    {%{changeset | changes: changes}, parent, child}
  end

  # Hands parent associations to Ecto.Association.on_repo_change/3 before the
  # struct itself is written, then copies their keys into the changes.
  defp process_parents(%{changes: changes} = changeset, assocs, opts) do
    case Ecto.Association.on_repo_change(changeset, assocs, opts) do
      {:ok, struct} ->
        changes = change_parents(changes, struct, assocs)
        %{changeset | changes: changes, data: struct}
      {:error, changes} ->
        %{changeset | changes: changes, valid?: false}
    end
  end

  # Sets each owner key from its related struct. Raises when the changes
  # already set that key to a different value.
  defp change_parents(changes, struct, assocs) do
    Enum.reduce assocs, changes, fn {refl, _}, acc ->
      %{field: field, owner_key: owner_key, related_key: related_key} = refl
      related = Map.get(struct, field)
      value = related && Map.fetch!(related, related_key)

      # The check is made against the original `changes`, not `acc`.
      case Map.fetch(changes, owner_key) do
        {:ok, current} when current != value ->
          raise ArgumentError,
                "cannot change belongs_to association `#{field}` because there is " <>
                "already a change setting its foreign key `#{owner_key}` to `#{inspect current}`"
        _ ->
          Map.put(acc, owner_key, value)
      end
    end
  end

  # Hands child associations to Ecto.Association.on_repo_change/3 after the
  # struct was written. Failures are reported on the user's changeset.
  defp process_children(changeset, assocs, user_changeset, opts) do
    case Ecto.Association.on_repo_change(changeset, assocs, opts) do
      {:ok, struct} -> {:ok, struct}
      {:error, changes} ->
        {:error, %{user_changeset | valid?: false, changes: changes}}
    end
  end

  # Invokes the on_delete callback of every association whose on_delete is
  # not :nothing. Always returns :ok.
  defp delete_assocs(%{data: struct}, repo, schema, assocs, opts) do
    for assoc_name <- assocs do
      case schema.__schema__(:association, assoc_name) do
        %{__struct__: mod, on_delete: on_delete} = reflection when on_delete != :nothing ->
          apply(mod, on_delete, [reflection, struct, repo, opts])
        _ ->
          :ok
      end
    end

    :ok
  end

  # insert variant: works out how the id is obtained and returns
  # `{changes, extra, return_types, return_sources}`, where `extra` holds an
  # id generated up front by the adapter.
  defp autogenerate_id(nil, changes, return_types, return_sources, _adapter) do
    {changes, [], return_types, return_sources}
  end

  defp autogenerate_id({key, source, type}, changes, return_types, return_sources, adapter) do
    cond do
      Map.has_key?(changes, key) -> # Set by user
        {changes, [], return_types, return_sources}
      value = adapter.autogenerate(type) -> # Autogenerated now
        {changes, [{source, value}], [{key, type} | return_types], return_sources}
      true -> # Autogenerated in storage
        # Drop the id from both lists before putting it first. load_each/4
        # pairs values and types by position, so de-duplicating only the
        # sources would shift every type after a previously listed id.
        {changes, [],
         [{key, type} | List.keydelete(return_types, key, 0)],
         [source | List.delete(return_sources, source)]}
    end
  end

  # Dumps the changes, the autogenerated values missing from them and `extra`
  # into one list of `{source, value}` pairs. Also returns the autogenerated
  # pairs so they can be merged back into the struct.
  defp dump_changes!(action, changes, schema, extra, dumper, adapter) do
    autogen = autogenerate_changes(schema, action, changes)
    dumped =
      dump_fields!(action, schema, changes, dumper, adapter) ++
      dump_fields!(action, schema, autogen, dumper, adapter) ++
      extra
    {dumped, autogen}
  end

  defp autogenerate_changes(schema, action, changes) do
    for {k, {mod, fun, args}} <- schema.__schema__(action_to_auto(action)),
        not Map.has_key?(changes, k),
        do: {k, apply(mod, fun, args)}
  end

  defp action_to_auto(:insert), do: :autogenerate
  defp action_to_auto(:update), do: :autoupdate

  # Adds the primary key values to the filters. A nil primary key value
  # raises Ecto.NoPrimaryKeyValueError.
  defp add_pk_filter!(filters, struct) do
    Enum.reduce Ecto.primary_key!(struct), filters, fn
      {_k, nil}, _acc ->
        raise Ecto.NoPrimaryKeyValueError, struct: struct
      {k, v}, acc ->
        Map.put(acc, k, v)
    end
  end

  # Runs `fun` inside an adapter transaction when there are associations or
  # prepare functions, unless :skip_transaction is set or the adapter does
  # not export transaction/3. Inside the transaction an error result is
  # turned into a rollback.
  defp wrap_in_transaction(repo, adapter, opts, assocs, prepare, fun) do
    if (assocs != [] or prepare != []) and
       Keyword.get(opts, :skip_transaction) != true and
       function_exported?(adapter, :transaction, 3) do
      adapter.transaction(repo, opts, fn ->
        case fun.() do
          {:ok, struct} -> struct
          {:error, changeset} -> adapter.rollback(repo, changeset)
        end
      end)
    else
      fun.()
    end
  end

  # Dumps a single value through the adapter, raising Ecto.ChangeError when
  # it does not match `type`.
  defp dump_field!(action, schema, field, type, value, adapter) do
    case Ecto.Type.adapter_dump(adapter, type, value) do
      {:ok, value} ->
        value
      :error ->
        raise Ecto.ChangeError,
              "value `#{inspect value}` for `#{inspect schema}.#{field}` " <>
              "in `#{action}` does not match type #{inspect type}"
    end
  end

  # Dumps `{field, value}` pairs into a list of `{source, dumped_value}`.
  defp dump_fields!(action, schema, kw, dumper, adapter) do
    for {field, value} <- kw do
      {alias, type} = Map.fetch!(dumper, field)
      {alias, dump_field!(action, schema, field, type, value, adapter)}
    end
  end
end