/*
 * CoordinatedState.actor.cpp
 *
 * This source file is part of the FoundationDB open source project
 *
 * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "fdbserver/CoordinatedState.h"
#include "fdbserver/CoordinationInterface.h"
#include "fdbserver/Knobs.h"
#include "flow/ActorCollection.h"
#include "fdbserver/LeaderElection.h"
#include "flow/actorcompiler.h" // has to be last include

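// Helpers that send a single read/write request to one coordinator's generation register.
// Under BUGGIFY, a random delay (up to BUGGIFIED_EVENTUAL_CONSISTENCY) is injected before and
// after the request to simulate slow, eventually-consistent coordinators in simulation.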
ACTOR Future<GenerationRegReadReply> waitAndSendRead( RequestStream<GenerationRegReadRequest> to, GenerationRegReadRequest req ) {
	if( SERVER_KNOBS->BUGGIFY_ALL_COORDINATION || BUGGIFY )
		wait( delay( SERVER_KNOBS->BUGGIFIED_EVENTUAL_CONSISTENCY*g_random->random01() ) );
	state GenerationRegReadReply reply = wait( retryBrokenPromise( to, req ) );
	if( SERVER_KNOBS->BUGGIFY_ALL_COORDINATION || BUGGIFY )
		wait( delay( SERVER_KNOBS->BUGGIFIED_EVENTUAL_CONSISTENCY*g_random->random01() ) );
	return reply;
}

ACTOR Future<UniqueGeneration> waitAndSendWrite(RequestStream<GenerationRegWriteRequest> to, GenerationRegWriteRequest req) {
	if( SERVER_KNOBS->BUGGIFY_ALL_COORDINATION || BUGGIFY )
		wait( delay( SERVER_KNOBS->BUGGIFIED_EVENTUAL_CONSISTENCY*g_random->random01() ) );
	state UniqueGeneration reply = wait( retryBrokenPromise( to, req ) );
	if( SERVER_KNOBS->BUGGIFY_ALL_COORDINATION || BUGGIFY )
		wait( delay( SERVER_KNOBS->BUGGIFIED_EVENTUAL_CONSISTENCY*g_random->random01() ) );
	return reply;
}

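// Adapters used by replicatedRead() below: emptyToNever() suppresses replies whose register is
// empty (generation 0) by never completing, while nonemptyToNever() suppresses non-empty replies.
// This lets a single quorum() call count only replies of the kind it cares about.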
ACTOR Future<GenerationRegReadReply> emptyToNever( Future<GenerationRegReadReply> f ) {
	state GenerationRegReadReply r = wait(f);
	if (r.gen.generation == 0)
		wait( Future<Void>(Never()) );
	return r;
}

ACTOR Future<GenerationRegReadReply> nonemptyToNever( Future<GenerationRegReadReply> f ) {
	state GenerationRegReadReply r = wait(f);
	if (r.gen.generation != 0)
		wait( Future<Void>(Never()) );
	return r;
}

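// CoordinatedStateImpl drives a single read-then-write round against the coordinators'
// generation registers.  read() establishes a fresh UniqueGeneration strictly larger than any
// generation previously observed, setExclusive() then writes the new value at that generation,
// and onConflict() completes once some other client has started a round at a higher generation.
// The 'stage' field tracks progress through this sequence and is checked by the ASSERTs below.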
struct CoordinatedStateImpl {
	ServerCoordinators coordinators;
	int stage;
	UniqueGeneration gen;
	uint64_t conflictGen;
	bool doomed;
	ActorCollection ac; // Errors are not reported
	bool initial;

	CoordinatedStateImpl( ServerCoordinators const& c ) : coordinators(c), stage(0), conflictGen(0), doomed(false), ac(false), initial(false) {}
	uint64_t getConflict() { return conflictGen; }

	bool isDoomed( GenerationRegReadReply const& rep ) {
		return rep.gen > gen // setExclusive is doomed, because there was a write at least started at a higher generation, which means a read completed at that higher generation
			// || rep.rgen > gen // setExclusive isn't absolutely doomed, but it may/probably will fail
			;
	}

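	// Two-phase read: the first replicated read only discovers the highest generation any
	// coordinator has seen; we then pick a unique generation strictly larger than that and do a
	// second replicated read at it.  The second reply tells us the current value, whether the
	// register was previously uninitialized ('initial'), and whether a higher-generation round
	// has already started ('doomed').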
	ACTOR static Future<Value> read( CoordinatedStateImpl* self ) {
		ASSERT( self->stage == 0 );

		{
			self->stage = 1;
			GenerationRegReadReply rep = wait( self->replicatedRead( self, GenerationRegReadRequest( self->coordinators.clusterKey, UniqueGeneration() ) ) );
			self->conflictGen = std::max( self->conflictGen, std::max(rep.gen.generation, rep.rgen.generation) ) + 1;
			self->gen = UniqueGeneration( self->conflictGen, g_random->randomUniqueID() );
		}

		{
			self->stage = 2;
			GenerationRegReadReply rep = wait( self->replicatedRead( self, GenerationRegReadRequest( self->coordinators.clusterKey, self->gen ) ) );
			self->stage = 3;
			self->conflictGen = std::max(self->conflictGen, std::max( rep.gen.generation, rep.rgen.generation ));
			if (self->isDoomed(rep))
				self->doomed = true;
			self->initial = rep.gen.generation == 0;

			self->stage = 4;
			return rep.value.present() ? rep.value.get() : Value();
		}
	}
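	// Poll the coordinators until some other client has started a round at a higher generation
	// than ours (isDoomed), at which point our setExclusive() can no longer succeed and the
	// caller is notified of the conflict.  If we were already doomed when read() completed,
	// return immediately; if our own setExclusive() has progressed past stage 4, stop polling
	// and never return.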
	ACTOR static Future<Void> onConflict( CoordinatedStateImpl* self ) {
		ASSERT( self->stage == 4 );
		if (self->doomed) return Void();
		loop {
			wait( delay( SERVER_KNOBS->COORDINATED_STATE_ONCONFLICT_POLL_INTERVAL ) );
			GenerationRegReadReply rep = wait( self->replicatedRead( self, GenerationRegReadRequest( self->coordinators.clusterKey, UniqueGeneration() ) ) );
			if (self->stage > 4) break;
			self->conflictGen = std::max(self->conflictGen, std::max( rep.gen.generation, rep.rgen.generation ));
			if (self->isDoomed(rep))
				return Void();
		}
		wait( Future<Void>(Never()) );
		return Void();
	}
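	// Attempt to write 'v' at our generation.  replicatedWrite() returns the highest generation
	// reported by the responding coordinators; if that is exactly our generation the write won,
	// otherwise a higher-generation round intervened and coordinated_state_conflict() is thrown.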
	ACTOR static Future<Void> setExclusive( CoordinatedStateImpl* self, Value v ) {
		ASSERT( self->stage == 4 );
		self->stage = 5;

		UniqueGeneration wgen = wait( self->replicatedWrite( self, GenerationRegWriteRequest( KeyValueRef(self->coordinators.clusterKey, v), self->gen ) ) );
		self->stage = 6;

		TraceEvent("CoordinatedStateSet").detail("Gen", self->gen.generation).detail("Wgen", wgen.generation)
			.detail("Genu", self->gen.uid).detail("Wgenu", wgen.uid)
			.detail("Cgen", self->conflictGen);

		if (wgen == self->gen)
			return Void();
		else {
			self->conflictGen = std::max(self->conflictGen, wgen.generation);
			throw coordinated_state_conflict();
		}
	}

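	// Send the read to every coordinator and wait until either a majority of non-empty replies
	// arrives or enough empty replies arrive that a non-empty majority is impossible.  In either
	// case the best reply (highest generation, breaking ties by read generation) is returned.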
	ACTOR static Future<GenerationRegReadReply> replicatedRead( CoordinatedStateImpl* self, GenerationRegReadRequest req ) {
		state std::vector<GenerationRegInterface> &replicas = self->coordinators.stateServers;
		state vector< Future<GenerationRegReadReply> > rep_empty_reply;
		state vector< Future<GenerationRegReadReply> > rep_reply;
		for(int i=0; i<replicas.size(); i++) {
			Future<GenerationRegReadReply> reply = waitAndSendRead( replicas[i].read, GenerationRegReadRequest(req.key, req.gen) );
			rep_empty_reply.push_back( nonemptyToNever( reply ) );
			rep_reply.push_back( emptyToNever( reply ) );
			self->ac.add( success( reply ) );
		}

		state Future<Void> majorityEmpty = quorum( rep_empty_reply, (replicas.size()+1)/2 );  // enough empty to ensure we cannot achieve a majority non-empty
		wait( quorum( rep_reply, replicas.size()/2 + 1 ) || majorityEmpty );

		if( majorityEmpty.isReady() ) {
			int best = -1;
			for(int i=0; i<rep_empty_reply.size(); i++)
				if (rep_empty_reply[i].isReady() && !rep_empty_reply[i].isError()) {
					if (best < 0 || rep_empty_reply[i].get().rgen > rep_empty_reply[best].get().rgen )
						best = i;
				}
			ASSERT( best >= 0 );
			auto result = rep_empty_reply[best].get();
			return result;
		} else {
			int best = -1;
			for(int i=0; i<rep_reply.size(); i++)
				if (rep_reply[i].isReady() && !rep_reply[i].isError()) {
					if (best < 0 ||
						rep_reply[i].get().gen > rep_reply[best].get().gen ||
						( rep_reply[i].get().gen == rep_reply[best].get().gen && rep_reply[i].get().rgen > rep_reply[best].get().rgen ) )
						best = i;
				}
			ASSERT( best >= 0 );
			auto result = rep_reply[best].get();
			return result;
		}
	}

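	// Send the write to every coordinator.  If this is the initial write of the register every
	// replica must acknowledge it; otherwise a simple majority suffices.  The largest generation
	// reported by any responder is returned so the caller can detect whether a higher-generation
	// write intervened.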
	ACTOR static Future<UniqueGeneration> replicatedWrite( CoordinatedStateImpl* self, GenerationRegWriteRequest req ) {
		state std::vector<GenerationRegInterface> &replicas = self->coordinators.stateServers;
		state vector< Future<UniqueGeneration> > wrep_reply;
		for(int i=0; i<replicas.size(); i++) {
			Future<UniqueGeneration> reply = waitAndSendWrite( replicas[i].write, GenerationRegWriteRequest( req.kv, req.gen ) );
			wrep_reply.push_back( reply );
			self->ac.add( success( reply ) );
		}

		wait( quorum( wrep_reply, self->initial ? replicas.size() : replicas.size()/2 + 1 ) );

		UniqueGeneration maxGen;
		for(int i=0; i<wrep_reply.size(); i++)
			if (wrep_reply[i].isReady())
				maxGen = std::max(maxGen, wrep_reply[i].get());
		return maxGen;
	}
};

CoordinatedState::CoordinatedState( ServerCoordinators const& coord ) : impl( new CoordinatedStateImpl(coord) ) { }
CoordinatedState::~CoordinatedState() { delete impl; }
Future<Value> CoordinatedState::read() { return CoordinatedStateImpl::read(impl); }
Future<Void> CoordinatedState::onConflict() { return CoordinatedStateImpl::onConflict(impl); }
Future<Void> CoordinatedState::setExclusive(Value v) { return CoordinatedStateImpl::setExclusive(impl,v); }
uint64_t CoordinatedState::getConflict() { return impl->getConflict(); }

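// The value stored in the coordinated state once coordinator changes ("moves") are supported.
// 'mode' records where we are in a move: Active (no move in progress), MaybeTo (a move to the
// coordinators named in 'other' may have started), or MovingFrom (this is the new coordinators'
// state and 'other' names the old coordinators).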
struct MovableValue {
	enum MoveState {
		MaybeTo = 1,
		Active = 2,
		MovingFrom = 3
	};

	Value value;
	int32_t mode;
	Optional<Value> other;   // a cluster connection string

	MovableValue() : mode( Active ) {}
	MovableValue( Value const& v, int mode, Optional<Value> other = Optional<Value>() ) : value( v ), mode( mode ), other( other ) {}

	template <class Ar>
	void serialize(Ar& ar) {
		ASSERT( ar.protocolVersion() >= 0x0FDB00A2000D0001LL );
		serializer(ar, value, mode, other);
	}
};

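// MovableCoordinatedStateImpl wraps CoordinatedState with the MovableValue encoding above so
// that the coordinated state can be migrated to a new set of coordinators.  read() transparently
// resumes an interrupted move (mode == MaybeTo) before returning the value, and move() performs
// a full migration; both paths end by throwing coordinators_changed() once the new coordinators
// have taken over (see the comment in move() below).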
struct MovableCoordinatedStateImpl {
	ServerCoordinators coordinators;
	CoordinatedState cs;
	Optional<Value> lastValue,  // The value passed to setExclusive()
		lastCSValue;            // The value passed to cs.setExclusive()

	MovableCoordinatedStateImpl( ServerCoordinators const& c ) : coordinators(c), cs(c) {}

	ACTOR static Future<Value> read( MovableCoordinatedStateImpl* self ) {
		state MovableValue moveState;
		Value rawValue = wait( self->cs.read() );
		if( rawValue.size() ) {
			BinaryReader r( rawValue, IncludeVersion() );
			if (r.protocolVersion() < 0x0FDB00A2000D0001LL) {
				// Old coordinated state, not a MovableValue
				moveState.value = rawValue;
			} else
				r >> moveState;
		}
		// SOMEDAY: If moveState.mode == MovingFrom, read (without locking) old state and assert that it corresponds with our state and is ReallyTo(coordinators)
		if (moveState.mode == MovableValue::MaybeTo) {
			TEST(true);
			ASSERT( moveState.other.present() );
			wait( self->moveTo( self, &self->cs, ClusterConnectionString( moveState.other.get().toString() ), moveState.value ) );
		}
		return moveState.value;
	}

	Future<Void> onConflict() {
		return cs.onConflict();
	}

	Future<Void> setExclusive( Value v ) {
		lastValue = v;
		lastCSValue = BinaryWriter::toValue( MovableValue( v, MovableValue::Active ), IncludeVersion() );
		return cs.setExclusive( lastCSValue.get() );
	}

	ACTOR static Future<Void> move( MovableCoordinatedStateImpl* self, ClusterConnectionString nc ) {
		// Call only after setExclusive returns.  Attempts to move the coordinated state
		// permanently to the new ServerCoordinators, which must be uninitialized.  Returns when
		// the process has reached the point where a leader elected by the new coordinators should
		// be doing the rest of the work (and therefore the caller should die).
		state CoordinatedState cs( self->coordinators );
		state CoordinatedState nccs( ServerCoordinators( Reference<ClusterConnectionFile>( new ClusterConnectionFile(nc) ) ) );
		state Future<Void> creationTimeout = delay(30);
		ASSERT( self->lastValue.present() && self->lastCSValue.present() );
		TraceEvent("StartMove").detail("ConnectionString", nc.toString() );
		choose {
			when (wait(creationTimeout)) { throw new_coordinators_timed_out(); }
			when (Value ncInitialValue = wait( nccs.read() )) {
				ASSERT( !ncInitialValue.size() );  // The new coordinators must be uninitialized!
			}
		}
		TraceEvent("FinishedRead").detail("ConnectionString", nc.toString() );

		choose {
			when (wait(creationTimeout)) { throw new_coordinators_timed_out(); }
			when ( wait( nccs.setExclusive( BinaryWriter::toValue( MovableValue( self->lastValue.get(), MovableValue::MovingFrom, self->coordinators.ccf->getConnectionString().toString() ), IncludeVersion() ) ) ) ) {}
		}

		if (BUGGIFY) wait(delay(5));

		Value oldQuorumState = wait( cs.read() );
		if ( oldQuorumState != self->lastCSValue.get() ) {
			TEST(true);  // Quorum change aborted by concurrent write to old coordination state
			TraceEvent("QuorumChangeAbortedByConcurrency");
			throw coordinated_state_conflict();
		}

		wait( self->moveTo( self, &cs, nc, self->lastValue.get() ) );

		throw coordinators_changed();
	}

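	// Second half of a move: record in the given coordinated state that we may be moving to
	// 'nc' (MaybeTo), then point leader election at the new connection string via
	// changeLeaderCoordinators().  Throws coordinators_changed() so that callers stop using the
	// old coordinators.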
	ACTOR static Future<Void> moveTo( MovableCoordinatedStateImpl* self, CoordinatedState* coordinatedState, ClusterConnectionString nc, Value value ) {
		wait( coordinatedState->setExclusive( BinaryWriter::toValue( MovableValue( value, MovableValue::MaybeTo, nc.toString() ), IncludeVersion() ) ) );

		if (BUGGIFY) wait( delay(5) );

		// SOMEDAY: If we are worried about someone magically getting the new cluster ID and interfering, do a second cs.setExclusive( encode( ReallyTo, ... ) )
		TraceEvent("ChangingQuorum").detail("ConnectionString", nc.toString());
		wait( changeLeaderCoordinators( self->coordinators, StringRef(nc.toString()) ) );
		TraceEvent("ChangedQuorum").detail("ConnectionString", nc.toString());
		throw coordinators_changed();
	}
};

void MovableCoordinatedState::operator=(MovableCoordinatedState&& av) {
	if(impl) {
		delete impl;
	}
	impl = av.impl;
	av.impl = 0;
}
MovableCoordinatedState::MovableCoordinatedState( class ServerCoordinators const& coord ) : impl( new MovableCoordinatedStateImpl(coord) ) {}
MovableCoordinatedState::~MovableCoordinatedState() {
	if(impl) {
		delete impl;
	}
}
Future<Value> MovableCoordinatedState::read() { return MovableCoordinatedStateImpl::read(impl); }
Future<Void> MovableCoordinatedState::onConflict() { return impl->onConflict(); }
Future<Void> MovableCoordinatedState::setExclusive(Value v) { return impl->setExclusive(v); }
Future<Void> MovableCoordinatedState::move( ClusterConnectionString const& nc ) { return MovableCoordinatedStateImpl::move(impl, nc); }