1 /* 2 * DatabaseContext.h 3 * 4 * This source file is part of the FoundationDB open source project 5 * 6 * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors 7 * 8 * Licensed under the Apache License, Version 2.0 (the "License"); 9 * you may not use this file except in compliance with the License. 10 * You may obtain a copy of the License at 11 * 12 * http://www.apache.org/licenses/LICENSE-2.0 13 * 14 * Unless required by applicable law or agreed to in writing, software 15 * distributed under the License is distributed on an "AS IS" BASIS, 16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 17 * See the License for the specific language governing permissions and 18 * limitations under the License. 19 */ 20 21 #ifndef DatabaseContext_h 22 #define DatabaseContext_h 23 #pragma once 24 25 #include "fdbclient/NativeAPI.actor.h" 26 #include "fdbclient/KeyRangeMap.h" 27 #include "fdbclient/MasterProxyInterface.h" 28 #include "fdbclient/ClientDBInfo.h" 29 #include "fdbrpc/QueueModel.h" 30 #include "fdbrpc/MultiInterface.h" 31 #include "flow/TDMetric.actor.h" 32 #include "fdbclient/EventTypes.actor.h" 33 #include "fdbrpc/ContinuousSample.h" 34 35 class StorageServerInfo : public ReferencedInterface<StorageServerInterface> { 36 public: 37 static Reference<StorageServerInfo> getInterface( DatabaseContext *cx, StorageServerInterface const& interf, LocalityData const& locality ); 38 void notifyContextDestroyed(); 39 40 virtual ~StorageServerInfo(); 41 private: 42 DatabaseContext *cx; StorageServerInfo(DatabaseContext * cx,StorageServerInterface const & interf,LocalityData const & locality)43 StorageServerInfo( DatabaseContext *cx, StorageServerInterface const& interf, LocalityData const& locality ) : cx(cx), ReferencedInterface<StorageServerInterface>(interf, locality) {} 44 }; 45 46 typedef MultiInterface<ReferencedInterface<StorageServerInterface>> LocationInfo; 47 typedef MultiInterface<MasterProxyInterface> ProxyInfo; 48 49 class DatabaseContext : public ReferenceCounted<DatabaseContext>, public FastAllocated<DatabaseContext>, NonCopyable { 50 public: allocateOnForeignThread()51 static DatabaseContext* allocateOnForeignThread() { 52 return (DatabaseContext*)DatabaseContext::operator new(sizeof(DatabaseContext)); 53 } 54 55 // For internal (fdbserver) use only 56 static Database create( Reference<AsyncVar<Optional<ClusterInterface>>> clusterInterface, Reference<ClusterConnectionFile> connFile, LocalityData const& clientLocality ); 57 static Database create( Reference<AsyncVar<ClientDBInfo>> clientInfo, Future<Void> clientInfoMonitor, LocalityData clientLocality, bool enableLocalityLoadBalance, int taskID=TaskDefaultEndpoint, bool lockAware=false, int apiVersion=Database::API_VERSION_LATEST ); 58 59 ~DatabaseContext(); 60 clone()61 Database clone() const { return Database(new DatabaseContext( cluster, clientInfo, clientInfoMonitor, dbId, taskID, clientLocality, enableLocalityLoadBalance, lockAware, apiVersion )); } 62 63 pair<KeyRange,Reference<LocationInfo>> getCachedLocation( const KeyRef&, bool isBackward = false ); 64 bool getCachedLocations( const KeyRangeRef&, vector<std::pair<KeyRange,Reference<LocationInfo>>>&, int limit, bool reverse ); 65 Reference<LocationInfo> setCachedLocation( const KeyRangeRef&, const vector<struct StorageServerInterface>& ); 66 void invalidateCache( const KeyRef&, bool isBackward = false ); 67 void invalidateCache( const KeyRangeRef& ); 68 69 Reference<ProxyInfo> getMasterProxies(bool useProvisionalProxies); 70 Future<Reference<ProxyInfo>> getMasterProxiesFuture(bool useProvisionalProxies); 71 Future<Void> onMasterProxiesChanged(); 72 Future<HealthMetrics> getHealthMetrics(bool detailed); 73 74 // Update the watch counter for the database 75 void addWatch(); 76 void removeWatch(); 77 78 void setOption( FDBDatabaseOptions::Option option, Optional<StringRef> value ); 79 80 Error deferredError; 81 bool lockAware; 82 isError()83 bool isError() { 84 return deferredError.code() != invalid_error_code; 85 } 86 checkDeferredError()87 void checkDeferredError() { 88 if(isError()) { 89 throw deferredError; 90 } 91 } 92 apiVersionAtLeast(int minVersion)93 int apiVersionAtLeast(int minVersion) { return apiVersion < 0 || apiVersion >= minVersion; } 94 95 Future<Void> onConnected(); // Returns after a majority of coordination servers are available and have reported a leader. The cluster file therefore is valid, but the database might be unavailable. 96 Reference<ClusterConnectionFile> getConnectionFile(); 97 98 //private: 99 explicit DatabaseContext( Reference<Cluster> cluster, Reference<AsyncVar<ClientDBInfo>> clientDBInfo, 100 Future<Void> clientInfoMonitor, Standalone<StringRef> dbId, int taskID, LocalityData const& clientLocality, 101 bool enableLocalityLoadBalance, bool lockAware, int apiVersion = Database::API_VERSION_LATEST ); 102 103 explicit DatabaseContext( const Error &err ); 104 105 // Key DB-specific information 106 AsyncTrigger masterProxiesChangeTrigger; 107 Future<Void> monitorMasterProxiesInfoChange; 108 Reference<ProxyInfo> masterProxies; 109 bool provisional; 110 UID masterProxiesLastChange; 111 LocalityData clientLocality; 112 QueueModel queueModel; 113 bool enableLocalityLoadBalance; 114 115 // Transaction start request batching 116 struct VersionBatcher { 117 PromiseStream< std::pair< Promise<GetReadVersionReply>, Optional<UID> > > stream; 118 Future<Void> actor; 119 }; 120 std::map<uint32_t, VersionBatcher> versionBatcher; 121 122 // Client status updater 123 struct ClientStatusUpdater { 124 std::vector< std::pair<std::string, BinaryWriter> > inStatusQ; 125 std::vector< std::pair<std::string, BinaryWriter> > outStatusQ; 126 Future<Void> actor; 127 }; 128 ClientStatusUpdater clientStatusUpdater; 129 130 // Cache of location information 131 int locationCacheSize; 132 CoalescedKeyRangeMap< Reference<LocationInfo> > locationCache; 133 134 std::map< UID, StorageServerInfo* > server_interf; 135 136 Standalone<StringRef> dbId; 137 138 int64_t transactionReadVersions; 139 int64_t transactionLogicalReads; 140 int64_t transactionPhysicalReads; 141 int64_t transactionCommittedMutations; 142 int64_t transactionCommittedMutationBytes; 143 int64_t transactionsCommitStarted; 144 int64_t transactionsCommitCompleted; 145 int64_t transactionsTooOld; 146 int64_t transactionsFutureVersions; 147 int64_t transactionsNotCommitted; 148 int64_t transactionsMaybeCommitted; 149 int64_t transactionsResourceConstrained; 150 int64_t transactionsProcessBehind; 151 ContinuousSample<double> latencies, readLatencies, commitLatencies, GRVLatencies, mutationsPerCommit, bytesPerCommit; 152 153 int outstandingWatches; 154 int maxOutstandingWatches; 155 156 double transactionTimeout; 157 int transactionMaxRetries; 158 double transactionMaxBackoff; 159 int snapshotRywEnabled; 160 161 Future<Void> logger; 162 163 int taskID; 164 165 Int64MetricHandle getValueSubmitted; 166 EventMetricHandle<GetValueComplete> getValueCompleted; 167 168 Reference<AsyncVar<ClientDBInfo>> clientInfo; 169 Future<Void> clientInfoMonitor; 170 171 Reference<Cluster> cluster; 172 173 int apiVersion; 174 175 int mvCacheInsertLocation; 176 std::vector<std::pair<Version, Optional<Value>>> metadataVersionCache; 177 178 HealthMetrics healthMetrics; 179 double healthMetricsLastUpdated; 180 double detailedHealthMetricsLastUpdated; 181 }; 182 183 #endif 184