1 /*
2  * ReadYourWrites.h
3  *
4  * This source file is part of the FoundationDB open source project
5  *
6  * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *     http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  */
20 
21 #ifndef FDBCLIENT_READYOURWRITES_H
22 #define FDBCLIENT_READYOURWRITES_H
23 #pragma once
24 
25 #include "fdbclient/NativeAPI.actor.h"
26 #include "fdbclient/KeyRangeMap.h"
27 #include "fdbclient/RYWIterator.h"
28 #include <list>
29 
30 //SOMEDAY: Optimize getKey to avoid using getRange
31 
32 struct ReadYourWritesTransactionOptions {
33 	bool readYourWritesDisabled : 1;
34 	bool readAheadDisabled : 1;
35 	bool readSystemKeys : 1;
36 	bool writeSystemKeys : 1;
37 	bool nextWriteDisableConflictRange : 1;
38 	bool debugRetryLogging : 1;
39 	bool disableUsedDuringCommitProtection : 1;
40 	double timeoutInSeconds;
41 	int maxRetries;
42 	int snapshotRywEnabled;
43 
ReadYourWritesTransactionOptionsReadYourWritesTransactionOptions44 	ReadYourWritesTransactionOptions() {}
45 	explicit ReadYourWritesTransactionOptions(Transaction const& tr);
46 	void reset(Transaction const& tr);
47 	void fullReset(Transaction const& tr);
48 	bool getAndResetWriteConflictDisabled();
49 };
50 
51 struct TransactionDebugInfo : public ReferenceCounted<TransactionDebugInfo> {
52 	std::string transactionName;
53 	double lastRetryLogTime;
54 
TransactionDebugInfoTransactionDebugInfo55 	TransactionDebugInfo() : transactionName(""), lastRetryLogTime() { }
56 };
57 
58 //Values returned by a ReadYourWritesTransaction will contain a reference to the transaction's arena. Therefore, keeping a reference to a value
59 //longer than its creating transaction would hold all of the memory generated by the transaction
60 class ReadYourWritesTransaction : NonCopyable, public ReferenceCounted<ReadYourWritesTransaction>, public FastAllocated<ReadYourWritesTransaction> {
61 public:
allocateOnForeignThread()62 	static ReadYourWritesTransaction* allocateOnForeignThread() {
63 		ReadYourWritesTransaction *tr = (ReadYourWritesTransaction*)ReadYourWritesTransaction::operator new( sizeof(ReadYourWritesTransaction) );
64 		tr->tr.preinitializeOnForeignThread();
65 		return tr;
66 	}
67 
68 	explicit ReadYourWritesTransaction( Database const& cx );
69 	~ReadYourWritesTransaction();
70 
setVersion(Version v)71 	void setVersion( Version v ) { tr.setVersion(v); }
72 	Future<Version> getReadVersion();
73 	Future< Optional<Value> > get( const Key& key, bool snapshot = false );
74 	Future< Key > getKey( const KeySelector& key, bool snapshot = false );
75 	Future< Standalone<RangeResultRef> > getRange( const KeySelector& begin, const KeySelector& end, int limit, bool snapshot = false, bool reverse = false );
76 	Future< Standalone<RangeResultRef> > getRange( KeySelector begin, KeySelector end, GetRangeLimits limits, bool snapshot = false, bool reverse = false );
77 	Future< Standalone<RangeResultRef> > getRange( const KeyRange& keys, int limit, bool snapshot = false, bool reverse = false ) {
78 		return getRange( KeySelector( firstGreaterOrEqual(keys.begin), keys.arena() ),
79 			KeySelector( firstGreaterOrEqual(keys.end), keys.arena() ), limit, snapshot, reverse );
80 	}
81 	Future< Standalone<RangeResultRef> > getRange( const KeyRange& keys, GetRangeLimits limits, bool snapshot = false, bool reverse = false ) {
82 		return getRange( KeySelector( firstGreaterOrEqual(keys.begin), keys.arena() ),
83 			KeySelector( firstGreaterOrEqual(keys.end), keys.arena() ), limits, snapshot, reverse );
84 	}
85 
86 	Future< Standalone<VectorRef<const char*>> > getAddressesForKey(const Key& key);
87 
88 	void addReadConflictRange( KeyRangeRef const& keys );
makeSelfConflicting()89 	void makeSelfConflicting() { tr.makeSelfConflicting(); }
90 
91 	void atomicOp( const KeyRef& key, const ValueRef& operand, uint32_t operationType );
92 	void set( const KeyRef& key, const ValueRef& value );
93 	void clear( const KeyRangeRef& range );
94 	void clear( const KeyRef& key );
95 
96 	Future<Void> watch(const Key& key);
97 
98 	void addWriteConflictRange( KeyRangeRef const& keys );
99 
100 	Future<Void> commit();
getCommittedVersion()101 	Version getCommittedVersion() { return tr.getCommittedVersion(); }
102 	Future<Standalone<StringRef>> getVersionstamp();
103 
104 	void setOption( FDBTransactionOptions::Option option, Optional<StringRef> value = Optional<StringRef>() );
105 
106 	Future<Void> onError( Error const& e );
107 
108 	// These are to permit use as state variables in actors:
ReadYourWritesTransaction()109 	ReadYourWritesTransaction() : cache(&arena), writes(&arena) {}
110 	void operator=(ReadYourWritesTransaction&& r) BOOST_NOEXCEPT;
111 	ReadYourWritesTransaction(ReadYourWritesTransaction&& r) BOOST_NOEXCEPT;
112 
addref()113 	virtual void addref() { ReferenceCounted<ReadYourWritesTransaction>::addref(); }
delref()114 	virtual void delref() { ReferenceCounted<ReadYourWritesTransaction>::delref(); }
115 
116 	void cancel();
117 	void reset();
debugTransaction(UID dID)118 	void debugTransaction(UID dID) { tr.debugTransaction(dID); }
119 
debug_onIdle()120 	Future<Void> debug_onIdle() {  return reading; }
121 
122 	// Used by ThreadSafeTransaction for exceptions thrown in void methods
123 	Error deferredError;
124 
checkDeferredError()125 	void checkDeferredError() { tr.checkDeferredError(); if (deferredError.code() != invalid_error_code) throw deferredError; }
126 
127 	void getWriteConflicts( KeyRangeMap<bool> *result );
128 
getDatabase()129 	Database getDatabase() {
130 		return tr.getDatabase();
131 	}
132 private:
133 	friend class RYWImpl;
134 
135 	Arena arena;
136 	Transaction tr;
137 	SnapshotCache cache;
138 	WriteMap writes;
139 	CoalescedKeyRefRangeMap<bool> readConflicts;
140 	Map<Key, std::vector<Reference<Watch>>> watchMap;                      // Keys that are being watched in this transaction
141 	Promise<Void> resetPromise;
142 	AndFuture reading;
143 	int retries;
144 	Future<Void> timeoutActor;
145 	double creationTime;
146 	bool commitStarted;
147 
148 	Reference<TransactionDebugInfo> transactionDebugInfo;
149 
150 	void resetTimeout();
151 	void updateConflictMap( KeyRef const& key, WriteMap::iterator& it ); // pre: it.segmentContains(key)
152 	void updateConflictMap( KeyRangeRef const& keys, WriteMap::iterator& it ); // pre: it.segmentContains(keys.begin), keys are already inside this->arena
153 	void writeRangeToNativeTransaction( KeyRangeRef const& keys );
154 
155 	void resetRyow(); // doesn't reset the encapsulated transaction, or creation time/retry state
156 	KeyRef getMaxReadKey();
157 	KeyRef getMaxWriteKey();
158 
159 	bool checkUsedDuringCommit();
160 
161 	void debugLogRetries(Optional<Error> error = Optional<Error>());
162 
163 	ReadYourWritesTransactionOptions options;
164 };
165 
166 #endif
167