1 /*
2  * ThreadSafeTransaction.actor.cpp
3  *
4  * This source file is part of the FoundationDB open source project
5  *
6  * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *     http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  */
20 
21 #include "fdbclient/ThreadSafeTransaction.h"
22 #include "fdbclient/ReadYourWrites.h"
23 #include "fdbclient/DatabaseContext.h"
24 #if defined(CMAKE_BUILD) || !defined(WIN32)
25 #include "versions.h"
26 #endif
27 #include <new>
28 
29 // Users of ThreadSafeTransaction might share Reference<ThreadSafe...> between different threads as long as they don't call addRef (e.g. C API follows this).
30 // Therefore, it is unsafe to call (explicitly or implicitly) this->addRef in any of these functions.
31 
onConnected()32 ThreadFuture<Void> ThreadSafeDatabase::onConnected() {
33 	DatabaseContext *db = this->db;
34 	return onMainThread( [db]() -> Future<Void> {
35 		db->checkDeferredError();
36 		return db->onConnected();
37 	} );
38 }
39 
createFromExistingDatabase(Database db)40 ThreadFuture<Reference<IDatabase>> ThreadSafeDatabase::createFromExistingDatabase(Database db) {
41 	return onMainThread( [db](){
42 		db->checkDeferredError();
43 		DatabaseContext *cx = db.getPtr();
44 		cx->addref();
45 		return Future<Reference<IDatabase>>(Reference<IDatabase>(new ThreadSafeDatabase(cx)));
46 	});
47 }
48 
createTransaction()49 Reference<ITransaction> ThreadSafeDatabase::createTransaction() {
50 	return Reference<ITransaction>(new ThreadSafeTransaction(db));
51 }
52 
setOption(FDBDatabaseOptions::Option option,Optional<StringRef> value)53 void ThreadSafeDatabase::setOption( FDBDatabaseOptions::Option option, Optional<StringRef> value) {
54 	DatabaseContext *db = this->db;
55 	Standalone<Optional<StringRef>> passValue = value;
56 	onMainThreadVoid( [db, option, passValue](){
57 		db->checkDeferredError();
58 		db->setOption(option, passValue.contents());
59 	}, &db->deferredError );
60 }
61 
ThreadSafeDatabase(std::string connFilename,int apiVersion)62 ThreadSafeDatabase::ThreadSafeDatabase(std::string connFilename, int apiVersion) {
63 	Reference<ClusterConnectionFile> connFile = Reference<ClusterConnectionFile>(new ClusterConnectionFile(ClusterConnectionFile::lookupClusterFileName(connFilename).first));
64 
65 	// Allocate memory for the Database from this thread (so the pointer is known for subsequent method calls)
66 	// but run its constructor on the main thread
67 	DatabaseContext *db = this->db = DatabaseContext::allocateOnForeignThread();
68 
69 	onMainThreadVoid([db, connFile, apiVersion](){
70 		try {
71 			Database::createDatabase(connFile, apiVersion, LocalityData(), db).extractPtr();
72 		}
73 		catch(Error &e) {
74 			new (db) DatabaseContext(e);
75 		}
76 		catch(...) {
77 			new (db) DatabaseContext(unknown_error());
78 		}
79 	}, NULL);
80 }
81 
~ThreadSafeDatabase()82 ThreadSafeDatabase::~ThreadSafeDatabase() {
83 	DatabaseContext *db = this->db;
84 	onMainThreadVoid( [db](){ db->delref(); }, NULL );
85 }
86 
ThreadSafeTransaction(DatabaseContext * cx)87 ThreadSafeTransaction::ThreadSafeTransaction(DatabaseContext* cx) {
88 	// Allocate memory for the transaction from this thread (so the pointer is known for subsequent method calls)
89 	// but run its constructor on the main thread
90 
91 	// It looks strange that the DatabaseContext::addref is deferred by the onMainThreadVoid call, but it is safe
92 	// because the reference count of the DatabaseContext is solely managed from the main thread.  If cx is destructed
93 	// immediately after this call, it will defer the DatabaseContext::delref (and onMainThread preserves the order of
94 	// these operations).
95 	ReadYourWritesTransaction *tr = this->tr = ReadYourWritesTransaction::allocateOnForeignThread();
96 	// No deferred error -- if the construction of the RYW transaction fails, we have no where to put it
97 	onMainThreadVoid(
98 	    [tr, cx]() {
99 		    cx->addref();
100 		    new (tr) ReadYourWritesTransaction(Database(cx));
101 	    },
102 	    NULL);
103 }
104 
~ThreadSafeTransaction()105 ThreadSafeTransaction::~ThreadSafeTransaction() {
106 	ReadYourWritesTransaction *tr = this->tr;
107 	if (tr)
108 		onMainThreadVoid( [tr](){ tr->delref(); }, NULL );
109 }
110 
cancel()111 void ThreadSafeTransaction::cancel() {
112 	ReadYourWritesTransaction *tr = this->tr;
113 	onMainThreadVoid( [tr](){ tr->cancel(); }, NULL );
114 }
115 
setVersion(Version v)116 void ThreadSafeTransaction::setVersion( Version v ) {
117 	ReadYourWritesTransaction *tr = this->tr;
118 	onMainThreadVoid( [tr, v](){ tr->setVersion(v); }, &tr->deferredError );
119 }
120 
getReadVersion()121 ThreadFuture<Version> ThreadSafeTransaction::getReadVersion() {
122 	ReadYourWritesTransaction *tr = this->tr;
123 	return onMainThread( [tr]() -> Future<Version> {
124 			tr->checkDeferredError();
125 			return tr->getReadVersion();
126 		} );
127 }
128 
get(const KeyRef & key,bool snapshot)129 ThreadFuture< Optional<Value> > ThreadSafeTransaction::get( const KeyRef& key, bool snapshot ) {
130 	Key k = key;
131 
132 	ReadYourWritesTransaction *tr = this->tr;
133 	return onMainThread( [tr, k, snapshot]() -> Future< Optional<Value> > {
134 			tr->checkDeferredError();
135 			return tr->get(k, snapshot);
136 		} );
137 }
138 
getKey(const KeySelectorRef & key,bool snapshot)139 ThreadFuture< Key > ThreadSafeTransaction::getKey( const KeySelectorRef& key, bool snapshot ) {
140 	KeySelector k = key;
141 
142 	ReadYourWritesTransaction *tr = this->tr;
143 	return onMainThread( [tr, k, snapshot]() -> Future< Key > {
144 			tr->checkDeferredError();
145 			return tr->getKey(k, snapshot);
146 		} );
147 }
148 
getRange(const KeySelectorRef & begin,const KeySelectorRef & end,int limit,bool snapshot,bool reverse)149 ThreadFuture< Standalone<RangeResultRef> > ThreadSafeTransaction::getRange( const KeySelectorRef& begin, const KeySelectorRef& end, int limit, bool snapshot, bool reverse ) {
150 	KeySelector b = begin;
151 	KeySelector e = end;
152 
153 	ReadYourWritesTransaction *tr = this->tr;
154 	return onMainThread( [tr, b, e, limit, snapshot, reverse]() -> Future< Standalone<RangeResultRef> > {
155 			tr->checkDeferredError();
156 			return tr->getRange(b, e, limit, snapshot, reverse);
157 		} );
158 }
159 
getRange(const KeySelectorRef & begin,const KeySelectorRef & end,GetRangeLimits limits,bool snapshot,bool reverse)160 ThreadFuture< Standalone<RangeResultRef> > ThreadSafeTransaction::getRange( const KeySelectorRef& begin, const KeySelectorRef& end, GetRangeLimits limits, bool snapshot, bool reverse ) {
161 	KeySelector b = begin;
162 	KeySelector e = end;
163 
164 	ReadYourWritesTransaction *tr = this->tr;
165 	return onMainThread( [tr, b, e, limits, snapshot, reverse]() -> Future< Standalone<RangeResultRef> > {
166 			tr->checkDeferredError();
167 			return tr->getRange(b, e, limits, snapshot, reverse);
168 		} );
169 }
170 
getAddressesForKey(const KeyRef & key)171 ThreadFuture<Standalone<VectorRef<const char*>>> ThreadSafeTransaction::getAddressesForKey( const KeyRef& key ) {
172 	Key k = key;
173 
174 	ReadYourWritesTransaction *tr = this->tr;
175 	return onMainThread( [tr, k]() -> Future< Standalone<VectorRef<const char*> >> {
176 		tr->checkDeferredError();
177 		return tr->getAddressesForKey(k);
178 	} );
179 }
180 
addReadConflictRange(const KeyRangeRef & keys)181 void ThreadSafeTransaction::addReadConflictRange( const KeyRangeRef& keys) {
182 	KeyRange r = keys;
183 
184 	ReadYourWritesTransaction *tr = this->tr;
185 	onMainThreadVoid( [tr, r](){ tr->addReadConflictRange(r); }, &tr->deferredError );
186 }
187 
makeSelfConflicting()188 void ThreadSafeTransaction::makeSelfConflicting() {
189 	ReadYourWritesTransaction *tr = this->tr;
190 	onMainThreadVoid( [tr](){ tr->makeSelfConflicting(); }, &tr->deferredError );
191 }
192 
atomicOp(const KeyRef & key,const ValueRef & value,uint32_t operationType)193 void ThreadSafeTransaction::atomicOp( const KeyRef& key, const ValueRef& value, uint32_t operationType ) {
194 	Key k = key;
195 	Value v = value;
196 
197 	ReadYourWritesTransaction *tr = this->tr;
198 	onMainThreadVoid( [tr, k, v, operationType](){ tr->atomicOp(k, v, operationType); }, &tr->deferredError );
199 }
200 
set(const KeyRef & key,const ValueRef & value)201 void ThreadSafeTransaction::set( const KeyRef& key, const ValueRef& value ) {
202 	Key k = key;
203 	Value v = value;
204 
205 	ReadYourWritesTransaction *tr = this->tr;
206 	onMainThreadVoid( [tr, k, v](){ tr->set(k, v); }, &tr->deferredError );
207 }
208 
clear(const KeyRangeRef & range)209 void ThreadSafeTransaction::clear( const KeyRangeRef& range ) {
210 	KeyRange r = range;
211 
212 	ReadYourWritesTransaction *tr = this->tr;
213 	onMainThreadVoid( [tr, r](){ tr->clear(r); }, &tr->deferredError );
214 }
215 
clear(const KeyRef & begin,const KeyRef & end)216 void ThreadSafeTransaction::clear( const KeyRef& begin, const KeyRef& end ) {
217 	Key b = begin;
218 	Key e = end;
219 
220 	ReadYourWritesTransaction *tr = this->tr;
221 	onMainThreadVoid( [tr, b, e](){
222 		if(b > e)
223 			throw inverted_range();
224 
225 		tr->clear(KeyRangeRef(b, e));
226 	}, &tr->deferredError );
227 }
228 
clear(const KeyRef & key)229 void ThreadSafeTransaction::clear( const KeyRef& key ) {
230 	Key k = key;
231 
232 	ReadYourWritesTransaction *tr = this->tr;
233 	onMainThreadVoid( [tr, k](){ tr->clear(k); }, &tr->deferredError );
234 }
235 
watch(const KeyRef & key)236 ThreadFuture< Void > ThreadSafeTransaction::watch( const KeyRef& key ) {
237 	Key k = key;
238 
239 	ReadYourWritesTransaction *tr = this->tr;
240 	return onMainThread( [tr, k]() -> Future< Void > {
241 		tr->checkDeferredError();
242 		return tr->watch(k);
243 	});
244 }
245 
addWriteConflictRange(const KeyRangeRef & keys)246 void ThreadSafeTransaction::addWriteConflictRange( const KeyRangeRef& keys) {
247 	KeyRange r = keys;
248 
249 	ReadYourWritesTransaction *tr = this->tr;
250 	onMainThreadVoid( [tr, r](){ tr->addWriteConflictRange(r); }, &tr->deferredError );
251 }
252 
commit()253 ThreadFuture< Void > ThreadSafeTransaction::commit() {
254 	ReadYourWritesTransaction *tr = this->tr;
255 	return onMainThread( [tr]() -> Future< Void > {
256 			tr->checkDeferredError();
257 			return tr->commit();
258 		} );
259 }
260 
getCommittedVersion()261 Version ThreadSafeTransaction::getCommittedVersion() {
262 	// This should be thread safe when called legally, but it is fragile
263 	Version v = tr->getCommittedVersion();
264 	return v;
265 }
266 
getVersionstamp()267 ThreadFuture<Standalone<StringRef>> ThreadSafeTransaction::getVersionstamp() {
268 	ReadYourWritesTransaction *tr = this->tr;
269 	return onMainThread([tr]() -> Future < Standalone<StringRef> > {
270 		return tr->getVersionstamp();
271 	});
272 }
273 
setOption(FDBTransactionOptions::Option option,Optional<StringRef> value)274 void ThreadSafeTransaction::setOption( FDBTransactionOptions::Option option, Optional<StringRef> value ) {
275 	ReadYourWritesTransaction *tr = this->tr;
276 	Standalone<Optional<StringRef>> passValue = value;
277 	onMainThreadVoid( [tr, option, passValue](){ tr->setOption(option, passValue.contents()); }, &tr->deferredError );
278 }
279 
checkDeferredError()280 ThreadFuture<Void> ThreadSafeTransaction::checkDeferredError() {
281 	ReadYourWritesTransaction *tr = this->tr;
282 	return onMainThread( [tr](){
283 		try {
284 			tr->checkDeferredError();
285 		} catch (Error &e) {
286 			tr->deferredError = Error();
287 			return Future<Void>(e);
288 		}
289 		return Future<Void>(Void());
290 	} );
291 }
292 
onError(Error const & e)293 ThreadFuture<Void> ThreadSafeTransaction::onError( Error const& e ) {
294 	ReadYourWritesTransaction *tr = this->tr;
295 	return onMainThread( [tr, e](){ return tr->onError(e); } );
296 }
297 
operator =(ThreadSafeTransaction && r)298 void ThreadSafeTransaction::operator=(ThreadSafeTransaction&& r) BOOST_NOEXCEPT {
299 	tr = r.tr;
300 	r.tr = NULL;
301 }
302 
ThreadSafeTransaction(ThreadSafeTransaction && r)303 ThreadSafeTransaction::ThreadSafeTransaction(ThreadSafeTransaction&& r) BOOST_NOEXCEPT {
304 	tr = r.tr;
305 	r.tr = NULL;
306 }
307 
reset()308 void ThreadSafeTransaction::reset() {
309 	ReadYourWritesTransaction *tr = this->tr;
310 	onMainThreadVoid( [tr](){ tr->reset(); }, NULL );
311 }
312 
313 extern const char* getHGVersion();
314 
ThreadSafeApi()315 ThreadSafeApi::ThreadSafeApi() : apiVersion(-1), clientVersion(format("%s,%s,%llx", FDB_VT_VERSION, getHGVersion(), currentProtocolVersion)), transportId(0) {}
316 
selectApiVersion(int apiVersion)317 void ThreadSafeApi::selectApiVersion(int apiVersion) {
318 	this->apiVersion = apiVersion;
319 }
320 
getClientVersion()321 const char* ThreadSafeApi::getClientVersion() {
322 	// There is only one copy of the ThreadSafeAPI, and it never gets deleted. Also, clientVersion is never modified.
323 	return clientVersion.c_str();
324 }
325 
setNetworkOption(FDBNetworkOptions::Option option,Optional<StringRef> value)326 void ThreadSafeApi::setNetworkOption(FDBNetworkOptions::Option option, Optional<StringRef> value) {
327 	if (option == FDBNetworkOptions::EXTERNAL_CLIENT_TRANSPORT_ID) {
328 		if(value.present()) {
329 			transportId = std::stoull(value.get().toString().c_str());
330 		}
331 	}
332 	else {
333 		::setNetworkOption(option, value);
334 	}
335 }
336 
setupNetwork()337 void ThreadSafeApi::setupNetwork() {
338 	::setupNetwork(transportId);
339 }
340 
runNetwork()341 void ThreadSafeApi::runNetwork() {
342 	Optional<Error> runErr;
343 	try {
344 		::runNetwork();
345 	}
346 	catch(Error &e) {
347 		runErr = e;
348 	}
349 
350 	for(auto &hook : threadCompletionHooks) {
351 		try {
352 			hook.first(hook.second);
353 		}
354 		catch(Error &e) {
355 			TraceEvent(SevError, "NetworkShutdownHookError").error(e);
356 		}
357 		catch(...) {
358 			TraceEvent(SevError, "NetworkShutdownHookError").error(unknown_error());
359 		}
360 	}
361 
362 	if(runErr.present()) {
363 		throw runErr.get();
364 	}
365 
366 }
367 
stopNetwork()368 void ThreadSafeApi::stopNetwork() {
369 	::stopNetwork();
370 }
371 
createDatabase(const char * clusterFilePath)372 Reference<IDatabase> ThreadSafeApi::createDatabase(const char *clusterFilePath) {
373 	return Reference<IDatabase>(new ThreadSafeDatabase(clusterFilePath, apiVersion));
374 }
375 
addNetworkThreadCompletionHook(void (* hook)(void *),void * hookParameter)376 void ThreadSafeApi::addNetworkThreadCompletionHook(void (*hook)(void*), void *hookParameter) {
377 	if (!g_network) {
378 		throw network_not_setup();
379 	}
380 
381 	MutexHolder holder(lock); // We could use the network thread to protect this action, but then we can't guarantee upon return that the hook is set.
382 	threadCompletionHooks.push_back(std::make_pair(hook, hookParameter));
383 }
384 
385 
386 IClientApi* ThreadSafeApi::api = new ThreadSafeApi();
387