1 /*
2  * NativeAPI.actor.cpp
3  *
4  * This source file is part of the FoundationDB open source project
5  *
6  * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *     http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  */
20 
21 #include "fdbclient/DatabaseContext.h"
22 #include "fdbclient/NativeAPI.actor.h"
23 #include "fdbclient/Atomic.h"
24 #include "flow/Platform.h"
25 #include "flow/ActorCollection.h"
26 #include "fdbclient/SystemData.h"
27 #include "fdbrpc/LoadBalance.h"
28 #include "fdbclient/StorageServerInterface.h"
29 #include "fdbclient/MasterProxyInterface.h"
30 #include "fdbclient/ClusterInterface.h"
31 #include "fdbclient/FailureMonitorClient.h"
32 #include "flow/DeterministicRandom.h"
33 #include "fdbclient/KeyRangeMap.h"
34 #include "flow/SystemMonitor.h"
35 #include "fdbclient/MutationList.h"
36 #include "fdbclient/CoordinationInterface.h"
37 #include "fdbclient/MonitorLeader.h"
38 #if defined(CMAKE_BUILD) || !defined(WIN32)
39 #include "versions.h"
40 #endif
41 #include "fdbrpc/TLSConnection.h"
42 #include "flow/Knobs.h"
43 #include "fdbclient/Knobs.h"
44 #include "fdbrpc/Net2FileSystem.h"
45 #include "fdbrpc/simulator.h"
46 
47 #include <iterator>
48 
49 #ifdef WIN32
50 #define WIN32_LEAN_AND_MEAN
51 #include <Windows.h>
52 #undef min
53 #undef max
54 #else
55 #include <time.h>
56 #endif
57 #include "flow/actorcompiler.h" // This must be the last #include.
58 
// Random source defined in another translation unit.
// NOTE(review): not referenced in the part of the file visible here -- confirm it is still needed.
59 extern IRandom* trace_random;
// NOTE(review): presumably returns the source-control revision of this build -- confirm at its definition.
60 extern const char* getHGVersion();
61 
62 using std::make_pair;
63 using std::max;
64 using std::min;
65 
// Process-wide network options; monitorClientInfo() reads supportedVersions and traceLogGroup from it.
66 NetworkOptions networkOptions;
// Process-wide TLS options; stays empty until initTLSOptions() allocates it.
67 Reference<TLSOptions> tlsOptions;
68 
initTLSOptions()69 static void initTLSOptions() {
70 	if (!tlsOptions) {
71 		tlsOptions = Reference<TLSOptions>(new TLSOptions());
72 	}
73 }
74 
// Key fragments for the client latency samples and for the counter of bytes stored under them.
// The actors in this file prepend fdbClientInfoPrefixRange.begin to each fragment before using it.
75 static const Key CLIENT_LATENCY_INFO_PREFIX = LiteralStringRef("client_latency/");
76 static const Key CLIENT_LATENCY_INFO_CTR_PREFIX = LiteralStringRef("client_latency_counter/");
77 
getInterface(DatabaseContext * cx,StorageServerInterface const & ssi,LocalityData const & locality)78 Reference<StorageServerInfo> StorageServerInfo::getInterface( DatabaseContext *cx, StorageServerInterface const& ssi, LocalityData const& locality ) {
79 	auto it = cx->server_interf.find( ssi.id() );
80 	if( it != cx->server_interf.end() ) {
81 		if(it->second->interf.getVersion.getEndpoint().token != ssi.getVersion.getEndpoint().token) {
82 			if(it->second->interf.locality == ssi.locality) {
83 				//FIXME: load balance holds pointers to individual members of the interface, and this assignment will swap out the object they are
84 				//       pointing to. This is technically correct, but is very unnatural. We may want to refactor load balance to take an AsyncVar<Reference<Interface>>
85 				//       so that it is notified when the interface changes.
86 				it->second->interf = ssi;
87 			} else {
88 				it->second->notifyContextDestroyed();
89 				Reference<StorageServerInfo> loc( new StorageServerInfo(cx, ssi, locality) );
90 				cx->server_interf[ ssi.id() ] = loc.getPtr();
91 				return loc;
92 			}
93 		}
94 
95 		return Reference<StorageServerInfo>::addRef( it->second );
96 	}
97 
98 	Reference<StorageServerInfo> loc( new StorageServerInfo(cx, ssi, locality) );
99 	cx->server_interf[ ssi.id() ] = loc.getPtr();
100 	return loc;
101 }
102 
notifyContextDestroyed()103 void StorageServerInfo::notifyContextDestroyed() {
104 	cx = NULL;
105 }
106 
~StorageServerInfo()107 StorageServerInfo::~StorageServerInfo() {
108 	if( cx ) {
109 		auto it = cx->server_interf.find( interf.id() );
110 		if( it != cx->server_interf.end() )
111 			cx->server_interf.erase( it );
112 		cx = NULL;
113 	}
114 }
115 
printable(const VectorRef<KeyValueRef> & val)116 std::string printable( const VectorRef<KeyValueRef>& val ) {
117 	std::string s;
118 	for(int i=0; i<val.size(); i++)
119 		s = s + printable(val[i].key) + format(":%d ",val[i].value.size());
120 	return s;
121 }
122 
printable(const KeyValueRef & val)123 std::string printable( const KeyValueRef& val ) {
124 	return printable(val.key) + format(":%d ",val.value.size());
125 }
126 
printable(const VectorRef<StringRef> & val)127 std::string printable( const VectorRef<StringRef>& val ) {
128 	std::string s;
129 	for(int i=0; i<val.size(); i++)
130 		s = s + printable(val[i]) + " ";
131 	return s;
132 }
133 
printable(const StringRef & val)134 std::string printable( const StringRef& val ) {
135 	return val.printable();
136 }
137 
printable(const std::string & str)138 std::string printable( const std::string& str ) {
139 	return StringRef(str).printable();
140 }
141 
printable(const KeyRangeRef & range)142 std::string printable( const KeyRangeRef& range ) {
143 	return printable(range.begin) + " - " + printable(range.end);
144 }
145 
unhex(char c)146 int unhex( char c ) {
147 	if (c >= '0' && c <= '9')
148 		return c-'0';
149 	if (c >= 'a' && c <= 'f')
150 		return c-'a'+10;
151 	if (c >= 'A' && c <= 'F')
152 		return c-'A'+10;
153 	UNREACHABLE();
154 }
155 
unprintable(std::string const & val)156 std::string unprintable( std::string const& val ) {
157 	std::string s;
158 	for(int i=0; i<val.size(); i++) {
159 		char c = val[i];
160 		if ( c == '\\' ) {
161 			if (++i == val.size()) ASSERT(false);
162 			if (val[i] == '\\') {
163 				s += '\\';
164 			} else if (val[i] == 'x') {
165 				if (i+2 >= val.size()) ASSERT(false);
166 				s += char((unhex(val[i+1])<<4) + unhex(val[i+2]));
167 				i += 2;
168 			} else
169 				ASSERT(false);
170 		} else
171 			s += c;
172 	}
173 	return s;
174 }
175 
validateVersion(Version version)176 void validateVersion(Version version) {
177 	// Version could be 0 if the INITIALIZE_NEW_DATABASE option is set. In that case, it is illegal to perform any reads.
178 	// We throw client_invalid_operation because the caller didn't directly set the version, so the version_invalid error
179 	// might be confusing.
180 	if(version == 0) {
181 		throw client_invalid_operation();
182 	}
183 
184 	ASSERT(version > 0 || version == latestVersion);
185 }
186 
validateOptionValue(Optional<StringRef> value,bool shouldBePresent)187 void validateOptionValue(Optional<StringRef> value, bool shouldBePresent) {
188 	if(shouldBePresent && !value.present())
189 		throw invalid_option_value();
190 	if(!shouldBePresent && value.present() && value.get().size() > 0)
191 		throw invalid_option_value();
192 }
193 
dumpMutations(const MutationListRef & mutations)194 void dumpMutations( const MutationListRef& mutations ) {
195 	for(auto m=mutations.begin(); m; ++m) {
196 		switch (m->type) {
197 			case MutationRef::SetValue: printf("  '%s' := '%s'\n", printable(m->param1).c_str(), printable(m->param2).c_str()); break;
198 			case MutationRef::AddValue: printf("  '%s' += '%s'", printable(m->param1).c_str(), printable(m->param2).c_str()); break;
199 			case MutationRef::ClearRange: printf("  Clear ['%s','%s')\n", printable(m->param1).c_str(), printable(m->param2).c_str()); break;
200 			default: printf("  Unknown mutation %d('%s','%s')\n", m->type, printable(m->param1).c_str(), printable(m->param2).c_str()); break;
201 		}
202 	}
203 }
204 
addref(DatabaseContext * ptr)205 template <> void addref( DatabaseContext* ptr ) { ptr->addref(); }
delref(DatabaseContext * ptr)206 template <> void delref( DatabaseContext* ptr ) { ptr->delref(); }
207 
databaseLogger(DatabaseContext * cx)208 ACTOR Future<Void> databaseLogger( DatabaseContext *cx ) {
209 	loop {
210 		wait( delay( CLIENT_KNOBS->SYSTEM_MONITOR_INTERVAL, cx->taskID ) );
211 		TraceEvent("TransactionMetrics")
212 			.detail("Cluster", cx->cluster && cx->getConnectionFile() ? cx->getConnectionFile()->getConnectionString().clusterKeyName().toString() : "")
213 			.detail("ReadVersions", cx->transactionReadVersions)
214 			.detail("LogicalUncachedReads", cx->transactionLogicalReads)
215 			.detail("PhysicalReadRequests", cx->transactionPhysicalReads)
216 			.detail("CommittedMutations", cx->transactionCommittedMutations)
217 			.detail("CommittedMutationBytes", cx->transactionCommittedMutationBytes)
218 			.detail("CommitStarted", cx->transactionsCommitStarted)
219 			.detail("CommitCompleted", cx->transactionsCommitCompleted)
220 			.detail("TooOld", cx->transactionsTooOld)
221 			.detail("FutureVersions", cx->transactionsFutureVersions)
222 			.detail("NotCommitted", cx->transactionsNotCommitted)
223 			.detail("MaybeCommitted", cx->transactionsMaybeCommitted)
224 			.detail("ResourceConstrained", cx->transactionsResourceConstrained)
225 			.detail("ProcessBehind", cx->transactionsProcessBehind)
226 			.detail("MeanLatency", cx->latencies.mean())
227 			.detail("MedianLatency", cx->latencies.median())
228 			.detail("Latency90", cx->latencies.percentile(0.90))
229 			.detail("Latency98", cx->latencies.percentile(0.98))
230 			.detail("MaxLatency", cx->latencies.max())
231 			.detail("MeanRowReadLatency", cx->readLatencies.mean())
232 			.detail("MedianRowReadLatency", cx->readLatencies.median())
233 			.detail("MaxRowReadLatency", cx->readLatencies.max())
234 			.detail("MeanGRVLatency", cx->GRVLatencies.mean())
235 			.detail("MedianGRVLatency", cx->GRVLatencies.median())
236 			.detail("MaxGRVLatency", cx->GRVLatencies.max())
237 			.detail("MeanCommitLatency", cx->commitLatencies.mean())
238 			.detail("MedianCommitLatency", cx->commitLatencies.median())
239 			.detail("MaxCommitLatency", cx->commitLatencies.max())
240 			.detail("MeanMutationsPerCommit", cx->mutationsPerCommit.mean())
241 			.detail("MedianMutationsPerCommit", cx->mutationsPerCommit.median())
242 			.detail("MaxMutationsPerCommit", cx->mutationsPerCommit.max())
243 			.detail("MeanBytesPerCommit", cx->bytesPerCommit.mean())
244 			.detail("MedianBytesPerCommit", cx->bytesPerCommit.median())
245 			.detail("MaxBytesPerCommit", cx->bytesPerCommit.max());
246 		cx->latencies.clear();
247 		cx->readLatencies.clear();
248 		cx->GRVLatencies.clear();
249 		cx->commitLatencies.clear();
250 		cx->mutationsPerCommit.clear();
251 		cx->bytesPerCommit.clear();
252 	}
253 }
254 
getSampleVersionStamp(Transaction * tr)255 ACTOR static Future<Standalone<StringRef> > getSampleVersionStamp(Transaction *tr) {
256 	loop{
257 		try {
258 			tr->reset();
259 			tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
260 			wait(success(tr->get(LiteralStringRef("\xff/StatusJsonTestKey62793"))));
261 			state Future<Standalone<StringRef> > vstamp = tr->getVersionstamp();
262 			tr->makeSelfConflicting();
263 			wait(tr->commit());
264 			Standalone<StringRef> val = wait(vstamp);
265 			return val;
266 		}
267 		catch (Error& e) {
268 			wait(tr->onError(e));
269 		}
270 	}
271 }
272 
// One piece of a client's serialized transaction-profiling record, sized to be written as a single key/value pair.
273 struct TrInfoChunk {
	// Slice of the serialized record. clientStatusUpdateActor points this into the buffer of an entry
	// in outStatusQ rather than copying it, so it is only usable while that entry exists.
274 	ValueRef value;
	// Database key for this chunk; built and then patched in place by clientStatusUpdateActor.
275 	Key key;
276 };
277 
transactionInfoCommitActor(Transaction * tr,std::vector<TrInfoChunk> * chunks)278 ACTOR static Future<Void> transactionInfoCommitActor(Transaction *tr, std::vector<TrInfoChunk> *chunks) {
279 	state const Key clientLatencyAtomicCtr = CLIENT_LATENCY_INFO_CTR_PREFIX.withPrefix(fdbClientInfoPrefixRange.begin);
280 	state int retryCount = 0;
281 	loop{
282 		try {
283 			tr->reset();
284 			tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
285 			tr->setOption(FDBTransactionOptions::LOCK_AWARE);
286 			state Future<Standalone<StringRef> > vstamp = tr->getVersionstamp();
287 			int64_t numCommitBytes = 0;
288 			for (auto &chunk : *chunks) {
289 				tr->atomicOp(chunk.key, chunk.value, MutationRef::SetVersionstampedKey);
290 				numCommitBytes += chunk.key.size() + chunk.value.size() - 4; // subtract number of bytes of key that denotes verstion stamp index
291 			}
292 			tr->atomicOp(clientLatencyAtomicCtr, StringRef((uint8_t*)&numCommitBytes, 8), MutationRef::AddValue);
293 			wait(tr->commit());
294 			return Void();
295 		}
296 		catch (Error& e) {
297 			retryCount++;
298 			if (retryCount == 10)
299 				throw;
300 			wait(tr->onError(e));
301 		}
302 	}
303 }
304 
305 
delExcessClntTxnEntriesActor(Transaction * tr,int64_t clientTxInfoSizeLimit)306 ACTOR static Future<Void> delExcessClntTxnEntriesActor(Transaction *tr, int64_t clientTxInfoSizeLimit) {
307 	state const Key clientLatencyName = CLIENT_LATENCY_INFO_PREFIX.withPrefix(fdbClientInfoPrefixRange.begin);
308 	state const Key clientLatencyAtomicCtr = CLIENT_LATENCY_INFO_CTR_PREFIX.withPrefix(fdbClientInfoPrefixRange.begin);
309 	TraceEvent(SevInfo, "DelExcessClntTxnEntriesCalled");
310 	loop{
311 		try {
312 			tr->reset();
313 			tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
314 			tr->setOption(FDBTransactionOptions::LOCK_AWARE);
315 			Optional<Value> ctrValue = wait(tr->get(KeyRef(clientLatencyAtomicCtr), true));
316 			if (!ctrValue.present()) {
317 				TraceEvent(SevInfo, "NumClntTxnEntriesNotFound");
318 				return Void();
319 			}
320 			state int64_t txInfoSize = 0;
321 			ASSERT(ctrValue.get().size() == sizeof(int64_t));
322 			memcpy(&txInfoSize, ctrValue.get().begin(), ctrValue.get().size());
323 			if (txInfoSize < clientTxInfoSizeLimit)
324 				return Void();
325 			int getRangeByteLimit = (txInfoSize - clientTxInfoSizeLimit) < CLIENT_KNOBS->TRANSACTION_SIZE_LIMIT ? (txInfoSize - clientTxInfoSizeLimit) : CLIENT_KNOBS->TRANSACTION_SIZE_LIMIT;
326 			GetRangeLimits limit(CLIENT_KNOBS->ROW_LIMIT_UNLIMITED, getRangeByteLimit);
327 			Standalone<RangeResultRef> txEntries = wait(tr->getRange(KeyRangeRef(clientLatencyName, strinc(clientLatencyName)), limit));
328 			state int64_t numBytesToDel = 0;
329 			KeyRef endKey;
330 			for (auto &kv : txEntries) {
331 				endKey = kv.key;
332 				numBytesToDel += kv.key.size() + kv.value.size();
333 				if (txInfoSize - numBytesToDel <= clientTxInfoSizeLimit)
334 					break;
335 			}
336 			if (numBytesToDel) {
337 				tr->clear(KeyRangeRef(txEntries[0].key, strinc(endKey)));
338 				TraceEvent(SevInfo, "DeletingExcessCntTxnEntries").detail("BytesToBeDeleted", numBytesToDel);
339 				int64_t bytesDel = -numBytesToDel;
340 				tr->atomicOp(clientLatencyAtomicCtr, StringRef((uint8_t*)&bytesDel, 8), MutationRef::AddValue);
341 				wait(tr->commit());
342 			}
343 			if (txInfoSize - numBytesToDel <= clientTxInfoSizeLimit)
344 				return Void();
345 		}
346 		catch (Error& e) {
347 			wait(tr->onError(e));
348 		}
349 	}
350 }
351 
352 // The reason for getting a pointer to DatabaseContext instead of a reference counted object is because reference counting will increment reference count for
353 // DatabaseContext which holds the future of this actor. This creates a cyclic reference and hence this actor and Database object will not be destroyed at all.
// Background actor that repeatedly flushes the queued client transaction-profiling records of `cx`
// to the database. Each pass:
//   1. swaps inStatusQ into outStatusQ and splits every record into chunks;
//   2. commits the chunks through transactionInfoCommitActor, grouped so that each commit stays under
//      dataSizeLimit (the limit is halved and the group retried on transaction_too_large);
//   3. with probability clientSamplingProbability, trims old entries via delExcessClntTxnEntriesActor;
//   4. sleeps for CSI_STATUS_DELAY.
// Any error other than actor_cancelled discards the current batch, is logged, and is followed by a 10 second sleep.
clientStatusUpdateActor(DatabaseContext * cx)354 ACTOR static Future<Void> clientStatusUpdateActor(DatabaseContext *cx) {
355 	state const std::string clientLatencyName = CLIENT_LATENCY_INFO_PREFIX.withPrefix(fdbClientInfoPrefixRange.begin).toString();
356 	state Transaction tr;
	// Chunks accumulated for the commit currently being assembled, and their combined key+value size.
357 	state std::vector<TrInfoChunk> commitQ;
358 	state int txBytes = 0;
359 
360 	loop {
361 		try {
			// The previous pass must have drained (or discarded) its batch before a new one is taken.
362 			ASSERT(cx->clientStatusUpdater.outStatusQ.empty());
363 			cx->clientStatusUpdater.inStatusQ.swap(cx->clientStatusUpdater.outStatusQ);
364 			// Split Transaction Info into chunks
365 			state std::vector<TrInfoChunk> trChunksQ;
366 			for (auto &entry : cx->clientStatusUpdater.outStatusQ) {
367 				auto &bw = entry.second;
				// Chunk size is VALUE_SIZE_LIMIT, or a value from randomInt(1e3, VALUE_SIZE_LIMIT) when BUGGIFY is on.
368 				int64_t value_size_limit = BUGGIFY ? g_random->randomInt(1e3, CLIENT_KNOBS->VALUE_SIZE_LIMIT) : CLIENT_KNOBS->VALUE_SIZE_LIMIT;
				// Ceiling division: number of chunks needed to hold bw.getLength() bytes.
369 				int num_chunks = (bw.getLength() + value_size_limit - 1) / value_size_limit;
370 				std::string random_id = g_random->randomAlphaNumeric(16);
371 				std::string user_provided_id = entry.first.size() ? entry.first + "/" : "";
372 				for (int i = 0; i < num_chunks; i++) {
373 					TrInfoChunk chunk;
374 					BinaryWriter chunkBW(Unversioned());
375 					chunkBW << bigEndian32(i+1) << bigEndian32(num_chunks);
					// Key layout: <clientLatencyName> <10 zero bytes> "/" <random id> "/" <chunk number, chunk count> "/" [<user id> "/"] <4 bytes>.
					// NOTE(review): the 10 zero bytes are presumably the placeholder that SetVersionstampedKey fills in -- confirm.
376 					chunk.key = KeyRef(clientLatencyName + std::string(10, '\x00') + "/" + random_id + "/" + chunkBW.toValue().toString() + "/" + user_provided_id + std::string(4, '\x00'));
					// Overwrite the trailing 4 bytes with the little-endian offset at which the 10 zero bytes start.
377 					int32_t pos = littleEndian32(clientLatencyName.size());
378 					memcpy(mutateString(chunk.key) + chunk.key.size() - sizeof(int32_t), &pos, sizeof(int32_t));
					// The value points into bw's buffer (no copy); the last chunk takes whatever bytes remain.
379 					if (i == num_chunks - 1) {
380 						chunk.value = ValueRef(static_cast<uint8_t *>(bw.getData()) + (i * value_size_limit), bw.getLength() - (i * value_size_limit));
381 					}
382 					else {
383 						chunk.value = ValueRef(static_cast<uint8_t *>(bw.getData()) + (i * value_size_limit), value_size_limit);
384 					}
385 					trChunksQ.push_back(std::move(chunk));
386 				}
387 			}
388 
389 			// Commit the chunks splitting into different transactions if needed
390 			state int64_t dataSizeLimit = BUGGIFY ? g_random->randomInt(200e3, 1.5 * CLIENT_KNOBS->TRANSACTION_SIZE_LIMIT) : 0.8 * CLIENT_KNOBS->TRANSACTION_SIZE_LIMIT;
			// First chunk that has not yet been committed; a retry after transaction_too_large resumes from here.
391 			state std::vector<TrInfoChunk>::iterator tracking_iter = trChunksQ.begin();
392 			tr = Transaction(Database(Reference<DatabaseContext>::addRef(cx)));
393 			ASSERT(commitQ.empty() && (txBytes == 0));
394 			loop {
395 				state std::vector<TrInfoChunk>::iterator iter = tracking_iter;
396 				txBytes = 0;
397 				commitQ.clear();
398 				try {
399 					while (iter != trChunksQ.end()) {
						// Adding this chunk would exceed the limit: commit what has been gathered so far first.
400 						if (iter->value.size() + iter->key.size() + txBytes > dataSizeLimit) {
401 							wait(transactionInfoCommitActor(&tr, &commitQ));
402 							tracking_iter = iter;
403 							commitQ.clear();
404 							txBytes = 0;
405 						}
406 						commitQ.push_back(*iter);
407 						txBytes += iter->value.size() + iter->key.size();
408 						++iter;
409 					}
					// Commit the final, partially filled group.
410 					if (!commitQ.empty()) {
411 						wait(transactionInfoCommitActor(&tr, &commitQ));
412 						commitQ.clear();
413 						txBytes = 0;
414 					}
415 					break;
416 				}
417 				catch (Error &e) {
418 					if (e.code() == error_code_transaction_too_large) {
						// Retry the uncommitted chunks with half the per-transaction budget.
419 						dataSizeLimit /= 2;
420 						ASSERT(dataSizeLimit >= CLIENT_KNOBS->VALUE_SIZE_LIMIT + CLIENT_KNOBS->KEY_SIZE_LIMIT);
421 					}
422 					else {
423 						TraceEvent(SevWarnAlways, "ClientTrInfoErrorCommit")
424 							.error(e)
425 							.detail("TxBytes", txBytes);
426 						commitQ.clear();
427 						txBytes = 0;
428 						throw;
429 					}
430 				}
431 			}
432 			cx->clientStatusUpdater.outStatusQ.clear();
			// An infinite sample rate / a size limit of -1 in ClientDBInfo selects the corresponding knob value instead.
433 			double clientSamplingProbability = std::isinf(cx->clientInfo->get().clientTxnInfoSampleRate) ? CLIENT_KNOBS->CSI_SAMPLING_PROBABILITY : cx->clientInfo->get().clientTxnInfoSampleRate;
434 			int64_t clientTxnInfoSizeLimit = cx->clientInfo->get().clientTxnInfoSizeLimit == -1 ? CLIENT_KNOBS->CSI_SIZE_LIMIT : cx->clientInfo->get().clientTxnInfoSizeLimit;
435 			if (!trChunksQ.empty() && g_random->random01() < clientSamplingProbability)
436 				wait(delExcessClntTxnEntriesActor(&tr, clientTxnInfoSizeLimit));
437 
438 			// tr is destroyed because it holds a reference to DatabaseContext, which creates the cycle mentioned above.
439 			// Hence destroy the transaction before sleeping, to give the actor a chance to be cleaned up if the Database is destroyed by the user.
440 			tr = Transaction();
441 			wait(delay(CLIENT_KNOBS->CSI_STATUS_DELAY));
442 		}
443 		catch (Error& e) {
444 			if (e.code() == error_code_actor_cancelled) {
445 				throw;
446 			}
			// Drop the batch so the ASSERT at the top of the next pass holds.
447 			cx->clientStatusUpdater.outStatusQ.clear();
448 			TraceEvent(SevWarnAlways, "UnableToWriteClientStatus").error(e);
449 			// tr is destroyed because it holds a reference to DatabaseContext, which creates the cycle mentioned above.
450 			// Hence destroy the transaction before sleeping, to give the actor a chance to be cleaned up if the Database is destroyed by the user.
451 			tr = Transaction();
452 			wait(delay(10.0));
453 		}
454 	}
455 }
456 
monitorMasterProxiesChange(Reference<AsyncVar<ClientDBInfo>> clientDBInfo,AsyncTrigger * triggerVar)457 ACTOR static Future<Void> monitorMasterProxiesChange(Reference<AsyncVar<ClientDBInfo>> clientDBInfo, AsyncTrigger *triggerVar) {
458 	state vector< MasterProxyInterface > curProxies;
459 	curProxies = clientDBInfo->get().proxies;
460 
461 	loop{
462 		wait(clientDBInfo->onChange());
463 		if (clientDBInfo->get().proxies != curProxies) {
464 			curProxies = clientDBInfo->get().proxies;
465 			triggerVar->trigger();
466 		}
467 	}
468 }
469 
getHealthMetricsActor(DatabaseContext * cx,bool detailed)470 ACTOR static Future<HealthMetrics> getHealthMetricsActor(DatabaseContext *cx, bool detailed) {
471 	if (now() - cx->healthMetricsLastUpdated < CLIENT_KNOBS->AGGREGATE_HEALTH_METRICS_MAX_STALENESS) {
472 		if (detailed) {
473 			return cx->healthMetrics;
474 		}
475 		else {
476 			HealthMetrics result;
477 			result.update(cx->healthMetrics, false, false);
478 			return result;
479 		}
480 	}
481 	state bool sendDetailedRequest = detailed && now() - cx->detailedHealthMetricsLastUpdated >
482 		CLIENT_KNOBS->DETAILED_HEALTH_METRICS_MAX_STALENESS;
483 	loop {
484 		choose {
485 			when(wait(cx->onMasterProxiesChanged())) {}
486 			when(GetHealthMetricsReply rep =
487 				 wait(loadBalance(cx->getMasterProxies(false), &MasterProxyInterface::getHealthMetrics,
488 							 GetHealthMetricsRequest(sendDetailedRequest)))) {
489 				cx->healthMetrics.update(rep.healthMetrics, detailed, true);
490 				if (detailed) {
491 					cx->healthMetricsLastUpdated = now();
492 					cx->detailedHealthMetricsLastUpdated = now();
493 					return cx->healthMetrics;
494 				}
495 				else {
496 					cx->healthMetricsLastUpdated = now();
497 					HealthMetrics result;
498 					result.update(cx->healthMetrics, false, false);
499 					return result;
500 				}
501 			}
502 		}
503 	}
504 }
505 
getHealthMetrics(bool detailed=false)506 Future<HealthMetrics> DatabaseContext::getHealthMetrics(bool detailed = false) {
507 	return getHealthMetricsActor(this, detailed);
508 }
509 
DatabaseContext(Reference<Cluster> cluster,Reference<AsyncVar<ClientDBInfo>> clientInfo,Future<Void> clientInfoMonitor,Standalone<StringRef> dbId,int taskID,LocalityData const & clientLocality,bool enableLocalityLoadBalance,bool lockAware,int apiVersion)510 DatabaseContext::DatabaseContext(
511 	Reference<Cluster> cluster, Reference<AsyncVar<ClientDBInfo>> clientInfo, Future<Void> clientInfoMonitor, Standalone<StringRef> dbId,
512 	int taskID, LocalityData const& clientLocality, bool enableLocalityLoadBalance, bool lockAware, int apiVersion )
513 	: cluster(cluster), clientInfo(clientInfo), clientInfoMonitor(clientInfoMonitor), dbId(dbId), taskID(taskID), clientLocality(clientLocality), enableLocalityLoadBalance(enableLocalityLoadBalance),
514 	lockAware(lockAware), apiVersion(apiVersion), provisional(false),
515 	transactionReadVersions(0), transactionLogicalReads(0), transactionPhysicalReads(0), transactionCommittedMutations(0), transactionCommittedMutationBytes(0),
516 	transactionsCommitStarted(0), transactionsCommitCompleted(0), transactionsTooOld(0), transactionsFutureVersions(0), transactionsNotCommitted(0),
517 	transactionsMaybeCommitted(0), transactionsResourceConstrained(0), transactionsProcessBehind(0), outstandingWatches(0), transactionTimeout(0.0), transactionMaxRetries(-1),
518 	latencies(1000), readLatencies(1000), commitLatencies(1000), GRVLatencies(1000), mutationsPerCommit(1000), bytesPerCommit(1000), mvCacheInsertLocation(0),
519 	healthMetricsLastUpdated(0), detailedHealthMetricsLastUpdated(0)
520 {
521 	metadataVersionCache.resize(CLIENT_KNOBS->METADATA_VERSION_CACHE_SIZE);
522 	maxOutstandingWatches = CLIENT_KNOBS->DEFAULT_MAX_OUTSTANDING_WATCHES;
523 
524 	transactionMaxBackoff = CLIENT_KNOBS->FAILURE_MAX_DELAY;
525 	snapshotRywEnabled = apiVersionAtLeast(300) ? 1 : 0;
526 
527 	logger = databaseLogger( this );
528 	locationCacheSize = g_network->isSimulated() ?
529 			CLIENT_KNOBS->LOCATION_CACHE_EVICTION_SIZE_SIM :
530 			CLIENT_KNOBS->LOCATION_CACHE_EVICTION_SIZE;
531 
532 	getValueSubmitted.init(LiteralStringRef("NativeAPI.GetValueSubmitted"));
533 	getValueCompleted.init(LiteralStringRef("NativeAPI.GetValueCompleted"));
534 
535 	monitorMasterProxiesInfoChange = monitorMasterProxiesChange(clientInfo, &masterProxiesChangeTrigger);
536 	clientStatusUpdater.actor = clientStatusUpdateActor(this);
537 }
538 
DatabaseContext(const Error & err)539 DatabaseContext::DatabaseContext( const Error &err ) : deferredError(err), latencies(1000), readLatencies(1000), commitLatencies(1000), GRVLatencies(1000), mutationsPerCommit(1000), bytesPerCommit(1000) {}
540 
// Keeps `outInfo` in sync with the ClientDBInfo obtained through `clusterInterface`.
// Each iteration builds an OpenDatabaseRequest (known info id, supported client versions, connected
// coordinator count, trace log group, plus an issue entry when the cluster file contents are not up
// to date) and then waits for whichever comes first: a reply, a change of the cluster interface, or
// a change of the delayed connected-coordinators count.
// Any error is traced at SevError together with the connection file details and rethrown.
monitorClientInfo(Reference<AsyncVar<Optional<ClusterInterface>>> clusterInterface,Reference<ClusterConnectionFile> ccf,Reference<AsyncVar<ClientDBInfo>> outInfo,Reference<AsyncVar<int>> connectedCoordinatorsNumDelayed)541 ACTOR static Future<Void> monitorClientInfo( Reference<AsyncVar<Optional<ClusterInterface>>> clusterInterface, Reference<ClusterConnectionFile> ccf, Reference<AsyncVar<ClientDBInfo>> outInfo, Reference<AsyncVar<int>> connectedCoordinatorsNumDelayed ) {
542 	try {
		// Time at which the cluster file was first seen to be out of date; cleared once it is correct again.
543 		state Optional<double> incorrectTime;
544 		loop {
545 			OpenDatabaseRequest req;
546 			req.knownClientInfoID = outInfo->get().id;
547 			req.supportedVersions = VectorRef<ClientVersionRef>(req.arena, networkOptions.supportedVersions);
548 			req.connectedCoordinatorsNum = connectedCoordinatorsNumDelayed->get();
549 			req.traceLogGroup = StringRef(req.arena, networkOptions.traceLogGroup);
550 
551 			ClusterConnectionString fileConnectionString;
552 			if (ccf && !ccf->fileContentsUpToDate(fileConnectionString)) {
				// Report the mismatch to the cluster as an issue on this request.
553 				req.issues.push_back_deep(req.arena, LiteralStringRef("incorrect_cluster_file_contents"));
554 				std::string connectionString = ccf->getConnectionString().toString();
555 				if(!incorrectTime.present()) {
556 					incorrectTime = now();
557 				}
558 				if(ccf->canGetFilename()) {
559 					// Don't log a SevWarnAlways initially to account for transient issues (e.g. someone else changing the file right before us)
					// The severity is raised once the mismatch has lasted more than 300 seconds.
560 					TraceEvent(now() - incorrectTime.get() > 300 ? SevWarnAlways : SevWarn, "IncorrectClusterFileContents")
561 						.detail("Filename", ccf->getFilename())
562 						.detail("ConnectionStringFromFile", fileConnectionString.toString())
563 						.detail("CurrentConnectionString", connectionString);
564 				}
565 			}
566 			else {
567 				incorrectTime = Optional<double>();
568 			}
569 
570 			choose {
				// Without a cluster interface this arm waits on Never(), so only the two arms below can fire.
				// NOTE(review): brokenPromiseToNever presumably turns a broken_promise error into a never-ready future -- confirm.
571 				when( ClientDBInfo ni = wait( clusterInterface->get().present() ? brokenPromiseToNever( clusterInterface->get().get().openDatabase.getReply( req ) ) : Never() ) ) {
572 					TraceEvent("ClientInfoChange").detail("ChangeID", ni.id);
573 					outInfo->set(ni);
574 				}
				// The cluster interface changed: loop around and send a fresh request to the new interface.
575 				when( wait( clusterInterface->onChange() ) ) {
576 					if(clusterInterface->get().present())
577 						TraceEvent("ClientInfo_CCInterfaceChange").detail("CCID", clusterInterface->get().get().id());
578 				}
				// The connected-coordinators count changed: loop around so the next request carries the new value.
579 				when( wait( connectedCoordinatorsNumDelayed->onChange() ) ) {}
580 			}
581 		}
582 	} catch( Error& e ) {
583 		TraceEvent(SevError, "MonitorClientInfoError")
584 			.error(e)
585 			.detail("ConnectionFile", ccf && ccf->canGetFilename() ? ccf->getFilename() : "")
586 			.detail("ConnectionString", ccf ? ccf->getConnectionString().toString() : "");
587 
588 		throw;
589 	}
590 }
591 
592 // Create database context and monitor the cluster status;
593 // Notify client when cluster info (e.g., cluster controller) changes
create(Reference<AsyncVar<Optional<ClusterInterface>>> clusterInterface,Reference<ClusterConnectionFile> connFile,LocalityData const & clientLocality)594 Database DatabaseContext::create(Reference<AsyncVar<Optional<ClusterInterface>>> clusterInterface, Reference<ClusterConnectionFile> connFile, LocalityData const& clientLocality) {
595 	Reference<AsyncVar<int>> connectedCoordinatorsNum(new AsyncVar<int>(0));
596 	Reference<AsyncVar<int>> connectedCoordinatorsNumDelayed(new AsyncVar<int>(0));
597 	Reference<Cluster> cluster(new Cluster(connFile, clusterInterface, connectedCoordinatorsNum));
598 	Reference<AsyncVar<ClientDBInfo>> clientInfo(new AsyncVar<ClientDBInfo>());
599 	Future<Void> clientInfoMonitor = delayedAsyncVar(connectedCoordinatorsNum, connectedCoordinatorsNumDelayed, CLIENT_KNOBS->CHECK_CONNECTED_COORDINATOR_NUM_DELAY) || monitorClientInfo(clusterInterface, connFile, clientInfo, connectedCoordinatorsNumDelayed);
600 
601 	return Database(new DatabaseContext(cluster, clientInfo, clientInfoMonitor, LiteralStringRef(""), TaskDefaultEndpoint, clientLocality, true, false));
602 }
603 
create(Reference<AsyncVar<ClientDBInfo>> clientInfo,Future<Void> clientInfoMonitor,LocalityData clientLocality,bool enableLocalityLoadBalance,int taskID,bool lockAware,int apiVersion)604 Database DatabaseContext::create(Reference<AsyncVar<ClientDBInfo>> clientInfo, Future<Void> clientInfoMonitor, LocalityData clientLocality, bool enableLocalityLoadBalance, int taskID, bool lockAware, int apiVersion) {
605 	return Database( new DatabaseContext( Reference<Cluster>(nullptr), clientInfo, clientInfoMonitor, LiteralStringRef(""), taskID, clientLocality, enableLocalityLoadBalance, lockAware, apiVersion ) );
606 }
607 
~DatabaseContext()608 DatabaseContext::~DatabaseContext() {
609 	monitorMasterProxiesInfoChange.cancel();
610 	for(auto it = server_interf.begin(); it != server_interf.end(); it = server_interf.erase(it))
611 		it->second->notifyContextDestroyed();
612 	ASSERT_ABORT( server_interf.empty() );
613 	locationCache.insert( allKeys, Reference<LocationInfo>() );
614 }
615 
getCachedLocation(const KeyRef & key,bool isBackward)616 pair<KeyRange,Reference<LocationInfo>> DatabaseContext::getCachedLocation( const KeyRef& key, bool isBackward ) {
617 	if( isBackward ) {
618 		auto range = locationCache.rangeContainingKeyBefore(key);
619 		return std::make_pair(range->range(), range->value());
620 	}
621 	else {
622 		auto range = locationCache.rangeContaining(key);
623 		return std::make_pair(range->range(), range->value());
624 	}
625 }
626 
getCachedLocations(const KeyRangeRef & range,vector<std::pair<KeyRange,Reference<LocationInfo>>> & result,int limit,bool reverse)627 bool DatabaseContext::getCachedLocations( const KeyRangeRef& range, vector<std::pair<KeyRange,Reference<LocationInfo>>>& result, int limit, bool reverse ) {
628 	result.clear();
629 	auto locRanges = locationCache.intersectingRanges(range);
630 
631 	auto begin = locationCache.rangeContaining(range.begin);
632 	auto end = locationCache.rangeContainingKeyBefore(range.end);
633 
634 	loop {
635 		auto r = reverse ? end : begin;
636 		if (!r->value()){
637 			TEST(result.size()); // had some but not all cached locations
638 			result.clear();
639 			return false;
640 		}
641 		result.push_back( make_pair(r->range() & range, r->value()) );
642 		if(result.size() == limit)
643 			break;
644 
645 		if(begin == end)
646 			break;
647 
648 		if(reverse)
649 			--end;
650 		else
651 			++begin;
652 	}
653 
654 	return true;
655 }
656 
setCachedLocation(const KeyRangeRef & keys,const vector<StorageServerInterface> & servers)657 Reference<LocationInfo> DatabaseContext::setCachedLocation( const KeyRangeRef& keys, const vector<StorageServerInterface>& servers ) {
658 	vector<Reference<ReferencedInterface<StorageServerInterface>>> serverRefs;
659 	serverRefs.reserve(servers.size());
660 	for(auto& interf : servers) {
661 		serverRefs.push_back( StorageServerInfo::getInterface( this, interf, clientLocality ) );
662 	}
663 
664 	int maxEvictionAttempts = 100, attempts = 0;
665 	Reference<LocationInfo> loc = Reference<LocationInfo>( new LocationInfo(serverRefs) );
666 	while( locationCache.size() > locationCacheSize && attempts < maxEvictionAttempts) {
667 		TEST( true ); // NativeAPI storage server locationCache entry evicted
668 		attempts++;
669 		auto r = locationCache.randomRange();
670 		Key begin = r.begin(), end = r.end();  // insert invalidates r, so can't be passed a mere reference into it
671 		locationCache.insert( KeyRangeRef(begin, end), Reference<LocationInfo>() );
672 	}
673 	locationCache.insert( keys, loc );
674 	return std::move(loc);
675 }
676 
invalidateCache(const KeyRef & key,bool isBackward)677 void DatabaseContext::invalidateCache( const KeyRef& key, bool isBackward ) {
678 	if( isBackward )
679 		locationCache.rangeContainingKeyBefore(key)->value() = Reference<LocationInfo>();
680 	else
681 		locationCache.rangeContaining(key)->value() = Reference<LocationInfo>();
682 }
683 
// Invalidates every cache range that intersects `keys` by overwriting, with a null
// LocationInfo reference, the span from the begin key of the first intersecting range up to
// the begin key of the iterator one past the last intersecting range.
void DatabaseContext::invalidateCache( const KeyRangeRef& keys ) {
	auto rs = locationCache.intersectingRanges(keys);
	Key begin = rs.begin().begin(), end = rs.end().begin();  // insert invalidates rs, so can't be passed a mere reference into it
	locationCache.insert( KeyRangeRef(begin, end), Reference<LocationInfo>() );
}
689 
// Returns the future produced by masterProxiesChangeTrigger.onTrigger(). Actors in this file
// wait on it before re-reading the master proxy set.
Future<Void> DatabaseContext::onMasterProxiesChanged() {
	return this->masterProxiesChangeTrigger.onTrigger();
}
693 
extractIntOption(Optional<StringRef> value,int64_t minValue,int64_t maxValue)694 int64_t extractIntOption( Optional<StringRef> value, int64_t minValue, int64_t maxValue ) {
695 	validateOptionValue(value, true);
696 	if( value.get().size() != 8 ) {
697 		throw invalid_option_value();
698 	}
699 
700 	int64_t passed = *((int64_t*)(value.get().begin()));
701 	if( passed > maxValue || passed < minValue ) {
702 		throw invalid_option_value();
703 	}
704 
705 	return passed;
706 }
707 
extractHexOption(StringRef value)708 uint64_t extractHexOption( StringRef value ) {
709 	char* end;
710 	uint64_t id = strtoull( value.toString().c_str(), &end, 16 );
711 	if (*end)
712 		throw invalid_option_value();
713 	return id;
714 }
715 
setOption(FDBDatabaseOptions::Option option,Optional<StringRef> value)716 void DatabaseContext::setOption( FDBDatabaseOptions::Option option, Optional<StringRef> value) {
717 	switch(option) {
718 		case FDBDatabaseOptions::LOCATION_CACHE_SIZE:
719 			locationCacheSize = (int)extractIntOption(value, 0, std::numeric_limits<int>::max());
720 			break;
721 		case FDBDatabaseOptions::MACHINE_ID:
722 			clientLocality = LocalityData( clientLocality.processId(), value.present() ? Standalone<StringRef>(value.get()) : Optional<Standalone<StringRef>>(), clientLocality.machineId(), clientLocality.dcId() );
723 			if( clientInfo->get().proxies.size() )
724 				masterProxies = Reference<ProxyInfo>( new ProxyInfo( clientInfo->get().proxies, clientLocality ) );
725 			server_interf.clear();
726 			locationCache.insert( allKeys, Reference<LocationInfo>() );
727 			break;
728 		case FDBDatabaseOptions::MAX_WATCHES:
729 			maxOutstandingWatches = (int)extractIntOption(value, 0, CLIENT_KNOBS->ABSOLUTE_MAX_WATCHES);
730 			break;
731 		case FDBDatabaseOptions::DATACENTER_ID:
732 			clientLocality = LocalityData(clientLocality.processId(), clientLocality.zoneId(), clientLocality.machineId(), value.present() ? Standalone<StringRef>(value.get()) : Optional<Standalone<StringRef>>());
733 			if( clientInfo->get().proxies.size() )
734 				masterProxies = Reference<ProxyInfo>( new ProxyInfo( clientInfo->get().proxies, clientLocality ));
735 			server_interf.clear();
736 			locationCache.insert( allKeys, Reference<LocationInfo>() );
737 			break;
738 		case FDBDatabaseOptions::TRANSACTION_TIMEOUT:
739 			if( !apiVersionAtLeast(610) ) {
740 				throw invalid_option();
741 			}
742 			transactionTimeout = extractIntOption(value, 0, std::numeric_limits<int>::max())/1000.0;
743 			break;
744 		case FDBDatabaseOptions::TRANSACTION_RETRY_LIMIT:
745 			transactionMaxRetries = (int)extractIntOption(value, -1, std::numeric_limits<int>::max());
746 			break;
747 		case FDBDatabaseOptions::TRANSACTION_MAX_RETRY_DELAY:
748 			validateOptionValue(value, true);
749 			transactionMaxBackoff = extractIntOption(value, 0, std::numeric_limits<int32_t>::max()) / 1000.0;
750 			break;
751 		case FDBDatabaseOptions::SNAPSHOT_RYW_ENABLE:
752 			validateOptionValue(value, false);
753 			snapshotRywEnabled++;
754 			break;
755 		case FDBDatabaseOptions::SNAPSHOT_RYW_DISABLE:
756 			validateOptionValue(value, false);
757 			snapshotRywEnabled--;
758 			break;
759 	}
760 }
761 
addWatch()762 void DatabaseContext::addWatch() {
763 	if(outstandingWatches >= maxOutstandingWatches)
764 		throw too_many_watches();
765 
766 	++outstandingWatches;
767 }
768 
// Accounts for one outstanding watch going away; asserts that the count never drops below zero.
void DatabaseContext::removeWatch() {
	--outstandingWatches;
	ASSERT(outstandingWatches >= 0);
}
773 
// Forwards to Cluster::onConnected() on the owning cluster. Asserts that a cluster is attached.
Future<Void> DatabaseContext::onConnected() {
	ASSERT(cluster);
	return cluster->onConnected();
}
778 
// Returns the connection file held by the owning cluster. Asserts that a cluster is attached.
Reference<ClusterConnectionFile> DatabaseContext::getConnectionFile() {
	ASSERT(cluster);
	return cluster->getConnectionFile();
}
783 
createDatabase(Reference<ClusterConnectionFile> connFile,int apiVersion,LocalityData const & clientLocality,DatabaseContext * preallocatedDb)784 Database Database::createDatabase( Reference<ClusterConnectionFile> connFile, int apiVersion, LocalityData const& clientLocality, DatabaseContext *preallocatedDb ) {
785 	Reference<AsyncVar<int>> connectedCoordinatorsNum(new AsyncVar<int>(0)); // Number of connected coordinators for the client
786 	Reference<AsyncVar<int>> connectedCoordinatorsNumDelayed(new AsyncVar<int>(0));
787 	Reference<Cluster> cluster(new Cluster(connFile, connectedCoordinatorsNum, apiVersion));
788 	Reference<AsyncVar<ClientDBInfo>> clientInfo(new AsyncVar<ClientDBInfo>());
789 	Future<Void> clientInfoMonitor = delayedAsyncVar(connectedCoordinatorsNum, connectedCoordinatorsNumDelayed, CLIENT_KNOBS->CHECK_CONNECTED_COORDINATOR_NUM_DELAY) || monitorClientInfo(cluster->getClusterInterface(), connFile, clientInfo, connectedCoordinatorsNumDelayed);
790 
791 	DatabaseContext *db;
792 	if(preallocatedDb) {
793 		db = new (preallocatedDb) DatabaseContext(cluster, clientInfo, clientInfoMonitor, LiteralStringRef(""), TaskDefaultEndpoint, clientLocality, true, false, apiVersion);
794 	}
795 	else {
796 		db = new DatabaseContext(cluster, clientInfo, clientInfoMonitor, LiteralStringRef(""), TaskDefaultEndpoint, clientLocality, true, false, apiVersion);
797 	}
798 
799 	return Database(db);
800 }
801 
createDatabase(std::string connFileName,int apiVersion,LocalityData const & clientLocality)802 Database Database::createDatabase( std::string connFileName, int apiVersion, LocalityData const& clientLocality ) {
803 	Reference<ClusterConnectionFile> rccf = Reference<ClusterConnectionFile>(new ClusterConnectionFile(ClusterConnectionFile::lookupClusterFileName(connFileName).first));
804 	return Database::createDatabase(rccf, apiVersion, clientLocality);
805 }
806 
807 extern IPAddress determinePublicIPAutomatically(ClusterConnectionString const& ccs);
808 
// Constructs a Cluster that owns a freshly created cluster-interface AsyncVar, then runs init()
// with the given connection file, coordinator counter and API version.
Cluster::Cluster( Reference<ClusterConnectionFile> connFile,  Reference<AsyncVar<int>> connectedCoordinatorsNum, int apiVersion )
	: clusterInterface(new AsyncVar<Optional<ClusterInterface>>())
{
	init(connFile, true, connectedCoordinatorsNum, apiVersion);
}
814 
// Constructs a Cluster around a caller-supplied cluster-interface AsyncVar, then runs init().
// NOTE(review): init() is called without an apiVersion here, so its declaration presumably
// supplies a default for that parameter -- confirm in the header.
Cluster::Cluster( Reference<ClusterConnectionFile> connFile,  Reference<AsyncVar<Optional<ClusterInterface>>> clusterInterface, Reference<AsyncVar<int>> connectedCoordinatorsNum)
	: clusterInterface(clusterInterface)
{
	init(connFile, true, connectedCoordinatorsNum);
}
820 
init(Reference<ClusterConnectionFile> connFile,bool startClientInfoMonitor,Reference<AsyncVar<int>> connectedCoordinatorsNum,int apiVersion)821 void Cluster::init( Reference<ClusterConnectionFile> connFile, bool startClientInfoMonitor, Reference<AsyncVar<int>> connectedCoordinatorsNum, int apiVersion ) {
822 	connectionFile = connFile;
823 	connected = clusterInterface->onChange();
824 
825 	if(!g_network)
826 		throw network_not_setup();
827 
828 	if(connFile) {
829 		if(networkOptions.traceDirectory.present() && !traceFileIsOpen()) {
830 			g_network->initMetrics();
831 			FlowTransport::transport().initMetrics();
832 			initTraceEventMetrics();
833 
834 			auto publicIP = determinePublicIPAutomatically( connFile->getConnectionString() );
835 			selectTraceFormatter(networkOptions.traceFormat);
836 			openTraceFile(NetworkAddress(publicIP, ::getpid()), networkOptions.traceRollSize, networkOptions.traceMaxLogsSize, networkOptions.traceDirectory.get(), "trace", networkOptions.traceLogGroup);
837 
838 			TraceEvent("ClientStart")
839 				.detail("SourceVersion", getHGVersion())
840 				.detail("Version", FDB_VT_VERSION)
841 				.detail("PackageName", FDB_VT_PACKAGE_NAME)
842 				.detail("ClusterFile", connFile->getFilename().c_str())
843 				.detail("ConnectionString", connFile->getConnectionString().toString())
844 				.detailf("ActualTime", "%lld", DEBUG_DETERMINISM ? 0 : time(NULL))
845 				.detail("ApiVersion", apiVersion)
846 				.detailf("ImageOffset", "%p", platform::getImageOffset())
847 				.trackLatest("ClientStart");
848 
849 			initializeSystemMonitorMachineState(SystemMonitorMachineState(IPAddress(publicIP)));
850 
851 			systemMonitor();
852 			uncancellable( recurring( &systemMonitor, CLIENT_KNOBS->SYSTEM_MONITOR_INTERVAL, TaskFlushTrace ) );
853 		}
854 
855 		leaderMon = monitorLeader( connFile, clusterInterface, connectedCoordinatorsNum );
856 		failMon = failureMonitorClient( clusterInterface, false );
857 	}
858 }
859 
// Nothing to do explicitly here; the body is intentionally empty.
Cluster::~Cluster() {}
861 
// Returns the AsyncVar holding the (possibly absent) ClusterInterface for this cluster.
Reference<AsyncVar<Optional<struct ClusterInterface>>> Cluster::getClusterInterface() {
	return clusterInterface;
}
865 
// Returns the `connected` future, which init() sets to clusterInterface->onChange().
Future<Void> Cluster::onConnected() {
	return connected;
}
869 
setNetworkOption(FDBNetworkOptions::Option option,Optional<StringRef> value)870 void setNetworkOption(FDBNetworkOptions::Option option, Optional<StringRef> value) {
871 	switch(option) {
872 		// SOMEDAY: If the network is already started, should these three throw an error?
873 		case FDBNetworkOptions::TRACE_ENABLE:
874 			networkOptions.traceDirectory = value.present() ? value.get().toString() : "";
875 			break;
876 		case FDBNetworkOptions::TRACE_ROLL_SIZE:
877 			validateOptionValue(value, true);
878 			networkOptions.traceRollSize = extractIntOption(value, 0, std::numeric_limits<int64_t>::max());
879 			break;
880 		case FDBNetworkOptions::TRACE_MAX_LOGS_SIZE:
881 			validateOptionValue(value, true);
882 			networkOptions.traceMaxLogsSize = extractIntOption(value, 0, std::numeric_limits<int64_t>::max());
883 			break;
884 		case FDBNetworkOptions::TRACE_LOG_GROUP:
885 			if(value.present())
886 				networkOptions.traceLogGroup = value.get().toString();
887 			break;
888 		case FDBNetworkOptions::TRACE_FORMAT:
889 			validateOptionValue(value, true);
890 			networkOptions.traceFormat = value.get().toString();
891 			if (!validateTraceFormat(networkOptions.traceFormat)) {
892 				fprintf(stderr, "Unrecognized trace format: `%s'\n", networkOptions.traceFormat.c_str());
893 				throw invalid_option_value();
894 			}
895 			break;
896 		case FDBNetworkOptions::KNOB: {
897 			validateOptionValue(value, true);
898 
899 			std::string optionValue = value.get().toString();
900 			TraceEvent("SetKnob").detail("KnobString", optionValue);
901 
902 			size_t eq = optionValue.find_first_of('=');
903 			if(eq == optionValue.npos) {
904 				TraceEvent(SevWarnAlways, "InvalidKnobString").detail("KnobString", optionValue);
905 				throw invalid_option_value();
906 			}
907 
908 			std::string knobName = optionValue.substr(0, eq);
909 			std::string knobValue = optionValue.substr(eq+1);
910 			if (!const_cast<FlowKnobs*>(FLOW_KNOBS)->setKnob( knobName, knobValue ) &&
911 				!const_cast<ClientKnobs*>(CLIENT_KNOBS)->setKnob( knobName, knobValue ))
912 			{
913 				TraceEvent(SevWarnAlways, "UnrecognizedKnob").detail("Knob", knobName.c_str());
914 				fprintf(stderr, "FoundationDB client ignoring unrecognized knob option '%s'\n", knobName.c_str());
915 			}
916 			break;
917 		}
918 		case FDBNetworkOptions::TLS_PLUGIN:
919 			validateOptionValue(value, true);
920 			break;
921 		case FDBNetworkOptions::TLS_CERT_PATH:
922 			validateOptionValue(value, true);
923 			initTLSOptions();
924 			tlsOptions->set_cert_file( value.get().toString() );
925 			break;
926 		case FDBNetworkOptions::TLS_CERT_BYTES:
927 			initTLSOptions();
928 			tlsOptions->set_cert_data( value.get().toString() );
929 			break;
930 		case FDBNetworkOptions::TLS_CA_PATH:
931 			validateOptionValue(value, true);
932 			initTLSOptions();
933 			tlsOptions->set_ca_file( value.get().toString() );
934 			break;
935 		case FDBNetworkOptions::TLS_CA_BYTES:
936 			validateOptionValue(value, true);
937 			initTLSOptions();
938 			tlsOptions->set_ca_data(value.get().toString());
939 			break;
940 		case FDBNetworkOptions::TLS_PASSWORD:
941 			validateOptionValue(value, true);
942 			initTLSOptions();
943 			tlsOptions->set_key_password(value.get().toString());
944 			break;
945 		case FDBNetworkOptions::TLS_KEY_PATH:
946 			validateOptionValue(value, true);
947 			initTLSOptions();
948 			tlsOptions->set_key_file( value.get().toString() );
949 			break;
950 		case FDBNetworkOptions::TLS_KEY_BYTES:
951 			validateOptionValue(value, true);
952 			initTLSOptions();
953 			tlsOptions->set_key_data( value.get().toString() );
954 			break;
955 		case FDBNetworkOptions::TLS_VERIFY_PEERS:
956 			validateOptionValue(value, true);
957 			initTLSOptions();
958 			try {
959 				tlsOptions->set_verify_peers({ value.get().toString() });
960 			} catch( Error& e ) {
961 				TraceEvent(SevWarnAlways, "TLSValidationSetError")
962 					.error( e )
963 					.detail("Input", value.get().toString() );
964 				throw invalid_option_value();
965 			}
966 			break;
967 		case FDBNetworkOptions::DISABLE_CLIENT_STATISTICS_LOGGING:
968 			validateOptionValue(value, false);
969 			networkOptions.logClientInfo = false;
970 			break;
971 		case FDBNetworkOptions::SUPPORTED_CLIENT_VERSIONS:
972 		{
973 			// The multi-version API should be providing us these guarantees
974 			ASSERT(g_network);
975 			ASSERT(value.present());
976 
977 			networkOptions.supportedVersions.resize(networkOptions.supportedVersions.arena(), 0);
978 			std::string versionString = value.get().toString();
979 
980 			size_t index = 0;
981 			size_t nextIndex = 0;
982 			while(nextIndex != versionString.npos) {
983 				nextIndex = versionString.find(';', index);
984 				networkOptions.supportedVersions.push_back_deep(networkOptions.supportedVersions.arena(), ClientVersionRef(versionString.substr(index, nextIndex-index)));
985 				index = nextIndex + 1;
986 			}
987 
988 			ASSERT(networkOptions.supportedVersions.size() > 0);
989 
990 			break;
991 		}
992 		case FDBNetworkOptions::ENABLE_SLOW_TASK_PROFILING:
993 			validateOptionValue(value, false);
994 			networkOptions.slowTaskProfilingEnabled = true;
995 			break;
996 		default:
997 			break;
998 	}
999 }
1000 
// One-time creation of the client network.
// Throws network_already_setup() if g_network already exists. Otherwise seeds the global random
// generators, defaults logClientInfo to true if it was never set, creates the Net2 network, the
// FlowTransport instance and the Net2 file system, and initialises TLS options.
void setupNetwork(uint64_t transportId, bool useMetrics) {
	if( g_network )
		throw network_already_setup();

	g_random = new DeterministicRandom( platform::getRandomSeed() );
	trace_random = new DeterministicRandom( platform::getRandomSeed() );
	// The nondeterministic and debug generators share the trace generator instance.
	g_nondeterministic_random = trace_random;
	g_debug_random = trace_random;
	if (!networkOptions.logClientInfo.present())
		networkOptions.logClientInfo = true;

	g_network = newNet2(false, useMetrics || networkOptions.traceDirectory.present());
	FlowTransport::createInstance(transportId);
	Net2FileSystem::newFileSystem();

	initTLSOptions();

#ifndef TLS_DISABLED
	tlsOptions->register_network();
#endif
}
1022 
// Runs the network via g_network->run() and returns once that call returns.
// Throws network_not_setup() if setupNetwork() has not created g_network.
void runNetwork() {
	if(!g_network)
		throw network_not_setup();

	// The slow-task profiler is only set up when tracing is configured and profiling was requested.
	if(networkOptions.traceDirectory.present() && networkOptions.slowTaskProfilingEnabled) {
		setupSlowTaskProfiler();
	}

	g_network->run();

	// Take one final system-monitor sample after the run loop has returned.
	if(networkOptions.traceDirectory.present())
		systemMonitor();
}
1036 
// Asks the network to stop and closes the trace file.
// Throws network_not_setup() if g_network does not exist.
void stopNetwork() {
	if(!g_network)
		throw network_not_setup();

	g_network->stop();
	closeTraceFile();
}
1044 
getMasterProxies(bool useProvisionalProxies)1045 Reference<ProxyInfo> DatabaseContext::getMasterProxies(bool useProvisionalProxies) {
1046 	if (masterProxiesLastChange != clientInfo->get().id) {
1047 		masterProxiesLastChange = clientInfo->get().id;
1048 		masterProxies.clear();
1049 		if( clientInfo->get().proxies.size() ) {
1050 			masterProxies = Reference<ProxyInfo>( new ProxyInfo( clientInfo->get().proxies, clientLocality ));
1051 			provisional = clientInfo->get().proxies[0].provisional;
1052 		}
1053 	}
1054 	if(provisional && !useProvisionalProxies) {
1055 		return Reference<ProxyInfo>();
1056 	}
1057 	return masterProxies;
1058 }
1059 
//Actor which will wait until the MultiInterface<MasterProxyInterface> returned by the DatabaseContext cx is not NULL
// Each iteration asks cx for its proxies; on a null result it waits for
// cx->onMasterProxiesChanged() and asks again.
ACTOR Future<Reference<ProxyInfo>> getMasterProxiesFuture(DatabaseContext *cx, bool useProvisionalProxies) {
	loop{
		Reference<ProxyInfo> proxies = cx->getMasterProxies(useProvisionalProxies);
		if (proxies)
			return proxies;
		wait( cx->onMasterProxiesChanged() );
	}
}
1069 
//Returns a future which will not be set until the ProxyInfo of this DatabaseContext is not NULL
// Delegates to the file-level getMasterProxiesFuture actor with `this` as the context.
Future<Reference<ProxyInfo>> DatabaseContext::getMasterProxiesFuture(bool useProvisionalProxies) {
	return ::getMasterProxiesFuture(this, useProvisionalProxies);
}
1074 
decrement(VectorRef<KeyValueRef> const & data)1075 void GetRangeLimits::decrement( VectorRef<KeyValueRef> const& data ) {
1076 	if( rows != CLIENT_KNOBS->ROW_LIMIT_UNLIMITED ) {
1077 		ASSERT(data.size() <= rows);
1078 		rows -= data.size();
1079 	}
1080 
1081 	minRows = std::max(0, minRows - data.size());
1082 
1083 	if( bytes != CLIENT_KNOBS->BYTE_LIMIT_UNLIMITED )
1084 		bytes = std::max( 0, bytes - (int)data.expectedSize() - (8-(int)sizeof(KeyValueRef))*data.size() );
1085 }
1086 
decrement(KeyValueRef const & data)1087 void GetRangeLimits::decrement( KeyValueRef const& data ) {
1088 	minRows = std::max(0, minRows - 1);
1089 	if( rows != CLIENT_KNOBS->ROW_LIMIT_UNLIMITED )
1090 		rows--;
1091 	if( bytes != CLIENT_KNOBS->BYTE_LIMIT_UNLIMITED )
1092 		bytes = std::max( 0, bytes - (int)8 - (int)data.expectedSize() );
1093 }
1094 
1095 // True if either the row or byte limit has been reached
isReached()1096 bool GetRangeLimits::isReached() {
1097 	return rows == 0 || (bytes == 0 && minRows == 0);
1098 }
1099 
1100 // True if data would cause the row or byte limit to be reached
reachedBy(VectorRef<KeyValueRef> const & data)1101 bool GetRangeLimits::reachedBy( VectorRef<KeyValueRef> const& data ) {
1102 	return ( rows != CLIENT_KNOBS->ROW_LIMIT_UNLIMITED && data.size() >= rows )
1103 		|| ( bytes != CLIENT_KNOBS->BYTE_LIMIT_UNLIMITED && (int)data.expectedSize() + (8-(int)sizeof(KeyValueRef))*data.size() >= bytes && data.size() >= minRows );
1104 }
1105 
// True when `bytes` is anything other than the BYTE_LIMIT_UNLIMITED knob value.
bool GetRangeLimits::hasByteLimit() {
	return bytes != CLIENT_KNOBS->BYTE_LIMIT_UNLIMITED;
}
1109 
// True when `rows` is anything other than the ROW_LIMIT_UNLIMITED knob value.
bool GetRangeLimits::hasRowLimit() {
	return rows != CLIENT_KNOBS->ROW_LIMIT_UNLIMITED;
}
1113 
// True when a byte limit is in effect and minRows has been counted down to zero.
bool GetRangeLimits::hasSatisfiedMinRows() {
	return hasByteLimit() && minRows == 0;
}
1117 
parse(StringRef const & key)1118 AddressExclusion AddressExclusion::parse( StringRef const& key ) {
1119 	//Must not change: serialized to the database!
1120 	auto parsedIp = IPAddress::parse(key.toString());
1121 	if (parsedIp.present()) {
1122 		return AddressExclusion(parsedIp.get());
1123 	}
1124 
1125 	// Not a whole machine, includes `port'.
1126 	try {
1127 		auto addr = NetworkAddress::parse(key.toString());
1128 		if (addr.isTLS()) {
1129 			TraceEvent(SevWarnAlways, "AddressExclusionParseError")
1130 				.detail("String", key)
1131 				.detail("Description", "Address inclusion string should not include `:tls' suffix.");
1132 			return AddressExclusion();
1133 		}
1134 		return AddressExclusion(addr.ip, addr.port);
1135 	} catch (Error& ) {
1136 		TraceEvent(SevWarnAlways, "AddressExclusionParseError").detail("String", key);
1137 		return AddressExclusion();
1138 	}
1139 }
1140 
1141 Future<Standalone<RangeResultRef>> getRange(
1142 	Database const& cx,
1143 	Future<Version> const& fVersion,
1144 	KeySelector const& begin,
1145 	KeySelector const& end,
1146 	GetRangeLimits const& limits,
1147 	bool const& reverse,
1148 	TransactionInfo const& info);
1149 
1150 ACTOR Future<Optional<Value>> getValue(Future<Version> version, Key key, Database cx, TransactionInfo info,
1151                                        Reference<TransactionLogInfo> trLogInfo);
1152 
// Reads the server-list entry for storage server `id` at version `ver` and decodes it into a
// StorageServerInterface. Yields an empty Optional when the entry is absent.
ACTOR Future<Optional<StorageServerInterface>> fetchServerInterface( Database cx, TransactionInfo info, UID id, Future<Version> ver = latestVersion ) {
	// No transaction log is attached to this internal read (null TransactionLogInfo reference).
	Optional<Value> val = wait( getValue(ver, serverListKeyFor(id), cx, info, Reference<TransactionLogInfo>()) );
	if( !val.present() ) {
		// A storage server has been removed from serverList since we read keyServers
		return Optional<StorageServerInterface>();
	}

	return decodeServerListValue(val.get());
}
1162 
// Fetches the interfaces of all storage servers in `ids` at version `ver`, issuing the lookups
// concurrently. Yields the interfaces in the same order as `ids`, or an empty Optional if any
// one of the servers no longer has a server-list entry.
ACTOR Future<Optional<vector<StorageServerInterface>>> transactionalGetServerInterfaces( Future<Version> ver, Database cx, TransactionInfo info, vector<UID> ids ) {
	state vector< Future< Optional<StorageServerInterface> > > serverListEntries;
	for( int s = 0; s < ids.size(); s++ ) {
		serverListEntries.push_back( fetchServerInterface( cx, info, ids[s], ver ) );
	}

	// Wait for every lookup before inspecting any of them.
	vector<Optional<StorageServerInterface>> serverListValues = wait( getAll(serverListEntries) );
	vector<StorageServerInterface> serverInterfaces;
	for( int s = 0; s < serverListValues.size(); s++ ) {
		if( !serverListValues[s].present() ) {
			// A storage server has been removed from ServerList since we read keyServers
			return Optional<vector<StorageServerInterface>>();
		}
		serverInterfaces.push_back( serverListValues[s].get() );
	}
	return serverInterfaces;
}
1180 
//If isBackward == true, returns the shard containing the key before 'key' (an infinitely long, inexpressible key). Otherwise returns the shard containing key
// Asks a master proxy for the location, stores the answer in the location cache via
// cx->setCachedLocation, and returns the shard range with its LocationInfo. The request is
// re-issued whenever the master proxy set changes while it is outstanding.
ACTOR Future< pair<KeyRange,Reference<LocationInfo>> > getKeyLocation_internal( Database cx, Key key, TransactionInfo info, bool isBackward = false ) {
	// A backward lookup needs a key strictly after the start of the key space; a forward lookup
	// needs a key strictly before its end.
	if (isBackward) {
		ASSERT( key != allKeys.begin && key <= allKeys.end );
	} else {
		ASSERT( key < allKeys.end );
	}

	if( info.debugID.present() )
		g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKeyLocation.Before");

	loop {
		choose {
			// Proxy set changed: fall through and restart the loop with the new proxies.
			when ( wait( cx->onMasterProxiesChanged() ) ) {}
			when ( GetKeyServerLocationsReply rep = wait( loadBalance( cx->getMasterProxies(info.useProvisionalProxies), &MasterProxyInterface::getKeyServersLocations, GetKeyServerLocationsRequest(key, Optional<KeyRef>(), 100, isBackward, key.arena()), TaskDefaultPromiseEndpoint ) ) ) {
				if( info.debugID.present() )
					g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKeyLocation.After");
				// A request with no end key is expected to produce exactly one shard.
				ASSERT( rep.results.size() == 1 );

				auto locationInfo = cx->setCachedLocation(rep.results[0].first, rep.results[0].second);
				return std::make_pair(KeyRange(rep.results[0].first, rep.arena), locationInfo);
			}
		}
	}
}
1206 
1207 template <class F>
getKeyLocation(Database const & cx,Key const & key,F StorageServerInterface::* member,TransactionInfo const & info,bool isBackward=false)1208 Future<pair<KeyRange, Reference<LocationInfo>>> getKeyLocation( Database const& cx, Key const& key, F StorageServerInterface::*member, TransactionInfo const& info, bool isBackward = false ) {
1209 	auto ssi = cx->getCachedLocation( key, isBackward );
1210 	if (!ssi.second) {
1211 		return getKeyLocation_internal( cx, key, info, isBackward );
1212 	}
1213 
1214 	for(int i = 0; i < ssi.second->size(); i++) {
1215 		if( IFailureMonitor::failureMonitor().onlyEndpointFailed(ssi.second->get(i, member).getEndpoint()) ) {
1216 			cx->invalidateCache( key );
1217 			ssi.second.clear();
1218 			return getKeyLocation_internal( cx, key, info, isBackward );
1219 		}
1220 	}
1221 
1222 	return ssi;
1223 }
1224 
// Asks a master proxy for up to `limit` shard locations covering `keys` (in reverse order when
// `reverse` is set), stores every returned shard in the location cache, and returns the shards
// clipped to `keys` together with their LocationInfo. The request is re-issued whenever the
// master proxy set changes while it is outstanding.
ACTOR Future< vector< pair<KeyRange,Reference<LocationInfo>> > > getKeyRangeLocations_internal( Database cx, KeyRange keys, int limit, bool reverse, TransactionInfo info ) {
	if( info.debugID.present() )
		g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKeyLocations.Before");

	loop {
		choose {
			// Proxy set changed: fall through and restart the loop with the new proxies.
			when ( wait( cx->onMasterProxiesChanged() ) ) {}
			when ( GetKeyServerLocationsReply _rep = wait( loadBalance( cx->getMasterProxies(info.useProvisionalProxies), &MasterProxyInterface::getKeyServersLocations, GetKeyServerLocationsRequest(keys.begin, keys.end, limit, reverse, keys.arena()), TaskDefaultPromiseEndpoint ) ) ) {
				// Copy the reply into a state variable so it is still available after the
				// wait(yield()) calls below.
				state GetKeyServerLocationsReply rep = _rep;
				if( info.debugID.present() )
					g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKeyLocations.After");
				ASSERT( rep.results.size() );

				state vector< pair<KeyRange,Reference<LocationInfo>> > results;
				state int shard = 0;
				for (; shard < rep.results.size(); shard++) {
					//FIXME: these shards are being inserted into the map sequentially, it would be much more CPU efficient to save the map pairs and insert them all at once.
					results.push_back( make_pair(rep.results[shard].first & keys, cx->setCachedLocation(rep.results[shard].first, rep.results[shard].second)) );
					// Yield between cache insertions.
					wait(yield());
				}

				return results;
			}
		}
	}
}
1251 
1252 template <class F>
getKeyRangeLocations(Database const & cx,KeyRange const & keys,int limit,bool reverse,F StorageServerInterface::* member,TransactionInfo const & info)1253 Future< vector< pair<KeyRange,Reference<LocationInfo>> > > getKeyRangeLocations( Database const& cx, KeyRange const& keys, int limit, bool reverse, F StorageServerInterface::*member, TransactionInfo const& info ) {
1254 	ASSERT (!keys.empty());
1255 
1256 	vector< pair<KeyRange,Reference<LocationInfo>> > locations;
1257 	if (!cx->getCachedLocations(keys, locations, limit, reverse)) {
1258 		return getKeyRangeLocations_internal( cx, keys, limit, reverse, info );
1259 	}
1260 
1261 	bool foundFailed = false;
1262 	for(auto& it : locations) {
1263 		bool onlyEndpointFailed = false;
1264 		for(int i = 0; i < it.second->size(); i++) {
1265 			if( IFailureMonitor::failureMonitor().onlyEndpointFailed(it.second->get(i, member).getEndpoint()) ) {
1266 				onlyEndpointFailed = true;
1267 				break;
1268 			}
1269 		}
1270 
1271 		if( onlyEndpointFailed ) {
1272 			cx->invalidateCache( it.first.begin );
1273 			foundFailed = true;
1274 		}
1275 	}
1276 
1277 	if(foundFailed) {
1278 		return getKeyRangeLocations_internal( cx, keys, limit, reverse, info );
1279 	}
1280 
1281 	return locations;
1282 }
1283 
// Warms the location cache for `keys` by repeatedly requesting shard locations from the proxies,
// WARM_RANGE_SHARD_LIMIT at a time (getKeyRangeLocations_internal stores each returned shard in
// the cache). Stops when a request returns nothing, when the running total reaches the cache
// size, or when the last returned shard reaches the end of `keys`.
ACTOR Future<Void> warmRange_impl( Transaction *self, Database cx, KeyRange keys ) {
	state int totalRanges = 0;
	state int totalRequests = 0;
	loop {
		vector<pair<KeyRange, Reference<LocationInfo>>> locations = wait(getKeyRangeLocations_internal(cx, keys, CLIENT_KNOBS->WARM_RANGE_SHARD_LIMIT, false, self->info));
		// The total is advanced by the requested limit, not by the number of shards returned.
		totalRanges += CLIENT_KNOBS->WARM_RANGE_SHARD_LIMIT;
		totalRequests++;
		if(locations.size() == 0 || totalRanges >= cx->locationCacheSize || locations[locations.size()-1].first.end >= keys.end)
			break;

		// Continue from where the last returned shard ended.
		keys = KeyRangeRef(locations[locations.size()-1].first.end, keys.end);

		if(totalRequests%20 == 0) {
			//To avoid blocking the proxies from starting other transactions, occasionally get a read version.
			state Transaction tr(cx);
			loop {
				try {
					tr.setOption( FDBTransactionOptions::LOCK_AWARE );
					tr.setOption( FDBTransactionOptions::CAUSAL_READ_RISKY );
					wait(success( tr.getReadVersion() ));
					break;
				} catch( Error &e ) {
					wait( tr.onError(e) );
				}
			}
		}
	}

	return Void();
}
1314 
// Starts warmRange_impl for `keys` on behalf of this transaction and returns its future.
Future<Void> Transaction::warmRange(Database cx, KeyRange keys) {
	return warmRange_impl(this, cx, keys);
}
1318 
// Reads the value of `key` at the version produced by `version`.
// Each loop iteration resolves the key's shard location with getKeyLocation and sends a
// GetValueRequest through loadBalance to that location's storage servers.
// On wrong_shard_server, on all_alternatives_failed, or on transaction_too_old while reading at
// latestVersion, the cached location is invalidated and the read is retried after
// WRONG_SHARD_SERVER_DELAY. Any other error is recorded in trLogInfo (when set) and rethrown.
// Latency samples and, when trLogInfo is set, per-read log events are recorded along the way.
ACTOR Future<Optional<Value>> getValue( Future<Version> version, Key key, Database cx, TransactionInfo info, Reference<TransactionLogInfo> trLogInfo )
{
	state Version ver = wait( version );
	validateVersion(ver);

	loop {
		state pair<KeyRange, Reference<LocationInfo>> ssi = wait( getKeyLocation(cx, key, &StorageServerInterface::getValue, info) );
		state Optional<UID> getValueID = Optional<UID>();
		state uint64_t startTime;
		state double startTimeD;
		try {
			//GetValueReply r = wait( g_random->randomChoice( ssi->get() ).getValue.getReply( GetValueRequest(key,ver) ) );
			//return r.value;
			// When debugging is enabled, give this read its own id and attach it to the
			// transaction's debug id.
			if( info.debugID.present() ) {
				getValueID = g_nondeterministic_random->randomUniqueID();

				g_traceBatch.addAttach("GetValueAttachID", info.debugID.get().first(), getValueID.get().first());
				g_traceBatch.addEvent("GetValueDebug", getValueID.get().first(), "NativeAPI.getValue.Before"); //.detail("TaskID", g_network->getCurrentTask());
				/*TraceEvent("TransactionDebugGetValueInfo", getValueID.get())
					.detail("Key", key)
					.detail("ReqVersion", ver)
					.detail("Servers", describe(ssi.second->get()));*/
			}

			++cx->getValueSubmitted;
			// Two clocks are sampled: timer_int() for the getValueCompleted metric and now() for
			// the latency sample and log event.
			startTime = timer_int();
			startTimeD = now();
			++cx->transactionPhysicalReads;
			state GetValueReply reply = wait( loadBalance( ssi.second, &StorageServerInterface::getValue, GetValueRequest(key, ver, getValueID), TaskDefaultPromiseEndpoint, false, cx->enableLocalityLoadBalance ? &cx->queueModel : NULL ) );
			double latency = now() - startTimeD;
			cx->readLatencies.addSample(latency);
			if (trLogInfo) {
				int valueSize = reply.value.present() ? reply.value.get().size() : 0;
				trLogInfo->addLog(FdbClientLogEvents::EventGet(startTimeD, latency, valueSize, key));
			}
			cx->getValueCompleted->latency = timer_int() - startTime;
			cx->getValueCompleted->log();

			if( info.debugID.present() ) {
				g_traceBatch.addEvent("GetValueDebug", getValueID.get().first(), "NativeAPI.getValue.After"); //.detail("TaskID", g_network->getCurrentTask());
				/*TraceEvent("TransactionDebugGetValueDone", getValueID.get())
					.detail("Key", key)
					.detail("ReqVersion", ver)
					.detail("ReplySize", reply.value.present() ? reply.value.get().size() : -1);*/
			}
			return reply.value;
		} catch (Error& e) {
			// The completion metric is logged for failed reads as well.
			cx->getValueCompleted->latency = timer_int() - startTime;
			cx->getValueCompleted->log();
			if( info.debugID.present() ) {
				g_traceBatch.addEvent("GetValueDebug", getValueID.get().first(), "NativeAPI.getValue.Error"); //.detail("TaskID", g_network->getCurrentTask());
				/*TraceEvent("TransactionDebugGetValueDone", getValueID.get())
					.detail("Key", key)
					.detail("ReqVersion", ver)
					.detail("ReplySize", reply.value.present() ? reply.value.get().size() : -1);*/
			}
			// These errors are handled by dropping the cached location and retrying the loop
			// after a delay; everything else propagates to the caller.
			if (e.code() == error_code_wrong_shard_server || e.code() == error_code_all_alternatives_failed ||
				(e.code() == error_code_transaction_too_old && ver == latestVersion) ) {
				cx->invalidateCache( key );
				wait(delay(CLIENT_KNOBS->WRONG_SHARD_SERVER_DELAY, info.taskID));
			} else {
				if (trLogInfo)
					trLogInfo->addLog(FdbClientLogEvents::EventGetError(startTimeD, static_cast<int>(e.code()), key));
				throw e;
			}
		}
	}
}
1387 
// Resolves the key selector `k` to a concrete key at the read version produced by `version`.
//
// The selector is sent to the storage servers responsible for its key; each reply carries a new
// selector that replaces `k`. The loop ends when the selector has offset 0 and orEqual set, at
// which point its key is returned. Two boundary cases are answered without a request:
//   - selector key == allKeys.end with a positive offset  -> allKeys.end
//   - selector key == allKeys.begin with offset <= 0       -> empty key
//
// wrong_shard_server and all_alternatives_failed invalidate the cached location and retry after
// WRONG_SHARD_SERVER_DELAY; any other error is traced as "GetKeyError" and rethrown.
getKey(Database cx,KeySelector k,Future<Version> version,TransactionInfo info)1388 ACTOR Future<Key> getKey( Database cx, KeySelector k, Future<Version> version, TransactionInfo info ) {
	// Only used to wait for the version to be ready; the request below reads version.get() instead of `ver`.
1389 	Version ver = wait(version);
1390 
1391 	if( info.debugID.present() )
1392 		g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKey.AfterVersion");
1393 
1394 	loop {
1395 		if (k.getKey() == allKeys.end) {
1396 			if (k.offset > 0) return allKeys.end;
1397 			k.orEqual = false;
1398 		}
1399 		else if (k.getKey() == allKeys.begin && k.offset <= 0) {
1400 			return Key();
1401 		}
1402 
		// Locate the shard for the selector's key; k.isBackward() is passed through to the location lookup.
1403 		Key locationKey(k.getKey(), k.arena());
1404 		state pair<KeyRange, Reference<LocationInfo>> ssi = wait( getKeyLocation(cx, locationKey, &StorageServerInterface::getKey, info, k.isBackward()) );
1405 
1406 		try {
1407 			if( info.debugID.present() )
1408 				g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKey.Before"); //.detail("StartKey", k.getKey()).detail("Offset",k.offset).detail("OrEqual",k.orEqual);
1409 			++cx->transactionPhysicalReads;
1410 			GetKeyReply reply = wait( loadBalance( ssi.second, &StorageServerInterface::getKey, GetKeyRequest(k, version.get()), TaskDefaultPromiseEndpoint, false, cx->enableLocalityLoadBalance ? &cx->queueModel : NULL ) );
1411 			if( info.debugID.present() )
1412 				g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKey.After"); //.detail("NextKey",reply.sel.key).detail("Offset", reply.sel.offset).detail("OrEqual", k.orEqual);
			// Continue from the selector returned by the server; done once it has offset 0 and orEqual set.
1413 			k = reply.sel;
1414 			if (!k.offset && k.orEqual) {
1415 				return k.getKey();
1416 			}
1417 		} catch (Error& e) {
1418 			if (e.code() == error_code_wrong_shard_server || e.code() == error_code_all_alternatives_failed) {
1419 				cx->invalidateCache(k.getKey(), k.isBackward());
1420 
1421 				wait(delay(CLIENT_KNOBS->WRONG_SHARD_SERVER_DELAY, info.taskID));
1422 			} else {
1423 				TraceEvent(SevInfo, "GetKeyError")
1424 					.error(e)
1425 					.detail("AtKey", k.getKey())
1426 					.detail("Offset", k.offset);
1427 				throw e;
1428 			}
1429 		}
1430 	}
1431 }
1432 
// Polls the master proxies for a read version (priority PRIORITY_SYSTEM_IMMEDIATE) until the
// version they report is at least `version`, then returns the reported version.
//
// If the set of master proxies changes while a request is outstanding, the loop starts over with
// a new request. Between unsuccessful polls it waits FUTURE_VERSION_RETRY_DELAY.
// Any error raised inside the loop is traced at SevError ("WaitForCommittedVersionError") and rethrown.
waitForCommittedVersion(Database cx,Version version)1433 ACTOR Future<Version> waitForCommittedVersion( Database cx, Version version ) {
1434 	try {
1435 		loop {
1436 			choose {
				// Empty body: falling out of the choose restarts the loop and issues a fresh request.
1437 				when ( wait( cx->onMasterProxiesChanged() ) ) {}
1438 				when ( GetReadVersionReply v = wait( loadBalance( cx->getMasterProxies(false), &MasterProxyInterface::getConsistentReadVersion, GetReadVersionRequest( 0, GetReadVersionRequest::PRIORITY_SYSTEM_IMMEDIATE ), cx->taskID ) ) ) {
1439 					if (v.version >= version)
1440 						return v.version;
1441 					// SOMEDAY: Do the wait on the server side, possibly use less expensive source of committed version (causal consistency is not needed for this purpose)
1442 					wait( delay( CLIENT_KNOBS->FUTURE_VERSION_RETRY_DELAY, cx->taskID ) );
1443 				}
1444 			}
1445 		}
1446 	} catch (Error& e) {
1447 		TraceEvent(SevError, "WaitForCommittedVersionError").error(e);
1448 		throw;
1449 	}
1450 }
1451 
// Forward declaration of the readVersionBatcher actor. It takes the database context, a stream of
// (reply promise, optional debug ID) pairs, and a flags word; its body is not in this part of the file.
1452 ACTOR Future<Void> readVersionBatcher(
1453 	DatabaseContext* cx, FutureStream<std::pair<Promise<GetReadVersionReply>, Optional<UID>>> versionStream,
1454 	uint32_t flags);
1455 
// Issues a watch on `key` against the storage servers and completes once the watch has fired and
// the version it fired at is known to be committed.
//
// Each attempt sends WatchValueRequest(key, value, ver) to the servers for the key. When a server
// replies with a version `resp`, the actor waits for a committed version >= resp. If that committed
// version is less than 50000000 versions past `resp` the actor returns; otherwise it re-issues the
// watch at the committed version.
//
// Error handling (all branches retry except the last):
//   - wrong_shard_server / all_alternatives_failed: invalidate the cached location, wait WRONG_SHARD_SERVER_DELAY
//   - watch_cancelled / process_behind: wait WATCH_POLLING_TIME
//   - timed_out: wait FUTURE_VERSION_RETRY_DELAY
//   - anything else: wait FUTURE_VERSION_RETRY_DELAY, then rethrow the error
//
// `version` must not resolve to latestVersion (asserted). `readVersionFlags` is not used in this body.
watchValue(Future<Version> version,Key key,Optional<Value> value,Database cx,int readVersionFlags,TransactionInfo info)1456 ACTOR Future< Void > watchValue( Future<Version> version, Key key, Optional<Value> value, Database cx, int readVersionFlags, TransactionInfo info )
1457 {
1458 	state Version ver = wait( version );
1459 	validateVersion(ver);
1460 	ASSERT(ver != latestVersion);
1461 
1462 	loop {
1463 		state pair<KeyRange, Reference<LocationInfo>> ssi = wait( getKeyLocation(cx, key, &StorageServerInterface::watchValue, info ) );
1464 
1465 		try {
1466 			state Optional<UID> watchValueID = Optional<UID>();
1467 			if( info.debugID.present() ) {
1468 				watchValueID = g_nondeterministic_random->randomUniqueID();
1469 
1470 				g_traceBatch.addAttach("WatchValueAttachID", info.debugID.get().first(), watchValueID.get().first());
1471 				g_traceBatch.addEvent("WatchValueDebug", watchValueID.get().first(), "NativeAPI.watchValue.Before"); //.detail("TaskID", g_network->getCurrentTask());
1472 			}
1473 			state Version resp = wait( loadBalance( ssi.second, &StorageServerInterface::watchValue, WatchValueRequest(key, value, ver, watchValueID), TaskDefaultPromiseEndpoint ) );
1474 			if( info.debugID.present() ) {
1475 				g_traceBatch.addEvent("WatchValueDebug", watchValueID.get().first(), "NativeAPI.watchValue.After"); //.detail("TaskID", g_network->getCurrentTask());
1476 			}
1477 
1478 			//FIXME: wait for known committed version on the storage server before replying,
1479 			//cannot do this until the storage server is notified on knownCommittedVersion changes from tlog (faster than the current update loop)
1480 			Version v = wait( waitForCommittedVersion( cx, resp ) );
1481 
1482 			//TraceEvent("WatcherCommitted").detail("CommittedVersion", v).detail("WatchVersion", resp).detail("Key",  key ).detail("Value", value);
1483 
1484 			if( v - resp < 50000000 ) // False if there is a master failure between getting the response and getting the committed version, Dependent on SERVER_KNOBS->MAX_VERSIONS_IN_FLIGHT
1485 				return Void();
			// Too large a gap: watch again starting from the committed version.
1486 			ver = v;
1487 		} catch (Error& e) {
1488 			if (e.code() == error_code_wrong_shard_server || e.code() == error_code_all_alternatives_failed) {
1489 				cx->invalidateCache( key );
1490 				wait(delay(CLIENT_KNOBS->WRONG_SHARD_SERVER_DELAY, info.taskID));
1491 			} else if( e.code() == error_code_watch_cancelled || e.code() == error_code_process_behind ) {
1492 				TEST( e.code() == error_code_watch_cancelled ); // Too many watches on the storage server, poll for changes instead
1493 				TEST( e.code() == error_code_process_behind ); // The storage servers are all behind
1494 				wait(delay(CLIENT_KNOBS->WATCH_POLLING_TIME, info.taskID));
1495 			} else if ( e.code() == error_code_timed_out ) { //The storage server occasionally times out watches in case it was cancelled
1496 				TEST( true ); // A watch timed out
1497 				wait(delay(CLIENT_KNOBS->FUTURE_VERSION_RETRY_DELAY, info.taskID));
1498 			} else {
				// The error is copied into a state variable so it is still available after the wait.
1499 				state Error err = e;
1500 				wait(delay(CLIENT_KNOBS->FUTURE_VERSION_RETRY_DELAY, info.taskID));
1501 				throw err;
1502 			}
1503 		}
1504 	}
1505 }
1506 
transformRangeLimits(GetRangeLimits limits,bool reverse,GetKeyValuesRequest & req)1507 void transformRangeLimits(GetRangeLimits limits, bool reverse, GetKeyValuesRequest &req) {
1508 	if(limits.bytes != 0) {
1509 		if(!limits.hasRowLimit())
1510 			req.limit = CLIENT_KNOBS->REPLY_BYTE_LIMIT; // Can't get more than this many rows anyway
1511 		else
1512 			req.limit = std::min( CLIENT_KNOBS->REPLY_BYTE_LIMIT, limits.rows );
1513 
1514 		if(reverse)
1515 			req.limit *= -1;
1516 
1517 		if(!limits.hasByteLimit())
1518 			req.limitBytes = CLIENT_KNOBS->REPLY_BYTE_LIMIT;
1519 		else
1520 			req.limitBytes = std::min( CLIENT_KNOBS->REPLY_BYTE_LIMIT, limits.bytes );
1521 	}
1522 	else {
1523 		req.limitBytes = CLIENT_KNOBS->REPLY_BYTE_LIMIT;
1524 		req.limit = reverse ? -limits.minRows : limits.minRows;
1525 	}
1526 }
1527 
// Reads the key-value pairs of the concrete key range `keys` at `version`, subject to `limits`,
// walking the range backwards when `reverse` is set.
//
// Outer loop: asks getKeyRangeLocations() for up to GET_RANGE_SHARD_LIMIT shard locations covering
// the remaining `keys`. Inner loop: reads the shards one at a time with GetKeyValuesRequests,
// appending each reply to `output` and decrementing `limits`.
//
// Returns with output.more == true when the limits are reached or the soft minimum-rows condition
// is satisfied with a non-empty result, and with output.more == false when the whole range has been
// consumed. wrong_shard_server / all_alternatives_failed shrink `keys` to the unread remainder,
// invalidate the cache for it and restart the outer loop after WRONG_SHARD_SERVER_DELAY; any other
// error is traced ("GetExactRangeError") and rethrown.
getExactRange(Database cx,Version version,KeyRange keys,GetRangeLimits limits,bool reverse,TransactionInfo info)1528 ACTOR Future<Standalone<RangeResultRef>> getExactRange( Database cx, Version version,
1529 	KeyRange keys, GetRangeLimits limits, bool reverse, TransactionInfo info )
1530 {
1531 	state Standalone<RangeResultRef> output;
1532 
1533 	//printf("getExactRange( '%s', '%s' )\n", keys.begin.toString().c_str(), keys.end.toString().c_str());
1534 	loop {
1535 		state vector< pair<KeyRange, Reference<LocationInfo>> > locations = wait( getKeyRangeLocations( cx, keys, CLIENT_KNOBS->GET_RANGE_SHARD_LIMIT, reverse, &StorageServerInterface::getKeyValues, info ) );
1536 		ASSERT( locations.size() );
		// Index of the shard in `locations` currently being read.
1537 		state int shard = 0;
1538 		loop {
1539 			const KeyRangeRef& range = locations[shard].first;
1540 
			// Request exactly the (remaining part of the) current shard's range.
1541 			GetKeyValuesRequest req;
1542 			req.version = version;
1543 			req.begin = firstGreaterOrEqual( range.begin );
1544 			req.end = firstGreaterOrEqual( range.end );
1545 
1546 			transformRangeLimits(limits, reverse, req);
1547 			ASSERT(req.limitBytes > 0 && req.limit != 0 && req.limit < 0 == reverse);
1548 
1549 			//FIXME: buggify byte limits on internal functions that use them, instead of globally
1550 			req.debugID = info.debugID;
1551 
1552 			try {
1553 				if( info.debugID.present() ) {
1554 					g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getExactRange.Before");
1555 					/*TraceEvent("TransactionDebugGetExactRangeInfo", info.debugID.get())
1556 						.detail("ReqBeginKey", req.begin.getKey())
1557 						.detail("ReqEndKey", req.end.getKey())
1558 						.detail("ReqLimit", req.limit)
1559 						.detail("ReqLimitBytes", req.limitBytes)
1560 						.detail("ReqVersion", req.version)
1561 						.detail("Reverse", reverse)
1562 						.detail("Servers", locations[shard].second->description());*/
1563 				}
1564 				++cx->transactionPhysicalReads;
1565 				GetKeyValuesReply rep = wait( loadBalance( locations[shard].second, &StorageServerInterface::getKeyValues, req, TaskDefaultPromiseEndpoint, false, cx->enableLocalityLoadBalance ? &cx->queueModel : NULL ) );
1566 				if( info.debugID.present() )
1567 					g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getExactRange.After");
				// Tie the reply's arena to the output's arena before appending the reply's rows.
1568 				output.arena().dependsOn( rep.arena );
1569 				output.append( output.arena(), rep.data.begin(), rep.data.size() );
1570 
				// A reply with more rows than the remaining row limit is treated as a fatal internal error.
1571 				if( limits.hasRowLimit() && rep.data.size() > limits.rows ) {
1572 					TraceEvent(SevError, "GetExactRangeTooManyRows").detail("RowLimit", limits.rows).detail("DeliveredRows", output.size());
1573 					ASSERT( false );
1574 				}
1575 				limits.decrement( rep.data );
1576 
1577 				if (limits.isReached()) {
1578 					output.more = true;
1579 					return output;
1580 				}
1581 
1582 				bool more = rep.more;
1583 				// If the reply says there is more but we know that we finished the shard, then fix rep.more
1584 				if( reverse && more && rep.data.size() > 0 && output[output.size()-1].key == locations[shard].first.begin )
1585 					more = false;
1586 
1587 				if (more) {
1588 					if( !rep.data.size() ) {
1589 						TraceEvent(SevError, "GetExactRangeError").detail("Reason", "More data indicated but no rows present")
1590 							.detail("LimitBytes", limits.bytes).detail("LimitRows", limits.rows)
1591 							.detail("OutputSize", output.size()).detail("OutputBytes", output.expectedSize())
1592 							.detail("BlockSize", rep.data.size()).detail("BlockBytes", rep.data.expectedSize());
1593 						ASSERT( false );
1594 					}
1595 					TEST(true);   // GetKeyValuesReply.more in getExactRange
1596 					// Make next request to the same shard with a beginning key just after the last key returned
1597 					if( reverse )
1598 						locations[shard].first = KeyRangeRef( locations[shard].first.begin, output[output.size()-1].key );
1599 					else
1600 						locations[shard].first = KeyRangeRef( keyAfter( output[output.size()-1].key ), locations[shard].first.end );
1601 				}
1602 
				// Current shard exhausted: move to the next one, or, if it was the last known location,
				// either finish or narrow `keys` to the unread remainder and fetch more locations.
1603 				if (!more || locations[shard].first.empty()) {
1604 					TEST(true);
1605 					if(shard == locations.size()-1) {
1606 						const KeyRangeRef& range = locations[shard].first;
1607 						KeyRef begin = reverse ? keys.begin : range.end;
1608 						KeyRef end = reverse ? range.begin : keys.end;
1609 
1610 						if(begin >= end) {
1611 							output.more = false;
1612 							return output;
1613 						}
1614 						TEST(true); //Multiple requests of key locations
1615 
1616 						keys = KeyRangeRef(begin, end);
1617 						break;
1618 					}
1619 
1620 					++shard;
1621 				}
1622 
1623 				// Soft byte limit - return results early if the user specified a byte limit and we got results
1624 				// This can prevent problems where the desired range spans many shards and would be too slow to
1625 				// fetch entirely.
1626 				if(limits.hasSatisfiedMinRows() && output.size() > 0) {
1627 					output.more = true;
1628 					return output;
1629 				}
1630 
1631 			} catch (Error& e) {
1632 				if (e.code() == error_code_wrong_shard_server || e.code() == error_code_all_alternatives_failed) {
1633 					const KeyRangeRef& range = locations[shard].first;
1634 
					// Keep only the part of `keys` not yet read, then refresh locations for it.
1635 					if( reverse )
1636 						keys = KeyRangeRef( keys.begin, range.end );
1637 					else
1638 						keys = KeyRangeRef( range.begin, keys.end );
1639 
1640 					cx->invalidateCache( keys );
1641 					wait( delay(CLIENT_KNOBS->WRONG_SHARD_SERVER_DELAY, info.taskID ));
1642 					break;
1643 				} else {
1644 					TraceEvent(SevInfo, "GetExactRangeError")
1645 						.error(e)
1646 						.detail("ShardBegin", locations[shard].first.begin)
1647 						.detail("ShardEnd", locations[shard].first.end);
1648 					throw;
1649 				}
1650 			}
1651 		}
1652 	}
1653 }
1654 
resolveKey(Database const & cx,KeySelector const & key,Version const & version,TransactionInfo const & info)1655 Future<Key> resolveKey( Database const& cx, KeySelector const& key, Version const& version, TransactionInfo const& info ) {
1656 	if( key.isFirstGreaterOrEqual() )
1657 		return Future<Key>( key.getKey() );
1658 
1659 	if( key.isFirstGreaterThan() )
1660 		return Future<Key>( keyAfter( key.getKey() ) );
1661 
1662 	return getKey( cx, key, version, info );
1663 }
1664 
// Range read that first resolves both key selectors to concrete keys and then reads the resulting
// key range with getExactRange().
//
// If `version` is latestVersion, a concrete read version is first obtained from a temporary
// transaction configured with CAUSAL_READ_RISKY, LOCK_AWARE and PRIORITY_SYSTEM_IMMEDIATE.
// Returns an empty result when the resolved begin key is not before the resolved end key.
// Sets readToBegin / readThroughEnd on the result when the resolved range touches allKeys.begin /
// allKeys.end and the read direction and `more` flag show that boundary was actually covered.
getRangeFallback(Database cx,Version version,KeySelector begin,KeySelector end,GetRangeLimits limits,bool reverse,TransactionInfo info)1665 ACTOR Future<Standalone<RangeResultRef>> getRangeFallback( Database cx, Version version,
1666 	KeySelector begin, KeySelector end, GetRangeLimits limits, bool reverse, TransactionInfo info )
1667 {
1668 	if(version == latestVersion) {
1669 		state Transaction transaction(cx);
1670 		transaction.setOption(FDBTransactionOptions::CAUSAL_READ_RISKY);
1671 		transaction.setOption(FDBTransactionOptions::LOCK_AWARE);
1672 		transaction.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
1673 		Version ver = wait( transaction.getReadVersion() );
1674 		version = ver;
1675 	}
1676 
	// Both selector resolutions are started before either is waited on.
1677 	Future<Key> fb = resolveKey(cx, begin, version, info);
1678 	state Future<Key> fe = resolveKey(cx, end, version, info);
1679 
1680 	state Key b = wait(fb);
1681 	state Key e = wait(fe);
1682 	if (b >= e) {
1683 		return Standalone<RangeResultRef>();
1684 	}
1685 
1686 	//if e is allKeys.end, we have read through the end of the database
1687 	//if b is allKeys.begin, we have either read through the beginning of the database,
1688 	//or allKeys.begin exists in the database and will be part of the conflict range anyways
1689 
	// The waited result is copied into `r` so that its flags can be modified below.
1690 	Standalone<RangeResultRef> _r = wait( getExactRange(cx, version, KeyRangeRef(b, e), limits, reverse, info) );
1691 	Standalone<RangeResultRef> r = _r;
1692 
1693 	if(b == allKeys.begin && ((reverse && !r.more) || !reverse))
1694 		r.readToBegin = true;
1695 	if(e == allKeys.end && ((!reverse && !r.more) || reverse))
1696 		r.readThroughEnd = true;
1697 
1698 
1699 	ASSERT( !limits.hasRowLimit() || r.size() <= limits.rows );
1700 
1701 	// If we were limiting bytes (with no minimum row count) and the result is larger than the byte limit plus SYSTEM_KEY_SIZE_LIMIT + VALUE_SIZE_LIMIT + 1, log a warning
1702 	if( limits.hasByteLimit() && r.expectedSize() > size_t(limits.bytes + CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT + CLIENT_KNOBS->VALUE_SIZE_LIMIT + 1) && limits.minRows == 0 ) {
1703 		TraceEvent(SevWarnAlways, "GetRangeFallbackTooMuchData")
1704 			.detail("LimitBytes", limits.bytes)
1705 			.detail("DeliveredBytes", r.expectedSize())
1706 			.detail("LimitRows", limits.rows)
1707 			.detail("DeliveredRows", r.size());
1708 	}
1709 
1710 	return r;
1711 }
1712 
getRangeFinished(Reference<TransactionLogInfo> trLogInfo,double startTime,KeySelector begin,KeySelector end,bool snapshot,Promise<std::pair<Key,Key>> conflictRange,bool reverse,Standalone<RangeResultRef> result)1713 void getRangeFinished(Reference<TransactionLogInfo> trLogInfo, double startTime, KeySelector begin, KeySelector end, bool snapshot,
1714 	Promise<std::pair<Key, Key>> conflictRange, bool reverse, Standalone<RangeResultRef> result)
1715 {
1716 	if( trLogInfo ) {
1717 		int rangeSize = 0;
1718 		for (const KeyValueRef &kv : result.contents())
1719 			rangeSize += kv.key.size() + kv.value.size();
1720 		trLogInfo->addLog(FdbClientLogEvents::EventGetRange(startTime, now()-startTime, rangeSize, begin.getKey(), end.getKey()));
1721 	}
1722 
1723 	if( !snapshot ) {
1724 		Key rangeBegin;
1725 		Key rangeEnd;
1726 
1727 		if(result.readToBegin) {
1728 			rangeBegin = allKeys.begin;
1729 		}
1730 		else if(((!reverse || !result.more || begin.offset > 1) && begin.offset > 0) || result.size() == 0) {
1731 			rangeBegin = Key(begin.getKey(), begin.arena());
1732 		}
1733 		else {
1734 			rangeBegin = reverse ? result.end()[-1].key : result[0].key;
1735 		}
1736 
1737 		if(end.offset > begin.offset && end.getKey() < rangeBegin) {
1738 			rangeBegin = Key(end.getKey(), end.arena());
1739 		}
1740 
1741 		if(result.readThroughEnd) {
1742 			rangeEnd = allKeys.end;
1743 		}
1744 		else if(((reverse || !result.more || end.offset <= 0) && end.offset <= 1) || result.size() == 0) {
1745 			rangeEnd = Key(end.getKey(), end.arena());
1746 		}
1747 		else {
1748 			rangeEnd = keyAfter(reverse ? result[0].key : result.end()[-1].key);
1749 		}
1750 
1751 		if(begin.offset < end.offset && begin.getKey() > rangeEnd) {
1752 			rangeEnd = Key(begin.getKey(), begin.arena());
1753 		}
1754 
1755 		conflictRange.send(std::make_pair(rangeBegin, rangeEnd));
1756 	}
1757 }
1758 
// Range read between two key selectors at the version produced by `fVersion`.
//
// The actor walks the range shard by shard. For each step it locates the shard holding the current
// begin (or, for reverse reads, end) selector, clamps the far selector to the shard boundary when
// it definitely lies beyond it (modifiedSelectors), and sends a GetKeyValuesRequest. Replies are
// accumulated in `output` until the limits are reached, the minimum-rows condition is satisfied,
// or an unclamped request reports no more data.
//
// Falls back to getRangeFallback() with the original selectors and limits when a clamped request
// returns no rows and no more data, or when a storage server answers wrong_shard_server.
// all_alternatives_failed, and transaction_too_old at latestVersion, invalidate the cached location
// and retry after WRONG_SHARD_SERVER_DELAY. Other errors are logged to trLogInfo (when set) and rethrown.
//
// On every successful return getRangeFinished() is called with the original selectors; it logs the
// read and, for non-snapshot reads, sends the conflict range. If the actor exits with an error and
// `conflictRange` can still be set, an empty (Key(), Key()) range is sent before rethrowing.
getRange(Database cx,Reference<TransactionLogInfo> trLogInfo,Future<Version> fVersion,KeySelector begin,KeySelector end,GetRangeLimits limits,Promise<std::pair<Key,Key>> conflictRange,bool snapshot,bool reverse,TransactionInfo info)1759 ACTOR Future<Standalone<RangeResultRef>> getRange( Database cx, Reference<TransactionLogInfo> trLogInfo, Future<Version> fVersion,
1760 	KeySelector begin, KeySelector end, GetRangeLimits limits, Promise<std::pair<Key, Key>> conflictRange, bool snapshot, bool reverse,
1761 	TransactionInfo info )
1762 {
	// The original arguments are kept for the fallback path and for getRangeFinished();
	// begin/end/limits themselves are modified as the read progresses.
1763 	state GetRangeLimits originalLimits( limits );
1764 	state KeySelector originalBegin = begin;
1765 	state KeySelector originalEnd = end;
1766 	state Standalone<RangeResultRef> output;
1767 
1768 	try {
1769 		state Version version = wait( fVersion );
1770 		validateVersion(version);
1771 
1772 		state double startTime = now();
1773 		state Version readVersion = version; // Needed for latestVersion requests; if more, make future requests at the version that the first one completed
1774 											 // FIXME: Is this really right?  Weaken this and see if there is a problem; if so maybe there is a much subtler problem even with this.
1775 
		// A begin selector at allKeys.begin with offset < 1 is replaced by firstGreaterOrEqual of
		// the same key, and the result is flagged as read-to-begin.
1776 		if( begin.getKey() == allKeys.begin && begin.offset < 1 ) {
1777 			output.readToBegin = true;
1778 			begin = KeySelector(firstGreaterOrEqual( begin.getKey() ), begin.arena());
1779 		}
1780 
1781 		ASSERT( !limits.isReached() );
1782 		ASSERT( (!limits.hasRowLimit() || limits.rows >= limits.minRows) && limits.minRows >= 0 );
1783 
1784 		loop {
			// Nothing (more) to read when the end selector sits at allKeys.begin with offset < 1 or as firstGreaterOrEqual.
1785 			if( end.getKey() == allKeys.begin && (end.offset < 1 || end.isFirstGreaterOrEqual()) ) {
1786 				getRangeFinished(trLogInfo, startTime, originalBegin, originalEnd, snapshot, conflictRange, reverse, output);
1787 				return output;
1788 			}
1789 
			// Forward reads start from the shard of the begin selector, reverse reads from the shard of the end selector.
1790 			Key locationKey = reverse ? Key(end.getKey(), end.arena()) : Key(begin.getKey(), begin.arena());
1791 			bool locationBackward = reverse ? (end-1).isBackward() : begin.isBackward();
1792 			state pair<KeyRange, Reference<LocationInfo>> beginServer = wait( getKeyLocation( cx, locationKey, &StorageServerInterface::getKeyValues, info, locationBackward ) );
1793 			state KeyRange shard = beginServer.first;
			// True when the far selector of this request was clamped to the shard boundary.
1794 			state bool modifiedSelectors = false;
1795 			state GetKeyValuesRequest req;
1796 
1797 			req.version = readVersion;
1798 
1799 			if( reverse && (begin-1).isDefinitelyLess(shard.begin) &&
1800 				( !begin.isFirstGreaterOrEqual() || begin.getKey() != shard.begin ) ) { //In this case we would be setting modifiedSelectors to true, but not modifying anything
1801 
1802 				req.begin = firstGreaterOrEqual( shard.begin );
1803 				modifiedSelectors = true;
1804 			}
1805 			else req.begin = begin;
1806 
1807 			if( !reverse && end.isDefinitelyGreater(shard.end) ) {
1808 				req.end = firstGreaterOrEqual( shard.end );
1809 				modifiedSelectors = true;
1810 			}
1811 			else req.end = end;
1812 
1813 			transformRangeLimits(limits, reverse, req);
1814 			ASSERT(req.limitBytes > 0 && req.limit != 0 && req.limit < 0 == reverse);
1815 
1816 			req.debugID = info.debugID;
1817 			try {
1818 				if( info.debugID.present() ) {
1819 					g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getRange.Before");
1820 					/*TraceEvent("TransactionDebugGetRangeInfo", info.debugID.get())
1821 						.detail("ReqBeginKey", req.begin.getKey())
1822 						.detail("ReqEndKey", req.end.getKey())
1823 						.detail("OriginalBegin", originalBegin.toString())
1824 						.detail("OriginalEnd", originalEnd.toString())
1825 						.detail("Begin", begin.toString())
1826 						.detail("End", end.toString())
1827 						.detail("Shard", shard)
1828 						.detail("ReqLimit", req.limit)
1829 						.detail("ReqLimitBytes", req.limitBytes)
1830 						.detail("ReqVersion", req.version)
1831 						.detail("Reverse", reverse)
1832 						.detail("ModifiedSelectors", modifiedSelectors)
1833 						.detail("Servers", beginServer.second->description());*/
1834 				}
1835 
1836 				++cx->transactionPhysicalReads;
1837 				GetKeyValuesReply rep = wait( loadBalance(beginServer.second, &StorageServerInterface::getKeyValues, req, TaskDefaultPromiseEndpoint, false, cx->enableLocalityLoadBalance ? &cx->queueModel : NULL ) );
1838 
1839 				if( info.debugID.present() ) {
1840 					g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getRange.After");//.detail("SizeOf", rep.data.size());
1841 					/*TraceEvent("TransactionDebugGetRangeDone", info.debugID.get())
1842 						.detail("ReqBeginKey", req.begin.getKey())
1843 						.detail("ReqEndKey", req.end.getKey())
1844 						.detail("RepIsMore", rep.more)
1845 						.detail("VersionReturned", rep.version)
1846 						.detail("RowsReturned", rep.data.size());*/
1847 				}
1848 
1849 				ASSERT( !rep.more || rep.data.size() );
1850 				ASSERT( !limits.hasRowLimit() || rep.data.size() <= limits.rows );
1851 
1852 				limits.decrement( rep.data );
1853 
				// Reverse read whose last returned row is exactly the lastLessOrEqual begin key:
				// the clamped request is treated as if the selectors had not been modified.
1854 				if(reverse && begin.isLastLessOrEqual() && rep.data.size() && rep.data.end()[-1].key == begin.getKey()) {
1855 					modifiedSelectors = false;
1856 				}
1857 
1858 				bool finished = limits.isReached() || ( !modifiedSelectors && !rep.more ) || limits.hasSatisfiedMinRows();
1859 				bool readThrough = modifiedSelectors && !rep.more;
1860 
1861 				// optimization: first request got all data--just return it
1862 				if( finished && !output.size() ) {
					// Preserve the boundary flags already set on `output` across its replacement.
1863 					bool readToBegin = output.readToBegin;
1864 					bool readThroughEnd = output.readThroughEnd;
1865 
1866 					output = Standalone<RangeResultRef>( RangeResultRef( rep.data, modifiedSelectors || limits.isReached() || rep.more ), rep.arena );
1867 					output.readToBegin = readToBegin;
1868 					output.readThroughEnd = readThroughEnd;
1869 
					// Under BUGGIFY, randomly truncate a byte-limited result (keeping at least
					// max(1, minRows) rows) and report that more data is available.
1870 					if( BUGGIFY && limits.hasByteLimit() && output.size() > std::max(1, originalLimits.minRows) ) {
1871 						output.more = true;
1872 						output.resize(output.arena(), g_random->randomInt(std::max(1,originalLimits.minRows),output.size()));
1873 						getRangeFinished(trLogInfo, startTime, originalBegin, originalEnd, snapshot, conflictRange, reverse, output);
1874 						return output;
1875 					}
1876 
1877 					if( readThrough ) {
1878 						output.arena().dependsOn( shard.arena() );
1879 						output.readThrough = reverse ? shard.begin : shard.end;
1880 					}
1881 
1882 					getRangeFinished(trLogInfo, startTime, originalBegin, originalEnd, snapshot, conflictRange, reverse, output);
1883 					return output;
1884 				}
1885 
1886 				output.arena().dependsOn( rep.arena );
1887 				output.append(output.arena(), rep.data.begin(), rep.data.size());
1888 
1889 				if( finished ) {
1890 					if( readThrough ) {
1891 						output.arena().dependsOn( shard.arena() );
1892 						output.readThrough = reverse ? shard.begin : shard.end;
1893 					}
1894 					output.more = modifiedSelectors || limits.isReached() || rep.more;
1895 
1896 					getRangeFinished(trLogInfo, startTime, originalBegin, originalEnd, snapshot, conflictRange, reverse, output);
1897 					return output;
1898 				}
1899 
1900 				readVersion = rep.version; // see above comment
1901 
				// Not finished: advance the selectors for the next request.
1902 				if( !rep.more ) {
1903 					ASSERT( modifiedSelectors );
1904 					TEST(true);  // !GetKeyValuesReply.more and modifiedSelectors in getRange
1905 
					// A clamped request that returned no rows: fall back to resolving the original selectors.
1906 					if( !rep.data.size() ) {
1907 						Standalone<RangeResultRef> result = wait( getRangeFallback(cx, version, originalBegin, originalEnd, originalLimits, reverse, info ) );
1908 						getRangeFinished(trLogInfo, startTime, originalBegin, originalEnd, snapshot, conflictRange, reverse, result);
1909 						return result;
1910 					}
1911 
					// This shard is exhausted; continue from its boundary.
1912 					if( reverse )
1913 						end = firstGreaterOrEqual( shard.begin );
1914 					else
1915 						begin = firstGreaterOrEqual( shard.end );
1916 				} else {
1917 					TEST(true);  // GetKeyValuesReply.more in getRange
					// More data in this shard; continue from the last key received.
1918 					if( reverse )
1919 						end = firstGreaterOrEqual( output[output.size()-1].key );
1920 					else
1921 						begin = firstGreaterThan( output[output.size()-1].key );
1922 				}
1923 
1924 
1925 			} catch ( Error& e ) {
1926 				if( info.debugID.present() ) {
1927 					g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getRange.Error");
1928 					TraceEvent("TransactionDebugError", info.debugID.get()).error(e);
1929 				}
1930 				if (e.code() == error_code_wrong_shard_server || e.code() == error_code_all_alternatives_failed ||
1931 					(e.code() == error_code_transaction_too_old && readVersion == latestVersion))
1932 				{
1933 					cx->invalidateCache( reverse ? end.getKey() : begin.getKey(), reverse ? (end-1).isBackward() : begin.isBackward() );
1934 
					// wrong_shard_server restarts the whole read through the fallback path instead of retrying here.
1935 					if (e.code() == error_code_wrong_shard_server) {
1936 						Standalone<RangeResultRef> result = wait( getRangeFallback(cx, version, originalBegin, originalEnd, originalLimits, reverse, info ) );
1937 						getRangeFinished(trLogInfo, startTime, originalBegin, originalEnd, snapshot, conflictRange, reverse, result);
1938 						return result;
1939 					}
1940 
1941 					wait(delay(CLIENT_KNOBS->WRONG_SHARD_SERVER_DELAY, info.taskID));
1942 				} else {
1943 					if (trLogInfo)
1944 						trLogInfo->addLog(FdbClientLogEvents::EventGetRangeError(startTime, static_cast<int>(e.code()), begin.getKey(), end.getKey()));
1945 
1946 					throw e;
1947 				}
1948 			}
1949 		}
1950 	}
1951 	catch(Error &e) {
		// Make sure the conflict range promise is fulfilled (with an empty range) on failure.
1952 		if(conflictRange.canBeSet()) {
1953 			conflictRange.send(std::make_pair(Key(), Key()));
1954 		}
1955 
1956 		throw;
1957 	}
1958 }
1959 
getRange(Database const & cx,Future<Version> const & fVersion,KeySelector const & begin,KeySelector const & end,GetRangeLimits const & limits,bool const & reverse,TransactionInfo const & info)1960 Future<Standalone<RangeResultRef>> getRange( Database const& cx, Future<Version> const& fVersion, KeySelector const& begin, KeySelector const& end,
1961 	GetRangeLimits const& limits, bool const& reverse, TransactionInfo const& info )
1962 {
1963 	return getRange(cx, Reference<TransactionLogInfo>(), fVersion, begin, end, limits, Promise<std::pair<Key, Key>>(), true, reverse, info);
1964 }
1965 
// Creates a transaction on database `cx`.
// Starts with the database's task ID, the default backoff (CLIENT_KNOBS->DEFAULT_BACKOFF), no
// committed version, a fresh versionstamp promise, zero errors, and whatever log info
// createTrLogInfoProbabilistically(cx) returns. The priority is set to PRIORITY_DEFAULT.
Transaction(Database const & cx)1966 Transaction::Transaction( Database const& cx )
1967 	: cx(cx), info(cx->taskID), backoff(CLIENT_KNOBS->DEFAULT_BACKOFF), committedVersion(invalidVersion), versionstampPromise(Promise<Standalone<StringRef>>()), options(cx), numErrors(0), trLogInfo(createTrLogInfoProbabilistically(cx))
1968 {
1969 	setPriority(GetReadVersionRequest::PRIORITY_DEFAULT);
1970 }
1971 
// Flushes any pending client transaction log data, then cancels this transaction's watches.
~Transaction()1972 Transaction::~Transaction() {
1973 	flushTrLogsIfEnabled();
1974 	cancelWatches();
1975 }
1976 
// Move-assignment: replaces this transaction's state with that of `r`.
// Pending log data of the transaction being overwritten is flushed first, while its own cx and
// trLogInfo are still in place.
// Note that info, backoff, numErrors, committedVersion and watches are copied from `r`, not moved,
// so `r` still holds those values afterwards.
operator =(Transaction && r)1977 void Transaction::operator=(Transaction&& r) BOOST_NOEXCEPT {
1978 	flushTrLogsIfEnabled();
1979 	cx = std::move(r.cx);
1980 	tr = std::move(r.tr);
1981 	readVersion = std::move(r.readVersion);
1982 	metadataVersion = std::move(r.metadataVersion);
1983 	extraConflictRanges = std::move(r.extraConflictRanges);
1984 	commitResult = std::move(r.commitResult);
1985 	committing = std::move(r.committing);
1986 	options = std::move(r.options);
1987 	info = r.info;
1988 	backoff = r.backoff;
1989 	numErrors = r.numErrors;
1990 	committedVersion = r.committedVersion;
1991 	versionstampPromise = std::move(r.versionstampPromise);
	// NOTE(review): copied rather than moved, unlike the other containers; confirm this is intentional.
1992 	watches = r.watches;
1993 	trLogInfo = std::move(r.trLogInfo);
1994 }
1995 
flushTrLogsIfEnabled()1996 void Transaction::flushTrLogsIfEnabled() {
1997 	if (trLogInfo && trLogInfo->logsAdded && trLogInfo->trLogWriter.getData()) {
1998 		ASSERT(trLogInfo->flushed == false);
1999 		cx->clientStatusUpdater.inStatusQ.push_back({ trLogInfo->identifier, std::move(trLogInfo->trLogWriter) });
2000 		trLogInfo->flushed = true;
2001 	}
2002 }
2003 
// Sets the read version of this transaction explicitly to `v`.
// Throws read_version_already_set if a read version is already valid, and version_invalid if
// v <= 0. startTime is reset to now() before either check, i.e. even when the call throws.
setVersion(Version v)2004 void Transaction::setVersion( Version v ) {
2005 	startTime = now();
2006 	if (readVersion.isValid())
2007 		throw read_version_already_set();
2008 	if (v <= 0)
2009 		throw version_invalid();
2010 	readVersion = v;
2011 }
2012 
// Reads `key` in this transaction and returns a future for its optional value.
//
// - Counts one logical read on the database context.
// - Keys longer than the applicable size limit (SYSTEM_KEY_SIZE_LIMIT for keys under
//   systemKeys.begin, KEY_SIZE_LIMIT otherwise) yield an empty Optional immediately, before a read
//   version is requested or a conflict range is added.
// - Unless `snapshot` is set, a single-key read conflict range is added for the key.
// - Reads of metadataVersionKey are served from this transaction's metadataVersion promise or from
//   cx->metadataVersionCache when possible.
// - Everything else, including a metadata-version cache miss, goes through getValue().
get(const Key & key,bool snapshot)2013 Future<Optional<Value>> Transaction::get( const Key& key, bool snapshot ) {
2014 	++cx->transactionLogicalReads;
2015 	//ASSERT (key < allKeys.end);
2016 
2017 	//There are no keys in the database with size greater than KEY_SIZE_LIMIT
2018 	if(key.size() > (key.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT))
2019 		return Optional<Value>();
2020 
2021 	auto ver = getReadVersion();
2022 
2023 /*	if (!systemKeys.contains(key))
2024 		return Optional<Value>(Value()); */
2025 
2026 	if( !snapshot )
2027 		tr.transaction.read_conflict_ranges.push_back(tr.arena, singleKeyRange(key, tr.arena));
2028 
2029 	if(key == metadataVersionKey) {
		// If the read version is not ready yet, or the metadata version is already known for this
		// transaction, answer from the metadataVersion promise.
2030 		if(!ver.isReady() || metadataVersion.isSet()) {
2031 			return metadataVersion.getFuture();
2032 		} else {
2033 			if(ver.isError()) return ver.getError();
			// Fast path: the entry at the current insert location matches the read version.
2034 			if(ver.get() == cx->metadataVersionCache[cx->mvCacheInsertLocation].first) {
2035 				return cx->metadataVersionCache[cx->mvCacheInsertLocation].second;
2036 			}
2037 
			// Otherwise search the cache for the read version. The indices wrap around modulo the
			// cache size, starting with lo just after the insert location and hi at it.
			// NOTE(review): this looks like a binary search over a circular buffer ordered by
			// version; confirm the ordering invariant against the code that fills the cache.
2038 			Version v = ver.get();
2039 			int hi = cx->mvCacheInsertLocation;
2040 			int lo = (cx->mvCacheInsertLocation+1)%cx->metadataVersionCache.size();
2041 
2042 			while(hi!=lo) {
				// Midpoint between lo and hi, accounting for wrap-around when hi <= lo.
2043 				int cu = hi > lo ? (hi + lo)/2 : ((hi + cx->metadataVersionCache.size() + lo)/2)%cx->metadataVersionCache.size();
2044 				if(v == cx->metadataVersionCache[cu].first) {
2045 					return cx->metadataVersionCache[cu].second;
2046 				}
2047 				if(cu == lo) {
2048 					break;
2049 				}
2050 				if(v < cx->metadataVersionCache[cu].first) {
2051 					hi = cu;
2052 				} else {
2053 					lo = (cu+1)%cx->metadataVersionCache.size();
2054 				}
2055 			}
			// Cache miss: fall through to a regular read below.
2056 		}
2057 	}
2058 
2059 	return getValue( ver, key, cx, info, trLogInfo );
2060 }
2061 
setWatch(Future<Void> watchFuture)2062 void Watch::setWatch(Future<Void> watchFuture) {
2063 	this->watchFuture = watchFuture;
2064 
2065 	//Cause the watch loop to go around and start waiting on watchFuture
2066 	onSetWatchTrigger.send(Void());
2067 }
2068 
//FIXME: This seems pretty horrible. Now a Database can't die until all of its watches do...
// Actor backing Transaction::watch.
// Appends `watch` to self->watches, then completes when either onChangeTrigger fires, or onSetWatchTrigger fires
// and the watchFuture installed by Watch::setWatch subsequently completes.
// cx->addWatch() is paired with exactly one cx->removeWatch() on both the success path and the error path.
ACTOR Future<Void> watch( Reference<Watch> watch, Database cx, Transaction *self ) {
	cx->addWatch();
	try {
		self->watches.push_back(watch);

		choose {
			// RYOW write to value that is being watched (if applicable)
			// Errors
			when(wait(watch->onChangeTrigger.getFuture())) { }

			// NativeAPI finished commit and updated watchFuture
			when(wait(watch->onSetWatchTrigger.getFuture())) {

				// NativeAPI watchValue future finishes or errors
				wait(watch->watchFuture);
			}
		}
	}
	catch(Error &e) {
		// Release the watch count before propagating the error.
		cx->removeWatch();
		throw;
	}

	cx->removeWatch();
	return Void();
}
2096 
// Starts the free-standing ::watch actor for `watch` against this transaction and its database.
// The actor receives a raw pointer to this transaction and appends `watch` to its `watches` list.
Future< Void > Transaction::watch( Reference<Watch> watch ) {
	return ::watch(watch, cx, this);
}
2100 
getAddressesForKeyActor(Key key,Future<Version> ver,Database cx,TransactionInfo info)2101 ACTOR Future< Standalone< VectorRef< const char*>>> getAddressesForKeyActor( Key key, Future<Version> ver, Database cx, TransactionInfo info ) {
2102 	state vector<StorageServerInterface> ssi;
2103 
2104 	// If key >= allKeys.end, then getRange will return a kv-pair with an empty value. This will result in our serverInterfaces vector being empty, which will cause us to return an empty addresses list.
2105 
2106 	state Key ksKey = keyServersKey(key);
2107 	Future<Standalone<RangeResultRef>> futureServerUids = getRange(cx, ver, lastLessOrEqual(ksKey), firstGreaterThan(ksKey), GetRangeLimits(1), false, info);
2108 	Standalone<RangeResultRef> serverUids = wait( futureServerUids );
2109 
2110 	ASSERT( serverUids.size() ); // every shard needs to have a team
2111 
2112 	vector<UID> src;
2113 	vector<UID> ignore; // 'ignore' is so named because it is the vector into which we decode the 'dest' servers in the case where this key is being relocated. But 'src' is the canonical location until the move is finished, because it could be cancelled at any time.
2114 	decodeKeyServersValue(serverUids[0].value, src, ignore);
2115 	Optional<vector<StorageServerInterface>> serverInterfaces = wait( transactionalGetServerInterfaces(ver, cx, info, src) );
2116 
2117 	ASSERT( serverInterfaces.present() );  // since this is happening transactionally, /FF/keyServers and /FF/serverList need to be consistent with one another
2118 	ssi = serverInterfaces.get();
2119 
2120 	Standalone<VectorRef<const char*>> addresses;
2121 	for (auto i : ssi) {
2122 		std::string ipString = i.address().ip.toString();
2123 		char* c_string = new (addresses.arena()) char[ipString.length()+1];
2124 		strcpy(c_string, ipString.c_str());
2125 		addresses.push_back(addresses.arena(), c_string);
2126 	}
2127 	return addresses;
2128 }
2129 
getAddressesForKey(const Key & key)2130 Future< Standalone< VectorRef< const char*>>> Transaction::getAddressesForKey( const Key& key ) {
2131 	++cx->transactionLogicalReads;
2132 	auto ver = getReadVersion();
2133 
2134 	return getAddressesForKeyActor(key, ver, cx, info);
2135 }
2136 
// Resolves key selector `k` at `version` and reports, through `conflictRange`, the key range the result depended on.
//   offset <= 0 : the pair is (resolved key, selector key), where the selector key is replaced by keyAfter() of
//                 itself when k.orEqual is set.
//   offset >  0 : the pair is (selector key, keyAfter(resolved key)), with the same orEqual adjustment applied
//                 to the selector key.
// On error an empty (Key(), Key()) pair is sent, so the promise is fulfilled on every path, and the error is rethrown.
ACTOR Future< Key > getKeyAndConflictRange(
	Database cx, KeySelector k, Future<Version> version, Promise<std::pair<Key, Key>> conflictRange, TransactionInfo info)
{
	try {
		Key rep = wait( getKey(cx, k, version, info) );
		if( k.offset <= 0 )
			conflictRange.send( std::make_pair( rep, k.orEqual ? keyAfter( k.getKey() ) : Key(k.getKey(), k.arena()) ) );
		else
			conflictRange.send( std::make_pair( k.orEqual ? keyAfter( k.getKey() ) : Key(k.getKey(), k.arena()), keyAfter( rep ) ) );
		return std::move(rep);
	} catch( Error&e ) {
		conflictRange.send(std::make_pair(Key(), Key()));
		throw;
	}
}
2152 
getKey(const KeySelector & key,bool snapshot)2153 Future< Key > Transaction::getKey( const KeySelector& key, bool snapshot ) {
2154 	++cx->transactionLogicalReads;
2155 	if( snapshot )
2156 		return ::getKey(cx, key, getReadVersion(), info);
2157 
2158 	Promise<std::pair<Key, Key>> conflictRange;
2159 	extraConflictRanges.push_back( conflictRange.getFuture() );
2160 	return getKeyAndConflictRange( cx, key, getReadVersion(), conflictRange, info );
2161 }
2162 
getRange(const KeySelector & begin,const KeySelector & end,GetRangeLimits limits,bool snapshot,bool reverse)2163 Future< Standalone<RangeResultRef> > Transaction::getRange(
2164 	const KeySelector& begin,
2165 	const KeySelector& end,
2166 	GetRangeLimits limits,
2167 	bool snapshot,
2168 	bool reverse )
2169 {
2170 	++cx->transactionLogicalReads;
2171 
2172 	if( limits.isReached() )
2173 		return Standalone<RangeResultRef>();
2174 
2175 	if( !limits.isValid() )
2176 		return range_limits_invalid();
2177 
2178 	ASSERT(limits.rows != 0);
2179 
2180 	KeySelector b = begin;
2181 	if( b.orEqual ) {
2182 		TEST(true); // Native begin orEqual==true
2183 		b.removeOrEqual(b.arena());
2184 	}
2185 
2186 	KeySelector e = end;
2187 	if( e.orEqual ) {
2188 		TEST(true); // Native end orEqual==true
2189 		e.removeOrEqual(e.arena());
2190 	}
2191 
2192 	if( b.offset >= e.offset && b.getKey() >= e.getKey() ) {
2193 		TEST(true); // Native range inverted
2194 		return Standalone<RangeResultRef>();
2195 	}
2196 
2197 	Promise<std::pair<Key, Key>> conflictRange;
2198 	if(!snapshot) {
2199 		extraConflictRanges.push_back( conflictRange.getFuture() );
2200 	}
2201 
2202 	return ::getRange(cx, trLogInfo, getReadVersion(), b, e, limits, conflictRange, snapshot, reverse, info);
2203 }
2204 
// Convenience overload: wraps the integer `limit` in a GetRangeLimits and forwards to the overload taking limits.
Future< Standalone<RangeResultRef> > Transaction::getRange(
	const KeySelector& begin,
	const KeySelector& end,
	int limit,
	bool snapshot,
	bool reverse )
{
	return getRange( begin, end, GetRangeLimits( limit ), snapshot, reverse );
}
2214 
addReadConflictRange(KeyRangeRef const & keys)2215 void Transaction::addReadConflictRange( KeyRangeRef const& keys ) {
2216 	ASSERT( !keys.empty() );
2217 
2218 	//There aren't any keys in the database with size larger than KEY_SIZE_LIMIT, so if range contains large keys
2219 	//we can translate it to an equivalent one with smaller keys
2220 	KeyRef begin = keys.begin;
2221 	KeyRef end = keys.end;
2222 
2223 	if(begin.size() > (begin.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT))
2224 		begin = begin.substr(0, (begin.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT)+1);
2225 	if(end.size() > (end.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT))
2226 		end = end.substr(0, (end.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT)+1);
2227 
2228 	KeyRangeRef r = KeyRangeRef(begin, end);
2229 
2230 	if(r.empty()) {
2231 		return;
2232 	}
2233 
2234 	tr.transaction.read_conflict_ranges.push_back_deep( tr.arena, r );
2235 }
2236 
makeSelfConflicting()2237 void Transaction::makeSelfConflicting() {
2238 	BinaryWriter wr(Unversioned());
2239 	wr.serializeBytes(LiteralStringRef("\xFF/SC/"));
2240 	wr << g_random->randomUniqueID();
2241 	auto r = singleKeyRange( wr.toValue(), tr.arena );
2242 	tr.transaction.read_conflict_ranges.push_back( tr.arena, r );
2243 	tr.transaction.write_conflict_ranges.push_back( tr.arena, r );
2244 }
2245 
set(const KeyRef & key,const ValueRef & value,bool addConflictRange)2246 void Transaction::set( const KeyRef& key, const ValueRef& value, bool addConflictRange ) {
2247 
2248 	if(key.size() > (key.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT))
2249 		throw key_too_large();
2250 	if(value.size() > CLIENT_KNOBS->VALUE_SIZE_LIMIT)
2251 		throw value_too_large();
2252 
2253 	auto &req = tr;
2254 	auto &t = req.transaction;
2255 	auto r = singleKeyRange( key, req.arena );
2256 	auto v = ValueRef( req.arena, value );
2257 	t.mutations.push_back( req.arena, MutationRef( MutationRef::SetValue, r.begin, v ) );
2258 
2259 	if( addConflictRange ) {
2260 		t.write_conflict_ranges.push_back( req.arena, r );
2261 	}
2262 }
2263 
atomicOp(const KeyRef & key,const ValueRef & operand,MutationRef::Type operationType,bool addConflictRange)2264 void Transaction::atomicOp(const KeyRef& key, const ValueRef& operand, MutationRef::Type operationType, bool addConflictRange) {
2265 	if(key.size() > (key.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT))
2266 		throw key_too_large();
2267 	if(operand.size() > CLIENT_KNOBS->VALUE_SIZE_LIMIT)
2268 		throw value_too_large();
2269 
2270 	if (apiVersionAtLeast(510)) {
2271 		if (operationType == MutationRef::Min)
2272 			operationType = MutationRef::MinV2;
2273 		else if (operationType == MutationRef::And)
2274 			operationType = MutationRef::AndV2;
2275 	}
2276 
2277 	auto &req = tr;
2278 	auto &t = req.transaction;
2279 	auto r = singleKeyRange( key, req.arena );
2280 	auto v = ValueRef( req.arena, operand );
2281 
2282 	t.mutations.push_back( req.arena, MutationRef( operationType, r.begin, v ) );
2283 
2284 	if( addConflictRange )
2285 		t.write_conflict_ranges.push_back( req.arena, r );
2286 
2287 	TEST(true); //NativeAPI atomic operation
2288 }
2289 
clear(const KeyRangeRef & range,bool addConflictRange)2290 void Transaction::clear( const KeyRangeRef& range, bool addConflictRange ) {
2291 	auto &req = tr;
2292 	auto &t = req.transaction;
2293 
2294 	KeyRef begin = range.begin;
2295 	KeyRef end = range.end;
2296 
2297 	//There aren't any keys in the database with size larger than KEY_SIZE_LIMIT, so if range contains large keys
2298 	//we can translate it to an equivalent one with smaller keys
2299 	if(begin.size() > (begin.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT))
2300 		begin = begin.substr(0, (begin.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT)+1);
2301 	if(end.size() > (end.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT))
2302 		end = end.substr(0, (end.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT)+1);
2303 
2304 	auto r = KeyRangeRef( req.arena, KeyRangeRef(begin, end) );
2305 	if (r.empty()) return;
2306 
2307 	t.mutations.push_back( req.arena, MutationRef( MutationRef::ClearRange, r.begin, r.end ) );
2308 
2309 	if(addConflictRange)
2310 		t.write_conflict_ranges.push_back( req.arena, r );
2311 }
clear(const KeyRef & key,bool addConflictRange)2312 void Transaction::clear( const KeyRef& key, bool addConflictRange ) {
2313 
2314 	//There aren't any keys in the database with size larger than KEY_SIZE_LIMIT
2315 	if(key.size() > (key.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT))
2316 		return;
2317 
2318 	auto &req = tr;
2319 	auto &t = req.transaction;
2320 
2321 	//efficient single key range clear range mutation, see singleKeyRange
2322 	uint8_t* data = new ( req.arena ) uint8_t[ key.size()+1 ];
2323 	memcpy(data, key.begin(), key.size() );
2324 	data[key.size()] = 0;
2325 	t.mutations.push_back( req.arena, MutationRef( MutationRef::ClearRange, KeyRef(data,key.size()), KeyRef(data, key.size()+1)) );
2326 
2327 	if(addConflictRange)
2328 		t.write_conflict_ranges.push_back( req.arena, KeyRangeRef( KeyRef(data,key.size()), KeyRef(data, key.size()+1) ) );
2329 }
addWriteConflictRange(const KeyRangeRef & keys)2330 void Transaction::addWriteConflictRange( const KeyRangeRef& keys ) {
2331 	ASSERT( !keys.empty() );
2332 	auto &req = tr;
2333 	auto &t = req.transaction;
2334 
2335 	//There aren't any keys in the database with size larger than KEY_SIZE_LIMIT, so if range contains large keys
2336 	//we can translate it to an equivalent one with smaller keys
2337 	KeyRef begin = keys.begin;
2338 	KeyRef end = keys.end;
2339 
2340 	if (begin.size() > (begin.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT))
2341 		begin = begin.substr(0, (begin.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT) + 1);
2342 	if (end.size() > (end.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT))
2343 		end = end.substr(0, (end.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT) + 1);
2344 
2345 	KeyRangeRef r = KeyRangeRef(begin, end);
2346 
2347 	if (r.empty()) {
2348 		return;
2349 	}
2350 
2351 	t.write_conflict_ranges.push_back_deep( req.arena, r );
2352 }
2353 
getBackoff(int errCode)2354 double Transaction::getBackoff(int errCode) {
2355 	double b = backoff * g_random->random01();
2356 	backoff = errCode == error_code_proxy_memory_limit_exceeded ? std::min(backoff * CLIENT_KNOBS->BACKOFF_GROWTH_RATE, CLIENT_KNOBS->RESOURCE_CONSTRAINED_MAX_BACKOFF) :
2357 				std::min(backoff * CLIENT_KNOBS->BACKOFF_GROWTH_RATE, options.maxBackoff);
2358 	return b;
2359 }
2360 
// Builds the options for a transaction on database `cx`.
// maxBackoff is seeded from the database before reset(cx) runs, because reset() saves and may restore it.
TransactionOptions::TransactionOptions(Database const& cx) {
	maxBackoff = cx->transactionMaxBackoff;
	reset(cx);
	// Under BUGGIFY, commits are sometimes forced onto the first proxy.
	if (BUGGIFY) {
		commitOnFirstProxy = true;
	}
}
2368 
// Default options: every field is zeroed, then maxBackoff takes the client knob default.
// NOTE(review): memset over `this` is only valid while every member is trivially copyable -- confirm when adding fields.
TransactionOptions::TransactionOptions() {
	memset(this, 0, sizeof(*this));
	maxBackoff = CLIENT_KNOBS->DEFAULT_MAX_BACKOFF;
}
2373 
// Restores the options to their defaults for database `cx`.
// All fields are zeroed; maxBackoff keeps its previous value at API version 610 and later, and otherwise falls back
// to the database's transactionMaxBackoff. lockAware is inherited from the database.
void TransactionOptions::reset(Database const& cx) {
	// Saved before the memset below wipes it.
	double oldMaxBackoff = maxBackoff;
	memset(this, 0, sizeof(*this));
	maxBackoff = cx->apiVersionAtLeast(610) ? oldMaxBackoff : cx->transactionMaxBackoff;
	lockAware = cx->lockAware;
}
2380 
// Returns the transaction to a fresh state so it can be reused: the commit request, read version, promises,
// extra conflict ranges and commit futures are replaced with new empty objects, and pending watches are cancelled.
// backoff and numErrors are not touched here.
void Transaction::reset() {
	tr = CommitTransactionRequest();
	readVersion = Future<Version>();
	metadataVersion = Promise<Optional<Key>>();
	extraConflictRanges.clear();
	versionstampPromise = Promise<Standalone<StringRef>>();
	commitResult = Promise<Void>();
	committing = Future<Void>();
	info.taskID = cx->taskID;
	info.debugID = Optional<UID>();
	// Flush the previous attempt's logs before trLogInfo is replaced on the next line.
	flushTrLogsIfEnabled();
	trLogInfo = Reference<TransactionLogInfo>(createTrLogInfoProbabilistically(cx));
	cancelWatches();

	// Options and priority are only reset for API version 16 and later.
	if(apiVersionAtLeast(16)) {
		options.reset(cx);
		setPriority(GetReadVersionRequest::PRIORITY_DEFAULT);
	}
}
2400 
// Like reset(), but also restores the retry state: backoff returns to the knob default and
// options.maxBackoff to the database's transactionMaxBackoff.
void Transaction::fullReset() {
	reset();
	backoff = CLIENT_KNOBS->DEFAULT_BACKOFF;
	// Assigned after reset(), which may have carried over the previous maxBackoff.
	options.maxBackoff = getDatabase()->transactionMaxBackoff;
}
2406 
// Forwards to the database's apiVersionAtLeast(minVersion); callers in this file use the result as a boolean.
int Transaction::apiVersionAtLeast(int minVersion) const {
	return cx->apiVersionAtLeast(minVersion);
}
2410 
2411 class MutationBlock {
2412 public:
2413 	bool mutated;
2414 	bool cleared;
2415 	ValueRef setValue;
2416 
MutationBlock()2417 	MutationBlock() : mutated(false) {}
MutationBlock(bool _cleared)2418 	MutationBlock(bool _cleared) : mutated(true), cleared(_cleared) {}
MutationBlock(ValueRef value)2419 	MutationBlock(ValueRef value) : mutated(true), cleared(false), setValue(value) {}
2420 };
2421 
compareBegin(KeyRangeRef lhs,KeyRangeRef rhs)2422 bool compareBegin( KeyRangeRef lhs, KeyRangeRef rhs ) { return lhs.begin < rhs.begin; }
2423 
2424 // If there is any intersection between the two given sets of ranges, returns a range that
2425 //   falls within the intersection
intersects(VectorRef<KeyRangeRef> lhs,VectorRef<KeyRangeRef> rhs)2426 Optional<KeyRangeRef> intersects(VectorRef<KeyRangeRef> lhs, VectorRef<KeyRangeRef> rhs) {
2427 	if( lhs.size() && rhs.size() ) {
2428 		std::sort( lhs.begin(), lhs.end(), compareBegin );
2429 		std::sort( rhs.begin(), rhs.end(), compareBegin );
2430 
2431 		int l = 0, r = 0;
2432 		while(l < lhs.size() && r < rhs.size()) {
2433 			if( lhs[l].end <= rhs[r].begin )
2434 				l++;
2435 			else if( rhs[r].end <= lhs[l].begin )
2436 				r++;
2437 			else
2438 				return lhs[l] & rhs[r];
2439 		}
2440 	}
2441 
2442 	return Optional<KeyRangeRef>();
2443 }
2444 
// Debug verification of a commit: forwards the outcome of `committed` to `outCommitted`, and after a successful
// commit re-reads every set / cleared range at the committed version to check the database matches the mutations.
//   cx           - database to read back from
//   committed    - future of the real commit
//   outCommitted - fulfilled (or failed) as soon as `committed` resolves, before any verification runs
//   req          - the committed request; only SetValue and ClearRange mutations are checked
//   checkTr      - the committing transaction; only dereferenced immediately after a successful commit
// Mismatches are reported as SevError "CheckWritesFailed" trace events.
ACTOR void checkWrites( Database cx, Future<Void> committed, Promise<Void> outCommitted, CommitTransactionRequest req, Transaction* checkTr )
{
	state Version version;
	try {
		wait( committed );
		// If the commit is successful, by definition the transaction still exists for now.  Grab the version, and don't use it again.
		version = checkTr->getCommittedVersion();
		outCommitted.send(Void());
	} catch (Error& e) {
		outCommitted.sendError(e);
		return;
	}

	wait( delay( g_random->random01() ) ); // delay between 0 and 1 seconds

	//Future<Optional<Version>> version, Database cx, CommitTransactionRequest req ) {
	state KeyRangeMap<MutationBlock> expectedValues;

	auto &mutations = req.transaction.mutations;
	state int mCount = mutations.size(); // debugging info for traceEvent

	// Replay the mutations in order into the map, so later mutations override earlier ones for the same keys.
	for( int idx = 0; idx < mutations.size(); idx++) {
		if( mutations[idx].type == MutationRef::SetValue )
			expectedValues.insert( singleKeyRange( mutations[idx].param1 ),
				MutationBlock( mutations[idx].param2 ) );
		else if( mutations[idx].type == MutationRef::ClearRange )
			expectedValues.insert( KeyRangeRef( mutations[idx].param1, mutations[idx].param2 ),
				MutationBlock( true ) );
	}

	try {
		// Read back with a separate transaction pinned to the committed version.
		state Transaction tr(cx);
		tr.setVersion( version );
		state int checkedRanges = 0;
		state KeyRangeMap<MutationBlock>::Ranges ranges = expectedValues.ranges();
		state KeyRangeMap<MutationBlock>::Iterator it = ranges.begin();
		for(; it != ranges.end(); ++it) {
			state MutationBlock m = it->value();
			if( m.mutated ) {
				checkedRanges++;
				if( m.cleared ) {
					// A cleared range must contain no keys; reading one row is enough to detect a violation.
					Standalone<RangeResultRef> shouldBeEmpty = wait(
						tr.getRange( it->range(), 1 ) );
					if( shouldBeEmpty.size() ) {
						TraceEvent(SevError, "CheckWritesFailed").detail("Class", "Clear").detail("KeyBegin", it->range().begin)
							.detail("KeyEnd", it->range().end);
						return;
					}
				} else {
					// A set key must be present with exactly the written value.
					Optional<Value> val = wait( tr.get( it->range().begin ) );
					if( !val.present() || val.get() != m.setValue ) {
						TraceEvent evt = TraceEvent(SevError, "CheckWritesFailed")
							.detail("Class", "Set")
							.detail("Key", it->range().begin)
							.detail("Expected", m.setValue);
						if( !val.present() )
							evt.detail("Actual", "_Value Missing_");
						else
							evt.detail("Actual", val.get());
						return;
					}
				}
			}
		}
		TraceEvent("CheckWritesSuccess").detail("Version", version).detail("MutationCount", mCount).detail("CheckedRanges", checkedRanges);
	} catch( Error& e ) {
		// transaction_too_old and future_version are logged as warnings; any other error is logged as SevError.
		bool ok = e.code() == error_code_transaction_too_old || e.code() == error_code_future_version;
		TraceEvent( ok ? SevWarn : SevError, "CheckWritesFailed" ).error(e);
		throw;
	}
}
2516 
// Commits a transaction with no mutations whose read and write conflict ranges are both `range`, retrying through
// tr.onError() until the commit succeeds.
//   cx      - database to commit against
//   range   - key range used for both conflict ranges
//   info    - only info.taskID is copied onto the dummy transaction
//   options - options copied onto the dummy transaction at the start of every attempt
// ACCESS_SYSTEM_KEYS, CAUSAL_WRITE_RISKY and LOCK_AWARE are set on every attempt.
ACTOR static Future<Void> commitDummyTransaction( Database cx, KeyRange range, TransactionInfo info, TransactionOptions options ) {
	state Transaction tr(cx);
	state int retries = 0;
	loop {
		try {
			TraceEvent("CommitDummyTransaction").detail("Key", range.begin).detail("Retries", retries);
			// Options and conflict ranges are re-applied on every attempt.
			tr.options = options;
			tr.info.taskID = info.taskID;
			tr.setOption( FDBTransactionOptions::ACCESS_SYSTEM_KEYS );
			tr.setOption( FDBTransactionOptions::CAUSAL_WRITE_RISKY );
			tr.setOption( FDBTransactionOptions::LOCK_AWARE );
			tr.addReadConflictRange(range);
			tr.addWriteConflictRange(range);
			wait( tr.commit() );
			return Void();
		} catch (Error& e) {
			TraceEvent("CommitDummyTransactionError").error(e,true).detail("Key", range.begin).detail("Retries", retries);
			// onError() either completes, after which the loop retries, or rethrows and ends this actor with that error.
			wait( tr.onError(e) );
		}
		++retries;
	}
}
2539 
cancelWatches(Error const & e)2540 void Transaction::cancelWatches(Error const& e) {
2541 	for(int i = 0; i < watches.size(); ++i)
2542 		if(!watches[i]->onChangeTrigger.isSet())
2543 			watches[i]->onChangeTrigger.sendError(e);
2544 
2545 	watches.clear();
2546 }
2547 
setupWatches()2548 void Transaction::setupWatches() {
2549 	try {
2550 		Future<Version> watchVersion = getCommittedVersion() > 0 ? getCommittedVersion() : getReadVersion();
2551 
2552 		for(int i = 0; i < watches.size(); ++i)
2553 			watches[i]->setWatch(watchValue( watchVersion, watches[i]->key, watches[i]->value, cx, options.getReadVersionFlags, info ));
2554 
2555 		watches.clear();
2556 	}
2557 	catch(Error&) {
2558 		ASSERT(false); // The above code must NOT throw because commit has already occured.
2559 		throw internal_error();
2560 	}
2561 }
2562 
tryCommit(Database cx,Reference<TransactionLogInfo> trLogInfo,CommitTransactionRequest req,Future<Version> readVersion,TransactionInfo info,Version * pCommittedVersion,Transaction * tr,TransactionOptions options)2563 ACTOR static Future<Void> tryCommit( Database cx, Reference<TransactionLogInfo> trLogInfo, CommitTransactionRequest req, Future<Version> readVersion, TransactionInfo info, Version* pCommittedVersion, Transaction* tr, TransactionOptions options) {
2564 	state TraceInterval interval( "TransactionCommit" );
2565 	state double startTime;
2566 	if (info.debugID.present())
2567 		TraceEvent(interval.begin()).detail( "Parent", info.debugID.get() );
2568 
2569 	try {
2570 		Version v = wait( readVersion );
2571 		req.transaction.read_snapshot = v;
2572 
2573 		startTime = now();
2574 		state Optional<UID> commitID = Optional<UID>();
2575 		if(info.debugID.present()) {
2576 			commitID = g_nondeterministic_random->randomUniqueID();
2577 			g_traceBatch.addAttach("CommitAttachID", info.debugID.get().first(), commitID.get().first());
2578 			g_traceBatch.addEvent("CommitDebug", commitID.get().first(), "NativeAPI.commit.Before");
2579 		}
2580 
2581 		req.debugID = commitID;
2582 		state Future<CommitID> reply;
2583 		if (options.commitOnFirstProxy) {
2584 			const std::vector<MasterProxyInterface>& proxies = cx->clientInfo->get().proxies;
2585 			reply = proxies.size() ? throwErrorOr ( brokenPromiseToMaybeDelivered ( proxies[0].commit.tryGetReply(req) ) ) : Never();
2586 		} else {
2587 			reply = loadBalance( cx->getMasterProxies(info.useProvisionalProxies), &MasterProxyInterface::commit, req, TaskDefaultPromiseEndpoint, true );
2588 		}
2589 
2590 		choose {
2591 			when ( wait( cx->onMasterProxiesChanged() ) ) {
2592 				reply.cancel();
2593 				throw request_maybe_delivered();
2594 			}
2595 			when (CommitID ci = wait( reply )) {
2596 				Version v = ci.version;
2597 				if (v != invalidVersion) {
2598 					if (info.debugID.present())
2599 						TraceEvent(interval.end()).detail("CommittedVersion", v);
2600 					*pCommittedVersion = v;
2601 					if(v > cx->metadataVersionCache[cx->mvCacheInsertLocation].first) {
2602 						cx->mvCacheInsertLocation = (cx->mvCacheInsertLocation + 1)%cx->metadataVersionCache.size();
2603 						cx->metadataVersionCache[cx->mvCacheInsertLocation] = std::make_pair(v, ci.metadataVersion);
2604 					}
2605 
2606 					Standalone<StringRef> ret = makeString(10);
2607 					placeVersionstamp(mutateString(ret), v, ci.txnBatchId);
2608 					tr->versionstampPromise.send(ret);
2609 
2610 					tr->numErrors = 0;
2611 					cx->transactionsCommitCompleted++;
2612 					cx->transactionCommittedMutations += req.transaction.mutations.size();
2613 					cx->transactionCommittedMutationBytes += req.transaction.mutations.expectedSize();
2614 
2615 					if(info.debugID.present())
2616 						g_traceBatch.addEvent("CommitDebug", commitID.get().first(), "NativeAPI.commit.After");
2617 
2618 					double latency = now() - startTime;
2619 					cx->commitLatencies.addSample(latency);
2620 					cx->latencies.addSample(now() - tr->startTime);
2621 					if (trLogInfo)
2622 						trLogInfo->addLog(FdbClientLogEvents::EventCommit(startTime, latency, req.transaction.mutations.size(), req.transaction.mutations.expectedSize(), req));
2623 					return Void();
2624 				} else {
2625 					if (info.debugID.present())
2626 						TraceEvent(interval.end()).detail("Conflict", 1);
2627 
2628 					if(info.debugID.present())
2629 						g_traceBatch.addEvent("CommitDebug", commitID.get().first(), "NativeAPI.commit.After");
2630 
2631 					throw not_committed();
2632 				}
2633 			}
2634 		}
2635 	} catch (Error& e) {
2636 		if (e.code() == error_code_request_maybe_delivered || e.code() == error_code_commit_unknown_result) {
2637 			// We don't know if the commit happened, and it might even still be in flight.
2638 
2639 			if (!options.causalWriteRisky) {
2640 				// Make sure it's not still in flight, either by ensuring the master we submitted to is dead, or the version we submitted with is dead, or by committing a conflicting transaction successfully
2641 				//if ( cx->getMasterProxies()->masterGeneration <= originalMasterGeneration )
2642 
2643 				// To ensure the original request is not in flight, we need a key range which intersects its read conflict ranges
2644 				// We pick a key range which also intersects its write conflict ranges, since that avoids potentially creating conflicts where there otherwise would be none
2645 				// We make the range as small as possible (a single key range) to minimize conflicts
2646 				// The intersection will never be empty, because if it were (since !causalWriteRisky) makeSelfConflicting would have been applied automatically to req
2647 				KeyRangeRef selfConflictingRange = intersects( req.transaction.write_conflict_ranges, req.transaction.read_conflict_ranges ).get();
2648 
2649 				TEST(true);  // Waiting for dummy transaction to report commit_unknown_result
2650 
2651 				wait( commitDummyTransaction( cx, singleKeyRange(selfConflictingRange.begin), info, tr->options ) );
2652 			}
2653 
2654 			// The user needs to be informed that we aren't sure whether the commit happened.  Standard retry loops retry it anyway (relying on transaction idempotence) but a client might do something else.
2655 			throw commit_unknown_result();
2656 		} else {
2657 			if (e.code() != error_code_transaction_too_old && e.code() != error_code_not_committed && e.code() != error_code_database_locked && e.code() != error_code_proxy_memory_limit_exceeded)
2658 				TraceEvent(SevError, "TryCommitError").error(e);
2659 			if (trLogInfo)
2660 				trLogInfo->addLog(FdbClientLogEvents::EventCommitError(startTime, static_cast<int>(e.code()), req));
2661 			throw;
2662 		}
2663 	}
2664 }
2665 
// Validates and finalises the accumulated commit request, then starts tryCommit().
// Returns an already-resolved future for read-only transactions (no mutations and no write conflict ranges),
// an error future for transaction_read_only / transaction_too_large, and otherwise the commit future
// (routed through checkWrites when write checking was selected for this commit).
// Exceptions thrown while preparing the request are converted into error futures rather than propagated.
Future<Void> Transaction::commitMutations() {
	try {
		//if this is a read-only transaction return immediately
		if( !tr.transaction.write_conflict_ranges.size() && !tr.transaction.mutations.size() ) {
			numErrors = 0;

			committedVersion = invalidVersion;
			versionstampPromise.sendError(no_commit_version());
			return Void();
		}

		cx->transactionsCommitStarted++;

		if(options.readOnly)
			return transaction_read_only();

		cx->mutationsPerCommit.addSample(tr.transaction.mutations.size());
		cx->bytesPerCommit.addSample(tr.transaction.mutations.expectedSize());

		// The size used for the warning always includes both kinds of conflict ranges.
		size_t transactionSize = tr.transaction.mutations.expectedSize() + tr.transaction.read_conflict_ranges.expectedSize() + tr.transaction.write_conflict_ranges.expectedSize();
		if (transactionSize > (uint64_t)FLOW_KNOBS->PACKET_WARNING) {
			TraceEvent(!g_network->isSimulated() ? SevWarnAlways : SevWarn, "LargeTransaction")
				.suppressFor(1.0)
				.detail("Size", transactionSize)
				.detail("NumMutations", tr.transaction.mutations.size())
				.detail("ReadConflictSize", tr.transaction.read_conflict_ranges.expectedSize())
				.detail("WriteConflictSize", tr.transaction.write_conflict_ranges.expectedSize());
		}

		if(!apiVersionAtLeast(300)) {
			transactionSize = tr.transaction.mutations.expectedSize(); // Old API versions didn't account for conflict ranges when determining whether to throw transaction_too_large
		}

		// A custom limit of 0 means "use the knob default".
		if (transactionSize > (options.customTransactionSizeLimit == 0 ? (uint64_t)CLIENT_KNOBS->TRANSACTION_SIZE_LIMIT : (uint64_t)options.customTransactionSizeLimit))
			return transaction_too_large();

		if( !readVersion.isValid() )
			getReadVersion( GetReadVersionRequest::FLAG_CAUSAL_READ_RISKY ); // sets up readVersion field.  We had no reads, so no need for (expensive) full causal consistency.

		// Write checking is sampled: when enabled, 1% of commits are verified.
		bool isCheckingWrites = options.checkWritesEnabled && g_random->random01() < 0.01;
		// Fold in the conflict ranges produced by getKey/getRange; only futures that are ready and non-empty are added.
		for(int i=0; i<extraConflictRanges.size(); i++)
			if (extraConflictRanges[i].isReady() && extraConflictRanges[i].get().first < extraConflictRanges[i].get().second )
				tr.transaction.read_conflict_ranges.push_back( tr.arena, KeyRangeRef(extraConflictRanges[i].get().first, extraConflictRanges[i].get().second) );

		// tryCommit relies on the read and write conflict ranges intersecting unless causalWriteRisky is set.
		if( !options.causalWriteRisky && !intersects( tr.transaction.write_conflict_ranges, tr.transaction.read_conflict_ranges ).present() )
			makeSelfConflicting();

		if (isCheckingWrites) {
			// add all writes into the read conflict range...
			tr.transaction.read_conflict_ranges.append( tr.arena, tr.transaction.write_conflict_ranges.begin(), tr.transaction.write_conflict_ranges.size() );
		}

		if ( options.debugDump ) {
			UID u = g_nondeterministic_random->randomUniqueID();
			TraceEvent("TransactionDump", u);
			for(auto i=tr.transaction.mutations.begin(); i!=tr.transaction.mutations.end(); ++i)
				TraceEvent("TransactionMutation", u).detail("T", i->type).detail("P1", i->param1).detail("P2", i->param2);
		}

		if(options.lockAware) {
			tr.flags = tr.flags | CommitTransactionRequest::FLAG_IS_LOCK_AWARE;
		}
		if(options.firstInBatch) {
			tr.flags = tr.flags | CommitTransactionRequest::FLAG_FIRST_IN_BATCH;
		}

		// NOTE(review): this local shadows the member `commitResult` (a Promise<Void>) -- confirm that is intended.
		Future<Void> commitResult = tryCommit( cx, trLogInfo, tr, readVersion, info, &this->committedVersion, this, options );

		if (isCheckingWrites) {
			Promise<Void> committed;
			checkWrites( cx, commitResult, committed, tr, this );
			return committed.getFuture();
		}
		return commitResult;
	} catch( Error& e ) {
		TraceEvent("ClientCommitError").error(e);
		return Future<Void>( e );
	} catch( ... ) {
		Error e( error_code_unknown_error );
		TraceEvent("ClientCommitError").error(e);
		return Future<Void>( e );
	}
}
2749 
commitAndWatch(Transaction * self)2750 ACTOR Future<Void> commitAndWatch(Transaction *self) {
2751 	try {
2752 		wait(self->commitMutations());
2753 
2754 		if(!self->watches.empty()) {
2755 			self->setupWatches();
2756 		}
2757 
2758 		self->reset();
2759 		return Void();
2760 	}
2761 	catch(Error &e) {
2762 		if(e.code() != error_code_actor_cancelled) {
2763 			if(!self->watches.empty()) {
2764 				self->cancelWatches(e);
2765 			}
2766 
2767 			self->versionstampPromise.sendError(transaction_invalid_version());
2768 			self->reset();
2769 		}
2770 
2771 		throw;
2772 	}
2773 }
2774 
// Starts committing this transaction by launching commitAndWatch(this).
// The resulting future is stored in `committing` and also returned to the caller.
// The ASSERT requires that `committing` holds no future when this is called.
Future<Void> Transaction::commit() {
	ASSERT(!committing.isValid());
	committing = commitAndWatch(this);
	return committing;
}
2780 
setPriority(uint32_t priorityFlag)2781 void Transaction::setPriority( uint32_t priorityFlag ) {
2782 	options.getReadVersionFlags = (options.getReadVersionFlags & ~GetReadVersionRequest::FLAG_PRIORITY_MASK) | priorityFlag;
2783 }
2784 
setOption(FDBTransactionOptions::Option option,Optional<StringRef> value)2785 void Transaction::setOption( FDBTransactionOptions::Option option, Optional<StringRef> value ) {
2786 	switch(option) {
2787 		case FDBTransactionOptions::INITIALIZE_NEW_DATABASE:
2788 			validateOptionValue(value, false);
2789 			if(readVersion.isValid())
2790 				throw read_version_already_set();
2791 			readVersion = Version(0);
2792 			options.causalWriteRisky = true;
2793 			break;
2794 
2795 		case FDBTransactionOptions::CAUSAL_READ_RISKY:
2796 			validateOptionValue(value, false);
2797 			options.getReadVersionFlags |= GetReadVersionRequest::FLAG_CAUSAL_READ_RISKY;
2798 			break;
2799 
2800 		case FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE:
2801 			validateOptionValue(value, false);
2802 			setPriority(GetReadVersionRequest::PRIORITY_SYSTEM_IMMEDIATE);
2803 			break;
2804 
2805 		case FDBTransactionOptions::PRIORITY_BATCH:
2806 			validateOptionValue(value, false);
2807 			setPriority(GetReadVersionRequest::PRIORITY_BATCH);
2808 			break;
2809 
2810 		case FDBTransactionOptions::CAUSAL_WRITE_RISKY:
2811 			validateOptionValue(value, false);
2812 			options.causalWriteRisky = true;
2813 			break;
2814 
2815 		case FDBTransactionOptions::COMMIT_ON_FIRST_PROXY:
2816 			validateOptionValue(value, false);
2817 			options.commitOnFirstProxy = true;
2818 			break;
2819 
2820 		case FDBTransactionOptions::CHECK_WRITES_ENABLE:
2821 			validateOptionValue(value, false);
2822 			options.checkWritesEnabled = true;
2823 			break;
2824 
2825 		case FDBTransactionOptions::DEBUG_DUMP:
2826 			validateOptionValue(value, false);
2827 			options.debugDump = true;
2828 			break;
2829 
2830 		case FDBTransactionOptions::TRANSACTION_LOGGING_ENABLE:
2831 			setOption(FDBTransactionOptions::DEBUG_TRANSACTION_IDENTIFIER, value);
2832 			setOption(FDBTransactionOptions::LOG_TRANSACTION);
2833 			break;
2834 
2835 		case FDBTransactionOptions::DEBUG_TRANSACTION_IDENTIFIER:
2836 			validateOptionValue(value, true);
2837 
2838 			if (value.get().size() > 100) {
2839 				throw invalid_option_value();
2840 			}
2841 
2842 			if (trLogInfo) {
2843 				if (trLogInfo->identifier.empty()) {
2844 					trLogInfo->identifier = printable(value.get());
2845 				}
2846 				else if (trLogInfo->identifier != printable(value.get())) {
2847 					TraceEvent(SevWarn, "CannotChangeDebugTransactionIdentifier").detail("PreviousIdentifier", trLogInfo->identifier).detail("NewIdentifier", value.get());
2848 					throw client_invalid_operation();
2849 				}
2850 			}
2851 			else {
2852 				trLogInfo = Reference<TransactionLogInfo>(new TransactionLogInfo(printable(value.get()), TransactionLogInfo::DONT_LOG));
2853 			}
2854 			break;
2855 
2856 		case FDBTransactionOptions::LOG_TRANSACTION:
2857 			validateOptionValue(value, false);
2858 			if (trLogInfo) {
2859 				trLogInfo->logTo(TransactionLogInfo::TRACE_LOG);
2860 			}
2861 			else {
2862 				TraceEvent(SevWarn, "DebugTransactionIdentifierNotSet").detail("Error", "Debug Transaction Identifier option must be set before logging the transaction");
2863 				throw client_invalid_operation();
2864 			}
2865 			break;
2866 
2867 		case FDBTransactionOptions::MAX_RETRY_DELAY:
2868 			validateOptionValue(value, true);
2869 			options.maxBackoff = extractIntOption(value, 0, std::numeric_limits<int32_t>::max()) / 1000.0;
2870 			break;
2871 
2872 		case FDBTransactionOptions::LOCK_AWARE:
2873 			validateOptionValue(value, false);
2874 			options.lockAware = true;
2875 			options.readOnly = false;
2876 			break;
2877 
2878 		case FDBTransactionOptions::READ_LOCK_AWARE:
2879 			validateOptionValue(value, false);
2880 			if(!options.lockAware) {
2881 				options.lockAware = true;
2882 				options.readOnly = true;
2883 			}
2884 			break;
2885 
2886 		case FDBTransactionOptions::FIRST_IN_BATCH:
2887 			validateOptionValue(value, false);
2888 			options.firstInBatch = true;
2889 			break;
2890 
2891 		case FDBTransactionOptions::USE_PROVISIONAL_PROXIES:
2892 			validateOptionValue(value, false);
2893 			options.getReadVersionFlags |= GetReadVersionRequest::FLAG_USE_PROVISIONAL_PROXIES;
2894 			info.useProvisionalProxies = true;
2895 			break;
2896 
2897 		default:
2898 			break;
2899 	}
2900 }
2901 
// Requests a read version from the master proxies on behalf of `transactionCount`
// transactions.
//
// The request goes through loadBalance() to the proxies returned by
// cx->getMasterProxies(); FLAG_USE_PROVISIONAL_PROXIES in `flags` is passed to that
// call. If cx->onMasterProxiesChanged() becomes ready first, the loop starts over
// and a new request is constructed.
//
// When `debugID` is present, "Before" and "After" events are added to g_traceBatch.
// Errors other than broken_promise are traced at SevError; every error is rethrown.
ACTOR Future<GetReadVersionReply> getConsistentReadVersion( DatabaseContext *cx, uint32_t transactionCount, uint32_t flags, Optional<UID> debugID ) {
	try {
		if( debugID.present() )
			g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), "NativeAPI.getConsistentReadVersion.Before");
		loop {
			// Rebuilt on every iteration of the loop.
			state GetReadVersionRequest req( transactionCount, flags, debugID );
			choose {
				// Proxy set changed: abandon this attempt and go around the loop again.
				when ( wait( cx->onMasterProxiesChanged() ) ) {}
				when ( GetReadVersionReply v = wait( loadBalance( cx->getMasterProxies(flags & GetReadVersionRequest::FLAG_USE_PROVISIONAL_PROXIES), &MasterProxyInterface::getConsistentReadVersion, req, cx->taskID ) ) ) {
					if( debugID.present() )
						g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), "NativeAPI.getConsistentReadVersion.After");
					ASSERT( v.version > 0 );
					return v;
				}
			}
		}
	} catch (Error& e) {
		if( e.code() != error_code_broken_promise )
			TraceEvent(SevError, "GetConsistentReadVersionError").error(e);
		throw;
	}
}
2924 
readVersionBatcher(DatabaseContext * cx,FutureStream<std::pair<Promise<GetReadVersionReply>,Optional<UID>>> versionStream,uint32_t flags)2925 ACTOR Future<Void> readVersionBatcher( DatabaseContext *cx, FutureStream< std::pair< Promise<GetReadVersionReply>, Optional<UID> > > versionStream, uint32_t flags ) {
2926 	state std::vector< Promise<GetReadVersionReply> > requests;
2927 	state PromiseStream< Future<Void> > addActor;
2928 	state Future<Void> collection = actorCollection( addActor.getFuture() );
2929 	state Future<Void> timeout;
2930 	state Optional<UID> debugID;
2931 	state bool send_batch;
2932 
2933 	// dynamic batching
2934 	state PromiseStream<double> replyTimes;
2935 	state PromiseStream<Error> _errorStream;
2936 	state double batchTime = 0;
2937 	state double lastRequestTime = now();
2938 
2939 	loop {
2940 		send_batch = false;
2941 		choose {
2942 			when(std::pair< Promise<GetReadVersionReply>, Optional<UID> > req = waitNext(versionStream)) {
2943 				if (req.second.present()) {
2944 					if (!debugID.present())
2945 						debugID = g_nondeterministic_random->randomUniqueID();
2946 					g_traceBatch.addAttach("TransactionAttachID", req.second.get().first(), debugID.get().first());
2947 				}
2948 				requests.push_back(req.first);
2949 				if (requests.size() == CLIENT_KNOBS->MAX_BATCH_SIZE)
2950 					send_batch = true;
2951 				else if (!timeout.isValid())
2952 					timeout = delay(batchTime, TaskProxyGetConsistentReadVersion);
2953 			}
2954 			when(wait(timeout.isValid() ? timeout : Never())) {
2955 				send_batch = true;
2956 			}
2957 			// dynamic batching monitors reply latencies
2958 			when(double reply_latency = waitNext(replyTimes.getFuture())){
2959 				double target_latency = reply_latency * 0.5;
2960 				batchTime = min(0.1 * target_latency + 0.9 * batchTime, CLIENT_KNOBS->GRV_BATCH_TIMEOUT);
2961 			}
2962 			when(wait(collection)){} // for errors
2963 		}
2964 		if (send_batch) {
2965 			int count = requests.size();
2966 			ASSERT(count);
2967 
2968 			// dynamic batching
2969 			Promise<GetReadVersionReply> GRVReply;
2970 			requests.push_back(GRVReply);
2971 			addActor.send(timeReply(GRVReply.getFuture(), replyTimes));
2972 
2973 			Future<Void> batch =
2974 				incrementalBroadcast(
2975 					getConsistentReadVersion(cx, count, flags, std::move(debugID)),
2976 					std::vector< Promise<GetReadVersionReply> >(std::move(requests)), CLIENT_KNOBS->BROADCAST_BATCH_SIZE);
2977 			debugID = Optional<UID>();
2978 			requests = std::vector< Promise<GetReadVersionReply> >();
2979 			addActor.send(batch);
2980 			timeout = Future<Void>();
2981 		}
2982 	}
2983 }
2984 
extractReadVersion(DatabaseContext * cx,Reference<TransactionLogInfo> trLogInfo,Future<GetReadVersionReply> f,bool lockAware,double startTime,Promise<Optional<Value>> metadataVersion)2985 ACTOR Future<Version> extractReadVersion(DatabaseContext* cx, Reference<TransactionLogInfo> trLogInfo, Future<GetReadVersionReply> f, bool lockAware, double startTime, Promise<Optional<Value>> metadataVersion) {
2986 	GetReadVersionReply rep = wait(f);
2987 	double latency = now() - startTime;
2988 	cx->GRVLatencies.addSample(latency);
2989 	if (trLogInfo)
2990 		trLogInfo->addLog(FdbClientLogEvents::EventGetVersion(startTime, latency));
2991 	if(rep.locked && !lockAware)
2992 		throw database_locked();
2993 
2994 	if(rep.version > cx->metadataVersionCache[cx->mvCacheInsertLocation].first) {
2995 		cx->mvCacheInsertLocation = (cx->mvCacheInsertLocation + 1)%cx->metadataVersionCache.size();
2996 		cx->metadataVersionCache[cx->mvCacheInsertLocation] = std::make_pair(rep.version, rep.metadataVersion);
2997 	}
2998 
2999 	metadataVersion.send(rep.metadataVersion);
3000 	return rep.version;
3001 }
3002 
getReadVersion(uint32_t flags)3003 Future<Version> Transaction::getReadVersion(uint32_t flags) {
3004 	cx->transactionReadVersions++;
3005 	flags |= options.getReadVersionFlags;
3006 
3007 	auto& batcher = cx->versionBatcher[ flags ];
3008 	if (!batcher.actor.isValid()) {
3009 		batcher.actor = readVersionBatcher( cx.getPtr(), batcher.stream.getFuture(), flags );
3010 	}
3011 	if (!readVersion.isValid()) {
3012 		Promise<GetReadVersionReply> p;
3013 		batcher.stream.send( std::make_pair( p, info.debugID ) );
3014 		startTime = now();
3015 		readVersion = extractReadVersion( cx.getPtr(), trLogInfo, p.getFuture(), options.lockAware, startTime, metadataVersion);
3016 	}
3017 	return readVersion;
3018 }
3019 
getVersionstamp()3020 Future<Standalone<StringRef>> Transaction::getVersionstamp() {
3021 	if(committing.isValid()) {
3022 		return transaction_invalid_version();
3023 	}
3024 	return versionstampPromise.getFuture();
3025 }
3026 
onError(Error const & e)3027 Future<Void> Transaction::onError( Error const& e ) {
3028 	if (e.code() == error_code_success)
3029 	{
3030 		return client_invalid_operation();
3031 	}
3032 	if (e.code() == error_code_not_committed ||
3033 		e.code() == error_code_commit_unknown_result ||
3034 		e.code() == error_code_database_locked ||
3035 		e.code() == error_code_proxy_memory_limit_exceeded ||
3036 		e.code() == error_code_process_behind)
3037 	{
3038 		if(e.code() == error_code_not_committed)
3039 			cx->transactionsNotCommitted++;
3040 		if(e.code() == error_code_commit_unknown_result)
3041 			cx->transactionsMaybeCommitted++;
3042 		if (e.code() == error_code_proxy_memory_limit_exceeded)
3043 			cx->transactionsResourceConstrained++;
3044 		if (e.code() == error_code_process_behind)
3045 			cx->transactionsProcessBehind++;
3046 
3047 		double backoff = getBackoff(e.code());
3048 		reset();
3049 		return delay( backoff, info.taskID );
3050 	}
3051 	if (e.code() == error_code_transaction_too_old ||
3052 		e.code() == error_code_future_version)
3053 	{
3054 		if( e.code() == error_code_transaction_too_old )
3055 			cx->transactionsTooOld++;
3056 		else if( e.code() == error_code_future_version )
3057 			cx->transactionsFutureVersions++;
3058 
3059 		double maxBackoff = options.maxBackoff;
3060 		reset();
3061 		return delay( std::min(CLIENT_KNOBS->FUTURE_VERSION_RETRY_DELAY, maxBackoff), info.taskID );
3062 	}
3063 
3064 	if(g_network->isSimulated() && ++numErrors % 10 == 0)
3065 		TraceEvent(SevWarnAlways, "TransactionTooManyRetries").detail("NumRetries", numErrors);
3066 
3067 	return e;
3068 }
3069 
// Follows the storage metrics of `keys` at `location`, starting from the known
// value `x`.
//
// Each iteration sends a WaitMetricsRequest bounded by [x - halfError, x + halfError]
// through loadBalance(). When a reply arrives, the difference to the previous value
// is sent on `deltaStream` and `x` is replaced by the new value.
// NOTE(review): presumably the storage server answers only once the metrics fall
// outside the requested bounds — confirm against the waitMetrics handler.
//
// The loop never exits normally. Any error is forwarded to `deltaStream` and then
// thrown to the caller.
ACTOR Future<Void> trackBoundedStorageMetrics(
	KeyRange keys,
	Reference<LocationInfo> location,
	StorageMetrics x,
	StorageMetrics halfError,
	PromiseStream<StorageMetrics> deltaStream)
{
	try {
		loop {
			WaitMetricsRequest req( keys, x - halfError, x + halfError );
			StorageMetrics nextX = wait( loadBalance( location, &StorageServerInterface::waitMetrics, req ) );
			deltaStream.send( nextX - x );
			// The next request is centred on the value just received.
			x = nextX;
		}
	} catch (Error& e) {
		deltaStream.sendError(e);
		throw e;
	}
}
3089 
waitStorageMetricsMultipleLocations(vector<pair<KeyRange,Reference<LocationInfo>>> locations,StorageMetrics min,StorageMetrics max,StorageMetrics permittedError)3090 ACTOR Future< StorageMetrics > waitStorageMetricsMultipleLocations(
3091 	vector< pair<KeyRange,Reference<LocationInfo>> > locations,
3092 	StorageMetrics min,
3093 	StorageMetrics max,
3094 	StorageMetrics permittedError)
3095 {
3096 	state int nLocs = locations.size();
3097 	state vector<Future<StorageMetrics>> fx( nLocs );
3098 	state StorageMetrics total;
3099 	state PromiseStream<StorageMetrics> deltas;
3100 	state vector<Future<Void>> wx( fx.size() );
3101 	state StorageMetrics halfErrorPerMachine = permittedError * (0.5 / nLocs);
3102 	state StorageMetrics maxPlus = max + halfErrorPerMachine * (nLocs-1);
3103 	state StorageMetrics minMinus = min - halfErrorPerMachine * (nLocs-1);
3104 
3105 	for(int i=0; i<nLocs; i++) {
3106 		WaitMetricsRequest req(locations[i].first, StorageMetrics(), StorageMetrics());
3107 		req.min.bytes = 0;
3108 		req.max.bytes = -1;
3109 		fx[i] = loadBalance( locations[i].second, &StorageServerInterface::waitMetrics, req, TaskDataDistribution );
3110 	}
3111 	wait( waitForAll(fx) );
3112 
3113 	// invariant: true total is between (total-permittedError/2, total+permittedError/2)
3114 	for(int i=0; i<nLocs; i++)
3115 		total += fx[i].get();
3116 
3117 	if (!total.allLessOrEqual( maxPlus )) return total;
3118 	if (!minMinus.allLessOrEqual( total )) return total;
3119 
3120 	for(int i=0; i<nLocs; i++)
3121 		wx[i] = trackBoundedStorageMetrics( locations[i].first, locations[i].second, fx[i].get(), halfErrorPerMachine, deltas );
3122 
3123 	loop {
3124 		StorageMetrics delta = waitNext(deltas.getFuture());
3125 		total += delta;
3126 		if (!total.allLessOrEqual( maxPlus )) return total;
3127 		if (!minMinus.allLessOrEqual( total )) return total;
3128 	}
3129 }
3130 
waitStorageMetrics(Database cx,KeyRange keys,StorageMetrics min,StorageMetrics max,StorageMetrics permittedError,int shardLimit)3131 ACTOR Future< StorageMetrics > waitStorageMetrics(
3132 	Database cx,
3133 	KeyRange keys,
3134 	StorageMetrics min,
3135 	StorageMetrics max,
3136 	StorageMetrics permittedError,
3137 	int shardLimit )
3138 {
3139 	loop {
3140 		vector< pair<KeyRange, Reference<LocationInfo>> > locations = wait( getKeyRangeLocations( cx, keys, shardLimit, false, &StorageServerInterface::waitMetrics, TransactionInfo(TaskDataDistribution) ) );
3141 
3142 		//SOMEDAY: Right now, if there are too many shards we delay and check again later. There may be a better solution to this.
3143 		if(locations.size() < shardLimit) {
3144 			try {
3145 				Future<StorageMetrics> fx;
3146 				if (locations.size() > 1) {
3147 					fx = waitStorageMetricsMultipleLocations( locations, min, max, permittedError );
3148 				} else {
3149 					WaitMetricsRequest req( keys, min, max );
3150 					fx = loadBalance( locations[0].second, &StorageServerInterface::waitMetrics, req, TaskDataDistribution );
3151 				}
3152 				StorageMetrics x = wait(fx);
3153 				return x;
3154 			} catch (Error& e) {
3155 				if (e.code() != error_code_wrong_shard_server && e.code() != error_code_all_alternatives_failed) {
3156 					TraceEvent(SevError, "WaitStorageMetricsError").error(e);
3157 					throw;
3158 				}
3159 				cx->invalidateCache(keys);
3160 				wait(delay(CLIENT_KNOBS->WRONG_SHARD_SERVER_DELAY, TaskDataDistribution));
3161 			}
3162 		} else {
3163 			TraceEvent(SevWarn, "WaitStorageMetricsPenalty")
3164 				.detail("Keys", keys)
3165 				.detail("Limit", CLIENT_KNOBS->STORAGE_METRICS_SHARD_LIMIT)
3166 				.detail("JitteredSecondsOfPenitence", CLIENT_KNOBS->STORAGE_METRICS_TOO_MANY_SHARDS_DELAY);
3167 			wait(delayJittered(CLIENT_KNOBS->STORAGE_METRICS_TOO_MANY_SHARDS_DELAY, TaskDataDistribution));
3168 			// make sure that the next getKeyRangeLocations() call will actually re-fetch the range
3169 			cx->invalidateCache( keys );
3170 		}
3171 	}
3172 }
3173 
// Member-function entry point for the free function ::waitStorageMetrics().
// Forwards all arguments unchanged, supplying this transaction's database `cx`.
Future< StorageMetrics > Transaction::waitStorageMetrics(
	KeyRange const& keys,
	StorageMetrics const& min,
	StorageMetrics const& max,
	StorageMetrics const& permittedError,
	int shardLimit )
{
	return ::waitStorageMetrics( cx, keys, min, max, permittedError, shardLimit );
}
3183 
// Fetches the storage metrics of `keys` through ::waitStorageMetrics().
// The call uses a default-constructed `min` and `permittedError`, and a `max`
// whose bytes field is set to -1.
// NOTE(review): the -1 upper bound presumably makes the wait complete with the
// current metrics instead of blocking — confirm against the waitMetrics handler.
Future< StorageMetrics > Transaction::getStorageMetrics( KeyRange const& keys, int shardLimit ) {
	StorageMetrics m;
	m.bytes = -1;
	return ::waitStorageMetrics( cx, keys, StorageMetrics(), m, StorageMetrics(), shardLimit );
}
3189 
splitStorageMetrics(Database cx,KeyRange keys,StorageMetrics limit,StorageMetrics estimated)3190 ACTOR Future< Standalone<VectorRef<KeyRef>> > splitStorageMetrics( Database cx, KeyRange keys, StorageMetrics limit, StorageMetrics estimated )
3191 {
3192 	loop {
3193 		state vector< pair<KeyRange, Reference<LocationInfo>> > locations = wait( getKeyRangeLocations( cx, keys, CLIENT_KNOBS->STORAGE_METRICS_SHARD_LIMIT, false, &StorageServerInterface::splitMetrics, TransactionInfo(TaskDataDistribution) ) );
3194 		state StorageMetrics used;
3195 		state Standalone<VectorRef<KeyRef>> results;
3196 
3197 		//SOMEDAY: Right now, if there are too many shards we delay and check again later. There may be a better solution to this.
3198 		if(locations.size() == CLIENT_KNOBS->STORAGE_METRICS_SHARD_LIMIT) {
3199 			wait(delay(CLIENT_KNOBS->STORAGE_METRICS_TOO_MANY_SHARDS_DELAY, TaskDataDistribution));
3200 			cx->invalidateCache(keys);
3201 		}
3202 		else {
3203 			results.push_back_deep( results.arena(), keys.begin );
3204 			try {
3205 				//TraceEvent("SplitStorageMetrics").detail("Locations", locations.size());
3206 
3207 				state int i = 0;
3208 				for(; i<locations.size(); i++) {
3209 					SplitMetricsRequest req( locations[i].first, limit, used, estimated, i == locations.size() - 1 );
3210 					SplitMetricsReply res = wait( loadBalance( locations[i].second, &StorageServerInterface::splitMetrics, req, TaskDataDistribution ) );
3211 					if( res.splits.size() && res.splits[0] <= results.back() ) { // split points are out of order, possibly because of moving data, throw error to retry
3212 						ASSERT_WE_THINK(false);   // FIXME: This seems impossible and doesn't seem to be covered by testing
3213 						throw all_alternatives_failed();
3214 					}
3215 					if( res.splits.size() ) {
3216 						results.append( results.arena(), res.splits.begin(), res.splits.size() );
3217 						results.arena().dependsOn( res.splits.arena() );
3218 					}
3219 					used = res.used;
3220 
3221 					//TraceEvent("SplitStorageMetricsResult").detail("Used", used.bytes).detail("Location", i).detail("Size", res.splits.size());
3222 				}
3223 
3224 				if( used.allLessOrEqual( limit * CLIENT_KNOBS->STORAGE_METRICS_UNFAIR_SPLIT_LIMIT ) ) {
3225 					results.resize(results.arena(), results.size() - 1);
3226 				}
3227 
3228 				results.push_back_deep( results.arena(), keys.end );
3229 				return results;
3230 			} catch (Error& e) {
3231 				if (e.code() != error_code_wrong_shard_server && e.code() != error_code_all_alternatives_failed) {
3232 					TraceEvent(SevError, "SplitStorageMetricsError").error(e);
3233 					throw;
3234 				}
3235 				cx->invalidateCache( keys );
3236 				wait(delay(CLIENT_KNOBS->WRONG_SHARD_SERVER_DELAY, TaskDataDistribution));
3237 			}
3238 		}
3239 	}
3240 }
3241 
// Member-function entry point for the free function ::splitStorageMetrics().
// Forwards all arguments unchanged, supplying this transaction's database `cx`.
Future< Standalone<VectorRef<KeyRef>> > Transaction::splitStorageMetrics( KeyRange const& keys, StorageMetrics const& limit, StorageMetrics const& estimated ) {
	return ::splitStorageMetrics( cx, keys, limit, estimated );
}
3245 
// Delegates to the database context's checkDeferredError().
void Transaction::checkDeferredError() { cx->checkDeferredError(); }
3247 
createTrLogInfoProbabilistically(const Database & cx)3248 Reference<TransactionLogInfo> Transaction::createTrLogInfoProbabilistically(const Database &cx) {
3249 	if(!cx->isError()) {
3250 		double clientSamplingProbability = std::isinf(cx->clientInfo->get().clientTxnInfoSampleRate) ? CLIENT_KNOBS->CSI_SAMPLING_PROBABILITY : cx->clientInfo->get().clientTxnInfoSampleRate;
3251 		if (((networkOptions.logClientInfo.present() && networkOptions.logClientInfo.get()) || BUGGIFY) && g_random->random01() < clientSamplingProbability && (!g_network->isSimulated() || !g_simulator.speedUpSimulation)) {
3252 			return Reference<TransactionLogInfo>(new TransactionLogInfo(TransactionLogInfo::DATABASE));
3253 		}
3254 	}
3255 
3256 	return Reference<TransactionLogInfo>();
3257 }
3258 
// Turns on client info logging in the global network options.
// Asserts that networkOptions.logClientInfo has not been given a value yet, sets
// it to true, and emits a "ClientInfoLoggingEnabled" trace event.
void enableClientInfoLogging() {
	ASSERT(networkOptions.logClientInfo.present() == false);
	networkOptions.logClientInfo = true;
	TraceEvent(SevInfo, "ClientInfoLoggingEnabled");
}
3264