1%% 2%% %CopyrightBegin% 3%% 4%% Copyright Ericsson AB 1999-2019. All Rights Reserved. 5%% 6%% Licensed under the Apache License, Version 2.0 (the "License"); 7%% you may not use this file except in compliance with the License. 8%% You may obtain a copy of the License at 9%% 10%% http://www.apache.org/licenses/LICENSE-2.0 11%% 12%% Unless required by applicable law or agreed to in writing, software 13%% distributed under the License is distributed on an "AS IS" BASIS, 14%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15%% See the License for the specific language governing permissions and 16%% limitations under the License. 17%% 18%% %CopyrightEnd% 19%% 20 21%% 22%%---------------------------------------------------------------------- 23%% Purpose: Send and process a (sequence of) Megaco/H.248 transactions 24%%---------------------------------------------------------------------- 25 26-module(megaco_messenger). 27 28%% Application internal export 29-export([ 30 process_received_message/4, process_received_message/5, 31 receive_message/4, receive_message/5, 32 connect/4, connect/5, 33 disconnect/2, 34 encode_actions/3, 35 call/3, 36 cast/3, 37 cancel/2, 38 request_timeout/2, 39 request_keep_alive_timeout/2, 40 pending_timeout/3, 41 reply_timeout/3, 42 segment_timeout/3, 43 %% segment_reply_timeout/4, 44 45 test_request/5, 46 test_reply/5 47 ]). 48 49%% MIB stat functions 50-export([ 51 get_stats/0, get_stats/1, get_stats/2, 52 reset_stats/0, reset_stats/1 53 ]). 54 55%% Misc functions 56-export([ 57 cleanup/2, 58 which_requests/1, which_replies/1 59 ]). 60 61%% Module internal export 62-export([ 63 process_received_message/6, 64 handle_request/2, 65 handle_long_request/2, 66 connect_remote/3, 67 disconnect_local/2, 68 disconnect_remote/3, 69 send_request_remote/4, 70 receive_reply_remote/2, receive_reply_remote/3 71 ]). 72 73-include_lib("megaco/include/megaco.hrl"). 74-include("megaco_message_internal.hrl"). 
-include_lib("megaco/src/app/megaco_internal.hrl").

%% Record describing an outstanding request. In this file it is used as
%% a match pattern handed to megaco_monitor:which_requests/1.
%% N.B. Update cancel/1 with '_' when a new field is added
-record(request,
        {trans_id,
         remote_mid,
         timer_ref,        % {short, Ref} | {long, Ref}
         init_timer,
         init_long_timer,
         curr_timer,
         version,
         bytes,            % {send, Data} | {no_send, Data}, Data = binary() | tuple()
         send_handle,
         user_mod,
         user_args,
         reply_action,     % call | cast
         reply_data,
         seg_recv = [],    % [integer()] (received segments)
         init_seg_timer,
         seg_timer_ref,
         keep_alive_timer, % plain | integer() >= 0
         keep_alive_ref    % undefined | ref()
        }).


%% Record describing a reply. In this file it is used as a match pattern
%% handed to megaco_monitor:which_replies/1 (see cleanup/2 and
%% which_replies/1).
%% N.B. Update cancel/1 with '_' when a new field is added
-record(reply,
        {
          trans_id,
          local_mid,
          state = prepare,     % prepare | eval_request | waiting_for_ack | aborted
          pending_timer_ref,
          handler = undefined, % pid of the proc executing the callback func
          timer_ref,
          version,
          %% bytes: Sent reply data: not acknowledged
          bytes,               % binary() | [{integer(), binary(), timer_ref()}]
          ack_action,          % discard_ack | {handle_ack, Data}
          send_handle,
          %% segments: Not sent reply data (segments)
          segments = [],       % [{integer(), binary()}]
          user_mod,
          user_args
         }).

%% Transaction identifier: the mid it belongs to plus a serial number.
-record(trans_id,
        {
          mid,
          serial
         }).


%% Test hooks.
%%
%% When MEGACO_TEST_CODE is defined:
%%   ?SIM(Other, Where) looks up {?MODULE, Where} in the ets table
%%   megaco_test_data; if a fun is stored there it is applied to Other
%%   and its result is used, otherwise Other is returned unchanged.
%%   ?TC_AWAIT_CANCEL_EVENT() / ?TC_AWAIT_REPLY_EVENT(Info) consult
%%   megaco_tc_controller and, if so instructed, block the calling
%%   process until it is released (or, for cancel, sleep for a while).
%% Otherwise ?SIM(Other, Where) is just Other and the two await macros
%% are the atom ok.
-ifdef(MEGACO_TEST_CODE).
-define(SIM(Other,Where),
        fun(Afun,Bfun) ->
                Kfun = {?MODULE,Bfun},
                case (catch ets:lookup(megaco_test_data, Kfun)) of
                    [{Kfun,Cfun}] ->
                        Cfun(Afun);
                    _ ->
                        Afun
                end
        end(Other,Where)).
-define(TC_AWAIT_CANCEL_EVENT(),
        case megaco_tc_controller:lookup(block_on_cancel) of
            {value, {Tag, Pid}} when is_pid(Pid) ->
                Pid ! {Tag, self()},
                receive
                    {Tag, Pid} ->
                        ok
                end;
            {value, {sleep, To}} when is_integer(To) andalso (To > 0) ->
                receive after To -> ok end;
            _ ->
                ok
        end).
-define(TC_AWAIT_REPLY_EVENT(Info),
        case megaco_tc_controller:lookup(block_on_reply) of
            {value, {Tag, Pid}} when is_pid(Pid) ->
                Pid ! {Tag, self(), Info},
                receive
                    {Tag, Pid} ->
                        ok
                end;
            _Whatever ->
                %% io:format("Whatever: ~p~n", [Whatever]),
                ok
        end).
-else.
-define(SIM(Other,Where),Other).
-define(TC_AWAIT_CANCEL_EVENT(),ok).
-define(TC_AWAIT_REPLY_EVENT(_),ok).
-endif.


-define(report_pending_limit_exceeded(ConnData),
        ?report_important(ConnData, "<ERROR> pending limit exceeded", [])).

%% Extended trace macros: forwarded to ?report_trace when
%% megaco_extended_trace is defined, otherwise they expand to the atom ok.
-ifdef(megaco_extended_trace).
-define(rt1(T,F,A),?report_trace(T,F,A)).
-define(rt2(F,A),  ?rt1(ignore,F,A)).
-define(rt3(F),    ?rt2(F,[])).
-else.
-define(rt1(T,F,A),ok).
-define(rt2(F,A),  ok).
-define(rt3(F),    ok).
-endif.


%%----------------------------------------------------------------------
%% SNMP statistics handling functions
%%----------------------------------------------------------------------

%%-----------------------------------------------------------------
%% Func: get_stats/0, get_stats/1, get_stats/2
%% Description: Retrieve statistics (counters). All three variants
%%              delegate to megaco_stats using the megaco_stats name.
%%-----------------------------------------------------------------

get_stats() ->
    megaco_stats:get_stats(megaco_stats).

get_stats(ConnHandleOrCounter) ->
    megaco_stats:get_stats(megaco_stats, ConnHandleOrCounter).

get_stats(ConnHandle, Counter) ->
    megaco_stats:get_stats(megaco_stats, ConnHandle, Counter).


%%-----------------------------------------------------------------
%% Func: reset_stats/0, reset_stats/1
%% Description: Reset statistics (counters). Both variants delegate
%%              to megaco_stats using the megaco_stats name.
%%-----------------------------------------------------------------

reset_stats() ->
    megaco_stats:reset_stats(megaco_stats).

reset_stats(ConnHandleOrCounter) ->
    megaco_stats:reset_stats(megaco_stats, ConnHandleOrCounter).
%%----------------------------------------------------------------------
%% cleanup utility functions
%%----------------------------------------------------------------------

%% cleanup(ConnHandleOrLocalMid, Force) -> ok
%%
%% Deletes reply records whose local_mid matches the given local mid
%% (taken from the connection handle when one is supplied).
%% With Force =:= false only replies in state 'aborted' are deleted;
%% with Force =:= true every matching reply is deleted.
cleanup(#megaco_conn_handle{local_mid = LocalMid}, Force)
  when is_boolean(Force) ->
    do_cleanup(cleanup_reply_pattern(LocalMid), Force);
cleanup(LocalMid, Force)
  when is_boolean(Force) ->
    do_cleanup(cleanup_reply_pattern(LocalMid), Force).

%% Match pattern selecting the transaction id ('$1') and the state ('$2')
%% of every reply record with the given local mid.
cleanup_reply_pattern(LocalMid) ->
    #reply{trans_id  = '$1',
           local_mid = LocalMid,
           state     = '$2',
           _         = '_'}.

%% Fetch the {TransId, State} pairs matching Pat and delete the ones
%% selected by Force.
do_cleanup(Pat, Force) ->
    Reps = [{TransId, State} ||
               [TransId, State] <- megaco_monitor:which_replies(Pat)],
    do_cleanup2(Reps, Force).

%% Forced: delete every reply. Not forced: delete aborted replies only.
do_cleanup2(Reps, true) ->
    lists:foreach(fun({TransId, _State}) ->
                          megaco_monitor:delete_reply(TransId)
                  end,
                  Reps);
do_cleanup2(Reps, false) ->
    lists:foreach(fun({TransId, aborted}) ->
                          megaco_monitor:delete_reply(TransId);
                     (_NotAborted) ->
                          ok
                  end,
                  Reps).


%%----------------------------------------------------------------------
%% which_requests and which_replies utility functions
%%----------------------------------------------------------------------

%% which_requests(ConnHandle) -> [Serial]
%% which_requests(LocalMid)   -> [{ConnHandle, [Serial]}]
%%
%% With a connection handle, lists the serials of the requests matching
%% both its local and remote mid. With a bare local mid, the matching
%% requests are grouped per connection handle.
which_requests(#megaco_conn_handle{local_mid  = LocalMid,
                                   remote_mid = RemoteMid}) ->
    ReqPat = #request{trans_id   = #trans_id{mid    = LocalMid,
                                             serial = '$1', _ = '_'},
                      remote_mid = RemoteMid,
                      _          = '_'},
    [Serial || [Serial] <- megaco_monitor:which_requests(ReqPat)];
which_requests(LocalMid) ->
    ReqPat = #request{trans_id   = #trans_id{mid    = LocalMid,
                                             serial = '$1', _ = '_'},
                      remote_mid = '$2', _ = '_'},
    Pairs = [{mk_ch(LocalMid, RemoteMid), Serial} ||
                [Serial, RemoteMid] <- megaco_monitor:which_requests(ReqPat)],
    which_requests1(lists:sort(Pairs)).
%% which_requests1(SortedPairs) -> [{ConnHandle, [Serial]}]
%%
%% SortedPairs is a sorted list of {ConnHandle, Serial}. Consecutive
%% entries with the same connection handle are collected into one
%% group. Every group, including the last one, lists its serials in the
%% order they occur in SortedPairs.
which_requests1(SortedPairs) ->
    group_by_conn_handle(SortedPairs).


%% which_replies(ConnHandle) -> [{Serial, State, Handler}]
%% which_replies(LocalMid)   -> [{ConnHandle, [{Serial, State, Handler}]}]
%%
%% With a connection handle, lists the replies whose transaction id mid
%% is the remote mid and whose local mid is the local mid of the handle.
%% With a bare local mid, the matching replies are grouped per
%% connection handle.
which_replies(#megaco_conn_handle{local_mid  = LocalMid,
                                  remote_mid = RemoteMid}) ->
    Pat1 = #trans_id{mid    = RemoteMid,
                     serial = '$1', _ = '_'},
    Pat2 = #reply{trans_id  = Pat1,
                  local_mid = LocalMid,
                  state     = '$2',
                  handler   = '$3', _ = '_'},
    Match = megaco_monitor:which_replies(Pat2),
    [{V1, V2, V3} || [V1, V2, V3] <- Match];
which_replies(LocalMid) ->
    Pat1 = #trans_id{mid    = '$1',
                     serial = '$2', _ = '_'},
    Pat2 = #reply{trans_id  = Pat1,
                  local_mid = LocalMid,
                  state     = '$3',
                  handler   = '$4', _ = '_'},
    Match0 = megaco_monitor:which_replies(Pat2),
    Match1 = [{mk_ch(LocalMid,V1),{V2,V3,V4}} || [V1, V2, V3, V4] <- Match0],
    which_replies1(lists:sort(Match1)).

%% which_replies1(SortedPairs) -> [{ConnHandle, [ReplyInfo]}]
%%
%% Same grouping as which_requests1/1, for {ConnHandle, ReplyInfo} pairs.
which_replies1(SortedPairs) ->
    group_by_conn_handle(SortedPairs).


%% Group a list of {Key, Value} pairs on consecutive equal keys.
%% Both the groups and the values inside each group keep input order.
group_by_conn_handle([]) ->
    [];
group_by_conn_handle([{CH, Value} | Rest]) ->
    group_by_conn_handle(Rest, CH, [Value], []).

group_by_conn_handle([{CH, Value} | Rest], CH, Values, Groups) ->
    %% Same key as the group being built: accumulate (in reverse)
    group_by_conn_handle(Rest, CH, [Value | Values], Groups);
group_by_conn_handle([{NextCH, Value} | Rest], CH, Values, Groups) ->
    %% New key: close the current group and start a new one
    group_by_conn_handle(Rest, NextCH, [Value],
                         [{CH, lists:reverse(Values)} | Groups]);
group_by_conn_handle([], CH, Values, Groups) ->
    %% The last group must be reversed just like the others; previously
    %% it was emitted as accumulated, i.e. in the opposite order.
    lists:reverse([{CH, lists:reverse(Values)} | Groups]).


%% Build a connection handle from a local and a remote mid.
mk_ch(LM, RM) ->
    #megaco_conn_handle{local_mid = LM, remote_mid = RM}.
%%----------------------------------------------------------------------
%% Register/unregister connections
%%----------------------------------------------------------------------

%% autoconnect(RH, RemoteMid, SendHandle, ControlPid, Extra) ->
%%     {ok, ConnHandle} | {error, Reason}
%%
%% Registers the connection through megaco_config:autoconnect/4 and, on
%% success, runs the user's handle_connect callback (see do_connect/2).
autoconnect(#megaco_receive_handle{} = RH,
            RemoteMid, SendHandle, ControlPid, Extra) ->
    ?rt2("autoconnect", [RH, RemoteMid, SendHandle, ControlPid]),
    case megaco_config:autoconnect(RH, RemoteMid, SendHandle, ControlPid) of
        {error, Reason} ->
            {error, Reason};
        {ok, ConnData} ->
            do_connect(ConnData, Extra)
    end;
autoconnect(BadHandle, _CH, _SendHandle, _ControlPid, _Extra) ->
    {error, {bad_receive_handle, BadHandle}}.

%% connect/4: as connect/5 with the default user callback extra.
connect(RH, RemoteMid, SendHandle, ControlPid) ->
    connect(RH, RemoteMid, SendHandle, ControlPid,
            ?default_user_callback_extra).

%% connect(RH, RemoteMid, SendHandle, ControlPid, Extra) ->
%%     {ok, ConnHandle} | {error, Reason}
connect(#megaco_receive_handle{} = RH,
        RemoteMid, SendHandle, ControlPid, Extra) ->
    ?rt2("connect", [RH, RemoteMid, SendHandle, ControlPid, Extra]),

    %% The entire connect is done in a temporary, linked worker process.
    %% This gives others a process they can monitor or link to in order
    %% to be notified when the connect attempt is over. The worker
    %% delivers its result as its exit reason, so when it exits the
    %% connect has either succeeded or failed.

    Worker =
        fun() ->
                Outcome =
                    case megaco_config:connect(RH, RemoteMid,
                                               SendHandle, ControlPid) of
                        {error, Reason} ->
                            {error, Reason};
                        {ok, ConnData} ->
                            do_connect(ConnData, Extra)
                    end,
                ?rt2("connector: connected", [self(), Outcome]),
                exit({result, Outcome})
        end,
    %% Trap exits so that the worker's exit signal arrives as a message
    PrevTrapExit = process_flag(trap_exit, true),
    Connector    = erlang:spawn_link(Worker),
    Result =
        receive
            {'EXIT', Connector, {result, ConnectResult}} ->
                ?rt2("connect result: received expected connector exit signal",
                     [Connector, ConnectResult]),
                ConnectResult;
            {'EXIT', Connector, OtherReason} ->
                ?rt2("connect exit: received unexpected connector exit signal",
                     [Connector, OtherReason]),
                {error, OtherReason}
        end,
    process_flag(trap_exit, PrevTrapExit),
    Result;
connect(BadHandle, _CH, _SendHandle, _ControlPid, _Extra) ->
    {error, {bad_receive_handle, BadHandle}}.

%% do_connect(ConnData, Extra) -> {ok, ConnHandle} | {error, Reason}
%%
%% Calls the user's handle_connect callback. If the user accepts (ok),
%% monitoring of the control process is set up. Any other answer removes
%% the connection from megaco_config and yields a connection_refused
%% error.
do_connect(CD, Extra) ->
    #conn_data{conn_handle      = CH,
               protocol_version = Version,
               user_mod         = UserMod,
               user_args        = UserArgs} = CD,
    Args = connect_callback_args(CH, Version, Extra, UserArgs),
    ?report_trace(CD, "callback: connect", [Args]),
    Res = (catch apply(UserMod, handle_connect, Args)),
    ?report_debug(CD, "return: connect", [{return, Res}]),
    case Res of
        ok ->
            ?SIM(ok, do_connect), % do_encode),
            monitor_process(CH, CD#conn_data.control_pid);
        error ->
            refuse_connect(CD, CH, error);
        {error, ED} when is_record(ED,'ErrorDescriptor') ->
            refuse_connect(CD, CH, ED);
        _Error ->
            warning_msg("connect callback failed: ~w", [Res]),
            refuse_connect(CD, CH, Res)
    end.

%% The Extra argument is only passed on to the callback when it is not
%% the default value.
connect_callback_args(CH, Version, Extra, UserArgs) ->
    case Extra of
        ?default_user_callback_extra ->
            [CH, Version | UserArgs];
        _ ->
            [CH, Version, Extra | UserArgs]
    end.

%% Remove the connection from the config and build the error result.
refuse_connect(CD, CH, Why) ->
    megaco_config:disconnect(CH),
    {error, {connection_refused, CD, Why}}.
%% finish_connect(ConnData) -> {ok, ConnData2} | {error, Reason}
%%
%% Completes a connect for a connection whose control process is now
%% known. If the control process lives on another node, that node is
%% first asked (connect_remote/3) for its monitor process, which then
%% takes the place of the control pid.
finish_connect(#conn_data{conn_handle = CH,
                          control_pid = ControlPid} = CD)
  when is_pid(ControlPid) ->
    ControlNode = node(ControlPid),
    case ControlNode =:= node() of
        true ->
            ?rt1(CD, "finish local connect", [ControlPid]),
            do_finish_connect(CD);
        false ->
            ?rt1(CD, "finish remote connect", [ControlPid]),
            finish_remote_connect(CD, CH, ControlPid, ControlNode)
    end.

finish_remote_connect(CD, CH, ControlPid, RemoteNode) ->
    UserMonitorPid = whereis(megaco_monitor),
    RpcArgs        = [CH, ControlPid, UserMonitorPid],
    case rpc:call(RemoteNode, ?MODULE, connect_remote, RpcArgs) of
        {ok, ControlMonitorPid} ->
            do_finish_connect(CD#conn_data{control_pid = ControlMonitorPid});
        {error, Reason} ->
            disconnect(CH, {connect_remote, Reason}),
            {error, Reason};
        {badrpc, Reason} ->
            ExitReason = {'EXIT', Reason},
            disconnect(CH, {connect_remote, ExitReason}),
            {error, ExitReason}
    end.

%% Hand the send handle, the control pid and the function to apply when
%% the control process exits (disconnect_local) over to megaco_config,
%% and record the returned reference in the conn data.
do_finish_connect(#conn_data{conn_handle = CH,
                             send_handle = SendHandle,
                             control_pid = ControlPid} = CD) ->
    OnExit = {?MODULE, disconnect_local, [CH]},
    case megaco_config:finish_connect(CH, SendHandle, ControlPid, OnExit) of
        {error, Reason} ->
            {error, {config_update, Reason}};
        {ok, Ref} ->
            {ok, CD#conn_data{monitor_ref = Ref}}
    end.
%% monitor_process(ConnHandle, ControlPid) -> {ok, ConnHandle} | {error, Reason}
%%
%% Arranges for disconnect_local to be applied when the control process
%% exits and stores the resulting reference as the monitor_ref of the
%% connection. For a control process on another node, that node is first
%% asked (connect_remote/3) for its monitor process, and that process is
%% watched instead.
monitor_process(CH, ControlPid) when is_pid(ControlPid) ->
    case node(ControlPid) =:= node() of
        true ->
            Ref = megaco_monitor:apply_at_exit(?MODULE, disconnect_local,
                                               [CH], ControlPid),
            case mp_store_monitor_ref(CH, Ref) of
                ok ->
                    ?SIM({ok, CH}, monitor_process_local);
                {error, _} = Error ->
                    Error
            end;
        false ->
            mp_monitor_remote(CH, ControlPid)
    end;
monitor_process(CH, undefined = _ControlPid) ->
    %% We have to do this later (setting up the monitor),
    %% when the first message arrives. The 'connected' atom is
    %% the indication for the first arriving message to finish
    %% the connect.
    %% This may be the case when an MGC performs a pre-connect
    %% in order to speed up the handling of an (expected) connecting
    %% MG.
    case mp_store_monitor_ref(CH, connected) of
        ok ->
            ?SIM({ok, CH}, monitor_process_local);
        {error, _} = Error ->
            Error
    end.

%% Control process on another node: watch the monitor process of that
%% node rather than the control process itself.
mp_monitor_remote(CH, ControlPid) ->
    RemoteNode     = node(ControlPid),
    UserMonitorPid = whereis(megaco_monitor),
    RpcArgs        = [CH, ControlPid, UserMonitorPid],
    case rpc:call(RemoteNode, ?MODULE, connect_remote, RpcArgs) of
        {ok, ControlMonitorPid} ->
            Ref = megaco_monitor:apply_at_exit(?MODULE, disconnect_local,
                                               [CH], ControlMonitorPid),
            case mp_store_monitor_ref(CH, Ref) of
                ok ->
                    ?SIM({ok, CH}, monitor_process_remote);
                {error, _} = Error ->
                    Error
            end;
        {error, Reason} ->
            disconnect(CH, {connect_remote, Reason}),
            {error, Reason};
        {badrpc, Reason} ->
            ExitReason = {'EXIT', Reason},
            disconnect(CH, {connect_remote, ExitReason}),
            {error, ExitReason}
    end.

%% Store the monitor reference; if the config update fails the
%% connection is torn down again.
mp_store_monitor_ref(CH, Ref) ->
    case megaco_config:update_conn_info(CH, monitor_ref, Ref) of
        ok ->
            ok;
        {error, Reason} ->
            disconnect(CH, {config_update, Reason}),
            {error, Reason}
    end.
%% connect_remote(ConnHandle, ControlPid, UserMonitorPid) ->
%%     {ok, ControlMonitorPid} | {error, Reason}
%%
%% Invoked through rpc on the node of the control process (see the guard)
%% by a user on another node. Registers that user node for the
%% connection and arranges for disconnect_remote to be applied should the
%% user's monitor process exit.
connect_remote(CH, ControlPid, UserMonitorPid)
  when node(ControlPid) =:= node() andalso node(UserMonitorPid) =/= node() ->
    case megaco_config:lookup_local_conn(CH) of
        [] ->
            {error, {no_connection, CH}};
        [_ConnData] ->
            UserNode = node(UserMonitorPid),
            Ref = megaco_monitor:apply_at_exit(?MODULE, disconnect_remote,
                                               [CH, UserNode],
                                               UserMonitorPid),
            case megaco_config:connect_remote(CH, UserNode, Ref) of
                ok ->
                    ControlMonitorPid = whereis(megaco_monitor),
                    ?SIM({ok, ControlMonitorPid}, connect_remote);
                {error, Reason} ->
                    {error, Reason}
            end
    end.

%% The monitor_ref field may hold a placeholder instead of a real
%% reference; in that case there is nothing to cancel.
cancel_apply_at_exit(connected) ->
    ok;
cancel_apply_at_exit({connecting, _ConnectorPid}) ->
    ok;
cancel_apply_at_exit(ControlRef) ->
    megaco_monitor:cancel_apply_at_exit(ControlRef).

%% Node of the control process, or the local node when the control pid
%% is not (yet) a pid.
node_of_control_pid(MaybePid) ->
    case is_pid(MaybePid) of
        true  -> node(MaybePid);
        false -> node()
    end.

%% disconnect(ConnHandle, DiscoReason) -> ok | {error, Reason}
%%
%% Removes the connection from the config, cancels the pending
%% apply-at-exit, invokes the disconnect callback and finally propagates
%% the disconnect: to all remote user nodes when the control process is
%% local, otherwise to the node of the control process.
disconnect(#megaco_conn_handle{} = ConnHandle, DiscoReason) ->
    case megaco_config:disconnect(ConnHandle) of
        {error, Reason} ->
            {error, Reason};
        {ok, ConnData, RemoteConnData} ->
            cancel_apply_at_exit(ConnData#conn_data.monitor_ref),
            handle_disconnect_callback(ConnData, DiscoReason),
            ControlNode = node_of_control_pid(ConnData#conn_data.control_pid),
            case ControlNode =:= node() of
                true ->
                    %% Propagate to remote users
                    disconnect_remote_users(ConnHandle, DiscoReason,
                                            RemoteConnData);
                false when (RemoteConnData =:= []) ->
                    %% Propagate to remote control node
                    disconnect_at_control_node(ConnHandle, DiscoReason,
                                               ControlNode)
            end
    end;
disconnect(BadHandle, Reason) ->
    {error, {bad_conn_handle, BadHandle, Reason}}.

%% Cancel the apply-at-exit of every remote user and ask each user node
%% to disconnect as well.
disconnect_remote_users(ConnHandle, DiscoReason, RemoteConnData) ->
    Nodes = [begin
                 cancel_apply_at_exit(RCD#remote_conn_data.monitor_ref),
                 RCD#remote_conn_data.user_node
             end || RCD <- RemoteConnData],
    %% io:format("NODES: ~p~n", [Nodes]),
    case rpc:multicall(Nodes, ?MODULE, disconnect,
                       [ConnHandle, DiscoReason]) of
        {Replies, []} ->
            case [R || R <- Replies, is_remote_disconnect_failure(R)] of
                [] ->
                    ok;
                BadReplies ->
                    {error,
                     {remote_disconnect_error, ConnHandle, BadReplies}}
            end;
        {_Replies, BadNodes} ->
            {error, {remote_disconnect_crash, ConnHandle, BadNodes}}
    end.

%% A remote node that had no such connection is not counted as a failure.
is_remote_disconnect_failure(ok) ->
    false;
is_remote_disconnect_failure({error, {no_connection, _CH}}) ->
    false;
is_remote_disconnect_failure(_) ->
    true.

disconnect_at_control_node(ConnHandle, DiscoReason, ControlNode) ->
    RpcArgs = [DiscoReason, ConnHandle, node()],
    case rpc:call(ControlNode, ?MODULE, disconnect_remote, RpcArgs) of
        {badrpc, Reason} ->
            {error, {'EXIT', Reason}};
        Other ->
            Other
    end.

%% Applied when the control process exits (registered with
%% megaco_monitor:apply_at_exit by monitor_process/2 and
%% do_finish_connect/1).
disconnect_local(Reason, ConnHandle) ->
    disconnect(ConnHandle, {no_controlling_process, Reason}).

%% Drop the registration of a remote user node for this connection.
disconnect_remote(_Reason, ConnHandle, UserNode) ->
    case megaco_config:disconnect_remote(ConnHandle, UserNode) of
        [] ->
            {error, {no_connection, ConnHandle}};
        [RCD] ->
            cancel_apply_at_exit(RCD#remote_conn_data.monitor_ref),
            ok
    end.


%%----------------------------------------------------------------------
%% Handle incoming message
%%----------------------------------------------------------------------

%% receive_message/4: as receive_message/5 with the default user
%% callback extra.
receive_message(ReceiveHandle, ControlPid, SendHandle, Bin) ->
    receive_message(ReceiveHandle, ControlPid, SendHandle, Bin,
                    ?default_user_callback_extra).

%% Process the message in a freshly spawned process that is linked to
%% the caller; returns ok immediately.
receive_message(ReceiveHandle, ControlPid, SendHandle, Bin, Extra) ->
    SpawnOpts = [link , {min_heap_size, 5000}],
    WorkerArgs = [ReceiveHandle, ControlPid, SendHandle, Bin, self(), Extra],
    spawn_opt(?MODULE, process_received_message, WorkerArgs, SpawnOpts),
    ok.

%% This function is called via the spawn_opt function with the link
%% option, therefore the unlink before the exit.
process_received_message(ReceiveHandle, ControlPid, SendHandle, Bin, Receiver,
                         Extra) ->
    process_received_message(ReceiveHandle, ControlPid, SendHandle, Bin, Extra),
    unlink(Receiver),
    exit(normal).
%% process_received_message/4: as process_received_message/5 with the
%% default user callback extra.
process_received_message(ReceiveHandle, ControlPid, SendHandle, Bin) ->
    process_received_message(ReceiveHandle, ControlPid, SendHandle, Bin,
                             ?default_user_callback_extra).

%% process_received_message(ReceiveHandle, ControlPid, SendHandle, Bin, Extra)
%%     -> ok
%%
%% Decodes and verifies the message (prepare_message/5) and then handles
%% its transactions or its message error. Exits are trapped while this
%% is going on; the previous trap_exit setting is restored afterwards.
process_received_message(ReceiveHandle, ControlPid, SendHandle, Bin, Extra) ->
    PrevTrapExit = process_flag(trap_exit, true),
    case prepare_message(ReceiveHandle, SendHandle, Bin, ControlPid, Extra) of
        {ok, ConnData, MegaMsg} when is_record(MegaMsg, 'MegacoMessage') ->
            ?rt1(ConnData, "message prepared", [MegaMsg]),
            Mess = MegaMsg#'MegacoMessage'.mess,
            case Mess#'Message'.messageBody of
                {messageError, Error} ->
                    handle_message_error(ConnData, Error, Extra);
                {transactions, Transactions} ->
                    prm_handle_transactions(ConnData, Transactions, Extra)
            end;
        {silent_fail, ConnData, {_Code, Reason, Error}} ->
            ?report_debug(ConnData, Reason, [no_reply, Error]),
            ignore;
        {verbose_fail, ConnData, {Code, Reason, Error}} ->
            ?report_debug(ConnData, Reason, [Error]),
            send_message_error(ConnData, Code, Reason)
    end,
    process_flag(trap_exit, PrevTrapExit),
    ok.

%% Acks are handled first, then the requests.
prm_handle_transactions(ConnData, Transactions, Extra) ->
    {AckList, ReqList} = prepare_trans(ConnData, Transactions, [], [], Extra),
    handle_acks(AckList, Extra),
    prm_dispatch_requests(ReqList, ConnData, Extra).

%% Threaded connections: every request but the first gets a process of
%% its own, the first is handled by the calling process.
%% Otherwise the requests are handled in sequence, and whatever
%% handle_requests/3 returns (the long requests) is dealt with in the
%% same one-process-each fashion.
prm_dispatch_requests([], _ConnData, _Extra) ->
    ?rt3("no transaction requests"),
    ignore;
prm_dispatch_requests([Req | Reqs], #conn_data{threaded = true}, Extra) ->
    ?rt3("handle requests (spawned)"),
    _ = [spawn(?MODULE, handle_request, [R, Extra]) || R <- Reqs],
    handle_request(Req, Extra);
prm_dispatch_requests(ReqList, _ConnData, Extra) ->
    ?rt3("handle requests"),
    case handle_requests(ReqList, [], Extra) of
        [] ->
            ignore;
        [LongRequest | More] ->
            _ = [spawn(?MODULE, handle_long_request, [LR, Extra]) ||
                    LR <- More],
            handle_long_request(LongRequest, Extra)
    end.
%% prepare_message(RH, SH, Bin, Pid, Extra) ->
%%     {ok, ConnData, MegaMsg}     |
%%     {silent_fail,  ConnData, {Code, Reason, Error}} |
%%     {verbose_fail, ConnData, {Code, Reason, Error}}
%%
%% Decodes the received bytes with the encoding module of the receive
%% handle and locates (or automatically sets up) the connection the
%% message belongs to.
prepare_message(RH, SH, Bin, Pid, Extra)
  when is_record(RH, megaco_receive_handle) andalso is_pid(Pid) ->
    ?report_trace(RH, "receive bytes", [{bytes, Bin}]),
    EncodingMod    = RH#megaco_receive_handle.encoding_mod,
    EncodingConfig = RH#megaco_receive_handle.encoding_config,
    ProtVersion    = RH#megaco_receive_handle.protocol_version,
    case (catch EncodingMod:decode_message(EncodingConfig, ProtVersion, Bin)) of
        {ok, MegaMsg} when is_record(MegaMsg, 'MegacoMessage') ->
            ?report_trace(RH, "receive message", [{message, MegaMsg}]),
            prepare_decoded_message(RH, SH, Bin, Pid, Extra, MegaMsg);
        Error ->
            ?rt2("decode error", [Error]),
            ConnData = handle_decode_error(Error,
                                           RH, SH, Bin, Pid,
                                           EncodingMod,
                                           EncodingConfig,
                                           ProtVersion),
            handle_syntax_error_callback(RH, ConnData,
                                         prepare_error(Error), Extra)
    end;
prepare_message(RH, SendHandle, _Bin, ControlPid, _Extra) ->
    ConnData = fake_conn_data(RH, SendHandle, ControlPid),
    Error    = prepare_error({'EXIT', {bad_receive_handle, RH}}),
    {verbose_fail, ConnData, Error}.

%% The message decoded fine: find out which connection it belongs to.
prepare_decoded_message(RH, SH, Bin, Pid, Extra, MegaMsg) ->
    Mess      = MegaMsg#'MegacoMessage'.mess,
    RemoteMid = Mess#'Message'.mId,
    Version   = Mess#'Message'.version,
    LocalMid  = RH#megaco_receive_handle.local_mid,
    CH        = #megaco_conn_handle{local_mid  = LocalMid,
                                    remote_mid = RemoteMid},
    case megaco_config:lookup_local_conn(CH) of

        %%
        %% Message is not of the negotiated version
        %%

        [#conn_data{protocol_version = NegVersion,
                    strict_version   = true} = ConnData]
          when NegVersion =/= Version ->
            %% Use already established connection,
            %% but incorrect version
            ?rt1(ConnData, "not negotiated version", [Version]),
            Error = {error, {not_negotiated_version, NegVersion, Version}},
            handle_syntax_error_callback(RH, ConnData,
                                         prepare_error(Error),
                                         Extra);

        [ConnData] ->

            %%
            %% Use an already established connection
            %%
            %% This *may* have been set up in the
            %% "non-official" way, so we may need to
            %% create the monitor to the control process
            %% and store the SendHandle (which is normally
            %% done when creating the "temporary" connection).
            %%

            ?rt1(ConnData, "use already established connection", []),
            ConnData2 = ConnData#conn_data{send_handle      = SH,
                                           control_pid      = Pid,
                                           protocol_version = Version},
            check_message_auth(CH, ConnData2, MegaMsg, Bin);

        [] ->
            %% Setup a temporary connection
            ?rt3("setup a temporary connection"),
            prepare_autoconnect_message(RH, CH, SH, Pid, Bin, Extra,
                                        MegaMsg, RemoteMid, Version)
    end.

%% No connection yet: try to set one up automatically and then carry on
%% with the message.
prepare_autoconnect_message(RH, CH, SH, Pid, Bin, Extra,
                            MegaMsg, RemoteMid, Version) ->
    case autoconnect(RH, RemoteMid, SH, Pid, Extra) of
        {ok, _} ->
            do_prepare_message(RH, CH, SH, MegaMsg, Pid, Bin);
        {error, {already_connected, _ConnHandle}} ->
            do_prepare_message(RH, CH, SH, MegaMsg, Pid, Bin);
        {error, {connection_refused, ConnData, Reason}} ->
            Error = prepare_error({error, {connection_refused, Reason}}),
            {verbose_fail, ConnData, Error};
        {error, Reason} ->
            ConnData  = fake_conn_data(RH, RemoteMid, SH, Pid),
            ConnData2 = ConnData#conn_data{protocol_version = Version},
            Error     = prepare_error({error, Reason}),
            {verbose_fail, ConnData2, Error}
    end.
%% handle_decode_error(Error, RH, SH, Bin, Pid, EM, EC, V) -> ConnData
%%
%% Called when the full decode failed. A "mini" decode is attempted in
%% order to find out who sent the message, so that the error can be
%% counted on, and answered over, the right connection. If that fails
%% too, the error is counted globally and fake conn data is used.
handle_decode_error({error, {unsupported_version, _}},
                    #megaco_receive_handle{local_mid = LocalMid} = RH, SH,
                    Bin, Pid,
                    EM, EC, V) ->
    case (catch EM:decode_mini_message(EC, V, Bin)) of
        {ok, #'MegacoMessage'{mess = #'Message'{version = _Ver,
                                                mId     = RemoteMid}}} ->
            ?rt2("erroneous message received", [SH, RemoteMid, _Ver]),
            %% We cannot put the version into conn-data, that will
            %% make the resulting error message impossible to send
            %% (unsupported version)
            decode_error_conn_data(LocalMid, RemoteMid, RH, SH, Pid);

        _ ->
            decode_error_unknown_sender(RH, SH, Pid)
    end;

handle_decode_error(_,
                    #megaco_receive_handle{local_mid = LocalMid} = RH, SH,
                    Bin, Pid,
                    EM, EC, V) ->
    case (catch EM:decode_mini_message(EC, V, Bin)) of
        {ok, #'MegacoMessage'{mess = #'Message'{version = Ver,
                                                mId     = RemoteMid}}} ->
            ?rt2("erroneous message received", [SH, Ver, RemoteMid]),
            ConnData = decode_error_conn_data(LocalMid, RemoteMid,
                                              RH, SH, Pid),
            ConnData#conn_data{protocol_version = Ver};

        _ ->
            decode_error_unknown_sender(RH, SH, Pid)
    end.

%% The sender could be identified: count the error on its connection
%% handle and return conn data for it (the real one if the connection is
%% known, otherwise fake conn data carrying the handle).
decode_error_conn_data(LocalMid, RemoteMid, RH, SH, Pid) ->
    CH = #megaco_conn_handle{local_mid  = LocalMid,
                             remote_mid = RemoteMid},
    incNumErrors(CH),
    case megaco_config:lookup_local_conn(CH) of
        [ConnData] ->
            ?rt3("known to us"),
            ConnData#conn_data{send_handle = SH};
        [] ->
            ?rt3("unknown to us"),
            FakeConnData = fake_conn_data(RH, SH, Pid),
            FakeConnData#conn_data{conn_handle = CH}
    end.

%% Not even the mini decode worked.
decode_error_unknown_sender(RH, SH, Pid) ->
    ?rt2("erroneous message received", [SH]),
    incNumErrors(),
    fake_conn_data(RH, SH, Pid).
%% do_prepare_message(RH, CH, SendHandle, MegaMsg, ControlPid, Bin) ->
%%     {ok, ConnData, MegaMsg} | {ReplyTag, ConnData, Error}
%%
%% Called once a (temporary) connection has been set up for the sender
%% of MegaMsg. If the message passes the authentication check the
%% connection is kept, otherwise it is removed again and the failure is
%% returned to the caller.
do_prepare_message(RH, CH, SendHandle, MegaMsg, ControlPid, Bin) ->
    case megaco_config:lookup_local_conn(CH) of
        [ConnData] ->
            case check_message_auth(CH, ConnData, MegaMsg, Bin) of
                {ok, ConnData2, MegaMsg} ->
                    %% Let the connection be permanent
                    {ok, ConnData2, MegaMsg};
                {ReplyTag, FailedConnData, Reason} ->
                    %% Remove the temporary connection.
                    %% N.B. check_message_auth/4 returns conn data with
                    %% protocol_version set to the version of the
                    %% message, so it need not be equal to ConnData. It
                    %% must therefore be bound to a fresh variable;
                    %% matching against ConnData made this clause fail
                    %% (case_clause) whenever the versions differed.
                    disconnect(CH, {bad_auth, Reason}),
                    {ReplyTag, FailedConnData, Reason}
            end;
        [] ->
            Reason = no_connection,
            disconnect(CH, Reason),
            RemoteMid = CH#megaco_conn_handle.remote_mid,
            ConnData = fake_conn_data(RH, RemoteMid, SendHandle, ControlPid),
            Error = prepare_error({error, Reason}),
            {silent_fail, ConnData, Error}
    end.

%% check_message_auth(ConnHandle, ConnData, MegaMsg, Bin) ->
%%     {ok, ConnData2, MegaMsg} | {verbose_fail, ConnData2, Error}
%%
%% ConnData2 is ConnData with protocol_version set to the version of the
%% message. Authentication is not supported: the check passes only when
%% neither the message nor the connection carries any auth data.
check_message_auth(_ConnHandle, ConnData, MegaMsg, Bin) ->
    MsgAuth   = MegaMsg#'MegacoMessage'.authHeader,
    Mess      = MegaMsg#'MegacoMessage'.mess,
    Version   = Mess#'Message'.version,
    ConnData2 = ConnData#conn_data{protocol_version = Version},
    ConnAuth  = ConnData2#conn_data.auth_data,
    ?report_trace(ConnData2, "check message auth", [{bytes, Bin}]),
    if
        (MsgAuth =:= asn1_NOVALUE) andalso (ConnAuth =:= asn1_NOVALUE) ->
            ?SIM({ok, ConnData2, MegaMsg}, check_message_auth);
        true ->
            ED = #'ErrorDescriptor'{errorCode = ?megaco_unauthorized,
                                    errorText = "Autentication is not supported"},
            {verbose_fail, ConnData2, prepare_error({error, ED})}
    end.
%% handle_syntax_error_callback(ReceiveHandle, ConnData, PrepError, Extra) ->
%%     {verbose_fail, ConnData, {Code, Reason, Error}} |
%%     {silent_fail,  ConnData, {Code, Reason, Error}}
%%
%% Invokes the user's handle_syntax_error callback. The user decides
%% whether an error reply is to be sent (verbose_fail) or not
%% (silent_fail), and may replace the error code and text.
handle_syntax_error_callback(ReceiveHandle, ConnData, PrepError, Extra) ->
    {Code, Reason, Error} = PrepError,
    ErrorDesc = #'ErrorDescriptor'{errorCode = Code, errorText = Reason},
    Version   = syntax_error_version(Error, ConnData),
    UserMod   = ConnData#conn_data.user_mod,
    UserArgs  = ConnData#conn_data.user_args,
    ?report_trace(ReceiveHandle, "callback: syntax error", [ErrorDesc, Error]),
    Args = syntax_error_callback_args(ReceiveHandle, Version, ErrorDesc,
                                      Extra, UserArgs),
    Res = (catch apply(UserMod, handle_syntax_error, Args)),
    ?report_debug(ReceiveHandle, "return: syntax error",
                  [{return, Res}, ErrorDesc, Error]),
    syntax_error_verdict(Res, ConnData, PrepError, Error).

%% For an unsupported version error the offending version is reported,
%% otherwise the version of the connection.
syntax_error_version({error, {unsupported_version, UV}}, _ConnData) ->
    UV;
syntax_error_version(_Error, ConnData) ->
    ConnData#conn_data.protocol_version.

%% The Extra argument is only passed on to the callback when it is not
%% the default value.
syntax_error_callback_args(ReceiveHandle, Version, ErrorDesc, Extra, UserArgs) ->
    case Extra of
        ?default_user_callback_extra ->
            [ReceiveHandle, Version, ErrorDesc | UserArgs];
        _ ->
            [ReceiveHandle, Version, ErrorDesc, Extra | UserArgs]
    end.

%% Translate the return value of the callback.
syntax_error_verdict(reply, ConnData, PrepError, _Error) ->
    {verbose_fail, ConnData, PrepError};
syntax_error_verdict({reply, #'ErrorDescriptor'{errorCode = Code,
                                                errorText = Reason}},
                     ConnData, _PrepError, Error) ->
    {verbose_fail, ConnData, {Code, Reason, Error}};
syntax_error_verdict(no_reply, ConnData, PrepError, _Error) ->
    {silent_fail, ConnData, PrepError};
syntax_error_verdict({no_reply, #'ErrorDescriptor'{errorCode = Code,
                                                   errorText = Reason}},
                     ConnData, _PrepError, Error) ->
    {silent_fail, ConnData, {Code, Reason, Error}}; %%% OTP-????
syntax_error_verdict(Res, ConnData, PrepError, _Error) ->
    warning_msg("syntax error callback failed: ~w", [Res]),
    {verbose_fail, ConnData, PrepError}.
%% fake_conn_data(ConnHandle) -> ConnData
%%
%% Builds conn data for a connection handle: from the receive handle of
%% the connection if that can be obtained, else from the receive handle
%% of the user, else a record filled in with placeholder atoms.
fake_conn_data(#megaco_conn_handle{local_mid  = UserMid,
                                   remote_mid = RemoteMid} = CH) ->
    case (catch megaco_config:conn_info(CH, receive_handle)) of
        RH when is_record(RH, megaco_receive_handle) ->
            ConnData =
                fake_conn_data(RH, RemoteMid, no_send_handle, no_control_pid),
            ConnData#conn_data{conn_handle = CH};
        {'EXIT', _} ->
            fake_conn_data_from_user(CH, UserMid)
    end.

fake_conn_data_from_user(CH, UserMid) ->
    case catch megaco_config:user_info(UserMid, receive_handle) of
        {'EXIT', _} -> % No such user
            #conn_data{conn_handle        = CH,
                       serial             = undefined_serial,
                       control_pid        = no_control_pid,
                       monitor_ref        = undefined_monitor_ref,
                       send_mod           = no_send_mod,
                       send_handle        = no_send_handle,
                       encoding_mod       = no_encoding_mod,
                       encoding_config    = no_encoding_config,
                       reply_action       = undefined,
                       sent_pending_limit = infinity,
                       recv_pending_limit = infinity};
        RH ->
            ConnData = fake_conn_data(RH, no_send_handle, no_control_pid),
            ConnData#conn_data{conn_handle = CH}
    end.

%% fake_conn_data/3: as fake_conn_data/4 with an unknown remote mid.
fake_conn_data(RH, SendHandle, ControlPid) ->
    fake_conn_data(RH, unknown_remote_mid, SendHandle, ControlPid).

%% Conn data initialised by megaco_config; should that fail the
%% fallback in fake_user_data/4 is used.
fake_conn_data(RH, RemoteMid, SendHandle, ControlPid) ->
    case catch megaco_config:init_conn_data(RH, RemoteMid,
                                            SendHandle, ControlPid) of
        {'EXIT', _} -> % No such user
            fake_user_data(RH, RemoteMid, SendHandle, ControlPid);
        ConnData ->
            ConnData
    end.

%% Retry the initialisation with the local mid 'default'. If that fails
%% as well, the conn data is assembled from the receive handle alone.
fake_user_data(RH, RemoteMid, SendHandle, ControlPid) ->
    LocalMid  = RH#megaco_receive_handle.local_mid,
    DefaultRH = RH#megaco_receive_handle{local_mid = default},
    case catch megaco_config:init_conn_data(DefaultRH, RemoteMid,
                                            SendHandle, ControlPid) of
        {'EXIT', _} -> % Application stopped?
            #conn_data{conn_handle =
                           #megaco_conn_handle{local_mid  = LocalMid,
                                               remote_mid = RemoteMid},
                       serial             = undefined_serial,
                       control_pid        = ControlPid,
                       monitor_ref        = undefined_monitor_ref,
                       send_mod           = RH#megaco_receive_handle.send_mod,
                       send_handle        = SendHandle,
                       encoding_mod       = RH#megaco_receive_handle.encoding_mod,
                       encoding_config    = RH#megaco_receive_handle.encoding_config,
                       reply_action       = undefined,
                       sent_pending_limit = infinity,
                       recv_pending_limit = infinity};
        ConnData ->
            ConnData
    end.

%% prepare_error(Error) -> {Code, Reason, Error}
%%
%% Maps an error term onto an error code and an error text; the
%% original term is returned as the third element. The clauses are
%% ordered from the most specific to the most general.
prepare_error({error, ED} = Error) when is_record(ED, 'ErrorDescriptor') ->
    {ED#'ErrorDescriptor'.errorCode, ED#'ErrorDescriptor'.errorText, Error};
prepare_error({error, [{reason, {bad_token, [BadToken, _Acc]}, Line}]} = Error)
  when is_integer(Line) ->
    Reason =
        lists:flatten(
          io_lib:format("Illegal token (~p) on line ~w", [BadToken, Line])),
    {?megaco_bad_request, Reason, Error};
prepare_error({error, [{reason, {bad_token, _}, Line}]} = Error)
  when is_integer(Line) ->
    Reason = lists:concat(["Illegal token on line ", Line]),
    {?megaco_bad_request, Reason, Error};
prepare_error({error, [{reason, {Line, _ParserMod, RawReasonString}} | _]} = Error)
  when is_integer(Line) andalso is_list(RawReasonString) ->
    Reason =
        case RawReasonString of
            [[$s, $y, $n, $t, $a, $x | _], TokenString] ->
                lists:flatten(
                  io_lib:format("Syntax error on line ~w before token ~s",
                                [Line, TokenString]));
            _ ->
                lists:flatten(io_lib:format("Syntax error on line ~w", [Line]))
        end,
    {?megaco_bad_request, Reason, Error};
prepare_error({error, [{reason, {Line, _, _}} | _]} = Error)
  when is_integer(Line) ->
    Reason = lists:concat(["Syntax error on line ", Line]),
    {?megaco_bad_request, Reason, Error};
prepare_error({error, {connection_refused, ED}} = Error)
  when is_record(ED,'ErrorDescriptor') ->
    {ED#'ErrorDescriptor'.errorCode, ED#'ErrorDescriptor'.errorText, Error};
prepare_error({error, {connection_refused, _}} = Error) ->
    {?megaco_unauthorized, "Connection refused by user", Error};
prepare_error({error, {unsupported_version, V}} = Error) ->
    Reason = lists:flatten(io_lib:format("Unsupported version: ~w",[V])),
    {?megaco_version_not_supported, Reason, Error};
prepare_error({error, {not_negotiated_version, NegV, MsgV}} = Error) ->
    Reason =
        lists:flatten(
          io_lib:format("Not negotiated version: ~w [negotiated ~w]",
                        [MsgV, NegV])),
    {?megaco_version_not_supported, Reason, Error};
prepare_error({error, _} = Error) ->
    {?megaco_bad_request, "Syntax error", Error};
prepare_error({ok, MegaMsg} = Error) when is_record(MegaMsg, 'MegacoMessage') ->
    {?megaco_incorrect_identifier, "MID does not match config", Error};
prepare_error(Error) ->
    {?megaco_internal_gateway_error, "Fatal syntax error", Error}.

%% prepare_trans(ConnData, Transactions, AckList, ReqList, Extra) ->
%%     {AckList, ReqList}
%%
%% How the transactions are prepared depends on the state of the
%% connection, as recorded in the monitor_ref field of the conn data.
prepare_trans(_ConnData, [], AckList, ReqList, _Extra) ->
    ?SIM({AckList, ReqList}, prepare_trans_done);

prepare_trans(ConnData, Trans, AckList, ReqList, Extra) ->
    case ConnData of
        #conn_data{monitor_ref = undefined_auto_monitor_ref} ->

            ?rt3("prepare_trans - autoconnect"),

            %% <BUGBUG>
            %% Do we need something here, if we send more than one
            %% trans per message?
            %% </BUGBUG>

            %% May occur if another process has already setup a
            %% temporary connection, but the handle_connect callback
            %% function has not yet returned before the eager MG
            %% re-sends its initial service change message.

            prepare_autoconnecting_trans(ConnData, Trans,
                                         AckList, ReqList, Extra);

        #conn_data{monitor_ref = connected} ->

            ?rt3("prepare_trans - connected"),

            %% This will happen when the "MGC" user performs a "pre"
            %% connect, instead of waiting for the auto-connect (which
            %% normally happens when the MGC receives the first message
            %% from the MG).
            %%
            %% The monitor_ref will have this value when the pre-connect
            %% is complete, so we finish it here and then continue with
            %% the normal transaction prepare.

            case finish_connect(ConnData) of
                {ok, CD} ->
                    prepare_normal_trans(CD, Trans, AckList, ReqList, Extra);
                {error, Reason} ->
                    disconnect(ConnData#conn_data.conn_handle, Reason),
                    {[], []}
            end;

        #conn_data{monitor_ref = {connecting, _}} ->

            ?rt3("prepare_trans - connecting"),

            %% This will happen when the "MGC" user performs a "pre"
            %% connect, instead of waiting for the auto-connect (which
            %% normally happens when the MGC receives the first message
            %% from the MG).
            %%
            %% The monitor_ref will have this value when the pre-connect
            %% is in progress. We drop (ignore) this message and hope
            %% the other side (MG) will resend.

            {[], []};

        _ ->

            ?rt3("prepare_trans - normal"),

            %% Handle transaction in the normal case

            prepare_normal_trans(ConnData, Trans, AckList, ReqList, Extra)
    end.


%% prepare_autoconnecting_trans(ConnData, Transactions,
%%                              AckList, ReqList, Extra) ->
%%     {AckList, ReqList}
%%
%% Used while an automatic connect is still in progress (the
%% handle_connect callback has not yet returned). Transaction
%% requests are not evaluated; each one is answered with a pending
%% message for as long as the sent-pending limit allows. All other
%% kinds of transactions are skipped. AckList and ReqList are passed
%% through untouched.
prepare_autoconnecting_trans(_ConnData, [], AckList, ReqList, _Extra) ->
    ?SIM({AckList, ReqList}, prepare_autoconnecting_trans_done);

prepare_autoconnecting_trans(ConnData, [Trans | Rest], AckList, ReqList, Extra) ->
    ?rt1(ConnData, "[autoconnecting] prepare trans", [Trans]),
    case Trans of
        {transactionRequest, T} when is_record(T, 'TransactionRequest') ->

            %% Tag the conn data with the serial of *this* request so
            %% that whatever we send below refers to it.
            Serial    = T#'TransactionRequest'.transactionId,
            ConnData2 = ConnData#conn_data{serial = Serial},
            ?report_trace(ConnData2, "Pending handle_connect", [T]),

            %% ------------------------------------------
            %%
            %%   Check pending limit
            %%
            %% ------------------------------------------

            Limit   = ConnData#conn_data.sent_pending_limit,
            TransId = to_remote_trans_id(ConnData2),
            case check_and_maybe_incr_pending_limit(Limit, sent, TransId) of
                ok ->
                    send_pending(ConnData2);
                error ->
                    %% Pending limit:
                    %% In this (granted, highly hypothetical case)
                    %% we would make the user very confused if we
                    %% called the abort callback function, since
                    %% the request callback function has not yet
                    %% been called. Alas, we skip this call here.
                    %%
                    %% The error must be sent with ConnData2: the
                    %% plain ConnData still carries the serial of a
                    %% previous transaction (or no serial at all).
                    send_pending_limit_error(ConnData2);
                aborted ->
                    ignore
            end,
            prepare_autoconnecting_trans(ConnData2, Rest, AckList, ReqList,
                                         Extra);
        _ ->
            prepare_autoconnecting_trans(ConnData, Rest, AckList, ReqList,
                                         Extra)
    end.


%% =================================================================
%%
%% Note that the TransactionReply record was changed in v3 (two
%% new fields were added), and since we don't know which version,
%% we cannot use the record definition of TransactionReply.
%% Instead we transform the record into our own internal format
%% #megaco_transaction_reply{}
%%
%% =================================================================

%% prepare_normal_trans(ConnData, Transactions, AckList, ReqList, Extra) ->
%%     {AckList, ReqList}
%%
%% Walk the transactions of a message. Pendings, replies and segment
%% replies are handled on the spot; requests and acks are collected
%% (via prepare_request/6 and prepare_ack/6) into ReqList and AckList.
prepare_normal_trans(_ConnData, [], AckList, ReqList, _Extra) ->
    ?SIM({AckList, ReqList}, prepare_normal_trans_done);

prepare_normal_trans(ConnData, [Trans | Rest], AckList, ReqList, Extra) ->
    ?rt1(ConnData, "prepare [normal] trans", [Trans]),
    case Trans of
        {transactionRequest,
         #'TransactionRequest'{transactionId = asn1_NOVALUE}} ->
            %% No transaction id at all: answer with serial 0
            CD = ConnData#conn_data{serial = 0},
            send_trans_error(CD,
                             ?megaco_bad_request,
                             "Syntax error in message: transaction id missing"),
            prepare_normal_trans(CD, Rest, AckList, ReqList, Extra);

        {transactionRequest,
         #'TransactionRequest'{transactionId = Serial} = Request} ->
            CD = ConnData#conn_data{serial = Serial},
            prepare_request(CD, Request, Rest, AckList, ReqList, Extra);

        {transactionPending,
         #'TransactionPending'{transactionId = Serial} = Pending} ->
            CD = ConnData#conn_data{serial = Serial},
            handle_pending(CD, Pending, Extra),
            prepare_normal_trans(CD, Rest, AckList, ReqList, Extra);

        {transactionReply, RawReply}
          when is_tuple(RawReply) andalso
               (element(1, RawReply) =:= 'TransactionReply') ->
            Reply = transform_transaction_reply_dec(RawReply),
            CD = ConnData#conn_data{
                   serial = Reply#megaco_transaction_reply.transactionId},
            handle_reply(CD, Reply, Extra),
            prepare_normal_trans(CD, Rest, AckList, ReqList, Extra);

        {transactionResponseAck, Acks} when is_list(Acks) ->
            prepare_ack(ConnData, Acks, Rest, AckList, ReqList, Extra);

        {segmentReply, #'SegmentReply'{} = SR} ->
            handle_segment_reply(ConnData, SR, Extra),
            prepare_normal_trans(ConnData, Rest, AckList, ReqList, Extra)
    end.


%% prepare_request(ConnData, Request, Rest, AckList, ReqList, Extra)
%%
%% Decide what to do with a received transaction request, based on
%% the reply record (if any) already stored for its transaction id,
%% and then continue with the remaining transactions:
%%
%%   no record        - brand new request: start the pending timer,
%%                      store a reply record and queue the request
%%   prepare |
%%   eval_request     - a resend while we are still working on it:
%%                      send pending or abort (pending limit)
%%   waiting_for_ack  - a resend after we replied: resend the reply
%%   aborted          - a resend after abort: clean up
prepare_request(ConnData, T, Rest, AckList, ReqList, Extra) ->
    ?rt2("prepare request", [T]),
    ConnHandle = ConnData#conn_data.conn_handle,
    LocalMid   = ConnHandle#megaco_conn_handle.local_mid,
    TransId    = to_remote_trans_id(ConnData),
    ?rt2("prepare request", [LocalMid, TransId]),
    case lookup_reply(ConnData, TransId) of
        [] ->
            ?rt3("brand new request"),

            %% Brand new request
            %%
            %% Check pending limit:
            %% We should actually check the pending limit here
            %% but since we have to do it later in the
            %% handle_request function (just before we call
            %% the handle_trans_request callback function) we
            %% can just as well wait (this is after all a very
            %% unlikely case: see function prepare_trans when
            %% monitor_ref == undefined_auto_monitor_ref).

            #conn_data{send_handle      = SendHandle,
                       pending_timer    = InitTimer,
                       protocol_version = Version,
                       user_mod         = UserMod,
                       user_args        = UserArgs} = ConnData,
            {WaitFor, CurrTimer} = megaco_timer:init(InitTimer),
            PendingRef =
                megaco_monitor:apply_after(?MODULE, pending_timeout,
                                           [ConnHandle, TransId, CurrTimer],
                                           WaitFor),
            NewRep = #reply{send_handle       = SendHandle,
                            trans_id          = TransId,
                            local_mid         = LocalMid,
                            pending_timer_ref = PendingRef,
                            handler           = self(),
                            version           = Version,
                            user_mod          = UserMod,
                            user_args         = UserArgs},
            ReqList2 =
                case megaco_monitor:insert_reply_new(NewRep) of
                    true ->
                        [{ConnData, TransId, T} | ReqList];
                    false ->
                        %% Oups - someone got there before we did...
                        ?report_debug(ConnData,
                                      "prepare request: conflicting requests",
                                      [TransId]),
                        send_pending(ConnData),
                        megaco_monitor:cancel_apply_after(PendingRef),
                        ReqList
                end,
            prepare_normal_trans(ConnData, Rest, AckList, ReqList2, Extra);

        %% We can ignore the Converted value here as we *know*
        %% conn-data to be correct (not faked), so even if
        %% the record was converted, it will now have correct
        %% values for user_mod and user_args.
        {_Converted,
         #reply{state             = State,
                handler           = Pid,
                pending_timer_ref = Ref} = Rep}
          when (State =:= prepare) orelse (State =:= eval_request) ->

            ?rt2("request resend", [State, Pid, Ref]),

            %% Pending limit:
            %% We are still preparing/evaluating the request.
            %% Check if the pending limit has been exceeded...
            %% If the pending limit is _not_ exceeded then
            %% we shall send a pending (and actually restart
            %% the pending timer, but that we cannot do).
            %% Don't care about Msg and Rep version diff

            #conn_data{sent_pending_limit = Limit} = ConnData,

            case check_and_maybe_incr_pending_limit(Limit, sent, TransId) of
                ok ->
                    %% Pending limit not exceeded. The counter has
                    %% already been incremented by the check above,
                    %% so all that remains is to send the pending
                    %% message. (We should really restart the pending
                    %% timer, but we have no way of doing that.)
                    send_pending(ConnData);

                error ->
                    %% Pending limit exceeded
                    %%
                    %%   1) Cancel pending timer
                    %%   2) Send 506 error message to other side
                    %%   3) Inform user (depends on state)
                    %%   4) Set reply in aborted state
                    %%
                    %% State == eval_request:
                    %%   The request is currently being evaluated by
                    %%   the user, and the reply timer has not yet
                    %%   been started. Either the other side resends
                    %%   (triggering pending messages) or our pending
                    %%   timer expires, until the limit is passed.
                    %%   In any event, we cannot delete the reply
                    %%   record or the pending counter in this case.
                    %%   Is there a risk we accumulate aborted reply
                    %%   records?
                    %%
                    %% State == prepare:
                    %%   The user does not know about this request
                    %%   so we can safely perform cleanup.
                    megaco_monitor:cancel_apply_after(Ref),
                    send_pending_limit_error(ConnData),
                    case State of
                        eval_request ->
                            %% What if the user never replies?
                            %% In that case we will have a record
                            %% (and counters) that is never cleaned up...
                            megaco_monitor:update_reply_fields(
                              TransId,
                              [{#reply.state,             aborted},
                               {#reply.pending_timer_ref, undefined}]),
                            handle_request_abort_callback(ConnData,
                                                          TransId, Pid, Extra);
                        _ ->
                            %% Since the user does not know about
                            %% this call yet, it is safe to cleanup.
                            %% Should we inform?
                            cancel_reply(ConnData,
                                         Rep#reply{state = aborted},
                                         aborted)
                    end;

                aborted ->
                    %% Pending limit already exceeded
                    %%
                    %% Cleanup, just to make sure:
                    %%   reply record & pending counter
                    cancel_reply(ConnData, Rep#reply{state = aborted}, aborted)
            end,
            prepare_normal_trans(ConnData, Rest, AckList, ReqList, Extra);

        %% Converted can be ignored here too (see above).
        {_Converted,
         #reply{state   = waiting_for_ack,
                bytes   = Bin,
                version = Version} = Rep} ->
            ?rt3("request resend when waiting for ack"),

            %% We have already sent a reply, but the receiver
            %% has obviously not got it. Resend the reply but
            %% don't restart the reply_timer.
            ConnData2 = ConnData#conn_data{protocol_version = Version},
            ?report_trace(ConnData2,
                          "re-send trans reply", [T, {bytes, Bin}]),
            case megaco_messenger_misc:send_message(ConnData2, true, Bin) of
                {ok, _} ->
                    ok;
                {error, Reason} ->
                    %% Pass it on to the user (via handle_ack)
                    cancel_reply(ConnData2, Rep, Reason)
            end,
            prepare_normal_trans(ConnData2, Rest, AckList, ReqList, Extra);

        %% Converted can be ignored here too (see above).
        {_Converted, #reply{state = aborted} = Rep} ->
            ?rt3("request resend when already in aborted state"),

            %% OTP-4956:
            %% Already aborted so ignore.
            %% This furthermore means that the obnoxious user at the
            %% other end has already been informed (pending-limit
            %% passed => error descriptor sent), but keeps sending...
            %%
            %% Shall we perform a cleanup?
            cancel_reply(ConnData, Rep, aborted),
            prepare_normal_trans(ConnData, Rest, AckList, ReqList, Extra)
    end.

%% prepare_ack(ConnData, Acks, Rest, AckList, ReqList, Extra)
%%
%% Expand each 'TransactionAck' (a single serial or a firstAck..lastAck
%% range) one serial at a time, collecting the matching reply records
%% in AckList. When the acks are exhausted, processing continues with
%% the remaining transactions.
prepare_ack(ConnData,
            [#'TransactionAck'{firstAck = First, lastAck = Last} = TA | More],
            Rest, AckList, ReqList, Extra) ->
    SingleAck = TA#'TransactionAck'{lastAck = asn1_NOVALUE},
    AckList2  = do_prepare_ack(ConnData#conn_data{serial = First},
                               SingleAck, AckList),
    Remaining =
        if
            Last =:= asn1_NOVALUE ->
                More;
            First < Last ->
                %% More serials left in this range
                [TA#'TransactionAck'{firstAck = First + 1} | More];
            First =:= Last ->
                More;
            First > Last ->
                %% Protocol violation from the sender of this ack
                ?report_important(ConnData, "<ERROR> discard trans",
                                  [TA, {error, "firstAck > lastAck"}]),
                More
        end,
    prepare_ack(ConnData, Remaining, Rest, AckList2, ReqList, Extra);
prepare_ack(ConnData, [], Rest, AckList, ReqList, Extra) ->
    prepare_normal_trans(ConnData, Rest, AckList, ReqList, Extra).

%% Add {ConnData, Rep, Ack} to AckList when a reply record in state
%% waiting_for_ack exists for the serial carried by ConnData;
%% otherwise the ack is discarded and AckList is returned as is.
do_prepare_ack(ConnData, T, AckList) ->
    TransId = to_remote_trans_id(ConnData),
    case lookup_reply(ConnData, TransId) of
        [] ->
            %% The reply has already been garbage collected. Ignore.
            ?report_trace(ConnData, "discard ack (no receiver)", [T]),
            AckList;
        {_Converted, #reply{state = waiting_for_ack} = Rep} ->
            %% Don't care about Msg and Rep version diff
            [{ConnData, Rep, T} | AckList];
        {_Converted, _Rep} ->
            %% Protocol violation from the sender of this ack
            ?report_important(ConnData, "<ERROR> discard trans",
                              [T, {error, "got ack before reply was sent"}]),
            AckList
    end.


%% Bump the reply counter kept for TransId on this connection.
increment_request_keep_alive_counter(#conn_data{conn_handle = CH}, TransId) ->
    ?rt1(CH, "increment request keep alive counter", [TransId]),
    megaco_config:incr_reply_counter(CH, TransId).

%% Create the reply counter for TransId; if the creation raises (any
%% class), increment the counter instead.
create_or_maybe_increment_request_keep_alive_counter(
  #conn_data{conn_handle = CH}, TransId) ->
    ?rt1(CH, "create or maybe increment request keep alive counter",
         [TransId]),
    try
        megaco_config:cre_reply_counter(CH, TransId)
    catch
        _:_ ->
            megaco_config:incr_reply_counter(CH, TransId)
    end.


%% check_and_maybe_create_pending_limit(Limit, Direction, TransId) ->
%%     ok | aborted
%%
%% Check the pending counter against Limit without incrementing it.
%% A missing counter is created with the value 0.
check_and_maybe_create_pending_limit(infinity, _, _) ->
    ok;
check_and_maybe_create_pending_limit(Limit, Direction, TransId) ->
    ?rt2("check and maybe create pending limit counter",
         [Limit, Direction, TransId]),
    try megaco_config:get_pending_counter(Direction, TransId) of
        Count when Count =< Limit ->
            %% Since we have no intention to increment here, it
            %% is ok to be _at_ the limit
            ok;
        _ ->
            aborted
    catch
        _:_ ->
            %% Has not been created yet (connect).
            megaco_config:cre_pending_counter(Direction, TransId, 0),
            ok
    end.

%% check_and_maybe_create_pending_limit(infinity, _, _) ->
%%     ok;
%% check_and_maybe_create_pending_limit(Limit, Direction, TransId) ->
%%     case (catch megaco_config:get_pending_counter(Direction, TransId)) of
%% 	{'EXIT', _} ->
%% 	    %% Has not been created yet (connect).
%% 	    megaco_config:cre_pending_counter(Direction, TransId, 0),
%% 	    ok;
%% 	Val when Val =< Limit ->
%% 	    %% Since we have no intention to increment here, it
%% 	    %% is ok to be _at_ the limit
%% 	    ok;
%% 	_ ->
%% 	    aborted
%%     end.


%% check_pending_limit(Limit, Direction, TransId) -> {ok, Count} | aborted
%%
%% Read-only check of the pending counter. With an infinite limit the
%% counter is not consulted and {ok, 0} is returned. A counter that
%% cannot be read is reported as aborted.
check_pending_limit(infinity, _, _) ->
    {ok, 0};
check_pending_limit(Limit, Direction, TransId) ->
    ?rt2("check pending limit", [Direction, Limit, TransId]),
    try megaco_config:get_pending_counter(Direction, TransId) of
        Count when Count =< Limit ->
            %% Since we have no intention to increment here, it
            %% is ok to be _at_ the limit
            ?rt2("check pending limit - ok", [Count]),
            {ok, Count};
        _Count ->
            ?rt2("check pending limit - aborted", [_Count]),
            aborted
    catch
        _:_ ->
            %% This function is only called when we "know" the
            %% counter to exist. So, the only reason that this
            %% would happen is if the counter has been removed.
            %% This only happens if the pending limit has been
            %% reached. In any case, this is basically the same
            %% as aborted!
            ?rt2("check pending limit - exit", []),
            aborted
    end.


%% check_and_maybe_incr_pending_limit(Limit, Direction, TransId) ->
%%     ok | error | aborted
%%
%%   ok      - the counter was incremented and was below the limit
%%             before the increment (or it did not exist and has now
%%             been created/incremented)
%%   error   - the counter was incremented and had already reached
%%             the limit, i.e. this increment _passes_ the limit
%%   aborted - the counter was already above the limit; it is left
%%             untouched
check_and_maybe_incr_pending_limit(infinity, _, _) ->
    ok;
check_and_maybe_incr_pending_limit(Limit, Direction, TransId) ->
    %%
    %% We need this kind of test to detect when we _pass_ the limit
    %%
    ?rt2("check and maybe incr pending limit", [{direction,      Direction},
                                                {transaction_id, TransId},
                                                {counter_limit,  Limit}]),
    try megaco_config:get_pending_counter(Direction, TransId) of
        Count when Count > Limit ->
            ?rt2("check and maybe incr - aborted", [{counter_value, Count}]),
            aborted;      % Already passed the limit
        Count ->
            ?rt2("check and maybe incr - incr", [{counter_value, Count}]),
            megaco_config:incr_pending_counter(Direction, TransId),
            case Count < Limit of
                true ->
                    ok;   % Still within the limit
                false ->
                    ?rt2("check and maybe incr - error",
                         [{counter_value, Count}]),
                    error % Passed the limit
            end
    catch
        _:_ ->
            %% Has not been created yet (connect).
            %% Try to create it, but beware of a possible race condition
            try megaco_config:cre_pending_counter(Direction, TransId, 1) of
                _ ->
                    ok
            catch
                _:_ ->
                    %% Ouch, race condition, increment instead...
                    megaco_config:incr_pending_counter(Direction, TransId),
                    ok
            end
    end.


%% BUGBUG BUGBUG BUGBUG
%%
%% Do we know that the Rep is still valid? A previous transaction
%% could have taken a lot of time.
%%
%% handle_request({ConnData, TransId, Request}, Extra)
%%
%% Evaluate one queued request. When the user callback answers
%% 'pending', evaluation continues directly with the long request
%% handling; any other result is returned as is.
handle_request({ConnData, TransId, T}, Extra) ->
    case handle_request(ConnData, TransId, T, Extra) of
        {pending, _RequestData} ->
            handle_long_request(ConnData, TransId, T, Extra);
        Result ->
            Result
    end.

%% handle_request(ConnData, TransId, Request, Extra) ->
%%     ignore | {pending, RequestData}
%%
%% Check that the request has not been aborted, mark the reply record
%% as being evaluated, invoke the user callback and process what it
%% returns.
handle_request(ConnData, TransId, T, Extra) ->
    ?report_trace(ConnData, "handle request", [TransId, T]),

    %% Pending limit:
    %% Ok, before we begin, lets check that this request
    %% has not been aborted. I.e. exceeded the pending
    %% limit, so go check it...

    #conn_data{sent_pending_limit = Limit} = ConnData,

    case check_and_maybe_create_pending_limit(Limit, sent, TransId) of
        ok ->
            %% Ok so far, now update state
            Updated = megaco_monitor:update_reply_field(TransId,
                                                        #reply.state,
                                                        eval_request),
            case Updated of
                true ->
                    {AckAction, SendReply} =
                        handle_request_callback(
                          ConnData, TransId,
                          T#'TransactionRequest'.actions, T, Extra),

                    %% Next step, while we were in the callback function,
                    %% the pending limit could have been exceeded, so check
                    %% it again...
                    do_handle_request(AckAction, SendReply,
                                      ConnData, TransId);

                false ->
                    %% Ugh?
                    ignore
            end;

        aborted ->
            %% Pending limit
            %% Already exceeded the limit
            %% The user does not yet know about this request, so
            %% don't bother telling that it has been aborted...
            %% Furthermore, the reply timer has not been started,
            %% so do the cleanup now
            ?rt1(ConnData, "pending limit already passed", [TransId]),
            case lookup_reply(ConnData, TransId) of
                {_Converted, Rep} ->
                    cancel_reply(ConnData, Rep, aborted);
                _ ->
                    ok
            end,
            ignore
    end.

%% do_handle_request(AckAction, SendReply, ConnData, TransId) ->
%%     ignore | {pending, RequestData}
%%
%% Post-process the pair produced by the request callback: AckAction
%% says how an ack is to be treated, SendReply is the outcome of the
%% attempt to send the reply (or pending). The clauses are tried in
%% order, so the special cases come first.
do_handle_request(_, ignore, _ConnData, _TransId) ->
    ?rt1(_ConnData, "ignore: don't reply", [_TransId]),
    ignore;
do_handle_request(_, ignore_trans_request, ConnData, TransId) ->
    ?rt1(ConnData, "ignore trans request: don't reply", [TransId]),
    case lookup_reply(ConnData, TransId) of
        {_Converted, #reply{} = Rep} ->
            cancel_reply(ConnData, Rep, ignore);
        _ ->
            ignore
    end;
do_handle_request({pending, _RequestData}, {aborted, ignore}, _, _) ->
    ?rt2("handle request: pending - aborted - ignore => don't reply", []),
    ignore;
do_handle_request({pending, _RequestData}, {aborted, _SendReply}, _, _) ->
    ?rt2("handle request: pending - aborted => don't reply", []),
    ignore;
do_handle_request({pending, RequestData}, _SendReply, _ConnData, _) ->
    ?rt2("handle request: pending", [RequestData]),
    {pending, RequestData};
do_handle_request(AckAction, {ok, Bin}, ConnData, TransId)
  when is_binary(Bin) ->
    %% A complete (unsegmented) reply has been sent
    ?rt1(ConnData, "handle request - ok", [AckAction, TransId]),
    case lookup_reply(ConnData, TransId) of
        {_Converted, #reply{pending_timer_ref = PendingRef} = Rep} ->

            #conn_data{reply_timer = InitTimer,
                       conn_handle = ConnHandle} = ConnData,

            %% Pending limit update:
            %%   - Cancel the pending timer, if running
            %%   - Delete the pending counter
            megaco_monitor:cancel_apply_after(PendingRef),
            megaco_config:del_pending_counter(sent, TransId),

            %% Start the reply timer and store what was sent
            Method = timer_method(AckAction),
            {WaitFor, CurrTimer} = megaco_timer:init(InitTimer),
            SentBin = opt_garb_binary(CurrTimer, Bin),
            ReplyRef =
                megaco_monitor:apply_after(Method,
                                           ?MODULE, reply_timeout,
                                           [ConnHandle, TransId, CurrTimer],
                                           WaitFor),
            megaco_monitor:insert_reply(
              Rep#reply{pending_timer_ref = undefined,
                        handler           = undefined,
                        bytes             = SentBin,
                        state             = waiting_for_ack,
                        timer_ref         = ReplyRef,
                        ack_action        = AckAction}), % Timing problem?
            ignore;

        _ ->
            %% Been removed already?
            ignore
    end;
do_handle_request(AckAction, {ok, {Sent, NotSent}}, ConnData, TransId)
  when is_list(Sent) andalso is_list(NotSent) ->
    %% A segmented reply: Sent holds the segments already sent,
    %% NotSent the ones still waiting to be sent
    ?rt1(ConnData, "handle request - ok [segmented reply]",
         [AckAction, TransId]),

    case lookup_reply(ConnData, TransId) of
        {_Converted, #reply{pending_timer_ref = PendingRef} = Rep} ->

            #conn_data{reply_timer = InitTimer,
                       conn_handle = ConnHandle} = ConnData,

            %% Pending limit update:
            %%   - Cancel the pending timer, if running
            %%   - Delete the pending counter
            megaco_monitor:cancel_apply_after(PendingRef),
            megaco_config:del_pending_counter(sent, TransId),

            Method = timer_method(AckAction),
            {WaitFor, CurrTimer} = megaco_timer:init(InitTimer),
            SentBins = [{SN, opt_garb_binary(CurrTimer, Bin), undefined} ||
                           {SN, Bin} <- Sent],
            ReplyRef =
                megaco_monitor:apply_after(Method,
                                           ?MODULE, reply_timeout,
                                           [ConnHandle, TransId, CurrTimer],
                                           WaitFor),
            megaco_monitor:insert_reply(
              Rep#reply{pending_timer_ref = undefined,
                        handler           = undefined,
                        bytes             = SentBins,
                        state             = waiting_for_ack,
                        timer_ref         = ReplyRef,
                        ack_action        = AckAction,
                        segments          = NotSent}), % Timing problem?
            ignore;

        _ ->
            %% Been removed already?
            ignore
    end;
do_handle_request(_, {error, aborted}, ConnData, TransId) ->
    ?report_trace(ConnData, "aborted during our absence", [TransId]),
    case lookup_reply(ConnData, TransId) of
        {_Converted, Rep} ->
            cancel_reply(ConnData, Rep, aborted);
        _ ->
            ok
    end,
    ignore;
do_handle_request(AckAction, {error, Reason}, ConnData, TransId) ->
    ?report_trace(ConnData, "error", [TransId, Reason]),
    case lookup_reply(ConnData, TransId) of
        {_Converted, Rep} ->
            cancel_reply(ConnData,
                         Rep#reply{state      = waiting_for_ack,
                                   ack_action = AckAction},
                         Reason);
        _ ->
            ok
    end,
    ignore;
do_handle_request(AckAction, SendReply, ConnData, TransId) ->
    ?report_trace(ConnData, "unknown send trans reply result",
                  [TransId, AckAction, SendReply]),
    ignore.


%% handle_requests(Requests, Pending, Extra) -> Pending2
%%
%% Evaluate the queued requests in order. Every request whose
%% callback answered 'pending' is prepended to the accumulator as
%% {ConnData, TransId, RequestData}.
handle_requests([], Pending, _Extra) ->
    ?rt2("handle requests - done", [Pending]),
    Pending;
handle_requests([{ConnData, TransId, T} | Rest], Pending, Extra) ->
    ?rt2("handle requests", [TransId]),
    case handle_request(ConnData, TransId, T, Extra) of
        {pending, RequestData} ->
            Pending2 = [{ConnData, TransId, RequestData} | Pending],
            handle_requests(Rest, Pending2, Extra);
        _ ->
            handle_requests(Rest, Pending, Extra)
    end.

%% Currently the identity on the binary. The disabled clause below
%% would have dropped it for timer 'timeout':
%% opt_garb_binary(timeout, _Bin) -> garb_binary; % Need msg at restart of timer
opt_garb_binary(_Timer, Bin) -> Bin.

%% Select the apply_after method from the ack action.
timer_method(discard_ack) ->
    apply_method;
timer_method(_) ->
    spawn_method.


%% handle_long_request({ConnData, TransId, RequestData}, Extra) -> term()
%%
%% Entry point for a request whose first callback answered 'pending'.
handle_long_request({ConnData, TransId, RequestData}, Extra) ->

    ?rt2("handle long request", [TransId, RequestData]),

    %% Pending limit:
    %% We need to check the pending limit, in case it was
    %% exceeded before we got this far...
    %% We don't need to be able to create the counter here,
    %% since that was done in the handle_request function.

    #conn_data{sent_pending_limit = Limit} = ConnData,

    case check_pending_limit(Limit, sent, TransId) of
        {ok, _} ->
            handle_long_request(ConnData, TransId, RequestData, Extra);
        _ ->
            %% Already exceeded the limit
            ignore
    end.

%% Register the calling process as handler of the reply record, run
%% the long request callback and process its result.
handle_long_request(ConnData, TransId, RequestData, Extra) ->
    ?report_trace(ConnData, "callback: trans long request",
                  [TransId, {request_data, RequestData}]),

    %% Attempt to update the handler field for this reply record
    %% (if there is one).
    case megaco_monitor:update_reply_field(TransId, #reply.handler, self()) of
        true ->
            {AckAction, Res} =
                handle_long_request_callback(ConnData, TransId,
                                             RequestData, Extra),
            do_handle_long_request(AckAction, Res, ConnData, TransId);
        false ->
            %% Been removed already?
            ignore
    end.


%% After a long request reply has been sent: start the reply timer
%% and move the reply record (if it still exists) to waiting_for_ack.
do_handle_long_request(AckAction, {ok, Bin}, ConnData, TransId) ->
    case megaco_monitor:lookup_reply_field(TransId, #reply.trans_id) of
        {ok, _} ->
            Method = timer_method(AckAction),
            {WaitFor, CurrTimer} =
                megaco_timer:init(ConnData#conn_data.reply_timer),
            SentBin    = opt_garb_binary(CurrTimer, Bin),
            ConnHandle = ConnData#conn_data.conn_handle,
            ReplyRef =
                megaco_monitor:apply_after(Method,
                                           ?MODULE, reply_timeout,
                                           [ConnHandle, TransId, CurrTimer],
                                           WaitFor),
            megaco_monitor:update_reply_fields(
              TransId,
              [{#reply.bytes,      SentBin},
               {#reply.state,      waiting_for_ack},
               {#reply.timer_ref,  ReplyRef},
               {#reply.ack_action, AckAction}]); % Timing problem?
        _ ->
            %% Been removed already?
            ignore
    end;
do_handle_long_request(_, {error, Reason}, ConnData, TransId) ->
    ?report_trace(ConnData, "send trans reply", [TransId, {error, Reason}]),
    ignore.

%% Invoke the abort callback with the default "extra" value.
handle_request_abort_callback(ConnData, TransId, Pid) ->
    handle_request_abort_callback(ConnData, TransId, Pid,
                                  ?default_user_callback_extra).

%% handle_request_abort_callback(ConnData, TransId, Pid, Extra) -> ok
%%
%% Call UserMod:handle_trans_request_abort with the connection
%% handle, protocol version, transaction serial and handler pid,
%% followed by Extra (unless it is the default) and the user
%% arguments. Always returns ok; any other callback result is only
%% reported with a warning.
handle_request_abort_callback(ConnData, TransId, Pid, Extra) ->
    ?report_trace(ConnData, "callback: trans request aborted", [TransId, Pid]),
    ConnHandle = ConnData#conn_data.conn_handle,
    Version    = ConnData#conn_data.protocol_version,
    UserMod    = ConnData#conn_data.user_mod,
    UserArgs   = ConnData#conn_data.user_args,
    Serial     = TransId#trans_id.serial,
    TailArgs =
        case Extra of
            ?default_user_callback_extra ->
                UserArgs;
            _ ->
                [Extra | UserArgs]
        end,
    Args = [ConnHandle, Version, Serial, Pid | TailArgs],
    Res  = (catch apply(UserMod, handle_trans_request_abort, Args)),
    ?report_debug(ConnData, "return: trans request aborted",
                  [TransId, {return, Res}]),
    if
        Res =/= ok ->
            warning_msg("transaction request abort callback failed: ~w",
                        [Res]);
        true ->
            ok
    end,
    ok.

%% handle_request_callback(ConnData, TransId, Actions, Request, Extra) ->
%%     {AckAction, SendReply}
%%
%% Call UserMod:handle_trans_request and translate what it returns.
%% For the reply-carrying results the reply is passed to
%% maybe_send_reply/5 together with the send options (default []) and
%% an immediate-ack value that depends on the ack mode:
%%
%%   discard_ack         -> asn1_NOVALUE,      AckAction = discard_ack
%%   handle_pending_ack  -> when_pending_sent, AckAction = {handle_ack, D}
%%   handle_ack          -> 'NULL',            AckAction = {handle_ack, D}
%%   handle_sloppy_ack   -> asn1_NOVALUE,      AckAction = {handle_ack, D}
%%
%% A reply is either a list of action replies or an 'ErrorDescriptor'.
%% Any unrecognised result is logged and answered with an internal
%% gateway error.
handle_request_callback(ConnData, TransId, Actions, T, Extra) ->
    ?report_trace(ConnData, "callback: trans request", [T]),
    ConnHandle = ConnData#conn_data.conn_handle,
    Version    = ConnData#conn_data.protocol_version,
    UserMod    = ConnData#conn_data.user_mod,
    UserArgs   = ConnData#conn_data.user_args,
    Args =
        case Extra of
            ?default_user_callback_extra ->
                [ConnHandle, Version, Actions | UserArgs];
            _ ->
                [ConnHandle, Version, Actions, Extra | UserArgs]
        end,
    Res = (catch apply(UserMod, handle_trans_request, Args)),
    ?report_debug(ConnData, "return: trans request", [T, {return, Res}]),

    %% All reply sending goes through this one wrapper
    Send = fun(Body, Opts, ImmAck) ->
                   maybe_send_reply(ConnData, TransId, Body, Opts, ImmAck)
           end,

    case Res of
        ignore ->  %% NOTE: Only used for testing!!
            {discard_ack, ignore};

        ignore_trans_request ->
            {discard_ack, ignore_trans_request};

        %% --- discard_ack ---
        {discard_ack, Replies} when is_list(Replies) ->
            {discard_ack,
             Send({actionReplies, Replies}, [], asn1_NOVALUE)};
        {discard_ack, #'ErrorDescriptor'{} = ED} ->
            {discard_ack,
             Send({transactionError, ED}, [], asn1_NOVALUE)};
        {discard_ack, Replies, SendOpts}
          when is_list(Replies) andalso is_list(SendOpts) ->
            {discard_ack,
             Send({actionReplies, Replies}, SendOpts, asn1_NOVALUE)};
        {discard_ack, #'ErrorDescriptor'{} = ED, SendOpts}
          when is_list(SendOpts) ->
            {discard_ack,
             Send({transactionError, ED}, SendOpts, asn1_NOVALUE)};

        %% --- handle_pending_ack ---
        {{handle_pending_ack, AckData}, Replies} when is_list(Replies) ->
            {{handle_ack, AckData},
             Send({actionReplies, Replies}, [], when_pending_sent)};
        {{handle_pending_ack, AckData}, #'ErrorDescriptor'{} = ED} ->
            {{handle_ack, AckData},
             Send({transactionError, ED}, [], when_pending_sent)};
        {{handle_pending_ack, AckData}, Replies, SendOpts}
          when is_list(Replies) andalso is_list(SendOpts) ->
            {{handle_ack, AckData},
             Send({actionReplies, Replies}, SendOpts, when_pending_sent)};
        {{handle_pending_ack, AckData}, #'ErrorDescriptor'{} = ED, SendOpts}
          when is_list(SendOpts) ->
            {{handle_ack, AckData},
             Send({transactionError, ED}, SendOpts, when_pending_sent)};

        %% --- handle_ack ---
        {{handle_ack, AckData}, Replies} when is_list(Replies) ->
            {{handle_ack, AckData},
             Send({actionReplies, Replies}, [], 'NULL')};
        {{handle_ack, AckData}, #'ErrorDescriptor'{} = ED} ->
            {{handle_ack, AckData},
             Send({transactionError, ED}, [], 'NULL')};
        {{handle_ack, AckData}, Replies, SendOpts}
          when is_list(Replies) andalso is_list(SendOpts) ->
            {{handle_ack, AckData},
             Send({actionReplies, Replies}, SendOpts, 'NULL')};
        {{handle_ack, AckData}, #'ErrorDescriptor'{} = ED, SendOpts}
          when is_list(SendOpts) ->
            {{handle_ack, AckData},
             Send({transactionError, ED}, SendOpts, 'NULL')};

        %% --- handle_sloppy_ack ---
        {{handle_sloppy_ack, AckData}, Replies} when is_list(Replies) ->
            {{handle_ack, AckData},
             Send({actionReplies, Replies}, [], asn1_NOVALUE)};
        {{handle_sloppy_ack, AckData}, #'ErrorDescriptor'{} = ED} ->
            {{handle_ack, AckData},
             Send({transactionError, ED}, [], asn1_NOVALUE)};
        {{handle_sloppy_ack, AckData}, Replies, SendOpts}
          when is_list(Replies) andalso is_list(SendOpts) ->
            {{handle_ack, AckData},
             Send({actionReplies, Replies}, SendOpts, asn1_NOVALUE)};
        {{handle_sloppy_ack, AckData}, #'ErrorDescriptor'{} = ED, SendOpts}
          when is_list(SendOpts) ->
            {{handle_ack, AckData},
             Send({transactionError, ED}, SendOpts, asn1_NOVALUE)};

        {pending, RequestData} ->
            %% The user thinks that this request will take
            %% quite a while to evaluate. Maybe respond with
            %% a pending trans (depends on the pending limit)
            {{pending, RequestData}, maybe_send_pending(ConnData, TransId)};

        Unexpected ->
            %% The callback crashed or returned something we do not
            %% understand: report it and answer with an error
            %% descriptor naming the callback module.
            InternalError = #'ErrorDescriptor'{
                               errorCode = ?megaco_internal_gateway_error,
                               errorText = atom_to_list(UserMod)},
            ?report_important(ConnData,
                              "callback: <ERROR> trans request",
                              [InternalError, {error, Unexpected}]),
            error_msg("transaction request callback failed: ~w", [Unexpected]),
            {discard_ack,
             Send({transactionError, InternalError}, [], asn1_NOVALUE)}
    end.
%% handle_long_request_callback(ConnData, TransId, RequestData, Extra) ->
%%     {AckAction, SendReply}
%%
%% Calls UserMod:handle_trans_long_request and handles its result.
%% Accepted results are 'ignore' and {discard_ack | {handle_ack, AckData},
%% Replies [, SendOpts]} where Replies (and SendOpts) are lists; the
%% action replies are then handed to maybe_send_reply/5. Any other
%% result is reported and logged, and a transaction error reply
%% (internal gateway error) is handed to maybe_send_reply/5 instead.
handle_long_request_callback(ConnData, TransId, RequestData, Extra) ->
    ?report_trace(ConnData, "callback: trans long request", [RequestData]),
    #conn_data{conn_handle      = Handle,
               protocol_version = ProtoVsn,
               user_mod         = CallbackMod,
               user_args        = CallbackArgs} = ConnData,
    ApplyArgs =
        case Extra of
            ?default_user_callback_extra ->
                [Handle, ProtoVsn, RequestData | CallbackArgs];
            _ ->
                [Handle, ProtoVsn, RequestData, Extra | CallbackArgs]
        end,
    Outcome = (catch apply(CallbackMod, handle_trans_long_request, ApplyArgs)),
    ?report_debug(ConnData, "return: trans long request",
                  [{request_data, RequestData}, {return, Outcome}]),

    %% All accepted results send a list of action replies; only the
    %% send options and the immAck value differ.
    SendActionReplies =
        fun(ActionReplies, Opts, ImmAck) ->
                maybe_send_reply(ConnData, TransId,
                                 {actionReplies, ActionReplies},
                                 Opts, ImmAck)
        end,

    case Outcome of
        ignore ->
            {discard_ack, ignore};

        {{handle_ack, AckData}, Replies, SendOpts}
          when is_list(Replies), is_list(SendOpts) ->
            {{handle_ack, AckData},
             SendActionReplies(Replies, SendOpts, 'NULL')};
        {{handle_ack, AckData}, Replies} when is_list(Replies) ->
            {{handle_ack, AckData},
             SendActionReplies(Replies, [], 'NULL')};

        {discard_ack, Replies, SendOpts}
          when is_list(Replies), is_list(SendOpts) ->
            {discard_ack,
             SendActionReplies(Replies, SendOpts, asn1_NOVALUE)};
        {discard_ack, Replies} when is_list(Replies) ->
            {discard_ack,
             SendActionReplies(Replies, [], asn1_NOVALUE)};

        Failure ->
            Descriptor =
                #'ErrorDescriptor'{errorCode = ?megaco_internal_gateway_error,
                                   errorText = atom_to_list(CallbackMod)},
            ?report_important(ConnData, "callback: <ERROR> trans long request",
                              [Descriptor, {error, Failure}]),
            error_msg("long transaction request callback failed: ~w",
                      [Failure]),
            {discard_ack,
             maybe_send_reply(ConnData, TransId,
                              {transactionError, Descriptor},
                              [], asn1_NOVALUE)}
    end.

%% handle_pending(ConnData, T, Extra)
%%
%% Handles a received pending transaction T. If no request record is
%% found for the (local) transaction id, T is passed to
%% return_unexpected_trans/3. Otherwise the receive pending limit is
%% checked (and possibly incremented) and the outcome decides what
%% happens next.
handle_pending(ConnData, T, Extra) ->
    TransId = to_local_trans_id(ConnData),
    ?rt2("handle pending", [T, TransId]),
    case megaco_monitor:lookup_request(TransId) of
        [] ->
            ?report_trace(ConnData, "remote pending (no receiver)", [T]),
            return_unexpected_trans(ConnData, T, Extra);

        [Req] ->
            RecvLimit = ConnData#conn_data.recv_pending_limit,
            case check_and_maybe_incr_pending_limit(RecvLimit,
                                                    recv, TransId) of
                ok ->
                    %% Limit not exceeded: adjust the request timer
                    handle_recv_pending(ConnData, TransId, Req, T);

                error ->
                    %% Limit exceeded: give up on this transaction
                    %% (delete the request record, cancel timers, delete
                    %% the receive pending counter and inform the user)
                    handle_recv_pending_error(ConnData, TransId, Req,
                                              T, Extra);

                aborted ->
                    %% Limit was already exceeded earlier; the user has
                    %% already been notified (see 'error' above)
                    ok
            end
    end.
%% handle_recv_pending(ConnData, TransId, Req, T)
%%
%% A pending message has been received for an outstanding request and
%% the receive pending limit has not been exceeded. What is done depends
%% on the state of the request timer:
%%
%%   {short, Ref}                 - switch from the short (resend) timer
%%                                  to the long timer
%%   {long, _}, curr_timer=timeout - nothing to do
%%   {long, Ref}                  - restart the long timer
handle_recv_pending(#conn_data{long_request_resend = LRR,
                               conn_handle         = ConnHandle} = ConnData,
                    TransId,
                    #request{timer_ref       = {short, Ref},
                             init_long_timer = InitTimer}, T) ->

    ?rt2("handle pending - long request", [LRR, InitTimer]),

    %% The request seems to take a while, so stop the short timer and
    %% start using the long timer instead.
    %% Unless long_request_resend is true, the stored "bytes" are
    %% replaced with a dummy ({no_send, garb_binary}), i.e. the message
    %% is no longer kept for re-transmission.

    megaco_monitor:cancel_apply_after(Ref),
    {WaitFor, CurrTimer} = megaco_timer:init(InitTimer),
    M = ?MODULE,
    F = request_timeout,
    A = [ConnHandle, TransId],
    Ref2 = megaco_monitor:apply_after(M, F, A, WaitFor),
    NewFields =
        case LRR of
            true ->
                [{#request.timer_ref,  {long, Ref2}},
                 {#request.curr_timer, CurrTimer}];
            false ->
                [{#request.bytes,      {no_send, garb_binary}},
                 {#request.timer_ref,  {long, Ref2}},
                 {#request.curr_timer, CurrTimer}]
        end,
    ?report_trace(ConnData, "trans pending (timer restarted)", [T]),
    megaco_monitor:update_request_fields(TransId, NewFields); % Timing problem?

handle_recv_pending(_ConnData, _TransId,
                    #request{timer_ref  = {long, _Ref},
                             curr_timer = timeout}, _T) ->

    ?rt3("handle pending - timeout"),

    %% The long timer is running for the last time, i.e. the next
    %% time it expires will be the last. Therefore we really do not
    %% need to do anything here.
    %% The cleanup will be done in request_timeout.

    ok;

handle_recv_pending(#conn_data{conn_handle = ConnHandle} = ConnData, TransId,
                    #request{timer_ref  = {long, Ref},
                             curr_timer = CurrTimer}, T) ->

    ?rt2("handle pending - still waiting", [CurrTimer]),

    %% The long timer is already in use. We just need to recalculate
    %% the timer, i.e. increment the timer (one "slot" has been
    %% consumed), and start it again.

    megaco_monitor:cancel_apply_after(Ref),
    {WaitFor, Timer2} = megaco_timer:restart(CurrTimer),
    M = ?MODULE,
    F = request_timeout,
    A = [ConnHandle, TransId],
    Ref2 = megaco_monitor:apply_after(M, F, A, WaitFor),
    NewFields =
        [{#request.timer_ref,  {long, Ref2}},
         {#request.curr_timer, Timer2}],
    ?report_trace(ConnData,
                  "long trans pending"
                  " (timer restarted)", [T]),
    %% Timing problem?
    megaco_monitor:update_request_fields(TransId, NewFields).
%% handle_recv_pending_error(ConnData, TransId, Req, T, Extra)
%%
%% The receive pending limit has been exceeded for the request Req:
%% clean up everything kept for the request and deliver
%% {error, exceeded_recv_pending_limit} to the user via return_reply/4.
handle_recv_pending_error(ConnData, TransId, Req, T, Extra) ->
    %% 1) Delete the request record
    megaco_monitor:delete_request(TransId),

    %% 2) Possibly cancel the timer
    case Req#request.timer_ref of
        {_, Ref} ->
            megaco_monitor:cancel_apply_after(Ref);
        _ ->
            ok
    end,

    %% 3) Delete the (receive) pending counter
    megaco_config:del_pending_counter(recv, TransId),

    %% 4) Inform the user that the request reached
    %%    the receive pending limit
    UserMod   = Req#request.user_mod,
    UserArgs  = Req#request.user_args,
    Action    = Req#request.reply_action,
    UserData  = Req#request.reply_data,
    UserReply = {error, exceeded_recv_pending_limit},
    ConnData2 = ConnData#conn_data{user_mod     = UserMod,
                                   user_args    = UserArgs,
                                   reply_action = Action,
                                   reply_data   = UserData},

    ?report_trace(ConnData, "receive pending limit reached", [T]),
    return_reply(ConnData2, TransId, UserReply, Extra).


%% handle_reply(ConnData, T, Extra)
%%
%% Handles a received transaction reply T. There are three cases:
%%
%%   1) A segment of a segmented reply which is not marked as the
%%      last one (segmentationComplete = asn1_NOVALUE)
%%   2) The segment marked as the last one of a segmented reply
%%      (segmentationComplete = 'NULL')
%%   3) An ordinary (not segmented) reply
%%
%% In the segmented cases each new segment is delivered to the user as
%% {ok | error, {SegmentNo, Last, Result}}, where Last is true only
%% when the complete set of segments has been received.

%%
%% This _is_ a segmented message.
%%
%% Since this is not the last segment, we shall not send any ack.
%% (even if three-way-handshake has been configured).
%%
handle_reply(
  ConnData,
  #megaco_transaction_reply{segmentNumber        = SN,
                            segmentationComplete = asn1_NOVALUE} = T, Extra)
  when is_integer(SN) ->
    TransId = to_local_trans_id(ConnData),
    ?rt2("handle segmented reply", [T, TransId, SN]),
    case megaco_monitor:lookup_request(TransId) of

        %% ---------------------------------------------------------
        %% The first segment, so stop the request timer. No longer
        %% needed when the segment(s) start to arrive.

        [#request{timer_ref     = {_Type, Ref},
                  seg_recv      = [],
                  seg_timer_ref = undefined} = Req] ->

            %% Don't care about Req and Rep version diff
            ?report_trace(ConnData, "[segmented] trans reply - first seg",
                          [T]),

            %% Stop the request timer
            megaco_monitor:cancel_apply_after(Ref), %% OTP-4843

            %% Acknowledge the segment
            send_segment_reply(ConnData, SN),

            %% First segment for this reply
            NewFields =
                [{#request.timer_ref, undefined},
                 {#request.seg_recv,  [SN]}],
            megaco_monitor:update_request_fields(TransId, NewFields),

            %% Handle the reply
            deliver_segmented_reply(ConnData, TransId, Req, T,
                                    SN, false, Extra);


        %% ---------------------------------------------------------
        %% This is not the first segment.
        %% The segment timer has not been started (in this function it
        %% is only started when the last segment arrives).
        %% We must check that this is not a re-transmission!

        [#request{seg_recv      = Segs,
                  seg_timer_ref = undefined} = Req] ->
            %% Don't care about Req and Rep version diff
            ?report_trace(ConnData, "[segmented] trans reply - no seg acc",
                          [T]),

            %% Acknowledge the segment
            send_segment_reply(ConnData, SN),

            %% Update/handle received segment
            case lists:member(SN, Segs) of
                true ->
                    %% This is a re-transmission, so we shall not pass
                    %% it on to the user (or update the request record).
                    ok;
                false ->
                    %% First time for this segment
                    megaco_monitor:update_request_field(TransId,
                                                        #request.seg_recv,
                                                        [ SN | Segs ]),

                    %% Handle the reply
                    deliver_segmented_reply(ConnData, TransId, Req, T,
                                            SN, false, Extra)

            end;


        %% ---------------------------------------------------------
        %% The segment timer is running!
        %% This could be the last (out-of-order) segment!
        %% We must check that this is not a re-transmission!

        [#request{seg_recv      = Segs,
                  seg_timer_ref = SegRef} = Req] ->
            %% Don't care about Req and Rep version diff
            ?report_trace(ConnData, "[segmented] trans reply - no seg acc",
                          [T]),

            %% Acknowledge the segment
            send_segment_reply(ConnData, SN),

            %% Update received segments
            case lists:member(SN, Segs) of
                true ->
                    %% This is a re-transmission
                    ok;
                false ->
                    %% First time for this segment,
                    %% we may now have a complete set
                    Last =
                        case is_all_segments([SN | Segs]) of
                            {true, _Sorted} ->
                                megaco_monitor:cancel_apply_after(SegRef),
                                megaco_monitor:delete_request(TransId),
                                send_ack(ConnData),
                                true;
                            {false, Sorted} ->
                                megaco_monitor:update_request_field(
                                  TransId, #request.seg_recv, Sorted),
                                false
                        end,

                    %% Handle the reply
                    deliver_segmented_reply(ConnData, TransId, Req, T,
                                            SN, Last, Extra)

            end;


        [] ->
            ?report_trace(ConnData, "trans reply (no receiver)", [T]),
            return_unexpected_trans(ConnData, T, Extra)
    end;


%%
%% This _is_ a segmented message and it's the last segment of the
%% message.
%%
handle_reply(
  ConnData,
  #megaco_transaction_reply{segmentNumber        = SN,
                            segmentationComplete = 'NULL'} = T, Extra)
  when is_integer(SN) ->
    TransId = to_local_trans_id(ConnData),
    ?rt2("handle (last) segmented reply", [T, TransId, SN]),
    case megaco_monitor:lookup_request(TransId) of

        %% ---------------------------------------------------------
        %% The first segment, so stop the request timer. No longer
        %% needed when the segment(s) start to arrive.

        [#request{timer_ref     = {_Type, Ref},
                  seg_recv      = [],
                  seg_timer_ref = undefined} = Req] ->

            %% Don't care about Req and Rep version diff
            ?report_trace(ConnData, "[segmented] trans reply - "
                          "first/complete seg", [T]),

            %% Stop the request timer
            megaco_monitor:cancel_apply_after(Ref), %% OTP-4843

            %% Acknowledge the ("last") segment
            send_segment_reply_complete(ConnData, SN),

            %% It is of course pointless to split
            %% a transaction into just one segment,
            %% but just to be sure, we handle that
            %% case also
            Last =
                if
                    SN > 1 ->
                        %% More than one segment
                        %% First time for this segment
                        ConnHandle = ConnData#conn_data.conn_handle,
                        InitSegTmr = Req#request.init_seg_timer,
                        {WaitFor, CurrTimer} = megaco_timer:init(InitSegTmr),
                        M = ?MODULE,
                        F = segment_timeout,
                        A = [ConnHandle, TransId, CurrTimer],
                        SegRef =
                            megaco_monitor:apply_after(M, F, A, WaitFor),
                        NewFields =
                            [{#request.timer_ref,     undefined},
                             {#request.seg_recv,      [SN]},
                             {#request.seg_timer_ref, SegRef}],
                        megaco_monitor:update_request_fields(TransId,
                                                             NewFields),
                        false;
                    true ->
                        %% Just one segment!
                        megaco_monitor:delete_request(TransId),
                        send_ack(ConnData),
                        true
                end,

            %% Handle the reply
            deliver_segmented_reply(ConnData, TransId, Req, T,
                                    SN, Last, Extra);


        [#request{seg_recv = Segs} = Req] ->
            %% Don't care about Req and Rep version diff
            ?report_trace(ConnData, "[segmented] trans reply - no seg acc",
                          [T]),

            %% Acknowledge the ("last") segment
            send_segment_reply_complete(ConnData, SN),

            %% Update received segments
            %% This is _probably_ the last segment, but some of
            %% the previous segments may have been lost, so we
            %% may not have a complete set!
            case lists:member(SN, Segs) of
                true ->
                    %% This is a re-transmission
                    ok;
                false ->
                    Last =
                        case is_all_segments([SN | Segs]) of
                            {true, _Sorted} ->
                                ?report_trace(ConnData,
                                              "[segmented] trans reply - "
                                              "complete set", [T]),
                                megaco_monitor:delete_request(TransId),
                                send_ack(ConnData),
                                true;
                            {false, Sorted} ->
                                %% Segments are missing: start the
                                %% segment timer while waiting for them
                                ConnHandle = ConnData#conn_data.conn_handle,
                                InitSegTmr = Req#request.init_seg_timer,
                                {WaitFor, CurrTimer} =
                                    megaco_timer:init(InitSegTmr),
                                M = ?MODULE,
                                F = segment_timeout,
                                A = [ConnHandle, TransId, CurrTimer],
                                SegRef =
                                    megaco_monitor:apply_after(M, F, A,
                                                               WaitFor),
                                NewFields =
                                    [{#request.seg_recv,      Sorted},
                                     {#request.seg_timer_ref, SegRef}],
                                megaco_monitor:update_request_fields(TransId,
                                                                     NewFields),
                                false
                        end,

                    %% Handle the reply
                    deliver_segmented_reply(ConnData, TransId, Req, T,
                                            SN, Last, Extra)

            end;

        [] ->
            ?report_trace(ConnData, "trans reply (no receiver)", [T]),
            return_unexpected_trans(ConnData, T, Extra)
    end;


%%
%% This is _not_ a segmented message,
%% i.e. it's an ordinary transaction reply
%%
handle_reply(#conn_data{conn_handle = CH} = CD, T, Extra) ->
    TransId = to_local_trans_id(CD),
    ?rt2("handle reply", [T, TransId]),
    case {megaco_monitor:request_lockcnt_inc(TransId),
          megaco_monitor:lookup_request(TransId)} of
        {_Cnt, [Req]} when (is_record(Req, request) andalso
                            (CD#conn_data.cancel =:= true)) ->
            ?TC_AWAIT_REPLY_EVENT(true),
            ?report_trace(CD, "trans reply - cancel(1)", [T]),
            do_handle_reply_cancel(CD, Req, T);

        {Cnt, [#request{remote_mid = RMid} = Req]} when
              ((Cnt =:= 1) andalso
               ((RMid =:= preliminary_mid) orelse
                (RMid =:= CH#megaco_conn_handle.remote_mid))) ->
            ?TC_AWAIT_REPLY_EVENT(false),
            %% Just in case conn_data got updated after our lookup
            %% but before we looked up the request record, we
            %% check the cancel field again.
            case megaco_config:conn_info(CD, cancel) of
                true ->
                    ?report_trace(CD, "trans reply - cancel(2)", [T]),
                    megaco_monitor:request_lockcnt_del(TransId),
                    do_handle_reply_cancel(CD, Req, T);
                false ->
                    ?report_trace(CD, "trans reply", [T]),
                    do_handle_reply(CD, Req, TransId, T, Extra)
            end;

        {Cnt, [#request{remote_mid = RMid} = _Req]} when
              (is_integer(Cnt) andalso
               ((RMid =:= preliminary_mid) orelse
                (RMid =:= CH#megaco_conn_handle.remote_mid))) ->
            ?TC_AWAIT_REPLY_EVENT(false),
            %% Ok, someone got there before me, now what?
            %% This is a plain old race condition
            ?report_important(CD, "trans reply - raise condition",
                              [T, {request_lockcnt, Cnt}]),
            megaco_monitor:request_lockcnt_dec(TransId);

        %% no counter
        {_Cnt, [#request{remote_mid = RMid} = Req]} when
              ((RMid =:= preliminary_mid) orelse
               (RMid =:= CH#megaco_conn_handle.remote_mid)) ->
            ?TC_AWAIT_REPLY_EVENT(false),
            %% The counter does not exist.
            %% This can only mean a code upgrade race condition.
            %% That is, this request record was created before
            %% this feature (the counters) was introduced.
            %% The simplest solution is to behave exactly as
            %% before, that is pass it along, and leave it to the
            %% user to figure out.

            %% Just in case conn_data got updated after our lookup
            %% but before we looked up the request record, we
            %% check the cancel field again.
            ?report_verbose(CD, "trans reply - old style", [T]),
            case megaco_config:conn_info(CD, cancel) of
                true ->
                    megaco_monitor:request_lockcnt_del(TransId),
                    do_handle_reply_cancel(CD, Req, T);
                false ->
                    do_handle_reply(CD, Req, TransId, T, Extra)
            end;

        %% The remote mid of the request record matches neither
        %% preliminary_mid nor the remote mid of the connection handle
        {Cnt, [#request{user_mod     = UserMod,
                        user_args    = UserArgs,
                        reply_action = Action,
                        reply_data   = UserData,
                        remote_mid   = RMid}]} ->
            ?report_trace(CD,
                          "received trans reply with invalid remote mid",
                          [{transaction,     T},
                           {remote_mid,      RMid},
                           {request_lockcnt, Cnt}]),
            if
                is_integer(Cnt) ->
                    megaco_monitor:request_lockcnt_dec(TransId);
                true ->
                    ok
            end,
            WrongMid = CH#megaco_conn_handle.remote_mid,
            T2 = transform_transaction_reply_enc(CD#conn_data.protocol_version,
                                                 T),
            UserReply = {error, {wrong_mid, WrongMid, RMid, T2}},
            CD2 = CD#conn_data{user_mod     = UserMod,
                               user_args    = UserArgs,
                               reply_action = Action,
                               reply_data   = UserData},
            return_reply(CD2, TransId, UserReply, Extra);

        {Cnt, []} when is_integer(Cnt) ->
            ?TC_AWAIT_REPLY_EVENT(undefined),
            ?report_trace(CD, "trans reply (no receiver)",
                          [T, {request_lockcnt, Cnt}]),
            megaco_monitor:request_lockcnt_dec(TransId),
            return_unexpected_trans(CD, T, Extra);

        %% No counter
        {_Cnt, []} ->
            ?TC_AWAIT_REPLY_EVENT(undefined),
            ?report_trace(CD, "trans reply (no receiver)", [T]),
            return_unexpected_trans(CD, T, Extra)
    end.

%% Deliver one segment of a segmented reply to the user.
%% The user reply is {error, {SN, Last, Reason}} for a transaction error
%% and {ok, {SN, Last, Replies}} for action replies. The user related
%% fields of the conn_data record are taken from the request record
%% before return_reply/4 is called.
deliver_segmented_reply(ConnData, TransId, Req, T, SN, Last, Extra) ->
    UserMod   = Req#request.user_mod,
    UserArgs  = Req#request.user_args,
    Action    = Req#request.reply_action,
    UserData  = Req#request.reply_data,
    UserReply =
        case T#megaco_transaction_reply.transactionResult of
            {transactionError, Reason} ->
                {error, {SN, Last, Reason}};
            {actionReplies, Replies} ->
                {ok, {SN, Last, Replies}}
        end,
    ConnData2 = ConnData#conn_data{user_mod     = UserMod,
                                   user_args    = UserArgs,
                                   reply_action = Action,
                                   reply_data   = UserData},
    return_reply(ConnData2, TransId, UserReply, Extra).
%% A reply has arrived for a request on a connection where cancel is
%% set: restore the user related conn_data fields from the request
%% record and pass the reply to return_unexpected_trans/2.
do_handle_reply_cancel(CD, #request{user_mod     = UserMod,
                                    user_args    = UserArgs,
                                    reply_action = Action,
                                    reply_data   = UserData}, T) ->
    CD2 = CD#conn_data{user_mod     = UserMod,
                       user_args    = UserArgs,
                       reply_action = Action,
                       reply_data   = UserData},
    return_unexpected_trans(CD2, T).

%% do_handle_reply(CD, Req, TransId, T, Extra)
%%
%% Plain old handling of incoming replies.
%% This clause applies when the request keep alive timer is 'plain' or
%% the reply action is 'call': the request is done with after the first
%% reply, so everything kept for it is removed. The user reply is
%% {error, Reason} or {ok, Replies}.
do_handle_reply(CD,
                #request{timer_ref        = {_Type, Ref}, % OTP-4843
                         user_mod         = UserMod,
                         user_args        = UserArgs,
                         reply_action     = Action,
                         reply_data       = UserData,
                         keep_alive_timer = RKAT},
                TransId, T, Extra)
  when ((RKAT =:= plain) orelse (Action =:= call)) ->
    %% Don't care about Req and Rep version diff
    ?report_trace(CD, "trans reply", [T]),

    %% This is the first reply (maybe of many)
    megaco_monitor:delete_request(TransId),
    megaco_monitor:request_lockcnt_del(TransId),
    megaco_monitor:cancel_apply_after(Ref),           % OTP-4843
    megaco_config:del_pending_counter(recv, TransId), % OTP-7189

    %% Send acknowledgement
    maybe_send_ack(T#megaco_transaction_reply.immAckRequired, CD),

    UserReply =
        case T#megaco_transaction_reply.transactionResult of
            {transactionError, Reason} ->
                {error, Reason};
            {actionReplies, Replies} ->
                {ok, Replies}
        end,
    CD2 = CD#conn_data{user_mod     = UserMod,
                       user_args    = UserArgs,
                       reply_action = Action,
                       reply_data   = UserData},
    return_reply(CD2, TransId, UserReply, Extra);

%% This may be the first reply (of maybe many).
%% The request keep alive timer has not been started yet
%% (keep_alive_ref = undefined). The user reply includes the reply
%% number: {error, ReplyNo, Reason} or {ok, ReplyNo, Replies}.
do_handle_reply(CD,
                #request{user_mod       = UserMod,
                         user_args      = UserArgs,
                         reply_action   = Action,
                         reply_data     = UserData,
                         keep_alive_ref = undefined} = Req,
                TransId, T, Extra) ->
    %% Don't care about Req and Rep version diff
    ?report_trace(CD, "trans reply", [T]),

    %% Could be the first reply, in which case we shall start the
    %% Request Keep Alive timer...
    %% This could happen for more than one (1) reply though, so
    %% we need to check if the counter value actually equals one (1)!

    ReplyNo =
        create_or_maybe_increment_request_keep_alive_counter(CD, TransId),
    if
        (ReplyNo =:= 1) ->
            %% This *is* the first reply!!
            %% 1) Stop resend timer
            {_Type, Ref} = Req#request.timer_ref,             % OTP-4843
            megaco_monitor:cancel_apply_after(Ref),           % OTP-4843

            %% 2) Delete pending counter
            megaco_config:del_pending_counter(recv, TransId), % OTP-7189

            %% 3) Start request keep alive timer
            ConnHandle      = CD#conn_data.conn_handle,
            RKATimer        = Req#request.keep_alive_timer,
            {RKAWaitFor, _} = megaco_timer:init(RKATimer),
            RKARef = megaco_monitor:apply_after(?MODULE,
                                                request_keep_alive_timeout,
                                                [ConnHandle, TransId],
                                                RKAWaitFor),

            %% 4) Maybe send acknowledgement (three-way-handshake)
            maybe_send_ack(T#megaco_transaction_reply.immAckRequired, CD),

            %% 5) And finally store the updated request record
            Req2 = Req#request{keep_alive_ref = RKARef},
            megaco_monitor:insert_request(Req2);

        true ->
            ok
    end,

    UserReply =
        case T#megaco_transaction_reply.transactionResult of
            {transactionError, Reason} ->
                {error, ReplyNo, Reason};
            {actionReplies, Replies} ->
                {ok, ReplyNo, Replies}
        end,
    CD2 = CD#conn_data{user_mod     = UserMod,
                       user_args    = UserArgs,
                       reply_action = Action,
                       reply_data   = UserData},
    return_reply(CD2, TransId, UserReply, Extra);

%% This is *not* the first reply (of many):
%% just increment the reply counter and deliver the reply.
do_handle_reply(CD, #request{user_mod     = UserMod,
                             user_args    = UserArgs,
                             reply_action = Action,
                             reply_data   = UserData}, TransId, T, Extra) ->
    %% Don't care about Req and Rep version diff
    ?report_trace(CD, "trans reply (first reply already delivered)", [T]),

    ReplyNo = increment_request_keep_alive_counter(CD, TransId),

    UserReply =
        case T#megaco_transaction_reply.transactionResult of
            {transactionError, Reason} ->
                {error, ReplyNo, Reason};
            {actionReplies, Replies} ->
                {ok, ReplyNo, Replies}
        end,
    CD2 = CD#conn_data{user_mod     = UserMod,
                       user_args    = UserArgs,
                       reply_action = Action,
                       reply_data   = UserData},
    return_reply(CD2, TransId, UserReply, Extra).

%% is_all_segments(Segs) -> {boolean(), Sorted}
%%
%% Segs is a non-empty list of received segment numbers. The boolean
%% is true when the sorted list is exactly 1, 2, ..., Max (where Max
%% is the largest element). The sorted list is always returned.
is_all_segments(Segs) ->
    Sorted = lists:sort(Segs),
    {is_all_segments(Sorted, 1, lists:last(Sorted)), Sorted}.

%% is_all_segments(Sorted, Expected, Last): walk the sorted list and
%% check that each element is the next expected segment number.
is_all_segments([Last], Last, Last) ->
    true;
is_all_segments([_], _, _) ->
    false;
is_all_segments([SN|Segs], SN, Last) when (SN < Last) ->
    is_all_segments(Segs, SN+1, Last);
is_all_segments([SN1|_], SN2, _Last) when SN1 =/= SN2 ->
    false.


%% handle_segment_reply(CD, SegmentReply, Extra) -> ok | ignore
%%
%% Handles a received segment reply, i.e. the acknowledgement of one
%% segment (SN) of a segmented transaction reply that we have sent.
%% The 'bytes' field of the reply record holds the sent but not yet
%% acknowledged segments, {SN, Bin, TimerRef}, and the 'segments' field
%% the segments not yet sent, {SN, Bin}.
%% An acknowledged segment is removed from the sent list and, if there
%% are unsent segments, the next one is sent.
handle_segment_reply(CD,
                     #'SegmentReply'{transactionId        = TransId,
                                     segmentNumber        = SN,
                                     segmentationComplete = SC}, Extra) ->
    ?rt2("handle segment reply", [{trans_id,              TransId},
                                  {segment_no,            SN},
                                  {segmentation_complete, SC}]),
    TransId2 = to_remote_trans_id(CD#conn_data{serial = TransId}),
    case lookup_reply(CD, TransId2) of
        {_Converted,
         #reply{bytes    = Sent,
                segments = []} = Rep} when is_list(Sent) ->
            ?rt2("no unsent segments", [Sent]),
            handle_segment_reply_callback(CD, TransId, SN, SC, Extra),
            case lists:keyfind(SN, 1, Sent) of
                {SN, _Bin, SegTmr} ->
                    megaco_monitor:cancel_apply_after(SegTmr), %% BMK BMK
                    case lists:keydelete(SN, 1, Sent) of
                        [] ->  %% We are done
                            Ref = Rep#reply.timer_ref,
                            megaco_monitor:cancel_apply_after(Ref),
                            megaco_monitor:update_reply_field(TransId2,
                                                              #reply.bytes,
                                                              []),
                            ok;
                        NewSent ->
                            megaco_monitor:update_reply_field(TransId2,
                                                              #reply.bytes,
                                                              NewSent),
                            ok
                    end;
                _ ->
                    %% Not one of the outstanding segments
                    ok
            end;

        {_Converted,
         #reply{bytes    = Sent,
                segments = NotSent}} when is_list(Sent) andalso
                                          is_list(NotSent) ->
            ?rt2("unsent segments", [Sent, NotSent]),
            handle_segment_reply_callback(CD, TransId, SN, SC, Extra),
            case lists:keyfind(SN, 1, Sent) of
                {SN, _Bin, SegTmr} ->
                    megaco_monitor:cancel_apply_after(SegTmr), %% BMK BMK
                    NewSent = lists:keydelete(SN, 1, Sent),
                    %% NotSent is not empty here (the clause above
                    %% handles the empty case)
                    [{SN2, Bin2}|NewNotSent] = NotSent,
                    case send_reply_segment(CD, "send trans reply segment",
                                            SN2, Bin2) of
                        {ok, Bin3} ->
                            ?rt2("another segment sent", [Bin3]),
                            NewSent2 = [{SN2, Bin3, undefined}|NewSent],
                            NewFields =
                                [{#reply.bytes,    NewSent2},
                                 {#reply.segments, NewNotSent}],
                            megaco_monitor:update_reply_fields(TransId2,
                                                               NewFields),
                            ok;
                        Error ->
                            incNumErrors(CD#conn_data.conn_handle),
                            ?report_important(CD, "failed sending segment",
                                              [{segment_no, SN2},
                                               {error,      Error}]),
                            error_msg("failed sending transaction reply [~w] "
                                      "segment [~w]: ~w",
                                      [TransId, SN2, Error]),
                            megaco_monitor:update_reply_field(TransId2,
                                                              #reply.bytes,
                                                              NewSent),
                            ok
                    end;
                _ ->
                    %% Not one of the outstanding segments
                    ok
            end;

        {_Converted,
         #reply{state = State}} ->
            %% We received a segment reply for a segmented reply we have
            %% not yet sent? This is either some sort of race condition
            %% or "the other side" is really confused.
            %% Ignore the message but issue a warning just in case...
            warning_msg("received unexpected segment reply: "
                        "~n Transaction Id: ~p"
                        "~n Segment Number: ~p"
                        "~n Segmentation Complete: ~p"
                        "~n Reply state: ~p",
                        [TransId2, SN, SC, State]),
            ignore;

        [] ->
            ignore

    end.
%%
%% The segment reply is passed on to the user only if the user has
%% asked for it (segment_reply_ind = true in the connection data).
%% (sri = segment reply indication)
%%
handle_segment_reply_callback(#conn_data{segment_reply_ind = true} = CD,
                              TransId, SN, SC, Extra) ->
    #conn_data{conn_handle      = ConnHandle,
               protocol_version = Version,
               user_mod         = UserMod,
               user_args        = UserArgs} = CD,
    CallbackArgs =
        case Extra of
            ?default_user_callback_extra ->
                [ConnHandle, Version, TransId, SN, SC | UserArgs];
            _ ->
                [ConnHandle, Version, TransId, SN, SC, Extra | UserArgs]
        end,
    (catch apply(UserMod, handle_segment_reply, CallbackArgs));
handle_segment_reply_callback(_CD, _TransId, _SN, _SC, _Extra) ->
    ok.


%% handle_acks([{ConnData, Reply, Trans}], Extra) -> ok
%%
%% Handles each acknowledged reply in turn. Every reply is required
%% to be in state waiting_for_ack; anything else is a function_clause
%% failure.
handle_acks([{ConnData, #reply{state = waiting_for_ack} = Rep, T} | Acks],
            Extra) ->
    handle_ack(ConnData, ok, Rep, T, Extra),
    handle_acks(Acks, Extra);
handle_acks([], _Extra) ->
    ok.

%% handle_ack(ConnData, AckStatus, Reply, Trans, Extra)
%%
%% If the reply to which this is the ack was segmented, then we also
%% have to check that all segment replies have been received. If not,
%% the user callback gets an error status instead.
handle_ack(ConnData, AckStatus, #reply{bytes = Bytes} = Rep, T, Extra)
  when is_binary(Bytes) orelse (Bytes =:= undefined) ->
    %% Not a segmented reply
    handle_ack_complete(ConnData, AckStatus, Rep, T, Extra);

handle_ack(ConnData, AckStatus,
           #reply{bytes = [], segments = []} = Rep, T, Extra) ->
    %% Segmented reply: every segment sent and acknowledged
    handle_ack_complete(ConnData, AckStatus, Rep, T, Extra);

handle_ack(ConnData, OrigAckStatus,
           #reply{bytes    = SegSent,
                  segments = NotSent} = Rep, T, Extra)
  when is_list(SegSent) andalso is_list(NotSent) ->
    %% Segmented reply with segments not acknowledged and/or not sent
    #reply{trans_id          = TransId,
           timer_ref         = ReplyRef,
           pending_timer_ref = PendingRef, %% BMK Still running?
           ack_action        = OrigAckAction} = Rep,
    NotAcked  = [SN || {SN, _, _} <- SegSent],
    Unsent    = [SN || {SN, _} <- NotSent],
    AckStatus = {error, {segment_failure,
                         [{original_ack_status, OrigAckStatus},
                          {segments_not_acked,  NotAcked},
                          {segments_not_sent,   Unsent}]}},
    %% Make sure the user gets to know about the failure, even if the
    %% ack was originally to be discarded.
    AckAction =
        case OrigAckAction of
            {handle_ack, _} ->
                OrigAckAction;
            _ ->
                {handle_ack, segmented_reply}
        end,
    cancel_segment_timers(SegSent),
    handle_ack_cleanup(TransId, ReplyRef, PendingRef),
    handle_ack_callback(ConnData, AckStatus, AckAction, T, Extra).

%% The reply is fully acknowledged: clean up and make the (possible)
%% user callback with the ack status as received.
handle_ack_complete(ConnData, AckStatus,
                    #reply{trans_id          = TransId,
                           timer_ref         = ReplyRef,
                           pending_timer_ref = PendingRef, %% BMK Still running?
                           ack_action        = AckAction}, T, Extra) ->
    handle_ack_cleanup(TransId, ReplyRef, PendingRef),
    handle_ack_callback(ConnData, AckStatus, AckAction, T, Extra).

%% Cancel the reply and pending timers, then remove the reply and its
%% 'sent' pending counter.
handle_ack_cleanup(TransId, ReplyTimerRef, PendingTimerRef) ->
    megaco_monitor:cancel_apply_after(ReplyTimerRef),
    megaco_monitor:cancel_apply_after(PendingTimerRef),
    megaco_monitor:delete_reply(TransId),
    megaco_config:del_pending_counter(sent, TransId). %% BMK: Still existing?

%% Cancel the timer of every sent (but not acknowledged) segment.
%% Anything that is not a list is silently accepted.
cancel_segment_timers(SegSent) when is_list(SegSent) ->
    lists:foreach(fun({_SN, _Bin, TimerRef}) ->
                          megaco_monitor:cancel_apply_after(TimerRef)
                  end,
                  SegSent);
cancel_segment_timers(_) ->
    ok.
%% handle_ack_callback(ConnData, AckStatus, AckAction, Trans, Extra)
%%
%% AckAction = discard_ack: no user callback; an error status is only
%%                          traced.
%% AckAction = {handle_ack, AckData}: the user callback function
%%                          handle_trans_ack is called. Its result is
%%                          returned; anything but ok also gives a
%%                          warning message.
handle_ack_callback(_CD, ok = _AS, discard_ack = _AA, _T, _Extra) ->
    ok;
handle_ack_callback(ConnData, {error, Reason}, discard_ack = AckAction, T, Extra) ->
    ?report_trace(ConnData, "handle ack (no callback)",
                  [T, AckAction, {error, Reason}, Extra]);
handle_ack_callback(ConnData, AckStatus, {handle_ack, AckData}, T, Extra) ->
    ?report_trace(ConnData, "callback: trans ack", [{ack_data, AckData}]),
    #conn_data{conn_handle      = ConnHandle,
               protocol_version = Version,
               user_mod         = UserMod,
               user_args        = UserArgs} = ConnData,
    CallbackArgs =
        case Extra of
            ?default_user_callback_extra ->
                [ConnHandle, Version, AckStatus, AckData | UserArgs];
            _ ->
                [ConnHandle, Version, AckStatus, AckData, Extra | UserArgs]
        end,
    Res = (catch handle_callback(ConnData, UserMod, handle_trans_ack,
                                 CallbackArgs)),
    ?report_debug(ConnData, "return: trans ack", [T, AckData, {return, Res}]),
    if
        Res =/= ok ->
            warning_msg("transaction ack callback failed: ~w", [Res]);
        true ->
            ok
    end,
    Res.


%% handle_callback(ConnData, UserMod, Func, Args)
%%
%% Apply the user callback function. A missing (undefined) callback
%% module is reported and treated as a successful (ok) callback.
handle_callback(ConnData, undefined = _UserMod, Func, Args) ->
    ?report_important(ConnData, "callback: unknown callback module",
                      [{func, Func}, {args, Args}]),
    ok;
handle_callback(_ConnData, UserMod, Func, Args) ->
    (catch apply(UserMod, Func, Args)).


%% handle_message_error(ConnData, Error, Extra)
%%
%% Call the user callback function handle_message_error. The result of
%% the callback is returned; anything but ok also gives a warning
%% message.
handle_message_error(#conn_data{monitor_ref = undefined_monitor_ref},
                     _Error, _Extra) ->
    %% May occur if another process already has setup a
    %% temporary connection, but the handle_connect callback
    %% function has not yet returned before the eager MG
    %% re-sends its initial service change message.
    ignore;
handle_message_error(ConnData, Error, Extra) ->
    ?report_trace(ConnData, "callback: message error", [Error]),
    ConnHandle = ConnData#conn_data.conn_handle,
    Version    = ConnData#conn_data.protocol_version,
    UserMod    = ConnData#conn_data.user_mod,
    UserArgs   = ConnData#conn_data.user_args,
    CallbackArgs =
        case Extra of
            ?default_user_callback_extra ->
                [ConnHandle, Version, Error | UserArgs];
            _ ->
                [ConnHandle, Version, Error, Extra | UserArgs]
        end,
    Res = (catch apply(UserMod, handle_message_error, CallbackArgs)),
    ?report_debug(ConnData, "return: message error", [Error, {return, Res}]),
    if
        Res =/= ok ->
            warning_msg("message error callback failed: ~w", [Res]);
        true ->
            ok
    end,
    Res.

%% handle_disconnect_callback(ConnData, UserReason)
%%
%% Call the user callback function handle_disconnect. The result of
%% the callback is returned; anything but ok also gives a warning
%% message.
handle_disconnect_callback(#conn_data{} = ConnData, UserReason) ->
    ?report_trace(ConnData, "callback: disconnect", [{reason, UserReason}]),
    #conn_data{conn_handle      = ConnHandle,
               protocol_version = Version,
               user_mod         = UserMod,
               user_args        = UserArgs} = ConnData,
    CallbackArgs = [ConnHandle, Version, UserReason | UserArgs],
    Res = (catch apply(UserMod, handle_disconnect, CallbackArgs)),
    ?report_debug(ConnData, "return: disconnect", [{reason, UserReason}, {return, Res}]),
    if
        Res =/= ok ->
            warning_msg("disconnect callback failed: ~w", [Res]);
        true ->
            ok
    end,
    Res.


%%----------------------------------------------------------------------
%% Test "outgoing" messages
%%----------------------------------------------------------------------

%% test_request/5 -> {MegacoMessage, EncodingRes}
%%
%% This function is only intended for testing
%% (e.g. answer the question: have I constructed a valid action request?)
%%
%% It's not exactly the same code as a call to 'call'
%% or 'cast' but close enough.
%%
%% Actions is either one list of action requests (one transaction) or
%% a list of such lists (one transaction per inner list).
%%
test_request(ConnHandle, Actions,
             Version, EncodingMod, EncodingConfig)
  when is_record(ConnHandle, megaco_conn_handle) andalso
       is_integer(Version) andalso is_atom(EncodingMod) ->
    %% Create a fake conn_data structure
    ConnData = #conn_data{serial           = 1,
                          protocol_version = Version,
                          conn_handle      = ConnHandle,
                          auth_data        = asn1_NOVALUE,
                          encoding_mod     = EncodingMod,
                          encoding_config  = EncodingConfig},

    TRs = test_req_compose_transactions(ConnData, Actions),
    Body = {transactions, TRs},
    MegaMsg = megaco_messenger_misc:compose_message(ConnData, Version, Body),
    EncodeRes = megaco_messenger_misc:encode_message(ConnData, MegaMsg),
    {MegaMsg, EncodeRes}.


%% test_req_compose_transactions(ConnData, Actions) -> [Transaction]
%%
%% Compose the transaction request(s) of a test message.
%% Several transactions are composed in the order given by the caller
%% and are numbered consecutively, starting with the serial of the
%% (fake) connection data, so that no two transactions of the message
%% share the same transaction id.
test_req_compose_transactions(#conn_data{serial = FirstSerial},
                              [A|_] = ActionsList) when is_list(A) ->
    test_req_compose_transactions(FirstSerial, ActionsList, []);
test_req_compose_transactions(#conn_data{serial = Serial}, Actions) ->
    TR = #'TransactionRequest'{transactionId = Serial,
                               actions       = Actions},
    [{transactionRequest, TR}].

%% The accumulator is built in reverse, hence the final reverse.
test_req_compose_transactions(_Serial, [], Acc) ->
    lists:reverse(Acc);
test_req_compose_transactions(Serial, [A|As], Acc) ->
    TR = #'TransactionRequest'{transactionId = Serial,
                               actions       = A},
    test_req_compose_transactions(Serial + 1, As,
                                  [{transactionRequest, TR}|Acc]).


%% test_reply/5 -> {MegacoMessage, EncodingRes}
%%
%% The reply counterpart of test_request/5. The last argument is either
%% an error descriptor (giving a transactionError reply) or a list of
%% action replies.
test_reply(ConnHandle, Version, EncodingMod, EncodingConfig, Error)
  when is_record(Error, 'ErrorDescriptor') ->
    Reply = {transactionError, Error},
    test_reply_encode(ConnHandle, Version, EncodingMod, EncodingConfig, Reply);
test_reply(ConnHandle, Version, EncodingMod, EncodingConfig, Replies)
  when is_list(Replies) ->
    Reply = {actionReplies, Replies},
    test_reply_encode(ConnHandle, Version, EncodingMod, EncodingConfig, Reply).

%% test_reply_encode/5 -> {MegacoMessage, EncodingRes}
%%
%% Compose and encode a message holding one transaction reply
%% (transaction id 1, no immediate ack), using a fake conn_data.
test_reply_encode(ConnHandle, Version, EncodingMod, EncodingConfig, Reply) ->
    Serial = 1,
    %% Create a fake conn_data structure
    FakeCD = #conn_data{serial           = Serial,
                        protocol_version = Version,
                        conn_handle      = ConnHandle,
                        auth_data        = asn1_NOVALUE,
                        encoding_mod     = EncodingMod,
                        encoding_config  = EncodingConfig},
    RawTR = #megaco_transaction_reply{transactionId     = Serial,
                                      immAckRequired    = asn1_NOVALUE,
                                      transactionResult = Reply},
    TR   = megaco_messenger_misc:transform_transaction_reply(FakeCD, RawTR),
    Msg  = megaco_messenger_misc:compose_message(FakeCD, Version,
                                                 {transactions,
                                                  [{transactionReply, TR}]}),
    {Msg, megaco_messenger_misc:encode_message(FakeCD, Msg)}.


%%----------------------------------------------------------------------
%% Send (or prepare) outgoing messages
%%----------------------------------------------------------------------

%% Description:
%% Encode a list of actions or a list of list of actions for
%% later sending (using call or cast).
%%
%% encode_actions(CH, Acts, Opts) -> {ok, encoded_actions()} | {error, Reason}
%% CH -> connection_handle()
%% Acts -> action_reqs() | [action_reqs()]
%% action_reqs() -> [action_req()]
%% action_req() -> #'ActionRequest'{}
%% Opts -> [option()]
%% option() -> {Tag, Val}
%% Tag -> atom()
%% Val -> term()
%% encoded_actions() -> binary() | [binary()]
%% Reason -> term()
encode_actions(CH, [First|_] = ActionsList, Opts)
  when is_record(CH, megaco_conn_handle) andalso is_list(First) ->
    %% A list of lists of actions. An encode failure is thrown by
    %% encode_multi_actions/2 and is turned into a return value here.
    (catch encode_multi_actions(CH, ActionsList, Opts));

encode_actions(CH, [First|_] = Actions, Opts)
  when is_record(CH, megaco_conn_handle) andalso is_tuple(First) ->
    %% The actions of one single transaction
    do_encode_actions(CH, Actions, Opts).

%% encode_multi_actions(CH, ActionsList, Opts) -> {ok, [binary()]} | Error
%%
%% Encode the actions of each transaction separately. If one of them
%% fails to encode, the error is thrown (see encode_multi_actions/2).
encode_multi_actions(CH, ActionsList, Opts) ->
    case prepare_req_send_options(CH, Opts) of
        {ok, CD} ->
            {ok, [encode_multi_actions(CD, Actions) || Actions <- ActionsList]};
        Error ->
            Error
    end.

%% Encode the actions of one transaction; anything but success is thrown.
encode_multi_actions(CD, Actions) ->
    Result = megaco_messenger_misc:encode_actions(CD,
                                                  "encode multi actions",
                                                  Actions),
    case Result of
        {ok, Bin} ->
            Bin;
        Error ->
            throw(Error)
    end.

%% Encode the actions of one single transaction.
do_encode_actions(CH, Actions, Opts)
  when is_record(CH, megaco_conn_handle) ->
    case prepare_req_send_options(CH, Opts) of
        {ok, CD} ->
            megaco_messenger_misc:encode_actions(CD,
                                                 "encode actions",
                                                 Actions);
        Error ->
            Error
    end.

%% Look up the connection data of a local connection and apply the send
%% options to it. No transaction id is allocated (compare with
%% prepare_req_send_options/4).
prepare_req_send_options(CH, Opts) ->
    case megaco_config:lookup_local_conn(CH) of
        [] ->
            {error, {not_found, conn_data}};
        [CD] ->
            override_req_send_options(any, CD, Opts)
    end.


%% call(ConnHandle, Actions, Options)
%%
%% Send the request(s) and wait for the reply(s). The reply_data option
%% is reserved for the call proxy: a monitored process, spawned here,
%% that collects the replies on behalf of the caller.
call(ConnHandle, Actions, Options) ->
    case lists:keymember(reply_data, 1, Options) of
        false ->
            Caller        = self(),
            {Proxy, MRef} =
                erlang:spawn_monitor(fun() -> call_proxy(Caller) end),
            call_or_cast(call, ConnHandle, Actions,
                         [{reply_data, Proxy} | Options], MRef);
        true ->
            {error, {bad_option, reply_data}}
    end.

%% cast(ConnHandle, Actions, Options)
%%
%% Send the request(s) without waiting for any reply (no call proxy).
cast(ConnHandle, Actions, Options) ->
    call_or_cast(cast, ConnHandle, Actions, Options, undefined).

%% In a transaction there can be several actions, so if the
%% First element of the Actions list is an ''ActionRequest''
%% record this a list of ActionRequest's for one Transaction
%% request. If on the other hand this is not the case, then
%% the Actions list is assumed to be a list of list of
%% ActionRequest. That is, action requests for several transactions.
%% It could also be a binary or a list of binaries (if
%% the actions has already been encoded).
call_or_cast(CallOrCast, ConnHandle, [A|_] = Actions, Options, ProxyMon)
  when is_tuple(A) ->
    %% Just one transaction
    single_transaction_result(
      call_or_cast(CallOrCast, ConnHandle, [Actions], Options, ProxyMon));

call_or_cast(CallOrCast, ConnHandle, Actions, Options, ProxyMon)
  when is_binary(Actions) ->
    %% Just one transaction (although the actions has already been encoded)
    single_transaction_result(
      call_or_cast(CallOrCast, ConnHandle, [Actions], Options, ProxyMon));

call_or_cast(CallOrCast, ConnHandle, ActionsList, Options, ProxyMon)
  when is_record(ConnHandle, megaco_conn_handle) ->
    case prepare_req_send_options(CallOrCast,
                                  ConnHandle, Options, ActionsList) of
        {ok, ConnData} ->
            ?report_trace(ConnData, "call_or_cast - options prepared", []),
            call_or_cast_send(CallOrCast, ConnHandle, ConnData,
                              ActionsList, ProxyMon);
        {error, Reason} ->
            call_proxy_cleanup(Options, ProxyMon),
            return_error(CallOrCast, 1, {error, Reason})
    end;
call_or_cast(CallOrCast, ConnHandle, _Actions, Options, ProxyMon) ->
    call_proxy_cleanup(Options, ProxyMon),
    return_error(CallOrCast, 1, {error, {bad_megaco_conn_handle, ConnHandle}}).

%% The request was made for one single transaction, but was handled as
%% a list of one transaction: unwrap the single reply.
single_transaction_result(Result) ->
    case Result of
        ok ->
            ok;
        {error, Reason} ->
            {error, Reason};
        {Version, [Reply]} when is_integer(Version) ->
            {Version, Reply};
        {Version, Error} when is_integer(Version) ->
            {Version, Error}
    end.

%% Encode and send the request(s). A call then waits for the reply(s)
%% (via the call proxy), while a cast returns at once.
call_or_cast_send(CallOrCast, ConnHandle, ConnData, ActionsList, ProxyMon) ->
    case encode_requests(ConnData, ActionsList) of
        {ok, TRs, BinOrBins} ->
            ?report_trace(ConnData,
                          "call_or_cast - request encoded", []),
            send_request(ConnData, ConnHandle,
                         TRs, CallOrCast, BinOrBins),
            case CallOrCast of
                call ->
                    TransIds = to_local_trans_id(ConnData, TRs),
                    wait_for_reply(ConnData, TransIds, ProxyMon);
                cast ->
                    ok
            end;
        {error, Reason} ->
            call_proxy_cleanup(ConnData, ProxyMon),
            Version = ConnData#conn_data.protocol_version,
            return_error(CallOrCast, Version, {error, Reason})
    end.


%% A call returns the error together with the protocol version,
%% a cast returns just the error.
return_error(Action, Version, Error) ->
    case Action of
        call ->
            {Version, Error};
        cast ->
            Error
    end.

%% Tell the call proxy to start collecting replies and wait for its
%% answer. A crashing proxy results in an error reply.
wait_for_reply(CD, TransIds, ProxyMon) ->
    Proxy = CD#conn_data.reply_data,
    Proxy ! {go, self(), CD, TransIds},
    receive
        {reply, Proxy, Reply} ->
            erlang:demonitor(ProxyMon, [flush]),
            Reply;
        {'DOWN', ProxyMon, process, Proxy, Info} ->
            {CD#conn_data.protocol_version,
             {error, {call_proxy_crash, Info}}}
    end.


%% call_proxy_cleanup(ConnData | Options | ProxyPid, ProxyMon) -> ok
%%
%% Get rid of a call proxy that is not going to be used. The proxy is
%% found in the connection data or in the options (reply_data).
call_proxy_cleanup(#conn_data{reply_data = ProxyPid}, ProxyMon) ->
    do_call_proxy_cleanup(ProxyPid, ProxyMon);
call_proxy_cleanup(Options, ProxyMon) when is_list(Options) ->
    ProxyPid =
        case lists:keyfind(reply_data, 1, Options) of
            {reply_data, Data} ->
                Data;
            _ ->
                undefined
        end,
    do_call_proxy_cleanup(ProxyPid, ProxyMon);
call_proxy_cleanup(ProxyPid, ProxyMon) ->
    do_call_proxy_cleanup(ProxyPid, ProxyMon).

do_call_proxy_cleanup(ProxyPid, ProxyMon) ->
    maybe_demonitor(ProxyMon),
    maybe_stop_proxy(ProxyPid),
    ok.

%% No monitor exist for a cast (undefined).
maybe_demonitor(undefined) ->
    ok;
maybe_demonitor(MRef) ->
    (catch erlang:demonitor(MRef, [flush])),
    ok.

%% Only a pid can be a proxy.
maybe_stop_proxy(Proxy) when is_pid(Proxy) ->
    Proxy ! {stop, self()},
    ok;
maybe_stop_proxy(_) ->
    ok.


%% The call proxy process: wait for the order to either start
%% collecting replies (go) or to terminate (stop).
call_proxy(Parent) ->
    receive
        {go, Parent, CD, TransIds} ->
            call_proxy(Parent, CD, TransIds);
        {stop, Parent} ->
            exit(normal)
    end.

%% Collect the replies, deliver them to the parent and then linger for
%% a while in order to take care of late (unexpected) replies.
call_proxy(Parent, CD, TransIds) ->
    Reply = proxy_wait_for_reply(CD, TransIds, []),
    Parent ! {reply, self(), Reply},
    call_proxy_gc(CD, CD#conn_data.call_proxy_gc_timeout).

%% call_proxy_gc(ConnData, Timeout) -> no_return()
%%
%% After the reply has been delivered the proxy lingers for Timeout,
%% passing every reply that still arrives on to the user as an
%% unexpected transaction reply. The time spent is deducted from the
%% timeout for each message handled.
call_proxy_gc(CD, Timeout) when (Timeout > 0) ->
    T = t(),
    receive
        {?MODULE, TransId, Version, Result} -> % Old format
            CD2   = CD#conn_data{protocol_version = Version},
            Extra = ?default_user_callback_extra,
            return_unexpected_trans_reply(CD2, TransId, Result, Extra),
            call_proxy_gc(CD, Timeout - (t() - T));

        {?MODULE, TransId, Version, Result, Extra} ->
            CD2 = CD#conn_data{protocol_version = Version},
            return_unexpected_trans_reply(CD2, TransId, Result, Extra),
            call_proxy_gc(CD, Timeout - (t() - T))

    after Timeout ->
            exit(normal)
    end;
call_proxy_gc(_CD, _Timeout) ->
    exit(normal).

%% proxy_wait_for_reply(ConnData, TransIds, Replies) -> {Version, [Result]}
%%
%% Collect replies until every transaction in TransIds has been
%% answered. Replies is a list of {Version, TransId, Result}.
proxy_wait_for_reply(_CD, [], Replies0) ->
    %% Make sure they come in the same order as the requests were sent
    Replies1 = lists:keysort(2, Replies0),
    %% Must all be the same version
    [{Version, _, _}|_] = Replies1,
    Replies2 = [Result || {_Version, _TransId, Result} <- Replies1],
    {Version, Replies2};
proxy_wait_for_reply(CD, TransIds, Replies) ->
    receive
        {?MODULE, TransId, Version, Reply} -> % Old format
            {TransIds2, Replies2} =
                wfr_handle_reply(CD,
                                 TransIds, TransId,
                                 Version, Replies, Reply),
            proxy_wait_for_reply(CD, TransIds2, Replies2);

        {?MODULE, TransId, Version, Reply, Extra} ->
            {TransIds2, Replies2} =
                wfr_handle_reply(CD,
                                 TransIds, TransId,
                                 Version, Replies, Reply, Extra),
            proxy_wait_for_reply(CD, TransIds2, Replies2)
    end.

%% Old format reply: no extra information.
wfr_handle_reply(CD, TransIds, TransId, Version, Replies, Reply) ->
    Extra = ?default_user_callback_extra,
    wfr_handle_reply(CD, TransIds, TransId, Version, Replies, Reply, Extra).

%% A reply to one of the transactions we are waiting for updates the
%% collected replies. Any other reply is passed on to the user as an
%% unexpected transaction reply.
wfr_handle_reply(CD, TransIds, TransId, Version, Replies, Reply, Extra) ->
    %% Is this meant for us?
    case lists:member(TransId, TransIds) of
        true -> % Yep
            wfr_update(TransIds, TransId, Version, Replies, Reply, Extra);
        false -> % Nop
            CD2 = CD#conn_data{protocol_version = Version},
            return_unexpected_trans_reply(CD2, TransId, Reply, Extra),
            {TransIds, Replies}
    end.

%% Make a reply entry. Extra information (if any) is appended as the
%% last element of the result tuple.
%%
%% NOTE(review): for a segmented reply with extra information the
%% stored result gets an extra (third) element already after the first
%% segment, which none of the lookup patterns in wfr_update/6 expects.
%% Segmented replies combined with extra information therefore look
%% broken; this needs to be confirmed against the code producing the
%% replies before it is changed.
wfr_mk_reply(Version, TransId, Result, ?default_user_callback_extra = _Extra) ->
    {Version, TransId, Result};
wfr_mk_reply(Version, TransId, Result0, Extra) ->
    Result = list_to_tuple(lists:append(tuple_to_list(Result0), [Extra])),
    {Version, TransId, Result}.

%% wfr_update(TransIds, TransId, Version, Results, Result, Extra) ->
%%     {TransIds2, Results2}
%%
%% Update the collected results with a (part of a) reply. The
%% transaction is removed from TransIds when its reply is complete
%% (last segment, segment timeout or a reply without segments).
%% The results already collected for other transactions are always
%% kept.

%% Last segment of a reply
%% transactionResult "=" actionReplies
wfr_update(TransIds, TransId, Version, Results, {ok, {SegNo, Last, ARs}}, Extra)
  when is_integer(SegNo) andalso (Last == true) ->
    TransIds2 = lists:delete(TransId, TransIds),
    case lists:keysearch(TransId, 2, Results) of

        %% All segments ok (actionReplies)
        {value, {V, TransId, {ok, SegReps}}} ->
            SegReps2 = lists:keysort(1, [{SegNo, ARs}|SegReps]),
            Rep      = wfr_mk_reply(V, TransId, {ok, SegReps2}, Extra),
            Results2 = lists:keyreplace(TransId, 2, Results, Rep),
            {TransIds2, Results2};

        %% At least one segment error (transactionError)
        {value, {V, TransId, {error, {segment, OkSegs, ErrSegs}}}} ->
            OkSegs2  = lists:keysort(1, [{SegNo, ARs}|OkSegs]),
            ErrSegs2 = lists:keysort(1, ErrSegs),
            Error    = {error, {segment, OkSegs2, ErrSegs2}},
            Rep      = wfr_mk_reply(V, TransId, Error, Extra),
            Results2 = lists:keyreplace(TransId, 2, Results, Rep),
            {TransIds2, Results2};

        false ->
            %% First and only segment
            Rep = wfr_mk_reply(Version, TransId, {ok, [{SegNo, ARs}]}, Extra),
            {TransIds2, [Rep | Results]}

    end;

%% Last segment of a reply
%% transactionResult "=" transactionError
wfr_update(TransIds, TransId, Version, Results, {error, {SegNo, Last, ED}}, Extra)
  when is_integer(SegNo) andalso (Last == true) ->
    TransIds2 = lists:delete(TransId, TransIds),
    case lists:keysearch(TransId, 2, Results) of

        %% First segment with error (transactionError)
        {value, {V, TransId, {ok, SegReps}}} ->
            %% The failing segment belongs among the error segments
            %% only, not among the ok segments.
            OkSegs   = lists:keysort(1, SegReps),
            ErrSegs  = [{SegNo, ED}],
            Error    = {error, {segment, OkSegs, ErrSegs}},
            Rep      = wfr_mk_reply(V, TransId, Error, Extra),
            Results2 = lists:keyreplace(TransId, 2, Results, Rep),
            {TransIds2, Results2};

        %% Another segment with error (transactionError)
        {value, {V, TransId, {error, {segment, OkSegs, ErrSegs}}}} ->
            OkSegs2  = lists:keysort(1, OkSegs),
            ErrSegs2 = lists:keysort(1, [{SegNo, ED}|ErrSegs]),
            Error    = {error, {segment, OkSegs2, ErrSegs2}},
            Rep      = wfr_mk_reply(V, TransId, Error, Extra),
            Results2 = lists:keyreplace(TransId, 2, Results, Rep),
            {TransIds2, Results2};

        false ->
            %% First and only segment
            OkSegs  = [],
            ErrSegs = [{SegNo, ED}],
            Error   = {error, {segment, OkSegs, ErrSegs}},
            Rep     = wfr_mk_reply(Version, TransId, Error, Extra),
            {TransIds2, [Rep | Results]}

    end;

%% One segment of a reply
%% transactionResult "=" actionReplies
wfr_update(TransIds, TransId, Version, Results, {ok, {SegNo, _Last, ARs}}, Extra)
  when is_integer(SegNo) ->
    case lists:keysearch(TransId, 2, Results) of

        %% All segments ok (actionReplies)
        {value, {V, TransId, {ok, SegReps}}} ->
            SegReps2 = [{SegNo, ARs}|SegReps],
            Rep      = wfr_mk_reply(V, TransId, {ok, SegReps2}, Extra),
            Results2 = lists:keyreplace(TransId, 2, Results, Rep),
            {TransIds, Results2};

        %% At least one segment error (transactionError)
        {value, {V, TransId, {error, {segment, OkSegs, ErrSegs}}}} ->
            OkSegs2  = [{SegNo, ARs}|OkSegs],
            Error    = {error, {segment, OkSegs2, ErrSegs}},
            Rep      = wfr_mk_reply(V, TransId, Error, Extra),
            Results2 = lists:keyreplace(TransId, 2, Results, Rep),
            {TransIds, Results2};

        false ->
            %% First segment
            Rep = wfr_mk_reply(Version, TransId, {ok, [{SegNo, ARs}]}, Extra),
            {TransIds, [Rep | Results]}

    end;

%% One segment of a reply
%% transactionResult "=" transactionError
wfr_update(TransIds, TransId, Version, Results, {error, {SegNo, _Last, ED}}, Extra)
  when is_integer(SegNo) ->
    case lists:keysearch(TransId, 2, Results) of

        %% First segment with error (transactionError)
        {value, {V, TransId, {ok, OkSegs}}} ->
            ErrSegs  = [{SegNo, ED}],
            Error    = {error, {segment, OkSegs, ErrSegs}},
            Rep      = wfr_mk_reply(V, TransId, Error, Extra),
            Results2 = lists:keyreplace(TransId, 2, Results, Rep),
            {TransIds, Results2};

        %% Another segment with error (transactionError)
        %% (the stored error is tagged with 'segment', see above)
        {value, {V, TransId, {error, {segment, OkSegs, ErrSegs}}}} ->
            ErrSegs2 = [{SegNo, ED}|ErrSegs],
            Error    = {error, {segment, OkSegs, ErrSegs2}},
            Rep      = wfr_mk_reply(V, TransId, Error, Extra),
            Results2 = lists:keyreplace(TransId, 2, Results, Rep),
            {TransIds, Results2};

        false ->
            %% First segment
            OkSegs  = [],
            ErrSegs = [{SegNo, ED}],
            Error   = {error, {segment, OkSegs, ErrSegs}},
            Rep     = wfr_mk_reply(Version, TransId, Error, Extra),
            {TransIds, [Rep | Results]}

    end;

%% This means that some segments did not make it in time
wfr_update(TransIds, TransId, Version, Results,
           {error, {segment_timeout, Missing}}, Extra) ->
    TransIds2 = lists:delete(TransId, TransIds),
    case lists:keysearch(TransId, 2, Results) of

        %% Only ok segments so far
        {value, {V, TransId, {ok, OkSegs}}} ->
            Error    = {error, {segment_timeout, Missing, OkSegs, []}},
            Rep      = wfr_mk_reply(V, TransId, Error, Extra),
            Results2 = lists:keyreplace(TransId, 2, Results, Rep),
            {TransIds2, Results2};

        %% At least one segment error (transactionError)
        {value, {V, TransId, {error, {segment, OkSegs, ErrSegs}}}} ->
            Error    = {error, {segment_timeout, Missing, OkSegs, ErrSegs}},
            Rep      = wfr_mk_reply(V, TransId, Error, Extra),
            Results2 = lists:keyreplace(TransId, 2, Results, Rep),
            {TransIds2, Results2};

        false ->
            %% No segment received at all
            Error = {error, {segment_timeout, Missing, [], []}},
            Rep   = wfr_mk_reply(Version, TransId, Error, Extra),
            {TransIds2, [Rep | Results]}

    end;

%% And all other results (presumably results without segments).
wfr_update(TransIds, TransId, Version, Results, Result, Extra) ->
    TransIds2 = lists:delete(TransId, TransIds),
    Results2  = [wfr_mk_reply(Version, TransId, Result, Extra)|Results],
    {TransIds2, Results2}.


%% send_request(ConnData, ConnHandle, TransInfo, Action, BinOrBins)
%%
%% Register the request(s) and hand them over for sending.
%% TransInfo is either [trans_id()] or a [trans_req()]

%% This is the normal case where we have just one
%% transaction to be sent (using call or cast) using
%% the transaction sender process.
send_request(#conn_data{control_pid  = CP,
                        trans_req    = true,
                        trans_sender = Pid} = CD,
             CH, [Serial], Action, [Bin])
  when is_pid(Pid) andalso
       is_integer(Serial) andalso
       (node(CP) =:= node()) ->

    ?report_trace(CD,
                  "send_request - one transaction via trans-sender",
                  [Serial]),

    #conn_data{request_timer      = InitTimer,
               long_request_timer = LongTimer} = CD,
    TransId = to_local_trans_id(CH, Serial),
    insert_request(CD, CH, TransId, Action, {Serial, Bin},
                   InitTimer, LongTimer),
    megaco_trans_sender:send_req(Pid, Serial, Bin);

%% This is the general case where we have several transactions
%% being sent (using call or cast) at once using
%% the transaction sender process.
send_request(#conn_data{control_pid  = CP,
                        trans_req    = true,
                        trans_sender = Pid} = CD,
             CH, TransInfo, Action, Bins)
  when is_pid(Pid) andalso
       is_list(Bins) andalso
       (node(CP) =:= node()) ->

    ?report_trace(CD,
                  "send_request - multi transactions via trans_sender",
                  [TransInfo, Pid]),

    #conn_data{request_timer      = InitTimer,
               long_request_timer = LongTimer} = CD,
    insert_requests(CD, CH, TransInfo, Action, Bins,
                    InitTimer, LongTimer),
    megaco_trans_sender:send_reqs(Pid, TransInfo, Bins);

%% This is the case when one or more transactions is
%% being sent in one message immediately (not using
%% the transaction sender process). I.e. the binary is
%% the encoded message.
send_request(#conn_data{control_pid = CP} = CD,
             CH, TRs, Action, Bin)
  when is_list(TRs) andalso
       is_binary(Bin) andalso
       (node(CP) =:= node()) ->

    ?report_trace(CD, "send_request - multi transaction", [TRs]),

    #conn_data{request_timer      = InitTimer,
               long_request_timer = LongTimer} = CD,
    insert_requests(CD, CH, TRs, Action, Bin,
                    InitTimer, LongTimer),
    case megaco_messenger_misc:send_message(CD, false, Bin) of
        {error, Reason} ->
            %% The message could not be sent: cancel the requests
            %% that were just registered.
            cancel_requests(CD, TRs, Reason);
        {ok, _} ->
            ignore
    end;

%% This is the case where we are not on the node where the
%% transport process run.
send_request(#conn_data{control_pid = CP} = CD,
             CH, TransInfo, Action, Bin)
  when node(CP) =/= node() ->

    ?report_trace(CD, "send_request - remote", [TransInfo]),

    %% The requests are registered locally with infinite timers;
    %% the sending is done on the node of the control process.
    InitTimer = infinity,
    LongTimer = infinity,
    insert_requests(CD, CH, TransInfo, Action, Bin,
                    InitTimer, LongTimer),
    Node = node(CP),
    Args = [node(), CD, TransInfo, Bin],
    rpc:cast(Node, ?MODULE, send_request_remote, Args).


%% insert_requests(ConnData, ConnHandle, TransInfo, Action, BinOrBins,
%%                 InitTimer, LongTimer) -> ok
%%
%% Register one request for each transaction. TransInfo is either a
%% list of serial numbers, with one binary for each of them, or a list
%% of transaction requests, all part of the same binary (the message).
insert_requests(_, _, [], _, _, _, _) ->
    ok;

insert_requests(CD, CH, [Serial|MoreSerials],
                Action, [Bin|MoreBins], InitTimer, LongTimer)
  when is_integer(Serial) andalso is_binary(Bin) ->
    insert_request(CD, CH, to_local_trans_id(CH, Serial),
                   Action, Bin, InitTimer, LongTimer),
    insert_requests(CD, CH, MoreSerials, Action, MoreBins,
                    InitTimer, LongTimer);

insert_requests(CD, CH,
                [{transactionRequest, TR}|MoreTRs],
                Action, Bin, InitTimer, LongTimer)
  when is_record(TR, 'TransactionRequest') andalso is_binary(Bin) ->
    Serial = TR#'TransactionRequest'.transactionId,
    %% The request record itself (not the message binary) is stored
    insert_request(CD, CH, to_local_trans_id(CH, Serial),
                   Action, TR, InitTimer, LongTimer),
    insert_requests(CD, CH, MoreTRs, Action, Bin,
                    InitTimer, LongTimer).


%% insert_request(ConnData, ConnHandle, TransId, Action, Data,
%%                InitTimer, LongTimer)
%%
%% Create the lock counter of the request, start the request timer and
%% store the request record.
insert_request(ConnData, ConnHandle, TransId,
               Action, Data, InitTimer, LongTimer) ->
    %% The result of the lock-counter creation is not checked, since
    %% the only way for it to already exist is that the transaction-id
    %% range has wrapped and an old counter was not deleted.
    megaco_monitor:request_lockcnt_cre(TransId),

    #megaco_conn_handle{remote_mid = RemoteMid} = ConnHandle,
    #conn_data{protocol_version           = Version,
               user_mod                   = UserMod,
               user_args                  = UserArgs,
               send_handle                = SendHandle,
               reply_data                 = ReplyData,
               segment_recv_timer         = InitSegTimer,
               request_keep_alive_timeout = RKATimer} = ConnData,
    {WaitFor, CurrTimer} = megaco_timer:init(InitTimer),
    TimerRef = megaco_monitor:apply_after(?MODULE, request_timeout,
                                          [ConnHandle, TransId], WaitFor),
    megaco_monitor:insert_request(
      #request{trans_id         = TransId,
               remote_mid       = RemoteMid,
               timer_ref        = ?SIM({short, TimerRef}, init_request_timer),
               init_timer       = InitTimer,
               init_long_timer  = LongTimer,
               curr_timer       = CurrTimer,
               version          = Version,
               bytes            = {send, Data},
               send_handle      = SendHandle,
               user_mod         = UserMod,
               user_args        = UserArgs,
               reply_action     = Action,
               reply_data       = ReplyData,
               init_seg_timer   = InitSegTimer,
               keep_alive_timer = RKATimer}). % Timing problem?


%% The remote part of send_request/5 (the clause for a control process
%% on another node): send the request with the action 'remote' and the
%% requesting node as reply data.
send_request_remote(ReplyNode, ConnData, TransInfo, Bin) ->
    send_request(ConnData#conn_data{reply_data = ReplyNode},
                 ConnData#conn_data.conn_handle,
                 TransInfo, remote, Bin).

%% Allocate the transaction id(s) of the request(s) and apply the send
%% options to the connection data.
prepare_req_send_options(CallOrCast, ConnHandle, Options, Actions) ->
    %% Ensures that two processes cannot get same transaction id.
    %% Bad send options may cause spurious transaction id to be consumed.
    NumTrans = number_of_transactions(Actions),
    case megaco_config:incr_trans_id_counter(ConnHandle, NumTrans) of
        {error, Reason} ->
            {error, Reason};
        {ok, ConnData} ->
            override_req_send_options(CallOrCast, ConnData, Options)
    end.

%% A list starting with a tuple is the actions of one transaction,
%% anything else is a list with one element for each transaction.
number_of_transactions([First|_]) when is_tuple(First) ->
    1;
number_of_transactions(ActionsList) ->
    length(ActionsList).

%% override_req_send_options(ReplyAction, ConnData, Options) ->
%%     {ok, ConnData2} | {error, {bad_send_option, {Key, Val}}}
%%
%% Apply the request send options, one by one, to the connection data.
%% The first bad option ends the processing.
%% ReplyAction = call | cast | any
override_req_send_options(ReplyAction, ConnData, [{Key, Val} | Tail]) ->
    case override_req_send_option(ReplyAction, Key, Val, ConnData) of
        {ok, ConnData2} ->
            override_req_send_options(ReplyAction, ConnData2, Tail);
        {error, _} = Error ->
            Error
    end;
override_req_send_options(_ReplyAction, ConnData, []) ->
    {ok, ConnData}.

%% Apply one request send option.
override_req_send_option(_ReplyAction, protocol_version, Val, CD) ->
    {ok, CD#conn_data{protocol_version = Val}};
override_req_send_option(_ReplyAction, send_handle, Val, CD) ->
    {ok, CD#conn_data{send_handle = Val}};
override_req_send_option(_ReplyAction, request_timer = Key, Val, CD) ->
    case megaco_config:verify_val(Key, Val) of
        true ->
            {ok, CD#conn_data{request_timer = Val}};
        false ->
            {error, {bad_send_option, {Key, Val}}}
    end;
override_req_send_option(_ReplyAction, long_request_timer = Key, Val, CD) ->
    case megaco_config:verify_val(Key, Val) of
        true ->
            {ok, CD#conn_data{long_request_timer = Val}};
        false ->
            {error, {bad_send_option, {Key, Val}}}
    end;
override_req_send_option(ReplyAction, call_proxy_gc_timeout = Key, Val, CD)
  when (ReplyAction =:= call) orelse (ReplyAction =:= any) ->
    case megaco_config:verify_val(Key, Val) of
        true ->
            {ok, CD#conn_data{call_proxy_gc_timeout = Val}};
        false ->
            {error, {bad_send_option, {Key, Val}}}
    end;
override_req_send_option(ReplyAction, request_keep_alive_timeout = Key, Val, CD)
  when (ReplyAction =:= cast) orelse (ReplyAction =:= any) ->
    case megaco_config:verify_val(Key, Val) of
        true ->
            {ok, CD#conn_data{request_keep_alive_timeout = Val}};
        false ->
            {error, {bad_send_option, {Key, Val}}}
    end;
override_req_send_option(_ReplyAction, reply_data, Val, CD) ->
    {ok, CD#conn_data{reply_data = Val}};
override_req_send_option(_ReplyAction, user_mod, Val, CD) when is_atom(Val) ->
    {ok, CD#conn_data{user_mod = Val}};
override_req_send_option(_ReplyAction, user_args, Val, CD) when is_list(Val) ->
    {ok, CD#conn_data{user_args = Val}};
override_req_send_option(_ReplyAction, trans_req, false, CD) ->
    %% It is only possible to turn the transaction sender off; turning
    %% it on is not supported. This makes it possible to bypass the
    %% transaction sender for occasional messages.
    {ok, CD#conn_data{trans_req    = false,
                      trans_sender = undefined}};
override_req_send_option(_ReplyAction, Key, Val, _CD) ->
    %% Unknown option, bad value or an option not valid for this
    %% reply action
    {error, {bad_send_option, {Key, Val}}}.

%% override_rep_send_options(ConnData, Options) ->
%%     {ok, ConnData2} | {error, {bad_send_option, {Key, Val}}}
%%
%% Apply the reply send options, one by one, to the connection data.
%% The first bad option ends the processing.
override_rep_send_options(ConnData, [{Key, Val} | Tail]) ->
    case override_rep_send_option(Key, Val, ConnData) of
        {ok, ConnData2} ->
            override_rep_send_options(ConnData2, Tail);
        {error, _} = Error ->
            Error
    end;
override_rep_send_options(ConnData, []) ->
    {ok, ConnData}.

%% Apply one reply send option.
override_rep_send_option(protocol_version, Val, CD) ->
    {ok, CD#conn_data{protocol_version = Val}};
override_rep_send_option(send_handle, Val, CD) ->
    {ok, CD#conn_data{send_handle = Val}};
override_rep_send_option(reply_timer = Key, Val, CD) ->
    case megaco_config:verify_val(Key, Val) of
        true ->
            {ok, CD#conn_data{reply_timer = Val}};
        false ->
            {error, {bad_send_option, {Key, Val}}}
    end;
override_rep_send_option(trans_req, false, CD) ->
    %% It is only possible to turn the transaction sender off; turning
    %% it on is not supported. This makes it possible to bypass the
    %% transaction sender for occasional messages.
    {ok, CD#conn_data{trans_req    = false,
                      trans_sender = undefined}};
override_rep_send_option(Key, Val, _CD) ->
    {error, {bad_send_option, {Key, Val}}}.


%% ----
%% This list is allways atleast one (list of actions) long.
%% ----
%% The proper number of transaction id numbers has already
%% been "allocated", and the connection data record is
%% updated accordingly.
%%
%% encode_requests(ConnData, ActionsList)
%%
%% With an active transaction sender (trans_req = true and a pid in
%% trans_sender) every transaction is encoded on its own and the
%% result is {ok, Serials, EncodedTRs}; a failure thrown by
%% encode_requests/5 is turned into a return value by the catch.
%% Otherwise the whole message is encoded at once and the result
%% is {ok, TRs, Bin} (or whatever error the encoder returned).
encode_requests(#conn_data{trans_req    = true,
                           trans_sender = Sender,
                           serial       = LastSerial} = CD,
                ActionsList)
  when is_pid(Sender) ->
    (catch encode_requests(CD,
                           LastSerial,
                           lists:reverse(ActionsList),
                           [],
                           []));
encode_requests(#conn_data{serial = LastSerial} = CD, ActionsList) ->
    %% No accumulation of transactions: instead of encoding each
    %% transaction individually (for the sender process to batch
    %% up and send later) the entire message is encoded directly.
    %% => One binary is returned, possibly holding many transactions.
    Reversed = lists:reverse(ActionsList),
    encode_requests_in_msg(CD, LastSerial, Reversed).


%% Compose and encode one complete megaco message containing
%% one or more transactions.
encode_requests_in_msg(CD, LastSerial, ActionsList) ->
    TRs = compose_requests_in_msg(LastSerial, ActionsList, []),
    case megaco_messenger_misc:encode_body(CD,
                                           "encode trans request(s) msg",
                                           {transactions, TRs}) of
        {ok, Bin} ->
            {ok, TRs, Bin};
        Error ->
            Error
    end.

%% Walks the (reversed) actions list, handing out serials downwards
%% from Serial and prepending each wrapped request to Acc, which
%% restores the original order.
compose_requests_in_msg(Serial, ActionsList, Acc) ->
    Wrap =
        fun(Actions, {NextSerial, TRs}) ->
                TR = #'TransactionRequest'{transactionId = NextSerial,
                                           actions       = Actions},
                {NextSerial - 1, [{transactionRequest, TR} | TRs]}
        end,
    {_UnusedSerial, Composed} = lists:foldl(Wrap, {Serial, Acc}, ActionsList),
    Composed.


%% We have done the encoding in reverse order, so there
%% is no need to reverse now.
%% encode_requests(ConnData, Serial, ReversedActionsList, Serials, Bins)
%%
%% Encodes one transaction request per actions entry, counting the
%% serial downwards. Returns {ok, Serials, EncodedTRs}; anything
%% other than {ok, Bin} from the encoder is thrown (the /2 wrapper
%% catches it).
encode_requests(_CD, _Serial, [], Serials, EncodedTRs) ->
    {ok, Serials, EncodedTRs};
encode_requests(CD, Serial, [Actions | Remaining], Serials, EncodedTRs) ->
    case do_encode_request(CD, Serial, Actions) of
        {ok, EncodedTR} ->
            encode_requests(CD,
                            Serial - 1,
                            Remaining,
                            [Serial | Serials],
                            [EncodedTR | EncodedTRs]);
        Error ->
            throw(Error)
    end.


%% Wrap the actions in a TransactionRequest and encode that single
%% transaction.
do_encode_request(CD, Serial, Actions) ->
    megaco_messenger_misc:encode_trans_request(
      CD,
      #'TransactionRequest'{transactionId = Serial,
                            actions       = Actions}).


%% Resolve the immAckRequired value of a reply. With
%% when_pending_sent an ack is requested ('NULL') only if the
%% counter is above zero; any other setting is passed through.
imm_ack_req(Counter, when_pending_sent) ->
    case Counter > 0 of
        true  -> 'NULL';
        false -> asn1_NOVALUE
    end;
imm_ack_req(_Counter, ImmAck) ->
    ImmAck.

%% maybe_send_reply(ConnData, TransId, Result, SendOpts, ImmAck)
%%
%% Pending limit:
%% Before the reply can be sent we must check that we have not
%% passed the pending limit (and sent an error message), in which
%% case {error, aborted} is returned. Otherwise the send options
%% are applied and the reply is sent; a bad send option is returned
%% as is.
maybe_send_reply(#conn_data{sent_pending_limit = Limit} = ConnData,
                 TransId, Result, SendOpts, ImmAck) ->
    case check_pending_limit(Limit, sent, TransId) of
        aborted ->
            {error, aborted};
        {ok, Counter} ->
            case override_rep_send_options(ConnData, SendOpts) of
                {ok, ConnData2} ->
                    AckReq = imm_ack_req(Counter, ImmAck),
                    send_reply(ConnData2, Result, AckReq);
                Error ->
                    Error
            end
    end.

%% Encode a single transaction reply.
encode_reply(CD, TR) ->
    megaco_messenger_misc:encode_trans_reply(CD, TR).

%% send_reply(ConnData, TransRes, ImmAck)
%%
%% Encapsulates the transaction result into a transaction reply,
%% encodes it and sends it:
%%
%%   * encoded as one binary and a transaction sender is in use
%%     (trans_req = true and trans_sender is a pid): the binary is
%%     handed to the sender, result is {ok, Bin};
%%   * encoded as one binary otherwise: sent directly, result is
%%     that of megaco_messenger_misc:send_body/3;
%%   * encoded as a list (segmented): as many segments as
%%     segment_send allows are sent, result is
%%     {ok, send_reply_segments(CD, Bins)};
%%   * {error, not_implemented}: the reply is packed, unencoded,
%%     into one message body and sent;
%%   * any other encode error: an error reply is sent instead and
%%     the encode error is returned.
%%
%% The is_pid/1 test mirrors encode_requests/2, maybe_encode/2 and
%% maybe_send_message/3: without a live sender process the reply
%% falls through to the direct-send clause instead of being passed
%% to megaco_trans_sender with a non-pid.
send_reply(#conn_data{serial       = Serial,
                      trans_req    = TransReq,
                      trans_sender = TransSnd} = CD, TransRes, ImmAck) ->

    TR = #megaco_transaction_reply{transactionId     = Serial,
                                   immAckRequired    = ImmAck,
                                   transactionResult = TransRes},
    case encode_reply(CD, TR) of
        {ok, Bin} when is_binary(Bin) andalso
                       (TransReq =:= true) andalso
                       is_pid(TransSnd) ->
            ?rt2("send_reply - pass it on to the transaction sender",
                 [byte_size(Bin)]),
            megaco_trans_sender:send_reply(TransSnd, Bin),
            {ok, Bin};

        {ok, Bin} when is_binary(Bin) ->
            ?rt2("send_reply - encoded", [byte_size(Bin)]),
            TraceLabel = "send trans reply",
            Body       = {transactions, [Bin]},
            megaco_messenger_misc:send_body(CD, TraceLabel, Body);

        {ok, Bins} when is_list(Bins) ->
            ?rt2("send_reply - encoded (segmented)", [length(Bins)]),
            Res = send_reply_segments(CD, Bins),
            {ok, Res};

        {error, not_implemented} ->
            %% Oups, we cannot segment regardless the config,
            %% so pack it all into one message and hope for
            %% the best...
            ?rt2("send_reply - cannot encode separate transactions", []),
            TR2  = megaco_messenger_misc:transform_transaction_reply(CD, TR),
            Body = {transactions, [{transactionReply, TR2}]},
            megaco_messenger_misc:send_body(CD, "encode trans reply", Body);

        {error, Reason} = Error ->
            %% Tell the peer that we failed, then report the
            %% original encode error to our caller.
            Code = ?megaco_internal_gateway_error,
            Text = "encode transaction reply",
            ED   = #'ErrorDescriptor'{errorCode = Code,
                                      errorText = Text},
            Res  = {transactionError, ED},
            TR2  = #megaco_transaction_reply{transactionId     = Serial,
                                             transactionResult = Res},
            TR3  = megaco_messenger_misc:transform_transaction_reply(CD, TR2),
            TraceLabel = "<ERROR> encode trans reply body failed",
            ?report_important(CD, TraceLabel, [TR, TR3, ED, Error]),
            error_msg("failed encoding transaction reply body: ~s",
                      [format_encode_error_reason(Reason)]),
            Body = {transactions, [{transactionReply, TR3}]},
            megaco_messenger_misc:send_body(CD, TraceLabel, Body),
            Error
    end.

%% send_reply_segments(ConnData, Bins) -> {Sent, NotSent} | Error
%%
%% Sends the first segments of a segmented reply. A failed send is
%% thrown by send_reply_segments/5 and surfaces here as the return
%% value of the catch.
send_reply_segments(CD, Bins) ->
    TraceLabelPre = "send segmented trans reply",
    (catch send_reply_segments(CD, TraceLabelPre, Bins)).

%% Decide how many segments to send now: all of them when
%% segment_send is infinity or larger than the number of segments,
%% otherwise segment_send of them.
send_reply_segments(#conn_data{segment_send = infinity} = CD, Label, Bins) ->
    send_reply_segments(CD, Label, length(Bins), Bins);

send_reply_segments(#conn_data{segment_send = K} = CD, Label, Bins)
  when is_integer(K) andalso (K =< length(Bins)) ->
    send_reply_segments(CD, Label, K, Bins);

send_reply_segments(#conn_data{segment_send = K} = CD, Label, Bins)
  when is_integer(K) ->
    send_reply_segments(CD, Label, length(Bins), Bins).

send_reply_segments(CD, Label, K, Bins) ->
    send_reply_segments(CD, Label, K, Bins, []).

%% send_reply_segments(ConnData, LabelPrefix, K, Segments, Sent)
%%     -> {Sent, NotSent}
%%
%% Sends the K first segments. Sent is built by prepending, so it
%% lists the sent segments most recent first. A send result other
%% than {ok, _} is thrown.
send_reply_segments(_CD, _Label, 0, Bins, Sent) ->
    ?rt2("send_reply_segments - done", [Sent, Bins]),
    {Sent, Bins};
send_reply_segments(CD, TraceLabelPre, K, [{SN, Bin}|Bins], Sent) ->
    case send_reply_segment(CD, TraceLabelPre, SN, Bin) of
        {ok, Bin2} ->
            ?rt2("send_reply_segments - send", [K, SN]),
            send_reply_segments(CD, TraceLabelPre, K-1,
                                Bins, [{SN, Bin2}|Sent]);
        Error ->
            throw(Error)
    end.

%% Send one segment; the trace label gets the segment number appended.
send_reply_segment(CD, TraceLabelPre, SN, Bin) ->
    Label = lists:flatten(io_lib:format("~s[~w]", [TraceLabelPre, SN])),
    Body  = {transactions, [Bin]},
    megaco_messenger_misc:send_body(CD, Label, Body).


%% format_encode_error_reason(Reason) -> string()
%%
%% Pretty-prints the encode error shapes recognised below; anything
%% else is printed with ~w. The guards use andalso throughout (the
%% first clause used to mix in the strict 'and').
format_encode_error_reason(Reason) ->
    FS =
        case Reason of
            {Mod, Func, [EC, Msg], {AE, CS}} when is_atom(Mod) andalso
                                                  is_atom(Func) andalso
                                                  is_list(EC) andalso
                                                  is_tuple(Msg) andalso
                                                  is_list(CS) ->
                io_lib:format("~n Encode module: ~w"
                              "~n Func: ~w"
                              "~n Encode config: ~w"
                              "~n Message part: ~p"
                              "~n Actual error: ~p"
                              "~n Call stack: ~w",
                              [Mod, Func, EC, Msg, AE, CS]);

            {Mod, Func, [EC, Msg], AE} when is_atom(Mod) andalso
                                            is_atom(Func) andalso
                                            is_list(EC) andalso
                                            is_tuple(Msg) ->
                io_lib:format("~n Encode module: ~w"
                              "~n Func: ~w"
                              "~n Encode config: ~w"
                              "~n Message part: ~p"
                              "~n Actual error: ~p",
                              [Mod, Func, EC, Msg, AE]);

            {Mod, [EC, Msg], {AE, CS}} when is_atom(Mod) andalso
                                            is_list(EC) andalso
                                            is_tuple(Msg) andalso
                                            is_list(CS) ->
                io_lib:format("~n Encode module: ~w"
                              "~n Encode config: ~w"
                              "~n Message part: ~p"
                              "~n Actual error: ~p"
                              "~n Call stack: ~w",
                              [Mod, EC, Msg, AE, CS]);

            {Mod, [EC, Msg], AE} when is_atom(Mod) andalso
                                      is_list(EC) andalso
                                      is_tuple(Msg) ->
                io_lib:format("~n Encode module: ~w"
                              "~n Encode config: ~w"
                              "~n Message part: ~p"
                              "~n Actual error: ~p",
                              [Mod, EC, Msg, AE]);

            Error ->
                io_lib:format("~n ~w", [Error])
        end,
    lists:flatten(FS).


%% Presumably the user would return immediately (with {pending, Data}) if it
%% knows or suspects a request to take a long time to process.
%% For this reason we assume that handling a resent request
%% could not have caused an update of the pending limit counter.
%%
%% Returns the result of send_pending/1, {aborted, SendReply} when
%% the limit was just exceeded (an error reply is sent), or
%% {aborted, ignore} when the transaction was already aborted.
maybe_send_pending(#conn_data{sent_pending_limit = Limit} = ConnData,
                   TransId) ->
    case check_and_maybe_incr_pending_limit(Limit, sent, TransId) of
        ok ->
            send_pending(ConnData);
        error ->
            SendReply = send_pending_limit_error(ConnData),
            {aborted, SendReply};
        aborted ->
            {aborted, ignore}
    end.


%% Send a transaction pending, via the transaction sender when one
%% is in use and actually running, otherwise directly. The is_pid/1
%% guard matches the other users of trans_sender in this module, so
%% a missing sender process means "send directly".
send_pending(#conn_data{serial       = Serial,
                        trans_req    = true,
                        trans_sender = Pid})
  when is_pid(Pid) ->
    megaco_trans_sender:send_pending(Pid, Serial);
send_pending(#conn_data{serial = Serial} = CD) ->
    %% Encapsule the transaction result into a pending message
    TP   = #'TransactionPending'{transactionId = Serial},
    Body = {transactions, [{transactionPending, TP}]},
    megaco_messenger_misc:send_body(CD, "send trans pending", Body).


%% maybe_send_ack(ImmAckRequired, ConnData)
%%
%% 'NULL' means the peer asked for an immediate ack: it is always
%% sent, through the transaction sender if one is running
%% (trans_ack = true and trans_sender is a pid), otherwise
%% directly. For any other value the ack is only sent when
%% auto_ack is not false, again through the sender if available.
%% The first clause now carries the same is_pid/1 guard as the
%% fourth, so a non-pid trans_sender falls through to send_ack/1.
maybe_send_ack('NULL', #conn_data{serial       = Serial,
                                  trans_ack    = true,
                                  trans_sender = Pid})
  when is_pid(Pid) ->
    megaco_trans_sender:send_ack_now(Pid, Serial);
maybe_send_ack('NULL', CD) ->
    send_ack(CD);
maybe_send_ack(_, #conn_data{auto_ack = false}) ->
    ignore;
maybe_send_ack(_, #conn_data{serial       = Serial,
                             trans_ack    = true,
                             trans_sender = Pid})
  when is_pid(Pid) ->
    %% Send (later) via the transaction sender
    megaco_trans_sender:send_ack(Pid, Serial),
    ok;
maybe_send_ack(_, CD) ->
    %% Send now
    send_ack(CD).


%% Wrap the serial of this transaction in a transaction ack and
%% send it in a message of its own.
send_ack(#conn_data{serial = Serial} = CD) ->
    Ack = #'TransactionAck'{firstAck = Serial},
    megaco_messenger_misc:send_body(
      CD,
      "send trans ack",
      {transactions, [{transactionResponseAck, [Ack]}]}).


%% Send a segment reply for segment SegNo of the current
%% transaction, leaving segmentationComplete at its record default.
send_segment_reply(#conn_data{serial = Serial} = CD, SegNo) ->
    SegReply = #'SegmentReply'{transactionId = Serial,
                               segmentNumber = SegNo},
    megaco_messenger_misc:send_body(
      CD,
      "send segment reply",
      {transactions, [{segmentReply, SegReply}]}).

%% As send_segment_reply/2, with an explicit segmentationComplete
%% value.
send_segment_reply(#conn_data{serial = Serial} = CD, SegNo, Complete) ->
    SegReply = #'SegmentReply'{transactionId        = Serial,
                               segmentNumber        = SegNo,
                               segmentationComplete = Complete},
    megaco_messenger_misc:send_body(
      CD,
      "send segment reply",
      {transactions, [{segmentReply, SegReply}]}).

%% Segment reply with segmentationComplete set to 'NULL'.
send_segment_reply_complete(CD, SegNo) ->
    send_segment_reply(CD, SegNo, 'NULL').


%% Report that the pending limit has been exceeded and send the
%% corresponding transaction error. Returns ok | error (see
%% send_trans_error/3).
send_pending_limit_error(ConnData) ->
    ?report_pending_limit_exceeded(ConnData),
    send_trans_error(ConnData,
                     ?megaco_number_of_transactionpending_exceeded,
                     "Pending limit exceeded").

%% send_trans_error(ConnData, Code, Reason) -> ok | error
%%
%% Encapsulates an error descriptor into a transaction reply for
%% the serial of ConnData and sends it. A failed send is reported
%% and gives 'error'.
send_trans_error(ConnData, Code, Reason) ->
    %% Encapsulate the transaction error into a reply message
    ED       = #'ErrorDescriptor'{errorCode = Code, errorText = Reason},
    Serial   = ConnData#conn_data.serial,
    TransRes = {transactionError, ED},
    TR       = #megaco_transaction_reply{transactionId     = Serial,
                                         transactionResult = TransRes},
    TR2      = megaco_messenger_misc:transform_transaction_reply(ConnData, TR),
    Body     = {transactions, [{transactionReply, TR2}]},
    case megaco_messenger_misc:send_body(ConnData, "send trans error", Body) of
        {error, Reason2} ->
            ?report_important(ConnData,
                              "<ERROR> failed sending transaction error",
                              [Body, {error, Reason2}]),
            error;
        _ ->
            ok
    end.


%% send_message_error(ConnData, Code, Reason) -> ok | error
%%
%% Sends a message-level error descriptor. A failed send is
%% reported and gives 'error'.
send_message_error(ConnData, Code, Reason) ->
    ED   = #'ErrorDescriptor'{errorCode = Code, errorText = Reason},
    Body = {messageError, ED},
    case megaco_messenger_misc:send_body(ConnData, "send trans error", Body) of
        {error, Reason2} ->
            ?report_important(ConnData,
                              "<ERROR> failed sending message error",
                              [Body, {error, Reason2}]),
            error;
        _ ->
            ok
    end.


%% cancel(ConnHandle, Reason) -> ok
%%
%% Cancels all outstanding requests and replies of the connection.
%% While the cancel is running the connection is flagged with
%% cancel = true, which makes request_timeout/2 and reply_timeout/3
%% ignore timers that fire in the meantime. The flag is cleared in
%% an 'after' section: should do_cancel/3 raise, the exception
%% still propagates, but the connection is not left with its timers
%% permanently ignored.
cancel(ConnHandle, Reason) when is_record(ConnHandle, megaco_conn_handle) ->
    case megaco_config:lookup_local_conn(ConnHandle) of
        [CD] ->
            megaco_config:update_conn_info(CD, cancel, true),
            try
                do_cancel(ConnHandle, Reason, CD#conn_data{cancel = true})
            after
                megaco_config:update_conn_info(CD, cancel, false)
            end,
            ok;
        [] ->
            %% Unknown connection: nothing to flag
            ConnData = fake_conn_data(ConnHandle),
            do_cancel(ConnHandle, Reason, ConnData)
    end.

%% do_cancel(ConnHandle, Reason, ConnData) -> ok
%%
%% First cancels every stored request sent from the local mid to
%% the remote mid of the connection (and its request timer), then
%% every stored reply to a transaction of the remote mid.
do_cancel(ConnHandle, Reason, ConnData) ->
    ?report_trace(ConnData, "cancel", [ConnHandle, Reason]),
    #megaco_conn_handle{local_mid  = LocalMid,
                        remote_mid = RemoteMid} = ConnHandle,

    %% Requests: transaction id owned by us, sent to the remote mid
    ReqPat = #request{trans_id   = #trans_id{mid = LocalMid, _ = '_'},
                      remote_mid = RemoteMid,
                      _          = '_'},
    lists:foreach(
      fun(Req) ->
              cancel_request(ConnData, Req, Reason),
              {_Type, Ref} = Req#request.timer_ref,  %% OTP-4843
              megaco_monitor:cancel_apply_after(Ref)
      end,
      megaco_monitor:match_requests(ReqPat)),

    %% Replies: transaction id owned by the remote mid
    RepPat = #reply{trans_id  = #trans_id{mid = RemoteMid, _ = '_'}, % BUGBUG List here?
                    local_mid = LocalMid,
                    _         = '_'},
    lists:foreach(
      fun(Rep) ->
              cancel_reply(ConnData, Rep, Reason)
      end,
      megaco_monitor:match_replies(RepPat)),
    ok.

%% cancel_requests(ConnData, TransactionRequests, Reason) -> ok
%%
%% Cancels the stored request, if there still is one, for each
%% {transactionRequest, TR} in the list.
cancel_requests(ConnData, TRs, Reason) ->
    lists:foreach(
      fun({transactionRequest, TR}) ->
              #'TransactionRequest'{transactionId = TransId0} = TR,
              TransId = to_local_trans_id(ConnData#conn_data.conn_handle,
                                          TransId0),
              case megaco_monitor:lookup_request(TransId) of
                  [] ->
                      ignore;
                  [Req] when is_record(Req, request) ->
                      cancel_request(ConnData, Req, Reason)
              end
      end,
      TRs).

%% cancel_request(ConnData, Req, Reason)
%%
%% Removes the request and delivers {error, Reason} as its reply,
%% using the user module, user args, reply action, reply data and
%% protocol version that were stored with the request.
cancel_request(ConnData, Req, Reason) ->
    ?report_trace(ignore, "cancel request", [Req]),
    ?TC_AWAIT_CANCEL_EVENT(),
    #request{trans_id     = TransId,
             version      = Version,
             user_mod     = UserMod,
             user_args    = UserArgs,
             reply_action = Action,
             reply_data   = UserData} = Req,
    ConnData2 = ConnData#conn_data{protocol_version = Version,
                                   user_mod         = UserMod,
                                   user_args        = UserArgs,
                                   reply_action     = Action,
                                   reply_data       = UserData},
    cancel_request2(ConnData2, TransId, {error, Reason}).

%% Delete the request together with its lock counter and its recv
%% pending counter, then hand UserReply to the user.
cancel_request2(ConnData, TransId, UserReply) ->
    megaco_monitor:delete_request(TransId),
    megaco_monitor:request_lockcnt_del(TransId),
    megaco_config:del_pending_counter(recv, TransId), % OTP-7189
    ConnData2 = ConnData#conn_data{serial = TransId#trans_id.serial},
    return_reply(ConnData2, TransId, UserReply).


%% return_reply/3: return_reply/4 with the default callback extra.
return_reply(ConnData, TransId, UserReply) ->
    return_reply(ConnData, TransId, UserReply, ?default_user_callback_extra).

%% return_reply(ConnData, TransId, UserReply, Extra)
%%
%% Delivers a transaction reply according to reply_action:
%%   call   - reply_data is the pid waiting for the reply, which
%%            gets a message;
%%   cast   - the handle_trans_reply callback of the user module
%%            is applied (Extra is only passed on when it is not
%%            the default); the callback result is returned and a
%%            result other than ok is logged as a warning;
%%   remote - reply_data is the node on which the reply is
%%            processed, via rpc:cast/4.
return_reply(ConnData, TransId, UserReply, Extra) ->
    ?report_trace(ConnData, "callback: trans reply", [UserReply]),
    #conn_data{protocol_version = Version,
               reply_data       = UserData,
               reply_action     = ReplyAction} = ConnData,
    case ReplyAction of
        call when is_pid(UserData) ->
            ?report_trace(ConnData, "callback: (call) trans reply",
                          [UserReply]),
            UserData ! {?MODULE, TransId, Version, UserReply, Extra};
        cast ->
            ?report_trace(ConnData, "callback: (cast) trans reply", [UserReply]),
            #conn_data{user_mod    = UserMod,
                       user_args   = UserArgs,
                       conn_handle = ConnHandle} = ConnData,
            CallbackArgs =
                case Extra of
                    ?default_user_callback_extra ->
                        [ConnHandle, Version, UserReply, UserData | UserArgs];
                    _ ->
                        [ConnHandle, Version, UserReply, UserData, Extra | UserArgs]
                end,
            Res = (catch apply(UserMod, handle_trans_reply, CallbackArgs)),
            ?report_debug(ConnData, "return: (cast) trans reply",
                          [UserReply, {return, Res}]),
            case Res of
                ok ->
                    ok;
                _ ->
                    warning_msg("transaction reply callback failed: ~w",
                                [Res]),
                    ok
            end,
            Res;
        remote ->
            ?report_trace(ConnData, "callback: (remote) trans reply", [UserReply]),
            rpc:cast(UserData, ?MODULE, receive_reply_remote,
                     [ConnData, UserReply, Extra])
    end.

%% receive_reply_remote/2: receive_reply_remote/3 with the default
%% callback extra.
receive_reply_remote(ConnData, UserReply) ->
    receive_reply_remote(ConnData, UserReply, ?default_user_callback_extra).

%% receive_reply_remote(ConnData, UserReply, Extra)
%%
%% Target of the rpc:cast issued by return_reply/4. The request lock
%% counter is incremented before the request is looked up; only the
%% process that brings it to 1 gets to deliver the reply.
receive_reply_remote(ConnData, UserReply, Extra) ->
    TransId = to_local_trans_id(ConnData),
    LockCnt = megaco_monitor:request_lockcnt_inc(TransId),
    Lookup  = (catch megaco_monitor:lookup_request(TransId)),
    case {LockCnt, Lookup} of
        {Cnt, [Req]} when (Cnt =:= 1) andalso is_record(Req, request) ->
            %% Don't care about Req and Rep version diff
            do_receive_reply_remote(ConnData, TransId, Req, UserReply, Extra);

        {Cnt, [Req]} when is_integer(Cnt) andalso is_record(Req, request) ->
            %% Another process is accessing, handle as unexpected
            %% (so it has a possibility to get logged).
            ?report_important(ConnData, "trans reply (no receiver)",
                              [{user_reply, UserReply},
                               {request_lockcnt, Cnt}]),
            megaco_monitor:request_lockcnt_dec(TransId),
            return_unexpected_trans_reply(ConnData, TransId, UserReply, Extra);

        %% no counter
        {_Cnt, [Req]} when is_record(Req, request) ->
            %% The counter does not exist.
            %% This can only mean a code upgrade race condition.
            %% That is, this request record was created before
            %% this feature (the counters) was introduced.
            %% The simplest solution to this is to behave exactly as
            %% before, that is, pass it along, and leave it to the
            %% user to figure out.
            ?report_trace(ConnData,
                          "remote reply - code upgrade raise condition",
                          [{user_reply, UserReply}]),
            do_receive_reply_remote(ConnData, TransId, Req, UserReply, Extra);

        {Cnt, _} when is_integer(Cnt) ->
            %% Counter, but no (single) request record
            ?report_trace(ConnData, "trans reply (no receiver)",
                          [{user_reply, UserReply}, {request_lockcnt, Cnt}]),
            megaco_monitor:request_lockcnt_dec(TransId),
            return_unexpected_trans_reply(ConnData, TransId, UserReply, Extra);

        _ ->
            ?report_trace(ConnData, "remote reply (no receiver)",
                          [{user_reply, UserReply}]),
            return_unexpected_trans_reply(ConnData, TransId, UserReply, Extra)
    end.

%% Remove the request (record, lock counter, request timer and recv
%% pending counter) and deliver the reply with the user information
%% that was stored in the request.
do_receive_reply_remote(ConnData, TransId,
                        #request{timer_ref    = {_Type, TimerRef},
                                 user_mod     = UserMod,
                                 user_args    = UserArgs,
                                 reply_action = Action,
                                 reply_data   = UserData},
                        UserReply, Extra) ->
    megaco_monitor:delete_request(TransId),
    megaco_monitor:request_lockcnt_del(TransId),
    megaco_monitor:cancel_apply_after(TimerRef),      % OTP-4843
    megaco_config:del_pending_counter(recv, TransId), % OTP-7189

    return_reply(ConnData#conn_data{user_mod     = UserMod,
                                    user_args    = UserArgs,
                                    reply_action = Action,
                                    reply_data   = UserData},
                 TransId, UserReply, Extra).


%% cancel_reply(ConnData, Rep, Reason)
%%
%%   * waiting_for_ack: the pending timer is stopped and the reply
%%     is completed through handle_ack/5 with {error, Reason};
%%   * aborted, or any state when Reason is 'ignore': the reply
%%     record, its timers and its sent pending counter are removed;
%%   * anything else is left alone.
cancel_reply(ConnData, #reply{state     = waiting_for_ack,
                              user_mod  = UserMod,
                              user_args = UserArgs} = Rep, Reason) ->
    ?report_trace(ignore, "cancel reply [waiting_for_ack]", [Rep]),
    megaco_monitor:cancel_apply_after(Rep#reply.pending_timer_ref),
    Serial    = (Rep#reply.trans_id)#trans_id.serial,
    ConnData2 = ConnData#conn_data{serial    = Serial,
                                   user_mod  = UserMod,
                                   user_args = UserArgs},
    T     = #'TransactionAck'{firstAck = Serial},
    Extra = ?default_user_callback_extra,
    handle_ack(ConnData2, {error, Reason}, Rep, T, Extra);

cancel_reply(_ConnData, #reply{state = aborted} = Rep, _Reason) ->
    ?report_trace(ignore, "cancel reply [aborted]", [Rep]),
    #reply{trans_id          = TransId,
           timer_ref         = ReplyRef,
           pending_timer_ref = PendingRef} = Rep,
    megaco_monitor:delete_reply(TransId),
    megaco_monitor:cancel_apply_after(ReplyRef),
    megaco_monitor:cancel_apply_after(PendingRef),    % Still running?
    megaco_config:del_pending_counter(sent, TransId), % Still existing?
    ok;

cancel_reply(_ConnData, Rep, ignore) ->
    ?report_trace(ignore, "cancel reply [ignore]", [Rep]),
    #reply{trans_id          = TransId,
           timer_ref         = ReplyRef,
           pending_timer_ref = PendingRef} = Rep,
    megaco_monitor:delete_reply(TransId),
    megaco_monitor:cancel_apply_after(ReplyRef),
    megaco_monitor:cancel_apply_after(PendingRef),    % Still running?
    megaco_config:del_pending_counter(sent, TransId), % Still existing?
    ok;

cancel_reply(_CD, _Rep, _Reason) ->
    ok.


%% request_keep_alive_timeout(ConnHandle, TransId) -> ok
%%
%% Timer callback for the request keep-alive timer. The first
%% argument of megaco_config:del_pending_counter/2 is the
%% direction of the counter - every other call in this module
%% passes recv (requests) or sent (replies) - so the request's
%% counter is addressed with recv here; this function used to pass
%% the connection handle in that position.
%% NOTE(review): the lookup result is (and was) discarded, so the
%% request record itself is not removed here - confirm whether
%% that is intended.
request_keep_alive_timeout(_ConnHandle, TransId) ->
    megaco_config:del_pending_counter(recv, TransId),
    megaco_monitor:lookup_request(TransId),
    ok.


%% request_timeout(ConnHandle, TransId)
%%
%% Callback of the request timer. Nothing is done when the request
%% is gone or when a cancel is in progress on the connection;
%% otherwise the timeout is processed by do_request_timeout/4,
%% with faked connection data if the connection is unknown.
request_timeout(ConnHandle, TransId) ->
    ?rt1(ConnHandle, "request timeout", [TransId]),
    case megaco_monitor:lookup_request(TransId) of
        [Req] when is_record(Req, request) ->
            case megaco_config:lookup_local_conn(ConnHandle) of
                [CD] when (CD#conn_data.cancel =:= true) ->
                    cancel_in_progress_ignore;

                [CD] ->
                    incNumTimerRecovery(ConnHandle),
                    do_request_timeout(ConnHandle, TransId, CD, Req);

                [] when ConnHandle#megaco_conn_handle.remote_mid =:= preliminary_mid ->
                    %% There are two possibilities:
                    %% 1) The connection has just been upgraded from a
                    %%    preliminary to a real connection. So this
                    %%    timeout is just a glitch. E.g. between the
                    %%    removal of this ConnHandle and the timer.
                    %% 2) The first message sent, the service-change,
                    %%    got no reply (UDP without three-way-handshake).
                    %%    And then the other side (MGC) sends a request,
                    %%    which causes an auto-upgrade
                    request_timeout_upgraded(ConnHandle, Req);

                [] ->
                    incNumTimerRecovery(ConnHandle),
                    do_request_timeout(ConnHandle, TransId,
                                       fake_conn_data(ConnHandle), Req)
            end;

        [] ->
            request_not_found_ignore
    end.

%% The preliminary connection is gone: cancel the request with
%% reason timeout, using faked connection data.
request_timeout_upgraded(ConnHandle, Req) ->
    cancel_request(fake_conn_data(ConnHandle), Req, timeout).

%% do_request_timeout(ConnHandle, TransId, ConnData, Req)
%%
%% When the request timer has expired for good (timeout, or
%% {_, timeout} for a restartable timer) the request is cancelled
%% with reason timeout. Otherwise the request is re-sent - unless
%% its bytes are tagged no_send - and the timer is restarted, with
%% the new timer reference, timer state and (possibly dropped)
%% bytes written back to the request record.
do_request_timeout(ConnHandle, TransId, ConnData,
                   #request{curr_timer  = CurrTimer,
                            send_handle = SendHandle,
                            version     = Version} = Req) ->

    ?rt1(ConnHandle, "process request timeout", [TransId, CurrTimer]),

    ConnData2 = ConnData#conn_data{send_handle      = SendHandle,
                                   protocol_version = Version},
    case CurrTimer of
        timeout ->
            cancel_request(ConnData2, Req, timeout),
            timeout1;

        %% Restartable timer
        %% (max_retries = infinity_restartable)
        {_, timeout} ->
            cancel_request(ConnData2, Req, timeout),
            timeout2;

        Timer ->
            {SendOrNoSend, Data} = Req#request.bytes,
            case SendOrNoSend of
                send ->
                    case maybe_encode(ConnData2, Data) of
                        {ok, Bin} ->
                            ?report_trace(ConnData2, "re-send trans request",
                                          [{bytes, Bin}]),
                            case maybe_send_message(ConnData2, true, Bin) of
                                ok ->
                                    sent1_ignore;
                                {ok, _} ->
                                    sent2_ignore;
                                {error, Reason} ->
                                    ?report_important(
                                       ConnData2,
                                       "<ERROR> re-send trans request failed",
                                       [{bytes, Bin},
                                        {error, Reason}])
                            end;

                        {error, Reason} ->
                            %% Since it was possible to encode the original
                            %% message this should really never happen...
                            ?report_important(
                               ConnData2,
                               "<ERROR> re-send trans request failed",
                               [{transaction, Req#request.bytes},
                                {error, Reason}])
                    end;
                no_send ->
                    not_sent_ok
            end,

            %% Arm the timer for the next round
            {WaitFor, Timer2} = megaco_timer:restart(Timer),
            OptBin            = opt_garb_binary(Timer2, Data),
            {Type, _}         = Req#request.timer_ref,
            Ref2 = megaco_monitor:apply_after(?MODULE, request_timeout,
                                              [ConnHandle, TransId],
                                              WaitFor),
            megaco_monitor:update_request_fields(
              TransId,
              [{#request.bytes,      {SendOrNoSend, OptBin}},
               {#request.timer_ref,  {Type, Ref2}},
               {#request.curr_timer, Timer2}]), % Timing problem
            {restarted, WaitFor, Timer2}

    end.

%% maybe_encode(ConnData, Data)
%%
%% Brings the stored request data into the form needed for a
%% re-send:
%%   * {Serial, Bin} without transaction sender (trans_req = false):
%%     the encoded transaction is wrapped in a message and encoded;
%%   * {Serial, Bin} otherwise: returned as is;
%%   * a TransactionRequest with a running transaction sender:
%%     encoded as a single transaction, giving {ok, {Serial, Bin}};
%%   * a TransactionRequest otherwise: encoded as a full message;
%%   * anything else: {error, {invalid_bin, Data}}.
maybe_encode(#conn_data{trans_req = false} = CD, {_Serial, Bin})
  when is_binary(Bin) ->
    megaco_messenger_misc:encode_body(
      CD,
      "encode trans request msg",
      {transactions, [{transactionRequest, Bin}]});
maybe_encode(_CD, {_Serial, Bin} = SerialAndBin) when is_binary(Bin) ->
    {ok, SerialAndBin};
maybe_encode(#conn_data{trans_req    = true,
                        trans_sender = Sender} = CD,
             #'TransactionRequest'{transactionId = Serial} = TR)
  when is_pid(Sender) ->
    case megaco_messenger_misc:encode_trans_request(CD, TR) of
        {ok, Bin} ->
            {ok, {Serial, Bin}};
        Error ->
            Error
    end;
maybe_encode(CD, TR)
  when is_record(TR, 'TransactionRequest') ->
    megaco_messenger_misc:encode_body(
      CD,
      "encode trans request msg",
      {transactions, [{transactionRequest, TR}]});
maybe_encode(_CD, Trash) ->
    {error, {invalid_bin, Trash}}.

%% A plain binary is a complete message and is sent directly; a
%% {Serial, Bin} pair is handed to the transaction sender.
maybe_send_message(CD, Resend, Msg) when is_binary(Msg) ->
    megaco_messenger_misc:send_message(CD, Resend, Msg);
maybe_send_message(#conn_data{trans_sender = Sender}, _Resend, {Serial, Bin})
  when is_pid(Sender) andalso is_integer(Serial) andalso is_binary(Bin) ->
    megaco_trans_sender:send_req(Sender, Serial, Bin).


%% reply_timeout(ConnHandle, TransId, Timer)
%%
%% Callback of the reply timer. An expired timer (timeout, or
%% {_, timeout}) is handled by handle_reply_timer_timeout/2.
%% Otherwise, for a reply that is waiting for an ack, the reply is
%% re-sent (when the ack is to be handled, or when the reply is
%% segmented) and the timer is restarted. While a cancel is in
%% progress on the connection the timeout is ignored; this check is
%% now done for segmented replies too, in line with the handle_ack
%% branch and with request_timeout/2.
reply_timeout(ConnHandle, TransId, timeout) ->
    handle_reply_timer_timeout(ConnHandle, TransId);

%% This means that infinity_restartable was used for max_retries.
%% There is currently no reason to use this for the reply_timeout,
%% since there is no external event to restart the timer!
reply_timeout(ConnHandle, TransId, {_, timeout}) ->
    handle_reply_timer_timeout(ConnHandle, TransId);

reply_timeout(ConnHandle, TransId, Timer) ->
    ?report_trace(ConnHandle, "reply timeout", [Timer, TransId]),

    case lookup_reply(undefined, TransId) of
        [] ->
            reply_not_found_ignore;

        {Converted,
         #reply{state      = waiting_for_ack,
                ack_action = {handle_ack, _}} = Rep} ->
            case megaco_config:lookup_local_conn(ConnHandle) of
                [CD] when (CD#conn_data.cancel =:= true) ->
                    cancel_in_progress_ignore;
                [CD] when (Converted =:= true) ->
                    incNumTimerRecovery(ConnHandle),
                    %% When we did the reply record lookup, we had no
                    %% conn_data record, and the reply record was
                    %% converted. This means that the reply record
                    %% has no valid info about user_mod or user_args.
                    %% Therefore, the user_mod and user_args of the
                    %% conn_data record is better than nothing.
                    #conn_data{user_mod  = UserMod,
                               user_args = UserArgs} = CD,
                    Rep2 = Rep#reply{user_mod  = UserMod,
                                     user_args = UserArgs},
                    do_reply_timeout(ConnHandle, TransId, CD, Timer, Rep2);
                [CD] when (Converted =:= false) ->
                    incNumTimerRecovery(ConnHandle),
                    do_reply_timeout(ConnHandle, TransId, CD, Timer, Rep);
                [] ->
                    incNumTimerRecovery(ConnHandle),
                    CD = fake_conn_data(ConnHandle),
                    do_reply_timeout(ConnHandle, TransId, CD, Timer, Rep)
            end;

        %% Segmented reply
        {Converted,
         #reply{state = waiting_for_ack,
                bytes = Sent} = Rep} when is_list(Sent) ->
            case megaco_config:lookup_local_conn(ConnHandle) of
                [ConnData] when (ConnData#conn_data.cancel =:= true) ->
                    %% Do not re-send segments or re-arm the timer
                    %% while the connection is being cancelled
                    cancel_in_progress_ignore;
                [ConnData] when (Converted =:= true) ->
                    incNumTimerRecovery(ConnHandle),
                    %% Converted reply record: no valid user_mod or
                    %% user_args in it, so take those of the
                    %% conn_data record (see above).
                    #conn_data{user_mod  = UserMod,
                               user_args = UserArgs} = ConnData,
                    Rep2 = Rep#reply{user_mod  = UserMod,
                                     user_args = UserArgs},
                    do_reply_timeout(ConnHandle, TransId, ConnData,
                                     Timer, Rep2);
                [ConnData] when (Converted =:= false) ->
                    incNumTimerRecovery(ConnHandle),
                    do_reply_timeout(ConnHandle, TransId, ConnData,
                                     Timer, Rep);
                [] ->
                    incNumTimerRecovery(ConnHandle),
                    ConnData = fake_conn_data(ConnHandle),
                    do_reply_timeout(ConnHandle, TransId, ConnData,
                                     Timer, Rep)
            end;

        %% Nothing to re-send: just restart the timer
        {_Converted,
         #reply{state = waiting_for_ack} = Rep} ->
            do_reply_timeout(ConnHandle, TransId, Timer, Rep);

        {_Converted,
         #reply{state = aborted} = Rep} ->
            do_reply_timeout(ConnHandle, TransId, Timer, Rep);

        _ ->
            ignore

    end.

%% do_reply_timeout(ConnHandle, TransId, ConnData, Timer, Rep)
%%
%% Re-sends the reply - one binary, or every entry of a segmented
%% reply - with the send handle and protocol version stored in the
%% reply record, and then restarts the reply timer through
%% do_reply_timeout/4. A failed re-send is reported but does not
%% stop the timer from being restarted.
do_reply_timeout(ConnHandle, TransId, ConnData, Timer,
                 #reply{send_handle = SendHandle,
                        version     = Version,
                        bytes       = Bytes} = Rep) when is_binary(Bytes) ->

    CD = ConnData#conn_data{send_handle      = SendHandle,
                            protocol_version = Version},

    ?rt1(CD, "re-send trans reply", [{bytes, Bytes}]),
    case megaco_messenger_misc:send_message(CD, true, Bytes) of
        {ok, _} ->
            ignore;
        {error, Reason} ->
            ?report_important(CD, "<ERROR> re-send trans reply failed",
                              [{bytes, Bytes}, {error, Reason}])
    end,
    do_reply_timeout(ConnHandle, TransId, Timer, Rep);

do_reply_timeout(ConnHandle, TransId, ConnData, Timer,
                 #reply{send_handle = SendHandle,
                        version     = Version,
                        bytes       = Sent} = Rep) when is_list(Sent) ->

    CD = ConnData#conn_data{send_handle      = SendHandle,
                            protocol_version = Version},

    %% NOTE(review): the comment on the bytes field of the reply
    %% record describes the list elements as 3-tuples
    %% {integer(), binary(), timer_ref()}, while only 2-tuples are
    %% matched here - confirm which shape is actually stored.
    lists:foreach(
      fun({SN, Bytes}) ->
              ?rt1(CD, "re-send segmented trans reply",
                   [{segment_no, SN}, {bytes, Bytes}]),
              case megaco_messenger_misc:send_message(CD, true, Bytes) of
                  {ok, _} ->
                      ignore;
                  {error, Reason} ->
                      ?report_important(
                         CD,
                         "<ERROR> re-send segmented trans reply failed",
                         [{segment_no, SN},
                          {bytes,      Bytes},
                          {error,      Reason}])
              end
      end,
      Sent),
    do_reply_timeout(ConnHandle, TransId, Timer, Rep).

%% do_reply_timeout/4
%%
%% Restarts the reply timer: a new reply_timeout/3 call is scheduled
%% through megaco_monitor:apply_after/4, and the reply record is
%% updated with the new timer reference and with the stored bytes
%% after they have been passed through opt_garb_binary/2.
%%
%% Returns {restarted, WaitFor, NewTimer}.
do_reply_timeout(ConnHandle, TransId, Timer, #reply{bytes = Bytes}) ->
    {Delay, NextTimer} = megaco_timer:restart(Timer),
    Shrink = fun(Data) -> opt_garb_binary(NextTimer, Data) end,
    Stored =
        case Bytes of
            Whole when is_binary(Whole) ->
                Shrink(Whole);
            Segments when is_list(Segments) ->
                [{SegNo, Shrink(SegBin)} || {SegNo, SegBin} <- Segments]
        end,
    TimerRef = megaco_monitor:apply_after(?MODULE, reply_timeout,
                                          [ConnHandle, TransId, NextTimer],
                                          Delay),
    %% Timing problem?
    megaco_monitor:update_reply_fields(TransId,
                                       [{#reply.bytes,     Stored},
                                        {#reply.timer_ref, TimerRef}]),
    {restarted, Delay, NextTimer}.


%% handle_reply_timer_timeout/2
%%
%% Handles the final expiry of the reply timer.
%%
%%   Reply waiting for ack: handle_ack/5 is called with {error, timeout}.
%%   Reply in another state: the pending timer and the segment timers
%%                          are cancelled, and the reply record and
%%                          its 'sent' pending counter are deleted.
%%   No reply record:       ignore_reply_removed is returned.
handle_reply_timer_timeout(ConnHandle, TransId) ->
    ?report_trace(ConnHandle, "handle reply timeout", [timeout, TransId]),
    incNumTimerRecovery(ConnHandle),
    %% OTP-4378
    case lookup_reply(undefined, TransId) of
        [] ->
            ignore_reply_removed;

        {Converted, #reply{state = waiting_for_ack} = Reply} ->
            ReplyTransId = Reply#reply.trans_id,
            Serial       = ReplyTransId#trans_id.serial,
            {AckReply, ConnData} =
                reply_timer_timeout_context(ConnHandle, Converted, Reply),
            AckConnData = ConnData#conn_data{serial = Serial},
            Ack         = #'TransactionAck'{firstAck = Serial},
            handle_ack(AckConnData, {error, timeout}, AckReply, Ack,
                       ?default_user_callback_extra);

        {_Converted, #reply{pending_timer_ref = PendingRef, % aborted?
                            bytes             = SegSent}} -> % may be a binary
            megaco_monitor:cancel_apply_after(PendingRef),
            cancel_segment_timers(SegSent),
            megaco_monitor:delete_reply(TransId),
            megaco_config:del_pending_counter(sent, TransId)
    end.

%% Selects the reply record and the conn_data record to use when
%% reporting the timeout.
%%
%% A reply record that was not converted carries its own user_mod and
%% user_args, which are copied into the (real or fake) conn_data.
%% A converted reply record has no such info; if a real conn_data
%% exists its user_mod and user_args are copied into the reply record,
%% otherwise both records are used as they are.
reply_timer_timeout_context(ConnHandle, Converted, Reply) ->
    case megaco_config:lookup_local_conn(ConnHandle) of
        [Found] when not Converted ->
            {Reply,
             Found#conn_data{user_mod  = Reply#reply.user_mod,
                             user_args = Reply#reply.user_args}};
        [Found] when Converted ->
            {Reply#reply{user_mod  = Found#conn_data.user_mod,
                         user_args = Found#conn_data.user_args},
             Found};
        [] when not Converted ->
            Fake = fake_conn_data(ConnHandle),
            {Reply,
             Fake#conn_data{user_mod  = Reply#reply.user_mod,
                            user_args = Reply#reply.user_args}};
        [] when Converted ->
            %% We have no valid info about user_mod and user_args
            {Reply, fake_conn_data(ConnHandle)}
    end.

%% segment_reply_timeout(ConnHandle, TransId, SN, timeout) ->
%%     ?report_trace(ConnHandle, "segment reply timeout", [timeout, SN, TransId]),
%%     D = fun({_, _, SegRef}) ->
%%                 megaco_monitor:cancel_apply_after(SegRef)
%%         end,
%%     incNumTimerRecovery(ConnHandle),
%%     %% OTP-4378
%%     case megaco_monitor:lookup_reply(TransId) of
%%         [#reply{state = waiting_for_ack,
%%                 bytes = Sent} = Rep] ->
%%             Serial = (Rep#reply.trans_id)#trans_id.serial,
%%             ConnData =
%%                 case megaco_config:lookup_local_conn(ConnHandle) of
%%                     [ConnData0] ->
%%                         ConnData0;
%%                     [] ->
%%                         fake_conn_data(ConnHandle)
%%                 end,
%%             ConnData2 = ConnData#conn_data{serial = Serial},
%%             T = #'TransactionAck'{firstAck = Serial},
%%             lists:foreach(D, Sent),
%%             Extra = ?default_user_callback_extra,
%%             handle_ack(ConnData2, {error, timeout}, Rep, T, Extra);
%%         [#reply{pending_timer_ref = Ref,
%%                 bytes = Sent}] -> % aborted?
%%             lists:foreach(D, Sent),
%%             megaco_monitor:cancel_apply_after(Ref),
%%             megaco_monitor:delete_reply(TransId),
%%             megaco_config:del_pending_counter(sent, TransId);

%%         [] ->
%%             ignore

%%     end.

%% segment_reply_timeout(ConnHandle, TransId, SN, Timer) ->
%%     ?report_trace(ConnHandle, "reply timeout", [Timer, SN, TransId]),

%%     %% d("reply_timeout -> entry with"
%%     %%   "~n   ConnHandle: ~p"
%%     %%   "~n   TransId:    ~p"
%%     %%   "~n   Timer:      ~p", [ConnHandle, TransId, Timer]),

%%     case megaco_monitor:lookup_reply(TransId) of
%%         [] ->
%%             ignore; % Trace ??

%%         [#reply{state = waiting_for_ack,
%%                 bytes = ack_action = {handle_ack, _}} = Rep] ->
%%             case megaco_config:lookup_local_conn(ConnHandle) of
%%                 [ConnData] ->
%%                     incNumTimerRecovery(ConnHandle),
%%                     do_reply_timeout(ConnHandle, TransId, ConnData,
%%                                      Timer, Rep);
%%                 [] ->
%%                     incNumTimerRecovery(ConnHandle),
%%                     ConnData = fake_conn_data(ConnHandle),
%%                     do_reply_timeout(ConnHandle, TransId, ConnData,
%%                                      Timer, Rep)
%%             end;

%%         [#reply{state = waiting_for_ack} = Rep] ->
%%             do_reply_timeout(ConnHandle, TransId, Timer, Rep);

%%         [#reply{state = aborted} = Rep] ->
%%             do_reply_timeout(ConnHandle, TransId, Timer, Rep);

%%         _ ->
%%             ignore

%%     end.


%% pending_timeout/3
%%
%% Entry point for an expired pending timer.
%%
%% The first clause accepts a conn_data record in place of the
%% connection handle; it catches the timers started prior to the
%% code-upgrade and simply continues with the handle of that record.
%%
%% Returns
%%   cancel_in_progress_ignore   if the connection has cancel = true,
%%   no_such_connection_ignore   if the connection is not found,
%%   otherwise the result of handle_pending_timeout/3.
pending_timeout(#conn_data{conn_handle = ConnHandle}, TransId, Timer) ->
    ?report_trace(ConnHandle, "pending timeout(1)", [Timer, TransId]),
    pending_timeout(ConnHandle, TransId, Timer);

pending_timeout(ConnHandle, TransId, Timer) ->
    ?report_trace(ConnHandle, "pending timeout(2)", [Timer, TransId]),
    case megaco_config:lookup_local_conn(ConnHandle) of
        [] ->
            no_such_connection_ignore;
        [#conn_data{cancel = true}] ->
            cancel_in_progress_ignore;
        [ConnData] ->
            Serial    = TransId#trans_id.serial,
            ConnData2 = ConnData#conn_data{serial = Serial},
            handle_pending_timeout(ConnData2, TransId, Timer)
    end.

%% handle_pending_timeout/3
%%
%% Acts on an expired pending timer, depending on the state of the
%% reply record:
%%
%%   prepare | eval_request
%%       The 'sent' pending limit is checked (and possibly
%%       incremented):
%%         ok      - a pending message is sent and the pending timer
%%                   is possibly restarted,
%%         error   - a pending limit error is sent to the other side,
%%                   the user is notified and the reply is cancelled
%%                   in state aborted; returns pending_limit_error,
%%         aborted - the limit was already passed; the reply is
%%                   cancelled in state aborted; returns
%%                   pending_limit_aborted.
%%   waiting_for_ack
%%       The reply has already been sent, so no pending message is
%%       needed; returns reply_has_been_sent.
%%   aborted
%%       A glitch, but the reply is cleaned up just the same; returns
%%       reply_aborted_state.
%%   no reply record
%%       Returns reply_not_found.
handle_pending_timeout(CD, TransId, Timer) ->
    ?report_trace(CD, "handle pending timeout", []),
    case lookup_reply(CD, TransId) of
        [] ->
            reply_not_found; % Trace ??

        {_Converted, #reply{state = waiting_for_ack}} ->
            reply_has_been_sent;

        {_Converted, #reply{state = aborted} = AbortedReply} ->
            cancel_reply(CD, AbortedReply, aborted),
            reply_aborted_state;

        {_Converted, #reply{state = State, handler = Handler} = Reply}
          when State =:= prepare; State =:= eval_request ->
            #conn_data{sent_pending_limit = Limit,
                       conn_handle        = ConnHandle} = CD,
            case check_and_maybe_incr_pending_limit(Limit, sent, TransId) of
                ok ->
                    send_pending(CD),
                    pending_timeout_continue(ConnHandle, TransId, Timer);

                error ->
                    send_pending_limit_error(CD),
                    handle_request_abort_callback(CD, TransId, Handler),
                    %% Timing problem?
                    cancel_reply(CD, Reply#reply{state = aborted}, aborted),
                    pending_limit_error;

                aborted ->
                    cancel_reply(CD, Reply#reply{state = aborted}, aborted),
                    pending_limit_aborted
            end
    end.

%% Finishes the pending timer when it has run out (timeout1 | timeout2),
%% otherwise restarts it and stores the new timer reference in the
%% reply record.
pending_timeout_continue(ConnHandle, _TransId, timeout) ->
    %% We are done
    incNumTimerRecovery(ConnHandle),
    timeout1;
pending_timeout_continue(ConnHandle, _TransId, {_, timeout}) ->
    %% We are done
    incNumTimerRecovery(ConnHandle),
    timeout2;
pending_timeout_continue(ConnHandle, TransId, Timer) ->
    {Delay, NextTimer} = megaco_timer:restart(Timer),
    PendingRef = megaco_monitor:apply_after(?MODULE, pending_timeout,
                                            [ConnHandle, TransId, NextTimer],
                                            Delay),
    %% Timing problem?
    megaco_monitor:update_reply_field(TransId,
                                      #reply.pending_timer_ref,
                                      PendingRef),
    {restarted, Delay, NextTimer}.


%% segment_timeout/3
%%
%% Timer = timeout (the segment timer has run out):
%%   If the request is still known and some segment number between 1
%%   and the highest received one is missing, a "segments not received"
%%   error message is sent and the user is given
%%   {error, {segment_timeout, Missing}} as the reply. If nothing is
%%   missing, ok is returned. If the request is not found,
%%   timeout_not_found_ignore is returned.
%%
%% Any other Timer:
%%   If the request is still known the timer is restarted and its
%%   reference is stored in the request record; returns
%%   {restarted, WaitFor, NewTimer}. Otherwise not_found_ignore.
segment_timeout(ConnHandle, TransId, timeout = Timer) ->
    ?report_trace(ConnHandle, "segment timeout", [TransId, Timer]),
    incNumTimerRecovery(ConnHandle),
    case megaco_monitor:lookup_request(TransId) of
        [] ->
            timeout_not_found_ignore;

        [#request{seg_recv     = Received,
                  user_mod     = UserMod,
                  user_args    = UserArgs,
                  reply_action = Action,
                  reply_data   = UserData}] ->
            ConnData =
                case megaco_config:lookup_local_conn(ConnHandle) of
                    [Found] ->
                        Found;
                    [] ->
                        fake_conn_data(ConnHandle)
                end,
            Highest = lists:max(Received),
            case lists:seq(1, Highest) -- Received of
                [] ->
                    %% The last segment has just arrived, ignore
                    ok;
                Missing ->
                    %% Send the error message
                    send_message_error(ConnData,
                                       ?megaco_segments_not_received,
                                       missing_to_str(Missing)),

                    %% Report to the user
                    UserConnData =
                        ConnData#conn_data{user_mod     = UserMod,
                                           user_args    = UserArgs,
                                           reply_action = Action,
                                           reply_data   = UserData},
                    return_reply(UserConnData, TransId,
                                 {error, {segment_timeout, Missing}})
            end
    end;

segment_timeout(ConnHandle, TransId, Timer) ->
    ?report_trace(ConnHandle, "segment timeout", [TransId, Timer]),
    case megaco_monitor:lookup_request_field(TransId, #request.trans_id) of
        {ok, _} ->
            {Delay, NextTimer} = megaco_timer:restart(Timer),
            SegRef = megaco_monitor:apply_after(?MODULE, segment_timeout,
                                                [ConnHandle, TransId,
                                                 NextTimer],
                                                Delay),
            %% Timing problem?
            megaco_monitor:update_request_field(TransId,
                                                #request.seg_timer_ref,
                                                SegRef),
            {restarted, Delay, NextTimer};
        _ ->
            not_found_ignore
    end.

%% segment_reply_timeout() ->
%%     ok.

%% Formats a non-empty list of integers as a comma separated string,
%% e.g. [1,3,4] -> "1,3,4".
missing_to_str(Missing) ->
    lists:flatten(missing_to_str2(Missing)).

%% Builds the (deep) character list that missing_to_str/1 flattens.
missing_to_str2([First | Rest]) ->
    [integer_to_list(First) | [[",", integer_to_list(N)] || N <- Rest]].

%% return_unexpected_trans_reply/4
%%
%% An unexpected reply tagged actionReplies or transactionError is
%% turned into a transaction reply and handed to
%% return_unexpected_trans/3. Anything else is only reported, and ok
%% is returned.
return_unexpected_trans_reply(ConnData, TransId, {Tag, _} = UserReply, Extra)
  when Tag =:= actionReplies; Tag =:= transactionError ->
    Trans = make_transaction_reply(ConnData, TransId, UserReply),
    return_unexpected_trans(ConnData, Trans, Extra);
return_unexpected_trans_reply(CD, TransId, {error, Reason}, Extra) ->
    ?report_important(CD, "unexpected trans reply with error",
                      [TransId, Reason, Extra]),
    ok;
return_unexpected_trans_reply(CD, TransId, Crap, Extra) ->
    ?report_important(CD, "unexpected trans reply with crap",
                      [TransId, Crap, Extra]),
    ok.

%% Same as return_unexpected_trans/3 with the default callback extra.
return_unexpected_trans(ConnData, Trans) ->
    return_unexpected_trans(ConnData, Trans, ?default_user_callback_extra).

%% return_unexpected_trans/3
%%
%% Calls the handle_unexpected_trans callback of the user module found
%% in the conn_data record. The transaction is first passed through
%% transform_transaction_reply_enc/2. Extra is included in the callback
%% arguments only when it differs from the default extra value.
%%
%% The callback is called inside a catch. Its (caught) result is
%% returned as is; any result other than ok also causes a warning
%% message.
return_unexpected_trans(ConnData, Trans0, Extra) ->
    UserMod    = ConnData#conn_data.user_mod,
    UserArgs   = ConnData#conn_data.user_args,
    ConnHandle = ConnData#conn_data.conn_handle,
    Version    = ConnData#conn_data.protocol_version,
    Trans      = transform_transaction_reply_enc(Version, Trans0),
    TrailingArgs =
        case Extra of
            ?default_user_callback_extra ->
                UserArgs;
            _ ->
                [Extra | UserArgs]
        end,
    Args   = [ConnHandle, Version, Trans | TrailingArgs],
    Result = (catch apply(UserMod, handle_unexpected_trans, Args)),
    ?report_debug(ConnData, "return: unexpected trans",
                  [Trans, {return, Result}]),
    if
        Result =:= ok ->
            ok;
        true ->
            warning_msg("unexpected transaction callback failed: ~w",
                        [Result])
    end,
    Result.


%%-----------------------------------------------------------------

%% Builds the transaction id made up of the remote mid of the
%% connection handle and the serial of the conn_data record.
to_remote_trans_id(#conn_data{conn_handle = ConnHandle, serial = Serial}) ->
    #trans_id{mid    = ConnHandle#megaco_conn_handle.remote_mid,
              serial = Serial}.

%% Builds the transaction id made up of the local mid of the
%% connection handle and the serial of the conn_data record.
to_local_trans_id(#conn_data{conn_handle = ConnHandle, serial = Serial}) ->
    #trans_id{mid    = ConnHandle#megaco_conn_handle.local_mid,
              serial = Serial}.

%% to_local_trans_id/2
%%
%% Builds transaction id(s) using the local mid of the connection.
%%
%%   conn_data + list starting with an integer
%%       -> one transaction id per serial in the list
%%   conn_data + list starting with {transactionRequest, TR}
%%       -> one transaction id per transaction request in the list
%%          (elements of any other shape are skipped)
%%   conn_data or connection handle + integer serial
%%       -> a single transaction id
to_local_trans_id(#conn_data{conn_handle = ConnHandle}, [First | _] = Serials)
  when is_integer(First) ->
    LocalMid = ConnHandle#megaco_conn_handle.local_mid,
    lists:map(fun(Serial) ->
                      #trans_id{mid = LocalMid, serial = Serial}
              end,
              Serials);
to_local_trans_id(#conn_data{conn_handle = ConnHandle},
                  [{transactionRequest, #'TransactionRequest'{}} | _] = Reqs) ->
    LocalMid = ConnHandle#megaco_conn_handle.local_mid,
    [#trans_id{mid = LocalMid, serial = Id} ||
        {transactionRequest,
         #'TransactionRequest'{transactionId = Id}} <- Reqs];

to_local_trans_id(#conn_data{conn_handle = ConnHandle}, Serial)
  when is_integer(Serial) ->
    #trans_id{mid    = ConnHandle#megaco_conn_handle.local_mid,
              serial = Serial};
to_local_trans_id(#megaco_conn_handle{local_mid = LocalMid}, Serial)
  when is_integer(Serial) ->
    #trans_id{mid = LocalMid, serial = Serial}.


%%-----------------------------------------------------------------

%% transform_transaction_reply_dec/1
%%
%% Converts a 'TransactionReply' tuple, in either its 4 element or its
%% 6 element (with segment info) form, into a #megaco_transaction_reply
%% record. Fields not present in the tuple keep the record defaults.
transform_transaction_reply_dec({'TransactionReply',
                                 Id, ImmAckReq, Result,
                                 SegmentNo, SegmentsComplete}) ->
    #megaco_transaction_reply{transactionId        = Id,
                              immAckRequired       = ImmAckReq,
                              transactionResult    = Result,
                              segmentNumber        = SegmentNo,
                              segmentationComplete = SegmentsComplete};
transform_transaction_reply_dec({'TransactionReply',
                                 Id, ImmAckReq, Result}) ->
    #megaco_transaction_reply{transactionId     = Id,
                              immAckRequired    = ImmAckReq,
                              transactionResult = Result}.

%% transform_transaction_reply_enc/2
%%
%% Converts a #megaco_transaction_reply record into the
%% 'TransactionReply' tuple of the given protocol version:
%%
%%   version 3   -> 6 element tuple (with segment info)
%%   version < 3 -> 4 element tuple
%%
%% Any other combination of arguments is returned unchanged.
transform_transaction_reply_enc(Version, Reply) ->
    case {Version, Reply} of
        {3, #megaco_transaction_reply{transactionId        = Id,
                                      immAckRequired       = ImmAckReq,
                                      transactionResult    = Result,
                                      segmentNumber        = SegmentNo,
                                      segmentationComplete = Complete}} ->
            {'TransactionReply', Id, ImmAckReq, Result, SegmentNo, Complete};
        {Older, #megaco_transaction_reply{transactionId     = Id,
                                          immAckRequired    = ImmAckReq,
                                          transactionResult = Result}}
          when Older < 3 ->
            {'TransactionReply', Id, ImmAckReq, Result};
        _ ->
            Reply
    end.

%% make_transaction_reply/3
%%
%% Builds a 'TransactionReply' tuple, without immediate ack request,
%% for the protocol version of the conn_data record.
make_transaction_reply(#conn_data{protocol_version = Version},
                       TransId, TransRes) ->
    make_transaction_reply(Version, TransId, asn1_NOVALUE, TransRes).

%% make_transaction_reply(#conn_data{protocol_version = Version},
%%                        TransId, IAR, TransRes) ->
%%     make_transaction_reply(Version, TransId, IAR, TransRes);

%% make_transaction_reply/4
%%
%% Version 3 gives the 6 element tuple, with both segment fields set
%% to asn1_NOVALUE. Every other version gives the 4 element tuple.
make_transaction_reply(Version, TransId, IAR, TransRes) ->
    case Version of
        3 ->
            {'TransactionReply', TransId, IAR, TransRes,
             asn1_NOVALUE, asn1_NOVALUE};
        _ ->
            {'TransactionReply', TransId, IAR, TransRes}
    end.


%%-----------------------------------------------------------------

%% This function is used as a wrapper for reply-record lookups.
%% The intention is that during upgrade, this function
%% can perform on-the-fly conversions of reply-records.
%% lookup_reply/2
%%
%% Looks up the reply record of TransId.
%%
%% Returns
%%   {false, Reply}  a current reply record was found,
%%   {true,  Reply}  an old (pre-3.13.1) reply record was found and has
%%                   been converted to the current record. user_mod and
%%                   user_args are taken from CD when it is a conn_data
%%                   record; otherwise they are left at the record
%%                   defaults, since ConnData is not known here.
%% Any other lookup result (e.g. []) is returned unchanged.
lookup_reply(CD, TransId) ->
    case megaco_monitor:lookup_reply(TransId) of
        [#reply{} = Current] ->
            {false, Current};

        %% Old (pre-3.13.1) version of the record => Convert to new version
        [{reply, TransId,
          LocalMid, State, PendingTmrRef, Handler, TimerRef,
          Version, Bytes, AckAction, SendHandle, Segments}] ->
            Converted = #reply{trans_id          = TransId,
                               local_mid         = LocalMid,
                               state             = State,
                               pending_timer_ref = PendingTmrRef,
                               handler           = Handler,
                               timer_ref         = TimerRef,
                               version           = Version,
                               bytes             = Bytes,
                               ack_action        = AckAction,
                               send_handle       = SendHandle,
                               segments          = Segments},
            case CD of
                #conn_data{user_mod = UserMod, user_args = UserArgs} ->
                    {true, Converted#reply{user_mod  = UserMod,
                                           user_args = UserArgs}};
                _ ->
                    %% ConnData is not known here, so ignore for now
                    {true, Converted}
            end;

        Other ->
            Other
    end.


%%-----------------------------------------------------------------

%%-----------------------------------------------------------------
%% info_msg(F, A) ->
%%     ?megaco_info(F, A).

%% Issues a warning message through the ?megaco_warning macro.
warning_msg(Format, Args) ->
    ?megaco_warning(Format, Args).

%% Issues an error message through the ?megaco_error macro.
error_msg(Format, Args) ->
    ?megaco_error(Format, Args).


%%-----------------------------------------------------------------

%% d(F) ->
%%     d(F,[]).
%%
%% d(F,A) ->
%%     d(true,F,A).
%%     %% d(get(dbg),F,A).
%%
%% d(true,F,A) ->
%%     io:format("*** [~s] ~p:~p ***"
%%               "~n   " ++ F ++ "~n",
%%               [format_timestamp(now()), self(),?MODULE|A]);
%% d(_, _, _) ->
%%     ok.
%%
%% format_timestamp({_N1, _N2, N3} = Now) ->
%%     {Date, Time}   = calendar:now_to_datetime(Now),
%%     {YYYY,MM,DD}   = Date,
%%     {Hour,Min,Sec} = Time,
%%     FormatDate =
%%         io_lib:format("~.4w:~.2.0w:~.2.0w ~.2.0w:~.2.0w:~.2.0w 4~w",
%%                       [YYYY,MM,DD,Hour,Min,Sec,round(N3/1000)]),
%%     lists:flatten(FormatDate).

%% Time in milli seconds
%% (the os:timestamp/0 value converted from
%% {MegaSecs, Secs, MicroSecs} to one integer)
t() ->
    {A,B,C} = os:timestamp(),
    A*1000000000+B*1000+(C div 1000).


%%-----------------------------------------------------------------
%% Func: incNumErrors/0, incNumErrors/1, incNumTimerRecovery/1
%% Description: SNMP counter increment functions
%%              Each of them returns the value of the counter
%%              after the increment.
%%-----------------------------------------------------------------

%% Increments the global medGwyGatewayNumErrors counter.
incNumErrors() ->
    incNum(medGwyGatewayNumErrors).

%% Increments the medGwyGatewayNumErrors counter of connection CH.
incNumErrors(CH) ->
    incNum({CH, medGwyGatewayNumErrors}).

%% Increments the medGwyGatewayNumTimerRecovery counter of connection CH.
incNumTimerRecovery(CH) ->
    incNum({CH, medGwyGatewayNumTimerRecovery}).

%% Increments the counter Cnt in the megaco_stats table by one and
%% returns the new counter value. A counter that does not exist yet
%% is created with the value 0 before it is incremented.
%%
%% The default object makes creation and increment one atomic ets
%% operation. A failed update followed by a separate ets:insert/2
%% could lose increments, or reset a counter, when several processes
%% bump the same new counter at the same time. It also returned true
%% instead of the counter value the first time a counter was used.
incNum(Cnt) ->
    ets:update_counter(megaco_stats, Cnt, 1, {Cnt, 0}).

