1%% > Bind
2%% < BindComplete
3%% > Execute
4%% < DataRow*
5%% < CommandComplete
6%% -- Repeated many times --
7%% > Sync
8%% < ReadyForQuery
9-module(epgsql_cmd_batch).
10-behaviour(epgsql_command).
11-export([init/1, execute/2, handle_message/4]).
12-export_type([response/0]).
13
14-type response() :: [{ok, Count :: non_neg_integer(), Rows :: [tuple()]}
15                     | {ok, Count :: non_neg_integer()}
16                     | {ok, Rows :: [tuple()]}
17                     | {error, epgsql:query_error()}].
18
19-include("epgsql.hrl").
20-include("protocol.hrl").
21
22-record(batch,
23        {batch :: [{#statement{}, list()}],
24         decoder}).
25
26init(Batch) ->
27    #batch{batch = Batch}.
28
29execute(Sock, #batch{batch = Batch} = State) ->
30    Codec = epgsql_sock:get_codec(Sock),
31    Commands =
32        lists:foldr(
33          fun({Statement, Parameters}, Acc) ->
34                  #statement{name = StatementName,
35                             columns = Columns,
36                             types = Types} = Statement,
37                  TypedParameters = lists:zip(Types, Parameters),
38                  Bin1 = epgsql_wire:encode_parameters(TypedParameters, Codec),
39                  Bin2 = epgsql_wire:encode_formats(Columns),
40                  [{?BIND, [0, StatementName, 0, Bin1, Bin2]},
41                   {?EXECUTE, [0, <<0:?int32>>]} | Acc]
42          end,
43          [{?SYNC, []}],
44          Batch),
45    epgsql_sock:send_multi(Sock, Commands),
46    {ok, Sock, State}.
47
48handle_message(?BIND_COMPLETE, <<>>, Sock, #batch{batch = [{Stmt, _} | _]} = State) ->
49    #statement{columns = Columns} = Stmt,
50    Codec = epgsql_sock:get_codec(Sock),
51    Decoder = epgsql_wire:build_decoder(Columns, Codec),
52    {noaction, Sock, State#batch{decoder = Decoder}};
53handle_message(?DATA_ROW, <<_Count:?int16, Bin/binary>>, Sock,
54               #batch{decoder = Decoder} = State) ->
55    Row = epgsql_wire:decode_data(Bin, Decoder),
56    {add_row, Row, Sock, State};
57%% handle_message(?EMPTY_QUERY, _, Sock, _State) ->
58%%     Sock1 = epgsql_sock:add_result(Sock, {complete, empty}, {ok, [], []}),
59%%     {noaction, Sock1};
60handle_message(?COMMAND_COMPLETE, Bin, Sock,
61               #batch{batch = [{#statement{columns = Columns}, _} | Batch]} = State) ->
62    Complete = epgsql_wire:decode_complete(Bin),
63    Rows = epgsql_sock:get_rows(Sock),
64    Result = case Complete of
65                 {_, Count} when Columns == [] ->
66                     {ok, Count};
67                 {_, Count} ->
68                     {ok, Count, Rows};
69                 _ ->
70                     {ok, Rows}
71             end,
72    {add_result, Result, {complete, Complete}, Sock, State#batch{batch = Batch}};
73handle_message(?READY_FOR_QUERY, _Status, Sock, #batch{batch = B} = _State) when
74      length(B) =< 1 ->
75    Results = epgsql_sock:get_results(Sock),
76    {finish, Results, done, Sock};
77handle_message(?ERROR, Error, Sock, #batch{batch = [_ | Batch]} = State) ->
78    Result = {error, Error},
79    {add_result, Result, Result, Sock, State#batch{batch = Batch}};
80handle_message(_, _, _, _) ->
81    unknown.
82