1 /*
2 * MasterProxyServer.actor.cpp
3 *
4 * This source file is part of the FoundationDB open source project
5 *
6 * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
7 *
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
11 *
12 * http://www.apache.org/licenses/LICENSE-2.0
13 *
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 */
20
21 #include "flow/ActorCollection.h"
22 #include "fdbclient/MasterProxyInterface.h"
23 #include "fdbclient/NativeAPI.actor.h"
24 #include "fdbserver/MasterInterface.h"
25 #include "fdbserver/WorkerInterface.actor.h"
26 #include "fdbserver/WaitFailure.h"
27 #include "fdbserver/Knobs.h"
28 #include "fdbserver/ServerDBInfo.h"
29 #include "fdbserver/LogSystem.h"
30 #include "fdbserver/LogSystemDiskQueueAdapter.h"
31 #include "fdbserver/IKeyValueStore.h"
32 #include "fdbclient/SystemData.h"
33 #include "fdbrpc/sim_validation.h"
34 #include "fdbclient/Notified.h"
35 #include "fdbclient/KeyRangeMap.h"
36 #include "fdbserver/ConflictSet.h"
37 #include "flow/Stats.h"
38 #include "fdbserver/ApplyMetadataMutation.h"
39 #include "fdbserver/RecoveryState.h"
40 #include "fdbserver/LatencyBandConfig.h"
41 #include "fdbclient/Atomic.h"
42 #include "flow/TDMetric.actor.h"
43 #include "flow/actorcompiler.h" // This must be the last #include.
44
45 struct ProxyStats {
46 CounterCollection cc;
47 Counter txnStartIn, txnStartOut, txnStartBatch;
48 Counter txnSystemPriorityStartIn, txnSystemPriorityStartOut;
49 Counter txnBatchPriorityStartIn, txnBatchPriorityStartOut;
50 Counter txnDefaultPriorityStartIn, txnDefaultPriorityStartOut;
51 Counter txnCommitIn, txnCommitVersionAssigned, txnCommitResolving, txnCommitResolved, txnCommitOut, txnCommitOutSuccess;
52 Counter txnConflicts;
53 Counter commitBatchIn, commitBatchOut;
54 Counter mutationBytes;
55 Counter mutations;
56 Counter conflictRanges;
57 Version lastCommitVersionAssigned;
58
59 LatencyBands commitLatencyBands;
60 LatencyBands grvLatencyBands;
61
62 Future<Void> logger;
63
ProxyStatsProxyStats64 explicit ProxyStats(UID id, Version* pVersion, NotifiedVersion* pCommittedVersion, int64_t *commitBatchesMemBytesCountPtr)
65 : cc("ProxyStats", id.toString()),
66 txnStartIn("TxnStartIn", cc), txnStartOut("TxnStartOut", cc), txnStartBatch("TxnStartBatch", cc), txnSystemPriorityStartIn("TxnSystemPriorityStartIn", cc), txnSystemPriorityStartOut("TxnSystemPriorityStartOut", cc), txnBatchPriorityStartIn("TxnBatchPriorityStartIn", cc), txnBatchPriorityStartOut("TxnBatchPriorityStartOut", cc),
67 txnDefaultPriorityStartIn("TxnDefaultPriorityStartIn", cc), txnDefaultPriorityStartOut("TxnDefaultPriorityStartOut", cc), txnCommitIn("TxnCommitIn", cc), txnCommitVersionAssigned("TxnCommitVersionAssigned", cc), txnCommitResolving("TxnCommitResolving", cc), txnCommitResolved("TxnCommitResolved", cc), txnCommitOut("TxnCommitOut", cc),
68 txnCommitOutSuccess("TxnCommitOutSuccess", cc), txnConflicts("TxnConflicts", cc), commitBatchIn("CommitBatchIn", cc), commitBatchOut("CommitBatchOut", cc), mutationBytes("MutationBytes", cc), mutations("Mutations", cc), conflictRanges("ConflictRanges", cc), lastCommitVersionAssigned(0),
69 commitLatencyBands("CommitLatencyMetrics", id, SERVER_KNOBS->STORAGE_LOGGING_DELAY), grvLatencyBands("GRVLatencyMetrics", id, SERVER_KNOBS->STORAGE_LOGGING_DELAY)
70 {
71 specialCounter(cc, "LastAssignedCommitVersion", [this](){return this->lastCommitVersionAssigned;});
72 specialCounter(cc, "Version", [pVersion](){return *pVersion; });
73 specialCounter(cc, "CommittedVersion", [pCommittedVersion](){ return pCommittedVersion->get(); });
74 specialCounter(cc, "CommitBatchesMemBytesCount", [commitBatchesMemBytesCountPtr]() { return *commitBatchesMemBytesCountPtr; });
75 logger = traceCounters("ProxyMetrics", id, SERVER_KNOBS->WORKER_LOGGING_INTERVAL, &cc, "ProxyMetrics");
76 }
77 };
78
getRate(UID myID,Reference<AsyncVar<ServerDBInfo>> db,int64_t * inTransactionCount,int64_t * inBatchTransactionCount,double * outTransactionRate,double * outBatchTransactionRate,GetHealthMetricsReply * healthMetricsReply,GetHealthMetricsReply * detailedHealthMetricsReply)79 ACTOR Future<Void> getRate(UID myID, Reference<AsyncVar<ServerDBInfo>> db, int64_t* inTransactionCount, int64_t* inBatchTransactionCount, double* outTransactionRate,
80 double* outBatchTransactionRate, GetHealthMetricsReply* healthMetricsReply, GetHealthMetricsReply* detailedHealthMetricsReply) {
81 state Future<Void> nextRequestTimer = Never();
82 state Future<Void> leaseTimeout = Never();
83 state Future<GetRateInfoReply> reply = Never();
84 state double lastDetailedReply = 0.0; // request detailed metrics immediately
85 state bool expectingDetailedReply = false;
86 state int64_t lastTC = 0;
87
88 if (db->get().ratekeeper.present()) nextRequestTimer = Void();
89 loop choose {
90 when ( wait( db->onChange() ) ) {
91 if ( db->get().ratekeeper.present() ) {
92 TraceEvent("Proxy_RatekeeperChanged", myID)
93 .detail("RKID", db->get().ratekeeper.get().id());
94 nextRequestTimer = Void(); // trigger GetRate request
95 } else {
96 TraceEvent("Proxy_RatekeeperDied", myID);
97 nextRequestTimer = Never();
98 reply = Never();
99 }
100 }
101 when ( wait( nextRequestTimer ) ) {
102 nextRequestTimer = Never();
103 bool detailed = now() - lastDetailedReply > SERVER_KNOBS->DETAILED_METRIC_UPDATE_RATE;
104 reply = brokenPromiseToNever(db->get().ratekeeper.get().getRateInfo.getReply(GetRateInfoRequest(myID, *inTransactionCount, *inBatchTransactionCount, detailed)));
105 expectingDetailedReply = detailed;
106 }
107 when ( GetRateInfoReply rep = wait(reply) ) {
108 reply = Never();
109 *outTransactionRate = rep.transactionRate;
110 *outBatchTransactionRate = rep.batchTransactionRate;
111 //TraceEvent("MasterProxyRate", myID).detail("Rate", rep.transactionRate).detail("BatchRate", rep.batchTransactionRate).detail("Lease", rep.leaseDuration).detail("ReleasedTransactions", *inTransactionCount - lastTC);
112 lastTC = *inTransactionCount;
113 leaseTimeout = delay(rep.leaseDuration);
114 nextRequestTimer = delayJittered(rep.leaseDuration / 2);
115 healthMetricsReply->update(rep.healthMetrics, expectingDetailedReply, true);
116 if (expectingDetailedReply) {
117 detailedHealthMetricsReply->update(rep.healthMetrics, true, true);
118 lastDetailedReply = now();
119 }
120 }
121 when ( wait( leaseTimeout ) ) {
122 *outTransactionRate = 0;
123 *outBatchTransactionRate = 0;
124 //TraceEvent("MasterProxyRate", myID).detail("Rate", 0).detail("BatchRate", 0).detail("Lease", "Expired");
125 leaseTimeout = Never();
126 }
127 }
128 }
129
queueTransactionStartRequests(std::priority_queue<std::pair<GetReadVersionRequest,int64_t>,std::vector<std::pair<GetReadVersionRequest,int64_t>>> * transactionQueue,FutureStream<GetReadVersionRequest> readVersionRequests,PromiseStream<Void> GRVTimer,double * lastGRVTime,double * GRVBatchTime,FutureStream<double> replyTimes,ProxyStats * stats)130 ACTOR Future<Void> queueTransactionStartRequests(
131 std::priority_queue< std::pair<GetReadVersionRequest, int64_t>, std::vector< std::pair<GetReadVersionRequest, int64_t> > > *transactionQueue,
132 FutureStream<GetReadVersionRequest> readVersionRequests,
133 PromiseStream<Void> GRVTimer, double *lastGRVTime,
134 double *GRVBatchTime, FutureStream<double> replyTimes,
135 ProxyStats* stats)
136 {
137 state int64_t counter = 0;
138 loop choose{
139 when(GetReadVersionRequest req = waitNext(readVersionRequests)) {
140 if (req.debugID.present())
141 g_traceBatch.addEvent("TransactionDebug", req.debugID.get().first(), "MasterProxyServer.queueTransactionStartRequests.Before");
142
143 stats->txnStartIn += req.transactionCount;
144 if (req.priority() >= GetReadVersionRequest::PRIORITY_SYSTEM_IMMEDIATE)
145 stats->txnSystemPriorityStartIn += req.transactionCount;
146 else if (req.priority() >= GetReadVersionRequest::PRIORITY_DEFAULT)
147 stats->txnDefaultPriorityStartIn += req.transactionCount;
148 else
149 stats->txnBatchPriorityStartIn += req.transactionCount;
150
151 if (transactionQueue->empty()) {
152 if (now() - *lastGRVTime > *GRVBatchTime)
153 *lastGRVTime = now() - *GRVBatchTime;
154
155 forwardPromise(GRVTimer, delayJittered(*GRVBatchTime - (now() - *lastGRVTime), TaskProxyGRVTimer));
156 }
157
158 transactionQueue->push(std::make_pair(req, counter--));
159 }
160 // dynamic batching monitors reply latencies
161 when(double reply_latency = waitNext(replyTimes)) {
162 double target_latency = reply_latency * SERVER_KNOBS->START_TRANSACTION_BATCH_INTERVAL_LATENCY_FRACTION;
163 *GRVBatchTime =
164 std::max(SERVER_KNOBS->START_TRANSACTION_BATCH_INTERVAL_MIN,
165 std::min(SERVER_KNOBS->START_TRANSACTION_BATCH_INTERVAL_MAX,
166 target_latency * SERVER_KNOBS->START_TRANSACTION_BATCH_INTERVAL_SMOOTHER_ALPHA + *GRVBatchTime * (1-SERVER_KNOBS->START_TRANSACTION_BATCH_INTERVAL_SMOOTHER_ALPHA)));
167 }
168 }
169 }
170
discardCommit(UID id,Future<LogSystemDiskQueueAdapter::CommitMessage> fcm,Future<Void> dummyCommitState)171 ACTOR void discardCommit(UID id, Future<LogSystemDiskQueueAdapter::CommitMessage> fcm, Future<Void> dummyCommitState) {
172 ASSERT(!dummyCommitState.isReady());
173 LogSystemDiskQueueAdapter::CommitMessage cm = wait(fcm);
174 TraceEvent("Discarding", id).detail("Count", cm.messages.size());
175 cm.acknowledge.send(Void());
176 ASSERT(dummyCommitState.isReady());
177 }
178
// Event record type declared with DESCR. ProxyCommitData holds an
// EventMetricHandle<SingleKeyMutation> (singleKeyMutationEvent) created with the name "SingleKeyMutation".
// NOTE(review): the site that fills in these fields is not visible in this part of the file; the
// field descriptions below are inferred from the names only — confirm against the logging code.
DESCR struct SingleKeyMutation {
	Standalone<StringRef> shardBegin; // presumably the begin key of the shard containing the mutated key
	Standalone<StringRef> shardEnd;   // presumably the end key of that shard
	int64_t tag1;                     // presumably up to three tags associated with the mutation — TODO confirm
	int64_t tag2;
	int64_t tag3;
};
186
187 struct ProxyCommitData {
188 UID dbgid;
189 int64_t commitBatchesMemBytesCount;
190 ProxyStats stats;
191 MasterInterface master;
192 vector<ResolverInterface> resolvers;
193 LogSystemDiskQueueAdapter* logAdapter;
194 Reference<ILogSystem> logSystem;
195 IKeyValueStore* txnStateStore;
196 NotifiedVersion committedVersion; // Provided that this recovery has succeeded or will succeed, this version is fully committed (durable)
197 Version minKnownCommittedVersion; // No version smaller than this one will be used as the known committed version during recovery
198 Version version; // The version at which txnStateStore is up to date
199 Promise<Void> validState; // Set once txnStateStore and version are valid
200 double lastVersionTime;
201 KeyRangeMap<std::set<Key>> vecBackupKeys;
202 uint64_t commitVersionRequestNumber;
203 uint64_t mostRecentProcessedRequestNumber;
204 KeyRangeMap<Deque<std::pair<Version,int>>> keyResolvers;
205 KeyRangeMap<ServerCacheInfo> keyInfo;
206 std::map<Key, applyMutationsData> uid_applyMutationsData;
207 bool firstProxy;
208 double lastCoalesceTime;
209 bool locked;
210 Optional<Value> metadataVersion;
211 double commitBatchInterval;
212
213 int64_t localCommitBatchesStarted;
214 NotifiedVersion latestLocalCommitBatchResolving;
215 NotifiedVersion latestLocalCommitBatchLogging;
216
217 PromiseStream<Void> commitBatchStartNotifications;
218 PromiseStream<Future<GetCommitVersionReply>> commitBatchVersions; // 1:1 with commitBatchStartNotifications
219 RequestStream<GetReadVersionRequest> getConsistentReadVersion;
220 RequestStream<CommitTransactionRequest> commit;
221 Database cx;
222 EventMetricHandle<SingleKeyMutation> singleKeyMutationEvent;
223
224 std::map<UID, Reference<StorageInfo>> storageCache;
225 std::map<Tag, Version> tag_popped;
226 Deque<std::pair<Version, Version>> txsPopVersions;
227 Version lastTxsPop;
228 bool popRemoteTxs;
229
230 Optional<LatencyBandConfig> latencyBandConfig;
231
232 //The tag related to a storage server rarely change, so we keep a vector of tags for each key range to be slightly more CPU efficient.
233 //When a tag related to a storage server does change, we empty out all of these vectors to signify they must be repopulated.
234 //We do not repopulate them immediately to avoid a slow task.
tagsForKeyProxyCommitData235 const vector<Tag>& tagsForKey(StringRef key) {
236 auto& tags = keyInfo[key].tags;
237 if(!tags.size()) {
238 auto& r = keyInfo.rangeContaining(key).value();
239 for(auto info : r.src_info) {
240 r.tags.push_back(info->tag);
241 }
242 for(auto info : r.dest_info) {
243 r.tags.push_back(info->tag);
244 }
245 uniquify(r.tags);
246 return r.tags;
247 }
248 return tags;
249 }
250
ProxyCommitDataProxyCommitData251 ProxyCommitData(UID dbgid, MasterInterface master, RequestStream<GetReadVersionRequest> getConsistentReadVersion, Version recoveryTransactionVersion, RequestStream<CommitTransactionRequest> commit, Reference<AsyncVar<ServerDBInfo>> db, bool firstProxy)
252 : dbgid(dbgid), stats(dbgid, &version, &committedVersion, &commitBatchesMemBytesCount), master(master),
253 logAdapter(NULL), txnStateStore(NULL), popRemoteTxs(false),
254 committedVersion(recoveryTransactionVersion), version(0), minKnownCommittedVersion(0),
255 lastVersionTime(0), commitVersionRequestNumber(1), mostRecentProcessedRequestNumber(0),
256 getConsistentReadVersion(getConsistentReadVersion), commit(commit), lastCoalesceTime(0),
257 localCommitBatchesStarted(0), locked(false), commitBatchInterval(SERVER_KNOBS->COMMIT_TRANSACTION_BATCH_INTERVAL_MIN),
258 firstProxy(firstProxy), cx(openDBOnServer(db, TaskDefaultEndpoint, true, true)),
259 singleKeyMutationEvent(LiteralStringRef("SingleKeyMutation")), commitBatchesMemBytesCount(0), lastTxsPop(0)
260 {}
261 };
262
263 struct ResolutionRequestBuilder {
264 ProxyCommitData* self;
265 vector<ResolveTransactionBatchRequest> requests;
266 vector<vector<int>> transactionResolverMap;
267 vector<CommitTransactionRef*> outTr;
268
ResolutionRequestBuilderResolutionRequestBuilder269 ResolutionRequestBuilder( ProxyCommitData* self, Version version, Version prevVersion, Version lastReceivedVersion) : self(self), requests(self->resolvers.size()) {
270 for(auto& req : requests) {
271 req.prevVersion = prevVersion;
272 req.version = version;
273 req.lastReceivedVersion = lastReceivedVersion;
274 }
275 }
276
getOutTransactionResolutionRequestBuilder277 CommitTransactionRef& getOutTransaction(int resolver, Version read_snapshot) {
278 CommitTransactionRef *& out = outTr[resolver];
279 if (!out) {
280 ResolveTransactionBatchRequest& request = requests[resolver];
281 request.transactions.resize(request.arena, request.transactions.size() + 1);
282 out = &request.transactions.back();
283 out->read_snapshot = read_snapshot;
284 }
285 return *out;
286 }
287
addTransactionResolutionRequestBuilder288 void addTransaction(CommitTransactionRef& trIn, int transactionNumberInBatch) {
289 // SOMEDAY: There are a couple of unnecessary O( # resolvers ) steps here
290 outTr.assign(requests.size(), NULL);
291 ASSERT( transactionNumberInBatch >= 0 && transactionNumberInBatch < 32768 );
292
293 bool isTXNStateTransaction = false;
294 for (auto & m : trIn.mutations) {
295 if (m.type == MutationRef::SetVersionstampedKey) {
296 transformVersionstampMutation( m, &MutationRef::param1, requests[0].version, transactionNumberInBatch );
297 trIn.write_conflict_ranges.push_back( requests[0].arena, singleKeyRange( m.param1, requests[0].arena ) );
298 } else if (m.type == MutationRef::SetVersionstampedValue) {
299 transformVersionstampMutation( m, &MutationRef::param2, requests[0].version, transactionNumberInBatch );
300 }
301 if (isMetadataMutation(m)) {
302 isTXNStateTransaction = true;
303 getOutTransaction(0, trIn.read_snapshot).mutations.push_back(requests[0].arena, m);
304 }
305 }
306 for(auto& r : trIn.read_conflict_ranges) {
307 auto ranges = self->keyResolvers.intersectingRanges( r );
308 std::set<int> resolvers;
309 for(auto &ir : ranges) {
310 auto& version_resolver = ir.value();
311 for(int i = version_resolver.size()-1; i >= 0; i--) {
312 resolvers.insert(version_resolver[i].second);
313 if( version_resolver[i].first < trIn.read_snapshot )
314 break;
315 }
316 }
317 ASSERT(resolvers.size());
318 for(int resolver : resolvers)
319 getOutTransaction( resolver, trIn.read_snapshot ).read_conflict_ranges.push_back( requests[resolver].arena, r );
320 }
321 for(auto& r : trIn.write_conflict_ranges) {
322 auto ranges = self->keyResolvers.intersectingRanges( r );
323 std::set<int> resolvers;
324 for(auto &ir : ranges)
325 resolvers.insert(ir.value().back().second);
326 ASSERT(resolvers.size());
327 for(int resolver : resolvers)
328 getOutTransaction( resolver, trIn.read_snapshot ).write_conflict_ranges.push_back( requests[resolver].arena, r );
329 }
330 if (isTXNStateTransaction)
331 for (int r = 0; r<requests.size(); r++) {
332 int transactionNumberInRequest = &getOutTransaction(r, trIn.read_snapshot) - requests[r].transactions.begin();
333 requests[r].txnStateTransactions.push_back(requests[r].arena, transactionNumberInRequest);
334 }
335
336 vector<int> resolversUsed;
337 for (int r = 0; r<outTr.size(); r++)
338 if (outTr[r])
339 resolversUsed.push_back(r);
340 transactionResolverMap.push_back(std::move(resolversUsed));
341 }
342 };
343
commitBatcher(ProxyCommitData * commitData,PromiseStream<std::pair<std::vector<CommitTransactionRequest>,int>> out,FutureStream<CommitTransactionRequest> in,int desiredBytes,int64_t memBytesLimit)344 ACTOR Future<Void> commitBatcher(ProxyCommitData *commitData, PromiseStream<std::pair<std::vector<CommitTransactionRequest>, int> > out, FutureStream<CommitTransactionRequest> in, int desiredBytes, int64_t memBytesLimit) {
345 wait(delayJittered(commitData->commitBatchInterval, TaskProxyCommitBatcher));
346
347 state double lastBatch = 0;
348
349 loop{
350 state Future<Void> timeout;
351 state std::vector<CommitTransactionRequest> batch;
352 state int batchBytes = 0;
353
354 if(SERVER_KNOBS->MAX_COMMIT_BATCH_INTERVAL <= 0) {
355 timeout = Never();
356 }
357 else {
358 timeout = delayJittered(SERVER_KNOBS->MAX_COMMIT_BATCH_INTERVAL, TaskProxyCommitBatcher);
359 }
360
361 while(!timeout.isReady() && !(batch.size() == SERVER_KNOBS->COMMIT_TRANSACTION_BATCH_COUNT_MAX || batchBytes >= desiredBytes)) {
362 choose{
363 when(CommitTransactionRequest req = waitNext(in)) {
364 int bytes = getBytes(req);
365
366 // Drop requests if memory is under severe pressure
367 if(commitData->commitBatchesMemBytesCount + bytes > memBytesLimit) {
368 req.reply.sendError(proxy_memory_limit_exceeded());
369 TraceEvent(SevWarnAlways, "ProxyCommitBatchMemoryThresholdExceeded").suppressFor(60).detail("MemBytesCount", commitData->commitBatchesMemBytesCount).detail("MemLimit", memBytesLimit);
370 continue;
371 }
372
373 ++commitData->stats.txnCommitIn;
374
375 if(req.debugID.present()) {
376 g_traceBatch.addEvent("CommitDebug", req.debugID.get().first(), "MasterProxyServer.batcher");
377 }
378
379 if(!batch.size()) {
380 commitData->commitBatchStartNotifications.send(Void());
381 if(now() - lastBatch > commitData->commitBatchInterval) {
382 timeout = delayJittered(SERVER_KNOBS->COMMIT_TRANSACTION_BATCH_INTERVAL_FROM_IDLE, TaskProxyCommitBatcher);
383 }
384 else {
385 timeout = delayJittered(commitData->commitBatchInterval - (now() - lastBatch), TaskProxyCommitBatcher);
386 }
387 }
388
389 if((batchBytes + bytes > CLIENT_KNOBS->TRANSACTION_SIZE_LIMIT || req.firstInBatch()) && batch.size()) {
390 out.send({ batch, batchBytes });
391 lastBatch = now();
392 commitData->commitBatchStartNotifications.send(Void());
393 timeout = delayJittered(commitData->commitBatchInterval, TaskProxyCommitBatcher);
394 batch = std::vector<CommitTransactionRequest>();
395 batchBytes = 0;
396 }
397
398 batch.push_back(req);
399 batchBytes += bytes;
400 commitData->commitBatchesMemBytesCount += bytes;
401 }
402 when(wait(timeout)) {}
403 }
404 }
405 out.send({ std::move(batch), batchBytes });
406 lastBatch = now();
407 }
408 }
409
commitBatch(ProxyCommitData * self,vector<CommitTransactionRequest> trs,int currentBatchMemBytesCount)410 ACTOR Future<Void> commitBatch(
411 ProxyCommitData* self,
412 vector<CommitTransactionRequest> trs,
413 int currentBatchMemBytesCount)
414 {
415 state int64_t localBatchNumber = ++self->localCommitBatchesStarted;
416 state LogPushData toCommit(self->logSystem);
417 state double t1 = now();
418 state Optional<UID> debugID;
419 state bool forceRecovery = false;
420
421 ASSERT(SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS <= SERVER_KNOBS->MAX_VERSIONS_IN_FLIGHT); // since we are using just the former to limit the number of versions actually in flight!
422
423 // Active load balancing runs at a very high priority (to obtain accurate estimate of memory used by commit batches) so we need to downgrade here
424 wait(delay(0, TaskProxyCommit));
425
426 self->lastVersionTime = t1;
427
428 ++self->stats.commitBatchIn;
429
430 for (int t = 0; t<trs.size(); t++) {
431 if (trs[t].debugID.present()) {
432 if (!debugID.present())
433 debugID = g_nondeterministic_random->randomUniqueID();
434 g_traceBatch.addAttach("CommitAttachID", trs[t].debugID.get().first(), debugID.get().first());
435 }
436 }
437
438 if(localBatchNumber == 2 && !debugID.present() && self->firstProxy && !g_network->isSimulated()) {
439 debugID = g_random->randomUniqueID();
440 TraceEvent("SecondCommitBatch", self->dbgid).detail("DebugID", debugID.get());
441 }
442
443 if (debugID.present())
444 g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "MasterProxyServer.commitBatch.Before");
445
446 if (trs.empty()) {
447 // We are sending an empty batch, so we have to trigger the version fetcher
448 self->commitBatchStartNotifications.send(Void());
449 }
450
451 /////// Phase 1: Pre-resolution processing (CPU bound except waiting for a version # which is separately pipelined and *should* be available by now (unless empty commit); ordered; currently atomic but could yield)
452 TEST(self->latestLocalCommitBatchResolving.get() < localBatchNumber-1); // Queuing pre-resolution commit processing
453 wait(self->latestLocalCommitBatchResolving.whenAtLeast(localBatchNumber-1));
454 wait(yield());
455
456 if (debugID.present())
457 g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "MasterProxyServer.commitBatch.GettingCommitVersion");
458
459 Future<GetCommitVersionReply> fVersionReply = waitNext(self->commitBatchVersions.getFuture());
460 GetCommitVersionReply versionReply = wait(fVersionReply);
461 self->mostRecentProcessedRequestNumber = versionReply.requestNum;
462
463 self->stats.txnCommitVersionAssigned += trs.size();
464 self->stats.lastCommitVersionAssigned = versionReply.version;
465
466 state Version commitVersion = versionReply.version;
467 state Version prevVersion = versionReply.prevVersion;
468
469 for(auto it : versionReply.resolverChanges) {
470 auto rs = self->keyResolvers.modify(it.range);
471 for(auto r = rs.begin(); r != rs.end(); ++r)
472 r->value().push_back(std::make_pair(versionReply.resolverChangesVersion,it.dest));
473 }
474
475 //TraceEvent("ProxyGotVer", self->dbgid).detail("Commit", commitVersion).detail("Prev", prevVersion);
476
477 if (debugID.present())
478 g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "MasterProxyServer.commitBatch.GotCommitVersion");
479
480 ResolutionRequestBuilder requests( self, commitVersion, prevVersion, self->version );
481 int conflictRangeCount = 0;
482 state int64_t maxTransactionBytes = 0;
483 for (int t = 0; t<trs.size(); t++) {
484 requests.addTransaction(trs[t].transaction, t);
485 conflictRangeCount += trs[t].transaction.read_conflict_ranges.size() + trs[t].transaction.write_conflict_ranges.size();
486 //TraceEvent("MPTransactionDump", self->dbgid).detail("Snapshot", trs[t].transaction.read_snapshot);
487 //for(auto& m : trs[t].transaction.mutations)
488 maxTransactionBytes = std::max<int64_t>(maxTransactionBytes, trs[t].transaction.expectedSize());
489 // TraceEvent("MPTransactionsDump", self->dbgid).detail("Mutation", m.toString());
490 }
491 self->stats.conflictRanges += conflictRangeCount;
492
493 for (int r = 1; r<self->resolvers.size(); r++)
494 ASSERT(requests.requests[r].txnStateTransactions.size() == requests.requests[0].txnStateTransactions.size());
495
496 // Sending these requests is the fuzzy border between phase 1 and phase 2; it could conceivably overlap with resolution processing but is still using CPU
497 self->stats.txnCommitResolving += trs.size();
498 vector< Future<ResolveTransactionBatchReply> > replies;
499 for (int r = 0; r<self->resolvers.size(); r++) {
500 requests.requests[r].debugID = debugID;
501 replies.push_back(brokenPromiseToNever(self->resolvers[r].resolve.getReply(requests.requests[r], TaskProxyResolverReply)));
502 }
503
504 state vector<vector<int>> transactionResolverMap = std::move( requests.transactionResolverMap );
505
506 ASSERT(self->latestLocalCommitBatchResolving.get() == localBatchNumber-1);
507 self->latestLocalCommitBatchResolving.set(localBatchNumber);
508
509 /////// Phase 2: Resolution (waiting on the network; pipelined)
510 state vector<ResolveTransactionBatchReply> resolution = wait( getAll(replies) );
511
512 if (debugID.present())
513 g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "MasterProxyServer.commitBatch.AfterResolution");
514
515 ////// Phase 3: Post-resolution processing (CPU bound except for very rare situations; ordered; currently atomic but doesn't need to be)
516 TEST(self->latestLocalCommitBatchLogging.get() < localBatchNumber-1); // Queuing post-resolution commit processing
517 wait(self->latestLocalCommitBatchLogging.whenAtLeast(localBatchNumber-1));
518 wait(yield());
519
520 self->stats.txnCommitResolved += trs.size();
521
522 if (debugID.present())
523 g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "MasterProxyServer.commitBatch.ProcessingMutations");
524
525 state Arena arena;
526 state bool isMyFirstBatch = !self->version;
527 state Optional<Value> oldCoordinators = self->txnStateStore->readValue(coordinatorsKey).get();
528
529 //TraceEvent("ResolutionResult", self->dbgid).detail("Sequence", sequence).detail("Version", commitVersion).detail("StateMutationProxies", resolution[0].stateMutations.size()).detail("WaitForResolution", now()-t1).detail("R0Committed", resolution[0].committed.size())
530 // .detail("Transactions", trs.size());
531
532 for(int r=1; r<resolution.size(); r++) {
533 ASSERT( resolution[r].stateMutations.size() == resolution[0].stateMutations.size() );
534 for(int s=0; s<resolution[r].stateMutations.size(); s++)
535 ASSERT( resolution[r].stateMutations[s].size() == resolution[0].stateMutations[s].size() );
536 }
537
538 // Compute and apply "metadata" effects of each other proxy's most recent batch
539 bool initialState = isMyFirstBatch;
540 state bool firstStateMutations = isMyFirstBatch;
541 state vector< std::pair<Future<LogSystemDiskQueueAdapter::CommitMessage>, Future<Void>> > storeCommits;
542 for (int versionIndex = 0; versionIndex < resolution[0].stateMutations.size(); versionIndex++) {
543 // self->logAdapter->setNextVersion( ??? ); << Ideally we would be telling the log adapter that the pushes in this commit will be in the version at which these state mutations were committed by another proxy, but at present we don't have that information here. So the disk queue may be unnecessarily conservative about popping.
544
545 for (int transactionIndex = 0; transactionIndex < resolution[0].stateMutations[versionIndex].size() && !forceRecovery; transactionIndex++) {
546 bool committed = true;
547 for (int resolver = 0; resolver < resolution.size(); resolver++)
548 committed = committed && resolution[resolver].stateMutations[versionIndex][transactionIndex].committed;
549 if (committed)
550 applyMetadataMutations( self->dbgid, arena, resolution[0].stateMutations[versionIndex][transactionIndex].mutations, self->txnStateStore, NULL, &forceRecovery, self->logSystem, 0, &self->vecBackupKeys, &self->keyInfo, self->firstProxy ? &self->uid_applyMutationsData : NULL, self->commit, self->cx, &self->committedVersion, &self->storageCache, &self->tag_popped);
551
552 if( resolution[0].stateMutations[versionIndex][transactionIndex].mutations.size() && firstStateMutations ) {
553 ASSERT(committed);
554 firstStateMutations = false;
555 forceRecovery = false;
556 }
557 //TraceEvent("MetadataTransaction", self->dbgid).detail("Committed", committed).detail("Mutations", resolution[0].stateMutations[versionIndex][transactionIndex].second.size()).detail("R1Mutations", resolution.back().stateMutations[versionIndex][transactionIndex].second.size());
558 }
559 //TraceEvent("MetadataBatch", self->dbgid).detail("Transactions", resolution[0].stateMutations[versionIndex].size());
560
561 // These changes to txnStateStore will be committed by the other proxy, so we simply discard the commit message
562 auto fcm = self->logAdapter->getCommitMessage();
563 storeCommits.push_back(std::make_pair(fcm, self->txnStateStore->commit()));
564 //discardCommit( dbgid, fcm, txnStateStore->commit() );
565
566 if (initialState) {
567 //TraceEvent("ResyncLog", dbgid);
568 initialState = false;
569 forceRecovery = false;
570 self->txnStateStore->resyncLog();
571
572 for (auto &p : storeCommits) {
573 ASSERT(!p.second.isReady());
574 p.first.get().acknowledge.send(Void());
575 ASSERT(p.second.isReady());
576 }
577 storeCommits.clear();
578 }
579 }
580
581 // Determine which transactions actually committed (conservatively) by combining results from the resolvers
582 state vector<uint8_t> committed(trs.size());
583 ASSERT(transactionResolverMap.size() == committed.size());
584 vector<int> nextTr(resolution.size());
585 for (int t = 0; t<trs.size(); t++) {
586 uint8_t commit = ConflictBatch::TransactionCommitted;
587 for (int r : transactionResolverMap[t])
588 {
589 commit = std::min(resolution[r].committed[nextTr[r]++], commit);
590 }
591 committed[t] = commit;
592 }
593 for (int r = 0; r<resolution.size(); r++)
594 ASSERT(nextTr[r] == resolution[r].committed.size());
595
596 self->logAdapter->setNextVersion(commitVersion);
597
598 state Optional<Key> lockedKey = self->txnStateStore->readValue(databaseLockedKey).get();
599 state bool locked = lockedKey.present() && lockedKey.get().size();
600
601 state Optional<Key> mustContainSystemKey = self->txnStateStore->readValue(mustContainSystemMutationsKey).get();
602 if(mustContainSystemKey.present() && mustContainSystemKey.get().size()) {
603 for (int t = 0; t<trs.size(); t++) {
604 if( committed[t] == ConflictBatch::TransactionCommitted ) {
605 bool foundSystem = false;
606 for(auto& m : trs[t].transaction.mutations) {
607 if( ( m.type == MutationRef::ClearRange ? m.param2 : m.param1 ) >= nonMetadataSystemKeys.end) {
608 foundSystem = true;
609 break;
610 }
611 }
612 if(!foundSystem) {
613 committed[t] = ConflictBatch::TransactionConflict;
614 }
615 }
616 }
617 }
618
619 if(forceRecovery) {
620 wait( Future<Void>(Never()) );
621 }
622
623 // This first pass through committed transactions deals with "metadata" effects (modifications of txnStateStore, changes to storage servers' responsibilities)
624 int t;
625 state int commitCount = 0;
626 for (t = 0; t < trs.size() && !forceRecovery; t++)
627 {
628 if (committed[t] == ConflictBatch::TransactionCommitted && (!locked || trs[t].isLockAware())) {
629 commitCount++;
630 applyMetadataMutations(self->dbgid, arena, trs[t].transaction.mutations, self->txnStateStore, &toCommit, &forceRecovery, self->logSystem, commitVersion+1, &self->vecBackupKeys, &self->keyInfo, self->firstProxy ? &self->uid_applyMutationsData : NULL, self->commit, self->cx, &self->committedVersion, &self->storageCache, &self->tag_popped);
631 }
632 if(firstStateMutations) {
633 ASSERT(committed[t] == ConflictBatch::TransactionCommitted);
634 firstStateMutations = false;
635 forceRecovery = false;
636 }
637 }
638 if (forceRecovery) {
639 for (; t<trs.size(); t++)
640 committed[t] = ConflictBatch::TransactionConflict;
641 TraceEvent(SevWarn, "RestartingTxnSubsystem", self->dbgid).detail("Stage", "AwaitCommit");
642 }
643
644 lockedKey = self->txnStateStore->readValue(databaseLockedKey).get();
645 state bool lockedAfter = lockedKey.present() && lockedKey.get().size();
646
647 state Optional<Value> metadataVersionAfter = self->txnStateStore->readValue(metadataVersionKey).get();
648
649 auto fcm = self->logAdapter->getCommitMessage();
650 storeCommits.push_back(std::make_pair(fcm, self->txnStateStore->commit()));
651 self->version = commitVersion;
652 if (!self->validState.isSet()) self->validState.send(Void());
653 ASSERT(commitVersion);
654
655 if (!isMyFirstBatch && self->txnStateStore->readValue( coordinatorsKey ).get().get() != oldCoordinators.get()) {
656 wait( brokenPromiseToNever( self->master.changeCoordinators.getReply( ChangeCoordinatorsRequest( self->txnStateStore->readValue( coordinatorsKey ).get().get() ) ) ) );
657 ASSERT(false); // ChangeCoordinatorsRequest should always throw
658 }
659
660 // This second pass through committed transactions assigns the actual mutations to the appropriate storage servers' tags
661 state int mutationCount = 0;
662 state int mutationBytes = 0;
663
664 state std::map<Key, MutationListRef> logRangeMutations;
665 state Arena logRangeMutationsArena;
666 state uint32_t v = commitVersion / CLIENT_KNOBS->LOG_RANGE_BLOCK_SIZE;
667 state int transactionNum = 0;
668 state int yieldBytes = 0;
669
670 for (; transactionNum<trs.size(); transactionNum++) {
671 if (committed[transactionNum] == ConflictBatch::TransactionCommitted && (!locked || trs[transactionNum].isLockAware())) {
672 state int mutationNum = 0;
673 state VectorRef<MutationRef>* pMutations = &trs[transactionNum].transaction.mutations;
674 for (; mutationNum < pMutations->size(); mutationNum++) {
675 if(yieldBytes > SERVER_KNOBS->DESIRED_TOTAL_BYTES) {
676 yieldBytes = 0;
677 wait(yield());
678 }
679
680 auto& m = (*pMutations)[mutationNum];
681 mutationCount++;
682 mutationBytes += m.expectedSize();
683 yieldBytes += m.expectedSize();
684 // Determine the set of tags (responsible storage servers) for the mutation, splitting it
685 // if necessary. Serialize (splits of) the mutation into the message buffer and add the tags.
686
687 if (isSingleKeyMutation((MutationRef::Type) m.type)) {
688 auto& tags = self->tagsForKey(m.param1);
689
690 if(self->singleKeyMutationEvent->enabled) {
691 KeyRangeRef shard = self->keyInfo.rangeContaining(m.param1).range();
692 self->singleKeyMutationEvent->tag1 = (int64_t)tags[0].id;
693 self->singleKeyMutationEvent->tag2 = (int64_t)tags[1].id;
694 self->singleKeyMutationEvent->tag3 = (int64_t)tags[2].id;
695 self->singleKeyMutationEvent->shardBegin = shard.begin;
696 self->singleKeyMutationEvent->shardEnd = shard.end;
697 self->singleKeyMutationEvent->log();
698 }
699
700 if (debugMutation("ProxyCommit", commitVersion, m))
701 TraceEvent("ProxyCommitTo", self->dbgid).detail("To", describe(tags)).detail("Mutation", m.toString()).detail("Version", commitVersion);
702 for (auto& tag : tags)
703 toCommit.addTag(tag);
704 toCommit.addTypedMessage(m);
705 }
706 else if (m.type == MutationRef::ClearRange) {
707 auto ranges = self->keyInfo.intersectingRanges(KeyRangeRef(m.param1, m.param2));
708 auto firstRange = ranges.begin();
709 ++firstRange;
710 if (firstRange == ranges.end()) {
711 // Fast path
712 if (debugMutation("ProxyCommit", commitVersion, m))
713 TraceEvent("ProxyCommitTo", self->dbgid).detail("To", describe(ranges.begin().value().tags)).detail("Mutation", m.toString()).detail("Version", commitVersion);
714
715 auto& tags = ranges.begin().value().tags;
716 if(!tags.size()) {
717 for( auto info : ranges.begin().value().src_info ) {
718 tags.push_back( info->tag );
719 }
720 for( auto info : ranges.begin().value().dest_info ) {
721 tags.push_back( info->tag );
722 }
723 uniquify(tags);
724 }
725
726 for (auto& tag : tags)
727 toCommit.addTag(tag);
728 }
729 else {
730 TEST(true); //A clear range extends past a shard boundary
731 std::set<Tag> allSources;
732 for (auto r : ranges) {
733 auto& tags = r.value().tags;
734 if(!tags.size()) {
735 for( auto info : r.value().src_info ) {
736 tags.push_back(info->tag);
737 }
738 for( auto info : r.value().dest_info ) {
739 tags.push_back(info->tag);
740 }
741 uniquify(tags);
742 }
743 allSources.insert(tags.begin(), tags.end());
744 }
745 if (debugMutation("ProxyCommit", commitVersion, m))
746 TraceEvent("ProxyCommitTo", self->dbgid).detail("To", describe(allSources)).detail("Mutation", m.toString()).detail("Version", commitVersion);
747 for (auto& tag : allSources)
748 toCommit.addTag(tag);
749 }
750 toCommit.addTypedMessage(m);
751 }
752 else
753 UNREACHABLE();
754
755
756
757 // Check on backing up key, if backup ranges are defined and a normal key
758 if (self->vecBackupKeys.size() > 1 && (normalKeys.contains(m.param1) || m.param1 == metadataVersionKey)) {
759 if (m.type != MutationRef::Type::ClearRange) {
760 // Add the mutation to the relevant backup tag
761 for (auto backupName : self->vecBackupKeys[m.param1]) {
762 logRangeMutations[backupName].push_back_deep(logRangeMutationsArena, m);
763 }
764 }
765 else {
766 KeyRangeRef mutationRange(m.param1, m.param2);
767 KeyRangeRef intersectionRange;
768
769 // Identify and add the intersecting ranges of the mutation to the array of mutations to serialize
770 for (auto backupRange : self->vecBackupKeys.intersectingRanges(mutationRange))
771 {
772 // Get the backup sub range
773 const auto& backupSubrange = backupRange.range();
774
775 // Determine the intersecting range
776 intersectionRange = mutationRange & backupSubrange;
777
778 // Create the custom mutation for the specific backup tag
779 MutationRef backupMutation(MutationRef::Type::ClearRange, intersectionRange.begin, intersectionRange.end);
780
781 // Add the mutation to the relevant backup tag
782 for (auto backupName : backupRange.value()) {
783 logRangeMutations[backupName].push_back_deep(logRangeMutationsArena, backupMutation);
784 }
785 }
786 }
787 }
788 }
789 }
790 }
791
792 // Serialize and backup the mutations as a single mutation
793 if ((self->vecBackupKeys.size() > 1) && logRangeMutations.size()) {
794
795 Key val;
796 MutationRef backupMutation;
797 uint32_t* partBuffer = NULL;
798
799 // Serialize the log range mutations within the map
800 for (auto& logRangeMutation : logRangeMutations)
801 {
802 BinaryWriter wr(Unversioned());
803
804 // Serialize the log destination
805 wr.serializeBytes( logRangeMutation.first );
806
807 // Write the log keys and version information
808 wr << (uint8_t)hashlittle(&v, sizeof(v), 0);
809 wr << bigEndian64(commitVersion);
810
811 backupMutation.type = MutationRef::SetValue;
812 partBuffer = NULL;
813
814 val = BinaryWriter::toValue(logRangeMutation.second, IncludeVersion());
815
816 for (int part = 0; part * CLIENT_KNOBS->MUTATION_BLOCK_SIZE < val.size(); part++) {
817
818 // Assign the second parameter as the part
819 backupMutation.param2 = val.substr(part * CLIENT_KNOBS->MUTATION_BLOCK_SIZE,
820 std::min(val.size() - part * CLIENT_KNOBS->MUTATION_BLOCK_SIZE, CLIENT_KNOBS->MUTATION_BLOCK_SIZE));
821
822 // Write the last part of the mutation to the serialization, if the buffer is not defined
823 if (!partBuffer) {
824 // Serialize the part to the writer
825 wr << bigEndian32(part);
826
827 // Define the last buffer part
828 partBuffer = (uint32_t*) ((char*) wr.getData() + wr.getLength() - sizeof(uint32_t));
829 }
830 else {
831 *partBuffer = bigEndian32(part);
832 }
833
834 // Define the mutation type and and location
835 backupMutation.param1 = wr.toValue();
836 ASSERT( backupMutation.param1.startsWith(logRangeMutation.first) ); // We are writing into the configured destination
837
838 auto& tags = self->tagsForKey(backupMutation.param1);
839 for (auto& tag : tags)
840 toCommit.addTag(tag);
841 toCommit.addTypedMessage(backupMutation);
842
843 // if (debugMutation("BackupProxyCommit", commitVersion, backupMutation)) {
844 // TraceEvent("BackupProxyCommitTo", self->dbgid).detail("To", describe(tags)).detail("BackupMutation", backupMutation.toString())
845 // .detail("BackupMutationSize", val.size()).detail("Version", commitVersion).detail("DestPath", logRangeMutation.first)
846 // .detail("PartIndex", part).detail("PartIndexEndian", bigEndian32(part)).detail("PartData", backupMutation.param1);
847 // }
848 }
849 }
850 }
851
852 self->stats.mutations += mutationCount;
853 self->stats.mutationBytes += mutationBytes;
854
855 // Storage servers mustn't make durable versions which are not fully committed (because then they are impossible to roll back)
856 // We prevent this by limiting the number of versions which are semi-committed but not fully committed to be less than the MVCC window
857 while (self->committedVersion.get() < commitVersion - SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS) {
858 // This should be *extremely* rare in the real world, but knob buggification should make it happen in simulation
859 TEST(true); // Semi-committed pipeline limited by MVCC window
860 //TraceEvent("ProxyWaitingForCommitted", self->dbgid).detail("CommittedVersion", self->committedVersion.get()).detail("NeedToCommit", commitVersion);
861 choose{
862 when(wait(self->committedVersion.whenAtLeast(commitVersion - SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS))) {
863 wait(yield());
864 break;
865 }
866 when(GetReadVersionReply v = wait(self->getConsistentReadVersion.getReply(GetReadVersionRequest(0, GetReadVersionRequest::PRIORITY_SYSTEM_IMMEDIATE | GetReadVersionRequest::FLAG_CAUSAL_READ_RISKY)))) {
867 if(v.version > self->committedVersion.get()) {
868 self->locked = v.locked;
869 self->metadataVersion = v.metadataVersion;
870 self->committedVersion.set(v.version);
871 }
872
873 if (self->committedVersion.get() < commitVersion - SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS)
874 wait(delay(SERVER_KNOBS->PROXY_SPIN_DELAY));
875 }
876 }
877 }
878
879 state LogSystemDiskQueueAdapter::CommitMessage msg = wait(storeCommits.back().first); // Should just be doing yields
880
881 if (debugID.present())
882 g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "MasterProxyServer.commitBatch.AfterStoreCommits");
883
884 // txnState (transaction subsystem state) tag: message extracted from log adapter
885 bool firstMessage = true;
886 for(auto m : msg.messages) {
887 if(firstMessage) {
888 toCommit.addTag(txsTag);
889 }
890 toCommit.addMessage(StringRef(m.begin(), m.size()), !firstMessage);
891 firstMessage = false;
892 }
893
894 if ( prevVersion && commitVersion - prevVersion < SERVER_KNOBS->MAX_VERSIONS_IN_FLIGHT/2 )
895 debug_advanceMaxCommittedVersion( UID(), commitVersion ); //< Is this valid?
896
897 //TraceEvent("ProxyPush", self->dbgid).detail("PrevVersion", prevVersion).detail("Version", commitVersion)
898 // .detail("TransactionsSubmitted", trs.size()).detail("TransactionsCommitted", commitCount).detail("TxsPopTo", msg.popTo);
899
900 if ( prevVersion && commitVersion - prevVersion < SERVER_KNOBS->MAX_VERSIONS_IN_FLIGHT/2 )
901 debug_advanceMaxCommittedVersion(UID(), commitVersion);
902
903 Future<Version> loggingComplete = self->logSystem->push( prevVersion, commitVersion, self->committedVersion.get(), self->minKnownCommittedVersion, toCommit, debugID );
904
905 if (!forceRecovery) {
906 ASSERT(self->latestLocalCommitBatchLogging.get() == localBatchNumber-1);
907 self->latestLocalCommitBatchLogging.set(localBatchNumber);
908 }
909
910 /////// Phase 4: Logging (network bound; pipelined up to MAX_READ_TRANSACTION_LIFE_VERSIONS (limited by loop above))
911
912 try {
913 choose {
914 when(Version ver = wait(loggingComplete)) {
915 self->minKnownCommittedVersion = std::max(self->minKnownCommittedVersion, ver);
916 }
917 when(wait(self->committedVersion.whenAtLeast( commitVersion+1 ))) {}
918 }
919 } catch(Error &e) {
920 if(e.code() == error_code_broken_promise) {
921 throw master_tlog_failed();
922 }
923 throw;
924 }
925 wait(yield());
926
927 if( self->popRemoteTxs && msg.popTo > ( self->txsPopVersions.size() ? self->txsPopVersions.back().second : self->lastTxsPop ) ) {
928 if(self->txsPopVersions.size() >= SERVER_KNOBS->MAX_TXS_POP_VERSION_HISTORY) {
929 TraceEvent(SevWarnAlways, "DiscardingTxsPopHistory").suppressFor(1.0);
930 self->txsPopVersions.pop_front();
931 }
932
933 self->txsPopVersions.push_back(std::make_pair(commitVersion, msg.popTo));
934 }
935 self->logSystem->pop(msg.popTo, txsTag);
936
937 /////// Phase 5: Replies (CPU bound; no particular order required, though ordered execution would be best for latency)
938 if ( prevVersion && commitVersion - prevVersion < SERVER_KNOBS->MAX_VERSIONS_IN_FLIGHT/2 )
939 debug_advanceMinCommittedVersion(UID(), commitVersion);
940
941 //TraceEvent("ProxyPushed", self->dbgid).detail("PrevVersion", prevVersion).detail("Version", commitVersion);
942 if (debugID.present())
943 g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "MasterProxyServer.commitBatch.AfterLogPush");
944
945 for (auto &p : storeCommits) {
946 ASSERT(!p.second.isReady());
947 p.first.get().acknowledge.send(Void());
948 ASSERT(p.second.isReady());
949 }
950
951 TEST(self->committedVersion.get() > commitVersion); // A later version was reported committed first
952 if( commitVersion > self->committedVersion.get() ) {
953 self->locked = lockedAfter;
954 self->metadataVersion = metadataVersionAfter;
955 self->committedVersion.set(commitVersion);
956 }
957
958 if (forceRecovery) {
959 TraceEvent(SevWarn, "RestartingTxnSubsystem", self->dbgid).detail("Stage", "ProxyShutdown");
960 throw worker_removed();
961 }
962
963 // Send replies to clients
964 double endTime = timer();
965 for (int t = 0; t < trs.size(); t++) {
966 if (committed[t] == ConflictBatch::TransactionCommitted && (!locked || trs[t].isLockAware())) {
967 ASSERT_WE_THINK(commitVersion != invalidVersion);
968 trs[t].reply.send(CommitID(commitVersion, t, metadataVersionAfter));
969 }
970 else if (committed[t] == ConflictBatch::TransactionTooOld) {
971 trs[t].reply.sendError(transaction_too_old());
972 }
973 else {
974 trs[t].reply.sendError(not_committed());
975 }
976
977 // TODO: filter if pipelined with large commit
978 if(self->latencyBandConfig.present()) {
979 bool filter = maxTransactionBytes > self->latencyBandConfig.get().commitConfig.maxCommitBytes.orDefault(std::numeric_limits<int>::max());
980 self->stats.commitLatencyBands.addMeasurement(endTime - trs[t].requestTime, filter);
981 }
982 }
983
984 ++self->stats.commitBatchOut;
985 self->stats.txnCommitOut += trs.size();
986 self->stats.txnConflicts += trs.size() - commitCount;
987 self->stats.txnCommitOutSuccess += commitCount;
988
989 if(now() - self->lastCoalesceTime > SERVER_KNOBS->RESOLVER_COALESCE_TIME) {
990 self->lastCoalesceTime = now();
991 int lastSize = self->keyResolvers.size();
992 auto rs = self->keyResolvers.ranges();
993 Version oldestVersion = prevVersion - SERVER_KNOBS->MAX_WRITE_TRANSACTION_LIFE_VERSIONS;
994 for(auto r = rs.begin(); r != rs.end(); ++r) {
995 while(r->value().size() > 1 && r->value()[1].first < oldestVersion)
996 r->value().pop_front();
997 if(r->value().size() && r->value().front().first < oldestVersion)
998 r->value().front().first = 0;
999 }
1000 self->keyResolvers.coalesce(allKeys);
1001 if(self->keyResolvers.size() != lastSize)
1002 TraceEvent("KeyResolverSize", self->dbgid).detail("Size", self->keyResolvers.size());
1003 }
1004
1005 // Dynamic batching for commits
1006 double target_latency = (now() - t1) * SERVER_KNOBS->COMMIT_TRANSACTION_BATCH_INTERVAL_LATENCY_FRACTION;
1007 self->commitBatchInterval =
1008 std::max(SERVER_KNOBS->COMMIT_TRANSACTION_BATCH_INTERVAL_MIN,
1009 std::min(SERVER_KNOBS->COMMIT_TRANSACTION_BATCH_INTERVAL_MAX,
1010 target_latency * SERVER_KNOBS->COMMIT_TRANSACTION_BATCH_INTERVAL_SMOOTHER_ALPHA + self->commitBatchInterval * (1-SERVER_KNOBS->COMMIT_TRANSACTION_BATCH_INTERVAL_SMOOTHER_ALPHA)));
1011
1012
1013 self->commitBatchesMemBytesCount -= currentBatchMemBytesCount;
1014 ASSERT_ABORT(self->commitBatchesMemBytesCount >= 0);
1015 return Void();
1016 }
1017
1018
getLiveCommittedVersion(ProxyCommitData * commitData,uint32_t flags,vector<MasterProxyInterface> * otherProxies,Optional<UID> debugID,int transactionCount,int systemTransactionCount,int defaultPriTransactionCount,int batchPriTransactionCount)1019 ACTOR Future<GetReadVersionReply> getLiveCommittedVersion(ProxyCommitData* commitData, uint32_t flags, vector<MasterProxyInterface> *otherProxies, Optional<UID> debugID, int transactionCount, int systemTransactionCount, int defaultPriTransactionCount, int batchPriTransactionCount)
1020 {
1021 // Returns a version which (1) is committed, and (2) is >= the latest version reported committed (by a commit response) when this request was sent
1022 // (1) The version returned is the committedVersion of some proxy at some point before the request returns, so it is committed.
1023 // (2) No proxy on our list reported committed a higher version before this request was received, because then its committedVersion would have been higher,
1024 // and no other proxy could have already committed anything without first ending the epoch
1025 ++commitData->stats.txnStartBatch;
1026
1027 state vector<Future<GetReadVersionReply>> proxyVersions;
1028 for (auto const& p : *otherProxies)
1029 proxyVersions.push_back(brokenPromiseToNever(p.getRawCommittedVersion.getReply(GetRawCommittedVersionRequest(debugID), TaskTLogConfirmRunningReply)));
1030
1031 if (!(flags&GetReadVersionRequest::FLAG_CAUSAL_READ_RISKY))
1032 {
1033 wait(commitData->logSystem->confirmEpochLive(debugID));
1034 }
1035
1036 if (debugID.present())
1037 g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), "MasterProxyServer.getLiveCommittedVersion.confirmEpochLive");
1038
1039 vector<GetReadVersionReply> versions = wait(getAll(proxyVersions));
1040 GetReadVersionReply rep;
1041 rep.version = commitData->committedVersion.get();
1042 rep.locked = commitData->locked;
1043 rep.metadataVersion = commitData->metadataVersion;
1044
1045 for (auto v : versions) {
1046 if(v.version > rep.version) {
1047 rep = v;
1048 }
1049 }
1050
1051 if (debugID.present())
1052 g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), "MasterProxyServer.getLiveCommittedVersion.After");
1053
1054 commitData->stats.txnStartOut += transactionCount;
1055 commitData->stats.txnSystemPriorityStartOut += systemTransactionCount;
1056 commitData->stats.txnDefaultPriorityStartOut += defaultPriTransactionCount;
1057 commitData->stats.txnBatchPriorityStartOut += batchPriTransactionCount;
1058
1059 return rep;
1060 }
1061
fetchVersions(ProxyCommitData * commitData)1062 ACTOR Future<Void> fetchVersions(ProxyCommitData *commitData) {
1063 loop {
1064 waitNext(commitData->commitBatchStartNotifications.getFuture());
1065 GetCommitVersionRequest req(commitData->commitVersionRequestNumber++, commitData->mostRecentProcessedRequestNumber, commitData->dbgid);
1066 commitData->commitBatchVersions.send(brokenPromiseToNever(commitData->master.getCommitVersion.getReply(req)));
1067 }
1068 }
1069
1070 struct TransactionRateInfo {
1071 double rate;
1072 double limit;
1073
TransactionRateInfoTransactionRateInfo1074 TransactionRateInfo(double rate) : rate(rate), limit(0) {}
1075
resetTransactionRateInfo1076 void reset(double elapsed) {
1077 limit = std::min(0.0,limit) + std::min(rate * elapsed, SERVER_KNOBS->START_TRANSACTION_MAX_TRANSACTIONS_TO_START);
1078 }
1079
canStartTransactionRateInfo1080 bool canStart(int64_t numAlreadyStarted) {
1081 return numAlreadyStarted < limit;
1082 }
1083
updateBudgetTransactionRateInfo1084 void updateBudget(int64_t numStarted) {
1085 limit -= numStarted;
1086 }
1087 };
1088
sendGrvReplies(Future<GetReadVersionReply> replyFuture,std::vector<GetReadVersionRequest> requests,ProxyStats * stats)1089 ACTOR Future<Void> sendGrvReplies(Future<GetReadVersionReply> replyFuture, std::vector<GetReadVersionRequest> requests, ProxyStats *stats) {
1090 GetReadVersionReply reply = wait(replyFuture);
1091 double end = timer();
1092 for(GetReadVersionRequest const& request : requests) {
1093 stats->grvLatencyBands.addMeasurement(end - request.requestTime);
1094 request.reply.send(reply);
1095 }
1096
1097 return Void();
1098 }
1099
transactionStarter(MasterProxyInterface proxy,Reference<AsyncVar<ServerDBInfo>> db,PromiseStream<Future<Void>> addActor,ProxyCommitData * commitData,GetHealthMetricsReply * healthMetricsReply,GetHealthMetricsReply * detailedHealthMetricsReply)1100 ACTOR static Future<Void> transactionStarter(
1101 MasterProxyInterface proxy,
1102 Reference<AsyncVar<ServerDBInfo>> db,
1103 PromiseStream<Future<Void>> addActor,
1104 ProxyCommitData* commitData, GetHealthMetricsReply* healthMetricsReply,
1105 GetHealthMetricsReply* detailedHealthMetricsReply)
1106 {
1107 state double lastGRVTime = 0;
1108 state PromiseStream<Void> GRVTimer;
1109 state double GRVBatchTime = SERVER_KNOBS->START_TRANSACTION_BATCH_INTERVAL_MIN;
1110
1111 state int64_t transactionCount = 0;
1112 state int64_t batchTransactionCount = 0;
1113 state TransactionRateInfo normalRateInfo(10);
1114 state TransactionRateInfo batchRateInfo(0);
1115
1116 state std::priority_queue<std::pair<GetReadVersionRequest, int64_t>, std::vector<std::pair<GetReadVersionRequest, int64_t>>> transactionQueue;
1117 state vector<MasterProxyInterface> otherProxies;
1118
1119 state PromiseStream<double> replyTimes;
1120 addActor.send(getRate(proxy.id(), db, &transactionCount, &batchTransactionCount, &normalRateInfo.rate, &batchRateInfo.rate, healthMetricsReply, detailedHealthMetricsReply));
1121 addActor.send(queueTransactionStartRequests(&transactionQueue, proxy.getConsistentReadVersion.getFuture(), GRVTimer, &lastGRVTime, &GRVBatchTime, replyTimes.getFuture(), &commitData->stats));
1122
1123 // Get a list of the other proxies that go together with us
1124 while (std::find(db->get().client.proxies.begin(), db->get().client.proxies.end(), proxy) == db->get().client.proxies.end())
1125 wait(db->onChange());
1126 for (MasterProxyInterface mp : db->get().client.proxies) {
1127 if (mp != proxy)
1128 otherProxies.push_back(mp);
1129 }
1130
1131 ASSERT(db->get().recoveryState >= RecoveryState::ACCEPTING_COMMITS); // else potentially we could return uncommitted read versions (since self->committedVersion is only a committed version if this recovery succeeds)
1132
1133 TraceEvent("ProxyReadyForTxnStarts", proxy.id());
1134
1135 loop{
1136 waitNext(GRVTimer.getFuture());
1137 // Select zero or more transactions to start
1138 double t = now();
1139 double elapsed = std::min<double>(now() - lastGRVTime, SERVER_KNOBS->START_TRANSACTION_BATCH_INTERVAL_MAX);
1140 lastGRVTime = t;
1141
1142 if(elapsed == 0) elapsed = 1e-15; // resolve a possible indeterminant multiplication with infinite transaction rate
1143
1144 normalRateInfo.reset(elapsed);
1145 batchRateInfo.reset(elapsed);
1146
1147 int transactionsStarted[2] = {0,0};
1148 int systemTransactionsStarted[2] = {0,0};
1149 int defaultPriTransactionsStarted[2] = { 0, 0 };
1150 int batchPriTransactionsStarted[2] = { 0, 0 };
1151
1152 vector<vector<GetReadVersionRequest>> start(2); // start[0] is transactions starting with !(flags&CAUSAL_READ_RISKY), start[1] is transactions starting with flags&CAUSAL_READ_RISKY
1153 Optional<UID> debugID;
1154
1155 int requestsToStart = 0;
1156 while (!transactionQueue.empty() && requestsToStart < SERVER_KNOBS->START_TRANSACTION_MAX_REQUESTS_TO_START) {
1157 auto& req = transactionQueue.top().first;
1158 int tc = req.transactionCount;
1159
1160 if(req.priority() < GetReadVersionRequest::PRIORITY_DEFAULT && !batchRateInfo.canStart(transactionsStarted[0] + transactionsStarted[1])) {
1161 break;
1162 }
1163 else if(req.priority() < GetReadVersionRequest::PRIORITY_SYSTEM_IMMEDIATE && !normalRateInfo.canStart(transactionsStarted[0] + transactionsStarted[1])) {
1164 break;
1165 }
1166
1167 if (req.debugID.present()) {
1168 if (!debugID.present()) debugID = g_nondeterministic_random->randomUniqueID();
1169 g_traceBatch.addAttach("TransactionAttachID", req.debugID.get().first(), debugID.get().first());
1170 }
1171
1172 transactionsStarted[req.flags&1] += tc;
1173 if (req.priority() >= GetReadVersionRequest::PRIORITY_SYSTEM_IMMEDIATE)
1174 systemTransactionsStarted[req.flags & 1] += tc;
1175 else if (req.priority() >= GetReadVersionRequest::PRIORITY_DEFAULT)
1176 defaultPriTransactionsStarted[req.flags & 1] += tc;
1177 else
1178 batchPriTransactionsStarted[req.flags & 1] += tc;
1179
1180 start[req.flags & 1].push_back(std::move(req)); static_assert(GetReadVersionRequest::FLAG_CAUSAL_READ_RISKY == 1, "Implementation dependent on flag value");
1181 transactionQueue.pop();
1182 requestsToStart++;
1183 }
1184
1185 if (!transactionQueue.empty())
1186 forwardPromise(GRVTimer, delayJittered(SERVER_KNOBS->START_TRANSACTION_BATCH_QUEUE_CHECK_INTERVAL, TaskProxyGRVTimer));
1187
1188 /*TraceEvent("GRVBatch", proxy.id())
1189 .detail("Elapsed", elapsed)
1190 .detail("NTransactionToStart", nTransactionsToStart)
1191 .detail("TransactionRate", transactionRate)
1192 .detail("TransactionQueueSize", transactionQueue.size())
1193 .detail("NumTransactionsStarted", transactionsStarted[0] + transactionsStarted[1])
1194 .detail("NumSystemTransactionsStarted", systemTransactionsStarted[0] + systemTransactionsStarted[1])
1195 .detail("NumNonSystemTransactionsStarted", transactionsStarted[0] + transactionsStarted[1] - systemTransactionsStarted[0] - systemTransactionsStarted[1])
1196 .detail("TransactionBudget", transactionBudget)
1197 .detail("BatchTransactionBudget", batchTransactionBudget);*/
1198
1199 transactionCount += transactionsStarted[0] + transactionsStarted[1];
1200 batchTransactionCount += batchPriTransactionsStarted[0] + batchPriTransactionsStarted[1];
1201
1202 normalRateInfo.updateBudget(transactionsStarted[0] + transactionsStarted[1]);
1203 batchRateInfo.updateBudget(transactionsStarted[0] + transactionsStarted[1]);
1204
1205 if (debugID.present()) {
1206 g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), "MasterProxyServer.masterProxyServerCore.Broadcast");
1207 }
1208
1209 for (int i = 0; i < start.size(); i++) {
1210 if (start[i].size()) {
1211 Future<GetReadVersionReply> readVersionReply = getLiveCommittedVersion(commitData, i, &otherProxies, debugID, transactionsStarted[i], systemTransactionsStarted[i], defaultPriTransactionsStarted[i], batchPriTransactionsStarted[i]);
1212 addActor.send(sendGrvReplies(readVersionReply, start[i], &commitData->stats));
1213
1214 // for now, base dynamic batching on the time for normal requests (not read_risky)
1215 if (i == 0) {
1216 addActor.send(timeReply(readVersionReply, replyTimes));
1217 }
1218 }
1219 }
1220 }
1221 }
1222
readRequestServer(MasterProxyInterface proxy,ProxyCommitData * commitData)1223 ACTOR static Future<Void> readRequestServer(
1224 MasterProxyInterface proxy,
1225 ProxyCommitData* commitData
1226 )
1227 {
1228 // Implement read-only parts of the proxy interface
1229
1230 // We can't respond to these requests until we have valid txnStateStore
1231 wait(commitData->validState.getFuture());
1232
1233 TraceEvent("ProxyReadyForReads", proxy.id());
1234
1235 loop {
1236 choose{
1237 when(GetKeyServerLocationsRequest req = waitNext(proxy.getKeyServersLocations.getFuture())) {
1238 GetKeyServerLocationsReply rep;
1239 if(!req.end.present()) {
1240 auto r = req.reverse ? commitData->keyInfo.rangeContainingKeyBefore(req.begin) : commitData->keyInfo.rangeContaining(req.begin);
1241 vector<StorageServerInterface> ssis;
1242 ssis.reserve(r.value().src_info.size());
1243 for(auto& it : r.value().src_info) {
1244 ssis.push_back(it->interf);
1245 }
1246 rep.results.push_back(std::make_pair(r.range(), ssis));
1247 } else if(!req.reverse) {
1248 int count = 0;
1249 for(auto r = commitData->keyInfo.rangeContaining(req.begin); r != commitData->keyInfo.ranges().end() && count < req.limit && r.begin() < req.end.get(); ++r) {
1250 vector<StorageServerInterface> ssis;
1251 ssis.reserve(r.value().src_info.size());
1252 for(auto& it : r.value().src_info) {
1253 ssis.push_back(it->interf);
1254 }
1255 rep.results.push_back(std::make_pair(r.range(), ssis));
1256 count++;
1257 }
1258 } else {
1259 int count = 0;
1260 auto r = commitData->keyInfo.rangeContainingKeyBefore(req.end.get());
1261 while( count < req.limit && req.begin < r.end() ) {
1262 vector<StorageServerInterface> ssis;
1263 ssis.reserve(r.value().src_info.size());
1264 for(auto& it : r.value().src_info) {
1265 ssis.push_back(it->interf);
1266 }
1267 rep.results.push_back(std::make_pair(r.range(), ssis));
1268 if(r == commitData->keyInfo.ranges().begin()) {
1269 break;
1270 }
1271 count++;
1272 --r;
1273 }
1274 }
1275 req.reply.send(rep);
1276 }
1277 when(GetStorageServerRejoinInfoRequest req = waitNext(proxy.getStorageServerRejoinInfo.getFuture())) {
1278 if (commitData->txnStateStore->readValue(serverListKeyFor(req.id)).get().present()) {
1279 GetStorageServerRejoinInfoReply rep;
1280 rep.version = commitData->version;
1281 rep.tag = decodeServerTagValue( commitData->txnStateStore->readValue(serverTagKeyFor(req.id)).get().get() );
1282 Standalone<VectorRef<KeyValueRef>> history = commitData->txnStateStore->readRange(serverTagHistoryRangeFor(req.id)).get();
1283 for(int i = history.size()-1; i >= 0; i-- ) {
1284 rep.history.push_back(std::make_pair(decodeServerTagHistoryKey(history[i].key), decodeServerTagValue(history[i].value)));
1285 }
1286 auto localityKey = commitData->txnStateStore->readValue(tagLocalityListKeyFor(req.dcId)).get();
1287 if( localityKey.present() ) {
1288 rep.newLocality = false;
1289 int8_t locality = decodeTagLocalityListValue(localityKey.get());
1290 if(locality != rep.tag.locality) {
1291 uint16_t tagId = 0;
1292 std::vector<uint16_t> usedTags;
1293 auto tagKeys = commitData->txnStateStore->readRange(serverTagKeys).get();
1294 for( auto& kv : tagKeys ) {
1295 Tag t = decodeServerTagValue( kv.value );
1296 if(t.locality == locality) {
1297 usedTags.push_back(t.id);
1298 }
1299 }
1300 auto historyKeys = commitData->txnStateStore->readRange(serverTagHistoryKeys).get();
1301 for( auto& kv : historyKeys ) {
1302 Tag t = decodeServerTagValue( kv.value );
1303 if(t.locality == locality) {
1304 usedTags.push_back(t.id);
1305 }
1306 }
1307 std::sort(usedTags.begin(), usedTags.end());
1308
1309 int usedIdx = 0;
1310 for(; usedTags.size() > 0 && tagId <= usedTags.end()[-1]; tagId++) {
1311 if(tagId < usedTags[usedIdx]) {
1312 break;
1313 } else {
1314 usedIdx++;
1315 }
1316 }
1317 rep.newTag = Tag(locality, tagId);
1318 }
1319 } else {
1320 rep.newLocality = true;
1321 int8_t maxTagLocality = -1;
1322 auto localityKeys = commitData->txnStateStore->readRange(tagLocalityListKeys).get();
1323 for( auto& kv : localityKeys ) {
1324 maxTagLocality = std::max(maxTagLocality, decodeTagLocalityListValue( kv.value ));
1325 }
1326 rep.newTag = Tag(maxTagLocality+1,0);
1327 }
1328 req.reply.send(rep);
1329 } else {
1330 req.reply.sendError(worker_removed());
1331 }
1332 }
1333 }
1334 wait(yield());
1335 }
1336 }
1337
healthMetricsRequestServer(MasterProxyInterface proxy,GetHealthMetricsReply * healthMetricsReply,GetHealthMetricsReply * detailedHealthMetricsReply)1338 ACTOR Future<Void> healthMetricsRequestServer(MasterProxyInterface proxy, GetHealthMetricsReply* healthMetricsReply, GetHealthMetricsReply* detailedHealthMetricsReply)
1339 {
1340 loop {
1341 choose {
1342 when(GetHealthMetricsRequest req =
1343 waitNext(proxy.getHealthMetrics.getFuture()))
1344 {
1345 if (req.detailed)
1346 req.reply.send(*detailedHealthMetricsReply);
1347 else
1348 req.reply.send(*healthMetricsReply);
1349 }
1350 }
1351 }
1352 }
1353
monitorRemoteCommitted(ProxyCommitData * self,Reference<AsyncVar<ServerDBInfo>> db)1354 ACTOR Future<Void> monitorRemoteCommitted(ProxyCommitData* self, Reference<AsyncVar<ServerDBInfo>> db) {
1355 loop {
1356 wait(delay(0)); //allow this actor to be cancelled if we are removed after db changes.
1357 state Optional<std::vector<OptionalInterface<TLogInterface>>> remoteLogs;
1358 if(db->get().recoveryState >= RecoveryState::ALL_LOGS_RECRUITED) {
1359 for(auto& logSet : db->get().logSystemConfig.tLogs) {
1360 if(!logSet.isLocal) {
1361 remoteLogs = logSet.tLogs;
1362 for(auto& tLog : logSet.tLogs) {
1363 if(!tLog.present()) {
1364 remoteLogs = Optional<std::vector<OptionalInterface<TLogInterface>>>();
1365 break;
1366 }
1367 }
1368 break;
1369 }
1370 }
1371 }
1372
1373 if(!remoteLogs.present()) {
1374 wait(db->onChange());
1375 continue;
1376 }
1377 self->popRemoteTxs = true;
1378
1379 state Future<Void> onChange = db->onChange();
1380 loop {
1381 state std::vector<Future<TLogQueuingMetricsReply>> replies;
1382 for(auto &it : remoteLogs.get()) {
1383 replies.push_back(brokenPromiseToNever( it.interf().getQueuingMetrics.getReply( TLogQueuingMetricsRequest() ) ));
1384 }
1385 wait( waitForAll(replies) || onChange );
1386
1387 if(onChange.isReady()) {
1388 break;
1389 }
1390
1391 //FIXME: use the configuration to calculate a more precise minimum recovery version.
1392 Version minVersion = std::numeric_limits<Version>::max();
1393 for(auto& it : replies) {
1394 minVersion = std::min(minVersion, it.get().v);
1395 }
1396
1397 while(self->txsPopVersions.size() && self->txsPopVersions.front().first <= minVersion) {
1398 self->lastTxsPop = self->txsPopVersions.front().second;
1399 self->logSystem->pop(self->txsPopVersions.front().second, txsTag, 0, tagLocalityRemoteLog);
1400 self->txsPopVersions.pop_front();
1401 }
1402
1403 wait( delay(SERVER_KNOBS->UPDATE_REMOTE_LOG_VERSION_INTERVAL) || onChange );
1404 if(onChange.isReady()) {
1405 break;
1406 }
1407 }
1408 }
1409 }
1410
// Main body of a master proxy.
//
// Builds the proxy's ProxyCommitData, waits until `db` reports that `master` is
// the current master and recovery has reached RECOVERY_TRANSACTION, creates the
// log system / txnStateStore, starts the helper actors, and then serves forever:
//   * db info changes (log system refresh, latency band configuration),
//   * completion of the actor collection (onError),
//   * batches of commit requests produced by commitBatcher,
//   * GetRawCommittedVersionRequest,
//   * TxnStateRequest (loading the transaction state store during recovery).
//
// Parameters:
//   proxy                      - interface whose request streams are served here.
//   master                     - master this proxy belongs to; compared by id against db.
//   db                         - observable ServerDBInfo.
//   epoch                      - only referenced by a commented-out trace event in this function.
//   recoveryTransactionVersion - forwarded to ProxyCommitData.
//   firstProxy                 - forwarded to ProxyCommitData; also selects whether
//                                uid_applyMutationsData is passed to applyMetadataMutations.
//
// The loop has no normal exit in this function; the actor ends by cancellation or by an error.
ACTOR Future<Void> masterProxyServerCore(
	MasterProxyInterface proxy,
	MasterInterface master,
	Reference<AsyncVar<ServerDBInfo>> db,
	LogEpoch epoch,
	Version recoveryTransactionVersion,
	bool firstProxy)
{
	// Commit state for this proxy; handed by pointer to the helper actors started below.
	state ProxyCommitData commitData(proxy.id(), master, proxy.getConsistentReadVersion, recoveryTransactionVersion, proxy.commit, db, firstProxy);

	// NOTE(review): sequenceFuture and commitBatcherActor's result are not read again in this function.
	state Future<Sequence> sequenceFuture = (Sequence)0;
	// Stream on which commitBatcher delivers (requests, byte count) pairs.
	state PromiseStream< std::pair<vector<CommitTransactionRequest>, int> > batchedCommits;
	state Future<Void> commitBatcherActor;
	// Future of the most recently started commitBatch; starts out ready.
	state Future<Void> lastCommitComplete = Void();

	// Child actors are registered through addActor and run inside actorCollection.
	state PromiseStream<Future<Void>> addActor;
	// NOTE(review): transformError is given broken_promise() and master_tlog_failed(); presumably it
	// remaps the former to the latter - confirm against its definition.
	state Future<Void> onError = transformError( actorCollection(addActor.getFuture()), broken_promise(), master_tlog_failed() );
	state double lastCommit = 0;
	// Sequence numbers of the TxnStateRequests that have already been applied.
	state std::set<Sequence> txnSequences;
	// Stays at the maximum value until a TxnStateRequest flagged `last` arrives, so the
	// "all sequences received" comparison below cannot succeed before then.
	state Sequence maxSequence = std::numeric_limits<Sequence>::max();

	// Reply objects shared by pointer between transactionStarter and healthMetricsRequestServer.
	state GetHealthMetricsReply healthMetricsReply;
	state GetHealthMetricsReply detailedHealthMetricsReply;

	addActor.send( fetchVersions(&commitData) );
	addActor.send( waitFailureServer(proxy.waitFailure.getFuture()) );

	//TraceEvent("ProxyInit1", proxy.id());

	// Wait until we can load the "real" logsystem, since we don't support switching them currently
	while (!(db->get().master.id() == master.id() && db->get().recoveryState >= RecoveryState::RECOVERY_TRANSACTION)) {
		//TraceEvent("ProxyInit2", proxy.id()).detail("LSEpoch", db->get().logSystemConfig.epoch).detail("Need", epoch);
		wait(db->onChange());
	}
	// Armed here so that any db change from this point on is seen by the main loop.
	state Future<Void> dbInfoChange = db->onChange();
	//TraceEvent("ProxyInit3", proxy.id());

	commitData.resolvers = db->get().resolvers;
	ASSERT(commitData.resolvers.size() != 0);

	// Seed every range of keyResolvers with the pair (0, 0).
	// NOTE(review): presumably (version, resolver index) - confirm against ProxyCommitData.
	auto rs = commitData.keyResolvers.modify(allKeys);
	for(auto r = rs.begin(); r != rs.end(); ++r)
		r->value().push_back(std::make_pair<Version,int>(0,0));

	// The transaction state store is a key-value store layered on the log system through a disk queue adapter on txsTag.
	commitData.logSystem = ILogSystem::fromServerDBInfo(proxy.id(), db->get(), false, addActor);
	commitData.logAdapter = new LogSystemDiskQueueAdapter(commitData.logSystem, txsTag, Reference<AsyncVar<PeekSpecialInfo>>(), false);
	commitData.txnStateStore = keyValueStoreLogSystem(commitData.logAdapter, proxy.id(), 2e9, true, true, true);

	// ((SERVER_MEM_LIMIT * COMMIT_BATCHES_MEM_FRACTION_OF_TOTAL) / COMMIT_BATCHES_MEM_TO_TOTAL_MEM_SCALE_FACTOR) is only an approximate formula for limiting the memory used.
	// COMMIT_BATCHES_MEM_TO_TOTAL_MEM_SCALE_FACTOR is an estimate based on experiments and not an accurate one.
	state int64_t commitBatchesMemoryLimit = std::min(SERVER_KNOBS->COMMIT_BATCHES_MEM_BYTES_HARD_LIMIT, static_cast<int64_t>((SERVER_KNOBS->SERVER_MEM_LIMIT * SERVER_KNOBS->COMMIT_BATCHES_MEM_FRACTION_OF_TOTAL) / SERVER_KNOBS->COMMIT_BATCHES_MEM_TO_TOTAL_MEM_SCALE_FACTOR));
	TraceEvent(SevInfo, "CommitBatchesMemoryLimit").detail("BytesLimit", commitBatchesMemoryLimit);

	addActor.send(monitorRemoteCommitted(&commitData, db));
	addActor.send(transactionStarter(proxy, db, addActor, &commitData, &healthMetricsReply, &detailedHealthMetricsReply));
	addActor.send(readRequestServer(proxy, &commitData));
	addActor.send(healthMetricsRequestServer(proxy, &healthMetricsReply, &detailedHealthMetricsReply));

	// wait for txnStateStore recovery
	wait(success(commitData.txnStateStore->readValue(StringRef())));

	// Batch byte limit: SCALE_BASE * (number of proxies ^ SCALE_POWER), clamped to [BYTES_MIN, BYTES_MAX].
	int commitBatchByteLimit =
		(int)std::min<double>(SERVER_KNOBS->COMMIT_TRANSACTION_BATCH_BYTES_MAX,
			std::max<double>(SERVER_KNOBS->COMMIT_TRANSACTION_BATCH_BYTES_MIN,
				SERVER_KNOBS->COMMIT_TRANSACTION_BATCH_BYTES_SCALE_BASE * pow(db->get().client.proxies.size(), SERVER_KNOBS->COMMIT_TRANSACTION_BATCH_BYTES_SCALE_POWER)));

	commitBatcherActor = commitBatcher(&commitData, batchedCommits, proxy.commit.getFuture(), commitBatchByteLimit, commitBatchesMemoryLimit);
	loop choose{
		when( wait( dbInfoChange ) ) {
			// Re-arm before inspecting the new value.
			dbInfoChange = db->onChange();
			if(db->get().master.id() == master.id() && db->get().recoveryState >= RecoveryState::RECOVERY_TRANSACTION) {
				// Rebuild the log system from the new db info and repeat the pops recorded so far
				// (every tag in tag_popped, plus the txs tag at lastTxsPop) against it.
				commitData.logSystem = ILogSystem::fromServerDBInfo(proxy.id(), db->get(), false, addActor);
				for(auto it : commitData.tag_popped) {
					commitData.logSystem->pop(it.second, it.first);
				}
				commitData.logSystem->pop(commitData.lastTxsPop, txsTag, 0, tagLocalityRemoteLog);
			}

			Optional<LatencyBandConfig> newLatencyBandConfig = db->get().latencyBandConfig;

			// Rebuild the GRV latency bands when the config appeared, disappeared, or its grvConfig changed.
			if(newLatencyBandConfig.present() != commitData.latencyBandConfig.present()
				|| (newLatencyBandConfig.present() && newLatencyBandConfig.get().grvConfig != commitData.latencyBandConfig.get().grvConfig))
			{
				TraceEvent("LatencyBandGrvUpdatingConfig").detail("Present", newLatencyBandConfig.present());
				commitData.stats.grvLatencyBands.clearBands();
				if(newLatencyBandConfig.present()) {
					for(auto band : newLatencyBandConfig.get().grvConfig.bands) {
						commitData.stats.grvLatencyBands.addThreshold(band);
					}
				}
			}

			// Same treatment for the commit latency bands, keyed on commitConfig.
			if(newLatencyBandConfig.present() != commitData.latencyBandConfig.present()
				|| (newLatencyBandConfig.present() && newLatencyBandConfig.get().commitConfig != commitData.latencyBandConfig.get().commitConfig))
			{
				TraceEvent("LatencyBandCommitUpdatingConfig").detail("Present", newLatencyBandConfig.present());
				commitData.stats.commitLatencyBands.clearBands();
				if(newLatencyBandConfig.present()) {
					for(auto band : newLatencyBandConfig.get().commitConfig.bands) {
						commitData.stats.commitLatencyBands.addThreshold(band);
					}
				}
			}

			commitData.latencyBandConfig = newLatencyBandConfig;
		}
		when(wait(onError)) {} // Empty body. NOTE(review): presumably present so an error from the actor collection is thrown out of this actor.
		when(std::pair<vector<CommitTransactionRequest>, int> batchedRequests = waitNext(batchedCommits.getFuture())) {
			const vector<CommitTransactionRequest> &trs = batchedRequests.first;
			int batchBytes = batchedRequests.second;
			//TraceEvent("MasterProxyCTR", proxy.id()).detail("CommitTransactions", trs.size()).detail("TransactionRate", transactionRate).detail("TransactionQueue", transactionQueue.size()).detail("ReleasedTransactionCount", transactionCount);
			// Proceed for a non-empty batch, or for an empty one once commits are being accepted and
			// MAX_COMMIT_BATCH_INTERVAL has passed since lastCommit.
			if (trs.size() || (db->get().recoveryState >= RecoveryState::ACCEPTING_COMMITS && now() - lastCommit >= SERVER_KNOBS->MAX_COMMIT_BATCH_INTERVAL)) {
				// lastCommit is refreshed even if the empty batch below ends up not being started.
				lastCommit = now();

				// An empty batch is only started when the previously started commitBatch has finished.
				if (trs.size() || lastCommitComplete.isReady()) {
					lastCommitComplete = commitBatch(&commitData, trs, batchBytes);
					addActor.send(lastCommitComplete);
				}
			}
		}
		when(GetRawCommittedVersionRequest req = waitNext(proxy.getRawCommittedVersion.getFuture())) {
			//TraceEvent("ProxyGetRCV", proxy.id());
			if (req.debugID.present())
				g_traceBatch.addEvent("TransactionDebug", req.debugID.get().first(), "MasterProxyServer.masterProxyServerCore.GetRawCommittedVersion");
			// Reply with this proxy's current locked flag, metadata version and committed version.
			GetReadVersionReply rep;
			rep.locked = commitData.locked;
			rep.metadataVersion = commitData.metadataVersion;
			rep.version = commitData.committedVersion.get();
			req.reply.send(rep);
		}
		when(TxnStateRequest req = waitNext(proxy.txnState.getFuture())) {
			// Keep the reply promise in a state variable: it is used after the waits below, req is not.
			state ReplyPromise<Void> reply = req.reply;
			// NOTE(review): presumably sequences are numbered from 0, making sequence + 1 the expected total.
			if(req.last) maxSequence = req.sequence + 1;
			// A sequence number that was already applied is skipped, but still acknowledged below.
			if (!txnSequences.count(req.sequence)) {
				txnSequences.insert(req.sequence);

				ASSERT(!commitData.validState.isSet()); // Although we may receive the CommitTransactionRequest for the recovery transaction before all of the TxnStateRequest, we will not get a resolution result from any resolver until the master has submitted its initial (sequence 0) resolution request, which it doesn't do until we have acknowledged all TxnStateRequests

				for(auto& kv : req.data)
					commitData.txnStateStore->set(kv, &req.arena);
				commitData.txnStateStore->commit(true);

				// Once every sequence has been applied, scan the whole store in bounded chunks and
				// rebuild the in-memory state derived from it.
				if(txnSequences.size() == maxSequence) {
					state KeyRange txnKeys = allKeys;
					loop {
						wait(yield());
						Standalone<VectorRef<KeyValueRef>> data = commitData.txnStateStore->readRange(txnKeys, SERVER_KNOBS->BUGGIFIED_ROW_LIMIT, SERVER_KNOBS->APPLY_MUTATION_BYTES).get();
						if(!data.size()) break;
						// Next chunk starts just after the last key read; the new begin key is allocated in txnKeys' own arena.
						((KeyRangeRef&)txnKeys) = KeyRangeRef( keyAfter(data.back().key, txnKeys.arena()), txnKeys.end );

						Standalone<VectorRef<MutationRef>> mutations;
						std::vector<std::pair<MapPair<Key,ServerCacheInfo>,int>> keyInfoData;
						vector<UID> src, dest;
						Reference<StorageInfo> storageInfo;
						ServerCacheInfo info;
						for(auto &kv : data) {
							// keyServers entries become keyInfo records; every other key is replayed as a SetValue mutation.
							if( kv.key.startsWith(keyServersPrefix) ) {
								KeyRef k = kv.key.removePrefix(keyServersPrefix);
								if(k != allKeys.end) {
									decodeKeyServersValue(kv.value, src, dest);
									info.tags.clear();
									info.src_info.clear();
									info.dest_info.clear();
									// Resolve each source server through storageCache, filling the cache from the store on a miss.
									for(auto& id : src) {
										auto cacheItr = commitData.storageCache.find(id);
										if(cacheItr == commitData.storageCache.end()) {
											storageInfo = Reference<StorageInfo>( new StorageInfo() );
											storageInfo->tag = decodeServerTagValue( commitData.txnStateStore->readValue( serverTagKeyFor(id) ).get().get() );
											storageInfo->interf = decodeServerListValue( commitData.txnStateStore->readValue( serverListKeyFor(id) ).get().get() );
											commitData.storageCache[id] = storageInfo;
										} else {
											storageInfo = cacheItr->second;
										}
										ASSERT(storageInfo->tag != invalidTag);
										info.tags.push_back( storageInfo->tag );
										info.src_info.push_back( storageInfo );
									}
									// Same lookup for the destination servers.
									for(auto& id : dest) {
										auto cacheItr = commitData.storageCache.find(id);
										if(cacheItr == commitData.storageCache.end()) {
											storageInfo = Reference<StorageInfo>( new StorageInfo() );
											storageInfo->tag = decodeServerTagValue( commitData.txnStateStore->readValue( serverTagKeyFor(id) ).get().get() );
											storageInfo->interf = decodeServerListValue( commitData.txnStateStore->readValue( serverListKeyFor(id) ).get().get() );
											commitData.storageCache[id] = storageInfo;
										} else {
											storageInfo = cacheItr->second;
										}
										ASSERT(storageInfo->tag != invalidTag);
										info.tags.push_back( storageInfo->tag );
										info.dest_info.push_back( storageInfo );
									}
									// NOTE(review): uniquify presumably sorts and de-duplicates the combined src/dest tags - confirm.
									uniquify(info.tags);
									// NOTE(review): the meaning of the trailing 1 is defined by rawInsert, which is not visible here.
									keyInfoData.push_back( std::make_pair(MapPair<Key,ServerCacheInfo>(k, info), 1) );
								}
							} else {
								mutations.push_back(mutations.arena(), MutationRef(MutationRef::SetValue, kv.key, kv.value));
							}
						}

						//insert keyTag data separately from metadata mutations so that we can do one bulk insert which avoids a lot of map lookups.
						commitData.keyInfo.rawInsert(keyInfoData);

						Arena arena;
						bool confChanges;
						applyMetadataMutations(commitData.dbgid, arena, mutations, commitData.txnStateStore, NULL, &confChanges, Reference<ILogSystem>(), 0, &commitData.vecBackupKeys, &commitData.keyInfo, commitData.firstProxy ? &commitData.uid_applyMutationsData : NULL, commitData.commit, commitData.cx, &commitData.committedVersion, &commitData.storageCache, &commitData.tag_popped, true );
					}

					// locked is true only when databaseLockedKey exists with a non-empty value.
					auto lockedKey = commitData.txnStateStore->readValue(databaseLockedKey).get();
					commitData.locked = lockedKey.present() && lockedKey.get().size();
					commitData.metadataVersion = commitData.txnStateStore->readValue(metadataVersionKey).get();

					commitData.txnStateStore->enableSnapshot();
				}
			}
			// Acknowledge the request whether or not it was a duplicate.
			reply.send(Void());
			wait(yield());
		}
	}
}
1630
checkRemoved(Reference<AsyncVar<ServerDBInfo>> db,uint64_t recoveryCount,MasterProxyInterface myInterface)1631 ACTOR Future<Void> checkRemoved(Reference<AsyncVar<ServerDBInfo>> db, uint64_t recoveryCount, MasterProxyInterface myInterface) {
1632 loop{
1633 if (db->get().recoveryCount >= recoveryCount && !std::count(db->get().client.proxies.begin(), db->get().client.proxies.end(), myInterface))
1634 throw worker_removed();
1635 wait(db->onChange());
1636 }
1637 }
1638
masterProxyServer(MasterProxyInterface proxy,InitializeMasterProxyRequest req,Reference<AsyncVar<ServerDBInfo>> db)1639 ACTOR Future<Void> masterProxyServer(
1640 MasterProxyInterface proxy,
1641 InitializeMasterProxyRequest req,
1642 Reference<AsyncVar<ServerDBInfo>> db)
1643 {
1644 try {
1645 state Future<Void> core = masterProxyServerCore(proxy, req.master, db, req.recoveryCount, req.recoveryTransactionVersion, req.firstProxy);
1646 loop choose{
1647 when(wait(core)) { return Void(); }
1648 when(wait(checkRemoved(db, req.recoveryCount, proxy))) {}
1649 }
1650 }
1651 catch (Error& e) {
1652 if (e.code() == error_code_actor_cancelled || e.code() == error_code_worker_removed || e.code() == error_code_tlog_stopped ||
1653 e.code() == error_code_master_tlog_failed || e.code() == error_code_coordinators_changed || e.code() == error_code_coordinated_state_conflict ||
1654 e.code() == error_code_new_coordinators_timed_out)
1655 {
1656 TraceEvent("MasterProxyTerminated", proxy.id()).error(e, true);
1657 return Void();
1658 }
1659 throw;
1660 }
1661 }
1662