1 /*
2  * MasterProxyServer.actor.cpp
3  *
4  * This source file is part of the FoundationDB open source project
5  *
6  * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *     http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  */
20 
21 #include "flow/ActorCollection.h"
22 #include "fdbclient/MasterProxyInterface.h"
23 #include "fdbclient/NativeAPI.actor.h"
24 #include "fdbserver/MasterInterface.h"
25 #include "fdbserver/WorkerInterface.actor.h"
26 #include "fdbserver/WaitFailure.h"
27 #include "fdbserver/Knobs.h"
28 #include "fdbserver/ServerDBInfo.h"
29 #include "fdbserver/LogSystem.h"
30 #include "fdbserver/LogSystemDiskQueueAdapter.h"
31 #include "fdbserver/IKeyValueStore.h"
32 #include "fdbclient/SystemData.h"
33 #include "fdbrpc/sim_validation.h"
34 #include "fdbclient/Notified.h"
35 #include "fdbclient/KeyRangeMap.h"
36 #include "fdbserver/ConflictSet.h"
37 #include "flow/Stats.h"
38 #include "fdbserver/ApplyMetadataMutation.h"
39 #include "fdbserver/RecoveryState.h"
40 #include "fdbserver/LatencyBandConfig.h"
41 #include "fdbclient/Atomic.h"
42 #include "flow/TDMetric.actor.h"
43 #include "flow/actorcompiler.h"  // This must be the last #include.
44 
45 struct ProxyStats {
46 	CounterCollection cc;
47 	Counter txnStartIn, txnStartOut, txnStartBatch;
48 	Counter txnSystemPriorityStartIn, txnSystemPriorityStartOut;
49 	Counter txnBatchPriorityStartIn, txnBatchPriorityStartOut;
50 	Counter txnDefaultPriorityStartIn, txnDefaultPriorityStartOut;
51 	Counter txnCommitIn, txnCommitVersionAssigned, txnCommitResolving, txnCommitResolved, txnCommitOut, txnCommitOutSuccess;
52 	Counter txnConflicts;
53 	Counter commitBatchIn, commitBatchOut;
54 	Counter mutationBytes;
55 	Counter mutations;
56 	Counter conflictRanges;
57 	Version lastCommitVersionAssigned;
58 
59 	LatencyBands commitLatencyBands;
60 	LatencyBands grvLatencyBands;
61 
62 	Future<Void> logger;
63 
ProxyStatsProxyStats64 	explicit ProxyStats(UID id, Version* pVersion, NotifiedVersion* pCommittedVersion, int64_t *commitBatchesMemBytesCountPtr)
65 	  : cc("ProxyStats", id.toString()),
66 		txnStartIn("TxnStartIn", cc), txnStartOut("TxnStartOut", cc), txnStartBatch("TxnStartBatch", cc), txnSystemPriorityStartIn("TxnSystemPriorityStartIn", cc), txnSystemPriorityStartOut("TxnSystemPriorityStartOut", cc), txnBatchPriorityStartIn("TxnBatchPriorityStartIn", cc), txnBatchPriorityStartOut("TxnBatchPriorityStartOut", cc),
67 		txnDefaultPriorityStartIn("TxnDefaultPriorityStartIn", cc), txnDefaultPriorityStartOut("TxnDefaultPriorityStartOut", cc), txnCommitIn("TxnCommitIn", cc),	txnCommitVersionAssigned("TxnCommitVersionAssigned", cc), txnCommitResolving("TxnCommitResolving", cc), txnCommitResolved("TxnCommitResolved", cc), txnCommitOut("TxnCommitOut", cc),
68 		txnCommitOutSuccess("TxnCommitOutSuccess", cc), txnConflicts("TxnConflicts", cc), commitBatchIn("CommitBatchIn", cc), commitBatchOut("CommitBatchOut", cc), mutationBytes("MutationBytes", cc), mutations("Mutations", cc), conflictRanges("ConflictRanges", cc), lastCommitVersionAssigned(0),
69 		commitLatencyBands("CommitLatencyMetrics", id, SERVER_KNOBS->STORAGE_LOGGING_DELAY), grvLatencyBands("GRVLatencyMetrics", id, SERVER_KNOBS->STORAGE_LOGGING_DELAY)
70 	{
71 		specialCounter(cc, "LastAssignedCommitVersion", [this](){return this->lastCommitVersionAssigned;});
72 		specialCounter(cc, "Version", [pVersion](){return *pVersion; });
73 		specialCounter(cc, "CommittedVersion", [pCommittedVersion](){ return pCommittedVersion->get(); });
74 		specialCounter(cc, "CommitBatchesMemBytesCount", [commitBatchesMemBytesCountPtr]() { return *commitBatchesMemBytesCountPtr; });
75 		logger = traceCounters("ProxyMetrics", id, SERVER_KNOBS->WORKER_LOGGING_INTERVAL, &cc, "ProxyMetrics");
76 	}
77 };
78 
getRate(UID myID,Reference<AsyncVar<ServerDBInfo>> db,int64_t * inTransactionCount,int64_t * inBatchTransactionCount,double * outTransactionRate,double * outBatchTransactionRate,GetHealthMetricsReply * healthMetricsReply,GetHealthMetricsReply * detailedHealthMetricsReply)79 ACTOR Future<Void> getRate(UID myID, Reference<AsyncVar<ServerDBInfo>> db, int64_t* inTransactionCount, int64_t* inBatchTransactionCount, double* outTransactionRate,
80 						   double* outBatchTransactionRate, GetHealthMetricsReply* healthMetricsReply, GetHealthMetricsReply* detailedHealthMetricsReply) {
81 	state Future<Void> nextRequestTimer = Never();
82 	state Future<Void> leaseTimeout = Never();
83 	state Future<GetRateInfoReply> reply = Never();
84 	state double lastDetailedReply = 0.0; // request detailed metrics immediately
85 	state bool expectingDetailedReply = false;
86 	state int64_t lastTC = 0;
87 
88 	if (db->get().ratekeeper.present()) nextRequestTimer = Void();
89 	loop choose {
90 		when ( wait( db->onChange() ) ) {
91 			if ( db->get().ratekeeper.present() ) {
92 				TraceEvent("Proxy_RatekeeperChanged", myID)
93 				.detail("RKID", db->get().ratekeeper.get().id());
94 				nextRequestTimer = Void();  // trigger GetRate request
95 			} else {
96 				TraceEvent("Proxy_RatekeeperDied", myID);
97 				nextRequestTimer = Never();
98 				reply = Never();
99 			}
100 		}
101 		when ( wait( nextRequestTimer ) ) {
102 			nextRequestTimer = Never();
103 			bool detailed = now() - lastDetailedReply > SERVER_KNOBS->DETAILED_METRIC_UPDATE_RATE;
104 			reply = brokenPromiseToNever(db->get().ratekeeper.get().getRateInfo.getReply(GetRateInfoRequest(myID, *inTransactionCount, *inBatchTransactionCount, detailed)));
105 			expectingDetailedReply = detailed;
106 		}
107 		when ( GetRateInfoReply rep = wait(reply) ) {
108 			reply = Never();
109 			*outTransactionRate = rep.transactionRate;
110 			*outBatchTransactionRate = rep.batchTransactionRate;
111 			//TraceEvent("MasterProxyRate", myID).detail("Rate", rep.transactionRate).detail("BatchRate", rep.batchTransactionRate).detail("Lease", rep.leaseDuration).detail("ReleasedTransactions", *inTransactionCount - lastTC);
112 			lastTC = *inTransactionCount;
113 			leaseTimeout = delay(rep.leaseDuration);
114 			nextRequestTimer = delayJittered(rep.leaseDuration / 2);
115 			healthMetricsReply->update(rep.healthMetrics, expectingDetailedReply, true);
116 			if (expectingDetailedReply) {
117 				detailedHealthMetricsReply->update(rep.healthMetrics, true, true);
118 				lastDetailedReply = now();
119 			}
120 		}
121 		when ( wait( leaseTimeout ) ) {
122 			*outTransactionRate = 0;
123 			*outBatchTransactionRate = 0;
124 			//TraceEvent("MasterProxyRate", myID).detail("Rate", 0).detail("BatchRate", 0).detail("Lease", "Expired");
125 			leaseTimeout = Never();
126 		}
127 	}
128 }
129 
queueTransactionStartRequests(std::priority_queue<std::pair<GetReadVersionRequest,int64_t>,std::vector<std::pair<GetReadVersionRequest,int64_t>>> * transactionQueue,FutureStream<GetReadVersionRequest> readVersionRequests,PromiseStream<Void> GRVTimer,double * lastGRVTime,double * GRVBatchTime,FutureStream<double> replyTimes,ProxyStats * stats)130 ACTOR Future<Void> queueTransactionStartRequests(
131 	std::priority_queue< std::pair<GetReadVersionRequest, int64_t>, std::vector< std::pair<GetReadVersionRequest, int64_t> > > *transactionQueue,
132 	FutureStream<GetReadVersionRequest> readVersionRequests,
133 	PromiseStream<Void> GRVTimer, double *lastGRVTime,
134 	double *GRVBatchTime, FutureStream<double> replyTimes,
135 	ProxyStats* stats)
136 {
137 	state int64_t counter = 0;
138 	loop choose{
139 		when(GetReadVersionRequest req = waitNext(readVersionRequests)) {
140 			if (req.debugID.present())
141 				g_traceBatch.addEvent("TransactionDebug", req.debugID.get().first(), "MasterProxyServer.queueTransactionStartRequests.Before");
142 
143 			stats->txnStartIn += req.transactionCount;
144 			if (req.priority() >= GetReadVersionRequest::PRIORITY_SYSTEM_IMMEDIATE)
145 				stats->txnSystemPriorityStartIn += req.transactionCount;
146 			else if (req.priority() >= GetReadVersionRequest::PRIORITY_DEFAULT)
147 				stats->txnDefaultPriorityStartIn += req.transactionCount;
148 			else
149 				stats->txnBatchPriorityStartIn += req.transactionCount;
150 
151 			if (transactionQueue->empty()) {
152 				if (now() - *lastGRVTime > *GRVBatchTime)
153 					*lastGRVTime = now() - *GRVBatchTime;
154 
155 				forwardPromise(GRVTimer, delayJittered(*GRVBatchTime - (now() - *lastGRVTime), TaskProxyGRVTimer));
156 			}
157 
158 			transactionQueue->push(std::make_pair(req, counter--));
159 		}
160 		// dynamic batching monitors reply latencies
161 		when(double reply_latency = waitNext(replyTimes)) {
162 			double target_latency = reply_latency * SERVER_KNOBS->START_TRANSACTION_BATCH_INTERVAL_LATENCY_FRACTION;
163 			*GRVBatchTime =
164 				std::max(SERVER_KNOBS->START_TRANSACTION_BATCH_INTERVAL_MIN,
165 					std::min(SERVER_KNOBS->START_TRANSACTION_BATCH_INTERVAL_MAX,
166 						target_latency * SERVER_KNOBS->START_TRANSACTION_BATCH_INTERVAL_SMOOTHER_ALPHA + *GRVBatchTime * (1-SERVER_KNOBS->START_TRANSACTION_BATCH_INTERVAL_SMOOTHER_ALPHA)));
167 		}
168 	}
169 }
170 
discardCommit(UID id,Future<LogSystemDiskQueueAdapter::CommitMessage> fcm,Future<Void> dummyCommitState)171 ACTOR void discardCommit(UID id, Future<LogSystemDiskQueueAdapter::CommitMessage> fcm, Future<Void> dummyCommitState) {
172 	ASSERT(!dummyCommitState.isReady());
173 	LogSystemDiskQueueAdapter::CommitMessage cm = wait(fcm);
174 	TraceEvent("Discarding", id).detail("Count", cm.messages.size());
175 	cm.acknowledge.send(Void());
176 	ASSERT(dummyCommitState.isReady());
177 }
178 
// Record type of the "SingleKeyMutation" event metric
// (ProxyCommitData::singleKeyMutationEvent is an EventMetricHandle of this type).
// NOTE(review): `DESCR` presumably makes the field list describable to the
// metrics framework, so field names/order likely form the metric's schema;
// comments are kept outside the struct body for that reason — confirm.
// Fields (meanings inferred from names only — TODO confirm against the writer):
//   shardBegin / shardEnd : presumably the bounds of the shard a mutation hit
//   tag1 / tag2 / tag3    : presumably up to three destination tags
DESCR struct SingleKeyMutation {
	Standalone<StringRef> shardBegin;
	Standalone<StringRef> shardEnd;
	int64_t tag1;
	int64_t tag2;
	int64_t tag3;
};
186 
187 struct ProxyCommitData {
188 	UID dbgid;
189 	int64_t commitBatchesMemBytesCount;
190 	ProxyStats stats;
191 	MasterInterface master;
192 	vector<ResolverInterface> resolvers;
193 	LogSystemDiskQueueAdapter* logAdapter;
194 	Reference<ILogSystem> logSystem;
195 	IKeyValueStore* txnStateStore;
196 	NotifiedVersion committedVersion; // Provided that this recovery has succeeded or will succeed, this version is fully committed (durable)
197 	Version minKnownCommittedVersion; // No version smaller than this one will be used as the known committed version during recovery
198 	Version version;  // The version at which txnStateStore is up to date
199 	Promise<Void> validState;  // Set once txnStateStore and version are valid
200 	double lastVersionTime;
201 	KeyRangeMap<std::set<Key>> vecBackupKeys;
202 	uint64_t commitVersionRequestNumber;
203 	uint64_t mostRecentProcessedRequestNumber;
204 	KeyRangeMap<Deque<std::pair<Version,int>>> keyResolvers;
205 	KeyRangeMap<ServerCacheInfo> keyInfo;
206 	std::map<Key, applyMutationsData> uid_applyMutationsData;
207 	bool firstProxy;
208 	double lastCoalesceTime;
209 	bool locked;
210 	Optional<Value> metadataVersion;
211 	double commitBatchInterval;
212 
213 	int64_t localCommitBatchesStarted;
214 	NotifiedVersion latestLocalCommitBatchResolving;
215 	NotifiedVersion latestLocalCommitBatchLogging;
216 
217 	PromiseStream<Void> commitBatchStartNotifications;
218 	PromiseStream<Future<GetCommitVersionReply>> commitBatchVersions;  // 1:1 with commitBatchStartNotifications
219 	RequestStream<GetReadVersionRequest> getConsistentReadVersion;
220 	RequestStream<CommitTransactionRequest> commit;
221 	Database cx;
222 	EventMetricHandle<SingleKeyMutation> singleKeyMutationEvent;
223 
224 	std::map<UID, Reference<StorageInfo>> storageCache;
225 	std::map<Tag, Version> tag_popped;
226 	Deque<std::pair<Version, Version>> txsPopVersions;
227 	Version lastTxsPop;
228 	bool popRemoteTxs;
229 
230 	Optional<LatencyBandConfig> latencyBandConfig;
231 
232 	//The tag related to a storage server rarely change, so we keep a vector of tags for each key range to be slightly more CPU efficient.
233 	//When a tag related to a storage server does change, we empty out all of these vectors to signify they must be repopulated.
234 	//We do not repopulate them immediately to avoid a slow task.
tagsForKeyProxyCommitData235 	const vector<Tag>& tagsForKey(StringRef key) {
236 		auto& tags = keyInfo[key].tags;
237 		if(!tags.size()) {
238 			auto& r = keyInfo.rangeContaining(key).value();
239 			for(auto info : r.src_info) {
240 				r.tags.push_back(info->tag);
241 			}
242 			for(auto info : r.dest_info) {
243 				r.tags.push_back(info->tag);
244 			}
245 			uniquify(r.tags);
246 			return r.tags;
247 		}
248 		return tags;
249 	}
250 
ProxyCommitDataProxyCommitData251 	ProxyCommitData(UID dbgid, MasterInterface master, RequestStream<GetReadVersionRequest> getConsistentReadVersion, Version recoveryTransactionVersion, RequestStream<CommitTransactionRequest> commit, Reference<AsyncVar<ServerDBInfo>> db, bool firstProxy)
252 		: dbgid(dbgid), stats(dbgid, &version, &committedVersion, &commitBatchesMemBytesCount), master(master),
253 			logAdapter(NULL), txnStateStore(NULL), popRemoteTxs(false),
254 			committedVersion(recoveryTransactionVersion), version(0), minKnownCommittedVersion(0),
255 			lastVersionTime(0), commitVersionRequestNumber(1), mostRecentProcessedRequestNumber(0),
256 			getConsistentReadVersion(getConsistentReadVersion), commit(commit), lastCoalesceTime(0),
257 			localCommitBatchesStarted(0), locked(false), commitBatchInterval(SERVER_KNOBS->COMMIT_TRANSACTION_BATCH_INTERVAL_MIN),
258 			firstProxy(firstProxy), cx(openDBOnServer(db, TaskDefaultEndpoint, true, true)),
259 			singleKeyMutationEvent(LiteralStringRef("SingleKeyMutation")), commitBatchesMemBytesCount(0), lastTxsPop(0)
260 	{}
261 };
262 
// Splits the transactions of one commit batch into per-resolver
// ResolveTransactionBatchRequests.
//
// requests[i] goes to self->resolvers[i]. A transaction is forwarded to
// resolver i only with the conflict ranges (and, for resolver 0, the metadata
// mutations) that resolver i needs. transactionResolverMap[t] lists the
// resolvers that received batch transaction t, in increasing index order.
struct ResolutionRequestBuilder {
	ProxyCommitData* self;
	vector<ResolveTransactionBatchRequest> requests;
	vector<vector<int>> transactionResolverMap;
	// Per resolver: the entry created for the transaction currently being added, or NULL.
	vector<CommitTransactionRef*> outTr;

	// Creates one request per resolver, all stamped with the same versions.
	ResolutionRequestBuilder( ProxyCommitData* self, Version version, Version prevVersion, Version lastReceivedVersion) : self(self), requests(self->resolvers.size()) {
		for(auto& req : requests) {
			req.prevVersion = prevVersion;
			req.version = version;
			req.lastReceivedVersion = lastReceivedVersion;
		}
	}

	// Returns the entry in requests[resolver] for the transaction currently
	// being added. On first use for this transaction, the entry is appended
	// (allocated in that request's arena) and given `read_snapshot`.
	// The cached pointer stays valid because each resolver's vector grows at
	// most once per addTransaction, and outTr is reset before the next
	// transaction can grow it again.
	CommitTransactionRef& getOutTransaction(int resolver, Version read_snapshot) {
		CommitTransactionRef *& out = outTr[resolver];
		if (!out) {
			ResolveTransactionBatchRequest& request = requests[resolver];
			request.transactions.resize(request.arena, request.transactions.size() + 1);
			out = &request.transactions.back();
			out->read_snapshot = read_snapshot;
		}
		return *out;
	}

	// Routes transaction number `transactionNumberInBatch` of the batch to the
	// resolvers that need it. NOTE: mutates `trIn`:
	//  - Versionstamped mutations are rewritten in place.
	//  - A versionstamped key also adds a write conflict range (allocated in
	//    requests[0].arena).
	void addTransaction(CommitTransactionRef& trIn, int transactionNumberInBatch) {
		// SOMEDAY: There are a couple of unnecessary O( # resolvers ) steps here
		outTr.assign(requests.size(), NULL);
		// NOTE(review): presumably the bound exists because the batch index must fit the versionstamp's 2-byte slot — confirm.
		ASSERT( transactionNumberInBatch >= 0 && transactionNumberInBatch < 32768 );

		bool isTXNStateTransaction = false;
		for (auto & m : trIn.mutations) {
			if (m.type == MutationRef::SetVersionstampedKey) {
				transformVersionstampMutation( m, &MutationRef::param1, requests[0].version, transactionNumberInBatch );
				// The final key is only known now, so its conflict range is added here and routed by the write loop below.
				trIn.write_conflict_ranges.push_back( requests[0].arena, singleKeyRange( m.param1, requests[0].arena ) );
			} else if (m.type == MutationRef::SetVersionstampedValue) {
				transformVersionstampMutation( m, &MutationRef::param2, requests[0].version, transactionNumberInBatch );
			}
			// Metadata mutations travel with resolver 0's copy of the transaction.
			if (isMetadataMutation(m)) {
				isTXNStateTransaction = true;
				getOutTransaction(0, trIn.read_snapshot).mutations.push_back(requests[0].arena, m);
			}
		}
		// Read conflicts go to every resolver that owned any part of the range
		// since read_snapshot. Walk each range's history newest-first, up to and
		// including the first assignment older than the snapshot.
		for(auto& r : trIn.read_conflict_ranges) {
			auto ranges = self->keyResolvers.intersectingRanges( r );
			std::set<int> resolvers;
			for(auto &ir : ranges) {
				auto& version_resolver = ir.value();
				for(int i = version_resolver.size()-1; i >= 0; i--) {
					resolvers.insert(version_resolver[i].second);
					if( version_resolver[i].first < trIn.read_snapshot )
						break;
				}
			}
			ASSERT(resolvers.size());
			for(int resolver : resolvers)
				getOutTransaction( resolver, trIn.read_snapshot ).read_conflict_ranges.push_back( requests[resolver].arena, r );
		}
		// Write conflicts only go to the resolver currently assigned to each intersecting range.
		for(auto& r : trIn.write_conflict_ranges) {
			auto ranges = self->keyResolvers.intersectingRanges( r );
			std::set<int> resolvers;
			for(auto &ir : ranges)
				resolvers.insert(ir.value().back().second);
			ASSERT(resolvers.size());
			for(int resolver : resolvers)
				getOutTransaction( resolver, trIn.read_snapshot ).write_conflict_ranges.push_back( requests[resolver].arena, r );
		}
		// A transaction touching txnState is sent to every resolver (creating
		// empty entries where needed). Each resolver records its position within
		// that resolver's request.
		if (isTXNStateTransaction)
			for (int r = 0; r<requests.size(); r++) {
				int transactionNumberInRequest = &getOutTransaction(r, trIn.read_snapshot) - requests[r].transactions.begin();
				requests[r].txnStateTransactions.push_back(requests[r].arena, transactionNumberInRequest);
			}

		// Remember which resolvers saw this transaction so their verdicts can be combined later.
		vector<int> resolversUsed;
		for (int r = 0; r<outTr.size(); r++)
			if (outTr[r])
				resolversUsed.push_back(r);
		transactionResolverMap.push_back(std::move(resolversUsed));
	}
};
343 
commitBatcher(ProxyCommitData * commitData,PromiseStream<std::pair<std::vector<CommitTransactionRequest>,int>> out,FutureStream<CommitTransactionRequest> in,int desiredBytes,int64_t memBytesLimit)344 ACTOR Future<Void> commitBatcher(ProxyCommitData *commitData, PromiseStream<std::pair<std::vector<CommitTransactionRequest>, int> > out, FutureStream<CommitTransactionRequest> in, int desiredBytes, int64_t memBytesLimit) {
345 	wait(delayJittered(commitData->commitBatchInterval, TaskProxyCommitBatcher));
346 
347 	state double lastBatch = 0;
348 
349 	loop{
350 		state Future<Void> timeout;
351 		state std::vector<CommitTransactionRequest> batch;
352 		state int batchBytes = 0;
353 
354 		if(SERVER_KNOBS->MAX_COMMIT_BATCH_INTERVAL <= 0) {
355 			timeout = Never();
356 		}
357 		else {
358 			timeout = delayJittered(SERVER_KNOBS->MAX_COMMIT_BATCH_INTERVAL, TaskProxyCommitBatcher);
359 		}
360 
361 		while(!timeout.isReady() && !(batch.size() == SERVER_KNOBS->COMMIT_TRANSACTION_BATCH_COUNT_MAX || batchBytes >= desiredBytes)) {
362 			choose{
363 				when(CommitTransactionRequest req = waitNext(in)) {
364 					int bytes = getBytes(req);
365 
366 					// Drop requests if memory is under severe pressure
367 					if(commitData->commitBatchesMemBytesCount + bytes > memBytesLimit) {
368 						req.reply.sendError(proxy_memory_limit_exceeded());
369 						TraceEvent(SevWarnAlways, "ProxyCommitBatchMemoryThresholdExceeded").suppressFor(60).detail("MemBytesCount", commitData->commitBatchesMemBytesCount).detail("MemLimit", memBytesLimit);
370 						continue;
371 					}
372 
373 					++commitData->stats.txnCommitIn;
374 
375 					if(req.debugID.present()) {
376 						g_traceBatch.addEvent("CommitDebug", req.debugID.get().first(), "MasterProxyServer.batcher");
377 					}
378 
379 					if(!batch.size()) {
380 						commitData->commitBatchStartNotifications.send(Void());
381 						if(now() - lastBatch > commitData->commitBatchInterval) {
382 							timeout = delayJittered(SERVER_KNOBS->COMMIT_TRANSACTION_BATCH_INTERVAL_FROM_IDLE, TaskProxyCommitBatcher);
383 						}
384 						else {
385 							timeout = delayJittered(commitData->commitBatchInterval - (now() - lastBatch), TaskProxyCommitBatcher);
386 						}
387 					}
388 
389 					if((batchBytes + bytes > CLIENT_KNOBS->TRANSACTION_SIZE_LIMIT || req.firstInBatch()) && batch.size()) {
390 						out.send({ batch, batchBytes });
391 						lastBatch = now();
392 						commitData->commitBatchStartNotifications.send(Void());
393 						timeout = delayJittered(commitData->commitBatchInterval, TaskProxyCommitBatcher);
394 						batch = std::vector<CommitTransactionRequest>();
395 						batchBytes = 0;
396 					}
397 
398 					batch.push_back(req);
399 					batchBytes += bytes;
400 					commitData->commitBatchesMemBytesCount += bytes;
401 				}
402 				when(wait(timeout)) {}
403 			}
404 		}
405 		out.send({ std::move(batch), batchBytes });
406 		lastBatch = now();
407 	}
408 }
409 
commitBatch(ProxyCommitData * self,vector<CommitTransactionRequest> trs,int currentBatchMemBytesCount)410 ACTOR Future<Void> commitBatch(
411 	ProxyCommitData* self,
412 	vector<CommitTransactionRequest> trs,
413 	int currentBatchMemBytesCount)
414 {
415 	state int64_t localBatchNumber = ++self->localCommitBatchesStarted;
416 	state LogPushData toCommit(self->logSystem);
417 	state double t1 = now();
418 	state Optional<UID> debugID;
419 	state bool forceRecovery = false;
420 
421 	ASSERT(SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS <= SERVER_KNOBS->MAX_VERSIONS_IN_FLIGHT);  // since we are using just the former to limit the number of versions actually in flight!
422 
423 	// Active load balancing runs at a very high priority (to obtain accurate estimate of memory used by commit batches) so we need to downgrade here
424 	wait(delay(0, TaskProxyCommit));
425 
426 	self->lastVersionTime = t1;
427 
428 	++self->stats.commitBatchIn;
429 
430 	for (int t = 0; t<trs.size(); t++) {
431 		if (trs[t].debugID.present()) {
432 			if (!debugID.present())
433 				debugID = g_nondeterministic_random->randomUniqueID();
434 			g_traceBatch.addAttach("CommitAttachID", trs[t].debugID.get().first(), debugID.get().first());
435 		}
436 	}
437 
438 	if(localBatchNumber == 2 && !debugID.present() && self->firstProxy && !g_network->isSimulated()) {
439 		debugID = g_random->randomUniqueID();
440 		TraceEvent("SecondCommitBatch", self->dbgid).detail("DebugID", debugID.get());
441 	}
442 
443 	if (debugID.present())
444 		g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "MasterProxyServer.commitBatch.Before");
445 
446 	if (trs.empty()) {
447 		// We are sending an empty batch, so we have to trigger the version fetcher
448 		self->commitBatchStartNotifications.send(Void());
449 	}
450 
451 	/////// Phase 1: Pre-resolution processing (CPU bound except waiting for a version # which is separately pipelined and *should* be available by now (unless empty commit); ordered; currently atomic but could yield)
452 	TEST(self->latestLocalCommitBatchResolving.get() < localBatchNumber-1); // Queuing pre-resolution commit processing
453 	wait(self->latestLocalCommitBatchResolving.whenAtLeast(localBatchNumber-1));
454 	wait(yield());
455 
456 	if (debugID.present())
457 		g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "MasterProxyServer.commitBatch.GettingCommitVersion");
458 
459 	Future<GetCommitVersionReply> fVersionReply = waitNext(self->commitBatchVersions.getFuture());
460 	GetCommitVersionReply versionReply = wait(fVersionReply);
461 	self->mostRecentProcessedRequestNumber = versionReply.requestNum;
462 
463 	self->stats.txnCommitVersionAssigned += trs.size();
464 	self->stats.lastCommitVersionAssigned = versionReply.version;
465 
466 	state Version commitVersion = versionReply.version;
467 	state Version prevVersion = versionReply.prevVersion;
468 
469 	for(auto it : versionReply.resolverChanges) {
470 		auto rs = self->keyResolvers.modify(it.range);
471 		for(auto r = rs.begin(); r != rs.end(); ++r)
472 			r->value().push_back(std::make_pair(versionReply.resolverChangesVersion,it.dest));
473 	}
474 
475 	//TraceEvent("ProxyGotVer", self->dbgid).detail("Commit", commitVersion).detail("Prev", prevVersion);
476 
477 	if (debugID.present())
478 		g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "MasterProxyServer.commitBatch.GotCommitVersion");
479 
480 	ResolutionRequestBuilder requests( self, commitVersion, prevVersion, self->version );
481 	int conflictRangeCount = 0;
482 	state int64_t maxTransactionBytes = 0;
483 	for (int t = 0; t<trs.size(); t++) {
484 		requests.addTransaction(trs[t].transaction, t);
485 		conflictRangeCount += trs[t].transaction.read_conflict_ranges.size() + trs[t].transaction.write_conflict_ranges.size();
486 		//TraceEvent("MPTransactionDump", self->dbgid).detail("Snapshot", trs[t].transaction.read_snapshot);
487 		//for(auto& m : trs[t].transaction.mutations)
488 		maxTransactionBytes = std::max<int64_t>(maxTransactionBytes, trs[t].transaction.expectedSize());
489 		//	TraceEvent("MPTransactionsDump", self->dbgid).detail("Mutation", m.toString());
490 	}
491 	self->stats.conflictRanges += conflictRangeCount;
492 
493 	for (int r = 1; r<self->resolvers.size(); r++)
494 		ASSERT(requests.requests[r].txnStateTransactions.size() == requests.requests[0].txnStateTransactions.size());
495 
496 	// Sending these requests is the fuzzy border between phase 1 and phase 2; it could conceivably overlap with resolution processing but is still using CPU
497 	self->stats.txnCommitResolving += trs.size();
498 	vector< Future<ResolveTransactionBatchReply> > replies;
499 	for (int r = 0; r<self->resolvers.size(); r++) {
500 		requests.requests[r].debugID = debugID;
501 		replies.push_back(brokenPromiseToNever(self->resolvers[r].resolve.getReply(requests.requests[r], TaskProxyResolverReply)));
502 	}
503 
504 	state vector<vector<int>> transactionResolverMap = std::move( requests.transactionResolverMap );
505 
506 	ASSERT(self->latestLocalCommitBatchResolving.get() == localBatchNumber-1);
507 	self->latestLocalCommitBatchResolving.set(localBatchNumber);
508 
509 	/////// Phase 2: Resolution (waiting on the network; pipelined)
510 	state vector<ResolveTransactionBatchReply> resolution = wait( getAll(replies) );
511 
512 	if (debugID.present())
513 		g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "MasterProxyServer.commitBatch.AfterResolution");
514 
515 	////// Phase 3: Post-resolution processing (CPU bound except for very rare situations; ordered; currently atomic but doesn't need to be)
516 	TEST(self->latestLocalCommitBatchLogging.get() < localBatchNumber-1); // Queuing post-resolution commit processing
517 	wait(self->latestLocalCommitBatchLogging.whenAtLeast(localBatchNumber-1));
518 	wait(yield());
519 
520 	self->stats.txnCommitResolved += trs.size();
521 
522 	if (debugID.present())
523 		g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "MasterProxyServer.commitBatch.ProcessingMutations");
524 
525 	state Arena arena;
526 	state bool isMyFirstBatch = !self->version;
527 	state Optional<Value> oldCoordinators = self->txnStateStore->readValue(coordinatorsKey).get();
528 
529 	//TraceEvent("ResolutionResult", self->dbgid).detail("Sequence", sequence).detail("Version", commitVersion).detail("StateMutationProxies", resolution[0].stateMutations.size()).detail("WaitForResolution", now()-t1).detail("R0Committed", resolution[0].committed.size())
530 	//	.detail("Transactions", trs.size());
531 
532 	for(int r=1; r<resolution.size(); r++) {
533 		ASSERT( resolution[r].stateMutations.size() == resolution[0].stateMutations.size() );
534 		for(int s=0; s<resolution[r].stateMutations.size(); s++)
535 			ASSERT( resolution[r].stateMutations[s].size() == resolution[0].stateMutations[s].size() );
536 	}
537 
538 	// Compute and apply "metadata" effects of each other proxy's most recent batch
539 	bool initialState = isMyFirstBatch;
540 	state bool firstStateMutations = isMyFirstBatch;
541 	state vector< std::pair<Future<LogSystemDiskQueueAdapter::CommitMessage>, Future<Void>> > storeCommits;
542 	for (int versionIndex = 0; versionIndex < resolution[0].stateMutations.size(); versionIndex++) {
543 		// self->logAdapter->setNextVersion( ??? );  << Ideally we would be telling the log adapter that the pushes in this commit will be in the version at which these state mutations were committed by another proxy, but at present we don't have that information here.  So the disk queue may be unnecessarily conservative about popping.
544 
545 		for (int transactionIndex = 0; transactionIndex < resolution[0].stateMutations[versionIndex].size() && !forceRecovery; transactionIndex++) {
546 			bool committed = true;
547 			for (int resolver = 0; resolver < resolution.size(); resolver++)
548 				committed = committed && resolution[resolver].stateMutations[versionIndex][transactionIndex].committed;
549 			if (committed)
550 				applyMetadataMutations( self->dbgid, arena, resolution[0].stateMutations[versionIndex][transactionIndex].mutations, self->txnStateStore, NULL, &forceRecovery, self->logSystem, 0, &self->vecBackupKeys, &self->keyInfo, self->firstProxy ? &self->uid_applyMutationsData : NULL, self->commit, self->cx, &self->committedVersion, &self->storageCache, &self->tag_popped);
551 
552 			if( resolution[0].stateMutations[versionIndex][transactionIndex].mutations.size() && firstStateMutations ) {
553 				ASSERT(committed);
554 				firstStateMutations = false;
555 				forceRecovery = false;
556 			}
557 			//TraceEvent("MetadataTransaction", self->dbgid).detail("Committed", committed).detail("Mutations", resolution[0].stateMutations[versionIndex][transactionIndex].second.size()).detail("R1Mutations", resolution.back().stateMutations[versionIndex][transactionIndex].second.size());
558 		}
559 		//TraceEvent("MetadataBatch", self->dbgid).detail("Transactions", resolution[0].stateMutations[versionIndex].size());
560 
561 		// These changes to txnStateStore will be committed by the other proxy, so we simply discard the commit message
562 		auto fcm = self->logAdapter->getCommitMessage();
563 		storeCommits.push_back(std::make_pair(fcm, self->txnStateStore->commit()));
564 		//discardCommit( dbgid, fcm, txnStateStore->commit() );
565 
566 		if (initialState) {
567 			//TraceEvent("ResyncLog", dbgid);
568 			initialState = false;
569 			forceRecovery = false;
570 			self->txnStateStore->resyncLog();
571 
572 			for (auto &p : storeCommits) {
573 				ASSERT(!p.second.isReady());
574 				p.first.get().acknowledge.send(Void());
575 				ASSERT(p.second.isReady());
576 			}
577 			storeCommits.clear();
578 		}
579 	}
580 
581 	// Determine which transactions actually committed (conservatively) by combining results from the resolvers
582 	state vector<uint8_t> committed(trs.size());
583 	ASSERT(transactionResolverMap.size() == committed.size());
584 	vector<int> nextTr(resolution.size());
585 	for (int t = 0; t<trs.size(); t++) {
586 		uint8_t commit = ConflictBatch::TransactionCommitted;
587 		for (int r : transactionResolverMap[t])
588 		{
589 			commit = std::min(resolution[r].committed[nextTr[r]++], commit);
590 		}
591 		committed[t] = commit;
592 	}
593 	for (int r = 0; r<resolution.size(); r++)
594 		ASSERT(nextTr[r] == resolution[r].committed.size());
595 
596 	self->logAdapter->setNextVersion(commitVersion);
597 
598 	state Optional<Key> lockedKey = self->txnStateStore->readValue(databaseLockedKey).get();
599 	state bool locked = lockedKey.present() && lockedKey.get().size();
600 
601 	state Optional<Key> mustContainSystemKey = self->txnStateStore->readValue(mustContainSystemMutationsKey).get();
602 	if(mustContainSystemKey.present() && mustContainSystemKey.get().size()) {
603 		for (int t = 0; t<trs.size(); t++) {
604 			if( committed[t] == ConflictBatch::TransactionCommitted ) {
605 				bool foundSystem = false;
606 				for(auto& m : trs[t].transaction.mutations) {
607 					if( ( m.type == MutationRef::ClearRange ? m.param2 : m.param1 ) >= nonMetadataSystemKeys.end) {
608 						foundSystem = true;
609 						break;
610 					}
611 				}
612 				if(!foundSystem) {
613 					committed[t] = ConflictBatch::TransactionConflict;
614 				}
615 			}
616 		}
617 	}
618 
619 	if(forceRecovery) {
620 		wait( Future<Void>(Never()) );
621 	}
622 
623 	// This first pass through committed transactions deals with "metadata" effects (modifications of txnStateStore, changes to storage servers' responsibilities)
624 	int t;
625 	state int commitCount = 0;
626 	for (t = 0; t < trs.size() && !forceRecovery; t++)
627 	{
628 		if (committed[t] == ConflictBatch::TransactionCommitted && (!locked || trs[t].isLockAware())) {
629 			commitCount++;
630 			applyMetadataMutations(self->dbgid, arena, trs[t].transaction.mutations, self->txnStateStore, &toCommit, &forceRecovery, self->logSystem, commitVersion+1, &self->vecBackupKeys, &self->keyInfo, self->firstProxy ? &self->uid_applyMutationsData : NULL, self->commit, self->cx, &self->committedVersion, &self->storageCache, &self->tag_popped);
631 		}
632 		if(firstStateMutations) {
633 			ASSERT(committed[t] == ConflictBatch::TransactionCommitted);
634 			firstStateMutations = false;
635 			forceRecovery = false;
636 		}
637 	}
638 	if (forceRecovery) {
639 		for (; t<trs.size(); t++)
640 			committed[t] = ConflictBatch::TransactionConflict;
641 		TraceEvent(SevWarn, "RestartingTxnSubsystem", self->dbgid).detail("Stage", "AwaitCommit");
642 	}
643 
644 	lockedKey = self->txnStateStore->readValue(databaseLockedKey).get();
645 	state bool lockedAfter = lockedKey.present() && lockedKey.get().size();
646 
647 	state Optional<Value> metadataVersionAfter = self->txnStateStore->readValue(metadataVersionKey).get();
648 
649 	auto fcm = self->logAdapter->getCommitMessage();
650 	storeCommits.push_back(std::make_pair(fcm, self->txnStateStore->commit()));
651 	self->version = commitVersion;
652 	if (!self->validState.isSet()) self->validState.send(Void());
653 	ASSERT(commitVersion);
654 
655 	if (!isMyFirstBatch && self->txnStateStore->readValue( coordinatorsKey ).get().get() != oldCoordinators.get()) {
656 		wait( brokenPromiseToNever( self->master.changeCoordinators.getReply( ChangeCoordinatorsRequest( self->txnStateStore->readValue( coordinatorsKey ).get().get() ) ) ) );
657 		ASSERT(false);   // ChangeCoordinatorsRequest should always throw
658 	}
659 
660 	// This second pass through committed transactions assigns the actual mutations to the appropriate storage servers' tags
661 	state int mutationCount = 0;
662 	state int mutationBytes = 0;
663 
664 	state std::map<Key, MutationListRef> logRangeMutations;
665 	state Arena logRangeMutationsArena;
666 	state uint32_t v = commitVersion / CLIENT_KNOBS->LOG_RANGE_BLOCK_SIZE;
667 	state int transactionNum = 0;
668 	state int yieldBytes = 0;
669 
670 	for (; transactionNum<trs.size(); transactionNum++) {
671 		if (committed[transactionNum] == ConflictBatch::TransactionCommitted && (!locked || trs[transactionNum].isLockAware())) {
672 			state int mutationNum = 0;
673 			state VectorRef<MutationRef>* pMutations = &trs[transactionNum].transaction.mutations;
674 			for (; mutationNum < pMutations->size(); mutationNum++) {
675 				if(yieldBytes > SERVER_KNOBS->DESIRED_TOTAL_BYTES) {
676 					yieldBytes = 0;
677 					wait(yield());
678 				}
679 
680 				auto& m = (*pMutations)[mutationNum];
681 				mutationCount++;
682 				mutationBytes += m.expectedSize();
683 				yieldBytes += m.expectedSize();
684 				// Determine the set of tags (responsible storage servers) for the mutation, splitting it
685 				// if necessary.  Serialize (splits of) the mutation into the message buffer and add the tags.
686 
687 				if (isSingleKeyMutation((MutationRef::Type) m.type)) {
688 					auto& tags = self->tagsForKey(m.param1);
689 
690 					if(self->singleKeyMutationEvent->enabled) {
691 						KeyRangeRef shard = self->keyInfo.rangeContaining(m.param1).range();
692 						self->singleKeyMutationEvent->tag1 = (int64_t)tags[0].id;
693 						self->singleKeyMutationEvent->tag2 = (int64_t)tags[1].id;
694 						self->singleKeyMutationEvent->tag3 = (int64_t)tags[2].id;
695 						self->singleKeyMutationEvent->shardBegin = shard.begin;
696 						self->singleKeyMutationEvent->shardEnd = shard.end;
697 						self->singleKeyMutationEvent->log();
698 					}
699 
700 					if (debugMutation("ProxyCommit", commitVersion, m))
701 						TraceEvent("ProxyCommitTo", self->dbgid).detail("To", describe(tags)).detail("Mutation", m.toString()).detail("Version", commitVersion);
702 					for (auto& tag : tags)
703 						toCommit.addTag(tag);
704 					toCommit.addTypedMessage(m);
705 				}
706 				else if (m.type == MutationRef::ClearRange) {
707 					auto ranges = self->keyInfo.intersectingRanges(KeyRangeRef(m.param1, m.param2));
708 					auto firstRange = ranges.begin();
709 					++firstRange;
710 					if (firstRange == ranges.end()) {
711 						// Fast path
712 						if (debugMutation("ProxyCommit", commitVersion, m))
713 							TraceEvent("ProxyCommitTo", self->dbgid).detail("To", describe(ranges.begin().value().tags)).detail("Mutation", m.toString()).detail("Version", commitVersion);
714 
715 						auto& tags = ranges.begin().value().tags;
716 						if(!tags.size()) {
717 							for( auto info : ranges.begin().value().src_info ) {
718 								tags.push_back( info->tag );
719 							}
720 							for( auto info : ranges.begin().value().dest_info ) {
721 								tags.push_back( info->tag );
722 							}
723 							uniquify(tags);
724 						}
725 
726 						for (auto& tag : tags)
727 							toCommit.addTag(tag);
728 					}
729 					else {
730 						TEST(true); //A clear range extends past a shard boundary
731 						std::set<Tag> allSources;
732 						for (auto r : ranges) {
733 							auto& tags = r.value().tags;
734 							if(!tags.size()) {
735 								for( auto info : r.value().src_info ) {
736 									tags.push_back(info->tag);
737 								}
738 								for( auto info : r.value().dest_info ) {
739 									tags.push_back(info->tag);
740 								}
741 								uniquify(tags);
742 							}
743 							allSources.insert(tags.begin(), tags.end());
744 						}
745 						if (debugMutation("ProxyCommit", commitVersion, m))
746 							TraceEvent("ProxyCommitTo", self->dbgid).detail("To", describe(allSources)).detail("Mutation", m.toString()).detail("Version", commitVersion);
747 						for (auto& tag : allSources)
748 							toCommit.addTag(tag);
749 					}
750 					toCommit.addTypedMessage(m);
751 				}
752 				else
753 					UNREACHABLE();
754 
755 
756 
757 				// Check on backing up key, if backup ranges are defined and a normal key
758 				if (self->vecBackupKeys.size() > 1 && (normalKeys.contains(m.param1) || m.param1 == metadataVersionKey)) {
759 					if (m.type != MutationRef::Type::ClearRange) {
760 						// Add the mutation to the relevant backup tag
761 						for (auto backupName : self->vecBackupKeys[m.param1]) {
762 							logRangeMutations[backupName].push_back_deep(logRangeMutationsArena, m);
763 						}
764 					}
765 					else {
766 						KeyRangeRef mutationRange(m.param1, m.param2);
767 						KeyRangeRef intersectionRange;
768 
769 						// Identify and add the intersecting ranges of the mutation to the array of mutations to serialize
770 						for (auto backupRange : self->vecBackupKeys.intersectingRanges(mutationRange))
771 						{
772 							// Get the backup sub range
773 							const auto&		backupSubrange = backupRange.range();
774 
775 							// Determine the intersecting range
776 							intersectionRange = mutationRange & backupSubrange;
777 
778 							// Create the custom mutation for the specific backup tag
779 							MutationRef		backupMutation(MutationRef::Type::ClearRange, intersectionRange.begin, intersectionRange.end);
780 
781 							// Add the mutation to the relevant backup tag
782 							for (auto backupName : backupRange.value()) {
783 								logRangeMutations[backupName].push_back_deep(logRangeMutationsArena, backupMutation);
784 							}
785 						}
786 					}
787 				}
788 			}
789 		}
790 	}
791 
792 	// Serialize and backup the mutations as a single mutation
793 	if ((self->vecBackupKeys.size() > 1) && logRangeMutations.size()) {
794 
795 		Key			val;
796 		MutationRef backupMutation;
797 		uint32_t*	partBuffer = NULL;
798 
799 		// Serialize the log range mutations within the map
800 		for (auto& logRangeMutation : logRangeMutations)
801 		{
802 			BinaryWriter wr(Unversioned());
803 
804 			// Serialize the log destination
805 			wr.serializeBytes( logRangeMutation.first );
806 
807 			// Write the log keys and version information
808 			wr << (uint8_t)hashlittle(&v, sizeof(v), 0);
809 			wr << bigEndian64(commitVersion);
810 
811 			backupMutation.type = MutationRef::SetValue;
812 			partBuffer = NULL;
813 
814 			val = BinaryWriter::toValue(logRangeMutation.second, IncludeVersion());
815 
816 			for (int part = 0; part * CLIENT_KNOBS->MUTATION_BLOCK_SIZE < val.size(); part++) {
817 
818 				// Assign the second parameter as the part
819 				backupMutation.param2 = val.substr(part * CLIENT_KNOBS->MUTATION_BLOCK_SIZE,
820 					std::min(val.size() - part * CLIENT_KNOBS->MUTATION_BLOCK_SIZE, CLIENT_KNOBS->MUTATION_BLOCK_SIZE));
821 
822 				// Write the last part of the mutation to the serialization, if the buffer is not defined
823 				if (!partBuffer) {
824 					// Serialize the part to the writer
825 					wr << bigEndian32(part);
826 
827 					// Define the last buffer part
828 					partBuffer = (uint32_t*) ((char*) wr.getData() + wr.getLength() - sizeof(uint32_t));
829 				}
830 				else {
831 					*partBuffer = bigEndian32(part);
832 				}
833 
834 				// Define the mutation type and and location
835 				backupMutation.param1 = wr.toValue();
836 				ASSERT( backupMutation.param1.startsWith(logRangeMutation.first) );  // We are writing into the configured destination
837 
838 				auto& tags = self->tagsForKey(backupMutation.param1);
839 				for (auto& tag : tags)
840 					toCommit.addTag(tag);
841 				toCommit.addTypedMessage(backupMutation);
842 
843 //				if (debugMutation("BackupProxyCommit", commitVersion, backupMutation)) {
844 //					TraceEvent("BackupProxyCommitTo", self->dbgid).detail("To", describe(tags)).detail("BackupMutation", backupMutation.toString())
845 //						.detail("BackupMutationSize", val.size()).detail("Version", commitVersion).detail("DestPath", logRangeMutation.first)
846 //						.detail("PartIndex", part).detail("PartIndexEndian", bigEndian32(part)).detail("PartData", backupMutation.param1);
847 //				}
848 			}
849 		}
850 	}
851 
852 	self->stats.mutations += mutationCount;
853 	self->stats.mutationBytes += mutationBytes;
854 
855 	// Storage servers mustn't make durable versions which are not fully committed (because then they are impossible to roll back)
856 	// We prevent this by limiting the number of versions which are semi-committed but not fully committed to be less than the MVCC window
857 	while (self->committedVersion.get() < commitVersion - SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS) {
858 		// This should be *extremely* rare in the real world, but knob buggification should make it happen in simulation
859 		TEST(true);  // Semi-committed pipeline limited by MVCC window
860 		//TraceEvent("ProxyWaitingForCommitted", self->dbgid).detail("CommittedVersion", self->committedVersion.get()).detail("NeedToCommit", commitVersion);
861 		choose{
862 			when(wait(self->committedVersion.whenAtLeast(commitVersion - SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS))) {
863 				wait(yield());
864 				break;
865 			}
866 			when(GetReadVersionReply v = wait(self->getConsistentReadVersion.getReply(GetReadVersionRequest(0, GetReadVersionRequest::PRIORITY_SYSTEM_IMMEDIATE | GetReadVersionRequest::FLAG_CAUSAL_READ_RISKY)))) {
867 				if(v.version > self->committedVersion.get()) {
868 					self->locked = v.locked;
869 					self->metadataVersion = v.metadataVersion;
870 					self->committedVersion.set(v.version);
871 				}
872 
873 				if (self->committedVersion.get() < commitVersion - SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS)
874 					wait(delay(SERVER_KNOBS->PROXY_SPIN_DELAY));
875 			}
876 		}
877 	}
878 
879 	state LogSystemDiskQueueAdapter::CommitMessage msg = wait(storeCommits.back().first); // Should just be doing yields
880 
881 	if (debugID.present())
882 		g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "MasterProxyServer.commitBatch.AfterStoreCommits");
883 
884 	// txnState (transaction subsystem state) tag: message extracted from log adapter
885 	bool firstMessage = true;
886 	for(auto m : msg.messages) {
887 		if(firstMessage) {
888 			toCommit.addTag(txsTag);
889 		}
890 		toCommit.addMessage(StringRef(m.begin(), m.size()), !firstMessage);
891 		firstMessage = false;
892 	}
893 
894 	if ( prevVersion && commitVersion - prevVersion < SERVER_KNOBS->MAX_VERSIONS_IN_FLIGHT/2 )
895 		debug_advanceMaxCommittedVersion( UID(), commitVersion );  //< Is this valid?
896 
897 	//TraceEvent("ProxyPush", self->dbgid).detail("PrevVersion", prevVersion).detail("Version", commitVersion)
898 	//	.detail("TransactionsSubmitted", trs.size()).detail("TransactionsCommitted", commitCount).detail("TxsPopTo", msg.popTo);
899 
900 	if ( prevVersion && commitVersion - prevVersion < SERVER_KNOBS->MAX_VERSIONS_IN_FLIGHT/2 )
901 		debug_advanceMaxCommittedVersion(UID(), commitVersion);
902 
903 	Future<Version> loggingComplete = self->logSystem->push( prevVersion, commitVersion, self->committedVersion.get(), self->minKnownCommittedVersion, toCommit, debugID );
904 
905 	if (!forceRecovery) {
906 		ASSERT(self->latestLocalCommitBatchLogging.get() == localBatchNumber-1);
907 		self->latestLocalCommitBatchLogging.set(localBatchNumber);
908 	}
909 
910 	/////// Phase 4: Logging (network bound; pipelined up to MAX_READ_TRANSACTION_LIFE_VERSIONS (limited by loop above))
911 
912 	try {
913 		choose {
914 			when(Version ver = wait(loggingComplete)) {
915 				self->minKnownCommittedVersion = std::max(self->minKnownCommittedVersion, ver);
916 			}
917 			when(wait(self->committedVersion.whenAtLeast( commitVersion+1 ))) {}
918 		}
919 	} catch(Error &e) {
920 		if(e.code() == error_code_broken_promise) {
921 			throw master_tlog_failed();
922 		}
923 		throw;
924 	}
925 	wait(yield());
926 
927 	if( self->popRemoteTxs && msg.popTo > ( self->txsPopVersions.size() ? self->txsPopVersions.back().second : self->lastTxsPop ) ) {
928 		if(self->txsPopVersions.size() >= SERVER_KNOBS->MAX_TXS_POP_VERSION_HISTORY) {
929 			TraceEvent(SevWarnAlways, "DiscardingTxsPopHistory").suppressFor(1.0);
930 			self->txsPopVersions.pop_front();
931 		}
932 
933 		self->txsPopVersions.push_back(std::make_pair(commitVersion, msg.popTo));
934 	}
935 	self->logSystem->pop(msg.popTo, txsTag);
936 
937 	/////// Phase 5: Replies (CPU bound; no particular order required, though ordered execution would be best for latency)
938 	if ( prevVersion && commitVersion - prevVersion < SERVER_KNOBS->MAX_VERSIONS_IN_FLIGHT/2 )
939 		debug_advanceMinCommittedVersion(UID(), commitVersion);
940 
941 	//TraceEvent("ProxyPushed", self->dbgid).detail("PrevVersion", prevVersion).detail("Version", commitVersion);
942 	if (debugID.present())
943 		g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "MasterProxyServer.commitBatch.AfterLogPush");
944 
945 	for (auto &p : storeCommits) {
946 		ASSERT(!p.second.isReady());
947 		p.first.get().acknowledge.send(Void());
948 		ASSERT(p.second.isReady());
949 	}
950 
951 	TEST(self->committedVersion.get() > commitVersion);   // A later version was reported committed first
952 	if( commitVersion > self->committedVersion.get() ) {
953 		self->locked = lockedAfter;
954 		self->metadataVersion = metadataVersionAfter;
955 		self->committedVersion.set(commitVersion);
956 	}
957 
958 	if (forceRecovery) {
959 		TraceEvent(SevWarn, "RestartingTxnSubsystem", self->dbgid).detail("Stage", "ProxyShutdown");
960 		throw worker_removed();
961 	}
962 
963 	// Send replies to clients
964 	double endTime = timer();
965 	for (int t = 0; t < trs.size(); t++) {
966 		if (committed[t] == ConflictBatch::TransactionCommitted && (!locked || trs[t].isLockAware())) {
967 			ASSERT_WE_THINK(commitVersion != invalidVersion);
968 			trs[t].reply.send(CommitID(commitVersion, t, metadataVersionAfter));
969 		}
970 		else if (committed[t] == ConflictBatch::TransactionTooOld) {
971 			trs[t].reply.sendError(transaction_too_old());
972 		}
973 		else {
974 			trs[t].reply.sendError(not_committed());
975 		}
976 
977 		// TODO: filter if pipelined with large commit
978 		if(self->latencyBandConfig.present()) {
979 			bool filter = maxTransactionBytes > self->latencyBandConfig.get().commitConfig.maxCommitBytes.orDefault(std::numeric_limits<int>::max());
980 			self->stats.commitLatencyBands.addMeasurement(endTime - trs[t].requestTime, filter);
981 		}
982 	}
983 
984 	++self->stats.commitBatchOut;
985 	self->stats.txnCommitOut += trs.size();
986 	self->stats.txnConflicts += trs.size() - commitCount;
987 	self->stats.txnCommitOutSuccess += commitCount;
988 
989 	if(now() - self->lastCoalesceTime > SERVER_KNOBS->RESOLVER_COALESCE_TIME) {
990 		self->lastCoalesceTime = now();
991 		int lastSize = self->keyResolvers.size();
992 		auto rs = self->keyResolvers.ranges();
993 		Version oldestVersion = prevVersion - SERVER_KNOBS->MAX_WRITE_TRANSACTION_LIFE_VERSIONS;
994 		for(auto r = rs.begin(); r != rs.end(); ++r) {
995 			while(r->value().size() > 1 && r->value()[1].first < oldestVersion)
996 				r->value().pop_front();
997 			if(r->value().size() && r->value().front().first < oldestVersion)
998 				r->value().front().first = 0;
999 		}
1000 		self->keyResolvers.coalesce(allKeys);
1001 		if(self->keyResolvers.size() != lastSize)
1002 			TraceEvent("KeyResolverSize", self->dbgid).detail("Size", self->keyResolvers.size());
1003 	}
1004 
1005 	// Dynamic batching for commits
1006 	double target_latency = (now() - t1) * SERVER_KNOBS->COMMIT_TRANSACTION_BATCH_INTERVAL_LATENCY_FRACTION;
1007 	self->commitBatchInterval =
1008 		std::max(SERVER_KNOBS->COMMIT_TRANSACTION_BATCH_INTERVAL_MIN,
1009 			std::min(SERVER_KNOBS->COMMIT_TRANSACTION_BATCH_INTERVAL_MAX,
1010 				target_latency * SERVER_KNOBS->COMMIT_TRANSACTION_BATCH_INTERVAL_SMOOTHER_ALPHA + self->commitBatchInterval * (1-SERVER_KNOBS->COMMIT_TRANSACTION_BATCH_INTERVAL_SMOOTHER_ALPHA)));
1011 
1012 
1013 	self->commitBatchesMemBytesCount -= currentBatchMemBytesCount;
1014 	ASSERT_ABORT(self->commitBatchesMemBytesCount >= 0);
1015 	return Void();
1016 }
1017 
1018 
getLiveCommittedVersion(ProxyCommitData * commitData,uint32_t flags,vector<MasterProxyInterface> * otherProxies,Optional<UID> debugID,int transactionCount,int systemTransactionCount,int defaultPriTransactionCount,int batchPriTransactionCount)1019 ACTOR Future<GetReadVersionReply> getLiveCommittedVersion(ProxyCommitData* commitData, uint32_t flags, vector<MasterProxyInterface> *otherProxies, Optional<UID> debugID, int transactionCount, int systemTransactionCount, int defaultPriTransactionCount, int batchPriTransactionCount)
1020 {
1021 	// Returns a version which (1) is committed, and (2) is >= the latest version reported committed (by a commit response) when this request was sent
1022 	// (1) The version returned is the committedVersion of some proxy at some point before the request returns, so it is committed.
1023 	// (2) No proxy on our list reported committed a higher version before this request was received, because then its committedVersion would have been higher,
1024 	//     and no other proxy could have already committed anything without first ending the epoch
1025 	++commitData->stats.txnStartBatch;
1026 
1027 	state vector<Future<GetReadVersionReply>> proxyVersions;
1028 	for (auto const& p : *otherProxies)
1029 		proxyVersions.push_back(brokenPromiseToNever(p.getRawCommittedVersion.getReply(GetRawCommittedVersionRequest(debugID), TaskTLogConfirmRunningReply)));
1030 
1031 	if (!(flags&GetReadVersionRequest::FLAG_CAUSAL_READ_RISKY))
1032 	{
1033 		wait(commitData->logSystem->confirmEpochLive(debugID));
1034 	}
1035 
1036 	if (debugID.present())
1037 		g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), "MasterProxyServer.getLiveCommittedVersion.confirmEpochLive");
1038 
1039 	vector<GetReadVersionReply> versions = wait(getAll(proxyVersions));
1040 	GetReadVersionReply rep;
1041 	rep.version = commitData->committedVersion.get();
1042 	rep.locked = commitData->locked;
1043 	rep.metadataVersion = commitData->metadataVersion;
1044 
1045 	for (auto v : versions) {
1046 		if(v.version > rep.version) {
1047 			rep = v;
1048 		}
1049 	}
1050 
1051 	if (debugID.present())
1052 		g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), "MasterProxyServer.getLiveCommittedVersion.After");
1053 
1054 	commitData->stats.txnStartOut += transactionCount;
1055 	commitData->stats.txnSystemPriorityStartOut += systemTransactionCount;
1056 	commitData->stats.txnDefaultPriorityStartOut += defaultPriTransactionCount;
1057 	commitData->stats.txnBatchPriorityStartOut += batchPriTransactionCount;
1058 
1059 	return rep;
1060 }
1061 
fetchVersions(ProxyCommitData * commitData)1062 ACTOR Future<Void> fetchVersions(ProxyCommitData *commitData) {
1063 	loop {
1064 		waitNext(commitData->commitBatchStartNotifications.getFuture());
1065 		GetCommitVersionRequest req(commitData->commitVersionRequestNumber++, commitData->mostRecentProcessedRequestNumber, commitData->dbgid);
1066 		commitData->commitBatchVersions.send(brokenPromiseToNever(commitData->master.getCommitVersion.getReply(req)));
1067 	}
1068 }
1069 
1070 struct TransactionRateInfo {
1071 	double rate;
1072 	double limit;
1073 
TransactionRateInfoTransactionRateInfo1074 	TransactionRateInfo(double rate) : rate(rate), limit(0) {}
1075 
resetTransactionRateInfo1076 	void reset(double elapsed) {
1077 		limit = std::min(0.0,limit) + std::min(rate * elapsed, SERVER_KNOBS->START_TRANSACTION_MAX_TRANSACTIONS_TO_START);
1078 	}
1079 
canStartTransactionRateInfo1080 	bool canStart(int64_t numAlreadyStarted) {
1081 		return numAlreadyStarted < limit;
1082 	}
1083 
updateBudgetTransactionRateInfo1084 	void updateBudget(int64_t numStarted) {
1085 		limit -= numStarted;
1086 	}
1087 };
1088 
sendGrvReplies(Future<GetReadVersionReply> replyFuture,std::vector<GetReadVersionRequest> requests,ProxyStats * stats)1089 ACTOR Future<Void> sendGrvReplies(Future<GetReadVersionReply> replyFuture, std::vector<GetReadVersionRequest> requests, ProxyStats *stats) {
1090 	GetReadVersionReply reply = wait(replyFuture);
1091 	double end = timer();
1092 	for(GetReadVersionRequest const& request : requests) {
1093 		stats->grvLatencyBands.addMeasurement(end - request.requestTime);
1094 		request.reply.send(reply);
1095 	}
1096 
1097 	return Void();
1098 }
1099 
transactionStarter(MasterProxyInterface proxy,Reference<AsyncVar<ServerDBInfo>> db,PromiseStream<Future<Void>> addActor,ProxyCommitData * commitData,GetHealthMetricsReply * healthMetricsReply,GetHealthMetricsReply * detailedHealthMetricsReply)1100 ACTOR static Future<Void> transactionStarter(
1101 	MasterProxyInterface proxy,
1102 	Reference<AsyncVar<ServerDBInfo>> db,
1103 	PromiseStream<Future<Void>> addActor,
1104 	ProxyCommitData* commitData, GetHealthMetricsReply* healthMetricsReply,
1105 	GetHealthMetricsReply* detailedHealthMetricsReply)
1106 {
1107 	state double lastGRVTime = 0;
1108 	state PromiseStream<Void> GRVTimer;
1109 	state double GRVBatchTime = SERVER_KNOBS->START_TRANSACTION_BATCH_INTERVAL_MIN;
1110 
1111 	state int64_t transactionCount = 0;
1112 	state int64_t batchTransactionCount = 0;
1113 	state TransactionRateInfo normalRateInfo(10);
1114 	state TransactionRateInfo batchRateInfo(0);
1115 
1116 	state std::priority_queue<std::pair<GetReadVersionRequest, int64_t>, std::vector<std::pair<GetReadVersionRequest, int64_t>>> transactionQueue;
1117 	state vector<MasterProxyInterface> otherProxies;
1118 
1119 	state PromiseStream<double> replyTimes;
1120 	addActor.send(getRate(proxy.id(), db, &transactionCount, &batchTransactionCount, &normalRateInfo.rate, &batchRateInfo.rate, healthMetricsReply, detailedHealthMetricsReply));
1121 	addActor.send(queueTransactionStartRequests(&transactionQueue, proxy.getConsistentReadVersion.getFuture(), GRVTimer, &lastGRVTime, &GRVBatchTime, replyTimes.getFuture(), &commitData->stats));
1122 
1123 	// Get a list of the other proxies that go together with us
1124 	while (std::find(db->get().client.proxies.begin(), db->get().client.proxies.end(), proxy) == db->get().client.proxies.end())
1125 		wait(db->onChange());
1126 	for (MasterProxyInterface mp : db->get().client.proxies) {
1127 		if (mp != proxy)
1128 			otherProxies.push_back(mp);
1129 	}
1130 
1131 	ASSERT(db->get().recoveryState >= RecoveryState::ACCEPTING_COMMITS);  // else potentially we could return uncommitted read versions (since self->committedVersion is only a committed version if this recovery succeeds)
1132 
1133 	TraceEvent("ProxyReadyForTxnStarts", proxy.id());
1134 
1135 	loop{
1136 		waitNext(GRVTimer.getFuture());
1137 		// Select zero or more transactions to start
1138 		double t = now();
1139 		double elapsed = std::min<double>(now() - lastGRVTime, SERVER_KNOBS->START_TRANSACTION_BATCH_INTERVAL_MAX);
1140 		lastGRVTime = t;
1141 
1142 		if(elapsed == 0) elapsed = 1e-15; // resolve a possible indeterminant multiplication with infinite transaction rate
1143 
1144 		normalRateInfo.reset(elapsed);
1145 		batchRateInfo.reset(elapsed);
1146 
1147 		int transactionsStarted[2] = {0,0};
1148 		int systemTransactionsStarted[2] = {0,0};
1149 		int defaultPriTransactionsStarted[2] = { 0, 0 };
1150 		int batchPriTransactionsStarted[2] = { 0, 0 };
1151 
1152 		vector<vector<GetReadVersionRequest>> start(2);  // start[0] is transactions starting with !(flags&CAUSAL_READ_RISKY), start[1] is transactions starting with flags&CAUSAL_READ_RISKY
1153 		Optional<UID> debugID;
1154 
1155 		int requestsToStart = 0;
1156 		while (!transactionQueue.empty() && requestsToStart < SERVER_KNOBS->START_TRANSACTION_MAX_REQUESTS_TO_START) {
1157 			auto& req = transactionQueue.top().first;
1158 			int tc = req.transactionCount;
1159 
1160 			if(req.priority() < GetReadVersionRequest::PRIORITY_DEFAULT && !batchRateInfo.canStart(transactionsStarted[0] + transactionsStarted[1])) {
1161 				break;
1162 			}
1163 			else if(req.priority() < GetReadVersionRequest::PRIORITY_SYSTEM_IMMEDIATE && !normalRateInfo.canStart(transactionsStarted[0] + transactionsStarted[1])) {
1164 				break;
1165 			}
1166 
1167 			if (req.debugID.present()) {
1168 				if (!debugID.present()) debugID = g_nondeterministic_random->randomUniqueID();
1169 				g_traceBatch.addAttach("TransactionAttachID", req.debugID.get().first(), debugID.get().first());
1170 			}
1171 
1172 			transactionsStarted[req.flags&1] += tc;
1173 			if (req.priority() >= GetReadVersionRequest::PRIORITY_SYSTEM_IMMEDIATE)
1174 				systemTransactionsStarted[req.flags & 1] += tc;
1175 			else if (req.priority() >= GetReadVersionRequest::PRIORITY_DEFAULT)
1176 				defaultPriTransactionsStarted[req.flags & 1] += tc;
1177 			else
1178 				batchPriTransactionsStarted[req.flags & 1] += tc;
1179 
1180 			start[req.flags & 1].push_back(std::move(req));  static_assert(GetReadVersionRequest::FLAG_CAUSAL_READ_RISKY == 1, "Implementation dependent on flag value");
1181 			transactionQueue.pop();
1182 			requestsToStart++;
1183 		}
1184 
1185 		if (!transactionQueue.empty())
1186 			forwardPromise(GRVTimer, delayJittered(SERVER_KNOBS->START_TRANSACTION_BATCH_QUEUE_CHECK_INTERVAL, TaskProxyGRVTimer));
1187 
1188 		/*TraceEvent("GRVBatch", proxy.id())
1189 		.detail("Elapsed", elapsed)
1190 		.detail("NTransactionToStart", nTransactionsToStart)
1191 		.detail("TransactionRate", transactionRate)
1192 		.detail("TransactionQueueSize", transactionQueue.size())
1193 		.detail("NumTransactionsStarted", transactionsStarted[0] + transactionsStarted[1])
1194 		.detail("NumSystemTransactionsStarted", systemTransactionsStarted[0] + systemTransactionsStarted[1])
1195 		.detail("NumNonSystemTransactionsStarted", transactionsStarted[0] + transactionsStarted[1] - systemTransactionsStarted[0] - systemTransactionsStarted[1])
1196 		.detail("TransactionBudget", transactionBudget)
1197 		.detail("BatchTransactionBudget", batchTransactionBudget);*/
1198 
1199 		transactionCount += transactionsStarted[0] + transactionsStarted[1];
1200 		batchTransactionCount += batchPriTransactionsStarted[0] + batchPriTransactionsStarted[1];
1201 
1202 		normalRateInfo.updateBudget(transactionsStarted[0] + transactionsStarted[1]);
1203 		batchRateInfo.updateBudget(transactionsStarted[0] + transactionsStarted[1]);
1204 
1205 		if (debugID.present()) {
1206 			g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), "MasterProxyServer.masterProxyServerCore.Broadcast");
1207 		}
1208 
1209 		for (int i = 0; i < start.size(); i++) {
1210 			if (start[i].size()) {
1211 				Future<GetReadVersionReply> readVersionReply = getLiveCommittedVersion(commitData, i, &otherProxies, debugID, transactionsStarted[i], systemTransactionsStarted[i], defaultPriTransactionsStarted[i], batchPriTransactionsStarted[i]);
1212 				addActor.send(sendGrvReplies(readVersionReply, start[i], &commitData->stats));
1213 
1214 				// for now, base dynamic batching on the time for normal requests (not read_risky)
1215 				if (i == 0) {
1216 					addActor.send(timeReply(readVersionReply, replyTimes));
1217 				}
1218 			}
1219 		}
1220 	}
1221 }
1222 
readRequestServer(MasterProxyInterface proxy,ProxyCommitData * commitData)1223 ACTOR static Future<Void> readRequestServer(
1224 	MasterProxyInterface proxy,
1225 	ProxyCommitData* commitData
1226 	)
1227 {
1228 	// Implement read-only parts of the proxy interface
1229 
1230 	// We can't respond to these requests until we have valid txnStateStore
1231 	wait(commitData->validState.getFuture());
1232 
1233 	TraceEvent("ProxyReadyForReads", proxy.id());
1234 
1235 	loop {
1236 		choose{
1237 			when(GetKeyServerLocationsRequest req = waitNext(proxy.getKeyServersLocations.getFuture())) {
1238 				GetKeyServerLocationsReply rep;
1239 				if(!req.end.present()) {
1240 					auto r = req.reverse ? commitData->keyInfo.rangeContainingKeyBefore(req.begin) : commitData->keyInfo.rangeContaining(req.begin);
1241 					vector<StorageServerInterface> ssis;
1242 					ssis.reserve(r.value().src_info.size());
1243 					for(auto& it : r.value().src_info) {
1244 						ssis.push_back(it->interf);
1245 					}
1246 					rep.results.push_back(std::make_pair(r.range(), ssis));
1247 				} else if(!req.reverse) {
1248 					int count = 0;
1249 					for(auto r = commitData->keyInfo.rangeContaining(req.begin); r != commitData->keyInfo.ranges().end() && count < req.limit && r.begin() < req.end.get(); ++r) {
1250 						vector<StorageServerInterface> ssis;
1251 						ssis.reserve(r.value().src_info.size());
1252 						for(auto& it : r.value().src_info) {
1253 							ssis.push_back(it->interf);
1254 						}
1255 						rep.results.push_back(std::make_pair(r.range(), ssis));
1256 						count++;
1257 					}
1258 				} else {
1259 					int count = 0;
1260 					auto r = commitData->keyInfo.rangeContainingKeyBefore(req.end.get());
1261 					while( count < req.limit && req.begin < r.end() ) {
1262 						vector<StorageServerInterface> ssis;
1263 						ssis.reserve(r.value().src_info.size());
1264 						for(auto& it : r.value().src_info) {
1265 							ssis.push_back(it->interf);
1266 						}
1267 						rep.results.push_back(std::make_pair(r.range(), ssis));
1268 						if(r == commitData->keyInfo.ranges().begin()) {
1269 							break;
1270 						}
1271 						count++;
1272 						--r;
1273 					}
1274 				}
1275 				req.reply.send(rep);
1276 			}
1277 			when(GetStorageServerRejoinInfoRequest req = waitNext(proxy.getStorageServerRejoinInfo.getFuture())) {
1278 				if (commitData->txnStateStore->readValue(serverListKeyFor(req.id)).get().present()) {
1279 					GetStorageServerRejoinInfoReply rep;
1280 					rep.version = commitData->version;
1281 					rep.tag = decodeServerTagValue( commitData->txnStateStore->readValue(serverTagKeyFor(req.id)).get().get() );
1282 					Standalone<VectorRef<KeyValueRef>> history = commitData->txnStateStore->readRange(serverTagHistoryRangeFor(req.id)).get();
1283 					for(int i = history.size()-1; i >= 0; i-- ) {
1284 						rep.history.push_back(std::make_pair(decodeServerTagHistoryKey(history[i].key), decodeServerTagValue(history[i].value)));
1285 					}
1286 					auto localityKey = commitData->txnStateStore->readValue(tagLocalityListKeyFor(req.dcId)).get();
1287 					if( localityKey.present() ) {
1288 						rep.newLocality = false;
1289 						int8_t locality = decodeTagLocalityListValue(localityKey.get());
1290 						if(locality != rep.tag.locality) {
1291 							uint16_t tagId = 0;
1292 							std::vector<uint16_t> usedTags;
1293 							auto tagKeys = commitData->txnStateStore->readRange(serverTagKeys).get();
1294 							for( auto& kv : tagKeys ) {
1295 								Tag t = decodeServerTagValue( kv.value );
1296 								if(t.locality == locality) {
1297 									usedTags.push_back(t.id);
1298 								}
1299 							}
1300 							auto historyKeys = commitData->txnStateStore->readRange(serverTagHistoryKeys).get();
1301 							for( auto& kv : historyKeys ) {
1302 								Tag t = decodeServerTagValue( kv.value );
1303 								if(t.locality == locality) {
1304 									usedTags.push_back(t.id);
1305 								}
1306 							}
1307 							std::sort(usedTags.begin(), usedTags.end());
1308 
1309 							int usedIdx = 0;
1310 							for(; usedTags.size() > 0 && tagId <= usedTags.end()[-1]; tagId++) {
1311 								if(tagId < usedTags[usedIdx]) {
1312 									break;
1313 								} else {
1314 									usedIdx++;
1315 								}
1316 							}
1317 							rep.newTag = Tag(locality, tagId);
1318 						}
1319 					} else {
1320 						rep.newLocality = true;
1321 						int8_t maxTagLocality = -1;
1322 						auto localityKeys = commitData->txnStateStore->readRange(tagLocalityListKeys).get();
1323 						for( auto& kv : localityKeys ) {
1324 							maxTagLocality = std::max(maxTagLocality, decodeTagLocalityListValue( kv.value ));
1325 						}
1326 						rep.newTag = Tag(maxTagLocality+1,0);
1327 					}
1328 					req.reply.send(rep);
1329 				} else {
1330 					req.reply.sendError(worker_removed());
1331 				}
1332 			}
1333 		}
1334 		wait(yield());
1335 	}
1336 }
1337 
healthMetricsRequestServer(MasterProxyInterface proxy,GetHealthMetricsReply * healthMetricsReply,GetHealthMetricsReply * detailedHealthMetricsReply)1338 ACTOR Future<Void> healthMetricsRequestServer(MasterProxyInterface proxy, GetHealthMetricsReply* healthMetricsReply, GetHealthMetricsReply* detailedHealthMetricsReply)
1339 {
1340 	loop {
1341 		choose {
1342 			when(GetHealthMetricsRequest req =
1343 				 waitNext(proxy.getHealthMetrics.getFuture()))
1344 			{
1345 				if (req.detailed)
1346 					req.reply.send(*detailedHealthMetricsReply);
1347 				else
1348 					req.reply.send(*healthMetricsReply);
1349 			}
1350 		}
1351 	}
1352 }
1353 
monitorRemoteCommitted(ProxyCommitData * self,Reference<AsyncVar<ServerDBInfo>> db)1354 ACTOR Future<Void> monitorRemoteCommitted(ProxyCommitData* self, Reference<AsyncVar<ServerDBInfo>> db) {
1355 	loop {
1356 		wait(delay(0)); //allow this actor to be cancelled if we are removed after db changes.
1357 		state Optional<std::vector<OptionalInterface<TLogInterface>>> remoteLogs;
1358 		if(db->get().recoveryState >= RecoveryState::ALL_LOGS_RECRUITED) {
1359 			for(auto& logSet : db->get().logSystemConfig.tLogs) {
1360 				if(!logSet.isLocal) {
1361 					remoteLogs = logSet.tLogs;
1362 					for(auto& tLog : logSet.tLogs) {
1363 						if(!tLog.present()) {
1364 							remoteLogs = Optional<std::vector<OptionalInterface<TLogInterface>>>();
1365 							break;
1366 						}
1367 					}
1368 					break;
1369 				}
1370 			}
1371 		}
1372 
1373 		if(!remoteLogs.present()) {
1374 			wait(db->onChange());
1375 			continue;
1376 		}
1377 		self->popRemoteTxs = true;
1378 
1379 		state Future<Void> onChange = db->onChange();
1380 		loop {
1381 			state std::vector<Future<TLogQueuingMetricsReply>> replies;
1382 			for(auto &it : remoteLogs.get()) {
1383 				replies.push_back(brokenPromiseToNever( it.interf().getQueuingMetrics.getReply( TLogQueuingMetricsRequest() ) ));
1384 			}
1385 			wait( waitForAll(replies) || onChange );
1386 
1387 			if(onChange.isReady()) {
1388 				break;
1389 			}
1390 
1391 			//FIXME: use the configuration to calculate a more precise minimum recovery version.
1392 			Version minVersion = std::numeric_limits<Version>::max();
1393 			for(auto& it : replies) {
1394 				minVersion = std::min(minVersion, it.get().v);
1395 			}
1396 
1397 			while(self->txsPopVersions.size() && self->txsPopVersions.front().first <= minVersion) {
1398 				self->lastTxsPop = self->txsPopVersions.front().second;
1399 				self->logSystem->pop(self->txsPopVersions.front().second, txsTag, 0, tagLocalityRemoteLog);
1400 				self->txsPopVersions.pop_front();
1401 			}
1402 
1403 			wait( delay(SERVER_KNOBS->UPDATE_REMOTE_LOG_VERSION_INTERVAL) || onChange );
1404 			if(onChange.isReady()) {
1405 				break;
1406 			}
1407 		}
1408 	}
1409 }
1410 
masterProxyServerCore(MasterProxyInterface proxy,MasterInterface master,Reference<AsyncVar<ServerDBInfo>> db,LogEpoch epoch,Version recoveryTransactionVersion,bool firstProxy)1411 ACTOR Future<Void> masterProxyServerCore(
1412 	MasterProxyInterface proxy,
1413 	MasterInterface master,
1414 	Reference<AsyncVar<ServerDBInfo>> db,
1415 	LogEpoch epoch,
1416 	Version recoveryTransactionVersion,
1417 	bool firstProxy)
1418 {
1419 	state ProxyCommitData commitData(proxy.id(), master, proxy.getConsistentReadVersion, recoveryTransactionVersion, proxy.commit, db, firstProxy);
1420 
1421 	state Future<Sequence> sequenceFuture = (Sequence)0;
1422 	state PromiseStream< std::pair<vector<CommitTransactionRequest>, int> > batchedCommits;
1423 	state Future<Void> commitBatcherActor;
1424 	state Future<Void> lastCommitComplete = Void();
1425 
1426 	state PromiseStream<Future<Void>> addActor;
1427 	state Future<Void> onError = transformError( actorCollection(addActor.getFuture()), broken_promise(), master_tlog_failed() );
1428 	state double lastCommit = 0;
1429 	state std::set<Sequence> txnSequences;
1430 	state Sequence maxSequence = std::numeric_limits<Sequence>::max();
1431 
1432 	state GetHealthMetricsReply healthMetricsReply;
1433 	state GetHealthMetricsReply detailedHealthMetricsReply;
1434 
1435 	addActor.send( fetchVersions(&commitData) );
1436 	addActor.send( waitFailureServer(proxy.waitFailure.getFuture()) );
1437 
1438 	//TraceEvent("ProxyInit1", proxy.id());
1439 
1440 	// Wait until we can load the "real" logsystem, since we don't support switching them currently
1441 	while (!(db->get().master.id() == master.id() && db->get().recoveryState >= RecoveryState::RECOVERY_TRANSACTION)) {
1442 		//TraceEvent("ProxyInit2", proxy.id()).detail("LSEpoch", db->get().logSystemConfig.epoch).detail("Need", epoch);
1443 		wait(db->onChange());
1444 	}
1445 	state Future<Void> dbInfoChange = db->onChange();
1446 	//TraceEvent("ProxyInit3", proxy.id());
1447 
1448 	commitData.resolvers = db->get().resolvers;
1449 	ASSERT(commitData.resolvers.size() != 0);
1450 
1451 	auto rs = commitData.keyResolvers.modify(allKeys);
1452 	for(auto r = rs.begin(); r != rs.end(); ++r)
1453 		r->value().push_back(std::make_pair<Version,int>(0,0));
1454 
1455 	commitData.logSystem = ILogSystem::fromServerDBInfo(proxy.id(), db->get(), false, addActor);
1456 	commitData.logAdapter = new LogSystemDiskQueueAdapter(commitData.logSystem, txsTag, Reference<AsyncVar<PeekSpecialInfo>>(), false);
1457 	commitData.txnStateStore = keyValueStoreLogSystem(commitData.logAdapter, proxy.id(), 2e9, true, true, true);
1458 
1459 	// ((SERVER_MEM_LIMIT * COMMIT_BATCHES_MEM_FRACTION_OF_TOTAL) / COMMIT_BATCHES_MEM_TO_TOTAL_MEM_SCALE_FACTOR) is only a approximate formula for limiting the memory used.
1460 	// COMMIT_BATCHES_MEM_TO_TOTAL_MEM_SCALE_FACTOR is an estimate based on experiments and not an accurate one.
1461 	state int64_t commitBatchesMemoryLimit = std::min(SERVER_KNOBS->COMMIT_BATCHES_MEM_BYTES_HARD_LIMIT, static_cast<int64_t>((SERVER_KNOBS->SERVER_MEM_LIMIT * SERVER_KNOBS->COMMIT_BATCHES_MEM_FRACTION_OF_TOTAL) / SERVER_KNOBS->COMMIT_BATCHES_MEM_TO_TOTAL_MEM_SCALE_FACTOR));
1462 	TraceEvent(SevInfo, "CommitBatchesMemoryLimit").detail("BytesLimit", commitBatchesMemoryLimit);
1463 
1464 	addActor.send(monitorRemoteCommitted(&commitData, db));
1465 	addActor.send(transactionStarter(proxy, db, addActor, &commitData, &healthMetricsReply, &detailedHealthMetricsReply));
1466 	addActor.send(readRequestServer(proxy, &commitData));
1467 	addActor.send(healthMetricsRequestServer(proxy, &healthMetricsReply, &detailedHealthMetricsReply));
1468 
1469 	// wait for txnStateStore recovery
1470 	wait(success(commitData.txnStateStore->readValue(StringRef())));
1471 
1472 	int commitBatchByteLimit =
1473 		(int)std::min<double>(SERVER_KNOBS->COMMIT_TRANSACTION_BATCH_BYTES_MAX,
1474 			std::max<double>(SERVER_KNOBS->COMMIT_TRANSACTION_BATCH_BYTES_MIN,
1475 				SERVER_KNOBS->COMMIT_TRANSACTION_BATCH_BYTES_SCALE_BASE * pow(db->get().client.proxies.size(), SERVER_KNOBS->COMMIT_TRANSACTION_BATCH_BYTES_SCALE_POWER)));
1476 
1477 	commitBatcherActor = commitBatcher(&commitData, batchedCommits, proxy.commit.getFuture(), commitBatchByteLimit, commitBatchesMemoryLimit);
1478 	loop choose{
1479 		when( wait( dbInfoChange ) ) {
1480 			dbInfoChange = db->onChange();
1481 			if(db->get().master.id() == master.id() && db->get().recoveryState >= RecoveryState::RECOVERY_TRANSACTION) {
1482 				commitData.logSystem = ILogSystem::fromServerDBInfo(proxy.id(), db->get(), false, addActor);
1483 				for(auto it : commitData.tag_popped) {
1484 					commitData.logSystem->pop(it.second, it.first);
1485 				}
1486 				commitData.logSystem->pop(commitData.lastTxsPop, txsTag, 0, tagLocalityRemoteLog);
1487 			}
1488 
1489 			Optional<LatencyBandConfig> newLatencyBandConfig = db->get().latencyBandConfig;
1490 
1491 			if(newLatencyBandConfig.present() != commitData.latencyBandConfig.present()
1492 				|| (newLatencyBandConfig.present() && newLatencyBandConfig.get().grvConfig != commitData.latencyBandConfig.get().grvConfig))
1493 			{
1494 				TraceEvent("LatencyBandGrvUpdatingConfig").detail("Present", newLatencyBandConfig.present());
1495 				commitData.stats.grvLatencyBands.clearBands();
1496 				if(newLatencyBandConfig.present()) {
1497 					for(auto band : newLatencyBandConfig.get().grvConfig.bands) {
1498 						commitData.stats.grvLatencyBands.addThreshold(band);
1499 					}
1500 				}
1501 			}
1502 
1503 			if(newLatencyBandConfig.present() != commitData.latencyBandConfig.present()
1504 				|| (newLatencyBandConfig.present() && newLatencyBandConfig.get().commitConfig != commitData.latencyBandConfig.get().commitConfig))
1505 			{
1506 				TraceEvent("LatencyBandCommitUpdatingConfig").detail("Present", newLatencyBandConfig.present());
1507 				commitData.stats.commitLatencyBands.clearBands();
1508 				if(newLatencyBandConfig.present()) {
1509 					for(auto band : newLatencyBandConfig.get().commitConfig.bands) {
1510 						commitData.stats.commitLatencyBands.addThreshold(band);
1511 					}
1512 				}
1513 			}
1514 
1515 			commitData.latencyBandConfig = newLatencyBandConfig;
1516 		}
1517 		when(wait(onError)) {}
1518 		when(std::pair<vector<CommitTransactionRequest>, int> batchedRequests = waitNext(batchedCommits.getFuture())) {
1519 			const vector<CommitTransactionRequest> &trs = batchedRequests.first;
1520 			int batchBytes = batchedRequests.second;
1521 			//TraceEvent("MasterProxyCTR", proxy.id()).detail("CommitTransactions", trs.size()).detail("TransactionRate", transactionRate).detail("TransactionQueue", transactionQueue.size()).detail("ReleasedTransactionCount", transactionCount);
1522 			if (trs.size() || (db->get().recoveryState >= RecoveryState::ACCEPTING_COMMITS && now() - lastCommit >= SERVER_KNOBS->MAX_COMMIT_BATCH_INTERVAL)) {
1523 				lastCommit = now();
1524 
1525 				if (trs.size() || lastCommitComplete.isReady()) {
1526 					lastCommitComplete = commitBatch(&commitData, trs, batchBytes);
1527 					addActor.send(lastCommitComplete);
1528 				}
1529 			}
1530 		}
1531 		when(GetRawCommittedVersionRequest req = waitNext(proxy.getRawCommittedVersion.getFuture())) {
1532 			//TraceEvent("ProxyGetRCV", proxy.id());
1533 			if (req.debugID.present())
1534 				g_traceBatch.addEvent("TransactionDebug", req.debugID.get().first(), "MasterProxyServer.masterProxyServerCore.GetRawCommittedVersion");
1535 			GetReadVersionReply rep;
1536 			rep.locked = commitData.locked;
1537 			rep.metadataVersion = commitData.metadataVersion;
1538 			rep.version = commitData.committedVersion.get();
1539 			req.reply.send(rep);
1540 		}
1541 		when(TxnStateRequest req = waitNext(proxy.txnState.getFuture())) {
1542 			state ReplyPromise<Void> reply = req.reply;
1543 			if(req.last) maxSequence = req.sequence + 1;
1544 			if (!txnSequences.count(req.sequence)) {
1545 				txnSequences.insert(req.sequence);
1546 
1547 				ASSERT(!commitData.validState.isSet()); // Although we may receive the CommitTransactionRequest for the recovery transaction before all of the TxnStateRequest, we will not get a resolution result from any resolver until the master has submitted its initial (sequence 0) resolution request, which it doesn't do until we have acknowledged all TxnStateRequests
1548 
1549 				for(auto& kv : req.data)
1550 					commitData.txnStateStore->set(kv, &req.arena);
1551 				commitData.txnStateStore->commit(true);
1552 
1553 				if(txnSequences.size() == maxSequence) {
1554 					state KeyRange txnKeys = allKeys;
1555 					loop {
1556 						wait(yield());
1557 						Standalone<VectorRef<KeyValueRef>> data = commitData.txnStateStore->readRange(txnKeys, SERVER_KNOBS->BUGGIFIED_ROW_LIMIT, SERVER_KNOBS->APPLY_MUTATION_BYTES).get();
1558 						if(!data.size()) break;
1559 						((KeyRangeRef&)txnKeys) = KeyRangeRef( keyAfter(data.back().key, txnKeys.arena()), txnKeys.end );
1560 
1561 						Standalone<VectorRef<MutationRef>> mutations;
1562 						std::vector<std::pair<MapPair<Key,ServerCacheInfo>,int>> keyInfoData;
1563 						vector<UID> src, dest;
1564 						Reference<StorageInfo> storageInfo;
1565 						ServerCacheInfo info;
1566 						for(auto &kv : data) {
1567 							if( kv.key.startsWith(keyServersPrefix) ) {
1568 								KeyRef k = kv.key.removePrefix(keyServersPrefix);
1569 								if(k != allKeys.end) {
1570 									decodeKeyServersValue(kv.value, src, dest);
1571 									info.tags.clear();
1572 									info.src_info.clear();
1573 									info.dest_info.clear();
1574 									for(auto& id : src) {
1575 										auto cacheItr = commitData.storageCache.find(id);
1576 										if(cacheItr == commitData.storageCache.end()) {
1577 											storageInfo = Reference<StorageInfo>( new StorageInfo() );
1578 											storageInfo->tag = decodeServerTagValue( commitData.txnStateStore->readValue( serverTagKeyFor(id) ).get().get() );
1579 											storageInfo->interf = decodeServerListValue( commitData.txnStateStore->readValue( serverListKeyFor(id) ).get().get() );
1580 											commitData.storageCache[id] = storageInfo;
1581 										} else {
1582 											storageInfo = cacheItr->second;
1583 										}
1584 										ASSERT(storageInfo->tag != invalidTag);
1585 										info.tags.push_back( storageInfo->tag );
1586 										info.src_info.push_back( storageInfo );
1587 									}
1588 									for(auto& id : dest) {
1589 										auto cacheItr = commitData.storageCache.find(id);
1590 										if(cacheItr == commitData.storageCache.end()) {
1591 											storageInfo = Reference<StorageInfo>( new StorageInfo() );
1592 											storageInfo->tag = decodeServerTagValue( commitData.txnStateStore->readValue( serverTagKeyFor(id) ).get().get() );
1593 											storageInfo->interf = decodeServerListValue( commitData.txnStateStore->readValue( serverListKeyFor(id) ).get().get() );
1594 											commitData.storageCache[id] = storageInfo;
1595 										} else {
1596 											storageInfo = cacheItr->second;
1597 										}
1598 										ASSERT(storageInfo->tag != invalidTag);
1599 										info.tags.push_back( storageInfo->tag );
1600 										info.dest_info.push_back( storageInfo );
1601 									}
1602 									uniquify(info.tags);
1603 									keyInfoData.push_back( std::make_pair(MapPair<Key,ServerCacheInfo>(k, info), 1) );
1604 								}
1605 							} else {
1606 								mutations.push_back(mutations.arena(), MutationRef(MutationRef::SetValue, kv.key, kv.value));
1607 							}
1608 						}
1609 
1610 						//insert keyTag data separately from metadata mutations so that we can do one bulk insert which avoids a lot of map lookups.
1611 						commitData.keyInfo.rawInsert(keyInfoData);
1612 
1613 						Arena arena;
1614 						bool confChanges;
1615 						applyMetadataMutations(commitData.dbgid, arena, mutations, commitData.txnStateStore, NULL, &confChanges, Reference<ILogSystem>(), 0, &commitData.vecBackupKeys, &commitData.keyInfo, commitData.firstProxy ? &commitData.uid_applyMutationsData : NULL, commitData.commit, commitData.cx, &commitData.committedVersion, &commitData.storageCache, &commitData.tag_popped, true );
1616 					}
1617 
1618 					auto lockedKey = commitData.txnStateStore->readValue(databaseLockedKey).get();
1619 					commitData.locked = lockedKey.present() && lockedKey.get().size();
1620 					commitData.metadataVersion = commitData.txnStateStore->readValue(metadataVersionKey).get();
1621 
1622 					commitData.txnStateStore->enableSnapshot();
1623 				}
1624 			}
1625 			reply.send(Void());
1626 			wait(yield());
1627 		}
1628 	}
1629 }
1630 
checkRemoved(Reference<AsyncVar<ServerDBInfo>> db,uint64_t recoveryCount,MasterProxyInterface myInterface)1631 ACTOR Future<Void> checkRemoved(Reference<AsyncVar<ServerDBInfo>> db, uint64_t recoveryCount, MasterProxyInterface myInterface) {
1632 	loop{
1633 		if (db->get().recoveryCount >= recoveryCount && !std::count(db->get().client.proxies.begin(), db->get().client.proxies.end(), myInterface))
1634 		throw worker_removed();
1635 		wait(db->onChange());
1636 	}
1637 }
1638 
masterProxyServer(MasterProxyInterface proxy,InitializeMasterProxyRequest req,Reference<AsyncVar<ServerDBInfo>> db)1639 ACTOR Future<Void> masterProxyServer(
1640 	MasterProxyInterface proxy,
1641 	InitializeMasterProxyRequest req,
1642 	Reference<AsyncVar<ServerDBInfo>> db)
1643 {
1644 	try {
1645 		state Future<Void> core = masterProxyServerCore(proxy, req.master, db, req.recoveryCount, req.recoveryTransactionVersion, req.firstProxy);
1646 		loop choose{
1647 			when(wait(core)) { return Void(); }
1648 			when(wait(checkRemoved(db, req.recoveryCount, proxy))) {}
1649 		}
1650 	}
1651 	catch (Error& e) {
1652 		if (e.code() == error_code_actor_cancelled || e.code() == error_code_worker_removed || e.code() == error_code_tlog_stopped ||
1653 			e.code() == error_code_master_tlog_failed || e.code() == error_code_coordinators_changed || e.code() == error_code_coordinated_state_conflict ||
1654 			e.code() == error_code_new_coordinators_timed_out)
1655 		{
1656 			TraceEvent("MasterProxyTerminated", proxy.id()).error(e, true);
1657 			return Void();
1658 		}
1659 		throw;
1660 	}
1661 }
1662