1 /* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
2
3 #include "icinga/clusterevents.hpp"
4 #include "icinga/icingaapplication.hpp"
5 #include "remote/apilistener.hpp"
6 #include "base/configuration.hpp"
7 #include "base/defer.hpp"
8 #include "base/serializer.hpp"
9 #include "base/exception.hpp"
10 #include <boost/thread/once.hpp>
11 #include <thread>
12
13 using namespace icinga;
14
15 std::mutex ClusterEvents::m_Mutex;
16 std::deque<std::function<void ()>> ClusterEvents::m_CheckRequestQueue;
17 bool ClusterEvents::m_CheckSchedulerRunning;
18 int ClusterEvents::m_ChecksExecutedDuringInterval;
19 int ClusterEvents::m_ChecksDroppedDuringInterval;
20 Timer::Ptr ClusterEvents::m_LogTimer;
21
RemoteCheckThreadProc()22 void ClusterEvents::RemoteCheckThreadProc()
23 {
24 Utility::SetThreadName("Remote Check Scheduler");
25
26 int maxConcurrentChecks = IcingaApplication::GetInstance()->GetMaxConcurrentChecks();
27
28 std::unique_lock<std::mutex> lock(m_Mutex);
29
30 for(;;) {
31 if (m_CheckRequestQueue.empty())
32 break;
33
34 lock.unlock();
35 Checkable::AquirePendingCheckSlot(maxConcurrentChecks);
36 lock.lock();
37
38 auto callback = m_CheckRequestQueue.front();
39 m_CheckRequestQueue.pop_front();
40 m_ChecksExecutedDuringInterval++;
41 lock.unlock();
42
43 callback();
44 Checkable::DecreasePendingChecks();
45
46 lock.lock();
47 }
48
49 m_CheckSchedulerRunning = false;
50 }
51
EnqueueCheck(const MessageOrigin::Ptr & origin,const Dictionary::Ptr & params)52 void ClusterEvents::EnqueueCheck(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params)
53 {
54 static boost::once_flag once = BOOST_ONCE_INIT;
55
56 boost::call_once(once, []() {
57 m_LogTimer = new Timer();
58 m_LogTimer->SetInterval(10);
59 m_LogTimer->OnTimerExpired.connect([](const Timer * const&) { LogRemoteCheckQueueInformation(); });
60 m_LogTimer->Start();
61 });
62
63 std::unique_lock<std::mutex> lock(m_Mutex);
64
65 if (m_CheckRequestQueue.size() >= 25000) {
66 m_ChecksDroppedDuringInterval++;
67 return;
68 }
69
70 m_CheckRequestQueue.emplace_back([origin, params]() { ExecuteCheckFromQueue(origin, params); });
71
72 if (!m_CheckSchedulerRunning) {
73 std::thread t(ClusterEvents::RemoteCheckThreadProc);
74 t.detach();
75 m_CheckSchedulerRunning = true;
76 }
77 }
78
SendEventExecutedCommand(const Dictionary::Ptr & params,long exitStatus,const String & output,double start,double end,const ApiListener::Ptr & listener,const MessageOrigin::Ptr & origin,const Endpoint::Ptr & sourceEndpoint)79 static void SendEventExecutedCommand(const Dictionary::Ptr& params, long exitStatus, const String& output,
80 double start, double end, const ApiListener::Ptr& listener, const MessageOrigin::Ptr& origin,
81 const Endpoint::Ptr& sourceEndpoint)
82 {
83 Dictionary::Ptr executedParams = new Dictionary();
84 executedParams->Set("execution", params->Get("source"));
85 executedParams->Set("host", params->Get("host"));
86
87 if (params->Contains("service"))
88 executedParams->Set("service", params->Get("service"));
89
90 executedParams->Set("exit", exitStatus);
91 executedParams->Set("output", output);
92 executedParams->Set("start", start);
93 executedParams->Set("end", end);
94
95 if (origin->IsLocal()) {
96 ClusterEvents::ExecutedCommandAPIHandler(origin, executedParams);
97 } else {
98 Dictionary::Ptr executedMessage = new Dictionary();
99 executedMessage->Set("jsonrpc", "2.0");
100 executedMessage->Set("method", "event::ExecutedCommand");
101 executedMessage->Set("params", executedParams);
102
103 listener->SyncSendMessage(sourceEndpoint, executedMessage);
104 }
105 }
106
ExecuteCheckFromQueue(const MessageOrigin::Ptr & origin,const Dictionary::Ptr & params)107 void ClusterEvents::ExecuteCheckFromQueue(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params) {
108
109 Endpoint::Ptr sourceEndpoint;
110
111 if (origin->FromClient) {
112 sourceEndpoint = origin->FromClient->GetEndpoint();
113 } else if (origin->IsLocal()){
114 sourceEndpoint = Endpoint::GetLocalEndpoint();
115 }
116
117 if (!sourceEndpoint || (origin->FromZone && !Zone::GetLocalZone()->IsChildOf(origin->FromZone))) {
118 Log(LogNotice, "ClusterEvents")
119 << "Discarding 'execute command' message from '" << origin->FromClient->GetIdentity() << "': Invalid endpoint origin (client not allowed).";
120 return;
121 }
122
123 ApiListener::Ptr listener = ApiListener::GetInstance();
124
125 if (!listener) {
126 Log(LogCritical, "ApiListener") << "No instance available.";
127 return;
128 }
129
130 Defer resetExecuteCommandProcessFinishedHandler ([]() {
131 Checkable::ExecuteCommandProcessFinishedHandler = nullptr;
132 });
133
134 if (params->Contains("source")) {
135 String uuid = params->Get("source");
136
137 String checkableName = params->Get("host");
138
139 if (params->Contains("service"))
140 checkableName += "!" + params->Get("service");
141
142 /* Check deadline */
143 double deadline = params->Get("deadline");
144
145 if (Utility::GetTime() > deadline) {
146 Log(LogNotice, "ApiListener")
147 << "Discarding 'ExecuteCheckFromQueue' event for checkable '" << checkableName
148 << "' from '" << origin->FromClient->GetIdentity() << "': Deadline has expired.";
149 return;
150 }
151
152 Checkable::ExecuteCommandProcessFinishedHandler = [checkableName, listener, sourceEndpoint, origin, params] (const Value& commandLine, const ProcessResult& pr) {
153 if (params->Get("command_type") == "check_command") {
154 Checkable::CurrentConcurrentChecks.fetch_sub(1);
155 Checkable::DecreasePendingChecks();
156 }
157
158 if (pr.ExitStatus > 3) {
159 Process::Arguments parguments = Process::PrepareCommand(commandLine);
160 Log(LogWarning, "ApiListener")
161 << "Command for object '" << checkableName << "' (PID: " << pr.PID
162 << ", arguments: " << Process::PrettyPrintArguments(parguments) << ") terminated with exit code "
163 << pr.ExitStatus << ", output: " << pr.Output;
164 }
165
166 SendEventExecutedCommand(params, pr.ExitStatus, pr.Output, pr.ExecutionStart, pr.ExecutionEnd, listener,
167 origin, sourceEndpoint);
168 };
169 }
170
171 if (!listener->GetAcceptCommands() && !origin->IsLocal()) {
172 Log(LogWarning, "ApiListener")
173 << "Ignoring command. '" << listener->GetName() << "' does not accept commands.";
174
175 String output = "Endpoint '" + Endpoint::GetLocalEndpoint()->GetName() + "' does not accept commands.";
176
177 if (params->Contains("source")) {
178 double now = Utility::GetTime();
179 SendEventExecutedCommand(params, 126, output, now, now, listener, origin, sourceEndpoint);
180 } else {
181 Host::Ptr host = new Host();
182 Dictionary::Ptr attrs = new Dictionary();
183
184 attrs->Set("__name", params->Get("host"));
185 attrs->Set("type", "Host");
186 attrs->Set("enable_active_checks", false);
187
188 Deserialize(host, attrs, false, FAConfig);
189
190 if (params->Contains("service"))
191 host->SetExtension("agent_service_name", params->Get("service"));
192
193 CheckResult::Ptr cr = new CheckResult();
194 cr->SetState(ServiceUnknown);
195 cr->SetOutput(output);
196
197 Dictionary::Ptr message = MakeCheckResultMessage(host, cr);
198 listener->SyncSendMessage(sourceEndpoint, message);
199 }
200
201 return;
202 }
203
204 /* use a virtual host object for executing the command */
205 Host::Ptr host = new Host();
206 Dictionary::Ptr attrs = new Dictionary();
207
208 attrs->Set("__name", params->Get("host"));
209 attrs->Set("type", "Host");
210
211 /*
212 * Override the check timeout if the parent caller provided the value. Compatible with older versions not
213 * passing this inside the cluster message.
214 * This happens with host/service command_endpoint agents and the 'check_timeout' attribute being specified.
215 */
216 if (params->Contains("check_timeout"))
217 attrs->Set("check_timeout", params->Get("check_timeout"));
218
219 Deserialize(host, attrs, false, FAConfig);
220
221 if (params->Contains("service"))
222 host->SetExtension("agent_service_name", params->Get("service"));
223
224 String command = params->Get("command");
225 String command_type = params->Get("command_type");
226
227 if (command_type == "check_command") {
228 if (!CheckCommand::GetByName(command)) {
229 ServiceState state = ServiceUnknown;
230 String output = "Check command '" + command + "' does not exist.";
231 double now = Utility::GetTime();
232
233 if (params->Contains("source")) {
234 SendEventExecutedCommand(params, state, output, now, now, listener, origin, sourceEndpoint);
235 } else {
236 CheckResult::Ptr cr = new CheckResult();
237 cr->SetState(state);
238 cr->SetOutput(output);
239 Dictionary::Ptr message = MakeCheckResultMessage(host, cr);
240 listener->SyncSendMessage(sourceEndpoint, message);
241 }
242
243 return;
244 }
245 } else if (command_type == "event_command") {
246 if (!EventCommand::GetByName(command)) {
247 String output = "Event command '" + command + "' does not exist.";
248 Log(LogWarning, "ClusterEvents") << output;
249
250 if (params->Contains("source")) {
251 double now = Utility::GetTime();
252 SendEventExecutedCommand(params, ServiceUnknown, output, now, now, listener, origin, sourceEndpoint);
253 }
254
255 return;
256 }
257 } else if (command_type == "notification_command") {
258 if (!NotificationCommand::GetByName(command)) {
259 String output = "Notification command '" + command + "' does not exist.";
260 Log(LogWarning, "ClusterEvents") << output;
261
262 if (params->Contains("source")) {
263 double now = Utility::GetTime();
264 SendEventExecutedCommand(params, ServiceUnknown, output, now, now, listener, origin, sourceEndpoint);
265 }
266
267 return;
268 }
269 }
270
271 attrs->Set(command_type, params->Get("command"));
272 attrs->Set("command_endpoint", sourceEndpoint->GetName());
273
274 Deserialize(host, attrs, false, FAConfig);
275
276 host->SetExtension("agent_check", true);
277
278 Dictionary::Ptr macros = params->Get("macros");
279
280 if (command_type == "check_command") {
281 try {
282 host->ExecuteRemoteCheck(macros);
283 } catch (const std::exception& ex) {
284 String output = "Exception occurred while checking '" + host->GetName() + "': " + DiagnosticInformation(ex);
285 ServiceState state = ServiceUnknown;
286 double now = Utility::GetTime();
287
288 if (params->Contains("source")) {
289 SendEventExecutedCommand(params, state, output, now, now, listener, origin, sourceEndpoint);
290 } else {
291 CheckResult::Ptr cr = new CheckResult();
292 cr->SetState(state);
293 cr->SetOutput(output);
294 cr->SetScheduleStart(now);
295 cr->SetScheduleEnd(now);
296 cr->SetExecutionStart(now);
297 cr->SetExecutionEnd(now);
298
299 Dictionary::Ptr message = MakeCheckResultMessage(host, cr);
300 listener->SyncSendMessage(sourceEndpoint, message);
301 }
302
303 Log(LogCritical, "checker", output);
304 }
305 } else if (command_type == "event_command") {
306 try {
307 host->ExecuteEventHandler(macros, true);
308 } catch (const std::exception& ex) {
309 if (params->Contains("source")) {
310 String output = "Exception occurred while executing event command '" + command + "' for '" +
311 host->GetName() + "': " + DiagnosticInformation(ex);
312
313 double now = Utility::GetTime();
314 SendEventExecutedCommand(params, ServiceUnknown, output, now, now, listener, origin, sourceEndpoint);
315 } else {
316 throw;
317 }
318 }
319 } else if (command_type == "notification_command" && params->Contains("source")) {
320 /* Get user */
321 User::Ptr user = new User();
322 Dictionary::Ptr attrs = new Dictionary();
323 attrs->Set("__name", params->Get("user"));
324 attrs->Set("type", User::GetTypeName());
325
326 Deserialize(user, attrs, false, FAConfig);
327
328 /* Get notification */
329 Notification::Ptr notification = new Notification();
330 attrs->Clear();
331 attrs->Set("__name", params->Get("notification"));
332 attrs->Set("type", Notification::GetTypeName());
333 attrs->Set("command", command);
334
335 Deserialize(notification, attrs, false, FAConfig);
336
337 try {
338 CheckResult::Ptr cr = new CheckResult();
339 String author = macros->Get("notification_author");
340 NotificationCommand::Ptr notificationCommand = NotificationCommand::GetByName(command);
341
342 notificationCommand->Execute(notification, user, cr, NotificationType::NotificationCustom,
343 author, "");
344 } catch (const std::exception& ex) {
345 String output = "Exception occurred during notification '" + notification->GetName()
346 + "' for checkable '" + notification->GetCheckable()->GetName()
347 + "' and user '" + user->GetName() + "' using command '" + command + "': "
348 + DiagnosticInformation(ex, false);
349 double now = Utility::GetTime();
350 SendEventExecutedCommand(params, ServiceUnknown, output, now, now, listener, origin, sourceEndpoint);
351 }
352 }
353 }
354
GetCheckRequestQueueSize()355 int ClusterEvents::GetCheckRequestQueueSize()
356 {
357 return m_CheckRequestQueue.size();
358 }
359
LogRemoteCheckQueueInformation()360 void ClusterEvents::LogRemoteCheckQueueInformation() {
361 if (m_ChecksDroppedDuringInterval > 0) {
362 Log(LogCritical, "ClusterEvents")
363 << "Remote check queue ran out of slots. "
364 << m_ChecksDroppedDuringInterval << " checks dropped.";
365 m_ChecksDroppedDuringInterval = 0;
366 }
367
368 if (m_ChecksExecutedDuringInterval == 0)
369 return;
370
371 Log(LogInformation, "RemoteCheckQueue")
372 << "items: " << m_CheckRequestQueue.size()
373 << ", rate: " << m_ChecksExecutedDuringInterval / 10 << "/s "
374 << "(" << m_ChecksExecutedDuringInterval * 6 << "/min "
375 << m_ChecksExecutedDuringInterval * 6 * 5 << "/5min "
376 << m_ChecksExecutedDuringInterval * 6 * 15 << "/15min" << ");";
377
378 m_ChecksExecutedDuringInterval = 0;
379 }
380