1%% This Source Code Form is subject to the terms of the Mozilla Public
2%% License, v. 2.0. If a copy of the MPL was not distributed with this
3%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4%%
5%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates.  All rights reserved.
6%%
7
8-module(rabbit_table).
9
10-export([
11    create/0, create/2, ensure_local_copies/1, ensure_table_copy/2,
12    wait_for_replicated/1, wait/1, wait/2,
13    force_load/0, is_present/0, is_empty/0, needs_default_data/0,
14    check_schema_integrity/1, clear_ram_only_tables/0, retry_timeout/0,
15    wait_for_replicated/0, exists/1]).
16
17%% for testing purposes
18-export([definitions/0]).
19
20-include_lib("rabbit_common/include/rabbit.hrl").
21
22%%----------------------------------------------------------------------------
23
24-type retry() :: boolean().
25-type mnesia_table() :: atom().
26
27%%----------------------------------------------------------------------------
28%% Main interface
29%%----------------------------------------------------------------------------
30
31-spec create() -> 'ok'.
32
33create() ->
34    lists:foreach(
35        fun ({Table, Def}) -> create(Table, Def) end,
36        definitions()),
37    ensure_secondary_indexes(),
38    ok.
39
40-spec create(mnesia_table(), list()) -> rabbit_types:ok_or_error(any()).
41
42create(TableName, TableDefinition) ->
43    TableDefinition1 = proplists:delete(match, TableDefinition),
44    rabbit_log:debug("Will create a schema database table '~s'", [TableName]),
45    case mnesia:create_table(TableName, TableDefinition1) of
46        {atomic, ok}                              -> ok;
47        {aborted,{already_exists, TableName}}     -> ok;
48        {aborted, {already_exists, TableName, _}} -> ok;
49        {aborted, Reason}                         ->
50            throw({error, {table_creation_failed, TableName, TableDefinition1, Reason}})
51    end.
52
53-spec exists(mnesia_table()) -> boolean().
54exists(Table) ->
55    lists:member(Table, mnesia:system_info(tables)).
56
57%% Sets up secondary indexes in a blank node database.
58ensure_secondary_indexes() ->
59  ensure_secondary_index(rabbit_queue, vhost),
60  ok.
61
62ensure_secondary_index(Table, Field) ->
63  case mnesia:add_table_index(Table, Field) of
64    {atomic, ok}                          -> ok;
65    {aborted, {already_exists, Table, _}} -> ok
66  end.
67
68-spec ensure_table_copy(mnesia_table(), node()) -> ok | {error, any()}.
69ensure_table_copy(TableName, Node) ->
70    rabbit_log:debug("Will add a local schema database copy for table '~s'", [TableName]),
71    case mnesia:add_table_copy(TableName, Node, disc_copies) of
72        {atomic, ok}                              -> ok;
73        {aborted,{already_exists, TableName}}     -> ok;
74        {aborted, {already_exists, TableName, _}} -> ok;
75        {aborted, Reason}                         -> {error, Reason}
76    end.
77
78%% This arity only exists for backwards compatibility with certain
79%% plugins. See https://github.com/rabbitmq/rabbitmq-clusterer/issues/19.
80
81-spec wait_for_replicated() -> 'ok'.
82
83wait_for_replicated() ->
84    wait_for_replicated(false).
85
86-spec wait_for_replicated(retry()) -> 'ok'.
87
88wait_for_replicated(Retry) ->
89    wait([Tab || {Tab, TabDef} <- definitions(),
90                 not lists:member({local_content, true}, TabDef)], Retry).
91
92-spec wait([atom()]) -> 'ok'.
93
94wait(TableNames) ->
95    wait(TableNames, _Retry = false).
96
97wait(TableNames, Retry) ->
98    {Timeout, Retries} = retry_timeout(Retry),
99    wait(TableNames, Timeout, Retries).
100
101wait(TableNames, Timeout, Retries) ->
102    %% We might be in ctl here for offline ops, in which case we can't
103    %% get_env() for the rabbit app.
104    rabbit_log:info("Waiting for Mnesia tables for ~p ms, ~p retries left",
105                    [Timeout, Retries - 1]),
106    Result = case mnesia:wait_for_tables(TableNames, Timeout) of
107                 ok ->
108                     ok;
109                 {timeout, BadTabs} ->
110                     AllNodes = rabbit_nodes:all(),
111                     {error, {timeout_waiting_for_tables, AllNodes, BadTabs}};
112                 {error, Reason} ->
113                     AllNodes = rabbit_nodes:all(),
114                     {error, {failed_waiting_for_tables, AllNodes, Reason}}
115             end,
116    case {Retries, Result} of
117        {_, ok} ->
118            rabbit_log:info("Successfully synced tables from a peer"),
119            ok;
120        {1, {error, _} = Error} ->
121            throw(Error);
122        {_, {error, Error}} ->
123            rabbit_log:warning("Error while waiting for Mnesia tables: ~p", [Error]),
124            wait(TableNames, Timeout, Retries - 1)
125    end.
126
127retry_timeout(_Retry = false) ->
128    {retry_timeout(), 1};
129retry_timeout(_Retry = true) ->
130    Retries = case application:get_env(rabbit, mnesia_table_loading_retry_limit) of
131                  {ok, T}   -> T;
132                  undefined -> 10
133              end,
134    {retry_timeout(), Retries}.
135
136-spec retry_timeout() -> non_neg_integer() | infinity.
137
138retry_timeout() ->
139    case application:get_env(rabbit, mnesia_table_loading_retry_timeout) of
140        {ok, T}   -> T;
141        undefined -> 30000
142    end.
143
144-spec force_load() -> 'ok'.
145
146force_load() -> [mnesia:force_load_table(T) || T <- names()], ok.
147
148-spec is_present() -> boolean().
149
150is_present() -> names() -- mnesia:system_info(tables) =:= [].
151
152-spec is_empty() -> boolean().
153
154is_empty()           -> is_empty(names()).
155
156-spec needs_default_data() -> boolean().
157
158needs_default_data() -> is_empty([rabbit_user, rabbit_user_permission,
159                                  rabbit_vhost]).
160
161is_empty(Names) ->
162    lists:all(fun (Tab) -> mnesia:dirty_first(Tab) == '$end_of_table' end,
163              Names).
164
165-spec check_schema_integrity(retry()) -> rabbit_types:ok_or_error(any()).
166
167check_schema_integrity(Retry) ->
168    Tables = mnesia:system_info(tables),
169    case check(fun (Tab, TabDef) ->
170                       case lists:member(Tab, Tables) of
171                           false -> {error, {table_missing, Tab}};
172                           true  -> check_attributes(Tab, TabDef)
173                       end
174               end) of
175        ok     -> wait(names(), Retry),
176                  check(fun check_content/2);
177        Other  -> Other
178    end.
179
180-spec clear_ram_only_tables() -> 'ok'.
181
182clear_ram_only_tables() ->
183    Node = node(),
184    lists:foreach(
185      fun (TabName) ->
186              case lists:member(Node, mnesia:table_info(TabName, ram_copies)) of
187                  true  -> {atomic, ok} = mnesia:clear_table(TabName);
188                  false -> ok
189              end
190      end, names()),
191    ok.
192
193%% The sequence in which we delete the schema and then the other
194%% tables is important: if we delete the schema first when moving to
195%% RAM mnesia will loudly complain since it doesn't make much sense to
196%% do that. But when moving to disc, we need to move the schema first.
197
198-spec ensure_local_copies('disc' | 'ram') -> 'ok'.
199
200ensure_local_copies(disc) ->
201    create_local_copy(schema, disc_copies),
202    create_local_copies(disc);
203ensure_local_copies(ram)  ->
204    create_local_copies(ram),
205    create_local_copy(schema, ram_copies).
206
207%%--------------------------------------------------------------------
208%% Internal helpers
209%%--------------------------------------------------------------------
210
211create_local_copies(Type) ->
212    lists:foreach(
213      fun ({Tab, TabDef}) ->
214              HasDiscCopies     = has_copy_type(TabDef, disc_copies),
215              HasDiscOnlyCopies = has_copy_type(TabDef, disc_only_copies),
216              LocalTab          = proplists:get_bool(local_content, TabDef),
217              StorageType =
218                  if
219                      Type =:= disc orelse LocalTab ->
220                          if
221                              HasDiscCopies     -> disc_copies;
222                              HasDiscOnlyCopies -> disc_only_copies;
223                              true              -> ram_copies
224                          end;
225                      Type =:= ram ->
226                          ram_copies
227                  end,
228              ok = create_local_copy(Tab, StorageType)
229      end, definitions(Type)),
230    ok.
231
232create_local_copy(Tab, Type) ->
233    StorageType = mnesia:table_info(Tab, storage_type),
234    {atomic, ok} =
235        if
236            StorageType == unknown ->
237                mnesia:add_table_copy(Tab, node(), Type);
238            StorageType /= Type ->
239                mnesia:change_table_copy_type(Tab, node(), Type);
240            true -> {atomic, ok}
241        end,
242    ok.
243
244has_copy_type(TabDef, DiscType) ->
245    lists:member(node(), proplists:get_value(DiscType, TabDef, [])).
246
247check_attributes(Tab, TabDef) ->
248    {_, ExpAttrs} = proplists:lookup(attributes, TabDef),
249    case mnesia:table_info(Tab, attributes) of
250        ExpAttrs -> ok;
251        Attrs    -> {error, {table_attributes_mismatch, Tab, ExpAttrs, Attrs}}
252    end.
253
254check_content(Tab, TabDef) ->
255    {_, Match} = proplists:lookup(match, TabDef),
256    case mnesia:dirty_first(Tab) of
257        '$end_of_table' ->
258            ok;
259        Key ->
260            ObjList = mnesia:dirty_read(Tab, Key),
261            MatchComp = ets:match_spec_compile([{Match, [], ['$_']}]),
262            case ets:match_spec_run(ObjList, MatchComp) of
263                ObjList -> ok;
264                _       -> {error, {table_content_invalid, Tab, Match, ObjList}}
265            end
266    end.
267
268check(Fun) ->
269    case [Error || {Tab, TabDef} <- definitions(),
270                   begin
271                       {Ret, Error} = case Fun(Tab, TabDef) of
272                           ok         -> {false, none};
273                           {error, E} -> {true, E}
274                       end,
275                       Ret
276                   end] of
277        []     -> ok;
278        Errors -> {error, Errors}
279    end.
280
281%%--------------------------------------------------------------------
282%% Table definitions
283%%--------------------------------------------------------------------
284
285names() -> [Tab || {Tab, _} <- definitions()].
286
287%% The tables aren't supposed to be on disk on a ram node
288definitions(disc) ->
289    definitions();
290definitions(ram) ->
291    [{Tab, [{disc_copies, []}, {ram_copies, [node()]} |
292            proplists:delete(
293              ram_copies, proplists:delete(disc_copies, TabDef))]} ||
294        {Tab, TabDef} <- definitions()].
295
296definitions() ->
297    [{rabbit_user,
298      [{record_name, internal_user},
299       {attributes, internal_user:fields()},
300       {disc_copies, [node()]},
301       {match, internal_user:pattern_match_all()}]},
302     {rabbit_user_permission,
303      [{record_name, user_permission},
304       {attributes, record_info(fields, user_permission)},
305       {disc_copies, [node()]},
306       {match, #user_permission{user_vhost = #user_vhost{_='_'},
307                                permission = #permission{_='_'},
308                                _='_'}}]},
309     {rabbit_topic_permission,
310      [{record_name, topic_permission},
311       {attributes, record_info(fields, topic_permission)},
312       {disc_copies, [node()]},
313       {match, #topic_permission{topic_permission_key = #topic_permission_key{_='_'},
314                                 permission = #permission{_='_'},
315                                 _='_'}}]},
316     {rabbit_vhost,
317      [
318       {record_name, vhost},
319       {attributes, vhost:fields()},
320       {disc_copies, [node()]},
321       {match, vhost:pattern_match_all()}]},
322     {rabbit_listener,
323      [{record_name, listener},
324       {attributes, record_info(fields, listener)},
325       {type, bag},
326       {match, #listener{_='_'}}]},
327     {rabbit_durable_route,
328      [{record_name, route},
329       {attributes, record_info(fields, route)},
330       {disc_copies, [node()]},
331       {match, #route{binding = binding_match(), _='_'}}]},
332     {rabbit_semi_durable_route,
333      [{record_name, route},
334       {attributes, record_info(fields, route)},
335       {type, ordered_set},
336       {match, #route{binding = binding_match(), _='_'}}]},
337     {rabbit_route,
338      [{record_name, route},
339       {attributes, record_info(fields, route)},
340       {type, ordered_set},
341       {match, #route{binding = binding_match(), _='_'}}]},
342     {rabbit_reverse_route,
343      [{record_name, reverse_route},
344       {attributes, record_info(fields, reverse_route)},
345       {type, ordered_set},
346       {match, #reverse_route{reverse_binding = reverse_binding_match(),
347                              _='_'}}]},
348     {rabbit_topic_trie_node,
349      [{record_name, topic_trie_node},
350       {attributes, record_info(fields, topic_trie_node)},
351       {type, ordered_set},
352       {match, #topic_trie_node{trie_node = trie_node_match(), _='_'}}]},
353     {rabbit_topic_trie_edge,
354      [{record_name, topic_trie_edge},
355       {attributes, record_info(fields, topic_trie_edge)},
356       {type, ordered_set},
357       {match, #topic_trie_edge{trie_edge = trie_edge_match(), _='_'}}]},
358     {rabbit_topic_trie_binding,
359      [{record_name, topic_trie_binding},
360       {attributes, record_info(fields, topic_trie_binding)},
361       {type, ordered_set},
362       {match, #topic_trie_binding{trie_binding = trie_binding_match(),
363                                   _='_'}}]},
364     {rabbit_durable_exchange,
365      [{record_name, exchange},
366       {attributes, record_info(fields, exchange)},
367       {disc_copies, [node()]},
368       {match, #exchange{name = exchange_name_match(), _='_'}}]},
369     {rabbit_exchange,
370      [{record_name, exchange},
371       {attributes, record_info(fields, exchange)},
372       {match, #exchange{name = exchange_name_match(), _='_'}}]},
373     {rabbit_exchange_serial,
374      [{record_name, exchange_serial},
375       {attributes, record_info(fields, exchange_serial)},
376       {match, #exchange_serial{name = exchange_name_match(), _='_'}}]},
377     {rabbit_runtime_parameters,
378      [{record_name, runtime_parameters},
379       {attributes, record_info(fields, runtime_parameters)},
380       {disc_copies, [node()]},
381       {match, #runtime_parameters{_='_'}}]},
382     {rabbit_durable_queue,
383      [{record_name, amqqueue},
384       {attributes, amqqueue:fields()},
385       {disc_copies, [node()]},
386       {match, amqqueue:pattern_match_on_name(queue_name_match())}]},
387     {rabbit_queue,
388      [{record_name, amqqueue},
389       {attributes, amqqueue:fields()},
390       {match, amqqueue:pattern_match_on_name(queue_name_match())}]}
391    ]
392        ++ gm:table_definitions()
393        ++ mirrored_supervisor:table_definitions().
394
395binding_match() ->
396    #binding{source = exchange_name_match(),
397             destination = binding_destination_match(),
398             _='_'}.
399reverse_binding_match() ->
400    #reverse_binding{destination = binding_destination_match(),
401                     source = exchange_name_match(),
402                     _='_'}.
403binding_destination_match() ->
404    resource_match('_').
405trie_node_match() ->
406    #trie_node{exchange_name = exchange_name_match(), _='_'}.
407trie_edge_match() ->
408    #trie_edge{exchange_name = exchange_name_match(), _='_'}.
409trie_binding_match() ->
410    #trie_binding{exchange_name = exchange_name_match(), _='_'}.
411exchange_name_match() ->
412    resource_match(exchange).
413queue_name_match() ->
414    resource_match(queue).
415resource_match(Kind) ->
416    #resource{kind = Kind, _='_'}.
417