1 /*
2  * ReadYourWrites.actor.cpp
3  *
4  * This source file is part of the FoundationDB open source project
5  *
6  * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *     http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  */
20 
21 #include "fdbclient/ReadYourWrites.h"
22 #include "fdbclient/Atomic.h"
23 #include "fdbclient/DatabaseContext.h"
24 #include "fdbclient/StatusClient.h"
25 #include "fdbclient/MonitorLeader.h"
26 #include "flow/Util.h"
27 #include "flow/actorcompiler.h"  // This must be the last #include.
28 
29 class RYWImpl {
30 public:
dump(Iter it)31 	template<class Iter> static void dump( Iter it ) {
32 		it.skip(allKeys.begin);
33 		Arena arena;
34 		while( true ) {
35 			Optional<StringRef> key = StringRef();
36 			if (it.is_kv()) {
37 				auto kv = it.kv(arena);
38 				if (kv) key = kv->key;
39 			}
40 			TraceEvent("RYWDump")
41 			    .detail("Begin", it.beginKey())
42 			    .detail("End", it.endKey())
43 			    .detail("Unknown", it.is_unknown_range())
44 			    .detail("Empty", it.is_empty_range())
45 			    .detail("KV", it.is_kv())
46 			    .detail("Key", key.get());
47 			if( it.endKey() == allKeys.end )
48 				break;
49 			++it;
50 		}
51 	}
52 
53 	struct GetValueReq {
GetValueReqRYWImpl::GetValueReq54 		explicit GetValueReq( Key key ) : key(key) {}
55 		Key key;
56 		typedef Optional<Value> Result;
57 	};
58 
59 	struct GetKeyReq {
GetKeyReqRYWImpl::GetKeyReq60 		explicit GetKeyReq( KeySelector key ) : key(key) {}
61 		KeySelector key;
62 		typedef Key Result;
63 	};
64 
65 	template <bool Reverse>
66 	struct GetRangeReq {
GetRangeReqRYWImpl::GetRangeReq67 		GetRangeReq( KeySelector begin, KeySelector end, GetRangeLimits limits ) : begin(begin), end(end), limits(limits) {}
68 		KeySelector begin, end;
69 		GetRangeLimits limits;
70 		typedef Standalone<RangeResultRef> Result;
71 	};
72 
	// read() performs a read (get, getKey, getRange, etc.) in the context of the given transaction.  Snapshot or RYW reads are distinguished by the type Iter being SnapshotCache::iterator or RYWIterator.
	// Fills in the snapshot cache as a side effect but does not affect conflict ranges.
	// Some (indicated) overloads of read() are required to update the given *it to point to the key that was read, so that the corresponding overload of addConflictRange() can make use of it.
76 
read(ReadYourWritesTransaction * ryw,GetValueReq read,Iter * it)77 	ACTOR template<class Iter> static Future< Optional<Value> > read( ReadYourWritesTransaction *ryw, GetValueReq read, Iter* it ) {
78 		// This overload is required to provide postcondition: it->extractWriteMapIterator().segmentContains(read.key)
79 
80 		it->skip(read.key);
81 		state bool dependent = it->is_dependent();
82 		if( it->is_kv() ) {
83 			const KeyValueRef* result = it->kv(ryw->arena);
84 			if (result != nullptr) {
85 				return result->value;
86 			} else {
87 				return Optional<Value>();
88 			}
89 		} else if( it->is_empty_range() ) {
90 			return Optional<Value>();
91 		} else {
92 			Optional<Value> res = wait( ryw->tr.get( read.key, true ) );
93 			KeyRef k( ryw->arena, read.key );
94 
95 			if( res.present() ) {
96 				if( ryw->cache.insert( k, res.get() ) )
97 					ryw->arena.dependsOn(res.get().arena());
98 				if( !dependent )
99 					return res;
100 			} else {
101 				ryw->cache.insert( k, Optional<ValueRef>() );
102 				if( !dependent )
103 					return Optional<Value>();
104 			}
105 
106 			//There was a dependent write at the key, so we need to lookup the iterator again
107 			it->skip(k);
108 
109 			ASSERT( it->is_kv() );
110 			const KeyValueRef* result = it->kv(ryw->arena);
111 			if (result != nullptr) {
112 				return result->value;
113 			} else {
114 				return Optional<Value>();
115 			}
116 		}
117 	}
118 
read(ReadYourWritesTransaction * ryw,GetKeyReq read,Iter * it)119 	ACTOR template<class Iter> static Future< Key > read( ReadYourWritesTransaction* ryw, GetKeyReq read, Iter* it ) {
120 		if( read.key.offset > 0 ) {
121 			Standalone<RangeResultRef> result = wait( getRangeValue( ryw, read.key, firstGreaterOrEqual(ryw->getMaxReadKey()), GetRangeLimits(1), it ) );
122 			if( result.readToBegin )
123 				return allKeys.begin;
124 			if( result.readThroughEnd || !result.size() )
125 				return ryw->getMaxReadKey();
126 			return result[0].key;
127 		} else {
128 			read.key.offset++;
129 			Standalone<RangeResultRef> result = wait( getRangeValueBack( ryw, firstGreaterOrEqual(allKeys.begin), read.key, GetRangeLimits(1), it ) );
130 			if( result.readThroughEnd )
131 				return ryw->getMaxReadKey();
132 			if( result.readToBegin || !result.size() )
133 				return allKeys.begin;
134 			return result[0].key;
135 		}
136 	};
137 
read(ReadYourWritesTransaction * ryw,GetRangeReq<false> read,Iter * it)138 	template <class Iter> static Future< Standalone<RangeResultRef> > read( ReadYourWritesTransaction* ryw, GetRangeReq<false> read, Iter* it ) {
139 		return getRangeValue( ryw, read.begin, read.end, read.limits, it );
140 	};
141 
read(ReadYourWritesTransaction * ryw,GetRangeReq<true> read,Iter * it)142 	template <class Iter> static Future< Standalone<RangeResultRef> > read( ReadYourWritesTransaction* ryw, GetRangeReq<true> read, Iter* it ) {
143 		return getRangeValueBack( ryw, read.begin, read.end, read.limits, it );
144 	};
145 
146 	// readThrough() performs a read in the RYW disabled case, passing it on relatively directly to the underlying transaction.
147 	// Responsible for clipping results to the non-system keyspace when appropriate, since NativeAPI doesn't do that.
148 
readThrough(ReadYourWritesTransaction * ryw,GetValueReq read,bool snapshot)149 	static Future<Optional<Value>> readThrough( ReadYourWritesTransaction *ryw, GetValueReq read, bool snapshot ) {
150 		return ryw->tr.get( read.key, snapshot );
151 	}
152 
readThrough(ReadYourWritesTransaction * ryw,GetKeyReq read,bool snapshot)153 	ACTOR static Future<Key> readThrough( ReadYourWritesTransaction *ryw, GetKeyReq read, bool snapshot ) {
154 		Key key = wait( ryw->tr.getKey( read.key, snapshot ) );
155 		if (ryw->getMaxReadKey() < key) return ryw->getMaxReadKey();  // Filter out results in the system keys if they are not accessible
156 		return key;
157 	}
158 
readThrough(ReadYourWritesTransaction * ryw,GetRangeReq<Reverse> read,bool snapshot)159 	ACTOR template <bool Reverse> static Future<Standalone<RangeResultRef>> readThrough( ReadYourWritesTransaction *ryw, GetRangeReq<Reverse> read, bool snapshot ) {
160 		if(Reverse && read.end.offset > 1) {
161 			// FIXME: Optimistically assume that this will not run into the system keys, and only reissue if the result actually does.
162 			Key key = wait( ryw->tr.getKey(read.end, snapshot) );
163 			if(key > ryw->getMaxReadKey())
164 				read.end = firstGreaterOrEqual(ryw->getMaxReadKey());
165 			else
166 				read.end = KeySelector(firstGreaterOrEqual(key), key.arena());
167 		}
168 
169 		Standalone<RangeResultRef> v = wait( ryw->tr.getRange(read.begin, read.end, read.limits, snapshot, Reverse) );
170 		KeyRef maxKey = ryw->getMaxReadKey();
171 		if(v.size() > 0) {
172 			if(!Reverse && v[v.size()-1].key >= maxKey) {
173 				state Standalone<RangeResultRef> _v = v;
174 				int i = _v.size() - 2;
175 				for(; i >= 0 && _v[i].key >= maxKey; --i) { }
176 				return Standalone<RangeResultRef>(RangeResultRef( VectorRef<KeyValueRef>(&_v[0], i+1), false ), _v.arena());
177 			}
178 		}
179 
180 		return v;
181 	}
182 
183 	// addConflictRange(ryw,read,result) is called after a serializable read and is responsible for adding the relevant conflict range
184 
addConflictRange(ReadYourWritesTransaction * ryw,GetValueReq read,WriteMap::iterator & it,Optional<Value> result)185 	static void addConflictRange( ReadYourWritesTransaction* ryw, GetValueReq read, WriteMap::iterator& it, Optional<Value> result ) {
186 		// it will already point to the right segment (see the calling code in read()), so we don't need to skip
187 		// read.key will be copied into ryw->arena inside of updateConflictMap if it is being added
188 		ryw->updateConflictMap(read.key, it);
189 	}
190 
addConflictRange(ReadYourWritesTransaction * ryw,GetKeyReq read,WriteMap::iterator & it,Key result)191 	static void addConflictRange( ReadYourWritesTransaction* ryw, GetKeyReq read, WriteMap::iterator& it, Key result ) {
192 		KeyRangeRef readRange;
193 		if( read.key.offset <= 0 )
194 			readRange = KeyRangeRef( KeyRef( ryw->arena, result ), read.key.orEqual ? keyAfter( read.key.getKey(), ryw->arena ) : KeyRef( ryw->arena, read.key.getKey() ) );
195 		else
196 			readRange = KeyRangeRef( read.key.orEqual ? keyAfter( read.key.getKey(), ryw->arena ) : KeyRef( ryw->arena, read.key.getKey() ), keyAfter( result, ryw->arena ) );
197 
198 		it.skip( readRange.begin );
199 		ryw->updateConflictMap(readRange, it);
200 	}
201 
addConflictRange(ReadYourWritesTransaction * ryw,GetRangeReq<false> read,WriteMap::iterator & it,Standalone<RangeResultRef> const & result)202 	static void addConflictRange( ReadYourWritesTransaction* ryw, GetRangeReq<false> read, WriteMap::iterator &it, Standalone<RangeResultRef> const& result ) {
203 		KeyRef rangeBegin, rangeEnd;
204 		bool endInArena = false;
205 
206 		if( read.begin.getKey() < read.end.getKey() ) {
207 			rangeBegin = read.begin.getKey();
208 			rangeEnd = read.end.offset > 0 && result.more ? read.begin.getKey() : read.end.getKey();
209 		}
210 		else {
211 			rangeBegin = read.end.getKey();
212 			rangeEnd = read.begin.getKey();
213 		}
214 
215 		if( result.readToBegin && read.begin.offset <= 0 ) rangeBegin = allKeys.begin;
216 		if( result.readThroughEnd && read.end.offset > 0 ) rangeEnd = ryw->getMaxReadKey();
217 
218 		if ( result.size() ) {
219 			if( read.begin.offset <= 0 ) rangeBegin = std::min( rangeBegin, result[0].key );
220 			if( rangeEnd <= result.end()[-1].key ) {
221 				rangeEnd = keyAfter( result.end()[-1].key, ryw->arena );
222 				endInArena = true;
223 			}
224 		}
225 
226 		KeyRangeRef readRange = KeyRangeRef( KeyRef( ryw->arena, rangeBegin ), endInArena ? rangeEnd : KeyRef( ryw->arena, rangeEnd ) );
227 		it.skip( readRange.begin );
228 		ryw->updateConflictMap(readRange, it);
229 	}
230 
addConflictRange(ReadYourWritesTransaction * ryw,GetRangeReq<true> read,WriteMap::iterator & it,Standalone<RangeResultRef> const & result)231 	static void addConflictRange( ReadYourWritesTransaction* ryw, GetRangeReq<true> read, WriteMap::iterator& it, Standalone<RangeResultRef> const& result ) {
232 		KeyRef rangeBegin, rangeEnd;
233 		bool endInArena = false;
234 
235 		if( read.begin.getKey() < read.end.getKey() ) {
236 			rangeBegin = read.begin.offset <= 0 && result.more ? read.end.getKey() : read.begin.getKey();
237 			rangeEnd = read.end.getKey();
238 		}
239 		else {
240 			rangeBegin = read.end.getKey();
241 			rangeEnd = read.begin.getKey();
242 		}
243 
244 		if( result.readToBegin && read.begin.offset <= 0 ) rangeBegin = allKeys.begin;
245 		if( result.readThroughEnd && read.end.offset > 0 ) rangeEnd = ryw->getMaxReadKey();
246 
247 		if ( result.size() ) {
248 			rangeBegin = std::min( rangeBegin, result.end()[-1].key );
249 			if( read.end.offset > 0 && rangeEnd <= result[0].key ) {
250 				rangeEnd = keyAfter( result[0].key, ryw->arena );
251 				endInArena = true;
252 			}
253 		}
254 
255 		KeyRangeRef readRange = KeyRangeRef( KeyRef( ryw->arena, rangeBegin ), endInArena ? rangeEnd : KeyRef( ryw->arena, rangeEnd ) );
256 		it.skip( readRange.begin );
257 		ryw->updateConflictMap(readRange, it);
258 	}
259 
readWithConflictRangeThrough(ReadYourWritesTransaction * ryw,Req req,bool snapshot)260 	ACTOR template <class Req> static Future<typename Req::Result> readWithConflictRangeThrough( ReadYourWritesTransaction* ryw, Req req, bool snapshot ) {
261 		choose {
262 			when (typename Req::Result result = wait( readThrough( ryw, req, snapshot ) )) {
263 				return result;
264 			}
265 			when (wait(ryw->resetPromise.getFuture())) { throw internal_error(); }
266 		}
267 	}
readWithConflictRangeSnapshot(ReadYourWritesTransaction * ryw,Req req)268 	ACTOR template <class Req> static Future<typename Req::Result> readWithConflictRangeSnapshot( ReadYourWritesTransaction* ryw, Req req ) {
269 		state SnapshotCache::iterator it(&ryw->cache, &ryw->writes);
270 		choose {
271 			when (typename Req::Result result = wait( read( ryw, req, &it ) )) {
272 				return result;
273 			}
274 			when (wait(ryw->resetPromise.getFuture())) { throw internal_error(); }
275 		}
276 	}
readWithConflictRangeRYW(ReadYourWritesTransaction * ryw,Req req,bool snapshot)277 	ACTOR template <class Req> static Future<typename Req::Result> readWithConflictRangeRYW( ReadYourWritesTransaction* ryw, Req req, bool snapshot ) {
278 		state RYWIterator it( &ryw->cache, &ryw->writes );
279 		choose {
280 			when (typename Req::Result result = wait( read( ryw, req, &it ) )) {
281 				// Some overloads of addConflictRange() require it to point to the "right" key and others don't.  The corresponding overloads of read() have to provide that guarantee!
282 				if(!snapshot)
283 					addConflictRange( ryw, req, it.extractWriteMapIterator(), result );
284 				return result;
285 			}
286 			when (wait(ryw->resetPromise.getFuture())) { throw internal_error(); }
287 		}
288 	}
readWithConflictRange(ReadYourWritesTransaction * ryw,Req const & req,bool snapshot)289 	template <class Req> static inline Future<typename Req::Result> readWithConflictRange( ReadYourWritesTransaction* ryw, Req const& req, bool snapshot ) {
290 		if (ryw->options.readYourWritesDisabled) {
291 			return readWithConflictRangeThrough(ryw, req, snapshot);
292 		} else if (snapshot && ryw->options.snapshotRywEnabled <= 0) {
293 			return readWithConflictRangeSnapshot(ryw, req);
294 		}
295 		return readWithConflictRangeRYW(ryw, req, snapshot);
296 	}
297 
resolveKeySelectorFromCache(KeySelector & key,Iter & it,KeyRef const & maxKey,bool * readToBegin,bool * readThroughEnd,int * actualOffset)298 	template<class Iter> static void resolveKeySelectorFromCache( KeySelector& key, Iter& it, KeyRef const& maxKey, bool* readToBegin, bool* readThroughEnd, int* actualOffset ) {
299 		// If the key indicated by `key` can be determined without reading unknown data from the snapshot, then it.kv().key is the resolved key.
300 		// If the indicated key is determined to be "off the beginning or end" of the database, it points to the first or last segment in the DB,
301 		//   and key is an equivalent key selector relative to the beginning or end of the database.
302 		// Otherwise it points to an unknown segment, and key is an equivalent key selector whose base key is in or adjoining the segment.
303 
304 		key.removeOrEqual(key.arena());
305 
306 		bool alreadyExhausted = key.offset == 1;
307 
308 		it.skip( key.getKey() );  // TODO: or precondition?
309 
310 		if ( key.offset <= 0 && it.beginKey() == key.getKey() && key.getKey() != allKeys.begin )
311 			--it;
312 
313 		ExtStringRef keykey = key.getKey();
314 		bool keyNeedsCopy = false;
315 
316 		// Invariant: it.beginKey() <= keykey && keykey <= it.endKey() && (key.isBackward() ? it.beginKey() != keykey : it.endKey() != keykey)
317 		// Maintaining this invariant, we transform the key selector toward firstGreaterOrEqual form until we reach an unknown range or the result
318 		while (key.offset > 1 && !it.is_unreadable() && !it.is_unknown_range() && it.endKey() < maxKey ) {
319 			if (it.is_kv())
320 				--key.offset;
321 			++it;
322 			keykey = it.beginKey();
323 			keyNeedsCopy = true;
324 		}
325 		while (key.offset < 1 && !it.is_unreadable() && !it.is_unknown_range() && it.beginKey() != allKeys.begin) {
326 			if (it.is_kv()) {
327 				++key.offset;
328 				if (key.offset == 1) {
329 					keykey = it.beginKey();
330 					keyNeedsCopy = true;
331 					break;
332 				}
333 			}
334 			--it;
335 			keykey = it.endKey();
336 			keyNeedsCopy = true;
337 		}
338 
339 		if(!alreadyExhausted) {
340 			*actualOffset = key.offset;
341 		}
342 
343 		if (!it.is_unreadable() && !it.is_unknown_range() && key.offset < 1) {
344 			*readToBegin = true;
345 			key.setKey(allKeys.begin);
346 			key.offset = 1;
347 			return;
348 		}
349 
350 		if (!it.is_unreadable() && !it.is_unknown_range() && key.offset > 1) {
351 			*readThroughEnd = true;
352 			key.setKey(maxKey); // maxKey is a KeyRef, but points to a LiteralStringRef. TODO: how can we ASSERT this?
353 			key.offset = 1;
354 			return;
355 		}
356 
357 		while (!it.is_unreadable() && it.is_empty_range() && it.endKey() < maxKey) {
358 			++it;
359 			keykey = it.beginKey();
360 			keyNeedsCopy = true;
361 		}
362 
363 		if(keyNeedsCopy) {
364 			key.setKey(keykey.toArena(key.arena()));
365 		}
366 	}
367 
getKnownKeyRange(RangeResultRef data,KeySelector begin,KeySelector end,Arena & arena)368 	static KeyRangeRef getKnownKeyRange( RangeResultRef data, KeySelector begin, KeySelector end, Arena& arena ) {
369 		StringRef beginKey = begin.offset<=1 ? begin.getKey() : allKeys.end;
370 		ExtStringRef endKey = !data.more && end.offset>=1 ? end.getKey() : allKeys.begin;
371 
372 		if (data.readToBegin) beginKey = allKeys.begin;
373 		if (data.readThroughEnd) endKey = allKeys.end;
374 
375 		if( data.size() ) {
376 			beginKey = std::min( beginKey, data[0].key );
377 			if( data.readThrough.present() ) {
378 				endKey = std::max<ExtStringRef>( endKey, data.readThrough.get() );
379 			}
380 			else {
381 				endKey = !data.more && data.end()[-1].key < endKey ? endKey : ExtStringRef( data.end()[-1].key, 1 );
382 			}
383 		}
384 		if (beginKey >= endKey) return KeyRangeRef();
385 
386 
387 		return KeyRangeRef( StringRef(arena, beginKey), endKey.toArena(arena));
388 	}
389 
390 	// Pre: it points to an unknown range
391 	// Increments it to point to the unknown range just before the next nontrivial known range (skips over trivial known ranges), but not more than iterationLimit ranges away
skipUncached(Iter & it,Iter const & end,int iterationLimit)392 	template<class Iter> static int skipUncached( Iter& it, Iter const& end, int iterationLimit ) {
393 		ExtStringRef b = it.beginKey();
394 		ExtStringRef e = it.endKey();
395 		int singleEmpty = 0;
396 
397 		ASSERT( !it.is_unreadable() && it.is_unknown_range() );
398 
399 		// b is the beginning of the most recent contiguous *empty* range
400 		// e is it.endKey()
401 		while( it != end && --iterationLimit>=0 ) {
402 			if (it.is_unreadable() || it.is_empty_range()) {
403 				if (it.is_unreadable() || !e.isKeyAfter(b)) { //Assumes no degenerate ranges
404 					while (it.is_unreadable() || !it.is_unknown_range())
405 						--it;
406 					return singleEmpty;
407 				}
408 				singleEmpty++;
409 			} else
410 				b = e;
411 			++it;
412 			e = it.endKey();
413 		}
414 		while (it.is_unreadable() || !it.is_unknown_range())
415 			--it;
416 		return singleEmpty;
417 	}
418 
419 	// Pre: it points to an unknown range
420 	// Returns the number of following empty single-key known ranges between it and the next nontrivial known range, but no more than maxClears
421 	// Leaves `it` in an indeterminate state
countUncached(Iter && it,KeyRef maxKey,int maxClears)422 	template<class Iter> static int countUncached( Iter&& it, KeyRef maxKey, int maxClears ) {
423 		if (maxClears<=0) return 0;
424 
425 		ExtStringRef b = it.beginKey();
426 		ExtStringRef e = it.endKey();
427 		int singleEmpty = 0;
428 
429 		while( e < maxKey ) {
430 			if (it.is_unreadable() || it.is_empty_range()) {
431 				if (it.is_unreadable() || !e.isKeyAfter(b)) { //Assumes no degenerate ranges
432 					return singleEmpty;
433 				}
434 				singleEmpty++;
435 				if( singleEmpty >= maxClears )
436 					return maxClears;
437 			} else
438 				b = e;
439 			++it;
440 			e = it.endKey();
441 		}
442 		return singleEmpty;
443 	}
444 
setRequestLimits(GetRangeLimits & requestLimit,int64_t additionalRows,int offset,int requestCount)445 	static void setRequestLimits(GetRangeLimits &requestLimit, int64_t additionalRows, int offset, int requestCount) {
446 		requestLimit.minRows = (int)std::min(std::max(1 + additionalRows, (int64_t)offset), (int64_t)std::numeric_limits<int>::max());
447 		if(requestLimit.hasRowLimit()) {
448 			requestLimit.rows = (int)std::min(std::max(std::max(1,requestLimit.rows) + additionalRows, (int64_t)offset), (int64_t)std::numeric_limits<int>::max());
449 		}
450 
451 		// Calculating request byte limit
452 		if(requestLimit.bytes==0) {
453 			requestLimit.bytes = CLIENT_KNOBS->BYTE_LIMIT_UNLIMITED;
454 			if(!requestLimit.hasRowLimit()) {
455 				requestLimit.rows = (int)std::min(std::max(std::max(1,requestLimit.rows) + additionalRows, (int64_t)offset), (int64_t)std::numeric_limits<int>::max());
456 			}
457 		}
458 		else if(requestLimit.hasByteLimit()) {
459 			requestLimit.bytes = std::min(int64_t(requestLimit.bytes)<<std::min(requestCount, 20), (int64_t)CLIENT_KNOBS->REPLY_BYTE_LIMIT);
460 		}
461 	}
462 
463 	//TODO: read to begin, read through end flags for result
// getRangeValue(): forward range read; used by read() for GetRangeReq<false> and for GetKeyReq selectors with offset > 0.
// Walks the iterator forward from the resolved begin selector and appends kv segments to the result.  Whenever
// the iterator is on an unknown range, a snapshot getRange() is issued on ryw->tr, the reply is inserted into
// ryw->cache, and both selectors are re-resolved against the updated cache.
// Parameters:
//   ryw        - transaction whose cache, arena and underlying tr are used
//   begin, end - key selectors bounding the read (by-value copies; rewritten as they are resolved)
//   limits     - row/byte limits; decremented as rows are collected
//   pit        - caller's iterator; `it` below is a reference to *pit, so the caller's iterator is advanced
// Returns the collected rows with more / readToBegin / readThroughEnd filled in; the result's arena depends on ryw->arena.
getRangeValue(ReadYourWritesTransaction * ryw,KeySelector begin,KeySelector end,GetRangeLimits limits,Iter * pit)464 	ACTOR template<class Iter> static Future< Standalone<RangeResultRef> > getRangeValue( ReadYourWritesTransaction *ryw, KeySelector begin, KeySelector end, GetRangeLimits limits, Iter* pit ) {
465 		state Iter& it(*pit);
466 		state Iter itEnd(*pit);
467 		state Standalone<RangeResultRef> result;
468 		state int64_t additionalRows = 0;
469 		state int itemsPastEnd = 0;
470 		state int requestCount = 0;
471 		state bool readToBegin = false;
472 		state bool readThroughEnd = false;
473 		state int actualBeginOffset = begin.offset;
474 		state int actualEndOffset = end.offset;
475 		//state UID randomID = g_nondeterministic_random->randomUniqueID();
476 
		// Resolve both selectors as far as the current cache contents allow; `it` follows begin, `itEnd` follows end.
477 		resolveKeySelectorFromCache( begin, it, ryw->getMaxReadKey(), &readToBegin, &readThroughEnd, &actualBeginOffset );
478 		resolveKeySelectorFromCache( end, itEnd, ryw->getMaxReadKey(), &readToBegin, &readThroughEnd, &actualEndOffset );
479 
		// Begin is not before end in both resolved offset and base key: return an empty result with both flags false.
480 		if( actualBeginOffset >= actualEndOffset && begin.getKey() >= end.getKey() ) {
481 			return RangeResultRef(false, false);
482 		}
483 		else if( ( begin.isFirstGreaterOrEqual() && begin.getKey() == ryw->getMaxReadKey() )
484 			|| ( end.isFirstGreaterOrEqual() && end.getKey() == allKeys.begin ) )
485 		{
486 			return RangeResultRef(readToBegin, readThroughEnd);
487 		}
488 
		// end is not in firstGreaterOrEqual form and its base key is below begin's: resolve end to a concrete key
		// with a getKey-style read first, then re-resolve both selectors.
489 		if( !end.isFirstGreaterOrEqual() && begin.getKey() > end.getKey() ) {
490 			Key resolvedEnd = wait( read( ryw, GetKeyReq(end), pit ) );
491 			if( resolvedEnd == allKeys.begin )
492 				readToBegin = true;
493 			if( resolvedEnd == ryw->getMaxReadKey() )
494 				readThroughEnd = true;
495 
496 			if( begin.getKey() >= resolvedEnd && !begin.isBackward() ) {
497 				return RangeResultRef(false, false);
498 			}
499 			else if( resolvedEnd == allKeys.begin ) {
500 				return RangeResultRef(readToBegin, readThroughEnd);
501 			}
502 
503 			resolveKeySelectorFromCache( begin, it, ryw->getMaxReadKey(), &readToBegin, &readThroughEnd, &actualBeginOffset );
504 			resolveKeySelectorFromCache( end, itEnd, ryw->getMaxReadKey(), &readToBegin, &readThroughEnd, &actualEndOffset );
505 		}
506 
507 		//TraceEvent("RYWSelectorsStartForward", randomID).detail("ByteLimit", limits.bytes).detail("RowLimit", limits.rows);
508 
509 		loop {
510 			/*TraceEvent("RYWSelectors", randomID).detail("Begin", begin.toString())
511 				.detail("End", end.toString())
512 				.detail("Reached", limits.isReached())
513 				.detail("ItemsPastEnd", itemsPastEnd)
514 				.detail("EndOffset", -end.offset)
515 				.detail("ItBegin", it.beginKey())
516 				.detail("ItEnd", itEnd.beginKey())
517 				.detail("Unknown", it.is_unknown_range())
518 				.detail("Requests", requestCount);*/
519 
520 			if( !result.size() && actualBeginOffset >= actualEndOffset && begin.getKey() >= end.getKey() ) {
521 				return RangeResultRef(false, false);
522 			}
523 
524 			if( end.offset <= 1 && end.getKey() == allKeys.begin ) {
525 				return RangeResultRef(readToBegin, readThroughEnd);
526 			}
527 
528 			if( ( begin.offset >= end.offset && begin.getKey() >= end.getKey() ) ||
529 				( begin.offset >= 1 && begin.getKey() >= ryw->getMaxReadKey() ) ) {
530 				if( end.isFirstGreaterOrEqual() ) break;
531 				if( !result.size() ) break;
532 				Key resolvedEnd = wait( read( ryw, GetKeyReq(end), pit ) ); //do not worry about iterator invalidation, because we are breaking for the loop
533 				if( resolvedEnd == allKeys.begin )
534 					readToBegin = true;
535 				if( resolvedEnd == ryw->getMaxReadKey() )
536 					readThroughEnd = true;
537 				end = firstGreaterOrEqual( resolvedEnd );
538 				break;
539 			}
540 
541 			if( !it.is_unreadable() && !it.is_unknown_range() && it.beginKey() > itEnd.beginKey() ) {
542 				if( end.isFirstGreaterOrEqual() ) break;
543 				return RangeResultRef(readToBegin, readThroughEnd);
544 			}
545 
546 			if( limits.isReached() && itemsPastEnd >= 1-end.offset ) break;
547 
548 			if (it == itEnd && ((!it.is_unreadable() && !it.is_unknown_range()) || (begin.offset > 0 && end.isFirstGreaterOrEqual() && end.getKey() == it.beginKey()))) break;
549 
			// Unknown range: build read_begin/read_end and request limits, fetch the range with a snapshot
			// getRange() on ryw->tr, cache the reply, then re-resolve the selectors.
550 			if (it.is_unknown_range()) {
551 				if( limits.hasByteLimit() && result.size() && itemsPastEnd >= 1-end.offset ) {
552 					result.more = true;
553 					break;
554 				}
555 
556 				Iter ucEnd(it);
557 				int singleClears = 0;
558 				int clearLimit = requestCount ? 1 << std::min(requestCount, 20) : 0;
559 				if( it.beginKey() < itEnd.beginKey() )
560 					singleClears = std::min(skipUncached(ucEnd, itEnd, BUGGIFY ? 0 : clearLimit + 100), clearLimit);
561 
562 				state KeySelector read_end;
563 				if ( ucEnd!=itEnd ) {
564 					Key k = ucEnd.endKey().toStandaloneStringRef();
565 					read_end = KeySelector(firstGreaterOrEqual(k), k.arena());
566 					if( end.offset < 1 ) additionalRows += 1 - end.offset; // extra for items past end
567 				} else if( end.offset < 1 ) {
568 					read_end = KeySelector(firstGreaterOrEqual(end.getKey()), end.arena());
569 					additionalRows += 1 - end.offset;
570 				} else {
571 					read_end = end;
572 					if( end.offset > 1 ) {
573 						singleClears += countUncached( std::move(ucEnd), ryw->getMaxReadKey(), clearLimit-singleClears);
574 						read_end.offset += singleClears;
575 					}
576 				}
577 
578 				additionalRows += singleClears;
579 
580 				state KeySelector read_begin;
581 				if (begin.isFirstGreaterOrEqual()) {
582 					Key k = it.beginKey() > begin.getKey() ? it.beginKey().toStandaloneStringRef() : Key(begin.getKey(), begin.arena());
583 					begin = KeySelector(firstGreaterOrEqual(k), k.arena());
584 					read_begin = begin;
585 				} else if( begin.offset > 1 ) {
586 					read_begin = KeySelector(firstGreaterOrEqual(begin.getKey()), begin.arena());
587 					additionalRows += begin.offset - 1;
588 				} else {
589 					read_begin = begin;
590 					ucEnd = it;
591 
592 					singleClears = countUncachedBack(std::move(ucEnd), clearLimit);
593 					read_begin.offset -= singleClears;
594 					additionalRows += singleClears;
595 				}
596 
597 				if(read_end.getKey() < read_begin.getKey()) {
598 					read_end.setKey(read_begin.getKey());
599 					read_end.arena().dependsOn(read_begin.arena());
600 				}
601 
602 				state GetRangeLimits requestLimit = limits;
603 				setRequestLimits(requestLimit, additionalRows, 2-read_begin.offset, requestCount);
604 				requestCount++;
605 
606 				ASSERT( !requestLimit.hasRowLimit() || requestLimit.rows > 0 );
607 				ASSERT( requestLimit.hasRowLimit() || requestLimit.hasByteLimit() );
608 
609 				//TraceEvent("RYWIssuing", randomID).detail("Begin", read_begin.toString()).detail("End", read_end.toString()).detail("Bytes", requestLimit.bytes).detail("Rows", requestLimit.rows).detail("Limits", limits.bytes).detail("Reached", limits.isReached()).detail("RequestCount", requestCount).detail("SingleClears", singleClears).detail("UcEnd", ucEnd.beginKey()).detail("MinRows", requestLimit.minRows);
610 
611 				additionalRows = 0;
612 				Standalone<RangeResultRef> snapshot_read = wait( ryw->tr.getRange( read_begin, read_end, requestLimit, true, false ) );
613 				KeyRangeRef range = getKnownKeyRange( snapshot_read, read_begin, read_end, ryw->arena );
614 
615 				//TraceEvent("RYWCacheInsert", randomID).detail("Range", range).detail("ExpectedSize", snapshot_read.expectedSize()).detail("Rows", snapshot_read.size()).detail("Results", snapshot_read).detail("More", snapshot_read.more).detail("ReadToBegin", snapshot_read.readToBegin).detail("ReadThroughEnd", snapshot_read.readThroughEnd).detail("ReadThrough", snapshot_read.readThrough);
616 
617 				if( ryw->cache.insert( range, snapshot_read ) )
618 					ryw->arena.dependsOn(snapshot_read.arena());
619 
620 				// TODO: Is there a more efficient way to deal with invalidation?
621 				resolveKeySelectorFromCache( begin, it, ryw->getMaxReadKey(), &readToBegin, &readThroughEnd, &actualBeginOffset );
622 				resolveKeySelectorFromCache( end, itEnd, ryw->getMaxReadKey(), &readToBegin, &readThroughEnd, &actualEndOffset );
			// kv segment: append the contiguous run of kvs starting here, stopping once the limits are reached;
			// kvs found but not appended are counted in itemsPastEnd.
623 			} else if (it.is_kv()) {
624 				KeyValueRef const* start = it.kv(ryw->arena);
625 				if (start == nullptr) {
626 					++it;
627 					continue;
628 				}
629 				it.skipContiguous( end.isFirstGreaterOrEqual() ? end.getKey() : ryw->getMaxReadKey() ); //not technically correct since this would add end.getKey(), but that is protected above
630 
631 				int maxCount = it.kv(ryw->arena) - start + 1;
632 				int count = 0;
633 				for(; count < maxCount && !limits.isReached(); count++ ) {
634 					limits.decrement(start[count]);
635 				}
636 
637 				itemsPastEnd += maxCount - count;
638 
639 				//TraceEvent("RYWaddKV", randomID).detail("Key", it.beginKey()).detail("Count", count).detail("MaxCount", maxCount).detail("ItemsPastEnd", itemsPastEnd);
640 				if( count ) result.append( result.arena(), start, count );
641 				++it;
642 			} else
643 				++it;
644 		}
645 
646 		result.more = result.more || limits.isReached();
647 
		// With end in firstGreaterOrEqual form, drop any collected rows whose key is at or beyond end's key;
		// if anything was dropped the result is complete, so `more` is cleared.
648 		if( end.isFirstGreaterOrEqual() ) {
649 			int keepItems = std::lower_bound( result.begin(), result.end(), end.getKey(), KeyValueRef::OrderByKey() ) - result.begin();
650 			if( keepItems < result.size() )
651 				result.more = false;
652 			result.resize( result.arena(), keepItems );
653 		}
654 
655 		result.readToBegin = readToBegin;
656 		result.readThroughEnd = !result.more && readThroughEnd;
657 		result.arena().dependsOn( ryw->arena );
658 
659 		return result;
660 	}
661 
getKnownKeyRangeBack(RangeResultRef data,KeySelector begin,KeySelector end,Arena & arena)662 	static KeyRangeRef getKnownKeyRangeBack( RangeResultRef data, KeySelector begin, KeySelector end, Arena& arena ) {
663 		StringRef beginKey = !data.more && begin.offset<=1 ? begin.getKey() : allKeys.end;
664 		ExtStringRef endKey = end.offset>=1 ? end.getKey() : allKeys.begin;
665 
666 		if (data.readToBegin) beginKey = allKeys.begin;
667 		if (data.readThroughEnd) endKey = allKeys.end;
668 
669 		if( data.size() ) {
670 			if( data.readThrough.present() ) {
671 				beginKey = std::min( data.readThrough.get(), beginKey );
672 			}
673 			else {
674 				beginKey = !data.more && data.end()[-1].key > beginKey ? beginKey : data.end()[-1].key;
675 			}
676 
677 			endKey = data[0].key < endKey ? endKey : ExtStringRef( data[0].key, 1 );
678 		}
679 		if (beginKey >= endKey) return KeyRangeRef();
680 
681 		return KeyRangeRef( StringRef(arena, beginKey), endKey.toArena(arena));
682 	}
683 
684 	// Pre: it points to an unknown range
685 	// Decrements it to point to the unknown range just before the last nontrivial known range (skips over trivial known ranges), but not more than iterationLimit ranges away
686 	// Returns the number of single-key empty ranges skipped
skipUncachedBack(Iter & it,Iter const & end,int iterationLimit)687 	template<class Iter> static int skipUncachedBack( Iter& it, Iter const& end, int iterationLimit ) {
688 		ExtStringRef b = it.beginKey();
689 		ExtStringRef e = it.endKey();
690 		int singleEmpty = 0;
691 		ASSERT(!it.is_unreadable() && it.is_unknown_range());
692 
693 		// b == it.beginKey()
694 		// e is the end of the contiguous empty range containing it
695 		while( it != end && --iterationLimit>=0) {
696 			if (it.is_unreadable() || it.is_empty_range()) {
697 				if (it.is_unreadable() || !e.isKeyAfter(b)) { //Assumes no degenerate ranges
698 					while (it.is_unreadable() || !it.is_unknown_range())
699 						++it;
700 					return singleEmpty;
701 				}
702 				singleEmpty++;
703 			} else
704 				e = b;
705 			--it;
706 			b = it.beginKey();
707 		}
708 		while (it.is_unreadable() || !it.is_unknown_range())
709 			++it;
710 		return singleEmpty;
711 	}
712 
713 	// Pre: it points to an unknown range
714 	// Returns the number of preceding empty single-key known ranges between it and the previous nontrivial known range, but no more than maxClears
715 	// Leaves it in an indeterminate state
countUncachedBack(Iter && it,int maxClears)716 	template<class Iter> static int countUncachedBack( Iter&& it, int maxClears ) {
717 		if (maxClears <= 0) return 0;
718 		ExtStringRef b = it.beginKey();
719 		ExtStringRef e = it.endKey();
720 		int singleEmpty = 0;
721 		while( b > allKeys.begin ) {
722 			if (it.is_unreadable() || it.is_empty_range()) {
723 				if (it.is_unreadable() || !e.isKeyAfter(b)) { //Assumes no degenerate ranges
724 					return singleEmpty;
725 				}
726 				singleEmpty++;
727 				if( singleEmpty >= maxClears )
728 					return maxClears;
729 			} else
730 				e = b;
731 			--it;
732 			b = it.beginKey();
733 		}
734 		return singleEmpty;
735 	}
736 
	// Reverse-order range read over the merged view that *pit iterates, filling unknown ranges with snapshot reads
	// from the underlying transaction (ryw->tr) and inserting what they return into ryw->cache.
	//   ryw        - the transaction being read; its cache, arena and underlying tr are used
	//   begin, end - key selectors bounding the read; both are rewritten as they get resolved
	//   limits     - row/byte limits; decremented for every row added to the result
	//   pit        - iterator that is walked backward from the resolved end toward the resolved begin
	// Returns the rows from the largest key to the smallest, with more/readToBegin/readThroughEnd filled in.
	ACTOR template<class Iter> static Future< Standalone<RangeResultRef> > getRangeValueBack( ReadYourWritesTransaction *ryw, KeySelector begin, KeySelector end, GetRangeLimits limits, Iter* pit ) {
		state Iter& it(*pit);
		// Copy of the iterator that is resolved against begin; it marks where the backward walk has to stop.
		state Iter itEnd(*pit);
		state Standalone<RangeResultRef> result;
		// Extra rows to request from the next snapshot read on top of what limits asks for.
		state int64_t additionalRows = 0;
		// Rows found in known ranges that could not be added to result because limits was already reached.
		state int itemsPastBegin = 0;
		state int requestCount = 0;
		state bool readToBegin = false;
		state bool readThroughEnd = false;
		state int actualBeginOffset = begin.offset;
		state int actualEndOffset = end.offset;
		//state UID randomID = g_nondeterministic_random->randomUniqueID();

		// Resolve both selectors as far as possible without issuing a read.
		resolveKeySelectorFromCache( end, it, ryw->getMaxReadKey(), &readToBegin, &readThroughEnd, &actualEndOffset );
		resolveKeySelectorFromCache( begin, itEnd, ryw->getMaxReadKey(), &readToBegin, &readThroughEnd, &actualBeginOffset );

		// Early exits: begin does not precede end, or one of the selectors already sits on an edge of the readable key space.
		if( actualBeginOffset >= actualEndOffset && begin.getKey() >= end.getKey() ) {
			return RangeResultRef(false, false);
		}
		else if( ( begin.isFirstGreaterOrEqual() && begin.getKey() == ryw->getMaxReadKey() )
			|| ( end.isFirstGreaterOrEqual() && end.getKey() == allKeys.begin ) )
		{
			return RangeResultRef(readToBegin, readThroughEnd);
		}

		// begin is still unresolved and its key lies beyond end's key: resolve it with a real key read first.
		if( !begin.isFirstGreaterOrEqual() && begin.getKey() > end.getKey() ) {
			Key resolvedBegin = wait( read( ryw, GetKeyReq(begin), pit ) );
			if( resolvedBegin == allKeys.begin )
				readToBegin = true;
			if( resolvedBegin == ryw->getMaxReadKey() )
				readThroughEnd = true;

			if( resolvedBegin >= end.getKey() && end.offset <= 1 ) {
				return RangeResultRef(false, false);
			}
			else if( resolvedBegin == ryw->getMaxReadKey() ) {
				return RangeResultRef(readToBegin, readThroughEnd);
			}

			// Both selectors are resolved again after the read above.
			resolveKeySelectorFromCache( end, it, ryw->getMaxReadKey(), &readToBegin, &readThroughEnd, &actualEndOffset );
			resolveKeySelectorFromCache( begin, itEnd, ryw->getMaxReadKey(), &readToBegin, &readThroughEnd, &actualBeginOffset );
		}

		//TraceEvent("RYWSelectorsStartReverse", randomID).detail("ByteLimit", limits.bytes).detail("RowLimit", limits.rows);

		// Each pass either copies rows out of a known range, or issues one snapshot read to fill an unknown range
		// and then re-resolves the selectors.
		loop {
			/*TraceEvent("RYWSelectors", randomID).detail("Begin", begin.toString())
				.detail("End", end.toString())
				.detail("Reached", limits.isReached())
				.detail("ItemsPastBegin", itemsPastBegin)
				.detail("EndOffset", end.offset)
				.detail("ItBegin", it.beginKey())
				.detail("ItEnd", itEnd.beginKey())
				.detail("Unknown", it.is_unknown_range())
				.detail("Kv", it.is_kv())
				.detail("Requests", requestCount);*/

			if(!result.size() && actualBeginOffset >= actualEndOffset && begin.getKey() >= end.getKey()) {
				return RangeResultRef(false, false);
			}

			if( !begin.isBackward() && begin.getKey() >= ryw->getMaxReadKey() ) {
				return RangeResultRef(readToBegin, readThroughEnd);
			}

			// The selectors have met, or end has reached the start of the key space.
			if( ( begin.offset >= end.offset && begin.getKey() >= end.getKey() ) ||
				( end.offset <= 1 && end.getKey() == allKeys.begin ) ) {
				if( begin.isFirstGreaterOrEqual() ) break;
				if( !result.size() ) break;
				Key resolvedBegin = wait( read( ryw, GetKeyReq(begin), pit ) ); //do not worry about iterator invalidation, because we are breaking out of the loop
				if( resolvedBegin == allKeys.begin )
					readToBegin = true;
				if( resolvedBegin == ryw->getMaxReadKey() )
					readThroughEnd = true;
				// With begin resolved, the trimming step after the loop can cut result at that key.
				begin = firstGreaterOrEqual( resolvedBegin );
				break;
			}

			// it is on a known, readable range that starts before the stopping point.
			if (itemsPastBegin >= begin.offset - 1 && !it.is_unreadable() && !it.is_unknown_range() && it.beginKey() < itEnd.beginKey()) {
				if( begin.isFirstGreaterOrEqual() ) break;
				return RangeResultRef(readToBegin, readThroughEnd);
			}

			if( limits.isReached() && itemsPastBegin >= begin.offset-1 ) break;

			// end is exclusive: when it coincides with the start of the current range, step back one range.
			if( end.isFirstGreaterOrEqual() && end.getKey() == it.beginKey() ) {
				if( itemsPastBegin >= begin.offset-1 && it == itEnd) break;
				--it;
			}

			if (it.is_unknown_range()) {
				// With a byte limit and rows already collected, report more instead of issuing another read.
				if( limits.hasByteLimit() && result.size() && itemsPastBegin >= begin.offset-1 ) {
					result.more = true;
					break;
				}

				Iter ucEnd(it);
				int singleClears = 0;
				// Zero for the first request, then 2^requestCount, capped at 2^20.
				int clearLimit = requestCount ? 1 << std::min(requestCount, 20) : 0;
				if( it.beginKey() > itEnd.beginKey() )
					singleClears = std::min(skipUncachedBack(ucEnd, itEnd, BUGGIFY ? 0 : clearLimit+100), clearLimit);

				// Choose the begin selector of the snapshot read.
				state KeySelector read_begin;
				if ( ucEnd!=itEnd ) {
					Key k = ucEnd.beginKey().toStandaloneStringRef();
					read_begin = KeySelector(firstGreaterOrEqual(k), k.arena());
					if( begin.offset > 1 ) additionalRows += begin.offset - 1; // extra for items past end
				} else if( begin.offset > 1 ) {
					read_begin = KeySelector(firstGreaterOrEqual( begin.getKey() ), begin.arena());
					additionalRows += begin.offset - 1;
				} else {
					read_begin = begin;
					if( begin.offset < 1 ) {
						// Push the selector further back by the number of single-key empty ranges counted before it.
						singleClears += countUncachedBack(std::move(ucEnd), clearLimit-singleClears);
						read_begin.offset -= singleClears;
					}
				}

				additionalRows += singleClears;

				// Choose the end selector of the snapshot read.
				state KeySelector read_end;
				if (end.isFirstGreaterOrEqual()) {
					// Clamp end to the end of the current unknown range; note that this rewrites end itself.
					Key k = it.endKey() < end.getKey() ? it.endKey().toStandaloneStringRef() : end.getKey();
					end = KeySelector(firstGreaterOrEqual(k), k.arena());
					read_end = end;
				} else if (end.offset < 1) {
					read_end = KeySelector(firstGreaterOrEqual(end.getKey()), end.arena());
					additionalRows += 1 - end.offset;
				} else {
					read_end = end;
					ucEnd = it;

					singleClears = countUncached(std::move(ucEnd), ryw->getMaxReadKey(), clearLimit);
					read_end.offset += singleClears;
					additionalRows += singleClears;
				}

				// Never issue a read whose begin key is above its end key.
				if(read_begin.getKey() > read_end.getKey()) {
					read_begin.setKey(read_end.getKey());
					read_begin.arena().dependsOn(read_end.arena());
				}

				state GetRangeLimits requestLimit = limits;
				setRequestLimits(requestLimit, additionalRows, read_end.offset, requestCount);
				requestCount++;

				ASSERT( !requestLimit.hasRowLimit() || requestLimit.rows > 0 );
				ASSERT( requestLimit.hasRowLimit() || requestLimit.hasByteLimit() );

				//TraceEvent("RYWIssuing", randomID).detail("Begin", read_begin.toString()).detail("End", read_end.toString()).detail("Bytes", requestLimit.bytes).detail("Rows", requestLimit.rows).detail("Limits", limits.bytes).detail("Reached", limits.isReached()).detail("RequestCount", requestCount).detail("SingleClears", singleClears).detail("UcEnd", ucEnd.beginKey()).detail("MinRows", requestLimit.minRows);

				additionalRows = 0;
				// NOTE(review): the two trailing `true` arguments are presumably snapshot and reverse — confirm against Transaction::getRange.
				Standalone<RangeResultRef> snapshot_read = wait( ryw->tr.getRange( read_begin, read_end, requestLimit, true, true ) );
				KeyRangeRef range = getKnownKeyRangeBack( snapshot_read, read_begin, read_end, ryw->arena );

				//TraceEvent("RYWCacheInsert", randomID).detail("Range", range).detail("ExpectedSize", snapshot_read.expectedSize()).detail("Rows", snapshot_read.size()).detail("Results", snapshot_read).detail("More", snapshot_read.more).detail("ReadToBegin", snapshot_read.readToBegin).detail("ReadThroughEnd", snapshot_read.readThroughEnd).detail("ReadThrough", snapshot_read.readThrough);

				// The cache receives the rows in the opposite order to the one the read returned them in.
				RangeResultRef reversed;
				reversed.resize(ryw->arena, snapshot_read.size());
				for( int i = 0; i < snapshot_read.size(); i++ ) {
					reversed[snapshot_read.size()-i-1] = snapshot_read[i];
				}

				// If the insert reports true, keep the read's memory alive for as long as ryw->arena lives.
				if( ryw->cache.insert( range, reversed ) )
					ryw->arena.dependsOn(snapshot_read.arena());

				// TODO: Is there a more efficient way to deal with invalidation?
				resolveKeySelectorFromCache( end, it, ryw->getMaxReadKey(), &readToBegin, &readThroughEnd, &actualEndOffset );
				resolveKeySelectorFromCache( begin, itEnd, ryw->getMaxReadKey(), &readToBegin, &readThroughEnd, &actualBeginOffset );
			} else {
				// Known range. Note: this local `end` shadows the KeySelector parameter `end` inside this branch.
				KeyValueRef const* end = it.is_kv() ? it.kv(ryw->arena) : nullptr;
				if (end != nullptr) {
					// Move it back over the contiguous run; [start, end] is then indexed as one array of key-value pairs.
					it.skipContiguousBack( begin.isFirstGreaterOrEqual() ? begin.getKey() : allKeys.begin );
					KeyValueRef const* start = it.kv(ryw->arena);
					ASSERT(start != nullptr);

					int maxCount = end - start + 1;
					int count = 0;
					// Charge rows against limits starting from the last pair of the run.
					for(; count < maxCount && !limits.isReached(); count++ ) {
						limits.decrement(start[maxCount-count-1]);
					}

					itemsPastBegin += maxCount - count;
					//TraceEvent("RYWaddKV", randomID).detail("Key", it.beginKey()).detail("Count", count).detail("MaxCount", maxCount).detail("ItemsPastBegin", itemsPastBegin);
					if( count ) {
						// Append the accepted rows, last pair of the run first.
						int size = result.size();
						result.resize(result.arena(),size+count);
						for( int i = 0; i < count; i++ ) {
							result[size + i] = start[maxCount-i-1];
						}
					}
				}
				if (it == itEnd) break;
				--it;
			}
		}

		result.more = result.more || limits.isReached();

		// Once begin is resolved, drop every collected row whose key is below it.
		if( begin.isFirstGreaterOrEqual() ) {
			int keepItems = result.rend() - std::lower_bound( result.rbegin(), result.rend(), begin.getKey(), KeyValueRef::OrderByKey());
			if( keepItems < result.size() )
				result.more = false;

			result.resize( result.arena(), keepItems );
		}

		// A truncated result cannot claim to have reached the beginning.
		result.readToBegin = !result.more && readToBegin;
		result.readThroughEnd = readThroughEnd;
		result.arena().dependsOn( ryw->arena );

		return result;
	}
950 
triggerWatches(ReadYourWritesTransaction * ryw,KeyRangeRef range,Optional<ValueRef> val,bool valueKnown=true)951 	static void triggerWatches(ReadYourWritesTransaction *ryw, KeyRangeRef range, Optional<ValueRef> val, bool valueKnown = true) {
952 		for(auto it = ryw->watchMap.lower_bound(range.begin); it != ryw->watchMap.end() && it->key < range.end; ) {
953 			auto itCopy = it;
954 			++it;
955 
956 			ASSERT( itCopy->value.size() );
957 			TEST( itCopy->value.size() > 1 ); //Multiple watches on the same key triggered by RYOW
958 
959 			for( int i = 0; i < itCopy->value.size(); i++ ) {
960 				if(itCopy->value[i]->onChangeTrigger.isSet()) {
961 					swapAndPop(&itCopy->value, i--);
962 				} else if( !valueKnown ||
963 						   (itCopy->value[i]->setPresent && (itCopy->value[i]->setValue.present() != val.present() || (val.present() && itCopy->value[i]->setValue.get() != val.get()))) ||
964 						   (itCopy->value[i]->valuePresent && (itCopy->value[i]->value.present() != val.present() || (val.present() && itCopy->value[i]->value.get() != val.get()))) ) {
965 					itCopy->value[i]->onChangeTrigger.send(Void());
966 					swapAndPop(&itCopy->value, i--);
967 				} else {
968 					itCopy->value[i]->setPresent = true;
969 					itCopy->value[i]->setValue = val.castTo<Value>();
970 				}
971 			}
972 
973 			if( itCopy->value.size() == 0 )
974 				ryw->watchMap.erase(itCopy);
975 		}
976 	}
977 
triggerWatches(ReadYourWritesTransaction * ryw,KeyRef key,Optional<ValueRef> val,bool valueKnown=true)978 	static void triggerWatches(ReadYourWritesTransaction *ryw, KeyRef key, Optional<ValueRef> val, bool valueKnown = true) {
979 		triggerWatches(ryw, singleKeyRange(key), val, valueKnown);
980 	}
981 
watch(ReadYourWritesTransaction * ryw,Key key)982 	ACTOR static Future<Void> watch( ReadYourWritesTransaction *ryw, Key key ) {
983 		state Future<Optional<Value>> val;
984 		state Future<Void> watchFuture;
985 		state Reference<Watch> watch(new Watch(key));
986 		state Promise<Void> done;
987 
988 		ryw->reading.add( done.getFuture() );
989 
990 		if(!ryw->options.readYourWritesDisabled) {
991 			ryw->watchMap[key].push_back(watch);
992 			val = readWithConflictRange( ryw, GetValueReq(key), false );
993 		}
994 		else
995 			val = ryw->tr.get(key);
996 
997 		try {
998 			wait(ryw->resetPromise.getFuture() || success(val) || watch->onChangeTrigger.getFuture());
999 		} catch( Error &e ) {
1000 			done.send(Void());
1001 			throw;
1002 		}
1003 
1004 		if( watch->onChangeTrigger.getFuture().isReady() ) {
1005 			done.send(Void());
1006 			if( watch->onChangeTrigger.getFuture().isError() )
1007 				throw watch->onChangeTrigger.getFuture().getError();
1008 			return Void();
1009 		}
1010 
1011 		watch->valuePresent = true;
1012 		watch->value = val.get();
1013 
1014 		if( watch->setPresent && ( watch->setValue.present() != watch->value.present() || (watch->value.present() && watch->setValue.get() != watch->value.get()) ) ) {
1015 			watch->onChangeTrigger.send(Void());
1016 			done.send(Void());
1017 			return Void();
1018 		}
1019 
1020 		watchFuture = ryw->tr.watch(watch); // throws if there are too many outstanding watches
1021 		done.send(Void());
1022 
1023 		wait(watchFuture);
1024 
1025 		return Void();
1026 	}
1027 
commit(ReadYourWritesTransaction * ryw)1028 	ACTOR static Future<Void> commit( ReadYourWritesTransaction *ryw ) {
1029 		try {
1030 			ryw->commitStarted = true;
1031 
1032 			Future<Void> ready = ryw->reading;
1033 			wait( ryw->resetPromise.getFuture() || ready );
1034 
1035 			if( ryw->options.readYourWritesDisabled ) {
1036 				if (ryw->resetPromise.isSet())
1037 					throw ryw->resetPromise.getFuture().getError();
1038 				wait( ryw->resetPromise.getFuture() || ryw->tr.commit() );
1039 
1040 				ryw->debugLogRetries();
1041 
1042 				if(!ryw->tr.apiVersionAtLeast(410)) {
1043 					ryw->reset();
1044 				}
1045 
1046 				return Void();
1047 			}
1048 
1049 			ryw->writeRangeToNativeTransaction(KeyRangeRef(StringRef(), allKeys.end));
1050 
1051 			auto conflictRanges = ryw->readConflicts.ranges();
1052 			for( auto iter = conflictRanges.begin(); iter != conflictRanges.end(); ++iter ) {
1053 				if( iter->value() ) {
1054 					ryw->tr.addReadConflictRange( iter->range() );
1055 				}
1056 			}
1057 
1058 			wait( ryw->resetPromise.getFuture() || ryw->tr.commit() );
1059 
1060 			ryw->debugLogRetries();
1061 			if(!ryw->tr.apiVersionAtLeast(410)) {
1062 				ryw->reset();
1063 			}
1064 
1065 			return Void();
1066 		} catch( Error &e ) {
1067 			if(!ryw->tr.apiVersionAtLeast(410)) {
1068 				ryw->commitStarted = false;
1069 				if( !ryw->resetPromise.isSet() ) {
1070 					ryw->tr.reset();
1071 					ryw->resetRyow();
1072 				}
1073 			}
1074 
1075 			throw;
1076 		}
1077 	}
1078 
onError(ReadYourWritesTransaction * ryw,Error e)1079 	ACTOR static Future<Void> onError( ReadYourWritesTransaction *ryw, Error e ) {
1080 		try {
1081 			if ( ryw->resetPromise.isSet() ) {
1082 				throw ryw->resetPromise.getFuture().getError();
1083 			}
1084 
1085 			bool retry_limit_hit = ryw->options.maxRetries != -1 && ryw->retries >= ryw->options.maxRetries;
1086 			if (ryw->retries < std::numeric_limits<int>::max())
1087 				ryw->retries++;
1088 			if(retry_limit_hit) {
1089 				throw e;
1090 			}
1091 
1092 			wait( ryw->resetPromise.getFuture() || ryw->tr.onError(e) );
1093 
1094 			ryw->debugLogRetries(e);
1095 
1096 			ryw->resetRyow();
1097 			return Void();
1098 		} catch( Error &e ) {
1099 			if ( !ryw->resetPromise.isSet() ) {
1100 				if(ryw->tr.apiVersionAtLeast(610)) {
1101 					ryw->resetPromise.sendError(transaction_cancelled());
1102 				}
1103 				else {
1104 					ryw->resetRyow();
1105 				}
1106 			}
1107 			if( e.code() == error_code_broken_promise )
1108 				throw transaction_cancelled();
1109 			throw;
1110 		}
1111 	}
1112 
	// Fetches the read version from the underlying transaction while also watching ryw->resetPromise.
	// Returns the version if tr.getReadVersion() is the clause that fires.
	ACTOR static Future<Version> getReadVersion(ReadYourWritesTransaction* ryw) {
		choose{
			when(Version v = wait(ryw->tr.getReadVersion())) {
				return v;
			}

			when(wait(ryw->resetPromise.getFuture())) {
				// NOTE(review): in the code visible here resetPromise is only ever completed with sendError(),
				// so a normal completion is presumably considered impossible — confirm.
				throw internal_error();
			}
		}
	}
1124 };
1125 
ReadYourWritesTransaction(Database const & cx)1126 ReadYourWritesTransaction::ReadYourWritesTransaction( Database const& cx ) : cache(&arena), writes(&arena), tr(cx), retries(0), creationTime(now()), commitStarted(false), options(tr), deferredError(cx->deferredError) {
1127 	resetTimeout();
1128 }
1129 
timebomb(double endTime,Promise<Void> resetPromise)1130 ACTOR Future<Void> timebomb(double endTime, Promise<Void> resetPromise) {
1131 	if (now() < endTime) {
1132 		wait ( delayUntil( endTime ) );
1133 	}
1134 	if( !resetPromise.isSet() )
1135 		resetPromise.sendError(transaction_timed_out());
1136 	throw transaction_timed_out();
1137 }
1138 
resetTimeout()1139 void ReadYourWritesTransaction::resetTimeout() {
1140 	timeoutActor = options.timeoutInSeconds == 0.0 ? Void() : timebomb(options.timeoutInSeconds + creationTime, resetPromise);
1141 }
1142 
getReadVersion()1143 Future<Version> ReadYourWritesTransaction::getReadVersion() {
1144 	if (tr.apiVersionAtLeast(101)) {
1145 		if (resetPromise.isSet())
1146 			return resetPromise.getFuture().getError();
1147 		return RYWImpl::getReadVersion(this);
1148 	}
1149 	return tr.getReadVersion();
1150 }
1151 
getValueFromJSON(StatusObject statusObj)1152 Optional<Value> getValueFromJSON(StatusObject statusObj) {
1153 	try {
1154 		Value output = StringRef(json_spirit::write_string(json_spirit::mValue(statusObj), json_spirit::Output_options::raw_utf8).c_str());
1155 		return output;
1156 	}
1157 	catch (std::exception& e){
1158 		TraceEvent(SevError, "UnableToUnparseStatusJSON").detail("What", e.what());
1159 		throw internal_error();
1160 	}
1161 }
1162 
getJSON(Reference<ClusterConnectionFile> clusterFile)1163 ACTOR Future<Optional<Value>> getJSON(Reference<ClusterConnectionFile> clusterFile) {
1164 	StatusObject statusObj = wait(StatusClient::statusFetcher(clusterFile));
1165 	return getValueFromJSON(statusObj);
1166 }
1167 
getWorkerInterfaces(Reference<ClusterConnectionFile> clusterFile)1168 ACTOR Future<Standalone<RangeResultRef>> getWorkerInterfaces (Reference<ClusterConnectionFile> clusterFile){
1169 	state Reference<AsyncVar<Optional<ClusterInterface>>> clusterInterface(new AsyncVar<Optional<ClusterInterface>>);
1170 	state Future<Void> leaderMon = monitorLeader<ClusterInterface>(clusterFile, clusterInterface);
1171 
1172 	loop{
1173 		choose {
1174 			when( vector<ClientWorkerInterface> workers = wait( clusterInterface->get().present() ? brokenPromiseToNever( clusterInterface->get().get().getClientWorkers.getReply( GetClientWorkersRequest() ) ) : Never() ) ) {
1175 				Standalone<RangeResultRef> result;
1176 				for(auto& it : workers) {
1177 					result.push_back_deep(result.arena(), KeyValueRef(it.address().toString(), BinaryWriter::toValue(it, IncludeVersion())));
1178 				}
1179 
1180 				return result;
1181 			}
1182 			when( wait(clusterInterface->onChange()) ) {}
1183 		}
1184 	}
1185 }
1186 
get(const Key & key,bool snapshot)1187 Future< Optional<Value> > ReadYourWritesTransaction::get( const Key& key, bool snapshot ) {
1188 	TEST(true);
1189 
1190 	if (key == LiteralStringRef("\xff\xff/status/json")){
1191 		if (tr.getDatabase().getPtr() && tr.getDatabase()->getConnectionFile()) {
1192 			return getJSON(tr.getDatabase()->getConnectionFile());
1193 		}
1194 		else {
1195 			return Optional<Value>();
1196 		}
1197 	}
1198 
1199 	if (key == LiteralStringRef("\xff\xff/cluster_file_path")) {
1200 		try {
1201 			if (tr.getDatabase().getPtr() && tr.getDatabase()->getConnectionFile()) {
1202 				Optional<Value> output = StringRef(tr.getDatabase()->getConnectionFile()->getFilename());
1203 				return output;
1204 			}
1205 		}
1206 		catch (Error &e){
1207 			return e;
1208 		}
1209 		return Optional<Value>();
1210 	}
1211 
1212 	if (key == LiteralStringRef("\xff\xff/connection_string")){
1213 		try {
1214 			if (tr.getDatabase().getPtr() && tr.getDatabase()->getConnectionFile()) {
1215 				Reference<ClusterConnectionFile> f = tr.getDatabase()->getConnectionFile();
1216 				Optional<Value> output = StringRef(f->getConnectionString().toString());
1217 				return output;
1218 			}
1219 		}
1220 		catch (Error &e){
1221 			return e;
1222 		}
1223 		return Optional<Value>();
1224 	}
1225 
1226 	if(checkUsedDuringCommit()) {
1227 		return used_during_commit();
1228 	}
1229 
1230 	if( resetPromise.isSet() )
1231 		return resetPromise.getFuture().getError();
1232 
1233 	if(key >= getMaxReadKey() && key != metadataVersionKey)
1234 		return key_outside_legal_range();
1235 
1236 	//There are no keys in the database with size greater than KEY_SIZE_LIMIT
1237 	if(key.size() > (key.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT))
1238 		return Optional<Value>();
1239 
1240 	Future< Optional<Value> > result = RYWImpl::readWithConflictRange( this, RYWImpl::GetValueReq(key), snapshot );
1241 	reading.add( success( result ) );
1242 	return result;
1243 }
1244 
getKey(const KeySelector & key,bool snapshot)1245 Future< Key > ReadYourWritesTransaction::getKey( const KeySelector& key, bool snapshot ) {
1246 	if(checkUsedDuringCommit()) {
1247 		return used_during_commit();
1248 	}
1249 
1250 	if( resetPromise.isSet() )
1251 		return resetPromise.getFuture().getError();
1252 
1253 	if(key.getKey() > getMaxReadKey())
1254 		return key_outside_legal_range();
1255 
1256 	Future< Key > result = RYWImpl::readWithConflictRange(this, RYWImpl::GetKeyReq(key), snapshot);
1257 	reading.add( success( result ) );
1258 	return result;
1259 }
1260 
getRange(KeySelector begin,KeySelector end,GetRangeLimits limits,bool snapshot,bool reverse)1261 Future< Standalone<RangeResultRef> > ReadYourWritesTransaction::getRange(
1262 	KeySelector begin,
1263 	KeySelector end,
1264 	GetRangeLimits limits,
1265 	bool snapshot,
1266 	bool reverse )
1267 {
1268 	if (begin.getKey() == LiteralStringRef("\xff\xff/worker_interfaces")){
1269 		if (tr.getDatabase().getPtr() && tr.getDatabase()->getConnectionFile()) {
1270 			return getWorkerInterfaces(tr.getDatabase()->getConnectionFile());
1271 		}
1272 		else {
1273 			return Standalone<RangeResultRef>();
1274 		}
1275 	}
1276 
1277 	if(checkUsedDuringCommit()) {
1278 		return used_during_commit();
1279 	}
1280 
1281 	if( resetPromise.isSet() )
1282 		return resetPromise.getFuture().getError();
1283 
1284 	KeyRef maxKey = getMaxReadKey();
1285 	if(begin.getKey() > maxKey || end.getKey() > maxKey)
1286 		return key_outside_legal_range();
1287 
1288 	//This optimization prevents NULL operations from being added to the conflict range
1289 	if( limits.isReached() ) {
1290 		TEST(true); // RYW range read limit 0
1291 		return Standalone<RangeResultRef>();
1292 	}
1293 
1294 	if( !limits.isValid() )
1295 		return range_limits_invalid();
1296 
1297 	if( begin.orEqual )
1298 		begin.removeOrEqual(begin.arena());
1299 
1300 	if( end.orEqual )
1301 		end.removeOrEqual(end.arena());
1302 
1303 	if( begin.offset >= end.offset && begin.getKey() >= end.getKey() ) {
1304 		TEST(true); // RYW range inverted
1305 		return Standalone<RangeResultRef>();
1306 	}
1307 
1308 	Future< Standalone<RangeResultRef> > result = reverse
1309 		? RYWImpl::readWithConflictRange( this, RYWImpl::GetRangeReq<true>(begin, end, limits), snapshot )
1310 		: RYWImpl::readWithConflictRange( this, RYWImpl::GetRangeReq<false>(begin, end, limits), snapshot );
1311 
1312 	reading.add( success( result ) );
1313 	return result;
1314 }
1315 
getRange(const KeySelector & begin,const KeySelector & end,int limit,bool snapshot,bool reverse)1316 Future< Standalone<RangeResultRef> > ReadYourWritesTransaction::getRange(
1317 	const KeySelector& begin,
1318 	const KeySelector& end,
1319 	int limit,
1320 	bool snapshot,
1321 	bool reverse )
1322 {
1323 	return getRange( begin, end, GetRangeLimits( limit ), snapshot, reverse );
1324 }
1325 
getAddressesForKey(const Key & key)1326 Future< Standalone<VectorRef<const char*> >> ReadYourWritesTransaction::getAddressesForKey( const Key& key ) {
1327 	if(checkUsedDuringCommit()) {
1328 		return used_during_commit();
1329 	}
1330 
1331 	if( resetPromise.isSet() )
1332 		return resetPromise.getFuture().getError();
1333 
1334 	// If key >= allKeys.end, then our resulting address vector will be empty.
1335 
1336 	Future< Standalone<VectorRef<const char*> >> result = waitOrError(tr.getAddressesForKey(key), resetPromise.getFuture());
1337 	reading.add( success( result ) );
1338 	return result;
1339 }
1340 
addReadConflictRange(KeyRangeRef const & keys)1341 void ReadYourWritesTransaction::addReadConflictRange( KeyRangeRef const& keys ) {
1342 	if(checkUsedDuringCommit()) {
1343 		throw used_during_commit();
1344 	}
1345 
1346 	if (tr.apiVersionAtLeast(300)) {
1347 		if (keys.begin > getMaxReadKey() || keys.end > getMaxReadKey()) {
1348 			throw key_outside_legal_range();
1349 		}
1350 	}
1351 
1352 	//There aren't any keys in the database with size larger than KEY_SIZE_LIMIT, so if range contains large keys
1353 	//we can translate it to an equivalent one with smaller keys
1354 	KeyRef begin = keys.begin;
1355 	KeyRef end = keys.end;
1356 
1357 	if(begin.size() > (begin.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT))
1358 		begin = begin.substr(0, (begin.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT)+1);
1359 	if(end.size() > (end.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT))
1360 		end = end.substr(0, (end.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT)+1);
1361 
1362 	KeyRangeRef r = KeyRangeRef(begin, end);
1363 
1364 	if(r.empty()) {
1365 		return;
1366 	}
1367 
1368 	if(options.readYourWritesDisabled) {
1369 		tr.addReadConflictRange(r);
1370 		return;
1371 	}
1372 
1373 	WriteMap::iterator it( &writes );
1374 	KeyRangeRef readRange( arena, r );
1375 	it.skip( readRange.begin );
1376 	updateConflictMap(readRange, it);
1377 }
1378 
updateConflictMap(KeyRef const & key,WriteMap::iterator & it)1379 void ReadYourWritesTransaction::updateConflictMap( KeyRef const& key, WriteMap::iterator& it ) {
1380 	//it.skip( key );
1381 	//ASSERT( it.beginKey() <= key && key < it.endKey() );
1382 	if( it.is_unmodified_range() || ( it.is_operation() && !it.is_independent() ) ) {
1383 		readConflicts.insert( singleKeyRange( key, arena ), true );
1384 	}
1385 }
1386 
updateConflictMap(KeyRangeRef const & keys,WriteMap::iterator & it)1387 void ReadYourWritesTransaction::updateConflictMap( KeyRangeRef const& keys, WriteMap::iterator& it ) {
1388 	//it.skip( keys.begin );
1389 	//ASSERT( it.beginKey() <= keys.begin && keys.begin < it.endKey() );
1390 	for(; it.beginKey() < keys.end; ++it ) {
1391 		if( it.is_unmodified_range() || ( it.is_operation() && !it.is_independent() ) ) {
1392 			KeyRangeRef insert_range = KeyRangeRef( std::max( keys.begin, it.beginKey().toArenaOrRef( arena ) ), std::min( keys.end, it.endKey().toArenaOrRef( arena ) ) );
1393 			if( !insert_range.empty() )
1394 				readConflicts.insert( insert_range, true );
1395 		}
1396 	}
1397 }
1398 
writeRangeToNativeTransaction(KeyRangeRef const & keys)1399 void ReadYourWritesTransaction::writeRangeToNativeTransaction( KeyRangeRef const& keys ) {
1400 	WriteMap::iterator it( &writes );
1401 	it.skip(keys.begin);
1402 
1403 	bool inClearRange = false;
1404 	ExtStringRef clearBegin;
1405 
1406 	//Clear ranges must be done first because of keys that are both cleared and set to a new value
1407 	for(; it.beginKey() < keys.end; ++it) {
1408 		if( it.is_cleared_range() && !inClearRange ) {
1409 			clearBegin = std::max(ExtStringRef(keys.begin), it.beginKey());
1410 			inClearRange = true;
1411 		} else if( !it.is_cleared_range() && inClearRange ) {
1412 			tr.clear( KeyRangeRef( clearBegin.toArenaOrRef(arena), it.beginKey().toArenaOrRef(arena) ), false );
1413 			inClearRange = false;
1414 		}
1415 	}
1416 
1417 	if( inClearRange ) {
1418 		tr.clear(KeyRangeRef(clearBegin.toArenaOrRef(arena), keys.end), false);
1419 	}
1420 
1421 	it.skip(keys.begin);
1422 
1423 	bool inConflictRange = false;
1424 	ExtStringRef conflictBegin;
1425 
1426 	for(; it.beginKey() < keys.end; ++it) {
1427 		if( it.is_conflict_range() && !inConflictRange ) {
1428 			conflictBegin = std::max(ExtStringRef(keys.begin), it.beginKey());
1429 			inConflictRange = true;
1430 		} else if( !it.is_conflict_range() && inConflictRange ) {
1431 			tr.addWriteConflictRange( KeyRangeRef( conflictBegin.toArenaOrRef(arena), it.beginKey().toArenaOrRef(arena) ) );
1432 			inConflictRange = false;
1433 		}
1434 
1435 		//SOMEDAY: make atomicOp take set to avoid switch
1436 		if( it.is_operation() ) {
1437 			auto op = it.op();
1438 			for( int i = 0; i < op.size(); ++i) {
1439 				switch(op[i].type) {
1440 					case MutationRef::SetValue:
1441 						if (op[i].value.present()) {
1442 							tr.set( it.beginKey().assertRef(), op[i].value.get(), false );
1443 						} else {
1444 							tr.clear( it.beginKey().assertRef(), false );
1445 						}
1446 						break;
1447 					case MutationRef::AddValue:
1448 					case MutationRef::AppendIfFits:
1449 					case MutationRef::And:
1450 					case MutationRef::Or:
1451 					case MutationRef::Xor:
1452 					case MutationRef::Max:
1453 					case MutationRef::Min:
1454 					case MutationRef::SetVersionstampedKey:
1455 					case MutationRef::SetVersionstampedValue:
1456 					case MutationRef::ByteMin:
1457 					case MutationRef::ByteMax:
1458 					case MutationRef::MinV2:
1459 					case MutationRef::AndV2:
1460 					case MutationRef::CompareAndClear:
1461 						tr.atomicOp(it.beginKey().assertRef(), op[i].value.get(), op[i].type, false);
1462 						break;
1463 					default:
1464 						break;
1465 				}
1466 			}
1467 		}
1468 	}
1469 
1470 	if( inConflictRange ) {
1471 		tr.addWriteConflictRange( KeyRangeRef( conflictBegin.toArenaOrRef(arena), keys.end ) );
1472 	}
1473 }
1474 
ReadYourWritesTransactionOptions(Transaction const & tr)1475 ReadYourWritesTransactionOptions::ReadYourWritesTransactionOptions(Transaction const& tr) {
1476 	Database cx = tr.getDatabase();
1477 	timeoutInSeconds = cx->transactionTimeout;
1478 	maxRetries = cx->transactionMaxRetries;
1479 	reset(tr);
1480 }
1481 
reset(Transaction const & tr)1482 void ReadYourWritesTransactionOptions::reset(Transaction const& tr) {
1483 	double oldTimeout = timeoutInSeconds;
1484 	int oldMaxRetries = maxRetries;
1485 	memset(this, 0, sizeof(*this));
1486 	if( tr.apiVersionAtLeast(610) ) {
1487 		// Starting in API version 610, these options are not cleared after reset.
1488 		timeoutInSeconds = oldTimeout;
1489 		maxRetries = oldMaxRetries;
1490 	}
1491 	else {
1492 		Database cx = tr.getDatabase();
1493 		maxRetries = cx->transactionMaxRetries;
1494 		timeoutInSeconds = cx->transactionTimeout;
1495 	}
1496 	snapshotRywEnabled = tr.getDatabase()->snapshotRywEnabled;
1497 }
1498 
fullReset(Transaction const & tr)1499 void ReadYourWritesTransactionOptions::fullReset(Transaction const& tr) {
1500 	reset(tr);
1501 	Database cx = tr.getDatabase();
1502 	maxRetries = cx->transactionMaxRetries;
1503 	timeoutInSeconds = cx->transactionTimeout;
1504 }
1505 
getAndResetWriteConflictDisabled()1506 bool ReadYourWritesTransactionOptions::getAndResetWriteConflictDisabled() {
1507 	bool disabled = nextWriteDisableConflictRange;
1508 	nextWriteDisableConflictRange = false;
1509 	return disabled;
1510 }
1511 
getWriteConflicts(KeyRangeMap<bool> * result)1512 void ReadYourWritesTransaction::getWriteConflicts( KeyRangeMap<bool> *result ) {
1513 	WriteMap::iterator it( &writes );
1514 	it.skip(allKeys.begin);
1515 
1516 	bool inConflictRange = false;
1517 	ExtStringRef conflictBegin;
1518 
1519 	for(; it.beginKey() < getMaxWriteKey(); ++it) {
1520 		if( it.is_conflict_range() && !inConflictRange ) {
1521 			conflictBegin = it.beginKey();
1522 			inConflictRange = true;
1523 		} else if( !it.is_conflict_range() && inConflictRange ) {
1524 			result->insert(  KeyRangeRef( conflictBegin.toArenaOrRef(arena), it.beginKey().toArenaOrRef(arena) ), true );
1525 			inConflictRange = false;
1526 		}
1527 	}
1528 
1529 	if( inConflictRange ) {
1530 		result->insert(  KeyRangeRef( conflictBegin.toArenaOrRef(arena), getMaxWriteKey() ), true );
1531 	}
1532 }
1533 
atomicOp(const KeyRef & key,const ValueRef & operand,uint32_t operationType)1534 void ReadYourWritesTransaction::atomicOp( const KeyRef& key, const ValueRef& operand, uint32_t operationType ) {
1535 	bool addWriteConflict = !options.getAndResetWriteConflictDisabled();
1536 
1537 	if(checkUsedDuringCommit()) {
1538 		throw used_during_commit();
1539 	}
1540 
1541 	if (key == metadataVersionKey) {
1542 		if(operationType != MutationRef::SetVersionstampedValue || operand != metadataVersionRequiredValue) {
1543 			throw client_invalid_operation();
1544 		}
1545 	}
1546 	else if(key >= getMaxWriteKey()) {
1547 		throw key_outside_legal_range();
1548 	}
1549 
1550 	if(!isValidMutationType(operationType) || !isAtomicOp((MutationRef::Type) operationType))
1551 		throw invalid_mutation_type();
1552 
1553 	if(key.size() > (key.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT))
1554 		throw key_too_large();
1555 	if(operand.size() > CLIENT_KNOBS->VALUE_SIZE_LIMIT)
1556 		throw value_too_large();
1557 
1558 	if (tr.apiVersionAtLeast(510)) {
1559 		if (operationType == MutationRef::Min)
1560 			operationType = MutationRef::MinV2;
1561 		else if (operationType == MutationRef::And)
1562 			operationType = MutationRef::AndV2;
1563 	}
1564 
1565 	KeyRef k;
1566 	if(!tr.apiVersionAtLeast(520) && operationType == MutationRef::SetVersionstampedKey) {
1567 		k = key.withSuffix( LiteralStringRef("\x00\x00"), arena );
1568 	} else {
1569 		k = KeyRef( arena, key );
1570 	}
1571 	ValueRef v;
1572 	if(!tr.apiVersionAtLeast(520) && operationType == MutationRef::SetVersionstampedValue) {
1573 		v = operand.withSuffix( LiteralStringRef("\x00\x00\x00\x00"), arena );
1574 	} else {
1575 		v = ValueRef( arena, operand );
1576 	}
1577 
1578 	if(operationType == MutationRef::SetVersionstampedKey) {
1579 		KeyRangeRef range = getVersionstampKeyRange(arena, k, getMaxReadKey()); // this does validation of the key and needs to be performed before the readYourWritesDisabled path
1580 		if(!options.readYourWritesDisabled) {
1581 			writeRangeToNativeTransaction(range);
1582 			writes.addUnmodifiedAndUnreadableRange(range);
1583 		}
1584 	}
1585 
1586 	if(operationType == MutationRef::SetVersionstampedValue) {
1587 		if(v.size() < 4)
1588 			throw client_invalid_operation();
1589 		int32_t pos;
1590 		memcpy(&pos, v.end() - sizeof(int32_t), sizeof(int32_t));
1591 		pos = littleEndian32(pos);
1592 		if (pos < 0 || pos + 10 > v.size() - 4)
1593 			throw client_invalid_operation();
1594 	}
1595 
1596 	if(options.readYourWritesDisabled) {
1597 		return tr.atomicOp(k, v, (MutationRef::Type) operationType, addWriteConflict);
1598 	}
1599 
1600 	writes.mutate(k, (MutationRef::Type) operationType, v, addWriteConflict);
1601 	RYWImpl::triggerWatches(this, k, Optional<ValueRef>(), false);
1602 }
1603 
set(const KeyRef & key,const ValueRef & value)1604 void ReadYourWritesTransaction::set( const KeyRef& key, const ValueRef& value ) {
1605 	if (key == LiteralStringRef("\xff\xff/reboot_worker")){
1606 		BinaryReader::fromStringRef<ClientWorkerInterface>(value, IncludeVersion()).reboot.send( RebootRequest() );
1607 		return;
1608 	}
1609 	if (key == LiteralStringRef("\xff\xff/reboot_and_check_worker")){
1610 		BinaryReader::fromStringRef<ClientWorkerInterface>(value, IncludeVersion()).reboot.send( RebootRequest(false, true) );
1611 		return;
1612 	}
1613 	if (key == metadataVersionKey) {
1614 		throw client_invalid_operation();
1615 	}
1616 
1617 	bool addWriteConflict = !options.getAndResetWriteConflictDisabled();
1618 
1619 	if(checkUsedDuringCommit()) {
1620 		throw used_during_commit();
1621 	}
1622 
1623 	if(key >= getMaxWriteKey())
1624 		throw key_outside_legal_range();
1625 
1626 	if(options.readYourWritesDisabled ) {
1627 		return tr.set(key, value, addWriteConflict);
1628 	}
1629 
1630 	//TODO: check transaction size here
1631 	if(key.size() > (key.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT))
1632 		throw key_too_large();
1633 	if(value.size() > CLIENT_KNOBS->VALUE_SIZE_LIMIT)
1634 		throw value_too_large();
1635 
1636 	KeyRef k = KeyRef( arena, key );
1637 	ValueRef v = ValueRef( arena, value );
1638 
1639 	writes.mutate(k, MutationRef::SetValue, v, addWriteConflict);
1640 	RYWImpl::triggerWatches(this, key, value);
1641 }
1642 
clear(const KeyRangeRef & range)1643 void ReadYourWritesTransaction::clear( const KeyRangeRef& range ) {
1644 	bool addWriteConflict = !options.getAndResetWriteConflictDisabled();
1645 
1646 	if(checkUsedDuringCommit()) {
1647 		throw used_during_commit();
1648 	}
1649 
1650 	KeyRef maxKey = getMaxWriteKey();
1651 	if(range.begin > maxKey || range.end > maxKey)
1652 		throw key_outside_legal_range();
1653 
1654 	if( options.readYourWritesDisabled ) {
1655 		return tr.clear(range, addWriteConflict);
1656 	}
1657 
1658 	//There aren't any keys in the database with size larger than KEY_SIZE_LIMIT, so if range contains large keys
1659 	//we can translate it to an equivalent one with smaller keys
1660 	KeyRef begin = range.begin;
1661 	KeyRef end = range.end;
1662 
1663 	if(begin.size() > (begin.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT))
1664 		begin = begin.substr(0, (begin.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT)+1);
1665 	if(end.size() > (end.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT))
1666 		end = end.substr(0, (end.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT)+1);
1667 
1668 	KeyRangeRef r = KeyRangeRef(begin, end);
1669 
1670 	if(r.empty()) {
1671 		return;
1672 	}
1673 
1674 	r = KeyRangeRef( arena, r );
1675 
1676 	writes.clear(r, addWriteConflict);
1677 	RYWImpl::triggerWatches(this, r, Optional<ValueRef>());
1678 }
1679 
clear(const KeyRef & key)1680 void ReadYourWritesTransaction::clear( const KeyRef& key ) {
1681 	bool addWriteConflict = !options.getAndResetWriteConflictDisabled();
1682 
1683 	if(checkUsedDuringCommit()) {
1684 		throw used_during_commit();
1685 	}
1686 
1687 	if(key >= getMaxWriteKey())
1688 		throw key_outside_legal_range();
1689 
1690 	if(key.size() > (key.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT))
1691 		return;
1692 
1693 	if( options.readYourWritesDisabled ) {
1694 		return tr.clear(key, addWriteConflict);
1695 	}
1696 
1697 	KeyRangeRef r = singleKeyRange( key, arena );
1698 
1699 	//SOMEDAY: add an optimized single key clear to write map
1700 	writes.clear(r, addWriteConflict);
1701 
1702 	RYWImpl::triggerWatches(this, r, Optional<ValueRef>());
1703 }
1704 
watch(const Key & key)1705 Future<Void> ReadYourWritesTransaction::watch(const Key& key) {
1706 	if(checkUsedDuringCommit()) {
1707 		return used_during_commit();
1708 	}
1709 
1710 	if( resetPromise.isSet() )
1711 		return resetPromise.getFuture().getError();
1712 
1713 	if( options.readYourWritesDisabled )
1714 		return watches_disabled();
1715 
1716 	if(key >= allKeys.end || (key >= getMaxReadKey() && key != metadataVersionKey && tr.apiVersionAtLeast(300)))
1717 		return key_outside_legal_range();
1718 
1719 	if (key.size() > (key.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT))
1720 		return key_too_large();
1721 
1722 	return RYWImpl::watch(this, key);
1723 }
1724 
addWriteConflictRange(KeyRangeRef const & keys)1725 void ReadYourWritesTransaction::addWriteConflictRange( KeyRangeRef const& keys ) {
1726 	if(checkUsedDuringCommit()) {
1727 		throw used_during_commit();
1728 	}
1729 
1730 	if (tr.apiVersionAtLeast(300)) {
1731 		if (keys.begin > getMaxWriteKey() || keys.end > getMaxWriteKey()) {
1732 			throw key_outside_legal_range();
1733 		}
1734 	}
1735 
1736 	//There aren't any keys in the database with size larger than KEY_SIZE_LIMIT, so if range contains large keys
1737 	//we can translate it to an equivalent one with smaller keys
1738 	KeyRef begin = keys.begin;
1739 	KeyRef end = keys.end;
1740 
1741 	if(begin.size() > (begin.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT))
1742 		begin = begin.substr(0, (begin.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT)+1);
1743 	if(end.size() > (end.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT))
1744 		end = end.substr(0, (end.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT)+1);
1745 
1746 	KeyRangeRef r = KeyRangeRef(begin, end);
1747 
1748 	if(r.empty()) {
1749 		return;
1750 	}
1751 
1752 	if(options.readYourWritesDisabled) {
1753 		tr.addWriteConflictRange(r);
1754 		return;
1755 	}
1756 
1757 	r = KeyRangeRef( arena, r );
1758 	writes.addConflictRange(r);
1759 }
1760 
commit()1761 Future<Void> ReadYourWritesTransaction::commit() {
1762 	if(checkUsedDuringCommit()) {
1763 		return used_during_commit();
1764 	}
1765 
1766 	if( resetPromise.isSet() )
1767 		return resetPromise.getFuture().getError();
1768 
1769 	return RYWImpl::commit( this );
1770 }
1771 
getVersionstamp()1772 Future<Standalone<StringRef>> ReadYourWritesTransaction::getVersionstamp() {
1773 	if(checkUsedDuringCommit()) {
1774 		return used_during_commit();
1775 	}
1776 
1777 	return waitOrError(tr.getVersionstamp(), resetPromise.getFuture());
1778 }
1779 
setOption(FDBTransactionOptions::Option option,Optional<StringRef> value)1780 void ReadYourWritesTransaction::setOption( FDBTransactionOptions::Option option, Optional<StringRef> value ) {
1781 	switch(option) {
1782 		case FDBTransactionOptions::READ_YOUR_WRITES_DISABLE:
1783 			validateOptionValue(value, false);
1784 
1785 			if (!reading.isReady() || !cache.empty() || !writes.empty())
1786 				throw client_invalid_operation();
1787 
1788 			options.readYourWritesDisabled = true;
1789 			break;
1790 
1791 		case FDBTransactionOptions::READ_AHEAD_DISABLE:
1792 			validateOptionValue(value, false);
1793 
1794 			options.readAheadDisabled = true;
1795 			break;
1796 
1797 		case FDBTransactionOptions::NEXT_WRITE_NO_WRITE_CONFLICT_RANGE:
1798 			validateOptionValue(value, false);
1799 
1800 			options.nextWriteDisableConflictRange = true;
1801 			break;
1802 
1803 		case FDBTransactionOptions::ACCESS_SYSTEM_KEYS:
1804 			validateOptionValue(value, false);
1805 
1806 			options.readSystemKeys = true;
1807 			options.writeSystemKeys = true;
1808 			break;
1809 
1810 		case FDBTransactionOptions::READ_SYSTEM_KEYS:
1811 			validateOptionValue(value, false);
1812 
1813 			options.readSystemKeys = true;
1814 			break;
1815 
1816 		case FDBTransactionOptions::TIMEOUT:
1817 			options.timeoutInSeconds = extractIntOption(value, 0, std::numeric_limits<int>::max())/1000.0;
1818 			resetTimeout();
1819 			break;
1820 
1821 		case FDBTransactionOptions::RETRY_LIMIT:
1822 			options.maxRetries = (int)extractIntOption(value, -1, std::numeric_limits<int>::max());
1823 			break;
1824 
1825 		case FDBTransactionOptions::DEBUG_RETRY_LOGGING:
1826 			options.debugRetryLogging = true;
1827 			if(!transactionDebugInfo) {
1828 				transactionDebugInfo = Reference<TransactionDebugInfo>::addRef(new TransactionDebugInfo());
1829 				transactionDebugInfo->lastRetryLogTime = creationTime;
1830 			}
1831 
1832 			transactionDebugInfo->transactionName = value.present() ? value.get().toString() : "";
1833 			break;
1834 		case FDBTransactionOptions::SNAPSHOT_RYW_ENABLE:
1835 			validateOptionValue(value, false);
1836 
1837 			options.snapshotRywEnabled++;
1838 			break;
1839 		case FDBTransactionOptions::SNAPSHOT_RYW_DISABLE:
1840 			validateOptionValue(value, false);
1841 
1842 			options.snapshotRywEnabled--;
1843 			break;
1844 		case FDBTransactionOptions::USED_DURING_COMMIT_PROTECTION_DISABLE:
1845 			validateOptionValue(value, false);
1846 
1847 			options.disableUsedDuringCommitProtection = true;
1848 			break;
1849 		default:
1850 			break;
1851 	}
1852 
1853 	tr.setOption( option, value );
1854 }
1855 
// Move assignment: takes over the state of `r`.
//
// cache, writes, arena, tr, readConflicts, watchMap, reading, resetPromise and
// deferredError are moved; retries, timeoutActor, creationTime, commitStarted, options and
// transactionDebugInfo are copied. After the transfer `r` is given a fresh reset promise,
// and the arena pointers held by cache and writes are redirected to this object's arena.
void ReadYourWritesTransaction::operator=(ReadYourWritesTransaction&& r) BOOST_NOEXCEPT {
	cache = std::move( r.cache );
	writes = std::move( r.writes );
	arena = std::move( r.arena );
	tr = std::move( r.tr );
	readConflicts = std::move( r.readConflicts );
	watchMap = std::move( r.watchMap );
	reading = std::move( r.reading );
	resetPromise = std::move( r.resetPromise );
	// Give the moved-from transaction a fresh promise of its own.
	r.resetPromise = Promise<Void>();
	deferredError = std::move( r.deferredError );
	retries = r.retries;
	timeoutActor = r.timeoutActor;
	creationTime = r.creationTime;
	commitStarted = r.commitStarted;
	options = r.options;
	transactionDebugInfo = r.transactionDebugInfo;
	// cache and writes carry a pointer to their arena; point them at this object's arena.
	cache.arena = &arena;
	writes.arena = &arena;
}
1876 
// Move constructor: builds this transaction from the state of `r`.
//
// Most members are taken over in the initializer list; tr, readConflicts and watchMap are
// move-assigned in the body. The arena pointers held by cache and writes are redirected
// to this object's arena, and `r` is left with a fresh reset promise.
ReadYourWritesTransaction::ReadYourWritesTransaction(ReadYourWritesTransaction&& r) BOOST_NOEXCEPT :
	cache( std::move(r.cache) ),
	writes( std::move(r.writes) ),
	arena( std::move(r.arena) ),
	reading( std::move(r.reading) ),
	retries( r.retries ),
	creationTime( r.creationTime ),
	deferredError( std::move(r.deferredError) ),
	timeoutActor( std::move(r.timeoutActor) ),
	resetPromise( std::move(r.resetPromise) ),
	commitStarted( r.commitStarted ),
	options( r.options ),
	transactionDebugInfo( r.transactionDebugInfo )
{
	// cache and writes carry a pointer to their arena; point them at this object's arena.
	cache.arena = &arena;
	writes.arena = &arena;
	tr = std::move( r.tr );
	readConflicts = std::move(r.readConflicts);
	watchMap = std::move( r.watchMap );
	// Give the moved-from transaction a fresh promise of its own.
	r.resetPromise = Promise<Void>();
}
1898 
// Handles error `e` for this transaction by delegating to RYWImpl::onError and
// returning the future it produces.
Future<Void> ReadYourWritesTransaction::onError(Error const& e) {
	return RYWImpl::onError( this, e );
}
1902 
// Discards the read-your-writes state of the transaction: the timeout actor is cancelled,
// and the arena, snapshot cache, write map, read conflict map, watch map, outstanding-read
// tracker, commitStarted flag and deferred error are all replaced with fresh values.
// For API version 16 and later the options are reset and the timeout is re-armed.
// Finally, the previous reset promise is failed with transaction_cancelled unless it was
// already set.
void ReadYourWritesTransaction::resetRyow() {
	// Hold on to the old promise so it can be failed after the new state is in place.
	Promise<Void> oldReset = resetPromise;
	resetPromise = Promise<Void>();

	timeoutActor.cancel();
	arena = Arena();
	// The cache and write map are rebuilt against the new arena.
	cache = SnapshotCache(&arena);
	writes = WriteMap(&arena);
	readConflicts = CoalescedKeyRefRangeMap<bool>();
	watchMap.clear();
	reading = AndFuture();
	commitStarted = false;

	deferredError = Error();

	if(tr.apiVersionAtLeast(16)) {
		options.reset(tr);
		resetTimeout();
	}

	if ( !oldReset.isSet() )
		oldReset.sendError(transaction_cancelled());
}
1926 
cancel()1927 void ReadYourWritesTransaction::cancel() {
1928 	if(!resetPromise.isSet() )
1929 		resetPromise.sendError(transaction_cancelled());
1930 }
1931 
// Fully resets the transaction: the retry count is zeroed, the creation time is set to
// now(), the timeout actor is cancelled, the options get a full reset, the debug info is
// dropped, the native transaction is fully reset, and finally the read-your-writes state
// is discarded via resetRyow().
void ReadYourWritesTransaction::reset() {
	retries = 0;
	creationTime = now();
	timeoutActor.cancel();
	options.fullReset(tr);
	transactionDebugInfo.clear();
	tr.fullReset();
	resetRyow();
}
1941 
getMaxReadKey()1942 KeyRef ReadYourWritesTransaction::getMaxReadKey() {
1943 	if(options.readSystemKeys)
1944 		return systemKeys.end;
1945 	else
1946 		return normalKeys.end;
1947 }
1948 
getMaxWriteKey()1949 KeyRef ReadYourWritesTransaction::getMaxWriteKey() {
1950 	if(options.writeSystemKeys)
1951 		return systemKeys.end;
1952 	else
1953 		return normalKeys.end;
1954 }
1955 
~ReadYourWritesTransaction()1956 ReadYourWritesTransaction::~ReadYourWritesTransaction() {
1957 	if( !resetPromise.isSet() )
1958 		resetPromise.sendError(transaction_cancelled());
1959 }
1960 
checkUsedDuringCommit()1961 bool ReadYourWritesTransaction::checkUsedDuringCommit() {
1962 	if(commitStarted && !resetPromise.isSet() && !options.disableUsedDuringCommitProtection) {
1963 		resetPromise.sendError(used_during_commit());
1964 	}
1965 
1966 	return commitStarted;
1967 }
1968 
debugLogRetries(Optional<Error> error)1969 void ReadYourWritesTransaction::debugLogRetries(Optional<Error> error) {
1970 	bool committed = !error.present();
1971 	if(options.debugRetryLogging) {
1972 		double timeSinceLastLog = now() - transactionDebugInfo->lastRetryLogTime;
1973 		double elapsed = now() - creationTime;
1974 		if(timeSinceLastLog >= 1 || (committed && elapsed > 1)) {
1975 			std::string transactionNameStr = "";
1976 			if(!transactionDebugInfo->transactionName.empty())
1977 				transactionNameStr = format(" in transaction '%s'", printable(StringRef(transactionDebugInfo->transactionName)).c_str());
1978 			if(!g_network->isSimulated()) //Fuzz workload turns this on, but we do not want stderr output in simulation
1979 				fprintf(stderr, "fdb WARNING: long transaction (%.2fs elapsed%s, %d retries, %s)\n", elapsed, transactionNameStr.c_str(), retries, committed ? "committed" : error.get().what());
1980 			{
1981 				TraceEvent trace = TraceEvent("LongTransaction");
1982 				if(error.present())
1983 					trace.error(error.get(), true);
1984 				if(!transactionDebugInfo->transactionName.empty())
1985 					trace.detail("TransactionName", transactionDebugInfo->transactionName);
1986 				trace.detail("Elapsed", elapsed).detail("Retries", retries).detail("Committed", committed);
1987 			}
1988 			transactionDebugInfo->lastRetryLogTime = now();
1989 		}
1990 	}
1991 }
1992