1 /*
2  * DatabaseContext.h
3  *
4  * This source file is part of the FoundationDB open source project
5  *
6  * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *     http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  */
20 
21 #ifndef DatabaseContext_h
22 #define DatabaseContext_h
23 #pragma once
24 
25 #include "fdbclient/NativeAPI.actor.h"
26 #include "fdbclient/KeyRangeMap.h"
27 #include "fdbclient/MasterProxyInterface.h"
28 #include "fdbclient/ClientDBInfo.h"
29 #include "fdbrpc/QueueModel.h"
30 #include "fdbrpc/MultiInterface.h"
31 #include "flow/TDMetric.actor.h"
32 #include "fdbclient/EventTypes.actor.h"
33 #include "fdbrpc/ContinuousSample.h"
34 
35 class StorageServerInfo : public ReferencedInterface<StorageServerInterface> {
36 public:
37 	static Reference<StorageServerInfo> getInterface( DatabaseContext *cx, StorageServerInterface const& interf, LocalityData const& locality );
38 	void notifyContextDestroyed();
39 
40 	virtual ~StorageServerInfo();
41 private:
42 	DatabaseContext *cx;
StorageServerInfo(DatabaseContext * cx,StorageServerInterface const & interf,LocalityData const & locality)43 	StorageServerInfo( DatabaseContext *cx, StorageServerInterface const& interf, LocalityData const& locality ) : cx(cx), ReferencedInterface<StorageServerInterface>(interf, locality) {}
44 };
45 
46 typedef MultiInterface<ReferencedInterface<StorageServerInterface>> LocationInfo;
47 typedef MultiInterface<MasterProxyInterface> ProxyInfo;
48 
49 class DatabaseContext : public ReferenceCounted<DatabaseContext>, public FastAllocated<DatabaseContext>, NonCopyable {
50 public:
allocateOnForeignThread()51 	static DatabaseContext* allocateOnForeignThread() {
52 		return (DatabaseContext*)DatabaseContext::operator new(sizeof(DatabaseContext));
53 	}
54 
55 	// For internal (fdbserver) use only
56 	static Database create( Reference<AsyncVar<Optional<ClusterInterface>>> clusterInterface, Reference<ClusterConnectionFile> connFile, LocalityData const& clientLocality );
57 	static Database create( Reference<AsyncVar<ClientDBInfo>> clientInfo, Future<Void> clientInfoMonitor, LocalityData clientLocality, bool enableLocalityLoadBalance, int taskID=TaskDefaultEndpoint, bool lockAware=false, int apiVersion=Database::API_VERSION_LATEST );
58 
59 	~DatabaseContext();
60 
clone()61 	Database clone() const { return Database(new DatabaseContext( cluster, clientInfo, clientInfoMonitor, dbId, taskID, clientLocality, enableLocalityLoadBalance, lockAware, apiVersion )); }
62 
63 	pair<KeyRange,Reference<LocationInfo>> getCachedLocation( const KeyRef&, bool isBackward = false );
64 	bool getCachedLocations( const KeyRangeRef&, vector<std::pair<KeyRange,Reference<LocationInfo>>>&, int limit, bool reverse );
65 	Reference<LocationInfo> setCachedLocation( const KeyRangeRef&, const vector<struct StorageServerInterface>& );
66 	void invalidateCache( const KeyRef&, bool isBackward = false );
67 	void invalidateCache( const KeyRangeRef& );
68 
69 	Reference<ProxyInfo> getMasterProxies(bool useProvisionalProxies);
70 	Future<Reference<ProxyInfo>> getMasterProxiesFuture(bool useProvisionalProxies);
71 	Future<Void> onMasterProxiesChanged();
72 	Future<HealthMetrics> getHealthMetrics(bool detailed);
73 
74 	// Update the watch counter for the database
75 	void addWatch();
76 	void removeWatch();
77 
78 	void setOption( FDBDatabaseOptions::Option option, Optional<StringRef> value );
79 
80 	Error deferredError;
81 	bool lockAware;
82 
isError()83 	bool isError() {
84 		return deferredError.code() != invalid_error_code;
85 	}
86 
checkDeferredError()87 	void checkDeferredError() {
88 		if(isError()) {
89 			throw deferredError;
90 		}
91 	}
92 
apiVersionAtLeast(int minVersion)93 	int apiVersionAtLeast(int minVersion) { return apiVersion < 0 || apiVersion >= minVersion; }
94 
95 	Future<Void> onConnected(); // Returns after a majority of coordination servers are available and have reported a leader. The cluster file therefore is valid, but the database might be unavailable.
96 	Reference<ClusterConnectionFile> getConnectionFile();
97 
98 //private:
99 	explicit DatabaseContext( Reference<Cluster> cluster, Reference<AsyncVar<ClientDBInfo>> clientDBInfo,
100 		Future<Void> clientInfoMonitor, Standalone<StringRef> dbId, int taskID, LocalityData const& clientLocality,
101 		bool enableLocalityLoadBalance, bool lockAware, int apiVersion = Database::API_VERSION_LATEST );
102 
103 	explicit DatabaseContext( const Error &err );
104 
105 	// Key DB-specific information
106 	AsyncTrigger masterProxiesChangeTrigger;
107 	Future<Void> monitorMasterProxiesInfoChange;
108 	Reference<ProxyInfo> masterProxies;
109 	bool provisional;
110 	UID masterProxiesLastChange;
111 	LocalityData clientLocality;
112 	QueueModel queueModel;
113 	bool enableLocalityLoadBalance;
114 
115 	// Transaction start request batching
116 	struct VersionBatcher {
117 		PromiseStream< std::pair< Promise<GetReadVersionReply>, Optional<UID> > > stream;
118 		Future<Void> actor;
119 	};
120 	std::map<uint32_t, VersionBatcher> versionBatcher;
121 
122 	// Client status updater
123 	struct ClientStatusUpdater {
124 		std::vector< std::pair<std::string, BinaryWriter> > inStatusQ;
125 		std::vector< std::pair<std::string, BinaryWriter> > outStatusQ;
126 		Future<Void> actor;
127 	};
128 	ClientStatusUpdater clientStatusUpdater;
129 
130 	// Cache of location information
131 	int locationCacheSize;
132 	CoalescedKeyRangeMap< Reference<LocationInfo> > locationCache;
133 
134 	std::map< UID, StorageServerInfo* > server_interf;
135 
136 	Standalone<StringRef> dbId;
137 
138 	int64_t transactionReadVersions;
139 	int64_t transactionLogicalReads;
140 	int64_t transactionPhysicalReads;
141 	int64_t transactionCommittedMutations;
142 	int64_t transactionCommittedMutationBytes;
143 	int64_t transactionsCommitStarted;
144 	int64_t transactionsCommitCompleted;
145 	int64_t transactionsTooOld;
146 	int64_t transactionsFutureVersions;
147 	int64_t transactionsNotCommitted;
148 	int64_t transactionsMaybeCommitted;
149 	int64_t transactionsResourceConstrained;
150 	int64_t transactionsProcessBehind;
151 	ContinuousSample<double> latencies, readLatencies, commitLatencies, GRVLatencies, mutationsPerCommit, bytesPerCommit;
152 
153 	int outstandingWatches;
154 	int maxOutstandingWatches;
155 
156 	double transactionTimeout;
157 	int transactionMaxRetries;
158 	double transactionMaxBackoff;
159 	int snapshotRywEnabled;
160 
161 	Future<Void> logger;
162 
163 	int taskID;
164 
165 	Int64MetricHandle getValueSubmitted;
166 	EventMetricHandle<GetValueComplete> getValueCompleted;
167 
168 	Reference<AsyncVar<ClientDBInfo>> clientInfo;
169 	Future<Void> clientInfoMonitor;
170 
171 	Reference<Cluster> cluster;
172 
173 	int apiVersion;
174 
175 	int mvCacheInsertLocation;
176 	std::vector<std::pair<Version, Optional<Value>>> metadataVersionCache;
177 
178 	HealthMetrics healthMetrics;
179 	double healthMetricsLastUpdated;
180 	double detailedHealthMetricsLastUpdated;
181 };
182 
183 #endif
184