%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates.  All rights reserved.
%%

-module(rabbit_queue_index).

-compile({inline, [segment_entry_count/0]}).

-export([erase/1, init/3, reset_state/1, recover/6,
         terminate/3, delete_and_terminate/1,
         pre_publish/7, flush_pre_publish_cache/2,
         publish/6, deliver/2, ack/2, sync/1, needs_sync/1, flush/1,
         read/3, next_segment_boundary/1, bounds/1, start/2, stop/1]).

-export([add_queue_ttl/0, avoid_zeroes/0, store_msg_size/0, store_msg/0]).
-export([scan_queue_segments/3, scan_queue_segments/4]).

%% Migrates from global to per-vhost message stores
-export([move_to_per_vhost_stores/1,
         update_recovery_term/2,
         read_global_recovery_terms/1,
         cleanup_global_recovery_terms/0]).

%% Used by rabbit_vhost to set the segment_entry_count.
-export([all_queue_directory_names/1]).

-define(CLEAN_FILENAME, "clean.dot").

%%----------------------------------------------------------------------------

%% The queue index is responsible for recording the order of messages
%% within a queue on disk. As such it contains records of messages
%% being published, delivered and acknowledged. The publish record
%% includes the sequence ID, message ID and a small quantity of
%% metadata about the message; the delivery and acknowledgement
%% records just contain the sequence ID. A publish record may also
%% contain the complete message if provided to publish/6; this allows
%% the message store to be avoided altogether for small messages. In
%% either case the publish record is stored in memory in the same
%% serialised format it will take on disk.
%%
%% Because the queue can decide at any point to send a queue entry to
%% disk, you cannot rely on publishes appearing in order. The only
%% thing you can rely on is a message being published, then delivered,
%% then ack'd.
%%
%% In order to be able to clean up ack'd messages, we write to segment
%% files. These files have a fixed number of entries: segment_entry_count()
%% publishes, delivers and acknowledgements. They are numbered, and so
%% it is known that the 0th segment contains messages 0 ->
%% segment_entry_count() - 1, the 1st segment contains messages
%% segment_entry_count() -> 2*segment_entry_count() - 1 and so on. As
%% such, in the segment files, we only refer to message sequence ids
%% by the LSBs as SeqId rem segment_entry_count(). This gives them a
%% fixed size.
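%%
%% For example, assuming a segment_entry_count() of 16384 (the maximum
%% permitted by the 14-bit relative sequence id; see ?REL_SEQ_BITS):
%%
%%   seq_id_to_seg_and_rel_seq_id(16383) -> {0, 16383}
%%   seq_id_to_seg_and_rel_seq_id(16384) -> {1, 0}
%%   reconstruct_seq_id(1, 5)            -> 16389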
%%
%% However, transient messages which are not sent to disk at any point
%% will cause gaps to appear in segment files. Therefore, we delete a
%% segment file whenever the number of publishes == number of acks
%% (note that although it is not fully enforced, it is assumed that a
%% message will never be ack'd before it is delivered, thus this test
%% also implies == number of delivers). In practice, this does not
%% cause disk churn in the pathological case because of the journal
%% and caching (see below).
%%
%% Because publishes, delivers and acks can occur all over the place,
%% we wish to avoid lots of seeking. Therefore we have a fixed sized
%% journal to which all actions are appended. When the number of
%% entries in this journal reaches max_journal_entries, the journal
%% entries are scattered out to their relevant files, and the journal
%% is truncated to zero size. Note that entries in the journal must
%% carry the full sequence id, thus the format of entries in the
%% journal is different to that in the segments.
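%%
%% For instance, a delivery record for SeqId 100 occupies the full 8
%% bytes in the journal, <<?DEL_JPREFIX:?JPREFIX_BITS, 100:?SEQ_BITS>>,
%% whereas in a segment file the same delivery is the 2-byte
%% <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
%%   (100 rem segment_entry_count()):?REL_SEQ_BITS>>.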
%%
%% The journal is also kept fully in memory, pre-segmented: the state
%% contains a mapping from segment numbers to state-per-segment. This
%% state is held for all segments which have been "seen": thus a
%% segment which has been read but has no pending entries in the
%% journal is still held in this mapping. Also note that a map is
%% used for this mapping rather than an array, because with an array
%% you would always have entries from 0. Actions are stored directly
%% in this state. Thus at the point of flushing the journal, firstly no
%% reading from disk is necessary, but secondly if the known number of
%% acks and publishes in a segment are equal, given the known state of
%% the segment file combined with the journal, no writing needs to be
%% done to the segment file either (in fact it is deleted if it exists
%% at all). This is safe given that the set of acks is a subset of the
%% set of publishes. When it is necessary to sync messages, it is
%% sufficient to fsync on the journal: when entries are distributed
%% from the journal to segment files, those segments appended to are
%% fsync'd prior to the journal being truncated.
%%
%% This module is also responsible for scanning the queue index files
%% and seeding the message store on start up.
%%
%% Note that in general, the representation of a message's state as
%% the tuple: {('no_pub'|{IsPersistent, Bin, MsgBin}),
%% ('del'|'no_del'), ('ack'|'no_ack')} is richer than strictly
%% necessary for most operations. However, for startup, and to ensure
%% the safe and correct combination of journal entries with entries
%% read from the segment on disk, this richer representation vastly
%% simplifies and clarifies the code.
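%%
%% For example, a message that is published, delivered and then ack'd
%% passes through the states
%%   {{IsPersistent, Bin, MsgBin}, no_del, no_ack} ->
%%   {{IsPersistent, Bin, MsgBin},    del, no_ack} ->
%%   entry removed (see action_to_entry/3)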
%%
%% For notes on Clean Shutdown and startup, see documentation in
%% rabbit_variable_queue.
%%
%%----------------------------------------------------------------------------

%% ---- Journal details ----

-define(JOURNAL_FILENAME, "journal.jif").
-define(QUEUE_NAME_STUB_FILE, ".queue_name").

-define(PUB_PERSIST_JPREFIX, 2#00).
-define(PUB_TRANS_JPREFIX,   2#01).
-define(DEL_JPREFIX,         2#10).
-define(ACK_JPREFIX,         2#11).
-define(JPREFIX_BITS, 2).
-define(SEQ_BYTES, 8).
-define(SEQ_BITS, ((?SEQ_BYTES * 8) - ?JPREFIX_BITS)).
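%% i.e. a del or ack journal record is exactly ?SEQ_BYTES = 8 bytes:
%% a 2-bit prefix followed by a 62-bit sequence id.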

%% ---- Segment details ----

-define(SEGMENT_EXTENSION, ".idx").

%% TODO: The segment size could be made configurable, but deriving all
%% the other values is quite hairy and quite possibly noticeably less
%% efficient, depending on how clever the compiler is when it comes to
%% binary generation/matching with constant vs variable lengths.

-define(REL_SEQ_BITS, 14).

%% seq only is binary 01 followed by 14 bits of rel seq id
%% (range: 0 - 16383)
-define(REL_SEQ_ONLY_PREFIX, 01).
-define(REL_SEQ_ONLY_PREFIX_BITS, 2).
-define(REL_SEQ_ONLY_RECORD_BYTES, 2).

%% publish record is binary 1 followed by a bit for is_persistent,
%% then 14 bits of rel seq id, 128 bits of md5sum msg id, 64 bits of
%% message expiry and 32 bits of size, followed by the embedded
%% message size and body (see ?EMBEDDED_SIZE_BITS below).
-define(PUB_PREFIX, 1).
-define(PUB_PREFIX_BITS, 1).

-define(EXPIRY_BYTES, 8).
-define(EXPIRY_BITS, (?EXPIRY_BYTES * 8)).
-define(NO_EXPIRY, 0).

-define(MSG_ID_BYTES, 16). %% md5sum is 128 bit or 16 bytes
-define(MSG_ID_BITS, (?MSG_ID_BYTES * 8)).

%% This is the size of the message body content, for stats
-define(SIZE_BYTES, 4).
-define(SIZE_BITS, (?SIZE_BYTES * 8)).

%% This is the size of the message record embedded in the queue
%% index. If 0, the message can be found in the message store.
-define(EMBEDDED_SIZE_BYTES, 4).
-define(EMBEDDED_SIZE_BITS, (?EMBEDDED_SIZE_BYTES * 8)).

%% 16 bytes for md5sum + 8 for expiry + 4 for size
-define(PUB_RECORD_BODY_BYTES, (?MSG_ID_BYTES + ?EXPIRY_BYTES + ?SIZE_BYTES)).
%% + 4 for the size of the embedded message (0 if stored externally)
-define(PUB_RECORD_SIZE_BYTES, (?PUB_RECORD_BODY_BYTES + ?EMBEDDED_SIZE_BYTES)).

%% + 2 bytes for the prefix bits and rel seq id
-define(PUB_RECORD_PREFIX_BYTES, 2).
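%% Putting the arithmetic together: a publish record in a segment file
%% occupies ?PUB_RECORD_PREFIX_BYTES + ?PUB_RECORD_BODY_BYTES +
%% ?EMBEDDED_SIZE_BYTES = 2 + 28 + 4 = 34 bytes, plus the embedded
%% message binary itself (if any).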

%% ---- misc ----

-define(PUB, {_, _, _}). %% {IsPersistent, Bin, MsgBin}

-define(READ_MODE, [binary, raw, read]).
-define(WRITE_MODE, [write | ?READ_MODE]).

%%----------------------------------------------------------------------------

-record(qistate, {
  %% queue directory where segment and journal files are stored
  dir,
  %% map of #segment records
  segments,
  %% journal file handle obtained from/used by file_handle_cache
  journal_handle,
  %% number of journal entries that have not yet been flushed
  dirty_count,
  %% a journal flush is forced once this many unflushed entries accumulate
  max_journal_entries,
  %% callback function invoked when a message is "handled"
  %% by the index and potentially can be confirmed to the publisher
  on_sync,
  on_sync_msg,
  %% set of IDs of unconfirmed [to publishers] messages
  unconfirmed,
  unconfirmed_msg,
  %% optimisation
  pre_publish_cache,
  %% optimisation
  delivered_cache,
  %% queue name resource record
  queue_name}).

-record(segment, {
  %% segment ID (an integer)
  num,
  %% segment file path (see also ?SEGMENT_EXTENSION)
  path,
  %% index operation log entries in this segment
  journal_entries,
  entries_to_segment,
  %% counter of unacknowledged messages
  unacked
}).

-include_lib("rabbit_common/include/rabbit.hrl").

%%----------------------------------------------------------------------------

-rabbit_upgrade({add_queue_ttl,  local, []}).
-rabbit_upgrade({avoid_zeroes,   local, [add_queue_ttl]}).
-rabbit_upgrade({store_msg_size, local, [avoid_zeroes]}).
-rabbit_upgrade({store_msg,      local, [store_msg_size]}).

-type hdl() :: ('undefined' | any()).
-type segment() :: ('undefined' |
                    #segment { num                :: non_neg_integer(),
                               path               :: file:filename(),
                               journal_entries    :: array:array(),
                               entries_to_segment :: array:array(),
                               unacked            :: non_neg_integer()
                             }).
-type seq_id() :: integer().
-type seg_map() :: {map(), [segment()]}.
-type on_sync_fun() :: fun ((gb_sets:set()) -> ok).
-type qistate() :: #qistate { dir                 :: file:filename(),
                              segments            :: 'undefined' | seg_map(),
                              journal_handle      :: hdl(),
                              dirty_count         :: integer(),
                              max_journal_entries :: non_neg_integer(),
                              on_sync             :: on_sync_fun(),
                              on_sync_msg         :: on_sync_fun(),
                              unconfirmed         :: gb_sets:set(),
                              unconfirmed_msg     :: gb_sets:set(),
                              pre_publish_cache   :: list(),
                              delivered_cache     :: list()
                            }.
-type contains_predicate() :: fun ((rabbit_types:msg_id()) -> boolean()).
-type walker(A) :: fun ((A) -> 'finished' |
                               {rabbit_types:msg_id(), non_neg_integer(), A}).
-type shutdown_terms() :: [term()] | 'non_clean_shutdown'.

%%----------------------------------------------------------------------------
%% public API
%%----------------------------------------------------------------------------

-spec erase(rabbit_amqqueue:name()) -> 'ok'.

erase(#resource{ virtual_host = VHost } = Name) ->
    VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
    #qistate { dir = Dir } = blank_state(VHostDir, Name),
    erase_index_dir(Dir).

%% used during variable queue purge when there are no pending acks

-spec reset_state(qistate()) -> qistate().

reset_state(#qistate{ queue_name     = Name,
                      dir            = Dir,
                      on_sync        = OnSyncFun,
                      on_sync_msg    = OnSyncMsgFun,
                      journal_handle = JournalHdl }) ->
    ok = case JournalHdl of
             undefined -> ok;
             _         -> file_handle_cache:close(JournalHdl)
         end,
    ok = erase_index_dir(Dir),
    blank_state_name_dir_funs(Name, Dir, OnSyncFun, OnSyncMsgFun).

-spec init(rabbit_amqqueue:name(),
                 on_sync_fun(), on_sync_fun()) -> qistate().

init(#resource{ virtual_host = VHost } = Name, OnSyncFun, OnSyncMsgFun) ->
    #{segment_entry_count := SegmentEntryCount} = rabbit_vhost:read_config(VHost),
    put(segment_entry_count, SegmentEntryCount),
    VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
    State = #qistate { dir = Dir } = blank_state(VHostDir, Name),
    false = rabbit_file:is_file(Dir), %% is_file == is file or dir
    State#qistate{on_sync     = OnSyncFun,
                  on_sync_msg = OnSyncMsgFun}.

-spec recover(rabbit_amqqueue:name(), shutdown_terms(), boolean(),
                    contains_predicate(),
                    on_sync_fun(), on_sync_fun()) ->
                        {'undefined' | non_neg_integer(),
                         'undefined' | non_neg_integer(), qistate()}.

recover(#resource{ virtual_host = VHost } = Name, Terms, MsgStoreRecovered,
        ContainsCheckFun, OnSyncFun, OnSyncMsgFun) ->
    #{segment_entry_count := SegmentEntryCount} = rabbit_vhost:read_config(VHost),
    put(segment_entry_count, SegmentEntryCount),
    VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
    State = blank_state(VHostDir, Name),
    State1 = State #qistate{on_sync     = OnSyncFun,
                            on_sync_msg = OnSyncMsgFun},
    CleanShutdown = Terms /= non_clean_shutdown,
    case CleanShutdown andalso MsgStoreRecovered of
        true  -> case proplists:get_value(segments, Terms, non_clean_shutdown) of
                     non_clean_shutdown -> init_dirty(false, ContainsCheckFun, State1);
                     RecoveredCounts    -> init_clean(RecoveredCounts, State1)
                 end;
        false -> init_dirty(CleanShutdown, ContainsCheckFun, State1)
    end.

-spec terminate(rabbit_types:vhost(), [any()], qistate()) -> qistate().

terminate(VHost, Terms, State = #qistate { dir = Dir }) ->
    {SegmentCounts, State1} = terminate(State),
    rabbit_recovery_terms:store(VHost, filename:basename(Dir),
                                [{segments, SegmentCounts} | Terms]),
    State1.

-spec delete_and_terminate(qistate()) -> qistate().

delete_and_terminate(State) ->
    {_SegmentCounts, State1 = #qistate { dir = Dir }} = terminate(State),
    ok = rabbit_file:recursive_delete([Dir]),
    State1.

pre_publish(MsgOrId, SeqId, MsgProps, IsPersistent, IsDelivered, JournalSizeHint,
            State = #qistate{pre_publish_cache = PPC,
                             delivered_cache   = DC}) ->
    State1 = maybe_needs_confirming(MsgProps, MsgOrId, State),

    {Bin, MsgBin} = create_pub_record_body(MsgOrId, MsgProps),

    PPC1 =
        [[<<(case IsPersistent of
                true  -> ?PUB_PERSIST_JPREFIX;
                false -> ?PUB_TRANS_JPREFIX
            end):?JPREFIX_BITS,
           SeqId:?SEQ_BITS, Bin/binary,
           (size(MsgBin)):?EMBEDDED_SIZE_BITS>>, MsgBin] | PPC],

    DC1 =
        case IsDelivered of
            true ->
                [SeqId | DC];
            false ->
                DC
        end,

    State2 = add_to_journal(SeqId, {IsPersistent, Bin, MsgBin}, State1),
    maybe_flush_pre_publish_cache(
                     JournalSizeHint,
                     State2#qistate{pre_publish_cache = PPC1,
                                    delivered_cache   = DC1}).

%% pre_publish_cache will always contain at least as many entries as
%% delivered_cache, so it is sufficient to check only the former here.
maybe_flush_pre_publish_cache(JournalSizeHint,
                              #qistate{pre_publish_cache = PPC} = State) ->
    case length(PPC) >= segment_entry_count() of
        true -> flush_pre_publish_cache(JournalSizeHint, State);
        false -> State
    end.

flush_pre_publish_cache(JournalSizeHint, State) ->
    State1 = flush_pre_publish_cache(State),
    State2 = flush_delivered_cache(State1),
    maybe_flush_journal(JournalSizeHint, State2).

flush_pre_publish_cache(#qistate{pre_publish_cache = []} = State) ->
    State;
flush_pre_publish_cache(State = #qistate{pre_publish_cache = PPC}) ->
    {JournalHdl, State1} = get_journal_handle(State),
    file_handle_cache_stats:update(queue_index_journal_write),
    ok = file_handle_cache:append(JournalHdl, lists:reverse(PPC)),
    State1#qistate{pre_publish_cache = []}.

flush_delivered_cache(#qistate{delivered_cache = []} = State) ->
    State;
flush_delivered_cache(State = #qistate{delivered_cache = DC}) ->
    State1 = deliver(lists:reverse(DC), State),
    State1#qistate{delivered_cache = []}.

-spec publish(rabbit_types:msg_id(), seq_id(),
                    rabbit_types:message_properties(), boolean(),
                    non_neg_integer(), qistate()) -> qistate().

publish(MsgOrId, SeqId, MsgProps, IsPersistent, JournalSizeHint, State) ->
    {JournalHdl, State1} =
        get_journal_handle(
          maybe_needs_confirming(MsgProps, MsgOrId, State)),
    file_handle_cache_stats:update(queue_index_journal_write),
    {Bin, MsgBin} = create_pub_record_body(MsgOrId, MsgProps),
    ok = file_handle_cache:append(
           JournalHdl, [<<(case IsPersistent of
                               true  -> ?PUB_PERSIST_JPREFIX;
                               false -> ?PUB_TRANS_JPREFIX
                           end):?JPREFIX_BITS,
                          SeqId:?SEQ_BITS, Bin/binary,
                          (size(MsgBin)):?EMBEDDED_SIZE_BITS>>, MsgBin]),
    maybe_flush_journal(
      JournalSizeHint,
      add_to_journal(SeqId, {IsPersistent, Bin, MsgBin}, State1)).

maybe_needs_confirming(MsgProps, MsgOrId,
        State = #qistate{unconfirmed     = UC,
                         unconfirmed_msg = UCM}) ->
    MsgId = case MsgOrId of
                #basic_message{id = Id} -> Id;
                Id when is_binary(Id)   -> Id
            end,
    ?MSG_ID_BYTES = size(MsgId),
    case {MsgProps#message_properties.needs_confirming, MsgOrId} of
      {true,  MsgId} -> UC1  = gb_sets:add_element(MsgId, UC),
                        State#qistate{unconfirmed     = UC1};
      {true,  _}     -> UCM1 = gb_sets:add_element(MsgId, UCM),
                        State#qistate{unconfirmed_msg = UCM1};
      {false, _}     -> State
    end.
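
%% To illustrate the function above: when the publisher has requested
%% confirms, a bare message id (message body kept in the message store)
%% is tracked in 'unconfirmed', whereas an embedded #basic_message{} is
%% tracked in 'unconfirmed_msg'; the corresponding on_sync/on_sync_msg
%% callback then fires for each set in notify_sync/1.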

-spec deliver([seq_id()], qistate()) -> qistate().

deliver(SeqIds, State) ->
    deliver_or_ack(del, SeqIds, State).

-spec ack([seq_id()], qistate()) -> qistate().

ack(SeqIds, State) ->
    deliver_or_ack(ack, SeqIds, State).

%% This is called when there are outstanding confirms or when the
%% queue is idle and the journal needs syncing (see needs_sync/1).

-spec sync(qistate()) -> qistate().

sync(State = #qistate { journal_handle = undefined }) ->
    State;
sync(State = #qistate { journal_handle = JournalHdl }) ->
    ok = file_handle_cache:sync(JournalHdl),
    notify_sync(State).

-spec needs_sync(qistate()) -> 'confirms' | 'other' | 'false'.

needs_sync(#qistate{journal_handle = undefined}) ->
    false;
needs_sync(#qistate{journal_handle  = JournalHdl,
                    unconfirmed     = UC,
                    unconfirmed_msg = UCM}) ->
    case gb_sets:is_empty(UC) andalso gb_sets:is_empty(UCM) of
        true  -> case file_handle_cache:needs_sync(JournalHdl) of
                     true  -> other;
                     false -> false
                 end;
        false -> confirms
    end.

-spec flush(qistate()) -> qistate().

flush(State = #qistate { dirty_count = 0 }) -> State;
flush(State)                                -> flush_journal(State).

-spec read(seq_id(), seq_id(), qistate()) ->
                     {[{rabbit_types:msg_id(), seq_id(),
                        rabbit_types:message_properties(),
                        boolean(), boolean()}], qistate()}.

read(StartEnd, StartEnd, State) ->
    {[], State};
read(Start, End, State = #qistate { segments = Segments,
                                    dir = Dir }) when Start =< End ->
    %% Start is inclusive, End is exclusive.
    LowerB = {StartSeg, _StartRelSeq} = seq_id_to_seg_and_rel_seq_id(Start),
    UpperB = {EndSeg,   _EndRelSeq}   = seq_id_to_seg_and_rel_seq_id(End - 1),
    {Messages, Segments1} =
        lists:foldr(fun (Seg, Acc) ->
                            read_bounded_segment(Seg, LowerB, UpperB, Acc, Dir)
                    end, {[], Segments}, lists:seq(StartSeg, EndSeg)),
    {Messages, State #qistate { segments = Segments1 }}.

-spec next_segment_boundary(seq_id()) -> seq_id().

next_segment_boundary(SeqId) ->
    {Seg, _RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId),
    reconstruct_seq_id(Seg + 1, 0).

-spec bounds(qistate()) ->
                       {non_neg_integer(), non_neg_integer(), qistate()}.

bounds(State = #qistate { segments = Segments }) ->
    %% This is not particularly efficient, but only gets invoked on
    %% queue initialisation.
    SegNums = lists:sort(segment_nums(Segments)),
    %% Don't bother trying to figure out the lowest seq_id, merely the
    %% seq_id of the start of the lowest segment. That seq_id may not
    %% actually exist, but that's fine. The important thing is that
    %% the segment exists and the seq_id reported is on a segment
    %% boundary.
    %%
    %% We also don't really care about the max seq_id. Just start the
    %% next segment: it makes life much easier.
    %%
    %% SegNums is sorted, ascending.
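    %%
    %% For example, with segment_entry_count() = 16384 and
    %% SegNums = [3, 5], we return LowSeqId = 3 * 16384 = 49152 and
    %% NextSeqId = (5 + 1) * 16384 = 98304.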
    {LowSeqId, NextSeqId} =
        case SegNums of
            []         -> {0, 0};
            [MinSeg|_] -> {reconstruct_seq_id(MinSeg, 0),
                           reconstruct_seq_id(1 + lists:last(SegNums), 0)}
        end,
    {LowSeqId, NextSeqId, State}.

-spec start(rabbit_types:vhost(), [rabbit_amqqueue:name()]) -> {[[any()]], {walker(A), A}}.

start(VHost, DurableQueueNames) ->
    ok = rabbit_recovery_terms:start(VHost),
    {DurableTerms, DurableDirectories} =
        lists:foldl(
          fun(QName, {RecoveryTerms, ValidDirectories}) ->
                  DirName = queue_name_to_dir_name(QName),
                  RecoveryInfo = case rabbit_recovery_terms:read(VHost, DirName) of
                                     {error, _}  -> non_clean_shutdown;
                                     {ok, Terms} -> Terms
                                 end,
                  {[RecoveryInfo | RecoveryTerms],
                   sets:add_element(DirName, ValidDirectories)}
          end, {[], sets:new()}, DurableQueueNames),
    %% Any queue directory we've not been asked to recover is considered garbage
    rabbit_file:recursive_delete(
      [DirName ||
        DirName <- all_queue_directory_names(VHost),
        not sets:is_element(filename:basename(DirName), DurableDirectories)]),
    rabbit_recovery_terms:clear(VHost),

    %% The backing queue interface requires that the queue recovery terms
    %% returned from start/2 are in the same order as DurableQueueNames
    OrderedTerms = lists:reverse(DurableTerms),
    {OrderedTerms, {fun queue_index_walker/1, {start, DurableQueueNames}}}.


stop(VHost) -> rabbit_recovery_terms:stop(VHost).

all_queue_directory_names(VHost) ->
    filelib:wildcard(filename:join([rabbit_vhost:msg_store_dir_path(VHost),
                                    "queues", "*"])).

all_queue_directory_names() ->
    filelib:wildcard(filename:join([rabbit_vhost:msg_store_dir_wildcard(),
                                    "queues", "*"])).

%%----------------------------------------------------------------------------
%% startup and shutdown
%%----------------------------------------------------------------------------

erase_index_dir(Dir) ->
    case rabbit_file:is_dir(Dir) of
        true  -> rabbit_file:recursive_delete([Dir]);
        false -> ok
    end.

blank_state(VHostDir, QueueName) ->
    Dir = queue_dir(VHostDir, QueueName),
    blank_state_name_dir_funs(QueueName,
                         Dir,
                         fun (_) -> ok end,
                         fun (_) -> ok end).

queue_dir(VHostDir, QueueName) ->
    %% Queue directory is
    %% {node_database_dir}/msg_stores/vhosts/{vhost}/queues/{queue}
    QueueDir = queue_name_to_dir_name(QueueName),
    filename:join([VHostDir, "queues", QueueDir]).

queue_name_to_dir_name(#resource { kind = queue,
                                   virtual_host = VHost,
                                   name = QName }) ->
    <<Num:128>> = erlang:md5(<<"queue", VHost/binary, QName/binary>>),
    rabbit_misc:format("~.36B", [Num]).

queue_name_to_dir_name_legacy(Name = #resource { kind = queue }) ->
    <<Num:128>> = erlang:md5(term_to_binary_compat:term_to_binary_1(Name)),
    rabbit_misc:format("~.36B", [Num]).

queues_base_dir() ->
    rabbit_mnesia:dir().

blank_state_name_dir_funs(Name, Dir, OnSyncFun, OnSyncMsgFun) ->
    {ok, MaxJournal} =
        application:get_env(rabbit, queue_index_max_journal_entries),
    #qistate { dir                 = Dir,
               segments            = segments_new(),
               journal_handle      = undefined,
               dirty_count         = 0,
               max_journal_entries = MaxJournal,
               on_sync             = OnSyncFun,
               on_sync_msg         = OnSyncMsgFun,
               unconfirmed         = gb_sets:new(),
               unconfirmed_msg     = gb_sets:new(),
               pre_publish_cache   = [],
               delivered_cache     = [],
               queue_name          = Name }.

init_clean(RecoveredCounts, State) ->
    %% Load the journal. Since this is a clean recovery this (almost)
    %% gets us back to where we were on shutdown.
    State1 = #qistate { dir = Dir, segments = Segments } = load_journal(State),
    %% The journal loading only creates records for segments touched
    %% by the journal, and the counts are based on the journal entries
    %% only. We need *complete* counts for *all* segments. By an
    %% amazing coincidence we stored that information on shutdown.
    Segments1 =
        lists:foldl(
          fun ({Seg, UnackedCount}, SegmentsN) ->
                  Segment = segment_find_or_new(Seg, Dir, SegmentsN),
                  segment_store(Segment #segment { unacked = UnackedCount },
                                SegmentsN)
          end, Segments, RecoveredCounts),
    %% the counts above include transient messages, which would be the
    %% wrong thing to return
    {undefined, undefined, State1 #qistate { segments = Segments1 }}.

init_dirty(CleanShutdown, ContainsCheckFun, State) ->
    %% Recover the journal completely. This will also load segments
    %% which have entries in the journal and remove duplicates. The
    %% counts will correctly reflect the combination of the segment
    %% and the journal.
    State1 = #qistate { dir = Dir, segments = Segments } =
        recover_journal(State),
    {Segments1, Count, Bytes, DirtyCount} =
        %% Load each segment in turn and filter out messages that are
        %% not in the msg_store, by adding acks to the journal. These
        %% acks only go to the RAM journal as it doesn't matter if we
        %% lose them. Also mark delivered if not clean shutdown. Also
        %% find the number of unacked messages. Also accumulate the
        %% dirty count here, so we can call maybe_flush_journal below
        %% and avoid unnecessary file system operations.
        lists:foldl(
          fun (Seg, {Segments2, CountAcc, BytesAcc, DirtyCount}) ->
                  {{Segment = #segment { unacked = UnackedCount }, Dirty},
                   UnackedBytes} =
                      recover_segment(ContainsCheckFun, CleanShutdown,
                                      segment_find_or_new(Seg, Dir, Segments2),
                                      State1#qistate.max_journal_entries),
                  {segment_store(Segment, Segments2),
                   CountAcc + UnackedCount,
                   BytesAcc + UnackedBytes, DirtyCount + Dirty}
          end, {Segments, 0, 0, 0}, all_segment_nums(State1)),
    State2 = maybe_flush_journal(State1 #qistate { segments = Segments1,
                                                   dirty_count = DirtyCount }),
    {Count, Bytes, State2}.

terminate(State = #qistate { journal_handle = JournalHdl,
                             segments = Segments }) ->
    ok = case JournalHdl of
             undefined -> ok;
             _         -> file_handle_cache:close(JournalHdl)
         end,
    SegmentCounts =
        segment_fold(
          fun (#segment { num = Seg, unacked = UnackedCount }, Acc) ->
                  [{Seg, UnackedCount} | Acc]
          end, [], Segments),
    {SegmentCounts, State #qistate { journal_handle = undefined,
                                     segments = undefined }}.

recover_segment(ContainsCheckFun, CleanShutdown,
                Segment = #segment { journal_entries = JEntries }, MaxJournal) ->
    {SegEntries, UnackedCount} = load_segment(false, Segment),
    {SegEntries1, UnackedCountDelta} =
        segment_plus_journal(SegEntries, JEntries),
    array:sparse_foldl(
      fun (RelSeq, {{IsPersistent, Bin, MsgBin}, Del, no_ack},
           {SegmentAndDirtyCount, Bytes}) ->
              {MsgOrId, MsgProps} = parse_pub_record_body(Bin, MsgBin),
              {recover_message(ContainsCheckFun(MsgOrId), CleanShutdown,
                               Del, RelSeq, SegmentAndDirtyCount, MaxJournal),
               Bytes + case IsPersistent of
                           true  -> MsgProps#message_properties.size;
                           false -> 0
                       end}
      end,
      {{Segment #segment { unacked = UnackedCount + UnackedCountDelta }, 0}, 0},
      SegEntries1).

recover_message( true,  true,   _Del, _RelSeq, SegmentAndDirtyCount, _MaxJournal) ->
    SegmentAndDirtyCount;
recover_message( true, false,    del, _RelSeq, SegmentAndDirtyCount, _MaxJournal) ->
    SegmentAndDirtyCount;
recover_message( true, false, no_del,  RelSeq, {Segment, _DirtyCount}, MaxJournal) ->
    %% force a flush of the segment
    {add_to_journal(RelSeq, del, Segment), MaxJournal + 1};
recover_message(false,     _,    del,  RelSeq, {Segment, DirtyCount}, _MaxJournal) ->
    {add_to_journal(RelSeq, ack, Segment), DirtyCount + 1};
recover_message(false,     _, no_del,  RelSeq, {Segment, DirtyCount}, _MaxJournal) ->
    {add_to_journal(RelSeq, ack,
                    add_to_journal(RelSeq, del, Segment)),
     DirtyCount + 2}.
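
%% To illustrate the clauses above: a message still present in the
%% message store that was neither delivered nor ack'd before an unclean
%% shutdown gets a 'del' added to the journal and forces a segment
%% flush, while a message missing from the store is ack'd (and del'd
%% first if necessary) so that it will be cleaned up.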

%%----------------------------------------------------------------------------
%% msg store startup delta function
%%----------------------------------------------------------------------------

queue_index_walker({start, DurableQueues}) when is_list(DurableQueues) ->
    {ok, Gatherer} = gatherer:start_link(),
    [begin
         ok = gatherer:fork(Gatherer),
         ok = worker_pool:submit_async(
                fun () -> link(Gatherer),
                          ok = queue_index_walker_reader(QueueName, Gatherer),
                          unlink(Gatherer),
                          ok
                end)
     end || QueueName <- DurableQueues],
    queue_index_walker({next, Gatherer});

queue_index_walker({next, Gatherer}) when is_pid(Gatherer) ->
    case gatherer:out(Gatherer) of
        empty ->
            ok = gatherer:stop(Gatherer),
            finished;
        {value, {MsgId, Count}} ->
            {MsgId, Count, {next, Gatherer}}
    end.

queue_index_walker_reader(QueueName, Gatherer) ->
    ok = scan_queue_segments(
           fun (_SeqId, MsgId, _MsgProps, true, _IsDelivered, no_ack, ok)
                 when is_binary(MsgId) ->
                   gatherer:sync_in(Gatherer, {MsgId, 1});
               (_SeqId, _MsgId, _MsgProps, _IsPersistent, _IsDelivered,
                _IsAcked, Acc) ->
                   Acc
           end, ok, QueueName),
    ok = gatherer:finish(Gatherer).

scan_queue_segments(Fun, Acc, #resource{ virtual_host = VHost } = QueueName) ->
    %% Set the segment_entry_count for this worker process.
    #{segment_entry_count := SegmentEntryCount} = rabbit_vhost:read_config(VHost),
    put(segment_entry_count, SegmentEntryCount),
    VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
    scan_queue_segments(Fun, Acc, VHostDir, QueueName).

scan_queue_segments(Fun, Acc, VHostDir, QueueName) ->
    State = #qistate { segments = Segments, dir = Dir } =
        recover_journal(blank_state(VHostDir, QueueName)),
    Result = lists:foldr(
      fun (Seg, AccN) ->
              segment_entries_foldr(
                fun (RelSeq, {{MsgOrId, MsgProps, IsPersistent},
                              IsDelivered, IsAcked}, AccM) ->
                        Fun(reconstruct_seq_id(Seg, RelSeq), MsgOrId, MsgProps,
                            IsPersistent, IsDelivered, IsAcked, AccM)
                end, AccN, segment_find_or_new(Seg, Dir, Segments))
      end, Acc, all_segment_nums(State)),
    {_SegmentCounts, _State} = terminate(State),
    Result.

%%----------------------------------------------------------------------------
%% expiry/binary manipulation
%%----------------------------------------------------------------------------

create_pub_record_body(MsgOrId, #message_properties { expiry = Expiry,
                                                      size   = Size }) ->
    ExpiryBin = expiry_to_binary(Expiry),
    case MsgOrId of
        MsgId when is_binary(MsgId) ->
            {<<MsgId/binary, ExpiryBin/binary, Size:?SIZE_BITS>>, <<>>};
        #basic_message{id = MsgId} ->
            MsgBin = term_to_binary(MsgOrId),
            {<<MsgId/binary, ExpiryBin/binary, Size:?SIZE_BITS>>, MsgBin}
    end.

expiry_to_binary(undefined) -> <<?NO_EXPIRY:?EXPIRY_BITS>>;
expiry_to_binary(Expiry)    -> <<Expiry:?EXPIRY_BITS>>.

parse_pub_record_body(<<MsgIdNum:?MSG_ID_BITS, Expiry:?EXPIRY_BITS,
                        Size:?SIZE_BITS>>, MsgBin) ->
    %% workaround for binary data fragmentation. See
    %% rabbit_msg_file:read_next/2
    <<MsgId:?MSG_ID_BYTES/binary>> = <<MsgIdNum:?MSG_ID_BITS>>,
    Props = #message_properties{expiry = case Expiry of
                                             ?NO_EXPIRY -> undefined;
                                             X          -> X
                                         end,
                                size   = Size},
    case MsgBin of
        <<>> -> {MsgId, Props};
        _    -> Msg = #basic_message{id = MsgId} = binary_to_term(MsgBin),
                {Msg, Props}
    end.
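
%% A round trip through the two functions above (sketch; MsgId here is
%% any 16-byte binary):
%%
%%   Props = #message_properties{expiry = undefined, size = 100},
%%   {Bin, <<>>} = create_pub_record_body(MsgId, Props),
%%   {MsgId, Props} = parse_pub_record_body(Bin, <<>>).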

%%----------------------------------------------------------------------------
%% journal manipulation
%%----------------------------------------------------------------------------

add_to_journal(SeqId, Action, State = #qistate { dirty_count = DCount,
                                                 segments = Segments,
                                                 dir = Dir }) ->
    {Seg, RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId),
    Segment = segment_find_or_new(Seg, Dir, Segments),
    Segment1 = add_to_journal(RelSeq, Action, Segment),
    State #qistate { dirty_count = DCount + 1,
                     segments = segment_store(Segment1, Segments) };

add_to_journal(RelSeq, Action,
               Segment = #segment { journal_entries = JEntries,
                                    entries_to_segment = EToSeg,
                                    unacked = UnackedCount }) ->

    {Fun, Entry} = action_to_entry(RelSeq, Action, JEntries),

    {JEntries1, EToSeg1} =
        case Fun of
            set ->
                {array:set(RelSeq, Entry, JEntries),
                 array:set(RelSeq, entry_to_segment(RelSeq, Entry, []),
                           EToSeg)};
            reset ->
                {array:reset(RelSeq, JEntries),
                 array:reset(RelSeq, EToSeg)}
        end,

    Segment #segment {
      journal_entries = JEntries1,
      entries_to_segment = EToSeg1,
      unacked = UnackedCount + case Action of
                                   ?PUB -> +1;
                                   del  ->  0;
                                   ack  -> -1
                               end}.

action_to_entry(RelSeq, Action, JEntries) ->
    case array:get(RelSeq, JEntries) of
        undefined ->
            {set,
             case Action of
                 ?PUB -> {Action, no_del, no_ack};
                 del  -> {no_pub,    del, no_ack};
                 ack  -> {no_pub, no_del,    ack}
             end};
        ({Pub,    no_del, no_ack}) when Action == del ->
            {set, {Pub,    del, no_ack}};
        ({no_pub,    del, no_ack}) when Action == ack ->
            {set, {no_pub, del,    ack}};
        ({?PUB,      del, no_ack}) when Action == ack ->
            {reset, none}
    end.

maybe_flush_journal(State) ->
    maybe_flush_journal(infinity, State).

maybe_flush_journal(Hint, State = #qistate { dirty_count = DCount,
                                             max_journal_entries = MaxJournal })
  when DCount > MaxJournal orelse (Hint =/= infinity andalso DCount > Hint) ->
    flush_journal(State);
maybe_flush_journal(_Hint, State) ->
    State.

flush_journal(State = #qistate { segments = Segments }) ->
    Segments1 =
        segment_fold(
          fun (#segment { unacked = 0, path = Path }, SegmentsN) ->
                  case rabbit_file:is_file(Path) of
                      true  -> ok = rabbit_file:delete(Path);
                      false -> ok
                  end,
                  SegmentsN;
              (#segment {} = Segment, SegmentsN) ->
                  segment_store(append_journal_to_segment(Segment), SegmentsN)
          end, segments_new(), Segments),
    {JournalHdl, State1} =
        get_journal_handle(State #qistate { segments = Segments1 }),
    ok = file_handle_cache:clear(JournalHdl),
    notify_sync(State1 #qistate { dirty_count = 0 }).

append_journal_to_segment(#segment { journal_entries = JEntries,
                                     entries_to_segment = EToSeg,
                                     path = Path } = Segment) ->
    case array:sparse_size(JEntries) of
        0 -> Segment;
        _ ->
            file_handle_cache_stats:update(queue_index_write),

            {ok, Hdl} = file_handle_cache:open_with_absolute_path(
                          Path, ?WRITE_MODE,
                          [{write_buffer, infinity}]),
            %% the file_handle_cache also does a list reverse, so this
            %% might not be required here; but since we were previously
            %% doing a sparse_foldr, a lists:reverse/1 seems the
            %% correct thing to do for now.
            file_handle_cache:append(Hdl, lists:reverse(array:to_list(EToSeg))),
            ok = file_handle_cache:close(Hdl),
            Segment #segment { journal_entries    = array_new(),
                               entries_to_segment = array_new([]) }
    end.

get_journal_handle(State = #qistate { journal_handle = undefined,
                                      dir = Dir,
                                      queue_name = Name }) ->
    Path = filename:join(Dir, ?JOURNAL_FILENAME),
    ok = rabbit_file:ensure_dir(Path),
    ok = ensure_queue_name_stub_file(Dir, Name),
    {ok, Hdl} = file_handle_cache:open_with_absolute_path(
                  Path, ?WRITE_MODE, [{write_buffer, infinity}]),
    {Hdl, State #qistate { journal_handle = Hdl }};
get_journal_handle(State = #qistate { journal_handle = Hdl }) ->
    {Hdl, State}.

%% Loading Journal. This isn't idempotent and will mess up the counts
%% if you call it more than once on the same state. Assumes the counts
%% are 0 to start with.
load_journal(State = #qistate { dir = Dir }) ->
    Path = filename:join(Dir, ?JOURNAL_FILENAME),
    case rabbit_file:is_file(Path) of
        true  -> {JournalHdl, State1} = get_journal_handle(State),
                 Size = rabbit_file:file_size(Path),
                 {ok, 0} = file_handle_cache:position(JournalHdl, 0),
                 {ok, JournalBin} = file_handle_cache:read(JournalHdl, Size),
                 parse_journal_entries(JournalBin, State1);
        false -> State
    end.

%% ditto
recover_journal(State) ->
    State1 = #qistate { segments = Segments } = load_journal(State),
    Segments1 =
        segment_map(
          fun (Segment = #segment { journal_entries = JEntries,
                                    entries_to_segment = EToSeg,
                                    unacked = UnackedCountInJournal }) ->
                  %% We want to keep ack'd entries in so that we can
                  %% remove them if duplicates are in the journal. The
                  %% counts here are purely from the segment itself.
                  {SegEntries, UnackedCountInSeg} = load_segment(true, Segment),
                  {JEntries1, EToSeg1, UnackedCountDuplicates} =
                      journal_minus_segment(JEntries, EToSeg, SegEntries),
                  Segment #segment { journal_entries = JEntries1,
                                     entries_to_segment = EToSeg1,
                                     unacked = (UnackedCountInJournal +
                                                    UnackedCountInSeg -
                                                    UnackedCountDuplicates) }
          end, Segments),
    State1 #qistate { segments = Segments1 }.

parse_journal_entries(<<?DEL_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS,
                        Rest/binary>>, State) ->
    parse_journal_entries(Rest, add_to_journal(SeqId, del, State));

parse_journal_entries(<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS,
                        Rest/binary>>, State) ->
    parse_journal_entries(Rest, add_to_journal(SeqId, ack, State));
parse_journal_entries(<<0:?JPREFIX_BITS, 0:?SEQ_BITS,
                        0:?PUB_RECORD_SIZE_BYTES/unit:8, _/binary>>, State) ->
    %% A journal entry composed only of zeroes was probably produced
    %% during a dirty shutdown, so stop reading here.
    State;
parse_journal_entries(<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS,
                        Bin:?PUB_RECORD_BODY_BYTES/binary,
                        MsgSize:?EMBEDDED_SIZE_BITS, MsgBin:MsgSize/binary,
                        Rest/binary>>, State) ->
    IsPersistent = case Prefix of
                       ?PUB_PERSIST_JPREFIX -> true;
                       ?PUB_TRANS_JPREFIX   -> false
                   end,
    parse_journal_entries(
      Rest, add_to_journal(SeqId, {IsPersistent, Bin, MsgBin}, State));
parse_journal_entries(_ErrOrEoF, State) ->
    State.

deliver_or_ack(_Kind, [], State) ->
    State;
deliver_or_ack(Kind, SeqIds, State) ->
    JPrefix = case Kind of ack -> ?ACK_JPREFIX; del -> ?DEL_JPREFIX end,
    {JournalHdl, State1} = get_journal_handle(State),
    file_handle_cache_stats:update(queue_index_journal_write),
    ok = file_handle_cache:append(
           JournalHdl,
           [<<JPrefix:?JPREFIX_BITS, SeqId:?SEQ_BITS>> || SeqId <- SeqIds]),
    maybe_flush_journal(lists:foldl(fun (SeqId, StateN) ->
                                            add_to_journal(SeqId, Kind, StateN)
                                    end, State1, SeqIds)).

notify_sync(State = #qistate{unconfirmed     = UC,
                             unconfirmed_msg = UCM,
                             on_sync         = OnSyncFun,
                             on_sync_msg     = OnSyncMsgFun}) ->
    State1 = case gb_sets:is_empty(UC) of
                 true  -> State;
                 false -> OnSyncFun(UC),
                          State#qistate{unconfirmed = gb_sets:new()}
             end,
    case gb_sets:is_empty(UCM) of
        true  -> State1;
        false -> OnSyncMsgFun(UCM),
                 State1#qistate{unconfirmed_msg = gb_sets:new()}
    end.

%%----------------------------------------------------------------------------
%% segment manipulation
%%----------------------------------------------------------------------------

seq_id_to_seg_and_rel_seq_id(SeqId) ->
    SegmentEntryCount = segment_entry_count(),
    { SeqId div SegmentEntryCount, SeqId rem SegmentEntryCount }.

reconstruct_seq_id(Seg, RelSeq) ->
    (Seg * segment_entry_count()) + RelSeq.

all_segment_nums(#qistate { dir = Dir, segments = Segments }) ->
    lists:sort(
      sets:to_list(
        lists:foldl(
          fun (SegName, Set) ->
                  sets:add_element(
                    list_to_integer(
                      lists:takewhile(fun (C) -> $0 =< C andalso C =< $9 end,
                                      SegName)), Set)
          end, sets:from_list(segment_nums(Segments)),
          rabbit_file:wildcard(".*\\" ++ ?SEGMENT_EXTENSION, Dir)))).

segment_find_or_new(Seg, Dir, Segments) ->
    case segment_find(Seg, Segments) of
        {ok, Segment} -> Segment;
        error         -> SegName = integer_to_list(Seg) ++ ?SEGMENT_EXTENSION,
                         Path = filename:join(Dir, SegName),
                         #segment { num                = Seg,
                                    path               = Path,
                                    journal_entries    = array_new(),
                                    entries_to_segment = array_new([]),
                                    unacked            = 0 }
    end.

segment_find(Seg, {_Segments, [Segment = #segment { num = Seg } |_]}) ->
    {ok, Segment}; %% 1 or (2, matches head)
segment_find(Seg, {_Segments, [_, Segment = #segment { num = Seg }]}) ->
    {ok, Segment}; %% 2, matches tail
segment_find(Seg, {Segments, _}) -> %% no match
    maps:find(Seg, Segments).

segment_store(Segment = #segment { num = Seg }, %% 1 or (2, matches head)
              {Segments, [#segment { num = Seg } | Tail]}) ->
    {Segments, [Segment | Tail]};
segment_store(Segment = #segment { num = Seg }, %% 2, matches tail
              {Segments, [SegmentA, #segment { num = Seg }]}) ->
    {Segments, [Segment, SegmentA]};
segment_store(Segment = #segment { num = Seg }, {Segments, []}) ->
    {maps:remove(Seg, Segments), [Segment]};
segment_store(Segment = #segment { num = Seg }, {Segments, [SegmentA]}) ->
    {maps:remove(Seg, Segments), [Segment, SegmentA]};
segment_store(Segment = #segment { num = Seg },
              {Segments, [SegmentA, SegmentB]}) ->
    {maps:put(SegmentB#segment.num, SegmentB, maps:remove(Seg, Segments)),
     [Segment, SegmentA]}.

segment_fold(Fun, Acc, {Segments, CachedSegments}) ->
    maps:fold(fun (_Seg, Segment, Acc1) -> Fun(Segment, Acc1) end,
              lists:foldl(Fun, Acc, CachedSegments), Segments).

segment_map(Fun, {Segments, CachedSegments}) ->
    {maps:map(fun (_Seg, Segment) -> Fun(Segment) end, Segments),
     lists:map(Fun, CachedSegments)}.

segment_nums({Segments, CachedSegments}) ->
    lists:map(fun (#segment { num = Num }) -> Num end, CachedSegments) ++
        maps:keys(Segments).

segments_new() ->
    {#{}, []}.
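
%% A sketch of how the {map, cached-list} pair evolves under
%% segment_store/2: the two most recently stored segments stay in the
%% list, everything else lives in the map (segment names below are
%% illustrative):
%%
%%   S0 = segments_new(),          %% {#{}, []}
%%   S1 = segment_store(SegA, S0), %% {#{}, [SegA]}
%%   S2 = segment_store(SegB, S1), %% {#{}, [SegB, SegA]}
%%   S3 = segment_store(SegC, S2), %% {#{NumA => SegA}, [SegC, SegB]}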

entry_to_segment(_RelSeq, {?PUB, del, ack}, Initial) ->
    Initial;
entry_to_segment(RelSeq, {Pub, Del, Ack}, Initial) ->
    %% NB: we are assembling the segment in reverse order here, so
    %% del/ack comes first.
    Buf1 = case {Del, Ack} of
               {no_del, no_ack} ->
                   Initial;
               _ ->
                   Binary = <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
                              RelSeq:?REL_SEQ_BITS>>,
                   case {Del, Ack} of
                       {del, ack} -> [[Binary, Binary] | Initial];
                       _          -> [Binary | Initial]
                   end
           end,
    case Pub of
        no_pub ->
            Buf1;
        {IsPersistent, Bin, MsgBin} ->
            [[<<?PUB_PREFIX:?PUB_PREFIX_BITS,
                (bool_to_int(IsPersistent)):1,
                RelSeq:?REL_SEQ_BITS, Bin/binary,
                (size(MsgBin)):?EMBEDDED_SIZE_BITS>>, MsgBin] | Buf1]
    end.

read_bounded_segment(Seg, {StartSeg, StartRelSeq}, {EndSeg, EndRelSeq},
                     {Messages, Segments}, Dir) ->
    Segment = segment_find_or_new(Seg, Dir, Segments),
    {segment_entries_foldr(
       fun (RelSeq, {{MsgOrId, MsgProps, IsPersistent}, IsDelivered, no_ack},
            Acc)
             when (Seg > StartSeg orelse StartRelSeq =< RelSeq) andalso
                  (Seg < EndSeg   orelse EndRelSeq   >= RelSeq) ->
               [{MsgOrId, reconstruct_seq_id(Seg, RelSeq), MsgProps,
                 IsPersistent, IsDelivered == del} | Acc];
           (_RelSeq, _Value, Acc) ->
               Acc
       end, Messages, Segment),
     segment_store(Segment, Segments)}.

segment_entries_foldr(Fun, Init,
                      Segment = #segment { journal_entries = JEntries }) ->
    {SegEntries, _UnackedCount} = load_segment(false, Segment),
    {SegEntries1, _UnackedCountD} = segment_plus_journal(SegEntries, JEntries),
    array:sparse_foldr(
      fun (RelSeq, {{IsPersistent, Bin, MsgBin}, Del, Ack}, Acc) ->
              {MsgOrId, MsgProps} = parse_pub_record_body(Bin, MsgBin),
              Fun(RelSeq, {{MsgOrId, MsgProps, IsPersistent}, Del, Ack}, Acc)
      end, Init, SegEntries1).

%% Loading segments
%%
%% Does not do any combining with the journal at all.
load_segment(KeepAcked, #segment { path = Path }) ->
    Empty = {array_new(), 0},
    case rabbit_file:is_file(Path) of
        false -> Empty;
        true  -> Size = rabbit_file:file_size(Path),
                 file_handle_cache_stats:update(queue_index_read),
                 {ok, Hdl} = file_handle_cache:open_with_absolute_path(
                               Path, ?READ_MODE, []),
                 {ok, 0} = file_handle_cache:position(Hdl, bof),
                 {ok, SegBin} = file_handle_cache:read(Hdl, Size),
                 ok = file_handle_cache:close(Hdl),
                 Res = parse_segment_entries(SegBin, KeepAcked, Empty),
                 Res
    end.

parse_segment_entries(<<?PUB_PREFIX:?PUB_PREFIX_BITS,
                        IsPersistNum:1, RelSeq:?REL_SEQ_BITS, Rest/binary>>,
                      KeepAcked, Acc) ->
    parse_segment_publish_entry(
      Rest, 1 == IsPersistNum, RelSeq, KeepAcked, Acc);
parse_segment_entries(<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
                       RelSeq:?REL_SEQ_BITS, Rest/binary>>, KeepAcked, Acc) ->
    parse_segment_entries(
      Rest, KeepAcked, add_segment_relseq_entry(KeepAcked, RelSeq, Acc));
parse_segment_entries(<<>>, _KeepAcked, Acc) ->
    Acc.

parse_segment_publish_entry(<<Bin:?PUB_RECORD_BODY_BYTES/binary,
                              MsgSize:?EMBEDDED_SIZE_BITS,
                              MsgBin:MsgSize/binary, Rest/binary>>,
                            IsPersistent, RelSeq, KeepAcked,
                            {SegEntries, Unacked}) ->
    Obj = {{IsPersistent, Bin, MsgBin}, no_del, no_ack},
    SegEntries1 = array:set(RelSeq, Obj, SegEntries),
    parse_segment_entries(Rest, KeepAcked, {SegEntries1, Unacked + 1});
parse_segment_publish_entry(Rest, _IsPersistent, _RelSeq, KeepAcked, Acc) ->
    parse_segment_entries(Rest, KeepAcked, Acc).

add_segment_relseq_entry(KeepAcked, RelSeq, {SegEntries, Unacked}) ->
    case array:get(RelSeq, SegEntries) of
        {Pub, no_del, no_ack} ->
            {array:set(RelSeq, {Pub, del, no_ack}, SegEntries), Unacked};
        {Pub, del, no_ack} when KeepAcked ->
            {array:set(RelSeq, {Pub, del, ack},    SegEntries), Unacked - 1};
        {_Pub, del, no_ack} ->
            {array:reset(RelSeq,                   SegEntries), Unacked - 1}
    end.

array_new() ->
    array_new(undefined).

array_new(Default) ->
    array:new([{default, Default}, fixed, {size, segment_entry_count()}]).

segment_entry_count() ->
    get(segment_entry_count).

bool_to_int(true ) -> 1;
bool_to_int(false) -> 0.

%%----------------------------------------------------------------------------
%% journal & segment combination
%%----------------------------------------------------------------------------

%% Combine what we have just read from a segment file with what we're
%% holding for that segment in memory. There must be no duplicates.
segment_plus_journal(SegEntries, JEntries) ->
    array:sparse_foldl(
      fun (RelSeq, JObj, {SegEntriesOut, AdditionalUnacked}) ->
              SegEntry = array:get(RelSeq, SegEntriesOut),
              {Obj, AdditionalUnackedDelta} =
                  segment_plus_journal1(SegEntry, JObj),
              {case Obj of
                   undefined -> array:reset(RelSeq, SegEntriesOut);
                   _         -> array:set(RelSeq, Obj, SegEntriesOut)
               end,
               AdditionalUnacked + AdditionalUnackedDelta}
      end, {SegEntries, 0}, JEntries).

%% Here, the result is a tuple with the first element containing the
%% item which we may be adding to (for items only in the journal),
%% modifying in (bits in both), or, when returning 'undefined',
%% erasing from (ack in journal, not segment) the segment array. The
%% other element of the tuple is the delta for AdditionalUnacked.
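%%
%% For example, a publish in the segment combined with a del+ack in
%% the journal yields 'undefined' (the entry is erased) and a delta of
%% -1, since one previously unacked message has gone away.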
segment_plus_journal1(undefined, {?PUB, no_del, no_ack} = Obj) ->
    {Obj, 1};
segment_plus_journal1(undefined, {?PUB, del, no_ack} = Obj) ->
    {Obj, 1};
segment_plus_journal1(undefined, {?PUB, del, ack}) ->
    {undefined, 0};

segment_plus_journal1({?PUB = Pub, no_del, no_ack}, {no_pub, del, no_ack}) ->
    {{Pub, del, no_ack}, 0};
segment_plus_journal1({?PUB, no_del, no_ack},       {no_pub, del, ack}) ->
    {undefined, -1};
segment_plus_journal1({?PUB, del, no_ack},          {no_pub, no_del, ack}) ->
    {undefined, -1}.

%% Remove from the journal entries for a segment, items that are
%% duplicates of entries found in the segment itself. Used on start up
%% to clean up the journal.
%%
%% We need to update the entries_to_segment since they are just a
%% cache of what's on the journal.
journal_minus_segment(JEntries, EToSeg, SegEntries) ->
    array:sparse_foldl(
      fun (RelSeq, JObj, {JEntriesOut, EToSegOut, UnackedRemoved}) ->
              SegEntry = array:get(RelSeq, SegEntries),
              {Obj, UnackedRemovedDelta} =
                  journal_minus_segment1(JObj, SegEntry),
              {JEntriesOut1, EToSegOut1} =
                  case Obj of
                      keep      ->
                          {JEntriesOut, EToSegOut};
                      undefined ->
                          {array:reset(RelSeq, JEntriesOut),
                           array:reset(RelSeq, EToSegOut)};
                      _         ->
                          {array:set(RelSeq, Obj, JEntriesOut),
                           array:set(RelSeq, entry_to_segment(RelSeq, Obj, []),
                                     EToSegOut)}
                  end,
               {JEntriesOut1, EToSegOut1, UnackedRemoved + UnackedRemovedDelta}
      end, {JEntries, EToSeg, 0}, JEntries).
1251
%% Here, the result is a tuple. The first element is 'keep' (the
%% journal entry is left untouched), 'undefined' (the entry is erased
%% from the journal array), or a replacement entry to store in its
%% place. The other element of the tuple is the delta for
%% UnackedRemoved.
1257
%% Both the same. The entry must contain at least the publish.
1259journal_minus_segment1({?PUB, _Del, no_ack} = Obj, Obj) ->
1260    {undefined, 1};
1261journal_minus_segment1({?PUB, _Del, ack} = Obj,    Obj) ->
1262    {undefined, 0};
1263
1264%% Just publish in journal
1265journal_minus_segment1({?PUB, no_del, no_ack},     undefined) ->
1266    {keep, 0};
1267
1268%% Publish and deliver in journal
1269journal_minus_segment1({?PUB, del, no_ack},        undefined) ->
1270    {keep, 0};
1271journal_minus_segment1({?PUB = Pub, del, no_ack},  {Pub, no_del, no_ack}) ->
1272    {{no_pub, del, no_ack}, 1};
1273
1274%% Publish, deliver and ack in journal
1275journal_minus_segment1({?PUB, del, ack},           undefined) ->
1276    {keep, 0};
1277journal_minus_segment1({?PUB = Pub, del, ack},     {Pub, no_del, no_ack}) ->
1278    {{no_pub, del, ack}, 1};
1279journal_minus_segment1({?PUB = Pub, del, ack},     {Pub, del, no_ack}) ->
1280    {{no_pub, no_del, ack}, 1};
1281
1282%% Just deliver in journal
1283journal_minus_segment1({no_pub, del, no_ack},      {?PUB, no_del, no_ack}) ->
1284    {keep, 0};
1285journal_minus_segment1({no_pub, del, no_ack},      {?PUB, del, no_ack}) ->
1286    {undefined, 0};
1287
1288%% Just ack in journal
1289journal_minus_segment1({no_pub, no_del, ack},      {?PUB, del, no_ack}) ->
1290    {keep, 0};
1291journal_minus_segment1({no_pub, no_del, ack},      {?PUB, del, ack}) ->
1292    {undefined, -1};
1293
1294%% Deliver and ack in journal
1295journal_minus_segment1({no_pub, del, ack},         {?PUB, no_del, no_ack}) ->
1296    {keep, 0};
1297journal_minus_segment1({no_pub, del, ack},         {?PUB, del, no_ack}) ->
1298    {{no_pub, no_del, ack}, 0};
1299journal_minus_segment1({no_pub, del, ack},         {?PUB, del, ack}) ->
1300    {undefined, -1};
1301
1302%% Missing segment. If flush_journal/1 is interrupted after deleting
1303%% the segment but before truncating the journal we can get these
1304%% cases: a delivery and an acknowledgement in the journal, or just an
1305%% acknowledgement in the journal, but with no segment. In both cases
%% we have really forgotten the message, so we ignore what's in the
%% journal.
1308journal_minus_segment1({no_pub, no_del, ack},      undefined) ->
1309    {undefined, 0};
1310journal_minus_segment1({no_pub, del, ack},         undefined) ->
1311    {undefined, 0}.
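
%% A sketch with schematic values: a deliver in the journal that the
%% segment already records is redundant and is dropped:
%%
%%   journal_minus_segment1({no_pub, del, no_ack}, {Pub, del, no_ack})
%%     => {undefined, 0}
%%
%% while an ack in the journal with no segment entry at all (the
%% interrupted-flush case described above) is likewise dropped:
%%
%%   journal_minus_segment1({no_pub, no_del, ack}, undefined)
%%     => {undefined, 0}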
1312
1313%%----------------------------------------------------------------------------
1314%% upgrade
1315%%----------------------------------------------------------------------------
1316
1317-spec add_queue_ttl() -> 'ok'.
1318
1319add_queue_ttl() ->
1320    foreach_queue_index({fun add_queue_ttl_journal/1,
1321                         fun add_queue_ttl_segment/1}).
1322
1323add_queue_ttl_journal(<<?DEL_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS,
1324                        Rest/binary>>) ->
1325    {<<?DEL_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Rest};
1326add_queue_ttl_journal(<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS,
1327                        Rest/binary>>) ->
1328    {<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Rest};
1329add_queue_ttl_journal(<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS,
1330                        MsgId:?MSG_ID_BYTES/binary, Rest/binary>>) ->
1331    {[<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, MsgId,
1332      expiry_to_binary(undefined)], Rest};
1333add_queue_ttl_journal(_) ->
1334    stop.
1335
1336add_queue_ttl_segment(<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1,
1337                        RelSeq:?REL_SEQ_BITS, MsgId:?MSG_ID_BYTES/binary,
1338                        Rest/binary>>) ->
1339    {[<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1, RelSeq:?REL_SEQ_BITS>>,
1340      MsgId, expiry_to_binary(undefined)], Rest};
1341add_queue_ttl_segment(<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
1342                        RelSeq:?REL_SEQ_BITS, Rest/binary>>) ->
1343    {<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, RelSeq:?REL_SEQ_BITS>>,
1344     Rest};
1345add_queue_ttl_segment(_) ->
1346    stop.
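
%% Sketch of the rewrite (schematic field sizes): a journal publish
%% record
%%
%%   <<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS, MsgId>>
%%
%% becomes
%%
%%   <<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS, MsgId,
%%     expiry_to_binary(undefined)>>
%%
%% i.e. every pre-upgrade publish gains an explicit "no expiry" field,
%% while deliver and ack records pass through unchanged.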
1347
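%% Rel-seq-only records written by older versions with an all-zero
%% prefix are rewritten to use ?REL_SEQ_ONLY_PREFIX, while publish
%% records pass through unchanged. Presumably this ensures that a run
%% of zero bytes (e.g. in a crash-damaged, zero-filled file) can no
%% longer parse as a sequence of valid records.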
1348avoid_zeroes() ->
1349    foreach_queue_index({none, fun avoid_zeroes_segment/1}).
1350
1351avoid_zeroes_segment(<<?PUB_PREFIX:?PUB_PREFIX_BITS,  IsPersistentNum:1,
1352                       RelSeq:?REL_SEQ_BITS, MsgId:?MSG_ID_BITS,
1353                       Expiry:?EXPIRY_BITS, Rest/binary>>) ->
1354    {<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1, RelSeq:?REL_SEQ_BITS,
1355       MsgId:?MSG_ID_BITS, Expiry:?EXPIRY_BITS>>, Rest};
1356avoid_zeroes_segment(<<0:?REL_SEQ_ONLY_PREFIX_BITS,
1357                       RelSeq:?REL_SEQ_BITS, Rest/binary>>) ->
1358    {<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, RelSeq:?REL_SEQ_BITS>>,
1359     Rest};
1360avoid_zeroes_segment(_) ->
1361    stop.
1362
1363%% At upgrade time we just define every message's size as 0 - that
1364%% will save us a load of faff with the message store, and means we
1365%% can actually use the clean recovery terms in VQ. It does mean we
1366%% don't count message bodies from before the migration, but we can
1367%% live with that.
1368store_msg_size() ->
1369    foreach_queue_index({fun store_msg_size_journal/1,
1370                         fun store_msg_size_segment/1}).
1371
1372store_msg_size_journal(<<?DEL_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS,
1373                        Rest/binary>>) ->
1374    {<<?DEL_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Rest};
1375store_msg_size_journal(<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS,
1376                        Rest/binary>>) ->
1377    {<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Rest};
1378store_msg_size_journal(<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS,
1379                         MsgId:?MSG_ID_BITS, Expiry:?EXPIRY_BITS,
1380                         Rest/binary>>) ->
1381    {<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS, MsgId:?MSG_ID_BITS,
1382       Expiry:?EXPIRY_BITS, 0:?SIZE_BITS>>, Rest};
1383store_msg_size_journal(_) ->
1384    stop.
1385
1386store_msg_size_segment(<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1,
1387                         RelSeq:?REL_SEQ_BITS, MsgId:?MSG_ID_BITS,
1388                         Expiry:?EXPIRY_BITS, Rest/binary>>) ->
1389    {<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1, RelSeq:?REL_SEQ_BITS,
1390       MsgId:?MSG_ID_BITS, Expiry:?EXPIRY_BITS, 0:?SIZE_BITS>>, Rest};
1391store_msg_size_segment(<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
1392                        RelSeq:?REL_SEQ_BITS, Rest/binary>>) ->
1393    {<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, RelSeq:?REL_SEQ_BITS>>,
1394     Rest};
1395store_msg_size_segment(_) ->
1396    stop.
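
%% Sketch of the rewrite: publish records in both journal and segment
%% files gain a zero size field, e.g. a segment publish
%%
%%   <<?PUB_PREFIX, IsPersistentNum, RelSeq, MsgId, Expiry>>
%%
%% becomes
%%
%%   <<?PUB_PREFIX, IsPersistentNum, RelSeq, MsgId, Expiry, 0:?SIZE_BITS>>
%%
%% recording, per the comment above, a body size of 0 for every
%% pre-migration message.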
1397
1398store_msg() ->
1399    foreach_queue_index({fun store_msg_journal/1,
1400                         fun store_msg_segment/1}).
1401
1402store_msg_journal(<<?DEL_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS,
1403                    Rest/binary>>) ->
1404    {<<?DEL_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Rest};
1405store_msg_journal(<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS,
1406                    Rest/binary>>) ->
1407    {<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Rest};
1408store_msg_journal(<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS,
1409                    MsgId:?MSG_ID_BITS, Expiry:?EXPIRY_BITS, Size:?SIZE_BITS,
1410                    Rest/binary>>) ->
1411    {<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS, MsgId:?MSG_ID_BITS,
1412       Expiry:?EXPIRY_BITS, Size:?SIZE_BITS,
1413       0:?EMBEDDED_SIZE_BITS>>, Rest};
1414store_msg_journal(_) ->
1415    stop.
1416
1417store_msg_segment(<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1,
1418                    RelSeq:?REL_SEQ_BITS, MsgId:?MSG_ID_BITS,
1419                    Expiry:?EXPIRY_BITS, Size:?SIZE_BITS, Rest/binary>>) ->
1420    {<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1, RelSeq:?REL_SEQ_BITS,
1421       MsgId:?MSG_ID_BITS, Expiry:?EXPIRY_BITS, Size:?SIZE_BITS,
1422       0:?EMBEDDED_SIZE_BITS>>, Rest};
1423store_msg_segment(<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
1424                    RelSeq:?REL_SEQ_BITS, Rest/binary>>) ->
1425    {<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, RelSeq:?REL_SEQ_BITS>>,
1426     Rest};
1427store_msg_segment(_) ->
1428    stop.
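
%% Sketch: this pass appends 0:?EMBEDDED_SIZE_BITS to every publish
%% record, i.e. "no embedded message body", which is the shape
%% parse_segment_publish_entry/5 expects (MsgSize:?EMBEDDED_SIZE_BITS
%% followed by MsgSize bytes of message).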
1429
1432%%----------------------------------------------------------------------------
1433%% Migration functions
1434%%----------------------------------------------------------------------------
1435
1436foreach_queue_index(Funs) ->
1437    QueueDirNames = all_queue_directory_names(),
1438    {ok, Gatherer} = gatherer:start_link(),
1439    [begin
1440         ok = gatherer:fork(Gatherer),
1441         ok = worker_pool:submit_async(
1442                fun () ->
1443                        transform_queue(QueueDirName, Gatherer, Funs)
1444                end)
1445     end || QueueDirName <- QueueDirNames],
1446    empty = gatherer:out(Gatherer),
1447    ok = gatherer:stop(Gatherer).
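
%% Note on synchronisation (a reading of the code above): each queue
%% directory is transformed asynchronously on the worker pool, and
%% gatherer:out/1 acts as a barrier - it returns 'empty' only once
%% every task forked above has called gatherer:finish/1 (see
%% transform_queue/3 below), so foreach_queue_index/1 returns only
%% after all transformations have completed.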
1448
1449transform_queue(Dir, Gatherer, {JournalFun, SegmentFun}) ->
1450    ok = transform_file(filename:join(Dir, ?JOURNAL_FILENAME), JournalFun),
1451    [ok = transform_file(filename:join(Dir, Seg), SegmentFun)
1452     || Seg <- rabbit_file:wildcard(".*\\" ++ ?SEGMENT_EXTENSION, Dir)],
1453    ok = gatherer:finish(Gatherer).
1454
1455transform_file(_Path, none) ->
1456    ok;
transform_file(Path, Fun) when is_function(Fun) ->
1458    PathTmp = Path ++ ".upgrade",
1459    case rabbit_file:file_size(Path) of
1460        0    -> ok;
1461        Size -> {ok, PathTmpHdl} =
1462                    file_handle_cache:open_with_absolute_path(
1463                      PathTmp, ?WRITE_MODE,
1464                      [{write_buffer, infinity}]),
1465
1466                {ok, PathHdl} = file_handle_cache:open_with_absolute_path(
1467                                  Path, ?READ_MODE, [{read_buffer, Size}]),
1468                {ok, Content} = file_handle_cache:read(PathHdl, Size),
1469                ok = file_handle_cache:close(PathHdl),
1470
1471                ok = drive_transform_fun(Fun, PathTmpHdl, Content),
1472
1473                ok = file_handle_cache:close(PathTmpHdl),
1474                ok = rabbit_file:rename(PathTmp, Path)
1475    end.
1476
1477drive_transform_fun(Fun, Hdl, Contents) ->
1478    case Fun(Contents) of
1479        stop                -> ok;
1480        {Output, Contents1} -> ok = file_handle_cache:append(Hdl, Output),
1481                               drive_transform_fun(Fun, Hdl, Contents1)
1482    end.
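
%% The journal and segment transform funs above all follow the same
%% protocol, sketched here with illustrative clauses:
%%
%%   Fun(<<Record..., Rest/binary>>) -> {OutputIoData, Rest};
%%   Fun(_Unrecognised)              -> stop.
%%
%% Each call consumes one record from the front of the binary and
%% returns iodata to append to the new file plus the remainder;
%% 'stop' ends the rewrite.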
1483
1484move_to_per_vhost_stores(#resource{virtual_host = VHost} = QueueName) ->
1485    OldQueueDir = filename:join([queues_base_dir(), "queues",
1486                                 queue_name_to_dir_name_legacy(QueueName)]),
1487    VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
1488    NewQueueDir = queue_dir(VHostDir, QueueName),
1489    rabbit_log_upgrade:info("About to migrate queue directory '~s' to '~s'",
1490                            [OldQueueDir, NewQueueDir]),
1491    case rabbit_file:is_dir(OldQueueDir) of
1492        true  ->
1493            ok = rabbit_file:ensure_dir(NewQueueDir),
1494            ok = rabbit_file:rename(OldQueueDir, NewQueueDir),
1495            ok = ensure_queue_name_stub_file(NewQueueDir, QueueName);
1496        false ->
1497            Msg  = "Queue index directory '~s' not found for ~s",
1498            Args = [OldQueueDir, rabbit_misc:rs(QueueName)],
1499            rabbit_log_upgrade:error(Msg, Args),
1500            rabbit_log:error(Msg, Args)
1501    end,
1502    ok.
1503
1504ensure_queue_name_stub_file(Dir, #resource{virtual_host = VHost, name = QName}) ->
1505    QueueNameFile = filename:join(Dir, ?QUEUE_NAME_STUB_FILE),
1506    file:write_file(QueueNameFile, <<"VHOST: ", VHost/binary, "\n",
1507                                     "QUEUE: ", QName/binary, "\n">>).
1508
1509read_global_recovery_terms(DurableQueueNames) ->
1510    ok = rabbit_recovery_terms:open_global_table(),
1511
1512    DurableTerms =
1513        lists:foldl(
1514          fun(QName, RecoveryTerms) ->
1515                  DirName = queue_name_to_dir_name_legacy(QName),
1516                  RecoveryInfo = case rabbit_recovery_terms:read_global(DirName) of
1517                                     {error, _}  -> non_clean_shutdown;
1518                                     {ok, Terms} -> Terms
1519                                 end,
1520                  [RecoveryInfo | RecoveryTerms]
1521          end, [], DurableQueueNames),
1522
1523    ok = rabbit_recovery_terms:close_global_table(),
1524    %% The backing queue interface requires that the queue recovery terms
%% which come back from start/1 are in the same order as DurableQueueNames.
1526    OrderedTerms = lists:reverse(DurableTerms),
1527    {OrderedTerms, {fun queue_index_walker/1, {start, DurableQueueNames}}}.
1528
1529cleanup_global_recovery_terms() ->
1530    rabbit_file:recursive_delete([filename:join([queues_base_dir(), "queues"])]),
1531    rabbit_recovery_terms:delete_global_table(),
1532    ok.
1533
1535update_recovery_term(#resource{virtual_host = VHost} = QueueName, Term) ->
1536    Key = queue_name_to_dir_name(QueueName),
1537    rabbit_recovery_terms:store(VHost, Key, Term).
1538