1%% ``Licensed under the Apache License, Version 2.0 (the "License");
2%% you may not use this file except in compliance with the License.
3%% You may obtain a copy of the License at
4%%
5%%     http://www.apache.org/licenses/LICENSE-2.0
6%%
7%% Unless required by applicable law or agreed to in writing, software
8%% distributed under the License is distributed on an "AS IS" BASIS,
9%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10%% See the License for the specific language governing permissions and
11%% limitations under the License.
12%%
13%% The Initial Developer of the Original Code is Ericsson Utvecklings AB.
14%% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings
15%% AB. All Rights Reserved.''
16%%
17%%     $Id: mnesia_index.erl,v 1.1 2008/12/17 09:53:38 mikpe Exp $
18%% Purpose: Handles index functionality in mnesia
19
20-module(mnesia_index).
21-export([read/5,
22	 add_index/5,
23	 delete_index/3,
24	 del_object_index/5,
25	 clear_index/4,
26	 dirty_match_object/3,
27	 dirty_select/3,
28	 dirty_read/3,
29	 dirty_read2/3,
30
31	 db_put/2,
32	 db_get/2,
33	 db_match_erase/2,
34	 get_index_table/2,
35	 get_index_table/3,
36
37	 tab2filename/2,
38	 tab2tmp_filename/2,
39	 init_index/2,
40	 init_indecies/3,
41	 del_transient/2,
42	 del_transient/3,
43	 del_index_table/3]).
44
45-import(mnesia_lib, [verbose/2]).
46-include("mnesia.hrl").
47
48-record(index, {setorbag, pos_list}).
49
50val(Var) ->
51    case ?catch_val(Var) of
52	{'EXIT', _ReASoN_} -> mnesia_lib:other_val(Var, _ReASoN_);
53	_VaLuE_ -> _VaLuE_
54    end.
55
56%% read an object list throuh its index table
57%% we assume that table Tab has index on attribute number Pos
58
59read(Tid, Store, Tab, IxKey, Pos) ->
60    ResList = mnesia_locker:ixrlock(Tid, Store, Tab, IxKey, Pos),
61    %% Remove all tuples which don't include Ixkey, happens when Tab is a bag
62    case val({Tab, setorbag}) of
63	bag ->
64	    mnesia_lib:key_search_all(IxKey, Pos, ResList);
65	_ ->
66	    ResList
67    end.
68
69add_index(Index, Tab, Key, Obj, Old) ->
70    add_index2(Index#index.pos_list, Index#index.setorbag, Tab, Key, Obj, Old).
71
72add_index2([{Pos, Ixt} |Tail], bag, Tab, K, Obj, OldRecs) ->
73    db_put(Ixt, {element(Pos, Obj), K}),
74    add_index2(Tail, bag, Tab, K, Obj, OldRecs);
75add_index2([{Pos, Ixt} |Tail], Type, Tab, K, Obj, OldRecs) ->
76    %% Remove old tuples in index if Tab is updated
77    case OldRecs of
78	undefined ->
79	    Old = mnesia_lib:db_get(Tab, K),
80	    del_ixes(Ixt, Old, Pos, K);
81	Old ->
82	    del_ixes(Ixt, Old, Pos, K)
83    end,
84    db_put(Ixt, {element(Pos, Obj), K}),
85    add_index2(Tail, Type, Tab, K, Obj, OldRecs);
86add_index2([], _, _Tab, _K, _Obj, _) -> ok.
87
88delete_index(Index, Tab, K) ->
89    delete_index2(Index#index.pos_list, Tab, K).
90
91delete_index2([{Pos, Ixt} | Tail], Tab, K) ->
92    DelObjs = mnesia_lib:db_get(Tab, K),
93    del_ixes(Ixt, DelObjs, Pos, K),
94    delete_index2(Tail, Tab, K);
95delete_index2([], _Tab, _K) -> ok.
96
97
98del_ixes(_Ixt, [], _Pos, _L) -> ok;
99del_ixes(Ixt, [Obj | Tail], Pos, Key) ->
100    db_match_erase(Ixt, {element(Pos, Obj), Key}),
101    del_ixes(Ixt, Tail, Pos, Key).
102
103del_object_index(Index, Tab, K, Obj, Old) ->
104    del_object_index2(Index#index.pos_list, Index#index.setorbag, Tab, K, Obj, Old).
105
106del_object_index2([], _, _Tab, _K, _Obj, _Old) -> ok;
107del_object_index2([{Pos, Ixt} | Tail], SoB, Tab, K, Obj, Old) ->
108    case SoB of
109	bag ->
110	    del_object_bag(Tab, K, Obj, Pos, Ixt, Old);
111	_ -> %% If set remove the tuple in index table
112	    del_ixes(Ixt, [Obj], Pos, K)
113    end,
114    del_object_index2(Tail, SoB, Tab, K, Obj, Old).
115
116del_object_bag(Tab, Key, Obj, Pos, Ixt, undefined) ->
117    Old = mnesia_lib:db_get(Tab, Key),
118    del_object_bag(Tab, Key, Obj, Pos, Ixt, Old);
119%% If Tab type is bag we need remove index identifier if Tab
120%% contains less than 2 elements.
121del_object_bag(_Tab, Key, Obj, Pos, Ixt, Old) when length(Old) < 2 ->
122    del_ixes(Ixt, [Obj], Pos, Key);
123del_object_bag(_Tab, _Key, _Obj, _Pos, _Ixt, _Old) -> ok.
124
125clear_index(Index, Tab, K, Obj) ->
126    clear_index2(Index#index.pos_list, Tab, K, Obj).
127
128clear_index2([], _Tab, _K, _Obj) -> ok;
129clear_index2([{_Pos, Ixt} | Tail], Tab, K, Obj) ->
130    db_match_erase(Ixt, Obj),
131    clear_index2(Tail, Tab, K, Obj).
132
133%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
134
135dirty_match_object(Tab, Pat, Pos) ->
136    %% Assume that we are on the node where the replica is
137    case element(2, Pat) of
138	'_' ->
139	    IxKey = element(Pos, Pat),
140	    RealKeys = realkeys(Tab, Pos, IxKey),
141	    merge(RealKeys, Tab, Pat, []);
142	_Else ->
143	    mnesia_lib:db_match_object(Tab, Pat)
144    end.
145
146merge([{_IxKey, RealKey} | Tail], Tab, Pat, Ack) ->
147    %% Assume that we are on the node where the replica is
148    Pat2 = setelement(2, Pat, RealKey),
149    Recs = mnesia_lib:db_match_object(Tab, Pat2),
150    merge(Tail, Tab, Pat, Recs ++ Ack);
151merge([], _, _, Ack) ->
152    Ack.
153
154realkeys(Tab, Pos, IxKey) ->
155    Index = get_index_table(Tab, Pos),
156    db_get(Index, IxKey). % a list on the form [{IxKey, RealKey1} , ....
157
158dirty_select(Tab, Spec, Pos) ->
159    %% Assume that we are on the node where the replica is
160    %% Returns the records without applying the match spec
161    %% The actual filtering is handled by the caller
162    IxKey = element(Pos, Spec),
163    RealKeys = realkeys(Tab, Pos, IxKey),
164    StorageType = val({Tab, storage_type}),
165    lists:append([mnesia_lib:db_get(StorageType, Tab, Key) || Key <- RealKeys]).
166
167dirty_read(Tab, IxKey, Pos) ->
168    ResList = mnesia:dirty_rpc(Tab, ?MODULE, dirty_read2,
169			       [Tab, IxKey, Pos]),
170    case val({Tab, setorbag}) of
171	bag ->
172	    %% Remove all tuples which don't include Ixkey
173	    mnesia_lib:key_search_all(IxKey, Pos, ResList);
174	_ ->
175	    ResList
176    end.
177
178dirty_read2(Tab, IxKey, Pos) ->
179    Ix = get_index_table(Tab, Pos),
180    Keys = db_match(Ix, {IxKey, '$1'}),
181    r_keys(Keys, Tab, []).
182
183r_keys([[H]|T],Tab,Ack) ->
184    V = mnesia_lib:db_get(Tab, H),
185    r_keys(T, Tab, V ++ Ack);
186r_keys([], _, Ack) ->
187    Ack.
188
189
190%%%%%%% Creation, Init and deletion routines for index tables
191%% We can have several indexes on the same table
192%% this can be a fairly costly operation if table is *very* large
193
194tab2filename(Tab, Pos) ->
195    mnesia_lib:dir(Tab) ++ "_" ++ integer_to_list(Pos) ++ ".DAT".
196
197tab2tmp_filename(Tab, Pos) ->
198    mnesia_lib:dir(Tab) ++ "_" ++ integer_to_list(Pos) ++ ".TMP".
199
200init_index(Tab, Storage) ->
201    PosList = val({Tab, index}),
202    init_indecies(Tab, Storage, PosList).
203
204init_indecies(Tab, Storage, PosList) ->
205    case Storage of
206	unknown ->
207	    ignore;
208	disc_only_copies ->
209	    init_disc_index(Tab, PosList);
210	ram_copies ->
211	    make_ram_index(Tab, PosList);
212	disc_copies ->
213	    make_ram_index(Tab, PosList)
214    end.
215
216%% works for both ram and disc indexes
217
218del_index_table(_, unknown, _) ->
219    ignore;
220del_index_table(Tab, Storage, Pos) ->
221    delete_transient_index(Tab, Pos, Storage),
222    mnesia_lib:del({Tab, index}, Pos).
223
224del_transient(Tab, Storage) ->
225    PosList = val({Tab, index}),
226    del_transient(Tab, PosList, Storage).
227
228del_transient(_, [], _) -> done;
229del_transient(Tab, [Pos | Tail], Storage) ->
230    delete_transient_index(Tab, Pos, Storage),
231    del_transient(Tab, Tail, Storage).
232
233delete_transient_index(Tab, Pos, disc_only_copies) ->
234    Tag = {Tab, index, Pos},
235    mnesia_monitor:unsafe_close_dets(Tag),
236    file:delete(tab2filename(Tab, Pos)),
237    del_index_info(Tab, Pos), %% Uses val(..)
238    mnesia_lib:unset({Tab, {index, Pos}});
239
240delete_transient_index(Tab, Pos, _Storage) ->
241    Ixt = val({Tab, {index, Pos}}),
242    ?ets_delete_table(Ixt),
243    del_index_info(Tab, Pos),
244    mnesia_lib:unset({Tab, {index, Pos}}).
245
246%%%%% misc functions for the index create/init/delete functions above
247
248%% assuming that the file exists.
249init_disc_index(_Tab, []) ->
250    done;
251init_disc_index(Tab, [Pos | Tail]) when integer(Pos) ->
252    Fn = tab2filename(Tab, Pos),
253    IxTag = {Tab, index, Pos},
254    file:delete(Fn),
255    Args = [{file, Fn}, {keypos, 1}, {type, bag}],
256    mnesia_monitor:open_dets(IxTag, Args),
257    Storage = disc_only_copies,
258    Key = mnesia_lib:db_first(Storage, Tab),
259    Recs = mnesia_lib:db_get(Storage, Tab, Key),
260    BinSize = size(term_to_binary(Recs)),
261    KeysPerChunk = (4000 div BinSize) + 1,
262    Init = {start, KeysPerChunk},
263    mnesia_lib:db_fixtable(Storage, Tab, true),
264    ok = dets:init_table(IxTag, create_fun(Init, Tab, Pos)),
265    mnesia_lib:db_fixtable(Storage, Tab, false),
266    mnesia_lib:set({Tab, {index, Pos}}, IxTag),
267    add_index_info(Tab, val({Tab, setorbag}), {Pos, {dets, IxTag}}),
268    init_disc_index(Tab, Tail).
269
270create_fun(Cont, Tab, Pos) ->
271    fun(read) ->
272	    Data =
273		case Cont of
274		    {start, KeysPerChunk} ->
275			mnesia_lib:db_init_chunk(disc_only_copies, Tab, KeysPerChunk);
276		    '$end_of_table' ->
277			'$end_of_table';
278		    _Else ->
279			mnesia_lib:db_chunk(disc_only_copies, Cont)
280		end,
281	    case Data of
282		'$end_of_table' ->
283		    end_of_input;
284		{Recs, Next} ->
285		    IdxElems = [{element(Pos, Obj), element(2, Obj)} || Obj <- Recs],
286		    {IdxElems, create_fun(Next, Tab, Pos)}
287	    end;
288       (close) ->
289	    ok
290    end.
291
292make_ram_index(_, []) ->
293    done;
294make_ram_index(Tab, [Pos | Tail]) ->
295    add_ram_index(Tab, Pos),
296    make_ram_index(Tab, Tail).
297
298add_ram_index(Tab, Pos) when integer(Pos) ->
299    verbose("Creating index for ~w ~n", [Tab]),
300    Index = mnesia_monitor:mktab(mnesia_index, [bag, public]),
301    Insert = fun(Rec, _Acc) ->
302		     true = ?ets_insert(Index, {element(Pos, Rec), element(2, Rec)})
303	     end,
304    mnesia_lib:db_fixtable(ram_copies, Tab, true),
305    true = ets:foldl(Insert, true, Tab),
306    mnesia_lib:db_fixtable(ram_copies, Tab, false),
307    mnesia_lib:set({Tab, {index, Pos}}, Index),
308    add_index_info(Tab, val({Tab, setorbag}), {Pos, {ram, Index}});
309add_ram_index(_Tab, snmp) ->
310    ok.
311
312add_index_info(Tab, Type, IxElem) ->
313    Commit = val({Tab, commit_work}),
314    case lists:keysearch(index, 1, Commit) of
315	false ->
316	    Index = #index{setorbag = Type,
317			   pos_list = [IxElem]},
318	    %% Check later if mnesia_tm is sensative about the order
319	    mnesia_lib:set({Tab, commit_work},
320			   mnesia_lib:sort_commit([Index | Commit]));
321	{value, Old} ->
322	    %% We could check for consistency here
323	    Index = Old#index{pos_list = [IxElem | Old#index.pos_list]},
324	    NewC = lists:keyreplace(index, 1, Commit, Index),
325	    mnesia_lib:set({Tab, commit_work},
326			   mnesia_lib:sort_commit(NewC))
327    end.
328
329del_index_info(Tab, Pos) ->
330    Commit = val({Tab, commit_work}),
331    case lists:keysearch(index, 1, Commit) of
332	false ->
333	    %% Something is wrong ignore
334	    skip;
335	{value, Old} ->
336	    case lists:keydelete(Pos, 1, Old#index.pos_list) of
337		[] ->
338		    NewC = lists:keydelete(index, 1, Commit),
339		    mnesia_lib:set({Tab, commit_work},
340				   mnesia_lib:sort_commit(NewC));
341		New ->
342		    Index = Old#index{pos_list = New},
343		    NewC = lists:keyreplace(index, 1, Commit, Index),
344		    mnesia_lib:set({Tab, commit_work},
345				   mnesia_lib:sort_commit(NewC))
346	    end
347    end.
348
349db_put({ram, Ixt}, V) ->
350    true = ?ets_insert(Ixt, V);
351db_put({dets, Ixt}, V) ->
352    ok = dets:insert(Ixt, V).
353
354db_get({ram, Ixt}, K) ->
355    ?ets_lookup(Ixt, K);
356db_get({dets, Ixt}, K) ->
357    dets:lookup(Ixt, K).
358
359db_match_erase({ram, Ixt}, Pat) ->
360    true = ?ets_match_delete(Ixt, Pat);
361db_match_erase({dets, Ixt}, Pat) ->
362    ok = dets:match_delete(Ixt, Pat).
363
364db_match({ram, Ixt}, Pat) ->
365    ?ets_match(Ixt, Pat);
366db_match({dets, Ixt}, Pat) ->
367    dets:match(Ixt, Pat).
368
369get_index_table(Tab, Pos) ->
370    get_index_table(Tab,  val({Tab, storage_type}), Pos).
371
372get_index_table(Tab, ram_copies, Pos) ->
373    {ram,  val({Tab, {index, Pos}})};
374get_index_table(Tab, disc_copies, Pos) ->
375    {ram,  val({Tab, {index, Pos}})};
376get_index_table(Tab, disc_only_copies, Pos) ->
377    {dets, val({Tab, {index, Pos}})};
378get_index_table(_Tab, unknown, _Pos) ->
379    unknown.
380