1 /*
2  * NativeAPI.actor.h
3  *
4  * This source file is part of the FoundationDB open source project
5  *
6  * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *     http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  */
20 
21 #pragma once
22 #if defined(NO_INTELLISENSE) && !defined(FDBCLIENT_NATIVEAPI_ACTOR_G_H)
23 	#define FDBCLIENT_NATIVEAPI_ACTOR_G_H
24 	#include "fdbclient/NativeAPI.actor.g.h"
25 #elif !defined(FDBCLIENT_NATIVEAPI_ACTOR_H)
26 	#define FDBCLIENT_NATIVEAPI_ACTOR_H
27 
28 
29 #include "flow/flow.h"
30 #include "flow/TDMetric.actor.h"
31 #include "fdbclient/FDBTypes.h"
32 #include "fdbclient/MasterProxyInterface.h"
33 #include "fdbclient/FDBOptions.g.h"
34 #include "fdbclient/CoordinationInterface.h"
35 #include "fdbclient/ClusterInterface.h"
36 #include "fdbclient/ClientLogEvents.h"
37 #include "flow/actorcompiler.h" // has to be last include
38 
// Incomplete types that are reference counted.
// DatabaseContext is only forward-declared in this header, and the addref/delref
// specializations for it are declared here without a body.
// NOTE(review): presumably these declarations are what allow Reference<DatabaseContext>
// (see Database::db) to be used without the full class definition -- confirm.
class DatabaseContext;
template <> void addref( DatabaseContext* ptr );
template <> void delref( DatabaseContext* ptr );
43 
// Declared here, defined elsewhere.
// NOTE(review): judging by the signature, this validates that `value` is present
// exactly when `shouldBePresent` is true; the behavior on mismatch is not visible
// from this header -- confirm against the definition.
void validateOptionValue(Optional<StringRef> value, bool shouldBePresent);

// Declared here, defined elsewhere.
// NOTE(review): presumably switches on client info logging -- confirm scope and
// side effects against the definition.
void enableClientInfoLogging();
47 
48 struct NetworkOptions {
49 	std::string localAddress;
50 	std::string clusterFile;
51 	Optional<std::string> traceDirectory;
52 	uint64_t traceRollSize;
53 	uint64_t traceMaxLogsSize;
54 	std::string traceLogGroup;
55 	std::string traceFormat;
56 	Optional<bool> logClientInfo;
57 	Standalone<VectorRef<ClientVersionRef>> supportedVersions;
58 	bool slowTaskProfilingEnabled;
59 
60 	// The default values, TRACE_DEFAULT_ROLL_SIZE and TRACE_DEFAULT_MAX_LOGS_SIZE are located in Trace.h.
NetworkOptionsNetworkOptions61 	NetworkOptions()
62 	  : localAddress(""), clusterFile(""), traceDirectory(Optional<std::string>()),
63 	    traceRollSize(TRACE_DEFAULT_ROLL_SIZE), traceMaxLogsSize(TRACE_DEFAULT_MAX_LOGS_SIZE), traceLogGroup("default"),
64 	    traceFormat("xml"), slowTaskProfilingEnabled(false) {}
65 };
66 
67 class Database {
68 public:
69 	enum { API_VERSION_LATEST = -1 };
70 
71 	static Database createDatabase( Reference<ClusterConnectionFile> connFile, int apiVersion, LocalityData const& clientLocality=LocalityData(), DatabaseContext *preallocatedDb=nullptr );
72 	static Database createDatabase( std::string connFileName, int apiVersion, LocalityData const& clientLocality=LocalityData() );
73 
Database()74 	Database() {}  // an uninitialized database can be destructed or reassigned safely; that's it
75 	void operator= ( Database const& rhs ) { db = rhs.db; }
Database(Database const & rhs)76 	Database( Database const& rhs ) : db(rhs.db) {}
Database(Database && r)77 	Database(Database&& r) BOOST_NOEXCEPT : db(std::move(r.db)) {}
78 	void operator= (Database&& r) BOOST_NOEXCEPT { db = std::move(r.db); }
79 
80 	// For internal use by the native client:
Database(Reference<DatabaseContext> cx)81 	explicit Database(Reference<DatabaseContext> cx) : db(cx) {}
Database(DatabaseContext * cx)82 	explicit Database( DatabaseContext* cx ) : db(cx) {}
getPtr()83 	inline DatabaseContext* getPtr() const { return db.getPtr(); }
extractPtr()84 	inline DatabaseContext* extractPtr() { return db.extractPtr(); }
85 	DatabaseContext* operator->() const { return db.getPtr(); }
86 
87 private:
88 	Reference<DatabaseContext> db;
89 };
90 
// Sets a network option. `value` defaults to an empty Optional for options that
// take no argument. Declared here, defined elsewhere.
void setNetworkOption(FDBNetworkOptions::Option option, Optional<StringRef> value = Optional<StringRef>() );
92 
// Configures the global networking machinery.
// transportId defaults to 0 and useMetrics defaults to false.
void setupNetwork(uint64_t transportId = 0, bool useMetrics = false);
95 
// This call blocks while the network is running.  To use the API in a single-threaded
//  environment, the calling program must have ACTORs already launched that are waiting
//  to use the network.  In this case, the program can terminate by calling stopNetwork()
//  from a callback, thereby releasing this call to return.  In a multithreaded setup
//  this call can be made from a dedicated "networking" thread.  All the network-based
//  callbacks will happen on this second thread.  When the program is finished, calling
//  stopNetwork() (from a non-networking thread) causes the runNetwork() call to
//  return.
//
// Throws network_already_setup if g_network has already been initialized
// NOTE(review): that condition reads like it belongs to setupNetwork(); confirm
//  against the definition which error runNetwork() actually throws.
void runNetwork();
107 
// Counterpart of runNetwork(); see the comment on runNetwork().  Can be called
// from a thread that is not the "networking thread".
//
// Throws network_not_setup if g_network has not been initialized
void stopNetwork();
112 
113 /*
114  * Starts and holds the monitorLeader and failureMonitorClient actors
115  */
116 class Cluster : public ReferenceCounted<Cluster>, NonCopyable {
117 public:
118 	Cluster(Reference<ClusterConnectionFile> connFile,  Reference<AsyncVar<int>> connectedCoordinatorsNum, int apiVersion=Database::API_VERSION_LATEST);
119 	Cluster(Reference<ClusterConnectionFile> connFile, Reference<AsyncVar<Optional<struct ClusterInterface>>> clusterInterface, Reference<AsyncVar<int>> connectedCoordinatorsNum);
120 
121 	~Cluster();
122 
123 	Reference<AsyncVar<Optional<struct ClusterInterface>>> getClusterInterface();
getConnectionFile()124 	Reference<ClusterConnectionFile> getConnectionFile() { return connectionFile; }
125 
126 	Future<Void> onConnected();
127 
128 private:
129 	void init(Reference<ClusterConnectionFile> connFile, bool startClientInfoMonitor, Reference<AsyncVar<int>> connectedCoordinatorsNum, int apiVersion=Database::API_VERSION_LATEST);
130 
131 	Reference<AsyncVar<Optional<struct ClusterInterface>>> clusterInterface;
132 	Reference<ClusterConnectionFile> connectionFile;
133 
134 	Future<Void> leaderMon;
135 	Future<Void> failMon;
136 	Future<Void> connected;
137 };
138 
// Forward declaration only. Within this header StorageMetrics appears solely in
// Transaction's storage-metrics method signatures (const-reference parameters and
// Future<StorageMetrics> return types).
struct StorageMetrics;
140 
// Per-transaction option values (stored in Transaction::options).
// The boolean flags are packed as 1-bit bit-fields. Both constructors and
// reset() are declared here and defined elsewhere.
struct TransactionOptions {
	double maxBackoff;
	uint32_t getReadVersionFlags;
	uint32_t customTransactionSizeLimit;
	bool checkWritesEnabled : 1;
	bool causalWriteRisky : 1;
	bool commitOnFirstProxy : 1;
	bool debugDump : 1;
	bool lockAware : 1;
	bool readOnly : 1;
	bool firstInBatch : 1;

	TransactionOptions(Database const& cx);
	TransactionOptions();

	// NOTE(review): presumably restores the default option values for database
	// `cx` -- confirm against the definition.
	void reset(Database const& cx);
};
158 
159 struct TransactionInfo {
160 	Optional<UID> debugID;
161 	int taskID;
162 	bool useProvisionalProxies;
163 
TransactionInfoTransactionInfo164 	explicit TransactionInfo( int taskID ) : taskID(taskID), useProvisionalProxies(false) {}
165 };
166 
167 struct TransactionLogInfo : public ReferenceCounted<TransactionLogInfo>, NonCopyable {
168 	enum LoggingLocation { DONT_LOG = 0, TRACE_LOG = 1, DATABASE = 2 };
169 
TransactionLogInfoTransactionLogInfo170 	TransactionLogInfo() : logLocation(DONT_LOG) {}
TransactionLogInfoTransactionLogInfo171 	TransactionLogInfo(LoggingLocation location) : logLocation(location) {}
TransactionLogInfoTransactionLogInfo172 	TransactionLogInfo(std::string id, LoggingLocation location) : logLocation(location), identifier(id) {}
173 
setIdentifierTransactionLogInfo174 	void setIdentifier(std::string id) { identifier = id; }
logToTransactionLogInfo175 	void logTo(LoggingLocation loc) { logLocation = logLocation | loc; }
176 	template <typename T>
addLogTransactionLogInfo177 	void addLog(const T& event) {
178 		if(logLocation & TRACE_LOG) {
179 			ASSERT(!identifier.empty())
180 			event.logEvent(identifier);
181 		}
182 
183 		if (flushed) {
184 			return;
185 		}
186 
187 		if(logLocation & DATABASE) {
188 			logsAdded = true;
189 			static_assert(std::is_base_of<FdbClientLogEvents::Event, T>::value, "Event should be derived class of FdbClientLogEvents::Event");
190 			trLogWriter << event;
191 		}
192 	}
193 
194 	BinaryWriter trLogWriter{ IncludeVersion() };
195 	bool logsAdded{ false };
196 	bool flushed{ false };
197 	int logLocation;
198 	std::string identifier;
199 };
200 
201 struct Watch : public ReferenceCounted<Watch>, NonCopyable {
202 	Key key;
203 	Optional<Value> value;
204 	bool valuePresent;
205 	Optional<Value> setValue;
206 	bool setPresent;
207 	Promise<Void> onChangeTrigger;
208 	Promise<Void> onSetWatchTrigger;
209 	Future<Void> watchFuture;
210 
WatchWatch211 	Watch() : watchFuture(Never()), valuePresent(false), setPresent(false) { }
WatchWatch212 	Watch(Key key) : key(key), watchFuture(Never()), valuePresent(false), setPresent(false) { }
WatchWatch213 	Watch(Key key, Optional<Value> val) : key(key), value(val), watchFuture(Never()), valuePresent(true), setPresent(false) { }
214 
215 	void setWatch(Future<Void> watchFuture);
216 };
217 
218 class Transaction : NonCopyable {
219 public:
220 	explicit Transaction( Database const& cx );
221 	~Transaction();
222 
preinitializeOnForeignThread()223 	void preinitializeOnForeignThread() {
224 		committedVersion = invalidVersion;
225 	}
226 
227 	void setVersion( Version v );
getReadVersion()228 	Future<Version> getReadVersion() { return getReadVersion(0); }
229 
230 	Future< Optional<Value> > get( const Key& key, bool snapshot = false );
231 	Future< Void > watch( Reference<Watch> watch );
232 	Future< Key > getKey( const KeySelector& key, bool snapshot = false );
233 	//Future< Optional<KeyValue> > get( const KeySelectorRef& key );
234 	Future< Standalone<RangeResultRef> > getRange( const KeySelector& begin, const KeySelector& end, int limit, bool snapshot = false, bool reverse = false );
235 	Future< Standalone<RangeResultRef> > getRange( const KeySelector& begin, const KeySelector& end, GetRangeLimits limits, bool snapshot = false, bool reverse = false );
236 	Future< Standalone<RangeResultRef> > getRange( const KeyRange& keys, int limit, bool snapshot = false, bool reverse = false ) {
237 		return getRange( KeySelector( firstGreaterOrEqual(keys.begin), keys.arena() ),
238 			KeySelector( firstGreaterOrEqual(keys.end), keys.arena() ), limit, snapshot, reverse );
239 	}
240 	Future< Standalone<RangeResultRef> > getRange( const KeyRange& keys, GetRangeLimits limits, bool snapshot = false, bool reverse = false ) {
241 		return getRange( KeySelector( firstGreaterOrEqual(keys.begin), keys.arena() ),
242 			KeySelector( firstGreaterOrEqual(keys.end), keys.arena() ), limits, snapshot, reverse );
243 	}
244 
245 	Future< Standalone<VectorRef< const char*>>> getAddressesForKey (const Key& key );
246 
247 	void enableCheckWrites();
248 	void addReadConflictRange( KeyRangeRef const& keys );
249 	void addWriteConflictRange( KeyRangeRef const& keys );
250 	void makeSelfConflicting();
251 
252 	Future< Void > warmRange( Database cx, KeyRange keys );
253 
254 	Future< StorageMetrics > waitStorageMetrics( KeyRange const& keys, StorageMetrics const& min, StorageMetrics const& max, StorageMetrics const& permittedError, int shardLimit );
255 	Future< StorageMetrics > getStorageMetrics( KeyRange const& keys, int shardLimit );
256 	Future< Standalone<VectorRef<KeyRef>> > splitStorageMetrics( KeyRange const& keys, StorageMetrics const& limit, StorageMetrics const& estimated );
257 
258 	// If checkWriteConflictRanges is true, existing write conflict ranges will be searched for this key
259 	void set( const KeyRef& key, const ValueRef& value, bool addConflictRange = true );
260 	void atomicOp( const KeyRef& key, const ValueRef& value, MutationRef::Type operationType, bool addConflictRange = true );
261 	void clear( const KeyRangeRef& range, bool addConflictRange = true );
262 	void clear( const KeyRef& key, bool addConflictRange = true );
263 	Future<Void> commit(); // Throws not_committed or commit_unknown_result errors in normal operation
264 
265 	void setOption( FDBTransactionOptions::Option option, Optional<StringRef> value = Optional<StringRef>() );
266 
getCommittedVersion()267 	Version getCommittedVersion() { return committedVersion; }   // May be called only after commit() returns success
268 	Future<Standalone<StringRef>> getVersionstamp(); // Will be fulfilled only after commit() returns success
269 
270 	Promise<Standalone<StringRef>> versionstampPromise;
271 
272 	Future<Void> onError( Error const& e );
273 	void flushTrLogsIfEnabled();
274 
275 	// These are to permit use as state variables in actors:
Transaction()276 	Transaction() : info( TaskDefaultEndpoint ) {}
277 	void operator=(Transaction&& r) BOOST_NOEXCEPT;
278 
279 	void reset();
280 	void fullReset();
281 	double getBackoff(int errCode);
debugTransaction(UID dID)282 	void debugTransaction(UID dID) { info.debugID = dID; }
283 
284 	Future<Void> commitMutations();
285 	void setupWatches();
286 	void cancelWatches(Error const& e = transaction_cancelled());
287 
288 	TransactionInfo info;
289 	int numErrors;
290 
291 	std::vector<Reference<Watch>> watches;
292 
293 	int apiVersionAtLeast(int minVersion) const;
294 
295 	void checkDeferredError();
296 
getDatabase()297 	Database getDatabase() const {
298 		return cx;
299 	}
300 	static Reference<TransactionLogInfo> createTrLogInfoProbabilistically(const Database& cx);
301 	TransactionOptions options;
302 	double startTime;
303 	Reference<TransactionLogInfo> trLogInfo;
304 private:
305 	Future<Version> getReadVersion(uint32_t flags);
306 	void setPriority(uint32_t priorityFlag);
307 
308 	Database cx;
309 
310 	double backoff;
311 	Version committedVersion;
312 	CommitTransactionRequest tr;
313 	Future<Version> readVersion;
314 	Promise<Optional<Value>> metadataVersion;
315 	vector<Future<std::pair<Key, Key>>> extraConflictRanges;
316 	Promise<Void> commitResult;
317 	Future<Void> committing;
318 };
319 
// Declared with the flow ACTOR keyword; defined elsewhere.
// NOTE(review): presumably the returned future becomes ready once `version` has
// been committed on `cx` -- confirm against the definition.
ACTOR Future<Version> waitForCommittedVersion(Database cx, Version version);

// Declared here, defined elsewhere.
// NOTE(review): presumably returns a copy of the input with non-printable bytes
// escaped -- confirm against the definition.
std::string unprintable( const std::string& );

// minValue and maxValue default to the full int64_t range.
// NOTE(review): presumably parses `value` as an integer and rejects results outside
// [minValue, maxValue]; the error behavior is not visible here -- confirm.
int64_t extractIntOption( Optional<StringRef> value, int64_t minValue = std::numeric_limits<int64_t>::min(), int64_t maxValue = std::numeric_limits<int64_t>::max() );
325 
326 #include "flow/unactorcompiler.h"
327 #endif
328