1%%
2%% %CopyrightBegin%
3%%
4%% Copyright Ericsson AB 2000-2018. All Rights Reserved.
5%%
6%% Licensed under the Apache License, Version 2.0 (the "License");
7%% you may not use this file except in compliance with the License.
8%% You may obtain a copy of the License at
9%%
10%%     http://www.apache.org/licenses/LICENSE-2.0
11%%
12%% Unless required by applicable law or agreed to in writing, software
13%% distributed under the License is distributed on an "AS IS" BASIS,
14%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15%% See the License for the specific language governing permissions and
16%% limitations under the License.
17%%
18%% %CopyrightEnd%
19%%
20%%----------------------------------------------------------------------
21%% Purpose: Collect trace events and provide a backing storage
22%%          appropriate for iteration
23%%----------------------------------------------------------------------
24
25-module(et_collector).
26
27-behaviour(gen_server).
28
29%% External exports
30-export([start_link/1,
31         stop/1,
32
33         report/2,
34         report_event/5,
35         report_event/6,
36
37         iterate/3,
38         iterate/5,
39	 lookup/2,
40
41         start_trace_client/3,
42         start_trace_port/1,
43         save_event_file/3,
44         clear_table/1,
45
46         get_global_pid/0,
47	 get_table_size/1,
48         change_pattern/2,
49         make_key/2,
50
51         dict_insert/3,
52         dict_delete/2,
53         dict_lookup/2,
54         dict_match/2,
55         multicast/2]).
56
57%% Internal export
58-export([monitor_trace_port/2]).
59
60%% gen_server callbacks
61-export([init/1,terminate/2, code_change/3,
62         handle_call/3, handle_cast/2, handle_info/2]).
63
64-compile([{nowarn_deprecated_function,[{erlang,now,0}]}]).
65
66-include("et_internal.hrl").
67-include("../include/et.hrl").
68
%% State of the collector server process.
-record(state, {
          parent_pid,       % pid() | undefined (see the parent_pid option)
          auto_shutdown,    % Optionally shutdown when the last subscriber dies
          event_tab_size,   % NOTE(review): not set by default_state/0; presumably the number of stored events - confirm
          event_tab,        % NOTE(review): presumably the table holding the stored events - confirm
          dict_tab,         % NOTE(review): presumably the table behind the dict_* functions - confirm
          event_order,      % trace_ts | event_ts
          subscribers,      % list, [] by default
          file,             % NOTE(review): presumably a #file{} record - confirm
          trace_pattern,    % undefined | {Module, Pattern}
          trace_port,       % integer(), 4711 by default
          trace_max_queue,  % integer(), 50 by default
          trace_nodes,      % list, [] by default
          trace_global      % boolean(), false by default
         }).

%% Description of an event file and the options it is handled with.
-record(file, {
          name,
          desc,
          event_opt,
          file_opt,
          table_opt
         }).

%% Handle given to clients; used by report/2, iterate/5 and lookup/2
%% to reach the event table without asking the collector every time.
-record(table_handle, {
          collector_pid,
          event_tab,
          event_order,
          filter
         }).

%% Event table keys. Both carry the same two timestamps; the record
%% tag and the field order differ.
-record(trace_ts, {trace_ts, event_ts}).
-record(event_ts, {event_ts, trace_ts}).
89
90%%%----------------------------------------------------------------------
91%%% Client side
92%%%----------------------------------------------------------------------
93
94%%----------------------------------------------------------------------
95%% start_link(Options) -> {ok, CollectorPid} | {error, Reason}
96%%
97%% Start a collector process
98%%
99%% The collector collects trace events and keeps them ordered by their
100%% timestamp. The timestamp may either reflect the time when the
101%% actual trace data was generated (trace_ts) or when the trace data
102%% was transformed into an event record (event_ts). If the time stamp
103%% is missing in the trace data (missing timestamp option to
104%% erlang:trace/4) the trace_ts will be set to the event_ts.
105%%
106%% Events are reported to the collector directly with the report
107%% function or indirectly via one or more trace clients. All reported
108%% events are first filtered thru the collector filter before they are
109%% stored by the collector. By replacing the default collector filter
%% with a customized ditto it is possible to allow any trace data as
111%% input. The collector filter is a dictionary entry with the
112%% predefined key {filter, all} and the value is a fun of
113%% arity 1. See et_selector:parse_event/2 for interface details,
114%% such as which erlang:trace/1 tuples that are accepted.
115%%
116%% The collector has a built-in dictionary service. Any term may be
117%% stored as value in the dictionary and bound to a unique key. When
118%% new values are inserted with an existing key, the new values will
119%% overwrite the existing ones. Processes may subscribe on dictionary
120%% updates by using {subscriber, pid()} as dictionary key. All
121%% dictionary updates will be propagated to the subscriber processes
122%% matching the pattern {{subscriber, '_'}, '_'} where the first '_'
123%% is interpreted as a pid().
124%%
125%% In global trace mode, the collector will automatically start
126%% tracing on all connected Erlang nodes. When a node connects, a port
127%% tracer will be started on that node and a corresponding trace
128%% client on the collector node. By default the global trace pattern
129%% is 'max'.
130%%
131%% Options = [option()]
132%%
133%% option() =
134%%   {parent_pid, pid()} |
135%%   {event_order, event_order()} |
136%%   {dict_insert, {filter, all}, collector_fun()} |
137%%   {dict_insert, {filter, event_filter_name()}, event_filter_fun()} |
138%%   {dict_insert, {subscriber, pid()}, dict_val()} |
139%%   {dict_insert, dict_key(), dict_val()} |
140%%   {dict_delete, dict_key()} |
141%%   {trace_client, trace_client()} |
142%%   {trace_global, boolean()} |
143%%   {trace_pattern, trace_pattern()} |
144%%   {trace_port, integer()} |
145%%   {trace_max_queue, integer()}
146%%
147%% event_order() = trace_ts | event_ts
148%% trace_pattern() = detail_level() | dbg_match_spec()
149%% detail_level() = min | max | integer(X) when X >= 0, X =< 100
150%% trace_client() =
151%%   {event_file, file_name()} |
152%%   {dbg_trace_type(), dbg_trace_parameters()}
153%% file_name() = string()
154%% collector_fun() = trace_filter_fun() | event_filter_fun()
155%% trace_filter_fun() = fun(TraceData) -> false | true | {true, NewEvent}
156%% event_filter_fun() = fun(Event) -> false | true | {true, NewEvent}
157%% event_filter_name() = atom()
158%% TraceData = erlang_trace_data()
159%% Event = NewEvent = record(event)
160%% dict_key() = term()
161%% dict_val() = term()
162%%
163%% CollectorPid = pid()
164%% Reason = term()
165%%----------------------------------------------------------------------
166
%% Parse the options, start the collector server (globally registered
%% under the module name when trace_global is true) and finally start
%% the trace clients requested with {trace_client, _} options.
start_link(Options) ->
    case parse_opt(Options, default_state(), [], []) of
        {error, Reason} ->
            {error, Reason};
        {ok, S, Dict2, Clients} ->
            InitArgs = [S, Dict2],
            StartRes =
                case S#state.trace_global of
                    true ->
                        gen_server:start_link({global, ?MODULE}, ?MODULE, InitArgs, []);
                    false ->
                        gen_server:start_link(?MODULE, InitArgs, [])
                end,
            case StartRes of
                {error, StartReason} ->
                    {error, StartReason};
                {ok, Pid} ->
                    %% Stay linked only when the caller is the designated parent
                    case S#state.parent_pid =:= self() of
                        true  -> ok;
                        false -> unlink(Pid)
                    end,
                    start_clients(Pid, Clients)
            end
    end.
189
%% Initial collector state, before any options have been applied.
%% The calling process is assumed to be the parent.
default_state() ->
    #state{parent_pid      = self(),
           auto_shutdown   = false,
           subscribers     = [],
           event_order     = trace_ts,
           trace_pattern   = undefined,
           trace_global    = false,
           trace_nodes     = [],
           trace_port      = 4711,
           trace_max_queue = 50}.
200
%% parse_opt(Options, State, Dict, Clients) ->
%%     {ok, NewState, DictOps, TraceClients} | {error, Reason}
%%
%% Walks the option list given to start_link/1. State options are
%% folded into the state record, dict_insert/dict_delete options are
%% collected in the order they were given and trace_client options
%% are collected as {Type, Parameters} tuples.
%%
%% When the list is exhausted, the trace pattern is expanded with
%% et_selector:make_pattern/1 and a default collector filter is put
%% first among the dictionary operations, which lets a filter given
%% by the caller under the same key override it.
%%
%% Reason = {bad_option, Option} | {bad_option_list, Term}
parse_opt([], S, Dict, Clients) ->
    {Mod, Pattern} = et_selector:make_pattern(S#state.trace_pattern),
    Fun = fun(E) -> et_selector:parse_event(Mod, E) end,
    Default = {dict_insert, {filter, ?DEFAULT_FILTER_NAME}, Fun},
    {ok, S#state{trace_pattern = {Mod, Pattern}}, [Default | Dict], Clients};
parse_opt([H | T], S, Dict, Clients) ->
    case H of
        {parent_pid, Parent} when Parent =:= undefined; is_pid(Parent) ->
            parse_opt(T, S#state{parent_pid = Parent}, Dict, Clients);
        {auto_shutdown, Bool} when is_boolean(Bool) ->
            parse_opt(T, S#state{auto_shutdown = Bool}, Dict, Clients);
        {event_order, Order} when Order =:= trace_ts; Order =:= event_ts ->
            parse_opt(T, S#state{event_order = Order}, Dict, Clients);
        {dict_insert, {filter, Name}, Fun} when is_atom(Name), is_function(Fun) ->
            parse_opt(T, S, Dict ++ [H], Clients);
        {dict_insert, {filter, _Name}, _Fun} ->
            %% A filter must be named by an atom and be a fun
            {error, {bad_option, H}};
        {dict_insert, {subscriber, Pid}, _Val} when is_pid(Pid) ->
            parse_opt(T, S, Dict ++ [H], Clients);
        {dict_insert, {subscriber, _NotPid}, _Val} ->
            %% A subscriber must be identified by its pid
            {error, {bad_option, H}};
        {dict_insert, _Key, _Val} ->
            parse_opt(T, S, Dict ++ [H], Clients);
        {dict_delete, _Key} ->
            parse_opt(T, S, Dict ++ [H], Clients);
        {trace_client, Client = {_, _}} ->
            parse_opt(T, S, Dict, Clients ++ [Client]);
        {trace_global, Bool} when is_boolean(Bool) ->
            parse_opt(T, S#state{trace_global = Bool}, Dict, Clients);
        {trace_pattern, {Mod, _} = Pattern} when is_atom(Mod) ->
            parse_opt(T, S#state{trace_pattern = Pattern}, Dict, Clients);
        {trace_pattern, undefined = Pattern} ->
            parse_opt(T, S#state{trace_pattern = Pattern}, Dict, Clients);
        {trace_port, Port} when is_integer(Port) ->
            parse_opt(T, S#state{trace_port = Port}, Dict, Clients);
        {trace_max_queue, MaxQueue} when is_integer(MaxQueue) ->
            %% Must update trace_max_queue; this option used to be
            %% stored in trace_port, clobbering the port number and
            %% leaving the queue size at its default.
            parse_opt(T, S#state{trace_max_queue = MaxQueue}, Dict, Clients);
        Bad ->
            {error, {bad_option, Bad}}
    end;
parse_opt(BadList, _S, _Dict, _Clients) ->
    {error, {bad_option_list, BadList}}.
255
%% Start one trace client per {Type, Parameters} tuple, in list order.
%% The result of each individual start is ignored.
start_clients(CollectorPid, Clients) ->
    StartOne = fun({Type, Parameters}) ->
                       _ = start_trace_client(CollectorPid, Type, Parameters)
               end,
    lists:foreach(StartOne, Clients),
    {ok, CollectorPid}.
261
262%%----------------------------------------------------------------------
263%% stop(CollectorPid) -> ok
264%%
265%% Stop a collector process
266%%
267%% CollectorPid = pid()
268%%----------------------------------------------------------------------
269
%% Send a synchronous stop request to the collector.
stop(CollectorPid) ->
    Request = stop,
    call(CollectorPid, Request).
272
273%%----------------------------------------------------------------------
274%% save_event_file(CollectorPid, FileName, Options) -> ok | {error, Reason}
275%%
276%% Saves the events to a file
277%%
278%% CollectorPid = pid()
279%% FileName = string()
280%% Options = [option()]
281%% Reason = term()
282%%
283%% option() = event_option() | file_option() | table_option()
284%% event_option() = existing
285%% file_option() = write | append
286%% table_option() = keep | clear
287%%
288%% By default the currently stored events (existing) are
289%% written to a brand new file (write) and the events are
290%% kept (keep) after they have been written to the file.
291%%
292%% Instead of keeping the events after writing them to file,
293%% it is possible to remove all stored events after they
294%% have successfully written to file (clear).
295%%
296%% The options defaults to existing, write and keep.
297%%----------------------------------------------------------------------
298
%% Ask the collector to save its events to FileName; the options are
%% passed on to the collector untouched.
save_event_file(CollectorPid, FileName, Options) ->
    Request = {save_event_file, FileName, Options},
    call(CollectorPid, Request).
301
302%%----------------------------------------------------------------------
303%% load_event_file(CollectorPid, FileName) ->{ok, BadBytes} | exit(Reason)
304%%
305%% Load the event table from a file
306%%
307%% CollectorPid = pid()
308%% FileName = string()
309%% BadBytes = integer(X) where X >= 0
310%% Reason = term()
311%%----------------------------------------------------------------------
312
%% Open FileName as a read-only disk_log and report every logged event
%% to the collector.
%%
%% Returns {ok, BadBytes} where BadBytes is an integer: the bytes
%% disk_log found bad when opening the log plus those reported while
%% reading it. Exits with {disk_log_open, FileName, Reason} if the log
%% cannot be opened and with {bad_disk_log_chunk, FileName, Reason} if
%% a chunk cannot be read.
load_event_file(CollectorPid, FileName) ->
    Fd = make_ref(),
    Args = [{file, FileName}, {name, Fd}, {repair, true}, {mode, read_only}],
    Fun = fun(Event, {ok, TH}) -> report(TH, Event) end,
    Load =
        fun(InitialBadBytes) ->
                try
                    do_load_event_file(Fun, Fd, start, {ok, CollectorPid},
                                       FileName, InitialBadBytes)
                after
                    %% The log is only used within this call; close it
                    %% also when loading exits, so it does not stay open
                    %% for as long as the calling process lives.
                    _ = disk_log:close(Fd)
                end
        end,
    case disk_log:open(Args) of
        {ok, _} ->
            Load(0);
        {repaired, _, _, {badbytes, BadBytes}} ->
            %% The fourth element is the tagged tuple {badbytes, N};
            %% unwrap it so the count stays an integer.
            Load(BadBytes);
        {error, Reason} ->
            exit({disk_log_open, FileName, Reason})
    end.
325
%% Read the disk_log chunk by chunk, folding Fun over the events of
%% each chunk, until eof. The number of bad bytes reported with the
%% chunks is added to BadBytes. Exits if a chunk cannot be read.
do_load_event_file(Fun, Fd, Cont, Acc, FileName, BadBytes) ->
    case disk_log:chunk(Fd, Cont) of
        {NextCont, Events, ChunkBadBytes} ->
            NextAcc = lists:foldl(Fun, Acc, Events),
            do_load_event_file(Fun, Fd, NextCont, NextAcc, FileName,
                               BadBytes + ChunkBadBytes);
        eof ->
            {ok, BadBytes};
        {error, Reason} ->
            %% Must be tried before the generic two-tuple below
            exit({bad_disk_log_chunk, FileName, Reason});
        {NextCont, Events} ->
            NextAcc = lists:foldl(Fun, Acc, Events),
            do_load_event_file(Fun, Fd, NextCont, NextAcc, FileName, BadBytes)
    end.
339
340%%----------------------------------------------------------------------
341%% report(Handle, TraceOrEvent)
342%%
343%% Report an event to the collector
344%%
345%% All events are filtered thru the collector filter, which
346%% optionally may transform or discard the event. The first
347%% call should use the pid of the collector process as
348%% report handle, while subsequent calls should use the
349%% table handle.
350%%
351%% Handle = Initial | Continuation
352%% Initial = collector_pid()
353%% collector_pid() = pid()
354%% Continuation = record(table_handle)
355%%
356%% TraceOrEvent = record(event) | dbg_trace_tuple() | end_of_trace
357%% Reason = term()
358%%
359%% Returns: {ok, Continuation} | exit(Reason)
360%%----------------------------------------------------------------------
361
%% With a collector pid as handle, a table handle is fetched first.
%% With a table handle, the trace data is run through the filter of
%% the handle and the resulting event, if any, is inserted directly
%% into the event table. A filter result that is neither false, true
%% (for an event record) nor {true, Event} is stored as a bad_filter
%% event. If the insert fails, a fresh handle is fetched from the
%% collector and the whole report is retried.
report(CollectorPid, TraceOrEvent) when is_pid(CollectorPid) ->
    case get_table_handle(CollectorPid) of
        {error, Reason} ->
            exit(Reason);
        {ok, TH} when is_record(TH, table_handle) ->
            report(TH, TraceOrEvent)
    end;
report(TH, TraceOrEvent) when is_record(TH, table_handle) ->
    #table_handle{collector_pid = CollectorPid,
                  event_tab     = Tab,
                  filter        = Filter} = TH,
    Store =
        fun(E) ->
                Key = make_key(TH, E),
                case catch ets:insert(Tab, {Key, E}) of
                    true ->
                        {ok, TH};
                    {'EXIT', _Reason} ->
                        %% Refresh the report handle and try again
                        report(CollectorPid, TraceOrEvent)
                end
        end,
    case Filter(TraceOrEvent) of
        false ->
            {ok, TH};
        true when is_record(TraceOrEvent, event) ->
            Store(TraceOrEvent);
        {true, Event} when is_record(Event, event) ->
            Store(Event);
        BadEvent ->
            Now = erlang:now(),
            Contents = [{trace, TraceOrEvent}, {reason, BadEvent}, {filter, Filter}],
            Store(#event{detail_level = 0,
                         trace_ts     = Now,
                         event_ts     = Now,
                         from         = bad_filter,
                         to           = bad_filter,
                         label        = bad_filter,
                         contents     = Contents})
    end;
report(_, Bad) ->
    exit({bad_event, Bad}).
413
%% Report an event whose sender and receiver are the same actor.
report_event(CollectorPid, DetailLevel, FromTo, Label, Contents) ->
    From = FromTo,
    To = FromTo,
    report_event(CollectorPid, DetailLevel, From, To, Label, Contents).
416
%% Build an event record, stamped with the current time as both trace
%% and event timestamp, and report it. The detail level must be an
%% integer within the limits given by the detail_level_min and
%% detail_level_max macros.
report_event(CollectorPid, DetailLevel, From, To, Label, Contents)
  when is_integer(DetailLevel),
       DetailLevel >= ?detail_level_min,
       DetailLevel =< ?detail_level_max ->
    Now = erlang:now(),
    report(CollectorPid,
           #event{detail_level = DetailLevel,
                  trace_ts     = Now,
                  event_ts     = Now,
                  from         = From,
                  to           = To,
                  label        = Label,
                  contents     = Contents}).
430
431%%----------------------------------------------------------------------
432%% make_key(Type, Stuff) -> Key
433%%
434%% Makes a key out of an event record or an old key
435%%
436%% Type = record(table_handle) | trace_ts | event_ts
437%% Stuff = record(event) | Key
438%% Key = record(event_ts) | record(trace_ts)
439%%----------------------------------------------------------------------
440
%% A table handle selects the key type through its event_order field.
%% A key of the requested type is returned as is; an event record or a
%% key of the other type is converted by copying its two timestamps.
make_key(TH, Stuff) when is_record(TH, table_handle) ->
    #table_handle{event_order = Order} = TH,
    make_key(Order, Stuff);
make_key(trace_ts, Stuff) ->
    if
        is_record(Stuff, trace_ts) ->
            Stuff;
        is_record(Stuff, event) ->
            #trace_ts{trace_ts = Stuff#event.trace_ts,
                      event_ts = Stuff#event.event_ts};
        is_record(Stuff, event_ts) ->
            #trace_ts{trace_ts = Stuff#event_ts.trace_ts,
                      event_ts = Stuff#event_ts.event_ts}
    end;
make_key(event_ts, Stuff) ->
    if
        is_record(Stuff, event_ts) ->
            Stuff;
        is_record(Stuff, event) ->
            #event_ts{trace_ts = Stuff#event.trace_ts,
                      event_ts = Stuff#event.event_ts};
        is_record(Stuff, trace_ts) ->
            #event_ts{trace_ts = Stuff#trace_ts.trace_ts,
                      event_ts = Stuff#trace_ts.event_ts}
    end.
465
466%%----------------------------------------------------------------------
467%%----------------------------------------------------------------------
468
%% Ask the collector for the size of its event table.
get_table_size(CollectorPid) when is_pid(CollectorPid) ->
    Request = get_table_size,
    call(CollectorPid, Request).
471
472%%----------------------------------------------------------------------
%% get_table_handle(CollectorPid) -> {ok, Handle} | {error, Reason}
474%%
475%% Return a table handle
476%%
477%% CollectorPid = pid()
478%% Handle = record(table_handle)
479%%----------------------------------------------------------------------
480
%% Ask the collector for a table handle. The callers in this module
%% expect {ok, Handle} or {error, Reason} back.
get_table_handle(CollectorPid) when is_pid(CollectorPid) ->
    Request = get_table_handle,
    call(CollectorPid, Request).
483
484%%----------------------------------------------------------------------
485%% get_global_pid() -> CollectorPid | exit(Reason)
486%%
%% Return the identity of the globally registered collector
488%% if there is any
489%%
490%% CollectorPid = pid()
491%% Reason = term()
492%%----------------------------------------------------------------------
493
%% Look up the collector registered globally under the module name.
%% Exits with global_collector_not_started if there is none.
get_global_pid() ->
    case global:whereis_name(?MODULE) of
        undefined ->
            exit(global_collector_not_started);
        Pid when is_pid(Pid) ->
            Pid
    end.
501
502%%----------------------------------------------------------------------
503%% change_pattern(CollectorPid, RawPattern) -> {old_pattern, TracePattern}
504%%
505%% Change active trace pattern globally on all trace nodes
506%%
507%% CollectorPid = pid()
508%% RawPattern = {report_module(), extended_dbg_match_spec()}
509%% report_module() = atom() | undefined
510%% extended_dbg_match_spec() = detail_level() | dbg_match_spec()
511%% RawPattern = detail_level()
%% detail_level() = min | max | integer(X) when X >= 0, X =< 100
513%% TracePattern = {report_module(), dbg_match_spec_match_spec()}
514%%----------------------------------------------------------------------
515
%% Expand the raw pattern with et_selector:make_pattern/1 and hand the
%% result over to the collector.
change_pattern(CollectorPid, RawPattern) ->
    call(CollectorPid, {change_pattern, et_selector:make_pattern(RawPattern)}).
519
520%%----------------------------------------------------------------------
521%% dict_insert(CollectorPid, {filter, all}, FilterFun) -> ok
522%% dict_insert(CollectorPid, {subscriber, SubscriberPid}, Void) -> ok
523%% dict_insert(CollectorPid, Key, Val) -> ok
524%%
525%% Insert a dictionary entry
526%% and send a {et, {dict_insert, Key, Val}} tuple
527%% to all registered subscribers.
528%%
529%% If the entry is a new subscriber, it will imply that
530%% the new subscriber process first will get one message
531%% for each already stored dictionary entry, before it
532%% and all old subscribers will get this particular entry.
533%% The collector process links to and then supervises the
534%% subscriber process. If the subscriber process dies it
535%% will imply that it gets unregistered as with a normal
536%% dict_delete/2.
537%%
538%% CollectorPid = pid()
539%% FilterFun = filter_fun()
540%% SubscriberPid = pid()
541%% Void = term()
542%% Key = term()
543%% Val = term()
544%%----------------------------------------------------------------------
545
%% Filter and subscriber keys are validated before the request is sent:
%% a filter must be named by an atom and be a fun, and a subscriber
%% must be identified by a pid. Anything else under those two keys
%% exits with {badarg, Key}. All other keys are passed on as they are.
dict_insert(CollectorPid, {filter, Name} = Key, Fun)
  when is_atom(Name), is_function(Fun) ->
    call(CollectorPid, {dict_insert, Key, Fun});
dict_insert(_CollectorPid, {filter, _Name} = Key, _Fun) ->
    exit({badarg, Key});
dict_insert(CollectorPid, {subscriber, Pid} = Key, Val) when is_pid(Pid) ->
    call(CollectorPid, {dict_insert, Key, Val});
dict_insert(_CollectorPid, {subscriber, _NotPid} = Key, _Val) ->
    exit({badarg, Key});
dict_insert(CollectorPid, Key, Val) ->
    call(CollectorPid, {dict_insert, Key, Val}).
562
563%%----------------------------------------------------------------------
564%% dict_lookup(CollectorPid, Key) -> [Val]
565%%
566%% Lookup a dictionary entry and return zero or one value
567%%
568%% CollectorPid = pid()
569%% Key = term()
570%% Val = term()
571%%----------------------------------------------------------------------
572
%% Ask the collector for the value(s) stored under Key.
dict_lookup(CollectorPid, Key) ->
    Request = {dict_lookup, Key},
    call(CollectorPid, Request).
575
576%%----------------------------------------------------------------------
%% dict_delete(CollectorPid, Key) -> ok
%%
%% Delete a dictionary entry
580%% and send a {et, {dict_delete, Key}} tuple
581%% to all registered subscribers.
582%%
%% If the deleted entry is a registered subscriber, it will
%% imply that the subscriber process gets unregistered as
%% subscriber as well as it gets its final message.
586%%
587%% dict_delete(CollectorPid, {subscriber, SubscriberPid})
588%% dict_delete(CollectorPid, Key)
589%%
590%% CollectorPid = pid()
591%% SubscriberPid = pid()
592%% Key = term()
593%%----------------------------------------------------------------------
594
%% Ask the collector to delete the dictionary entry stored under Key.
dict_delete(CollectorPid, Key) ->
    Request = {dict_delete, Key},
    call(CollectorPid, Request).
597
598%%----------------------------------------------------------------------
599%% dict_match(CollectorPid, Pattern) -> [Match]
600%%
601%% Match some dictionary entries
602%%
603%% CollectorPid = pid()
604%% Pattern = '_' | {key_pattern(), val_pattern()}
605%% key_pattern() = ets_match_object_pattern()
606%% val_pattern() = ets_match_object_pattern()
607%% Match = {key(), val()}
608%% key() = term()
609%% val() = term()
610%%----------------------------------------------------------------------
611
%% Ask the collector for the dictionary entries matching Pattern.
dict_match(CollectorPid, Pattern) ->
    Request = {dict_match, Pattern},
    call(CollectorPid, Request).
614
615%%----------------------------------------------------------------------
%% multicast(CollectorPid, Msg) -> ok
617%%
618%% Sends a message to all registered subscribers
619%%
620%% CollectorPid = pid()
621%% Msg = term()
622%%----------------------------------------------------------------------
623
%% Messages shaped like dictionary updates are refused with
%% exit({badarg, Msg}); everything else is handed to the collector
%% for distribution.
multicast(CollectorPid, Msg) ->
    case Msg of
        {dict_insert, _, _} ->
            exit({badarg, Msg});
        {dict_delete, _} ->
            exit({badarg, Msg});
        _ ->
            call(CollectorPid, {multicast, Msg})
    end.
630
631%%----------------------------------------------------------------------
632%% start_trace_client(CollectorPid, Type, Parameters) ->
633%%     file_loaded | {trace_client_pid, pid()} | exit(Reason)
634%%
635%% Load raw Erlang trace from a file, port or process.
636%%
637%% Type       = dbg_trace_client_type()
638%% Parameters = dbg_trace_client_parameters()
639%% Pid        = dbg_trace_client_pid()
640%%----------------------------------------------------------------------
641
%% event_file: the file is loaded directly with load_event_file/2.
%%
%% file: a dbg trace client reads the file and reports each trace to
%% the collector. The caller waits until the client has signalled end
%% of trace (file_loaded) or has died (exit with its reason).
%%
%% Any other type: a dbg trace client is started and announced to the
%% collector with a register_trace_client message; its pid is returned
%% without waiting for it.
start_trace_client(CollectorPid, event_file, FileName) ->
    load_event_file(CollectorPid, FileName);
start_trace_client(CollectorPid, file, FileName) ->
    Done = {make_ref(), end_of_trace},
    OnEvent = fun(Trace, {Caller, {ok, TH}}) -> {Caller, report(TH, Trace)} end,
    OnEnd = fun({Caller, {ok, _TH}}) -> Caller ! Done, Caller end,
    Spec = trace_spec_wrapper(OnEvent, OnEnd, {self(), {ok, CollectorPid}}),
    ClientPid = dbg:trace_client(file, FileName, Spec),
    unlink(ClientPid),
    MRef = erlang:monitor(process, ClientPid),
    receive
        Done ->
            erlang:demonitor(MRef, [flush]),
            file_loaded;
        {'DOWN', MRef, _, _, Reason} ->
            exit(Reason)
    end;
start_trace_client(CollectorPid, Type, Parameters) ->
    OnEvent = fun(Trace, {ok, TH}) -> report(TH, Trace) end,
    OnEnd = fun(Acc) -> Acc end,
    Spec = trace_spec_wrapper(OnEvent, OnEnd, {ok, CollectorPid}),
    ClientPid = dbg:trace_client(Type, Parameters, Spec),
    CollectorPid ! {register_trace_client, ClientPid},
    unlink(ClientPid),
    {trace_client_pid, ClientPid}.
667
%% Build a {HandlerFun, InitialAcc} pair. The handler calls EndFun(Acc)
%% for the end_of_trace marker and EventFun(Trace, Acc) for everything
%% else.
trace_spec_wrapper(EventFun, EndFun, EventInitialAcc)
  when is_function(EventFun), is_function(EndFun) ->
    Handler = fun(end_of_trace, Acc) -> EndFun(Acc);
                 (Trace, Acc)        -> EventFun(Trace, Acc)
              end,
    {Handler, EventInitialAcc}.
677
%% Start a dbg port tracer that sends its trace over an ip trace port
%% created from Parameters.
start_trace_port(Parameters) ->
    PortFun = dbg:trace_port(ip, Parameters),
    dbg:tracer(port, PortFun).
680
%% Start a trace port and spawn a watcher process that monitors the
%% collector; when the collector goes down, the watcher stops and
%% clears dbg tracing. Returns the result of start_trace_port/1.
monitor_trace_port(CollectorPid, Parameters) ->
    Res = start_trace_port(Parameters),
    Watcher =
        fun() ->
                MonitorRef = erlang:monitor(process, CollectorPid),
                receive
                    {'DOWN', MonitorRef, _, _, _} ->
                        dbg:stop_clear()
                end
        end,
    spawn(Watcher),
    Res.
691
692%%----------------------------------------------------------------------
693%% iterate(Handle, Prev, Limit) ->
694%%     iterate(Handle, Prev, Limit, undefined, Prev)
695%%
696%% Iterates over the currently stored events
697%%
698%% Short for iterate/5.
699%%----------------------------------------------------------------------
700
%% Iterate without a fun: the accumulator starts out as Prev and is
%% replaced by the key of each visited event.
iterate(Handle, Prev, Limit) ->
    NoFun = undefined,
    iterate(Handle, Prev, Limit, NoFun, Prev).
703
704%%----------------------------------------------------------------------
705%% iterate(Handle, Prev, Limit, Fun, Acc) -> NewAcc
706%%
707%% Iterates over the currently stored events and apply a function for
708%% each event. The iteration may be performed forwards or backwards
709%% and may be limited to a maximum number of events (abs(Limit)).
710%%
711%% Handle = collector_pid() | table_handle()
712%% Prev = first | last | event_key()
713%% Limit = done() | forward() | backward()
714%% collector_pid() = pid()
715%% table_handle() = record(table_handle)
%% event_key() = record(event) | record(event_ts) | record(trace_ts)
717%% done() = 0
718%% forward() = infinity | integer(X) where X > 0
719%% backward() = '-infinity' | integer(X) where X < 0
720%% Fun = fun(Event, Acc) -> NewAcc
721%% Acc = NewAcc = term()
722%%----------------------------------------------------------------------
723
%% A limit of 0 ends the iteration. A collector pid is first exchanged
%% for a table handle. With a table handle, a positive limit (or
%% infinity) steps forwards and a negative limit (or '-infinity')
%% steps backwards.
iterate(_Handle, _Prev, 0, _Fun, Acc) ->
    Acc;
iterate(CollectorPid, Prev, Limit, Fun, Acc) when is_pid(CollectorPid) ->
    case get_table_handle(CollectorPid) of
        {error, Reason} ->
            exit(Reason);
        {ok, TH} when is_record(TH, table_handle) ->
            iterate(TH, Prev, Limit, Fun, Acc)
    end;
iterate(TH, Prev, Limit, Fun, Acc) when is_record(TH, table_handle) ->
    if
        Limit =:= infinity; is_integer(Limit), Limit > 0 ->
            next_iterate(TH, Prev, Limit, Fun, Acc);
        Limit =:= '-infinity'; is_integer(Limit), Limit < 0 ->
            prev_iterate(TH, Prev, Limit, Fun, Acc)
    end.
744
%% One forward step. The key to visit is the first key, the last key
%% or the key following Prev, depending on Prev. At the end of the
%% table the accumulator is returned. If the table cannot be accessed,
%% the error is printed and the iteration is restarted from the
%% collector pid, which fetches a fresh table handle.
next_iterate(TH, first, Limit, Fun, Acc) ->
    case catch ets:first(TH#table_handle.event_tab) of
        {'EXIT', _} = Error ->
            io:format("~p(~p): First ~tp~n", [?MODULE, ?LINE, Error]),
            iterate(TH#table_handle.collector_pid, first, Limit, Fun, Acc);
        '$end_of_table' ->
            Acc;
        FirstKey ->
            lookup_and_apply(TH, first, FirstKey, Limit, -1, Fun, Acc)
    end;
next_iterate(TH, last, Limit, Fun, Acc) ->
    case catch ets:last(TH#table_handle.event_tab) of
        {'EXIT', _} = Error ->
            io:format("~p(~p): Last ~tp~n", [?MODULE, ?LINE, Error]),
            iterate(TH#table_handle.collector_pid, last, Limit, Fun, Acc);
        '$end_of_table' ->
            Acc;
        LastKey ->
            lookup_and_apply(TH, last, LastKey, Limit, -1, Fun, Acc)
    end;
next_iterate(TH, Prev, Limit, Fun, Acc) ->
    PrevKey = make_key(TH, Prev),
    case catch ets:next(TH#table_handle.event_tab, PrevKey) of
        {'EXIT', _} = Error ->
            io:format("~p(~p): Next ~tp -> ~tp~n", [?MODULE, ?LINE, PrevKey, Error]),
            iterate(TH#table_handle.collector_pid, Prev, Limit, Fun, Acc);
        '$end_of_table' ->
            Acc;
        NextKey ->
            lookup_and_apply(TH, Prev, NextKey, Limit, -1, Fun, Acc)
    end.
779
%% One backward step. The key to visit is the first key, the last key
%% or the key preceding Prev, depending on Prev. At the end of the
%% table the accumulator is returned. If the table cannot be accessed,
%% the error is printed and the iteration is restarted from the
%% collector pid, which fetches a fresh table handle.
prev_iterate(TH, first, Limit, Fun, Acc) ->
    case catch ets:first(TH#table_handle.event_tab) of
        {'EXIT', _} = Error ->
            io:format("~p(~p): First ~tp~n", [?MODULE, ?LINE, Error]),
            iterate(TH#table_handle.collector_pid, first, Limit, Fun, Acc);
        '$end_of_table' ->
            Acc;
        FirstKey ->
            lookup_and_apply(TH, first, FirstKey, Limit, 1, Fun, Acc)
    end;
prev_iterate(TH, last, Limit, Fun, Acc) ->
    case catch ets:last(TH#table_handle.event_tab) of
        {'EXIT', _} = Error ->
            io:format("~p(~p): Last ~tp~n", [?MODULE, ?LINE, Error]),
            iterate(TH#table_handle.collector_pid, last, Limit, Fun, Acc);
        '$end_of_table' ->
            Acc;
        LastKey ->
            lookup_and_apply(TH, last, LastKey, Limit, 1, Fun, Acc)
    end;
prev_iterate(TH, Prev, Limit, Fun, Acc) ->
    FromKey = make_key(TH, Prev),
    case catch ets:prev(TH#table_handle.event_tab, FromKey) of
        {'EXIT', _} = Error ->
            io:format("~p(~p): Prev ~tp -> ~tp~n", [?MODULE, ?LINE, FromKey, Error]),
            iterate(TH#table_handle.collector_pid, Prev, Limit, Fun, Acc);
        '$end_of_table' ->
            Acc;
        PrevKey ->
            lookup_and_apply(TH, Prev, PrevKey, Limit, 1, Fun, Acc)
    end.
814
%% Visit the event stored under the key Next and continue iterating
%% with the limit moved one step towards zero. Without a fun, the key
%% itself becomes the new accumulator and the event is not read. With
%% a fun, the event is read and folded into the accumulator; if the
%% read fails, the iteration is restarted from Prev using the
%% collector pid.
lookup_and_apply(TH, _Prev, Next, Limit, Incr, undefined, _Acc) ->
    NewLimit = incr(Limit, Incr),
    iterate(TH, Next, NewLimit, undefined, Next);
lookup_and_apply(TH, Prev, Next, Limit, Incr, Fun, Acc) ->
    case catch ets:lookup_element(TH#table_handle.event_tab, Next, 2) of
        Event when is_record(Event, event) ->
            NewAcc = Fun(Event, Acc),
            NewLimit = incr(Limit, Incr),
            iterate(TH, Next, NewLimit, Fun, NewAcc);
        {'EXIT', _} ->
            iterate(TH#table_handle.collector_pid, Prev, Limit, Fun, Acc)
    end.
828
%% Fetch the event stored under Key.
%%
%% Accepts either a collector pid (the table handle is requested first)
%% or a table handle. Returns {ok, Event}, {error, enoent} when the table
%% read fails, or the error returned by get_table_handle/1.
lookup(CollectorPid, Key) when is_pid(CollectorPid) ->
    case get_table_handle(CollectorPid) of
        {error, Reason} ->
            {error, Reason};
        {ok, TH} when is_record(TH, table_handle) ->
            lookup(TH, Key)
    end;
lookup(TH, Key) when is_record(TH, table_handle) ->
    EventTab = TH#table_handle.event_tab,
    case catch ets:lookup_element(EventTab, Key, 2) of
        Event when is_record(Event, event) ->
            {ok, Event};
        {'EXIT', _} ->
            {error, enoent}
    end.
844
%% Step an iteration limit by Incr. The unbounded limits infinity and
%% '-infinity' are returned untouched; integer limits are added to.
incr(Val, Incr) ->
    if
        is_integer(Val) ->
            Val + Incr;
        Val =:= infinity; Val =:= '-infinity' ->
            Val
    end.
851
852%%----------------------------------------------------------------------
853%% clear_table(Handle) -> ok
854%%
855%% Clear the event table
856%%
857%% Handle = collector_pid() | table_handle()
858%% collector_pid() = pid()
859%% table_handle() = record(table_handle)
860%%----------------------------------------------------------------------
861
%% A table handle is resolved to its collector pid; the pid is then asked
%% to clear the event table with a synchronous clear_table request.
clear_table(#table_handle{collector_pid = CollectorPid}) ->
    clear_table(CollectorPid);
clear_table(CollectorPid) when is_pid(CollectorPid) ->
    call(CollectorPid, clear_table).
866
%% Synchronous request to the collector without a timeout.
%% A missing collector process (noproc exit) is reported as
%% {error, no_collector}; every other exit propagates to the caller.
call(CollectorPid, Request) ->
    try gen_server:call(CollectorPid, Request, infinity) of
        Reply ->
            Reply
    catch
        exit:{noproc, _} ->
            {error, no_collector}
    end.
874
875%%%----------------------------------------------------------------------
876%%% Callback functions from gen_server
877%%%----------------------------------------------------------------------
878
879%%----------------------------------------------------------------------
880%% Func: init/1
881%% Returns: {ok, State}          |
882%%          {ok, State, Timeout} |
883%%          ignore               |
884%%          {stop, Reason}
885%%----------------------------------------------------------------------
886
%% Start trapping exits, link to the parent (if one is configured), then
%% build the server state in three steps: create the ets tables, set up
%% global tracing, and insert the initial dictionary entries.
init([InitialS, Dict]) ->
    process_flag(trap_exit, true),
    case InitialS#state.parent_pid of
        Pid when is_pid(Pid) ->
            link(Pid);
        undefined ->
            ok
    end,
    WithTables = init_tables(InitialS),
    WithGlobal = init_global(WithTables),
    WithDict = lists:foldl(fun do_dict_insert/2, WithGlobal, Dict),
    {ok, WithDict}.
899
%% Create the event and dictionary tables (both public ordered sets keyed
%% on the first tuple element) and record them in the state.
init_tables(S) ->
    TabOpts = [ordered_set, {keypos, 1}, public],
    Events = ets:new(et_events, TabOpts),
    Dict = ets:new(et_dict, TabOpts),
    S#state{event_tab = Events, dict_tab = Dict, event_tab_size = 0}.
904
%% When global tracing is enabled: start a dbg process tracer that reports
%% every trace event to this collector, apply the configured trace
%% pattern, subscribe to node up/down messages and queue a nodeup message
%% for every node that is already connected. Otherwise the state is
%% returned unchanged.
init_global(S) ->
    case S#state.trace_global of
        false ->
            S;
        true ->
            OnEvent = fun(Event, {ok, TH}) -> report(TH, Event) end,
            OnEnd = fun(Acc) -> Acc end,
            TraceSpec = trace_spec_wrapper(OnEvent, OnEnd, {ok, self()}),
            dbg:tracer(process, TraceSpec),
            et_selector:change_pattern(S#state.trace_pattern),
            ok = net_kernel:monitor_nodes(true),
            AnnounceNode = fun(N) -> self() ! {nodeup, N} end,
            lists:foreach(AnnounceNode, nodes()),
            S#state{trace_nodes = [node()]}
    end.
919
920%%----------------------------------------------------------------------
921%% Func: handle_call/3
922%% Returns: {reply, Reply, State}          |
923%%          {reply, Reply, State, Timeout} |
924%%          {noreply, State}               |
925%%          {noreply, State, Timeout}      |
926%%          {stop, Reason, Reply, State}   | (terminate/2 is called)
927%%          {stop, Reason, State}            (terminate/2 is called)
928%%----------------------------------------------------------------------
929
%% Forward Msg to every subscriber as {et, Msg}.
handle_call({multicast, Msg}, _From, S) ->
    do_multicast(S#state.subscribers, Msg),
    reply(ok, S);

%% Insert a dictionary entry (and announce it to the subscribers).
handle_call(Msg = {dict_insert, _Key, _Val}, _From, S) ->
    S2 = do_dict_insert(Msg, S),
    reply(ok, S2);

%% Delete a dictionary entry. do_dict_delete/2 throws {stop, R} when the
%% server should shut down as a result; in that case the parent is
%% unlinked and the server stops with the old state.
handle_call(Msg = {dict_delete, _Key}, _From, S) ->
    try
        S2 = do_dict_delete(Msg, S),
        reply(ok, S2)
    catch
        throw:{stop, R} ->
            opt_unlink(S#state.parent_pid),
            {stop, R, S}
    end;

%% Return the raw ets:lookup/2 result for Key in the dictionary.
handle_call({dict_lookup, Key}, _From, S) ->
    Reply = ets:lookup(S#state.dict_tab, Key),
    reply(Reply, S);

%% Return all dictionary objects matching Pattern; a failing match is
%% reported as an empty list.
handle_call({dict_match, Pattern}, _From, S) ->
    case catch ets:match_object(S#state.dict_tab, Pattern) of
        {'EXIT', _Reason} ->
            reply([], S);
        Matching ->
            reply(Matching, S)
    end;

%% Build a table handle that lets the caller read the event table
%% directly. The default filter must be present in the dictionary.
handle_call(get_table_handle, _From, S) ->
    [{_, TableFilter}] = ets:lookup(S#state.dict_tab, {filter, ?DEFAULT_FILTER_NAME}),
    TH = #table_handle{collector_pid = self(),
                       event_tab     = S#state.event_tab,
                       event_order   = S#state.event_order,
                       filter        = TableFilter},
    reply({ok, TH}, S);

%% Current number of objects in the event table.
handle_call(get_table_size, _From, S) ->
    Size = ets:info(S#state.event_tab, size),
    reply({ok, Size}, S);

%% Close the disk log recorded in the state, if any.
handle_call(close, _From, S) ->
    case S#state.file of
        undefined ->
            reply({error, file_not_open}, S);
        F ->
            Reply = disk_log:close(F#file.desc),
            S2 = S#state{file = undefined},
            reply(Reply, S2)
    end;

%% Write all events currently in the event table to a disk log file and
%% optionally clear the table afterwards.
%%
%% Only the `existing' event option is implemented. Any other event
%% option accepted by parse_file_options/2 (e.g. `all') is rejected with
%% {error, {bad_file_option, Opt}} BEFORE the file is renamed or opened;
%% previously it crashed the collector with a case_clause after the file
%% had already been opened.
handle_call({save_event_file, FileName, Options}, _From, S) ->
    Default = #file{name      = FileName,
                    event_opt = existing,
                    file_opt  = write,
                    table_opt = keep},
    case parse_file_options(Default, Options) of
        {ok, #file{event_opt = existing} = F} ->
            case file_open(F) of
                {ok, Fd} ->
                    Fun = fun({_, E}, A) -> ok = disk_log:log(Fd, E), A end,
                    Tab = S#state.event_tab,
                    Reply = tab_iterate(Fun, Tab, ets:first(Tab), ok),
                    ok = disk_log:close(Fd),
                    case F#file.table_opt of
                        keep ->
                            reply(Reply, S);
                        clear ->
                            S2 = do_clear_table(S),
                            reply(Reply, S2)
                    end;
                {error, Reason} ->
                    reply({error, {file_open, Reason}}, S)
            end;
        {ok, #file{event_opt = Unsupported}} ->
            reply({error, {bad_file_option, Unsupported}}, S);
        {error, Reason} ->
            reply({error, Reason}, S)
    end;

%% Apply a new trace pattern on all traced nodes and return the old one.
%% Crashes (badmatch) if any node fails to answer the multicall.
handle_call({change_pattern, Pattern}, _From, S) ->
    Ns = S#state.trace_nodes,
    {_,[]} = rpc:multicall(Ns, et_selector, change_pattern, [Pattern]),
    Reply = {old_pattern, S#state.trace_pattern},
    S2 = S#state{trace_pattern = Pattern},
    reply(Reply, S2);

%% Replace the event table with an empty one.
handle_call(clear_table, _From, S) ->
    S2 = do_clear_table(S),
    reply(ok, S2);

%% Tell the subscribers to close, stop tracing on all traced nodes when
%% global tracing is active, and stop the server with reason shutdown.
handle_call(stop, _From, S) ->
    do_multicast(S#state.subscribers, close),
    case S#state.trace_global of
        true  -> {_,[]} = rpc:multicall(S#state.trace_nodes, dbg, stop_clear, []),
                 ok;
        false -> ok
    end,
    {stop, shutdown, ok, S};

%% Unknown request: log it and answer with a bad_request error.
handle_call(Request, From, S) ->
    ok = error_logger:format("~p(~p): handle_call(~tp, ~tp, ~tp)~n",
                             [?MODULE, self(), Request, From, S]),
    reply({error, {bad_request, Request}}, S).
1053
1054%%----------------------------------------------------------------------
1055%% Func: handle_cast/2
1056%% Returns: {noreply, State}          |
1057%%          {noreply, State, Timeout} |
1058%%          {stop, Reason, State}            (terminate/2 is called)
1059%%----------------------------------------------------------------------
1060
%% No casts are part of the protocol: log whatever arrives and carry on.
handle_cast(Msg, S) ->
    LogArgs = [?MODULE, self(), Msg, S],
    ok = error_logger:format("~p(~p): handle_cast(~tp, ~tp)~n", LogArgs),
    noreply(S).
1065
1066%%----------------------------------------------------------------------
1067%% Func: handle_info/2
1068%% Returns: {noreply, State}          |
1069%%          {noreply, State, Timeout} |
1070%%          {stop, Reason, State}            (terminate/2 is called)
1071%%----------------------------------------------------------------------
1072
%% gen_server timeout: notify subscribers if the event table size changed.
handle_info(timeout, S) ->
    noreply(check_size(S));

%% A node came up: ask it to start a trace port and, on success, attach a
%% trace client to that port. Every failure path moves on to the next
%% port number; plain {error, _} failures additionally re-queue the
%% nodeup message so the node is tried again.
handle_info({nodeup, Node}, S) ->
    Port = S#state.trace_port,
    MaxQueue = S#state.trace_max_queue,
    case rpc:call(Node, ?MODULE, monitor_trace_port, [self(), {Port, MaxQueue}]) of
        {ok, _} ->
            noreply(listen_on_trace_port(Node, Port, S));
        {error, already_started = Reason} ->
            ok = error_logger:format("~p(~p): producer ignored(~p:~p):~n    ~tp~n",
                                     [?MODULE, self(), Node, Port, Reason]),
            noreply(S#state{trace_port = Port + 1});
        {badrpc, Reason} ->
            ok = error_logger:format("~p(~p): producer ignored(~p:~p):~n    ~tp~n",
                                     [?MODULE, self(), Node, Port, Reason]),
            noreply(S#state{trace_port = Port + 1});
        {error, Reason} ->
            self() ! {nodeup, Node},
            ok = error_logger:format("~p(~p): producer retry(~p:~p):~n     ~tp~n",
                                     [?MODULE, self(), Node, Port, Reason]),
            noreply(S#state{trace_port = Port + 1})
    end;

%% A node went down: forget it as a traced node.
handle_info({nodedown, Node}, S) ->
    Remaining = S#state.trace_nodes -- [Node],
    noreply(S#state{trace_nodes = Remaining});

%% A trace client announces itself: tie its fate to the collector.
handle_info({register_trace_client, Pid}, S) ->
    link(Pid),
    noreply(S);

%% The parent died: stop with the same reason.
handle_info({'EXIT', Pid, Reason}, S) when Pid =:= S#state.parent_pid ->
    {stop, Reason, S};

%% Some other linked process died. A subscriber exiting with shutdown is
%% simply removed (which may trigger a stop via the {stop, R} throw from
%% do_dict_delete/2); a subscriber exiting for any other reason stops the
%% collector with that reason; exits from non-subscribers are only logged.
handle_info(Info = {'EXIT', Pid, Reason}, S) ->
    IsSubscriber = lists:member(Pid, S#state.subscribers),
    case {IsSubscriber, Reason} of
        {false, _} ->
            ok = error_logger:format("~p(~p): handle_info(~tp, ~tp)~n",
                                     [?MODULE, self(), Info, S]),
            noreply(S);
        {true, shutdown} ->
            try do_dict_delete({dict_delete, {subscriber, Pid}}, S) of
                S2 ->
                    noreply(S2)
            catch
                throw:{stop, R} ->
                    opt_unlink(S#state.parent_pid),
                    {stop, R, S}
            end;
        {true, _} ->
            opt_unlink(S#state.parent_pid),
            {stop, Reason, S}
    end;

%% Anything else is unexpected: log it and keep running.
handle_info(Info, S) ->
    ok = error_logger:format("~p(~p): handle_info(~tp, ~tp)~n",
                             [?MODULE, self(), Info, S]),
    noreply(S).
1134
%% Start an ip trace client towards Port on the host part of Node.
%%
%% On success the trace pattern is applied on Node, the client is linked
%% and Node is recorded as traced. In every case the next port number is
%% stored in the state; failures other than already_started also re-queue
%% a nodeup message for Node so that it is retried.
listen_on_trace_port(Node, Port, S) ->
    [_Name, Host] = string:lexemes(atom_to_list(Node), [$@]),
    Result = (catch start_trace_client(self(), ip, {Host, Port})),
    case Result of
        {'EXIT', already_started = Reason} ->
            ok = error_logger:format("~p(~p): consumer ignored(~p:~p): ~tp~n",
                                     [?MODULE, self(), Node, Port, Reason]),
            S#state{trace_port = Port + 1};
        {'EXIT', Reason} ->
            self() ! {nodeup, Node},
            ok = error_logger:format("~p(~p): consumer retry(~p:~p):~n     ~tp~n",
                                     [?MODULE, self(), Node, Port, Reason]),
            S#state{trace_port = Port + 1};
        {trace_client_pid, RemotePid} ->
            rpc:call(Node, et_selector, change_pattern, [S#state.trace_pattern]),
            link(RemotePid),
            TracedNodes = [Node | S#state.trace_nodes],
            S#state{trace_nodes = TracedNodes,
                    trace_port  = Port + 1}
    end.
1153
1154%%----------------------------------------------------------------------
1155%% Func: terminate/2
1156%% Purpose: Shutdown the server
1157%% Returns: any (ignored by gen_server)
1158%%----------------------------------------------------------------------
1159
%% Pass the termination reason on to every subscriber as an exit signal.
terminate(Reason, S) ->
    lists:foreach(fun(Subscriber) -> exit(Subscriber, Reason) end,
                  S#state.subscribers).
1163
1164%%----------------------------------------------------------------------
1165%% Func: code_change/3
1166%% Purpose: Convert process state when code is changed
1167%% Returns: {ok, NewState}
1168%%----------------------------------------------------------------------
1169
%% The state needs no conversion between code versions.
code_change(_FromVsn, State, _Extra) ->
    {ok, State}.
1172
1173%%%----------------------------------------------------------------------
1174%%% Internal functions
1175%%%----------------------------------------------------------------------
1176
%% Drop the current event table and install a fresh, empty one in the
%% state. The table identifier changes as a result.
do_clear_table(S) ->
    ets:delete(S#state.event_tab),
    FreshTab = ets:new(et_events, [ordered_set, {keypos, 1}, public]),
    S#state{event_tab = FreshTab}.
1182
%% Insert {Key, Val} into the dictionary and announce it to subscribers.
%%
%% A {subscriber, Pid} key additionally registers Pid as a subscriber: a
%% new subscriber is linked and first receives the whole current
%% dictionary; all subscribers (including the new one) then receive the
%% insert message and the current event table size.
do_dict_insert({dict_insert, {subscriber, Pid} = Key, Val} = Msg, S) when is_pid(Pid) ->
    Known = S#state.subscribers,
    Subscribers =
        case lists:member(Pid, Known) of
            false ->
                link(Pid),
                Entries = ets:match_object(S#state.dict_tab, '_'),
                SendEntry = fun({K, V}) -> Pid ! {et, {dict_insert, K, V}} end,
                lists:foreach(SendEntry, Entries),
                [Pid | Known];
            true ->
                Known
        end,
    do_multicast(Subscribers, Msg),
    EventCount = ets:info(S#state.event_tab, size),
    do_multicast(Subscribers, {more_events, EventCount}),
    ets:insert(S#state.dict_tab, {Key, Val}),
    S#state{subscribers = Subscribers};
do_dict_insert({dict_insert, Key, Val} = Msg, S) ->
    do_multicast(S#state.subscribers, Msg),
    ets:insert(S#state.dict_tab, {Key, Val}),
    S.
1204
%% Delete Key from the dictionary and announce the deletion.
%%
%% Deleting a {subscriber, Pid} key also unlinks and unregisters Pid; if
%% that leaves no subscribers and auto_shutdown is true, {stop, shutdown}
%% is thrown for the caller to act on. The default filter can never be
%% deleted: such a request is silently ignored.
do_dict_delete({dict_delete, {subscriber, Pid} = Key} = Msg, S) ->
    Subscribers = S#state.subscribers,
    do_multicast(Subscribers, Msg),
    ets:delete(S#state.dict_tab, Key),
    case lists:member(Pid, Subscribers) of
        false ->
            S;
        true ->
            unlink(Pid),
            case S#state{subscribers = Subscribers -- [Pid]} of
                #state{auto_shutdown = true, subscribers = []} ->
                    throw({stop, shutdown});
                S2 ->
                    S2
            end
    end;
do_dict_delete({dict_delete, {filter, ?DEFAULT_FILTER_NAME}}, S) ->
    S;
do_dict_delete({dict_delete, Key} = Msg, S) ->
    do_multicast(S#state.subscribers, Msg),
    ets:delete(S#state.dict_tab, Key),
    S.
1229
%% Fold Fun over every object in Tab, starting at Key and following
%% ets:next/2 until '$end_of_table' is reached. The objects stored under
%% a key are processed before the next key is looked up.
tab_iterate(Fun, Tab, Key, Acc) ->
    case Key of
        '$end_of_table' ->
            Acc;
        _ ->
            Objects = ets:lookup(Tab, Key),
            NewAcc = lists:foldl(Fun, Acc, Objects),
            tab_iterate(Fun, Tab, ets:next(Tab, Key), NewAcc)
    end.
1235
%% Open the disk log described by F and return {ok, LogName} or
%% {error, Reason}. LogName is a fresh reference used as the disk_log name.
%%
%% In `write' mode an existing file is first moved aside to Name ++ ".OLD".
%% A missing file is the normal case when saving for the first time and
%% is not an error (asserting `ok' here used to crash the collector with
%% a badmatch). Any other rename failure is returned as {error, Reason}.
%% In `append' mode the file is left in place.
file_open(F) ->
    case backup_old_file(F) of
        ok ->
            Fd = make_ref(),
            Args = [{file, F#file.name}, {name, Fd},
                    {repair, true}, {mode, read_write}],
            case disk_log:open(Args) of
                {ok, _} ->
                    {ok, Fd};
                {repaired, _, _, BadBytes} ->
                    ok = error_logger:format("~p: Skipped ~p bad bytes in file: ~tp~n",
                                             [?MODULE, BadBytes, F#file.name]),
                    {ok, Fd};
                {error, Reason} ->
                    {error, Reason}
            end;
        {error, Reason} ->
            {error, Reason}
    end.

%% Move an existing file out of the way when it is to be overwritten.
backup_old_file(#file{file_opt = append}) ->
    ok;
backup_old_file(#file{file_opt = write, name = Name}) ->
    case file:rename(Name, Name ++ ".OLD") of
        ok              -> ok;
        {error, enoent} -> ok;   % nothing to back up
        {error, Reason} -> {error, Reason}
    end.
1254
%% Apply a list of option atoms to the #file{} record F, left to right.
%% Returns {ok, F2}, or {error, {bad_file_option, Opt}} at the first
%% option that is not recognised.
parse_file_options(F, []) ->
    {ok, F};
parse_file_options(F, [existing | Rest]) ->
    parse_file_options(F#file{event_opt = existing}, Rest);
parse_file_options(F, [all | Rest]) ->
    parse_file_options(F#file{event_opt = all}, Rest);
parse_file_options(F, [write | Rest]) ->
    parse_file_options(F#file{file_opt = write}, Rest);
parse_file_options(F, [append | Rest]) ->
    parse_file_options(F#file{file_opt = append}, Rest);
parse_file_options(F, [keep | Rest]) ->
    parse_file_options(F#file{table_opt = keep}, Rest);
parse_file_options(F, [clear | Rest]) ->
    parse_file_options(F#file{table_opt = clear}, Rest);
parse_file_options(_F, [Bad | _Rest]) ->
    {error, {bad_file_option, Bad}}.
1268
%% Send {et, Msg} to each pid in the list, in order. Returns ok.
do_multicast(Pids, Msg) ->
    lists:foreach(fun(Pid) -> Pid ! {et, Msg} end, Pids).
1274
%% Unlink from Pid unless no pid is configured (undefined).
opt_unlink(undefined) ->
    ok;
opt_unlink(Pid) ->
    unlink(Pid).
1282
%% Build the gen_server reply tuple. While there are subscribers a 500 ms
%% timeout is requested, so that a `timeout' message is delivered to
%% handle_info/2 when the server stays idle.
reply(Reply, S) ->
    case S of
        #state{subscribers = []} ->
            {reply, Reply, S};
        _ ->
            {reply, Reply, S, 500}
    end.
1287
%% Build the gen_server noreply tuple. While there are subscribers a
%% 500 ms timeout is requested, so that a `timeout' message is delivered
%% to handle_info/2 when the server stays idle.
noreply(S) ->
    case S of
        #state{subscribers = []} ->
            {noreply, S};
        _ ->
            {noreply, S, 500}
    end.
1292
%% Compare the current event table size with the size last announced.
%% When it differs, tell the subscribers that more events are available
%% and remember the new size; otherwise return the state untouched.
check_size(S) ->
    case ets:info(S#state.event_tab, size) of
        Same when Same =:= S#state.event_tab_size ->
            S;
        Size ->
            do_multicast(S#state.subscribers, {more_events, Size}),
            S#state{event_tab_size = Size}
    end.
1304