1%% This Source Code Form is subject to the terms of the Mozilla Public
2%% License, v. 2.0. If a copy of the MPL was not distributed with this
3%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4%%
5%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates.  All rights reserved.
6%%
7
8-module(rabbit_msg_store).
9
10-behaviour(gen_server2).
11
12-export([start_link/4, start_global_store_link/4, successfully_recovered_state/1,
13         client_init/4, client_terminate/1, client_delete_and_terminate/1,
14         client_ref/1, close_all_indicated/1,
15         write/3, write_flow/3, read/2, contains/2, remove/2]).
16
17-export([set_maximum_since_use/2, combine_files/3,
18         delete_file/2]). %% internal
19
20-export([scan_file_for_valid_messages/1]). %% salvage tool
21
22-export([transform_dir/3, force_recovery/2]). %% upgrade
23
24-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
25         code_change/3, prioritise_call/4, prioritise_cast/3,
26         prioritise_info/3, format_message_queue/2]).
27
28%%----------------------------------------------------------------------------
29
30-include_lib("rabbit_common/include/rabbit_msg_store.hrl").
31
%% Sync interval, in milliseconds.
-define(SYNC_INTERVAL, 25).

%% Fixed file / directory names.
-define(CLEAN_FILENAME,        "clean.dot").
-define(FILE_SUMMARY_FILENAME, "file_summary.ets").
-define(TRANSFORM_TMP,         "transform_tmp").

%% Mode option lists (presumably used when opening files - confirm
%% against the callers). READ_AHEAD_MODE is READ_MODE with read_ahead
%% prepended.
-define(BINARY_MODE,     [raw, binary]).
-define(READ_MODE,       [read]).
-define(READ_AHEAD_MODE, [read_ahead | ?READ_MODE]).
-define(WRITE_MODE,      [write]).

%% File extensions: regular and temporary (_TMP).
-define(FILE_EXTENSION,     ".rdq").
-define(FILE_EXTENSION_TMP, ".rdt").

%% 1MB, i.e. 1024 * 1024 bytes.
-define(HANDLE_CACHE_BUFFER_SIZE, 1048576).

%% Four files, i.e. two pairs, so GC does not go idle when busy.
-define(MAXIMUM_SIMULTANEOUS_GC_FILES, 4).
49
50%%----------------------------------------------------------------------------
51
%% Server-side state of the message store process.
-record(msstate, {
          dir,                    % store directory
          index_module,           % module implementing the index operations
                                  % (rabbit_msg_store_ets_index by default)
          index_state,            % index state: where each message lives
          current_file,           % number naming the current file
          current_file_handle,    % handle for the current file
          file_handle_cache,      % file handle cache
          sync_timer_ref,         % TRef of the interval timer
          sum_valid_data,         % total valid data across all files
          sum_file_size,          % total size of all files
          pending_gc_completion,  % work deferred until GC completes
          gc_pid,                 % pid of the GC process
          file_handles_ets,       % tid: shared file handles table
          file_summary_ets,       % tid: file summary table
          cur_file_cache_ets,     % tid: current file cache table
          flying_ets,             % tid: writes/removes in flight
          dying_clients,          % set of dying clients
          clients,                % map: ref of each registered client
                                  % to its callbacks
          successfully_recovered, % boolean: was state recovered?
          file_size_limit,        % maximum size a file may grow to
          cref_to_msg_ids,        % client ref -> synced messages
          credit_disc_bound       % see CREDIT_DISC_BOUND in rabbit.hrl
         }).
99
%% Client-side state, built by client_init/4. Apart from server,
%% client_ref, file_handle_cache and credit_disc_bound, every field is
%% filled in from the server's reply to the new_client_state call.
-record(client_msstate, {
          server,             % pid or registered name of the msg_store
          client_ref,         % this client's reference
          file_handle_cache,  % starts out as an empty map
          index_state,
          index_module,
          dir,
          gc_pid,
          file_handles_ets,
          file_summary_ets,
          cur_file_cache_ets,
          flying_ets,
          credit_disc_bound   % read from the rabbit application env,
                              % defaulting to ?CREDIT_DISC_BOUND
         }).
114
%% One row of the file summary ETS table. Field positions matter:
%% #file_summary.readers is used as an ets:update_counter position.
-record(file_summary, {
          file,             % the file this row describes
          valid_total_size, % amount of useful data left in the file
          left,             % neighbouring file on the left
          right,            % neighbouring file on the right
          file_size,
          locked,           % boolean; clients defer reads while true
          readers           % counter bumped by clients reading the file
         }).
117
%% State record exported to other modules as the gc_state() type.
-record(gc_state, {
          dir,              % file:filename()
          index_module,     % atom()
          index_state,      % any()
          file_summary_ets, % ets:tid()
          file_handles_ets, % ets:tid()
          msg_store         % server(): pid or registered name
         }).
126
%% Entry describing a dying client.
%% NOTE(review): file/offset presumably record a position in the store
%% at the time the client started dying - confirm against the code that
%% populates this record.
-record(dying_client, {
          client_ref,
          file,
          offset
         }).
132
133%%----------------------------------------------------------------------------
134
-export_type([gc_state/0, file_num/0]).

%% Typed view of the #gc_state{} record.
-type gc_state() ::
        #gc_state { dir              :: file:filename(),
                    index_module     :: atom(),
                    index_state      :: any(),
                    file_summary_ets :: ets:tid(),
                    file_handles_ets :: ets:tid(),
                    msg_store        :: server() }.

%% A store is addressed either by pid or by registered name.
-type server() :: pid() | atom().
-type client_ref() :: binary().
-type file_num() :: non_neg_integer().

%% Typed view of the #client_msstate{} record.
-type client_msstate() ::
        #client_msstate { server             :: server(),
                          client_ref         :: client_ref(),
                          file_handle_cache  :: map(),
                          index_state        :: any(),
                          index_module       :: atom(),
                          dir                :: file:filename(),
                          gc_pid             :: pid(),
                          file_handles_ets   :: ets:tid(),
                          file_summary_ets   :: ets:tid(),
                          cur_file_cache_ets :: ets:tid(),
                          flying_ets         :: ets:tid(),
                          credit_disc_bound  :: {pos_integer(),
                                                   pos_integer()} }.

%% Generator of ref count deltas: given its state A it yields either
%% 'finished' or {MsgId, Delta, NextState}.
-type msg_ref_delta_gen(A) ::
        fun ((A) -> 'finished' |
                    {rabbit_types:msg_id(), non_neg_integer(), A}).

%% Optional client callbacks.
-type maybe_msg_id_fun() ::
        'undefined' | fun ((gb_sets:set(), 'written' | 'ignored') -> any()).
-type maybe_close_fds_fun() ::
        'undefined' | fun (() -> 'ok').

-type deletion_thunk() :: fun (() -> boolean()).
168
169%%----------------------------------------------------------------------------
170
%% GC threshold: a garbage collection is run whenever
%% (garbage / sum_file_size) exceeds ?GARBAGE_FRACTION.
%% Setting this below 0.5 is not recommended.
-define(GARBAGE_FRACTION, 0.5).
174
175%% Message store is responsible for storing messages
176%% on disk and loading them back. The store handles both
177%% persistent messages and transient ones (when a node
178%% is under RAM pressure and needs to page messages out
179%% to disk). The store is responsible for locating messages
180%% on disk and maintaining an index.
181%%
182%% There are two message stores per node: one for transient
183%% and one for persistent messages.
184%%
185%% Queue processes interact with the stores via clients.
186%%
187%% The components:
188%%
189%% Index: this is a mapping from MsgId to #msg_location{}.
190%%        By default, it's in ETS, but other implementations can
191%%        be used.
192%% FileSummary: this maps File to #file_summary{} and is stored
193%%              in ETS.
194%%
195%% The basic idea is that messages are appended to the current file up
196%% until that file becomes too big (> file_size_limit). At that point,
197%% the file is closed and a new file is created on the _right_ of the
198%% old file which is used for new messages. Files are named
199%% numerically ascending, thus the file with the lowest name is the
200%% eldest file.
201%%
202%% We need to keep track of which messages are in which files (this is
203%% the index); how much useful data is in each file and which files
204%% are on the left and right of each other. This is the purpose of the
205%% file summary ETS table.
206%%
207%% As messages are removed from files, holes appear in these
208%% files. The field ValidTotalSize contains the total amount of useful
209%% data left in the file. This is needed for garbage collection.
210%%
211%% When we discover that a file is now empty, we delete it. When we
212%% discover that it can be combined with the useful data in either its
213%% left or right neighbour, and overall, across all the files, we have
214%% ((the amount of garbage) / (the sum of all file sizes)) >
215%% ?GARBAGE_FRACTION, we start a garbage collection run concurrently,
216%% which will compact the two files together. This keeps disk
217%% utilisation high and aids performance. We deliberately do this
218%% lazily in order to prevent doing GC on files which are soon to be
219%% emptied (and hence deleted).
220%%
221%% Given the compaction between two files, the left file (i.e. elder
222%% file) is considered the ultimate destination for the good data in
223%% the right file. If necessary, the good data in the left file which
224%% is fragmented throughout the file is written out to a temporary
225%% file, then read back in to form a contiguous chunk of good data at
226%% the start of the left file. Thus the left file is garbage collected
227%% and compacted. Then the good data from the right file is copied
228%% onto the end of the left file. Index and file summary tables are
229%% updated.
230%%
231%% On non-clean startup, we scan the files we discover, dealing with
232%% the possibilities of a crash having occurred during a compaction
233%% (this consists of tidyup - the compaction is deliberately designed
234%% such that data is duplicated on disk rather than risking it being
235%% lost), and rebuild the file summary and index ETS table.
236%%
237%% So, with this design, messages move to the left. Eventually, they
238%% should end up in a contiguous block on the left and are then never
239%% rewritten. But this isn't quite the case. If in a file there is one
240%% message that is being ignored, for some reason, and messages in the
241%% file to the right and in the current block are being read all the
242%% time then it will repeatedly be the case that the good data from
243%% both files can be combined and will be written out to a new
244%% file. Whenever this happens, our shunned message will be rewritten.
245%%
246%% So, provided that we combine messages in the right order,
247%% (i.e. left file, bottom to top, right file, bottom to top),
248%% eventually our shunned message will end up at the bottom of the
249%% left file. The compaction/combining algorithm is smart enough to
250%% read in good data from the left file that is scattered throughout
251%% (i.e. C and D in the below diagram), then truncate the file to just
252%% above B (i.e. truncate to the limit of the good contiguous region
253%% at the start of the file), then write C and D on top and then write
254%% E, F and G from the right file on top. Thus contiguous blocks of
255%% good data at the bottom of files are not rewritten.
256%%
257%% +-------+    +-------+         +-------+
258%% |   X   |    |   G   |         |   G   |
259%% +-------+    +-------+         +-------+
260%% |   D   |    |   X   |         |   F   |
261%% +-------+    +-------+         +-------+
262%% |   X   |    |   X   |         |   E   |
263%% +-------+    +-------+         +-------+
264%% |   C   |    |   F   |   ===>  |   D   |
265%% +-------+    +-------+         +-------+
266%% |   X   |    |   X   |         |   C   |
267%% +-------+    +-------+         +-------+
268%% |   B   |    |   X   |         |   B   |
269%% +-------+    +-------+         +-------+
270%% |   A   |    |   E   |         |   A   |
271%% +-------+    +-------+         +-------+
272%%   left         right             left
273%%
274%% From this reasoning, we do have a bound on the number of times the
275%% message is rewritten. From when it is inserted, there can be no
276%% files inserted between it and the head of the queue, and the worst
277%% case is that every time it is rewritten, it moves one position lower
278%% in the file (for it to stay at the same position requires that
279%% there are no holes beneath it, which means truncate would be used
280%% and so it would not be rewritten at all). Thus this seems to
281%% suggest the limit is the number of messages ahead of it in the
282%% queue, though it's likely that that's pessimistic, given the
283%% requirements for compaction/combination of files.
284%%
285%% The other property that we have is the bound on the lowest
286%% utilisation, which should be 50% - worst case is that all files are
287%% fractionally over half full and can't be combined (equivalent is
288%% alternating full files and files with only one tiny message in
289%% them).
290%%
291%% Messages are reference-counted. When a message with the same msg id
292%% is written several times we only store it once, and only remove it
293%% from the store when it has been removed the same number of times.
294%%
295%% The reference counts do not persist. Therefore the initialisation
296%% function must be provided with a generator that produces ref count
297%% deltas for all recovered messages. This is only used on startup
298%% when the shutdown was non-clean.
299%%
300%% Read messages with a reference count greater than one are entered
301%% into a message cache. The purpose of the cache is not especially
302%% performance, though it can help there too, but prevention of memory
303%% explosion. It ensures that as messages with a high reference count
304%% are read from several processes they are read back as the same
305%% binary object rather than multiples of identical binary
306%% objects.
307%%
308%% Reads can be performed directly by clients without calling to the
309%% server. This is safe because multiple file handles can be used to
310%% read files. However, locking is used by the concurrent GC to make
311%% sure that reads are not attempted from files which are in the
312%% process of being garbage collected.
313%%
314%% When a message is removed, its reference count is decremented. Even
315%% if the reference count becomes 0, its entry is not removed. This is
316%% because in the event of the same message being sent to several
317%% different queues, there is the possibility of one queue writing and
318%% removing the message before other queues write it at all. Thus
319%% accommodating 0-reference counts allows us to avoid unnecessary
320%% writes here. Of course, there are complications: the file to which
321%% the message has already been written could be locked pending
322%% deletion or GC, which means we have to rewrite the message as the
323%% original copy will now be lost.
324%%
325%% The server automatically defers reads, removes and contains calls
326%% that occur which refer to files which are currently being
327%% GC'd. Contains calls are only deferred in order to ensure they do
328%% not overtake removes.
329%%
330%% The current file to which messages are being written has a
331%% write-back cache. This is written to immediately by clients and can
332%% be read from by clients too. This means that there are only ever
333%% writes made to the current file, thus eliminating delays due to
334%% flushing write buffers in order to be able to safely read from the
335%% current file. The one exception to this is that on start up, the
336%% cache is not populated with msgs found in the current file, and
337%% thus in this case only, reads may have to come from the file
338%% itself. The effect of this is that even if the msg_store process is
339%% heavily overloaded, clients can still write and read messages with
340%% very low latency and not block at all.
341%%
342%% Clients of the msg_store are required to register before using the
343%% msg_store. This provides them with the necessary client-side state
344%% to allow them to directly access the various caches and files. When
345%% they terminate, they should deregister. They can do this by calling
346%% either client_terminate/1 or client_delete_and_terminate/1. The
347%% differences are: (a) client_terminate is synchronous. As a result,
348%% if the msg_store is badly overloaded and has lots of in-flight
349%% writes and removes to process, this will take some time to
350%% return. However, once it does return, you can be sure that all the
351%% actions you've issued to the msg_store have been processed. (b) Not
352%% only is client_delete_and_terminate/1 asynchronous, but it also
353%% permits writes and subsequent removes from the current
354%% (terminating) client which are still in flight to be safely
355%% ignored. Thus from the point of view of the msg_store itself, and
356%% all from the same client:
357%%
358%% (T) = termination; (WN) = write of msg N; (RN) = remove of msg N
359%% --> W1, W2, W1, R1, T, W3, R2, W2, R1, R2, R3, W4 -->
360%%
361%% The client obviously sent T after all the other messages (up to
362%% W4), but because the msg_store prioritises messages, the T can be
363%% promoted and thus received early.
364%%
365%% Thus at the point of the msg_store receiving T, we have messages 1
366%% and 2 with a refcount of 1. After T, W3 will be ignored because
367%% it's an unknown message, as will R3, and W4. W2, R1 and R2 won't be
368%% ignored because the messages that they refer to were already known
369%% to the msg_store prior to T. However, it can be a little more
370%% complex: after the first R2, the refcount of msg 2 is 0. At that
371%% point, if a GC occurs or file deletion, msg 2 could vanish, which
372%% would then mean that the subsequent W2 and R2 are then ignored.
373%%
374%% The use case then for client_delete_and_terminate/1 is if the
375%% client wishes to remove everything it's written to the msg_store:
376%% it issues removes for all messages it's written and not removed,
377%% and then calls client_delete_and_terminate/1. At that point, any
378%% in-flight writes (and subsequent removes) can be ignored, but
379%% removes and writes for messages the msg_store already knows about
380%% will continue to be processed normally (which will normally just
381%% involve modifying the reference count, which is fast). Thus we save
382%% disk bandwidth for writes which are going to be immediately removed
%% again by the terminating client.
384%%
385%% We use a separate set to keep track of the dying clients in order
386%% to keep that set, which is inspected on every write and remove, as
387%% small as possible. Inspecting the set of all clients would degrade
388%% performance with many healthy clients and few, if any, dying
389%% clients, which is the typical case.
390%%
391%% Client termination messages are stored in a separate ets index to
392%% avoid filling primary message store index and message files with
393%% client termination messages.
394%%
395%% When the msg_store has a backlog (i.e. it has unprocessed messages
396%% in its mailbox / gen_server priority queue), a further optimisation
397%% opportunity arises: we can eliminate pairs of 'write' and 'remove'
398%% from the same client for the same message. A typical occurrence of
399%% these is when an empty durable queue delivers persistent messages
400%% to ack'ing consumers. The queue will asynchronously ask the
401%% msg_store to 'write' such messages, and when they are acknowledged
402%% it will issue a 'remove'. That 'remove' may be issued before the
403%% msg_store has processed the 'write'. There is then no point going
404%% ahead with the processing of that 'write'.
405%%
406%% To detect this situation a 'flying_ets' table is shared between the
407%% clients and the server. The table is keyed on the combination of
408%% client (reference) and msg id, and the value represents an
409%% integration of all the writes and removes currently "in flight" for
410%% that message between the client and server - '+1' means all the
411%% writes/removes add up to a single 'write', '-1' to a 'remove', and
412%% '0' to nothing. (NB: the integration can never add up to more than
%% one 'write' or 'remove' since clients must not write/remove a message
414%% more than once without first removing/writing it).
415%%
416%% Maintaining this table poses two challenges: 1) both the clients
417%% and the server access and update the table, which causes
418%% concurrency issues, 2) we must ensure that entries do not stay in
419%% the table forever, since that would constitute a memory leak. We
420%% address the former by carefully modelling all operations as
421%% sequences of atomic actions that produce valid results in all
422%% possible interleavings. We address the latter by deleting table
423%% entries whenever the server finds a 0-valued entry during the
424%% processing of a write/remove. 0 is essentially equivalent to "no
425%% entry". If, OTOH, the value is non-zero we know there is at least
426%% one other 'write' or 'remove' in flight, so we get an opportunity
427%% later to delete the table entry when processing these.
428%%
429%% There are two further complications. We need to ensure that 1)
430%% eliminated writes still get confirmed, and 2) the write-back cache
431%% doesn't grow unbounded. These are quite straightforward to
432%% address. See the comments in the code.
433%%
434%% For notes on Clean Shutdown and startup, see documentation in
435%% rabbit_variable_queue.
436
437%%----------------------------------------------------------------------------
438%% public API
439%%----------------------------------------------------------------------------
440
%% Starts a message store server without registering a name for it.
%% Delegates to gen_server2:start_link/3 with this module as the
%% callback module, the four arguments packed into a list, and an
%% infinite start timeout. Type must be an atom.
-spec start_link
        (atom(), file:filename(), [binary()] | 'undefined',
         {msg_ref_delta_gen(A), A}) -> rabbit_types:ok_pid_or_error().

start_link(Type, Dir, ClientRefs, StartupFunState) when is_atom(Type) ->
    InitArgs = [Type, Dir, ClientRefs, StartupFunState],
    Options  = [{timeout, infinity}],
    gen_server2:start_link(?MODULE, InitArgs, Options).
449
%% Same as start_link/4, except that the server is registered locally
%% under the name Type (the {local, Type} argument), so it can be
%% addressed by that atom as well as by pid. Type must be an atom.
-spec start_global_store_link
        (atom(), file:filename(), [binary()] | 'undefined',
         {msg_ref_delta_gen(A), A}) -> rabbit_types:ok_pid_or_error().

start_global_store_link(Type, Dir, ClientRefs, StartupFunState) when is_atom(Type) ->
    gen_server2:start_link({local, Type}, ?MODULE,
                           [Type, Dir, ClientRefs, StartupFunState],
                           [{timeout, infinity}]).
454
%% Synchronously asks the store whether it recovered its state.
%% Waits for the reply without a timeout.
-spec successfully_recovered_state(server()) -> boolean().

successfully_recovered_state(Server) ->
    Request = successfully_recovered_state,
    gen_server2:call(Server, Request, infinity).
459
%% Registers the calling process as a client of the store and returns
%% the client-side state. The server is asked (with no timeout) for the
%% shared index and ETS tables; the credit bound is read from the rabbit
%% application environment, falling back to ?CREDIT_DISC_BOUND. The
%% file handle cache starts out empty.
-spec client_init(server(), client_ref(), maybe_msg_id_fun(),
                        maybe_close_fds_fun()) -> client_msstate().

client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) when is_pid(Server); is_atom(Server) ->
    Request = {new_client_state, Ref, self(), MsgOnDiskFun, CloseFDsFun},
    {IndexState, IndexModule, StoreDir, GCPid,
     FileHandlesEts, FileSummaryEts, CurFileCacheEts, FlyingEts} =
        gen_server2:call(Server, Request, infinity),
    CreditDiscBound = rabbit_misc:get_env(rabbit, msg_store_credit_disc_bound,
                                          ?CREDIT_DISC_BOUND),
    #client_msstate { server             = Server,
                      client_ref         = Ref,
                      file_handle_cache  = #{},
                      index_state        = IndexState,
                      index_module       = IndexModule,
                      dir                = StoreDir,
                      gc_pid             = GCPid,
                      file_handles_ets   = FileHandlesEts,
                      file_summary_ets   = FileSummaryEts,
                      cur_file_cache_ets = CurFileCacheEts,
                      flying_ets         = FlyingEts,
                      credit_disc_bound  = CreditDiscBound }.
483
%% Synchronous deregistration: closes this client's file handles, then
%% calls the server with {client_terminate, Ref} and insists on an 'ok'
%% reply (anything else is a badmatch).
-spec client_terminate(client_msstate()) -> 'ok'.

client_terminate(#client_msstate { client_ref = ClientRef } = CState) ->
    close_all_handles(CState),
    Reply = server_call(CState, {client_terminate, ClientRef}),
    ok = Reply.
489
%% Asynchronous deregistration: closes this client's file handles, then
%% casts client_dying followed by client_delete to the server, in that
%% order. Each cast must return 'ok'.
-spec client_delete_and_terminate(client_msstate()) -> 'ok'.

client_delete_and_terminate(#client_msstate { client_ref = ClientRef } = CState) ->
    close_all_handles(CState),
    Notices = [{client_dying, ClientRef}, {client_delete, ClientRef}],
    lists:foreach(fun (Notice) -> ok = server_cast(CState, Notice) end,
                  Notices).
496
%% Accessor for the client reference held in the client state.
-spec client_ref(client_msstate()) -> client_ref().

client_ref(#client_msstate {} = CState) ->
    CState#client_msstate.client_ref.
500
%% Flow-controlled write: records a credit_flow send towards the store
%% and then performs the write tagged 'flow'.
-spec write_flow(rabbit_types:msg_id(), msg(), client_msstate()) -> 'ok'.

write_flow(MsgId, Msg, #client_msstate {} = CState) ->
    #client_msstate { server            = Server,
                      credit_disc_bound = CreditDiscBound } = CState,
    %% The messages tracked here are those sent by the
    %% rabbit_amqqueue_process process via rabbit_variable_queue;
    %% this accesses the process dictionary of the
    %% rabbit_amqqueue_process process.
    credit_flow:send(Server, CreditDiscBound),
    client_write(MsgId, Msg, flow, CState).
513
%% Write without flow control: the write is tagged 'noflow'.
-spec write(rabbit_types:msg_id(), msg(), client_msstate()) -> 'ok'.

write(MsgId, Msg, CState) ->
    client_write(MsgId, Msg, noflow, CState).
517
%% Reads a message on behalf of a client.
%%
%% The current file cache is consulted first. On a miss the index is
%% consulted: if it has no entry with a positive ref count the read is
%% handed to the server, otherwise the client attempts the read itself,
%% keeping the server call as a fallback.
-spec read(rabbit_types:msg_id(), client_msstate()) ->
                     {rabbit_types:ok(msg()) | 'not_found', client_msstate()}.

read(MsgId,
     #client_msstate { cur_file_cache_ets = CurFileCacheEts } = CState) ->
    file_handle_cache_stats:update(msg_store_read),
    case ets:lookup(CurFileCacheEts, MsgId) of
        [{MsgId, Msg, _CacheRefCount}] ->
            %% Cache hit.
            {{ok, Msg}, CState};
        [] ->
            AskServer =
                fun() -> {server_call(CState, {read, MsgId}), CState} end,
            case index_lookup_positive_ref_count(MsgId, CState) of
                not_found -> AskServer();
                Location  -> client_read1(Location, AskServer, CState)
            end
    end.
535
%% Asks the server (synchronously) whether it holds the message.
-spec contains(rabbit_types:msg_id(), client_msstate()) -> boolean().

contains(MsgId, CState) ->
    server_call(CState, {contains, MsgId}).
539
%% Removes the given messages on behalf of this client. An empty list
%% is a no-op. Otherwise every message first gets a -1 recorded in the
%% flying table, and then a single remove cast carrying all the ids is
%% sent to the server.
-spec remove([rabbit_types:msg_id()], client_msstate()) -> 'ok'.

remove([],    _CState) -> ok;
remove(MsgIds, CState = #client_msstate { client_ref = CRef }) ->
    lists:foreach(
      fun (MsgId) -> client_update_flying(-1, MsgId, CState) end,
      MsgIds),
    server_cast(CState, {remove, CRef, MsgIds}).
546
%% Casts {set_maximum_since_use, Age} to the store. Server must be a
%% pid or an atom.
-spec set_maximum_since_use(server(), non_neg_integer()) -> 'ok'.

set_maximum_since_use(Server, Age)
  when is_pid(Server) orelse is_atom(Server) ->
    Notice = {set_maximum_since_use, Age},
    gen_server2:cast(Server, Notice).
551
552%%----------------------------------------------------------------------------
553%% Client-side-only helpers
554%%----------------------------------------------------------------------------
555
%% Synchronous call to the store this client is registered with; waits
%% for the reply without a timeout.
server_call(#client_msstate {} = CState, Msg) ->
    gen_server2:call(CState#client_msstate.server, Msg, infinity).
558
%% Asynchronous cast to the store this client is registered with.
server_cast(#client_msstate {} = CState, Msg) ->
    gen_server2:cast(CState#client_msstate.server, Msg).
561
%% Shared implementation of write/3 and write_flow/3. The steps run in
%% this order and each must return 'ok':
%%   1. record +1 for the message in the flying table,
%%   2. put the message into the current file cache,
%%   3. cast the write (tagged with Flow) to the server.
client_write(MsgId, Msg, Flow, #client_msstate {} = CState) ->
    #client_msstate { cur_file_cache_ets = CacheTid,
                      client_ref         = ClientRef } = CState,
    file_handle_cache_stats:update(msg_store_write),
    ok = client_update_flying(+1, MsgId, CState),
    ok = update_msg_cache(CacheTid, MsgId, Msg),
    ok = server_cast(CState, {write, ClientRef, MsgId, Flow}).
569
%% First step of a client-side read: fetch the file summary row of the
%% file the index says the message is in, and pass its lock flag and
%% right-hand neighbour on to client_read2/5.
client_read1(MsgLocation = #msg_location { msg_id = MsgId, file = File },
             Defer,
             CState = #client_msstate { file_summary_ets = FileSummaryEts }) ->
    case ets:lookup(FileSummaryEts, File) of
        [#file_summary { locked = Locked, right = Right }] ->
            client_read2(Locked, Right, MsgLocation, Defer, CState);
        [] ->
            %% The file has been GC'd and is gone; start over.
            read(MsgId, CState)
    end.
578
%% Second step of a client-side read, dispatching on the file's lock
%% flag and its right-hand neighbour.
client_read2(true, _Right, _MsgLocation, Defer, _CState) ->
    %% The file is locked. The GC may meanwhile have run, leaving the
    %% message in a different, unlocked file, but handing the read to
    %% the server is the safest and simplest option.
    Defer();
client_read2(false, undefined, _MsgLocation, Defer, _CState) ->
    %% No right-hand neighbour: the message is apparently in the
    %% current file, yet neither cache had it. Two ways to get here:
    %% reading a message we have not written (very odd), or reading
    %% just after startup, when the cur_file_cache is not populated
    %% with the contents of the current file. Defer in both cases.
    Defer();
client_read2(false, _Right,
             MsgLocation = #msg_location { msg_id = MsgId, file = File },
             Defer,
             CState = #client_msstate { file_summary_ets = FileSummaryEts }) ->
    %% A GC may have finished in the meantime, so all that follows
    %% could be for the wrong file or for one that no longer exists.
    Continue = fun (_) -> client_read3(MsgLocation, Defer, CState) end,
    Restart  = fun () -> read(MsgId, CState) end,
    safe_ets_update_counter(
      FileSummaryEts, File, {#file_summary.readers, +1},
      Continue, Restart).
606
%% Final step of a client-side read. On entry this client has already
%% incremented the readers counter of File (done in client_read2/5).
%%
%% The file summary row is looked up again because the file may have
%% been locked, or even fully GC'd, between the earlier lookup and the
%% +1 on readers:
%%   * row gone       -> the GC deleted the file; start over via read/2.
%%   * locked = true  -> give the reader slot back and defer to the
%%                       server; if giving it back raises badarg the
%%                       row has been deleted, so start over instead.
%%   * locked = false -> re-check the index; if the message is still in
%%                       File, read it from disk here, otherwise follow
%%                       the new location or defer.
%%
%% Returns the same shape as read/2.
client_read3(#msg_location { msg_id = MsgId, file = File }, Defer,
             CState = #client_msstate { file_handles_ets = FileHandlesEts,
                                        file_summary_ets = FileSummaryEts,
                                        gc_pid           = GCPid,
                                        client_ref       = Ref }) ->
    %% Undo our +1 on readers. If that takes the counter to 0 and the
    %% file is locked, tell the GC process that no readers remain.
    %% Always evaluates to 'ok'; ets:update_counter raises badarg when
    %% the row no longer exists.
    Release =
        fun() -> ok = case ets:update_counter(FileSummaryEts, File,
                                              {#file_summary.readers, -1}) of
                          0 -> case ets:lookup(FileSummaryEts, File) of
                                   [#file_summary { locked = true }] ->
                                       rabbit_msg_store_gc:no_readers(
                                         GCPid, File);
                                   _ -> ok
                               end;
                          _ -> ok
                      end
        end,
    %% If a GC involving the file hasn't already started, it won't
    %% start now. We need to check again whether we were locked in the
    %% meantime, between the lookup and the update_counter (i.e. the GC
    %% started before our +1; in fact it could have finished by now
    %% too).
    case ets:lookup(FileSummaryEts, File) of
        [] -> %% GC has deleted our file, just go round again.
            read(MsgId, CState);
        [#file_summary { locked = true }] ->
            %% If Release() raises badarg, then the GC has finished and
            %% deleted our file: go around again. Otherwise, defer.
            %%
            %% badarg scenario: we lookup, msg_store locks, GC starts,
            %% GC ends, we +1 readers, msg_store ets:deletes (and
            %% unlocks the dest)
            %%
            %% Only Release() is protected by the try: Defer() runs in
            %% the unprotected 'of' section so that an unrelated badarg
            %% raised while deferring is not mistaken for a vanished
            %% file and silently retried.
            try Release() of
                ok -> Defer()
            catch error:badarg -> read(MsgId, CState)
            end;
        [#file_summary { locked = false }] ->
            %% Ok, we're definitely safe to continue - a GC involving
            %% the file cannot start up now, and isn't running, so
            %% nothing will tell us from now on to close the handle if
            %% it's already open.
            %%
            %% Finally, we need to recheck that the msg is still at
            %% the same place - it's possible an entire GC ran between
            %% us doing the lookup and the +1 on the readers. (Same as
            %% badarg scenario above, but we don't have a missing file
            %% - we just have the /wrong/ file).
            case index_lookup(MsgId, CState) of
                #msg_location { file = File } = MsgLocation ->
                    %% Still the same file.
                    {ok, CState1} = close_all_indicated(CState),
                    %% We are now guaranteed that the mark_handle_open
                    %% call will either insert_new correctly, or will
                    %% fail, but find the value is open, not close.
                    mark_handle_open(FileHandlesEts, File, Ref),
                    %% Could the msg_store now mark the file to be
                    %% closed? No: marks for closing are issued only
                    %% when the msg_store has locked the file.
                    %% This will never be the current file
                    {Msg, CState2} = read_from_disk(MsgLocation, CState1),
                    Release(), %% this MUST NOT fail with badarg
                    {{ok, Msg}, CState2};
                #msg_location {} = MsgLocation -> %% different file!
                    Release(), %% this MUST NOT fail with badarg
                    client_read1(MsgLocation, Defer, CState);
                not_found -> %% it seems not to exist. Defer, just to be sure.
                    try Release() %% this can badarg, same as locked case, above
                    catch error:badarg -> ok
                    end,
                    Defer()
            end
    end.
680
%% Client-side bookkeeping of an in-flight operation on MsgId in
%% flying_ets. Entries are keyed by {MsgId, CRef} and hold a running
%% sum of the Diffs applied by this client. Always returns ok, or
%% throws {bad_flying_ets_update, Diff, Count, Key} when the counter
%% ends up at a value that is neither 0, Diff, nor >= 2.
client_update_flying(Diff, MsgId, #client_msstate { flying_ets = FlyingEts,
                                                    client_ref = CRef }) ->
    FlyingKey = {MsgId, CRef},
    case ets:insert_new(FlyingEts, {FlyingKey, Diff}) of
        true ->
            %% No previous entry: the counter now simply holds Diff.
            ok;
        false ->
            try ets:update_counter(FlyingEts, FlyingKey, {2, Diff}) of
                0 ->
                    ok;
                Diff ->
                    ok;
                Count when Count >= 2 ->
                    %% The message must be referenced twice in the queue
                    %% index. There is a bug somewhere, but we don't want
                    %% to take down anything just because of this. Let's
                    %% process the message as if the copies were in
                    %% different queues (fan-out).
                    ok;
                Count ->
                    throw({bad_flying_ets_update, Diff, Count, FlyingKey})
            catch
                error:badarg ->
                    %% The entry vanished between insert_new and
                    %% update_counter. Re-inserting is guaranteed to
                    %% succeed since the server only removes and updates
                    %% flying_ets entries; it never inserts them.
                    true = ets:insert_new(FlyingEts, {FlyingKey, Diff}),
                    ok
            end
    end.
705
%% Drops CRef from both the pending-confirms map (cref_to_msg_ids) and
%% the dying_clients map. The `clients' map is not touched here.
clear_client(CRef, State = #msstate { cref_to_msg_ids = CTM,
                                      dying_clients = DyingClients }) ->
    CTM1          = maps:remove(CRef, CTM),
    DyingClients1 = maps:remove(CRef, DyingClients),
    State #msstate { cref_to_msg_ids = CTM1,
                     dying_clients   = DyingClients1 }.
710
711
712%%----------------------------------------------------------------------------
713%% gen_server callbacks
714%%----------------------------------------------------------------------------
715
716
%% gen_server2 init callback for a message store.
%%
%% The single argument is [Type, BaseDir, ClientRefs, StartupFunState]:
%%   Type            - atom; its name is the sub-directory of BaseDir
%%                     that this store uses (Dir below).
%%   BaseDir         - parent directory of the store.
%%   ClientRefs      - `undefined' wipes Dir and starts from scratch;
%%                     any other value triggers recovery of what is
%%                     already in Dir.
%%   StartupFunState - passed through unchanged to build_index/3.
%%
%% Returns {ok, State, hibernate, {backoff, ...}}. Every `ok = ...' /
%% `{ok, _} = ...' match below is an assertion: a failure crashes init.
init([Type, BaseDir, ClientRefs, StartupFunState]) ->
    %% Exit signals from linked processes arrive as {'EXIT', ...}
    %% messages (see handle_info/2) instead of killing this process.
    process_flag(trap_exit, true),

    ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use,
                                             [self()]),

    Dir = filename:join(BaseDir, atom_to_list(Type)),
    %% Name is only used for logging and is handed to
    %% recover_index_and_client_refs/5.
    Name = filename:join(filename:basename(BaseDir), atom_to_list(Type)),

    {ok, IndexModule} = application:get_env(rabbit, msg_store_index_module),
    rabbit_log:info("Message store ~tp: using ~p to provide index", [Name, IndexModule]),

    %% filelib:ensure_dir/1 creates the parents of the given path, so
    %% joining a dummy file name makes sure Dir itself exists.
    AttemptFileSummaryRecovery =
        case ClientRefs of
            undefined -> ok = rabbit_file:recursive_delete([Dir]),
                         ok = filelib:ensure_dir(filename:join(Dir, "nothing")),
                         false;
            _         -> ok = filelib:ensure_dir(filename:join(Dir, "nothing")),
                         recover_crashed_compactions(Dir)
        end,
    %% if we found crashed compactions we trust neither the
    %% file_summary nor the location index. Note the file_summary is
    %% left empty here if it can't be recovered.
    {FileSummaryRecovered, FileSummaryEts} =
        recover_file_summary(AttemptFileSummaryRecovery, Dir),
    {CleanShutdown, IndexState, ClientRefs1} =
        recover_index_and_client_refs(IndexModule, FileSummaryRecovered,
                                      ClientRefs, Dir, Name),
    %% Recovered clients start with placeholder {CPid, MsgOnDiskFun,
    %% CloseFDsFun} values; the real ones are stored by the
    %% new_client_state call.
    Clients = maps:from_list(
                [{CRef, {undefined, undefined, undefined}} ||
                    CRef <- ClientRefs1]),
    %% CleanShutdown => msg location index and file_summary both
    %% recovered correctly.
    true = case {FileSummaryRecovered, CleanShutdown} of
               {true, false} -> ets:delete_all_objects(FileSummaryEts);
               _             -> true
           end,
    %% CleanShutdown <=> msg location index and file_summary both
    %% recovered correctly.

    %% These tables are public and are handed to clients in the reply
    %% to new_client_state.
    FileHandlesEts  = ets:new(rabbit_msg_store_shared_file_handles,
                              [ordered_set, public]),
    CurFileCacheEts = ets:new(rabbit_msg_store_cur_file, [set, public]),
    FlyingEts       = ets:new(rabbit_msg_store_flying, [set, public]),

    {ok, FileSizeLimit} = application:get_env(rabbit, msg_store_file_size_limit),

    {ok, GCPid} = rabbit_msg_store_gc:start_link(
                    #gc_state { dir              = Dir,
                                index_module     = IndexModule,
                                index_state      = IndexState,
                                file_summary_ets = FileSummaryEts,
                                file_handles_ets = FileHandlesEts,
                                msg_store        = self()
                              }),

    %% Falls back to ?CREDIT_DISC_BOUND when the application env key
    %% is not set.
    CreditDiscBound = rabbit_misc:get_env(rabbit, msg_store_credit_disc_bound,
                                          ?CREDIT_DISC_BOUND),

    State = #msstate { dir                    = Dir,
                       index_module           = IndexModule,
                       index_state            = IndexState,
                       current_file           = 0,
                       current_file_handle    = undefined,
                       file_handle_cache      = #{},
                       sync_timer_ref         = undefined,
                       sum_valid_data         = 0,
                       sum_file_size          = 0,
                       pending_gc_completion  = maps:new(),
                       gc_pid                 = GCPid,
                       file_handles_ets       = FileHandlesEts,
                       file_summary_ets       = FileSummaryEts,
                       cur_file_cache_ets     = CurFileCacheEts,
                       flying_ets             = FlyingEts,
                       dying_clients          = #{},
                       clients                = Clients,
                       successfully_recovered = CleanShutdown,
                       file_size_limit        = FileSizeLimit,
                       cref_to_msg_ids        = #{},
                       credit_disc_bound      = CreditDiscBound
                     },
    %% If we didn't recover the msg location index then we need to
    %% rebuild it now.
    %% NOTE(review): the debug line below is emitted for clean and
    %% unclean shutdowns alike; build_index/3 receives CleanShutdown
    %% and presumably decides how much work is really needed.
    Cleanliness = case CleanShutdown of
                      true -> "clean";
                      false -> "unclean"
                  end,
    rabbit_log:debug("Rebuilding message location index after ~s shutdown...",
                     [Cleanliness]),
    %% build_index/3 returns the offset at which writing resumes and a
    %% state whose current_file is the file to append to.
    {Offset, State1 = #msstate { current_file = CurFile }} =
        build_index(CleanShutdown, StartupFunState, State),
    rabbit_log:debug("Finished rebuilding index", []),
    %% read is only needed so that we can seek
    {ok, CurHdl} = open_file(Dir, filenum_to_name(CurFile),
                             [read | ?WRITE_MODE]),
    %% Position at Offset and cut off whatever follows it in the file.
    {ok, Offset} = file_handle_cache:position(CurHdl, Offset),
    ok = file_handle_cache:truncate(CurHdl),

    {ok, maybe_compact(State1 #msstate { current_file_handle = CurHdl }),
     hibernate,
     {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
818
%% gen_server2 callback: priority assigned to an incoming call.
%% Recovery-state queries and new client registrations get 7, reads
%% get 2, and every other call gets 0.
prioritise_call(successfully_recovered_state, _From, _Len, _State) ->
    7;
prioritise_call({new_client_state, _Ref, _Pid, _MODC, _CloseFDsFun},
                _From, _Len, _State) ->
    7;
prioritise_call({read, _MsgId}, _From, _Len, _State) ->
    2;
prioritise_call(_Other, _From, _Len, _State) ->
    0.
826
%% gen_server2 callback: priority assigned to an incoming cast.
%% File combination/deletion notifications and set_maximum_since_use
%% get 8, client_dying gets 7, and every other cast gets 0.
prioritise_cast({combine_files, _Source, _Destination, _Reclaimed},
                _Len, _State) ->
    8;
prioritise_cast({delete_file, _File, _Reclaimed}, _Len, _State) ->
    8;
prioritise_cast({set_maximum_since_use, _Age}, _Len, _State) ->
    8;
prioritise_cast({client_dying, _Pid}, _Len, _State) ->
    7;
prioritise_cast(_Other, _Len, _State) ->
    0.
835
%% gen_server2 callback: priority assigned to an incoming info message.
%% The `sync' timer message gets 8; anything else gets 0.
prioritise_info(sync, _Len, _State) ->
    8;
prioritise_info(_Other, _Len, _State) ->
    0.
841
%% Synchronous requests.
%%
%% The {read, MsgId} and {contains, MsgId} clauses do not reply via the
%% return value: they pass From on to read_message/3 and
%% contains_message/3, which reply with gen_server2:reply/2 (possibly
%% later, once a pending GC on the file has completed).

%% Reports the successfully_recovered flag set in init/1.
handle_call(successfully_recovered_state, _From, State) ->
    Recovered = State #msstate.successfully_recovered,
    reply(Recovered, State);

%% Registers (or re-registers) client CRef, monitors its process, and
%% replies with everything the client needs to talk to the store.
handle_call({new_client_state, CRef, CPid, MsgOnDiskFun, CloseFDsFun}, _From,
            State = #msstate { clients = Clients }) ->
    #msstate { dir                = Dir,
               index_state        = IndexState,
               index_module       = IndexModule,
               file_handles_ets   = FileHandlesEts,
               file_summary_ets   = FileSummaryEts,
               cur_file_cache_ets = CurFileCacheEts,
               flying_ets         = FlyingEts,
               gc_pid             = GCPid } = State,
    ClientEntry = {CPid, MsgOnDiskFun, CloseFDsFun},
    Clients1 = maps:put(CRef, ClientEntry, Clients),
    erlang:monitor(process, CPid),
    ClientInit = {IndexState, IndexModule, Dir, GCPid, FileHandlesEts,
                  FileSummaryEts, CurFileCacheEts, FlyingEts},
    reply(ClientInit, State #msstate { clients = Clients1 });

%% Clears the per-client bookkeeping for CRef; the client stays in the
%% `clients' map.
handle_call({client_terminate, CRef}, _From, State) ->
    State1 = clear_client(CRef, State),
    reply(ok, State1);

handle_call({read, MsgId}, From, State) ->
    noreply(read_message(MsgId, From, State));

handle_call({contains, MsgId}, From, State) ->
    noreply(contains_message(MsgId, From, State)).
871
%% Asynchronous requests.
%%
%%   {client_dying, CRef}          remember the current file and write
%%                                 offset at the moment CRef announced
%%                                 it is going away.
%%   {client_delete, CRef}         forget CRef entirely.
%%   {write, CRef, MsgId, Flow}    write (or eliminate) a message whose
%%                                 payload the client already placed in
%%                                 cur_file_cache_ets.
%%   {remove, CRef, MsgIds}        remove references to messages.
%%   {combine_files, Src, Dst, R}  the GC process finished combining files.
%%   {delete_file, File, R}        the GC process finished deleting a file.
%%   {set_maximum_since_use, Age}  forwarded to file_handle_cache.
%%
%% Every clause returns via noreply/1, which also maintains the sync
%% timer.
handle_cast({client_dying, CRef},
            State = #msstate { dying_clients       = DyingClients,
                               current_file_handle = CurHdl,
                               current_file        = CurFile }) ->
    {ok, CurOffset} = file_handle_cache:current_virtual_offset(CurHdl),
    DyingClients1 = maps:put(CRef,
                             #dying_client{client_ref = CRef,
                                           file = CurFile,
                                           offset = CurOffset},
                             DyingClients),
    noreply(State #msstate { dying_clients = DyingClients1 });

handle_cast({client_delete, CRef},
            State = #msstate { clients = Clients }) ->
    State1 = State #msstate { clients = maps:remove(CRef, Clients) },
    noreply(clear_client(CRef, State1));

handle_cast({write, CRef, MsgId, Flow},
            State = #msstate { cur_file_cache_ets = CurFileCacheEts,
                               clients            = Clients,
                               credit_disc_bound  = CreditDiscBound }) ->
    case Flow of
        flow   -> {CPid, _, _} = maps:get(CRef, Clients),
                  %% We are going to process a message sent by the
                  %% rabbit_amqqueue_process. Now we are accessing the
                  %% msg_store process dictionary.
                  credit_flow:ack(CPid, CreditDiscBound);
        noflow -> ok
    end,
    %% Decrement the cache entry's pending-write counter (3rd element),
    %% asserting that it does not go negative.
    true = 0 =< ets:update_counter(CurFileCacheEts, MsgId, {3, -1}),
    case update_flying(-1, MsgId, CRef, State) of
        process ->
            [{MsgId, Msg, _PWC}] = ets:lookup(CurFileCacheEts, MsgId),
            noreply(write_message(MsgId, Msg, CRef, State));
        ignore ->
            %% A 'remove' has already been issued and eliminated the
            %% 'write'.
            State1 = blind_confirm(CRef, gb_sets:singleton(MsgId),
                                   ignored, State),
            %% If all writes get eliminated, cur_file_cache_ets could
            %% grow unbounded. To prevent that we delete the cache
            %% entry here, but only if the message isn't in the
            %% current file. That way reads of the message can
            %% continue to be done client side, from either the cache
            %% or the non-current files. If the message *is* in the
            %% current file then the cache entry will be removed by
            %% the normal logic for that in write_message/4 and
            %% maybe_roll_to_new_file/2.
            %%
            %% index_lookup/2 yields a bare #msg_location{} record or
            %% not_found, never a list, so the record is matched
            %% directly. Matching on a one-element list here would
            %% never succeed and the entry would always be deleted.
            case index_lookup(MsgId, State1) of
                #msg_location { file = File }
                  when File == State1 #msstate.current_file ->
                    ok;
                _ ->
                    true = ets:match_delete(CurFileCacheEts, {MsgId, '_', 0})
            end,
            noreply(State1)
    end;

handle_cast({remove, CRef, MsgIds}, State) ->
    %% Only removes that update_flying/4 tells us to process are acted
    %% upon and confirmed; the others were cancelled out by a write.
    {RemovedMsgIds, State1} =
        lists:foldl(
          fun (MsgId, {Removed, State2}) ->
                  case update_flying(+1, MsgId, CRef, State2) of
                      process -> {[MsgId | Removed],
                                  remove_message(MsgId, CRef, State2)};
                      ignore  -> {Removed, State2}
                  end
          end, {[], State}, MsgIds),
    noreply(maybe_compact(client_confirm(CRef, gb_sets:from_list(RemovedMsgIds),
                                         ignored, State1)));

handle_cast({combine_files, Source, Destination, Reclaimed},
            State = #msstate { sum_file_size    = SumFileSize,
                               file_handles_ets = FileHandlesEts,
                               file_summary_ets = FileSummaryEts,
                               clients          = Clients }) ->
    ok = cleanup_after_file_deletion(Source, State),
    %% see comment in cleanup_after_file_deletion, and client_read3
    true = mark_handle_to_close(Clients, FileHandlesEts, Destination, false),
    %% Unlock the destination, then run whatever was deferred while
    %% either file was being worked on.
    true = ets:update_element(FileSummaryEts, Destination,
                              {#file_summary.locked, false}),
    State1 = State #msstate { sum_file_size = SumFileSize - Reclaimed },
    noreply(maybe_compact(run_pending([Source, Destination], State1)));

handle_cast({delete_file, File, Reclaimed},
            State = #msstate { sum_file_size = SumFileSize }) ->
    ok = cleanup_after_file_deletion(File, State),
    State1 = State #msstate { sum_file_size = SumFileSize - Reclaimed },
    noreply(maybe_compact(run_pending([File], State1)));

handle_cast({set_maximum_since_use, Age}, State) ->
    ok = file_handle_cache:set_maximum_since_use(Age),
    noreply(State).
965
%% Out-of-band messages.
%%
%% Both the `sync' timer message and a gen_server2 `timeout' trigger an
%% internal sync.
handle_info(Trigger, State) when Trigger =:= sync; Trigger =:= timeout ->
    State1 = internal_sync(State),
    noreply(State1);

handle_info({'DOWN', _MRef, process, Pid, _Reason}, State) ->
    %% similar to what happens in
    %% rabbit_amqqueue_process:handle_ch_down but with a relation of
    %% msg_store -> rabbit_amqqueue_process instead of
    %% rabbit_amqqueue_process -> rabbit_channel.
    credit_flow:peer_down(Pid),
    noreply(State);

%% Exits are trapped (see init/1); any linked process exiting stops
%% the store with the same reason.
handle_info({'EXIT', _Pid, Reason}, State) ->
    {stop, Reason, State}.
982
%% gen_server2 terminate callback: orderly shutdown of the store.
%%
%% Sequence:
%%   1. stop the GC process;
%%   2. if there is a current file handle, sync (confirming pending
%%      writes) and close it;
%%   3. close all remaining file handles;
%%   4. store the file summary table in Dir;
%%   5. delete the four ETS tables;
%%   6. terminate the index module;
%%   7. store the recovery terms (known client refs and the index
%%      module) in Dir.
%% Failing to store the file summary or the recovery terms is logged
%% but does not crash. Returns the state with index_state and
%% current_file_handle reset to undefined.
terminate(_Reason, State = #msstate { index_state         = IndexState,
                                      index_module        = IndexModule,
                                      current_file_handle = CurHdl,
                                      gc_pid              = GCPid,
                                      file_handles_ets    = FileHandlesEts,
                                      file_summary_ets    = FileSummaryEts,
                                      cur_file_cache_ets  = CurFileCacheEts,
                                      flying_ets          = FlyingEts,
                                      clients             = Clients,
                                      dir                 = Dir }) ->
    rabbit_log:info("Stopping message store for directory '~s'", [Dir]),
    %% stop the gc first, otherwise it could be working and we pull
    %% out the ets tables from under it.
    ok = rabbit_msg_store_gc:stop(GCPid),
    %% CurHdl is undefined only if init/1 did not get as far as opening
    %% the current file.
    State1 = case CurHdl of
                 undefined -> State;
                 _         -> State2 = internal_sync(State),
                              ok = file_handle_cache:close(CurHdl),
                              State2
             end,
    State3 = close_all_handles(State1),
    %% The summary must be stored before the table is deleted below.
    case store_file_summary(FileSummaryEts, Dir) of
        ok           -> ok;
        {error, FSErr} ->
            rabbit_log:error("Unable to store file summary"
                             " for vhost message store for directory ~p~n"
                             "Error: ~p",
                             [Dir, FSErr])
    end,
    [true = ets:delete(T) || T <- [FileSummaryEts, FileHandlesEts,
                                   CurFileCacheEts, FlyingEts]],
    IndexModule:terminate(IndexState),
    %% NOTE(review): these terms are presumably what
    %% recover_index_and_client_refs/5 reads back in init/1 - confirm.
    case store_recovery_terms([{client_refs, maps:keys(Clients)},
                               {index_module, IndexModule}], Dir) of
        ok           ->
            rabbit_log:info("Message store for directory '~s' is stopped", [Dir]),
            ok;
        {error, RTErr} ->
            rabbit_log:error("Unable to save message store recovery terms"
                             " for directory ~p~nError: ~p",
                             [Dir, RTErr])
    end,
    State3 #msstate { index_state         = undefined,
                      current_file_handle = undefined }.
1027
%% gen_server2 code_change callback: no state migration is needed, the
%% state is returned as-is.
code_change(_FromVsn, CurrentState, _ExtraArgs) ->
    {ok, CurrentState}.
1030
%% gen_server2 callback: delegates formatting of the message queue to
%% rabbit_misc.
format_message_queue(Option, MessageQueue) ->
    rabbit_misc:format_message_queue(Option, MessageQueue).
1032
1033%%----------------------------------------------------------------------------
1034%% general helper functions
1035%%----------------------------------------------------------------------------
1036
%% Builds a {noreply, State, Timeout} callback result, letting
%% next_state/1 adjust the sync timer and choose the timeout.
noreply(State) ->
    {NextState, Timeout} = next_state(State),
    {noreply, NextState, Timeout}.
1040
%% Builds a {reply, Reply, State, Timeout} callback result, letting
%% next_state/1 adjust the sync timer and choose the timeout.
reply(Reply, State) ->
    {NextState, Timeout} = next_state(State),
    {reply, Reply, NextState, Timeout}.
1044
%% Decides how the server should wait for its next message, returning
%% {State, Timeout}:
%%   - no confirms pending: make sure the sync timer is stopped and
%%     hibernate;
%%   - confirms pending: make sure the sync timer is running and use a
%%     0 timeout.
next_state(State = #msstate { sync_timer_ref  = TimerRef,
                              cref_to_msg_ids = CTM }) ->
    HasPendingConfirms = maps:size(CTM) =/= 0,
    case {TimerRef, HasPendingConfirms} of
        {undefined, false} -> {State, hibernate};
        {undefined, true}  -> {start_sync_timer(State), 0};
        {_Running,  false} -> {stop_sync_timer(State), hibernate};
        {_Running,  true}  -> {State, 0}
    end.
1056
%% Ensures a timer stored in #msstate.sync_timer_ref is set to deliver
%% `sync' after ?SYNC_INTERVAL milliseconds.
start_sync_timer(State) ->
    TimerField = #msstate.sync_timer_ref,
    rabbit_misc:ensure_timer(State, TimerField, ?SYNC_INTERVAL, sync).
1060
%% Stops the timer stored in #msstate.sync_timer_ref, if any.
stop_sync_timer(State) ->
    TimerField = #msstate.sync_timer_ref,
    rabbit_misc:stop_timer(State, TimerField).
1063
%% Stops the sync timer, syncs the current file if any client has
%% message ids awaiting confirmation, and then confirms those ids as
%% `written' to each such client. Returns the updated state.
internal_sync(State = #msstate { current_file_handle = CurHdl,
                                 cref_to_msg_ids     = CTM }) ->
    TimerStopped = stop_sync_timer(State),
    %% Keep only the clients that actually have something to confirm.
    KeepNonEmpty =
        fun (CRef, MsgIds, Acc) ->
                case gb_sets:is_empty(MsgIds) of
                    true  -> Acc;
                    false -> [{CRef, MsgIds} | Acc]
                end
        end,
    ToConfirm = maps:fold(KeepNonEmpty, [], CTM),
    %% Only hit the disk when there is something to confirm.
    case ToConfirm of
        []      -> ok;
        [_ | _] -> ok = file_handle_cache:sync(CurHdl)
    end,
    Confirm =
        fun ({CRef, MsgIds}, StateAcc) ->
                client_confirm(CRef, MsgIds, written, StateAcc)
        end,
    lists:foldl(Confirm, TimerStopped, ToConfirm).
1080
%% Server-side counterpart of client_update_flying/3. Called with
%% Diff = -1 when a write is processed and Diff = +1 when a remove is
%% processed. Looks at the flying_ets counter for {MsgId, CRef} and
%% returns `process' if the operation must be carried out or `ignore'
%% if it has been cancelled out. Throws
%% {bad_flying_ets_record, Diff, Count, Key} on an unexpected counter.
update_flying(Diff, MsgId, CRef, #msstate { flying_ets = FlyingEts }) ->
    Key = {MsgId, CRef},
    NDiff = -Diff,
    case ets:lookup(FlyingEts, Key) of
        [] ->
            ignore;
        [{_, Diff}] ->
            ignore; %% [1]
        [{_, Count}] when Count =:= NDiff; Count >= 2 ->
            %% Count =:= NDiff is the regular case: the client recorded
            %% exactly this operation.
            %%
            %% Count >= 2: The message must be referenced twice in the
            %% queue index. There is a bug somewhere, but we don't want
            %% to take down anything just because of this. Let's process
            %% the message as if the copies were in different queues
            %% (fan-out).
            ets:update_counter(FlyingEts, Key, {2, Diff}),
            %% Only removes the entry if the counter is now exactly 0.
            true = ets:delete_object(FlyingEts, {Key, 0}),
            process;
        [{_, 0}] ->
            true = ets:delete_object(FlyingEts, {Key, 0}),
            ignore;
        [{_, Count}] ->
            throw({bad_flying_ets_record, Diff, Count, Key})
    end.
%% [1] We can get here, for example, in the following scenario: There
%% is a write followed by a remove in flight. The counter will be 0,
%% so on processing the write the server attempts to delete the
%% entry. If at that point the client injects another write it will
%% either insert a new entry, containing +1, or increment the existing
%% entry to +1, thus preventing its removal. Either way therefore when
%% the server processes the remove, the counter will be +1.
1109
%% Decides what to do with a write, given the {Mask, Location} pair
%% produced by should_mask_action/3. Returns one of:
%%   {write, State}          - the message must be appended to disk;
%%   {ignore, File, State}   - drop the write (File is `undefined' when
%%                             the message is not in the index);
%%   {confirm, File, State}  - the message is already in File; only its
%%                             reference count was bumped.

%% Masked writes are always ignored.
write_action({true, not_found}, _MsgId, State) ->
    {ignore, undefined, State};
write_action({true, #msg_location { file = File }}, _MsgId, State) ->
    {ignore, File, State};
%% Unknown message, not masked: write it.
write_action({false, not_found}, _MsgId, State) ->
    {write, State};
%% Known message whose reference count dropped to 0.
write_action({Mask, #msg_location { ref_count = 0, file = File,
                                    total_size = TotalSize }},
             MsgId, State = #msstate { file_summary_ets = FileSummaryEts }) ->
    Summary = ets:lookup(FileSummaryEts, File),
    case {Mask, Summary} of
        {false, [#file_summary { locked = true }]} ->
            %% The file is locked, so drop the index entry and write a
            %% fresh copy.
            ok = index_delete(MsgId, State),
            {write, State};
        {false_if_increment, [#file_summary { locked = true }]} ->
            %% The msg for MsgId is older than the client death
            %% message, but as it is being GC'd currently we'll have
            %% to write a new copy, which will then be younger, so
            %% ignore this write.
            {ignore, File, State};
        {_AnyMask, [#file_summary {}]} ->
            %% Revive the existing on-disk copy.
            ok = index_update_ref_count(MsgId, 1, State),
            Revived = adjust_valid_total_size(File, TotalSize, State),
            {confirm, File, Revived}
    end;
%% Known message that is still referenced.
write_action({_Mask, #msg_location { ref_count = RefCount, file = File }},
             MsgId, State) ->
    %% We already know about it, just update counter. Only update
    %% field otherwise bad interaction with concurrent GC
    ok = index_update_ref_count(MsgId, RefCount + 1, State),
    {confirm, File, State}.
1140
%% Handles a write of MsgId/Msg on behalf of client CRef, acting on
%% the verdict of write_action/3. Whenever the message is not (going
%% to be) in the current file, its cur_file_cache_ets entry is deleted,
%% provided its counter is 0. Returns the updated state.
write_message(MsgId, Msg, CRef,
              State = #msstate { cur_file_cache_ets = CurFileCacheEts }) ->
    MaskedLocation = should_mask_action(CRef, MsgId, State),
    case write_action(MaskedLocation, MsgId, State) of
        {write, State1} ->
            %% Append to the current file; the confirm is issued later.
            State2 = record_pending_confirm(CRef, MsgId, State1),
            write_message(MsgId, Msg, State2);
        {ignore, File, State1} when File =:= State1 #msstate.current_file ->
            State1;
        {ignore, _File, State1} ->
            true = ets:delete_object(CurFileCacheEts, {MsgId, Msg, 0}),
            State1;
        {confirm, File, State1} when File =:= State1 #msstate.current_file ->
            record_pending_confirm(CRef, MsgId, State1);
        {confirm, _File, State1} ->
            true = ets:delete_object(CurFileCacheEts, {MsgId, Msg, 0}),
            %% Not in the current file: notify the client right away
            %% without touching the pending-confirms map.
            NotifyWritten =
                fun (MsgOnDiskFun, CTM) ->
                        MsgOnDiskFun(gb_sets:singleton(MsgId), written),
                        CTM
                end,
            update_pending_confirms(NotifyWritten, CRef, State1)
    end.
1162
%% Removes one reference to MsgId on behalf of client CRef and returns
%% the updated state. Removing the last reference to a message in a
%% locked file is deferred until the GC on that file has completed.
remove_message(MsgId, CRef,
               State = #msstate { file_summary_ets = FileSummaryEts }) ->
    case should_mask_action(CRef, MsgId, State) of
        {true, _Location} ->
            State;
        {false_if_increment, #msg_location { ref_count = 0 }} ->
            %% CRef has tried to both write and remove this msg whilst
            %% it's being GC'd.
            %%
            %% ASSERTION: [#file_summary { locked = true }] =
            %% ets:lookup(FileSummaryEts, File),
            State;
        {_Mask, #msg_location { ref_count = 1, file = File,
                                total_size = TotalSize }} ->
            %% Last reference. Don't remove from cur_file_cache_ets
            %% here because there may be further writes in the mailbox
            %% for the same msg.
            case ets:lookup(FileSummaryEts, File) of
                [#file_summary { locked = true }] ->
                    add_to_pending_gc_completion(
                      {remove, MsgId, CRef}, File, State);
                [#file_summary {}] ->
                    %% only update field, otherwise bad interaction
                    %% with concurrent GC
                    ok = index_update_ref_count(MsgId, 0, State),
                    Shrunk = adjust_valid_total_size(File, -TotalSize, State),
                    delete_file_if_empty(File, Shrunk)
            end;
        {_Mask, #msg_location { ref_count = RefCount }}
          when RefCount > 0 ->
            %% Other references remain: only update the field,
            %% otherwise bad interaction with concurrent GC.
            ok = index_update_ref_count(MsgId, RefCount - 1, State),
            State
    end.
1200
%% Appends Msg to the current file, records its location in the index
%% with a reference count of 1, grows the file summary and the running
%% totals by the written size, and rolls over to a new file if
%% maybe_roll_to_new_file/2 decides so. Returns the updated state.
write_message(MsgId, Msg,
              State = #msstate { current_file_handle = CurHdl,
                                 current_file        = CurFile,
                                 sum_valid_data      = SumValid,
                                 sum_file_size       = SumFileSize,
                                 file_summary_ets    = FileSummaryEts }) ->
    {ok, StartOffset} = file_handle_cache:current_virtual_offset(CurHdl),
    {ok, Written} = rabbit_msg_file:append(CurHdl, MsgId, Msg),
    Location = #msg_location { msg_id = MsgId, ref_count = 1, file = CurFile,
                               offset = StartOffset, total_size = Written },
    ok = index_insert(Location, State),
    %% Assertion: the current file has no right neighbour and is not
    %% locked.
    [#file_summary { right = undefined, locked = false }] =
        ets:lookup(FileSummaryEts, CurFile),
    Growth = [{#file_summary.valid_total_size, Written},
              {#file_summary.file_size,        Written}],
    [_, _] = ets:update_counter(FileSummaryEts, CurFile, Growth),
    State1 = State #msstate { sum_valid_data = SumValid    + Written,
                              sum_file_size  = SumFileSize + Written },
    maybe_roll_to_new_file(StartOffset + Written, State1).
1221
%% Server-side read on behalf of caller From. Replies `not_found'
%% straight away when the index has no entry with a positive reference
%% count; otherwise hands over to read_message1/3, which replies.
%% Returns the updated state.
read_message(MsgId, From, State) ->
    case index_lookup_positive_ref_count(MsgId, State) of
        Location when Location =/= not_found ->
            read_message1(From, Location, State);
        not_found ->
            gen_server2:reply(From, not_found),
            State
    end.
1228
%% Reads the message described by MsgLoc and replies {ok, Msg} to From.
%%
%% Current file: serve from cur_file_cache_ets if possible; otherwise
%% read from disk, flushing the write handle first when the message
%% starts at or beyond the raw (already written out) offset.
%% Any other file: read from disk, unless the file is locked, in which
%% case the read is queued until the GC on that file completes.
read_message1(From, #msg_location { msg_id = MsgId, file = File,
                                    offset = Offset } = MsgLoc,
              State = #msstate { current_file        = CurFile,
                                 current_file_handle = CurHdl,
                                 file_summary_ets    = FileSummaryEts,
                                 cur_file_cache_ets  = CurFileCacheEts }) ->
    case File of
        CurFile ->
            {Msg, State1} =
                case ets:lookup(CurFileCacheEts, MsgId) of
                    [{MsgId, Cached, _CacheRefCount}] ->
                        {Cached, State};
                    [] ->
                        %% can happen if msg in file existed on startup
                        {ok, RawOffset} =
                            file_handle_cache:current_raw_offset(CurHdl),
                        if
                            Offset >= RawOffset ->
                                ok = file_handle_cache:flush(CurHdl);
                            true ->
                                ok
                        end,
                        read_from_disk(MsgLoc, State)
                end,
            gen_server2:reply(From, {ok, Msg}),
            State1;
        _OtherFile ->
            [#file_summary { locked = Locked }] =
                ets:lookup(FileSummaryEts, File),
            case Locked of
                true ->
                    add_to_pending_gc_completion({read, MsgId, From},
                                                 File, State);
                false ->
                    {Msg, State1} = read_from_disk(MsgLoc, State),
                    gen_server2:reply(From, {ok, Msg}),
                    State1
            end
    end.
1262
%% Reads the message at the given location from its file and returns
%% {Msg, State1}. If what is read back does not carry the expected
%% MsgId, the final match fails on purpose with a badmatch whose value
%% is {error, {misread, Diagnostics}}.
read_from_disk(#msg_location { msg_id = MsgId, file = File, offset = Offset,
                               total_size = TotalSize }, State) ->
    {Hdl, State1} = get_read_handle(File, State),
    {ok, Offset} = file_handle_cache:position(Hdl, Offset),
    ReadResult = rabbit_msg_file:read(Hdl, TotalSize),
    Checked =
        case ReadResult of
            {ok, {MsgId, _Payload}} ->
                ReadResult;
            _Unexpected ->
                %% Collect as much context as possible for the crash
                %% report.
                {error, {misread, [{old_state, State},
                                   {file_num,  File},
                                   {offset,    Offset},
                                   {msg_id,    MsgId},
                                   {read,      ReadResult},
                                   {proc_dict, get()}
                                  ]}}
        end,
    {ok, {MsgId, Msg}} = Checked,
    {Msg, State1}.
1281
%% Answers From whether MsgId is in the store with a positive
%% reference count. If the message's file is a key of
%% pending_gc_completion, the answer is deferred until the pending
%% operations for that file are run. Returns the updated state.
contains_message(MsgId, From,
                 State = #msstate { pending_gc_completion = Pending }) ->
    case index_lookup_positive_ref_count(MsgId, State) of
        not_found ->
            gen_server2:reply(From, false),
            State;
        #msg_location { file = File } ->
            IsPendingFile = maps:is_key(File, Pending),
            if
                IsPendingFile ->
                    add_to_pending_gc_completion(
                      {contains, MsgId, From}, File, State);
                true ->
                    gen_server2:reply(From, true),
                    State
            end
    end.
1296
%% Queues Action against FileNum in the pending_gc_completion map (via
%% rabbit_misc:maps_cons/3) and returns the updated state.
add_to_pending_gc_completion(
  Action, FileNum, State = #msstate { pending_gc_completion = Queued }) ->
    Queued1 = rabbit_misc:maps_cons(FileNum, Action, Queued),
    State #msstate { pending_gc_completion = Queued1 }.
1301
%% For each file in Files, in order: removes the file's entry from
%% pending_gc_completion and runs the actions that were queued against
%% it, in the reverse of their stored order, threading the state
%% through. Every file must have an entry in the map.
run_pending([], State) ->
    State;
run_pending([File | Files],
            State = #msstate { pending_gc_completion = Queued }) ->
    Actions = lists:reverse(maps:get(File, Queued)),
    State1  = State #msstate {
                pending_gc_completion = maps:remove(File, Queued) },
    run_pending(Files,
                lists:foldl(fun run_pending_action/2, State1, Actions)).
1311
%% Executes one queued action by dispatching on its tag to the
%% corresponding read / contains / remove handler.
run_pending_action({remove, Id, ClientRef}, State) ->
    remove_message(Id, ClientRef, State);
run_pending_action({contains, Id, ReplyTo}, State) ->
    contains_message(Id, ReplyTo, State);
run_pending_action({read, Id, ReplyTo}, State) ->
    read_message(Id, ReplyTo, State).
1318
%% Applies UpdateOp to the counter stored under Key in Tab and passes
%% the result to SuccessFun. If a badarg error is raised anywhere in
%% that step (including inside SuccessFun itself), FailThunk is called
%% instead and its result is returned.
safe_ets_update_counter(Tab, Key, UpdateOp, SuccessFun, FailThunk) ->
    Attempt = fun () ->
                      SuccessFun(ets:update_counter(Tab, Key, UpdateOp))
              end,
    try
        Attempt()
    catch
        error:badarg -> FailThunk()
    end.
1324
%% Ensures CacheEts holds a row for MsgId. A fresh row is inserted as
%% {MsgId, Msg, 1}; if a row already exists its third element is bumped
%% by one instead. Should that bump fail with badarg (see
%% safe_ets_update_counter/5), the whole operation is retried.
update_msg_cache(CacheEts, MsgId, Msg) ->
    Retry  = fun () -> update_msg_cache(CacheEts, MsgId, Msg) end,
    Ignore = fun (_NewCount) -> ok end,
    case ets:insert_new(CacheEts, {MsgId, Msg, 1}) of
        false -> safe_ets_update_counter(CacheEts, MsgId, {3, 1},
                                         Ignore, Retry);
        true  -> ok
    end.
1332
%% Adds Delta to the valid_total_size counter of FileNum's row in the
%% file summary table, and to the sum_valid_data total kept in the
%% state. Returns the updated state.
adjust_valid_total_size(FileNum, Delta,
                        State = #msstate { file_summary_ets = SummaryTab,
                                           sum_valid_data   = Total }) ->
    CounterOps = [{#file_summary.valid_total_size, Delta}],
    [_NewFileValid] = ets:update_counter(SummaryTab, FileNum, CounterOps),
    State #msstate { sum_valid_data = Total + Delta }.
1339
%% Stores Val under Key in the map Dict, asserting that Key is not
%% already present (a badmatch is raised if it is).
maps_store(Key, Val, Dict) ->
    AlreadyPresent = maps:is_key(Key, Dict),
    false = AlreadyPresent,
    Dict#{Key => Val}.
1343
%% Looks up the client registered under CRef. If that client has a
%% message-on-disk callback, Fun is called with the callback and the
%% current cref_to_msg_ids map, and its result becomes the new map.
%% If the callback is `undefined' the state is returned untouched.
update_pending_confirms(Fun, CRef,
                        State = #msstate { clients         = Clients,
                                           cref_to_msg_ids = CTM }) ->
    case maps:get(CRef, Clients) of
        {_Pid, OnDiskFun, _CloseFDs} when OnDiskFun =/= undefined ->
            State #msstate { cref_to_msg_ids = Fun(OnDiskFun, CTM) };
        {_Pid, undefined, _CloseFDs} ->
            State
    end.
1353
%% Adds MsgId to the gb_set of message ids recorded for CRef in
%% cref_to_msg_ids, creating the set if CRef has none yet. This is a
%% no-op for clients without a message-on-disk callback (see
%% update_pending_confirms/3).
record_pending_confirm(CRef, MsgId, State) ->
    Track = fun (_OnDiskFun, Pending) ->
                    Ids = case maps:find(CRef, Pending) of
                              {ok, Known} -> gb_sets:add(MsgId, Known);
                              error       -> gb_sets:singleton(MsgId)
                          end,
                    Pending#{CRef => Ids}
            end,
    update_pending_confirms(Track, CRef, State).
1363
%% Invokes the client's message-on-disk callback with the subset of
%% MsgIds that is recorded for CRef, together with ActionTaken, and
%% then drops those ids from the record. The CRef entry is removed
%% altogether once nothing remains. Nothing happens when CRef has no
%% recorded ids.
client_confirm(CRef, MsgIds, ActionTaken, State) ->
    Confirm =
        fun (OnDiskFun, Pending) ->
                case maps:find(CRef, Pending) of
                    error ->
                        Pending;
                    {ok, Outstanding} ->
                        Confirmed = gb_sets:intersection(Outstanding, MsgIds),
                        OnDiskFun(Confirmed, ActionTaken),
                        Remaining = rabbit_misc:gb_sets_difference(
                                      Outstanding, MsgIds),
                        case gb_sets:is_empty(Remaining) of
                            true  -> maps:remove(CRef, Pending);
                            false -> Pending#{CRef => Remaining}
                        end
                end
        end,
    update_pending_confirms(Confirm, CRef, State).
1379
%% Invokes the client's message-on-disk callback with MsgIds and
%% ActionTaken as given, without consulting or changing the ids
%% recorded for CRef.
blind_confirm(CRef, MsgIds, ActionTaken, State) ->
    Notify = fun (OnDiskFun, Pending) ->
                     OnDiskFun(MsgIds, ActionTaken),
                     Pending
             end,
    update_pending_confirms(Notify, CRef, State).
1384
%% Detect whether the MsgId is older or younger than the client's death
%% msg (if there is one). If the msg is older than the client death
%% msg, and it has a 0 ref_count we must only alter the ref_count, not
%% rewrite the msg - rewriting it would make it younger than the death
%% msg and thus should be ignored. Note that this (correctly) returns
%% false when testing to remove the death msg itself.
%%
%% Returns {Mask, Location}, where Location is the index lookup result
%% for MsgId and Mask is one of:
%%   false              - CRef is not a dying client, or the msg is not
%%                        positioned after the death msg and has a
%%                        non-zero ref_count;
%%   true               - CRef is dying and the msg is either unknown
%%                        or positioned after the death msg;
%%   false_if_increment - CRef is dying, the msg is not positioned
%%                        after the death msg and its ref_count is 0.
should_mask_action(CRef, MsgId,
                   State = #msstate{dying_clients = DyingClients}) ->
    Dying    = maps:find(CRef, DyingClients),
    Location = index_lookup(MsgId, State),
    Mask = case {Dying, Location} of
               {error, _} ->
                   false;
               {{ok, _}, not_found} ->
                   true;
               {{ok, Client}, #msg_location { file      = File,
                                              offset    = Offset,
                                              ref_count = RefCount }} ->
                   #dying_client{file   = DeathFile,
                                 offset = DeathOffset} = Client,
                   AfterDeathMsg = {DeathFile, DeathOffset} < {File, Offset},
                   if
                       AfterDeathMsg  -> true;
                       RefCount =:= 0 -> false_if_increment;
                       true           -> false
                   end
           end,
    {Mask, Location}.
1407
1408%%----------------------------------------------------------------------------
1409%% file helper functions
1410%%----------------------------------------------------------------------------
1411
%% Opens the file at the given absolute path through file_handle_cache,
%% in raw binary mode plus the requested Mode flags, with read and
%% write buffers of ?HANDLE_CACHE_BUFFER_SIZE bytes.
open_file(Path, Mode) ->
    OpenModes  = ?BINARY_MODE ++ Mode,
    BufferOpts = [{write_buffer, ?HANDLE_CACHE_BUFFER_SIZE},
                  {read_buffer,  ?HANDLE_CACHE_BUFFER_SIZE}],
    file_handle_cache:open_with_absolute_path(Path, OpenModes, BufferOpts).
1417
%% Convenience wrapper: opens FileName inside Dir with the given Mode.
open_file(Dir, FileName, Mode) ->
    Path = form_filename(Dir, FileName),
    open_file(Path, Mode).
1420
%% Closes the cached file handle stored under Key, if there is one.
%% The second argument may be a #client_msstate{}, an #msstate{} or
%% the handle map itself; the same kind of term is returned with the
%% handle removed from the map.
close_handle(Key, CState = #client_msstate { file_handle_cache = Handles }) ->
    Handles1 = close_handle(Key, Handles),
    CState #client_msstate { file_handle_cache = Handles1 };

close_handle(Key, State = #msstate { file_handle_cache = Handles }) ->
    Handles1 = close_handle(Key, Handles),
    State #msstate { file_handle_cache = Handles1 };

close_handle(Key, Handles) ->
    case maps:take(Key, Handles) of
        {Hdl, Remaining} -> ok = file_handle_cache:close(Hdl),
                            Remaining;
        error            -> Handles
    end.
1433
%% Records in FileHandlesEts that the client Ref has File open. Always
%% returns true.
mark_handle_open(FileHandlesEts, File, Ref) ->
    %% The insert is allowed to fail because a row already exists. Note
    %% that an existing row may hold the value `close', in which case
    %% it is NOT switched back to `open'.
    Row = {{Ref, File}, open},
    _Inserted = ets:insert_new(FileHandlesEts, Row),
    true.
1439
%% See comment in client_read3 - only call this when the file is locked.
%%
%% Switches every `open' row for File in FileHandlesEts to `close'.
%% When Invoke is true, the close-FDs callback (if not `undefined') of
%% each client whose row was updated is also called. Always returns
%% true.
mark_handle_to_close(ClientRefs, FileHandlesEts, File, Invoke) ->
    OpenRows = ets:match_object(FileHandlesEts, {{'_', File}, open}),
    MarkOne =
        fun ({{Ref, _File} = Key, open}) ->
                Marked = ets:update_element(FileHandlesEts, Key, {2, close}),
                case Marked andalso Invoke of
                    false ->
                        ok;
                    true ->
                        case maps:get(Ref, ClientRefs) of
                            {_CPid, _MsgOnDiskFun, undefined} ->
                                ok;
                            {_CPid, _MsgOnDiskFun, CloseFDsFun} ->
                                ok = CloseFDsFun()
                        end
                end;
            (_OtherRow) ->
                ok
        end,
    lists:foreach(MarkOne, OpenRows),
    true.
1456
%% Returns a zero-arity fun that, when called, runs safe_file_delete/3
%% with the arguments captured here.
safe_file_delete_fun(File, Dir, FileHandlesEts) ->
    fun () ->
            safe_file_delete(File, Dir, FileHandlesEts)
    end.
1459
%% Deletes the store file numbered File from Dir, but only when no row
%% for that file is left in FileHandlesEts. Returns true if the file
%% was deleted and false if some row still refers to it.
safe_file_delete(File, Dir, FileHandlesEts) ->
    %% Deliberately match on any row value: it is the absence of the
    %% row, not its value, that shows the client has really closed
    %% the file.
    AnyRowForFile = {{'_', File}, '_'},
    case ets:match_object(FileHandlesEts, AnyRowForFile, 1) of
        {[_ | _], _Continuation} ->
            false;
        _NoRows ->
            Path = form_filename(Dir, filenum_to_name(File)),
            ok = file:delete(Path),
            true
    end.
1469
%% Closes every file handle of this client whose row in the file
%% handles table is marked `close': each such row is deleted and the
%% matching cached handle is closed. Returns {ok, CState1}.
-spec close_all_indicated
        (client_msstate()) -> rabbit_types:ok(client_msstate()).

close_all_indicated(#client_msstate { file_handles_ets = FileHandlesEts,
                                      client_ref       = Ref } =
                        CState) ->
    ToClose  = ets:match_object(FileHandlesEts, {{Ref, '_'}, close}),
    CloseOne = fun ({Key = {_Ref, File}, close}, Acc) ->
                       true = ets:delete(FileHandlesEts, Key),
                       close_handle(File, Acc)
               end,
    CState1 = lists:foldl(CloseOne, CState, ToClose),
    {ok, CState1}.
1481
%% Closes every file handle held in the state's handle cache and
%% returns the state with an empty cache. For a client state, the
%% client's row for each file is also deleted from the file handles
%% table before the handle is closed.
close_all_handles(CState = #client_msstate { file_handles_ets  = FileHandlesEts,
                                             file_handle_cache = FHC,
                                             client_ref        = Ref }) ->
    CloseOne = fun (File, Hdl, ok) ->
                       true = ets:delete(FileHandlesEts, {Ref, File}),
                       file_handle_cache:close(Hdl)
               end,
    ok = maps:fold(CloseOne, ok, FHC),
    CState #client_msstate { file_handle_cache = #{} };

close_all_handles(State = #msstate { file_handle_cache = FHC }) ->
    CloseOne = fun (_File, Hdl, ok) -> file_handle_cache:close(Hdl) end,
    ok = maps:fold(CloseOne, ok, FHC),
    State #msstate { file_handle_cache = #{} }.
1495
%% Returns {Hdl, State1}: a read handle for store file FileNum, and
%% the given client or server state with its handle cache updated by
%% get_read_handle/3.
get_read_handle(FileNum, CState = #client_msstate { dir               = Dir,
                                                    file_handle_cache = Cache }) ->
    {Hdl, Cache1} = get_read_handle(FileNum, Cache, Dir),
    {Hdl, CState #client_msstate { file_handle_cache = Cache1 }};

get_read_handle(FileNum, State = #msstate { dir               = Dir,
                                            file_handle_cache = Cache }) ->
    {Hdl, Cache1} = get_read_handle(FileNum, Cache, Dir),
    {Hdl, State #msstate { file_handle_cache = Cache1 }}.
1505
%% Returns {Hdl, FHC1}. A handle already cached under FileNum is
%% reused; otherwise the file is opened in read mode and the new
%% handle is added to the cache map.
get_read_handle(FileNum, FHC, Dir) ->
    case maps:find(FileNum, FHC) of
        error ->
            FileName = filenum_to_name(FileNum),
            {ok, NewHdl} = open_file(Dir, FileName, ?READ_MODE),
            {NewHdl, FHC#{FileNum => NewHdl}};
        {ok, CachedHdl} ->
            {CachedHdl, FHC}
    end.
1513
%% Seeks Hdl to FileSizeLimit, truncates the file at that position,
%% then seeks to FinalPos. Each step is asserted to succeed.
preallocate(Hdl, FileSizeLimit, FinalPos) ->
    SeekTo = fun (Pos) ->
                     {ok, Pos} = file_handle_cache:position(Hdl, Pos),
                     ok
             end,
    ok = SeekTo(FileSizeLimit),
    ok = file_handle_cache:truncate(Hdl),
    ok = SeekTo(FinalPos),
    ok.
1519
%% Truncates the file at Lowpoint, then calls preallocate/3 with
%% Highpoint as the size and Lowpoint as the final position.
truncate_and_extend_file(Hdl, Lowpoint, Highpoint) ->
    {ok, Lowpoint} = file_handle_cache:position(Hdl, Lowpoint),
    Truncated = file_handle_cache:truncate(Hdl),
    ok = Truncated,
    ok = preallocate(Hdl, Highpoint, Lowpoint).
1524
%% Joins a directory and a file name into a single path.
form_filename(Dir, Name) ->
    filename:join(Dir, Name).
1526
%% Builds the store file name for an integer file number by appending
%% ?FILE_EXTENSION to its decimal representation.
filenum_to_name(File) ->
    Digits = integer_to_list(File),
    Digits ++ ?FILE_EXTENSION.
1528
%% Extracts the integer file number from a file name by stripping its
%% extension and parsing what is left as an integer.
filename_to_num(FileName) ->
    Root = filename:rootname(FileName),
    list_to_integer(Root).
1530
%% Lists the files in Dir whose names end in Ext, sorted in ascending
%% order of the integer file number encoded in each name (see
%% filename_to_num/1).
list_sorted_filenames(Dir, Ext) ->
    %% lists:sort/2 requires the ordering fun to return true when A
    %% compares less than OR EQUAL to B, hence =< rather than <.
    ByFileNum = fun (A, B) -> filename_to_num(A) =< filename_to_num(B) end,
    lists:sort(ByFileNum, filelib:wildcard("*" ++ Ext, Dir)).
1534
1535%%----------------------------------------------------------------------------
1536%% index
1537%%----------------------------------------------------------------------------
1538
%% Looks Key up in the index, treating an entry whose ref_count is 0
%% as if it were absent. Returns the #msg_location{} or not_found.
index_lookup_positive_ref_count(Key, State) ->
    case index_lookup(Key, State) of
        #msg_location { ref_count = Count } = Location when Count =/= 0 ->
            Location;
        #msg_location { ref_count = 0 } ->
            not_found;
        not_found ->
            not_found
    end.
1545
%% Sets the ref_count field of the index entry for Key to RefCount.
index_update_ref_count(Key, RefCount, State) ->
    FieldUpdate = {#msg_location.ref_count, RefCount},
    index_update_fields(Key, FieldUpdate, State).
1548
%% Looks Key up via the index module and index state carried by the
%% given server, client or GC state record.
index_lookup(Key, #msstate { index_module = Mod, index_state = IdxState }) ->
    Mod:lookup(Key, IdxState);

index_lookup(Key, #client_msstate { index_module = Mod,
                                    index_state  = IdxState }) ->
    Mod:lookup(Key, IdxState);

index_lookup(Key, #gc_state { index_module = Mod,
                              index_state  = IdxState }) ->
    Mod:lookup(Key, IdxState).
1559
%% Inserts Obj through the index module configured in the state.
index_insert(Obj, #msstate { index_module = Mod,
                             index_state  = IdxState }) ->
    Mod:insert(Obj, IdxState).
1562
%% Passes Obj to the update/2 function of the index module configured
%% in the state.
index_update(Obj, #msstate { index_module = Mod,
                             index_state  = IdxState }) ->
    Mod:update(Obj, IdxState).
1565
%% Applies the field Updates to the index entry for Key, using the
%% index module and state carried by a server or GC state record.
index_update_fields(Key, Updates, #gc_state{ index_module = Mod,
                                             index_state  = IdxState }) ->
    Mod:update_fields(Key, Updates, IdxState);
index_update_fields(Key, Updates, #msstate{ index_module = Mod,
                                            index_state  = IdxState }) ->
    Mod:update_fields(Key, Updates, IdxState).
1572
%% Deletes the index entry for Key through the configured index module.
index_delete(Key, #msstate { index_module = Mod,
                             index_state  = IdxState }) ->
    Mod:delete(Key, IdxState).
1575
%% Passes Obj to the delete_object/2 function of the index module
%% carried by the GC state.
index_delete_object(Obj, #gc_state{ index_module = Mod,
                                    index_state  = IdxState }) ->
    Mod:delete_object(Obj, IdxState).
1579
%% Delegates to the index module's
%% clean_up_temporary_reference_count_entries_without_file/1.
index_clean_up_temporary_reference_count_entries(
        #msstate { index_module = Mod,
                   index_state  = IdxState }) ->
    Mod:clean_up_temporary_reference_count_entries_without_file(IdxState).
1584
1585%%----------------------------------------------------------------------------
1586%% shutdown and recovery
1587%%----------------------------------------------------------------------------
1588
%% Decides whether the index can be recovered from a previous run.
%% Returns {Recovered, IndexState, ClientRefs}.
%%
%% A fresh index (and an empty client ref list) is produced when no
%% client refs are supplied, when recovery is not requested, when the
%% recovery terms cannot be read, when they disagree with the supplied
%% client refs or index module, or when the index module itself fails
%% to recover. Only when all of these checks pass is the index
%% recovered and {true, IndexState, ClientRefs} returned.
recover_index_and_client_refs(IndexModule, _Recover, undefined, Dir, _Name) ->
    {false, IndexModule:new(Dir), []};
recover_index_and_client_refs(IndexModule, false, _ClientRefs, Dir, Name) ->
    rabbit_log:warning("Message store ~tp: rebuilding indices from scratch", [Name]),
    {false, IndexModule:new(Dir), []};
recover_index_and_client_refs(IndexModule, true, ClientRefs, Dir, Name) ->
    %% Logs why recovery was abandoned, then starts with a new index.
    StartAfresh =
        fun (ErrorMsg, ErrorArgs) ->
                rabbit_log:warning("Message store ~tp : " ++ ErrorMsg ++ "~n"
                                   "rebuilding indices from scratch",
                                   [Name | ErrorArgs]),
                {false, IndexModule:new(Dir), []}
        end,
    case read_recovery_terms(Dir) of
        {true, Terms} ->
            SavedRefs   = proplists:get_value(client_refs, Terms, []),
            SavedModule = proplists:get_value(index_module, Terms),
            TermsMatch  = (lists:sort(ClientRefs) =:= lists:sort(SavedRefs)
                           andalso IndexModule =:= SavedModule),
            case TermsMatch of
                false ->
                    StartAfresh("recovery terms differ from present", []);
                true ->
                    case IndexModule:recover(Dir) of
                        {ok, IndexState} ->
                            {true, IndexState, ClientRefs};
                        {error, RecoverError} ->
                            StartAfresh("failed to recover index: ~p",
                                        [RecoverError])
                    end
            end;
        {false, ReadError} ->
            StartAfresh("failed to read recovery terms: ~p", [ReadError])
    end.
1618
%% Writes Terms to the ?CLEAN_FILENAME term file inside Dir.
store_recovery_terms(Terms, Dir) ->
    Path = filename:join(Dir, ?CLEAN_FILENAME),
    rabbit_file:write_term_file(Path, Terms).
1621
%% Reads the ?CLEAN_FILENAME term file from Dir and then deletes it.
%% Returns {true, Terms} only if both the read and the delete succeed;
%% otherwise {false, Error} with the error of the step that failed.
read_recovery_terms(Dir) ->
    Path = filename:join(Dir, ?CLEAN_FILENAME),
    case rabbit_file:read_term_file(Path) of
        {error, ReadError} ->
            {false, ReadError};
        {ok, Terms} ->
            case file:delete(Path) of
                {error, DeleteError} -> {false, DeleteError};
                ok                   -> {true,  Terms}
            end
    end.
1631
%% Dumps the ETS table Tid to ?FILE_SUMMARY_FILENAME inside Dir,
%% including the object count as extended info.
store_file_summary(Tid, Dir) ->
    Path    = filename:join(Dir, ?FILE_SUMMARY_FILENAME),
    Options = [{extended_info, [object_count]}],
    ets:tab2file(Tid, Path, Options).
1635
%% Returns {Recovered, Tid} for the file summary table. When recovery
%% is requested the table is loaded from ?FILE_SUMMARY_FILENAME in Dir
%% (and that file deleted); if loading fails, or recovery is not
%% requested, a new empty table is created instead.
recover_file_summary(false, _Dir) ->
    %% TODO: the only reason for this to be an *ordered*_set is so
    %% that a) maybe_compact can start a traversal from the eldest
    %% file, and b) build_index in fast recovery mode can easily
    %% identify the current file. It's awkward to have both that
    %% ordering and the left/right pointers in the entries - replacing
    %% the former with some additional bit of state would be easy, but
    %% ditching the latter would be neater.
    Tid = ets:new(rabbit_msg_store_file_summary,
                  [ordered_set, public, {keypos, #file_summary.file}]),
    {false, Tid};
recover_file_summary(true, Dir) ->
    Path = filename:join(Dir, ?FILE_SUMMARY_FILENAME),
    case ets:file2tab(Path) of
        {error, _Reason} ->
            recover_file_summary(false, Dir);
        {ok, Tid} ->
            ok = file:delete(Path),
            {true, Tid}
    end.
1653
%% Drains the generator Gen, starting from Seed, applying each
%% {MsgId, Delta, Next} it yields to the index until it yields
%% `finished'. A zero Delta is skipped. For a non-zero Delta, an
%% unknown MsgId gets a new entry with no file and ref_count = Delta;
%% a known MsgId has Delta added to its ref_count, and the entry is
%% deleted if the result is 0.
count_msg_refs(Gen, Seed, State) ->
    case Gen(Seed) of
        {MsgId, Delta, Next} when Delta =/= 0 ->
            Existing = index_lookup(MsgId, State),
            ok = case Existing of
                     #msg_location { ref_count = OldCount } ->
                         case OldCount + Delta of
                             0 ->
                                 index_delete(MsgId, State);
                             NewCount ->
                                 index_update(
                                   Existing #msg_location {
                                     ref_count = NewCount },
                                   State)
                         end;
                     not_found ->
                         index_insert(#msg_location { msg_id    = MsgId,
                                                      file      = undefined,
                                                      ref_count = Delta },
                                      State)
                 end,
            count_msg_refs(Gen, Next, State);
        {_MsgId, 0, Next} ->
            count_msg_refs(Gen, Next, State);
        finished ->
            ok
    end.
1678
%% Recovers every temporary (?FILE_EXTENSION_TMP) file found in Dir by
%% folding it back into its main store file, which must exist.
%% Returns true if there were no temporary files at all, false
%% otherwise.
recover_crashed_compactions(Dir) ->
    MainFiles = list_sorted_filenames(Dir, ?FILE_EXTENSION),
    TmpFiles  = list_sorted_filenames(Dir, ?FILE_EXTENSION_TMP),
    _ = [begin
             MainFile = filename:rootname(TmpFile) ++ ?FILE_EXTENSION,
             true = lists:member(MainFile, MainFiles),
             ok = recover_crashed_compaction(Dir, TmpFile, MainFile)
         end || TmpFile <- TmpFiles],
    case TmpFiles of
        [] -> true;
        _  -> false
    end.
1691
%% Appends the whole contents of the temporary file to the end of its
%% main file, then deletes the temporary file.
recover_crashed_compaction(Dir, TmpFileName, NonTmpRelatedFileName) ->
    %% A msg may legitimately occur several times within one file, so
    %% working out what the tmp file holds and where it came from is
    %% non-trivial. Recovering a crashed compaction implies the index
    %% is going to be rebuilt anyway, and the rebuild copes with
    %% duplicates. Appending the tmp file to its main file is thus the
    %% simplest and safest option.
    {ok, TmpHdl}  = open_file(Dir, TmpFileName, ?READ_MODE),
    MainModes     = ?READ_MODE ++ ?WRITE_MODE,
    {ok, MainHdl} = open_file(Dir, NonTmpRelatedFileName, MainModes),
    {ok, _EndPos} = file_handle_cache:position(MainHdl, eof),
    TmpSize = filelib:file_size(form_filename(Dir, TmpFileName)),
    {ok, TmpSize} = file_handle_cache:copy(TmpHdl, MainHdl, TmpSize),
    ok = file_handle_cache:close(MainHdl),
    ok = file_handle_cache:delete(TmpHdl),
    ok.
1709
%% Opens File for reading and scans it with rabbit_msg_file:scan/4,
%% collecting entries through scan_fun/2; the scan result is returned
%% once the handle has been closed. A missing file yields {ok, [], 0}.
%% Any other open error yields
%% {error, {unable_to_scan_file, BaseName, Reason}}.
scan_file_for_valid_messages(File) ->
    case open_file(File, ?READ_MODE) of
        {error, enoent} ->
            {ok, [], 0};
        {error, Reason} ->
            {error, {unable_to_scan_file,
                     filename:basename(File),
                     Reason}};
        {ok, Hdl} ->
            Size   = filelib:file_size(File),
            Result = rabbit_msg_file:scan(Hdl, Size, fun scan_fun/2, []),
            ok = file_handle_cache:close(Hdl),
            Result
    end.
1722
%% Same as scan_file_for_valid_messages/1, for FileName inside Dir.
scan_file_for_valid_messages(Dir, FileName) ->
    Path = form_filename(Dir, FileName),
    scan_file_for_valid_messages(Path).
1725
%% Scan callback: drops the message body and prepends
%% {MsgId, TotalSize, Offset} to the accumulator.
scan_fun({MsgId, TotalSize, Offset, _Body}, Found) ->
    Entry = {MsgId, TotalSize, Offset},
    [Entry | Found].
1728
%% Takes the list in *ascending* order (i.e. eldest message
%% first). This is the opposite of what scan_file_for_valid_messages
%% produces. The list of msgs that is produced is youngest first.
%%
%% Starting from offset 0, skips the leading run of messages that
%% follow one another without a gap. Returns {Offset, Rest}: the
%% offset just past that run, and the messages left over from the
%% first gap onwards.
drop_contiguous_block_prefix(L) ->
    StartOffset = 0,
    drop_contiguous_block_prefix(L, StartOffset).
1733
%% Worker for drop_contiguous_block_prefix/1. While the head message
%% sits exactly at ExpectedOffset it is dropped and the expected
%% offset moves forward by its total_size. Stops at the first message
%% that does not, or when the list runs out.
drop_contiguous_block_prefix(Msgs, ExpectedOffset) ->
    case Msgs of
        [#msg_location { offset     = ExpectedOffset,
                         total_size = Size } | Rest] ->
            drop_contiguous_block_prefix(Rest, ExpectedOffset + Size);
        _GapOrEnd ->
            {ExpectedOffset, Msgs}
    end.
1743
%% Establishes the store totals and the current file at startup.
%% Returns {Offset, State1}.
%%
%% With `true' as first argument the file summary table is trusted:
%% it is folded over to accumulate sum_valid_data and sum_file_size,
%% and the last row visited supplies current_file and the returned
%% offset (its file_size).
%%
%% With `false' the message ref counts are first rebuilt from the
%% supplied generator, then the index is rebuilt by scanning the store
%% files found in Dir (or just the current file if there are none).
%% When files were found, each is afterwards passed to
%% delete_file_if_empty/2.
build_index(true, _StartupFunState,
            State = #msstate { file_summary_ets = SummaryTab }) ->
    Accumulate =
        fun (#file_summary { file             = File,
                             file_size        = FileSize,
                             valid_total_size = Valid },
             {_PrevOffset, Acc = #msstate { sum_valid_data = SumValid,
                                            sum_file_size  = SumFileSize }}) ->
                {FileSize, Acc #msstate {
                             sum_valid_data = SumValid + Valid,
                             sum_file_size  = SumFileSize + FileSize,
                             current_file   = File }}
        end,
    ets:foldl(Accumulate, {0, State}, SummaryTab);
build_index(false, {MsgRefDeltaGen, MsgRefDeltaGenInit},
            State = #msstate { dir = Dir }) ->
    rabbit_log:debug("Rebuilding message refcount...", []),
    ok = count_msg_refs(MsgRefDeltaGen, MsgRefDeltaGenInit, State),
    rabbit_log:debug("Done rebuilding message refcount", []),
    {ok, Gatherer} = gatherer:start_link(),
    FileNames = list_sorted_filenames(Dir, ?FILE_EXTENSION),
    case lists:map(fun filename_to_num/1, FileNames) of
        [] ->
            rebuild_index(Gatherer, [State #msstate.current_file], State);
        Files ->
            {Offset, Rebuilt} = rebuild_index(Gatherer, Files, State),
            Pruned = lists:foldl(fun delete_file_if_empty/2, Rebuilt, Files),
            {Offset, Pruned}
    end.
1771
%% Rebuilds the index entries for one store file and reports the
%% file's summary to the gatherer.
%%
%% The file is scanned, and every message whose index entry has no
%% file assigned yet is claimed for this file (file, offset and
%% total_size are written to the index). Other messages are ignored.
%% Left is the preceding file and Files the files that follow; the
%% first of Files becomes this file's right neighbour.
build_index_worker(Gatherer, State = #msstate { dir = Dir },
                   Left, File, Files) ->
    FileName = filenum_to_name(File),
    rabbit_log:debug("Rebuilding message location index from ~p (~B file(s) remaining)",
                     [form_filename(Dir, FileName), length(Files)]),
    {ok, Scanned, ScannedSize} =
        scan_file_for_valid_messages(Dir, FileName),
    Claim =
        fun ({MsgId, Size, Pos} = Entry, {Kept, KeptSize} = Acc) ->
                case index_lookup(MsgId, State) of
                    #msg_location { file = undefined } = Loc ->
                        ok = index_update(Loc #msg_location {
                                            file       = File,
                                            offset     = Pos,
                                            total_size = Size },
                                          State),
                        {[Entry | Kept], KeptSize + Size};
                    _ ->
                        Acc
                end
        end,
    {ValidMessages, ValidTotalSize} = lists:foldl(Claim, {[], 0}, Scanned),
    {Right, FileSize} =
        case {Files, ValidMessages} of
            {[Next | _], _} ->
                {Next, ScannedSize};
            %% if it's the last file, we'll truncate to remove any
            %% rubbish above the last valid message. This affects the
            %% file size.
            {[], []} ->
                {undefined, 0};
            {[], _} ->
                {_MsgId, LastSize, LastPos} = lists:last(ValidMessages),
                {undefined, LastPos + LastSize}
        end,
    Summary = #file_summary { file             = File,
                              valid_total_size = ValidTotalSize,
                              left             = Left,
                              right            = Right,
                              file_size        = FileSize,
                              locked           = false,
                              readers          = 0 },
    ok = gatherer:in(Gatherer, Summary),
    ok = gatherer:finish(Gatherer).
1815
%% Dispatches one build_index_worker/5 job per file to the worker
%% pool, in order, passing each file its predecessor as Left. Each job
%% links to the gatherer for the duration of its work. Once the file
%% list is exhausted the calling process exits with reason `normal'.
enqueue_build_index_workers(_Gatherer, _Left, [], _State) ->
    exit(normal);
enqueue_build_index_workers(Gatherer, Left, [File | Rest], State) ->
    Job = fun () ->
                  link(Gatherer),
                  ok = build_index_worker(Gatherer, State,
                                          Left, File, Rest),
                  unlink(Gatherer),
                  ok
          end,
    ok = worker_pool:dispatch_sync(Job),
    enqueue_build_index_workers(Gatherer, File, Rest, State).
1828
%% Consumes file summaries from the gatherer until it reports `empty'.
%% Each summary is inserted into the file summary table and added to
%% the running sum_valid_data / sum_file_size totals.
%%
%% On `empty' the gatherer is stopped, temporary ref count entries are
%% cleaned out of the index, and {Offset, State1} is returned with
%% current_file set to LastFile. Offset is LastFile's file_size, or 0
%% if it has no summary row.
reduce_index(Gatherer, LastFile,
             State = #msstate { file_summary_ets = SummaryTab,
                                sum_valid_data   = SumValid,
                                sum_file_size    = SumFileSize }) ->
    case gatherer:out(Gatherer) of
        {value, #file_summary { valid_total_size = Valid,
                                file_size        = Size } = Summary} ->
            true = ets:insert_new(SummaryTab, Summary),
            State1 = State #msstate {
                       sum_valid_data = SumValid + Valid,
                       sum_file_size  = SumFileSize + Size },
            reduce_index(Gatherer, LastFile, State1);
        empty ->
            ok = gatherer:stop(Gatherer),
            ok = index_clean_up_temporary_reference_count_entries(State),
            Offset = case ets:lookup(SummaryTab, LastFile) of
                         [#file_summary { file_size = LastSize }] -> LastSize;
                         []                                       -> 0
                     end,
            {Offset, State #msstate { current_file = LastFile }}
    end.
1850
%% Rebuilds the index from the given (non-empty) list of files.
%% Registers one fork with the gatherer per file, spawns and monitors
%% a process that enqueues the per-file workers, then collects the
%% results via reduce_index/3 with the last file as the current one.
rebuild_index(Gatherer, Files, State) ->
    _ = [ok = gatherer:fork(Gatherer) || _File <- Files],
    Enqueue = fun () ->
                      enqueue_build_index_workers(Gatherer, undefined,
                                                  Files, State)
              end,
    Enqueuer = spawn(Enqueue),
    erlang:monitor(process, Enqueuer),
    LastFile = lists:last(Files),
    reduce_index(Gatherer, LastFile, State).
1862
1863%%----------------------------------------------------------------------------
1864%% garbage collection / compaction / aggregation -- internal
1865%%----------------------------------------------------------------------------
1866
%% Starts a new current file once Offset has reached the file size
%% limit; otherwise returns the state unchanged.
%%
%% Rolling over syncs the store, closes the current handle, opens the
%% next numbered file for writing, adds its (empty, unlocked) summary
%% row, points the old file's `right' at it, deletes rows matching
%% {'_', '_', 0} from the current-file cache table, and finally gives
%% maybe_compact/1 a chance to run.
maybe_roll_to_new_file(
  Offset,
  State = #msstate { dir                 = Dir,
                     current_file_handle = OldHdl,
                     current_file        = OldFile,
                     file_summary_ets    = SummaryTab,
                     cur_file_cache_ets  = CurCacheTab,
                     file_size_limit     = Limit })
  when Offset >= Limit ->
    Synced = internal_sync(State),
    ok = file_handle_cache:close(OldHdl),
    NewFile = OldFile + 1,
    {ok, NewHdl} = open_file(Dir, filenum_to_name(NewFile), ?WRITE_MODE),
    NewSummary = #file_summary { file             = NewFile,
                                 valid_total_size = 0,
                                 left             = OldFile,
                                 right            = undefined,
                                 file_size        = 0,
                                 locked           = false,
                                 readers          = 0 },
    true = ets:insert_new(SummaryTab, NewSummary),
    true = ets:update_element(SummaryTab, OldFile,
                              {#file_summary.right, NewFile}),
    true = ets:match_delete(CurCacheTab, {'_', '_', 0}),
    maybe_compact(Synced #msstate { current_file_handle = NewHdl,
                                    current_file        = NewFile });
maybe_roll_to_new_file(_Offset, State) ->
    State.
1895
%% Starts combining two files when the store holds enough garbage.
%%
%% Compaction is only considered when the total file size exceeds
%% twice the file size limit and the garbage fraction exceeds
%% ?GARBAGE_FRACTION. It is skipped when the summary table is empty,
%% when ?MAXIMUM_SIMULTANEOUS_GC_FILES or more files already have
%% pending GC entries, or when no suitable pair is found. Otherwise
%% both files get an empty pending entry, their handles are closed,
%% both are locked, and the GC process is asked to combine them.
maybe_compact(State = #msstate { sum_valid_data        = SumValid,
                                 sum_file_size         = SumFileSize,
                                 gc_pid                = GCPid,
                                 pending_gc_completion = Pending,
                                 file_summary_ets      = SummaryTab,
                                 file_size_limit       = Limit })
  when SumFileSize > 2 * Limit andalso
       (SumFileSize - SumValid) / SumFileSize > ?GARBAGE_FRACTION ->
    %% TODO: the algorithm here is sub-optimal - it may result in a
    %% complete traversal of FileSummaryEts.
    First = ets:first(SummaryTab),
    CannotStart = First =:= '$end_of_table' orelse
        maps:size(Pending) >= ?MAXIMUM_SIMULTANEOUS_GC_FILES,
    Candidate =
        case CannotStart of
            true  -> not_found;
            false -> find_files_to_combine(SummaryTab, Limit,
                                           ets:lookup(SummaryTab, First))
        end,
    case Candidate of
        not_found ->
            State;
        {Src, Dst} ->
            Pending1 = maps_store(Dst, [], maps_store(Src, [], Pending)),
            Closed   = close_handle(Src, close_handle(Dst, State)),
            Lock     = {#file_summary.locked, true},
            true = ets:update_element(SummaryTab, Src, Lock),
            true = ets:update_element(SummaryTab, Dst, Lock),
            ok = rabbit_msg_store_gc:combine(GCPid, Src, Dst),
            Closed #msstate { pending_gc_completion = Pending1 }
    end;
maybe_compact(State) ->
    State.
1930
%% Walks the chain of file summaries via their `right' pointers,
%% starting from the given single-row lookup result, looking for a
%% file (Dst) and its right neighbour (Src) that can be combined.
%%
%% A pair qualifies when Src itself has a right neighbour, both files
%% hold some valid data, their valid data together fits within
%% FileSizeLimit, and neither is locked. Returns {Src, Dst}, or
%% not_found once the walk reaches a file with no right neighbour.
find_files_to_combine(_FileSummaryEts, _FileSizeLimit,
                      [#file_summary { right = undefined }]) ->
    not_found;
find_files_to_combine(FileSummaryEts, FileSizeLimit,
                      [#file_summary { file             = Dst,
                                       valid_total_size = DstValid,
                                       right            = Src,
                                       locked           = DstLocked }]) ->
    SrcRows = ets:lookup(FileSummaryEts, Src),
    [#file_summary { file             = Src,
                     valid_total_size = SrcValid,
                     left             = Dst,
                     right            = SrcRight,
                     locked           = SrcLocked }] = SrcRows,
    case SrcRight of
        undefined ->
            not_found;
        _ ->
            Combinable = (DstValid + SrcValid =< FileSizeLimit) andalso
                (DstValid > 0) andalso (SrcValid > 0) andalso
                not (DstLocked orelse SrcLocked),
            if
                Combinable -> {Src, Dst};
                true       -> find_files_to_combine(
                                FileSummaryEts, FileSizeLimit, SrcRows)
            end
    end.
1957
%% Hands File over to the GC process for deletion when it holds no
%% valid data any more. The current file is always left alone.
%% Returns the (possibly updated) state.
delete_file_if_empty(File, State = #msstate { current_file = File }) ->
    State;
delete_file_if_empty(File, State = #msstate {
                             gc_pid                = GCPid,
                             file_summary_ets      = FileSummaryEts,
                             pending_gc_completion = Pending }) ->
    %% ASSERTION: the file is known and is not locked at this point
    [#file_summary { valid_total_size = ValidSize,
                     locked           = false }] =
        ets:lookup(FileSummaryEts, File),
    case ValidSize =:= 0 of
        false ->
            State;
        true ->
            %% The file_summary_ets entry for File is only locked
            %% here, not removed: there could still be readers which
            %% need it in order to decrement the readers count.
            true = ets:update_element(FileSummaryEts, File,
                                      {#file_summary.locked, true}),
            ok = rabbit_msg_store_gc:delete(GCPid, File),
            State1 = State #msstate {
                       pending_gc_completion =
                           maps_store(File, [], Pending) },
            close_handle(File, State1)
    end.
1979
%% Removes File from the file summary table and stitches its left and
%% right neighbours together. Returns ok.
cleanup_after_file_deletion(File,
                            #msstate { file_handles_ets = FileHandlesEts,
                                       file_summary_ets = FileSummaryEts,
                                       clients          = Clients }) ->
    %% Any client still holding an open fh for the file has to close
    %% it before using it again. The marking is done here, in the
    %% msg_store, rather than when the GC is started: if it were done
    %% at GC start, a client could notice the mark, close and reopen
    %% the fh while the GC is still waiting for readers to go away,
    %% i.e. before the GC has actually happened.
    true = mark_handle_to_close(Clients, FileHandlesEts, File, true),
    %% ASSERTION: the file is locked and has no readers left
    [#file_summary { left    = Prev,
                     right   = Next,
                     locked  = true,
                     readers = 0 }] = ets:lookup(FileSummaryEts, File),
    %% ASSERTION: the current file is never deleted, so there is
    %% always a right-hand neighbour
    true = (Next =/= undefined),
    true = ets:update_element(FileSummaryEts, Next,
                              {#file_summary.left, Prev}),
    %% keep the doubly linked list intact; when Prev is undefined,
    %% File was the eldest (left-most) file and there is nothing to
    %% re-link on that side
    true = (Prev =:= undefined) orelse
        ets:update_element(FileSummaryEts, Prev,
                           {#file_summary.right, Next}),
    true = ets:delete(FileSummaryEts, File),
    ok.
2008
2009%%----------------------------------------------------------------------------
2010%% garbage collection / compaction / aggregation -- external
2011%%----------------------------------------------------------------------------
2012
-spec combine_files(non_neg_integer(), non_neg_integer(), gc_state()) ->
                              {ok, deletion_thunk()} | {defer, [non_neg_integer()]}.

%% Combines Source into Destination provided neither file has active
%% readers; otherwise returns {defer, Files} listing the files which
%% still have readers. Both files must already be locked.
combine_files(Source, Destination,
              State = #gc_state { file_summary_ets = FileSummaryEts }) ->
    [#file_summary{locked = true} = SrcSummary] =
        ets:lookup(FileSummaryEts, Source),
    [#file_summary{locked = true} = DstSummary] =
        ets:lookup(FileSummaryEts, Destination),
    #file_summary{readers = SrcReaders} = SrcSummary,
    #file_summary{readers = DstReaders} = DstSummary,
    case SrcReaders =:= 0 andalso DstReaders =:= 0 of
        true ->
            {ok, do_combine_files(SrcSummary, DstSummary,
                                  Source, Destination, State)};
        false ->
            rabbit_log:debug("Asked to combine files ~p and ~p but they have active readers. Deferring.",
                             [Source, Destination]),
            %% report only the file(s) which still have readers
            Busy = [File || #file_summary{file = File, readers = Readers}
                                <- [SrcSummary, DstSummary],
                            Readers /= 0],
            {defer, Busy}
    end.
2036
%% Compacts Destination and then appends the valid messages of Source
%% to it, updating the index and the file summary along the way.
%% Notifies the msg_store server and returns the result of
%% safe_file_delete_fun/3 for Source.
do_combine_files(SourceSummary, DestinationSummary,
                 Source, Destination,
                 State = #gc_state { file_summary_ets = FileSummaryEts,
                                     file_handles_ets = FileHandlesEts,
                                     dir              = Dir,
                                     msg_store        = Server }) ->
    %% ASSERTIONS: both files are locked and reader-free, and they are
    %% neighbours (Destination immediately to the left of Source)
    #file_summary { locked           = true,
                    readers          = 0,
                    left             = Destination,
                    valid_total_size = SrcValid,
                    file_size        = SrcFileSize } = SourceSummary,
    #file_summary { locked           = true,
                    readers          = 0,
                    right            = Source,
                    valid_total_size = DstValid,
                    file_size        = DstFileSize } = DestinationSummary,

    SrcName      = filenum_to_name(Source),
    DstName      = filenum_to_name(Destination),
    {ok, SrcHdl} = open_file(Dir, SrcName, ?READ_AHEAD_MODE),
    {ok, DstHdl} = open_file(Dir, DstName, ?READ_AHEAD_MODE ++ ?WRITE_MODE),
    CombinedValid = SrcValid + DstValid,
    %% Two cases for Destination:
    %%  - all of its valid data forms one contiguous block at the
    %%    start of the file (nothing is left after dropping the
    %%    contiguous prefix): truncate straight away;
    %%  - otherwise everything past the contiguous prefix is first
    %%    written out to a tmp file, then Destination is truncated
    %%    and the tmp contents are copied back in.
    %% Either way the messages from Source are appended afterwards.
    {DstWorkList, DstValid} =
        load_and_vacuum_message_file(Destination, State),
    {DstContiguousTop, DstWorkListTail} =
        drop_contiguous_block_prefix(DstWorkList),
    case DstWorkListTail of
        [] ->
            ok = truncate_and_extend_file(
                   DstHdl, DstContiguousTop, CombinedValid);
        _ ->
            TmpName = filename:rootname(DstName) ++ ?FILE_EXTENSION_TMP,
            {ok, TmpHdl} =
                open_file(Dir, TmpName, ?READ_AHEAD_MODE ++ ?WRITE_MODE),
            ok = copy_messages(
                   DstWorkListTail, DstContiguousTop, DstValid,
                   DstHdl, TmpHdl, Destination, State),
            TmpSize = DstValid - DstContiguousTop,
            %% The tmp file now holds everything worth salvaging from
            %% Destination, and the index already reflects the
            %% compacted layout. Truncate Destination and put the tmp
            %% contents back at its end.
            {ok, 0} = file_handle_cache:position(TmpHdl, 0),
            ok = truncate_and_extend_file(
                   DstHdl, DstContiguousTop, CombinedValid),
            {ok, TmpSize} =
                file_handle_cache:copy(TmpHdl, DstHdl, TmpSize),
            %% the position in DstHdl should now be DstValid
            ok = file_handle_cache:sync(DstHdl),
            ok = file_handle_cache:delete(TmpHdl)
    end,
    {SrcWorkList, SrcValid} = load_and_vacuum_message_file(Source, State),
    ok = copy_messages(SrcWorkList, DstValid, CombinedValid,
                       SrcHdl, DstHdl, Destination, State),
    %% done with both handles
    ok = file_handle_cache:close(DstHdl),
    ok = file_handle_cache:close(SrcHdl),

    %% Destination's 'right' field is deliberately left untouched: it
    %% could be changing at the same time
    true = ets:update_element(
             FileSummaryEts, Destination,
             [{#file_summary.valid_total_size, CombinedValid},
              {#file_summary.file_size,        CombinedValid}]),

    Reclaimed = SrcFileSize + DstFileSize - CombinedValid,
    rabbit_log:debug("Combined segment files number ~p (source) and ~p (destination), reclaimed ~p bytes",
                     [Source, Destination, Reclaimed]),
    gen_server2:cast(Server, {combine_files, Source, Destination, Reclaimed}),
    safe_file_delete_fun(Source, Dir, FileHandlesEts).
2115
-spec delete_file(non_neg_integer(), gc_state()) -> {ok, deletion_thunk()} | {defer, [non_neg_integer()]}.

%% Deletes a locked file which holds no valid data, unless it still
%% has active readers, in which case {defer, [File]} is returned.
%% Any other state of the summary entry is a crash (case_clause).
delete_file(File, State = #gc_state { file_summary_ets = FileSummaryEts,
                                      file_handles_ets = FileHandlesEts,
                                      dir              = Dir,
                                      msg_store        = Server }) ->
    Summary = ets:lookup(FileSummaryEts, File),
    case Summary of
        [#file_summary{readers = ActiveReaders}] when ActiveReaders > 0 ->
            rabbit_log:debug("Asked to delete file ~p but it has active readers. Deferring.",
                             [File]),
            {defer, [File]};
        [#file_summary { readers          = 0,
                         locked           = true,
                         valid_total_size = 0,
                         file_size        = FileSize }] ->
            %% ASSERTION: vacuuming finds no live message in the file
            {[], 0} = load_and_vacuum_message_file(File, State),
            gen_server2:cast(Server, {delete_file, File, FileSize}),
            {ok, safe_file_delete_fun(File, Dir, FileHandlesEts)}
    end.
2135
%% Scans File and checks every message found against the index.
%% Index entries which point at this exact location and have a
%% ref_count of 0 are deleted from the index; entries which point at
%% this exact location with any other ref_count are kept. Returns
%% {Entries, TotalSize} for the kept entries.
load_and_vacuum_message_file(File, State = #gc_state { dir = Dir }) ->
    %% The scan yields messages with end-of-file at start-of-list...
    {ok, Scanned, _FileSize} =
        scan_file_for_valid_messages(Dir, filenum_to_name(File)),
    Vacuum =
        fun ({MsgId, TotalSize, Offset}, {Live, LiveSize} = Acc) ->
                case index_lookup(MsgId, State) of
                    #msg_location { file       = File,
                                    total_size = TotalSize,
                                    offset     = Offset,
                                    ref_count  = 0 } = Unreferenced ->
                        ok = index_delete_object(Unreferenced, State),
                        Acc;
                    #msg_location { file       = File,
                                    total_size = TotalSize,
                                    offset     = Offset } = Entry ->
                        {[Entry | Live], TotalSize + LiveSize};
                    _ ->
                        %% not in the index, or the index points at a
                        %% different location: ignore
                        Acc
                end
        end,
    %% ...and the fold reverses it, so the kept entries come out in
    %% ascending offset order
    lists:foldl(Vacuum, {[], 0}, Scanned).
2155
%% Copies the messages in WorkList from SourceHdl to DestinationHdl,
%% coalescing messages which are adjacent in the source into a single
%% copy operation, and points each message's index entry at its new
%% file and offset. Returns ok when the resulting destination offset
%% equals FinalOffset, and a {gc_error, Details} tuple otherwise.
copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl,
              Destination, State) ->
    CopyBlock =
        fun ({From, To}) ->
                Len = To - From,
                {ok, From} = file_handle_cache:position(SourceHdl, From),
                {ok, Len} =
                    file_handle_cache:copy(SourceHdl, DestinationHdl, Len)
        end,
    %% Fold step. CurOffset lives in the destination file; Offset,
    %% BlockStart and BlockEnd live in the source file.
    Step =
        fun (#msg_location { msg_id     = MsgId,
                             offset     = Offset,
                             total_size = TotalSize },
             {CurOffset, {BlockStart, BlockEnd} = Block}) ->
                %% record the message's new file and offset
                ok = index_update_fields(
                       MsgId,
                       [{#msg_location.file,   Destination},
                        {#msg_location.offset, CurOffset}],
                       State),
                NextOffset = CurOffset + TotalSize,
                NextBlock =
                    case BlockEnd of
                        undefined ->
                            %% only the case for the very first message
                            {Offset, Offset + TotalSize};
                        Offset ->
                            %% this message follows straight on from
                            %% the current block: grow the block
                            {BlockStart, BlockEnd + TotalSize};
                        _ ->
                            %% a gap: flush the block built so far and
                            %% start a new one at this message
                            CopyBlock(Block),
                            {Offset, Offset + TotalSize}
                    end,
                {NextOffset, NextBlock}
        end,
    Start = {InitOffset, {undefined, undefined}},
    case lists:foldl(Step, Start, WorkList) of
        {FinalOffset, LastBlock} ->
            case WorkList of
                [] -> ok;
                _  -> CopyBlock(LastBlock), %% the block still outstanding
                      ok = file_handle_cache:sync(DestinationHdl)
            end;
        {ActualOffset, _LastBlock} ->
            {gc_error, [{expected, FinalOffset},
                        {got, ActualOffset},
                        {destination, Destination}]}
    end.
2204
-spec force_recovery(file:filename(), server()) -> 'ok'.

%% Removes the clean-shutdown marker file of the given store (a
%% missing marker is fine) and then runs recover_crashed_compactions/1.
force_recovery(BaseDir, Store) ->
    StoreDir  = filename:join(BaseDir, atom_to_list(Store)),
    CleanFile = filename:join(StoreDir, ?CLEAN_FILENAME),
    case file:delete(CleanFile) of
        ok              -> ok;
        {error, enoent} -> ok
    end,
    %% NOTE(review): the marker is removed from the store's own
    %% directory, yet BaseDir (not the store directory) is what gets
    %% passed on here - confirm that this is intended.
    recover_crashed_compactions(BaseDir),
    ok.
2215
%% Applies Fun to the path of every file in Files, taken relative to
%% directory D. Each application must return ok. Returns the list of
%% results.
foreach_file(D, Fun, Files) ->
    lists:map(fun (File) -> ok = Fun(filename:join(D, File)) end, Files).
2218
%% Applies Fun to every file in Files, passing its path relative to
%% D1 and its path relative to D2. Each application must return ok.
%% Returns the list of results.
foreach_file(D1, D2, Fun, Files) ->
    lists:map(fun (File) ->
                      ok = Fun(filename:join(D1, File),
                               filename:join(D2, File))
              end, Files).
2221
-spec transform_dir(file:filename(), server(),
        fun ((any()) -> (rabbit_types:ok_or_error2(msg(), any())))) -> 'ok'.

%% Rewrites every message file of the given store by passing each
%% message through TransformFun. The transformed files are first
%% written to a temporary directory below the store directory and
%% then copied back over the originals. Throws
%% {error, transform_failed_previously} if that temporary directory
%% already exists.
transform_dir(BaseDir, Store, TransformFun) ->
    Dir    = filename:join(BaseDir, atom_to_list(Store)),
    TmpDir = filename:join(Dir, ?TRANSFORM_TMP),
    case filelib:is_dir(TmpDir) of
        false ->
            Transform = fun (Old, New) ->
                                transform_msg_file(Old, New, TransformFun)
                        end,
            Copy = fun (From, To) ->
                           {ok, _Bytes} = file:copy(From, To),
                           ok
                   end,
            Names = list_sorted_filenames(Dir, ?FILE_EXTENSION),
            %% 1. write the transformed files into TmpDir
            foreach_file(Dir, TmpDir, Transform, Names),
            %% 2. remove the originals
            foreach_file(Dir, fun file:delete/1, Names),
            %% 3. copy the transformed files back into place
            foreach_file(TmpDir, Dir, Copy, Names),
            %% 4. remove the temporary copies and their directory
            foreach_file(TmpDir, fun file:delete/1, Names),
            ok = file:del_dir(TmpDir);
        true ->
            throw({error, transform_failed_previously})
    end.
2239
%% Reads every message from FileOld, applies TransformFun to the
%% decoded message and appends the result to FileNew. A message which
%% decodes to <<>> is written through unchanged. Returns ok.
transform_msg_file(FileOld, FileNew, TransformFun) ->
    ok = rabbit_file:ensure_parent_dirs_exist(FileNew),
    {ok, OldHdl} = file_handle_cache:open_with_absolute_path(
                     FileOld, [raw, binary, read], []),
    {ok, NewHdl} = file_handle_cache:open_with_absolute_path(
                     FileNew, [raw, binary, write],
                     [{write_buffer, ?HANDLE_CACHE_BUFFER_SIZE}]),
    Rewrite =
        fun ({MsgId, _Size, _Offset, BinMsg}, ok) ->
                Result = case binary_to_term(BinMsg) of
                             <<>>    -> {ok, <<>>}; %% dying client marker
                             Decoded -> TransformFun(Decoded)
                         end,
                %% the transformation has to succeed for every message
                {ok, NewMsg} = Result,
                {ok, _} = rabbit_msg_file:append(NewHdl, MsgId, NewMsg),
                ok
        end,
    {ok, _Acc, _IgnoreSize} =
        rabbit_msg_file:scan(OldHdl, filelib:file_size(FileOld), Rewrite, ok),
    ok = file_handle_cache:close(OldHdl),
    ok = file_handle_cache:close(NewHdl),
    ok.
2261