%%%  This code was developed by Zhihui Jiao(jzhihui521@gmail.com).
2%%%
3%%%  Copyright (C) 2013 Zhihui Jiao
4%%%
5%%%  This program is free software; you can redistribute it and/or modify
6%%%  it under the terms of the GNU General Public License as published by
7%%%  the Free Software Foundation; either version 2 of the License, or
8%%%  (at your option) any later version.
9%%%
10%%%  This program is distributed in the hope that it will be useful,
11%%%  but WITHOUT ANY WARRANTY; without even the implied warranty of
12%%%  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13%%%  GNU General Public License for more details.
14%%%
15%%%  You should have received a copy of the GNU General Public License
16%%%  along with this program; if not, write to the Free Software
17%%%  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
18%%%
19%%%  In addition, as a special exception, you have the permission to
20%%%  link the code of this program with any library released under
21%%%  the EPL license and distribute linked combinations including
22%%%  the two; the MPL (Mozilla Public License), which EPL (Erlang
23%%%  Public License) is based on, is included in this exception.
24
25-module(ts_config_amqp).
26
27-vc('$Id$ ').
28-author('jzhihui521@gmail.com').
29
30-export([parse_config/2]).
31
32-include("ts_profile.hrl").
33-include("ts_config.hrl").
34-include("ts_amqp.hrl").
35
36-include("xmerl.hrl").
37
%%----------------------------------------------------------------------
%% Func: parse_config/2
%% Args: Element (one node of the parsed XML configuration),
%%       Config (the #config{} accumulator threaded through parsing)
%% Returns: the configuration accumulator; the fallback clause returns
%%          the #config{} it was given, the other clauses return the
%%          result of handing the element (or its children) to
%%          ts_config:parse/2
%% Purpose: handle the AMQP-specific <amqp> and <option> elements and
%%          delegate every other element to ts_config:parse/2
%%----------------------------------------------------------------------
%% <dyn_variable> is not AMQP specific: delegate to the generic parser
parse_config(#xmlElement{name = dyn_variable} = Node, #config{} = Cfg) ->
    ts_config:parse(Node, Cfg);

%% <amqp type="..."> request
parse_config(#xmlElement{name = amqp} = Node,
             #config{curid = FirstId, session_tab = Tab,
                     sessions = [Session | _], dynvar = DynVars,
                     subst = Subst, match = Match} = Cfg) ->
    initialize_options(Tab),

    Attrs = Node#xmlElement.attributes,
    Type = list_to_atom(ts_config:getAttr(string, Attrs, type)),

    %% The request types to store for this element; empty for the two
    %% wait pseudo-requests, which are stored as a thinktime instead.
    Steps = amqp_request_types(Type, Attrs, Tab, Session, FirstId),

    %% Fields shared by every request generated from this element.
    Template = #ts_request{endpage = true,
                           dynvar_specs = DynVars,
                           subst = Subst,
                           match = Match},
    NextId = store_requests(Steps, Node, Tab, Session, Template, FirstId),

    %% curid must be the id of the last stored entry, not the next free one.
    LastId = case Steps of
        [] -> FirstId;
        [_ | _] -> NextId - 1
    end,

    ts_config:mark_prev_req(FirstId - 1, Tab, Session),

    lists:foldl(fun ts_config:parse/2,
                Cfg#config{dynvar = [], curid = LastId},
                Node#xmlElement.content);

%% <option name="username|password|heartbeat" value="...">
parse_config(#xmlElement{name = option} = Node,
             #config{session_tab = Tab} = Cfg) ->
    Attrs = Node#xmlElement.attributes,
    %% An unknown option name is not handled here and fails the case.
    {Key, ValueType, Default} =
        case ts_config:getAttr(Attrs, name) of
            "username"  -> {amqp_username, string, ?AMQP_USER};
            "password"  -> {amqp_password, string, ?AMQP_PASSWORD};
            "heartbeat" -> {amqp_heartbeat, float_or_integer, 600}
        end,
    Value = ts_config:getAttr(ValueType, Attrs, value, Default),
    ets:insert(Tab, {{Key, value}, Value}),
    lists:foldl(fun ts_config:parse/2, Cfg, Node#xmlElement.content);

%% Any other XML element: delegate to the generic parser
parse_config(#xmlElement{} = Node, #config{} = Cfg) ->
    ts_config:parse(Node, Cfg);
%% Anything that is not an #xmlElement{}: configuration is unchanged
parse_config(_, #config{} = Cfg) ->
    Cfg.

%% Request types to generate for an <amqp> element of the given type.
%% 'connection.open' expands to the full connection sequence; the two
%% wait types store a thinktime and generate no request.
amqp_request_types('connection.open', _Attrs, _Tab, _Session, _Id) ->
    ['connect', 'connection.start_ok', 'connection.tune_ok',
     'connection.open'];
amqp_request_types('waitForConfirms', Attrs, Tab, Session, Id) ->
    store_wait_thinktime(Attrs, 1, Tab, Session, Id);
amqp_request_types('waitForMessages', Attrs, Tab, Session, Id) ->
    store_wait_thinktime(Attrs, 60, Tab, Session, Id);
amqp_request_types(Type, _Attrs, _Tab, _Session, _Id) ->
    [Type].

%% Store the 'timeout' attribute (DefaultTimeout when absent), multiplied
%% by 1000, as a thinktime entry under {SessionId, Id}. Returns [].
store_wait_thinktime(Attrs, DefaultTimeout, Tab, Session, Id) ->
    Timeout = ts_config:getAttr(float_or_integer, Attrs, timeout,
                                DefaultTimeout),
    ets:insert(Tab, {{Session#session.id, Id}, {thinktime, Timeout * 1000}}),
    [].

%% Store one #ts_request{} per request type under consecutive ids starting
%% at Id. Returns the first id that was not used.
store_requests([], _Node, _Tab, _Session, _Template, NextId) ->
    NextId;
store_requests([Step | Rest], Node, Tab, Session, Template, Id) ->
    {Ack, Param} = parse_request(Node, Step, Tab),
    Msg = Template#ts_request{ack = Ack, param = Param},
    ets:insert(Tab, {{Session#session.id, Id}, Msg}),
    store_requests(Rest, Node, Tab, Session, Template, Id + 1).
127
%%----------------------------------------------------------------------
%% Func: parse_request/3
%% Args: Element (#xmlElement{} of the <amqp> tag),
%%       Type (atom naming the AMQP step to build),
%%       Tab (ETS table from which option values are read)
%% Returns: {Ack, #amqp_request{}} where Ack is 'parse' or 'no_ack';
%%          parse_config/2 stores Ack in the ack field of #ts_request{}
%% Purpose: build the request record for one AMQP step, taking optional
%%          attributes from the XML element and falling back to defaults
%%----------------------------------------------------------------------
parse_request(Element, Type = 'connection.open', _Tab) ->
    Vhost = ts_config:getAttr(string, Element#xmlElement.attributes, vhost, "/"),
    Request = #amqp_request{type = Type, vhost = Vhost},
    {parse, Request};
%% Credentials come from the option table, not from the element
parse_request(_Element, Type = 'connection.start_ok', Tab) ->
    UserName = ts_config:get_default(Tab, amqp_username),
    Password = ts_config:get_default(Tab, amqp_password),
    Request = #amqp_request{type = Type, username = UserName,
                            password = Password},
    {parse, Request};
parse_request(_Element, Type = 'connection.tune_ok', Tab) ->
    HeartBeat = ts_config:get_default(Tab, amqp_heartbeat),
    Request = #amqp_request{type = Type, heartbeat = HeartBeat},
    {no_ack, Request};
parse_request(Element, Type = 'channel.open', _Tab) ->
    %% Default to channel "1" like every other channel-scoped request:
    %% channel 0 is reserved by AMQP 0-9-1 for connection-level frames,
    %% so it cannot be opened, and the other clauses would then address
    %% channel 1 without it ever having been opened.
    Channel = ts_config:getAttr(string, Element#xmlElement.attributes,
                                channel, "1"),
    Request = #amqp_request{type = Type, channel = Channel},
    {parse, Request};
parse_request(Element, Type = 'basic.publish', _Tab) ->
    Channel = ts_config:getAttr(string, Element#xmlElement.attributes,
                                channel, "1"),
    Exchange = ts_config:getAttr(string, Element#xmlElement.attributes,
                                 exchange, ""),
    RoutingKey = ts_config:getAttr(string, Element#xmlElement.attributes,
                                   routing_key, "/"),
    Size = ts_config:getAttr(float_or_integer, Element#xmlElement.attributes,
                             payload_size, 100),
    PersistentStr = ts_config:getAttr(string, Element#xmlElement.attributes,
                                      persistent, "false"),
    Payload = ts_config:getAttr(string, Element#xmlElement.attributes,
                                      payload, ""),
    %% The attribute text is turned into an atom as-is ("false" by default)
    Persistent = list_to_atom(PersistentStr),
    Request = #amqp_request{type = Type, channel = Channel, exchange = Exchange,
                            routing_key = RoutingKey, payload_size = Size,
                            payload = Payload, persistent = Persistent},
    {no_ack, Request};
parse_request(Element, Type = 'basic.consume', _Tab) ->
    Channel = ts_config:getAttr(string, Element#xmlElement.attributes,
                                channel, "1"),
    Queue = ts_config:getAttr(string, Element#xmlElement.attributes, queue, ""),
    AckStr = ts_config:getAttr(string,
                               Element#xmlElement.attributes, ack, "false"),
    %% The attribute text is turned into an atom as-is ("false" by default)
    Ack = list_to_atom(AckStr),
    Request = #amqp_request{type = Type, channel = Channel,
                            queue = Queue, ack = Ack},
    {parse, Request};
parse_request(Element, Type = 'basic.qos', _Tab) ->
    Channel = ts_config:getAttr(string, Element#xmlElement.attributes,
                                channel, "1"),
    PrefetchSize = ts_config:getAttr(float_or_integer,
                              Element#xmlElement.attributes, prefetch_size, 0),
    PrefetchCount = ts_config:getAttr(float_or_integer,
                              Element#xmlElement.attributes, prefetch_count, 0),
    Request = #amqp_request{type = Type, channel = Channel,
                            prefetch_size = PrefetchSize,
                            prefetch_count = PrefetchCount},
    {parse, Request};
%% Every other type (including the 'connect' step that parse_config/2
%% generates for connection.open) only carries a channel
parse_request(Element, Type, _Tab) ->
    Channel = ts_config:getAttr(string, Element#xmlElement.attributes,
                                channel, "1"),
    Request = #amqp_request{type = Type, channel = Channel},
    {parse, Request}.
191
192
%%----------------------------------------------------------------------
%% Func: initialize_options/1
%% Args: Tab (ETS table holding the option values)
%% Returns: ok
%% Purpose: seed the default AMQP username, password and heartbeat the
%%          first time an <amqp> element is parsed. insert_new/2 is used
%%          so that values already stored by an <option> element are
%%          never overwritten.
%%----------------------------------------------------------------------
initialize_options(Tab) ->
    case ts_config:get_default(Tab, amqp_initialized) of
        {undef_var, _} ->
            ets:insert_new(Tab,{{amqp_username,value}, ?AMQP_USER}),
            ets:insert_new(Tab,{{amqp_password,value}, ?AMQP_PASSWORD}),
            ets:insert_new(Tab,{{amqp_heartbeat,value}, 600}),
            %% Record that the defaults are in place; without this the
            %% flag checked above is never set and this branch is taken
            %% again for every <amqp> element.
            ets:insert_new(Tab,{{amqp_initialized,value}, true}),
            ok;
        _Else ->
            ok
    end.
202