1 /* 2 * ReadYourWrites.h 3 * 4 * This source file is part of the FoundationDB open source project 5 * 6 * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors 7 * 8 * Licensed under the Apache License, Version 2.0 (the "License"); 9 * you may not use this file except in compliance with the License. 10 * You may obtain a copy of the License at 11 * 12 * http://www.apache.org/licenses/LICENSE-2.0 13 * 14 * Unless required by applicable law or agreed to in writing, software 15 * distributed under the License is distributed on an "AS IS" BASIS, 16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 17 * See the License for the specific language governing permissions and 18 * limitations under the License. 19 */ 20 21 #ifndef FDBCLIENT_READYOURWRITES_H 22 #define FDBCLIENT_READYOURWRITES_H 23 #pragma once 24 25 #include "fdbclient/NativeAPI.actor.h" 26 #include "fdbclient/KeyRangeMap.h" 27 #include "fdbclient/RYWIterator.h" 28 #include <list> 29 30 //SOMEDAY: Optimize getKey to avoid using getRange 31 32 struct ReadYourWritesTransactionOptions { 33 bool readYourWritesDisabled : 1; 34 bool readAheadDisabled : 1; 35 bool readSystemKeys : 1; 36 bool writeSystemKeys : 1; 37 bool nextWriteDisableConflictRange : 1; 38 bool debugRetryLogging : 1; 39 bool disableUsedDuringCommitProtection : 1; 40 double timeoutInSeconds; 41 int maxRetries; 42 int snapshotRywEnabled; 43 ReadYourWritesTransactionOptionsReadYourWritesTransactionOptions44 ReadYourWritesTransactionOptions() {} 45 explicit ReadYourWritesTransactionOptions(Transaction const& tr); 46 void reset(Transaction const& tr); 47 void fullReset(Transaction const& tr); 48 bool getAndResetWriteConflictDisabled(); 49 }; 50 51 struct TransactionDebugInfo : public ReferenceCounted<TransactionDebugInfo> { 52 std::string transactionName; 53 double lastRetryLogTime; 54 TransactionDebugInfoTransactionDebugInfo55 TransactionDebugInfo() : transactionName(""), lastRetryLogTime() { } 56 }; 57 58 //Values returned by a ReadYourWritesTransaction will contain a reference to the transaction's arena. Therefore, keeping a reference to a value 59 //longer than its creating transaction would hold all of the memory generated by the transaction 60 class ReadYourWritesTransaction : NonCopyable, public ReferenceCounted<ReadYourWritesTransaction>, public FastAllocated<ReadYourWritesTransaction> { 61 public: allocateOnForeignThread()62 static ReadYourWritesTransaction* allocateOnForeignThread() { 63 ReadYourWritesTransaction *tr = (ReadYourWritesTransaction*)ReadYourWritesTransaction::operator new( sizeof(ReadYourWritesTransaction) ); 64 tr->tr.preinitializeOnForeignThread(); 65 return tr; 66 } 67 68 explicit ReadYourWritesTransaction( Database const& cx ); 69 ~ReadYourWritesTransaction(); 70 setVersion(Version v)71 void setVersion( Version v ) { tr.setVersion(v); } 72 Future<Version> getReadVersion(); 73 Future< Optional<Value> > get( const Key& key, bool snapshot = false ); 74 Future< Key > getKey( const KeySelector& key, bool snapshot = false ); 75 Future< Standalone<RangeResultRef> > getRange( const KeySelector& begin, const KeySelector& end, int limit, bool snapshot = false, bool reverse = false ); 76 Future< Standalone<RangeResultRef> > getRange( KeySelector begin, KeySelector end, GetRangeLimits limits, bool snapshot = false, bool reverse = false ); 77 Future< Standalone<RangeResultRef> > getRange( const KeyRange& keys, int limit, bool snapshot = false, bool reverse = false ) { 78 return getRange( KeySelector( firstGreaterOrEqual(keys.begin), keys.arena() ), 79 KeySelector( firstGreaterOrEqual(keys.end), keys.arena() ), limit, snapshot, reverse ); 80 } 81 Future< Standalone<RangeResultRef> > getRange( const KeyRange& keys, GetRangeLimits limits, bool snapshot = false, bool reverse = false ) { 82 return getRange( KeySelector( firstGreaterOrEqual(keys.begin), keys.arena() ), 83 KeySelector( firstGreaterOrEqual(keys.end), keys.arena() ), limits, snapshot, reverse ); 84 } 85 86 Future< Standalone<VectorRef<const char*>> > getAddressesForKey(const Key& key); 87 88 void addReadConflictRange( KeyRangeRef const& keys ); makeSelfConflicting()89 void makeSelfConflicting() { tr.makeSelfConflicting(); } 90 91 void atomicOp( const KeyRef& key, const ValueRef& operand, uint32_t operationType ); 92 void set( const KeyRef& key, const ValueRef& value ); 93 void clear( const KeyRangeRef& range ); 94 void clear( const KeyRef& key ); 95 96 Future<Void> watch(const Key& key); 97 98 void addWriteConflictRange( KeyRangeRef const& keys ); 99 100 Future<Void> commit(); getCommittedVersion()101 Version getCommittedVersion() { return tr.getCommittedVersion(); } 102 Future<Standalone<StringRef>> getVersionstamp(); 103 104 void setOption( FDBTransactionOptions::Option option, Optional<StringRef> value = Optional<StringRef>() ); 105 106 Future<Void> onError( Error const& e ); 107 108 // These are to permit use as state variables in actors: ReadYourWritesTransaction()109 ReadYourWritesTransaction() : cache(&arena), writes(&arena) {} 110 void operator=(ReadYourWritesTransaction&& r) BOOST_NOEXCEPT; 111 ReadYourWritesTransaction(ReadYourWritesTransaction&& r) BOOST_NOEXCEPT; 112 addref()113 virtual void addref() { ReferenceCounted<ReadYourWritesTransaction>::addref(); } delref()114 virtual void delref() { ReferenceCounted<ReadYourWritesTransaction>::delref(); } 115 116 void cancel(); 117 void reset(); debugTransaction(UID dID)118 void debugTransaction(UID dID) { tr.debugTransaction(dID); } 119 debug_onIdle()120 Future<Void> debug_onIdle() { return reading; } 121 122 // Used by ThreadSafeTransaction for exceptions thrown in void methods 123 Error deferredError; 124 checkDeferredError()125 void checkDeferredError() { tr.checkDeferredError(); if (deferredError.code() != invalid_error_code) throw deferredError; } 126 127 void getWriteConflicts( KeyRangeMap<bool> *result ); 128 getDatabase()129 Database getDatabase() { 130 return tr.getDatabase(); 131 } 132 private: 133 friend class RYWImpl; 134 135 Arena arena; 136 Transaction tr; 137 SnapshotCache cache; 138 WriteMap writes; 139 CoalescedKeyRefRangeMap<bool> readConflicts; 140 Map<Key, std::vector<Reference<Watch>>> watchMap; // Keys that are being watched in this transaction 141 Promise<Void> resetPromise; 142 AndFuture reading; 143 int retries; 144 Future<Void> timeoutActor; 145 double creationTime; 146 bool commitStarted; 147 148 Reference<TransactionDebugInfo> transactionDebugInfo; 149 150 void resetTimeout(); 151 void updateConflictMap( KeyRef const& key, WriteMap::iterator& it ); // pre: it.segmentContains(key) 152 void updateConflictMap( KeyRangeRef const& keys, WriteMap::iterator& it ); // pre: it.segmentContains(keys.begin), keys are already inside this->arena 153 void writeRangeToNativeTransaction( KeyRangeRef const& keys ); 154 155 void resetRyow(); // doesn't reset the encapsulated transaction, or creation time/retry state 156 KeyRef getMaxReadKey(); 157 KeyRef getMaxWriteKey(); 158 159 bool checkUsedDuringCommit(); 160 161 void debugLogRetries(Optional<Error> error = Optional<Error>()); 162 163 ReadYourWritesTransactionOptions options; 164 }; 165 166 #endif 167