1 /*
2 * FailureMonitorClient.actor.cpp
3 *
4 * This source file is part of the FoundationDB open source project
5 *
6 * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
7 *
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
11 *
12 * http://www.apache.org/licenses/LICENSE-2.0
13 *
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 */
20
#include "fdbclient/FailureMonitorClient.h"
#include "fdbrpc/FailureMonitor.h"
#include "fdbclient/ClusterInterface.h"
#include <set>
#include <unordered_set>
#include "flow/actorcompiler.h" // has to be last include
26
27 struct FailureMonitorClientState : ReferenceCounted<FailureMonitorClientState> {
28 std::unordered_set<NetworkAddress> knownAddrs;
29 double serverFailedTimeout;
30
FailureMonitorClientStateFailureMonitorClientState31 FailureMonitorClientState() {
32 serverFailedTimeout = CLIENT_KNOBS->FAILURE_TIMEOUT_DELAY;
33 }
34 };
35
failureMonitorClientLoop(SimpleFailureMonitor * monitor,ClusterInterface controller,Reference<FailureMonitorClientState> fmState,bool trackMyStatus)36 ACTOR Future<Void> failureMonitorClientLoop(
37 SimpleFailureMonitor* monitor,
38 ClusterInterface controller,
39 Reference<FailureMonitorClientState> fmState,
40 bool trackMyStatus)
41 {
42 state Version version = 0;
43 state Future<FailureMonitoringReply> request = Never();
44 state Future<Void> nextRequest = delay(0, TaskFailureMonitor);
45 state Future<Void> requestTimeout = Never();
46 state double before = now();
47 state double waitfor = 0;
48
49 state NetworkAddressList controlAddr = controller.failureMonitoring.getEndpoint().addresses;
50 monitor->setStatus(controlAddr.address, FailureStatus(false));
51 fmState->knownAddrs.insert(controlAddr.address);
52 if(controlAddr.secondaryAddress.present()) {
53 monitor->setStatus(controlAddr.secondaryAddress.get(), FailureStatus(false));
54 fmState->knownAddrs.insert(controlAddr.secondaryAddress.get());
55 }
56
57 //The cluster controller's addresses (controller.failureMonitoring.getEndpoint().addresses) are treated specially because we can declare that it is down independently
58 //of the response from the cluster controller. It still needs to be in knownAddrs in case the cluster controller changes, so the next cluster controller resets its state
59
60 try {
61 loop {
62 choose {
63 when( FailureMonitoringReply reply = wait( request ) ) {
64 g_network->setCurrentTask(TaskDefaultDelay);
65 request = Never();
66 requestTimeout = Never();
67 if (reply.allOthersFailed) {
68 // Reset all systems *not* mentioned in the reply to the default (failed) state
69 fmState->knownAddrs.erase( controller.failureMonitoring.getEndpoint().addresses.address );
70 if(controller.failureMonitoring.getEndpoint().addresses.secondaryAddress.present()) {
71 fmState->knownAddrs.erase( controller.failureMonitoring.getEndpoint().addresses.secondaryAddress.get() );
72 }
73
74 std::set<NetworkAddress> changedAddresses;
75 for(int c=0; c<reply.changes.size(); c++) {
76 changedAddresses.insert( reply.changes[c].addresses.address );
77 if(reply.changes[c].addresses.secondaryAddress.present()) {
78 changedAddresses.insert( reply.changes[c].addresses.secondaryAddress.get() );
79 }
80 }
81 for(auto& it : fmState->knownAddrs)
82 if (!changedAddresses.count( it ))
83 monitor->setStatus( it, FailureStatus() );
84 fmState->knownAddrs.clear();
85 } else {
86 ASSERT( version != 0 );
87 }
88
89 if( monitor->getState( controller.failureMonitoring.getEndpoint() ).isFailed() )
90 TraceEvent("FailureMonitoringServerUp").detail("OldServer",controller.id());
91
92 monitor->setStatus(controlAddr.address, FailureStatus(false));
93 fmState->knownAddrs.insert(controlAddr.address);
94 if(controlAddr.secondaryAddress.present()) {
95 monitor->setStatus(controlAddr.secondaryAddress.get(), FailureStatus(false));
96 fmState->knownAddrs.insert(controlAddr.secondaryAddress.get());
97 }
98
99 //if (version != reply.failureInformationVersion)
100 // printf("Client '%s': update from %lld to %lld (%d changes, aof=%d)\n", g_network->getLocalAddress().toString().c_str(), version, reply.failureInformationVersion, reply.changes.size(), reply.allOthersFailed);
101
102 version = reply.failureInformationVersion;
103 fmState->serverFailedTimeout = reply.considerServerFailedTimeoutMS * .001;
104 for(int c=0; c<reply.changes.size(); c++) {
105 //printf("Client '%s': status of '%s' is now '%s'\n", g_network->getLocalAddress().toString().c_str(), reply.changes[c].address.toString().c_str(), reply.changes[c].status.failed ? "Failed" : "OK");
106 auto& addrList = reply.changes[c].addresses;
107 monitor->setStatus( addrList.address, reply.changes[c].status );
108 if(addrList.secondaryAddress.present()) {
109 monitor->setStatus( addrList.secondaryAddress.get(), reply.changes[c].status );
110 }
111 if (reply.changes[c].status != FailureStatus()) {
112 fmState->knownAddrs.insert( addrList.address );
113 if(addrList.secondaryAddress.present()) {
114 fmState->knownAddrs.insert( addrList.secondaryAddress.get() );
115 }
116 } else {
117 fmState->knownAddrs.erase( addrList.address );
118 if(addrList.secondaryAddress.present()) {
119 fmState->knownAddrs.erase( addrList.secondaryAddress.get() );
120 }
121 }
122 }
123 before = now();
124 waitfor = reply.clientRequestIntervalMS * .001;
125 nextRequest = delayJittered( waitfor, TaskFailureMonitor );
126 }
127 when( wait( requestTimeout ) ) {
128 g_network->setCurrentTask(TaskDefaultDelay);
129 requestTimeout = Never();
130 TraceEvent(SevWarn, "FailureMonitoringServerDown").detail("OldServerID",controller.id());
131 monitor->setStatus(controlAddr.address, FailureStatus(true));
132 fmState->knownAddrs.erase(controlAddr.address);
133 if(controlAddr.secondaryAddress.present()) {
134 monitor->setStatus(controlAddr.secondaryAddress.get(), FailureStatus(true));
135 fmState->knownAddrs.erase(controlAddr.secondaryAddress.get());
136 }
137 }
138 when( wait( nextRequest ) ) {
139 g_network->setCurrentTask(TaskDefaultDelay);
140 nextRequest = Never();
141
142 double elapsed = now() - before;
143 double slowThreshold = .200 + waitfor + FLOW_KNOBS->MAX_BUGGIFIED_DELAY;
144 double warnAlwaysThreshold = CLIENT_KNOBS->FAILURE_MIN_DELAY/2;
145
146 if (elapsed > slowThreshold && g_random->random01() < elapsed / warnAlwaysThreshold) {
147 TraceEvent(elapsed > warnAlwaysThreshold ? SevWarnAlways : SevWarn, "FailureMonitorClientSlow").detail("Elapsed", elapsed).detail("Expected", waitfor);
148 }
149
150 FailureMonitoringRequest req;
151 req.failureInformationVersion = version;
152 req.addresses = g_network->getLocalAddresses();
153 if (trackMyStatus)
154 req.senderStatus = FailureStatus(false);
155 request = controller.failureMonitoring.getReply( req, TaskFailureMonitor );
156 if(!controller.failureMonitoring.getEndpoint().isLocal())
157 requestTimeout = delay( fmState->serverFailedTimeout, TaskFailureMonitor );
158 }
159 }
160 }
161 } catch (Error& e) {
162 if (e.code() == error_code_broken_promise) // broken promise from clustercontroller means it has died (and hopefully will be replaced)
163 return Void();
164 TraceEvent(SevError, "FailureMonitorClientError").error(e);
165 throw; // goes nowhere
166 }
167 }
168
failureMonitorClient(Reference<AsyncVar<Optional<struct ClusterInterface>>> ci,bool trackMyStatus)169 ACTOR Future<Void> failureMonitorClient( Reference<AsyncVar<Optional<struct ClusterInterface>>> ci, bool trackMyStatus ) {
170 state SimpleFailureMonitor* monitor = static_cast<SimpleFailureMonitor*>( &IFailureMonitor::failureMonitor() );
171 state Reference<FailureMonitorClientState> fmState = Reference<FailureMonitorClientState>(new FailureMonitorClientState());
172 auto localAddr = g_network->getLocalAddresses();
173 monitor->setStatus(localAddr.address, FailureStatus(false));
174 if(localAddr.secondaryAddress.present()) {
175 monitor->setStatus(localAddr.secondaryAddress.get(), FailureStatus(false));
176 }
177 loop {
178 state Future<Void> client = ci->get().present() ? failureMonitorClientLoop(monitor, ci->get().get(), fmState, trackMyStatus) : Void();
179 wait( ci->onChange() );
180 }
181 }
182