1 /*
2 * MultiVersionTransaction.actor.cpp
3 *
4 * This source file is part of the FoundationDB open source project
5 *
6 * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
7 *
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
11 *
12 * http://www.apache.org/licenses/LICENSE-2.0
13 *
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 */
20
21 #include "fdbclient/MultiVersionTransaction.h"
22 #include "fdbclient/MultiVersionAssignmentVars.h"
23 #include "fdbclient/ThreadSafeTransaction.h"
24
25 #include "flow/Platform.h"
26 #include "flow/UnitTest.h"
27
28 #include "flow/actorcompiler.h" // This must be the last #include.
29
throwIfError(FdbCApi::fdb_error_t e)30 void throwIfError(FdbCApi::fdb_error_t e) {
31 if(e) {
32 throw Error(e);
33 }
34 }
35
36 // DLTransaction
cancel()37 void DLTransaction::cancel() {
38 api->transactionCancel(tr);
39 }
40
setVersion(Version v)41 void DLTransaction::setVersion(Version v) {
42 api->transactionSetReadVersion(tr, v);
43 }
44
getReadVersion()45 ThreadFuture<Version> DLTransaction::getReadVersion() {
46 FdbCApi::FDBFuture *f = api->transactionGetReadVersion(tr);
47
48 return toThreadFuture<Version>(api, f, [](FdbCApi::FDBFuture *f, FdbCApi *api) {
49 int64_t version;
50 FdbCApi::fdb_error_t error = api->futureGetVersion(f, &version);
51 ASSERT(!error);
52 return version;
53 });
54 }
55
get(const KeyRef & key,bool snapshot)56 ThreadFuture<Optional<Value>> DLTransaction::get(const KeyRef& key, bool snapshot) {
57 FdbCApi::FDBFuture *f = api->transactionGet(tr, key.begin(), key.size(), snapshot);
58
59 return toThreadFuture<Optional<Value>>(api, f, [](FdbCApi::FDBFuture *f, FdbCApi *api) {
60 FdbCApi::fdb_bool_t present;
61 const uint8_t *value;
62 int valueLength;
63 FdbCApi::fdb_error_t error = api->futureGetValue(f, &present, &value, &valueLength);
64 ASSERT(!error);
65 if(present) {
66 // The memory for this is stored in the FDBFuture and is released when the future gets destroyed
67 return Optional<Value>(Value(ValueRef(value, valueLength), Arena()));
68 }
69 else {
70 return Optional<Value>();
71 }
72 });
73 }
74
getKey(const KeySelectorRef & key,bool snapshot)75 ThreadFuture<Key> DLTransaction::getKey(const KeySelectorRef& key, bool snapshot) {
76 FdbCApi::FDBFuture *f = api->transactionGetKey(tr, key.getKey().begin(), key.getKey().size(), key.orEqual, key.offset, snapshot);
77
78 return toThreadFuture<Key>(api, f, [](FdbCApi::FDBFuture *f, FdbCApi *api) {
79 const uint8_t *key;
80 int keyLength;
81 FdbCApi::fdb_error_t error = api->futureGetKey(f, &key, &keyLength);
82 ASSERT(!error);
83
84 // The memory for this is stored in the FDBFuture and is released when the future gets destroyed
85 return Key(KeyRef(key, keyLength), Arena());
86 });
87 }
88
getRange(const KeySelectorRef & begin,const KeySelectorRef & end,int limit,bool snapshot,bool reverse)89 ThreadFuture<Standalone<RangeResultRef>> DLTransaction::getRange(const KeySelectorRef& begin, const KeySelectorRef& end, int limit, bool snapshot, bool reverse) {
90 return getRange(begin, end, GetRangeLimits(limit), snapshot, reverse);
91 }
92
getRange(const KeySelectorRef & begin,const KeySelectorRef & end,GetRangeLimits limits,bool snapshot,bool reverse)93 ThreadFuture<Standalone<RangeResultRef>> DLTransaction::getRange(const KeySelectorRef& begin, const KeySelectorRef& end, GetRangeLimits limits, bool snapshot, bool reverse) {
94 FdbCApi::FDBFuture *f = api->transactionGetRange(tr, begin.getKey().begin(), begin.getKey().size(), begin.orEqual, begin.offset, end.getKey().begin(), end.getKey().size(), end.orEqual, end.offset,
95 limits.rows, limits.bytes, FDBStreamingModes::EXACT, 0, snapshot, reverse);
96 return toThreadFuture<Standalone<RangeResultRef>>(api, f, [](FdbCApi::FDBFuture *f, FdbCApi *api) {
97 const FdbCApi::FDBKeyValue *kvs;
98 int count;
99 FdbCApi::fdb_bool_t more;
100 FdbCApi::fdb_error_t error = api->futureGetKeyValueArray(f, &kvs, &count, &more);
101 ASSERT(!error);
102
103 // The memory for this is stored in the FDBFuture and is released when the future gets destroyed
104 return Standalone<RangeResultRef>(RangeResultRef(VectorRef<KeyValueRef>((KeyValueRef*)kvs, count), more), Arena());
105 });
106 }
107
getRange(const KeyRangeRef & keys,int limit,bool snapshot,bool reverse)108 ThreadFuture<Standalone<RangeResultRef>> DLTransaction::getRange(const KeyRangeRef& keys, int limit, bool snapshot, bool reverse) {
109 return getRange(firstGreaterOrEqual(keys.begin), firstGreaterOrEqual(keys.end), GetRangeLimits(limit), snapshot, reverse);
110 }
111
getRange(const KeyRangeRef & keys,GetRangeLimits limits,bool snapshot,bool reverse)112 ThreadFuture<Standalone<RangeResultRef>> DLTransaction::getRange(const KeyRangeRef& keys, GetRangeLimits limits, bool snapshot, bool reverse) {
113 return getRange(firstGreaterOrEqual(keys.begin), firstGreaterOrEqual(keys.end), limits, snapshot, reverse);
114 }
115
getAddressesForKey(const KeyRef & key)116 ThreadFuture<Standalone<VectorRef<const char*>>> DLTransaction::getAddressesForKey(const KeyRef& key) {
117 FdbCApi::FDBFuture *f = api->transactionGetAddressesForKey(tr, key.begin(), key.size());
118
119 return toThreadFuture<Standalone<VectorRef<const char*>>>(api, f, [](FdbCApi::FDBFuture *f, FdbCApi *api) {
120 const char **addresses;
121 int count;
122 FdbCApi::fdb_error_t error = api->futureGetStringArray(f, &addresses, &count);
123 ASSERT(!error);
124
125 // The memory for this is stored in the FDBFuture and is released when the future gets destroyed
126 return Standalone<VectorRef<const char*>>(VectorRef<const char*>(addresses, count), Arena());
127 });
128 }
129
getVersionstamp()130 ThreadFuture<Standalone<StringRef>> DLTransaction::getVersionstamp() {
131 if(!api->transactionGetVersionstamp) {
132 return unsupported_operation();
133 }
134
135 FdbCApi::FDBFuture *f = api->transactionGetVersionstamp(tr);
136
137 return toThreadFuture<Standalone<StringRef>>(api, f, [](FdbCApi::FDBFuture *f, FdbCApi *api) {
138 const uint8_t *str;
139 int strLength;
140 FdbCApi::fdb_error_t error = api->futureGetKey(f, &str, &strLength);
141 ASSERT(!error);
142
143 // The memory for this is stored in the FDBFuture and is released when the future gets destroyed
144 return Standalone<StringRef>(StringRef(str, strLength), Arena());
145 });
146 }
147
addReadConflictRange(const KeyRangeRef & keys)148 void DLTransaction::addReadConflictRange(const KeyRangeRef& keys) {
149 throwIfError(api->transactionAddConflictRange(tr, keys.begin.begin(), keys.begin.size(), keys.end.begin(), keys.end.size(), FDBConflictRangeTypes::READ));
150 }
151
atomicOp(const KeyRef & key,const ValueRef & value,uint32_t operationType)152 void DLTransaction::atomicOp(const KeyRef& key, const ValueRef& value, uint32_t operationType) {
153 api->transactionAtomicOp(tr, key.begin(), key.size(), value.begin(), value.size(), (FDBMutationTypes::Option)operationType);
154 }
155
set(const KeyRef & key,const ValueRef & value)156 void DLTransaction::set(const KeyRef& key, const ValueRef& value) {
157 api->transactionSet(tr, key.begin(), key.size(), value.begin(), value.size());
158 }
159
clear(const KeyRef & begin,const KeyRef & end)160 void DLTransaction::clear(const KeyRef& begin, const KeyRef& end) {
161 api->transactionClearRange(tr, begin.begin(), begin.size(), end.begin(), end.size());
162 }
163
clear(const KeyRangeRef & range)164 void DLTransaction::clear(const KeyRangeRef& range) {
165 api->transactionClearRange(tr, range.begin.begin(), range.begin.size(), range.end.begin(), range.end.size());
166 }
167
clear(const KeyRef & key)168 void DLTransaction::clear(const KeyRef& key) {
169 api->transactionClear(tr, key.begin(), key.size());
170 }
171
watch(const KeyRef & key)172 ThreadFuture<Void> DLTransaction::watch(const KeyRef& key) {
173 FdbCApi::FDBFuture *f = api->transactionWatch(tr, key.begin(), key.size());
174
175 return toThreadFuture<Void>(api, f, [](FdbCApi::FDBFuture *f, FdbCApi *api) {
176 return Void();
177 });
178 }
179
addWriteConflictRange(const KeyRangeRef & keys)180 void DLTransaction::addWriteConflictRange(const KeyRangeRef& keys) {
181 throwIfError(api->transactionAddConflictRange(tr, keys.begin.begin(), keys.begin.size(), keys.end.begin(), keys.end.size(), FDBConflictRangeTypes::WRITE));
182 }
183
commit()184 ThreadFuture<Void> DLTransaction::commit() {
185 FdbCApi::FDBFuture *f = api->transactionCommit(tr);
186
187 return toThreadFuture<Void>(api, f, [](FdbCApi::FDBFuture *f, FdbCApi *api) {
188 return Void();
189 });
190 }
191
getCommittedVersion()192 Version DLTransaction::getCommittedVersion() {
193 int64_t version;
194 throwIfError(api->transactionGetCommittedVersion(tr, &version));
195 return version;
196 }
197
setOption(FDBTransactionOptions::Option option,Optional<StringRef> value)198 void DLTransaction::setOption(FDBTransactionOptions::Option option, Optional<StringRef> value) {
199 throwIfError(api->transactionSetOption(tr, option, value.present() ? value.get().begin() : NULL, value.present() ? value.get().size() : 0));
200 }
201
onError(Error const & e)202 ThreadFuture<Void> DLTransaction::onError(Error const& e) {
203 FdbCApi::FDBFuture *f = api->transactionOnError(tr, e.code());
204
205 return toThreadFuture<Void>(api, f, [](FdbCApi::FDBFuture *f, FdbCApi *api) {
206 return Void();
207 });
208 }
209
reset()210 void DLTransaction::reset() {
211 api->transactionReset(tr);
212 }
213
214 // DLDatabase
DLDatabase(Reference<FdbCApi> api,ThreadFuture<FdbCApi::FDBDatabase * > dbFuture)215 DLDatabase::DLDatabase(Reference<FdbCApi> api, ThreadFuture<FdbCApi::FDBDatabase*> dbFuture) : api(api), db(nullptr) {
216 ready = mapThreadFuture<FdbCApi::FDBDatabase*, Void>(dbFuture, [this](ErrorOr<FdbCApi::FDBDatabase*> db){
217 if(db.isError()) {
218 return ErrorOr<Void>(db.getError());
219 }
220
221 this->db = db.get();
222 return ErrorOr<Void>(Void());
223 });
224 }
225
onReady()226 ThreadFuture<Void> DLDatabase::onReady() {
227 return ready;
228 }
229
createTransaction()230 Reference<ITransaction> DLDatabase::createTransaction() {
231 FdbCApi::FDBTransaction *tr;
232 api->databaseCreateTransaction(db, &tr);
233 return Reference<ITransaction>(new DLTransaction(api, tr));
234 }
235
setOption(FDBDatabaseOptions::Option option,Optional<StringRef> value)236 void DLDatabase::setOption(FDBDatabaseOptions::Option option, Optional<StringRef> value) {
237 throwIfError(api->databaseSetOption(db, option, value.present() ? value.get().begin() : NULL, value.present() ? value.get().size() : 0));
238 }
239
240 // DLApi
241 template<class T>
loadClientFunction(T * fp,void * lib,std::string libPath,const char * functionName,bool requireFunction=true)242 void loadClientFunction(T *fp, void *lib, std::string libPath, const char *functionName, bool requireFunction = true) {
243 *(void**)(fp) = loadFunction(lib, functionName);
244 if(*fp == NULL && requireFunction) {
245 TraceEvent(SevError, "ErrorLoadingFunction").detail("LibraryPath", libPath).detail("Function", functionName);
246 throw platform_error();
247 }
248 }
249
DLApi(std::string fdbCPath)250 DLApi::DLApi(std::string fdbCPath) : api(new FdbCApi()), fdbCPath(fdbCPath), networkSetup(false) {}
251
init()252 void DLApi::init() {
253 if(isLibraryLoaded(fdbCPath.c_str())) {
254 throw external_client_already_loaded();
255 }
256
257 void* lib = loadLibrary(fdbCPath.c_str());
258 if(lib == NULL) {
259 TraceEvent(SevError, "ErrorLoadingExternalClientLibrary").detail("LibraryPath", fdbCPath);
260 throw platform_error();
261 }
262
263 loadClientFunction(&api->selectApiVersion, lib, fdbCPath, "fdb_select_api_version_impl");
264 loadClientFunction(&api->getClientVersion, lib, fdbCPath, "fdb_get_client_version", headerVersion >= 410);
265 loadClientFunction(&api->setNetworkOption, lib, fdbCPath, "fdb_network_set_option");
266 loadClientFunction(&api->setupNetwork, lib, fdbCPath, "fdb_setup_network");
267 loadClientFunction(&api->runNetwork, lib, fdbCPath, "fdb_run_network");
268 loadClientFunction(&api->stopNetwork, lib, fdbCPath, "fdb_stop_network");
269 loadClientFunction(&api->createDatabase, lib, fdbCPath, "fdb_create_database", headerVersion >= 610);
270
271 loadClientFunction(&api->databaseCreateTransaction, lib, fdbCPath, "fdb_database_create_transaction");
272 loadClientFunction(&api->databaseSetOption, lib, fdbCPath, "fdb_database_set_option");
273 loadClientFunction(&api->databaseDestroy, lib, fdbCPath, "fdb_database_destroy");
274
275 loadClientFunction(&api->transactionSetOption, lib, fdbCPath, "fdb_transaction_set_option");
276 loadClientFunction(&api->transactionDestroy, lib, fdbCPath, "fdb_transaction_destroy");
277 loadClientFunction(&api->transactionSetReadVersion, lib, fdbCPath, "fdb_transaction_set_read_version");
278 loadClientFunction(&api->transactionGetReadVersion, lib, fdbCPath, "fdb_transaction_get_read_version");
279 loadClientFunction(&api->transactionGet, lib, fdbCPath, "fdb_transaction_get");
280 loadClientFunction(&api->transactionGetKey, lib, fdbCPath, "fdb_transaction_get_key");
281 loadClientFunction(&api->transactionGetAddressesForKey, lib, fdbCPath, "fdb_transaction_get_addresses_for_key");
282 loadClientFunction(&api->transactionGetRange, lib, fdbCPath, "fdb_transaction_get_range");
283 loadClientFunction(&api->transactionGetVersionstamp, lib, fdbCPath, "fdb_transaction_get_versionstamp", headerVersion >= 410);
284 loadClientFunction(&api->transactionSet, lib, fdbCPath, "fdb_transaction_set");
285 loadClientFunction(&api->transactionClear, lib, fdbCPath, "fdb_transaction_clear");
286 loadClientFunction(&api->transactionClearRange, lib, fdbCPath, "fdb_transaction_clear_range");
287 loadClientFunction(&api->transactionAtomicOp, lib, fdbCPath, "fdb_transaction_atomic_op");
288 loadClientFunction(&api->transactionCommit, lib, fdbCPath, "fdb_transaction_commit");
289 loadClientFunction(&api->transactionGetCommittedVersion, lib, fdbCPath, "fdb_transaction_get_committed_version");
290 loadClientFunction(&api->transactionWatch, lib, fdbCPath, "fdb_transaction_watch");
291 loadClientFunction(&api->transactionOnError, lib, fdbCPath, "fdb_transaction_on_error");
292 loadClientFunction(&api->transactionReset, lib, fdbCPath, "fdb_transaction_reset");
293 loadClientFunction(&api->transactionCancel, lib, fdbCPath, "fdb_transaction_cancel");
294 loadClientFunction(&api->transactionAddConflictRange, lib, fdbCPath, "fdb_transaction_add_conflict_range");
295
296 loadClientFunction(&api->futureGetDatabase, lib, fdbCPath, "fdb_future_get_database");
297 loadClientFunction(&api->futureGetVersion, lib, fdbCPath, "fdb_future_get_version");
298 loadClientFunction(&api->futureGetError, lib, fdbCPath, "fdb_future_get_error");
299 loadClientFunction(&api->futureGetKey, lib, fdbCPath, "fdb_future_get_key");
300 loadClientFunction(&api->futureGetValue, lib, fdbCPath, "fdb_future_get_value");
301 loadClientFunction(&api->futureGetStringArray, lib, fdbCPath, "fdb_future_get_string_array");
302 loadClientFunction(&api->futureGetKeyValueArray, lib, fdbCPath, "fdb_future_get_keyvalue_array");
303 loadClientFunction(&api->futureSetCallback, lib, fdbCPath, "fdb_future_set_callback");
304 loadClientFunction(&api->futureCancel, lib, fdbCPath, "fdb_future_cancel");
305 loadClientFunction(&api->futureDestroy, lib, fdbCPath, "fdb_future_destroy");
306
307 loadClientFunction(&api->createCluster, lib, fdbCPath, "fdb_create_cluster", headerVersion < 610);
308 loadClientFunction(&api->clusterCreateDatabase, lib, fdbCPath, "fdb_cluster_create_database", headerVersion < 610);
309 loadClientFunction(&api->clusterDestroy, lib, fdbCPath, "fdb_cluster_destroy", headerVersion < 610);
310 loadClientFunction(&api->futureGetCluster, lib, fdbCPath, "fdb_future_get_cluster", headerVersion < 610);
311 }
312
selectApiVersion(int apiVersion)313 void DLApi::selectApiVersion(int apiVersion) {
314 // External clients must support at least this version
315 // Versions newer than what we understand are rejected in the C bindings
316 headerVersion = std::max(apiVersion, 400);
317
318 init();
319 throwIfError(api->selectApiVersion(apiVersion, headerVersion));
320 throwIfError(api->setNetworkOption(FDBNetworkOptions::EXTERNAL_CLIENT, NULL, 0));
321 }
322
getClientVersion()323 const char* DLApi::getClientVersion() {
324 if(!api->getClientVersion) {
325 return "unknown";
326 }
327
328 return api->getClientVersion();
329 }
330
setNetworkOption(FDBNetworkOptions::Option option,Optional<StringRef> value)331 void DLApi::setNetworkOption(FDBNetworkOptions::Option option, Optional<StringRef> value) {
332 throwIfError(api->setNetworkOption(option, value.present() ? value.get().begin() : NULL, value.present() ? value.get().size() : 0));
333 }
334
setupNetwork()335 void DLApi::setupNetwork() {
336 networkSetup = true;
337 throwIfError(api->setupNetwork());
338 }
339
runNetwork()340 void DLApi::runNetwork() {
341 auto e = api->runNetwork();
342
343 for(auto &hook : threadCompletionHooks) {
344 try {
345 hook.first(hook.second);
346 }
347 catch(Error &e) {
348 TraceEvent(SevError, "NetworkShutdownHookError").error(e);
349 }
350 catch(...) {
351 TraceEvent(SevError, "NetworkShutdownHookError").error(unknown_error());
352 }
353 }
354
355 throwIfError(e);
356 }
357
stopNetwork()358 void DLApi::stopNetwork() {
359 if(networkSetup) {
360 throwIfError(api->stopNetwork());
361 }
362 }
363
createDatabase609(const char * clusterFilePath)364 Reference<IDatabase> DLApi::createDatabase609(const char *clusterFilePath) {
365 FdbCApi::FDBFuture *f = api->createCluster(clusterFilePath);
366
367 auto clusterFuture = toThreadFuture<FdbCApi::FDBCluster*>(api, f, [](FdbCApi::FDBFuture *f, FdbCApi *api) {
368 FdbCApi::FDBCluster *cluster;
369 api->futureGetCluster(f, &cluster);
370 return cluster;
371 });
372
373 Reference<FdbCApi> innerApi = api;
374 auto dbFuture = flatMapThreadFuture<FdbCApi::FDBCluster*, FdbCApi::FDBDatabase*>(clusterFuture, [innerApi](ErrorOr<FdbCApi::FDBCluster*> cluster) {
375 if(cluster.isError()) {
376 return ErrorOr<ThreadFuture<FdbCApi::FDBDatabase*>>(cluster.getError());
377 }
378
379 auto innerDbFuture = toThreadFuture<FdbCApi::FDBDatabase*>(innerApi, innerApi->clusterCreateDatabase(cluster.get(), (uint8_t*)"DB", 2), [](FdbCApi::FDBFuture *f, FdbCApi *api) {
380 FdbCApi::FDBDatabase *db;
381 api->futureGetDatabase(f, &db);
382 return db;
383 });
384
385 return ErrorOr<ThreadFuture<FdbCApi::FDBDatabase*>>(mapThreadFuture<FdbCApi::FDBDatabase*, FdbCApi::FDBDatabase*>(innerDbFuture, [cluster, innerApi](ErrorOr<FdbCApi::FDBDatabase*> db) {
386 innerApi->clusterDestroy(cluster.get());
387 return db;
388 }));
389 });
390
391 return Reference<DLDatabase>(new DLDatabase(api, dbFuture));
392 }
393
createDatabase(const char * clusterFilePath)394 Reference<IDatabase> DLApi::createDatabase(const char *clusterFilePath) {
395 if(headerVersion >= 610) {
396 FdbCApi::FDBDatabase *db;
397 api->createDatabase(clusterFilePath, &db);
398 return Reference<IDatabase>(new DLDatabase(api, db));
399 }
400 else {
401 return DLApi::createDatabase609(clusterFilePath);
402 }
403 }
404
addNetworkThreadCompletionHook(void (* hook)(void *),void * hookParameter)405 void DLApi::addNetworkThreadCompletionHook(void (*hook)(void*), void *hookParameter) {
406 MutexHolder holder(lock);
407 threadCompletionHooks.push_back(std::make_pair(hook, hookParameter));
408 }
409
410 // MultiVersionTransaction
MultiVersionTransaction(Reference<MultiVersionDatabase> db)411 MultiVersionTransaction::MultiVersionTransaction(Reference<MultiVersionDatabase> db) : db(db) {
412 updateTransaction();
413 }
414
415 // SOMEDAY: This function is unsafe if it's possible to set Database options that affect subsequently created transactions. There are currently no such options.
updateTransaction()416 void MultiVersionTransaction::updateTransaction() {
417 auto currentDb = db->dbState->dbVar->get();
418
419 TransactionInfo newTr;
420 if(currentDb.value) {
421 newTr.transaction = currentDb.value->createTransaction();
422 }
423
424 newTr.onChange = currentDb.onChange;
425
426 lock.enter();
427 transaction = newTr;
428 lock.leave();
429 }
430
getTransaction()431 MultiVersionTransaction::TransactionInfo MultiVersionTransaction::getTransaction() {
432 lock.enter();
433 MultiVersionTransaction::TransactionInfo currentTr(transaction);
434 lock.leave();
435
436 return currentTr;
437 }
438
cancel()439 void MultiVersionTransaction::cancel() {
440 auto tr = getTransaction();
441 if(tr.transaction) {
442 tr.transaction->cancel();
443 }
444 }
445
setVersion(Version v)446 void MultiVersionTransaction::setVersion(Version v) {
447 auto tr = getTransaction();
448 if(tr.transaction) {
449 tr.transaction->setVersion(v);
450 }
451 }
getReadVersion()452 ThreadFuture<Version> MultiVersionTransaction::getReadVersion() {
453 auto tr = getTransaction();
454 auto f = tr.transaction ? tr.transaction->getReadVersion() : ThreadFuture<Version>(Never());
455 return abortableFuture(f, tr.onChange);
456 }
457
get(const KeyRef & key,bool snapshot)458 ThreadFuture<Optional<Value>> MultiVersionTransaction::get(const KeyRef& key, bool snapshot) {
459 auto tr = getTransaction();
460 auto f = tr.transaction ? tr.transaction->get(key, snapshot) : ThreadFuture<Optional<Value>>(Never());
461 return abortableFuture(f, tr.onChange);
462 }
463
getKey(const KeySelectorRef & key,bool snapshot)464 ThreadFuture<Key> MultiVersionTransaction::getKey(const KeySelectorRef& key, bool snapshot) {
465 auto tr = getTransaction();
466 auto f = tr.transaction ? tr.transaction->getKey(key, snapshot) : ThreadFuture<Key>(Never());
467 return abortableFuture(f, tr.onChange);
468 }
469
getRange(const KeySelectorRef & begin,const KeySelectorRef & end,int limit,bool snapshot,bool reverse)470 ThreadFuture<Standalone<RangeResultRef>> MultiVersionTransaction::getRange(const KeySelectorRef& begin, const KeySelectorRef& end, int limit, bool snapshot, bool reverse) {
471 auto tr = getTransaction();
472 auto f = tr.transaction ? tr.transaction->getRange(begin, end, limit, snapshot, reverse) : ThreadFuture<Standalone<RangeResultRef>>(Never());
473 return abortableFuture(f, tr.onChange);
474 }
475
getRange(const KeySelectorRef & begin,const KeySelectorRef & end,GetRangeLimits limits,bool snapshot,bool reverse)476 ThreadFuture<Standalone<RangeResultRef>> MultiVersionTransaction::getRange(const KeySelectorRef& begin, const KeySelectorRef& end, GetRangeLimits limits, bool snapshot, bool reverse) {
477 auto tr = getTransaction();
478 auto f = tr.transaction ? tr.transaction->getRange(begin, end, limits, snapshot, reverse) : ThreadFuture<Standalone<RangeResultRef>>(Never());
479 return abortableFuture(f, tr.onChange);
480 }
481
getRange(const KeyRangeRef & keys,int limit,bool snapshot,bool reverse)482 ThreadFuture<Standalone<RangeResultRef>> MultiVersionTransaction::getRange(const KeyRangeRef& keys, int limit, bool snapshot, bool reverse) {
483 auto tr = getTransaction();
484 auto f = tr.transaction ? tr.transaction->getRange(keys, limit, snapshot, reverse) : ThreadFuture<Standalone<RangeResultRef>>(Never());
485 return abortableFuture(f, tr.onChange);
486 }
487
getRange(const KeyRangeRef & keys,GetRangeLimits limits,bool snapshot,bool reverse)488 ThreadFuture<Standalone<RangeResultRef>> MultiVersionTransaction::getRange(const KeyRangeRef& keys, GetRangeLimits limits, bool snapshot, bool reverse) {
489 auto tr = getTransaction();
490 auto f = tr.transaction ? tr.transaction->getRange(keys, limits, snapshot, reverse) : ThreadFuture<Standalone<RangeResultRef>>(Never());
491 return abortableFuture(f, tr.onChange);
492 }
493
getVersionstamp()494 ThreadFuture<Standalone<StringRef>> MultiVersionTransaction::getVersionstamp() {
495 auto tr = getTransaction();
496 auto f = tr.transaction ? tr.transaction->getVersionstamp() : ThreadFuture<Standalone<StringRef>>(Never());
497 return abortableFuture(f, tr.onChange);
498 }
499
getAddressesForKey(const KeyRef & key)500 ThreadFuture<Standalone<VectorRef<const char*>>> MultiVersionTransaction::getAddressesForKey(const KeyRef& key) {
501 auto tr = getTransaction();
502 auto f = tr.transaction ? tr.transaction->getAddressesForKey(key) : ThreadFuture<Standalone<VectorRef<const char*>>>(Never());
503 return abortableFuture(f, tr.onChange);
504 }
505
addReadConflictRange(const KeyRangeRef & keys)506 void MultiVersionTransaction::addReadConflictRange(const KeyRangeRef& keys) {
507 auto tr = getTransaction();
508 if(tr.transaction) {
509 tr.transaction->addReadConflictRange(keys);
510 }
511 }
512
atomicOp(const KeyRef & key,const ValueRef & value,uint32_t operationType)513 void MultiVersionTransaction::atomicOp(const KeyRef& key, const ValueRef& value, uint32_t operationType) {
514 auto tr = getTransaction();
515 if(tr.transaction) {
516 tr.transaction->atomicOp(key, value, operationType);
517 }
518 }
519
set(const KeyRef & key,const ValueRef & value)520 void MultiVersionTransaction::set(const KeyRef& key, const ValueRef& value) {
521 auto tr = getTransaction();
522 if(tr.transaction) {
523 tr.transaction->set(key, value);
524 }
525 }
526
clear(const KeyRef & begin,const KeyRef & end)527 void MultiVersionTransaction::clear(const KeyRef& begin, const KeyRef& end) {
528 auto tr = getTransaction();
529 if(tr.transaction) {
530 tr.transaction->clear(begin, end);
531 }
532 }
533
clear(const KeyRangeRef & range)534 void MultiVersionTransaction::clear(const KeyRangeRef& range) {
535 auto tr = getTransaction();
536 if(tr.transaction) {
537 tr.transaction->clear(range);
538 }
539 }
540
clear(const KeyRef & key)541 void MultiVersionTransaction::clear(const KeyRef& key) {
542 auto tr = getTransaction();
543 if(tr.transaction) {
544 tr.transaction->clear(key);
545 }
546 }
547
watch(const KeyRef & key)548 ThreadFuture<Void> MultiVersionTransaction::watch(const KeyRef& key) {
549 auto tr = getTransaction();
550 auto f = tr.transaction ? tr.transaction->watch(key) : ThreadFuture<Void>(Never());
551 return abortableFuture(f, tr.onChange);
552 }
553
addWriteConflictRange(const KeyRangeRef & keys)554 void MultiVersionTransaction::addWriteConflictRange(const KeyRangeRef& keys) {
555 auto tr = getTransaction();
556 if(tr.transaction) {
557 tr.transaction->addWriteConflictRange(keys);
558 }
559 }
560
commit()561 ThreadFuture<Void> MultiVersionTransaction::commit() {
562 auto tr = getTransaction();
563 auto f = tr.transaction ? tr.transaction->commit() : ThreadFuture<Void>(Never());
564 return abortableFuture(f, tr.onChange);
565 }
566
getCommittedVersion()567 Version MultiVersionTransaction::getCommittedVersion() {
568 auto tr = getTransaction();
569 if(tr.transaction) {
570 return tr.transaction->getCommittedVersion();
571 }
572
573 return invalidVersion;
574 }
575
setOption(FDBTransactionOptions::Option option,Optional<StringRef> value)576 void MultiVersionTransaction::setOption(FDBTransactionOptions::Option option, Optional<StringRef> value) {
577 auto tr = getTransaction();
578 if(tr.transaction) {
579 tr.transaction->setOption(option, value);
580 }
581 }
582
onError(Error const & e)583 ThreadFuture<Void> MultiVersionTransaction::onError(Error const& e) {
584 if(e.code() == error_code_cluster_version_changed) {
585 updateTransaction();
586 return ThreadFuture<Void>(Void());
587 }
588 else {
589 auto tr = getTransaction();
590 auto f = tr.transaction ? tr.transaction->onError(e) : ThreadFuture<Void>(Never());
591 return abortableFuture(f, tr.onChange);
592 }
593 }
594
reset()595 void MultiVersionTransaction::reset() {
596 updateTransaction();
597 }
598
599 // MultiVersionDatabase
MultiVersionDatabase(MultiVersionApi * api,std::string clusterFilePath,Reference<IDatabase> db,bool openConnectors)600 MultiVersionDatabase::MultiVersionDatabase(MultiVersionApi *api, std::string clusterFilePath, Reference<IDatabase> db, bool openConnectors) : dbState(new DatabaseState()) {
601 dbState->db = db;
602 dbState->dbVar->set(db);
603
604 if(!openConnectors) {
605 dbState->currentClientIndex = 0;
606 }
607 else {
608 if(!api->localClientDisabled) {
609 dbState->currentClientIndex = 0;
610 dbState->addConnection(api->getLocalClient(), clusterFilePath);
611 }
612 else {
613 dbState->currentClientIndex = -1;
614 }
615
616 api->runOnExternalClients([this, clusterFilePath](Reference<ClientInfo> client) {
617 dbState->addConnection(client, clusterFilePath);
618 });
619
620 dbState->startConnections();
621 }
622 }
623
~MultiVersionDatabase()624 MultiVersionDatabase::~MultiVersionDatabase() {
625 dbState->cancelConnections();
626 }
627
debugCreateFromExistingDatabase(Reference<IDatabase> db)628 Reference<IDatabase> MultiVersionDatabase::debugCreateFromExistingDatabase(Reference<IDatabase> db) {
629 return Reference<IDatabase>(new MultiVersionDatabase(MultiVersionApi::api, "", db, false));
630 }
631
createTransaction()632 Reference<ITransaction> MultiVersionDatabase::createTransaction() {
633 return Reference<ITransaction>(new MultiVersionTransaction(Reference<MultiVersionDatabase>::addRef(this)));
634 }
635
setOption(FDBDatabaseOptions::Option option,Optional<StringRef> value)636 void MultiVersionDatabase::setOption(FDBDatabaseOptions::Option option, Optional<StringRef> value) {
637 MutexHolder holder(dbState->optionLock);
638
639
640 auto itr = FDBDatabaseOptions::optionInfo.find(option);
641 if(itr != FDBDatabaseOptions::optionInfo.end()) {
642 TraceEvent("SetDatabaseOption").detail("Option", itr->second.name);
643 }
644 else {
645 TraceEvent("UnknownDatabaseOption").detail("Option", option);
646 throw invalid_option();
647 }
648
649 if(dbState->db) {
650 dbState->db->setOption(option, value);
651 }
652
653 dbState->options.push_back(std::make_pair(option, value.castTo<Standalone<StringRef>>()));
654 }
655
connect()656 void MultiVersionDatabase::Connector::connect() {
657 addref();
658 onMainThreadVoid([this]() {
659 if(!cancelled) {
660 connected = false;
661 if(connectionFuture.isValid()) {
662 connectionFuture.cancel();
663 }
664
665 candidateDatabase = client->api->createDatabase(clusterFilePath.c_str());
666 if(client->external) {
667 connectionFuture = candidateDatabase.castTo<DLDatabase>()->onReady();
668 }
669 else {
670 connectionFuture = ThreadFuture<Void>(Void());
671 }
672
673 connectionFuture = flatMapThreadFuture<Void, Void>(connectionFuture, [this](ErrorOr<Void> ready) {
674 if(ready.isError()) {
675 return ErrorOr<ThreadFuture<Void>>(ready.getError());
676 }
677
678 tr = candidateDatabase->createTransaction();
679 return ErrorOr<ThreadFuture<Void>>(mapThreadFuture<Version, Void>(tr->getReadVersion(), [this](ErrorOr<Version> v) {
680 // If the version attempt returns an error, we regard that as a connection (except operation_cancelled)
681 if(v.isError() && v.getError().code() == error_code_operation_cancelled) {
682 return ErrorOr<Void>(v.getError());
683 }
684 else {
685 return ErrorOr<Void>(Void());
686 }
687 }));
688 });
689
690
691 int userParam;
692 connectionFuture.callOrSetAsCallback(this, userParam, 0);
693 }
694 else {
695 delref();
696 }
697 }, NULL);
698 }
699
700 // Only called from main thread
cancel()701 void MultiVersionDatabase::Connector::cancel() {
702 connected = false;
703 cancelled = true;
704 if(connectionFuture.isValid()) {
705 connectionFuture.cancel();
706 }
707 }
708
fire(const Void & unused,int & userParam)709 void MultiVersionDatabase::Connector::fire(const Void &unused, int& userParam) {
710 onMainThreadVoid([this]() {
711 if(!cancelled) {
712 connected = true;
713 dbState->stateChanged();
714 }
715 delref();
716 }, NULL);
717 }
718
error(const Error & e,int & userParam)719 void MultiVersionDatabase::Connector::error(const Error& e, int& userParam) {
720 if(e.code() != error_code_operation_cancelled) {
721 // TODO: is it right to abandon this connection attempt?
722 client->failed = true;
723 MultiVersionApi::api->updateSupportedVersions();
724 TraceEvent(SevError, "DatabaseConnectionError").error(e).detail("ClientLibrary", this->client->libPath);
725 }
726
727 delref();
728 }
729
DatabaseState()730 MultiVersionDatabase::DatabaseState::DatabaseState()
731 : dbVar(new ThreadSafeAsyncVar<Reference<IDatabase>>(Reference<IDatabase>(NULL))), currentClientIndex(-1) {}
732
733 // Only called from main thread
stateChanged()734 void MultiVersionDatabase::DatabaseState::stateChanged() {
735 int newIndex = -1;
736 for(int i = 0; i < clients.size(); ++i) {
737 if(i != currentClientIndex && connectionAttempts[i]->connected) {
738 if(currentClientIndex >= 0 && !clients[i]->canReplace(clients[currentClientIndex])) {
739 TraceEvent(SevWarn, "DuplicateClientVersion").detail("Keeping", clients[currentClientIndex]->libPath).detail("KeptClientProtocolVersion", clients[currentClientIndex]->protocolVersion).detail("Disabling", clients[i]->libPath).detail("DisabledClientProtocolVersion", clients[i]->protocolVersion);
740 connectionAttempts[i]->connected = false; // Permanently disable this client in favor of the current one
741 clients[i]->failed = true;
742 MultiVersionApi::api->updateSupportedVersions();
743 return;
744 }
745
746 newIndex = i;
747 break;
748 }
749 }
750
751 if(newIndex == -1) {
752 ASSERT(currentClientIndex == 0); // This can only happen for the local client, which we set as the current connection before we know it's connected
753 return;
754 }
755
756 // Restart connection for replaced client
757 auto newDb = connectionAttempts[newIndex]->candidateDatabase;
758
759 optionLock.enter();
760 for(auto option : options) {
761 try {
762 newDb->setOption(option.first, option.second.castTo<StringRef>()); // In practice, this will set a deferred error instead of throwing. If that happens, the database will be unusable (attempts to use it will throw errors).
763 }
764 catch(Error &e) {
765 optionLock.leave();
766 TraceEvent(SevError, "ClusterVersionChangeOptionError").error(e).detail("Option", option.first).detail("OptionValue", option.second).detail("LibPath", clients[newIndex]->libPath);
767 connectionAttempts[newIndex]->connected = false;
768 clients[newIndex]->failed = true;
769 MultiVersionApi::api->updateSupportedVersions();
770 return; // If we can't set all of the options on a cluster, we abandon the client
771 }
772 }
773
774 db = newDb;
775 optionLock.leave();
776
777 dbVar->set(db);
778
779 if(currentClientIndex >= 0 && connectionAttempts[currentClientIndex]->connected) {
780 connectionAttempts[currentClientIndex]->connected = false;
781 connectionAttempts[currentClientIndex]->connect();
782 }
783
784 ASSERT(newIndex >= 0 && newIndex < clients.size());
785 currentClientIndex = newIndex;
786 }
787
addConnection(Reference<ClientInfo> client,std::string clusterFilePath)788 void MultiVersionDatabase::DatabaseState::addConnection(Reference<ClientInfo> client, std::string clusterFilePath) {
789 clients.push_back(client);
790 connectionAttempts.push_back(Reference<Connector>(new Connector(Reference<DatabaseState>::addRef(this), client, clusterFilePath)));
791 }
792
startConnections()793 void MultiVersionDatabase::DatabaseState::startConnections() {
794 for(auto c : connectionAttempts) {
795 c->connect();
796 }
797 }
798
cancelConnections()799 void MultiVersionDatabase::DatabaseState::cancelConnections() {
800 addref();
801 onMainThreadVoid([this](){
802 for(auto c : connectionAttempts) {
803 c->cancel();
804 }
805
806 connectionAttempts.clear();
807 clients.clear();
808 delref();
809 }, NULL);
810 }
811
812 // MultiVersionApi
813
814 // runOnFailedClients should be used cautiously. Some failed clients may not have successfully loaded all symbols.
runOnExternalClients(std::function<void (Reference<ClientInfo>)> func,bool runOnFailedClients)815 void MultiVersionApi::runOnExternalClients(std::function<void(Reference<ClientInfo>)> func, bool runOnFailedClients) {
816 bool newFailure = false;
817
818 auto c = externalClients.begin();
819 while(c != externalClients.end()) {
820 try {
821 if(!c->second->failed || runOnFailedClients) { // TODO: Should we ignore some failures?
822 func(c->second);
823 }
824 }
825 catch(Error &e) {
826 if(e.code() == error_code_external_client_already_loaded) {
827 TraceEvent(SevInfo, "ExternalClientAlreadyLoaded").error(e).detail("LibPath", c->second->libPath);
828 c = externalClients.erase(c);
829 continue;
830 }
831 else {
832 TraceEvent(SevWarnAlways, "ExternalClientFailure").error(e).detail("LibPath", c->second->libPath);
833 c->second->failed = true;
834 newFailure = true;
835 }
836 }
837
838 ++c;
839 }
840
841 if(newFailure) {
842 updateSupportedVersions();
843 }
844 }
845
getLocalClient()846 Reference<ClientInfo> MultiVersionApi::getLocalClient() {
847 return localClient;
848 }
849
selectApiVersion(int apiVersion)850 void MultiVersionApi::selectApiVersion(int apiVersion) {
851 if(!localClient) {
852 localClient = Reference<ClientInfo>(new ClientInfo(ThreadSafeApi::api));
853 }
854
855 if(this->apiVersion != 0 && this->apiVersion != apiVersion) {
856 throw api_version_already_set();
857 }
858
859 localClient->api->selectApiVersion(apiVersion);
860 this->apiVersion = apiVersion;
861 }
862
getClientVersion()863 const char* MultiVersionApi::getClientVersion() {
864 return localClient->api->getClientVersion();
865 }
866
validateOption(Optional<StringRef> value,bool canBePresent,bool canBeAbsent,bool canBeEmpty=true)867 void validateOption(Optional<StringRef> value, bool canBePresent, bool canBeAbsent, bool canBeEmpty=true) {
868 ASSERT(canBePresent || canBeAbsent);
869
870 if(!canBePresent && value.present() && (!canBeEmpty || value.get().size() > 0)) {
871 throw invalid_option_value();
872 }
873 if(!canBeAbsent && (!value.present() || (!canBeEmpty && value.get().size() == 0))) {
874 throw invalid_option_value();
875 }
876 }
877
disableMultiVersionClientApi()878 void MultiVersionApi::disableMultiVersionClientApi() {
879 MutexHolder holder(lock);
880 if(networkStartSetup || localClientDisabled) {
881 throw invalid_option();
882 }
883
884 bypassMultiClientApi = true;
885 }
886
setCallbacksOnExternalThreads()887 void MultiVersionApi::setCallbacksOnExternalThreads() {
888 MutexHolder holder(lock);
889 if(networkStartSetup) {
890 throw invalid_option();
891 }
892
893 callbackOnMainThread = false;
894 }
895
addExternalLibrary(std::string path)896 void MultiVersionApi::addExternalLibrary(std::string path) {
897 std::string filename = basename(path);
898
899 if(filename.empty() || !fileExists(path)) {
900 TraceEvent("ExternalClientNotFound").detail("LibraryPath", filename);
901 throw file_not_found();
902 }
903
904 MutexHolder holder(lock);
905 if(networkStartSetup) {
906 throw invalid_option(); // SOMEDAY: it might be good to allow clients to be added after the network is setup
907 }
908
909 if(externalClients.count(filename) == 0) {
910 TraceEvent("AddingExternalClient").detail("LibraryPath", filename);
911 externalClients[filename] = Reference<ClientInfo>(new ClientInfo(new DLApi(path), path));
912 }
913 }
914
addExternalLibraryDirectory(std::string path)915 void MultiVersionApi::addExternalLibraryDirectory(std::string path) {
916 TraceEvent("AddingExternalClientDirectory").detail("Directory", path);
917 std::vector<std::string> files = platform::listFiles(path, DYNAMIC_LIB_EXT);
918
919 MutexHolder holder(lock);
920 if(networkStartSetup) {
921 throw invalid_option(); // SOMEDAY: it might be good to allow clients to be added after the network is setup. For directories, we can monitor them for the addition of new files.
922 }
923
924 for(auto filename : files) {
925 std::string lib = abspath(joinPath(path, filename));
926 if(externalClients.count(filename) == 0) {
927 TraceEvent("AddingExternalClient").detail("LibraryPath", filename);
928 externalClients[filename] = Reference<ClientInfo>(new ClientInfo(new DLApi(lib), lib));
929 }
930 }
931 }
932
disableLocalClient()933 void MultiVersionApi::disableLocalClient() {
934 MutexHolder holder(lock);
935 if(networkStartSetup || bypassMultiClientApi) {
936 throw invalid_option();
937 }
938
939 localClientDisabled = true;
940 }
941
setSupportedClientVersions(Standalone<StringRef> versions)942 void MultiVersionApi::setSupportedClientVersions(Standalone<StringRef> versions) {
943 MutexHolder holder(lock);
944 ASSERT(networkSetup);
945
946 // This option must be set on the main thread because it modifes structures that can be used concurrently by the main thread
947 onMainThreadVoid([this, versions](){
948 localClient->api->setNetworkOption(FDBNetworkOptions::SUPPORTED_CLIENT_VERSIONS, versions);
949 }, NULL);
950
951 if(!bypassMultiClientApi) {
952 runOnExternalClients([this, versions](Reference<ClientInfo> client){
953 client->api->setNetworkOption(FDBNetworkOptions::SUPPORTED_CLIENT_VERSIONS, versions);
954 });
955 }
956 }
957
setNetworkOption(FDBNetworkOptions::Option option,Optional<StringRef> value)958 void MultiVersionApi::setNetworkOption(FDBNetworkOptions::Option option, Optional<StringRef> value) {
959 if(option != FDBNetworkOptions::EXTERNAL_CLIENT && !externalClient) { // This is the first option set for external clients
960 loadEnvironmentVariableNetworkOptions();
961 }
962
963 setNetworkOptionInternal(option, value);
964 }
965
setNetworkOptionInternal(FDBNetworkOptions::Option option,Optional<StringRef> value)966 void MultiVersionApi::setNetworkOptionInternal(FDBNetworkOptions::Option option, Optional<StringRef> value) {
967 auto itr = FDBNetworkOptions::optionInfo.find(option);
968 if(itr != FDBNetworkOptions::optionInfo.end()) {
969 TraceEvent("SetNetworkOption").detail("Option", itr->second.name);
970 }
971 else {
972 TraceEvent("UnknownNetworkOption").detail("Option", option);
973 throw invalid_option();
974 }
975
976 if(option == FDBNetworkOptions::DISABLE_MULTI_VERSION_CLIENT_API) {
977 validateOption(value, false, true);
978 disableMultiVersionClientApi();
979 }
980 else if(option == FDBNetworkOptions::CALLBACKS_ON_EXTERNAL_THREADS) {
981 validateOption(value, false, true);
982 setCallbacksOnExternalThreads();
983 }
984 else if(option == FDBNetworkOptions::EXTERNAL_CLIENT_LIBRARY) {
985 validateOption(value, true, false, false);
986 addExternalLibrary(abspath(value.get().toString()));
987 }
988 else if(option == FDBNetworkOptions::EXTERNAL_CLIENT_DIRECTORY) {
989 validateOption(value, true, false, false);
990 addExternalLibraryDirectory(value.get().toString());
991 }
992 else if(option == FDBNetworkOptions::DISABLE_LOCAL_CLIENT) {
993 validateOption(value, false, true);
994 disableLocalClient();
995 }
996 else if(option == FDBNetworkOptions::SUPPORTED_CLIENT_VERSIONS) {
997 ASSERT(value.present());
998 setSupportedClientVersions(value.get());
999 }
1000 else if(option == FDBNetworkOptions::EXTERNAL_CLIENT) {
1001 MutexHolder holder(lock);
1002 ASSERT(!value.present() && !networkStartSetup);
1003 externalClient = true;
1004 bypassMultiClientApi = true;
1005 }
1006 else {
1007 MutexHolder holder(lock);
1008 localClient->api->setNetworkOption(option, value);
1009
1010 if(!bypassMultiClientApi) {
1011 if(networkSetup) {
1012 runOnExternalClients([this, option, value](Reference<ClientInfo> client) {
1013 client->api->setNetworkOption(option, value);
1014 });
1015 }
1016 else {
1017 options.push_back(std::make_pair(option, value.castTo<Standalone<StringRef>>()));
1018 }
1019 }
1020 }
1021 }
1022
setupNetwork()1023 void MultiVersionApi::setupNetwork() {
1024 if(!externalClient) {
1025 loadEnvironmentVariableNetworkOptions();
1026 }
1027
1028 uint64_t transportId = 0;
1029 { // lock scope
1030 MutexHolder holder(lock);
1031 if(networkStartSetup) {
1032 throw network_already_setup();
1033 }
1034
1035 networkStartSetup = true;
1036
1037 if(externalClients.empty()) {
1038 bypassMultiClientApi = true; // SOMEDAY: we won't be able to set this option once it becomes possible to add clients after setupNetwork is called
1039 }
1040
1041 if(!bypassMultiClientApi) {
1042 transportId = (uint64_t(uint32_t(platform::getRandomSeed())) << 32) ^ uint32_t(platform::getRandomSeed());
1043 if(transportId <= 1) transportId += 2;
1044 localClient->api->setNetworkOption(FDBNetworkOptions::EXTERNAL_CLIENT_TRANSPORT_ID, std::to_string(transportId));
1045 }
1046 localClient->api->setupNetwork();
1047 }
1048
1049 localClient->loadProtocolVersion();
1050
1051 if(!bypassMultiClientApi) {
1052 runOnExternalClients([this](Reference<ClientInfo> client) {
1053 TraceEvent("InitializingExternalClient").detail("LibraryPath", client->libPath);
1054 client->api->selectApiVersion(apiVersion);
1055 client->loadProtocolVersion();
1056 });
1057
1058 MutexHolder holder(lock);
1059 runOnExternalClients([this, transportId](Reference<ClientInfo> client) {
1060 for(auto option : options) {
1061 client->api->setNetworkOption(option.first, option.second.castTo<StringRef>());
1062 }
1063 client->api->setNetworkOption(FDBNetworkOptions::EXTERNAL_CLIENT_TRANSPORT_ID, std::to_string(transportId));
1064
1065 client->api->setupNetwork();
1066 });
1067
1068 networkSetup = true; // Needs to be guarded by mutex
1069 }
1070 else {
1071 networkSetup = true;
1072 }
1073
1074 options.clear();
1075 updateSupportedVersions();
1076 }
1077
runNetworkThread(void * param)1078 THREAD_FUNC_RETURN runNetworkThread(void *param) {
1079 try {
1080 ((ClientInfo*)param)->api->runNetwork();
1081 }
1082 catch(Error &e) {
1083 TraceEvent(SevError, "RunNetworkError").error(e);
1084 }
1085
1086 THREAD_RETURN;
1087 }
1088
runNetwork()1089 void MultiVersionApi::runNetwork() {
1090 lock.enter();
1091 if(!networkSetup) {
1092 lock.leave();
1093 throw network_not_setup();
1094 }
1095
1096 lock.leave();
1097
1098 std::vector<THREAD_HANDLE> handles;
1099 if(!bypassMultiClientApi) {
1100 runOnExternalClients([&handles](Reference<ClientInfo> client) {
1101 if(client->external) {
1102 handles.push_back(g_network->startThread(&runNetworkThread, client.getPtr()));
1103 }
1104 });
1105 }
1106
1107 localClient->api->runNetwork();
1108
1109 for(auto h : handles) {
1110 waitThread(h);
1111 }
1112 }
1113
stopNetwork()1114 void MultiVersionApi::stopNetwork() {
1115 lock.enter();
1116 if(!networkSetup) {
1117 lock.leave();
1118 throw network_not_setup();
1119 }
1120 lock.leave();
1121
1122 localClient->api->stopNetwork();
1123
1124 if(!bypassMultiClientApi) {
1125 runOnExternalClients([](Reference<ClientInfo> client) {
1126 client->api->stopNetwork();
1127 }, true);
1128 }
1129 }
1130
addNetworkThreadCompletionHook(void (* hook)(void *),void * hookParameter)1131 void MultiVersionApi::addNetworkThreadCompletionHook(void (*hook)(void*), void *hookParameter) {
1132 lock.enter();
1133 if(!networkSetup) {
1134 lock.leave();
1135 throw network_not_setup();
1136 }
1137 lock.leave();
1138
1139 localClient->api->addNetworkThreadCompletionHook(hook, hookParameter);
1140
1141 if(!bypassMultiClientApi) {
1142 runOnExternalClients([hook, hookParameter](Reference<ClientInfo> client) {
1143 client->api->addNetworkThreadCompletionHook(hook, hookParameter);
1144 });
1145 }
1146 }
1147
createDatabase(const char * clusterFilePath)1148 Reference<IDatabase> MultiVersionApi::createDatabase(const char *clusterFilePath) {
1149 lock.enter();
1150 if(!networkSetup) {
1151 lock.leave();
1152 throw network_not_setup();
1153 }
1154 lock.leave();
1155
1156 std::string clusterFile(clusterFilePath);
1157 if(localClientDisabled) {
1158 return Reference<IDatabase>(new MultiVersionDatabase(this, clusterFile, Reference<IDatabase>()));
1159 }
1160
1161 auto db = localClient->api->createDatabase(clusterFilePath);
1162 if(bypassMultiClientApi) {
1163 return db;
1164 }
1165 else {
1166 for(auto it : externalClients) {
1167 TraceEvent("CreatingDatabaseOnExternalClient").detail("LibraryPath", it.second->libPath).detail("Failed", it.second->failed);
1168 }
1169 return Reference<IDatabase>(new MultiVersionDatabase(this, clusterFile, db));
1170 }
1171 }
1172
updateSupportedVersions()1173 void MultiVersionApi::updateSupportedVersions() {
1174 if(networkSetup) {
1175 Standalone<VectorRef<uint8_t>> versionStr;
1176
1177 runOnExternalClients([&versionStr](Reference<ClientInfo> client){
1178 const char *ver = client->api->getClientVersion();
1179 versionStr.append(versionStr.arena(), (uint8_t*)ver, (int)strlen(ver));
1180 versionStr.append(versionStr.arena(), (uint8_t*)";", 1);
1181 });
1182
1183 if(!localClient->failed) {
1184 const char *local = localClient->api->getClientVersion();
1185 versionStr.append(versionStr.arena(), (uint8_t*)local, (int)strlen(local));
1186 }
1187 else {
1188 versionStr.resize(versionStr.arena(), std::max(0, versionStr.size()-1));
1189 }
1190
1191 setNetworkOption(FDBNetworkOptions::SUPPORTED_CLIENT_VERSIONS, StringRef(versionStr.begin(), versionStr.size()));
1192 }
1193 }
1194
parseOptionValues(std::string valueStr)1195 std::vector<std::string> parseOptionValues(std::string valueStr) {
1196 std::string specialCharacters = "\\";
1197 specialCharacters += ENV_VAR_PATH_SEPARATOR;
1198
1199 std::vector<std::string> values;
1200
1201 size_t index = 0;
1202 size_t nextIndex = 0;
1203 std::stringstream ss;
1204 while(true) {
1205 nextIndex = valueStr.find_first_of(specialCharacters, index);
1206 char c = nextIndex == valueStr.npos ? ENV_VAR_PATH_SEPARATOR : valueStr[nextIndex];
1207
1208 if(c == '\\') {
1209 if(valueStr.size() == nextIndex + 1 || specialCharacters.find(valueStr[nextIndex+1]) == valueStr.npos) {
1210 throw invalid_option_value();
1211 }
1212
1213 ss << valueStr.substr(index, nextIndex-index);
1214 ss << valueStr[nextIndex+1];
1215
1216 index = nextIndex + 2;
1217 }
1218 else if(c == ENV_VAR_PATH_SEPARATOR) {
1219 ss << valueStr.substr(index, nextIndex-index);
1220 values.push_back(ss.str());
1221 ss.str(std::string());
1222
1223 if(nextIndex == valueStr.npos) {
1224 break;
1225 }
1226 index = nextIndex + 1;
1227 }
1228 else {
1229 ASSERT(false);
1230 }
1231 }
1232
1233 return values;
1234 }
1235
1236 // This function sets all environment variable options which have not been set previously by a call to this function.
1237 // If an option has multiple values and setting one of those values failed with an error, then only those options
1238 // which were not successfully set will be set on subsequent calls.
loadEnvironmentVariableNetworkOptions()1239 void MultiVersionApi::loadEnvironmentVariableNetworkOptions() {
1240 if(envOptionsLoaded) {
1241 return;
1242 }
1243
1244 for(auto option : FDBNetworkOptions::optionInfo) {
1245 if(!option.second.hidden) {
1246 std::string valueStr;
1247 try {
1248 if(platform::getEnvironmentVar(("FDB_NETWORK_OPTION_" + option.second.name).c_str(), valueStr)) {
1249 size_t index = 0;
1250 for(auto value : parseOptionValues(valueStr)) {
1251 Standalone<StringRef> currentValue = StringRef(value);
1252 { // lock scope
1253 MutexHolder holder(lock);
1254 if(setEnvOptions[option.first].count(currentValue) == 0) {
1255 setNetworkOptionInternal(option.first, currentValue);
1256 setEnvOptions[option.first].insert(currentValue);
1257 }
1258 }
1259 }
1260 }
1261 }
1262 catch(Error &e) {
1263 TraceEvent(SevError, "EnvironmentVariableNetworkOptionFailed").error(e).detail("Option", option.second.name).detail("Value", valueStr);
1264 throw environment_variable_network_option_failed();
1265 }
1266 }
1267 }
1268
1269 MutexHolder holder(lock);
1270 envOptionsLoaded = true;
1271 }
1272
MultiVersionApi()1273 MultiVersionApi::MultiVersionApi() : bypassMultiClientApi(false), networkStartSetup(false), networkSetup(false), callbackOnMainThread(true), externalClient(false), localClientDisabled(false), apiVersion(0), envOptionsLoaded(false) {}
1274
1275 MultiVersionApi* MultiVersionApi::api = new MultiVersionApi();
1276
1277 // ClientInfo
loadProtocolVersion()1278 void ClientInfo::loadProtocolVersion() {
1279 std::string version = api->getClientVersion();
1280 if(version == "unknown") {
1281 protocolVersion = 0;
1282 return;
1283 }
1284
1285 char *next;
1286 std::string protocolVersionStr = ClientVersionRef(version).protocolVersion.toString();
1287 protocolVersion = strtoull(protocolVersionStr.c_str(), &next, 16);
1288
1289 ASSERT(protocolVersion != 0 && protocolVersion != ULLONG_MAX);
1290 ASSERT(next == &protocolVersionStr[protocolVersionStr.length()]);
1291 }
1292
canReplace(Reference<ClientInfo> other) const1293 bool ClientInfo::canReplace(Reference<ClientInfo> other) const {
1294 if(protocolVersion > other->protocolVersion) {
1295 return true;
1296 }
1297
1298 if(protocolVersion == other->protocolVersion && !external) {
1299 return true;
1300 }
1301
1302 return (protocolVersion & compatibleProtocolVersionMask) != (other->protocolVersion & compatibleProtocolVersionMask);
1303 }
1304
1305 // UNIT TESTS
1306 extern bool noUnseed;
1307
1308 TEST_CASE("/fdbclient/multiversionclient/EnvironmentVariableParsing" ) {
1309 auto vals = parseOptionValues("a");
1310 ASSERT(vals.size() == 1 && vals[0] == "a");
1311
1312 vals = parseOptionValues("abcde");
1313 ASSERT(vals.size() == 1 && vals[0] == "abcde");
1314
1315 vals = parseOptionValues("");
1316 ASSERT(vals.size() == 1 && vals[0] == "");
1317
1318 vals = parseOptionValues("a:b:c:d:e");
1319 ASSERT(vals.size() == 5 && vals[0] == "a" && vals[1] == "b" && vals[2] == "c" && vals[3] == "d" && vals[4] == "e");
1320
1321 vals = parseOptionValues("\\\\a\\::\\:b:\\\\");
1322 ASSERT(vals.size() == 3 && vals[0] == "\\a:" && vals[1] == ":b" && vals[2] == "\\");
1323
1324 vals = parseOptionValues("abcd:");
1325 ASSERT(vals.size() == 2 && vals[0] == "abcd" && vals[1] == "");
1326
1327 vals = parseOptionValues(":abcd");
1328 ASSERT(vals.size() == 2 && vals[0] == "" && vals[1] == "abcd");
1329
1330 vals = parseOptionValues(":");
1331 ASSERT(vals.size() == 2 && vals[0] == "" && vals[1] == "");
1332
1333 try {
1334 vals = parseOptionValues("\\x");
1335 ASSERT(false);
1336 }
1337 catch(Error &e) {
1338 ASSERT(e.code() == error_code_invalid_option_value);
1339 }
1340
1341 return Void();
1342 }
1343
1344 class ValidateFuture : public ThreadCallback {
1345 public:
ValidateFuture(ThreadFuture<int> f,ErrorOr<int> expectedValue,std::set<int> legalErrors)1346 ValidateFuture(ThreadFuture<int> f, ErrorOr<int> expectedValue, std::set<int> legalErrors) : f(f), expectedValue(expectedValue), legalErrors(legalErrors) { }
1347
canFire(int notMadeActive)1348 virtual bool canFire(int notMadeActive) { return true; }
1349
fire(const Void & unused,int & userParam)1350 virtual void fire(const Void &unused, int& userParam) {
1351 ASSERT(!f.isError() && !expectedValue.isError() && f.get() == expectedValue.get());
1352 delete this;
1353 }
1354
error(const Error & e,int & userParam)1355 virtual void error(const Error& e, int& userParam) {
1356 ASSERT(legalErrors.count(e.code()) > 0 || (f.isError() && expectedValue.isError() && f.getError().code() == expectedValue.getError().code()));
1357 delete this;
1358 }
1359
1360 private:
1361 ThreadFuture<int> f;
1362 ErrorOr<int> expectedValue;
1363 std::set<int> legalErrors;
1364 };
1365
1366 struct FutureInfo {
FutureInfoFutureInfo1367 FutureInfo() {
1368 if(g_random->coinflip()) {
1369 expectedValue = Error(g_random->randomInt(1, 100));
1370 }
1371 else {
1372 expectedValue = g_random->randomInt(0, 100);
1373 }
1374 }
1375
FutureInfoFutureInfo1376 FutureInfo(ThreadFuture<int> future, ErrorOr<int> expectedValue, std::set<int> legalErrors = std::set<int>()) : future(future), expectedValue(expectedValue), legalErrors(legalErrors) {}
1377
validateFutureInfo1378 void validate() {
1379 int userParam;
1380 future.callOrSetAsCallback(new ValidateFuture(future, expectedValue, legalErrors), userParam, 0);
1381 }
1382
1383 ThreadFuture<int> future;
1384 ErrorOr<int> expectedValue;
1385 std::set<int> legalErrors;
1386 std::vector<THREAD_HANDLE> threads;
1387 };
1388
createVarOnMainThread(bool canBeNever=true)1389 FutureInfo createVarOnMainThread(bool canBeNever=true) {
1390 FutureInfo f;
1391
1392 if(g_random->coinflip()) {
1393 f.future = onMainThread([f, canBeNever]() {
1394 Future<Void> sleep ;
1395 if(canBeNever && g_random->coinflip()) {
1396 sleep = Never();
1397 }
1398 else {
1399 sleep = delay(0.1 * g_random->random01());
1400 }
1401
1402 if(f.expectedValue.isError()) {
1403 return tagError<int>(sleep, f.expectedValue.getError());
1404 }
1405 else {
1406 return tag(sleep, f.expectedValue.get());
1407 }
1408 });
1409 }
1410 else if(f.expectedValue.isError()) {
1411 f.future = f.expectedValue.getError();
1412 }
1413 else {
1414 f.future = f.expectedValue.get();
1415 }
1416
1417 return f;
1418 }
1419
setAbort(void * arg)1420 THREAD_FUNC setAbort(void *arg) {
1421 threadSleep(0.1 * g_random->random01());
1422 try {
1423 ((ThreadSingleAssignmentVar<Void>*)arg)->send(Void());
1424 ((ThreadSingleAssignmentVar<Void>*)arg)->delref();
1425 }
1426 catch(Error &e) {
1427 printf("Caught error in setAbort: %s\n", e.name());
1428 ASSERT(false);
1429 }
1430 THREAD_RETURN;
1431 }
1432
releaseMem(void * arg)1433 THREAD_FUNC releaseMem(void *arg) {
1434 threadSleep(0.1 * g_random->random01());
1435 try {
1436 // Must get for releaseMemory to work
1437 ((ThreadSingleAssignmentVar<int>*)arg)->get();
1438 }
1439 catch(Error&) {
1440 // Swallow
1441 }
1442 try {
1443 ((ThreadSingleAssignmentVar<int>*)arg)->releaseMemory();
1444 }
1445 catch(Error &e) {
1446 printf("Caught error in releaseMem: %s\n", e.name());
1447 ASSERT(false);
1448 }
1449 THREAD_RETURN;
1450 }
1451
destroy(void * arg)1452 THREAD_FUNC destroy(void *arg) {
1453 threadSleep(0.1 * g_random->random01());
1454 try {
1455 ((ThreadSingleAssignmentVar<int>*)arg)->cancel();
1456 }
1457 catch(Error &e) {
1458 printf("Caught error in destroy: %s\n", e.name());
1459 ASSERT(false);
1460 }
1461 THREAD_RETURN;
1462 }
1463
cancel(void * arg)1464 THREAD_FUNC cancel(void *arg) {
1465 threadSleep(0.1 * g_random->random01());
1466 try {
1467 ((ThreadSingleAssignmentVar<int>*)arg)->addref();
1468 destroy(arg);
1469 }
1470 catch(Error &e) {
1471 printf("Caught error in cancel: %s\n", e.name());
1472 ASSERT(false);
1473 }
1474 THREAD_RETURN;
1475 }
1476
checkUndestroyedFutures(std::vector<ThreadSingleAssignmentVar<int> * > undestroyed)1477 ACTOR Future<Void> checkUndestroyedFutures(std::vector<ThreadSingleAssignmentVar<int>*> undestroyed) {
1478 state int fNum;
1479 state ThreadSingleAssignmentVar<int>* f;
1480 state double start = now();
1481
1482 for(fNum = 0; fNum < undestroyed.size(); ++fNum) {
1483 f = undestroyed[fNum];
1484
1485 while(!f->isReady() && start+5 >= now()) {
1486 wait(delay(1.0));
1487 }
1488
1489 ASSERT(f->isReady());
1490 }
1491
1492 wait(delay(1.0));
1493
1494 for(fNum = 0; fNum < undestroyed.size(); ++fNum) {
1495 f = undestroyed[fNum];
1496
1497 ASSERT(f->debugGetReferenceCount() == 1);
1498 ASSERT(f->isReady());
1499
1500 f->cancel();
1501 }
1502
1503 return Void();
1504 }
1505
1506 template<class T>
runSingleAssignmentVarTest(void * arg)1507 THREAD_FUNC runSingleAssignmentVarTest(void *arg) {
1508 noUnseed = true;
1509
1510 volatile bool *done = (volatile bool*)arg;
1511 try {
1512 for(int i = 0; i < 25; ++i) {
1513 FutureInfo f = createVarOnMainThread(false);
1514 FutureInfo tf = T::createThreadFuture(f);
1515 tf.validate();
1516
1517 tf.future.extractPtr(); // leaks
1518 }
1519
1520 for(int numRuns = 0; numRuns < 25; ++numRuns) {
1521 std::vector<ThreadSingleAssignmentVar<int>*> undestroyed;
1522 std::vector<THREAD_HANDLE> threads;
1523 for(int i = 0; i < 10; ++i) {
1524 FutureInfo f = createVarOnMainThread();
1525 f.legalErrors.insert(error_code_operation_cancelled);
1526
1527 FutureInfo tf = T::createThreadFuture(f);
1528 for(auto t : tf.threads) {
1529 threads.push_back(t);
1530 }
1531
1532 tf.legalErrors.insert(error_code_operation_cancelled);
1533 tf.validate();
1534
1535 auto tfp = tf.future.extractPtr();
1536
1537 if(g_random->coinflip()) {
1538 if(g_random->coinflip()) {
1539 threads.push_back(g_network->startThread(releaseMem, tfp));
1540 }
1541 threads.push_back(g_network->startThread(cancel, tfp));
1542 undestroyed.push_back((ThreadSingleAssignmentVar<int>*)tfp);
1543 }
1544 else {
1545 threads.push_back(g_network->startThread(destroy, tfp));
1546 }
1547 }
1548
1549 for(auto t : threads) {
1550 waitThread(t);
1551 }
1552
1553 ThreadFuture<Void> checkUndestroyed = onMainThread([undestroyed]() {
1554 return checkUndestroyedFutures(undestroyed);
1555 });
1556
1557 checkUndestroyed.blockUntilReady();
1558 }
1559
1560 onMainThreadVoid([done](){
1561 *done = true;
1562 }, NULL);
1563 }
1564 catch(Error &e) {
1565 printf("Caught error in test: %s\n", e.name());
1566 *done = true;
1567 ASSERT(false);
1568 }
1569
1570 THREAD_RETURN;
1571 }
1572
1573 struct AbortableTest {
createThreadFutureAbortableTest1574 static FutureInfo createThreadFuture(FutureInfo f) {
1575 ThreadSingleAssignmentVar<Void> *abort = new ThreadSingleAssignmentVar<Void>();
1576 abort->addref(); // this leaks if abort is never set
1577
1578 auto newFuture = FutureInfo(abortableFuture(f.future, ThreadFuture<Void>(abort)), f.expectedValue, f.legalErrors);
1579
1580 if(!abort->isReady() && g_random->coinflip()) {
1581 ASSERT(abort->status == ThreadSingleAssignmentVarBase::Unset);
1582 newFuture.threads.push_back(g_network->startThread(setAbort, abort));
1583 }
1584
1585 newFuture.legalErrors.insert(error_code_cluster_version_changed);
1586 return newFuture;
1587 }
1588 };
1589
1590 TEST_CASE("/fdbclient/multiversionclient/AbortableSingleAssignmentVar" ) {
1591 state volatile bool done = false;
1592 g_network->startThread(runSingleAssignmentVarTest<AbortableTest>, (void*)&done);
1593
1594 while(!done) {
1595 wait(delay(1.0));
1596 }
1597
1598 return Void();
1599 }
1600
1601 class CAPICallback : public ThreadCallback {
1602 public:
CAPICallback(void (* callbackf)(FdbCApi::FDBFuture *,void *),FdbCApi::FDBFuture * f,void * userdata)1603 CAPICallback(void (*callbackf)(FdbCApi::FDBFuture*, void*), FdbCApi::FDBFuture* f, void* userdata)
1604 : callbackf(callbackf), f(f), userdata(userdata) {}
1605
canFire(int notMadeActive)1606 virtual bool canFire(int notMadeActive) { return true; }
fire(const Void & unused,int & userParam)1607 virtual void fire(const Void& unused, int& userParam) {
1608 (*callbackf)(f, userdata);
1609 delete this;
1610 }
error(const Error & e,int & userParam)1611 virtual void error(const Error& e, int& userParam) {
1612 (*callbackf)(f, userdata);
1613 delete this;
1614 }
1615
1616 private:
1617 void (*callbackf)(FdbCApi::FDBFuture*, void*);
1618 FdbCApi::FDBFuture* f;
1619 void* userdata;
1620 };
1621
1622 struct DLTest {
createThreadFutureDLTest1623 static FutureInfo createThreadFuture(FutureInfo f) {
1624 return FutureInfo(toThreadFuture<int>(getApi(), (FdbCApi::FDBFuture*)f.future.extractPtr(), [](FdbCApi::FDBFuture *f, FdbCApi *api) {
1625 ASSERT(((ThreadSingleAssignmentVar<int>*)f)->debugGetReferenceCount() >= 1);
1626 return ((ThreadSingleAssignmentVar<int>*)f)->get();
1627 }), f.expectedValue, f.legalErrors);
1628 }
1629
getApiDLTest1630 static Reference<FdbCApi> getApi() {
1631 static Reference<FdbCApi> api;
1632 if(!api) {
1633 api = Reference<FdbCApi>(new FdbCApi());
1634
1635 // Functions needed for DLSingleAssignmentVar
1636 api->futureSetCallback = [](FdbCApi::FDBFuture *f, FdbCApi::FDBCallback callback, void *callbackParameter) {
1637 try {
1638 CAPICallback* cb = new CAPICallback(callback, f, callbackParameter);
1639 int ignore;
1640 ((ThreadSingleAssignmentVarBase*)f)->callOrSetAsCallback(cb, ignore, 0);
1641 return FdbCApi::fdb_error_t(error_code_success);
1642 }
1643 catch(Error &e) {
1644 return FdbCApi::fdb_error_t(e.code());
1645 }
1646 };
1647 api->futureCancel = [](FdbCApi::FDBFuture *f) {
1648 ((ThreadSingleAssignmentVarBase*)f)->addref();
1649 ((ThreadSingleAssignmentVarBase*)f)->cancel();
1650 };
1651 api->futureGetError = [](FdbCApi::FDBFuture *f) { return FdbCApi::fdb_error_t(((ThreadSingleAssignmentVarBase*)f)->getErrorCode()); };
1652 api->futureDestroy = [](FdbCApi::FDBFuture *f) { ((ThreadSingleAssignmentVarBase*)f)->cancel(); };
1653 }
1654
1655 return api;
1656 }
1657 };
1658
1659 TEST_CASE("/fdbclient/multiversionclient/DLSingleAssignmentVar" ) {
1660 state volatile bool done = false;
1661
1662 MultiVersionApi::api->callbackOnMainThread = true;
1663 g_network->startThread(runSingleAssignmentVarTest<DLTest>, (void*)&done);
1664
1665 while(!done) {
1666 wait(delay(1.0));
1667 }
1668
1669 done = false;
1670 MultiVersionApi::api->callbackOnMainThread = false;
1671 g_network->startThread(runSingleAssignmentVarTest<DLTest>, (void*)&done);
1672
1673 while(!done) {
1674 wait(delay(1.0));
1675 }
1676
1677 return Void();
1678 }
1679
1680 struct MapTest {
createThreadFutureMapTest1681 static FutureInfo createThreadFuture(FutureInfo f) {
1682 FutureInfo newFuture;
1683 newFuture.legalErrors = f.legalErrors;
1684 newFuture.future = mapThreadFuture<int, int>(f.future, [f, newFuture](ErrorOr<int> v) {
1685 if(v.isError()) {
1686 ASSERT(f.legalErrors.count(v.getError().code()) > 0 || (f.expectedValue.isError() && f.expectedValue.getError().code() == v.getError().code()));
1687 }
1688 else {
1689 ASSERT(!f.expectedValue.isError() && f.expectedValue.get() == v.get());
1690 }
1691
1692 return newFuture.expectedValue;
1693 });
1694
1695 return newFuture;
1696 }
1697 };
1698
1699 TEST_CASE("/fdbclient/multiversionclient/MapSingleAssignmentVar" ) {
1700 state volatile bool done = false;
1701 g_network->startThread(runSingleAssignmentVarTest<MapTest>, (void*)&done);
1702
1703 while(!done) {
1704 wait(delay(1.0));
1705 }
1706
1707 return Void();
1708 }
1709
1710 struct FlatMapTest {
createThreadFutureFlatMapTest1711 static FutureInfo createThreadFuture(FutureInfo f) {
1712 FutureInfo mapFuture = createVarOnMainThread();
1713
1714 return FutureInfo(flatMapThreadFuture<int, int>(f.future, [f, mapFuture](ErrorOr<int> v) {
1715 if(v.isError()) {
1716 ASSERT(f.legalErrors.count(v.getError().code()) > 0 || (f.expectedValue.isError() && f.expectedValue.getError().code() == v.getError().code()));
1717 }
1718 else {
1719 ASSERT(!f.expectedValue.isError() && f.expectedValue.get() == v.get());
1720 }
1721
1722 if(mapFuture.expectedValue.isError() && g_random->coinflip()) {
1723 return ErrorOr<ThreadFuture<int>>(mapFuture.expectedValue.getError());
1724 }
1725 else {
1726 return ErrorOr<ThreadFuture<int>>(mapFuture.future);
1727 }
1728 }), mapFuture.expectedValue, f.legalErrors);
1729 }
1730 };
1731
1732 TEST_CASE("/fdbclient/multiversionclient/FlatMapSingleAssignmentVar" ) {
1733 state volatile bool done = false;
1734 g_network->startThread(runSingleAssignmentVarTest<FlatMapTest>, (void*)&done);
1735
1736 while(!done) {
1737 wait(delay(1.0));
1738 }
1739
1740 return Void();
1741 }
1742