/*
 * storageserver.actor.cpp
 *
 * This source file is part of the FoundationDB open source project
 *
 * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */


#include "fdbrpc/fdbrpc.h"
#include "fdbrpc/LoadBalance.h"
#include "flow/IndexedSet.h"
#include "flow/Hash3.h"
#include "flow/ActorCollection.h"
#include "flow/SystemMonitor.h"
#include "flow/Util.h"
#include "fdbclient/Atomic.h"
#include "fdbclient/KeyRangeMap.h"
#include "fdbclient/SystemData.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbclient/Notified.h"
#include "fdbclient/MasterProxyInterface.h"
#include "fdbclient/DatabaseContext.h"
#include "fdbserver/WorkerInterface.actor.h"
#include "fdbserver/TLogInterface.h"
#include "fdbserver/MoveKeys.actor.h"
#include "fdbserver/Knobs.h"
#include "fdbserver/WaitFailure.h"
#include "fdbserver/IKeyValueStore.h"
#include "fdbclient/VersionedMap.h"
#include "fdbserver/StorageMetrics.h"
#include "fdbrpc/sim_validation.h"
#include "fdbserver/ServerDBInfo.h"
#include "fdbrpc/Smoother.h"
#include "flow/Stats.h"
#include "fdbserver/LogSystem.h"
#include "fdbserver/RecoveryState.h"
#include "fdbserver/LogProtocolMessage.h"
#include "fdbserver/LatencyBandConfig.h"
#include "flow/TDMetric.actor.h"
#include "flow/actorcompiler.h"  // This must be the last #include.

using std::make_pair;

#pragma region Data Structures

#define SHORT_CIRCUT_ACTUAL_STORAGE 0

inline bool canReplyWith(Error e) {
	switch(e.code()) {
		case error_code_transaction_too_old:
		case error_code_future_version:
		case error_code_wrong_shard_server:
		case error_code_process_behind:
		//case error_code_all_alternatives_failed:
			return true;
		default:
			return false;
	};
}

struct StorageServer;
class ValueOrClearToRef {
public:
	static ValueOrClearToRef value(ValueRef const& v) { return ValueOrClearToRef(v, false); }
	static ValueOrClearToRef clearTo(KeyRef const& k) { return ValueOrClearToRef(k, true); }

	bool isValue() const { return !isClear; };
	bool isClearTo() const { return isClear; }

	ValueRef const& getValue() const { ASSERT( isValue() ); return item; };
	KeyRef const&  getEndKey() const { ASSERT(isClearTo()); return item; };

private:
	ValueOrClearToRef( StringRef item, bool isClear ) : item(item), isClear(isClear) {}

	StringRef item;
	bool isClear;
};
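
// Usage sketch (illustrative): an entry in the versioned map pairs a map key k
// with either a value or a clear marker, e.g.
//   auto set   = ValueOrClearToRef::value(LiteralStringRef("v"));   // k := "v"
//   auto clear = ValueOrClearToRef::clearTo(LiteralStringRef("e")); // [k, "e") cleared
// getValue() is only valid for the former, getEndKey() only for the latter.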

struct AddingShard : NonCopyable {
	KeyRange keys;
	Future<Void> fetchClient;			// holds FetchKeys() actor
	Promise<Void> fetchComplete;
	Promise<Void> readWrite;

	std::deque< Standalone<VerUpdateRef> > updates;  // during the Fetching phase, mutations with key in keys and version>=(fetchClient's) fetchVersion;

	struct StorageServer* server;
	Version transferredVersion;

	enum Phase { WaitPrevious, Fetching, Waiting };
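	// Hedged reading of the phases, as suggested by their use here: WaitPrevious =
	// waiting before fetching can begin; Fetching = fetchKeys is copying data (and
	// buffering newer mutations in `updates`); Waiting = data has been transferred
	// into versionedData and the shard is waiting to become readable
	// (isTransferred() below checks for Waiting).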
	Phase phase;

	AddingShard( StorageServer* server, KeyRangeRef const& keys );

	// When fetchKeys "partially completes" (splits an adding shard in two), this is used to construct the left half
	AddingShard( AddingShard* prev, KeyRange const& keys )
		: keys(keys), fetchClient(prev->fetchClient), server(prev->server), transferredVersion(prev->transferredVersion), phase(prev->phase)
	{
	}
	~AddingShard() {
		if( !fetchComplete.isSet() )
			fetchComplete.send(Void());
		if( !readWrite.isSet() )
			readWrite.send(Void());
	}

	void addMutation( Version version, MutationRef const& mutation );

	bool isTransferred() const { return phase == Waiting; }
};

struct ShardInfo : ReferenceCounted<ShardInfo>, NonCopyable {
	AddingShard* adding;
	struct StorageServer* readWrite;
	KeyRange keys;
	uint64_t changeCounter;

	ShardInfo(KeyRange keys, AddingShard* adding, StorageServer* readWrite)
		: adding(adding), readWrite(readWrite), keys(keys)
	{
	}

	~ShardInfo() {
		delete adding;
	}

	static ShardInfo* newNotAssigned(KeyRange keys) { return new ShardInfo(keys, NULL, NULL); }
	static ShardInfo* newReadWrite(KeyRange keys, StorageServer* data) { return new ShardInfo(keys, NULL, data); }
	static ShardInfo* newAdding(StorageServer* data, KeyRange keys) { return new ShardInfo(keys, new AddingShard(data, keys), NULL); }
	static ShardInfo* addingSplitLeft( KeyRange keys, AddingShard* oldShard) { return new ShardInfo(keys, new AddingShard(oldShard, keys), NULL); }

	bool isReadable() const { return readWrite!=NULL; }
	bool notAssigned() const { return !readWrite && !adding; }
	bool assigned() const { return readWrite || adding; }
	bool isInVersionedData() const { return readWrite || (adding && adding->isTransferred()); }
	void addMutation( Version version, MutationRef const& mutation );
	bool isFetched() const { return readWrite || ( adding && adding->fetchComplete.isSet() ); }

	const char* debugDescribeState() const {
		if (notAssigned()) return "NotAssigned";
		else if (adding && !adding->isTransferred()) return "AddingFetching";
		else if (adding) return "AddingTransferred";
		else return "ReadWrite";
	}
};

struct StorageServerDisk {
	explicit StorageServerDisk( struct StorageServer* data, IKeyValueStore* storage ) : data(data), storage(storage) {}

	void makeNewStorageServerDurable();
	bool makeVersionMutationsDurable( Version& prevStorageVersion, Version newStorageVersion, int64_t& bytesLeft );
	void makeVersionDurable( Version version );
	Future<bool> restoreDurableState();

	void changeLogProtocol(Version version, uint64_t protocol);

	void writeMutation( MutationRef mutation );
	void writeKeyValue( KeyValueRef kv );
	void clearRange( KeyRangeRef keys );

	Future<Void> getError() { return storage->getError(); }
	Future<Void> init() { return storage->init(); }
	Future<Void> commit() { return storage->commit(); }

	// SOMEDAY: Put readNextKeyInclusive in IKeyValueStore
	Future<Key> readNextKeyInclusive( KeyRef key ) { return readFirstKey(storage, KeyRangeRef(key, allKeys.end)); }
	Future<Optional<Value>> readValue( KeyRef key, Optional<UID> debugID = Optional<UID>() ) { return storage->readValue(key, debugID); }
	Future<Optional<Value>> readValuePrefix( KeyRef key, int maxLength, Optional<UID> debugID = Optional<UID>() ) { return storage->readValuePrefix(key, maxLength, debugID); }
	Future<Standalone<VectorRef<KeyValueRef>>> readRange( KeyRangeRef keys, int rowLimit = 1<<30, int byteLimit = 1<<30 ) { return storage->readRange(keys, rowLimit, byteLimit); }

	KeyValueStoreType getKeyValueStoreType() { return storage->getType(); }
	StorageBytes getStorageBytes() { return storage->getStorageBytes(); }

private:
	struct StorageServer* data;
	IKeyValueStore* storage;

	void writeMutations( MutationListRef mutations, Version debugVersion, const char* debugContext );

	ACTOR static Future<Key> readFirstKey( IKeyValueStore* storage, KeyRangeRef range ) {
		Standalone<VectorRef<KeyValueRef>> r = wait( storage->readRange( range, 1 ) );
		if (r.size()) return r[0].key;
		else return range.end;
	}
};

struct UpdateEagerReadInfo {
	vector<KeyRef> keyBegin;
	vector<Key> keyEnd; // these are for ClearRange

	vector<pair<KeyRef, int>> keys;
	vector<Optional<Value>> value;

	Arena arena;

	void addMutations( VectorRef<MutationRef> const& mutations ) {
		for(auto& m : mutations)
			addMutation(m);
	}

	void addMutation( MutationRef const& m ) {
		// SOMEDAY: Theoretically we can avoid a read if there is an earlier overlapping ClearRange
		if (m.type == MutationRef::ClearRange && !m.param2.startsWith(systemKeys.end))
			keyBegin.push_back( m.param2 );
		else if (m.type == MutationRef::CompareAndClear) {
			keyBegin.push_back(keyAfter(m.param1, arena));
			if (keys.size() > 0 && keys.back().first == m.param1) {
				// Don't issue a second read if the last read was for the same key.
				// CompareAndClear is likely to be used after another atomic operation on the same key.
				keys.back().second = std::max(keys.back().second, m.param2.size() + 1);
			} else {
				keys.push_back(pair<KeyRef, int>(m.param1, m.param2.size() + 1));
			}
		} else if ((m.type == MutationRef::AppendIfFits) || (m.type == MutationRef::ByteMin) ||
		           (m.type == MutationRef::ByteMax))
			keys.push_back(pair<KeyRef, int>(m.param1, CLIENT_KNOBS->VALUE_SIZE_LIMIT));
		else if (isAtomicOp((MutationRef::Type) m.type))
			keys.push_back(pair<KeyRef, int>(m.param1, m.param2.size()));
	}
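
	// Read-length rationale (a hedged reading of the cases above): CompareAndClear
	// needs at most param2.size()+1 bytes of the old value to decide equality with
	// param2 (the extra byte distinguishes a longer stored value); AppendIfFits,
	// ByteMin and ByteMax may need to see values up to VALUE_SIZE_LIMIT; the
	// remaining atomics (e.g. Add) combine only the first param2.size() bytes.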

	void finishKeyBegin() {
		std::sort(keyBegin.begin(), keyBegin.end());
		keyBegin.resize( std::unique(keyBegin.begin(), keyBegin.end()) - keyBegin.begin() );
		std::sort(keys.begin(), keys.end(), [](const pair<KeyRef, int>& lhs, const pair<KeyRef, int>& rhs) { return (lhs.first < rhs.first) || (lhs.first == rhs.first && lhs.second > rhs.second); } );
		keys.resize(std::unique(keys.begin(), keys.end(), [](const pair<KeyRef, int>& lhs, const pair<KeyRef, int>& rhs) { return lhs.first == rhs.first; } ) - keys.begin());
		//value gets populated in doEagerReads
	}

	Optional<Value>& getValue(KeyRef key) {
		int i = std::lower_bound(keys.begin(), keys.end(), pair<KeyRef, int>(key, 0), [](const pair<KeyRef, int>& lhs, const pair<KeyRef, int>& rhs) { return lhs.first < rhs.first; } ) - keys.begin();
		ASSERT( i < keys.size() && keys[i].first == key );
		return value[i];
	}

	KeyRef getKeyEnd( KeyRef key ) {
		int i = std::lower_bound(keyBegin.begin(), keyBegin.end(), key) - keyBegin.begin();
		ASSERT( i < keyBegin.size() && keyBegin[i] == key );
		return keyEnd[i];
	}
};

const int VERSION_OVERHEAD = 64 + sizeof(Version) + sizeof(Standalone<VersionUpdateRef>) + //mutationLog, 64b overhead for map
							 2 * (64 + sizeof(Version) + sizeof(Reference<VersionedMap<KeyRef, ValueOrClearToRef>::PTreeT>)); //versioned map [ x2 for createNewVersion(version+1) ], 64b overhead for map
static int mvccStorageBytes( MutationRef const& m ) { return VersionedMap<KeyRef, ValueOrClearToRef>::overheadPerItem * 2 + (MutationRef::OVERHEAD_BYTES + m.param1.size() + m.param2.size()) * 2; }
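
// Worked example (hypothetical sizes): a SetValue with a 20-byte key and a
// 100-byte value is charged
//   mvccStorageBytes = 2*overheadPerItem + 2*(OVERHEAD_BYTES + 20 + 100)
// bytes of queue memory; the factor of two roughly accounts for the copy held
// in the mutation log and the copy held in the versioned map until durable.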

struct FetchInjectionInfo {
	Arena arena;
	vector<VerUpdateRef> changes;
};

struct StorageServer {
	typedef VersionedMap<KeyRef, ValueOrClearToRef> VersionedData;

private:
	// versionedData contains sets and clears.

	// * Nonoverlapping: No clear overlaps a set or another clear, or adjoins another clear.
	// ~ Clears are maximal: If versionedData.at(v) contains a clear [b,e) then
	//      there is a key data[e]@v, or e==allKeys.end, or a shard boundary or former boundary at e

	// * Reads are possible: When k is in a readable shard, for any v in [storageVersion, version.get()],
	//      storage[k] + versionedData.at(v)[k] = database[k] @ v    (storage[k] might be @ any version in [durableVersion, storageVersion])

	// * Transferred shards are partially readable: When k is in an adding, transferred shard, for any v in [transferredVersion, version.get()],
	//      storage[k] + versionedData.at(v)[k] = database[k] @ v

	// * versionedData contains versions [storageVersion(), version.get()].  It might also contain version (version.get()+1), in which changeDurableVersion may be deleting ghosts, and/or it might
	//      contain later versions if applyUpdate is on the stack.

	// * Old shards are erased: versionedData.atLatest() has entries (sets or intersecting clears) only for keys in readable or adding,transferred shards.
	//   Earlier versions may have extra entries for shards that *were* readable or adding,transferred when those versions were the latest, but they eventually are forgotten.

	// * Old mutations are erased: All items in versionedData.atLatest() have insertVersion() > durableVersion(), but views
	//   at older versions may contain older items which are also in storage (this is OK because of idempotency)
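
	// Example of the read invariant (hypothetical data): if storage holds k := "a"
	// (written at some version <= storageVersion) and versionedData.at(v) holds
	// k := "b", a read of k at v returns "b"; if versionedData.at(v) has no entry
	// covering k, the read falls through to the storage value "a".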

	VersionedData versionedData;
	std::map<Version, Standalone<VersionUpdateRef>> mutationLog; // versions (durableVersion, version]

public:
	Tag tag;
	vector<pair<Version,Tag>> history;
	vector<pair<Version,Tag>> allHistory;
	Version poppedAllAfter;
	std::map<Version, Arena> freeable;  // for each version, an Arena that must be held until that version is < oldestVersion
	Arena lastArena;
	double cpuUsage;
	double diskUsage;

	std::map<Version, Standalone<VersionUpdateRef>> const & getMutationLog() { return mutationLog; }
	std::map<Version, Standalone<VersionUpdateRef>>& getMutableMutationLog() { return mutationLog; }
	VersionedData const& data() const { return versionedData; }
	VersionedData& mutableData() { return versionedData; }

	void addMutationToMutationLogOrStorage( Version ver, MutationRef m ); // Appends m to mutationLog@ver, or to storage if ver==invalidVersion

	// Update the byteSample, and write the updates to the mutation log@ver, or to storage if ver==invalidVersion
	void byteSampleApplyMutation( MutationRef const& m, Version ver );
	void byteSampleApplySet( KeyValueRef kv, Version ver );
	void byteSampleApplyClear( KeyRangeRef range, Version ver );

	void popVersion(Version v, bool popAllTags = false) {
		if(logSystem) {
			if(v > poppedAllAfter) {
				popAllTags = true;
				poppedAllAfter = std::numeric_limits<Version>::max();
			}

			vector<pair<Version,Tag>>* hist = &history;
			vector<pair<Version,Tag>> allHistoryCopy;
			if(popAllTags) {
				allHistoryCopy = allHistory;
				hist = &allHistoryCopy;
			}

			while(hist->size() && v > hist->back().first ) {
				logSystem->pop( v, hist->back().second );
				hist->pop_back();
			}
			if(hist->size()) {
				logSystem->pop( v, hist->back().second );
			} else {
				logSystem->pop( v, tag );
			}
		}
	}
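
	// Effect sketch: popping at v tells the log system it may discard data for this
	// server's tag(s) below v; once v passes poppedAllAfter, the tags in allHistory
	// are popped as well, so data for historical tags is eventually released too.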

	Standalone<VersionUpdateRef>& addVersionToMutationLog(Version v) {
		// return existing version...
		auto m = mutationLog.find(v);
		if (m != mutationLog.end())
			return m->second;

		// ...or create a new one
		auto& u = mutationLog[v];
		u.version = v;
		if (lastArena.getSize() >= 65536) lastArena = Arena(4096);
		u.arena() = lastArena;
		counters.bytesInput += VERSION_OVERHEAD;
		return u;
	}
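
	// Arena-sharing sketch: consecutive versions reuse lastArena until it has grown
	// past 64KB, at which point a fresh 4KB arena is started; e.g. the updates for
	// versions v, v+1, v+2 typically all live in one arena, amortizing allocation.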

	MutationRef addMutationToMutationLog(Standalone<VersionUpdateRef> &mLV, MutationRef const& m){
		byteSampleApplyMutation(m, mLV.version);
		counters.bytesInput += mvccStorageBytes(m);
		return mLV.mutations.push_back_deep( mLV.arena(), m );
	}

	StorageServerDisk storage;

	KeyRangeMap< Reference<ShardInfo> > shards;
	uint64_t shardChangeCounter;      // max( shards->changecounter )

	// newestAvailableVersion[k]
	//   == invalidVersion -> k is unavailable at all versions
	//   <= storageVersion -> k is unavailable at all versions (but might be read anyway from storage if we are in the process of committing makeShardDurable)
	//   == v              -> k is readable (from storage+versionedData) @ [storageVersion,v], and not being updated when version increases
	//   == latestVersion  -> k is readable (from storage+versionedData) @ [storageVersion,version.get()], and thus stays available when version increases
	CoalescedKeyRangeMap< Version > newestAvailableVersion;

	CoalescedKeyRangeMap< Version > newestDirtyVersion; // Similar to newestAvailableVersion, but includes (only) keys that were only partly available (due to cancelled fetchKeys)

	// The following are in rough order from newest to oldest
	Version lastTLogVersion, lastVersionWithData, restoredVersion;
	NotifiedVersion version;
	NotifiedVersion desiredOldestVersion;    // We can increase oldestVersion (and then durableVersion) to this version when the disk permits
	NotifiedVersion oldestVersion;           // See also storageVersion()
	NotifiedVersion durableVersion;          // At least this version will be readable from storage after a power failure
	Version rebootAfterDurableVersion;
	int8_t primaryLocality;

	Deque<std::pair<Version,Version>> recoveryVersionSkips;
	int64_t versionLag; // An estimate for how many versions it takes for the data to move from the logs to this storage server

	uint64_t logProtocol;

	Reference<ILogSystem> logSystem;
	Reference<ILogSystem::IPeekCursor> logCursor;

	UID thisServerID;
	Key sk;
	Reference<AsyncVar<ServerDBInfo>> db;
	Database cx;

	StorageServerMetrics metrics;
	CoalescedKeyRangeMap<bool, int64_t, KeyBytesMetric<int64_t>> byteSampleClears;
	AsyncVar<bool> byteSampleClearsTooLarge;
	Future<Void> byteSampleRecovery;
	Future<Void> durableInProgress;

	AsyncMap<Key,bool> watches;
	int64_t watchBytes;
	int64_t numWatches;
	AsyncVar<bool> noRecentUpdates;
	double lastUpdate;

	Int64MetricHandle readQueueSizeMetric;

	std::string folder;

	// defined only during splitMutations()/addMutation()
	UpdateEagerReadInfo *updateEagerReads;

	FlowLock durableVersionLock;
	FlowLock fetchKeysParallelismLock;
	vector< Promise<FetchInjectionInfo*> > readyFetchKeys;

	int64_t instanceID;

	Promise<Void> otherError;
	Promise<Void> coreStarted;
	bool shuttingDown;

	bool behind;

	bool debug_inApplyUpdate;
	double debug_lastValidateTime;

	int maxQueryQueue;
	int getAndResetMaxQueryQueueSize() {
		int val = maxQueryQueue;
		maxQueryQueue = 0;
		return val;
	}

	Optional<LatencyBandConfig> latencyBandConfig;

	struct Counters {
		CounterCollection cc;
		Counter allQueries, getKeyQueries, getValueQueries, getRangeQueries, finishedQueries, rowsQueried, bytesQueried, watchQueries;
		Counter bytesInput, bytesDurable, bytesFetched,
			mutationBytes;  // Like bytesInput but without MVCC accounting
		Counter mutations, setMutations, clearRangeMutations, atomicMutations;
		Counter updateBatches, updateVersions;
		Counter loops;
		Counter fetchWaitingMS, fetchWaitingCount, fetchExecutingMS, fetchExecutingCount;

		LatencyBands readLatencyBands;

		Counters(StorageServer* self)
			: cc("StorageServer", self->thisServerID.toString()),
			getKeyQueries("GetKeyQueries", cc),
			getValueQueries("GetValueQueries",cc),
			getRangeQueries("GetRangeQueries", cc),
			allQueries("QueryQueue", cc),
			finishedQueries("FinishedQueries", cc),
			rowsQueried("RowsQueried", cc),
			bytesQueried("BytesQueried", cc),
			watchQueries("WatchQueries", cc),
			bytesInput("BytesInput", cc),
			bytesDurable("BytesDurable", cc),
			bytesFetched("BytesFetched", cc),
			mutationBytes("MutationBytes", cc),
			mutations("Mutations", cc),
			setMutations("SetMutations", cc),
			clearRangeMutations("ClearRangeMutations", cc),
			atomicMutations("AtomicMutations", cc),
			updateBatches("UpdateBatches", cc),
			updateVersions("UpdateVersions", cc),
			loops("Loops", cc),
			fetchWaitingMS("FetchWaitingMS", cc),
			fetchWaitingCount("FetchWaitingCount", cc),
			fetchExecutingMS("FetchExecutingMS", cc),
			fetchExecutingCount("FetchExecutingCount", cc),
			readLatencyBands("ReadLatencyMetrics", self->thisServerID, SERVER_KNOBS->STORAGE_LOGGING_DELAY)
		{
			specialCounter(cc, "LastTLogVersion", [self](){ return self->lastTLogVersion; });
			specialCounter(cc, "Version", [self](){ return self->version.get(); });
			specialCounter(cc, "StorageVersion", [self](){ return self->storageVersion(); });
			specialCounter(cc, "DurableVersion", [self](){ return self->durableVersion.get(); });
			specialCounter(cc, "DesiredOldestVersion", [self](){ return self->desiredOldestVersion.get(); });
			specialCounter(cc, "VersionLag", [self](){ return self->versionLag; });

			specialCounter(cc, "FetchKeysFetchActive", [self](){ return self->fetchKeysParallelismLock.activePermits(); });
			specialCounter(cc, "FetchKeysWaiting", [self](){ return self->fetchKeysParallelismLock.waiters(); });

			specialCounter(cc, "QueryQueueMax", [self](){ return self->getAndResetMaxQueryQueueSize(); });

			specialCounter(cc, "BytesStored", [self](){ return self->metrics.byteSample.getEstimate(allKeys); });
			specialCounter(cc, "ActiveWatches", [self](){ return self->numWatches; });
			specialCounter(cc, "WatchBytes", [self](){ return self->watchBytes; });

			specialCounter(cc, "KvstoreBytesUsed", [self](){ return self->storage.getStorageBytes().used; });
			specialCounter(cc, "KvstoreBytesFree", [self](){ return self->storage.getStorageBytes().free; });
			specialCounter(cc, "KvstoreBytesAvailable", [self](){ return self->storage.getStorageBytes().available; });
			specialCounter(cc, "KvstoreBytesTotal", [self](){ return self->storage.getStorageBytes().total; });
		}
	} counters;

	StorageServer(IKeyValueStore* storage, Reference<AsyncVar<ServerDBInfo>> const& db, StorageServerInterface const& ssi)
		:	instanceID(g_random->randomUniqueID().first()),
			storage(this, storage), db(db),
			lastTLogVersion(0), lastVersionWithData(0), restoredVersion(0),
			rebootAfterDurableVersion(std::numeric_limits<Version>::max()),
			durableInProgress(Void()),
			versionLag(0), primaryLocality(tagLocalityInvalid),
			updateEagerReads(0),
			shardChangeCounter(0),
			fetchKeysParallelismLock(SERVER_KNOBS->FETCH_KEYS_PARALLELISM_BYTES),
			shuttingDown(false), debug_inApplyUpdate(false), debug_lastValidateTime(0), watchBytes(0), numWatches(0),
			logProtocol(0), counters(this), tag(invalidTag), maxQueryQueue(0), thisServerID(ssi.id()),
			readQueueSizeMetric(LiteralStringRef("StorageServer.ReadQueueSize")),
			behind(false), byteSampleClears(false, LiteralStringRef("\xff\xff\xff")), noRecentUpdates(false),
			lastUpdate(now()), poppedAllAfter(std::numeric_limits<Version>::max()), cpuUsage(0.0), diskUsage(0.0)
	{
		version.initMetric(LiteralStringRef("StorageServer.Version"), counters.cc.id);
		oldestVersion.initMetric(LiteralStringRef("StorageServer.OldestVersion"), counters.cc.id);
		durableVersion.initMetric(LiteralStringRef("StorageServer.DurableVersion"), counters.cc.id);
		desiredOldestVersion.initMetric(LiteralStringRef("StorageServer.DesiredOldestVersion"), counters.cc.id);

		newestAvailableVersion.insert(allKeys, invalidVersion);
		newestDirtyVersion.insert(allKeys, invalidVersion);
		addShard( ShardInfo::newNotAssigned( allKeys ) );

		cx = openDBOnServer(db, TaskDefaultEndpoint, true, true);
	}
	//~StorageServer() { fclose(log); }

	// Puts the given shard into shards.  The caller is responsible for adding shards
	//   for all ranges in shards.getAffectedRangesAfterInsertion(newShard->keys), because these
	//   shards are invalidated by the call.
	void addShard( ShardInfo* newShard ) {
		ASSERT( !newShard->keys.empty() );
		newShard->changeCounter = ++shardChangeCounter;
		//TraceEvent("AddShard", this->thisServerID).detail("KeyBegin", newShard->keys.begin).detail("KeyEnd", newShard->keys.end).detail("State", newShard->isReadable() ? "Readable" : newShard->notAssigned() ? "NotAssigned" : "Adding").detail("Version", this->version.get());
		/*auto affected = shards.getAffectedRangesAfterInsertion( newShard->keys, Reference<ShardInfo>() );
		for(auto i = affected.begin(); i != affected.end(); ++i)
			shards.insert( *i, Reference<ShardInfo>() );*/
		shards.insert( newShard->keys, Reference<ShardInfo>(newShard) );
	}
	void addMutation(Version version, MutationRef const& mutation, KeyRangeRef const& shard, UpdateEagerReadInfo* eagerReads );
	void setInitialVersion(Version ver) {
		version = ver;
		desiredOldestVersion = ver;
		oldestVersion = ver;
		durableVersion = ver;
		lastVersionWithData = ver;
		restoredVersion = ver;

		mutableData().createNewVersion(ver);
		mutableData().forgetVersionsBefore(ver);
	}

	// This is the maximum version that might be read from storage (the minimum version is durableVersion)
	Version storageVersion() const { return oldestVersion.get(); }

	bool isReadable( KeyRangeRef const& keys ) {
		auto sh = shards.intersectingRanges(keys);
		for(auto i = sh.begin(); i != sh.end(); ++i)
			if (!i->value()->isReadable())
				return false;
		return true;
	}

	void checkChangeCounter( uint64_t oldShardChangeCounter, KeyRef const& key ) {
		if (oldShardChangeCounter != shardChangeCounter &&
			shards[key]->changeCounter > oldShardChangeCounter)
		{
			TEST(true); // shard change during getValueQ
			throw wrong_shard_server();
		}
	}

	void checkChangeCounter( uint64_t oldShardChangeCounter, KeyRangeRef const& keys ) {
		if (oldShardChangeCounter != shardChangeCounter) {
			auto sh = shards.intersectingRanges(keys);
			for(auto i = sh.begin(); i != sh.end(); ++i)
				if (i->value()->changeCounter > oldShardChangeCounter) {
					TEST(true); // shard change during range operation
					throw wrong_shard_server();
				}
		}
	}
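
	// Typical pattern (see getValueQ below): snapshot shardChangeCounter before an
	// asynchronous storage read, then call checkChangeCounter afterwards; if an
	// intersecting shard changed in between, wrong_shard_server makes the client
	// retry against the server that now owns the range.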

	Counter::Value queueSize() {
		return counters.bytesInput.getValue() - counters.bytesDurable.getValue();
	}

	double getPenalty() {
		return std::max(1.0, (queueSize() - (SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER - 2.0*SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER)) / SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER);
	}
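
	// Worked example (hypothetical knob values): with TARGET_BYTES_PER_STORAGE_SERVER
	// = 1e9 and SPRING_BYTES_STORAGE_SERVER = 1e8, the penalty stays at 1.0 until
	// queueSize() passes 8e8 and then grows linearly; queueSize() = 9.5e8 yields
	// max(1.0, (9.5e8 - 8e8) / 1e8) = 1.5.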
};

// If and only if key:=value is in (storage+versionedData),    // NOT ACTUALLY: and key < allKeys.end,
//   and H(key) < |key+value|/bytesPerSample,
//     let sampledSize = max(|key+value|,bytesPerSample)
//     persistByteSampleKeys.begin()+key := sampledSize is in storage
//     (key,sampledSize) is in byteSample

// So P(key is sampled) * sampledSize == |key+value|
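
// Worked example (hypothetical bytesPerSample = 10000): a pair with |key+value|
// = 500 is sampled with probability 500/10000 = 0.05 and, if sampled, recorded
// with sampledSize = max(500, 10000) = 10000, so its expected contribution to
// the estimate is 0.05 * 10000 = 500 = |key+value|, as stated above.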

void StorageServer::byteSampleApplyMutation( MutationRef const& m, Version ver ){
	if (m.type == MutationRef::ClearRange)
		byteSampleApplyClear( KeyRangeRef(m.param1, m.param2), ver );
	else if (m.type == MutationRef::SetValue)
		byteSampleApplySet( KeyValueRef(m.param1, m.param2), ver );
	else
		ASSERT(false); // Mutation of unknown type modifying byte sample
}

#pragma endregion

/////////////////////////////////// Validation ///////////////////////////////////////
#pragma region Validation
bool validateRange( StorageServer::VersionedData::ViewAtVersion const& view, KeyRangeRef range, Version version, UID id, Version minInsertVersion ) {
	// * Nonoverlapping: No clear overlaps a set or another clear, or adjoins another clear.
	// * Old mutations are erased: All items in versionedData.atLatest() have insertVersion() > durableVersion()

	TraceEvent("ValidateRange", id).detail("KeyBegin", range.begin).detail("KeyEnd", range.end).detail("Version", version);
	KeyRef k;
	bool ok = true;
	bool kIsClear = false;
	auto i = view.lower_bound(range.begin);
	if (i != view.begin()) --i;
	for(; i != view.end() && i.key() < range.end; ++i) {
		ASSERT( i.insertVersion() > minInsertVersion );
		if (kIsClear && i->isClearTo() ? i.key() <= k : i.key() < k) {
			TraceEvent(SevError,"InvalidRange",id).detail("Key1", k).detail("Key2", i.key()).detail("Version", version);
			ok = false;
		}
		//ASSERT( i.key() >= k );
		kIsClear = i->isClearTo();
		k = kIsClear ? i->getEndKey() : i.key();
	}
	return ok;
}

void validate(StorageServer* data, bool force = false) {
	try {
		if (force || (EXPENSIVE_VALIDATION)) {
			data->newestAvailableVersion.validateCoalesced();
			data->newestDirtyVersion.validateCoalesced();

			for(auto s = data->shards.ranges().begin(); s != data->shards.ranges().end(); ++s) {
				ASSERT( s->value()->keys == s->range() );
				ASSERT( !s->value()->keys.empty() );
			}

			for(auto s = data->shards.ranges().begin(); s != data->shards.ranges().end(); ++s)
				if (s->value()->isReadable()) {
					auto ar = data->newestAvailableVersion.intersectingRanges(s->range());
					for(auto a = ar.begin(); a != ar.end(); ++a)
						ASSERT( a->value() == latestVersion );
				}

			// * versionedData contains versions [storageVersion(), version.get()].  It might also contain version (version.get()+1), in which changeDurableVersion may be deleting ghosts, and/or it might
			//      contain later versions if applyUpdate is on the stack.
			ASSERT( data->data().getOldestVersion() == data->storageVersion() );
			ASSERT( data->data().getLatestVersion() == data->version.get() || data->data().getLatestVersion() == data->version.get()+1 || (data->debug_inApplyUpdate && data->data().getLatestVersion() > data->version.get()) );

			auto latest = data->data().atLatest();

			// * Old shards are erased: versionedData.atLatest() has entries (sets or clear *begins*) only for keys in readable or adding,transferred shards.
			for(auto s = data->shards.ranges().begin(); s != data->shards.ranges().end(); ++s) {
				ShardInfo* shard = s->value().getPtr();
				if (!shard->isInVersionedData()) {
					if (latest.lower_bound(s->begin()) != latest.lower_bound(s->end())) {
						TraceEvent(SevError, "VF", data->thisServerID).detail("LastValidTime", data->debug_lastValidateTime).detail("KeyBegin", s->begin()).detail("KeyEnd", s->end())
							.detail("FirstKey", latest.lower_bound(s->begin()).key()).detail("FirstInsertV", latest.lower_bound(s->begin()).insertVersion());
					}
					ASSERT( latest.lower_bound(s->begin()) == latest.lower_bound(s->end()) );
				}
			}

			latest.validate();
			validateRange(latest, allKeys, data->version.get(), data->thisServerID, data->durableVersion.get());

			data->debug_lastValidateTime = now();
		}
	} catch (...) {
		TraceEvent(SevError, "ValidationFailure", data->thisServerID).detail("LastValidTime", data->debug_lastValidateTime);
		throw;
	}
}
#pragma endregion

void
updateProcessStats(StorageServer* self)
{
	if (g_network->isSimulated()) {
		// diskUsage and cpuUsage are not relevant in the simulator,
		// and relying on the actual values could break seed determinism
		self->cpuUsage = 100.0;
		self->diskUsage = 100.0;
		return;
	}

	SystemStatistics sysStats = getSystemStatistics();
	if (sysStats.initialized) {
		self->cpuUsage = 100 * sysStats.processCPUSeconds / sysStats.elapsed;
		self->diskUsage = 100 * std::max(0.0, (sysStats.elapsed - sysStats.processDiskIdleSeconds) / sysStats.elapsed);
	}
}

///////////////////////////////////// Queries /////////////////////////////////
#pragma region Queries
ACTOR Future<Version> waitForVersion( StorageServer* data, Version version ) {
	// This could become an Actor transparently, but for now it just does the lookup
	if (version == latestVersion)
		version = std::max(Version(1), data->version.get());
	if (version < data->oldestVersion.get() || version <= 0) throw transaction_too_old();
	else if (version <= data->version.get())
		return version;

	if(data->behind && version > data->version.get()) {
		throw process_behind();
	}

	if(g_random->random01() < 0.001)
		TraceEvent("WaitForVersion1000x");
	choose {
		when ( wait( data->version.whenAtLeast(version) ) ) {
			//FIXME: A bunch of these can block with or without the following delay 0.
			//wait( delay(0) );  // don't do a whole bunch of these at once
			if (version < data->oldestVersion.get()) throw transaction_too_old();  // just in case
			return version;
		}
		when ( wait( delay( SERVER_KNOBS->FUTURE_VERSION_DELAY ) ) ) {
			if(g_random->random01() < 0.001)
				TraceEvent(SevWarn, "ShardServerFutureVersion1000x", data->thisServerID)
					.detail("Version", version)
					.detail("MyVersion", data->version.get())
					.detail("ServerID", data->thisServerID);
			throw future_version();
		}
	}
}

ACTOR Future<Version> waitForVersionNoTooOld( StorageServer* data, Version version ) {
	// This could become an Actor transparently, but for now it just does the lookup
	if (version == latestVersion)
		version = std::max(Version(1), data->version.get());
	if (version <= data->version.get())
		return version;
	choose {
		when ( wait( data->version.whenAtLeast(version) ) ) {
			return version;
		}
		when ( wait( delay( SERVER_KNOBS->FUTURE_VERSION_DELAY ) ) ) {
			if(g_random->random01() < 0.001)
				TraceEvent(SevWarn, "ShardServerFutureVersion1000x", data->thisServerID)
					.detail("Version", version)
					.detail("MyVersion", data->version.get())
					.detail("ServerID", data->thisServerID);
			throw future_version();
		}
	}
}

ACTOR Future<Void> getValueQ( StorageServer* data, GetValueRequest req ) {
	state int64_t resultSize = 0;

	try {
		++data->counters.getValueQueries;
		++data->counters.allQueries;
		++data->readQueueSizeMetric;
		data->maxQueryQueue = std::max<int>( data->maxQueryQueue, data->counters.allQueries.getValue() - data->counters.finishedQueries.getValue());

		// Active load balancing runs at a very high priority (to obtain accurate queue lengths)
		// so we need to downgrade here
		wait( delay(0, TaskDefaultEndpoint) );

		if( req.debugID.present() )
			g_traceBatch.addEvent("GetValueDebug", req.debugID.get().first(), "getValueQ.DoRead"); //.detail("TaskID", g_network->getCurrentTask());

		state Optional<Value> v;
		state Version version = wait( waitForVersion( data, req.version ) );
		if( req.debugID.present() )
			g_traceBatch.addEvent("GetValueDebug", req.debugID.get().first(), "getValueQ.AfterVersion"); //.detail("TaskID", g_network->getCurrentTask());

		state uint64_t changeCounter = data->shardChangeCounter;

		if (!data->shards[req.key]->isReadable()) {
			//TraceEvent("WrongShardServer", data->thisServerID).detail("Key", req.key).detail("Version", version).detail("In", "getValueQ");
			throw wrong_shard_server();
		}

		state int path = 0;
		auto i = data->data().at(version).lastLessOrEqual(req.key);
		if (i && i->isValue() && i.key() == req.key) {
			v = (Value)i->getValue();
			path = 1;
		} else if (!i || !i->isClearTo() || i->getEndKey() <= req.key) {
			path = 2;
			Optional<Value> vv = wait( data->storage.readValue( req.key, req.debugID ) );
			// Validate that while we were reading the data we didn't lose the version or shard
			if (version < data->storageVersion()) {
				TEST(true); // transaction_too_old after readValue
				throw transaction_too_old();
			}
			data->checkChangeCounter(changeCounter, req.key);
			v = vv;
		}

		debugMutation("ShardGetValue", version, MutationRef(MutationRef::DebugKey, req.key, v.present()?v.get():LiteralStringRef("<null>")));
		debugMutation("ShardGetPath", version, MutationRef(MutationRef::DebugKey, req.key, path==0?LiteralStringRef("0"):path==1?LiteralStringRef("1"):LiteralStringRef("2")));

		/*
		StorageMetrics m;
		m.bytesPerKSecond = req.key.size() + (v.present() ? v.get().size() : 0);
		m.iosPerKSecond = 1;
		data->metrics.notify(req.key, m);
		*/

		if (v.present()) {
			++data->counters.rowsQueried;
			resultSize = v.get().size();
			data->counters.bytesQueried += resultSize;
		}

		if( req.debugID.present() )
			g_traceBatch.addEvent("GetValueDebug", req.debugID.get().first(), "getValueQ.AfterRead"); //.detail("TaskID", g_network->getCurrentTask());

		GetValueReply reply(v);
		reply.penalty = data->getPenalty();
		req.reply.send(reply);
	} catch (Error& e) {
		if(!canReplyWith(e))
			throw;
		req.reply.sendError(e);
	}

	++data->counters.finishedQueries;
	--data->readQueueSizeMetric;
	if(data->latencyBandConfig.present()) {
		int maxReadBytes = data->latencyBandConfig.get().readConfig.maxReadBytes.orDefault(std::numeric_limits<int>::max());
		data->counters.readLatencyBands.addMeasurement(timer()-req.requestTime, resultSize > maxReadBytes);
	}

	return Void();
};

ACTOR Future<Void> watchValue_impl( StorageServer* data, WatchValueRequest req ) {
	try {
		++data->counters.watchQueries;

		if( req.debugID.present() )
			g_traceBatch.addEvent("WatchValueDebug", req.debugID.get().first(), "watchValueQ.Before"); //.detail("TaskID", g_network->getCurrentTask());

		Version version = wait( waitForVersionNoTooOld( data, req.version ) );
		if( req.debugID.present() )
			g_traceBatch.addEvent("WatchValueDebug", req.debugID.get().first(), "watchValueQ.AfterVersion"); //.detail("TaskID", g_network->getCurrentTask());

		loop {
			try {
				state Version latest = data->data().latestVersion;
				state Future<Void> watchFuture = data->watches.onChange(req.key);
				GetValueRequest getReq( req.key, latest, req.debugID );
				state Future<Void> getValue = getValueQ( data, getReq ); //we are relying on the delay zero at the top of getValueQ, if removed we need one here
				GetValueReply reply = wait( getReq.reply.getFuture() );
				//TraceEvent("WatcherCheckValue").detail("Key",  req.key  ).detail("Value",  req.value  ).detail("CurrentValue",  v  ).detail("Ver", latest);

				debugMutation("ShardWatchValue", latest, MutationRef(MutationRef::DebugKey, req.key, reply.value.present() ? StringRef( reply.value.get() ) : LiteralStringRef("<null>") ) );

				if( req.debugID.present() )
					g_traceBatch.addEvent("WatchValueDebug", req.debugID.get().first(), "watchValueQ.AfterRead"); //.detail("TaskID", g_network->getCurrentTask());

				if( reply.value != req.value ) {
					req.reply.send( latest );
					return Void();
				}

				if( data->watchBytes > SERVER_KNOBS->MAX_STORAGE_SERVER_WATCH_BYTES ) {
					TEST(true); //Too many watches, reverting to polling
					req.reply.sendError( watch_cancelled() );
					return Void();
				}

				++data->numWatches;
				data->watchBytes += ( req.key.expectedSize() + req.value.expectedSize() + 1000 );
				try {
					wait( watchFuture );
					--data->numWatches;
					data->watchBytes -= ( req.key.expectedSize() + req.value.expectedSize() + 1000 );
				} catch( Error &e ) {
					--data->numWatches;
					data->watchBytes -= ( req.key.expectedSize() + req.value.expectedSize() + 1000 );
					throw;
				}
			} catch( Error &e ) {
				if( e.code() != error_code_transaction_too_old )
					throw;
			}
		}
	} catch (Error& e) {
		if(!canReplyWith(e))
			throw;
		req.reply.sendError(e);
	}
	return Void();
}

ACTOR Future<Void> watchValueQ( StorageServer* data, WatchValueRequest req ) {
	state Future<Void> watch = watchValue_impl( data, req );
	state double startTime = now();

	loop {
		double timeoutDelay = -1;
		if(data->noRecentUpdates.get()) {
			timeoutDelay = std::max(CLIENT_KNOBS->FAST_WATCH_TIMEOUT - (now() - startTime), 0.0);
		} else if(!BUGGIFY) {
			timeoutDelay = std::max(CLIENT_KNOBS->WATCH_TIMEOUT - (now() - startTime), 0.0);
		}
		choose {
			when( wait( watch ) ) {
				return Void();
			}
			when( wait( timeoutDelay < 0 ? Never() : delay(timeoutDelay) ) ) {
				req.reply.sendError( timed_out() );
				return Void();
			}
			when( wait( data->noRecentUpdates.onChange()) ) {}
		}
	}
}

ACTOR Future<Void> getShardState_impl( StorageServer* data, GetShardStateRequest req ) {
	ASSERT( req.mode != GetShardStateRequest::NO_WAIT );

	loop {
		std::vector<Future<Void>> onChange;

		for( auto t : data->shards.intersectingRanges( req.keys ) ) {
			if( !t.value()->assigned() ) {
				onChange.push_back( delay( SERVER_KNOBS->SHARD_READY_DELAY ) );
				break;
			}

			if( req.mode == GetShardStateRequest::READABLE && !t.value()->isReadable() )
				onChange.push_back( t.value()->adding->readWrite.getFuture() );

			if( req.mode == GetShardStateRequest::FETCHING && !t.value()->isFetched() )
				onChange.push_back( t.value()->adding->fetchComplete.getFuture() );
		}

		if( !onChange.size() ) {
			req.reply.send(std::make_pair(data->version.get(), data->durableVersion.get()));
			return Void();
		}

		wait( waitForAll( onChange ) );
		wait( delay(0) ); //onChange could have been triggered by cancellation, let things settle before rechecking
	}
}

ACTOR Future<Void> getShardStateQ( StorageServer* data, GetShardStateRequest req ) {
	choose {
		when( wait( getShardState_impl( data, req ) ) ) {}
		when( wait( delay( g_network->isSimulated() ? 10 : 60 ) ) ) {
			req.reply.sendError( timed_out() );
		}
	}
	return Void();
}

void merge( Arena& arena, VectorRef<KeyValueRef>& output, VectorRef<KeyValueRef> const& base,
	        StorageServer::VersionedData::iterator& start, StorageServer::VersionedData::iterator const& end,
			int versionedDataCount, int limit, bool stopAtEndOfBase, int limitBytes = 1<<30 )
// Combines data from base (at an older version) with sets from newer versions in [start, end) and appends the first (up to) |limit| rows to output
// If limit<0, base and output are in descending order, and start->key()>end->key(), but start is still inclusive and end is exclusive
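// Example (hypothetical data): with base = [("a","1"), ("c","3")] from storage and
// a newer set ("b","2") in [start, end), limit = 3 yields output
// [("a","1"), ("b","2"), ("c","3")]; on equal keys the versioned (newer) entry
// wins and the base entry is skipped.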
{
	if (limit==0) return;
	int originalLimit = abs(limit) + output.size();
	bool forward = limit>0;
	if (!forward) limit = -limit;
	int accumulatedBytes = 0;

	KeyValueRef const* baseStart = base.begin();
	KeyValueRef const* baseEnd = base.end();
	while (baseStart!=baseEnd && start!=end && --limit>=0 && accumulatedBytes < limitBytes) {
		if (forward ? baseStart->key < start.key() : baseStart->key > start.key())
			output.push_back_deep( arena, *baseStart++ );
		else {
			output.push_back_deep( arena, KeyValueRef(start.key(), start->getValue()) );
			if (baseStart->key == start.key()) ++baseStart;
			if (forward) ++start; else --start;
		}
		accumulatedBytes += sizeof(KeyValueRef) + output.end()[-1].expectedSize();
	}
	while (baseStart!=baseEnd && --limit>=0 && accumulatedBytes < limitBytes) {
		output.push_back_deep( arena, *baseStart++ );
		accumulatedBytes += sizeof(KeyValueRef) + output.end()[-1].expectedSize();
	}
	if( !stopAtEndOfBase ) {
		while (start!=end && --limit>=0 && accumulatedBytes < limitBytes) {
			output.push_back_deep( arena, KeyValueRef(start.key(), start->getValue()) );
			accumulatedBytes += sizeof(KeyValueRef) + output.end()[-1].expectedSize();
			if (forward) ++start; else --start;
		}
	}
	ASSERT( output.size() <= originalLimit );
}

// readRange reads up to |limit| rows from the given range and version, combining data->storage and data->versionedData.
// If limit>=0, it returns the first rows in the range (sorted ascending), otherwise the last rows (sorted descending).
// readRange has O(|result|) + O(log |data|) cost
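// Example (hypothetical call): readRange(data, v, KeyRangeRef(LiteralStringRef("a"),
// LiteralStringRef("z")), 2, &bytes) returns the first two rows of ["a","z")
// ascending, while limit = -2 returns the last two rows descending.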
readRange(StorageServer * data,Version version,KeyRange range,int limit,int * pLimitBytes)1010 ACTOR Future<GetKeyValuesReply> readRange( StorageServer* data, Version version, KeyRange range, int limit, int* pLimitBytes ) {
1011 	state GetKeyValuesReply result;
1012 	state StorageServer::VersionedData::ViewAtVersion view = data->data().at(version);
1013 	state StorageServer::VersionedData::iterator vStart = view.end();
1014 	state StorageServer::VersionedData::iterator vEnd = view.end();
1015 	state KeyRef readBegin;
1016 	state KeyRef readEnd;
1017 	state Key readBeginTemp;
1018 	state int vCount;
1019 	//state UID rrid = g_random->randomUniqueID();
1020 	//state int originalLimit = limit;
1021 	//state int originalLimitBytes = *pLimitBytes;
1022 	//state bool track = rrid.first() == 0x1bc134c2f752187cLL;
1023 
1024 	// FIXME: Review pLimitBytes behavior
1025 	// if (limit >= 0) we are reading forward, else backward
1026 
1027 	if (limit >= 0) {
1028 		// We might care about a clear beginning before start that
1029 		//  runs into range
1030 		vStart = view.lastLessOrEqual(range.begin);
1031 		if (vStart && vStart->isClearTo() && vStart->getEndKey() > range.begin)
1032 			readBegin = vStart->getEndKey();
1033 		else
1034 			readBegin = range.begin;
1035 
1036 		vStart = view.lower_bound(readBegin);
1037 
1038 		/*if (track) {
1039 			printf("readRange(%llx, @%lld, '%s'-'%s')\n", data->thisServerID.first(), version, printable(range.begin).c_str(), printable(range.end).c_str());
1040 			printf("mvcc:\n");
1041 			vEnd = view.upper_bound(range.end);
1042 			for(auto r=vStart; r != vEnd; ++r) {
1043 				if (r->isClearTo())
1044 					printf("  '%s'-'%s' cleared\n", printable(r.key()).c_str(), printable(r->getEndKey()).c_str());
1045 				else
1046 					printf("  '%s' := '%s'\n", printable(r.key()).c_str(), printable(r->getValue()).c_str());
1047 			}
1048 		}*/
1049 
1050 		while (limit>0 && *pLimitBytes>0 && readBegin < range.end) {
1051 			// ASSERT( vStart == view.lower_bound(readBegin) );
1052 			ASSERT( !vStart || vStart.key() >= readBegin );
1053 			if (vStart) { auto b = vStart; --b; ASSERT( !b || b.key() < readBegin ); }
1054 			ASSERT( data->storageVersion() <= version );
1055 
1056 			// Read up to limit items from the view, stopping at the next clear (or the end of the range)
1057 			vEnd = vStart;
1058 			vCount = 0;
1059 			int vSize = 0;
1060 			while (vEnd && vEnd.key() < range.end && !vEnd->isClearTo() && vCount < limit && vSize < *pLimitBytes){
1061 				vSize += sizeof(KeyValueRef) + vEnd->getValue().expectedSize() + vEnd.key().expectedSize();
1062 				++vCount;
1063 				++vEnd;
1064 			}
1065 
1066 			// Read the data on disk up to vEnd (or the end of the range)
1067 			readEnd = vEnd ? std::min( vEnd.key(), range.end ) : range.end;
1068 			Standalone<VectorRef<KeyValueRef>> atStorageVersion = wait(
1069 					data->storage.readRange( KeyRangeRef(readBegin, readEnd), limit, *pLimitBytes ) );
1070 
1071 			/*if (track) {
1072 				printf("read [%s,%s): %d rows\n", printable(readBegin).c_str(), printable(readEnd).c_str(), atStorageVersion.size());
1073 				for(auto r=atStorageVersion.begin(); r != atStorageVersion.end(); ++r)
1074 					printf("  '%s' := '%s'\n", printable(r->key).c_str(), printable(r->value).c_str());
1075 			}*/
1076 
1077 			ASSERT( atStorageVersion.size() <= limit );
1078 			if (data->storageVersion() > version) throw transaction_too_old();
1079 
1080 			bool more = atStorageVersion.size()!=0;
1081 
1082 			// merge the sets in [vStart,vEnd) with the sets on disk, stopping at the last key from disk if there is 'more'
1083 			int prevSize = result.data.size();
1084 			merge( result.arena, result.data, atStorageVersion, vStart, vEnd, vCount, limit, more, *pLimitBytes );
1085 			limit -= result.data.size() - prevSize;
1086 
1087 			for (auto i = &result.data[prevSize]; i != result.data.end(); i++)
1088 				*pLimitBytes -= sizeof(KeyValueRef) + i->expectedSize();
1089 
1090 			// Setup for the next iteration
1091 			if (more) { // if there might be more data, begin reading right after what we already found to find out
1092 				//if (track) printf("more\n");
1093 				if (!(limit<=0 || *pLimitBytes<=0 || result.data.end()[-1].key == atStorageVersion.end()[-1].key))
1094 					TraceEvent(SevError, "ReadRangeIssue", data->thisServerID).detail("ReadBegin", readBegin).detail("ReadEnd", readEnd)
1095 						.detail("VStart", vStart ? vStart.key() : LiteralStringRef("nil")).detail("VEnd", vEnd ? vEnd.key() : LiteralStringRef("nil"))
1096 						.detail("AtStorageVersionBack", atStorageVersion.end()[-1].key).detail("ResultBack", result.data.end()[-1].key)
1097 						.detail("Limit", limit).detail("LimitBytes", *pLimitBytes).detail("ResultSize", result.data.size()).detail("PrevSize", prevSize);
1098 				readBegin = readBeginTemp = keyAfter( result.data.end()[-1].key );
1099 				ASSERT( limit<=0 || *pLimitBytes<=0 || result.data.end()[-1].key == atStorageVersion.end()[-1].key );
1100 			} else if (vStart && vStart->isClearTo()){ // if vStart is a clear, skip it.
1101 				//if (track) printf("skip clear\n");
1102 				readBegin = vStart->getEndKey();  // next disk read should start at the end of the clear
1103 				++vStart;
1104 			} else { // Otherwise, continue at readEnd
1105 				//if (track) printf("continue\n");
1106 				readBegin = readEnd;
1107 			}
1108 		}
1109 		// all but the last item are less than *pLimitBytes
1110 		ASSERT( result.data.size() == 0 || *pLimitBytes + result.data.end()[-1].expectedSize() + sizeof(KeyValueRef) > 0 );
		/*if (*pLimitBytes <= 0)
			TraceEvent(SevWarn, "ReadRangeLimitExceeded")
				.detail("Version", version)
				.detail("Begin", range.begin )
				.detail("End", range.end )
				.detail("LimitRemain", limit)
				.detail("LimitBytesRemain", *pLimitBytes); */

		/*GetKeyValuesReply correct = wait( readRangeOld(data, version, range, originalLimit, originalLimitBytes) );
		bool prefix_equal = true;
		int totalsize = 0;
		int first_difference = -1;
		for(int i=0; i<result.data.size() && i<correct.data.size(); i++) {
			if (result.data[i] != correct.data[i]) {
				first_difference = i;
				prefix_equal = false;
				break;
			}
			totalsize += result.data[i].expectedSize() + sizeof(KeyValueRef);
		}

		// for the following check
		result.more = limit == 0 || *pLimitBytes<=0;  // FIXME: Does this have to be exact?
		result.version = version;
		if ( !(totalsize>originalLimitBytes ? prefix_equal : result.data==correct.data) || correct.more != result.more ) {
			TraceEvent(SevError, "IncorrectResult", rrid).detail("Server", data->thisServerID).detail("CorrectRows", correct.data.size())
				.detail("FirstDifference", first_difference).detail("OriginalLimit", originalLimit)
				.detail("ResultRows", result.data.size()).detail("Result0", result.data[0].key).detail("Correct0", correct.data[0].key)
				.detail("ResultN", result.data.size() ? result.data[std::min(correct.data.size(),result.data.size())-1].key : "nil")
				.detail("CorrectN", correct.data.size() ? correct.data[std::min(correct.data.size(),result.data.size())-1].key : "nil");
		}*/
	} else {
		// Reverse read - abandon hope all ye who enter here
		readEnd = range.end;

		vStart = view.lastLess(readEnd);

		// A clear might extend all the way to range.end
		if (vStart && vStart->isClearTo() && vStart->getEndKey() >= readEnd) {
			readEnd = vStart.key();
			--vStart;
		}

		while (limit < 0 && *pLimitBytes > 0 && readEnd > range.begin) {
			vEnd = vStart;
			vCount = 0;
			int vSize=0;
			while (vEnd && vEnd.key() >= range.begin && !vEnd->isClearTo() && vCount < -limit && vSize < *pLimitBytes){
				vSize += sizeof(KeyValueRef) + vEnd->getValue().expectedSize() + vEnd.key().expectedSize();
				++vCount;
				--vEnd;
			}

			readBegin = range.begin;
			if (vEnd)
				readBegin = std::max( readBegin, vEnd->isClearTo() ? vEnd->getEndKey() : vEnd.key() );

			Standalone<VectorRef<KeyValueRef>> atStorageVersion = wait( data->storage.readRange( KeyRangeRef(readBegin, readEnd), limit ) );
			if (data->storageVersion() > version) throw transaction_too_old();

			int prevSize = result.data.size();
			merge( result.arena, result.data, atStorageVersion, vStart, vEnd, vCount, limit, false, *pLimitBytes );
			limit += result.data.size() - prevSize;

			for (auto i = &result.data[prevSize]; i != result.data.end(); i++)
				*pLimitBytes -= sizeof(KeyValueRef) + i->expectedSize();

			vStart = vEnd;
			readEnd = readBegin;

			if (vStart && vStart->isClearTo()) {
				ASSERT( vStart.key() < readEnd );
				readEnd = vStart.key();
				--vStart;
			}
		}
	}
	result.more = limit == 0 || *pLimitBytes<=0;  // FIXME: Does this have to be exact?
	result.version = version;
	return result;
}

bool selectorInRange( KeySelectorRef const& sel, KeyRangeRef const& range ) {
	// Returns true if the given range suffices to at least begin to resolve the given KeySelectorRef
	return sel.getKey() >= range.begin && (sel.isBackward() ? sel.getKey() <= range.end : sel.getKey() < range.end);
}
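// Editor's example (hypothetical keys): for a shard range ["b","m"), the forward
// selector firstGreaterOrEqual("c") is in range, while one anchored at "m" is not,
// since its resolution starts in the next shard.  A backward selector may anchor at
// "m" itself because it scans left from there; hence the <= comparison above.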

ACTOR Future<Key> findKey( StorageServer* data, KeySelectorRef sel, Version version, KeyRange range, int* pOffset)
// Attempts to find the key indicated by sel in the data at version, within range.
// Precondition: selectorInRange(sel, range)
// If it is found, offset is set to 0 and a key is returned which falls inside range.
// If the search would depend on any key outside range OR if the key selector offset is too large (range read returns too many bytes), it returns either
//   a negative offset and a key in [range.begin, sel.getKey()], indicating the key is (the first key <= returned key) + offset, or
//   a positive offset and a key in (sel.getKey(), range.end], indicating the key is (the first key >= returned key) + offset-1
// The range passed in to this function should specify a shard.  If range.begin is repeatedly not the beginning of a shard, then it is possible to get stuck looping here
{
	ASSERT( version != latestVersion );
	ASSERT( selectorInRange(sel, range) && version >= data->oldestVersion.get() );

	// Count forward or backward distance items, skipping the first one if it == key and skipEqualKey
	state bool forward = sel.offset > 0;                  // If forward, result >= sel.getKey(); else result <= sel.getKey()
	state int sign = forward ? +1 : -1;
	state bool skipEqualKey = sel.orEqual == forward;
	state int distance = forward ? sel.offset : 1-sel.offset;

	//Don't limit the number of bytes if this is a trivial key selector (there will be at most two items returned from the read range in this case)
	state int maxBytes;
	if (sel.offset <= 1 && sel.offset >= 0)
		maxBytes = std::numeric_limits<int>::max();
	else
		maxBytes = BUGGIFY ? SERVER_KNOBS->BUGGIFY_LIMIT_BYTES : SERVER_KNOBS->STORAGE_LIMIT_BYTES;

	state GetKeyValuesReply rep = wait( readRange( data, version, forward ? KeyRangeRef(sel.getKey(), range.end) : KeyRangeRef(range.begin, keyAfter(sel.getKey())), (distance + skipEqualKey)*sign, &maxBytes ) );
	state bool more = rep.more && rep.data.size() != distance + skipEqualKey;

	//If we get only one result in the reverse direction as a result of the data being too large, we could get stuck in a loop
	if(more && !forward && rep.data.size() == 1) {
		TEST(true); //Reverse key selector returned only one result in range read
		maxBytes = std::numeric_limits<int>::max();
		GetKeyValuesReply rep2 = wait( readRange( data, version, KeyRangeRef(range.begin, keyAfter(sel.getKey())), -2, &maxBytes ) );
		rep = rep2;
		more = rep.more && rep.data.size() != distance + skipEqualKey;
		ASSERT(rep.data.size() == 2 || !more);
	}

	int index = distance-1;
	if (skipEqualKey && rep.data.size() && rep.data[0].key == sel.getKey() )
		++index;

	if (index < rep.data.size()) {
		*pOffset = 0;
		return rep.data[ index ].key;
	} else {
		// FIXME: If range.begin=="" && !forward, return success?
		*pOffset = index - rep.data.size() + 1;
		if (!forward) *pOffset = -*pOffset;

		if (more) {
			TEST(true); // Key selector read range had more results

			ASSERT(rep.data.size());
			Key returnKey = forward ? keyAfter(rep.data.back().key) : rep.data.back().key;

			//This is possible if key/value pairs are very large and only one result is returned on a last less than query
			//SOMEDAY: graceful handling of exceptionally sized values
			ASSERT(returnKey != sel.getKey());

			return returnKey;
		}
		else
			return forward ? range.end : range.begin;
	}
}
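/* Worked example (editor's sketch, assuming a shard ["b","m") holding keys "c","d","e"):
   findKey with sel = firstGreaterOrEqual("c") + 1 reads forward from "c" with
   distance == 2 and returns "d" with *pOffset == 0.  With sel =
   firstGreaterOrEqual("c") + 5 the read runs off the shard: only {"c","d","e"} come
   back, so *pOffset becomes 5 - 3 + 1 == 3 and range.end ("m") is returned, telling
   the caller to continue resolving from the next shard with the remaining offset. */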

KeyRange getShardKeyRange( StorageServer* data, const KeySelectorRef& sel )
// Returns largest range such that the shard state isReadable and selectorInRange(sel, range) or wrong_shard_server if no such range exists
{
	auto i = sel.isBackward() ? data->shards.rangeContainingKeyBefore( sel.getKey() ) : data->shards.rangeContaining( sel.getKey() );
	if (!i->value()->isReadable()) throw wrong_shard_server();
	ASSERT( selectorInRange(sel, i->range()) );
	return i->range();
}

ACTOR Future<Void> getKeyValues( StorageServer* data, GetKeyValuesRequest req )
// Throws a wrong_shard_server if the keys in the request or result depend on data outside this server OR if a large selector offset prevents
// all data from being read in one range read
{
	state int64_t resultSize = 0;

	++data->counters.getRangeQueries;
	++data->counters.allQueries;
	++data->readQueueSizeMetric;
	data->maxQueryQueue = std::max<int>( data->maxQueryQueue, data->counters.allQueries.getValue() - data->counters.finishedQueries.getValue());

	// Active load balancing runs at a very high priority (to obtain accurate queue lengths)
	// so we need to downgrade here
	wait( delay(0, TaskDefaultEndpoint) );

	try {
		if( req.debugID.present() )
			g_traceBatch.addEvent("TransactionDebug", req.debugID.get().first(), "storageserver.getKeyValues.Before");
		state Version version = wait( waitForVersion( data, req.version ) );

		state uint64_t changeCounter = data->shardChangeCounter;
//		try {
		state KeyRange shard = getShardKeyRange( data, req.begin );

		if( req.debugID.present() )
			g_traceBatch.addEvent("TransactionDebug", req.debugID.get().first(), "storageserver.getKeyValues.AfterVersion");
		//.detail("ShardBegin", shard.begin).detail("ShardEnd", shard.end);
		//} catch (Error& e) { TraceEvent("WrongShardServer", data->thisServerID).detail("Begin", req.begin.toString()).detail("End", req.end.toString()).detail("Version", version).detail("Shard", "None").detail("In", "getKeyValues>getShardKeyRange"); throw e; }

		if ( !selectorInRange(req.end, shard) && !(req.end.isFirstGreaterOrEqual() && req.end.getKey() == shard.end) ) {
//			TraceEvent("WrongShardServer1", data->thisServerID).detail("Begin", req.begin.toString()).detail("End", req.end.toString()).detail("Version", version).detail("ShardBegin", shard.begin).detail("ShardEnd", shard.end).detail("In", "getKeyValues>checkShardExtents");
			throw wrong_shard_server();
		}

		state int offset1;
		state int offset2;
		state Future<Key> fBegin = req.begin.isFirstGreaterOrEqual() ? Future<Key>(req.begin.getKey()) : findKey( data, req.begin, version, shard, &offset1 );
		state Future<Key> fEnd = req.end.isFirstGreaterOrEqual() ? Future<Key>(req.end.getKey()) : findKey( data, req.end, version, shard, &offset2 );
		state Key begin = wait(fBegin);
		state Key end = wait(fEnd);
		if( req.debugID.present() )
			g_traceBatch.addEvent("TransactionDebug", req.debugID.get().first(), "storageserver.getKeyValues.AfterKeys");
		//.detail("Off1",offset1).detail("Off2",offset2).detail("ReqBegin",req.begin.getKey()).detail("ReqEnd",req.end.getKey());

		// Offsets of zero indicate begin/end keys in this shard, which obviously means we can answer the query
		// An end offset of 1 is also OK because the end key is exclusive, so if the first key of the next shard is the end the last actual key returned must be from this shard.
		// A begin offset of 1 is also OK because then either begin is past end or equal to end (so the result is definitely empty)
		if ((offset1 && offset1!=1) || (offset2 && offset2!=1)) {
			TEST(true);  // wrong_shard_server due to offset
			// We could detect when offset1 takes us off the beginning of the database or offset2 takes us off the end, and return a clipped range rather
			// than an error (since that is what the NativeAPI.getRange will do anyway via its "slow path"), but we would have to add some flags to the response
			// to encode whether we went off the beginning and the end, since it needs that information.
			//TraceEvent("WrongShardServer2", data->thisServerID).detail("Begin", req.begin.toString()).detail("End", req.end.toString()).detail("Version", version).detail("ShardBegin", shard.begin).detail("ShardEnd", shard.end).detail("In", "getKeyValues>checkOffsets").detail("BeginKey", begin).detail("EndKey", end).detail("BeginOffset", offset1).detail("EndOffset", offset2);
			throw wrong_shard_server();
		}

		if (begin >= end) {
			if( req.debugID.present() )
				g_traceBatch.addEvent("TransactionDebug", req.debugID.get().first(), "storageserver.getKeyValues.Send");
			//.detail("Begin",begin).detail("End",end);

			GetKeyValuesReply none;
			none.version = version;
			none.more = false;
			none.penalty = data->getPenalty();

			data->checkChangeCounter( changeCounter, KeyRangeRef( std::min<KeyRef>(req.begin.getKey(), req.end.getKey()), std::max<KeyRef>(req.begin.getKey(), req.end.getKey()) ) );
			req.reply.send( none );
		} else {
			state int remainingLimitBytes = req.limitBytes;

			GetKeyValuesReply _r = wait( readRange(data, version, KeyRangeRef(begin, end), req.limit, &remainingLimitBytes) );
			GetKeyValuesReply r = _r;

			if( req.debugID.present() )
				g_traceBatch.addEvent("TransactionDebug", req.debugID.get().first(), "storageserver.getKeyValues.AfterReadRange");
			//.detail("Begin",begin).detail("End",end).detail("SizeOf",r.data.size());
			data->checkChangeCounter( changeCounter, KeyRangeRef( std::min<KeyRef>(begin, std::min<KeyRef>(req.begin.getKey(), req.end.getKey())), std::max<KeyRef>(end, std::max<KeyRef>(req.begin.getKey(), req.end.getKey())) ) );
			if (EXPENSIVE_VALIDATION) {
				for (int i = 0; i < r.data.size(); i++)
					ASSERT(r.data[i].key >= begin && r.data[i].key < end);
				ASSERT(r.data.size() <= std::abs(req.limit));
			}

			/*for( int i = 0; i < r.data.size(); i++ ) {
				StorageMetrics m;
				m.bytesPerKSecond = r.data[i].expectedSize();
				m.iosPerKSecond = 1; //FIXME: this should be 1/r.data.size(), but we cannot do that because it is an int
				data->metrics.notify(r.data[i].key, m);
			}*/

			r.penalty = data->getPenalty();
			req.reply.send( r );

			resultSize = req.limitBytes - remainingLimitBytes;
			data->counters.bytesQueried += resultSize;
			data->counters.rowsQueried += r.data.size();
		}
	} catch (Error& e) {
		if(!canReplyWith(e))
			throw;
		req.reply.sendError(e);
	}

	++data->counters.finishedQueries;
	--data->readQueueSizeMetric;

	if(data->latencyBandConfig.present()) {
		int maxReadBytes = data->latencyBandConfig.get().readConfig.maxReadBytes.orDefault(std::numeric_limits<int>::max());
		int maxSelectorOffset = data->latencyBandConfig.get().readConfig.maxKeySelectorOffset.orDefault(std::numeric_limits<int>::max());
		data->counters.readLatencyBands.addMeasurement(timer()-req.requestTime, resultSize > maxReadBytes || abs(req.begin.offset) > maxSelectorOffset || abs(req.end.offset) > maxSelectorOffset);
	}

	return Void();
}
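/* Editor's note on the offset check above, using a hypothetical shard ["b","m"):
   an end selector that resolves to offset 1 at the shard boundary "m" is still
   answerable here because the end key is exclusive -- the last row returned must come
   from this shard.  Any resolved offset other than 0 or 1 means the answer depends on
   a neighboring shard, so wrong_shard_server is thrown and the client's getRange
   machinery retries against the correct server. */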

ACTOR Future<Void> getKey( StorageServer* data, GetKeyRequest req ) {
	state int64_t resultSize = 0;

	++data->counters.getKeyQueries;
	++data->counters.allQueries;
	++data->readQueueSizeMetric;
	data->maxQueryQueue = std::max<int>( data->maxQueryQueue, data->counters.allQueries.getValue() - data->counters.finishedQueries.getValue());

	// Active load balancing runs at a very high priority (to obtain accurate queue lengths)
	// so we need to downgrade here
	wait( delay(0, TaskDefaultEndpoint) );

	try {
		state Version version = wait( waitForVersion( data, req.version ) );
		state uint64_t changeCounter = data->shardChangeCounter;
		state KeyRange shard = getShardKeyRange( data, req.sel );

		state int offset;
		Key k = wait( findKey( data, req.sel, version, shard, &offset ) );

		data->checkChangeCounter( changeCounter, KeyRangeRef( std::min<KeyRef>(req.sel.getKey(), k), std::max<KeyRef>(req.sel.getKey(), k) ) );

		KeySelector updated;
		if (offset < 0)
			updated = firstGreaterOrEqual(k)+offset;  // first thing on this shard OR (large offset case) smallest key retrieved in range read
		else if (offset > 0)
			updated = firstGreaterOrEqual(k)+offset-1;	// first thing on next shard OR (large offset case) keyAfter largest key retrieved in range read
		else
			updated = KeySelectorRef(k,true,0); //found

		resultSize = k.size();
		data->counters.bytesQueried += resultSize;
		++data->counters.rowsQueried;

		GetKeyReply reply(updated);
		reply.penalty = data->getPenalty();
		req.reply.send(reply);
	}
	catch (Error& e) {
		//if (e.code() == error_code_wrong_shard_server) TraceEvent("WrongShardServer").detail("In","getKey");
		if(!canReplyWith(e))
			throw;
		req.reply.sendError(e);
	}

	++data->counters.finishedQueries;
	--data->readQueueSizeMetric;
	if(data->latencyBandConfig.present()) {
		int maxReadBytes = data->latencyBandConfig.get().readConfig.maxReadBytes.orDefault(std::numeric_limits<int>::max());
		int maxSelectorOffset = data->latencyBandConfig.get().readConfig.maxKeySelectorOffset.orDefault(std::numeric_limits<int>::max());
		data->counters.readLatencyBands.addMeasurement(timer()-req.requestTime, resultSize > maxReadBytes || abs(req.sel.offset) > maxSelectorOffset);
	}

	return Void();
}
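/* Editor's sketch of the selector rewrite above (hypothetical values): with shard
   ["b","m") and req.sel = firstGreaterOrEqual("k") + 10 resolving past the shard,
   findKey might return k == "m" with offset == 8.  The reply then carries
   firstGreaterOrEqual("m") + 7: the same logical key re-expressed relative to the
   next shard, so the client can simply reissue the request there. */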

void getQueuingMetrics( StorageServer* self, StorageQueuingMetricsRequest const& req ) {
	StorageQueuingMetricsReply reply;
	reply.localTime = now();
	reply.instanceID = self->instanceID;
	reply.bytesInput = self->counters.bytesInput.getValue();
	reply.bytesDurable = self->counters.bytesDurable.getValue();

	reply.storageBytes = self->storage.getStorageBytes();

	reply.version = self->version.get();
	reply.cpuUsage = self->cpuUsage;
	reply.diskUsage = self->diskUsage;
	reply.durableVersion = self->durableVersion.get();
	req.reply.send( reply );
}

#pragma endregion

/////////////////////////// Updates ////////////////////////////////
#pragma region Updates

ACTOR Future<Void> doEagerReads( StorageServer* data, UpdateEagerReadInfo* eager ) {
	eager->finishKeyBegin();

	vector<Future<Key>> keyEnd( eager->keyBegin.size() );
	for(int i=0; i<keyEnd.size(); i++)
		keyEnd[i] = data->storage.readNextKeyInclusive( eager->keyBegin[i] );

	state Future<vector<Key>> futureKeyEnds = getAll(keyEnd);

	vector<Future<Optional<Value>>> value( eager->keys.size() );
	for(int i=0; i<value.size(); i++)
		value[i] = data->storage.readValuePrefix( eager->keys[i].first, eager->keys[i].second );

	state Future<vector<Optional<Value>>> futureValues = getAll(value);
	state vector<Key> keyEndVal = wait( futureKeyEnds );
	vector<Optional<Value>> optionalValues = wait( futureValues );

	eager->keyEnd = keyEndVal;
	eager->value = optionalValues;

	return Void();
}
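/* Editor's note: eager reads gather, in parallel and before any mutation is applied,
   the storage lookups that mutation application will need -- readNextKeyInclusive()
   supplies the bound used when expandMutation() widens a ClearRange, and
   readValuePrefix() supplies the old value consumed by atomic ops such as AddValue
   and Min.  Batching them here keeps update() from blocking on one disk read per
   mutation. */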

bool changeDurableVersion( StorageServer* data, Version desiredDurableVersion ) {
	// Remove entries from the latest version of data->versionedData that haven't changed since they were inserted
	//   before or at desiredDurableVersion, to maintain the invariants for versionedData.
	// Such entries remain in older versions of versionedData until they are forgotten, because it is expensive to dig them out.
	// We also remove everything up to and including newDurableVersion from mutationLog, and everything
	//   up to but excluding desiredDurableVersion from freeable
	// May return false if only part of the work has been done, in which case the caller must call again with the same parameters

	auto& verData = data->mutableData();
	ASSERT( verData.getLatestVersion() == data->version.get() || verData.getLatestVersion() == data->version.get()+1 );

	Version nextDurableVersion = desiredDurableVersion;

	auto mlv = data->getMutationLog().begin();
	if (mlv != data->getMutationLog().end() && mlv->second.version <= desiredDurableVersion) {
		auto& v = mlv->second;
		nextDurableVersion = v.version;
		data->freeable[ data->version.get() ].dependsOn( v.arena() );

		if (verData.getLatestVersion() <= data->version.get())
			verData.createNewVersion( data->version.get()+1 );

		int64_t bytesDurable = VERSION_OVERHEAD;
		for(auto m = v.mutations.begin(); m; ++m) {
			bytesDurable += mvccStorageBytes(*m);
			auto i = verData.atLatest().find(m->param1);
			if (i) {
				ASSERT( i.key() == m->param1 );
				ASSERT( i.insertVersion() >= nextDurableVersion );
				if (i.insertVersion() == nextDurableVersion)
					verData.erase(i);
			}
			if (m->type == MutationRef::SetValue) {
				// A set can split a clear, so there might be another entry immediately after this one that should also be cleaned up
				i = verData.atLatest().upper_bound(m->param1);
				if (i) {
					ASSERT( i.insertVersion() >= nextDurableVersion );
					if (i.insertVersion() == nextDurableVersion)
						verData.erase(i);
				}
			}
		}
		data->counters.bytesDurable += bytesDurable;
	}

	if (EXPENSIVE_VALIDATION) {
		// Check that the above loop did its job
		auto view = data->data().atLatest();
		for(auto i = view.begin(); i != view.end(); ++i)
			ASSERT( i.insertVersion() > nextDurableVersion );
	}
	data->getMutableMutationLog().erase(data->getMutationLog().begin(), data->getMutationLog().upper_bound(nextDurableVersion));
	data->freeable.erase( data->freeable.begin(), data->freeable.lower_bound(nextDurableVersion) );

	Future<Void> checkFatalError = data->otherError.getFuture();
	data->durableVersion.set( nextDurableVersion );
	if (checkFatalError.isReady()) checkFatalError.get();

	//TraceEvent("ForgotVersionsBefore", data->thisServerID).detail("Version", nextDurableVersion);
	validate(data);

	return nextDurableVersion == desiredDurableVersion;
}
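/* Editor's sketch of the invariant above (hypothetical values): if key "a" was set at
   version 5, then once version 5 is durable the latest in-memory view no longer needs
   its own entry for "a" (the disk now has it), so the entry with insertVersion() == 5
   is erased; an entry re-inserted at a later version must be kept because the disk
   does not yet reflect it.  A false return simply means more mutation-log batches
   remain, and the caller repeats the call with the same desiredDurableVersion. */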

Optional<MutationRef> clipMutation( MutationRef const& m, KeyRangeRef range ) {
	if (isSingleKeyMutation((MutationRef::Type) m.type)) {
		if (range.contains(m.param1)) return m;
	}
	else if (m.type == MutationRef::ClearRange) {
		KeyRangeRef i = range & KeyRangeRef(m.param1, m.param2);
		if (!i.empty())
			return MutationRef( (MutationRef::Type)m.type, i.begin, i.end );
	}
	else
		ASSERT(false);
	return Optional<MutationRef>();
}
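// Editor's example: clipping ClearRange["a","z") to ["f","m") yields
// ClearRange["f","m"), while clipping SetValue("q", v) to the same range yields an
// empty Optional because "q" falls outside ["f","m").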

bool expandMutation( MutationRef& m, StorageServer::VersionedData const& data, UpdateEagerReadInfo* eager, KeyRef eagerTrustedEnd, Arena& ar ) {
	// After this function call, m should be copied into an arena immediately (before modifying data, shards, or eager)
	if (m.type == MutationRef::ClearRange) {
		// Expand the clear
		const auto& d = data.atLatest();

		// If another clear overlaps the beginning of this one, engulf it
		auto i = d.lastLess(m.param1);
		if (i && i->isClearTo() && i->getEndKey() >= m.param1)
			m.param1 = i.key();

		// If another clear overlaps the end of this one, engulf it; otherwise expand
		i = d.lastLessOrEqual(m.param2);
		if (i && i->isClearTo() && i->getEndKey() >= m.param2) {
			m.param2 = i->getEndKey();
		} else {
			// Expand to the next set or clear (from storage or latestVersion), and if it
			// is a clear, engulf it as well
			i = d.lower_bound(m.param2);
			KeyRef endKeyAtStorageVersion = m.param2 == eagerTrustedEnd ? eagerTrustedEnd : std::min( eager->getKeyEnd( m.param2 ), eagerTrustedEnd );
			if (!i || endKeyAtStorageVersion < i.key())
				m.param2 = endKeyAtStorageVersion;
			else if (i->isClearTo())
				m.param2 = i->getEndKey();
			else
				m.param2 = i.key();
		}
	}
	else if (m.type != MutationRef::SetValue && (m.type)) {
		// The remaining mutation types are the atomic operations, which are rewritten
		// below into ordinary sets by applying them to the key's current value
		Optional<StringRef> oldVal;
		auto it = data.atLatest().lastLessOrEqual(m.param1);
		if (it != data.atLatest().end() && it->isValue() && it.key() == m.param1)
			oldVal = it->getValue();
		else if (it != data.atLatest().end() && it->isClearTo() && it->getEndKey() > m.param1) {
			TEST(true); // Atomic op right after a clear.
		}
		else {
			Optional<Value>& oldThing = eager->getValue(m.param1);
			if (oldThing.present())
				oldVal = oldThing.get();
		}

		switch(m.type) {
		case MutationRef::AddValue:
			m.param2 = doLittleEndianAdd(oldVal, m.param2, ar);
			break;
		case MutationRef::And:
			m.param2 = doAnd(oldVal, m.param2, ar);
			break;
		case MutationRef::Or:
			m.param2 = doOr(oldVal, m.param2, ar);
			break;
		case MutationRef::Xor:
			m.param2 = doXor(oldVal, m.param2, ar);
			break;
		case MutationRef::AppendIfFits:
			m.param2 = doAppendIfFits(oldVal, m.param2, ar);
			break;
		case MutationRef::Max:
			m.param2 = doMax(oldVal, m.param2, ar);
			break;
		case MutationRef::Min:
			m.param2 = doMin(oldVal, m.param2, ar);
			break;
		case MutationRef::ByteMin:
			m.param2 = doByteMin(oldVal, m.param2, ar);
			break;
		case MutationRef::ByteMax:
			m.param2 = doByteMax(oldVal, m.param2, ar);
			break;
		case MutationRef::MinV2:
			m.param2 = doMinV2(oldVal, m.param2, ar);
			break;
		case MutationRef::AndV2:
			m.param2 = doAndV2(oldVal, m.param2, ar);
			break;
		case MutationRef::CompareAndClear:
			if (oldVal.present() && m.param2 == oldVal.get()) {
				m.type = MutationRef::ClearRange;
				m.param2 = keyAfter(m.param1, ar);
				return expandMutation(m, data, eager, eagerTrustedEnd, ar);
			}
			return false;
		}
		m.type = MutationRef::SetValue;
	}

	return true;
}
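/* Editor's sketch of clear expansion (hypothetical map contents): if the latest
   version holds clear["b","d") and clear["f","h") and ClearRange["c","g") arrives,
   the clear overlapping its start pulls param1 back to "b" and the clear overlapping
   its end pushes param2 out to "h", producing ClearRange["b","h").  One versioned-map
   entry then stands for the whole cleared region instead of several fragments. */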

bool isClearContaining( StorageServer::VersionedData::ViewAtVersion const& view, KeyRef key ) {
	auto i = view.lastLessOrEqual(key);
	return i && i->isClearTo() && i->getEndKey() > key;
}

void applyMutation( StorageServer *self, MutationRef const& m, Arena& arena, StorageServer::VersionedData &data ) {
	// m is expected to be in arena already
	// Clear split keys are added to arena
	StorageMetrics metrics;
	metrics.bytesPerKSecond = mvccStorageBytes( m ) / 2;
	metrics.iosPerKSecond = 1;
	self->metrics.notify(m.param1, metrics);

	if (m.type == MutationRef::SetValue) {
		auto prev = data.atLatest().lastLessOrEqual(m.param1);
		if (prev && prev->isClearTo() && prev->getEndKey() > m.param1) {
			ASSERT( prev.key() <= m.param1 );
			KeyRef end = prev->getEndKey();
			// the insert version of the previous clear is preserved for the "left half", because in changeDurableVersion() the previous clear is still responsible for removing it
			// insert() invalidates prev, so prev.key() is not safe to pass to it by reference
			data.insert( KeyRef(prev.key()), ValueOrClearToRef::clearTo( m.param1 ), prev.insertVersion() );  // overwritten by below insert if empty
			KeyRef nextKey = keyAfter(m.param1, arena);
			if ( end != nextKey ) {
				ASSERT( end > nextKey );
				// the insert version of the "right half" is not preserved, because in changeDurableVersion() this set is responsible for removing it
				// FIXME: This copy is technically an asymptotic problem, definitely a waste of memory (copy of keyAfter is a waste, but not asymptotic)
				data.insert( nextKey, ValueOrClearToRef::clearTo( KeyRef(arena, end) ) );
			}
		}
		data.insert( m.param1, ValueOrClearToRef::value(m.param2) );
		self->watches.trigger( m.param1 );
	} else if (m.type == MutationRef::ClearRange) {
		data.erase( m.param1, m.param2 );
		ASSERT( m.param2 > m.param1 );
		ASSERT( !isClearContaining( data.atLatest(), m.param1 ) );
		data.insert( m.param1, ValueOrClearToRef::clearTo(m.param2) );
		self->watches.triggerRange( m.param1, m.param2 );
	}

}
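/* Editor's sketch of the set-splits-clear case (hypothetical contents): if the latest
   version holds clear["b","g") and SetValue("d", v) arrives, the map afterwards holds
   clear["b","d") with the clear's original insert version, the new value at "d", and
   clear[keyAfter("d"),"g") with the set's version, so changeDurableVersion() can
   still forget each piece at the right time. */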

void removeDataRange( StorageServer *ss, Standalone<VersionUpdateRef> &mLV, KeyRangeMap<Reference<ShardInfo>>& shards, KeyRangeRef range ) {
	// modify the latest version of data to remove all sets and trim all clears to exclude range.
	// Add a clear to mLV (mutationLog[data.getLatestVersion()]) that ensures all keys in range are removed from the disk when this latest version becomes durable
	// mLV is also modified if necessary to ensure that split clears can be forgotten

	MutationRef clearRange( MutationRef::ClearRange, range.begin, range.end );
	clearRange = ss->addMutationToMutationLog( mLV, clearRange );

	auto& data = ss->mutableData();

	// Expand the range to the right to include other shards not in versionedData
	for( auto r = shards.rangeContaining(range.end); r != shards.ranges().end() && !r->value()->isInVersionedData(); ++r )
		range = KeyRangeRef(range.begin, r->end());

	auto endClear = data.atLatest().lastLess( range.end );
	if (endClear && endClear->isClearTo() && endClear->getEndKey() > range.end ) {
		// This clear has been bumped up to insertVersion==data.getLatestVersion and needs a corresponding mutation log entry to forget
		MutationRef m( MutationRef::ClearRange, range.end, endClear->getEndKey() );
		m = ss->addMutationToMutationLog( mLV, m );
		data.insert( m.param1, ValueOrClearToRef::clearTo( m.param2 ) );
	}

	auto beginClear = data.atLatest().lastLess( range.begin );
	if (beginClear && beginClear->isClearTo() && beginClear->getEndKey() > range.begin ) {
		// We don't need any special mutationLog entry - because the begin key and insert version are unchanged the original clear
		//   mutation works to forget this one - but we need range.begin in the right arena
		KeyRef rb(  mLV.arena(), range.begin );
		// insert() invalidates beginClear, so beginClear.key() is not safe to pass to it by reference
		data.insert( KeyRef(beginClear.key()), ValueOrClearToRef::clearTo( rb ), beginClear.insertVersion() );
	}

	data.erase( range.begin, range.end );
}
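/* Editor's note: the two fix-ups above handle clears straddling the removed range.  A
   clear extending past range.end is re-inserted starting at range.end with a fresh
   mutation-log entry so it can be forgotten later; a clear extending into range.begin
   is truncated to end at range.begin while keeping its original insert version, so
   the pre-existing log entry still covers it.  Only then is [begin, end) erased. */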

void setAvailableStatus( StorageServer* self, KeyRangeRef keys, bool available );
void setAssignedStatus( StorageServer* self, KeyRangeRef keys, bool nowAssigned );

void coalesceShards(StorageServer *data, KeyRangeRef keys) {
	auto shardRanges = data->shards.intersectingRanges(keys);
	auto fullRange = data->shards.ranges();

	auto iter = shardRanges.begin();
	if( iter != fullRange.begin() ) --iter;
	auto iterEnd = shardRanges.end();
	if( iterEnd != fullRange.end() ) ++iterEnd;

	bool lastReadable = false;
	bool lastNotAssigned = false;
	KeyRangeMap<Reference<ShardInfo>>::Iterator lastRange;

	for( ; iter != iterEnd; ++iter) {
		if( lastReadable && iter->value()->isReadable() ) {
			KeyRange range = KeyRangeRef( lastRange->begin(), iter->end() );
			data->addShard( ShardInfo::newReadWrite( range, data) );
			iter = data->shards.rangeContaining(range.begin);
		} else if( lastNotAssigned && iter->value()->notAssigned() ) {
			KeyRange range = KeyRangeRef( lastRange->begin(), iter->end() );
			data->addShard( ShardInfo::newNotAssigned( range) );
			iter = data->shards.rangeContaining(range.begin);
		}

		lastReadable = iter->value()->isReadable();
		lastNotAssigned = iter->value()->notAssigned();
		lastRange = iter;
	}
}

ACTOR Future<Standalone<RangeResultRef>> tryGetRange( Database cx, Version version, KeyRangeRef keys, GetRangeLimits limits, bool* isTooOld ) {
	state Transaction tr( cx );
	state Standalone<RangeResultRef> output;
	state KeySelectorRef begin = firstGreaterOrEqual( keys.begin );
	state KeySelectorRef end = firstGreaterOrEqual( keys.end );

	if( *isTooOld )
		throw transaction_too_old();

	tr.setVersion( version );
	limits.minRows = 0;

	try {
		loop {
			Standalone<RangeResultRef> rep = wait( tr.getRange( begin, end, limits, true ) );
			limits.decrement( rep );

			if( limits.isReached() || !rep.more ) {
				if( output.size() ) {
					output.arena().dependsOn( rep.arena() );
					output.append( output.arena(), rep.begin(), rep.size() );
					if( limits.isReached() && rep.readThrough.present() )
						output.readThrough = rep.readThrough.get();
				} else {
					output = rep;
				}

				output.more = limits.isReached();

				return output;
			} else if( rep.readThrough.present() ) {
				output.arena().dependsOn( rep.arena() );
				if( rep.size() ) {
					output.append( output.arena(), rep.begin(), rep.size() );
					ASSERT( rep.readThrough.get() > rep.end()[-1].key );
				} else {
					ASSERT( rep.readThrough.get() > keys.begin );
				}
				begin = firstGreaterOrEqual( rep.readThrough.get() );
			} else {
				output.arena().dependsOn( rep.arena() );
				output.append( output.arena(), rep.begin(), rep.size() );
				begin = firstGreaterThan( output.end()[-1].key );
			}
		}
	} catch( Error &e ) {
		if( begin.getKey() != keys.begin && ( e.code() == error_code_transaction_too_old || e.code() == error_code_future_version || e.code() == error_code_process_behind ) ) {
			if( e.code() == error_code_transaction_too_old )
				*isTooOld = true;
			output.more = true;
			if( begin.isFirstGreaterOrEqual() )
				output.readThrough = begin.getKey();
			return output;
		}
		throw;
	}
}
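/* Editor's note: rep.readThrough records how far the source has scanned even when few
   or no rows came back, letting the loop above advance `begin` monotonically without
   rereading.  If the read version expires mid-stream (transaction_too_old), the
   partial output is returned with more = true and *isTooOld set, so the caller
   (fetchKeys) can persist what it has and restart the remainder at a newer version. */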

template <class T>
void addMutation( T& target, Version version, MutationRef const& mutation ) {
	target.addMutation( version, mutation );
}

template <class T>
void addMutation( Reference<T>& target, Version version, MutationRef const& mutation ) {
	addMutation(*target, version, mutation);
}

template <class T>
void splitMutations( KeyRangeMap<T>& map, VerUpdateRef const& update ) {
	for(auto& m : update.mutations) {
		splitMutation(map, m, update.version);
	}
}

template <class T>
void splitMutation( KeyRangeMap<T>& map, MutationRef const& m, Version ver ) {
	if(isSingleKeyMutation((MutationRef::Type) m.type)) {
		if ( !SHORT_CIRCUT_ACTUAL_STORAGE || !normalKeys.contains(m.param1) )
			addMutation( map.rangeContaining(m.param1)->value(), ver, m );
	}
	else if (m.type == MutationRef::ClearRange) {
		KeyRangeRef mKeys( m.param1, m.param2 );
		if ( !SHORT_CIRCUT_ACTUAL_STORAGE || !normalKeys.contains(mKeys) ){
			auto r = map.intersectingRanges( mKeys );
			for(auto i = r.begin(); i != r.end(); ++i) {
				KeyRangeRef k = mKeys & i->range();
				addMutation( i->value(), ver, MutationRef((MutationRef::Type)m.type, k.begin, k.end) );
			}
		}
	}
	else
		ASSERT(false);  // Unknown mutation type in splitMutations
}
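// Editor's example (hypothetical shard map): with shards [""..."f"), ["f"..."p") and
// ["p"...\xff), splitMutation applied to ClearRange["c","t") delivers
// ClearRange["c","f"), ClearRange["f","p") and ClearRange["p","t") to the three
// shards' values, while a single-key mutation goes only to the shard containing its key.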

ACTOR Future<Void> fetchKeys( StorageServer *data, AddingShard* shard ) {
	state TraceInterval interval("FetchKeys");
	state KeyRange keys = shard->keys;
	state double startt = now();
	state int fetchBlockBytes = BUGGIFY ? SERVER_KNOBS->BUGGIFY_BLOCK_BYTES : SERVER_KNOBS->FETCH_BLOCK_BYTES;

	// delay(0) to force a return to the run loop before the work of fetchKeys is started.
	//  This allows adding->start() to be called inline with CSK.
	wait( data->coreStarted.getFuture() && delay( 0 ) );

	try {
		debugKeyRange("fetchKeysBegin", data->version.get(), shard->keys);

		TraceEvent(SevDebug, interval.begin(), data->thisServerID)
			.detail("KeyBegin", shard->keys.begin)
			.detail("KeyEnd",shard->keys.end);

		validate(data);

		// Wait (if necessary) for the latest version at which any key in keys was previously available (+1) to be durable
		auto navr = data->newestAvailableVersion.intersectingRanges( keys );
		Version lastAvailable = invalidVersion;
		for(auto r=navr.begin(); r!=navr.end(); ++r) {
			ASSERT( r->value() != latestVersion );
			lastAvailable = std::max(lastAvailable, r->value());
		}
		auto ndvr = data->newestDirtyVersion.intersectingRanges( keys );
		for(auto r=ndvr.begin(); r!=ndvr.end(); ++r)
			lastAvailable = std::max(lastAvailable, r->value());

		if (lastAvailable != invalidVersion && lastAvailable >= data->durableVersion.get()) {
			TEST(true);  // FetchKeys waits for previous available version to be durable
			wait( data->durableVersion.whenAtLeast(lastAvailable+1) );
		}

		TraceEvent(SevDebug, "FetchKeysVersionSatisfied", data->thisServerID).detail("FKID", interval.pairID);

		wait( data->fetchKeysParallelismLock.take( TaskDefaultYield, fetchBlockBytes ) );
		state FlowLock::Releaser holdingFKPL( data->fetchKeysParallelismLock, fetchBlockBytes );

		state double executeStart = now();
		++data->counters.fetchWaitingCount;
		data->counters.fetchWaitingMS += 1000*(executeStart - startt);

		// Fetch keys gets called while the update actor is processing mutations. data->version will not be updated until all mutations for a version
		// have been processed. We need to take the durableVersionLock to ensure data->version is greater than the version of the mutation which caused
		// the fetch to be initiated.
		wait( data->durableVersionLock.take() );

		shard->phase = AddingShard::Fetching;
		state Version fetchVersion = data->version.get();

		data->durableVersionLock.release();

		wait(delay(0));

		TraceEvent(SevDebug, "FetchKeysUnblocked", data->thisServerID).detail("FKID", interval.pairID).detail("Version", fetchVersion);

		// Get the history
		state int debug_getRangeRetries = 0;
		state int debug_nextRetryToLog = 1;
		state bool isTooOld = false;

		//FIXME: The client cache does not notice when servers are added to a team. To read from a local storage server we must refresh the cache manually.
		data->cx->invalidateCache(keys);

		loop {
			try {
				TEST(true);		// Fetching keys for transferred shard

				state Standalone<RangeResultRef> this_block = wait( tryGetRange( data->cx, fetchVersion, keys, GetRangeLimits( CLIENT_KNOBS->ROW_LIMIT_UNLIMITED, fetchBlockBytes ), &isTooOld ) );

				int expectedSize = (int)this_block.expectedSize() + (8-(int)sizeof(KeyValueRef))*this_block.size();

				TraceEvent(SevDebug, "FetchKeysBlock", data->thisServerID).detail("FKID", interval.pairID)
					.detail("BlockRows", this_block.size()).detail("BlockBytes", expectedSize)
					.detail("KeyBegin", keys.begin).detail("KeyEnd", keys.end)
					.detail("Last", this_block.size() ? this_block.end()[-1].key : std::string())
					.detail("Version", fetchVersion).detail("More", this_block.more);
				debugKeyRange("fetchRange", fetchVersion, keys);
				for(auto k = this_block.begin(); k != this_block.end(); ++k) debugMutation("fetch", fetchVersion, MutationRef(MutationRef::SetValue, k->key, k->value));

				data->counters.bytesFetched += expectedSize;
				if( fetchBlockBytes > expectedSize ) {
					holdingFKPL.release( fetchBlockBytes - expectedSize );
				}

				// Wait for permission to proceed
				//wait( data->fetchKeysStorageWriteLock.take() );
				//state FlowLock::Releaser holdingFKSWL( data->fetchKeysStorageWriteLock );

				// Write this_block to storage
				state KeyValueRef *kvItr = this_block.begin();
				for(; kvItr != this_block.end(); ++kvItr) {
					data->storage.writeKeyValue( *kvItr );
					wait(yield());
				}

				kvItr = this_block.begin();
				for(; kvItr != this_block.end(); ++kvItr) {
					data->byteSampleApplySet( *kvItr, invalidVersion );
					wait(yield());
				}

				if (this_block.more) {
					Key nfk = this_block.readThrough.present() ? this_block.readThrough.get() : keyAfter( this_block.end()[-1].key );
					if (nfk != keys.end) {
						std::deque< Standalone<VerUpdateRef> > updatesToSplit = std::move( shard->updates );

						// This actor finishes committing the keys [keys.begin,nfk) that we already fetched.
						// The remaining unfetched keys [nfk,keys.end) will become a separate AddingShard with its own fetchKeys.
						shard->server->addShard( ShardInfo::addingSplitLeft( KeyRangeRef(keys.begin, nfk), shard ) );
						shard->server->addShard( ShardInfo::newAdding( data, KeyRangeRef(nfk, keys.end) ) );
						shard = data->shards.rangeContaining( keys.begin ).value()->adding;
						auto otherShard = data->shards.rangeContaining( nfk ).value()->adding;
						keys = shard->keys;

						// Split our prior updates.  The ones that apply to our new, restricted key range will go back into shard->updates,
						// and the ones delivered to the new shard will be discarded because it is in WaitPrevious phase (hasn't chosen a fetchVersion yet).
						// What we are doing here is expensive and could get more expensive if we started having many more blocks per shard. May need optimization in the future.
						for(auto u = updatesToSplit.begin(); u != updatesToSplit.end(); ++u)
							splitMutations( data->shards, *u );

						TEST( true );
						TEST( shard->updates.size() );
						ASSERT( otherShard->updates.empty() );
					}
				}

				this_block = Standalone<RangeResultRef>();

				if (BUGGIFY) wait( delay( 1 ) );

				break;
			} catch (Error& e) {
				TraceEvent("FKBlockFail", data->thisServerID).error(e,true).suppressFor(1.0).detail("FKID", interval.pairID);
				if (e.code() == error_code_transaction_too_old){
					TEST(true); // A storage server has forgotten the history data we are fetching
					Version lastFV = fetchVersion;
					fetchVersion = data->version.get();
					isTooOld = false;

					// Throw away deferred updates from before fetchVersion, since we don't need them to use blocks fetched at that version
					while (!shard->updates.empty() && shard->updates[0].version <= fetchVersion) shard->updates.pop_front();

					//FIXME: remove when we no longer support upgrades from 5.X
					if(debug_getRangeRetries >= 100) {
						data->cx->enableLocalityLoadBalance = false;
					}

					debug_getRangeRetries++;
					if (debug_nextRetryToLog==debug_getRangeRetries){
						debug_nextRetryToLog += std::min(debug_nextRetryToLog, 1024);
						TraceEvent(SevWarn, "FetchPast", data->thisServerID).detail("TotalAttempts", debug_getRangeRetries).detail("FKID", interval.pairID).detail("V", lastFV).detail("N", fetchVersion).detail("E", data->version.get());
					}
				} else if (e.code() == error_code_future_version || e.code() == error_code_process_behind) {
					TEST(true); // fetchKeys got future_version or process_behind, so there must be a huge storage lag somewhere.  Keep trying.
				} else {
					throw;
				}
				wait( delayJittered( FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY ) );
			}
		}

		//FIXME: remove when we no longer support upgrades from 5.X
		data->cx->enableLocalityLoadBalance = true;

		// We have completed the fetch and write of the data, now we wait for MVCC window to pass.
		//  As we have finished this work, we will allow more work to start...
		shard->fetchComplete.send(Void());

		TraceEvent(SevDebug, "FKBeforeFinalCommit", data->thisServerID).detail("FKID", interval.pairID).detail("SV", data->storageVersion()).detail("DV", data->durableVersion.get());
		// Directly commit()ing the IKVS would interfere with updateStorage, possibly resulting in an incomplete version being recovered.
		// Instead we wait for the updateStorage loop to commit something (and consequently also what we have written)

		wait( data->durableVersion.whenAtLeast( data->storageVersion()+1 ) );
		holdingFKPL.release();

		TraceEvent(SevDebug, "FKAfterFinalCommit", data->thisServerID).detail("FKID", interval.pairID).detail("SV", data->storageVersion()).detail("DV", data->durableVersion.get());

		// Wait to run during update(), after a new batch of versions is received from the tlog but before eager reads take place.
		Promise<FetchInjectionInfo*> p;
		data->readyFetchKeys.push_back( p );

		FetchInjectionInfo* batch = wait( p.getFuture() );
		TraceEvent(SevDebug, "FKUpdateBatch", data->thisServerID).detail("FKID", interval.pairID);

		shard->phase = AddingShard::Waiting;

		// Choose a transferredVersion.  This choice and timing ensure that
		//   * The transferredVersion can be mutated in versionedData
		//   * The transferredVersion isn't yet committed to storage (so we can write the availability status change)
		//   * The transferredVersion is <= the version of any of the updates in batch, and if there is an equal version
		//     its mutations haven't been processed yet
		shard->transferredVersion = data->version.get() + 1;
		//shard->transferredVersion = batch->changes[0].version;  //< FIXME: This obeys the documented properties, and seems "safer" because it never introduces extra versions into the data structure, but violates some ASSERTs currently
		data->mutableData().createNewVersion( shard->transferredVersion );
		ASSERT( shard->transferredVersion > data->storageVersion() );
		ASSERT( shard->transferredVersion == data->data().getLatestVersion() );

		TraceEvent(SevDebug, "FetchKeysHaveData", data->thisServerID).detail("FKID", interval.pairID)
			.detail("Version", shard->transferredVersion).detail("StorageVersion", data->storageVersion());
		validate(data);

		// Put the updates that were collected during the FinalCommit phase into the batch at the transferredVersion.  Eager reads will be done
		// for them by update(), and the mutations will come back through AddingShard::addMutations and be applied to versionedMap and mutationLog as normal.
		// The lie about their version is acceptable because this shard will never be read at versions < transferredVersion
		for(auto i=shard->updates.begin(); i!=shard->updates.end(); ++i) {
			i->version = shard->transferredVersion;
			batch->arena.dependsOn(i->arena());
		}

		int startSize = batch->changes.size();
		TEST(startSize); //Adding fetch data to a batch which already has changes
		batch->changes.resize( batch->changes.size()+shard->updates.size() );

		//FIXME: pass the deque back rather than copy the data
		std::copy( shard->updates.begin(), shard->updates.end(), batch->changes.begin()+startSize );
		Version checkv = shard->transferredVersion;

		for(auto b = batch->changes.begin()+startSize; b != batch->changes.end(); ++b ) {
			ASSERT( b->version >= checkv );
			checkv = b->version;
			for(auto& m : b->mutations)
				debugMutation("fetchKeysFinalCommitInject", batch->changes[0].version, m);
		}

		shard->updates.clear();

		setAvailableStatus(data, keys, true); // keys will be available when getLatestVersion()==transferredVersion is durable

		// Wait for the transferredVersion (and therefore the shard data) to be committed and durable.
		wait( data->durableVersion.whenAtLeast( shard->transferredVersion ) );

		ASSERT( data->shards[shard->keys.begin]->assigned() && data->shards[shard->keys.begin]->keys == shard->keys );  // We aren't changing whether the shard is assigned
		data->newestAvailableVersion.insert(shard->keys, latestVersion);
		shard->readWrite.send(Void());
		data->addShard( ShardInfo::newReadWrite(shard->keys, data) );   // invalidates shard!
		coalesceShards(data, keys);

		validate(data);

		++data->counters.fetchExecutingCount;
		data->counters.fetchExecutingMS += 1000*(now() - executeStart);

		TraceEvent(SevDebug, interval.end(), data->thisServerID);
	} catch (Error &e){
		TraceEvent(SevDebug, interval.end(), data->thisServerID).error(e, true).detail("Version", data->version.get());

		if (e.code() == error_code_actor_cancelled && !data->shuttingDown && shard->phase >= AddingShard::Fetching) {
			if (shard->phase < AddingShard::Waiting) {
				data->storage.clearRange( keys );
				data->byteSampleApplyClear( keys, invalidVersion );
			} else {
				ASSERT( data->data().getLatestVersion() > data->version.get() );
				removeDataRange( data, data->addVersionToMutationLog(data->data().getLatestVersion()), data->shards, keys );
				setAvailableStatus(data, keys, false);
				// Prevent another, overlapping fetchKeys from entering the Fetching phase until data->data().getLatestVersion() is durable
				data->newestDirtyVersion.insert( keys, data->data().getLatestVersion() );
			}
		}

		TraceEvent(SevError, "FetchKeysError", data->thisServerID)
			.error(e)
			.detail("Elapsed", now()-startt)
			.detail("KeyBegin", keys.begin)
			.detail("KeyEnd",keys.end);
		if (e.code() != error_code_actor_cancelled)
			data->otherError.sendError(e);  // Kill the storage server.  Are there any recoverable errors?
		throw; // goes nowhere
	}

	return Void();
};
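/* Editor's summary of the fetchKeys lifecycle above:
     WaitPrevious - wait until any data previously held for these keys is durably gone;
     Fetching     - stream blocks from other servers via tryGetRange at fetchVersion,
                    writing them straight to disk and buffering concurrent mutations in
                    shard->updates (splitting the AddingShard if a block ends mid-range);
     Waiting      - choose transferredVersion, inject the buffered updates into the
                    normal update() path, mark the keys available, and wait for
                    durability before the shard flips to read-write.
   Cancellation after Fetching begins must clean up via clearRange()/removeDataRange()
   so that a later fetch never observes partially fetched data. */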

AddingShard::AddingShard( StorageServer* server, KeyRangeRef const& keys )
	: server(server), keys(keys), transferredVersion(invalidVersion), phase(WaitPrevious)
{
	fetchClient = fetchKeys(server, this);
}

void AddingShard::addMutation( Version version, MutationRef const& mutation ){
	if (mutation.type == mutation.ClearRange) {
		ASSERT( keys.begin<=mutation.param1 && mutation.param2<=keys.end );
	}
	else if (isSingleKeyMutation((MutationRef::Type) mutation.type)) {
		ASSERT( keys.contains(mutation.param1) );
	}

	if (phase == WaitPrevious) {
		// Updates can be discarded
	} else if (phase == Fetching) {
		if (!updates.size() || version > updates.end()[-1].version) {
			VerUpdateRef v;
			v.version = version;
			v.isPrivateData = false;
			updates.push_back(v);
		} else {
			ASSERT( version == updates.end()[-1].version );
		}
		updates.back().mutations.push_back_deep( updates.back().arena(), mutation );
	} else if (phase == Waiting) {
		server->addMutation(version, mutation, keys, server->updateEagerReads);
	} else ASSERT(false);
}

void ShardInfo::addMutation(Version version, MutationRef const& mutation) {
	ASSERT( (void *)this);
	ASSERT( keys.contains( mutation.param1 ) );
	if (adding)
		adding->addMutation(version, mutation);
	else if (readWrite)
		readWrite->addMutation(version, mutation, this->keys, readWrite->updateEagerReads);
	else if (mutation.type != MutationRef::ClearRange) {
		TraceEvent(SevError, "DeliveredToNotAssigned").detail("Version", version).detail("Mutation", mutation.toString());
		ASSERT(false);  // Mutation delivered to notAssigned shard!
	}
}

enum ChangeServerKeysContext { CSK_UPDATE, CSK_RESTORE };
const char* changeServerKeysContextName[] = { "Update", "Restore" };

void changeServerKeys( StorageServer* data, const KeyRangeRef& keys, bool nowAssigned, Version version, ChangeServerKeysContext context ) {
	ASSERT( !keys.empty() );

	//TraceEvent("ChangeServerKeys", data->thisServerID)
	//	.detail("KeyBegin", keys.begin)
	//	.detail("KeyEnd", keys.end)
	//	.detail("NowAssigned", nowAssigned)
	//	.detail("Version", version)
	//	.detail("Context", changeServerKeysContextName[(int)context]);
	validate(data);

	debugKeyRange( nowAssigned ? "KeysAssigned" : "KeysUnassigned", version, keys );

	bool isDifferent = false;
	auto existingShards = data->shards.intersectingRanges(keys);
	for( auto it = existingShards.begin(); it != existingShards.end(); ++it ) {
		if( nowAssigned != it->value()->assigned() ) {
			isDifferent = true;
			/*TraceEvent("CSKRangeDifferent", data->thisServerID)
			  .detail("KeyBegin", it->range().begin)
			  .detail("KeyEnd", it->range().end);*/
			break;
		}
	}
	if( !isDifferent ) {
		//TraceEvent("CSKShortCircuit", data->thisServerID)
		//	.detail("KeyBegin", keys.begin)
		//	.detail("KeyEnd", keys.end);
		return;
	}

	// Save a backup of the ShardInfo references before we start messing with shards, in order to defer fetchKeys cancellation (and
	// its potential call to removeDataRange()) until shards is again valid
	vector< Reference<ShardInfo> > oldShards;
	auto os = data->shards.intersectingRanges(keys);
	for(auto r = os.begin(); r != os.end(); ++r)
		oldShards.push_back( r->value() );

	// As addShard (called below)'s documentation requires, reinitialize any overlapping range(s)
	auto ranges = data->shards.getAffectedRangesAfterInsertion( keys, Reference<ShardInfo>() );  // null reference indicates the range being changed
	for(int i=0; i<ranges.size(); i++) {
		if (!ranges[i].value) {
			ASSERT( (KeyRangeRef&)ranges[i] == keys ); // there shouldn't be any nulls except for the range being inserted
		} else if (ranges[i].value->notAssigned())
			data->addShard( ShardInfo::newNotAssigned(ranges[i]) );
		else if (ranges[i].value->isReadable())
			data->addShard( ShardInfo::newReadWrite(ranges[i], data) );
		else {
			ASSERT( ranges[i].value->adding );
			data->addShard( ShardInfo::newAdding( data, ranges[i] ) );
			TEST( true );	// ChangeServerKeys reFetchKeys
		}
	}

	// Shard state depends on nowAssigned and whether the data is available (actually assigned in memory or on the disk) up to the given
	// version.  The latter depends on data->newestAvailableVersion, so loop over the ranges of that.
	// SOMEDAY: Could this just use shards?  Then we could explicitly do the removeDataRange here when an adding/transferred shard is cancelled
	auto vr = data->newestAvailableVersion.intersectingRanges(keys);
	vector<std::pair<KeyRange,Version>> changeNewestAvailable;
	vector<KeyRange> removeRanges;
	for (auto r = vr.begin(); r != vr.end(); ++r) {
		KeyRangeRef range = keys & r->range();
		bool dataAvailable = r->value()==latestVersion || r->value() >= version;
		/*TraceEvent("CSKRange", data->thisServerID)
			.detail("KeyBegin", range.begin)
			.detail("KeyEnd", range.end)
			.detail("Available", dataAvailable)
			.detail("NowAssigned", nowAssigned)
			.detail("NewestAvailable", r->value())
			.detail("ShardState0", data->shards[range.begin]->debugDescribeState());*/
		if (!nowAssigned) {
			if (dataAvailable) {
				ASSERT( r->value() == latestVersion);  // Not that we care, but this used to be checked instead of dataAvailable
				ASSERT( data->mutableData().getLatestVersion() > version || context == CSK_RESTORE );
				changeNewestAvailable.push_back(make_pair(range, version));
				removeRanges.push_back( range );
			}
			data->addShard( ShardInfo::newNotAssigned(range) );
			data->watches.triggerRange( range.begin, range.end );
		} else if (!dataAvailable) {
			// SOMEDAY: Avoid restarting adding/transferred shards
			if (version==0){ // bypass fetchkeys; shard is known empty at version 0
				changeNewestAvailable.push_back(make_pair(range, latestVersion));
				data->addShard( ShardInfo::newReadWrite(range, data) );
				setAvailableStatus(data, range, true);
			} else {
				auto& shard = data->shards[range.begin];
				if( !shard->assigned() || shard->keys != range )
					data->addShard( ShardInfo::newAdding(data, range) );
			}
		} else {
			changeNewestAvailable.push_back(make_pair(range, latestVersion));
			data->addShard( ShardInfo::newReadWrite(range, data) );
		}
	}
	// Update newestAvailableVersion when a shard becomes (un)available (in a separate loop to avoid invalidating vr above)
	for(auto r = changeNewestAvailable.begin(); r != changeNewestAvailable.end(); ++r)
		data->newestAvailableVersion.insert( r->first, r->second );

	if (!nowAssigned)
		data->metrics.notifyNotReadable( keys );

	coalesceShards( data, KeyRangeRef(ranges[0].begin, ranges[ranges.size()-1].end) );

	// Now it is OK to do removeDataRanges, directly and through fetchKeys cancellation (and we have to do so before validate())
	oldShards.clear();
	ranges.clear();
	for(auto r=removeRanges.begin(); r!=removeRanges.end(); ++r) {
		removeDataRange( data, data->addVersionToMutationLog(data->data().getLatestVersion()), data->shards, *r );
		setAvailableStatus(data, *r, false);
	}
	validate(data);
}
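/* Editor's note: changeServerKeys rewrites only the shard map; the data itself is
   moved by the fetchKeys actors of any AddingShards created here.  Ranges known to be
   empty (version == 0) skip the fetch entirely, and for newly unassigned ranges the
   removeDataRange() calls are deferred until the old ShardInfo references are
   released, so a cancelled fetchKeys always sees a consistent shard map. */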
2297 
rollback(StorageServer * data,Version rollbackVersion,Version nextVersion)2298 void rollback( StorageServer* data, Version rollbackVersion, Version nextVersion ) {
2299 	TEST(true); // call to shard rollback
2300 	debugKeyRange("Rollback", rollbackVersion, allKeys);
2301 
2302 	// We used to do a complicated dance to roll back in MVCC history.  It's much simpler, and more testable,
2303 	// to simply restart the storage server actor and restore from the persistent disk state, and then roll
2304 	// forward from the TLog's history.  It's not quite as efficient, but we rarely have to do this in practice.
2305 
2306 	// FIXME: This code is relying for liveness on an undocumented property of the log system implementation: that after a rollback the rolled back versions will
2307 	// eventually be missing from the peeked log.  A more sophisticated approach would be to make the rollback range durable and, after reboot, skip over
2308 	// those versions if they appear in peek results.
2309 
2310 	throw please_reboot();
2311 }
2312 
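// Adds a mutation to the mutation log for the given version and applies it to the in-memory
// versioned data.  expandMutation() first resolves the mutation against the eagerly read state
// (for example, resolving a ClearRange's true extent within the shard); mutations that expand
// to a no-op are dropped without being logged or applied.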
void StorageServer::addMutation(Version version, MutationRef const& mutation, KeyRangeRef const& shard, UpdateEagerReadInfo* eagerReads ) {
	MutationRef expanded = mutation;
	auto& mLog = addVersionToMutationLog(version);

	if ( !expandMutation( expanded, data(), eagerReads, shard.end, mLog.arena()) ) {
		return;
	}
	expanded = addMutationToMutationLog(mLog, expanded);
	if (debugMutation("expandedMutation", version, expanded)) {
		const char* type =
			mutation.type == MutationRef::SetValue ? "SetValue" :
			mutation.type == MutationRef::ClearRange ? "ClearRange" :
			mutation.type == MutationRef::DebugKeyRange ? "DebugKeyRange" :
			mutation.type == MutationRef::DebugKey ? "DebugKey" :
			"UnknownMutation";
		printf("DEBUGMUTATION:\t%.6f\t%s\t%s\t%lld\t%s\t%s\t%s\n", now(), g_network->getLocalAddress().toString().c_str(), "originalMutation", version, type, printable(mutation.param1).c_str(), printable(mutation.param2).c_str());
		printf("  shard: %s - %s\n", printable(shard.begin).c_str(), printable(shard.end).c_str());
		if (mutation.type == MutationRef::ClearRange && mutation.param2 != shard.end)
			printf("  eager: %s\n", printable( eagerReads->getKeyEnd( mutation.param2 ) ).c_str() );
	}
	applyMutation( this, expanded, mLog.arena(), mutableData() );
}

struct OrderByVersion {
	bool operator()( const VersionUpdateRef& a, const VersionUpdateRef& b ) {
		if (a.version != b.version) return a.version < b.version;
		if (a.isPrivateData != b.isPrivateData) return a.isPrivateData;
		return false;
	}
};

#define PERSIST_PREFIX "\xff\xff"

// Immutable
static const KeyValueRef persistFormat( LiteralStringRef( PERSIST_PREFIX "Format" ), LiteralStringRef("FoundationDB/StorageServer/1/4") );
static const KeyRangeRef persistFormatReadableRange( LiteralStringRef("FoundationDB/StorageServer/1/2"), LiteralStringRef("FoundationDB/StorageServer/1/5") );
static const KeyRef persistID = LiteralStringRef( PERSIST_PREFIX "ID" );

// (Potentially) change with the durable version or when fetchKeys completes
static const KeyRef persistVersion = LiteralStringRef( PERSIST_PREFIX "Version" );
static const KeyRangeRef persistShardAssignedKeys = KeyRangeRef( LiteralStringRef( PERSIST_PREFIX "ShardAssigned/" ), LiteralStringRef( PERSIST_PREFIX "ShardAssigned0" ) );
static const KeyRangeRef persistShardAvailableKeys = KeyRangeRef( LiteralStringRef( PERSIST_PREFIX "ShardAvailable/" ), LiteralStringRef( PERSIST_PREFIX "ShardAvailable0" ) );
static const KeyRangeRef persistByteSampleKeys = KeyRangeRef( LiteralStringRef( PERSIST_PREFIX "BS/" ), LiteralStringRef( PERSIST_PREFIX "BS0" ) );
static const KeyRangeRef persistByteSampleSampleKeys = KeyRangeRef( LiteralStringRef( PERSIST_PREFIX "BS/" PERSIST_PREFIX "BS/" ), LiteralStringRef( PERSIST_PREFIX "BS/" PERSIST_PREFIX "BS0" ) );
static const KeyRef persistLogProtocol = LiteralStringRef(PERSIST_PREFIX "LogProtocol");
static const KeyRef persistPrimaryLocality = LiteralStringRef( PERSIST_PREFIX "PrimaryLocality" );
// data keys are unmangled (but never start with PERSIST_PREFIX because they are always in allKeys)

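// StorageUpdater applies a stream of mutations from the TLog to the storage server, creating a
// new MVCC version whenever the version advances.  Ordinary mutations are split across the
// shards they intersect; mutations to "private" keys (those starting at systemKeys.end) instead
// drive local state changes: shard (un)assignment, rollbacks, reboot-when-durable, locality
// updates, and removal of this server.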
class StorageUpdater {
public:
	StorageUpdater() : fromVersion(invalidVersion), currentVersion(invalidVersion), restoredVersion(invalidVersion), processedStartKey(false) {}
	StorageUpdater(Version fromVersion, Version restoredVersion) : fromVersion(fromVersion), currentVersion(fromVersion), restoredVersion(restoredVersion), processedStartKey(false) {}

	void applyMutation(StorageServer* data, MutationRef const& m, Version ver) {
		//TraceEvent("SSNewVersion", data->thisServerID).detail("VerWas", data->mutableData().latestVersion).detail("ChVer", ver);

		if(currentVersion != ver) {
			fromVersion = currentVersion;
			currentVersion = ver;
			data->mutableData().createNewVersion(ver);
		}

		if (m.param1.startsWith( systemKeys.end )) {
			//TraceEvent("PrivateData", data->thisServerID).detail("Mutation", m.toString()).detail("Version", ver);
			applyPrivateData( data, m );
		} else {
			// FIXME: enable when debugMutation is active
			//for(auto m = changes[c].mutations.begin(); m; ++m) {
			//	debugMutation("SSUpdateMutation", changes[c].version, *m);
			//}

			splitMutation( data->shards, m, ver );
		}

		if (data->otherError.getFuture().isReady()) data->otherError.getFuture().get();
	}

	Version currentVersion;
private:
	Version fromVersion;
	Version restoredVersion;

	KeyRef startKey;
	bool nowAssigned;
	bool processedStartKey;

	void applyPrivateData( StorageServer* data, MutationRef const& m ) {
		TraceEvent(SevDebug, "SSPrivateMutation", data->thisServerID).detail("Mutation", m.toString());

		if (processedStartKey) {
			// Because of the implementation of the krm* functions, we expect changes in pairs, [begin,end)
			// We can also ignore clearRanges, because they are always accompanied by such a pair of sets with the same keys
			ASSERT (m.type == MutationRef::SetValue && m.param1.startsWith(data->sk));
			KeyRangeRef keys( startKey.removePrefix( data->sk ), m.param1.removePrefix( data->sk ));

			// add changes in shard assignment to the mutation log
			setAssignedStatus( data, keys, nowAssigned );

			// The changes for version have already been received (and are being processed now).  We need
			// to fetch the data for change.version-1 (changes from versions < change.version)
			changeServerKeys( data, keys, nowAssigned, currentVersion-1, CSK_UPDATE );
			processedStartKey = false;
		} else if (m.type == MutationRef::SetValue && m.param1.startsWith( data->sk )) {
			// Because of the implementation of the krm* functions, we expect changes in pairs, [begin,end)
			// We can also ignore clearRanges, because they are always accompanied by such a pair of sets with the same keys
			startKey = m.param1;
			nowAssigned = m.param2 != serverKeysFalse;
			processedStartKey = true;
		} else if (m.type == MutationRef::SetValue && m.param1 == lastEpochEndPrivateKey) {
			// lastEpochEnd transactions are guaranteed by the master to be alone in their own batch (version)
			// That means we don't have to worry about the impact on changeServerKeys
			//ASSERT( /*isFirstVersionUpdateFromTLog && */!std::next(it) );

			Version rollbackVersion;
			BinaryReader br(m.param2, Unversioned());
			br >> rollbackVersion;

			if ( rollbackVersion < fromVersion && rollbackVersion > restoredVersion ) {
				TEST( true );  // ShardApplyPrivateData shard rollback
				TraceEvent(SevWarn, "Rollback", data->thisServerID)
					.detail("FromVersion", fromVersion)
					.detail("ToVersion", rollbackVersion)
					.detail("AtVersion", currentVersion)
					.detail("StorageVersion", data->storageVersion());
				ASSERT( rollbackVersion >= data->storageVersion() );
				rollback( data, rollbackVersion, currentVersion );
			}

			data->recoveryVersionSkips.push_back(std::make_pair(rollbackVersion, currentVersion - rollbackVersion));
		} else if (m.type == MutationRef::SetValue && m.param1 == killStoragePrivateKey) {
			throw worker_removed();
		} else if ((m.type == MutationRef::SetValue || m.type == MutationRef::ClearRange) && m.param1.substr(1).startsWith(serverTagPrefix)) {
			bool matchesThisServer = decodeServerTagKey(m.param1.substr(1)) == data->thisServerID;
			if( (m.type == MutationRef::SetValue && !matchesThisServer) || (m.type == MutationRef::ClearRange && matchesThisServer) )
				throw worker_removed();
		} else if (m.type == MutationRef::SetValue && m.param1 == rebootWhenDurablePrivateKey) {
			data->rebootAfterDurableVersion = currentVersion;
			TraceEvent("RebootWhenDurableSet", data->thisServerID).detail("DurableVersion", data->durableVersion.get()).detail("RebootAfterDurableVersion", data->rebootAfterDurableVersion);
		} else if (m.type == MutationRef::SetValue && m.param1 == primaryLocalityPrivateKey) {
			data->primaryLocality = BinaryReader::fromStringRef<int8_t>(m.param2, Unversioned());
			auto& mLV = data->addVersionToMutationLog( data->data().getLatestVersion() );
			data->addMutationToMutationLog( mLV, MutationRef(MutationRef::SetValue, persistPrimaryLocality, m.param2) );
		} else {
			ASSERT(false);  // Unknown private mutation
		}
	}
};

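// Pulls one batch of mutations from the TLog peek cursor and applies it to the in-memory MVCC
// data.  Each batch is scanned twice: a first cursor clone only inspects messages, collecting
// keys that need eager reads and noting private mutations and epoch ends; after the eager reads
// complete (and any ready fetchKeys have injected their mutations via FetchInjectionInfo), a
// second clone applies everything through StorageUpdater.  Finally data->version is advanced,
// which wakes any reads waiting on the new version(s).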
ACTOR Future<Void> update( StorageServer* data, bool* pReceivedUpdate )
{
	state double start;
	try {
		// If we are disk bound and durableVersion is very old, we need to block updates or we could run out of memory
		// This is often referred to as the storage server e-brake (emergency brake)
		state double waitStartT = 0;
		while ( data->queueSize() >= SERVER_KNOBS->STORAGE_HARD_LIMIT_BYTES && data->durableVersion.get() < data->desiredOldestVersion.get() ) {
			if (now() - waitStartT >= 1) {
				TraceEvent(SevWarn, "StorageServerUpdateLag", data->thisServerID)
					.detail("Version", data->version.get())
					.detail("DurableVersion", data->durableVersion.get());
				waitStartT = now();
			}

			data->behind = true;
			wait( delayJittered(.005, TaskTLogPeekReply) );
		}

		while( data->byteSampleClearsTooLarge.get() ) {
			wait( data->byteSampleClearsTooLarge.onChange() );
		}

		state Reference<ILogSystem::IPeekCursor> cursor = data->logCursor;
		//TraceEvent("SSUpdatePeeking", data->thisServerID).detail("MyVer", data->version.get()).detail("Epoch", data->updateEpoch).detail("Seq", data->updateSequence);

		loop {
			wait( cursor->getMore() );
			if(!cursor->isExhausted()) {
				break;
			}
		}
		if(cursor->popped() > 0)
			throw worker_removed();

		++data->counters.updateBatches;
		data->lastTLogVersion = cursor->getMaxKnownVersion();
		data->versionLag = std::max<int64_t>(0, data->lastTLogVersion - data->version.get());

		ASSERT(*pReceivedUpdate == false);
		*pReceivedUpdate = true;

		start = now();
		wait( data->durableVersionLock.take(TaskTLogPeekReply,1) );
		state FlowLock::Releaser holdingDVL( data->durableVersionLock );
		if(now() - start > 0.1)
			TraceEvent("SSSlowTakeLock1", data->thisServerID).detailf("From", "%016llx", debug_lastLoadBalanceResultEndpointToken).detail("Duration", now() - start).detail("Version", data->version.get());

		start = now();
		state UpdateEagerReadInfo eager;
		state FetchInjectionInfo fii;
		state Reference<ILogSystem::IPeekCursor> cloneCursor2;

		loop{
			state uint64_t changeCounter = data->shardChangeCounter;
			bool epochEnd = false;
			bool hasPrivateData = false;
			bool firstMutation = true;
			bool dbgLastMessageWasProtocol = false;

			Reference<ILogSystem::IPeekCursor> cloneCursor1 = cursor->cloneNoMore();
			cloneCursor2 = cursor->cloneNoMore();

			cloneCursor1->setProtocolVersion(data->logProtocol);

			for (; cloneCursor1->hasMessage(); cloneCursor1->nextMessage()) {
				ArenaReader& cloneReader = *cloneCursor1->reader();

				if (LogProtocolMessage::isNextIn(cloneReader)) {
					LogProtocolMessage lpm;
					cloneReader >> lpm;
					dbgLastMessageWasProtocol = true;
					cloneCursor1->setProtocolVersion(cloneReader.protocolVersion());
				}
				else {
					MutationRef msg;
					cloneReader >> msg;

					if (firstMutation && msg.param1.startsWith(systemKeys.end))
						hasPrivateData = true;
					firstMutation = false;

					if (msg.param1 == lastEpochEndPrivateKey) {
						epochEnd = true;
						ASSERT(dbgLastMessageWasProtocol);
					}

					eager.addMutation(msg);
					dbgLastMessageWasProtocol = false;
				}
			}

			// Any fetchKeys which are ready to transition their shards to the adding, transferred state do so now.
			// If there is an epoch end we skip this step, to increase testability and to prevent inserting a version in the middle of a rolled back version range.
			while(!hasPrivateData && !epochEnd && !data->readyFetchKeys.empty()) {
				auto fk = data->readyFetchKeys.back();
				data->readyFetchKeys.pop_back();
				fk.send( &fii );
			}

			for(auto& c : fii.changes)
				eager.addMutations(c.mutations);

			wait( doEagerReads( data, &eager ) );
			if (data->shardChangeCounter == changeCounter) break;
			TEST(true); // A fetchKeys completed while we were doing this, so eager might be outdated.  Read it again.
			// SOMEDAY: Theoretically we could check the change counters of individual shards and retry the reads only selectively
			eager = UpdateEagerReadInfo();
		}

		if(now() - start > 0.1)
			TraceEvent("SSSlowTakeLock2", data->thisServerID).detailf("From", "%016llx", debug_lastLoadBalanceResultEndpointToken).detail("Duration", now() - start).detail("Version", data->version.get());

		data->updateEagerReads = &eager;
		data->debug_inApplyUpdate = true;

		state StorageUpdater updater(data->lastVersionWithData, data->restoredVersion);

		if (EXPENSIVE_VALIDATION) data->data().atLatest().validate();
		validate(data);

		state bool injectedChanges = false;
		state int changeNum = 0;
		state int mutationBytes = 0;
		for(; changeNum < fii.changes.size(); changeNum++) {
			state int mutationNum = 0;
			state VerUpdateRef* pUpdate = &fii.changes[changeNum];
			for(; mutationNum < pUpdate->mutations.size(); mutationNum++) {
				updater.applyMutation(data, pUpdate->mutations[mutationNum], pUpdate->version);
				mutationBytes += pUpdate->mutations[mutationNum].totalSize();
				injectedChanges = true;
				if(mutationBytes > SERVER_KNOBS->DESIRED_UPDATE_BYTES) {
					mutationBytes = 0;
					wait(delay(SERVER_KNOBS->UPDATE_DELAY));
				}
			}
		}

		state Version ver = invalidVersion;
		cloneCursor2->setProtocolVersion(data->logProtocol);
		//TraceEvent("SSUpdatePeeked", data->thisServerID).detail("FromEpoch", data->updateEpoch).detail("FromSeq", data->updateSequence).detail("ToEpoch", results.end_epoch).detail("ToSeq", results.end_seq).detail("MsgSize", results.messages.size());
		for (;cloneCursor2->hasMessage(); cloneCursor2->nextMessage()) {
			if(mutationBytes > SERVER_KNOBS->DESIRED_UPDATE_BYTES) {
				mutationBytes = 0;
				// Instead of just yielding, leave time for the storage server to respond to reads
				wait(delay(SERVER_KNOBS->UPDATE_DELAY));
			}

			if (cloneCursor2->version().version > ver) {
				ASSERT(cloneCursor2->version().version > data->version.get());
			}

			auto &rd = *cloneCursor2->reader();

			if (cloneCursor2->version().version > ver && cloneCursor2->version().version > data->version.get()) {
				++data->counters.updateVersions;
				ver = cloneCursor2->version().version;
			}

			if (LogProtocolMessage::isNextIn(rd)) {
				LogProtocolMessage lpm;
				rd >> lpm;

				data->logProtocol = rd.protocolVersion();
				data->storage.changeLogProtocol(ver, data->logProtocol);
				cloneCursor2->setProtocolVersion(rd.protocolVersion());
			}
			else {
				MutationRef msg;
				rd >> msg;

				if (ver != invalidVersion) {  // This change belongs to a version < minVersion
					if (debugMutation("SSPeek", ver, msg) || ver == 1)
						TraceEvent("SSPeekMutation", data->thisServerID).detail("Mutation", msg.toString()).detail("Version", cloneCursor2->version().toString());

					updater.applyMutation(data, msg, ver);
					mutationBytes += msg.totalSize();
					data->counters.mutationBytes += msg.totalSize();
					++data->counters.mutations;
					switch(msg.type) {
						case MutationRef::SetValue:
							++data->counters.setMutations;
							break;
						case MutationRef::ClearRange:
							++data->counters.clearRangeMutations;
							break;
						case MutationRef::AddValue:
						case MutationRef::And:
						case MutationRef::AndV2:
						case MutationRef::AppendIfFits:
						case MutationRef::ByteMax:
						case MutationRef::ByteMin:
						case MutationRef::Max:
						case MutationRef::Min:
						case MutationRef::MinV2:
						case MutationRef::Or:
						case MutationRef::Xor:
						case MutationRef::CompareAndClear:
							++data->counters.atomicMutations;
							break;
					}
				}
				else
					TraceEvent(SevError, "DiscardingPeekedData", data->thisServerID).detail("Mutation", msg.toString()).detail("Version", cloneCursor2->version().toString());
			}
		}

		if(ver != invalidVersion) {
			data->lastVersionWithData = ver;
		} else {
			ver = cloneCursor2->version().version - 1;
		}
		if(injectedChanges) data->lastVersionWithData = ver;

		data->updateEagerReads = NULL;
		data->debug_inApplyUpdate = false;

		if(ver == invalidVersion && !fii.changes.empty() ) {
			ver = updater.currentVersion;
		}

		if(ver != invalidVersion && ver > data->version.get()) {
			debugKeyRange("SSUpdate", ver, allKeys);

			data->mutableData().createNewVersion(ver);
			if (data->otherError.getFuture().isReady()) data->otherError.getFuture().get();

			data->noRecentUpdates.set(false);
			data->lastUpdate = now();
			data->version.set( ver );		// Triggers replies to waiting gets for new version(s)
			if (data->otherError.getFuture().isReady()) data->otherError.getFuture().get();

			Version maxVersionsInMemory = SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS;
			for(int i = 0; i < data->recoveryVersionSkips.size(); i++) {
				maxVersionsInMemory += data->recoveryVersionSkips[i].second;
			}

			// Trigger updateStorage if necessary
			Version proposedOldestVersion = std::max(data->version.get(), cursor->getMinKnownCommittedVersion()) - maxVersionsInMemory;
			if(data->primaryLocality == tagLocalitySpecial || data->tag.locality == data->primaryLocality) {
				proposedOldestVersion = std::max(proposedOldestVersion, data->lastTLogVersion - maxVersionsInMemory);
			}
			proposedOldestVersion = std::min(proposedOldestVersion, data->version.get()-1);
			proposedOldestVersion = std::max(proposedOldestVersion, data->oldestVersion.get());
			proposedOldestVersion = std::max(proposedOldestVersion, data->desiredOldestVersion.get());

			//TraceEvent("StorageServerUpdated", data->thisServerID).detail("Ver", ver).detail("DataVersion", data->version.get())
			//	.detail("LastTLogVersion", data->lastTLogVersion).detail("NewOldest", data->oldestVersion.get()).detail("DesiredOldest",data->desiredOldestVersion.get())
			//	.detail("MaxVersionInMemory", maxVersionsInMemory).detail("Proposed", proposedOldestVersion).detail("PrimaryLocality", data->primaryLocality).detail("Tag", data->tag.toString());

			while(!data->recoveryVersionSkips.empty() && proposedOldestVersion > data->recoveryVersionSkips.front().first) {
				data->recoveryVersionSkips.pop_front();
			}
			data->desiredOldestVersion.set(proposedOldestVersion);

		}

		validate(data);

		data->logCursor->advanceTo( cloneCursor2->version() );
		if(cursor->version().version >= data->lastTLogVersion) {
			if(data->behind) {
				TraceEvent("StorageServerNoLongerBehind", data->thisServerID).detail("CursorVersion", cursor->version().version).detail("TLogVersion", data->lastTLogVersion);
			}
			data->behind = false;
		}

		return Void();  // update will get called again ASAP
	} catch (Error& err) {
		state Error e = err;
		if (e.code() != error_code_worker_removed && e.code() != error_code_please_reboot) {
			TraceEvent(SevError, "SSUpdateError", data->thisServerID).error(e).backtrace();
		} else if (e.code() == error_code_please_reboot) {
			wait( data->durableInProgress );
		}
		throw e;
	}
}

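// The durability loop: whenever desiredOldestVersion moves past the storage engine's version,
// write up to STORAGE_COMMIT_BYTES of logged mutations to disk, commit, and then advance
// durableVersion under durableVersionLock so the corresponding versions can be dropped from the
// in-memory MVCC structures.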
ACTOR Future<Void> updateStorage(StorageServer* data) {
	loop {
		ASSERT( data->durableVersion.get() == data->storageVersion() );
		wait( data->desiredOldestVersion.whenAtLeast( data->storageVersion()+1 ) );
		wait( delay(0, TaskUpdateStorage) );

		state Promise<Void> durableInProgress;
		data->durableInProgress = durableInProgress.getFuture();

		state Version startOldestVersion = data->storageVersion();
		state Version newOldestVersion = data->storageVersion();
		state Version desiredVersion = data->desiredOldestVersion.get();
		state int64_t bytesLeft = SERVER_KNOBS->STORAGE_COMMIT_BYTES;
		loop {
			state bool done = data->storage.makeVersionMutationsDurable(newOldestVersion, desiredVersion, bytesLeft);
			// We want to forget things from these data structures atomically with changing oldestVersion (and "before", since oldestVersion.set() may trigger waiting actors)
			// forgetVersionsBeforeAsync visibly forgets immediately (without waiting) but asynchronously frees memory.
			Future<Void> finishedForgetting = data->mutableData().forgetVersionsBeforeAsync( newOldestVersion, TaskUpdateStorage );
			data->oldestVersion.set( newOldestVersion );
			wait( finishedForgetting );
			wait( yield(TaskUpdateStorage) );
			if (done) break;
		}

		if (startOldestVersion != newOldestVersion)
			data->storage.makeVersionDurable( newOldestVersion );

		debug_advanceMaxCommittedVersion( data->thisServerID, newOldestVersion );
		state Future<Void> durable = data->storage.commit();
		state Future<Void> durableDelay = Void();

		if (bytesLeft > 0)
			durableDelay = delay(SERVER_KNOBS->STORAGE_COMMIT_INTERVAL);

		wait( durable );

		debug_advanceMinCommittedVersion( data->thisServerID, newOldestVersion );

		if(newOldestVersion > data->rebootAfterDurableVersion) {
			TraceEvent("RebootWhenDurableTriggered", data->thisServerID).detail("NewOldestVersion", newOldestVersion).detail("RebootAfterDurableVersion", data->rebootAfterDurableVersion);
			// To avoid a brokenPromise error, which would be caused by the sender of durableInProgress (i.e., this process)
			// never setting it, we must set durableInProgress before sending the please_reboot() error.
			// Otherwise, in the race where the storage server receives both the reboot and the
			// brokenPromise of durableInProgress, the storage server's worker will die.
			// We would eventually end up with no worker for the storage server role, and
			// the data distributor's buildTeam() would get stuck trying to build a team.
			durableInProgress.sendError(please_reboot());
			throw please_reboot();
		}

		durableInProgress.send(Void());
		wait( delay(0, TaskUpdateStorage) ); // Setting durableInProgress could cause the storage server to shut down, so delay to check for cancellation

		// Taking and releasing the durableVersionLock ensures that no eager reads both begin before the commit was effective and
		// are applied after we change the durable version.  We also have to hold the lock while calling changeDurableVersion,
		// because otherwise the latest version of mutableData might be partially loaded.
		wait( data->durableVersionLock.take() );
		data->popVersion( data->durableVersion.get() + 1 );

		while (!changeDurableVersion( data, newOldestVersion )) {
			if(g_network->check_yield(TaskUpdateStorage)) {
				data->durableVersionLock.release();
				wait(delay(0, TaskUpdateStorage));
				wait( data->durableVersionLock.take() );
			}
		}

		data->durableVersionLock.release();

		//TraceEvent("StorageServerDurable", data->thisServerID).detail("Version", newOldestVersion);

		wait( durableDelay );
	}
}

#pragma endregion

////////////////////////////////// StorageServerDisk ///////////////////////////////////////
#pragma region StorageServerDisk

void StorageServerDisk::makeNewStorageServerDurable() {
	storage->set( persistFormat );
	storage->set( KeyValueRef(persistID, BinaryWriter::toValue(data->thisServerID, Unversioned())) );
	storage->set( KeyValueRef(persistVersion, BinaryWriter::toValue(data->version.get(), Unversioned())) );
	storage->set( KeyValueRef(persistShardAssignedKeys.begin.toString(), LiteralStringRef("0")) );
	storage->set( KeyValueRef(persistShardAvailableKeys.begin.toString(), LiteralStringRef("0")) );
}

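// Shard availability is persisted as a boundary encoding under persistShardAvailableKeys: a key
// \xff\xffShardAvailable/<k> with value "1" or "0" states whether keys from <k> up to the next
// boundary are available.  For example, marking ["a","b") available clears the boundaries in
// \xff\xffShardAvailable/a .. \xff\xffShardAvailable/b, sets \xff\xffShardAvailable/a = "1", and
// (unless "b" is allKeys.end) rewrites the boundary at "b" from the in-memory shard map.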
void setAvailableStatus( StorageServer* self, KeyRangeRef keys, bool available ) {
	//ASSERT( self->debug_inApplyUpdate );
	ASSERT( !keys.empty() );

	auto& mLV = self->addVersionToMutationLog( self->data().getLatestVersion() );

	KeyRange availableKeys = KeyRangeRef( persistShardAvailableKeys.begin.toString() + keys.begin.toString(), persistShardAvailableKeys.begin.toString() + keys.end.toString() );
	//TraceEvent("SetAvailableStatus", self->thisServerID).detail("Version", mLV.version).detail("RangeBegin", availableKeys.begin).detail("RangeEnd", availableKeys.end);

	self->addMutationToMutationLog( mLV, MutationRef( MutationRef::ClearRange, availableKeys.begin, availableKeys.end ) );
	self->addMutationToMutationLog( mLV, MutationRef( MutationRef::SetValue, availableKeys.begin, available ? LiteralStringRef("1") : LiteralStringRef("0") ) );
	if (keys.end != allKeys.end) {
		bool endAvailable = self->shards.rangeContaining( keys.end )->value()->isInVersionedData();
		self->addMutationToMutationLog( mLV, MutationRef( MutationRef::SetValue, availableKeys.end, endAvailable ? LiteralStringRef("1") : LiteralStringRef("0") ) );
	}
}

void setAssignedStatus( StorageServer* self, KeyRangeRef keys, bool nowAssigned ) {
	ASSERT( !keys.empty() );
	auto& mLV = self->addVersionToMutationLog( self->data().getLatestVersion() );
	KeyRange assignedKeys = KeyRangeRef(
		persistShardAssignedKeys.begin.toString() + keys.begin.toString(),
		persistShardAssignedKeys.begin.toString() + keys.end.toString() );
	//TraceEvent("SetAssignedStatus", self->thisServerID).detail("Version", mLV.version).detail("RangeBegin", assignedKeys.begin).detail("RangeEnd", assignedKeys.end);
	self->addMutationToMutationLog( mLV, MutationRef( MutationRef::ClearRange, assignedKeys.begin, assignedKeys.end ) );
	self->addMutationToMutationLog( mLV, MutationRef( MutationRef::SetValue, assignedKeys.begin,
			nowAssigned ? LiteralStringRef("1") : LiteralStringRef("0") ) );
	if (keys.end != allKeys.end) {
		bool endAssigned = self->shards.rangeContaining( keys.end )->value()->assigned();
		self->addMutationToMutationLog( mLV, MutationRef( MutationRef::SetValue, assignedKeys.end, endAssigned ? LiteralStringRef("1") : LiteralStringRef("0") ) );
	}
}

void StorageServerDisk::clearRange( KeyRangeRef keys ) {
	storage->clear(keys);
}

void StorageServerDisk::writeKeyValue( KeyValueRef kv ) {
	storage->set( kv );
}

void StorageServerDisk::writeMutation( MutationRef mutation ) {
	// FIXME: debugMutation(debugContext, debugVersion, *m);
	if (mutation.type == MutationRef::SetValue) {
		storage->set( KeyValueRef(mutation.param1, mutation.param2) );
	} else if (mutation.type == MutationRef::ClearRange) {
		storage->clear( KeyRangeRef(mutation.param1, mutation.param2) );
	} else
		ASSERT(false);
}

void StorageServerDisk::writeMutations( MutationListRef mutations, Version debugVersion, const char* debugContext ) {
	for(auto m = mutations.begin(); m; ++m) {
		debugMutation(debugContext, debugVersion, *m);
		if (m->type == MutationRef::SetValue) {
			storage->set( KeyValueRef(m->param1, m->param2) );
		} else if (m->type == MutationRef::ClearRange) {
			storage->clear( KeyRangeRef(m->param1, m->param2) );
		}
	}
}

bool StorageServerDisk::makeVersionMutationsDurable( Version& prevStorageVersion, Version newStorageVersion, int64_t& bytesLeft ) {
	if (bytesLeft <= 0) return true;

	// Apply mutations from the mutationLog
	auto u = data->getMutationLog().upper_bound(prevStorageVersion);
	if (u != data->getMutationLog().end() && u->first <= newStorageVersion) {
		VersionUpdateRef const& v = u->second;
		ASSERT( v.version > prevStorageVersion && v.version <= newStorageVersion );
		debugKeyRange("makeVersionMutationsDurable", v.version, allKeys);
		writeMutations(v.mutations, v.version, "makeVersionDurable");
		for(auto m=v.mutations.begin(); m; ++m)
			bytesLeft -= mvccStorageBytes(*m);
		prevStorageVersion = v.version;
		return false;
	} else {
		prevStorageVersion = newStorageVersion;
		return true;
	}
}

// Update data->storage to persist the changes from (data->storageVersion(),version]
void StorageServerDisk::makeVersionDurable( Version version ) {
	storage->set( KeyValueRef(persistVersion, BinaryWriter::toValue(version, Unversioned())) );

	//TraceEvent("MakeDurable", data->thisServerID).detail("FromVersion", prevStorageVersion).detail("ToVersion", version);
}

void StorageServerDisk::changeLogProtocol(Version version, uint64_t protocol) {
	data->addMutationToMutationLogOrStorage(version, MutationRef(MutationRef::SetValue, persistLogProtocol, BinaryWriter::toValue(protocol, Unversioned())));
}

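// Streams one range of the persisted byte sample back into the in-memory sample, reading at most
// STORAGE_LIMIT_BYTES per readRange call.  Keys whose range has already been touched since
// recovery began (tracked in byteSampleClears) are skipped so that restored samples never
// overwrite newer state, and each completed range is marked in byteSampleClears.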
ACTOR Future<Void> applyByteSampleResult( StorageServer* data, IKeyValueStore* storage, Key begin, Key end, std::vector<Standalone<VectorRef<KeyValueRef>>>* results = NULL) {
	state int totalFetches = 0;
	state int totalKeys = 0;
	state int totalBytes = 0;
	loop {
		Standalone<VectorRef<KeyValueRef>> bs = wait( storage->readRange( KeyRangeRef(begin, end), SERVER_KNOBS->STORAGE_LIMIT_BYTES, SERVER_KNOBS->STORAGE_LIMIT_BYTES ) );
		if(results) results->push_back(bs);
		int rangeSize = bs.expectedSize();
		totalFetches++;
		totalKeys += bs.size();
		totalBytes += rangeSize;
		for( int j = 0; j < bs.size(); j++ ) {
			KeyRef key = bs[j].key.removePrefix(persistByteSampleKeys.begin);
			if(!data->byteSampleClears.rangeContaining(key).value()) {
				data->metrics.byteSample.sample.insert( key, BinaryReader::fromStringRef<int32_t>(bs[j].value, Unversioned()), false );
			}
		}
		if( rangeSize >= SERVER_KNOBS->STORAGE_LIMIT_BYTES ) {
			Key nextBegin = keyAfter(bs.back().key);
			data->byteSampleClears.insert(KeyRangeRef(begin, nextBegin).removePrefix(persistByteSampleKeys.begin), true);
			data->byteSampleClearsTooLarge.set(data->byteSampleClears.size() > SERVER_KNOBS->MAX_BYTE_SAMPLE_CLEAR_MAP_SIZE);
			begin = nextBegin;
			if(begin == end) {
				break;
			}
		} else {
			data->byteSampleClears.insert(KeyRangeRef(begin.removePrefix(persistByteSampleKeys.begin), end == persistByteSampleKeys.end ? LiteralStringRef("\xff\xff\xff") : end.removePrefix(persistByteSampleKeys.begin)), true);
			data->byteSampleClearsTooLarge.set(data->byteSampleClears.size() > SERVER_KNOBS->MAX_BYTE_SAMPLE_CLEAR_MAP_SIZE);
			break;
		}

		if(!results) {
			wait(delay(SERVER_KNOBS->BYTE_SAMPLE_LOAD_DELAY));
		}
	}
	TraceEvent("RecoveredByteSampleRange", data->thisServerID).detail("Begin", begin).detail("End", end).detail("Fetches", totalFetches).detail("Keys", totalKeys).detail("ReadBytes", totalBytes);
	return Void();
}

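// Restores the full byte sample in parallel: the sample-of-the-sample (persistByteSampleSampleKeys)
// is loaded first and used to split the byte sample keyspace into BYTE_SAMPLE_LOAD_PARALLELISM
// ranges of roughly equal sampled size, each of which is then loaded by its own
// applyByteSampleResult actor.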
ACTOR Future<Void> restoreByteSample(StorageServer* data, IKeyValueStore* storage, Promise<Void> byteSampleSampleRecovered) {
	state std::vector<Standalone<VectorRef<KeyValueRef>>> byteSampleSample;
	wait( applyByteSampleResult(data, storage, persistByteSampleSampleKeys.begin, persistByteSampleSampleKeys.end, &byteSampleSample) );
	byteSampleSampleRecovered.send(Void());
	wait( delay( BUGGIFY ? g_random->random01() * 2.0 : 0.0001 ) );

	size_t bytes_per_fetch = 0;
	// Since the expected size also includes (as of now) the space overhead of the container, we calculate our own number here
	for( auto& it : byteSampleSample ) {
		for( auto& kv : it ) {
			bytes_per_fetch += BinaryReader::fromStringRef<int32_t>(kv.value, Unversioned());
		}
	}
	bytes_per_fetch = (bytes_per_fetch/SERVER_KNOBS->BYTE_SAMPLE_LOAD_PARALLELISM) + 1;

	state std::vector<Future<Void>> sampleRanges;
	int accumulatedSize = 0;
	Key lastStart = persistByteSampleKeys.begin; // make sure the first range starts at the absolute beginning of the byte sample
	for( auto& it : byteSampleSample ) {
		for( auto& kv : it ) {
			if( accumulatedSize >= bytes_per_fetch ) {
				accumulatedSize = 0;
				Key realKey = kv.key.removePrefix( persistByteSampleKeys.begin );
				sampleRanges.push_back( applyByteSampleResult(data, storage, lastStart, realKey) );
				lastStart = realKey;
			}
			accumulatedSize += BinaryReader::fromStringRef<int32_t>(kv.value, Unversioned());
		}
	}
	// make sure that the last range goes all the way to the end of the byte sample
	sampleRanges.push_back( applyByteSampleResult(data, storage, lastStart, persistByteSampleKeys.end) );

	wait( waitForAll( sampleRanges ) );
	TraceEvent("RecoveredByteSampleChunkedRead", data->thisServerID).detail("Ranges",sampleRanges.size());

	if( BUGGIFY )
		wait( delay( g_random->random01() * 10.0 ) );

	return Void();
}

ACTOR Future<bool> restoreDurableState( StorageServer* data, IKeyValueStore* storage ) {
	state Future<Optional<Value>> fFormat = storage->readValue(persistFormat.key);
	state Future<Optional<Value>> fID = storage->readValue(persistID);
	state Future<Optional<Value>> fVersion = storage->readValue(persistVersion);
	state Future<Optional<Value>> fLogProtocol = storage->readValue(persistLogProtocol);
	state Future<Optional<Value>> fPrimaryLocality = storage->readValue(persistPrimaryLocality);
	state Future<Standalone<VectorRef<KeyValueRef>>> fShardAssigned = storage->readRange(persistShardAssignedKeys);
	state Future<Standalone<VectorRef<KeyValueRef>>> fShardAvailable = storage->readRange(persistShardAvailableKeys);

	state Promise<Void> byteSampleSampleRecovered;
	data->byteSampleRecovery = restoreByteSample(data, storage, byteSampleSampleRecovered);

	TraceEvent("ReadingDurableState", data->thisServerID);
	wait( waitForAll( (vector<Future<Optional<Value>>>(), fFormat, fID, fVersion, fLogProtocol, fPrimaryLocality) ) );
	wait( waitForAll( (vector<Future<Standalone<VectorRef<KeyValueRef>>>>(), fShardAssigned, fShardAvailable) ) );
	wait( byteSampleSampleRecovered.getFuture() );
	TraceEvent("RestoringDurableState", data->thisServerID);

	if (!fFormat.get().present()) {
		// The DB was never initialized
		TraceEvent("DBNeverInitialized", data->thisServerID);
		storage->dispose();
		data->thisServerID = UID();
		data->sk = Key();
		return false;
	}
	if (!persistFormatReadableRange.contains( fFormat.get().get() )) {
		TraceEvent(SevError, "UnsupportedDBFormat").detail("Format", fFormat.get().get().toString()).detail("Expected", persistFormat.value.toString());
		throw worker_recovery_failed();
	}
	data->thisServerID = BinaryReader::fromStringRef<UID>(fID.get().get(), Unversioned());
	data->sk = serverKeysPrefixFor( data->thisServerID ).withPrefix(systemKeys.begin);  // FFFF/serverKeys/[this server]/

	if (fLogProtocol.get().present())
		data->logProtocol = BinaryReader::fromStringRef<uint64_t>(fLogProtocol.get().get(), Unversioned());

	if (fPrimaryLocality.get().present())
		data->primaryLocality = BinaryReader::fromStringRef<int8_t>(fPrimaryLocality.get().get(), Unversioned());

	state Version version = BinaryReader::fromStringRef<Version>( fVersion.get().get(), Unversioned() );
	debug_checkRestoredVersion( data->thisServerID, version, "StorageServer" );
	data->setInitialVersion( version );

	state Standalone<VectorRef<KeyValueRef>> available = fShardAvailable.get();
	state int availableLoc;
	for(availableLoc=0; availableLoc<available.size(); availableLoc++) {
		KeyRangeRef keys(
			available[availableLoc].key.removePrefix(persistShardAvailableKeys.begin),
			availableLoc+1==available.size() ? allKeys.end : available[availableLoc+1].key.removePrefix(persistShardAvailableKeys.begin));
		ASSERT( !keys.empty() );
		bool nowAvailable = available[availableLoc].value!=LiteralStringRef("0");
		/*if(nowAvailable)
		  TraceEvent("AvailableShard", data->thisServerID).detail("RangeBegin", keys.begin).detail("RangeEnd", keys.end);*/
		data->newestAvailableVersion.insert( keys, nowAvailable ? latestVersion : invalidVersion );
		wait(yield());
	}

	state Standalone<VectorRef<KeyValueRef>> assigned = fShardAssigned.get();
	state int assignedLoc;
	for(assignedLoc=0; assignedLoc<assigned.size(); assignedLoc++) {
		KeyRangeRef keys(
			assigned[assignedLoc].key.removePrefix(persistShardAssignedKeys.begin),
			assignedLoc+1==assigned.size() ? allKeys.end : assigned[assignedLoc+1].key.removePrefix(persistShardAssignedKeys.begin));
		ASSERT( !keys.empty() );
		bool nowAssigned = assigned[assignedLoc].value!=LiteralStringRef("0");
		/*if(nowAssigned)
		  TraceEvent("AssignedShard", data->thisServerID).detail("RangeBegin", keys.begin).detail("RangeEnd", keys.end);*/
		changeServerKeys(data, keys, nowAssigned, version, CSK_RESTORE);

		if (!nowAssigned) ASSERT( data->newestAvailableVersion.allEqual(keys, invalidVersion) );
		wait(yield());
	}

	wait( delay( 0.0001 ) );

	{
		// Erase data which isn't available (it is from some fetch at a later version)
		// SOMEDAY: Keep track of keys that might be fetching, make sure we don't have any data elsewhere?
		for(auto it = data->newestAvailableVersion.ranges().begin(); it != data->newestAvailableVersion.ranges().end(); ++it) {
			if (it->value() == invalidVersion) {
				KeyRangeRef clearRange(it->begin(), it->end());
				debugKeyRange("clearInvalidVersion", invalidVersion, clearRange);
				storage->clear( clearRange );
				data->byteSampleApplyClear( clearRange, invalidVersion );
			}
		}
	}

	validate(data, true);

	return true;
}

Future<bool> StorageServerDisk::restoreDurableState() {
	return ::restoreDurableState(data, storage);
}

// Determines whether a key-value pair should be included in the byte sample.
// Also returns size information about the sample.
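// A pair is sampled with probability proportional to its size,
//   p = size / ((keySize + BYTE_SAMPLING_OVERHEAD) * BYTE_SAMPLING_FACTOR),
// using hashlittle2 to map the key deterministically to a uniform value in [0,1), so every
// replica of a key makes the same decision.  Sampled pairs are recorded with weight
// size / min(1, p), keeping the sample an unbiased estimator of actual byte counts.
// Worked example (assuming typical knob defaults BYTE_SAMPLING_FACTOR=250 and
// BYTE_SAMPLING_OVERHEAD=100): a 20 byte key with an 80 byte value has size=100,
// p = 100/(120*250) = 1/300, and if selected contributes sampledSize = 100*300 = 30000.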
ByteSampleInfo isKeyValueInSample(KeyValueRef keyValue) {
	ByteSampleInfo info;

	const KeyRef key = keyValue.key;
	info.size = key.size() + keyValue.value.size();

	uint32_t a = 0;
	uint32_t b = 0;
	hashlittle2( key.begin(), key.size(), &a, &b );

	double probability = (double)info.size / (key.size() + SERVER_KNOBS->BYTE_SAMPLING_OVERHEAD) / SERVER_KNOBS->BYTE_SAMPLING_FACTOR;
	info.inSample = a / ((1 << 30) * 4.0) < probability;
	info.sampledSize = info.size / std::min(1.0, probability);

	return info;
}

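// Routes a mutation either into the MVCC mutation log (when ver is a real version) or directly
// to the storage engine and byte sample (when ver is invalidVersion, as for the clears applied
// while restoring durable state).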
void StorageServer::addMutationToMutationLogOrStorage( Version ver, MutationRef m ) {
	if (ver != invalidVersion) {
		addMutationToMutationLog( addVersionToMutationLog(ver), m );
	} else {
		storage.writeMutation( m );
		byteSampleApplyMutation( m, ver );
	}
}

void StorageServer::byteSampleApplySet( KeyValueRef kv, Version ver ) {
	// Update byteSample in memory and (eventually) on disk and notify waiting metrics

	ByteSampleInfo sampleInfo = isKeyValueInSample(kv);
	auto& byteSample = metrics.byteSample.sample;

	int64_t delta = 0;
	const KeyRef key = kv.key;

	auto old = byteSample.find(key);
	if (old != byteSample.end()) delta = -byteSample.getMetric(old);
	if (sampleInfo.inSample) {
		delta += sampleInfo.sampledSize;
		byteSample.insert( key, sampleInfo.sampledSize );
		addMutationToMutationLogOrStorage( ver, MutationRef(MutationRef::SetValue, key.withPrefix(persistByteSampleKeys.begin), BinaryWriter::toValue( sampleInfo.sampledSize, Unversioned() )) );
	} else {
		bool any = old != byteSample.end();
		if(!byteSampleRecovery.isReady() ) {
			if(!byteSampleClears.rangeContaining(key).value()) {
				byteSampleClears.insert(key, true);
				byteSampleClearsTooLarge.set(byteSampleClears.size() > SERVER_KNOBS->MAX_BYTE_SAMPLE_CLEAR_MAP_SIZE);
				any = true;
			}
		}
		if (any) {
			byteSample.erase(old);
			auto diskRange = singleKeyRange(key.withPrefix(persistByteSampleKeys.begin));
			addMutationToMutationLogOrStorage( ver, MutationRef(MutationRef::ClearRange, diskRange.begin, diskRange.end) );
		}
	}

	if (delta) metrics.notifyBytes( key, delta );
}

void StorageServer::byteSampleApplyClear( KeyRangeRef range, Version ver ) {
	// Update byteSample in memory and (eventually) on disk via the mutationLog and notify waiting metrics

	auto& byteSample = metrics.byteSample.sample;
	bool any = false;

	if(range.begin < allKeys.end) {
		// notifyBytes should not be called for keys past allKeys.end
		KeyRangeRef searchRange = KeyRangeRef(range.begin, std::min(range.end, allKeys.end));
		auto r = metrics.waitMetricsMap.intersectingRanges(searchRange);
		for(auto shard = r.begin(); shard != r.end(); ++shard) {
			KeyRangeRef intersectingRange = shard.range() & range;
			int64_t bytes = byteSample.sumRange(intersectingRange.begin, intersectingRange.end);
			metrics.notifyBytes(shard, -bytes);
			any = any || bytes > 0;
		}
	}

	if(range.end > allKeys.end && byteSample.sumRange(std::max(allKeys.end, range.begin), range.end) > 0)
		any = true;

	if(!byteSampleRecovery.isReady()) {
		auto clearRanges = byteSampleClears.intersectingRanges(range);
		for(auto it : clearRanges) {
			if(!it.value()) {
				byteSampleClears.insert(range, true);
				byteSampleClearsTooLarge.set(byteSampleClears.size() > SERVER_KNOBS->MAX_BYTE_SAMPLE_CLEAR_MAP_SIZE);
				any = true;
				break;
			}
		}
	}

	if (any) {
		byteSample.eraseAsync( range.begin, range.end );
		auto diskRange = range.withPrefix( persistByteSampleKeys.begin );
		addMutationToMutationLogOrStorage( ver, MutationRef(MutationRef::ClearRange, diskRange.begin, diskRange.end) );
	}
}

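// Services one WaitMetricsRequest: replies immediately if the current metrics for req.keys are
// already outside [req.min, req.max]; otherwise registers a PromiseStream in waitMetricsMap,
// accumulates the (possibly partial) updates as they stream in, and replies once the
// accumulated metrics leave the requested band or the timeout fires.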
ACTOR Future<Void> waitMetrics( StorageServerMetrics* self, WaitMetricsRequest req, Future<Void> timeout ) {
	state PromiseStream< StorageMetrics > change;
	state StorageMetrics metrics = self->getMetrics( req.keys );
	state Error error = success();
	state bool timedout = false;

	if ( !req.min.allLessOrEqual( metrics ) || !metrics.allLessOrEqual( req.max ) ) {
		TEST( true ); // ShardWaitMetrics return case 1 (quickly)
		req.reply.send( metrics );
		return Void();
	}

	{
		auto rs = self->waitMetricsMap.modify( req.keys );
		for(auto r = rs.begin(); r != rs.end(); ++r)
			r->value().push_back( change );
		loop {
			try {
				choose {
					when( StorageMetrics c = waitNext( change.getFuture() ) ) {
						metrics += c;

						// SOMEDAY: validation! The changes here are possibly partial changes (we receive multiple messages per
						//  update to our requested range). This means that the validation would have to occur after all
						//  the messages for one clear or set have been dispatched.

						/*StorageMetrics m = getMetrics( data, req.keys );
						  bool b = ( m.bytes != metrics.bytes || m.bytesPerKSecond != metrics.bytesPerKSecond || m.iosPerKSecond != metrics.iosPerKSecond );
						  if (b) {
						  printf("keys: '%s' - '%s' @%p\n", printable(req.keys.begin).c_str(), printable(req.keys.end).c_str(), this);
						  printf("waitMetrics: desync %d (%lld %lld %lld) != (%lld %lld %lld); +(%lld %lld %lld)\n", b, m.bytes, m.bytesPerKSecond, m.iosPerKSecond, metrics.bytes, metrics.bytesPerKSecond, metrics.iosPerKSecond, c.bytes, c.bytesPerKSecond, c.iosPerKSecond);

						  }*/
					}
					when( wait( timeout ) ) {
						timedout = true;
					}
				}
			} catch (Error& e) {
				if( e.code() == error_code_actor_cancelled ) throw; // This is only cancelled when the main loop has exited... no need in this case to clean up self
				error = e;
				break;
			}

			if ( timedout || !req.min.allLessOrEqual( metrics ) || !metrics.allLessOrEqual( req.max ) ) {
				TEST( !timedout ); // ShardWaitMetrics return case 2 (delayed)
				TEST( timedout ); // ShardWaitMetrics return on timeout
				req.reply.send( metrics );
				break;
			}
		}

		wait( delay(0) ); // Prevent iterator invalidation of functions sending changes
	}

	auto rs = self->waitMetricsMap.modify( req.keys );
	for(auto i = rs.begin(); i != rs.end(); ++i) {
		auto &x = i->value();
		for( int j = 0; j < x.size(); j++ ) {
			if( x[j] == change ) {
				swapAndPop(&x, j);
				break;
			}
		}
	}
	self->waitMetricsMap.coalesce( req.keys );

	if (error.code() != error_code_success ) {
		if (error.code() != error_code_wrong_shard_server) throw error;
		TEST( true );	// ShardWaitMetrics delayed wrong_shard_server()
		req.reply.sendError(error);
	}

	return Void();
}

Future<Void> StorageServerMetrics::waitMetrics(WaitMetricsRequest req, Future<Void> delay) {
	return ::waitMetrics(this, req, delay);
}

#pragma endregion

/////////////////////////////// Core //////////////////////////////////////
#pragma region Core

ACTOR Future<Void> metricsCore( StorageServer* self, StorageServerInterface ssi ) {
	state Future<Void> doPollMetrics = Void();
	state ActorCollection actors(false);

	wait( self->byteSampleRecovery );

	actors.add(traceCounters("StorageMetrics", self->thisServerID, SERVER_KNOBS->STORAGE_LOGGING_DELAY, &self->counters.cc, self->thisServerID.toString() + "/StorageMetrics"));

	loop {
		choose {
			when (WaitMetricsRequest req = waitNext(ssi.waitMetrics.getFuture())) {
				if (!self->isReadable( req.keys )) {
					TEST( true );	// waitMetrics immediate wrong_shard_server()
					req.reply.sendError(wrong_shard_server());
				} else {
					actors.add( self->metrics.waitMetrics( req, delayJittered( SERVER_KNOBS->STORAGE_METRIC_TIMEOUT ) ) );
				}
			}
			when (SplitMetricsRequest req = waitNext(ssi.splitMetrics.getFuture())) {
				if (!self->isReadable( req.keys )) {
					TEST( true );	// splitMetrics immediate wrong_shard_server()
					req.reply.sendError(wrong_shard_server());
				} else {
					self->metrics.splitMetrics( req );
				}
			}
			when (GetPhysicalMetricsRequest req = waitNext(ssi.getPhysicalMetrics.getFuture())) {
				StorageBytes sb = self->storage.getStorageBytes();
				self->metrics.getPhysicalMetrics( req, sb );
			}
			when (wait(doPollMetrics) ) {
				self->metrics.poll();
				doPollMetrics = delay(SERVER_KNOBS->STORAGE_SERVER_POLL_METRICS_DELAY);
			}
			when(wait(actors.getResult())) {}
		}
	}
}

ACTOR Future<Void> logLongByteSampleRecovery(Future<Void> recovery) {
	choose {
		when(wait(recovery)) {}
		when(wait(delay(SERVER_KNOBS->LONG_BYTE_SAMPLE_RECOVERY_DELAY))) {
			TraceEvent(g_network->isSimulated() ? SevWarn : SevWarnAlways, "LongByteSampleRecovery");
		}
	}

	return Void();
}

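// The main request-dispatch loop for a storage server.  It keeps exactly one update() actor
// chasing the TLog cursor (restarted on dbInfo changes only if it has not yet received
// anything, since cancelling it later could interfere with fetchKeys transitions), services
// get/watch/metrics requests arriving on the interface, and applies latency band
// configuration changes.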
storageServerCore(StorageServer * self,StorageServerInterface ssi)3336 ACTOR Future<Void> storageServerCore( StorageServer* self, StorageServerInterface ssi )
3337 {
3338 	state Future<Void> doUpdate = Void();
3339 	state bool updateReceived = false;  // true iff the current update() actor assigned to doUpdate has already received an update from the tlog
3340 	state ActorCollection actors(false);
3341 	state double lastLoopTopTime = now();
3342 	state Future<Void> dbInfoChange = Void();
3343 	state Future<Void> checkLastUpdate = Void();
3344 	state double updateProcessStatsDelay = SERVER_KNOBS->UPDATE_STORAGE_PROCESS_STATS_INTERVAL;
3345 	state Future<Void> updateProcessStatsTimer = delay(updateProcessStatsDelay);
3346 
3347 	actors.add(updateStorage(self));
3348 	actors.add(waitFailureServer(ssi.waitFailure.getFuture()));
3349 	actors.add(self->otherError.getFuture());
3350 	actors.add(metricsCore(self, ssi));
3351 	actors.add(logLongByteSampleRecovery(self->byteSampleRecovery));
3352 
3353 	self->coreStarted.send( Void() );
3354 
3355 	loop {
3356 		++self->counters.loops;
3357 
3358 		double loopTopTime = now();
3359 		double elapsedTime = loopTopTime - lastLoopTopTime;
3360 		if( elapsedTime > 0.050 ) {
3361 			if (g_random->random01() < 0.01)
3362 				TraceEvent(SevWarn, "SlowSSLoopx100", self->thisServerID).detail("Elapsed", elapsedTime);
3363 		}
3364 		lastLoopTopTime = loopTopTime;
3365 
3366 		choose {
3367 			when( wait( checkLastUpdate ) ) {
3368 				if(now() - self->lastUpdate >= CLIENT_KNOBS->NO_RECENT_UPDATES_DURATION) {
3369 					self->noRecentUpdates.set(true);
3370 					checkLastUpdate = delay(CLIENT_KNOBS->NO_RECENT_UPDATES_DURATION);
3371 				} else {
3372 					checkLastUpdate = delay( std::max(CLIENT_KNOBS->NO_RECENT_UPDATES_DURATION-(now()-self->lastUpdate), 0.1) );
3373 				}
3374 			}
3375 			when( wait( dbInfoChange ) ) {
3376 				TEST( self->logSystem );  // shardServer dbInfo changed
3377 				dbInfoChange = self->db->onChange();
3378 				if( self->db->get().recoveryState >= RecoveryState::ACCEPTING_COMMITS ) {
3379 					self->logSystem = ILogSystem::fromServerDBInfo( self->thisServerID, self->db->get() );
3380 					if (self->logSystem) {
3381 						if(self->db->get().logSystemConfig.recoveredAt.present()) {
3382 							self->poppedAllAfter = self->db->get().logSystemConfig.recoveredAt.get();
3383 						}
3384 						self->logCursor = self->logSystem->peekSingle( self->thisServerID, self->version.get() + 1, self->tag, self->history );
3385 						self->popVersion( self->durableVersion.get() + 1, true );
3386 					}
3387 					// If update() is waiting for results from the tlog, it might never get them, so needs to be cancelled.  But if it is waiting later,
3388 					// cancelling it could cause problems (e.g. fetchKeys that already committed to transitioning to waiting state)
3389 					if (!updateReceived) {
3390 						doUpdate = Void();
3391 					}
3392 				}
3393 
3394 				Optional<LatencyBandConfig> newLatencyBandConfig = self->db->get().latencyBandConfig;
3395 				if(newLatencyBandConfig.present() != self->latencyBandConfig.present()
3396 					|| (newLatencyBandConfig.present() && newLatencyBandConfig.get().readConfig != self->latencyBandConfig.get().readConfig))
3397 				{
3398 					self->latencyBandConfig = newLatencyBandConfig;
3399 					self->counters.readLatencyBands.clearBands();
3400 					TraceEvent("LatencyBandReadUpdatingConfig").detail("Present", newLatencyBandConfig.present());
3401 					if(self->latencyBandConfig.present()) {
3402 						for(auto band : self->latencyBandConfig.get().readConfig.bands) {
3403 							self->counters.readLatencyBands.addThreshold(band);
3404 						}
3405 					}
3406 				}
3407 			}
3408 			when( GetValueRequest req = waitNext(ssi.getValue.getFuture()) ) {
3409 				// Warning: This code is executed at extremely high priority (TaskLoadBalancedEndpoint), so downgrade before doing real work
3410 				if( req.debugID.present() )
3411 					g_traceBatch.addEvent("GetValueDebug", req.debugID.get().first(), "storageServer.recieved"); //.detail("TaskID", g_network->getCurrentTask());
3412 
				if (SHORT_CIRCUT_ACTUAL_STORAGE && normalKeys.contains(req.key))
					req.reply.send(GetValueReply());
				else
					actors.add( getValueQ( self, req ) );
			}
			when( WatchValueRequest req = waitNext(ssi.watchValue.getFuture()) ) {
				// TODO: fast load balancing?
				// SOMEDAY: combine watches for the same key/value into a single watch
				actors.add( watchValueQ( self, req ) );
			}
			when (GetKeyRequest req = waitNext(ssi.getKey.getFuture())) {
				// Warning: This code is executed at extremely high priority (TaskLoadBalancedEndpoint), so downgrade before doing real work
				actors.add( getKey( self, req ) );
			}
			when (GetKeyValuesRequest req = waitNext(ssi.getKeyValues.getFuture()) ) {
				// Warning: This code is executed at extremely high priority (TaskLoadBalancedEndpoint), so downgrade before doing real work
				actors.add( getKeyValues( self, req ) );
			}
			when (GetShardStateRequest req = waitNext(ssi.getShardState.getFuture()) ) {
				if (req.mode == GetShardStateRequest::NO_WAIT) {
					if( self->isReadable( req.keys ) )
						req.reply.send(std::make_pair(self->version.get(), self->durableVersion.get()));
					else
						req.reply.sendError(wrong_shard_server());
				} else {
					actors.add( getShardStateQ( self, req ) );
				}
			}
			when (StorageQueuingMetricsRequest req = waitNext(ssi.getQueuingMetrics.getFuture())) {
				getQueuingMetrics(self, req);
			}
			when( ReplyPromise<Version> reply = waitNext(ssi.getVersion.getFuture()) ) {
				reply.send( self->version.get() );
			}
			when( ReplyPromise<KeyValueStoreType> reply = waitNext(ssi.getKeyValueStoreType.getFuture()) ) {
				reply.send( self->storage.getKeyValueStoreType() );
			}
			when( wait(doUpdate) ) {
				updateReceived = false;
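				// With no log system to pull from, park the update loop; the dbInfoChange handler
				// above will restart it (doUpdate = Void()) once a log system becomes available.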
				if (!self->logSystem)
					doUpdate = Never();
				else
					doUpdate = update( self, &updateReceived );
			}
			when(wait(updateProcessStatsTimer)) {
				updateProcessStats(self);
				updateProcessStatsTimer = delay(updateProcessStatsDelay);
			}
			when(wait(actors.getResult())) {}
		}
	}
}

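// Common shutdown path for the storage server actors below. Returns true if e represents an
// expected termination (so the caller should return cleanly) and false if the error should be
// rethrown.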
bool storageServerTerminated(StorageServer& self, IKeyValueStore* persistentData, Error const& e) {
	self.shuttingDown = true;

	// Clearing shards shuts down any fetchKeys actors; these may do things on cancellation that are best done with self still valid
	self.shards.insert( allKeys, Reference<ShardInfo>() );

	// Dispose the IKVS (destroying its data permanently) only if this shutdown is definitely permanent.  Otherwise just close it.
	if (e.code() == error_code_please_reboot) {
		// do nothing.
	} else if (e.code() == error_code_worker_removed || e.code() == error_code_recruitment_failed) {
		persistentData->dispose();
	} else {
		persistentData->close();
	}

	if ( e.code() == error_code_worker_removed ||
		 e.code() == error_code_recruitment_failed ||
		 e.code() == error_code_file_not_found ||
		 e.code() == error_code_actor_cancelled )
	{
		TraceEvent("StorageServerTerminated", self.thisServerID).error(e, true);
		return true;
	} else
		return false;
}

ACTOR Future<Void> storageServer( IKeyValueStore* persistentData, StorageServerInterface ssi, Tag seedTag, ReplyPromise<InitializeStorageReply> recruitReply,
	Reference<AsyncVar<ServerDBInfo>> db, std::string folder )
{
	state StorageServer self(persistentData, db, ssi);

	self.sk = serverKeysPrefixFor( self.thisServerID ).withPrefix(systemKeys.begin);  // FFFF/serverKeys/[this server]/
	self.folder = folder;

	try {
		wait( self.storage.init() );
		wait( self.storage.commit() );

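		// A valid seed tag means this server is part of the initial configuration of a newly
		// created database; otherwise the server must register itself with the cluster.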
		if (seedTag == invalidTag) {
			std::pair<Version, Tag> verAndTag = wait( addStorageServer(self.cx, ssi) ); // Might throw recruitment_failed in case of simultaneous master failure
			self.tag = verAndTag.second;
			self.setInitialVersion( verAndTag.first-1 );
		} else {
			self.tag = seedTag;
		}

		self.storage.makeNewStorageServerDurable();
		wait( self.storage.commit() );

		TraceEvent("StorageServerInit", ssi.id()).detail("Version", self.version.get()).detail("SeedTag", seedTag.toString());
		InitializeStorageReply rep;
		rep.interf = ssi;
		rep.addedVersion = self.version.get();
		recruitReply.send(rep);
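		// A brand-new server has no persisted byte sample to restore, so recovery is already complete.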
		self.byteSampleRecovery = Void();
		wait( storageServerCore(&self, ssi) );

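		// storageServerCore only exits by throwing, so reaching this line would be a logic error.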
		throw internal_error();
	} catch (Error& e) {
		// If we die with an error before replying to the recruitment request, send the error to the recruiter (ClusterController, and from there to the DataDistributionTeamCollection)
		if (!recruitReply.isSet())
			recruitReply.sendError( recruitment_failed() );
		if (storageServerTerminated(self, persistentData, e))
			return Void();
		throw e;
	}
}

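// Re-registers a rebooted storage server with the cluster: asks the proxies for this server's
// rejoin info (its tag, tag history, and whether a new tag/locality must be assigned), then
// commits an updated serverList entry in a system-priority transaction, retrying on conflicts
// and on changes to this server's view of the cluster.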
ACTOR Future<Void> replaceInterface( StorageServer* self, StorageServerInterface ssi )
{
	state Transaction tr(self->cx);

	loop {
		state Future<Void> infoChanged = self->db->onChange();
		state Reference<ProxyInfo> proxies( new ProxyInfo(self->db->get().client.proxies, self->db->get().myLocality) );
		choose {
			when( GetStorageServerRejoinInfoReply _rep = wait( proxies->size() ? loadBalance( proxies, &MasterProxyInterface::getStorageServerRejoinInfo, GetStorageServerRejoinInfoRequest(ssi.id(), ssi.locality.dcId()) ) : Never() ) ) {
				state GetStorageServerRejoinInfoReply rep = _rep;
				try {
					tr.reset();
					tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
					tr.setVersion( rep.version );

					tr.addReadConflictRange(singleKeyRange(serverListKeyFor(ssi.id())));
					tr.addReadConflictRange(singleKeyRange(serverTagKeyFor(ssi.id())));
					tr.addReadConflictRange(serverTagHistoryRangeFor(ssi.id()));
					tr.addReadConflictRange(singleKeyRange(tagLocalityListKeyFor(ssi.locality.dcId())));

					tr.set(serverListKeyFor(ssi.id()), serverListValue(ssi));

					if(rep.newLocality) {
						tr.addReadConflictRange(tagLocalityListKeys);
						tr.set( tagLocalityListKeyFor(ssi.locality.dcId()), tagLocalityListValue(rep.newTag.get().locality) );
					}

					if(rep.newTag.present()) {
						KeyRange conflictRange = singleKeyRange(serverTagConflictKeyFor(rep.newTag.get()));
						tr.addReadConflictRange( conflictRange );
						tr.addWriteConflictRange( conflictRange );
						tr.setOption(FDBTransactionOptions::FIRST_IN_BATCH);
						tr.set( serverTagKeyFor(ssi.id()), serverTagValue(rep.newTag.get()) );
						tr.atomicOp( serverTagHistoryKeyFor(ssi.id()), serverTagValue(rep.tag), MutationRef::SetVersionstampedKey );
					}

					if(rep.history.size() && rep.history.back().first < self->version.get()) {
						tr.clear(serverTagHistoryRangeBefore(ssi.id(), self->version.get()));
					}

					choose {
						when ( wait( tr.commit() ) ) {
							self->history = rep.history;

							if(rep.newTag.present()) {
								self->tag = rep.newTag.get();
								self->history.insert(self->history.begin(), std::make_pair(tr.getCommittedVersion(), rep.tag));
							} else {
								self->tag = rep.tag;
							}
							self->allHistory = self->history;

							TraceEvent("SSTag", self->thisServerID).detail("MyTag", self->tag.toString());
							for(auto it : self->history) {
								TraceEvent("SSHistory", self->thisServerID).detail("Ver", it.first).detail("Tag", it.second.toString());
							}

							if(self->history.size() && BUGGIFY) {
								TraceEvent("SSHistoryReboot", self->thisServerID);
								throw please_reboot();
							}

							break;
						}
						when ( wait(infoChanged) ) {}
					}
				} catch (Error& e) {
					wait( tr.onError(e) );
				}
			}
			when ( wait(infoChanged) ) {}
		}
	}

	return Void();
}

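// Reboot path: restores a storage server from the state persisted on disk, then rejoins the
// cluster via replaceInterface() before entering the main storageServerCore() loop.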
ACTOR Future<Void> storageServer( IKeyValueStore* persistentData, StorageServerInterface ssi, Reference<AsyncVar<ServerDBInfo>> db, std::string folder, Promise<Void> recovered )
{
	state StorageServer self(persistentData, db, ssi);
	self.folder = folder;
	self.sk = serverKeysPrefixFor( self.thisServerID ).withPrefix(systemKeys.begin);  // FFFF/serverKeys/[this server]/
	try {
		state double start = now();
		TraceEvent("StorageServerRebootStart", self.thisServerID);
		wait(self.storage.init());
		wait(self.storage.commit()); // After a rollback there might be uncommitted changes.
		bool ok = wait( self.storage.restoreDurableState() );
		if (!ok) {
			if(recovered.canBeSet()) recovered.send(Void());
			return Void();
		}
		TraceEvent("SSTimeRestoreDurableState", self.thisServerID).detail("TimeTaken", now() - start);

		ASSERT( self.thisServerID == ssi.id() );
		TraceEvent("StorageServerReboot", self.thisServerID)
			.detail("Version", self.version.get());

		if(recovered.canBeSet()) recovered.send(Void());

		wait( replaceInterface( &self, ssi ) );

		TraceEvent("StorageServerStartingCore", self.thisServerID).detail("TimeTaken", now() - start);

		//wait( delay(0) );  // To make sure self->zkMasterInfo.onChanged is available to wait on
		wait( storageServerCore(&self, ssi) );

		throw internal_error();
	} catch (Error& e) {
		if(recovered.canBeSet()) recovered.send(Void());
		if (storageServerTerminated(self, persistentData, e))
			return Void();
		throw e;
	}
}

#pragma endregion

/*
 4 Reference count
 4 priority
24 pointers
 8 lastUpdateVersion
 2 updated, replacedPointer
--
42 PTree overhead

 8 Version insertVersion
--
50 VersionedMap overhead

12 KeyRef
12 ValueRef
 1 isClear
--
25 payload


50 overhead
25 payload
21 structure padding
32 allocator rounds up
---
128 allocated

To reach 64, we would need to save 11 bytes plus all of the padding.

Possibilities:
  -8  Combine lastUpdateVersion, insertVersion?
  -2  Fold together updated, replacedPointer, isClear bits
  -3  Fold away updated, replacedPointer, isClear
  -8  Move value lengths into arena
  -4  Replace priority with H(pointer)
  -12 Compress pointers (using special allocator)
  -4  Modular lastUpdateVersion (make sure no node survives 4 billion updates)
*/
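// A compile-time sanity check corresponding to the accounting above might look like the sketch
// below. It is left commented out because the exact figures are ABI- and platform-dependent;
// versionedMapTest() measures the real sizes at runtime instead.
//
//     static_assert( sizeof(VersionedMap<int,int>::PTreeT) <= 128,
//                    "PTree node no longer fits in a 128-byte FastAllocator allocation" );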

void versionedMapTest() {
	VersionedMap<int,int> vm;

	printf("SS Ptree node is %zu bytes\n", sizeof( StorageServer::VersionedData::PTreeT ) );

	const int NSIZE = sizeof(VersionedMap<int,int>::PTreeT);
	const int ASIZE = NSIZE<=64 ? 64 : NextPowerOfTwo<NSIZE>::Result;

	auto before = FastAllocator< ASIZE >::getTotalMemory();

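	// Write-heavy workload: 1000 versions of 1000 insertions each, clearing a small window around
	// every inserted key to force node churn, so the FastAllocator delta measured below
	// approximates the per-node memory cost.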
	for(int v=1; v<=1000; ++v) {
		vm.createNewVersion(v);
		for(int i=0; i<1000; i++) {
			int k = g_random->randomInt(0, 2000000);
			/*for(int k2=k-5; k2<k+5; k2++)
				if (vm.atLatest().find(k2) != vm.atLatest().end())
					vm.erase(k2);*/
			vm.erase( k-5, k+5 );
			vm.insert( k, v );
		}
	}

	auto after = FastAllocator< ASIZE >::getTotalMemory();

	int count = 0;
	for(auto i = vm.atLatest().begin(); i != vm.atLatest().end(); ++i)
		++count;

	printf("PTree node is %d bytes, allocated as %d bytes\n", NSIZE, ASIZE);
	printf("%d distinct after %d insertions\n", count, 1000*1000);
	printf("Memory used: %f MB\n", (after - before) / 1e6);
}