1 /*
2  * MonitorLeader.actor.cpp
3  *
4  * This source file is part of the FoundationDB open source project
5  *
6  * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *     http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  */
20 
21 #include "fdbclient/MonitorLeader.h"
22 #include "fdbclient/CoordinationInterface.h"
23 #include "flow/ActorCollection.h"
24 #include "flow/UnitTest.h"
25 #include "fdbrpc/genericactors.actor.h"
26 #include "fdbrpc/Platform.h"
27 #include "flow/actorcompiler.h" // has to be last include
28 
lookupClusterFileName(std::string const & filename)29 std::pair< std::string, bool > ClusterConnectionFile::lookupClusterFileName( std::string const& filename ) {
30 	if (filename.length())
31 		return std::make_pair(filename, false);
32 
33 	std::string f;
34 	bool isDefaultFile = true;
35 	if (platform::getEnvironmentVar(CLUSTER_FILE_ENV_VAR_NAME, f)) {
36 		// If this is set but points to a file that does not
37 		// exist, we will not fallback to any other methods
38 		isDefaultFile = false;
39 	} else if (fileExists("fdb.cluster"))
40 		f = "fdb.cluster";
41 	else
42 		f = platform::getDefaultClusterFilePath();
43 
44 	return std::make_pair( f, isDefaultFile );
45 }
46 
getErrorString(std::pair<std::string,bool> const & resolvedClusterFile,Error const & e)47 std::string ClusterConnectionFile::getErrorString( std::pair<std::string, bool> const& resolvedClusterFile, Error const& e ) {
48 	bool isDefault = resolvedClusterFile.second;
49 	if( e.code() == error_code_connection_string_invalid ) {
50 		return format("Invalid cluster file `%s': %d %s", resolvedClusterFile.first.c_str(), e.code(), e.what());
51 	} else if( e.code() == error_code_no_cluster_file_found ) {
52 		if( isDefault )
53 			return format("Unable to read cluster file `./fdb.cluster' or `%s' and %s unset: %d %s",
54 						  platform::getDefaultClusterFilePath().c_str(), CLUSTER_FILE_ENV_VAR_NAME, e.code(), e.what());
55 		else
56 			return format("Unable to read cluster file `%s': %d %s", resolvedClusterFile.first.c_str(), e.code(), e.what());
57 	} else {
58 		return format("Unexpected error loading cluster file `%s': %d %s", resolvedClusterFile.first.c_str(), e.code(), e.what());
59 	}
60 }
61 
ClusterConnectionFile(std::string const & filename)62 ClusterConnectionFile::ClusterConnectionFile( std::string const& filename ) {
63 	if( !fileExists( filename ) ) {
64 		throw no_cluster_file_found();
65 	}
66 
67 	cs = ClusterConnectionString(readFileBytes(filename, MAX_CLUSTER_FILE_BYTES));
68 	this->filename = filename;
69 	setConn = false;
70 }
71 
ClusterConnectionFile(std::string const & filename,ClusterConnectionString const & contents)72 ClusterConnectionFile::ClusterConnectionFile(std::string const& filename, ClusterConnectionString const& contents) {
73 	this->filename = filename;
74 	cs = contents;
75 	setConn = true;
76 }
77 
getConnectionString() const78 ClusterConnectionString const& ClusterConnectionFile::getConnectionString() const {
79 	return cs;
80 }
81 
notifyConnected()82 void ClusterConnectionFile::notifyConnected() {
83 	if (setConn){
84 		this->writeFile();
85 	}
86 }
87 
fileContentsUpToDate() const88 bool ClusterConnectionFile::fileContentsUpToDate() const {
89 	ClusterConnectionString temp;
90 	return fileContentsUpToDate(temp);
91 }
92 
fileContentsUpToDate(ClusterConnectionString & fileConnectionString) const93 bool ClusterConnectionFile::fileContentsUpToDate(ClusterConnectionString &fileConnectionString) const {
94 	try {
95 		// the cluster file hasn't been created yet so there's nothing to check
96 		if (setConn)
97 			return true;
98 
99 		ClusterConnectionFile temp( filename );
100 		fileConnectionString = temp.getConnectionString();
101 		return fileConnectionString.toString() == cs.toString();
102 	}
103 	catch (Error& e) {
104 		TraceEvent(SevWarnAlways, "ClusterFileError").error(e).detail("Filename", filename);
105 		return false; // Swallow the error and report that the file is out of date
106 	}
107 }
108 
writeFile()109 bool ClusterConnectionFile::writeFile() {
110 	setConn = false;
111 	if(filename.size()) {
112 		try {
113 			atomicReplace( filename, "# DO NOT EDIT!\n# This file is auto-generated, it is not to be edited by hand\n" + cs.toString().append("\n") );
114 			if(!fileContentsUpToDate()) {
115 				// This should only happen in rare scenarios where multiple processes are updating the same file to different values simultaneously
116 				// In that case, we don't have any guarantees about which file will ultimately be written
117 				TraceEvent(SevWarnAlways, "ClusterFileChangedAfterReplace").detail("Filename", filename).detail("ConnStr", cs.toString());
118 				return false;
119 			}
120 
121 			return true;
122 		} catch( Error &e ) {
123 			TraceEvent(SevWarnAlways, "UnableToChangeConnectionFile").error(e).detail("Filename", filename).detail("ConnStr", cs.toString());
124 		}
125 	}
126 
127 	return false;
128 }
129 
setConnectionString(ClusterConnectionString const & conn)130 void ClusterConnectionFile::setConnectionString( ClusterConnectionString const& conn ) {
131 	ASSERT( filename.size() );
132 	cs = conn;
133 	writeFile();
134 }
135 
getErrorString(std::string const & source,Error const & e)136 std::string ClusterConnectionString::getErrorString( std::string const& source, Error const& e ) {
137 	if( e.code() == error_code_connection_string_invalid ) {
138 		return format("Invalid connection string `%s: %d %s", source.c_str(), e.code(), e.what());
139 	}
140 	else {
141 		return format("Unexpected error parsing connection string `%s: %d %s", source.c_str(), e.code(), e.what());
142 	}
143 }
144 
// Returns a copy of connectionString with all spaces, tabs, '\n' and '\r' removed, and with
// every comment removed. A comment starts at '#' and runs up to and including the next '\n'
// or '\r' (or to the end of the input).
std::string trim( std::string const& connectionString ) {
	std::string result;
	bool inComment = false;
	for (const char ch : connectionString) {
		const bool lineBreak = (ch == '\n' || ch == '\r');
		if (inComment) {
			// Everything inside a comment is dropped; the line break that ends it is dropped too.
			if (lineBreak)
				inComment = false;
			continue;
		}
		if (ch == '#') {
			inComment = true;
			continue;
		}
		if (lineBreak || ch == ' ' || ch == '\t')
			continue;
		result += ch;
	}
	return result;
}
163 
ClusterConnectionString(std::string const & connectionString)164 ClusterConnectionString::ClusterConnectionString( std::string const& connectionString ) {
165 	auto trimmed = trim(connectionString);
166 
167 	// Split on '@' into key@addrs
168 	int pAt = trimmed.find_first_of('@');
169 	if (pAt == trimmed.npos)
170 		throw connection_string_invalid();
171 	std::string key = trimmed.substr(0, pAt);
172 	std::string addrs = trimmed.substr(pAt+1);
173 
174 	parseKey(key);
175 
176 	coord = NetworkAddress::parseList(addrs);
177 	ASSERT( coord.size() > 0 );  // parseList() always returns at least one address if it doesn't throw
178 
179 	std::sort( coord.begin(), coord.end() );
180 	// Check that there are no duplicate addresses
181 	if ( std::unique( coord.begin(), coord.end() ) != coord.end() )
182 		throw connection_string_invalid();
183 }
184 
185 TEST_CASE("/fdbclient/MonitorLeader/parseConnectionString/basic") {
186 	std::string input;
187 
188 	{
189 		input = "asdf:2345@1.1.1.1:345";
190 		ClusterConnectionString cs(input);
191 		ASSERT( input == cs.toString() );
192 	}
193 
194 	{
195 		input = "0xxdeadbeef:100100100@1.1.1.1:34534,5.1.5.3:23443";
196 		ClusterConnectionString cs(input);
197 		ASSERT( input == cs.toString() );
198 	}
199 
200 	{
201 		input = "0xxdeadbeef:100100100@1.1.1.1:34534,5.1.5.3:23443";
202 		std::string commented("#start of comment\n");
203 		commented += input;
204 		commented += "\n";
205 		commented += "# asdfasdf ##";
206 
207 		ClusterConnectionString cs(commented);
208 		ASSERT( input == cs.toString() );
209 	}
210 
211 	{
212 		input = "0xxdeadbeef:100100100@[::1]:1234,[::1]:1235";
213 		std::string commented("#start of comment\n");
214 		commented += input;
215 		commented += "\n";
216 		commented += "# asdfasdf ##";
217 
218 		ClusterConnectionString cs(commented);
219 		ASSERT(input == cs.toString());
220 	}
221 
222 	{
223 		input = "0xxdeadbeef:100100100@[abcd:dcba::1]:1234,[abcd:dcba::abcd:1]:1234";
224 		std::string commented("#start of comment\n");
225 		commented += input;
226 		commented += "\n";
227 		commented += "# asdfasdf ##";
228 
229 		ClusterConnectionString cs(commented);
230 		ASSERT(input == cs.toString());
231 	}
232 
233 	return Void();
234 }
235 
236 TEST_CASE("/fdbclient/MonitorLeader/parseConnectionString/fuzz") {
237 	// For a static connection string, add in fuzzed comments and whitespace
238 	// SOMEDAY: create a series of random connection strings, rather than the one we started with
239 	std::string connectionString = "0xxdeadbeef:100100100@1.1.1.1:34534,5.1.5.3:23443";
240 	for(int i=0; i<10000; i++)
241 	{
242 		std::string output("");
243 		auto c=connectionString.begin();
244 		while(c!=connectionString.end()) {
245 			if(g_random->random01() < 0.1) // Add whitespace character
246 				output += g_random->randomChoice(LiteralStringRef(" \t\n\r"));
247 			if(g_random->random01() < 0.5) { // Add one of the input characters
248 				output += *c;
249 				++c;
250 			}
251 			if(g_random->random01() < 0.1) { // Add a comment block
252 				output += "#";
253 				int charCount = g_random->randomInt(0, 20);
254 				for(int i = 0; i < charCount; i++) {
255 					output += g_random->randomChoice(LiteralStringRef("asdfzxcv123345:!@#$#$&()<\"\' \t"));
256 				}
257 				output += g_random->randomChoice(LiteralStringRef("\n\r"));
258 			}
259 		}
260 
261 		ClusterConnectionString cs(output);
262 		ASSERT( connectionString == cs.toString() );
263 	}
264 	return Void();
265 }
266 
ClusterConnectionString(vector<NetworkAddress> servers,Key key)267 ClusterConnectionString::ClusterConnectionString( vector<NetworkAddress> servers, Key key )
268 	: coord(servers)
269 {
270 	parseKey(key.toString());
271 }
272 
parseKey(std::string const & key)273 void ClusterConnectionString::parseKey( std::string const& key ) {
274 	// Check the structure of the given key, and fill in this->key and this->keyDesc
275 
276 	// The key must contain one (and only one) : character
277 	int colon = key.find_first_of(':');
278 	if (colon == key.npos)
279 		throw connection_string_invalid();
280 	std::string desc = key.substr(0, colon);
281 	std::string id = key.substr(colon+1);
282 
283 	// Check that description contains only allowed characters (a-z, A-Z, 0-9, _)
284 	for(auto c=desc.begin(); c!=desc.end(); ++c)
285 		if (!(isalnum(*c) || *c == '_'))
286 			throw connection_string_invalid();
287 
288 	// Check that ID contains only allowed characters (a-z, A-Z, 0-9)
289 	for(auto c=id.begin(); c!=id.end(); ++c)
290 		if (!isalnum(*c))
291 			throw connection_string_invalid();
292 
293 	this->key = StringRef(key);
294 	this->keyDesc = StringRef(desc);
295 }
296 
toString() const297 std::string ClusterConnectionString::toString() const {
298 	std::string s = key.toString();
299 	s += '@';
300 	for(int i=0; i<coord.size(); i++) {
301 		if (i) s += ',';
302 		s += coord[i].toString();
303 	}
304 	return s;
305 }
306 
ClientCoordinators(Reference<ClusterConnectionFile> ccf)307 ClientCoordinators::ClientCoordinators( Reference<ClusterConnectionFile> ccf )
308 	: ccf(ccf)
309 {
310 	ClusterConnectionString cs = ccf->getConnectionString();
311 	for(auto s = cs.coordinators().begin(); s != cs.coordinators().end(); ++s)
312 		clientLeaderServers.push_back( ClientLeaderRegInterface( *s ) );
313 	clusterKey = cs.clusterKey();
314 }
315 
// Fixed endpoint token for the getLeader request stream. Both ClientLeaderRegInterface
// constructors below use it: one to address a remote coordinator, the other to register the
// local well-known endpoint.
UID WLTOKEN_CLIENTLEADERREG_GETLEADER( -1, 2 );
317 
ClientLeaderRegInterface(NetworkAddress remote)318 ClientLeaderRegInterface::ClientLeaderRegInterface( NetworkAddress remote )
319 	: getLeader( Endpoint({remote}, WLTOKEN_CLIENTLEADERREG_GETLEADER) )
320 {
321 }
322 
ClientLeaderRegInterface(INetwork * local)323 ClientLeaderRegInterface::ClientLeaderRegInterface( INetwork* local ) {
324 	getLeader.makeWellKnownEndpoint( WLTOKEN_CLIENTLEADERREG_GETLEADER, TaskCoordination );
325 }
326 
// A nominee is the worker that a given coordinator currently considers to be the leader.
// This actor repeatedly asks one coordinator (coord) for its nominee and keeps *info in sync
// with the answers.
//
//   key                      - cluster key sent with every GetLeaderRequest
//   coord                    - interface of the coordinator being polled
//   nomineeChange            - triggered every time *info is updated
//   info                     - in/out: last nominee reported by this coordinator; its changeID
//                              (or a default UID when absent) is sent with the next request
//   generation               - only used as a detail in the trace event
//   connectedCoordinatorsNum - optional counter; incremented once, on the first reply that
//                              carries a nominee
//
// The loop has no exit: the actor runs until it is cancelled or an error propagates out of a wait.
ACTOR Future<Void> monitorNominee( Key key, ClientLeaderRegInterface coord, AsyncTrigger* nomineeChange, Optional<LeaderInfo> *info, int generation, Reference<AsyncVar<int>> connectedCoordinatorsNum ) {
	// Ensures this coordinator is counted at most once in connectedCoordinatorsNum
	state bool hasCounted = false;
	loop {
		state Optional<LeaderInfo> li = wait( retryBrokenPromise( coord.getLeader, GetLeaderRequest( key, info->present() ? info->get().changeID : UID() ), TaskCoordinationReply ) );
		if (li.present() && !hasCounted && connectedCoordinatorsNum.isValid()) {
			connectedCoordinatorsNum->set(connectedCoordinatorsNum->get() + 1);
			hasCounted = true;
		}
		wait( Future<Void>(Void()) ); // Make sure we weren't cancelled

		TraceEvent("GetLeaderReply").suppressFor(1.0).detail("Coordinator", coord.getLeader.getEndpoint().getPrimaryAddress()).detail("Nominee", li.present() ? li.get().changeID : UID()).detail("Generation", generation);

		// Only publish (and wake the listener) when the answer actually changed
		if (li != *info) {
			*info = li;
			nomineeChange->trigger();

			// A forwarding reply ends polling of this coordinator: the actor parks here forever
			if( li.present() && li.get().forward )
				wait( Future<Void>(Never()) );
			// NOTE(review): presumably another cancellation check after the trigger, like the one above - confirm
			wait( Future<Void>(Void()) );
		}
	}
}
352 
353 // Also used in fdbserver/LeaderElection.actor.cpp!
354 // bool represents if the LeaderInfo is a majority answer or not.
355 // This function also masks the first 7 bits of changeId of the nominees and returns the Leader with masked changeId
getLeader(const vector<Optional<LeaderInfo>> & nominees)356 Optional<std::pair<LeaderInfo, bool>> getLeader( const vector<Optional<LeaderInfo>>& nominees ) {
357 	vector<LeaderInfo> maskedNominees;
358 	maskedNominees.reserve(nominees.size());
359 	for (auto &nominee : nominees) {
360 		if (nominee.present()) {
361 			maskedNominees.push_back(nominee.get());
362 			maskedNominees.back().changeID = UID(maskedNominees.back().changeID.first() & LeaderInfo::mask, maskedNominees.back().changeID.second());
363 		}
364 	}
365 
366 	// If any coordinator says that the quorum is forwarded, then it is
367 	for(int i=0; i<maskedNominees.size(); i++)
368 		if (maskedNominees[i].forward)
369 			return std::pair<LeaderInfo, bool>(maskedNominees[i], true);
370 
371 	if(!maskedNominees.size())
372 		return Optional<std::pair<LeaderInfo, bool>>();
373 
374 	std::sort(maskedNominees.begin(), maskedNominees.end(),
375 		[](const LeaderInfo& l, const LeaderInfo& r) { return l.changeID < r.changeID; });
376 
377 	int bestCount = 0;
378 	LeaderInfo bestNominee;
379 	LeaderInfo currentNominee;
380 	int curCount = 0;
381 	for (int i = 0; i < maskedNominees.size(); i++) {
382 		if (currentNominee == maskedNominees[i]) {
383 			curCount++;
384 		}
385 		else {
386 			if (curCount > bestCount) {
387 				bestNominee = currentNominee;
388 				bestCount = curCount;
389 			}
390 			currentNominee = maskedNominees[i];
391 			curCount = 1;
392 		}
393 	}
394 	if (curCount > bestCount) {
395 		bestNominee = currentNominee;
396 		bestCount = curCount;
397 	}
398 
399 	bool majority = bestCount >= nominees.size() / 2 + 1;
400 	return std::pair<LeaderInfo, bool>(bestNominee, majority);
401 }
402 
403 struct MonitorLeaderInfo {
404 	bool hasConnected;
405 	Reference<ClusterConnectionFile> intermediateConnFile;
406 	int generation;
407 
MonitorLeaderInfoMonitorLeaderInfo408 	MonitorLeaderInfo() : hasConnected(false), generation(0) {}
MonitorLeaderInfoMonitorLeaderInfo409 	explicit MonitorLeaderInfo( Reference<ClusterConnectionFile> intermediateConnFile ) : intermediateConnFile(intermediateConnFile), hasConnected(false), generation(0) {}
410 };
411 
// Leader is the process that will be elected by coordinators as the cluster controller
//
// Monitors the coordinators named by info.intermediateConnFile and publishes the serialized
// info of the current leader into outSerializedLeaderInfo.
//
//   connFile                 - the cluster file object owned by the caller; rewritten when the
//                              working connection string turns out to differ from it
//   outSerializedLeaderInfo  - receives leader.serializedInfo whenever a leader is known
//   info                     - state from the previous generation (taken by value)
//   connectedCoordinatorsNum - passed through to every monitorNominee()
//
// Returns only when a coordinator reports a forward: the returned info then carries a new
// intermediateConnFile built from the forwarded connection string. Otherwise the loop runs
// until cancelled or until an error propagates out of the wait.
ACTOR Future<MonitorLeaderInfo> monitorLeaderOneGeneration( Reference<ClusterConnectionFile> connFile, Reference<AsyncVar<Value>> outSerializedLeaderInfo, MonitorLeaderInfo info,  Reference<AsyncVar<int>> connectedCoordinatorsNum) {
	state ClientCoordinators coordinators( info.intermediateConnFile );
	state AsyncTrigger nomineeChange;
	state std::vector<Optional<LeaderInfo>> nominees;
	state Future<Void> allActors;

	// Sized once up front: each monitorNominee() below receives a pointer to its own element
	nominees.resize(coordinators.clientLeaderServers.size());

	std::vector<Future<Void>> actors;
	// Ask all coordinators if the worker is considered as a leader (leader nominee) by the coordinator.
	for(int i=0; i<coordinators.clientLeaderServers.size(); i++)
		actors.push_back( monitorNominee( coordinators.clusterKey, coordinators.clientLeaderServers[i], &nomineeChange, &nominees[i], info.generation, connectedCoordinatorsNum) );
	allActors = waitForAll(actors);

	loop {
		// Only the LeaderInfo half of the pair is used here; the majority flag is not consulted
		Optional<std::pair<LeaderInfo, bool>> leader = getLeader(nominees);
		// UID(1,1) is logged as a placeholder when there is no leader yet
		TraceEvent("MonitorLeaderChange").detail("NewLeader", leader.present() ? leader.get().first.changeID : UID(1,1));
		if (leader.present()) {
			if( leader.get().first.forward ) {
				// For a forward, serializedInfo is parsed as the new connection string; hand it back
				// to the caller, keeping the caller's file name
				TraceEvent("MonitorLeaderForwarding").detail("NewConnStr", leader.get().first.serializedInfo.toString()).detail("OldConnStr", info.intermediateConnFile->getConnectionString().toString());
				info.intermediateConnFile = Reference<ClusterConnectionFile>(new ClusterConnectionFile(connFile->getFilename(), ClusterConnectionString(leader.get().first.serializedInfo.toString())));
				return info;
			}
			// A leader was found through a connection file other than the caller's: persist that
			// connection string into connFile and continue with connFile from now on
			if(connFile != info.intermediateConnFile) {
				if(!info.hasConnected) {
					TraceEvent(SevWarnAlways, "IncorrectClusterFileContentsAtConnection").detail("Filename", connFile->getFilename())
						.detail("ConnectionStringFromFile", connFile->getConnectionString().toString())
						.detail("CurrentConnectionString", info.intermediateConnFile->getConnectionString().toString());
				}
				connFile->setConnectionString(info.intermediateConnFile->getConnectionString());
				info.intermediateConnFile = connFile;
			}

			info.hasConnected = true;
			connFile->notifyConnected();

			outSerializedLeaderInfo->set( leader.get().first.serializedInfo );
		}
		// Re-evaluate whenever any coordinator reports a different nominee; waiting on allActors
		// as well lets an error from any monitorNominee() propagate
		wait( nomineeChange.onTrigger() || allActors );
	}
}
454 
monitorLeaderInternal(Reference<ClusterConnectionFile> connFile,Reference<AsyncVar<Value>> outSerializedLeaderInfo,Reference<AsyncVar<int>> connectedCoordinatorsNum)455 ACTOR Future<Void> monitorLeaderInternal( Reference<ClusterConnectionFile> connFile, Reference<AsyncVar<Value>> outSerializedLeaderInfo, Reference<AsyncVar<int>> connectedCoordinatorsNum ) {
456 	state MonitorLeaderInfo info(connFile);
457 	loop {
458 		// set the AsyncVar to 0
459 		if (connectedCoordinatorsNum.isValid()) connectedCoordinatorsNum->set(0);
460 		MonitorLeaderInfo _info = wait( monitorLeaderOneGeneration( connFile, outSerializedLeaderInfo, info, connectedCoordinatorsNum) );
461 		info = _info;
462 		info.generation++;
463 
464 	}
465 }
466