1%% 2%% %CopyrightBegin% 3%% 4%% Copyright Ericsson AB 2000-2018. All Rights Reserved. 5%% 6%% Licensed under the Apache License, Version 2.0 (the "License"); 7%% you may not use this file except in compliance with the License. 8%% You may obtain a copy of the License at 9%% 10%% http://www.apache.org/licenses/LICENSE-2.0 11%% 12%% Unless required by applicable law or agreed to in writing, software 13%% distributed under the License is distributed on an "AS IS" BASIS, 14%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15%% See the License for the specific language governing permissions and 16%% limitations under the License. 17%% 18%% %CopyrightEnd% 19%% 20%%---------------------------------------------------------------------- 21%% Purpose: Collect trace events and provide a backing storage 22%% appropriate for iteration 23%%---------------------------------------------------------------------- 24 25-module(et_collector). 26 27-behaviour(gen_server). 28 29%% External exports 30-export([start_link/1, 31 stop/1, 32 33 report/2, 34 report_event/5, 35 report_event/6, 36 37 iterate/3, 38 iterate/5, 39 lookup/2, 40 41 start_trace_client/3, 42 start_trace_port/1, 43 save_event_file/3, 44 clear_table/1, 45 46 get_global_pid/0, 47 get_table_size/1, 48 change_pattern/2, 49 make_key/2, 50 51 dict_insert/3, 52 dict_delete/2, 53 dict_lookup/2, 54 dict_match/2, 55 multicast/2]). 56 57%% Internal export 58-export([monitor_trace_port/2]). 59 60%% gen_server callbacks 61-export([init/1,terminate/2, code_change/3, 62 handle_call/3, handle_cast/2, handle_info/2]). 63 64-compile([{nowarn_deprecated_function,[{erlang,now,0}]}]). 65 66-include("et_internal.hrl"). 67-include("../include/et.hrl"). 68 69-record(state, {parent_pid, 70 auto_shutdown, % Optionally shutdown when the last subscriber dies 71 event_tab_size, 72 event_tab, 73 dict_tab, 74 event_order, 75 subscribers, 76 file, 77 trace_pattern, 78 trace_port, 79 trace_max_queue, 80 trace_nodes, 81 trace_global}). 82 83-record(file, {name, desc, event_opt, file_opt, table_opt}). 84 85-record(table_handle, {collector_pid, event_tab, event_order, filter}). 86 87-record(trace_ts, {trace_ts, event_ts}). 88-record(event_ts, {event_ts, trace_ts}). 89 90%%%---------------------------------------------------------------------- 91%%% Client side 92%%%---------------------------------------------------------------------- 93 94%%---------------------------------------------------------------------- 95%% start_link(Options) -> {ok, CollectorPid} | {error, Reason} 96%% 97%% Start a collector process 98%% 99%% The collector collects trace events and keeps them ordered by their 100%% timestamp. The timestamp may either reflect the time when the 101%% actual trace data was generated (trace_ts) or when the trace data 102%% was transformed into an event record (event_ts). If the time stamp 103%% is missing in the trace data (missing timestamp option to 104%% erlang:trace/4) the trace_ts will be set to the event_ts. 105%% 106%% Events are reported to the collector directly with the report 107%% function or indirectly via one or more trace clients. All reported 108%% events are first filtered thru the collector filter before they are 109%% stored by the collector. By replacing the default collector filter 110%% with a customized dito it is possible to allow any trace data as 111%% input. The collector filter is a dictionary entry with the 112%% predefined key {filter, all} and the value is a fun of 113%% arity 1. See et_selector:parse_event/2 for interface details, 114%% such as which erlang:trace/1 tuples that are accepted. 115%% 116%% The collector has a built-in dictionary service. Any term may be 117%% stored as value in the dictionary and bound to a unique key. When 118%% new values are inserted with an existing key, the new values will 119%% overwrite the existing ones. Processes may subscribe on dictionary 120%% updates by using {subscriber, pid()} as dictionary key. All 121%% dictionary updates will be propagated to the subscriber processes 122%% matching the pattern {{subscriber, '_'}, '_'} where the first '_' 123%% is interpreted as a pid(). 124%% 125%% In global trace mode, the collector will automatically start 126%% tracing on all connected Erlang nodes. When a node connects, a port 127%% tracer will be started on that node and a corresponding trace 128%% client on the collector node. By default the global trace pattern 129%% is 'max'. 130%% 131%% Options = [option()] 132%% 133%% option() = 134%% {parent_pid, pid()} | 135%% {event_order, event_order()} | 136%% {dict_insert, {filter, all}, collector_fun()} | 137%% {dict_insert, {filter, event_filter_name()}, event_filter_fun()} | 138%% {dict_insert, {subscriber, pid()}, dict_val()} | 139%% {dict_insert, dict_key(), dict_val()} | 140%% {dict_delete, dict_key()} | 141%% {trace_client, trace_client()} | 142%% {trace_global, boolean()} | 143%% {trace_pattern, trace_pattern()} | 144%% {trace_port, integer()} | 145%% {trace_max_queue, integer()} 146%% 147%% event_order() = trace_ts | event_ts 148%% trace_pattern() = detail_level() | dbg_match_spec() 149%% detail_level() = min | max | integer(X) when X >= 0, X =< 100 150%% trace_client() = 151%% {event_file, file_name()} | 152%% {dbg_trace_type(), dbg_trace_parameters()} 153%% file_name() = string() 154%% collector_fun() = trace_filter_fun() | event_filter_fun() 155%% trace_filter_fun() = fun(TraceData) -> false | true | {true, NewEvent} 156%% event_filter_fun() = fun(Event) -> false | true | {true, NewEvent} 157%% event_filter_name() = atom() 158%% TraceData = erlang_trace_data() 159%% Event = NewEvent = record(event) 160%% dict_key() = term() 161%% dict_val() = term() 162%% 163%% CollectorPid = pid() 164%% Reason = term() 165%%---------------------------------------------------------------------- 166 167start_link(Options) -> 168 case parse_opt(Options, default_state(), [], []) of 169 {ok, S, Dict2, Clients} -> 170 Res = 171 case S#state.trace_global of 172 false -> 173 gen_server:start_link(?MODULE, [S, Dict2], []); 174 true -> 175 gen_server:start_link({global, ?MODULE}, ?MODULE, [S, Dict2], []) 176 end, 177 case Res of 178 {ok, Pid} when S#state.parent_pid =/= self() -> 179 unlink(Pid), 180 start_clients(Pid, Clients); 181 {ok,Pid} -> 182 start_clients(Pid, Clients); 183 {error, Reason} -> 184 {error, Reason} 185 end; 186 {error, Reason} -> 187 {error, Reason} 188 end. 189 190default_state() -> 191 #state{parent_pid = self(), 192 auto_shutdown = false, 193 event_order = trace_ts, 194 subscribers = [], 195 trace_global = false, 196 trace_pattern = undefined, 197 trace_nodes = [], 198 trace_port = 4711, 199 trace_max_queue = 50}. 200 201parse_opt([], S, Dict, Clients) -> 202 {Mod, Pattern} = et_selector:make_pattern(S#state.trace_pattern), 203 Fun = fun(E) -> et_selector:parse_event(Mod, E) end, 204 Default = {dict_insert, {filter, ?DEFAULT_FILTER_NAME}, Fun}, 205 {ok, S#state{trace_pattern = {Mod, Pattern}}, [Default | Dict], Clients}; 206parse_opt([H | T], S, Dict, Clients) -> 207 case H of 208 {parent_pid, Parent} when Parent =:= undefined -> 209 parse_opt(T, S#state{parent_pid = Parent}, Dict, Clients); 210 {parent_pid, Parent} when is_pid(Parent) -> 211 parse_opt(T, S#state{parent_pid = Parent}, Dict, Clients); 212 {auto_shutdown, Bool} when Bool =:= true; Bool =:= false -> 213 parse_opt(T, S#state{auto_shutdown = Bool}, Dict, Clients); 214 {event_order, Order} when Order =:= trace_ts -> 215 parse_opt(T, S#state{event_order = Order}, Dict, Clients); 216 {event_order, Order} when Order =:= event_ts -> 217 parse_opt(T, S#state{event_order = Order}, Dict, Clients); 218 {dict_insert, {filter, Name}, Fun} -> 219 if 220 is_atom(Name), is_function(Fun) -> 221 parse_opt(T, S, Dict ++ [H], Clients); 222 true -> 223 {error, {bad_option, H}} 224 end; 225 {dict_insert, {subscriber, Pid}, _Val} -> 226 if 227 is_pid(Pid) -> 228 parse_opt(T, S, Dict ++ [H], Clients); 229 true -> 230 {error, {bad_option, H}} 231 end; 232 {dict_insert, _Key, _Val} -> 233 parse_opt(T, S, Dict ++ [H], Clients); 234 {dict_delete, _Key} -> 235 parse_opt(T, S, Dict ++ [H], Clients); 236 {trace_client, Client = {_, _}} -> 237 parse_opt(T, S, Dict, Clients ++ [Client]); 238 {trace_global, Bool} when Bool =:= false -> 239 parse_opt(T, S#state{trace_global = Bool}, Dict, Clients); 240 {trace_global, Bool} when Bool =:= true -> 241 parse_opt(T, S#state{trace_global = Bool}, Dict, Clients); 242 {trace_pattern, {Mod, _} = Pattern} when is_atom(Mod) -> 243 parse_opt(T, S#state{trace_pattern = Pattern}, Dict, Clients); 244 {trace_pattern, undefined = Pattern} -> 245 parse_opt(T, S#state{trace_pattern = Pattern}, Dict, Clients); 246 {trace_port, Port} when is_integer(Port) -> 247 parse_opt(T, S#state{trace_port = Port}, Dict, Clients); 248 {trace_max_queue, MaxQueue} when is_integer(MaxQueue) -> 249 parse_opt(T, S#state{trace_port = MaxQueue}, Dict, Clients); 250 Bad -> 251 {error, {bad_option, Bad}} 252 end; 253parse_opt(BadList, _S, _Dict, _Clients) -> 254 {error, {bad_option_list, BadList}}. 255 256start_clients(CollectorPid, [{Type, Parameters} | T]) -> 257 _ = start_trace_client(CollectorPid, Type, Parameters), 258 start_clients(CollectorPid, T); 259start_clients(CollectorPid, []) -> 260 {ok, CollectorPid}. 261 262%%---------------------------------------------------------------------- 263%% stop(CollectorPid) -> ok 264%% 265%% Stop a collector process 266%% 267%% CollectorPid = pid() 268%%---------------------------------------------------------------------- 269 270stop(CollectorPid) -> 271 call(CollectorPid, stop). 272 273%%---------------------------------------------------------------------- 274%% save_event_file(CollectorPid, FileName, Options) -> ok | {error, Reason} 275%% 276%% Saves the events to a file 277%% 278%% CollectorPid = pid() 279%% FileName = string() 280%% Options = [option()] 281%% Reason = term() 282%% 283%% option() = event_option() | file_option() | table_option() 284%% event_option() = existing 285%% file_option() = write | append 286%% table_option() = keep | clear 287%% 288%% By default the currently stored events (existing) are 289%% written to a brand new file (write) and the events are 290%% kept (keep) after they have been written to the file. 291%% 292%% Instead of keeping the events after writing them to file, 293%% it is possible to remove all stored events after they 294%% have successfully written to file (clear). 295%% 296%% The options defaults to existing, write and keep. 297%%---------------------------------------------------------------------- 298 299save_event_file(CollectorPid, FileName, Options) -> 300 call(CollectorPid, {save_event_file, FileName, Options}). 301 302%%---------------------------------------------------------------------- 303%% load_event_file(CollectorPid, FileName) ->{ok, BadBytes} | exit(Reason) 304%% 305%% Load the event table from a file 306%% 307%% CollectorPid = pid() 308%% FileName = string() 309%% BadBytes = integer(X) where X >= 0 310%% Reason = term() 311%%---------------------------------------------------------------------- 312 313load_event_file(CollectorPid, FileName) -> 314 Fd = make_ref(), 315 Args = [{file, FileName}, {name, Fd}, {repair, true}, {mode, read_only}], 316 Fun = fun(Event, {ok, TH}) -> report(TH, Event) end, 317 case disk_log:open(Args) of 318 {ok, _} -> 319 do_load_event_file(Fun, Fd, start, {ok, CollectorPid}, FileName, 0); 320 {repaired, _, _, BadBytes} -> 321 do_load_event_file(Fun, Fd, start, {ok, CollectorPid}, FileName, BadBytes); 322 {error, Reason} -> 323 exit({disk_log_open, FileName, Reason}) 324 end. 325 326do_load_event_file(Fun, Fd, Cont, Acc, FileName, BadBytes) -> 327 case disk_log:chunk(Fd, Cont) of 328 eof -> 329 {ok, BadBytes}; 330 {error, Reason} -> 331 exit({bad_disk_log_chunk, FileName, Reason}); 332 {Cont2, Events} -> 333 Acc2 = lists:foldl(Fun, Acc, Events), 334 do_load_event_file(Fun, Fd, Cont2, Acc2, FileName, BadBytes); 335 {Cont2, Events, More} -> 336 Acc2 = lists:foldl(Fun, Acc, Events), 337 do_load_event_file(Fun, Fd, Cont2, Acc2, FileName, BadBytes + More) 338 end. 339 340%%---------------------------------------------------------------------- 341%% report(Handle, TraceOrEvent) 342%% 343%% Report an event to the collector 344%% 345%% All events are filtered thru the collector filter, which 346%% optionally may transform or discard the event. The first 347%% call should use the pid of the collector process as 348%% report handle, while subsequent calls should use the 349%% table handle. 350%% 351%% Handle = Initial | Continuation 352%% Initial = collector_pid() 353%% collector_pid() = pid() 354%% Continuation = record(table_handle) 355%% 356%% TraceOrEvent = record(event) | dbg_trace_tuple() | end_of_trace 357%% Reason = term() 358%% 359%% Returns: {ok, Continuation} | exit(Reason) 360%%---------------------------------------------------------------------- 361 362report(CollectorPid, TraceOrEvent) when is_pid(CollectorPid) -> 363 case get_table_handle(CollectorPid) of 364 {ok, TH} when is_record(TH, table_handle) -> 365 report(TH, TraceOrEvent); 366 {error, Reason} -> 367 exit(Reason) 368 end; 369report(TH, TraceOrEvent) when is_record(TH, table_handle) -> 370 Fun = TH#table_handle.filter, 371 case Fun(TraceOrEvent) of 372 false -> 373 {ok, TH}; 374 true when is_record(TraceOrEvent, event) -> 375 Key = make_key(TH, TraceOrEvent), 376 case catch ets:insert(TH#table_handle.event_tab, {Key, TraceOrEvent}) of 377 true -> 378 {ok, TH}; 379 {'EXIT', _Reason} -> 380 %% Refresh the report handle and try again 381 report(TH#table_handle.collector_pid, TraceOrEvent) 382 end; 383 {true, Event} when is_record(Event, event) -> 384 Key = make_key(TH, Event), 385 case catch ets:insert(TH#table_handle.event_tab, {Key, Event}) of 386 true -> 387 {ok, TH}; 388 {'EXIT', _Reason} -> 389 %% Refresh the report handle and try again 390 report(TH#table_handle.collector_pid, TraceOrEvent) 391 end; 392 BadEvent -> 393 TS = erlang:now(), 394 Contents = [{trace, TraceOrEvent}, {reason, BadEvent}, {filter, Fun}], 395 Event = #event{detail_level = 0, 396 trace_ts = TS, 397 event_ts = TS, 398 from = bad_filter, 399 to = bad_filter, 400 label = bad_filter, 401 contents = Contents}, 402 Key = make_key(TH, Event), 403 case catch ets:insert(TH#table_handle.event_tab, {Key, Event}) of 404 true -> 405 {ok, TH}; 406 {'EXIT', _Reason} -> 407 %% Refresh the report handle and try again 408 report(TH#table_handle.collector_pid, TraceOrEvent) 409 end 410 end; 411report(_, Bad) -> 412 exit({bad_event, Bad}). 413 414report_event(CollectorPid, DetailLevel, FromTo, Label, Contents) -> 415 report_event(CollectorPid, DetailLevel, FromTo, FromTo, Label, Contents). 416 417report_event(CollectorPid, DetailLevel, From, To, Label, Contents) 418 when is_integer(DetailLevel), 419 DetailLevel >= ?detail_level_min, 420 DetailLevel =< ?detail_level_max -> 421 TS= erlang:now(), 422 E = #event{detail_level = DetailLevel, 423 trace_ts = TS, 424 event_ts = TS, 425 from = From, 426 to = To, 427 label = Label, 428 contents = Contents}, 429 report(CollectorPid, E). 430 431%%---------------------------------------------------------------------- 432%% make_key(Type, Stuff) -> Key 433%% 434%% Makes a key out of an event record or an old key 435%% 436%% Type = record(table_handle) | trace_ts | event_ts 437%% Stuff = record(event) | Key 438%% Key = record(event_ts) | record(trace_ts) 439%%---------------------------------------------------------------------- 440 441make_key(TH, Stuff) when is_record(TH, table_handle) -> 442 make_key(TH#table_handle.event_order, Stuff); 443make_key(trace_ts, Stuff) -> 444 if 445 is_record(Stuff, event) -> 446 #event{trace_ts = R, event_ts = P} = Stuff, 447 #trace_ts{trace_ts = R, event_ts = P}; 448 is_record(Stuff, trace_ts) -> 449 Stuff; 450 is_record(Stuff, event_ts) -> 451 #event_ts{trace_ts = R, event_ts = P} = Stuff, 452 #trace_ts{trace_ts = R, event_ts = P} 453 end; 454make_key(event_ts, Stuff) -> 455 if 456 is_record(Stuff, event) -> 457 #event{trace_ts = R, event_ts = P} = Stuff, 458 #event_ts{trace_ts = R, event_ts = P}; 459 is_record(Stuff, event_ts) -> 460 Stuff; 461 is_record(Stuff, trace_ts) -> 462 #trace_ts{trace_ts = R, event_ts = P} = Stuff, 463 #event_ts{trace_ts = R, event_ts = P} 464 end. 465 466%%---------------------------------------------------------------------- 467%%---------------------------------------------------------------------- 468 469get_table_size(CollectorPid) when is_pid(CollectorPid) -> 470 call(CollectorPid, get_table_size). 471 472%%---------------------------------------------------------------------- 473%% get_table_handle(CollectorPid) -> Handle 474%% 475%% Return a table handle 476%% 477%% CollectorPid = pid() 478%% Handle = record(table_handle) 479%%---------------------------------------------------------------------- 480 481get_table_handle(CollectorPid) when is_pid(CollectorPid) -> 482 call(CollectorPid, get_table_handle). 483 484%%---------------------------------------------------------------------- 485%% get_global_pid() -> CollectorPid | exit(Reason) 486%% 487%% Return a the identity of the globally registered collector 488%% if there is any 489%% 490%% CollectorPid = pid() 491%% Reason = term() 492%%---------------------------------------------------------------------- 493 494get_global_pid() -> 495 case global:whereis_name(?MODULE) of 496 CollectorPid when is_pid(CollectorPid) -> 497 CollectorPid; 498 undefined -> 499 exit(global_collector_not_started) 500 end. 501 502%%---------------------------------------------------------------------- 503%% change_pattern(CollectorPid, RawPattern) -> {old_pattern, TracePattern} 504%% 505%% Change active trace pattern globally on all trace nodes 506%% 507%% CollectorPid = pid() 508%% RawPattern = {report_module(), extended_dbg_match_spec()} 509%% report_module() = atom() | undefined 510%% extended_dbg_match_spec() = detail_level() | dbg_match_spec() 511%% RawPattern = detail_level() 512%% detail_level() = min | max | integer(X) when X =< 0, X >= 100 513%% TracePattern = {report_module(), dbg_match_spec_match_spec()} 514%%---------------------------------------------------------------------- 515 516change_pattern(CollectorPid, RawPattern) -> 517 Pattern = et_selector:make_pattern(RawPattern), 518 call(CollectorPid, {change_pattern, Pattern}). 519 520%%---------------------------------------------------------------------- 521%% dict_insert(CollectorPid, {filter, all}, FilterFun) -> ok 522%% dict_insert(CollectorPid, {subscriber, SubscriberPid}, Void) -> ok 523%% dict_insert(CollectorPid, Key, Val) -> ok 524%% 525%% Insert a dictionary entry 526%% and send a {et, {dict_insert, Key, Val}} tuple 527%% to all registered subscribers. 528%% 529%% If the entry is a new subscriber, it will imply that 530%% the new subscriber process first will get one message 531%% for each already stored dictionary entry, before it 532%% and all old subscribers will get this particular entry. 533%% The collector process links to and then supervises the 534%% subscriber process. If the subscriber process dies it 535%% will imply that it gets unregistered as with a normal 536%% dict_delete/2. 537%% 538%% CollectorPid = pid() 539%% FilterFun = filter_fun() 540%% SubscriberPid = pid() 541%% Void = term() 542%% Key = term() 543%% Val = term() 544%%---------------------------------------------------------------------- 545 546dict_insert(CollectorPid, Key = {filter, Name}, Fun) -> 547 if 548 is_atom(Name), is_function(Fun) -> 549 call(CollectorPid, {dict_insert, Key, Fun}); 550 true -> 551 exit({badarg, Key}) 552 end; 553dict_insert(CollectorPid, Key = {subscriber, Pid}, Val) -> 554 if 555 is_pid(Pid) -> 556 call(CollectorPid, {dict_insert, Key, Val}); 557 true -> 558 exit({badarg, Key}) 559 end; 560dict_insert(CollectorPid, Key, Val) -> 561 call(CollectorPid, {dict_insert, Key, Val}). 562 563%%---------------------------------------------------------------------- 564%% dict_lookup(CollectorPid, Key) -> [Val] 565%% 566%% Lookup a dictionary entry and return zero or one value 567%% 568%% CollectorPid = pid() 569%% Key = term() 570%% Val = term() 571%%---------------------------------------------------------------------- 572 573dict_lookup(CollectorPid, Key) -> 574 call(CollectorPid, {dict_lookup, Key}). 575 576%%---------------------------------------------------------------------- 577%% Ddict_delete(CollectorPid, Key) -> ok 578%% 579%% elete a dictionary entry 580%% and send a {et, {dict_delete, Key}} tuple 581%% to all registered subscribers. 582%% 583%% If the deleted entry is a registered subscriber, it will 584%% imply that the subscriber process gets is unregistered as 585%% subscriber as well as it gets it final message. 586%% 587%% dict_delete(CollectorPid, {subscriber, SubscriberPid}) 588%% dict_delete(CollectorPid, Key) 589%% 590%% CollectorPid = pid() 591%% SubscriberPid = pid() 592%% Key = term() 593%%---------------------------------------------------------------------- 594 595dict_delete(CollectorPid, Key) -> 596 call(CollectorPid, {dict_delete, Key}). 597 598%%---------------------------------------------------------------------- 599%% dict_match(CollectorPid, Pattern) -> [Match] 600%% 601%% Match some dictionary entries 602%% 603%% CollectorPid = pid() 604%% Pattern = '_' | {key_pattern(), val_pattern()} 605%% key_pattern() = ets_match_object_pattern() 606%% val_pattern() = ets_match_object_pattern() 607%% Match = {key(), val()} 608%% key() = term() 609%% val() = term() 610%%---------------------------------------------------------------------- 611 612dict_match(CollectorPid, Pattern) -> 613 call(CollectorPid, {dict_match, Pattern}). 614 615%%---------------------------------------------------------------------- 616%% multicast(_CollectorPid, Msg) -> ok 617%% 618%% Sends a message to all registered subscribers 619%% 620%% CollectorPid = pid() 621%% Msg = term() 622%%---------------------------------------------------------------------- 623 624multicast(_CollectorPid, Msg = {dict_insert, _Key, _Val}) -> 625 exit({badarg, Msg}); 626multicast(_CollectorPid, Msg = {dict_delete, _Key}) -> 627 exit({badarg, Msg}); 628multicast(CollectorPid, Msg) -> 629 call(CollectorPid, {multicast, Msg}). 630 631%%---------------------------------------------------------------------- 632%% start_trace_client(CollectorPid, Type, Parameters) -> 633%% file_loaded | {trace_client_pid, pid()} | exit(Reason) 634%% 635%% Load raw Erlang trace from a file, port or process. 636%% 637%% Type = dbg_trace_client_type() 638%% Parameters = dbg_trace_client_parameters() 639%% Pid = dbg_trace_client_pid() 640%%---------------------------------------------------------------------- 641 642start_trace_client(CollectorPid, Type, FileName) when Type =:= event_file -> 643 load_event_file(CollectorPid, FileName); 644start_trace_client(CollectorPid, Type, FileName) when Type =:= file -> 645 WaitFor = {make_ref(), end_of_trace}, 646 EventFun = fun(E, {ReplyTo, {ok, TH}}) -> {ReplyTo, report(TH, E)} end, 647 EndFun = fun({ReplyTo, {ok, _TH}}) -> ReplyTo ! WaitFor, ReplyTo end, 648 Spec = trace_spec_wrapper(EventFun, EndFun, {self(), {ok, CollectorPid}}), 649 Pid = dbg:trace_client(Type, FileName, Spec), 650 unlink(Pid), 651 Ref = erlang:monitor(process, Pid), 652 receive 653 WaitFor -> 654 erlang:demonitor(Ref, [flush]), 655 file_loaded; 656 {'DOWN', Ref, _, _, Reason} -> 657 exit(Reason) 658 end; 659start_trace_client(CollectorPid, Type, Parameters) -> 660 EventFun = fun(Event, {ok, TH}) -> report(TH, Event) end, 661 EndFun = fun(Acc) -> Acc end, 662 Spec = trace_spec_wrapper(EventFun, EndFun, {ok, CollectorPid}), 663 Pid = dbg:trace_client(Type, Parameters, Spec), 664 CollectorPid ! {register_trace_client, Pid}, 665 unlink(Pid), 666 {trace_client_pid, Pid}. 667 668trace_spec_wrapper(EventFun, EndFun, EventInitialAcc) 669 when is_function(EventFun), is_function(EndFun) -> 670 {fun(Trace, Acc) -> 671 case Trace =:= end_of_trace of 672 true -> EndFun(Acc); 673 false -> EventFun(Trace, Acc) 674 end 675 end, 676 EventInitialAcc}. 677 678start_trace_port(Parameters) -> 679 dbg:tracer(port, dbg:trace_port(ip, Parameters)). 680 681monitor_trace_port(CollectorPid, Parameters) -> 682 Res = start_trace_port(Parameters), 683 spawn(fun() -> 684 MonitorRef = erlang:monitor(process, CollectorPid), 685 receive 686 {'DOWN', MonitorRef, _, _, _} -> 687 dbg:stop_clear() 688 end 689 end), 690 Res. 691 692%%---------------------------------------------------------------------- 693%% iterate(Handle, Prev, Limit) -> 694%% iterate(Handle, Prev, Limit, undefined, Prev) 695%% 696%% Iterates over the currently stored events 697%% 698%% Short for iterate/5. 699%%---------------------------------------------------------------------- 700 701iterate(Handle, Prev, Limit) -> 702 iterate(Handle, Prev, Limit, undefined, Prev). 703 704%%---------------------------------------------------------------------- 705%% iterate(Handle, Prev, Limit, Fun, Acc) -> NewAcc 706%% 707%% Iterates over the currently stored events and apply a function for 708%% each event. The iteration may be performed forwards or backwards 709%% and may be limited to a maximum number of events (abs(Limit)). 710%% 711%% Handle = collector_pid() | table_handle() 712%% Prev = first | last | event_key() 713%% Limit = done() | forward() | backward() 714%% collector_pid() = pid() 715%% table_handle() = record(table_handle) 716%% event_key() = 717%% done() = 0 718%% forward() = infinity | integer(X) where X > 0 719%% backward() = '-infinity' | integer(X) where X < 0 720%% Fun = fun(Event, Acc) -> NewAcc 721%% Acc = NewAcc = term() 722%%---------------------------------------------------------------------- 723 724iterate(_, _, Limit, _, Acc) when Limit =:= 0 -> 725 Acc; 726iterate(CollectorPid, Prev, Limit, Fun, Acc) when is_pid(CollectorPid) -> 727 case get_table_handle(CollectorPid) of 728 {ok, TH} when is_record(TH, table_handle) -> 729 iterate(TH, Prev, Limit, Fun, Acc); 730 {error, Reason} -> 731 exit(Reason) 732 end; 733iterate(TH, Prev, Limit, Fun, Acc) when is_record(TH, table_handle) -> 734 if 735 Limit =:= infinity -> 736 next_iterate(TH, Prev, Limit, Fun, Acc); 737 is_integer(Limit), Limit > 0 -> 738 next_iterate(TH, Prev, Limit, Fun, Acc); 739 Limit =:= '-infinity' -> 740 prev_iterate(TH, Prev, Limit, Fun, Acc); 741 is_integer(Limit), Limit < 0 -> 742 prev_iterate(TH, Prev, Limit, Fun, Acc) 743 end. 744 745next_iterate(TH, Prev = first, Limit, Fun, Acc) -> 746 Tab = TH#table_handle.event_tab, 747 case catch ets:first(Tab) of 748 '$end_of_table' -> 749 Acc; 750 {'EXIT', _} = Error -> 751 io:format("~p(~p): First ~tp~n", [?MODULE, ?LINE, Error]), 752 iterate(TH#table_handle.collector_pid, Prev, Limit, Fun, Acc); 753 First -> 754 lookup_and_apply(TH, Prev, First, Limit, -1, Fun, Acc) 755 end; 756next_iterate(TH, Prev = last, Limit, Fun, Acc) -> 757 Tab = TH#table_handle.event_tab, 758 case catch ets:last(Tab) of 759 '$end_of_table' -> 760 Acc; 761 {'EXIT', _} = Error -> 762 io:format("~p(~p): Last ~tp~n", [?MODULE, ?LINE, Error]), 763 iterate(TH#table_handle.collector_pid, Prev, Limit, Fun, Acc); 764 Last -> 765 lookup_and_apply(TH, Prev, Last, Limit, -1, Fun, Acc) 766 end; 767next_iterate(TH, Prev, Limit, Fun, Acc) -> 768 Tab = TH#table_handle.event_tab, 769 Key = make_key(TH, Prev), 770 case catch ets:next(Tab, Key) of 771 '$end_of_table' -> 772 Acc; 773 {'EXIT', _} = Error -> 774 io:format("~p(~p): Next ~tp -> ~tp~n", [?MODULE, ?LINE, Key, Error]), 775 iterate(TH#table_handle.collector_pid, Prev, Limit, Fun, Acc); 776 Next -> 777 lookup_and_apply(TH, Prev, Next, Limit, -1, Fun, Acc) 778 end. 779 780prev_iterate(TH, Prev = first, Limit, Fun, Acc) -> 781 Tab = TH#table_handle.event_tab, 782 case catch ets:first(Tab) of 783 '$end_of_table' -> 784 Acc; 785 {'EXIT', _} = Error -> 786 io:format("~p(~p): First ~tp~n", [?MODULE, ?LINE, Error]), 787 iterate(TH#table_handle.collector_pid, Prev, Limit, Fun, Acc); 788 First -> 789 lookup_and_apply(TH, Prev, First, Limit, 1, Fun, Acc) 790 end; 791prev_iterate(TH, Prev = last, Limit, Fun, Acc) -> 792 Tab = TH#table_handle.event_tab, 793 case catch ets:last(Tab) of 794 '$end_of_table' -> 795 Acc; 796 {'EXIT', _} = Error -> 797 io:format("~p(~p): Last ~tp~n", [?MODULE, ?LINE, Error]), 798 iterate(TH#table_handle.collector_pid, Prev, Limit, Fun, Acc); 799 Last -> 800 lookup_and_apply(TH, Prev, Last, Limit, 1, Fun, Acc) 801 end; 802prev_iterate(TH, Prev, Limit, Fun, Acc) -> 803 Tab = TH#table_handle.event_tab, 804 Key = make_key(TH, Prev), 805 case catch ets:prev(Tab, Key) of 806 '$end_of_table' -> 807 Acc; 808 {'EXIT', _} = Error -> 809 io:format("~p(~p): Prev ~tp -> ~tp~n", [?MODULE, ?LINE, Key, Error]), 810 iterate(TH#table_handle.collector_pid, Prev, Limit, Fun, Acc); 811 Next -> 812 lookup_and_apply(TH, Prev, Next, Limit, 1, Fun, Acc) 813 end. 814 815lookup_and_apply(TH, _Prev, Next, Limit, Incr, Fun, _Acc) when Fun =:= undefined -> 816 Limit2 = incr(Limit, Incr), 817 iterate(TH, Next, Limit2, Fun, Next); 818lookup_and_apply(TH, Prev, Next, Limit, Incr, Fun, Acc) -> 819 Tab = TH#table_handle.event_tab, 820 case catch ets:lookup_element(Tab, Next, 2) of 821 {'EXIT', _} -> 822 iterate(TH#table_handle.collector_pid, Prev, Limit, Fun, Acc); 823 E when is_record(E, event) -> 824 Acc2 = Fun(E, Acc), 825 Limit2 = incr(Limit, Incr), 826 iterate(TH, Next, Limit2, Fun, Acc2) 827 end. 828 829lookup(CollectorPid, Key) when is_pid(CollectorPid) -> 830 case get_table_handle(CollectorPid) of 831 {ok, TH} when is_record(TH, table_handle) -> 832 lookup(TH, Key); 833 {error, Reason} -> 834 {error, Reason} 835 end; 836lookup(TH, Key) when is_record(TH, table_handle) -> 837 Tab = TH#table_handle.event_tab, 838 case catch ets:lookup_element(Tab, Key, 2) of 839 {'EXIT', _} -> 840 {error, enoent}; 841 E when is_record(E, event) -> 842 {ok, E} 843 end. 844 845incr(Val, Incr) -> 846 if 847 Val =:= infinity -> Val; 848 Val =:= '-infinity' -> Val; 849 is_integer(Val) -> Val + Incr 850 end. 851 852%%---------------------------------------------------------------------- 853%% clear_table(Handle) -> ok 854%% 855%% Clear the event table 856%% 857%% Handle = collector_pid() | table_handle() 858%% collector_pid() = pid() 859%% table_handle() = record(table_handle) 860%%---------------------------------------------------------------------- 861 862clear_table(CollectorPid) when is_pid(CollectorPid) -> 863 call(CollectorPid, clear_table); 864clear_table(TH) when is_record(TH, table_handle) -> 865 clear_table(TH#table_handle.collector_pid). 866 867call(CollectorPid, Request) -> 868 try 869 gen_server:call(CollectorPid, Request, infinity) 870 catch 871 exit:{noproc,_} -> 872 {error, no_collector} 873 end. 874 875%%%---------------------------------------------------------------------- 876%%% Callback functions from gen_server 877%%%---------------------------------------------------------------------- 878 879%%---------------------------------------------------------------------- 880%% Func: init/1 881%% Returns: {ok, State} | 882%% {ok, State, Timeout} | 883%% ignore | 884%% {stop, Reason} 885%%---------------------------------------------------------------------- 886 887init([InitialS, Dict]) -> 888 process_flag(trap_exit, true), 889 case InitialS#state.parent_pid of 890 undefined -> 891 ok; 892 Pid when is_pid(Pid) -> 893 link(Pid) 894 end, 895 Funs = [fun init_tables/1, 896 fun init_global/1, 897 fun(S) -> lists:foldl(fun do_dict_insert/2, S, Dict) end], 898 {ok, lists:foldl(fun(F, S) -> F(S) end, InitialS, Funs)}. 899 900init_tables(S) -> 901 EventTab = ets:new(et_events, [ordered_set, {keypos, 1}, public]), 902 DictTab = ets:new(et_dict, [ordered_set, {keypos, 1}, public]), 903 S#state{event_tab = EventTab, dict_tab = DictTab, event_tab_size = 0}. 904 905init_global(S) -> 906 case S#state.trace_global of 907 true -> 908 EventFun = fun(Event, {ok, TH}) -> report(TH, Event) end, 909 EndFun = fun(Acc) -> Acc end, 910 Spec = trace_spec_wrapper(EventFun, EndFun, {ok, self()}), 911 dbg:tracer(process, Spec), 912 et_selector:change_pattern(S#state.trace_pattern), 913 ok = net_kernel:monitor_nodes(true), 914 lists:foreach(fun(N) -> self() ! {nodeup, N} end, nodes()), 915 S#state{trace_nodes = [node()]}; 916 false -> 917 S 918 end. 919 920%%---------------------------------------------------------------------- 921%% Func: handle_call/3 922%% Returns: {reply, Reply, State} | 923%% {reply, Reply, State, Timeout} | 924%% {noreply, State} | 925%% {noreply, State, Timeout} | 926%% {stop, Reason, Reply, State} | (terminate/2 is called) 927%% {stop, Reason, State} (terminate/2 is called) 928%%---------------------------------------------------------------------- 929 930handle_call({multicast, Msg}, _From, S) -> 931 do_multicast(S#state.subscribers, Msg), 932 reply(ok, S); 933 934handle_call(Msg = {dict_insert, _Key, _Val}, _From, S) -> 935 S2 = do_dict_insert(Msg, S), 936 reply(ok, S2); 937 938handle_call(Msg = {dict_delete, _Key}, _From, S) -> 939 try 940 S2 = do_dict_delete(Msg, S), 941 reply(ok, S2) 942 catch 943 throw:{stop, R} -> 944 opt_unlink(S#state.parent_pid), 945 {stop, R, S} 946 end; 947handle_call({dict_lookup, Key}, _From, S) -> 948 Reply = ets:lookup(S#state.dict_tab, Key), 949 reply(Reply, S); 950 951handle_call({dict_match, Pattern}, _From, S) -> 952 case catch ets:match_object(S#state.dict_tab, Pattern) of 953 {'EXIT', _Reason} -> 954 reply([], S); 955 Matching -> 956 reply(Matching, S) 957 end; 958 959handle_call(get_table_handle, _From, S) -> 960 [{_, TableFilter}] = ets:lookup(S#state.dict_tab, {filter, ?DEFAULT_FILTER_NAME}), 961 TH = #table_handle{collector_pid = self(), 962 event_tab = S#state.event_tab, 963 event_order = S#state.event_order, 964 filter = TableFilter}, 965 reply({ok, TH}, S); 966 967handle_call(get_table_size, _From, S) -> 968 Size = ets:info(S#state.event_tab, size), 969 reply({ok, Size}, S); 970 971handle_call(close, _From, S) -> 972 case S#state.file of 973 undefined -> 974 reply({error, file_not_open}, S); 975 F -> 976 Reply = disk_log:close(F#file.desc), 977 S2 = S#state{file = undefined}, 978 reply(Reply, S2) 979 end; 980handle_call({save_event_file, FileName, Options}, _From, S) -> 981 Default = #file{name = FileName, 982 event_opt = existing, 983 file_opt = write, 984 table_opt = keep}, 985 case parse_file_options(Default, Options) of 986 {ok, F} when is_record(F, file) -> 987 case file_open(F) of 988 {ok, Fd} -> 989 F2 = F#file{desc = Fd}, 990 {Reply2, S3} = 991 case F2#file.event_opt of 992 %% new -> 993 %% Reply = ok, 994 %% S2 = S#state{file = F}, 995 %% {Reply, S2}; 996 %% 997 %% insert() -> 998 %% case S2#state.file of 999 %% undefined -> 1000 %% ok; 1001 %% F -> 1002 %% Fd = F#file.desc, 1003 %% ok = disk_log:log(Fd, Event) 1004 %% end. 1005 existing -> 1006 Fun = fun({_, E}, A) -> ok = disk_log:log(Fd, E), A end, 1007 Tab = S#state.event_tab, 1008 Reply = tab_iterate(Fun, Tab, ets:first(Tab), ok), 1009 ok = disk_log:close(Fd), 1010 {Reply, S} 1011 %% all -> 1012 %% Reply = tab_iterate(WriteFun, Tab, ok), 1013 %% S2 = S#state{file = F}, 1014 %% {Reply, S2} 1015 end, 1016 case F2#file.table_opt of 1017 keep -> 1018 reply(Reply2, S3); 1019 clear -> 1020 S4 = do_clear_table(S3), 1021 reply(Reply2, S4) 1022 end; 1023 {error, Reason} -> 1024 reply({error, {file_open, Reason}}, S) 1025 end; 1026 {error, Reason} -> 1027 reply({error, Reason}, S) 1028 end; 1029 1030handle_call({change_pattern, Pattern}, _From, S) -> 1031 Ns = S#state.trace_nodes, 1032 {_,[]} = rpc:multicall(Ns, et_selector, change_pattern, [Pattern]), 1033 Reply = {old_pattern, S#state.trace_pattern}, 1034 S2 = S#state{trace_pattern = Pattern}, 1035 reply(Reply, S2); 1036 1037handle_call(clear_table, _From, S) -> 1038 S2 = do_clear_table(S), 1039 reply(ok, S2); 1040 1041handle_call(stop, _From, S) -> 1042 do_multicast(S#state.subscribers, close), 1043 case S#state.trace_global of 1044 true -> {_,[]} = rpc:multicall(S#state.trace_nodes, dbg, stop_clear, []), 1045 ok; 1046 false -> ok 1047 end, 1048 {stop, shutdown, ok, S}; 1049handle_call(Request, From, S) -> 1050 ok = error_logger:format("~p(~p): handle_call(~tp, ~tp, ~tp)~n", 1051 [?MODULE, self(), Request, From, S]), 1052 reply({error, {bad_request, Request}}, S). 1053 1054%%---------------------------------------------------------------------- 1055%% Func: handle_cast/2 1056%% Returns: {noreply, State} | 1057%% {noreply, State, Timeout} | 1058%% {stop, Reason, State} (terminate/2 is called) 1059%%---------------------------------------------------------------------- 1060 1061handle_cast(Msg, S) -> 1062 ok = error_logger:format("~p(~p): handle_cast(~tp, ~tp)~n", 1063 [?MODULE, self(), Msg, S]), 1064 noreply(S). 1065 1066%%---------------------------------------------------------------------- 1067%% Func: handle_info/2 1068%% Returns: {noreply, State} | 1069%% {noreply, State, Timeout} | 1070%% {stop, Reason, State} (terminate/2 is called) 1071%%---------------------------------------------------------------------- 1072 1073handle_info(timeout, S) -> 1074 S2 = check_size(S), 1075 noreply(S2); 1076handle_info({nodeup, Node}, S) -> 1077 Port = S#state.trace_port, 1078 MaxQueue = S#state.trace_max_queue, 1079 case rpc:call(Node, ?MODULE, monitor_trace_port, [self(), {Port, MaxQueue}]) of 1080 {ok, _} -> 1081 S2 = listen_on_trace_port(Node, Port, S), 1082 noreply(S2); 1083 {error, Reason} when Reason =:= already_started-> 1084 ok = error_logger:format("~p(~p): producer ignored(~p:~p):~n ~tp~n", 1085 [?MODULE, self(), Node, Port, Reason]), 1086 S2 = S#state{trace_port = Port + 1}, 1087 noreply(S2); 1088 {badrpc, Reason} -> 1089 ok = error_logger:format("~p(~p): producer ignored(~p:~p):~n ~tp~n", 1090 [?MODULE, self(), Node, Port, Reason]), 1091 S2 = S#state{trace_port = Port + 1}, 1092 noreply(S2); 1093 {error, Reason} -> 1094 self() ! {nodeup, Node}, 1095 ok = error_logger:format("~p(~p): producer retry(~p:~p):~n ~tp~n", 1096 [?MODULE, self(), Node, Port, Reason]), 1097 S2 = S#state{trace_port = Port + 1}, 1098 noreply(S2) 1099 end; 1100 1101handle_info({nodedown, Node}, S) -> 1102 noreply(S#state{trace_nodes = S#state.trace_nodes -- [Node]}); 1103 1104handle_info({register_trace_client, Pid}, S) -> 1105 link(Pid), 1106 noreply(S); 1107 1108handle_info({'EXIT', Pid, Reason}, S) when Pid =:= S#state.parent_pid -> 1109 {stop, Reason, S}; 1110handle_info(Info = {'EXIT', Pid, Reason}, S) -> 1111 OldSubscribers = S#state.subscribers, 1112 case lists:member(Pid, OldSubscribers) of 1113 true when Reason =:= shutdown -> 1114 try 1115 S2 = do_dict_delete({dict_delete, {subscriber, Pid}}, S), 1116 noreply(S2) 1117 catch 1118 throw:{stop, R} -> 1119 opt_unlink(S#state.parent_pid), 1120 {stop, R, S} 1121 end; 1122 true -> 1123 opt_unlink(S#state.parent_pid), 1124 {stop, Reason, S}; 1125 false -> 1126 ok = error_logger:format("~p(~p): handle_info(~tp, ~tp)~n", 1127 [?MODULE, self(), Info, S]), 1128 noreply(S) 1129 end; 1130handle_info(Info, S) -> 1131 ok = error_logger:format("~p(~p): handle_info(~tp, ~tp)~n", 1132 [?MODULE, self(), Info, S]), 1133 noreply(S). 1134 1135listen_on_trace_port(Node, Port, S) -> 1136 [_Name, Host] = string:lexemes(atom_to_list(Node), [$@]), 1137 case catch start_trace_client(self(), ip, {Host, Port}) of 1138 {trace_client_pid, RemotePid} -> 1139 rpc:call(Node, et_selector, change_pattern, [S#state.trace_pattern]), 1140 link(RemotePid), 1141 S#state{trace_nodes = [Node | S#state.trace_nodes], 1142 trace_port = Port + 1}; 1143 {'EXIT', Reason} when Reason =:= already_started-> 1144 ok = error_logger:format("~p(~p): consumer ignored(~p:~p): ~tp~n", 1145 [?MODULE, self(), Node, Port, Reason]), 1146 S#state{trace_port = Port + 1}; 1147 {'EXIT', Reason} -> 1148 self() ! {nodeup, Node}, 1149 ok = error_logger:format("~p(~p): consumer retry(~p:~p):~n ~tp~n", 1150 [?MODULE, self(), Node, Port, Reason]), 1151 S#state{trace_port = Port + 1} 1152 end. 1153 1154%%---------------------------------------------------------------------- 1155%% Func: terminate/2 1156%% Purpose: Shutdown the server 1157%% Returns: any (ignored by gen_server) 1158%%---------------------------------------------------------------------- 1159 1160terminate(Reason, S) -> 1161 Fun = fun(Pid) -> exit(Pid, Reason) end, 1162 lists:foreach(Fun, S#state.subscribers). 1163 1164%%---------------------------------------------------------------------- 1165%% Func: code_change/3 1166%% Purpose: Convert process state when code is changed 1167%% Returns: {ok, NewState} 1168%%---------------------------------------------------------------------- 1169 1170code_change(_OldVsn, S, _Extra) -> 1171 {ok, S}. 1172 1173%%%---------------------------------------------------------------------- 1174%%% Internal functions 1175%%%---------------------------------------------------------------------- 1176 1177do_clear_table(S) -> 1178 OldTab = S#state.event_tab, 1179 ets:delete(OldTab), 1180 NewTab = ets:new(et_events, [ordered_set, {keypos, 1}, public]), 1181 S#state{event_tab = NewTab}. 1182 1183do_dict_insert(Msg = {dict_insert, Key = {subscriber, Pid}, Val}, S) when is_pid(Pid) -> 1184 OldSubscribers = S#state.subscribers, 1185 NewSubscribers = 1186 case lists:member(Pid, OldSubscribers) of 1187 true -> 1188 OldSubscribers; 1189 false -> 1190 link(Pid), 1191 All = ets:match_object(S#state.dict_tab, '_'), 1192 lists:foreach(fun({K, V}) -> Pid ! {et, {dict_insert, K, V}} end, All), 1193 [Pid | OldSubscribers] 1194 end, 1195 do_multicast(NewSubscribers, Msg), 1196 Size = ets:info(S#state.event_tab, size), 1197 do_multicast(NewSubscribers, {more_events, Size}), 1198 ets:insert(S#state.dict_tab, {Key, Val}), 1199 S#state{subscribers = NewSubscribers}; 1200do_dict_insert(Msg = {dict_insert, Key, Val}, S) -> 1201 do_multicast(S#state.subscribers, Msg), 1202 ets:insert(S#state.dict_tab, {Key, Val}), 1203 S. 1204 1205do_dict_delete(Msg = {dict_delete, Key = {subscriber, Pid}}, S) -> 1206 OldSubscribers = S#state.subscribers, 1207 do_multicast(OldSubscribers, Msg), 1208 ets:delete(S#state.dict_tab, Key), 1209 case lists:member(Pid, OldSubscribers) of 1210 true -> 1211 unlink(Pid), 1212 S2 = S#state{subscribers = OldSubscribers -- [Pid]}, 1213 if 1214 S2#state.auto_shutdown, 1215 S2#state.subscribers =:= [] -> 1216 throw({stop, shutdown}); 1217 true -> 1218 S2 1219 end; 1220 false -> 1221 S 1222 end; 1223do_dict_delete({dict_delete, {filter, ?DEFAULT_FILTER_NAME}}, S) -> 1224 S; 1225do_dict_delete(Msg = {dict_delete, Key}, S) -> 1226 do_multicast(S#state.subscribers, Msg), 1227 ets:delete(S#state.dict_tab, Key), 1228 S. 1229 1230tab_iterate(_Fun, _Tab, '$end_of_table', Acc) -> 1231 Acc; 1232tab_iterate(Fun, Tab, Key, Acc) -> 1233 Acc2 = lists:foldl(Fun, Acc, ets:lookup(Tab, Key)), 1234 tab_iterate(Fun, Tab, ets:next(Tab, Key), Acc2). 1235 1236file_open(F) -> 1237 Fd = make_ref(), 1238 case F#file.file_opt of 1239 write -> ok = file:rename(F#file.name, F#file.name ++ ".OLD"); 1240 append -> ok 1241 end, 1242 Args = [{file, F#file.name}, {name, Fd}, 1243 {repair, true}, {mode, read_write}], 1244 case disk_log:open(Args) of 1245 {ok, _} -> 1246 {ok, Fd}; 1247 {repaired, _, _, BadBytes} -> 1248 ok = error_logger:format("~p: Skipped ~p bad bytes in file: ~tp~n", 1249 [?MODULE, BadBytes, F#file.name]), 1250 {ok, Fd}; 1251 {error,Reason} -> 1252 {error,Reason} 1253 end. 1254 1255parse_file_options(F, [H | T]) -> 1256 case H of 1257 existing -> parse_file_options(F#file{event_opt = existing} , T); 1258 %%new -> parse_file_options(F#file{event_opt = new} , T); 1259 all -> parse_file_options(F#file{event_opt = all} , T); 1260 write -> parse_file_options(F#file{file_opt = write} , T); 1261 append -> parse_file_options(F#file{file_opt = append} , T); 1262 keep -> parse_file_options(F#file{table_opt = keep} , T); 1263 clear -> parse_file_options(F#file{table_opt = clear} , T); 1264 Bad -> {error, {bad_file_option, Bad}} 1265 end; 1266parse_file_options(F, []) -> 1267 {ok, F}. 1268 1269do_multicast([Pid | Pids], Msg) -> 1270 Pid ! {et, Msg}, 1271 do_multicast(Pids, Msg); 1272do_multicast([], _Msg) -> 1273 ok. 1274 1275opt_unlink(Pid) -> 1276 if 1277 Pid =:= undefined -> 1278 ok; 1279 true -> 1280 unlink(Pid) 1281 end. 1282 1283reply(Reply, #state{subscribers = []} = S) -> 1284 {reply, Reply, S}; 1285reply(Reply, S) -> 1286 {reply, Reply, S, 500}. 1287 1288noreply(#state{subscribers = []} = S) -> 1289 {noreply, S}; 1290noreply(S) -> 1291 {noreply, S, 500}. 1292 1293check_size(S) -> 1294 Size = ets:info(S#state.event_tab, size), 1295 if 1296 Size =:= S#state.event_tab_size -> 1297 S; 1298 true -> 1299 %% Tell the subscribers that more events are available 1300 Msg = {more_events, Size}, 1301 do_multicast(S#state.subscribers, Msg), 1302 S#state{event_tab_size = Size} 1303 end. 1304