1defmodule DBConnection.Stream do
2  defstruct [:conn, :query, :params, :opts]
3
4  @type t :: %__MODULE__{conn: DBConnection.conn,
5                         query: any,
6                         params: any,
7                         opts: Keyword.t}
8end
9defimpl Enumerable, for: DBConnection.Stream do
10  def count(_), do: {:error, __MODULE__}
11
12  def member?(_, _), do: {:error, __MODULE__}
13
14  def slice(_), do: {:error, __MODULE__}
15
16  def reduce(stream, acc, fun), do: DBConnection.reduce(stream, acc, fun)
17end
18
19defmodule DBConnection.PrepareStream do
20  defstruct [:conn, :query, :params, :opts]
21
22  @type t :: %__MODULE__{conn: DBConnection.conn,
23                         query: any,
24                         params: any,
25                         opts: Keyword.t}
26end
27defimpl Enumerable, for: DBConnection.PrepareStream do
28  def count(_), do: {:error, __MODULE__}
29
30  def member?(_, _), do: {:error, __MODULE__}
31
32  def slice(_), do: {:error, __MODULE__}
33
34  def reduce(stream, acc, fun), do: DBConnection.reduce(stream, acc, fun)
35end
36
37defmodule DBConnection do
38  @moduledoc """
39  A behaviour module for implementing efficient database connection
40  client processes, pools and transactions.
41
42  `DBConnection` handles callbacks differently to most behaviours. Some
43  callbacks will be called in the calling process, with the state
44  copied to and from the calling process. This is useful when the data
45  for a request is large and means that a calling process can interact
46  with a socket directly.
47
48  A side effect of this is that query handling can be written in a
49  simple blocking fashion, while the connection process itself will
50  remain responsive to OTP messages and can enqueue and cancel queued
51  requests.
52
53  If a request or series of requests takes too long to handle in the
54  client process a timeout will trigger and the socket can be cleanly
55  disconnected by the connection process.
56
57  If a calling process waits too long to start its request it will
58  timeout and its request will be cancelled. This prevents requests
59  building up when the database can not keep up.
60
61  If no requests are received for a period of time the connection will
62  trigger an idle timeout and the database can be pinged to keep the
63  connection alive.
64
65  Should the connection be lost, attempts will be made to reconnect with
66  (configurable) exponential random backoff to reconnect. All state is
67  lost when a connection disconnects but the process is reused.
68
69  The `DBConnection.Query` protocol provide utility functions so that
70  queries can be prepared or encoded and results decoding without
71  blocking the connection or pool.
72
73  By default the `DBConnection` provides a single connection. However
74  the `:pool` option can be set to use a pool of connections. If a
75  pool is used the module must be passed as an option - unless inside a
76  `run/3` or `transaction/3` fun and using the run/transaction
77  connection reference (`t`).
78  """
79  defstruct [:pool_mod, :pool_ref, :conn_mod, :conn_ref]
80
81  @typedoc """
82  Run or transaction connection reference.
83  """
84  @type t :: %__MODULE__{pool_mod: module,
85                         pool_ref: any,
86                         conn_mod: any,
87                         conn_ref: reference}
88  @type conn :: GenServer.server | t
89  @type query :: any
90  @type params :: any
91  @type result :: any
92  @type cursor :: any
93
94  @doc """
95  Connect to the database. Return `{:ok, state}` on success or
96  `{:error, exception}` on failure.
97
98  If an error is returned it will be logged and another
99  connection attempt will be made after a backoff interval.
100
101  This callback is called in the connection process.
102  """
103  @callback connect(opts :: Keyword.t) ::
104    {:ok, state :: any} | {:error, Exception.t}
105
106  @doc """
107  Checkouts the state from the connection process. Return `{:ok, state}`
108  to allow the checkout or `{:disconnect, exception, state}` to disconnect.
109
110  This callback is called when the control of the state is passed to
111  another process. `checkin/1` is called with the new state when control
112  is returned to the connection process.
113
114  Messages are discarded, instead of being passed to `handle_info/2`,
115  when the state is checked out.
116
117  This callback is called in the connection process.
118  """
119  @callback checkout(state :: any) ::
120    {:ok, new_state :: any} | {:disconnect, Exception.t, new_state :: any}
121
122  @doc """
123  Checks in the state to the connection process. Return `{:ok, state}`
124  to allow the checkin or `{:disconnect, exception, state}` to disconnect.
125
126  This callback is called when the control of the state is passed back
127  to the connection process. It should reverse any changes made in
128  `checkout/2`.
129
130  This callback is called in the connection process.
131  """
132  @callback checkin(state :: any) ::
133    {:ok, new_state :: any} | {:disconnect, Exception.t, new_state :: any}
134
135  @doc """
136  Called when the connection has been idle for a period of time. Return
137  `{:ok, state}` to continue or `{:disconnect, exception, state}` to
138  disconnect.
139
140  This callback is called if no callbacks have been called after the
141  idle timeout and a client process is not using the state. The idle
142  timeout can be configured by the `:idle_timeout` option. This function
143  can be called whether the connection is checked in or checked out.
144
145  This callback is called in the connection process.
146  """
147  @callback ping(state :: any) ::
148    {:ok, new_state :: any} | {:disconnect, Exception.t, new_state :: any}
149
150  @doc """
151  Handle the beginning of a transaction. Return `{:ok, result, state}` to
152  continue, `{:error, exception, state}` to abort the transaction and
153  continue or `{:disconnect, exception, state}` to abort the transaction
154  and disconnect.
155
156  This callback is called in the client process.
157  """
158  @callback handle_begin(opts :: Keyword.t, state :: any) ::
159    {:ok, result, new_state :: any} |
160    {:error | :disconnect, Exception.t, new_state :: any}
161
162  @doc """
163  Handle committing a transaction. Return `{:ok, result, state}` on success and
164  to continue, `{:error, exception, state}` to abort the transaction and
165  continue or `{:disconnect, exception, state}` to abort the transaction
166  and disconnect.
167
168  This callback is called in the client process.
169  """
170  @callback handle_commit(opts :: Keyword.t, state :: any) ::
171    {:ok, result, new_state :: any} |
172    {:error | :disconnect, Exception.t, new_state :: any}
173
174  @doc """
175  Handle rolling back a transaction. Return `{:ok, result, state}` on success
176  and to continue, `{:error, exception, state}` to abort the transaction
177  and continue or `{:disconnect, exception, state}` to abort the
178  transaction and disconnect.
179
180  A transaction will be rolled back if an exception occurs or
181  `rollback/2` is called.
182
183  This callback is called in the client process.
184  """
185  @callback handle_rollback(opts :: Keyword.t, state :: any) ::
186    {:ok, result, new_state :: any} |
187    {:error | :disconnect, Exception.t, new_state :: any}
188
189  @doc """
190  Prepare a query with the database. Return `{:ok, query, state}` where
191  `query` is a query to pass to `execute/4` or `close/3`,
192  `{:error, exception, state}` to return an error and continue or
193  `{:disconnect, exception, state}` to return an error and disconnect.
194
195  This callback is intended for cases where the state of a connection is
196  needed to prepare a query and/or the query can be saved in the
197  database to call later.
198
199  This callback is called in the client process.
200  """
201  @callback handle_prepare(query, opts :: Keyword.t, state :: any) ::
202    {:ok, query, new_state :: any} |
203    {:error | :disconnect, Exception.t, new_state :: any}
204
205  @doc """
206  Execute a query prepared by `handle_prepare/3`. Return
207  `{:ok, result, state}` to return the result `result` and continue,
208  `{:error, exception, state}` to return an error and continue or
209  `{:disconnect, exception, state}` to return an error and disconnect.
210
211  This callback is called in the client process.
212  """
213  @callback handle_execute(query, params, opts :: Keyword.t, state :: any) ::
214    {:ok, result, new_state :: any} |
215    {:error | :disconnect, Exception.t, new_state :: any}
216
217  @doc """
218  Close a query prepared by `handle_prepare/3` with the database. Return
219  `{:ok, result, state}` on success and to continue,
220  `{:error, exception, state}` to return an error and continue, or
221  `{:disconnect, exception, state}` to return an error and disconnect.
222
223  This callback is called in the client process.
224  """
225  @callback handle_close(query, opts :: Keyword.t, state :: any) ::
226    {:ok, result, new_state :: any} |
227    {:error | :disconnect, Exception.t, new_state :: any}
228
229  @doc """
230  Declare a cursor using a query prepared by `handle_prepare/3`. Return
231  `{:ok, cursor, state}` to start a cursor for a stream and continue,
232  `{:error, exception, state}` to return an error and continue or
233  `{:disconnect, exception, state}` to return an error and disconnect.
234
235  This callback is called in the client process.
236  """
237  @callback handle_declare(query, params, opts :: Keyword.t, state :: any) ::
238    {:ok, cursor, new_state :: any} |
239    {:error | :disconnect, Exception.t, new_state :: any}
240
241  @doc """
242  Fetch the first result from a cursor declared by `handle_declare/4`. Return
243  `{:ok, result, state}` to return the result `result` and continue,
244  `{:deallocate, result, state}` to return the result `result` and deallocate,
245  `{:error, exception, state}` to return an error and close the cursor,
246  `{:disconnect, exception, state}` to return an error and disconnect.
247
248  This callback is called in the client process.
249  """
250  @callback handle_first(query, cursor, opts :: Keyword.t, state :: any) ::
251    {:ok | :deallocate, result, new_state :: any} |
252    {:error | :disconnect, Exception.t, new_state :: any}
253
254  @doc """
255  Fetch the next result from a cursor declared by `handle_declare/4`. Return
256  `{:ok, result, state}` to return the result `result` and continue,
257  `{:deallocate, result, state}` to return the result `result` and deallocate,
258  `{:error, exception, state}` to return an error and close the cursor,
259  `{:disconnect, exception, state}` to return an error and disconnect.
260
261  This callback is called in the client process.
262  """
263  @callback handle_next(query, cursor, opts :: Keyword.t, state :: any) ::
264    {:ok | :deallocate, result, new_state :: any} |
265    {:error | :disconnect, Exception.t, new_state :: any}
266
267  @doc """
268  Deallocate a cursor declared by `handle_declare/4' with the database. Return
269  `{:ok, result, state}` on success and to continue,
270  `{:error, exception, state}` to return an error and continue, or
271  `{:disconnect, exception, state}` to return an error and disconnect.
272
273  This callback is called in the client process.
274  """
275  @callback handle_deallocate(query, cursor, opts :: Keyword.t, state :: any) ::
276    {:ok, result, new_state :: any} |
277    {:error | :disconnect, Exception.t, new_state :: any}
278
279  @doc """
280  Handle a message received by the connection process when checked in.
281  Return `{:ok, state}` to continue or `{:disconnect, exception,
282  state}` to disconnect.
283
284  Messages received by the connection process when checked out will be
285  logged and discared.
286
287  This callback is called in the connection process.
288  """
289  @callback handle_info(msg :: any, state :: any) ::
290    {:ok, new_state :: any} |
291    {:disconnect, Exception.t, new_state :: any}
292
293  @doc """
294  Disconnect from the database. Return `:ok`.
295
296  The exception as first argument is the exception from a `:disconnect`
297  3-tuple returned by a previous callback.
298
299  If the state is controlled by a client and it exits or takes too long
300  to process a request the state will be last known state. In these
301  cases the exception will be a `DBConnection.ConnectionError`.
302
303  This callback is called in the connection process.
304  """
305  @callback disconnect(err :: Exception.t, state :: any) :: :ok
306
307  @doc """
308  Use `DBConnection` to set the behaviour and include default
309  no-op implementations for `ping/1` and `handle_info/2`.
310  """
311  defmacro __using__(_) do
312    quote location: :keep do
313      @behaviour DBConnection
314
315      def connect(_) do
316        # We do this to trick dialyzer to not complain about non-local returns.
317        message = "connect/1 not implemented"
318        case :erlang.phash2(1, 1) do
319          0 -> raise message
320          1 -> {:error, RuntimeError.exception(message)}
321        end
322      end
323
324      def disconnect(_, _) do
325        message = "disconnect/2 not implemented"
326        case :erlang.phash2(1, 1) do
327          0 -> raise message
328          1 -> :ok
329        end
330      end
331
332      def checkout(state) do
333        message = "checkout/1 not implemented"
334        case :erlang.phash2(1, 1) do
335          0 -> raise message
336          1 -> {:disconnect, RuntimeError.exception(message), state}
337        end
338      end
339
340      def checkin(state) do
341        message = "checkin/1 not implemented"
342        case :erlang.phash2(1, 1) do
343          0 -> raise message
344          1 -> {:disconnect, RuntimeError.exception(message), state}
345        end
346      end
347
348      def ping(state), do: {:ok, state}
349
350      def handle_begin(_, state) do
351        message = "handle_begin/2 not implemented"
352        case :erlang.phash2(1, 1) do
353          0 -> raise message
354          1 -> {:error, RuntimeError.exception(message), state}
355        end
356      end
357
358      def handle_commit(_, state) do
359        message = "handle_commit/2 not implemented"
360        case :erlang.phash2(1, 1) do
361          0 -> raise message
362          1 -> {:error, RuntimeError.exception(message), state}
363        end
364      end
365
366      def handle_rollback(_, state) do
367        message = "handle_rollback/2 not implemented"
368        case :erlang.phash2(1, 1) do
369          0 -> raise message
370          1 -> {:error, RuntimeError.exception(message), state}
371        end
372      end
373
374      def handle_prepare(_, _, state) do
375       message = "handle_prepare/3 not implemented"
376        case :erlang.phash2(1, 1) do
377          0 -> raise message
378          1 -> {:error, RuntimeError.exception(message), state}
379        end
380      end
381
382      def handle_execute(_, _, _, state) do
383        message = "handle_execute/4 not implemented"
384        case :erlang.phash2(1, 1) do
385          0 -> raise message
386          1 -> {:error, RuntimeError.exception(message), state}
387        end
388      end
389
390      def handle_close(_, _, state) do
391        message = "handle_close/3 not implemented"
392        case :erlang.phash2(1, 1) do
393          0 -> raise message
394          1 -> {:error, RuntimeError.exception(message), state}
395        end
396      end
397
398      def handle_declare(_, _, _, state) do
399       message = "handle_declare/4 not implemented"
400        case :erlang.phash2(1, 1) do
401          0 -> raise message
402          1 -> {:error, RuntimeError.exception(message), state}
403        end
404      end
405
406      def handle_first(_, _, _, state) do
407       message = "handle_first/4 not implemented"
408        case :erlang.phash2(1, 1) do
409          0 -> raise message
410          1 -> {:error, RuntimeError.exception(message), state}
411        end
412      end
413
414      def handle_next(_, _, _, state) do
415       message = "handle_next/4 not implemented"
416        case :erlang.phash2(1, 1) do
417          0 -> raise message
418          1 -> {:error, RuntimeError.exception(message), state}
419        end
420      end
421
422      def handle_deallocate(_, _, _, state) do
423        message = "handle_deallocate/4 not implemented"
424        case :erlang.phash2(1, 1) do
425          0 -> raise message
426          1 -> {:error, RuntimeError.exception(message), state}
427        end
428      end
429
430      def handle_info(_, state), do: {:ok, state}
431
432      defoverridable [connect: 1, disconnect: 2, checkout: 1, checkin: 1,
433                      ping: 1, handle_begin: 2, handle_commit: 2,
434                      handle_rollback: 2, handle_prepare: 3, handle_execute: 4,
435                      handle_close: 3, handle_declare: 4, handle_first: 4,
436                      handle_next: 4, handle_deallocate: 4, handle_info: 2]
437    end
438  end
439
440  @doc """
441  Ensures the given pool applications have been started.
442
443  ### Options
444
445    * `:pool` - The `DBConnection.Pool` module to use, (default:
446    `DBConnection.Connection`)
447
448  """
449  @spec ensure_all_started(opts :: Keyword.t, type :: atom) ::
450    {:ok, [atom]} | {:error, atom}
451  def ensure_all_started(opts, type \\ :temporary) do
452    Keyword.get(opts, :pool, DBConnection.Connection).ensure_all_started(opts, type)
453  end
454
455  @doc """
456  Start and link to a database connection process.
457
458  ### Options
459
460    * `:pool` - The `DBConnection.Pool` module to use, (default:
461    `DBConnection.Connection`)
462    * `:idle` - The idle strategy, `:passive` to avoid checkin when idle and
463    `:active` to checkin when idle (default: `:passive`)
464    * `:idle_timeout` - The idle timeout to ping the database (default:
465    `1_000`)
466    * `:backoff_min` - The minimum backoff interval (default: `1_000`)
467    * `:backoff_max` - The maximum backoff interval (default: `30_000`)
468    * `:backoff_type` - The backoff strategy, `:stop` for no backoff and
469    to stop, `:exp` for exponential, `:rand` for random and `:rand_exp` for
470    random exponential (default: `:rand_exp`)
471    * `:configure` - A function to run before every connect attempt to
472    dynamically configure the options, either a 1-arity fun,
473    `{module, function, args} with options prepended to `args` or `nil` where
474    only returned options are passed to connect callback (default: `nil`)
475    * `:after_connect` - A function to run on connect using `run/3`, either
476    a 1-arity fun, `{module, function, args}` with `DBConnection.t` prepended
477    to `args` or `nil` (default: `nil`)
478    * `:name` - A name to register the started process (see the `:name` option
479    in `GenServer.start_link/3`).
480
481  ### Example
482
483      {:ok, conn} = DBConnection.start_link(mod, [idle_timeout: 5_000])
484  """
485  @spec start_link(module, opts :: Keyword.t) :: GenServer.on_start
486  def start_link(conn_mod, opts) do
487    pool_mod = Keyword.get(opts, :pool, DBConnection.Connection)
488    apply(pool_mod, :start_link, [conn_mod, opts])
489  end
490
491  @doc """
492  Create a supervisor child specification for a pool of connections.
493
494  See `Supervisor.Spec` for child options (`child_opts`).
495  """
496  @spec child_spec(module, opts :: Keyword.t, child_opts :: Keyword.t) ::
497    Supervisor.Spec.spec
498  def child_spec(conn_mod, opts, child_opts \\ []) do
499    pool_mod = Keyword.get(opts, :pool, DBConnection.Connection)
500    apply(pool_mod, :child_spec, [conn_mod, opts, child_opts])
501  end
502
503  @doc """
504  Prepare a query with a database connection for later execution and
505  returns `{:ok, query}` on success or `{:error, exception}` if there was
506  an error.
507
508  The returned `query` can then be passed to `execute/3` and/or `close/3`
509
510  ### Options
511
512    * `:pool_timeout` - The maximum time to wait for a reply when making a
513    synchronous call to the pool (default: `5_000`)
514    * `:queue` - Whether to block waiting in an internal queue for the
515    connection's state (boolean, default: `true`)
516    * `:timeout` - The maximum time that the caller is allowed the
517    to hold the connection's state (ignored when using a run/transaction
518    connection, default: `15_000`)
519    * `:log` - A function to log information about a call, either
520    a 1-arity fun, `{module, function, args}` with `DBConnection.LogEntry.t`
521    prepended to `args` or `nil`. See `DBConnection.LogEntry` (default: `nil`)
522
523  The pool and connection module may support other options. All options
524  are passed to `handle_prepare/3`.
525
526  ### Example
527
528      query         = %Query{statement: "SELECT id FROM table"}
529      {:ok, query}  = DBConnection.prepare(conn, query)
530      {:ok, result} = DBConnection.execute(conn, query, [])
531      :ok           = DBConnection.close(conn, query)
532
533  """
534  @spec prepare(conn, query, opts :: Keyword.t) ::
535    {:ok, query} | {:error, Exception.t}
536  def prepare(conn, query, opts \\ []) do
537    query = parse(:prepare, query, nil, opts)
538    case run_prepare(conn, query, opts) do
539      {{:ok, query} = ok, meter} ->
540        log(:prepare, query, nil, meter, ok)
541      {error, meter} ->
542        log(:prepare, query, nil, meter, error)
543    end
544  end
545
546  @doc """
547  Prepare a query with a database connection and return the prepared
548  query. An exception is raised on error.
549
550  See `prepare/3`.
551  """
552  @spec prepare!(conn, query, opts :: Keyword.t) :: query
553  def prepare!(conn, query, opts \\ []) do
554    case prepare(conn, query, opts) do
555      {:ok, result} -> result
556      {:error, err} -> raise err
557    end
558  end
559
560  @doc """
561  Prepare a query and execute it with a database connection and return both the
562  prepared query and the result, `{:ok, query, result}` on success or
563  `{:error, exception}` if there was an error.
564
565  The returned `query` can be passed to `execute/4` and `close/3`.
566
567  ### Options
568
569    * `:pool_timeout` - The maximum time to wait for a reply when making a
570    synchronous call to the pool (default: `5_000`)
571    * `:queue` - Whether to block waiting in an internal queue for the
572    connection's state (boolean, default: `true`)
573    * `:timeout` - The maximum time that the caller is allowed the
574    to hold the connection's state (ignored when using a run/transaction
575    connection, default: `15_000`)
576    * `:log` - A function to log information about a call, either
577    a 1-arity fun, `{module, function, args}` with `DBConnection.LogEntry.t`
578    prepended to `args` or `nil`. See `DBConnection.LogEntry` (default: `nil`)
579
580  ### Example
581
582      query                = %Query{statement: "SELECT id FROM table WHERE id=$1"}
583      {:ok, query, result} = DBConnection.prepare_execute(conn, query, [1])
584      {:ok, result2}       = DBConnection.execute(conn, query, [2])
585      :ok                  = DBConnection.close(conn, query)
586  """
587  @spec prepare_execute(conn, query, params, Keyword.t) ::
588    {:ok, query, result} |
589    {:error, Exception.t}
590  def prepare_execute(conn, query, params, opts \\ []) do
591    query = parse(:prepare_execute, query, params, opts)
592    case run_prepare_execute(conn, query, params, opts) do
593      {{:ok, query, result}, meter} ->
594        decode(:prepare_execute, query, params, meter, result, opts)
595      {error, meter} ->
596        log(:prepare_execute, query, params, meter, error)
597    end
598  end
599
600  @doc """
601  Prepare a query and execute it with a database connection and return both the
602  prepared query and result. An exception is raised on error.
603
604  See `prepare_execute/4`.
605  """
606  @spec prepare_execute!(conn, query, Keyword.t) :: {query, result}
607  def prepare_execute!(conn, query, params, opts \\ []) do
608    case prepare_execute(conn, query, params, opts) do
609      {:ok, query, result} -> {query, result}
610      {:error, err}        -> raise err
611    end
612  end
613
614  @doc """
615  Execute a prepared query with a database connection and return
616  `{:ok, result}` on success or `{:error, exception}` if there was an
617  error.
618
619  If the query is not prepared on the connection an attempt may be made to
620  prepare it and then execute again.
621
622  ### Options
623
624    * `:pool_timeout` - The maximum time to wait for a reply when making a
625    synchronous call to the pool (default: `5_000`)
626    * `:queue` - Whether to block waiting in an internal queue for the
627    connection's state (boolean, default: `true`)
628    * `:timeout` - The maximum time that the caller is allowed the
629    to hold the connection's state (ignored when using a run/transaction
630    connection, default: `15_000`)
631    * `:log` - A function to log information about a call, either
632    a 1-arity fun, `{module, function, args}` with `DBConnection.LogEntry.t`
633    prepended to `args` or `nil`. See `DBConnection.LogEntry` (default: `nil`)
634
635  The pool and connection module may support other options. All options
636  are passed to `handle_execute/4`.
637
638  See `prepare/3`.
639  """
640  @spec execute(conn, query, params, opts :: Keyword.t) ::
641    {:ok, result} | {:error, Exception.t}
642  def execute(conn, query, params, opts \\ []) do
643    encoded = encode(:execute, query, params, opts)
644    case run_execute(conn, query, encoded, opts)  do
645      {{:ok, query, result}, meter} ->
646        decode(:execute, query, params, meter, result, opts)
647      {error, meter} ->
648        log(:execute, query, params, meter, error)
649    end
650  end
651
652  @doc """
653  Execute a prepared query with a database connection and return the
654  result. Raises an exception on error.
655
656  See `execute/4`
657  """
658  @spec execute!(conn, query, params, opts :: Keyword.t) :: result
659  def execute!(conn, query, params, opts \\ []) do
660    case execute(conn, query, params, opts) do
661      {:ok, result} -> result
662      {:error, err} -> raise err
663    end
664  end
665
666  @doc """
667  Close a prepared query on a database connection and return `{:ok, result}` on
668  success or `{:error, exception}` on error.
669
670  This function should be used to free resources held by the connection
671  process and/or the database server.
672
673  ## Options
674
675    * `:pool_timeout` - The maximum time to wait for a reply when making a
676    synchronous call to the pool (default: `5_000`)
677    * `:queue` - Whether to block waiting in an internal queue for the
678    connection's state (boolean, default: `true`)
679    * `:timeout` - The maximum time that the caller is allowed the
680    to hold the connection's state (ignored when using a run/transaction
681    connection, default: `15_000`)
682    * `:log` - A function to log information about a call, either
683    a 1-arity fun, `{module, function, args}` with `DBConnection.LogEntry.t`
684    prepended to `args` or `nil`. See `DBConnection.LogEntry` (default: `nil`)
685
686  The pool and connection module may support other options. All options
687  are passed to `handle_close/3`.
688
689  See `prepare/3`.
690  """
691  @spec close(conn, query, opts :: Keyword.t) ::
692    {:ok, result} | {:error, Exception.t}
693  def close(conn, query, opts \\ []) do
694    {result, meter} = run_close(conn, query, opts)
695    log(:close, query, nil, meter, result)
696  end
697
698  @doc """
699  Close a prepared query on a database connection and return the result. Raises
700  an exception on error.
701
702  See `close/3`.
703  """
704  @spec close!(conn, query, opts :: Keyword.t) :: result
705  def close!(conn, query, opts \\ []) do
706    case close(conn, query, opts) do
707      {:ok, result} -> result
708      {:error, err} -> raise err
709    end
710  end
711
712  @doc """
713  Acquire a lock on a connection and run a series of requests on it.
714
715  The return value of this function is the return value of `fun`.
716
717  To use the locked connection call the request with the connection
718  reference passed as the single argument to the `fun`. If the
719  connection disconnects all future calls using that connection
720  reference will fail.
721
722  `run/3` and `transaction/3` can be nested multiple times but a
723  `transaction/3` call inside another `transaction/3` will be treated
724  the same as `run/3`.
725
726  ### Options
727
728    * `:pool_timeout` - The maximum time to wait for a reply when making a
729    synchronous call to the pool (default: `5_000`)
730    * `:queue` - Whether to block waiting in an internal queue for the
731    connection's state (boolean, default: `true`)
732    * `:timeout` - The maximum time that the caller is allowed the
733    to hold the connection's state (default: `15_000`)
734
735  The pool may support other options.
736
737  ### Example
738
739      {:ok, res} = DBConnection.run(conn, fn(conn) ->
740        DBConnection.execute!(conn, "SELECT id FROM table", [])
741      end)
742  """
743  @spec run(conn, (t -> result), opts :: Keyword.t) :: result when result: var
744  def run(conn, fun, opts \\ [])
745  def run(%DBConnection{} = conn, fun, _) do
746    _ = fetch_info(conn)
747    fun.(conn)
748  end
749  def run(pool, fun, opts) do
750    {conn, conn_state} = checkout(pool, opts)
751    put_info(conn, :idle, conn_state)
752    run_begin(conn, fun, opts)
753  end
754
755  @doc """
756  Acquire a lock on a connection and run a series of requests inside a
757  transaction. The result of the transaction fun is return inside an `:ok`
758  tuple: `{:ok, result}`.
759
760  To use the locked connection call the request with the connection
761  reference passed as the single argument to the `fun`. If the
762  connection disconnects all future calls using that connection
763  reference will fail.
764
765  `run/3` and `transaction/3` can be nested multiple times. If a transaction is
766  rolled back or a nested transaction `fun` raises the transaction is marked as
767  failed. Any calls inside a failed transaction (except `rollback/2`) will raise
768  until the outer transaction call returns. All running `transaction/3` calls
769  will return `{:error, :rollback}` if the transaction failed or connection
770  closed and `rollback/2` is not called for that `transaction/3`.
771
772  ### Options
773
774    * `:pool_timeout` - The maximum time to wait for a reply when making a
775    synchronous call to the pool (default: `5_000`)
776    * `:queue` - Whether to block waiting in an internal queue for the
777    connection's state (boolean, default: `true`)
778    * `:timeout` - The maximum time that the caller is allowed the
779    to hold the connection's state (default: `15_000`)
780    * `:log` - A function to log information about begin, commit and rollback
781    calls made as part of the transaction, either a 1-arity fun,
782    `{module, function, args}` with `DBConnection.LogEntry.t` prepended to
783    `args` or `nil`. See `DBConnection.LogEntry` (default: `nil`)
784
785  The pool and connection module may support other options. All options
786  are passed to `handle_begin/2`, `handle_commit/2` and
787  `handle_rollback/2`.
788
789  ### Example
790
791      {:ok, res} = DBConnection.transaction(conn, fn(conn) ->
792        DBConnection.execute!(conn, "SELECT id FROM table", [])
793      end)
794  """
795  @spec transaction(conn, (conn -> result), opts :: Keyword.t) ::
796    {:ok, result} | {:error, reason :: any} when result: var
797  def transaction(conn, fun, opts \\ []) do
798    {result, log_info} = transaction_meter(conn, fun, opts)
799    transaction_log(log_info)
800    case result do
801      {:raise, err} ->
802        raise err
803      {kind, reason, stack} ->
804        :erlang.raise(kind, reason, stack)
805      other ->
806        other
807    end
808  end
809
810  @doc """
811  Rollback a transaction, does not return.
812
813  Aborts the current transaction fun. If inside `transaction/3` bubbles
814  up to the top level.
815
816  ### Example
817
818      {:error, :bar} = DBConnection.transaction(conn, fn(conn) ->
819        DBConnection.rollback(conn, :bar)
820        IO.puts "never reaches here!"
821      end)
822  """
823  @spec rollback(t, reason :: any) :: no_return
824  def rollback(%DBConnection{conn_ref: conn_ref} = conn, err) do
825    case get_info(conn) do
826      {transaction, _} when transaction in [:transaction, :failed] ->
827        throw({:rollback, conn_ref, err})
828      {transaction, _, _} when transaction in [:transaction, :failed] ->
829        throw({:rollback, conn_ref, err})
830      {:idle, _} ->
831        raise "not inside transaction"
832      {:idle, _, _} ->
833        raise "not inside transaction"
834      :closed ->
835        raise DBConnection.ConnectionError, "connection is closed"
836    end
837  end
838
839  @doc """
840  Create a stream that will prepare a query, execute it and stream results
841  using a cursor.
842
843  ### Options
844
845    * `:pool_timeout` - The maximum time to wait for a reply when making a
846    synchronous call to the pool (default: `5_000`)
847    * `:queue` - Whether to block waiting in an internal queue for the
848    connection's state (boolean, default: `true`)
849    * `:timeout` - The maximum time that the caller is allowed the
850    to hold the connection's state (ignored when using a run/transaction
851    connection, default: `15_000`)
852    * `:log` - A function to log information about a call, either
853    a 1-arity fun, `{module, function, args}` with `DBConnection.LogEntry.t`
854    prepended to `args` or `nil`. See `DBConnection.LogEntry` (default: `nil`)
855
856  The pool and connection module may support other options. All options
857  are passed to `handle_prepare/3, `handle_close/3, `handle_declare/4`,
858  `handle_first/4`, `handle_next/4' and `handle_deallocate/4`.
859
860  ### Example
861
862      {:ok, results} = DBConnection.transaction(conn, fn(conn) ->
863        query  = %Query{statement: "SELECT id FROM table"}
864        stream = DBConnection.prepare_stream(conn, query, [])
865        Enum.to_list(stream)
866      end)
867  """
868  @spec prepare_stream(t, query, params, opts :: Keyword.t) ::
869    DBConnection.PrepareStream.t
870  def prepare_stream(%DBConnection{} = conn, query, params, opts) do
871    %DBConnection.PrepareStream{conn: conn, query: query, params: params,
872                                opts: opts}
873  end
874
875  @doc """
876  Create a stream that will execute a prepared query and stream results using a
877  cursor.
878
879  ### Options
880
881    * `:pool_timeout` - The maximum time to wait for a reply when making a
882    synchronous call to the pool (default: `5_000`)
883    * `:queue` - Whether to block waiting in an internal queue for the
884    connection's state (boolean, default: `true`)
885    * `:timeout` - The maximum time that the caller is allowed the
886    to hold the connection's state (ignored when using a run/transaction
887    connection, default: `15_000`)
888    * `:log` - A function to log information about a call, either
889    a 1-arity fun, `{module, function, args}` with `DBConnection.LogEntry.t`
890    prepended to `args` or `nil`. See `DBConnection.LogEntry` (default: `nil`)
891
892  The pool and connection module may support other options. All options
893  are passed to `handle_declare/4`, `handle_first/4` , `handle_next/4 and
894  `handle_deallocate/4`.
895
896  ### Example
897
898      {:ok, results} = DBConnection.transaction(conn, fn(conn) ->
899        query  = %Query{statement: "SELECT id FROM table"}
900        query  = DBConnection.prepare!(conn, query)
901        stream = DBConnection.stream(conn, query, [])
902        Enum.to_list(stream)
903      end)
904  """
905  @spec stream(t, query, params, opts :: Keyword.t) :: DBConnection.Stream.t
906  def stream(%DBConnection{} = conn, query, params, opts \\ []) do
907    %DBConnection.Stream{conn: conn, query: query, params: params, opts: opts}
908  end
909
910  @doc false
911  def reduce(%DBConnection.PrepareStream{} = stream, acc, fun) do
912    %DBConnection.PrepareStream{conn: conn, query: query, params: params,
913                                opts: opts} = stream
914    start = &prepare_declare(&1, query, params, &2)
915    resource(conn, start, &fetch/3, &deallocate/3, opts).(acc, fun)
916  end
917  def reduce(%DBConnection.Stream{} = stream, acc, fun) do
918    %DBConnection.Stream{conn: conn, query: query, params: params,
919                         opts: opts} = stream
920    start = &declare(&1, query, params, &2)
921    resource(conn, start, &fetch/3, &deallocate/3, opts).(acc, fun)
922  end
923
924  ## Helpers
925
926  defp checkout(pool, opts) do
927    pool_mod = Keyword.get(opts, :pool, DBConnection.Connection)
928    case apply(pool_mod, :checkout, [pool, opts]) do
929      {:ok, pool_ref, conn_mod, conn_state} ->
930        conn = %DBConnection{pool_mod: pool_mod, pool_ref: pool_ref,
931          conn_mod: conn_mod, conn_ref: make_ref()}
932        {conn, conn_state}
933      {:error, err} ->
934        raise err
935    end
936  end
937
938  defp checkin(conn, conn_state, opts) do
939    %DBConnection{pool_mod: pool_mod, pool_ref: pool_ref} = conn
940    _ = apply(pool_mod, :checkin, [pool_ref, conn_state, opts])
941    :ok
942  end
943
944  defp delete_disconnect(conn, conn_state, err, opts) do
945    _ = delete_info(conn)
946    %DBConnection{pool_mod: pool_mod, pool_ref: pool_ref} = conn
947    args = [pool_ref, err, conn_state, opts]
948    _ = apply(pool_mod, :disconnect, args)
949    :ok
950  end
951
952  defp delete_stop(conn, conn_state, kind, reason, stack, opts) do
953    _ = delete_info(conn)
954    msg = "client #{inspect self()} stopped: " <>
955      Exception.format(kind, reason, stack)
956    exception = DBConnection.ConnectionError.exception(msg)
957    %DBConnection{pool_mod: pool_mod, pool_ref: pool_ref} = conn
958    args = [pool_ref, exception, conn_state, opts]
959    _ = apply(pool_mod, :stop, args)
960    :ok
961  end
962
963  defp handle(%DBConnection{conn_mod: conn_mod} = conn, fun, args, opts) do
964    {status, conn_state} = fetch_info(conn)
965    try do
966      apply(conn_mod, fun, args ++ [opts, conn_state])
967    else
968      {:ok, result, conn_state} ->
969        put_info(conn, status, conn_state)
970        {:ok, result}
971      {:deallocate, _, conn_state} = deallocate
972          when fun in [:handle_first, :handle_next] ->
973        put_info(conn, status, conn_state)
974        Tuple.delete_at(deallocate, 2)
975      {:error, _, conn_state} = error ->
976        put_info(conn, status, conn_state)
977        Tuple.delete_at(error, 2)
978      {:disconnect, err, conn_state} ->
979        delete_disconnect(conn, conn_state, err, opts)
980        {:error, err}
981      other ->
982        try do
983          raise DBConnection.ConnectionError, "bad return value: #{inspect other}"
984        catch
985          :error, reason ->
986            stack = System.stacktrace()
987            delete_stop(conn, conn_state, :error, reason, stack, opts)
988            {:error, reason, stack}
989        end
990    catch
991      kind, reason ->
992        stack = System.stacktrace()
993        delete_stop(conn, conn_state, kind, reason, stack, opts)
994        {kind, reason, stack}
995    end
996  end
997
998  defp parse(call, query, params, opts) do
999    try do
1000      DBConnection.Query.parse(query, opts)
1001    catch
1002      kind, reason ->
1003        pre_log(call, query, params, opts, kind, reason, System.stacktrace())
1004    end
1005  end
1006
1007  defp encode(call, query, params, opts) do
1008    try do
1009      DBConnection.Query.encode(query, params, opts)
1010    catch
1011      kind, reason ->
1012        stack = System.stacktrace()
1013        pre_log(call, query, params, opts, kind, reason, stack)
1014    end
1015  end
1016
1017  defp decode(call, query, params, meter, result, opts) do
1018    try do
1019      DBConnection.Query.decode(query, result, opts)
1020    catch
1021      kind, reason ->
1022        raised = {kind, reason, System.stacktrace()}
1023        decode_log(call, query, params, meter, raised)
1024    else
1025      result when call == :prepare_execute ->
1026        ok = {:ok, query, result}
1027        decode_log(call, query, params, meter, ok)
1028      result when call in [:execute, :first, :next] ->
1029        ok = {:ok, result}
1030        decode_log(call, query, params, meter, ok)
1031    end
1032  end
1033
1034  defp pre_log(call, query, params, opts, kind, reason, stack) do
1035    case Keyword.get(opts, :log) do
1036      nil -> :erlang.raise(kind, reason, stack)
1037      log -> log(call, query, params, {log, []}, {kind, reason, stack})
1038    end
1039  end
1040
1041  defp run_prepare(conn, query, opts) do
1042    run_meter(conn, fn(conn2) ->
1043      case handle(conn2, :handle_prepare, [query], opts) do
1044        {:ok, query} ->
1045          describe(conn2, query, opts)
1046        other ->
1047          other
1048      end
1049    end, opts)
1050  end
1051
1052  defp describe(conn, query, opts) do
1053    try do
1054      DBConnection.Query.describe(query, opts)
1055    catch
1056      kind, reason ->
1057        raised = {kind, reason, System.stacktrace()}
1058        raised_close(conn, query, opts, raised)
1059    else
1060      query ->
1061        {:ok, query}
1062    end
1063  end
1064
1065  defp run_prepare_execute(conn, query, params, opts) do
1066    run_meter(conn, fn(conn2) ->
1067      case handle(conn2, :handle_prepare, [query], opts) do
1068        {:ok, query} ->
1069          describe_run(conn2, :handle_execute, query, params, opts)
1070        other ->
1071          other
1072      end
1073    end, opts)
1074  end
1075
1076  defp describe_run(conn, fun, query, params, opts) do
1077    try do
1078      query = DBConnection.Query.describe(query, opts)
1079      [query, DBConnection.Query.encode(query, params, opts)]
1080    catch
1081      kind, reason ->
1082        raised = {kind, reason, System.stacktrace()}
1083        raised_close(conn, query, opts, raised)
1084    else
1085      [query, _params] = args ->
1086        case handle(conn, fun, args, opts) do
1087          {:ok, result} ->
1088            {:ok, query, result}
1089          other ->
1090            other
1091        end
1092    end
1093  end
1094
1095  defp raised_close(conn, query, opts, raised) do
1096    case handle(conn, :handle_close, [query], opts) do
1097      {:ok, _} ->
1098        raised
1099      {:error, _} ->
1100        raised
1101      {_kind, _reason, _stack} = raised ->
1102        raised
1103    end
1104  end
1105
1106  defp run_execute(conn, query, params, opts) do
1107    run_meter(conn, fn(conn2) ->
1108      case handle(conn2, :handle_execute, [query, params], opts) do
1109        {:ok, result} ->
1110          {:ok, query, result}
1111        other ->
1112          other
1113      end
1114    end, opts)
1115  end
1116
1117  defp run_close(conn, query, opts) do
1118    fun = &handle(&1, :handle_close, [query], opts)
1119    run_meter(conn, fun, opts)
1120  end
1121
1122  defmacrop time() do
1123    if function_exported?(:erlang, :monotonic_time, 0) do
1124      quote do: :erlang.monotonic_time()
1125    else
1126      quote do: :os.timestamp()
1127    end
1128  end
1129
1130  defp run_meter(%DBConnection{} = conn, fun, opts) do
1131    case Keyword.get(opts, :log) do
1132      nil ->
1133        {run(conn, fun, opts), nil}
1134      log ->
1135        run_meter(conn, log, [], fun, opts)
1136      end
1137  end
1138  defp run_meter(pool, fun, opts) do
1139    case Keyword.get(opts, :log) do
1140      nil ->
1141        {run(pool, fun, opts), nil}
1142      log ->
1143        run_meter(pool, log, [checkout: time()], fun, opts)
1144    end
1145  end
1146
1147  defp run_meter(conn, log, times, fun, opts) do
1148    fun = fn(conn2) ->
1149      start = time()
1150      result = fun.(conn2)
1151      stop = time()
1152      meter = {log, [stop: stop, start: start] ++ times}
1153      {result, meter}
1154    end
1155    run(conn, fun, opts)
1156  end
1157
1158  defp decode_log(_, _, _, nil, result), do: log_result(result)
1159  defp decode_log(call, query, params, {log, times}, result) do
1160   log(call, query, params, log, [decode: time()] ++ times, result)
1161  end
1162
1163  defp transaction_log(nil), do: :ok
1164  defp transaction_log({log, times, callback, result}) do
1165    call = transaction_call(callback)
1166    result = transaction_result(result)
1167    _ = log(:transaction, call, nil, log, times, result)
1168    :ok
1169  end
1170
1171  defp transaction_call(:handle_begin), do: :begin
1172  defp transaction_call(:handle_commit), do: :commit
1173  defp transaction_call(:handle_rollback), do: :rollback
1174
1175  defp transaction_result({:ok, _} = ok), do: ok
1176  defp transaction_result({:raise, err}), do: {:error, err}
1177  defp transaction_result({_kind, _reason, _stack} = raised), do: raised
1178
1179  defp log(_, _, _, nil, result), do: log_result(result)
1180  defp log(call, query, params, {log, times}, result) do
1181    log(call, query, params, log, times, result)
1182  end
1183
1184  defp log(call, query, params, log, times, result) do
1185    entry = DBConnection.LogEntry.new(call, query, params, times, entry_result(result))
1186    log(log, entry)
1187    log_result(result)
1188  end
1189
1190  defp entry_result({kind, reason, stack})
1191  when kind in [:error, :exit, :throw] do
1192    msg = "an exception was raised: " <> Exception.format(kind, reason, stack)
1193    {:error, %DBConnection.ConnectionError{message: msg}}
1194  end
1195  defp entry_result(other), do: other
1196
1197  defp log({mod, fun, args}, entry), do: apply(mod, fun, [entry | args])
1198  defp log(fun, entry), do: fun.(entry)
1199
1200  defp log_result({kind, reason, stack}) when kind in [:error, :exit, :throw] do
1201    :erlang.raise(kind, reason, stack)
1202  end
1203  defp log_result(other), do: other
1204
1205  defp run_begin(conn, fun, opts) do
1206    try do
1207      fun.(conn)
1208    after
1209      run_end(conn, opts)
1210    end
1211  end
1212
1213  defp run_end(conn, opts) do
1214    case delete_info(conn) do
1215      {:idle, conn_state} ->
1216        checkin(conn, conn_state, opts)
1217      {status, conn_state} when status in [:transaction, :failed] ->
1218        try do
1219          raise "connection run ended in transaction"
1220        catch
1221          :error, reason ->
1222            stack = System.stacktrace()
1223            delete_stop(conn, conn_state, :error, reason, stack, opts)
1224            :erlang.raise(:error, reason, stack)
1225        end
1226      :closed ->
1227        :ok
1228    end
1229  end
1230
1231  defp transaction_meter(%DBConnection{} = conn, fun, opts) do
1232    case fetch_info(conn) do
1233      {:transaction, _} ->
1234        {transaction_nested(conn, fun), nil}
1235      {:idle, conn_state} ->
1236        log = Keyword.get(opts, :log)
1237        begin_meter(conn, conn_state, log, [], fun, opts)
1238    end
1239  end
1240  defp transaction_meter(pool, fun, opts) do
1241    case Keyword.get(opts, :log) do
1242      nil ->
1243        run(pool, &begin(&1, nil, [], fun, opts), opts)
1244      log ->
1245        times = [checkout: time()]
1246        run(pool, &begin(&1, log, times, fun, opts), opts)
1247    end
1248  end
1249
1250  defp begin(conn, log, times, fun, opts) do
1251    {:idle, conn_state} = get_info(conn)
1252    begin_meter(conn, conn_state, log, times, fun, opts)
1253  end
1254
1255  defp begin_meter(conn, conn_state, nil, [], fun, opts) do
1256    case handle(conn, conn_state, :handle_begin, opts, :transaction) do
1257      {:ok, _} ->
1258        transaction_run(conn, nil, fun, opts)
1259      error ->
1260        {error, nil}
1261    end
1262  end
1263  defp begin_meter(conn, conn_state, log, times, fun, opts) do
1264    start = time()
1265    result = handle(conn, conn_state, :handle_begin, opts, :transaction)
1266    stop = time()
1267    log_info = {log, [stop: stop, start: start] ++ times, :handle_begin, result}
1268    case result do
1269      {:ok, _} ->
1270        fun = fn(conn2) ->
1271          transaction_log(log_info)
1272          fun.(conn2)
1273        end
1274        transaction_run(conn, log, fun, opts)
1275      error ->
1276        {error, log_info}
1277    end
1278  end
1279
1280  defp transaction_run(conn, log, fun, opts) do
1281    %DBConnection{conn_ref: conn_ref} = conn
1282    try do
1283      fun.(conn)
1284    else
1285      result ->
1286        result = {:ok, result}
1287        commit(conn, log, opts, result)
1288    catch
1289      :throw, {:rollback, ^conn_ref, reason} ->
1290        result = {:error, reason}
1291        rollback(conn, log, opts, result)
1292      kind, reason ->
1293        result = {kind, reason, System.stacktrace()}
1294        rollback(conn, log, opts, result)
1295    end
1296  end
1297
1298  defp commit(conn, log, opts, result) do
1299    case get_info(conn) do
1300      {:transaction, conn_state} ->
1301        conclude_meter(conn, conn_state, log, :handle_commit, opts, result)
1302      {:failed, conn_state} ->
1303        result = {:error, :rollback}
1304        conclude_meter(conn, conn_state, log, :handle_rollback, opts, result)
1305      :closed ->
1306        {{:error, :rollback}, nil}
1307    end
1308  end
1309
1310  defp rollback(conn, log, opts, result) do
1311    case get_info(conn) do
1312      {trans, conn_state} when trans in [:transaction, :failed] ->
1313        conclude_meter(conn, conn_state, log, :handle_rollback, opts, result)
1314      :closed ->
1315        {result, nil}
1316    end
1317  end
1318
1319  defp conclude_meter(conn, conn_state, nil, callback, opts, result) do
1320    case handle(conn, conn_state, callback, opts, :idle) do
1321      {:ok, _} ->
1322        {result, nil}
1323      error ->
1324        {error, nil}
1325    end
1326  end
1327  defp conclude_meter(conn, conn_state, log, callback, opts, result) do
1328    start = time()
1329    cb_result = handle(conn, conn_state, callback, opts, :idle)
1330    stop = time()
1331    times = [stop: stop, start: start]
1332    case cb_result do
1333      {:ok, _} ->
1334        {result, {log, times, callback, cb_result}}
1335      _error ->
1336        {cb_result, {log, times, callback, cb_result}}
1337    end
1338  end
1339
1340  defp handle(conn, conn_state, callback, opts, status) do
1341    %DBConnection{conn_mod: conn_mod} = conn
1342    try do
1343      apply(conn_mod, callback, [opts, conn_state])
1344    else
1345      {:ok, result, conn_state} ->
1346        put_info(conn, status, conn_state)
1347        {:ok, result}
1348      {:error, err, conn_state} ->
1349        put_info(conn, :idle, conn_state)
1350        {:raise, err}
1351      {:disconnect, err, conn_state} ->
1352        delete_disconnect(conn, conn_state, err, opts)
1353        {:raise, err}
1354      other ->
1355        try do
1356          raise DBConnection.ConnectionError, "bad return value: #{inspect other}"
1357        catch
1358          :error, reason ->
1359            stack = System.stacktrace()
1360            delete_stop(conn, conn_state, :error, reason, stack, opts)
1361            {:error, reason, stack}
1362        end
1363    catch
1364      kind, reason ->
1365        stack = System.stacktrace()
1366        delete_stop(conn, conn_state, kind, reason, stack, opts)
1367        {kind, reason, stack}
1368    end
1369  end
1370
1371  defp transaction_nested(conn, fun) do
1372    %DBConnection{conn_ref: conn_ref} = conn
1373    try do
1374      fun.(conn)
1375    else
1376      result ->
1377        transaction_ok(conn, {:ok, result})
1378    catch
1379      :throw, {:rollback, ^conn_ref, reason} ->
1380        transaction_failed(conn)
1381        {:error, reason}
1382      kind, reason ->
1383        stack = System.stacktrace()
1384        transaction_failed(conn)
1385        :erlang.raise(kind, reason, stack)
1386    end
1387  end
1388
1389  defp transaction_ok(conn, result) do
1390    case get_info(conn) do
1391      {:failed, _} ->
1392        {:error, :rollback}
1393      _ ->
1394        result
1395    end
1396  end
1397
1398  defp transaction_failed(conn) do
1399    case get_info(conn) do
1400      {:transaction, conn_state} ->
1401        put_info(conn, :failed, conn_state)
1402      _ ->
1403        :ok
1404    end
1405  end
1406
1407  defp prepare_declare(conn, query, params, opts) do
1408    query = parse(:prepare_declare, query, params, opts)
1409    case run_prepare_declare(conn, query, params, opts) do
1410      {{:ok, query, cursor}, meter} ->
1411        prepare_declare_log(conn, query, params, meter, cursor, opts)
1412      {error, meter} ->
1413        {:error, err} = log(:prepare_declare, query, params, meter, error)
1414        raise err
1415    end
1416  end
1417
1418  defp run_prepare_declare(conn, query, params, opts) do
1419    run_meter(conn, fn(conn2) ->
1420      case handle(conn2, :handle_prepare, [query], opts) do
1421        {:ok, query} ->
1422          describe_run(conn2, :handle_declare, query, params, opts)
1423        other ->
1424          other
1425      end
1426    end, opts)
1427  end
1428
1429  defp prepare_declare_log(conn, query, params, meter, cursor, opts) do
1430    try do
1431      log(:prepare_declare, query, params, meter, {:ok, query, cursor})
1432    catch
1433      kind, reason ->
1434        stack = System.stacktrace()
1435        deallocate(conn, query, cursor, opts)
1436        :erlang.raise(kind, reason, stack)
1437    else
1438      {:ok, query, cursor} ->
1439        {:first, query, cursor}
1440    end
1441  end
1442
1443  defp declare(conn, query, params, opts) do
1444    encoded = encode(:declare, query, params, opts)
1445    case run_declare(conn, query, encoded, opts) do
1446      {{:ok, cursor}, meter} ->
1447        declare_log(conn, query, params, meter, cursor, opts)
1448      {error, meter} ->
1449        {:error, err} = log(:declare, query, params, meter, error)
1450        raise err
1451    end
1452  end
1453
1454  defp run_declare(conn, query, params, opts) do
1455    run_meter(conn, &handle(&1, :handle_declare, [query, params], opts), opts)
1456  end
1457
1458  defp declare_log(conn, query, params, meter, cursor, opts) do
1459    try do
1460      log(:declare, query, params, meter, {:ok, cursor})
1461    catch
1462      kind, reason ->
1463        stack = System.stacktrace()
1464        deallocate(conn, query, cursor, opts)
1465        :erlang.raise(kind, reason, stack)
1466    else
1467      {:ok, cursor} ->
1468        {:first, query, cursor}
1469    end
1470  end
1471
1472  defp fetch(conn, {:first, query, cursor}, opts) do
1473    fetch(conn, :handle_first, :first, query, cursor, opts)
1474  end
1475  defp fetch(conn, {:next, query, cursor}, opts) do
1476    fetch(conn, :handle_next, :next, query, cursor, opts)
1477  end
1478  defp fetch(_, {:deallocate, _,  _} = state, _) do
1479    {:halt, state}
1480  end
1481
1482  def fetch(conn, fun, call, query, cursor, opts) do
1483    fetch = &handle(&1, fun, [query, cursor], opts)
1484    case run_meter(conn, fetch, opts) do
1485      {{:ok, result}, meter} ->
1486        fetch_decode(:next, call, query, cursor, meter, result, opts)
1487      {{:deallocate, result}, meter} ->
1488        fetch_decode(:deallocate, call, query, cursor, meter, result, opts)
1489      {error, meter} ->
1490        {:error, err} = log(call, query, cursor, meter, error)
1491        raise err
1492    end
1493  end
1494
1495  defp fetch_decode(status, call, query, cursor, meter, result, opts) do
1496    {:ok, decoded} = decode(call, query, cursor, meter, result, opts)
1497    {[decoded], {status, query, cursor}}
1498  end
1499
1500  defp deallocate(conn, {_, query, cursor}, opts) do
1501    case get_info(conn) do
1502      :closed -> :ok
1503      _       -> deallocate(conn, query, cursor, opts)
1504    end
1505  end
1506
1507  defp deallocate(conn, query, cursor, opts) do
1508    close = &handle(&1, :handle_deallocate, [query, cursor], opts)
1509    {result, meter} = run_meter(conn, close, opts)
1510    case log(:deallocate, query, cursor, meter, result) do
1511      {:ok, _}      -> :ok
1512      {:error, err} -> raise err
1513    end
1514  end
1515
1516  defp resource(%DBConnection{} = conn, start, next, stop, opts) do
1517    start = fn() -> start.(conn, opts) end
1518    next = fn(state) -> next.(conn, state, opts) end
1519    stop = fn(state) -> stop.(conn, state, opts) end
1520    Stream.resource(start, next, stop)
1521  end
1522
1523  defp put_info(conn, status, conn_state) do
1524    _ = Process.put(key(conn), {status, conn_state})
1525    :ok
1526  end
1527
1528  defp fetch_info(conn) do
1529    case get_info(conn) do
1530      {:failed, _} ->
1531        raise DBConnection.ConnectionError, "transaction rolling back"
1532      {_, _} = info ->
1533        info
1534      :closed ->
1535        raise DBConnection.ConnectionError, "connection is closed"
1536    end
1537  end
1538
1539  defp get_info(conn), do: Process.get(key(conn), :closed)
1540
1541  defp delete_info(conn) do
1542    Process.delete(key(conn)) || :closed
1543  end
1544
1545  defp key(%DBConnection{conn_ref: conn_ref}), do: {__MODULE__, conn_ref}
1546end
1547