1 /* 2 * LogSystemDiskQueueAdapter.h 3 * 4 * This source file is part of the FoundationDB open source project 5 * 6 * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors 7 * 8 * Licensed under the Apache License, Version 2.0 (the "License"); 9 * you may not use this file except in compliance with the License. 10 * You may obtain a copy of the License at 11 * 12 * http://www.apache.org/licenses/LICENSE-2.0 13 * 14 * Unless required by applicable law or agreed to in writing, software 15 * distributed under the License is distributed on an "AS IS" BASIS, 16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 17 * See the License for the specific language governing permissions and 18 * limitations under the License. 19 */ 20 21 #ifndef FDBSERVER_LOGSYSTEMDISKQUEUEADAPTER_H 22 #define FDBSERVER_LOGSYSTEMDISKQUEUEADAPTER_H 23 #pragma once 24 25 #include "fdbclient/FDBTypes.h" 26 #include "fdbserver/IDiskQueue.h" 27 28 struct PeekSpecialInfo { 29 int8_t primaryLocality; 30 int8_t secondaryLocality; 31 Version knownCommittedVersion; 32 33 bool operator == (const PeekSpecialInfo& r) const { 34 return primaryLocality == r.primaryLocality && secondaryLocality == r.secondaryLocality && knownCommittedVersion == r.knownCommittedVersion; 35 } 36 PeekSpecialInfoPeekSpecialInfo37 PeekSpecialInfo(int8_t primaryLocality, int8_t secondaryLocality, Version knownCommittedVersion) : primaryLocality(primaryLocality), secondaryLocality(secondaryLocality), knownCommittedVersion(knownCommittedVersion) {} 38 }; 39 40 class LogSystemDiskQueueAdapter : public IDiskQueue { 41 public: 42 // This adapter is designed to let KeyValueStoreMemory use ILogSystem 43 // as a backing store, so that the transaction subsystem can in 44 // turn use KeyValueStoreMemory to track configuration information as of 45 // the database version and recover it from the logging subsystem as necessary. 46 47 // Because the transaction subsystem will need to control the actual pushing of 48 // committed information to the ILogSystem, commit() in this interface doesn't directly 49 // call ILogSystem::push(). Instead it makes a commit message available through 50 // getCommitMessage(), and doesn't return until its acknowledge promise is set. 51 // The caller is responsible for calling ILogSystem::push() and ILogSystem::pop() with the results. 52 53 // It does, however, peek the specified tag directly at recovery time. 54 logSystem(logSystem)55 LogSystemDiskQueueAdapter( Reference<ILogSystem> logSystem, Tag tag, Reference<AsyncVar<PeekSpecialInfo>> peekLocality, bool recover=true ) : logSystem(logSystem), tag(tag), peekLocality(peekLocality), enableRecovery(recover), recoveryLoc(1), recoveryQueueLoc(1), poppedUpTo(0), nextCommit(1), recoveryQueueDataSize(0), peekTypeSwitches(0) { 56 if (enableRecovery) { 57 localityChanged = peekLocality ? peekLocality->onChange() : Never(); 58 cursor = logSystem->peekSpecial( UID(), 1, tag, peekLocality ? peekLocality->get().primaryLocality : tagLocalityInvalid, peekLocality ? peekLocality->get().knownCommittedVersion : invalidVersion ); 59 } 60 } 61 62 struct CommitMessage { 63 Standalone<VectorRef<VectorRef<uint8_t>>> messages; // push this into the logSystem with `tag` 64 Version popTo; // pop this from the logSystem with `tag` 65 Promise<Void> acknowledge; // then send Void to this, so commit() can return 66 }; 67 68 // Set the version of the next push or commit (or a lower version) 69 // If lower, locations returned by the IDiskQueue interface will be conservative, so things that could be popped might not be setNextVersion(Version next)70 void setNextVersion( Version next ) { nextCommit = next; } 71 72 // Return the next commit message resulting from a call to commit(). 73 Future<CommitMessage> getCommitMessage(); 74 75 // IClosable interface 76 virtual Future<Void> getError(); 77 virtual Future<Void> onClosed(); 78 virtual void dispose(); 79 virtual void close(); 80 81 // IDiskQueue interface initializeRecovery(location recoverAt)82 virtual Future<bool> initializeRecovery(location recoverAt) { return false; } 83 virtual Future<Standalone<StringRef>> readNext( int bytes ); 84 virtual IDiskQueue::location getNextReadLocation(); getNextCommitLocation()85 virtual IDiskQueue::location getNextCommitLocation() { ASSERT(false); throw internal_error(); } getNextPushLocation()86 virtual IDiskQueue::location getNextPushLocation() { ASSERT(false); throw internal_error(); } read(location start,location end,CheckHashes ch)87 virtual Future<Standalone<StringRef>> read( location start, location end, CheckHashes ch ) { ASSERT(false); throw internal_error(); } 88 virtual IDiskQueue::location push( StringRef contents ); 89 virtual void pop( IDiskQueue::location upTo ); 90 virtual Future<Void> commit(); getStorageBytes()91 virtual StorageBytes getStorageBytes() { ASSERT(false); throw internal_error(); } getCommitOverhead()92 virtual int getCommitOverhead() { return 0; } //SOMEDAY: could this be more accurate? 93 94 private: 95 Reference<AsyncVar<PeekSpecialInfo>> peekLocality; 96 Future<Void> localityChanged; 97 Reference<ILogSystem::IPeekCursor> cursor; 98 int peekTypeSwitches; 99 Tag tag; 100 101 // Recovery state (used while readNext() is being called repeatedly) 102 bool enableRecovery; 103 Reference<ILogSystem> logSystem; 104 Version recoveryLoc, recoveryQueueLoc; 105 std::vector<Standalone<StringRef>> recoveryQueue; 106 int recoveryQueueDataSize; 107 108 // State for next commit() call 109 Standalone<VectorRef<VectorRef<uint8_t>>> pushedData; // SOMEDAY: better representation? 110 Version poppedUpTo; 111 std::deque< Promise<CommitMessage> > commitMessages; 112 Version nextCommit; 113 114 friend class LogSystemDiskQueueAdapterImpl; 115 }; 116 117 LogSystemDiskQueueAdapter* openDiskQueueAdapter( Reference<ILogSystem> logSystem, Tag tag, Reference<AsyncVar<PeekSpecialInfo>> peekLocality ); 118 119 #endif 120