%%
%% %CopyrightBegin%
%%
%% Copyright Ericsson AB 2003-2016. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%
%% %CopyrightEnd%
%%

%%
%%----------------------------------------------------------------------
%% Purpose: Transaction sender process
%%
%% The process accumulates transaction requests and transaction acks
%% and sends them in one message when a timer expires or when one of
%% the configured limits (number of acks, number of requests, total
%% request size) is reached.
%%----------------------------------------------------------------------

-module(megaco_trans_sender).

%% Public API: every call is an asynchronous message to the sender process.
-export([start_link/5,
	 stop/1,
	 upgrade/2,
	 send_req/3,
	 send_reqs/3,
	 send_ack/2,
	 send_ack_now/2,
	 send_pending/2,
	 send_reply/2,
	 timeout/2,
	 ack_maxcount/2,
	 req_maxcount/2,
	 req_maxsize/2]).
%% Callbacks invoked by sys:handle_system_msg/6.
-export([system_continue/3, system_terminate/4, system_code_change/4]).
%% Entry point used by proc_lib:start_link/3.
-export([init/6]).


-include_lib("megaco/include/megaco.hrl").
-include("megaco_message_internal.hrl").
-include_lib("megaco/src/app/megaco_internal.hrl").


%% Loop state of the sender process.
-record(state,
	{
	  parent,       %% Pid of the process that called start_link/5
	  conn_handle,  %% Handle used to look up the connection data when sending
	  timeout,      %% Accumulation time (ms) before the store is flushed
	  req_sz = 0,   %% Total size of the requests currently in reqs
	  req_maxsize,  %% Max total size of all accumulated reqs
	  req_maxcount, %% Max number of accumulated reqs
	  ack_maxcount, %% Max number of accumulated acks
	  reqs = [],    %% Accumulated {Tid, Req} pairs, newest first
	  acks = []     %% Accumulated ack serial numbers, newest first
	 }).


%%%-----------------------------------------------------------------
%%% Public API
%%%
%%% Apart from start_link/5, each function only posts a message to
%%% the sender process; nothing is awaited.
%%%-----------------------------------------------------------------

%% Start a sender linked to the caller. The caller becomes the
%% 'parent' of the new process, which runs init/6.
start_link(CH, To, MaxSzReqs, MaxNoReqs, MaxNoAcks) ->
    ?d("start_link -> entry with"
       "~n CH: ~p"
       "~n To: ~p"
       "~n MaxSzReqs: ~p"
       "~n MaxNoReqs: ~p"
       "~n MaxNoAcks: ~p", [CH, To, MaxSzReqs, MaxNoReqs, MaxNoAcks]),
    proc_lib:start_link(?MODULE, init,
			[self(), CH, To, MaxSzReqs, MaxNoReqs, MaxNoAcks]).

%% Ask the sender to terminate.
stop(Pid) when is_pid(Pid) ->
    notify(Pid, stop).

%% Replace the connection handle used by the sender.
upgrade(Pid, CH) when is_pid(Pid) ->
    notify(Pid, {upgrade, CH}).

%% Hand over one (already encoded) transaction request.
send_req(Pid, Tid, Req) when is_pid(Pid), is_binary(Req) ->
    notify(Pid, {send_req, Tid, Req}).

%% Hand over several requests at once; Tids and Reqs must be lists
%% of equal length.
send_reqs(Pid, Tids, Reqs)
  when is_pid(Pid),
       is_list(Tids),
       is_list(Reqs),
       length(Tids) =:= length(Reqs) ->
    notify(Pid, {send_reqs, Tids, Reqs}).

%% Hand over an ack (serial number) to be accumulated.
send_ack(Pid, Serial) when is_pid(Pid), is_integer(Serial) ->
    notify(Pid, {send_ack, Serial}).

%% Hand over an ack and have the sender send at once.
send_ack_now(Pid, Serial) when is_pid(Pid), is_integer(Serial) ->
    notify(Pid, {send_ack_now, Serial}).

%% Have the sender send a transaction pending for Serial.
send_pending(Pid, Serial) when is_pid(Pid), is_integer(Serial) ->
    notify(Pid, {send_pending, Serial}).

%% Have the sender send an (already encoded) transaction reply.
%% Note: unlike its siblings this returns the message that was sent,
%% not 'ok'.
send_reply(Pid, Reply) when is_pid(Pid), is_binary(Reply) ->
    Pid ! {send_reply, Reply}.

%% Change the max number of accumulated acks.
ack_maxcount(Pid, Max) when is_pid(Pid), is_integer(Max) ->
    notify(Pid, {ack_maxcount, Max}).

%% Change the max number of accumulated requests.
req_maxcount(Pid, Max) when is_pid(Pid), is_integer(Max) ->
    notify(Pid, {req_maxcount, Max}).

%% Change the max total size of accumulated requests.
req_maxsize(Pid, Max) when is_pid(Pid), is_integer(Max) ->
    notify(Pid, {req_maxsize, Max}).

%% Change the accumulation timeout.
timeout(Pid, Timeout) when is_pid(Pid) ->
    notify(Pid, {timeout, Timeout}).

%% Post Msg to the sender process and return ok.
notify(Pid, Msg) ->
    Pid ! Msg,
    ok.



%%%-----------------------------------------------------------------
%%% Internal exports
%%%-----------------------------------------------------------------

%% Entry point of the sender process (see start_link/5).
%% Traps exits (so that the exit of the parent is seen as a message),
%% acknowledges the start to Parent and enters the main loop with an
%% empty store.
init(Parent, CH, To, MaxSzReqs, MaxNoReqs, MaxNoAcks) ->
    ?d("init -> entry with"
       "~n Parent: ~p"
       "~n CH: ~p"
       "~n To: ~p"
       "~n MaxSzReqs: ~p"
       "~n MaxNoReqs: ~p"
       "~n MaxNoAcks: ~p", [Parent, CH, To, MaxSzReqs, MaxNoReqs, MaxNoAcks]),
    process_flag(trap_exit, true),
    proc_lib:init_ack(Parent, {ok, self()}),
    S = #state{parent       = Parent,
	       conn_handle  = CH,
	       timeout      = To,
	       req_maxsize  = MaxSzReqs,
	       req_maxcount = MaxNoReqs,
	       ack_maxcount = MaxNoAcks},
    loop(S, To).


%%%-----------------------------------------------------------------
%%% Internal functions
%%%-----------------------------------------------------------------

%% loop(State, To)
%%
%% Main loop. To is the time (ms) remaining until the store has to be
%% flushed. There are three cases:
%%   1) idle:    nothing stored; wait (without timeout) for work.
%%   2) active:  something stored and To >= 0; wait at most To ms,
%%               then flush.
%%   3) overdue: something stored and the time has already run out;
%%               flush at once.

%% idle (= empty)
loop(#state{reqs = [], acks = [], timeout = Timeout} = S, _) ->
    receive
	{send_ack, Serial} ->
	    ?d("loop(empty) -> received send_ack [~w] request", [Serial]),
	    loop(S#state{acks = [Serial]}, Timeout);

	{send_ack_now, Serial} ->
	    ?d("loop(empty) -> received send_ack_now [~w] request", [Serial]),
	    %% Report a failed send, as all other send sites do
	    handle_send_result(
	      send_msg(S#state.conn_handle, [], [Serial])
	     ),
	    loop(S, Timeout);

	{send_req, Tid, Req} when size(Req) >= S#state.req_maxsize ->
	    ?d("loop(empty) -> received (big) send_req request ~w", [Tid]),
	    %% Too big to be accumulated: send it on its own right away
	    handle_send_result(
	      send_msg(S#state.conn_handle, [{Tid, Req}], [])
	     ),
	    loop(S, Timeout);

	{send_req, Tid, Req} ->
	    ?d("loop(empty) -> received send_req request ~w", [Tid]),
	    loop(S#state{req_sz = size(Req), reqs = [{Tid,Req}]}, Timeout);

	{send_reqs, Tids, Reqs} ->
	    ?d("loop(empty) -> received send_reqs request: ~w", [Tids]),
	    {NewS, _} = handle_send_reqs(Tids, Reqs, S),
	    loop(NewS, Timeout);

	{send_pending, Serial} ->
	    ?d("loop(empty) -> received send_pending [~w] request", [Serial]),
	    handle_send_result(
	      send_pending(S#state.conn_handle, Serial, [], [])
	     ),
	    loop(S, Timeout);

	{send_reply, Reply} ->
	    ?d("loop(empty) -> received send_reply request", []),
	    #state{conn_handle = CH, req_maxsize = MaxSz} = S,
	    handle_send_result( send_reply(CH, Reply, MaxSz, 0, [], []) ),
	    loop(S, Timeout);

	{upgrade, CH} ->
	    ?d("loop(empty) -> received upgrade request:"
	       "~n CH: ~p", [CH]),
	    loop(S#state{conn_handle = CH}, Timeout);

	{ack_maxcount, NewMax} ->
	    ?d("loop(empty) -> received ack_maxcount request", []),
	    loop(S#state{ack_maxcount = NewMax}, Timeout);

	{req_maxcount, NewMax} ->
	    ?d("loop(empty) -> received req_maxcount request", []),
	    loop(S#state{req_maxcount = NewMax}, Timeout);

	{req_maxsize, NewMax} ->
	    ?d("loop(empty) -> received req_maxsize request", []),
	    loop(S#state{req_maxsize = NewMax}, Timeout);

	{timeout, NewTimeout} ->
	    ?d("loop(empty) -> received timeout request", []),
	    loop(S#state{timeout = NewTimeout}, NewTimeout);

	stop ->
	    ?d("loop(empty) -> received stop request", []),
	    exit(normal);

	{system, From, Msg} ->
	    ?d("loop(empty) -> received system message:"
	       "~n From: ~p"
	       "~n Msg: ~p", [From, Msg]),
	    Parent = S#state.parent,
	    sys:handle_system_msg(Msg, From, Parent,
				  ?MODULE, [], {S, Timeout});

	{'EXIT', Parent, Reason} when S#state.parent == Parent ->
	    ?d("loop(empty) -> received exit request", []),
	    exit(Reason);

	M ->
	    warning_msg("received unexpected message (ignoring): "
			"~n~p", [M]),
	    loop(S, Timeout)

    end;

%% active (= some acks or reqs waiting to be sent)
%% Start is taken before the receive so that the time spent waiting
%% can be deducted from To (see to/2) whenever we go another lap
%% without having flushed the store.
loop(#state{reqs = Reqs, acks = Acks, ack_maxcount = MaxAcks,
	    timeout = Timeout} = S, To)
  when To >= 0 ->
    Start = t(),
    receive
	{send_ack, Serial} when length(Acks) + 1 >= MaxAcks ->
	    ?d("loop(active,~w) -> "
	       "received [~w] send_ack [~w] request",
	       [To, length(Acks), Serial]),
	    %% This ack takes us to the ack limit: flush everything
	    handle_send_result(
	      send_msg(S#state.conn_handle, Reqs, [Serial|Acks])
	     ),
	    loop(clear_store(S), Timeout);

	{send_ack, Serial} ->
	    ?d("loop(active,~w) -> received send_ack [~w] request",
	       [To, Serial]),
	    loop(S#state{acks = [Serial|Acks]}, to(To, Start));

	{send_ack_now, Serial} ->
	    ?d("loop(active,~w) -> [~w,~w] "
	       "received send_ack_now [~w] request",
	       [To, length(Reqs), length(Acks), Serial]),
	    handle_send_result(
	      send_msg(S#state.conn_handle, Reqs, [Serial|Acks])
	     ),
	    loop(clear_store(S), Timeout);

	%% We need to check that this is not a resend!!
	%% In that case, send whatever we have in store
	{send_req, Tid, Req} ->
	    ?d("loop(active,~w) -> received send_req request ~w", [To,Tid]),
	    {NewS, NewT} =
		case handle_send_req(Tid, Req, S) of
		    {S1, true} ->
			%% Something was sent: restart the timer
			{S1, Timeout};
		    {S1, false} ->
			{S1, to(To, Start)}
		end,
	    loop(NewS, NewT);

	{send_reqs, Tids, NewReqs} ->
	    ?d("loop(active,~w) -> received send_reqs request ~w", [To,Tids]),
	    {NewS, NewT} =
		case handle_send_reqs(Tids, NewReqs, S) of
		    {S1, true} ->
			%% Something was sent: restart the timer
			{S1, Timeout};
		    {S1, false} ->
			{S1, to(To, Start)}
		end,
	    loop(NewS, NewT);

	{send_pending, Serial} ->
	    ?d("loop(active,~w) -> received send_pending [~w] request",
	       [To, Serial]),
	    handle_send_result(
	      send_pending(S#state.conn_handle, Serial, Reqs, Acks)
	     ),
	    loop(clear_store(S), Timeout);

	{send_reply, Reply} ->
	    ?d("loop(active,~w) -> received send_reply request", [To]),
	    #state{conn_handle = CH, req_maxsize = MaxSz, req_sz = ReqSz} = S,
	    handle_send_result(
	      send_reply(CH, Reply, MaxSz, ReqSz, Reqs, Acks)
	     ),
	    loop(clear_store(S), Timeout);

	{upgrade, CH} ->
	    ?d("loop(active,~w) -> received upgrade request", [To]),
	    loop(S#state{conn_handle = CH}, to(To, Start));

	{req_maxsize, NewMax} ->
	    ?d("loop(active,~w) -> received req_maxsize request", [To]),
	    loop(S#state{req_maxsize = NewMax}, to(To, Start));

	{req_maxcount, NewMax} ->
	    ?d("loop(active,~w) -> received req_maxcount request", [To]),
	    loop(S#state{req_maxcount = NewMax}, to(To, Start));

	{ack_maxcount, NewMax} ->
	    ?d("loop(active,~w) -> received ack_maxcount request", [To]),
	    loop(S#state{ack_maxcount = NewMax}, to(To, Start));

	{timeout, NewTimeout} ->
	    ?d("loop(active,~w) -> received timeout request: ~w",
	       [To, NewTimeout]),
	    %% We need to recalculate To: shift what remains of the
	    %% current lap by the difference between the new and the old
	    %% timeout. If the result is negative the next lap ends up in
	    %% the 'overdue' clause, i.e. the store is flushed at once.
	    NewTo = to(To, Start) + (NewTimeout - Timeout),
	    loop(S#state{timeout = NewTimeout}, NewTo);

	stop ->
	    ?d("loop(active,~w) -> received stop request", [To]),
	    %% Do not leave with unsent stuff in the store
	    handle_send_result( send_msg(S#state.conn_handle, Reqs, Acks) ),
	    exit(normal);

	{system, From, Msg} ->
	    ?d("loop(active,~w) -> received system message:"
	       "~n From: ~p"
	       "~n Msg: ~p", [To, From, Msg]),
	    Parent = S#state.parent,
	    sys:handle_system_msg(Msg, From, Parent,
				  ?MODULE, [], {S, to(To, Start)});

	{'EXIT', Parent, Reason} when S#state.parent == Parent ->
	    ?d("loop(active,~w) -> received exit request", [To]),
	    exit(Reason);

	M ->
	    warning_msg("received unexpected message (ignoring): "
			"~n~p", [M]),
	    loop(S, to(To, Start))

    after To ->
	    ?d("loop(active,~w) -> timeout - time to send", [To]),
	    handle_send_result( send_msg(S#state.conn_handle, Reqs, Acks) ),
	    loop(clear_store(S), Timeout)
    end;

%% overdue (= something in store, but no time left): flush at once
loop(#state{reqs = Reqs, acks = Acks, timeout = Timeout} = S, _To) ->
    ?d("loop(active) -> timeout [~w, ~w]",
       [length(Reqs),length(Acks)]),
    handle_send_result( send_msg(S#state.conn_handle, Reqs, Acks) ),
    loop(clear_store(S), Timeout).


%% The state with nothing left in the store.
clear_store(S) ->
    S#state{req_sz = 0, reqs = [], acks = []}.


%%%-----------------------------------------------------------------

%% handle_send_req(Tid, Req, State) -> {NewState, Sent}
%%
%% Handle one new request while the store is non-empty. Sent is true
%% if a message was sent (in which case the store of NewState is
%% empty) and false if the request was only added to the store.

%% The request is itself larger than the max size, so first send
%% everything we have stored in one message, and then the new request
%% in another.
%% Note that it does not matter if we with this request
%% passed the maxcount limit.
%% Note that this message cannot be a re-send, since
%% such a request would not have been stored, but sent immediately.
handle_send_req(Tid, Req,
		#state{conn_handle = CH,
		       req_maxsize = MaxSz, reqs = Reqs, acks = Acks} = S)
  when size(Req) >= MaxSz ->
    ?d("handle_send_req -> request bigger then maxsize ~w", [MaxSz]),
    handle_send_result( send_msg(CH, Reqs, Acks) ),
    handle_send_result( send_msg(CH, [{Tid, Req}], []) ),
    {clear_store(S), true};

%% And handle all the other cases
handle_send_req(Tid, Req,
		#state{conn_handle = CH, req_sz = ReqSz,
		       req_maxcount = MaxReqs, req_maxsize = MaxSz,
		       reqs = Reqs, acks = Acks} = S) ->
    case lists:keymember(Tid, 1, Reqs) of
	true ->
	    %% A re-send, time to send whatever we have in the store
	    %% (the stored copy goes out; the new one is not added)
	    ?d("handle_send_req -> was a re-send, so flush",[]),
	    handle_send_result( send_msg(CH, Reqs, Acks) ),
	    {clear_store(S), true};

	false when length(Reqs) + 1 >= MaxReqs ->
	    %% We finally passed the req-maxcount limit
	    ?d("handle_send_req -> maxcount ~w passed", [MaxReqs]),
	    handle_send_result(
	      send_msg(CH, [{Tid, Req}|Reqs], Acks)
	     ),
	    {clear_store(S), true};

	false when size(Req) + ReqSz >= MaxSz ->
	    %% We finally passed the req-maxsize limit
	    ?d("handle_send_req -> maxsize ~w passed", [MaxSz]),
	    handle_send_result(
	      send_msg(CH, [{Tid, Req}|Reqs], Acks)
	     ),
	    {clear_store(S), true};

	false ->
	    %% Still not time to send
	    ?d("handle_send_req -> nothing to be sent",[]),
	    {S#state{req_sz = ReqSz + size(Req), reqs = [{Tid, Req}|Reqs]},
	     false}
    end.


%% handle_send_reqs(Tids, Reqs0, State) -> {NewState, Sent}
%%
%% Handle a batch of new requests (Tids and Reqs0 are lists of equal
%% length). Sent is true if at least one message was sent; whatever
%% was not sent is left in the store of NewState.

%% We passed the req-maxcount limit: Time to send, at least some of
%% the stuff...
handle_send_reqs(Tids, Reqs0,
		 #state{conn_handle = CH,
			req_maxsize = MaxSz, req_sz = ReqSz,
			req_maxcount = MaxReqs, reqs = Reqs, acks = Acks} = S)
  when length(Reqs0) + length(Reqs) >= MaxReqs ->
    ?d("handle_send_reqs -> maxcount ~w: ~w, ~w",
       [MaxReqs,length(Reqs0),length(Reqs)]),
    Reqs1 = merge_tids_and_reqs(Tids, Reqs0, []),
    {NewReqs, NewReqSz} = send_reqs(CH, Reqs1, Acks, Reqs, ReqSz, MaxSz),
    ?d("handle_send_reqs -> sent:"
       "~n NewReqSz: ~w"
       "~n length(NewReqs): ~w", [NewReqSz, length(NewReqs)]),
    %% The acks always go out with the first message sent
    {S#state{req_sz = NewReqSz, reqs = NewReqs, acks = []}, true};

%% We did not pass the req-maxcount limit, but we could have passed the
%% req-maxsize limit, so maybe send...
handle_send_reqs(Tids, Reqs0, #state{conn_handle = CH,
				     req_maxsize = MaxSz, req_sz = ReqSz,
				     reqs = Reqs, acks = Acks} = S) ->
    ?d("handle_send_reqs -> not maxcount - maybe maxsize (~w)", [MaxSz]),
    Reqs1 = merge_tids_and_reqs(Tids, Reqs0, []),

    case maybe_send_reqs(CH, Reqs1, Acks, Reqs, ReqSz, MaxSz, false) of
	{NewReqs, NewReqSz, true} ->
	    ?d("handle_send_reqs -> sent:"
	       "~n NewReqSz: ~w"
	       "~n length(NewReqs): ~w", [NewReqSz, length(NewReqs)]),
	    {S#state{req_sz = NewReqSz, reqs = NewReqs, acks = []}, true};
	{NewReqs, NewReqSz, false} ->
	    ?d("handle_send_reqs -> not sent:"
	       "~n NewReqSz: ~w"
	       "~n length(NewReqs): ~w", [NewReqSz, length(NewReqs)]),
	    {S#state{req_sz = NewReqSz, reqs = NewReqs}, false}
    end.

%% merge_tids_and_reqs(Tids, Reqs, Acc) -> [{Tid, Req}]
%%
%% Pair each Tid with the request at the same position and push the
%% pairs onto Acc. The pairs come out in reverse input order. The two
%% lists must be of equal length.
merge_tids_and_reqs([], [], Acc) ->
    Acc;
merge_tids_and_reqs([Tid|Tids], [Req|Reqs], Acc) ->
    merge_tids_and_reqs(Tids, Reqs, [{Tid,Req}|Acc]).

%% send_reqs(CH, Reqs, Acks, Acc, AccSz, MaxSz) -> {NewAcc, NewAccSz}
%%
%% We know that we shall send, so if maybe_send_reqs does not,
%% we send it ourselves...
%% The result is what remains in the store after sending (and its size).
send_reqs(CH, Reqs, Acks, Acc, AccSz, MaxSz) ->
    ?d("send_reqs -> entry when"
       "~n length(Reqs): ~w"
       "~n Acks: ~w"
       "~n length(Acc): ~w"
       "~n AccSz: ~w", [length(Reqs), Acks, length(Acc), AccSz]),
    case maybe_send_reqs(CH, Reqs, Acks, Acc, AccSz, MaxSz, false) of
	{NewReqs, _NewReqSz, false} ->
	    ?d("send_reqs -> nothing sent yet"
	       "~n length(NewReqs): ~w", [length(NewReqs)]),
	    handle_send_result( send_msg(CH, NewReqs, Acks) ),
	    {[], 0};
	{NewReqs, NewReqSz, true} ->
	    ?d("send_reqs -> something sent"
	       "~n length(NewReqs): ~w"
	       "~n NewReqSz: ~w", [length(NewReqs), NewReqSz]),
	    {NewReqs, NewReqSz}
    end.


%% maybe_send_reqs(CH, Reqs, Acks, Acc, AccSz, MaxSz, Sent) ->
%%     {NewAcc, NewAccSz, NewSent}
%%
%% Add the requests in Reqs, one by one, to the store (Acc, of total
%% size AccSz), sending each time the max size is reached. The acks go
%% out with the first message sent. NewSent is true if something was
%% sent (or if Sent already was true).
maybe_send_reqs(_CH, [], _Acks, Acc, AccSz, _MaxSz, Sent) ->
    ?d("maybe_send_reqs -> done when"
       "~n Sent: ~w"
       "~n AccSz: ~w"
       "~n length(Acc): ~w", [Sent, AccSz, length(Acc)]),
    {Acc, AccSz, Sent};
maybe_send_reqs(CH, [{Tid, Req}|Reqs], Acks, Acc, _AccSz, MaxSz, _Sent)
  when size(Req) >= MaxSz ->
    %% The request was above the maxsize limit, so first send
    %% what's in store and then the big request.
    ?d("maybe_send_reqs -> entry when request [~w] size (~w) > max size"
       "~n Acks: ~w"
       "~n length(Acc): ~w", [Tid, size(Req), Acks, length(Acc)]),
    handle_send_result( send_msg(CH, Acc, Acks) ),
    handle_send_result( send_msg(CH, [{Tid, Req}], []) ),
    maybe_send_reqs(CH, Reqs, [], [], 0, MaxSz, true);
maybe_send_reqs(CH, [{Tid, Req}|Reqs], Acks, Acc, AccSz, MaxSz, _Sent)
  when AccSz + size(Req) >= MaxSz ->
    %% We _did_ pass the maxsize limit with this request, so send
    %% (the arguments follow the order of the format: sum first, then Tid)
    ?d("maybe_send_reqs -> entry when sum of requests (~w) > max size"
       "~n Tid: ~w"
       "~n Acks: ~w"
       "~n length(Acc): ~w", [size(Req) + AccSz, Tid, Acks, length(Acc)]),
    handle_send_result( send_msg(CH, [{Tid, Req}|Acc], Acks) ),
    maybe_send_reqs(CH, Reqs, [], [], 0, MaxSz, true);
maybe_send_reqs(CH, [{Tid, Req}|Reqs], Acks, Acc, AccSz, MaxSz, Sent) ->
    %% Still room: just add the request to the store
    ?d("maybe_send_reqs -> entry when"
       "~n Tid: ~w"
       "~n size(Req): ~w"
       "~n Acks: ~w"
       "~n length(Acc): ~w"
       "~n AccSz: ~w", [Tid, size(Req), Acks, length(Acc), AccSz]),
    NewAcc = [{Tid,Req}|Acc],
    NewAccSz = AccSz + size(Req),
    maybe_send_reqs(CH, Reqs, Acks, NewAcc, NewAccSz, MaxSz, Sent).


%%%-----------------------------------------------------------------

%% send_pending(CH, Serial, Reqs, Acks) -> ok | SendResult
%%
%% Send a transaction pending for Serial together with everything in
%% the store (Reqs in stored, i.e. newest-first, order). If no
%% connection data is found for CH, nothing is sent and ok is returned.
send_pending(CH, Serial, Reqs, Acks) ->
    ?d("send_pending -> entry with"
       "~n Serial: ~w"
       "~n length(Reqs): ~w"
       "~n length(Acks): ~w", [Serial, length(Reqs), length(Acks)]),
    case megaco_config:lookup_local_conn(CH) of
	[CD] ->
	    TP = #'TransactionPending'{transactionId = Serial},
	    Pend = {transactionPending, TP},
	    do_send_msg(CD, Pend, lists:reverse(Reqs), Acks);
	[] ->
	    ok
    end.


%% We need to check the size of the reply. If the reply itself is
%% larger than the max limit, then it is sent in a separate message.
%% send_reply(CH, Reply, MaxSz, ReqSz, Reqs, Acks) -> ok | SendResult
%%
%% Send the (already encoded) transaction reply. Reqs and Acks are the
%% current contents of the store (Reqs in stored, i.e. newest-first,
%% order):
%%   - reply larger than MaxSz: the store is sent first, in a message
%%     of its own, and then the reply in a separate message;
%%   - otherwise: reply and store are sent in one message.
%% If no connection data is found for CH, nothing is sent and ok is
%% returned.
send_reply(CH, Reply, MaxSz, _ReqSz, Reqs, Acks) ->
    ?d("send_reply -> entry with"
       "~n length(Reqs): ~w"
       "~n length(Acks): ~w", [length(Reqs), length(Acks)]),
    case megaco_config:lookup_local_conn(CH) of
	[CD] when size(Reply) > MaxSz ->
	    %% send_msg/3 wants the connection *handle* and the requests
	    %% in stored order: it does the lookup and the reversal
	    %% itself, and sends nothing if the store is empty.
	    handle_send_result( send_msg(CH, Reqs, Acks) ),
	    Rep = {transactionReply, Reply},
	    do_send_msg(CD, Rep, [], []);
	[CD] ->
	    Rep = {transactionReply, Reply},
	    do_send_msg(CD, Rep, lists:reverse(Reqs), Acks);
	[] ->
	    ok
    end.

%% do_send_msg(CD, Trans, Reqs, Serials) -> SendResult
%%
%% Send one message with the transaction Trans (a reply or a pending)
%% first, followed by the acks for Serials (if any) and the requests
%% in Reqs (if any; in sending order).
do_send_msg(CD, Trans, [], []) ->
    Body   = {transactions, [Trans]},
    Slogan = "send trans reply/pending",
    ?d("do_send_msg -> ~s", [Slogan]),
    megaco_messenger_misc:send_body(CD, Slogan, Body);
do_send_msg(CD, Trans, Reqs0, []) ->
    Reqs   = [{transactionRequest, Req} || {_, Req} <- Reqs0],
    Body   = {transactions, [Trans|Reqs]},
    Slogan = "send trans reply/pending and reqs",
    ?d("do_send_msg -> ~s", [Slogan]),
    megaco_messenger_misc:send_body(CD, Slogan, Body);
do_send_msg(CD, Trans, [], SerialRanges) ->
    Acks   = make_acks(ranges(SerialRanges), []),
    Body   = {transactions, [Trans, {transactionResponseAck, Acks}]},
    Slogan = "send trans reply/pending and acks",
    ?d("do_send_msg -> ~s", [Slogan]),
    megaco_messenger_misc:send_body(CD, Slogan, Body);
do_send_msg(CD, Trans, Reqs0, SerialRanges) ->
    Acks   = make_acks(ranges(SerialRanges), []),
    Reqs   = [{transactionRequest, Req} || {_, Req} <- Reqs0],
    Body   = {transactions, [Trans, {transactionResponseAck, Acks}|Reqs]},
    Slogan = "send trans reply/pending, reqs and acks",
    ?d("do_send_msg -> ~s", [Slogan]),
    megaco_messenger_misc:send_body(CD, Slogan, Body).



%% send_msg(CH, Reqs, Serials) -> ok | SendResult
%%
%% Send what is in the store: Reqs in stored (newest-first) order and
%% the ack serials. Nothing is sent (and ok is returned) if the store
%% is empty or if no connection data is found for CH.
send_msg(_, [], []) ->
    ok;
send_msg(CH, Reqs, Serials) ->
    case megaco_config:lookup_local_conn(CH) of
	[ConnData] ->
	    do_send_msg(ConnData, lists:reverse(Reqs), Serials);
	[] ->
	    ok
    end.


%% do_send_msg(CD, Reqs, Serials) -> SendResult
%%
%% Send one message with the acks for Serials (if any) followed by the
%% requests in Reqs (if any; in sending order).
do_send_msg(CD, Reqs0, []) ->
    ?d("do_send_msg -> entry with"
       "~n length(Reqs0): ~p", [length(Reqs0)]),
    Reqs = [{transactionRequest, Req} || {_, Req} <- Reqs0],
    Body = {transactions, Reqs},
    megaco_messenger_misc:send_body(CD, "send trans reqs", Body);
do_send_msg(CD, [], SerialRanges) ->
    ?d("do_send_msg -> entry with"
       "~n SerialRanges: ~p", [SerialRanges]),
    Acks = make_acks(ranges(SerialRanges), []),
    Body = {transactions, [{transactionResponseAck, Acks}]},
    megaco_messenger_misc:send_body(CD, "send trans acks", Body);
do_send_msg(CD, Reqs0, SerialRanges) ->
    ?d("do_send_msg -> entry with"
       "~n length(Reqs0): ~p"
       "~n SerialRanges: ~p", [length(Reqs0), SerialRanges]),
    Acks = make_acks(ranges(SerialRanges), []),
    Reqs = [{transactionRequest, Req} || {_, Req} <- Reqs0],
    Body = {transactions, [{transactionResponseAck, Acks}|Reqs]},
    megaco_messenger_misc:send_body(CD, "send trans reqs and acks", Body).


%% handle_send_result(SendResult) -> ok | error
%%
%% Evaluate the outcome of a send. A cancelled send counts as ok;
%% every other failure is logged.
handle_send_result(ok) ->
    ok;
handle_send_result({ok, _}) ->
    ok;
handle_send_result({error, {send_message_cancelled, _Reason}}) ->
    ok;
handle_send_result({error, {send_message_failed, Reason}}) ->
    error_msg("Failed sending message: ~n ~p", [Reason]),
    error;
handle_send_result(Error) ->
    error_msg("Failed sending message: ~n ~p", [Error]),
    error.


%% ranges(Serials) -> [{First, Last}]
%%
%% Collapse a list of serial numbers (in any order) into ascending
%% ranges of consecutive numbers, e.g.
%%     ranges([3,1,2,7]) -> [{1,3},{7,7}]
%% Duplicates are dropped first: a serial number occurring twice would
%% otherwise end up in two (overlapping) ranges.
ranges(L) ->
    lists:reverse(ranges(lists:usort(L), [], [])).

%% ranges(SortedSerials, Range, Ranges)
%% Range is the range being built (newest, i.e. highest, first) and
%% Ranges the finished ones (also newest first).
ranges([], Range, Ranges) ->
    ranges2(Range, Ranges);
ranges([S1|Sn], [S2|_] = Range, Ranges) when S1 == (S2+1) ->
    %% Consecutive: extend the current range
    ranges(Sn, [S1|Range], Ranges);
ranges([S|Sn], Range, Ranges) ->
    %% A gap (or the very first serial): close the current range
    %% and start a new one
    ranges(Sn, [S], ranges2(Range, Ranges)).

%% ranges2(Range, Ranges) -> NewRanges
%%
%% Close the range under construction. Range holds its members with
%% the last one at the head, so the range runs from the final element
%% of the list up to its head. An empty Range adds nothing.
ranges2([], Done) ->
    Done;
ranges2([Last|_] = Members, Done) ->
    [{lists:last(Members), Last}|Done].


%% make_acks(SerialRanges, Acks) -> [#'TransactionAck'{}]
%%
%% Convert each {First, Last} range into a 'TransactionAck' record.
%% Acks is an accumulator (in reverse order) of already converted
%% acks; the result is in range order.
make_acks(SerialRanges, Acks) ->
    lists:reverse(Acks) ++ [make_ack(Range) || Range <- SerialRanges].

%% A range of one serial only needs the firstAck field.
make_ack({S,S}) ->
    #'TransactionAck'{firstAck = S};
make_ack({F,L}) ->
    #'TransactionAck'{firstAck = F, lastAck = L}.



%%%-----------------------------------------------------------------

%% What is left of To, given that the clock started at Start (see t/0)
to(To, Start) ->
    Elapsed = t() - Start,
    To - Elapsed.

%% Time in milli seconds
t() ->
    erlang:monotonic_time(milli_seconds).

%% Issue a warning, tagged as coming from the transaction sender
warning_msg(Format, Args) ->
    ?megaco_warning("Transaction sender: " ++ Format, Args).

%% Issue an error, tagged as coming from the transaction sender
error_msg(Format, Args) ->
    ?megaco_error("Transaction sender: " ++ Format, Args).


%%%-----------------------------------------------------------------
%%% System messages handled here
%%%
%%% The 'Misc' term handed to sys:handle_system_msg/6 by the loop is
%%% {State, RemainingTime}.
%%%-----------------------------------------------------------------

%% Done with the system message: back to the loop where we left off
system_continue(_Parent, _Dbg, {State, RemainingTime}) ->
    loop(State, RemainingTime).

%% Told to terminate: send whatever is in the store before leaving
system_terminate(Reason, _Parent, _Dbg,
		 {#state{conn_handle = CH, reqs = Reqs, acks = Acks}, _}) ->
    send_msg(CH, Reqs, Acks),
    exit(Reason).

%% Code change: nothing to convert
system_code_change(Misc, _Module, _OldVsn, _Extra) ->
    ?d("system_code_change -> entry", []),
    {ok, Misc}.