%%% This code was developed by Zhihui Jiao(jzhihui521@gmail.com).
%%%
%%% Copyright (C) 2013 Zhihui Jiao
%%%
%%% This program is free software; you can redistribute it and/or modify
%%% it under the terms of the GNU General Public License as published by
%%% the Free Software Foundation; either version 2 of the License, or
%%% (at your option) any later version.
%%%
%%% This program is distributed in the hope that it will be useful,
%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
%%% GNU General Public License for more details.
%%%
%%% You should have received a copy of the GNU General Public License
%%% along with this program; if not, write to the Free Software
%%% Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
%%%
%%% In addition, as a special exception, you have the permission to
%%% link the code of this program with any library released under
%%% the EPL license and distribute linked combinations including
%%% the two; the MPL (Mozilla Public License), which EPL (Erlang
%%% Public License) is based on, is included in this exception.

-module(ts_config_amqp).

-vc('$Id$ ').
-author('jzhihui521@gmail.com').

-export([parse_config/2]).

-include("ts_profile.hrl").
-include("ts_config.hrl").
-include("ts_amqp.hrl").

-include("xmerl.hrl").

%%----------------------------------------------------------------------
%% Func: parse_config/2
%% Args: Element, Config
%% Returns: Config unchanged for non-element nodes; otherwise the result
%%          of folding ts_config:parse/2 over the element (or its
%%          children)
%% Purpose: handle the AMQP-specific elements of the XML config file
%%          (<amqp> requests and <option> defaults) and delegate
%%          everything else to ts_config
%%----------------------------------------------------------------------

%% dyn_variable elements are handed straight back to ts_config.
parse_config(Element = #xmlElement{name = dyn_variable}, Conf = #config{}) ->
    ts_config:parse(Element, Conf);

%% <amqp type="..."> element: store one #ts_request{} per protocol request
%% in the session table, then continue with the children of the element.
parse_config(Element = #xmlElement{name = amqp},
             Config = #config{curid = Id, session_tab = Tab,
                              sessions = [CurS | _], dynvar = DynVar,
                              subst = SubstFlag, match = MatchRegExp}) ->
    initialize_options(Tab),

    Attrs = Element#xmlElement.attributes,
    Type = list_to_atom(ts_config:getAttr(string, Attrs, type)),
    Requests = amqp_request_types(Type, Attrs, Tab, CurS, Id),

    %% Stores a single request under {SessionId, ReqId} and yields the id
    %% to use for the following one.
    Store = fun(RequestType, ReqId) ->
                    {Ack, Param} = parse_request(Element, RequestType, Tab),
                    Entry = #ts_request{ack = Ack,
                                        endpage = true,
                                        dynvar_specs = DynVar,
                                        subst = SubstFlag,
                                        match = MatchRegExp,
                                        param = Param},
                    ets:insert(Tab, {{CurS#session.id, ReqId}, Entry}),
                    ReqId + 1
            end,
    NextId = lists:foldl(Store, Id, Requests),
    LastId = last_request_id(Requests, Id, NextId),

    ts_config:mark_prev_req(Id - 1, Tab, CurS),

    lists:foldl(fun ts_config:parse/2,
                Config#config{dynvar = [], curid = LastId},
                Element#xmlElement.content);

%% <option name="..." value="..."> element: record the username, password
%% or heartbeat in the session table. Any other option name is not handled
%% and raises a case_clause error.
parse_config(Element = #xmlElement{name = option},
             Conf = #config{session_tab = Tab}) ->
    Attrs = Element#xmlElement.attributes,
    case ts_config:getAttr(Attrs, name) of
        "username" ->
            User = ts_config:getAttr(string, Attrs, value, ?AMQP_USER),
            ets:insert(Tab, {{amqp_username, value}, User});
        "password" ->
            Password = ts_config:getAttr(string, Attrs, value, ?AMQP_PASSWORD),
            ets:insert(Tab, {{amqp_password, value}, Password});
        "heartbeat" ->
            HeartBeat = ts_config:getAttr(float_or_integer, Attrs, value, 600),
            ets:insert(Tab, {{amqp_heartbeat, value}, HeartBeat})
    end,
    lists:foldl(fun ts_config:parse/2, Conf, Element#xmlElement.content);

%% Any other XML element is delegated to ts_config.
parse_config(Element = #xmlElement{}, Conf = #config{}) ->
    ts_config:parse(Element, Conf);
%% Anything that is not an #xmlElement{} leaves the configuration untouched.
parse_config(_, Conf = #config{}) ->
    Conf.

%% Maps the type of an <amqp> element to the list of request types to store.
%% 'connection.open' expands to the whole connection sequence; the two
%% waitFor* types store a thinktime entry (timeout attribute * 1000) and
%% yield no request at all; every other type maps to itself.
amqp_request_types('connection.open', _Attrs, _Tab, _CurS, _Id) ->
    ['connect', 'connection.start_ok', 'connection.tune_ok',
     'connection.open'];
amqp_request_types('waitForConfirms', Attrs, Tab, CurS, Id) ->
    store_wait_thinktime(Attrs, 1, Tab, CurS, Id);
amqp_request_types('waitForMessages', Attrs, Tab, CurS, Id) ->
    store_wait_thinktime(Attrs, 60, Tab, CurS, Id);
amqp_request_types(Type, _Attrs, _Tab, _CurS, _Id) ->
    [Type].

%% Reads the timeout attribute (falling back to Default), stores it
%% multiplied by 1000 as a thinktime entry under {SessionId, Id}, and
%% returns the empty request list.
store_wait_thinktime(Attrs, Default, Tab, CurS, Id) ->
    Timeout = ts_config:getAttr(float_or_integer, Attrs, timeout, Default),
    ets:insert(Tab, {{CurS#session.id, Id}, {thinktime, Timeout * 1000}}),
    [].

%% Id of the last stored entry: the starting id when no request was stored,
%% otherwise the id preceding the next free one.
last_request_id([], Id, _NextId) ->
    Id;
last_request_id([_ | _], _Id, NextId) ->
    NextId - 1.

%%----------------------------------------------------------------------
%% Func: parse_request/3
%% Args: Element (the <amqp> #xmlElement{}), Type (request type atom),
%%       Tab (session ets table)
%% Returns: {Ack, #amqp_request{}} where Ack is parse or no_ack
%% Purpose: build the #amqp_request{} for one request type, reading its
%%          parameters from the XML attributes of Element or from the
%%          defaults stored in Tab
%%----------------------------------------------------------------------
parse_request(Element, Type = 'connection.open', _Tab) ->
    Attrs = Element#xmlElement.attributes,
    Vhost = ts_config:getAttr(string, Attrs, vhost, "/"),
    {parse, #amqp_request{type = Type, vhost = Vhost}};
parse_request(_Element, Type = 'connection.start_ok', Tab) ->
    %% Credentials come from the stored options, not from the element.
    UserName = ts_config:get_default(Tab, amqp_username),
    Password = ts_config:get_default(Tab, amqp_password),
    {parse, #amqp_request{type = Type, username = UserName,
                          password = Password}};
parse_request(_Element, Type = 'connection.tune_ok', Tab) ->
    HeartBeat = ts_config:get_default(Tab, amqp_heartbeat),
    {no_ack, #amqp_request{type = Type, heartbeat = HeartBeat}};
parse_request(Element, Type = 'channel.open', _Tab) ->
    Attrs = Element#xmlElement.attributes,
    Channel = ts_config:getAttr(string, Attrs, channel, "0"),
    {parse, #amqp_request{type = Type, channel = Channel}};
parse_request(Element, Type = 'basic.publish', _Tab) ->
    Attrs = Element#xmlElement.attributes,
    Channel = ts_config:getAttr(string, Attrs, channel, "1"),
    Exchange = ts_config:getAttr(string, Attrs, exchange, ""),
    RoutingKey = ts_config:getAttr(string, Attrs, routing_key, "/"),
    Size = ts_config:getAttr(float_or_integer, Attrs, payload_size, 100),
    PersistentStr = ts_config:getAttr(string, Attrs, persistent, "false"),
    Payload = ts_config:getAttr(string, Attrs, payload, ""),
    %% The attribute text is turned into an atom as-is ("false" by default).
    Persistent = list_to_atom(PersistentStr),
    Request = #amqp_request{type = Type, channel = Channel,
                            exchange = Exchange, routing_key = RoutingKey,
                            payload_size = Size, payload = Payload,
                            persistent = Persistent},
    {no_ack, Request};
parse_request(Element, Type = 'basic.consume', _Tab) ->
    Attrs = Element#xmlElement.attributes,
    Channel = ts_config:getAttr(string, Attrs, channel, "1"),
    Queue = ts_config:getAttr(string, Attrs, queue, ""),
    AckStr = ts_config:getAttr(string, Attrs, ack, "false"),
    %% The attribute text is turned into an atom as-is ("false" by default).
    Ack = list_to_atom(AckStr),
    {parse, #amqp_request{type = Type, channel = Channel,
                          queue = Queue, ack = Ack}};
parse_request(Element, Type = 'basic.qos', _Tab) ->
    Attrs = Element#xmlElement.attributes,
    Channel = ts_config:getAttr(string, Attrs, channel, "1"),
    PrefetchSize = ts_config:getAttr(float_or_integer, Attrs,
                                     prefetch_size, 0),
    PrefetchCount = ts_config:getAttr(float_or_integer, Attrs,
                                      prefetch_count, 0),
    {parse, #amqp_request{type = Type, channel = Channel,
                          prefetch_size = PrefetchSize,
                          prefetch_count = PrefetchCount}};
%% Every other request type only carries a channel (default "1").
parse_request(Element, Type, _Tab) ->
    Attrs = Element#xmlElement.attributes,
    Channel = ts_config:getAttr(string, Attrs, channel, "1"),
    {parse, #amqp_request{type = Type, channel = Channel}}.


%%----------------------------------------------------------------------
%% Func: initialize_options/1
%% Args: Tab (session ets table)
%% Returns: ok
%% Purpose: seed the default AMQP username, password and heartbeat in Tab
%%          the first time an <amqp> element is parsed. insert_new/2 is
%%          used so that values already stored by an <option> element are
%%          kept.
%%----------------------------------------------------------------------
initialize_options(Tab) ->
    case ts_config:get_default(Tab, amqp_initialized) of
        {undef_var, _} ->
            ets:insert_new(Tab, {{amqp_username, value}, ?AMQP_USER}),
            ets:insert_new(Tab, {{amqp_password, value}, ?AMQP_PASSWORD}),
            ets:insert_new(Tab, {{amqp_heartbeat, value}, 600}),
            %% Record that the defaults are in place; without this entry
            %% the lookup above fails on every call and the seeding is
            %% repeated for each <amqp> element.
            ets:insert(Tab, {{amqp_initialized, value}, true}),
            ok;
        _Else ->
            ok
    end.