1-module(rabbit_msg_record).
2
3-export([
4         init/1,
5         to_iodata/1,
6         from_amqp091/2,
7         to_amqp091/1,
8         add_message_annotations/2,
9         message_annotation/2,
10         message_annotation/3
11         ]).
12
13-include_lib("rabbit_common/include/rabbit.hrl").
14-include_lib("rabbit_common/include/rabbit_framing.hrl").
15-include_lib("amqp10_common/include/amqp10_framing.hrl").
16
17-type maybe(T) :: T | undefined.
18-type amqp10_data() :: #'v1_0.data'{} |
19                       [#'v1_0.amqp_sequence'{} | #'v1_0.data'{}] |
20                       #'v1_0.amqp_value'{}.
21-record(msg,
22        {
23         % header :: maybe(#'v1_0.header'{}),
24         % delivery_annotations :: maybe(#'v1_0.delivery_annotations'{}),
25         message_annotations :: maybe(#'v1_0.message_annotations'{}),
26         properties :: maybe(#'v1_0.properties'{}),
27         application_properties :: maybe(#'v1_0.application_properties'{}),
28         data :: maybe(amqp10_data())
29         % footer :: maybe(#'v1_0.footer'{})
30         }).
31
32%% holds static or rarely changing fields
33-record(cfg, {}).
34-record(?MODULE, {cfg :: #cfg{},
35                  msg :: #msg{},
36                  %% holds a list of modifications to various sections
37                  changes = [] :: list()}).
38
39-opaque state() :: #?MODULE{}.
40
41-export_type([
42              state/0
43              ]).
44
45%% this module acts as a wrapper / converter for the internal binar storage format
46%% (AMQP 1.0) and any format it needs to be converted to / from.
47%% Efficiency is key. No unnecessary allocations or work should be done until it
48%% is absolutely needed
49
50%% init from an AMQP 1.0 encoded binary
51-spec init(binary()) -> state().
52init(Bin) when is_binary(Bin) ->
53    %% TODO: delay parsing until needed
54    {MA, P, AP, D} = decode(amqp10_framing:decode_bin(Bin),
55                            {undefined, undefined, undefined, undefined}),
56    #?MODULE{cfg = #cfg{},
57             msg = #msg{properties = P,
58                        application_properties = AP,
59                        message_annotations = MA,
60                        data = D}}.
61
62decode([], Acc) ->
63    Acc;
64decode([#'v1_0.message_annotations'{} = MA | Rem], {_, P, AP, D}) ->
65    decode(Rem, {MA, P, AP, D});
66decode([#'v1_0.properties'{} = P | Rem], {MA, _, AP, D}) ->
67    decode(Rem, {MA, P, AP, D});
68decode([#'v1_0.application_properties'{} = AP | Rem], {MA, P, _, D}) ->
69    decode(Rem, {MA, P, AP, D});
70decode([#'v1_0.data'{} = D | Rem], {MA, P, AP, _}) ->
71    decode(Rem, {MA, P, AP, D}).
72
73amqp10_properties_empty(#'v1_0.properties'{message_id = undefined,
74                                           user_id = undefined,
75                                           to = undefined,
76                                           % subject = wrap(utf8, RKey),
77                                           reply_to = undefined,
78                                           correlation_id = undefined,
79                                           content_type = undefined,
80                                           content_encoding = undefined,
81                                           creation_time = undefined}) ->
82    true;
83amqp10_properties_empty(_) ->
84    false.
85
86%% to realise the final binary data representation
87-spec to_iodata(state()) -> iodata().
88to_iodata(#?MODULE{msg = #msg{properties = P,
89                              application_properties = AP,
90                              message_annotations = MA,
91                              data = Data}}) ->
92    [
93     case MA of
94         #'v1_0.message_annotations'{content = []} ->
95             <<>>;
96         _ ->
97             amqp10_framing:encode_bin(MA)
98     end,
99     case amqp10_properties_empty(P) of
100         true -> <<>>;
101         false ->
102             amqp10_framing:encode_bin(P)
103     end,
104     case AP of
105         #'v1_0.application_properties'{content = []} ->
106             <<>>;
107         _ ->
108             amqp10_framing:encode_bin(AP)
109     end,
110     amqp10_framing:encode_bin(Data)
111    ].
112
113%% TODO: refine type spec here
114-spec add_message_annotations(#{binary() => {atom(), term()}}, state()) ->
115    state().
116add_message_annotations(Anns,
117                        #?MODULE{msg =
118                                 #msg{message_annotations = MA0} = Msg} = State) ->
119    Content = maps:fold(
120                fun (K, {T, V}, Acc) ->
121                        map_add(symbol, K, T, V, Acc)
122                end,
123                case MA0 of
124                    undefined -> [];
125                    #'v1_0.message_annotations'{content = C} -> C
126                end,
127                Anns),
128
129    State#?MODULE{msg =
130                  Msg#msg{message_annotations =
131                            #'v1_0.message_annotations'{content = Content}}}.
132
133%% TODO: refine
134-type amqp10_term() :: {atom(), term()}.
135
136-spec message_annotation(binary(), state()) -> undefined | amqp10_term().
137message_annotation(Key, State) ->
138    message_annotation(Key, State, undefined).
139
140-spec message_annotation(binary(), state(), undefined | amqp10_term()) ->
141    undefined | amqp10_term().
142message_annotation(_Key, #?MODULE{msg = #msg{message_annotations = undefined}},
143                  Default) ->
144    Default;
145message_annotation(Key,
146                   #?MODULE{msg =
147                            #msg{message_annotations =
148                                 #'v1_0.message_annotations'{content = Content}}},
149                  Default)
150  when is_binary(Key) ->
151    case lists:search(fun ({{symbol, K}, _}) -> K == Key end, Content) of
152        {value, {_K, V}} ->
153            V;
154        false ->
155            Default
156    end.
157
158
159%% take a binary AMQP 1.0 input function,
160%% parses it and returns the current parse state
161%% this is the input function from storage and from, e.g. socket input
162-spec from_amqp091(#'P_basic'{}, iodata()) -> state().
163from_amqp091(#'P_basic'{message_id = MsgId,
164                        expiration = Expiration,
165                        delivery_mode = DelMode,
166                        headers = Headers,
167                        user_id = UserId,
168                        reply_to = ReplyTo,
169                        type = Type,
170                        priority = Priority,
171                        app_id = AppId,
172                        correlation_id = CorrId,
173                        content_type = ContentType,
174                        content_encoding = ContentEncoding,
175                        timestamp = Timestamp
176                       }, Data) ->
177    %% TODO: support parsing properties bin directly?
178    ConvertedTs = case Timestamp of
179                      undefined ->
180                          undefined;
181                      _ ->
182                          Timestamp * 1000
183                  end,
184    P = #'v1_0.properties'{message_id = wrap(utf8, MsgId),
185                           user_id = wrap(binary, UserId),
186                           to = undefined,
187                           % subject = wrap(utf8, RKey),
188                           reply_to = wrap(utf8, ReplyTo),
189                           correlation_id = wrap(utf8, CorrId),
190                           content_type = wrap(symbol, ContentType),
191                           content_encoding = wrap(symbol, ContentEncoding),
192                           creation_time = wrap(timestamp, ConvertedTs)},
193
194    APC0 = [{wrap(utf8, K), from_091(T, V)} || {K, T, V}
195                                               <- case Headers of
196                                                      undefined -> [];
197                                                      _ -> Headers
198                                                  end, not unsupported_header_value_type(T)],
199    %% properties that do not map directly to AMQP 1.0 properties are stored
200    %% in application properties
201    APC = map_add(utf8, <<"x-basic-type">>, utf8, Type,
202                  map_add(utf8, <<"x-basic-app-id">>, utf8, AppId, APC0)),
203
204    MAC = map_add(symbol, <<"x-basic-priority">>, ubyte, Priority,
205                  map_add(symbol, <<"x-basic-delivery-mode">>, ubyte, DelMode,
206                          map_add(symbol, <<"x-basic-expiration">>, utf8, Expiration, []))),
207
208    AP = #'v1_0.application_properties'{content = APC},
209    MA = #'v1_0.message_annotations'{content = MAC},
210    #?MODULE{cfg = #cfg{},
211             msg = #msg{properties = P,
212                        application_properties = AP,
213                        message_annotations = MA,
214                        data = #'v1_0.data'{content = Data}}}.
215
216map_add(_T, _Key, _Type, undefined, Acc) ->
217    Acc;
218map_add(KeyType, Key, Type, Value, Acc) ->
219    [{wrap(KeyType, Key), wrap(Type, Value)} | Acc].
220
221-spec to_amqp091(state()) -> {#'P_basic'{}, iodata()}.
222to_amqp091(#?MODULE{msg = #msg{properties = P,
223                               application_properties = APR,
224                               message_annotations = MAR,
225                               data = #'v1_0.data'{content = Payload}}}) ->
226    #'v1_0.properties'{message_id = MsgId,
227                       user_id = UserId,
228                       reply_to = ReplyTo0,
229                       correlation_id = CorrId,
230                       content_type = ContentType,
231                       content_encoding = ContentEncoding,
232                       creation_time = Timestamp} = case P of
233                                                        undefined ->
234                                                            #'v1_0.properties'{};
235                                                        _ ->
236                                                            P
237                                                    end,
238
239    AP0 = case APR of
240              #'v1_0.application_properties'{content = AC} -> AC;
241              _ -> []
242          end,
243    MA0 = case MAR of
244              #'v1_0.message_annotations'{content = MC} -> MC;
245              _ -> []
246          end,
247
248    {Type, AP1} = amqp10_map_get(utf8(<<"x-basic-type">>), AP0),
249    {AppId, AP} = amqp10_map_get(utf8(<<"x-basic-app-id">>), AP1),
250
251    {Priority, MA1} = amqp10_map_get(symbol(<<"x-basic-priority">>), MA0),
252    {DelMode, MA2} = amqp10_map_get(symbol(<<"x-basic-delivery-mode">>), MA1),
253    {Expiration, _MA} = amqp10_map_get(symbol(<<"x-basic-expiration">>), MA2),
254
255    Headers0 = [to_091(unwrap(K), V) || {K, V} <- AP],
256    {Headers1, MsgId091} = message_id(MsgId, <<"x-message-id-type">>, Headers0),
257    {Headers, CorrId091} = message_id(CorrId, <<"x-correlation-id-type">>, Headers1),
258
259    BP = #'P_basic'{message_id =  MsgId091,
260                    delivery_mode = DelMode,
261                    expiration = Expiration,
262                    user_id = unwrap(UserId),
263                    headers = case Headers of
264                                  [] -> undefined;
265                                  _ -> Headers
266                              end,
267                    reply_to = unwrap(ReplyTo0),
268                    type = Type,
269                    app_id = AppId,
270                    priority = Priority,
271                    correlation_id = CorrId091,
272                    content_type = unwrap(ContentType),
273                    content_encoding = unwrap(ContentEncoding),
274                    timestamp = case unwrap(Timestamp) of
275                                    undefined ->
276                                        undefined;
277                                    Ts ->
278                                        Ts  div 1000
279                                end
280                   },
281    {BP, Payload}.
282
283%%% Internal
284
285amqp10_map_get(K, AP0) ->
286    case lists:keytake(K, 1, AP0) of
287        false ->
288            {undefined, AP0};
289        {value, {_, V}, AP}  ->
290            {unwrap(V), AP}
291    end.
292
293wrap(_Type, undefined) ->
294    undefined;
295wrap(Type, Val) ->
296    {Type, Val}.
297
298unwrap(undefined) ->
299    undefined;
300unwrap({_Type, V}) ->
301    V.
302
303% symbol_for(#'v1_0.properties'{}) ->
304%     {symbol, <<"amqp:properties:list">>};
305
306% number_for(#'v1_0.properties'{}) ->
307%     {ulong, 115};
308% encode(Frame = #'v1_0.properties'{}) ->
309%     amqp10_framing:encode_described(list, 115, Frame);
310
311% encode_described(list, CodeNumber, Frame) ->
312%     {described, {ulong, CodeNumber},
313%      {list, lists:map(fun encode/1, tl(tuple_to_list(Frame)))}};
314
315% -spec generate(amqp10_type()) -> iolist().
316% generate({described, Descriptor, Value}) ->
317%     DescBin = generate(Descriptor),
318%     ValueBin = generate(Value),
319%     [ ?DESCRIBED_BIN, DescBin, ValueBin ].
320
321to_091(Key, {utf8, V}) when is_binary(V) -> {Key, longstr, V};
322to_091(Key, {long, V}) -> {Key, long, V};
323to_091(Key, {byte, V}) -> {Key, byte, V};
324to_091(Key, {ubyte, V}) -> {Key, unsignedbyte, V};
325to_091(Key, {short, V}) -> {Key, short, V};
326to_091(Key, {ushort, V}) -> {Key, unsignedshort, V};
327to_091(Key, {uint, V}) -> {Key, unsignedint, V};
328to_091(Key, {int, V}) -> {Key, signedint, V};
329to_091(Key, {double, V}) -> {Key, double, V};
330to_091(Key, {float, V}) -> {Key, float, V};
331%% NB: header values can never be shortstr!
332to_091(Key, {timestamp, V}) -> {Key, timestamp, V div 1000};
333to_091(Key, {binary, V}) -> {Key, binary, V};
334to_091(Key, {boolean, V}) -> {Key, bool, V};
335to_091(Key, true) -> {Key, bool, true};
336to_091(Key, false) -> {Key, bool, false}.
337
338from_091(longstr, V) when is_binary(V) -> {utf8, V};
339from_091(long, V) -> {long, V};
340from_091(unsignedbyte, V) -> {ubyte, V};
341from_091(short, V) -> {short, V};
342from_091(unsignedshort, V) -> {ushort, V};
343from_091(unsignedint, V) -> {uint, V};
344from_091(signedint, V) -> {int, V};
345from_091(double, V) -> {double, V};
346from_091(float, V) -> {float, V};
347from_091(bool, V) -> {boolean, V};
348from_091(binary, V) -> {binary, V};
349from_091(timestamp, V) -> {timestamp, V * 1000};
350from_091(byte, V) -> {byte, V}.
351
352% convert_header(signedint, V) -> [$I, <<V:32/signed>>];
353% convert_header(decimal, V) -> {Before, After} = V,
354%                               [$D, Before, <<After:32>>];
355% convert_header(timestamp, V) -> [$T, <<V:64>>];
356% % convert_header(table, V) -> [$F | table_to_binary(V)];
357% % convert_header(array, V) -> [$A | array_to_binary(V)];
358% convert_header(byte, V) -> [$b, <<V:8/signed>>];
359% convert_header(double, V) -> [$d, <<V:64/float>>];
360% convert_header(float, V) -> [$f, <<V:32/float>>];
361% convert_header(short, V) -> [$s, <<V:16/signed>>];
362% convert_header(binary, V) -> [$x | long_string_to_binary(V)];
363% convert_header(unsignedbyte, V) -> [$B, <<V:8/unsigned>>];
364% convert_header(unsignedshort, V) -> [$u, <<V:16/unsigned>>];
365% convert_header(unsignedint, V) -> [$i, <<V:32/unsigned>>];
366% convert_header(void, _V) -> [$V].
367
368utf8(T) -> {utf8, T}.
369symbol(T) -> {symbol, T}.
370
371message_id({uuid, UUID}, HKey, H0) ->
372    H = [{HKey, longstr, <<"uuid">>} | H0],
373    {H, rabbit_data_coercion:to_binary(rabbit_guid:to_string(UUID))};
374message_id({ulong, N}, HKey, H0) ->
375    H = [{HKey, longstr, <<"ulong">>} | H0],
376    {H, erlang:integer_to_binary(N)};
377message_id({binary, B}, HKey, H0) ->
378    E = base64:encode(B),
379    case byte_size(E) > 256 of
380        true ->
381            K = binary:replace(HKey, <<"-type">>, <<>>),
382            {[{K, longstr, B} | H0], undefined};
383        false ->
384            H = [{HKey, longstr, <<"binary">>} | H0],
385            {H, E}
386    end;
387message_id({utf8, S}, HKey, H0) ->
388    case byte_size(S) > 256 of
389        true ->
390            K = binary:replace(HKey, <<"-type">>, <<>>),
391            {[{K, longstr, S} | H0], undefined};
392        false ->
393            {H0, S}
394    end;
395message_id(MsgId, _, H) ->
396    {H, unwrap(MsgId)}.
397
398 unsupported_header_value_type(array) ->
399     true;
400 unsupported_header_value_type(table) ->
401     true;
402 unsupported_header_value_type(_) ->
403     false.
404
405-ifdef(TEST).
406-include_lib("eunit/include/eunit.hrl").
407-endif.
408