1 
2 /*
3  * MasterProxyInterface.h
4  *
5  * This source file is part of the FoundationDB open source project
6  *
7  * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
8  *
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  *
13  *     http://www.apache.org/licenses/LICENSE-2.0
14  *
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  */
21 
22 #ifndef FDBCLIENT_MASTERPROXYINTERFACE_H
23 #define FDBCLIENT_MASTERPROXYINTERFACE_H
24 #pragma once
25 
26 #include "fdbclient/FDBTypes.h"
27 #include "fdbclient/StorageServerInterface.h"
28 #include "fdbclient/CommitTransaction.h"
29 
30 #include "flow/Stats.h"
31 
32 struct MasterProxyInterface {
33 	enum { LocationAwareLoadBalance = 1 };
34 	enum { AlwaysFresh = 1 };
35 
36 	LocalityData locality;
37 	bool provisional;
38 	RequestStream< struct CommitTransactionRequest > commit;
39 	RequestStream< struct GetReadVersionRequest > getConsistentReadVersion;  // Returns a version which (1) is committed, and (2) is >= the latest version reported committed (by a commit response) when this request was sent
40 															     //   (at some point between when this request is sent and when its response is received, the latest version reported committed)
41 	RequestStream< struct GetKeyServerLocationsRequest > getKeyServersLocations;
42 	RequestStream< struct GetStorageServerRejoinInfoRequest > getStorageServerRejoinInfo;
43 
44 	RequestStream<ReplyPromise<Void>> waitFailure;
45 
46 	RequestStream< struct GetRawCommittedVersionRequest > getRawCommittedVersion;
47 	RequestStream< struct TxnStateRequest >  txnState;
48 
49 	RequestStream< struct GetHealthMetricsRequest > getHealthMetrics;
50 
idMasterProxyInterface51 	UID id() const { return commit.getEndpoint().token; }
toStringMasterProxyInterface52 	std::string toString() const { return id().shortString(); }
53 	bool operator == (MasterProxyInterface const& r) const { return id() == r.id(); }
54 	bool operator != (MasterProxyInterface const& r) const { return id() != r.id(); }
addressMasterProxyInterface55 	NetworkAddress address() const { return commit.getEndpoint().getPrimaryAddress(); }
56 
57 	template <class Archive>
serializeMasterProxyInterface58 	void serialize(Archive& ar) {
59 		serializer(ar, locality, provisional, commit, getConsistentReadVersion, getKeyServersLocations,
60 				   waitFailure, getStorageServerRejoinInfo, getRawCommittedVersion,
61 				   txnState, getHealthMetrics);
62 	}
63 
initEndpointsMasterProxyInterface64 	void initEndpoints() {
65 		getConsistentReadVersion.getEndpoint(TaskProxyGetConsistentReadVersion);
66 		getRawCommittedVersion.getEndpoint(TaskProxyGetRawCommittedVersion);
67 		commit.getEndpoint(TaskProxyCommitDispatcher);
68 		//getKeyServersLocations.getEndpoint(TaskProxyGetKeyServersLocations); //do not increase the priority of these requests, because clients cans bring down the cluster with too many of these messages.
69 	}
70 };
71 
72 struct CommitID {
73 	Version version; 			// returns invalidVersion if transaction conflicts
74 	uint16_t txnBatchId;
75 	Optional<Value> metadataVersion;
76 
77 	template <class Ar>
serializeCommitID78 	void serialize(Ar& ar) {
79 		serializer(ar, version, txnBatchId, metadataVersion);
80 	}
81 
CommitIDCommitID82 	CommitID() : version(invalidVersion), txnBatchId(0) {}
CommitIDCommitID83 	CommitID( Version version, uint16_t txnBatchId, const Optional<Value>& metadataVersion ) : version(version), txnBatchId(txnBatchId), metadataVersion(metadataVersion) {}
84 };
85 
86 struct CommitTransactionRequest : TimedRequest {
87 	enum {
88 		FLAG_IS_LOCK_AWARE = 0x1,
89 		FLAG_FIRST_IN_BATCH = 0x2
90 	};
91 
isLockAwareCommitTransactionRequest92 	bool isLockAware() const { return (flags & FLAG_IS_LOCK_AWARE) != 0; }
firstInBatchCommitTransactionRequest93 	bool firstInBatch() const { return (flags & FLAG_FIRST_IN_BATCH) != 0; }
94 
95 	Arena arena;
96 	CommitTransactionRef transaction;
97 	ReplyPromise<CommitID> reply;
98 	uint32_t flags;
99 	Optional<UID> debugID;
100 
CommitTransactionRequestCommitTransactionRequest101 	CommitTransactionRequest() : flags(0) {}
102 
103 	template <class Ar>
serializeCommitTransactionRequest104 	void serialize(Ar& ar) {
105 		serializer(ar, transaction, reply, arena, flags, debugID);
106 	}
107 };
108 
getBytes(CommitTransactionRequest const & r)109 static inline int getBytes( CommitTransactionRequest const& r ) {
110 	// SOMEDAY: Optimize
111 	//return r.arena.getSize(); // NOT correct because arena can be shared!
112 	int total = sizeof(r);
113 	for(auto m = r.transaction.mutations.begin(); m != r.transaction.mutations.end(); ++m)
114 		total += m->expectedSize() + CLIENT_KNOBS->PROXY_COMMIT_OVERHEAD_BYTES;
115 	for(auto i = r.transaction.read_conflict_ranges.begin(); i != r.transaction.read_conflict_ranges.end(); ++i)
116 		total += i->expectedSize();
117 	for(auto i = r.transaction.write_conflict_ranges.begin(); i != r.transaction.write_conflict_ranges.end(); ++i)
118 		total += i->expectedSize();
119 	return total;
120 }
121 
122 struct GetReadVersionReply {
123 	Version version;
124 	bool locked;
125 	Optional<Value> metadataVersion;
126 
127 	template <class Ar>
serializeGetReadVersionReply128 	void serialize(Ar& ar) {
129 		serializer(ar, version, locked, metadataVersion);
130 	}
131 };
132 
133 struct GetReadVersionRequest : TimedRequest {
134 	enum {
135 		PRIORITY_SYSTEM_IMMEDIATE = 15 << 24,  // Highest possible priority, always executed even if writes are otherwise blocked
136 		PRIORITY_DEFAULT = 8 << 24,
137 		PRIORITY_BATCH = 1 << 24
138 	};
139 	enum {
140 		FLAG_USE_PROVISIONAL_PROXIES = 2,
141 		FLAG_CAUSAL_READ_RISKY = 1,
142 		FLAG_PRIORITY_MASK = PRIORITY_SYSTEM_IMMEDIATE,
143 	};
144 
145 	uint32_t transactionCount;
146 	uint32_t flags;
147 	Optional<UID> debugID;
148 	ReplyPromise<GetReadVersionReply> reply;
149 
GetReadVersionRequestGetReadVersionRequest150 	GetReadVersionRequest() : transactionCount( 1 ), flags( PRIORITY_DEFAULT ) {}
transactionCountGetReadVersionRequest151 	GetReadVersionRequest( uint32_t transactionCount, uint32_t flags, Optional<UID> debugID = Optional<UID>() ) : transactionCount( transactionCount ), flags( flags ), debugID( debugID ) {}
152 
priorityGetReadVersionRequest153 	int priority() const { return flags & FLAG_PRIORITY_MASK; }
154 	bool operator < (GetReadVersionRequest const& rhs) const { return priority() < rhs.priority(); }
155 
156 	template <class Ar>
serializeGetReadVersionRequest157 	void serialize(Ar& ar) {
158 		serializer(ar, transactionCount, flags, debugID, reply);
159 	}
160 };
161 
162 struct GetKeyServerLocationsReply {
163 	Arena arena;
164 	vector<pair<KeyRangeRef, vector<StorageServerInterface>>> results;
165 
166 	template <class Ar>
serializeGetKeyServerLocationsReply167 	void serialize(Ar& ar) {
168 		serializer(ar, results, arena);
169 	}
170 };
171 
172 struct GetKeyServerLocationsRequest {
173 	Arena arena;
174 	KeyRef begin;
175 	Optional<KeyRef> end;
176 	int limit;
177 	bool reverse;
178 	ReplyPromise<GetKeyServerLocationsReply> reply;
179 
GetKeyServerLocationsRequestGetKeyServerLocationsRequest180 	GetKeyServerLocationsRequest() : limit(0), reverse(false) {}
GetKeyServerLocationsRequestGetKeyServerLocationsRequest181 	GetKeyServerLocationsRequest( KeyRef const& begin, Optional<KeyRef> const& end, int limit, bool reverse, Arena const& arena ) : begin( begin ), end( end ), limit( limit ), reverse( reverse ), arena( arena ) {}
182 
183 	template <class Ar>
serializeGetKeyServerLocationsRequest184 	void serialize(Ar& ar) {
185 		serializer(ar, begin, end, limit, reverse, reply, arena);
186 	}
187 };
188 
189 struct GetRawCommittedVersionRequest {
190 	Optional<UID> debugID;
191 	ReplyPromise<GetReadVersionReply> reply;
192 
debugIDGetRawCommittedVersionRequest193 	explicit GetRawCommittedVersionRequest(Optional<UID> const& debugID = Optional<UID>()) : debugID(debugID) {}
194 
195 	template <class Ar>
serializeGetRawCommittedVersionRequest196 	void serialize( Ar& ar ) {
197 		serializer(ar, debugID, reply);
198 	}
199 };
200 
201 struct GetStorageServerRejoinInfoReply {
202 	Version version;
203 	Tag tag;
204 	Optional<Tag> newTag;
205 	bool newLocality;
206 	vector<pair<Version, Tag>> history;
207 
208 	template <class Ar>
serializeGetStorageServerRejoinInfoReply209 	void serialize(Ar& ar) {
210 		serializer(ar, version, tag, newTag, newLocality, history);
211 	}
212 };
213 
214 struct GetStorageServerRejoinInfoRequest {
215 	UID id;
216 	Optional<Value> dcId;
217 	ReplyPromise< GetStorageServerRejoinInfoReply > reply;
218 
GetStorageServerRejoinInfoRequestGetStorageServerRejoinInfoRequest219 	GetStorageServerRejoinInfoRequest() {}
GetStorageServerRejoinInfoRequestGetStorageServerRejoinInfoRequest220 	explicit GetStorageServerRejoinInfoRequest( UID const& id, Optional<Value> const& dcId ) : id(id), dcId(dcId) {}
221 
222 	template <class Ar>
serializeGetStorageServerRejoinInfoRequest223 	void serialize( Ar& ar ) {
224 		serializer(ar, id, dcId, reply);
225 	}
226 };
227 
228 struct TxnStateRequest {
229 	Arena arena;
230 	VectorRef<KeyValueRef> data;
231 	Sequence sequence;
232 	bool last;
233 	ReplyPromise<Void> reply;
234 
235 	template <class Ar>
serializeTxnStateRequest236 	void serialize(Ar& ar) {
237 		serializer(ar, data, sequence, last, reply, arena);
238 	}
239 };
240 
241 struct GetHealthMetricsRequest
242 {
243 	ReplyPromise<struct GetHealthMetricsReply> reply;
244 	bool detailed;
245 
detailedGetHealthMetricsRequest246 	explicit GetHealthMetricsRequest(bool detailed = false) : detailed(detailed) {}
247 
248 	template <class Ar>
serializeGetHealthMetricsRequest249 	void serialize(Ar& ar)
250 	{
251 		serializer(ar, reply, detailed);
252 	}
253 };
254 
255 struct GetHealthMetricsReply
256 {
257 	Standalone<StringRef> serialized;
258 	HealthMetrics healthMetrics;
259 
260 	explicit GetHealthMetricsReply(const HealthMetrics& healthMetrics = HealthMetrics()) :
healthMetricsGetHealthMetricsReply261 		healthMetrics(healthMetrics)
262 	{
263 		update(healthMetrics, true, true);
264 	}
265 
updateGetHealthMetricsReply266 	void update(const HealthMetrics& healthMetrics, bool detailedInput, bool detailedOutput)
267 	{
268 		this->healthMetrics.update(healthMetrics, detailedInput, detailedOutput);
269 		BinaryWriter bw(IncludeVersion());
270 		bw << this->healthMetrics;
271 		serialized = bw.toValue();
272 	}
273 
274 	template <class Ar>
serializeGetHealthMetricsReply275 	void serialize(Ar& ar) {
276 		serializer(ar, serialized);
277 		if (ar.isDeserializing) {
278 			BinaryReader br(serialized, IncludeVersion());
279 			br >> healthMetrics;
280 		}
281 	}
282 };
283 
284 #endif
285