defmodule DBConnection.Stream do
  # Lazy description of a cursor stream built by `DBConnection.stream/4`;
  # nothing runs until the struct is enumerated.
  defstruct conn: nil, query: nil, params: nil, opts: nil

  @type t :: %__MODULE__{
          conn: DBConnection.conn,
          query: any,
          params: any,
          opts: Keyword.t
        }
end

defimpl Enumerable, for: DBConnection.Stream do
  # Reduction is delegated to `DBConnection.reduce/3`; every other
  # callback answers `{:error, __MODULE__}`.
  def reduce(stream, acc, fun), do: DBConnection.reduce(stream, acc, fun)

  def count(_stream), do: {:error, __MODULE__}

  def member?(_stream, _value), do: {:error, __MODULE__}

  def slice(_stream), do: {:error, __MODULE__}
end

defmodule DBConnection.PrepareStream do
  # Lazy description of a cursor stream built by
  # `DBConnection.prepare_stream/4`; nothing runs until the struct is
  # enumerated.
  defstruct conn: nil, query: nil, params: nil, opts: nil

  @type t :: %__MODULE__{
          conn: DBConnection.conn,
          query: any,
          params: any,
          opts: Keyword.t
        }
end

defimpl Enumerable, for: DBConnection.PrepareStream do
  # Reduction is delegated to `DBConnection.reduce/3`; every other
  # callback answers `{:error, __MODULE__}`.
  def reduce(stream, acc, fun), do: DBConnection.reduce(stream, acc, fun)

  def count(_stream), do: {:error, __MODULE__}

  def member?(_stream, _value), do: {:error, __MODULE__}

  def slice(_stream), do: {:error, __MODULE__}
end

defmodule DBConnection do
  @moduledoc """
  A behaviour module for implementing efficient database connection
  client processes, pools and transactions.

  `DBConnection` handles callbacks differently to most behaviours. Some
  callbacks will be called in the calling process, with the state
  copied to and from the calling process. This is useful when the data
  for a request is large and means that a calling process can interact
  with a socket directly.

  A side effect of this is that query handling can be written in a
  simple blocking fashion, while the connection process itself will
  remain responsive to OTP messages and can enqueue and cancel queued
  requests.

  If a request or series of requests takes too long to handle in the
  client process a timeout will trigger and the socket can be cleanly
  disconnected by the connection process.

  If a calling process waits too long to start its request it will
  timeout and its request will be cancelled.
  This prevents requests building up when the database can not keep up.

  If no requests are received for a period of time the connection will
  trigger an idle timeout and the database can be pinged to keep the
  connection alive.

  Should the connection be lost, attempts will be made to reconnect with
  (configurable) exponential random backoff. All state is lost when a
  connection disconnects but the process is reused.

  The `DBConnection.Query` protocol provides utility functions so that
  queries can be prepared or encoded and results decoded without
  blocking the connection or pool.

  By default the `DBConnection` provides a single connection. However
  the `:pool` option can be set to use a pool of connections. If a
  pool is used the module must be passed as an option - unless inside a
  `run/3` or `transaction/3` fun and using the run/transaction
  connection reference (`t`).
  """
  defstruct [:pool_mod, :pool_ref, :conn_mod, :conn_ref]

  @typedoc """
  Run or transaction connection reference.
  """
  @type t :: %__MODULE__{pool_mod: module,
                         pool_ref: any,
                         conn_mod: any,
                         conn_ref: reference}
  @type conn :: GenServer.server | t
  @type query :: any
  @type params :: any
  @type result :: any
  @type cursor :: any

  @doc """
  Connect to the database. Return `{:ok, state}` on success or
  `{:error, exception}` on failure.

  If an error is returned it will be logged and another
  connection attempt will be made after a backoff interval.

  This callback is called in the connection process.
  """
  @callback connect(opts :: Keyword.t) ::
    {:ok, state :: any} | {:error, Exception.t}

  @doc """
  Checks out the state from the connection process. Return `{:ok, state}`
  to allow the checkout or `{:disconnect, exception, state}` to disconnect.

  This callback is called when the control of the state is passed to
  another process.
  `checkin/1` is called with the new state when control
  is returned to the connection process.

  Messages are discarded, instead of being passed to `handle_info/2`,
  when the state is checked out.

  This callback is called in the connection process.
  """
  @callback checkout(state :: any) ::
    {:ok, new_state :: any} | {:disconnect, Exception.t, new_state :: any}

  @doc """
  Checks in the state to the connection process. Return `{:ok, state}`
  to allow the checkin or `{:disconnect, exception, state}` to disconnect.

  This callback is called when the control of the state is passed back
  to the connection process. It should reverse any changes made in
  `checkout/1`.

  This callback is called in the connection process.
  """
  @callback checkin(state :: any) ::
    {:ok, new_state :: any} | {:disconnect, Exception.t, new_state :: any}

  @doc """
  Called when the connection has been idle for a period of time. Return
  `{:ok, state}` to continue or `{:disconnect, exception, state}` to
  disconnect.

  This callback is called if no callbacks have been called after the
  idle timeout and a client process is not using the state. The idle
  timeout can be configured by the `:idle_timeout` option. This function
  can be called whether the connection is checked in or checked out.

  This callback is called in the connection process.
  """
  @callback ping(state :: any) ::
    {:ok, new_state :: any} | {:disconnect, Exception.t, new_state :: any}

  @doc """
  Handle the beginning of a transaction. Return `{:ok, result, state}` to
  continue, `{:error, exception, state}` to abort the transaction and
  continue or `{:disconnect, exception, state}` to abort the transaction
  and disconnect.

  This callback is called in the client process.
  """
  @callback handle_begin(opts :: Keyword.t, state :: any) ::
    {:ok, result, new_state :: any} |
    {:error | :disconnect, Exception.t, new_state :: any}

  @doc """
  Handle committing a transaction. Return `{:ok, result, state}` on success
  to continue, `{:error, exception, state}` to abort the transaction and
  continue or `{:disconnect, exception, state}` to abort the transaction
  and disconnect.

  This callback is called in the client process.
  """
  @callback handle_commit(opts :: Keyword.t, state :: any) ::
    {:ok, result, new_state :: any} |
    {:error | :disconnect, Exception.t, new_state :: any}

  @doc """
  Handle rolling back a transaction. Return `{:ok, result, state}` on success
  to continue, `{:error, exception, state}` to abort the transaction
  and continue or `{:disconnect, exception, state}` to abort the
  transaction and disconnect.

  A transaction will be rolled back if an exception occurs or
  `rollback/2` is called.

  This callback is called in the client process.
  """
  @callback handle_rollback(opts :: Keyword.t, state :: any) ::
    {:ok, result, new_state :: any} |
    {:error | :disconnect, Exception.t, new_state :: any}

  @doc """
  Prepare a query with the database. Return `{:ok, query, state}` where
  `query` is a query to pass to `execute/4` or `close/3`,
  `{:error, exception, state}` to return an error and continue or
  `{:disconnect, exception, state}` to return an error and disconnect.

  This callback is intended for cases where the state of a connection is
  needed to prepare a query and/or the query can be saved in the
  database to call later.

  This callback is called in the client process.
  """
  @callback handle_prepare(query, opts :: Keyword.t, state :: any) ::
    {:ok, query, new_state :: any} |
    {:error | :disconnect, Exception.t, new_state :: any}

  @doc """
  Execute a query prepared by `handle_prepare/3`. Return
  `{:ok, result, state}` to return the result `result` and continue,
  `{:error, exception, state}` to return an error and continue or
  `{:disconnect, exception, state}` to return an error and disconnect.

  This callback is called in the client process.
  """
  @callback handle_execute(query, params, opts :: Keyword.t, state :: any) ::
    {:ok, result, new_state :: any} |
    {:error | :disconnect, Exception.t, new_state :: any}

  @doc """
  Close a query prepared by `handle_prepare/3` with the database. Return
  `{:ok, result, state}` on success to continue,
  `{:error, exception, state}` to return an error and continue, or
  `{:disconnect, exception, state}` to return an error and disconnect.

  This callback is called in the client process.
  """
  @callback handle_close(query, opts :: Keyword.t, state :: any) ::
    {:ok, result, new_state :: any} |
    {:error | :disconnect, Exception.t, new_state :: any}

  @doc """
  Declare a cursor using a query prepared by `handle_prepare/3`. Return
  `{:ok, cursor, state}` to start a cursor for a stream and continue,
  `{:error, exception, state}` to return an error and continue or
  `{:disconnect, exception, state}` to return an error and disconnect.

  This callback is called in the client process.
  """
  @callback handle_declare(query, params, opts :: Keyword.t, state :: any) ::
    {:ok, cursor, new_state :: any} |
    {:error | :disconnect, Exception.t, new_state :: any}

  @doc """
  Fetch the first result from a cursor declared by `handle_declare/4`.
  Return
  `{:ok, result, state}` to return the result `result` and continue,
  `{:deallocate, result, state}` to return the result `result` and deallocate,
  `{:error, exception, state}` to return an error and close the cursor, or
  `{:disconnect, exception, state}` to return an error and disconnect.

  This callback is called in the client process.
  """
  @callback handle_first(query, cursor, opts :: Keyword.t, state :: any) ::
    {:ok | :deallocate, result, new_state :: any} |
    {:error | :disconnect, Exception.t, new_state :: any}

  @doc """
  Fetch the next result from a cursor declared by `handle_declare/4`. Return
  `{:ok, result, state}` to return the result `result` and continue,
  `{:deallocate, result, state}` to return the result `result` and deallocate,
  `{:error, exception, state}` to return an error and close the cursor, or
  `{:disconnect, exception, state}` to return an error and disconnect.

  This callback is called in the client process.
  """
  @callback handle_next(query, cursor, opts :: Keyword.t, state :: any) ::
    {:ok | :deallocate, result, new_state :: any} |
    {:error | :disconnect, Exception.t, new_state :: any}

  @doc """
  Deallocate a cursor declared by `handle_declare/4` with the database. Return
  `{:ok, result, state}` on success to continue,
  `{:error, exception, state}` to return an error and continue, or
  `{:disconnect, exception, state}` to return an error and disconnect.

  This callback is called in the client process.
  """
  @callback handle_deallocate(query, cursor, opts :: Keyword.t, state :: any) ::
    {:ok, result, new_state :: any} |
    {:error | :disconnect, Exception.t, new_state :: any}

  @doc """
  Handle a message received by the connection process when checked in.
  Return `{:ok, state}` to continue or `{:disconnect, exception,
  state}` to disconnect.

  Messages received by the connection process when checked out will be
  logged and discarded.

  This callback is called in the connection process.
  """
  @callback handle_info(msg :: any, state :: any) ::
    {:ok, new_state :: any} |
    {:disconnect, Exception.t, new_state :: any}

  @doc """
  Disconnect from the database. Return `:ok`.

  The exception as first argument is the exception from a `:disconnect`
  3-tuple returned by a previous callback.

  If the state is controlled by a client and it exits or takes too long
  to process a request the state will be the last known state. In these
  cases the exception will be a `DBConnection.ConnectionError`.

  This callback is called in the connection process.
  """
  @callback disconnect(err :: Exception.t, state :: any) :: :ok

  @doc """
  Use `DBConnection` to set the behaviour and include default
  no-op implementations for `ping/1` and `handle_info/2`.

  The default implementation of every other callback raises a
  `RuntimeError` whose message names the callback
  (e.g. `"connect/1 not implemented"`). All defaults are overridable.
  """
  defmacro __using__(_) do
    quote location: :keep do
      @behaviour DBConnection

      def connect(_) do
        # We do this to trick dialyzer to not complain about non-local returns.
        # `:erlang.phash2(1, 1)` hashes into the range 0..0, so the `0`
        # branch (raise) is the one taken at runtime; the `1` branch only
        # exists to show a normal return value. The same pattern is used by
        # every "not implemented" default below.
        message = "connect/1 not implemented"
        case :erlang.phash2(1, 1) do
          0 -> raise message
          1 -> {:error, RuntimeError.exception(message)}
        end
      end

      def disconnect(_, _) do
        message = "disconnect/2 not implemented"
        case :erlang.phash2(1, 1) do
          0 -> raise message
          1 -> :ok
        end
      end

      def checkout(state) do
        message = "checkout/1 not implemented"
        case :erlang.phash2(1, 1) do
          0 -> raise message
          1 -> {:disconnect, RuntimeError.exception(message), state}
        end
      end

      def checkin(state) do
        message = "checkin/1 not implemented"
        case :erlang.phash2(1, 1) do
          0 -> raise message
          1 -> {:disconnect, RuntimeError.exception(message), state}
        end
      end

      # No-op default: keep the state unchanged.
      def ping(state), do: {:ok, state}

      def handle_begin(_, state) do
        message = "handle_begin/2 not implemented"
        case :erlang.phash2(1, 1) do
          0 -> raise message
          1 -> {:error, RuntimeError.exception(message), state}
        end
      end

      def handle_commit(_, state) do
        message = "handle_commit/2 not implemented"
        case :erlang.phash2(1, 1) do
          0 -> raise message
          1 -> {:error, RuntimeError.exception(message), state}
        end
      end

      def handle_rollback(_, state) do
        message = "handle_rollback/2 not implemented"
        case :erlang.phash2(1, 1) do
          0 -> raise message
          1 -> {:error, RuntimeError.exception(message), state}
        end
      end

      def handle_prepare(_, _, state) do
        message = "handle_prepare/3 not implemented"
        case :erlang.phash2(1, 1) do
          0 -> raise message
          1 -> {:error, RuntimeError.exception(message), state}
        end
      end

      def handle_execute(_, _, _, state) do
        message = "handle_execute/4 not implemented"
        case :erlang.phash2(1, 1) do
          0 -> raise message
          1 -> {:error, RuntimeError.exception(message), state}
        end
      end

      def handle_close(_, _, state) do
        message = "handle_close/3 not implemented"
        case :erlang.phash2(1, 1) do
          0 -> raise message
          1 -> {:error, RuntimeError.exception(message), state}
        end
      end

      def handle_declare(_, _, _, state) do
        message = "handle_declare/4 not implemented"
        case :erlang.phash2(1, 1) do
          0 -> raise message
          1 -> {:error, RuntimeError.exception(message), state}
        end
      end

      def handle_first(_, _, _, state) do
        message = "handle_first/4 not implemented"
        case :erlang.phash2(1, 1) do
          0 -> raise message
          1 -> {:error, RuntimeError.exception(message), state}
        end
      end

      def handle_next(_, _, _, state) do
        message = "handle_next/4 not implemented"
        case :erlang.phash2(1, 1) do
          0 -> raise message
          1 -> {:error, RuntimeError.exception(message), state}
        end
      end

      def handle_deallocate(_, _, _, state) do
        message = "handle_deallocate/4 not implemented"
        case :erlang.phash2(1, 1) do
          0 -> raise message
          1 -> {:error, RuntimeError.exception(message), state}
        end
      end

      # No-op default: ignore the message and keep the state unchanged.
      def handle_info(_, state), do: {:ok, state}

      defoverridable [connect: 1, disconnect: 2, checkout: 1, checkin: 1,
                      ping: 1, handle_begin: 2, handle_commit: 2,
                      handle_rollback: 2, handle_prepare: 3, handle_execute: 4,
                      handle_close: 3, handle_declare: 4, handle_first: 4,
                      handle_next: 4, handle_deallocate: 4, handle_info: 2]
    end
  end

  @doc """
  Ensures the given pool applications have been started.

  The call is forwarded to the `ensure_all_started/2` function of the
  module given by the `:pool` option.

  ### Options

    * `:pool` - The `DBConnection.Pool` module to use, (default:
    `DBConnection.Connection`)

  """
  @spec ensure_all_started(opts :: Keyword.t, type :: atom) ::
    {:ok, [atom]} | {:error, atom}
  def ensure_all_started(opts, type \\ :temporary) do
    Keyword.get(opts, :pool, DBConnection.Connection).ensure_all_started(opts, type)
  end

  @doc """
  Start and link to a database connection process.
457 458 ### Options 459 460 * `:pool` - The `DBConnection.Pool` module to use, (default: 461 `DBConnection.Connection`) 462 * `:idle` - The idle strategy, `:passive` to avoid checkin when idle and 463 `:active` to checkin when idle (default: `:passive`) 464 * `:idle_timeout` - The idle timeout to ping the database (default: 465 `1_000`) 466 * `:backoff_min` - The minimum backoff interval (default: `1_000`) 467 * `:backoff_max` - The maximum backoff interval (default: `30_000`) 468 * `:backoff_type` - The backoff strategy, `:stop` for no backoff and 469 to stop, `:exp` for exponential, `:rand` for random and `:rand_exp` for 470 random exponential (default: `:rand_exp`) 471 * `:configure` - A function to run before every connect attempt to 472 dynamically configure the options, either a 1-arity fun, 473 `{module, function, args} with options prepended to `args` or `nil` where 474 only returned options are passed to connect callback (default: `nil`) 475 * `:after_connect` - A function to run on connect using `run/3`, either 476 a 1-arity fun, `{module, function, args}` with `DBConnection.t` prepended 477 to `args` or `nil` (default: `nil`) 478 * `:name` - A name to register the started process (see the `:name` option 479 in `GenServer.start_link/3`). 480 481 ### Example 482 483 {:ok, conn} = DBConnection.start_link(mod, [idle_timeout: 5_000]) 484 """ 485 @spec start_link(module, opts :: Keyword.t) :: GenServer.on_start 486 def start_link(conn_mod, opts) do 487 pool_mod = Keyword.get(opts, :pool, DBConnection.Connection) 488 apply(pool_mod, :start_link, [conn_mod, opts]) 489 end 490 491 @doc """ 492 Create a supervisor child specification for a pool of connections. 493 494 See `Supervisor.Spec` for child options (`child_opts`). 
495 """ 496 @spec child_spec(module, opts :: Keyword.t, child_opts :: Keyword.t) :: 497 Supervisor.Spec.spec 498 def child_spec(conn_mod, opts, child_opts \\ []) do 499 pool_mod = Keyword.get(opts, :pool, DBConnection.Connection) 500 apply(pool_mod, :child_spec, [conn_mod, opts, child_opts]) 501 end 502 503 @doc """ 504 Prepare a query with a database connection for later execution and 505 returns `{:ok, query}` on success or `{:error, exception}` if there was 506 an error. 507 508 The returned `query` can then be passed to `execute/3` and/or `close/3` 509 510 ### Options 511 512 * `:pool_timeout` - The maximum time to wait for a reply when making a 513 synchronous call to the pool (default: `5_000`) 514 * `:queue` - Whether to block waiting in an internal queue for the 515 connection's state (boolean, default: `true`) 516 * `:timeout` - The maximum time that the caller is allowed the 517 to hold the connection's state (ignored when using a run/transaction 518 connection, default: `15_000`) 519 * `:log` - A function to log information about a call, either 520 a 1-arity fun, `{module, function, args}` with `DBConnection.LogEntry.t` 521 prepended to `args` or `nil`. See `DBConnection.LogEntry` (default: `nil`) 522 523 The pool and connection module may support other options. All options 524 are passed to `handle_prepare/3`. 
525 526 ### Example 527 528 query = %Query{statement: "SELECT id FROM table"} 529 {:ok, query} = DBConnection.prepare(conn, query) 530 {:ok, result} = DBConnection.execute(conn, query, []) 531 :ok = DBConnection.close(conn, query) 532 533 """ 534 @spec prepare(conn, query, opts :: Keyword.t) :: 535 {:ok, query} | {:error, Exception.t} 536 def prepare(conn, query, opts \\ []) do 537 query = parse(:prepare, query, nil, opts) 538 case run_prepare(conn, query, opts) do 539 {{:ok, query} = ok, meter} -> 540 log(:prepare, query, nil, meter, ok) 541 {error, meter} -> 542 log(:prepare, query, nil, meter, error) 543 end 544 end 545 546 @doc """ 547 Prepare a query with a database connection and return the prepared 548 query. An exception is raised on error. 549 550 See `prepare/3`. 551 """ 552 @spec prepare!(conn, query, opts :: Keyword.t) :: query 553 def prepare!(conn, query, opts \\ []) do 554 case prepare(conn, query, opts) do 555 {:ok, result} -> result 556 {:error, err} -> raise err 557 end 558 end 559 560 @doc """ 561 Prepare a query and execute it with a database connection and return both the 562 prepared query and the result, `{:ok, query, result}` on success or 563 `{:error, exception}` if there was an error. 564 565 The returned `query` can be passed to `execute/4` and `close/3`. 566 567 ### Options 568 569 * `:pool_timeout` - The maximum time to wait for a reply when making a 570 synchronous call to the pool (default: `5_000`) 571 * `:queue` - Whether to block waiting in an internal queue for the 572 connection's state (boolean, default: `true`) 573 * `:timeout` - The maximum time that the caller is allowed the 574 to hold the connection's state (ignored when using a run/transaction 575 connection, default: `15_000`) 576 * `:log` - A function to log information about a call, either 577 a 1-arity fun, `{module, function, args}` with `DBConnection.LogEntry.t` 578 prepended to `args` or `nil`. 
See `DBConnection.LogEntry` (default: `nil`) 579 580 ### Example 581 582 query = %Query{statement: "SELECT id FROM table WHERE id=$1"} 583 {:ok, query, result} = DBConnection.prepare_execute(conn, query, [1]) 584 {:ok, result2} = DBConnection.execute(conn, query, [2]) 585 :ok = DBConnection.close(conn, query) 586 """ 587 @spec prepare_execute(conn, query, params, Keyword.t) :: 588 {:ok, query, result} | 589 {:error, Exception.t} 590 def prepare_execute(conn, query, params, opts \\ []) do 591 query = parse(:prepare_execute, query, params, opts) 592 case run_prepare_execute(conn, query, params, opts) do 593 {{:ok, query, result}, meter} -> 594 decode(:prepare_execute, query, params, meter, result, opts) 595 {error, meter} -> 596 log(:prepare_execute, query, params, meter, error) 597 end 598 end 599 600 @doc """ 601 Prepare a query and execute it with a database connection and return both the 602 prepared query and result. An exception is raised on error. 603 604 See `prepare_execute/4`. 605 """ 606 @spec prepare_execute!(conn, query, Keyword.t) :: {query, result} 607 def prepare_execute!(conn, query, params, opts \\ []) do 608 case prepare_execute(conn, query, params, opts) do 609 {:ok, query, result} -> {query, result} 610 {:error, err} -> raise err 611 end 612 end 613 614 @doc """ 615 Execute a prepared query with a database connection and return 616 `{:ok, result}` on success or `{:error, exception}` if there was an 617 error. 618 619 If the query is not prepared on the connection an attempt may be made to 620 prepare it and then execute again. 
621 622 ### Options 623 624 * `:pool_timeout` - The maximum time to wait for a reply when making a 625 synchronous call to the pool (default: `5_000`) 626 * `:queue` - Whether to block waiting in an internal queue for the 627 connection's state (boolean, default: `true`) 628 * `:timeout` - The maximum time that the caller is allowed the 629 to hold the connection's state (ignored when using a run/transaction 630 connection, default: `15_000`) 631 * `:log` - A function to log information about a call, either 632 a 1-arity fun, `{module, function, args}` with `DBConnection.LogEntry.t` 633 prepended to `args` or `nil`. See `DBConnection.LogEntry` (default: `nil`) 634 635 The pool and connection module may support other options. All options 636 are passed to `handle_execute/4`. 637 638 See `prepare/3`. 639 """ 640 @spec execute(conn, query, params, opts :: Keyword.t) :: 641 {:ok, result} | {:error, Exception.t} 642 def execute(conn, query, params, opts \\ []) do 643 encoded = encode(:execute, query, params, opts) 644 case run_execute(conn, query, encoded, opts) do 645 {{:ok, query, result}, meter} -> 646 decode(:execute, query, params, meter, result, opts) 647 {error, meter} -> 648 log(:execute, query, params, meter, error) 649 end 650 end 651 652 @doc """ 653 Execute a prepared query with a database connection and return the 654 result. Raises an exception on error. 655 656 See `execute/4` 657 """ 658 @spec execute!(conn, query, params, opts :: Keyword.t) :: result 659 def execute!(conn, query, params, opts \\ []) do 660 case execute(conn, query, params, opts) do 661 {:ok, result} -> result 662 {:error, err} -> raise err 663 end 664 end 665 666 @doc """ 667 Close a prepared query on a database connection and return `{:ok, result}` on 668 success or `{:error, exception}` on error. 669 670 This function should be used to free resources held by the connection 671 process and/or the database server. 
672 673 ## Options 674 675 * `:pool_timeout` - The maximum time to wait for a reply when making a 676 synchronous call to the pool (default: `5_000`) 677 * `:queue` - Whether to block waiting in an internal queue for the 678 connection's state (boolean, default: `true`) 679 * `:timeout` - The maximum time that the caller is allowed the 680 to hold the connection's state (ignored when using a run/transaction 681 connection, default: `15_000`) 682 * `:log` - A function to log information about a call, either 683 a 1-arity fun, `{module, function, args}` with `DBConnection.LogEntry.t` 684 prepended to `args` or `nil`. See `DBConnection.LogEntry` (default: `nil`) 685 686 The pool and connection module may support other options. All options 687 are passed to `handle_close/3`. 688 689 See `prepare/3`. 690 """ 691 @spec close(conn, query, opts :: Keyword.t) :: 692 {:ok, result} | {:error, Exception.t} 693 def close(conn, query, opts \\ []) do 694 {result, meter} = run_close(conn, query, opts) 695 log(:close, query, nil, meter, result) 696 end 697 698 @doc """ 699 Close a prepared query on a database connection and return the result. Raises 700 an exception on error. 701 702 See `close/3`. 703 """ 704 @spec close!(conn, query, opts :: Keyword.t) :: result 705 def close!(conn, query, opts \\ []) do 706 case close(conn, query, opts) do 707 {:ok, result} -> result 708 {:error, err} -> raise err 709 end 710 end 711 712 @doc """ 713 Acquire a lock on a connection and run a series of requests on it. 714 715 The return value of this function is the return value of `fun`. 716 717 To use the locked connection call the request with the connection 718 reference passed as the single argument to the `fun`. If the 719 connection disconnects all future calls using that connection 720 reference will fail. 721 722 `run/3` and `transaction/3` can be nested multiple times but a 723 `transaction/3` call inside another `transaction/3` will be treated 724 the same as `run/3`. 
725 726 ### Options 727 728 * `:pool_timeout` - The maximum time to wait for a reply when making a 729 synchronous call to the pool (default: `5_000`) 730 * `:queue` - Whether to block waiting in an internal queue for the 731 connection's state (boolean, default: `true`) 732 * `:timeout` - The maximum time that the caller is allowed the 733 to hold the connection's state (default: `15_000`) 734 735 The pool may support other options. 736 737 ### Example 738 739 {:ok, res} = DBConnection.run(conn, fn(conn) -> 740 DBConnection.execute!(conn, "SELECT id FROM table", []) 741 end) 742 """ 743 @spec run(conn, (t -> result), opts :: Keyword.t) :: result when result: var 744 def run(conn, fun, opts \\ []) 745 def run(%DBConnection{} = conn, fun, _) do 746 _ = fetch_info(conn) 747 fun.(conn) 748 end 749 def run(pool, fun, opts) do 750 {conn, conn_state} = checkout(pool, opts) 751 put_info(conn, :idle, conn_state) 752 run_begin(conn, fun, opts) 753 end 754 755 @doc """ 756 Acquire a lock on a connection and run a series of requests inside a 757 transaction. The result of the transaction fun is return inside an `:ok` 758 tuple: `{:ok, result}`. 759 760 To use the locked connection call the request with the connection 761 reference passed as the single argument to the `fun`. If the 762 connection disconnects all future calls using that connection 763 reference will fail. 764 765 `run/3` and `transaction/3` can be nested multiple times. If a transaction is 766 rolled back or a nested transaction `fun` raises the transaction is marked as 767 failed. Any calls inside a failed transaction (except `rollback/2`) will raise 768 until the outer transaction call returns. All running `transaction/3` calls 769 will return `{:error, :rollback}` if the transaction failed or connection 770 closed and `rollback/2` is not called for that `transaction/3`. 
771 772 ### Options 773 774 * `:pool_timeout` - The maximum time to wait for a reply when making a 775 synchronous call to the pool (default: `5_000`) 776 * `:queue` - Whether to block waiting in an internal queue for the 777 connection's state (boolean, default: `true`) 778 * `:timeout` - The maximum time that the caller is allowed the 779 to hold the connection's state (default: `15_000`) 780 * `:log` - A function to log information about begin, commit and rollback 781 calls made as part of the transaction, either a 1-arity fun, 782 `{module, function, args}` with `DBConnection.LogEntry.t` prepended to 783 `args` or `nil`. See `DBConnection.LogEntry` (default: `nil`) 784 785 The pool and connection module may support other options. All options 786 are passed to `handle_begin/2`, `handle_commit/2` and 787 `handle_rollback/2`. 788 789 ### Example 790 791 {:ok, res} = DBConnection.transaction(conn, fn(conn) -> 792 DBConnection.execute!(conn, "SELECT id FROM table", []) 793 end) 794 """ 795 @spec transaction(conn, (conn -> result), opts :: Keyword.t) :: 796 {:ok, result} | {:error, reason :: any} when result: var 797 def transaction(conn, fun, opts \\ []) do 798 {result, log_info} = transaction_meter(conn, fun, opts) 799 transaction_log(log_info) 800 case result do 801 {:raise, err} -> 802 raise err 803 {kind, reason, stack} -> 804 :erlang.raise(kind, reason, stack) 805 other -> 806 other 807 end 808 end 809 810 @doc """ 811 Rollback a transaction, does not return. 812 813 Aborts the current transaction fun. If inside `transaction/3` bubbles 814 up to the top level. 815 816 ### Example 817 818 {:error, :bar} = DBConnection.transaction(conn, fn(conn) -> 819 DBConnection.rollback(conn, :bar) 820 IO.puts "never reaches here!" 
821 end) 822 """ 823 @spec rollback(t, reason :: any) :: no_return 824 def rollback(%DBConnection{conn_ref: conn_ref} = conn, err) do 825 case get_info(conn) do 826 {transaction, _} when transaction in [:transaction, :failed] -> 827 throw({:rollback, conn_ref, err}) 828 {transaction, _, _} when transaction in [:transaction, :failed] -> 829 throw({:rollback, conn_ref, err}) 830 {:idle, _} -> 831 raise "not inside transaction" 832 {:idle, _, _} -> 833 raise "not inside transaction" 834 :closed -> 835 raise DBConnection.ConnectionError, "connection is closed" 836 end 837 end 838 839 @doc """ 840 Create a stream that will prepare a query, execute it and stream results 841 using a cursor. 842 843 ### Options 844 845 * `:pool_timeout` - The maximum time to wait for a reply when making a 846 synchronous call to the pool (default: `5_000`) 847 * `:queue` - Whether to block waiting in an internal queue for the 848 connection's state (boolean, default: `true`) 849 * `:timeout` - The maximum time that the caller is allowed the 850 to hold the connection's state (ignored when using a run/transaction 851 connection, default: `15_000`) 852 * `:log` - A function to log information about a call, either 853 a 1-arity fun, `{module, function, args}` with `DBConnection.LogEntry.t` 854 prepended to `args` or `nil`. See `DBConnection.LogEntry` (default: `nil`) 855 856 The pool and connection module may support other options. All options 857 are passed to `handle_prepare/3, `handle_close/3, `handle_declare/4`, 858 `handle_first/4`, `handle_next/4' and `handle_deallocate/4`. 
859 860 ### Example 861 862 {:ok, results} = DBConnection.transaction(conn, fn(conn) -> 863 query = %Query{statement: "SELECT id FROM table"} 864 stream = DBConnection.prepare_stream(conn, query, []) 865 Enum.to_list(stream) 866 end) 867 """ 868 @spec prepare_stream(t, query, params, opts :: Keyword.t) :: 869 DBConnection.PrepareStream.t 870 def prepare_stream(%DBConnection{} = conn, query, params, opts) do 871 %DBConnection.PrepareStream{conn: conn, query: query, params: params, 872 opts: opts} 873 end 874 875 @doc """ 876 Create a stream that will execute a prepared query and stream results using a 877 cursor. 878 879 ### Options 880 881 * `:pool_timeout` - The maximum time to wait for a reply when making a 882 synchronous call to the pool (default: `5_000`) 883 * `:queue` - Whether to block waiting in an internal queue for the 884 connection's state (boolean, default: `true`) 885 * `:timeout` - The maximum time that the caller is allowed the 886 to hold the connection's state (ignored when using a run/transaction 887 connection, default: `15_000`) 888 * `:log` - A function to log information about a call, either 889 a 1-arity fun, `{module, function, args}` with `DBConnection.LogEntry.t` 890 prepended to `args` or `nil`. See `DBConnection.LogEntry` (default: `nil`) 891 892 The pool and connection module may support other options. All options 893 are passed to `handle_declare/4`, `handle_first/4` , `handle_next/4 and 894 `handle_deallocate/4`. 
895 896 ### Example 897 898 {:ok, results} = DBConnection.transaction(conn, fn(conn) -> 899 query = %Query{statement: "SELECT id FROM table"} 900 query = DBConnection.prepare!(conn, query) 901 stream = DBConnection.stream(conn, query, []) 902 Enum.to_list(stream) 903 end) 904 """ 905 @spec stream(t, query, params, opts :: Keyword.t) :: DBConnection.Stream.t 906 def stream(%DBConnection{} = conn, query, params, opts \\ []) do 907 %DBConnection.Stream{conn: conn, query: query, params: params, opts: opts} 908 end 909 910 @doc false 911 def reduce(%DBConnection.PrepareStream{} = stream, acc, fun) do 912 %DBConnection.PrepareStream{conn: conn, query: query, params: params, 913 opts: opts} = stream 914 start = &prepare_declare(&1, query, params, &2) 915 resource(conn, start, &fetch/3, &deallocate/3, opts).(acc, fun) 916 end 917 def reduce(%DBConnection.Stream{} = stream, acc, fun) do 918 %DBConnection.Stream{conn: conn, query: query, params: params, 919 opts: opts} = stream 920 start = &declare(&1, query, params, &2) 921 resource(conn, start, &fetch/3, &deallocate/3, opts).(acc, fun) 922 end 923 924 ## Helpers 925 926 defp checkout(pool, opts) do 927 pool_mod = Keyword.get(opts, :pool, DBConnection.Connection) 928 case apply(pool_mod, :checkout, [pool, opts]) do 929 {:ok, pool_ref, conn_mod, conn_state} -> 930 conn = %DBConnection{pool_mod: pool_mod, pool_ref: pool_ref, 931 conn_mod: conn_mod, conn_ref: make_ref()} 932 {conn, conn_state} 933 {:error, err} -> 934 raise err 935 end 936 end 937 938 defp checkin(conn, conn_state, opts) do 939 %DBConnection{pool_mod: pool_mod, pool_ref: pool_ref} = conn 940 _ = apply(pool_mod, :checkin, [pool_ref, conn_state, opts]) 941 :ok 942 end 943 944 defp delete_disconnect(conn, conn_state, err, opts) do 945 _ = delete_info(conn) 946 %DBConnection{pool_mod: pool_mod, pool_ref: pool_ref} = conn 947 args = [pool_ref, err, conn_state, opts] 948 _ = apply(pool_mod, :disconnect, args) 949 :ok 950 end 951 952 defp delete_stop(conn, 
conn_state, kind, reason, stack, opts) do
    # Forget the locally stored connection info, then ask the pool to stop the
    # connection with an error describing why this client failed.
    _ = delete_info(conn)
    msg = "client #{inspect self()} stopped: " <>
      Exception.format(kind, reason, stack)
    exception = DBConnection.ConnectionError.exception(msg)
    %DBConnection{pool_mod: pool_mod, pool_ref: pool_ref} = conn
    args = [pool_ref, exception, conn_state, opts]
    _ = apply(pool_mod, :stop, args)
    :ok
  end

  # Invokes callback `fun` on the connection module with `args` followed by
  # `opts` and the current connection state, then stores the state the
  # callback returns. A `:disconnect` return disconnects via the pool; a bad
  # return value or a raise/exit/throw stops the connection and is returned
  # as a `{kind, reason, stack}` tuple.
  defp handle(%DBConnection{conn_mod: conn_mod} = conn, fun, args, opts) do
    {status, conn_state} = fetch_info(conn)
    try do
      apply(conn_mod, fun, args ++ [opts, conn_state])
    catch
      kind, reason ->
        stack = System.stacktrace()
        delete_stop(conn, conn_state, kind, reason, stack, opts)
        {kind, reason, stack}
    else
      {:ok, result, new_state} ->
        put_info(conn, status, new_state)
        {:ok, result}
      {:deallocate, result, new_state} when fun in [:handle_first, :handle_next] ->
        put_info(conn, status, new_state)
        {:deallocate, result}
      {:error, err, new_state} ->
        put_info(conn, status, new_state)
        {:error, err}
      {:disconnect, err, new_state} ->
        delete_disconnect(conn, new_state, err, opts)
        {:error, err}
      bad_return ->
        # Raise and immediately catch to obtain a real exception and
        # stacktrace for the stop reason.
        try do
          raise DBConnection.ConnectionError, "bad return value: #{inspect bad_return}"
        catch
          :error, reason ->
            stack = System.stacktrace()
            delete_stop(conn, conn_state, :error, reason, stack, opts)
            {:error, reason, stack}
        end
    end
  end

  # Parses the query; a failure is passed to `pre_log/7`.
  defp parse(call, query, params, opts) do
    DBConnection.Query.parse(query, opts)
  catch
    kind, reason ->
      stack = System.stacktrace()
      pre_log(call, query, params, opts, kind, reason, stack)
  end

  # Encodes the params for the query; a failure is passed to `pre_log/7`.
  defp encode(call, query, params, opts) do
    DBConnection.Query.encode(query, params, opts)
  catch
    kind, reason ->
      stack = System.stacktrace()
      pre_log(call, query, params, opts, kind, reason, stack)
  end

  defp
decode(call, query, params, meter, result, opts) do
    # Decodes the result and logs the outcome. The `else` clauses cover only
    # the calls `:prepare_execute`, `:execute`, `:first` and `:next`.
    try do
      DBConnection.Query.decode(query, result, opts)
    catch
      kind, reason ->
        raised = {kind, reason, System.stacktrace()}
        decode_log(call, query, params, meter, raised)
    else
      result when call == :prepare_execute ->
        ok = {:ok, query, result}
        decode_log(call, query, params, meter, ok)
      result when call in [:execute, :first, :next] ->
        ok = {:ok, result}
        decode_log(call, query, params, meter, ok)
    end
  end

  # Handles a failure that happened before the connection was used: re-raises
  # it directly when no `:log` option is set, otherwise routes it through
  # `log/5` with an empty list of timings.
  defp pre_log(call, query, params, opts, kind, reason, stack) do
    logger = Keyword.get(opts, :log)
    case logger do
      nil ->
        :erlang.raise(kind, reason, stack)
      _ ->
        log(call, query, params, {logger, []}, {kind, reason, stack})
    end
  end

  # Runs `handle_prepare` and, on success, describes the prepared query.
  defp run_prepare(conn, query, opts) do
    prepare = fn(conn2) ->
      case handle(conn2, :handle_prepare, [query], opts) do
        {:ok, prepared} -> describe(conn2, prepared, opts)
        failure -> failure
      end
    end
    run_meter(conn, prepare, opts)
  end

  # Describes the query; if describing fails the query is closed via
  # `raised_close/4`.
  defp describe(conn, query, opts) do
    try do
      DBConnection.Query.describe(query, opts)
    else
      described ->
        {:ok, described}
    catch
      kind, reason ->
        stack = System.stacktrace()
        raised_close(conn, query, opts, {kind, reason, stack})
    end
  end

  # Runs `handle_prepare` and, on success, describes, encodes and executes.
  defp run_prepare_execute(conn, query, params, opts) do
    prepare_execute = fn(conn2) ->
      case handle(conn2, :handle_prepare, [query], opts) do
        {:ok, prepared} ->
          describe_run(conn2, :handle_execute, prepared, params, opts)
        failure ->
          failure
      end
    end
    run_meter(conn, prepare_execute, opts)
  end

  defp describe_run(conn, fun, query, params, opts) do
    try do
      query = DBConnection.Query.describe(query, opts)
      [query, DBConnection.Query.encode(query, params, opts)]
    catch
      kind, reason ->
        raised = {kind, reason, System.stacktrace()}
        raised_close(conn, query, opts, raised)
    else
      [query, _params] = args ->
        case
handle(conn, fun, args, opts) do
          {:ok, result} ->
            {:ok, query, result}
          other ->
            other
        end
    end
  end

  # Closes the query after a failure. The original `raised` tuple is returned
  # unless closing itself raised, in which case the newer failure wins.
  defp raised_close(conn, query, opts, raised) do
    case handle(conn, :handle_close, [query], opts) do
      {_kind, _reason, _stack} = close_raised ->
        close_raised
      {tag, _} when tag in [:ok, :error] ->
        raised
    end
  end

  # Executes the query, tagging a successful result with the query.
  defp run_execute(conn, query, params, opts) do
    execute = fn(conn2) ->
      case handle(conn2, :handle_execute, [query, params], opts) do
        {:ok, result} -> {:ok, query, result}
        failure -> failure
      end
    end
    run_meter(conn, execute, opts)
  end

  # Closes the query through `handle_close`.
  defp run_close(conn, query, opts) do
    run_meter(conn, fn(conn2) -> handle(conn2, :handle_close, [query], opts) end, opts)
  end

  # Expands at compile time to `:erlang.monotonic_time/0` when that function
  # is exported, otherwise to `:os.timestamp/0`.
  defmacrop time() do
    case function_exported?(:erlang, :monotonic_time, 0) do
      true -> quote(do: :erlang.monotonic_time())
      false -> quote(do: :os.timestamp())
    end
  end

  # Runs `fun` and returns `{result, meter}`. The meter is `nil` when no
  # `:log` option is set. When a pool (rather than a connection reference) is
  # given, a `:checkout` time is sampled before running.
  defp run_meter(%DBConnection{} = conn, fun, opts) do
    log = Keyword.get(opts, :log)
    if is_nil(log) do
      {run(conn, fun, opts), nil}
    else
      run_meter(conn, log, [], fun, opts)
    end
  end
  defp run_meter(pool, fun, opts) do
    log = Keyword.get(opts, :log)
    if is_nil(log) do
      {run(pool, fun, opts), nil}
    else
      run_meter(pool, log, [{:checkout, time()}], fun, opts)
    end
  end

  # Wraps `fun` so its result is paired with `{log, times}`, where `times`
  # gains the `:stop` and `:start` samples taken around the call.
  defp run_meter(conn, log, times, fun, opts) do
    metered = fn(conn2) ->
      start = time()
      result = fun.(conn2)
      stop = time()
      {result, {log, [{:stop, stop}, {:start, start} | times]}}
    end
    run(conn, metered, opts)
  end

  # Logs after decoding, adding a `:decode` time sample; without a meter the
  # result is only passed through `log_result/1`.
  defp decode_log(_call, _query, _params, nil, result) do
    log_result(result)
  end
  defp decode_log(call, query, params, {log, times}, result) do
    log(call, query, params, log, [{:decode, time()} | times], result)
  end

  defp transaction_log(nil), do: :ok
  defp transaction_log({log, times, callback, result}) do
    call
= transaction_call(callback)
    result = transaction_result(result)
    _ = log(:transaction, call, nil, log, times, result)
    :ok
  end

  # Maps a transaction callback name to the name used in log entries.
  defp transaction_call(:handle_begin) do
    :begin
  end
  defp transaction_call(:handle_commit) do
    :commit
  end
  defp transaction_call(:handle_rollback) do
    :rollback
  end

  # Normalises a transaction callback result for logging: `{:raise, err}`
  # becomes `{:error, err}`, other accepted shapes pass through unchanged.
  defp transaction_result({:ok, _} = ok) do
    ok
  end
  defp transaction_result({:raise, err}) do
    {:error, err}
  end
  defp transaction_result({_kind, _reason, _stack} = raised) do
    raised
  end

  # Logs `result` using the meter; a `nil` meter skips the log entry. In both
  # cases the result goes through `log_result/1`.
  defp log(_call, _query, _params, nil, result) do
    log_result(result)
  end
  defp log(call, query, params, {log, times}, result) do
    log(call, query, params, log, times, result)
  end

  # Builds the log entry, hands it to the log function, then returns (or
  # re-raises) the result.
  defp log(call, query, params, log, times, result) do
    logged_result = entry_result(result)
    entry = DBConnection.LogEntry.new(call, query, params, times, logged_result)
    log(log, entry)
    log_result(result)
  end

  # Converts a raised `{kind, reason, stack}` tuple into an error tuple
  # holding a formatted `DBConnection.ConnectionError` for the log entry.
  defp entry_result(result) do
    case result do
      {kind, reason, stack} when kind in [:error, :exit, :throw] ->
        msg = "an exception was raised: " <> Exception.format(kind, reason, stack)
        {:error, %DBConnection.ConnectionError{message: msg}}
      _ ->
        result
    end
  end

  # Calls the log function: an `{module, function, args}` tuple gets the
  # entry prepended to `args`, anything else is called as a 1-arity fun.
  defp log(logger, entry) do
    case logger do
      {mod, fun, args} -> apply(mod, fun, [entry | args])
      _ -> logger.(entry)
    end
  end

  # Re-raises a `{kind, reason, stack}` tuple; returns anything else as is.
  defp log_result(result) do
    case result do
      {kind, reason, stack} when kind in [:error, :exit, :throw] ->
        :erlang.raise(kind, reason, stack)
      _ ->
        result
    end
  end

  # Runs `fun` with the connection and always finishes with `run_end/2`.
  defp run_begin(conn, fun, opts) do
    fun.(conn)
  after
    run_end(conn, opts)
  end

  defp run_end(conn, opts) do
    case delete_info(conn) do
      {:idle, conn_state} ->
        checkin(conn, conn_state, opts)
      {status, conn_state} when status in [:transaction, :failed] ->
        try do
          raise "connection run ended in transaction"
        catch
          :error, reason ->
            stack = System.stacktrace()
            delete_stop(conn, conn_state, :error,
reason, stack, opts)
            :erlang.raise(:error, reason, stack)
        end
      :closed ->
        :ok
    end
  end

  # Starts a transaction and returns `{result, log_info}`. With a connection
  # reference already in a transaction, the fun runs nested and nothing is
  # logged. With a pool, the work runs through `run/3` and a `:checkout` time
  # is sampled when a `:log` option is set.
  defp transaction_meter(%DBConnection{} = conn, fun, opts) do
    case fetch_info(conn) do
      {:idle, conn_state} ->
        begin_meter(conn, conn_state, Keyword.get(opts, :log), [], fun, opts)
      {:transaction, _} ->
        {transaction_nested(conn, fun), nil}
    end
  end
  defp transaction_meter(pool, fun, opts) do
    log = Keyword.get(opts, :log)
    # The clock is only read when logging is enabled.
    times = if is_nil(log), do: [], else: [{:checkout, time()}]
    run(pool, fn(conn) -> begin(conn, log, times, fun, opts) end, opts)
  end

  # Begins a transaction on a connection that must currently be idle.
  defp begin(conn, log, times, fun, opts) do
    {:idle, idle_state} = get_info(conn)
    begin_meter(conn, idle_state, log, times, fun, opts)
  end

  # Calls `handle_begin` and, on success, runs the transaction fun. The first
  # clause is the no-logging path; the second times the callback and makes
  # the wrapped fun log the begin entry before the user's fun runs.
  defp begin_meter(conn, conn_state, nil, [], fun, opts) do
    case handle(conn, conn_state, :handle_begin, opts, :transaction) do
      {:ok, _} -> transaction_run(conn, nil, fun, opts)
      failure -> {failure, nil}
    end
  end
  defp begin_meter(conn, conn_state, log, times, fun, opts) do
    start = time()
    begin_result = handle(conn, conn_state, :handle_begin, opts, :transaction)
    stop = time()
    begin_times = [{:stop, stop}, {:start, start} | times]
    log_info = {log, begin_times, :handle_begin, begin_result}
    case begin_result do
      {:ok, _} ->
        logged_fun = fn(conn2) ->
          transaction_log(log_info)
          fun.(conn2)
        end
        transaction_run(conn, log, logged_fun, opts)
      failure ->
        {failure, log_info}
    end
  end

  defp transaction_run(conn, log, fun, opts) do
    %DBConnection{conn_ref: conn_ref} = conn
    try do
      fun.(conn)
    else
      result ->
        result = {:ok, result}
        commit(conn, log, opts, result)
    catch
      :throw, {:rollback, ^conn_ref, reason} ->
        result = {:error, reason}
        rollback(conn, log, opts, result)
      kind, reason ->
        result = {kind, reason, System.stacktrace()}
rollback(conn, log, opts, result)
    end
  end

  # Concludes a transaction whose fun returned normally: commits when still
  # in `:transaction`, rolls back with `{:error, :rollback}` when `:failed`,
  # and reports `{:error, :rollback}` without a callback when closed.
  defp commit(conn, log, opts, result) do
    case get_info(conn) do
      :closed ->
        {{:error, :rollback}, nil}
      {:failed, conn_state} ->
        failed = {:error, :rollback}
        conclude_meter(conn, conn_state, log, :handle_rollback, opts, failed)
      {:transaction, conn_state} ->
        conclude_meter(conn, conn_state, log, :handle_commit, opts, result)
    end
  end

  # Rolls the transaction back, unless the connection info is already gone,
  # in which case `result` is returned without calling a callback.
  defp rollback(conn, log, opts, result) do
    case get_info(conn) do
      :closed ->
        {result, nil}
      {status, conn_state} when status in [:transaction, :failed] ->
        conclude_meter(conn, conn_state, log, :handle_rollback, opts, result)
    end
  end

  # Runs the commit/rollback `callback` and returns `{result, log_info}`. A
  # callback failure replaces `result`. The first clause is the no-logging
  # path; the second times the callback for the log entry.
  defp conclude_meter(conn, conn_state, nil, callback, opts, result) do
    case handle(conn, conn_state, callback, opts, :idle) do
      {:ok, _} -> {result, nil}
      failure -> {failure, nil}
    end
  end
  defp conclude_meter(conn, conn_state, log, callback, opts, result) do
    start = time()
    cb_result = handle(conn, conn_state, callback, opts, :idle)
    stop = time()
    log_info = {log, [stop: stop, start: start], callback, cb_result}
    case cb_result do
      {:ok, _} -> {result, log_info}
      _ -> {cb_result, log_info}
    end
  end

  defp handle(conn, conn_state, callback, opts, status) do
    %DBConnection{conn_mod: conn_mod} = conn
    try do
      apply(conn_mod, callback, [opts, conn_state])
    else
      {:ok, result, conn_state} ->
        put_info(conn, status, conn_state)
        {:ok, result}
      {:error, err, conn_state} ->
        put_info(conn, :idle, conn_state)
        {:raise, err}
      {:disconnect, err, conn_state} ->
        delete_disconnect(conn, conn_state, err, opts)
        {:raise, err}
      other ->
        try do
          raise DBConnection.ConnectionError, "bad return value: #{inspect other}"
        catch
          :error, reason ->
            stack = System.stacktrace()
delete_stop(conn, conn_state, :error, reason, stack, opts)
            {:error, reason, stack}
        end
    catch
      kind, reason ->
        stack = System.stacktrace()
        delete_stop(conn, conn_state, kind, reason, stack, opts)
        {kind, reason, stack}
    end
  end

  # Runs `fun` inside an already open transaction. A rollback throw for this
  # connection reference, or any other raise/exit/throw, marks the
  # transaction as failed; the latter is then re-raised.
  defp transaction_nested(conn, fun) do
    %DBConnection{conn_ref: conn_ref} = conn
    try do
      fun.(conn)
    catch
      :throw, {:rollback, ^conn_ref, reason} ->
        transaction_failed(conn)
        {:error, reason}
      kind, reason ->
        stack = System.stacktrace()
        transaction_failed(conn)
        :erlang.raise(kind, reason, stack)
    else
      value ->
        transaction_ok(conn, {:ok, value})
    end
  end

  # Returns `result`, unless the transaction has been marked as failed, in
  # which case `{:error, :rollback}` is returned instead.
  defp transaction_ok(conn, result) do
    if match?({:failed, _}, get_info(conn)) do
      {:error, :rollback}
    else
      result
    end
  end

  # Flags an open transaction as failed; any other status is left untouched.
  defp transaction_failed(conn) do
    case get_info(conn) do
      {:transaction, conn_state} -> put_info(conn, :failed, conn_state)
      _ -> :ok
    end
  end

  # Parses, prepares and declares a cursor for the query, logging the call.
  # Returns the initial stream state or raises the error.
  defp prepare_declare(conn, query, params, opts) do
    parsed = parse(:prepare_declare, query, params, opts)
    case run_prepare_declare(conn, parsed, params, opts) do
      {{:ok, prepared, cursor}, meter} ->
        prepare_declare_log(conn, prepared, params, meter, cursor, opts)
      {failure, meter} ->
        {:error, err} = log(:prepare_declare, parsed, params, meter, failure)
        raise err
    end
  end

  # Runs `handle_prepare` and, on success, describes, encodes and declares.
  defp run_prepare_declare(conn, query, params, opts) do
    prepare_declare = fn(conn2) ->
      case handle(conn2, :handle_prepare, [query], opts) do
        {:ok, prepared} ->
          describe_run(conn2, :handle_declare, prepared, params, opts)
        failure ->
          failure
      end
    end
    run_meter(conn, prepare_declare, opts)
  end

  defp prepare_declare_log(conn, query, params, meter, cursor, opts) do
    try do
      log(:prepare_declare, query, params, meter, {:ok, query, cursor})
    catch
      kind, reason ->
        stack = System.stacktrace()
deallocate(conn, query, cursor, opts)
        :erlang.raise(kind, reason, stack)
    else
      {:ok, query, cursor} ->
        {:first, query, cursor}
    end
  end

  # Encodes the params and declares a cursor for an already prepared query,
  # logging the call. Returns the initial stream state or raises the error.
  defp declare(conn, query, params, opts) do
    encoded = encode(:declare, query, params, opts)
    case run_declare(conn, query, encoded, opts) do
      {{:ok, cursor}, meter} ->
        declare_log(conn, query, params, meter, cursor, opts)
      {error, meter} ->
        {:error, err} = log(:declare, query, params, meter, error)
        raise err
    end
  end

  # Runs `handle_declare` with the query and its (already encoded) params.
  defp run_declare(conn, query, params, opts) do
    run_meter(conn, &handle(&1, :handle_declare, [query, params], opts), opts)
  end

  # Logs a successful declare and returns the `{:first, query, cursor}`
  # stream state. If logging fails, the cursor is deallocated before the
  # failure is re-raised.
  defp declare_log(conn, query, params, meter, cursor, opts) do
    try do
      log(:declare, query, params, meter, {:ok, cursor})
    catch
      kind, reason ->
        stack = System.stacktrace()
        deallocate(conn, query, cursor, opts)
        :erlang.raise(kind, reason, stack)
    else
      {:ok, cursor} ->
        {:first, query, cursor}
    end
  end

  # Stream step: dispatches on the stream state to the first or next fetch
  # callback, and halts once the state says the cursor should be deallocated.
  defp fetch(conn, {:first, query, cursor}, opts) do
    fetch(conn, :handle_first, :first, query, cursor, opts)
  end
  defp fetch(conn, {:next, query, cursor}, opts) do
    fetch(conn, :handle_next, :next, query, cursor, opts)
  end
  defp fetch(_, {:deallocate, _, _} = state, _) do
    {:halt, state}
  end

  # Runs the fetch callback `fun`, decodes the result and returns the next
  # stream step; raises the error if the callback fails.
  #
  # NOTE(review): this is an internal helper that is declared with `def`. It
  # stays callable for backward compatibility but is hidden from the
  # generated documentation; confirm nothing depends on it before making it
  # private.
  @doc false
  def fetch(conn, fun, call, query, cursor, opts) do
    fetch = &handle(&1, fun, [query, cursor], opts)
    case run_meter(conn, fetch, opts) do
      {{:ok, result}, meter} ->
        fetch_decode(:next, call, query, cursor, meter, result, opts)
      {{:deallocate, result}, meter} ->
        fetch_decode(:deallocate, call, query, cursor, meter, result, opts)
      {error, meter} ->
        {:error, err} = log(call, query, cursor, meter, error)
        raise err
    end
  end

  defp fetch_decode(status, call, query, cursor, meter, result, opts) do
    {:ok, decoded} = decode(call, query, cursor,
meter, result, opts)
    {[decoded], {status, query, cursor}}
  end

  # Stream cleanup: deallocates the cursor unless the connection info is
  # already gone.
  defp deallocate(conn, {_status, query, cursor}, opts) do
    if get_info(conn) == :closed do
      :ok
    else
      deallocate(conn, query, cursor, opts)
    end
  end

  # Runs `handle_deallocate` for the cursor and logs the call; returns `:ok`
  # or raises the error.
  defp deallocate(conn, query, cursor, opts) do
    close = fn(conn2) -> handle(conn2, :handle_deallocate, [query, cursor], opts) end
    {result, meter} = run_meter(conn, close, opts)
    case log(:deallocate, query, cursor, meter, result) do
      {:error, err} -> raise err
      {:ok, _} -> :ok
    end
  end

  # Builds a `Stream.resource/3` whose start, next and stop functions are
  # each called with the connection and `opts`.
  defp resource(%DBConnection{} = conn, start, next, stop, opts) do
    Stream.resource(
      fn() -> start.(conn, opts) end,
      fn(state) -> next.(conn, state, opts) end,
      fn(state) -> stop.(conn, state, opts) end)
  end

  # Stores `{status, conn_state}` for the connection in the process
  # dictionary.
  defp put_info(conn, status, conn_state) do
    info = {status, conn_state}
    _ = Process.put(key(conn), info)
    :ok
  end

  # Returns the stored `{status, conn_state}`; raises when the transaction
  # is failed or no info is stored.
  defp fetch_info(conn) do
    case get_info(conn) do
      :closed ->
        raise DBConnection.ConnectionError, "connection is closed"
      {:failed, _} ->
        raise DBConnection.ConnectionError, "transaction rolling back"
      {_, _} = info ->
        info
    end
  end

  # Returns the stored info, or `:closed` when there is none.
  defp get_info(conn) do
    Process.get(key(conn), :closed)
  end

  # Removes and returns the stored info, or `:closed` when there was none.
  defp delete_info(conn) do
    case Process.delete(key(conn)) do
      falsy when falsy in [nil, false] -> :closed
      info -> info
    end
  end

  # Process dictionary key for a connection reference.
  defp key(%DBConnection{conn_ref: conn_ref}) do
    {__MODULE__, conn_ref}
  end
end