1%%% Copyright (C) 2008 - Will Glozer.  All rights reserved.
2%%% Copyright (C) 2011 - Anton Lebedevich.  All rights reserved.
3
4-module(epgsql).
5
6-export([connect/1, connect/2, connect/3, connect/4, connect/5,
7         close/1,
8         get_parameter/2,
9         set_notice_receiver/2,
10         get_cmd_status/1,
11         squery/2,
12         equery/2, equery/3, equery/4,
13         prepared_query/3,
14         parse/2, parse/3, parse/4,
15         describe/2, describe/3,
16         bind/3, bind/4,
17         execute/2, execute/3, execute/4,
18         execute_batch/2,
19         close/2, close/3,
20         sync/1,
21         cancel/1,
22         update_type_cache/1,
23         update_type_cache/2,
24         with_transaction/2,
25         with_transaction/3,
26         sync_on_error/2,
27         standby_status_update/3,
28         start_replication/5,
29         start_replication/6,
30         to_proplist/1]).
31-export([handle_x_log_data/5]).                 % private
32
33-export_type([connection/0, connect_option/0, connect_opts/0,
34              connect_error/0, query_error/0, sql_query/0, column/0,
35              type_name/0, epgsql_type/0, statement/0]).
36
37%% Deprecated types
38-export_type([bind_param/0, typed_param/0,
39              squery_row/0, equery_row/0, reply/1,
40              pg_time/0, pg_date/0, pg_datetime/0, pg_interval/0]).
41
42-include("epgsql.hrl").
43
44-type sql_query() :: iodata().
45-type host() :: inet:ip_address() | inet:hostname().
46-type connection() :: pid().
47-type connect_option() ::
48    {host, host()}                                 |
49    {username, string()}                           |
50    {password, string()}                           |
51    {database, DBName     :: string()}             |
52    {port,     PortNum    :: inet:port_number()}   |
53    {ssl,      IsEnabled  :: boolean() | required} |
54    {ssl_opts, SslOptions :: [ssl:ssl_option()]}   | % see OTP ssl app, ssl_api.hrl
55    {timeout,  TimeoutMs  :: timeout()}            | % default: 5000 ms
56    {async,    Receiver   :: pid() | atom()}       | % process to receive LISTEN/NOTIFY msgs
57    {codecs,   Codecs     :: [{epgsql_codec:codec_mod(), any()}]} |
58    {replication, Replication :: string()}. % Pass "database" to connect in replication mode
59
60-ifdef(have_maps).
61-type connect_opts() ::
62        [connect_option()]
63      | #{host => host(),
64          username => string(),
65          password => string(),
66          database => string(),
67          port => inet:port_number(),
68          ssl => boolean() | required,
69          ssl_opts => [ssl:ssl_option()],
70          timeout => timeout(),
71          async => pid(),
72          codecs => [{epgsql_codec:codec_mod(), any()}],
73          replication => string()}.
74-else.
75-type connect_opts() :: [connect_option()].
76-endif.
77
78-type connect_error() :: epgsql_cmd_connect:connect_error().
79-type query_error() :: #error{}.
80
81
82-type type_name() :: atom().
83-type epgsql_type() :: type_name()
84                     | {array, type_name()}
85                     | {unknown_oid, integer()}.
86
87%% Deprecated
88-type pg_date() :: epgsql_codec_datetime:pg_date().
89-type pg_time() :: epgsql_codec_datetime:pg_time().
90-type pg_datetime() :: epgsql_codec_datetime:pg_datetime().
91-type pg_interval() :: epgsql_codec_datetime:pg_interval().
92
93%% Deprecated
94-type bind_param() :: any().
95
96-type typed_param() :: {epgsql_type(), bind_param()}.
97
98-type column() :: #column{}.
99-type statement() :: #statement{}.
100-type squery_row() :: tuple(). % tuple of binary().
101-type equery_row() :: tuple(). % tuple of bind_param().
102-type ok_reply(RowType) ::
103        %% select
104    {ok, ColumnsDescription :: [column()], RowsValues :: [RowType]} |
105        %% update/insert/delete
106    {ok, Count :: non_neg_integer()} |
107        %% update/insert/delete + returning
108    {ok, Count :: non_neg_integer(), ColumnsDescription :: [column()], RowsValues :: [RowType]}.
109-type error_reply() :: {error, query_error()}.
110-type reply(RowType) :: ok_reply(RowType) | error_reply().
111-type lsn() :: integer().
112-type cb_state() :: term().
113
114%% See https://github.com/erlang/rebar3/pull/1773
115-ifdef(FUN_STACKTRACE).
116-define(WITH_STACKTRACE(T, R, S), T:R -> S = erlang:get_stacktrace(), ).
117-else.
118-define(WITH_STACKTRACE(T, R, S), T:R:S ->).
119-endif.
120
121%% -- behaviour callbacks --
122
123%% Handles a XLogData Message (StartLSN, EndLSN, WALRecord, CbState).
124%% Return: {ok, LastFlushedLSN, LastAppliedLSN, NewCbState}
125-callback handle_x_log_data(lsn(), lsn(), binary(), cb_state()) -> {ok, lsn(), lsn(), cb_state()}.
126%% -------------
127
128%% -- client interface --
129-spec connect(connect_opts())
130        -> {ok, Connection :: connection()} | {error, Reason :: connect_error()}.
131connect(Settings0) ->
132    Settings = to_proplist(Settings0),
133    Host = proplists:get_value(host, Settings, "localhost"),
134    Username = proplists:get_value(username, Settings, os:getenv("USER")),
135    Password = proplists:get_value(password, Settings, ""),
136    connect(Host, Username, Password, Settings).
137
138connect(Host, Opts) ->
139    connect(Host, os:getenv("USER"), "", Opts).
140
141connect(Host, Username, Opts) ->
142    connect(Host, Username, "", Opts).
143
144-spec connect(host(), string(), string(), connect_opts())
145        -> {ok, Connection :: connection()} | {error, Reason :: connect_error()}.
146%% @doc connects to Postgres
147%% where
148%% `Host'     - host to connect to
149%% `Username' - username to connect as, defaults to `$USER'
150%% `Password' - optional password to authenticate with
151%% `Opts'     - proplist of extra options
152%% returns `{ok, Connection}' otherwise `{error, Reason}'
153connect(Host, Username, Password, Opts) ->
154    {ok, C} = epgsql_sock:start_link(),
155    connect(C, Host, Username, Password, Opts).
156
157-spec connect(connection(), host(), string(), string(), connect_opts())
158        -> {ok, Connection :: connection()} | {error, Reason :: connect_error()}.
159connect(C, Host, Username, Password, Opts0) ->
160    Opts = to_proplist(Opts0),
161    %% TODO connect timeout
162    case epgsql_sock:sync_command(
163           C, epgsql_cmd_connect, {Host, Username, Password, Opts}) of
164        connected ->
165            %% If following call fails for you, try to add {codecs, []} connect option
166            {ok, _} = maybe_update_typecache(C, Opts),
167            {ok, C};
168        Error = {error, _} ->
169            Error
170    end.
171
172maybe_update_typecache(C, Opts) ->
173    maybe_update_typecache(C, proplists:get_value(replication, Opts), proplists:get_value(codecs, Opts)).
174
175maybe_update_typecache(C, undefined, undefined) ->
176    %% TODO: don't execute 'update_type_cache' when `codecs` is undefined.
177    %% This will break backward compatibility
178    update_type_cache(C);
179maybe_update_typecache(C, undefined, [_ | _] = Codecs) ->
180    update_type_cache(C, Codecs);
181maybe_update_typecache(_, _, _) ->
182    {ok, []}.
183
184update_type_cache(C) ->
185    update_type_cache(C, [{epgsql_codec_hstore, []},
186                          {epgsql_codec_postgis, []}]).
187
188-spec update_type_cache(connection(), [{epgsql_codec:codec_mod(), Opts :: any()}]) ->
189                               epgsql_cmd_update_type_cache:response() | {error, empty}.
190update_type_cache(_C, []) ->
191    {error, empty};
192update_type_cache(C, Codecs) ->
193    %% {error, #error{severity = error,
194    %%                message = <<"column \"typarray\" does not exist in pg_type">>}}
195    %% Do not fail connect if pg_type table in not in the expected
196    %% format. Known to happen for Redshift which is based on PG v8.0.2
197    epgsql_sock:sync_command(C, epgsql_cmd_update_type_cache, Codecs).
198
199%% @doc close connection
200-spec close(connection()) -> ok.
201close(C) ->
202    epgsql_sock:close(C).
203
204-spec get_parameter(connection(), binary()) -> binary() | undefined.
205get_parameter(C, Name) ->
206    epgsql_sock:get_parameter(C, Name).
207
208-spec set_notice_receiver(connection(), undefined | pid() | atom()) ->
209                                 {ok, Previous :: pid() | atom()}.
210set_notice_receiver(C, PidOrName) ->
211    epgsql_sock:set_notice_receiver(C, PidOrName).
212
213%% @doc Returns last command status message
214%% If multiple queries were executed using `squery/2', separated by semicolon,
215%% only the last query's status will be available.
216%% See https://www.postgresql.org/docs/current/static/libpq-exec.html#LIBPQ-PQCMDSTATUS
217-spec get_cmd_status(connection()) -> {ok, Status}
218                                          when
219      Status :: undefined | atom() | {atom(), integer()}.
220get_cmd_status(C) ->
221    epgsql_sock:get_cmd_status(C).
222
223-spec squery(connection(), sql_query()) -> epgsql_cmd_squery:response().
224%% @doc runs simple `SqlQuery' via given `Connection'
225squery(Connection, SqlQuery) ->
226    epgsql_sock:sync_command(Connection, epgsql_cmd_squery, SqlQuery).
227
228equery(C, Sql) ->
229    equery(C, Sql, []).
230
231%% TODO add fast_equery command that doesn't need parsed statement
232equery(C, Sql, Parameters) ->
233    case parse(C, "", Sql, []) of
234        {ok, #statement{types = Types} = S} ->
235            TypedParameters = lists:zip(Types, Parameters),
236            epgsql_sock:sync_command(C, epgsql_cmd_equery, {S, TypedParameters});
237        Error ->
238            Error
239    end.
240
241-spec equery(connection(), string(), sql_query(), [bind_param()]) ->
242                    epgsql_cmd_equery:response().
243equery(C, Name, Sql, Parameters) ->
244    case parse(C, Name, Sql, []) of
245        {ok, #statement{types = Types} = S} ->
246            TypedParameters = lists:zip(Types, Parameters),
247            epgsql_sock:sync_command(C, epgsql_cmd_equery, {S, TypedParameters});
248        Error ->
249            Error
250    end.
251
252-spec prepared_query(C::connection(), Name::string(), Parameters::[bind_param()]) ->
253                            epgsql_cmd_prepared_query:response().
254prepared_query(C, Name, Parameters) ->
255    case describe(C, statement, Name) of
256        {ok, #statement{types = Types} = S} ->
257            TypedParameters = lists:zip(Types, Parameters),
258            epgsql_sock:sync_command(C, epgsql_cmd_prepared_query, {S, TypedParameters});
259        Error ->
260            Error
261    end.
262
263
264%% parse
265
266parse(C, Sql) ->
267    parse(C, Sql, []).
268
269parse(C, Sql, Types) ->
270    parse(C, "", Sql, Types).
271
272-spec parse(connection(), iolist(), sql_query(), [epgsql_type()]) ->
273                   epgsql_cmd_parse:response().
274parse(C, Name, Sql, Types) ->
275    sync_on_error(
276      C, epgsql_sock:sync_command(
277           C, epgsql_cmd_parse, {Name, Sql, Types})).
278
279%% bind
280
281bind(C, Statement, Parameters) ->
282    bind(C, Statement, "", Parameters).
283
284-spec bind(connection(), statement(), string(), [bind_param()]) ->
285                  epgsql_cmd_bind:response().
286bind(C, Statement, PortalName, Parameters) ->
287    sync_on_error(
288      C,
289      epgsql_sock:sync_command(
290        C, epgsql_cmd_bind, {Statement, PortalName, Parameters})).
291
292%% execute
293
294execute(C, S) ->
295    execute(C, S, "", 0).
296
297execute(C, S, N) ->
298    execute(C, S, "", N).
299
300-spec execute(connection(), statement(), string(), non_neg_integer()) -> Reply when
301      Reply :: epgsql_cmd_execute:response().
302execute(C, S, PortalName, N) ->
303    epgsql_sock:sync_command(C, epgsql_cmd_execute, {S, PortalName, N}).
304
305-spec execute_batch(connection(), [{statement(), [bind_param()]}]) ->
306                           epgsql_cmd_batch:response().
307execute_batch(C, Batch) ->
308    epgsql_sock:sync_command(C, epgsql_cmd_batch, Batch).
309
310%% statement/portal functions
311-spec describe(connection(), statement()) -> epgsql_cmd_describe_statement:response().
312describe(C, #statement{name = Name}) ->
313    describe(C, statement, Name).
314
315-spec describe(connection(), portal, iodata()) -> epgsql_cmd_describe_portal:response();
316              (connection(), statement, iodata()) -> epgsql_cmd_describe_statement:response().
317describe(C, statement, Name) ->
318    sync_on_error(
319      C, epgsql_sock:sync_command(
320           C, epgsql_cmd_describe_statement, Name));
321
322describe(C, portal, Name) ->
323    sync_on_error(
324      C, epgsql_sock:sync_command(
325           C, epgsql_cmd_describe_portal, Name)).
326
327%% @doc close statement
328-spec close(connection(), statement()) -> epgsql_cmd_close:response().
329close(C, #statement{name = Name}) ->
330    close(C, statement, Name).
331
332-spec close(connection(), statement | portal, iodata()) -> epgsql_cmd_close:response().
333close(C, Type, Name) ->
334    epgsql_sock:sync_command(C, epgsql_cmd_close, {Type, Name}).
335
336-spec sync(connection()) -> epgsql_cmd_sync:response().
337sync(C) ->
338    epgsql_sock:sync_command(C, epgsql_cmd_sync, []).
339
340-spec cancel(connection()) -> ok.
341cancel(C) ->
342    epgsql_sock:cancel(C).
343
344%% misc helper functions
345-spec with_transaction(connection(), fun((connection()) -> Reply)) ->
346                              Reply | {rollback, any()}
347                                  when
348      Reply :: any().
349with_transaction(C, F) ->
350    with_transaction(C, F, [{reraise, false}]).
351
352%% @doc Execute callback function with connection in a transaction.
353%% Transaction will be rolled back in case of exception.
354%% Options (proplist or map):
355%% - reraise (true): when set to true, exception will be re-thrown, otherwise
356%%   {rollback, ErrorReason} will be returned
357%% - ensure_comitted (false): even when callback returns without exception,
358%%   check that transaction was comitted by checking CommandComplete status
359%%   of "COMMIT" command. In case when transaction was rolled back, status will be
360%%   "rollback" instead of "commit".
361%% - begin_opts (""): append extra options to "BEGIN" command (see
362%%   https://www.postgresql.org/docs/current/static/sql-begin.html)
363%%   Beware of SQL injections! No escaping is made on begin_opts!
364-spec with_transaction(
365        connection(), fun((connection()) -> Reply), Opts) -> Reply | {rollback, any()} | no_return() when
366      Reply :: any(),
367      Opts :: [{reraise, boolean()} |
368               {ensure_committed, boolean()} |
369               {begin_opts, iodata()}].
370with_transaction(C, F, Opts0) ->
371    Opts = to_proplist(Opts0),
372    Begin = case proplists:get_value(begin_opts, Opts) of
373                undefined -> <<"BEGIN">>;
374                BeginOpts ->
375                    [<<"BEGIN ">> | BeginOpts]
376            end,
377    try
378        {ok, [], []} = squery(C, Begin),
379        R = F(C),
380        {ok, [], []} = squery(C, <<"COMMIT">>),
381        case proplists:get_value(ensure_committed, Opts, false) of
382            true ->
383                {ok, CmdStatus} = get_cmd_status(C),
384                (commit == CmdStatus) orelse error({ensure_committed_failed, CmdStatus});
385            false -> ok
386        end,
387        R
388    catch
389        ?WITH_STACKTRACE(Type, Reason, Stack)
390            squery(C, "ROLLBACK"),
391            case proplists:get_value(reraise, Opts, true) of
392                true ->
393                    erlang:raise(Type, Reason, Stack);
394                false ->
395                    {rollback, Reason}
396            end
397    end.
398
399sync_on_error(C, Error = {error, _}) ->
400    ok = sync(C),
401    Error;
402
403sync_on_error(_C, R) ->
404    R.
405
406-spec standby_status_update(connection(), lsn(), lsn()) -> ok.
407%% @doc sends last flushed and applied WAL positions to the server in a standby status update message via
408%% given `Connection'
409standby_status_update(Connection, FlushedLSN, AppliedLSN) ->
410    gen_server:call(Connection, {standby_status_update, FlushedLSN, AppliedLSN}).
411
412handle_x_log_data(Mod, StartLSN, EndLSN, WALRecord, Repl) ->
413    Mod:handle_x_log_data(StartLSN, EndLSN, WALRecord, Repl).
414
415-spec start_replication(connection(), string(), Callback, cb_state(), string(), string()) -> Response when
416      Response :: epgsql_cmd_start_replication:response(),
417      Callback :: module() | pid().
418%% @doc instructs Postgres server to start streaming WAL for logical replication
419%% where
420%% `Connection'      - connection in replication mode
421%% `ReplicationSlot' - the name of the replication slot to stream changes from
422%% `Callback'        - Callback module which should have the callback functions implemented for message processing.
423%%                      or a process which should be able to receive replication messages.
424%% `CbInitState'     - Callback Module's initial state
425%% `WALPosition'     - the WAL position XXX/XXX to begin streaming at.
426%%                      "0/0" to let the server determine the start point.
427%% `PluginOpts'      - optional options passed to the slot's logical decoding plugin.
428%%                      For example: "option_name1 'value1', option_name2 'value2'"
429%% returns `ok' otherwise `{error, Reason}'
430start_replication(Connection, ReplicationSlot, Callback, CbInitState, WALPosition, PluginOpts) ->
431    Command = {ReplicationSlot, Callback, CbInitState, WALPosition, PluginOpts},
432    epgsql_sock:sync_command(Connection, epgsql_cmd_start_replication, Command).
433
434start_replication(Connection, ReplicationSlot, Callback, CbInitState, WALPosition) ->
435    start_replication(Connection, ReplicationSlot, Callback, CbInitState, WALPosition, []).
436
437%% @private
438to_proplist(List) when is_list(List) ->
439    List;
440to_proplist(Map) ->
441    maps:to_list(Map).
442