1 /* 2 * NativeAPI.actor.h 3 * 4 * This source file is part of the FoundationDB open source project 5 * 6 * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors 7 * 8 * Licensed under the Apache License, Version 2.0 (the "License"); 9 * you may not use this file except in compliance with the License. 10 * You may obtain a copy of the License at 11 * 12 * http://www.apache.org/licenses/LICENSE-2.0 13 * 14 * Unless required by applicable law or agreed to in writing, software 15 * distributed under the License is distributed on an "AS IS" BASIS, 16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 17 * See the License for the specific language governing permissions and 18 * limitations under the License. 19 */ 20 21 #pragma once 22 #if defined(NO_INTELLISENSE) && !defined(FDBCLIENT_NATIVEAPI_ACTOR_G_H) 23 #define FDBCLIENT_NATIVEAPI_ACTOR_G_H 24 #include "fdbclient/NativeAPI.actor.g.h" 25 #elif !defined(FDBCLIENT_NATIVEAPI_ACTOR_H) 26 #define FDBCLIENT_NATIVEAPI_ACTOR_H 27 28 29 #include "flow/flow.h" 30 #include "flow/TDMetric.actor.h" 31 #include "fdbclient/FDBTypes.h" 32 #include "fdbclient/MasterProxyInterface.h" 33 #include "fdbclient/FDBOptions.g.h" 34 #include "fdbclient/CoordinationInterface.h" 35 #include "fdbclient/ClusterInterface.h" 36 #include "fdbclient/ClientLogEvents.h" 37 #include "flow/actorcompiler.h" // has to be last include 38 39 // Incomplete types that are reference counted 40 class DatabaseContext; 41 template <> void addref( DatabaseContext* ptr ); 42 template <> void delref( DatabaseContext* ptr ); 43 44 void validateOptionValue(Optional<StringRef> value, bool shouldBePresent); 45 46 void enableClientInfoLogging(); 47 48 struct NetworkOptions { 49 std::string localAddress; 50 std::string clusterFile; 51 Optional<std::string> traceDirectory; 52 uint64_t traceRollSize; 53 uint64_t traceMaxLogsSize; 54 std::string traceLogGroup; 55 std::string traceFormat; 56 Optional<bool> logClientInfo; 57 Standalone<VectorRef<ClientVersionRef>> supportedVersions; 58 bool slowTaskProfilingEnabled; 59 60 // The default values, TRACE_DEFAULT_ROLL_SIZE and TRACE_DEFAULT_MAX_LOGS_SIZE are located in Trace.h. NetworkOptionsNetworkOptions61 NetworkOptions() 62 : localAddress(""), clusterFile(""), traceDirectory(Optional<std::string>()), 63 traceRollSize(TRACE_DEFAULT_ROLL_SIZE), traceMaxLogsSize(TRACE_DEFAULT_MAX_LOGS_SIZE), traceLogGroup("default"), 64 traceFormat("xml"), slowTaskProfilingEnabled(false) {} 65 }; 66 67 class Database { 68 public: 69 enum { API_VERSION_LATEST = -1 }; 70 71 static Database createDatabase( Reference<ClusterConnectionFile> connFile, int apiVersion, LocalityData const& clientLocality=LocalityData(), DatabaseContext *preallocatedDb=nullptr ); 72 static Database createDatabase( std::string connFileName, int apiVersion, LocalityData const& clientLocality=LocalityData() ); 73 Database()74 Database() {} // an uninitialized database can be destructed or reassigned safely; that's it 75 void operator= ( Database const& rhs ) { db = rhs.db; } Database(Database const & rhs)76 Database( Database const& rhs ) : db(rhs.db) {} Database(Database && r)77 Database(Database&& r) BOOST_NOEXCEPT : db(std::move(r.db)) {} 78 void operator= (Database&& r) BOOST_NOEXCEPT { db = std::move(r.db); } 79 80 // For internal use by the native client: Database(Reference<DatabaseContext> cx)81 explicit Database(Reference<DatabaseContext> cx) : db(cx) {} Database(DatabaseContext * cx)82 explicit Database( DatabaseContext* cx ) : db(cx) {} getPtr()83 inline DatabaseContext* getPtr() const { return db.getPtr(); } extractPtr()84 inline DatabaseContext* extractPtr() { return db.extractPtr(); } 85 DatabaseContext* operator->() const { return db.getPtr(); } 86 87 private: 88 Reference<DatabaseContext> db; 89 }; 90 91 void setNetworkOption(FDBNetworkOptions::Option option, Optional<StringRef> value = Optional<StringRef>() ); 92 93 // Configures the global networking machinery 94 void setupNetwork(uint64_t transportId = 0, bool useMetrics = false); 95 96 // This call blocks while the network is running. To use the API in a single-threaded 97 // environment, the calling program must have ACTORs already launched that are waiting 98 // to use the network. In this case, the program can terminate by calling stopNetwork() 99 // from a callback, thereby releasing this call to return. In a multithreaded setup 100 // this call can be called from a dedicated "networking" thread. All the network-based 101 // callbacks will happen on this second thread. When a program is finished, the 102 // call stopNetwork (from a non-networking thread) can cause the runNetwork() call to 103 // return. 104 // 105 // Throws network_already_setup if g_network has already been initalized 106 void runNetwork(); 107 108 // See above. Can be called from a thread that is not the "networking thread" 109 // 110 // Throws network_not_setup if g_network has not been initalized 111 void stopNetwork(); 112 113 /* 114 * Starts and holds the monitorLeader and failureMonitorClient actors 115 */ 116 class Cluster : public ReferenceCounted<Cluster>, NonCopyable { 117 public: 118 Cluster(Reference<ClusterConnectionFile> connFile, Reference<AsyncVar<int>> connectedCoordinatorsNum, int apiVersion=Database::API_VERSION_LATEST); 119 Cluster(Reference<ClusterConnectionFile> connFile, Reference<AsyncVar<Optional<struct ClusterInterface>>> clusterInterface, Reference<AsyncVar<int>> connectedCoordinatorsNum); 120 121 ~Cluster(); 122 123 Reference<AsyncVar<Optional<struct ClusterInterface>>> getClusterInterface(); getConnectionFile()124 Reference<ClusterConnectionFile> getConnectionFile() { return connectionFile; } 125 126 Future<Void> onConnected(); 127 128 private: 129 void init(Reference<ClusterConnectionFile> connFile, bool startClientInfoMonitor, Reference<AsyncVar<int>> connectedCoordinatorsNum, int apiVersion=Database::API_VERSION_LATEST); 130 131 Reference<AsyncVar<Optional<struct ClusterInterface>>> clusterInterface; 132 Reference<ClusterConnectionFile> connectionFile; 133 134 Future<Void> leaderMon; 135 Future<Void> failMon; 136 Future<Void> connected; 137 }; 138 139 struct StorageMetrics; 140 141 struct TransactionOptions { 142 double maxBackoff; 143 uint32_t getReadVersionFlags; 144 uint32_t customTransactionSizeLimit; 145 bool checkWritesEnabled : 1; 146 bool causalWriteRisky : 1; 147 bool commitOnFirstProxy : 1; 148 bool debugDump : 1; 149 bool lockAware : 1; 150 bool readOnly : 1; 151 bool firstInBatch : 1; 152 153 TransactionOptions(Database const& cx); 154 TransactionOptions(); 155 156 void reset(Database const& cx); 157 }; 158 159 struct TransactionInfo { 160 Optional<UID> debugID; 161 int taskID; 162 bool useProvisionalProxies; 163 TransactionInfoTransactionInfo164 explicit TransactionInfo( int taskID ) : taskID(taskID), useProvisionalProxies(false) {} 165 }; 166 167 struct TransactionLogInfo : public ReferenceCounted<TransactionLogInfo>, NonCopyable { 168 enum LoggingLocation { DONT_LOG = 0, TRACE_LOG = 1, DATABASE = 2 }; 169 TransactionLogInfoTransactionLogInfo170 TransactionLogInfo() : logLocation(DONT_LOG) {} TransactionLogInfoTransactionLogInfo171 TransactionLogInfo(LoggingLocation location) : logLocation(location) {} TransactionLogInfoTransactionLogInfo172 TransactionLogInfo(std::string id, LoggingLocation location) : logLocation(location), identifier(id) {} 173 setIdentifierTransactionLogInfo174 void setIdentifier(std::string id) { identifier = id; } logToTransactionLogInfo175 void logTo(LoggingLocation loc) { logLocation = logLocation | loc; } 176 template <typename T> addLogTransactionLogInfo177 void addLog(const T& event) { 178 if(logLocation & TRACE_LOG) { 179 ASSERT(!identifier.empty()) 180 event.logEvent(identifier); 181 } 182 183 if (flushed) { 184 return; 185 } 186 187 if(logLocation & DATABASE) { 188 logsAdded = true; 189 static_assert(std::is_base_of<FdbClientLogEvents::Event, T>::value, "Event should be derived class of FdbClientLogEvents::Event"); 190 trLogWriter << event; 191 } 192 } 193 194 BinaryWriter trLogWriter{ IncludeVersion() }; 195 bool logsAdded{ false }; 196 bool flushed{ false }; 197 int logLocation; 198 std::string identifier; 199 }; 200 201 struct Watch : public ReferenceCounted<Watch>, NonCopyable { 202 Key key; 203 Optional<Value> value; 204 bool valuePresent; 205 Optional<Value> setValue; 206 bool setPresent; 207 Promise<Void> onChangeTrigger; 208 Promise<Void> onSetWatchTrigger; 209 Future<Void> watchFuture; 210 WatchWatch211 Watch() : watchFuture(Never()), valuePresent(false), setPresent(false) { } WatchWatch212 Watch(Key key) : key(key), watchFuture(Never()), valuePresent(false), setPresent(false) { } WatchWatch213 Watch(Key key, Optional<Value> val) : key(key), value(val), watchFuture(Never()), valuePresent(true), setPresent(false) { } 214 215 void setWatch(Future<Void> watchFuture); 216 }; 217 218 class Transaction : NonCopyable { 219 public: 220 explicit Transaction( Database const& cx ); 221 ~Transaction(); 222 preinitializeOnForeignThread()223 void preinitializeOnForeignThread() { 224 committedVersion = invalidVersion; 225 } 226 227 void setVersion( Version v ); getReadVersion()228 Future<Version> getReadVersion() { return getReadVersion(0); } 229 230 Future< Optional<Value> > get( const Key& key, bool snapshot = false ); 231 Future< Void > watch( Reference<Watch> watch ); 232 Future< Key > getKey( const KeySelector& key, bool snapshot = false ); 233 //Future< Optional<KeyValue> > get( const KeySelectorRef& key ); 234 Future< Standalone<RangeResultRef> > getRange( const KeySelector& begin, const KeySelector& end, int limit, bool snapshot = false, bool reverse = false ); 235 Future< Standalone<RangeResultRef> > getRange( const KeySelector& begin, const KeySelector& end, GetRangeLimits limits, bool snapshot = false, bool reverse = false ); 236 Future< Standalone<RangeResultRef> > getRange( const KeyRange& keys, int limit, bool snapshot = false, bool reverse = false ) { 237 return getRange( KeySelector( firstGreaterOrEqual(keys.begin), keys.arena() ), 238 KeySelector( firstGreaterOrEqual(keys.end), keys.arena() ), limit, snapshot, reverse ); 239 } 240 Future< Standalone<RangeResultRef> > getRange( const KeyRange& keys, GetRangeLimits limits, bool snapshot = false, bool reverse = false ) { 241 return getRange( KeySelector( firstGreaterOrEqual(keys.begin), keys.arena() ), 242 KeySelector( firstGreaterOrEqual(keys.end), keys.arena() ), limits, snapshot, reverse ); 243 } 244 245 Future< Standalone<VectorRef< const char*>>> getAddressesForKey (const Key& key ); 246 247 void enableCheckWrites(); 248 void addReadConflictRange( KeyRangeRef const& keys ); 249 void addWriteConflictRange( KeyRangeRef const& keys ); 250 void makeSelfConflicting(); 251 252 Future< Void > warmRange( Database cx, KeyRange keys ); 253 254 Future< StorageMetrics > waitStorageMetrics( KeyRange const& keys, StorageMetrics const& min, StorageMetrics const& max, StorageMetrics const& permittedError, int shardLimit ); 255 Future< StorageMetrics > getStorageMetrics( KeyRange const& keys, int shardLimit ); 256 Future< Standalone<VectorRef<KeyRef>> > splitStorageMetrics( KeyRange const& keys, StorageMetrics const& limit, StorageMetrics const& estimated ); 257 258 // If checkWriteConflictRanges is true, existing write conflict ranges will be searched for this key 259 void set( const KeyRef& key, const ValueRef& value, bool addConflictRange = true ); 260 void atomicOp( const KeyRef& key, const ValueRef& value, MutationRef::Type operationType, bool addConflictRange = true ); 261 void clear( const KeyRangeRef& range, bool addConflictRange = true ); 262 void clear( const KeyRef& key, bool addConflictRange = true ); 263 Future<Void> commit(); // Throws not_committed or commit_unknown_result errors in normal operation 264 265 void setOption( FDBTransactionOptions::Option option, Optional<StringRef> value = Optional<StringRef>() ); 266 getCommittedVersion()267 Version getCommittedVersion() { return committedVersion; } // May be called only after commit() returns success 268 Future<Standalone<StringRef>> getVersionstamp(); // Will be fulfilled only after commit() returns success 269 270 Promise<Standalone<StringRef>> versionstampPromise; 271 272 Future<Void> onError( Error const& e ); 273 void flushTrLogsIfEnabled(); 274 275 // These are to permit use as state variables in actors: Transaction()276 Transaction() : info( TaskDefaultEndpoint ) {} 277 void operator=(Transaction&& r) BOOST_NOEXCEPT; 278 279 void reset(); 280 void fullReset(); 281 double getBackoff(int errCode); debugTransaction(UID dID)282 void debugTransaction(UID dID) { info.debugID = dID; } 283 284 Future<Void> commitMutations(); 285 void setupWatches(); 286 void cancelWatches(Error const& e = transaction_cancelled()); 287 288 TransactionInfo info; 289 int numErrors; 290 291 std::vector<Reference<Watch>> watches; 292 293 int apiVersionAtLeast(int minVersion) const; 294 295 void checkDeferredError(); 296 getDatabase()297 Database getDatabase() const { 298 return cx; 299 } 300 static Reference<TransactionLogInfo> createTrLogInfoProbabilistically(const Database& cx); 301 TransactionOptions options; 302 double startTime; 303 Reference<TransactionLogInfo> trLogInfo; 304 private: 305 Future<Version> getReadVersion(uint32_t flags); 306 void setPriority(uint32_t priorityFlag); 307 308 Database cx; 309 310 double backoff; 311 Version committedVersion; 312 CommitTransactionRequest tr; 313 Future<Version> readVersion; 314 Promise<Optional<Value>> metadataVersion; 315 vector<Future<std::pair<Key, Key>>> extraConflictRanges; 316 Promise<Void> commitResult; 317 Future<Void> committing; 318 }; 319 320 ACTOR Future<Version> waitForCommittedVersion(Database cx, Version version); 321 322 std::string unprintable( const std::string& ); 323 324 int64_t extractIntOption( Optional<StringRef> value, int64_t minValue = std::numeric_limits<int64_t>::min(), int64_t maxValue = std::numeric_limits<int64_t>::max() ); 325 326 #include "flow/unactorcompiler.h" 327 #endif 328