1 /*
2  * LogSystemDiskQueueAdapter.h
3  *
4  * This source file is part of the FoundationDB open source project
5  *
6  * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *     http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  */
20 
21 #ifndef FDBSERVER_LOGSYSTEMDISKQUEUEADAPTER_H
22 #define FDBSERVER_LOGSYSTEMDISKQUEUEADAPTER_H
23 #pragma once
24 
25 #include "fdbclient/FDBTypes.h"
26 #include "fdbserver/IDiskQueue.h"
27 
28 struct PeekSpecialInfo {
29 	int8_t primaryLocality;
30 	int8_t secondaryLocality;
31 	Version knownCommittedVersion;
32 
33 	bool operator == (const PeekSpecialInfo& r) const {
34 		return primaryLocality == r.primaryLocality && secondaryLocality == r.secondaryLocality && knownCommittedVersion == r.knownCommittedVersion;
35 	}
36 
PeekSpecialInfoPeekSpecialInfo37 	PeekSpecialInfo(int8_t primaryLocality, int8_t secondaryLocality, Version knownCommittedVersion) : primaryLocality(primaryLocality), secondaryLocality(secondaryLocality), knownCommittedVersion(knownCommittedVersion) {}
38 };
39 
40 class LogSystemDiskQueueAdapter : public IDiskQueue {
41 public:
42 	// This adapter is designed to let KeyValueStoreMemory use ILogSystem
43 	// as a backing store, so that the transaction subsystem can in
44 	// turn use KeyValueStoreMemory to track configuration information as of
45 	// the database version and recover it from the logging subsystem as necessary.
46 
47 	// Because the transaction subsystem will need to control the actual pushing of
48 	// committed information to the ILogSystem, commit() in this interface doesn't directly
49 	// call ILogSystem::push().  Instead it makes a commit message available through
50 	// getCommitMessage(), and doesn't return until its acknowledge promise is set.
51 	// The caller is responsible for calling ILogSystem::push() and ILogSystem::pop() with the results.
52 
53 	// It does, however, peek the specified tag directly at recovery time.
54 
logSystem(logSystem)55 	LogSystemDiskQueueAdapter( Reference<ILogSystem> logSystem, Tag tag, Reference<AsyncVar<PeekSpecialInfo>> peekLocality, bool recover=true ) : logSystem(logSystem), tag(tag), peekLocality(peekLocality), enableRecovery(recover), recoveryLoc(1), recoveryQueueLoc(1), poppedUpTo(0), nextCommit(1), recoveryQueueDataSize(0), peekTypeSwitches(0) {
56 		if (enableRecovery) {
57 			localityChanged = peekLocality ? peekLocality->onChange() : Never();
58 			cursor = logSystem->peekSpecial( UID(), 1, tag, peekLocality ? peekLocality->get().primaryLocality : tagLocalityInvalid, peekLocality ? peekLocality->get().knownCommittedVersion : invalidVersion );
59 		}
60 	}
61 
62 	struct CommitMessage {
63 		Standalone<VectorRef<VectorRef<uint8_t>>> messages;    // push this into the logSystem with `tag`
64 		Version popTo;                                         // pop this from the logSystem with `tag`
65 		Promise<Void> acknowledge;                             // then send Void to this, so commit() can return
66 	};
67 
68 	// Set the version of the next push or commit (or a lower version)
69 	// If lower, locations returned by the IDiskQueue interface will be conservative, so things that could be popped might not be
setNextVersion(Version next)70 	void setNextVersion( Version next ) { nextCommit = next; }
71 
72 	// Return the next commit message resulting from a call to commit().
73 	Future<CommitMessage> getCommitMessage();
74 
75 	// IClosable interface
76 	virtual Future<Void> getError();
77 	virtual Future<Void> onClosed();
78 	virtual void dispose();
79 	virtual void close();
80 
81 	// IDiskQueue interface
initializeRecovery(location recoverAt)82 	virtual Future<bool> initializeRecovery(location recoverAt) { return false; }
83 	virtual Future<Standalone<StringRef>> readNext( int bytes );
84 	virtual IDiskQueue::location getNextReadLocation();
getNextCommitLocation()85 	virtual IDiskQueue::location getNextCommitLocation() { ASSERT(false); throw internal_error(); }
getNextPushLocation()86 	virtual IDiskQueue::location getNextPushLocation() { ASSERT(false); throw internal_error(); }
read(location start,location end,CheckHashes ch)87 	virtual Future<Standalone<StringRef>> read( location start, location end, CheckHashes ch ) { ASSERT(false); throw internal_error(); }
88 	virtual IDiskQueue::location push( StringRef contents );
89 	virtual void pop( IDiskQueue::location upTo );
90 	virtual Future<Void> commit();
getStorageBytes()91 	virtual StorageBytes getStorageBytes() { ASSERT(false); throw internal_error(); }
getCommitOverhead()92 	virtual int getCommitOverhead() { return 0; } //SOMEDAY: could this be more accurate?
93 
94 private:
95 	Reference<AsyncVar<PeekSpecialInfo>> peekLocality;
96 	Future<Void> localityChanged;
97 	Reference<ILogSystem::IPeekCursor> cursor;
98 	int peekTypeSwitches;
99 	Tag tag;
100 
101 	// Recovery state (used while readNext() is being called repeatedly)
102 	bool enableRecovery;
103 	Reference<ILogSystem> logSystem;
104 	Version recoveryLoc, recoveryQueueLoc;
105 	std::vector<Standalone<StringRef>> recoveryQueue;
106 	int recoveryQueueDataSize;
107 
108 	// State for next commit() call
109 	Standalone<VectorRef<VectorRef<uint8_t>>> pushedData;  // SOMEDAY: better representation?
110 	Version poppedUpTo;
111 	std::deque< Promise<CommitMessage> > commitMessages;
112 	Version nextCommit;
113 
114 	friend class LogSystemDiskQueueAdapterImpl;
115 };
116 
// Returns a LogSystemDiskQueueAdapter for the given log system, tag and (possibly null)
// peekLocality. Unlike the adapter's constructor, this takes no `recover` argument.
// NOTE(review): ownership of the returned raw pointer and whether recovery is enabled are
// decided by the definition, which is not in this header; confirm before relying on either.
LogSystemDiskQueueAdapter* openDiskQueueAdapter( Reference<ILogSystem> logSystem, Tag tag, Reference<AsyncVar<PeekSpecialInfo>> peekLocality );
118 
119 #endif
120