1%%
2%% %CopyrightBegin%
3%%
4%% Copyright Ericsson AB 2000-2018. All Rights Reserved.
5%%
6%% Licensed under the Apache License, Version 2.0 (the "License");
7%% you may not use this file except in compliance with the License.
8%% You may obtain a copy of the License at
9%%
10%%     http://www.apache.org/licenses/LICENSE-2.0
11%%
12%% Unless required by applicable law or agreed to in writing, software
13%% distributed under the License is distributed on an "AS IS" BASIS,
14%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15%% See the License for the specific language governing permissions and
16%% limitations under the License.
17%%
18%% %CopyrightEnd%
19%%
20%%----------------------------------------------------------------------
21%% Purpose: Collect trace events and provide a backing storage
22%%          appropriate for iteration
23%%----------------------------------------------------------------------
24
25-module(et_collector).
26
27-behaviour(gen_server).
28
29%% External exports
30-export([start_link/1,
31         stop/1,
32
33         report/2,
34         report_event/5,
35         report_event/6,
36
37         iterate/3,
38         iterate/5,
39	 lookup/2,
40
41         start_trace_client/3,
42         start_trace_port/1,
43         %% load_event_file/2,
44         save_event_file/3,
45         clear_table/1,
46
47         get_global_pid/0,
48         %% get_table_handle/1,
49	 get_table_size/1,
50         change_pattern/2,
51         make_key/2,
52
53         dict_insert/3,
54         dict_delete/2,
55         dict_lookup/2,
56         dict_match/2,
57         multicast/2]).
58
59%% Internal export
60-export([monitor_trace_port/2]).
61
62%% gen_server callbacks
63-export([init/1,terminate/2, code_change/3,
64         handle_call/3, handle_cast/2, handle_info/2]).
65
66-compile([{nowarn_deprecated_function,[{erlang,now,0}]}]).
67
68-include("et_internal.hrl").
69-include("../include/et.hrl").
70
71-record(state, {parent_pid,
72		auto_shutdown, % Optionally shutdown when the last subscriber dies
73		event_tab_size,
74                event_tab,
75                dict_tab,
76                event_order,
77                subscribers,
78                file,
79                trace_pattern,
80                trace_port,
81                trace_max_queue,
82                trace_nodes,
83                trace_global}).
84
85-record(file, {name, desc, event_opt, file_opt, table_opt}).
86
87-record(table_handle, {collector_pid, event_tab, event_order, filter}).
88
89-record(trace_ts, {trace_ts, event_ts}).
90-record(event_ts, {event_ts, trace_ts}).
91
92%%%----------------------------------------------------------------------
93%%% Client side
94%%%----------------------------------------------------------------------
95
96%%----------------------------------------------------------------------
97%% start_link(Options) -> {ok, CollectorPid} | {error, Reason}
98%%
99%% Start a collector process
100%%
101%% The collector collects trace events and keeps them ordered by their
102%% timestamp. The timestamp may either reflect the time when the
103%% actual trace data was generated (trace_ts) or when the trace data
104%% was transformed into an event record (event_ts). If the time stamp
105%% is missing in the trace data (missing timestamp option to
106%% erlang:trace/4) the trace_ts will be set to the event_ts.
107%%
108%% Events are reported to the collector directly with the report
109%% function or indirectly via one or more trace clients. All reported
110%% events are first filtered thru the collector filter before they are
111%% stored by the collector. By replacing the default collector filter
%% with a customized ditto it is possible to allow any trace data as
113%% input. The collector filter is a dictionary entry with the
114%% predefined key {filter, all} and the value is a fun of
115%% arity 1. See et_selector:parse_event/2 for interface details,
116%% such as which erlang:trace/1 tuples that are accepted.
117%%
118%% The collector has a built-in dictionary service. Any term may be
119%% stored as value in the dictionary and bound to a unique key. When
120%% new values are inserted with an existing key, the new values will
121%% overwrite the existing ones. Processes may subscribe on dictionary
122%% updates by using {subscriber, pid()} as dictionary key. All
123%% dictionary updates will be propagated to the subscriber processes
124%% matching the pattern {{subscriber, '_'}, '_'} where the first '_'
125%% is interpreted as a pid().
126%%
127%% In global trace mode, the collector will automatically start
128%% tracing on all connected Erlang nodes. When a node connects, a port
129%% tracer will be started on that node and a corresponding trace
130%% client on the collector node. By default the global trace pattern
131%% is 'max'.
132%%
133%% Options = [option()]
134%%
135%% option() =
136%%   {parent_pid, pid()} |
137%%   {event_order, event_order()} |
138%%   {dict_insert, {filter, all}, collector_fun()} |
139%%   {dict_insert, {filter, event_filter_name()}, event_filter_fun()} |
140%%   {dict_insert, {subscriber, pid()}, dict_val()} |
141%%   {dict_insert, dict_key(), dict_val()} |
142%%   {dict_delete, dict_key()} |
143%%   {trace_client, trace_client()} |
144%%   {trace_global, boolean()} |
145%%   {trace_pattern, trace_pattern()} |
146%%   {trace_port, integer()} |
147%%   {trace_max_queue, integer()}
148%%
149%% event_order() = trace_ts | event_ts
150%% trace_pattern() = detail_level() | dbg_match_spec()
151%% detail_level() = min | max | integer(X) when X >= 0, X =< 100
152%% trace_client() =
153%%   {event_file, file_name()} |
154%%   {dbg_trace_type(), dbg_trace_parameters()}
155%% file_name() = string()
156%% collector_fun() = trace_filter_fun() | event_filter_fun()
157%% trace_filter_fun() = fun(TraceData) -> false | true | {true, NewEvent}
158%% event_filter_fun() = fun(Event) -> false | true | {true, NewEvent}
159%% event_filter_name() = atom()
160%% TraceData = erlang_trace_data()
161%% Event = NewEvent = record(event)
162%% dict_key() = term()
163%% dict_val() = term()
164%%
165%% CollectorPid = pid()
166%% Reason = term()
167%%----------------------------------------------------------------------
168
%% Parse the options, start the collector gen_server (globally
%% registered when trace_global is set) and finally start the trace
%% clients that were requested in the options. The collector is
%% unlinked from the caller when the configured parent is some other
%% process (or undefined).
start_link(Options) ->
    case parse_opt(Options, default_state(), [], []) of
        {error, Reason} ->
            {error, Reason};
        {ok, State, Dict, Clients} ->
            InitArgs = [State, Dict],
            Started =
                case State#state.trace_global of
                    true ->
                        gen_server:start_link({global, ?MODULE}, ?MODULE, InitArgs, []);
                    false ->
                        gen_server:start_link(?MODULE, InitArgs, [])
                end,
            Caller = self(),
            case Started of
                {error, Why} ->
                    {error, Why};
                {ok, Pid} when State#state.parent_pid =:= Caller ->
                    start_clients(Pid, Clients);
                {ok, Pid} ->
                    %% The caller is not the parent; do not tie our fate
                    %% to the collector.
                    unlink(Pid),
                    start_clients(Pid, Clients)
            end
    end.
191
%% Collector state used as the starting point for option parsing.
%% The calling process is the default parent.
default_state() ->
    Parent = self(),
    #state{trace_max_queue = 50,
           trace_port      = 4711,
           trace_nodes     = [],
           trace_pattern   = undefined,
           trace_global    = false,
           subscribers     = [],
           event_order     = trace_ts,
           auto_shutdown   = false,
           parent_pid      = Parent}.
202
%% parse_opt(Options, State, Dict, Clients) ->
%%     {ok, NewState, DictOps, Clients} | {error, Reason}
%%
%% Walks the option list and validates each option. State related
%% options update the #state{} record, dict_insert/dict_delete options
%% are accumulated (in the order given) in Dict and trace_client
%% options are accumulated (in the order given) in Clients.
%%
%% When the list is exhausted the trace pattern is normalized with
%% et_selector:make_pattern/1 and a dict_insert operation for the
%% default collector filter is prepended to the accumulated
%% dictionary operations.
%%
%% Reason = {bad_option, term()} | {bad_option_list, term()}
parse_opt([], S, Dict, Clients) ->
    {Mod, Pattern} = et_selector:make_pattern(S#state.trace_pattern),
    Fun = fun(E) -> et_selector:parse_event(Mod, E) end,
    Default = {dict_insert, {filter, ?DEFAULT_FILTER_NAME}, Fun},
    {ok, S#state{trace_pattern = {Mod, Pattern}}, [Default | Dict], Clients};
parse_opt([H | T], S, Dict, Clients) ->
    case H of
        {parent_pid, Parent} when Parent =:= undefined; is_pid(Parent) ->
            parse_opt(T, S#state{parent_pid = Parent}, Dict, Clients);
        {auto_shutdown, Bool} when Bool =:= true; Bool =:= false ->
            parse_opt(T, S#state{auto_shutdown = Bool}, Dict, Clients);
        {event_order, Order} when Order =:= trace_ts; Order =:= event_ts ->
            parse_opt(T, S#state{event_order = Order}, Dict, Clients);
        {dict_insert, {filter, Name}, Fun} when is_atom(Name), is_function(Fun) ->
            parse_opt(T, S, Dict ++ [H], Clients);
        {dict_insert, {filter, _Name}, _Fun} ->
            %% A filter entry must be named by an atom and hold a fun
            {error, {bad_option, H}};
        {dict_insert, {subscriber, Pid}, _Val} when is_pid(Pid) ->
            parse_opt(T, S, Dict ++ [H], Clients);
        {dict_insert, {subscriber, _NotPid}, _Val} ->
            %% A subscriber entry must be keyed by a pid
            {error, {bad_option, H}};
        {dict_insert, _Key, _Val} ->
            parse_opt(T, S, Dict ++ [H], Clients);
        {dict_delete, _Key} ->
            parse_opt(T, S, Dict ++ [H], Clients);
        {trace_client, Client = {_, _}} ->
            parse_opt(T, S, Dict, Clients ++ [Client]);
        {trace_global, Bool} when Bool =:= true; Bool =:= false ->
            parse_opt(T, S#state{trace_global = Bool}, Dict, Clients);
        {trace_pattern, {Mod, _} = Pattern} when is_atom(Mod) ->
            parse_opt(T, S#state{trace_pattern = Pattern}, Dict, Clients);
        {trace_pattern, undefined = Pattern} ->
            parse_opt(T, S#state{trace_pattern = Pattern}, Dict, Clients);
        {trace_port, Port} when is_integer(Port) ->
            parse_opt(T, S#state{trace_port = Port}, Dict, Clients);
        {trace_max_queue, MaxQueue} when is_integer(MaxQueue) ->
            %% Store the queue size in its own field. It used to be
            %% written to trace_port, which silently replaced the port
            %% number and left trace_max_queue at its default.
            parse_opt(T, S#state{trace_max_queue = MaxQueue}, Dict, Clients);
        Bad ->
            {error, {bad_option, Bad}}
    end;
parse_opt(BadList, _S, _Dict, _Clients) ->
    {error, {bad_option_list, BadList}}.
257
%% Start the trace clients one at a time, in list order. The result of
%% each individual start is ignored.
start_clients(CollectorPid, []) ->
    {ok, CollectorPid};
start_clients(CollectorPid, [{Type, Parameters} | Rest]) ->
    _ = start_trace_client(CollectorPid, Type, Parameters),
    start_clients(CollectorPid, Rest).
263
264%%----------------------------------------------------------------------
265%% stop(CollectorPid) -> ok
266%%
267%% Stop a collector process
268%%
269%% CollectorPid = pid()
270%%----------------------------------------------------------------------
271
%% Send a synchronous stop request to the collector.
stop(CollectorPid) ->
    Request = stop,
    call(CollectorPid, Request).
274
275%%----------------------------------------------------------------------
276%% save_event_file(CollectorPid, FileName, Options) -> ok | {error, Reason}
277%%
278%% Saves the events to a file
279%%
280%% CollectorPid = pid()
281%% FileName = string()
282%% Options = [option()]
283%% Reason = term()
284%%
285%% option() = event_option() | file_option() | table_option()
286%% event_option() = existing
287%% file_option() = write | append
288%% table_option() = keep | clear
289%%
290%% By default the currently stored events (existing) are
291%% written to a brand new file (write) and the events are
292%% kept (keep) after they have been written to the file.
293%%
294%% Instead of keeping the events after writing them to file,
295%% it is possible to remove all stored events after they
%% have successfully been written to file (clear).
297%%
298%% The options defaults to existing, write and keep.
299%%----------------------------------------------------------------------
300
%% Send a synchronous request to the collector to write its events
%% to FileName.
save_event_file(CollectorPid, FileName, Options) ->
    Request = {save_event_file, FileName, Options},
    call(CollectorPid, Request).
303
304%%----------------------------------------------------------------------
305%% load_event_file(CollectorPid, FileName) ->{ok, BadBytes} | exit(Reason)
306%%
307%% Load the event table from a file
308%%
309%% CollectorPid = pid()
310%% FileName = string()
311%% BadBytes = integer(X) where X >= 0
312%% Reason = term()
313%%----------------------------------------------------------------------
314
%% Open FileName as a read-only disk_log under a fresh, private log
%% name, report every logged event to the collector and return
%% {ok, BadBytes}, where BadBytes is an integer count of bytes that
%% could not be interpreted.
%%
%% Exits with {disk_log_open, FileName, Reason} when the log cannot be
%% opened. The log is always closed again, whether loading succeeds or
%% exits, so neither the log nor its private name is leaked.
load_event_file(CollectorPid, FileName) ->
    Log = make_ref(),
    Args = [{file, FileName}, {name, Log}, {repair, true}, {mode, read_only}],
    Fun = fun(Event, {ok, TH}) -> report(TH, Event) end,
    InitialBadBytes =
        case disk_log:open(Args) of
            {ok, _} ->
                0;
            {repaired, _, _, {badbytes, Bad}} ->
                %% disk_log wraps the count in a tagged tuple; unwrap
                %% it so that it can be summed with the per-chunk
                %% counts and returned as a plain integer.
                Bad;
            {error, Reason} ->
                exit({disk_log_open, FileName, Reason})
        end,
    try
        do_load_event_file(Fun, Log, start, {ok, CollectorPid}, FileName, InitialBadBytes)
    after
        %% Best effort; the result of closing is of no interest here
        disk_log:close(Log)
    end.
327
%% Read the disk_log chunk by chunk, folding Fun over the terms of
%% each chunk and summing the bad byte counts reported along the way.
%% Returns {ok, BadBytes} at end of file and exits with
%% {bad_disk_log_chunk, FileName, Reason} on a read error.
do_load_event_file(Fun, Fd, Cont, Acc, FileName, BadBytes) ->
    Chunk = disk_log:chunk(Fd, Cont),
    case Chunk of
        eof ->
            {ok, BadBytes};
        {error, Reason} ->
            %% Must be tested before the generic two-tuple below
            exit({bad_disk_log_chunk, FileName, Reason});
        {NextCont, Events, ChunkBadBytes} ->
            NextAcc = lists:foldl(Fun, Acc, Events),
            Total = BadBytes + ChunkBadBytes,
            do_load_event_file(Fun, Fd, NextCont, NextAcc, FileName, Total);
        {NextCont, Events} ->
            NextAcc = lists:foldl(Fun, Acc, Events),
            do_load_event_file(Fun, Fd, NextCont, NextAcc, FileName, BadBytes)
    end.
341
342%%----------------------------------------------------------------------
343%% report(Handle, TraceOrEvent)
344%%
345%% Report an event to the collector
346%%
347%% All events are filtered thru the collector filter, which
348%% optionally may transform or discard the event. The first
349%% call should use the pid of the collector process as
350%% report handle, while subsequent calls should use the
351%% table handle.
352%%
353%% Handle = Initial | Continuation
354%% Initial = collector_pid()
355%% collector_pid() = pid()
356%% Continuation = record(table_handle)
357%%
358%% TraceOrEvent = record(event) | dbg_trace_tuple() | end_of_trace
359%% Reason = term()
360%%
361%% Returns: {ok, Continuation} | exit(Reason)
362%%----------------------------------------------------------------------
363
%% report(Handle, TraceOrEvent) -> {ok, TableHandle} | exit(Reason)
%%
%% With a collector pid as handle, a table handle is fetched from the
%% collector first. With a table handle, the handle's filter fun
%% decides what to store:
%%   false             - nothing is stored
%%   true              - TraceOrEvent itself, if it is an event record
%%   {true, Event}     - the replacement event record
%%   anything else     - a synthetic bad_filter event describing the
%%                       input, the filter result and the filter fun
%% If the insert into the event table fails, the report is retried
%% from scratch via the collector pid, which fetches a fresh handle.
report(CollectorPid, TraceOrEvent) when is_pid(CollectorPid) ->
    case get_table_handle(CollectorPid) of
        {error, Reason} ->
            exit(Reason);
        {ok, TH} when is_record(TH, table_handle) ->
            report(TH, TraceOrEvent)
    end;
report(TH, TraceOrEvent) when is_record(TH, table_handle) ->
    Filter = TH#table_handle.filter,
    Outcome =
        case Filter(TraceOrEvent) of
            false ->
                discard;
            true when is_record(TraceOrEvent, event) ->
                {store, TraceOrEvent};
            {true, Filtered} when is_record(Filtered, event) ->
                {store, Filtered};
            BadResult ->
                Now = erlang:now(),
                Contents = [{trace, TraceOrEvent},
                            {reason, BadResult},
                            {filter, Filter}],
                {store, #event{detail_level = 0,
                               trace_ts     = Now,
                               event_ts     = Now,
                               from         = bad_filter,
                               to           = bad_filter,
                               label        = bad_filter,
                               contents     = Contents}}
        end,
    case Outcome of
        discard ->
            {ok, TH};
        {store, Event} ->
            Key = make_key(TH, Event),
            Tab = TH#table_handle.event_tab,
            case catch ets:insert(Tab, {Key, Event}) of
                true ->
                    {ok, TH};
                {'EXIT', _Reason} ->
                    %% Refresh the report handle and try again
                    report(TH#table_handle.collector_pid, TraceOrEvent)
            end
    end;
report(_, Bad) ->
    exit({bad_event, Bad}).
415
%% Short form of report_event/6 where the same actor is used as both
%% sender and receiver of the event.
report_event(CollectorPid, DetailLevel, FromTo, Label, Contents) ->
    From = FromTo,
    To = FromTo,
    report_event(CollectorPid, DetailLevel, From, To, Label, Contents).
418
%% Build an event record stamped with the current time (used as both
%% trace_ts and event_ts) and report it to the collector. Only defined
%% for integer detail levels within
%% ?detail_level_min..?detail_level_max.
report_event(CollectorPid, DetailLevel, From, To, Label, Contents)
  when is_integer(DetailLevel),
       DetailLevel >= ?detail_level_min,
       DetailLevel =< ?detail_level_max ->
    Now = erlang:now(),
    Event = #event{contents     = Contents,
                   label        = Label,
                   to           = To,
                   from         = From,
                   event_ts     = Now,
                   trace_ts     = Now,
                   detail_level = DetailLevel},
    report(CollectorPid, Event).
432
433%%----------------------------------------------------------------------
434%% make_key(Type, Stuff) -> Key
435%%
436%% Makes a key out of an event record or an old key
437%%
438%% Type = record(table_handle) | trace_ts | event_ts
439%% Stuff = record(event) | Key
440%% Key = record(event_ts) | record(trace_ts)
441%%----------------------------------------------------------------------
442
%% Build an event table key of the requested flavour. The first
%% argument is either a table handle (its event_order is used) or the
%% order itself. The second argument is an event record or an existing
%% key of either flavour; a key of the requested flavour is returned
%% unchanged, the other flavour is converted.
make_key(TH, Stuff) when is_record(TH, table_handle) ->
    Order = TH#table_handle.event_order,
    make_key(Order, Stuff);
make_key(trace_ts, Stuff) ->
    if
        is_record(Stuff, trace_ts) ->
            Stuff;
        is_record(Stuff, event) ->
            #trace_ts{trace_ts = Stuff#event.trace_ts,
                      event_ts = Stuff#event.event_ts};
        is_record(Stuff, event_ts) ->
            #trace_ts{trace_ts = Stuff#event_ts.trace_ts,
                      event_ts = Stuff#event_ts.event_ts}
    end;
make_key(event_ts, Stuff) ->
    if
        is_record(Stuff, event_ts) ->
            Stuff;
        is_record(Stuff, event) ->
            #event_ts{trace_ts = Stuff#event.trace_ts,
                      event_ts = Stuff#event.event_ts};
        is_record(Stuff, trace_ts) ->
            #event_ts{trace_ts = Stuff#trace_ts.trace_ts,
                      event_ts = Stuff#trace_ts.event_ts}
    end.
467
468%%----------------------------------------------------------------------
469%%----------------------------------------------------------------------
470
%% Send a synchronous get_table_size request to the collector.
get_table_size(CollectorPid) when is_pid(CollectorPid) ->
    Request = get_table_size,
    call(CollectorPid, Request).
473
474%%----------------------------------------------------------------------
475%% get_table_handle(CollectorPid) -> Handle
476%%
477%% Return a table handle
478%%
479%% CollectorPid = pid()
480%% Handle = record(table_handle)
481%%----------------------------------------------------------------------
482
%% Send a synchronous get_table_handle request to the collector.
get_table_handle(CollectorPid) when is_pid(CollectorPid) ->
    Request = get_table_handle,
    call(CollectorPid, Request).
485
486%%----------------------------------------------------------------------
487%% get_global_pid() -> CollectorPid | exit(Reason)
488%%
%% Return the identity of the globally registered collector
490%% if there is any
491%%
492%% CollectorPid = pid()
493%% Reason = term()
494%%----------------------------------------------------------------------
495
%% Look up the collector registered in global under the module name.
%% Exits with global_collector_not_started when nothing is registered.
get_global_pid() ->
    Registered = global:whereis_name(?MODULE),
    case Registered of
        undefined ->
            exit(global_collector_not_started);
        Pid when is_pid(Pid) ->
            Pid
    end.
503
504%%----------------------------------------------------------------------
505%% change_pattern(CollectorPid, RawPattern) -> {old_pattern, TracePattern}
506%%
507%% Change active trace pattern globally on all trace nodes
508%%
509%% CollectorPid = pid()
510%% RawPattern = {report_module(), extended_dbg_match_spec()}
511%% report_module() = atom() | undefined
512%% extended_dbg_match_spec() = detail_level() | dbg_match_spec()
513%% RawPattern = detail_level()
%% detail_level() = min | max | integer(X) when X >= 0, X =< 100
515%% TracePattern = {report_module(), dbg_match_spec_match_spec()}
516%%----------------------------------------------------------------------
517
%% Normalize the raw pattern with et_selector:make_pattern/1 in the
%% calling process and hand the result to the collector.
change_pattern(CollectorPid, RawPattern) ->
    Request = {change_pattern, et_selector:make_pattern(RawPattern)},
    call(CollectorPid, Request).
521
522%%----------------------------------------------------------------------
523%% dict_insert(CollectorPid, {filter, all}, FilterFun) -> ok
524%% dict_insert(CollectorPid, {subscriber, SubscriberPid}, Void) -> ok
525%% dict_insert(CollectorPid, Key, Val) -> ok
526%%
527%% Insert a dictionary entry
528%% and send a {et, {dict_insert, Key, Val}} tuple
529%% to all registered subscribers.
530%%
531%% If the entry is a new subscriber, it will imply that
532%% the new subscriber process first will get one message
533%% for each already stored dictionary entry, before it
534%% and all old subscribers will get this particular entry.
535%% The collector process links to and then supervises the
536%% subscriber process. If the subscriber process dies it
537%% will imply that it gets unregistered as with a normal
538%% dict_delete/2.
539%%
540%% CollectorPid = pid()
541%% FilterFun = filter_fun()
542%% SubscriberPid = pid()
543%% Void = term()
544%% Key = term()
545%% Val = term()
546%%----------------------------------------------------------------------
547
%% Validate the reserved key shapes in the calling process before the
%% insert request is sent to the collector:
%%   {filter, Name}    requires an atom name and a fun as value
%%   {subscriber, Pid} requires a pid
%% A malformed reserved key makes the caller exit with {badarg, Key}.
%% Any other key is passed on as is.
dict_insert(CollectorPid, {filter, Name} = Key, Fun)
  when is_atom(Name), is_function(Fun) ->
    call(CollectorPid, {dict_insert, Key, Fun});
dict_insert(_CollectorPid, {filter, _Name} = Key, _Fun) ->
    exit({badarg, Key});
dict_insert(CollectorPid, {subscriber, Pid} = Key, Val) when is_pid(Pid) ->
    call(CollectorPid, {dict_insert, Key, Val});
dict_insert(_CollectorPid, {subscriber, _NotPid} = Key, _Val) ->
    exit({badarg, Key});
dict_insert(CollectorPid, Key, Val) ->
    call(CollectorPid, {dict_insert, Key, Val}).
564
565%%----------------------------------------------------------------------
566%% dict_lookup(CollectorPid, Key) -> [Val]
567%%
568%% Lookup a dictionary entry and return zero or one value
569%%
570%% CollectorPid = pid()
571%% Key = term()
572%% Val = term()
573%%----------------------------------------------------------------------
574
%% Send a synchronous dict_lookup request for Key to the collector.
dict_lookup(CollectorPid, Key) ->
    Request = {dict_lookup, Key},
    call(CollectorPid, Request).
577
578%%----------------------------------------------------------------------
%% dict_delete(CollectorPid, Key) -> ok
%%
%% Delete a dictionary entry
582%% and send a {et, {dict_delete, Key}} tuple
583%% to all registered subscribers.
584%%
585%% If the deleted entry is a registered subscriber, it will
%% imply that the subscriber process gets unregistered as
%% subscriber and that it receives its final message.
588%%
589%% dict_delete(CollectorPid, {subscriber, SubscriberPid})
590%% dict_delete(CollectorPid, Key)
591%%
592%% CollectorPid = pid()
593%% SubscriberPid = pid()
594%% Key = term()
595%%----------------------------------------------------------------------
596
%% Send a synchronous dict_delete request for Key to the collector.
dict_delete(CollectorPid, Key) ->
    Request = {dict_delete, Key},
    call(CollectorPid, Request).
599
600%%----------------------------------------------------------------------
601%% dict_match(CollectorPid, Pattern) -> [Match]
602%%
603%% Match some dictionary entries
604%%
605%% CollectorPid = pid()
606%% Pattern = '_' | {key_pattern(), val_pattern()}
607%% key_pattern() = ets_match_object_pattern()
608%% val_pattern() = ets_match_object_pattern()
609%% Match = {key(), val()}
610%% key() = term()
611%% val() = term()
612%%----------------------------------------------------------------------
613
%% Send a synchronous dict_match request for Pattern to the collector.
dict_match(CollectorPid, Pattern) ->
    Request = {dict_match, Pattern},
    call(CollectorPid, Request).
616
617%%----------------------------------------------------------------------
618%% multicast(_CollectorPid, Msg) -> ok
619%%
620%% Sends a message to all registered subscribers
621%%
622%% CollectorPid = pid()
623%% Msg = term()
624%%----------------------------------------------------------------------
625
%% Ask the collector to multicast Msg. Messages shaped like the
%% dict_insert/dict_delete tuples are rejected with an exit of
%% {badarg, Msg} in the calling process; the collector is not
%% contacted in that case.
multicast(CollectorPid, Msg) ->
    case Msg of
        {dict_insert, _, _} ->
            exit({badarg, Msg});
        {dict_delete, _} ->
            exit({badarg, Msg});
        _ ->
            call(CollectorPid, {multicast, Msg})
    end.
632
633%%----------------------------------------------------------------------
634%% start_trace_client(CollectorPid, Type, Parameters) ->
635%%     file_loaded | {trace_client_pid, pid()} | exit(Reason)
636%%
637%% Load raw Erlang trace from a file, port or process.
638%%
639%% Type       = dbg_trace_client_type()
640%% Parameters = dbg_trace_client_parameters()
641%% Pid        = dbg_trace_client_pid()
642%%----------------------------------------------------------------------
643
%% event_file: load a previously saved event file directly.
%%
%% file: start a dbg trace client on the file and block until the
%% client has delivered end_of_trace (returns file_loaded) or until
%% the client process dies first (exits with its exit reason).
%%
%% Any other type: start a dbg trace client, announce it to the
%% collector with a register_trace_client message and return its pid
%% without waiting.
%%
%% In both dbg cases the trace client is unlinked from the caller.
start_trace_client(CollectorPid, event_file, FileName) ->
    load_event_file(CollectorPid, FileName);
start_trace_client(CollectorPid, file, FileName) ->
    Done = {make_ref(), end_of_trace},
    OnEvent = fun(Trace, {ReplyTo, {ok, TH}}) ->
                      {ReplyTo, report(TH, Trace)}
              end,
    OnEnd = fun({ReplyTo, {ok, _TH}}) ->
                    ReplyTo ! Done,
                    ReplyTo
            end,
    InitialAcc = {self(), {ok, CollectorPid}},
    Spec = trace_spec_wrapper(OnEvent, OnEnd, InitialAcc),
    ClientPid = dbg:trace_client(file, FileName, Spec),
    unlink(ClientPid),
    MonitorRef = erlang:monitor(process, ClientPid),
    receive
        {'DOWN', MonitorRef, _, _, Reason} ->
            exit(Reason);
        Done ->
            erlang:demonitor(MonitorRef, [flush]),
            file_loaded
    end;
start_trace_client(CollectorPid, Type, Parameters) ->
    OnEvent = fun(Trace, {ok, TH}) -> report(TH, Trace) end,
    OnEnd = fun(FinalAcc) -> FinalAcc end,
    Spec = trace_spec_wrapper(OnEvent, OnEnd, {ok, CollectorPid}),
    ClientPid = dbg:trace_client(Type, Parameters, Spec),
    CollectorPid ! {register_trace_client, ClientPid},
    unlink(ClientPid),
    {trace_client_pid, ClientPid}.
669
%% Combine an event fun and an end fun into the {HandlerFun, InitAcc}
%% pair passed to dbg:trace_client/3. The handler calls EndFun(Acc)
%% for the end_of_trace marker and EventFun(Trace, Acc) for
%% everything else.
trace_spec_wrapper(EventFun, EndFun, EventInitialAcc)
  when is_function(EventFun), is_function(EndFun) ->
    Handler =
        fun(end_of_trace, Acc) -> EndFun(Acc);
           (Trace, Acc)        -> EventFun(Trace, Acc)
        end,
    {Handler, EventInitialAcc}.
679
%% Start a dbg port tracer that uses an ip trace port created from
%% Parameters.
start_trace_port(Parameters) ->
    PortFun = dbg:trace_port(ip, Parameters),
    dbg:tracer(port, PortFun).
682
%% Start a trace port and spawn a watcher process that monitors the
%% collector. When the collector goes down the watcher calls
%% dbg:stop_clear(). Returns the result of starting the trace port.
monitor_trace_port(CollectorPid, Parameters) ->
    Res = start_trace_port(Parameters),
    Watcher =
        fun() ->
                Ref = erlang:monitor(process, CollectorPid),
                receive
                    {'DOWN', Ref, _, _, _} ->
                        dbg:stop_clear()
                end
        end,
    _WatcherPid = spawn(Watcher),
    Res.
693
694%%----------------------------------------------------------------------
695%% iterate(Handle, Prev, Limit) ->
696%%     iterate(Handle, Prev, Limit, undefined, Prev)
697%%
698%% Iterates over the currently stored events
699%%
700%% Short for iterate/5.
701%%----------------------------------------------------------------------
702
%% Iterate without a fun: the starting position Prev is used as the
%% initial accumulator and no callback is applied.
iterate(Handle, Prev, Limit) ->
    NoFun = undefined,
    iterate(Handle, Prev, Limit, NoFun, Prev).
705
706%%----------------------------------------------------------------------
707%% iterate(Handle, Prev, Limit, Fun, Acc) -> NewAcc
708%%
709%% Iterates over the currently stored events and apply a function for
710%% each event. The iteration may be performed forwards or backwards
711%% and may be limited to a maximum number of events (abs(Limit)).
712%%
713%% Handle = collector_pid() | table_handle()
714%% Prev = first | last | event_key()
715%% Limit = done() | forward() | backward()
716%% collector_pid() = pid()
717%% table_handle() = record(table_handle)
%% event_key() = record(event) | record(event_ts) | record(trace_ts)
719%% done() = 0
720%% forward() = infinity | integer(X) where X > 0
721%% backward() = '-infinity' | integer(X) where X < 0
722%% Fun = fun(Event, Acc) -> NewAcc
723%% Acc = NewAcc = term()
724%%----------------------------------------------------------------------
725
%% A limit of 0 ends the iteration and returns the accumulator. A
%% collector pid is first exchanged for a table handle. With a table
%% handle, positive limits (and infinity) step forwards while negative
%% limits (and '-infinity') step backwards.
iterate(_Handle, _Prev, 0, _Fun, Acc) ->
    Acc;
iterate(CollectorPid, Prev, Limit, Fun, Acc) when is_pid(CollectorPid) ->
    case get_table_handle(CollectorPid) of
        {error, Reason} ->
            exit(Reason);
        {ok, TH} when is_record(TH, table_handle) ->
            iterate(TH, Prev, Limit, Fun, Acc)
    end;
iterate(TH, Prev, Limit, Fun, Acc) when is_record(TH, table_handle) ->
    if
        Limit =:= infinity;
        is_integer(Limit), Limit > 0 ->
            next_iterate(TH, Prev, Limit, Fun, Acc);
        Limit =:= '-infinity';
        is_integer(Limit), Limit < 0 ->
            prev_iterate(TH, Prev, Limit, Fun, Acc)
    end.
746
%% One forward step. The position is taken from the start of the table
%% (first), the end of the table (last) or the key following Prev. If
%% the table access fails, the failure is printed and the iteration
%% restarts via the collector pid, which fetches a fresh table handle.
%% The limit is adjusted by -1 for every key visited.
next_iterate(TH, first, Limit, Fun, Acc) ->
    case catch ets:first(TH#table_handle.event_tab) of
        {'EXIT', _} = Error ->
            io:format("~p(~p): First ~tp~n", [?MODULE, ?LINE, Error]),
            iterate(TH#table_handle.collector_pid, first, Limit, Fun, Acc);
        '$end_of_table' ->
            Acc;
        FirstKey ->
            lookup_and_apply(TH, first, FirstKey, Limit, -1, Fun, Acc)
    end;
next_iterate(TH, last, Limit, Fun, Acc) ->
    case catch ets:last(TH#table_handle.event_tab) of
        {'EXIT', _} = Error ->
            io:format("~p(~p): Last ~tp~n", [?MODULE, ?LINE, Error]),
            iterate(TH#table_handle.collector_pid, last, Limit, Fun, Acc);
        '$end_of_table' ->
            Acc;
        LastKey ->
            lookup_and_apply(TH, last, LastKey, Limit, -1, Fun, Acc)
    end;
next_iterate(TH, Prev, Limit, Fun, Acc) ->
    PrevKey = make_key(TH, Prev),
    case catch ets:next(TH#table_handle.event_tab, PrevKey) of
        {'EXIT', _} = Error ->
            io:format("~p(~p): Next ~tp -> ~tp~n", [?MODULE, ?LINE, PrevKey, Error]),
            iterate(TH#table_handle.collector_pid, Prev, Limit, Fun, Acc);
        '$end_of_table' ->
            Acc;
        NextKey ->
            lookup_and_apply(TH, Prev, NextKey, Limit, -1, Fun, Acc)
    end.
781
%% One backward step. The position is taken from the start of the
%% table (first), the end of the table (last) or the key preceding
%% Prev. If the table access fails, the failure is printed and the
%% iteration restarts via the collector pid, which fetches a fresh
%% table handle. The limit is adjusted by +1 for every key visited.
prev_iterate(TH, first, Limit, Fun, Acc) ->
    case catch ets:first(TH#table_handle.event_tab) of
        {'EXIT', _} = Error ->
            io:format("~p(~p): First ~tp~n", [?MODULE, ?LINE, Error]),
            iterate(TH#table_handle.collector_pid, first, Limit, Fun, Acc);
        '$end_of_table' ->
            Acc;
        FirstKey ->
            lookup_and_apply(TH, first, FirstKey, Limit, 1, Fun, Acc)
    end;
prev_iterate(TH, last, Limit, Fun, Acc) ->
    case catch ets:last(TH#table_handle.event_tab) of
        {'EXIT', _} = Error ->
            io:format("~p(~p): Last ~tp~n", [?MODULE, ?LINE, Error]),
            iterate(TH#table_handle.collector_pid, last, Limit, Fun, Acc);
        '$end_of_table' ->
            Acc;
        LastKey ->
            lookup_and_apply(TH, last, LastKey, Limit, 1, Fun, Acc)
    end;
prev_iterate(TH, Prev, Limit, Fun, Acc) ->
    FromKey = make_key(TH, Prev),
    case catch ets:prev(TH#table_handle.event_tab, FromKey) of
        {'EXIT', _} = Error ->
            io:format("~p(~p): Prev ~tp -> ~tp~n", [?MODULE, ?LINE, FromKey, Error]),
            iterate(TH#table_handle.collector_pid, Prev, Limit, Fun, Acc);
        '$end_of_table' ->
            Acc;
        PrevKey ->
            lookup_and_apply(TH, Prev, PrevKey, Limit, 1, Fun, Acc)
    end.
816
%% Visit the event stored under the key Next and continue the iteration.
%%
%% When Fun is 'undefined' no event is read: the limit is stepped by
%% Incr and the key itself becomes the accumulator.
%% Otherwise the event is fetched, Fun(Event, Acc) produces the new
%% accumulator and the limit is stepped by Incr. If the lookup fails,
%% the iteration is restarted from Prev through the collector pid with
%% the limit and accumulator untouched.
lookup_and_apply(TH, _Prev, Next, Limit, Incr, undefined = Fun, _Acc) ->
    iterate(TH, Next, incr(Limit, Incr), Fun, Next);
lookup_and_apply(TH, Prev, Next, Limit, Incr, Fun, Acc) ->
    EventTab = TH#table_handle.event_tab,
    case catch ets:lookup_element(EventTab, Next, 2) of
        Event when is_record(Event, event) ->
            NewAcc = Fun(Event, Acc),
            iterate(TH, Next, incr(Limit, Incr), Fun, NewAcc);
        {'EXIT', _} ->
            iterate(TH#table_handle.collector_pid, Prev, Limit, Fun, Acc)
    end.
830
%% Fetch a single event by its table key.
%%
%% Accepts either a collector pid (the table handle is requested from
%% the collector first) or a table handle record.
%% Returns {ok, Event}, {error, enoent} when the lookup in the event
%% table fails, or the {error, Reason} reported by get_table_handle/1.
lookup(CollectorPid, Key) when is_pid(CollectorPid) ->
    case get_table_handle(CollectorPid) of
        {error, Reason} ->
            {error, Reason};
        {ok, TH} when is_record(TH, table_handle) ->
            lookup(TH, Key)
    end;
lookup(TH, Key) when is_record(TH, table_handle) ->
    EventTab = TH#table_handle.event_tab,
    case catch ets:lookup_element(EventTab, Key, 2) of
        Event when is_record(Event, event) ->
            {ok, Event};
        {'EXIT', _} ->
            {error, enoent}
    end.
846
%% Step an iteration limit by Incr.
%% Integer limits are adjusted; the unbounded markers 'infinity' and
%% '-infinity' are returned unchanged.
incr(Val, Incr) ->
    if
        is_integer(Val) ->
            Val + Incr;
        Val =:= infinity; Val =:= '-infinity' ->
            Val
    end.
853
854%%----------------------------------------------------------------------
855%% clear_table(Handle) -> ok
856%%
857%% Clear the event table
858%%
859%% Handle = collector_pid() | table_handle()
860%% collector_pid() = pid()
861%% table_handle() = record(table_handle)
862%%----------------------------------------------------------------------
863
%% A table handle is resolved to its collector pid; the pid variant
%% sends the clear_table request to the collector.
clear_table(#table_handle{collector_pid = CollectorPid}) ->
    clear_table(CollectorPid);
clear_table(CollectorPid) when is_pid(CollectorPid) ->
    call(CollectorPid, clear_table).
868
%% Synchronous request to the collector without a timeout.
%% A missing collector process is reported as {error, no_collector};
%% every other exit from gen_server:call/3 propagates to the caller.
call(CollectorPid, Request) ->
    try gen_server:call(CollectorPid, Request, infinity) of
        Reply ->
            Reply
    catch
        exit:{noproc, _} ->
            {error, no_collector}
    end.
876
877%%%----------------------------------------------------------------------
878%%% Callback functions from gen_server
879%%%----------------------------------------------------------------------
880
881%%----------------------------------------------------------------------
882%% Func: init/1
883%% Returns: {ok, State}          |
884%%          {ok, State, Timeout} |
885%%          ignore               |
886%%          {stop, Reason}
887%%----------------------------------------------------------------------
888
%% Traps exits, links to the parent (if one is given) and then builds
%% the initial state in three steps: create the ets tables, set up
%% global tracing, and replay the initial dictionary entries through
%% do_dict_insert/2.
init([InitialS, Dict]) ->
    process_flag(trap_exit, true),
    case InitialS#state.parent_pid of
        Parent when is_pid(Parent) ->
            link(Parent);
        undefined ->
            ok
    end,
    WithTables = init_tables(InitialS),
    WithGlobal = init_global(WithTables),
    WithDict = lists:foldl(fun do_dict_insert/2, WithGlobal, Dict),
    {ok, WithDict}.
901
%% Create the event and dictionary tables (public ordered sets keyed on
%% the first tuple element) and record them in the state.
init_tables(S) ->
    TabOptions = [ordered_set, {keypos, 1}, public],
    Events = ets:new(et_events, TabOptions),
    Dict = ets:new(et_dict, TabOptions),
    S#state{event_tab_size = 0, event_tab = Events, dict_tab = Dict}.
906
%% When global tracing is requested: start a dbg tracer that reports
%% every event to this collector, install the trace pattern, subscribe
%% to node up/down notifications and queue a nodeup message for each
%% node that is already connected. Without global tracing the state is
%% returned untouched.
init_global(S) ->
    case S#state.trace_global of
        false ->
            S;
        true ->
            Report = fun(Event, {ok, TH}) -> report(TH, Event) end,
            Finish = fun(Acc) -> Acc end,
            TraceSpec = trace_spec_wrapper(Report, Finish, {ok, self()}),
            dbg:tracer(process, TraceSpec),
            et_selector:change_pattern(S#state.trace_pattern),
            ok = net_kernel:monitor_nodes(true),
            Collector = self(),
            lists:foreach(fun(Node) -> Collector ! {nodeup, Node} end, nodes()),
            S#state{trace_nodes = [node()]}
    end.
921
922%%----------------------------------------------------------------------
923%% Func: handle_call/3
924%% Returns: {reply, Reply, State}          |
925%%          {reply, Reply, State, Timeout} |
926%%          {noreply, State}               |
927%%          {noreply, State, Timeout}      |
928%%          {stop, Reason, Reply, State}   | (terminate/2 is called)
929%%          {stop, Reason, State}            (terminate/2 is called)
930%%----------------------------------------------------------------------
931
%% Synchronous requests. Ordinary answers are wrapped by reply/2.

%% Send Msg to every subscriber.
handle_call({multicast, Msg}, _From, S) ->
    do_multicast(S#state.subscribers, Msg),
    reply(ok, S);

handle_call({dict_insert, _Key, _Val} = Msg, _From, S) ->
    reply(ok, do_dict_insert(Msg, S));

%% do_dict_delete/2 throws {stop, Reason} when the collector should
%% terminate; in that case the server stops without sending a reply.
handle_call({dict_delete, _Key} = Msg, _From, S) ->
    try do_dict_delete(Msg, S) of
        NewS ->
            reply(ok, NewS)
    catch
        throw:{stop, StopReason} ->
            opt_unlink(S#state.parent_pid),
            {stop, StopReason, S}
    end;

handle_call({dict_lookup, Key}, _From, S) ->
    reply(ets:lookup(S#state.dict_tab, Key), S);

%% A failing match (e.g. a malformed pattern) is answered with [].
handle_call({dict_match, Pattern}, _From, S) ->
    Matching =
        case catch ets:match_object(S#state.dict_tab, Pattern) of
            {'EXIT', _Reason} -> [];
            Objects -> Objects
        end,
    reply(Matching, S);

%% Build a handle that lets the caller read the event table directly.
handle_call(get_table_handle, _From, S) ->
    FilterKey = {filter, ?DEFAULT_FILTER_NAME},
    [{_, TableFilter}] = ets:lookup(S#state.dict_tab, FilterKey),
    Handle = #table_handle{filter        = TableFilter,
                           event_order   = S#state.event_order,
                           event_tab     = S#state.event_tab,
                           collector_pid = self()},
    reply({ok, Handle}, S);

handle_call(get_table_size, _From, S) ->
    reply({ok, ets:info(S#state.event_tab, size)}, S);

handle_call(close, _From, S) ->
    case S#state.file of
        undefined ->
            reply({error, file_not_open}, S);
        OpenFile ->
            CloseResult = disk_log:close(OpenFile#file.desc),
            reply(CloseResult, S#state{file = undefined})
    end;

%% Write the stored events to a disk_log file. Only the 'existing'
%% event option is implemented: every event currently in the table is
%% logged and the file is closed again. Afterwards the table is kept or
%% cleared according to the table option.
handle_call({save_event_file, FileName, Options}, _From, S) ->
    Defaults = #file{name      = FileName,
                     event_opt = existing,
                     file_opt  = write,
                     table_opt = keep},
    case parse_file_options(Defaults, Options) of
        {error, Reason} ->
            reply({error, Reason}, S);
        {ok, F} when is_record(F, file) ->
            case file_open(F) of
                {error, Reason} ->
                    reply({error, {file_open, Reason}}, S);
                {ok, Fd} ->
                    {Result, S2} =
                        case F#file.event_opt of
                            existing ->
                                EventTab = S#state.event_tab,
                                LogEvent = fun({_, E}, A) -> ok = disk_log:log(Fd, E), A end,
                                Logged = tab_iterate(LogEvent, EventTab, ets:first(EventTab), ok),
                                ok = disk_log:close(Fd),
                                {Logged, S}
                        end,
                    S3 =
                        case F#file.table_opt of
                            keep  -> S2;
                            clear -> do_clear_table(S2)
                        end,
                    reply(Result, S3)
            end
    end;

%% Install the new pattern on all traced nodes and answer with the old one.
handle_call({change_pattern, Pattern}, _From, S) ->
    {_, []} = rpc:multicall(S#state.trace_nodes, et_selector, change_pattern, [Pattern]),
    OldPattern = S#state.trace_pattern,
    reply({old_pattern, OldPattern}, S#state{trace_pattern = Pattern});

handle_call(clear_table, _From, S) ->
    reply(ok, do_clear_table(S));

%% Tell the subscribers to close, stop tracing on all traced nodes when
%% global tracing is active, and terminate with reason shutdown.
handle_call(stop, _From, S) ->
    do_multicast(S#state.subscribers, close),
    case S#state.trace_global of
        false ->
            ok;
        true ->
            {_, []} = rpc:multicall(S#state.trace_nodes, dbg, stop_clear, []),
            ok
    end,
    {stop, shutdown, ok, S};

%% Anything else is logged and rejected.
handle_call(Request, From, S) ->
    LogArgs = [?MODULE, self(), Request, From, S],
    ok = error_logger:format("~p(~p): handle_call(~tp, ~tp, ~tp)~n", LogArgs),
    reply({error, {bad_request, Request}}, S).
1055
1056%%----------------------------------------------------------------------
1057%% Func: handle_cast/2
1058%% Returns: {noreply, State}          |
1059%%          {noreply, State, Timeout} |
1060%%          {stop, Reason, State}            (terminate/2 is called)
1061%%----------------------------------------------------------------------
1062
%% No casts are part of the protocol; log whatever arrives and go on.
handle_cast(Msg, S) ->
    LogArgs = [?MODULE, self(), Msg, S],
    ok = error_logger:format("~p(~p): handle_cast(~tp, ~tp)~n", LogArgs),
    noreply(S).
1067
1068%%----------------------------------------------------------------------
1069%% Func: handle_info/2
1070%% Returns: {noreply, State}          |
1071%%          {noreply, State, Timeout} |
1072%%          {stop, Reason, State}            (terminate/2 is called)
1073%%----------------------------------------------------------------------
1074
%% gen_server timeout: tell subscribers if the event table has changed size.
handle_info(timeout, S) ->
    noreply(check_size(S));

%% A node appeared: ask it to open a trace port and, on success, connect
%% a trace client to that port. The port number is advanced after every
%% failure; failures reported as {error, Reason} other than
%% already_started also re-queue the nodeup message so the node is
%% retried.
handle_info({nodeup, Node}, S) ->
    Port = S#state.trace_port,
    MaxQueue = S#state.trace_max_queue,
    PortSpec = {Port, MaxQueue},
    case rpc:call(Node, ?MODULE, monitor_trace_port, [self(), PortSpec]) of
        {ok, _} ->
            noreply(listen_on_trace_port(Node, Port, S));
        {error, already_started = Reason} ->
            ok = error_logger:format("~p(~p): producer ignored(~p:~p):~n    ~tp~n",
                                     [?MODULE, self(), Node, Port, Reason]),
            noreply(S#state{trace_port = Port + 1});
        {badrpc, Reason} ->
            ok = error_logger:format("~p(~p): producer ignored(~p:~p):~n    ~tp~n",
                                     [?MODULE, self(), Node, Port, Reason]),
            noreply(S#state{trace_port = Port + 1});
        {error, Reason} ->
            self() ! {nodeup, Node},
            ok = error_logger:format("~p(~p): producer retry(~p:~p):~n     ~tp~n",
                                     [?MODULE, self(), Node, Port, Reason]),
            noreply(S#state{trace_port = Port + 1})
    end;

handle_info({nodedown, Node}, S) ->
    Remaining = S#state.trace_nodes -- [Node],
    noreply(S#state{trace_nodes = Remaining});

handle_info({register_trace_client, Pid}, S) ->
    link(Pid),
    noreply(S);

%% The parent died: terminate with the same reason.
handle_info({'EXIT', Pid, Reason}, S) when Pid =:= S#state.parent_pid ->
    {stop, Reason, S};
%% Some other linked process died. A subscriber exiting with 'shutdown'
%% is simply unsubscribed (which may itself request a stop); a subscriber
%% exiting for any other reason stops the collector with that reason.
%% Exits from processes that are not subscribers are only logged.
handle_info({'EXIT', Pid, Reason} = Info, S) ->
    IsSubscriber = lists:member(Pid, S#state.subscribers),
    case {IsSubscriber, Reason} of
        {true, shutdown} ->
            try do_dict_delete({dict_delete, {subscriber, Pid}}, S) of
                NewS ->
                    noreply(NewS)
            catch
                throw:{stop, StopReason} ->
                    opt_unlink(S#state.parent_pid),
                    {stop, StopReason, S}
            end;
        {true, _} ->
            opt_unlink(S#state.parent_pid),
            {stop, Reason, S};
        {false, _} ->
            ok = error_logger:format("~p(~p): handle_info(~tp, ~tp)~n",
                                     [?MODULE, self(), Info, S]),
            noreply(S)
    end;
%% Unknown messages are logged and dropped.
handle_info(Info, S) ->
    ok = error_logger:format("~p(~p): handle_info(~tp, ~tp)~n",
                             [?MODULE, self(), Info, S]),
    noreply(S).
1136
%% Connect a trace client to the trace port opened on Node.
%% The host part of the node name is used as the address. On success the
%% trace pattern is installed on the node, the client is linked and the
%% node is added to the traced nodes. The port number is advanced in
%% every outcome; failures other than already_started re-queue the
%% nodeup message so the node is retried.
listen_on_trace_port(Node, Port, S) ->
    [_Name, Host] = string:lexemes(atom_to_list(Node), [$@]),
    case catch start_trace_client(self(), ip, {Host, Port}) of
        {trace_client_pid, ClientPid} ->
            rpc:call(Node, et_selector, change_pattern, [S#state.trace_pattern]),
            link(ClientPid),
            TracedNodes = [Node | S#state.trace_nodes],
            S#state{trace_nodes = TracedNodes, trace_port = Port + 1};
        {'EXIT', already_started = Reason} ->
            ok = error_logger:format("~p(~p): consumer ignored(~p:~p): ~tp~n",
                                     [?MODULE, self(), Node, Port, Reason]),
            S#state{trace_port = Port + 1};
        {'EXIT', Reason} ->
            self() ! {nodeup, Node},
            ok = error_logger:format("~p(~p): consumer retry(~p:~p):~n     ~tp~n",
                                     [?MODULE, self(), Node, Port, Reason]),
            S#state{trace_port = Port + 1}
    end.
1155
1156%%----------------------------------------------------------------------
1157%% Func: terminate/2
1158%% Purpose: Shutdown the server
1159%% Returns: any (ignored by gen_server)
1160%%----------------------------------------------------------------------
1161
%% Send an exit signal carrying the termination reason to every subscriber.
terminate(Reason, S) ->
    lists:foreach(fun(Subscriber) -> exit(Subscriber, Reason) end,
                  S#state.subscribers).
1165
1166%%----------------------------------------------------------------------
1167%% Func: code_change/3
1168%% Purpose: Convert process state when code is changed
1169%% Returns: {ok, NewState}
1170%%----------------------------------------------------------------------
1171
%% The state needs no conversion between versions; return it unchanged.
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.
1174
1175%%%----------------------------------------------------------------------
1176%%% Internal functions
1177%%%----------------------------------------------------------------------
1178
%% Drop the current event table and replace it with a fresh, empty one.
do_clear_table(S) ->
    ets:delete(S#state.event_tab),
    FreshTab = ets:new(et_events, [ordered_set, {keypos, 1}, public]),
    S#state{event_tab = FreshTab}.
1184
%% Store a dictionary entry and announce it to the subscribers.
%%
%% Inserting a {subscriber, Pid} key additionally registers Pid: a new
%% subscriber is linked and first receives every entry already in the
%% dictionary. All subscribers (including the new one) then get the
%% insert message followed by the current number of stored events.
do_dict_insert({dict_insert, {subscriber, Pid} = Key, Val} = Msg, S) when is_pid(Pid) ->
    Current = S#state.subscribers,
    Subscribers =
        case lists:member(Pid, Current) of
            false ->
                link(Pid),
                Existing = ets:match_object(S#state.dict_tab, '_'),
                Replay = fun({K, V}) -> Pid ! {et, {dict_insert, K, V}} end,
                lists:foreach(Replay, Existing),
                [Pid | Current];
            true ->
                Current
        end,
    do_multicast(Subscribers, Msg),
    EventCount = ets:info(S#state.event_tab, size),
    do_multicast(Subscribers, {more_events, EventCount}),
    ets:insert(S#state.dict_tab, {Key, Val}),
    S#state{subscribers = Subscribers};
do_dict_insert({dict_insert, Key, Val} = Msg, S) ->
    do_multicast(S#state.subscribers, Msg),
    ets:insert(S#state.dict_tab, {Key, Val}),
    S.
1206
%% Remove a dictionary entry and announce the removal to the subscribers.
%%
%% Deleting a {subscriber, Pid} key also unlinks and unregisters Pid.
%% When that leaves no subscribers and auto_shutdown is true,
%% {stop, shutdown} is thrown for the caller to catch.
%% The default filter is never removed; that request is ignored.
do_dict_delete({dict_delete, {subscriber, Pid} = Key} = Msg, S) ->
    Current = S#state.subscribers,
    do_multicast(Current, Msg),
    ets:delete(S#state.dict_tab, Key),
    case lists:member(Pid, Current) of
        false ->
            S;
        true ->
            unlink(Pid),
            NewS = S#state{subscribers = lists:delete(Pid, Current)},
            case NewS of
                #state{auto_shutdown = true, subscribers = []} ->
                    throw({stop, shutdown});
                _ ->
                    NewS
            end
    end;
do_dict_delete({dict_delete, {filter, ?DEFAULT_FILTER_NAME}}, S) ->
    S;
do_dict_delete({dict_delete, Key} = Msg, S) ->
    do_multicast(S#state.subscribers, Msg),
    ets:delete(S#state.dict_tab, Key),
    S.
1231
%% Fold Fun over the objects of an ets table, starting at Key and
%% following ets:next/2 until the end of the table is reached.
tab_iterate(Fun, Tab, Key, Acc) ->
    case Key of
        '$end_of_table' ->
            Acc;
        _ ->
            Objects = ets:lookup(Tab, Key),
            NewAcc = lists:foldl(Fun, Acc, Objects),
            tab_iterate(Fun, Tab, ets:next(Tab, Key), NewAcc)
    end.
1237
%% Open the disk_log that events are saved to.
%%
%% With the 'write' file option an already existing file is first moved
%% aside to "<name>.OLD" so the log starts out empty. A missing file is
%% the normal case for a first save and is not an error. Any other
%% rename failure is returned as {error, Reason} instead of crashing
%% the collector, and the log is not opened, since it would otherwise
%% end up appending to the old contents.
%% With the 'append' option the file is opened as it is.
%%
%% Returns {ok, Fd}, where Fd is the reference the log is registered
%% under, or {error, Reason}.
file_open(F) ->
    Name = F#file.name,
    case F#file.file_opt of
        write ->
            case file:rename(Name, Name ++ ".OLD") of
                ok ->
                    file_open_log(Name);
                {error, enoent} ->
                    %% Nothing to move aside
                    file_open_log(Name);
                {error, _} = RenameError ->
                    RenameError
            end;
        append ->
            file_open_log(Name)
    end.

%% Open (and if needed repair) the disk_log stored in the file Name.
file_open_log(Name) ->
    Fd = make_ref(),
    Args = [{file, Name}, {name, Fd},
            {repair, true}, {mode, read_write}],
    case disk_log:open(Args) of
        {ok, _} ->
            {ok, Fd};
        {repaired, _, _, BadBytes} ->
            ok = error_logger:format("~p: Skipped ~p bad bytes in file: ~tp~n",
                                     [?MODULE, BadBytes, Name]),
            {ok, Fd};
        {error, Reason} ->
            {error, Reason}
    end.
1256
%% Fold the save_event_file options into the #file{} defaults.
%%
%% Supported options:
%%   existing        - which events to write (the only implemented choice)
%%   write | append  - start a new file or extend the current one
%%   keep | clear    - keep or clear the event table afterwards
%%
%% Returns {ok, File} or {error, {bad_file_option, Option}} for the
%% first option that is not supported.
%%
%% 'all' is rejected like any other unknown option: the save_event_file
%% request handler only implements 'existing', so accepting 'all' here
%% let the request pass validation and then crash the collector with a
%% case_clause after the file had already been opened.
parse_file_options(F, []) ->
    {ok, F};
parse_file_options(F, [existing | T]) ->
    parse_file_options(F#file{event_opt = existing}, T);
parse_file_options(F, [write | T]) ->
    parse_file_options(F#file{file_opt = write}, T);
parse_file_options(F, [append | T]) ->
    parse_file_options(F#file{file_opt = append}, T);
parse_file_options(F, [keep | T]) ->
    parse_file_options(F#file{table_opt = keep}, T);
parse_file_options(F, [clear | T]) ->
    parse_file_options(F#file{table_opt = clear}, T);
parse_file_options(_F, [Bad | _]) ->
    {error, {bad_file_option, Bad}}.
1270
%% Send {et, Msg} to each pid in the list, in list order. Returns ok.
do_multicast(Pids, Msg) ->
    lists:foreach(fun(Pid) -> Pid ! {et, Msg} end, Pids).
1276
%% Unlink from Pid unless no pid was given ('undefined').
opt_unlink(undefined) ->
    ok;
opt_unlink(Pid) ->
    unlink(Pid).
1284
%% Build the gen_server reply tuple. While there are subscribers a
%% 500 ms timeout is added to the tuple.
reply(Reply, S) ->
    case S of
        #state{subscribers = []} ->
            {reply, Reply, S};
        _ ->
            {reply, Reply, S, 500}
    end.
1289
%% Build the gen_server noreply tuple. While there are subscribers a
%% 500 ms timeout is added to the tuple.
noreply(S) ->
    case S of
        #state{subscribers = []} ->
            {noreply, S};
        _ ->
            {noreply, S, 500}
    end.
1294
%% Compare the current size of the event table with the size recorded
%% in the state. When it differs, the subscribers are told how many
%% events are available and the new size is recorded.
check_size(S) ->
    case ets:info(S#state.event_tab, size) of
        Same when Same =:= S#state.event_tab_size ->
            S;
        Size ->
            do_multicast(S#state.subscribers, {more_events, Size}),
            S#state{event_tab_size = Size}
    end.
1306