1 /*
2  * Coordination.actor.cpp
3  *
4  * This source file is part of the FoundationDB open source project
5  *
6  * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *     http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  */
20 
21 #include "fdbserver/CoordinationInterface.h"
22 #include "fdbserver/IKeyValueStore.h"
23 #include "flow/ActorCollection.h"
24 #include "fdbserver/Knobs.h"
25 #include "flow/UnitTest.h"
26 #include "flow/IndexedSet.h"
27 #include "flow/actorcompiler.h"  // This must be the last #include.
28 
29 // This module implements coordinationServer() and the interfaces in CoordinationInterface.h
30 
// Per-key record persisted by the generation register: the highest generation
// at which the key has been read (readGen) and written (writeGen), plus the
// last written value (absent if the key has never been written).
struct GenerationRegVal {
	UniqueGeneration readGen, writeGen;
	Optional<Value> val;
	// Serialization order (readGen, writeGen, val) is part of the durable
	// on-disk format read back by localGenerationReg; do not reorder.
	template <class Ar>
	void serialize(Ar& ar) {
		serializer(ar, readGen, writeGen, val);
	}
};
39 
// Well-known endpoint tokens for the coordination interfaces.  These fixed UIDs
// let clients address a coordinator's streams without a prior handshake; the
// second components must stay unique and consistent with the client side.
// UID WLTOKEN_CLIENTLEADERREG_GETLEADER( -1, 2 ); // from fdbclient/MonitorLeader.actor.cpp
UID WLTOKEN_LEADERELECTIONREG_CANDIDACY( -1, 3 );
UID WLTOKEN_LEADERELECTIONREG_LEADERHEARTBEAT( -1, 4 );
UID WLTOKEN_LEADERELECTIONREG_FORWARD( -1, 5 );
UID WLTOKEN_GENERATIONREG_READ( -1, 6 );
UID WLTOKEN_GENERATIONREG_WRITE( -1, 7 );
46 
GenerationRegInterface(NetworkAddress remote)47 GenerationRegInterface::GenerationRegInterface( NetworkAddress remote )
48 	: read( Endpoint({remote}, WLTOKEN_GENERATIONREG_READ) ),
49 	  write( Endpoint({remote}, WLTOKEN_GENERATIONREG_WRITE) )
50 {
51 }
52 
GenerationRegInterface(INetwork * local)53 GenerationRegInterface::GenerationRegInterface( INetwork* local )
54 {
55 	read.makeWellKnownEndpoint( WLTOKEN_GENERATIONREG_READ, TaskCoordination );
56 	write.makeWellKnownEndpoint( WLTOKEN_GENERATIONREG_WRITE, TaskCoordination );
57 }
58 
LeaderElectionRegInterface(NetworkAddress remote)59 LeaderElectionRegInterface::LeaderElectionRegInterface(NetworkAddress remote)
60 	: ClientLeaderRegInterface(remote),
61 	  candidacy( Endpoint({remote}, WLTOKEN_LEADERELECTIONREG_CANDIDACY) ),
62 	  leaderHeartbeat( Endpoint({remote}, WLTOKEN_LEADERELECTIONREG_LEADERHEARTBEAT) ),
63 	  forward( Endpoint({remote}, WLTOKEN_LEADERELECTIONREG_FORWARD) )
64 {
65 }
66 
LeaderElectionRegInterface(INetwork * local)67 LeaderElectionRegInterface::LeaderElectionRegInterface(INetwork* local)
68 	: ClientLeaderRegInterface(local)
69 {
70 	candidacy.makeWellKnownEndpoint( WLTOKEN_LEADERELECTIONREG_CANDIDACY, TaskCoordination );
71 	leaderHeartbeat.makeWellKnownEndpoint( WLTOKEN_LEADERELECTIONREG_LEADERHEARTBEAT, TaskCoordination );
72 	forward.makeWellKnownEndpoint( WLTOKEN_LEADERELECTIONREG_FORWARD, TaskCoordination );
73 }
74 
ServerCoordinators(Reference<ClusterConnectionFile> cf)75 ServerCoordinators::ServerCoordinators( Reference<ClusterConnectionFile> cf )
76 	: ClientCoordinators(cf)
77 {
78 	ClusterConnectionString cs = ccf->getConnectionString();
79 	for(auto s = cs.coordinators().begin(); s != cs.coordinators().end(); ++s) {
80 		leaderElectionServers.push_back( LeaderElectionRegInterface( *s ) );
81 		stateServers.push_back( GenerationRegInterface( *s ) );
82 	}
83 }
84 
85 // The coordination server wants to create its key value store only if it is actually used
struct OnDemandStore {
public:
	// folder: directory for the store's files; myID: UID tagging the store.
	// The store pointer stays NULL until get()/operator-> is first called.
	OnDemandStore( std::string folder, UID myID ) : folder(folder), store(NULL), myID(myID) {}
	~OnDemandStore() { if (store) store->close(); }

	// Lazily opens the underlying key-value store on first access.
	IKeyValueStore* get() {
		if (!store) open();
		return store;
	}

	// True if the store is already open, or if its on-disk files exist
	// (the memory store's queue files or the btree file).
	bool exists() {
		if (store)
			return true;
		return fileExists( joinPath(folder, "coordination-0.fdq") ) || fileExists( joinPath(folder, "coordination-1.fdq") ) || fileExists( joinPath(folder, "coordination.fdb") );
	}

	IKeyValueStore* operator->() { return get(); }

	// Resolves to the store's error; does not fire before open() has run,
	// because `err` is only sent from open().
	Future<Void> getError() { return onErr(err.getFuture()); }

private:
	std::string folder;
	UID myID;
	IKeyValueStore* store;
	Promise<Future<Void>> err;

	// Waits for the error future to be delivered (i.e. for the store to be
	// opened), then waits on that future itself.
	ACTOR static Future<Void> onErr( Future<Future<Void>> e ) {
		Future<Void> f = wait(e);
		wait(f);
		return Void();
	}

	// Creates the folder if needed, opens a memory-backed store with a 500MB
	// limit, and forwards its error future to getError() waiters.
	void open() {
		platform::createDirectory( folder );
		store = keyValueStoreMemory( joinPath(folder, "coordination-"), myID, 500e6 );
		err.send( store->getError() );
	}
};
124 
// Serves the generation register: for each key a GenerationRegVal
// (readGen, writeGen, value) is persisted in the on-demand store.  Reads
// advance readGen to the requested generation; a write succeeds only when its
// generation is >= readGen and > writeGen, otherwise the highest known
// generation is returned so the caller can retry with a newer one.
ACTOR Future<Void> localGenerationReg( GenerationRegInterface interf, OnDemandStore* pstore ) {
	state GenerationRegVal v;
	state OnDemandStore& store = *pstore;
	// SOMEDAY: concurrent access to different keys?
	loop choose {
		when ( GenerationRegReadRequest _req = waitNext( interf.read.getFuture() ) ) {
			TraceEvent("GenerationRegReadRequest").detail("From", _req.reply.getEndpoint().getPrimaryAddress()).detail("K", _req.key);
			state GenerationRegReadRequest req = _req;
			Optional<Value> rawV = wait( store->readValue( req.key ) );
			// A missing key reads as a default GenerationRegVal (no value, zero generations).
			v = rawV.present() ? BinaryReader::fromStringRef<GenerationRegVal>( rawV.get(), IncludeVersion() ) : GenerationRegVal();
			TraceEvent("GenerationRegReadReply").detail("RVSize", rawV.present() ? rawV.get().size() : -1).detail("VWG", v.writeGen.generation);
			if (v.readGen < req.gen) {
				// Persist the advanced read generation before replying, so a write
				// with a smaller generation cannot succeed after a restart.
				v.readGen = req.gen;
				store->set( KeyValueRef( req.key, BinaryWriter::toValue(v, IncludeVersion()) ) );
				wait(store->commit());
			}
			req.reply.send( GenerationRegReadReply( v.val, v.writeGen, v.readGen ) );
		}
		when ( GenerationRegWriteRequest _wrq = waitNext( interf.write.getFuture() ) ) {
			state GenerationRegWriteRequest wrq = _wrq;
			Optional<Value> rawV = wait( store->readValue( wrq.kv.key ) );
			v = rawV.present() ? BinaryReader::fromStringRef<GenerationRegVal>( rawV.get(), IncludeVersion() ) : GenerationRegVal();
			if (v.readGen <= wrq.gen && v.writeGen < wrq.gen) {
				// Successful write: durably record the new write generation and value
				// before acknowledging with the write generation.
				v.writeGen = wrq.gen;
				v.val = wrq.kv.value;
				store->set( KeyValueRef( wrq.kv.key, BinaryWriter::toValue(v, IncludeVersion()) ) );
				wait(store->commit());
				TraceEvent("GenerationRegWrote").detail("From", wrq.reply.getEndpoint().getPrimaryAddress()).detail("Key", wrq.kv.key)
					.detail("ReqGen", wrq.gen.generation).detail("Returning", v.writeGen.generation);
				wrq.reply.send( v.writeGen );
			} else {
				// Conflicting generation: reply with the highest generation seen so
				// the caller knows what to exceed on retry.
				TraceEvent("GenerationRegWriteFail").detail("From", wrq.reply.getEndpoint().getPrimaryAddress()).detail("Key", wrq.kv.key)
					.detail("ReqGen", wrq.gen.generation).detail("ReadGen", v.readGen.generation).detail("WriteGen", v.writeGen.generation);
				wrq.reply.send( std::max( v.readGen, v.writeGen ) );
			}
		}
	}
};
163 
// Unit test for localGenerationReg: exercises a read of a fresh key, a write,
// and a re-read, checking the generation-register contract described inline.
TEST_CASE("/fdbserver/Coordination/localGenerationReg/simple") {
	state GenerationRegInterface reg;
	state OnDemandStore store("simfdb/unittests/", //< FIXME
		g_random->randomUniqueID());
	state Future<Void> actor = localGenerationReg(reg, &store);
	state Key the_key(g_random->randomAlphaNumeric( g_random->randomInt(0, 10)));

	state UniqueGeneration firstGen(0, g_random->randomUniqueID());

	{
		GenerationRegReadReply r = wait(reg.read.getReply(GenerationRegReadRequest(the_key, firstGen)));
		//   If there was no prior write(_,_,0) or a data loss fault,
		//     returns (Optional(),0,gen2)
		ASSERT(!r.value.present());
		ASSERT(r.gen == UniqueGeneration());
		ASSERT(r.rgen == firstGen);
	}

	{
		UniqueGeneration g = wait(reg.write.getReply(GenerationRegWriteRequest(KeyValueRef(the_key, LiteralStringRef("Value1")), firstGen)));
		//   (gen1==gen is considered a "successful" write)
		ASSERT(g == firstGen);
	}

	{
		GenerationRegReadReply r = wait(reg.read.getReply(GenerationRegReadRequest(the_key, UniqueGeneration())));
		// read(key,gen2) returns (value,gen,rgen).
		//     There was some earlier or concurrent write(key,value,gen).
		ASSERT(r.value == LiteralStringRef("Value1"));
		ASSERT(r.gen == firstGen);
		//     There was some earlier or concurrent read(key,rgen).
		ASSERT(r.rgen == firstGen);
		//     If there is a write(key,_,gen1)=>gen1 s.t. gen1 < gen2 OR the write completed before this read started, then gen >= gen1.
		ASSERT(r.gen >= firstGen);
		//     If there is a read(key,gen1) that completed before this read started, then rgen >= gen1
		ASSERT(r.rgen >= firstGen);

		// The register actor should still be running (it only exits via forward).
		ASSERT(!actor.isReady());
	}
	return Void();
}
205 
206 // This actor implements a *single* leader-election register (essentially, it ignores
207 // the .key member of each request).  It returns any time the leader election is in the
208 // default state, so that only active registers consume memory.
leaderRegister(LeaderElectionRegInterface interf,Key key)209 ACTOR Future<Void> leaderRegister(LeaderElectionRegInterface interf, Key key) {
210 	state std::set<LeaderInfo> availableCandidates;
211 	state std::set<LeaderInfo> availableLeaders;
212 	state Optional<LeaderInfo> currentNominee;
213 	state Deque<ReplyPromise<Optional<LeaderInfo>>> notify;
214 	state Future<Void> nextInterval = delay( 0 );
215 	state double candidateDelay = SERVER_KNOBS->CANDIDATE_MIN_DELAY;
216 	state int leaderIntervalCount = 0;
217 	state Future<Void> notifyCheck = delay(SERVER_KNOBS->NOTIFICATION_FULL_CLEAR_TIME / SERVER_KNOBS->MIN_NOTIFICATIONS);
218 
219 	loop choose {
220 		when ( GetLeaderRequest req = waitNext( interf.getLeader.getFuture() ) ) {
221 			if (currentNominee.present() && currentNominee.get().changeID != req.knownLeader) {
222 				req.reply.send( currentNominee.get() );
223 			} else {
224 				notify.push_back( req.reply );
225 				if(notify.size() > SERVER_KNOBS->MAX_NOTIFICATIONS) {
226 					TraceEvent(SevWarnAlways, "TooManyNotifications").detail("Amount", notify.size());
227 					for (uint32_t i=0; i<notify.size(); i++)
228 						notify[i].send( currentNominee.get() );
229 					notify.clear();
230 				}
231 			}
232 		}
233 		when ( CandidacyRequest req = waitNext( interf.candidacy.getFuture() ) ) {
234 			//TraceEvent("CandidacyRequest").detail("Nominee", req.myInfo.changeID );
235 			availableCandidates.erase( LeaderInfo(req.prevChangeID) );
236 			availableCandidates.insert( req.myInfo );
237 			if (currentNominee.present() && currentNominee.get().changeID != req.knownLeader) {
238 				req.reply.send( currentNominee.get() );
239 			} else {
240 				notify.push_back( req.reply );
241 				if(notify.size() > SERVER_KNOBS->MAX_NOTIFICATIONS) {
242 					TraceEvent(SevWarnAlways, "TooManyNotifications").detail("Amount", notify.size());
243 					for (uint32_t i=0; i<notify.size(); i++)
244 						notify[i].send( currentNominee.get() );
245 					notify.clear();
246 				}
247 			}
248 		}
249 		when (LeaderHeartbeatRequest req = waitNext( interf.leaderHeartbeat.getFuture() ) ) {
250 			//TODO: use notify to only send a heartbeat once per interval
251 			availableLeaders.erase( LeaderInfo(req.prevChangeID) );
252 			availableLeaders.insert( req.myInfo );
253 			req.reply.send( currentNominee.present() && currentNominee.get().equalInternalId(req.myInfo) );
254 		}
255 		when (ForwardRequest req = waitNext( interf.forward.getFuture() ) ) {
256 			LeaderInfo newInfo;
257 			newInfo.forward = true;
258 			newInfo.serializedInfo = req.conn.toString();
259 			for(unsigned int i=0; i<notify.size(); i++)
260 				notify[i].send( newInfo );
261 			notify.clear();
262 			req.reply.send( Void() );
263 			return Void();
264 		}
265 		when ( wait(nextInterval) ) {
266 			if (!availableLeaders.size() && !availableCandidates.size() && !notify.size() &&
267 				!currentNominee.present())
268 			{
269 				// Our state is back to the initial state, so we can safely stop this actor
270 				TraceEvent("EndingLeaderNomination").detail("Key", key);
271 				return Void();
272 			} else {
273 				Optional<LeaderInfo> nextNominee;
274 				if (availableLeaders.size() && availableCandidates.size()) {
275 					nextNominee = ( *availableLeaders.begin() < *availableCandidates.begin() ) ? *availableLeaders.begin() : *availableCandidates.begin();
276 				} else if (availableLeaders.size()) {
277 					nextNominee = *availableLeaders.begin();
278 				} else if (availableCandidates.size()) {
279 					nextNominee = *availableCandidates.begin();
280 				} else {
281 					nextNominee = Optional<LeaderInfo>();
282 				}
283 
284 				bool foundCurrentNominee = false;
285 				if(currentNominee.present()) {
286 					for(auto& it : availableLeaders) {
287 						if(currentNominee.get().equalInternalId(it)) {
288 							foundCurrentNominee = true;
289 							break;
290 						}
291 					}
292 				}
293 
294 				if ( !nextNominee.present() || !foundCurrentNominee || currentNominee.get().leaderChangeRequired(nextNominee.get()) ) {
295 					TraceEvent("NominatingLeader").detail("Nominee", nextNominee.present() ? nextNominee.get().changeID : UID())
296 						.detail("Changed", nextNominee != currentNominee).detail("Key", key);
297 					for(unsigned int i=0; i<notify.size(); i++)
298 						notify[i].send( nextNominee );
299 					notify.clear();
300 					currentNominee = nextNominee;
301 				} else if (currentNominee.get().equalInternalId(nextNominee.get())) {
302 					// leader becomes better
303 					currentNominee = nextNominee;
304 				}
305 
306 				if( availableLeaders.size() ) {
307 					nextInterval = delay( SERVER_KNOBS->POLLING_FREQUENCY );
308 					if(leaderIntervalCount++ > 5) {
309 						candidateDelay = SERVER_KNOBS->CANDIDATE_MIN_DELAY;
310 					}
311 				} else {
312 					nextInterval = delay( candidateDelay );
313 					candidateDelay = std::min(SERVER_KNOBS->CANDIDATE_MAX_DELAY, candidateDelay * SERVER_KNOBS->CANDIDATE_GROWTH_RATE);
314 					leaderIntervalCount = 0;
315 				}
316 
317 				availableLeaders.clear();
318 				availableCandidates.clear();
319 			}
320 		}
321 		when( wait(notifyCheck) ) {
322 			notifyCheck = delay( SERVER_KNOBS->NOTIFICATION_FULL_CLEAR_TIME / std::max<double>(SERVER_KNOBS->MIN_NOTIFICATIONS, notify.size()) );
323 			if(!notify.empty() && currentNominee.present()) {
324 				notify.front().send( currentNominee.get() );
325 				notify.pop_front();
326 			}
327 		}
328 	}
329 }
330 
// Generation register values are stored without prefixing in the coordinated state, but always begin with an alphanumeric character
// (they are always derived from a ClusterConnectionString key).
// Forwarding values are stored in this range, i.e. under keys of the form
// "\xff" "fwd" + clusterKey (see LeaderRegisterCollection::setForward):
const KeyRangeRef fwdKeys( LiteralStringRef( "\xff" "fwd" ), LiteralStringRef( "\xff" "fwe" ) );
335 
// Owns one leaderRegister actor per active cluster key, plus the persisted
// forwarding table used when a cluster has moved to different coordinators.
struct LeaderRegisterCollection {
	// SOMEDAY: Factor this into a generic tool?  Extend ActorCollection to support removal actions?  What?
	ActorCollection actors;
	Map<Key, LeaderElectionRegInterface> registerInterfaces;
	Map<Key, LeaderInfo> forward;
	OnDemandStore *pStore;

	LeaderRegisterCollection( OnDemandStore *pStore ) : actors( false ), pStore( pStore ) {}

	// Loads persisted forwarding entries (fwdKeys range) from the store if its
	// files already exist; otherwise leaves the collection empty without
	// creating the store.
	ACTOR static Future<Void> init( LeaderRegisterCollection *self ) {
		if( !self->pStore->exists() )
			return Void();
		OnDemandStore &store = *self->pStore;
		Standalone<VectorRef<KeyValueRef>> forwardingInfo = wait( store->readRange( fwdKeys ) );
		for( int i = 0; i < forwardingInfo.size(); i++ ) {
			LeaderInfo forwardInfo;
			forwardInfo.forward = true;
			forwardInfo.serializedInfo = forwardingInfo[i].value;
			self->forward[ forwardingInfo[i].key.removePrefix( fwdKeys.begin ) ] = forwardInfo;
		}
		return Void();
	}

	Future<Void> onError() { return actors.getResult(); }

	// Returns the forwarding record for `key`, if that cluster key has been
	// forwarded to another connection string.
	Optional<LeaderInfo> getForward(KeyRef key) {
		auto i = forward.find( key );
		if (i == forward.end())
			return Optional<LeaderInfo>();
		return i->value;
	}

	// Records, in memory and durably in the store (under fwdKeys.begin + key),
	// that `key` now forwards to connection string `conn`.
	ACTOR static Future<Void> setForward(LeaderRegisterCollection *self, KeyRef key, ClusterConnectionString conn) {
		LeaderInfo forwardInfo;
		forwardInfo.forward = true;
		forwardInfo.serializedInfo = conn.toString();
		self->forward[ key ] = forwardInfo;
		OnDemandStore &store = *self->pStore;
		store->set( KeyValueRef( key.withPrefix( fwdKeys.begin ), conn.toString() ) );
		wait(store->commit());
		return Void();
	}

	// Returns the register interface for `key`, starting a leaderRegister actor
	// on first use.  The actor is wrapped so its interface entry is erased when
	// it finishes (see wrap below).
	LeaderElectionRegInterface& getInterface(KeyRef key) {
		auto i = registerInterfaces.find( key );
		if (i == registerInterfaces.end()) {
			Key k = key;
			Future<Void> a = wrap(this, k, leaderRegister(registerInterfaces[k], k) );
			if (a.isError()) throw a.getError();
			ASSERT( !a.isReady() );
			actors.add( a );
			i  = registerInterfaces.find( key );
		}
		ASSERT( i != registerInterfaces.end() );
		return i->value;
	}

	// Runs `actor` and removes `key`'s interface entry when it completes,
	// rethrowing any error other than actor_cancelled after the cleanup.
	ACTOR static Future<Void> wrap( LeaderRegisterCollection* self, Key key, Future<Void> actor ) {
		state Error e;
		try {
			wait(actor);
		} catch (Error& err) {
			if (err.code() == error_code_actor_cancelled)
				throw;
			e = err;
		}
		self->registerInterfaces.erase(key);
		if (e.code() != invalid_error_code) throw e;
		return Void();
	}

};
408 
409 // leaderServer multiplexes multiple leaderRegisters onto a single LeaderElectionRegInterface,
410 // creating and destroying them on demand.
// leaderServer multiplexes multiple leaderRegisters onto a single LeaderElectionRegInterface,
// creating and destroying them on demand.  Requests for a forwarded cluster key
// are answered directly from the forwarding table instead of a register.
ACTOR Future<Void> leaderServer(LeaderElectionRegInterface interf, OnDemandStore *pStore) {
	state LeaderRegisterCollection regs( pStore );
	state ActorCollection forwarders(false);

	wait( LeaderRegisterCollection::init( &regs ) );

	loop choose {
		when ( GetLeaderRequest req = waitNext( interf.getLeader.getFuture() ) ) {
			Optional<LeaderInfo> forward = regs.getForward(req.key);
			if( forward.present() )
				req.reply.send( forward.get() );
			else
				regs.getInterface(req.key).getLeader.send( req );
		}
		when ( CandidacyRequest req = waitNext( interf.candidacy.getFuture() ) ) {
			Optional<LeaderInfo> forward = regs.getForward(req.key);
			if( forward.present() )
				req.reply.send( forward.get() );
			else
				regs.getInterface(req.key).candidacy.send(req);
		}
		when ( LeaderHeartbeatRequest req = waitNext( interf.leaderHeartbeat.getFuture() ) ) {
			Optional<LeaderInfo> forward = regs.getForward(req.key);
			// A forwarded cluster has no leader here; deny leadership.
			if( forward.present() )
				req.reply.send( false );
			else
				regs.getInterface(req.key).leaderHeartbeat.send(req);
		}
		when ( ForwardRequest req = waitNext( interf.forward.getFuture() ) ) {
			Optional<LeaderInfo> forward = regs.getForward(req.key);
			if( forward.present() )
				req.reply.send( Void() );
			else {
				// Persist the forward, and also deliver it to the register so it
				// notifies waiters and shuts itself down.
				forwarders.add( LeaderRegisterCollection::setForward( &regs, req.key, ClusterConnectionString(req.conn.toString()) ) );
				regs.getInterface(req.key).forward.send(req);
			}
		}
		// forwarders' actors never complete normally, so reaching here is a bug.
		when( wait( forwarders.getResult() ) ) { ASSERT(false); throw internal_error(); }
	}
}
451 
// Runs a coordination server: the generation register and the leader-election
// server share one on-demand key-value store in dataFolder.  The `||`
// combinator completes when any component fails, so reaching the line after
// the wait() means a component returned unexpectedly.
ACTOR Future<Void> coordinationServer(std::string dataFolder) {
	state UID myID = g_random->randomUniqueID();
	state LeaderElectionRegInterface myLeaderInterface( g_network );
	state GenerationRegInterface myInterface( g_network );
	state OnDemandStore store( dataFolder, myID );

	TraceEvent("CoordinationServer", myID).detail("MyInterfaceAddr", myInterface.read.getEndpoint().getPrimaryAddress()).detail("Folder", dataFolder);

	try {
		wait( localGenerationReg(myInterface, &store) || leaderServer(myLeaderInterface, &store) || store.getError() );
		throw internal_error();
	} catch (Error& e) {
		TraceEvent("CoordinationServerError", myID).error(e, true);
		throw;
	}
}
468