1 /*
2  * LeaderElection.actor.cpp
3  *
4  * This source file is part of the FoundationDB open source project
5  *
6  * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *     http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  */
20 
21 #include "fdbrpc/FailureMonitor.h"
22 #include "fdbrpc/Locality.h"
23 #include "fdbserver/ClusterRecruitmentInterface.h"
24 #include "fdbserver/CoordinationInterface.h"
25 #include "fdbclient/MonitorLeader.h"
26 #include "flow/actorcompiler.h"  // This must be the last #include.
27 
// Declared here, defined elsewhere. Picks the current leader from the per-coordinator nominee list.
// Callers in this file treat `.first` as the chosen LeaderInfo and only act on it as a confirmed leader
// (notify connection, become leader) when `.second` is true — presumably meaning enough coordinators agree;
// confirm against the definition.
Optional<std::pair<LeaderInfo, bool>> getLeader( const vector<Optional<LeaderInfo>>& nominees );
29 
submitCandidacy(Key key,LeaderElectionRegInterface coord,LeaderInfo myInfo,UID prevChangeID,Reference<AsyncVar<vector<Optional<LeaderInfo>>>> nominees,int index)30 ACTOR Future<Void> submitCandidacy( Key key, LeaderElectionRegInterface coord, LeaderInfo myInfo, UID prevChangeID, Reference<AsyncVar<vector<Optional<LeaderInfo>>>> nominees, int index ) {
31 	loop {
32 		auto const& nom = nominees->get()[index];
33 		Optional<LeaderInfo> li = wait( retryBrokenPromise( coord.candidacy, CandidacyRequest( key, myInfo, nom.present() ? nom.get().changeID : UID(), prevChangeID ), TaskCoordinationReply ) );
34 
35 		if (li != nominees->get()[index]) {
36 			vector<Optional<LeaderInfo>> v = nominees->get();
37 			v[index] = li;
38 			nominees->set(v);
39 
40 			if( li.present() && li.get().forward )
41 				wait( Future<Void>(Never()) );
42 
43 			wait( Future<Void>(Void()) ); // Make sure we weren't cancelled
44 		}
45 	}
46 }
47 
buggifyDelayedAsyncVar(Reference<AsyncVar<T>> in,Reference<AsyncVar<T>> out)48 ACTOR template <class T> Future<Void> buggifyDelayedAsyncVar( Reference<AsyncVar<T>> in, Reference<AsyncVar<T>> out ) {
49 	try {
50 		loop {
51 			wait( delay( SERVER_KNOBS->BUGGIFIED_EVENTUAL_CONSISTENCY * g_random->random01() ) );
52 			out->set( in->get() );
53 			wait( in->onChange() );
54 		}
55 	} catch (Error& e) {
56 		out->set( in->get() );
57 		throw;
58 	}
59 }
60 
61 template <class T>
buggifyDelayedAsyncVar(Reference<AsyncVar<T>> & var)62 Future<Void> buggifyDelayedAsyncVar( Reference<AsyncVar<T>> &var ) {
63 	Reference<AsyncVar<T>> in( new AsyncVar<T> );
64 	auto f = buggifyDelayedAsyncVar(in, var);
65 	var = in;
66 	return f;
67 }
68 
changeLeaderCoordinators(ServerCoordinators coordinators,Value forwardingInfo)69 ACTOR Future<Void> changeLeaderCoordinators( ServerCoordinators coordinators, Value forwardingInfo ) {
70 	std::vector<Future<Void>> forwardRequests;
71 	for( int i = 0; i < coordinators.leaderElectionServers.size(); i++ )
72 		forwardRequests.push_back( retryBrokenPromise( coordinators.leaderElectionServers[i].forward, ForwardRequest( coordinators.clusterKey, forwardingInfo ) ) );
73 	int quorum_size = forwardRequests.size()/2 + 1;
74 	wait( quorum( forwardRequests, quorum_size ) );
75 	return Void();
76 }
77 
tryBecomeLeaderInternal(ServerCoordinators coordinators,Value proposedSerializedInterface,Reference<AsyncVar<Value>> outSerializedLeader,bool hasConnected,Reference<AsyncVar<ClusterControllerPriorityInfo>> asyncPriorityInfo)78 ACTOR Future<Void> tryBecomeLeaderInternal( ServerCoordinators coordinators, Value proposedSerializedInterface, Reference<AsyncVar<Value>> outSerializedLeader, bool hasConnected, Reference<AsyncVar<ClusterControllerPriorityInfo>> asyncPriorityInfo ) {
79 	state Reference<AsyncVar<vector<Optional<LeaderInfo>>>> nominees( new AsyncVar<vector<Optional<LeaderInfo>>>() );
80 	state LeaderInfo myInfo;
81 	state Future<Void> candidacies;
82 	state bool iAmLeader = false;
83 	state UID prevChangeID;
84 
85 
86 	if(asyncPriorityInfo->get().dcFitness == ClusterControllerPriorityInfo::FitnessBad || asyncPriorityInfo->get().dcFitness == ClusterControllerPriorityInfo::FitnessRemote || asyncPriorityInfo->get().isExcluded) {
87 		wait( delay(SERVER_KNOBS->WAIT_FOR_GOOD_REMOTE_RECRUITMENT_DELAY) );
88 	} else if( asyncPriorityInfo->get().processClassFitness > ProcessClass::UnsetFit ) {
89 		wait( delay(SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY) );
90 	}
91 
92 	nominees->set( vector<Optional<LeaderInfo>>( coordinators.clientLeaderServers.size() ) );
93 
94 	myInfo.serializedInfo = proposedSerializedInterface;
95 	outSerializedLeader->set( Value() );
96 
97 	state Future<Void> buggifyDelay = (SERVER_KNOBS->BUGGIFY_ALL_COORDINATION || BUGGIFY) ? buggifyDelayedAsyncVar( outSerializedLeader ) : Void();
98 
99 	while (!iAmLeader) {
100 		state Future<Void> badCandidateTimeout;
101 
102 		myInfo.changeID = g_random->randomUniqueID();
103 		prevChangeID = myInfo.changeID;
104 		myInfo.updateChangeID( asyncPriorityInfo->get() );
105 
106 		vector<Future<Void>> cand;
107 		for(int i=0; i<coordinators.leaderElectionServers.size(); i++)
108 			cand.push_back( submitCandidacy( coordinators.clusterKey, coordinators.leaderElectionServers[i], myInfo, prevChangeID, nominees, i ) );
109 		candidacies = waitForAll(cand);
110 
111 		loop {
112 			state Optional<std::pair<LeaderInfo, bool>> leader = getLeader( nominees->get() );
113 			if( leader.present() && leader.get().first.forward ) {
114 				// These coordinators are forwarded to another set.  But before we change our own cluster file, we need to make
115 				// sure that a majority of coordinators know that.
116 				// SOMEDAY: Wait briefly to see if other coordinators will tell us they already know, to save communication?
117 				wait( changeLeaderCoordinators( coordinators, leader.get().first.serializedInfo ) );
118 
119 				if(!hasConnected) {
120 					TraceEvent(SevWarnAlways, "IncorrectClusterFileContentsAtConnection").detail("Filename", coordinators.ccf->getFilename())
121 						.detail("ConnectionStringFromFile", coordinators.ccf->getConnectionString().toString())
122 						.detail("CurrentConnectionString", leader.get().first.serializedInfo.toString());
123 				}
124 				coordinators.ccf->setConnectionString( ClusterConnectionString( leader.get().first.serializedInfo.toString() ) );
125 				TraceEvent("LeaderForwarding").detail("ConnStr", coordinators.ccf->getConnectionString().toString());
126 				throw coordinators_changed();
127 			}
128 
129 			if (leader.present() && leader.get().second) {
130 				hasConnected = true;
131 				coordinators.ccf->notifyConnected();
132 			}
133 
134 			if (leader.present() && leader.get().second && leader.get().first.equalInternalId(myInfo)) {
135 				TraceEvent("BecomingLeader", myInfo.changeID);
136 				ASSERT( leader.get().first.serializedInfo == proposedSerializedInterface );
137 				outSerializedLeader->set( leader.get().first.serializedInfo );
138 				iAmLeader = true;
139 				break;
140 			}
141 			if (leader.present()) {
142 				TraceEvent("LeaderChanged", myInfo.changeID).detail("ToID", leader.get().first.changeID);
143 				if (leader.get().first.serializedInfo != proposedSerializedInterface) // We never set outSerializedLeader to our own interface unless we are ready to become leader!
144 					outSerializedLeader->set( leader.get().first.serializedInfo );
145 			}
146 
147 			// If more than 2*SERVER_KNOBS->POLLING_FREQUENCY elapses while we are nominated by some coordinator but there is no leader,
148 			// we might be breaking the leader election process for someone with better communications but lower ID, so change IDs.
149 			if ((!leader.present() || !leader.get().second) && std::count( nominees->get().begin(), nominees->get().end(), myInfo )) {
150 				if (!badCandidateTimeout.isValid())
151 					badCandidateTimeout = delay( SERVER_KNOBS->POLLING_FREQUENCY*2, TaskCoordinationReply );
152 			} else
153 				badCandidateTimeout = Future<Void>();
154 
155 			choose {
156 				when (wait( nominees->onChange() )) {}
157 				when (wait( badCandidateTimeout.isValid() ? badCandidateTimeout : Never() )) {
158 					TEST(true); // Bad candidate timeout
159 					TraceEvent("LeaderBadCandidateTimeout", myInfo.changeID);
160 					break;
161 				}
162 				when (wait(candidacies)) { ASSERT(false); }
163 				when (wait( asyncPriorityInfo->onChange() )) {
164 					break;
165 				}
166 			}
167 		}
168 
169 		candidacies.cancel();
170 	}
171 
172 	ASSERT( iAmLeader && outSerializedLeader->get() == proposedSerializedInterface );
173 
174 	loop {
175 		prevChangeID = myInfo.changeID;
176 		myInfo.updateChangeID( asyncPriorityInfo->get() );
177 		if (myInfo.changeID != prevChangeID) {
178 			TraceEvent("ChangeLeaderChangeID").detail("PrevChangeID", prevChangeID).detail("NewChangeID", myInfo.changeID);
179 		}
180 
181 		state vector<Future<Void>> true_heartbeats;
182 		state vector<Future<Void>> false_heartbeats;
183 		for(int i=0; i<coordinators.leaderElectionServers.size(); i++) {
184 			Future<bool> hb = retryBrokenPromise( coordinators.leaderElectionServers[i].leaderHeartbeat, LeaderHeartbeatRequest( coordinators.clusterKey, myInfo, prevChangeID ), TaskCoordinationReply );
185 			true_heartbeats.push_back( onEqual(hb, true) );
186 			false_heartbeats.push_back( onEqual(hb, false) );
187 		}
188 
189 		state Future<Void> rate = delay( SERVER_KNOBS->HEARTBEAT_FREQUENCY, TaskCoordinationReply ) || asyncPriorityInfo->onChange(); // SOMEDAY: Move to server side?
190 
191 		choose {
192 			when ( wait( quorum( true_heartbeats, true_heartbeats.size()/2+1 ) ) ) {
193 				//TraceEvent("StillLeader", myInfo.changeID);
194 			} // We are still leader
195 			when ( wait( quorum( false_heartbeats, false_heartbeats.size()/2+1 ) ) ) {
196 				TraceEvent("ReplacedAsLeader", myInfo.changeID);
197 				break;
198 			} // We are definitely not leader
199 			when ( wait( delay(SERVER_KNOBS->POLLING_FREQUENCY) ) ) {
200 				for(int i = 0; i < coordinators.leaderElectionServers.size(); ++i) {
201 					if(true_heartbeats[i].isReady())
202 						TraceEvent("LeaderTrueHeartbeat", myInfo.changeID).detail("Coordinator", coordinators.leaderElectionServers[i].candidacy.getEndpoint().getPrimaryAddress());
203 					else if(false_heartbeats[i].isReady())
204 						TraceEvent("LeaderFalseHeartbeat", myInfo.changeID).detail("Coordinator", coordinators.leaderElectionServers[i].candidacy.getEndpoint().getPrimaryAddress());
205 					else
206 						TraceEvent("LeaderNoHeartbeat", myInfo.changeID).detail("Coordinator", coordinators.leaderElectionServers[i].candidacy.getEndpoint().getPrimaryAddress());
207 				}
208 				TraceEvent("ReleasingLeadership", myInfo.changeID);
209 				break;
210 			} // Give up on being leader, because we apparently have poor communications
211 			when ( wait( asyncPriorityInfo->onChange() ) ) {}
212 		}
213 
214 		wait( rate );
215 	}
216 
217 	if (SERVER_KNOBS->BUGGIFY_ALL_COORDINATION || BUGGIFY) wait( delay( SERVER_KNOBS->BUGGIFIED_EVENTUAL_CONSISTENCY * g_random->random01() ) );
218 
219 	return Void(); // We are no longer leader
220 }
221