1 /*
2 * CoordinatedState.actor.cpp
3 *
4 * This source file is part of the FoundationDB open source project
5 *
6 * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
7 *
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
11 *
12 * http://www.apache.org/licenses/LICENSE-2.0
13 *
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 */
20
21 #include "fdbserver/CoordinatedState.h"
22 #include "fdbserver/CoordinationInterface.h"
23 #include "fdbserver/Knobs.h"
24 #include "flow/ActorCollection.h"
25 #include "fdbserver/LeaderElection.h"
26 #include "flow/actorcompiler.h" // has to be last include
27
waitAndSendRead(RequestStream<GenerationRegReadRequest> to,GenerationRegReadRequest req)28 ACTOR Future<GenerationRegReadReply> waitAndSendRead( RequestStream<GenerationRegReadRequest> to, GenerationRegReadRequest req ) {
29 if( SERVER_KNOBS->BUGGIFY_ALL_COORDINATION || BUGGIFY )
30 wait( delay( SERVER_KNOBS->BUGGIFIED_EVENTUAL_CONSISTENCY*g_random->random01() ) );
31 state GenerationRegReadReply reply = wait( retryBrokenPromise( to, req ) );
32 if( SERVER_KNOBS->BUGGIFY_ALL_COORDINATION || BUGGIFY )
33 wait( delay( SERVER_KNOBS->BUGGIFIED_EVENTUAL_CONSISTENCY*g_random->random01() ) );
34 return reply;
35 }
36
waitAndSendWrite(RequestStream<GenerationRegWriteRequest> to,GenerationRegWriteRequest req)37 ACTOR Future<UniqueGeneration> waitAndSendWrite(RequestStream<GenerationRegWriteRequest> to, GenerationRegWriteRequest req) {
38 if( SERVER_KNOBS->BUGGIFY_ALL_COORDINATION || BUGGIFY )
39 wait( delay( SERVER_KNOBS->BUGGIFIED_EVENTUAL_CONSISTENCY*g_random->random01() ) );
40 state UniqueGeneration reply = wait( retryBrokenPromise( to, req ) );
41 if( SERVER_KNOBS->BUGGIFY_ALL_COORDINATION || BUGGIFY )
42 wait( delay( SERVER_KNOBS->BUGGIFIED_EVENTUAL_CONSISTENCY*g_random->random01() ) );
43 return reply;
44 }
45
emptyToNever(Future<GenerationRegReadReply> f)46 ACTOR Future<GenerationRegReadReply> emptyToNever( Future<GenerationRegReadReply> f ) {
47 state GenerationRegReadReply r = wait(f);
48 if (r.gen.generation == 0)
49 wait( Future<Void>(Never()) );
50 return r;
51 }
52
nonemptyToNever(Future<GenerationRegReadReply> f)53 ACTOR Future<GenerationRegReadReply> nonemptyToNever( Future<GenerationRegReadReply> f ) {
54 state GenerationRegReadReply r = wait(f);
55 if (r.gen.generation != 0)
56 wait( Future<Void>(Never()) );
57 return r;
58 }
59
60 struct CoordinatedStateImpl {
61 ServerCoordinators coordinators;
62 int stage;
63 UniqueGeneration gen;
64 uint64_t conflictGen;
65 bool doomed;
66 ActorCollection ac; //Errors are not reported
67 bool initial;
68
CoordinatedStateImplCoordinatedStateImpl69 CoordinatedStateImpl( ServerCoordinators const& c ) : coordinators(c), stage(0), conflictGen(0), doomed(false), ac(false), initial(false) {}
getConflictCoordinatedStateImpl70 uint64_t getConflict() { return conflictGen; }
71
isDoomedCoordinatedStateImpl72 bool isDoomed( GenerationRegReadReply const& rep ) {
73 return rep.gen > gen // setExclusive is doomed, because there was a write at least started at a higher generation, which means a read completed at that higher generation
74 // || rep.rgen > gen // setExclusive isn't absolutely doomed, but it may/probably will fail
75 ;
76 }
77
readCoordinatedStateImpl78 ACTOR static Future<Value> read( CoordinatedStateImpl* self ) {
79 ASSERT( self->stage == 0 );
80
81 {
82 self->stage = 1;
83 GenerationRegReadReply rep = wait( self->replicatedRead( self, GenerationRegReadRequest( self->coordinators.clusterKey, UniqueGeneration() ) ) );
84 self->conflictGen = std::max( self->conflictGen, std::max(rep.gen.generation, rep.rgen.generation) ) + 1;
85 self->gen = UniqueGeneration( self->conflictGen, g_random->randomUniqueID() );
86 }
87
88 {
89 self->stage = 2;
90 GenerationRegReadReply rep = wait( self->replicatedRead( self, GenerationRegReadRequest( self->coordinators.clusterKey, self->gen ) ) );
91 self->stage = 3;
92 self->conflictGen = std::max(self->conflictGen, std::max( rep.gen.generation, rep.rgen.generation ));
93 if (self->isDoomed(rep))
94 self->doomed = true;
95 self->initial = rep.gen.generation == 0;
96
97 self->stage = 4;
98 return rep.value.present() ? rep.value.get() : Value();
99 }
100 }
onConflictCoordinatedStateImpl101 ACTOR static Future<Void> onConflict( CoordinatedStateImpl* self ) {
102 ASSERT( self->stage == 4 );
103 if (self->doomed) return Void();
104 loop {
105 wait( delay( SERVER_KNOBS->COORDINATED_STATE_ONCONFLICT_POLL_INTERVAL ) );
106 GenerationRegReadReply rep = wait( self->replicatedRead( self, GenerationRegReadRequest( self->coordinators.clusterKey, UniqueGeneration() ) ) );
107 if (self->stage > 4) break;
108 self->conflictGen = std::max(self->conflictGen, std::max( rep.gen.generation, rep.rgen.generation ));
109 if (self->isDoomed(rep))
110 return Void();
111 }
112 wait( Future<Void>(Never()) );
113 return Void();
114 }
setExclusiveCoordinatedStateImpl115 ACTOR static Future<Void> setExclusive( CoordinatedStateImpl* self, Value v ) {
116 ASSERT( self->stage == 4 );
117 self->stage = 5;
118
119 UniqueGeneration wgen = wait( self->replicatedWrite( self, GenerationRegWriteRequest( KeyValueRef(self->coordinators.clusterKey, v), self->gen ) ) );
120 self->stage = 6;
121
122 TraceEvent("CoordinatedStateSet").detail("Gen", self->gen.generation).detail("Wgen", wgen.generation)
123 .detail("Genu", self->gen.uid).detail("Wgenu", wgen.uid)
124 .detail("Cgen", self->conflictGen);
125
126 if (wgen == self->gen)
127 return Void();
128 else {
129 self->conflictGen = std::max(self->conflictGen, wgen.generation);
130 throw coordinated_state_conflict();
131 }
132 }
133
replicatedReadCoordinatedStateImpl134 ACTOR static Future<GenerationRegReadReply> replicatedRead( CoordinatedStateImpl* self, GenerationRegReadRequest req ) {
135 state std::vector<GenerationRegInterface> &replicas = self->coordinators.stateServers;
136 state vector< Future<GenerationRegReadReply> > rep_empty_reply;
137 state vector< Future<GenerationRegReadReply> > rep_reply;
138 for(int i=0; i<replicas.size(); i++) {
139 Future<GenerationRegReadReply> reply = waitAndSendRead( replicas[i].read, GenerationRegReadRequest(req.key, req.gen) );
140 rep_empty_reply.push_back( nonemptyToNever( reply ) );
141 rep_reply.push_back( emptyToNever( reply ) );
142 self->ac.add( success( reply ) );
143 }
144
145 state Future<Void> majorityEmpty = quorum( rep_empty_reply, (replicas.size()+1)/2 ); //enough empty to ensure we cannot achieve a majority non-empty
146 wait( quorum( rep_reply, replicas.size()/2 + 1 ) || majorityEmpty );
147
148 if( majorityEmpty.isReady() ) {
149 int best = -1;
150 for(int i=0; i<rep_empty_reply.size(); i++)
151 if (rep_empty_reply[i].isReady() && !rep_empty_reply[i].isError()) {
152 if (best < 0 || rep_empty_reply[i].get().rgen > rep_empty_reply[best].get().rgen )
153 best = i;
154 }
155 ASSERT( best >= 0 );
156 auto result = rep_empty_reply[best].get();
157 return result;
158 } else {
159 int best = -1;
160 for(int i=0; i<rep_reply.size(); i++)
161 if (rep_reply[i].isReady() && !rep_reply[i].isError()) {
162 if (best < 0 ||
163 rep_reply[i].get().gen > rep_reply[best].get().gen ||
164 ( rep_reply[i].get().gen == rep_reply[best].get().gen && rep_reply[i].get().rgen > rep_reply[best].get().rgen ) )
165 best = i;
166 }
167 ASSERT( best >= 0 );
168 auto result = rep_reply[best].get();
169 return result;
170 }
171 }
172
replicatedWriteCoordinatedStateImpl173 ACTOR static Future<UniqueGeneration> replicatedWrite( CoordinatedStateImpl* self, GenerationRegWriteRequest req ) {
174 state std::vector<GenerationRegInterface> &replicas = self->coordinators.stateServers;
175 state vector< Future<UniqueGeneration> > wrep_reply;
176 for(int i=0; i<replicas.size(); i++) {
177 Future<UniqueGeneration> reply = waitAndSendWrite( replicas[i].write, GenerationRegWriteRequest( req.kv, req.gen ) );
178 wrep_reply.push_back( reply );
179 self->ac.add( success( reply ) );
180 }
181
182 wait( quorum( wrep_reply, self->initial ? replicas.size() : replicas.size()/2 + 1 ) );
183
184 UniqueGeneration maxGen;
185 for(int i=0; i<wrep_reply.size(); i++)
186 if (wrep_reply[i].isReady())
187 maxGen = std::max(maxGen, wrep_reply[i].get());
188 return maxGen;
189 }
190 };
191
CoordinatedState(ServerCoordinators const & coord)192 CoordinatedState::CoordinatedState( ServerCoordinators const& coord ) : impl( new CoordinatedStateImpl(coord) ) { }
~CoordinatedState()193 CoordinatedState::~CoordinatedState() { delete impl; }
read()194 Future<Value> CoordinatedState::read() { return CoordinatedStateImpl::read(impl); }
onConflict()195 Future<Void> CoordinatedState::onConflict() { return CoordinatedStateImpl::onConflict(impl); }
setExclusive(Value v)196 Future<Void> CoordinatedState::setExclusive(Value v) { return CoordinatedStateImpl::setExclusive(impl,v); }
getConflict()197 uint64_t CoordinatedState::getConflict() { return impl->getConflict(); }
198
199 struct MovableValue {
200 enum MoveState {
201 MaybeTo = 1,
202 Active = 2,
203 MovingFrom = 3
204 };
205
206 Value value;
207 int32_t mode;
208 Optional<Value> other; // a cluster connection string
209
MovableValueMovableValue210 MovableValue() : mode( Active ) {}
MovableValueMovableValue211 MovableValue( Value const& v, int mode, Optional<Value> other = Optional<Value>() ) : value( v ), mode( mode ), other( other ) {}
212
213 template <class Ar>
serializeMovableValue214 void serialize(Ar& ar) {
215 ASSERT( ar.protocolVersion() >= 0x0FDB00A2000D0001LL );
216 serializer(ar, value, mode, other);
217 }
218 };
219
220 struct MovableCoordinatedStateImpl {
221 ServerCoordinators coordinators;
222 CoordinatedState cs;
223 Optional<Value> lastValue, // The value passed to setExclusive()
224 lastCSValue; // The value passed to cs.setExclusive()
225
MovableCoordinatedStateImplMovableCoordinatedStateImpl226 MovableCoordinatedStateImpl( ServerCoordinators const& c ) : coordinators(c), cs(c) {}
227
readMovableCoordinatedStateImpl228 ACTOR static Future<Value> read( MovableCoordinatedStateImpl* self ) {
229 state MovableValue moveState;
230 Value rawValue = wait( self->cs.read() );
231 if( rawValue.size() ) {
232 BinaryReader r( rawValue, IncludeVersion() );
233 if (r.protocolVersion() < 0x0FDB00A2000D0001LL) {
234 // Old coordinated state, not a MovableValue
235 moveState.value = rawValue;
236 } else
237 r >> moveState;
238 }
239 // SOMEDAY: If moveState.mode == MovingFrom, read (without locking) old state and assert that it corresponds with our state and is ReallyTo(coordinators)
240 if (moveState.mode == MovableValue::MaybeTo) {
241 TEST(true);
242 ASSERT( moveState.other.present() );
243 wait( self->moveTo( self, &self->cs, ClusterConnectionString( moveState.other.get().toString() ), moveState.value ) );
244 }
245 return moveState.value;
246 }
247
onConflictMovableCoordinatedStateImpl248 Future<Void> onConflict() {
249 return cs.onConflict();
250 }
251
setExclusiveMovableCoordinatedStateImpl252 Future<Void> setExclusive( Value v ) {
253 lastValue=v;
254 lastCSValue=BinaryWriter::toValue( MovableValue( v, MovableValue::Active ), IncludeVersion() );
255 return cs.setExclusive( lastCSValue.get() );
256 }
257
moveMovableCoordinatedStateImpl258 ACTOR static Future<Void> move( MovableCoordinatedStateImpl* self, ClusterConnectionString nc ) {
259 // Call only after setExclusive returns. Attempts to move the coordinated state
260 // permanently to the new ServerCoordinators, which must be uninitialized. Returns when the process has
261 // reached the point where a leader elected by the new coordinators should be doing the rest of the work
262 // (and therefore the caller should die).
263 state CoordinatedState cs( self->coordinators );
264 state CoordinatedState nccs( ServerCoordinators( Reference<ClusterConnectionFile>( new ClusterConnectionFile(nc) ) ) );
265 state Future<Void> creationTimeout = delay(30);
266 ASSERT( self->lastValue.present() && self->lastCSValue.present() );
267 TraceEvent("StartMove").detail("ConnectionString", nc.toString() );
268 choose {
269 when (wait(creationTimeout)) { throw new_coordinators_timed_out(); }
270 when (Value ncInitialValue = wait( nccs.read() )) {
271 ASSERT( !ncInitialValue.size() ); // The new coordinators must be uninitialized!
272 }
273 }
274 TraceEvent("FinishedRead").detail("ConnectionString", nc.toString() );
275
276 choose {
277 when (wait(creationTimeout)) { throw new_coordinators_timed_out(); }
278 when ( wait( nccs.setExclusive( BinaryWriter::toValue( MovableValue( self->lastValue.get(), MovableValue::MovingFrom, self->coordinators.ccf->getConnectionString().toString() ), IncludeVersion() ) ) ) ) {}
279 }
280
281 if (BUGGIFY) wait(delay(5));
282
283 Value oldQuorumState = wait( cs.read() );
284 if ( oldQuorumState != self->lastCSValue.get() ) {
285 TEST(true); // Quorum change aborted by concurrent write to old coordination state
286 TraceEvent("QuorumChangeAbortedByConcurrency");
287 throw coordinated_state_conflict();
288 }
289
290 wait( self->moveTo( self, &cs, nc, self->lastValue.get() ) );
291
292 throw coordinators_changed();
293 }
294
moveToMovableCoordinatedStateImpl295 ACTOR static Future<Void> moveTo( MovableCoordinatedStateImpl* self, CoordinatedState* coordinatedState, ClusterConnectionString nc, Value value ) {
296 wait( coordinatedState->setExclusive( BinaryWriter::toValue( MovableValue( value, MovableValue::MaybeTo, nc.toString() ), IncludeVersion() ) ) );
297
298 if (BUGGIFY) wait( delay(5) );
299
300 // SOMEDAY: If we are worried about someone magically getting the new cluster ID and interfering, do a second cs.setExclusive( encode( ReallyTo, ... ) )
301 TraceEvent("ChangingQuorum").detail("ConnectionString", nc.toString());
302 wait( changeLeaderCoordinators( self->coordinators, StringRef(nc.toString()) ) );
303 TraceEvent("ChangedQuorum").detail("ConnectionString", nc.toString());
304 throw coordinators_changed();
305 }
306 };
307
operator =(MovableCoordinatedState && av)308 void MovableCoordinatedState::operator=(MovableCoordinatedState&& av) {
309 if(impl) {
310 delete impl;
311 }
312 impl = av.impl;
313 av.impl = 0;
314 }
MovableCoordinatedState(class ServerCoordinators const & coord)315 MovableCoordinatedState::MovableCoordinatedState( class ServerCoordinators const& coord ) : impl( new MovableCoordinatedStateImpl(coord) ) {}
~MovableCoordinatedState()316 MovableCoordinatedState::~MovableCoordinatedState() {
317 if(impl) {
318 delete impl;
319 }
320 }
read()321 Future<Value> MovableCoordinatedState::read() { return MovableCoordinatedStateImpl::read(impl); }
onConflict()322 Future<Void> MovableCoordinatedState::onConflict() { return impl->onConflict(); }
setExclusive(Value v)323 Future<Void> MovableCoordinatedState::setExclusive(Value v) { return impl->setExclusive(v); }
move(ClusterConnectionString const & nc)324 Future<Void> MovableCoordinatedState::move( ClusterConnectionString const& nc ) { return MovableCoordinatedStateImpl::move(impl, nc); }
325