1%% 2%% %CopyrightBegin% 3%% 4%% Copyright Ericsson AB 2000-2018. All Rights Reserved. 5%% 6%% Licensed under the Apache License, Version 2.0 (the "License"); 7%% you may not use this file except in compliance with the License. 8%% You may obtain a copy of the License at 9%% 10%% http://www.apache.org/licenses/LICENSE-2.0 11%% 12%% Unless required by applicable law or agreed to in writing, software 13%% distributed under the License is distributed on an "AS IS" BASIS, 14%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15%% See the License for the specific language governing permissions and 16%% limitations under the License. 17%% 18%% %CopyrightEnd% 19%% 20%%---------------------------------------------------------------------- 21%% Purpose: Collect trace events and provide a backing storage 22%% appropriate for iteration 23%%---------------------------------------------------------------------- 24 25-module(et_collector). 26 27-behaviour(gen_server). 28 29%% External exports 30-export([start_link/1, 31 stop/1, 32 33 report/2, 34 report_event/5, 35 report_event/6, 36 37 iterate/3, 38 iterate/5, 39 lookup/2, 40 41 start_trace_client/3, 42 start_trace_port/1, 43 %% load_event_file/2, 44 save_event_file/3, 45 clear_table/1, 46 47 get_global_pid/0, 48 %% get_table_handle/1, 49 get_table_size/1, 50 change_pattern/2, 51 make_key/2, 52 53 dict_insert/3, 54 dict_delete/2, 55 dict_lookup/2, 56 dict_match/2, 57 multicast/2]). 58 59%% Internal export 60-export([monitor_trace_port/2]). 61 62%% gen_server callbacks 63-export([init/1,terminate/2, code_change/3, 64 handle_call/3, handle_cast/2, handle_info/2]). 65 66-compile([{nowarn_deprecated_function,[{erlang,now,0}]}]). 67 68-include("et_internal.hrl"). 69-include("../include/et.hrl"). 
%% Server state of the collector process.
-record(state,
        {parent_pid,      % pid() | undefined; init/1 links to it when it is a pid
         auto_shutdown,   % Optionally shutdown when the last subscriber dies
         event_tab_size,  % Set to 0 when the tables are created
         event_tab,       % ETS ordered_set holding {Key, Event} tuples
         dict_tab,        % ETS ordered_set backing the dictionary service
         event_order,     % trace_ts | event_ts
         subscribers,     % Receivers of multicast messages
         file,            % undefined | record(file)
         trace_pattern,   % {Mod, Pattern} once the options have been parsed
         trace_port,      % integer(), 4711 unless overridden
         trace_max_queue, % integer(), 50 unless overridden
         trace_nodes,     % Nodes addressed by change_pattern and stop
         trace_global}).  % boolean()

%% Description of an event file that is being written.
-record(file, {name, desc, event_opt, file_opt, table_opt}).

%% Handle returned to clients so that they can access the event table
%% directly without going through the collector process.
-record(table_handle, {collector_pid, event_tab, event_order, filter}).

%% Keys of the event table. The record tag selects which of the two
%% timestamps comes first in the key, i.e. which one the events are
%% ordered by.
-record(trace_ts, {trace_ts, event_ts}).
-record(event_ts, {event_ts, trace_ts}).

%%%----------------------------------------------------------------------
%%% Client side
%%%----------------------------------------------------------------------

%%----------------------------------------------------------------------
%% start_link(Options) -> {ok, CollectorPid} | {error, Reason}
%%
%% Start a collector process
%%
%% The collector collects trace events and keeps them ordered by their
%% timestamp. The timestamp may either reflect the time when the
%% actual trace data was generated (trace_ts) or when the trace data
%% was transformed into an event record (event_ts). If the time stamp
%% is missing in the trace data (missing timestamp option to
%% erlang:trace/4) the trace_ts will be set to the event_ts.
%%
%% Events are reported to the collector directly with the report
%% function or indirectly via one or more trace clients. All reported
%% events are first filtered through the collector filter before they
%% are stored by the collector. By replacing the default collector
%% filter with a customized ditto it is possible to allow any trace
%% data as input. The collector filter is a dictionary entry with the
%% predefined key {filter, all} and the value is a fun of
%% arity 1. See et_selector:parse_event/2 for interface details,
%% such as which erlang:trace/1 tuples that are accepted.
%%
%% The collector has a built-in dictionary service.
%% Any term may be stored as value in the dictionary and bound to a
%% unique key. When new values are inserted with an existing key, the
%% new values will overwrite the existing ones. Processes may
%% subscribe on dictionary updates by using {subscriber, pid()} as
%% dictionary key. All dictionary updates will be propagated to the
%% subscriber processes matching the pattern {{subscriber, '_'}, '_'}
%% where the first '_' is interpreted as a pid().
%%
%% In global trace mode, the collector will automatically start
%% tracing on all connected Erlang nodes. When a node connects, a port
%% tracer will be started on that node and a corresponding trace
%% client on the collector node. By default the global trace pattern
%% is 'max'.
%%
%% Options = [option()]
%%
%% option() =
%%   {parent_pid, pid() | undefined} |
%%   {auto_shutdown, boolean()} |
%%   {event_order, event_order()} |
%%   {dict_insert, {filter, all}, collector_fun()} |
%%   {dict_insert, {filter, event_filter_name()}, event_filter_fun()} |
%%   {dict_insert, {subscriber, pid()}, dict_val()} |
%%   {dict_insert, dict_key(), dict_val()} |
%%   {dict_delete, dict_key()} |
%%   {trace_client, trace_client()} |
%%   {trace_global, boolean()} |
%%   {trace_pattern, trace_pattern()} |
%%   {trace_port, integer()} |
%%   {trace_max_queue, integer()}
%%
%% event_order() = trace_ts | event_ts
%% trace_pattern() = detail_level() | dbg_match_spec()
%% detail_level() = min | max | integer(X) when X >= 0, X =< 100
%% trace_client() =
%%   {event_file, file_name()} |
%%   {dbg_trace_type(), dbg_trace_parameters()}
%% file_name() = string()
%% collector_fun() = trace_filter_fun() | event_filter_fun()
%% trace_filter_fun() = fun(TraceData) -> false | true | {true, NewEvent}
%% event_filter_fun() = fun(Event) -> false | true | {true, NewEvent}
%% event_filter_name() = atom()
%% TraceData = erlang_trace_data()
%% Event = NewEvent = record(event)
%% dict_key() = term()
%% dict_val() = term()
%%
%% CollectorPid = pid()
%% Reason = term()
%%----------------------------------------------------------------------

start_link(Options) ->
    case parse_opt(Options, default_state(), [], []) of
        {error, Reason} ->
            {error, Reason};
        {ok, S, Dict, Clients} ->
            Args = [S, Dict],
            %% A global collector is registered under the module name
            Started =
                case S#state.trace_global of
                    true ->
                        gen_server:start_link({global, ?MODULE}, ?MODULE, Args, []);
                    false ->
                        gen_server:start_link(?MODULE, Args, [])
                end,
            case Started of
                {error, StartReason} ->
                    {error, StartReason};
                {ok, Pid} ->
                    %% The collector links to its parent itself (see
                    %% init/1), so the link created by start_link is
                    %% only kept when the caller is that parent.
                    case S#state.parent_pid =:= self() of
                        true -> ok;
                        false -> unlink(Pid)
                    end,
                    start_clients(Pid, Clients)
            end
    end.

%% The state used when no options override it. The calling process is
%% the default parent.
default_state() ->
    #state{trace_max_queue = 50,
           trace_port      = 4711,
           trace_nodes     = [],
           trace_pattern   = undefined,
           trace_global    = false,
           subscribers     = [],
           event_order     = trace_ts,
           auto_shutdown   = false,
           parent_pid      = self()}.
%% parse_opt(Options, State, Dict, Clients) ->
%%     {ok, NewState, DictOps, Clients} | {error, Reason}
%%
%% Walks the option list given to start_link/1. State options are
%% stored in the state record, dictionary operations are collected in
%% Dict (in the order given) and trace clients in Clients. When the
%% list is exhausted, the trace pattern is expanded and the default
%% collector filter is put in front of the dictionary operations, so
%% that a filter supplied by the caller under the same key is applied
%% after it.
%%
%% Reason = {bad_option, term()} | {bad_option_list, term()}
parse_opt([], S, Dict, Clients) ->
    {Mod, Pattern} = et_selector:make_pattern(S#state.trace_pattern),
    Fun = fun(E) -> et_selector:parse_event(Mod, E) end,
    Default = {dict_insert, {filter, ?DEFAULT_FILTER_NAME}, Fun},
    {ok, S#state{trace_pattern = {Mod, Pattern}}, [Default | Dict], Clients};
parse_opt([H | T], S, Dict, Clients) ->
    case H of
        {parent_pid, Parent} when Parent =:= undefined; is_pid(Parent) ->
            parse_opt(T, S#state{parent_pid = Parent}, Dict, Clients);
        {auto_shutdown, Bool} when is_boolean(Bool) ->
            parse_opt(T, S#state{auto_shutdown = Bool}, Dict, Clients);
        {event_order, Order} when Order =:= trace_ts; Order =:= event_ts ->
            parse_opt(T, S#state{event_order = Order}, Dict, Clients);
        {dict_insert, {filter, Name}, Fun} when is_atom(Name), is_function(Fun) ->
            parse_opt(T, S, Dict ++ [H], Clients);
        {dict_insert, {filter, _Name}, _Fun} ->
            {error, {bad_option, H}};
        {dict_insert, {subscriber, Pid}, _Val} when is_pid(Pid) ->
            parse_opt(T, S, Dict ++ [H], Clients);
        {dict_insert, {subscriber, _Pid}, _Val} ->
            {error, {bad_option, H}};
        {dict_insert, _Key, _Val} ->
            parse_opt(T, S, Dict ++ [H], Clients);
        {dict_delete, _Key} ->
            parse_opt(T, S, Dict ++ [H], Clients);
        {trace_client, Client = {_, _}} ->
            parse_opt(T, S, Dict, Clients ++ [Client]);
        {trace_global, Bool} when is_boolean(Bool) ->
            parse_opt(T, S#state{trace_global = Bool}, Dict, Clients);
        {trace_pattern, {Mod, _} = Pattern} when is_atom(Mod) ->
            parse_opt(T, S#state{trace_pattern = Pattern}, Dict, Clients);
        {trace_pattern, undefined = Pattern} ->
            parse_opt(T, S#state{trace_pattern = Pattern}, Dict, Clients);
        {trace_port, Port} when is_integer(Port) ->
            parse_opt(T, S#state{trace_port = Port}, Dict, Clients);
        {trace_max_queue, MaxQueue} when is_integer(MaxQueue) ->
            %% This used to overwrite trace_port, which both lost the
            %% queue size and clobbered the port number.
            parse_opt(T, S#state{trace_max_queue = MaxQueue}, Dict, Clients);
        Bad ->
            {error, {bad_option, Bad}}
    end;
parse_opt(BadList, _S, _Dict, _Clients) ->
    %% Not a proper list
    {error, {bad_option_list, BadList}}.

%% Start the trace clients requested with the trace_client option.
%% The result of each individual client start is ignored.
start_clients(CollectorPid, [{Type, Parameters} | T]) ->
    _ = start_trace_client(CollectorPid, Type, Parameters),
    start_clients(CollectorPid, T);
start_clients(CollectorPid, []) ->
    {ok, CollectorPid}.

%%----------------------------------------------------------------------
%% stop(CollectorPid) -> ok
%%
%% Stop a collector process
%%
%% CollectorPid = pid()
%%----------------------------------------------------------------------

stop(CollectorPid) ->
    call(CollectorPid, stop).

%%----------------------------------------------------------------------
%% save_event_file(CollectorPid, FileName, Options) -> ok | {error, Reason}
%%
%% Saves the events to a file
%%
%% CollectorPid = pid()
%% FileName = string()
%% Options = [option()]
%% Reason = term()
%%
%% option() = event_option() | file_option() | table_option()
%% event_option() = existing
%% file_option() = write | append
%% table_option() = keep | clear
%%
%% By default the currently stored events (existing) are
%% written to a brand new file (write) and the events are
%% kept (keep) after they have been written to the file.
%%
%% Instead of keeping the events after writing them to file,
%% it is possible to remove all stored events after they
%% have been successfully written to file (clear).
%%
%% The options default to existing, write and keep.
%%----------------------------------------------------------------------

save_event_file(CollectorPid, FileName, Options) ->
    call(CollectorPid, {save_event_file, FileName, Options}).

%%----------------------------------------------------------------------
%% load_event_file(CollectorPid, FileName) -> {ok, BadBytes} | exit(Reason)
%%
%% Load the event table from a file
%%
%% The file is opened read-only as a disk_log, every logged term is
%% reported to the collector and the log is closed again, also when
%% the loading is aborted by an exit.
%%
%% CollectorPid = pid()
%% FileName = string()
%% BadBytes = integer(X) where X >= 0
%% Reason = {disk_log_open, FileName, term()} |
%%          {bad_disk_log_chunk, FileName, term()} |
%%          term()
%%----------------------------------------------------------------------

load_event_file(CollectorPid, FileName) ->
    Fd = make_ref(),
    Args = [{file, FileName}, {name, Fd}, {repair, true}, {mode, read_only}],
    Fun = fun(Event, {ok, TH}) -> report(TH, Event) end,
    case disk_log:open(Args) of
        {ok, _} ->
            load_and_close_event_file(Fun, Fd, CollectorPid, FileName, 0);
        {repaired, _, _, {badbytes, BadBytes}} ->
            %% disk_log wraps the count in a tagged tuple; unwrap it so
            %% that it can be added to the counts from chunk/2.
            load_and_close_event_file(Fun, Fd, CollectorPid, FileName, BadBytes);
        {error, Reason} ->
            exit({disk_log_open, FileName, Reason})
    end.

%% Read the whole log and make sure that it gets closed afterwards,
%% regardless of whether the loading succeeded.
load_and_close_event_file(Fun, Fd, CollectorPid, FileName, BadBytes) ->
    try
        do_load_event_file(Fun, Fd, start, {ok, CollectorPid}, FileName, BadBytes)
    after
        _ = disk_log:close(Fd)
    end.

%% Read the log chunk by chunk. Acc is the report handle returned by
%% the previous report/2 call and BadBytes accumulates the number of
%% unreadable bytes.
do_load_event_file(Fun, Fd, Cont, Acc, FileName, BadBytes) ->
    case disk_log:chunk(Fd, Cont) of
        eof ->
            {ok, BadBytes};
        {error, Reason} ->
            exit({bad_disk_log_chunk, FileName, Reason});
        {Cont2, Events} ->
            Acc2 = lists:foldl(Fun, Acc, Events),
            do_load_event_file(Fun, Fd, Cont2, Acc2, FileName, BadBytes);
        {Cont2, Events, More} ->
            Acc2 = lists:foldl(Fun, Acc, Events),
            do_load_event_file(Fun, Fd, Cont2, Acc2, FileName, BadBytes + More)
    end.

%%----------------------------------------------------------------------
%% report(Handle, TraceOrEvent)
%%
%% Report an event to the collector
%%
%% All events are filtered through the collector filter, which
%% optionally may transform or discard the event.
%% The first call should use the pid of the collector process as
%% report handle, while subsequent calls should use the
%% table handle.
%%
%% If the event table cannot be written to with the given table
%% handle, a fresh handle is fetched from the collector process and
%% the report is retried from the start, including the filtering.
%%
%% A filter result other than false, true (for an event record) or
%% {true, Event} is stored as a 'bad_filter' event describing the
%% failure.
%%
%% Handle = Initial | Continuation
%% Initial = collector_pid()
%% collector_pid() = pid()
%% Continuation = record(table_handle)
%%
%% TraceOrEvent = record(event) | dbg_trace_tuple() | end_of_trace
%% Reason = term()
%%
%% Returns: {ok, Continuation} | exit(Reason)
%%----------------------------------------------------------------------

report(CollectorPid, TraceOrEvent) when is_pid(CollectorPid) ->
    case get_table_handle(CollectorPid) of
        {ok, TH} when is_record(TH, table_handle) ->
            report(TH, TraceOrEvent);
        {error, Reason} ->
            exit(Reason)
    end;
report(TH, TraceOrEvent) when is_record(TH, table_handle) ->
    Fun = TH#table_handle.filter,
    case Fun(TraceOrEvent) of
        false ->
            {ok, TH};
        true when is_record(TraceOrEvent, event) ->
            store_reported_event(TH, TraceOrEvent, TraceOrEvent);
        {true, Event} when is_record(Event, event) ->
            store_reported_event(TH, Event, TraceOrEvent);
        BadEvent ->
            %% erlang:now/0 is used on purpose here and below: its
            %% values are strictly increasing, and the timestamps are
            %% the key of the event table.
            TS = erlang:now(),
            Contents = [{trace, TraceOrEvent}, {reason, BadEvent}, {filter, Fun}],
            Event = #event{detail_level = 0,
                           trace_ts     = TS,
                           event_ts     = TS,
                           from         = bad_filter,
                           to           = bad_filter,
                           label        = bad_filter,
                           contents     = Contents},
            store_reported_event(TH, Event, TraceOrEvent)
    end;
report(_, Bad) ->
    exit({bad_event, Bad}).

%% Insert Event in the event table of TH. If the insertion fails, the
%% original TraceOrEvent is reported once more via the collector pid,
%% which refreshes the table handle.
store_reported_event(TH, Event, TraceOrEvent) ->
    Key = make_key(TH, Event),
    try ets:insert(TH#table_handle.event_tab, {Key, Event}) of
        true ->
            {ok, TH}
    catch
        error:badarg ->
            %% Refresh the report handle and try again
            report(TH#table_handle.collector_pid, TraceOrEvent)
    end.

%% report_event(CollectorPid, DetailLevel, FromTo, Label, Contents)
%%
%% Short for report_event/6 with the same actor as From and To.
report_event(CollectorPid, DetailLevel, FromTo, Label, Contents) ->
    report_event(CollectorPid, DetailLevel, FromTo, FromTo, Label, Contents).

%% report_event(CollectorPid, DetailLevel, From, To, Label, Contents)
%%
%% Build an event record stamped with the current time and report it.
%% DetailLevel must be an integer between ?detail_level_min and
%% ?detail_level_max. Returns the same as report/2.
report_event(CollectorPid, DetailLevel, From, To, Label, Contents)
  when is_integer(DetailLevel),
       DetailLevel >= ?detail_level_min,
       DetailLevel =< ?detail_level_max ->
    TS = erlang:now(),
    E = #event{detail_level = DetailLevel,
               trace_ts     = TS,
               event_ts     = TS,
               from         = From,
               to           = To,
               label        = Label,
               contents     = Contents},
    report(CollectorPid, E).

%%----------------------------------------------------------------------
%% make_key(Type, Stuff) -> Key
%%
%% Makes a key out of an event record or an old key
%%
%% Type = record(table_handle) | trace_ts | event_ts
%% Stuff = record(event) | Key
%% Key = record(event_ts) | record(trace_ts)
%%----------------------------------------------------------------------

make_key(#table_handle{event_order = Order}, Stuff) ->
    make_key(Order, Stuff);
make_key(trace_ts, #event{trace_ts = R, event_ts = P}) ->
    #trace_ts{trace_ts = R, event_ts = P};
make_key(trace_ts, #trace_ts{} = Key) ->
    Key;
make_key(trace_ts, #event_ts{trace_ts = R, event_ts = P}) ->
    #trace_ts{trace_ts = R, event_ts = P};
make_key(event_ts, #event{trace_ts = R, event_ts = P}) ->
    #event_ts{trace_ts = R, event_ts = P};
make_key(event_ts, #event_ts{} = Key) ->
    Key;
make_key(event_ts, #trace_ts{trace_ts = R, event_ts = P}) ->
    #event_ts{trace_ts = R, event_ts = P}.
%%----------------------------------------------------------------------
%% get_table_size(CollectorPid) -> {ok, Size} | {error, no_collector}
%%
%% Return the number of events currently stored by the collector
%%
%% CollectorPid = pid()
%% Size = integer()
%%----------------------------------------------------------------------

get_table_size(CollectorPid) when is_pid(CollectorPid) ->
    Request = get_table_size,
    call(CollectorPid, Request).

%%----------------------------------------------------------------------
%% get_table_handle(CollectorPid) -> {ok, Handle} | {error, no_collector}
%%
%% Return a table handle
%%
%% CollectorPid = pid()
%% Handle = record(table_handle)
%%----------------------------------------------------------------------

get_table_handle(CollectorPid) when is_pid(CollectorPid) ->
    Request = get_table_handle,
    call(CollectorPid, Request).

%%----------------------------------------------------------------------
%% get_global_pid() -> CollectorPid | exit(Reason)
%%
%% Return the identity of the globally registered collector
%% if there is any
%%
%% CollectorPid = pid()
%% Reason = term()
%%----------------------------------------------------------------------

get_global_pid() ->
    case global:whereis_name(?MODULE) of
        undefined ->
            exit(global_collector_not_started);
        CollectorPid when is_pid(CollectorPid) ->
            CollectorPid
    end.
%%----------------------------------------------------------------------
%% change_pattern(CollectorPid, RawPattern) -> {old_pattern, TracePattern}
%%
%% Change active trace pattern globally on all trace nodes
%%
%% The raw pattern is expanded with et_selector:make_pattern/1 in the
%% calling process before it is handed over to the collector.
%%
%% CollectorPid = pid()
%% RawPattern = {report_module(), extended_dbg_match_spec()}
%% report_module() = atom() | undefined
%% extended_dbg_match_spec() = detail_level() | dbg_match_spec()
%% RawPattern = detail_level()
%% detail_level() = min | max | integer(X) when X >= 0, X =< 100
%% TracePattern = {report_module(), dbg_match_spec_match_spec()}
%%----------------------------------------------------------------------

change_pattern(CollectorPid, RawPattern) ->
    Expanded = et_selector:make_pattern(RawPattern),
    Request = {change_pattern, Expanded},
    call(CollectorPid, Request).

%%----------------------------------------------------------------------
%% dict_insert(CollectorPid, {filter, all}, FilterFun) -> ok
%% dict_insert(CollectorPid, {subscriber, SubscriberPid}, Void) -> ok
%% dict_insert(CollectorPid, Key, Val) -> ok
%%
%% Insert a dictionary entry
%% and send a {et, {dict_insert, Key, Val}} tuple
%% to all registered subscribers.
%%
%% If the entry is a new subscriber, it will imply that
%% the new subscriber process first will get one message
%% for each already stored dictionary entry, before it
%% and all old subscribers will get this particular entry.
%% The collector process links to and then supervises the
%% subscriber process. If the subscriber process dies it
%% will imply that it gets unregistered as with a normal
%% dict_delete/2.
%%
%% A {filter, Name} key requires an atom name and a fun as value and
%% a {subscriber, Pid} key requires a pid. Otherwise the caller exits
%% with {badarg, Key}.
%%
%% CollectorPid = pid()
%% FilterFun = filter_fun()
%% SubscriberPid = pid()
%% Void = term()
%% Key = term()
%% Val = term()
%%----------------------------------------------------------------------

dict_insert(CollectorPid, Key = {filter, Name}, Fun)
  when is_atom(Name), is_function(Fun) ->
    call(CollectorPid, {dict_insert, Key, Fun});
dict_insert(_CollectorPid, Key = {filter, _Name}, _Fun) ->
    exit({badarg, Key});
dict_insert(CollectorPid, Key = {subscriber, Pid}, Val) when is_pid(Pid) ->
    call(CollectorPid, {dict_insert, Key, Val});
dict_insert(_CollectorPid, Key = {subscriber, _Pid}, _Val) ->
    exit({badarg, Key});
dict_insert(CollectorPid, Key, Val) ->
    call(CollectorPid, {dict_insert, Key, Val}).

%%----------------------------------------------------------------------
%% dict_lookup(CollectorPid, Key) -> [Entry]
%%
%% Lookup a dictionary entry and return zero or one entry
%%
%% The reply is the result of an ets:lookup/2 in the dictionary
%% table of the collector.
%%
%% CollectorPid = pid()
%% Key = term()
%% Entry = {Key, Val}
%% Val = term()
%%----------------------------------------------------------------------

dict_lookup(CollectorPid, Key) ->
    Request = {dict_lookup, Key},
    call(CollectorPid, Request).

%%----------------------------------------------------------------------
%% dict_delete(CollectorPid, Key) -> ok
%%
%% Delete a dictionary entry
%% and send a {et, {dict_delete, Key}} tuple
%% to all registered subscribers.
%%
%% If the deleted entry is a registered subscriber, it will
%% imply that the subscriber process gets unregistered as
%% subscriber as well as it gets its final message.
%%
%% dict_delete(CollectorPid, {subscriber, SubscriberPid})
%% dict_delete(CollectorPid, Key)
%%
%% CollectorPid = pid()
%% SubscriberPid = pid()
%% Key = term()
%%----------------------------------------------------------------------

dict_delete(CollectorPid, Key) ->
    Request = {dict_delete, Key},
    call(CollectorPid, Request).
%%----------------------------------------------------------------------
%% dict_match(CollectorPid, Pattern) -> [Match]
%%
%% Match some dictionary entries
%%
%% CollectorPid = pid()
%% Pattern = '_' | {key_pattern(), val_pattern()}
%% key_pattern() = ets_match_object_pattern()
%% val_pattern() = ets_match_object_pattern()
%% Match = {key(), val()}
%% key() = term()
%% val() = term()
%%----------------------------------------------------------------------

dict_match(CollectorPid, Pattern) ->
    Request = {dict_match, Pattern},
    call(CollectorPid, Request).

%%----------------------------------------------------------------------
%% multicast(CollectorPid, Msg) -> ok
%%
%% Sends a message to all registered subscribers
%%
%% Messages shaped as dictionary updates, {dict_insert, Key, Val}
%% and {dict_delete, Key}, are rejected with exit({badarg, Msg}).
%%
%% CollectorPid = pid()
%% Msg = term()
%%----------------------------------------------------------------------

multicast(CollectorPid, Msg) ->
    case Msg of
        {dict_insert, _Key, _Val} ->
            exit({badarg, Msg});
        {dict_delete, _Key} ->
            exit({badarg, Msg});
        _ ->
            call(CollectorPid, {multicast, Msg})
    end.

%%----------------------------------------------------------------------
%% start_trace_client(CollectorPid, Type, Parameters) ->
%%     file_loaded | {trace_client_pid, pid()} | exit(Reason)
%%
%% Load raw Erlang trace from a file, port or process.
%%
%% An event_file is read directly by the calling process, and the
%% result of that clause is whatever load_event_file/2 returns. A
%% trace file is read by a dbg trace client and the call does not
%% return until the whole file has been processed. For all other
%% types the trace client is registered in the collector and left
%% running.
%%
%% Type = dbg_trace_client_type()
%% Parameters = dbg_trace_client_parameters()
%% Pid = dbg_trace_client_pid()
%%----------------------------------------------------------------------

start_trace_client(CollectorPid, event_file, FileName) ->
    load_event_file(CollectorPid, FileName);
start_trace_client(CollectorPid, file = Type, FileName) ->
    %% The unique message sent back to us when the file is exhausted
    Done = {make_ref(), end_of_trace},
    OnEvent = fun(E, {ReplyTo, {ok, TH}}) -> {ReplyTo, report(TH, E)} end,
    OnEnd = fun({ReplyTo, {ok, _TH}}) -> ReplyTo ! Done, ReplyTo end,
    Spec = trace_spec_wrapper(OnEvent, OnEnd, {self(), {ok, CollectorPid}}),
    ClientPid = dbg:trace_client(Type, FileName, Spec),
    unlink(ClientPid),
    MonitorRef = erlang:monitor(process, ClientPid),
    receive
        Done ->
            erlang:demonitor(MonitorRef, [flush]),
            file_loaded;
        {'DOWN', MonitorRef, _, _, Reason} ->
            exit(Reason)
    end;
start_trace_client(CollectorPid, Type, Parameters) ->
    OnEvent = fun(Event, {ok, TH}) -> report(TH, Event) end,
    OnEnd = fun(Acc) -> Acc end,
    Spec = trace_spec_wrapper(OnEvent, OnEnd, {ok, CollectorPid}),
    ClientPid = dbg:trace_client(Type, Parameters, Spec),
    CollectorPid ! {register_trace_client, ClientPid},
    unlink(ClientPid),
    {trace_client_pid, ClientPid}.

%% Build a {HandlerFun, InitialAcc} handler spec for dbg, where the
%% handler dispatches end_of_trace to EndFun and everything else to
%% EventFun.
trace_spec_wrapper(EventFun, EndFun, EventInitialAcc)
  when is_function(EventFun), is_function(EndFun) ->
    Handler = fun(end_of_trace, Acc) -> EndFun(Acc);
                 (Trace, Acc)        -> EventFun(Trace, Acc)
              end,
    {Handler, EventInitialAcc}.

%% Start a dbg tracer that delivers the trace via an ip trace port.
start_trace_port(Parameters) ->
    PortFun = dbg:trace_port(ip, Parameters),
    dbg:tracer(port, PortFun).

%% Start a trace port and spawn a process that stops and clears the
%% tracing when the collector process goes down. Returns the result
%% of start_trace_port/1.
monitor_trace_port(CollectorPid, Parameters) ->
    Res = start_trace_port(Parameters),
    Watcher =
        fun() ->
                MonitorRef = erlang:monitor(process, CollectorPid),
                receive
                    {'DOWN', MonitorRef, _, _, _} ->
                        dbg:stop_clear()
                end
        end,
    spawn(Watcher),
    Res.

%%----------------------------------------------------------------------
%% iterate(Handle, Prev, Limit) ->
%%     iterate(Handle, Prev, Limit, undefined, Prev)
%%
%% Iterates over the currently stored events
%%
%% Short for iterate/5.
%%----------------------------------------------------------------------

iterate(Handle, Prev, Limit) ->
    NoFun = undefined,
    iterate(Handle, Prev, Limit, NoFun, Prev).
%%----------------------------------------------------------------------
%% iterate(Handle, Prev, Limit, Fun, Acc) -> NewAcc
%%
%% Iterates over the currently stored events and apply a function for
%% each event. The iteration may be performed forwards or backwards
%% and may be limited to a maximum number of events (abs(Limit)).
%%
%% Handle = collector_pid() | table_handle()
%% Prev = first | last | event_key()
%% Limit = done() | forward() | backward()
%% collector_pid() = pid()
%% table_handle() = record(table_handle)
%% event_key() = record(event) | record(event_ts) | record(trace_ts)
%% done() = 0
%% forward() = infinity | integer(X) where X > 0
%% backward() = '-infinity' | integer(X) where X < 0
%% Fun = fun(Event, Acc) -> NewAcc
%% Acc = NewAcc = term()
%%----------------------------------------------------------------------

iterate(_Handle, _Prev, 0, _Fun, Acc) ->
    %% The limit has been reached
    Acc;
iterate(CollectorPid, Prev, Limit, Fun, Acc) when is_pid(CollectorPid) ->
    %% Turn the pid into a table handle and start over
    case get_table_handle(CollectorPid) of
        {error, Reason} ->
            exit(Reason);
        {ok, TH} when is_record(TH, table_handle) ->
            iterate(TH, Prev, Limit, Fun, Acc)
    end;
iterate(TH, Prev, Limit, Fun, Acc) when is_record(TH, table_handle) ->
    %% The sign of the limit gives the direction
    if
        Limit =:= infinity; is_integer(Limit), Limit > 0 ->
            next_iterate(TH, Prev, Limit, Fun, Acc);
        Limit =:= '-infinity'; is_integer(Limit), Limit < 0 ->
            prev_iterate(TH, Prev, Limit, Fun, Acc)
    end.
%% One step of a forward iteration. Prev is either one of the atoms
%% first and last, which select the corresponding end of the table,
%% or the event/key that was visited previously. A positive limit is
%% counted down by one for each visited event. If the table cannot be
%% accessed, the problem is printed and the iteration is restarted
%% from Prev via the collector pid in order to get a fresh handle.
next_iterate(#table_handle{event_tab = Tab} = TH, first = Prev, Limit, Fun, Acc) ->
    case catch ets:first(Tab) of
        {'EXIT', _} = Error ->
            io:format("~p(~p): First ~tp~n", [?MODULE, ?LINE, Error]),
            iterate(TH#table_handle.collector_pid, Prev, Limit, Fun, Acc);
        '$end_of_table' ->
            Acc;
        First ->
            lookup_and_apply(TH, Prev, First, Limit, -1, Fun, Acc)
    end;
next_iterate(#table_handle{event_tab = Tab} = TH, last = Prev, Limit, Fun, Acc) ->
    case catch ets:last(Tab) of
        {'EXIT', _} = Error ->
            io:format("~p(~p): Last ~tp~n", [?MODULE, ?LINE, Error]),
            iterate(TH#table_handle.collector_pid, Prev, Limit, Fun, Acc);
        '$end_of_table' ->
            Acc;
        Last ->
            lookup_and_apply(TH, Prev, Last, Limit, -1, Fun, Acc)
    end;
next_iterate(#table_handle{event_tab = Tab} = TH, Prev, Limit, Fun, Acc) ->
    Key = make_key(TH, Prev),
    case catch ets:next(Tab, Key) of
        {'EXIT', _} = Error ->
            io:format("~p(~p): Next ~tp -> ~tp~n", [?MODULE, ?LINE, Key, Error]),
            iterate(TH#table_handle.collector_pid, Prev, Limit, Fun, Acc);
        '$end_of_table' ->
            Acc;
        Next ->
            lookup_and_apply(TH, Prev, Next, Limit, -1, Fun, Acc)
    end.
%% One step of a backward iteration. Works as next_iterate/5, except
%% that the table is traversed with ets:prev/2 and that the (negative)
%% limit is counted up by one for each visited event.
prev_iterate(#table_handle{event_tab = Tab} = TH, first = Prev, Limit, Fun, Acc) ->
    case catch ets:first(Tab) of
        {'EXIT', _} = Error ->
            io:format("~p(~p): First ~tp~n", [?MODULE, ?LINE, Error]),
            iterate(TH#table_handle.collector_pid, Prev, Limit, Fun, Acc);
        '$end_of_table' ->
            Acc;
        First ->
            lookup_and_apply(TH, Prev, First, Limit, 1, Fun, Acc)
    end;
prev_iterate(#table_handle{event_tab = Tab} = TH, last = Prev, Limit, Fun, Acc) ->
    case catch ets:last(Tab) of
        {'EXIT', _} = Error ->
            io:format("~p(~p): Last ~tp~n", [?MODULE, ?LINE, Error]),
            iterate(TH#table_handle.collector_pid, Prev, Limit, Fun, Acc);
        '$end_of_table' ->
            Acc;
        Last ->
            lookup_and_apply(TH, Prev, Last, Limit, 1, Fun, Acc)
    end;
prev_iterate(#table_handle{event_tab = Tab} = TH, Prev, Limit, Fun, Acc) ->
    Key = make_key(TH, Prev),
    case catch ets:prev(Tab, Key) of
        {'EXIT', _} = Error ->
            io:format("~p(~p): Prev ~tp -> ~tp~n", [?MODULE, ?LINE, Key, Error]),
            iterate(TH#table_handle.collector_pid, Prev, Limit, Fun, Acc);
        '$end_of_table' ->
            Acc;
        Next ->
            lookup_and_apply(TH, Prev, Next, Limit, 1, Fun, Acc)
    end.

%% Visit the event stored under the key Next and continue the
%% iteration from there with the limit adjusted by Incr.
%%
%% Without a fun no event is fetched at all; the key itself becomes
%% the new accumulator. With a fun, the event is fetched and folded
%% into the accumulator. If the event cannot be fetched, the
%% iteration is restarted from Prev via the collector pid.
lookup_and_apply(TH, _Prev, Next, Limit, Incr, undefined = Fun, _Acc) ->
    NewLimit = incr(Limit, Incr),
    iterate(TH, Next, NewLimit, Fun, Next);
lookup_and_apply(TH, Prev, Next, Limit, Incr, Fun, Acc) ->
    case catch ets:lookup_element(TH#table_handle.event_tab, Next, 2) of
        {'EXIT', _} ->
            iterate(TH#table_handle.collector_pid, Prev, Limit, Fun, Acc);
        Event when is_record(Event, event) ->
            NewAcc = Fun(Event, Acc),
            NewLimit = incr(Limit, Incr),
            iterate(TH, Next, NewLimit, Fun, NewAcc)
    end.
%%----------------------------------------------------------------------
%% lookup(Handle, Key) -> {ok, Event} | {error, Reason}
%%
%% Fetch the event stored under a given key
%%
%% Handle = collector_pid() | table_handle()
%% Key = record(event_ts) | record(trace_ts)
%% Event = record(event)
%% Reason = enoent | term()
%%----------------------------------------------------------------------

lookup(CollectorPid, Key) when is_pid(CollectorPid) ->
    case get_table_handle(CollectorPid) of
        {error, Reason} ->
            {error, Reason};
        {ok, TH} when is_record(TH, table_handle) ->
            lookup(TH, Key)
    end;
lookup(TH, Key) when is_record(TH, table_handle) ->
    case catch ets:lookup_element(TH#table_handle.event_tab, Key, 2) of
        {'EXIT', _} ->
            {error, enoent};
        Event when is_record(Event, event) ->
            {ok, Event}
    end.

%% Adjust an iteration limit. The infinite limits stay as they are.
incr(Val, Incr) ->
    if
        is_integer(Val) ->
            Val + Incr;
        Val =:= infinity; Val =:= '-infinity' ->
            Val
    end.

%%----------------------------------------------------------------------
%% clear_table(Handle) -> ok
%%
%% Clear the event table
%%
%% Handle = collector_pid() | table_handle()
%% collector_pid() = pid()
%% table_handle() = record(table_handle)
%%----------------------------------------------------------------------

clear_table(CollectorPid) when is_pid(CollectorPid) ->
    call(CollectorPid, clear_table);
clear_table(TH) when is_record(TH, table_handle) ->
    Pid = TH#table_handle.collector_pid,
    clear_table(Pid).

%% Synchronous request to the collector without timeout. A collector
%% that does not exist is reported as {error, no_collector}, while
%% all other failures are propagated to the caller.
call(CollectorPid, Request) ->
    try gen_server:call(CollectorPid, Request, infinity) of
        Reply ->
            Reply
    catch
        exit:{noproc, _} ->
            {error, no_collector}
    end.
%%%----------------------------------------------------------------------
%%% Callback functions from gen_server
%%%----------------------------------------------------------------------

%%----------------------------------------------------------------------
%% Func: init/1
%% Returns: {ok, State}          |
%%          {ok, State, Timeout} |
%%          ignore               |
%%          {stop, Reason}
%%
%% Traps exits, links to the parent (if any), creates the tables,
%% optionally starts the global tracing and finally applies the
%% dictionary operations that were given as start options.
%%----------------------------------------------------------------------

init([InitialS, Dict]) ->
    process_flag(trap_exit, true),
    case InitialS#state.parent_pid of
        Pid when is_pid(Pid) ->
            link(Pid);
        undefined ->
            ok
    end,
    S1 = init_tables(InitialS),
    S2 = init_global(S1),
    S3 = lists:foldl(fun do_dict_insert/2, S2, Dict),
    {ok, S3}.

%% Create the event table and the dictionary table. Both are public
%% ordered sets.
init_tables(S) ->
    TabOpts = [ordered_set, {keypos, 1}, public],
    EventTab = ets:new(et_events, TabOpts),
    DictTab = ets:new(et_dict, TabOpts),
    S#state{event_tab_size = 0, event_tab = EventTab, dict_tab = DictTab}.

%% In global trace mode: start a local process tracer that reports to
%% this collector, activate the trace pattern and subscribe to node
%% status changes. A nodeup message is faked for each node that is
%% already connected.
init_global(S) ->
    case S#state.trace_global of
        false ->
            S;
        true ->
            OnEvent = fun(Event, {ok, TH}) -> report(TH, Event) end,
            OnEnd = fun(Acc) -> Acc end,
            Spec = trace_spec_wrapper(OnEvent, OnEnd, {ok, self()}),
            dbg:tracer(process, Spec),
            et_selector:change_pattern(S#state.trace_pattern),
            ok = net_kernel:monitor_nodes(true),
            Self = self(),
            lists:foreach(fun(N) -> Self ! {nodeup, N} end, nodes()),
            S#state{trace_nodes = [node()]}
    end.

%%----------------------------------------------------------------------
%% Func: handle_call/3
%% Returns: {reply, Reply, State}          |
%%          {reply, Reply, State, Timeout} |
%%          {stop, Reason, Reply, State}   | (terminate/2 is called)
%%          {stop, Reason, State}            (terminate/2 is called)
%%
%% Normal replies go through reply/2, which decides whether a
%% timeout is attached to the return value.
%%----------------------------------------------------------------------

%% Forward Msg to every subscriber.
handle_call({multicast, Msg}, _From, S) ->
    do_multicast(S#state.subscribers, Msg),
    reply(ok, S);

handle_call(Msg = {dict_insert, _Key, _Val}, _From, S) ->
    S2 = do_dict_insert(Msg, S),
    reply(ok, S2);

%% do_dict_delete/2 throws {stop, Reason} when the collector should
%% shut down. In that case the server stops without sending a reply.
handle_call(Msg = {dict_delete, _Key}, _From, S) ->
    try
        S2 = do_dict_delete(Msg, S),
        reply(ok, S2)
    catch
        throw:{stop, R} ->
            opt_unlink(S#state.parent_pid),
            {stop, R, S}
    end;

handle_call({dict_lookup, Key}, _From, S) ->
    Reply = ets:lookup(S#state.dict_tab, Key),
    reply(Reply, S);

%% A pattern that makes ets:match_object/2 fail yields an empty result
%% instead of crashing the collector.
handle_call({dict_match, Pattern}, _From, S) ->
    Matching =
        try
            ets:match_object(S#state.dict_tab, Pattern)
        catch
            error:_ ->
                []
        end,
    reply(Matching, S);

%% Build a handle that lets the caller read the event table directly.
%% The dictionary entry {filter, ?DEFAULT_FILTER_NAME} must exist,
%% otherwise this clause fails with badmatch.
handle_call(get_table_handle, _From, S) ->
    [{_, TableFilter}] = ets:lookup(S#state.dict_tab, {filter, ?DEFAULT_FILTER_NAME}),
    TH = #table_handle{collector_pid = self(),
                       event_tab     = S#state.event_tab,
                       event_order   = S#state.event_order,
                       filter        = TableFilter},
    reply({ok, TH}, S);

handle_call(get_table_size, _From, S) ->
    Size = ets:info(S#state.event_tab, size),
    reply({ok, Size}, S);

handle_call(close, _From, S) ->
    case S#state.file of
        undefined ->
            reply({error, file_not_open}, S);
        F ->
            Reply = disk_log:close(F#file.desc),
            S2 = S#state{file = undefined},
            reply(Reply, S2)
    end;

%% Write the stored events to FileName.
%%
%% Only the event option 'existing' is implemented. Any other event
%% option that parse_file_options/2 lets through (i.e. 'all') is
%% rejected with {error, {bad_file_option, Opt}} BEFORE the file is
%% touched, instead of crashing the collector with a case_clause after
%% the file has already been renamed/opened.
handle_call({save_event_file, FileName, Options}, _From, S) ->
    Default = #file{name      = FileName,
                    event_opt = existing,
                    file_opt  = write,
                    table_opt = keep},
    case parse_file_options(Default, Options) of
        {ok, #file{event_opt = existing} = F} ->
            case file_open(F) of
                {ok, Fd} ->
                    Reply = save_existing_events(Fd, S#state.event_tab),
                    case F#file.table_opt of
                        keep ->
                            reply(Reply, S);
                        clear ->
                            reply(Reply, do_clear_table(S))
                    end;
                {error, Reason} ->
                    reply({error, {file_open, Reason}}, S)
            end;
        {ok, #file{event_opt = Unsupported}} ->
            reply({error, {bad_file_option, Unsupported}}, S);
        {error, Reason} ->
            reply({error, Reason}, S)
    end;

%% Apply Pattern on all traced nodes and remember it. The reply carries
%% the previously stored pattern. Fails with badmatch if any node in
%% trace_nodes could not be reached.
handle_call({change_pattern, Pattern}, _From, S) ->
    Ns = S#state.trace_nodes,
    {_,[]} = rpc:multicall(Ns, et_selector, change_pattern, [Pattern]),
    Reply = {old_pattern, S#state.trace_pattern},
    S2 = S#state{trace_pattern = Pattern},
    reply(Reply, S2);

handle_call(clear_table, _From, S) ->
    S2 = do_clear_table(S),
    reply(ok, S2);

%% Tell the subscribers to close, stop tracing on all traced nodes
%% (global tracing only) and terminate with reason shutdown.
handle_call(stop, _From, S) ->
    do_multicast(S#state.subscribers, close),
    case S#state.trace_global of
        true ->
            {_,[]} = rpc:multicall(S#state.trace_nodes, dbg, stop_clear, []),
            ok;
        false ->
            ok
    end,
    {stop, shutdown, ok, S};

%% Unknown requests are logged and answered with an error.
handle_call(Request, From, S) ->
    ok = error_logger:format("~p(~p): handle_call(~tp, ~tp, ~tp)~n",
                             [?MODULE, self(), Request, From, S]),
    reply({error, {bad_request, Request}}, S).

%% Log every event currently stored in Tab to the disk log Fd, in table
%% order, and close the log afterwards. Returns ok; a failing
%% disk_log:log/2 or disk_log:close/1 raises badmatch.
save_existing_events(Fd, Tab) ->
    WriteEvent = fun({_Key, Event}, Acc) ->
                         ok = disk_log:log(Fd, Event),
                         Acc
                 end,
    Reply = tab_iterate(WriteEvent, Tab, ets:first(Tab), ok),
    ok = disk_log:close(Fd),
    Reply.

%%----------------------------------------------------------------------
%% Func: handle_cast/2
%% Returns: {noreply, State}          |
%%          {noreply, State, Timeout}
%%
%% No cast is handled specially; whatever arrives is logged and
%% otherwise ignored.
%%----------------------------------------------------------------------

handle_cast(Msg, S) ->
    ok = error_logger:format("~p(~p): handle_cast(~tp, ~tp)~n",
                             [?MODULE, self(), Msg, S]),
    noreply(S).

%%----------------------------------------------------------------------
%% Func: handle_info/2
%% Returns: {noreply, State}          |
%%          {noreply, State, Timeout} |
%%          {stop, Reason, State}            (terminate/2 is called)
%%----------------------------------------------------------------------

%% gen_server timeout: check whether the event table has changed size.
handle_info(timeout, S) ->
    S2 = check_size(S),
    noreply(S2);

%% A node appeared: ask it to open a trace port and, on success,
%% connect a trace client to that port. Every failure path moves on to
%% the next port number; the plain {error, _} case also re-queues the
%% nodeup message so that the attempt is repeated.
handle_info({nodeup, Node}, S) ->
    Port = S#state.trace_port,
    MaxQueue = S#state.trace_max_queue,
    case rpc:call(Node, ?MODULE, monitor_trace_port, [self(), {Port, MaxQueue}]) of
        {ok, _} ->
            S2 = listen_on_trace_port(Node, Port, S),
            noreply(S2);
        {error, Reason} when Reason =:= already_started->
            ok = error_logger:format("~p(~p): producer ignored(~p:~p):~n ~tp~n",
                                     [?MODULE, self(), Node, Port, Reason]),
            S2 = S#state{trace_port = Port + 1},
            noreply(S2);
        {badrpc, Reason} ->
            ok = error_logger:format("~p(~p): producer ignored(~p:~p):~n ~tp~n",
                                     [?MODULE, self(), Node, Port, Reason]),
            S2 = S#state{trace_port = Port + 1},
            noreply(S2);
        {error, Reason} ->
            self() ! {nodeup, Node},
            ok = error_logger:format("~p(~p): producer retry(~p:~p):~n ~tp~n",
                                     [?MODULE, self(), Node, Port, Reason]),
            S2 = S#state{trace_port = Port + 1},
            noreply(S2)
    end;

handle_info({nodedown, Node}, S) ->
    noreply(S#state{trace_nodes = S#state.trace_nodes -- [Node]});

handle_info({register_trace_client, Pid}, S) ->
    link(Pid),
    noreply(S);

%% The parent died: terminate with the same reason.
handle_info({'EXIT', Pid, Reason}, S) when Pid =:= S#state.parent_pid ->
    {stop, Reason, S};
%% Another linked process died. A subscriber exiting with 'shutdown' is
%% simply unregistered (which may trigger auto shutdown); a subscriber
%% exiting for any other reason stops the collector with that reason.
%% Exits from non-subscribers are only logged.
handle_info(Info = {'EXIT', Pid, Reason}, S) ->
    OldSubscribers = S#state.subscribers,
    case lists:member(Pid, OldSubscribers) of
        true when Reason =:= shutdown ->
            try
                S2 = do_dict_delete({dict_delete, {subscriber, Pid}}, S),
                noreply(S2)
            catch
                throw:{stop, R} ->
                    opt_unlink(S#state.parent_pid),
                    {stop, R, S}
            end;
        true ->
            opt_unlink(S#state.parent_pid),
            {stop, Reason, S};
        false ->
            ok = error_logger:format("~p(~p): handle_info(~tp, ~tp)~n",
                                     [?MODULE, self(), Info, S]),
            noreply(S)
    end;
handle_info(Info, S) ->
    ok = error_logger:format("~p(~p): handle_info(~tp, ~tp)~n",
                             [?MODULE, self(), Info, S]),
    noreply(S).

%% Connect an ip trace client to Port on the host part of Node.
%% On success the trace pattern is applied on Node, the client is
%% linked and Node is added to trace_nodes. In all cases the next port
%% number is stored in the state. A failure other than already_started
%% re-queues {nodeup, Node} so that the attempt is repeated.
listen_on_trace_port(Node, Port, S) ->
    [_Name, Host] = string:lexemes(atom_to_list(Node), [$@]),
    case catch start_trace_client(self(), ip, {Host, Port}) of
        {trace_client_pid, RemotePid} ->
            rpc:call(Node, et_selector, change_pattern, [S#state.trace_pattern]),
            link(RemotePid),
            S#state{trace_nodes = [Node | S#state.trace_nodes],
                    trace_port  = Port + 1};
        {'EXIT', Reason} when Reason =:= already_started->
            ok = error_logger:format("~p(~p): consumer ignored(~p:~p): ~tp~n",
                                     [?MODULE, self(), Node, Port, Reason]),
            S#state{trace_port = Port + 1};
        {'EXIT', Reason} ->
            self() ! {nodeup, Node},
            ok = error_logger:format("~p(~p): consumer retry(~p:~p):~n ~tp~n",
                                     [?MODULE, self(), Node, Port, Reason]),
            S#state{trace_port = Port + 1}
    end.

%%----------------------------------------------------------------------
%% Func: terminate/2
%% Purpose: Shutdown the server
%% Returns: any (ignored by gen_server)
%%
%% Sends an exit signal with the termination reason to every
%% subscriber.
%%----------------------------------------------------------------------

terminate(Reason, S) ->
    Fun = fun(Pid) -> exit(Pid, Reason) end,
    lists:foreach(Fun, S#state.subscribers).

%%----------------------------------------------------------------------
%% Func: code_change/3
%% Purpose: Convert process state when code is changed
%% Returns: {ok, NewState}
%%----------------------------------------------------------------------

code_change(_OldVsn, S, _Extra) ->
    {ok, S}.

%%%----------------------------------------------------------------------
%%% Internal functions
%%%----------------------------------------------------------------------

%% Drop all events by deleting the event table and creating a new one.
%% Table identifiers handed out before this call refer to the deleted
%% table.
do_clear_table(S) ->
    OldTab = S#state.event_tab,
    ets:delete(OldTab),
    NewTab = ets:new(et_events, [ordered_set, {keypos, 1}, public]),
    S#state{event_tab = NewTab}.

%% Store a dictionary entry and multicast the insertion.
%%
%% A {subscriber, Pid} key additionally registers Pid as subscriber:
%% a new subscriber is linked and first receives the whole current
%% dictionary. All subscribers are then told about the insertion and
%% the current number of events.
do_dict_insert(Msg = {dict_insert, Key = {subscriber, Pid}, Val}, S) when is_pid(Pid) ->
    OldSubscribers = S#state.subscribers,
    NewSubscribers =
        case lists:member(Pid, OldSubscribers) of
            true ->
                OldSubscribers;
            false ->
                link(Pid),
                All = ets:match_object(S#state.dict_tab, '_'),
                lists:foreach(fun({K, V}) -> Pid ! {et, {dict_insert, K, V}} end, All),
                [Pid | OldSubscribers]
        end,
    do_multicast(NewSubscribers, Msg),
    Size = ets:info(S#state.event_tab, size),
    do_multicast(NewSubscribers, {more_events, Size}),
    ets:insert(S#state.dict_tab, {Key, Val}),
    S#state{subscribers = NewSubscribers};
do_dict_insert(Msg = {dict_insert, Key, Val}, S) ->
    do_multicast(S#state.subscribers, Msg),
    ets:insert(S#state.dict_tab, {Key, Val}),
    S.

%% Remove a dictionary entry and multicast the deletion.
%%
%% Deleting a {subscriber, Pid} key also unregisters and unlinks Pid.
%% If that was the last subscriber and auto_shutdown is set,
%% {stop, shutdown} is thrown; the callers turn this into a stop of
%% the server. The default filter entry is never deleted.
do_dict_delete(Msg = {dict_delete, Key = {subscriber, Pid}}, S) ->
    OldSubscribers = S#state.subscribers,
    do_multicast(OldSubscribers, Msg),
    ets:delete(S#state.dict_tab, Key),
    case lists:member(Pid, OldSubscribers) of
        true ->
            unlink(Pid),
            S2 = S#state{subscribers = OldSubscribers -- [Pid]},
            if
                S2#state.auto_shutdown,
                S2#state.subscribers =:= [] ->
                    throw({stop, shutdown});
                true ->
                    S2
            end;
        false ->
            S
    end;
do_dict_delete({dict_delete, {filter, ?DEFAULT_FILTER_NAME}}, S) ->
    S;
do_dict_delete(Msg = {dict_delete, Key}, S) ->
    do_multicast(S#state.subscribers, Msg),
    ets:delete(S#state.dict_tab, Key),
    S.

%% Fold Fun over all objects of Tab, starting at Key and following
%% ets:next/2 until the end of the table.
tab_iterate(_Fun, _Tab, '$end_of_table', Acc) ->
    Acc;
tab_iterate(Fun, Tab, Key, Acc) ->
    Acc2 = lists:foldl(Fun, Acc, ets:lookup(Tab, Key)),
    tab_iterate(Fun, Tab, ets:next(Tab, Key), Acc2).

%% Open the disk log described by F.
%%
%% Returns {ok, Fd} | {error, Reason}, where Fd is the name (a fresh
%% reference) under which the disk log was opened.
%%
%% With file_opt = write a previous file of the same name is first
%% moved out of the way to Name ++ ".OLD". A missing previous file is
%% the normal case for a brand new file and is not an error; any other
%% rename failure is returned as {error, Reason} rather than crashing
%% the calling process.
file_open(F) ->
    case file_open_backup(F#file.file_opt, F#file.name) of
        ok ->
            file_open_log(F#file.name, make_ref());
        {error, Reason} ->
            {error, Reason}
    end.

%% Keep a copy of an already existing file before it is overwritten.
file_open_backup(append, _Name) ->
    ok;
file_open_backup(write, Name) ->
    case file:rename(Name, Name ++ ".OLD") of
        ok ->
            ok;
        {error, enoent} ->
            %% Nothing to back up
            ok;
        {error, Reason} ->
            {error, Reason}
    end.

%% Open (and if needed repair) the disk log Name under the log name Fd.
file_open_log(Name, Fd) ->
    Args = [{file, Name}, {name, Fd},
            {repair, true}, {mode, read_write}],
    case disk_log:open(Args) of
        {ok, _} ->
            {ok, Fd};
        {repaired, _, _, BadBytes} ->
            ok = error_logger:format("~p: Skipped ~p bad bytes in file: ~tp~n",
                                     [?MODULE, BadBytes, Name]),
            {ok, Fd};
        {error,Reason} ->
            {error,Reason}
    end.

%% Fold the option list into the #file{} record F.
%% Returns {ok, File} or {error, {bad_file_option, Option}} for the
%% first option that is not recognised.
parse_file_options(F, []) ->
    {ok, F};
parse_file_options(F, [existing | Rest]) ->
    parse_file_options(F#file{event_opt = existing}, Rest);
parse_file_options(F, [all | Rest]) ->
    parse_file_options(F#file{event_opt = all}, Rest);
parse_file_options(F, [write | Rest]) ->
    parse_file_options(F#file{file_opt = write}, Rest);
parse_file_options(F, [append | Rest]) ->
    parse_file_options(F#file{file_opt = append}, Rest);
parse_file_options(F, [keep | Rest]) ->
    parse_file_options(F#file{table_opt = keep}, Rest);
parse_file_options(F, [clear | Rest]) ->
    parse_file_options(F#file{table_opt = clear}, Rest);
parse_file_options(_F, [Bad | _Rest]) ->
    {error, {bad_file_option, Bad}}.

%% Send {et, Msg} to every pid in the list.
do_multicast(Pids, Msg) ->
    lists:foreach(fun(Pid) -> Pid ! {et, Msg} end, Pids).

%% Unlink from Pid unless it is undefined.
opt_unlink(undefined) ->
    ok;
opt_unlink(Pid) ->
    unlink(Pid).

%% Build the handle_call return value. While there are subscribers a
%% 500 ms gen_server timeout is attached, so that handle_info(timeout, _)
%% gets a chance to run when the server is idle.
reply(Reply, S) ->
    case S of
        #state{subscribers = []} ->
            {reply, Reply, S};
        _ ->
            {reply, Reply, S, 500}
    end.

%% Same as reply/2, for callbacks that do not send a reply.
noreply(S) ->
    case S of
        #state{subscribers = []} ->
            {noreply, S};
        _ ->
            {noreply, S, 500}
    end.

%% Compare the current size of the event table with the cached one.
%% When they differ, the subscribers are told how many events there
%% are now and the cached size is updated.
check_size(S) ->
    CurrentSize = ets:info(S#state.event_tab, size),
    case S#state.event_tab_size of
        CurrentSize ->
            S;
        _Stale ->
            do_multicast(S#state.subscribers, {more_events, CurrentSize}),
            S#state{event_tab_size = CurrentSize}
    end.