1 /*
2  * FailureMonitorClient.actor.cpp
3  *
4  * This source file is part of the FoundationDB open source project
5  *
6  * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *     http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  */
20 
21 #include "fdbclient/FailureMonitorClient.h"
22 #include "fdbrpc/FailureMonitor.h"
23 #include "fdbclient/ClusterInterface.h"
24 #include "flow/actorcompiler.h" // has to be last include
25 #include <unordered_set>
26 
27 struct FailureMonitorClientState : ReferenceCounted<FailureMonitorClientState> {
28 	std::unordered_set<NetworkAddress> knownAddrs;
29 	double serverFailedTimeout;
30 
FailureMonitorClientStateFailureMonitorClientState31 	FailureMonitorClientState() {
32 		serverFailedTimeout = CLIENT_KNOBS->FAILURE_TIMEOUT_DELAY;
33 	}
34 };
35 
// Polls a single cluster controller for failure-status updates and applies them to `monitor`.
//
// The actor multiplexes three events:
//   * nextRequest    -> send a FailureMonitoringRequest carrying the last version received;
//   * request        -> apply the reply's status changes, then schedule the next request after
//                       the interval the reply asks for (clientRequestIntervalMS);
//   * requestTimeout -> no reply within fmState->serverFailedTimeout, so mark the controller's
//                       own addresses as failed.
//
// monitor:       failure monitor whose per-address status this actor updates.
// controller:    the cluster controller to poll.
// fmState:       state shared by successive invocations of this actor (one per controller), so
//                a later controller's allOthersFailed reply can reset addresses marked earlier.
// trackMyStatus: when true, each request reports this process as available (senderStatus).
//
// Returns Void() if the controller's stream reports broken_promise; any other error is traced
// at SevError and rethrown.
ACTOR Future<Void> failureMonitorClientLoop(
	SimpleFailureMonitor* monitor,
	ClusterInterface controller,
	Reference<FailureMonitorClientState> fmState,
	bool trackMyStatus)
{
	// Last failureInformationVersion received from the controller; 0 until the first reply.
	state Version version = 0;
	// Outstanding request to the controller (Never() when none is in flight).
	state Future<FailureMonitoringReply> request = Never();
	// Fires when the next request should be sent; the first one is sent after a zero delay.
	state Future<Void> nextRequest = delay(0, TaskFailureMonitor);
	// Armed while a request is outstanding, but only when the controller's endpoint is not local.
	state Future<Void> requestTimeout = Never();
	// Time the last reply was processed (initially the actor's start time) and the interval, in
	// seconds, that reply asked us to wait; used below to detect when requests are sent late.
	state double before = now();
	state double waitfor = 0;

	// Treat the controller as available until a request to it times out.
	state NetworkAddressList controlAddr = controller.failureMonitoring.getEndpoint().addresses;
	monitor->setStatus(controlAddr.address, FailureStatus(false));
	fmState->knownAddrs.insert(controlAddr.address);
	if(controlAddr.secondaryAddress.present()) {
		monitor->setStatus(controlAddr.secondaryAddress.get(), FailureStatus(false));
		fmState->knownAddrs.insert(controlAddr.secondaryAddress.get());
	}

	// The cluster controller's addresses (controller.failureMonitoring.getEndpoint().addresses) are treated
	// specially because we can declare them down independently of any response from the cluster controller.
	// They still need to be in knownAddrs in case the cluster controller changes, so that the next cluster
	// controller resets their state.

	try {
		loop {
			choose {
				when( FailureMonitoringReply reply = wait( request ) ) {
					g_network->setCurrentTask(TaskDefaultDelay);
					// A reply arrived: nothing is in flight and the timeout for it no longer applies.
					request = Never();
					requestTimeout = Never();
					if (reply.allOthersFailed) {
						// Reset all systems *not* mentioned in the reply to the default (failed) state.
						// The controller's own addresses are removed from knownAddrs first so that they
						// are not reset here; they are explicitly marked available further below.
						fmState->knownAddrs.erase( controller.failureMonitoring.getEndpoint().addresses.address );
						if(controller.failureMonitoring.getEndpoint().addresses.secondaryAddress.present()) {
							fmState->knownAddrs.erase( controller.failureMonitoring.getEndpoint().addresses.secondaryAddress.get() );
						}

						std::set<NetworkAddress> changedAddresses;
						for(int c=0; c<reply.changes.size(); c++) {
							changedAddresses.insert( reply.changes[c].addresses.address );
							if(reply.changes[c].addresses.secondaryAddress.present()) {
								changedAddresses.insert( reply.changes[c].addresses.secondaryAddress.get() );
							}
						}
						for(auto& it : fmState->knownAddrs)
							if (!changedAddresses.count( it ))
								monitor->setStatus( it, FailureStatus() );
						// Rebuilt below from the controller addresses and the reply's changes.
						fmState->knownAddrs.clear();
					} else {
						// An incremental reply is only expected after a version has been received.
						ASSERT( version != 0 );
					}

					// If the controller is currently marked failed (e.g. by an earlier timeout),
					// this reply shows it is responding again.
					if( monitor->getState( controller.failureMonitoring.getEndpoint() ).isFailed() )
						TraceEvent("FailureMonitoringServerUp").detail("OldServer",controller.id());

					monitor->setStatus(controlAddr.address, FailureStatus(false));
					fmState->knownAddrs.insert(controlAddr.address);
					if(controlAddr.secondaryAddress.present()) {
						monitor->setStatus(controlAddr.secondaryAddress.get(), FailureStatus(false));
						fmState->knownAddrs.insert(controlAddr.secondaryAddress.get());
					}

					//if (version != reply.failureInformationVersion)
					//	printf("Client '%s': update from %lld to %lld (%d changes, aof=%d)\n", g_network->getLocalAddress().toString().c_str(), version, reply.failureInformationVersion, reply.changes.size(), reply.allOthersFailed);

					version = reply.failureInformationVersion;
					// The reply carries milliseconds; the timeout is kept in seconds.
					fmState->serverFailedTimeout = reply.considerServerFailedTimeoutMS * .001;
					// Apply each change to both addresses of the process. knownAddrs keeps the addresses
					// whose status differs from the default FailureStatus(), so a later allOthersFailed
					// reply (possibly from a different controller) knows which ones to reset.
					for(int c=0; c<reply.changes.size(); c++) {
						//printf("Client '%s': status of '%s' is now '%s'\n", g_network->getLocalAddress().toString().c_str(), reply.changes[c].address.toString().c_str(), reply.changes[c].status.failed ? "Failed" : "OK");
						auto& addrList = reply.changes[c].addresses;
						monitor->setStatus( addrList.address, reply.changes[c].status );
						if(addrList.secondaryAddress.present()) {
							monitor->setStatus( addrList.secondaryAddress.get(), reply.changes[c].status );
						}
						if (reply.changes[c].status != FailureStatus()) {
							fmState->knownAddrs.insert( addrList.address );
							if(addrList.secondaryAddress.present()) {
								fmState->knownAddrs.insert( addrList.secondaryAddress.get() );
							}
						} else {
							fmState->knownAddrs.erase( addrList.address );
							if(addrList.secondaryAddress.present()) {
								fmState->knownAddrs.erase( addrList.secondaryAddress.get() );
							}
						}
					}
					// Schedule the next poll after the (jittered) interval the controller asked for.
					before = now();
					waitfor = reply.clientRequestIntervalMS * .001;
					nextRequest = delayJittered( waitfor, TaskFailureMonitor );
				}
				when( wait( requestTimeout ) ) {
					g_network->setCurrentTask(TaskDefaultDelay);
					// No reply within serverFailedTimeout: mark the controller failed. `request` is
					// deliberately left outstanding, so a late reply is still handled by the branch above
					// (which marks the controller available again and schedules the next request).
					requestTimeout = Never();
					TraceEvent(SevWarn, "FailureMonitoringServerDown").detail("OldServerID",controller.id());
					monitor->setStatus(controlAddr.address, FailureStatus(true));
					fmState->knownAddrs.erase(controlAddr.address);
					if(controlAddr.secondaryAddress.present()) {
						monitor->setStatus(controlAddr.secondaryAddress.get(), FailureStatus(true));
						fmState->knownAddrs.erase(controlAddr.secondaryAddress.get());
					}
				}
				when( wait( nextRequest ) ) {
					g_network->setCurrentTask(TaskDefaultDelay);
					nextRequest = Never();

					// Warn when this request is being sent later than scheduled. The chance of logging grows
					// with elapsed / warnAlwaysThreshold, so once elapsed exceeds warnAlwaysThreshold (as well
					// as slowThreshold) the event is always logged, at SevWarnAlways.
					double elapsed = now() - before;
					double slowThreshold = .200 + waitfor + FLOW_KNOBS->MAX_BUGGIFIED_DELAY;
					double warnAlwaysThreshold = CLIENT_KNOBS->FAILURE_MIN_DELAY/2;

					if (elapsed > slowThreshold && g_random->random01() < elapsed / warnAlwaysThreshold) {
						TraceEvent(elapsed > warnAlwaysThreshold ? SevWarnAlways : SevWarn, "FailureMonitorClientSlow").detail("Elapsed", elapsed).detail("Expected", waitfor);
					}

					FailureMonitoringRequest req;
					req.failureInformationVersion = version;
					req.addresses = g_network->getLocalAddresses();
					if (trackMyStatus)
						req.senderStatus = FailureStatus(false);
					request = controller.failureMonitoring.getReply( req, TaskFailureMonitor );
					// No timeout is armed when the controller's endpoint is local.
					if(!controller.failureMonitoring.getEndpoint().isLocal())
						requestTimeout = delay( fmState->serverFailedTimeout, TaskFailureMonitor );
				}
			}
		}
	} catch (Error& e) {
		if (e.code() == error_code_broken_promise)  // broken promise from clustercontroller means it has died (and hopefully will be replaced)
			return Void();
		TraceEvent(SevError, "FailureMonitorClientError").error(e);
		throw;  // goes nowhere
	}
}
168 
failureMonitorClient(Reference<AsyncVar<Optional<struct ClusterInterface>>> ci,bool trackMyStatus)169 ACTOR Future<Void> failureMonitorClient( Reference<AsyncVar<Optional<struct ClusterInterface>>> ci, bool trackMyStatus ) {
170 	state SimpleFailureMonitor* monitor = static_cast<SimpleFailureMonitor*>( &IFailureMonitor::failureMonitor() );
171 	state Reference<FailureMonitorClientState> fmState = Reference<FailureMonitorClientState>(new FailureMonitorClientState());
172 	auto localAddr = g_network->getLocalAddresses();
173 	monitor->setStatus(localAddr.address, FailureStatus(false));
174 	if(localAddr.secondaryAddress.present()) {
175 		monitor->setStatus(localAddr.secondaryAddress.get(), FailureStatus(false));
176 	}
177 	loop {
178 		state Future<Void> client = ci->get().present() ? failureMonitorClientLoop(monitor, ci->get().get(), fmState, trackMyStatus) : Void();
179 		wait( ci->onChange() );
180 	}
181 }
182