1-module(rabbit_msg_record). 2 3-export([ 4 init/1, 5 to_iodata/1, 6 from_amqp091/2, 7 to_amqp091/1, 8 add_message_annotations/2, 9 message_annotation/2, 10 message_annotation/3 11 ]). 12 13-include_lib("rabbit_common/include/rabbit.hrl"). 14-include_lib("rabbit_common/include/rabbit_framing.hrl"). 15-include_lib("amqp10_common/include/amqp10_framing.hrl"). 16 17-type maybe(T) :: T | undefined. 18-type amqp10_data() :: #'v1_0.data'{} | 19 [#'v1_0.amqp_sequence'{} | #'v1_0.data'{}] | 20 #'v1_0.amqp_value'{}. 21-record(msg, 22 { 23 % header :: maybe(#'v1_0.header'{}), 24 % delivery_annotations :: maybe(#'v1_0.delivery_annotations'{}), 25 message_annotations :: maybe(#'v1_0.message_annotations'{}), 26 properties :: maybe(#'v1_0.properties'{}), 27 application_properties :: maybe(#'v1_0.application_properties'{}), 28 data :: maybe(amqp10_data()) 29 % footer :: maybe(#'v1_0.footer'{}) 30 }). 31 32%% holds static or rarely changing fields 33-record(cfg, {}). 34-record(?MODULE, {cfg :: #cfg{}, 35 msg :: #msg{}, 36 %% holds a list of modifications to various sections 37 changes = [] :: list()}). 38 39-opaque state() :: #?MODULE{}. 40 41-export_type([ 42 state/0 43 ]). 44 45%% this module acts as a wrapper / converter for the internal binar storage format 46%% (AMQP 1.0) and any format it needs to be converted to / from. 47%% Efficiency is key. No unnecessary allocations or work should be done until it 48%% is absolutely needed 49 50%% init from an AMQP 1.0 encoded binary 51-spec init(binary()) -> state(). 52init(Bin) when is_binary(Bin) -> 53 %% TODO: delay parsing until needed 54 {MA, P, AP, D} = decode(amqp10_framing:decode_bin(Bin), 55 {undefined, undefined, undefined, undefined}), 56 #?MODULE{cfg = #cfg{}, 57 msg = #msg{properties = P, 58 application_properties = AP, 59 message_annotations = MA, 60 data = D}}. 61 62decode([], Acc) -> 63 Acc; 64decode([#'v1_0.message_annotations'{} = MA | Rem], {_, P, AP, D}) -> 65 decode(Rem, {MA, P, AP, D}); 66decode([#'v1_0.properties'{} = P | Rem], {MA, _, AP, D}) -> 67 decode(Rem, {MA, P, AP, D}); 68decode([#'v1_0.application_properties'{} = AP | Rem], {MA, P, _, D}) -> 69 decode(Rem, {MA, P, AP, D}); 70decode([#'v1_0.data'{} = D | Rem], {MA, P, AP, _}) -> 71 decode(Rem, {MA, P, AP, D}). 72 73amqp10_properties_empty(#'v1_0.properties'{message_id = undefined, 74 user_id = undefined, 75 to = undefined, 76 % subject = wrap(utf8, RKey), 77 reply_to = undefined, 78 correlation_id = undefined, 79 content_type = undefined, 80 content_encoding = undefined, 81 creation_time = undefined}) -> 82 true; 83amqp10_properties_empty(_) -> 84 false. 85 86%% to realise the final binary data representation 87-spec to_iodata(state()) -> iodata(). 88to_iodata(#?MODULE{msg = #msg{properties = P, 89 application_properties = AP, 90 message_annotations = MA, 91 data = Data}}) -> 92 [ 93 case MA of 94 #'v1_0.message_annotations'{content = []} -> 95 <<>>; 96 _ -> 97 amqp10_framing:encode_bin(MA) 98 end, 99 case amqp10_properties_empty(P) of 100 true -> <<>>; 101 false -> 102 amqp10_framing:encode_bin(P) 103 end, 104 case AP of 105 #'v1_0.application_properties'{content = []} -> 106 <<>>; 107 _ -> 108 amqp10_framing:encode_bin(AP) 109 end, 110 amqp10_framing:encode_bin(Data) 111 ]. 112 113%% TODO: refine type spec here 114-spec add_message_annotations(#{binary() => {atom(), term()}}, state()) -> 115 state(). 116add_message_annotations(Anns, 117 #?MODULE{msg = 118 #msg{message_annotations = MA0} = Msg} = State) -> 119 Content = maps:fold( 120 fun (K, {T, V}, Acc) -> 121 map_add(symbol, K, T, V, Acc) 122 end, 123 case MA0 of 124 undefined -> []; 125 #'v1_0.message_annotations'{content = C} -> C 126 end, 127 Anns), 128 129 State#?MODULE{msg = 130 Msg#msg{message_annotations = 131 #'v1_0.message_annotations'{content = Content}}}. 132 133%% TODO: refine 134-type amqp10_term() :: {atom(), term()}. 135 136-spec message_annotation(binary(), state()) -> undefined | amqp10_term(). 137message_annotation(Key, State) -> 138 message_annotation(Key, State, undefined). 139 140-spec message_annotation(binary(), state(), undefined | amqp10_term()) -> 141 undefined | amqp10_term(). 142message_annotation(_Key, #?MODULE{msg = #msg{message_annotations = undefined}}, 143 Default) -> 144 Default; 145message_annotation(Key, 146 #?MODULE{msg = 147 #msg{message_annotations = 148 #'v1_0.message_annotations'{content = Content}}}, 149 Default) 150 when is_binary(Key) -> 151 case lists:search(fun ({{symbol, K}, _}) -> K == Key end, Content) of 152 {value, {_K, V}} -> 153 V; 154 false -> 155 Default 156 end. 157 158 159%% take a binary AMQP 1.0 input function, 160%% parses it and returns the current parse state 161%% this is the input function from storage and from, e.g. socket input 162-spec from_amqp091(#'P_basic'{}, iodata()) -> state(). 163from_amqp091(#'P_basic'{message_id = MsgId, 164 expiration = Expiration, 165 delivery_mode = DelMode, 166 headers = Headers, 167 user_id = UserId, 168 reply_to = ReplyTo, 169 type = Type, 170 priority = Priority, 171 app_id = AppId, 172 correlation_id = CorrId, 173 content_type = ContentType, 174 content_encoding = ContentEncoding, 175 timestamp = Timestamp 176 }, Data) -> 177 %% TODO: support parsing properties bin directly? 178 ConvertedTs = case Timestamp of 179 undefined -> 180 undefined; 181 _ -> 182 Timestamp * 1000 183 end, 184 P = #'v1_0.properties'{message_id = wrap(utf8, MsgId), 185 user_id = wrap(binary, UserId), 186 to = undefined, 187 % subject = wrap(utf8, RKey), 188 reply_to = wrap(utf8, ReplyTo), 189 correlation_id = wrap(utf8, CorrId), 190 content_type = wrap(symbol, ContentType), 191 content_encoding = wrap(symbol, ContentEncoding), 192 creation_time = wrap(timestamp, ConvertedTs)}, 193 194 APC0 = [{wrap(utf8, K), from_091(T, V)} || {K, T, V} 195 <- case Headers of 196 undefined -> []; 197 _ -> Headers 198 end, not unsupported_header_value_type(T)], 199 %% properties that do not map directly to AMQP 1.0 properties are stored 200 %% in application properties 201 APC = map_add(utf8, <<"x-basic-type">>, utf8, Type, 202 map_add(utf8, <<"x-basic-app-id">>, utf8, AppId, APC0)), 203 204 MAC = map_add(symbol, <<"x-basic-priority">>, ubyte, Priority, 205 map_add(symbol, <<"x-basic-delivery-mode">>, ubyte, DelMode, 206 map_add(symbol, <<"x-basic-expiration">>, utf8, Expiration, []))), 207 208 AP = #'v1_0.application_properties'{content = APC}, 209 MA = #'v1_0.message_annotations'{content = MAC}, 210 #?MODULE{cfg = #cfg{}, 211 msg = #msg{properties = P, 212 application_properties = AP, 213 message_annotations = MA, 214 data = #'v1_0.data'{content = Data}}}. 215 216map_add(_T, _Key, _Type, undefined, Acc) -> 217 Acc; 218map_add(KeyType, Key, Type, Value, Acc) -> 219 [{wrap(KeyType, Key), wrap(Type, Value)} | Acc]. 220 221-spec to_amqp091(state()) -> {#'P_basic'{}, iodata()}. 222to_amqp091(#?MODULE{msg = #msg{properties = P, 223 application_properties = APR, 224 message_annotations = MAR, 225 data = #'v1_0.data'{content = Payload}}}) -> 226 #'v1_0.properties'{message_id = MsgId, 227 user_id = UserId, 228 reply_to = ReplyTo0, 229 correlation_id = CorrId, 230 content_type = ContentType, 231 content_encoding = ContentEncoding, 232 creation_time = Timestamp} = case P of 233 undefined -> 234 #'v1_0.properties'{}; 235 _ -> 236 P 237 end, 238 239 AP0 = case APR of 240 #'v1_0.application_properties'{content = AC} -> AC; 241 _ -> [] 242 end, 243 MA0 = case MAR of 244 #'v1_0.message_annotations'{content = MC} -> MC; 245 _ -> [] 246 end, 247 248 {Type, AP1} = amqp10_map_get(utf8(<<"x-basic-type">>), AP0), 249 {AppId, AP} = amqp10_map_get(utf8(<<"x-basic-app-id">>), AP1), 250 251 {Priority, MA1} = amqp10_map_get(symbol(<<"x-basic-priority">>), MA0), 252 {DelMode, MA2} = amqp10_map_get(symbol(<<"x-basic-delivery-mode">>), MA1), 253 {Expiration, _MA} = amqp10_map_get(symbol(<<"x-basic-expiration">>), MA2), 254 255 Headers0 = [to_091(unwrap(K), V) || {K, V} <- AP], 256 {Headers1, MsgId091} = message_id(MsgId, <<"x-message-id-type">>, Headers0), 257 {Headers, CorrId091} = message_id(CorrId, <<"x-correlation-id-type">>, Headers1), 258 259 BP = #'P_basic'{message_id = MsgId091, 260 delivery_mode = DelMode, 261 expiration = Expiration, 262 user_id = unwrap(UserId), 263 headers = case Headers of 264 [] -> undefined; 265 _ -> Headers 266 end, 267 reply_to = unwrap(ReplyTo0), 268 type = Type, 269 app_id = AppId, 270 priority = Priority, 271 correlation_id = CorrId091, 272 content_type = unwrap(ContentType), 273 content_encoding = unwrap(ContentEncoding), 274 timestamp = case unwrap(Timestamp) of 275 undefined -> 276 undefined; 277 Ts -> 278 Ts div 1000 279 end 280 }, 281 {BP, Payload}. 282 283%%% Internal 284 285amqp10_map_get(K, AP0) -> 286 case lists:keytake(K, 1, AP0) of 287 false -> 288 {undefined, AP0}; 289 {value, {_, V}, AP} -> 290 {unwrap(V), AP} 291 end. 292 293wrap(_Type, undefined) -> 294 undefined; 295wrap(Type, Val) -> 296 {Type, Val}. 297 298unwrap(undefined) -> 299 undefined; 300unwrap({_Type, V}) -> 301 V. 302 303% symbol_for(#'v1_0.properties'{}) -> 304% {symbol, <<"amqp:properties:list">>}; 305 306% number_for(#'v1_0.properties'{}) -> 307% {ulong, 115}; 308% encode(Frame = #'v1_0.properties'{}) -> 309% amqp10_framing:encode_described(list, 115, Frame); 310 311% encode_described(list, CodeNumber, Frame) -> 312% {described, {ulong, CodeNumber}, 313% {list, lists:map(fun encode/1, tl(tuple_to_list(Frame)))}}; 314 315% -spec generate(amqp10_type()) -> iolist(). 316% generate({described, Descriptor, Value}) -> 317% DescBin = generate(Descriptor), 318% ValueBin = generate(Value), 319% [ ?DESCRIBED_BIN, DescBin, ValueBin ]. 320 321to_091(Key, {utf8, V}) when is_binary(V) -> {Key, longstr, V}; 322to_091(Key, {long, V}) -> {Key, long, V}; 323to_091(Key, {byte, V}) -> {Key, byte, V}; 324to_091(Key, {ubyte, V}) -> {Key, unsignedbyte, V}; 325to_091(Key, {short, V}) -> {Key, short, V}; 326to_091(Key, {ushort, V}) -> {Key, unsignedshort, V}; 327to_091(Key, {uint, V}) -> {Key, unsignedint, V}; 328to_091(Key, {int, V}) -> {Key, signedint, V}; 329to_091(Key, {double, V}) -> {Key, double, V}; 330to_091(Key, {float, V}) -> {Key, float, V}; 331%% NB: header values can never be shortstr! 332to_091(Key, {timestamp, V}) -> {Key, timestamp, V div 1000}; 333to_091(Key, {binary, V}) -> {Key, binary, V}; 334to_091(Key, {boolean, V}) -> {Key, bool, V}; 335to_091(Key, true) -> {Key, bool, true}; 336to_091(Key, false) -> {Key, bool, false}. 337 338from_091(longstr, V) when is_binary(V) -> {utf8, V}; 339from_091(long, V) -> {long, V}; 340from_091(unsignedbyte, V) -> {ubyte, V}; 341from_091(short, V) -> {short, V}; 342from_091(unsignedshort, V) -> {ushort, V}; 343from_091(unsignedint, V) -> {uint, V}; 344from_091(signedint, V) -> {int, V}; 345from_091(double, V) -> {double, V}; 346from_091(float, V) -> {float, V}; 347from_091(bool, V) -> {boolean, V}; 348from_091(binary, V) -> {binary, V}; 349from_091(timestamp, V) -> {timestamp, V * 1000}; 350from_091(byte, V) -> {byte, V}. 351 352% convert_header(signedint, V) -> [$I, <<V:32/signed>>]; 353% convert_header(decimal, V) -> {Before, After} = V, 354% [$D, Before, <<After:32>>]; 355% convert_header(timestamp, V) -> [$T, <<V:64>>]; 356% % convert_header(table, V) -> [$F | table_to_binary(V)]; 357% % convert_header(array, V) -> [$A | array_to_binary(V)]; 358% convert_header(byte, V) -> [$b, <<V:8/signed>>]; 359% convert_header(double, V) -> [$d, <<V:64/float>>]; 360% convert_header(float, V) -> [$f, <<V:32/float>>]; 361% convert_header(short, V) -> [$s, <<V:16/signed>>]; 362% convert_header(binary, V) -> [$x | long_string_to_binary(V)]; 363% convert_header(unsignedbyte, V) -> [$B, <<V:8/unsigned>>]; 364% convert_header(unsignedshort, V) -> [$u, <<V:16/unsigned>>]; 365% convert_header(unsignedint, V) -> [$i, <<V:32/unsigned>>]; 366% convert_header(void, _V) -> [$V]. 367 368utf8(T) -> {utf8, T}. 369symbol(T) -> {symbol, T}. 370 371message_id({uuid, UUID}, HKey, H0) -> 372 H = [{HKey, longstr, <<"uuid">>} | H0], 373 {H, rabbit_data_coercion:to_binary(rabbit_guid:to_string(UUID))}; 374message_id({ulong, N}, HKey, H0) -> 375 H = [{HKey, longstr, <<"ulong">>} | H0], 376 {H, erlang:integer_to_binary(N)}; 377message_id({binary, B}, HKey, H0) -> 378 E = base64:encode(B), 379 case byte_size(E) > 256 of 380 true -> 381 K = binary:replace(HKey, <<"-type">>, <<>>), 382 {[{K, longstr, B} | H0], undefined}; 383 false -> 384 H = [{HKey, longstr, <<"binary">>} | H0], 385 {H, E} 386 end; 387message_id({utf8, S}, HKey, H0) -> 388 case byte_size(S) > 256 of 389 true -> 390 K = binary:replace(HKey, <<"-type">>, <<>>), 391 {[{K, longstr, S} | H0], undefined}; 392 false -> 393 {H0, S} 394 end; 395message_id(MsgId, _, H) -> 396 {H, unwrap(MsgId)}. 397 398 unsupported_header_value_type(array) -> 399 true; 400 unsupported_header_value_type(table) -> 401 true; 402 unsupported_header_value_type(_) -> 403 false. 404 405-ifdef(TEST). 406-include_lib("eunit/include/eunit.hrl"). 407-endif. 408