%%%  This code was developed by Zhihui Jiao(jzhihui521@gmail.com).
2%%%
3%%%  Copyright (C) 2013 Zhihui Jiao
4%%%
5%%%  This program is free software; you can redistribute it and/or modify
6%%%  it under the terms of the GNU General Public License as published by
7%%%  the Free Software Foundation; either version 2 of the License, or
8%%%  (at your option) any later version.
9%%%
10%%%  This program is distributed in the hope that it will be useful,
11%%%  but WITHOUT ANY WARRANTY; without even the implied warranty of
12%%%  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13%%%  GNU General Public License for more details.
14%%%
15%%%  You should have received a copy of the GNU General Public License
16%%%  along with this program; if not, write to the Free Software
17%%%  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
18%%%
19%%%  In addition, as a special exception, you have the permission to
20%%%  link the code of this program with any library released under
21%%%  the EPL license and distribute linked combinations including
22%%%  the two; the MPL (Mozilla Public License), which EPL (Erlang
23%%%  Public License) is based on, is included in this exception.
24
25-module(ts_amqp).
26
27-vc('$Id$ ').
28-author('jzhihui521@gmail.com').
29
30-behavior(ts_plugin).
31
32-include("ts_profile.hrl").
33-include("ts_config.hrl").
34-include("ts_amqp.hrl").
35-include("rabbit.hrl").
36-include("rabbit_framing.hrl").
37
38-export([add_dynparams/4,
39         get_message/2,
40         session_defaults/0,
41         parse/2,
42         dump/2,
43         parse_bidi/2,
44         parse_config/2,
45         decode_buffer/2,
46         new_session/0]).
47
48
%%----------------------------------------------------------------------
%% Function: session_defaults/0
%% Purpose: default parameters for an AMQP session
%% Returns: {ok, true}
%%          NOTE(review): the flag is presumably the ts_plugin
%%          "persistent" setting - confirm against ts_plugin.
%%----------------------------------------------------------------------
session_defaults() ->
    Flag = true,
    {ok, Flag}.
56
%% @spec decode_buffer(Buffer::binary(),Session::record(amqp_session)) ->
%%      NewBuffer::binary()
%% @doc Hook used to decode the buffer (remove chunks, decompress ...)
%%      before matching or dyn_variables. AMQP needs no decoding, so
%%      the buffer is handed back untouched.
%% @end
decode_buffer(Buffer, Session) when is_record(Session, amqp_session) ->
    Buffer.
64
%%----------------------------------------------------------------------
%% Function: new_session/0
%% Purpose: initialize session information
%% Returns: #amqp_session with an empty channel-number tree
%%          (map_num_pa) and an empty ack buffer (ack_buf)
%%----------------------------------------------------------------------
new_session() ->
    NoChannels = gb_trees:empty(),
    #amqp_session{ack_buf = <<>>, map_num_pa = NoChannels}.
72
%% Forwards both arguments unchanged to ts_plugin:dump/2.
dump(First, Second) ->
    ts_plugin:dump(First, Second).
%%----------------------------------------------------------------------
%% Function: get_message/2
%% Purpose: Build a message/request. The channel of the request is a
%%          string; it is converted to an integer before the frame is
%%          built by get_message1/2.
%% Args:    #amqp_request, #state_rcv
%% Returns: whatever get_message1/2 returns ({Data, #amqp_session})
%%----------------------------------------------------------------------
get_message(#amqp_request{channel = ChannelStr} = Request, State) ->
    ?DebugF("get message on channel: ~p ~p~n", [ChannelStr, Request]),
    WithNumber = Request#amqp_request{channel = list_to_integer(ChannelStr)},
    get_message1(WithNumber, State).
85
%%----------------------------------------------------------------------
%% Function: get_message1/2
%% Purpose: Build the wire data for one request type and store in the
%%          session which reply ({Channel, Method}) is awaited, if any.
%% Args:    #amqp_request (channel already converted to an integer),
%%          #state_rcv
%% Returns: {Data, #amqp_session}
%%----------------------------------------------------------------------

%% connect: send the protocol header, then wait for connection.start
get_message1(#amqp_request{type = connect}, #state_rcv{session = AMQPSession}) ->
    Waiting = {0, 'connection.start'},
    {?PROTOCOL_HEADER, AMQPSession#amqp_session{status = handshake,
                                                waiting = Waiting,
                                                protocol = ?PROTOCOL}};

%% connection.start_ok: PLAIN authentication, then wait for connection.tune
get_message1(#amqp_request{type = 'connection.start_ok', username = UserName,
                          password = Password},
            #state_rcv{session = AMQPSession}) ->
    Protocol = AMQPSession#amqp_session.protocol,

    %% the password is deliberately kept out of the debug log
    ?DebugF("start with: user=~p~n", [UserName]),

    Resp = plain(none, list_to_binary(UserName), list_to_binary(Password)),
    StartOk = #'connection.start_ok'{client_properties = client_properties([]),
                                     mechanism = <<"PLAIN">>, response = Resp},
    Frame = assemble_frame(0, StartOk, Protocol),
    Waiting = {0, 'connection.tune'},
    {Frame, AMQPSession#amqp_session{waiting = Waiting}};

%% connection.tune_ok: no reply is awaited
get_message1(#amqp_request{type = 'connection.tune_ok', heartbeat = HeartBeat},
            #state_rcv{session = AMQPSession}) ->
    Protocol = AMQPSession#amqp_session.protocol,

    Tune = #'connection.tune_ok'{frame_max = 131072, heartbeat = HeartBeat},
    Frame = assemble_frame(0, Tune, Protocol),
    {Frame, AMQPSession#amqp_session{waiting = none}};

%% connection.open: wait for connection.open_ok
get_message1(#amqp_request{type = 'connection.open', vhost = VHost},
            #state_rcv{session = AMQPSession}) ->
    Protocol = AMQPSession#amqp_session.protocol,

    Open = #'connection.open'{virtual_host = list_to_binary(VHost)},
    Frame = assemble_frame(0, Open, Protocol),
    Waiting = {0, 'connection.open_ok'},
    {Frame, AMQPSession#amqp_session{waiting = Waiting}};

%% channel.open: allocate a channel number, register it in map_num_pa and
%% create the per-channel state in the process dictionary. When no number
%% can be allocated nothing is sent and nothing is awaited.
get_message1(#amqp_request{type = 'channel.open', channel = Channel},
            #state_rcv{session = AMQPSession}) ->
    Protocol = AMQPSession#amqp_session.protocol,
    MapNPA = AMQPSession#amqp_session.map_num_pa,

    ChannelOpen = #'channel.open'{},
    case new_number(Channel, AMQPSession) of
        {ok, Number} ->
            MapNPA1 = gb_trees:enter(Number, unused, MapNPA),
            put({chstate, Number}, #ch{unconfirmed_set = gb_sets:new(),
                                       next_pub_seqno = 0}),
            Frame = assemble_frame(Number, ChannelOpen, Protocol),
            Waiting = {Number, 'channel.open_ok'},
            {Frame, AMQPSession#amqp_session{waiting = Waiting,
                                             map_num_pa = MapNPA1}};
        {error, _} ->
            {<<>>, AMQPSession#amqp_session{waiting = none}}
    end;

%% channel.close: wait for channel.close_ok
get_message1(#amqp_request{type = 'channel.close', channel = Channel},
            #state_rcv{session = AMQPSession}) ->
    Protocol = AMQPSession#amqp_session.protocol,

    ChannelClose = #'channel.close'{reply_text = <<"Goodbye">>,
                                    reply_code = 200,
                                    class_id   = 0,
                                    method_id  = 0},
    Frame = assemble_frame(Channel, ChannelClose, Protocol),
    Waiting = {Channel, 'channel.close_ok'},
    {Frame, AMQPSession#amqp_session{waiting = Waiting}};

%% confirm.select: wait for confirm.select_ok
get_message1(#amqp_request{type = 'confirm.select', channel = Channel},
            #state_rcv{session = AMQPSession}) ->
    Protocol = AMQPSession#amqp_session.protocol,

    Confirm = #'confirm.select'{},
    Frame = assemble_frame(Channel, Confirm, Protocol),
    Waiting = {Channel, 'confirm.select_ok'},
    {Frame, AMQPSession#amqp_session{waiting = Waiting}};

%% basic.qos: wait for basic.qos_ok
get_message1(#amqp_request{type = 'basic.qos', prefetch_size = PrefetchSize,
                          channel = Channel,
                          prefetch_count = PrefetchCount},
            #state_rcv{session = AMQPSession}) ->
    Protocol = AMQPSession#amqp_session.protocol,

    Qos = #'basic.qos'{prefetch_size = PrefetchSize,
                       prefetch_count = PrefetchCount},
    Frame = assemble_frame(Channel, Qos, Protocol),
    Waiting = {Channel, 'basic.qos_ok'},
    {Frame, AMQPSession#amqp_session{waiting = Waiting}};

%% basic.publish: method + content frames; the session is returned as is
get_message1(#amqp_request{type = 'basic.publish', channel = Channel,
                          exchange = Exchange, routing_key = RoutingKey,
                          payload_size = Size, payload = Payload,
                          persistent = Persistent},
            #state_rcv{session = AMQPSession}) ->
    Protocol = AMQPSession#amqp_session.protocol,
    %% an empty payload means "generate Size random characters"
    MsgPayload = case Payload of
                     "" -> list_to_binary(ts_utils:urandomstr_noflat(Size));
                     _ -> list_to_binary(Payload)
                 end,
    Publish = #'basic.publish'{exchange = list_to_binary(Exchange),
                               routing_key = list_to_binary(RoutingKey)},
    Props = case Persistent of
                true  -> #'P_basic'{delivery_mode = 2}; %% persistent message
                false -> #'P_basic'{}
            end,
    Msg = build_content(Props, MsgPayload),
    Frame = assemble_frames(Channel, Publish, Msg, ?FRAME_MIN_SIZE, Protocol),
    ChState = get({chstate, Channel}),
    %% next_pub_seqno is 0 after channel.open and becomes 1 once
    %% confirm.select_ok has been received; only then are sequence
    %% numbers tracked in the unconfirmed set.
    NewChState = case ChState#ch.next_pub_seqno of
        0 ->
            ChState;
        SeqNo ->
            USet = ChState#ch.unconfirmed_set,
            ChState#ch{unconfirmed_set = gb_sets:add(SeqNo, USet),
                       next_pub_seqno = SeqNo + 1}
    end,
    put({chstate, Channel}, NewChState),
    ts_mon_cache:add({count, amqp_published}),
    {Frame, AMQPSession};

%% basic.consume: remember the ack setting of the channel and wait for
%% basic.consume_ok. Any Ack value other than true requests no_ack.
get_message1(#amqp_request{type = 'basic.consume', channel = Channel,
                           queue = Queue, ack = Ack},
             #state_rcv{session = AMQPSession}) ->
    Protocol = AMQPSession#amqp_session.protocol,

    NoAck = case Ack of
        true -> false;
        _ -> true
    end,

    ConsumerTag = list_to_binary(["tsung-", ts_utils:randombinstr(10)]),
    Sub = #'basic.consume'{queue = list_to_binary(Queue),
                           consumer_tag = ConsumerTag, no_ack = NoAck},
    ChState = get({chstate, Channel}),
    put({chstate, Channel}, ChState#ch{ack = Ack}),
    Frame = assemble_frame(Channel, Sub, Protocol),
    Waiting = {Channel, 'basic.consume_ok'},
    {Frame, AMQPSession#amqp_session{waiting = Waiting}};

%% connection.close: wait for connection.close_ok
get_message1(#amqp_request{type = 'connection.close'},
            #state_rcv{session = AMQPSession}) ->
    Protocol = AMQPSession#amqp_session.protocol,

    Close = #'connection.close'{reply_text = <<"Goodbye">>,
                                reply_code = 200,
                                class_id   = 0,
                                method_id  = 0},
    Frame = assemble_frame(0, Close, Protocol),
    Waiting = {0, 'connection.close_ok'},
    {Frame, AMQPSession#amqp_session{waiting = Waiting}}.
%%----------------------------------------------------------------------
%% Function: parse/2
%% Purpose: parse the response from the server and keep information
%%          about the response in State#state_rcv.session
%% Args:    Data (binary, or the atom closed), State (#state_rcv)
%% Returns: {NewState, Options for socket (list), Close = true|false}
%%----------------------------------------------------------------------
parse(closed, State) ->
    {State#state_rcv{ack_done = true, datasize = 0}, [], true};
%% new response, compute data size (for stats). do_parse/2 is called
%% directly: recursing into parse/2 would select this same clause again
%% (and never terminate) when Data is an empty binary.
parse(Data, State=#state_rcv{acc = [], datasize = 0}) ->
    do_parse(Data, State#state_rcv{datasize = byte_size(Data)});

%% nothing accumulated, size already known: parse response and validate
parse(Data, State=#state_rcv{acc = []}) ->
    do_parse(Data, State);

%% more data, add this to accumulator and parse, update datasize
parse(Data, State=#state_rcv{acc = Acc, datasize = DataSize}) ->
    NewSize = DataSize + byte_size(Data),
    parse(<< Acc/binary, Data/binary >>,
          State#state_rcv{acc = [], datasize = NewSize}).
261
%%----------------------------------------------------------------------
%% Function: parse_bidi/2
%% Purpose: handle data pushed by the server. Every complete frame in
%%          Data is decoded; heartbeat replies and basic.ack frames
%%          are collected in the ack_buf of the session.
%% Args:    Data (binary), State (#state_rcv)
%% Returns: {Reply, NewState, think} where Reply is the collected
%%          binary, or nodata when there is nothing to send
%%----------------------------------------------------------------------
%% all data consumed: hand out the collected replies and reset the buffer
parse_bidi(<<>>, State=#state_rcv{acc = [], session = AMQPSession}) ->
    AckBuf = AMQPSession#amqp_session.ack_buf,
    NewAMQPSession = AMQPSession#amqp_session{ack_buf = <<>>},
    ?DebugF("ack buf: ~p~n", [AckBuf]),
    {confirm_ack_buf(AckBuf), State#state_rcv{session = NewAMQPSession},think};
%% nothing accumulated: decode one frame and recurse on the remainder
parse_bidi(Data, State=#state_rcv{acc = [], session = AMQPSession}) ->
    ?DebugF("parse bidi data: ~p ~p~n", [byte_size(Data), Data]),
    Protocol = AMQPSession#amqp_session.protocol,
    AckBuf = AMQPSession#amqp_session.ack_buf,
    case decode_frame(Protocol, Data) of
        {error, _Reason} ->
            ?DebugF("decode error: ~p~n", [_Reason]),
            {nodata, State, think};
        {ok, heartbeat, Left} ->
            ?DebugF("receive bidi: ~p~n", [heartbeat]),
            %% answer a heartbeat with a heartbeat frame
            HB = list_to_binary(rabbit_binary_generator:build_heartbeat_frame()),
            NewAckBuf = <<AckBuf/binary, HB/binary>>,
            NewAMQPSession = AMQPSession#amqp_session{ack_buf = NewAckBuf},
            parse_bidi(Left, State#state_rcv{session = NewAMQPSession});
        {ok, _, none, Left} ->
            %% frame consumed without yielding a method: just go on
            parse_bidi(Left, State);
        {ok, Channel, Method, Left} ->
            ?DebugF("receive bidi: ~p ~p~n", [Channel, Method]),
            NewAMQPSession = should_ack(Channel, AckBuf, Method, AMQPSession),
            parse_bidi(Left, State#state_rcv{session = NewAMQPSession});
        {incomplete, Left} ->
            ?DebugF("incomplete frame: ~p~n", [Left]),
            %% keep the partial frame in acc; ack_buf is returned here but
            %% left in the session, the accumulating clause clears it
            {confirm_ack_buf(AckBuf), State#state_rcv{acc = Left},think}
    end;
%% a partial frame was kept: prepend it to the new data and start over
parse_bidi(Data, State=#state_rcv{acc = Acc, datasize = DataSize,
                                  session = AMQPSession}) ->
    NewSize = DataSize + byte_size(Data),
    %% three arguments need three ~p directives
    ?DebugF("parse bidi data: ~p ~p ~p~n", [NewSize, Data, Acc]),
    parse_bidi(<<Acc/binary, Data/binary>>,
               State#state_rcv{acc = [], datasize = NewSize, session =
                               AMQPSession#amqp_session{ack_buf = <<>>}}).
298
%%----------------------------------------------------------------------
%% Function: parse_config/2
%% Purpose:  parse tags in the XML config file related to the protocol;
%%           the work is delegated to ts_config_amqp:parse_config/2
%% Returns:  the result of ts_config_amqp:parse_config/2
%%----------------------------------------------------------------------
parse_config(XmlElement, Config) ->
    ts_config_amqp:parse_config(XmlElement, Config).
306
%%----------------------------------------------------------------------
%% Function: add_dynparams/4
%% Purpose: substitute dynamic variables in a request. With false as
%%          first argument the request is returned unchanged; with
%%          true the channel, payload, exchange, routing_key and queue
%%          fields go through ts_search:subst/2.
%% Returns: #amqp_request
%%----------------------------------------------------------------------
add_dynparams(false, {_DynVars, _Session}, Param, _HostData) ->
    Param;
add_dynparams(true, {DynVars, _Session}, Req = #amqp_request{}, _HostData) ->
    Subst = fun(Value) -> ts_search:subst(Value, DynVars) end,
    NewChannel    = Subst(Req#amqp_request.channel),
    NewPayload    = Subst(Req#amqp_request.payload),
    NewExchange   = Subst(Req#amqp_request.exchange),
    NewRoutingKey = Subst(Req#amqp_request.routing_key),
    NewQueue      = Subst(Req#amqp_request.queue),
    Req#amqp_request{channel     = NewChannel,
                     payload     = NewPayload,
                     exchange    = NewExchange,
                     routing_key = NewRoutingKey,
                     queue       = NewQueue}.
326
%%----------------------------------------------------------------------
%% Builds the response used for the PLAIN mechanism:
%% a zero byte, the user name, a zero byte, the password.
plain(none, Username, Password) ->
    Nul = <<0>>,
    <<Nul/binary, Username/binary, Nul/binary, Password/binary>>.
330
%% Decodes Data against the reply awaited by the session and returns the
%% {NewState, Options, Close} triple produced by decode_and_check/4,
%% whether the check succeeded or failed.
do_parse(Data, #state_rcv{session = Session} = State) ->
    ?DebugF("start do_parse: ~p ~n", [Data]),
    Outcome = decode_and_check(Data,
                               Session#amqp_session.waiting,
                               State,
                               Session#amqp_session.protocol),
    case Outcome of
        {fail, Result} ->
            Result;
        {ok, _Method, Result} ->
            Result
    end.
341
%% Returns the fun applied to the {State, Options, Close} triple once the
%% awaited method has been received on the channel.

%% connection.open_ok: mark the session as connected and count it
get_post_fun(_Channel, 'connection.open_ok') ->
    fun({RcvState, SockOpts, Close}) ->
            Session = RcvState#state_rcv.session,
            Connected = Session#amqp_session{status = connected},
            Updated = RcvState#state_rcv{session = Connected},
            ts_mon_cache:add({count, amqp_connected}),
            {Updated, SockOpts, Close}
    end;

%% channel.open_ok: only count it
get_post_fun(_Channel, 'channel.open_ok') ->
    fun({_RcvState, _SockOpts, _Close} = Result) ->
            ts_mon_cache:add({count, amqp_channel_opened}),
            Result
    end;

%% channel.close_ok: only count it
get_post_fun(_Channel, 'channel.close_ok') ->
    fun({_RcvState, _SockOpts, _Close} = Result) ->
            ts_mon_cache:add({count, amqp_channel_closed}),
            Result
    end;

%% confirm.select_ok: publish sequence numbers start at 1 on this
%% channel; the accumulator is emptied
get_post_fun(Channel, 'confirm.select_ok') ->
    fun({RcvState, SockOpts, Close}) ->
            Key = {chstate, Channel},
            Current = get(Key),
            put(Key, Current#ch{next_pub_seqno = 1}),
            {RcvState#state_rcv{acc = []}, SockOpts, Close}
    end;

%% basic.consume_ok: count the consumer, stop waiting, and re-inject any
%% data that followed the reply as a transport message to this process
get_post_fun(_Channel, 'basic.consume_ok') ->
    fun({RcvState, SockOpts, Close}) ->
            Session = RcvState#state_rcv.session,
            ts_mon_cache:add({count, amqp_consumer}),
            Idle = Session#amqp_session{waiting = none},
            case RcvState#state_rcv.acc of
                <<>> ->
                    ok;
                Pending ->
                    %% trick, trigger the parse_bidi call
                    self() ! {gen_ts_transport,
                              RcvState#state_rcv.socket, Pending}
            end,
            {RcvState#state_rcv{acc = [], session = Idle}, SockOpts, Close}
    end;

%% connection.close_ok: count it and force Close to true
get_post_fun(_Channel, 'connection.close_ok') ->
    fun({RcvState, SockOpts, _Close}) ->
            ts_mon_cache:add({count, amqp_closed}),
            {RcvState, SockOpts, true}
    end;

%% any other method: just stop waiting
get_post_fun(_Channel, _) ->
    fun({RcvState, SockOpts, Close}) ->
            Session = RcvState#state_rcv.session,
            Idle = Session#amqp_session{waiting = none},
            {RcvState#state_rcv{session = Idle}, SockOpts, Close}
    end.
401
%% Picks the number of a channel to open.
%%
%% Proposed = 0 asks for automatic allocation among the numbers not
%% present in map_num_pa: 1 for an empty map, otherwise the number just
%% below the smallest one in use, else the number just above the largest
%% one in use (while below channel_max), else the result of find_free/1.
%%
%% Any other Proposed value is used as is when it is positive, not above
%% channel_max and not already in map_num_pa; otherwise allocation falls
%% back to the automatic case.
%%
%% Returns {ok, Number} or the error tuple of find_free/1.
new_number(0, #amqp_session{channel_max = ChannelMax,
                            map_num_pa = MapNPA}) ->
    case gb_trees:is_empty(MapNPA) of
        true ->
            {ok, 1};
        false ->
            {Smallest, _} = gb_trees:smallest(MapNPA),
            {Largest, _} = gb_trees:largest(MapNPA),
            if
                Smallest > 1         -> {ok, Smallest - 1};
                Largest < ChannelMax -> {ok, Largest + 1};
                true                 -> find_free(MapNPA)
            end
    end;
new_number(Proposed, Session = #amqp_session{channel_max = ChannelMax,
                                             map_num_pa  = MapNPA}) ->
    IsValid = Proposed > 0 andalso Proposed =< ChannelMax andalso
        not gb_trees:is_defined(Proposed, MapNPA),
    case IsValid of
        true ->
            {ok, Proposed};
        false ->
            %% 0 (not the atom none) selects the automatic clause above;
            %% none would come straight back to this clause
            new_number(0, Session)
    end.
423
%% Looks for a key missing from the tree, walking the keys in iterator
%% order starting with candidate 1.
%% Returns {ok, Number} or {error, out_of_channel_numbers} when the keys
%% are exactly 1..N (or the tree is empty).
find_free(MapNPA) ->
    Iterator = gb_trees:iterator(MapNPA),
    find_free(Iterator, 1).

%% Candidate is the lowest number not yet seen in the tree.
find_free(It, Candidate) ->
    case gb_trees:next(It) of
        none ->
            {error, out_of_channel_numbers};
        {Used, _Value, Rest} ->
            if
                Used =:= Candidate ->
                    find_free(Rest, Candidate + 1);
                Used > Candidate ->
                    %% everything from Candidate to Used - 1 is unused
                    {ok, Used - 1}
            end
    end.
436
%% Maps an empty buffer to the atom nodata; anything else is returned
%% unchanged.
confirm_ack_buf(<<>>) ->
    nodata;
confirm_ack_buf(AckBuf) ->
    AckBuf.
442
%% Reacts to a method received from the server and returns the session
%% with its ack_buf set to the (possibly extended) AckBuf.
%%
%% basic.deliver: counted as amqp_delivered; when the channel state has
%%   ack = true a basic.ack frame for the delivery tag is appended.
%% basic.ack / basic.nack: publisher confirms, the unconfirmed set of
%%   the channel is updated; nothing is appended.
%% anything else: nothing is appended.
should_ack(Channel, AckBuf, #'basic.deliver'{delivery_tag = DeliveryTag},
           AMQPSession = #amqp_session{protocol = Protocol}) ->
    ChState = get({chstate, Channel}),
    case ChState#ch.ack of
        true ->
            ?DebugF("delivered: ~p ~n", [ack]),
            Ack = #'basic.ack'{delivery_tag = DeliveryTag},
            Frame = assemble_frame(Channel, Ack, Protocol),
            ts_mon_cache:add({count, amqp_delivered}),
            NewAckBuf = case AckBuf of
                nodata -> Frame;
                _ -> <<AckBuf/binary, Frame/binary>>
            end,
            AMQPSession#amqp_session{ack_buf = NewAckBuf};
        _ ->
            %% the basic.consume request asks for no_ack for every ack
            %% value other than true, so all of them mean "do not ack"
            ?DebugF("delivered: ~p ~n", [noack]),
            ts_mon_cache:add({count, amqp_delivered}),
            AMQPSession#amqp_session{ack_buf = AckBuf}
    end;
should_ack(Channel, AckBuf, Method = #'basic.ack'{}, AMQPSession) ->
    ?DebugF("publish confirm: ~p ~n", [ack]),
    update_confirm_set(Channel, Method),
    AMQPSession#amqp_session{ack_buf = AckBuf};
should_ack(Channel, AckBuf, Method = #'basic.nack'{}, AMQPSession) ->
    ?DebugF("publish confirm: ~p ~n", [nack]),
    update_confirm_set(Channel, Method),
    AMQPSession#amqp_session{ack_buf = AckBuf};
should_ack(_Channel, AckBuf, _Method, AMQPSession) ->
    ?DebugF("delivered: ~p ~n", [other]),
    AMQPSession#amqp_session{ack_buf = AckBuf}.
473
%% Applies a publisher confirm (basic.ack or basic.nack) to the
%% unconfirmed set kept in the process dictionary for the channel.
update_confirm_set(Channel, #'basic.ack'{delivery_tag = SeqNo,
                                         multiple = Multiple}) ->
    apply_confirm(Channel, ack, SeqNo, Multiple);
update_confirm_set(Channel, #'basic.nack'{delivery_tag = SeqNo,
                                          multiple = Multiple}) ->
    apply_confirm(Channel, nack, SeqNo, Multiple).

%% Reads the channel state, updates its unconfirmed set and stores it back.
apply_confirm(Channel, AckType, SeqNo, Multiple) ->
    Key = {chstate, Channel},
    ChState = get(Key),
    Pending = update_unconfirmed(AckType, SeqNo, Multiple,
                                 ChState#ch.unconfirmed_set),
    put(Key, ChState#ch{unconfirmed_set = Pending}).
484
%% Removes confirmed sequence numbers from the unconfirmed set.
%% Multiple = false: one statistic is recorded and SeqNo is removed.
%% Multiple = true:  the smallest element is removed, with one statistic
%%                   per element, for as long as it is not greater than
%%                   SeqNo.
%% Returns the updated set.
update_unconfirmed(AckType, SeqNo, false, USet) ->
    add_ack_stat(AckType),
    gb_sets:del_element(SeqNo, USet);
update_unconfirmed(AckType, SeqNo, true, USet) ->
    case gb_sets:is_empty(USet) of
        true ->
            USet;
        false ->
            {Smallest, Remaining} = gb_sets:take_smallest(USet),
            if
                Smallest > SeqNo ->
                    %% not covered by this confirm: keep the set as it was
                    USet;
                true ->
                    add_ack_stat(AckType),
                    update_unconfirmed(AckType, SeqNo, true, Remaining)
            end
    end.
499
%% Counts one publisher confirm: amqp_confirmed for ack,
%% amqp_unconfirmed for nack.
add_ack_stat(AckType) when AckType =:= ack; AckType =:= nack ->
    ts_mon_cache:add({count, ack_counter(AckType)}).

%% Name of the counter used for each confirm type.
ack_counter(ack)  -> amqp_confirmed;
ack_counter(nack) -> amqp_unconfirmed.
504
%% Builds the client property table: the defaults below, where each
%% {Key, Type, Value} tuple of UserProperties replaces the default with
%% the same key or is appended when the key is new.
client_properties(UserProperties) ->
    Defaults = [{<<"product">>,      longstr, <<"Tsung">>},
                {<<"version">>,      longstr, <<"0.0.1">>},
                {<<"platform">>,     longstr, <<"Erlang">>},
                {<<"capabilities">>, table,   ?CLIENT_CAPABILITIES}],
    Override = fun({Key, _Type, _Value} = Property, Props) ->
                       lists:keystore(Key, 1, Props, Property)
               end,
    lists:foldl(Override, Defaults, UserProperties).
513
%% Builds the frame of a method without content and returns it as one
%% binary.
assemble_frame(Channel, MethodRecord, Protocol) ->
    FrameData = rabbit_binary_generator:build_simple_method_frame(
                  Channel, MethodRecord, Protocol),
    list_to_binary(FrameData).
517
%% Builds the method frame followed by the content frames of a method
%% that carries content, and returns them as one binary.
%% Crashes (badmatch) when the protocol says the method has no content.
assemble_frames(Channel, MethodRecord, Content, FrameMax, Protocol) ->
    Name = rabbit_misc:method_record_type(MethodRecord),
    true = Protocol:method_has_content(Name), % assertion
    Head = rabbit_binary_generator:build_simple_method_frame(
             Channel, MethodRecord, Protocol),
    Tail = rabbit_binary_generator:build_simple_content_frames(
             Channel, Content, FrameMax, Protocol),
    list_to_binary([Head | Tail]).
526
%% Builds the #content record of a basic.publish. The body is either a
%% binary (wrapped in a one-element list) or a value stored as
%% payload_fragments_rev as is.
build_content(Properties, Body) ->
    Fragments = case is_binary(Body) of
                    true  -> [Body];
                    false -> Body
                end,
    %% basic.publish hasn't changed so we can just hard-code amqp_0_9_1
    {ClassId, _MethodId} =
        rabbit_framing_amqp_0_9_1:method_id('basic.publish'),
    #content{class_id = ClassId,
             properties = Properties,
             properties_bin = none,
             protocol = none,
             payload_fragments_rev = Fragments}.
539
%% Decodes the first frame of Data and checks it against the awaited
%% reply Waiting = {Channel, Method}.
%% Returns {ok, Method, Result} or {fail, Result}, where Result is the
%% {NewState, Options, Close} triple handed back by parse/2.
decode_and_check(Data, Waiting, State, Protocol) ->
    case decode_frame(Protocol, Data) of
        {error, _Reason} ->
            ?DebugF("decode error: ~p~n", [_Reason]),
            ts_mon_cache:add({count, amqp_error}),
            {fail, {State#state_rcv{ack_done = true}, [], true}};
        {ok, heartbeat, Left} ->
            %% NOTE(review): Close is true here although ack_done is
            %% false - confirm that the caller ignores Close in that case
            {ok, heartbeat, {State#state_rcv{ack_done = false, acc = Left},
                             [], true}};
        {ok, _Channel, none, Left} ->
            %% the frame was consumed without completing a method (part
            %% of a command with content): nothing to check yet, go on
            %% with the remaining data, as parse_bidi/2 does
            decode_and_check(Left, Waiting, State, Protocol);
        {ok, Channel, Method, Left} ->
            check(Channel, Waiting, Method, State, Left);
        {incomplete, Left} ->
            ?DebugF("incomplete frame: ~p~n", [Left]),
            {fail, {State#state_rcv{ack_done = false, acc = Left}, [], false}}
    end.
555
%% Compares a decoded method with the awaited {Channel, Method} pair.
%%
%% Same channel: the record name of the method must be the awaited one;
%%   the post fun of that method is then applied to the result triple.
%%   Anything else is counted as amqp_unexpected and fails with Close
%%   set to true.
%% Other channel, basic.deliver: the delivery is handled by should_ack/4
%%   and decoding goes on with the remaining data.
%% Other channel, any other method: logged, decoding goes on.
check(Channel, {Channel, Expecting}, Method, State, Left) ->
    ?DebugF("receive from server: ~p~n", [Method]),
    case element(1, Method) of
        Expecting ->
            Finish = get_post_fun(Channel, Expecting),
            Done = State#state_rcv{ack_done = true, acc = Left},
            {ok, Method, Finish({Done, [], false})};
        _Other ->
            ts_mon_cache:add({count, amqp_unexpected}),
            ?DebugF("unexpected_method: ~p, expecting ~p~n",
                    [Method, Expecting]),
            {fail, {State#state_rcv{ack_done = true}, [], true}}
    end;
check(Channel, {WaitingCh, Expecting} = Waiting, #'basic.deliver'{} = Method,
      #state_rcv{session = AMQPSession} = State, Left) ->
    ?LOGF("waiting on ~p, expecting ~p, but receive deliver on ~p ~p~n",
          [WaitingCh, Expecting, Channel, Method], ?NOTICE),
    Acked = should_ack(Channel, AMQPSession#amqp_session.ack_buf,
                       Method, AMQPSession),
    decode_and_check(Left, Waiting,
                     State#state_rcv{session = Acked},
                     AMQPSession#amqp_session.protocol);
check(Channel, {WaitingCh, Expecting} = Waiting, Method,
      #state_rcv{session = AMQPSession} = State, Left) ->
    ?LOGF("waiting on ~p, but received on ~p, expecting: ~p, actual: ~p~n",
          [WaitingCh, Channel, Expecting, Method], ?NOTICE),
    decode_and_check(Left, Waiting, State,
                     AMQPSession#amqp_session.protocol).
584
%% Decodes the first frame found in a binary.
%% Frame layout matched here: type (8 bits), channel (16 bits), payload
%% length (32 bits), payload, then the ?FRAME_END octet.
%%
%% Returns:
%%   {ok, heartbeat, Left}                 heartbeat frame
%%   the result of process_frame/4         any other analyzed frame
%%   {error, invalid_frame_end_marker}     payload not followed by ?FRAME_END
%%   {incomplete, Data}                    not enough bytes for a whole frame
decode_frame(Protocol, <<Type:8, Channel:16, Length:32, Body/binary>>)
        when byte_size(Body) > Length ->
    case Body of
        <<PayLoad:Length/binary, ?FRAME_END, Left/binary>> ->
            case rabbit_command_assembler:analyze_frame(Type, PayLoad,
                                                        Protocol) of
                heartbeat ->
                    {ok, heartbeat, Left};
                AnalyzedFrame ->
                    process_frame(AnalyzedFrame, Channel, Protocol, Left)
            end;
        _ ->
            %% report a corrupt frame through the error path handled by
            %% the callers instead of crashing with badmatch
            {error, invalid_frame_end_marker}
    end;
decode_frame(_Protocol, Data) ->
    {incomplete, Data}.
594
%% Feeds an analyzed frame to the command assembler state kept in the
%% process dictionary under {channel, Channel} and stores the new state.
%% Returns {ok, Channel, Method, Left}, or whatever else
%% process_channel_frame/3 returned.
process_frame(Frame, Channel, Protocol, Left) ->
    AState = assembler_state(Channel, Protocol),
    case process_channel_frame(Frame, AState, Left) of
        {ok, Method, NewAState, Left} ->
            put({channel, Channel}, NewAState),
            {ok, Channel, Method, Left};
        Other ->
            Other
    end.

%% Stored assembler state of the channel, or a freshly initialised one
%% when the channel has none yet.
assembler_state(Channel, Protocol) ->
    case get({channel, Channel}) of
        undefined ->
            {ok, Fresh} = rabbit_command_assembler:init(Protocol),
            Fresh;
        Stored ->
            Stored
    end.
607
%% Runs one frame through rabbit_command_assembler:process/2.
%% Returns {ok, Method, NewAState, Left}, where Method is the atom none
%% when the assembler returned no method, or {error, Reason}.
%% Content returned along with a method is dropped.
process_channel_frame(Frame, AState, Left) ->
    case rabbit_command_assembler:process(Frame, AState) of
        {error, Reason} ->
            {error, Reason};
        {ok, Next} ->
            {ok, none, Next, Left};
        {ok, Method, Next} ->
            {ok, Method, Next, Left};
        {ok, Method, _Content, Next} ->
            {ok, Method, Next, Left}
    end.
618