defmodule Ecto.Repo.Queryable do
  # Implements the query related functions that user defined repos
  # delegate to. Every public function receives the repo and the
  # adapter module explicitly.
  @moduledoc false

  @dialyzer {:no_opaque, transaction: 4}

  alias Ecto.Query
  alias Ecto.Queryable
  alias Ecto.Query.Planner
  alias Ecto.Query.SelectExpr

  require Ecto.Query

  # Runs a zero-arity function through the adapter's transaction callback.
  def transaction(adapter, repo, fun, opts) when is_function(fun, 0) do
    adapter.transaction(repo, opts, fun)
  end

  # Runs an `Ecto.Multi` through `Ecto.Multi.__apply__/4`, handing it one
  # function that wraps work in an adapter transaction and one that calls
  # the adapter rollback. The nested error tuple is flattened into a
  # four-element `{:error, key, error_value, values}` tuple.
  def transaction(adapter, repo, %Ecto.Multi{} = multi, opts) do
    in_transaction = fn fun -> adapter.transaction(repo, opts, fun) end
    rollback = fn value -> adapter.rollback(repo, value) end

    case Ecto.Multi.__apply__(multi, repo, in_transaction, rollback) do
      {:ok, _values} = success ->
        success

      {:error, {key, error_value, values}} ->
        {:error, key, error_value, values}
    end
  end

  # Executes the queryable as an `:all` operation and returns the second
  # element of the execution result (the rows).
  def all(repo, adapter, queryable, opts) when is_list(opts) do
    query = select_query(queryable, opts)
    result = execute(:all, repo, adapter, query, opts)
    elem(result, 1)
  end

  # Same query preparation as `all/4`, but the rows are produced through
  # the adapter's `stream` callback.
  def stream(repo, adapter, queryable, opts) when is_list(opts) do
    query = select_query(queryable, opts)
    stream(:all, repo, adapter, query, opts)
  end

  # Fetches a single entry by primary key, or nil when there is none.
  def get(repo, adapter, queryable, id, opts) do
    query = query_for_get(repo, queryable, id)
    one(repo, adapter, query, opts)
  end

  # Like `get/5` but goes through `one!/4`, which raises on no results.
  def get!(repo, adapter, queryable, id, opts) do
    query = query_for_get(repo, queryable, id)
    one!(repo, adapter, query, opts)
  end

  # Fetches a single entry matching the given clauses, or nil.
  def get_by(repo, adapter, queryable, clauses, opts) do
    query = query_for_get_by(repo, queryable, clauses)
    one(repo, adapter, query, opts)
  end

  # Like `get_by/5` but goes through `one!/4`, which raises on no results.
  def get_by!(repo, adapter, queryable, clauses, opts) do
    query = query_for_get_by(repo, queryable, clauses)
    one!(repo, adapter, query, opts)
  end

  # Runs the given aggregate over `field` and returns its single result.
  def aggregate(repo, adapter, queryable, aggregate, field, opts) do
    query = query_for_aggregate(queryable, aggregate, field)
    one!(repo, adapter, query, opts)
  end

  # Returns the only result, nil when there are no results, and raises
  # `Ecto.MultipleResultsError` when there is more than one.
  def one(repo, adapter, queryable, opts) do
    case all(repo, adapter, queryable, opts) do
      [] ->
        nil

      [single] ->
        single

      results ->
        raise Ecto.MultipleResultsError, queryable: queryable, count: length(results)
    end
  end

  # Returns the only result, raising `Ecto.NoResultsError` when there are
  # no results and `Ecto.MultipleResultsError` when there is more than one.
  def one!(repo, adapter, queryable, opts) do
    case all(repo, adapter, queryable, opts) do
      [] ->
        raise Ecto.NoResultsError, queryable: queryable

      [single] ->
        single

      results ->
        raise Ecto.MultipleResultsError, queryable: queryable, count: length(results)
    end
  end

  # With an empty update list the queryable is executed as given.
  def update_all(repo, adapter, queryable, [], opts) when is_list(opts) do
    update_all(repo, adapter, queryable, opts)
  end

  # Otherwise the updates are attached to the query before executing it.
  def update_all(repo, adapter, queryable, updates, opts) when is_list(opts) do
    with_updates = Query.from(queryable, update: ^updates)
    update_all(repo, adapter, with_updates, opts)
  end

  defp update_all(repo, adapter, queryable, opts) do
    query = bulk_query(queryable, :update_all, opts)
    execute(:update_all, repo, adapter, query, opts)
  end

  # Executes the queryable as a `:delete_all` operation.
  def delete_all(repo, adapter, queryable, opts) when is_list(opts) do
    query = bulk_query(queryable, :delete_all, opts)
    execute(:delete_all, repo, adapter, query, opts)
  end

  ## Helpers

  # Query preparation shared by `all/4` and `stream/4`: convert to a
  # query, pass it through `Planner.returning/2` with `true`, then apply
  # the `:prefix` option.
  defp select_query(queryable, opts) do
    query = Queryable.to_query(queryable)
    query = Planner.returning(query, true)
    attach_prefix(query, opts)
  end

  # Query preparation shared by `update_all` and `delete_all`: the query
  # must not carry a select, `:returning` defaults to false, and the
  # `:prefix` option is applied last.
  defp bulk_query(queryable, operation, opts) do
    query = Queryable.to_query(queryable)
    query = Planner.assert_no_select!(query, operation)
    query = Planner.returning(query, opts[:returning] || false)
    attach_prefix(query, opts)
  end

  # Overrides the query prefix only when `:prefix` is present in the
  # options (an explicit nil value still overrides).
  defp attach_prefix(query, opts) do
    with {:ok, prefix} <- Keyword.fetch(opts, :prefix) do
      %{query | prefix: prefix}
    else
      :error -> query
    end
  end

  # Plans the query and runs it through `adapter.execute`. Without a
  # select the adapter result is returned untouched. With a select, each
  # row is preprocessed by the adapter-invoked callback and the rows are
  # then passed through association and preload handling.
  defp execute(operation, repo, adapter, query, opts) when is_list(opts) do
    {meta, prepared, params} = Planner.query(query, operation, repo, adapter, 0)

    case meta do
      %{select: nil} ->
        adapter.execute(repo, meta, prepared, params, nil, opts)

      %{select: select, prefix: prefix, sources: sources, preloads: preloads} ->
        %{preprocess: pre, postprocess: post, take: take, assocs: assocs} = select

        # With exactly one source the first source is always built, even
        # when every one of its values is nil (see split_values/4).
        all_nil? = tuple_size(sources) != 1
        row_fun = fn row -> preprocess(row, pre, all_nil?, prefix, adapter) end

        {count, rows} = adapter.execute(repo, meta, prepared, params, row_fun, opts)
        finalize = postprocessor(post, take, prefix, adapter)

        with_assocs = Ecto.Repo.Assoc.query(rows, assocs, sources)
        {count, Ecto.Repo.Preloader.query(with_assocs, repo, preloads, take, finalize, opts)}
    end
  end

  # Streaming counterpart of execute/5: the same processing is applied
  # lazily to each `{_, rows}` element emitted by `adapter.stream`.
  defp stream(operation, repo, adapter, query, opts) do
    {meta, prepared, params} = Planner.query(query, operation, repo, adapter, 0)

    case meta do
      %{select: nil} ->
        raw = adapter.stream(repo, meta, prepared, params, nil, opts)
        Stream.flat_map(raw, fn {_, nil} -> [] end)

      %{select: select, prefix: prefix, sources: sources, preloads: preloads} ->
        %{preprocess: pre, postprocess: post, take: take, assocs: assocs} = select

        all_nil? = tuple_size(sources) != 1
        row_fun = fn row -> preprocess(row, pre, all_nil?, prefix, adapter) end

        raw = adapter.stream(repo, meta, prepared, params, row_fun, opts)
        finalize = postprocessor(post, take, prefix, adapter)

        Stream.flat_map(raw, fn {_, rows} ->
          with_assocs = Ecto.Repo.Assoc.query(rows, assocs, sources)
          Ecto.Repo.Preloader.query(with_assocs, repo, preloads, take, finalize, opts)
        end)
    end
  end

  # Walks the preprocess instructions, consuming values from the row and
  # replacing them with loaded entries. Once the instructions run out,
  # whatever is left of the row is kept as the tail of the result.
  defp preprocess(row, [], _all_nil?, _prefix, _adapter), do: row

  defp preprocess(row, [{:source, source_schema, fields} | rest], all_nil?, prefix, adapter) do
    {loaded, remaining} = process_source(source_schema, fields, row, all_nil?, prefix, adapter)
    # Every source after the first one is allowed to collapse to nil.
    [loaded | preprocess(remaining, rest, true, prefix, adapter)]
  end

  defp preprocess(row, [instruction | rest], all_nil?, prefix, adapter) do
    {loaded, remaining} = process(row, instruction, nil, prefix, adapter)
    [loaded | preprocess(remaining, rest, all_nil?, prefix, adapter)]
  end

  # Builds the function applied to each preprocessed row. The `:from`
  # variants expect the from entry at the head of the row; the `:map`
  # variant first converts it with to_map/2.
  defp postprocessor({:from, :any, postprocess}, _take, prefix, adapter) do
    fn [from | row] ->
      {result, _rest} = process(row, postprocess, from, prefix, adapter)
      result
    end
  end

  defp postprocessor({:from, :map, postprocess}, take, prefix, adapter) do
    fn [from | row] ->
      {result, _rest} = process(row, postprocess, to_map(from, take), prefix, adapter)
      result
    end
  end

  defp postprocessor(postprocess, _take, prefix, adapter) do
    fn row ->
      {result, _rest} = process(row, postprocess, nil, prefix, adapter)
      result
    end
  end

  # Interprets one select instruction against the row. Every clause
  # returns `{value, remaining_row}`.
  defp process(row, {:merge, left, right}, from, prefix, adapter) do
    {left, row} = process(row, left, from, prefix, adapter)
    {right, row} = process(row, right, from, prefix, adapter)
    {merge_values(left, right), row}
  end

  defp process(row, {:struct, struct, data, args}, from, prefix, adapter) do
    case process(row, data, from, prefix, adapter) do
      {%{__struct__: ^struct} = base, rest} ->
        process_update(base, args, rest, from, prefix, adapter)

      {other, _rest} ->
        raise BadStructError, struct: struct, term: other
    end
  end

  defp process(row, {:struct, struct, args}, from, prefix, adapter) do
    {fields, row} = process_kv(args, row, from, prefix, adapter)
    built = Map.merge(struct.__struct__(), Map.new(fields))

    case built do
      %{__meta__: %Ecto.Schema.Metadata{source: {_, source}} = metadata} ->
        # Structs carrying schema metadata are marked as loaded and get
        # the query prefix in their source.
        loaded_meta = %{metadata | state: :loaded, source: {prefix, source}}
        {%{built | __meta__: loaded_meta}, row}

      _ ->
        {built, row}
    end
  end

  defp process(row, {:map, data, args}, from, prefix, adapter) do
    {base, rest} = process(row, data, from, prefix, adapter)
    process_update(base, args, rest, from, prefix, adapter)
  end

  defp process(row, {:map, args}, from, prefix, adapter) do
    {pairs, rest} = process_kv(args, row, from, prefix, adapter)
    {Map.new(pairs), rest}
  end

  defp process(row, {:list, args}, from, prefix, adapter) do
    process_args(args, row, from, prefix, adapter)
  end

  defp process(row, {:tuple, args}, from, prefix, adapter) do
    {items, rest} = process_args(args, row, from, prefix, adapter)
    {List.to_tuple(items), rest}
  end

  defp process(row, {:source, :from}, from, _prefix, _adapter) do
    {from, row}
  end

  defp process(row, {:source, source_schema, fields}, _from, prefix, adapter) do
    process_source(source_schema, fields, row, true, prefix, adapter)
  end

  defp process([value | rest], {:value, :any}, _from, _prefix, _adapter) do
    {value, rest}
  end

  defp process([value | rest], {:value, type}, _from, _prefix, adapter) do
    {load!(type, value, nil, nil, adapter), rest}
  end

  # Literals consume nothing from the row.
  defp process(row, literal, _from, _prefix, _adapter)
       when is_binary(literal) or is_number(literal) or is_atom(literal) do
    {literal, row}
  end

  # Merge rules, checked in order: same struct on both sides, a struct on
  # the right only (error), struct with plain map, two plain maps, and
  # finally the non-map error cases.
  defp merge_values(left, right) do
    case {left, right} do
      {%{__struct__: struct}, %{__struct__: struct}} ->
        overwrite_keys(left, Map.from_struct(right))

      {_, %{__struct__: _}} ->
        raise ArgumentError, "can only merge with a struct on the right side when both sides " <>
                             "represent the same struct. Left side is #{inspect left} and " <>
                             "right side is #{inspect right}"

      {%{__struct__: _}, %{}} ->
        overwrite_keys(left, right)

      {%{}, %{}} ->
        Map.merge(left, right)

      {_, %{}} ->
        raise ArgumentError, "cannot merge because the left side is not a map, got: #{inspect left}"

      {%{}, _} ->
        raise ArgumentError, "cannot merge because the right side is not a map, got: #{inspect right}"
    end
  end

  # Writes each pair into `target` using the update syntax, which raises
  # if a key is not already present.
  defp overwrite_keys(target, pairs) do
    Enum.reduce(pairs, target, fn {key, value}, acc -> %{acc | key => value} end)
  end

  defp process_update(data, args, row, from, prefix, adapter) do
    {pairs, rest} = process_kv(args, row, from, prefix, adapter)
    {overwrite_keys(data, pairs), rest}
  end

  # Takes one value per type off the row and loads them into the schema
  # struct (or a plain map when there is no schema). Returns nil for the
  # source when split_values/4 reports nil.
  defp process_source({source, schema}, types, row, all_nil?, prefix, adapter) do
    case split_values(types, row, [], all_nil?) do
      {nil, _rest} = empty ->
        empty

      {values, rest} ->
        struct =
          if schema do
            schema.__struct__()
          else
            %{}
          end

        loader = fn type, value -> Ecto.Type.adapter_load(adapter, type, value) end
        {Ecto.Schema.__safe_load__(struct, types, values, prefix, source, loader), rest}
    end
  end

  # Pops one value per type. The flag stays true only while it started
  # true and every value seen so far was nil; in that case the result is
  # nil instead of the collected values.
  defp split_values([], rest, _acc, true), do: {nil, rest}
  defp split_values([], rest, acc, false), do: {Enum.reverse(acc), rest}

  defp split_values([_ | types], [nil | rest], acc, all_nil?) do
    split_values(types, rest, [nil | acc], all_nil?)
  end

  defp split_values([_ | types], [value | rest], acc, _all_nil?) do
    split_values(types, rest, [value | acc], false)
  end

  # Processes each instruction in order, threading the remaining row.
  defp process_args(args, row, from, prefix, adapter) do
    Enum.map_reduce(args, row, fn instruction, remaining ->
      process(remaining, instruction, from, prefix, adapter)
    end)
  end

  # Same as process_args/5 for key-value pairs: the key is processed
  # before the value.
  defp process_kv(kv, row, from, prefix, adapter) do
    Enum.map_reduce(kv, row, fn {key_expr, value_expr}, remaining ->
      {key, remaining} = process(remaining, key_expr, from, prefix, adapter)
      {value, remaining} = process(remaining, value_expr, from, prefix, adapter)
      {{key, value}, remaining}
    end)
  end

  # Loads a value through the adapter, raising ArgumentError on failure.
  # `field` and `struct` only enrich the error message and may be nil.
  defp load!(type, value, field, struct, adapter) do
    case Ecto.Type.adapter_load(adapter, type, value) do
      {:ok, loaded} ->
        loaded

      :error ->
        field = field && " for field #{inspect field}"
        struct = struct && " in #{inspect struct}"
        raise ArgumentError, "cannot load `#{inspect value}` as type #{inspect type}#{field}#{struct}"
    end
  end

  # Reduces a value to a plain map holding only the requested fields.
  # `{key, nested}` entries recurse into the value under `key`; lists are
  # converted element by element and nil is kept as nil.
  defp to_map(nil, _fields), do: nil

  defp to_map(value, fields) when is_list(value) do
    Enum.map(value, fn item -> to_map(item, fields) end)
  end

  defp to_map(value, fields) do
    Map.new(fields, fn
      {key, nested} -> {key, to_map(Map.fetch!(value, key), List.wrap(nested))}
      key -> {key, Map.fetch!(value, key)}
    end)
  end

  defp query_for_get(repo, _queryable, nil) do
    raise ArgumentError, "cannot perform #{inspect repo}.get/2 because the given value is nil"
  end

  # Builds a lookup on the single primary key of the schema behind the
  # query; schemas without exactly one primary key are rejected.
  defp query_for_get(repo, queryable, id) do
    query = Queryable.to_query(queryable)
    schema = assert_schema!(query)

    case schema.__schema__(:primary_key) do
      [primary_key] ->
        Query.from(record in query, where: field(record, ^primary_key) == ^id)

      pks ->
        raise ArgumentError,
          "#{inspect repo}.get/2 requires the schema #{inspect schema} " <>
          "to have exactly one primary key, got: #{inspect pks}"
    end
  end

  defp query_for_get_by(_repo, queryable, clauses) do
    Query.where(queryable, [], ^Enum.to_list(clauses))
  end

  # Builds the aggregate query. Preloads and assocs are always dropped.
  # Queries with group_by are rejected. Without distinct, limit and
  # offset only the ordering is dropped; otherwise the query selecting
  # the field becomes a subquery and the aggregate runs over it.
  defp query_for_aggregate(queryable, aggregate, field) do
    base = %{Queryable.to_query(queryable) | preloads: [], assocs: []}
    field_ast = field(0, field)

    source_query =
      case base do
        %{group_bys: [_ | _]} ->
          raise Ecto.QueryError, message: "cannot aggregate on query with group_by", query: base

        %{distinct: nil, limit: nil, offset: nil} ->
          %{base | order_bys: []}

        _ ->
          field_select = %SelectExpr{expr: field_ast, file: __ENV__.file, line: __ENV__.line}
          subquery = Query.subquery(%{base | select: field_select})
          Queryable.Ecto.SubQuery.to_query(subquery)
      end

    aggregate_select =
      %SelectExpr{expr: {aggregate, [], [field_ast]}, file: __ENV__.file, line: __ENV__.line}

    %{source_query | select: aggregate_select}
  end

  # Quoted form of accessing `field` on the source at index `ix`.
  defp field(ix, field) when is_integer(ix) and is_atom(field) do
    source = {:&, [], [ix]}
    {{:., [], [source, field]}, [], []}
  end

  # Returns the schema of the query's from expression, raising
  # `Ecto.QueryError` when there is none.
  defp assert_schema!(%{from: {_source, schema}}) when schema != nil, do: schema

  defp assert_schema!(query) do
    raise Ecto.QueryError,
      query: query,
      message: "expected a from expression with a schema"
  end
end