%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates. All rights reserved.
%%

-module(rabbit_queue_index).

-compile({inline, [segment_entry_count/0]}).

-export([erase/1, init/3, reset_state/1, recover/6,
         terminate/3, delete_and_terminate/1,
         pre_publish/7, flush_pre_publish_cache/2,
         publish/6, deliver/2, ack/2, sync/1, needs_sync/1, flush/1,
         read/3, next_segment_boundary/1, bounds/1, start/2, stop/1]).

-export([add_queue_ttl/0, avoid_zeroes/0, store_msg_size/0, store_msg/0]).
-export([scan_queue_segments/3, scan_queue_segments/4]).

%% Migrates from global to per-vhost message stores
-export([move_to_per_vhost_stores/1,
         update_recovery_term/2,
         read_global_recovery_terms/1,
         cleanup_global_recovery_terms/0]).

%% Used by rabbit_vhost to set the segment_entry_count.
-export([all_queue_directory_names/1]).

-define(CLEAN_FILENAME, "clean.dot").

%%----------------------------------------------------------------------------

%% The queue index is responsible for recording the order of messages
%% within a queue on disk. As such it contains records of messages
%% being published, delivered and acknowledged. The publish record
%% includes the sequence ID, message ID and a small quantity of
%% metadata about the message; the delivery and acknowledgement
%% records just contain the sequence ID. A publish record may also
%% contain the complete message if provided to publish/6; this allows
%% the message store to be avoided altogether for small messages. In
%% either case the publish record is stored in memory in the same
%% serialised format it will take on disk.
%%
%% Because the queue can decide at any point to send a queue entry to
%% disk, you cannot rely on publishes appearing in order. The only
%% thing you can rely on is a message being published, then delivered,
%% then ack'd.
%%
%% In order to be able to clean up ack'd messages, we write to segment
%% files. These files have a fixed number of entries: segment_entry_count()
%% publishes, delivers and acknowledgements. They are numbered, and so
%% it is known that the 0th segment contains messages 0 ->
%% segment_entry_count() - 1, the 1st segment contains messages
%% segment_entry_count() -> 2*segment_entry_count() - 1 and so on. As
%% such, in the segment files, we only refer to message sequence ids
%% by the LSBs as SeqId rem segment_entry_count(). This gives them a
%% fixed size.
%%
%% However, transient messages which are not sent to disk at any point
%% will cause gaps to appear in segment files. Therefore, we delete a
%% segment file whenever the number of publishes == number of acks
%% (note that although it is not fully enforced, it is assumed that a
%% message will never be ack'd before it is delivered, thus this test
%% also implies == number of delivers). In practice, this does not
%% cause disk churn in the pathological case because of the journal
%% and caching (see below).
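%%
%% For illustration, with a segment_entry_count() of 16384 (the
%% largest value expressible in ?REL_SEQ_BITS), sequence id 16385
%% falls in segment 1 at relative sequence id 1:
%%
%%   seq_id_to_seg_and_rel_seq_id(16385) =:= {1, 1}
%%   reconstruct_seq_id(1, 1)            =:= 16385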
%%
%% Because publishes, delivers and acks can occur all over, we wish to
%% avoid lots of seeking. Therefore we have a fixed sized journal to
%% which all actions are appended. When the number of entries in this
%% journal reaches max_journal_entries, the journal entries are
%% scattered out to their relevant files, and the journal is truncated
%% to zero size. Note that entries in the journal must carry the full
%% sequence id, thus the format of entries in the journal is different
%% to that in the segments.
%%
%% The journal is also kept fully in memory, pre-segmented: the state
%% contains a mapping from segment numbers to state-per-segment (this
%% state is held for all segments which have been "seen": thus a
%% segment which has been read but has no pending entries in the
%% journal is still held in this mapping). Also note that a map is
%% used for this mapping, not an array, because with an array you
%% would always have entries from 0. Actions are stored directly in
%% this state. Thus at the point of flushing the journal, firstly no
%% reading from disk is necessary, but secondly if the known number of
%% acks and publishes in a segment are equal, given the known state of
%% the segment file combined with the journal, no writing needs to be
%% done to the segment file either (in fact it is deleted if it exists
%% at all). This is safe given that the set of acks is a subset of the
%% set of publishes. When it is necessary to sync messages, it is
%% sufficient to fsync on the journal: when entries are distributed
%% from the journal to segment files, those segments appended to are
%% fsync'd prior to the journal being truncated.
%%
%% This module is also responsible for scanning the queue index files
%% and seeding the message store on start up.
%%
%% Note that in general, the representation of a message's state as
%% the tuple: {('no_pub'|{IsPersistent, Bin, MsgBin}),
%% ('del'|'no_del'), ('ack'|'no_ack')} is richer than strictly
%% necessary for most operations. However, for startup, and to ensure
%% the safe and correct combination of journal entries with entries
%% read from the segment on disk, this richer representation vastly
%% simplifies and clarifies the code.
%%
%% For notes on Clean Shutdown and startup, see documentation in
%% rabbit_variable_queue.
%%
%%----------------------------------------------------------------------------

%% ---- Journal details ----

-define(JOURNAL_FILENAME, "journal.jif").
-define(QUEUE_NAME_STUB_FILE, ".queue_name").

-define(PUB_PERSIST_JPREFIX, 2#00).
-define(PUB_TRANS_JPREFIX,   2#01).
-define(DEL_JPREFIX,         2#10).
-define(ACK_JPREFIX,         2#11).
-define(JPREFIX_BITS, 2).
-define(SEQ_BYTES, 8).
-define(SEQ_BITS, ((?SEQ_BYTES * 8) - ?JPREFIX_BITS)).

%% ---- Segment details ----

-define(SEGMENT_EXTENSION, ".idx").

%% TODO: The segment size would be configurable, but deriving all the
%% other values is quite hairy and quite possibly noticeably less
%% efficient, depending on how clever the compiler is when it comes to
%% binary generation/matching with constant vs variable lengths.

-define(REL_SEQ_BITS, 14).

%% seq only is binary 01 followed by 14 bits of rel seq id
%% (range: 0 - 16383)
-define(REL_SEQ_ONLY_PREFIX, 01).
-define(REL_SEQ_ONLY_PREFIX_BITS, 2).
-define(REL_SEQ_ONLY_RECORD_BYTES, 2).
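
%% For example, a delivery (or acknowledgement) of relative sequence
%% id 3 is encoded as the 2-byte binary
%% <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, 3:?REL_SEQ_BITS>>,
%% i.e. the 2#01 prefix followed by 14 bits of rel seq id, exactly as
%% assembled by entry_to_segment/3 below.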

%% publish record is binary 1 followed by a bit for is_persistent,
%% then 14 bits of rel seq id, 128 bits of md5sum msg id, 64 bits for
%% message expiry and 32 bits of size.
-define(PUB_PREFIX, 1).
-define(PUB_PREFIX_BITS, 1).

-define(EXPIRY_BYTES, 8).
-define(EXPIRY_BITS, (?EXPIRY_BYTES * 8)).
-define(NO_EXPIRY, 0).

-define(MSG_ID_BYTES, 16). %% md5sum is 128 bit or 16 bytes
-define(MSG_ID_BITS, (?MSG_ID_BYTES * 8)).

%% This is the size of the message body content, for stats
-define(SIZE_BYTES, 4).
-define(SIZE_BITS, (?SIZE_BYTES * 8)).

%% This is the size of the message record embedded in the queue
%% index. If 0, the message can be found in the message store.
-define(EMBEDDED_SIZE_BYTES, 4).
-define(EMBEDDED_SIZE_BITS, (?EMBEDDED_SIZE_BYTES * 8)).

%% 16 bytes for md5sum + 8 for expiry + 4 for size
-define(PUB_RECORD_BODY_BYTES, (?MSG_ID_BYTES + ?EXPIRY_BYTES + ?SIZE_BYTES)).
%% + 4 for the embedded message size
-define(PUB_RECORD_SIZE_BYTES, (?PUB_RECORD_BODY_BYTES + ?EMBEDDED_SIZE_BYTES)).

%% + 2 bytes for the prefix, is_persistent bit and rel seq id
-define(PUB_RECORD_PREFIX_BYTES, 2).

%% ---- misc ----

-define(PUB, {_, _, _}). %% {IsPersistent, Bin, MsgBin}

-define(READ_MODE, [binary, raw, read]).
-define(WRITE_MODE, [write | ?READ_MODE]).

%%----------------------------------------------------------------------------

-record(qistate, {
          %% queue directory where segment and journal files are stored
          dir,
          %% map of #segment records
          segments,
          %% journal file handle obtained from/used by file_handle_cache
          journal_handle,
          %% how many not yet flushed entries are there
          dirty_count,
          %% this many not yet flushed journal entries will force a flush
          max_journal_entries,
          %% callback function invoked when a message is "handled"
          %% by the index and potentially can be confirmed to the publisher
          on_sync,
          on_sync_msg,
          %% set of IDs of unconfirmed [to publishers] messages
          unconfirmed,
          unconfirmed_msg,
          %% optimisation
          pre_publish_cache,
          %% optimisation
          delivered_cache,
          %% queue name resource record
          queue_name}).

-record(segment, {
          %% segment ID (an integer)
          num,
          %% segment file path (see also ?SEGMENT_EXTENSION)
          path,
          %% index operation log entries in this segment
          journal_entries,
          entries_to_segment,
          %% counter of unacknowledged messages
          unacked
         }).

-include_lib("rabbit_common/include/rabbit.hrl").

%%----------------------------------------------------------------------------

-rabbit_upgrade({add_queue_ttl,  local, []}).
-rabbit_upgrade({avoid_zeroes,   local, [add_queue_ttl]}).
-rabbit_upgrade({store_msg_size, local, [avoid_zeroes]}).
-rabbit_upgrade({store_msg,      local, [store_msg_size]}).

-type hdl() :: ('undefined' | any()).
-type segment() :: ('undefined' |
                    #segment { num                :: non_neg_integer(),
                               path               :: file:filename(),
                               journal_entries    :: array:array(),
                               entries_to_segment :: array:array(),
                               unacked            :: non_neg_integer()
                             }).
-type seq_id() :: integer().
-type seg_map() :: {map(), [segment()]}.
-type on_sync_fun() :: fun ((gb_sets:set()) -> ok).
-type qistate() :: #qistate { dir                 :: file:filename(),
                              segments            :: 'undefined' | seg_map(),
                              journal_handle      :: hdl(),
                              dirty_count         :: integer(),
                              max_journal_entries :: non_neg_integer(),
                              on_sync             :: on_sync_fun(),
                              on_sync_msg         :: on_sync_fun(),
                              unconfirmed         :: gb_sets:set(),
                              unconfirmed_msg     :: gb_sets:set(),
                              pre_publish_cache   :: list(),
                              delivered_cache     :: list()
                            }.
-type contains_predicate() :: fun ((rabbit_types:msg_id()) -> boolean()).
-type walker(A) :: fun ((A) -> 'finished' |
                               {rabbit_types:msg_id(), non_neg_integer(), A}).
-type shutdown_terms() :: [term()] | 'non_clean_shutdown'.
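
%% A minimal usage sketch (illustrative only; QName is a queue
%% #resource{}, and the vhost configuration read by init/3 supplies
%% the segment_entry_count):
%%
%%   State0 = rabbit_queue_index:init(QName, OnSyncFun, OnSyncMsgFun),
%%   State1 = rabbit_queue_index:publish(MsgId, 0, MsgProps, true,
%%                                       JournalSizeHint, State0),
%%   State2 = rabbit_queue_index:deliver([0], State1),
%%   State3 = rabbit_queue_index:ack([0], State2),
%%   _State = rabbit_queue_index:sync(State3).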

%%----------------------------------------------------------------------------
%% public API
%%----------------------------------------------------------------------------

-spec erase(rabbit_amqqueue:name()) -> 'ok'.

erase(#resource{ virtual_host = VHost } = Name) ->
    VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
    #qistate { dir = Dir } = blank_state(VHostDir, Name),
    erase_index_dir(Dir).

%% used during variable queue purge when there are no pending acks

-spec reset_state(qistate()) -> qistate().

reset_state(#qistate{ queue_name     = Name,
                      dir            = Dir,
                      on_sync        = OnSyncFun,
                      on_sync_msg    = OnSyncMsgFun,
                      journal_handle = JournalHdl }) ->
    ok = case JournalHdl of
             undefined -> ok;
             _         -> file_handle_cache:close(JournalHdl)
         end,
    ok = erase_index_dir(Dir),
    blank_state_name_dir_funs(Name, Dir, OnSyncFun, OnSyncMsgFun).

-spec init(rabbit_amqqueue:name(),
           on_sync_fun(), on_sync_fun()) -> qistate().

init(#resource{ virtual_host = VHost } = Name, OnSyncFun, OnSyncMsgFun) ->
    #{segment_entry_count := SegmentEntryCount} = rabbit_vhost:read_config(VHost),
    put(segment_entry_count, SegmentEntryCount),
    VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
    State = #qistate { dir = Dir } = blank_state(VHostDir, Name),
    false = rabbit_file:is_file(Dir), %% is_file == is file or dir
    State#qistate{on_sync     = OnSyncFun,
                  on_sync_msg = OnSyncMsgFun}.

-spec recover(rabbit_amqqueue:name(), shutdown_terms(), boolean(),
              contains_predicate(),
              on_sync_fun(), on_sync_fun()) ->
                  {'undefined' | non_neg_integer(),
                   'undefined' | non_neg_integer(), qistate()}.

recover(#resource{ virtual_host = VHost } = Name, Terms, MsgStoreRecovered,
        ContainsCheckFun, OnSyncFun, OnSyncMsgFun) ->
    #{segment_entry_count := SegmentEntryCount} = rabbit_vhost:read_config(VHost),
    put(segment_entry_count, SegmentEntryCount),
    VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
    State = blank_state(VHostDir, Name),
    State1 = State #qistate{on_sync     = OnSyncFun,
                            on_sync_msg = OnSyncMsgFun},
    CleanShutdown = Terms /= non_clean_shutdown,
    case CleanShutdown andalso MsgStoreRecovered of
        true  -> case proplists:get_value(segments, Terms, non_clean_shutdown) of
                     non_clean_shutdown -> init_dirty(false, ContainsCheckFun, State1);
                     RecoveredCounts    -> init_clean(RecoveredCounts, State1)
                 end;
        false -> init_dirty(CleanShutdown, ContainsCheckFun, State1)
    end.

-spec terminate(rabbit_types:vhost(), [any()], qistate()) -> qistate().

terminate(VHost, Terms, State = #qistate { dir = Dir }) ->
    {SegmentCounts, State1} = terminate(State),
    rabbit_recovery_terms:store(VHost, filename:basename(Dir),
                                [{segments, SegmentCounts} | Terms]),
    State1.

-spec delete_and_terminate(qistate()) -> qistate().

delete_and_terminate(State) ->
    {_SegmentCounts, State1 = #qistate { dir = Dir }} = terminate(State),
    ok = rabbit_file:recursive_delete([Dir]),
    State1.
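
%% pre_publish/7 only appends to the in-memory pre_publish_cache and
%% delivered_cache; nothing is written to the journal file until the
%% cache is flushed. A caller batching publishes might look like this
%% (illustrative sketch; Publishes, MsgProps and JournalSizeHint are
%% assumed to be in scope):
%%
%%   State1 = lists:foldl(
%%              fun ({MsgOrId, SeqId}, StateN) ->
%%                      pre_publish(MsgOrId, SeqId, MsgProps, true, false,
%%                                  JournalSizeHint, StateN)
%%              end, State0, Publishes),
%%   State2 = flush_pre_publish_cache(JournalSizeHint, State1).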
pre_publish(MsgOrId, SeqId, MsgProps, IsPersistent, IsDelivered, JournalSizeHint,
            State = #qistate{pre_publish_cache = PPC,
                             delivered_cache   = DC}) ->
    State1 = maybe_needs_confirming(MsgProps, MsgOrId, State),

    {Bin, MsgBin} = create_pub_record_body(MsgOrId, MsgProps),

    PPC1 =
        [[<<(case IsPersistent of
                 true  -> ?PUB_PERSIST_JPREFIX;
                 false -> ?PUB_TRANS_JPREFIX
             end):?JPREFIX_BITS,
            SeqId:?SEQ_BITS, Bin/binary,
            (size(MsgBin)):?EMBEDDED_SIZE_BITS>>, MsgBin] | PPC],

    DC1 =
        case IsDelivered of
            true ->
                [SeqId | DC];
            false ->
                DC
        end,

    State2 = add_to_journal(SeqId, {IsPersistent, Bin, MsgBin}, State1),
    maybe_flush_pre_publish_cache(
      JournalSizeHint,
      State2#qistate{pre_publish_cache = PPC1,
                     delivered_cache   = DC1}).

%% pre_publish_cache has at least as many elements as delivered_cache,
%% so we only check the former in the guard.
maybe_flush_pre_publish_cache(JournalSizeHint,
                              #qistate{pre_publish_cache = PPC} = State) ->
    case length(PPC) >= segment_entry_count() of
        true  -> flush_pre_publish_cache(JournalSizeHint, State);
        false -> State
    end.

flush_pre_publish_cache(JournalSizeHint, State) ->
    State1 = flush_pre_publish_cache(State),
    State2 = flush_delivered_cache(State1),
    maybe_flush_journal(JournalSizeHint, State2).

flush_pre_publish_cache(#qistate{pre_publish_cache = []} = State) ->
    State;
flush_pre_publish_cache(State = #qistate{pre_publish_cache = PPC}) ->
    {JournalHdl, State1} = get_journal_handle(State),
    file_handle_cache_stats:update(queue_index_journal_write),
    ok = file_handle_cache:append(JournalHdl, lists:reverse(PPC)),
    State1#qistate{pre_publish_cache = []}.

flush_delivered_cache(#qistate{delivered_cache = []} = State) ->
    State;
flush_delivered_cache(State = #qistate{delivered_cache = DC}) ->
    State1 = deliver(lists:reverse(DC), State),
    State1#qistate{delivered_cache = []}.

-spec publish(rabbit_types:msg_id(), seq_id(),
              rabbit_types:message_properties(), boolean(),
              non_neg_integer(), qistate()) -> qistate().

publish(MsgOrId, SeqId, MsgProps, IsPersistent, JournalSizeHint, State) ->
    {JournalHdl, State1} =
        get_journal_handle(
          maybe_needs_confirming(MsgProps, MsgOrId, State)),
    file_handle_cache_stats:update(queue_index_journal_write),
    {Bin, MsgBin} = create_pub_record_body(MsgOrId, MsgProps),
    ok = file_handle_cache:append(
           JournalHdl, [<<(case IsPersistent of
                               true  -> ?PUB_PERSIST_JPREFIX;
                               false -> ?PUB_TRANS_JPREFIX
                           end):?JPREFIX_BITS,
                          SeqId:?SEQ_BITS, Bin/binary,
                          (size(MsgBin)):?EMBEDDED_SIZE_BITS>>, MsgBin]),
    maybe_flush_journal(
      JournalSizeHint,
      add_to_journal(SeqId, {IsPersistent, Bin, MsgBin}, State1)).

maybe_needs_confirming(MsgProps, MsgOrId,
                       State = #qistate{unconfirmed     = UC,
                                        unconfirmed_msg = UCM}) ->
    MsgId = case MsgOrId of
                #basic_message{id = Id} -> Id;
                Id when is_binary(Id)   -> Id
            end,
    ?MSG_ID_BYTES = size(MsgId),
    case {MsgProps#message_properties.needs_confirming, MsgOrId} of
        {true,  MsgId} -> UC1  = gb_sets:add_element(MsgId, UC),
                          State#qistate{unconfirmed     = UC1};
        {true,  _}     -> UCM1 = gb_sets:add_element(MsgId, UCM),
                          State#qistate{unconfirmed_msg = UCM1};
        {false, _}     -> State
    end.

-spec deliver([seq_id()], qistate()) -> qistate().

deliver(SeqIds, State) ->
    deliver_or_ack(del, SeqIds, State).
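
%% Journal entry sizes: a del or ack entry is exactly ?SEQ_BYTES (8)
%% bytes, a 2-bit prefix plus the full 62-bit sequence id. A publish
%% entry written by publish/6 or pre_publish/7 is those 8 bytes plus
%% ?PUB_RECORD_BODY_BYTES (28) of msg id, expiry and size, plus 4
%% bytes of embedded message size, plus the embedded message binary
%% itself (empty when the message lives in the message store).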

-spec ack([seq_id()], qistate()) -> qistate().

ack(SeqIds, State) ->
    deliver_or_ack(ack, SeqIds, State).

%% This is called when there are outstanding confirms or when the
%% queue is idle and the journal needs syncing (see needs_sync/1).

-spec sync(qistate()) -> qistate().

sync(State = #qistate { journal_handle = undefined }) ->
    State;
sync(State = #qistate { journal_handle = JournalHdl }) ->
    ok = file_handle_cache:sync(JournalHdl),
    notify_sync(State).

-spec needs_sync(qistate()) -> 'confirms' | 'other' | 'false'.

needs_sync(#qistate{journal_handle = undefined}) ->
    false;
needs_sync(#qistate{journal_handle  = JournalHdl,
                    unconfirmed     = UC,
                    unconfirmed_msg = UCM}) ->
    case gb_sets:is_empty(UC) andalso gb_sets:is_empty(UCM) of
        true  -> case file_handle_cache:needs_sync(JournalHdl) of
                     true  -> other;
                     false -> false
                 end;
        false -> confirms
    end.

-spec flush(qistate()) -> qistate().

flush(State = #qistate { dirty_count = 0 }) -> State;
flush(State)                                -> flush_journal(State).

-spec read(seq_id(), seq_id(), qistate()) ->
              {[{rabbit_types:msg_id(), seq_id(),
                 rabbit_types:message_properties(),
                 boolean(), boolean()}], qistate()}.

read(StartEnd, StartEnd, State) ->
    {[], State};
read(Start, End, State = #qistate { segments = Segments,
                                    dir      = Dir }) when Start =< End ->
    %% Start is inclusive, End is exclusive.
    LowerB = {StartSeg, _StartRelSeq} = seq_id_to_seg_and_rel_seq_id(Start),
    UpperB = {EndSeg,   _EndRelSeq}   = seq_id_to_seg_and_rel_seq_id(End - 1),
    {Messages, Segments1} =
        lists:foldr(fun (Seg, Acc) ->
                            read_bounded_segment(Seg, LowerB, UpperB, Acc, Dir)
                    end, {[], Segments}, lists:seq(StartSeg, EndSeg)),
    {Messages, State #qistate { segments = Segments1 }}.

-spec next_segment_boundary(seq_id()) -> seq_id().

next_segment_boundary(SeqId) ->
    {Seg, _RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId),
    reconstruct_seq_id(Seg + 1, 0).

-spec bounds(qistate()) ->
              {non_neg_integer(), non_neg_integer(), qistate()}.

bounds(State = #qistate { segments = Segments }) ->
    %% This is not particularly efficient, but only gets invoked on
    %% queue initialisation.
    SegNums = lists:sort(segment_nums(Segments)),
    %% Don't bother trying to figure out the lowest seq_id, merely the
    %% seq_id of the start of the lowest segment. That seq_id may not
    %% actually exist, but that's fine. The important thing is that
    %% the segment exists and the seq_id reported is on a segment
    %% boundary.
    %%
    %% We also don't really care about the max seq_id. Just start the
    %% next segment: it makes life much easier.
    %%
    %% SegNums is sorted, ascending.
    {LowSeqId, NextSeqId} =
        case SegNums of
            []         -> {0, 0};
            [MinSeg|_] -> {reconstruct_seq_id(MinSeg, 0),
                           reconstruct_seq_id(1 + lists:last(SegNums), 0)}
        end,
    {LowSeqId, NextSeqId, State}.
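
%% For example, with a segment_entry_count() of 16384 and segment
%% files numbered 1 and 4 on disk, bounds/1 returns
%% {16384, 81920, State}: the first seq id of segment 1, and the first
%% seq id of the (not yet existing) segment 5.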

-spec start(rabbit_types:vhost(), [rabbit_amqqueue:name()]) ->
          {[[any()]], {walker(A), A}}.

start(VHost, DurableQueueNames) ->
    ok = rabbit_recovery_terms:start(VHost),
    {DurableTerms, DurableDirectories} =
        lists:foldl(
          fun(QName, {RecoveryTerms, ValidDirectories}) ->
                  DirName = queue_name_to_dir_name(QName),
                  RecoveryInfo = case rabbit_recovery_terms:read(VHost, DirName) of
                                     {error, _}  -> non_clean_shutdown;
                                     {ok, Terms} -> Terms
                                 end,
                  {[RecoveryInfo | RecoveryTerms],
                   sets:add_element(DirName, ValidDirectories)}
          end, {[], sets:new()}, DurableQueueNames),
    %% Any queue directory we've not been asked to recover is considered garbage
    rabbit_file:recursive_delete(
      [DirName ||
          DirName <- all_queue_directory_names(VHost),
          not sets:is_element(filename:basename(DirName), DurableDirectories)]),
    rabbit_recovery_terms:clear(VHost),

    %% The backing queue interface requires that the queue recovery terms
    %% which come back from start/2 are in the same order as DurableQueueNames
    OrderedTerms = lists:reverse(DurableTerms),
    {OrderedTerms, {fun queue_index_walker/1, {start, DurableQueueNames}}}.

stop(VHost) -> rabbit_recovery_terms:stop(VHost).

all_queue_directory_names(VHost) ->
    filelib:wildcard(filename:join([rabbit_vhost:msg_store_dir_path(VHost),
                                    "queues", "*"])).

all_queue_directory_names() ->
    filelib:wildcard(filename:join([rabbit_vhost:msg_store_dir_wildcard(),
                                    "queues", "*"])).

%%----------------------------------------------------------------------------
%% startup and shutdown
%%----------------------------------------------------------------------------

erase_index_dir(Dir) ->
    case rabbit_file:is_dir(Dir) of
        true  -> rabbit_file:recursive_delete([Dir]);
        false -> ok
    end.

blank_state(VHostDir, QueueName) ->
    Dir = queue_dir(VHostDir, QueueName),
    blank_state_name_dir_funs(QueueName,
                              Dir,
                              fun (_) -> ok end,
                              fun (_) -> ok end).

queue_dir(VHostDir, QueueName) ->
    %% Queue directory is
    %% {node_database_dir}/msg_stores/vhosts/{vhost}/queues/{queue}
    QueueDir = queue_name_to_dir_name(QueueName),
    filename:join([VHostDir, "queues", QueueDir]).

queue_name_to_dir_name(#resource { kind = queue,
                                   virtual_host = VHost,
                                   name = QName }) ->
    <<Num:128>> = erlang:md5(<<"queue", VHost/binary, QName/binary>>),
    rabbit_misc:format("~.36B", [Num]).

queue_name_to_dir_name_legacy(Name = #resource { kind = queue }) ->
    <<Num:128>> = erlang:md5(term_to_binary_compat:term_to_binary_1(Name)),
    rabbit_misc:format("~.36B", [Num]).

queues_base_dir() ->
    rabbit_mnesia:dir().

blank_state_name_dir_funs(Name, Dir, OnSyncFun, OnSyncMsgFun) ->
    {ok, MaxJournal} =
        application:get_env(rabbit, queue_index_max_journal_entries),
    #qistate { dir                 = Dir,
               segments            = segments_new(),
               journal_handle      = undefined,
               dirty_count         = 0,
               max_journal_entries = MaxJournal,
               on_sync             = OnSyncFun,
               on_sync_msg         = OnSyncMsgFun,
               unconfirmed         = gb_sets:new(),
               unconfirmed_msg     = gb_sets:new(),
               pre_publish_cache   = [],
               delivered_cache     = [],
               queue_name          = Name }.
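
%% On a clean shutdown, terminate/3 stores per-queue recovery terms of
%% the form (values illustrative):
%%
%%   [{segments, [{0, 12}, {3, 7}]} | Terms]
%%
%% i.e. the count of unacknowledged messages per segment number.
%% init_clean/2 below folds these counts back into the per-segment
%% state, so no segment file needs to be scanned on a clean start.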
init_clean(RecoveredCounts, State) ->
    %% Load the journal. Since this is a clean recovery this (almost)
    %% gets us back to where we were on shutdown.
    State1 = #qistate { dir = Dir, segments = Segments } = load_journal(State),
    %% The journal loading only creates records for segments touched
    %% by the journal, and the counts are based on the journal entries
    %% only. We need *complete* counts for *all* segments. By an
    %% amazing coincidence we stored that information on shutdown.
    Segments1 =
        lists:foldl(
          fun ({Seg, UnackedCount}, SegmentsN) ->
                  Segment = segment_find_or_new(Seg, Dir, SegmentsN),
                  segment_store(Segment #segment { unacked = UnackedCount },
                                SegmentsN)
          end, Segments, RecoveredCounts),
    %% the counts above include transient messages, which would be the
    %% wrong thing to return
    {undefined, undefined, State1 #qistate { segments = Segments1 }}.

init_dirty(CleanShutdown, ContainsCheckFun, State) ->
    %% Recover the journal completely. This will also load segments
    %% which have entries in the journal and remove duplicates. The
    %% counts will correctly reflect the combination of the segment
    %% and the journal.
    State1 = #qistate { dir = Dir, segments = Segments } =
        recover_journal(State),
    {Segments1, Count, Bytes, DirtyCount} =
        %% Load each segment in turn and filter out messages that are
        %% not in the msg_store, by adding acks to the journal. These
        %% acks only go to the RAM journal as it doesn't matter if we
        %% lose them. Also mark delivered if not clean shutdown. Also
        %% find the number of unacked messages. Also accumulate the
        %% dirty count here, so we can call maybe_flush_journal below
        %% and avoid unnecessary file system operations.
        lists:foldl(
          fun (Seg, {Segments2, CountAcc, BytesAcc, DirtyCount}) ->
                  {{Segment = #segment { unacked = UnackedCount }, Dirty},
                   UnackedBytes} =
                      recover_segment(ContainsCheckFun, CleanShutdown,
                                      segment_find_or_new(Seg, Dir, Segments2),
                                      State1#qistate.max_journal_entries),
                  {segment_store(Segment, Segments2),
                   CountAcc + UnackedCount,
                   BytesAcc + UnackedBytes, DirtyCount + Dirty}
          end, {Segments, 0, 0, 0}, all_segment_nums(State1)),
    State2 = maybe_flush_journal(State1 #qistate { segments    = Segments1,
                                                   dirty_count = DirtyCount }),
    {Count, Bytes, State2}.

terminate(State = #qistate { journal_handle = JournalHdl,
                             segments = Segments }) ->
    ok = case JournalHdl of
             undefined -> ok;
             _         -> file_handle_cache:close(JournalHdl)
         end,
    SegmentCounts =
        segment_fold(
          fun (#segment { num = Seg, unacked = UnackedCount }, Acc) ->
                  [{Seg, UnackedCount} | Acc]
          end, [], Segments),
    {SegmentCounts, State #qistate { journal_handle = undefined,
                                     segments = undefined }}.

recover_segment(ContainsCheckFun, CleanShutdown,
                Segment = #segment { journal_entries = JEntries }, MaxJournal) ->
    {SegEntries, UnackedCount} = load_segment(false, Segment),
    {SegEntries1, UnackedCountDelta} =
        segment_plus_journal(SegEntries, JEntries),
    array:sparse_foldl(
      fun (RelSeq, {{IsPersistent, Bin, MsgBin}, Del, no_ack},
           {SegmentAndDirtyCount, Bytes}) ->
              {MsgOrId, MsgProps} = parse_pub_record_body(Bin, MsgBin),
              {recover_message(ContainsCheckFun(MsgOrId), CleanShutdown,
                               Del, RelSeq, SegmentAndDirtyCount, MaxJournal),
               Bytes + case IsPersistent of
                           true  -> MsgProps#message_properties.size;
                           false -> 0
                       end}
      end,
      {{Segment #segment { unacked = UnackedCount + UnackedCountDelta }, 0}, 0},
      SegEntries1).
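
%% recover_message(InMsgStore, CleanShutdown, Del, RelSeq,
%%                 {Segment, DirtyCount}, MaxJournal):
%% a message missing from the message store is ack'd away (preceded by
%% a del if it had none); a message that is present but undelivered
%% after a dirty shutdown gets a del, and the returned dirty count of
%% MaxJournal + 1 forces the journal flush in init_dirty/3.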
recover_message( true,  true,   _Del, _RelSeq, SegmentAndDirtyCount, _MaxJournal) ->
    SegmentAndDirtyCount;
recover_message( true, false,    del, _RelSeq, SegmentAndDirtyCount, _MaxJournal) ->
    SegmentAndDirtyCount;
recover_message( true, false, no_del,  RelSeq, {Segment, _DirtyCount}, MaxJournal) ->
    %% force a flush of the segment
    {add_to_journal(RelSeq, del, Segment), MaxJournal + 1};
recover_message(false,     _,    del,  RelSeq, {Segment, DirtyCount}, _MaxJournal) ->
    {add_to_journal(RelSeq, ack, Segment), DirtyCount + 1};
recover_message(false,     _, no_del,  RelSeq, {Segment, DirtyCount}, _MaxJournal) ->
    {add_to_journal(RelSeq, ack,
                    add_to_journal(RelSeq, del, Segment)),
     DirtyCount + 2}.

%%----------------------------------------------------------------------------
%% msg store startup delta function
%%----------------------------------------------------------------------------

queue_index_walker({start, DurableQueues}) when is_list(DurableQueues) ->
    {ok, Gatherer} = gatherer:start_link(),
    [begin
         ok = gatherer:fork(Gatherer),
         ok = worker_pool:submit_async(
                fun () -> link(Gatherer),
                          ok = queue_index_walker_reader(QueueName, Gatherer),
                          unlink(Gatherer),
                          ok
                end)
     end || QueueName <- DurableQueues],
    queue_index_walker({next, Gatherer});

queue_index_walker({next, Gatherer}) when is_pid(Gatherer) ->
    case gatherer:out(Gatherer) of
        empty ->
            ok = gatherer:stop(Gatherer),
            finished;
        {value, {MsgId, Count}} ->
            {MsgId, Count, {next, Gatherer}}
    end.

queue_index_walker_reader(QueueName, Gatherer) ->
    ok = scan_queue_segments(
           fun (_SeqId, MsgId, _MsgProps, true, _IsDelivered, no_ack, ok)
                 when is_binary(MsgId) ->
                   gatherer:sync_in(Gatherer, {MsgId, 1});
               (_SeqId, _MsgId, _MsgProps, _IsPersistent, _IsDelivered,
                _IsAcked, Acc) ->
                   Acc
           end, ok, QueueName),
    ok = gatherer:finish(Gatherer).

scan_queue_segments(Fun, Acc, #resource{ virtual_host = VHost } = QueueName) ->
    %% Set the segment_entry_count for this worker process.
    #{segment_entry_count := SegmentEntryCount} = rabbit_vhost:read_config(VHost),
    put(segment_entry_count, SegmentEntryCount),
    VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
    scan_queue_segments(Fun, Acc, VHostDir, QueueName).

scan_queue_segments(Fun, Acc, VHostDir, QueueName) ->
    State = #qistate { segments = Segments, dir = Dir } =
        recover_journal(blank_state(VHostDir, QueueName)),
    Result = lists:foldr(
               fun (Seg, AccN) ->
                       segment_entries_foldr(
                         fun (RelSeq, {{MsgOrId, MsgProps, IsPersistent},
                                       IsDelivered, IsAcked}, AccM) ->
                                 Fun(reconstruct_seq_id(Seg, RelSeq), MsgOrId, MsgProps,
                                     IsPersistent, IsDelivered, IsAcked, AccM)
                         end, AccN, segment_find_or_new(Seg, Dir, Segments))
               end, Acc, all_segment_nums(State)),
    {_SegmentCounts, _State} = terminate(State),
    Result.
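
%% For example, counting the persistent, unacknowledged entries in a
%% queue's index (a sketch; QName is a queue #resource{}):
%%
%%   rabbit_queue_index:scan_queue_segments(
%%     fun (_SeqId, _MsgOrId, _Props, true, _IsDelivered, no_ack, N) ->
%%             N + 1;
%%         (_SeqId, _MsgOrId, _Props, _IsPersistent, _IsDelivered,
%%          _IsAcked, N) ->
%%             N
%%     end, 0, QName).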

%%----------------------------------------------------------------------------
%% expiry/binary manipulation
%%----------------------------------------------------------------------------

create_pub_record_body(MsgOrId, #message_properties { expiry = Expiry,
                                                      size   = Size }) ->
    ExpiryBin = expiry_to_binary(Expiry),
    case MsgOrId of
        MsgId when is_binary(MsgId) ->
            {<<MsgId/binary, ExpiryBin/binary, Size:?SIZE_BITS>>, <<>>};
        #basic_message{id = MsgId} ->
            MsgBin = term_to_binary(MsgOrId),
            {<<MsgId/binary, ExpiryBin/binary, Size:?SIZE_BITS>>, MsgBin}
    end.

expiry_to_binary(undefined) -> <<?NO_EXPIRY:?EXPIRY_BITS>>;
expiry_to_binary(Expiry)    -> <<Expiry:?EXPIRY_BITS>>.

parse_pub_record_body(<<MsgIdNum:?MSG_ID_BITS, Expiry:?EXPIRY_BITS,
                        Size:?SIZE_BITS>>, MsgBin) ->
    %% Workaround for binary data fragmentation. See
    %% rabbit_msg_file:read_next/2
    <<MsgId:?MSG_ID_BYTES/binary>> = <<MsgIdNum:?MSG_ID_BITS>>,
    Props = #message_properties{expiry = case Expiry of
                                             ?NO_EXPIRY -> undefined;
                                             X          -> X
                                         end,
                                size   = Size},
    case MsgBin of
        <<>> -> {MsgId, Props};
        _    -> Msg = #basic_message{id = MsgId} = binary_to_term(MsgBin),
                {Msg, Props}
    end.

%%----------------------------------------------------------------------------
%% journal manipulation
%%----------------------------------------------------------------------------

add_to_journal(SeqId, Action, State = #qistate { dirty_count = DCount,
                                                 segments    = Segments,
                                                 dir         = Dir }) ->
    {Seg, RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId),
    Segment = segment_find_or_new(Seg, Dir, Segments),
    Segment1 = add_to_journal(RelSeq, Action, Segment),
    State #qistate { dirty_count = DCount + 1,
                     segments    = segment_store(Segment1, Segments) };

add_to_journal(RelSeq, Action,
               Segment = #segment { journal_entries    = JEntries,
                                    entries_to_segment = EToSeg,
                                    unacked            = UnackedCount }) ->

    {Fun, Entry} = action_to_entry(RelSeq, Action, JEntries),

    {JEntries1, EToSeg1} =
        case Fun of
            set ->
                {array:set(RelSeq, Entry, JEntries),
                 array:set(RelSeq, entry_to_segment(RelSeq, Entry, []),
                           EToSeg)};
            reset ->
                {array:reset(RelSeq, JEntries),
                 array:reset(RelSeq, EToSeg)}
        end,

    Segment #segment {
      journal_entries    = JEntries1,
      entries_to_segment = EToSeg1,
      unacked            = UnackedCount + case Action of
                                              ?PUB -> +1;
                                              del  ->  0;
                                              ack  -> -1
                                          end}.

action_to_entry(RelSeq, Action, JEntries) ->
    case array:get(RelSeq, JEntries) of
        undefined ->
            {set,
             case Action of
                 ?PUB -> {Action, no_del, no_ack};
                 del  -> {no_pub,    del, no_ack};
                 ack  -> {no_pub, no_del,    ack}
             end};
        ({Pub,    no_del, no_ack}) when Action == del ->
            {set, {Pub, del, no_ack}};
        ({no_pub,    del, no_ack}) when Action == ack ->
            {set, {no_pub, del, ack}};
        ({?PUB,      del, no_ack}) when Action == ack ->
            {reset, none}
    end.
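
%% In-memory journal life cycle for a single RelSeq, as implemented by
%% action_to_entry/3 above:
%%
%%   undefined --pub--> {Pub, no_del, no_ack}
%%             --del--> {Pub,    del, no_ack}
%%             --ack--> reset: a publish, delivery and ack that all
%%                      hit the journal cancel out entirely and never
%%                      need to reach the segment file.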

maybe_flush_journal(State) ->
    maybe_flush_journal(infinity, State).

maybe_flush_journal(Hint, State = #qistate { dirty_count = DCount,
                                             max_journal_entries = MaxJournal })
  when DCount > MaxJournal orelse (Hint =/= infinity andalso DCount > Hint) ->
    flush_journal(State);
maybe_flush_journal(_Hint, State) ->
    State.

flush_journal(State = #qistate { segments = Segments }) ->
    Segments1 =
        segment_fold(
          fun (#segment { unacked = 0, path = Path }, SegmentsN) ->
                  case rabbit_file:is_file(Path) of
                      true  -> ok = rabbit_file:delete(Path);
                      false -> ok
                  end,
                  SegmentsN;
              (#segment {} = Segment, SegmentsN) ->
                  segment_store(append_journal_to_segment(Segment), SegmentsN)
          end, segments_new(), Segments),
    {JournalHdl, State1} =
        get_journal_handle(State #qistate { segments = Segments1 }),
    ok = file_handle_cache:clear(JournalHdl),
    notify_sync(State1 #qistate { dirty_count = 0 }).

append_journal_to_segment(#segment { journal_entries    = JEntries,
                                     entries_to_segment = EToSeg,
                                     path               = Path } = Segment) ->
    case array:sparse_size(JEntries) of
        0 -> Segment;
        _ ->
            file_handle_cache_stats:update(queue_index_write),

            {ok, Hdl} = file_handle_cache:open_with_absolute_path(
                          Path, ?WRITE_MODE,
                          [{write_buffer, infinity}]),
            %% the file_handle_cache also does a list reverse, so this
            %% might not be required here, but before we were doing a
            %% sparse_foldr, a lists:reverse/1 seems to be the correct
            %% thing to do for now.
            file_handle_cache:append(Hdl, lists:reverse(array:to_list(EToSeg))),
            ok = file_handle_cache:close(Hdl),
            Segment #segment { journal_entries    = array_new(),
                               entries_to_segment = array_new([]) }
    end.

get_journal_handle(State = #qistate { journal_handle = undefined,
                                      dir = Dir,
                                      queue_name = Name }) ->
    Path = filename:join(Dir, ?JOURNAL_FILENAME),
    ok = rabbit_file:ensure_dir(Path),
    ok = ensure_queue_name_stub_file(Dir, Name),
    {ok, Hdl} = file_handle_cache:open_with_absolute_path(
                  Path, ?WRITE_MODE, [{write_buffer, infinity}]),
    {Hdl, State #qistate { journal_handle = Hdl }};
get_journal_handle(State = #qistate { journal_handle = Hdl }) ->
    {Hdl, State}.

%% Loading the journal. This isn't idempotent and will mess up the
%% counts if you call it more than once on the same state. Assumes the
%% counts are 0 to start with.
load_journal(State = #qistate { dir = Dir }) ->
    Path = filename:join(Dir, ?JOURNAL_FILENAME),
    case rabbit_file:is_file(Path) of
        true  -> {JournalHdl, State1} = get_journal_handle(State),
                 Size = rabbit_file:file_size(Path),
                 {ok, 0} = file_handle_cache:position(JournalHdl, 0),
                 {ok, JournalBin} = file_handle_cache:read(JournalHdl, Size),
                 parse_journal_entries(JournalBin, State1);
        false -> State
    end.

%% ditto
recover_journal(State) ->
    State1 = #qistate { segments = Segments } = load_journal(State),
    Segments1 =
        segment_map(
          fun (Segment = #segment { journal_entries = JEntries,
                                    entries_to_segment = EToSeg,
                                    unacked = UnackedCountInJournal }) ->
                  %% We want to keep ack'd entries in so that we can
                  %% remove them if duplicates are in the journal. The
                  %% counts here are purely from the segment itself.
                  {SegEntries, UnackedCountInSeg} = load_segment(true, Segment),
                  {JEntries1, EToSeg1, UnackedCountDuplicates} =
                      journal_minus_segment(JEntries, EToSeg, SegEntries),
                  Segment #segment { journal_entries    = JEntries1,
                                     entries_to_segment = EToSeg1,
                                     unacked            = (UnackedCountInJournal +
                                                           UnackedCountInSeg -
                                                           UnackedCountDuplicates) }
          end, Segments),
    State1 #qistate { segments = Segments1 }.

parse_journal_entries(<<?DEL_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS,
                        Rest/binary>>, State) ->
    parse_journal_entries(Rest, add_to_journal(SeqId, del, State));

parse_journal_entries(<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS,
                        Rest/binary>>, State) ->
    parse_journal_entries(Rest, add_to_journal(SeqId, ack, State));
parse_journal_entries(<<0:?JPREFIX_BITS, 0:?SEQ_BITS,
                        0:?PUB_RECORD_SIZE_BYTES/unit:8, _/binary>>, State) ->
    %% Journal entry composed only of zeroes was probably
    %% produced during a dirty shutdown so stop reading
    State;
parse_journal_entries(<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS,
                        Bin:?PUB_RECORD_BODY_BYTES/binary,
                        MsgSize:?EMBEDDED_SIZE_BITS, MsgBin:MsgSize/binary,
                        Rest/binary>>, State) ->
    IsPersistent = case Prefix of
                       ?PUB_PERSIST_JPREFIX -> true;
                       ?PUB_TRANS_JPREFIX   -> false
                   end,
    parse_journal_entries(
      Rest, add_to_journal(SeqId, {IsPersistent, Bin, MsgBin}, State));
parse_journal_entries(_ErrOrEoF, State) ->
    State.

deliver_or_ack(_Kind, [], State) ->
    State;
deliver_or_ack(Kind, SeqIds, State) ->
    JPrefix = case Kind of ack -> ?ACK_JPREFIX; del -> ?DEL_JPREFIX end,
    {JournalHdl, State1} = get_journal_handle(State),
    file_handle_cache_stats:update(queue_index_journal_write),
    ok = file_handle_cache:append(
           JournalHdl,
           [<<JPrefix:?JPREFIX_BITS, SeqId:?SEQ_BITS>> || SeqId <- SeqIds]),
    maybe_flush_journal(lists:foldl(fun (SeqId, StateN) ->
                                            add_to_journal(SeqId, Kind, StateN)
                                    end, State1, SeqIds)).

notify_sync(State = #qistate{unconfirmed     = UC,
                             unconfirmed_msg = UCM,
                             on_sync         = OnSyncFun,
                             on_sync_msg     = OnSyncMsgFun}) ->
    State1 = case gb_sets:is_empty(UC) of
                 true  -> State;
                 false -> OnSyncFun(UC),
                          State#qistate{unconfirmed = gb_sets:new()}
             end,
    case gb_sets:is_empty(UCM) of
        true  -> State1;
        false -> OnSyncMsgFun(UCM),
                 State1#qistate{unconfirmed_msg = gb_sets:new()}
    end.

%%----------------------------------------------------------------------------
%% segment manipulation
%%----------------------------------------------------------------------------

seq_id_to_seg_and_rel_seq_id(SeqId) ->
    SegmentEntryCount = segment_entry_count(),
    { SeqId div SegmentEntryCount, SeqId rem SegmentEntryCount }.

reconstruct_seq_id(Seg, RelSeq) ->
    (Seg * segment_entry_count()) + RelSeq.

all_segment_nums(#qistate { dir = Dir, segments = Segments }) ->
    lists:sort(
      sets:to_list(
        lists:foldl(
          fun (SegName, Set) ->
                  sets:add_element(
                    list_to_integer(
                      lists:takewhile(fun (C) -> $0 =< C andalso C =< $9 end,
                                      SegName)), Set)
          end, sets:from_list(segment_nums(Segments)),
          rabbit_file:wildcard(".*\\" ++ ?SEGMENT_EXTENSION, Dir)))).

segment_find_or_new(Seg, Dir, Segments) ->
    case segment_find(Seg, Segments) of
        {ok, Segment} -> Segment;
        error         -> SegName = integer_to_list(Seg) ++ ?SEGMENT_EXTENSION,
                         Path = filename:join(Dir, SegName),
                         #segment { num                = Seg,
                                    path               = Path,
                                    journal_entries    = array_new(),
                                    entries_to_segment = array_new([]),
                                    unacked            = 0 }
    end.
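
%% Segments are held as {Map, CachedSegments} (see seg_map()): up to
%% the two most recently used #segment{} records are kept in the list,
%% outside the map, for cheap repeated access. segment_find/2 and
%% segment_store/2 below keep the two halves consistent: a segment
%% lives either in the cache list or in the map, never in both.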
segment_find(Seg, {_Segments, [Segment = #segment { num = Seg } |_]}) ->
    {ok, Segment}; %% 1 or (2, matches head)
segment_find(Seg, {_Segments, [_, Segment = #segment { num = Seg }]}) ->
    {ok, Segment}; %% 2, matches tail
segment_find(Seg, {Segments, _}) -> %% no match
    maps:find(Seg, Segments).

segment_store(Segment = #segment { num = Seg }, %% 1 or (2, matches head)
              {Segments, [#segment { num = Seg } | Tail]}) ->
    {Segments, [Segment | Tail]};
segment_store(Segment = #segment { num = Seg }, %% 2, matches tail
              {Segments, [SegmentA, #segment { num = Seg }]}) ->
    {Segments, [Segment, SegmentA]};
segment_store(Segment = #segment { num = Seg }, {Segments, []}) ->
    {maps:remove(Seg, Segments), [Segment]};
segment_store(Segment = #segment { num = Seg }, {Segments, [SegmentA]}) ->
    {maps:remove(Seg, Segments), [Segment, SegmentA]};
segment_store(Segment = #segment { num = Seg },
              {Segments, [SegmentA, SegmentB]}) ->
    {maps:put(SegmentB#segment.num, SegmentB, maps:remove(Seg, Segments)),
     [Segment, SegmentA]}.

segment_fold(Fun, Acc, {Segments, CachedSegments}) ->
    maps:fold(fun (_Seg, Segment, Acc1) -> Fun(Segment, Acc1) end,
              lists:foldl(Fun, Acc, CachedSegments), Segments).

segment_map(Fun, {Segments, CachedSegments}) ->
    {maps:map(fun (_Seg, Segment) -> Fun(Segment) end, Segments),
     lists:map(Fun, CachedSegments)}.

segment_nums({Segments, CachedSegments}) ->
    lists:map(fun (#segment { num = Num }) -> Num end, CachedSegments) ++
        maps:keys(Segments).

segments_new() ->
    {#{}, []}.

entry_to_segment(_RelSeq, {?PUB, del, ack}, Initial) ->
    Initial;
entry_to_segment(RelSeq, {Pub, Del, Ack}, Initial) ->
    %% NB: we are assembling the segment in reverse order here, so
    %% del/ack comes first.
    Buf1 = case {Del, Ack} of
               {no_del, no_ack} ->
                   Initial;
               _ ->
                   Binary = <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
                              RelSeq:?REL_SEQ_BITS>>,
                   case {Del, Ack} of
                       {del, ack} -> [[Binary, Binary] | Initial];
                       _          -> [Binary | Initial]
                   end
           end,
    case Pub of
        no_pub ->
            Buf1;
        {IsPersistent, Bin, MsgBin} ->
            [[<<?PUB_PREFIX:?PUB_PREFIX_BITS,
                (bool_to_int(IsPersistent)):1,
                RelSeq:?REL_SEQ_BITS, Bin/binary,
                (size(MsgBin)):?EMBEDDED_SIZE_BITS>>, MsgBin] | Buf1]
    end.

read_bounded_segment(Seg, {StartSeg, StartRelSeq}, {EndSeg, EndRelSeq},
                     {Messages, Segments}, Dir) ->
    Segment = segment_find_or_new(Seg, Dir, Segments),
    {segment_entries_foldr(
       fun (RelSeq, {{MsgOrId, MsgProps, IsPersistent}, IsDelivered, no_ack},
            Acc)
             when (Seg > StartSeg orelse StartRelSeq =< RelSeq) andalso
                  (Seg < EndSeg   orelse EndRelSeq   >= RelSeq) ->
               [{MsgOrId, reconstruct_seq_id(Seg, RelSeq), MsgProps,
                 IsPersistent, IsDelivered == del} | Acc];
           (_RelSeq, _Value, Acc) ->
               Acc
       end, Messages, Segment),
     segment_store(Segment, Segments)}.

segment_entries_foldr(Fun, Init,
                      Segment = #segment { journal_entries = JEntries }) ->
    {SegEntries, _UnackedCount} = load_segment(false, Segment),
    {SegEntries1, _UnackedCountD} = segment_plus_journal(SegEntries, JEntries),
    array:sparse_foldr(
      fun (RelSeq, {{IsPersistent, Bin, MsgBin}, Del, Ack}, Acc) ->
              {MsgOrId, MsgProps} = parse_pub_record_body(Bin, MsgBin),
              Fun(RelSeq, {{MsgOrId, MsgProps, IsPersistent}, Del, Ack}, Acc)
      end, Init, SegEntries1).

%% Loading segments
%%
%% Does not do any combining with the journal at all.
load_segment(KeepAcked, #segment { path = Path }) ->
    Empty = {array_new(), 0},
    case rabbit_file:is_file(Path) of
        false -> Empty;
        true  -> Size = rabbit_file:file_size(Path),
                 file_handle_cache_stats:update(queue_index_read),
                 {ok, Hdl} = file_handle_cache:open_with_absolute_path(
                               Path, ?READ_MODE, []),
                 {ok, 0} = file_handle_cache:position(Hdl, bof),
                 {ok, SegBin} = file_handle_cache:read(Hdl, Size),
                 ok = file_handle_cache:close(Hdl),
                 parse_segment_entries(SegBin, KeepAcked, Empty)
    end.

parse_segment_entries(<<?PUB_PREFIX:?PUB_PREFIX_BITS,
                        IsPersistNum:1, RelSeq:?REL_SEQ_BITS, Rest/binary>>,
                      KeepAcked, Acc) ->
    parse_segment_publish_entry(
      Rest, 1 == IsPersistNum, RelSeq, KeepAcked, Acc);
parse_segment_entries(<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
                        RelSeq:?REL_SEQ_BITS, Rest/binary>>, KeepAcked, Acc) ->
    parse_segment_entries(
      Rest, KeepAcked, add_segment_relseq_entry(KeepAcked, RelSeq, Acc));
parse_segment_entries(<<>>, _KeepAcked, Acc) ->
    Acc.

parse_segment_publish_entry(<<Bin:?PUB_RECORD_BODY_BYTES/binary,
                              MsgSize:?EMBEDDED_SIZE_BITS,
                              MsgBin:MsgSize/binary, Rest/binary>>,
                            IsPersistent, RelSeq, KeepAcked,
                            {SegEntries, Unacked}) ->
    Obj = {{IsPersistent, Bin, MsgBin}, no_del, no_ack},
    SegEntries1 = array:set(RelSeq, Obj, SegEntries),
    parse_segment_entries(Rest, KeepAcked, {SegEntries1, Unacked + 1});
parse_segment_publish_entry(Rest, _IsPersistent, _RelSeq, KeepAcked, Acc) ->
    parse_segment_entries(Rest, KeepAcked, Acc).

add_segment_relseq_entry(KeepAcked, RelSeq, {SegEntries, Unacked}) ->
    case array:get(RelSeq, SegEntries) of
        {Pub, no_del, no_ack} ->
            {array:set(RelSeq, {Pub, del, no_ack}, SegEntries), Unacked};
        {Pub, del, no_ack} when KeepAcked ->
            {array:set(RelSeq, {Pub, del, ack}, SegEntries), Unacked - 1};
        {_Pub, del, no_ack} ->
            {array:reset(RelSeq, SegEntries), Unacked - 1}
    end.

array_new() ->
    array_new(undefined).

array_new(Default) ->
    array:new([{default, Default}, fixed, {size, segment_entry_count()}]).

segment_entry_count() ->
    get(segment_entry_count).

bool_to_int(true)  -> 1;
bool_to_int(false) -> 0.

%%----------------------------------------------------------------------------
%% journal & segment combination
%%----------------------------------------------------------------------------

%% Combine what we have just read from a segment file with what we're
%% holding for that segment in memory. There must be no duplicates.
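%%
%% For example, a segment entry {Pub, no_del, no_ack} combined with a
%% journal entry {no_pub, del, ack} yields 'undefined' with a delta of
%% -1: the publish was on disk, but the delivery and acknowledgement
%% arrived via the journal, so the entry disappears and one unacked
%% message is subtracted.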
segment_plus_journal(SegEntries, JEntries) ->
    array:sparse_foldl(
      fun (RelSeq, JObj, {SegEntriesOut, AdditionalUnacked}) ->
              SegEntry = array:get(RelSeq, SegEntriesOut),
              {Obj, AdditionalUnackedDelta} =
                  segment_plus_journal1(SegEntry, JObj),
              {case Obj of
                   undefined -> array:reset(RelSeq, SegEntriesOut);
                   _         -> array:set(RelSeq, Obj, SegEntriesOut)
               end,
               AdditionalUnacked + AdditionalUnackedDelta}
      end, {SegEntries, 0}, JEntries).

%% Here, the result is a tuple with the first element containing the
%% item which we may be adding to (for items only in the journal),
%% modifying in (bits in both), or, when returning 'undefined',
%% erasing from (ack in journal, not segment) the segment array. The
%% other element of the tuple is the delta for AdditionalUnacked.
segment_plus_journal1(undefined, {?PUB, no_del, no_ack} = Obj) ->
    {Obj, 1};
segment_plus_journal1(undefined, {?PUB, del, no_ack} = Obj) ->
    {Obj, 1};
segment_plus_journal1(undefined, {?PUB, del, ack}) ->
    {undefined, 0};

segment_plus_journal1({?PUB = Pub, no_del, no_ack}, {no_pub, del, no_ack}) ->
    {{Pub, del, no_ack}, 0};
segment_plus_journal1({?PUB, no_del, no_ack}, {no_pub, del, ack}) ->
    {undefined, -1};
segment_plus_journal1({?PUB, del, no_ack}, {no_pub, no_del, ack}) ->
    {undefined, -1}.

%% Remove from the journal entries for a segment, items that are
%% duplicates of entries found in the segment itself. Used on start up
%% to clean up the journal.
%%
%% We need to update the entries_to_segment since they are just a
%% cache of what's on the journal.
journal_minus_segment(JEntries, EToSeg, SegEntries) ->
    array:sparse_foldl(
      fun (RelSeq, JObj, {JEntriesOut, EToSegOut, UnackedRemoved}) ->
              SegEntry = array:get(RelSeq, SegEntries),
              {Obj, UnackedRemovedDelta} =
                  journal_minus_segment1(JObj, SegEntry),
              {JEntriesOut1, EToSegOut1} =
                  case Obj of
                      keep ->
                          {JEntriesOut, EToSegOut};
                      undefined ->
                          {array:reset(RelSeq, JEntriesOut),
                           array:reset(RelSeq, EToSegOut)};
                      _ ->
                          {array:set(RelSeq, Obj, JEntriesOut),
                           array:set(RelSeq, entry_to_segment(RelSeq, Obj, []),
                                     EToSegOut)}
                  end,
              {JEntriesOut1, EToSegOut1, UnackedRemoved + UnackedRemovedDelta}
      end, {JEntries, EToSeg, 0}, JEntries).

%% Here, the result is a tuple whose first element says what to do
%% with the journal entry: 'keep' leaves it in place, 'undefined'
%% removes it, and any other value replaces it. The second element of
%% the tuple is the delta for UnackedRemoved.
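%%
%% For example, if the journal holds {Pub, del, no_ack} while the
%% segment already holds the same publish as {Pub, no_del, no_ack},
%% only the delivery is new: the journal entry is rewritten to
%% {no_pub, del, no_ack} and one unacked publish is counted as
%% removed.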

%% Both the same. Must be at least the publish
journal_minus_segment1({?PUB, _Del, no_ack} = Obj, Obj) ->
    {undefined, 1};
journal_minus_segment1({?PUB, _Del, ack} = Obj,    Obj) ->
    {undefined, 0};

%% Just publish in journal
journal_minus_segment1({?PUB, no_del, no_ack},     undefined) ->
    {keep, 0};

%% Publish and deliver in journal
journal_minus_segment1({?PUB, del, no_ack},        undefined) ->
    {keep, 0};
journal_minus_segment1({?PUB = Pub, del, no_ack},  {Pub, no_del, no_ack}) ->
    {{no_pub, del, no_ack}, 1};

%% Publish, deliver and ack in journal
journal_minus_segment1({?PUB, del, ack},           undefined) ->
    {keep, 0};
journal_minus_segment1({?PUB = Pub, del, ack},     {Pub, no_del, no_ack}) ->
    {{no_pub, del, ack}, 1};
journal_minus_segment1({?PUB = Pub, del, ack},     {Pub, del, no_ack}) ->
    {{no_pub, no_del, ack}, 1};

%% Just deliver in journal
journal_minus_segment1({no_pub, del, no_ack},      {?PUB, no_del, no_ack}) ->
    {keep, 0};
journal_minus_segment1({no_pub, del, no_ack},      {?PUB, del, no_ack}) ->
    {undefined, 0};

%% Just ack in journal
journal_minus_segment1({no_pub, no_del, ack},      {?PUB, del, no_ack}) ->
    {keep, 0};
journal_minus_segment1({no_pub, no_del, ack},      {?PUB, del, ack}) ->
    {undefined, -1};

%% Deliver and ack in journal
journal_minus_segment1({no_pub, del, ack},         {?PUB, no_del, no_ack}) ->
    {keep, 0};
journal_minus_segment1({no_pub, del, ack},         {?PUB, del, no_ack}) ->
    {{no_pub, no_del, ack}, 0};
journal_minus_segment1({no_pub, del, ack},         {?PUB, del, ack}) ->
    {undefined, -1};

%% Missing segment. If flush_journal/1 is interrupted after deleting
%% the segment but before truncating the journal we can get these
%% cases: a delivery and an acknowledgement in the journal, or just an
%% acknowledgement in the journal, but with no segment. In both cases
%% we have really forgotten the message; so ignore what's in the
%% journal.
journal_minus_segment1({no_pub, no_del, ack},      undefined) ->
    {undefined, 0};
journal_minus_segment1({no_pub, del, ack},         undefined) ->
    {undefined, 0}.

%%----------------------------------------------------------------------------
%% upgrade
%%----------------------------------------------------------------------------
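
%% Each upgrade below rewrites a queue's journal and segment files
%% record by record via foreach_queue_index/1 (see the migration
%% helpers at the end of this module). A transform fun is handed the
%% unconsumed tail of the file and returns {ReplacementIoData, Rest}
%% for the record it parsed, or 'stop' once nothing further can be
%% parsed (see drive_transform_fun/3).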

-spec add_queue_ttl() -> 'ok'.

add_queue_ttl() ->
    foreach_queue_index({fun add_queue_ttl_journal/1,
                         fun add_queue_ttl_segment/1}).

add_queue_ttl_journal(<<?DEL_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS,
                        Rest/binary>>) ->
    {<<?DEL_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Rest};
add_queue_ttl_journal(<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS,
                        Rest/binary>>) ->
    {<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Rest};
add_queue_ttl_journal(<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS,
                        MsgId:?MSG_ID_BYTES/binary, Rest/binary>>) ->
    {[<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, MsgId,
      expiry_to_binary(undefined)], Rest};
add_queue_ttl_journal(_) ->
    stop.

add_queue_ttl_segment(<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1,
                        RelSeq:?REL_SEQ_BITS, MsgId:?MSG_ID_BYTES/binary,
                        Rest/binary>>) ->
    {[<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1, RelSeq:?REL_SEQ_BITS>>,
      MsgId, expiry_to_binary(undefined)], Rest};
add_queue_ttl_segment(<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
                        RelSeq:?REL_SEQ_BITS, Rest/binary>>) ->
    {<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, RelSeq:?REL_SEQ_BITS>>,
     Rest};
add_queue_ttl_segment(_) ->
    stop.

avoid_zeroes() ->
    foreach_queue_index({none, fun avoid_zeroes_segment/1}).

avoid_zeroes_segment(<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1,
                       RelSeq:?REL_SEQ_BITS, MsgId:?MSG_ID_BITS,
                       Expiry:?EXPIRY_BITS, Rest/binary>>) ->
    {<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1, RelSeq:?REL_SEQ_BITS,
       MsgId:?MSG_ID_BITS, Expiry:?EXPIRY_BITS>>, Rest};
avoid_zeroes_segment(<<0:?REL_SEQ_ONLY_PREFIX_BITS,
                       RelSeq:?REL_SEQ_BITS, Rest/binary>>) ->
    {<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, RelSeq:?REL_SEQ_BITS>>,
     Rest};
avoid_zeroes_segment(_) ->
    stop.

%% At upgrade time we just define every message's size as 0 - that
%% will save us a load of faff with the message store, and means we
%% can actually use the clean recovery terms in VQ. It does mean we
%% don't count message bodies from before the migration, but we can
%% live with that.
store_msg_size() ->
    foreach_queue_index({fun store_msg_size_journal/1,
                         fun store_msg_size_segment/1}).

store_msg_size_journal(<<?DEL_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS,
                         Rest/binary>>) ->
    {<<?DEL_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Rest};
store_msg_size_journal(<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS,
                         Rest/binary>>) ->
    {<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Rest};
store_msg_size_journal(<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS,
                         MsgId:?MSG_ID_BITS, Expiry:?EXPIRY_BITS,
                         Rest/binary>>) ->
    {<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS, MsgId:?MSG_ID_BITS,
       Expiry:?EXPIRY_BITS, 0:?SIZE_BITS>>, Rest};
store_msg_size_journal(_) ->
    stop.

store_msg_size_segment(<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1,
                         RelSeq:?REL_SEQ_BITS, MsgId:?MSG_ID_BITS,
                         Expiry:?EXPIRY_BITS, Rest/binary>>) ->
    {<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1, RelSeq:?REL_SEQ_BITS,
       MsgId:?MSG_ID_BITS, Expiry:?EXPIRY_BITS, 0:?SIZE_BITS>>, Rest};
store_msg_size_segment(<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
                         RelSeq:?REL_SEQ_BITS, Rest/binary>>) ->
    {<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, RelSeq:?REL_SEQ_BITS>>,
     Rest};
store_msg_size_segment(_) ->
    stop.

store_msg() ->
    foreach_queue_index({fun store_msg_journal/1,
                         fun store_msg_segment/1}).

store_msg_journal(<<?DEL_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS,
                    Rest/binary>>) ->
    {<<?DEL_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Rest};
store_msg_journal(<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS,
                    Rest/binary>>) ->
    {<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Rest};
store_msg_journal(<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS,
                    MsgId:?MSG_ID_BITS, Expiry:?EXPIRY_BITS, Size:?SIZE_BITS,
                    Rest/binary>>) ->
    {<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS, MsgId:?MSG_ID_BITS,
       Expiry:?EXPIRY_BITS, Size:?SIZE_BITS,
       0:?EMBEDDED_SIZE_BITS>>, Rest};
store_msg_journal(_) ->
    stop.

store_msg_segment(<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1,
                    RelSeq:?REL_SEQ_BITS, MsgId:?MSG_ID_BITS,
                    Expiry:?EXPIRY_BITS, Size:?SIZE_BITS, Rest/binary>>) ->
    {<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1, RelSeq:?REL_SEQ_BITS,
       MsgId:?MSG_ID_BITS, Expiry:?EXPIRY_BITS, Size:?SIZE_BITS,
       0:?EMBEDDED_SIZE_BITS>>, Rest};
store_msg_segment(<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
                    RelSeq:?REL_SEQ_BITS, Rest/binary>>) ->
    {<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, RelSeq:?REL_SEQ_BITS>>,
     Rest};
store_msg_segment(_) ->
    stop.

%%----------------------------------------------------------------------------
%% Migration functions
%%----------------------------------------------------------------------------

foreach_queue_index(Funs) ->
    QueueDirNames = all_queue_directory_names(),
    {ok, Gatherer} = gatherer:start_link(),
    [begin
         ok = gatherer:fork(Gatherer),
         ok = worker_pool:submit_async(
                fun () ->
                        transform_queue(QueueDirName, Gatherer, Funs)
                end)
     end || QueueDirName <- QueueDirNames],
    empty = gatherer:out(Gatherer),
    ok = gatherer:stop(Gatherer).

transform_queue(Dir, Gatherer, {JournalFun, SegmentFun}) ->
    ok = transform_file(filename:join(Dir, ?JOURNAL_FILENAME), JournalFun),
    [ok = transform_file(filename:join(Dir, Seg), SegmentFun)
     || Seg <- rabbit_file:wildcard(".*\\" ++ ?SEGMENT_EXTENSION, Dir)],
    ok = gatherer:finish(Gatherer).

transform_file(_Path, none) ->
    ok;
transform_file(Path, Fun) when is_function(Fun) ->
    PathTmp = Path ++ ".upgrade",
    case rabbit_file:file_size(Path) of
        0    -> ok;
        Size -> {ok, PathTmpHdl} =
                    file_handle_cache:open_with_absolute_path(
                      PathTmp, ?WRITE_MODE,
                      [{write_buffer, infinity}]),

                {ok, PathHdl} = file_handle_cache:open_with_absolute_path(
                                  Path, ?READ_MODE, [{read_buffer, Size}]),
                {ok, Content} = file_handle_cache:read(PathHdl, Size),
                ok = file_handle_cache:close(PathHdl),

                ok = drive_transform_fun(Fun, PathTmpHdl, Content),

                ok = file_handle_cache:close(PathTmpHdl),
                ok = rabbit_file:rename(PathTmp, Path)
    end.

drive_transform_fun(Fun, Hdl, Contents) ->
    case Fun(Contents) of
        stop                -> ok;
        {Output, Contents1} -> ok = file_handle_cache:append(Hdl, Output),
                               drive_transform_fun(Fun, Hdl, Contents1)
    end.

move_to_per_vhost_stores(#resource{virtual_host = VHost} = QueueName) ->
    OldQueueDir = filename:join([queues_base_dir(), "queues",
                                 queue_name_to_dir_name_legacy(QueueName)]),
    VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
    NewQueueDir = queue_dir(VHostDir, QueueName),
    rabbit_log_upgrade:info("About to migrate queue directory '~s' to '~s'",
                            [OldQueueDir, NewQueueDir]),
    case rabbit_file:is_dir(OldQueueDir) of
        true ->
            ok = rabbit_file:ensure_dir(NewQueueDir),
            ok = rabbit_file:rename(OldQueueDir, NewQueueDir),
            ok = ensure_queue_name_stub_file(NewQueueDir, QueueName);
        false ->
            Msg  = "Queue index directory '~s' not found for ~s",
            Args = [OldQueueDir, rabbit_misc:rs(QueueName)],
            rabbit_log_upgrade:error(Msg, Args),
            rabbit_log:error(Msg, Args)
    end,
    ok.

ensure_queue_name_stub_file(Dir, #resource{virtual_host = VHost, name = QName}) ->
    QueueNameFile = filename:join(Dir, ?QUEUE_NAME_STUB_FILE),
    file:write_file(QueueNameFile, <<"VHOST: ", VHost/binary, "\n",
                                     "QUEUE: ", QName/binary, "\n">>).

read_global_recovery_terms(DurableQueueNames) ->
    ok = rabbit_recovery_terms:open_global_table(),

    DurableTerms =
        lists:foldl(
          fun(QName, RecoveryTerms) ->
                  DirName = queue_name_to_dir_name_legacy(QName),
                  RecoveryInfo = case rabbit_recovery_terms:read_global(DirName) of
                                     {error, _}  -> non_clean_shutdown;
                                     {ok, Terms} -> Terms
                                 end,
                  [RecoveryInfo | RecoveryTerms]
          end, [], DurableQueueNames),

    ok = rabbit_recovery_terms:close_global_table(),
    %% The backing queue interface requires that the queue recovery terms
    %% which come back from start/2 are in the same order as DurableQueueNames
    OrderedTerms = lists:reverse(DurableTerms),
    {OrderedTerms, {fun queue_index_walker/1, {start, DurableQueueNames}}}.

cleanup_global_recovery_terms() ->
    rabbit_file:recursive_delete([filename:join([queues_base_dir(), "queues"])]),
    rabbit_recovery_terms:delete_global_table(),
    ok.

update_recovery_term(#resource{virtual_host = VHost} = QueueName, Term) ->
    Key = queue_name_to_dir_name(QueueName),
    rabbit_recovery_terms:store(VHost, Key, Term).