%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates. All rights reserved.
%%

%% Disk-backed message store: appends messages to numbered files,
%% keeps a location index, and garbage collects files concurrently.
-module(rabbit_msg_store).

-behaviour(gen_server2).

%% Client and lifecycle API.
-export([start_link/4, start_global_store_link/4, successfully_recovered_state/1,
         client_init/4, client_terminate/1, client_delete_and_terminate/1,
         client_ref/1, close_all_indicated/1,
         write/3, write_flow/3, read/2, contains/2, remove/2]).

-export([set_maximum_since_use/2, combine_files/3,
         delete_file/2]). %% internal

-export([scan_file_for_valid_messages/1]). %% salvage tool

-export([transform_dir/3, force_recovery/2]). %% upgrade

%% gen_server2 callbacks.
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
         code_change/3, prioritise_call/4, prioritise_cast/3,
         prioritise_info/3, format_message_queue/2]).

%%----------------------------------------------------------------------------

-include_lib("rabbit_common/include/rabbit_msg_store.hrl").

-define(SYNC_INTERVAL, 25). %% milliseconds
-define(CLEAN_FILENAME, "clean.dot").
-define(FILE_SUMMARY_FILENAME, "file_summary.ets").
-define(TRANSFORM_TMP, "transform_tmp").

%% File open modes.
-define(BINARY_MODE, [raw, binary]).
-define(READ_MODE, [read]).
-define(READ_AHEAD_MODE, [read_ahead | ?READ_MODE]).
-define(WRITE_MODE, [write]).

%% Extensions of message files and of temporary files.
-define(FILE_EXTENSION, ".rdq").
-define(FILE_EXTENSION_TMP, ".rdt").

-define(HANDLE_CACHE_BUFFER_SIZE, 1048576). %% 1MB

%% i.e. two pairs, so GC does not go idle when busy
-define(MAXIMUM_SIMULTANEOUS_GC_FILES, 4).

%%----------------------------------------------------------------------------

%% Server-side state of the message store process.
%% NOTE: field order is significant only for code that relies on
%% record tuple positions; keep additions at the end.
-record(msstate,
        {
          %% store directory
          dir,
          %% the module for index ops,
          %% rabbit_msg_store_ets_index by default
          index_module,
          %% index state: where are messages? (MsgId -> #msg_location{})
          index_state,
          %% current file name as number
          current_file,
          %% handle of the current (append) file; init/1 opens it
          %% with [read | ?WRITE_MODE]
          current_file_handle,
          %% file handle cache (#{} initially)
          file_handle_cache,
          %% TRef for our interval timer
          sync_timer_ref,
          %% sum of valid data in all files
          sum_valid_data,
          %% sum of file sizes
          sum_file_size,
          %% things to do once GC completes (a map)
          pending_gc_completion,
          %% pid of our GC
          gc_pid,
          %% tid of the shared file handles table
          file_handles_ets,
          %% tid of the file summary table
          file_summary_ets,
          %% tid of current file cache table
          cur_file_cache_ets,
          %% tid of writes/removes in flight
          flying_ets,
          %% dying clients: a map keyed by client ref
          dying_clients,
          %% map of client ref -> {Pid, MsgOnDiskFun, CloseFDsFun};
          %% recovered refs start as {undefined, undefined, undefined}
          clients,
          %% boolean: did we recover state?
          successfully_recovered,
          %% how big are our files allowed to get?
          file_size_limit,
          %% client ref to synced messages mapping (a map)
          cref_to_msg_ids,
          %% See CREDIT_DISC_BOUND in rabbit.hrl
          credit_disc_bound
        }).

%% Client-side state returned by client_init/4. Most fields are copied
%% from the server's reply to {new_client_state, ...}.
-record(client_msstate,
        { server,
          client_ref,
          file_handle_cache,
          index_state,
          index_module,
          dir,
          gc_pid,
          file_handles_ets,
          file_summary_ets,
          cur_file_cache_ets,
          flying_ets,
          credit_disc_bound
        }).

%% One entry per message file in file_summary_ets:
%%   valid_total_size - bytes of still-useful data in the file
%%   left / right     - neighbouring files; right = undefined marks the
%%                      current file
%%   locked           - true while the file is being GC'd or deleted
%%   readers          - number of clients currently reading the file
%% The readers position is updated in place with ets:update_counter,
%% so do not reorder these fields.
-record(file_summary,
        {file, valid_total_size, left, right, file_size, locked, readers}).

%% State handed to rabbit_msg_store_gc:start_link/1 by init/1.
-record(gc_state,
        { dir,
          index_module,
          index_state,
          file_summary_ets,
          file_handles_ets,
          msg_store
        }).

%% NOTE(review): presumably records the file/offset at which a client
%% was marked dying; its use is outside this view, so confirm.
-record(dying_client,
        { client_ref,
          file,
          offset
        }).

%%----------------------------------------------------------------------------

-export_type([gc_state/0, file_num/0]).

-type gc_state() :: #gc_state { dir :: file:filename(),
                                index_module :: atom(),
                                index_state :: any(),
                                file_summary_ets :: ets:tid(),
                                file_handles_ets :: ets:tid(),
                                msg_store :: server()
                              }.

%% A message store is addressed by pid or by registered name.
-type server()     :: pid() | atom().
%% Per-client reference handed to client_init/4.
-type client_ref() :: binary().
%% Message files are named by ascending non-negative integers.
-type file_num()   :: non_neg_integer().

%% Client-side handle produced by client_init/4.
-type client_msstate() ::
        #client_msstate { server             :: server(),
                          client_ref         :: client_ref(),
                          file_handle_cache  :: map(),
                          index_state        :: any(),
                          index_module       :: atom(),
                          dir                :: file:filename(),
                          gc_pid             :: pid(),
                          file_handles_ets   :: ets:tid(),
                          file_summary_ets   :: ets:tid(),
                          cur_file_cache_ets :: ets:tid(),
                          flying_ets         :: ets:tid(),
                          credit_disc_bound  :: {pos_integer(), pos_integer()} }.

%% Startup generator: called with its own state, it yields
%% {MsgId, RefCountDelta, NextState} until it returns finished.
-type msg_ref_delta_gen(Seed) ::
        fun ((Seed) -> finished
                     | {rabbit_types:msg_id(), non_neg_integer(), Seed}).

%% Optional client callbacks registered via client_init/4.
-type maybe_msg_id_fun() ::
        undefined
      | fun ((gb_sets:set(), written | ignored) -> any()).
-type maybe_close_fds_fun() :: undefined | fun (() -> ok).

-type deletion_thunk() :: fun (() -> boolean()).

%%----------------------------------------------------------------------------

%% We run GC whenever (garbage / sum_file_size) > ?GARBAGE_FRACTION
%% It is not recommended to set this to < 0.5
-define(GARBAGE_FRACTION, 0.5).

%% Message store is responsible for storing messages
%% on disk and loading them back. The store handles both
%% persistent messages and transient ones (when a node
%% is under RAM pressure and needs to page messages out
%% to disk). The store is responsible for locating messages
%% on disk and maintaining an index.
%%
%% There are two message stores per node: one for transient
%% and one for persistent messages.
%%
%% Queue processes interact with the stores via clients.
%%
%% The components:
%%
%% Index: this is a mapping from MsgId to #msg_location{}.
%%        By default, it's in ETS, but other implementations can
%%        be used.
%% FileSummary: this maps File to #file_summary{} and is stored
%%              in ETS.
194%% 195%% The basic idea is that messages are appended to the current file up 196%% until that file becomes too big (> file_size_limit). At that point, 197%% the file is closed and a new file is created on the _right_ of the 198%% old file which is used for new messages. Files are named 199%% numerically ascending, thus the file with the lowest name is the 200%% eldest file. 201%% 202%% We need to keep track of which messages are in which files (this is 203%% the index); how much useful data is in each file and which files 204%% are on the left and right of each other. This is the purpose of the 205%% file summary ETS table. 206%% 207%% As messages are removed from files, holes appear in these 208%% files. The field ValidTotalSize contains the total amount of useful 209%% data left in the file. This is needed for garbage collection. 210%% 211%% When we discover that a file is now empty, we delete it. When we 212%% discover that it can be combined with the useful data in either its 213%% left or right neighbour, and overall, across all the files, we have 214%% ((the amount of garbage) / (the sum of all file sizes)) > 215%% ?GARBAGE_FRACTION, we start a garbage collection run concurrently, 216%% which will compact the two files together. This keeps disk 217%% utilisation high and aids performance. We deliberately do this 218%% lazily in order to prevent doing GC on files which are soon to be 219%% emptied (and hence deleted). 220%% 221%% Given the compaction between two files, the left file (i.e. elder 222%% file) is considered the ultimate destination for the good data in 223%% the right file. If necessary, the good data in the left file which 224%% is fragmented throughout the file is written out to a temporary 225%% file, then read back in to form a contiguous chunk of good data at 226%% the start of the left file. Thus the left file is garbage collected 227%% and compacted. Then the good data from the right file is copied 228%% onto the end of the left file. 
Index and file summary tables are 229%% updated. 230%% 231%% On non-clean startup, we scan the files we discover, dealing with 232%% the possibilities of a crash having occurred during a compaction 233%% (this consists of tidyup - the compaction is deliberately designed 234%% such that data is duplicated on disk rather than risking it being 235%% lost), and rebuild the file summary and index ETS table. 236%% 237%% So, with this design, messages move to the left. Eventually, they 238%% should end up in a contiguous block on the left and are then never 239%% rewritten. But this isn't quite the case. If in a file there is one 240%% message that is being ignored, for some reason, and messages in the 241%% file to the right and in the current block are being read all the 242%% time then it will repeatedly be the case that the good data from 243%% both files can be combined and will be written out to a new 244%% file. Whenever this happens, our shunned message will be rewritten. 245%% 246%% So, provided that we combine messages in the right order, 247%% (i.e. left file, bottom to top, right file, bottom to top), 248%% eventually our shunned message will end up at the bottom of the 249%% left file. The compaction/combining algorithm is smart enough to 250%% read in good data from the left file that is scattered throughout 251%% (i.e. C and D in the below diagram), then truncate the file to just 252%% above B (i.e. truncate to the limit of the good contiguous region 253%% at the start of the file), then write C and D on top and then write 254%% E, F and G from the right file on top. Thus contiguous blocks of 255%% good data at the bottom of files are not rewritten. 
256%% 257%% +-------+ +-------+ +-------+ 258%% | X | | G | | G | 259%% +-------+ +-------+ +-------+ 260%% | D | | X | | F | 261%% +-------+ +-------+ +-------+ 262%% | X | | X | | E | 263%% +-------+ +-------+ +-------+ 264%% | C | | F | ===> | D | 265%% +-------+ +-------+ +-------+ 266%% | X | | X | | C | 267%% +-------+ +-------+ +-------+ 268%% | B | | X | | B | 269%% +-------+ +-------+ +-------+ 270%% | A | | E | | A | 271%% +-------+ +-------+ +-------+ 272%% left right left 273%% 274%% From this reasoning, we do have a bound on the number of times the 275%% message is rewritten. From when it is inserted, there can be no 276%% files inserted between it and the head of the queue, and the worst 277%% case is that every time it is rewritten, it moves one position lower 278%% in the file (for it to stay at the same position requires that 279%% there are no holes beneath it, which means truncate would be used 280%% and so it would not be rewritten at all). Thus this seems to 281%% suggest the limit is the number of messages ahead of it in the 282%% queue, though it's likely that that's pessimistic, given the 283%% requirements for compaction/combination of files. 284%% 285%% The other property that we have is the bound on the lowest 286%% utilisation, which should be 50% - worst case is that all files are 287%% fractionally over half full and can't be combined (equivalent is 288%% alternating full files and files with only one tiny message in 289%% them). 290%% 291%% Messages are reference-counted. When a message with the same msg id 292%% is written several times we only store it once, and only remove it 293%% from the store when it has been removed the same number of times. 294%% 295%% The reference counts do not persist. Therefore the initialisation 296%% function must be provided with a generator that produces ref count 297%% deltas for all recovered messages. This is only used on startup 298%% when the shutdown was non-clean. 
299%% 300%% Read messages with a reference count greater than one are entered 301%% into a message cache. The purpose of the cache is not especially 302%% performance, though it can help there too, but prevention of memory 303%% explosion. It ensures that as messages with a high reference count 304%% are read from several processes they are read back as the same 305%% binary object rather than multiples of identical binary 306%% objects. 307%% 308%% Reads can be performed directly by clients without calling to the 309%% server. This is safe because multiple file handles can be used to 310%% read files. However, locking is used by the concurrent GC to make 311%% sure that reads are not attempted from files which are in the 312%% process of being garbage collected. 313%% 314%% When a message is removed, its reference count is decremented. Even 315%% if the reference count becomes 0, its entry is not removed. This is 316%% because in the event of the same message being sent to several 317%% different queues, there is the possibility of one queue writing and 318%% removing the message before other queues write it at all. Thus 319%% accommodating 0-reference counts allows us to avoid unnecessary 320%% writes here. Of course, there are complications: the file to which 321%% the message has already been written could be locked pending 322%% deletion or GC, which means we have to rewrite the message as the 323%% original copy will now be lost. 324%% 325%% The server automatically defers reads, removes and contains calls 326%% that occur which refer to files which are currently being 327%% GC'd. Contains calls are only deferred in order to ensure they do 328%% not overtake removes. 329%% 330%% The current file to which messages are being written has a 331%% write-back cache. This is written to immediately by clients and can 332%% be read from by clients too. 
This means that there are only ever 333%% writes made to the current file, thus eliminating delays due to 334%% flushing write buffers in order to be able to safely read from the 335%% current file. The one exception to this is that on start up, the 336%% cache is not populated with msgs found in the current file, and 337%% thus in this case only, reads may have to come from the file 338%% itself. The effect of this is that even if the msg_store process is 339%% heavily overloaded, clients can still write and read messages with 340%% very low latency and not block at all. 341%% 342%% Clients of the msg_store are required to register before using the 343%% msg_store. This provides them with the necessary client-side state 344%% to allow them to directly access the various caches and files. When 345%% they terminate, they should deregister. They can do this by calling 346%% either client_terminate/1 or client_delete_and_terminate/1. The 347%% differences are: (a) client_terminate is synchronous. As a result, 348%% if the msg_store is badly overloaded and has lots of in-flight 349%% writes and removes to process, this will take some time to 350%% return. However, once it does return, you can be sure that all the 351%% actions you've issued to the msg_store have been processed. (b) Not 352%% only is client_delete_and_terminate/1 asynchronous, but it also 353%% permits writes and subsequent removes from the current 354%% (terminating) client which are still in flight to be safely 355%% ignored. Thus from the point of view of the msg_store itself, and 356%% all from the same client: 357%% 358%% (T) = termination; (WN) = write of msg N; (RN) = remove of msg N 359%% --> W1, W2, W1, R1, T, W3, R2, W2, R1, R2, R3, W4 --> 360%% 361%% The client obviously sent T after all the other messages (up to 362%% W4), but because the msg_store prioritises messages, the T can be 363%% promoted and thus received early. 
%%
%% Thus at the point of the msg_store receiving T, we have messages 1
%% and 2 with a refcount of 1. After T, W3 will be ignored because
%% it's an unknown message, as will R3, and W4. W2, R1 and R2 won't be
%% ignored because the messages that they refer to were already known
%% to the msg_store prior to T. However, it can be a little more
%% complex: after the first R2, the refcount of msg 2 is 0. At that
%% point, if a GC or a file deletion occurs, msg 2 could vanish, which
%% would mean that the subsequent W2 and R2 are also ignored.
%%
%% The use case then for client_delete_and_terminate/1 is if the
%% client wishes to remove everything it's written to the msg_store:
%% it issues removes for all messages it's written and not removed,
%% and then calls client_delete_and_terminate/1. At that point, any
%% in-flight writes (and subsequent removes) can be ignored, but
%% removes and writes for messages the msg_store already knows about
%% will continue to be processed normally (which will normally just
%% involve modifying the reference count, which is fast). Thus we save
%% disk bandwidth for writes which are going to be immediately removed
%% again by the terminating client.
%%
%% We use a separate set to keep track of the dying clients in order
%% to keep that set, which is inspected on every write and remove, as
%% small as possible. Inspecting the set of all clients would degrade
%% performance with many healthy clients and few, if any, dying
%% clients, which is the typical case.
%%
%% Client termination messages are stored in a separate ets index to
%% avoid filling the primary message store index and message files
%% with client termination messages.
%%
%% When the msg_store has a backlog (i.e.
%% it has unprocessed messages
%% in its mailbox / gen_server priority queue), a further optimisation
%% opportunity arises: we can eliminate pairs of 'write' and 'remove'
%% from the same client for the same message. A typical occurrence of
%% these is when an empty durable queue delivers persistent messages
%% to ack'ing consumers. The queue will asynchronously ask the
%% msg_store to 'write' such messages, and when they are acknowledged
%% it will issue a 'remove'. That 'remove' may be issued before the
%% msg_store has processed the 'write'. There is then no point going
%% ahead with the processing of that 'write'.
%%
%% To detect this situation a 'flying_ets' table is shared between the
%% clients and the server. The table is keyed on the combination of
%% client (reference) and msg id, and the value represents an
%% integration of all the writes and removes currently "in flight" for
%% that message between the client and server - '+1' means all the
%% writes/removes add up to a single 'write', '-1' to a 'remove', and
%% '0' to nothing. (NB: the integration can never add up to more than
%% one 'write' or 'remove', since clients must not write/remove a
%% message more than once without first removing/writing it.)
%%
%% Maintaining this table poses two challenges: 1) both the clients
%% and the server access and update the table, which causes
%% concurrency issues, 2) we must ensure that entries do not stay in
%% the table forever, since that would constitute a memory leak. We
%% address the former by carefully modelling all operations as
%% sequences of atomic actions that produce valid results in all
%% possible interleavings. We address the latter by deleting table
%% entries whenever the server finds a 0-valued entry during the
%% processing of a write/remove. 0 is essentially equivalent to "no
%% entry".
%% If, OTOH, the value is non-zero we know there is at least
%% one other 'write' or 'remove' in flight, so we get an opportunity
%% later to delete the table entry when processing these.
%%
%% There are two further complications. We need to ensure that 1)
%% eliminated writes still get confirmed, and 2) the write-back cache
%% doesn't grow unbounded. These are quite straightforward to
%% address. See the comments in the code.
%%
%% For notes on Clean Shutdown and startup, see documentation in
%% rabbit_variable_queue.

%%----------------------------------------------------------------------------
%% public API
%%----------------------------------------------------------------------------

%% Starts an anonymous (unregistered) message store of the given Type.
%% Its files live under filename:join(Dir, Type). ClientRefs =
%% undefined means "start afresh" (init/1 wipes the directory).
%% StartupFunState is the ref-count delta generator and its initial
%% state.
-spec start_link
        (atom(), file:filename(), [binary()] | 'undefined',
         {msg_ref_delta_gen(A), A}) -> rabbit_types:ok_pid_or_error().

start_link(Type, Dir, ClientRefs, StartupFunState) when is_atom(Type) ->
    gen_server2:start_link(?MODULE,
                           [Type, Dir, ClientRefs, StartupFunState],
                           [{timeout, infinity}]).

%% Same as start_link/4, but registers the store locally under the
%% name Type, so clients may address it by that atom.
-spec start_global_store_link
        (atom(), file:filename(), [binary()] | 'undefined',
         {msg_ref_delta_gen(A), A}) -> rabbit_types:ok_pid_or_error().

start_global_store_link(Type, Dir, ClientRefs, StartupFunState) when is_atom(Type) ->
    gen_server2:start_link({local, Type}, ?MODULE,
                           [Type, Dir, ClientRefs, StartupFunState],
                           [{timeout, infinity}]).

%% Returns whether the store recovered its index and file summary
%% cleanly at startup.
-spec successfully_recovered_state(server()) -> boolean().

successfully_recovered_state(Server) ->
    gen_server2:call(Server, successfully_recovered_state, infinity).

-spec client_init(server(), client_ref(), maybe_msg_id_fun(),
                  maybe_close_fds_fun()) -> client_msstate().

%% Registers the calling process as a client of Server and builds the
%% client-side state used to read and write directly through the
%% shared ETS tables. The server stores MsgOnDiskFun and CloseFDsFun
%% against Ref.
client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun)
  when is_pid(Server); is_atom(Server) ->
    Request = {new_client_state, Ref, self(), MsgOnDiskFun, CloseFDsFun},
    {IndexState, IndexModule, StoreDir, GCPid,
     HandlesTid, SummaryTid, CurFileCacheTid, FlyingTid} =
        gen_server2:call(Server, Request, infinity),
    DiscBound = rabbit_misc:get_env(rabbit, msg_store_credit_disc_bound,
                                    ?CREDIT_DISC_BOUND),
    #client_msstate { server             = Server,
                      client_ref         = Ref,
                      file_handle_cache  = #{},
                      dir                = StoreDir,
                      index_module       = IndexModule,
                      index_state        = IndexState,
                      gc_pid             = GCPid,
                      file_handles_ets   = HandlesTid,
                      file_summary_ets   = SummaryTid,
                      cur_file_cache_ets = CurFileCacheTid,
                      flying_ets         = FlyingTid,
                      credit_disc_bound  = DiscBound }.

-spec client_terminate(client_msstate()) -> 'ok'.

%% Synchronous deregistration: closes this client's file handles, then
%% waits for the server to handle {client_terminate, Ref}.
client_terminate(#client_msstate { client_ref = CRef } = CState) ->
    close_all_handles(CState),
    ok = server_call(CState, {client_terminate, CRef}).

-spec client_delete_and_terminate(client_msstate()) -> 'ok'.

%% Asynchronous deregistration: closes this client's file handles,
%% marks the client as dying, then asks the server to delete it.
client_delete_and_terminate(#client_msstate { client_ref = CRef } = CState) ->
    close_all_handles(CState),
    ok = server_cast(CState, {client_dying, CRef}),
    ok = server_cast(CState, {client_delete, CRef}).

-spec client_ref(client_msstate()) -> client_ref().

client_ref(#client_msstate { client_ref = CRef }) ->
    CRef.

-spec write_flow(rabbit_types:msg_id(), msg(), client_msstate()) -> 'ok'.

%% Credit-flow-aware write. The credit is tracked in the calling
%% process: the rabbit_amqqueue_process (via rabbit_variable_queue)
%% whose process dictionary credit_flow uses.
write_flow(MsgId, Msg,
           CState = #client_msstate { credit_disc_bound = DiscBound,
                                      server            = Server }) ->
    credit_flow:send(Server, DiscBound),
    client_write(MsgId, Msg, flow, CState).

-spec write(rabbit_types:msg_id(), msg(), client_msstate()) -> 'ok'.

%% Plain write, without credit flow.
write(MsgId, Msg, CState) ->
    client_write(MsgId, Msg, noflow, CState).

-spec read(rabbit_types:msg_id(), client_msstate()) ->
                 {rabbit_types:ok(msg()) | 'not_found', client_msstate()}.

%% Reads a message. It is served from the current-file write-back cache
%% when possible, otherwise from disk by the client itself, and
%% otherwise by the server.
read(MsgId,
     CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts }) ->
    file_handle_cache_stats:update(msg_store_read),
    case ets:lookup(CurFileCacheEts, MsgId) of
        [{MsgId, Msg, _CacheRefCount}] ->
            {{ok, Msg}, CState};
        [] ->
            %% Fallback used whenever the client cannot safely read
            %% the file itself.
            AskServer = fun () -> {server_call(CState, {read, MsgId}), CState} end,
            case index_lookup_positive_ref_count(MsgId, CState) of
                not_found   -> AskServer();
                MsgLocation -> client_read1(MsgLocation, AskServer, CState)
            end
    end.

-spec contains(rabbit_types:msg_id(), client_msstate()) -> boolean().

contains(MsgId, CState) ->
    server_call(CState, {contains, MsgId}).

-spec remove([rabbit_types:msg_id()], client_msstate()) -> 'ok'.

%% Marks each removal as in flight (-1), then casts a single batched
%% remove to the server.
remove([], _CState) ->
    ok;
remove(MsgIds, CState = #client_msstate { client_ref = CRef }) ->
    lists:foreach(fun (MsgId) -> client_update_flying(-1, MsgId, CState) end,
                  MsgIds),
    server_cast(CState, {remove, CRef, MsgIds}).

-spec set_maximum_since_use(server(), non_neg_integer()) -> 'ok'.

set_maximum_since_use(Server, Age) when is_pid(Server); is_atom(Server) ->
    gen_server2:cast(Server, {set_maximum_since_use, Age}).

%%----------------------------------------------------------------------------
%% Client-side-only helpers
%%----------------------------------------------------------------------------

%% Synchronous request to the store this client is attached to.
server_call(#client_msstate { server = Server }, Request) ->
    gen_server2:call(Server, Request, infinity).

%% Fire-and-forget request to the store this client is attached to.
server_cast(#client_msstate { server = Server }, Request) ->
    gen_server2:cast(Server, Request).

%% Client-side half of write/3 and write_flow/3. It records the write
%% as in flight (+1 for {MsgId, CRef} in flying_ets) and puts Msg in
%% the current-file cache via update_msg_cache/3. Only then does it
%% cast {write, CRef, MsgId, Flow} to the server (Flow is flow | noflow).
client_write(MsgId, Msg, Flow,
             CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts,
                                        client_ref         = CRef }) ->
    file_handle_cache_stats:update(msg_store_write),
    ok = client_update_flying(+1, MsgId, CState),
    ok = update_msg_cache(CurFileCacheEts, MsgId, Msg),
    ok = server_cast(CState, {write, CRef, MsgId, Flow}).

%% Step 1 of a client-side read. It fetches the #file_summary{} of the
%% file the index says holds MsgId and passes its locked/right fields
%% to client_read2/5. Defer is the fallback that asks the server to
%% perform the read (see read/2).
client_read1(#msg_location { msg_id = MsgId, file = File } = MsgLocation, Defer,
             CState = #client_msstate { file_summary_ets = FileSummaryEts }) ->
    case ets:lookup(FileSummaryEts, File) of
        [] -> %% File has been GC'd and no longer exists. Go around again.
            read(MsgId, CState);
        [#file_summary { locked = Locked, right = Right }] ->
            client_read2(Locked, Right, MsgLocation, Defer, CState)
    end.

%% Step 2: decide from the file summary whether the client may read
%% the file itself.
%%   - unlocked with no right neighbour (the current file) -> Defer
%%   - locked (GC or deletion in progress)                 -> Defer
%%   - otherwise register as a reader (+1 on readers) and continue
%%     in client_read3/3. The second fun given to
%%     safe_ets_update_counter/5 restarts via read/2; presumably it
%%     runs when the summary entry has vanished (confirm).
client_read2(false, undefined, _MsgLocation, Defer, _CState) ->
    %% Although we've already checked both caches and not found the
    %% message there, the message is apparently in the
    %% current_file. We can only arrive here if we are trying to read
    %% a message which we have not written, which is very odd, so just
    %% defer.
    %%
    %% OR, on startup, the cur_file_cache is not populated with the
    %% contents of the current file, thus reads from the current file
    %% will end up here and will need to be deferred.
    Defer();
client_read2(true, _Right, _MsgLocation, Defer, _CState) ->
    %% Of course, in the mean time, the GC could have run and our msg
    %% is actually in a different file, unlocked. However, deferring is
    %% the safest and simplest thing to do.
    Defer();
client_read2(false, _Right,
             MsgLocation = #msg_location { msg_id = MsgId, file = File },
             Defer,
             CState = #client_msstate { file_summary_ets = FileSummaryEts }) ->
    %% It's entirely possible that everything we're doing from here on
    %% is for the wrong file, or a non-existent file, as a GC may have
    %% finished.
    safe_ets_update_counter(
      FileSummaryEts, File, {#file_summary.readers, +1},
      fun (_) -> client_read3(MsgLocation, Defer, CState) end,
      fun () -> read(MsgId, CState) end).

%% Step 3: having registered as a reader, re-check the lock and the
%% index entry before reading from disk. Release/0 undoes the +1 on
%% readers. If that leaves a locked file with zero readers, it tells
%% the GC via rabbit_msg_store_gc:no_readers/2. Every path calls
%% Release/0 exactly once, except where it raises badarg because the
%% summary entry is gone.
client_read3(#msg_location { msg_id = MsgId, file = File }, Defer,
             CState = #client_msstate { file_handles_ets = FileHandlesEts,
                                        file_summary_ets = FileSummaryEts,
                                        gc_pid           = GCPid,
                                        client_ref       = Ref }) ->
    Release =
        fun() -> ok = case ets:update_counter(FileSummaryEts, File,
                                              {#file_summary.readers, -1}) of
                          0 -> case ets:lookup(FileSummaryEts, File) of
                                   [#file_summary { locked = true }] ->
                                       rabbit_msg_store_gc:no_readers(
                                         GCPid, File);
                                   _ -> ok
                               end;
                          _ -> ok
                      end
        end,
    %% If a GC involving the file hasn't already started, it won't
    %% start now. Need to check again to see if we've been locked in
    %% the meantime, between lookup and update_counter (thus GC
    %% started before our +1. In fact, it could have finished by now
    %% too).
    case ets:lookup(FileSummaryEts, File) of
        [] -> %% GC has deleted our file, just go round again.
            read(MsgId, CState);
        [#file_summary { locked = true }] ->
            %% If we get a badarg here, then the GC has finished and
            %% deleted our file. Try going around again. Otherwise,
            %% just defer.
            %%
            %% badarg scenario: we lookup, msg_store locks, GC starts,
            %% GC ends, we +1 readers, msg_store ets:deletes (and
            %% unlocks the dest)
            try Release(),
                Defer()
            catch error:badarg -> read(MsgId, CState)
            end;
        [#file_summary { locked = false }] ->
            %% Ok, we're definitely safe to continue - a GC involving
            %% the file cannot start up now, and isn't running, so
            %% nothing will tell us from now on to close the handle if
            %% it's already open.
            %%
            %% Finally, we need to recheck that the msg is still at
            %% the same place - it's possible an entire GC ran between
            %% us doing the lookup and the +1 on the readers. (Same as
            %% badarg scenario above, but we don't have a missing file
            %% - we just have the /wrong/ file).
            case index_lookup(MsgId, CState) of
                #msg_location { file = File } = MsgLocation ->
                    %% Still the same file.
                    {ok, CState1} = close_all_indicated(CState),
                    %% We are now guaranteed that the mark_handle_open
                    %% call will either insert_new correctly, or will
                    %% fail, but find the value is open, not close.
                    mark_handle_open(FileHandlesEts, File, Ref),
                    %% Could the msg_store now mark the file to be
                    %% closed? No: marks for closing are issued only
                    %% when the msg_store has locked the file.
                    %% This will never be the current file
                    {Msg, CState2} = read_from_disk(MsgLocation, CState1),
                    Release(), %% this MUST NOT fail with badarg
                    {{ok, Msg}, CState2};
                #msg_location {} = MsgLocation -> %% different file!
                    Release(), %% this MUST NOT fail with badarg
                    client_read1(MsgLocation, Defer, CState);
                not_found -> %% it seems not to exist. Defer, just to be sure.
                    try Release() %% this can badarg, same as locked case, above
                    catch error:badarg -> ok
                    end,
                    Defer()
            end
    end.

%% Adjusts the in-flight counter for {MsgId, CRef} in flying_ets by
%% Diff (+1 from client_write/4, -1 from remove/2). Returns ok. It
%% throws {bad_flying_ets_update, Diff, Err, Key} when the resulting
%% count is anything other than 0, Diff, or >= 2.
client_update_flying(Diff, MsgId, #client_msstate { flying_ets = FlyingEts,
                                                    client_ref = CRef }) ->
    Key = {MsgId, CRef},
    case ets:insert_new(FlyingEts, {Key, Diff}) of
        true  -> ok;
        false -> try ets:update_counter(FlyingEts, Key, {2, Diff}) of
                     0    -> ok;
                     Diff -> ok;
                     Err when Err >= 2 ->
                         %% The message must be referenced twice in the queue
                         %% index. There is a bug somewhere, but we don't want
                         %% to take down anything just because of this. Let's
                         %% process the message as if the copies were in
                         %% different queues (fan-out).
                         ok;
                     Err  -> throw({bad_flying_ets_update, Diff, Err, Key})
                 catch error:badarg ->
                         %% this is guaranteed to succeed since the
                         %% server only removes and updates flying_ets
                         %% entries; it never inserts them
                         true = ets:insert_new(FlyingEts, {Key, Diff})
                 end,
                 ok
    end.

%% Server side: removes CRef from the synced-message mapping and from
%% the dying clients. Returns the updated #msstate{}.
clear_client(CRef, State = #msstate { cref_to_msg_ids = CTM,
                                      dying_clients = DyingClients }) ->
    State #msstate { cref_to_msg_ids = maps:remove(CRef, CTM),
                     dying_clients = maps:remove(CRef, DyingClients) }.


%%----------------------------------------------------------------------------
%% gen_server callbacks
%%----------------------------------------------------------------------------


%% gen_server2 init. The steps are:
%%   1. Prepare the store directory under BaseDir/Type. It is wiped
%%      when ClientRefs is undefined.
%%   2. Try to recover the file summary and the index.
%%   3. Create the shared ETS tables and start the GC process.
%%   4. Rebuild the index via build_index/3.
%%   5. Open the current file, position it at the Offset returned by
%%      build_index/3 and truncate everything after that offset.
init([Type, BaseDir, ClientRefs, StartupFunState]) ->
    process_flag(trap_exit, true),

    ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use,
                                             [self()]),

    Dir = filename:join(BaseDir, atom_to_list(Type)),
    Name = filename:join(filename:basename(BaseDir), atom_to_list(Type)),

    {ok, IndexModule} = application:get_env(rabbit, msg_store_index_module),
    rabbit_log:info("Message store ~tp: using ~p to provide index", [Name, IndexModule]),

    %% No client refs => start from an empty directory and do not
    %% attempt file-summary recovery.
    AttemptFileSummaryRecovery =
        case ClientRefs of
            undefined -> ok = rabbit_file:recursive_delete([Dir]),
                         ok = filelib:ensure_dir(filename:join(Dir, "nothing")),
                         false;
            _         -> ok = filelib:ensure_dir(filename:join(Dir, "nothing")),
                         recover_crashed_compactions(Dir)
        end,
    %% if we found crashed compactions we trust neither the
    %% file_summary nor the location index. Note the file_summary is
    %% left empty here if it can't be recovered.
    {FileSummaryRecovered, FileSummaryEts} =
        recover_file_summary(AttemptFileSummaryRecovery, Dir),
    {CleanShutdown, IndexState, ClientRefs1} =
        recover_index_and_client_refs(IndexModule, FileSummaryRecovered,
                                      ClientRefs, Dir, Name),
    %% Recovered clients are known by ref only until they re-register
    %% through new_client_state.
    Clients = maps:from_list(
                [{CRef, {undefined, undefined, undefined}} ||
                    CRef <- ClientRefs1]),
    %% A recovered file summary is discarded unless the shutdown was
    %% clean, i.e. the index was recovered too.
    true = case {FileSummaryRecovered, CleanShutdown} of
               {true, false} -> ets:delete_all_objects(FileSummaryEts);
               _             -> true
           end,
    %% CleanShutdown <=> msg location index and file_summary both
    %% recovered correctly.

    FileHandlesEts  = ets:new(rabbit_msg_store_shared_file_handles,
                              [ordered_set, public]),
    CurFileCacheEts = ets:new(rabbit_msg_store_cur_file, [set, public]),
    FlyingEts       = ets:new(rabbit_msg_store_flying, [set, public]),

    {ok, FileSizeLimit} = application:get_env(rabbit, msg_store_file_size_limit),

    {ok, GCPid} = rabbit_msg_store_gc:start_link(
                    #gc_state { dir              = Dir,
                                index_module     = IndexModule,
                                index_state      = IndexState,
                                file_summary_ets = FileSummaryEts,
                                file_handles_ets = FileHandlesEts,
                                msg_store        = self()
                              }),

    CreditDiscBound = rabbit_misc:get_env(rabbit, msg_store_credit_disc_bound,
                                          ?CREDIT_DISC_BOUND),

    State = #msstate { dir                    = Dir,
                       index_module           = IndexModule,
                       index_state            = IndexState,
                       current_file           = 0,
                       current_file_handle    = undefined,
                       file_handle_cache      = #{},
                       sync_timer_ref         = undefined,
                       sum_valid_data         = 0,
                       sum_file_size          = 0,
                       pending_gc_completion  = maps:new(),
                       gc_pid                 = GCPid,
                       file_handles_ets       = FileHandlesEts,
                       file_summary_ets       = FileSummaryEts,
                       cur_file_cache_ets     = CurFileCacheEts,
                       flying_ets             = FlyingEts,
                       dying_clients          = #{},
                       clients                = Clients,
                       successfully_recovered = CleanShutdown,
                       file_size_limit        = FileSizeLimit,
                       cref_to_msg_ids        = #{},
                       credit_disc_bound      = CreditDiscBound
                     },
    %% If we didn't recover the msg location index then we need to
    %% rebuild it now.
    %% NOTE(review): build_index/3 is called on both paths; presumably
    %% it only rescans files when CleanShutdown is false. Confirm.
    Cleanliness = case CleanShutdown of
                      true -> "clean";
                      false -> "unclean"
                  end,
    rabbit_log:debug("Rebuilding message location index after ~s shutdown...",
                     [Cleanliness]),
    {Offset, State1 = #msstate { current_file = CurFile }} =
        build_index(CleanShutdown, StartupFunState, State),
    rabbit_log:debug("Finished rebuilding index", []),
    %% read is only needed so that we can seek
    {ok, CurHdl} = open_file(Dir, filenum_to_name(CurFile),
                             [read | ?WRITE_MODE]),
    {ok, Offset} = file_handle_cache:position(CurHdl, Offset),
    ok = file_handle_cache:truncate(CurHdl),

    {ok, maybe_compact(State1 #msstate { current_file_handle = CurHdl }),
     hibernate,
     {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.

%% gen_server2 priorities. A non-zero value lets a message overtake
%% ordinary (priority 0) traffic. For example, the module notes
%% explain how client_dying can be promoted ahead of earlier writes.
prioritise_call(Msg, _From, _Len, _State) ->
    case Msg of
        successfully_recovered_state                        -> 7;
        {new_client_state, _Ref, _Pid, _MODC, _CloseFDsFun} -> 7;
        {read, _MsgId}                                      -> 2;
        _                                                   -> 0
    end.

%% Note: the second element of client_dying is the client ref (see
%% client_delete_and_terminate/1), not a pid.
prioritise_cast(Msg, _Len, _State) ->
    case Msg of
        {combine_files, _Source, _Destination, _Reclaimed} -> 8;
        {delete_file, _File, _Reclaimed}                   -> 8;
        {set_maximum_since_use, _Age}                      -> 8;
        {client_dying, _Pid}                               -> 7;
        _                                                  -> 0
    end.

prioritise_info(Msg, _Len, _State) ->
    case Msg of
        sync                                               -> 8;
        _                                                  -> 0
    end.

%% Synchronous requests.
%%
%%   successfully_recovered_state - did startup recover index and
%%       file summary cleanly?
%%   new_client_state - register CRef as {CPid, MsgOnDiskFun,
%%       CloseFDsFun}, monitor the client and hand it the shared state it
%%       needs to read directly (index, dir, GC pid, ETS tables).
%%   client_terminate - drop the client's pending confirms and dying
%%       marker.
%%   read / contains - answered later through gen_server2:reply/2 (the
%%       op may be parked until GC finishes), hence noreply here.
handle_call(successfully_recovered_state, _From,
            #msstate { successfully_recovered = Recovered } = State) ->
    reply(Recovered, State);

handle_call({new_client_state, CRef, CPid, MsgOnDiskFun, CloseFDsFun}, _From,
            #msstate { dir                = Dir,
                       index_state        = IndexState,
                       index_module       = IndexModule,
                       file_handles_ets   = FileHandlesEts,
                       file_summary_ets   = FileSummaryEts,
                       cur_file_cache_ets = CurFileCacheEts,
                       flying_ets         = FlyingEts,
                       clients            = Clients,
                       gc_pid             = GCPid } = State) ->
    Registered = Clients #{ CRef => {CPid, MsgOnDiskFun, CloseFDsFun} },
    _ = erlang:monitor(process, CPid),
    ClientView = {IndexState, IndexModule, Dir, GCPid, FileHandlesEts,
                  FileSummaryEts, CurFileCacheEts, FlyingEts},
    reply(ClientView, State #msstate { clients = Registered });

handle_call({client_terminate, CRef}, _From, State) ->
    reply(ok, clear_client(CRef, State));

handle_call({read, MsgId}, From, State) ->
    noreply(read_message(MsgId, From, State));

handle_call({contains, MsgId}, From, State) ->
    noreply(contains_message(MsgId, From, State)).

%% Asynchronous requests from clients and from the GC process.
%%
%%   client_dying - record the current file and write offset at the
%%       moment the client announced its death; should_mask_action/3
%%       compares message locations against this point.
%%   client_delete - forget the client entirely.
%%   write / remove - update_flying/4 decides whether the op still has
%%       to be processed or was cancelled by its counterpart. For writes
%%       the message is expected to be in cur_file_cache_ets already
%%       (the update_counter/3 below crashes otherwise).
%%   combine_files / delete_file - GC finished with the given file(s):
%%       account for reclaimed bytes and replay ops parked on them.
%%   set_maximum_since_use - NOTE(review): presumably cast by the
%%       file_handle_cache callback registered in init/1.
handle_cast({client_dying, CRef},
            State = #msstate { dying_clients       = DyingClients,
                               current_file_handle = CurHdl,
                               current_file        = CurFile }) ->
    {ok, CurOffset} = file_handle_cache:current_virtual_offset(CurHdl),
    DyingClients1 = maps:put(CRef,
                             #dying_client{client_ref = CRef,
                                           file = CurFile,
                                           offset = CurOffset},
                             DyingClients),
    noreply(State #msstate { dying_clients = DyingClients1 });

handle_cast({client_delete, CRef},
            State = #msstate { clients = Clients }) ->
    State1 = State #msstate { clients = maps:remove(CRef, Clients) },
    noreply(clear_client(CRef, State1));

handle_cast({write, CRef, MsgId, Flow},
            State = #msstate { cur_file_cache_ets = CurFileCacheEts,
                               clients            = Clients,
                               credit_disc_bound  = CreditDiscBound }) ->
    case Flow of
        flow   -> {CPid, _, _} = maps:get(CRef, Clients),
                  %% We are going to process a message sent by the
                  %% rabbit_amqqueue_process. Now we are accessing the
                  %% msg_store process dictionary.
                  credit_flow:ack(CPid, CreditDiscBound);
        noflow -> ok
    end,
    %% one pending write of this message is now being handled
    true = 0 =< ets:update_counter(CurFileCacheEts, MsgId, {3, -1}),
    case update_flying(-1, MsgId, CRef, State) of
        process ->
            [{MsgId, Msg, _PWC}] = ets:lookup(CurFileCacheEts, MsgId),
            noreply(write_message(MsgId, Msg, CRef, State));
        ignore ->
            %% A 'remove' has already been issued and eliminated the
            %% 'write'.
            State1 = blind_confirm(CRef, gb_sets:singleton(MsgId),
                                   ignored, State),
            %% If all writes get eliminated, cur_file_cache_ets could
            %% grow unbounded. To prevent that we delete the cache
            %% entry here, but only if the message isn't in the
            %% current file. That way reads of the message can
            %% continue to be done client side, from either the cache
            %% or the non-current files. If the message *is* in the
            %% current file then the cache entry will be removed by
            %% the normal logic for that in write_message/4 and
            %% maybe_roll_to_new_file/2.
            case index_lookup(MsgId, State1) of
                %% index_lookup/2 yields the #msg_location{} record
                %% itself (or not_found), not a list, so match the
                %% record directly; a list pattern could never match
                %% and the entry would always be deleted.
                #msg_location { file = File }
                  when File =:= State1 #msstate.current_file ->
                    ok;
                _ ->
                    true = ets:match_delete(CurFileCacheEts, {MsgId, '_', 0})
            end,
            noreply(State1)
    end;

handle_cast({remove, CRef, MsgIds}, State) ->
    {RemovedMsgIds, State1} =
        lists:foldl(
          fun (MsgId, {Removed, State2}) ->
                  case update_flying(+1, MsgId, CRef, State2) of
                      process -> {[MsgId | Removed],
                                  remove_message(MsgId, CRef, State2)};
                      ignore  -> {Removed, State2}
                  end
          end, {[], State}, MsgIds),
    noreply(maybe_compact(client_confirm(CRef, gb_sets:from_list(RemovedMsgIds),
                                         ignored, State1)));

handle_cast({combine_files, Source, Destination, Reclaimed},
            State = #msstate { sum_file_size    = SumFileSize,
                               file_handles_ets = FileHandlesEts,
                               file_summary_ets = FileSummaryEts,
                               clients          = Clients }) ->
    ok = cleanup_after_file_deletion(Source, State),
    %% see comment in cleanup_after_file_deletion, and client_read3
    true = mark_handle_to_close(Clients, FileHandlesEts, Destination, false),
    true = ets:update_element(FileSummaryEts, Destination,
                              {#file_summary.locked, false}),
    State1 = State #msstate { sum_file_size = SumFileSize - Reclaimed },
    noreply(maybe_compact(run_pending([Source, Destination], State1)));

handle_cast({delete_file, File, Reclaimed},
            State = #msstate { sum_file_size = SumFileSize }) ->
    ok = cleanup_after_file_deletion(File, State),
    State1 = State #msstate { sum_file_size = SumFileSize - Reclaimed },
    noreply(maybe_compact(run_pending([File], State1)));

handle_cast({set_maximum_since_use, Age}, State) ->
    ok = file_handle_cache:set_maximum_since_use(Age),
    noreply(State).

%% Raw messages.
%%   sync    - the interval timer armed by start_sync_timer/1 fired.
%%   timeout - next_state/1 returns a 0 timeout while confirms are
%%             pending, so an idle mailbox also triggers a sync.
%%   'DOWN'  - a monitored client died (monitor set in new_client_state).
%%   'EXIT'  - exits are trapped (init/1); a linked process such as the
%%             GC died, so stop with the same reason.
handle_info(sync, State) ->
    noreply(internal_sync(State));

handle_info(timeout, State) ->
    noreply(internal_sync(State));

handle_info({'DOWN', _MRef, process, Pid, _Reason}, State) ->
    %% similar to what happens in
    %% rabbit_amqqueue_process:handle_ch_down but with a relation of
    %% msg_store -> rabbit_amqqueue_process instead of
    %% rabbit_amqqueue_process -> rabbit_channel.
    credit_flow:peer_down(Pid),
    noreply(State);

handle_info({'EXIT', _Pid, Reason}, State) ->
    {stop, Reason, State}.

%% Orderly shutdown: stop the GC, sync and close the current file,
%% close cached read handles, persist the file summary, drop the ETS
%% tables, terminate the index and write the recovery terms used by
%% recover_index_and_client_refs/5 on the next start. Failures to
%% persist are logged, not raised.
%%
%% The directory is logged with ~ts (not ~s) so that paths containing
%% characters above 255 format correctly.
terminate(_Reason, State = #msstate { index_state         = IndexState,
                                      index_module        = IndexModule,
                                      current_file_handle = CurHdl,
                                      gc_pid              = GCPid,
                                      file_handles_ets    = FileHandlesEts,
                                      file_summary_ets    = FileSummaryEts,
                                      cur_file_cache_ets  = CurFileCacheEts,
                                      flying_ets          = FlyingEts,
                                      clients             = Clients,
                                      dir                 = Dir }) ->
    rabbit_log:info("Stopping message store for directory '~ts'", [Dir]),
    %% stop the gc first, otherwise it could be working and we pull
    %% out the ets tables from under it.
    ok = rabbit_msg_store_gc:stop(GCPid),
    State1 = case CurHdl of
                 undefined -> State;
                 _         -> State2 = internal_sync(State),
                              ok = file_handle_cache:close(CurHdl),
                              State2
             end,
    State3 = close_all_handles(State1),
    case store_file_summary(FileSummaryEts, Dir) of
        ok           -> ok;
        {error, FSErr} ->
            rabbit_log:error("Unable to store file summary"
                             " for vhost message store for directory ~p~n"
                             "Error: ~p",
                             [Dir, FSErr])
    end,
    [true = ets:delete(T) || T <- [FileSummaryEts, FileHandlesEts,
                                   CurFileCacheEts, FlyingEts]],
    IndexModule:terminate(IndexState),
    case store_recovery_terms([{client_refs, maps:keys(Clients)},
                               {index_module, IndexModule}], Dir) of
        ok           ->
            rabbit_log:info("Message store for directory '~ts' is stopped", [Dir]),
            ok;
        {error, RTErr} ->
            rabbit_log:error("Unable to save message store recovery terms"
                             " for directory ~p~nError: ~p",
                             [Dir, RTErr])
    end,
    State3 #msstate { index_state         = undefined,
                      current_file_handle = undefined }.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ).

%%----------------------------------------------------------------------------
%% general helper functions
%%----------------------------------------------------------------------------

%% Build gen_server2 return tuples, letting next_state/1 choose between
%% hibernating and a 0 timeout (and arm/disarm the sync timer).
noreply(State) ->
    {State1, Timeout} = next_state(State),
    {noreply, State1, Timeout}.

reply(Reply, State) ->
    {State1, Timeout} = next_state(State),
    {reply, Reply, State1, Timeout}.

%% Decide the post-callback timeout and sync-timer state.
%% No pending confirms (cref_to_msg_ids empty): make sure the timer is
%% stopped and hibernate. Otherwise make sure the timer is running and
%% return a 0 timeout so an idle mailbox triggers internal_sync/1.
next_state(State = #msstate { sync_timer_ref  = undefined,
                              cref_to_msg_ids = CTM }) ->
    case maps:size(CTM) of
        0 -> {State, hibernate};
        _ -> {start_sync_timer(State), 0}
    end;
next_state(State = #msstate { cref_to_msg_ids = CTM }) ->
    case maps:size(CTM) of
        0 -> {stop_sync_timer(State), hibernate};
        _ -> {State, 0}
    end.

%% Arm the ?SYNC_INTERVAL timer that delivers 'sync' to this process.
start_sync_timer(State) ->
    rabbit_misc:ensure_timer(State, #msstate.sync_timer_ref,
                             ?SYNC_INTERVAL, sync).

stop_sync_timer(State) ->
    rabbit_misc:stop_timer(State, #msstate.sync_timer_ref).

%% fsync the current file if any client has non-empty pending confirms,
%% then confirm those messages to their clients as 'written'.
internal_sync(State = #msstate { current_file_handle = CurHdl,
                                 cref_to_msg_ids     = CTM }) ->
    State1 = stop_sync_timer(State),
    %% {CRef, MsgIds} pairs with something to confirm
    CGs = maps:fold(fun (CRef, MsgIds, NS) ->
                            case gb_sets:is_empty(MsgIds) of
                                true  -> NS;
                                false -> [{CRef, MsgIds} | NS]
                            end
                    end, [], CTM),
    ok = case CGs of
             [] -> ok;
             _  -> file_handle_cache:sync(CurHdl)
         end,
    lists:foldl(fun ({CRef, MsgIds}, StateN) ->
                        client_confirm(CRef, MsgIds, written, StateN)
                end, State1, CGs).

%% Server-side half of the write/remove cancellation protocol on
%% flying_ets. The server applies Diff = -1 when processing a write and
%% +1 when processing a remove (see handle_cast/2). Returns 'process'
%% if the op must be carried out, 'ignore' if it was cancelled (or the
%% row is absent). Rows reaching 0 are deleted.
update_flying(Diff, MsgId, CRef, #msstate { flying_ets = FlyingEts }) ->
    Key = {MsgId, CRef},
    NDiff = -Diff,
    case ets:lookup(FlyingEts, Key) of
        []           -> ignore;
        [{_,  Diff}] -> ignore; %% [1]
        [{_, NDiff}] -> ets:update_counter(FlyingEts, Key, {2, Diff}),
                        true = ets:delete_object(FlyingEts, {Key, 0}),
                        process;
        [{_, 0}]     -> true = ets:delete_object(FlyingEts, {Key, 0}),
                        ignore;
        [{_, Err}] when Err >= 2 ->
            %% The message must be referenced twice in the queue index. There
            %% is a bug somewhere, but we don't want to take down anything
            %% just because of this. Let's process the message as if the
            %% copies were in different queues (fan-out).
            ets:update_counter(FlyingEts, Key, {2, Diff}),
            true = ets:delete_object(FlyingEts, {Key, 0}),
            process;
        [{_, Err}]   -> throw({bad_flying_ets_record, Diff, Err, Key})
    end.
%% [1] We can get here, for example, in the following scenario: There
%% is a write followed by a remove in flight. The counter will be 0,
%% so on processing the write the server attempts to delete the
%% entry. If at that point the client injects another write it will
%% either insert a new entry, containing +1, or increment the existing
%% entry to +1, thus preventing its removal. Either way therefore when
%% the server processes the remove, the counter will be +1.

%% Given should_mask_action/3's {Mask, Location}, decide what a write
%% should do:
%%   {write, State}           - append a new copy (write_message/3);
%%   {ignore, File, State}    - drop the write (File may be undefined);
%%   {confirm, File, State}   - the message is already stored in File;
%%                              its ref_count has been bumped.
write_action({true, not_found}, _MsgId, State) ->
    {ignore, undefined, State};
write_action({true, #msg_location { file = File }}, _MsgId, State) ->
    {ignore, File, State};
write_action({false, not_found}, _MsgId, State) ->
    {write, State};
%% ref_count 0: the bytes may still be on disk but are no longer counted
%% as valid data.
write_action({Mask, #msg_location { ref_count = 0, file = File,
                                    total_size = TotalSize }},
             MsgId, State = #msstate { file_summary_ets = FileSummaryEts }) ->
    case {Mask, ets:lookup(FileSummaryEts, File)} of
        {false, [#file_summary { locked = true }]} ->
            %% file is locked (being GC'd): forget the old location and
            %% write a fresh copy
            ok = index_delete(MsgId, State),
            {write, State};
        {false_if_increment, [#file_summary { locked = true }]} ->
            %% The msg for MsgId is older than the client death
            %% message, but as it is being GC'd currently we'll have
            %% to write a new copy, which will then be younger, so
            %% ignore this write.
            {ignore, File, State};
        {_Mask, [#file_summary {}]} ->
            %% revive the existing copy: ref_count back to 1 and its
            %% size counted as valid again
            ok = index_update_ref_count(MsgId, 1, State),
            State1 = adjust_valid_total_size(File, TotalSize, State),
            {confirm, File, State1}
    end;
write_action({_Mask, #msg_location { ref_count = RefCount, file = File }},
             MsgId, State) ->
    ok = index_update_ref_count(MsgId, RefCount + 1, State),
    %% We already know about it, just update counter. Only update
    %% field otherwise bad interaction with concurrent GC
    {confirm, File, State}.

%% Carry out a client write of Msg under MsgId. Cache entries with a 0
%% count are removed here unless the message lives in the current file.
write_message(MsgId, Msg, CRef,
              State = #msstate { cur_file_cache_ets = CurFileCacheEts }) ->
    case write_action(should_mask_action(CRef, MsgId, State), MsgId, State) of
        {write, State1} ->
            write_message(MsgId, Msg,
                          record_pending_confirm(CRef, MsgId, State1));
        {ignore, CurFile, State1 = #msstate { current_file = CurFile }} ->
            State1;
        {ignore, _File, State1} ->
            true = ets:delete_object(CurFileCacheEts, {MsgId, Msg, 0}),
            State1;
        {confirm, CurFile, State1 = #msstate { current_file = CurFile }}->
            %% still in the current file: confirm after the next fsync
            record_pending_confirm(CRef, MsgId, State1);
        {confirm, _File, State1} ->
            %% in an older file: confirm to the client immediately
            true = ets:delete_object(CurFileCacheEts, {MsgId, Msg, 0}),
            update_pending_confirms(
              fun (MsgOnDiskFun, CTM) ->
                      MsgOnDiskFun(gb_sets:singleton(MsgId), written),
                      CTM
              end, CRef, State1)
    end.

%% Carry out a client remove of MsgId: decrement its ref_count; when the
%% last reference goes, shrink the file's valid size (possibly deleting
%% the file) or, if the file is locked by GC, park the remove until GC
%% completes.
remove_message(MsgId, CRef,
               State = #msstate { file_summary_ets = FileSummaryEts }) ->
    case should_mask_action(CRef, MsgId, State) of
        {true, _Location} ->
            State;
        {false_if_increment, #msg_location { ref_count = 0 }} ->
            %% CRef has tried to both write and remove this msg whilst
            %% it's being GC'd.
            %%
            %% ASSERTION: [#file_summary { locked = true }] =
            %% ets:lookup(FileSummaryEts, File),
            State;
        {_Mask, #msg_location { ref_count = RefCount, file = File,
                                total_size = TotalSize }}
          when RefCount > 0 ->
            %% only update field, otherwise bad interaction with
            %% concurrent GC
            Dec = fun () -> index_update_ref_count(
                              MsgId, RefCount - 1, State) end,
            case RefCount of
                %% don't remove from cur_file_cache_ets here because
                %% there may be further writes in the mailbox for the
                %% same msg.
                1 -> case ets:lookup(FileSummaryEts, File) of
                         [#file_summary { locked = true }] ->
                             add_to_pending_gc_completion(
                               {remove, MsgId, CRef}, File, State);
                         [#file_summary {}] ->
                             ok = Dec(),
                             delete_file_if_empty(
                               File, adjust_valid_total_size(
                                       File, -TotalSize, State))
                     end;
                _ -> ok = Dec(),
                     State
            end
    end.

%% Append Msg to the current file, index it with ref_count 1 at the
%% pre-append offset, grow the file summary's valid and total sizes and
%% the running sums, then let maybe_roll_to_new_file/2 decide whether
%% to start a new file. Asserts the current file is unlocked and has no
%% right neighbour.
write_message(MsgId, Msg,
              State = #msstate { current_file_handle = CurHdl,
                                 current_file        = CurFile,
                                 sum_valid_data      = SumValid,
                                 sum_file_size       = SumFileSize,
                                 file_summary_ets    = FileSummaryEts }) ->
    {ok, CurOffset} = file_handle_cache:current_virtual_offset(CurHdl),
    {ok, TotalSize} = rabbit_msg_file:append(CurHdl, MsgId, Msg),
    ok = index_insert(
           #msg_location { msg_id = MsgId, ref_count = 1, file = CurFile,
                           offset = CurOffset, total_size = TotalSize }, State),
    [#file_summary { right = undefined, locked = false }] =
        ets:lookup(FileSummaryEts, CurFile),
    [_,_] = ets:update_counter(FileSummaryEts, CurFile,
                               [{#file_summary.valid_total_size, TotalSize},
                                {#file_summary.file_size,        TotalSize}]),
    maybe_roll_to_new_file(CurOffset + TotalSize,
                           State #msstate {
                             sum_valid_data = SumValid + TotalSize,
                             sum_file_size  = SumFileSize + TotalSize }).

%% Serve a {read, MsgId} call. Ids missing from the index, or whose
%% ref_count has dropped to 0, are answered with not_found at once.
read_message(MsgId, From, State) ->
    Location = index_lookup_positive_ref_count(MsgId, State),
    case Location of
        not_found ->
            gen_server2:reply(From, not_found),
            State;
        _ ->
            read_message1(From, Location, State)
    end.

%% The message lives in the file currently being appended to: prefer
%% the in-memory cache, which can lack the message if it was already in
%% this file at startup.
read_message1(From, #msg_location { msg_id = MsgId, file = CurFile,
                                    offset = Offset } = MsgLoc,
              State = #msstate { current_file        = CurFile,
                                 current_file_handle = CurHdl,
                                 cur_file_cache_ets  = CurFileCacheEts }) ->
    {Msg, State1} =
        case ets:lookup(CurFileCacheEts, MsgId) of
            [{MsgId, Cached, _CacheRefCount}] ->
                {Cached, State};
            [] ->
                %% NOTE(review): bytes at or past the raw offset have
                %% presumably not left the write buffer yet, hence the
                %% flush before reading them back.
                {ok, RawOffset} = file_handle_cache:current_raw_offset(CurHdl),
                ok = case Offset >= RawOffset of
                         true  -> file_handle_cache:flush(CurHdl);
                         false -> ok
                     end,
                read_from_disk(MsgLoc, State)
        end,
    gen_server2:reply(From, {ok, Msg}),
    State1;
%% An older file: if GC has it locked, park the read until GC completes.
read_message1(From, #msg_location { msg_id = MsgId, file = File } = MsgLoc,
              State = #msstate { file_summary_ets = FileSummaryEts }) ->
    [#file_summary { locked = Locked }] = ets:lookup(FileSummaryEts, File),
    case Locked of
        true ->
            add_to_pending_gc_completion({read, MsgId, From}, File, State);
        false ->
            {Msg, State1} = read_from_disk(MsgLoc, State),
            gen_server2:reply(From, {ok, Msg}),
            State1
    end.

%% Read the message at its recorded location through a cached read
%% handle. Any result other than the expected MsgId makes the final
%% match crash with a 'misread' report that carries the state, location
%% and process dictionary.
read_from_disk(#msg_location { msg_id = MsgId, file = File, offset = Offset,
                               total_size = TotalSize }, State) ->
    {Hdl, State1} = get_read_handle(File, State),
    {ok, Offset} = file_handle_cache:position(Hdl, Offset),
    ReadResult = rabbit_msg_file:read(Hdl, TotalSize),
    {ok, {MsgId, Msg}} =
        case ReadResult of
            {ok, {MsgId, _}} ->
                ReadResult;
            _ ->
                {error, {misread, [{old_state, State},
                                   {file_num,  File},
                                   {offset,    Offset},
                                   {msg_id,    MsgId},
                                   {read,      ReadResult},
                                   {proc_dict, get()}
                                  ]}}
        end,
    {Msg, State1}.

%% Serve a {contains, MsgId} call. While GC work is pending on the
%% message's file the answer is deferred until run_pending/2.
contains_message(MsgId, From,
                 State = #msstate { pending_gc_completion = Pending }) ->
    case index_lookup_positive_ref_count(MsgId, State) of
        #msg_location { file = File } ->
            GcPendingOnFile = maps:is_key(File, Pending),
            if
                GcPendingOnFile ->
                    add_to_pending_gc_completion(
                      {contains, MsgId, From}, File, State);
                true ->
                    gen_server2:reply(From, true),
                    State
            end;
        not_found ->
            gen_server2:reply(From, false),
            State
    end.

%% Park Op on File's pending-GC list.
add_to_pending_gc_completion(
  Op, File, #msstate { pending_gc_completion = Pending } = State) ->
    Parked = rabbit_misc:maps_cons(File, Op, Pending),
    State #msstate { pending_gc_completion = Parked }.

%% Replay the ops parked on each of Files (which must all have an entry
%% in pending_gc_completion), after clearing that entry.
run_pending(Files, State) ->
    RunFile =
        fun (File, #msstate { pending_gc_completion = Pending } = Acc) ->
                %% maps_cons presumably prepends, so reverse to replay
                %% in arrival order
                Ops = lists:reverse(maps:get(File, Pending)),
                Cleared = Acc #msstate {
                            pending_gc_completion = maps:remove(File, Pending) },
                lists:foldl(fun run_pending_action/2, Cleared, Ops)
        end,
    lists:foldl(RunFile, State, Files).

run_pending_action({read, MsgId, From}, State) ->
    read_message(MsgId, From, State);
run_pending_action({contains, MsgId, From}, State) ->
    contains_message(MsgId, From, State);
run_pending_action({remove, MsgId, CRef}, State) ->
    remove_message(MsgId, CRef, State).

%% Apply UpdateOp to Key in Tab and pass the result to SuccessFun;
%% a badarg (raised by either step) runs FailThunk instead.
safe_ets_update_counter(Tab, Key, UpdateOp, SuccessFun, FailThunk) ->
    try
        Updated = ets:update_counter(Tab, Key, UpdateOp),
        SuccessFun(Updated)
    catch
        error:badarg ->
            FailThunk()
    end.

%% Insert {MsgId, Msg, 1} into the cache, or bump the count of an
%% existing entry; if that entry vanishes concurrently, start over.
update_msg_cache(CacheEts, MsgId, Msg) ->
    Inserted = ets:insert_new(CacheEts, {MsgId, Msg, 1}),
    if
        Inserted ->
            ok;
        true ->
            Retry = fun () -> update_msg_cache(CacheEts, MsgId, Msg) end,
            safe_ets_update_counter(CacheEts, MsgId, {3, +1},
                                    fun (_NewCount) -> ok end, Retry)
    end.

%% Add Delta (possibly negative) to File's valid_total_size and to the
%% store-wide sum_valid_data.
adjust_valid_total_size(File, Delta,
                        #msstate { sum_valid_data   = SumValid,
                                   file_summary_ets = FileSummaryEts } = State) ->
    [_NewValidTotal] =
        ets:update_counter(FileSummaryEts, File,
                           [{#file_summary.valid_total_size, Delta}]),
    State #msstate { sum_valid_data = SumValid + Delta }.

%% maps:put/3 that asserts Key was not present yet.
maps_store(Key, Val, Dict) ->
    false = maps:is_key(Key, Dict),
    Dict #{ Key => Val }.

%% Run Fun(MsgOnDiskFun, CTM) for client CRef and store the resulting
%% cref_to_msg_ids; clients without a MsgOnDiskFun are left untouched.
update_pending_confirms(Fun, CRef,
                        #msstate { clients         = Clients,
                                   cref_to_msg_ids = CTM } = State) ->
    case maps:get(CRef, Clients) of
        {_CPid, undefined, _CloseFDsFun} ->
            State;
        {_CPid, MsgOnDiskFun, _CloseFDsFun} ->
            State #msstate { cref_to_msg_ids = Fun(MsgOnDiskFun, CTM) }
    end.

%% Remember that MsgId must be confirmed to CRef after the next sync.
record_pending_confirm(CRef, MsgId, State) ->
    AddId = fun (_MsgOnDiskFun, CTM) ->
                    maps:update_with(
                      CRef,
                      fun (MsgIds) -> gb_sets:add(MsgId, MsgIds) end,
                      gb_sets:singleton(MsgId),
                      CTM)
            end,
    update_pending_confirms(AddId, CRef, State).

%% Confirm those of MsgIds that are pending for CRef (with
%% ActionTaken) and drop them from the pending set; an emptied set
%% removes CRef's entry.
client_confirm(CRef, MsgIds, ActionTaken, State) ->
    Confirm =
        fun (MsgOnDiskFun, CTM) ->
                case maps:find(CRef, CTM) of
                    error ->
                        CTM;
                    {ok, Pending} ->
                        MsgOnDiskFun(gb_sets:intersection(Pending, MsgIds),
                                     ActionTaken),
                        Remaining = rabbit_misc:gb_sets_difference(Pending, MsgIds),
                        case gb_sets:is_empty(Remaining) of
                            true  -> maps:remove(CRef, CTM);
                            false -> CTM #{ CRef => Remaining }
                        end
                end
        end,
    update_pending_confirms(Confirm, CRef, State).

%% Confirm MsgIds to CRef unconditionally, without consulting or
%% changing the pending set.
blind_confirm(CRef, MsgIds, ActionTaken, State) ->
    Notify = fun (MsgOnDiskFun, CTM) ->
                     MsgOnDiskFun(MsgIds, ActionTaken),
                     CTM
             end,
    update_pending_confirms(Notify, CRef, State).

%% Classify an op from CRef on MsgId against CRef's death point (the
%% {File, Offset} recorded on client_dying). Returns {Mask, Location},
%% Location being index_lookup/2's result, and Mask one of:
%%   false              - CRef is not dying, or the message sits at or
%%                        before the death point with ref_count > 0;
%%   false_if_increment - at or before the death point with ref_count 0:
%%                        only the ref_count may change, since rewriting
%%                        the message would make it younger than the
%%                        death point;
%%   true               - the message is unknown to the index, or lies
%%                        after the death point (so it should be ignored).
%% The death message itself compares equal, hence is not masked.
should_mask_action(CRef, MsgId,
                   #msstate { dying_clients = DyingClients } = State) ->
    Death = maps:find(CRef, DyingClients),
    Location = index_lookup(MsgId, State),
    case {Death, Location} of
        {error, _} ->
            {false, Location};
        {{ok, _}, not_found} ->
            {true, not_found};
        {{ok, Client}, #msg_location { file = File, offset = Offset,
                                       ref_count = RefCount }} ->
            #dying_client { file = DeathFile, offset = DeathOffset } = Client,
            Mask = if
                       {DeathFile, DeathOffset} < {File, Offset} -> true;
                       RefCount =:= 0                            -> false_if_increment;
                       true                                      -> false
                   end,
            {Mask, Location}
    end.

%%----------------------------------------------------------------------------
%% file helper functions
%%----------------------------------------------------------------------------

%% Open a raw binary file through file_handle_cache with 1MB
%% read/write buffers.
open_file(File, Mode) ->
    Options = [{write_buffer, ?HANDLE_CACHE_BUFFER_SIZE},
               {read_buffer,  ?HANDLE_CACHE_BUFFER_SIZE}],
    file_handle_cache:open_with_absolute_path(File, ?BINARY_MODE ++ Mode,
                                              Options).

open_file(Dir, FileName, Mode) ->
    open_file(form_filename(Dir, FileName), Mode).

%% Close and forget the cached handle for Key, if there is one. Works
%% on either state record or directly on the handle map.
close_handle(Key, #client_msstate { file_handle_cache = FHC } = CState) ->
    CState #client_msstate { file_handle_cache = close_handle(Key, FHC) };
close_handle(Key, #msstate { file_handle_cache = FHC } = State) ->
    State #msstate { file_handle_cache = close_handle(Key, FHC) };
close_handle(Key, FHC) ->
    case maps:take(Key, FHC) of
        {Hdl, Rest} ->
            ok = file_handle_cache:close(Hdl),
            Rest;
        error ->
            FHC
    end.

%% Record that client Ref has File open. insert_new/2 may legitimately
%% fail when the row exists; a row already marked 'close' stays 'close'.
mark_handle_open(FileHandlesEts, File, Ref) ->
    _ = ets:insert_new(FileHandlesEts, {{Ref, File}, open}),
    true.

%% Flip every client's 'open' row for File to 'close'. With Invoke set,
%% each client whose row was flipped and that registered a CloseFDsFun
%% gets it called. Only call this while the file is locked (see
%% client_read3).
mark_handle_to_close(ClientRefs, FileHandlesEts, File, Invoke) ->
    OpenRows = ets:match_object(FileHandlesEts, {{'_', File}, open}),
    _ = [case ets:update_element(FileHandlesEts, Key, {2, close})
                 andalso Invoke of
             false ->
                 ok;
             true ->
                 {_CPid, _MsgOnDiskFun, CloseFDsFun} = maps:get(Ref, ClientRefs),
                 case CloseFDsFun of
                     undefined -> ok;
                     _         -> ok = CloseFDsFun()
                 end
         end || {{Ref, _File} = Key, open} <- OpenRows],
    true.

safe_file_delete_fun(File, Dir, FileHandlesEts) ->
    fun () -> safe_file_delete(File, Dir, FileHandlesEts) end.

%% Delete File from disk only once no client row for it remains in
%% FileHandlesEts, whatever its value; returns whether it was deleted.
safe_file_delete(File, Dir, FileHandlesEts) ->
    case ets:match_object(FileHandlesEts, {{'_', File}, '_'}, 1) of
        {[_ | _], _Continuation} ->
            false;
        _NoRows ->
            ok = file:delete(form_filename(Dir, filenum_to_name(File))),
            true
    end.

-spec close_all_indicated
        (client_msstate()) -> rabbit_types:ok(client_msstate()).

%% Close every handle the server has marked 'close' for this client and
%% remove the corresponding rows.
close_all_indicated(#client_msstate { file_handles_ets = FileHandlesEts,
                                      client_ref       = Ref } = CState) ->
    ToClose = ets:match_object(FileHandlesEts, {{Ref, '_'}, close}),
    Closed = lists:foldl(
               fun ({{_Ref, File} = Key, close}, Acc) ->
                       true = ets:delete(FileHandlesEts, Key),
                       close_handle(File, Acc)
               end, CState, ToClose),
    {ok, Closed}.

%% Close every cached read handle and reset the cache to #{}. The client
%% variant also removes its rows from file_handles_ets. A close that
%% does not return ok makes the fold crash.
close_all_handles(#client_msstate { file_handles_ets  = FileHandlesEts,
                                    file_handle_cache = FHC,
                                    client_ref        = Ref } = CState) ->
    CloseOne = fun (File, Hdl, ok) ->
                       true = ets:delete(FileHandlesEts, {Ref, File}),
                       file_handle_cache:close(Hdl)
               end,
    ok = maps:fold(CloseOne, ok, FHC),
    CState #client_msstate { file_handle_cache = #{} };
close_all_handles(#msstate { file_handle_cache = FHC } = State) ->
    CloseOne = fun (_File, Hdl, ok) -> file_handle_cache:close(Hdl) end,
    ok = maps:fold(CloseOne, ok, FHC),
    State #msstate { file_handle_cache = #{} }.

%% Fetch (opening and caching on first use) a read handle for FileNum.
get_read_handle(FileNum, #client_msstate { file_handle_cache = FHC,
                                           dir               = Dir } = CState) ->
    {Hdl, FHC1} = get_read_handle(FileNum, FHC, Dir),
    {Hdl, CState #client_msstate { file_handle_cache = FHC1 }};
get_read_handle(FileNum, #msstate { file_handle_cache = FHC,
                                    dir               = Dir } = State) ->
    {Hdl, FHC1} = get_read_handle(FileNum, FHC, Dir),
    {Hdl, State #msstate { file_handle_cache = FHC1 }}.

get_read_handle(FileNum, FHC, Dir) ->
    case FHC of
        #{FileNum := Cached} ->
            {Cached, FHC};
        #{} ->
            {ok, Opened} = open_file(Dir, filenum_to_name(FileNum), ?READ_MODE),
            {Opened, FHC #{ FileNum => Opened }}
    end.

%% Seek to FileSizeLimit and truncate there (NOTE(review): presumably
%% extending the file to that size), then seek back to FinalPos.
preallocate(Hdl, FileSizeLimit, FinalPos) ->
    {ok, FileSizeLimit} = file_handle_cache:position(Hdl, FileSizeLimit),
    ok = file_handle_cache:truncate(Hdl),
    {ok, FinalPos} = file_handle_cache:position(Hdl, FinalPos),
    ok.

%% Cut the file at Lowpoint, then preallocate up to Highpoint leaving
%% the position at Lowpoint.
truncate_and_extend_file(Hdl, Lowpoint, Highpoint) ->
    {ok, Lowpoint} = file_handle_cache:position(Hdl, Lowpoint),
    ok = file_handle_cache:truncate(Hdl),
    ok = preallocate(Hdl, Highpoint, Lowpoint).

form_filename(Dir, Name) ->
    filename:join(Dir, Name).

%% 42 -> "42" ++ ?FILE_EXTENSION
filenum_to_name(File) ->
    integer_to_list(File) ++ ?FILE_EXTENSION.

%% Inverse of filenum_to_name/1 (any extension is stripped).
filename_to_num(FileName) ->
    Root = filename:rootname(FileName),
    list_to_integer(Root).

%% Names in Dir ending in Ext, ordered numerically by file number.
list_sorted_filenames(Dir, Ext) ->
    Candidates = filelib:wildcard("*" ++ Ext, Dir),
    lists:sort(fun (NameA, NameB) ->
                       filename_to_num(NameA) < filename_to_num(NameB)
               end, Candidates).

%%----------------------------------------------------------------------------
%% index
%%----------------------------------------------------------------------------

%% Like index_lookup/2, but an entry whose ref_count is 0 counts as
%% not_found.
index_lookup_positive_ref_count(Key, State) ->
    case index_lookup(Key, State) of
        #msg_location { ref_count = 0 } -> not_found;
        #msg_location {} = Location     -> Location;
        not_found                       -> not_found
    end.

index_update_ref_count(Key, RefCount, State) ->
    index_update_fields(Key, {#msg_location.ref_count, RefCount}, State).

%% The wrappers below dispatch to the configured index module, passing
%% the index state held in whichever state record they receive.
index_lookup(Key, #gc_state { index_module = Index,
                              index_state  = IndexState }) ->
    Index:lookup(Key, IndexState);
index_lookup(Key, #client_msstate { index_module = Index,
                                    index_state  = IndexState }) ->
    Index:lookup(Key, IndexState);
index_lookup(Key, #msstate { index_module = Index,
                             index_state  = IndexState }) ->
    Index:lookup(Key, IndexState).

index_insert(Obj, #msstate { index_module = Index,
                             index_state  = IndexState }) ->
    Index:insert(Obj, IndexState).

index_update(Obj, #msstate { index_module = Index,
                             index_state  = IndexState }) ->
    Index:update(Obj, IndexState).

index_update_fields(Key, Updates, #msstate { index_module = Index,
                                             index_state  = IndexState }) ->
    Index:update_fields(Key, Updates, IndexState);
index_update_fields(Key, Updates, #gc_state { index_module = Index,
                                              index_state  = IndexState }) ->
    Index:update_fields(Key, Updates, IndexState).

index_delete(Key, #msstate { index_module = Index,
                             index_state  = IndexState }) ->
    Index:delete(Key, IndexState).

index_delete_object(Obj, #gc_state { index_module = Index,
                                     index_state  = IndexState }) ->
    Index:delete_object(Obj, IndexState).

%% Delegate to the index module's
%% clean_up_temporary_reference_count_entries_without_file/1.
index_clean_up_temporary_reference_count_entries(
  #msstate { index_module = Index, index_state = IndexState }) ->
    Index:clean_up_temporary_reference_count_entries_without_file(IndexState).

%%----------------------------------------------------------------------------
%% shutdown and recovery
%%----------------------------------------------------------------------------

%% Returns {CleanShutdown, IndexState, ClientRefs}. Recovery of the
%% index is attempted only when the file summary was recovered, the
%% store is not starting fresh (ClientRefs =/= undefined), the recovery
%% terms can be read, and they list the same client refs and index
%% module. Any other case yields a fresh index and no client refs.
recover_index_and_client_refs(IndexModule, _Recover, undefined, Dir, _Name) ->
    {false, IndexModule:new(Dir), []};
recover_index_and_client_refs(IndexModule, false, _ClientRefs, Dir, Name) ->
    rabbit_log:warning("Message store ~tp: rebuilding indices from scratch", [Name]),
    {false, IndexModule:new(Dir), []};
recover_index_and_client_refs(IndexModule, true, ClientRefs, Dir, Name) ->
    Fresh = fun (ErrorMsg, ErrorArgs) ->
                    rabbit_log:warning("Message store ~tp : " ++ ErrorMsg ++ "~n"
                                       "rebuilding indices from scratch",
                                       [Name | ErrorArgs]),
                    {false, IndexModule:new(Dir), []}
            end,
    case read_recovery_terms(Dir) of
        {false, Error} ->
            Fresh("failed to read recovery terms: ~p", [Error]);
        {true, Terms} ->
            RecClientRefs  = proplists:get_value(client_refs, Terms, []),
            RecIndexModule = proplists:get_value(index_module, Terms),
            TermsMatch = lists:sort(ClientRefs) =:= lists:sort(RecClientRefs)
                             andalso IndexModule =:= RecIndexModule,
            case TermsMatch of
                false ->
                    Fresh("recovery terms differ from present", []);
                true ->
                    case IndexModule:recover(Dir) of
                        {error, Error} ->
                            Fresh("failed to recover index: ~p", [Error]);
                        {ok, IndexState1} ->
                            {true, IndexState1, ClientRefs}
                    end
            end
    end.

store_recovery_terms(Terms, Dir) ->
    CleanFile = filename:join(Dir, ?CLEAN_FILENAME),
    rabbit_file:write_term_file(CleanFile, Terms).

%% Read the recovery terms written at clean shutdown and delete the
%% file, so that they are never reused. {true, Terms} on success,
%% {false, Reason} if reading or deleting fails.
read_recovery_terms(Dir) ->
    Path = filename:join(Dir, ?CLEAN_FILENAME),
    case rabbit_file:read_term_file(Path) of
        {error, Error} ->
            {false, Error};
        {ok, Terms} ->
            case file:delete(Path) of
                {error, Error} -> {false, Error};
                ok             -> {true, Terms}
            end
    end.

%% Dump the file summary table to ?FILE_SUMMARY_FILENAME in Dir.
store_file_summary(Tid, Dir) ->
    SummaryPath = filename:join(Dir, ?FILE_SUMMARY_FILENAME),
    ets:tab2file(Tid, SummaryPath, [{extended_info, [object_count]}]).

%% {Recovered, Tid}: with false, a new empty table; with true, the table
%% loaded from disk (the dump file is then deleted), falling back to an
%% empty table if it cannot be loaded.
recover_file_summary(false, _Dir) ->
    %% TODO: the only reason for this to be an *ordered*_set is so
    %% that a) maybe_compact can start a traversal from the eldest
    %% file, and b) build_index in fast recovery mode can easily
    %% identify the current file. It's awkward to have both that
    %% ordering and the left/right pointers in the entries - replacing
    %% the former with some additional bit of state would be easy, but
    %% ditching the latter would be neater.
    Table = ets:new(rabbit_msg_store_file_summary,
                    [ordered_set, public, {keypos, #file_summary.file}]),
    {false, Table};
recover_file_summary(true, Dir) ->
    Path = filename:join(Dir, ?FILE_SUMMARY_FILENAME),
    case ets:file2tab(Path) of
        {error, _Reason} ->
            recover_file_summary(false, Dir);
        {ok, Tid} ->
            ok = file:delete(Path),
            {true, Tid}
    end.
%% Replays reference-count deltas into the index.
%%
%% Gen is a generator: Gen(Seed) returns either finished or
%% {MsgId, Delta, Next}. Zero deltas are skipped without touching the
%% index. For other deltas:
%%   * unknown ids are inserted with file = undefined, to be located later
%%     while the message files are scanned;
%%   * known ids have Delta added, and are deleted once the count hits 0.
count_msg_refs(Gen, Seed, State) ->
    case Gen(Seed) of
        finished ->
            ok;
        {_MsgId, 0, Next} ->
            count_msg_refs(Gen, Next, State);
        {MsgId, Delta, Next} ->
            Outcome =
                case index_lookup(MsgId, State) of
                    not_found ->
                        index_insert(#msg_location { msg_id    = MsgId,
                                                     file      = undefined,
                                                     ref_count = Delta },
                                     State);
                    #msg_location { ref_count = RefCount }
                      when RefCount + Delta =:= 0 ->
                        index_delete(MsgId, State);
                    #msg_location { ref_count = RefCount } = Entry ->
                        index_update(Entry #msg_location {
                                       ref_count = RefCount + Delta },
                                     State)
                end,
            ok = Outcome,
            count_msg_refs(Gen, Next, State)
    end.

%% Finds tmp files (?FILE_EXTENSION_TMP) in Dir that were left behind by an
%% interrupted compaction, and folds each one back into its main data file.
%% Crashes if a tmp file has no matching main file. Returns true only when
%% no tmp files were found.
recover_crashed_compactions(Dir) ->
    DataFiles = list_sorted_filenames(Dir, ?FILE_EXTENSION),
    TmpFiles  = list_sorted_filenames(Dir, ?FILE_EXTENSION_TMP),
    ok = lists:foreach(
           fun (TmpFile) ->
                   MainFile = filename:rootname(TmpFile) ++ ?FILE_EXTENSION,
                   true = lists:member(MainFile, DataFiles),
                   ok = recover_crashed_compaction(Dir, TmpFile, MainFile)
           end, TmpFiles),
    TmpFiles =:= [].

recover_crashed_compaction(Dir, TmpFileName, NonTmpRelatedFileName) ->
    %% Because a msg can legitimately appear multiple times in the
    %% same file, identifying the contents of the tmp file and where
    %% they came from is non-trivial. If we are recovering a crashed
    %% compaction then we will be rebuilding the index, which can cope
    %% with duplicates appearing. Thus the simplest and safest thing
    %% to do is to append the contents of the tmp file to its main
    %% file.
    {ok, TmpHdl}  = open_file(Dir, TmpFileName, ?READ_MODE),
    {ok, MainHdl} = open_file(Dir, NonTmpRelatedFileName,
                              ?READ_MODE ++ ?WRITE_MODE),
    %% Appending: move to the current end of the main file first.
    {ok, _EndPos} = file_handle_cache:position(MainHdl, eof),
    TmpBytes = filelib:file_size(form_filename(Dir, TmpFileName)),
    {ok, TmpBytes} = file_handle_cache:copy(TmpHdl, MainHdl, TmpBytes),
    ok = file_handle_cache:close(MainHdl),
    %% Everything has been salvaged, so the tmp file can go.
    ok = file_handle_cache:delete(TmpHdl).

%% Scans the message file at path File with rabbit_msg_file:scan/4, using
%% scan_fun/2 as the accumulator, and returns the scan result.
%%
%% Callers in this module match that result as {ok, Messages, Size}.
%% Messages is a list of {MsgId, TotalSize, Offset} triples; they are
%% prepended, so the last message scanned comes first.
%% A missing file counts as empty ({ok, [], 0}). Any other open error is
%% returned as {error, {unable_to_scan_file, BaseName, Reason}}.
scan_file_for_valid_messages(File) ->
    case open_file(File, ?READ_MODE) of
        {error, enoent} ->
            {ok, [], 0};
        {error, Reason} ->
            {error, {unable_to_scan_file, filename:basename(File), Reason}};
        {ok, Hdl} ->
            ScanResult = rabbit_msg_file:scan(Hdl, filelib:file_size(File),
                                              fun scan_fun/2, []),
            ok = file_handle_cache:close(Hdl),
            ScanResult
    end.

%% Same as scan_file_for_valid_messages/1 for FileName inside Dir.
scan_file_for_valid_messages(Dir, FileName) ->
    Path = form_filename(Dir, FileName),
    scan_file_for_valid_messages(Path).

%% Scan accumulator: keeps each message's id, size and offset, and drops
%% its body.
scan_fun({MsgId, TotalSize, Offset, _Body}, Found) ->
    [{MsgId, TotalSize, Offset} | Found].

%% Takes the list in *ascending* order (i.e. eldest message
%% first). This is the opposite of what scan_file_for_valid_messages
%% produces. The list of msgs that is produced is youngest first.
%%
%% Returns {Top, Rest}. Top is the offset just past the run of messages
%% laid out back-to-back from offset 0. Rest is the messages from the first
%% gap onwards.
drop_contiguous_block_prefix(Locations) ->
    drop_contiguous_block_prefix(Locations, 0).

drop_contiguous_block_prefix([#msg_location { offset     = Expected,
                                              total_size = Size } | Rest],
                             Expected) ->
    %% This message starts exactly where the previous one ended.
    drop_contiguous_block_prefix(Rest, Expected + Size);
drop_contiguous_block_prefix(Remaining, Expected) ->
    %% Either the list is exhausted or we have reached the first gap.
    {Expected, Remaining}.
%% Builds the in-memory totals and file metadata at startup, returning
%% {WriteOffset, State}.
%%
%% true (fast path): the recovered file summary table is folded to sum the
%% valid and total sizes. The summary table is an ordered_set, so for it
%% ets:foldl/3 visits files in ascending order. The last file seen
%% therefore becomes current_file, and its size is the returned offset.
%%
%% false (full rebuild): ref counts are replayed from the generator, then
%% every message file in the directory is rescanned in parallel. Files left
%% with no valid data are then scheduled for deletion. With no files on
%% disk, the current file alone is "rebuilt".
build_index(true, _StartupFunState,
            State = #msstate { file_summary_ets = FileSummaryEts }) ->
    AddFile =
        fun (#file_summary { file             = File,
                             valid_total_size = Valid,
                             file_size        = Size },
             {_PrevOffset, #msstate { sum_valid_data = AccValid,
                                      sum_file_size  = AccSize } = Acc}) ->
                {Size, Acc #msstate { sum_valid_data = AccValid + Valid,
                                      sum_file_size  = AccSize + Size,
                                      current_file   = File }}
        end,
    ets:foldl(AddFile, {0, State}, FileSummaryEts);
build_index(false, {MsgRefDeltaGen, MsgRefDeltaGenInit},
            State = #msstate { dir = Dir }) ->
    rabbit_log:debug("Rebuilding message refcount...", []),
    ok = count_msg_refs(MsgRefDeltaGen, MsgRefDeltaGenInit, State),
    rabbit_log:debug("Done rebuilding message refcount", []),
    {ok, Gatherer} = gatherer:start_link(),
    FileNums = [filename_to_num(Name)
                || Name <- list_sorted_filenames(Dir, ?FILE_EXTENSION)],
    case FileNums of
        [_ | _] ->
            {Offset, RebuiltState} = rebuild_index(Gatherer, FileNums, State),
            {Offset, lists:foldl(fun delete_file_if_empty/2,
                                 RebuiltState, FileNums)};
        [] ->
            rebuild_index(Gatherer, [State #msstate.current_file], State)
    end.
%% Worker-pool job for one file during a full index rebuild.
%%
%% Scans File and claims every message whose index entry has no file yet
%% (file = undefined) by recording File, offset and size in the index.
%% Messages that are not in the index, or already placed, do not count as
%% valid. It then hands a #file_summary{} for File to Gatherer.
%% Left is the previous file number (or undefined). Files lists the files
%% still to come; its head becomes this file's right neighbour.
build_index_worker(Gatherer, State = #msstate { dir = Dir },
                   Left, File, Files) ->
    FileName = filenum_to_name(File),
    rabbit_log:debug("Rebuilding message location index from ~p (~B file(s) remaining)",
                     [form_filename(Dir, FileName), length(Files)]),
    {ok, Messages, FileSize} = scan_file_for_valid_messages(Dir, FileName),
    %% Messages come youngest-first, so prepending while folding leaves
    %% ValidMessages in ascending offset order.
    Claim =
        fun ({MsgId, TotalSize, Offset} = Found, {Kept, KeptBytes} = Acc) ->
                case index_lookup(MsgId, State) of
                    #msg_location { file = undefined } = Entry ->
                        ok = index_update(
                               Entry #msg_location { file       = File,
                                                     offset     = Offset,
                                                     total_size = TotalSize },
                               State),
                        {[Found | Kept], KeptBytes + TotalSize};
                    _UnknownOrPlaced ->
                        Acc
                end
        end,
    {ValidMessages, ValidTotalSize} = lists:foldl(Claim, {[], 0}, Messages),
    %% if it's the last file, we'll truncate to remove any
    %% rubbish above the last valid message. This affects the
    %% file size.
    {Right, SummarySize} =
        case Files of
            [NextFile | _] ->
                {NextFile, FileSize};
            [] when ValidMessages =:= [] ->
                {undefined, 0};
            [] ->
                {_LastId, LastSize, LastOffset} = lists:last(ValidMessages),
                {undefined, LastOffset + LastSize}
        end,
    ok = gatherer:in(Gatherer, #file_summary { file             = File,
                                                valid_total_size = ValidTotalSize,
                                                left             = Left,
                                                right            = Right,
                                                file_size        = SummarySize,
                                                locked           = false,
                                                readers          = 0 }),
    ok = gatherer:finish(Gatherer).

%% Dispatches one build_index_worker job per file to the worker pool. Each
%% job is told its left neighbour (the previously dispatched file). Once
%% the list is exhausted, this process exits normally.
enqueue_build_index_workers(Gatherer, Left, [File | Rest], State) ->
    Job = fun () ->
                  %% Linked while working. Presumably this is so a crash on
                  %% either side is not silently lost.
                  link(Gatherer),
                  ok = build_index_worker(Gatherer, State, Left, File, Rest),
                  unlink(Gatherer),
                  ok
          end,
    ok = worker_pool:dispatch_sync(Job),
    enqueue_build_index_workers(Gatherer, File, Rest, State);
enqueue_build_index_workers(_Gatherer, _Left, [], _State) ->
    exit(normal).
%% Drains the #file_summary{} records produced by the index workers.
%%
%% Each record is inserted into the file summary table (it must not already
%% be there), and its sizes are added to the running totals. Once
%% gatherer:out/1 returns empty, the gatherer is stopped and temporary
%% ref-count entries are cleaned up. The result is {Offset, State} with
%% LastFile as the current file. Offset is LastFile's recorded size, or 0
%% if it has no summary.
reduce_index(Gatherer, LastFile,
             State = #msstate { file_summary_ets = FileSummaryEts,
                                sum_valid_data   = SumValid,
                                sum_file_size    = SumFileSize }) ->
    case gatherer:out(Gatherer) of
        {value, #file_summary { valid_total_size = Valid,
                                file_size        = Size } = Summary} ->
            true = ets:insert_new(FileSummaryEts, Summary),
            NextState = State #msstate { sum_valid_data = SumValid + Valid,
                                         sum_file_size  = SumFileSize + Size },
            reduce_index(Gatherer, LastFile, NextState);
        empty ->
            ok = gatherer:stop(Gatherer),
            ok = index_clean_up_temporary_reference_count_entries(State),
            WriteOffset =
                case ets:lookup(FileSummaryEts, LastFile) of
                    [#file_summary { file_size = LastSize }] -> LastSize;
                    []                                       -> 0
                end,
            {WriteOffset, State #msstate { current_file = LastFile }}
    end.

%% Rebuilds the index from a non-empty list of file numbers, which callers
%% presumably pass in ascending order.
%%
%% It registers one gatherer fork per file. It then spawns a dispatcher
%% that runs one build_index_worker per file, and collects their results.
%% The last file in Files becomes the current file.
rebuild_index(Gatherer, Files, State) ->
    %% One fork per file; each worker balances it with gatherer:finish/1.
    [ok = gatherer:fork(Gatherer) || _File <- Files],
    Dispatcher = spawn(fun () ->
                               enqueue_build_index_workers(
                                 Gatherer, undefined, Files, State)
                       end),
    %% NOTE(review): the monitor reference is discarded; the resulting
    %% 'DOWN' message is presumably handled elsewhere in this server.
    erlang:monitor(process, Dispatcher),
    reduce_index(Gatherer, lists:last(Files), State).
%%----------------------------------------------------------------------------
%% garbage collection / compaction / aggregation -- internal
%%----------------------------------------------------------------------------

%% Rolls over to a new file once the write Offset reaches file_size_limit.
%%
%% The current file is synced and closed, and the next-numbered file is
%% opened for writing. The new file is linked into the file summary as the
%% new right-most entry. Cache entries whose third element is 0 are purged,
%% and compaction is then considered. Below the limit, State is returned
%% unchanged.
maybe_roll_to_new_file(
  Offset,
  State = #msstate { dir                 = Dir,
                     current_file_handle = CurHdl,
                     current_file        = CurFile,
                     file_summary_ets    = FileSummaryEts,
                     cur_file_cache_ets  = CurFileCacheEts,
                     file_size_limit     = FileSizeLimit })
  when Offset >= FileSizeLimit ->
    SyncedState = internal_sync(State),
    ok = file_handle_cache:close(CurHdl),
    NewFile = CurFile + 1,
    {ok, NewHdl} = open_file(Dir, filenum_to_name(NewFile), ?WRITE_MODE),
    NewSummary = #file_summary { file             = NewFile,
                                 valid_total_size = 0,
                                 left             = CurFile,
                                 right            = undefined,
                                 file_size        = 0,
                                 locked           = false,
                                 readers          = 0 },
    true = ets:insert_new(FileSummaryEts, NewSummary),
    %% Complete the doubly linked list: old current file -> new file.
    true = ets:update_element(FileSummaryEts, CurFile,
                              {#file_summary.right, NewFile}),
    %% NOTE(review): the 0 presumably marks cache entries with no write
    %% still pending; confirm against the cache writer.
    true = ets:match_delete(CurFileCacheEts, {'_', '_', 0}),
    maybe_compact(SyncedState #msstate { current_file_handle = NewHdl,
                                         current_file        = NewFile });
maybe_roll_to_new_file(_Offset, State) ->
    State.

%% Starts one compaction when there is enough garbage. Two conditions must
%% hold:
%%   * the store is larger than two files' worth;
%%   * its garbage fraction exceeds ?GARBAGE_FRACTION.
%% At most ?MAXIMUM_SIMULTANEOUS_GC_FILES files may be pending GC at once.
%% The chosen pair is then processed in order:
%%   1. recorded in pending_gc_completion;
%%   2. passed through close_handle/2;
%%   3. locked in the file summary;
%%   4. handed to the GC process.
maybe_compact(State = #msstate { sum_valid_data        = SumValid,
                                 sum_file_size         = SumFileSize,
                                 gc_pid                = GCPid,
                                 pending_gc_completion = Pending,
                                 file_summary_ets      = FileSummaryEts,
                                 file_size_limit       = FileSizeLimit })
  when SumFileSize > 2 * FileSizeLimit andalso
       (SumFileSize - SumValid) / SumFileSize > ?GARBAGE_FRACTION ->
    %% TODO: the algorithm here is sub-optimal - it may result in a
    %%       complete traversal of FileSummaryEts.
    First = ets:first(FileSummaryEts),
    Skip = First =:= '$end_of_table' orelse
               maps:size(Pending) >= ?MAXIMUM_SIMULTANEOUS_GC_FILES,
    Candidate = case Skip of
                    true  -> not_found;
                    false -> find_files_to_combine(
                               FileSummaryEts, FileSizeLimit,
                               ets:lookup(FileSummaryEts, First))
                end,
    case Candidate of
        not_found ->
            State;
        {Src, Dst} ->
            Pending1 = maps_store(Dst, [], maps_store(Src, [], Pending)),
            State1 = close_handle(Src, close_handle(Dst, State)),
            true = ets:update_element(FileSummaryEts, Src,
                                      {#file_summary.locked, true}),
            true = ets:update_element(FileSummaryEts, Dst,
                                      {#file_summary.locked, true}),
            ok = rabbit_msg_store_gc:combine(GCPid, Src, Dst),
            State1 #msstate { pending_gc_completion = Pending1 }
    end;
maybe_compact(State) ->
    State.

%% Walks the file summary rightwards from the given entry, looking for an
%% adjacent pair {Src, Dst} (Src immediately right of Dst) that can be
%% merged. A pair qualifies when:
%%   * both files hold some valid data;
%%   * together they fit in FileSizeLimit;
%%   * neither is locked.
%% Neither file may be the right-most (current) file. Returns not_found
%% when no such pair exists.
find_files_to_combine(_FileSummaryEts, _FileSizeLimit,
                      [#file_summary { right = undefined }]) ->
    not_found;
find_files_to_combine(FileSummaryEts, FileSizeLimit,
                      [#file_summary { file             = Dst,
                                       valid_total_size = DstValid,
                                       right            = Src,
                                       locked           = DstLocked }]) ->
    %% The right neighbour must point straight back at Dst.
    [#file_summary { file             = Src,
                     valid_total_size = SrcValid,
                     left             = Dst,
                     right            = SrcRight,
                     locked           = SrcLocked }] = Next =
        ets:lookup(FileSummaryEts, Src),
    case SrcRight of
        undefined ->
            not_found;
        _ ->
            Mergeable = DstValid + SrcValid =< FileSizeLimit
                            andalso DstValid > 0
                            andalso SrcValid > 0
                            andalso not (DstLocked orelse SrcLocked),
            case Mergeable of
                true  -> {Src, Dst};
                false -> find_files_to_combine(FileSummaryEts, FileSizeLimit,
                                               Next)
            end
    end.
%% Schedules File for deletion when it holds no valid data (used by
%% build_index/3 after a rebuild). The current file is never touched.
%% File must not be locked yet. An empty file is processed in order:
%%   1. locked;
%%   2. handed to the GC process;
%%   3. recorded in pending_gc_completion;
%%   4. passed through close_handle/2.
delete_file_if_empty(File, State = #msstate { current_file = File }) ->
    State;
delete_file_if_empty(File, State = #msstate {
                                     gc_pid                = GCPid,
                                     file_summary_ets      = FileSummaryEts,
                                     pending_gc_completion = Pending }) ->
    [#file_summary { valid_total_size = ValidData,
                     locked           = false }] =
        ets:lookup(FileSummaryEts, File),
    case ValidData of
        0 ->
            %% Keep the file_summary_ets entry for File for now: readers
            %% may still need it to decrement the readers count.
            true = ets:update_element(FileSummaryEts, File,
                                      {#file_summary.locked, true}),
            ok = rabbit_msg_store_gc:delete(GCPid, File),
            close_handle(File, State #msstate {
                                 pending_gc_completion =
                                     maps_store(File, [], Pending) });
        _ ->
            State
    end.

%% Unlinks File from the doubly linked file summary once it has been
%% deleted. Clients are first told to close their handles to File, then
%% File's neighbours are linked to each other and File's entry is removed.
%% File must be locked, have no readers, and must not be the right-most
%% file.
cleanup_after_file_deletion(File,
                            #msstate { file_handles_ets = FileHandlesEts,
                                       file_summary_ets = FileSummaryEts,
                                       clients          = Clients }) ->
    %% Ensure that any clients that have open fhs to the file close
    %% them before using them again. This has to be done here (given
    %% it's done in the msg_store, and not the gc), and not when
    %% starting up the GC, because if done when starting up the GC,
    %% the client could find the close, and close and reopen the fh,
    %% whilst the GC is waiting for readers to disappear, before it's
    %% actually done the GC.
    true = mark_handle_to_close(Clients, FileHandlesEts, File, true),
    [#file_summary { left    = Left,
                     right   = Right,
                     locked  = true,
                     readers = 0 }] = ets:lookup(FileSummaryEts, File),
    %% We'll never delete the current file, so right is never undefined.
    true = Right =/= undefined, %% ASSERTION
    true = ets:update_element(FileSummaryEts, Right,
                              {#file_summary.left, Left}),
    %% Keep the doubly linked list intact on the left side too.
    Relinked =
        case Left of
            undefined -> true;  %% File is the eldest (left-most) file
            _         -> ets:update_element(FileSummaryEts, Left,
                                            {#file_summary.right, Right})
        end,
    true = Relinked,
    true = ets:delete(FileSummaryEts, File),
    ok.

%%----------------------------------------------------------------------------
%% garbage collection / compaction / aggregation -- external
%%----------------------------------------------------------------------------

%% GC entry point: merges Source into Destination. Both files must already
%% be locked. If either still has readers, nothing is done and
%% {defer, FilesWithReaders} is returned. Otherwise the merge runs and
%% {ok, DeletionThunk} is returned.
-spec combine_files(non_neg_integer(), non_neg_integer(), gc_state()) ->
                           {ok, deletion_thunk()} | {defer, [non_neg_integer()]}.

combine_files(Source, Destination,
              State = #gc_state { file_summary_ets = FileSummaryEts }) ->
    LockedSummary =
        fun (FileNum) ->
                [#file_summary { locked = true } = Summary] =
                    ets:lookup(FileSummaryEts, FileNum),
                Summary
        end,
    SrcSummary = LockedSummary(Source),
    DstSummary = LockedSummary(Destination),
    case {SrcSummary, DstSummary} of
        {#file_summary { readers = 0 }, #file_summary { readers = 0 }} ->
            {ok, do_combine_files(SrcSummary, DstSummary,
                                  Source, Destination, State)};
        _ ->
            rabbit_log:debug("Asked to combine files ~p and ~p but they have active readers. Deferring.",
                             [Source, Destination]),
            Busy = [BusyFile || #file_summary { file    = BusyFile,
                                                readers = Readers }
                                    <- [SrcSummary, DstSummary],
                                Readers /= 0],
            {defer, Busy}
    end.
%% Merges the live messages of Source into Destination. This is the work
%% half of combine_files/3, run only once neither file has readers.
%%
%% The matches at the top assert the preconditions:
%%   * both summaries have readers = 0 and locked = true;
%%   * Source is Destination's right neighbour.
%%
%% The merge then proceeds as follows:
%%   1. Destination is vacuumed.
%%   2. If its live messages do not form one contiguous run from offset 0,
%%      everything after the first gap is staged in a tmp file, which has
%%      Destination's root name and ?FILE_EXTENSION_TMP. Destination is
%%      truncated and the tmp contents copied back.
%%   3. Source's live messages are appended to Destination.
%%   4. Destination's summary sizes are set to TotalValidData.
%%   5. The message store is sent {combine_files, Source, Destination,
%%      Reclaimed}.
%%   6. The deletion thunk for Source (safe_file_delete_fun/3) is returned.
do_combine_files(SourceSummary, DestinationSummary,
                 Source, Destination,
                 State = #gc_state { file_summary_ets = FileSummaryEts,
                                     file_handles_ets = FileHandlesEts,
                                     dir              = Dir,
                                     msg_store        = Server }) ->
    #file_summary {
       readers          = 0,
       left             = Destination,
       valid_total_size = SourceValid,
       file_size        = SourceFileSize,
       locked           = true } = SourceSummary,
    #file_summary {
       readers          = 0,
       right            = Source,
       valid_total_size = DestinationValid,
       file_size        = DestinationFileSize,
       locked           = true } = DestinationSummary,

    SourceName           = filenum_to_name(Source),
    DestinationName      = filenum_to_name(Destination),
    {ok, SourceHdl}      = open_file(Dir, SourceName,
                                     ?READ_AHEAD_MODE),
    {ok, DestinationHdl} = open_file(Dir, DestinationName,
                                     ?READ_AHEAD_MODE ++ ?WRITE_MODE),
    TotalValidData = SourceValid + DestinationValid,
    %% Vacuum Destination, then split its live messages (ascending offset
    %% order) into two parts:
    %%  * the prefix already contiguous from offset 0, which stays in place;
    %%  * the tail from the first gap onwards, which has to move.
    %% An empty tail means no tmp file is needed: truncate Destination at
    %% the end of the prefix, then copy over from Source. Otherwise:
    %%  1. write the tail to a tmp file;
    %%  2. truncate Destination;
    %%  3. copy the tmp file back in;
    %%  4. copy over from Source.
    %% The match on DestinationValid asserts that the vacuumed live size
    %% agrees with the file summary.
    {DestinationWorkList, DestinationValid} =
        load_and_vacuum_message_file(Destination, State),
    {DestinationContiguousTop, DestinationWorkListTail} =
        drop_contiguous_block_prefix(DestinationWorkList),
    case DestinationWorkListTail of
        [] -> ok = truncate_and_extend_file(
                     DestinationHdl, DestinationContiguousTop, TotalValidData);
        _  -> Tmp = filename:rootname(DestinationName) ++ ?FILE_EXTENSION_TMP,
              {ok, TmpHdl} = open_file(Dir, Tmp, ?READ_AHEAD_MODE++?WRITE_MODE),
              %% copy_messages/7 also repoints the tail's index entries
              %% at Destination, at offsets from DestinationContiguousTop
              %% onwards. If we crash while the tmp file exists,
              %% recover_crashed_compaction/3 appends it back to
              %% Destination on the next start.
              ok = copy_messages(
                     DestinationWorkListTail, DestinationContiguousTop,
                     DestinationValid, DestinationHdl, TmpHdl, Destination,
                     State),
              TmpSize = DestinationValid - DestinationContiguousTop,
              %% so now Tmp contains everything we need to salvage
              %% from Destination, and index_state has been updated to
              %% reflect the compaction of Destination so truncate
              %% Destination and copy from Tmp back to the end
              {ok, 0} = file_handle_cache:position(TmpHdl, 0),
              ok = truncate_and_extend_file(
                     DestinationHdl, DestinationContiguousTop, TotalValidData),
              {ok, TmpSize} =
                  file_handle_cache:copy(TmpHdl, DestinationHdl, TmpSize),
              %% position in DestinationHdl should now be DestinationValid
              ok = file_handle_cache:sync(DestinationHdl),
              ok = file_handle_cache:delete(TmpHdl)
    end,
    %% Append Source's live messages after Destination's. The match on
    %% SourceValid asserts agreement with Source's summary.
    {SourceWorkList, SourceValid} = load_and_vacuum_message_file(Source, State),
    ok = copy_messages(SourceWorkList, DestinationValid, TotalValidData,
                       SourceHdl, DestinationHdl, Destination, State),
    %% tidy up
    ok = file_handle_cache:close(DestinationHdl),
    ok = file_handle_cache:close(SourceHdl),

    %% don't update dest.right, because it could be changing at the
    %% same time
    true = ets:update_element(
             FileSummaryEts, Destination,
             [{#file_summary.valid_total_size, TotalValidData},
              {#file_summary.file_size,        TotalValidData}]),

    %% Bytes freed: both files' old sizes minus what is now in Destination.
    Reclaimed = SourceFileSize + DestinationFileSize - TotalValidData,
    rabbit_log:debug("Combined segment files number ~p (source) and ~p (destination), reclaimed ~p bytes",
                     [Source, Destination, Reclaimed]),
    gen_server2:cast(Server, {combine_files, Source, Destination, Reclaimed}),
    safe_file_delete_fun(Source, Dir, FileHandlesEts).

-spec delete_file(non_neg_integer(), gc_state()) -> {ok, deletion_thunk()} | {defer, [non_neg_integer()]}.
%% GC entry point: deletes File, which must be locked and hold no valid
%% data.
%%
%% With active readers, the request is deferred with {defer, [File]}.
%% Otherwise:
%%   * the file is vacuumed, asserting that nothing live remains;
%%   * the message store is sent {delete_file, File, FileSize};
%%   * {ok, DeletionThunk} is returned.
%% Any other summary state crashes with case_clause.
delete_file(File, State = #gc_state { file_summary_ets = FileSummaryEts,
                                      file_handles_ets = FileHandlesEts,
                                      dir              = Dir,
                                      msg_store        = Server }) ->
    case ets:lookup(FileSummaryEts, File) of
        [#file_summary { readers = Readers }] when Readers > 0 ->
            rabbit_log:debug("Asked to delete file ~p but it has active readers. Deferring.",
                             [File]),
            {defer, [File]};
        [#file_summary { readers          = 0,
                         locked           = true,
                         valid_total_size = 0,
                         file_size        = FileSize }] ->
            %% Vacuuming also drops this file's ref_count-0 index entries.
            {[], 0} = load_and_vacuum_message_file(File, State),
            gen_server2:cast(Server, {delete_file, File, FileSize}),
            {ok, safe_file_delete_fun(File, Dir, FileHandlesEts)}
    end.

%% Scans File and reconciles it with the index. Each scanned message whose
%% index entry points at exactly this file, offset and size is treated as
%% follows:
%%   * with ref_count 0, its index entry is deleted;
%%   * otherwise it is kept.
%% Anything else (an unknown id, or an entry pointing elsewhere) is
%% ignored. Returns {KeptEntries, KeptBytes}; KeptEntries are
%% #msg_location{} records in ascending offset order.
load_and_vacuum_message_file(File, State = #gc_state { dir = Dir }) ->
    %% Messages here will be end-of-file at start-of-list
    {ok, Messages, _FileSize} =
        scan_file_for_valid_messages(Dir, filenum_to_name(File)),
    Vacuum =
        fun ({MsgId, TotalSize, Offset}, {Live, LiveBytes} = Acc) ->
                case index_lookup(MsgId, State) of
                    #msg_location { file       = File,
                                    total_size = TotalSize,
                                    offset     = Offset,
                                    ref_count  = RefCount } = Entry ->
                        case RefCount of
                            0 ->
                                ok = index_delete_object(Entry, State),
                                Acc;
                            _ ->
                                {[Entry | Live], LiveBytes + TotalSize}
                        end;
                    _ ->
                        Acc
                end
        end,
    %% foldl will reverse so will end up with msgs in ascending offset order
    lists:foldl(Vacuum, {[], 0}, Messages).
%% Copies the messages in WorkList from SourceHdl to DestinationHdl,
%% starting at InitOffset in the destination. WorkList holds #msg_location{}
%% records in ascending source offset order.
%%
%% Each message's index entry is repointed at Destination and its new
%% offset. Runs of messages that are contiguous in the source are copied as
%% a single block. Returns ok if the destination ends exactly at
%% FinalOffset; DestinationHdl is synced when anything was copied.
%% Otherwise it returns {gc_error, Details}. In that case the final block
%% has not been copied, although the index entries were already updated.
copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl,
              Destination, State) ->
    Copy = fun ({BlockStart, BlockEnd}) ->
                   BSize = BlockEnd - BlockStart,
                   {ok, BlockStart} =
                       file_handle_cache:position(SourceHdl, BlockStart),
                   {ok, BSize} =
                       file_handle_cache:copy(SourceHdl, DestinationHdl, BSize)
           end,
    case
        lists:foldl(
          fun (#msg_location { msg_id = MsgId, offset = Offset,
                               total_size = TotalSize },
               {CurOffset, Block = {BlockStart, BlockEnd}}) ->
                  %% CurOffset is in the DestinationFile.
                  %% Offset, BlockStart and BlockEnd are in the SourceFile
                  %% update MsgLocation to reflect change of file and offset
                  ok = index_update_fields(MsgId,
                                           [{#msg_location.file, Destination},
                                            {#msg_location.offset, CurOffset}],
                                           State),
                  {CurOffset + TotalSize,
                   case BlockEnd of
                       undefined ->
                           %% base case, called only for the first list elem
                           {Offset, Offset + TotalSize};
                       Offset ->
                           %% extend the current block because the
                           %% next msg follows straight on
                           {BlockStart, BlockEnd + TotalSize};
                       _ ->
                           %% found a gap, so actually do the work for
                           %% the previous block
                           Copy(Block),
                           {Offset, Offset + TotalSize}
                   end}
          end, {InitOffset, {undefined, undefined}}, WorkList) of
        {FinalOffset, Block} ->
            case WorkList of
                [] -> ok;
                _  -> Copy(Block), %% do the last remaining block
                      ok = file_handle_cache:sync(DestinationHdl)
            end;
        {FinalOffsetZ, _Block} ->
            {gc_error, [{expected, FinalOffset},
                        {got, FinalOffsetZ},
                        {destination, Destination}]}
    end.

%% Forces a full rebuild of message store Store under BaseDir on its next
%% start.
%%
%% The recovery-terms file is removed if present; any other error deleting
%% it crashes. Tmp files left by interrupted compactions are then folded
%% back into their main files.
-spec force_recovery(file:filename(), server()) -> 'ok'.

force_recovery(BaseDir, Store) ->
    Dir = filename:join(BaseDir, atom_to_list(Store)),
    case file:delete(filename:join(Dir, ?CLEAN_FILENAME)) of
        ok              -> ok;
        {error, enoent} -> ok
    end,
    %% The .rdq/.rdt files live in the store's own directory (Dir), not in
    %% BaseDir. Scanning BaseDir would silently find nothing to recover.
    recover_crashed_compactions(Dir),
    ok.
%% Applies Fun to each of Files joined onto directory D, asserting that
%% every call returns ok. Returns the list of results.
foreach_file(D, Fun, Files) ->
    [begin
         Path = filename:join(D, File),
         ok = Fun(Path)
     end || File <- Files].

%% Two-directory variant: Fun receives the file's path under D1 and its
%% path under D2.
foreach_file(D1, D2, Fun, Files) ->
    [begin
         From = filename:join(D1, File),
         To   = filename:join(D2, File),
         ok = Fun(From, To)
     end || File <- Files].

%% Upgrade helper: rewrites every message file of Store under BaseDir
%% through TransformFun. The steps are:
%%   1. transform each file into the ?TRANSFORM_TMP subdirectory;
%%   2. delete the originals;
%%   3. copy the transformed files back;
%%   4. remove the tmp directory.
%% If the tmp directory already exists, a previous run did not finish, and
%% {error, transform_failed_previously} is thrown.
-spec transform_dir(file:filename(), server(),
        fun ((any()) -> (rabbit_types:ok_or_error2(msg(), any())))) -> 'ok'.

transform_dir(BaseDir, Store, TransformFun) ->
    Dir    = filename:join(BaseDir, atom_to_list(Store)),
    TmpDir = filename:join(Dir, ?TRANSFORM_TMP),
    case filelib:is_dir(TmpDir) of
        true ->
            throw({error, transform_failed_previously});
        false ->
            Transform = fun (Src, Dst) ->
                                transform_msg_file(Src, Dst, TransformFun)
                        end,
            CopyBack  = fun (Src, Dst) ->
                                {ok, _Bytes} = file:copy(Src, Dst),
                                ok
                        end,
            Delete    = fun file:delete/1,
            Names     = list_sorted_filenames(Dir, ?FILE_EXTENSION),
            foreach_file(Dir, TmpDir, Transform, Names),
            foreach_file(Dir, Delete, Names),
            foreach_file(TmpDir, Dir, CopyBack, Names),
            foreach_file(TmpDir, Delete, Names),
            ok = file:del_dir(TmpDir)
    end.

%% Rewrites message file FileOld into FileNew, creating FileNew's parent
%% directories if needed. Every message body is decoded with
%% binary_to_term/1 and passed to TransformFun, which must return
%% {ok, NewMsg}. An empty-binary body is copied through unchanged.
%% Message ids are preserved.
transform_msg_file(FileOld, FileNew, TransformFun) ->
    ok = rabbit_file:ensure_parent_dirs_exist(FileNew),
    {ok, InHdl} = file_handle_cache:open_with_absolute_path(
                    FileOld, [raw, binary, read], []),
    {ok, OutHdl} = file_handle_cache:open_with_absolute_path(
                     FileNew, [raw, binary, write],
                     [{write_buffer, ?HANDLE_CACHE_BUFFER_SIZE}]),
    Rewrite =
        fun ({MsgId, _Size, _Offset, BinMsg}, ok) ->
                Transformed =
                    case binary_to_term(BinMsg) of
                        <<>> -> {ok, <<>>};  %% dying client marker
                        Msg  -> TransformFun(Msg)
                    end,
                {ok, MsgNew} = Transformed,
                {ok, _} = rabbit_msg_file:append(OutHdl, MsgId, MsgNew),
                ok
        end,
    {ok, _Acc, _IgnoreSize} =
        rabbit_msg_file:scan(InHdl, filelib:file_size(FileOld), Rewrite, ok),
    ok = file_handle_cache:close(InHdl),
    ok = file_handle_cache:close(OutHdl),
    ok.