1 /*
2 * MonitorLeader.actor.cpp
3 *
4 * This source file is part of the FoundationDB open source project
5 *
6 * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
7 *
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
11 *
12 * http://www.apache.org/licenses/LICENSE-2.0
13 *
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 */
20
21 #include "fdbclient/MonitorLeader.h"
22 #include "fdbclient/CoordinationInterface.h"
23 #include "flow/ActorCollection.h"
24 #include "flow/UnitTest.h"
25 #include "fdbrpc/genericactors.actor.h"
26 #include "fdbrpc/Platform.h"
27 #include "flow/actorcompiler.h" // has to be last include
28
lookupClusterFileName(std::string const & filename)29 std::pair< std::string, bool > ClusterConnectionFile::lookupClusterFileName( std::string const& filename ) {
30 if (filename.length())
31 return std::make_pair(filename, false);
32
33 std::string f;
34 bool isDefaultFile = true;
35 if (platform::getEnvironmentVar(CLUSTER_FILE_ENV_VAR_NAME, f)) {
36 // If this is set but points to a file that does not
37 // exist, we will not fallback to any other methods
38 isDefaultFile = false;
39 } else if (fileExists("fdb.cluster"))
40 f = "fdb.cluster";
41 else
42 f = platform::getDefaultClusterFilePath();
43
44 return std::make_pair( f, isDefaultFile );
45 }
46
getErrorString(std::pair<std::string,bool> const & resolvedClusterFile,Error const & e)47 std::string ClusterConnectionFile::getErrorString( std::pair<std::string, bool> const& resolvedClusterFile, Error const& e ) {
48 bool isDefault = resolvedClusterFile.second;
49 if( e.code() == error_code_connection_string_invalid ) {
50 return format("Invalid cluster file `%s': %d %s", resolvedClusterFile.first.c_str(), e.code(), e.what());
51 } else if( e.code() == error_code_no_cluster_file_found ) {
52 if( isDefault )
53 return format("Unable to read cluster file `./fdb.cluster' or `%s' and %s unset: %d %s",
54 platform::getDefaultClusterFilePath().c_str(), CLUSTER_FILE_ENV_VAR_NAME, e.code(), e.what());
55 else
56 return format("Unable to read cluster file `%s': %d %s", resolvedClusterFile.first.c_str(), e.code(), e.what());
57 } else {
58 return format("Unexpected error loading cluster file `%s': %d %s", resolvedClusterFile.first.c_str(), e.code(), e.what());
59 }
60 }
61
ClusterConnectionFile(std::string const & filename)62 ClusterConnectionFile::ClusterConnectionFile( std::string const& filename ) {
63 if( !fileExists( filename ) ) {
64 throw no_cluster_file_found();
65 }
66
67 cs = ClusterConnectionString(readFileBytes(filename, MAX_CLUSTER_FILE_BYTES));
68 this->filename = filename;
69 setConn = false;
70 }
71
ClusterConnectionFile(std::string const & filename,ClusterConnectionString const & contents)72 ClusterConnectionFile::ClusterConnectionFile(std::string const& filename, ClusterConnectionString const& contents) {
73 this->filename = filename;
74 cs = contents;
75 setConn = true;
76 }
77
getConnectionString() const78 ClusterConnectionString const& ClusterConnectionFile::getConnectionString() const {
79 return cs;
80 }
81
notifyConnected()82 void ClusterConnectionFile::notifyConnected() {
83 if (setConn){
84 this->writeFile();
85 }
86 }
87
fileContentsUpToDate() const88 bool ClusterConnectionFile::fileContentsUpToDate() const {
89 ClusterConnectionString temp;
90 return fileContentsUpToDate(temp);
91 }
92
fileContentsUpToDate(ClusterConnectionString & fileConnectionString) const93 bool ClusterConnectionFile::fileContentsUpToDate(ClusterConnectionString &fileConnectionString) const {
94 try {
95 // the cluster file hasn't been created yet so there's nothing to check
96 if (setConn)
97 return true;
98
99 ClusterConnectionFile temp( filename );
100 fileConnectionString = temp.getConnectionString();
101 return fileConnectionString.toString() == cs.toString();
102 }
103 catch (Error& e) {
104 TraceEvent(SevWarnAlways, "ClusterFileError").error(e).detail("Filename", filename);
105 return false; // Swallow the error and report that the file is out of date
106 }
107 }
108
writeFile()109 bool ClusterConnectionFile::writeFile() {
110 setConn = false;
111 if(filename.size()) {
112 try {
113 atomicReplace( filename, "# DO NOT EDIT!\n# This file is auto-generated, it is not to be edited by hand\n" + cs.toString().append("\n") );
114 if(!fileContentsUpToDate()) {
115 // This should only happen in rare scenarios where multiple processes are updating the same file to different values simultaneously
116 // In that case, we don't have any guarantees about which file will ultimately be written
117 TraceEvent(SevWarnAlways, "ClusterFileChangedAfterReplace").detail("Filename", filename).detail("ConnStr", cs.toString());
118 return false;
119 }
120
121 return true;
122 } catch( Error &e ) {
123 TraceEvent(SevWarnAlways, "UnableToChangeConnectionFile").error(e).detail("Filename", filename).detail("ConnStr", cs.toString());
124 }
125 }
126
127 return false;
128 }
129
setConnectionString(ClusterConnectionString const & conn)130 void ClusterConnectionFile::setConnectionString( ClusterConnectionString const& conn ) {
131 ASSERT( filename.size() );
132 cs = conn;
133 writeFile();
134 }
135
getErrorString(std::string const & source,Error const & e)136 std::string ClusterConnectionString::getErrorString( std::string const& source, Error const& e ) {
137 if( e.code() == error_code_connection_string_invalid ) {
138 return format("Invalid connection string `%s: %d %s", source.c_str(), e.code(), e.what());
139 }
140 else {
141 return format("Unexpected error parsing connection string `%s: %d %s", source.c_str(), e.code(), e.what());
142 }
143 }
144
// Returns a copy of connectionString with everything that is not payload removed:
//  - spaces, tabs, '\n' and '\r' are dropped;
//  - a '#' starts a comment that runs up to (and including) the next '\n' or '\r', or to the
//    end of the input.
std::string trim( std::string const& connectionString ) {
	std::string trimmed;
	bool inComment = false;

	for (char ch : connectionString) {
		const bool isLineBreak = (ch == '\n' || ch == '\r');

		if (inComment) {
			// The line break that terminates a comment is consumed together with it
			if (isLineBreak)
				inComment = false;
			continue;
		}

		if (ch == '#') {
			inComment = true;
			continue;
		}

		if (isLineBreak || ch == ' ' || ch == '\t')
			continue;

		trimmed += ch;
	}

	return trimmed;
}
163
ClusterConnectionString(std::string const & connectionString)164 ClusterConnectionString::ClusterConnectionString( std::string const& connectionString ) {
165 auto trimmed = trim(connectionString);
166
167 // Split on '@' into key@addrs
168 int pAt = trimmed.find_first_of('@');
169 if (pAt == trimmed.npos)
170 throw connection_string_invalid();
171 std::string key = trimmed.substr(0, pAt);
172 std::string addrs = trimmed.substr(pAt+1);
173
174 parseKey(key);
175
176 coord = NetworkAddress::parseList(addrs);
177 ASSERT( coord.size() > 0 ); // parseList() always returns at least one address if it doesn't throw
178
179 std::sort( coord.begin(), coord.end() );
180 // Check that there are no duplicate addresses
181 if ( std::unique( coord.begin(), coord.end() ) != coord.end() )
182 throw connection_string_invalid();
183 }
184
185 TEST_CASE("/fdbclient/MonitorLeader/parseConnectionString/basic") {
186 std::string input;
187
188 {
189 input = "asdf:2345@1.1.1.1:345";
190 ClusterConnectionString cs(input);
191 ASSERT( input == cs.toString() );
192 }
193
194 {
195 input = "0xxdeadbeef:100100100@1.1.1.1:34534,5.1.5.3:23443";
196 ClusterConnectionString cs(input);
197 ASSERT( input == cs.toString() );
198 }
199
200 {
201 input = "0xxdeadbeef:100100100@1.1.1.1:34534,5.1.5.3:23443";
202 std::string commented("#start of comment\n");
203 commented += input;
204 commented += "\n";
205 commented += "# asdfasdf ##";
206
207 ClusterConnectionString cs(commented);
208 ASSERT( input == cs.toString() );
209 }
210
211 {
212 input = "0xxdeadbeef:100100100@[::1]:1234,[::1]:1235";
213 std::string commented("#start of comment\n");
214 commented += input;
215 commented += "\n";
216 commented += "# asdfasdf ##";
217
218 ClusterConnectionString cs(commented);
219 ASSERT(input == cs.toString());
220 }
221
222 {
223 input = "0xxdeadbeef:100100100@[abcd:dcba::1]:1234,[abcd:dcba::abcd:1]:1234";
224 std::string commented("#start of comment\n");
225 commented += input;
226 commented += "\n";
227 commented += "# asdfasdf ##";
228
229 ClusterConnectionString cs(commented);
230 ASSERT(input == cs.toString());
231 }
232
233 return Void();
234 }
235
236 TEST_CASE("/fdbclient/MonitorLeader/parseConnectionString/fuzz") {
237 // For a static connection string, add in fuzzed comments and whitespace
238 // SOMEDAY: create a series of random connection strings, rather than the one we started with
239 std::string connectionString = "0xxdeadbeef:100100100@1.1.1.1:34534,5.1.5.3:23443";
240 for(int i=0; i<10000; i++)
241 {
242 std::string output("");
243 auto c=connectionString.begin();
244 while(c!=connectionString.end()) {
245 if(g_random->random01() < 0.1) // Add whitespace character
246 output += g_random->randomChoice(LiteralStringRef(" \t\n\r"));
247 if(g_random->random01() < 0.5) { // Add one of the input characters
248 output += *c;
249 ++c;
250 }
251 if(g_random->random01() < 0.1) { // Add a comment block
252 output += "#";
253 int charCount = g_random->randomInt(0, 20);
254 for(int i = 0; i < charCount; i++) {
255 output += g_random->randomChoice(LiteralStringRef("asdfzxcv123345:!@#$#$&()<\"\' \t"));
256 }
257 output += g_random->randomChoice(LiteralStringRef("\n\r"));
258 }
259 }
260
261 ClusterConnectionString cs(output);
262 ASSERT( connectionString == cs.toString() );
263 }
264 return Void();
265 }
266
ClusterConnectionString(vector<NetworkAddress> servers,Key key)267 ClusterConnectionString::ClusterConnectionString( vector<NetworkAddress> servers, Key key )
268 : coord(servers)
269 {
270 parseKey(key.toString());
271 }
272
parseKey(std::string const & key)273 void ClusterConnectionString::parseKey( std::string const& key ) {
274 // Check the structure of the given key, and fill in this->key and this->keyDesc
275
276 // The key must contain one (and only one) : character
277 int colon = key.find_first_of(':');
278 if (colon == key.npos)
279 throw connection_string_invalid();
280 std::string desc = key.substr(0, colon);
281 std::string id = key.substr(colon+1);
282
283 // Check that description contains only allowed characters (a-z, A-Z, 0-9, _)
284 for(auto c=desc.begin(); c!=desc.end(); ++c)
285 if (!(isalnum(*c) || *c == '_'))
286 throw connection_string_invalid();
287
288 // Check that ID contains only allowed characters (a-z, A-Z, 0-9)
289 for(auto c=id.begin(); c!=id.end(); ++c)
290 if (!isalnum(*c))
291 throw connection_string_invalid();
292
293 this->key = StringRef(key);
294 this->keyDesc = StringRef(desc);
295 }
296
toString() const297 std::string ClusterConnectionString::toString() const {
298 std::string s = key.toString();
299 s += '@';
300 for(int i=0; i<coord.size(); i++) {
301 if (i) s += ',';
302 s += coord[i].toString();
303 }
304 return s;
305 }
306
ClientCoordinators(Reference<ClusterConnectionFile> ccf)307 ClientCoordinators::ClientCoordinators( Reference<ClusterConnectionFile> ccf )
308 : ccf(ccf)
309 {
310 ClusterConnectionString cs = ccf->getConnectionString();
311 for(auto s = cs.coordinators().begin(); s != cs.coordinators().end(); ++s)
312 clientLeaderServers.push_back( ClientLeaderRegInterface( *s ) );
313 clusterKey = cs.clusterKey();
314 }
315
// Fixed token identifying the getLeader endpoint of a ClientLeaderRegInterface. It is used both
// when addressing a remote coordinator and when registering the local well-known endpoint.
UID WLTOKEN_CLIENTLEADERREG_GETLEADER( -1, 2 );
317
ClientLeaderRegInterface(NetworkAddress remote)318 ClientLeaderRegInterface::ClientLeaderRegInterface( NetworkAddress remote )
319 : getLeader( Endpoint({remote}, WLTOKEN_CLIENTLEADERREG_GETLEADER) )
320 {
321 }
322
ClientLeaderRegInterface(INetwork * local)323 ClientLeaderRegInterface::ClientLeaderRegInterface( INetwork* local ) {
324 getLeader.makeWellKnownEndpoint( WLTOKEN_CLIENTLEADERREG_GETLEADER, TaskCoordination );
325 }
326
// Nominee is the worker among all workers that are considered as leader by a coordinator
// This function contacts a coordinator coord to ask if the worker is considered as a leader (i.e., if the worker
// is a nominee)
//
// Parameters:
//   key                       - cluster key sent in every GetLeaderRequest
//   coord                     - the coordinator being polled
//   nomineeChange             - triggered each time this coordinator's answer changes
//   info                      - in/out slot holding the last answer from this coordinator; its
//                               changeID (or UID() when empty) is sent with the next request
//   generation                - only used as a detail in the GetLeaderReply trace event
//   connectedCoordinatorsNum  - optional counter; incremented once, on the first present reply
//
// The loop has no return or break, so the actor ends only by cancellation or by an error.
ACTOR Future<Void> monitorNominee( Key key, ClientLeaderRegInterface coord, AsyncTrigger* nomineeChange, Optional<LeaderInfo> *info, int generation, Reference<AsyncVar<int>> connectedCoordinatorsNum ) {
	// Ensures this coordinator is counted at most once in connectedCoordinatorsNum
	state bool hasCounted = false;
	loop {
		state Optional<LeaderInfo> li = wait( retryBrokenPromise( coord.getLeader, GetLeaderRequest( key, info->present() ? info->get().changeID : UID() ), TaskCoordinationReply ) );
		if (li.present() && !hasCounted && connectedCoordinatorsNum.isValid()) {
			connectedCoordinatorsNum->set(connectedCoordinatorsNum->get() + 1);
			hasCounted = true;
		}
		wait( Future<Void>(Void()) ); // Make sure we weren't cancelled

		TraceEvent("GetLeaderReply").suppressFor(1.0).detail("Coordinator", coord.getLeader.getEndpoint().getPrimaryAddress()).detail("Nominee", li.present() ? li.get().changeID : UID()).detail("Generation", generation);

		// Only publish and notify when the answer actually differs from the stored one
		if (li != *info) {
			*info = li;
			nomineeChange->trigger();

			// A forwarding answer ends polling of this coordinator: wait on a future that
			// never becomes ready, so the actor idles until it is cancelled.
			if( li.present() && li.get().forward )
				wait( Future<Void>(Never()) );
			// NOTE(review): presumably the same cancellation check as above, this time after
			// nomineeChange->trigger() - confirm.
			wait( Future<Void>(Void()) );
		}
	}
}
352
353 // Also used in fdbserver/LeaderElection.actor.cpp!
354 // bool represents if the LeaderInfo is a majority answer or not.
355 // This function also masks the first 7 bits of changeId of the nominees and returns the Leader with masked changeId
getLeader(const vector<Optional<LeaderInfo>> & nominees)356 Optional<std::pair<LeaderInfo, bool>> getLeader( const vector<Optional<LeaderInfo>>& nominees ) {
357 vector<LeaderInfo> maskedNominees;
358 maskedNominees.reserve(nominees.size());
359 for (auto &nominee : nominees) {
360 if (nominee.present()) {
361 maskedNominees.push_back(nominee.get());
362 maskedNominees.back().changeID = UID(maskedNominees.back().changeID.first() & LeaderInfo::mask, maskedNominees.back().changeID.second());
363 }
364 }
365
366 // If any coordinator says that the quorum is forwarded, then it is
367 for(int i=0; i<maskedNominees.size(); i++)
368 if (maskedNominees[i].forward)
369 return std::pair<LeaderInfo, bool>(maskedNominees[i], true);
370
371 if(!maskedNominees.size())
372 return Optional<std::pair<LeaderInfo, bool>>();
373
374 std::sort(maskedNominees.begin(), maskedNominees.end(),
375 [](const LeaderInfo& l, const LeaderInfo& r) { return l.changeID < r.changeID; });
376
377 int bestCount = 0;
378 LeaderInfo bestNominee;
379 LeaderInfo currentNominee;
380 int curCount = 0;
381 for (int i = 0; i < maskedNominees.size(); i++) {
382 if (currentNominee == maskedNominees[i]) {
383 curCount++;
384 }
385 else {
386 if (curCount > bestCount) {
387 bestNominee = currentNominee;
388 bestCount = curCount;
389 }
390 currentNominee = maskedNominees[i];
391 curCount = 1;
392 }
393 }
394 if (curCount > bestCount) {
395 bestNominee = currentNominee;
396 bestCount = curCount;
397 }
398
399 bool majority = bestCount >= nominees.size() / 2 + 1;
400 return std::pair<LeaderInfo, bool>(bestNominee, majority);
401 }
402
403 struct MonitorLeaderInfo {
404 bool hasConnected;
405 Reference<ClusterConnectionFile> intermediateConnFile;
406 int generation;
407
MonitorLeaderInfoMonitorLeaderInfo408 MonitorLeaderInfo() : hasConnected(false), generation(0) {}
MonitorLeaderInfoMonitorLeaderInfo409 explicit MonitorLeaderInfo( Reference<ClusterConnectionFile> intermediateConnFile ) : intermediateConnFile(intermediateConnFile), hasConnected(false), generation(0) {}
410 };
411
// Leader is the process that will be elected by coordinators as the cluster controller
//
// Monitors the coordinators named in info.intermediateConnFile until one of them reports that
// the cluster has been forwarded to a different connection string.
//
// Parameters:
//   connFile                  - the cluster file known to the caller; it is rewritten when a
//                               leader is found through a different (intermediate) file
//   outSerializedLeaderInfo   - receives the serialized info of each leader that is found
//   info                      - state from the previous generation (taken by value)
//   connectedCoordinatorsNum  - passed through to every monitorNominee actor
//
// Returns the updated info, whose intermediateConnFile holds the forwarded connection string.
// The only return statement is on the forwarding path.
ACTOR Future<MonitorLeaderInfo> monitorLeaderOneGeneration( Reference<ClusterConnectionFile> connFile, Reference<AsyncVar<Value>> outSerializedLeaderInfo, MonitorLeaderInfo info, Reference<AsyncVar<int>> connectedCoordinatorsNum) {
	state ClientCoordinators coordinators( info.intermediateConnFile );
	state AsyncTrigger nomineeChange;
	state std::vector<Optional<LeaderInfo>> nominees;
	state Future<Void> allActors;

	// One slot per coordinator; each monitorNominee actor writes into its own slot
	nominees.resize(coordinators.clientLeaderServers.size());

	std::vector<Future<Void>> actors;
	// Ask all coordinators if the worker is considered as a leader (leader nominee) by the coordinator.
	for(int i=0; i<coordinators.clientLeaderServers.size(); i++)
		actors.push_back( monitorNominee( coordinators.clusterKey, coordinators.clientLeaderServers[i], &nomineeChange, &nominees[i], info.generation, connectedCoordinatorsNum) );
	allActors = waitForAll(actors);

	loop {
		Optional<std::pair<LeaderInfo, bool>> leader = getLeader(nominees);
		TraceEvent("MonitorLeaderChange").detail("NewLeader", leader.present() ? leader.get().first.changeID : UID(1,1));
		if (leader.present()) {
			if( leader.get().first.forward ) {
				// Forwarded: the serialized info is parsed as the new connection string. It is
				// wrapped in a connection file that reuses connFile's filename, and this
				// generation ends.
				TraceEvent("MonitorLeaderForwarding").detail("NewConnStr", leader.get().first.serializedInfo.toString()).detail("OldConnStr", info.intermediateConnFile->getConnectionString().toString());
				info.intermediateConnFile = Reference<ClusterConnectionFile>(new ClusterConnectionFile(connFile->getFilename(), ClusterConnectionString(leader.get().first.serializedInfo.toString())));
				return info;
			}
			// A leader was found through an intermediate file: copy its connection string into
			// the caller's file and continue with that file from now on.
			if(connFile != info.intermediateConnFile) {
				if(!info.hasConnected) {
					TraceEvent(SevWarnAlways, "IncorrectClusterFileContentsAtConnection").detail("Filename", connFile->getFilename())
						.detail("ConnectionStringFromFile", connFile->getConnectionString().toString())
						.detail("CurrentConnectionString", info.intermediateConnFile->getConnectionString().toString());
				}
				connFile->setConnectionString(info.intermediateConnFile->getConnectionString());
				info.intermediateConnFile = connFile;
			}

			info.hasConnected = true;
			connFile->notifyConnected();

			outSerializedLeaderInfo->set( leader.get().first.serializedInfo );
		}
		// Re-evaluate when any coordinator's answer changes. monitorNominee has no return
		// path, so allActors presumably only becomes ready by propagating an error - confirm.
		wait( nomineeChange.onTrigger() || allActors );
	}
}
454
// Runs monitorLeaderOneGeneration() repeatedly, starting from connFile and carrying the
// returned state into the next run. The loop has no return or break, so the actor ends only by
// cancellation or by an error.
//
// Parameters:
//   connFile                  - the cluster file to start from (and to keep up to date)
//   outSerializedLeaderInfo   - passed through; receives the serialized leader info
//   connectedCoordinatorsNum  - optional counter, reset to 0 at the start of every generation
ACTOR Future<Void> monitorLeaderInternal( Reference<ClusterConnectionFile> connFile, Reference<AsyncVar<Value>> outSerializedLeaderInfo, Reference<AsyncVar<int>> connectedCoordinatorsNum ) {
	state MonitorLeaderInfo info(connFile);
	loop {
		// set the AsyncVar to 0
		if (connectedCoordinatorsNum.isValid()) connectedCoordinatorsNum->set(0);
		// A generation returns when the coordinators forward to a new connection string
		MonitorLeaderInfo _info = wait( monitorLeaderOneGeneration( connFile, outSerializedLeaderInfo, info, connectedCoordinatorsNum) );
		info = _info;
		info.generation++;

	}
}
466