1 /* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
2 
#include "icinga/clusterevents.hpp"
#include "icinga/icingaapplication.hpp"
#include "remote/apilistener.hpp"
#include "base/configuration.hpp"
#include "base/defer.hpp"
#include "base/serializer.hpp"
#include "base/exception.hpp"
#include <boost/thread/once.hpp>
#include <thread>
#include <utility>
12 
13 using namespace icinga;
14 
15 std::mutex ClusterEvents::m_Mutex;
16 std::deque<std::function<void ()>> ClusterEvents::m_CheckRequestQueue;
17 bool ClusterEvents::m_CheckSchedulerRunning;
18 int ClusterEvents::m_ChecksExecutedDuringInterval;
19 int ClusterEvents::m_ChecksDroppedDuringInterval;
20 Timer::Ptr ClusterEvents::m_LogTimer;
21 
RemoteCheckThreadProc()22 void ClusterEvents::RemoteCheckThreadProc()
23 {
24 	Utility::SetThreadName("Remote Check Scheduler");
25 
26 	int maxConcurrentChecks = IcingaApplication::GetInstance()->GetMaxConcurrentChecks();
27 
28 	std::unique_lock<std::mutex> lock(m_Mutex);
29 
30 	for(;;) {
31 		if (m_CheckRequestQueue.empty())
32 			break;
33 
34 		lock.unlock();
35 		Checkable::AquirePendingCheckSlot(maxConcurrentChecks);
36 		lock.lock();
37 
38 		auto callback = m_CheckRequestQueue.front();
39 		m_CheckRequestQueue.pop_front();
40 		m_ChecksExecutedDuringInterval++;
41 		lock.unlock();
42 
43 		callback();
44 		Checkable::DecreasePendingChecks();
45 
46 		lock.lock();
47 	}
48 
49 	m_CheckSchedulerRunning = false;
50 }
51 
EnqueueCheck(const MessageOrigin::Ptr & origin,const Dictionary::Ptr & params)52 void ClusterEvents::EnqueueCheck(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params)
53 {
54 	static boost::once_flag once = BOOST_ONCE_INIT;
55 
56 	boost::call_once(once, []() {
57 		m_LogTimer = new Timer();
58 		m_LogTimer->SetInterval(10);
59 		m_LogTimer->OnTimerExpired.connect([](const Timer * const&) { LogRemoteCheckQueueInformation(); });
60 		m_LogTimer->Start();
61 	});
62 
63 	std::unique_lock<std::mutex> lock(m_Mutex);
64 
65 	if (m_CheckRequestQueue.size() >= 25000) {
66 		m_ChecksDroppedDuringInterval++;
67 		return;
68 	}
69 
70 	m_CheckRequestQueue.emplace_back([origin, params]() { ExecuteCheckFromQueue(origin, params); });
71 
72 	if (!m_CheckSchedulerRunning) {
73 		std::thread t(ClusterEvents::RemoteCheckThreadProc);
74 		t.detach();
75 		m_CheckSchedulerRunning = true;
76 	}
77 }
78 
SendEventExecutedCommand(const Dictionary::Ptr & params,long exitStatus,const String & output,double start,double end,const ApiListener::Ptr & listener,const MessageOrigin::Ptr & origin,const Endpoint::Ptr & sourceEndpoint)79 static void SendEventExecutedCommand(const Dictionary::Ptr& params, long exitStatus, const String& output,
80 	double start, double end, const ApiListener::Ptr& listener, const MessageOrigin::Ptr& origin,
81 	const Endpoint::Ptr& sourceEndpoint)
82 {
83 	Dictionary::Ptr executedParams = new Dictionary();
84 	executedParams->Set("execution", params->Get("source"));
85 	executedParams->Set("host", params->Get("host"));
86 
87 	if (params->Contains("service"))
88 		executedParams->Set("service", params->Get("service"));
89 
90 	executedParams->Set("exit", exitStatus);
91 	executedParams->Set("output", output);
92 	executedParams->Set("start", start);
93 	executedParams->Set("end", end);
94 
95 	if (origin->IsLocal()) {
96 		ClusterEvents::ExecutedCommandAPIHandler(origin, executedParams);
97 	} else {
98 		Dictionary::Ptr executedMessage = new Dictionary();
99 		executedMessage->Set("jsonrpc", "2.0");
100 		executedMessage->Set("method", "event::ExecutedCommand");
101 		executedMessage->Set("params", executedParams);
102 
103 		listener->SyncSendMessage(sourceEndpoint, executedMessage);
104 	}
105 }
106 
ExecuteCheckFromQueue(const MessageOrigin::Ptr & origin,const Dictionary::Ptr & params)107 void ClusterEvents::ExecuteCheckFromQueue(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params) {
108 
109 	Endpoint::Ptr sourceEndpoint;
110 
111 	if (origin->FromClient) {
112 		sourceEndpoint = origin->FromClient->GetEndpoint();
113 	} else if (origin->IsLocal()){
114 		sourceEndpoint = Endpoint::GetLocalEndpoint();
115 	}
116 
117 	if (!sourceEndpoint || (origin->FromZone && !Zone::GetLocalZone()->IsChildOf(origin->FromZone))) {
118 		Log(LogNotice, "ClusterEvents")
119 			<< "Discarding 'execute command' message from '" << origin->FromClient->GetIdentity() << "': Invalid endpoint origin (client not allowed).";
120 		return;
121 	}
122 
123 	ApiListener::Ptr listener = ApiListener::GetInstance();
124 
125 	if (!listener) {
126 		Log(LogCritical, "ApiListener") << "No instance available.";
127 		return;
128 	}
129 
130 	Defer resetExecuteCommandProcessFinishedHandler ([]() {
131 		Checkable::ExecuteCommandProcessFinishedHandler = nullptr;
132 	});
133 
134 	if (params->Contains("source")) {
135 		String uuid = params->Get("source");
136 
137 		String checkableName = params->Get("host");
138 
139 		if (params->Contains("service"))
140 			checkableName += "!" + params->Get("service");
141 
142 		/* Check deadline */
143 		double deadline = params->Get("deadline");
144 
145 		if (Utility::GetTime() > deadline) {
146 			Log(LogNotice, "ApiListener")
147 				<< "Discarding 'ExecuteCheckFromQueue' event for checkable '" << checkableName
148 				<< "' from '" << origin->FromClient->GetIdentity() << "': Deadline has expired.";
149 			return;
150 		}
151 
152 		Checkable::ExecuteCommandProcessFinishedHandler = [checkableName, listener, sourceEndpoint, origin, params] (const Value& commandLine, const ProcessResult& pr) {
153 			if (params->Get("command_type") == "check_command") {
154 				Checkable::CurrentConcurrentChecks.fetch_sub(1);
155 				Checkable::DecreasePendingChecks();
156 			}
157 
158 			if (pr.ExitStatus > 3) {
159 				Process::Arguments parguments = Process::PrepareCommand(commandLine);
160 				Log(LogWarning, "ApiListener")
161 					<< "Command for object '" << checkableName << "' (PID: " << pr.PID
162 					<< ", arguments: " << Process::PrettyPrintArguments(parguments) << ") terminated with exit code "
163 					<< pr.ExitStatus << ", output: " << pr.Output;
164 			}
165 
166 			SendEventExecutedCommand(params, pr.ExitStatus, pr.Output, pr.ExecutionStart, pr.ExecutionEnd, listener,
167 									origin, sourceEndpoint);
168 		};
169 	}
170 
171 	if (!listener->GetAcceptCommands() && !origin->IsLocal()) {
172 		Log(LogWarning, "ApiListener")
173 			<< "Ignoring command. '" << listener->GetName() << "' does not accept commands.";
174 
175 		String output = "Endpoint '" + Endpoint::GetLocalEndpoint()->GetName() + "' does not accept commands.";
176 
177 		if (params->Contains("source")) {
178 			double now = Utility::GetTime();
179 			SendEventExecutedCommand(params, 126, output, now, now, listener, origin, sourceEndpoint);
180 		} else {
181 			Host::Ptr host = new Host();
182 			Dictionary::Ptr attrs = new Dictionary();
183 
184 			attrs->Set("__name", params->Get("host"));
185 			attrs->Set("type", "Host");
186 			attrs->Set("enable_active_checks", false);
187 
188 			Deserialize(host, attrs, false, FAConfig);
189 
190 			if (params->Contains("service"))
191 				host->SetExtension("agent_service_name", params->Get("service"));
192 
193 			CheckResult::Ptr cr = new CheckResult();
194 			cr->SetState(ServiceUnknown);
195 			cr->SetOutput(output);
196 
197 			Dictionary::Ptr message = MakeCheckResultMessage(host, cr);
198 			listener->SyncSendMessage(sourceEndpoint, message);
199 		}
200 
201 		return;
202 	}
203 
204 	/* use a virtual host object for executing the command */
205 	Host::Ptr host = new Host();
206 	Dictionary::Ptr attrs = new Dictionary();
207 
208 	attrs->Set("__name", params->Get("host"));
209 	attrs->Set("type", "Host");
210 
211 	/*
212 	 * Override the check timeout if the parent caller provided the value. Compatible with older versions not
213 	 * passing this inside the cluster message.
214 	 * This happens with host/service command_endpoint agents and the 'check_timeout' attribute being specified.
215 	 */
216 	if (params->Contains("check_timeout"))
217 		attrs->Set("check_timeout", params->Get("check_timeout"));
218 
219 	Deserialize(host, attrs, false, FAConfig);
220 
221 	if (params->Contains("service"))
222 		host->SetExtension("agent_service_name", params->Get("service"));
223 
224 	String command = params->Get("command");
225 	String command_type = params->Get("command_type");
226 
227 	if (command_type == "check_command") {
228 		if (!CheckCommand::GetByName(command)) {
229 			ServiceState state = ServiceUnknown;
230 			String output = "Check command '" + command + "' does not exist.";
231 			double now = Utility::GetTime();
232 
233 			if (params->Contains("source")) {
234 				SendEventExecutedCommand(params, state, output, now, now, listener, origin, sourceEndpoint);
235 			} else {
236 				CheckResult::Ptr cr = new CheckResult();
237 				cr->SetState(state);
238 				cr->SetOutput(output);
239 				Dictionary::Ptr message = MakeCheckResultMessage(host, cr);
240 				listener->SyncSendMessage(sourceEndpoint, message);
241 			}
242 
243 			return;
244 		}
245 	} else if (command_type == "event_command") {
246 		if (!EventCommand::GetByName(command)) {
247 			String output = "Event command '" + command + "' does not exist.";
248 			Log(LogWarning, "ClusterEvents") << output;
249 
250 			if (params->Contains("source")) {
251 				double now = Utility::GetTime();
252 				SendEventExecutedCommand(params, ServiceUnknown, output, now, now, listener, origin, sourceEndpoint);
253 			}
254 
255 			return;
256 		}
257 	} else if (command_type == "notification_command") {
258 		if (!NotificationCommand::GetByName(command)) {
259 			String output = "Notification command '" + command + "' does not exist.";
260 			Log(LogWarning, "ClusterEvents") << output;
261 
262 			if (params->Contains("source")) {
263 				double now = Utility::GetTime();
264 				SendEventExecutedCommand(params, ServiceUnknown, output, now, now, listener, origin, sourceEndpoint);
265 			}
266 
267 			return;
268 		}
269 	}
270 
271 	attrs->Set(command_type, params->Get("command"));
272 	attrs->Set("command_endpoint", sourceEndpoint->GetName());
273 
274 	Deserialize(host, attrs, false, FAConfig);
275 
276 	host->SetExtension("agent_check", true);
277 
278 	Dictionary::Ptr macros = params->Get("macros");
279 
280 	if (command_type == "check_command") {
281 		try {
282 			host->ExecuteRemoteCheck(macros);
283 		} catch (const std::exception& ex) {
284 			String output = "Exception occurred while checking '" + host->GetName() + "': " + DiagnosticInformation(ex);
285 			ServiceState state = ServiceUnknown;
286 			double now = Utility::GetTime();
287 
288 			if (params->Contains("source")) {
289 				SendEventExecutedCommand(params, state, output, now, now, listener, origin, sourceEndpoint);
290 			} else {
291 				CheckResult::Ptr cr = new CheckResult();
292 				cr->SetState(state);
293 				cr->SetOutput(output);
294 				cr->SetScheduleStart(now);
295 				cr->SetScheduleEnd(now);
296 				cr->SetExecutionStart(now);
297 				cr->SetExecutionEnd(now);
298 
299 				Dictionary::Ptr message = MakeCheckResultMessage(host, cr);
300 				listener->SyncSendMessage(sourceEndpoint, message);
301 			}
302 
303 			Log(LogCritical, "checker", output);
304 		}
305 	} else if (command_type == "event_command") {
306 		try {
307 			host->ExecuteEventHandler(macros, true);
308 		} catch (const std::exception& ex) {
309 			if (params->Contains("source")) {
310 				String output = "Exception occurred while executing event command '" + command + "' for '" +
311 					host->GetName() + "': " + DiagnosticInformation(ex);
312 
313 				double now = Utility::GetTime();
314 				SendEventExecutedCommand(params, ServiceUnknown, output, now, now, listener, origin, sourceEndpoint);
315 			} else {
316 				throw;
317 			}
318 		}
319 	} else if (command_type == "notification_command" && params->Contains("source")) {
320 		/* Get user */
321 		User::Ptr user = new User();
322 		Dictionary::Ptr attrs = new Dictionary();
323 		attrs->Set("__name", params->Get("user"));
324 		attrs->Set("type", User::GetTypeName());
325 
326 		Deserialize(user, attrs, false, FAConfig);
327 
328 		/* Get notification */
329 		Notification::Ptr notification = new Notification();
330 		attrs->Clear();
331 		attrs->Set("__name", params->Get("notification"));
332 		attrs->Set("type", Notification::GetTypeName());
333 		attrs->Set("command", command);
334 
335 		Deserialize(notification, attrs, false, FAConfig);
336 
337 		try {
338 			CheckResult::Ptr cr = new CheckResult();
339 			String author = macros->Get("notification_author");
340 			NotificationCommand::Ptr notificationCommand = NotificationCommand::GetByName(command);
341 
342 			notificationCommand->Execute(notification, user, cr, NotificationType::NotificationCustom,
343 				author, "");
344 		} catch (const std::exception& ex) {
345 			String output = "Exception occurred during notification '" + notification->GetName()
346 				+ "' for checkable '" + notification->GetCheckable()->GetName()
347 				+ "' and user '" + user->GetName() + "' using command '" + command + "': "
348 				+ DiagnosticInformation(ex, false);
349 			double now = Utility::GetTime();
350 			SendEventExecutedCommand(params, ServiceUnknown, output, now, now, listener, origin, sourceEndpoint);
351 		}
352 	}
353 }
354 
GetCheckRequestQueueSize()355 int ClusterEvents::GetCheckRequestQueueSize()
356 {
357 	return m_CheckRequestQueue.size();
358 }
359 
LogRemoteCheckQueueInformation()360 void ClusterEvents::LogRemoteCheckQueueInformation() {
361 	if (m_ChecksDroppedDuringInterval > 0) {
362 		Log(LogCritical, "ClusterEvents")
363 			<< "Remote check queue ran out of slots. "
364 			<< m_ChecksDroppedDuringInterval << " checks dropped.";
365 		m_ChecksDroppedDuringInterval = 0;
366 	}
367 
368 	if (m_ChecksExecutedDuringInterval == 0)
369 		return;
370 
371 	Log(LogInformation, "RemoteCheckQueue")
372 		<< "items: " << m_CheckRequestQueue.size()
373 		<< ", rate: " << m_ChecksExecutedDuringInterval / 10 << "/s "
374 		<< "(" << m_ChecksExecutedDuringInterval * 6 << "/min "
375 		<< m_ChecksExecutedDuringInterval * 6 * 5 << "/5min "
376 		<< m_ChecksExecutedDuringInterval * 6 * 15 << "/15min" << ");";
377 
378 	m_ChecksExecutedDuringInterval = 0;
379 }
380