1 /*
2 * Coordination.actor.cpp
3 *
4 * This source file is part of the FoundationDB open source project
5 *
6 * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
7 *
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
11 *
12 * http://www.apache.org/licenses/LICENSE-2.0
13 *
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 */
20
21 #include "fdbserver/CoordinationInterface.h"
22 #include "fdbserver/IKeyValueStore.h"
23 #include "flow/ActorCollection.h"
24 #include "fdbserver/Knobs.h"
25 #include "flow/UnitTest.h"
26 #include "flow/IndexedSet.h"
27 #include "flow/actorcompiler.h" // This must be the last #include.
28
29 // This module implements coordinationServer() and the interfaces in CoordinationInterface.h
30
31 struct GenerationRegVal {
32 UniqueGeneration readGen, writeGen;
33 Optional<Value> val;
34 template <class Ar>
serializeGenerationRegVal35 void serialize(Ar& ar) {
36 serializer(ar, readGen, writeGen, val);
37 }
38 };
39
// Well-known endpoint tokens for the coordinator's request streams. The NetworkAddress
// constructors of the interfaces below build remote Endpoints from these tokens, and the
// INetwork* constructors register this process's local streams under the same tokens, so a
// client only needs a coordinator's address to reach it.
// Token 2 (the client getLeader stream) is defined elsewhere:
// UID WLTOKEN_CLIENTLEADERREG_GETLEADER( -1, 2 ); // from fdbclient/MonitorLeader.actor.cpp
UID WLTOKEN_LEADERELECTIONREG_CANDIDACY( -1, 3 );
UID WLTOKEN_LEADERELECTIONREG_LEADERHEARTBEAT( -1, 4 );
UID WLTOKEN_LEADERELECTIONREG_FORWARD( -1, 5 );
UID WLTOKEN_GENERATIONREG_READ( -1, 6 );
UID WLTOKEN_GENERATIONREG_WRITE( -1, 7 );
46
GenerationRegInterface(NetworkAddress remote)47 GenerationRegInterface::GenerationRegInterface( NetworkAddress remote )
48 : read( Endpoint({remote}, WLTOKEN_GENERATIONREG_READ) ),
49 write( Endpoint({remote}, WLTOKEN_GENERATIONREG_WRITE) )
50 {
51 }
52
GenerationRegInterface(INetwork * local)53 GenerationRegInterface::GenerationRegInterface( INetwork* local )
54 {
55 read.makeWellKnownEndpoint( WLTOKEN_GENERATIONREG_READ, TaskCoordination );
56 write.makeWellKnownEndpoint( WLTOKEN_GENERATIONREG_WRITE, TaskCoordination );
57 }
58
LeaderElectionRegInterface(NetworkAddress remote)59 LeaderElectionRegInterface::LeaderElectionRegInterface(NetworkAddress remote)
60 : ClientLeaderRegInterface(remote),
61 candidacy( Endpoint({remote}, WLTOKEN_LEADERELECTIONREG_CANDIDACY) ),
62 leaderHeartbeat( Endpoint({remote}, WLTOKEN_LEADERELECTIONREG_LEADERHEARTBEAT) ),
63 forward( Endpoint({remote}, WLTOKEN_LEADERELECTIONREG_FORWARD) )
64 {
65 }
66
LeaderElectionRegInterface(INetwork * local)67 LeaderElectionRegInterface::LeaderElectionRegInterface(INetwork* local)
68 : ClientLeaderRegInterface(local)
69 {
70 candidacy.makeWellKnownEndpoint( WLTOKEN_LEADERELECTIONREG_CANDIDACY, TaskCoordination );
71 leaderHeartbeat.makeWellKnownEndpoint( WLTOKEN_LEADERELECTIONREG_LEADERHEARTBEAT, TaskCoordination );
72 forward.makeWellKnownEndpoint( WLTOKEN_LEADERELECTIONREG_FORWARD, TaskCoordination );
73 }
74
ServerCoordinators(Reference<ClusterConnectionFile> cf)75 ServerCoordinators::ServerCoordinators( Reference<ClusterConnectionFile> cf )
76 : ClientCoordinators(cf)
77 {
78 ClusterConnectionString cs = ccf->getConnectionString();
79 for(auto s = cs.coordinators().begin(); s != cs.coordinators().end(); ++s) {
80 leaderElectionServers.push_back( LeaderElectionRegInterface( *s ) );
81 stateServers.push_back( GenerationRegInterface( *s ) );
82 }
83 }
84
85 // The coordination server wants to create its key value store only if it is actually used
86 struct OnDemandStore {
87 public:
OnDemandStoreOnDemandStore88 OnDemandStore( std::string folder, UID myID ) : folder(folder), store(NULL), myID(myID) {}
~OnDemandStoreOnDemandStore89 ~OnDemandStore() { if (store) store->close(); }
90
getOnDemandStore91 IKeyValueStore* get() {
92 if (!store) open();
93 return store;
94 }
95
existsOnDemandStore96 bool exists() {
97 if (store)
98 return true;
99 return fileExists( joinPath(folder, "coordination-0.fdq") ) || fileExists( joinPath(folder, "coordination-1.fdq") ) || fileExists( joinPath(folder, "coordination.fdb") );
100 }
101
operator ->OnDemandStore102 IKeyValueStore* operator->() { return get(); }
103
getErrorOnDemandStore104 Future<Void> getError() { return onErr(err.getFuture()); }
105
106 private:
107 std::string folder;
108 UID myID;
109 IKeyValueStore* store;
110 Promise<Future<Void>> err;
111
onErrOnDemandStore112 ACTOR static Future<Void> onErr( Future<Future<Void>> e ) {
113 Future<Void> f = wait(e);
114 wait(f);
115 return Void();
116 }
117
openOnDemandStore118 void open() {
119 platform::createDirectory( folder );
120 store = keyValueStoreMemory( joinPath(folder, "coordination-"), myID, 500e6 );
121 err.send( store->getError() );
122 }
123 };
124
// Serves this coordinator's generation registers: one register per key of the store, each
// holding a GenerationRegVal. Requests are processed one at a time; the choose loop does not
// accept the next request until the current read (and commit, if any) has finished.
//
// Read(key, gen): if gen is larger than the stored readGen, readGen is raised to gen and
// committed before replying. The reply carries the stored value, writeGen and readGen.
// Write(key, value, gen): accepted only when readGen <= gen and writeGen < gen. On success the
// value and writeGen = gen are committed and writeGen is returned. Otherwise max(readGen, writeGen)
// is returned (presumably so the caller can tell the write was rejected).
// Never returns normally; errors from the store propagate out of the waits.
ACTOR Future<Void> localGenerationReg( GenerationRegInterface interf, OnDemandStore* pstore ) {
	state GenerationRegVal v;
	state OnDemandStore& store = *pstore;
	// SOMEDAY: concurrent access to different keys?
	loop choose {
		when ( GenerationRegReadRequest _req = waitNext( interf.read.getFuture() ) ) {
			TraceEvent("GenerationRegReadRequest").detail("From", _req.reply.getEndpoint().getPrimaryAddress()).detail("K", _req.key);
			// Copy into a state variable so the request survives the waits below.
			state GenerationRegReadRequest req = _req;
			Optional<Value> rawV = wait( store->readValue( req.key ) );
			// A missing key reads as a default-constructed (empty) register.
			v = rawV.present() ? BinaryReader::fromStringRef<GenerationRegVal>( rawV.get(), IncludeVersion() ) : GenerationRegVal();
			TraceEvent("GenerationRegReadReply").detail("RVSize", rawV.present() ? rawV.get().size() : -1).detail("VWG", v.writeGen.generation);
			if (v.readGen < req.gen) {
				// Remember the highest read generation durably before answering.
				v.readGen = req.gen;
				store->set( KeyValueRef( req.key, BinaryWriter::toValue(v, IncludeVersion()) ) );
				wait(store->commit());
			}
			req.reply.send( GenerationRegReadReply( v.val, v.writeGen, v.readGen ) );
		}
		when ( GenerationRegWriteRequest _wrq = waitNext( interf.write.getFuture() ) ) {
			// Copy into a state variable so the request survives the waits below.
			state GenerationRegWriteRequest wrq = _wrq;
			Optional<Value> rawV = wait( store->readValue( wrq.kv.key ) );
			v = rawV.present() ? BinaryReader::fromStringRef<GenerationRegVal>( rawV.get(), IncludeVersion() ) : GenerationRegVal();
			// Accept only writes not superseded by a later read or an equal/later write.
			if (v.readGen <= wrq.gen && v.writeGen < wrq.gen) {
				v.writeGen = wrq.gen;
				v.val = wrq.kv.value;
				store->set( KeyValueRef( wrq.kv.key, BinaryWriter::toValue(v, IncludeVersion()) ) );
				wait(store->commit());
				TraceEvent("GenerationRegWrote").detail("From", wrq.reply.getEndpoint().getPrimaryAddress()).detail("Key", wrq.kv.key)
					.detail("ReqGen", wrq.gen.generation).detail("Returning", v.writeGen.generation);
				wrq.reply.send( v.writeGen );
			} else {
				TraceEvent("GenerationRegWriteFail").detail("From", wrq.reply.getEndpoint().getPrimaryAddress()).detail("Key", wrq.kv.key)
					.detail("ReqGen", wrq.gen.generation).detail("ReadGen", v.readGen.generation).detail("WriteGen", v.writeGen.generation);
				wrq.reply.send( std::max( v.readGen, v.writeGen ) );
			}
		}
	}
};
163
// Single-client sanity check of localGenerationReg against a fresh store:
// 1. a read at firstGen sees an empty register and records firstGen as the read generation;
// 2. a write at firstGen is accepted;
// 3. a read at the default-constructed generation observes the written value and both generations.
// The register actor must still be running at the end.
TEST_CASE("/fdbserver/Coordination/localGenerationReg/simple") {
	state GenerationRegInterface reg;
	state OnDemandStore store("simfdb/unittests/", //< FIXME
		g_random->randomUniqueID());
	state Future<Void> actor = localGenerationReg(reg, &store);
	state Key the_key(g_random->randomAlphaNumeric( g_random->randomInt(0, 10)));

	state UniqueGeneration firstGen(0, g_random->randomUniqueID());

	{
		GenerationRegReadReply r = wait(reg.read.getReply(GenerationRegReadRequest(the_key, firstGen)));
		// If there was no prior write(_,_,0) or a data loss fault,
		// returns (Optional(),0,gen2)
		ASSERT(!r.value.present());
		ASSERT(r.gen == UniqueGeneration());
		ASSERT(r.rgen == firstGen);
	}

	{
		UniqueGeneration g = wait(reg.write.getReply(GenerationRegWriteRequest(KeyValueRef(the_key, LiteralStringRef("Value1")), firstGen)));
		// (gen1==gen is considered a "successful" write)
		ASSERT(g == firstGen);
	}

	{
		GenerationRegReadReply r = wait(reg.read.getReply(GenerationRegReadRequest(the_key, UniqueGeneration())));
		// read(key,gen2) returns (value,gen,rgen).
		// There was some earlier or concurrent write(key,value,gen).
		ASSERT(r.value == LiteralStringRef("Value1"));
		ASSERT(r.gen == firstGen);
		// There was some earlier or concurrent read(key,rgen).
		ASSERT(r.rgen == firstGen);
		// If there is a write(key,_,gen1)=>gen1 s.t. gen1 < gen2 OR the write completed before this read started, then gen >= gen1.
		ASSERT(r.gen >= firstGen);
		// If there is a read(key,gen1) that completed before this read started, then rgen >= gen1
		ASSERT(r.rgen >= firstGen);

		// The register server only ends on error, so it must still be running.
		ASSERT(!actor.isReady());
	}
	return Void();
}
205
206 // This actor implements a *single* leader-election register (essentially, it ignores
207 // the .key member of each request). It returns any time the leader election is in the
208 // default state, so that only active registers consume memory.
leaderRegister(LeaderElectionRegInterface interf,Key key)209 ACTOR Future<Void> leaderRegister(LeaderElectionRegInterface interf, Key key) {
210 state std::set<LeaderInfo> availableCandidates;
211 state std::set<LeaderInfo> availableLeaders;
212 state Optional<LeaderInfo> currentNominee;
213 state Deque<ReplyPromise<Optional<LeaderInfo>>> notify;
214 state Future<Void> nextInterval = delay( 0 );
215 state double candidateDelay = SERVER_KNOBS->CANDIDATE_MIN_DELAY;
216 state int leaderIntervalCount = 0;
217 state Future<Void> notifyCheck = delay(SERVER_KNOBS->NOTIFICATION_FULL_CLEAR_TIME / SERVER_KNOBS->MIN_NOTIFICATIONS);
218
219 loop choose {
220 when ( GetLeaderRequest req = waitNext( interf.getLeader.getFuture() ) ) {
221 if (currentNominee.present() && currentNominee.get().changeID != req.knownLeader) {
222 req.reply.send( currentNominee.get() );
223 } else {
224 notify.push_back( req.reply );
225 if(notify.size() > SERVER_KNOBS->MAX_NOTIFICATIONS) {
226 TraceEvent(SevWarnAlways, "TooManyNotifications").detail("Amount", notify.size());
227 for (uint32_t i=0; i<notify.size(); i++)
228 notify[i].send( currentNominee.get() );
229 notify.clear();
230 }
231 }
232 }
233 when ( CandidacyRequest req = waitNext( interf.candidacy.getFuture() ) ) {
234 //TraceEvent("CandidacyRequest").detail("Nominee", req.myInfo.changeID );
235 availableCandidates.erase( LeaderInfo(req.prevChangeID) );
236 availableCandidates.insert( req.myInfo );
237 if (currentNominee.present() && currentNominee.get().changeID != req.knownLeader) {
238 req.reply.send( currentNominee.get() );
239 } else {
240 notify.push_back( req.reply );
241 if(notify.size() > SERVER_KNOBS->MAX_NOTIFICATIONS) {
242 TraceEvent(SevWarnAlways, "TooManyNotifications").detail("Amount", notify.size());
243 for (uint32_t i=0; i<notify.size(); i++)
244 notify[i].send( currentNominee.get() );
245 notify.clear();
246 }
247 }
248 }
249 when (LeaderHeartbeatRequest req = waitNext( interf.leaderHeartbeat.getFuture() ) ) {
250 //TODO: use notify to only send a heartbeat once per interval
251 availableLeaders.erase( LeaderInfo(req.prevChangeID) );
252 availableLeaders.insert( req.myInfo );
253 req.reply.send( currentNominee.present() && currentNominee.get().equalInternalId(req.myInfo) );
254 }
255 when (ForwardRequest req = waitNext( interf.forward.getFuture() ) ) {
256 LeaderInfo newInfo;
257 newInfo.forward = true;
258 newInfo.serializedInfo = req.conn.toString();
259 for(unsigned int i=0; i<notify.size(); i++)
260 notify[i].send( newInfo );
261 notify.clear();
262 req.reply.send( Void() );
263 return Void();
264 }
265 when ( wait(nextInterval) ) {
266 if (!availableLeaders.size() && !availableCandidates.size() && !notify.size() &&
267 !currentNominee.present())
268 {
269 // Our state is back to the initial state, so we can safely stop this actor
270 TraceEvent("EndingLeaderNomination").detail("Key", key);
271 return Void();
272 } else {
273 Optional<LeaderInfo> nextNominee;
274 if (availableLeaders.size() && availableCandidates.size()) {
275 nextNominee = ( *availableLeaders.begin() < *availableCandidates.begin() ) ? *availableLeaders.begin() : *availableCandidates.begin();
276 } else if (availableLeaders.size()) {
277 nextNominee = *availableLeaders.begin();
278 } else if (availableCandidates.size()) {
279 nextNominee = *availableCandidates.begin();
280 } else {
281 nextNominee = Optional<LeaderInfo>();
282 }
283
284 bool foundCurrentNominee = false;
285 if(currentNominee.present()) {
286 for(auto& it : availableLeaders) {
287 if(currentNominee.get().equalInternalId(it)) {
288 foundCurrentNominee = true;
289 break;
290 }
291 }
292 }
293
294 if ( !nextNominee.present() || !foundCurrentNominee || currentNominee.get().leaderChangeRequired(nextNominee.get()) ) {
295 TraceEvent("NominatingLeader").detail("Nominee", nextNominee.present() ? nextNominee.get().changeID : UID())
296 .detail("Changed", nextNominee != currentNominee).detail("Key", key);
297 for(unsigned int i=0; i<notify.size(); i++)
298 notify[i].send( nextNominee );
299 notify.clear();
300 currentNominee = nextNominee;
301 } else if (currentNominee.get().equalInternalId(nextNominee.get())) {
302 // leader becomes better
303 currentNominee = nextNominee;
304 }
305
306 if( availableLeaders.size() ) {
307 nextInterval = delay( SERVER_KNOBS->POLLING_FREQUENCY );
308 if(leaderIntervalCount++ > 5) {
309 candidateDelay = SERVER_KNOBS->CANDIDATE_MIN_DELAY;
310 }
311 } else {
312 nextInterval = delay( candidateDelay );
313 candidateDelay = std::min(SERVER_KNOBS->CANDIDATE_MAX_DELAY, candidateDelay * SERVER_KNOBS->CANDIDATE_GROWTH_RATE);
314 leaderIntervalCount = 0;
315 }
316
317 availableLeaders.clear();
318 availableCandidates.clear();
319 }
320 }
321 when( wait(notifyCheck) ) {
322 notifyCheck = delay( SERVER_KNOBS->NOTIFICATION_FULL_CLEAR_TIME / std::max<double>(SERVER_KNOBS->MIN_NOTIFICATIONS, notify.size()) );
323 if(!notify.empty() && currentNominee.present()) {
324 notify.front().send( currentNominee.get() );
325 notify.pop_front();
326 }
327 }
328 }
329 }
330
// Generation register values are stored without prefixing in the coordinated state, but always begin with an alphanumeric character
// (they are always derived from a ClusterConnectionString key).
// Forwarding values are stored in this range:
// each forwarded cluster key K is persisted as "\xff" "fwd" + K -> new connection string
// (see LeaderRegisterCollection::setForward / init). The leading \xff byte keeps these keys
// disjoint from the alphanumeric generation-register keys; "\xff" "fwe" is the exclusive end.
const KeyRangeRef fwdKeys( LiteralStringRef( "\xff" "fwd" ), LiteralStringRef( "\xff" "fwe" ) );
335
// Bookkeeping for leaderServer. It holds the per-key leaderRegister actors, their interfaces and
// the forwarding table, which is persisted in the fwdKeys range of the OnDemandStore.
// A leaderRegister is started the first time its key is requested through getInterface(),
// and its interface is removed from registerInterfaces when that actor ends (see wrap()).
struct LeaderRegisterCollection {
	// SOMEDAY: Factor this into a generic tool? Extend ActorCollection to support removal actions? What?
	ActorCollection actors;                                  // wrap(leaderRegister(...)) actors started by getInterface()
	Map<Key, LeaderElectionRegInterface> registerInterfaces; // interfaces of currently running registers, by key
	Map<Key, LeaderInfo> forward;                            // keys that have been forwarded to another connection string
	OnDemandStore *pStore;                                   // not owned; never closed or deleted here

	LeaderRegisterCollection( OnDemandStore *pStore ) : actors( false ), pStore( pStore ) {}

	// Loads previously persisted forwarding entries into `forward`. If no store files exist
	// yet, returns immediately without opening (and thereby creating) the store.
	ACTOR static Future<Void> init( LeaderRegisterCollection *self ) {
		if( !self->pStore->exists() )
			return Void();
		OnDemandStore &store = *self->pStore;
		Standalone<VectorRef<KeyValueRef>> forwardingInfo = wait( store->readRange( fwdKeys ) );
		for( int i = 0; i < forwardingInfo.size(); i++ ) {
			LeaderInfo forwardInfo;
			forwardInfo.forward = true;
			forwardInfo.serializedInfo = forwardingInfo[i].value;
			self->forward[ forwardingInfo[i].key.removePrefix( fwdKeys.begin ) ] = forwardInfo;
		}
		return Void();
	}

	// Error/result future of the register actors. NOTE(review): leaderServer in this file does not wait on it.
	Future<Void> onError() { return actors.getResult(); }

	// Returns the forwarding LeaderInfo for key, or an empty Optional if key has not been forwarded.
	Optional<LeaderInfo> getForward(KeyRef key) {
		auto i = forward.find( key );
		if (i == forward.end())
			return Optional<LeaderInfo>();
		return i->value;
	}

	// Records key as forwarded to conn in memory immediately (before the first wait), then
	// persists it under fwdKeys.begin + key. Completes when the commit does. `key` is only
	// read before the wait, so a KeyRef into the caller's request is sufficient.
	ACTOR static Future<Void> setForward(LeaderRegisterCollection *self, KeyRef key, ClusterConnectionString conn) {
		LeaderInfo forwardInfo;
		forwardInfo.forward = true;
		forwardInfo.serializedInfo = conn.toString();
		self->forward[ key ] = forwardInfo;
		OnDemandStore &store = *self->pStore;
		store->set( KeyValueRef( key.withPrefix( fwdKeys.begin ), conn.toString() ) );
		wait(store->commit());
		return Void();
	}

	// Returns the interface of the register for key. If none is running, this starts a new
	// leaderRegister on a freshly inserted interface and wraps it so that it deregisters itself
	// when it ends. Rethrows an error the new actor raised synchronously.
	LeaderElectionRegInterface& getInterface(KeyRef key) {
		auto i = registerInterfaces.find( key );
		if (i == registerInterfaces.end()) {
			Key k = key;
			Future<Void> a = wrap(this, k, leaderRegister(registerInterfaces[k], k) );
			if (a.isError()) throw a.getError();
			ASSERT( !a.isReady() );
			actors.add( a );
			i = registerInterfaces.find( key );
		}
		ASSERT( i != registerInterfaces.end() );
		return i->value;
	}

	// Waits for a register actor, then erases its interface whether it ended normally or with an
	// error. Cancellation is rethrown immediately without erasing; any other error is rethrown
	// after the erase.
	ACTOR static Future<Void> wrap( LeaderRegisterCollection* self, Key key, Future<Void> actor ) {
		state Error e;
		try {
			wait(actor);
		} catch (Error& err) {
			if (err.code() == error_code_actor_cancelled)
				throw;
			e = err;
		}
		self->registerInterfaces.erase(key);
		if (e.code() != invalid_error_code) throw e;
		return Void();
	}

};
408
409 // leaderServer multiplexes multiple leaderRegisters onto a single LeaderElectionRegInterface,
410 // creating and destroying them on demand.
leaderServer(LeaderElectionRegInterface interf,OnDemandStore * pStore)411 ACTOR Future<Void> leaderServer(LeaderElectionRegInterface interf, OnDemandStore *pStore) {
412 state LeaderRegisterCollection regs( pStore );
413 state ActorCollection forwarders(false);
414
415 wait( LeaderRegisterCollection::init( ®s ) );
416
417 loop choose {
418 when ( GetLeaderRequest req = waitNext( interf.getLeader.getFuture() ) ) {
419 Optional<LeaderInfo> forward = regs.getForward(req.key);
420 if( forward.present() )
421 req.reply.send( forward.get() );
422 else
423 regs.getInterface(req.key).getLeader.send( req );
424 }
425 when ( CandidacyRequest req = waitNext( interf.candidacy.getFuture() ) ) {
426 Optional<LeaderInfo> forward = regs.getForward(req.key);
427 if( forward.present() )
428 req.reply.send( forward.get() );
429 else
430 regs.getInterface(req.key).candidacy.send(req);
431 }
432 when ( LeaderHeartbeatRequest req = waitNext( interf.leaderHeartbeat.getFuture() ) ) {
433 Optional<LeaderInfo> forward = regs.getForward(req.key);
434 if( forward.present() )
435 req.reply.send( false );
436 else
437 regs.getInterface(req.key).leaderHeartbeat.send(req);
438 }
439 when ( ForwardRequest req = waitNext( interf.forward.getFuture() ) ) {
440 Optional<LeaderInfo> forward = regs.getForward(req.key);
441 if( forward.present() )
442 req.reply.send( Void() );
443 else {
444 forwarders.add( LeaderRegisterCollection::setForward( ®s, req.key, ClusterConnectionString(req.conn.toString()) ) );
445 regs.getInterface(req.key).forward.send(req);
446 }
447 }
448 when( wait( forwarders.getResult() ) ) { ASSERT(false); throw internal_error(); }
449 }
450 }
451
coordinationServer(std::string dataFolder)452 ACTOR Future<Void> coordinationServer(std::string dataFolder) {
453 state UID myID = g_random->randomUniqueID();
454 state LeaderElectionRegInterface myLeaderInterface( g_network );
455 state GenerationRegInterface myInterface( g_network );
456 state OnDemandStore store( dataFolder, myID );
457
458 TraceEvent("CoordinationServer", myID).detail("MyInterfaceAddr", myInterface.read.getEndpoint().getPrimaryAddress()).detail("Folder", dataFolder);
459
460 try {
461 wait( localGenerationReg(myInterface, &store) || leaderServer(myLeaderInterface, &store) || store.getError() );
462 throw internal_error();
463 } catch (Error& e) {
464 TraceEvent("CoordinationServerError", myID).error(e, true);
465 throw;
466 }
467 }
468