1 /*
2 * fdb_flow.actor.cpp
3 *
4 * This source file is part of the FoundationDB open source project
5 *
6 * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
7 *
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
11 *
12 * http://www.apache.org/licenses/LICENSE-2.0
13 *
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 */
20
21 #include "fdb_flow.h"
22
23 #include "flow/DeterministicRandom.h"
24 #include "flow/SystemMonitor.h"
25
26 #include <stdio.h>
27
28 using namespace FDB;
29
networkThread(void * fdb)30 THREAD_FUNC networkThread(void* fdb) {
31 ((FDB::API*)fdb)->runNetwork();
32 THREAD_RETURN;
33 }
34
_test()35 ACTOR Future<Void> _test() {
36 API *fdb = FDB::API::selectAPIVersion(610);
37 auto db = fdb->createDatabase();
38 state Reference<Transaction> tr = db->createTransaction();
39
40 // tr->setVersion(1);
41
42 Version ver = wait( tr->getReadVersion() );
43 printf("%lld\n", ver);
44
45 state std::vector< Future<Version> > versions;
46
47 state double starttime = timer_monotonic();
48 state int i;
49 // for (i = 0; i < 100000; i++) {
50 // Version v = wait( tr->getReadVersion() );
51 // }
52 for ( i = 0; i < 100000; i++ ) {
53 versions.push_back( tr->getReadVersion() );
54 }
55 for ( i = 0; i < 100000; i++ ) {
56 Version v = wait( versions[i] );
57 }
58 // wait( waitForAllReady( versions ) );
59 printf("Elapsed: %lf\n", timer_monotonic() - starttime );
60
61 tr->set( LiteralStringRef("foo"), LiteralStringRef("bar") );
62
63 Optional< FDBStandalone<ValueRef> > v = wait( tr->get( LiteralStringRef("foo") ) );
64 if ( v.present() ) {
65 printf("%s\n", v.get().toString().c_str() );
66 }
67
68 FDBStandalone<RangeResultRef> r = wait( tr->getRange( KeyRangeRef( LiteralStringRef("a"), LiteralStringRef("z") ), 100 ) );
69
70 for ( auto kv : r ) {
71 printf("%s is %s\n", kv.key.toString().c_str(), kv.value.toString().c_str());
72 }
73
74 g_network->stop();
75 return Void();
76 }
77
fdb_flow_test()78 void fdb_flow_test() {
79 API *fdb = FDB::API::selectAPIVersion(610);
80 fdb->setupNetwork();
81 startThread(networkThread, fdb);
82
83 int randomSeed = platform::getRandomSeed();
84
85 g_random = new DeterministicRandom(randomSeed);
86 g_nondeterministic_random = new DeterministicRandom(platform::getRandomSeed());
87 g_debug_random = new DeterministicRandom(platform::getRandomSeed());
88
89 g_network = newNet2( false );
90
91 openTraceFile(NetworkAddress(), 1000000, 1000000, ".");
92 systemMonitor();
93 uncancellable(recurring(&systemMonitor, 5.0, TaskFlushTrace));
94
95 Future<Void> t = _test();
96
97 g_network->run();
98 }
99
100 namespace FDB {
101 class DatabaseImpl : public Database, NonCopyable {
102 public:
~DatabaseImpl()103 virtual ~DatabaseImpl() { fdb_database_destroy(db); }
104
105 Reference<Transaction> createTransaction() override;
106 void setDatabaseOption(FDBDatabaseOption option, Optional<StringRef> value = Optional<StringRef>()) override;
107
108 private:
109 FDBDatabase* db;
DatabaseImpl(FDBDatabase * db)110 explicit DatabaseImpl(FDBDatabase* db) : db(db) {}
111
112 friend class API;
113 };
114
115 class TransactionImpl : public Transaction, private NonCopyable, public FastAllocated<TransactionImpl> {
116 friend class DatabaseImpl;
117
118 public:
~TransactionImpl()119 virtual ~TransactionImpl() {
120 if (tr) {
121 fdb_transaction_destroy(tr);
122 }
123 }
124
125 void setReadVersion(Version v) override;
126 Future<Version> getReadVersion() override;
127
128 Future<Optional<FDBStandalone<ValueRef>>> get(const Key& key, bool snapshot = false) override;
129 Future<FDBStandalone<KeyRef>> getKey(const KeySelector& key, bool snapshot = false) override;
130
131 Future<Void> watch(const Key& key) override;
132
133 using Transaction::getRange;
134 Future<FDBStandalone<RangeResultRef>> getRange(const KeySelector& begin, const KeySelector& end,
135 GetRangeLimits limits = GetRangeLimits(), bool snapshot = false,
136 bool reverse = false,
137 FDBStreamingMode streamingMode = FDB_STREAMING_MODE_SERIAL) override;
138
139 void addReadConflictRange(KeyRangeRef const& keys) override;
140 void addReadConflictKey(KeyRef const& key) override;
141 void addWriteConflictRange(KeyRangeRef const& keys) override;
142 void addWriteConflictKey(KeyRef const& key) override;
143
144 void atomicOp(const KeyRef& key, const ValueRef& operand, FDBMutationType operationType) override;
145 void set(const KeyRef& key, const ValueRef& value) override;
146 void clear(const KeyRangeRef& range) override;
147 void clear(const KeyRef& key) override;
148
149 Future<Void> commit() override;
150 Version getCommittedVersion() override;
151 Future<FDBStandalone<StringRef>> getVersionstamp() override;
152
153 void setOption(FDBTransactionOption option, Optional<StringRef> value = Optional<StringRef>()) override;
154
155 Future<Void> onError(Error const& e) override;
156
157 void cancel() override;
158 void reset() override;
159
TransactionImpl()160 TransactionImpl() : tr(NULL) {}
TransactionImpl(TransactionImpl && r)161 TransactionImpl(TransactionImpl&& r) BOOST_NOEXCEPT {
162 tr = r.tr;
163 r.tr = NULL;
164 }
operator =(TransactionImpl && r)165 TransactionImpl& operator=(TransactionImpl&& r) BOOST_NOEXCEPT {
166 tr = r.tr;
167 r.tr = NULL;
168 return *this;
169 }
170
171 private:
172 FDBTransaction* tr;
173
174 explicit TransactionImpl(FDBDatabase* db);
175 };
176
throw_on_error(fdb_error_t e)177 static inline void throw_on_error( fdb_error_t e ) {
178 if (e)
179 throw Error(e);
180 }
181
blockUntilReady()182 void CFuture::blockUntilReady() {
183 throw_on_error( fdb_future_block_until_ready( f ) );
184 }
185
backToFutureCallback(FDBFuture * f,void * data)186 void backToFutureCallback( FDBFuture* f, void* data ) {
187 g_network->onMainThread( Promise<Void>((SAV<Void>*)data), TaskDefaultOnMainThread ); // SOMEDAY: think about this priority
188 }
189
190 // backToFuture<Type>( FDBFuture*, (FDBFuture* -> Type) ) -> Future<Type>
191 // Takes an FDBFuture (from the alien client world, with callbacks potentially firing on an alien thread)
192 // and converts it into a Future<T> (with callbacks working on this thread, cancellation etc).
193 // You must pass as the second parameter a function which takes a ready FDBFuture* and returns a value of Type
backToFuture(FDBFuture * _f,Function convertValue)194 ACTOR template<class T, class Function> static Future<T> backToFuture( FDBFuture* _f, Function convertValue ) {
195 state Reference<CFuture> f( new CFuture(_f) );
196
197 Promise<Void> ready;
198 Future<Void> onReady = ready.getFuture();
199
200 throw_on_error( fdb_future_set_callback( f->f, backToFutureCallback, ready.extractRawPointer() ) );
201 wait( onReady );
202
203 return convertValue( f );
204 }
205
setNetworkOption(FDBNetworkOption option,Optional<StringRef> value)206 void API::setNetworkOption( FDBNetworkOption option, Optional<StringRef> value ) {
207 if ( value.present() )
208 throw_on_error( fdb_network_set_option( option, value.get().begin(), value.get().size() ) );
209 else
210 throw_on_error( fdb_network_set_option( option, NULL, 0 ) );
211 }
212
213 API* API::instance = NULL;
API(int version)214 API::API(int version) : version(version) {}
215
selectAPIVersion(int apiVersion)216 API* API::selectAPIVersion(int apiVersion) {
217 if(API::instance) {
218 if(apiVersion != API::instance->version) {
219 throw api_version_already_set();
220 }
221 else {
222 return API::instance;
223 }
224 }
225
226 if(apiVersion < 500 || apiVersion > FDB_API_VERSION) {
227 throw api_version_not_supported();
228 }
229
230 throw_on_error( fdb_select_api_version_impl(apiVersion, FDB_API_VERSION) );
231
232 API::instance = new API(apiVersion);
233 return API::instance;
234 }
235
isAPIVersionSelected()236 bool API::isAPIVersionSelected() {
237 return API::instance != NULL;
238 }
239
getInstance()240 API* API::getInstance() {
241 if(API::instance == NULL) {
242 throw api_version_unset();
243 }
244 else {
245 return API::instance;
246 }
247 }
248
setupNetwork()249 void API::setupNetwork() {
250 throw_on_error( fdb_setup_network() );
251 }
252
runNetwork()253 void API::runNetwork() {
254 throw_on_error( fdb_run_network() );
255 }
256
stopNetwork()257 void API::stopNetwork() {
258 throw_on_error( fdb_stop_network() );
259 }
260
evaluatePredicate(FDBErrorPredicate pred,Error const & e)261 bool API::evaluatePredicate(FDBErrorPredicate pred, Error const& e) {
262 return fdb_error_predicate( pred, e.code() );
263 }
264
createDatabase(std::string const & connFilename)265 Reference<Database> API::createDatabase(std::string const& connFilename) {
266 FDBDatabase *db;
267 throw_on_error(fdb_create_database(connFilename.c_str(), &db));
268 return Reference<Database>(new DatabaseImpl(db));
269 }
270
getAPIVersion() const271 int API::getAPIVersion() const {
272 return version;
273 }
274
createTransaction()275 Reference<Transaction> DatabaseImpl::createTransaction() {
276 return Reference<Transaction>(new TransactionImpl(db));
277 }
278
setDatabaseOption(FDBDatabaseOption option,Optional<StringRef> value)279 void DatabaseImpl::setDatabaseOption(FDBDatabaseOption option, Optional<StringRef> value) {
280 if (value.present())
281 throw_on_error(fdb_database_set_option(db, option, value.get().begin(), value.get().size()));
282 else
283 throw_on_error(fdb_database_set_option(db, option, NULL, 0));
284 }
285
TransactionImpl(FDBDatabase * db)286 TransactionImpl::TransactionImpl(FDBDatabase* db) {
287 throw_on_error(fdb_database_create_transaction(db, &tr));
288 }
289
setReadVersion(Version v)290 void TransactionImpl::setReadVersion(Version v) {
291 fdb_transaction_set_read_version( tr, v );
292 }
293
getReadVersion()294 Future<Version> TransactionImpl::getReadVersion() {
295 return backToFuture<Version>( fdb_transaction_get_read_version( tr ), [](Reference<CFuture> f){
296 Version value;
297
298 throw_on_error( fdb_future_get_version( f->f, &value ) );
299
300 return value;
301 } );
302 }
303
get(const Key & key,bool snapshot)304 Future<Optional<FDBStandalone<ValueRef>>> TransactionImpl::get(const Key& key, bool snapshot) {
305 return backToFuture< Optional<FDBStandalone<ValueRef>> >( fdb_transaction_get( tr, key.begin(), key.size(), snapshot ), [](Reference<CFuture> f) {
306 fdb_bool_t present;
307 uint8_t const* value;
308 int value_length;
309
310 throw_on_error( fdb_future_get_value( f->f, &present, &value, &value_length ) );
311
312 if ( present ) {
313 return Optional<FDBStandalone<ValueRef>>( FDBStandalone<ValueRef>( f, ValueRef( value, value_length ) ) );
314 } else {
315 return Optional<FDBStandalone<ValueRef>>();
316 }
317 } );
318 }
319
watch(const Key & key)320 Future<Void> TransactionImpl::watch(const Key& key) {
321 return backToFuture< Void >( fdb_transaction_watch( tr, key.begin(), key.size() ), [](Reference<CFuture> f) {
322 throw_on_error( fdb_future_get_error( f->f ) );
323 return Void();
324 } );
325 }
326
getKey(const KeySelector & key,bool snapshot)327 Future<FDBStandalone<KeyRef>> TransactionImpl::getKey(const KeySelector& key, bool snapshot) {
328 return backToFuture< FDBStandalone<KeyRef> >( fdb_transaction_get_key( tr, key.key.begin(), key.key.size(), key.orEqual, key.offset, snapshot ), [](Reference<CFuture> f) {
329 uint8_t const* key;
330 int key_length;
331
332 throw_on_error( fdb_future_get_key( f->f, &key, &key_length ) );
333
334 return FDBStandalone<KeyRef>( f, KeyRef( key, key_length ) );
335 } );
336 }
337
getRange(const KeySelector & begin,const KeySelector & end,GetRangeLimits limits,bool snapshot,bool reverse,FDBStreamingMode streamingMode)338 Future<FDBStandalone<RangeResultRef>> TransactionImpl::getRange(const KeySelector& begin, const KeySelector& end, GetRangeLimits limits, bool snapshot, bool reverse, FDBStreamingMode streamingMode) {
339 // FIXME: iteration
340 return backToFuture< FDBStandalone<RangeResultRef> >( fdb_transaction_get_range( tr, begin.key.begin(), begin.key.size(), begin.orEqual, begin.offset, end.key.begin(), end.key.size(), end.orEqual, end.offset, limits.rows, limits.bytes, streamingMode, 1, snapshot, reverse ), [](Reference<CFuture> f) {
341 FDBKeyValue const* kv;
342 int count;
343 fdb_bool_t more;
344
345 throw_on_error( fdb_future_get_keyvalue_array( f->f, &kv, &count, &more ) );
346
347 return FDBStandalone<RangeResultRef>( f, RangeResultRef( VectorRef<KeyValueRef>( (KeyValueRef*)kv, count ), more ) );
348 } );
349 }
350
addReadConflictRange(KeyRangeRef const & keys)351 void TransactionImpl::addReadConflictRange(KeyRangeRef const& keys) {
352 throw_on_error( fdb_transaction_add_conflict_range( tr, keys.begin.begin(), keys.begin.size(), keys.end.begin(), keys.end.size(), FDB_CONFLICT_RANGE_TYPE_READ ) );
353 }
354
addReadConflictKey(KeyRef const & key)355 void TransactionImpl::addReadConflictKey(KeyRef const& key) {
356 return addReadConflictRange(KeyRange(KeyRangeRef(key, keyAfter(key))));
357 }
358
addWriteConflictRange(KeyRangeRef const & keys)359 void TransactionImpl::addWriteConflictRange(KeyRangeRef const& keys) {
360 throw_on_error( fdb_transaction_add_conflict_range( tr, keys.begin.begin(), keys.begin.size(), keys.end.begin(), keys.end.size(), FDB_CONFLICT_RANGE_TYPE_WRITE ) );
361 }
362
addWriteConflictKey(KeyRef const & key)363 void TransactionImpl::addWriteConflictKey(KeyRef const& key) {
364 return addWriteConflictRange(KeyRange(KeyRangeRef(key, keyAfter(key))));
365 }
366
atomicOp(const KeyRef & key,const ValueRef & operand,FDBMutationType operationType)367 void TransactionImpl::atomicOp(const KeyRef& key, const ValueRef& operand, FDBMutationType operationType) {
368 fdb_transaction_atomic_op( tr, key.begin(), key.size(), operand.begin(), operand.size(), operationType );
369 }
370
set(const KeyRef & key,const ValueRef & value)371 void TransactionImpl::set(const KeyRef& key, const ValueRef& value) {
372 fdb_transaction_set( tr, key.begin(), key.size(), value.begin(), value.size() );
373 }
374
clear(const KeyRangeRef & range)375 void TransactionImpl::clear(const KeyRangeRef& range) {
376 fdb_transaction_clear_range( tr, range.begin.begin(), range.begin.size(), range.end.begin(), range.end.size() );
377 }
378
clear(const KeyRef & key)379 void TransactionImpl::clear(const KeyRef& key) {
380 fdb_transaction_clear( tr, key.begin(), key.size() );
381 }
382
commit()383 Future<Void> TransactionImpl::commit() {
384 return backToFuture< Void >( fdb_transaction_commit( tr ), [](Reference<CFuture> f) {
385 throw_on_error( fdb_future_get_error( f->f ) );
386 return Void();
387 } );
388 }
389
getCommittedVersion()390 Version TransactionImpl::getCommittedVersion() {
391 Version v;
392
393 throw_on_error( fdb_transaction_get_committed_version( tr, &v ) );
394 return v;
395 }
396
getVersionstamp()397 Future<FDBStandalone<StringRef>> TransactionImpl::getVersionstamp() {
398 return backToFuture<FDBStandalone<KeyRef>>(fdb_transaction_get_versionstamp(tr), [](Reference<CFuture> f) {
399 uint8_t const* key;
400 int key_length;
401
402 throw_on_error( fdb_future_get_key( f->f, &key, &key_length ) );
403
404 return FDBStandalone<StringRef>( f, StringRef( key, key_length ) );
405 });
406 }
407
setOption(FDBTransactionOption option,Optional<StringRef> value)408 void TransactionImpl::setOption(FDBTransactionOption option, Optional<StringRef> value) {
409 if ( value.present() ) {
410 throw_on_error( fdb_transaction_set_option( tr, option, value.get().begin(), value.get().size() ) );
411 } else {
412 throw_on_error( fdb_transaction_set_option( tr, option, NULL, 0 ) );
413 }
414 }
415
onError(Error const & e)416 Future<Void> TransactionImpl::onError(Error const& e) {
417 return backToFuture< Void >( fdb_transaction_on_error( tr, e.code() ), [](Reference<CFuture> f) {
418 throw_on_error( fdb_future_get_error( f->f ) );
419 return Void();
420 } );
421 }
422
cancel()423 void TransactionImpl::cancel() {
424 fdb_transaction_cancel( tr );
425 }
426
reset()427 void TransactionImpl::reset() {
428 fdb_transaction_reset( tr );
429 }
430
printable(const StringRef & val)431 std::string printable( const StringRef& val ) {
432 std::string s;
433 for(int i=0; i<val.size(); i++) {
434 uint8_t b = val[i];
435 if (b >= 32 && b < 127 && b != '\\') s += (char)b;
436 else if (b == '\\') s += "\\\\";
437 else s += format("\\x%02x", b);
438 }
439 return s;
440 }
441
442 }
443