1 /*
2  * MultiVersionTransaction.actor.cpp
3  *
4  * This source file is part of the FoundationDB open source project
5  *
6  * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *     http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  */
20 
21 #include "fdbclient/MultiVersionTransaction.h"
22 #include "fdbclient/MultiVersionAssignmentVars.h"
23 #include "fdbclient/ThreadSafeTransaction.h"
24 
25 #include "flow/Platform.h"
26 #include "flow/UnitTest.h"
27 
28 #include "flow/actorcompiler.h"  // This must be the last #include.
29 
throwIfError(FdbCApi::fdb_error_t e)30 void throwIfError(FdbCApi::fdb_error_t e) {
31 	if(e) {
32 		throw Error(e);
33 	}
34 }
35 
36 // DLTransaction
cancel()37 void DLTransaction::cancel() {
38 	api->transactionCancel(tr);
39 }
40 
setVersion(Version v)41 void DLTransaction::setVersion(Version v) {
42 	api->transactionSetReadVersion(tr, v);
43 }
44 
getReadVersion()45 ThreadFuture<Version> DLTransaction::getReadVersion() {
46 	FdbCApi::FDBFuture *f = api->transactionGetReadVersion(tr);
47 
48 	return toThreadFuture<Version>(api, f, [](FdbCApi::FDBFuture *f, FdbCApi *api) {
49 		int64_t version;
50 		FdbCApi::fdb_error_t error = api->futureGetVersion(f, &version);
51 		ASSERT(!error);
52 		return version;
53 	});
54 }
55 
get(const KeyRef & key,bool snapshot)56 ThreadFuture<Optional<Value>> DLTransaction::get(const KeyRef& key, bool snapshot) {
57 	FdbCApi::FDBFuture *f = api->transactionGet(tr, key.begin(), key.size(), snapshot);
58 
59 	return toThreadFuture<Optional<Value>>(api, f, [](FdbCApi::FDBFuture *f, FdbCApi *api) {
60 		FdbCApi::fdb_bool_t present;
61 		const uint8_t *value;
62 		int valueLength;
63 		FdbCApi::fdb_error_t error = api->futureGetValue(f, &present, &value, &valueLength);
64 		ASSERT(!error);
65 		if(present) {
66 			// The memory for this is stored in the FDBFuture and is released when the future gets destroyed
67 			return Optional<Value>(Value(ValueRef(value, valueLength), Arena()));
68 		}
69 		else {
70 			return Optional<Value>();
71 		}
72 	});
73 }
74 
getKey(const KeySelectorRef & key,bool snapshot)75 ThreadFuture<Key> DLTransaction::getKey(const KeySelectorRef& key, bool snapshot) {
76 	FdbCApi::FDBFuture *f = api->transactionGetKey(tr, key.getKey().begin(), key.getKey().size(), key.orEqual, key.offset, snapshot);
77 
78 	return toThreadFuture<Key>(api, f, [](FdbCApi::FDBFuture *f, FdbCApi *api) {
79 		const uint8_t *key;
80 		int keyLength;
81 		FdbCApi::fdb_error_t error = api->futureGetKey(f, &key, &keyLength);
82 		ASSERT(!error);
83 
84 		// The memory for this is stored in the FDBFuture and is released when the future gets destroyed
85 		return Key(KeyRef(key, keyLength), Arena());
86 	});
87 }
88 
getRange(const KeySelectorRef & begin,const KeySelectorRef & end,int limit,bool snapshot,bool reverse)89 ThreadFuture<Standalone<RangeResultRef>> DLTransaction::getRange(const KeySelectorRef& begin, const KeySelectorRef& end, int limit, bool snapshot, bool reverse) {
90 	return getRange(begin, end, GetRangeLimits(limit), snapshot, reverse);
91 }
92 
getRange(const KeySelectorRef & begin,const KeySelectorRef & end,GetRangeLimits limits,bool snapshot,bool reverse)93 ThreadFuture<Standalone<RangeResultRef>> DLTransaction::getRange(const KeySelectorRef& begin, const KeySelectorRef& end, GetRangeLimits limits, bool snapshot, bool reverse) {
94 	FdbCApi::FDBFuture *f = api->transactionGetRange(tr, begin.getKey().begin(), begin.getKey().size(), begin.orEqual, begin.offset, end.getKey().begin(), end.getKey().size(), end.orEqual, end.offset,
95 														limits.rows, limits.bytes, FDBStreamingModes::EXACT, 0, snapshot, reverse);
96 	return toThreadFuture<Standalone<RangeResultRef>>(api, f, [](FdbCApi::FDBFuture *f, FdbCApi *api) {
97 		const FdbCApi::FDBKeyValue *kvs;
98 		int count;
99 		FdbCApi::fdb_bool_t more;
100 		FdbCApi::fdb_error_t error = api->futureGetKeyValueArray(f, &kvs, &count, &more);
101 		ASSERT(!error);
102 
103 		// The memory for this is stored in the FDBFuture and is released when the future gets destroyed
104 		return Standalone<RangeResultRef>(RangeResultRef(VectorRef<KeyValueRef>((KeyValueRef*)kvs, count), more), Arena());
105 	});
106 }
107 
getRange(const KeyRangeRef & keys,int limit,bool snapshot,bool reverse)108 ThreadFuture<Standalone<RangeResultRef>> DLTransaction::getRange(const KeyRangeRef& keys, int limit, bool snapshot, bool reverse) {
109 	return getRange(firstGreaterOrEqual(keys.begin), firstGreaterOrEqual(keys.end), GetRangeLimits(limit), snapshot, reverse);
110 }
111 
getRange(const KeyRangeRef & keys,GetRangeLimits limits,bool snapshot,bool reverse)112 ThreadFuture<Standalone<RangeResultRef>> DLTransaction::getRange(const KeyRangeRef& keys, GetRangeLimits limits, bool snapshot, bool reverse) {
113 	return getRange(firstGreaterOrEqual(keys.begin), firstGreaterOrEqual(keys.end), limits, snapshot, reverse);
114 }
115 
getAddressesForKey(const KeyRef & key)116 ThreadFuture<Standalone<VectorRef<const char*>>> DLTransaction::getAddressesForKey(const KeyRef& key) {
117 	FdbCApi::FDBFuture *f = api->transactionGetAddressesForKey(tr, key.begin(), key.size());
118 
119 	return toThreadFuture<Standalone<VectorRef<const char*>>>(api, f, [](FdbCApi::FDBFuture *f, FdbCApi *api) {
120 		const char **addresses;
121 		int count;
122 		FdbCApi::fdb_error_t error = api->futureGetStringArray(f, &addresses, &count);
123 		ASSERT(!error);
124 
125 		// The memory for this is stored in the FDBFuture and is released when the future gets destroyed
126 		return Standalone<VectorRef<const char*>>(VectorRef<const char*>(addresses, count), Arena());
127 	});
128 }
129 
getVersionstamp()130 ThreadFuture<Standalone<StringRef>> DLTransaction::getVersionstamp() {
131 	if(!api->transactionGetVersionstamp) {
132 		return unsupported_operation();
133 	}
134 
135 	FdbCApi::FDBFuture *f = api->transactionGetVersionstamp(tr);
136 
137 	return toThreadFuture<Standalone<StringRef>>(api, f, [](FdbCApi::FDBFuture *f, FdbCApi *api) {
138 		const uint8_t *str;
139 		int strLength;
140 		FdbCApi::fdb_error_t error = api->futureGetKey(f, &str, &strLength);
141 		ASSERT(!error);
142 
143 		// The memory for this is stored in the FDBFuture and is released when the future gets destroyed
144 		return Standalone<StringRef>(StringRef(str, strLength), Arena());
145 	});
146 }
147 
addReadConflictRange(const KeyRangeRef & keys)148 void DLTransaction::addReadConflictRange(const KeyRangeRef& keys) {
149 	throwIfError(api->transactionAddConflictRange(tr, keys.begin.begin(), keys.begin.size(), keys.end.begin(), keys.end.size(), FDBConflictRangeTypes::READ));
150 }
151 
atomicOp(const KeyRef & key,const ValueRef & value,uint32_t operationType)152 void DLTransaction::atomicOp(const KeyRef& key, const ValueRef& value, uint32_t operationType) {
153 	api->transactionAtomicOp(tr, key.begin(), key.size(), value.begin(), value.size(), (FDBMutationTypes::Option)operationType);
154 }
155 
set(const KeyRef & key,const ValueRef & value)156 void DLTransaction::set(const KeyRef& key, const ValueRef& value) {
157 	api->transactionSet(tr, key.begin(), key.size(), value.begin(), value.size());
158 }
159 
clear(const KeyRef & begin,const KeyRef & end)160 void DLTransaction::clear(const KeyRef& begin, const KeyRef& end) {
161 	api->transactionClearRange(tr, begin.begin(), begin.size(), end.begin(), end.size());
162 }
163 
clear(const KeyRangeRef & range)164 void DLTransaction::clear(const KeyRangeRef& range) {
165 	api->transactionClearRange(tr, range.begin.begin(), range.begin.size(), range.end.begin(), range.end.size());
166 }
167 
clear(const KeyRef & key)168 void DLTransaction::clear(const KeyRef& key) {
169 	api->transactionClear(tr, key.begin(), key.size());
170 }
171 
watch(const KeyRef & key)172 ThreadFuture<Void> DLTransaction::watch(const KeyRef& key) {
173 	FdbCApi::FDBFuture *f = api->transactionWatch(tr, key.begin(), key.size());
174 
175 	return toThreadFuture<Void>(api, f, [](FdbCApi::FDBFuture *f, FdbCApi *api) {
176 		return Void();
177 	});
178 }
179 
addWriteConflictRange(const KeyRangeRef & keys)180 void DLTransaction::addWriteConflictRange(const KeyRangeRef& keys) {
181 	throwIfError(api->transactionAddConflictRange(tr, keys.begin.begin(), keys.begin.size(), keys.end.begin(), keys.end.size(), FDBConflictRangeTypes::WRITE));
182 }
183 
commit()184 ThreadFuture<Void> DLTransaction::commit() {
185 	FdbCApi::FDBFuture *f = api->transactionCommit(tr);
186 
187 	return toThreadFuture<Void>(api, f, [](FdbCApi::FDBFuture *f, FdbCApi *api) {
188 		return Void();
189 	});
190 }
191 
getCommittedVersion()192 Version DLTransaction::getCommittedVersion() {
193 	int64_t version;
194 	throwIfError(api->transactionGetCommittedVersion(tr, &version));
195 	return version;
196 }
197 
setOption(FDBTransactionOptions::Option option,Optional<StringRef> value)198 void DLTransaction::setOption(FDBTransactionOptions::Option option, Optional<StringRef> value) {
199 	throwIfError(api->transactionSetOption(tr, option, value.present() ? value.get().begin() : NULL, value.present() ? value.get().size() : 0));
200 }
201 
onError(Error const & e)202 ThreadFuture<Void> DLTransaction::onError(Error const& e) {
203 	FdbCApi::FDBFuture *f = api->transactionOnError(tr, e.code());
204 
205 	return toThreadFuture<Void>(api, f, [](FdbCApi::FDBFuture *f, FdbCApi *api) {
206 		return Void();
207 	});
208 }
209 
reset()210 void DLTransaction::reset() {
211 	api->transactionReset(tr);
212 }
213 
214 // DLDatabase
DLDatabase(Reference<FdbCApi> api,ThreadFuture<FdbCApi::FDBDatabase * > dbFuture)215 DLDatabase::DLDatabase(Reference<FdbCApi> api, ThreadFuture<FdbCApi::FDBDatabase*> dbFuture) : api(api), db(nullptr) {
216 	ready = mapThreadFuture<FdbCApi::FDBDatabase*, Void>(dbFuture, [this](ErrorOr<FdbCApi::FDBDatabase*> db){
217 		if(db.isError()) {
218 			return ErrorOr<Void>(db.getError());
219 		}
220 
221 		this->db = db.get();
222 		return ErrorOr<Void>(Void());
223 	});
224 }
225 
onReady()226 ThreadFuture<Void> DLDatabase::onReady() {
227 	return ready;
228 }
229 
createTransaction()230 Reference<ITransaction> DLDatabase::createTransaction() {
231 	FdbCApi::FDBTransaction *tr;
232 	api->databaseCreateTransaction(db, &tr);
233 	return Reference<ITransaction>(new DLTransaction(api, tr));
234 }
235 
setOption(FDBDatabaseOptions::Option option,Optional<StringRef> value)236 void DLDatabase::setOption(FDBDatabaseOptions::Option option, Optional<StringRef> value) {
237 	throwIfError(api->databaseSetOption(db, option, value.present() ? value.get().begin() : NULL, value.present() ? value.get().size() : 0));
238 }
239 
240 // DLApi
241 template<class T>
loadClientFunction(T * fp,void * lib,std::string libPath,const char * functionName,bool requireFunction=true)242 void loadClientFunction(T *fp, void *lib, std::string libPath, const char *functionName, bool requireFunction = true) {
243 	*(void**)(fp) = loadFunction(lib, functionName);
244 	if(*fp == NULL && requireFunction) {
245 		TraceEvent(SevError, "ErrorLoadingFunction").detail("LibraryPath", libPath).detail("Function", functionName);
246 		throw platform_error();
247 	}
248 }
249 
DLApi(std::string fdbCPath)250 DLApi::DLApi(std::string fdbCPath) : api(new FdbCApi()), fdbCPath(fdbCPath), networkSetup(false) {}
251 
init()252 void DLApi::init() {
253 	if(isLibraryLoaded(fdbCPath.c_str())) {
254 		throw external_client_already_loaded();
255 	}
256 
257 	void* lib = loadLibrary(fdbCPath.c_str());
258 	if(lib == NULL) {
259 		TraceEvent(SevError, "ErrorLoadingExternalClientLibrary").detail("LibraryPath", fdbCPath);
260 		throw platform_error();
261 	}
262 
263 	loadClientFunction(&api->selectApiVersion, lib, fdbCPath, "fdb_select_api_version_impl");
264 	loadClientFunction(&api->getClientVersion, lib, fdbCPath, "fdb_get_client_version", headerVersion >= 410);
265 	loadClientFunction(&api->setNetworkOption, lib, fdbCPath, "fdb_network_set_option");
266 	loadClientFunction(&api->setupNetwork, lib, fdbCPath, "fdb_setup_network");
267 	loadClientFunction(&api->runNetwork, lib, fdbCPath, "fdb_run_network");
268 	loadClientFunction(&api->stopNetwork, lib, fdbCPath, "fdb_stop_network");
269 	loadClientFunction(&api->createDatabase, lib, fdbCPath, "fdb_create_database", headerVersion >= 610);
270 
271 	loadClientFunction(&api->databaseCreateTransaction, lib, fdbCPath, "fdb_database_create_transaction");
272 	loadClientFunction(&api->databaseSetOption, lib, fdbCPath, "fdb_database_set_option");
273 	loadClientFunction(&api->databaseDestroy, lib, fdbCPath, "fdb_database_destroy");
274 
275 	loadClientFunction(&api->transactionSetOption, lib, fdbCPath, "fdb_transaction_set_option");
276 	loadClientFunction(&api->transactionDestroy, lib, fdbCPath, "fdb_transaction_destroy");
277 	loadClientFunction(&api->transactionSetReadVersion, lib, fdbCPath, "fdb_transaction_set_read_version");
278 	loadClientFunction(&api->transactionGetReadVersion, lib, fdbCPath, "fdb_transaction_get_read_version");
279 	loadClientFunction(&api->transactionGet, lib, fdbCPath, "fdb_transaction_get");
280 	loadClientFunction(&api->transactionGetKey, lib, fdbCPath, "fdb_transaction_get_key");
281 	loadClientFunction(&api->transactionGetAddressesForKey, lib, fdbCPath, "fdb_transaction_get_addresses_for_key");
282 	loadClientFunction(&api->transactionGetRange, lib, fdbCPath, "fdb_transaction_get_range");
283 	loadClientFunction(&api->transactionGetVersionstamp, lib, fdbCPath, "fdb_transaction_get_versionstamp", headerVersion >= 410);
284 	loadClientFunction(&api->transactionSet, lib, fdbCPath, "fdb_transaction_set");
285 	loadClientFunction(&api->transactionClear, lib, fdbCPath, "fdb_transaction_clear");
286 	loadClientFunction(&api->transactionClearRange, lib, fdbCPath, "fdb_transaction_clear_range");
287 	loadClientFunction(&api->transactionAtomicOp, lib, fdbCPath, "fdb_transaction_atomic_op");
288 	loadClientFunction(&api->transactionCommit, lib, fdbCPath, "fdb_transaction_commit");
289 	loadClientFunction(&api->transactionGetCommittedVersion, lib, fdbCPath, "fdb_transaction_get_committed_version");
290 	loadClientFunction(&api->transactionWatch, lib, fdbCPath, "fdb_transaction_watch");
291 	loadClientFunction(&api->transactionOnError, lib, fdbCPath, "fdb_transaction_on_error");
292 	loadClientFunction(&api->transactionReset, lib, fdbCPath, "fdb_transaction_reset");
293 	loadClientFunction(&api->transactionCancel, lib, fdbCPath, "fdb_transaction_cancel");
294 	loadClientFunction(&api->transactionAddConflictRange, lib, fdbCPath, "fdb_transaction_add_conflict_range");
295 
296 	loadClientFunction(&api->futureGetDatabase, lib, fdbCPath, "fdb_future_get_database");
297 	loadClientFunction(&api->futureGetVersion, lib, fdbCPath, "fdb_future_get_version");
298 	loadClientFunction(&api->futureGetError, lib, fdbCPath, "fdb_future_get_error");
299 	loadClientFunction(&api->futureGetKey, lib, fdbCPath, "fdb_future_get_key");
300 	loadClientFunction(&api->futureGetValue, lib, fdbCPath, "fdb_future_get_value");
301 	loadClientFunction(&api->futureGetStringArray, lib, fdbCPath, "fdb_future_get_string_array");
302 	loadClientFunction(&api->futureGetKeyValueArray, lib, fdbCPath, "fdb_future_get_keyvalue_array");
303 	loadClientFunction(&api->futureSetCallback, lib, fdbCPath, "fdb_future_set_callback");
304 	loadClientFunction(&api->futureCancel, lib, fdbCPath, "fdb_future_cancel");
305 	loadClientFunction(&api->futureDestroy, lib, fdbCPath, "fdb_future_destroy");
306 
307 	loadClientFunction(&api->createCluster, lib, fdbCPath, "fdb_create_cluster", headerVersion < 610);
308 	loadClientFunction(&api->clusterCreateDatabase, lib, fdbCPath, "fdb_cluster_create_database", headerVersion < 610);
309 	loadClientFunction(&api->clusterDestroy, lib, fdbCPath, "fdb_cluster_destroy", headerVersion < 610);
310 	loadClientFunction(&api->futureGetCluster, lib, fdbCPath, "fdb_future_get_cluster", headerVersion < 610);
311 }
312 
selectApiVersion(int apiVersion)313 void DLApi::selectApiVersion(int apiVersion) {
314 	// External clients must support at least this version
315 	// Versions newer than what we understand are rejected in the C bindings
316 	headerVersion = std::max(apiVersion, 400);
317 
318 	init();
319 	throwIfError(api->selectApiVersion(apiVersion, headerVersion));
320 	throwIfError(api->setNetworkOption(FDBNetworkOptions::EXTERNAL_CLIENT, NULL, 0));
321 }
322 
getClientVersion()323 const char* DLApi::getClientVersion() {
324 	if(!api->getClientVersion) {
325 		return "unknown";
326 	}
327 
328 	return api->getClientVersion();
329 }
330 
setNetworkOption(FDBNetworkOptions::Option option,Optional<StringRef> value)331 void DLApi::setNetworkOption(FDBNetworkOptions::Option option, Optional<StringRef> value) {
332 	throwIfError(api->setNetworkOption(option, value.present() ? value.get().begin() : NULL, value.present() ? value.get().size() : 0));
333 }
334 
setupNetwork()335 void DLApi::setupNetwork() {
336 	networkSetup = true;
337 	throwIfError(api->setupNetwork());
338 }
339 
runNetwork()340 void DLApi::runNetwork() {
341 	auto e = api->runNetwork();
342 
343 	for(auto &hook : threadCompletionHooks) {
344 		try {
345 			hook.first(hook.second);
346 		}
347 		catch(Error &e) {
348 			TraceEvent(SevError, "NetworkShutdownHookError").error(e);
349 		}
350 		catch(...) {
351 			TraceEvent(SevError, "NetworkShutdownHookError").error(unknown_error());
352 		}
353 	}
354 
355 	throwIfError(e);
356 }
357 
stopNetwork()358 void DLApi::stopNetwork() {
359 	if(networkSetup) {
360 		throwIfError(api->stopNetwork());
361 	}
362 }
363 
createDatabase609(const char * clusterFilePath)364 Reference<IDatabase> DLApi::createDatabase609(const char *clusterFilePath) {
365 	FdbCApi::FDBFuture *f = api->createCluster(clusterFilePath);
366 
367 	auto clusterFuture = toThreadFuture<FdbCApi::FDBCluster*>(api, f, [](FdbCApi::FDBFuture *f, FdbCApi *api) {
368 		FdbCApi::FDBCluster *cluster;
369 		api->futureGetCluster(f, &cluster);
370 		return cluster;
371 	});
372 
373 	Reference<FdbCApi> innerApi = api;
374 	auto dbFuture = flatMapThreadFuture<FdbCApi::FDBCluster*, FdbCApi::FDBDatabase*>(clusterFuture, [innerApi](ErrorOr<FdbCApi::FDBCluster*> cluster) {
375 		if(cluster.isError()) {
376 			return ErrorOr<ThreadFuture<FdbCApi::FDBDatabase*>>(cluster.getError());
377 		}
378 
379 		auto innerDbFuture = toThreadFuture<FdbCApi::FDBDatabase*>(innerApi, innerApi->clusterCreateDatabase(cluster.get(), (uint8_t*)"DB", 2), [](FdbCApi::FDBFuture *f, FdbCApi *api) {
380 			FdbCApi::FDBDatabase *db;
381 			api->futureGetDatabase(f, &db);
382 			return db;
383 		});
384 
385 		return ErrorOr<ThreadFuture<FdbCApi::FDBDatabase*>>(mapThreadFuture<FdbCApi::FDBDatabase*, FdbCApi::FDBDatabase*>(innerDbFuture, [cluster, innerApi](ErrorOr<FdbCApi::FDBDatabase*> db) {
386 			innerApi->clusterDestroy(cluster.get());
387 			return db;
388 		}));
389 	});
390 
391 	return Reference<DLDatabase>(new DLDatabase(api, dbFuture));
392 }
393 
createDatabase(const char * clusterFilePath)394 Reference<IDatabase> DLApi::createDatabase(const char *clusterFilePath) {
395 	if(headerVersion >= 610) {
396 		FdbCApi::FDBDatabase *db;
397 		api->createDatabase(clusterFilePath, &db);
398 		return Reference<IDatabase>(new DLDatabase(api, db));
399 	}
400 	else {
401 		return DLApi::createDatabase609(clusterFilePath);
402 	}
403 }
404 
addNetworkThreadCompletionHook(void (* hook)(void *),void * hookParameter)405 void DLApi::addNetworkThreadCompletionHook(void (*hook)(void*), void *hookParameter) {
406 	MutexHolder holder(lock);
407 	threadCompletionHooks.push_back(std::make_pair(hook, hookParameter));
408 }
409 
410 // MultiVersionTransaction
MultiVersionTransaction(Reference<MultiVersionDatabase> db)411 MultiVersionTransaction::MultiVersionTransaction(Reference<MultiVersionDatabase> db) : db(db) {
412 	updateTransaction();
413 }
414 
415 // SOMEDAY: This function is unsafe if it's possible to set Database options that affect subsequently created transactions. There are currently no such options.
updateTransaction()416 void MultiVersionTransaction::updateTransaction() {
417 	auto currentDb = db->dbState->dbVar->get();
418 
419 	TransactionInfo newTr;
420 	if(currentDb.value) {
421 		newTr.transaction = currentDb.value->createTransaction();
422 	}
423 
424 	newTr.onChange = currentDb.onChange;
425 
426 	lock.enter();
427 	transaction = newTr;
428 	lock.leave();
429 }
430 
getTransaction()431 MultiVersionTransaction::TransactionInfo MultiVersionTransaction::getTransaction() {
432 	lock.enter();
433 	MultiVersionTransaction::TransactionInfo currentTr(transaction);
434 	lock.leave();
435 
436 	return currentTr;
437 }
438 
cancel()439 void MultiVersionTransaction::cancel() {
440 	auto tr = getTransaction();
441 	if(tr.transaction) {
442 		tr.transaction->cancel();
443 	}
444 }
445 
setVersion(Version v)446 void MultiVersionTransaction::setVersion(Version v) {
447 	auto tr = getTransaction();
448 	if(tr.transaction) {
449 		tr.transaction->setVersion(v);
450 	}
451 }
getReadVersion()452 ThreadFuture<Version> MultiVersionTransaction::getReadVersion() {
453 	auto tr = getTransaction();
454 	auto f = tr.transaction ? tr.transaction->getReadVersion() : ThreadFuture<Version>(Never());
455 	return abortableFuture(f, tr.onChange);
456 }
457 
get(const KeyRef & key,bool snapshot)458 ThreadFuture<Optional<Value>> MultiVersionTransaction::get(const KeyRef& key, bool snapshot) {
459 	auto tr = getTransaction();
460 	auto f = tr.transaction ? tr.transaction->get(key, snapshot) : ThreadFuture<Optional<Value>>(Never());
461 	return abortableFuture(f, tr.onChange);
462 }
463 
getKey(const KeySelectorRef & key,bool snapshot)464 ThreadFuture<Key> MultiVersionTransaction::getKey(const KeySelectorRef& key, bool snapshot) {
465 	auto tr = getTransaction();
466 	auto f = tr.transaction ? tr.transaction->getKey(key, snapshot) : ThreadFuture<Key>(Never());
467 	return abortableFuture(f, tr.onChange);
468 }
469 
getRange(const KeySelectorRef & begin,const KeySelectorRef & end,int limit,bool snapshot,bool reverse)470 ThreadFuture<Standalone<RangeResultRef>> MultiVersionTransaction::getRange(const KeySelectorRef& begin, const KeySelectorRef& end, int limit, bool snapshot, bool reverse) {
471 	auto tr = getTransaction();
472 	auto f = tr.transaction ? tr.transaction->getRange(begin, end, limit, snapshot, reverse) : ThreadFuture<Standalone<RangeResultRef>>(Never());
473 	return abortableFuture(f, tr.onChange);
474 }
475 
getRange(const KeySelectorRef & begin,const KeySelectorRef & end,GetRangeLimits limits,bool snapshot,bool reverse)476 ThreadFuture<Standalone<RangeResultRef>> MultiVersionTransaction::getRange(const KeySelectorRef& begin, const KeySelectorRef& end, GetRangeLimits limits, bool snapshot, bool reverse) {
477 	auto tr = getTransaction();
478 	auto f = tr.transaction ? tr.transaction->getRange(begin, end, limits, snapshot, reverse) : ThreadFuture<Standalone<RangeResultRef>>(Never());
479 	return abortableFuture(f, tr.onChange);
480 }
481 
getRange(const KeyRangeRef & keys,int limit,bool snapshot,bool reverse)482 ThreadFuture<Standalone<RangeResultRef>> MultiVersionTransaction::getRange(const KeyRangeRef& keys, int limit, bool snapshot, bool reverse) {
483 	auto tr = getTransaction();
484 	auto f = tr.transaction ? tr.transaction->getRange(keys, limit, snapshot, reverse) : ThreadFuture<Standalone<RangeResultRef>>(Never());
485 	return abortableFuture(f, tr.onChange);
486 }
487 
getRange(const KeyRangeRef & keys,GetRangeLimits limits,bool snapshot,bool reverse)488 ThreadFuture<Standalone<RangeResultRef>> MultiVersionTransaction::getRange(const KeyRangeRef& keys, GetRangeLimits limits, bool snapshot, bool reverse) {
489 	auto tr = getTransaction();
490 	auto f = tr.transaction ? tr.transaction->getRange(keys, limits, snapshot, reverse) : ThreadFuture<Standalone<RangeResultRef>>(Never());
491 	return abortableFuture(f, tr.onChange);
492 }
493 
getVersionstamp()494 ThreadFuture<Standalone<StringRef>> MultiVersionTransaction::getVersionstamp() {
495 	auto tr = getTransaction();
496 	auto f = tr.transaction ? tr.transaction->getVersionstamp() : ThreadFuture<Standalone<StringRef>>(Never());
497 	return abortableFuture(f, tr.onChange);
498 }
499 
getAddressesForKey(const KeyRef & key)500 ThreadFuture<Standalone<VectorRef<const char*>>> MultiVersionTransaction::getAddressesForKey(const KeyRef& key) {
501 	auto tr = getTransaction();
502 	auto f = tr.transaction ? tr.transaction->getAddressesForKey(key) : ThreadFuture<Standalone<VectorRef<const char*>>>(Never());
503 	return abortableFuture(f, tr.onChange);
504 }
505 
addReadConflictRange(const KeyRangeRef & keys)506 void MultiVersionTransaction::addReadConflictRange(const KeyRangeRef& keys) {
507 	auto tr = getTransaction();
508 	if(tr.transaction) {
509 		tr.transaction->addReadConflictRange(keys);
510 	}
511 }
512 
atomicOp(const KeyRef & key,const ValueRef & value,uint32_t operationType)513 void MultiVersionTransaction::atomicOp(const KeyRef& key, const ValueRef& value, uint32_t operationType) {
514 	auto tr = getTransaction();
515 	if(tr.transaction) {
516 		tr.transaction->atomicOp(key, value, operationType);
517 	}
518 }
519 
set(const KeyRef & key,const ValueRef & value)520 void MultiVersionTransaction::set(const KeyRef& key, const ValueRef& value) {
521 	auto tr = getTransaction();
522 	if(tr.transaction) {
523 		tr.transaction->set(key, value);
524 	}
525 }
526 
clear(const KeyRef & begin,const KeyRef & end)527 void MultiVersionTransaction::clear(const KeyRef& begin, const KeyRef& end) {
528 	auto tr = getTransaction();
529 	if(tr.transaction) {
530 		tr.transaction->clear(begin, end);
531 	}
532 }
533 
clear(const KeyRangeRef & range)534 void MultiVersionTransaction::clear(const KeyRangeRef& range) {
535 	auto tr = getTransaction();
536 	if(tr.transaction) {
537 		tr.transaction->clear(range);
538 	}
539 }
540 
clear(const KeyRef & key)541 void MultiVersionTransaction::clear(const KeyRef& key) {
542 	auto tr = getTransaction();
543 	if(tr.transaction) {
544 		tr.transaction->clear(key);
545 	}
546 }
547 
watch(const KeyRef & key)548 ThreadFuture<Void> MultiVersionTransaction::watch(const KeyRef& key) {
549 	auto tr = getTransaction();
550 	auto f = tr.transaction ? tr.transaction->watch(key) : ThreadFuture<Void>(Never());
551 	return abortableFuture(f, tr.onChange);
552 }
553 
addWriteConflictRange(const KeyRangeRef & keys)554 void MultiVersionTransaction::addWriteConflictRange(const KeyRangeRef& keys) {
555 	auto tr = getTransaction();
556 	if(tr.transaction) {
557 		tr.transaction->addWriteConflictRange(keys);
558 	}
559 }
560 
commit()561 ThreadFuture<Void> MultiVersionTransaction::commit() {
562 	auto tr = getTransaction();
563 	auto f = tr.transaction ? tr.transaction->commit() : ThreadFuture<Void>(Never());
564 	return abortableFuture(f, tr.onChange);
565 }
566 
getCommittedVersion()567 Version MultiVersionTransaction::getCommittedVersion() {
568 	auto tr = getTransaction();
569 	if(tr.transaction) {
570 		return tr.transaction->getCommittedVersion();
571 	}
572 
573 	return invalidVersion;
574 }
575 
setOption(FDBTransactionOptions::Option option,Optional<StringRef> value)576 void MultiVersionTransaction::setOption(FDBTransactionOptions::Option option, Optional<StringRef> value) {
577 	auto tr = getTransaction();
578 	if(tr.transaction) {
579 		tr.transaction->setOption(option, value);
580 	}
581 }
582 
onError(Error const & e)583 ThreadFuture<Void> MultiVersionTransaction::onError(Error const& e) {
584 	if(e.code() == error_code_cluster_version_changed) {
585 		updateTransaction();
586 		return ThreadFuture<Void>(Void());
587 	}
588 	else {
589 		auto tr = getTransaction();
590 		auto f = tr.transaction ? tr.transaction->onError(e) : ThreadFuture<Void>(Never());
591 		return abortableFuture(f, tr.onChange);
592 	}
593 }
594 
reset()595 void MultiVersionTransaction::reset() {
596 	updateTransaction();
597 }
598 
599 // MultiVersionDatabase
MultiVersionDatabase(MultiVersionApi * api,std::string clusterFilePath,Reference<IDatabase> db,bool openConnectors)600 MultiVersionDatabase::MultiVersionDatabase(MultiVersionApi *api, std::string clusterFilePath, Reference<IDatabase> db, bool openConnectors) : dbState(new DatabaseState()) {
601 	dbState->db = db;
602 	dbState->dbVar->set(db);
603 
604 	if(!openConnectors) {
605 		dbState->currentClientIndex = 0;
606 	}
607 	else {
608 		if(!api->localClientDisabled) {
609 			dbState->currentClientIndex = 0;
610 			dbState->addConnection(api->getLocalClient(), clusterFilePath);
611 		}
612 		else {
613 			dbState->currentClientIndex = -1;
614 		}
615 
616 		api->runOnExternalClients([this, clusterFilePath](Reference<ClientInfo> client) {
617 			dbState->addConnection(client, clusterFilePath);
618 		});
619 
620 		dbState->startConnections();
621 	}
622 }
623 
~MultiVersionDatabase()624 MultiVersionDatabase::~MultiVersionDatabase() {
625 	dbState->cancelConnections();
626 }
627 
debugCreateFromExistingDatabase(Reference<IDatabase> db)628 Reference<IDatabase> MultiVersionDatabase::debugCreateFromExistingDatabase(Reference<IDatabase> db) {
629 	return Reference<IDatabase>(new MultiVersionDatabase(MultiVersionApi::api, "", db, false));
630 }
631 
createTransaction()632 Reference<ITransaction> MultiVersionDatabase::createTransaction() {
633 	return Reference<ITransaction>(new MultiVersionTransaction(Reference<MultiVersionDatabase>::addRef(this)));
634 }
635 
setOption(FDBDatabaseOptions::Option option,Optional<StringRef> value)636 void MultiVersionDatabase::setOption(FDBDatabaseOptions::Option option, Optional<StringRef> value) {
637 	MutexHolder holder(dbState->optionLock);
638 
639 
640 	auto itr = FDBDatabaseOptions::optionInfo.find(option);
641 	if(itr != FDBDatabaseOptions::optionInfo.end()) {
642 		TraceEvent("SetDatabaseOption").detail("Option", itr->second.name);
643 	}
644 	else {
645 		TraceEvent("UnknownDatabaseOption").detail("Option", option);
646 		throw invalid_option();
647 	}
648 
649 	if(dbState->db) {
650 		dbState->db->setOption(option, value);
651 	}
652 
653 	dbState->options.push_back(std::make_pair(option, value.castTo<Standalone<StringRef>>()));
654 }
655 
connect()656 void MultiVersionDatabase::Connector::connect() {
657 	addref();
658 	onMainThreadVoid([this]() {
659 		if(!cancelled) {
660 			connected = false;
661 			if(connectionFuture.isValid()) {
662 				connectionFuture.cancel();
663 			}
664 
665 			candidateDatabase = client->api->createDatabase(clusterFilePath.c_str());
666 			if(client->external) {
667 				connectionFuture = candidateDatabase.castTo<DLDatabase>()->onReady();
668 			}
669 			else {
670 				connectionFuture = ThreadFuture<Void>(Void());
671 			}
672 
673 			connectionFuture = flatMapThreadFuture<Void, Void>(connectionFuture, [this](ErrorOr<Void> ready) {
674 				if(ready.isError()) {
675 					return ErrorOr<ThreadFuture<Void>>(ready.getError());
676 				}
677 
678 				tr = candidateDatabase->createTransaction();
679 				return ErrorOr<ThreadFuture<Void>>(mapThreadFuture<Version, Void>(tr->getReadVersion(), [this](ErrorOr<Version> v) {
680 					// If the version attempt returns an error, we regard that as a connection (except operation_cancelled)
681 					if(v.isError() && v.getError().code() == error_code_operation_cancelled) {
682 						return ErrorOr<Void>(v.getError());
683 					}
684 					else {
685 						return ErrorOr<Void>(Void());
686 					}
687 				}));
688 			});
689 
690 
691 			int userParam;
692 			connectionFuture.callOrSetAsCallback(this, userParam, 0);
693 		}
694 		else {
695 			delref();
696 		}
697 	}, NULL);
698 }
699 
700 // Only called from main thread
cancel()701 void MultiVersionDatabase::Connector::cancel() {
702 	connected = false;
703 	cancelled = true;
704 	if(connectionFuture.isValid()) {
705 		connectionFuture.cancel();
706 	}
707 }
708 
fire(const Void & unused,int & userParam)709 void MultiVersionDatabase::Connector::fire(const Void &unused, int& userParam) {
710 	onMainThreadVoid([this]() {
711 		if(!cancelled) {
712 			connected = true;
713 			dbState->stateChanged();
714 		}
715 		delref();
716 	}, NULL);
717 }
718 
error(const Error & e,int & userParam)719 void MultiVersionDatabase::Connector::error(const Error& e, int& userParam) {
720 	if(e.code() != error_code_operation_cancelled) {
721 		// TODO: is it right to abandon this connection attempt?
722 		client->failed = true;
723 		MultiVersionApi::api->updateSupportedVersions();
724 		TraceEvent(SevError, "DatabaseConnectionError").error(e).detail("ClientLibrary", this->client->libPath);
725 	}
726 
727 	delref();
728 }
729 
DatabaseState()730 MultiVersionDatabase::DatabaseState::DatabaseState()
731 	: dbVar(new ThreadSafeAsyncVar<Reference<IDatabase>>(Reference<IDatabase>(NULL))), currentClientIndex(-1) {}
732 
733 // Only called from main thread
stateChanged()734 void MultiVersionDatabase::DatabaseState::stateChanged() {
735 	int newIndex = -1;
736 	for(int i = 0; i < clients.size(); ++i) {
737 		if(i != currentClientIndex && connectionAttempts[i]->connected) {
738 			if(currentClientIndex >= 0 && !clients[i]->canReplace(clients[currentClientIndex])) {
739 				TraceEvent(SevWarn, "DuplicateClientVersion").detail("Keeping", clients[currentClientIndex]->libPath).detail("KeptClientProtocolVersion", clients[currentClientIndex]->protocolVersion).detail("Disabling", clients[i]->libPath).detail("DisabledClientProtocolVersion", clients[i]->protocolVersion);
740 				connectionAttempts[i]->connected = false; // Permanently disable this client in favor of the current one
741 				clients[i]->failed = true;
742 				MultiVersionApi::api->updateSupportedVersions();
743 				return;
744 			}
745 
746 			newIndex = i;
747 			break;
748 		}
749 	}
750 
751 	if(newIndex == -1) {
752 		ASSERT(currentClientIndex == 0); // This can only happen for the local client, which we set as the current connection before we know it's connected
753 		return;
754 	}
755 
756 	// Restart connection for replaced client
757 	auto newDb = connectionAttempts[newIndex]->candidateDatabase;
758 
759 	optionLock.enter();
760 	for(auto option : options) {
761 		try {
762 			newDb->setOption(option.first, option.second.castTo<StringRef>()); // In practice, this will set a deferred error instead of throwing. If that happens, the database will be unusable (attempts to use it will throw errors).
763 		}
764 		catch(Error &e) {
765 			optionLock.leave();
766 			TraceEvent(SevError, "ClusterVersionChangeOptionError").error(e).detail("Option", option.first).detail("OptionValue", option.second).detail("LibPath", clients[newIndex]->libPath);
767 			connectionAttempts[newIndex]->connected = false;
768 			clients[newIndex]->failed = true;
769 			MultiVersionApi::api->updateSupportedVersions();
770 			return; // If we can't set all of the options on a cluster, we abandon the client
771 		}
772 	}
773 
774 	db = newDb;
775 	optionLock.leave();
776 
777 	dbVar->set(db);
778 
779 	if(currentClientIndex >= 0 && connectionAttempts[currentClientIndex]->connected) {
780 		connectionAttempts[currentClientIndex]->connected = false;
781 		connectionAttempts[currentClientIndex]->connect();
782 	}
783 
784 	ASSERT(newIndex >= 0 && newIndex < clients.size());
785 	currentClientIndex = newIndex;
786 }
787 
addConnection(Reference<ClientInfo> client,std::string clusterFilePath)788 void MultiVersionDatabase::DatabaseState::addConnection(Reference<ClientInfo> client, std::string clusterFilePath) {
789 	clients.push_back(client);
790 	connectionAttempts.push_back(Reference<Connector>(new Connector(Reference<DatabaseState>::addRef(this), client, clusterFilePath)));
791 }
792 
startConnections()793 void MultiVersionDatabase::DatabaseState::startConnections() {
794 	for(auto c : connectionAttempts) {
795 		c->connect();
796 	}
797 }
798 
cancelConnections()799 void MultiVersionDatabase::DatabaseState::cancelConnections() {
800 	addref();
801 	onMainThreadVoid([this](){
802 		for(auto c : connectionAttempts) {
803 			c->cancel();
804 		}
805 
806 		connectionAttempts.clear();
807 		clients.clear();
808 		delref();
809 	}, NULL);
810 }
811 
812 // MultiVersionApi
813 
814 // runOnFailedClients should be used cautiously. Some failed clients may not have successfully loaded all symbols.
runOnExternalClients(std::function<void (Reference<ClientInfo>)> func,bool runOnFailedClients)815 void MultiVersionApi::runOnExternalClients(std::function<void(Reference<ClientInfo>)> func, bool runOnFailedClients) {
816 	bool newFailure = false;
817 
818 	auto c = externalClients.begin();
819 	while(c != externalClients.end()) {
820 		try {
821 			if(!c->second->failed || runOnFailedClients) { // TODO: Should we ignore some failures?
822 				func(c->second);
823 			}
824 		}
825 		catch(Error &e) {
826 			if(e.code() == error_code_external_client_already_loaded) {
827 				TraceEvent(SevInfo, "ExternalClientAlreadyLoaded").error(e).detail("LibPath", c->second->libPath);
828 				c = externalClients.erase(c);
829 				continue;
830 			}
831 			else {
832 				TraceEvent(SevWarnAlways, "ExternalClientFailure").error(e).detail("LibPath", c->second->libPath);
833 				c->second->failed = true;
834 				newFailure = true;
835 			}
836 		}
837 
838 		++c;
839 	}
840 
841 	if(newFailure) {
842 		updateSupportedVersions();
843 	}
844 }
845 
getLocalClient()846 Reference<ClientInfo> MultiVersionApi::getLocalClient() {
847 	return localClient;
848 }
849 
selectApiVersion(int apiVersion)850 void MultiVersionApi::selectApiVersion(int apiVersion) {
851 	if(!localClient) {
852 		localClient = Reference<ClientInfo>(new ClientInfo(ThreadSafeApi::api));
853 	}
854 
855 	if(this->apiVersion != 0 && this->apiVersion != apiVersion) {
856 		throw api_version_already_set();
857 	}
858 
859 	localClient->api->selectApiVersion(apiVersion);
860 	this->apiVersion = apiVersion;
861 }
862 
getClientVersion()863 const char* MultiVersionApi::getClientVersion() {
864 	return localClient->api->getClientVersion();
865 }
866 
validateOption(Optional<StringRef> value,bool canBePresent,bool canBeAbsent,bool canBeEmpty=true)867 void validateOption(Optional<StringRef> value, bool canBePresent, bool canBeAbsent, bool canBeEmpty=true) {
868 	ASSERT(canBePresent || canBeAbsent);
869 
870 	if(!canBePresent && value.present() && (!canBeEmpty || value.get().size() > 0)) {
871 		throw invalid_option_value();
872 	}
873 	if(!canBeAbsent && (!value.present() || (!canBeEmpty && value.get().size() == 0))) {
874 		throw invalid_option_value();
875 	}
876 }
877 
disableMultiVersionClientApi()878 void MultiVersionApi::disableMultiVersionClientApi() {
879 	MutexHolder holder(lock);
880 	if(networkStartSetup || localClientDisabled) {
881 		throw invalid_option();
882 	}
883 
884 	bypassMultiClientApi = true;
885 }
886 
setCallbacksOnExternalThreads()887 void MultiVersionApi::setCallbacksOnExternalThreads() {
888 	MutexHolder holder(lock);
889 	if(networkStartSetup) {
890 		throw invalid_option();
891 	}
892 
893 	callbackOnMainThread = false;
894 }
895 
addExternalLibrary(std::string path)896 void MultiVersionApi::addExternalLibrary(std::string path) {
897 	std::string filename = basename(path);
898 
899 	if(filename.empty() || !fileExists(path)) {
900 		TraceEvent("ExternalClientNotFound").detail("LibraryPath", filename);
901 		throw file_not_found();
902 	}
903 
904 	MutexHolder holder(lock);
905 	if(networkStartSetup) {
906 		throw invalid_option(); // SOMEDAY: it might be good to allow clients to be added after the network is setup
907 	}
908 
909 	if(externalClients.count(filename) == 0) {
910 		TraceEvent("AddingExternalClient").detail("LibraryPath", filename);
911 		externalClients[filename] = Reference<ClientInfo>(new ClientInfo(new DLApi(path), path));
912 	}
913 }
914 
addExternalLibraryDirectory(std::string path)915 void MultiVersionApi::addExternalLibraryDirectory(std::string path) {
916 	TraceEvent("AddingExternalClientDirectory").detail("Directory", path);
917 	std::vector<std::string> files = platform::listFiles(path, DYNAMIC_LIB_EXT);
918 
919 	MutexHolder holder(lock);
920 	if(networkStartSetup) {
921 		throw invalid_option(); // SOMEDAY: it might be good to allow clients to be added after the network is setup. For directories, we can monitor them for the addition of new files.
922 	}
923 
924 	for(auto filename : files) {
925 		std::string lib = abspath(joinPath(path, filename));
926 		if(externalClients.count(filename) == 0) {
927 			TraceEvent("AddingExternalClient").detail("LibraryPath", filename);
928 			externalClients[filename] = Reference<ClientInfo>(new ClientInfo(new DLApi(lib), lib));
929 		}
930 	}
931 }
932 
disableLocalClient()933 void MultiVersionApi::disableLocalClient() {
934 	MutexHolder holder(lock);
935 	if(networkStartSetup || bypassMultiClientApi) {
936 		throw invalid_option();
937 	}
938 
939 	localClientDisabled = true;
940 }
941 
setSupportedClientVersions(Standalone<StringRef> versions)942 void MultiVersionApi::setSupportedClientVersions(Standalone<StringRef> versions) {
943 	MutexHolder holder(lock);
944 	ASSERT(networkSetup);
945 
946 	// This option must be set on the main thread because it modifes structures that can be used concurrently by the main thread
947 	onMainThreadVoid([this, versions](){
948 		localClient->api->setNetworkOption(FDBNetworkOptions::SUPPORTED_CLIENT_VERSIONS, versions);
949 	}, NULL);
950 
951 	if(!bypassMultiClientApi) {
952 		runOnExternalClients([this, versions](Reference<ClientInfo> client){
953 			client->api->setNetworkOption(FDBNetworkOptions::SUPPORTED_CLIENT_VERSIONS, versions);
954 		});
955 	}
956 }
957 
setNetworkOption(FDBNetworkOptions::Option option,Optional<StringRef> value)958 void MultiVersionApi::setNetworkOption(FDBNetworkOptions::Option option, Optional<StringRef> value) {
959 	if(option != FDBNetworkOptions::EXTERNAL_CLIENT && !externalClient) { // This is the first option set for external clients
960 		loadEnvironmentVariableNetworkOptions();
961 	}
962 
963 	setNetworkOptionInternal(option, value);
964 }
965 
setNetworkOptionInternal(FDBNetworkOptions::Option option,Optional<StringRef> value)966 void MultiVersionApi::setNetworkOptionInternal(FDBNetworkOptions::Option option, Optional<StringRef> value) {
967 	auto itr = FDBNetworkOptions::optionInfo.find(option);
968 	if(itr != FDBNetworkOptions::optionInfo.end()) {
969 		TraceEvent("SetNetworkOption").detail("Option", itr->second.name);
970 	}
971 	else {
972 		TraceEvent("UnknownNetworkOption").detail("Option", option);
973 		throw invalid_option();
974 	}
975 
976 	if(option == FDBNetworkOptions::DISABLE_MULTI_VERSION_CLIENT_API) {
977 		validateOption(value, false, true);
978 		disableMultiVersionClientApi();
979 	}
980 	else if(option == FDBNetworkOptions::CALLBACKS_ON_EXTERNAL_THREADS) {
981 		validateOption(value, false, true);
982 		setCallbacksOnExternalThreads();
983 	}
984 	else if(option == FDBNetworkOptions::EXTERNAL_CLIENT_LIBRARY) {
985 		validateOption(value, true, false, false);
986 		addExternalLibrary(abspath(value.get().toString()));
987 	}
988 	else if(option == FDBNetworkOptions::EXTERNAL_CLIENT_DIRECTORY) {
989 		validateOption(value, true, false, false);
990 		addExternalLibraryDirectory(value.get().toString());
991 	}
992 	else if(option == FDBNetworkOptions::DISABLE_LOCAL_CLIENT) {
993 		validateOption(value, false, true);
994 		disableLocalClient();
995 	}
996 	else if(option == FDBNetworkOptions::SUPPORTED_CLIENT_VERSIONS) {
997 		ASSERT(value.present());
998 		setSupportedClientVersions(value.get());
999 	}
1000 	else if(option == FDBNetworkOptions::EXTERNAL_CLIENT) {
1001 		MutexHolder holder(lock);
1002 		ASSERT(!value.present() && !networkStartSetup);
1003 		externalClient = true;
1004 		bypassMultiClientApi = true;
1005 	}
1006 	else {
1007 		MutexHolder holder(lock);
1008 		localClient->api->setNetworkOption(option, value);
1009 
1010 		if(!bypassMultiClientApi) {
1011 			if(networkSetup) {
1012 				runOnExternalClients([this, option, value](Reference<ClientInfo> client) {
1013 					client->api->setNetworkOption(option, value);
1014 				});
1015 			}
1016 			else {
1017 				options.push_back(std::make_pair(option, value.castTo<Standalone<StringRef>>()));
1018 			}
1019 		}
1020 	}
1021 }
1022 
setupNetwork()1023 void MultiVersionApi::setupNetwork() {
1024 	if(!externalClient) {
1025 		loadEnvironmentVariableNetworkOptions();
1026 	}
1027 
1028 	uint64_t transportId = 0;
1029 	{ // lock scope
1030 		MutexHolder holder(lock);
1031 		if(networkStartSetup) {
1032 			throw network_already_setup();
1033 		}
1034 
1035 		networkStartSetup = true;
1036 
1037 		if(externalClients.empty()) {
1038 			bypassMultiClientApi = true; // SOMEDAY: we won't be able to set this option once it becomes possible to add clients after setupNetwork is called
1039 		}
1040 
1041 		if(!bypassMultiClientApi) {
1042 			transportId = (uint64_t(uint32_t(platform::getRandomSeed())) << 32) ^ uint32_t(platform::getRandomSeed());
1043 			if(transportId <= 1) transportId += 2;
1044 			localClient->api->setNetworkOption(FDBNetworkOptions::EXTERNAL_CLIENT_TRANSPORT_ID, std::to_string(transportId));
1045 		}
1046 		localClient->api->setupNetwork();
1047 	}
1048 
1049 	localClient->loadProtocolVersion();
1050 
1051 	if(!bypassMultiClientApi) {
1052 		runOnExternalClients([this](Reference<ClientInfo> client) {
1053 			TraceEvent("InitializingExternalClient").detail("LibraryPath", client->libPath);
1054 			client->api->selectApiVersion(apiVersion);
1055 			client->loadProtocolVersion();
1056 		});
1057 
1058 		MutexHolder holder(lock);
1059 		runOnExternalClients([this, transportId](Reference<ClientInfo> client) {
1060 			for(auto option : options) {
1061 				client->api->setNetworkOption(option.first, option.second.castTo<StringRef>());
1062 			}
1063 			client->api->setNetworkOption(FDBNetworkOptions::EXTERNAL_CLIENT_TRANSPORT_ID, std::to_string(transportId));
1064 
1065 			client->api->setupNetwork();
1066 		});
1067 
1068 		networkSetup = true; // Needs to be guarded by mutex
1069 	}
1070 	else {
1071 		networkSetup = true;
1072 	}
1073 
1074 	options.clear();
1075 	updateSupportedVersions();
1076 }
1077 
runNetworkThread(void * param)1078 THREAD_FUNC_RETURN runNetworkThread(void *param) {
1079 	try {
1080 		((ClientInfo*)param)->api->runNetwork();
1081 	}
1082 	catch(Error &e) {
1083 		TraceEvent(SevError, "RunNetworkError").error(e);
1084 	}
1085 
1086 	THREAD_RETURN;
1087 }
1088 
runNetwork()1089 void MultiVersionApi::runNetwork() {
1090 	lock.enter();
1091 	if(!networkSetup) {
1092 		lock.leave();
1093 		throw network_not_setup();
1094 	}
1095 
1096 	lock.leave();
1097 
1098 	std::vector<THREAD_HANDLE> handles;
1099 	if(!bypassMultiClientApi) {
1100 		runOnExternalClients([&handles](Reference<ClientInfo> client) {
1101 			if(client->external) {
1102 				handles.push_back(g_network->startThread(&runNetworkThread, client.getPtr()));
1103 			}
1104 		});
1105 	}
1106 
1107 	localClient->api->runNetwork();
1108 
1109 	for(auto h : handles) {
1110 		waitThread(h);
1111 	}
1112 }
1113 
stopNetwork()1114 void MultiVersionApi::stopNetwork() {
1115 	lock.enter();
1116 	if(!networkSetup) {
1117 		lock.leave();
1118 		throw network_not_setup();
1119 	}
1120 	lock.leave();
1121 
1122 	localClient->api->stopNetwork();
1123 
1124 	if(!bypassMultiClientApi) {
1125 		runOnExternalClients([](Reference<ClientInfo> client) {
1126 			client->api->stopNetwork();
1127 		}, true);
1128 	}
1129 }
1130 
addNetworkThreadCompletionHook(void (* hook)(void *),void * hookParameter)1131 void MultiVersionApi::addNetworkThreadCompletionHook(void (*hook)(void*), void *hookParameter) {
1132 	lock.enter();
1133 	if(!networkSetup) {
1134 		lock.leave();
1135 		throw network_not_setup();
1136 	}
1137 	lock.leave();
1138 
1139 	localClient->api->addNetworkThreadCompletionHook(hook, hookParameter);
1140 
1141 	if(!bypassMultiClientApi) {
1142 		runOnExternalClients([hook, hookParameter](Reference<ClientInfo> client) {
1143 			client->api->addNetworkThreadCompletionHook(hook, hookParameter);
1144 		});
1145 	}
1146 }
1147 
createDatabase(const char * clusterFilePath)1148 Reference<IDatabase> MultiVersionApi::createDatabase(const char *clusterFilePath) {
1149 	lock.enter();
1150 	if(!networkSetup) {
1151 		lock.leave();
1152 		throw network_not_setup();
1153 	}
1154 	lock.leave();
1155 
1156 	std::string clusterFile(clusterFilePath);
1157 	if(localClientDisabled) {
1158 		return Reference<IDatabase>(new MultiVersionDatabase(this, clusterFile, Reference<IDatabase>()));
1159 	}
1160 
1161 	auto db = localClient->api->createDatabase(clusterFilePath);
1162 	if(bypassMultiClientApi) {
1163 		return db;
1164 	}
1165 	else {
1166 		for(auto it : externalClients) {
1167 			TraceEvent("CreatingDatabaseOnExternalClient").detail("LibraryPath", it.second->libPath).detail("Failed", it.second->failed);
1168 		}
1169 		return Reference<IDatabase>(new MultiVersionDatabase(this, clusterFile, db));
1170 	}
1171 }
1172 
updateSupportedVersions()1173 void MultiVersionApi::updateSupportedVersions() {
1174 	if(networkSetup) {
1175 		Standalone<VectorRef<uint8_t>> versionStr;
1176 
1177 		runOnExternalClients([&versionStr](Reference<ClientInfo> client){
1178 			const char *ver = client->api->getClientVersion();
1179 			versionStr.append(versionStr.arena(), (uint8_t*)ver, (int)strlen(ver));
1180 			versionStr.append(versionStr.arena(), (uint8_t*)";", 1);
1181 		});
1182 
1183 		if(!localClient->failed) {
1184 			const char *local = localClient->api->getClientVersion();
1185 			versionStr.append(versionStr.arena(), (uint8_t*)local, (int)strlen(local));
1186 		}
1187 		else {
1188 			versionStr.resize(versionStr.arena(), std::max(0, versionStr.size()-1));
1189 		}
1190 
1191 		setNetworkOption(FDBNetworkOptions::SUPPORTED_CLIENT_VERSIONS, StringRef(versionStr.begin(), versionStr.size()));
1192 	}
1193 }
1194 
parseOptionValues(std::string valueStr)1195 std::vector<std::string> parseOptionValues(std::string valueStr) {
1196 	std::string specialCharacters = "\\";
1197 	specialCharacters += ENV_VAR_PATH_SEPARATOR;
1198 
1199 	std::vector<std::string> values;
1200 
1201 	size_t index = 0;
1202 	size_t nextIndex = 0;
1203 	std::stringstream ss;
1204 	while(true) {
1205 		nextIndex = valueStr.find_first_of(specialCharacters, index);
1206 		char c = nextIndex == valueStr.npos ? ENV_VAR_PATH_SEPARATOR : valueStr[nextIndex];
1207 
1208 		if(c == '\\') {
1209 			if(valueStr.size() == nextIndex + 1 || specialCharacters.find(valueStr[nextIndex+1]) == valueStr.npos) {
1210 				throw invalid_option_value();
1211 			}
1212 
1213 			ss << valueStr.substr(index, nextIndex-index);
1214 			ss << valueStr[nextIndex+1];
1215 
1216 			index = nextIndex + 2;
1217 		}
1218 		else if(c == ENV_VAR_PATH_SEPARATOR) {
1219 			ss << valueStr.substr(index, nextIndex-index);
1220 			values.push_back(ss.str());
1221 			ss.str(std::string());
1222 
1223 			if(nextIndex == valueStr.npos) {
1224 				break;
1225 			}
1226 			index = nextIndex + 1;
1227 		}
1228 		else {
1229 			ASSERT(false);
1230 		}
1231 	}
1232 
1233 	return values;
1234 }
1235 
1236 // This function sets all environment variable options which have not been set previously by a call to this function.
1237 // If an option has multiple values and setting one of those values failed with an error, then only those options
1238 // which were not successfully set will be set on subsequent calls.
loadEnvironmentVariableNetworkOptions()1239 void MultiVersionApi::loadEnvironmentVariableNetworkOptions() {
1240 	if(envOptionsLoaded) {
1241 		return;
1242 	}
1243 
1244 	for(auto option : FDBNetworkOptions::optionInfo) {
1245 		if(!option.second.hidden) {
1246 			std::string valueStr;
1247 			try {
1248 				if(platform::getEnvironmentVar(("FDB_NETWORK_OPTION_" + option.second.name).c_str(), valueStr)) {
1249 					size_t index = 0;
1250 					for(auto value : parseOptionValues(valueStr)) {
1251 						Standalone<StringRef> currentValue = StringRef(value);
1252 						{ // lock scope
1253 							MutexHolder holder(lock);
1254 							if(setEnvOptions[option.first].count(currentValue) == 0) {
1255 								setNetworkOptionInternal(option.first, currentValue);
1256 								setEnvOptions[option.first].insert(currentValue);
1257 							}
1258 						}
1259 					}
1260 				}
1261 			}
1262 			catch(Error &e) {
1263 				TraceEvent(SevError, "EnvironmentVariableNetworkOptionFailed").error(e).detail("Option", option.second.name).detail("Value", valueStr);
1264 				throw environment_variable_network_option_failed();
1265 			}
1266 		}
1267 	}
1268 
1269 	MutexHolder holder(lock);
1270 	envOptionsLoaded = true;
1271 }
1272 
MultiVersionApi()1273 MultiVersionApi::MultiVersionApi() : bypassMultiClientApi(false), networkStartSetup(false), networkSetup(false), callbackOnMainThread(true), externalClient(false), localClientDisabled(false), apiVersion(0), envOptionsLoaded(false) {}
1274 
1275 MultiVersionApi* MultiVersionApi::api = new MultiVersionApi();
1276 
1277 // ClientInfo
loadProtocolVersion()1278 void ClientInfo::loadProtocolVersion() {
1279 	std::string version = api->getClientVersion();
1280 	if(version == "unknown") {
1281 		protocolVersion = 0;
1282 		return;
1283 	}
1284 
1285 	char *next;
1286 	std::string protocolVersionStr = ClientVersionRef(version).protocolVersion.toString();
1287 	protocolVersion = strtoull(protocolVersionStr.c_str(), &next, 16);
1288 
1289 	ASSERT(protocolVersion != 0 && protocolVersion != ULLONG_MAX);
1290 	ASSERT(next == &protocolVersionStr[protocolVersionStr.length()]);
1291 }
1292 
canReplace(Reference<ClientInfo> other) const1293 bool ClientInfo::canReplace(Reference<ClientInfo> other) const {
1294 	if(protocolVersion > other->protocolVersion) {
1295 		return true;
1296 	}
1297 
1298 	if(protocolVersion == other->protocolVersion && !external) {
1299 		return true;
1300 	}
1301 
1302 	return (protocolVersion & compatibleProtocolVersionMask) != (other->protocolVersion & compatibleProtocolVersionMask);
1303 }
1304 
1305 // UNIT TESTS
1306 extern bool noUnseed;
1307 
1308 TEST_CASE("/fdbclient/multiversionclient/EnvironmentVariableParsing" ) {
1309 	auto vals = parseOptionValues("a");
1310 	ASSERT(vals.size() == 1 && vals[0] == "a");
1311 
1312 	vals = parseOptionValues("abcde");
1313 	ASSERT(vals.size() == 1 && vals[0] == "abcde");
1314 
1315 	vals = parseOptionValues("");
1316 	ASSERT(vals.size() == 1 && vals[0] == "");
1317 
1318 	vals = parseOptionValues("a:b:c:d:e");
1319 	ASSERT(vals.size() == 5 && vals[0] == "a" && vals[1] == "b" && vals[2] == "c" && vals[3] == "d" && vals[4] == "e");
1320 
1321 	vals = parseOptionValues("\\\\a\\::\\:b:\\\\");
1322 	ASSERT(vals.size() == 3 && vals[0] == "\\a:" && vals[1] == ":b" && vals[2] == "\\");
1323 
1324 	vals = parseOptionValues("abcd:");
1325 	ASSERT(vals.size() == 2 && vals[0] == "abcd" && vals[1] == "");
1326 
1327 	vals = parseOptionValues(":abcd");
1328 	ASSERT(vals.size() == 2 && vals[0] == "" && vals[1] == "abcd");
1329 
1330 	vals = parseOptionValues(":");
1331 	ASSERT(vals.size() == 2 && vals[0] == "" && vals[1] == "");
1332 
1333 	try {
1334 		vals = parseOptionValues("\\x");
1335 		ASSERT(false);
1336 	}
1337 	catch(Error &e) {
1338 		ASSERT(e.code() == error_code_invalid_option_value);
1339 	}
1340 
1341 	return Void();
1342 }
1343 
1344 class ValidateFuture : public ThreadCallback {
1345 public:
ValidateFuture(ThreadFuture<int> f,ErrorOr<int> expectedValue,std::set<int> legalErrors)1346 	ValidateFuture(ThreadFuture<int> f, ErrorOr<int> expectedValue, std::set<int> legalErrors) : f(f), expectedValue(expectedValue), legalErrors(legalErrors) { }
1347 
canFire(int notMadeActive)1348 	virtual bool canFire(int notMadeActive) { return true; }
1349 
fire(const Void & unused,int & userParam)1350 	virtual void fire(const Void &unused, int& userParam) {
1351 		ASSERT(!f.isError() && !expectedValue.isError() && f.get() == expectedValue.get());
1352 		delete this;
1353 	}
1354 
error(const Error & e,int & userParam)1355 	virtual void error(const Error& e, int& userParam) {
1356 		ASSERT(legalErrors.count(e.code()) > 0 || (f.isError() && expectedValue.isError() && f.getError().code() == expectedValue.getError().code()));
1357 		delete this;
1358 	}
1359 
1360 private:
1361 	ThreadFuture<int> f;
1362 	ErrorOr<int> expectedValue;
1363 	std::set<int> legalErrors;
1364 };
1365 
1366 struct FutureInfo {
FutureInfoFutureInfo1367 	FutureInfo() {
1368 		if(g_random->coinflip()) {
1369 			expectedValue = Error(g_random->randomInt(1, 100));
1370 		}
1371 		else {
1372 			expectedValue = g_random->randomInt(0, 100);
1373 		}
1374 	}
1375 
FutureInfoFutureInfo1376 	FutureInfo(ThreadFuture<int> future, ErrorOr<int> expectedValue, std::set<int> legalErrors = std::set<int>()) : future(future), expectedValue(expectedValue), legalErrors(legalErrors) {}
1377 
validateFutureInfo1378 	void validate() {
1379 		int userParam;
1380 		future.callOrSetAsCallback(new ValidateFuture(future, expectedValue, legalErrors), userParam, 0);
1381 	}
1382 
1383 	ThreadFuture<int> future;
1384 	ErrorOr<int> expectedValue;
1385 	std::set<int> legalErrors;
1386 	std::vector<THREAD_HANDLE> threads;
1387 };
1388 
createVarOnMainThread(bool canBeNever=true)1389 FutureInfo createVarOnMainThread(bool canBeNever=true) {
1390 	FutureInfo f;
1391 
1392 	if(g_random->coinflip()) {
1393 		f.future = onMainThread([f, canBeNever]() {
1394 			Future<Void> sleep ;
1395 			if(canBeNever && g_random->coinflip()) {
1396 				sleep = Never();
1397 			}
1398 			else {
1399 				sleep = delay(0.1 * g_random->random01());
1400 			}
1401 
1402 			if(f.expectedValue.isError()) {
1403 				return tagError<int>(sleep, f.expectedValue.getError());
1404 			}
1405 			else {
1406 				return tag(sleep, f.expectedValue.get());
1407 			}
1408 		});
1409 	}
1410 	else if(f.expectedValue.isError()) {
1411 		f.future = f.expectedValue.getError();
1412 	}
1413 	else {
1414 		f.future = f.expectedValue.get();
1415 	}
1416 
1417 	return f;
1418 }
1419 
setAbort(void * arg)1420 THREAD_FUNC setAbort(void *arg) {
1421 	threadSleep(0.1 * g_random->random01());
1422 	try {
1423 		((ThreadSingleAssignmentVar<Void>*)arg)->send(Void());
1424 		((ThreadSingleAssignmentVar<Void>*)arg)->delref();
1425 	}
1426 	catch(Error &e) {
1427 		printf("Caught error in setAbort: %s\n", e.name());
1428 		ASSERT(false);
1429 	}
1430 	THREAD_RETURN;
1431 }
1432 
releaseMem(void * arg)1433 THREAD_FUNC releaseMem(void *arg) {
1434 	threadSleep(0.1 * g_random->random01());
1435 	try {
1436 		// Must get for releaseMemory to work
1437 		((ThreadSingleAssignmentVar<int>*)arg)->get();
1438 	}
1439 	catch(Error&) {
1440 		// Swallow
1441 	}
1442 	try {
1443 		((ThreadSingleAssignmentVar<int>*)arg)->releaseMemory();
1444 	}
1445 	catch(Error &e) {
1446 		printf("Caught error in releaseMem: %s\n", e.name());
1447 		ASSERT(false);
1448 	}
1449 	THREAD_RETURN;
1450 }
1451 
destroy(void * arg)1452 THREAD_FUNC destroy(void *arg) {
1453 	threadSleep(0.1 * g_random->random01());
1454 	try {
1455 		((ThreadSingleAssignmentVar<int>*)arg)->cancel();
1456 	}
1457 	catch(Error &e) {
1458 		printf("Caught error in destroy: %s\n", e.name());
1459 		ASSERT(false);
1460 	}
1461 	THREAD_RETURN;
1462 }
1463 
cancel(void * arg)1464 THREAD_FUNC cancel(void *arg) {
1465 	threadSleep(0.1 * g_random->random01());
1466 	try {
1467 		((ThreadSingleAssignmentVar<int>*)arg)->addref();
1468 		destroy(arg);
1469 	}
1470 	catch(Error &e) {
1471 		printf("Caught error in cancel: %s\n", e.name());
1472 		ASSERT(false);
1473 	}
1474 	THREAD_RETURN;
1475 }
1476 
checkUndestroyedFutures(std::vector<ThreadSingleAssignmentVar<int> * > undestroyed)1477 ACTOR Future<Void> checkUndestroyedFutures(std::vector<ThreadSingleAssignmentVar<int>*> undestroyed) {
1478 	state int fNum;
1479 	state ThreadSingleAssignmentVar<int>* f;
1480 	state double start = now();
1481 
1482 	for(fNum = 0; fNum < undestroyed.size(); ++fNum) {
1483 		f = undestroyed[fNum];
1484 
1485 		while(!f->isReady() && start+5 >= now()) {
1486 			wait(delay(1.0));
1487 		}
1488 
1489 		ASSERT(f->isReady());
1490 	}
1491 
1492 	wait(delay(1.0));
1493 
1494 	for(fNum = 0; fNum < undestroyed.size(); ++fNum) {
1495 		f = undestroyed[fNum];
1496 
1497 		ASSERT(f->debugGetReferenceCount() == 1);
1498 		ASSERT(f->isReady());
1499 
1500 		f->cancel();
1501 	}
1502 
1503 	return Void();
1504 }
1505 
1506 template<class T>
runSingleAssignmentVarTest(void * arg)1507 THREAD_FUNC runSingleAssignmentVarTest(void *arg) {
1508 	noUnseed = true;
1509 
1510 	volatile bool *done = (volatile bool*)arg;
1511 	try {
1512 		for(int i = 0; i < 25; ++i) {
1513 			FutureInfo f = createVarOnMainThread(false);
1514 			FutureInfo tf = T::createThreadFuture(f);
1515 			tf.validate();
1516 
1517 			tf.future.extractPtr(); // leaks
1518 		}
1519 
1520 		for(int numRuns = 0; numRuns < 25; ++numRuns) {
1521 			std::vector<ThreadSingleAssignmentVar<int>*> undestroyed;
1522 			std::vector<THREAD_HANDLE> threads;
1523 			for(int i = 0; i < 10; ++i) {
1524 				FutureInfo f = createVarOnMainThread();
1525 				f.legalErrors.insert(error_code_operation_cancelled);
1526 
1527 				FutureInfo tf = T::createThreadFuture(f);
1528 				for(auto t : tf.threads) {
1529 					threads.push_back(t);
1530 				}
1531 
1532 				tf.legalErrors.insert(error_code_operation_cancelled);
1533 				tf.validate();
1534 
1535 				auto tfp = tf.future.extractPtr();
1536 
1537 				if(g_random->coinflip()) {
1538 					if(g_random->coinflip()) {
1539 						threads.push_back(g_network->startThread(releaseMem, tfp));
1540 					}
1541 					threads.push_back(g_network->startThread(cancel, tfp));
1542 					undestroyed.push_back((ThreadSingleAssignmentVar<int>*)tfp);
1543 				}
1544 				else {
1545 					threads.push_back(g_network->startThread(destroy, tfp));
1546 				}
1547 			}
1548 
1549 			for(auto t : threads) {
1550 				waitThread(t);
1551 			}
1552 
1553 			ThreadFuture<Void> checkUndestroyed = onMainThread([undestroyed]() {
1554 				return checkUndestroyedFutures(undestroyed);
1555 			});
1556 
1557 			checkUndestroyed.blockUntilReady();
1558 		}
1559 
1560 		onMainThreadVoid([done](){
1561 			*done = true;
1562 		}, NULL);
1563 	}
1564 	catch(Error &e) {
1565 		printf("Caught error in test: %s\n", e.name());
1566 		*done = true;
1567 		ASSERT(false);
1568 	}
1569 
1570 	THREAD_RETURN;
1571 }
1572 
1573 struct AbortableTest {
createThreadFutureAbortableTest1574 	static FutureInfo createThreadFuture(FutureInfo f) {
1575 		ThreadSingleAssignmentVar<Void> *abort = new ThreadSingleAssignmentVar<Void>();
1576 		abort->addref(); // this leaks if abort is never set
1577 
1578 		auto newFuture = FutureInfo(abortableFuture(f.future, ThreadFuture<Void>(abort)), f.expectedValue, f.legalErrors);
1579 
1580 		if(!abort->isReady() && g_random->coinflip()) {
1581 			ASSERT(abort->status == ThreadSingleAssignmentVarBase::Unset);
1582 			newFuture.threads.push_back(g_network->startThread(setAbort, abort));
1583 		}
1584 
1585 		newFuture.legalErrors.insert(error_code_cluster_version_changed);
1586 		return newFuture;
1587 	}
1588 };
1589 
1590 TEST_CASE("/fdbclient/multiversionclient/AbortableSingleAssignmentVar" ) {
1591 	state volatile bool done = false;
1592 	g_network->startThread(runSingleAssignmentVarTest<AbortableTest>, (void*)&done);
1593 
1594 	while(!done) {
1595 		wait(delay(1.0));
1596 	}
1597 
1598 	return Void();
1599 }
1600 
1601 class CAPICallback : public ThreadCallback {
1602 public:
CAPICallback(void (* callbackf)(FdbCApi::FDBFuture *,void *),FdbCApi::FDBFuture * f,void * userdata)1603 	CAPICallback(void (*callbackf)(FdbCApi::FDBFuture*, void*), FdbCApi::FDBFuture* f, void* userdata)
1604 		: callbackf(callbackf), f(f), userdata(userdata) {}
1605 
canFire(int notMadeActive)1606 	virtual bool canFire(int notMadeActive) { return true; }
fire(const Void & unused,int & userParam)1607 	virtual void fire(const Void& unused, int& userParam) {
1608 		(*callbackf)(f, userdata);
1609 		delete this;
1610 	}
error(const Error & e,int & userParam)1611 	virtual void error(const Error& e, int& userParam) {
1612 		(*callbackf)(f, userdata);
1613 		delete this;
1614 	}
1615 
1616 private:
1617 	void (*callbackf)(FdbCApi::FDBFuture*, void*);
1618 	FdbCApi::FDBFuture* f;
1619 	void* userdata;
1620 };
1621 
1622 struct DLTest {
createThreadFutureDLTest1623 	static FutureInfo createThreadFuture(FutureInfo f) {
1624 		return FutureInfo(toThreadFuture<int>(getApi(), (FdbCApi::FDBFuture*)f.future.extractPtr(), [](FdbCApi::FDBFuture *f, FdbCApi *api) {
1625 			ASSERT(((ThreadSingleAssignmentVar<int>*)f)->debugGetReferenceCount() >= 1);
1626 			return ((ThreadSingleAssignmentVar<int>*)f)->get();
1627 		}), f.expectedValue, f.legalErrors);
1628 	}
1629 
getApiDLTest1630 	static Reference<FdbCApi> getApi() {
1631 		static Reference<FdbCApi> api;
1632 		if(!api) {
1633 			api = Reference<FdbCApi>(new FdbCApi());
1634 
1635 			// Functions needed for DLSingleAssignmentVar
1636 			api->futureSetCallback = [](FdbCApi::FDBFuture *f, FdbCApi::FDBCallback callback, void *callbackParameter) {
1637 				try {
1638 					CAPICallback* cb = new CAPICallback(callback, f, callbackParameter);
1639 					int ignore;
1640 					((ThreadSingleAssignmentVarBase*)f)->callOrSetAsCallback(cb, ignore, 0);
1641 					return FdbCApi::fdb_error_t(error_code_success);
1642 				}
1643 				catch(Error &e) {
1644 					return FdbCApi::fdb_error_t(e.code());
1645 				}
1646 			};
1647 			api->futureCancel = [](FdbCApi::FDBFuture *f) {
1648 				((ThreadSingleAssignmentVarBase*)f)->addref();
1649 				((ThreadSingleAssignmentVarBase*)f)->cancel();
1650 			};
1651 			api->futureGetError = [](FdbCApi::FDBFuture *f) { return FdbCApi::fdb_error_t(((ThreadSingleAssignmentVarBase*)f)->getErrorCode()); };
1652 			api->futureDestroy = [](FdbCApi::FDBFuture *f) { ((ThreadSingleAssignmentVarBase*)f)->cancel(); };
1653 		}
1654 
1655 		return api;
1656 	}
1657 };
1658 
1659 TEST_CASE("/fdbclient/multiversionclient/DLSingleAssignmentVar" ) {
1660 	state volatile bool done = false;
1661 
1662 	MultiVersionApi::api->callbackOnMainThread = true;
1663 	g_network->startThread(runSingleAssignmentVarTest<DLTest>, (void*)&done);
1664 
1665 	while(!done) {
1666 		wait(delay(1.0));
1667 	}
1668 
1669 	done = false;
1670 	MultiVersionApi::api->callbackOnMainThread = false;
1671 	g_network->startThread(runSingleAssignmentVarTest<DLTest>, (void*)&done);
1672 
1673 	while(!done) {
1674 		wait(delay(1.0));
1675 	}
1676 
1677 	return Void();
1678 }
1679 
1680 struct MapTest {
createThreadFutureMapTest1681 	static FutureInfo createThreadFuture(FutureInfo f) {
1682 		FutureInfo newFuture;
1683 		newFuture.legalErrors = f.legalErrors;
1684 		newFuture.future = mapThreadFuture<int, int>(f.future, [f, newFuture](ErrorOr<int> v) {
1685 			if(v.isError()) {
1686 				ASSERT(f.legalErrors.count(v.getError().code()) > 0 || (f.expectedValue.isError() && f.expectedValue.getError().code() == v.getError().code()));
1687 			}
1688 			else {
1689 				ASSERT(!f.expectedValue.isError() && f.expectedValue.get() == v.get());
1690 			}
1691 
1692 			return newFuture.expectedValue;
1693 		});
1694 
1695 		return newFuture;
1696 	}
1697 };
1698 
1699 TEST_CASE("/fdbclient/multiversionclient/MapSingleAssignmentVar" ) {
1700 	state volatile bool done = false;
1701 	g_network->startThread(runSingleAssignmentVarTest<MapTest>, (void*)&done);
1702 
1703 	while(!done) {
1704 		wait(delay(1.0));
1705 	}
1706 
1707 	return Void();
1708 }
1709 
1710 struct FlatMapTest {
createThreadFutureFlatMapTest1711 	static FutureInfo createThreadFuture(FutureInfo f) {
1712 		FutureInfo mapFuture = createVarOnMainThread();
1713 
1714 		return FutureInfo(flatMapThreadFuture<int, int>(f.future, [f, mapFuture](ErrorOr<int> v) {
1715 			if(v.isError()) {
1716 				ASSERT(f.legalErrors.count(v.getError().code()) > 0 || (f.expectedValue.isError() && f.expectedValue.getError().code() == v.getError().code()));
1717 			}
1718 			else {
1719 				ASSERT(!f.expectedValue.isError() && f.expectedValue.get() == v.get());
1720 			}
1721 
1722 			if(mapFuture.expectedValue.isError() && g_random->coinflip()) {
1723 				return ErrorOr<ThreadFuture<int>>(mapFuture.expectedValue.getError());
1724 			}
1725 			else {
1726 				return ErrorOr<ThreadFuture<int>>(mapFuture.future);
1727 			}
1728 		}), mapFuture.expectedValue, f.legalErrors);
1729 	}
1730 };
1731 
1732 TEST_CASE("/fdbclient/multiversionclient/FlatMapSingleAssignmentVar" ) {
1733 	state volatile bool done = false;
1734 	g_network->startThread(runSingleAssignmentVarTest<FlatMapTest>, (void*)&done);
1735 
1736 	while(!done) {
1737 		wait(delay(1.0));
1738 	}
1739 
1740 	return Void();
1741 }
1742