1%%% File    : pgsql_proto.erl
2%%% Author  : Christian Sunesson <chrisu@kth.se>
3%%% Description : PostgreSQL protocol driver
4%%% Created :  9 May 2005
5
%%% This is the protocol handling part of the PostgreSQL driver; it turns
%%% on-wire packets into Erlang term messages and back.
8
9-module(pgsql_proto).
10
11%% TODO:
12%% When factorizing make clear distinction between message and packet.
13%% Packet == binary on-wire representation
14%% Message = parsed Packet as erlang terms.
15
16%%% Version 3.0 of the protocol.
17%%% Supported in postgres from version 7.4
18-define(PROTOCOL_MAJOR, 3).
19-define(PROTOCOL_MINOR, 0).
20
21%%% PostgreSQL protocol message codes
22-define(PG_BACKEND_KEY_DATA, $K).
23-define(PG_PARAMETER_STATUS, $S).
24-define(PG_ERROR_MESSAGE, $E).
25-define(PG_NOTICE_RESPONSE, $N).
26-define(PG_EMPTY_RESPONSE, $I).
27-define(PG_ROW_DESCRIPTION, $T).
28-define(PG_DATA_ROW, $D).
29-define(PG_READY_FOR_QUERY, $Z).
30-define(PG_AUTHENTICATE, $R).
31-define(PG_BIND, $B).
32-define(PG_PARSE, $P).
33-define(PG_COMMAND_COMPLETE, $C).
34-define(PG_PARSE_COMPLETE, $1).
35-define(PG_BIND_COMPLETE, $2).
36-define(PG_CLOSE_COMPLETE, $3).
37-define(PG_PORTAL_SUSPENDED, $s).
38-define(PG_NO_DATA, $n).
39-define(PG_COPY_RESPONSE, $G).
40
41-export([init/2, idle/2]).
42-export([run/1]).
43
44%% For protocol unwrapping, pgsql_tcp for example.
45-export([decode_packet/2]).
46-export([encode_message/2]).
47-export([encode/2]).
48
49-import(pgsql_util, [option/2]).
50-import(pgsql_util, [socket/1]).
51-import(pgsql_util, [send/2, send_int/2, send_msg/3]).
52-import(pgsql_util, [recv_msg/2, recv_msg/1, recv_byte/2, recv_byte/1]).
53-import(pgsql_util, [string/1, make_pair/2, split_pair/1]).
54-import(pgsql_util, [count_string/1, to_string/1]).
55-import(pgsql_util, [coldescs/2, datacoldescs/3]).
56
%% Forward Message to the driver process, whose pid is looked up under the
%% 'driver' key of the process dictionary (stored there by init/2).
%% Returns Message, which is the value of the send expression.
deliver(Message) ->
    get(driver) ! Message.
60
%% Start the protocol process.
%% Spawns pgsql_proto:init/2 linked to the caller, handing it the caller's
%% pid (used as the driver pid) and the connection Options.
%% Returns {ok, Pid} of the new process.
run(Options) ->
    Owner = self(),
    ProtoPid = spawn_link(pgsql_proto, init, [Owner, Options]),
    {ok, ProtoPid}.
65
%% Entry point of the protocol process (spawned by run/1).
%%
%% Stores Options and DriverPid in the process dictionary, opens a TCP socket
%% to the configured host/port (defaults: "localhost", 5432) and continues
%% with connect/1. If the socket cannot be opened, the driver is sent
%% {pgsql_error, {init, Error}} and this process exits with {init, Error}.
%%
%% TODO: We should use states instead of the process dictionary.
init(DriverPid, Options) ->
    put(options, Options),   % connection setup options
    put(driver, DriverPid),  % pid of the driver process

    %% pgsql_util:option/2 presumably reads the 'options' entry stored above;
    %% confirm against pgsql_util.
    Host = pgsql_util:option(host, "localhost"),
    Port = pgsql_util:option(port, 5432),

    case pgsql_util:socket({tcp, Host, Port}) of
        {ok, Sock} ->
            connect(Sock);
        Error ->
            Reason = {init, Error},
            DriverPid ! {pgsql_error, Reason},
            exit(Reason)
    end.
85
%% Send the protocol startup packet on Sock and move on to authentication.
%%
%% The packet body is the protocol version (major, minor as 16-bit integers)
%% followed by the user and database key/value pairs and a terminating zero
%% byte; it is prefixed with a 32-bit length that includes the length field
%% itself.
%% TODO: Check whether the default user/database values are relevant.
connect(Sock) ->
    UserName = pgsql_util:option(user, "cos"),
    DatabaseName = pgsql_util:option(database, "template1"),

    UserPair = pgsql_util:make_pair(user, UserName),
    DatabasePair = pgsql_util:make_pair(database, DatabaseName),
    Body = <<?PROTOCOL_MAJOR:16/integer, ?PROTOCOL_MINOR:16/integer,
             UserPair/binary,
             DatabasePair/binary,
             0>>,

    %% The backend continues with authentication after the startup packet.
    TotalSize = byte_size(Body) + 4,
    ok = gen_tcp:send(Sock, <<TotalSize:32/integer, Body/binary>>),
    authenticate(Sock).
106
107
%% Wait (up to 5 seconds) for the next authentication message from the
%% backend and react to it.
%%
%% Exits with {authentication, Message} on an error response, with
%% {protocol_error, Any} on any other non-authentication message, and with a
%% badmatch if receiving or decoding does not return the expected {ok, ...}.
authenticate(Sock) ->
    {ok, Code, Packet} = pgsql_util:recv_msg(Sock, 5000),
    {ok, Value} = decode_packet(Code, Packet),
    case Value of
        {error_message, Message} ->
            exit({authentication, Message});
        {authenticate, {AuthMethod, Salt}} ->
            auth_request(AuthMethod, Salt, Sock);
        Any ->
            exit({protocol_error, Any})
    end.

%% Answer one authentication request, dispatching on the numeric method code.
%% Password methods send the encoded password and loop back to
%% authenticate/1 to wait for the backend's verdict.
auth_request(0, _Salt, Sock) -> % Auth ok
    setup(Sock, []);
auth_request(1, _Salt, _Sock) -> % Kerberos 4
    exit({nyi, auth_kerberos4});
auth_request(2, _Salt, _Sock) -> % Kerberos 5
    exit({nyi, auth_kerberos5});
auth_request(3, _Salt, Sock) -> % Plaintext password
    Password = pgsql_util:option(password, ""),
    ok = pgsql_util:send(Sock, encode_message(pass_plain, Password)),
    authenticate(Sock);
auth_request(4, _Salt, _Sock) -> % Hashed (crypt) password
    exit({nyi, auth_crypt});
auth_request(5, Salt, Sock) -> % MD5 password
    Password = pgsql_util:option(password, ""),
    %% The user name is part of the MD5 hash, so the default must be the same
    %% one connect/1 puts in the startup packet ("cos"); with a different
    %% default the hash could never match when no user option is given.
    User = pgsql_util:option(user, "cos"),
    EncodedPass = encode_message(pass_md5, {User, Password, Salt}),
    ok = pgsql_util:send(Sock, EncodedPass),
    authenticate(Sock);
auth_request(AuthMethod, _Salt, _Sock) ->
    exit({authentication, {unknown, AuthMethod}}).
145
%% Post-authentication startup phase: receive backend messages (5 second
%% timeout each), accumulating connection parameters in Params, until the
%% backend reports ReadyForQuery.
setup(Sock, Params) ->
    {ok, Code, Package} = pgsql_util:recv_msg(Sock, 5000),
    {ok, Message} = decode_packet(Code, Package),
    setup_step(Message, Sock, Params).

%% Handle one decoded startup message.
%% BackendKeyData: remembered as {secret, {Pid, Secret}}; it is what a cancel
%% request needs.
setup_step({backend_key_data, {Pid, Secret}}, Sock, Params) ->
    setup(Sock, [{secret, {Pid, Secret}} | Params]);
%% ParameterStatus: a key/value pair, stored under {parameter, Key}.
setup_step({parameter_status, {Key, Value}}, Sock, Params) ->
    setup(Sock, [{{parameter, Key}, Value} | Params]);
%% ErrorResponse: close the socket and give up.
setup_step({error_message, Message}, Sock, _Params) ->
    gen_tcp:close(Sock),
    exit({error_response, Message});
%% NoticeResponse: pass it on to the driver and keep waiting.
setup_step({notice_response, Notice}, Sock, Params) ->
    deliver({pgsql_notice, Notice}),
    setup(Sock, Params);
%% ReadyForQuery: the backend is ready for a new query cycle. Publish the
%% collected parameters, keep a copy under 'params' and enter connected/1.
setup_step({ready_for_query, _Status}, Sock, Params) ->
    deliver({pgsql_params, Params}),
    deliver(pgsql_connected),
    put(params, Params),
    connected(Sock);
%% Anything else is reported to the driver, after which the connection is
%% treated as established anyway.
setup_step(Other, Sock, _Params) ->
    deliver({unknown_setup, Other}),
    connected(Sock).
180
%% Connected state: one-time preparation before requests are marshalled
%% between frontend and backend.
connected(Sock) ->
    DriverPid = get(driver),

    %% Protocol unwrapping process, factored out to make future SSL and unix
    %% domain socket support easier. It becomes the owner of the socket and
    %% is remembered under 'socket' in the process dictionary.
    Unwrapper = spawn_link(pgsql_tcp, loop0, [Sock, self()]),
    ok = gen_tcp:controlling_process(Sock, Unwrapper),
    put(socket, Unwrapper),

    %% Fetch the oid -> type name table once and keep it as a dict under
    %% 'oidmap' in the process dictionary.
    TypeQuery = encode_message(squery, "SELECT oid, typname FROM pg_type"),
    ok = pgsql_util:send(Sock, TypeQuery),
    {ok, [{"SELECT", _ColDesc, TypeRows}]} = process_squery([]),
    ToEntry = fun([OidText, NameText]) ->
                      Oid = list_to_integer(OidText),
                      TypeName = list_to_atom(NameText),
                      {Oid, TypeName}
              end,
    put(oidmap, dict:from_list(lists:map(ToEntry, TypeRows))),

    %% Ready to start marshalling between frontend and backend.
    idle(Sock, DriverPid).
213
%% Idle state: wait for the next request from the frontend, run it to
%% completion against the backend, reply, and return to idle.
%%
%% Requests carry the requester's Ref and pid. Because Pid is already bound,
%% only requests whose pid equals Pid are recognised; anything else ends up
%% in the final clause and terminates the process with
%% {unknown_request, Any}.
idle(Sock, Pid) ->
    receive
        %% Unexpected idle messages. No request should return to the idle
        %% state before a ready-for-query has been received.
        {message, Message} ->
            io:format("Unexpected message when idle: ~p~n", [Message]),
            idle(Sock, Pid);

        %% Socket closed or socket error messages.
        {socket, Sock, Condition} ->
            exit({socket, Condition});

        {terminate, Ref, Pid} ->
            idle_terminate(Sock, Pid, Ref);
        {squery, Ref, Pid, Query} ->
            idle_squery(Sock, Pid, Ref, Query);
        {equery, Ref, Pid, {Query, Params}} ->
            idle_equery(Sock, Pid, Ref, Query, Params);
        {prepare, Ref, Pid, {Name, Query}} ->
            idle_prepare(Sock, Pid, Ref, Name, Query);
        {unprepare, Ref, Pid, Name} ->
            idle_unprepare(Sock, Pid, Ref, Name);
        {execute, Ref, Pid, {Name, Params}} ->
            idle_execute(Sock, Pid, Ref, Name, Params);

        Any ->
            exit({unknown_request, Any})
    end.

%% Close the connection: send Terminate, close the socket, acknowledge to the
%% requester and exit with reason 'terminating' (unlinking first so the exit
%% is not propagated to the requester).
idle_terminate(Sock, Pid, Ref) ->
    ok = pgsql_util:send(Sock, encode_message(terminate, [])),
    gen_tcp:close(Sock),
    Pid ! {pgsql, Ref, terminated},
    unlink(Pid),
    exit(terminating).

%% Simple query. If any statement reported an error, a ROLLBACK is issued
%% before replying; its own result is discarded.
idle_squery(Sock, Pid, Ref, Query) ->
    ok = pgsql_util:send(Sock, encode_message(squery, Query)),
    {ok, Result} = process_squery([]),
    case lists:keymember(error, 1, Result) of
        true ->
            ok = pgsql_util:send(Sock, encode_message(squery, "ROLLBACK")),
            {ok, _RollbackResult} = process_squery([]);
        false ->
            ok
    end,
    Pid ! {pgsql, Ref, Result},
    idle(Sock, Pid).

%% Extended query; simplistic version using the unnamed prepared statement
%% and the unnamed portal, with all five messages sent in one write.
idle_equery(Sock, Pid, Ref, Query, Params) ->
    ParseP    = encode_message(parse, {"", Query, []}),
    BindP     = encode_message(bind, {"", "", Params, auto, [binary]}),
    DescribeP = encode_message(describe, {portal, ""}),
    ExecuteP  = encode_message(execute, {"", 0}),
    SyncP     = encode_message(sync, []),
    ok = pgsql_util:send(Sock, [ParseP, BindP, DescribeP, ExecuteP, SyncP]),

    {ok, Command, Desc, Status, Logs} = process_equery([]),
    Pid ! {pgsql, Ref, {Command, Status, column_name_types(Desc), Logs}},
    idle(Sock, Pid).

%% Prepare a named statement so it can be executed later on. The reply lists
%% the parameter type names and the {ColumnName, TypeName} result columns.
idle_prepare(Sock, Pid, Ref, Name, Query) ->
    send_message(Sock, parse, {Name, Query, []}),
    send_message(Sock, describe, {prepared_statement, Name}),
    send_message(Sock, sync, []),
    {ok, State, ParamDesc, ResultDesc} = process_prepare({[], []}),
    OidMap = get(oidmap),
    ParamTypes = lists:map(fun(Oid) -> dict:fetch(Oid, OidMap) end, ParamDesc),
    ResultNameTypes = column_name_types(ResultDesc),
    Pid ! {pgsql, Ref, {prepared, State, ParamTypes, ResultNameTypes}},
    idle(Sock, Pid).

%% Close a prepared statement.
idle_unprepare(Sock, Pid, Ref, Name) ->
    send_message(Sock, close, {prepared_statement, Name}),
    send_message(Sock, sync, []),
    {ok, _Status} = process_unprepare(),
    Pid ! {pgsql, Ref, unprepared},
    idle(Sock, Pid).

%% Execute a prepared statement through the unnamed portal.
%% Any backend message other than the one expected at each step terminates
%% the process with that message as exit reason (see await_backend/1).
idle_execute(Sock, Pid, Ref, Name, Params) ->
    %% Issue the first requests for the prepared statement.
    BindP     = encode_message(bind, {"", Name, Params, auto, [binary]}),
    DescribeP = encode_message(describe, {portal, ""}),
    ExecuteP  = encode_message(execute, {"", 0}),
    FlushP    = encode_message(flush, []),
    ok = pgsql_util:send(Sock, [BindP, DescribeP, ExecuteP, FlushP]),

    %% The bind reply comes first; the describe reply that follows tells
    %% process_execute/3 what kind of result to collect.
    _ = await_backend(bind_complete),
    {ok, Command, Result} = process_execute(Sock, Ref, Pid),

    %% Close the portal and end the extended query.
    CloseP = encode_message(close, {portal, ""}),
    SyncP  = encode_message(sync, []),
    ok = pgsql_util:send(Sock, [CloseP, SyncP]),
    _ = await_backend(close_complete),   % response to the close message
    _ = await_backend(ready_for_query),  % response to the sync message

    Pid ! {pgsql, Ref, {Command, Result}},
    idle(Sock, Pid).

%% Take the next backend message from the mailbox. It must be {Tag, Payload};
%% the payload is returned. Any other backend message terminates the process
%% with that message as the exit reason.
await_backend(Tag) ->
    receive
        {pgsql, {Tag, Payload}} ->
            Payload;
        {pgsql, Unknown} ->
            exit(Unknown)
    end.

%% Map column descriptions to {ColumnName, TypeName} pairs using the oid map
%% kept under 'oidmap'. dict:fetch/2 raises if an oid is not in the map.
column_name_types(Descs) ->
    OidMap = get(oidmap),
    lists:map(fun({ColName, _Format, _ColNo, Oid, _, _, _}) ->
                      {ColName, dict:fetch(Oid, OidMap)}
              end,
              Descs).
343
344
%% In the process_squery state we collect the responses to a simple query
%% until the backend is done processing (ReadyForQuery).
%%
%% Log accumulates, newest first, one entry per statement:
%%   {Command, Cols, Rows}  for a statement that returned rows,
%%   Command                for a statement that returned none,
%%   {error, Error}         for a statement that failed.
%% Returns {ok, Entries} in backend order. Other backend messages are
%% ignored.
process_squery(Log) ->
    receive
        {pgsql, {row_description, Cols}} ->
            case process_squery_cols([]) of
                {ok, Command, Rows} ->
                    process_squery([{Command, Cols, Rows} | Log]);
                {error, Error} ->
                    %% The statement failed while its rows were streamed;
                    %% the partial row set is dropped and the error logged
                    %% like any other statement error.
                    process_squery([{error, Error} | Log])
            end;
        {pgsql, {command_complete, Command}} ->
            process_squery([Command | Log]);
        {pgsql, {ready_for_query, _Status}} ->
            {ok, lists:reverse(Log)};
        {pgsql, {error_message, Error}} ->
            process_squery([{error, Error} | Log]);
        {pgsql, _Other} ->
            process_squery(Log)
    end.

%% Collect the data rows of one result set, converting every column from a
%% binary to a string.
%% Returns {ok, Command, Rows} once the statement completes, or
%% {error, Error} if the backend reports an error instead. Without the error
%% clause no CommandComplete would ever arrive and the receive would block
%% forever.
process_squery_cols(Rows) ->
    receive
        {pgsql, {data_row, Row}} ->
            process_squery_cols([lists:map(fun binary_to_list/1, Row) | Rows]);
        {pgsql, {command_complete, Command}} ->
            {ok, Command, lists:reverse(Rows)};
        {pgsql, {error_message, Error}} ->
            {error, Error}
    end.
368
%% Collect the responses to an extended query (Parse/Bind/Describe/Execute/
%% Sync) up to ReadyForQuery.
%%
%% Returns {ok, Command, Desc, Status, Log}:
%%   Command - command tag, or 'undefined' if no CommandComplete was seen
%%   Desc    - the row description as received, [] when there is none
%%   Status  - payload of the ReadyForQuery message
%%   Log     - decoded rows plus any other backend messages, in arrival order
%%
%% ParseComplete and BindComplete are consumed silently while waiting for the
%% description. A statement without a result set answers the Describe with
%% NoData, and a failing statement goes straight to ReadyForQuery; both are
%% handled here so the loop cannot wait forever for a row description that
%% never comes.
process_equery(Log) ->
    receive
        {pgsql, {parse_complete, _}} ->
            process_equery(Log);
        {pgsql, {bind_complete, _}} ->
            process_equery(Log);
        {pgsql, {row_description, Descs}} ->
            {ok, Types} = pgsql_util:decode_descs(Descs),
            process_equery_datarow(Types, Log, {undefined, Descs, undefined});
        {pgsql, {no_data, _}} ->
            %% No result set: nothing to decode, but CommandComplete and
            %% ReadyForQuery still follow.
            process_equery_datarow([], Log, {undefined, [], undefined});
        {pgsql, {ready_for_query, Status}} ->
            %% The query cycle ended before any description arrived
            %% (typically after an error, which is already in Log).
            {ok, undefined, [], Status, lists:reverse(Log)};
        {pgsql, Any} ->
            process_equery([Any | Log])
    end.

%% Second phase of process_equery/1: decode data rows with Types until
%% ReadyForQuery. Info carries the {Command, Desc, Status} collected so far.
process_equery_datarow(Types, Log, Info = {Command, Desc, Status}) ->
    receive
        {pgsql, {command_complete, Command1}} ->
            process_equery_datarow(Types, Log, {Command1, Desc, Status});
        {pgsql, {ready_for_query, Status1}} ->
            {ok, Command, Desc, Status1, lists:reverse(Log)};
        {pgsql, {data_row, Row}} ->
            {ok, DecodedRow} = pgsql_util:decode_row(Types, Row),
            process_equery_datarow(Types, [DecodedRow | Log], Info);
        {pgsql, Any} ->
            process_equery_datarow(Types, [Any | Log], Info)
    end.
398
%% Collect the responses to Parse + Describe(statement) + Sync.
%% The argument is {ParamDesc, ResultDesc} as gathered so far; the result is
%% {ok, Status, ParamDesc, ResultDesc} once ReadyForQuery arrives.
process_prepare({_, _} = Info) ->
    receive
        {pgsql, Message} ->
            prepare_step(Message, Info)
    end.

%% Fold one backend message into the description gathered so far.
prepare_step({no_data, _}, {ParamDesc, _}) ->
    %% The statement returns no rows.
    process_prepare({ParamDesc, []});
prepare_step({parse_complete, _}, Info) ->
    process_prepare(Info);
prepare_step({parameter_description, Oids}, {_, ResultDesc}) ->
    process_prepare({Oids, ResultDesc});
prepare_step({row_description, Desc}, {ParamDesc, _}) ->
    process_prepare({ParamDesc, Desc});
prepare_step({ready_for_query, Status}, {ParamDesc, ResultDesc}) ->
    {ok, Status, ParamDesc, ResultDesc};
prepare_step(Any, Info) ->
    %% Anything else is only printed, then ignored.
    io:format("process_prepare: ~p~n", [Any]),
    process_prepare(Info).
415
%% Collect the responses to Close(statement) + Sync.
%% Returns {ok, Status} when ReadyForQuery arrives. The expected
%% CloseComplete is consumed silently; any other backend message is printed
%% and skipped.
process_unprepare() ->
    receive
        {pgsql, {ready_for_query, Status}} ->
            {ok, Status};
        {pgsql, {close_complete, _}} ->
            process_unprepare();
        {pgsql, Any} ->
            io:format("process_unprepare: ~p~n", [Any]),
            process_unprepare()
    end.
426
%% Collect the result of executing a portal.
%% The first backend message is the answer to the Describe request: either
%% NoData (the statement returns no rows) or a RowDescription. Any other
%% message terminates the process with that message as exit reason.
%% Returns {ok, Command, Result} as produced by process_execute_nodata/0 or
%% process_execute_resultset/5.
process_execute(Sock, Ref, Pid) ->
    receive
        {pgsql, Reply} ->
            case Reply of
                {no_data, _} ->
                    {ok, _Command, _Result} = process_execute_nodata();
                {row_description, Descs} ->
                    {ok, Types} = pgsql_util:decode_descs(Descs),
                    {ok, _Command, _Rows} =
                        process_execute_resultset(Sock, Ref, Pid, Types, []);
                Unknown ->
                    exit(Unknown)
            end
    end.
442
%% Wait for the CommandComplete of a statement that returns no rows and turn
%% its command tag into {ok, Command, Result}. Any other backend message
%% terminates the process with that message as exit reason.
process_execute_nodata() ->
    receive
        {pgsql, {command_complete, Tag}} ->
            nodata_result(Tag);
        {pgsql, Unknown} ->
            exit(Unknown)
    end.

%% Interpret a command tag. The numbers in the tag are read with
%% erl_scan:string/1; a tag that does not scan to the expected integers
%% fails with a badmatch.
nodata_result("INSERT " ++ Counts) ->
    %% Two integers follow; the second is returned as the row count.
    {ok, [{integer, _, _Table}, {integer, _, NRows}], _} =
        erl_scan:string(Counts),
    {ok, 'INSERT', NRows};
nodata_result("SELECT") ->
    {ok, 'SELECT', should_not_happen};
nodata_result("DELETE " ++ Count) ->
    {ok, [{integer, _, NRows}], _} = erl_scan:string(Count),
    {ok, 'DELETE', NRows};
nodata_result(Other) ->
    %% Every other tag is handed back untouched, marked as not yet
    %% implemented.
    {ok, nyi, Other}.
%% Collect the data rows of an executed portal, decoding each with Types.
%% Returns {ok, Command, Rows} where Command is the command word of the
%% completion tag as an atom and Rows are in backend order.
%% Throws portal_suspended if the backend suspends the portal; any other
%% backend message terminates the process with that message as exit reason.
%% Sock, Ref and Pid are not used here; they are only passed along.
process_execute_resultset(Sock, Ref, Pid, Types, Log) ->
    receive
        {pgsql, {command_complete, Command}} ->
            %% Only the leading word of the tag becomes an atom. A tag such
            %% as "SELECT 5" carries a row count, and converting the whole
            %% string would create a new, never-collected atom for every
            %% distinct count; it would also disagree with the bare
            %% 'INSERT' / 'DELETE' atoms of process_execute_nodata/0.
            Word = lists:takewhile(fun(C) -> C =/= $\s end, Command),
            {ok, list_to_atom(Word), lists:reverse(Log)};
        {pgsql, {data_row, Row}} ->
            {ok, DecodedRow} = pgsql_util:decode_row(Types, Row),
            process_execute_resultset(Sock, Ref, Pid, Types,
                                      [DecodedRow | Log]);
        {pgsql, {portal_suspended, _}} ->
            throw(portal_suspended);
        {pgsql, Any} ->
            exit(Any)
    end.
477
%% Decode a backend packet.
%% Code is the one-byte message type and Packet its payload (without the
%% type byte and length header). Returns {ok, {MessageName, Value}}; a
%% message type that is not known yields {ok, {unknown, [Code]}}.
%% A payload that does not have the expected layout fails with a badmatch.
decode_packet(?PG_ERROR_MESSAGE, Packet) ->
    {ok, {error_message, pgsql_util:errordesc(Packet)}};
decode_packet(?PG_EMPTY_RESPONSE, _Packet) ->
    {ok, {empty_response, []}};
decode_packet(?PG_ROW_DESCRIPTION, Packet) ->
    %% The leading 16-bit column count is skipped.
    <<_Columns:16/integer, ColDescs/binary>> = Packet,
    {ok, {row_description, pgsql_util:coldescs(ColDescs, [])}};
decode_packet(?PG_READY_FOR_QUERY, Packet) ->
    <<State:8/integer>> = Packet,
    Status = case State of
                 $I -> idle;
                 $T -> transaction;
                 $E -> failed_transaction
             end,
    {ok, {ready_for_query, Status}};
decode_packet(?PG_COMMAND_COMPLETE, Packet) ->
    {Task, _} = pgsql_util:to_string(Packet),
    {ok, {command_complete, Task}};
decode_packet(?PG_DATA_ROW, Packet) ->
    <<NumberCol:16/integer, RowData/binary>> = Packet,
    {ok, {data_row, pgsql_util:datacoldescs(NumberCol, RowData, [])}};
decode_packet(?PG_BACKEND_KEY_DATA, Packet) ->
    <<Pid:32/integer, Secret:32/integer>> = Packet,
    {ok, {backend_key_data, {Pid, Secret}}};
decode_packet(?PG_PARAMETER_STATUS, Packet) ->
    {Key, Value} = pgsql_util:split_pair(Packet),
    {ok, {parameter_status, {Key, Value}}};
decode_packet(?PG_NOTICE_RESPONSE, _Packet) ->
    %% The contents of a notice are not decoded.
    {ok, {notice_response, []}};
decode_packet(?PG_AUTHENTICATE, Packet) ->
    %% Whatever follows the method code is handed on as the salt.
    <<AuthMethod:32/integer, Salt/binary>> = Packet,
    {ok, {authenticate, {AuthMethod, Salt}}};
decode_packet(?PG_PARSE_COMPLETE, _Packet) ->
    {ok, {parse_complete, []}};
decode_packet(?PG_BIND_COMPLETE, _Packet) ->
    {ok, {bind_complete, []}};
decode_packet(?PG_PORTAL_SUSPENDED, _Packet) ->
    {ok, {portal_suspended, []}};
decode_packet(?PG_CLOSE_COMPLETE, _Packet) ->
    {ok, {close_complete, []}};
decode_packet(?PG_COPY_RESPONSE, Packet) ->
    <<FormatCode:8/integer, _ColN:16/integer, ColFormat/binary>> = Packet,
    Format = case FormatCode of
                 0 -> text;
                 1 -> binary
             end,
    Cols = pgsql_util:int16(ColFormat, []),
    {ok, {copy_response, {Format, Cols}}};
decode_packet($t, Packet) ->
    %% ParameterDescription; the leading 16-bit parameter count is skipped.
    <<_NParams:16/integer, OidsP/binary>> = Packet,
    {ok, {parameter_description, pgsql_util:oids(OidsP, [])}};
decode_packet(?PG_NO_DATA, _Packet) ->
    {ok, {no_data, []}};
decode_packet(Code, _Packet) ->
    {ok, {unknown, [Code]}}.
546
%% Encode a message of the given Type from Values and send it on Sock.
%% Returns ok; fails with a badmatch if the send does not return ok.
send_message(Sock, Type, Values) ->
    ok = pgsql_util:send(Sock, encode_message(Type, Values)).
551
%% Add the header to a message: one byte of message type Code, then a
%% 32-bit length that counts the payload plus the length field itself (but
%% not the type byte), then the payload.
encode(Code, Packet) ->
    <<Code:8/integer, (byte_size(Packet) + 4):32/integer, Packet/binary>>.
556
%% Encode a frontend message of the given type into its on-wire packet.
%% The second argument depends on the type; types that carry no data ignore
%% it.
encode_message(pass_plain, Password) ->
    encode($p, pgsql_util:pass_plain(Password));
encode_message(pass_md5, {User, Password, Salt}) ->
    encode($p, pgsql_util:pass_md5(User, Password, Salt));
encode_message(terminate, _) ->
    encode($X, <<>>);
encode_message(copydone, _) ->
    encode($c, <<>>);
encode_message(copyfail, Msg) ->
    encode($f, pgsql_util:string(Msg));
encode_message(copy, Data) ->
    encode($d, Data);
encode_message(squery, Query) -> % squery as in simple query.
    encode($Q, pgsql_util:string(Query));
encode_message(close, {Object, Name}) ->
    Kind = case Object of
               prepared_statement -> $S;
               portal -> $P
           end,
    NameP = pgsql_util:string(Name),
    encode($C, <<Kind/integer, NameP/binary>>);
encode_message(describe, {Object, Name}) ->
    Kind = case Object of
               prepared_statement -> $S;
               portal -> $P
           end,
    NameP = pgsql_util:string(Name),
    encode($D, <<Kind:8/integer, NameP/binary>>);
encode_message(flush, _) ->
    encode($H, <<>>);
encode_message(parse, {Name, Query, Oids}) ->
    %% Statement name, query text, then the count of parameter type oids
    %% followed by the oids as 32-bit integers.
    NameP = pgsql_util:string(Name),
    QueryP = pgsql_util:string(Query),
    OidCount = length(Oids),
    OidsP = list_to_binary([<<Oid:32/integer>> || Oid <- Oids]),
    encode($P, <<NameP/binary, QueryP/binary,
                 OidCount:16/integer, OidsP/binary>>);
encode_message(bind, {NamePortal, NamePrepared,
                      Parameters, ParamsFormats, ResultFormats}) ->
    PortalP = pgsql_util:string(NamePortal),
    PreparedP = pgsql_util:string(NamePrepared),
    NParameters = length(Parameters),

    %% Parameter format codes: none at all, a single code applying to every
    %% parameter, or (auto) one code per parameter chosen from its type.
    {NParamFormats, ParamFormatsP} =
        case ParamsFormats of
            none ->
                {0, <<>>};
            binary ->
                {1, <<1:16/integer>>};
            text ->
                {1, <<0:16/integer>>};
            auto ->
                Codes = [bind_param_format(P) || P <- Parameters],
                {NParameters, list_to_binary(Codes)}
        end,

    ParametersP = list_to_binary([bind_param_value(P) || P <- Parameters]),

    NResultFormats = length(ResultFormats),
    ResultFormatsP =
        list_to_binary([bind_result_format(F) || F <- ResultFormats]),

    encode($B, <<PortalP/binary, PreparedP/binary,
                 NParamFormats:16/integer, ParamFormatsP/binary,
                 NParameters:16/integer, ParametersP/binary,
                 NResultFormats:16/integer, ResultFormatsP/binary>>);
encode_message(execute, {Portal, Limit}) ->
    PortalP = pgsql_util:string(Portal),
    encode($E, <<PortalP/binary, Limit:32/integer>>);
encode_message(sync, _) ->
    encode($S, <<>>).

%% Format code for one Bind parameter in 'auto' mode: binaries are sent in
%% binary format (1), everything else in text format (0).
bind_param_format(Bin) when is_binary(Bin) -> <<1:16/integer>>;
bind_param_format(_Text) -> <<0:16/integer>>.

%% Encode one Bind parameter value as a 32-bit length followed by the data.
%% null is sent as length -1 with no data; integers are sent as their decimal
%% text; anything else must be acceptable to list_to_binary/1.
bind_param_value(null) ->
    <<(-1):32/integer>>;
bind_param_value(Bin) when is_binary(Bin) ->
    length_prefixed(Bin);
bind_param_value(Integer) when is_integer(Integer) ->
    length_prefixed(list_to_binary(integer_to_list(Integer)));
bind_param_value(Text) ->
    length_prefixed(list_to_binary(Text)).

%% Prefix a binary with its size in bytes as a 32-bit integer.
length_prefixed(Bin) ->
    <<(byte_size(Bin)):32/integer, Bin/binary>>.

%% Format code for one requested result column.
bind_result_format(binary) -> <<1:16/integer>>;
bind_result_format(text) -> <<0:16/integer>>.
650
651
652