1%%% This code was developped by Zhihui Jiao(jzhihui521@gmail.com). 2%%% 3%%% Copyright (C) 2013 Zhihui Jiao 4%%% 5%%% This program is free software; you can redistribute it and/or modify 6%%% it under the terms of the GNU General Public License as published by 7%%% the Free Software Foundation; either version 2 of the License, or 8%%% (at your option) any later version. 9%%% 10%%% This program is distributed in the hope that it will be useful, 11%%% but WITHOUT ANY WARRANTY; without even the implied warranty of 12%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13%%% GNU General Public License for more details. 14%%% 15%%% You should have received a copy of the GNU General Public License 16%%% along with this program; if not, write to the Free Software 17%%% Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA. 18%%% 19%%% In addition, as a special exception, you have the permission to 20%%% link the code of this program with any library released under 21%%% the EPL license and distribute linked combinations including 22%%% the two; the MPL (Mozilla Public License), which EPL (Erlang 23%%% Public License) is based on, is included in this exception. 24 25-module(ts_amqp). 26 27-vc('$Id$ '). 28-author('jzhihui521@gmail.com'). 29 30-behavior(ts_plugin). 31 32-include("ts_profile.hrl"). 33-include("ts_config.hrl"). 34-include("ts_amqp.hrl"). 35-include("rabbit.hrl"). 36-include("rabbit_framing.hrl"). 37 38-export([add_dynparams/4, 39 get_message/2, 40 session_defaults/0, 41 parse/2, 42 dump/2, 43 parse_bidi/2, 44 parse_config/2, 45 decode_buffer/2, 46 new_session/0]). 47 48 49%%---------------------------------------------------------------------- 50%% Function: session_default/0 51%% Purpose: default parameters for session 52%% Returns: {ok, ack_type = parse|no_ack|local, persistent = true|false} 53%%---------------------------------------------------------------------- 54session_defaults() -> 55 {ok, true}. 56 57%% @spec decode_buffer(Buffer::binary(),Session::record(jabber)) -> 58%% NewBuffer::binary() 59%% @doc We need to decode buffer (remove chunks, decompress ...) for 60%% matching or dyn_variables 61%% @end 62decode_buffer(Buffer,#amqp_session{}) -> 63 Buffer. % nothing to do for amqp 64 65%%---------------------------------------------------------------------- 66%% Function: new_session/0 67%% Purpose: initialize session information 68%% Returns: record or [] 69%%---------------------------------------------------------------------- 70new_session() -> 71 #amqp_session{map_num_pa = gb_trees:empty(), ack_buf = <<>>}. 72 73dump(A,B) -> 74 ts_plugin:dump(A,B). 75%%---------------------------------------------------------------------- 76%% Function: get_message/1 77%% Purpose: Build a message/request , 78%% Args: record 79%% Returns: binary 80%%---------------------------------------------------------------------- 81get_message(Request = #amqp_request{channel = ChannelStr}, State) -> 82 ?DebugF("get message on channel: ~p ~p~n", [ChannelStr, Request]), 83 ChannelNum = list_to_integer(ChannelStr), 84 get_message1(Request#amqp_request{channel = ChannelNum}, State). 85 86get_message1(#amqp_request{type = connect}, #state_rcv{session = AMQPSession}) -> 87 Waiting = {0, 'connection.start'}, 88 {?PROTOCOL_HEADER, AMQPSession#amqp_session{status = handshake, 89 waiting = Waiting, 90 protocol = ?PROTOCOL}}; 91 92get_message1(#amqp_request{type = 'connection.start_ok', username = UserName, 93 password = Password}, 94 #state_rcv{session = AMQPSession}) -> 95 Protocol = AMQPSession#amqp_session.protocol, 96 97 ?DebugF("start with: user=~p, password=~p~n", [UserName, Password]), 98 99 Resp = plain(none, list_to_binary(UserName), list_to_binary(Password)), 100 StartOk = #'connection.start_ok'{client_properties = client_properties([]), 101 mechanism = <<"PLAIN">>, response = Resp}, 102 Frame = assemble_frame(0, StartOk, Protocol), 103 Waiting = {0, 'connection.tune'}, 104 {Frame, AMQPSession#amqp_session{waiting = Waiting}}; 105 106get_message1(#amqp_request{type = 'connection.tune_ok', heartbeat = HeartBeat}, 107 #state_rcv{session = AMQPSession}) -> 108 Protocol = AMQPSession#amqp_session.protocol, 109 110 Tune = #'connection.tune_ok'{frame_max = 131072, heartbeat = HeartBeat}, 111 Frame = assemble_frame(0, Tune, Protocol), 112 {Frame, AMQPSession#amqp_session{waiting = none}}; 113 114get_message1(#amqp_request{type = 'connection.open', vhost = VHost}, 115 #state_rcv{session = AMQPSession}) -> 116 Protocol = AMQPSession#amqp_session.protocol, 117 118 Open = #'connection.open'{virtual_host = list_to_binary(VHost)}, 119 Frame = assemble_frame(0, Open, Protocol), 120 Waiting = {0, 'connection.open_ok'}, 121 {Frame, AMQPSession#amqp_session{waiting = Waiting}}; 122 123get_message1(#amqp_request{type = 'channel.open', channel = Channel}, 124 #state_rcv{session = AMQPSession}) -> 125 Protocol = AMQPSession#amqp_session.protocol, 126 MapNPA = AMQPSession#amqp_session.map_num_pa, 127 128 ChannelOpen = #'channel.open'{}, 129 case new_number(Channel, AMQPSession) of 130 {ok, Number} -> 131 MapNPA1 = gb_trees:enter(Number, unused, MapNPA), 132 put({chstate, Number}, #ch{unconfirmed_set = gb_sets:new(), 133 next_pub_seqno = 0}), 134 Frame = assemble_frame(Number, ChannelOpen, Protocol), 135 Waiting = {Number, 'channel.open_ok'}, 136 {Frame, AMQPSession#amqp_session{waiting = Waiting, 137 map_num_pa = MapNPA1}}; 138 {error, _} -> 139 {<<>>, AMQPSession#amqp_session{waiting = none}} 140 end; 141 142get_message1(#amqp_request{type = 'channel.close', channel = Channel}, 143 #state_rcv{session = AMQPSession}) -> 144 Protocol = AMQPSession#amqp_session.protocol, 145 146 ChannelClose = #'channel.close'{reply_text = <<"Goodbye">>, 147 reply_code = 200, 148 class_id = 0, 149 method_id = 0}, 150 Frame = assemble_frame(Channel, ChannelClose, Protocol), 151 Waiting = {Channel, 'channel.close_ok'}, 152 {Frame, AMQPSession#amqp_session{waiting = Waiting}}; 153 154get_message1(#amqp_request{type = 'confirm.select', channel = Channel}, 155 #state_rcv{session = AMQPSession}) -> 156 Protocol = AMQPSession#amqp_session.protocol, 157 158 Confirm = #'confirm.select'{}, 159 Frame = assemble_frame(Channel, Confirm, Protocol), 160 Waiting = {Channel, 'confirm.select_ok'}, 161 {Frame, AMQPSession#amqp_session{waiting = Waiting}}; 162 163get_message1(#amqp_request{type = 'basic.qos', prefetch_size = PrefetchSize, 164 channel = Channel, 165 prefetch_count = PrefetchCount}, 166 #state_rcv{session = AMQPSession}) -> 167 Protocol = AMQPSession#amqp_session.protocol, 168 169 Qos = #'basic.qos'{prefetch_size = PrefetchSize, 170 prefetch_count = PrefetchCount}, 171 Frame = assemble_frame(Channel, Qos, Protocol), 172 Waiting = {Channel, 'basic.qos_ok'}, 173 {Frame, AMQPSession#amqp_session{waiting = Waiting}}; 174 175get_message1(#amqp_request{type = 'basic.publish', channel = Channel, 176 exchange = Exchange, routing_key = RoutingKey, 177 payload_size = Size, payload = Payload, 178 persistent = Persistent}, 179 #state_rcv{session = AMQPSession}) -> 180 Protocol = AMQPSession#amqp_session.protocol, 181 MsgPayload = case Payload of 182 "" -> list_to_binary(ts_utils:urandomstr_noflat(Size)); 183 _ -> list_to_binary(Payload) 184 end, 185 Publish = #'basic.publish'{exchange = list_to_binary(Exchange), 186 routing_key = list_to_binary(RoutingKey)}, 187 Msg = case Persistent of 188 true -> 189 Props = #'P_basic'{delivery_mode = 2}, %% persistent message 190 build_content(Props, MsgPayload); 191 false -> 192 Props = #'P_basic'{}, 193 build_content(Props, MsgPayload) 194 end, 195 Frame = assemble_frames(Channel, Publish, Msg, ?FRAME_MIN_SIZE, Protocol), 196 ChState = get({chstate, Channel}), 197 NewChState = case ChState#ch.next_pub_seqno of 198 0 -> 199 ChState; 200 SeqNo -> 201 USet = ChState#ch.unconfirmed_set, 202 ChState#ch{unconfirmed_set = gb_sets:add(SeqNo, USet), 203 next_pub_seqno = SeqNo + 1} 204 end, 205 put({chstate, Channel}, NewChState), 206 ts_mon_cache:add({count, amqp_published}), 207 {Frame, AMQPSession}; 208 209get_message1(#amqp_request{type = 'basic.consume', channel = Channel, 210 queue = Queue, ack = Ack}, 211 #state_rcv{session = AMQPSession}) -> 212 Protocol = AMQPSession#amqp_session.protocol, 213 214 NoAck = case Ack of 215 true -> false; 216 _ -> true 217 end, 218 219 ConsumerTag = list_to_binary(["tsung-", ts_utils:randombinstr(10)]), 220 Sub = #'basic.consume'{queue = list_to_binary(Queue), 221 consumer_tag = ConsumerTag, no_ack = NoAck}, 222 ChState = get({chstate, Channel}), 223 put({chstate, Channel}, ChState#ch{ack = Ack}), 224 Frame = assemble_frame(Channel, Sub, Protocol), 225 Waiting = {Channel, 'basic.consume_ok'}, 226 {Frame, AMQPSession#amqp_session{waiting = Waiting}}; 227 228get_message1(#amqp_request{type = 'connection.close'}, 229 #state_rcv{session = AMQPSession}) -> 230 Protocol = AMQPSession#amqp_session.protocol, 231 232 Close = #'connection.close'{reply_text = <<"Goodbye">>, 233 reply_code = 200, 234 class_id = 0, 235 method_id = 0}, 236 Frame = assemble_frame(0, Close, Protocol), 237 Waiting = {0, 'connection.close_ok'}, 238 {Frame, AMQPSession#amqp_session{waiting = Waiting}}. 239%%---------------------------------------------------------------------- 240%% Function: parse/2 241%% Purpose: parse the response from the server and keep information 242%% about the response in State#state_rcv.session 243%% Args: Data (binary), State (#state_rcv) 244%% Returns: {NewState, Options for socket (list), Close = true|false} 245%%---------------------------------------------------------------------- 246parse(closed, State) -> 247 {State#state_rcv{ack_done = true, datasize = 0}, [], true}; 248%% new response, compute data size (for stats) 249parse(Data, State=#state_rcv{acc = [], datasize = 0}) -> 250 parse(Data, State#state_rcv{datasize = size(Data)}); 251 252%% handshake stage, parse response, and validate 253parse(Data, State=#state_rcv{acc = []}) -> 254 do_parse(Data, State); 255 256%% more data, add this to accumulator and parse, update datasize 257parse(Data, State=#state_rcv{acc = Acc, datasize = DataSize}) -> 258 NewSize= DataSize + size(Data), 259 parse(<< Acc/binary, Data/binary >>, 260 State#state_rcv{acc = [], datasize = NewSize}). 261 262parse_bidi(<<>>, State=#state_rcv{acc = [], session = AMQPSession}) -> 263 AckBuf = AMQPSession#amqp_session.ack_buf, 264 NewAMQPSession = AMQPSession#amqp_session{ack_buf = <<>>}, 265 ?DebugF("ack buf: ~p~n", [AckBuf]), 266 {confirm_ack_buf(AckBuf), State#state_rcv{session = NewAMQPSession},think}; 267parse_bidi(Data, State=#state_rcv{acc = [], session = AMQPSession}) -> 268 ?DebugF("parse bidi data: ~p ~p~n", [size(Data), Data]), 269 Protocol = AMQPSession#amqp_session.protocol, 270 AckBuf = AMQPSession#amqp_session.ack_buf, 271 case decode_frame(Protocol, Data) of 272 {error, _Reason} -> 273 ?DebugF("decode error: ~p~n", [_Reason]), 274 {nodata, State, think}; 275 {ok, heartbeat, Left} -> 276 ?DebugF("receive bidi: ~p~n", [heartbeat]), 277 HB = list_to_binary(rabbit_binary_generator:build_heartbeat_frame()), 278 NewAckBuf = <<AckBuf/binary, HB/binary>>, 279 NewAMQPSession = AMQPSession#amqp_session{ack_buf = NewAckBuf}, 280 parse_bidi(Left, State#state_rcv{session = NewAMQPSession}); 281 {ok, _, none, Left} -> 282 parse_bidi(Left, State); 283 {ok, Channel, Method, Left} -> 284 ?DebugF("receive bidi: ~p ~p~n", [Channel, Method]), 285 NewAMQPSession = should_ack(Channel, AckBuf, Method, AMQPSession), 286 parse_bidi(Left, State#state_rcv{session = NewAMQPSession}); 287 {incomplete, Left} -> 288 ?DebugF("incomplete frame: ~p~n", [Left]), 289 {confirm_ack_buf(AckBuf), State#state_rcv{acc = Left},think} 290 end; 291parse_bidi(Data, State=#state_rcv{acc = Acc, datasize = DataSize, 292 session = AMQPSession}) -> 293 NewSize = DataSize + size(Data), 294 ?DebugF("parse bidi data: ~p ~p~n", [NewSize, Data, Acc]), 295 parse_bidi(<<Acc/binary, Data/binary>>, 296 State#state_rcv{acc = [], datasize = NewSize, session = 297 AMQPSession#amqp_session{ack_buf = <<>>}}). 298 299%%---------------------------------------------------------------------- 300%% Function: parse_config/2 301%% Purpose: parse tags in the XML config file related to the protocol 302%% Returns: List 303%%---------------------------------------------------------------------- 304parse_config(Element, Conf) -> 305 ts_config_amqp:parse_config(Element, Conf). 306 307%%---------------------------------------------------------------------- 308%% Function: add_dynparams/4 309%% Purpose: we dont actually do anything 310%% Returns: #amqp_request 311%%---------------------------------------------------------------------- 312add_dynparams(false, {_DynVars, _Session}, Param, _HostData) -> 313 Param; 314add_dynparams(true, {DynVars, _Session}, 315 Req = #amqp_request{channel = Channel, payload = Payload, 316 exchange = Exchange, routing_key = RoutingKey, 317 queue = Queue}, _HostData) -> 318 SubstChannel = ts_search:subst(Channel, DynVars), 319 SubstPayload = ts_search:subst(Payload, DynVars), 320 SubstExchange = ts_search:subst(Exchange, DynVars), 321 SubstRoutingKey = ts_search:subst(RoutingKey, DynVars), 322 SubstQueue = ts_search:subst(Queue, DynVars), 323 Req#amqp_request{channel = SubstChannel, payload = SubstPayload, 324 exchange = SubstExchange, routing_key = SubstRoutingKey, 325 queue = SubstQueue}. 326 327%%---------------------------------------------------------------------- 328plain(none, Username, Password) -> 329 <<0, Username/binary, 0, Password/binary>>. 330 331do_parse(Data, State = #state_rcv{session = AMQPSession}) -> 332 ?DebugF("start do_parse: ~p ~n", [Data]), 333 Protocol = AMQPSession#amqp_session.protocol, 334 Waiting = AMQPSession#amqp_session.waiting, 335 case decode_and_check(Data, Waiting, State, Protocol) of 336 {ok, _Method, Result} -> 337 Result; 338 {fail, Result} -> 339 Result 340 end. 341 342get_post_fun(_Channel, 'connection.open_ok') -> 343 fun({NewState, Options, Close}) -> 344 AMQPSession = NewState#state_rcv.session, 345 NewAMQPSession = AMQPSession#amqp_session{status = connected}, 346 NewState1 = NewState#state_rcv{session = NewAMQPSession}, 347 ts_mon_cache:add({count, amqp_connected}), 348 {NewState1, Options, Close} 349 end; 350 351get_post_fun(_Channel, 'channel.open_ok') -> 352 fun({NewState, Options, Close}) -> 353 ts_mon_cache:add({count, amqp_channel_opened}), 354 {NewState, Options, Close} 355 end; 356 357get_post_fun(_Channel, 'channel.close_ok') -> 358 fun({NewState, Options, Close}) -> 359 ts_mon_cache:add({count, amqp_channel_closed}), 360 {NewState, Options, Close} 361 end; 362 363get_post_fun(Channel, 'confirm.select_ok') -> 364 fun({NewState, Options, Close}) -> 365 ChState = get({chstate, Channel}), 366 NewChState = ChState#ch{next_pub_seqno = 1}, 367 put({chstate, Channel}, NewChState), 368 NewState1 = NewState#state_rcv{acc = []}, 369 {NewState1, Options, Close} 370 end; 371 372get_post_fun(_Channel, 'basic.consume_ok') -> 373 fun({NewState, Options, Close}) -> 374 AMQPSession = NewState#state_rcv.session, 375 Socket = NewState#state_rcv.socket, 376 ts_mon_cache:add({count, amqp_consumer}), 377 LeftData = NewState#state_rcv.acc, 378 NewAMQPSession = AMQPSession#amqp_session{waiting = none}, 379 NewState1 = NewState#state_rcv{acc = [], session = NewAMQPSession}, 380 case LeftData of 381 <<>> -> ok; 382 %% trick, trigger the parse_bidi call 383 _ -> self() ! {gen_ts_transport, Socket, LeftData} 384 end, 385 {NewState1, Options, Close} 386 end; 387 388get_post_fun(_Channel, 'connection.close_ok') -> 389 fun({NewState, Options, _Close}) -> 390 ts_mon_cache:add({count, amqp_closed}), 391 {NewState, Options, true} 392 end; 393 394get_post_fun(_Channel, _) -> 395 fun({NewState, Options, Close}) -> 396 AMQPSession = NewState#state_rcv.session, 397 NewAMQPSession = AMQPSession#amqp_session{waiting = none}, 398 NewState1 = NewState#state_rcv{session = NewAMQPSession}, 399 {NewState1, Options, Close} 400 end. 401 402new_number(0, #amqp_session{channel_max = ChannelMax, 403 map_num_pa = MapNPA}) -> 404 case gb_trees:is_empty(MapNPA) of 405 true -> {ok, 1}; 406 false -> {Smallest, _} = gb_trees:smallest(MapNPA), 407 if Smallest > 1 -> 408 {ok, Smallest - 1}; 409 true -> 410 {Largest, _} = gb_trees:largest(MapNPA), 411 if Largest < ChannelMax -> {ok, Largest + 1}; 412 true -> find_free(MapNPA) 413 end 414 end 415 end; 416new_number(Proposed, Session = #amqp_session{channel_max = ChannelMax, 417 map_num_pa = MapNPA}) -> 418 IsValid = Proposed > 0 andalso Proposed =< ChannelMax andalso 419 not gb_trees:is_defined(Proposed, MapNPA), 420 case IsValid of true -> {ok, Proposed}; 421 false -> new_number(none, Session) 422 end. 423 424find_free(MapNPA) -> 425 find_free(gb_trees:iterator(MapNPA), 1). 426 427find_free(It, Candidate) -> 428 case gb_trees:next(It) of 429 {Number, _, It1} -> if Number > Candidate -> 430 {ok, Number - 1}; 431 Number =:= Candidate -> 432 find_free(It1, Candidate + 1) 433 end; 434 none -> {error, out_of_channel_numbers} 435 end. 436 437confirm_ack_buf(AckBuf) -> 438 case AckBuf of 439 <<>> -> nodata; 440 _ -> AckBuf 441 end. 442 443should_ack(Channel, AckBuf, #'basic.deliver'{delivery_tag = DeliveryTag}, 444 AMQPSession = #amqp_session{protocol = Protocol}) -> 445 ChState = get({chstate, Channel}), 446 case ChState#ch.ack of 447 true -> 448 ?DebugF("delivered: ~p ~n", [ack]), 449 Ack = #'basic.ack'{delivery_tag = DeliveryTag}, 450 Frame = assemble_frame(Channel, Ack, Protocol), 451 ts_mon_cache:add({count, amqp_delivered}), 452 NewAckBuf = case AckBuf of 453 nodata -> Frame; 454 _ -> <<AckBuf/binary, Frame/binary>> 455 end, 456 AMQPSession#amqp_session{ack_buf = NewAckBuf}; 457 false -> 458 ?DebugF("delivered: ~p ~n", [noack]), 459 ts_mon_cache:add({count, amqp_delivered}), 460 AMQPSession#amqp_session{ack_buf = AckBuf} 461 end; 462should_ack(Channel, AckBuf, Method = #'basic.ack'{}, AMQPSession) -> 463 ?DebugF("publish confirm: ~p ~n", [ack]), 464 update_confirm_set(Channel, Method), 465 AMQPSession#amqp_session{ack_buf = AckBuf}; 466should_ack(Channel, AckBuf, Method = #'basic.nack'{}, AMQPSession) -> 467 ?DebugF("publish confirm: ~p ~n", [nack]), 468 update_confirm_set(Channel, Method), 469 AMQPSession#amqp_session{ack_buf = AckBuf}; 470should_ack(_Channel, AckBuf, _Method, AMQPSession) -> 471 ?DebugF("delivered: ~p ~n", [other]), 472 AMQPSession#amqp_session{ack_buf = AckBuf}. 473 474update_confirm_set(Channel, #'basic.ack'{delivery_tag = SeqNo, multiple = Multiple}) -> 475 ChState = get({chstate, Channel}), 476 USet = ChState#ch.unconfirmed_set, 477 USet1 = update_unconfirmed(ack, SeqNo, Multiple, USet), 478 put({chstate, Channel}, ChState#ch{unconfirmed_set = USet1}); 479update_confirm_set(Channel, #'basic.nack'{delivery_tag = SeqNo, multiple = Multiple}) -> 480 ChState = get({chstate, Channel}), 481 USet = ChState#ch.unconfirmed_set, 482 USet1 = update_unconfirmed(nack, SeqNo, Multiple, USet), 483 put({chstate, Channel}, ChState#ch{unconfirmed_set = USet1}). 484 485update_unconfirmed(AckType, SeqNo, false, USet) -> 486 add_ack_stat(AckType), 487 gb_sets:del_element(SeqNo, USet); 488update_unconfirmed(AckType, SeqNo, true, USet) -> 489 case gb_sets:is_empty(USet) of 490 true -> USet; 491 false -> {S, USet1} = gb_sets:take_smallest(USet), 492 case S > SeqNo of 493 true -> USet; 494 false -> 495 add_ack_stat(AckType), 496 update_unconfirmed(AckType, SeqNo, true, USet1) 497 end 498 end. 499 500add_ack_stat(ack) -> 501 ts_mon_cache:add({count, amqp_confirmed}); 502add_ack_stat(nack) -> 503 ts_mon_cache:add({count, amqp_unconfirmed}). 504 505client_properties(UserProperties) -> 506 Default = [{<<"product">>, longstr, <<"Tsung">>}, 507 {<<"version">>, longstr, list_to_binary("0.0.1")}, 508 {<<"platform">>, longstr, <<"Erlang">>}, 509 {<<"capabilities">>, table, ?CLIENT_CAPABILITIES}], 510 lists:foldl(fun({K, _, _} = Tuple, Acc) -> 511 lists:keystore(K, 1, Acc, Tuple) 512 end, Default, UserProperties). 513 514assemble_frame(Channel, MethodRecord, Protocol) -> 515 list_to_binary(rabbit_binary_generator:build_simple_method_frame( 516 Channel, MethodRecord, Protocol)). 517 518assemble_frames(Channel, MethodRecord, Content, FrameMax, Protocol) -> 519 MethodName = rabbit_misc:method_record_type(MethodRecord), 520 true = Protocol:method_has_content(MethodName), % assertion 521 MethodFrame = rabbit_binary_generator:build_simple_method_frame( 522 Channel, MethodRecord, Protocol), 523 ContentFrames = rabbit_binary_generator:build_simple_content_frames( 524 Channel, Content, FrameMax, Protocol), 525 list_to_binary([MethodFrame | ContentFrames]). 526 527build_content(Properties, BodyBin) when is_binary(BodyBin) -> 528 build_content(Properties, [BodyBin]); 529 530build_content(Properties, PFR) -> 531 %% basic.publish hasn't changed so we can just hard-code amqp_0_9_1 532 {ClassId, _MethodId} = 533 rabbit_framing_amqp_0_9_1:method_id('basic.publish'), 534 #content{class_id = ClassId, 535 properties = Properties, 536 properties_bin = none, 537 protocol = none, 538 payload_fragments_rev = PFR}. 539 540decode_and_check(Data, Waiting, State, Protocol) -> 541 case decode_frame(Protocol, Data) of 542 {error, _Reason} -> 543 ?DebugF("decode error: ~p~n", [_Reason]), 544 ts_mon_cache:add({count, amqp_error}), 545 {fail, {State#state_rcv{ack_done = true}, [], true}}; 546 {ok, heartbeat, Left} -> 547 {ok, heartbeat, {State#state_rcv{ack_done = false, acc = Left}, 548 [], true}}; 549 {ok, Channel, Method, Left} -> 550 check(Channel, Waiting, Method, State, Left); 551 {incomplete, Left} -> 552 ?DebugF("incomplete frame: ~p~n", [Left]), 553 {fail, {State#state_rcv{ack_done = false, acc = Left}, [], false}} 554 end. 555 556check(Channel, {Channel, Expecting}, Method, State, Left) -> 557 ?DebugF("receive from server: ~p~n", [Method]), 558 case {Expecting, element(1, Method)} of 559 {E, M} when E =:= M -> 560 PostFun = get_post_fun(Channel, Expecting), 561 {ok, Method, 562 PostFun({State#state_rcv{ack_done = true, acc = Left}, [], false})}; 563 _ -> 564 ts_mon_cache:add({count, amqp_unexpected}), 565 ?DebugF("unexpected_method: ~p, expecting ~p~n", 566 [Method, Expecting]), 567 {fail, {State#state_rcv{ack_done = true}, [], true}} 568 end; 569check(Channel, Waiting = {WaitingCh, Expecting}, Method = #'basic.deliver'{}, 570 State = #state_rcv{session = AMQPSession}, Left) -> 571 ?LOGF("waiting on ~p, expecting ~p, but receive deliver on ~p ~p~n", 572 [WaitingCh, Expecting, Channel, Method], ?NOTICE), 573 AckBuf = AMQPSession#amqp_session.ack_buf, 574 NewAMQPSession = should_ack(Channel, AckBuf, Method, AMQPSession), 575 Protocol = AMQPSession#amqp_session.protocol, 576 decode_and_check(Left, Waiting, 577 State#state_rcv{session = NewAMQPSession}, Protocol); 578check(Channel, Waiting = {WaitingCh, Expecting}, Method, 579 State = #state_rcv{session = AMQPSession}, Left) -> 580 ?LOGF("waiting on ~p, but received on ~p, expecting: ~p, actual: ~p~n", 581 [WaitingCh, Channel, Expecting, Method], ?NOTICE), 582 Protocol = AMQPSession#amqp_session.protocol, 583 decode_and_check(Left, Waiting, State, Protocol). 584 585decode_frame(Protocol, <<Type:8, Channel:16, Length:32, Body/binary>>) 586 when size(Body) > Length -> 587 <<PayLoad:Length/binary, ?FRAME_END, Left/binary>> = Body, 588 case rabbit_command_assembler:analyze_frame(Type, PayLoad, Protocol) of 589 heartbeat -> {ok, heartbeat, Left}; 590 AnalyzedFrame -> process_frame(AnalyzedFrame, Channel, Protocol, Left) 591 end; 592decode_frame(_Protocol, Data) -> 593 {incomplete, Data}. 594 595process_frame(Frame, Channel, Protocol, Left) -> 596 AState = case get({channel, Channel}) of 597 undefined -> {ok, InitAState} = rabbit_command_assembler:init(Protocol), 598 InitAState; 599 AState1-> AState1 600 end, 601 case process_channel_frame(Frame, AState, Left) of 602 {ok, Method, NewAState, Left} -> 603 put({channel, Channel}, NewAState), 604 {ok, Channel, Method, Left}; 605 Other -> Other 606 end. 607 608process_channel_frame(Frame, AState, Left) -> 609 case rabbit_command_assembler:process(Frame, AState) of 610 {ok, NewAState} -> 611 {ok, none, NewAState, Left}; 612 {ok, Method, NewAState} -> 613 {ok, Method, NewAState, Left}; 614 {ok, Method, _Content, NewAState} -> 615 {ok, Method, NewAState, Left}; 616 {error, Reason} -> {error, Reason} 617 end. 618