1 /*
2  * TLogInterface.h
3  *
4  * This source file is part of the FoundationDB open source project
5  *
6  * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *     http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  */
20 
21 #ifndef FDBSERVER_TLOGINTERFACE_H
22 #define FDBSERVER_TLOGINTERFACE_H
23 #pragma once
24 
25 #include "fdbclient/FDBTypes.h"
26 #include "fdbclient/CommitTransaction.h"
27 #include "fdbclient/MutationList.h"
28 #include "fdbclient/StorageServerInterface.h"
29 #include <iterator>
30 
31 struct TLogInterface {
32 	enum { LocationAwareLoadBalance = 1 };
33 	enum { AlwaysFresh = 1 };
34 
35 	LocalityData locality;
36 	UID uniqueID;
37 	UID sharedTLogID;
38 	RequestStream< struct TLogPeekRequest > peekMessages;
39 	RequestStream< struct TLogPopRequest > popMessages;
40 
41 	RequestStream< struct TLogCommitRequest > commit;
42 	RequestStream< ReplyPromise< struct TLogLockResult > > lock; // first stage of database recovery
43 	RequestStream< struct TLogQueuingMetricsRequest > getQueuingMetrics;
44 	RequestStream< struct TLogConfirmRunningRequest > confirmRunning; // used for getReadVersion requests from client
45 	RequestStream<ReplyPromise<Void>> waitFailure;
46 	RequestStream< struct TLogRecoveryFinishedRequest > recoveryFinished;
47 
TLogInterfaceTLogInterface48 	TLogInterface() {}
TLogInterfaceTLogInterface49 	explicit TLogInterface(LocalityData locality) : uniqueID( g_random->randomUniqueID() ), locality(locality) { sharedTLogID = uniqueID; }
TLogInterfaceTLogInterface50 	TLogInterface(UID sharedTLogID, LocalityData locality) : uniqueID( g_random->randomUniqueID() ), sharedTLogID(sharedTLogID), locality(locality) {}
TLogInterfaceTLogInterface51 	TLogInterface(UID uniqueID, UID sharedTLogID, LocalityData locality) : uniqueID(uniqueID), sharedTLogID(sharedTLogID), locality(locality) {}
idTLogInterface52 	UID id() const { return uniqueID; }
getSharedTLogIDTLogInterface53 	UID getSharedTLogID() const { return sharedTLogID; }
toStringTLogInterface54 	std::string toString() const { return id().shortString(); }
55 	bool operator == ( TLogInterface const& r ) const { return id() == r.id(); }
addressTLogInterface56 	NetworkAddress address() const { return peekMessages.getEndpoint().getPrimaryAddress(); }
initEndpointsTLogInterface57 	void initEndpoints() {
58 		getQueuingMetrics.getEndpoint( TaskTLogQueuingMetrics );
59 		popMessages.getEndpoint( TaskTLogPop );
60 		peekMessages.getEndpoint( TaskTLogPeek );
61 		confirmRunning.getEndpoint( TaskTLogConfirmRunning );
62 		commit.getEndpoint( TaskTLogCommit );
63 	}
64 
65 	template <class Ar>
serializeTLogInterface66 	void serialize( Ar& ar ) {
67 		ASSERT(ar.isDeserializing || uniqueID != UID());
68 		serializer(ar, uniqueID, sharedTLogID, locality, peekMessages, popMessages
69 		  , commit, lock, getQueuingMetrics, confirmRunning, waitFailure, recoveryFinished);
70 	}
71 };
72 
73 struct TLogRecoveryFinishedRequest {
74 	ReplyPromise<Void> reply;
75 
TLogRecoveryFinishedRequestTLogRecoveryFinishedRequest76 	TLogRecoveryFinishedRequest() {}
77 
78 	template <class Ar>
serializeTLogRecoveryFinishedRequest79 	void serialize( Ar& ar ) {
80 		serializer(ar, reply);
81 	}
82 };
83 
84 struct TLogLockResult {
85 	Version end;
86 	Version knownCommittedVersion;
87 
88 	template <class Ar>
serializeTLogLockResult89 	void serialize( Ar& ar ) {
90 		serializer(ar, end, knownCommittedVersion);
91 	}
92 };
93 
94 struct TLogConfirmRunningRequest {
95 	Optional<UID> debugID;
96 	ReplyPromise<Void> reply;
97 
TLogConfirmRunningRequestTLogConfirmRunningRequest98 	TLogConfirmRunningRequest() {}
TLogConfirmRunningRequestTLogConfirmRunningRequest99 	TLogConfirmRunningRequest( Optional<UID> debugID ) : debugID(debugID) {}
100 
101 	template <class Ar>
serializeTLogConfirmRunningRequest102 	void serialize( Ar& ar ) {
103 		serializer(ar, debugID, reply);
104 	}
105 };
106 
107 struct VersionUpdateRef {
108 	Version version;
109 	MutationListRef mutations;
110 	bool isPrivateData;
111 
VersionUpdateRefVersionUpdateRef112 	VersionUpdateRef() : isPrivateData(false), version(invalidVersion) {}
VersionUpdateRefVersionUpdateRef113 	VersionUpdateRef( Arena& to, const VersionUpdateRef& from ) : version(from.version), mutations( to, from.mutations ), isPrivateData( from.isPrivateData ) {}
totalSizeVersionUpdateRef114 	int totalSize() const { return mutations.totalSize(); }
expectedSizeVersionUpdateRef115 	int expectedSize() const { return mutations.expectedSize(); }
116 
117 	template <class Ar>
serializeVersionUpdateRef118 	void serialize( Ar& ar ) {
119 		serializer(ar, version, mutations, isPrivateData);
120 	}
121 };
122 
123 struct VerUpdateRef {
124 	Version version;
125 	VectorRef<MutationRef> mutations;
126 	bool isPrivateData;
127 
VerUpdateRefVerUpdateRef128 	VerUpdateRef() : isPrivateData(false), version(invalidVersion) {}
VerUpdateRefVerUpdateRef129 	VerUpdateRef( Arena& to, const VerUpdateRef& from ) : version(from.version), mutations( to, from.mutations ), isPrivateData( from.isPrivateData ) {}
expectedSizeVerUpdateRef130 	int expectedSize() const { return mutations.expectedSize(); }
131 
132 	template <class Ar>
serializeVerUpdateRef133 	void serialize( Ar& ar ) {
134 		serializer(ar, version, mutations, isPrivateData);
135 	}
136 };
137 
138 struct TLogPeekReply {
139 	Arena arena;
140 	StringRef messages;
141 	Version end;
142 	Optional<Version> popped;
143 	Version maxKnownVersion;
144 	Version minKnownCommittedVersion;
145 	Optional<Version> begin;
146 
147 	template <class Ar>
serializeTLogPeekReply148 	void serialize(Ar& ar) {
149 		serializer(ar, arena, messages, end, popped, maxKnownVersion, minKnownCommittedVersion, begin);
150 	}
151 };
152 
153 struct TLogPeekRequest {
154 	Arena arena;
155 	Version begin;
156 	Tag tag;
157 	bool returnIfBlocked;
158 	Optional<std::pair<UID, int>> sequence;
159 	ReplyPromise<TLogPeekReply> reply;
160 
beginTLogPeekRequest161 	TLogPeekRequest( Version begin, Tag tag, bool returnIfBlocked, Optional<std::pair<UID, int>> sequence = Optional<std::pair<UID, int>>() ) : begin(begin), tag(tag), returnIfBlocked(returnIfBlocked), sequence(sequence) {}
TLogPeekRequestTLogPeekRequest162 	TLogPeekRequest() {}
163 
164 	template <class Ar>
serializeTLogPeekRequest165 	void serialize(Ar& ar) {
166 		serializer(ar, arena, begin, tag, returnIfBlocked, sequence, reply);
167 	}
168 };
169 
170 struct TLogPopRequest {
171 	Arena arena;
172 	Version to;
173 	Version durableKnownCommittedVersion;
174 	Tag tag;
175 	ReplyPromise<Void> reply;
176 
TLogPopRequestTLogPopRequest177 	TLogPopRequest( Version to, Version durableKnownCommittedVersion, Tag tag ) : to(to), durableKnownCommittedVersion(durableKnownCommittedVersion), tag(tag) {}
TLogPopRequestTLogPopRequest178 	TLogPopRequest() {}
179 
180 	template <class Ar>
serializeTLogPopRequest181 	void serialize(Ar& ar) {
182 		serializer(ar, arena, to, durableKnownCommittedVersion, tag, reply);
183 	}
184 };
185 
186 struct TagMessagesRef {
187 	Tag tag;
188 	VectorRef<int> messageOffsets;
189 
TagMessagesRefTagMessagesRef190 	TagMessagesRef() {}
TagMessagesRefTagMessagesRef191 	TagMessagesRef(Arena &a, const TagMessagesRef &from) : tag(from.tag), messageOffsets(a, from.messageOffsets) {}
192 
expectedSizeTagMessagesRef193 	size_t expectedSize() const {
194 		return messageOffsets.expectedSize();
195 	}
196 
197 	template <class Ar>
serializeTagMessagesRef198 	void serialize(Ar& ar) {
199 		serializer(ar, tag, messageOffsets);
200 	}
201 };
202 
203 struct TLogCommitRequest {
204 	Arena arena;
205 	Version prevVersion, version, knownCommittedVersion, minKnownCommittedVersion;
206 
207 	StringRef messages;// Each message prefixed by a 4-byte length
208 
209 	ReplyPromise<Version> reply;
210 	Optional<UID> debugID;
211 
TLogCommitRequestTLogCommitRequest212 	TLogCommitRequest() {}
TLogCommitRequestTLogCommitRequest213 	TLogCommitRequest( const Arena& a, Version prevVersion, Version version, Version knownCommittedVersion, Version minKnownCommittedVersion, StringRef messages, Optional<UID> debugID )
214 		: arena(a), prevVersion(prevVersion), version(version), knownCommittedVersion(knownCommittedVersion), minKnownCommittedVersion(minKnownCommittedVersion), messages(messages), debugID(debugID) {}
215 	template <class Ar>
serializeTLogCommitRequest216 	void serialize( Ar& ar ) {
217 		serializer(ar, prevVersion, version, knownCommittedVersion, minKnownCommittedVersion, messages, reply, arena, debugID);
218 	}
219 };
220 
221 struct TLogQueuingMetricsRequest {
222 	ReplyPromise<struct TLogQueuingMetricsReply> reply;
223 
224 	template <class Ar>
serializeTLogQueuingMetricsRequest225 	void serialize(Ar& ar) {
226 		serializer(ar, reply);
227 	}
228 };
229 
230 struct TLogQueuingMetricsReply {
231 	double localTime;
232 	int64_t instanceID;  // changes if bytesDurable and bytesInput reset
233 	int64_t bytesDurable, bytesInput;
234 	StorageBytes storageBytes;
235 	Version v; // committed version
236 
237 	template <class Ar>
serializeTLogQueuingMetricsReply238 	void serialize(Ar& ar) {
239 		serializer(ar, localTime, instanceID, bytesDurable, bytesInput, storageBytes, v);
240 	}
241 };
242 
243 #endif
244