1 /*
2 * ReadYourWrites.actor.cpp
3 *
4 * This source file is part of the FoundationDB open source project
5 *
6 * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
7 *
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
11 *
12 * http://www.apache.org/licenses/LICENSE-2.0
13 *
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 */
20
21 #include "fdbclient/ReadYourWrites.h"
22 #include "fdbclient/Atomic.h"
23 #include "fdbclient/DatabaseContext.h"
24 #include "fdbclient/StatusClient.h"
25 #include "fdbclient/MonitorLeader.h"
26 #include "flow/Util.h"
27 #include "flow/actorcompiler.h" // This must be the last #include.
28
29 class RYWImpl {
30 public:
dump(Iter it)31 template<class Iter> static void dump( Iter it ) {
32 it.skip(allKeys.begin);
33 Arena arena;
34 while( true ) {
35 Optional<StringRef> key = StringRef();
36 if (it.is_kv()) {
37 auto kv = it.kv(arena);
38 if (kv) key = kv->key;
39 }
40 TraceEvent("RYWDump")
41 .detail("Begin", it.beginKey())
42 .detail("End", it.endKey())
43 .detail("Unknown", it.is_unknown_range())
44 .detail("Empty", it.is_empty_range())
45 .detail("KV", it.is_kv())
46 .detail("Key", key.get());
47 if( it.endKey() == allKeys.end )
48 break;
49 ++it;
50 }
51 }
52
53 struct GetValueReq {
GetValueReqRYWImpl::GetValueReq54 explicit GetValueReq( Key key ) : key(key) {}
55 Key key;
56 typedef Optional<Value> Result;
57 };
58
59 struct GetKeyReq {
GetKeyReqRYWImpl::GetKeyReq60 explicit GetKeyReq( KeySelector key ) : key(key) {}
61 KeySelector key;
62 typedef Key Result;
63 };
64
65 template <bool Reverse>
66 struct GetRangeReq {
GetRangeReqRYWImpl::GetRangeReq67 GetRangeReq( KeySelector begin, KeySelector end, GetRangeLimits limits ) : begin(begin), end(end), limits(limits) {}
68 KeySelector begin, end;
69 GetRangeLimits limits;
70 typedef Standalone<RangeResultRef> Result;
71 };
72
// read() performs a read (get, getKey, getRange, etc.) in the context of the given transaction. Snapshot and RYW reads are distinguished by the type Iter being SnapshotCache::iterator or RYWIterator.
74 // Fills in the snapshot cache as a side effect but does not affect conflict ranges.
75 // Some (indicated) overloads of read are required to update the given *it to point to the key that was read, so that the corresponding overload of addConflictRange() can make use of it.
76
read(ReadYourWritesTransaction * ryw,GetValueReq read,Iter * it)77 ACTOR template<class Iter> static Future< Optional<Value> > read( ReadYourWritesTransaction *ryw, GetValueReq read, Iter* it ) {
78 // This overload is required to provide postcondition: it->extractWriteMapIterator().segmentContains(read.key)
79
80 it->skip(read.key);
81 state bool dependent = it->is_dependent();
82 if( it->is_kv() ) {
83 const KeyValueRef* result = it->kv(ryw->arena);
84 if (result != nullptr) {
85 return result->value;
86 } else {
87 return Optional<Value>();
88 }
89 } else if( it->is_empty_range() ) {
90 return Optional<Value>();
91 } else {
92 Optional<Value> res = wait( ryw->tr.get( read.key, true ) );
93 KeyRef k( ryw->arena, read.key );
94
95 if( res.present() ) {
96 if( ryw->cache.insert( k, res.get() ) )
97 ryw->arena.dependsOn(res.get().arena());
98 if( !dependent )
99 return res;
100 } else {
101 ryw->cache.insert( k, Optional<ValueRef>() );
102 if( !dependent )
103 return Optional<Value>();
104 }
105
106 //There was a dependent write at the key, so we need to lookup the iterator again
107 it->skip(k);
108
109 ASSERT( it->is_kv() );
110 const KeyValueRef* result = it->kv(ryw->arena);
111 if (result != nullptr) {
112 return result->value;
113 } else {
114 return Optional<Value>();
115 }
116 }
117 }
118
read(ReadYourWritesTransaction * ryw,GetKeyReq read,Iter * it)119 ACTOR template<class Iter> static Future< Key > read( ReadYourWritesTransaction* ryw, GetKeyReq read, Iter* it ) {
120 if( read.key.offset > 0 ) {
121 Standalone<RangeResultRef> result = wait( getRangeValue( ryw, read.key, firstGreaterOrEqual(ryw->getMaxReadKey()), GetRangeLimits(1), it ) );
122 if( result.readToBegin )
123 return allKeys.begin;
124 if( result.readThroughEnd || !result.size() )
125 return ryw->getMaxReadKey();
126 return result[0].key;
127 } else {
128 read.key.offset++;
129 Standalone<RangeResultRef> result = wait( getRangeValueBack( ryw, firstGreaterOrEqual(allKeys.begin), read.key, GetRangeLimits(1), it ) );
130 if( result.readThroughEnd )
131 return ryw->getMaxReadKey();
132 if( result.readToBegin || !result.size() )
133 return allKeys.begin;
134 return result[0].key;
135 }
136 };
137
read(ReadYourWritesTransaction * ryw,GetRangeReq<false> read,Iter * it)138 template <class Iter> static Future< Standalone<RangeResultRef> > read( ReadYourWritesTransaction* ryw, GetRangeReq<false> read, Iter* it ) {
139 return getRangeValue( ryw, read.begin, read.end, read.limits, it );
140 };
141
read(ReadYourWritesTransaction * ryw,GetRangeReq<true> read,Iter * it)142 template <class Iter> static Future< Standalone<RangeResultRef> > read( ReadYourWritesTransaction* ryw, GetRangeReq<true> read, Iter* it ) {
143 return getRangeValueBack( ryw, read.begin, read.end, read.limits, it );
144 };
145
146 // readThrough() performs a read in the RYW disabled case, passing it on relatively directly to the underlying transaction.
147 // Responsible for clipping results to the non-system keyspace when appropriate, since NativeAPI doesn't do that.
148
readThrough(ReadYourWritesTransaction * ryw,GetValueReq read,bool snapshot)149 static Future<Optional<Value>> readThrough( ReadYourWritesTransaction *ryw, GetValueReq read, bool snapshot ) {
150 return ryw->tr.get( read.key, snapshot );
151 }
152
readThrough(ReadYourWritesTransaction * ryw,GetKeyReq read,bool snapshot)153 ACTOR static Future<Key> readThrough( ReadYourWritesTransaction *ryw, GetKeyReq read, bool snapshot ) {
154 Key key = wait( ryw->tr.getKey( read.key, snapshot ) );
155 if (ryw->getMaxReadKey() < key) return ryw->getMaxReadKey(); // Filter out results in the system keys if they are not accessible
156 return key;
157 }
158
readThrough(ReadYourWritesTransaction * ryw,GetRangeReq<Reverse> read,bool snapshot)159 ACTOR template <bool Reverse> static Future<Standalone<RangeResultRef>> readThrough( ReadYourWritesTransaction *ryw, GetRangeReq<Reverse> read, bool snapshot ) {
160 if(Reverse && read.end.offset > 1) {
161 // FIXME: Optimistically assume that this will not run into the system keys, and only reissue if the result actually does.
162 Key key = wait( ryw->tr.getKey(read.end, snapshot) );
163 if(key > ryw->getMaxReadKey())
164 read.end = firstGreaterOrEqual(ryw->getMaxReadKey());
165 else
166 read.end = KeySelector(firstGreaterOrEqual(key), key.arena());
167 }
168
169 Standalone<RangeResultRef> v = wait( ryw->tr.getRange(read.begin, read.end, read.limits, snapshot, Reverse) );
170 KeyRef maxKey = ryw->getMaxReadKey();
171 if(v.size() > 0) {
172 if(!Reverse && v[v.size()-1].key >= maxKey) {
173 state Standalone<RangeResultRef> _v = v;
174 int i = _v.size() - 2;
175 for(; i >= 0 && _v[i].key >= maxKey; --i) { }
176 return Standalone<RangeResultRef>(RangeResultRef( VectorRef<KeyValueRef>(&_v[0], i+1), false ), _v.arena());
177 }
178 }
179
180 return v;
181 }
182
183 // addConflictRange(ryw,read,result) is called after a serializable read and is responsible for adding the relevant conflict range
184
addConflictRange(ReadYourWritesTransaction * ryw,GetValueReq read,WriteMap::iterator & it,Optional<Value> result)185 static void addConflictRange( ReadYourWritesTransaction* ryw, GetValueReq read, WriteMap::iterator& it, Optional<Value> result ) {
186 // it will already point to the right segment (see the calling code in read()), so we don't need to skip
187 // read.key will be copied into ryw->arena inside of updateConflictMap if it is being added
188 ryw->updateConflictMap(read.key, it);
189 }
190
addConflictRange(ReadYourWritesTransaction * ryw,GetKeyReq read,WriteMap::iterator & it,Key result)191 static void addConflictRange( ReadYourWritesTransaction* ryw, GetKeyReq read, WriteMap::iterator& it, Key result ) {
192 KeyRangeRef readRange;
193 if( read.key.offset <= 0 )
194 readRange = KeyRangeRef( KeyRef( ryw->arena, result ), read.key.orEqual ? keyAfter( read.key.getKey(), ryw->arena ) : KeyRef( ryw->arena, read.key.getKey() ) );
195 else
196 readRange = KeyRangeRef( read.key.orEqual ? keyAfter( read.key.getKey(), ryw->arena ) : KeyRef( ryw->arena, read.key.getKey() ), keyAfter( result, ryw->arena ) );
197
198 it.skip( readRange.begin );
199 ryw->updateConflictMap(readRange, it);
200 }
201
addConflictRange(ReadYourWritesTransaction * ryw,GetRangeReq<false> read,WriteMap::iterator & it,Standalone<RangeResultRef> const & result)202 static void addConflictRange( ReadYourWritesTransaction* ryw, GetRangeReq<false> read, WriteMap::iterator &it, Standalone<RangeResultRef> const& result ) {
203 KeyRef rangeBegin, rangeEnd;
204 bool endInArena = false;
205
206 if( read.begin.getKey() < read.end.getKey() ) {
207 rangeBegin = read.begin.getKey();
208 rangeEnd = read.end.offset > 0 && result.more ? read.begin.getKey() : read.end.getKey();
209 }
210 else {
211 rangeBegin = read.end.getKey();
212 rangeEnd = read.begin.getKey();
213 }
214
215 if( result.readToBegin && read.begin.offset <= 0 ) rangeBegin = allKeys.begin;
216 if( result.readThroughEnd && read.end.offset > 0 ) rangeEnd = ryw->getMaxReadKey();
217
218 if ( result.size() ) {
219 if( read.begin.offset <= 0 ) rangeBegin = std::min( rangeBegin, result[0].key );
220 if( rangeEnd <= result.end()[-1].key ) {
221 rangeEnd = keyAfter( result.end()[-1].key, ryw->arena );
222 endInArena = true;
223 }
224 }
225
226 KeyRangeRef readRange = KeyRangeRef( KeyRef( ryw->arena, rangeBegin ), endInArena ? rangeEnd : KeyRef( ryw->arena, rangeEnd ) );
227 it.skip( readRange.begin );
228 ryw->updateConflictMap(readRange, it);
229 }
230
addConflictRange(ReadYourWritesTransaction * ryw,GetRangeReq<true> read,WriteMap::iterator & it,Standalone<RangeResultRef> const & result)231 static void addConflictRange( ReadYourWritesTransaction* ryw, GetRangeReq<true> read, WriteMap::iterator& it, Standalone<RangeResultRef> const& result ) {
232 KeyRef rangeBegin, rangeEnd;
233 bool endInArena = false;
234
235 if( read.begin.getKey() < read.end.getKey() ) {
236 rangeBegin = read.begin.offset <= 0 && result.more ? read.end.getKey() : read.begin.getKey();
237 rangeEnd = read.end.getKey();
238 }
239 else {
240 rangeBegin = read.end.getKey();
241 rangeEnd = read.begin.getKey();
242 }
243
244 if( result.readToBegin && read.begin.offset <= 0 ) rangeBegin = allKeys.begin;
245 if( result.readThroughEnd && read.end.offset > 0 ) rangeEnd = ryw->getMaxReadKey();
246
247 if ( result.size() ) {
248 rangeBegin = std::min( rangeBegin, result.end()[-1].key );
249 if( read.end.offset > 0 && rangeEnd <= result[0].key ) {
250 rangeEnd = keyAfter( result[0].key, ryw->arena );
251 endInArena = true;
252 }
253 }
254
255 KeyRangeRef readRange = KeyRangeRef( KeyRef( ryw->arena, rangeBegin ), endInArena ? rangeEnd : KeyRef( ryw->arena, rangeEnd ) );
256 it.skip( readRange.begin );
257 ryw->updateConflictMap(readRange, it);
258 }
259
readWithConflictRangeThrough(ReadYourWritesTransaction * ryw,Req req,bool snapshot)260 ACTOR template <class Req> static Future<typename Req::Result> readWithConflictRangeThrough( ReadYourWritesTransaction* ryw, Req req, bool snapshot ) {
261 choose {
262 when (typename Req::Result result = wait( readThrough( ryw, req, snapshot ) )) {
263 return result;
264 }
265 when (wait(ryw->resetPromise.getFuture())) { throw internal_error(); }
266 }
267 }
readWithConflictRangeSnapshot(ReadYourWritesTransaction * ryw,Req req)268 ACTOR template <class Req> static Future<typename Req::Result> readWithConflictRangeSnapshot( ReadYourWritesTransaction* ryw, Req req ) {
269 state SnapshotCache::iterator it(&ryw->cache, &ryw->writes);
270 choose {
271 when (typename Req::Result result = wait( read( ryw, req, &it ) )) {
272 return result;
273 }
274 when (wait(ryw->resetPromise.getFuture())) { throw internal_error(); }
275 }
276 }
readWithConflictRangeRYW(ReadYourWritesTransaction * ryw,Req req,bool snapshot)277 ACTOR template <class Req> static Future<typename Req::Result> readWithConflictRangeRYW( ReadYourWritesTransaction* ryw, Req req, bool snapshot ) {
278 state RYWIterator it( &ryw->cache, &ryw->writes );
279 choose {
280 when (typename Req::Result result = wait( read( ryw, req, &it ) )) {
281 // Some overloads of addConflictRange() require it to point to the "right" key and others don't. The corresponding overloads of read() have to provide that guarantee!
282 if(!snapshot)
283 addConflictRange( ryw, req, it.extractWriteMapIterator(), result );
284 return result;
285 }
286 when (wait(ryw->resetPromise.getFuture())) { throw internal_error(); }
287 }
288 }
readWithConflictRange(ReadYourWritesTransaction * ryw,Req const & req,bool snapshot)289 template <class Req> static inline Future<typename Req::Result> readWithConflictRange( ReadYourWritesTransaction* ryw, Req const& req, bool snapshot ) {
290 if (ryw->options.readYourWritesDisabled) {
291 return readWithConflictRangeThrough(ryw, req, snapshot);
292 } else if (snapshot && ryw->options.snapshotRywEnabled <= 0) {
293 return readWithConflictRangeSnapshot(ryw, req);
294 }
295 return readWithConflictRangeRYW(ryw, req, snapshot);
296 }
297
resolveKeySelectorFromCache(KeySelector & key,Iter & it,KeyRef const & maxKey,bool * readToBegin,bool * readThroughEnd,int * actualOffset)298 template<class Iter> static void resolveKeySelectorFromCache( KeySelector& key, Iter& it, KeyRef const& maxKey, bool* readToBegin, bool* readThroughEnd, int* actualOffset ) {
299 // If the key indicated by `key` can be determined without reading unknown data from the snapshot, then it.kv().key is the resolved key.
300 // If the indicated key is determined to be "off the beginning or end" of the database, it points to the first or last segment in the DB,
301 // and key is an equivalent key selector relative to the beginning or end of the database.
302 // Otherwise it points to an unknown segment, and key is an equivalent key selector whose base key is in or adjoining the segment.
303
304 key.removeOrEqual(key.arena());
305
306 bool alreadyExhausted = key.offset == 1;
307
308 it.skip( key.getKey() ); // TODO: or precondition?
309
310 if ( key.offset <= 0 && it.beginKey() == key.getKey() && key.getKey() != allKeys.begin )
311 --it;
312
313 ExtStringRef keykey = key.getKey();
314 bool keyNeedsCopy = false;
315
316 // Invariant: it.beginKey() <= keykey && keykey <= it.endKey() && (key.isBackward() ? it.beginKey() != keykey : it.endKey() != keykey)
317 // Maintaining this invariant, we transform the key selector toward firstGreaterOrEqual form until we reach an unknown range or the result
318 while (key.offset > 1 && !it.is_unreadable() && !it.is_unknown_range() && it.endKey() < maxKey ) {
319 if (it.is_kv())
320 --key.offset;
321 ++it;
322 keykey = it.beginKey();
323 keyNeedsCopy = true;
324 }
325 while (key.offset < 1 && !it.is_unreadable() && !it.is_unknown_range() && it.beginKey() != allKeys.begin) {
326 if (it.is_kv()) {
327 ++key.offset;
328 if (key.offset == 1) {
329 keykey = it.beginKey();
330 keyNeedsCopy = true;
331 break;
332 }
333 }
334 --it;
335 keykey = it.endKey();
336 keyNeedsCopy = true;
337 }
338
339 if(!alreadyExhausted) {
340 *actualOffset = key.offset;
341 }
342
343 if (!it.is_unreadable() && !it.is_unknown_range() && key.offset < 1) {
344 *readToBegin = true;
345 key.setKey(allKeys.begin);
346 key.offset = 1;
347 return;
348 }
349
350 if (!it.is_unreadable() && !it.is_unknown_range() && key.offset > 1) {
351 *readThroughEnd = true;
352 key.setKey(maxKey); // maxKey is a KeyRef, but points to a LiteralStringRef. TODO: how can we ASSERT this?
353 key.offset = 1;
354 return;
355 }
356
357 while (!it.is_unreadable() && it.is_empty_range() && it.endKey() < maxKey) {
358 ++it;
359 keykey = it.beginKey();
360 keyNeedsCopy = true;
361 }
362
363 if(keyNeedsCopy) {
364 key.setKey(keykey.toArena(key.arena()));
365 }
366 }
367
getKnownKeyRange(RangeResultRef data,KeySelector begin,KeySelector end,Arena & arena)368 static KeyRangeRef getKnownKeyRange( RangeResultRef data, KeySelector begin, KeySelector end, Arena& arena ) {
369 StringRef beginKey = begin.offset<=1 ? begin.getKey() : allKeys.end;
370 ExtStringRef endKey = !data.more && end.offset>=1 ? end.getKey() : allKeys.begin;
371
372 if (data.readToBegin) beginKey = allKeys.begin;
373 if (data.readThroughEnd) endKey = allKeys.end;
374
375 if( data.size() ) {
376 beginKey = std::min( beginKey, data[0].key );
377 if( data.readThrough.present() ) {
378 endKey = std::max<ExtStringRef>( endKey, data.readThrough.get() );
379 }
380 else {
381 endKey = !data.more && data.end()[-1].key < endKey ? endKey : ExtStringRef( data.end()[-1].key, 1 );
382 }
383 }
384 if (beginKey >= endKey) return KeyRangeRef();
385
386
387 return KeyRangeRef( StringRef(arena, beginKey), endKey.toArena(arena));
388 }
389
390 // Pre: it points to an unknown range
391 // Increments it to point to the unknown range just before the next nontrivial known range (skips over trivial known ranges), but not more than iterationLimit ranges away
skipUncached(Iter & it,Iter const & end,int iterationLimit)392 template<class Iter> static int skipUncached( Iter& it, Iter const& end, int iterationLimit ) {
393 ExtStringRef b = it.beginKey();
394 ExtStringRef e = it.endKey();
395 int singleEmpty = 0;
396
397 ASSERT( !it.is_unreadable() && it.is_unknown_range() );
398
399 // b is the beginning of the most recent contiguous *empty* range
400 // e is it.endKey()
401 while( it != end && --iterationLimit>=0 ) {
402 if (it.is_unreadable() || it.is_empty_range()) {
403 if (it.is_unreadable() || !e.isKeyAfter(b)) { //Assumes no degenerate ranges
404 while (it.is_unreadable() || !it.is_unknown_range())
405 --it;
406 return singleEmpty;
407 }
408 singleEmpty++;
409 } else
410 b = e;
411 ++it;
412 e = it.endKey();
413 }
414 while (it.is_unreadable() || !it.is_unknown_range())
415 --it;
416 return singleEmpty;
417 }
418
419 // Pre: it points to an unknown range
420 // Returns the number of following empty single-key known ranges between it and the next nontrivial known range, but no more than maxClears
421 // Leaves `it` in an indeterminate state
countUncached(Iter && it,KeyRef maxKey,int maxClears)422 template<class Iter> static int countUncached( Iter&& it, KeyRef maxKey, int maxClears ) {
423 if (maxClears<=0) return 0;
424
425 ExtStringRef b = it.beginKey();
426 ExtStringRef e = it.endKey();
427 int singleEmpty = 0;
428
429 while( e < maxKey ) {
430 if (it.is_unreadable() || it.is_empty_range()) {
431 if (it.is_unreadable() || !e.isKeyAfter(b)) { //Assumes no degenerate ranges
432 return singleEmpty;
433 }
434 singleEmpty++;
435 if( singleEmpty >= maxClears )
436 return maxClears;
437 } else
438 b = e;
439 ++it;
440 e = it.endKey();
441 }
442 return singleEmpty;
443 }
444
setRequestLimits(GetRangeLimits & requestLimit,int64_t additionalRows,int offset,int requestCount)445 static void setRequestLimits(GetRangeLimits &requestLimit, int64_t additionalRows, int offset, int requestCount) {
446 requestLimit.minRows = (int)std::min(std::max(1 + additionalRows, (int64_t)offset), (int64_t)std::numeric_limits<int>::max());
447 if(requestLimit.hasRowLimit()) {
448 requestLimit.rows = (int)std::min(std::max(std::max(1,requestLimit.rows) + additionalRows, (int64_t)offset), (int64_t)std::numeric_limits<int>::max());
449 }
450
451 // Calculating request byte limit
452 if(requestLimit.bytes==0) {
453 requestLimit.bytes = CLIENT_KNOBS->BYTE_LIMIT_UNLIMITED;
454 if(!requestLimit.hasRowLimit()) {
455 requestLimit.rows = (int)std::min(std::max(std::max(1,requestLimit.rows) + additionalRows, (int64_t)offset), (int64_t)std::numeric_limits<int>::max());
456 }
457 }
458 else if(requestLimit.hasByteLimit()) {
459 requestLimit.bytes = std::min(int64_t(requestLimit.bytes)<<std::min(requestCount, 20), (int64_t)CLIENT_KNOBS->REPLY_BYTE_LIMIT);
460 }
461 }
462
463 //TODO: read to begin, read through end flags for result
getRangeValue(ReadYourWritesTransaction * ryw,KeySelector begin,KeySelector end,GetRangeLimits limits,Iter * pit)464 ACTOR template<class Iter> static Future< Standalone<RangeResultRef> > getRangeValue( ReadYourWritesTransaction *ryw, KeySelector begin, KeySelector end, GetRangeLimits limits, Iter* pit ) {
465 state Iter& it(*pit);
466 state Iter itEnd(*pit);
467 state Standalone<RangeResultRef> result;
468 state int64_t additionalRows = 0;
469 state int itemsPastEnd = 0;
470 state int requestCount = 0;
471 state bool readToBegin = false;
472 state bool readThroughEnd = false;
473 state int actualBeginOffset = begin.offset;
474 state int actualEndOffset = end.offset;
475 //state UID randomID = g_nondeterministic_random->randomUniqueID();
476
477 resolveKeySelectorFromCache( begin, it, ryw->getMaxReadKey(), &readToBegin, &readThroughEnd, &actualBeginOffset );
478 resolveKeySelectorFromCache( end, itEnd, ryw->getMaxReadKey(), &readToBegin, &readThroughEnd, &actualEndOffset );
479
480 if( actualBeginOffset >= actualEndOffset && begin.getKey() >= end.getKey() ) {
481 return RangeResultRef(false, false);
482 }
483 else if( ( begin.isFirstGreaterOrEqual() && begin.getKey() == ryw->getMaxReadKey() )
484 || ( end.isFirstGreaterOrEqual() && end.getKey() == allKeys.begin ) )
485 {
486 return RangeResultRef(readToBegin, readThroughEnd);
487 }
488
489 if( !end.isFirstGreaterOrEqual() && begin.getKey() > end.getKey() ) {
490 Key resolvedEnd = wait( read( ryw, GetKeyReq(end), pit ) );
491 if( resolvedEnd == allKeys.begin )
492 readToBegin = true;
493 if( resolvedEnd == ryw->getMaxReadKey() )
494 readThroughEnd = true;
495
496 if( begin.getKey() >= resolvedEnd && !begin.isBackward() ) {
497 return RangeResultRef(false, false);
498 }
499 else if( resolvedEnd == allKeys.begin ) {
500 return RangeResultRef(readToBegin, readThroughEnd);
501 }
502
503 resolveKeySelectorFromCache( begin, it, ryw->getMaxReadKey(), &readToBegin, &readThroughEnd, &actualBeginOffset );
504 resolveKeySelectorFromCache( end, itEnd, ryw->getMaxReadKey(), &readToBegin, &readThroughEnd, &actualEndOffset );
505 }
506
507 //TraceEvent("RYWSelectorsStartForward", randomID).detail("ByteLimit", limits.bytes).detail("RowLimit", limits.rows);
508
509 loop {
510 /*TraceEvent("RYWSelectors", randomID).detail("Begin", begin.toString())
511 .detail("End", end.toString())
512 .detail("Reached", limits.isReached())
513 .detail("ItemsPastEnd", itemsPastEnd)
514 .detail("EndOffset", -end.offset)
515 .detail("ItBegin", it.beginKey())
516 .detail("ItEnd", itEnd.beginKey())
517 .detail("Unknown", it.is_unknown_range())
518 .detail("Requests", requestCount);*/
519
520 if( !result.size() && actualBeginOffset >= actualEndOffset && begin.getKey() >= end.getKey() ) {
521 return RangeResultRef(false, false);
522 }
523
524 if( end.offset <= 1 && end.getKey() == allKeys.begin ) {
525 return RangeResultRef(readToBegin, readThroughEnd);
526 }
527
528 if( ( begin.offset >= end.offset && begin.getKey() >= end.getKey() ) ||
529 ( begin.offset >= 1 && begin.getKey() >= ryw->getMaxReadKey() ) ) {
530 if( end.isFirstGreaterOrEqual() ) break;
531 if( !result.size() ) break;
532 Key resolvedEnd = wait( read( ryw, GetKeyReq(end), pit ) ); //do not worry about iterator invalidation, because we are breaking for the loop
533 if( resolvedEnd == allKeys.begin )
534 readToBegin = true;
535 if( resolvedEnd == ryw->getMaxReadKey() )
536 readThroughEnd = true;
537 end = firstGreaterOrEqual( resolvedEnd );
538 break;
539 }
540
541 if( !it.is_unreadable() && !it.is_unknown_range() && it.beginKey() > itEnd.beginKey() ) {
542 if( end.isFirstGreaterOrEqual() ) break;
543 return RangeResultRef(readToBegin, readThroughEnd);
544 }
545
546 if( limits.isReached() && itemsPastEnd >= 1-end.offset ) break;
547
548 if (it == itEnd && ((!it.is_unreadable() && !it.is_unknown_range()) || (begin.offset > 0 && end.isFirstGreaterOrEqual() && end.getKey() == it.beginKey()))) break;
549
550 if (it.is_unknown_range()) {
551 if( limits.hasByteLimit() && result.size() && itemsPastEnd >= 1-end.offset ) {
552 result.more = true;
553 break;
554 }
555
556 Iter ucEnd(it);
557 int singleClears = 0;
558 int clearLimit = requestCount ? 1 << std::min(requestCount, 20) : 0;
559 if( it.beginKey() < itEnd.beginKey() )
560 singleClears = std::min(skipUncached(ucEnd, itEnd, BUGGIFY ? 0 : clearLimit + 100), clearLimit);
561
562 state KeySelector read_end;
563 if ( ucEnd!=itEnd ) {
564 Key k = ucEnd.endKey().toStandaloneStringRef();
565 read_end = KeySelector(firstGreaterOrEqual(k), k.arena());
566 if( end.offset < 1 ) additionalRows += 1 - end.offset; // extra for items past end
567 } else if( end.offset < 1 ) {
568 read_end = KeySelector(firstGreaterOrEqual(end.getKey()), end.arena());
569 additionalRows += 1 - end.offset;
570 } else {
571 read_end = end;
572 if( end.offset > 1 ) {
573 singleClears += countUncached( std::move(ucEnd), ryw->getMaxReadKey(), clearLimit-singleClears);
574 read_end.offset += singleClears;
575 }
576 }
577
578 additionalRows += singleClears;
579
580 state KeySelector read_begin;
581 if (begin.isFirstGreaterOrEqual()) {
582 Key k = it.beginKey() > begin.getKey() ? it.beginKey().toStandaloneStringRef() : Key(begin.getKey(), begin.arena());
583 begin = KeySelector(firstGreaterOrEqual(k), k.arena());
584 read_begin = begin;
585 } else if( begin.offset > 1 ) {
586 read_begin = KeySelector(firstGreaterOrEqual(begin.getKey()), begin.arena());
587 additionalRows += begin.offset - 1;
588 } else {
589 read_begin = begin;
590 ucEnd = it;
591
592 singleClears = countUncachedBack(std::move(ucEnd), clearLimit);
593 read_begin.offset -= singleClears;
594 additionalRows += singleClears;
595 }
596
597 if(read_end.getKey() < read_begin.getKey()) {
598 read_end.setKey(read_begin.getKey());
599 read_end.arena().dependsOn(read_begin.arena());
600 }
601
602 state GetRangeLimits requestLimit = limits;
603 setRequestLimits(requestLimit, additionalRows, 2-read_begin.offset, requestCount);
604 requestCount++;
605
606 ASSERT( !requestLimit.hasRowLimit() || requestLimit.rows > 0 );
607 ASSERT( requestLimit.hasRowLimit() || requestLimit.hasByteLimit() );
608
609 //TraceEvent("RYWIssuing", randomID).detail("Begin", read_begin.toString()).detail("End", read_end.toString()).detail("Bytes", requestLimit.bytes).detail("Rows", requestLimit.rows).detail("Limits", limits.bytes).detail("Reached", limits.isReached()).detail("RequestCount", requestCount).detail("SingleClears", singleClears).detail("UcEnd", ucEnd.beginKey()).detail("MinRows", requestLimit.minRows);
610
611 additionalRows = 0;
612 Standalone<RangeResultRef> snapshot_read = wait( ryw->tr.getRange( read_begin, read_end, requestLimit, true, false ) );
613 KeyRangeRef range = getKnownKeyRange( snapshot_read, read_begin, read_end, ryw->arena );
614
615 //TraceEvent("RYWCacheInsert", randomID).detail("Range", range).detail("ExpectedSize", snapshot_read.expectedSize()).detail("Rows", snapshot_read.size()).detail("Results", snapshot_read).detail("More", snapshot_read.more).detail("ReadToBegin", snapshot_read.readToBegin).detail("ReadThroughEnd", snapshot_read.readThroughEnd).detail("ReadThrough", snapshot_read.readThrough);
616
617 if( ryw->cache.insert( range, snapshot_read ) )
618 ryw->arena.dependsOn(snapshot_read.arena());
619
620 // TODO: Is there a more efficient way to deal with invalidation?
621 resolveKeySelectorFromCache( begin, it, ryw->getMaxReadKey(), &readToBegin, &readThroughEnd, &actualBeginOffset );
622 resolveKeySelectorFromCache( end, itEnd, ryw->getMaxReadKey(), &readToBegin, &readThroughEnd, &actualEndOffset );
623 } else if (it.is_kv()) {
624 KeyValueRef const* start = it.kv(ryw->arena);
625 if (start == nullptr) {
626 ++it;
627 continue;
628 }
629 it.skipContiguous( end.isFirstGreaterOrEqual() ? end.getKey() : ryw->getMaxReadKey() ); //not technically correct since this would add end.getKey(), but that is protected above
630
631 int maxCount = it.kv(ryw->arena) - start + 1;
632 int count = 0;
633 for(; count < maxCount && !limits.isReached(); count++ ) {
634 limits.decrement(start[count]);
635 }
636
637 itemsPastEnd += maxCount - count;
638
639 //TraceEvent("RYWaddKV", randomID).detail("Key", it.beginKey()).detail("Count", count).detail("MaxCount", maxCount).detail("ItemsPastEnd", itemsPastEnd);
640 if( count ) result.append( result.arena(), start, count );
641 ++it;
642 } else
643 ++it;
644 }
645
646 result.more = result.more || limits.isReached();
647
648 if( end.isFirstGreaterOrEqual() ) {
649 int keepItems = std::lower_bound( result.begin(), result.end(), end.getKey(), KeyValueRef::OrderByKey() ) - result.begin();
650 if( keepItems < result.size() )
651 result.more = false;
652 result.resize( result.arena(), keepItems );
653 }
654
655 result.readToBegin = readToBegin;
656 result.readThroughEnd = !result.more && readThroughEnd;
657 result.arena().dependsOn( ryw->arena );
658
659 return result;
660 }
661
getKnownKeyRangeBack(RangeResultRef data,KeySelector begin,KeySelector end,Arena & arena)662 static KeyRangeRef getKnownKeyRangeBack( RangeResultRef data, KeySelector begin, KeySelector end, Arena& arena ) {
663 StringRef beginKey = !data.more && begin.offset<=1 ? begin.getKey() : allKeys.end;
664 ExtStringRef endKey = end.offset>=1 ? end.getKey() : allKeys.begin;
665
666 if (data.readToBegin) beginKey = allKeys.begin;
667 if (data.readThroughEnd) endKey = allKeys.end;
668
669 if( data.size() ) {
670 if( data.readThrough.present() ) {
671 beginKey = std::min( data.readThrough.get(), beginKey );
672 }
673 else {
674 beginKey = !data.more && data.end()[-1].key > beginKey ? beginKey : data.end()[-1].key;
675 }
676
677 endKey = data[0].key < endKey ? endKey : ExtStringRef( data[0].key, 1 );
678 }
679 if (beginKey >= endKey) return KeyRangeRef();
680
681 return KeyRangeRef( StringRef(arena, beginKey), endKey.toArena(arena));
682 }
683
684 // Pre: it points to an unknown range
685 // Decrements it to point to the unknown range just before the last nontrivial known range (skips over trivial known ranges), but not more than iterationLimit ranges away
686 // Returns the number of single-key empty ranges skipped
skipUncachedBack(Iter & it,Iter const & end,int iterationLimit)687 template<class Iter> static int skipUncachedBack( Iter& it, Iter const& end, int iterationLimit ) {
688 ExtStringRef b = it.beginKey();
689 ExtStringRef e = it.endKey();
690 int singleEmpty = 0;
691 ASSERT(!it.is_unreadable() && it.is_unknown_range());
692
693 // b == it.beginKey()
694 // e is the end of the contiguous empty range containing it
695 while( it != end && --iterationLimit>=0) {
696 if (it.is_unreadable() || it.is_empty_range()) {
697 if (it.is_unreadable() || !e.isKeyAfter(b)) { //Assumes no degenerate ranges
698 while (it.is_unreadable() || !it.is_unknown_range())
699 ++it;
700 return singleEmpty;
701 }
702 singleEmpty++;
703 } else
704 e = b;
705 --it;
706 b = it.beginKey();
707 }
708 while (it.is_unreadable() || !it.is_unknown_range())
709 ++it;
710 return singleEmpty;
711 }
712
713 // Pre: it points to an unknown range
714 // Returns the number of preceding empty single-key known ranges between it and the previous nontrivial known range, but no more than maxClears
715 // Leaves it in an indeterminate state
countUncachedBack(Iter && it,int maxClears)716 template<class Iter> static int countUncachedBack( Iter&& it, int maxClears ) {
717 if (maxClears <= 0) return 0;
718 ExtStringRef b = it.beginKey();
719 ExtStringRef e = it.endKey();
720 int singleEmpty = 0;
721 while( b > allKeys.begin ) {
722 if (it.is_unreadable() || it.is_empty_range()) {
723 if (it.is_unreadable() || !e.isKeyAfter(b)) { //Assumes no degenerate ranges
724 return singleEmpty;
725 }
726 singleEmpty++;
727 if( singleEmpty >= maxClears )
728 return maxClears;
729 } else
730 e = b;
731 --it;
732 b = it.beginKey();
733 }
734 return singleEmpty;
735 }
736
// Reverse range read. Walks `it` backwards from the position resolved for `end` towards the position
// resolved for `begin` (tracked by itEnd), appending the key-value pairs it finds to `result` in
// descending key order. Whenever the walk lands on an unknown range, a snapshot read is issued on the
// underlying transaction (ryw->tr.getRange), the rows are inserted into ryw->cache, and both selectors
// are re-resolved before the walk continues.
//
// ryw    - the owning transaction; supplies the cache, the arena, the underlying transaction and the max read key
// begin  - selector for the low end of the range
// end    - selector for the high end of the range
// limits - row/byte limits; decremented as rows are added to the result
// pit    - iterator owned by the caller; it is repositioned by this actor (aliased as `it`)
//
// Returns the collected rows with `more`, `readToBegin` and `readThroughEnd` filled in. Several early exits
// return an empty RangeResultRef instead.
ACTOR template<class Iter> static Future< Standalone<RangeResultRef> > getRangeValueBack( ReadYourWritesTransaction *ryw, KeySelector begin, KeySelector end, GetRangeLimits limits, Iter* pit ) {
    // `it` is walked backwards starting from the resolved end selector; itEnd is resolved from the begin
    // selector and marks where the walk stops.
    state Iter& it(*pit);
    state Iter itEnd(*pit);
    state Standalone<RangeResultRef> result;
    state int64_t additionalRows = 0;
    state int itemsPastBegin = 0;
    state int requestCount = 0;
    state bool readToBegin = false;
    state bool readThroughEnd = false;
    state int actualBeginOffset = begin.offset;
    state int actualEndOffset = end.offset;
    //state UID randomID = g_nondeterministic_random->randomUniqueID();

    resolveKeySelectorFromCache( end, it, ryw->getMaxReadKey(), &readToBegin, &readThroughEnd, &actualEndOffset );
    resolveKeySelectorFromCache( begin, itEnd, ryw->getMaxReadKey(), &readToBegin, &readThroughEnd, &actualBeginOffset );

    // Nothing to return: the selectors describe an empty/inverted range, or one of them already sits at a
    // boundary of the readable keyspace.
    if( actualBeginOffset >= actualEndOffset && begin.getKey() >= end.getKey() ) {
        return RangeResultRef(false, false);
    }
    else if( ( begin.isFirstGreaterOrEqual() && begin.getKey() == ryw->getMaxReadKey() )
        || ( end.isFirstGreaterOrEqual() && end.getKey() == allKeys.begin ) )
    {
        return RangeResultRef(readToBegin, readThroughEnd);
    }

    // begin is still an unresolved selector whose key lies past end's key: resolve it with a real key
    // read before deciding whether the range is empty.
    if( !begin.isFirstGreaterOrEqual() && begin.getKey() > end.getKey() ) {
        Key resolvedBegin = wait( read( ryw, GetKeyReq(begin), pit ) );
        if( resolvedBegin == allKeys.begin )
            readToBegin = true;
        if( resolvedBegin == ryw->getMaxReadKey() )
            readThroughEnd = true;

        if( resolvedBegin >= end.getKey() && end.offset <= 1 ) {
            return RangeResultRef(false, false);
        }
        else if( resolvedBegin == ryw->getMaxReadKey() ) {
            return RangeResultRef(readToBegin, readThroughEnd);
        }

        // The read above went through pit, so both iterators are re-resolved.
        resolveKeySelectorFromCache( end, it, ryw->getMaxReadKey(), &readToBegin, &readThroughEnd, &actualEndOffset );
        resolveKeySelectorFromCache( begin, itEnd, ryw->getMaxReadKey(), &readToBegin, &readThroughEnd, &actualBeginOffset );
    }

    //TraceEvent("RYWSelectorsStartReverse", randomID).detail("ByteLimit", limits.bytes).detail("RowLimit", limits.rows);

    loop {
        /*TraceEvent("RYWSelectors", randomID).detail("Begin", begin.toString())
            .detail("End", end.toString())
            .detail("Reached", limits.isReached())
            .detail("ItemsPastBegin", itemsPastBegin)
            .detail("EndOffset", end.offset)
            .detail("ItBegin", it.beginKey())
            .detail("ItEnd", itEnd.beginKey())
            .detail("Unknown", it.is_unknown_range())
            .detail("Kv", it.is_kv())
            .detail("Requests", requestCount);*/

        if(!result.size() && actualBeginOffset >= actualEndOffset && begin.getKey() >= end.getKey()) {
            return RangeResultRef(false, false);
        }

        if( !begin.isBackward() && begin.getKey() >= ryw->getMaxReadKey() ) {
            return RangeResultRef(readToBegin, readThroughEnd);
        }

        // The selectors have met (or end has reached the start of the keyspace): finish up, resolving
        // begin to a concrete key first if rows were collected and it is still an unresolved selector.
        if( ( begin.offset >= end.offset && begin.getKey() >= end.getKey() ) ||
            ( end.offset <= 1 && end.getKey() == allKeys.begin ) ) {
            if( begin.isFirstGreaterOrEqual() ) break;
            if( !result.size() ) break;
            Key resolvedBegin = wait( read( ryw, GetKeyReq(begin), pit ) ); //do not worry about iterator invalidation, because we are breaking for the loop
            if( resolvedBegin == allKeys.begin )
                readToBegin = true;
            if( resolvedBegin == ryw->getMaxReadKey() )
                readThroughEnd = true;
            begin = firstGreaterOrEqual( resolvedBegin );
            break;
        }

        // The walk has moved to a known, readable range that starts before itEnd.
        if (itemsPastBegin >= begin.offset - 1 && !it.is_unreadable() && !it.is_unknown_range() && it.beginKey() < itEnd.beginKey()) {
            if( begin.isFirstGreaterOrEqual() ) break;
            return RangeResultRef(readToBegin, readThroughEnd);
        }

        if( limits.isReached() && itemsPastBegin >= begin.offset-1 ) break;

        // end is exclusive: when it coincides with the start of the current range, step to the previous range.
        if( end.isFirstGreaterOrEqual() && end.getKey() == it.beginKey() ) {
            if( itemsPastBegin >= begin.offset-1 && it == itEnd) break;
            --it;
        }

        if (it.is_unknown_range()) {
            // With a byte limit and rows already collected, report `more` rather than issuing another read.
            if( limits.hasByteLimit() && result.size() && itemsPastBegin >= begin.offset-1 ) {
                result.more = true;
                break;
            }

            Iter ucEnd(it);
            int singleClears = 0;
            // No allowance on the first request; afterwards 2^requestCount, capped at 2^20.
            int clearLimit = requestCount ? 1 << std::min(requestCount, 20) : 0;
            if( it.beginKey() > itEnd.beginKey() )
                singleClears = std::min(skipUncachedBack(ucEnd, itEnd, BUGGIFY ? 0 : clearLimit+100), clearLimit);

            // Build the begin selector for the database read.
            state KeySelector read_begin;
            if ( ucEnd!=itEnd ) {
                Key k = ucEnd.beginKey().toStandaloneStringRef();
                read_begin = KeySelector(firstGreaterOrEqual(k), k.arena());
                if( begin.offset > 1 ) additionalRows += begin.offset - 1; // extra for items past end
            } else if( begin.offset > 1 ) {
                read_begin = KeySelector(firstGreaterOrEqual( begin.getKey() ), begin.arena());
                additionalRows += begin.offset - 1;
            } else {
                read_begin = begin;
                if( begin.offset < 1 ) {
                    singleClears += countUncachedBack(std::move(ucEnd), clearLimit-singleClears);
                    read_begin.offset -= singleClears;
                }
            }

            additionalRows += singleClears;

            // Build the end selector for the database read.
            state KeySelector read_end;
            if (end.isFirstGreaterOrEqual()) {
                Key k = it.endKey() < end.getKey() ? it.endKey().toStandaloneStringRef() : end.getKey();
                end = KeySelector(firstGreaterOrEqual(k), k.arena());
                read_end = end;
            } else if (end.offset < 1) {
                read_end = KeySelector(firstGreaterOrEqual(end.getKey()), end.arena());
                additionalRows += 1 - end.offset;
            } else {
                read_end = end;
                ucEnd = it;

                singleClears = countUncached(std::move(ucEnd), ryw->getMaxReadKey(), clearLimit);
                read_end.offset += singleClears;
                additionalRows += singleClears;
            }

            // Keep the request well-formed: the begin key must not lie past the end key.
            if(read_begin.getKey() > read_end.getKey()) {
                read_begin.setKey(read_end.getKey());
                read_begin.arena().dependsOn(read_end.arena());
            }

            state GetRangeLimits requestLimit = limits;
            setRequestLimits(requestLimit, additionalRows, read_end.offset, requestCount);
            requestCount++;

            ASSERT( !requestLimit.hasRowLimit() || requestLimit.rows > 0 );
            ASSERT( requestLimit.hasRowLimit() || requestLimit.hasByteLimit() );

            //TraceEvent("RYWIssuing", randomID).detail("Begin", read_begin.toString()).detail("End", read_end.toString()).detail("Bytes", requestLimit.bytes).detail("Rows", requestLimit.rows).detail("Limits", limits.bytes).detail("Reached", limits.isReached()).detail("RequestCount", requestCount).detail("SingleClears", singleClears).detail("UcEnd", ucEnd.beginKey()).detail("MinRows", requestLimit.minRows);

            additionalRows = 0;
            // NOTE(review): the two trailing `true` arguments are presumably snapshot and reverse - confirm against Transaction::getRange.
            Standalone<RangeResultRef> snapshot_read = wait( ryw->tr.getRange( read_begin, read_end, requestLimit, true, true ) );
            KeyRangeRef range = getKnownKeyRangeBack( snapshot_read, read_begin, read_end, ryw->arena );

            //TraceEvent("RYWCacheInsert", randomID).detail("Range", range).detail("ExpectedSize", snapshot_read.expectedSize()).detail("Rows", snapshot_read.size()).detail("Results", snapshot_read).detail("More", snapshot_read.more).detail("ReadToBegin", snapshot_read.readToBegin).detail("ReadThroughEnd", snapshot_read.readThroughEnd).detail("ReadThrough", snapshot_read.readThrough);

            // Flip the row order before handing the rows to the cache.
            // NOTE(review): presumably the cache expects ascending key order - confirm.
            RangeResultRef reversed;
            reversed.resize(ryw->arena, snapshot_read.size());
            for( int i = 0; i < snapshot_read.size(); i++ ) {
                reversed[snapshot_read.size()-i-1] = snapshot_read[i];
            }

            // The rows still point into snapshot_read's memory, so keep that arena alive if the cache kept them.
            if( ryw->cache.insert( range, reversed ) )
                ryw->arena.dependsOn(snapshot_read.arena());

            // TODO: Is there a more efficient way to deal with invalidation?
            resolveKeySelectorFromCache( end, it, ryw->getMaxReadKey(), &readToBegin, &readThroughEnd, &actualEndOffset );
            resolveKeySelectorFromCache( begin, itEnd, ryw->getMaxReadKey(), &readToBegin, &readThroughEnd, &actualBeginOffset );
        } else {
            // Note: this local `end` shadows the `end` selector parameter for the rest of this branch.
            KeyValueRef const* end = it.is_kv() ? it.kv(ryw->arena) : nullptr;
            if (end != nullptr) {
                // Move back to the first kv of the contiguous run; start..end is then indexed as one array.
                it.skipContiguousBack( begin.isFirstGreaterOrEqual() ? begin.getKey() : allKeys.begin );
                KeyValueRef const* start = it.kv(ryw->arena);
                ASSERT(start != nullptr);

                int maxCount = end - start + 1;
                int count = 0;
                // Charge rows against the limits from the highest key downwards.
                for(; count < maxCount && !limits.isReached(); count++ ) {
                    limits.decrement(start[maxCount-count-1]);
                }

                // Rows of this run that were not consumed are tallied in itemsPastBegin.
                itemsPastBegin += maxCount - count;
                //TraceEvent("RYWaddKV", randomID).detail("Key", it.beginKey()).detail("Count", count).detail("MaxCount", maxCount).detail("ItemsPastBegin", itemsPastBegin);
                if( count ) {
                    int size = result.size();
                    result.resize(result.arena(),size+count);
                    // Append in descending key order.
                    for( int i = 0; i < count; i++ ) {
                        result[size + i] = start[maxCount-i-1];
                    }
                }
            }
            if (it == itEnd) break;
            --it;
        }
    }

    result.more = result.more || limits.isReached();

    // result is in descending key order, so the reverse iterators visit it in ascending order; drop every
    // row whose key is below begin's key (those rows sit at the tail of result).
    if( begin.isFirstGreaterOrEqual() ) {
        int keepItems = result.rend() - std::lower_bound( result.rbegin(), result.rend(), begin.getKey(), KeyValueRef::OrderByKey());
        if( keepItems < result.size() )
            result.more = false;

        result.resize( result.arena(), keepItems );
    }

    result.readToBegin = !result.more && readToBegin;
    result.readThroughEnd = readThroughEnd;
    // Rows obtained via it.kv(ryw->arena) may live in the transaction arena; keep it alive with the result.
    result.arena().dependsOn( ryw->arena );

    return result;
}
950
triggerWatches(ReadYourWritesTransaction * ryw,KeyRangeRef range,Optional<ValueRef> val,bool valueKnown=true)951 static void triggerWatches(ReadYourWritesTransaction *ryw, KeyRangeRef range, Optional<ValueRef> val, bool valueKnown = true) {
952 for(auto it = ryw->watchMap.lower_bound(range.begin); it != ryw->watchMap.end() && it->key < range.end; ) {
953 auto itCopy = it;
954 ++it;
955
956 ASSERT( itCopy->value.size() );
957 TEST( itCopy->value.size() > 1 ); //Multiple watches on the same key triggered by RYOW
958
959 for( int i = 0; i < itCopy->value.size(); i++ ) {
960 if(itCopy->value[i]->onChangeTrigger.isSet()) {
961 swapAndPop(&itCopy->value, i--);
962 } else if( !valueKnown ||
963 (itCopy->value[i]->setPresent && (itCopy->value[i]->setValue.present() != val.present() || (val.present() && itCopy->value[i]->setValue.get() != val.get()))) ||
964 (itCopy->value[i]->valuePresent && (itCopy->value[i]->value.present() != val.present() || (val.present() && itCopy->value[i]->value.get() != val.get()))) ) {
965 itCopy->value[i]->onChangeTrigger.send(Void());
966 swapAndPop(&itCopy->value, i--);
967 } else {
968 itCopy->value[i]->setPresent = true;
969 itCopy->value[i]->setValue = val.castTo<Value>();
970 }
971 }
972
973 if( itCopy->value.size() == 0 )
974 ryw->watchMap.erase(itCopy);
975 }
976 }
977
triggerWatches(ReadYourWritesTransaction * ryw,KeyRef key,Optional<ValueRef> val,bool valueKnown=true)978 static void triggerWatches(ReadYourWritesTransaction *ryw, KeyRef key, Optional<ValueRef> val, bool valueKnown = true) {
979 triggerWatches(ryw, singleKeyRange(key), val, valueKnown);
980 }
981
watch(ReadYourWritesTransaction * ryw,Key key)982 ACTOR static Future<Void> watch( ReadYourWritesTransaction *ryw, Key key ) {
983 state Future<Optional<Value>> val;
984 state Future<Void> watchFuture;
985 state Reference<Watch> watch(new Watch(key));
986 state Promise<Void> done;
987
988 ryw->reading.add( done.getFuture() );
989
990 if(!ryw->options.readYourWritesDisabled) {
991 ryw->watchMap[key].push_back(watch);
992 val = readWithConflictRange( ryw, GetValueReq(key), false );
993 }
994 else
995 val = ryw->tr.get(key);
996
997 try {
998 wait(ryw->resetPromise.getFuture() || success(val) || watch->onChangeTrigger.getFuture());
999 } catch( Error &e ) {
1000 done.send(Void());
1001 throw;
1002 }
1003
1004 if( watch->onChangeTrigger.getFuture().isReady() ) {
1005 done.send(Void());
1006 if( watch->onChangeTrigger.getFuture().isError() )
1007 throw watch->onChangeTrigger.getFuture().getError();
1008 return Void();
1009 }
1010
1011 watch->valuePresent = true;
1012 watch->value = val.get();
1013
1014 if( watch->setPresent && ( watch->setValue.present() != watch->value.present() || (watch->value.present() && watch->setValue.get() != watch->value.get()) ) ) {
1015 watch->onChangeTrigger.send(Void());
1016 done.send(Void());
1017 return Void();
1018 }
1019
1020 watchFuture = ryw->tr.watch(watch); // throws if there are too many outstanding watches
1021 done.send(Void());
1022
1023 wait(watchFuture);
1024
1025 return Void();
1026 }
1027
commit(ReadYourWritesTransaction * ryw)1028 ACTOR static Future<Void> commit( ReadYourWritesTransaction *ryw ) {
1029 try {
1030 ryw->commitStarted = true;
1031
1032 Future<Void> ready = ryw->reading;
1033 wait( ryw->resetPromise.getFuture() || ready );
1034
1035 if( ryw->options.readYourWritesDisabled ) {
1036 if (ryw->resetPromise.isSet())
1037 throw ryw->resetPromise.getFuture().getError();
1038 wait( ryw->resetPromise.getFuture() || ryw->tr.commit() );
1039
1040 ryw->debugLogRetries();
1041
1042 if(!ryw->tr.apiVersionAtLeast(410)) {
1043 ryw->reset();
1044 }
1045
1046 return Void();
1047 }
1048
1049 ryw->writeRangeToNativeTransaction(KeyRangeRef(StringRef(), allKeys.end));
1050
1051 auto conflictRanges = ryw->readConflicts.ranges();
1052 for( auto iter = conflictRanges.begin(); iter != conflictRanges.end(); ++iter ) {
1053 if( iter->value() ) {
1054 ryw->tr.addReadConflictRange( iter->range() );
1055 }
1056 }
1057
1058 wait( ryw->resetPromise.getFuture() || ryw->tr.commit() );
1059
1060 ryw->debugLogRetries();
1061 if(!ryw->tr.apiVersionAtLeast(410)) {
1062 ryw->reset();
1063 }
1064
1065 return Void();
1066 } catch( Error &e ) {
1067 if(!ryw->tr.apiVersionAtLeast(410)) {
1068 ryw->commitStarted = false;
1069 if( !ryw->resetPromise.isSet() ) {
1070 ryw->tr.reset();
1071 ryw->resetRyow();
1072 }
1073 }
1074
1075 throw;
1076 }
1077 }
1078
onError(ReadYourWritesTransaction * ryw,Error e)1079 ACTOR static Future<Void> onError( ReadYourWritesTransaction *ryw, Error e ) {
1080 try {
1081 if ( ryw->resetPromise.isSet() ) {
1082 throw ryw->resetPromise.getFuture().getError();
1083 }
1084
1085 bool retry_limit_hit = ryw->options.maxRetries != -1 && ryw->retries >= ryw->options.maxRetries;
1086 if (ryw->retries < std::numeric_limits<int>::max())
1087 ryw->retries++;
1088 if(retry_limit_hit) {
1089 throw e;
1090 }
1091
1092 wait( ryw->resetPromise.getFuture() || ryw->tr.onError(e) );
1093
1094 ryw->debugLogRetries(e);
1095
1096 ryw->resetRyow();
1097 return Void();
1098 } catch( Error &e ) {
1099 if ( !ryw->resetPromise.isSet() ) {
1100 if(ryw->tr.apiVersionAtLeast(610)) {
1101 ryw->resetPromise.sendError(transaction_cancelled());
1102 }
1103 else {
1104 ryw->resetRyow();
1105 }
1106 }
1107 if( e.code() == error_code_broken_promise )
1108 throw transaction_cancelled();
1109 throw;
1110 }
1111 }
1112
getReadVersion(ReadYourWritesTransaction * ryw)1113 ACTOR static Future<Version> getReadVersion(ReadYourWritesTransaction* ryw) {
1114 choose{
1115 when(Version v = wait(ryw->tr.getReadVersion())) {
1116 return v;
1117 }
1118
1119 when(wait(ryw->resetPromise.getFuture())) {
1120 throw internal_error();
1121 }
1122 }
1123 }
1124 };
1125
ReadYourWritesTransaction(Database const & cx)1126 ReadYourWritesTransaction::ReadYourWritesTransaction( Database const& cx ) : cache(&arena), writes(&arena), tr(cx), retries(0), creationTime(now()), commitStarted(false), options(tr), deferredError(cx->deferredError) {
1127 resetTimeout();
1128 }
1129
timebomb(double endTime,Promise<Void> resetPromise)1130 ACTOR Future<Void> timebomb(double endTime, Promise<Void> resetPromise) {
1131 if (now() < endTime) {
1132 wait ( delayUntil( endTime ) );
1133 }
1134 if( !resetPromise.isSet() )
1135 resetPromise.sendError(transaction_timed_out());
1136 throw transaction_timed_out();
1137 }
1138
resetTimeout()1139 void ReadYourWritesTransaction::resetTimeout() {
1140 timeoutActor = options.timeoutInSeconds == 0.0 ? Void() : timebomb(options.timeoutInSeconds + creationTime, resetPromise);
1141 }
1142
getReadVersion()1143 Future<Version> ReadYourWritesTransaction::getReadVersion() {
1144 if (tr.apiVersionAtLeast(101)) {
1145 if (resetPromise.isSet())
1146 return resetPromise.getFuture().getError();
1147 return RYWImpl::getReadVersion(this);
1148 }
1149 return tr.getReadVersion();
1150 }
1151
getValueFromJSON(StatusObject statusObj)1152 Optional<Value> getValueFromJSON(StatusObject statusObj) {
1153 try {
1154 Value output = StringRef(json_spirit::write_string(json_spirit::mValue(statusObj), json_spirit::Output_options::raw_utf8).c_str());
1155 return output;
1156 }
1157 catch (std::exception& e){
1158 TraceEvent(SevError, "UnableToUnparseStatusJSON").detail("What", e.what());
1159 throw internal_error();
1160 }
1161 }
1162
getJSON(Reference<ClusterConnectionFile> clusterFile)1163 ACTOR Future<Optional<Value>> getJSON(Reference<ClusterConnectionFile> clusterFile) {
1164 StatusObject statusObj = wait(StatusClient::statusFetcher(clusterFile));
1165 return getValueFromJSON(statusObj);
1166 }
1167
getWorkerInterfaces(Reference<ClusterConnectionFile> clusterFile)1168 ACTOR Future<Standalone<RangeResultRef>> getWorkerInterfaces (Reference<ClusterConnectionFile> clusterFile){
1169 state Reference<AsyncVar<Optional<ClusterInterface>>> clusterInterface(new AsyncVar<Optional<ClusterInterface>>);
1170 state Future<Void> leaderMon = monitorLeader<ClusterInterface>(clusterFile, clusterInterface);
1171
1172 loop{
1173 choose {
1174 when( vector<ClientWorkerInterface> workers = wait( clusterInterface->get().present() ? brokenPromiseToNever( clusterInterface->get().get().getClientWorkers.getReply( GetClientWorkersRequest() ) ) : Never() ) ) {
1175 Standalone<RangeResultRef> result;
1176 for(auto& it : workers) {
1177 result.push_back_deep(result.arena(), KeyValueRef(it.address().toString(), BinaryWriter::toValue(it, IncludeVersion())));
1178 }
1179
1180 return result;
1181 }
1182 when( wait(clusterInterface->onChange()) ) {}
1183 }
1184 }
1185 }
1186
get(const Key & key,bool snapshot)1187 Future< Optional<Value> > ReadYourWritesTransaction::get( const Key& key, bool snapshot ) {
1188 TEST(true);
1189
1190 if (key == LiteralStringRef("\xff\xff/status/json")){
1191 if (tr.getDatabase().getPtr() && tr.getDatabase()->getConnectionFile()) {
1192 return getJSON(tr.getDatabase()->getConnectionFile());
1193 }
1194 else {
1195 return Optional<Value>();
1196 }
1197 }
1198
1199 if (key == LiteralStringRef("\xff\xff/cluster_file_path")) {
1200 try {
1201 if (tr.getDatabase().getPtr() && tr.getDatabase()->getConnectionFile()) {
1202 Optional<Value> output = StringRef(tr.getDatabase()->getConnectionFile()->getFilename());
1203 return output;
1204 }
1205 }
1206 catch (Error &e){
1207 return e;
1208 }
1209 return Optional<Value>();
1210 }
1211
1212 if (key == LiteralStringRef("\xff\xff/connection_string")){
1213 try {
1214 if (tr.getDatabase().getPtr() && tr.getDatabase()->getConnectionFile()) {
1215 Reference<ClusterConnectionFile> f = tr.getDatabase()->getConnectionFile();
1216 Optional<Value> output = StringRef(f->getConnectionString().toString());
1217 return output;
1218 }
1219 }
1220 catch (Error &e){
1221 return e;
1222 }
1223 return Optional<Value>();
1224 }
1225
1226 if(checkUsedDuringCommit()) {
1227 return used_during_commit();
1228 }
1229
1230 if( resetPromise.isSet() )
1231 return resetPromise.getFuture().getError();
1232
1233 if(key >= getMaxReadKey() && key != metadataVersionKey)
1234 return key_outside_legal_range();
1235
1236 //There are no keys in the database with size greater than KEY_SIZE_LIMIT
1237 if(key.size() > (key.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT))
1238 return Optional<Value>();
1239
1240 Future< Optional<Value> > result = RYWImpl::readWithConflictRange( this, RYWImpl::GetValueReq(key), snapshot );
1241 reading.add( success( result ) );
1242 return result;
1243 }
1244
getKey(const KeySelector & key,bool snapshot)1245 Future< Key > ReadYourWritesTransaction::getKey( const KeySelector& key, bool snapshot ) {
1246 if(checkUsedDuringCommit()) {
1247 return used_during_commit();
1248 }
1249
1250 if( resetPromise.isSet() )
1251 return resetPromise.getFuture().getError();
1252
1253 if(key.getKey() > getMaxReadKey())
1254 return key_outside_legal_range();
1255
1256 Future< Key > result = RYWImpl::readWithConflictRange(this, RYWImpl::GetKeyReq(key), snapshot);
1257 reading.add( success( result ) );
1258 return result;
1259 }
1260
getRange(KeySelector begin,KeySelector end,GetRangeLimits limits,bool snapshot,bool reverse)1261 Future< Standalone<RangeResultRef> > ReadYourWritesTransaction::getRange(
1262 KeySelector begin,
1263 KeySelector end,
1264 GetRangeLimits limits,
1265 bool snapshot,
1266 bool reverse )
1267 {
1268 if (begin.getKey() == LiteralStringRef("\xff\xff/worker_interfaces")){
1269 if (tr.getDatabase().getPtr() && tr.getDatabase()->getConnectionFile()) {
1270 return getWorkerInterfaces(tr.getDatabase()->getConnectionFile());
1271 }
1272 else {
1273 return Standalone<RangeResultRef>();
1274 }
1275 }
1276
1277 if(checkUsedDuringCommit()) {
1278 return used_during_commit();
1279 }
1280
1281 if( resetPromise.isSet() )
1282 return resetPromise.getFuture().getError();
1283
1284 KeyRef maxKey = getMaxReadKey();
1285 if(begin.getKey() > maxKey || end.getKey() > maxKey)
1286 return key_outside_legal_range();
1287
1288 //This optimization prevents NULL operations from being added to the conflict range
1289 if( limits.isReached() ) {
1290 TEST(true); // RYW range read limit 0
1291 return Standalone<RangeResultRef>();
1292 }
1293
1294 if( !limits.isValid() )
1295 return range_limits_invalid();
1296
1297 if( begin.orEqual )
1298 begin.removeOrEqual(begin.arena());
1299
1300 if( end.orEqual )
1301 end.removeOrEqual(end.arena());
1302
1303 if( begin.offset >= end.offset && begin.getKey() >= end.getKey() ) {
1304 TEST(true); // RYW range inverted
1305 return Standalone<RangeResultRef>();
1306 }
1307
1308 Future< Standalone<RangeResultRef> > result = reverse
1309 ? RYWImpl::readWithConflictRange( this, RYWImpl::GetRangeReq<true>(begin, end, limits), snapshot )
1310 : RYWImpl::readWithConflictRange( this, RYWImpl::GetRangeReq<false>(begin, end, limits), snapshot );
1311
1312 reading.add( success( result ) );
1313 return result;
1314 }
1315
getRange(const KeySelector & begin,const KeySelector & end,int limit,bool snapshot,bool reverse)1316 Future< Standalone<RangeResultRef> > ReadYourWritesTransaction::getRange(
1317 const KeySelector& begin,
1318 const KeySelector& end,
1319 int limit,
1320 bool snapshot,
1321 bool reverse )
1322 {
1323 return getRange( begin, end, GetRangeLimits( limit ), snapshot, reverse );
1324 }
1325
getAddressesForKey(const Key & key)1326 Future< Standalone<VectorRef<const char*> >> ReadYourWritesTransaction::getAddressesForKey( const Key& key ) {
1327 if(checkUsedDuringCommit()) {
1328 return used_during_commit();
1329 }
1330
1331 if( resetPromise.isSet() )
1332 return resetPromise.getFuture().getError();
1333
1334 // If key >= allKeys.end, then our resulting address vector will be empty.
1335
1336 Future< Standalone<VectorRef<const char*> >> result = waitOrError(tr.getAddressesForKey(key), resetPromise.getFuture());
1337 reading.add( success( result ) );
1338 return result;
1339 }
1340
addReadConflictRange(KeyRangeRef const & keys)1341 void ReadYourWritesTransaction::addReadConflictRange( KeyRangeRef const& keys ) {
1342 if(checkUsedDuringCommit()) {
1343 throw used_during_commit();
1344 }
1345
1346 if (tr.apiVersionAtLeast(300)) {
1347 if (keys.begin > getMaxReadKey() || keys.end > getMaxReadKey()) {
1348 throw key_outside_legal_range();
1349 }
1350 }
1351
1352 //There aren't any keys in the database with size larger than KEY_SIZE_LIMIT, so if range contains large keys
1353 //we can translate it to an equivalent one with smaller keys
1354 KeyRef begin = keys.begin;
1355 KeyRef end = keys.end;
1356
1357 if(begin.size() > (begin.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT))
1358 begin = begin.substr(0, (begin.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT)+1);
1359 if(end.size() > (end.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT))
1360 end = end.substr(0, (end.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT)+1);
1361
1362 KeyRangeRef r = KeyRangeRef(begin, end);
1363
1364 if(r.empty()) {
1365 return;
1366 }
1367
1368 if(options.readYourWritesDisabled) {
1369 tr.addReadConflictRange(r);
1370 return;
1371 }
1372
1373 WriteMap::iterator it( &writes );
1374 KeyRangeRef readRange( arena, r );
1375 it.skip( readRange.begin );
1376 updateConflictMap(readRange, it);
1377 }
1378
updateConflictMap(KeyRef const & key,WriteMap::iterator & it)1379 void ReadYourWritesTransaction::updateConflictMap( KeyRef const& key, WriteMap::iterator& it ) {
1380 //it.skip( key );
1381 //ASSERT( it.beginKey() <= key && key < it.endKey() );
1382 if( it.is_unmodified_range() || ( it.is_operation() && !it.is_independent() ) ) {
1383 readConflicts.insert( singleKeyRange( key, arena ), true );
1384 }
1385 }
1386
updateConflictMap(KeyRangeRef const & keys,WriteMap::iterator & it)1387 void ReadYourWritesTransaction::updateConflictMap( KeyRangeRef const& keys, WriteMap::iterator& it ) {
1388 //it.skip( keys.begin );
1389 //ASSERT( it.beginKey() <= keys.begin && keys.begin < it.endKey() );
1390 for(; it.beginKey() < keys.end; ++it ) {
1391 if( it.is_unmodified_range() || ( it.is_operation() && !it.is_independent() ) ) {
1392 KeyRangeRef insert_range = KeyRangeRef( std::max( keys.begin, it.beginKey().toArenaOrRef( arena ) ), std::min( keys.end, it.endKey().toArenaOrRef( arena ) ) );
1393 if( !insert_range.empty() )
1394 readConflicts.insert( insert_range, true );
1395 }
1396 }
1397 }
1398
writeRangeToNativeTransaction(KeyRangeRef const & keys)1399 void ReadYourWritesTransaction::writeRangeToNativeTransaction( KeyRangeRef const& keys ) {
1400 WriteMap::iterator it( &writes );
1401 it.skip(keys.begin);
1402
1403 bool inClearRange = false;
1404 ExtStringRef clearBegin;
1405
1406 //Clear ranges must be done first because of keys that are both cleared and set to a new value
1407 for(; it.beginKey() < keys.end; ++it) {
1408 if( it.is_cleared_range() && !inClearRange ) {
1409 clearBegin = std::max(ExtStringRef(keys.begin), it.beginKey());
1410 inClearRange = true;
1411 } else if( !it.is_cleared_range() && inClearRange ) {
1412 tr.clear( KeyRangeRef( clearBegin.toArenaOrRef(arena), it.beginKey().toArenaOrRef(arena) ), false );
1413 inClearRange = false;
1414 }
1415 }
1416
1417 if( inClearRange ) {
1418 tr.clear(KeyRangeRef(clearBegin.toArenaOrRef(arena), keys.end), false);
1419 }
1420
1421 it.skip(keys.begin);
1422
1423 bool inConflictRange = false;
1424 ExtStringRef conflictBegin;
1425
1426 for(; it.beginKey() < keys.end; ++it) {
1427 if( it.is_conflict_range() && !inConflictRange ) {
1428 conflictBegin = std::max(ExtStringRef(keys.begin), it.beginKey());
1429 inConflictRange = true;
1430 } else if( !it.is_conflict_range() && inConflictRange ) {
1431 tr.addWriteConflictRange( KeyRangeRef( conflictBegin.toArenaOrRef(arena), it.beginKey().toArenaOrRef(arena) ) );
1432 inConflictRange = false;
1433 }
1434
1435 //SOMEDAY: make atomicOp take set to avoid switch
1436 if( it.is_operation() ) {
1437 auto op = it.op();
1438 for( int i = 0; i < op.size(); ++i) {
1439 switch(op[i].type) {
1440 case MutationRef::SetValue:
1441 if (op[i].value.present()) {
1442 tr.set( it.beginKey().assertRef(), op[i].value.get(), false );
1443 } else {
1444 tr.clear( it.beginKey().assertRef(), false );
1445 }
1446 break;
1447 case MutationRef::AddValue:
1448 case MutationRef::AppendIfFits:
1449 case MutationRef::And:
1450 case MutationRef::Or:
1451 case MutationRef::Xor:
1452 case MutationRef::Max:
1453 case MutationRef::Min:
1454 case MutationRef::SetVersionstampedKey:
1455 case MutationRef::SetVersionstampedValue:
1456 case MutationRef::ByteMin:
1457 case MutationRef::ByteMax:
1458 case MutationRef::MinV2:
1459 case MutationRef::AndV2:
1460 case MutationRef::CompareAndClear:
1461 tr.atomicOp(it.beginKey().assertRef(), op[i].value.get(), op[i].type, false);
1462 break;
1463 default:
1464 break;
1465 }
1466 }
1467 }
1468 }
1469
1470 if( inConflictRange ) {
1471 tr.addWriteConflictRange( KeyRangeRef( conflictBegin.toArenaOrRef(arena), keys.end ) );
1472 }
1473 }
1474
ReadYourWritesTransactionOptions(Transaction const & tr)1475 ReadYourWritesTransactionOptions::ReadYourWritesTransactionOptions(Transaction const& tr) {
1476 Database cx = tr.getDatabase();
1477 timeoutInSeconds = cx->transactionTimeout;
1478 maxRetries = cx->transactionMaxRetries;
1479 reset(tr);
1480 }
1481
reset(Transaction const & tr)1482 void ReadYourWritesTransactionOptions::reset(Transaction const& tr) {
1483 double oldTimeout = timeoutInSeconds;
1484 int oldMaxRetries = maxRetries;
1485 memset(this, 0, sizeof(*this));
1486 if( tr.apiVersionAtLeast(610) ) {
1487 // Starting in API version 610, these options are not cleared after reset.
1488 timeoutInSeconds = oldTimeout;
1489 maxRetries = oldMaxRetries;
1490 }
1491 else {
1492 Database cx = tr.getDatabase();
1493 maxRetries = cx->transactionMaxRetries;
1494 timeoutInSeconds = cx->transactionTimeout;
1495 }
1496 snapshotRywEnabled = tr.getDatabase()->snapshotRywEnabled;
1497 }
1498
fullReset(Transaction const & tr)1499 void ReadYourWritesTransactionOptions::fullReset(Transaction const& tr) {
1500 reset(tr);
1501 Database cx = tr.getDatabase();
1502 maxRetries = cx->transactionMaxRetries;
1503 timeoutInSeconds = cx->transactionTimeout;
1504 }
1505
getAndResetWriteConflictDisabled()1506 bool ReadYourWritesTransactionOptions::getAndResetWriteConflictDisabled() {
1507 bool disabled = nextWriteDisableConflictRange;
1508 nextWriteDisableConflictRange = false;
1509 return disabled;
1510 }
1511
getWriteConflicts(KeyRangeMap<bool> * result)1512 void ReadYourWritesTransaction::getWriteConflicts( KeyRangeMap<bool> *result ) {
1513 WriteMap::iterator it( &writes );
1514 it.skip(allKeys.begin);
1515
1516 bool inConflictRange = false;
1517 ExtStringRef conflictBegin;
1518
1519 for(; it.beginKey() < getMaxWriteKey(); ++it) {
1520 if( it.is_conflict_range() && !inConflictRange ) {
1521 conflictBegin = it.beginKey();
1522 inConflictRange = true;
1523 } else if( !it.is_conflict_range() && inConflictRange ) {
1524 result->insert( KeyRangeRef( conflictBegin.toArenaOrRef(arena), it.beginKey().toArenaOrRef(arena) ), true );
1525 inConflictRange = false;
1526 }
1527 }
1528
1529 if( inConflictRange ) {
1530 result->insert( KeyRangeRef( conflictBegin.toArenaOrRef(arena), getMaxWriteKey() ), true );
1531 }
1532 }
1533
atomicOp(const KeyRef & key,const ValueRef & operand,uint32_t operationType)1534 void ReadYourWritesTransaction::atomicOp( const KeyRef& key, const ValueRef& operand, uint32_t operationType ) {
1535 bool addWriteConflict = !options.getAndResetWriteConflictDisabled();
1536
1537 if(checkUsedDuringCommit()) {
1538 throw used_during_commit();
1539 }
1540
1541 if (key == metadataVersionKey) {
1542 if(operationType != MutationRef::SetVersionstampedValue || operand != metadataVersionRequiredValue) {
1543 throw client_invalid_operation();
1544 }
1545 }
1546 else if(key >= getMaxWriteKey()) {
1547 throw key_outside_legal_range();
1548 }
1549
1550 if(!isValidMutationType(operationType) || !isAtomicOp((MutationRef::Type) operationType))
1551 throw invalid_mutation_type();
1552
1553 if(key.size() > (key.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT))
1554 throw key_too_large();
1555 if(operand.size() > CLIENT_KNOBS->VALUE_SIZE_LIMIT)
1556 throw value_too_large();
1557
1558 if (tr.apiVersionAtLeast(510)) {
1559 if (operationType == MutationRef::Min)
1560 operationType = MutationRef::MinV2;
1561 else if (operationType == MutationRef::And)
1562 operationType = MutationRef::AndV2;
1563 }
1564
1565 KeyRef k;
1566 if(!tr.apiVersionAtLeast(520) && operationType == MutationRef::SetVersionstampedKey) {
1567 k = key.withSuffix( LiteralStringRef("\x00\x00"), arena );
1568 } else {
1569 k = KeyRef( arena, key );
1570 }
1571 ValueRef v;
1572 if(!tr.apiVersionAtLeast(520) && operationType == MutationRef::SetVersionstampedValue) {
1573 v = operand.withSuffix( LiteralStringRef("\x00\x00\x00\x00"), arena );
1574 } else {
1575 v = ValueRef( arena, operand );
1576 }
1577
1578 if(operationType == MutationRef::SetVersionstampedKey) {
1579 KeyRangeRef range = getVersionstampKeyRange(arena, k, getMaxReadKey()); // this does validation of the key and needs to be performed before the readYourWritesDisabled path
1580 if(!options.readYourWritesDisabled) {
1581 writeRangeToNativeTransaction(range);
1582 writes.addUnmodifiedAndUnreadableRange(range);
1583 }
1584 }
1585
1586 if(operationType == MutationRef::SetVersionstampedValue) {
1587 if(v.size() < 4)
1588 throw client_invalid_operation();
1589 int32_t pos;
1590 memcpy(&pos, v.end() - sizeof(int32_t), sizeof(int32_t));
1591 pos = littleEndian32(pos);
1592 if (pos < 0 || pos + 10 > v.size() - 4)
1593 throw client_invalid_operation();
1594 }
1595
1596 if(options.readYourWritesDisabled) {
1597 return tr.atomicOp(k, v, (MutationRef::Type) operationType, addWriteConflict);
1598 }
1599
1600 writes.mutate(k, (MutationRef::Type) operationType, v, addWriteConflict);
1601 RYWImpl::triggerWatches(this, k, Optional<ValueRef>(), false);
1602 }
1603
set(const KeyRef & key,const ValueRef & value)1604 void ReadYourWritesTransaction::set( const KeyRef& key, const ValueRef& value ) {
1605 if (key == LiteralStringRef("\xff\xff/reboot_worker")){
1606 BinaryReader::fromStringRef<ClientWorkerInterface>(value, IncludeVersion()).reboot.send( RebootRequest() );
1607 return;
1608 }
1609 if (key == LiteralStringRef("\xff\xff/reboot_and_check_worker")){
1610 BinaryReader::fromStringRef<ClientWorkerInterface>(value, IncludeVersion()).reboot.send( RebootRequest(false, true) );
1611 return;
1612 }
1613 if (key == metadataVersionKey) {
1614 throw client_invalid_operation();
1615 }
1616
1617 bool addWriteConflict = !options.getAndResetWriteConflictDisabled();
1618
1619 if(checkUsedDuringCommit()) {
1620 throw used_during_commit();
1621 }
1622
1623 if(key >= getMaxWriteKey())
1624 throw key_outside_legal_range();
1625
1626 if(options.readYourWritesDisabled ) {
1627 return tr.set(key, value, addWriteConflict);
1628 }
1629
1630 //TODO: check transaction size here
1631 if(key.size() > (key.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT))
1632 throw key_too_large();
1633 if(value.size() > CLIENT_KNOBS->VALUE_SIZE_LIMIT)
1634 throw value_too_large();
1635
1636 KeyRef k = KeyRef( arena, key );
1637 ValueRef v = ValueRef( arena, value );
1638
1639 writes.mutate(k, MutationRef::SetValue, v, addWriteConflict);
1640 RYWImpl::triggerWatches(this, key, value);
1641 }
1642
clear(const KeyRangeRef & range)1643 void ReadYourWritesTransaction::clear( const KeyRangeRef& range ) {
1644 bool addWriteConflict = !options.getAndResetWriteConflictDisabled();
1645
1646 if(checkUsedDuringCommit()) {
1647 throw used_during_commit();
1648 }
1649
1650 KeyRef maxKey = getMaxWriteKey();
1651 if(range.begin > maxKey || range.end > maxKey)
1652 throw key_outside_legal_range();
1653
1654 if( options.readYourWritesDisabled ) {
1655 return tr.clear(range, addWriteConflict);
1656 }
1657
1658 //There aren't any keys in the database with size larger than KEY_SIZE_LIMIT, so if range contains large keys
1659 //we can translate it to an equivalent one with smaller keys
1660 KeyRef begin = range.begin;
1661 KeyRef end = range.end;
1662
1663 if(begin.size() > (begin.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT))
1664 begin = begin.substr(0, (begin.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT)+1);
1665 if(end.size() > (end.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT))
1666 end = end.substr(0, (end.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT)+1);
1667
1668 KeyRangeRef r = KeyRangeRef(begin, end);
1669
1670 if(r.empty()) {
1671 return;
1672 }
1673
1674 r = KeyRangeRef( arena, r );
1675
1676 writes.clear(r, addWriteConflict);
1677 RYWImpl::triggerWatches(this, r, Optional<ValueRef>());
1678 }
1679
clear(const KeyRef & key)1680 void ReadYourWritesTransaction::clear( const KeyRef& key ) {
1681 bool addWriteConflict = !options.getAndResetWriteConflictDisabled();
1682
1683 if(checkUsedDuringCommit()) {
1684 throw used_during_commit();
1685 }
1686
1687 if(key >= getMaxWriteKey())
1688 throw key_outside_legal_range();
1689
1690 if(key.size() > (key.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT))
1691 return;
1692
1693 if( options.readYourWritesDisabled ) {
1694 return tr.clear(key, addWriteConflict);
1695 }
1696
1697 KeyRangeRef r = singleKeyRange( key, arena );
1698
1699 //SOMEDAY: add an optimized single key clear to write map
1700 writes.clear(r, addWriteConflict);
1701
1702 RYWImpl::triggerWatches(this, r, Optional<ValueRef>());
1703 }
1704
watch(const Key & key)1705 Future<Void> ReadYourWritesTransaction::watch(const Key& key) {
1706 if(checkUsedDuringCommit()) {
1707 return used_during_commit();
1708 }
1709
1710 if( resetPromise.isSet() )
1711 return resetPromise.getFuture().getError();
1712
1713 if( options.readYourWritesDisabled )
1714 return watches_disabled();
1715
1716 if(key >= allKeys.end || (key >= getMaxReadKey() && key != metadataVersionKey && tr.apiVersionAtLeast(300)))
1717 return key_outside_legal_range();
1718
1719 if (key.size() > (key.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT))
1720 return key_too_large();
1721
1722 return RYWImpl::watch(this, key);
1723 }
1724
addWriteConflictRange(KeyRangeRef const & keys)1725 void ReadYourWritesTransaction::addWriteConflictRange( KeyRangeRef const& keys ) {
1726 if(checkUsedDuringCommit()) {
1727 throw used_during_commit();
1728 }
1729
1730 if (tr.apiVersionAtLeast(300)) {
1731 if (keys.begin > getMaxWriteKey() || keys.end > getMaxWriteKey()) {
1732 throw key_outside_legal_range();
1733 }
1734 }
1735
1736 //There aren't any keys in the database with size larger than KEY_SIZE_LIMIT, so if range contains large keys
1737 //we can translate it to an equivalent one with smaller keys
1738 KeyRef begin = keys.begin;
1739 KeyRef end = keys.end;
1740
1741 if(begin.size() > (begin.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT))
1742 begin = begin.substr(0, (begin.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT)+1);
1743 if(end.size() > (end.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT))
1744 end = end.substr(0, (end.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT)+1);
1745
1746 KeyRangeRef r = KeyRangeRef(begin, end);
1747
1748 if(r.empty()) {
1749 return;
1750 }
1751
1752 if(options.readYourWritesDisabled) {
1753 tr.addWriteConflictRange(r);
1754 return;
1755 }
1756
1757 r = KeyRangeRef( arena, r );
1758 writes.addConflictRange(r);
1759 }
1760
commit()1761 Future<Void> ReadYourWritesTransaction::commit() {
1762 if(checkUsedDuringCommit()) {
1763 return used_during_commit();
1764 }
1765
1766 if( resetPromise.isSet() )
1767 return resetPromise.getFuture().getError();
1768
1769 return RYWImpl::commit( this );
1770 }
1771
getVersionstamp()1772 Future<Standalone<StringRef>> ReadYourWritesTransaction::getVersionstamp() {
1773 if(checkUsedDuringCommit()) {
1774 return used_during_commit();
1775 }
1776
1777 return waitOrError(tr.getVersionstamp(), resetPromise.getFuture());
1778 }
1779
setOption(FDBTransactionOptions::Option option,Optional<StringRef> value)1780 void ReadYourWritesTransaction::setOption( FDBTransactionOptions::Option option, Optional<StringRef> value ) {
1781 switch(option) {
1782 case FDBTransactionOptions::READ_YOUR_WRITES_DISABLE:
1783 validateOptionValue(value, false);
1784
1785 if (!reading.isReady() || !cache.empty() || !writes.empty())
1786 throw client_invalid_operation();
1787
1788 options.readYourWritesDisabled = true;
1789 break;
1790
1791 case FDBTransactionOptions::READ_AHEAD_DISABLE:
1792 validateOptionValue(value, false);
1793
1794 options.readAheadDisabled = true;
1795 break;
1796
1797 case FDBTransactionOptions::NEXT_WRITE_NO_WRITE_CONFLICT_RANGE:
1798 validateOptionValue(value, false);
1799
1800 options.nextWriteDisableConflictRange = true;
1801 break;
1802
1803 case FDBTransactionOptions::ACCESS_SYSTEM_KEYS:
1804 validateOptionValue(value, false);
1805
1806 options.readSystemKeys = true;
1807 options.writeSystemKeys = true;
1808 break;
1809
1810 case FDBTransactionOptions::READ_SYSTEM_KEYS:
1811 validateOptionValue(value, false);
1812
1813 options.readSystemKeys = true;
1814 break;
1815
1816 case FDBTransactionOptions::TIMEOUT:
1817 options.timeoutInSeconds = extractIntOption(value, 0, std::numeric_limits<int>::max())/1000.0;
1818 resetTimeout();
1819 break;
1820
1821 case FDBTransactionOptions::RETRY_LIMIT:
1822 options.maxRetries = (int)extractIntOption(value, -1, std::numeric_limits<int>::max());
1823 break;
1824
1825 case FDBTransactionOptions::DEBUG_RETRY_LOGGING:
1826 options.debugRetryLogging = true;
1827 if(!transactionDebugInfo) {
1828 transactionDebugInfo = Reference<TransactionDebugInfo>::addRef(new TransactionDebugInfo());
1829 transactionDebugInfo->lastRetryLogTime = creationTime;
1830 }
1831
1832 transactionDebugInfo->transactionName = value.present() ? value.get().toString() : "";
1833 break;
1834 case FDBTransactionOptions::SNAPSHOT_RYW_ENABLE:
1835 validateOptionValue(value, false);
1836
1837 options.snapshotRywEnabled++;
1838 break;
1839 case FDBTransactionOptions::SNAPSHOT_RYW_DISABLE:
1840 validateOptionValue(value, false);
1841
1842 options.snapshotRywEnabled--;
1843 break;
1844 case FDBTransactionOptions::USED_DURING_COMMIT_PROTECTION_DISABLE:
1845 validateOptionValue(value, false);
1846
1847 options.disableUsedDuringCommitProtection = true;
1848 break;
1849 default:
1850 break;
1851 }
1852
1853 tr.setOption( option, value );
1854 }
1855
operator =(ReadYourWritesTransaction && r)1856 void ReadYourWritesTransaction::operator=(ReadYourWritesTransaction&& r) BOOST_NOEXCEPT {
1857 cache = std::move( r.cache );
1858 writes = std::move( r.writes );
1859 arena = std::move( r.arena );
1860 tr = std::move( r.tr );
1861 readConflicts = std::move( r.readConflicts );
1862 watchMap = std::move( r.watchMap );
1863 reading = std::move( r.reading );
1864 resetPromise = std::move( r.resetPromise );
1865 r.resetPromise = Promise<Void>();
1866 deferredError = std::move( r.deferredError );
1867 retries = r.retries;
1868 timeoutActor = r.timeoutActor;
1869 creationTime = r.creationTime;
1870 commitStarted = r.commitStarted;
1871 options = r.options;
1872 transactionDebugInfo = r.transactionDebugInfo;
1873 cache.arena = &arena;
1874 writes.arena = &arena;
1875 }
1876
ReadYourWritesTransaction(ReadYourWritesTransaction && r)1877 ReadYourWritesTransaction::ReadYourWritesTransaction(ReadYourWritesTransaction&& r) BOOST_NOEXCEPT :
1878 cache( std::move(r.cache) ),
1879 writes( std::move(r.writes) ),
1880 arena( std::move(r.arena) ),
1881 reading( std::move(r.reading) ),
1882 retries( r.retries ),
1883 creationTime( r.creationTime ),
1884 deferredError( std::move(r.deferredError) ),
1885 timeoutActor( std::move(r.timeoutActor) ),
1886 resetPromise( std::move(r.resetPromise) ),
1887 commitStarted( r.commitStarted ),
1888 options( r.options ),
1889 transactionDebugInfo( r.transactionDebugInfo )
1890 {
1891 cache.arena = &arena;
1892 writes.arena = &arena;
1893 tr = std::move( r.tr );
1894 readConflicts = std::move(r.readConflicts);
1895 watchMap = std::move( r.watchMap );
1896 r.resetPromise = Promise<Void>();
1897 }
1898
onError(Error const & e)1899 Future<Void> ReadYourWritesTransaction::onError(Error const& e) {
1900 return RYWImpl::onError( this, e );
1901 }
1902
resetRyow()1903 void ReadYourWritesTransaction::resetRyow() {
1904 Promise<Void> oldReset = resetPromise;
1905 resetPromise = Promise<Void>();
1906
1907 timeoutActor.cancel();
1908 arena = Arena();
1909 cache = SnapshotCache(&arena);
1910 writes = WriteMap(&arena);
1911 readConflicts = CoalescedKeyRefRangeMap<bool>();
1912 watchMap.clear();
1913 reading = AndFuture();
1914 commitStarted = false;
1915
1916 deferredError = Error();
1917
1918 if(tr.apiVersionAtLeast(16)) {
1919 options.reset(tr);
1920 resetTimeout();
1921 }
1922
1923 if ( !oldReset.isSet() )
1924 oldReset.sendError(transaction_cancelled());
1925 }
1926
cancel()1927 void ReadYourWritesTransaction::cancel() {
1928 if(!resetPromise.isSet() )
1929 resetPromise.sendError(transaction_cancelled());
1930 }
1931
reset()1932 void ReadYourWritesTransaction::reset() {
1933 retries = 0;
1934 creationTime = now();
1935 timeoutActor.cancel();
1936 options.fullReset(tr);
1937 transactionDebugInfo.clear();
1938 tr.fullReset();
1939 resetRyow();
1940 }
1941
getMaxReadKey()1942 KeyRef ReadYourWritesTransaction::getMaxReadKey() {
1943 if(options.readSystemKeys)
1944 return systemKeys.end;
1945 else
1946 return normalKeys.end;
1947 }
1948
getMaxWriteKey()1949 KeyRef ReadYourWritesTransaction::getMaxWriteKey() {
1950 if(options.writeSystemKeys)
1951 return systemKeys.end;
1952 else
1953 return normalKeys.end;
1954 }
1955
~ReadYourWritesTransaction()1956 ReadYourWritesTransaction::~ReadYourWritesTransaction() {
1957 if( !resetPromise.isSet() )
1958 resetPromise.sendError(transaction_cancelled());
1959 }
1960
checkUsedDuringCommit()1961 bool ReadYourWritesTransaction::checkUsedDuringCommit() {
1962 if(commitStarted && !resetPromise.isSet() && !options.disableUsedDuringCommitProtection) {
1963 resetPromise.sendError(used_during_commit());
1964 }
1965
1966 return commitStarted;
1967 }
1968
debugLogRetries(Optional<Error> error)1969 void ReadYourWritesTransaction::debugLogRetries(Optional<Error> error) {
1970 bool committed = !error.present();
1971 if(options.debugRetryLogging) {
1972 double timeSinceLastLog = now() - transactionDebugInfo->lastRetryLogTime;
1973 double elapsed = now() - creationTime;
1974 if(timeSinceLastLog >= 1 || (committed && elapsed > 1)) {
1975 std::string transactionNameStr = "";
1976 if(!transactionDebugInfo->transactionName.empty())
1977 transactionNameStr = format(" in transaction '%s'", printable(StringRef(transactionDebugInfo->transactionName)).c_str());
1978 if(!g_network->isSimulated()) //Fuzz workload turns this on, but we do not want stderr output in simulation
1979 fprintf(stderr, "fdb WARNING: long transaction (%.2fs elapsed%s, %d retries, %s)\n", elapsed, transactionNameStr.c_str(), retries, committed ? "committed" : error.get().what());
1980 {
1981 TraceEvent trace = TraceEvent("LongTransaction");
1982 if(error.present())
1983 trace.error(error.get(), true);
1984 if(!transactionDebugInfo->transactionName.empty())
1985 trace.detail("TransactionName", transactionDebugInfo->transactionName);
1986 trace.detail("Elapsed", elapsed).detail("Retries", retries).detail("Committed", committed);
1987 }
1988 transactionDebugInfo->lastRetryLogTime = now();
1989 }
1990 }
1991 }
1992