1 /*
2 * NativeAPI.actor.cpp
3 *
4 * This source file is part of the FoundationDB open source project
5 *
6 * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
7 *
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
11 *
12 * http://www.apache.org/licenses/LICENSE-2.0
13 *
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 */
20
21 #include "fdbclient/DatabaseContext.h"
22 #include "fdbclient/NativeAPI.actor.h"
23 #include "fdbclient/Atomic.h"
24 #include "flow/Platform.h"
25 #include "flow/ActorCollection.h"
26 #include "fdbclient/SystemData.h"
27 #include "fdbrpc/LoadBalance.h"
28 #include "fdbclient/StorageServerInterface.h"
29 #include "fdbclient/MasterProxyInterface.h"
30 #include "fdbclient/ClusterInterface.h"
31 #include "fdbclient/FailureMonitorClient.h"
32 #include "flow/DeterministicRandom.h"
33 #include "fdbclient/KeyRangeMap.h"
34 #include "flow/SystemMonitor.h"
35 #include "fdbclient/MutationList.h"
36 #include "fdbclient/CoordinationInterface.h"
37 #include "fdbclient/MonitorLeader.h"
38 #if defined(CMAKE_BUILD) || !defined(WIN32)
39 #include "versions.h"
40 #endif
41 #include "fdbrpc/TLSConnection.h"
42 #include "flow/Knobs.h"
43 #include "fdbclient/Knobs.h"
44 #include "fdbrpc/Net2FileSystem.h"
45 #include "fdbrpc/simulator.h"
46
47 #include <iterator>
48
49 #ifdef WIN32
50 #define WIN32_LEAN_AND_MEAN
51 #include <Windows.h>
52 #undef min
53 #undef max
54 #else
55 #include <time.h>
56 #endif
57 #include "flow/actorcompiler.h" // This must be the last #include.
58
59 extern IRandom* trace_random;
60 extern const char* getHGVersion();
61
62 using std::make_pair;
63 using std::max;
64 using std::min;
65
66 NetworkOptions networkOptions;
67 Reference<TLSOptions> tlsOptions;
68
initTLSOptions()69 static void initTLSOptions() {
70 if (!tlsOptions) {
71 tlsOptions = Reference<TLSOptions>(new TLSOptions());
72 }
73 }
74
75 static const Key CLIENT_LATENCY_INFO_PREFIX = LiteralStringRef("client_latency/");
76 static const Key CLIENT_LATENCY_INFO_CTR_PREFIX = LiteralStringRef("client_latency_counter/");
77
getInterface(DatabaseContext * cx,StorageServerInterface const & ssi,LocalityData const & locality)78 Reference<StorageServerInfo> StorageServerInfo::getInterface( DatabaseContext *cx, StorageServerInterface const& ssi, LocalityData const& locality ) {
79 auto it = cx->server_interf.find( ssi.id() );
80 if( it != cx->server_interf.end() ) {
81 if(it->second->interf.getVersion.getEndpoint().token != ssi.getVersion.getEndpoint().token) {
82 if(it->second->interf.locality == ssi.locality) {
83 //FIXME: load balance holds pointers to individual members of the interface, and this assignment will swap out the object they are
84 // pointing to. This is technically correct, but is very unnatural. We may want to refactor load balance to take an AsyncVar<Reference<Interface>>
85 // so that it is notified when the interface changes.
86 it->second->interf = ssi;
87 } else {
88 it->second->notifyContextDestroyed();
89 Reference<StorageServerInfo> loc( new StorageServerInfo(cx, ssi, locality) );
90 cx->server_interf[ ssi.id() ] = loc.getPtr();
91 return loc;
92 }
93 }
94
95 return Reference<StorageServerInfo>::addRef( it->second );
96 }
97
98 Reference<StorageServerInfo> loc( new StorageServerInfo(cx, ssi, locality) );
99 cx->server_interf[ ssi.id() ] = loc.getPtr();
100 return loc;
101 }
102
notifyContextDestroyed()103 void StorageServerInfo::notifyContextDestroyed() {
104 cx = NULL;
105 }
106
~StorageServerInfo()107 StorageServerInfo::~StorageServerInfo() {
108 if( cx ) {
109 auto it = cx->server_interf.find( interf.id() );
110 if( it != cx->server_interf.end() )
111 cx->server_interf.erase( it );
112 cx = NULL;
113 }
114 }
115
printable(const VectorRef<KeyValueRef> & val)116 std::string printable( const VectorRef<KeyValueRef>& val ) {
117 std::string s;
118 for(int i=0; i<val.size(); i++)
119 s = s + printable(val[i].key) + format(":%d ",val[i].value.size());
120 return s;
121 }
122
printable(const KeyValueRef & val)123 std::string printable( const KeyValueRef& val ) {
124 return printable(val.key) + format(":%d ",val.value.size());
125 }
126
printable(const VectorRef<StringRef> & val)127 std::string printable( const VectorRef<StringRef>& val ) {
128 std::string s;
129 for(int i=0; i<val.size(); i++)
130 s = s + printable(val[i]) + " ";
131 return s;
132 }
133
printable(const StringRef & val)134 std::string printable( const StringRef& val ) {
135 return val.printable();
136 }
137
printable(const std::string & str)138 std::string printable( const std::string& str ) {
139 return StringRef(str).printable();
140 }
141
printable(const KeyRangeRef & range)142 std::string printable( const KeyRangeRef& range ) {
143 return printable(range.begin) + " - " + printable(range.end);
144 }
145
unhex(char c)146 int unhex( char c ) {
147 if (c >= '0' && c <= '9')
148 return c-'0';
149 if (c >= 'a' && c <= 'f')
150 return c-'a'+10;
151 if (c >= 'A' && c <= 'F')
152 return c-'A'+10;
153 UNREACHABLE();
154 }
155
unprintable(std::string const & val)156 std::string unprintable( std::string const& val ) {
157 std::string s;
158 for(int i=0; i<val.size(); i++) {
159 char c = val[i];
160 if ( c == '\\' ) {
161 if (++i == val.size()) ASSERT(false);
162 if (val[i] == '\\') {
163 s += '\\';
164 } else if (val[i] == 'x') {
165 if (i+2 >= val.size()) ASSERT(false);
166 s += char((unhex(val[i+1])<<4) + unhex(val[i+2]));
167 i += 2;
168 } else
169 ASSERT(false);
170 } else
171 s += c;
172 }
173 return s;
174 }
175
validateVersion(Version version)176 void validateVersion(Version version) {
177 // Version could be 0 if the INITIALIZE_NEW_DATABASE option is set. In that case, it is illegal to perform any reads.
178 // We throw client_invalid_operation because the caller didn't directly set the version, so the version_invalid error
179 // might be confusing.
180 if(version == 0) {
181 throw client_invalid_operation();
182 }
183
184 ASSERT(version > 0 || version == latestVersion);
185 }
186
validateOptionValue(Optional<StringRef> value,bool shouldBePresent)187 void validateOptionValue(Optional<StringRef> value, bool shouldBePresent) {
188 if(shouldBePresent && !value.present())
189 throw invalid_option_value();
190 if(!shouldBePresent && value.present() && value.get().size() > 0)
191 throw invalid_option_value();
192 }
193
dumpMutations(const MutationListRef & mutations)194 void dumpMutations( const MutationListRef& mutations ) {
195 for(auto m=mutations.begin(); m; ++m) {
196 switch (m->type) {
197 case MutationRef::SetValue: printf(" '%s' := '%s'\n", printable(m->param1).c_str(), printable(m->param2).c_str()); break;
198 case MutationRef::AddValue: printf(" '%s' += '%s'", printable(m->param1).c_str(), printable(m->param2).c_str()); break;
199 case MutationRef::ClearRange: printf(" Clear ['%s','%s')\n", printable(m->param1).c_str(), printable(m->param2).c_str()); break;
200 default: printf(" Unknown mutation %d('%s','%s')\n", m->type, printable(m->param1).c_str(), printable(m->param2).c_str()); break;
201 }
202 }
203 }
204
addref(DatabaseContext * ptr)205 template <> void addref( DatabaseContext* ptr ) { ptr->addref(); }
delref(DatabaseContext * ptr)206 template <> void delref( DatabaseContext* ptr ) { ptr->delref(); }
207
databaseLogger(DatabaseContext * cx)208 ACTOR Future<Void> databaseLogger( DatabaseContext *cx ) {
209 loop {
210 wait( delay( CLIENT_KNOBS->SYSTEM_MONITOR_INTERVAL, cx->taskID ) );
211 TraceEvent("TransactionMetrics")
212 .detail("Cluster", cx->cluster && cx->getConnectionFile() ? cx->getConnectionFile()->getConnectionString().clusterKeyName().toString() : "")
213 .detail("ReadVersions", cx->transactionReadVersions)
214 .detail("LogicalUncachedReads", cx->transactionLogicalReads)
215 .detail("PhysicalReadRequests", cx->transactionPhysicalReads)
216 .detail("CommittedMutations", cx->transactionCommittedMutations)
217 .detail("CommittedMutationBytes", cx->transactionCommittedMutationBytes)
218 .detail("CommitStarted", cx->transactionsCommitStarted)
219 .detail("CommitCompleted", cx->transactionsCommitCompleted)
220 .detail("TooOld", cx->transactionsTooOld)
221 .detail("FutureVersions", cx->transactionsFutureVersions)
222 .detail("NotCommitted", cx->transactionsNotCommitted)
223 .detail("MaybeCommitted", cx->transactionsMaybeCommitted)
224 .detail("ResourceConstrained", cx->transactionsResourceConstrained)
225 .detail("ProcessBehind", cx->transactionsProcessBehind)
226 .detail("MeanLatency", cx->latencies.mean())
227 .detail("MedianLatency", cx->latencies.median())
228 .detail("Latency90", cx->latencies.percentile(0.90))
229 .detail("Latency98", cx->latencies.percentile(0.98))
230 .detail("MaxLatency", cx->latencies.max())
231 .detail("MeanRowReadLatency", cx->readLatencies.mean())
232 .detail("MedianRowReadLatency", cx->readLatencies.median())
233 .detail("MaxRowReadLatency", cx->readLatencies.max())
234 .detail("MeanGRVLatency", cx->GRVLatencies.mean())
235 .detail("MedianGRVLatency", cx->GRVLatencies.median())
236 .detail("MaxGRVLatency", cx->GRVLatencies.max())
237 .detail("MeanCommitLatency", cx->commitLatencies.mean())
238 .detail("MedianCommitLatency", cx->commitLatencies.median())
239 .detail("MaxCommitLatency", cx->commitLatencies.max())
240 .detail("MeanMutationsPerCommit", cx->mutationsPerCommit.mean())
241 .detail("MedianMutationsPerCommit", cx->mutationsPerCommit.median())
242 .detail("MaxMutationsPerCommit", cx->mutationsPerCommit.max())
243 .detail("MeanBytesPerCommit", cx->bytesPerCommit.mean())
244 .detail("MedianBytesPerCommit", cx->bytesPerCommit.median())
245 .detail("MaxBytesPerCommit", cx->bytesPerCommit.max());
246 cx->latencies.clear();
247 cx->readLatencies.clear();
248 cx->GRVLatencies.clear();
249 cx->commitLatencies.clear();
250 cx->mutationsPerCommit.clear();
251 cx->bytesPerCommit.clear();
252 }
253 }
254
getSampleVersionStamp(Transaction * tr)255 ACTOR static Future<Standalone<StringRef> > getSampleVersionStamp(Transaction *tr) {
256 loop{
257 try {
258 tr->reset();
259 tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
260 wait(success(tr->get(LiteralStringRef("\xff/StatusJsonTestKey62793"))));
261 state Future<Standalone<StringRef> > vstamp = tr->getVersionstamp();
262 tr->makeSelfConflicting();
263 wait(tr->commit());
264 Standalone<StringRef> val = wait(vstamp);
265 return val;
266 }
267 catch (Error& e) {
268 wait(tr->onError(e));
269 }
270 }
271 }
272
273 struct TrInfoChunk {
274 ValueRef value;
275 Key key;
276 };
277
transactionInfoCommitActor(Transaction * tr,std::vector<TrInfoChunk> * chunks)278 ACTOR static Future<Void> transactionInfoCommitActor(Transaction *tr, std::vector<TrInfoChunk> *chunks) {
279 state const Key clientLatencyAtomicCtr = CLIENT_LATENCY_INFO_CTR_PREFIX.withPrefix(fdbClientInfoPrefixRange.begin);
280 state int retryCount = 0;
281 loop{
282 try {
283 tr->reset();
284 tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
285 tr->setOption(FDBTransactionOptions::LOCK_AWARE);
286 state Future<Standalone<StringRef> > vstamp = tr->getVersionstamp();
287 int64_t numCommitBytes = 0;
288 for (auto &chunk : *chunks) {
289 tr->atomicOp(chunk.key, chunk.value, MutationRef::SetVersionstampedKey);
290 numCommitBytes += chunk.key.size() + chunk.value.size() - 4; // subtract number of bytes of key that denotes verstion stamp index
291 }
292 tr->atomicOp(clientLatencyAtomicCtr, StringRef((uint8_t*)&numCommitBytes, 8), MutationRef::AddValue);
293 wait(tr->commit());
294 return Void();
295 }
296 catch (Error& e) {
297 retryCount++;
298 if (retryCount == 10)
299 throw;
300 wait(tr->onError(e));
301 }
302 }
303 }
304
305
delExcessClntTxnEntriesActor(Transaction * tr,int64_t clientTxInfoSizeLimit)306 ACTOR static Future<Void> delExcessClntTxnEntriesActor(Transaction *tr, int64_t clientTxInfoSizeLimit) {
307 state const Key clientLatencyName = CLIENT_LATENCY_INFO_PREFIX.withPrefix(fdbClientInfoPrefixRange.begin);
308 state const Key clientLatencyAtomicCtr = CLIENT_LATENCY_INFO_CTR_PREFIX.withPrefix(fdbClientInfoPrefixRange.begin);
309 TraceEvent(SevInfo, "DelExcessClntTxnEntriesCalled");
310 loop{
311 try {
312 tr->reset();
313 tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
314 tr->setOption(FDBTransactionOptions::LOCK_AWARE);
315 Optional<Value> ctrValue = wait(tr->get(KeyRef(clientLatencyAtomicCtr), true));
316 if (!ctrValue.present()) {
317 TraceEvent(SevInfo, "NumClntTxnEntriesNotFound");
318 return Void();
319 }
320 state int64_t txInfoSize = 0;
321 ASSERT(ctrValue.get().size() == sizeof(int64_t));
322 memcpy(&txInfoSize, ctrValue.get().begin(), ctrValue.get().size());
323 if (txInfoSize < clientTxInfoSizeLimit)
324 return Void();
325 int getRangeByteLimit = (txInfoSize - clientTxInfoSizeLimit) < CLIENT_KNOBS->TRANSACTION_SIZE_LIMIT ? (txInfoSize - clientTxInfoSizeLimit) : CLIENT_KNOBS->TRANSACTION_SIZE_LIMIT;
326 GetRangeLimits limit(CLIENT_KNOBS->ROW_LIMIT_UNLIMITED, getRangeByteLimit);
327 Standalone<RangeResultRef> txEntries = wait(tr->getRange(KeyRangeRef(clientLatencyName, strinc(clientLatencyName)), limit));
328 state int64_t numBytesToDel = 0;
329 KeyRef endKey;
330 for (auto &kv : txEntries) {
331 endKey = kv.key;
332 numBytesToDel += kv.key.size() + kv.value.size();
333 if (txInfoSize - numBytesToDel <= clientTxInfoSizeLimit)
334 break;
335 }
336 if (numBytesToDel) {
337 tr->clear(KeyRangeRef(txEntries[0].key, strinc(endKey)));
338 TraceEvent(SevInfo, "DeletingExcessCntTxnEntries").detail("BytesToBeDeleted", numBytesToDel);
339 int64_t bytesDel = -numBytesToDel;
340 tr->atomicOp(clientLatencyAtomicCtr, StringRef((uint8_t*)&bytesDel, 8), MutationRef::AddValue);
341 wait(tr->commit());
342 }
343 if (txInfoSize - numBytesToDel <= clientTxInfoSizeLimit)
344 return Void();
345 }
346 catch (Error& e) {
347 wait(tr->onError(e));
348 }
349 }
350 }
351
352 // The reason for getting a pointer to DatabaseContext instead of a reference counted object is because reference counting will increment reference count for
353 // DatabaseContext which holds the future of this actor. This creates a cyclic reference and hence this actor and Database object will not be destroyed at all.
clientStatusUpdateActor(DatabaseContext * cx)354 ACTOR static Future<Void> clientStatusUpdateActor(DatabaseContext *cx) {
355 state const std::string clientLatencyName = CLIENT_LATENCY_INFO_PREFIX.withPrefix(fdbClientInfoPrefixRange.begin).toString();
356 state Transaction tr;
357 state std::vector<TrInfoChunk> commitQ;
358 state int txBytes = 0;
359
360 loop {
361 try {
362 ASSERT(cx->clientStatusUpdater.outStatusQ.empty());
363 cx->clientStatusUpdater.inStatusQ.swap(cx->clientStatusUpdater.outStatusQ);
364 // Split Transaction Info into chunks
365 state std::vector<TrInfoChunk> trChunksQ;
366 for (auto &entry : cx->clientStatusUpdater.outStatusQ) {
367 auto &bw = entry.second;
368 int64_t value_size_limit = BUGGIFY ? g_random->randomInt(1e3, CLIENT_KNOBS->VALUE_SIZE_LIMIT) : CLIENT_KNOBS->VALUE_SIZE_LIMIT;
369 int num_chunks = (bw.getLength() + value_size_limit - 1) / value_size_limit;
370 std::string random_id = g_random->randomAlphaNumeric(16);
371 std::string user_provided_id = entry.first.size() ? entry.first + "/" : "";
372 for (int i = 0; i < num_chunks; i++) {
373 TrInfoChunk chunk;
374 BinaryWriter chunkBW(Unversioned());
375 chunkBW << bigEndian32(i+1) << bigEndian32(num_chunks);
376 chunk.key = KeyRef(clientLatencyName + std::string(10, '\x00') + "/" + random_id + "/" + chunkBW.toValue().toString() + "/" + user_provided_id + std::string(4, '\x00'));
377 int32_t pos = littleEndian32(clientLatencyName.size());
378 memcpy(mutateString(chunk.key) + chunk.key.size() - sizeof(int32_t), &pos, sizeof(int32_t));
379 if (i == num_chunks - 1) {
380 chunk.value = ValueRef(static_cast<uint8_t *>(bw.getData()) + (i * value_size_limit), bw.getLength() - (i * value_size_limit));
381 }
382 else {
383 chunk.value = ValueRef(static_cast<uint8_t *>(bw.getData()) + (i * value_size_limit), value_size_limit);
384 }
385 trChunksQ.push_back(std::move(chunk));
386 }
387 }
388
389 // Commit the chunks splitting into different transactions if needed
390 state int64_t dataSizeLimit = BUGGIFY ? g_random->randomInt(200e3, 1.5 * CLIENT_KNOBS->TRANSACTION_SIZE_LIMIT) : 0.8 * CLIENT_KNOBS->TRANSACTION_SIZE_LIMIT;
391 state std::vector<TrInfoChunk>::iterator tracking_iter = trChunksQ.begin();
392 tr = Transaction(Database(Reference<DatabaseContext>::addRef(cx)));
393 ASSERT(commitQ.empty() && (txBytes == 0));
394 loop {
395 state std::vector<TrInfoChunk>::iterator iter = tracking_iter;
396 txBytes = 0;
397 commitQ.clear();
398 try {
399 while (iter != trChunksQ.end()) {
400 if (iter->value.size() + iter->key.size() + txBytes > dataSizeLimit) {
401 wait(transactionInfoCommitActor(&tr, &commitQ));
402 tracking_iter = iter;
403 commitQ.clear();
404 txBytes = 0;
405 }
406 commitQ.push_back(*iter);
407 txBytes += iter->value.size() + iter->key.size();
408 ++iter;
409 }
410 if (!commitQ.empty()) {
411 wait(transactionInfoCommitActor(&tr, &commitQ));
412 commitQ.clear();
413 txBytes = 0;
414 }
415 break;
416 }
417 catch (Error &e) {
418 if (e.code() == error_code_transaction_too_large) {
419 dataSizeLimit /= 2;
420 ASSERT(dataSizeLimit >= CLIENT_KNOBS->VALUE_SIZE_LIMIT + CLIENT_KNOBS->KEY_SIZE_LIMIT);
421 }
422 else {
423 TraceEvent(SevWarnAlways, "ClientTrInfoErrorCommit")
424 .error(e)
425 .detail("TxBytes", txBytes);
426 commitQ.clear();
427 txBytes = 0;
428 throw;
429 }
430 }
431 }
432 cx->clientStatusUpdater.outStatusQ.clear();
433 double clientSamplingProbability = std::isinf(cx->clientInfo->get().clientTxnInfoSampleRate) ? CLIENT_KNOBS->CSI_SAMPLING_PROBABILITY : cx->clientInfo->get().clientTxnInfoSampleRate;
434 int64_t clientTxnInfoSizeLimit = cx->clientInfo->get().clientTxnInfoSizeLimit == -1 ? CLIENT_KNOBS->CSI_SIZE_LIMIT : cx->clientInfo->get().clientTxnInfoSizeLimit;
435 if (!trChunksQ.empty() && g_random->random01() < clientSamplingProbability)
436 wait(delExcessClntTxnEntriesActor(&tr, clientTxnInfoSizeLimit));
437
438 // tr is destructed because it hold a reference to DatabaseContext which creates a cycle mentioned above.
439 // Hence destroy the transacation before sleeping to give a chance for the actor to be cleanedup if the Database is destroyed by the user.
440 tr = Transaction();
441 wait(delay(CLIENT_KNOBS->CSI_STATUS_DELAY));
442 }
443 catch (Error& e) {
444 if (e.code() == error_code_actor_cancelled) {
445 throw;
446 }
447 cx->clientStatusUpdater.outStatusQ.clear();
448 TraceEvent(SevWarnAlways, "UnableToWriteClientStatus").error(e);
449 // tr is destructed because it hold a reference to DatabaseContext which creates a cycle mentioned above.
450 // Hence destroy the transacation before sleeping to give a chance for the actor to be cleanedup if the Database is destroyed by the user.
451 tr = Transaction();
452 wait(delay(10.0));
453 }
454 }
455 }
456
monitorMasterProxiesChange(Reference<AsyncVar<ClientDBInfo>> clientDBInfo,AsyncTrigger * triggerVar)457 ACTOR static Future<Void> monitorMasterProxiesChange(Reference<AsyncVar<ClientDBInfo>> clientDBInfo, AsyncTrigger *triggerVar) {
458 state vector< MasterProxyInterface > curProxies;
459 curProxies = clientDBInfo->get().proxies;
460
461 loop{
462 wait(clientDBInfo->onChange());
463 if (clientDBInfo->get().proxies != curProxies) {
464 curProxies = clientDBInfo->get().proxies;
465 triggerVar->trigger();
466 }
467 }
468 }
469
getHealthMetricsActor(DatabaseContext * cx,bool detailed)470 ACTOR static Future<HealthMetrics> getHealthMetricsActor(DatabaseContext *cx, bool detailed) {
471 if (now() - cx->healthMetricsLastUpdated < CLIENT_KNOBS->AGGREGATE_HEALTH_METRICS_MAX_STALENESS) {
472 if (detailed) {
473 return cx->healthMetrics;
474 }
475 else {
476 HealthMetrics result;
477 result.update(cx->healthMetrics, false, false);
478 return result;
479 }
480 }
481 state bool sendDetailedRequest = detailed && now() - cx->detailedHealthMetricsLastUpdated >
482 CLIENT_KNOBS->DETAILED_HEALTH_METRICS_MAX_STALENESS;
483 loop {
484 choose {
485 when(wait(cx->onMasterProxiesChanged())) {}
486 when(GetHealthMetricsReply rep =
487 wait(loadBalance(cx->getMasterProxies(false), &MasterProxyInterface::getHealthMetrics,
488 GetHealthMetricsRequest(sendDetailedRequest)))) {
489 cx->healthMetrics.update(rep.healthMetrics, detailed, true);
490 if (detailed) {
491 cx->healthMetricsLastUpdated = now();
492 cx->detailedHealthMetricsLastUpdated = now();
493 return cx->healthMetrics;
494 }
495 else {
496 cx->healthMetricsLastUpdated = now();
497 HealthMetrics result;
498 result.update(cx->healthMetrics, false, false);
499 return result;
500 }
501 }
502 }
503 }
504 }
505
getHealthMetrics(bool detailed=false)506 Future<HealthMetrics> DatabaseContext::getHealthMetrics(bool detailed = false) {
507 return getHealthMetricsActor(this, detailed);
508 }
509
DatabaseContext(Reference<Cluster> cluster,Reference<AsyncVar<ClientDBInfo>> clientInfo,Future<Void> clientInfoMonitor,Standalone<StringRef> dbId,int taskID,LocalityData const & clientLocality,bool enableLocalityLoadBalance,bool lockAware,int apiVersion)510 DatabaseContext::DatabaseContext(
511 Reference<Cluster> cluster, Reference<AsyncVar<ClientDBInfo>> clientInfo, Future<Void> clientInfoMonitor, Standalone<StringRef> dbId,
512 int taskID, LocalityData const& clientLocality, bool enableLocalityLoadBalance, bool lockAware, int apiVersion )
513 : cluster(cluster), clientInfo(clientInfo), clientInfoMonitor(clientInfoMonitor), dbId(dbId), taskID(taskID), clientLocality(clientLocality), enableLocalityLoadBalance(enableLocalityLoadBalance),
514 lockAware(lockAware), apiVersion(apiVersion), provisional(false),
515 transactionReadVersions(0), transactionLogicalReads(0), transactionPhysicalReads(0), transactionCommittedMutations(0), transactionCommittedMutationBytes(0),
516 transactionsCommitStarted(0), transactionsCommitCompleted(0), transactionsTooOld(0), transactionsFutureVersions(0), transactionsNotCommitted(0),
517 transactionsMaybeCommitted(0), transactionsResourceConstrained(0), transactionsProcessBehind(0), outstandingWatches(0), transactionTimeout(0.0), transactionMaxRetries(-1),
518 latencies(1000), readLatencies(1000), commitLatencies(1000), GRVLatencies(1000), mutationsPerCommit(1000), bytesPerCommit(1000), mvCacheInsertLocation(0),
519 healthMetricsLastUpdated(0), detailedHealthMetricsLastUpdated(0)
520 {
521 metadataVersionCache.resize(CLIENT_KNOBS->METADATA_VERSION_CACHE_SIZE);
522 maxOutstandingWatches = CLIENT_KNOBS->DEFAULT_MAX_OUTSTANDING_WATCHES;
523
524 transactionMaxBackoff = CLIENT_KNOBS->FAILURE_MAX_DELAY;
525 snapshotRywEnabled = apiVersionAtLeast(300) ? 1 : 0;
526
527 logger = databaseLogger( this );
528 locationCacheSize = g_network->isSimulated() ?
529 CLIENT_KNOBS->LOCATION_CACHE_EVICTION_SIZE_SIM :
530 CLIENT_KNOBS->LOCATION_CACHE_EVICTION_SIZE;
531
532 getValueSubmitted.init(LiteralStringRef("NativeAPI.GetValueSubmitted"));
533 getValueCompleted.init(LiteralStringRef("NativeAPI.GetValueCompleted"));
534
535 monitorMasterProxiesInfoChange = monitorMasterProxiesChange(clientInfo, &masterProxiesChangeTrigger);
536 clientStatusUpdater.actor = clientStatusUpdateActor(this);
537 }
538
DatabaseContext(const Error & err)539 DatabaseContext::DatabaseContext( const Error &err ) : deferredError(err), latencies(1000), readLatencies(1000), commitLatencies(1000), GRVLatencies(1000), mutationsPerCommit(1000), bytesPerCommit(1000) {}
540
monitorClientInfo(Reference<AsyncVar<Optional<ClusterInterface>>> clusterInterface,Reference<ClusterConnectionFile> ccf,Reference<AsyncVar<ClientDBInfo>> outInfo,Reference<AsyncVar<int>> connectedCoordinatorsNumDelayed)541 ACTOR static Future<Void> monitorClientInfo( Reference<AsyncVar<Optional<ClusterInterface>>> clusterInterface, Reference<ClusterConnectionFile> ccf, Reference<AsyncVar<ClientDBInfo>> outInfo, Reference<AsyncVar<int>> connectedCoordinatorsNumDelayed ) {
542 try {
543 state Optional<double> incorrectTime;
544 loop {
545 OpenDatabaseRequest req;
546 req.knownClientInfoID = outInfo->get().id;
547 req.supportedVersions = VectorRef<ClientVersionRef>(req.arena, networkOptions.supportedVersions);
548 req.connectedCoordinatorsNum = connectedCoordinatorsNumDelayed->get();
549 req.traceLogGroup = StringRef(req.arena, networkOptions.traceLogGroup);
550
551 ClusterConnectionString fileConnectionString;
552 if (ccf && !ccf->fileContentsUpToDate(fileConnectionString)) {
553 req.issues.push_back_deep(req.arena, LiteralStringRef("incorrect_cluster_file_contents"));
554 std::string connectionString = ccf->getConnectionString().toString();
555 if(!incorrectTime.present()) {
556 incorrectTime = now();
557 }
558 if(ccf->canGetFilename()) {
559 // Don't log a SevWarnAlways initially to account for transient issues (e.g. someone else changing the file right before us)
560 TraceEvent(now() - incorrectTime.get() > 300 ? SevWarnAlways : SevWarn, "IncorrectClusterFileContents")
561 .detail("Filename", ccf->getFilename())
562 .detail("ConnectionStringFromFile", fileConnectionString.toString())
563 .detail("CurrentConnectionString", connectionString);
564 }
565 }
566 else {
567 incorrectTime = Optional<double>();
568 }
569
570 choose {
571 when( ClientDBInfo ni = wait( clusterInterface->get().present() ? brokenPromiseToNever( clusterInterface->get().get().openDatabase.getReply( req ) ) : Never() ) ) {
572 TraceEvent("ClientInfoChange").detail("ChangeID", ni.id);
573 outInfo->set(ni);
574 }
575 when( wait( clusterInterface->onChange() ) ) {
576 if(clusterInterface->get().present())
577 TraceEvent("ClientInfo_CCInterfaceChange").detail("CCID", clusterInterface->get().get().id());
578 }
579 when( wait( connectedCoordinatorsNumDelayed->onChange() ) ) {}
580 }
581 }
582 } catch( Error& e ) {
583 TraceEvent(SevError, "MonitorClientInfoError")
584 .error(e)
585 .detail("ConnectionFile", ccf && ccf->canGetFilename() ? ccf->getFilename() : "")
586 .detail("ConnectionString", ccf ? ccf->getConnectionString().toString() : "");
587
588 throw;
589 }
590 }
591
592 // Create database context and monitor the cluster status;
593 // Notify client when cluster info (e.g., cluster controller) changes
create(Reference<AsyncVar<Optional<ClusterInterface>>> clusterInterface,Reference<ClusterConnectionFile> connFile,LocalityData const & clientLocality)594 Database DatabaseContext::create(Reference<AsyncVar<Optional<ClusterInterface>>> clusterInterface, Reference<ClusterConnectionFile> connFile, LocalityData const& clientLocality) {
595 Reference<AsyncVar<int>> connectedCoordinatorsNum(new AsyncVar<int>(0));
596 Reference<AsyncVar<int>> connectedCoordinatorsNumDelayed(new AsyncVar<int>(0));
597 Reference<Cluster> cluster(new Cluster(connFile, clusterInterface, connectedCoordinatorsNum));
598 Reference<AsyncVar<ClientDBInfo>> clientInfo(new AsyncVar<ClientDBInfo>());
599 Future<Void> clientInfoMonitor = delayedAsyncVar(connectedCoordinatorsNum, connectedCoordinatorsNumDelayed, CLIENT_KNOBS->CHECK_CONNECTED_COORDINATOR_NUM_DELAY) || monitorClientInfo(clusterInterface, connFile, clientInfo, connectedCoordinatorsNumDelayed);
600
601 return Database(new DatabaseContext(cluster, clientInfo, clientInfoMonitor, LiteralStringRef(""), TaskDefaultEndpoint, clientLocality, true, false));
602 }
603
create(Reference<AsyncVar<ClientDBInfo>> clientInfo,Future<Void> clientInfoMonitor,LocalityData clientLocality,bool enableLocalityLoadBalance,int taskID,bool lockAware,int apiVersion)604 Database DatabaseContext::create(Reference<AsyncVar<ClientDBInfo>> clientInfo, Future<Void> clientInfoMonitor, LocalityData clientLocality, bool enableLocalityLoadBalance, int taskID, bool lockAware, int apiVersion) {
605 return Database( new DatabaseContext( Reference<Cluster>(nullptr), clientInfo, clientInfoMonitor, LiteralStringRef(""), taskID, clientLocality, enableLocalityLoadBalance, lockAware, apiVersion ) );
606 }
607
~DatabaseContext()608 DatabaseContext::~DatabaseContext() {
609 monitorMasterProxiesInfoChange.cancel();
610 for(auto it = server_interf.begin(); it != server_interf.end(); it = server_interf.erase(it))
611 it->second->notifyContextDestroyed();
612 ASSERT_ABORT( server_interf.empty() );
613 locationCache.insert( allKeys, Reference<LocationInfo>() );
614 }
615
getCachedLocation(const KeyRef & key,bool isBackward)616 pair<KeyRange,Reference<LocationInfo>> DatabaseContext::getCachedLocation( const KeyRef& key, bool isBackward ) {
617 if( isBackward ) {
618 auto range = locationCache.rangeContainingKeyBefore(key);
619 return std::make_pair(range->range(), range->value());
620 }
621 else {
622 auto range = locationCache.rangeContaining(key);
623 return std::make_pair(range->range(), range->value());
624 }
625 }
626
getCachedLocations(const KeyRangeRef & range,vector<std::pair<KeyRange,Reference<LocationInfo>>> & result,int limit,bool reverse)627 bool DatabaseContext::getCachedLocations( const KeyRangeRef& range, vector<std::pair<KeyRange,Reference<LocationInfo>>>& result, int limit, bool reverse ) {
628 result.clear();
629 auto locRanges = locationCache.intersectingRanges(range);
630
631 auto begin = locationCache.rangeContaining(range.begin);
632 auto end = locationCache.rangeContainingKeyBefore(range.end);
633
634 loop {
635 auto r = reverse ? end : begin;
636 if (!r->value()){
637 TEST(result.size()); // had some but not all cached locations
638 result.clear();
639 return false;
640 }
641 result.push_back( make_pair(r->range() & range, r->value()) );
642 if(result.size() == limit)
643 break;
644
645 if(begin == end)
646 break;
647
648 if(reverse)
649 --end;
650 else
651 ++begin;
652 }
653
654 return true;
655 }
656
setCachedLocation(const KeyRangeRef & keys,const vector<StorageServerInterface> & servers)657 Reference<LocationInfo> DatabaseContext::setCachedLocation( const KeyRangeRef& keys, const vector<StorageServerInterface>& servers ) {
658 vector<Reference<ReferencedInterface<StorageServerInterface>>> serverRefs;
659 serverRefs.reserve(servers.size());
660 for(auto& interf : servers) {
661 serverRefs.push_back( StorageServerInfo::getInterface( this, interf, clientLocality ) );
662 }
663
664 int maxEvictionAttempts = 100, attempts = 0;
665 Reference<LocationInfo> loc = Reference<LocationInfo>( new LocationInfo(serverRefs) );
666 while( locationCache.size() > locationCacheSize && attempts < maxEvictionAttempts) {
667 TEST( true ); // NativeAPI storage server locationCache entry evicted
668 attempts++;
669 auto r = locationCache.randomRange();
670 Key begin = r.begin(), end = r.end(); // insert invalidates r, so can't be passed a mere reference into it
671 locationCache.insert( KeyRangeRef(begin, end), Reference<LocationInfo>() );
672 }
673 locationCache.insert( keys, loc );
674 return std::move(loc);
675 }
676
invalidateCache(const KeyRef & key,bool isBackward)677 void DatabaseContext::invalidateCache( const KeyRef& key, bool isBackward ) {
678 if( isBackward )
679 locationCache.rangeContainingKeyBefore(key)->value() = Reference<LocationInfo>();
680 else
681 locationCache.rangeContaining(key)->value() = Reference<LocationInfo>();
682 }
683
invalidateCache(const KeyRangeRef & keys)684 void DatabaseContext::invalidateCache( const KeyRangeRef& keys ) {
685 auto rs = locationCache.intersectingRanges(keys);
686 Key begin = rs.begin().begin(), end = rs.end().begin(); // insert invalidates rs, so can't be passed a mere reference into it
687 locationCache.insert( KeyRangeRef(begin, end), Reference<LocationInfo>() );
688 }
689
onMasterProxiesChanged()690 Future<Void> DatabaseContext::onMasterProxiesChanged() {
691 return this->masterProxiesChangeTrigger.onTrigger();
692 }
693
extractIntOption(Optional<StringRef> value,int64_t minValue,int64_t maxValue)694 int64_t extractIntOption( Optional<StringRef> value, int64_t minValue, int64_t maxValue ) {
695 validateOptionValue(value, true);
696 if( value.get().size() != 8 ) {
697 throw invalid_option_value();
698 }
699
700 int64_t passed = *((int64_t*)(value.get().begin()));
701 if( passed > maxValue || passed < minValue ) {
702 throw invalid_option_value();
703 }
704
705 return passed;
706 }
707
extractHexOption(StringRef value)708 uint64_t extractHexOption( StringRef value ) {
709 char* end;
710 uint64_t id = strtoull( value.toString().c_str(), &end, 16 );
711 if (*end)
712 throw invalid_option_value();
713 return id;
714 }
715
setOption(FDBDatabaseOptions::Option option,Optional<StringRef> value)716 void DatabaseContext::setOption( FDBDatabaseOptions::Option option, Optional<StringRef> value) {
717 switch(option) {
718 case FDBDatabaseOptions::LOCATION_CACHE_SIZE:
719 locationCacheSize = (int)extractIntOption(value, 0, std::numeric_limits<int>::max());
720 break;
721 case FDBDatabaseOptions::MACHINE_ID:
722 clientLocality = LocalityData( clientLocality.processId(), value.present() ? Standalone<StringRef>(value.get()) : Optional<Standalone<StringRef>>(), clientLocality.machineId(), clientLocality.dcId() );
723 if( clientInfo->get().proxies.size() )
724 masterProxies = Reference<ProxyInfo>( new ProxyInfo( clientInfo->get().proxies, clientLocality ) );
725 server_interf.clear();
726 locationCache.insert( allKeys, Reference<LocationInfo>() );
727 break;
728 case FDBDatabaseOptions::MAX_WATCHES:
729 maxOutstandingWatches = (int)extractIntOption(value, 0, CLIENT_KNOBS->ABSOLUTE_MAX_WATCHES);
730 break;
731 case FDBDatabaseOptions::DATACENTER_ID:
732 clientLocality = LocalityData(clientLocality.processId(), clientLocality.zoneId(), clientLocality.machineId(), value.present() ? Standalone<StringRef>(value.get()) : Optional<Standalone<StringRef>>());
733 if( clientInfo->get().proxies.size() )
734 masterProxies = Reference<ProxyInfo>( new ProxyInfo( clientInfo->get().proxies, clientLocality ));
735 server_interf.clear();
736 locationCache.insert( allKeys, Reference<LocationInfo>() );
737 break;
738 case FDBDatabaseOptions::TRANSACTION_TIMEOUT:
739 if( !apiVersionAtLeast(610) ) {
740 throw invalid_option();
741 }
742 transactionTimeout = extractIntOption(value, 0, std::numeric_limits<int>::max())/1000.0;
743 break;
744 case FDBDatabaseOptions::TRANSACTION_RETRY_LIMIT:
745 transactionMaxRetries = (int)extractIntOption(value, -1, std::numeric_limits<int>::max());
746 break;
747 case FDBDatabaseOptions::TRANSACTION_MAX_RETRY_DELAY:
748 validateOptionValue(value, true);
749 transactionMaxBackoff = extractIntOption(value, 0, std::numeric_limits<int32_t>::max()) / 1000.0;
750 break;
751 case FDBDatabaseOptions::SNAPSHOT_RYW_ENABLE:
752 validateOptionValue(value, false);
753 snapshotRywEnabled++;
754 break;
755 case FDBDatabaseOptions::SNAPSHOT_RYW_DISABLE:
756 validateOptionValue(value, false);
757 snapshotRywEnabled--;
758 break;
759 }
760 }
761
addWatch()762 void DatabaseContext::addWatch() {
763 if(outstandingWatches >= maxOutstandingWatches)
764 throw too_many_watches();
765
766 ++outstandingWatches;
767 }
768
removeWatch()769 void DatabaseContext::removeWatch() {
770 --outstandingWatches;
771 ASSERT(outstandingWatches >= 0);
772 }
773
onConnected()774 Future<Void> DatabaseContext::onConnected() {
775 ASSERT(cluster);
776 return cluster->onConnected();
777 }
778
getConnectionFile()779 Reference<ClusterConnectionFile> DatabaseContext::getConnectionFile() {
780 ASSERT(cluster);
781 return cluster->getConnectionFile();
782 }
783
createDatabase(Reference<ClusterConnectionFile> connFile,int apiVersion,LocalityData const & clientLocality,DatabaseContext * preallocatedDb)784 Database Database::createDatabase( Reference<ClusterConnectionFile> connFile, int apiVersion, LocalityData const& clientLocality, DatabaseContext *preallocatedDb ) {
785 Reference<AsyncVar<int>> connectedCoordinatorsNum(new AsyncVar<int>(0)); // Number of connected coordinators for the client
786 Reference<AsyncVar<int>> connectedCoordinatorsNumDelayed(new AsyncVar<int>(0));
787 Reference<Cluster> cluster(new Cluster(connFile, connectedCoordinatorsNum, apiVersion));
788 Reference<AsyncVar<ClientDBInfo>> clientInfo(new AsyncVar<ClientDBInfo>());
789 Future<Void> clientInfoMonitor = delayedAsyncVar(connectedCoordinatorsNum, connectedCoordinatorsNumDelayed, CLIENT_KNOBS->CHECK_CONNECTED_COORDINATOR_NUM_DELAY) || monitorClientInfo(cluster->getClusterInterface(), connFile, clientInfo, connectedCoordinatorsNumDelayed);
790
791 DatabaseContext *db;
792 if(preallocatedDb) {
793 db = new (preallocatedDb) DatabaseContext(cluster, clientInfo, clientInfoMonitor, LiteralStringRef(""), TaskDefaultEndpoint, clientLocality, true, false, apiVersion);
794 }
795 else {
796 db = new DatabaseContext(cluster, clientInfo, clientInfoMonitor, LiteralStringRef(""), TaskDefaultEndpoint, clientLocality, true, false, apiVersion);
797 }
798
799 return Database(db);
800 }
801
createDatabase(std::string connFileName,int apiVersion,LocalityData const & clientLocality)802 Database Database::createDatabase( std::string connFileName, int apiVersion, LocalityData const& clientLocality ) {
803 Reference<ClusterConnectionFile> rccf = Reference<ClusterConnectionFile>(new ClusterConnectionFile(ClusterConnectionFile::lookupClusterFileName(connFileName).first));
804 return Database::createDatabase(rccf, apiVersion, clientLocality);
805 }
806
// Defined in another translation unit. Cluster::init uses it to choose the IP embedded in the client trace file's address.
extern IPAddress determinePublicIPAutomatically(ClusterConnectionString const& ccs);
808
Cluster(Reference<ClusterConnectionFile> connFile,Reference<AsyncVar<int>> connectedCoordinatorsNum,int apiVersion)809 Cluster::Cluster( Reference<ClusterConnectionFile> connFile, Reference<AsyncVar<int>> connectedCoordinatorsNum, int apiVersion )
810 : clusterInterface(new AsyncVar<Optional<ClusterInterface>>())
811 {
812 init(connFile, true, connectedCoordinatorsNum, apiVersion);
813 }
814
Cluster(Reference<ClusterConnectionFile> connFile,Reference<AsyncVar<Optional<ClusterInterface>>> clusterInterface,Reference<AsyncVar<int>> connectedCoordinatorsNum)815 Cluster::Cluster( Reference<ClusterConnectionFile> connFile, Reference<AsyncVar<Optional<ClusterInterface>>> clusterInterface, Reference<AsyncVar<int>> connectedCoordinatorsNum)
816 : clusterInterface(clusterInterface)
817 {
818 init(connFile, true, connectedCoordinatorsNum);
819 }
820
init(Reference<ClusterConnectionFile> connFile,bool startClientInfoMonitor,Reference<AsyncVar<int>> connectedCoordinatorsNum,int apiVersion)821 void Cluster::init( Reference<ClusterConnectionFile> connFile, bool startClientInfoMonitor, Reference<AsyncVar<int>> connectedCoordinatorsNum, int apiVersion ) {
822 connectionFile = connFile;
823 connected = clusterInterface->onChange();
824
825 if(!g_network)
826 throw network_not_setup();
827
828 if(connFile) {
829 if(networkOptions.traceDirectory.present() && !traceFileIsOpen()) {
830 g_network->initMetrics();
831 FlowTransport::transport().initMetrics();
832 initTraceEventMetrics();
833
834 auto publicIP = determinePublicIPAutomatically( connFile->getConnectionString() );
835 selectTraceFormatter(networkOptions.traceFormat);
836 openTraceFile(NetworkAddress(publicIP, ::getpid()), networkOptions.traceRollSize, networkOptions.traceMaxLogsSize, networkOptions.traceDirectory.get(), "trace", networkOptions.traceLogGroup);
837
838 TraceEvent("ClientStart")
839 .detail("SourceVersion", getHGVersion())
840 .detail("Version", FDB_VT_VERSION)
841 .detail("PackageName", FDB_VT_PACKAGE_NAME)
842 .detail("ClusterFile", connFile->getFilename().c_str())
843 .detail("ConnectionString", connFile->getConnectionString().toString())
844 .detailf("ActualTime", "%lld", DEBUG_DETERMINISM ? 0 : time(NULL))
845 .detail("ApiVersion", apiVersion)
846 .detailf("ImageOffset", "%p", platform::getImageOffset())
847 .trackLatest("ClientStart");
848
849 initializeSystemMonitorMachineState(SystemMonitorMachineState(IPAddress(publicIP)));
850
851 systemMonitor();
852 uncancellable( recurring( &systemMonitor, CLIENT_KNOBS->SYSTEM_MONITOR_INTERVAL, TaskFlushTrace ) );
853 }
854
855 leaderMon = monitorLeader( connFile, clusterInterface, connectedCoordinatorsNum );
856 failMon = failureMonitorClient( clusterInterface, false );
857 }
858 }
859
~Cluster()860 Cluster::~Cluster() {}
861
getClusterInterface()862 Reference<AsyncVar<Optional<struct ClusterInterface>>> Cluster::getClusterInterface() {
863 return clusterInterface;
864 }
865
onConnected()866 Future<Void> Cluster::onConnected() {
867 return connected;
868 }
869
setNetworkOption(FDBNetworkOptions::Option option,Optional<StringRef> value)870 void setNetworkOption(FDBNetworkOptions::Option option, Optional<StringRef> value) {
871 switch(option) {
872 // SOMEDAY: If the network is already started, should these three throw an error?
873 case FDBNetworkOptions::TRACE_ENABLE:
874 networkOptions.traceDirectory = value.present() ? value.get().toString() : "";
875 break;
876 case FDBNetworkOptions::TRACE_ROLL_SIZE:
877 validateOptionValue(value, true);
878 networkOptions.traceRollSize = extractIntOption(value, 0, std::numeric_limits<int64_t>::max());
879 break;
880 case FDBNetworkOptions::TRACE_MAX_LOGS_SIZE:
881 validateOptionValue(value, true);
882 networkOptions.traceMaxLogsSize = extractIntOption(value, 0, std::numeric_limits<int64_t>::max());
883 break;
884 case FDBNetworkOptions::TRACE_LOG_GROUP:
885 if(value.present())
886 networkOptions.traceLogGroup = value.get().toString();
887 break;
888 case FDBNetworkOptions::TRACE_FORMAT:
889 validateOptionValue(value, true);
890 networkOptions.traceFormat = value.get().toString();
891 if (!validateTraceFormat(networkOptions.traceFormat)) {
892 fprintf(stderr, "Unrecognized trace format: `%s'\n", networkOptions.traceFormat.c_str());
893 throw invalid_option_value();
894 }
895 break;
896 case FDBNetworkOptions::KNOB: {
897 validateOptionValue(value, true);
898
899 std::string optionValue = value.get().toString();
900 TraceEvent("SetKnob").detail("KnobString", optionValue);
901
902 size_t eq = optionValue.find_first_of('=');
903 if(eq == optionValue.npos) {
904 TraceEvent(SevWarnAlways, "InvalidKnobString").detail("KnobString", optionValue);
905 throw invalid_option_value();
906 }
907
908 std::string knobName = optionValue.substr(0, eq);
909 std::string knobValue = optionValue.substr(eq+1);
910 if (!const_cast<FlowKnobs*>(FLOW_KNOBS)->setKnob( knobName, knobValue ) &&
911 !const_cast<ClientKnobs*>(CLIENT_KNOBS)->setKnob( knobName, knobValue ))
912 {
913 TraceEvent(SevWarnAlways, "UnrecognizedKnob").detail("Knob", knobName.c_str());
914 fprintf(stderr, "FoundationDB client ignoring unrecognized knob option '%s'\n", knobName.c_str());
915 }
916 break;
917 }
918 case FDBNetworkOptions::TLS_PLUGIN:
919 validateOptionValue(value, true);
920 break;
921 case FDBNetworkOptions::TLS_CERT_PATH:
922 validateOptionValue(value, true);
923 initTLSOptions();
924 tlsOptions->set_cert_file( value.get().toString() );
925 break;
926 case FDBNetworkOptions::TLS_CERT_BYTES:
927 initTLSOptions();
928 tlsOptions->set_cert_data( value.get().toString() );
929 break;
930 case FDBNetworkOptions::TLS_CA_PATH:
931 validateOptionValue(value, true);
932 initTLSOptions();
933 tlsOptions->set_ca_file( value.get().toString() );
934 break;
935 case FDBNetworkOptions::TLS_CA_BYTES:
936 validateOptionValue(value, true);
937 initTLSOptions();
938 tlsOptions->set_ca_data(value.get().toString());
939 break;
940 case FDBNetworkOptions::TLS_PASSWORD:
941 validateOptionValue(value, true);
942 initTLSOptions();
943 tlsOptions->set_key_password(value.get().toString());
944 break;
945 case FDBNetworkOptions::TLS_KEY_PATH:
946 validateOptionValue(value, true);
947 initTLSOptions();
948 tlsOptions->set_key_file( value.get().toString() );
949 break;
950 case FDBNetworkOptions::TLS_KEY_BYTES:
951 validateOptionValue(value, true);
952 initTLSOptions();
953 tlsOptions->set_key_data( value.get().toString() );
954 break;
955 case FDBNetworkOptions::TLS_VERIFY_PEERS:
956 validateOptionValue(value, true);
957 initTLSOptions();
958 try {
959 tlsOptions->set_verify_peers({ value.get().toString() });
960 } catch( Error& e ) {
961 TraceEvent(SevWarnAlways, "TLSValidationSetError")
962 .error( e )
963 .detail("Input", value.get().toString() );
964 throw invalid_option_value();
965 }
966 break;
967 case FDBNetworkOptions::DISABLE_CLIENT_STATISTICS_LOGGING:
968 validateOptionValue(value, false);
969 networkOptions.logClientInfo = false;
970 break;
971 case FDBNetworkOptions::SUPPORTED_CLIENT_VERSIONS:
972 {
973 // The multi-version API should be providing us these guarantees
974 ASSERT(g_network);
975 ASSERT(value.present());
976
977 networkOptions.supportedVersions.resize(networkOptions.supportedVersions.arena(), 0);
978 std::string versionString = value.get().toString();
979
980 size_t index = 0;
981 size_t nextIndex = 0;
982 while(nextIndex != versionString.npos) {
983 nextIndex = versionString.find(';', index);
984 networkOptions.supportedVersions.push_back_deep(networkOptions.supportedVersions.arena(), ClientVersionRef(versionString.substr(index, nextIndex-index)));
985 index = nextIndex + 1;
986 }
987
988 ASSERT(networkOptions.supportedVersions.size() > 0);
989
990 break;
991 }
992 case FDBNetworkOptions::ENABLE_SLOW_TASK_PROFILING:
993 validateOptionValue(value, false);
994 networkOptions.slowTaskProfilingEnabled = true;
995 break;
996 default:
997 break;
998 }
999 }
1000
setupNetwork(uint64_t transportId,bool useMetrics)1001 void setupNetwork(uint64_t transportId, bool useMetrics) {
1002 if( g_network )
1003 throw network_already_setup();
1004
1005 g_random = new DeterministicRandom( platform::getRandomSeed() );
1006 trace_random = new DeterministicRandom( platform::getRandomSeed() );
1007 g_nondeterministic_random = trace_random;
1008 g_debug_random = trace_random;
1009 if (!networkOptions.logClientInfo.present())
1010 networkOptions.logClientInfo = true;
1011
1012 g_network = newNet2(false, useMetrics || networkOptions.traceDirectory.present());
1013 FlowTransport::createInstance(transportId);
1014 Net2FileSystem::newFileSystem();
1015
1016 initTLSOptions();
1017
1018 #ifndef TLS_DISABLED
1019 tlsOptions->register_network();
1020 #endif
1021 }
1022
runNetwork()1023 void runNetwork() {
1024 if(!g_network)
1025 throw network_not_setup();
1026
1027 if(networkOptions.traceDirectory.present() && networkOptions.slowTaskProfilingEnabled) {
1028 setupSlowTaskProfiler();
1029 }
1030
1031 g_network->run();
1032
1033 if(networkOptions.traceDirectory.present())
1034 systemMonitor();
1035 }
1036
stopNetwork()1037 void stopNetwork() {
1038 if(!g_network)
1039 throw network_not_setup();
1040
1041 g_network->stop();
1042 closeTraceFile();
1043 }
1044
getMasterProxies(bool useProvisionalProxies)1045 Reference<ProxyInfo> DatabaseContext::getMasterProxies(bool useProvisionalProxies) {
1046 if (masterProxiesLastChange != clientInfo->get().id) {
1047 masterProxiesLastChange = clientInfo->get().id;
1048 masterProxies.clear();
1049 if( clientInfo->get().proxies.size() ) {
1050 masterProxies = Reference<ProxyInfo>( new ProxyInfo( clientInfo->get().proxies, clientLocality ));
1051 provisional = clientInfo->get().proxies[0].provisional;
1052 }
1053 }
1054 if(provisional && !useProvisionalProxies) {
1055 return Reference<ProxyInfo>();
1056 }
1057 return masterProxies;
1058 }
1059
// Actor that waits until cx->getMasterProxies(useProvisionalProxies) returns a non-null ProxyInfo, then returns it.
// It re-checks each time cx->onMasterProxiesChanged() fires.
ACTOR Future<Reference<ProxyInfo>> getMasterProxiesFuture(DatabaseContext *cx, bool useProvisionalProxies) {
	loop{
		Reference<ProxyInfo> proxies = cx->getMasterProxies(useProvisionalProxies);
		if (proxies)
			return proxies;
		wait( cx->onMasterProxiesChanged() );
	}
}
1069
1070 //Returns a future which will not be set until the ProxyInfo of this DatabaseContext is not NULL
getMasterProxiesFuture(bool useProvisionalProxies)1071 Future<Reference<ProxyInfo>> DatabaseContext::getMasterProxiesFuture(bool useProvisionalProxies) {
1072 return ::getMasterProxiesFuture(this, useProvisionalProxies);
1073 }
1074
decrement(VectorRef<KeyValueRef> const & data)1075 void GetRangeLimits::decrement( VectorRef<KeyValueRef> const& data ) {
1076 if( rows != CLIENT_KNOBS->ROW_LIMIT_UNLIMITED ) {
1077 ASSERT(data.size() <= rows);
1078 rows -= data.size();
1079 }
1080
1081 minRows = std::max(0, minRows - data.size());
1082
1083 if( bytes != CLIENT_KNOBS->BYTE_LIMIT_UNLIMITED )
1084 bytes = std::max( 0, bytes - (int)data.expectedSize() - (8-(int)sizeof(KeyValueRef))*data.size() );
1085 }
1086
decrement(KeyValueRef const & data)1087 void GetRangeLimits::decrement( KeyValueRef const& data ) {
1088 minRows = std::max(0, minRows - 1);
1089 if( rows != CLIENT_KNOBS->ROW_LIMIT_UNLIMITED )
1090 rows--;
1091 if( bytes != CLIENT_KNOBS->BYTE_LIMIT_UNLIMITED )
1092 bytes = std::max( 0, bytes - (int)8 - (int)data.expectedSize() );
1093 }
1094
1095 // True if either the row or byte limit has been reached
isReached()1096 bool GetRangeLimits::isReached() {
1097 return rows == 0 || (bytes == 0 && minRows == 0);
1098 }
1099
1100 // True if data would cause the row or byte limit to be reached
reachedBy(VectorRef<KeyValueRef> const & data)1101 bool GetRangeLimits::reachedBy( VectorRef<KeyValueRef> const& data ) {
1102 return ( rows != CLIENT_KNOBS->ROW_LIMIT_UNLIMITED && data.size() >= rows )
1103 || ( bytes != CLIENT_KNOBS->BYTE_LIMIT_UNLIMITED && (int)data.expectedSize() + (8-(int)sizeof(KeyValueRef))*data.size() >= bytes && data.size() >= minRows );
1104 }
1105
hasByteLimit()1106 bool GetRangeLimits::hasByteLimit() {
1107 return bytes != CLIENT_KNOBS->BYTE_LIMIT_UNLIMITED;
1108 }
1109
hasRowLimit()1110 bool GetRangeLimits::hasRowLimit() {
1111 return rows != CLIENT_KNOBS->ROW_LIMIT_UNLIMITED;
1112 }
1113
hasSatisfiedMinRows()1114 bool GetRangeLimits::hasSatisfiedMinRows() {
1115 return hasByteLimit() && minRows == 0;
1116 }
1117
parse(StringRef const & key)1118 AddressExclusion AddressExclusion::parse( StringRef const& key ) {
1119 //Must not change: serialized to the database!
1120 auto parsedIp = IPAddress::parse(key.toString());
1121 if (parsedIp.present()) {
1122 return AddressExclusion(parsedIp.get());
1123 }
1124
1125 // Not a whole machine, includes `port'.
1126 try {
1127 auto addr = NetworkAddress::parse(key.toString());
1128 if (addr.isTLS()) {
1129 TraceEvent(SevWarnAlways, "AddressExclusionParseError")
1130 .detail("String", key)
1131 .detail("Description", "Address inclusion string should not include `:tls' suffix.");
1132 return AddressExclusion();
1133 }
1134 return AddressExclusion(addr.ip, addr.port);
1135 } catch (Error& ) {
1136 TraceEvent(SevWarnAlways, "AddressExclusionParseError").detail("String", key);
1137 return AddressExclusion();
1138 }
1139 }
1140
// Forward declarations for functions used before their definitions. getValue is defined further below;
// getRange is presumably defined later in this file.
Future<Standalone<RangeResultRef>> getRange(
	Database const& cx,
	Future<Version> const& fVersion,
	KeySelector const& begin,
	KeySelector const& end,
	GetRangeLimits const& limits,
	bool const& reverse,
	TransactionInfo const& info);

ACTOR Future<Optional<Value>> getValue(Future<Version> version, Key key, Database cx, TransactionInfo info,
	Reference<TransactionLogInfo> trLogInfo);
1152
// Reads the serverList entry for storage server `id` at version `ver` (default latestVersion) and decodes it.
// Returns an empty Optional when the entry does not exist.
ACTOR Future<Optional<StorageServerInterface>> fetchServerInterface( Database cx, TransactionInfo info, UID id, Future<Version> ver = latestVersion ) {
	Optional<Value> val = wait( getValue(ver, serverListKeyFor(id), cx, info, Reference<TransactionLogInfo>()) );
	if( !val.present() ) {
		// A storage server has been removed from serverList since we read keyServers
		return Optional<StorageServerInterface>();
	}

	return decodeServerListValue(val.get());
}
1162
// Fetches the interfaces of every storage server in `ids` concurrently at version `ver`.
// Returns an empty Optional if any of them is missing from serverList. Otherwise returns the interfaces in the
// order getAll() yields them (presumably the order of `ids`).
ACTOR Future<Optional<vector<StorageServerInterface>>> transactionalGetServerInterfaces( Future<Version> ver, Database cx, TransactionInfo info, vector<UID> ids ) {
	state vector< Future< Optional<StorageServerInterface> > > serverListEntries;
	for( int s = 0; s < ids.size(); s++ ) {
		serverListEntries.push_back( fetchServerInterface( cx, info, ids[s], ver ) );
	}

	vector<Optional<StorageServerInterface>> serverListValues = wait( getAll(serverListEntries) );
	vector<StorageServerInterface> serverInterfaces;
	for( int s = 0; s < serverListValues.size(); s++ ) {
		if( !serverListValues[s].present() ) {
			// A storage server has been removed from ServerList since we read keyServers
			return Optional<vector<StorageServerInterface>>();
		}
		serverInterfaces.push_back( serverListValues[s].get() );
	}
	return serverInterfaces;
}
1180
// Asks the master proxies for the shard holding `key` and its storage servers, caches the answer via
// setCachedLocation(), and returns the range together with the cached LocationInfo.
// If isBackward == true, the shard returned is the one containing the key before 'key' (an infinitely long,
// inexpressible key); otherwise it is the shard containing key.
// If the proxy set changes while waiting, the request is re-issued against the new proxies.
ACTOR Future< pair<KeyRange,Reference<LocationInfo>> > getKeyLocation_internal( Database cx, Key key, TransactionInfo info, bool isBackward = false ) {
	if (isBackward) {
		ASSERT( key != allKeys.begin && key <= allKeys.end );
	} else {
		ASSERT( key < allKeys.end );
	}

	if( info.debugID.present() )
		g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKeyLocation.Before");

	loop {
		choose {
			when ( wait( cx->onMasterProxiesChanged() ) ) {}
			when ( GetKeyServerLocationsReply rep = wait( loadBalance( cx->getMasterProxies(info.useProvisionalProxies), &MasterProxyInterface::getKeyServersLocations, GetKeyServerLocationsRequest(key, Optional<KeyRef>(), 100, isBackward, key.arena()), TaskDefaultPromiseEndpoint ) ) ) {
				if( info.debugID.present() )
					g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKeyLocation.After");
				// Exactly one shard is expected for a single-key lookup.
				ASSERT( rep.results.size() == 1 );

				auto locationInfo = cx->setCachedLocation(rep.results[0].first, rep.results[0].second);
				// The returned KeyRange is built over rep.arena, presumably so it keeps the reply's memory alive.
				return std::make_pair(KeyRange(rep.results[0].first, rep.arena), locationInfo);
			}
		}
	}
}
1206
1207 template <class F>
getKeyLocation(Database const & cx,Key const & key,F StorageServerInterface::* member,TransactionInfo const & info,bool isBackward=false)1208 Future<pair<KeyRange, Reference<LocationInfo>>> getKeyLocation( Database const& cx, Key const& key, F StorageServerInterface::*member, TransactionInfo const& info, bool isBackward = false ) {
1209 auto ssi = cx->getCachedLocation( key, isBackward );
1210 if (!ssi.second) {
1211 return getKeyLocation_internal( cx, key, info, isBackward );
1212 }
1213
1214 for(int i = 0; i < ssi.second->size(); i++) {
1215 if( IFailureMonitor::failureMonitor().onlyEndpointFailed(ssi.second->get(i, member).getEndpoint()) ) {
1216 cx->invalidateCache( key );
1217 ssi.second.clear();
1218 return getKeyLocation_internal( cx, key, info, isBackward );
1219 }
1220 }
1221
1222 return ssi;
1223 }
1224
// Asks the master proxies for the shards covering `keys` (the request carries `limit` and `reverse`). Each shard's
// storage team is cached via setCachedLocation(), and the shard ranges, clipped to `keys`, are returned with their
// LocationInfo. The actor yields between cache insertions. If the proxy set changes while waiting, the request is
// re-issued.
ACTOR Future< vector< pair<KeyRange,Reference<LocationInfo>> > > getKeyRangeLocations_internal( Database cx, KeyRange keys, int limit, bool reverse, TransactionInfo info ) {
	if( info.debugID.present() )
		g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKeyLocations.Before");

	loop {
		choose {
			when ( wait( cx->onMasterProxiesChanged() ) ) {}
			when ( GetKeyServerLocationsReply _rep = wait( loadBalance( cx->getMasterProxies(info.useProvisionalProxies), &MasterProxyInterface::getKeyServersLocations, GetKeyServerLocationsRequest(keys.begin, keys.end, limit, reverse, keys.arena()), TaskDefaultPromiseEndpoint ) ) ) {
				// Copy into a state variable so the reply survives the yields below.
				state GetKeyServerLocationsReply rep = _rep;
				if( info.debugID.present() )
					g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKeyLocations.After");
				ASSERT( rep.results.size() );

				state vector< pair<KeyRange,Reference<LocationInfo>> > results;
				state int shard = 0;
				for (; shard < rep.results.size(); shard++) {
					//FIXME: these shards are being inserted into the map sequentially, it would be much more CPU efficient to save the map pairs and insert them all at once.
					results.push_back( make_pair(rep.results[shard].first & keys, cx->setCachedLocation(rep.results[shard].first, rep.results[shard].second)) );
					wait(yield());
				}

				return results;
			}
		}
	}
}
1251
1252 template <class F>
getKeyRangeLocations(Database const & cx,KeyRange const & keys,int limit,bool reverse,F StorageServerInterface::* member,TransactionInfo const & info)1253 Future< vector< pair<KeyRange,Reference<LocationInfo>> > > getKeyRangeLocations( Database const& cx, KeyRange const& keys, int limit, bool reverse, F StorageServerInterface::*member, TransactionInfo const& info ) {
1254 ASSERT (!keys.empty());
1255
1256 vector< pair<KeyRange,Reference<LocationInfo>> > locations;
1257 if (!cx->getCachedLocations(keys, locations, limit, reverse)) {
1258 return getKeyRangeLocations_internal( cx, keys, limit, reverse, info );
1259 }
1260
1261 bool foundFailed = false;
1262 for(auto& it : locations) {
1263 bool onlyEndpointFailed = false;
1264 for(int i = 0; i < it.second->size(); i++) {
1265 if( IFailureMonitor::failureMonitor().onlyEndpointFailed(it.second->get(i, member).getEndpoint()) ) {
1266 onlyEndpointFailed = true;
1267 break;
1268 }
1269 }
1270
1271 if( onlyEndpointFailed ) {
1272 cx->invalidateCache( it.first.begin );
1273 foundFailed = true;
1274 }
1275 }
1276
1277 if(foundFailed) {
1278 return getKeyRangeLocations_internal( cx, keys, limit, reverse, info );
1279 }
1280
1281 return locations;
1282 }
1283
// Pre-populates the location cache for `keys` by fetching shard locations WARM_RANGE_SHARD_LIMIT at a time.
// Stops when a batch comes back empty, the range end is reached, or roughly locationCacheSize entries have been
// requested.
// NOTE(review): totalRanges counts the requested limit per batch, not locations.size(), so it may overstate
// how many ranges were actually cached.
ACTOR Future<Void> warmRange_impl( Transaction *self, Database cx, KeyRange keys ) {
	state int totalRanges = 0;
	state int totalRequests = 0;
	loop {
		vector<pair<KeyRange, Reference<LocationInfo>>> locations = wait(getKeyRangeLocations_internal(cx, keys, CLIENT_KNOBS->WARM_RANGE_SHARD_LIMIT, false, self->info));
		totalRanges += CLIENT_KNOBS->WARM_RANGE_SHARD_LIMIT;
		totalRequests++;
		if(locations.size() == 0 || totalRanges >= cx->locationCacheSize || locations[locations.size()-1].first.end >= keys.end)
			break;

		// Continue from the end of the last shard returned.
		keys = KeyRangeRef(locations[locations.size()-1].first.end, keys.end);

		if(totalRequests%20 == 0) {
			//To avoid blocking the proxies from starting other transactions, occasionally get a read version.
			state Transaction tr(cx);
			loop {
				try {
					tr.setOption( FDBTransactionOptions::LOCK_AWARE );
					tr.setOption( FDBTransactionOptions::CAUSAL_READ_RISKY );
					wait(success( tr.getReadVersion() ));
					break;
				} catch( Error &e ) {
					wait( tr.onError(e) );
				}
			}
		}
	}

	return Void();
}
1314
warmRange(Database cx,KeyRange keys)1315 Future<Void> Transaction::warmRange(Database cx, KeyRange keys) {
1316 return warmRange_impl(this, cx, keys);
1317 }
1318
// Reads `key` at the version produced by `version`, retrying until a storage server answers.
// Each attempt resolves the key's location (cache first) and load-balances a GetValueRequest across that team.
// wrong_shard_server, all_alternatives_failed, and transaction_too_old at latestVersion invalidate the cached
// location and retry after WRONG_SHARD_SERVER_DELAY. Any other error is logged to trLogInfo (if set) and rethrown.
ACTOR Future<Optional<Value>> getValue( Future<Version> version, Key key, Database cx, TransactionInfo info, Reference<TransactionLogInfo> trLogInfo )
{
	state Version ver = wait( version );
	validateVersion(ver);

	loop {
		state pair<KeyRange, Reference<LocationInfo>> ssi = wait( getKeyLocation(cx, key, &StorageServerInterface::getValue, info) );
		state Optional<UID> getValueID = Optional<UID>();
		// NOTE(review): startTime/startTimeD are read in the catch block. They are assigned before the loadBalance()
		// wait, so presumably set whenever an error reaches the handler; an earlier throw would read them uninitialized.
		state uint64_t startTime;
		state double startTimeD;
		try {
			//GetValueReply r = wait( g_random->randomChoice( ssi->get() ).getValue.getReply( GetValueRequest(key,ver) ) );
			//return r.value;
			if( info.debugID.present() ) {
				getValueID = g_nondeterministic_random->randomUniqueID();

				g_traceBatch.addAttach("GetValueAttachID", info.debugID.get().first(), getValueID.get().first());
				g_traceBatch.addEvent("GetValueDebug", getValueID.get().first(), "NativeAPI.getValue.Before"); //.detail("TaskID", g_network->getCurrentTask());
				/*TraceEvent("TransactionDebugGetValueInfo", getValueID.get())
					.detail("Key", key)
					.detail("ReqVersion", ver)
					.detail("Servers", describe(ssi.second->get()));*/
			}

			++cx->getValueSubmitted;
			startTime = timer_int();
			startTimeD = now();
			++cx->transactionPhysicalReads;
			state GetValueReply reply = wait( loadBalance( ssi.second, &StorageServerInterface::getValue, GetValueRequest(key, ver, getValueID), TaskDefaultPromiseEndpoint, false, cx->enableLocalityLoadBalance ? &cx->queueModel : NULL ) );
			// Record read latency and, when transaction logging is on, a Get event sized by the returned value.
			double latency = now() - startTimeD;
			cx->readLatencies.addSample(latency);
			if (trLogInfo) {
				int valueSize = reply.value.present() ? reply.value.get().size() : 0;
				trLogInfo->addLog(FdbClientLogEvents::EventGet(startTimeD, latency, valueSize, key));
			}
			cx->getValueCompleted->latency = timer_int() - startTime;
			cx->getValueCompleted->log();

			if( info.debugID.present() ) {
				g_traceBatch.addEvent("GetValueDebug", getValueID.get().first(), "NativeAPI.getValue.After"); //.detail("TaskID", g_network->getCurrentTask());
				/*TraceEvent("TransactionDebugGetValueDone", getValueID.get())
					.detail("Key", key)
					.detail("ReqVersion", ver)
					.detail("ReplySize", reply.value.present() ? reply.value.get().size() : -1);*/
			}
			return reply.value;
		} catch (Error& e) {
			cx->getValueCompleted->latency = timer_int() - startTime;
			cx->getValueCompleted->log();
			if( info.debugID.present() ) {
				g_traceBatch.addEvent("GetValueDebug", getValueID.get().first(), "NativeAPI.getValue.Error"); //.detail("TaskID", g_network->getCurrentTask());
				/*TraceEvent("TransactionDebugGetValueDone", getValueID.get())
					.detail("Key", key)
					.detail("ReqVersion", ver)
					.detail("ReplySize", reply.value.present() ? reply.value.get().size() : -1);*/
			}
			// Retryable: the cached location is stale or unreachable (or the latest-version read is too old).
			if (e.code() == error_code_wrong_shard_server || e.code() == error_code_all_alternatives_failed ||
				(e.code() == error_code_transaction_too_old && ver == latestVersion) ) {
				cx->invalidateCache( key );
				wait(delay(CLIENT_KNOBS->WRONG_SHARD_SERVER_DELAY, info.taskID));
			} else {
				if (trLogInfo)
					trLogInfo->addLog(FdbClientLogEvents::EventGetError(startTimeD, static_cast<int>(e.code()), key));
				throw e;
			}
		}
	}
}
1387
// Resolves key selector `k` to a concrete key at the version produced by `version`.
// Each round asks the storage team of the shard containing the selector's key (or the key before it, for
// backward selectors) to advance the selector. The loop ends once the reply is an exact key (offset 0, orEqual).
// A selector at allKeys.end with a positive offset yields allKeys.end; one at allKeys.begin with offset <= 0
// yields the empty key. wrong_shard_server and all_alternatives_failed invalidate the cache and retry after
// WRONG_SHARD_SERVER_DELAY; other errors are traced and rethrown.
ACTOR Future<Key> getKey( Database cx, KeySelector k, Future<Version> version, TransactionInfo info ) {
	Version ver = wait(version);

	if( info.debugID.present() )
		g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKey.AfterVersion");

	loop {
		if (k.getKey() == allKeys.end) {
			if (k.offset > 0) return allKeys.end;
			k.orEqual = false;
		}
		else if (k.getKey() == allKeys.begin && k.offset <= 0) {
			return Key();
		}

		Key locationKey(k.getKey(), k.arena());
		state pair<KeyRange, Reference<LocationInfo>> ssi = wait( getKeyLocation(cx, locationKey, &StorageServerInterface::getKey, info, k.isBackward()) );

		try {
			if( info.debugID.present() )
				g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKey.Before"); //.detail("StartKey", k.getKey()).detail("Offset",k.offset).detail("OrEqual",k.orEqual);
			++cx->transactionPhysicalReads;
			// `ver` is not a state variable, so the already-ready `version` future is read again here.
			GetKeyReply reply = wait( loadBalance( ssi.second, &StorageServerInterface::getKey, GetKeyRequest(k, version.get()), TaskDefaultPromiseEndpoint, false, cx->enableLocalityLoadBalance ? &cx->queueModel : NULL ) );
			if( info.debugID.present() )
				g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKey.After"); //.detail("NextKey",reply.sel.key).detail("Offset", reply.sel.offset).detail("OrEqual", k.orEqual);
			k = reply.sel;
			if (!k.offset && k.orEqual) {
				return k.getKey();
			}
		} catch (Error& e) {
			if (e.code() == error_code_wrong_shard_server || e.code() == error_code_all_alternatives_failed) {
				cx->invalidateCache(k.getKey(), k.isBackward());

				wait(delay(CLIENT_KNOBS->WRONG_SHARD_SERVER_DELAY, info.taskID));
			} else {
				TraceEvent(SevInfo, "GetKeyError")
					.error(e)
					.detail("AtKey", k.getKey())
					.detail("Offset", k.offset);
				throw e;
			}
		}
	}
}
1432
// Returns the first consistent read version, obtained from the master proxies at
// PRIORITY_SYSTEM_IMMEDIATE, that is >= `version`.
// If the proxy set changes while a request is outstanding, the loop goes around and the request is
// reissued. When the returned version is still too old, it sleeps FUTURE_VERSION_RETRY_DELAY and
// asks again. Any error is traced as "WaitForCommittedVersionError" (SevError) and rethrown.
ACTOR Future<Version> waitForCommittedVersion( Database cx, Version version ) {
	try {
		loop {
			choose {
				// Proxies changed: go around the loop so the request is sent to the new proxy set.
				when ( wait( cx->onMasterProxiesChanged() ) ) {}
				when ( GetReadVersionReply v = wait( loadBalance( cx->getMasterProxies(false), &MasterProxyInterface::getConsistentReadVersion, GetReadVersionRequest( 0, GetReadVersionRequest::PRIORITY_SYSTEM_IMMEDIATE ), cx->taskID ) ) ) {
					if (v.version >= version)
						return v.version;
					// SOMEDAY: Do the wait on the server side, possibly use less expensive source of committed version (causal consistency is not needed for this purpose)
					wait( delay( CLIENT_KNOBS->FUTURE_VERSION_RETRY_DELAY, cx->taskID ) );
				}
			}
		}
	} catch (Error& e) {
		TraceEvent(SevError, "WaitForCommittedVersionError").error(e);
		throw;
	}
}
1451
// Forward declaration of the readVersionBatcher actor, so it can be referenced before its definition
// (presumably further down in this file). It takes a stream of (reply promise, optional debug ID)
// pairs for `cx`. NOTE(review): `flags` is presumably the read-version request flags; confirm
// against the definition.
ACTOR Future<Void> readVersionBatcher(
	DatabaseContext* cx, FutureStream<std::pair<Promise<GetReadVersionReply>, Optional<UID>>> versionStream,
	uint32_t flags);
1455
// Registers a watch on `key` with the storage server that owns it, passing the expected `value` and
// the read version. Completes once the version in the server's reply is known to be committed
// (see waitForCommittedVersion).
//
// `version` must resolve to a concrete version (not latestVersion). `readVersionFlags` is not used
// in this body.
//
// Retry behavior, by error:
//  - wrong_shard_server / all_alternatives_failed: invalidate the cached location for `key`, wait
//    WRONG_SHARD_SERVER_DELAY, then re-register.
//  - watch_cancelled / process_behind: wait WATCH_POLLING_TIME, then re-register.
//  - timed_out: wait FUTURE_VERSION_RETRY_DELAY, then re-register.
//  - anything else: wait FUTURE_VERSION_RETRY_DELAY, then rethrow.
ACTOR Future< Void > watchValue( Future<Version> version, Key key, Optional<Value> value, Database cx, int readVersionFlags, TransactionInfo info )
{
	state Version ver = wait( version );
	validateVersion(ver);
	ASSERT(ver != latestVersion);

	loop {
		state pair<KeyRange, Reference<LocationInfo>> ssi = wait( getKeyLocation(cx, key, &StorageServerInterface::watchValue, info ) );

		try {
			state Optional<UID> watchValueID = Optional<UID>();
			if( info.debugID.present() ) {
				watchValueID = g_nondeterministic_random->randomUniqueID();

				g_traceBatch.addAttach("WatchValueAttachID", info.debugID.get().first(), watchValueID.get().first());
				g_traceBatch.addEvent("WatchValueDebug", watchValueID.get().first(), "NativeAPI.watchValue.Before"); //.detail("TaskID", g_network->getCurrentTask());
			}
			state Version resp = wait( loadBalance( ssi.second, &StorageServerInterface::watchValue, WatchValueRequest(key, value, ver, watchValueID), TaskDefaultPromiseEndpoint ) );
			if( info.debugID.present() ) {
				g_traceBatch.addEvent("WatchValueDebug", watchValueID.get().first(), "NativeAPI.watchValue.After"); //.detail("TaskID", g_network->getCurrentTask());
			}

			//FIXME: wait for known committed version on the storage server before replying,
			//cannot do this until the storage server is notified on knownCommittedVersion changes from tlog (faster than the current update loop)
			Version v = wait( waitForCommittedVersion( cx, resp ) );

			//TraceEvent("WatcherCommitted").detail("CommittedVersion", v).detail("WatchVersion", resp).detail("Key", key ).detail("Value", value);

			if( v - resp < 50000000 ) // False if there is a master failure between getting the response and getting the committed version, Dependent on SERVER_KNOBS->MAX_VERSIONS_IN_FLIGHT
				return Void();
			// The committed version is far ahead of the reply: re-register the watch at the newer version.
			ver = v;
		} catch (Error& e) {
			if (e.code() == error_code_wrong_shard_server || e.code() == error_code_all_alternatives_failed) {
				cx->invalidateCache( key );
				wait(delay(CLIENT_KNOBS->WRONG_SHARD_SERVER_DELAY, info.taskID));
			} else if( e.code() == error_code_watch_cancelled || e.code() == error_code_process_behind ) {
				TEST( e.code() == error_code_watch_cancelled ); // Too many watches on the storage server, poll for changes instead
				TEST( e.code() == error_code_process_behind ); // The storage servers are all behind
				wait(delay(CLIENT_KNOBS->WATCH_POLLING_TIME, info.taskID));
			} else if ( e.code() == error_code_timed_out ) { //The storage server occasionally times out watches in case it was cancelled
				TEST( true ); // A watch timed out
				wait(delay(CLIENT_KNOBS->FUTURE_VERSION_RETRY_DELAY, info.taskID));
			} else {
				// Copy the error into a state variable so it is still available after the wait below.
				state Error err = e;
				wait(delay(CLIENT_KNOBS->FUTURE_VERSION_RETRY_DELAY, info.taskID));
				throw err;
			}
		}
	}
}
1506
transformRangeLimits(GetRangeLimits limits,bool reverse,GetKeyValuesRequest & req)1507 void transformRangeLimits(GetRangeLimits limits, bool reverse, GetKeyValuesRequest &req) {
1508 if(limits.bytes != 0) {
1509 if(!limits.hasRowLimit())
1510 req.limit = CLIENT_KNOBS->REPLY_BYTE_LIMIT; // Can't get more than this many rows anyway
1511 else
1512 req.limit = std::min( CLIENT_KNOBS->REPLY_BYTE_LIMIT, limits.rows );
1513
1514 if(reverse)
1515 req.limit *= -1;
1516
1517 if(!limits.hasByteLimit())
1518 req.limitBytes = CLIENT_KNOBS->REPLY_BYTE_LIMIT;
1519 else
1520 req.limitBytes = std::min( CLIENT_KNOBS->REPLY_BYTE_LIMIT, limits.bytes );
1521 }
1522 else {
1523 req.limitBytes = CLIENT_KNOBS->REPLY_BYTE_LIMIT;
1524 req.limit = reverse ? -limits.minRows : limits.minRows;
1525 }
1526 }
1527
// Reads all key-value pairs in the exact key range `keys` at `version`, one shard at a time,
// honoring `limits` and reading backwards when `reverse` is set. The endpoints are already concrete
// keys, so each shard request uses firstGreaterOrEqual selectors on the shard bounds.
//
// output.more is set when a row/byte limit is reached, or when the soft minimum-row limit has been
// satisfied with at least one row, before the whole range was read; otherwise it is false.
// On wrong_shard_server / all_alternatives_failed, the unread remainder of `keys` is invalidated in
// the location cache and locations are fetched again after WRONG_SHARD_SERVER_DELAY. Other errors
// are traced as "GetExactRangeError" and rethrown.
ACTOR Future<Standalone<RangeResultRef>> getExactRange( Database cx, Version version,
	KeyRange keys, GetRangeLimits limits, bool reverse, TransactionInfo info )
{
	state Standalone<RangeResultRef> output;

	//printf("getExactRange( '%s', '%s' )\n", keys.begin.toString().c_str(), keys.end.toString().c_str());
	loop {
		// At most GET_RANGE_SHARD_LIMIT shard locations are fetched per lookup.
		state vector< pair<KeyRange, Reference<LocationInfo>> > locations = wait( getKeyRangeLocations( cx, keys, CLIENT_KNOBS->GET_RANGE_SHARD_LIMIT, reverse, &StorageServerInterface::getKeyValues, info ) );
		ASSERT( locations.size() );
		state int shard = 0;
		loop {
			const KeyRangeRef& range = locations[shard].first;

			GetKeyValuesRequest req;
			req.version = version;
			req.begin = firstGreaterOrEqual( range.begin );
			req.end = firstGreaterOrEqual( range.end );

			transformRangeLimits(limits, reverse, req);
			// Parsed as (req.limit < 0) == reverse: a reverse read carries a negative row limit.
			ASSERT(req.limitBytes > 0 && req.limit != 0 && req.limit < 0 == reverse);

			//FIXME: buggify byte limits on internal functions that use them, instead of globally
			req.debugID = info.debugID;

			try {
				if( info.debugID.present() ) {
					g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getExactRange.Before");
					/*TraceEvent("TransactionDebugGetExactRangeInfo", info.debugID.get())
						.detail("ReqBeginKey", req.begin.getKey())
						.detail("ReqEndKey", req.end.getKey())
						.detail("ReqLimit", req.limit)
						.detail("ReqLimitBytes", req.limitBytes)
						.detail("ReqVersion", req.version)
						.detail("Reverse", reverse)
						.detail("Servers", locations[shard].second->description());*/
				}
				++cx->transactionPhysicalReads;
				GetKeyValuesReply rep = wait( loadBalance( locations[shard].second, &StorageServerInterface::getKeyValues, req, TaskDefaultPromiseEndpoint, false, cx->enableLocalityLoadBalance ? &cx->queueModel : NULL ) );
				if( info.debugID.present() )
					g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getExactRange.After");
				output.arena().dependsOn( rep.arena );
				output.append( output.arena(), rep.data.begin(), rep.data.size() );

				if( limits.hasRowLimit() && rep.data.size() > limits.rows ) {
					TraceEvent(SevError, "GetExactRangeTooManyRows").detail("RowLimit", limits.rows).detail("DeliveredRows", output.size());
					ASSERT( false );
				}
				limits.decrement( rep.data );

				if (limits.isReached()) {
					output.more = true;
					return output;
				}

				bool more = rep.more;
				// If the reply says there is more but we know that we finished the shard, then fix rep.more
				if( reverse && more && rep.data.size() > 0 && output[output.size()-1].key == locations[shard].first.begin )
					more = false;

				if (more) {
					if( !rep.data.size() ) {
						TraceEvent(SevError, "GetExactRangeError").detail("Reason", "More data indicated but no rows present")
							.detail("LimitBytes", limits.bytes).detail("LimitRows", limits.rows)
							.detail("OutputSize", output.size()).detail("OutputBytes", output.expectedSize())
							.detail("BlockSize", rep.data.size()).detail("BlockBytes", rep.data.expectedSize());
						ASSERT( false );
					}
					TEST(true); // GetKeyValuesReply.more in getExactRange
					// Make next request to the same shard with a beginning key just after the last key returned
					if( reverse )
						locations[shard].first = KeyRangeRef( locations[shard].first.begin, output[output.size()-1].key );
					else
						locations[shard].first = KeyRangeRef( keyAfter( output[output.size()-1].key ), locations[shard].first.end );
				}

				if (!more || locations[shard].first.empty()) {
					TEST(true);
					if(shard == locations.size()-1) {
						// Every fetched location is exhausted. Either the requested range is complete, or
						// `keys` is narrowed to the unread remainder and locations are looked up again.
						const KeyRangeRef& range = locations[shard].first;
						KeyRef begin = reverse ? keys.begin : range.end;
						KeyRef end = reverse ? range.begin : keys.end;

						if(begin >= end) {
							output.more = false;
							return output;
						}
						TEST(true); //Multiple requests of key locations

						keys = KeyRangeRef(begin, end);
						break;
					}

					++shard;
				}

				// Soft byte limit - return results early if the user specified a byte limit and we got results
				// This can prevent problems where the desired range spans many shards and would be too slow to
				// fetch entirely.
				if(limits.hasSatisfiedMinRows() && output.size() > 0) {
					output.more = true;
					return output;
				}

			} catch (Error& e) {
				if (e.code() == error_code_wrong_shard_server || e.code() == error_code_all_alternatives_failed) {
					const KeyRangeRef& range = locations[shard].first;

					// Keep only the unread part of `keys`: the failed shard and everything after it in reading order.
					if( reverse )
						keys = KeyRangeRef( keys.begin, range.end );
					else
						keys = KeyRangeRef( range.begin, keys.end );

					cx->invalidateCache( keys );
					wait( delay(CLIENT_KNOBS->WRONG_SHARD_SERVER_DELAY, info.taskID ));
					break;
				} else {
					TraceEvent(SevInfo, "GetExactRangeError")
						.error(e)
						.detail("ShardBegin", locations[shard].first.begin)
						.detail("ShardEnd", locations[shard].first.end);
					throw;
				}
			}
		}
	}
}
1654
resolveKey(Database const & cx,KeySelector const & key,Version const & version,TransactionInfo const & info)1655 Future<Key> resolveKey( Database const& cx, KeySelector const& key, Version const& version, TransactionInfo const& info ) {
1656 if( key.isFirstGreaterOrEqual() )
1657 return Future<Key>( key.getKey() );
1658
1659 if( key.isFirstGreaterThan() )
1660 return Future<Key>( keyAfter( key.getKey() ) );
1661
1662 return getKey( cx, key, version, info );
1663 }
1664
// Slow path for getRange(). It first resolves both selectors to concrete keys; when `version` is
// latestVersion, it obtains a read version beforehand using CAUSAL_READ_RISKY, LOCK_AWARE and
// PRIORITY_SYSTEM_IMMEDIATE. It then reads the exact range [b, e) with getExactRange().
// Returns an empty result when the resolved begin is not before the resolved end.
// readToBegin / readThroughEnd are set when the resolved range touches allKeys.begin / allKeys.end
// and the read was not cut short before reaching that edge.
ACTOR Future<Standalone<RangeResultRef>> getRangeFallback( Database cx, Version version,
	KeySelector begin, KeySelector end, GetRangeLimits limits, bool reverse, TransactionInfo info )
{
	if(version == latestVersion) {
		state Transaction transaction(cx);
		transaction.setOption(FDBTransactionOptions::CAUSAL_READ_RISKY);
		transaction.setOption(FDBTransactionOptions::LOCK_AWARE);
		transaction.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
		Version ver = wait( transaction.getReadVersion() );
		version = ver;
	}

	// Start resolving both selectors before waiting on either, so the lookups overlap.
	Future<Key> fb = resolveKey(cx, begin, version, info);
	state Future<Key> fe = resolveKey(cx, end, version, info);

	state Key b = wait(fb);
	state Key e = wait(fe);
	if (b >= e) {
		return Standalone<RangeResultRef>();
	}

	//if e is allKeys.end, we have read through the end of the database
	//if b is allKeys.begin, we have either read through the beginning of the database,
	//or allKeys.begin exists in the database and will be part of the conflict range anyways

	Standalone<RangeResultRef> _r = wait( getExactRange(cx, version, KeyRangeRef(b, e), limits, reverse, info) );
	Standalone<RangeResultRef> r = _r;

	if(b == allKeys.begin && ((reverse && !r.more) || !reverse))
		r.readToBegin = true;
	if(e == allKeys.end && ((!reverse && !r.more) || reverse))
		r.readThroughEnd = true;


	ASSERT( !limits.hasRowLimit() || r.size() <= limits.rows );

	// If we were limiting bytes, no minimum row count was requested, and the result exceeds the byte
	// limit by more than SYSTEM_KEY_SIZE_LIMIT + VALUE_SIZE_LIMIT + 1 (presumably one maximum-size
	// row), log a warning
	if( limits.hasByteLimit() && r.expectedSize() > size_t(limits.bytes + CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT + CLIENT_KNOBS->VALUE_SIZE_LIMIT + 1) && limits.minRows == 0 ) {
		TraceEvent(SevWarnAlways, "GetRangeFallbackTooMuchData")
			.detail("LimitBytes", limits.bytes)
			.detail("DeliveredBytes", r.expectedSize())
			.detail("LimitRows", limits.rows)
			.detail("DeliveredRows", r.size());
	}

	return r;
}
1712
// Bookkeeping shared by every successful exit of getRange():
//  - when `trLogInfo` is set, logs an EventGetRange with the elapsed time since `startTime` and the
//    total key+value bytes in `result`;
//  - unless `snapshot` is set, computes the read conflict range [rangeBegin, rangeEnd) that `result`
//    depends on and sends it through `conflictRange`.
// Each end of the conflict range defaults to the corresponding selector's key. It is replaced by
// the nearest returned key when the conditions below say the read did not span up to that key,
// and widened to the keyspace edge when readToBegin / readThroughEnd is set.
void getRangeFinished(Reference<TransactionLogInfo> trLogInfo, double startTime, KeySelector begin, KeySelector end, bool snapshot,
	Promise<std::pair<Key, Key>> conflictRange, bool reverse, Standalone<RangeResultRef> result)
{
	if( trLogInfo ) {
		int rangeSize = 0;
		for (const KeyValueRef &kv : result.contents())
			rangeSize += kv.key.size() + kv.value.size();
		trLogInfo->addLog(FdbClientLogEvents::EventGetRange(startTime, now()-startTime, rangeSize, begin.getKey(), end.getKey()));
	}

	if( !snapshot ) {
		Key rangeBegin;
		Key rangeEnd;

		if(result.readToBegin) {
			rangeBegin = allKeys.begin;
		}
		// Use the begin selector's key when begin.offset > 0 and (the read was forward, was not cut
		// short, or begin.offset > 1), or when nothing was returned.
		else if(((!reverse || !result.more || begin.offset > 1) && begin.offset > 0) || result.size() == 0) {
			rangeBegin = Key(begin.getKey(), begin.arena());
		}
		// Otherwise start at the smallest returned key (the last row of a reverse read, the first row
		// of a forward read - assuming rows are in reading order).
		else {
			rangeBegin = reverse ? result.end()[-1].key : result[0].key;
		}

		// When the end selector has the larger offset but a smaller key, start no later than its key.
		if(end.offset > begin.offset && end.getKey() < rangeBegin) {
			rangeBegin = Key(end.getKey(), end.arena());
		}

		if(result.readThroughEnd) {
			rangeEnd = allKeys.end;
		}
		// Use the end selector's key when end.offset <= 1 and (the read was reverse, was not cut
		// short, or end.offset <= 0), or when nothing was returned.
		else if(((reverse || !result.more || end.offset <= 0) && end.offset <= 1) || result.size() == 0) {
			rangeEnd = Key(end.getKey(), end.arena());
		}
		// Otherwise end just after the largest returned key.
		else {
			rangeEnd = keyAfter(reverse ? result[0].key : result.end()[-1].key);
		}

		// When the begin selector has the smaller offset but a larger key, end no earlier than its key.
		if(begin.offset < end.offset && begin.getKey() > rangeEnd) {
			rangeEnd = Key(begin.getKey(), begin.arena());
		}

		conflictRange.send(std::make_pair(rangeBegin, rangeEnd));
	}
}
1758
// Reads the key-value pairs between the selectors `begin` and `end` at the version produced by
// `fVersion`, honoring `limits` and reading backwards when `reverse` is set.
//
// Fast path: locate the shard holding the current selector and send one GetKeyValuesRequest. A
// selector that extends past that shard is clamped to the shard boundary (modifiedSelectors). The
// read continues from where each reply stopped until the range or a limit is exhausted.
// Subsequent requests use the version reported by the previous reply (readVersion).
// Slow path: if a clamped request returns no rows, or a storage server answers wrong_shard_server,
// the whole read is redone from the original selectors with getRangeFallback().
//
// Every successful exit calls getRangeFinished(), which logs the read (when `trLogInfo` is set) and,
// unless `snapshot` is set, sends the read conflict range through `conflictRange`. If an error
// escapes, an empty conflict range is sent (when the promise is still unset) before rethrowing.
ACTOR Future<Standalone<RangeResultRef>> getRange( Database cx, Reference<TransactionLogInfo> trLogInfo, Future<Version> fVersion,
	KeySelector begin, KeySelector end, GetRangeLimits limits, Promise<std::pair<Key, Key>> conflictRange, bool snapshot, bool reverse,
	TransactionInfo info )
{
	state GetRangeLimits originalLimits( limits );
	state KeySelector originalBegin = begin;
	state KeySelector originalEnd = end;
	state Standalone<RangeResultRef> output;

	try {
		state Version version = wait( fVersion );
		validateVersion(version);

		state double startTime = now();
		state Version readVersion = version; // Needed for latestVersion requests; if more, make future requests at the version that the first one completed
		// FIXME: Is this really right? Weaken this and see if there is a problem; if so maybe there is a much subtler problem even with this.

		// A begin selector at allKeys.begin with offset < 1 reads from the very start of the keyspace.
		if( begin.getKey() == allKeys.begin && begin.offset < 1 ) {
			output.readToBegin = true;
			begin = KeySelector(firstGreaterOrEqual( begin.getKey() ), begin.arena());
		}

		ASSERT( !limits.isReached() );
		ASSERT( (!limits.hasRowLimit() || limits.rows >= limits.minRows) && limits.minRows >= 0 );

		loop {
			// An end selector at or before allKeys.begin leaves nothing (more) to read.
			if( end.getKey() == allKeys.begin && (end.offset < 1 || end.isFirstGreaterOrEqual()) ) {
				getRangeFinished(trLogInfo, startTime, originalBegin, originalEnd, snapshot, conflictRange, reverse, output);
				return output;
			}

			// Locate the shard from the end selector when reading backwards, from the begin selector otherwise.
			Key locationKey = reverse ? Key(end.getKey(), end.arena()) : Key(begin.getKey(), begin.arena());
			bool locationBackward = reverse ? (end-1).isBackward() : begin.isBackward();
			state pair<KeyRange, Reference<LocationInfo>> beginServer = wait( getKeyLocation( cx, locationKey, &StorageServerInterface::getKeyValues, info, locationBackward ) );
			state KeyRange shard = beginServer.first;
			state bool modifiedSelectors = false;
			state GetKeyValuesRequest req;

			req.version = readVersion;

			// Clamp selectors that reach beyond this shard to its boundary, so the reply covers only this shard.
			if( reverse && (begin-1).isDefinitelyLess(shard.begin) &&
				( !begin.isFirstGreaterOrEqual() || begin.getKey() != shard.begin ) ) { //In this case we would be setting modifiedSelectors to true, but not modifying anything

				req.begin = firstGreaterOrEqual( shard.begin );
				modifiedSelectors = true;
			}
			else req.begin = begin;

			if( !reverse && end.isDefinitelyGreater(shard.end) ) {
				req.end = firstGreaterOrEqual( shard.end );
				modifiedSelectors = true;
			}
			else req.end = end;

			transformRangeLimits(limits, reverse, req);
			ASSERT(req.limitBytes > 0 && req.limit != 0 && req.limit < 0 == reverse);

			req.debugID = info.debugID;
			try {
				if( info.debugID.present() ) {
					g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getRange.Before");
					/*TraceEvent("TransactionDebugGetRangeInfo", info.debugID.get())
						.detail("ReqBeginKey", req.begin.getKey())
						.detail("ReqEndKey", req.end.getKey())
						.detail("OriginalBegin", originalBegin.toString())
						.detail("OriginalEnd", originalEnd.toString())
						.detail("Begin", begin.toString())
						.detail("End", end.toString())
						.detail("Shard", shard)
						.detail("ReqLimit", req.limit)
						.detail("ReqLimitBytes", req.limitBytes)
						.detail("ReqVersion", req.version)
						.detail("Reverse", reverse)
						.detail("ModifiedSelectors", modifiedSelectors)
						.detail("Servers", beginServer.second->description());*/
				}

				++cx->transactionPhysicalReads;
				GetKeyValuesReply rep = wait( loadBalance(beginServer.second, &StorageServerInterface::getKeyValues, req, TaskDefaultPromiseEndpoint, false, cx->enableLocalityLoadBalance ? &cx->queueModel : NULL ) );

				if( info.debugID.present() ) {
					g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getRange.After");//.detail("SizeOf", rep.data.size());
					/*TraceEvent("TransactionDebugGetRangeDone", info.debugID.get())
						.detail("ReqBeginKey", req.begin.getKey())
						.detail("ReqEndKey", req.end.getKey())
						.detail("RepIsMore", rep.more)
						.detail("VersionReturned", rep.version)
						.detail("RowsReturned", rep.data.size());*/
				}

				ASSERT( !rep.more || rep.data.size() );
				ASSERT( !limits.hasRowLimit() || rep.data.size() <= limits.rows );

				limits.decrement( rep.data );

				// In reverse, returning the lastLessOrEqual begin selector's own key means the begin
				// selector was reached inside this shard, so the request is treated as unclamped.
				if(reverse && begin.isLastLessOrEqual() && rep.data.size() && rep.data.end()[-1].key == begin.getKey()) {
					modifiedSelectors = false;
				}

				bool finished = limits.isReached() || ( !modifiedSelectors && !rep.more ) || limits.hasSatisfiedMinRows();
				// A clamped request that returned everything consumed the rest of this shard.
				bool readThrough = modifiedSelectors && !rep.more;

				// optimization: first request got all data--just return it
				if( finished && !output.size() ) {
					bool readToBegin = output.readToBegin;
					bool readThroughEnd = output.readThroughEnd;

					output = Standalone<RangeResultRef>( RangeResultRef( rep.data, modifiedSelectors || limits.isReached() || rep.more ), rep.arena );
					output.readToBegin = readToBegin;
					output.readThroughEnd = readThroughEnd;

					// Under BUGGIFY, randomly truncate byte-limited results (presumably to exercise callers' `more` handling).
					if( BUGGIFY && limits.hasByteLimit() && output.size() > std::max(1, originalLimits.minRows) ) {
						output.more = true;
						output.resize(output.arena(), g_random->randomInt(std::max(1,originalLimits.minRows),output.size()));
						getRangeFinished(trLogInfo, startTime, originalBegin, originalEnd, snapshot, conflictRange, reverse, output);
						return output;
					}

					if( readThrough ) {
						output.arena().dependsOn( shard.arena() );
						output.readThrough = reverse ? shard.begin : shard.end;
					}

					getRangeFinished(trLogInfo, startTime, originalBegin, originalEnd, snapshot, conflictRange, reverse, output);
					return output;
				}

				output.arena().dependsOn( rep.arena );
				output.append(output.arena(), rep.data.begin(), rep.data.size());

				if( finished ) {
					if( readThrough ) {
						output.arena().dependsOn( shard.arena() );
						output.readThrough = reverse ? shard.begin : shard.end;
					}
					output.more = modifiedSelectors || limits.isReached() || rep.more;

					getRangeFinished(trLogInfo, startTime, originalBegin, originalEnd, snapshot, conflictRange, reverse, output);
					return output;
				}

				readVersion = rep.version; // see above comment

				if( !rep.more ) {
					ASSERT( modifiedSelectors );
					TEST(true); // !GetKeyValuesReply.more and modifiedSelectors in getRange

					if( !rep.data.size() ) {
						Standalone<RangeResultRef> result = wait( getRangeFallback(cx, version, originalBegin, originalEnd, originalLimits, reverse, info ) );
						getRangeFinished(trLogInfo, startTime, originalBegin, originalEnd, snapshot, conflictRange, reverse, result);
						return result;
					}

					// The clamped shard is exhausted: continue from its boundary.
					if( reverse )
						end = firstGreaterOrEqual( shard.begin );
					else
						begin = firstGreaterOrEqual( shard.end );
				} else {
					TEST(true); // GetKeyValuesReply.more in getRange
					// Continue just past the last row returned.
					if( reverse )
						end = firstGreaterOrEqual( output[output.size()-1].key );
					else
						begin = firstGreaterThan( output[output.size()-1].key );
				}


			} catch ( Error& e ) {
				if( info.debugID.present() ) {
					g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getRange.Error");
					TraceEvent("TransactionDebugError", info.debugID.get()).error(e);
				}
				if (e.code() == error_code_wrong_shard_server || e.code() == error_code_all_alternatives_failed ||
					(e.code() == error_code_transaction_too_old && readVersion == latestVersion))
				{
					// Drop the cached location for the selector that was used to find this shard.
					cx->invalidateCache( reverse ? end.getKey() : begin.getKey(), reverse ? (end-1).isBackward() : begin.isBackward() );

					if (e.code() == error_code_wrong_shard_server) {
						Standalone<RangeResultRef> result = wait( getRangeFallback(cx, version, originalBegin, originalEnd, originalLimits, reverse, info ) );
						getRangeFinished(trLogInfo, startTime, originalBegin, originalEnd, snapshot, conflictRange, reverse, result);
						return result;
					}

					wait(delay(CLIENT_KNOBS->WRONG_SHARD_SERVER_DELAY, info.taskID));
				} else {
					if (trLogInfo)
						trLogInfo->addLog(FdbClientLogEvents::EventGetRangeError(startTime, static_cast<int>(e.code()), begin.getKey(), end.getKey()));

					throw e;
				}
			}
		}
	}
	catch(Error &e) {
		// Always fulfill the conflict-range promise so its consumers are not left waiting.
		if(conflictRange.canBeSet()) {
			conflictRange.send(std::make_pair(Key(), Key()));
		}

		throw;
	}
}
1959
getRange(Database const & cx,Future<Version> const & fVersion,KeySelector const & begin,KeySelector const & end,GetRangeLimits const & limits,bool const & reverse,TransactionInfo const & info)1960 Future<Standalone<RangeResultRef>> getRange( Database const& cx, Future<Version> const& fVersion, KeySelector const& begin, KeySelector const& end,
1961 GetRangeLimits const& limits, bool const& reverse, TransactionInfo const& info )
1962 {
1963 return getRange(cx, Reference<TransactionLogInfo>(), fVersion, begin, end, limits, Promise<std::pair<Key, Key>>(), true, reverse, info);
1964 }
1965
Transaction(Database const & cx)1966 Transaction::Transaction( Database const& cx )
1967 : cx(cx), info(cx->taskID), backoff(CLIENT_KNOBS->DEFAULT_BACKOFF), committedVersion(invalidVersion), versionstampPromise(Promise<Standalone<StringRef>>()), options(cx), numErrors(0), trLogInfo(createTrLogInfoProbabilistically(cx))
1968 {
1969 setPriority(GetReadVersionRequest::PRIORITY_DEFAULT);
1970 }
1971
~Transaction()1972 Transaction::~Transaction() {
1973 flushTrLogsIfEnabled();
1974 cancelWatches();
1975 }
1976
operator =(Transaction && r)1977 void Transaction::operator=(Transaction&& r) BOOST_NOEXCEPT {
1978 flushTrLogsIfEnabled();
1979 cx = std::move(r.cx);
1980 tr = std::move(r.tr);
1981 readVersion = std::move(r.readVersion);
1982 metadataVersion = std::move(r.metadataVersion);
1983 extraConflictRanges = std::move(r.extraConflictRanges);
1984 commitResult = std::move(r.commitResult);
1985 committing = std::move(r.committing);
1986 options = std::move(r.options);
1987 info = r.info;
1988 backoff = r.backoff;
1989 numErrors = r.numErrors;
1990 committedVersion = r.committedVersion;
1991 versionstampPromise = std::move(r.versionstampPromise);
1992 watches = r.watches;
1993 trLogInfo = std::move(r.trLogInfo);
1994 }
1995
flushTrLogsIfEnabled()1996 void Transaction::flushTrLogsIfEnabled() {
1997 if (trLogInfo && trLogInfo->logsAdded && trLogInfo->trLogWriter.getData()) {
1998 ASSERT(trLogInfo->flushed == false);
1999 cx->clientStatusUpdater.inStatusQ.push_back({ trLogInfo->identifier, std::move(trLogInfo->trLogWriter) });
2000 trLogInfo->flushed = true;
2001 }
2002 }
2003
setVersion(Version v)2004 void Transaction::setVersion( Version v ) {
2005 startTime = now();
2006 if (readVersion.isValid())
2007 throw read_version_already_set();
2008 if (v <= 0)
2009 throw version_invalid();
2010 readVersion = v;
2011 }
2012
// Reads `key` at this transaction's read version.
// A key cannot exist if it is longer than SYSTEM_KEY_SIZE_LIMIT (for keys starting with
// systemKeys.begin) or KEY_SIZE_LIMIT (otherwise); such keys are answered immediately with an empty
// Optional. Unless `snapshot` is set, the key is added to the read conflict ranges; this also
// happens for metadataVersionKey.
// For metadataVersionKey, the value may come from the metadataVersion promise or from
// cx->metadataVersionCache. On a cache miss, or for any other key, the value is read with getValue().
Future<Optional<Value>> Transaction::get( const Key& key, bool snapshot ) {
	++cx->transactionLogicalReads;
	//ASSERT (key < allKeys.end);

	//There are no keys in the database with size greater than KEY_SIZE_LIMIT
	if(key.size() > (key.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT))
		return Optional<Value>();

	auto ver = getReadVersion();

/*	if (!systemKeys.contains(key))
		return Optional<Value>(Value()); */

	if( !snapshot )
		tr.transaction.read_conflict_ranges.push_back(tr.arena, singleKeyRange(key, tr.arena));

	if(key == metadataVersionKey) {
		// Read version not known yet, or metadataVersion already set: answer from the promise.
		if(!ver.isReady() || metadataVersion.isSet()) {
			return metadataVersion.getFuture();
		} else {
			if(ver.isError()) return ver.getError();
			// Fast path: the read version matches the most recently inserted cache entry.
			if(ver.get() == cx->metadataVersionCache[cx->mvCacheInsertLocation].first) {
				return cx->metadataVersionCache[cx->mvCacheInsertLocation].second;
			}

			// Binary search over the ring buffer metadataVersionCache. lo starts at the slot after
			// mvCacheInsertLocation and hi at mvCacheInsertLocation.
			// NOTE(review): assumes entries are ordered by version from lo (oldest) around to hi
			// (newest); confirm where the cache is populated.
			Version v = ver.get();
			int hi = cx->mvCacheInsertLocation;
			int lo = (cx->mvCacheInsertLocation+1)%cx->metadataVersionCache.size();

			while(hi!=lo) {
				// Midpoint of lo..hi, accounting for wrap-around past the end of the buffer.
				int cu = hi > lo ? (hi + lo)/2 : ((hi + cx->metadataVersionCache.size() + lo)/2)%cx->metadataVersionCache.size();
				if(v == cx->metadataVersionCache[cu].first) {
					return cx->metadataVersionCache[cu].second;
				}
				if(cu == lo) {
					break;
				}
				if(v < cx->metadataVersionCache[cu].first) {
					hi = cu;
				} else {
					lo = (cu+1)%cx->metadataVersionCache.size();
				}
			}
			// Not cached: fall through and read the key from storage.
		}
	}

	return getValue( ver, key, cx, info, trLogInfo );
}
2061
setWatch(Future<Void> watchFuture)2062 void Watch::setWatch(Future<Void> watchFuture) {
2063 this->watchFuture = watchFuture;
2064
2065 //Cause the watch loop to go around and start waiting on watchFuture
2066 onSetWatchTrigger.send(Void());
2067 }
2068
//FIXME: This seems pretty horrible. Now a Database can't die until all of its watches do...
// Actor backing Transaction::watch().
// It appends `watch` to self->watches; that is the only use of `self`, and it happens before the
// first suspension point. cx->addWatch() is called on entry and cx->removeWatch() on every exit,
// normal or error.
// It completes when onChangeTrigger fires, or when onSetWatchTrigger fires and the installed
// watchFuture then completes. Errors from those futures propagate to the caller. Once
// onSetWatchTrigger has fired, onChangeTrigger is no longer waited on.
ACTOR Future<Void> watch( Reference<Watch> watch, Database cx, Transaction *self ) {
	cx->addWatch();
	try {
		self->watches.push_back(watch);

		choose {
			// RYOW write to value that is being watched (if applicable)
			// Errors
			when(wait(watch->onChangeTrigger.getFuture())) { }

			// NativeAPI finished commit and updated watchFuture
			when(wait(watch->onSetWatchTrigger.getFuture())) {

				// NativeAPI watchValue future finishes or errors
				wait(watch->watchFuture);
			}
		}
	}
	catch(Error &e) {
		cx->removeWatch();
		throw;
	}

	cx->removeWatch();
	return Void();
}
2096
watch(Reference<Watch> watch)2097 Future< Void > Transaction::watch( Reference<Watch> watch ) {
2098 return ::watch(watch, cx, this);
2099 }
2100
getAddressesForKeyActor(Key key,Future<Version> ver,Database cx,TransactionInfo info)2101 ACTOR Future< Standalone< VectorRef< const char*>>> getAddressesForKeyActor( Key key, Future<Version> ver, Database cx, TransactionInfo info ) {
2102 state vector<StorageServerInterface> ssi;
2103
2104 // If key >= allKeys.end, then getRange will return a kv-pair with an empty value. This will result in our serverInterfaces vector being empty, which will cause us to return an empty addresses list.
2105
2106 state Key ksKey = keyServersKey(key);
2107 Future<Standalone<RangeResultRef>> futureServerUids = getRange(cx, ver, lastLessOrEqual(ksKey), firstGreaterThan(ksKey), GetRangeLimits(1), false, info);
2108 Standalone<RangeResultRef> serverUids = wait( futureServerUids );
2109
2110 ASSERT( serverUids.size() ); // every shard needs to have a team
2111
2112 vector<UID> src;
2113 vector<UID> ignore; // 'ignore' is so named because it is the vector into which we decode the 'dest' servers in the case where this key is being relocated. But 'src' is the canonical location until the move is finished, because it could be cancelled at any time.
2114 decodeKeyServersValue(serverUids[0].value, src, ignore);
2115 Optional<vector<StorageServerInterface>> serverInterfaces = wait( transactionalGetServerInterfaces(ver, cx, info, src) );
2116
2117 ASSERT( serverInterfaces.present() ); // since this is happening transactionally, /FF/keyServers and /FF/serverList need to be consistent with one another
2118 ssi = serverInterfaces.get();
2119
2120 Standalone<VectorRef<const char*>> addresses;
2121 for (auto i : ssi) {
2122 std::string ipString = i.address().ip.toString();
2123 char* c_string = new (addresses.arena()) char[ipString.length()+1];
2124 strcpy(c_string, ipString.c_str());
2125 addresses.push_back(addresses.arena(), c_string);
2126 }
2127 return addresses;
2128 }
2129
getAddressesForKey(const Key & key)2130 Future< Standalone< VectorRef< const char*>>> Transaction::getAddressesForKey( const Key& key ) {
2131 ++cx->transactionLogicalReads;
2132 auto ver = getReadVersion();
2133
2134 return getAddressesForKeyActor(key, ver, cx, info);
2135 }
2136
// Non-snapshot variant of getKey(): resolves `k`, then sends through `conflictRange` the key range
// between the selector's anchor key and the resolved key `rep`.
//  - For offset <= 0 the range is [rep, anchor).
//  - Otherwise the range is [anchor, keyAfter(rep)).
// The anchor is keyAfter(k.getKey()) when orEqual is set, and k.getKey() otherwise.
// On error an empty range is sent, so the promise is always fulfilled before the error propagates.
ACTOR Future< Key > getKeyAndConflictRange(
	Database cx, KeySelector k, Future<Version> version, Promise<std::pair<Key, Key>> conflictRange, TransactionInfo info)
{
	try {
		Key rep = wait( getKey(cx, k, version, info) );
		if( k.offset <= 0 )
			conflictRange.send( std::make_pair( rep, k.orEqual ? keyAfter( k.getKey() ) : Key(k.getKey(), k.arena()) ) );
		else
			conflictRange.send( std::make_pair( k.orEqual ? keyAfter( k.getKey() ) : Key(k.getKey(), k.arena()), keyAfter( rep ) ) );
		return std::move(rep);
	} catch( Error&e ) {
		conflictRange.send(std::make_pair(Key(), Key()));
		throw;
	}
}
2152
getKey(const KeySelector & key,bool snapshot)2153 Future< Key > Transaction::getKey( const KeySelector& key, bool snapshot ) {
2154 ++cx->transactionLogicalReads;
2155 if( snapshot )
2156 return ::getKey(cx, key, getReadVersion(), info);
2157
2158 Promise<std::pair<Key, Key>> conflictRange;
2159 extraConflictRanges.push_back( conflictRange.getFuture() );
2160 return getKeyAndConflictRange( cx, key, getReadVersion(), conflictRange, info );
2161 }
2162
getRange(const KeySelector & begin,const KeySelector & end,GetRangeLimits limits,bool snapshot,bool reverse)2163 Future< Standalone<RangeResultRef> > Transaction::getRange(
2164 const KeySelector& begin,
2165 const KeySelector& end,
2166 GetRangeLimits limits,
2167 bool snapshot,
2168 bool reverse )
2169 {
2170 ++cx->transactionLogicalReads;
2171
2172 if( limits.isReached() )
2173 return Standalone<RangeResultRef>();
2174
2175 if( !limits.isValid() )
2176 return range_limits_invalid();
2177
2178 ASSERT(limits.rows != 0);
2179
2180 KeySelector b = begin;
2181 if( b.orEqual ) {
2182 TEST(true); // Native begin orEqual==true
2183 b.removeOrEqual(b.arena());
2184 }
2185
2186 KeySelector e = end;
2187 if( e.orEqual ) {
2188 TEST(true); // Native end orEqual==true
2189 e.removeOrEqual(e.arena());
2190 }
2191
2192 if( b.offset >= e.offset && b.getKey() >= e.getKey() ) {
2193 TEST(true); // Native range inverted
2194 return Standalone<RangeResultRef>();
2195 }
2196
2197 Promise<std::pair<Key, Key>> conflictRange;
2198 if(!snapshot) {
2199 extraConflictRanges.push_back( conflictRange.getFuture() );
2200 }
2201
2202 return ::getRange(cx, trLogInfo, getReadVersion(), b, e, limits, conflictRange, snapshot, reverse, info);
2203 }
2204
getRange(const KeySelector & begin,const KeySelector & end,int limit,bool snapshot,bool reverse)2205 Future< Standalone<RangeResultRef> > Transaction::getRange(
2206 const KeySelector& begin,
2207 const KeySelector& end,
2208 int limit,
2209 bool snapshot,
2210 bool reverse )
2211 {
2212 return getRange( begin, end, GetRangeLimits( limit ), snapshot, reverse );
2213 }
2214
addReadConflictRange(KeyRangeRef const & keys)2215 void Transaction::addReadConflictRange( KeyRangeRef const& keys ) {
2216 ASSERT( !keys.empty() );
2217
2218 //There aren't any keys in the database with size larger than KEY_SIZE_LIMIT, so if range contains large keys
2219 //we can translate it to an equivalent one with smaller keys
2220 KeyRef begin = keys.begin;
2221 KeyRef end = keys.end;
2222
2223 if(begin.size() > (begin.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT))
2224 begin = begin.substr(0, (begin.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT)+1);
2225 if(end.size() > (end.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT))
2226 end = end.substr(0, (end.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT)+1);
2227
2228 KeyRangeRef r = KeyRangeRef(begin, end);
2229
2230 if(r.empty()) {
2231 return;
2232 }
2233
2234 tr.transaction.read_conflict_ranges.push_back_deep( tr.arena, r );
2235 }
2236
makeSelfConflicting()2237 void Transaction::makeSelfConflicting() {
2238 BinaryWriter wr(Unversioned());
2239 wr.serializeBytes(LiteralStringRef("\xFF/SC/"));
2240 wr << g_random->randomUniqueID();
2241 auto r = singleKeyRange( wr.toValue(), tr.arena );
2242 tr.transaction.read_conflict_ranges.push_back( tr.arena, r );
2243 tr.transaction.write_conflict_ranges.push_back( tr.arena, r );
2244 }
2245
set(const KeyRef & key,const ValueRef & value,bool addConflictRange)2246 void Transaction::set( const KeyRef& key, const ValueRef& value, bool addConflictRange ) {
2247
2248 if(key.size() > (key.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT))
2249 throw key_too_large();
2250 if(value.size() > CLIENT_KNOBS->VALUE_SIZE_LIMIT)
2251 throw value_too_large();
2252
2253 auto &req = tr;
2254 auto &t = req.transaction;
2255 auto r = singleKeyRange( key, req.arena );
2256 auto v = ValueRef( req.arena, value );
2257 t.mutations.push_back( req.arena, MutationRef( MutationRef::SetValue, r.begin, v ) );
2258
2259 if( addConflictRange ) {
2260 t.write_conflict_ranges.push_back( req.arena, r );
2261 }
2262 }
2263
atomicOp(const KeyRef & key,const ValueRef & operand,MutationRef::Type operationType,bool addConflictRange)2264 void Transaction::atomicOp(const KeyRef& key, const ValueRef& operand, MutationRef::Type operationType, bool addConflictRange) {
2265 if(key.size() > (key.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT))
2266 throw key_too_large();
2267 if(operand.size() > CLIENT_KNOBS->VALUE_SIZE_LIMIT)
2268 throw value_too_large();
2269
2270 if (apiVersionAtLeast(510)) {
2271 if (operationType == MutationRef::Min)
2272 operationType = MutationRef::MinV2;
2273 else if (operationType == MutationRef::And)
2274 operationType = MutationRef::AndV2;
2275 }
2276
2277 auto &req = tr;
2278 auto &t = req.transaction;
2279 auto r = singleKeyRange( key, req.arena );
2280 auto v = ValueRef( req.arena, operand );
2281
2282 t.mutations.push_back( req.arena, MutationRef( operationType, r.begin, v ) );
2283
2284 if( addConflictRange )
2285 t.write_conflict_ranges.push_back( req.arena, r );
2286
2287 TEST(true); //NativeAPI atomic operation
2288 }
2289
clear(const KeyRangeRef & range,bool addConflictRange)2290 void Transaction::clear( const KeyRangeRef& range, bool addConflictRange ) {
2291 auto &req = tr;
2292 auto &t = req.transaction;
2293
2294 KeyRef begin = range.begin;
2295 KeyRef end = range.end;
2296
2297 //There aren't any keys in the database with size larger than KEY_SIZE_LIMIT, so if range contains large keys
2298 //we can translate it to an equivalent one with smaller keys
2299 if(begin.size() > (begin.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT))
2300 begin = begin.substr(0, (begin.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT)+1);
2301 if(end.size() > (end.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT))
2302 end = end.substr(0, (end.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT)+1);
2303
2304 auto r = KeyRangeRef( req.arena, KeyRangeRef(begin, end) );
2305 if (r.empty()) return;
2306
2307 t.mutations.push_back( req.arena, MutationRef( MutationRef::ClearRange, r.begin, r.end ) );
2308
2309 if(addConflictRange)
2310 t.write_conflict_ranges.push_back( req.arena, r );
2311 }
clear(const KeyRef & key,bool addConflictRange)2312 void Transaction::clear( const KeyRef& key, bool addConflictRange ) {
2313
2314 //There aren't any keys in the database with size larger than KEY_SIZE_LIMIT
2315 if(key.size() > (key.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT))
2316 return;
2317
2318 auto &req = tr;
2319 auto &t = req.transaction;
2320
2321 //efficient single key range clear range mutation, see singleKeyRange
2322 uint8_t* data = new ( req.arena ) uint8_t[ key.size()+1 ];
2323 memcpy(data, key.begin(), key.size() );
2324 data[key.size()] = 0;
2325 t.mutations.push_back( req.arena, MutationRef( MutationRef::ClearRange, KeyRef(data,key.size()), KeyRef(data, key.size()+1)) );
2326
2327 if(addConflictRange)
2328 t.write_conflict_ranges.push_back( req.arena, KeyRangeRef( KeyRef(data,key.size()), KeyRef(data, key.size()+1) ) );
2329 }
addWriteConflictRange(const KeyRangeRef & keys)2330 void Transaction::addWriteConflictRange( const KeyRangeRef& keys ) {
2331 ASSERT( !keys.empty() );
2332 auto &req = tr;
2333 auto &t = req.transaction;
2334
2335 //There aren't any keys in the database with size larger than KEY_SIZE_LIMIT, so if range contains large keys
2336 //we can translate it to an equivalent one with smaller keys
2337 KeyRef begin = keys.begin;
2338 KeyRef end = keys.end;
2339
2340 if (begin.size() > (begin.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT))
2341 begin = begin.substr(0, (begin.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT) + 1);
2342 if (end.size() > (end.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT))
2343 end = end.substr(0, (end.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT) + 1);
2344
2345 KeyRangeRef r = KeyRangeRef(begin, end);
2346
2347 if (r.empty()) {
2348 return;
2349 }
2350
2351 t.write_conflict_ranges.push_back_deep( req.arena, r );
2352 }
2353
getBackoff(int errCode)2354 double Transaction::getBackoff(int errCode) {
2355 double b = backoff * g_random->random01();
2356 backoff = errCode == error_code_proxy_memory_limit_exceeded ? std::min(backoff * CLIENT_KNOBS->BACKOFF_GROWTH_RATE, CLIENT_KNOBS->RESOURCE_CONSTRAINED_MAX_BACKOFF) :
2357 std::min(backoff * CLIENT_KNOBS->BACKOFF_GROWTH_RATE, options.maxBackoff);
2358 return b;
2359 }
2360
// Options for a new transaction on `cx`.
// maxBackoff is seeded from cx->transactionMaxBackoff before reset(cx). For API versions >= 610,
// reset() preserves the existing maxBackoff, so the seed must already be in place. reset(cx)
// zeroes the remaining options and copies cx->lockAware.
// When BUGGIFY fires (FDB's randomized fault-injection switch), commits are forced through the
// first proxy. BUGGIFY is evaluated exactly once per construction.
TransactionOptions::TransactionOptions(Database const& cx) {
	maxBackoff = cx->transactionMaxBackoff;
	reset(cx);
	if (BUGGIFY) {
		commitOnFirstProxy = true;
	}
}
2368
TransactionOptions()2369 TransactionOptions::TransactionOptions() {
2370 memset(this, 0, sizeof(*this));
2371 maxBackoff = CLIENT_KNOBS->DEFAULT_MAX_BACKOFF;
2372 }
2373
reset(Database const & cx)2374 void TransactionOptions::reset(Database const& cx) {
2375 double oldMaxBackoff = maxBackoff;
2376 memset(this, 0, sizeof(*this));
2377 maxBackoff = cx->apiVersionAtLeast(610) ? oldMaxBackoff : cx->transactionMaxBackoff;
2378 lockAware = cx->lockAware;
2379 }
2380
// Returns the transaction to a fresh state so it can be reused:
//  - drops the pending CommitTransactionRequest, read version, metadata-version promise,
//    extra conflict-range futures, versionstamp/commit promises and the in-progress commit;
//  - restores the task ID from the database and clears the debug ID;
//  - calls flushTrLogsIfEnabled() and then replaces trLogInfo via
//    createTrLogInfoProbabilistically(cx);
//  - fails any outstanding watches via cancelWatches().
// For API version >= 16, options are reset and the priority returns to PRIORITY_DEFAULT.
// `backoff` is not touched here (fullReset() handles it).
void Transaction::reset() {
	tr = CommitTransactionRequest();
	readVersion = Future<Version>();
	metadataVersion = Promise<Optional<Key>>();
	extraConflictRanges.clear();
	versionstampPromise = Promise<Standalone<StringRef>>();
	commitResult = Promise<Void>();
	committing = Future<Void>();
	info.taskID = cx->taskID;
	info.debugID = Optional<UID>();
	// Flush the old log info before it is replaced on the next line.
	flushTrLogsIfEnabled();
	trLogInfo = Reference<TransactionLogInfo>(createTrLogInfoProbabilistically(cx));
	cancelWatches();

	if(apiVersionAtLeast(16)) {
		options.reset(cx);
		setPriority(GetReadVersionRequest::PRIORITY_DEFAULT);
	}
}
2400
fullReset()2401 void Transaction::fullReset() {
2402 reset();
2403 backoff = CLIENT_KNOBS->DEFAULT_BACKOFF;
2404 options.maxBackoff = getDatabase()->transactionMaxBackoff;
2405 }
2406
apiVersionAtLeast(int minVersion) const2407 int Transaction::apiVersionAtLeast(int minVersion) const {
2408 return cx->apiVersionAtLeast(minVersion);
2409 }
2410
2411 class MutationBlock {
2412 public:
2413 bool mutated;
2414 bool cleared;
2415 ValueRef setValue;
2416
MutationBlock()2417 MutationBlock() : mutated(false) {}
MutationBlock(bool _cleared)2418 MutationBlock(bool _cleared) : mutated(true), cleared(_cleared) {}
MutationBlock(ValueRef value)2419 MutationBlock(ValueRef value) : mutated(true), cleared(false), setValue(value) {}
2420 };
2421
compareBegin(KeyRangeRef lhs,KeyRangeRef rhs)2422 bool compareBegin( KeyRangeRef lhs, KeyRangeRef rhs ) { return lhs.begin < rhs.begin; }
2423
2424 // If there is any intersection between the two given sets of ranges, returns a range that
2425 // falls within the intersection
intersects(VectorRef<KeyRangeRef> lhs,VectorRef<KeyRangeRef> rhs)2426 Optional<KeyRangeRef> intersects(VectorRef<KeyRangeRef> lhs, VectorRef<KeyRangeRef> rhs) {
2427 if( lhs.size() && rhs.size() ) {
2428 std::sort( lhs.begin(), lhs.end(), compareBegin );
2429 std::sort( rhs.begin(), rhs.end(), compareBegin );
2430
2431 int l = 0, r = 0;
2432 while(l < lhs.size() && r < rhs.size()) {
2433 if( lhs[l].end <= rhs[r].begin )
2434 l++;
2435 else if( rhs[r].end <= lhs[l].begin )
2436 r++;
2437 else
2438 return lhs[l] & rhs[r];
2439 }
2440 }
2441
2442 return Optional<KeyRangeRef>();
2443 }
2444
// Debug verification of a commit (enabled by CHECK_WRITES_ENABLE; commitMutations samples about
// 1% of commits for it).
// First forwards the outcome of `committed` to `outCommitted`. If the commit failed, nothing is
// checked. If it succeeded, this waits a random 0-1s, then re-reads every range touched by a
// SetValue or ClearRange mutation at the committed version. Cleared ranges must be empty and
// set keys must hold the written value. The first mismatch logs SevError "CheckWritesFailed"
// and stops. Other mutation types (e.g. atomic ops) are not checked.
// `checkTr` is dereferenced only right after a successful commit, to read the committed version.
ACTOR void checkWrites( Database cx, Future<Void> committed, Promise<Void> outCommitted, CommitTransactionRequest req, Transaction* checkTr )
{
	state Version version;
	try {
		wait( committed );
		// If the commit is successful, by definition the transaction still exists for now. Grab the version, and don't use it again.
		version = checkTr->getCommittedVersion();
		outCommitted.send(Void());
	} catch (Error& e) {
		outCommitted.sendError(e);
		return;
	}

	wait( delay( g_random->random01() ) ); // delay between 0 and 1 seconds

	// Map each touched key range to the state it should have after the commit.
	// NOTE(review): presumably later inserts overwrite earlier ones on overlap, matching mutation
	// order - confirm against KeyRangeMap::insert.
	state KeyRangeMap<MutationBlock> expectedValues;

	auto &mutations = req.transaction.mutations;
	state int mCount = mutations.size(); // debugging info for traceEvent

	for( int idx = 0; idx < mutations.size(); idx++) {
		if( mutations[idx].type == MutationRef::SetValue )
			expectedValues.insert( singleKeyRange( mutations[idx].param1 ),
				MutationBlock( mutations[idx].param2 ) );
		else if( mutations[idx].type == MutationRef::ClearRange )
			expectedValues.insert( KeyRangeRef( mutations[idx].param1, mutations[idx].param2 ),
				MutationBlock( true ) );
	}

	try {
		// Read back at exactly the committed version.
		state Transaction tr(cx);
		tr.setVersion( version );
		state int checkedRanges = 0;
		state KeyRangeMap<MutationBlock>::Ranges ranges = expectedValues.ranges();
		state KeyRangeMap<MutationBlock>::Iterator it = ranges.begin();
		for(; it != ranges.end(); ++it) {
			state MutationBlock m = it->value();
			if( m.mutated ) {
				checkedRanges++;
				if( m.cleared ) {
					// A cleared range must contain no keys; one row is enough to prove otherwise.
					Standalone<RangeResultRef> shouldBeEmpty = wait(
						tr.getRange( it->range(), 1 ) );
					if( shouldBeEmpty.size() ) {
						TraceEvent(SevError, "CheckWritesFailed").detail("Class", "Clear").detail("KeyBegin", it->range().begin)
							.detail("KeyEnd", it->range().end);
						return;
					}
				} else {
					Optional<Value> val = wait( tr.get( it->range().begin ) );
					if( !val.present() || val.get() != m.setValue ) {
						TraceEvent evt = TraceEvent(SevError, "CheckWritesFailed")
							.detail("Class", "Set")
							.detail("Key", it->range().begin)
							.detail("Expected", m.setValue);
						if( !val.present() )
							evt.detail("Actual", "_Value Missing_");
						else
							evt.detail("Actual", val.get());
						return;
					}
				}
			}
		}
		TraceEvent("CheckWritesSuccess").detail("Version", version).detail("MutationCount", mCount).detail("CheckedRanges", checkedRanges);
	} catch( Error& e ) {
		// transaction_too_old / future_version only mean the version could not be read back, so
		// they are logged as warnings. Anything else is logged as an error. Both are rethrown.
		bool ok = e.code() == error_code_transaction_too_old || e.code() == error_code_future_version;
		TraceEvent( ok ? SevWarn : SevError, "CheckWritesFailed" ).error(e);
		throw;
	}
}
2516
// Commits an otherwise-empty transaction whose read and write conflict ranges are both `range`,
// retrying through onError() until it commits.
// tryCommit uses this after an unknown commit result. Once this conflicting transaction has
// committed, the original request cannot still be in flight.
// Each attempt re-applies the caller's options and task ID, then sets ACCESS_SYSTEM_KEYS,
// CAUSAL_WRITE_RISKY and LOCK_AWARE. Re-applying per attempt presumably guards against
// onError() resetting them; confirm.
ACTOR static Future<Void> commitDummyTransaction( Database cx, KeyRange range, TransactionInfo info, TransactionOptions options ) {
	state Transaction tr(cx);
	state int retries = 0;
	loop {
		try {
			TraceEvent("CommitDummyTransaction").detail("Key", range.begin).detail("Retries", retries);
			tr.options = options;
			tr.info.taskID = info.taskID;
			tr.setOption( FDBTransactionOptions::ACCESS_SYSTEM_KEYS );
			tr.setOption( FDBTransactionOptions::CAUSAL_WRITE_RISKY );
			tr.setOption( FDBTransactionOptions::LOCK_AWARE );
			tr.addReadConflictRange(range);
			tr.addWriteConflictRange(range);
			wait( tr.commit() );
			return Void();
		} catch (Error& e) {
			TraceEvent("CommitDummyTransactionError").error(e,true).detail("Key", range.begin).detail("Retries", retries);
			wait( tr.onError(e) );
		}
		++retries;
	}
}
2539
cancelWatches(Error const & e)2540 void Transaction::cancelWatches(Error const& e) {
2541 for(int i = 0; i < watches.size(); ++i)
2542 if(!watches[i]->onChangeTrigger.isSet())
2543 watches[i]->onChangeTrigger.sendError(e);
2544
2545 watches.clear();
2546 }
2547
setupWatches()2548 void Transaction::setupWatches() {
2549 try {
2550 Future<Version> watchVersion = getCommittedVersion() > 0 ? getCommittedVersion() : getReadVersion();
2551
2552 for(int i = 0; i < watches.size(); ++i)
2553 watches[i]->setWatch(watchValue( watchVersion, watches[i]->key, watches[i]->value, cx, options.getReadVersionFlags, info ));
2554
2555 watches.clear();
2556 }
2557 catch(Error&) {
2558 ASSERT(false); // The above code must NOT throw because commit has already occured.
2559 throw internal_error();
2560 }
2561 }
2562
tryCommit(Database cx,Reference<TransactionLogInfo> trLogInfo,CommitTransactionRequest req,Future<Version> readVersion,TransactionInfo info,Version * pCommittedVersion,Transaction * tr,TransactionOptions options)2563 ACTOR static Future<Void> tryCommit( Database cx, Reference<TransactionLogInfo> trLogInfo, CommitTransactionRequest req, Future<Version> readVersion, TransactionInfo info, Version* pCommittedVersion, Transaction* tr, TransactionOptions options) {
2564 state TraceInterval interval( "TransactionCommit" );
2565 state double startTime;
2566 if (info.debugID.present())
2567 TraceEvent(interval.begin()).detail( "Parent", info.debugID.get() );
2568
2569 try {
2570 Version v = wait( readVersion );
2571 req.transaction.read_snapshot = v;
2572
2573 startTime = now();
2574 state Optional<UID> commitID = Optional<UID>();
2575 if(info.debugID.present()) {
2576 commitID = g_nondeterministic_random->randomUniqueID();
2577 g_traceBatch.addAttach("CommitAttachID", info.debugID.get().first(), commitID.get().first());
2578 g_traceBatch.addEvent("CommitDebug", commitID.get().first(), "NativeAPI.commit.Before");
2579 }
2580
2581 req.debugID = commitID;
2582 state Future<CommitID> reply;
2583 if (options.commitOnFirstProxy) {
2584 const std::vector<MasterProxyInterface>& proxies = cx->clientInfo->get().proxies;
2585 reply = proxies.size() ? throwErrorOr ( brokenPromiseToMaybeDelivered ( proxies[0].commit.tryGetReply(req) ) ) : Never();
2586 } else {
2587 reply = loadBalance( cx->getMasterProxies(info.useProvisionalProxies), &MasterProxyInterface::commit, req, TaskDefaultPromiseEndpoint, true );
2588 }
2589
2590 choose {
2591 when ( wait( cx->onMasterProxiesChanged() ) ) {
2592 reply.cancel();
2593 throw request_maybe_delivered();
2594 }
2595 when (CommitID ci = wait( reply )) {
2596 Version v = ci.version;
2597 if (v != invalidVersion) {
2598 if (info.debugID.present())
2599 TraceEvent(interval.end()).detail("CommittedVersion", v);
2600 *pCommittedVersion = v;
2601 if(v > cx->metadataVersionCache[cx->mvCacheInsertLocation].first) {
2602 cx->mvCacheInsertLocation = (cx->mvCacheInsertLocation + 1)%cx->metadataVersionCache.size();
2603 cx->metadataVersionCache[cx->mvCacheInsertLocation] = std::make_pair(v, ci.metadataVersion);
2604 }
2605
2606 Standalone<StringRef> ret = makeString(10);
2607 placeVersionstamp(mutateString(ret), v, ci.txnBatchId);
2608 tr->versionstampPromise.send(ret);
2609
2610 tr->numErrors = 0;
2611 cx->transactionsCommitCompleted++;
2612 cx->transactionCommittedMutations += req.transaction.mutations.size();
2613 cx->transactionCommittedMutationBytes += req.transaction.mutations.expectedSize();
2614
2615 if(info.debugID.present())
2616 g_traceBatch.addEvent("CommitDebug", commitID.get().first(), "NativeAPI.commit.After");
2617
2618 double latency = now() - startTime;
2619 cx->commitLatencies.addSample(latency);
2620 cx->latencies.addSample(now() - tr->startTime);
2621 if (trLogInfo)
2622 trLogInfo->addLog(FdbClientLogEvents::EventCommit(startTime, latency, req.transaction.mutations.size(), req.transaction.mutations.expectedSize(), req));
2623 return Void();
2624 } else {
2625 if (info.debugID.present())
2626 TraceEvent(interval.end()).detail("Conflict", 1);
2627
2628 if(info.debugID.present())
2629 g_traceBatch.addEvent("CommitDebug", commitID.get().first(), "NativeAPI.commit.After");
2630
2631 throw not_committed();
2632 }
2633 }
2634 }
2635 } catch (Error& e) {
2636 if (e.code() == error_code_request_maybe_delivered || e.code() == error_code_commit_unknown_result) {
2637 // We don't know if the commit happened, and it might even still be in flight.
2638
2639 if (!options.causalWriteRisky) {
2640 // Make sure it's not still in flight, either by ensuring the master we submitted to is dead, or the version we submitted with is dead, or by committing a conflicting transaction successfully
2641 //if ( cx->getMasterProxies()->masterGeneration <= originalMasterGeneration )
2642
2643 // To ensure the original request is not in flight, we need a key range which intersects its read conflict ranges
2644 // We pick a key range which also intersects its write conflict ranges, since that avoids potentially creating conflicts where there otherwise would be none
2645 // We make the range as small as possible (a single key range) to minimize conflicts
2646 // The intersection will never be empty, because if it were (since !causalWriteRisky) makeSelfConflicting would have been applied automatically to req
2647 KeyRangeRef selfConflictingRange = intersects( req.transaction.write_conflict_ranges, req.transaction.read_conflict_ranges ).get();
2648
2649 TEST(true); // Waiting for dummy transaction to report commit_unknown_result
2650
2651 wait( commitDummyTransaction( cx, singleKeyRange(selfConflictingRange.begin), info, tr->options ) );
2652 }
2653
2654 // The user needs to be informed that we aren't sure whether the commit happened. Standard retry loops retry it anyway (relying on transaction idempotence) but a client might do something else.
2655 throw commit_unknown_result();
2656 } else {
2657 if (e.code() != error_code_transaction_too_old && e.code() != error_code_not_committed && e.code() != error_code_database_locked && e.code() != error_code_proxy_memory_limit_exceeded)
2658 TraceEvent(SevError, "TryCommitError").error(e);
2659 if (trLogInfo)
2660 trLogInfo->addLog(FdbClientLogEvents::EventCommitError(startTime, static_cast<int>(e.code()), req));
2661 throw;
2662 }
2663 }
2664 }
2665
// Validates and finalizes the pending request `tr`, then starts the commit via tryCommit().
//  - No mutations and no write conflict ranges: treated as read-only. It completes immediately,
//    committedVersion becomes invalidVersion and the versionstamp future fails with
//    no_commit_version.
//  - Returns transaction_read_only when options.readOnly is set, and transaction_too_large when
//    the size exceeds customTransactionSizeLimit (or TRANSACTION_SIZE_LIMIT when that is 0).
//  - Folds resolved conflict ranges from non-snapshot reads into the read conflict ranges.
//    Unless causalWriteRisky is set, it makes the transaction self-conflicting when its read and
//    write conflict ranges do not intersect (tryCommit relies on that intersection).
//  - When CHECK_WRITES_ENABLE is set, about 1% of commits are routed through checkWrites().
// Errors thrown synchronously are returned as a failed Future instead of propagating.
Future<Void> Transaction::commitMutations() {
	try {
		//if this is a read-only transaction return immediately
		if( !tr.transaction.write_conflict_ranges.size() && !tr.transaction.mutations.size() ) {
			numErrors = 0;

			committedVersion = invalidVersion;
			versionstampPromise.sendError(no_commit_version());
			return Void();
		}

		cx->transactionsCommitStarted++;

		if(options.readOnly)
			return transaction_read_only();

		cx->mutationsPerCommit.addSample(tr.transaction.mutations.size());
		cx->bytesPerCommit.addSample(tr.transaction.mutations.expectedSize());

		size_t transactionSize = tr.transaction.mutations.expectedSize() + tr.transaction.read_conflict_ranges.expectedSize() + tr.transaction.write_conflict_ranges.expectedSize();
		if (transactionSize > (uint64_t)FLOW_KNOBS->PACKET_WARNING) {
			TraceEvent(!g_network->isSimulated() ? SevWarnAlways : SevWarn, "LargeTransaction")
				.suppressFor(1.0)
				.detail("Size", transactionSize)
				.detail("NumMutations", tr.transaction.mutations.size())
				.detail("ReadConflictSize", tr.transaction.read_conflict_ranges.expectedSize())
				.detail("WriteConflictSize", tr.transaction.write_conflict_ranges.expectedSize());
		}

		if(!apiVersionAtLeast(300)) {
			transactionSize = tr.transaction.mutations.expectedSize(); // Old API versions didn't account for conflict ranges when determining whether to throw transaction_too_large
		}

		// A customTransactionSizeLimit of 0 means "use the client knob default".
		if (transactionSize > (options.customTransactionSizeLimit == 0 ? (uint64_t)CLIENT_KNOBS->TRANSACTION_SIZE_LIMIT : (uint64_t)options.customTransactionSizeLimit))
			return transaction_too_large();

		if( !readVersion.isValid() )
			getReadVersion( GetReadVersionRequest::FLAG_CAUSAL_READ_RISKY ); // sets up readVersion field. We had no reads, so no need for (expensive) full causal consistency.

		bool isCheckingWrites = options.checkWritesEnabled && g_random->random01() < 0.01;
		// Only conflict ranges whose reads have already resolved are added; futures that are not
		// ready yet, and empty pairs (e.g. from failed reads), are skipped.
		for(int i=0; i<extraConflictRanges.size(); i++)
			if (extraConflictRanges[i].isReady() && extraConflictRanges[i].get().first < extraConflictRanges[i].get().second )
				tr.transaction.read_conflict_ranges.push_back( tr.arena, KeyRangeRef(extraConflictRanges[i].get().first, extraConflictRanges[i].get().second) );

		if( !options.causalWriteRisky && !intersects( tr.transaction.write_conflict_ranges, tr.transaction.read_conflict_ranges ).present() )
			makeSelfConflicting();

		if (isCheckingWrites) {
			// add all writes into the read conflict range...
			tr.transaction.read_conflict_ranges.append( tr.arena, tr.transaction.write_conflict_ranges.begin(), tr.transaction.write_conflict_ranges.size() );
		}

		if ( options.debugDump ) {
			UID u = g_nondeterministic_random->randomUniqueID();
			TraceEvent("TransactionDump", u);
			for(auto i=tr.transaction.mutations.begin(); i!=tr.transaction.mutations.end(); ++i)
				TraceEvent("TransactionMutation", u).detail("T", i->type).detail("P1", i->param1).detail("P2", i->param2);
		}

		if(options.lockAware) {
			tr.flags = tr.flags | CommitTransactionRequest::FLAG_IS_LOCK_AWARE;
		}
		if(options.firstInBatch) {
			tr.flags = tr.flags | CommitTransactionRequest::FLAG_FIRST_IN_BATCH;
		}

		Future<Void> commitResult = tryCommit( cx, trLogInfo, tr, readVersion, info, &this->committedVersion, this, options );

		if (isCheckingWrites) {
			// checkWrites forwards the commit outcome to `committed` before verifying.
			Promise<Void> committed;
			checkWrites( cx, commitResult, committed, tr, this );
			return committed.getFuture();
		}
		return commitResult;
	} catch( Error& e ) {
		TraceEvent("ClientCommitError").error(e);
		return Future<Void>( e );
	} catch( ... ) {
		Error e( error_code_unknown_error );
		TraceEvent("ClientCommitError").error(e);
		return Future<Void>( e );
	}
}
2749
// Drives Transaction::commit(). It waits for commitMutations(), arms pending watches on success,
// and resets the transaction afterwards.
// On failure other than actor_cancelled, it fails outstanding watches with the same error and
// fails the versionstamp future with transaction_invalid_version, then resets. Cancellation
// skips that cleanup. The error is always rethrown.
ACTOR Future<Void> commitAndWatch(Transaction *self) {
	try {
		wait(self->commitMutations());

		if(!self->watches.empty()) {
			self->setupWatches();
		}

		self->reset();
		return Void();
	}
	catch(Error &e) {
		if(e.code() != error_code_actor_cancelled) {
			if(!self->watches.empty()) {
				self->cancelWatches(e);
			}

			self->versionstampPromise.sendError(transaction_invalid_version());
			self->reset();
		}

		throw;
	}
}
2774
commit()2775 Future<Void> Transaction::commit() {
2776 ASSERT(!committing.isValid());
2777 committing = commitAndWatch(this);
2778 return committing;
2779 }
2780
setPriority(uint32_t priorityFlag)2781 void Transaction::setPriority( uint32_t priorityFlag ) {
2782 options.getReadVersionFlags = (options.getReadVersionFlags & ~GetReadVersionRequest::FLAG_PRIORITY_MASK) | priorityFlag;
2783 }
2784
// Applies one transaction option. Options that take no parameter reject a value through
// validateOptionValue(value, false). Options that require one use validateOptionValue(value, true).
// Unknown options are ignored.
void Transaction::setOption( FDBTransactionOptions::Option option, Optional<StringRef> value ) {
	switch(option) {
		case FDBTransactionOptions::INITIALIZE_NEW_DATABASE:
			// Pins the read version to 0; fails if a read version was already obtained.
			validateOptionValue(value, false);
			if(readVersion.isValid())
				throw read_version_already_set();
			readVersion = Version(0);
			options.causalWriteRisky = true;
			break;

		case FDBTransactionOptions::CAUSAL_READ_RISKY:
			validateOptionValue(value, false);
			options.getReadVersionFlags |= GetReadVersionRequest::FLAG_CAUSAL_READ_RISKY;
			break;

		case FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE:
			validateOptionValue(value, false);
			setPriority(GetReadVersionRequest::PRIORITY_SYSTEM_IMMEDIATE);
			break;

		case FDBTransactionOptions::PRIORITY_BATCH:
			validateOptionValue(value, false);
			setPriority(GetReadVersionRequest::PRIORITY_BATCH);
			break;

		case FDBTransactionOptions::CAUSAL_WRITE_RISKY:
			validateOptionValue(value, false);
			options.causalWriteRisky = true;
			break;

		case FDBTransactionOptions::COMMIT_ON_FIRST_PROXY:
			validateOptionValue(value, false);
			options.commitOnFirstProxy = true;
			break;

		case FDBTransactionOptions::CHECK_WRITES_ENABLE:
			validateOptionValue(value, false);
			options.checkWritesEnabled = true;
			break;

		case FDBTransactionOptions::DEBUG_DUMP:
			validateOptionValue(value, false);
			options.debugDump = true;
			break;

		case FDBTransactionOptions::TRANSACTION_LOGGING_ENABLE:
			// Shorthand: set the identifier (which validates `value`) and then enable logging.
			setOption(FDBTransactionOptions::DEBUG_TRANSACTION_IDENTIFIER, value);
			setOption(FDBTransactionOptions::LOG_TRANSACTION);
			break;

		case FDBTransactionOptions::DEBUG_TRANSACTION_IDENTIFIER:
			validateOptionValue(value, true);

			// Identifiers are limited to 100 bytes.
			if (value.get().size() > 100) {
				throw invalid_option_value();
			}

			// An identifier may be set once; setting a different one later is an error.
			if (trLogInfo) {
				if (trLogInfo->identifier.empty()) {
					trLogInfo->identifier = printable(value.get());
				}
				else if (trLogInfo->identifier != printable(value.get())) {
					TraceEvent(SevWarn, "CannotChangeDebugTransactionIdentifier").detail("PreviousIdentifier", trLogInfo->identifier).detail("NewIdentifier", value.get());
					throw client_invalid_operation();
				}
			}
			else {
				trLogInfo = Reference<TransactionLogInfo>(new TransactionLogInfo(printable(value.get()), TransactionLogInfo::DONT_LOG));
			}
			break;

		case FDBTransactionOptions::LOG_TRANSACTION:
			// Requires trLogInfo to exist already (e.g. via DEBUG_TRANSACTION_IDENTIFIER).
			validateOptionValue(value, false);
			if (trLogInfo) {
				trLogInfo->logTo(TransactionLogInfo::TRACE_LOG);
			}
			else {
				TraceEvent(SevWarn, "DebugTransactionIdentifierNotSet").detail("Error", "Debug Transaction Identifier option must be set before logging the transaction");
				throw client_invalid_operation();
			}
			break;

		case FDBTransactionOptions::MAX_RETRY_DELAY:
			// Value is an integer in milliseconds; stored as seconds.
			validateOptionValue(value, true);
			options.maxBackoff = extractIntOption(value, 0, std::numeric_limits<int32_t>::max()) / 1000.0;
			break;

		case FDBTransactionOptions::LOCK_AWARE:
			// Full lock awareness; also lifts any read-only restriction from READ_LOCK_AWARE.
			validateOptionValue(value, false);
			options.lockAware = true;
			options.readOnly = false;
			break;

		case FDBTransactionOptions::READ_LOCK_AWARE:
			// Only takes effect if LOCK_AWARE has not already been set.
			validateOptionValue(value, false);
			if(!options.lockAware) {
				options.lockAware = true;
				options.readOnly = true;
			}
			break;

		case FDBTransactionOptions::FIRST_IN_BATCH:
			validateOptionValue(value, false);
			options.firstInBatch = true;
			break;

		case FDBTransactionOptions::USE_PROVISIONAL_PROXIES:
			// Applies to both read-version requests (flag) and commits (info).
			validateOptionValue(value, false);
			options.getReadVersionFlags |= GetReadVersionRequest::FLAG_USE_PROVISIONAL_PROXIES;
			info.useProvisionalProxies = true;
			break;

		default:
			break;
	}
}
2901
// Requests a read version covering `transactionCount` batched transactions from the master
// proxies, load balanced. Provisional proxies are chosen according to
// FLAG_USE_PROVISIONAL_PROXIES in `flags`. If the proxy set changes while waiting, the request
// is reissued. broken_promise is rethrown without tracing; any other error is traced at
// SevError first.
ACTOR Future<GetReadVersionReply> getConsistentReadVersion( DatabaseContext *cx, uint32_t transactionCount, uint32_t flags, Optional<UID> debugID ) {
	try {
		if( debugID.present() )
			g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), "NativeAPI.getConsistentReadVersion.Before");
		loop {
			state GetReadVersionRequest req( transactionCount, flags, debugID );
			choose {
				// Proxies changed: drop the outstanding request and loop to send a new one.
				when ( wait( cx->onMasterProxiesChanged() ) ) {}
				when ( GetReadVersionReply v = wait( loadBalance( cx->getMasterProxies(flags & GetReadVersionRequest::FLAG_USE_PROVISIONAL_PROXIES), &MasterProxyInterface::getConsistentReadVersion, req, cx->taskID ) ) ) {
					if( debugID.present() )
						g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), "NativeAPI.getConsistentReadVersion.After");
					ASSERT( v.version > 0 );
					return v;
				}
			}
		}
	} catch (Error& e) {
		if( e.code() != error_code_broken_promise )
			TraceEvent(SevError, "GetConsistentReadVersionError").error(e);
		throw;
	}
}
2924
readVersionBatcher(DatabaseContext * cx,FutureStream<std::pair<Promise<GetReadVersionReply>,Optional<UID>>> versionStream,uint32_t flags)2925 ACTOR Future<Void> readVersionBatcher( DatabaseContext *cx, FutureStream< std::pair< Promise<GetReadVersionReply>, Optional<UID> > > versionStream, uint32_t flags ) {
2926 state std::vector< Promise<GetReadVersionReply> > requests;
2927 state PromiseStream< Future<Void> > addActor;
2928 state Future<Void> collection = actorCollection( addActor.getFuture() );
2929 state Future<Void> timeout;
2930 state Optional<UID> debugID;
2931 state bool send_batch;
2932
2933 // dynamic batching
2934 state PromiseStream<double> replyTimes;
2935 state PromiseStream<Error> _errorStream;
2936 state double batchTime = 0;
2937 state double lastRequestTime = now();
2938
2939 loop {
2940 send_batch = false;
2941 choose {
2942 when(std::pair< Promise<GetReadVersionReply>, Optional<UID> > req = waitNext(versionStream)) {
2943 if (req.second.present()) {
2944 if (!debugID.present())
2945 debugID = g_nondeterministic_random->randomUniqueID();
2946 g_traceBatch.addAttach("TransactionAttachID", req.second.get().first(), debugID.get().first());
2947 }
2948 requests.push_back(req.first);
2949 if (requests.size() == CLIENT_KNOBS->MAX_BATCH_SIZE)
2950 send_batch = true;
2951 else if (!timeout.isValid())
2952 timeout = delay(batchTime, TaskProxyGetConsistentReadVersion);
2953 }
2954 when(wait(timeout.isValid() ? timeout : Never())) {
2955 send_batch = true;
2956 }
2957 // dynamic batching monitors reply latencies
2958 when(double reply_latency = waitNext(replyTimes.getFuture())){
2959 double target_latency = reply_latency * 0.5;
2960 batchTime = min(0.1 * target_latency + 0.9 * batchTime, CLIENT_KNOBS->GRV_BATCH_TIMEOUT);
2961 }
2962 when(wait(collection)){} // for errors
2963 }
2964 if (send_batch) {
2965 int count = requests.size();
2966 ASSERT(count);
2967
2968 // dynamic batching
2969 Promise<GetReadVersionReply> GRVReply;
2970 requests.push_back(GRVReply);
2971 addActor.send(timeReply(GRVReply.getFuture(), replyTimes));
2972
2973 Future<Void> batch =
2974 incrementalBroadcast(
2975 getConsistentReadVersion(cx, count, flags, std::move(debugID)),
2976 std::vector< Promise<GetReadVersionReply> >(std::move(requests)), CLIENT_KNOBS->BROADCAST_BATCH_SIZE);
2977 debugID = Optional<UID>();
2978 requests = std::vector< Promise<GetReadVersionReply> >();
2979 addActor.send(batch);
2980 timeout = Future<Void>();
2981 }
2982 }
2983 }
2984
extractReadVersion(DatabaseContext * cx,Reference<TransactionLogInfo> trLogInfo,Future<GetReadVersionReply> f,bool lockAware,double startTime,Promise<Optional<Value>> metadataVersion)2985 ACTOR Future<Version> extractReadVersion(DatabaseContext* cx, Reference<TransactionLogInfo> trLogInfo, Future<GetReadVersionReply> f, bool lockAware, double startTime, Promise<Optional<Value>> metadataVersion) {
2986 GetReadVersionReply rep = wait(f);
2987 double latency = now() - startTime;
2988 cx->GRVLatencies.addSample(latency);
2989 if (trLogInfo)
2990 trLogInfo->addLog(FdbClientLogEvents::EventGetVersion(startTime, latency));
2991 if(rep.locked && !lockAware)
2992 throw database_locked();
2993
2994 if(rep.version > cx->metadataVersionCache[cx->mvCacheInsertLocation].first) {
2995 cx->mvCacheInsertLocation = (cx->mvCacheInsertLocation + 1)%cx->metadataVersionCache.size();
2996 cx->metadataVersionCache[cx->mvCacheInsertLocation] = std::make_pair(rep.version, rep.metadataVersion);
2997 }
2998
2999 metadataVersion.send(rep.metadataVersion);
3000 return rep.version;
3001 }
3002
getReadVersion(uint32_t flags)3003 Future<Version> Transaction::getReadVersion(uint32_t flags) {
3004 cx->transactionReadVersions++;
3005 flags |= options.getReadVersionFlags;
3006
3007 auto& batcher = cx->versionBatcher[ flags ];
3008 if (!batcher.actor.isValid()) {
3009 batcher.actor = readVersionBatcher( cx.getPtr(), batcher.stream.getFuture(), flags );
3010 }
3011 if (!readVersion.isValid()) {
3012 Promise<GetReadVersionReply> p;
3013 batcher.stream.send( std::make_pair( p, info.debugID ) );
3014 startTime = now();
3015 readVersion = extractReadVersion( cx.getPtr(), trLogInfo, p.getFuture(), options.lockAware, startTime, metadataVersion);
3016 }
3017 return readVersion;
3018 }
3019
getVersionstamp()3020 Future<Standalone<StringRef>> Transaction::getVersionstamp() {
3021 if(committing.isValid()) {
3022 return transaction_invalid_version();
3023 }
3024 return versionstampPromise.getFuture();
3025 }
3026
onError(Error const & e)3027 Future<Void> Transaction::onError( Error const& e ) {
3028 if (e.code() == error_code_success)
3029 {
3030 return client_invalid_operation();
3031 }
3032 if (e.code() == error_code_not_committed ||
3033 e.code() == error_code_commit_unknown_result ||
3034 e.code() == error_code_database_locked ||
3035 e.code() == error_code_proxy_memory_limit_exceeded ||
3036 e.code() == error_code_process_behind)
3037 {
3038 if(e.code() == error_code_not_committed)
3039 cx->transactionsNotCommitted++;
3040 if(e.code() == error_code_commit_unknown_result)
3041 cx->transactionsMaybeCommitted++;
3042 if (e.code() == error_code_proxy_memory_limit_exceeded)
3043 cx->transactionsResourceConstrained++;
3044 if (e.code() == error_code_process_behind)
3045 cx->transactionsProcessBehind++;
3046
3047 double backoff = getBackoff(e.code());
3048 reset();
3049 return delay( backoff, info.taskID );
3050 }
3051 if (e.code() == error_code_transaction_too_old ||
3052 e.code() == error_code_future_version)
3053 {
3054 if( e.code() == error_code_transaction_too_old )
3055 cx->transactionsTooOld++;
3056 else if( e.code() == error_code_future_version )
3057 cx->transactionsFutureVersions++;
3058
3059 double maxBackoff = options.maxBackoff;
3060 reset();
3061 return delay( std::min(CLIENT_KNOBS->FUTURE_VERSION_RETRY_DELAY, maxBackoff), info.taskID );
3062 }
3063
3064 if(g_network->isSimulated() && ++numErrors % 10 == 0)
3065 TraceEvent(SevWarnAlways, "TransactionTooManyRetries").detail("NumRetries", numErrors);
3066
3067 return e;
3068 }
3069
trackBoundedStorageMetrics(KeyRange keys,Reference<LocationInfo> location,StorageMetrics x,StorageMetrics halfError,PromiseStream<StorageMetrics> deltaStream)3070 ACTOR Future<Void> trackBoundedStorageMetrics(
3071 KeyRange keys,
3072 Reference<LocationInfo> location,
3073 StorageMetrics x,
3074 StorageMetrics halfError,
3075 PromiseStream<StorageMetrics> deltaStream)
3076 {
3077 try {
3078 loop {
3079 WaitMetricsRequest req( keys, x - halfError, x + halfError );
3080 StorageMetrics nextX = wait( loadBalance( location, &StorageServerInterface::waitMetrics, req ) );
3081 deltaStream.send( nextX - x );
3082 x = nextX;
3083 }
3084 } catch (Error& e) {
3085 deltaStream.sendError(e);
3086 throw e;
3087 }
3088 }
3089
waitStorageMetricsMultipleLocations(vector<pair<KeyRange,Reference<LocationInfo>>> locations,StorageMetrics min,StorageMetrics max,StorageMetrics permittedError)3090 ACTOR Future< StorageMetrics > waitStorageMetricsMultipleLocations(
3091 vector< pair<KeyRange,Reference<LocationInfo>> > locations,
3092 StorageMetrics min,
3093 StorageMetrics max,
3094 StorageMetrics permittedError)
3095 {
3096 state int nLocs = locations.size();
3097 state vector<Future<StorageMetrics>> fx( nLocs );
3098 state StorageMetrics total;
3099 state PromiseStream<StorageMetrics> deltas;
3100 state vector<Future<Void>> wx( fx.size() );
3101 state StorageMetrics halfErrorPerMachine = permittedError * (0.5 / nLocs);
3102 state StorageMetrics maxPlus = max + halfErrorPerMachine * (nLocs-1);
3103 state StorageMetrics minMinus = min - halfErrorPerMachine * (nLocs-1);
3104
3105 for(int i=0; i<nLocs; i++) {
3106 WaitMetricsRequest req(locations[i].first, StorageMetrics(), StorageMetrics());
3107 req.min.bytes = 0;
3108 req.max.bytes = -1;
3109 fx[i] = loadBalance( locations[i].second, &StorageServerInterface::waitMetrics, req, TaskDataDistribution );
3110 }
3111 wait( waitForAll(fx) );
3112
3113 // invariant: true total is between (total-permittedError/2, total+permittedError/2)
3114 for(int i=0; i<nLocs; i++)
3115 total += fx[i].get();
3116
3117 if (!total.allLessOrEqual( maxPlus )) return total;
3118 if (!minMinus.allLessOrEqual( total )) return total;
3119
3120 for(int i=0; i<nLocs; i++)
3121 wx[i] = trackBoundedStorageMetrics( locations[i].first, locations[i].second, fx[i].get(), halfErrorPerMachine, deltas );
3122
3123 loop {
3124 StorageMetrics delta = waitNext(deltas.getFuture());
3125 total += delta;
3126 if (!total.allLessOrEqual( maxPlus )) return total;
3127 if (!minMinus.allLessOrEqual( total )) return total;
3128 }
3129 }
3130
waitStorageMetrics(Database cx,KeyRange keys,StorageMetrics min,StorageMetrics max,StorageMetrics permittedError,int shardLimit)3131 ACTOR Future< StorageMetrics > waitStorageMetrics(
3132 Database cx,
3133 KeyRange keys,
3134 StorageMetrics min,
3135 StorageMetrics max,
3136 StorageMetrics permittedError,
3137 int shardLimit )
3138 {
3139 loop {
3140 vector< pair<KeyRange, Reference<LocationInfo>> > locations = wait( getKeyRangeLocations( cx, keys, shardLimit, false, &StorageServerInterface::waitMetrics, TransactionInfo(TaskDataDistribution) ) );
3141
3142 //SOMEDAY: Right now, if there are too many shards we delay and check again later. There may be a better solution to this.
3143 if(locations.size() < shardLimit) {
3144 try {
3145 Future<StorageMetrics> fx;
3146 if (locations.size() > 1) {
3147 fx = waitStorageMetricsMultipleLocations( locations, min, max, permittedError );
3148 } else {
3149 WaitMetricsRequest req( keys, min, max );
3150 fx = loadBalance( locations[0].second, &StorageServerInterface::waitMetrics, req, TaskDataDistribution );
3151 }
3152 StorageMetrics x = wait(fx);
3153 return x;
3154 } catch (Error& e) {
3155 if (e.code() != error_code_wrong_shard_server && e.code() != error_code_all_alternatives_failed) {
3156 TraceEvent(SevError, "WaitStorageMetricsError").error(e);
3157 throw;
3158 }
3159 cx->invalidateCache(keys);
3160 wait(delay(CLIENT_KNOBS->WRONG_SHARD_SERVER_DELAY, TaskDataDistribution));
3161 }
3162 } else {
3163 TraceEvent(SevWarn, "WaitStorageMetricsPenalty")
3164 .detail("Keys", keys)
3165 .detail("Limit", CLIENT_KNOBS->STORAGE_METRICS_SHARD_LIMIT)
3166 .detail("JitteredSecondsOfPenitence", CLIENT_KNOBS->STORAGE_METRICS_TOO_MANY_SHARDS_DELAY);
3167 wait(delayJittered(CLIENT_KNOBS->STORAGE_METRICS_TOO_MANY_SHARDS_DELAY, TaskDataDistribution));
3168 // make sure that the next getKeyRangeLocations() call will actually re-fetch the range
3169 cx->invalidateCache( keys );
3170 }
3171 }
3172 }
3173
waitStorageMetrics(KeyRange const & keys,StorageMetrics const & min,StorageMetrics const & max,StorageMetrics const & permittedError,int shardLimit)3174 Future< StorageMetrics > Transaction::waitStorageMetrics(
3175 KeyRange const& keys,
3176 StorageMetrics const& min,
3177 StorageMetrics const& max,
3178 StorageMetrics const& permittedError,
3179 int shardLimit )
3180 {
3181 return ::waitStorageMetrics( cx, keys, min, max, permittedError, shardLimit );
3182 }
3183
getStorageMetrics(KeyRange const & keys,int shardLimit)3184 Future< StorageMetrics > Transaction::getStorageMetrics( KeyRange const& keys, int shardLimit ) {
3185 StorageMetrics m;
3186 m.bytes = -1;
3187 return ::waitStorageMetrics( cx, keys, StorageMetrics(), m, StorageMetrics(), shardLimit );
3188 }
3189
splitStorageMetrics(Database cx,KeyRange keys,StorageMetrics limit,StorageMetrics estimated)3190 ACTOR Future< Standalone<VectorRef<KeyRef>> > splitStorageMetrics( Database cx, KeyRange keys, StorageMetrics limit, StorageMetrics estimated )
3191 {
3192 loop {
3193 state vector< pair<KeyRange, Reference<LocationInfo>> > locations = wait( getKeyRangeLocations( cx, keys, CLIENT_KNOBS->STORAGE_METRICS_SHARD_LIMIT, false, &StorageServerInterface::splitMetrics, TransactionInfo(TaskDataDistribution) ) );
3194 state StorageMetrics used;
3195 state Standalone<VectorRef<KeyRef>> results;
3196
3197 //SOMEDAY: Right now, if there are too many shards we delay and check again later. There may be a better solution to this.
3198 if(locations.size() == CLIENT_KNOBS->STORAGE_METRICS_SHARD_LIMIT) {
3199 wait(delay(CLIENT_KNOBS->STORAGE_METRICS_TOO_MANY_SHARDS_DELAY, TaskDataDistribution));
3200 cx->invalidateCache(keys);
3201 }
3202 else {
3203 results.push_back_deep( results.arena(), keys.begin );
3204 try {
3205 //TraceEvent("SplitStorageMetrics").detail("Locations", locations.size());
3206
3207 state int i = 0;
3208 for(; i<locations.size(); i++) {
3209 SplitMetricsRequest req( locations[i].first, limit, used, estimated, i == locations.size() - 1 );
3210 SplitMetricsReply res = wait( loadBalance( locations[i].second, &StorageServerInterface::splitMetrics, req, TaskDataDistribution ) );
3211 if( res.splits.size() && res.splits[0] <= results.back() ) { // split points are out of order, possibly because of moving data, throw error to retry
3212 ASSERT_WE_THINK(false); // FIXME: This seems impossible and doesn't seem to be covered by testing
3213 throw all_alternatives_failed();
3214 }
3215 if( res.splits.size() ) {
3216 results.append( results.arena(), res.splits.begin(), res.splits.size() );
3217 results.arena().dependsOn( res.splits.arena() );
3218 }
3219 used = res.used;
3220
3221 //TraceEvent("SplitStorageMetricsResult").detail("Used", used.bytes).detail("Location", i).detail("Size", res.splits.size());
3222 }
3223
3224 if( used.allLessOrEqual( limit * CLIENT_KNOBS->STORAGE_METRICS_UNFAIR_SPLIT_LIMIT ) ) {
3225 results.resize(results.arena(), results.size() - 1);
3226 }
3227
3228 results.push_back_deep( results.arena(), keys.end );
3229 return results;
3230 } catch (Error& e) {
3231 if (e.code() != error_code_wrong_shard_server && e.code() != error_code_all_alternatives_failed) {
3232 TraceEvent(SevError, "SplitStorageMetricsError").error(e);
3233 throw;
3234 }
3235 cx->invalidateCache( keys );
3236 wait(delay(CLIENT_KNOBS->WRONG_SHARD_SERVER_DELAY, TaskDataDistribution));
3237 }
3238 }
3239 }
3240 }
3241
splitStorageMetrics(KeyRange const & keys,StorageMetrics const & limit,StorageMetrics const & estimated)3242 Future< Standalone<VectorRef<KeyRef>> > Transaction::splitStorageMetrics( KeyRange const& keys, StorageMetrics const& limit, StorageMetrics const& estimated ) {
3243 return ::splitStorageMetrics( cx, keys, limit, estimated );
3244 }
3245
checkDeferredError()3246 void Transaction::checkDeferredError() { cx->checkDeferredError(); }
3247
createTrLogInfoProbabilistically(const Database & cx)3248 Reference<TransactionLogInfo> Transaction::createTrLogInfoProbabilistically(const Database &cx) {
3249 if(!cx->isError()) {
3250 double clientSamplingProbability = std::isinf(cx->clientInfo->get().clientTxnInfoSampleRate) ? CLIENT_KNOBS->CSI_SAMPLING_PROBABILITY : cx->clientInfo->get().clientTxnInfoSampleRate;
3251 if (((networkOptions.logClientInfo.present() && networkOptions.logClientInfo.get()) || BUGGIFY) && g_random->random01() < clientSamplingProbability && (!g_network->isSimulated() || !g_simulator.speedUpSimulation)) {
3252 return Reference<TransactionLogInfo>(new TransactionLogInfo(TransactionLogInfo::DATABASE));
3253 }
3254 }
3255
3256 return Reference<TransactionLogInfo>();
3257 }
3258
enableClientInfoLogging()3259 void enableClientInfoLogging() {
3260 ASSERT(networkOptions.logClientInfo.present() == false);
3261 networkOptions.logClientInfo = true;
3262 TraceEvent(SevInfo, "ClientInfoLoggingEnabled");
3263 }
3264