1 /*
2  * fdb_flow.actor.cpp
3  *
4  * This source file is part of the FoundationDB open source project
5  *
6  * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *     http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  */
20 
21 #include "fdb_flow.h"
22 
23 #include "flow/DeterministicRandom.h"
24 #include "flow/SystemMonitor.h"
25 
26 #include <stdio.h>
27 
28 using namespace FDB;
29 
networkThread(void * fdb)30 THREAD_FUNC networkThread(void* fdb) {
31 	((FDB::API*)fdb)->runNetwork();
32 	THREAD_RETURN;
33 }
34 
_test()35 ACTOR Future<Void> _test() {
36 	API *fdb = FDB::API::selectAPIVersion(610);
37 	auto db = fdb->createDatabase();
38 	state Reference<Transaction> tr = db->createTransaction();
39 
40 	// tr->setVersion(1);
41 
42 	Version ver = wait( tr->getReadVersion() );
43 	printf("%lld\n", ver);
44 
45 	state std::vector< Future<Version> > versions;
46 
47 	state double starttime = timer_monotonic();
48 	state int i;
49 	// for (i = 0; i < 100000; i++) {
50 	// 	Version v = wait( tr->getReadVersion() );
51 	// }
52 	for ( i = 0; i < 100000; i++ ) {
53 		versions.push_back( tr->getReadVersion() );
54 	}
55 	for ( i = 0; i < 100000; i++ ) {
56 		Version v = wait( versions[i] );
57 	}
58 	// wait( waitForAllReady( versions ) );
59 	printf("Elapsed: %lf\n", timer_monotonic() - starttime );
60 
61 	tr->set( LiteralStringRef("foo"), LiteralStringRef("bar") );
62 
63 	Optional< FDBStandalone<ValueRef> > v = wait( tr->get( LiteralStringRef("foo") ) );
64 	if ( v.present() ) {
65 		printf("%s\n", v.get().toString().c_str() );
66 	}
67 
68 	FDBStandalone<RangeResultRef> r = wait( tr->getRange( KeyRangeRef( LiteralStringRef("a"), LiteralStringRef("z") ), 100 ) );
69 
70 	for ( auto kv : r ) {
71 		printf("%s is %s\n", kv.key.toString().c_str(), kv.value.toString().c_str());
72 	}
73 
74 	g_network->stop();
75 	return Void();
76 }
77 
fdb_flow_test()78 void fdb_flow_test() {
79 	API *fdb = FDB::API::selectAPIVersion(610);
80 	fdb->setupNetwork();
81 	startThread(networkThread, fdb);
82 
83 	int randomSeed = platform::getRandomSeed();
84 
85 	g_random = new DeterministicRandom(randomSeed);
86 	g_nondeterministic_random = new DeterministicRandom(platform::getRandomSeed());
87 	g_debug_random = new DeterministicRandom(platform::getRandomSeed());
88 
89 	g_network = newNet2( false );
90 
91 	openTraceFile(NetworkAddress(), 1000000, 1000000, ".");
92 	systemMonitor();
93 	uncancellable(recurring(&systemMonitor, 5.0, TaskFlushTrace));
94 
95 	Future<Void> t = _test();
96 
97 	g_network->run();
98 }
99 
100 namespace FDB {
101 	class DatabaseImpl : public Database, NonCopyable {
102 	public:
~DatabaseImpl()103 		virtual ~DatabaseImpl() { fdb_database_destroy(db); }
104 
105 		Reference<Transaction> createTransaction() override;
106 		void setDatabaseOption(FDBDatabaseOption option, Optional<StringRef> value = Optional<StringRef>()) override;
107 
108 	private:
109 		FDBDatabase* db;
DatabaseImpl(FDBDatabase * db)110 		explicit DatabaseImpl(FDBDatabase* db) : db(db) {}
111 
112 		friend class API;
113 	};
114 
115 	class TransactionImpl : public Transaction, private NonCopyable, public FastAllocated<TransactionImpl> {
116 		friend class DatabaseImpl;
117 
118 	public:
~TransactionImpl()119 		virtual ~TransactionImpl() {
120 			if (tr) {
121 				fdb_transaction_destroy(tr);
122 			}
123 		}
124 
125 		void setReadVersion(Version v) override;
126 		Future<Version> getReadVersion() override;
127 
128 		Future<Optional<FDBStandalone<ValueRef>>> get(const Key& key, bool snapshot = false) override;
129 		Future<FDBStandalone<KeyRef>> getKey(const KeySelector& key, bool snapshot = false) override;
130 
131 		Future<Void> watch(const Key& key) override;
132 
133 		using Transaction::getRange;
134 		Future<FDBStandalone<RangeResultRef>> getRange(const KeySelector& begin, const KeySelector& end,
135 													   GetRangeLimits limits = GetRangeLimits(), bool snapshot = false,
136 													   bool reverse = false,
137 													   FDBStreamingMode streamingMode = FDB_STREAMING_MODE_SERIAL) override;
138 
139 		void addReadConflictRange(KeyRangeRef const& keys) override;
140 		void addReadConflictKey(KeyRef const& key) override;
141 		void addWriteConflictRange(KeyRangeRef const& keys) override;
142 		void addWriteConflictKey(KeyRef const& key) override;
143 
144 		void atomicOp(const KeyRef& key, const ValueRef& operand, FDBMutationType operationType) override;
145 		void set(const KeyRef& key, const ValueRef& value) override;
146 		void clear(const KeyRangeRef& range) override;
147 		void clear(const KeyRef& key) override;
148 
149 		Future<Void> commit() override;
150 		Version getCommittedVersion() override;
151 		Future<FDBStandalone<StringRef>> getVersionstamp() override;
152 
153 		void setOption(FDBTransactionOption option, Optional<StringRef> value = Optional<StringRef>()) override;
154 
155 		Future<Void> onError(Error const& e) override;
156 
157 		void cancel() override;
158 		void reset() override;
159 
TransactionImpl()160 		TransactionImpl() : tr(NULL) {}
TransactionImpl(TransactionImpl && r)161 		TransactionImpl(TransactionImpl&& r) BOOST_NOEXCEPT {
162 			tr = r.tr;
163 			r.tr = NULL;
164 		}
operator =(TransactionImpl && r)165 		TransactionImpl& operator=(TransactionImpl&& r) BOOST_NOEXCEPT {
166 			tr = r.tr;
167 			r.tr = NULL;
168 			return *this;
169 		}
170 
171 	private:
172 		FDBTransaction* tr;
173 
174 		explicit TransactionImpl(FDBDatabase* db);
175 	};
176 
throw_on_error(fdb_error_t e)177 	static inline void throw_on_error( fdb_error_t e ) {
178 		if (e)
179 			throw Error(e);
180 	}
181 
blockUntilReady()182 	void CFuture::blockUntilReady() {
183 		throw_on_error( fdb_future_block_until_ready( f ) );
184 	}
185 
backToFutureCallback(FDBFuture * f,void * data)186 	void backToFutureCallback( FDBFuture* f, void* data ) {
187 		g_network->onMainThread( Promise<Void>((SAV<Void>*)data), TaskDefaultOnMainThread ); // SOMEDAY: think about this priority
188 	}
189 
190 	// backToFuture<Type>( FDBFuture*, (FDBFuture* -> Type) ) -> Future<Type>
191 	// Takes an FDBFuture (from the alien client world, with callbacks potentially firing on an alien thread)
192 	//   and converts it into a Future<T> (with callbacks working on this thread, cancellation etc).
193 	// You must pass as the second parameter a function which takes a ready FDBFuture* and returns a value of Type
backToFuture(FDBFuture * _f,Function convertValue)194 	ACTOR template<class T, class Function> static Future<T> backToFuture( FDBFuture* _f, Function convertValue ) {
195 		state Reference<CFuture> f( new CFuture(_f) );
196 
197 		Promise<Void> ready;
198 		Future<Void> onReady = ready.getFuture();
199 
200 		throw_on_error( fdb_future_set_callback( f->f, backToFutureCallback, ready.extractRawPointer() ) );
201 		wait( onReady );
202 
203 		return convertValue( f );
204 	}
205 
setNetworkOption(FDBNetworkOption option,Optional<StringRef> value)206 	void API::setNetworkOption( FDBNetworkOption option, Optional<StringRef> value ) {
207 		if ( value.present() )
208 			throw_on_error( fdb_network_set_option( option, value.get().begin(), value.get().size() ) );
209 		else
210 			throw_on_error( fdb_network_set_option( option, NULL, 0 ) );
211 	}
212 
213 	API* API::instance = NULL;
API(int version)214 	API::API(int version) : version(version) {}
215 
selectAPIVersion(int apiVersion)216 	API* API::selectAPIVersion(int apiVersion) {
217 		if(API::instance) {
218 			if(apiVersion != API::instance->version) {
219 				throw api_version_already_set();
220 			}
221 			else {
222 				return API::instance;
223 			}
224 		}
225 
226 		if(apiVersion < 500 || apiVersion > FDB_API_VERSION) {
227 			throw api_version_not_supported();
228 		}
229 
230 		throw_on_error( fdb_select_api_version_impl(apiVersion, FDB_API_VERSION) );
231 
232 		API::instance = new API(apiVersion);
233 		return API::instance;
234 	}
235 
isAPIVersionSelected()236 	bool API::isAPIVersionSelected() {
237 		return API::instance != NULL;
238 	}
239 
getInstance()240 	API* API::getInstance() {
241 		if(API::instance == NULL) {
242 			throw api_version_unset();
243 		}
244 		else {
245 			return API::instance;
246 		}
247 	}
248 
setupNetwork()249 	void API::setupNetwork() {
250 		throw_on_error( fdb_setup_network() );
251 	}
252 
runNetwork()253 	void API::runNetwork() {
254 		throw_on_error( fdb_run_network() );
255 	}
256 
stopNetwork()257 	void API::stopNetwork() {
258 		throw_on_error( fdb_stop_network() );
259 	}
260 
evaluatePredicate(FDBErrorPredicate pred,Error const & e)261 	bool API::evaluatePredicate(FDBErrorPredicate pred, Error const& e) {
262 		return fdb_error_predicate( pred, e.code() );
263 	}
264 
createDatabase(std::string const & connFilename)265 	Reference<Database> API::createDatabase(std::string const& connFilename) {
266 		FDBDatabase *db;
267 		throw_on_error(fdb_create_database(connFilename.c_str(), &db));
268 		return Reference<Database>(new DatabaseImpl(db));
269 	}
270 
getAPIVersion() const271 	int API::getAPIVersion() const {
272 		return version;
273 	}
274 
createTransaction()275 	Reference<Transaction> DatabaseImpl::createTransaction() {
276 		return Reference<Transaction>(new TransactionImpl(db));
277 	}
278 
setDatabaseOption(FDBDatabaseOption option,Optional<StringRef> value)279 	void DatabaseImpl::setDatabaseOption(FDBDatabaseOption option, Optional<StringRef> value) {
280 		if (value.present())
281 			throw_on_error(fdb_database_set_option(db, option, value.get().begin(), value.get().size()));
282 		else
283 			throw_on_error(fdb_database_set_option(db, option, NULL, 0));
284 	}
285 
TransactionImpl(FDBDatabase * db)286 	TransactionImpl::TransactionImpl(FDBDatabase* db) {
287 		throw_on_error(fdb_database_create_transaction(db, &tr));
288 	}
289 
setReadVersion(Version v)290 	void TransactionImpl::setReadVersion(Version v) {
291 		fdb_transaction_set_read_version( tr, v );
292 	}
293 
getReadVersion()294 	Future<Version> TransactionImpl::getReadVersion() {
295 		return backToFuture<Version>( fdb_transaction_get_read_version( tr ), [](Reference<CFuture> f){
296 				Version value;
297 
298 				throw_on_error( fdb_future_get_version( f->f, &value ) );
299 
300 				return value;
301 			} );
302 	}
303 
get(const Key & key,bool snapshot)304 	Future<Optional<FDBStandalone<ValueRef>>> TransactionImpl::get(const Key& key, bool snapshot) {
305 		return backToFuture< Optional<FDBStandalone<ValueRef>> >( fdb_transaction_get( tr, key.begin(), key.size(), snapshot ), [](Reference<CFuture> f) {
306 				fdb_bool_t present;
307 				uint8_t const* value;
308 				int value_length;
309 
310 				throw_on_error( fdb_future_get_value( f->f, &present, &value, &value_length ) );
311 
312 				if ( present ) {
313 					return Optional<FDBStandalone<ValueRef>>( FDBStandalone<ValueRef>( f, ValueRef( value, value_length ) ) );
314 				} else {
315 					return Optional<FDBStandalone<ValueRef>>();
316 				}
317 			} );
318 	}
319 
watch(const Key & key)320 	Future<Void> TransactionImpl::watch(const Key& key) {
321 		return backToFuture< Void >( fdb_transaction_watch( tr, key.begin(), key.size() ), [](Reference<CFuture> f) {
322 				throw_on_error( fdb_future_get_error( f->f ) );
323 				return Void();
324 			} );
325 	}
326 
getKey(const KeySelector & key,bool snapshot)327 	Future<FDBStandalone<KeyRef>> TransactionImpl::getKey(const KeySelector& key, bool snapshot) {
328 		return backToFuture< FDBStandalone<KeyRef> >( fdb_transaction_get_key( tr, key.key.begin(), key.key.size(), key.orEqual, key.offset, snapshot ), [](Reference<CFuture> f) {
329 				uint8_t const* key;
330 				int key_length;
331 
332 				throw_on_error( fdb_future_get_key( f->f, &key, &key_length ) );
333 
334 				return FDBStandalone<KeyRef>( f, KeyRef( key, key_length ) );
335 			} );
336 	}
337 
getRange(const KeySelector & begin,const KeySelector & end,GetRangeLimits limits,bool snapshot,bool reverse,FDBStreamingMode streamingMode)338 	Future<FDBStandalone<RangeResultRef>> TransactionImpl::getRange(const KeySelector& begin, const KeySelector& end, GetRangeLimits limits, bool snapshot, bool reverse, FDBStreamingMode streamingMode) {
339 		// FIXME: iteration
340 		return backToFuture< FDBStandalone<RangeResultRef> >( fdb_transaction_get_range( tr, begin.key.begin(), begin.key.size(), begin.orEqual, begin.offset, end.key.begin(), end.key.size(), end.orEqual, end.offset, limits.rows, limits.bytes, streamingMode, 1, snapshot, reverse ), [](Reference<CFuture> f) {
341 				FDBKeyValue const* kv;
342 				int count;
343 				fdb_bool_t more;
344 
345 				throw_on_error( fdb_future_get_keyvalue_array( f->f, &kv, &count, &more ) );
346 
347 				return FDBStandalone<RangeResultRef>( f, RangeResultRef( VectorRef<KeyValueRef>( (KeyValueRef*)kv, count ), more ) );
348 			} );
349 	}
350 
addReadConflictRange(KeyRangeRef const & keys)351 	void TransactionImpl::addReadConflictRange(KeyRangeRef const& keys) {
352 		throw_on_error( fdb_transaction_add_conflict_range( tr, keys.begin.begin(), keys.begin.size(), keys.end.begin(), keys.end.size(), FDB_CONFLICT_RANGE_TYPE_READ ) );
353 	}
354 
addReadConflictKey(KeyRef const & key)355 	void TransactionImpl::addReadConflictKey(KeyRef const& key) {
356 		return addReadConflictRange(KeyRange(KeyRangeRef(key, keyAfter(key))));
357 	}
358 
addWriteConflictRange(KeyRangeRef const & keys)359 	void TransactionImpl::addWriteConflictRange(KeyRangeRef const& keys) {
360 		throw_on_error( fdb_transaction_add_conflict_range( tr, keys.begin.begin(), keys.begin.size(), keys.end.begin(), keys.end.size(), FDB_CONFLICT_RANGE_TYPE_WRITE ) );
361 	}
362 
addWriteConflictKey(KeyRef const & key)363 	void TransactionImpl::addWriteConflictKey(KeyRef const& key) {
364 		return addWriteConflictRange(KeyRange(KeyRangeRef(key, keyAfter(key))));
365 	}
366 
atomicOp(const KeyRef & key,const ValueRef & operand,FDBMutationType operationType)367 	void TransactionImpl::atomicOp(const KeyRef& key, const ValueRef& operand, FDBMutationType operationType) {
368 		fdb_transaction_atomic_op( tr, key.begin(), key.size(), operand.begin(), operand.size(), operationType );
369 	}
370 
set(const KeyRef & key,const ValueRef & value)371 	void TransactionImpl::set(const KeyRef& key, const ValueRef& value) {
372 		fdb_transaction_set( tr, key.begin(), key.size(), value.begin(), value.size() );
373 	}
374 
clear(const KeyRangeRef & range)375 	void TransactionImpl::clear(const KeyRangeRef& range) {
376 		fdb_transaction_clear_range( tr, range.begin.begin(), range.begin.size(), range.end.begin(), range.end.size() );
377 	}
378 
clear(const KeyRef & key)379 	void TransactionImpl::clear(const KeyRef& key) {
380 		fdb_transaction_clear( tr, key.begin(), key.size() );
381 	}
382 
commit()383 	Future<Void> TransactionImpl::commit() {
384 		return backToFuture< Void >( fdb_transaction_commit( tr ), [](Reference<CFuture> f) {
385 				throw_on_error( fdb_future_get_error( f->f ) );
386 				return Void();
387 			} );
388 	}
389 
getCommittedVersion()390 	Version TransactionImpl::getCommittedVersion() {
391 		Version v;
392 
393 		throw_on_error( fdb_transaction_get_committed_version( tr, &v ) );
394 		return v;
395 	}
396 
getVersionstamp()397 	Future<FDBStandalone<StringRef>> TransactionImpl::getVersionstamp() {
398 		return backToFuture<FDBStandalone<KeyRef>>(fdb_transaction_get_versionstamp(tr), [](Reference<CFuture> f) {
399 			uint8_t const* key;
400 			int key_length;
401 
402 			throw_on_error( fdb_future_get_key( f->f, &key, &key_length ) );
403 
404 			return FDBStandalone<StringRef>( f, StringRef( key, key_length ) );
405 		});
406 	}
407 
setOption(FDBTransactionOption option,Optional<StringRef> value)408 	void TransactionImpl::setOption(FDBTransactionOption option, Optional<StringRef> value) {
409 		if ( value.present() ) {
410 			throw_on_error( fdb_transaction_set_option( tr, option, value.get().begin(), value.get().size() ) );
411 		} else {
412 			throw_on_error( fdb_transaction_set_option( tr, option, NULL, 0 ) );
413 		}
414 	}
415 
onError(Error const & e)416 	Future<Void> TransactionImpl::onError(Error const& e) {
417 		return backToFuture< Void >( fdb_transaction_on_error( tr, e.code() ), [](Reference<CFuture> f) {
418 				throw_on_error( fdb_future_get_error( f->f ) );
419 				return Void();
420 			} );
421 	}
422 
cancel()423 	void TransactionImpl::cancel() {
424 		fdb_transaction_cancel( tr );
425 	}
426 
reset()427 	void TransactionImpl::reset() {
428 		fdb_transaction_reset( tr );
429 	}
430 
printable(const StringRef & val)431 	std::string printable( const StringRef& val ) {
432 		std::string s;
433 		for(int i=0; i<val.size(); i++) {
434 			uint8_t b = val[i];
435 			if (b >= 32 && b < 127 && b != '\\') s += (char)b;
436 			else if (b == '\\') s += "\\\\";
437 			else s += format("\\x%02x", b);
438 		}
439 		return s;
440 	}
441 
442 }
443