/*
 * storageserver.actor.cpp
 *
 * This source file is part of the FoundationDB open source project
 *
 * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */


#include "fdbrpc/fdbrpc.h"
#include "fdbrpc/LoadBalance.h"
#include "flow/IndexedSet.h"
#include "flow/Hash3.h"
#include "flow/ActorCollection.h"
#include "flow/SystemMonitor.h"
#include "flow/Util.h"
#include "fdbclient/Atomic.h"
#include "fdbclient/KeyRangeMap.h"
#include "fdbclient/SystemData.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbclient/Notified.h"
#include "fdbclient/MasterProxyInterface.h"
#include "fdbclient/DatabaseContext.h"
#include "fdbserver/WorkerInterface.actor.h"
#include "fdbserver/TLogInterface.h"
#include "fdbserver/MoveKeys.actor.h"
#include "fdbserver/Knobs.h"
#include "fdbserver/WaitFailure.h"
#include "fdbserver/IKeyValueStore.h"
#include "fdbclient/VersionedMap.h"
#include "fdbserver/StorageMetrics.h"
#include "fdbrpc/sim_validation.h"
#include "fdbserver/ServerDBInfo.h"
#include "fdbrpc/Smoother.h"
#include "flow/Stats.h"
#include "fdbserver/LogSystem.h"
#include "fdbserver/RecoveryState.h"
#include "fdbserver/LogProtocolMessage.h"
#include "fdbserver/LatencyBandConfig.h"
#include "flow/TDMetric.actor.h"
#include "flow/actorcompiler.h" // This must be the last #include.

using std::make_pair;

#pragma region Data Structures

#define SHORT_CIRCUT_ACTUAL_STORAGE 0

inline bool canReplyWith(Error e) {
	switch(e.code()) {
	case error_code_transaction_too_old:
	case error_code_future_version:
	case error_code_wrong_shard_server:
	case error_code_process_behind:
	//case error_code_all_alternatives_failed:
		return true;
	default:
		return false;
	}
}

struct StorageServer;
class ValueOrClearToRef {
public:
	static ValueOrClearToRef value(ValueRef const& v) { return ValueOrClearToRef(v, false); }
	static ValueOrClearToRef clearTo(KeyRef const& k) { return ValueOrClearToRef(k, true); }

	bool isValue() const { return !isClear; }
	bool isClearTo() const { return isClear; }

	ValueRef const& getValue() const { ASSERT( isValue() ); return item; }
	KeyRef const& getEndKey() const { ASSERT( isClearTo() ); return item; }

private:
	ValueOrClearToRef( StringRef item, bool isClear ) : item(item), isClear(isClear) {}

	StringRef item;
	bool isClear;
};

struct AddingShard : NonCopyable {
	KeyRange keys;
	Future<Void> fetchClient; // holds FetchKeys() actor
	Promise<Void> fetchComplete;
	Promise<Void> readWrite;

	std::deque< Standalone<VerUpdateRef> > updates; // during the Fetching phase, mutations with key in keys and version >= (fetchClient's) fetchVersion

	struct StorageServer* server;
	Version transferredVersion;

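	// The lifecycle of an AddingShard is driven by fetchKeys() (defined later in this
	// file). Roughly: in WaitPrevious it waits for earlier-arriving fetches and versions;
	// in Fetching it reads the shard's data at fetchVersion from other servers while
	// buffering newer mutations in `updates`; in Waiting the data and buffered updates
	// have been injected into the versioned data at transferredVersion, and the shard
	// becomes readable once that version has caught up and become durable.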
	enum Phase { WaitPrevious, Fetching, Waiting };
	Phase phase;

	AddingShard( StorageServer* server, KeyRangeRef const& keys );

	// When fetchKeys "partially completes" (splits an adding shard in two), this is used to construct the left half
	AddingShard( AddingShard* prev, KeyRange const& keys )
		: keys(keys), fetchClient(prev->fetchClient), server(prev->server), transferredVersion(prev->transferredVersion), phase(prev->phase)
	{
	}
	~AddingShard() {
		if( !fetchComplete.isSet() )
			fetchComplete.send(Void());
		if( !readWrite.isSet() )
			readWrite.send(Void());
	}

	void addMutation( Version version, MutationRef const& mutation );

	bool isTransferred() const { return phase == Waiting; }
};

struct ShardInfo : ReferenceCounted<ShardInfo>, NonCopyable {
	AddingShard* adding;
	struct StorageServer* readWrite;
	KeyRange keys;
	uint64_t changeCounter;

	ShardInfo(KeyRange keys, AddingShard* adding, StorageServer* readWrite)
		: adding(adding), readWrite(readWrite), keys(keys)
	{
	}

	~ShardInfo() {
		delete adding;
	}

	static ShardInfo* newNotAssigned(KeyRange keys) { return new ShardInfo(keys, NULL, NULL); }
	static ShardInfo* newReadWrite(KeyRange keys, StorageServer* data) { return new ShardInfo(keys, NULL, data); }
	static ShardInfo* newAdding(StorageServer* data, KeyRange keys) { return new ShardInfo(keys, new AddingShard(data, keys), NULL); }
	static ShardInfo* addingSplitLeft( KeyRange keys, AddingShard* oldShard) { return new ShardInfo(keys, new AddingShard(oldShard, keys), NULL); }

	bool isReadable() const { return readWrite != NULL; }
	bool notAssigned() const { return !readWrite && !adding; }
	bool assigned() const { return readWrite || adding; }
	bool isInVersionedData() const { return readWrite || (adding && adding->isTransferred()); }
	void addMutation( Version version, MutationRef const& mutation );
	bool isFetched() const { return readWrite || ( adding && adding->fetchComplete.isSet() ); }

	const char* debugDescribeState() const {
		if (notAssigned()) return "NotAssigned";
		else if (adding && !adding->isTransferred()) return "AddingFetching";
		else if (adding) return "AddingTransferred";
		else return "ReadWrite";
	}
};

struct StorageServerDisk {
	explicit StorageServerDisk( struct StorageServer* data, IKeyValueStore* storage ) : data(data), storage(storage) {}

	void makeNewStorageServerDurable();
	bool makeVersionMutationsDurable( Version& prevStorageVersion, Version newStorageVersion, int64_t& bytesLeft );
	void makeVersionDurable( Version version );
	Future<bool> restoreDurableState();

	void changeLogProtocol(Version version, uint64_t protocol);

	void writeMutation( MutationRef mutation );
	void writeKeyValue( KeyValueRef kv );
	void clearRange( KeyRangeRef keys );

	Future<Void> getError() { return storage->getError(); }
	Future<Void> init() { return storage->init(); }
	Future<Void> commit() { return storage->commit(); }

	// SOMEDAY: Put readNextKeyInclusive in IKeyValueStore
	Future<Key> readNextKeyInclusive( KeyRef key ) { return readFirstKey(storage, KeyRangeRef(key, allKeys.end)); }
	Future<Optional<Value>> readValue( KeyRef key, Optional<UID> debugID = Optional<UID>() ) { return storage->readValue(key, debugID); }
	Future<Optional<Value>> readValuePrefix( KeyRef key, int maxLength, Optional<UID> debugID = Optional<UID>() ) { return storage->readValuePrefix(key, maxLength, debugID); }
	Future<Standalone<VectorRef<KeyValueRef>>> readRange( KeyRangeRef keys, int rowLimit = 1<<30, int byteLimit = 1<<30 ) { return storage->readRange(keys, rowLimit, byteLimit); }

	KeyValueStoreType getKeyValueStoreType() { return storage->getType(); }
	StorageBytes getStorageBytes() { return storage->getStorageBytes(); }

private:
	struct StorageServer* data;
	IKeyValueStore* storage;

	void writeMutations( MutationListRef mutations, Version debugVersion, const char* debugContext );

	ACTOR static Future<Key> readFirstKey( IKeyValueStore* storage, KeyRangeRef range ) {
		Standalone<VectorRef<KeyValueRef>> r = wait( storage->readRange( range, 1 ) );
		if (r.size()) return r[0].key;
		else return range.end;
	}
};

struct UpdateEagerReadInfo {
	vector<KeyRef> keyBegin;
	vector<Key> keyEnd; // these are for ClearRange

	vector<pair<KeyRef, int>> keys;
	vector<Optional<Value>> value;

	Arena arena;

	void addMutations( VectorRef<MutationRef> const& mutations ) {
		for(auto& m : mutations)
			addMutation(m);
	}

	void addMutation( MutationRef const& m ) {
		// SOMEDAY: Theoretically we can avoid a read if there is an earlier overlapping ClearRange
		if (m.type == MutationRef::ClearRange && !m.param2.startsWith(systemKeys.end))
			keyBegin.push_back( m.param2 );
		else if (m.type == MutationRef::CompareAndClear) {
			keyBegin.push_back(keyAfter(m.param1, arena));
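			// Reading param2.size() + 1 bytes (rather than the full value) is enough to
			// decide the compare: it distinguishes an exact match from a stored value
			// that merely has the comparand as a prefix.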
			if (keys.size() > 0 && keys.back().first == m.param1) {
				// Don't issue a second read if the last read was for the same key:
				// CompareAndClear is likely to be used right after another atomic operation on that key.
				keys.back().second = std::max(keys.back().second, m.param2.size() + 1);
			} else {
				keys.push_back(pair<KeyRef, int>(m.param1, m.param2.size() + 1));
			}
		} else if ((m.type == MutationRef::AppendIfFits) || (m.type == MutationRef::ByteMin) ||
		           (m.type == MutationRef::ByteMax))
			keys.push_back(pair<KeyRef, int>(m.param1, CLIENT_KNOBS->VALUE_SIZE_LIMIT));
		else if (isAtomicOp((MutationRef::Type) m.type))
			keys.push_back(pair<KeyRef, int>(m.param1, m.param2.size()));
	}

	void finishKeyBegin() {
		std::sort(keyBegin.begin(), keyBegin.end());
		keyBegin.resize( std::unique(keyBegin.begin(), keyBegin.end()) - keyBegin.begin() );
		std::sort(keys.begin(), keys.end(), [](const pair<KeyRef, int>& lhs, const pair<KeyRef, int>& rhs) { return (lhs.first < rhs.first) || (lhs.first == rhs.first && lhs.second > rhs.second); } );
		keys.resize(std::unique(keys.begin(), keys.end(), [](const pair<KeyRef, int>& lhs, const pair<KeyRef, int>& rhs) { return lhs.first == rhs.first; } ) - keys.begin());
		// value gets populated in doEagerReads
	}

	Optional<Value>& getValue(KeyRef key) {
		int i = std::lower_bound(keys.begin(), keys.end(), pair<KeyRef, int>(key, 0), [](const pair<KeyRef, int>& lhs, const pair<KeyRef, int>& rhs) { return lhs.first < rhs.first; } ) - keys.begin();
		ASSERT( i < keys.size() && keys[i].first == key );
		return value[i];
	}

	KeyRef getKeyEnd( KeyRef key ) {
		int i = std::lower_bound(keyBegin.begin(), keyBegin.end(), key) - keyBegin.begin();
		ASSERT( i < keyBegin.size() && keyBegin[i] == key );
		return keyEnd[i];
	}
};

const int VERSION_OVERHEAD = 64 + sizeof(Version) + sizeof(Standalone<VersionUpdateRef>) + // mutationLog, 64b overhead for map
	2 * (64 + sizeof(Version) + sizeof(Reference<VersionedMap<KeyRef, ValueOrClearToRef>::PTreeT>)); // versioned map [ x2 for createNewVersion(version+1) ], 64b overhead for map
static int mvccStorageBytes( MutationRef const& m ) { return VersionedMap<KeyRef, ValueOrClearToRef>::overheadPerItem * 2 + (MutationRef::OVERHEAD_BYTES + m.param1.size() + m.param2.size()) * 2; }
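// Note: the x2 factors in mvccStorageBytes appear to account for each mutation being
// held in two places at once, the per-version mutation log and the versioned map (whose
// per-version overhead is itself doubled above for createNewVersion(version+1)), so
// bytesInput charges both copies.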

struct FetchInjectionInfo {
	Arena arena;
	vector<VerUpdateRef> changes;
};

struct StorageServer {
	typedef VersionedMap<KeyRef, ValueOrClearToRef> VersionedData;

private:
	// versionedData contains sets and clears.

	// * Nonoverlapping: No clear overlaps a set or another clear, or adjoins another clear.
	//   ~ Clears are maximal: If versionedData.at(v) contains a clear [b,e) then
	//     there is a key data[e]@v, or e==allKeys.end, or a shard boundary or former boundary at e

	// * Reads are possible: When k is in a readable shard, for any v in [storageVersion, version.get()],
	//   storage[k] + versionedData.at(v)[k] = database[k] @ v (storage[k] might be @ any version in [durableVersion, storageVersion])

	// * Transferred shards are partially readable: When k is in an adding, transferred shard, for any v in [transferredVersion, version.get()],
	//   storage[k] + versionedData.at(v)[k] = database[k] @ v

	// * versionedData contains versions [storageVersion(), version.get()]. It might also contain version (version.get()+1), in which changeDurableVersion may be deleting ghosts, and/or it might
	//   contain later versions if applyUpdate is on the stack.

	// * Old shards are erased: versionedData.atLatest() has entries (sets or intersecting clears) only for keys in readable or adding,transferred shards.
	//   Earlier versions may have extra entries for shards that *were* readable or adding,transferred when those versions were the latest, but they eventually are forgotten.

	// * Old mutations are erased: All items in versionedData.atLatest() have insertVersion() > durableVersion(), but views
	//   at older versions may contain older items which are also in storage (this is OK because of idempotency)
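	//
	// Example (illustrative): if versionedData.at(v) contains a clear [b,e) and a set
	// e := x, then a read of [b,e] at v returns only e := x; the clear shadows whatever
	// older entries storage still holds in [b,e) (see readRange()/merge() below).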

	VersionedData versionedData;
	std::map<Version, Standalone<VersionUpdateRef>> mutationLog; // versions (durableVersion, version]

public:
	Tag tag;
	vector<pair<Version,Tag>> history;
	vector<pair<Version,Tag>> allHistory;
	Version poppedAllAfter;
	std::map<Version, Arena> freeable; // for each version, an Arena that must be held until that version is < oldestVersion
	Arena lastArena;
	double cpuUsage;
	double diskUsage;

	std::map<Version, Standalone<VersionUpdateRef>> const& getMutationLog() { return mutationLog; }
	std::map<Version, Standalone<VersionUpdateRef>>& getMutableMutationLog() { return mutationLog; }
	VersionedData const& data() const { return versionedData; }
	VersionedData& mutableData() { return versionedData; }

	void addMutationToMutationLogOrStorage( Version ver, MutationRef m ); // Appends m to mutationLog@ver, or to storage if ver==invalidVersion

	// Update the byteSample, and write the updates to the mutation log@ver, or to storage if ver==invalidVersion
	void byteSampleApplyMutation( MutationRef const& m, Version ver );
	void byteSampleApplySet( KeyValueRef kv, Version ver );
	void byteSampleApplyClear( KeyRangeRef range, Version ver );

	void popVersion(Version v, bool popAllTags = false) {
		if(logSystem) {
			if(v > poppedAllAfter) {
				popAllTags = true;
				poppedAllAfter = std::numeric_limits<Version>::max();
			}

			vector<pair<Version,Tag>>* hist = &history;
			vector<pair<Version,Tag>> allHistoryCopy;
			if(popAllTags) {
				allHistoryCopy = allHistory;
				hist = &allHistoryCopy;
			}

			while(hist->size() && v > hist->back().first ) {
				logSystem->pop( v, hist->back().second );
				hist->pop_back();
			}
			if(hist->size()) {
				logSystem->pop( v, hist->back().second );
			} else {
				logSystem->pop( v, tag );
			}
		}
	}

	Standalone<VersionUpdateRef>& addVersionToMutationLog(Version v) {
		// return existing version...
		auto m = mutationLog.find(v);
		if (m != mutationLog.end())
			return m->second;

		// ...or create a new one
		auto& u = mutationLog[v];
		u.version = v;
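		// Share lastArena across consecutive versions so many small per-version logs
		// amortize allocation; once it has grown to 64KB, start a fresh 4KB arena.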
		if (lastArena.getSize() >= 65536) lastArena = Arena(4096);
		u.arena() = lastArena;
		counters.bytesInput += VERSION_OVERHEAD;
		return u;
	}

	MutationRef addMutationToMutationLog(Standalone<VersionUpdateRef> &mLV, MutationRef const& m) {
		byteSampleApplyMutation(m, mLV.version);
		counters.bytesInput += mvccStorageBytes(m);
		return mLV.mutations.push_back_deep( mLV.arena(), m );
	}

	StorageServerDisk storage;

	KeyRangeMap< Reference<ShardInfo> > shards;
	uint64_t shardChangeCounter; // max( shards->changecounter )

	// newestAvailableVersion[k]
	//   == invalidVersion -> k is unavailable at all versions
	//   <= storageVersion -> k is unavailable at all versions (but might be read anyway from storage if we are in the process of committing makeShardDurable)
	//   == v              -> k is readable (from storage+versionedData) @ [storageVersion,v], and not being updated when version increases
	//   == latestVersion  -> k is readable (from storage+versionedData) @ [storageVersion,version.get()], and thus stays available when version increases
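	// Example (illustrative): when fetchKeys finishes making a shard [a,b) readable, it
	// sets newestAvailableVersion[a,b) = latestVersion, so reads in [a,b) stay valid as
	// version.get() advances without further per-version bookkeeping.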
	CoalescedKeyRangeMap< Version > newestAvailableVersion;

	CoalescedKeyRangeMap< Version > newestDirtyVersion; // Similar to newestAvailableVersion, but includes (only) keys that were only partly available (due to cancelled fetchKeys)

	// The following are in rough order from newest to oldest
	Version lastTLogVersion, lastVersionWithData, restoredVersion;
	NotifiedVersion version;
	NotifiedVersion desiredOldestVersion; // We can increase oldestVersion (and then durableVersion) to this version when the disk permits
	NotifiedVersion oldestVersion; // See also storageVersion()
	NotifiedVersion durableVersion; // At least this version will be readable from storage after a power failure
	Version rebootAfterDurableVersion;
	int8_t primaryLocality;

	Deque<std::pair<Version,Version>> recoveryVersionSkips;
	int64_t versionLag; // An estimate for how many versions it takes for the data to move from the logs to this storage server

	uint64_t logProtocol;

	Reference<ILogSystem> logSystem;
	Reference<ILogSystem::IPeekCursor> logCursor;

	UID thisServerID;
	Key sk;
	Reference<AsyncVar<ServerDBInfo>> db;
	Database cx;

	StorageServerMetrics metrics;
	CoalescedKeyRangeMap<bool, int64_t, KeyBytesMetric<int64_t>> byteSampleClears;
	AsyncVar<bool> byteSampleClearsTooLarge;
	Future<Void> byteSampleRecovery;
	Future<Void> durableInProgress;

	AsyncMap<Key,bool> watches;
	int64_t watchBytes;
	int64_t numWatches;
	AsyncVar<bool> noRecentUpdates;
	double lastUpdate;

	Int64MetricHandle readQueueSizeMetric;

	std::string folder;

	// defined only during splitMutations()/addMutation()
	UpdateEagerReadInfo *updateEagerReads;

	FlowLock durableVersionLock;
	FlowLock fetchKeysParallelismLock;
	vector< Promise<FetchInjectionInfo*> > readyFetchKeys;

	int64_t instanceID;

	Promise<Void> otherError;
	Promise<Void> coreStarted;
	bool shuttingDown;

	bool behind;

	bool debug_inApplyUpdate;
	double debug_lastValidateTime;

	int maxQueryQueue;
	int getAndResetMaxQueryQueueSize() {
		int val = maxQueryQueue;
		maxQueryQueue = 0;
		return val;
	}

	Optional<LatencyBandConfig> latencyBandConfig;

	struct Counters {
		CounterCollection cc;
		Counter allQueries, getKeyQueries, getValueQueries, getRangeQueries, finishedQueries, rowsQueried, bytesQueried, watchQueries;
		Counter bytesInput, bytesDurable, bytesFetched,
			mutationBytes; // Like bytesInput but without MVCC accounting
		Counter mutations, setMutations, clearRangeMutations, atomicMutations;
		Counter updateBatches, updateVersions;
		Counter loops;
		Counter fetchWaitingMS, fetchWaitingCount, fetchExecutingMS, fetchExecutingCount;

		LatencyBands readLatencyBands;

		Counters(StorageServer* self)
			: cc("StorageServer", self->thisServerID.toString()),
			getKeyQueries("GetKeyQueries", cc),
			getValueQueries("GetValueQueries", cc),
			getRangeQueries("GetRangeQueries", cc),
			allQueries("QueryQueue", cc),
			finishedQueries("FinishedQueries", cc),
			rowsQueried("RowsQueried", cc),
			bytesQueried("BytesQueried", cc),
			watchQueries("WatchQueries", cc),
			bytesInput("BytesInput", cc),
			bytesDurable("BytesDurable", cc),
			bytesFetched("BytesFetched", cc),
			mutationBytes("MutationBytes", cc),
			mutations("Mutations", cc),
			setMutations("SetMutations", cc),
			clearRangeMutations("ClearRangeMutations", cc),
			atomicMutations("AtomicMutations", cc),
			updateBatches("UpdateBatches", cc),
			updateVersions("UpdateVersions", cc),
			loops("Loops", cc),
			fetchWaitingMS("FetchWaitingMS", cc),
			fetchWaitingCount("FetchWaitingCount", cc),
			fetchExecutingMS("FetchExecutingMS", cc),
			fetchExecutingCount("FetchExecutingCount", cc),
			readLatencyBands("ReadLatencyMetrics", self->thisServerID, SERVER_KNOBS->STORAGE_LOGGING_DELAY)
		{
			specialCounter(cc, "LastTLogVersion", [self](){ return self->lastTLogVersion; });
			specialCounter(cc, "Version", [self](){ return self->version.get(); });
			specialCounter(cc, "StorageVersion", [self](){ return self->storageVersion(); });
			specialCounter(cc, "DurableVersion", [self](){ return self->durableVersion.get(); });
			specialCounter(cc, "DesiredOldestVersion", [self](){ return self->desiredOldestVersion.get(); });
			specialCounter(cc, "VersionLag", [self](){ return self->versionLag; });

			specialCounter(cc, "FetchKeysFetchActive", [self](){ return self->fetchKeysParallelismLock.activePermits(); });
			specialCounter(cc, "FetchKeysWaiting", [self](){ return self->fetchKeysParallelismLock.waiters(); });

			specialCounter(cc, "QueryQueueMax", [self](){ return self->getAndResetMaxQueryQueueSize(); });

			specialCounter(cc, "BytesStored", [self](){ return self->metrics.byteSample.getEstimate(allKeys); });
			specialCounter(cc, "ActiveWatches", [self](){ return self->numWatches; });
			specialCounter(cc, "WatchBytes", [self](){ return self->watchBytes; });

			specialCounter(cc, "KvstoreBytesUsed", [self](){ return self->storage.getStorageBytes().used; });
			specialCounter(cc, "KvstoreBytesFree", [self](){ return self->storage.getStorageBytes().free; });
			specialCounter(cc, "KvstoreBytesAvailable", [self](){ return self->storage.getStorageBytes().available; });
			specialCounter(cc, "KvstoreBytesTotal", [self](){ return self->storage.getStorageBytes().total; });
		}
	} counters;

	StorageServer(IKeyValueStore* storage, Reference<AsyncVar<ServerDBInfo>> const& db, StorageServerInterface const& ssi)
		: instanceID(g_random->randomUniqueID().first()),
		storage(this, storage), db(db),
		lastTLogVersion(0), lastVersionWithData(0), restoredVersion(0),
		rebootAfterDurableVersion(std::numeric_limits<Version>::max()),
		durableInProgress(Void()),
		versionLag(0), primaryLocality(tagLocalityInvalid),
		updateEagerReads(0),
		shardChangeCounter(0),
		fetchKeysParallelismLock(SERVER_KNOBS->FETCH_KEYS_PARALLELISM_BYTES),
		shuttingDown(false), debug_inApplyUpdate(false), debug_lastValidateTime(0), watchBytes(0), numWatches(0),
		logProtocol(0), counters(this), tag(invalidTag), maxQueryQueue(0), thisServerID(ssi.id()),
		readQueueSizeMetric(LiteralStringRef("StorageServer.ReadQueueSize")),
		behind(false), byteSampleClears(false, LiteralStringRef("\xff\xff\xff")), noRecentUpdates(false),
		lastUpdate(now()), poppedAllAfter(std::numeric_limits<Version>::max()), cpuUsage(0.0), diskUsage(0.0)
	{
		version.initMetric(LiteralStringRef("StorageServer.Version"), counters.cc.id);
		oldestVersion.initMetric(LiteralStringRef("StorageServer.OldestVersion"), counters.cc.id);
		durableVersion.initMetric(LiteralStringRef("StorageServer.DurableVersion"), counters.cc.id);
		desiredOldestVersion.initMetric(LiteralStringRef("StorageServer.DesiredOldestVersion"), counters.cc.id);

		newestAvailableVersion.insert(allKeys, invalidVersion);
		newestDirtyVersion.insert(allKeys, invalidVersion);
		addShard( ShardInfo::newNotAssigned( allKeys ) );

		cx = openDBOnServer(db, TaskDefaultEndpoint, true, true);
	}
	//~StorageServer() { fclose(log); }

	// Puts the given shard into shards. The caller is responsible for adding shards
	// for all ranges in shards.getAffectedRangesAfterInsertion(newShard->keys), because these
	// shards are invalidated by the call.
	void addShard( ShardInfo* newShard ) {
		ASSERT( !newShard->keys.empty() );
		newShard->changeCounter = ++shardChangeCounter;
		//TraceEvent("AddShard", this->thisServerID).detail("KeyBegin", newShard->keys.begin).detail("KeyEnd", newShard->keys.end).detail("State", newShard->isReadable() ? "Readable" : newShard->notAssigned() ? "NotAssigned" : "Adding").detail("Version", this->version.get());
		/*auto affected = shards.getAffectedRangesAfterInsertion( newShard->keys, Reference<ShardInfo>() );
		for(auto i = affected.begin(); i != affected.end(); ++i)
			shards.insert( *i, Reference<ShardInfo>() );*/
		shards.insert( newShard->keys, Reference<ShardInfo>(newShard) );
	}
	void addMutation(Version version, MutationRef const& mutation, KeyRangeRef const& shard, UpdateEagerReadInfo* eagerReads );
	void setInitialVersion(Version ver) {
		version = ver;
		desiredOldestVersion = ver;
		oldestVersion = ver;
		durableVersion = ver;
		lastVersionWithData = ver;
		restoredVersion = ver;

		mutableData().createNewVersion(ver);
		mutableData().forgetVersionsBefore(ver);
	}

	// This is the maximum version that might be read from storage (the minimum version is durableVersion)
	Version storageVersion() const { return oldestVersion.get(); }

	bool isReadable( KeyRangeRef const& keys ) {
		auto sh = shards.intersectingRanges(keys);
		for(auto i = sh.begin(); i != sh.end(); ++i)
			if (!i->value()->isReadable())
				return false;
		return true;
	}

	void checkChangeCounter( uint64_t oldShardChangeCounter, KeyRef const& key ) {
		if (oldShardChangeCounter != shardChangeCounter &&
			shards[key]->changeCounter > oldShardChangeCounter)
		{
			TEST(true); // shard change during getValueQ
			throw wrong_shard_server();
		}
	}

	void checkChangeCounter( uint64_t oldShardChangeCounter, KeyRangeRef const& keys ) {
		if (oldShardChangeCounter != shardChangeCounter) {
			auto sh = shards.intersectingRanges(keys);
			for(auto i = sh.begin(); i != sh.end(); ++i)
				if (i->value()->changeCounter > oldShardChangeCounter) {
					TEST(true); // shard change during range operation
					throw wrong_shard_server();
				}
		}
	}

	Counter::Value queueSize() {
		return counters.bytesInput.getValue() - counters.bytesDurable.getValue();
	}

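	// Backpressure penalty attached to each reply: 1.0 while the write queue
	// (bytesInput - bytesDurable) is at most TARGET_BYTES - SPRING_BYTES, then growing
	// linearly by 1 for each additional SPRING_BYTES of queued data.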
	double getPenalty() {
		return std::max(1.0, (queueSize() - (SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER - 2.0*SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER)) / SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER);
	}
};

// If and only if key:=value is in (storage+versionedData), // NOT ACTUALLY: and key < allKeys.end,
// and H(key) < |key+value|/bytesPerSample,
// let sampledSize = max(|key+value|,bytesPerSample)
// persistByteSampleKeys.begin()+key := sampledSize is in storage
// (key,sampledSize) is in byteSample

// So P(key is sampled) * sampledSize == |key+value|
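//
// Example (illustrative, with a hypothetical bytesPerSample = 10000): a 100-byte
// key+value pair is sampled with probability 100/10000 = 1%; if sampled, it is recorded
// with sampledSize = max(100, 10000) = 10000, so its expected contribution to the
// estimate is 0.01 * 10000 = 100 bytes, exactly its true size.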

void StorageServer::byteSampleApplyMutation( MutationRef const& m, Version ver ) {
	if (m.type == MutationRef::ClearRange)
		byteSampleApplyClear( KeyRangeRef(m.param1, m.param2), ver );
	else if (m.type == MutationRef::SetValue)
		byteSampleApplySet( KeyValueRef(m.param1, m.param2), ver );
	else
		ASSERT(false); // Mutation of unknown type modifying byte sample
}

#pragma endregion

/////////////////////////////////// Validation ///////////////////////////////////////
#pragma region Validation
bool validateRange( StorageServer::VersionedData::ViewAtVersion const& view, KeyRangeRef range, Version version, UID id, Version minInsertVersion ) {
	// * Nonoverlapping: No clear overlaps a set or another clear, or adjoins another clear.
	// * Old mutations are erased: All items in versionedData.atLatest() have insertVersion() > durableVersion()

	TraceEvent("ValidateRange", id).detail("KeyBegin", range.begin).detail("KeyEnd", range.end).detail("Version", version);
	KeyRef k;
	bool ok = true;
	bool kIsClear = false;
	auto i = view.lower_bound(range.begin);
	if (i != view.begin()) --i;
	for(; i != view.end() && i.key() < range.end; ++i) {
		ASSERT( i.insertVersion() > minInsertVersion );
		if (kIsClear && i->isClearTo() ? i.key() <= k : i.key() < k) {
			TraceEvent(SevError,"InvalidRange",id).detail("Key1", k).detail("Key2", i.key()).detail("Version", version);
			ok = false;
		}
		//ASSERT( i.key() >= k );
		kIsClear = i->isClearTo();
		k = kIsClear ? i->getEndKey() : i.key();
	}
	return ok;
}

void validate(StorageServer* data, bool force = false) {
	try {
		if (force || (EXPENSIVE_VALIDATION)) {
			data->newestAvailableVersion.validateCoalesced();
			data->newestDirtyVersion.validateCoalesced();

			for(auto s = data->shards.ranges().begin(); s != data->shards.ranges().end(); ++s) {
				ASSERT( s->value()->keys == s->range() );
				ASSERT( !s->value()->keys.empty() );
			}

			for(auto s = data->shards.ranges().begin(); s != data->shards.ranges().end(); ++s)
				if (s->value()->isReadable()) {
					auto ar = data->newestAvailableVersion.intersectingRanges(s->range());
					for(auto a = ar.begin(); a != ar.end(); ++a)
						ASSERT( a->value() == latestVersion );
				}

			// * versionedData contains versions [storageVersion(), version.get()]. It might also contain version (version.get()+1), in which changeDurableVersion may be deleting ghosts, and/or it might
			//   contain later versions if applyUpdate is on the stack.
			ASSERT( data->data().getOldestVersion() == data->storageVersion() );
			ASSERT( data->data().getLatestVersion() == data->version.get() || data->data().getLatestVersion() == data->version.get()+1 || (data->debug_inApplyUpdate && data->data().getLatestVersion() > data->version.get()) );

			auto latest = data->data().atLatest();

			// * Old shards are erased: versionedData.atLatest() has entries (sets or clear *begins*) only for keys in readable or adding,transferred shards.
			for(auto s = data->shards.ranges().begin(); s != data->shards.ranges().end(); ++s) {
				ShardInfo* shard = s->value().getPtr();
				if (!shard->isInVersionedData()) {
					if (latest.lower_bound(s->begin()) != latest.lower_bound(s->end())) {
						TraceEvent(SevError, "VF", data->thisServerID).detail("LastValidTime", data->debug_lastValidateTime).detail("KeyBegin", s->begin()).detail("KeyEnd", s->end())
							.detail("FirstKey", latest.lower_bound(s->begin()).key()).detail("FirstInsertV", latest.lower_bound(s->begin()).insertVersion());
					}
					ASSERT( latest.lower_bound(s->begin()) == latest.lower_bound(s->end()) );
				}
			}

			latest.validate();
			validateRange(latest, allKeys, data->version.get(), data->thisServerID, data->durableVersion.get());

			data->debug_lastValidateTime = now();
		}
	} catch (...) {
		TraceEvent(SevError, "ValidationFailure", data->thisServerID).detail("LastValidTime", data->debug_lastValidateTime);
		throw;
	}
}
#pragma endregion

void
updateProcessStats(StorageServer* self)
{
	if (g_network->isSimulated()) {
		// diskUsage and cpuUsage are not relevant in the simulator,
		// and relying on the actual values could break seed determinism
		self->cpuUsage = 100.0;
		self->diskUsage = 100.0;
		return;
	}

	SystemStatistics sysStats = getSystemStatistics();
	if (sysStats.initialized) {
		self->cpuUsage = 100 * sysStats.processCPUSeconds / sysStats.elapsed;
		self->diskUsage = 100 * std::max(0.0, (sysStats.elapsed - sysStats.processDiskIdleSeconds) / sysStats.elapsed);
	}
}

///////////////////////////////////// Queries /////////////////////////////////
#pragma region Queries
ACTOR Future<Version> waitForVersion( StorageServer* data, Version version ) {
	// This could become an Actor transparently, but for now it just does the lookup
	if (version == latestVersion)
		version = std::max(Version(1), data->version.get());
	if (version < data->oldestVersion.get() || version <= 0) throw transaction_too_old();
	else if (version <= data->version.get())
		return version;

	if(data->behind && version > data->version.get()) {
		throw process_behind();
	}

	if(g_random->random01() < 0.001)
		TraceEvent("WaitForVersion1000x");
	choose {
		when ( wait( data->version.whenAtLeast(version) ) ) {
			//FIXME: A bunch of these can block with or without the following delay 0.
			//wait( delay(0) ); // don't do a whole bunch of these at once
			if (version < data->oldestVersion.get()) throw transaction_too_old(); // just in case
			return version;
		}
		when ( wait( delay( SERVER_KNOBS->FUTURE_VERSION_DELAY ) ) ) {
			if(g_random->random01() < 0.001)
				TraceEvent(SevWarn, "ShardServerFutureVersion1000x", data->thisServerID)
					.detail("Version", version)
					.detail("MyVersion", data->version.get())
					.detail("ServerID", data->thisServerID);
			throw future_version();
		}
	}
}

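// Like waitForVersion(), but without the transaction_too_old check: watchValue_impl()
// below may legitimately ask for a version older than oldestVersion.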
ACTOR Future<Version> waitForVersionNoTooOld( StorageServer* data, Version version ) {
	// This could become an Actor transparently, but for now it just does the lookup
	if (version == latestVersion)
		version = std::max(Version(1), data->version.get());
	if (version <= data->version.get())
		return version;
	choose {
		when ( wait( data->version.whenAtLeast(version) ) ) {
			return version;
		}
		when ( wait( delay( SERVER_KNOBS->FUTURE_VERSION_DELAY ) ) ) {
			if(g_random->random01() < 0.001)
				TraceEvent(SevWarn, "ShardServerFutureVersion1000x", data->thisServerID)
					.detail("Version", version)
					.detail("MyVersion", data->version.get())
					.detail("ServerID", data->thisServerID);
			throw future_version();
		}
	}
}

ACTOR Future<Void> getValueQ( StorageServer* data, GetValueRequest req ) {
	state int64_t resultSize = 0;

	try {
		++data->counters.getValueQueries;
		++data->counters.allQueries;
		++data->readQueueSizeMetric;
		data->maxQueryQueue = std::max<int>( data->maxQueryQueue, data->counters.allQueries.getValue() - data->counters.finishedQueries.getValue());

		// Active load balancing runs at a very high priority (to obtain accurate queue lengths)
		// so we need to downgrade here
		wait( delay(0, TaskDefaultEndpoint) );

		if( req.debugID.present() )
			g_traceBatch.addEvent("GetValueDebug", req.debugID.get().first(), "getValueQ.DoRead"); //.detail("TaskID", g_network->getCurrentTask());

		state Optional<Value> v;
		state Version version = wait( waitForVersion( data, req.version ) );
		if( req.debugID.present() )
			g_traceBatch.addEvent("GetValueDebug", req.debugID.get().first(), "getValueQ.AfterVersion"); //.detail("TaskID", g_network->getCurrentTask());

		state uint64_t changeCounter = data->shardChangeCounter;

		if (!data->shards[req.key]->isReadable()) {
			//TraceEvent("WrongShardServer", data->thisServerID).detail("Key", req.key).detail("Version", version).detail("In", "getValueQ");
			throw wrong_shard_server();
		}

		state int path = 0;
		auto i = data->data().at(version).lastLessOrEqual(req.key);
		if (i && i->isValue() && i.key() == req.key) {
			v = (Value)i->getValue();
			path = 1;
		} else if (!i || !i->isClearTo() || i->getEndKey() <= req.key) {
			path = 2;
			Optional<Value> vv = wait( data->storage.readValue( req.key, req.debugID ) );
			// Validate that while we were reading the data we didn't lose the version or shard
			if (version < data->storageVersion()) {
				TEST(true); // transaction_too_old after readValue
				throw transaction_too_old();
			}
			data->checkChangeCounter(changeCounter, req.key);
			v = vv;
		}

		debugMutation("ShardGetValue", version, MutationRef(MutationRef::DebugKey, req.key, v.present()?v.get():LiteralStringRef("<null>")));
		debugMutation("ShardGetPath", version, MutationRef(MutationRef::DebugKey, req.key, path==0?LiteralStringRef("0"):path==1?LiteralStringRef("1"):LiteralStringRef("2")));

		/*
		StorageMetrics m;
		m.bytesPerKSecond = req.key.size() + (v.present() ? v.get().size() : 0);
		m.iosPerKSecond = 1;
		data->metrics.notify(req.key, m);
		*/

		if (v.present()) {
			++data->counters.rowsQueried;
			resultSize = v.get().size();
			data->counters.bytesQueried += resultSize;
		}

		if( req.debugID.present() )
			g_traceBatch.addEvent("GetValueDebug", req.debugID.get().first(), "getValueQ.AfterRead"); //.detail("TaskID", g_network->getCurrentTask());

		GetValueReply reply(v);
		reply.penalty = data->getPenalty();
		req.reply.send(reply);
	} catch (Error& e) {
		if(!canReplyWith(e))
			throw;
		req.reply.sendError(e);
	}

	++data->counters.finishedQueries;
	--data->readQueueSizeMetric;
	if(data->latencyBandConfig.present()) {
		int maxReadBytes = data->latencyBandConfig.get().readConfig.maxReadBytes.orDefault(std::numeric_limits<int>::max());
		data->counters.readLatencyBands.addMeasurement(timer()-req.requestTime, resultSize > maxReadBytes);
	}

	return Void();
}

ACTOR Future<Void> watchValue_impl( StorageServer* data, WatchValueRequest req ) {
	try {
		++data->counters.watchQueries;

		if( req.debugID.present() )
			g_traceBatch.addEvent("WatchValueDebug", req.debugID.get().first(), "watchValueQ.Before"); //.detail("TaskID", g_network->getCurrentTask());

		Version version = wait( waitForVersionNoTooOld( data, req.version ) );
		if( req.debugID.present() )
			g_traceBatch.addEvent("WatchValueDebug", req.debugID.get().first(), "watchValueQ.AfterVersion"); //.detail("TaskID", g_network->getCurrentTask());

		loop {
			try {
				state Version latest = data->data().latestVersion;
				state Future<Void> watchFuture = data->watches.onChange(req.key);
				GetValueRequest getReq( req.key, latest, req.debugID );
				state Future<Void> getValue = getValueQ( data, getReq ); // we are relying on the delay zero at the top of getValueQ, if removed we need one here
				GetValueReply reply = wait( getReq.reply.getFuture() );
				//TraceEvent("WatcherCheckValue").detail("Key", req.key ).detail("Value", req.value ).detail("CurrentValue", v ).detail("Ver", latest);

				debugMutation("ShardWatchValue", latest, MutationRef(MutationRef::DebugKey, req.key, reply.value.present() ? StringRef( reply.value.get() ) : LiteralStringRef("<null>") ) );

				if( req.debugID.present() )
					g_traceBatch.addEvent("WatchValueDebug", req.debugID.get().first(), "watchValueQ.AfterRead"); //.detail("TaskID", g_network->getCurrentTask());

				if( reply.value != req.value ) {
					req.reply.send( latest );
					return Void();
				}

				if( data->watchBytes > SERVER_KNOBS->MAX_STORAGE_SERVER_WATCH_BYTES ) {
					TEST(true); // Too many watches, reverting to polling
					req.reply.sendError( watch_cancelled() );
					return Void();
				}

				++data->numWatches;
				data->watchBytes += ( req.key.expectedSize() + req.value.expectedSize() + 1000 );
				try {
					wait( watchFuture );
					--data->numWatches;
					data->watchBytes -= ( req.key.expectedSize() + req.value.expectedSize() + 1000 );
				} catch( Error &e ) {
					--data->numWatches;
					data->watchBytes -= ( req.key.expectedSize() + req.value.expectedSize() + 1000 );
					throw;
				}
			} catch( Error &e ) {
				if( e.code() != error_code_transaction_too_old )
					throw;
			}
		}
	} catch (Error& e) {
		if(!canReplyWith(e))
			throw;
		req.reply.sendError(e);
	}
	return Void();
}

ACTOR Future<Void> watchValueQ( StorageServer* data, WatchValueRequest req ) {
	state Future<Void> watch = watchValue_impl( data, req );
	state double startTime = now();

	loop {
		double timeoutDelay = -1;
		if(data->noRecentUpdates.get()) {
			timeoutDelay = std::max(CLIENT_KNOBS->FAST_WATCH_TIMEOUT - (now() - startTime), 0.0);
		} else if(!BUGGIFY) {
			timeoutDelay = std::max(CLIENT_KNOBS->WATCH_TIMEOUT - (now() - startTime), 0.0);
		}
		choose {
			when( wait( watch ) ) {
				return Void();
			}
			when( wait( timeoutDelay < 0 ? Never() : delay(timeoutDelay) ) ) {
				req.reply.sendError( timed_out() );
				return Void();
			}
			when( wait( data->noRecentUpdates.onChange()) ) {}
		}
	}
}

ACTOR Future<Void> getShardState_impl( StorageServer* data, GetShardStateRequest req ) {
	ASSERT( req.mode != GetShardStateRequest::NO_WAIT );

	loop {
		std::vector<Future<Void>> onChange;

		for( auto t : data->shards.intersectingRanges( req.keys ) ) {
			if( !t.value()->assigned() ) {
				onChange.push_back( delay( SERVER_KNOBS->SHARD_READY_DELAY ) );
				break;
			}

			if( req.mode == GetShardStateRequest::READABLE && !t.value()->isReadable() )
				onChange.push_back( t.value()->adding->readWrite.getFuture() );

			if( req.mode == GetShardStateRequest::FETCHING && !t.value()->isFetched() )
				onChange.push_back( t.value()->adding->fetchComplete.getFuture() );
		}

		if( !onChange.size() ) {
			req.reply.send(std::make_pair(data->version.get(), data->durableVersion.get()));
			return Void();
		}

		wait( waitForAll( onChange ) );
		wait( delay(0) ); // onChange could have been triggered by cancellation, let things settle before rechecking
	}
}

ACTOR Future<Void> getShardStateQ( StorageServer* data, GetShardStateRequest req ) {
	choose {
		when( wait( getShardState_impl( data, req ) ) ) {}
		when( wait( delay( g_network->isSimulated() ? 10 : 60 ) ) ) {
			req.reply.sendError( timed_out() );
		}
	}
	return Void();
}

void merge( Arena& arena, VectorRef<KeyValueRef>& output, VectorRef<KeyValueRef> const& base,
			StorageServer::VersionedData::iterator& start, StorageServer::VersionedData::iterator const& end,
			int versionedDataCount, int limit, bool stopAtEndOfBase, int limitBytes = 1<<30 )
// Combines data from base (at an older version) with sets from newer versions in [start, end) and appends the first (up to) |limit| rows to output
// If limit<0, base and output are in descending order, and start->key()>end->key(), but start is still inclusive and end is exclusive
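// Example (illustrative): with base = [(a,1),(c,2)] from disk and one newer set (b,3) in
// [start,end), a call with limit = 3 appends [(a,1),(b,3),(c,2)] to output; with
// limit = -3 and both inputs in descending order it appends [(c,2),(b,3),(a,1)].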
{
	if (limit==0) return;
	int originalLimit = abs(limit) + output.size();
	bool forward = limit>0;
	if (!forward) limit = -limit;
	int accumulatedBytes = 0;

	KeyValueRef const* baseStart = base.begin();
	KeyValueRef const* baseEnd = base.end();
	while (baseStart!=baseEnd && start!=end && --limit>=0 && accumulatedBytes < limitBytes) {
		if (forward ? baseStart->key < start.key() : baseStart->key > start.key())
			output.push_back_deep( arena, *baseStart++ );
		else {
			output.push_back_deep( arena, KeyValueRef(start.key(), start->getValue()) );
			if (baseStart->key == start.key()) ++baseStart;
			if (forward) ++start; else --start;
		}
		accumulatedBytes += sizeof(KeyValueRef) + output.end()[-1].expectedSize();
	}
	while (baseStart!=baseEnd && --limit>=0 && accumulatedBytes < limitBytes) {
		output.push_back_deep( arena, *baseStart++ );
		accumulatedBytes += sizeof(KeyValueRef) + output.end()[-1].expectedSize();
	}
	if( !stopAtEndOfBase ) {
		while (start!=end && --limit>=0 && accumulatedBytes < limitBytes) {
			output.push_back_deep( arena, KeyValueRef(start.key(), start->getValue()) );
			accumulatedBytes += sizeof(KeyValueRef) + output.end()[-1].expectedSize();
			if (forward) ++start; else --start;
		}
	}
	ASSERT( output.size() <= originalLimit );
}

// readRange reads up to |limit| rows from the given range and version, combining data->storage and data->versionedData.
// If limit>=0, it returns the first rows in the range (sorted ascending), otherwise the last rows (sorted descending).
// readRange has O(|result|) + O(log |data|) cost
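// The forward pass alternates between the in-memory versioned data and disk: it reads a
// run of versioned sets up to the next clear (or the end of the range), reads the same
// key range from disk, and merges the two; when it reaches a clear in versionedData it
// skips the corresponding disk range entirely.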
ACTOR Future<GetKeyValuesReply> readRange( StorageServer* data, Version version, KeyRange range, int limit, int* pLimitBytes ) {
	state GetKeyValuesReply result;
	state StorageServer::VersionedData::ViewAtVersion view = data->data().at(version);
	state StorageServer::VersionedData::iterator vStart = view.end();
	state StorageServer::VersionedData::iterator vEnd = view.end();
	state KeyRef readBegin;
	state KeyRef readEnd;
	state Key readBeginTemp;
	state int vCount;
	//state UID rrid = g_random->randomUniqueID();
	//state int originalLimit = limit;
	//state int originalLimitBytes = *pLimitBytes;
	//state bool track = rrid.first() == 0x1bc134c2f752187cLL;

	// FIXME: Review pLimitBytes behavior
	// if (limit >= 0) we are reading forward, else backward

	if (limit >= 0) {
		// We might care about a clear beginning before start that
		// runs into range
		vStart = view.lastLessOrEqual(range.begin);
		if (vStart && vStart->isClearTo() && vStart->getEndKey() > range.begin)
			readBegin = vStart->getEndKey();
		else
			readBegin = range.begin;

		vStart = view.lower_bound(readBegin);

		/*if (track) {
			printf("readRange(%llx, @%lld, '%s'-'%s')\n", data->thisServerID.first(), version, printable(range.begin).c_str(), printable(range.end).c_str());
			printf("mvcc:\n");
			vEnd = view.upper_bound(range.end);
			for(auto r=vStart; r != vEnd; ++r) {
				if (r->isClearTo())
					printf("  '%s'-'%s' cleared\n", printable(r.key()).c_str(), printable(r->getEndKey()).c_str());
				else
					printf("  '%s' := '%s'\n", printable(r.key()).c_str(), printable(r->getValue()).c_str());
			}
		}*/

		while (limit>0 && *pLimitBytes>0 && readBegin < range.end) {
			// ASSERT( vStart == view.lower_bound(readBegin) );
			ASSERT( !vStart || vStart.key() >= readBegin );
			if (vStart) { auto b = vStart; --b; ASSERT( !b || b.key() < readBegin ); }
			ASSERT( data->storageVersion() <= version );

			// Read up to limit items from the view, stopping at the next clear (or the end of the range)
			vEnd = vStart;
			vCount = 0;
			int vSize = 0;
			while (vEnd && vEnd.key() < range.end && !vEnd->isClearTo() && vCount < limit && vSize < *pLimitBytes){
				vSize += sizeof(KeyValueRef) + vEnd->getValue().expectedSize() + vEnd.key().expectedSize();
				++vCount;
				++vEnd;
			}

			// Read the data on disk up to vEnd (or the end of the range)
			readEnd = vEnd ? std::min( vEnd.key(), range.end ) : range.end;
			Standalone<VectorRef<KeyValueRef>> atStorageVersion = wait(
				data->storage.readRange( KeyRangeRef(readBegin, readEnd), limit, *pLimitBytes ) );

			/*if (track) {
				printf("read [%s,%s): %d rows\n", printable(readBegin).c_str(), printable(readEnd).c_str(), atStorageVersion.size());
				for(auto r=atStorageVersion.begin(); r != atStorageVersion.end(); ++r)
					printf("  '%s' := '%s'\n", printable(r->key).c_str(), printable(r->value).c_str());
			}*/

			ASSERT( atStorageVersion.size() <= limit );
			if (data->storageVersion() > version) throw transaction_too_old();

			bool more = atStorageVersion.size()!=0;

			// merge the sets in [vStart,vEnd) with the sets on disk, stopping at the last key from disk if there is 'more'
			int prevSize = result.data.size();
			merge( result.arena, result.data, atStorageVersion, vStart, vEnd, vCount, limit, more, *pLimitBytes );
			limit -= result.data.size() - prevSize;

			for (auto i = &result.data[prevSize]; i != result.data.end(); i++)
				*pLimitBytes -= sizeof(KeyValueRef) + i->expectedSize();

			// Setup for the next iteration
			if (more) { // if there might be more data, begin reading right after what we already found to find out
				//if (track) printf("more\n");
				if (!(limit<=0 || *pLimitBytes<=0 || result.data.end()[-1].key == atStorageVersion.end()[-1].key))
					TraceEvent(SevError, "ReadRangeIssue", data->thisServerID).detail("ReadBegin", readBegin).detail("ReadEnd", readEnd)
						.detail("VStart", vStart ? vStart.key() : LiteralStringRef("nil")).detail("VEnd", vEnd ? vEnd.key() : LiteralStringRef("nil"))
						.detail("AtStorageVersionBack", atStorageVersion.end()[-1].key).detail("ResultBack", result.data.end()[-1].key)
						.detail("Limit", limit).detail("LimitBytes", *pLimitBytes).detail("ResultSize", result.data.size()).detail("PrevSize", prevSize);
				readBegin = readBeginTemp = keyAfter( result.data.end()[-1].key );
				ASSERT( limit<=0 || *pLimitBytes<=0 || result.data.end()[-1].key == atStorageVersion.end()[-1].key );
			} else if (vStart && vStart->isClearTo()){ // if vStart is a clear, skip it.
				//if (track) printf("skip clear\n");
				readBegin = vStart->getEndKey(); // next disk read should start at the end of the clear
				++vStart;
			} else { // Otherwise, continue at readEnd
				//if (track) printf("continue\n");
				readBegin = readEnd;
			}
		}
		// all but the last item are less than *pLimitBytes
		ASSERT( result.data.size() == 0 || *pLimitBytes + result.data.end()[-1].expectedSize() + sizeof(KeyValueRef) > 0 );
		/*if (*pLimitBytes <= 0)
			TraceEvent(SevWarn, "ReadRangeLimitExceeded")
				.detail("Version", version)
				.detail("Begin", range.begin )
				.detail("End", range.end )
				.detail("LimitRemain", limit)
				.detail("LimitBytesRemain", *pLimitBytes); */

		/*GetKeyValuesReply correct = wait( readRangeOld(data, version, range, originalLimit, originalLimitBytes) );
		bool prefix_equal = true;
		int totalsize = 0;
		int first_difference = -1;
		for(int i=0; i<result.data.size() && i<correct.data.size(); i++) {
			if (result.data[i] != correct.data[i]) {
				first_difference = i;
				prefix_equal = false;
				break;
			}
			totalsize += result.data[i].expectedSize() + sizeof(KeyValueRef);
		}

		// for the following check
		result.more = limit == 0 || *pLimitBytes<=0; // FIXME: Does this have to be exact?
		result.version = version;
		if ( !(totalsize>originalLimitBytes ? prefix_equal : result.data==correct.data) || correct.more != result.more ) {
			TraceEvent(SevError, "IncorrectResult", rrid).detail("Server", data->thisServerID).detail("CorrectRows", correct.data.size())
				.detail("FirstDifference", first_difference).detail("OriginalLimit", originalLimit)
				.detail("ResultRows", result.data.size()).detail("Result0", result.data[0].key).detail("Correct0", correct.data[0].key)
				.detail("ResultN", result.data.size() ? result.data[std::min(correct.data.size(),result.data.size())-1].key : "nil")
				.detail("CorrectN", correct.data.size() ? correct.data[std::min(correct.data.size(),result.data.size())-1].key : "nil");
		}*/
	} else {
		// Reverse read - abandon hope all ye who enter here
		readEnd = range.end;

		vStart = view.lastLess(readEnd);

		// A clear might extend all the way to range.end
		if (vStart && vStart->isClearTo() && vStart->getEndKey() >= readEnd) {
			readEnd = vStart.key();
			--vStart;
		}

		while (limit < 0 && *pLimitBytes > 0 && readEnd > range.begin) {
			vEnd = vStart;
			vCount = 0;
			int vSize=0;
			while (vEnd && vEnd.key() >= range.begin && !vEnd->isClearTo() && vCount < -limit && vSize < *pLimitBytes){
				vSize += sizeof(KeyValueRef) + vEnd->getValue().expectedSize() + vEnd.key().expectedSize();
				++vCount;
				--vEnd;
			}

			readBegin = range.begin;
			if (vEnd)
				readBegin = std::max( readBegin, vEnd->isClearTo() ? vEnd->getEndKey() : vEnd.key() );

			Standalone<VectorRef<KeyValueRef>> atStorageVersion = wait( data->storage.readRange( KeyRangeRef(readBegin, readEnd), limit ) );
			if (data->storageVersion() > version) throw transaction_too_old();

			int prevSize = result.data.size();
			merge( result.arena, result.data, atStorageVersion, vStart, vEnd, vCount, limit, false, *pLimitBytes );
			limit += result.data.size() - prevSize;

			for (auto i = &result.data[prevSize]; i != result.data.end(); i++)
				*pLimitBytes -= sizeof(KeyValueRef) + i->expectedSize();

			vStart = vEnd;
			readEnd = readBegin;

			if (vStart && vStart->isClearTo()) {
				ASSERT( vStart.key() < readEnd );
				readEnd = vStart.key();
				--vStart;
			}
		}
	}
	result.more = limit == 0 || *pLimitBytes<=0; // FIXME: Does this have to be exact?
	result.version = version;
	return result;
}

bool selectorInRange( KeySelectorRef const& sel, KeyRangeRef const& range ) {
	// Returns true if the given range suffices to at least begin to resolve the given KeySelectorRef
	return sel.getKey() >= range.begin && (sel.isBackward() ? sel.getKey() <= range.end : sel.getKey() < range.end);
}

ACTOR Future<Key> findKey( StorageServer* data, KeySelectorRef sel, Version version, KeyRange range, int* pOffset)
// Attempts to find the key indicated by sel in the data at version, within range.
// Precondition: selectorInRange(sel, range)
// If it is found, offset is set to 0 and a key is returned which falls inside range.
// If the search would depend on any key outside range OR if the key selector offset is too large (range read returns too many bytes), it returns either
//   a negative offset and a key in [range.begin, sel.getKey()], indicating the key is (the first key <= returned key) + offset, or
//   a positive offset and a key in (sel.getKey(), range.end], indicating the key is (the first key >= returned key) + offset-1
// The range passed in to this function should specify a shard. If range.begin is repeatedly not the beginning of a shard, then it is possible to get stuck looping here
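// Example (illustrative): firstGreaterOrEqual(k) arrives as (k, orEqual=false, offset=1),
// so forward=true, skipEqualKey=false, distance=1 and the answer is the first key >= k;
// lastLessThan(k) arrives as (k, orEqual=false, offset=0), so forward=false,
// skipEqualKey=true, distance=1 and the answer is the last key < k, read backward.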
1206 {
1207 ASSERT( version != latestVersion );
1208 ASSERT( selectorInRange(sel, range) && version >= data->oldestVersion.get() );
1209
1210 // Count forward or backward distance items, skipping the first one if it == key and skipEqualKey
1211 state bool forward = sel.offset > 0; // If forward, result >= sel.getKey(); else result <= sel.getKey()
1212 state int sign = forward ? +1 : -1;
1213 state bool skipEqualKey = sel.orEqual == forward;
1214 state int distance = forward ? sel.offset : 1-sel.offset;
1215
1216 //Don't limit the number of bytes if this is a trivial key selector (there will be at most two items returned from the read range in this case)
1217 state int maxBytes;
1218 if (sel.offset <= 1 && sel.offset >= 0)
1219 maxBytes = std::numeric_limits<int>::max();
1220 else
1221 maxBytes = BUGGIFY ? SERVER_KNOBS->BUGGIFY_LIMIT_BYTES : SERVER_KNOBS->STORAGE_LIMIT_BYTES;
1222
1223 state GetKeyValuesReply rep = wait( readRange( data, version, forward ? KeyRangeRef(sel.getKey(), range.end) : KeyRangeRef(range.begin, keyAfter(sel.getKey())), (distance + skipEqualKey)*sign, &maxBytes ) );
1224 state bool more = rep.more && rep.data.size() != distance + skipEqualKey;
1225
1226 //If we get only one result in the reverse direction as a result of the data being too large, we could get stuck in a loop
1227 if(more && !forward && rep.data.size() == 1) {
1228 TEST(true); //Reverse key selector returned only one result in range read
1229 maxBytes = std::numeric_limits<int>::max();
1230 GetKeyValuesReply rep2 = wait( readRange( data, version, KeyRangeRef(range.begin, keyAfter(sel.getKey())), -2, &maxBytes ) );
1231 rep = rep2;
1232 more = rep.more && rep.data.size() != distance + skipEqualKey;
1233 ASSERT(rep.data.size() == 2 || !more);
1234 }
1235
1236 int index = distance-1;
1237 if (skipEqualKey && rep.data.size() && rep.data[0].key == sel.getKey() )
1238 ++index;
1239
1240 if (index < rep.data.size()) {
1241 *pOffset = 0;
1242 return rep.data[ index ].key;
1243 } else {
1244 // FIXME: If range.begin=="" && !forward, return success?
1245 *pOffset = index - rep.data.size() + 1;
1246 if (!forward) *pOffset = -*pOffset;
1247
1248 if (more) {
1249 TEST(true); // Key selector read range had more results
1250
1251 ASSERT(rep.data.size());
1252 Key returnKey = forward ? keyAfter(rep.data.back().key) : rep.data.back().key;
1253
1254 //This is possible if key/value pairs are very large and only one result is returned by a "last less than" query
1255 //SOMEDAY: graceful handling of exceptionally sized values
1256 ASSERT(returnKey != sel.getKey());
1257
1258 return returnKey;
1259 }
1260 else
1261 return forward ? range.end : range.begin;
1262 }
1263 }
1264
1265 KeyRange getShardKeyRange( StorageServer* data, const KeySelectorRef& sel )
1266 // Returns largest range such that the shard state isReadable and selectorInRange(sel, range) or wrong_shard_server if no such range exists
1267 {
1268 auto i = sel.isBackward() ? data->shards.rangeContainingKeyBefore( sel.getKey() ) : data->shards.rangeContaining( sel.getKey() );
1269 if (!i->value()->isReadable()) throw wrong_shard_server();
1270 ASSERT( selectorInRange(sel, i->range()) );
1271 return i->range();
1272 }
1273
1274 ACTOR Future<Void> getKeyValues( StorageServer* data, GetKeyValuesRequest req )
1275 // Throws a wrong_shard_server if the keys in the request or result depend on data outside this server OR if a large selector offset prevents
1276 // all data from being read in one range read
1277 {
1278 state int64_t resultSize = 0;
1279
1280 ++data->counters.getRangeQueries;
1281 ++data->counters.allQueries;
1282 ++data->readQueueSizeMetric;
1283 data->maxQueryQueue = std::max<int>( data->maxQueryQueue, data->counters.allQueries.getValue() - data->counters.finishedQueries.getValue());
1284
1285 // Active load balancing runs at a very high priority (to obtain accurate queue lengths)
1286 // so we need to downgrade here
1287 wait( delay(0, TaskDefaultEndpoint) );
1288
1289 try {
1290 if( req.debugID.present() )
1291 g_traceBatch.addEvent("TransactionDebug", req.debugID.get().first(), "storageserver.getKeyValues.Before");
1292 state Version version = wait( waitForVersion( data, req.version ) );
1293
1294 state uint64_t changeCounter = data->shardChangeCounter;
1295 // try {
1296 state KeyRange shard = getShardKeyRange( data, req.begin );
1297
1298 if( req.debugID.present() )
1299 g_traceBatch.addEvent("TransactionDebug", req.debugID.get().first(), "storageserver.getKeyValues.AfterVersion");
1300 //.detail("ShardBegin", shard.begin).detail("ShardEnd", shard.end);
1301 //} catch (Error& e) { TraceEvent("WrongShardServer", data->thisServerID).detail("Begin", req.begin.toString()).detail("End", req.end.toString()).detail("Version", version).detail("Shard", "None").detail("In", "getKeyValues>getShardKeyRange"); throw e; }
1302
1303 if ( !selectorInRange(req.end, shard) && !(req.end.isFirstGreaterOrEqual() && req.end.getKey() == shard.end) ) {
1304 // TraceEvent("WrongShardServer1", data->thisServerID).detail("Begin", req.begin.toString()).detail("End", req.end.toString()).detail("Version", version).detail("ShardBegin", shard.begin).detail("ShardEnd", shard.end).detail("In", "getKeyValues>checkShardExtents");
1305 throw wrong_shard_server();
1306 }
1307
1308 state int offset1;
1309 state int offset2;
1310 state Future<Key> fBegin = req.begin.isFirstGreaterOrEqual() ? Future<Key>(req.begin.getKey()) : findKey( data, req.begin, version, shard, &offset1 );
1311 state Future<Key> fEnd = req.end.isFirstGreaterOrEqual() ? Future<Key>(req.end.getKey()) : findKey( data, req.end, version, shard, &offset2 );
1312 state Key begin = wait(fBegin);
1313 state Key end = wait(fEnd);
1314 if( req.debugID.present() )
1315 g_traceBatch.addEvent("TransactionDebug", req.debugID.get().first(), "storageserver.getKeyValues.AfterKeys");
1316 //.detail("Off1",offset1).detail("Off2",offset2).detail("ReqBegin",req.begin.getKey()).detail("ReqEnd",req.end.getKey());
1317
1318 // Offsets of zero indicate begin/end keys in this shard, which obviously means we can answer the query
1319 // An end offset of 1 is also OK because the end key is exclusive, so if the first key of the next shard is the end the last actual key returned must be from this shard.
1320 // A begin offset of 1 is also OK because then either begin is past end or equal to end (so the result is definitely empty)
1321 if ((offset1 && offset1!=1) || (offset2 && offset2!=1)) {
1322 TEST(true); // wrong_shard_server due to offset
1323 // We could detect when offset1 takes us off the beginning of the database or offset2 takes us off the end, and return a clipped range rather
1324 // than an error (since that is what the NativeAPI.getRange will do anyway via its "slow path"), but we would have to add some flags to the response
1325 // to encode whether we went off the beginning and the end, since it needs that information.
1326 //TraceEvent("WrongShardServer2", data->thisServerID).detail("Begin", req.begin.toString()).detail("End", req.end.toString()).detail("Version", version).detail("ShardBegin", shard.begin).detail("ShardEnd", shard.end).detail("In", "getKeyValues>checkOffsets").detail("BeginKey", begin).detail("EndKey", end).detail("BeginOffset", offset1).detail("EndOffset", offset2);
1327 throw wrong_shard_server();
1328 }
1329
1330 if (begin >= end) {
1331 if( req.debugID.present() )
1332 g_traceBatch.addEvent("TransactionDebug", req.debugID.get().first(), "storageserver.getKeyValues.Send");
1333 //.detail("Begin",begin).detail("End",end);
1334
1335 GetKeyValuesReply none;
1336 none.version = version;
1337 none.more = false;
1338 none.penalty = data->getPenalty();
1339
1340 data->checkChangeCounter( changeCounter, KeyRangeRef( std::min<KeyRef>(req.begin.getKey(), req.end.getKey()), std::max<KeyRef>(req.begin.getKey(), req.end.getKey()) ) );
1341 req.reply.send( none );
1342 } else {
1343 state int remainingLimitBytes = req.limitBytes;
1344
1345 GetKeyValuesReply _r = wait( readRange(data, version, KeyRangeRef(begin, end), req.limit, &remainingLimitBytes) );
1346 GetKeyValuesReply r = _r;
1347
1348 if( req.debugID.present() )
1349 g_traceBatch.addEvent("TransactionDebug", req.debugID.get().first(), "storageserver.getKeyValues.AfterReadRange");
1350 //.detail("Begin",begin).detail("End",end).detail("SizeOf",r.data.size());
1351 data->checkChangeCounter( changeCounter, KeyRangeRef( std::min<KeyRef>(begin, std::min<KeyRef>(req.begin.getKey(), req.end.getKey())), std::max<KeyRef>(end, std::max<KeyRef>(req.begin.getKey(), req.end.getKey())) ) );
1352 if (EXPENSIVE_VALIDATION) {
1353 for (int i = 0; i < r.data.size(); i++)
1354 ASSERT(r.data[i].key >= begin && r.data[i].key < end);
1355 ASSERT(r.data.size() <= std::abs(req.limit));
1356 }
1357
1358 /*for( int i = 0; i < r.data.size(); i++ ) {
1359 StorageMetrics m;
1360 m.bytesPerKSecond = r.data[i].expectedSize();
1361 m.iosPerKSecond = 1; //FIXME: this should be 1/r.data.size(), but we cannot do that because it is an int
1362 data->metrics.notify(r.data[i].key, m);
1363 }*/
1364
1365 r.penalty = data->getPenalty();
1366 req.reply.send( r );
1367
1368 resultSize = req.limitBytes - remainingLimitBytes;
1369 data->counters.bytesQueried += resultSize;
1370 data->counters.rowsQueried += r.data.size();
1371 }
1372 } catch (Error& e) {
1373 if(!canReplyWith(e))
1374 throw;
1375 req.reply.sendError(e);
1376 }
1377
1378 ++data->counters.finishedQueries;
1379 --data->readQueueSizeMetric;
1380
1381 if(data->latencyBandConfig.present()) {
1382 int maxReadBytes = data->latencyBandConfig.get().readConfig.maxReadBytes.orDefault(std::numeric_limits<int>::max());
1383 int maxSelectorOffset = data->latencyBandConfig.get().readConfig.maxKeySelectorOffset.orDefault(std::numeric_limits<int>::max());
1384 data->counters.readLatencyBands.addMeasurement(timer()-req.requestTime, resultSize > maxReadBytes || abs(req.begin.offset) > maxSelectorOffset || abs(req.end.offset) > maxSelectorOffset);
1385 }
1386
1387 return Void();
1388 }
1389
1390 ACTOR Future<Void> getKey( StorageServer* data, GetKeyRequest req ) {
1391 state int64_t resultSize = 0;
1392
1393 ++data->counters.getKeyQueries;
1394 ++data->counters.allQueries;
1395 ++data->readQueueSizeMetric;
1396 data->maxQueryQueue = std::max<int>( data->maxQueryQueue, data->counters.allQueries.getValue() - data->counters.finishedQueries.getValue());
1397
1398 // Active load balancing runs at a very high priority (to obtain accurate queue lengths)
1399 // so we need to downgrade here
1400 wait( delay(0, TaskDefaultEndpoint) );
1401
1402 try {
1403 state Version version = wait( waitForVersion( data, req.version ) );
1404 state uint64_t changeCounter = data->shardChangeCounter;
1405 state KeyRange shard = getShardKeyRange( data, req.sel );
1406
1407 state int offset;
1408 Key k = wait( findKey( data, req.sel, version, shard, &offset ) );
1409
1410 data->checkChangeCounter( changeCounter, KeyRangeRef( std::min<KeyRef>(req.sel.getKey(), k), std::max<KeyRef>(req.sel.getKey(), k) ) );
1411
1412 KeySelector updated;
1413 if (offset < 0)
1414 updated = firstGreaterOrEqual(k)+offset; // first thing on this shard OR (large offset case) smallest key retrieved in range read
1415 else if (offset > 0)
1416 updated = firstGreaterOrEqual(k)+offset-1; // first thing on next shard OR (large offset case) keyAfter largest key retrieved in range read
1417 else
1418 updated = KeySelectorRef(k,true,0); //found
1419
1420 resultSize = k.size();
1421 data->counters.bytesQueried += resultSize;
1422 ++data->counters.rowsQueried;
1423
1424 GetKeyReply reply(updated);
1425 reply.penalty = data->getPenalty();
1426 req.reply.send(reply);
1427 }
1428 catch (Error& e) {
1429 //if (e.code() == error_code_wrong_shard_server) TraceEvent("WrongShardServer").detail("In","getKey");
1430 if(!canReplyWith(e))
1431 throw;
1432 req.reply.sendError(e);
1433 }
1434
1435 ++data->counters.finishedQueries;
1436 --data->readQueueSizeMetric;
1437 if(data->latencyBandConfig.present()) {
1438 int maxReadBytes = data->latencyBandConfig.get().readConfig.maxReadBytes.orDefault(std::numeric_limits<int>::max());
1439 int maxSelectorOffset = data->latencyBandConfig.get().readConfig.maxKeySelectorOffset.orDefault(std::numeric_limits<int>::max());
1440 data->counters.readLatencyBands.addMeasurement(timer()-req.requestTime, resultSize > maxReadBytes || abs(req.sel.offset) > maxSelectorOffset);
1441 }
1442
1443 return Void();
1444 }
1445
1446 void getQueuingMetrics( StorageServer* self, StorageQueuingMetricsRequest const& req ) {
1447 StorageQueuingMetricsReply reply;
1448 reply.localTime = now();
1449 reply.instanceID = self->instanceID;
1450 reply.bytesInput = self->counters.bytesInput.getValue();
1451 reply.bytesDurable = self->counters.bytesDurable.getValue();
1452
1453 reply.storageBytes = self->storage.getStorageBytes();
1454
1455 reply.version = self->version.get();
1456 reply.cpuUsage = self->cpuUsage;
1457 reply.diskUsage = self->diskUsage;
1458 reply.durableVersion = self->durableVersion.get();
1459 req.reply.send( reply );
1460 }
1461
1462 #pragma endregion
1463
1464 /////////////////////////// Updates ////////////////////////////////
1465 #pragma region Updates
1466
1467 ACTOR Future<Void> doEagerReads( StorageServer* data, UpdateEagerReadInfo* eager ) {
1468 eager->finishKeyBegin();
1469
1470 vector<Future<Key>> keyEnd( eager->keyBegin.size() );
1471 for(int i=0; i<keyEnd.size(); i++)
1472 keyEnd[i] = data->storage.readNextKeyInclusive( eager->keyBegin[i] );
1473
1474 state Future<vector<Key>> futureKeyEnds = getAll(keyEnd);
1475
1476 vector<Future<Optional<Value>>> value( eager->keys.size() );
1477 for(int i=0; i<value.size(); i++)
1478 value[i] = data->storage.readValuePrefix( eager->keys[i].first, eager->keys[i].second );
1479
1480 state Future<vector<Optional<Value>>> futureValues = getAll(value);
1481 state vector<Key> keyEndVal = wait( futureKeyEnds );
1482 vector<Optional<Value>> optionalValues = wait( futureValues );
1483
1484 eager->keyEnd = keyEndVal;
1485 eager->value = optionalValues;
1486
1487 return Void();
1488 }
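// The two halves of an eager read correspond to the two uses in expandMutation() below:
// keyBegin/keyEnd answer "what is the next key at the storage version?" so ClearRanges can
// be expanded over dead space, and keys/value answer "what is the old value (or a prefix of
// it)?" so atomic ops (AddValue, Min, Max, ...) can be resolved into plain SetValues.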
1489
1490 bool changeDurableVersion( StorageServer* data, Version desiredDurableVersion ) {
1491 // Remove entries from the latest version of data->versionedData that haven't changed since they were inserted
1492 // before or at desiredDurableVersion, to maintain the invariants for versionedData.
1493 // Such entries remain in older versions of versionedData until they are forgotten, because it is expensive to dig them out.
1494 // We also remove everything up to and including newDurableVersion from mutationLog, and everything
1495 // up to but excluding desiredDurableVersion from freeable
1496 // May return false if only part of the work has been done, in which case the caller must call again with the same parameters
1497
1498 auto& verData = data->mutableData();
1499 ASSERT( verData.getLatestVersion() == data->version.get() || verData.getLatestVersion() == data->version.get()+1 );
1500
1501 Version nextDurableVersion = desiredDurableVersion;
1502
1503 auto mlv = data->getMutationLog().begin();
1504 if (mlv != data->getMutationLog().end() && mlv->second.version <= desiredDurableVersion) {
1505 auto& v = mlv->second;
1506 nextDurableVersion = v.version;
1507 data->freeable[ data->version.get() ].dependsOn( v.arena() );
1508
1509 if (verData.getLatestVersion() <= data->version.get())
1510 verData.createNewVersion( data->version.get()+1 );
1511
1512 int64_t bytesDurable = VERSION_OVERHEAD;
1513 for(auto m = v.mutations.begin(); m; ++m) {
1514 bytesDurable += mvccStorageBytes(*m);
1515 auto i = verData.atLatest().find(m->param1);
1516 if (i) {
1517 ASSERT( i.key() == m->param1 );
1518 ASSERT( i.insertVersion() >= nextDurableVersion );
1519 if (i.insertVersion() == nextDurableVersion)
1520 verData.erase(i);
1521 }
1522 if (m->type == MutationRef::SetValue) {
1523 // A set can split a clear, so there might be another entry immediately after this one that should also be cleaned up
1524 i = verData.atLatest().upper_bound(m->param1);
1525 if (i) {
1526 ASSERT( i.insertVersion() >= nextDurableVersion );
1527 if (i.insertVersion() == nextDurableVersion)
1528 verData.erase(i);
1529 }
1530 }
1531 }
1532 data->counters.bytesDurable += bytesDurable;
1533 }
1534
1535 if (EXPENSIVE_VALIDATION) {
1536 // Check that the above loop did its job
1537 auto view = data->data().atLatest();
1538 for(auto i = view.begin(); i != view.end(); ++i)
1539 ASSERT( i.insertVersion() > nextDurableVersion );
1540 }
1541 data->getMutableMutationLog().erase(data->getMutationLog().begin(), data->getMutationLog().upper_bound(nextDurableVersion));
1542 data->freeable.erase( data->freeable.begin(), data->freeable.lower_bound(nextDurableVersion) );
1543
1544 Future<Void> checkFatalError = data->otherError.getFuture();
1545 data->durableVersion.set( nextDurableVersion );
1546 if (checkFatalError.isReady()) checkFatalError.get();
1547
1548 //TraceEvent("ForgotVersionsBefore", data->thisServerID).detail("Version", nextDurableVersion);
1549 validate(data);
1550
1551 return nextDurableVersion == desiredDurableVersion;
1552 }
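// Illustrative sketch: an entry inserted into versionedData at version 5 becomes redundant in
// the latest view once version 5 is durable, because reads at versions >= 5 find the same data
// in the storage engine; older read versions keep seeing the in-memory entry until they are
// forgotten.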
1553
1554 Optional<MutationRef> clipMutation( MutationRef const& m, KeyRangeRef range ) {
1555 if (isSingleKeyMutation((MutationRef::Type) m.type)) {
1556 if (range.contains(m.param1)) return m;
1557 }
1558 else if (m.type == MutationRef::ClearRange) {
1559 KeyRangeRef i = range & KeyRangeRef(m.param1, m.param2);
1560 if (!i.empty())
1561 return MutationRef( (MutationRef::Type)m.type, i.begin, i.end );
1562 }
1563 else
1564 ASSERT(false);
1565 return Optional<MutationRef>();
1566 }
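// Example: clipping ClearRange["a","z") to the range ["m","q") yields ClearRange["m","q"),
// while clipping SetValue("b") to ["m","q") yields an empty Optional (the mutation is dropped).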
1567
1568 bool expandMutation( MutationRef& m, StorageServer::VersionedData const& data, UpdateEagerReadInfo* eager, KeyRef eagerTrustedEnd, Arena& ar ) {
1569 // After this function call, m should be copied into an arena immediately (before modifying data, shards, or eager)
1570 if (m.type == MutationRef::ClearRange) {
1571 // Expand the clear
1572 const auto& d = data.atLatest();
1573
1574 // If another clear overlaps the beginning of this one, engulf it
1575 auto i = d.lastLess(m.param1);
1576 if (i && i->isClearTo() && i->getEndKey() >= m.param1)
1577 m.param1 = i.key();
1578
1579 // If another clear overlaps the end of this one, engulf it; otherwise expand
1580 i = d.lastLessOrEqual(m.param2);
1581 if (i && i->isClearTo() && i->getEndKey() >= m.param2) {
1582 m.param2 = i->getEndKey();
1583 } else {
1584 // Expand to the next set or clear (from storage or latestVersion), and if it
1585 // is a clear, engulf it as well
1586 i = d.lower_bound(m.param2);
1587 KeyRef endKeyAtStorageVersion = m.param2 == eagerTrustedEnd ? eagerTrustedEnd : std::min( eager->getKeyEnd( m.param2 ), eagerTrustedEnd );
1588 if (!i || endKeyAtStorageVersion < i.key())
1589 m.param2 = endKeyAtStorageVersion;
1590 else if (i->isClearTo())
1591 m.param2 = i->getEndKey();
1592 else
1593 m.param2 = i.key();
1594 }
1595 }
1596 else if (m.type != MutationRef::SetValue && (m.type)) { // an atomic op (ClearRange was handled above): resolve it against the old value into a SetValue
1597
1598 Optional<StringRef> oldVal;
1599 auto it = data.atLatest().lastLessOrEqual(m.param1);
1600 if (it != data.atLatest().end() && it->isValue() && it.key() == m.param1)
1601 oldVal = it->getValue();
1602 else if (it != data.atLatest().end() && it->isClearTo() && it->getEndKey() > m.param1) {
1603 TEST(true); // Atomic op right after a clear.
1604 }
1605 else {
1606 Optional<Value>& oldThing = eager->getValue(m.param1);
1607 if (oldThing.present())
1608 oldVal = oldThing.get();
1609 }
1610
1611 switch(m.type) {
1612 case MutationRef::AddValue:
1613 m.param2 = doLittleEndianAdd(oldVal, m.param2, ar);
1614 break;
1615 case MutationRef::And:
1616 m.param2 = doAnd(oldVal, m.param2, ar);
1617 break;
1618 case MutationRef::Or:
1619 m.param2 = doOr(oldVal, m.param2, ar);
1620 break;
1621 case MutationRef::Xor:
1622 m.param2 = doXor(oldVal, m.param2, ar);
1623 break;
1624 case MutationRef::AppendIfFits:
1625 m.param2 = doAppendIfFits(oldVal, m.param2, ar);
1626 break;
1627 case MutationRef::Max:
1628 m.param2 = doMax(oldVal, m.param2, ar);
1629 break;
1630 case MutationRef::Min:
1631 m.param2 = doMin(oldVal, m.param2, ar);
1632 break;
1633 case MutationRef::ByteMin:
1634 m.param2 = doByteMin(oldVal, m.param2, ar);
1635 break;
1636 case MutationRef::ByteMax:
1637 m.param2 = doByteMax(oldVal, m.param2, ar);
1638 break;
1639 case MutationRef::MinV2:
1640 m.param2 = doMinV2(oldVal, m.param2, ar);
1641 break;
1642 case MutationRef::AndV2:
1643 m.param2 = doAndV2(oldVal, m.param2, ar);
1644 break;
1645 case MutationRef::CompareAndClear:
1646 if (oldVal.present() && m.param2 == oldVal.get()) {
1647 m.type = MutationRef::ClearRange;
1648 m.param2 = keyAfter(m.param1, ar);
1649 return expandMutation(m, data, eager, eagerTrustedEnd, ar);
1650 }
1651 return false;
1652 }
1653 m.type = MutationRef::SetValue;
1654 }
1655
1656 return true;
1657 }
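// Example of clear expansion: if versionedData already holds clearTo entries ["a"->"b") and
// ["c"->"d"), an incoming ClearRange["b","c") is expanded to ClearRange["a","d"), so adjacent
// clears coalesce in the MVCC tree instead of accumulating as fragments.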
1658
1659 bool isClearContaining( StorageServer::VersionedData::ViewAtVersion const& view, KeyRef key ) {
1660 auto i = view.lastLessOrEqual(key);
1661 return i && i->isClearTo() && i->getEndKey() > key;
1662 }
1663
1664 void applyMutation( StorageServer *self, MutationRef const& m, Arena& arena, StorageServer::VersionedData &data ) {
1665 // m is expected to be in arena already
1666 // Clear split keys are added to arena
1667 StorageMetrics metrics;
1668 metrics.bytesPerKSecond = mvccStorageBytes( m ) / 2;
1669 metrics.iosPerKSecond = 1;
1670 self->metrics.notify(m.param1, metrics);
1671
1672 if (m.type == MutationRef::SetValue) {
1673 auto prev = data.atLatest().lastLessOrEqual(m.param1);
1674 if (prev && prev->isClearTo() && prev->getEndKey() > m.param1) {
1675 ASSERT( prev.key() <= m.param1 );
1676 KeyRef end = prev->getEndKey();
1677 // the insert version of the previous clear is preserved for the "left half", because in changeDurableVersion() the previous clear is still responsible for removing it
1678 // insert() invalidates prev, so prev.key() is not safe to pass to it by reference
1679 data.insert( KeyRef(prev.key()), ValueOrClearToRef::clearTo( m.param1 ), prev.insertVersion() ); // overwritten by below insert if empty
1680 KeyRef nextKey = keyAfter(m.param1, arena);
1681 if ( end != nextKey ) {
1682 ASSERT( end > nextKey );
1683 // the insert version of the "right half" is not preserved, because in changeDurableVersion() this set is responsible for removing it
1684 // FIXME: This copy is technically an asymptotic problem, definitely a waste of memory (copy of keyAfter is a waste, but not asymptotic)
1685 data.insert( nextKey, ValueOrClearToRef::clearTo( KeyRef(arena, end) ) );
1686 }
1687 }
1688 data.insert( m.param1, ValueOrClearToRef::value(m.param2) );
1689 self->watches.trigger( m.param1 );
1690 } else if (m.type == MutationRef::ClearRange) {
1691 data.erase( m.param1, m.param2 );
1692 ASSERT( m.param2 > m.param1 );
1693 ASSERT( !isClearContaining( data.atLatest(), m.param1 ) );
1694 data.insert( m.param1, ValueOrClearToRef::clearTo(m.param2) );
1695 self->watches.triggerRange( m.param1, m.param2 );
1696 }
1697
1698 }
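// Example of a set splitting a clear: with clearTo ["a"->"e") at the latest version,
// SetValue("c", v) becomes clearTo ["a"->"c") at the clear's original insert version,
// clearTo [keyAfter("c")->"e") at the latest version, and the value entry at "c" itself.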
1699
1700 void removeDataRange( StorageServer *ss, Standalone<VersionUpdateRef> &mLV, KeyRangeMap<Reference<ShardInfo>>& shards, KeyRangeRef range ) {
1701 // modify the latest version of data to remove all sets and trim all clears to exclude range.
1702 // Add a clear to mLV (mutationLog[data.getLatestVersion()]) that ensures all keys in range are removed from the disk when this latest version becomes durable
1703 // mLV is also modified if necessary to ensure that split clears can be forgotten
1704
1705 MutationRef clearRange( MutationRef::ClearRange, range.begin, range.end );
1706 clearRange = ss->addMutationToMutationLog( mLV, clearRange );
1707
1708 auto& data = ss->mutableData();
1709
1710 // Expand the range to the right to include other shards not in versionedData
1711 for( auto r = shards.rangeContaining(range.end); r != shards.ranges().end() && !r->value()->isInVersionedData(); ++r )
1712 range = KeyRangeRef(range.begin, r->end());
1713
1714 auto endClear = data.atLatest().lastLess( range.end );
1715 if (endClear && endClear->isClearTo() && endClear->getEndKey() > range.end ) {
1716 // This clear has been bumped up to insertVersion==data.getLatestVersion and needs a corresponding mutation log entry to forget
1717 MutationRef m( MutationRef::ClearRange, range.end, endClear->getEndKey() );
1718 m = ss->addMutationToMutationLog( mLV, m );
1719 data.insert( m.param1, ValueOrClearToRef::clearTo( m.param2 ) );
1720 }
1721
1722 auto beginClear = data.atLatest().lastLess( range.begin );
1723 if (beginClear && beginClear->isClearTo() && beginClear->getEndKey() > range.begin ) {
1724 // We don't need any special mutationLog entry - because the begin key and insert version are unchanged the original clear
1725 // mutation works to forget this one - but we need range.begin in the right arena
1726 KeyRef rb( mLV.arena(), range.begin );
1727 // insert() invalidates beginClear, so beginClear.key() is not safe to pass to it by reference
1728 data.insert( KeyRef(beginClear.key()), ValueOrClearToRef::clearTo( rb ), beginClear.insertVersion() );
1729 }
1730
1731 data.erase( range.begin, range.end );
1732 }
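// Example: if a clearTo ["a"->"m") straddles range.begin=="f", it is truncated to ["a"->"f")
// with its begin key and insert version unchanged, so the clear's original mutation log entry
// can still forget it when it becomes durable.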
1733
1734 void setAvailableStatus( StorageServer* self, KeyRangeRef keys, bool available );
1735 void setAssignedStatus( StorageServer* self, KeyRangeRef keys, bool nowAssigned );
1736
1737 void coalesceShards(StorageServer *data, KeyRangeRef keys) {
1738 auto shardRanges = data->shards.intersectingRanges(keys);
1739 auto fullRange = data->shards.ranges();
1740
1741 auto iter = shardRanges.begin();
1742 if( iter != fullRange.begin() ) --iter;
1743 auto iterEnd = shardRanges.end();
1744 if( iterEnd != fullRange.end() ) ++iterEnd;
1745
1746 bool lastReadable = false;
1747 bool lastNotAssigned = false;
1748 KeyRangeMap<Reference<ShardInfo>>::Iterator lastRange;
1749
1750 for( ; iter != iterEnd; ++iter) {
1751 if( lastReadable && iter->value()->isReadable() ) {
1752 KeyRange range = KeyRangeRef( lastRange->begin(), iter->end() );
1753 data->addShard( ShardInfo::newReadWrite( range, data) );
1754 iter = data->shards.rangeContaining(range.begin);
1755 } else if( lastNotAssigned && iter->value()->notAssigned() ) {
1756 KeyRange range = KeyRangeRef( lastRange->begin(), iter->end() );
1757 data->addShard( ShardInfo::newNotAssigned( range) );
1758 iter = data->shards.rangeContaining(range.begin);
1759 }
1760
1761 lastReadable = iter->value()->isReadable();
1762 lastNotAssigned = iter->value()->notAssigned();
1763 lastRange = iter;
1764 }
1765 }
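// Example: three adjacent notAssigned shards ["a","b"), ["b","c"), ["c","d") are merged
// pairwise as the iterator advances, leaving a single notAssigned shard ["a","d").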
1766
1767 ACTOR Future<Standalone<RangeResultRef>> tryGetRange( Database cx, Version version, KeyRangeRef keys, GetRangeLimits limits, bool* isTooOld ) {
1768 state Transaction tr( cx );
1769 state Standalone<RangeResultRef> output;
1770 state KeySelectorRef begin = firstGreaterOrEqual( keys.begin );
1771 state KeySelectorRef end = firstGreaterOrEqual( keys.end );
1772
1773 if( *isTooOld )
1774 throw transaction_too_old();
1775
1776 tr.setVersion( version );
1777 limits.minRows = 0;
1778
1779 try {
1780 loop {
1781 Standalone<RangeResultRef> rep = wait( tr.getRange( begin, end, limits, true ) );
1782 limits.decrement( rep );
1783
1784 if( limits.isReached() || !rep.more ) {
1785 if( output.size() ) {
1786 output.arena().dependsOn( rep.arena() );
1787 output.append( output.arena(), rep.begin(), rep.size() );
1788 if( limits.isReached() && rep.readThrough.present() )
1789 output.readThrough = rep.readThrough.get();
1790 } else {
1791 output = rep;
1792 }
1793
1794 output.more = limits.isReached();
1795
1796 return output;
1797 } else if( rep.readThrough.present() ) {
1798 output.arena().dependsOn( rep.arena() );
1799 if( rep.size() ) {
1800 output.append( output.arena(), rep.begin(), rep.size() );
1801 ASSERT( rep.readThrough.get() > rep.end()[-1].key );
1802 } else {
1803 ASSERT( rep.readThrough.get() > keys.begin );
1804 }
1805 begin = firstGreaterOrEqual( rep.readThrough.get() );
1806 } else {
1807 output.arena().dependsOn( rep.arena() );
1808 output.append( output.arena(), rep.begin(), rep.size() );
1809 begin = firstGreaterThan( output.end()[-1].key );
1810 }
1811 }
1812 } catch( Error &e ) {
1813 if( begin.getKey() != keys.begin && ( e.code() == error_code_transaction_too_old || e.code() == error_code_future_version || e.code() == error_code_process_behind ) ) {
1814 if( e.code() == error_code_transaction_too_old )
1815 *isTooOld = true;
1816 output.more = true;
1817 if( begin.isFirstGreaterOrEqual() )
1818 output.readThrough = begin.getKey();
1819 return output;
1820 }
1821 throw;
1822 }
1823 }
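// Note: on a retryable error (transaction_too_old, future_version, process_behind) after some
// progress has been made, tryGetRange returns the partial result with more=true and, when the
// resume point is known exactly, a readThrough so the caller can continue from there.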
1824
1825 template <class T>
1826 void addMutation( T& target, Version version, MutationRef const& mutation ) {
1827 target.addMutation( version, mutation );
1828 }
1829
1830 template <class T>
1831 void addMutation( Reference<T>& target, Version version, MutationRef const& mutation ) {
1832 addMutation(*target, version, mutation);
1833 }
1834
1835 template <class T>
1836 void splitMutations( KeyRangeMap<T>& map, VerUpdateRef const& update ) {
1837 for(auto& m : update.mutations) {
1838 splitMutation(map, m, update.version);
1839 }
1840 }
1841
1842 template <class T>
1843 void splitMutation( KeyRangeMap<T>& map, MutationRef const& m, Version ver ) {
1844 if(isSingleKeyMutation((MutationRef::Type) m.type)) {
1845 if ( !SHORT_CIRCUT_ACTUAL_STORAGE || !normalKeys.contains(m.param1) )
1846 addMutation( map.rangeContaining(m.param1)->value(), ver, m );
1847 }
1848 else if (m.type == MutationRef::ClearRange) {
1849 KeyRangeRef mKeys( m.param1, m.param2 );
1850 if ( !SHORT_CIRCUT_ACTUAL_STORAGE || !normalKeys.contains(mKeys) ){
1851 auto r = map.intersectingRanges( mKeys );
1852 for(auto i = r.begin(); i != r.end(); ++i) {
1853 KeyRangeRef k = mKeys & i->range();
1854 addMutation( i->value(), ver, MutationRef((MutationRef::Type)m.type, k.begin, k.end) );
1855 }
1856 }
1857 }
1858 else
1859 ASSERT(false); // Unknown mutation type in splitMutations
1860 }
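// Example: ClearRange["a","z") over shards ["a","m") and ["m","z") is delivered as
// ClearRange["a","m") to the first shard and ClearRange["m","z") to the second, while a
// single-key mutation goes only to the one shard containing its key.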
1861
1862 ACTOR Future<Void> fetchKeys( StorageServer *data, AddingShard* shard ) {
1863 state TraceInterval interval("FetchKeys");
1864 state KeyRange keys = shard->keys;
1865 state double startt = now();
1866 state int fetchBlockBytes = BUGGIFY ? SERVER_KNOBS->BUGGIFY_BLOCK_BYTES : SERVER_KNOBS->FETCH_BLOCK_BYTES;
1867
1868 // delay(0) to force a return to the run loop before the work of fetchKeys is started.
1869 // This allows adding->start() to be called inline with CSK (changeServerKeys).
1870 wait( data->coreStarted.getFuture() && delay( 0 ) );
1871
1872 try {
1873 debugKeyRange("fetchKeysBegin", data->version.get(), shard->keys);
1874
1875 TraceEvent(SevDebug, interval.begin(), data->thisServerID)
1876 .detail("KeyBegin", shard->keys.begin)
1877 .detail("KeyEnd",shard->keys.end);
1878
1879 validate(data);
1880
1881 // Wait (if necessary) for the latest version at which any key in keys was previously available (+1) to be durable
1882 auto navr = data->newestAvailableVersion.intersectingRanges( keys );
1883 Version lastAvailable = invalidVersion;
1884 for(auto r=navr.begin(); r!=navr.end(); ++r) {
1885 ASSERT( r->value() != latestVersion );
1886 lastAvailable = std::max(lastAvailable, r->value());
1887 }
1888 auto ndvr = data->newestDirtyVersion.intersectingRanges( keys );
1889 for(auto r=ndvr.begin(); r!=ndvr.end(); ++r)
1890 lastAvailable = std::max(lastAvailable, r->value());
1891
1892 if (lastAvailable != invalidVersion && lastAvailable >= data->durableVersion.get()) {
1893 TEST(true); // FetchKeys waits for previous available version to be durable
1894 wait( data->durableVersion.whenAtLeast(lastAvailable+1) );
1895 }
1896
1897 TraceEvent(SevDebug, "FetchKeysVersionSatisfied", data->thisServerID).detail("FKID", interval.pairID);
1898
1899 wait( data->fetchKeysParallelismLock.take( TaskDefaultYield, fetchBlockBytes ) );
1900 state FlowLock::Releaser holdingFKPL( data->fetchKeysParallelismLock, fetchBlockBytes );
1901
1902 state double executeStart = now();
1903 ++data->counters.fetchWaitingCount;
1904 data->counters.fetchWaitingMS += 1000*(executeStart - startt);
1905
1906 // Fetch keys gets called while the update actor is processing mutations. data->version will not be updated until all mutations for a version
1907 // have been processed. We need to take the durableVersionLock to ensure data->version is greater than the version of the mutation which caused
1908 // the fetch to be initiated.
1909 wait( data->durableVersionLock.take() );
1910
1911 shard->phase = AddingShard::Fetching;
1912 state Version fetchVersion = data->version.get();
1913
1914 data->durableVersionLock.release();
1915
1916 wait(delay(0));
1917
1918 TraceEvent(SevDebug, "FetchKeysUnblocked", data->thisServerID).detail("FKID", interval.pairID).detail("Version", fetchVersion);
1919
1920 // Get the history
1921 state int debug_getRangeRetries = 0;
1922 state int debug_nextRetryToLog = 1;
1923 state bool isTooOld = false;
1924
1925 //FIXME: The client cache does not notice when servers are added to a team. To read from a local storage server we must refresh the cache manually.
1926 data->cx->invalidateCache(keys);
1927
1928 loop {
1929 try {
1930 TEST(true); // Fetching keys for transferred shard
1931
1932 state Standalone<RangeResultRef> this_block = wait( tryGetRange( data->cx, fetchVersion, keys, GetRangeLimits( CLIENT_KNOBS->ROW_LIMIT_UNLIMITED, fetchBlockBytes ), &isTooOld ) );
1933
1934 int expectedSize = (int)this_block.expectedSize() + (8-(int)sizeof(KeyValueRef))*this_block.size();
1935
1936 TraceEvent(SevDebug, "FetchKeysBlock", data->thisServerID).detail("FKID", interval.pairID)
1937 .detail("BlockRows", this_block.size()).detail("BlockBytes", expectedSize)
1938 .detail("KeyBegin", keys.begin).detail("KeyEnd", keys.end)
1939 .detail("Last", this_block.size() ? this_block.end()[-1].key : std::string())
1940 .detail("Version", fetchVersion).detail("More", this_block.more);
1941 debugKeyRange("fetchRange", fetchVersion, keys);
1942 for(auto k = this_block.begin(); k != this_block.end(); ++k) debugMutation("fetch", fetchVersion, MutationRef(MutationRef::SetValue, k->key, k->value));
1943
1944 data->counters.bytesFetched += expectedSize;
1945 if( fetchBlockBytes > expectedSize ) {
1946 holdingFKPL.release( fetchBlockBytes - expectedSize );
1947 }
1948
1949 // Wait for permission to proceed
1950 //wait( data->fetchKeysStorageWriteLock.take() );
1951 //state FlowLock::Releaser holdingFKSWL( data->fetchKeysStorageWriteLock );
1952
1953 // Write this_block to storage
1954 state KeyValueRef *kvItr = this_block.begin();
1955 for(; kvItr != this_block.end(); ++kvItr) {
1956 data->storage.writeKeyValue( *kvItr );
1957 wait(yield());
1958 }
1959
1960 kvItr = this_block.begin();
1961 for(; kvItr != this_block.end(); ++kvItr) {
1962 data->byteSampleApplySet( *kvItr, invalidVersion );
1963 wait(yield());
1964 }
1965
1966 if (this_block.more) {
1967 Key nfk = this_block.readThrough.present() ? this_block.readThrough.get() : keyAfter( this_block.end()[-1].key );
1968 if (nfk != keys.end) {
1969 std::deque< Standalone<VerUpdateRef> > updatesToSplit = std::move( shard->updates );
1970
1971 // This actor finishes committing the keys [keys.begin,nfk) that we already fetched.
1972 // The remaining unfetched keys [nfk,keys.end) will become a separate AddingShard with its own fetchKeys.
1973 shard->server->addShard( ShardInfo::addingSplitLeft( KeyRangeRef(keys.begin, nfk), shard ) );
1974 shard->server->addShard( ShardInfo::newAdding( data, KeyRangeRef(nfk, keys.end) ) );
1975 shard = data->shards.rangeContaining( keys.begin ).value()->adding;
1976 auto otherShard = data->shards.rangeContaining( nfk ).value()->adding;
1977 keys = shard->keys;
1978
1979 // Split our prior updates. The ones that apply to our new, restricted key range will go back into shard->updates,
1980 // and the ones delivered to the new shard will be discarded because it is in WaitPrevious phase (hasn't chosen a fetchVersion yet).
1981 // What we are doing here is expensive and could get more expensive if we started having many more blocks per shard. May need optimization in the future.
1982 for(auto u = updatesToSplit.begin(); u != updatesToSplit.end(); ++u)
1983 splitMutations( data->shards, *u );
1984
1985 TEST( true );
1986 TEST( shard->updates.size() );
1987 ASSERT( otherShard->updates.empty() );
1988 }
1989 }
1990
1991 this_block = Standalone<RangeResultRef>();
1992
1993 if (BUGGIFY) wait( delay( 1 ) );
1994
1995 break;
1996 } catch (Error& e) {
1997 TraceEvent("FKBlockFail", data->thisServerID).error(e,true).suppressFor(1.0).detail("FKID", interval.pairID);
1998 if (e.code() == error_code_transaction_too_old){
1999 TEST(true); // A storage server has forgotten the history data we are fetching
2000 Version lastFV = fetchVersion;
2001 fetchVersion = data->version.get();
2002 isTooOld = false;
2003
2004 // Throw away deferred updates from before fetchVersion, since we don't need them to use blocks fetched at that version
2005 while (!shard->updates.empty() && shard->updates[0].version <= fetchVersion) shard->updates.pop_front();
2006
2007 //FIXME: remove when we no longer support upgrades from 5.X
2008 if(debug_getRangeRetries >= 100) {
2009 data->cx->enableLocalityLoadBalance = false;
2010 }
2011
2012 debug_getRangeRetries++;
2013 if (debug_nextRetryToLog==debug_getRangeRetries){
2014 debug_nextRetryToLog += std::min(debug_nextRetryToLog, 1024);
2015 TraceEvent(SevWarn, "FetchPast", data->thisServerID).detail("TotalAttempts", debug_getRangeRetries).detail("FKID", interval.pairID).detail("V", lastFV).detail("N", fetchVersion).detail("E", data->version.get());
2016 }
2017 } else if (e.code() == error_code_future_version || e.code() == error_code_process_behind) {
2018 TEST(true); // fetchKeys got future_version or process_behind, so there must be a huge storage lag somewhere. Keep trying.
2019 } else {
2020 throw;
2021 }
2022 wait( delayJittered( FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY ) );
2023 }
2024 }
2025
2026 //FIXME: remove when we no longer support upgrades from 5.X
2027 data->cx->enableLocalityLoadBalance = true;
2028
2029 // We have completed the fetch and write of the data, now we wait for MVCC window to pass.
2030 // As we have finished this work, we will allow more work to start...
2031 shard->fetchComplete.send(Void());
2032
2033 TraceEvent(SevDebug, "FKBeforeFinalCommit", data->thisServerID).detail("FKID", interval.pairID).detail("SV", data->storageVersion()).detail("DV", data->durableVersion.get());
2034 // Directly commit()ing the IKVS would interfere with updateStorage, possibly resulting in an incomplete version being recovered.
2035 // Instead we wait for the updateStorage loop to commit something (and consequently also what we have written)
2036
2037 wait( data->durableVersion.whenAtLeast( data->storageVersion()+1 ) );
2038 holdingFKPL.release();
2039
2040 TraceEvent(SevDebug, "FKAfterFinalCommit", data->thisServerID).detail("FKID", interval.pairID).detail("SV", data->storageVersion()).detail("DV", data->durableVersion.get());
2041
2042 // Wait to run during update(), after a new batch of versions is received from the tlog but before eager reads take place.
2043 Promise<FetchInjectionInfo*> p;
2044 data->readyFetchKeys.push_back( p );
2045
2046 FetchInjectionInfo* batch = wait( p.getFuture() );
2047 TraceEvent(SevDebug, "FKUpdateBatch", data->thisServerID).detail("FKID", interval.pairID);
2048
2049 shard->phase = AddingShard::Waiting;
2050
2051 // Choose a transferredVersion. This choice and timing ensure that
2052 // * The transferredVersion can be mutated in versionedData
2053 // * The transferredVersion isn't yet committed to storage (so we can write the availability status change)
2054 // * The transferredVersion is <= the version of any of the updates in batch, and if there is an equal version
2055 // its mutations haven't been processed yet
2056 shard->transferredVersion = data->version.get() + 1;
2057 //shard->transferredVersion = batch->changes[0].version; //< FIXME: This obeys the documented properties, and seems "safer" because it never introduces extra versions into the data structure, but violates some ASSERTs currently
2058 data->mutableData().createNewVersion( shard->transferredVersion );
2059 ASSERT( shard->transferredVersion > data->storageVersion() );
2060 ASSERT( shard->transferredVersion == data->data().getLatestVersion() );
2061
2062 TraceEvent(SevDebug, "FetchKeysHaveData", data->thisServerID).detail("FKID", interval.pairID)
2063 .detail("Version", shard->transferredVersion).detail("StorageVersion", data->storageVersion());
2064 validate(data);
2065
2066 // Put the updates that were collected during the FinalCommit phase into the batch at the transferredVersion. Eager reads will be done
2067 // for them by update(), and the mutations will come back through AddingShard::addMutations and be applied to versionedMap and mutationLog as normal.
2068 // The lie about their version is acceptable because this shard will never be read at versions < transferredVersion
2069 for(auto i=shard->updates.begin(); i!=shard->updates.end(); ++i) {
2070 i->version = shard->transferredVersion;
2071 batch->arena.dependsOn(i->arena());
2072 }
2073
2074 int startSize = batch->changes.size();
2075 TEST(startSize); //Adding fetch data to a batch which already has changes
2076 batch->changes.resize( batch->changes.size()+shard->updates.size() );
2077
2078 //FIXME: pass the deque back rather than copy the data
2079 std::copy( shard->updates.begin(), shard->updates.end(), batch->changes.begin()+startSize );
2080 Version checkv = shard->transferredVersion;
2081
2082 for(auto b = batch->changes.begin()+startSize; b != batch->changes.end(); ++b ) {
2083 ASSERT( b->version >= checkv );
2084 checkv = b->version;
2085 for(auto& m : b->mutations)
2086 debugMutation("fetchKeysFinalCommitInject", batch->changes[0].version, m);
2087 }
2088
2089 shard->updates.clear();
2090
2091 setAvailableStatus(data, keys, true); // keys will be available when getLatestVersion()==transferredVersion is durable
2092
2093 // Wait for the transferredVersion (and therefore the shard data) to be committed and durable.
2094 wait( data->durableVersion.whenAtLeast( shard->transferredVersion ) );
2095
2096 ASSERT( data->shards[shard->keys.begin]->assigned() && data->shards[shard->keys.begin]->keys == shard->keys ); // We aren't changing whether the shard is assigned
2097 data->newestAvailableVersion.insert(shard->keys, latestVersion);
2098 shard->readWrite.send(Void());
2099 data->addShard( ShardInfo::newReadWrite(shard->keys, data) ); // invalidates shard!
2100 coalesceShards(data, keys);
2101
2102 validate(data);
2103
2104 ++data->counters.fetchExecutingCount;
2105 data->counters.fetchExecutingMS += 1000*(now() - executeStart);
2106
2107 TraceEvent(SevDebug, interval.end(), data->thisServerID);
2108 } catch (Error &e){
2109 TraceEvent(SevDebug, interval.end(), data->thisServerID).error(e, true).detail("Version", data->version.get());
2110
2111 if (e.code() == error_code_actor_cancelled && !data->shuttingDown && shard->phase >= AddingShard::Fetching) {
2112 if (shard->phase < AddingShard::Waiting) {
2113 data->storage.clearRange( keys );
2114 data->byteSampleApplyClear( keys, invalidVersion );
2115 } else {
2116 ASSERT( data->data().getLatestVersion() > data->version.get() );
2117 removeDataRange( data, data->addVersionToMutationLog(data->data().getLatestVersion()), data->shards, keys );
2118 setAvailableStatus(data, keys, false);
2119 // Prevent another, overlapping fetchKeys from entering the Fetching phase until data->data().getLatestVersion() is durable
2120 data->newestDirtyVersion.insert( keys, data->data().getLatestVersion() );
2121 }
2122 }
2123
2124 TraceEvent(SevError, "FetchKeysError", data->thisServerID)
2125 .error(e)
2126 .detail("Elapsed", now()-startt)
2127 .detail("KeyBegin", keys.begin)
2128 .detail("KeyEnd",keys.end);
2129 if (e.code() != error_code_actor_cancelled)
2130 data->otherError.sendError(e); // Kill the storage server. Are there any recoverable errors?
2131 throw; // goes nowhere
2132 }
2133
2134 return Void();
2135 }
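// fetchKeys phase summary: WaitPrevious (wait for any prior incarnation of the keys to become
// durable), Fetching (read blocks from other servers at fetchVersion while buffering live
// updates in shard->updates), Waiting (inject the buffered updates at transferredVersion and
// wait for them to become durable before declaring the shard readable).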
2136
2137 AddingShard::AddingShard( StorageServer* server, KeyRangeRef const& keys )
2138 : server(server), keys(keys), transferredVersion(invalidVersion), phase(WaitPrevious)
2139 {
2140 fetchClient = fetchKeys(server, this);
2141 }
2142
2143 void AddingShard::addMutation( Version version, MutationRef const& mutation ){
2144 if (mutation.type == MutationRef::ClearRange) {
2145 ASSERT( keys.begin<=mutation.param1 && mutation.param2<=keys.end );
2146 }
2147 else if (isSingleKeyMutation((MutationRef::Type) mutation.type)) {
2148 ASSERT( keys.contains(mutation.param1) );
2149 }
2150
2151 if (phase == WaitPrevious) {
2152 // Updates can be discarded
2153 } else if (phase == Fetching) {
2154 if (!updates.size() || version > updates.end()[-1].version) {
2155 VerUpdateRef v;
2156 v.version = version;
2157 v.isPrivateData = false;
2158 updates.push_back(v);
2159 } else {
2160 ASSERT( version == updates.end()[-1].version );
2161 }
2162 updates.back().mutations.push_back_deep( updates.back().arena(), mutation );
2163 } else if (phase == Waiting) {
2164 server->addMutation(version, mutation, keys, server->updateEagerReads);
2165 } else ASSERT(false);
2166 }
2167
2168 void ShardInfo::addMutation(Version version, MutationRef const& mutation) {
2169 ASSERT( (void *)this); // defend against addMutation being called through a null ShardInfo pointer
2170 ASSERT( keys.contains( mutation.param1 ) );
2171 if (adding)
2172 adding->addMutation(version, mutation);
2173 else if (readWrite)
2174 readWrite->addMutation(version, mutation, this->keys, readWrite->updateEagerReads);
2175 else if (mutation.type != MutationRef::ClearRange) {
2176 TraceEvent(SevError, "DeliveredToNotAssigned").detail("Version", version).detail("Mutation", mutation.toString());
2177 ASSERT(false); // Mutation delivered to notAssigned shard!
2178 }
2179 }
2180
2181 enum ChangeServerKeysContext { CSK_UPDATE, CSK_RESTORE };
2182 const char* changeServerKeysContextName[] = { "Update", "Restore" };
2183
2184 void changeServerKeys( StorageServer* data, const KeyRangeRef& keys, bool nowAssigned, Version version, ChangeServerKeysContext context ) {
2185 ASSERT( !keys.empty() );
2186
2187 //TraceEvent("ChangeServerKeys", data->thisServerID)
2188 // .detail("KeyBegin", keys.begin)
2189 // .detail("KeyEnd", keys.end)
2190 // .detail("NowAssigned", nowAssigned)
2191 // .detail("Version", version)
2192 // .detail("Context", changeServerKeysContextName[(int)context]);
2193 validate(data);
2194
2195 debugKeyRange( nowAssigned ? "KeysAssigned" : "KeysUnassigned", version, keys );
2196
2197 bool isDifferent = false;
2198 auto existingShards = data->shards.intersectingRanges(keys);
2199 for( auto it = existingShards.begin(); it != existingShards.end(); ++it ) {
2200 if( nowAssigned != it->value()->assigned() ) {
2201 isDifferent = true;
2202 /*TraceEvent("CSKRangeDifferent", data->thisServerID)
2203 .detail("KeyBegin", it->range().begin)
2204 .detail("KeyEnd", it->range().end);*/
2205 break;
2206 }
2207 }
2208 if( !isDifferent ) {
2209 //TraceEvent("CSKShortCircuit", data->thisServerID)
2210 // .detail("KeyBegin", keys.begin)
2211 // .detail("KeyEnd", keys.end);
2212 return;
2213 }
2214
2215 // Save a backup of the ShardInfo references before we start messing with shards, in order to defer fetchKeys cancellation (and
2216 // its potential call to removeDataRange()) until shards is again valid
2217 vector< Reference<ShardInfo> > oldShards;
2218 auto os = data->shards.intersectingRanges(keys);
2219 for(auto r = os.begin(); r != os.end(); ++r)
2220 oldShards.push_back( r->value() );
2221
2222 // As required by the documentation of addShard (called below), reinitialize any overlapping range(s)
2223 auto ranges = data->shards.getAffectedRangesAfterInsertion( keys, Reference<ShardInfo>() ); // null reference indicates the range being changed
2224 for(int i=0; i<ranges.size(); i++) {
2225 if (!ranges[i].value) {
2226 ASSERT( (KeyRangeRef&)ranges[i] == keys ); // there shouldn't be any nulls except for the range being inserted
2227 } else if (ranges[i].value->notAssigned())
2228 data->addShard( ShardInfo::newNotAssigned(ranges[i]) );
2229 else if (ranges[i].value->isReadable())
2230 data->addShard( ShardInfo::newReadWrite(ranges[i], data) );
2231 else {
2232 ASSERT( ranges[i].value->adding );
2233 data->addShard( ShardInfo::newAdding( data, ranges[i] ) );
2234 TEST( true ); // ChangeServerKeys reFetchKeys
2235 }
2236 }
2237
2238 // Shard state depends on nowAssigned and whether the data is available (actually assigned in memory or on the disk) up to the given
2239 // version. The latter depends on data->newestAvailableVersion, so loop over the ranges of that.
2240 // SOMEDAY: Could this just use shards? Then we could explicitly do the removeDataRange here when an adding/transferred shard is cancelled
2241 auto vr = data->newestAvailableVersion.intersectingRanges(keys);
2242 vector<std::pair<KeyRange,Version>> changeNewestAvailable;
2243 vector<KeyRange> removeRanges;
2244 for (auto r = vr.begin(); r != vr.end(); ++r) {
2245 KeyRangeRef range = keys & r->range();
2246 bool dataAvailable = r->value()==latestVersion || r->value() >= version;
2247 /*TraceEvent("CSKRange", data->thisServerID)
2248 .detail("KeyBegin", range.begin)
2249 .detail("KeyEnd", range.end)
2250 .detail("Available", dataAvailable)
2251 .detail("NowAssigned", nowAssigned)
2252 .detail("NewestAvailable", r->value())
2253 .detail("ShardState0", data->shards[range.begin]->debugDescribeState());*/
2254 if (!nowAssigned) {
2255 if (dataAvailable) {
2256 ASSERT( r->value() == latestVersion); // Not that we care, but this used to be checked instead of dataAvailable
2257 ASSERT( data->mutableData().getLatestVersion() > version || context == CSK_RESTORE );
2258 changeNewestAvailable.push_back(make_pair(range, version));
2259 removeRanges.push_back( range );
2260 }
2261 data->addShard( ShardInfo::newNotAssigned(range) );
2262 data->watches.triggerRange( range.begin, range.end );
2263 } else if (!dataAvailable) {
2264 // SOMEDAY: Avoid restarting adding/transferred shards
2265 if (version==0){ // bypass fetchkeys; shard is known empty at version 0
2266 changeNewestAvailable.push_back(make_pair(range, latestVersion));
2267 data->addShard( ShardInfo::newReadWrite(range, data) );
2268 setAvailableStatus(data, range, true);
2269 } else {
2270 auto& shard = data->shards[range.begin];
2271 if( !shard->assigned() || shard->keys != range )
2272 data->addShard( ShardInfo::newAdding(data, range) );
2273 }
2274 } else {
2275 changeNewestAvailable.push_back(make_pair(range, latestVersion));
2276 data->addShard( ShardInfo::newReadWrite(range, data) );
2277 }
2278 }
2279 // Update newestAvailableVersion when a shard becomes (un)available (in a separate loop to avoid invalidating vr above)
2280 for(auto r = changeNewestAvailable.begin(); r != changeNewestAvailable.end(); ++r)
2281 data->newestAvailableVersion.insert( r->first, r->second );
2282
2283 if (!nowAssigned)
2284 data->metrics.notifyNotReadable( keys );
2285
2286 coalesceShards( data, KeyRangeRef(ranges[0].begin, ranges[ranges.size()-1].end) );
2287
2288 // Now it is OK to do removeDataRanges, directly and through fetchKeys cancellation (and we have to do so before validate())
2289 oldShards.clear();
2290 ranges.clear();
2291 for(auto r=removeRanges.begin(); r!=removeRanges.end(); ++r) {
2292 removeDataRange( data, data->addVersionToMutationLog(data->data().getLatestVersion()), data->shards, *r );
2293 setAvailableStatus(data, *r, false);
2294 }
2295 validate(data);
2296 }
2297
2298 void rollback( StorageServer* data, Version rollbackVersion, Version nextVersion ) {
2299 TEST(true); // call to shard rollback
2300 debugKeyRange("Rollback", rollbackVersion, allKeys);
2301
2302 // We used to do a complicated dance to roll back in MVCC history. It's much simpler, and more testable,
2303 // to simply restart the storage server actor and restore from the persistent disk state, and then roll
2304 // forward from the TLog's history. It's not quite as efficient, but we rarely have to do this in practice.
2305
2306 // FIXME: This code is relying for liveness on an undocumented property of the log system implementation: that after a rollback the rolled back versions will
2307 // eventually be missing from the peeked log. A more sophisticated approach would be to make the rollback range durable and, after reboot, skip over
2308 // those versions if they appear in peek results.
2309
2310 throw please_reboot();
2311 }
2312
2313 void StorageServer::addMutation(Version version, MutationRef const& mutation, KeyRangeRef const& shard, UpdateEagerReadInfo* eagerReads ) {
2314 MutationRef expanded = mutation;
2315 auto& mLog = addVersionToMutationLog(version);
2316
2317 if ( !expandMutation( expanded, data(), eagerReads, shard.end, mLog.arena()) ) {
2318 return;
2319 }
2320 expanded = addMutationToMutationLog(mLog, expanded);
2321 if (debugMutation("expandedMutation", version, expanded)) {
2322 const char* type =
2323 mutation.type == MutationRef::SetValue ? "SetValue" :
2324 mutation.type == MutationRef::ClearRange ? "ClearRange" :
2325 mutation.type == MutationRef::DebugKeyRange ? "DebugKeyRange" :
2326 mutation.type == MutationRef::DebugKey ? "DebugKey" :
2327 "UnknownMutation";
2328 printf("DEBUGMUTATION:\t%.6f\t%s\t%s\t%lld\t%s\t%s\t%s\n", now(), g_network->getLocalAddress().toString().c_str(), "originalMutation", version, type, printable(mutation.param1).c_str(), printable(mutation.param2).c_str());
2329 printf(" shard: %s - %s\n", printable(shard.begin).c_str(), printable(shard.end).c_str());
2330 if (mutation.type == MutationRef::ClearRange && mutation.param2 != shard.end)
2331 printf(" eager: %s\n", printable( eagerReads->getKeyEnd( mutation.param2 ) ).c_str() );
2332 }
2333 applyMutation( this, expanded, mLog.arena(), mutableData() );
2334 }
2335
2336 struct OrderByVersion {
2337 bool operator()( const VersionUpdateRef& a, const VersionUpdateRef& b ) {
2338 if (a.version != b.version) return a.version < b.version;
2339 if (a.isPrivateData != b.isPrivateData) return a.isPrivateData;
2340 return false;
2341 }
2342 };
2343
2344 #define PERSIST_PREFIX "\xff\xff"
2345
2346 // Immutable
2347 static const KeyValueRef persistFormat( LiteralStringRef( PERSIST_PREFIX "Format" ), LiteralStringRef("FoundationDB/StorageServer/1/4") );
2348 static const KeyRangeRef persistFormatReadableRange( LiteralStringRef("FoundationDB/StorageServer/1/2"), LiteralStringRef("FoundationDB/StorageServer/1/5") );
2349 static const KeyRef persistID = LiteralStringRef( PERSIST_PREFIX "ID" );
2350
2351 // (Potentially) change with the durable version or when fetchKeys completes
2352 static const KeyRef persistVersion = LiteralStringRef( PERSIST_PREFIX "Version" );
2353 static const KeyRangeRef persistShardAssignedKeys = KeyRangeRef( LiteralStringRef( PERSIST_PREFIX "ShardAssigned/" ), LiteralStringRef( PERSIST_PREFIX "ShardAssigned0" ) );
2354 static const KeyRangeRef persistShardAvailableKeys = KeyRangeRef( LiteralStringRef( PERSIST_PREFIX "ShardAvailable/" ), LiteralStringRef( PERSIST_PREFIX "ShardAvailable0" ) );
2355 static const KeyRangeRef persistByteSampleKeys = KeyRangeRef( LiteralStringRef( PERSIST_PREFIX "BS/" ), LiteralStringRef( PERSIST_PREFIX "BS0" ) );
2356 static const KeyRangeRef persistByteSampleSampleKeys = KeyRangeRef( LiteralStringRef( PERSIST_PREFIX "BS/" PERSIST_PREFIX "BS/" ), LiteralStringRef( PERSIST_PREFIX "BS/" PERSIST_PREFIX "BS0" ) );
2357 static const KeyRef persistLogProtocol = LiteralStringRef(PERSIST_PREFIX "LogProtocol");
2358 static const KeyRef persistPrimaryLocality = LiteralStringRef( PERSIST_PREFIX "PrimaryLocality" );
2359 // data keys are unmangled (but never start with PERSIST_PREFIX because they are always in allKeys)
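// The ShardAssigned/ShardAvailable ranges hold krm-style boundary entries: a value stored at
// PERSIST_PREFIX "ShardAssigned/<begin>" describes the shard's status from <begin> up to the
// next stored boundary key (the same paired begin/end convention seen in applyPrivateData below).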
2360
2361 class StorageUpdater {
2362 public:
2363 StorageUpdater() : fromVersion(invalidVersion), currentVersion(invalidVersion), restoredVersion(invalidVersion), processedStartKey(false) {}
StorageUpdater(Version fromVersion,Version restoredVersion)2364 StorageUpdater(Version fromVersion, Version restoredVersion) : fromVersion(fromVersion), currentVersion(fromVersion), restoredVersion(restoredVersion), processedStartKey(false) {}
2365
applyMutation(StorageServer * data,MutationRef const & m,Version ver)2366 void applyMutation(StorageServer* data, MutationRef const& m, Version ver) {
2367 //TraceEvent("SSNewVersion", data->thisServerID).detail("VerWas", data->mutableData().latestVersion).detail("ChVer", ver);
2368
2369 if(currentVersion != ver) {
2370 fromVersion = currentVersion;
2371 currentVersion = ver;
2372 data->mutableData().createNewVersion(ver);
2373 }
2374
2375 if (m.param1.startsWith( systemKeys.end )) {
2376 //TraceEvent("PrivateData", data->thisServerID).detail("Mutation", m.toString()).detail("Version", ver);
2377 applyPrivateData( data, m );
2378 } else {
2379 // FIXME: enable when debugMutation is active
2380 //for(auto m = changes[c].mutations.begin(); m; ++m) {
2381 // debugMutation("SSUpdateMutation", changes[c].version, *m);
2382 //}
2383
2384 splitMutation( data->shards, m, ver );
2385 }
2386
2387 if (data->otherError.getFuture().isReady()) data->otherError.getFuture().get();
2388 }
2389
2390 Version currentVersion;
2391 private:
2392 Version fromVersion;
2393 Version restoredVersion;
2394
2395 KeyRef startKey;
2396 bool nowAssigned;
2397 bool processedStartKey;
2398
applyPrivateData(StorageServer * data,MutationRef const & m)2399 void applyPrivateData( StorageServer* data, MutationRef const& m ) {
2400 TraceEvent(SevDebug, "SSPrivateMutation", data->thisServerID).detail("Mutation", m.toString());
2401
2402 if (processedStartKey) {
2403 // Because of the implementation of the krm* functions, we expect changes in pairs, [begin,end)
2404 // We can also ignore clearRanges, because they are always accompanied by such a pair of sets with the same keys
2405 ASSERT (m.type == MutationRef::SetValue && m.param1.startsWith(data->sk));
2406 KeyRangeRef keys( startKey.removePrefix( data->sk ), m.param1.removePrefix( data->sk ));
2407
2408 // add changes in shard assignment to the mutation log
2409 setAssignedStatus( data, keys, nowAssigned );
2410
2411 // The changes for version have already been received (and are being processed now). We need
2412 // to fetch the data for change.version-1 (changes from versions < change.version)
2413 changeServerKeys( data, keys, nowAssigned, currentVersion-1, CSK_UPDATE );
2414 processedStartKey = false;
2415 } else if (m.type == MutationRef::SetValue && m.param1.startsWith( data->sk )) {
2416 // Because of the implementation of the krm* functions, we expect changes in pairs, [begin,end)
2417 // We can also ignore clearRanges, because they are always accompanied by such a pair of sets with the same keys
2418 startKey = m.param1;
2419 nowAssigned = m.param2 != serverKeysFalse;
2420 processedStartKey = true;
2421 } else if (m.type == MutationRef::SetValue && m.param1 == lastEpochEndPrivateKey) {
2422 // lastEpochEnd transactions are guaranteed by the master to be alone in their own batch (version)
2423 // That means we don't have to worry about the impact on changeServerKeys
2424 //ASSERT( /*isFirstVersionUpdateFromTLog && */!std::next(it) );
2425
2426 Version rollbackVersion;
2427 BinaryReader br(m.param2, Unversioned());
2428 br >> rollbackVersion;
2429
2430 if ( rollbackVersion < fromVersion && rollbackVersion > restoredVersion ) {
2431 TEST( true ); // ShardApplyPrivateData shard rollback
2432 TraceEvent(SevWarn, "Rollback", data->thisServerID)
2433 .detail("FromVersion", fromVersion)
2434 .detail("ToVersion", rollbackVersion)
2435 .detail("AtVersion", currentVersion)
2436 .detail("StorageVersion", data->storageVersion());
2437 ASSERT( rollbackVersion >= data->storageVersion() );
2438 rollback( data, rollbackVersion, currentVersion );
2439 }
2440
2441 data->recoveryVersionSkips.push_back(std::make_pair(rollbackVersion, currentVersion - rollbackVersion));
2442 } else if (m.type == MutationRef::SetValue && m.param1 == killStoragePrivateKey) {
2443 throw worker_removed();
2444 } else if ((m.type == MutationRef::SetValue || m.type == MutationRef::ClearRange) && m.param1.substr(1).startsWith(serverTagPrefix)) {
2445 bool matchesThisServer = decodeServerTagKey(m.param1.substr(1)) == data->thisServerID;
2446 if( (m.type == MutationRef::SetValue && !matchesThisServer) || (m.type == MutationRef::ClearRange && matchesThisServer) )
2447 throw worker_removed();
2448 } else if (m.type == MutationRef::SetValue && m.param1 == rebootWhenDurablePrivateKey) {
2449 data->rebootAfterDurableVersion = currentVersion;
2450 TraceEvent("RebootWhenDurableSet", data->thisServerID).detail("DurableVersion", data->durableVersion.get()).detail("RebootAfterDurableVersion", data->rebootAfterDurableVersion);
2451 } else if (m.type == MutationRef::SetValue && m.param1 == primaryLocalityPrivateKey) {
2452 data->primaryLocality = BinaryReader::fromStringRef<int8_t>(m.param2, Unversioned());
2453 auto& mLV = data->addVersionToMutationLog( data->data().getLatestVersion() );
2454 data->addMutationToMutationLog( mLV, MutationRef(MutationRef::SetValue, persistPrimaryLocality, m.param2) );
2455 } else {
2456 ASSERT(false); // Unknown private mutation
2457 }
2458 }
2459 };
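
// Illustrative trace of the krm pair protocol handled by applyPrivateData()
// (keys are hypothetical): a shard-assignment change for ["apple","banana")
// arrives as two SetValue mutations on this server's serverKeys subspace:
//   SetValue( data->sk + "apple",  serverKeysTrue )    // records startKey, processedStartKey = true
//   SetValue( data->sk + "banana", <boundary value> )  // completes the pair and triggers
//                                                      // setAssignedStatus() + changeServerKeys()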

ACTOR Future<Void> update( StorageServer* data, bool* pReceivedUpdate )
{
	state double start;
	try {
		// If we are disk bound and durableVersion is very old, we need to block updates or we could run out of memory
		// This is often referred to as the storage server e-brake (emergency brake)
		state double waitStartT = 0;
		while ( data->queueSize() >= SERVER_KNOBS->STORAGE_HARD_LIMIT_BYTES && data->durableVersion.get() < data->desiredOldestVersion.get() ) {
			if (now() - waitStartT >= 1) {
				TraceEvent(SevWarn, "StorageServerUpdateLag", data->thisServerID)
					.detail("Version", data->version.get())
					.detail("DurableVersion", data->durableVersion.get());
				waitStartT = now();
			}

			data->behind = true;
			wait( delayJittered(.005, TaskTLogPeekReply) );
		}

		while( data->byteSampleClearsTooLarge.get() ) {
			wait( data->byteSampleClearsTooLarge.onChange() );
		}

		state Reference<ILogSystem::IPeekCursor> cursor = data->logCursor;
		//TraceEvent("SSUpdatePeeking", data->thisServerID).detail("MyVer", data->version.get()).detail("Epoch", data->updateEpoch).detail("Seq", data->updateSequence);

		loop {
			wait( cursor->getMore() );
			if(!cursor->isExhausted()) {
				break;
			}
		}
		if(cursor->popped() > 0)
			throw worker_removed();

		++data->counters.updateBatches;
		data->lastTLogVersion = cursor->getMaxKnownVersion();
		data->versionLag = std::max<int64_t>(0, data->lastTLogVersion - data->version.get());

		ASSERT(*pReceivedUpdate == false);
		*pReceivedUpdate = true;

		start = now();
		wait( data->durableVersionLock.take(TaskTLogPeekReply,1) );
		state FlowLock::Releaser holdingDVL( data->durableVersionLock );
		if(now() - start > 0.1)
			TraceEvent("SSSlowTakeLock1", data->thisServerID).detailf("From", "%016llx", debug_lastLoadBalanceResultEndpointToken).detail("Duration", now() - start).detail("Version", data->version.get());

		start = now();
		state UpdateEagerReadInfo eager;
		state FetchInjectionInfo fii;
		state Reference<ILogSystem::IPeekCursor> cloneCursor2;

		loop {
			state uint64_t changeCounter = data->shardChangeCounter;
			bool epochEnd = false;
			bool hasPrivateData = false;
			bool firstMutation = true;
			bool dbgLastMessageWasProtocol = false;

			Reference<ILogSystem::IPeekCursor> cloneCursor1 = cursor->cloneNoMore();
			cloneCursor2 = cursor->cloneNoMore();

			cloneCursor1->setProtocolVersion(data->logProtocol);

			for (; cloneCursor1->hasMessage(); cloneCursor1->nextMessage()) {
				ArenaReader& cloneReader = *cloneCursor1->reader();

				if (LogProtocolMessage::isNextIn(cloneReader)) {
					LogProtocolMessage lpm;
					cloneReader >> lpm;
					dbgLastMessageWasProtocol = true;
					cloneCursor1->setProtocolVersion(cloneReader.protocolVersion());
				}
				else {
					MutationRef msg;
					cloneReader >> msg;

					if (firstMutation && msg.param1.startsWith(systemKeys.end))
						hasPrivateData = true;
					firstMutation = false;

					if (msg.param1 == lastEpochEndPrivateKey) {
						epochEnd = true;
						ASSERT(dbgLastMessageWasProtocol);
					}

					eager.addMutation(msg);
					dbgLastMessageWasProtocol = false;
				}
			}

			// Any fetchKeys which are ready to transition their shards to the adding,transferred state do so now.
			// If there is an epoch end we skip this step, to increase testability and to prevent inserting a version in the middle of a rolled back version range.
			while(!hasPrivateData && !epochEnd && !data->readyFetchKeys.empty()) {
				auto fk = data->readyFetchKeys.back();
				data->readyFetchKeys.pop_back();
				fk.send( &fii );
			}

			for(auto& c : fii.changes)
				eager.addMutations(c.mutations);

			wait( doEagerReads( data, &eager ) );
			if (data->shardChangeCounter == changeCounter) break;
			TEST(true);  // A fetchKeys completed while we were doing this, so eager might be outdated. Read it again.
			// SOMEDAY: Theoretically we could check the change counters of individual shards and retry the reads only selectively
			eager = UpdateEagerReadInfo();
		}

		if(now() - start > 0.1)
			TraceEvent("SSSlowTakeLock2", data->thisServerID).detailf("From", "%016llx", debug_lastLoadBalanceResultEndpointToken).detail("Duration", now() - start).detail("Version", data->version.get());

		data->updateEagerReads = &eager;
		data->debug_inApplyUpdate = true;

		state StorageUpdater updater(data->lastVersionWithData, data->restoredVersion);

		if (EXPENSIVE_VALIDATION) data->data().atLatest().validate();
		validate(data);

		state bool injectedChanges = false;
		state int changeNum = 0;
		state int mutationBytes = 0;
		for(; changeNum < fii.changes.size(); changeNum++) {
			state int mutationNum = 0;
			state VerUpdateRef* pUpdate = &fii.changes[changeNum];
			for(; mutationNum < pUpdate->mutations.size(); mutationNum++) {
				updater.applyMutation(data, pUpdate->mutations[mutationNum], pUpdate->version);
				mutationBytes += pUpdate->mutations[mutationNum].totalSize();
				injectedChanges = true;
				if(mutationBytes > SERVER_KNOBS->DESIRED_UPDATE_BYTES) {
					mutationBytes = 0;
					wait(delay(SERVER_KNOBS->UPDATE_DELAY));
				}
			}
		}

		state Version ver = invalidVersion;
		cloneCursor2->setProtocolVersion(data->logProtocol);
		//TraceEvent("SSUpdatePeeked", data->thisServerID).detail("FromEpoch", data->updateEpoch).detail("FromSeq", data->updateSequence).detail("ToEpoch", results.end_epoch).detail("ToSeq", results.end_seq).detail("MsgSize", results.messages.size());
		for (; cloneCursor2->hasMessage(); cloneCursor2->nextMessage()) {
			if(mutationBytes > SERVER_KNOBS->DESIRED_UPDATE_BYTES) {
				mutationBytes = 0;
				// Instead of just yielding, leave time for the storage server to respond to reads
				wait(delay(SERVER_KNOBS->UPDATE_DELAY));
			}

			if (cloneCursor2->version().version > ver) {
				ASSERT(cloneCursor2->version().version > data->version.get());
			}

			auto &rd = *cloneCursor2->reader();

			if (cloneCursor2->version().version > ver && cloneCursor2->version().version > data->version.get()) {
				++data->counters.updateVersions;
				ver = cloneCursor2->version().version;
			}

			if (LogProtocolMessage::isNextIn(rd)) {
				LogProtocolMessage lpm;
				rd >> lpm;

				data->logProtocol = rd.protocolVersion();
				data->storage.changeLogProtocol(ver, data->logProtocol);
				cloneCursor2->setProtocolVersion(rd.protocolVersion());
			}
			else {
				MutationRef msg;
				rd >> msg;

				if (ver != invalidVersion) {  // This change belongs to a version < minVersion
					if (debugMutation("SSPeek", ver, msg) || ver == 1)
						TraceEvent("SSPeekMutation", data->thisServerID).detail("Mutation", msg.toString()).detail("Version", cloneCursor2->version().toString());

					updater.applyMutation(data, msg, ver);
					mutationBytes += msg.totalSize();
					data->counters.mutationBytes += msg.totalSize();
					++data->counters.mutations;
					switch(msg.type) {
						case MutationRef::SetValue:
							++data->counters.setMutations;
							break;
						case MutationRef::ClearRange:
							++data->counters.clearRangeMutations;
							break;
						case MutationRef::AddValue:
						case MutationRef::And:
						case MutationRef::AndV2:
						case MutationRef::AppendIfFits:
						case MutationRef::ByteMax:
						case MutationRef::ByteMin:
						case MutationRef::Max:
						case MutationRef::Min:
						case MutationRef::MinV2:
						case MutationRef::Or:
						case MutationRef::Xor:
						case MutationRef::CompareAndClear:
							++data->counters.atomicMutations;
							break;
					}
				}
				else
					TraceEvent(SevError, "DiscardingPeekedData", data->thisServerID).detail("Mutation", msg.toString()).detail("Version", cloneCursor2->version().toString());
			}
		}

		if(ver != invalidVersion) {
			data->lastVersionWithData = ver;
		} else {
			ver = cloneCursor2->version().version - 1;
		}
		if(injectedChanges) data->lastVersionWithData = ver;

		data->updateEagerReads = NULL;
		data->debug_inApplyUpdate = false;

		if(ver == invalidVersion && !fii.changes.empty() ) {
			ver = updater.currentVersion;
		}

		if(ver != invalidVersion && ver > data->version.get()) {
			debugKeyRange("SSUpdate", ver, allKeys);

			data->mutableData().createNewVersion(ver);
			if (data->otherError.getFuture().isReady()) data->otherError.getFuture().get();

			data->noRecentUpdates.set(false);
			data->lastUpdate = now();
			data->version.set( ver );  // Triggers replies to waiting gets for new version(s)
			if (data->otherError.getFuture().isReady()) data->otherError.getFuture().get();

			Version maxVersionsInMemory = SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS;
			for(int i = 0; i < data->recoveryVersionSkips.size(); i++) {
				maxVersionsInMemory += data->recoveryVersionSkips[i].second;
			}

			// Trigger updateStorage if necessary
			Version proposedOldestVersion = std::max(data->version.get(), cursor->getMinKnownCommittedVersion()) - maxVersionsInMemory;
			if(data->primaryLocality == tagLocalitySpecial || data->tag.locality == data->primaryLocality) {
				proposedOldestVersion = std::max(proposedOldestVersion, data->lastTLogVersion - maxVersionsInMemory);
			}
			proposedOldestVersion = std::min(proposedOldestVersion, data->version.get()-1);
			proposedOldestVersion = std::max(proposedOldestVersion, data->oldestVersion.get());
			proposedOldestVersion = std::max(proposedOldestVersion, data->desiredOldestVersion.get());

			//TraceEvent("StorageServerUpdated", data->thisServerID).detail("Ver", ver).detail("DataVersion", data->version.get())
			//	.detail("LastTLogVersion", data->lastTLogVersion).detail("NewOldest", data->oldestVersion.get()).detail("DesiredOldest",data->desiredOldestVersion.get())
			//	.detail("MaxVersionInMemory", maxVersionsInMemory).detail("Proposed", proposedOldestVersion).detail("PrimaryLocality", data->primaryLocality).detail("Tag", data->tag.toString());

			while(!data->recoveryVersionSkips.empty() && proposedOldestVersion > data->recoveryVersionSkips.front().first) {
				data->recoveryVersionSkips.pop_front();
			}
			data->desiredOldestVersion.set(proposedOldestVersion);
		}

		validate(data);

		data->logCursor->advanceTo( cloneCursor2->version() );
		if(cursor->version().version >= data->lastTLogVersion) {
			if(data->behind) {
				TraceEvent("StorageServerNoLongerBehind", data->thisServerID).detail("CursorVersion", cursor->version().version).detail("TLogVersion", data->lastTLogVersion);
			}
			data->behind = false;
		}

		return Void();  // update will get called again ASAP
	} catch (Error& err) {
		state Error e = err;
		if (e.code() != error_code_worker_removed && e.code() != error_code_please_reboot) {
			TraceEvent(SevError, "SSUpdateError", data->thisServerID).error(e).backtrace();
		} else if (e.code() == error_code_please_reboot) {
			wait( data->durableInProgress );
		}
		throw e;
	}
}
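
// Worked example of the desiredOldestVersion proposal above (all numbers are
// hypothetical; assume no recoveryVersionSkips and a primary-locality server):
//   version             = 5,000,000
//   minKnownCommitted   = 5,000,400
//   lastTLogVersion     = 5,000,400
//   maxVersionsInMemory = MAX_READ_TRANSACTION_LIFE_VERSIONS, say 2,000,000
// gives proposedOldestVersion = max(5,000,000, 5,000,400) - 2,000,000 =
// 3,000,400, which is then clamped to at most version-1 and at least the
// current oldest/desiredOldest versions, so updateStorage() is asked to make
// everything below 3,000,400 durable.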

ACTOR Future<Void> updateStorage(StorageServer* data) {
	loop {
		ASSERT( data->durableVersion.get() == data->storageVersion() );
		wait( data->desiredOldestVersion.whenAtLeast( data->storageVersion()+1 ) );
		wait( delay(0, TaskUpdateStorage) );

		state Promise<Void> durableInProgress;
		data->durableInProgress = durableInProgress.getFuture();

		state Version startOldestVersion = data->storageVersion();
		state Version newOldestVersion = data->storageVersion();
		state Version desiredVersion = data->desiredOldestVersion.get();
		state int64_t bytesLeft = SERVER_KNOBS->STORAGE_COMMIT_BYTES;
		loop {
			state bool done = data->storage.makeVersionMutationsDurable(newOldestVersion, desiredVersion, bytesLeft);
			// We want to forget things from these data structures atomically with changing oldestVersion (and "before", since oldestVersion.set() may trigger waiting actors)
			// forgetVersionsBeforeAsync visibly forgets immediately (without waiting) but asynchronously frees memory.
			Future<Void> finishedForgetting = data->mutableData().forgetVersionsBeforeAsync( newOldestVersion, TaskUpdateStorage );
			data->oldestVersion.set( newOldestVersion );
			wait( finishedForgetting );
			wait( yield(TaskUpdateStorage) );
			if (done) break;
		}

		if (startOldestVersion != newOldestVersion)
			data->storage.makeVersionDurable( newOldestVersion );

		debug_advanceMaxCommittedVersion( data->thisServerID, newOldestVersion );
		state Future<Void> durable = data->storage.commit();
		state Future<Void> durableDelay = Void();

		if (bytesLeft > 0)
			durableDelay = delay(SERVER_KNOBS->STORAGE_COMMIT_INTERVAL);

		wait( durable );

		debug_advanceMinCommittedVersion( data->thisServerID, newOldestVersion );

		if(newOldestVersion > data->rebootAfterDurableVersion) {
			TraceEvent("RebootWhenDurableTriggered", data->thisServerID).detail("NewOldestVersion", newOldestVersion).detail("RebootAfterDurableVersion", data->rebootAfterDurableVersion);
			// To avoid a brokenPromise error, which would be caused by the sender of durableInProgress (i.e., this
			// process) never setting it, we must set durableInProgress before sending the please_reboot() error.
			// Otherwise, in the race where the storage server receives both the reboot and the brokenPromise of
			// durableInProgress, the storage server's worker dies, we eventually end up with no worker for the
			// storage server role, and the data distributor's buildTeam() gets stuck trying to build a team.
			durableInProgress.sendError(please_reboot());
			throw please_reboot();
		}

		durableInProgress.send(Void());
		wait( delay(0, TaskUpdateStorage) );  // Setting durableInProgress could cause the storage server to shut down, so delay to check for cancellation

		// Taking and releasing the durableVersionLock ensures that no eager reads both begin before the commit was
		// effective and are applied after we change the durable version. We must also hold the lock while calling
		// changeDurableVersion, because otherwise the latest version of mutableData might be only partially loaded.
		wait( data->durableVersionLock.take() );
		data->popVersion( data->durableVersion.get() + 1 );

		while (!changeDurableVersion( data, newOldestVersion )) {
			if(g_network->check_yield(TaskUpdateStorage)) {
				data->durableVersionLock.release();
				wait(delay(0, TaskUpdateStorage));
				wait( data->durableVersionLock.take() );
			}
		}

		data->durableVersionLock.release();

		//TraceEvent("StorageServerDurable", data->thisServerID).detail("Version", newOldestVersion);

		wait( durableDelay );
	}
}
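
// Summary of the commit pipeline above, per updateStorage() iteration:
//   1. makeVersionMutationsDurable() stages mutations for versions in
//      (storageVersion, desiredOldestVersion] into the IKeyValueStore,
//      bounded by STORAGE_COMMIT_BYTES per iteration;
//   2. makeVersionDurable() records the new persistVersion;
//   3. storage.commit() makes the staged batch durable on disk;
//   4. changeDurableVersion(), under durableVersionLock, advances
//      durableVersion and releases in-memory versions that are now on disk.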

#pragma endregion

////////////////////////////////// StorageServerDisk ///////////////////////////////////////
#pragma region StorageServerDisk

void StorageServerDisk::makeNewStorageServerDurable() {
	storage->set( persistFormat );
	storage->set( KeyValueRef(persistID, BinaryWriter::toValue(data->thisServerID, Unversioned())) );
	storage->set( KeyValueRef(persistVersion, BinaryWriter::toValue(data->version.get(), Unversioned())) );
	storage->set( KeyValueRef(persistShardAssignedKeys.begin.toString(), LiteralStringRef("0")) );
	storage->set( KeyValueRef(persistShardAvailableKeys.begin.toString(), LiteralStringRef("0")) );
}

void setAvailableStatus( StorageServer* self, KeyRangeRef keys, bool available ) {
	//ASSERT( self->debug_inApplyUpdate );
	ASSERT( !keys.empty() );

	auto& mLV = self->addVersionToMutationLog( self->data().getLatestVersion() );

	KeyRange availableKeys = KeyRangeRef( persistShardAvailableKeys.begin.toString() + keys.begin.toString(), persistShardAvailableKeys.begin.toString() + keys.end.toString() );
	//TraceEvent("SetAvailableStatus", self->thisServerID).detail("Version", mLV.version).detail("RangeBegin", availableKeys.begin).detail("RangeEnd", availableKeys.end);

	self->addMutationToMutationLog( mLV, MutationRef( MutationRef::ClearRange, availableKeys.begin, availableKeys.end ) );
	self->addMutationToMutationLog( mLV, MutationRef( MutationRef::SetValue, availableKeys.begin, available ? LiteralStringRef("1") : LiteralStringRef("0") ) );
	if (keys.end != allKeys.end) {
		bool endAvailable = self->shards.rangeContaining( keys.end )->value()->isInVersionedData();
		self->addMutationToMutationLog( mLV, MutationRef( MutationRef::SetValue, availableKeys.end, endAvailable ? LiteralStringRef("1") : LiteralStringRef("0") ) );
	}
}

void setAssignedStatus( StorageServer* self, KeyRangeRef keys, bool nowAssigned ) {
	ASSERT( !keys.empty() );
	auto& mLV = self->addVersionToMutationLog( self->data().getLatestVersion() );
	KeyRange assignedKeys = KeyRangeRef(
		persistShardAssignedKeys.begin.toString() + keys.begin.toString(),
		persistShardAssignedKeys.begin.toString() + keys.end.toString() );
	//TraceEvent("SetAssignedStatus", self->thisServerID).detail("Version", mLV.version).detail("RangeBegin", assignedKeys.begin).detail("RangeEnd", assignedKeys.end);
	self->addMutationToMutationLog( mLV, MutationRef( MutationRef::ClearRange, assignedKeys.begin, assignedKeys.end ) );
	self->addMutationToMutationLog( mLV, MutationRef( MutationRef::SetValue, assignedKeys.begin,
		nowAssigned ? LiteralStringRef("1") : LiteralStringRef("0") ) );
	if (keys.end != allKeys.end) {
		bool endAssigned = self->shards.rangeContaining( keys.end )->value()->assigned();
		self->addMutationToMutationLog( mLV, MutationRef( MutationRef::SetValue, assignedKeys.end, endAssigned ? LiteralStringRef("1") : LiteralStringRef("0") ) );
	}
}
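
// Note on the keys.end handling in both functions above: the ClearRange wipes
// every boundary entry inside [keys.begin, keys.end), and the on-disk range
// map is encoded purely by boundary keys, so when keys.end is not allKeys.end
// the entry at keys.end is rewritten from the in-memory shard map to preserve
// the status of the following range. Illustrative layout after
// setAssignedStatus(self, ["b","m"), true), with hypothetical keys:
//   \xff\xffShardAssigned/b -> "1"        // [b,m) assigned
//   \xff\xffShardAssigned/m -> "0" or "1" // restored from shards.rangeContaining("m")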

void StorageServerDisk::clearRange( KeyRangeRef keys ) {
	storage->clear(keys);
}

void StorageServerDisk::writeKeyValue( KeyValueRef kv ) {
	storage->set( kv );
}

void StorageServerDisk::writeMutation( MutationRef mutation ) {
	// FIXME: debugMutation(debugContext, debugVersion, *m);
	if (mutation.type == MutationRef::SetValue) {
		storage->set( KeyValueRef(mutation.param1, mutation.param2) );
	} else if (mutation.type == MutationRef::ClearRange) {
		storage->clear( KeyRangeRef(mutation.param1, mutation.param2) );
	} else
		ASSERT(false);
}

void StorageServerDisk::writeMutations( MutationListRef mutations, Version debugVersion, const char* debugContext ) {
	for(auto m = mutations.begin(); m; ++m) {
		debugMutation(debugContext, debugVersion, *m);
		if (m->type == MutationRef::SetValue) {
			storage->set( KeyValueRef(m->param1, m->param2) );
		} else if (m->type == MutationRef::ClearRange) {
			storage->clear( KeyRangeRef(m->param1, m->param2) );
		}
	}
}

bool StorageServerDisk::makeVersionMutationsDurable( Version& prevStorageVersion, Version newStorageVersion, int64_t& bytesLeft ) {
	if (bytesLeft <= 0) return true;

	// Apply mutations from the mutationLog
	auto u = data->getMutationLog().upper_bound(prevStorageVersion);
	if (u != data->getMutationLog().end() && u->first <= newStorageVersion) {
		VersionUpdateRef const& v = u->second;
		ASSERT( v.version > prevStorageVersion && v.version <= newStorageVersion );
		debugKeyRange("makeVersionMutationsDurable", v.version, allKeys);
		writeMutations(v.mutations, v.version, "makeVersionDurable");
		for(auto m=v.mutations.begin(); m; ++m)
			bytesLeft -= mvccStorageBytes(*m);
		prevStorageVersion = v.version;
		return false;
	} else {
		prevStorageVersion = newStorageVersion;
		return true;
	}
}
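
// Typical driver of the function above (this is what updateStorage() does;
// sketch only, with names abbreviated):
//   Version v = data->storageVersion();
//   int64_t budget = SERVER_KNOBS->STORAGE_COMMIT_BYTES;
//   while (!storage.makeVersionMutationsDurable(v, desiredVersion, budget)) {
//       // each call stages exactly one version's mutations and advances v;
//       // it returns true once desiredVersion is reached or budget <= 0
//   }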

// Update data->storage to persist the changes from (data->storageVersion(),version]
void StorageServerDisk::makeVersionDurable( Version version ) {
	storage->set( KeyValueRef(persistVersion, BinaryWriter::toValue(version, Unversioned())) );

	//TraceEvent("MakeDurable", data->thisServerID).detail("FromVersion", prevStorageVersion).detail("ToVersion", version);
}

void StorageServerDisk::changeLogProtocol(Version version, uint64_t protocol) {
	data->addMutationToMutationLogOrStorage(version, MutationRef(MutationRef::SetValue, persistLogProtocol, BinaryWriter::toValue(protocol, Unversioned())));
}

ACTOR Future<Void> applyByteSampleResult( StorageServer* data, IKeyValueStore* storage, Key begin, Key end, std::vector<Standalone<VectorRef<KeyValueRef>>>* results = NULL) {
	state int totalFetches = 0;
	state int totalKeys = 0;
	state int totalBytes = 0;
	loop {
		Standalone<VectorRef<KeyValueRef>> bs = wait( storage->readRange( KeyRangeRef(begin, end), SERVER_KNOBS->STORAGE_LIMIT_BYTES, SERVER_KNOBS->STORAGE_LIMIT_BYTES ) );
		if(results) results->push_back(bs);
		int rangeSize = bs.expectedSize();
		totalFetches++;
		totalKeys += bs.size();
		totalBytes += rangeSize;
		for( int j = 0; j < bs.size(); j++ ) {
			KeyRef key = bs[j].key.removePrefix(persistByteSampleKeys.begin);
			if(!data->byteSampleClears.rangeContaining(key).value()) {
				data->metrics.byteSample.sample.insert( key, BinaryReader::fromStringRef<int32_t>(bs[j].value, Unversioned()), false );
			}
		}
		if( rangeSize >= SERVER_KNOBS->STORAGE_LIMIT_BYTES ) {
			Key nextBegin = keyAfter(bs.back().key);
			data->byteSampleClears.insert(KeyRangeRef(begin, nextBegin).removePrefix(persistByteSampleKeys.begin), true);
			data->byteSampleClearsTooLarge.set(data->byteSampleClears.size() > SERVER_KNOBS->MAX_BYTE_SAMPLE_CLEAR_MAP_SIZE);
			begin = nextBegin;
			if(begin == end) {
				break;
			}
		} else {
			data->byteSampleClears.insert(KeyRangeRef(begin.removePrefix(persistByteSampleKeys.begin), end == persistByteSampleKeys.end ? LiteralStringRef("\xff\xff\xff") : end.removePrefix(persistByteSampleKeys.begin)), true);
			data->byteSampleClearsTooLarge.set(data->byteSampleClears.size() > SERVER_KNOBS->MAX_BYTE_SAMPLE_CLEAR_MAP_SIZE);
			break;
		}

		if(!results) {
			wait(delay(SERVER_KNOBS->BYTE_SAMPLE_LOAD_DELAY));
		}
	}
	TraceEvent("RecoveredByteSampleRange", data->thisServerID).detail("Begin", begin).detail("End", end).detail("Fetches", totalFetches).detail("Keys", totalKeys).detail("ReadBytes", totalBytes);
	return Void();
}

ACTOR Future<Void> restoreByteSample(StorageServer* data, IKeyValueStore* storage, Promise<Void> byteSampleSampleRecovered) {
	state std::vector<Standalone<VectorRef<KeyValueRef>>> byteSampleSample;
	wait( applyByteSampleResult(data, storage, persistByteSampleSampleKeys.begin, persistByteSampleSampleKeys.end, &byteSampleSample) );
	byteSampleSampleRecovered.send(Void());
	wait( delay( BUGGIFY ? g_random->random01() * 2.0 : 0.0001 ) );

	size_t bytes_per_fetch = 0;
	// Since the expected size also includes (as of now) the space overhead of the container, we calculate our own number here
	for( auto& it : byteSampleSample ) {
		for( auto& kv : it ) {
			bytes_per_fetch += BinaryReader::fromStringRef<int32_t>(kv.value, Unversioned());
		}
	}
	bytes_per_fetch = (bytes_per_fetch/SERVER_KNOBS->BYTE_SAMPLE_LOAD_PARALLELISM) + 1;

	state std::vector<Future<Void>> sampleRanges;
	int accumulatedSize = 0;
	Key lastStart = persistByteSampleKeys.begin;  // make sure the first range starts at the absolute beginning of the byte sample
	for( auto& it : byteSampleSample ) {
		for( auto& kv : it ) {
			if( accumulatedSize >= bytes_per_fetch ) {
				accumulatedSize = 0;
				Key realKey = kv.key.removePrefix( persistByteSampleKeys.begin );
				sampleRanges.push_back( applyByteSampleResult(data, storage, lastStart, realKey) );
				lastStart = realKey;
			}
			accumulatedSize += BinaryReader::fromStringRef<int32_t>(kv.value, Unversioned());
		}
	}
	// make sure that the last range goes all the way to the end of the byte sample
	sampleRanges.push_back( applyByteSampleResult(data, storage, lastStart, persistByteSampleKeys.end) );

	wait( waitForAll( sampleRanges ) );
	TraceEvent("RecoveredByteSampleChunkedRead", data->thisServerID).detail("Ranges",sampleRanges.size());

	if( BUGGIFY )
		wait( delay( g_random->random01() * 10.0 ) );

	return Void();
}

ACTOR Future<bool> restoreDurableState( StorageServer* data, IKeyValueStore* storage ) {
	state Future<Optional<Value>> fFormat = storage->readValue(persistFormat.key);
	state Future<Optional<Value>> fID = storage->readValue(persistID);
	state Future<Optional<Value>> fVersion = storage->readValue(persistVersion);
	state Future<Optional<Value>> fLogProtocol = storage->readValue(persistLogProtocol);
	state Future<Optional<Value>> fPrimaryLocality = storage->readValue(persistPrimaryLocality);
	state Future<Standalone<VectorRef<KeyValueRef>>> fShardAssigned = storage->readRange(persistShardAssignedKeys);
	state Future<Standalone<VectorRef<KeyValueRef>>> fShardAvailable = storage->readRange(persistShardAvailableKeys);

	state Promise<Void> byteSampleSampleRecovered;
	data->byteSampleRecovery = restoreByteSample(data, storage, byteSampleSampleRecovered);

	TraceEvent("ReadingDurableState", data->thisServerID);
	wait( waitForAll( (vector<Future<Optional<Value>>>(), fFormat, fID, fVersion, fLogProtocol, fPrimaryLocality) ) );
	wait( waitForAll( (vector<Future<Standalone<VectorRef<KeyValueRef>>>>(), fShardAssigned, fShardAvailable) ) );
	wait( byteSampleSampleRecovered.getFuture() );
	TraceEvent("RestoringDurableState", data->thisServerID);

	if (!fFormat.get().present()) {
		// The DB was never initialized
		TraceEvent("DBNeverInitialized", data->thisServerID);
		storage->dispose();
		data->thisServerID = UID();
		data->sk = Key();
		return false;
	}
	if (!persistFormatReadableRange.contains( fFormat.get().get() )) {
		TraceEvent(SevError, "UnsupportedDBFormat").detail("Format", fFormat.get().get().toString()).detail("Expected", persistFormat.value.toString());
		throw worker_recovery_failed();
	}
	data->thisServerID = BinaryReader::fromStringRef<UID>(fID.get().get(), Unversioned());
	data->sk = serverKeysPrefixFor( data->thisServerID ).withPrefix(systemKeys.begin);  // FFFF/serverKeys/[this server]/

	if (fLogProtocol.get().present())
		data->logProtocol = BinaryReader::fromStringRef<uint64_t>(fLogProtocol.get().get(), Unversioned());

	if (fPrimaryLocality.get().present())
		data->primaryLocality = BinaryReader::fromStringRef<int8_t>(fPrimaryLocality.get().get(), Unversioned());

	state Version version = BinaryReader::fromStringRef<Version>( fVersion.get().get(), Unversioned() );
	debug_checkRestoredVersion( data->thisServerID, version, "StorageServer" );
	data->setInitialVersion( version );

	state Standalone<VectorRef<KeyValueRef>> available = fShardAvailable.get();
	state int availableLoc;
	for(availableLoc=0; availableLoc<available.size(); availableLoc++) {
		KeyRangeRef keys(
			available[availableLoc].key.removePrefix(persistShardAvailableKeys.begin),
			availableLoc+1==available.size() ? allKeys.end : available[availableLoc+1].key.removePrefix(persistShardAvailableKeys.begin));
		ASSERT( !keys.empty() );
		bool nowAvailable = available[availableLoc].value!=LiteralStringRef("0");
		/*if(nowAvailable)
			TraceEvent("AvailableShard", data->thisServerID).detail("RangeBegin", keys.begin).detail("RangeEnd", keys.end);*/
		data->newestAvailableVersion.insert( keys, nowAvailable ? latestVersion : invalidVersion );
		wait(yield());
	}

	state Standalone<VectorRef<KeyValueRef>> assigned = fShardAssigned.get();
	state int assignedLoc;
	for(assignedLoc=0; assignedLoc<assigned.size(); assignedLoc++) {
		KeyRangeRef keys(
			assigned[assignedLoc].key.removePrefix(persistShardAssignedKeys.begin),
			assignedLoc+1==assigned.size() ? allKeys.end : assigned[assignedLoc+1].key.removePrefix(persistShardAssignedKeys.begin));
		ASSERT( !keys.empty() );
		bool nowAssigned = assigned[assignedLoc].value!=LiteralStringRef("0");
		/*if(nowAssigned)
			TraceEvent("AssignedShard", data->thisServerID).detail("RangeBegin", keys.begin).detail("RangeEnd", keys.end);*/
		changeServerKeys(data, keys, nowAssigned, version, CSK_RESTORE);

		if (!nowAssigned) ASSERT( data->newestAvailableVersion.allEqual(keys, invalidVersion) );
		wait(yield());
	}

	wait( delay( 0.0001 ) );

	{
		// Erase data which isn't available (it is from some fetch at a later version)
		// SOMEDAY: Keep track of keys that might be fetching, make sure we don't have any data elsewhere?
		for(auto it = data->newestAvailableVersion.ranges().begin(); it != data->newestAvailableVersion.ranges().end(); ++it) {
			if (it->value() == invalidVersion) {
				KeyRangeRef clearRange(it->begin(), it->end());
				debugKeyRange("clearInvalidVersion", invalidVersion, clearRange);
				storage->clear( clearRange );
				data->byteSampleApplyClear( clearRange, invalidVersion );
			}
		}
	}

	validate(data, true);

	return true;
}

Future<bool> StorageServerDisk::restoreDurableState() {
	return ::restoreDurableState(data, storage);
}

// Determines whether a key-value pair should be included in a byte sample
// Also returns size information about the sample
ByteSampleInfo isKeyValueInSample(KeyValueRef keyValue) {
	ByteSampleInfo info;

	const KeyRef key = keyValue.key;
	info.size = key.size() + keyValue.value.size();

	uint32_t a = 0;
	uint32_t b = 0;
	hashlittle2( key.begin(), key.size(), &a, &b );

	double probability = (double)info.size / (key.size() + SERVER_KNOBS->BYTE_SAMPLING_OVERHEAD) / SERVER_KNOBS->BYTE_SAMPLING_FACTOR;
	info.inSample = a / ((1 << 30) * 4.0) < probability;
	info.sampledSize = info.size / std::min(1.0, probability);

	return info;
}
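
// Worked example of the sampling math above (the knob values here are
// assumptions for illustration, not necessarily the shipped defaults): with
// BYTE_SAMPLING_FACTOR = 250 and BYTE_SAMPLING_OVERHEAD = 100, a pair with
// key.size() = 20 and value.size() = 80 has info.size = 100, so
//   probability = 100 / (20 + 100) / 250 = 1/300 ~= 0.0033.
// If the key's hash falls under that threshold the pair is sampled and
// contributes sampledSize = 100 / (1/300) = 30,000 bytes, which makes the
// sample an unbiased estimator of the true byte count.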

void StorageServer::addMutationToMutationLogOrStorage( Version ver, MutationRef m ) {
	if (ver != invalidVersion) {
		addMutationToMutationLog( addVersionToMutationLog(ver), m );
	} else {
		storage.writeMutation( m );
		byteSampleApplyMutation( m, ver );
	}
}

void StorageServer::byteSampleApplySet( KeyValueRef kv, Version ver ) {
	// Update byteSample in memory and (eventually) on disk and notify waiting metrics

	ByteSampleInfo sampleInfo = isKeyValueInSample(kv);
	auto& byteSample = metrics.byteSample.sample;

	int64_t delta = 0;
	const KeyRef key = kv.key;

	auto old = byteSample.find(key);
	if (old != byteSample.end()) delta = -byteSample.getMetric(old);
	if (sampleInfo.inSample) {
		delta += sampleInfo.sampledSize;
		byteSample.insert( key, sampleInfo.sampledSize );
		addMutationToMutationLogOrStorage( ver, MutationRef(MutationRef::SetValue, key.withPrefix(persistByteSampleKeys.begin), BinaryWriter::toValue( sampleInfo.sampledSize, Unversioned() )) );
	} else {
		bool any = old != byteSample.end();
		if(!byteSampleRecovery.isReady() ) {
			if(!byteSampleClears.rangeContaining(key).value()) {
				byteSampleClears.insert(key, true);
				byteSampleClearsTooLarge.set(byteSampleClears.size() > SERVER_KNOBS->MAX_BYTE_SAMPLE_CLEAR_MAP_SIZE);
				any = true;
			}
		}
		if (any) {
			byteSample.erase(old);
			auto diskRange = singleKeyRange(key.withPrefix(persistByteSampleKeys.begin));
			addMutationToMutationLogOrStorage( ver, MutationRef(MutationRef::ClearRange, diskRange.begin, diskRange.end) );
		}
	}

	if (delta) metrics.notifyBytes( key, delta );
}

void StorageServer::byteSampleApplyClear( KeyRangeRef range, Version ver ) {
	// Update byteSample in memory and (eventually) on disk via the mutationLog and notify waiting metrics

	auto& byteSample = metrics.byteSample.sample;
	bool any = false;

	if(range.begin < allKeys.end) {
		// notifyBytes should not be called for keys past allKeys.end
		KeyRangeRef searchRange = KeyRangeRef(range.begin, std::min(range.end, allKeys.end));
		auto r = metrics.waitMetricsMap.intersectingRanges(searchRange);
		for(auto shard = r.begin(); shard != r.end(); ++shard) {
			KeyRangeRef intersectingRange = shard.range() & range;
			int64_t bytes = byteSample.sumRange(intersectingRange.begin, intersectingRange.end);
			metrics.notifyBytes(shard, -bytes);
			any = any || bytes > 0;
		}
	}

	if(range.end > allKeys.end && byteSample.sumRange(std::max(allKeys.end, range.begin), range.end) > 0)
		any = true;

	if(!byteSampleRecovery.isReady()) {
		auto clearRanges = byteSampleClears.intersectingRanges(range);
		for(auto it : clearRanges) {
			if(!it.value()) {
				byteSampleClears.insert(range, true);
				byteSampleClearsTooLarge.set(byteSampleClears.size() > SERVER_KNOBS->MAX_BYTE_SAMPLE_CLEAR_MAP_SIZE);
				any = true;
				break;
			}
		}
	}

	if (any) {
		byteSample.eraseAsync( range.begin, range.end );
		auto diskRange = range.withPrefix( persistByteSampleKeys.begin );
		addMutationToMutationLogOrStorage( ver, MutationRef(MutationRef::ClearRange, diskRange.begin, diskRange.end) );
	}
}

ACTOR Future<Void> waitMetrics( StorageServerMetrics* self, WaitMetricsRequest req, Future<Void> timeout ) {
	state PromiseStream< StorageMetrics > change;
	state StorageMetrics metrics = self->getMetrics( req.keys );
	state Error error = success();
	state bool timedout = false;

	if ( !req.min.allLessOrEqual( metrics ) || !metrics.allLessOrEqual( req.max ) ) {
		TEST( true );  // ShardWaitMetrics return case 1 (quickly)
		req.reply.send( metrics );
		return Void();
	}

	{
		auto rs = self->waitMetricsMap.modify( req.keys );
		for(auto r = rs.begin(); r != rs.end(); ++r)
			r->value().push_back( change );
		loop {
			try {
				choose {
					when( StorageMetrics c = waitNext( change.getFuture() ) ) {
						metrics += c;
						// SOMEDAY: validation! The changes here are possibly partial changes (we receive multiple
						// messages per update to our requested range). This means that the validation would have to
						// occur after all the messages for one clear or set have been dispatched.

						/*StorageMetrics m = getMetrics( data, req.keys );
						bool b = ( m.bytes != metrics.bytes || m.bytesPerKSecond != metrics.bytesPerKSecond || m.iosPerKSecond != metrics.iosPerKSecond );
						if (b) {
							printf("keys: '%s' - '%s' @%p\n", printable(req.keys.begin).c_str(), printable(req.keys.end).c_str(), this);
							printf("waitMetrics: desync %d (%lld %lld %lld) != (%lld %lld %lld); +(%lld %lld %lld)\n", b, m.bytes, m.bytesPerKSecond, m.iosPerKSecond, metrics.bytes, metrics.bytesPerKSecond, metrics.iosPerKSecond, c.bytes, c.bytesPerKSecond, c.iosPerKSecond);
						}*/
					}
					when( wait( timeout ) ) {
						timedout = true;
					}
				}
			} catch (Error& e) {
				if( e.code() == error_code_actor_cancelled ) throw;  // This is only cancelled when the main loop has exited... no need in this case to clean up self
				error = e;
				break;
			}

			if ( timedout || !req.min.allLessOrEqual( metrics ) || !metrics.allLessOrEqual( req.max ) ) {
				TEST( !timedout );  // ShardWaitMetrics return case 2 (delayed)
				TEST( timedout );   // ShardWaitMetrics return on timeout
				req.reply.send( metrics );
				break;
			}
		}

		wait( delay(0) );  // prevent iterator invalidation of functions sending changes
	}

	auto rs = self->waitMetricsMap.modify( req.keys );
	for(auto i = rs.begin(); i != rs.end(); ++i) {
		auto &x = i->value();
		for( int j = 0; j < x.size(); j++ ) {
			if( x[j] == change ) {
				swapAndPop(&x, j);
				break;
			}
		}
	}
	self->waitMetricsMap.coalesce( req.keys );

	if (error.code() != error_code_success ) {
		if (error.code() != error_code_wrong_shard_server) throw error;
		TEST( true );  // ShardWaitMetrics delayed wrong_shard_server()
		req.reply.sendError(error);
	}

	return Void();
}
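
// Lifecycle summary of the actor above: if the shard's current metrics are
// already outside [req.min, req.max] it replies immediately; otherwise it
// subscribes a PromiseStream to every waitMetricsMap range it covers,
// accumulates deltas until the bounds are violated or the timeout fires,
// replies, and finally unsubscribes (swapAndPop) and coalesces the map.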

Future<Void> StorageServerMetrics::waitMetrics(WaitMetricsRequest req, Future<Void> delay) {
	return ::waitMetrics(this, req, delay);
}

#pragma endregion

/////////////////////////////// Core //////////////////////////////////////
#pragma region Core

ACTOR Future<Void> metricsCore( StorageServer* self, StorageServerInterface ssi ) {
	state Future<Void> doPollMetrics = Void();
	state ActorCollection actors(false);

	wait( self->byteSampleRecovery );

	actors.add(traceCounters("StorageMetrics", self->thisServerID, SERVER_KNOBS->STORAGE_LOGGING_DELAY, &self->counters.cc, self->thisServerID.toString() + "/StorageMetrics"));

	loop {
		choose {
			when (WaitMetricsRequest req = waitNext(ssi.waitMetrics.getFuture())) {
				if (!self->isReadable( req.keys )) {
					TEST( true );  // waitMetrics immediate wrong_shard_server()
					req.reply.sendError(wrong_shard_server());
				} else {
					actors.add( self->metrics.waitMetrics( req, delayJittered( SERVER_KNOBS->STORAGE_METRIC_TIMEOUT ) ) );
				}
			}
			when (SplitMetricsRequest req = waitNext(ssi.splitMetrics.getFuture())) {
				if (!self->isReadable( req.keys )) {
					TEST( true );  // splitMetrics immediate wrong_shard_server()
					req.reply.sendError(wrong_shard_server());
				} else {
					self->metrics.splitMetrics( req );
				}
			}
			when (GetPhysicalMetricsRequest req = waitNext(ssi.getPhysicalMetrics.getFuture())) {
				StorageBytes sb = self->storage.getStorageBytes();
				self->metrics.getPhysicalMetrics( req, sb );
			}
			when (wait(doPollMetrics) ) {
				self->metrics.poll();
				doPollMetrics = delay(SERVER_KNOBS->STORAGE_SERVER_POLL_METRICS_DELAY);
			}
			when(wait(actors.getResult())) {}
		}
	}
}

ACTOR Future<Void> logLongByteSampleRecovery(Future<Void> recovery) {
	choose {
		when(wait(recovery)) {}
		when(wait(delay(SERVER_KNOBS->LONG_BYTE_SAMPLE_RECOVERY_DELAY))) {
			TraceEvent(g_network->isSimulated() ? SevWarn : SevWarnAlways, "LongByteSampleRecovery");
		}
	}

	return Void();
}

ACTOR Future<Void> storageServerCore( StorageServer* self, StorageServerInterface ssi )
{
	state Future<Void> doUpdate = Void();
	state bool updateReceived = false;  // true iff the current update() actor assigned to doUpdate has already received an update from the tlog
	state ActorCollection actors(false);
	state double lastLoopTopTime = now();
	state Future<Void> dbInfoChange = Void();
	state Future<Void> checkLastUpdate = Void();
	state double updateProcessStatsDelay = SERVER_KNOBS->UPDATE_STORAGE_PROCESS_STATS_INTERVAL;
	state Future<Void> updateProcessStatsTimer = delay(updateProcessStatsDelay);

	actors.add(updateStorage(self));
	actors.add(waitFailureServer(ssi.waitFailure.getFuture()));
	actors.add(self->otherError.getFuture());
	actors.add(metricsCore(self, ssi));
	actors.add(logLongByteSampleRecovery(self->byteSampleRecovery));

	self->coreStarted.send( Void() );

	loop {
		++self->counters.loops;

		double loopTopTime = now();
		double elapsedTime = loopTopTime - lastLoopTopTime;
		if( elapsedTime > 0.050 ) {
			if (g_random->random01() < 0.01)
				TraceEvent(SevWarn, "SlowSSLoopx100", self->thisServerID).detail("Elapsed", elapsedTime);
		}
		lastLoopTopTime = loopTopTime;

		choose {
			when( wait( checkLastUpdate ) ) {
				if(now() - self->lastUpdate >= CLIENT_KNOBS->NO_RECENT_UPDATES_DURATION) {
					self->noRecentUpdates.set(true);
					checkLastUpdate = delay(CLIENT_KNOBS->NO_RECENT_UPDATES_DURATION);
				} else {
					checkLastUpdate = delay( std::max(CLIENT_KNOBS->NO_RECENT_UPDATES_DURATION-(now()-self->lastUpdate), 0.1) );
				}
			}
			when( wait( dbInfoChange ) ) {
				TEST( self->logSystem );  // shardServer dbInfo changed
				dbInfoChange = self->db->onChange();
				if( self->db->get().recoveryState >= RecoveryState::ACCEPTING_COMMITS ) {
					self->logSystem = ILogSystem::fromServerDBInfo( self->thisServerID, self->db->get() );
					if (self->logSystem) {
						if(self->db->get().logSystemConfig.recoveredAt.present()) {
							self->poppedAllAfter = self->db->get().logSystemConfig.recoveredAt.get();
						}
						self->logCursor = self->logSystem->peekSingle( self->thisServerID, self->version.get() + 1, self->tag, self->history );
						self->popVersion( self->durableVersion.get() + 1, true );
					}
					// If update() is waiting for results from the tlog, it might never get them, so it needs to be cancelled. But if it is waiting later,
					// cancelling it could cause problems (e.g. fetchKeys that already committed to transitioning to waiting state)
					if (!updateReceived) {
						doUpdate = Void();
					}
				}

				Optional<LatencyBandConfig> newLatencyBandConfig = self->db->get().latencyBandConfig;
				if(newLatencyBandConfig.present() != self->latencyBandConfig.present()
					|| (newLatencyBandConfig.present() && newLatencyBandConfig.get().readConfig != self->latencyBandConfig.get().readConfig))
				{
					self->latencyBandConfig = newLatencyBandConfig;
					self->counters.readLatencyBands.clearBands();
					TraceEvent("LatencyBandReadUpdatingConfig").detail("Present", newLatencyBandConfig.present());
					if(self->latencyBandConfig.present()) {
						for(auto band : self->latencyBandConfig.get().readConfig.bands) {
							self->counters.readLatencyBands.addThreshold(band);
						}
					}
				}
			}
			when( GetValueRequest req = waitNext(ssi.getValue.getFuture()) ) {
				// Warning: This code is executed at extremely high priority (TaskLoadBalancedEndpoint), so downgrade before doing real work
				if( req.debugID.present() )
					g_traceBatch.addEvent("GetValueDebug", req.debugID.get().first(), "storageServer.recieved");  //.detail("TaskID", g_network->getCurrentTask());

				if (SHORT_CIRCUT_ACTUAL_STORAGE && normalKeys.contains(req.key))
					req.reply.send(GetValueReply());
				else
					actors.add( getValueQ( self, req ) );
			}
			when( WatchValueRequest req = waitNext(ssi.watchValue.getFuture()) ) {
				// TODO: fast load balancing?
				// SOMEDAY: combine watches for the same key/value into a single watch
				actors.add( watchValueQ( self, req ) );
			}
			when (GetKeyRequest req = waitNext(ssi.getKey.getFuture())) {
				// Warning: This code is executed at extremely high priority (TaskLoadBalancedEndpoint), so downgrade before doing real work
				actors.add( getKey( self, req ) );
			}
			when (GetKeyValuesRequest req = waitNext(ssi.getKeyValues.getFuture()) ) {
				// Warning: This code is executed at extremely high priority (TaskLoadBalancedEndpoint), so downgrade before doing real work
				actors.add( getKeyValues( self, req ) );
			}
			when (GetShardStateRequest req = waitNext(ssi.getShardState.getFuture()) ) {
				if (req.mode == GetShardStateRequest::NO_WAIT ) {
					if( self->isReadable( req.keys ) )
						req.reply.send(std::make_pair(self->version.get(),self->durableVersion.get()));
					else
						req.reply.sendError(wrong_shard_server());
				} else {
					actors.add( getShardStateQ( self, req ) );
				}
			}
			when (StorageQueuingMetricsRequest req = waitNext(ssi.getQueuingMetrics.getFuture())) {
				getQueuingMetrics(self, req);
			}
			when( ReplyPromise<Version> reply = waitNext(ssi.getVersion.getFuture()) ) {
				reply.send( self->version.get() );
			}
			when( ReplyPromise<KeyValueStoreType> reply = waitNext(ssi.getKeyValueStoreType.getFuture()) ) {
				reply.send( self->storage.getKeyValueStoreType() );
			}
			when( wait(doUpdate) ) {
				updateReceived = false;
				if (!self->logSystem)
					doUpdate = Never();
				else
					doUpdate = update( self, &updateReceived );
			}
			when(wait(updateProcessStatsTimer)) {
				updateProcessStats(self);
				updateProcessStatsTimer = delay(updateProcessStatsDelay);
			}
			when(wait(actors.getResult())) {}
		}
	}
}

bool storageServerTerminated(StorageServer& self, IKeyValueStore* persistentData, Error const& e) {
	self.shuttingDown = true;

	// Clearing shards shuts down any fetchKeys actors; these may do things on cancellation that are best done with self still valid
	self.shards.insert( allKeys, Reference<ShardInfo>() );

	// Dispose of the IKVS (destroying its data permanently) only if this shutdown is definitely permanent. Otherwise just close it.
	if (e.code() == error_code_please_reboot) {
		// do nothing.
	} else if (e.code() == error_code_worker_removed || e.code() == error_code_recruitment_failed) {
		persistentData->dispose();
	} else {
		persistentData->close();
	}

	if ( e.code() == error_code_worker_removed ||
		 e.code() == error_code_recruitment_failed ||
		 e.code() == error_code_file_not_found ||
		 e.code() == error_code_actor_cancelled )
	{
		TraceEvent("StorageServerTerminated", self.thisServerID).error(e, true);
		return true;
	} else
		return false;
}
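
// Summary of the shutdown dispositions implemented above:
//   please_reboot                        -> keep data files (close() not needed; worker restarts)
//   worker_removed / recruitment_failed  -> dispose() (data permanently deleted)
//   anything else                        -> close() (data kept on disk)
// and only worker_removed, recruitment_failed, file_not_found, and
// actor_cancelled are treated as clean terminations (return true); any other
// error is rethrown by the callers below.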

ACTOR Future<Void> storageServer( IKeyValueStore* persistentData, StorageServerInterface ssi, Tag seedTag, ReplyPromise<InitializeStorageReply> recruitReply,
								  Reference<AsyncVar<ServerDBInfo>> db, std::string folder )
{
	state StorageServer self(persistentData, db, ssi);

	self.sk = serverKeysPrefixFor( self.thisServerID ).withPrefix(systemKeys.begin);  // FFFF/serverKeys/[this server]/
	self.folder = folder;

	try {
		wait( self.storage.init() );
		wait( self.storage.commit() );

		if (seedTag == invalidTag) {
			std::pair<Version, Tag> verAndTag = wait( addStorageServer(self.cx, ssi) );  // Might throw recruitment_failed in case of simultaneous master failure
			self.tag = verAndTag.second;
			self.setInitialVersion( verAndTag.first-1 );
		} else {
			self.tag = seedTag;
		}

		self.storage.makeNewStorageServerDurable();
		wait( self.storage.commit() );

		TraceEvent("StorageServerInit", ssi.id()).detail("Version", self.version.get()).detail("SeedTag", seedTag.toString());
		InitializeStorageReply rep;
		rep.interf = ssi;
		rep.addedVersion = self.version.get();
		recruitReply.send(rep);
		self.byteSampleRecovery = Void();
		wait( storageServerCore(&self, ssi) );

		throw internal_error();
	} catch (Error& e) {
		// If we die with an error before replying to the recruitment request, send the error to the recruiter (ClusterController, and from there to the DataDistributionTeamCollection)
		if (!recruitReply.isSet())
			recruitReply.sendError( recruitment_failed() );
		if (storageServerTerminated(self, persistentData, e))
			return Void();
		throw e;
	}
}

ACTOR Future<Void> replaceInterface( StorageServer* self, StorageServerInterface ssi )
{
	state Transaction tr(self->cx);

	loop {
		state Future<Void> infoChanged = self->db->onChange();
		state Reference<ProxyInfo> proxies( new ProxyInfo(self->db->get().client.proxies, self->db->get().myLocality) );
		choose {
			when( GetStorageServerRejoinInfoReply _rep = wait( proxies->size() ? loadBalance( proxies, &MasterProxyInterface::getStorageServerRejoinInfo, GetStorageServerRejoinInfoRequest(ssi.id(), ssi.locality.dcId()) ) : Never() ) ) {
				state GetStorageServerRejoinInfoReply rep = _rep;
				try {
					tr.reset();
					tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
					tr.setVersion( rep.version );

					tr.addReadConflictRange(singleKeyRange(serverListKeyFor(ssi.id())));
					tr.addReadConflictRange(singleKeyRange(serverTagKeyFor(ssi.id())));
					tr.addReadConflictRange(serverTagHistoryRangeFor(ssi.id()));
					tr.addReadConflictRange(singleKeyRange(tagLocalityListKeyFor(ssi.locality.dcId())));

					tr.set(serverListKeyFor(ssi.id()), serverListValue(ssi));

					if(rep.newLocality) {
						tr.addReadConflictRange(tagLocalityListKeys);
						tr.set( tagLocalityListKeyFor(ssi.locality.dcId()), tagLocalityListValue(rep.newTag.get().locality) );
					}

					if(rep.newTag.present()) {
						KeyRange conflictRange = singleKeyRange(serverTagConflictKeyFor(rep.newTag.get()));
						tr.addReadConflictRange( conflictRange );
						tr.addWriteConflictRange( conflictRange );
						tr.setOption(FDBTransactionOptions::FIRST_IN_BATCH);
						tr.set( serverTagKeyFor(ssi.id()), serverTagValue(rep.newTag.get()) );
						tr.atomicOp( serverTagHistoryKeyFor(ssi.id()), serverTagValue(rep.tag), MutationRef::SetVersionstampedKey );
					}

					if(rep.history.size() && rep.history.back().first < self->version.get()) {
						tr.clear(serverTagHistoryRangeBefore(ssi.id(), self->version.get()));
					}

					choose {
						when ( wait( tr.commit() ) ) {
							self->history = rep.history;

							if(rep.newTag.present()) {
								self->tag = rep.newTag.get();
								self->history.insert(self->history.begin(), std::make_pair(tr.getCommittedVersion(), rep.tag));
							} else {
								self->tag = rep.tag;
							}
							self->allHistory = self->history;

							TraceEvent("SSTag", self->thisServerID).detail("MyTag", self->tag.toString());
							for(auto it : self->history) {
								TraceEvent("SSHistory", self->thisServerID).detail("Ver", it.first).detail("Tag", it.second.toString());
							}

							if(self->history.size() && BUGGIFY) {
								TraceEvent("SSHistoryReboot", self->thisServerID);
								throw please_reboot();
							}

							break;
						}
						when ( wait(infoChanged) ) {}
					}
				} catch (Error& e) {
					wait( tr.onError(e) );
				}
			}
			when ( wait(infoChanged) ) {}
		}
	}

	return Void();
}
3610
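// Brings a storage server back up after a reboot: restores its durable state from disk,
// re-registers its interface with the cluster via replaceInterface(), and then runs the
// main storageServerCore loop. If the durable state cannot be restored, it signals
// `recovered` and exits.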
ACTOR Future<Void> storageServer( IKeyValueStore* persistentData, StorageServerInterface ssi, Reference<AsyncVar<ServerDBInfo>> db, std::string folder, Promise<Void> recovered )
{
	state StorageServer self(persistentData, db, ssi);
	self.folder = folder;
	self.sk = serverKeysPrefixFor( self.thisServerID ).withPrefix(systemKeys.begin); // FFFF/serverKeys/[this server]/
	try {
		state double start = now();
		TraceEvent("StorageServerRebootStart", self.thisServerID);
		wait(self.storage.init());
		wait(self.storage.commit()); // After a rollback there might be uncommitted changes.
		bool ok = wait( self.storage.restoreDurableState() );
		if (!ok) {
			if(recovered.canBeSet()) recovered.send(Void());
			return Void();
		}
		TraceEvent("SSTimeRestoreDurableState", self.thisServerID).detail("TimeTaken", now() - start);

		ASSERT( self.thisServerID == ssi.id() );
		TraceEvent("StorageServerReboot", self.thisServerID)
			.detail("Version", self.version.get());

		if(recovered.canBeSet()) recovered.send(Void());

		wait( replaceInterface( &self, ssi ) );

		TraceEvent("StorageServerStartingCore", self.thisServerID).detail("TimeTaken", now() - start);

		//wait( delay(0) ); // To make sure self->zkMasterInfo.onChanged is available to wait on
		wait( storageServerCore(&self, ssi) );

		throw internal_error();
	} catch (Error& e) {
		if(recovered.canBeSet()) recovered.send(Void());
		if (storageServerTerminated(self, persistentData, e))
			return Void();
		throw e;
	}
}

#pragma endregion


/*
	 4  Reference count
	 4  priority
	24  pointers
	 8  lastUpdateVersion
	 2  updated, replacedPointer
	--
	42  PTree overhead

	 8  Version insertVersion
	--
	50  VersionedMap overhead

	12  KeyRef
	12  ValueRef
	 1  isClear
	--
	25  payload


	 50  overhead
	 25  payload
	 21  structure padding
	 32  allocator rounds up
	---
	128  allocated

	To reach 64, need to save: 11 bytes + all padding

	Possibilities:
	 -8  Combine lastUpdateVersion, insertVersion?
	 -2  Fold together updated, replacedPointer, isClear bits
	 -3  Fold away updated, replacedPointer, isClear
	 -8  Move value lengths into arena
	 -4  Replace priority with H(pointer)
	-12  Compress pointers (using special allocator)
	 -4  Modular lastUpdateVersion (make sure no node survives 4 billion updates)
*/
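
// A minimal sketch (not compiled into the build) of how the 64-byte target above could be
// enforced at build time if those savings were ever realized. It assumes the node type
// remains StorageServer::VersionedData::PTreeT, as measured in versionedMapTest() below:
//
//   static_assert( sizeof(StorageServer::VersionedData::PTreeT) <= 64,
//                  "PTree node no longer fits in a 64-byte FastAllocator allocation" );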
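// Microbenchmark: measures the per-node memory cost of VersionedMap by performing one
// million insertions (each preceded by a small range clear) across 1000 versions and
// reporting the growth of the relevant FastAllocator pool.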
void versionedMapTest() {
	VersionedMap<int,int> vm;

	printf("SS PTree node is %zu bytes\n", sizeof( StorageServer::VersionedData::PTreeT ) );

	const int NSIZE = sizeof(VersionedMap<int,int>::PTreeT);
	const int ASIZE = NSIZE<=64 ? 64 : NextPowerOfTwo<NSIZE>::Result;
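	// ASIZE mirrors how these nodes are allocated: the node size rounded up to the next
	// power of two, with a 64-byte minimum.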

	auto before = FastAllocator< ASIZE >::getTotalMemory();

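	// 1000 versions x 1000 point insertions; each insert first clears the surrounding key
	// range [k-5, k+5), mixing point writes with small range clears.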
	for(int v=1; v<=1000; ++v) {
		vm.createNewVersion(v);
		for(int i=0; i<1000; i++) {
			int k = g_random->randomInt(0, 2000000);
			/*for(int k2=k-5; k2<k+5; k2++)
				if (vm.atLatest().find(k2) != vm.atLatest().end())
					vm.erase(k2);*/
			vm.erase( k-5, k+5 );
			vm.insert( k, v );
		}
	}

	auto after = FastAllocator< ASIZE >::getTotalMemory();

	int count = 0;
	for(auto i = vm.atLatest().begin(); i != vm.atLatest().end(); ++i)
		++count;

	printf("PTree node is %d bytes, allocated as %d bytes\n", NSIZE, ASIZE);
	printf("%d distinct after %d insertions\n", count, 1000*1000);
	printf("Memory used: %f MB\n", (after - before) / 1e6);
}
