1%%
2%% %CopyrightBegin%
3%%
4%% Copyright Ericsson AB 2001-2016. All Rights Reserved.
5%%
6%% Licensed under the Apache License, Version 2.0 (the "License");
7%% you may not use this file except in compliance with the License.
8%% You may obtain a copy of the License at
9%%
10%%     http://www.apache.org/licenses/LICENSE-2.0
11%%
12%% Unless required by applicable law or agreed to in writing, software
13%% distributed under the License is distributed on an "AS IS" BASIS,
14%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15%% See the License for the specific language governing permissions and
16%% limitations under the License.
17%%
18%% %CopyrightEnd%
19%%
20-module(dets_server).
21
22%% Disk based linear hashing lookup dictionary. Server part.
23
24-behaviour(gen_server).
25
26%% External exports.
27-export([all/0, close/1, get_pid/1, open_file/1, open_file/2, pid2name/1,
28         users/1, verbose/1]).
29
30%% Internal.
31-export([start_link/0, start/0, stop/0]).
32
33%% gen_server callbacks
34-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
35        code_change/3]).
36
%% record for not yet handled requests to open or close files
%% Fields, as they are used in this module:
%%   tab     - name of the table the request concerns
%%   ref     - reference carried by the {pending_reply, {Ref, Result}}
%%             message that completes the request
%%   pid     - pid of the table process the request was sent to
%%   from    - gen_server From ({Pid, Tag}) of the requesting client
%%   reqtype - add_user | internal_open | internal_close | remove_user
%%   clients - requests for the same table that arrived while this one
%%             was pending; they are run once the reply has come in
-record(pending, {tab, ref, pid, from, reqtype, clients}). % clients: [{Req,From}]

%% state for the dets server
%%   store   - ets table created by init/0
%%   parent  - the argument handed to init/1
%%   pending - requests awaiting a pending_reply message
-record(state, {store, parent, pending}). % [pending()]
42
43-include("dets.hrl").
44
45-define(REGISTRY, dets_registry).  % {Table, NoUsers, TablePid}
46-define(OWNERS, dets_owners).      % {TablePid, Table}
47-define(STORE, dets).              % {User, Table} and {{links,User}, NoLinks}
48
49%%-define(DEBUGF(X,Y), io:format(X, Y)).
50-define(DEBUGF(X,Y), void).
51
52-compile({inline, [{pid2name_1,1}]}).
53
54%%%----------------------------------------------------------------------
55%%% API
56%%%----------------------------------------------------------------------
57
58%% Internal.
%% Starts the server as a locally registered gen_server linked to the
%% caller. The caller's pid, wrapped in a list, is the init/1 argument.
start_link() ->
    ServerName = {local, ?SERVER_NAME},
    InitArg = [self()],
    gen_server:start_link(ServerName, dets_server, InitArg, []).
61
%% Makes sure the server is running; see ensure_started/0. Returns ok.
start() ->
    ok = ensure_started().
64
%% Stops the server if it is registered. Returns 'stopped' when nothing
%% is registered under the server name, otherwise whatever the server
%% replies to the 'stop' call.
stop() ->
    Registered = whereis(?SERVER_NAME),
    if
        Registered =:= undefined ->
            stopped;
        true ->
            gen_server:call(?SERVER_NAME, stop, infinity)
    end.
72
%% Asks the server for the names of all tables found in the registry.
all() ->
    Request = all,
    call(Request).
75
%% Asks the server to close Tab on behalf of the calling process.
close(Tab) ->
    Request = {close, Tab},
    call(Request).
78
%% Reads the table process pid for Tab straight from the registry table,
%% without going through the server. Registry objects are
%% {Table, NoUsers, TablePid}, hence element 3. ets:lookup_element/3
%% fails with badarg when Tab is not registered.
get_pid(Tab) ->
    PidPosition = 3,
    ets:lookup_element(?REGISTRY, Tab, PidPosition).
81
%% Asks the server to open File; the request carries no table name.
open_file(File) ->
    Request = {open, File},
    call(Request).
84
%% Asks the server to open the table Tab with the arguments OpenArgs.
open_file(Tab, OpenArgs) ->
    Request = {open, Tab, OpenArgs},
    call(Request).
87
%% Maps a table process pid to its table name by reading the owners
%% table directly. Returns {ok, Tab} or undefined. The server is started
%% first if it is not already running.
pid2name(Pid) ->
    ok = ensure_started(),
    pid2name_1(Pid).
91
%% Asks the server which processes are recorded as users of Tab.
users(Tab) ->
    Request = {users, Tab},
    call(Request).
94
%% Switches the server's verbose flag: on when What is 'true', off for
%% any other value.
verbose(What) ->
    Request = {set_verbose, What},
    call(Request).
97
%% Sends Message to the server as a synchronous call with no timeout,
%% starting the server first if needed.
call(Message) ->
    ok = ensure_started(),
    Timeout = infinity,
    gen_server:call(?SERVER_NAME, Message, Timeout).
101
102%%%----------------------------------------------------------------------
103%%% Callback functions from gen_server
104%%%----------------------------------------------------------------------
105
%%----------------------------------------------------------------------
%% Func: init/1
%% Purpose: Set up the server's ets tables (see init/0) and build the
%%          initial state with no pending requests. Parent is stored
%%          exactly as received; start_link/0 passes [CallerPid].
%% Returns: {ok, State}          |
%%          {ok, State, Timeout} |
%%          ignore               |
%%          {stop, Reason}
%%----------------------------------------------------------------------
init(Parent) ->
    InitialState = #state{store = init(), parent = Parent, pending = []},
    {ok, InitialState}.
116
%%----------------------------------------------------------------------
%% Func: handle_call/3
%% Purpose: Serve the synchronous requests sent by the API functions:
%%   all                   - names of all tables in the registry
%%   {close, Tab}          - run through request/2; the reply is sent
%%   {open, File}            with gen_server:reply/2, possibly later,
%%   {open, Tab, OpenArgs}   so these clauses return {noreply, State}
%%   stop                  - stop the server, replying 'stopped'
%%   {set_verbose, What}   - update the verbose flag, replying ok
%%   {users, Tab}          - pids recorded as users of Tab
%% Returns: {reply, Reply, State}          |
%%          {reply, Reply, State, Timeout} |
%%          {noreply, State}               |
%%          {noreply, State, Timeout}      |
%%          {stop, Reason, Reply, State}   | (terminate/2 is called)
%%          {stop, Reason, State}            (terminate/2 is called)
%%----------------------------------------------------------------------
handle_call(all, _From, State) ->
    %% The table name is the first element of every registry object.
    F = fun(X, A) -> [element(1, X) | A] end,
    {reply, ets:foldl(F, [], ?REGISTRY), State};
handle_call({close, Tab}, From, State) ->
    request([{{close, Tab}, From}], State);
handle_call({open, File}, From, State) ->
    request([{{open, File}, From}], State);
handle_call({open, Tab, OpenArgs}, From, State) ->
    request([{{open, Tab, OpenArgs}, From}], State);
handle_call(stop, _From, State) ->
    {stop, normal, stopped, State};
handle_call({set_verbose, What}, _From, State) ->
    set_verbose(What),
    {reply, ok, State};
handle_call({users, Tab}, _From, State) ->
    %% The store holds both {User, Table} and {{links,User}, NoLinks}
    %% objects. A table named by an integer equal to some link count
    %% would make the pattern {'$1', Tab} match the counter objects as
    %% well, so only objects keyed by a pid are accepted.
    Users = ets:select(State#state.store,
                       [{{'$1', Tab}, [{is_pid, '$1'}], ['$1']}]),
    {reply, Users, State}.
143
%%----------------------------------------------------------------------
%% Func: handle_cast/2
%% Purpose: No casts are part of the protocol; any cast is dropped and
%%          the state is returned untouched.
%% Returns: {noreply, State}          |
%%          {noreply, State, Timeout} |
%%          {stop, Reason, State}            (terminate/2 is called)
%%----------------------------------------------------------------------
handle_cast(_Ignored, Unchanged) ->
    {noreply, Unchanged}.
152
%%----------------------------------------------------------------------
%% Func: handle_info/2
%% Purpose: Handle the non-call messages the server receives:
%%   {pending_reply, {Ref, Result}} - sent by the process spawned in
%%       pending_call/7 when a request to a table process has finished.
%%       The waiting client is answered, the bookkeeping is completed
%%       or rolled back, and the requests queued behind it are run.
%%   {'EXIT', Pid, Reason} - a linked process terminated. Pid is either
%%       a table process (its table is dropped from all bookkeeping) or
%%       a client (every table it has open is closed on its behalf).
%%   Anything else is ignored.
%% Returns: {noreply, State}          |
%%          {noreply, State, Timeout} |
%%          {stop, Reason, State}            (terminate/2 is called)
%%----------------------------------------------------------------------
handle_info({pending_reply, {Ref, Result0}}, State) ->
    %% A reply whose Ref matches no pending request fails the match.
    {value, #pending{tab = Tab, pid = Pid, from = {FromPid,_Tag}=From,
                     reqtype = ReqT, clients = Clients}} =
        lists:keysearch(Ref, #pending.ref, State#state.pending),
    Store = State#state.store,
    Result =
        case {Result0, ReqT} of
            {ok, add_user} ->
                %% One more user of an already open table.
                do_link(Store, FromPid),
                true = ets:insert(Store, {FromPid, Tab}),
                ets:update_counter(?REGISTRY, Tab, 1),
                {ok, Tab};
            {ok, internal_open} ->
                link(Pid),
                do_link(Store, FromPid),
                true = ets:insert(Store, {FromPid, Tab}),
                %% do_internal_open() has already done the following:
                %% true = ets:insert(?REGISTRY, {Tab, 1, Pid}),
                %% true = ets:insert(?OWNERS, {Pid, Tab}),
                {ok, Tab};
            {Reply, internal_open} ->
                %% Clean up what do_internal_open() did:
                true = ets:delete(?REGISTRY, Tab),
                true = ets:delete(?OWNERS, Pid),
                Reply;
            {Reply, _} -> % ok or Error
                Reply
        end,
    gen_server:reply(From, Result),
    NP = lists:keydelete(Pid, #pending.pid, State#state.pending),
    State1 = State#state{pending = NP},
    %% Now run the requests that were queued while this one was pending.
    request(Clients, State1);
handle_info({'EXIT', Pid, _Reason}, State) ->
    Store = State#state.store,
    case pid2name_1(Pid) of
        {ok, Tab} ->
            %% A table was killed.
            true = ets:delete(?REGISTRY, Tab),
            true = ets:delete(?OWNERS, Pid),
            %% The store also holds {{links,User}, NoLinks} objects. If
            %% Tab is an integer equal to some link count, the pattern
            %% {'$1', Tab} matches those counters too; selecting them
            %% would hand a non-pid to do_unlink/2 (and so to unlink/1),
            %% and deleting them would lose link bookkeeping. Restrict
            %% both operations to objects keyed by a pid.
            Users = ets:select(Store,
                               [{{'$1', Tab}, [{is_pid, '$1'}], ['$1']}]),
            _ = ets:select_delete(Store,
                                  [{{'$1', Tab}, [{is_pid, '$1'}], [true]}]),
            lists:foreach(fun(User) -> do_unlink(Store, User) end, Users),
            {noreply, State};
        undefined ->
            %% Close all tables used by Pid.
            F = fun({FromPid, Tab}, S) ->
                        {_, S1} = handle_close(S, {close, Tab},
                                               {FromPid, notag}, Tab),
                        S1
                end,
            State1 = lists:foldl(F, State, ets:lookup(Store, Pid)),
            {noreply, State1}
    end;
handle_info(_Message, State) ->
    {noreply, State}.
214
%%----------------------------------------------------------------------
%% Func: terminate/2
%% Purpose: Shutdown hook; performs no work of its own.
%% Returns: any (ignored by gen_server)
%%----------------------------------------------------------------------
terminate(_Why, _FinalState) ->
    ok.
222
%%----------------------------------------------------------------------
%% Func: code_change/3
%% Purpose: Code upgrade hook; the state is carried over as it is.
%% Returns: {ok, NewState}
%%----------------------------------------------------------------------
code_change(_FromVsn, Unchanged, _Extra) ->
    {ok, Unchanged}.
230
231%%%----------------------------------------------------------------------
232%%% Internal functions
233%%%----------------------------------------------------------------------
234
%% Starts the server on demand. When nothing is registered under the
%% server name, dets_sup and then this server are added as children of
%% kernel_safe_sup. The results of both start attempts are deliberately
%% ignored. Always returns ok.
ensure_started() ->
    case whereis(?SERVER_NAME) of
        undefined ->
            SupSpec = {dets_sup, {dets_sup, start_link, []}, permanent,
                       1000, supervisor, [dets_sup]},
            _ = supervisor:start_child(kernel_safe_sup, SupSpec),
            ServerSpec = {?SERVER_NAME, {?MODULE, start_link, []},
                          permanent, 2000, worker, [?MODULE]},
            _ = supervisor:start_child(kernel_safe_sup, ServerSpec),
            ok;
        _AlreadyRegistered ->
            ok
    end.
247
%% Prepares the server process: sets the verbose flag from the command
%% line, starts trapping exits, and creates the named registry and
%% owners tables. Returns the identifier of the unnamed duplicate_bag
%% store table.
init() ->
    Verbose = verbose_flag(),
    set_verbose(Verbose),
    process_flag(trap_exit, true),
    ?REGISTRY = ets:new(?REGISTRY, [set, named_table]),
    ?OWNERS = ets:new(?OWNERS, [set, named_table]),
    Store = ets:new(?STORE, [duplicate_bag]),
    Store.
254
%% Returns true if the emulator was started with "-dets verbose" (the
%% dets flag has exactly the single value "verbose" in one of its
%% occurrences), false otherwise.
verbose_flag() ->
    Wanted = ["verbose"],
    case init:get_argument(dets) of
        {ok, Values} ->
            lists:member(Wanted, Values);
        _NoDetsFlag ->
            false
    end.
262
%% Keeps the verbose flag in the process dictionary: 'true' stores
%% verbose => yes, anything else removes the entry. Returns the previous
%% dictionary value, as put/2 and erase/1 do.
set_verbose(What) ->
    case What of
        true ->
            put(verbose, yes);
        _ ->
            erase(verbose)
    end.
267
%% Inlined.
%% Looks up the table owned by the table process Pid in the owners
%% table. Returns {ok, Tab}, or undefined if Pid owns no table.
pid2name_1(Pid) ->
    case ets:lookup(?OWNERS, Pid) of
        [{_Owner, Name}] ->
            {ok, Name};
        [] ->
            undefined
    end.
274
%% Runs a list of {Request, From} pairs in order, threading the state
%% through. A request that finishes at once is answered here with
%% gen_server:reply/2; one that becomes pending is answered later, when
%% its pending_reply message arrives. Always returns {noreply, State}.
request([], State) ->
    {noreply, State};
request([{Req, From} | Rest], State0) ->
    Outcome = case Req of
                  {close, Tab} ->
                      handle_close(State0, Req, From, Tab);
                  {open, File} ->
                      do_internal_open(State0, From, [File, get(verbose)]);
                  {open, Tab, OpenArgs} ->
                      do_open(State0, Req, From, OpenArgs, Tab)
              end,
    case Outcome of
        {pending, State1} ->
            request(Rest, State1);
        {Reply, State1} ->
            gen_server:reply(From, Reply),
            request(Rest, State1)
    end.
294
%% -> {pending, NewState} | {Reply, NewState}
%% Opens the named table Tab for the client From. If another request
%% for Tab is already pending, this one is queued behind it. Otherwise
%% a registered table gets an add_user request sent to its process, and
%% an unregistered one is opened from scratch.
do_open(State, Req, From, Args, Tab) ->
    case check_pending(Tab, From, State, Req) of
        false ->
            case ets:lookup(?REGISTRY, Tab) of
                [{Tab, _NoUsers, TabPid}] ->
                    pending_call(Tab, TabPid, make_ref(), From, Args,
                                 add_user, State);
                [] ->
                    OpenArgs = [Tab, Args, get(verbose)],
                    do_internal_open(State, From, OpenArgs)
            end;
        {pending, QueuedState} ->
            {pending, QueuedState}
    end.
309
%% -> {pending, NewState} | {Reply, NewState}
%% Starts a new table process under dets_sup and sends it an
%% internal_open request. Args is [Tab, OpenArgs, Verbose] for a named
%% table or [File, Verbose] for a plain file; in the latter case the
%% request reference doubles as the table name. If the child cannot be
%% started, the supervisor's answer is returned as the reply.
do_internal_open(State, From, Args) ->
    case supervisor:start_child(dets_sup, [self()]) of
        {ok, TabPid} ->
            Ref = make_ref(),
            Name = case Args of
                       [_, _] -> Ref;
                       [Given, _, _] -> Given
                   end,
            %% Pretend the table is open. If someone else tries to
            %% open the file it will always become a pending
            %% 'add_user' request. If someone tries to use the table
            %% there will be a delay, but that is OK.
            true = ets:insert(?REGISTRY, {Name, 1, TabPid}),
            true = ets:insert(?OWNERS, {TabPid, Name}),
            pending_call(Name, TabPid, Ref, From, Args, internal_open, State);
        StartError ->
            {StartError, State}
    end.
329
%% -> {pending, NewState} | {Reply, NewState}
%% Closes Tab on behalf of the client in From. If a request for Tab is
%% already pending, this one is queued behind it. A client that is not
%% recorded as a user of Tab gets {error, not_owner}. When the client is
%% the last user, the table is removed from the registry and owners
%% tables and its process is asked to close; otherwise one user entry
%% is dropped and the table process is asked to remove the user.
handle_close(State, Req, {FromPid,_Tag}=From, Tab) ->
    case check_pending(Tab, From, State, Req) of
        false ->
            Store = State#state.store,
            UserObject = {FromPid, Tab},
            case ets:match_object(Store, UserObject) of
                [_ | Keep] ->
                    case ets:lookup(?REGISTRY, Tab) of
                        [{Tab, 1, TabPid}] ->
                            %% Last user: forget the table altogether.
                            do_unlink(Store, FromPid),
                            true = ets:delete(?REGISTRY, Tab),
                            true = ets:delete(?OWNERS, TabPid),
                            true = ets:match_delete(Store, UserObject),
                            unlink(TabPid),
                            pending_call(Tab, TabPid, make_ref(), From, [],
                                         internal_close, State);
                        [{Tab, _NoUsers, TabPid}] ->
                            do_unlink(Store, FromPid),
                            %% Remove every {FromPid, Tab} object, then
                            %% put back all but one of them.
                            true = ets:match_delete(Store, UserObject),
                            true = ets:insert(Store, Keep),
                            ets:update_counter(?REGISTRY, Tab, -1),
                            pending_call(Tab, TabPid, make_ref(), From, [],
                                         remove_user, State)
                    end;
                [] ->
                    ?DEBUGF("DETS: Table ~w close attempt by non-owner~w~n",
                            [Tab, FromPid]),
                    {{error, not_owner}, State}
            end;
        {pending, QueuedState} ->
            {pending, QueuedState}
    end.
361
%% Links with counters
%% Records one more use of Pid in the store under the key {links, Pid}.
%% The actual link is set up only on the first use; later uses just
%% bump the counter. Returns true.
do_link(Store, Pid) ->
    LinkKey = {links, Pid},
    case ets:lookup(Store, LinkKey) of
        [{_, Count}] ->
            true = ets:delete(Store, LinkKey),
            true = ets:insert(Store, {LinkKey, Count + 1});
        [] ->
            true = ets:insert(Store, {LinkKey, 1}),
            link(Pid)
    end.
373
%% Drops one use of Pid from the link counter kept under {links, Pid}.
%% While more than one use remains the counter is decremented;
%% otherwise the counter entry is removed and the process is unlinked.
%% Returns true.
do_unlink(Store, Pid) ->
    LinkKey = {links, Pid},
    Found = ets:lookup(Store, LinkKey),
    %% The old entry goes away in either case.
    true = ets:delete(Store, LinkKey),
    case Found of
        [{_, Count}] when Count > 1 ->
            true = ets:insert(Store, {LinkKey, Count - 1});
        _ ->
            unlink(Pid)
    end.
385
%% Performs the dets call selected by ReqT in a freshly spawned process,
%% so the server itself does not wait for the table process. The
%% spawned process sends {pending_reply, {Ref, Result}} back to the
%% server when the call returns. A #pending{} entry with an empty
%% client queue is put first in the pending list.
%% -> {pending, NewState}
pending_call(Tab, Pid, Ref, {FromPid, _Tag}=From, Args, ReqT, State) ->
    Server = self(),
    Worker = fun() ->
                     Outcome = case ReqT of
                                   add_user ->
                                       dets:add_user(Pid, Tab, Args);
                                   internal_open ->
                                       dets:internal_open(Pid, Ref, Args);
                                   internal_close ->
                                       dets:internal_close(Pid);
                                   remove_user ->
                                       dets:remove_user(Pid, FromPid)
                               end,
                     Server ! {pending_reply, {Ref, Outcome}}
             end,
    _ = spawn(Worker),
    Entry = #pending{tab = Tab, ref = Ref, pid = Pid, reqtype = ReqT,
                     from = From, clients = []},
    {pending, State#state{pending = [Entry | State#state.pending]}}.
406
%% Checks whether a request for Tab is already pending. If so, {Req,
%% From} is appended to that request's client queue and
%% {pending, NewState} is returned; otherwise the result is false and
%% the state is left as it was.
check_pending(Tab, From, State, Req) ->
    Pending = State#state.pending,
    case lists:keyfind(Tab, #pending.tab, Pending) of
        false ->
            false;
        #pending{tab = Tab, clients = Clients} = Entry ->
            Queued = Entry#pending{clients = Clients ++ [{Req, From}]},
            NewPending = lists:keyreplace(Tab, #pending.tab, Pending, Queued),
            {pending, State#state{pending = NewPending}}
    end.
416