1%%
2%% %CopyrightBegin%
3%%
4%% Copyright Ericsson AB 2003-2016. All Rights Reserved.
5%%
6%% Licensed under the Apache License, Version 2.0 (the "License");
7%% you may not use this file except in compliance with the License.
8%% You may obtain a copy of the License at
9%%
10%%     http://www.apache.org/licenses/LICENSE-2.0
11%%
12%% Unless required by applicable law or agreed to in writing, software
13%% distributed under the License is distributed on an "AS IS" BASIS,
14%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15%% See the License for the specific language governing permissions and
16%% limitations under the License.
17%%
18%% %CopyrightEnd%
19%%
20
21%%
22%%----------------------------------------------------------------------
23%% Purpose: Transaction sender process
24%%----------------------------------------------------------------------
25
26-module(megaco_trans_sender).
27
28-export([start_link/5,
29	 stop/1,
30	 upgrade/2,
31	 send_req/3,
32	 send_reqs/3,
33	 send_ack/2,
34	 send_ack_now/2,
35	 send_pending/2,
36	 send_reply/2,
37	 timeout/2,
38	 ack_maxcount/2,
39	 req_maxcount/2,
40	 req_maxsize/2]).
41-export([system_continue/3, system_terminate/4, system_code_change/4]).
42-export([init/6]).
43
44
45-include_lib("megaco/include/megaco.hrl").
46-include("megaco_message_internal.hrl").
47-include_lib("megaco/src/app/megaco_internal.hrl").
48
49
50-record(state,
51	{
52	  parent,
53	  conn_handle,
54	  timeout,
55	  req_sz = 0,
56	  req_maxsize,  %% Max total size of all accumulated reqs
57	  req_maxcount,
58	  ack_maxcount,
59	  reqs = [],
60	  acks = []
61	 }).
62
63
64%%%-----------------------------------------------------------------
65%%% Public API
66%%%-----------------------------------------------------------------
67start_link(CH, To, MaxSzReqs, MaxNoReqs, MaxNoAcks) ->
68    ?d("start_link -> entry with"
69	"~n   CH:        ~p"
70	"~n   To:        ~p"
71	"~n   MaxSzReqs: ~p"
72	"~n   MaxNoReqs: ~p"
73	"~n   MaxNoAcks: ~p", [CH, To, MaxSzReqs, MaxNoReqs, MaxNoAcks]),
74    Args = [self(), CH, To, MaxSzReqs, MaxNoReqs, MaxNoAcks],
75    proc_lib:start_link(?MODULE, init, Args).
76
77stop(Pid) when is_pid(Pid) ->
78    Pid ! stop,
79    ok.
80
81upgrade(Pid, CH) when is_pid(Pid) ->
82    Pid ! {upgrade, CH},
83    ok.
84
85send_req(Pid, Tid, Req) when is_pid(Pid) andalso is_binary(Req) ->
86    Pid ! {send_req, Tid, Req},
87    ok.
88
89send_reqs(Pid, Tids, Reqs)
90  when is_pid(Pid) andalso
91       is_list(Tids) andalso
92       is_list(Reqs) andalso
93       (length(Tids) =:= length(Reqs)) ->
94    Pid ! {send_reqs, Tids, Reqs},
95    ok.
96
97send_ack(Pid, Serial) when is_pid(Pid) andalso is_integer(Serial) ->
98    Pid ! {send_ack, Serial},
99    ok.
100
101send_ack_now(Pid, Serial) when is_pid(Pid) andalso is_integer(Serial) ->
102    Pid ! {send_ack_now, Serial},
103    ok.
104
105send_pending(Pid, Serial) when is_pid(Pid) andalso is_integer(Serial) ->
106    Pid ! {send_pending, Serial},
107    ok.
108
109send_reply(Pid, Reply) when is_pid(Pid) andalso is_binary(Reply) ->
110    Pid ! {send_reply, Reply}.
111
112ack_maxcount(Pid, Max) when is_pid(Pid) andalso is_integer(Max) ->
113    Pid ! {ack_maxcount, Max},
114    ok.
115
116req_maxcount(Pid, Max) when is_pid(Pid) andalso is_integer(Max) ->
117    Pid ! {req_maxcount, Max},
118    ok.
119
120req_maxsize(Pid, Max) when is_pid(Pid) andalso is_integer(Max) ->
121    Pid ! {req_maxsize, Max},
122    ok.
123
124timeout(Pid, Timeout) when is_pid(Pid) ->
125    Pid ! {timeout, Timeout},
126    ok.
127
128
129
130%%%-----------------------------------------------------------------
131%%% Internal exports
132%%%-----------------------------------------------------------------
133
134init(Parent, CH, To, MaxSzReqs, MaxNoReqs, MaxNoAcks) ->
135    ?d("init -> entry with"
136	"~n   Parent:    ~p"
137	"~n   CH:        ~p"
138	"~n   To:        ~p"
139	"~n   MaxSzReqs: ~p"
140	"~n   MaxNoReqs: ~p"
141	"~n   MaxNoAcks: ~p", [Parent, CH, To, MaxSzReqs, MaxNoReqs, MaxNoAcks]),
142    process_flag(trap_exit, true),
143    proc_lib:init_ack(Parent, {ok, self()}),
144    S = #state{parent       = Parent,
145	       conn_handle  = CH,
146	       timeout      = To,
147	       req_maxsize  = MaxSzReqs,
148	       req_maxcount = MaxNoReqs,
149	       ack_maxcount = MaxNoAcks},
150    loop(S, To).
151
152
153%%%-----------------------------------------------------------------
154%%% Internal functions
155%%%-----------------------------------------------------------------
156%% idle (= empty)
157loop(#state{reqs = [], acks = [], timeout = Timeout} = S, _) ->
158    receive
159	{send_ack, Serial} ->
160	    ?d("loop(empty) -> received send_ack [~w] request", [Serial]),
161	    loop(S#state{acks = [Serial]}, Timeout);
162
163	{send_ack_now, Serial} ->
164	    ?d("loop(empty) -> received send_ack_now [~w] request", [Serial]),
165	    send_msg(S#state.conn_handle, [], [Serial]),
166	    loop(S, Timeout);
167
168	{send_req, Tid, Req} when size(Req) >= S#state.req_maxsize ->
169	    ?d("loop(empty) -> received (big) send_req request ~w", [Tid]),
170	    send_msg(S#state.conn_handle, [{Tid, Req}], []),
171	    loop(S, Timeout);
172
173	{send_req, Tid, Req} ->
174	    ?d("loop(empty) -> received send_req request ~w", [Tid]),
175	    loop(S#state{req_sz = size(Req), reqs = [{Tid,Req}]}, Timeout);
176
177	{send_reqs, Tids, Reqs} ->
178	    ?d("loop(empty) -> received send_reqs request: ~w", [Tids]),
179	    {NewS, _} = handle_send_reqs(Tids, Reqs, S),
180	    loop(NewS, Timeout);
181
182	{send_pending, Serial} ->
183	    ?d("loop(empty) -> received send_pending [~w] request", [Serial]),
184	    handle_send_result(
185	      send_pending(S#state.conn_handle, Serial, [], [])
186	     ),
187	    loop(S, Timeout);
188
189	{send_reply, Reply} ->
190	    ?d("loop(empty) -> received send_reply request", []),
191	    #state{conn_handle = CH, req_maxsize = MaxSz} = S,
192	    handle_send_result( send_reply(CH, Reply, MaxSz, 0, [], []) ),
193	    loop(S, Timeout);
194
195	{upgrade, CH} ->
196	    ?d("loop(empty) -> received upgrade request:"
197		"~n   CH: ~p", [CH]),
198	    loop(S#state{conn_handle = CH}, Timeout);
199
200	{ack_maxcount, NewMax} ->
201	    ?d("loop(empty) -> received ack_maxcount request", []),
202	    loop(S#state{ack_maxcount = NewMax}, Timeout);
203
204	{req_maxcount, NewMax} ->
205	    ?d("loop(empty) -> received req_maxcount request", []),
206	    loop(S#state{req_maxcount = NewMax}, Timeout);
207
208	{req_maxsize, NewMax} ->
209	    ?d("loop(empty) -> received req_maxsize request", []),
210	    loop(S#state{req_maxsize = NewMax}, Timeout);
211
212	{timeout, NewTimeout} ->
213	    ?d("loop(empty) -> received timeout request", []),
214	    loop(S#state{timeout = NewTimeout}, NewTimeout);
215
216	stop ->
217	    ?d("loop(empty) -> received stop request", []),
218	    exit(normal);
219
220	{system, From, Msg} ->
221	    ?d("loop(empty) -> received system message:"
222		"~n   From: ~p"
223		"~n   Msg:  ~p", [From, Msg]),
224	    Parent = S#state.parent,
225	    sys:handle_system_msg(Msg, From, Parent,
226				  ?MODULE, [], {S, Timeout});
227
228	{'EXIT', Parent, Reason} when S#state.parent == Parent ->
229	    ?d("loop(empty) -> received upgrade request", []),
230	    exit(Reason);
231
232	M ->
233	    warning_msg("received unexpected message (ignoring): "
234			"~n~p", [M]),
235	    loop(S, Timeout)
236
237    end;
238
239%% active (= some acks or reqs waiting to to be sent)
240loop(#state{reqs = Reqs, acks = Acks, ack_maxcount = MaxAcks,
241	    timeout = Timeout} = S, To)
242  when To >= 0 ->
243    Start = t(),
244    receive
245	{send_ack, Serial} when length(Acks) + 1 >= MaxAcks ->
246	    ?d("loop(active,~w) -> "
247		"received [~w] send_ack [~w] request",
248	    [To, length(Acks), Serial]),
249	    handle_send_result(
250	      send_msg(S#state.conn_handle, Reqs, [Serial|Acks])
251	     ),
252	    loop(S#state{req_sz = 0, reqs = [], acks = []}, Timeout);
253
254	{send_ack, Serial} ->
255	    ?d("loop(active,~w) -> received send_ack [~w] request",
256		[To, Serial]),
257	    loop(S#state{acks = [Serial|Acks]}, to(To, Start));
258
259	{send_ack_now, Serial} ->
260	    ?d("loop(active,~w) -> [~w,~w] "
261		"received send_ack_now [~w] request",
262		[To, length(Reqs), length(Acks), Serial]),
263	    handle_send_result(
264	      send_msg(S#state.conn_handle, Reqs, [Serial|Acks])
265	     ),
266	    loop(S#state{req_sz = 0, reqs = [], acks = []}, Timeout);
267
268	%% We need to check that this is not a resend!!
269	%% In that case, send whatever we have in store
270	{send_req, Tid, Req} ->
271	    ?d("loop(active,~w) -> received send_req request ~w", [To,Tid]),
272	    {NewS, NewT} =
273		case handle_send_req(Tid, Req, S) of
274		    {S1, true} ->
275			{S1, Timeout};
276		    {S1, false} ->
277			{S1, to(To, Start)}
278		end,
279	    loop(NewS, NewT);
280
281	{send_reqs, Tids, NewReqs} ->
282	    ?d("loop(active,~w) -> received send_reqs request ~w", [To,Tids]),
283	    {NewS, NewT} =
284		case handle_send_reqs(Tids, NewReqs, S) of
285		    {S1, true} ->
286			{S1, Timeout};
287		    {S1, false} ->
288			{S1, to(To, Start)}
289		end,
290	    loop(NewS, NewT);
291
292	{send_pending, Serial} ->
293	    ?d("loop(active,~w) -> received send_pending [~w] request",
294		[To, Serial]),
295	    handle_send_result(
296	      send_pending(S#state.conn_handle, Serial, Reqs, Acks)
297	     ),
298	    loop(S#state{req_sz = 0, reqs = [], acks = []}, Timeout);
299
300	{send_reply, Reply} ->
301	    ?d("loop(active,~w) -> received send_reply request", [To]),
302	    #state{conn_handle = CH, req_maxsize = MaxSz, req_sz = ReqSz} = S,
303	    handle_send_result(
304	      send_reply(CH, Reply, MaxSz, ReqSz, Reqs, Acks)
305	     ),
306	    loop(S#state{req_sz = 0, reqs = [], acks = []}, Timeout);
307
308	{upgrade, CH} ->
309	    ?d("loop(active,~w) -> received upgrade request", [To]),
310	    loop(S#state{conn_handle = CH}, to(To, Start));
311
312	{req_maxsize, NewMax} ->
313	    ?d("loop(active,~w) -> received req_maxsize request", [To]),
314	    loop(S#state{req_maxsize = NewMax}, to(To, Start));
315
316	{req_maxcount, NewMax} ->
317	    ?d("loop(active,~w) -> received req_maxcount request", [To]),
318	    loop(S#state{req_maxcount = NewMax}, to(To, Start));
319
320	{ack_maxcount, NewMax} ->
321	    ?d("loop(active,~w) -> received ack_maxcount request", [To]),
322	    loop(S#state{ack_maxcount = NewMax}, to(To, Start));
323
324	{timeout, NewTimeout} when NewTimeout > Timeout ->
325	    ?d("loop(active,~w) -> received timeout request: ~w",
326		[To, NewTimeout]),
327	    %% We need to recalculate To
328	    NewTo = NewTimeout - (Timeout - to(To, Start)),
329	    loop(S#state{timeout = NewTimeout}, NewTo);
330
331	{timeout, NewTimeout} ->
332	    ?d("loop(active,~w) -> received timeout request: ~w",
333		[To, NewTimeout]),
334	    %% We need to recalculate To
335	    NewTo = to(To, Start) - (Timeout - NewTimeout),
336	    loop(S#state{timeout = NewTimeout}, NewTo);
337
338	stop ->
339	    ?d("loop(active,~w) -> received stop request", [To]),
340	    handle_send_result( send_msg(S#state.conn_handle, Reqs, Acks) ),
341	    exit(normal);
342
343	{system, From, Msg} ->
344	    ?d("loop(active,~w) -> received system message:"
345		"~n   From: ~p"
346		"~n   Msg:  ~p", [To, From, Msg]),
347	    Parent = S#state.parent,
348	    sys:handle_system_msg(Msg, From, Parent,
349				  ?MODULE, [], {S, to(To, Start)});
350
351	{'EXIT', Parent, Reason} when S#state.parent == Parent ->
352	    ?d("loop(active,~w) -> received exit request", [To]),
353	    exit(Reason);
354
355	M ->
356	    warning_msg("received unexpected message (ignoring): "
357			"~n~p", [M]),
358	    loop(S, to(To, Start))
359
360    after To ->
361	    ?d("loop(active,~w) -> timeout - time to send", [To]),
362	    handle_send_result( send_msg(S#state.conn_handle, Reqs, Acks) ),
363	    loop(S#state{req_sz = 0, reqs = [], acks = []}, Timeout)
364    end;
365
366loop(#state{reqs = Reqs, acks = Acks, timeout = Timeout} = S, _To) ->
367    ?d("loop(active) -> timeout [~w, ~w]", [length(Reqs),length(Acks)]),
368    handle_send_result( send_msg(S#state.conn_handle, Reqs, Acks) ),
369    loop(S#state{req_sz = 0, reqs = [], acks = []}, Timeout).
370
371
372%%%-----------------------------------------------------------------
373
374%% The request is itself larger then the max size, so first send
375%% everything we have stored in one message, and then the new request
376%% in another.
377%% Note that it does not matter if we with this request
378%% passed the maxcount limit.
379%% Note that this message cannot be a re-sent, since
380%% such a request would have been stored, but sent immediatly.
381handle_send_req(Tid, Req,
382		#state{conn_handle = CH,
383		       req_maxsize = MaxSz, reqs = Reqs, acks = Acks} = S)
384  when size(Req) >= MaxSz ->
385    ?d("handle_send_req -> request bigger then maxsize ~w", [MaxSz]),
386    handle_send_result( send_msg(CH, Reqs, Acks) ),
387    handle_send_result( send_msg(CH, [{Tid, Req}], []) ),
388    {S#state{req_sz = 0, reqs = [], acks = []}, true};
389
390%% And handle all the other cases
391handle_send_req(Tid, Req,
392		#state{conn_handle  = CH, req_sz = ReqSz,
393		       req_maxcount = MaxReqs, req_maxsize = MaxSz,
394		       reqs = Reqs, acks = Acks} = S) ->
395    case lists:keymember(Tid, 1, Reqs) of
396	true ->
397	    %% A re-send, time to send whatever we have in the store
398	    ?d("handle_send_req -> was a re-send, so flush",[]),
399	    handle_send_result( send_msg(CH, Reqs, Acks) ),
400	    {S#state{req_sz = 0, reqs = [], acks = []}, true};
401
402	false when length(Reqs) + 1 >= MaxReqs ->
403	    %% We finally passed the req-maxcount limit
404	    ?d("handle_send_req -> maxcount ~w passed", [MaxReqs]),
405	    handle_send_result(
406	      send_msg(S#state.conn_handle, [{Tid, Req}|Reqs], Acks)
407	     ),
408	    {S#state{req_sz = 0, reqs = [], acks = []}, true};
409
410	false when size(Req) + ReqSz >= MaxSz ->
411	    %% We finally passed the req-maxsize limit
412	    ?d("handle_send_req -> maxsize ~w passed", [MaxSz]),
413	    handle_send_result(
414	      send_msg(S#state.conn_handle, [{Tid, Req}|Reqs], Acks)
415	     ),
416	    {S#state{req_sz = 0, reqs = [], acks = []}, true};
417
418	false ->
419	    %% Still not time to send
420	    ?d("handle_send_req -> nothing to be sent",[]),
421	    {S#state{req_sz = ReqSz + size(Req), reqs = [{Tid, Req}|Reqs]},
422	     false}
423    end.
424
425
426%% We passed the req-maxcount limit: Time to send, atleast some of
427%% the stuff...
428handle_send_reqs(Tids, Reqs0,
429		 #state{conn_handle = CH,
430			req_maxsize = MaxSz, req_sz = ReqSz,
431			req_maxcount = MaxReqs, reqs = Reqs, acks = Acks} = S)
432  when length(Reqs0) + length(Reqs) >= MaxReqs ->
433    ?d("handle_send_reqs -> maxcount ~w: ~w, ~w",
434	[MaxSz,length(Reqs0),length(Reqs)]),
435    Reqs1 = merge_tids_and_reqs(Tids, Reqs0, []),
436    {NewReqs, NewReqSz} = send_reqs(CH, Reqs1, Acks, Reqs, ReqSz, MaxSz),
437    ?d("handle_send_reqs -> sent:"
438	"~n   NewReqSz:        ~w"
439	"~n   length(NewReqs): ~w", [NewReqSz, length(NewReqs)]),
440    {S#state{req_sz = NewReqSz, reqs = NewReqs, acks = []}, true};
441
442%% We did not pass the req-maxcount limit, but we could have passed the
443%% req-maxsize limit, so maybe send...
444handle_send_reqs(Tids, Reqs0, #state{conn_handle = CH,
445				     req_maxsize = MaxSz, req_sz = ReqSz,
446				     reqs = Reqs, acks = Acks} = S) ->
447    ?d("handle_send_reqs -> not maxcount - maybe maxsize (~w)", [MaxSz]),
448    Reqs1 = merge_tids_and_reqs(Tids, Reqs0, []),
449
450    case maybe_send_reqs(CH, Reqs1, Acks, Reqs, ReqSz, MaxSz, false) of
451	{NewReqs, NewReqSz, true} ->
452	    ?d("handle_send_reqs -> sent:"
453		"~n   NewReqSz:        ~w"
454		"~n   length(NewReqs): ~w", [NewReqSz, length(NewReqs)]),
455	    {S#state{req_sz = NewReqSz, reqs = NewReqs, acks = []}, true};
456	{NewReqs, NewReqSz, false} ->
457	    ?d("handle_send_reqs -> not sent:"
458		"~n   NewReqSz:        ~w"
459		"~n   length(NewReqs): ~w", [NewReqSz, length(NewReqs)]),
460	    {S#state{req_sz = NewReqSz, reqs = NewReqs}, false}
461    end.
462
463merge_tids_and_reqs([], [], Reqs) ->
464    Reqs;
465merge_tids_and_reqs([Tid|Tids], [Req|Reqs], Acc) ->
466    merge_tids_and_reqs(Tids, Reqs, [{Tid,Req}|Acc]).
467
468%% We know that we shall send, so if maybe_send_reqs does not,
469%% we send it our self...
470send_reqs(CH, Reqs, Acks, Acc, AccSz, MaxSz) ->
471    ?d("send_reqs -> entry when"
472	"~n   length(Reqs): ~w"
473	"~n   Acks:         ~w"
474	"~n   length(Acc):  ~w"
475	"~n   AccSz:        ~w", [length(Reqs), Acks, length(Acc), AccSz]),
476    case maybe_send_reqs(CH, Reqs, Acks, Acc, AccSz, MaxSz, false) of
477	{NewReqs, _NewReqSz, false} ->
478	    ?d("send_reqs -> nothing sent yet"
479		"~n   length(NewReqs): ~w", [length(NewReqs)]),
480	    handle_send_result( send_msg(CH, NewReqs, Acks) ),
481	    {[], 0};
482	{NewReqs, NewReqSz, true} ->
483	    ?d("send_reqs -> something sent"
484		"~n   length(NewReqs): ~w"
485		"~n   NewReqSz:        ~w", [length(NewReqs), NewReqSz]),
486	    {NewReqs, NewReqSz}
487    end.
488
489
490maybe_send_reqs(_CH, [], _Acks, Acc, AccSz, _MaxSz, Sent) ->
491    ?d("maybe_send_reqs -> done when"
492	"~n   Sent:        ~w"
493	"~n   AccSz:       ~w"
494	"~n   length(Acc): ~w", [Sent, AccSz, length(Acc)]),
495    {Acc, AccSz, Sent};
496maybe_send_reqs(CH, [{Tid, Req}|Reqs], Acks, Acc, _AccSz, MaxSz, _Sent)
497  when size(Req) >= MaxSz ->
498    %% The request was above the maxsize limit, so first send
499    %% what's in store and the the big request.
500    ?d("maybe_send_reqs -> entry when request [~w] size (~w) > max size"
501	"~n   Acks:        ~w"
502	"~n   length(Acc): ~w", [Tid, size(Req), Acks, length(Acc)]),
503    handle_send_result( send_msg(CH, Acc, Acks) ),
504    handle_send_result( send_msg(CH, [{Tid, Req}], []) ),
505    maybe_send_reqs(CH, Reqs, [], [], 0, MaxSz, true);
506maybe_send_reqs(CH, [{Tid, Req}|Reqs], Acks, Acc, AccSz, MaxSz, _Sent)
507  when AccSz + size(Req) >= MaxSz ->
508    %% We _did_ pass the maxsize limit with this request, so send
509    ?d("maybe_send_reqs -> entry when sum of requests (~w) > max size"
510	"~n   Tid:         ~w"
511	"~n   Acks:        ~w"
512	"~n   length(Acc): ~w", [Tid, size(Req) + AccSz, Acks, length(Acc)]),
513    handle_send_result( send_msg(CH, [{Tid, Req}|Acc], Acks) ),
514    maybe_send_reqs(CH, Reqs, [], [], 0, MaxSz, true);
515maybe_send_reqs(CH, [{Tid, Req}|Reqs], Acks, Acc, AccSz, MaxSz, Sent) ->
516    ?d("maybe_send_reqs -> entry when"
517	"~n   Tid:         ~w"
518	"~n   size(Req):   ~w"
519	"~n   Acks:        ~w"
520	"~n   length(Acc): ~w"
521	"~n   AccSz:       ~w", [Tid, size(Req), Acks, length(Acc), AccSz]),
522    NewAcc   = [{Tid,Req}|Acc],
523    NewAccSz = AccSz + size(Req),
524    maybe_send_reqs(CH, Reqs, Acks, NewAcc, NewAccSz, MaxSz, Sent).
525
526
527%%%-----------------------------------------------------------------
528
529send_pending(CH, Serial, Reqs, Acks) ->
530    ?d("send_pending -> entry with"
531	"~n   Serial:       ~w"
532	"~n   length(Reqs): ~w"
533	"~n   length(Acks): ~w", [Serial, length(Reqs), length(Acks)]),
534    case megaco_config:lookup_local_conn(CH) of
535	[CD] ->
536	    TP = #'TransactionPending'{transactionId = Serial},
537	    Pend = {transactionPending, TP},
538	    do_send_msg(CD, Pend, lists:reverse(Reqs), Acks);
539	[] ->
540	    ok
541    end.
542
543
544%% We need to check the size of the reply. If the reply itself is
545%% larger then the max limit, then it is sent in a separate message.
546send_reply(CH, Reply, MaxSz, _ReqSz, Reqs, Acks) ->
547    ?d("send_reply -> entry with"
548	"~n   length(Reqs): ~w"
549	"~n   length(Acks): ~w", [length(Reqs), length(Acks)]),
550    case megaco_config:lookup_local_conn(CH) of
551	[CD] when size(Reply) > MaxSz ->
552	    handle_send_result( send_msg(CD, lists:reverse(Reqs), Acks) ),
553	    Rep = {transactionReply, Reply},
554	    do_send_msg(CD, Rep, [], []);
555	[CD] ->
556	    Rep = {transactionReply, Reply},
557	    do_send_msg(CD, Rep, lists:reverse(Reqs), Acks);
558	[] ->
559	    ok
560    end.
561
562do_send_msg(CD, Trans, [], []) ->
563    Body   = {transactions, [Trans]},
564    Slogan = "send trans reply/pending",
565    ?d("do_send_msg -> ~s", [Slogan]),
566    megaco_messenger_misc:send_body(CD, Slogan, Body);
567do_send_msg(CD, Trans, Reqs0, []) ->
568    Reqs   = [{transactionRequest, Req} || {_, Req} <- Reqs0],
569    Body   = {transactions, [Trans|Reqs]},
570    Slogan = "send trans reply/pending and reqs",
571    ?d("do_send_msg -> ~s", [Slogan]),
572    megaco_messenger_misc:send_body(CD, Slogan, Body);
573do_send_msg(CD, Trans, [], SerialRanges) ->
574    Acks   = make_acks(ranges(SerialRanges), []),
575    Body   = {transactions, [Trans, {transactionResponseAck, Acks}]},
576    Slogan = "send trans reply/pending and acks",
577    ?d("do_send_msg -> ~s", [Slogan]),
578    megaco_messenger_misc:send_body(CD, Slogan, Body);
579do_send_msg(CD, Trans, Reqs0, SerialRanges) ->
580    Acks   = make_acks(ranges(SerialRanges), []),
581    Reqs   = [{transactionRequest, Req} || {_, Req} <- Reqs0],
582    Body   = {transactions, [Trans, {transactionResponseAck, Acks}|Reqs]},
583    Slogan = "send trans reply/pending, reqs and acks",
584    ?d("do_send_msg -> ~s", [Slogan]),
585    megaco_messenger_misc:send_body(CD, Slogan, Body).
586
587
588
589send_msg(_, [], []) ->
590    ok;
591send_msg(CH, Reqs, Serials) ->
592    case megaco_config:lookup_local_conn(CH) of
593	[ConnData] ->
594	    do_send_msg(ConnData, lists:reverse(Reqs), Serials);
595	[] ->
596	    ok
597    end.
598
599
600do_send_msg(CD, Reqs0, []) ->
601    ?d("do_send_msg -> entry with"
602	"~n   length(Reqs0): ~p", [length(Reqs0)]),
603    Reqs = [{transactionRequest, Req} || {_, Req} <- Reqs0],
604    %% ?d("do_send_msg -> Reqs: ~n~p", [Reqs]),
605    Body = {transactions, Reqs},
606    megaco_messenger_misc:send_body(CD, "send trans reqs", Body);
607do_send_msg(CD, [], SerialRanges) ->
608    ?d("do_send_msg -> entry with"
609 	"~n   SerialRanges: ~p", [SerialRanges]),
610    Acks = make_acks(ranges(SerialRanges), []),
611    %% ?d("do_send_msg -> Reqs: ~n~p", [Reqs]),
612    Body = {transactions, [{transactionResponseAck, Acks}]},
613    megaco_messenger_misc:send_body(CD, "send trans acks", Body);
614do_send_msg(CD, Reqs0, SerialRanges) ->
615    ?d("do_send_msg -> entry with"
616	"~n   length(Reqs0): ~p"
617 	"~n   SerialRanges:  ~p", [length(Reqs0), SerialRanges]),
618    Acks = make_acks(ranges(SerialRanges), []),
619    Reqs = [{transactionRequest, Req} || {_, Req} <- Reqs0],
620    %% ?d("do_send_msg -> Reqs: ~n~p", [Reqs]),
621    Body = {transactions, [{transactionResponseAck, Acks}|Reqs]},
622    megaco_messenger_misc:send_body(CD, "send trans reqs and acks", Body).
623
624
625handle_send_result(ok) ->
626    ok;
627handle_send_result({ok, _}) ->
628    ok;
629handle_send_result({error, {send_message_cancelled, _Reason}}) ->
630    ok;
631handle_send_result({error, {send_message_failed, Reason}}) ->
632    error_msg("Failed sending message: ~n   ~p", [Reason]),
633    error;
634handle_send_result(Error) ->
635    error_msg("Failed sending message: ~n   ~p", [Error]),
636    error.
637
638
639ranges(L) ->
640    lists:reverse(ranges(lists:sort(L), [], [])).
641
642ranges([], Range, Ranges) ->
643    ranges2(Range, Ranges);
644ranges([S1|Sn], [S2|_] = Range, Ranges) when S1 == (S2+1) ->
645    ranges(Sn, [S1|Range], Ranges);
646ranges([S|Sn], Range, Ranges) ->
647    ranges(Sn, [S], ranges2(Range, Ranges)).
648
649ranges2([], Ranges) ->
650    Ranges;
651ranges2([S], Ranges) ->
652    [{S,S}|Ranges];
653ranges2(Range0, Ranges) ->
654    Range = lists:reverse(Range0),
655    [{hd(Range),lists:last(Range)}|Ranges].
656
657
658make_acks([], Acks) ->
659    lists:reverse(Acks);
660make_acks([{S,S}|SerialRanges], Acks) ->
661    TRA = #'TransactionAck'{firstAck = S},
662    make_acks(SerialRanges, [TRA|Acks]);
663make_acks([{F,L}|SerialRanges], Acks) ->
664    TRA = #'TransactionAck'{firstAck = F, lastAck = L},
665    make_acks(SerialRanges, [TRA|Acks]).
666
667
668
669%%%-----------------------------------------------------------------
670
671to(To, Start) ->
672    To - (t() - Start).
673
674%% Time in milli seconds
675t() ->
676    erlang:monotonic_time(milli_seconds).
677
678warning_msg(F, A) ->
679    ?megaco_warning("Transaction sender: " ++ F, A).
680
681error_msg(F, A) ->
682    ?megaco_error("Transaction sender: " ++ F, A).
683
684
685%%%-----------------------------------------------------------------
686%%% System messages handled here
687%%%-----------------------------------------------------------------
688
689system_continue(_Parent, _Dbg, {S,To}) ->
690    loop(S, To).
691
692system_terminate(Reason, _Parent, _Dbg, {S, _}) ->
693    #state{conn_handle = CH, reqs = Reqs, acks = Acks} = S,
694    send_msg(CH, Reqs, Acks),
695    exit(Reason).
696
697system_code_change(S, _Module, _OLdVsn, _Extra) ->
698    ?d("system_code_change -> entry", []),
699    {ok, S}.
700