%%% Copyright (C) 2008 - Will Glozer.  All rights reserved.
%%% Copyright (C) 2011 - Anton Lebedevich.  All rights reserved.

-module(epgsql).

-export([connect/1, connect/2, connect/3, connect/4, connect/5,
         close/1,
         get_parameter/2,
         set_notice_receiver/2,
         get_cmd_status/1,
         squery/2,
         equery/2, equery/3, equery/4,
         prepared_query/3,
         parse/2, parse/3, parse/4,
         describe/2, describe/3,
         bind/3, bind/4,
         execute/2, execute/3, execute/4,
         execute_batch/2,
         close/2, close/3,
         sync/1,
         cancel/1,
         update_type_cache/1,
         update_type_cache/2,
         with_transaction/2,
         with_transaction/3,
         sync_on_error/2,
         standby_status_update/3,
         start_replication/5,
         start_replication/6,
         to_proplist/1]).
-export([handle_x_log_data/5]).                 % private

-export_type([connection/0, connect_option/0, connect_opts/0,
              connect_error/0, query_error/0, sql_query/0, column/0,
              type_name/0, epgsql_type/0, statement/0]).

%% Deprecated types
-export_type([bind_param/0, typed_param/0,
              squery_row/0, equery_row/0, reply/1,
              pg_time/0, pg_date/0, pg_datetime/0, pg_interval/0]).

-include("epgsql.hrl").

-type sql_query() :: iodata().
-type host() :: inet:ip_address() | inet:hostname().
-type connection() :: pid().

%% A single connect option in proplist form.
-type connect_option() ::
    {host, host()} |
    {username, string()} |
    {password, string()} |
    {database, DBName :: string()} |
    {port, PortNum :: inet:port_number()} |
    {ssl, IsEnabled :: boolean() | required} |
    {ssl_opts, SslOptions :: [ssl:ssl_option()]} |    % see OTP ssl app, ssl_api.hrl
    {timeout, TimeoutMs :: timeout()} |               % default: 5000 ms
    {async, Receiver :: pid() | atom()} |             % process to receive LISTEN/NOTIFY msgs
    {codecs, Codecs :: [{epgsql_codec:codec_mod(), any()}]} |
    {replication, Replication :: string()}.           % Pass "database" to connect in replication mode

%% Connect options may be given either as a proplist or, when maps are
%% available, as a map. Both forms are normalised by to_proplist/1, so the
%% value types of the map keys mirror the ones of connect_option().
-ifdef(have_maps).
-type connect_opts() ::
        [connect_option()]
      | #{host => host(),
          username => string(),
          password => string(),
          database => string(),
          port => inet:port_number(),
          ssl => boolean() | required,
          ssl_opts => [ssl:ssl_option()],
          timeout => timeout(),
          %% Same receiver type as the proplist form: pid or registered name
          async => pid() | atom(),
          codecs => [{epgsql_codec:codec_mod(), any()}],
          replication => string()}.
-else.
-type connect_opts() :: [connect_option()].
-endif.

-type connect_error() :: epgsql_cmd_connect:connect_error().
-type query_error() :: #error{}.


-type type_name() :: atom().
-type epgsql_type() :: type_name()
                     | {array, type_name()}
                     | {unknown_oid, integer()}.

%% Deprecated
-type pg_date() :: epgsql_codec_datetime:pg_date().
-type pg_time() :: epgsql_codec_datetime:pg_time().
-type pg_datetime() :: epgsql_codec_datetime:pg_datetime().
-type pg_interval() :: epgsql_codec_datetime:pg_interval().

%% Deprecated
-type bind_param() :: any().

-type typed_param() :: {epgsql_type(), bind_param()}.

-type column() :: #column{}.
-type statement() :: #statement{}.
-type squery_row() :: tuple(). % tuple of binary().
-type equery_row() :: tuple(). % tuple of bind_param().
-type ok_reply(RowType) ::
    %% select
    {ok, ColumnsDescription :: [column()], RowsValues :: [RowType]} |
    %% update/insert/delete
    {ok, Count :: non_neg_integer()} |
    %% update/insert/delete + returning
    {ok, Count :: non_neg_integer(), ColumnsDescription :: [column()], RowsValues :: [RowType]}.
-type error_reply() :: {error, query_error()}.
-type reply(RowType) :: ok_reply(RowType) | error_reply().
-type lsn() :: integer().
-type cb_state() :: term().

%% Expands to the head of a catch clause that binds the stacktrace to S,
%% using whichever syntax the compiler supports.
%% See https://github.com/erlang/rebar3/pull/1773
-ifdef(FUN_STACKTRACE).
-define(WITH_STACKTRACE(T, R, S), T:R -> S = erlang:get_stacktrace(), ).
-else.
-define(WITH_STACKTRACE(T, R, S), T:R:S ->).
-endif.

%% -- behaviour callbacks --

%% Called for every XLogData message with (StartLSN, EndLSN, WALRecord, CbState).
%% Must return {ok, LastFlushedLSN, LastAppliedLSN, NewCbState}.
-callback handle_x_log_data(lsn(), lsn(), binary(), cb_state()) -> {ok, lsn(), lsn(), cb_state()}.
%% -------------

%% -- client interface --
-spec connect(connect_opts())
        -> {ok, Connection :: connection()} | {error, Reason :: connect_error()}.
%% @doc connects to Postgres using a single options term (proplist or map).
%% `host' defaults to "localhost", `username' to the value of `$USER' and
%% `password' to "". The full option list is passed on as `Opts'.
connect(Settings0) ->
    Props = to_proplist(Settings0),
    connect(proplists:get_value(host, Props, "localhost"),
            proplists:get_value(username, Props, os:getenv("USER")),
            proplists:get_value(password, Props, ""),
            Props).

%% Connects to `Host' as the user named by `$USER', with an empty password.
connect(Host, Opts) ->
    DefaultUser = os:getenv("USER"),
    connect(Host, DefaultUser, "", Opts).

%% Connects to `Host' as `Username', with an empty password.
connect(Host, Username, Opts) ->
    NoPassword = "",
    connect(Host, Username, NoPassword, Opts).

-spec connect(host(), string(), string(), connect_opts())
        -> {ok, Connection :: connection()} | {error, Reason :: connect_error()}.
%% @doc connects to Postgres over a freshly started `epgsql_sock' process
%% where
%% `Host'     - host to connect to
%% `Username' - username to connect as
%% `Password' - password to authenticate with
%% `Opts'     - extra options (proplist or map)
%% returns `{ok, Connection}' otherwise `{error, Reason}'
connect(Host, Username, Password, Opts) ->
    {ok, Sock} = epgsql_sock:start_link(),
    connect(Sock, Host, Username, Password, Opts).

-spec connect(connection(), host(), string(), string(), connect_opts())
        -> {ok, Connection :: connection()} | {error, Reason :: connect_error()}.
%% Runs the connect command on the already started connection process `C'
%% and, on success, refreshes the type cache according to `Opts'.
connect(C, Host, Username, Password, Opts0) ->
    Opts = to_proplist(Opts0),
    %% TODO connect timeout
    Command = {Host, Username, Password, Opts},
    case epgsql_sock:sync_command(C, epgsql_cmd_connect, Command) of
        {error, _} = Failure ->
            Failure;
        connected ->
            %% If the match below fails for you, try to add the
            %% {codecs, []} connect option
            {ok, _} = maybe_update_typecache(C, Opts),
            {ok, C}
    end.

%% Decides from the `replication' and `codecs' options whether and how
%% the type cache has to be refreshed.
maybe_update_typecache(C, Opts) ->
    Replication = proplists:get_value(replication, Opts),
    Codecs = proplists:get_value(codecs, Opts),
    maybe_update_typecache(C, Replication, Codecs).

maybe_update_typecache(C, undefined, [_ | _] = Codecs) ->
    %% Not a replication connection, explicit non-empty codec list
    update_type_cache(C, Codecs);
maybe_update_typecache(C, undefined, undefined) ->
    %% TODO: don't execute 'update_type_cache' when `codecs` is undefined.
    %% This will break backward compatibility
    update_type_cache(C);
maybe_update_typecache(_, _, _) ->
    %% Everything else (replication mode, empty codec list, ...): nothing to do
    {ok, []}.

%% Refreshes the type cache for the default codec set (hstore and postgis).
update_type_cache(C) ->
    DefaultCodecs = [{epgsql_codec_hstore, []},
                     {epgsql_codec_postgis, []}],
    update_type_cache(C, DefaultCodecs).

-spec update_type_cache(connection(), [{epgsql_codec:codec_mod(), Opts :: any()}]) ->
                               epgsql_cmd_update_type_cache:response() | {error, empty}.
update_type_cache(_C, []) ->
    {error, empty};
update_type_cache(C, Codecs) ->
    %% The reply of the command is handed back unchanged. It may be an error
    %% when the pg_type table is not in the expected format, e.g.
    %% {error, #error{severity = error,
    %%                message = <<"column \"typarray\" does not exist in pg_type">>}}
    %% Known to happen for Redshift which is based on PG v8.0.2
    Reply = epgsql_sock:sync_command(C, epgsql_cmd_update_type_cache, Codecs),
    Reply.

%% @doc close connection
-spec close(connection()) -> ok.
close(C) ->
    epgsql_sock:close(C).

%% Looks up parameter `Name' through the connection process.
-spec get_parameter(connection(), binary()) -> binary() | undefined.
get_parameter(C, Name) ->
    epgsql_sock:get_parameter(C, Name).

-spec set_notice_receiver(connection(), undefined | pid() | atom()) ->
                                 {ok, Previous :: pid() | atom()}.
set_notice_receiver(C, PidOrName) ->
    epgsql_sock:set_notice_receiver(C, PidOrName).

%% @doc Returns last command status message
%% When several semicolon-separated queries were run through `squery/2',
%% only the status of the last one is available.
%% See https://www.postgresql.org/docs/current/static/libpq-exec.html#LIBPQ-PQCMDSTATUS
-spec get_cmd_status(connection()) -> {ok, Status}
                                          when
      Status :: undefined | atom() | {atom(), integer()}.
get_cmd_status(C) ->
    epgsql_sock:get_cmd_status(C).

-spec squery(connection(), sql_query()) -> epgsql_cmd_squery:response().
%% @doc runs simple `SqlQuery' via given `Connection'
squery(Connection, SqlQuery) ->
    epgsql_sock:sync_command(Connection, epgsql_cmd_squery, SqlQuery).

%% Extended query without parameters.
equery(C, Sql) ->
    equery(C, Sql, []).

%% Extended query through the unnamed ("") statement.
%% TODO add fast_equery command that doesn't need parsed statement
equery(C, Sql, Parameters) ->
    equery(C, "", Sql, Parameters).

-spec equery(connection(), string(), sql_query(), [bind_param()]) ->
                    epgsql_cmd_equery:response().
%% Parses `Sql' as statement `Name', pairs every parameter with the type
%% reported for it by the parsed statement and runs the equery command.
%% Anything other than `{ok, Statement}' from parse/4 is returned as is.
equery(C, Name, Sql, Parameters) ->
    case parse(C, Name, Sql, []) of
        {ok, #statement{types = Types} = Statement} ->
            Typed = lists:zip(Types, Parameters),
            epgsql_sock:sync_command(C, epgsql_cmd_equery, {Statement, Typed});
        Other ->
            Other
    end.

-spec prepared_query(C::connection(), Name::string(), Parameters::[bind_param()]) ->
                            epgsql_cmd_prepared_query:response().
%% Describes statement `Name', pairs every parameter with the type reported
%% for it and runs the prepared_query command. Anything other than
%% `{ok, Statement}' from describe/3 is returned as is.
prepared_query(C, Name, Parameters) ->
    case describe(C, statement, Name) of
        {ok, #statement{types = Types} = Statement} ->
            Typed = lists:zip(Types, Parameters),
            epgsql_sock:sync_command(C, epgsql_cmd_prepared_query, {Statement, Typed});
        Other ->
            Other
    end.


%% parse

%% Parses `Sql' as the unnamed statement without explicit parameter types.
parse(C, Sql) ->
    parse(C, "", Sql, []).

%% Parses `Sql' as the unnamed statement with the given parameter types.
parse(C, Sql, Types) ->
    parse(C, "", Sql, Types).

-spec parse(connection(), iolist(), sql_query(), [epgsql_type()]) ->
                   epgsql_cmd_parse:response().
%% Runs the parse command; an `{error, _}' reply is followed by a sync
%% (see sync_on_error/2) before it is returned.
parse(C, Name, Sql, Types) ->
    Reply = epgsql_sock:sync_command(C, epgsql_cmd_parse, {Name, Sql, Types}),
    sync_on_error(C, Reply).

%% bind

%% Binds `Parameters' to `Statement' using the unnamed ("") portal.
bind(C, Statement, Parameters) ->
    bind(C, Statement, "", Parameters).

-spec bind(connection(), statement(), string(), [bind_param()]) ->
                  epgsql_cmd_bind:response().
%% Runs the bind command; an `{error, _}' reply is followed by a sync
%% (see sync_on_error/2) before it is returned.
bind(C, Statement, PortalName, Parameters) ->
    Reply = epgsql_sock:sync_command(
              C, epgsql_cmd_bind, {Statement, PortalName, Parameters}),
    sync_on_error(C, Reply).

%% execute

%% Executes the unnamed ("") portal with N = 0.
execute(C, S) ->
    execute(C, S, "", 0).

%% Executes the unnamed ("") portal with the given `N'.
execute(C, S, N) ->
    execute(C, S, "", N).

-spec execute(connection(), statement(), string(), non_neg_integer()) -> Reply when
      Reply :: epgsql_cmd_execute:response().
execute(C, S, PortalName, N) ->
    Command = {S, PortalName, N},
    epgsql_sock:sync_command(C, epgsql_cmd_execute, Command).

-spec execute_batch(connection(), [{statement(), [bind_param()]}]) ->
                           epgsql_cmd_batch:response().
execute_batch(C, Batch) ->
    epgsql_sock:sync_command(C, epgsql_cmd_batch, Batch).

%% statement/portal functions
-spec describe(connection(), statement()) -> epgsql_cmd_describe_statement:response().
describe(C, #statement{name = StatementName}) ->
    describe(C, statement, StatementName).

-spec describe(connection(), portal, iodata()) -> epgsql_cmd_describe_portal:response();
              (connection(), statement, iodata()) -> epgsql_cmd_describe_statement:response().
%% Describes the statement or portal called `Name'; an `{error, _}' reply is
%% followed by a sync (see sync_on_error/2) before it is returned.
describe(C, statement, Name) ->
    Reply = epgsql_sock:sync_command(C, epgsql_cmd_describe_statement, Name),
    sync_on_error(C, Reply);

describe(C, portal, Name) ->
    Reply = epgsql_sock:sync_command(C, epgsql_cmd_describe_portal, Name),
    sync_on_error(C, Reply).

%% @doc close statement
-spec close(connection(), statement()) -> epgsql_cmd_close:response().
close(C, #statement{name = StatementName}) ->
    close(C, statement, StatementName).

%% Closes the statement or portal called `Name'.
-spec close(connection(), statement | portal, iodata()) -> epgsql_cmd_close:response().
close(C, Type, Name) ->
    Target = {Type, Name},
    epgsql_sock:sync_command(C, epgsql_cmd_close, Target).

-spec sync(connection()) -> epgsql_cmd_sync:response().
sync(C) ->
    epgsql_sock:sync_command(C, epgsql_cmd_sync, []).

-spec cancel(connection()) -> ok.
cancel(C) ->
    epgsql_sock:cancel(C).

%% misc helper functions
-spec with_transaction(connection(), fun((connection()) -> Reply)) ->
                              Reply | {rollback, any()}
                                  when
      Reply :: any().
%% Same as with_transaction/3 with re-raising switched off: an exception in
%% the callback is reported as `{rollback, Reason}'.
with_transaction(C, F) ->
    Opts = [{reraise, false}],
    with_transaction(C, F, Opts).

%% @doc Execute callback function with connection in a transaction.
%% Transaction will be rolled back in case of exception.
%% Options (proplist or map):
%% - reraise (true): when set to true, the exception is re-thrown after the
%%   rollback, otherwise {rollback, ErrorReason} is returned
%% - ensure_committed (false): even when the callback returns without an
%%   exception, verify that the transaction was really committed by looking
%%   at the CommandComplete status of the "COMMIT" command. When the
%%   transaction was rolled back the status is "rollback" instead of "commit".
%% - begin_opts (""): extra options appended to the "BEGIN" command (see
%%   https://www.postgresql.org/docs/current/static/sql-begin.html)
%%   Beware of SQL injections! No escaping is made on begin_opts!
-spec with_transaction(
        connection(), fun((connection()) -> Reply), Opts) -> Reply | {rollback, any()} | no_return() when
      Reply :: any(),
      Opts :: [{reraise, boolean()} |
               {ensure_committed, boolean()} |
               {begin_opts, iodata()}].
with_transaction(C, F, Opts0) ->
    Opts = to_proplist(Opts0),
    %% "BEGIN", optionally followed by the caller supplied (unescaped) options
    BeginSql =
        case proplists:get_value(begin_opts, Opts) of
            undefined ->
                <<"BEGIN">>;
            ExtraOpts ->
                [<<"BEGIN ">> | ExtraOpts]
        end,
    try
        {ok, [], []} = squery(C, BeginSql),
        Result = F(C),
        {ok, [], []} = squery(C, <<"COMMIT">>),
        case proplists:get_value(ensure_committed, Opts, false) of
            false ->
                ok;
            true ->
                %% The status of the COMMIT command tells whether the
                %% transaction was committed or rolled back
                {ok, CmdStatus} = get_cmd_status(C),
                case CmdStatus of
                    commit -> ok;
                    _ -> error({ensure_committed_failed, CmdStatus})
                end
        end,
        Result
    catch
        ?WITH_STACKTRACE(Type, Reason, Stack)
            %% Any exception: roll back, then either re-raise or report it
            squery(C, "ROLLBACK"),
            case proplists:get_value(reraise, Opts, true) of
                true ->
                    erlang:raise(Type, Reason, Stack);
                false ->
                    {rollback, Reason}
            end
    end.

%% Passes `Reply' through unchanged; when it is an `{error, _}' tuple a
%% sync is issued on the connection first (and must return `ok').
sync_on_error(C, Reply) ->
    case Reply of
        {error, _} ->
            ok = sync(C),
            Reply;
        _ ->
            Reply
    end.

-spec standby_status_update(connection(), lsn(), lsn()) -> ok.
%% @doc sends last flushed and applied WAL positions to the server in a
%% standby status update message via given `Connection'
standby_status_update(Connection, FlushedLSN, AppliedLSN) ->
    Request = {standby_status_update, FlushedLSN, AppliedLSN},
    gen_server:call(Connection, Request).

%% Forwards an XLogData message to the `handle_x_log_data/4' callback of `Mod'.
handle_x_log_data(Mod, StartLSN, EndLSN, WALRecord, Repl) ->
    Mod:handle_x_log_data(StartLSN, EndLSN, WALRecord, Repl).

-spec start_replication(connection(), string(), Callback, cb_state(), string(), string()) -> Response when
      Response :: epgsql_cmd_start_replication:response(),
      Callback :: module() | pid().
%% @doc instructs Postgres server to start streaming WAL for logical replication
%% where
%% `Connection'      - connection in replication mode
%% `ReplicationSlot' - the name of the replication slot to stream changes from
%% `Callback'        - Callback module which should have the callback functions implemented for message processing.
%%                      or a process which should be able to receive replication messages.
%% `CbInitState'     - Callback Module's initial state
%% `WALPosition'     - the WAL position XXX/XXX to begin streaming at.
%%                      "0/0" to let the server determine the start point.
%% `PluginOpts'      - optional options passed to the slot's logical decoding plugin.
%%                      For example: "option_name1 'value1', option_name2 'value2'"
%% returns `ok' otherwise `{error, Reason}'
start_replication(Connection, ReplicationSlot, Callback, CbInitState, WALPosition, PluginOpts) ->
    epgsql_sock:sync_command(
      Connection,
      epgsql_cmd_start_replication,
      {ReplicationSlot, Callback, CbInitState, WALPosition, PluginOpts}).

%% Same as start_replication/6 with empty plugin options.
start_replication(Connection, ReplicationSlot, Callback, CbInitState, WALPosition) ->
    NoPluginOpts = [],
    start_replication(Connection, ReplicationSlot, Callback, CbInitState, WALPosition, NoPluginOpts).

%% @private
%% Normalises options: a list is returned untouched, anything else is
%% handed to maps:to_list/1.
to_proplist(Opts) ->
    case is_list(Opts) of
        true -> Opts;
        false -> maps:to_list(Opts)
    end.