%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates. All rights reserved.
%%

-module(rabbit_table).

-export([
    create/0, create/2, ensure_local_copies/1, ensure_table_copy/2,
    wait_for_replicated/1, wait/1, wait/2,
    force_load/0, is_present/0, is_empty/0, needs_default_data/0,
    check_schema_integrity/1, clear_ram_only_tables/0, retry_timeout/0,
    wait_for_replicated/0, exists/1]).

%% for testing purposes
-export([definitions/0]).

-include_lib("rabbit_common/include/rabbit.hrl").

%%----------------------------------------------------------------------------

-type retry() :: boolean().
-type mnesia_table() :: atom().

%%----------------------------------------------------------------------------
%% Main interface
%%----------------------------------------------------------------------------

%% Creates every table listed by definitions/0, then makes sure the
%% secondary indexes exist.
-spec create() -> 'ok'.

create() ->
    CreateOne = fun ({Name, Options}) -> create(Name, Options) end,
    lists:foreach(CreateOne, definitions()),
    ensure_secondary_indexes(),
    ok.

%% Creates a single Mnesia table. A table that already exists counts as
%% success; any other abort is thrown as
%% {error, {table_creation_failed, TableName, Options, Reason}}.
-spec create(mnesia_table(), list()) -> rabbit_types:ok_or_error(any()).

create(TableName, TableDefinition) ->
    %% The `match' entry is only read by this module's own content check
    %% (check_content/2); it is removed before the definition is handed
    %% to Mnesia.
    MnesiaOptions = proplists:delete(match, TableDefinition),
    rabbit_log:debug("Will create a schema database table '~s'", [TableName]),
    Outcome = mnesia:create_table(TableName, MnesiaOptions),
    handle_create_result(Outcome, TableName, MnesiaOptions).

%% Maps the result of mnesia:create_table/2 onto `ok' or a thrown error.
handle_create_result({atomic, ok}, _Table, _Options) ->
    ok;
handle_create_result({aborted, {already_exists, Table}}, Table, _Options) ->
    ok;
handle_create_result({aborted, {already_exists, Table, _}}, Table, _Options) ->
    ok;
handle_create_result({aborted, Reason}, Table, Options) ->
    throw({error, {table_creation_failed, Table, Options, Reason}}).

%% Tells whether Mnesia currently lists the given table.
-spec exists(mnesia_table()) -> boolean().
exists(Table) ->
    KnownTables = mnesia:system_info(tables),
    lists:member(Table, KnownTables).

%% Sets up secondary indexes in a blank node database.
ensure_secondary_indexes() ->
    ensure_secondary_index(rabbit_queue, vhost),
    ok.

%% Adds an index on Field to Table; an index that already exists is fine.
%% Any other result from Mnesia crashes with a case_clause error.
ensure_secondary_index(Table, Field) ->
    case mnesia:add_table_index(Table, Field) of
        {atomic, ok}                          -> ok;
        {aborted, {already_exists, Table, _}} -> ok
    end.

%% Adds a disc_copies replica of TableName on Node. An already existing
%% copy counts as success; any other abort is returned as {error, Reason}.
-spec ensure_table_copy(mnesia_table(), node()) -> ok | {error, any()}.
ensure_table_copy(TableName, Node) ->
    rabbit_log:debug("Will add a local schema database copy for table '~s'", [TableName]),
    case mnesia:add_table_copy(TableName, Node, disc_copies) of
        {atomic, ok}                                -> ok;
        {aborted, {already_exists, TableName}}      -> ok;
        {aborted, {already_exists, TableName, _}}   -> ok;
        {aborted, Reason}                           -> {error, Reason}
    end.

%% This arity only exists for backwards compatibility with certain
%% plugins. See https://github.com/rabbitmq/rabbitmq-clusterer/issues/19.

-spec wait_for_replicated() -> 'ok'.

wait_for_replicated() ->
    wait_for_replicated(false).

%% Waits for every table from definitions/0 whose definition does not
%% contain {local_content, true}.
-spec wait_for_replicated(retry()) -> 'ok'.

wait_for_replicated(Retry) ->
    wait([Tab || {Tab, TabDef} <- definitions(),
                 not lists:member({local_content, true}, TabDef)], Retry).

%% Waits for the given tables with a single attempt (no retries).
-spec wait([atom()]) -> 'ok'.

wait(TableNames) ->
    wait(TableNames, _Retry = false).

%% Waits for the given tables. With Retry = true the wait is attempted up
%% to the configured retry limit; with Retry = false it is attempted once.
%% Throws {error, _} when the last attempt fails.
-spec wait([atom()], retry()) -> 'ok'.

wait(TableNames, Retry) ->
    {Timeout, Retries} = retry_timeout(Retry),
    wait(TableNames, Timeout, Retries).

%% One wait attempt followed, on failure, by either a retry or a throw.
%% Retries counts the attempts that remain, including the current one.
wait(TableNames, Timeout, Retries) ->
    %% We might be in ctl here for offline ops, in which case we can't
    %% get_env() for the rabbit app.
    rabbit_log:info("Waiting for Mnesia tables for ~p ms, ~p retries left",
                    [Timeout, erlang:max(Retries - 1, 0)]),
    Result = case mnesia:wait_for_tables(TableNames, Timeout) of
                 ok ->
                     ok;
                 {timeout, BadTabs} ->
                     AllNodes = rabbit_nodes:all(),
                     {error, {timeout_waiting_for_tables, AllNodes, BadTabs}};
                 {error, Reason} ->
                     AllNodes = rabbit_nodes:all(),
                     {error, {failed_waiting_for_tables, AllNodes, Reason}}
             end,
    case {Retries, Result} of
        {_, ok} ->
            rabbit_log:info("Successfully synced tables from a peer"),
            ok;
        %% Give up once no attempts remain. Matching `=< 1' rather than
        %% exactly 1 keeps a configured retry limit of 0 (or a negative
        %% one) from recursing forever with an ever-decreasing counter.
        {N, {error, _} = Failure} when is_integer(N), N =< 1 ->
            throw(Failure);
        {_, {error, Details}} ->
            rabbit_log:warning("Error while waiting for Mnesia tables: ~p", [Details]),
            wait(TableNames, Timeout, Retries - 1)
    end.

%% Returns {Timeout, Attempts}: a single attempt when retrying is off,
%% otherwise the `mnesia_table_loading_retry_limit' setting of the rabbit
%% application (10 when unset).
retry_timeout(_Retry = false) ->
    {retry_timeout(), 1};
retry_timeout(_Retry = true) ->
    Retries = application:get_env(rabbit, mnesia_table_loading_retry_limit, 10),
    {retry_timeout(), Retries}.

%% Per-attempt timeout: the `mnesia_table_loading_retry_timeout' setting
%% of the rabbit application, or 30000 when unset.
-spec retry_timeout() -> non_neg_integer() | infinity.

retry_timeout() ->
    application:get_env(rabbit, mnesia_table_loading_retry_timeout, 30000).

%% Calls mnesia:force_load_table/1 for every known table; the individual
%% results are ignored.
-spec force_load() -> 'ok'.

force_load() ->
    lists:foreach(fun (T) -> mnesia:force_load_table(T) end, names()),
    ok.

%% True when every table from definitions/0 is listed by Mnesia.
-spec is_present() -> boolean().

is_present() -> names() -- mnesia:system_info(tables) =:= [].

%% True when none of the known tables holds a record.
-spec is_empty() -> boolean().

is_empty() -> is_empty(names()).

%% True when the user, user permission and vhost tables are all empty.
-spec needs_default_data() -> boolean().

needs_default_data() -> is_empty([rabbit_user, rabbit_user_permission,
                                  rabbit_vhost]).

is_empty(Names) ->
    lists:all(fun (Tab) -> mnesia:dirty_first(Tab) =:= '$end_of_table' end,
              Names).

-spec check_schema_integrity(retry()) -> rabbit_types:ok_or_error(any()).
%% Two-phase check. First every defined table must exist with the
%% expected attributes; only if that passes are the tables waited for
%% and the content of each one checked against its match pattern.
check_schema_integrity(Retry) ->
    Tables = mnesia:system_info(tables),
    CheckStructure =
        fun (Tab, TabDef) ->
                case lists:member(Tab, Tables) of
                    true  -> check_attributes(Tab, TabDef);
                    false -> {error, {table_missing, Tab}}
                end
        end,
    case check(CheckStructure) of
        ok ->
            wait(names(), Retry),
            check(fun check_content/2);
        Failure ->
            Failure
    end.

%% Clears every known table that has a ram_copies replica on this node.
-spec clear_ram_only_tables() -> 'ok'.

clear_ram_only_tables() ->
    ThisNode = node(),
    lists:foreach(fun (TabName) -> clear_if_local_ram_copy(TabName, ThisNode) end,
                  names()),
    ok.

%% Clears TabName when Node is among its ram_copies holders; crashes if
%% the clear does not succeed.
clear_if_local_ram_copy(TabName, Node) ->
    RamHolders = mnesia:table_info(TabName, ram_copies),
    case lists:member(Node, RamHolders) of
        false -> ok;
        true  -> {atomic, ok} = mnesia:clear_table(TabName)
    end.

%% The sequence in which we delete the schema and then the other
%% tables is important: if we delete the schema first when moving to
%% RAM mnesia will loudly complain since it doesn't make much sense to
%% do that. But when moving to disc, we need to move the schema first.

-spec ensure_local_copies('disc' | 'ram') -> 'ok'.

ensure_local_copies(disc) ->
    %% Schema first, then the tables.
    create_local_copy(schema, disc_copies),
    create_local_copies(disc);
ensure_local_copies(ram) ->
    %% Tables first, then the schema.
    create_local_copies(ram),
    create_local_copy(schema, ram_copies).

%%--------------------------------------------------------------------
%% Internal helpers
%%--------------------------------------------------------------------

%% Gives every table from definitions(Type) a local copy of the storage
%% type chosen by local_storage_type/2.
create_local_copies(Type) ->
    CopyLocally =
        fun ({Tab, TabDef}) ->
                ok = create_local_copy(Tab, local_storage_type(Type, TabDef))
        end,
    lists:foreach(CopyLocally, definitions(Type)),
    ok.

%% Picks the storage type for a local copy. On a disc node, or for a
%% table marked local_content, the definition decides (disc_copies, then
%% disc_only_copies, else ram_copies); otherwise a ram node gets
%% ram_copies.
local_storage_type(Type, TabDef) ->
    OnDisc     = has_copy_type(TabDef, disc_copies),
    OnDiscOnly = has_copy_type(TabDef, disc_only_copies),
    IsLocal    = proplists:get_bool(local_content, TabDef),
    case {Type =:= disc orelse IsLocal, OnDisc, OnDiscOnly} of
        {true, true, _}                 -> disc_copies;
        {true, false, true}             -> disc_only_copies;
        {true, false, false}            -> ram_copies;
        {false, _, _} when Type =:= ram -> ram_copies
    end.

%% Makes sure this node holds a copy of Tab with storage type Type:
%% adds a copy when the node has none, converts the copy when its type
%% differs, and does nothing when it already matches. Crashes unless
%% Mnesia reports {atomic, ok}.
create_local_copy(Tab, Type) ->
    Outcome =
        case mnesia:table_info(Tab, storage_type) of
            unknown -> mnesia:add_table_copy(Tab, node(), Type);
            Type    -> {atomic, ok};
            _Other  -> mnesia:change_table_copy_type(Tab, node(), Type)
        end,
    {atomic, ok} = Outcome,
    ok.

%% True when this node is listed under the DiscType entry of TabDef.
has_copy_type(TabDef, DiscType) ->
    Holders = proplists:get_value(DiscType, TabDef, []),
    lists:member(node(), Holders).

%% Compares the attributes Mnesia reports for Tab with those in TabDef.
check_attributes(Tab, TabDef) ->
    {attributes, Expected} = proplists:lookup(attributes, TabDef),
    Actual = mnesia:table_info(Tab, attributes),
    if
        Actual =:= Expected ->
            ok;
        true ->
            {error, {table_attributes_mismatch, Tab, Expected, Actual}}
    end.

%% Checks the records stored under the first key of Tab against the
%% `match' pattern of TabDef. An empty table passes.
check_content(Tab, TabDef) ->
    {match, Pattern} = proplists:lookup(match, TabDef),
    case mnesia:dirty_first(Tab) of
        '$end_of_table' ->
            ok;
        FirstKey ->
            Sample = mnesia:dirty_read(Tab, FirstKey),
            Compiled = ets:match_spec_compile([{Pattern, [], ['$_']}]),
            %% Every sampled record must survive the match spec.
            case ets:match_spec_run(Sample, Compiled) =:= Sample of
                true  -> ok;
                false -> {error, {table_content_invalid, Tab, Pattern, Sample}}
            end
    end.

%% Runs Fun(Tab, TabDef) over every table definition and gathers the
%% reasons of all {error, _} results, in definition order.
check(Fun) ->
    Collect =
        fun ({Tab, TabDef}, Acc) ->
                case Fun(Tab, TabDef) of
                    ok             -> Acc;
                    {error, Cause} -> [Cause | Acc]
                end;
            (_NotAPair, Acc) ->
                Acc
        end,
    case lists:reverse(lists:foldl(Collect, [], definitions())) of
        []     -> ok;
        Causes -> {error, Causes}
    end.

%%--------------------------------------------------------------------
%% Table definitions
%%--------------------------------------------------------------------

%% Names of all defined tables, in definition order.
names() -> [Name || {Name, _Options} <- definitions()].

%% The tables aren't supposed to be on disk on a ram node
definitions(disc) ->
    definitions();
definitions(ram) ->
    [{Tab, ram_only_definition(TabDef)} || {Tab, TabDef} <- definitions()].

%% Replaces the disc_copies and ram_copies entries of a definition with
%% an empty disc_copies list and a ram_copies list naming only this node.
ram_only_definition(TabDef) ->
    Stripped = lists:foldl(fun proplists:delete/2, TabDef,
                           [disc_copies, ram_copies]),
    [{disc_copies, []}, {ram_copies, [node()]} | Stripped].

%% Full list of {TableName, Options} pairs: the tables declared here
%% followed by those contributed by gm and mirrored_supervisor. Each
%% entry declared here carries a `match' pattern next to the usual
%% table options.
definitions() ->
    %% Option tuples and patterns shared by several tables.
    OnDisc         = {disc_copies, [node()]},
    Ordered        = {type, ordered_set},
    AnyPermission  = #permission{_='_'},
    RouteFields    = {attributes, record_info(fields, route)},
    RouteMatch     = {match, #route{binding = binding_match(), _='_'}},
    ExchangeFields = {attributes, record_info(fields, exchange)},
    ExchangeMatch  = {match, #exchange{name = exchange_name_match(), _='_'}},
    CoreTables =
        [{rabbit_user,
          [{record_name, internal_user},
           {attributes, internal_user:fields()},
           OnDisc,
           {match, internal_user:pattern_match_all()}]},
         {rabbit_user_permission,
          [{record_name, user_permission},
           {attributes, record_info(fields, user_permission)},
           OnDisc,
           {match, #user_permission{user_vhost = #user_vhost{_='_'},
                                    permission = AnyPermission,
                                    _='_'}}]},
         {rabbit_topic_permission,
          [{record_name, topic_permission},
           {attributes, record_info(fields, topic_permission)},
           OnDisc,
           {match, #topic_permission{topic_permission_key = #topic_permission_key{_='_'},
                                     permission = AnyPermission,
                                     _='_'}}]},
         {rabbit_vhost,
          [{record_name, vhost},
           {attributes, vhost:fields()},
           OnDisc,
           {match, vhost:pattern_match_all()}]},
         {rabbit_listener,
          [{record_name, listener},
           {attributes, record_info(fields, listener)},
           {type, bag},
           {match, #listener{_='_'}}]},
         {rabbit_durable_route,
          [{record_name, route}, RouteFields, OnDisc, RouteMatch]},
         {rabbit_semi_durable_route,
          [{record_name, route}, RouteFields, Ordered, RouteMatch]},
         {rabbit_route,
          [{record_name, route}, RouteFields, Ordered, RouteMatch]},
         {rabbit_reverse_route,
          [{record_name, reverse_route},
           {attributes, record_info(fields, reverse_route)},
           Ordered,
           {match, #reverse_route{reverse_binding = reverse_binding_match(),
                                  _='_'}}]},
         {rabbit_topic_trie_node,
          [{record_name, topic_trie_node},
           {attributes, record_info(fields, topic_trie_node)},
           Ordered,
           {match, #topic_trie_node{trie_node = trie_node_match(), _='_'}}]},
         {rabbit_topic_trie_edge,
          [{record_name, topic_trie_edge},
           {attributes, record_info(fields, topic_trie_edge)},
           Ordered,
           {match, #topic_trie_edge{trie_edge = trie_edge_match(), _='_'}}]},
         {rabbit_topic_trie_binding,
          [{record_name, topic_trie_binding},
           {attributes, record_info(fields, topic_trie_binding)},
           Ordered,
           {match, #topic_trie_binding{trie_binding = trie_binding_match(),
                                       _='_'}}]},
         {rabbit_durable_exchange,
          [{record_name, exchange}, ExchangeFields, OnDisc, ExchangeMatch]},
         {rabbit_exchange,
          [{record_name, exchange}, ExchangeFields, ExchangeMatch]},
         {rabbit_exchange_serial,
          [{record_name, exchange_serial},
           {attributes, record_info(fields, exchange_serial)},
           {match, #exchange_serial{name = exchange_name_match(), _='_'}}]},
         {rabbit_runtime_parameters,
          [{record_name, runtime_parameters},
           {attributes, record_info(fields, runtime_parameters)},
           OnDisc,
           {match, #runtime_parameters{_='_'}}]},
         {rabbit_durable_queue,
          [{record_name, amqqueue},
           {attributes, amqqueue:fields()},
           OnDisc,
           {match, amqqueue:pattern_match_on_name(queue_name_match())}]},
         {rabbit_queue,
          [{record_name, amqqueue},
           {attributes, amqqueue:fields()},
           {match, amqqueue:pattern_match_on_name(queue_name_match())}]}],
    lists:append([CoreTables,
                  gm:table_definitions(),
                  mirrored_supervisor:table_definitions()]).

%% Pattern for a #binding{} whose source is an exchange resource and
%% whose destination is a resource of any kind.
binding_match() ->
    Source      = exchange_name_match(),
    Destination = binding_destination_match(),
    #binding{source = Source, destination = Destination, _='_'}.

%% Pattern for a #reverse_binding{}: same constraints as binding_match/0.
reverse_binding_match() ->
    Destination = binding_destination_match(),
    Source      = exchange_name_match(),
    #reverse_binding{destination = Destination, source = Source, _='_'}.
%% Pattern for a binding destination: a #resource{} of any kind.
binding_destination_match() ->
    resource_match('_').

%% Pattern for a #trie_node{} that belongs to an exchange resource.
trie_node_match() ->
    Exchange = exchange_name_match(),
    #trie_node{exchange_name = Exchange, _='_'}.

%% Pattern for a #trie_edge{} that belongs to an exchange resource.
trie_edge_match() ->
    Exchange = exchange_name_match(),
    #trie_edge{exchange_name = Exchange, _='_'}.

%% Pattern for a #trie_binding{} that belongs to an exchange resource.
trie_binding_match() ->
    Exchange = exchange_name_match(),
    #trie_binding{exchange_name = Exchange, _='_'}.

%% Pattern for a #resource{} of kind `exchange'.
exchange_name_match() ->
    resource_match(exchange).

%% Pattern for a #resource{} of kind `queue'.
queue_name_match() ->
    resource_match(queue).

%% Pattern for a #resource{} with the given kind and every other field
%% left as the '_' wildcard.
resource_match(Kind) ->
    #resource{kind = Kind, _='_'}.