1 /* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
2
3 #include "remote/apilistener.hpp"
4 #include "remote/apilistener-ti.cpp"
5 #include "remote/jsonrpcconnection.hpp"
6 #include "remote/endpoint.hpp"
7 #include "remote/jsonrpc.hpp"
8 #include "remote/apifunction.hpp"
9 #include "remote/configpackageutility.hpp"
10 #include "remote/configobjectutility.hpp"
11 #include "base/convert.hpp"
12 #include "base/defer.hpp"
13 #include "base/io-engine.hpp"
14 #include "base/netstring.hpp"
15 #include "base/json.hpp"
16 #include "base/configtype.hpp"
17 #include "base/logger.hpp"
18 #include "base/objectlock.hpp"
19 #include "base/stdiostream.hpp"
20 #include "base/perfdatavalue.hpp"
21 #include "base/application.hpp"
22 #include "base/context.hpp"
23 #include "base/statsfunction.hpp"
24 #include "base/exception.hpp"
25 #include "base/tcpsocket.hpp"
26 #include <boost/asio/buffer.hpp>
27 #include <boost/asio/io_context_strand.hpp>
28 #include <boost/asio/ip/tcp.hpp>
29 #include <boost/asio/spawn.hpp>
30 #include <boost/asio/ssl/context.hpp>
31 #include <boost/date_time/posix_time/posix_time_duration.hpp>
32 #include <boost/lexical_cast.hpp>
33 #include <boost/regex.hpp>
34 #include <boost/system/error_code.hpp>
35 #include <climits>
36 #include <cstdint>
37 #include <fstream>
38 #include <memory>
39 #include <openssl/ssl.h>
40 #include <openssl/tls1.h>
41 #include <openssl/x509.h>
42 #include <sstream>
43 #include <utility>
44
45 using namespace icinga;
46
/* Registration macro for the ApiListener type (presumably adds it to the type registry - confirm against the macro definition). */
REGISTER_TYPE(ApiListener);

/* Signal carrying a single bool argument; Start() emits it with `true`. */
boost::signals2::signal<void(bool)> ApiListener::OnMasterChanged;
/* The single ApiListener object: assigned in OnConfigLoaded(), returned by GetInstance(). */
ApiListener::Ptr ApiListener::m_Instance;

/* Registers ApiListener::StatsFunc as the stats function for this type. */
REGISTER_STATSFUNCTION(ApiListener, &ApiListener::StatsFunc);

/* Registers ApiListener::HelloAPIHandler as the API function `Hello` in the `icinga` namespace. */
REGISTER_APIFUNCTION(Hello, icinga, &ApiListener::HelloAPIHandler);
55
ApiListener()56 ApiListener::ApiListener()
57 {
58 m_RelayQueue.SetName("ApiListener, RelayQueue");
59 m_SyncQueue.SetName("ApiListener, SyncQueue");
60 }
61
GetApiDir()62 String ApiListener::GetApiDir()
63 {
64 return Configuration::DataDir + "/api/";
65 }
66
GetApiZonesDir()67 String ApiListener::GetApiZonesDir()
68 {
69 return GetApiDir() + "zones/";
70 }
71
GetApiZonesStageDir()72 String ApiListener::GetApiZonesStageDir()
73 {
74 return GetApiDir() + "zones-stage/";
75 }
76
GetCertsDir()77 String ApiListener::GetCertsDir()
78 {
79 return Configuration::DataDir + "/certs/";
80 }
81
GetCaDir()82 String ApiListener::GetCaDir()
83 {
84 return Configuration::DataDir + "/ca/";
85 }
86
GetCertificateRequestsDir()87 String ApiListener::GetCertificateRequestsDir()
88 {
89 return Configuration::DataDir + "/certificate-requests/";
90 }
91
GetDefaultCertPath()92 String ApiListener::GetDefaultCertPath()
93 {
94 return GetCertsDir() + "/" + ScriptGlobal::Get("NodeName") + ".crt";
95 }
96
GetDefaultKeyPath()97 String ApiListener::GetDefaultKeyPath()
98 {
99 return GetCertsDir() + "/" + ScriptGlobal::Get("NodeName") + ".key";
100 }
101
GetDefaultCaPath()102 String ApiListener::GetDefaultCaPath()
103 {
104 return GetCertsDir() + "/ca.crt";
105 }
106
GetTlsHandshakeTimeout() const107 double ApiListener::GetTlsHandshakeTimeout() const
108 {
109 return Configuration::TlsHandshakeTimeout;
110 }
111
SetTlsHandshakeTimeout(double value,bool suppress_events,const Value & cookie)112 void ApiListener::SetTlsHandshakeTimeout(double value, bool suppress_events, const Value& cookie)
113 {
114 Configuration::TlsHandshakeTimeout = value;
115 }
116
CopyCertificateFile(const String & oldCertPath,const String & newCertPath)117 void ApiListener::CopyCertificateFile(const String& oldCertPath, const String& newCertPath)
118 {
119 struct stat st1, st2;
120
121 if (!oldCertPath.IsEmpty() && stat(oldCertPath.CStr(), &st1) >= 0 && (stat(newCertPath.CStr(), &st2) < 0 || st1.st_mtime > st2.st_mtime)) {
122 Log(LogWarning, "ApiListener")
123 << "Copying '" << oldCertPath << "' certificate file to '" << newCertPath << "'";
124
125 Utility::MkDirP(Utility::DirName(newCertPath), 0700);
126 Utility::CopyFile(oldCertPath, newCertPath);
127 }
128 }
129
OnConfigLoaded()130 void ApiListener::OnConfigLoaded()
131 {
132 if (m_Instance)
133 BOOST_THROW_EXCEPTION(ScriptError("Only one ApiListener object is allowed.", GetDebugInfo()));
134
135 m_Instance = this;
136
137 String defaultCertPath = GetDefaultCertPath();
138 String defaultKeyPath = GetDefaultKeyPath();
139 String defaultCaPath = GetDefaultCaPath();
140
141 /* Migrate certificate location < 2.8 to the new default path. */
142 String oldCertPath = GetCertPath();
143 String oldKeyPath = GetKeyPath();
144 String oldCaPath = GetCaPath();
145
146 CopyCertificateFile(oldCertPath, defaultCertPath);
147 CopyCertificateFile(oldKeyPath, defaultKeyPath);
148 CopyCertificateFile(oldCaPath, defaultCaPath);
149
150 if (!oldCertPath.IsEmpty() && !oldKeyPath.IsEmpty() && !oldCaPath.IsEmpty()) {
151 Log(LogWarning, "ApiListener", "Please read the upgrading documentation for v2.8: https://icinga.com/docs/icinga2/latest/doc/16-upgrading-icinga-2/");
152 }
153
154 /* Create the internal API object storage. */
155 ConfigObjectUtility::CreateStorage();
156
157 /* Cache API packages and their active stage name. */
158 UpdateActivePackageStagesCache();
159
160 /* set up SSL context */
161 std::shared_ptr<X509> cert;
162 try {
163 cert = GetX509Certificate(defaultCertPath);
164 } catch (const std::exception&) {
165 BOOST_THROW_EXCEPTION(ScriptError("Cannot get certificate from cert path: '"
166 + defaultCertPath + "'.", GetDebugInfo()));
167 }
168
169 try {
170 SetIdentity(GetCertificateCN(cert));
171 } catch (const std::exception&) {
172 BOOST_THROW_EXCEPTION(ScriptError("Cannot get certificate common name from cert path: '"
173 + defaultCertPath + "'.", GetDebugInfo()));
174 }
175
176 Log(LogInformation, "ApiListener")
177 << "My API identity: " << GetIdentity();
178
179 UpdateSSLContext();
180 }
181
UpdateSSLContext()182 void ApiListener::UpdateSSLContext()
183 {
184 m_SSLContext = SetupSslContext(GetDefaultCertPath(), GetDefaultKeyPath(), GetDefaultCaPath(), GetCrlPath(), GetCipherList(), GetTlsProtocolmin(), GetDebugInfo());
185
186 for (const Endpoint::Ptr& endpoint : ConfigType::GetObjectsByType<Endpoint>()) {
187 for (const JsonRpcConnection::Ptr& client : endpoint->GetClients()) {
188 client->Disconnect();
189 }
190 }
191
192 for (const JsonRpcConnection::Ptr& client : m_AnonymousClients) {
193 client->Disconnect();
194 }
195 }
196
OnAllConfigLoaded()197 void ApiListener::OnAllConfigLoaded()
198 {
199 m_LocalEndpoint = Endpoint::GetByName(GetIdentity());
200
201 if (!m_LocalEndpoint)
202 BOOST_THROW_EXCEPTION(ScriptError("Endpoint object for '" + GetIdentity() + "' is missing.", GetDebugInfo()));
203 }
204
/**
 * Starts the component.
 *
 * In order: syncs the local zone directories, starts the base object, opens
 * the log file, creates the primary JSON-RPC listener (exiting the application
 * on failure), sets up the periodic timers and finally emits
 * OnMasterChanged(true).
 *
 * @param runtimeCreated Passed through unchanged to ObjectImpl<ApiListener>::Start().
 */
void ApiListener::Start(bool runtimeCreated)
{
	Log(LogInformation, "ApiListener")
		<< "'" << GetName() << "' started.";

	SyncLocalZoneDirs();

	ObjectImpl<ApiListener>::Start(runtimeCreated);

	{
		/* The log file is opened while holding m_LogLock. */
		std::unique_lock<std::mutex> lock(m_LogLock);
		OpenLogFile();
	}

	/* create the primary JSON-RPC listener */
	if (!AddListener(GetBindHost(), GetBindPort())) {
		Log(LogCritical, "ApiListener")
			<< "Cannot add listener on host '" << GetBindHost() << "' for port '" << GetBindPort() << "'.";
		Application::Exit(EXIT_FAILURE);
	}

	/* Replay log maintenance, see ApiTimerHandler(). Interval 5 (seconds, going by the
	 * "10s" wording of the comment further down); Reschedule(0) presumably requests an
	 * immediate first run - confirm against Timer.
	 */
	m_Timer = new Timer();
	m_Timer->OnTimerExpired.connect([this](const Timer * const&) { ApiTimerHandler(); });
	m_Timer->SetInterval(5);
	m_Timer->Start();
	m_Timer->Reschedule(0);

	/* Connection attempts to endpoints, see ApiReconnectTimerHandler(). */
	m_ReconnectTimer = new Timer();
	m_ReconnectTimer->OnTimerExpired.connect([this](const Timer * const&) { ApiReconnectTimerHandler(); });
	m_ReconnectTimer->SetInterval(10);
	m_ReconnectTimer->Start();
	m_ReconnectTimer->Reschedule(0);

	/* Keep this in relative sync with the cold startup in UpdateObjectAuthority() and the reconnect interval above.
	 * Previous: 60s reconnect, 30s OA, 60s cold startup.
	 * Now: 10s reconnect, 10s OA, 30s cold startup.
	 */
	m_AuthorityTimer = new Timer();
	m_AuthorityTimer->OnTimerExpired.connect([](const Timer * const&) { UpdateObjectAuthority(); });
	m_AuthorityTimer->SetInterval(10);
	m_AuthorityTimer->Start();

	/* Hourly cleanup of certificate requests, see CleanupCertificateRequestsTimerHandler(). */
	m_CleanupCertificateRequestsTimer = new Timer();
	m_CleanupCertificateRequestsTimer->OnTimerExpired.connect([this](const Timer * const&) { CleanupCertificateRequestsTimerHandler(); });
	m_CleanupCertificateRequestsTimer->SetInterval(3600);
	m_CleanupCertificateRequestsTimer->Start();
	m_CleanupCertificateRequestsTimer->Reschedule(0);

	/* Calls CheckApiPackageIntegrity() at an interval of 300; unlike most timers above it is not rescheduled to 0. */
	m_ApiPackageIntegrityTimer = new Timer();
	m_ApiPackageIntegrityTimer->OnTimerExpired.connect([this](const Timer * const&) { CheckApiPackageIntegrity(); });
	m_ApiPackageIntegrityTimer->SetInterval(300);
	m_ApiPackageIntegrityTimer->Start();

	OnMasterChanged(true);
}
263
Stop(bool runtimeDeleted)264 void ApiListener::Stop(bool runtimeDeleted)
265 {
266 ObjectImpl<ApiListener>::Stop(runtimeDeleted);
267
268 Log(LogInformation, "ApiListener")
269 << "'" << GetName() << "' stopped.";
270
271 {
272 std::unique_lock<std::mutex> lock(m_LogLock);
273 CloseLogFile();
274 RotateLogFile();
275 }
276
277 RemoveStatusFile();
278 }
279
GetInstance()280 ApiListener::Ptr ApiListener::GetInstance()
281 {
282 return m_Instance;
283 }
284
GetMaster() const285 Endpoint::Ptr ApiListener::GetMaster() const
286 {
287 Zone::Ptr zone = Zone::GetLocalZone();
288
289 if (!zone)
290 return nullptr;
291
292 std::vector<String> names;
293
294 for (const Endpoint::Ptr& endpoint : zone->GetEndpoints())
295 if (endpoint->GetConnected() || endpoint->GetName() == GetIdentity())
296 names.push_back(endpoint->GetName());
297
298 std::sort(names.begin(), names.end());
299
300 return Endpoint::GetByName(*names.begin());
301 }
302
IsMaster() const303 bool ApiListener::IsMaster() const
304 {
305 Endpoint::Ptr master = GetMaster();
306
307 if (!master)
308 return false;
309
310 return master == GetLocalEndpoint();
311 }
312
313 /**
314 * Creates a new JSON-RPC listener on the specified port.
315 *
316 * @param node The host the listener should be bound to.
317 * @param service The port to listen on.
318 */
AddListener(const String & node,const String & service)319 bool ApiListener::AddListener(const String& node, const String& service)
320 {
321 namespace asio = boost::asio;
322 namespace ip = asio::ip;
323 using ip::tcp;
324
325 ObjectLock olock(this);
326
327 if (!m_SSLContext) {
328 Log(LogCritical, "ApiListener", "SSL context is required for AddListener()");
329 return false;
330 }
331
332 auto& io (IoEngine::Get().GetIoContext());
333 auto acceptor (Shared<tcp::acceptor>::Make(io));
334
335 try {
336 tcp::resolver resolver (io);
337 tcp::resolver::query query (node, service, tcp::resolver::query::passive);
338
339 auto result (resolver.resolve(query));
340 auto current (result.begin());
341
342 for (;;) {
343 try {
344 acceptor->open(current->endpoint().protocol());
345
346 {
347 auto fd (acceptor->native_handle());
348
349 const int optFalse = 0;
350 setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, reinterpret_cast<const char *>(&optFalse), sizeof(optFalse));
351
352 const int optTrue = 1;
353 setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast<const char *>(&optTrue), sizeof(optTrue));
354 #ifdef SO_REUSEPORT
355 setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, reinterpret_cast<const char *>(&optTrue), sizeof(optTrue));
356 #endif /* SO_REUSEPORT */
357 }
358
359 acceptor->bind(current->endpoint());
360
361 break;
362 } catch (const std::exception&) {
363 if (++current == result.end()) {
364 throw;
365 }
366
367 if (acceptor->is_open()) {
368 acceptor->close();
369 }
370 }
371 }
372 } catch (const std::exception& ex) {
373 Log(LogCritical, "ApiListener")
374 << "Cannot bind TCP socket for host '" << node << "' on port '" << service << "': " << ex.what();
375 return false;
376 }
377
378 acceptor->listen(INT_MAX);
379
380 auto localEndpoint (acceptor->local_endpoint());
381
382 Log(LogInformation, "ApiListener")
383 << "Started new listener on '[" << localEndpoint.address() << "]:" << localEndpoint.port() << "'";
384
385 IoEngine::SpawnCoroutine(io, [this, acceptor](asio::yield_context yc) { ListenerCoroutineProc(yc, acceptor, m_SSLContext); });
386
387 UpdateStatusFile(localEndpoint);
388
389 return true;
390 }
391
/**
 * Accept loop of a listener, meant to run inside a coroutine.
 *
 * Accepts connections forever (the loop has no exit), wraps each one into a
 * TLS stream and hands it to NewClientHandler() in its own coroutine on a
 * dedicated strand. Before a connection is wrapped, the SSL context is rebuilt
 * if the creation time of the configured CRL file has changed.
 *
 * @param yc The yield context of the calling coroutine.
 * @param server The acceptor to accept connections from.
 * @param sslContext The SSL context used for new TLS streams.
 *        NOTE(review): AddListener() passes m_SSLContext for this reference parameter, so a
 *        context replaced by UpdateSSLContext() is presumably picked up here - confirm.
 */
void ApiListener::ListenerCoroutineProc(boost::asio::yield_context yc, const Shared<boost::asio::ip::tcp::acceptor>::Ptr& server, const Shared<boost::asio::ssl::context>::Ptr& sslContext)
{
	namespace asio = boost::asio;

	auto& io (IoEngine::Get().GetIoContext());

	/* Remember the CRL file's creation time (if a CRL is configured) to detect replacements later. */
	time_t lastModified = -1;
	const String crlPath = GetCrlPath();

	if (!crlPath.IsEmpty()) {
		lastModified = Utility::GetFileCreationTime(crlPath);
	}

	for (;;) {
		try {
			asio::ip::tcp::socket socket (io);

			/* Suspends this coroutine until a connection arrives. */
			server->async_accept(socket.lowest_layer(), yc);

			if (!crlPath.IsEmpty()) {
				time_t currentCreationTime = Utility::GetFileCreationTime(crlPath);

				/* The CRL file changed: rebuild the SSL context (which also disconnects existing clients). */
				if (lastModified != currentCreationTime) {
					UpdateSSLContext();

					lastModified = currentCreationTime;
				}
			}

			auto sslConn (Shared<AsioTlsStream>::Make(io, *sslContext));
			sslConn->lowest_layer() = std::move(socket);

			auto strand (Shared<asio::io_context::strand>::Make(io));

			/* Handle the client in its own coroutine; the lambda keeps strand and stream alive. */
			IoEngine::SpawnCoroutine(*strand, [this, strand, sslConn](asio::yield_context yc) {
				/* After GetConnectTimeout() seconds the callback logs and cancels pending operations on the socket. */
				Timeout::Ptr timeout(new Timeout(strand->context(), *strand, boost::posix_time::microseconds(int64_t(GetConnectTimeout() * 1e6)),
					[sslConn](asio::yield_context yc) {
						Log(LogWarning, "ApiListener")
							<< "Timeout while processing incoming connection from "
							<< sslConn->lowest_layer().remote_endpoint();

						boost::system::error_code ec;
						sslConn->lowest_layer().cancel(ec);
					}
				));
				/* Presumably cancels the timeout once this scope is left - confirm against Defer. */
				Defer cancelTimeout([timeout]() { timeout->Cancel(); });

				/* Empty hostname: for incoming connections no particular peer name is expected. */
				NewClientHandler(yc, strand, sslConn, String(), RoleServer);
			});
		} catch (const std::exception& ex) {
			/* A failed accept must not end the loop. */
			Log(LogCritical, "ApiListener")
				<< "Cannot accept new connection: " << ex.what();
		}
	}
}
447
/**
 * Creates a new JSON-RPC client and connects to the specified endpoint.
 *
 * The connection attempt runs asynchronously in a coroutine on its own strand;
 * this function returns right after spawning it. In every outcome the
 * coroutine resets the endpoint's "connecting" flag.
 *
 * @param endpoint The endpoint.
 */
void ApiListener::AddConnection(const Endpoint::Ptr& endpoint)
{
	namespace asio = boost::asio;
	using asio::ip::tcp;

	if (!m_SSLContext) {
		Log(LogCritical, "ApiListener", "SSL context is required for AddConnection()");
		return;
	}

	auto& io (IoEngine::Get().GetIoContext());
	auto strand (Shared<asio::io_context::strand>::Make(io));

	IoEngine::SpawnCoroutine(*strand, [this, strand, endpoint, &io](asio::yield_context yc) {
		String host = endpoint->GetHost();
		String port = endpoint->GetPort();

		Log(LogInformation, "ApiListener")
			<< "Reconnecting to endpoint '" << endpoint->GetName() << "' via host '" << host << "' and port '" << port << "'";

		try {
			/* The endpoint name is handed to the TLS stream (presumably as the expected peer host name - confirm). */
			auto sslConn (Shared<AsioTlsStream>::Make(io, *m_SSLContext, endpoint->GetName()));

			/* After GetConnectTimeout() seconds the callback logs and cancels pending operations on the socket. */
			Timeout::Ptr timeout(new Timeout(strand->context(), *strand, boost::posix_time::microseconds(int64_t(GetConnectTimeout() * 1e6)),
				[sslConn, endpoint, host, port](asio::yield_context yc) {
					Log(LogCritical, "ApiListener")
						<< "Timeout while reconnecting to endpoint '" << endpoint->GetName() << "' via host '" << host
						<< "' and port '" << port << "', cancelling attempt";

					boost::system::error_code ec;
					sslConn->lowest_layer().cancel(ec);
				}
			));
			/* Presumably cancels the timeout once the try block is left - confirm against Defer. */
			Defer cancelTimeout([&timeout]() { timeout->Cancel(); });

			Connect(sslConn->lowest_layer(), host, port, yc);

			/* NewClientHandler() catches std::exception itself, see its definition. */
			NewClientHandler(yc, strand, sslConn, endpoint->GetName(), RoleClient);

			endpoint->SetConnecting(false);
			Log(LogInformation, "ApiListener")
				<< "Finished reconnecting to endpoint '" << endpoint->GetName() << "' via host '" << host << "' and port '" << port << "'";
		} catch (const std::exception& ex) {
			/* Allow the reconnect timer to try again. */
			endpoint->SetConnecting(false);

			Log(LogCritical, "ApiListener")
				<< "Cannot connect to host '" << host << "' on port '" << port << "': " << ex.what();
		}
	});
}
503
NewClientHandler(boost::asio::yield_context yc,const Shared<boost::asio::io_context::strand>::Ptr & strand,const Shared<AsioTlsStream>::Ptr & client,const String & hostname,ConnectionRole role)504 void ApiListener::NewClientHandler(
505 boost::asio::yield_context yc, const Shared<boost::asio::io_context::strand>::Ptr& strand,
506 const Shared<AsioTlsStream>::Ptr& client, const String& hostname, ConnectionRole role
507 )
508 {
509 try {
510 NewClientHandlerInternal(yc, strand, client, hostname, role);
511 } catch (const std::exception& ex) {
512 Log(LogCritical, "ApiListener")
513 << "Exception while handling new API client connection: " << DiagnosticInformation(ex, false);
514
515 Log(LogDebug, "ApiListener")
516 << "Exception while handling new API client connection: " << DiagnosticInformation(ex);
517 }
518 }
519
__anon290b30dd0d02() 520 static const auto l_AppVersionInt (([]() -> unsigned long {
521 auto appVersion (Application::GetAppVersion());
522 boost::regex rgx (R"EOF(^[rv]?(\d+)\.(\d+)\.(\d+))EOF");
523 boost::smatch match;
524
525 if (!boost::regex_search(appVersion.GetData(), match, rgx)) {
526 return 0;
527 }
528
529 return 100u * 100u * boost::lexical_cast<unsigned long>(match[1].str())
530 + 100u * boost::lexical_cast<unsigned long>(match[2].str())
531 + boost::lexical_cast<unsigned long>(match[3].str());
532 })());
533
534 static const auto l_MyCapabilities (ApiCapabilities::ExecuteArbitraryCommand);
535
/**
 * Processes a new client connection.
 *
 * Performs the TLS handshake, inspects the peer certificate, decides whether
 * the peer speaks JSON-RPC or HTTP and hands the stream over to a
 * JsonRpcConnection or HttpServerConnection. If the stream is not handed
 * over, the TLS session is shut down on exit.
 *
 * @param yc The yield context of the calling coroutine.
 * @param strand The strand the connection is handled on.
 * @param client The new client.
 * @param hostname The expected common name of the peer certificate; empty if nothing is expected.
 * @param role RoleClient if we initiated the connection, otherwise we are the server.
 */
void ApiListener::NewClientHandlerInternal(
	boost::asio::yield_context yc, const Shared<boost::asio::io_context::strand>::Ptr& strand,
	const Shared<AsioTlsStream>::Ptr& client, const String& hostname, ConnectionRole role
)
{
	namespace asio = boost::asio;
	namespace ssl = asio::ssl;

	/* Human readable "to/from [address]:port" used in the log messages below. */
	String conninfo;

	{
		std::ostringstream conninfo_;

		if (role == RoleClient) {
			conninfo_ << "to";
		} else {
			conninfo_ << "from";
		}

		auto endpoint (client->lowest_layer().remote_endpoint());

		conninfo_ << " [" << endpoint.address() << "]:" << endpoint.port();

		conninfo = conninfo_.str();
	}

	auto& sslConn (client->next_layer());

	boost::system::error_code ec;

	{
		/* Cancels pending operations on the socket if the handshake takes longer
		 * than Configuration::TlsHandshakeTimeout seconds.
		 */
		Timeout::Ptr handshakeTimeout (new Timeout(
			strand->context(),
			*strand,
			boost::posix_time::microseconds(intmax_t(Configuration::TlsHandshakeTimeout * 1000000)),
			[strand, client](asio::yield_context yc) {
				boost::system::error_code ec;
				client->lowest_layer().cancel(ec);
			}
		));

		/* Errors are reported via ec instead of an exception. */
		sslConn.async_handshake(role == RoleClient ? sslConn.client : sslConn.server, yc[ec]);

		handshakeTimeout->Cancel();
	}

	if (ec) {
		// https://github.com/boostorg/beast/issues/915
		// Google Chrome 73+ seems not close the connection properly, https://stackoverflow.com/questions/56272906/how-to-fix-certificate-unknown-error-from-chrome-v73
		if (ec == asio::ssl::error::stream_truncated) {
			Log(LogNotice, "ApiListener")
				<< "TLS stream was truncated, ignoring connection from " << conninfo;
			return;
		}

		Log(LogCritical, "ApiListener")
			<< "Client TLS handshake failed (" << conninfo << "): " << ec.message();
		return;
	}

	/* Set to true once a connection object has taken over the stream. */
	bool willBeShutDown = false;

	/* Presumably runs when this function is left (confirm against Defer): shuts the
	 * TLS session down unless the stream has been handed over.
	 */
	Defer shutDownIfNeeded ([&sslConn, &willBeShutDown, &yc]() {
		if (!willBeShutDown) {
			// Ignore the error, but do not throw an exception being swallowed at all cost.
			// https://github.com/Icinga/icinga2/issues/7351
			boost::system::error_code ec;
			sslConn.async_shutdown(yc[ec]);
		}
	});

	std::shared_ptr<X509> cert (sslConn.GetPeerCertificate());
	bool verify_ok = false;
	String identity;
	Endpoint::Ptr endpoint;

	if (cert) {
		verify_ok = sslConn.IsVerifyOK();

		String verifyError = sslConn.GetVerifyError();

		/* The peer's identity is the common name of its certificate. */
		try {
			identity = GetCertificateCN(cert);
		} catch (const std::exception&) {
			/* NOTE(review): this is about the peer's certificate, yet the message prints our own default cert path. */
			Log(LogCritical, "ApiListener")
				<< "Cannot get certificate common name from cert path: '" << GetDefaultCertPath() << "'.";
			return;
		}

		if (!hostname.IsEmpty()) {
			/* An unexpected common name aborts the connection, a failed validation is only logged here. */
			if (identity != hostname) {
				Log(LogWarning, "ApiListener")
					<< "Unexpected certificate common name while connecting to endpoint '"
					<< hostname << "': got '" << identity << "'";
				return;
			} else if (!verify_ok) {
				Log(LogWarning, "ApiListener")
					<< "Certificate validation failed for endpoint '" << hostname
					<< "': " << verifyError;
			}
		}

		/* Only a successfully verified certificate maps the connection to an Endpoint object. */
		if (verify_ok) {
			endpoint = Endpoint::GetByName(identity);
		}

		Log log(LogInformation, "ApiListener");

		log << "New client connection for identity '" << identity << "' " << conninfo;

		if (!verify_ok) {
			log << " (certificate validation failed: " << verifyError << ")";
		} else if (!endpoint) {
			log << " (no Endpoint object found for identity)";
		}
	} else {
		Log(LogInformation, "ApiListener")
			<< "New client connection " << conninfo << " (no client certificate)";
	}

	/* Assigned on every path through the if/else below. */
	ClientType ctype;

	if (role == RoleClient) {
		/* We initiated the connection: it is JSON-RPC, introduce ourselves. */
		JsonRpc::SendMessage(client, new Dictionary({
			{ "jsonrpc", "2.0" },
			{ "method", "icinga::Hello" },
			{ "params", new Dictionary({
				{ "version", (double)l_AppVersionInt },
				{ "capabilities", (double)l_MyCapabilities }
			}) }
		}), yc);

		client->async_flush(yc);

		ctype = ClientJsonRpc;
	} else {
		{
			boost::system::error_code ec;

			/* Wait for the peer to send something; only the returned count is checked, ec is not. */
			if (client->async_fill(yc[ec]) == 0u) {
				if (identity.IsEmpty()) {
					Log(LogInformation, "ApiListener")
						<< "No data received on new API connection " << conninfo << ". "
						<< "Ensure that the remote endpoints are properly configured in a cluster setup.";
				} else {
					Log(LogWarning, "ApiListener")
						<< "No data received on new API connection " << conninfo << " for identity '" << identity << "'. "
						<< "Ensure that the remote endpoints are properly configured in a cluster setup.";
				}

				return;
			}
		}

		char firstByte = 0;

		{
			asio::mutable_buffer firstByteBuf (&firstByte, 1);
			client->peek(firstByteBuf);
		}

		/* A leading digit selects JSON-RPC (presumably the length prefix of a netstring - confirm),
		 * anything else is treated as HTTP.
		 */
		if (firstByte >= '0' && firstByte <= '9') {
			JsonRpc::SendMessage(client, new Dictionary({
				{ "jsonrpc", "2.0" },
				{ "method", "icinga::Hello" },
				{ "params", new Dictionary({
					{ "version", (double)l_AppVersionInt },
					{ "capabilities", (double)l_MyCapabilities }
				}) }
			}), yc);

			client->async_flush(yc);

			ctype = ClientJsonRpc;
		} else {
			ctype = ClientHttp;
		}
	}

	if (ctype == ClientJsonRpc) {
		Log(LogNotice, "ApiListener", "New JSON-RPC client");

		/* Do not accept a further connection for an endpoint that is already connected. */
		if (endpoint && endpoint->GetConnected()) {
			Log(LogNotice, "ApiListener")
				<< "Ignoring JSON-RPC connection " << conninfo
				<< ". We're already connected to Endpoint '" << endpoint->GetName() << "'.";
			return;
		}

		JsonRpcConnection::Ptr aclient = new JsonRpcConnection(identity, verify_ok, client, role);

		if (endpoint) {
			endpoint->AddClient(aclient);

			/* The sync runs asynchronously via Utility::QueueAsyncCallback(). */
			Utility::QueueAsyncCallback([this, aclient, endpoint]() {
				SyncClient(aclient, endpoint, true);
			});
		} else if (!AddAnonymousClient(aclient)) {
			Log(LogNotice, "ApiListener")
				<< "Ignoring anonymous JSON-RPC connection " << conninfo
				<< ". Max connections (" << GetMaxAnonymousClients() << ") exceeded.";

			aclient = nullptr;
		}

		if (aclient) {
			aclient->Start();

			/* The connection object owns the stream now, skip the shutdown on exit. */
			willBeShutDown = true;
		}
	} else {
		Log(LogNotice, "ApiListener", "New HTTP client");

		HttpServerConnection::Ptr aclient = new HttpServerConnection(identity, verify_ok, client);
		AddHttpClient(aclient);
		aclient->Start();

		/* The connection object owns the stream now, skip the shutdown on exit. */
		willBeShutDown = true;
	}
}
761
/**
 * Brings a freshly connected endpoint up to date.
 *
 * Marks the endpoint as syncing, optionally requests certificates from it,
 * sends config file and runtime config updates and - if requested - replays
 * the log. Exceptions derived from std::exception are caught and logged.
 *
 * @param aclient The connection to send the updates over.
 * @param endpoint The endpoint the connection belongs to.
 * @param needSync Whether the replay log is sent after the config updates.
 */
void ApiListener::SyncClient(const JsonRpcConnection::Ptr& aclient, const Endpoint::Ptr& endpoint, bool needSync)
{
	Zone::Ptr eZone = endpoint->GetZone();

	try {
		{
			ObjectLock olock(endpoint);

			endpoint->SetSyncing(true);
		}

		Zone::Ptr myZone = Zone::GetLocalZone();

		/* The endpoint is in our parent zone: ask it for a certificate for ourselves
		 * and forward the requests stored in the certificate requests directory.
		 */
		if (myZone->GetParent() == eZone) {
			Log(LogInformation, "ApiListener")
				<< "Requesting new certificate for this Icinga instance from endpoint '" << endpoint->GetName() << "'.";

			JsonRpcConnection::SendCertificateRequest(aclient, nullptr, String());

			if (Utility::PathExists(ApiListener::GetCertificateRequestsDir())) {
				Utility::Glob(ApiListener::GetCertificateRequestsDir() + "/*.json", [aclient](const String& newPath) {
					JsonRpcConnection::SendCertificateRequest(aclient, nullptr, newPath);
				}, GlobFile);
			}
		}

		/* Make sure that the config updates are synced
		 * before the logs are replayed.
		 */

		Log(LogInformation, "ApiListener")
			<< "Sending config updates for endpoint '" << endpoint->GetName() << "' in zone '" << eZone->GetName() << "'.";

		/* sync zone file config */
		SendConfigUpdate(aclient);

		Log(LogInformation, "ApiListener")
			<< "Finished sending config file updates for endpoint '" << endpoint->GetName() << "' in zone '" << eZone->GetName() << "'.";

		/* sync runtime config */
		SendRuntimeConfigObjects(aclient);

		Log(LogInformation, "ApiListener")
			<< "Finished sending runtime config updates for endpoint '" << endpoint->GetName() << "' in zone '" << eZone->GetName() << "'.";

		/* No replay requested: the sync ends here. Note that this path skips the final log message. */
		if (!needSync) {
			ObjectLock olock2(endpoint);
			endpoint->SetSyncing(false);
			return;
		}

		Log(LogInformation, "ApiListener")
			<< "Sending replay log for endpoint '" << endpoint->GetName() << "' in zone '" << eZone->GetName() << "'.";

		/* NOTE(review): the syncing flag is not reset in this function on the success path;
		 * presumably ReplayLog() takes care of that - confirm.
		 */
		ReplayLog(aclient);

		if (eZone == Zone::GetLocalZone())
			UpdateObjectAuthority();

		Log(LogInformation, "ApiListener")
			<< "Finished sending replay log for endpoint '" << endpoint->GetName() << "' in zone '" << eZone->GetName() << "'.";
	} catch (const std::exception& ex) {
		{
			ObjectLock olock2(endpoint);
			endpoint->SetSyncing(false);
		}

		Log(LogCritical, "ApiListener")
			<< "Error while syncing endpoint '" << endpoint->GetName() << "': " << DiagnosticInformation(ex, false);

		Log(LogDebug, "ApiListener")
			<< "Error while syncing endpoint '" << endpoint->GetName() << "': " << DiagnosticInformation(ex);
	}

	/* Reached after a full sync as well as after a caught error. */
	Log(LogInformation, "ApiListener")
		<< "Finished syncing endpoint '" << endpoint->GetName() << "' in zone '" << eZone->GetName() << "'.";
}
839
ApiTimerHandler()840 void ApiListener::ApiTimerHandler()
841 {
842 double now = Utility::GetTime();
843
844 std::vector<int> files;
845 Utility::Glob(GetApiDir() + "log/*", [&files](const String& file) { LogGlobHandler(files, file); }, GlobFile);
846 std::sort(files.begin(), files.end());
847
848 for (int ts : files) {
849 bool need = false;
850 auto localZone (GetLocalEndpoint()->GetZone());
851
852 for (const Endpoint::Ptr& endpoint : ConfigType::GetObjectsByType<Endpoint>()) {
853 if (endpoint == GetLocalEndpoint())
854 continue;
855
856 auto zone (endpoint->GetZone());
857
858 /* only care for endpoints in a) the same zone b) our parent zone c) immediate child zones */
859 if (!(zone == localZone || zone == localZone->GetParent() || zone->GetParent() == localZone)) {
860 continue;
861 }
862
863 if (endpoint->GetLogDuration() >= 0 && ts < now - endpoint->GetLogDuration())
864 continue;
865
866 if (ts > endpoint->GetLocalLogPosition()) {
867 need = true;
868 break;
869 }
870 }
871
872 if (!need) {
873 String path = GetApiDir() + "log/" + Convert::ToString(ts);
874 Log(LogNotice, "ApiListener")
875 << "Removing old log file: " << path;
876 (void)unlink(path.CStr());
877 }
878 }
879
880 for (const Endpoint::Ptr& endpoint : ConfigType::GetObjectsByType<Endpoint>()) {
881 if (!endpoint->GetConnected())
882 continue;
883
884 double ts = endpoint->GetRemoteLogPosition();
885
886 if (ts == 0)
887 continue;
888
889 Dictionary::Ptr lmessage = new Dictionary({
890 { "jsonrpc", "2.0" },
891 { "method", "log::SetLogPosition" },
892 { "params", new Dictionary({
893 { "log_position", ts }
894 }) }
895 });
896
897 double maxTs = 0;
898
899 for (const JsonRpcConnection::Ptr& client : endpoint->GetClients()) {
900 if (client->GetTimestamp() > maxTs)
901 maxTs = client->GetTimestamp();
902 }
903
904 for (const JsonRpcConnection::Ptr& client : endpoint->GetClients()) {
905 if (client->GetTimestamp() == maxTs) {
906 client->SendMessage(lmessage);
907 } else {
908 client->Disconnect();
909 }
910 }
911
912 Log(LogNotice, "ApiListener")
913 << "Setting log position for identity '" << endpoint->GetName() << "': "
914 << Utility::FormatDateTime("%Y/%m/%d %H:%M:%S", ts);
915 }
916 }
917
ApiReconnectTimerHandler()918 void ApiListener::ApiReconnectTimerHandler()
919 {
920 Zone::Ptr my_zone = Zone::GetLocalZone();
921
922 for (const Zone::Ptr& zone : ConfigType::GetObjectsByType<Zone>()) {
923 /* don't connect to global zones */
924 if (zone->GetGlobal())
925 continue;
926
927 /* only connect to endpoints in a) the same zone b) our parent zone c) immediate child zones */
928 if (my_zone != zone && my_zone != zone->GetParent() && zone != my_zone->GetParent()) {
929 Log(LogDebug, "ApiListener")
930 << "Not connecting to Zone '" << zone->GetName()
931 << "' because it's not in the same zone, a parent or a child zone.";
932 continue;
933 }
934
935 for (const Endpoint::Ptr& endpoint : zone->GetEndpoints()) {
936 /* don't connect to ourselves */
937 if (endpoint == GetLocalEndpoint()) {
938 Log(LogDebug, "ApiListener")
939 << "Not connecting to Endpoint '" << endpoint->GetName() << "' because that's us.";
940 continue;
941 }
942
943 /* don't try to connect to endpoints which don't have a host and port */
944 if (endpoint->GetHost().IsEmpty() || endpoint->GetPort().IsEmpty()) {
945 Log(LogDebug, "ApiListener")
946 << "Not connecting to Endpoint '" << endpoint->GetName()
947 << "' because the host/port attributes are missing.";
948 continue;
949 }
950
951 /* don't try to connect if there's already a connection attempt */
952 if (endpoint->GetConnecting()) {
953 Log(LogDebug, "ApiListener")
954 << "Not connecting to Endpoint '" << endpoint->GetName()
955 << "' because we're already trying to connect to it.";
956 continue;
957 }
958
959 /* don't try to connect if we're already connected */
960 if (endpoint->GetConnected()) {
961 Log(LogDebug, "ApiListener")
962 << "Not connecting to Endpoint '" << endpoint->GetName()
963 << "' because we're already connected to it.";
964 continue;
965 }
966
967 /* Set connecting state to prevent duplicated queue inserts later. */
968 endpoint->SetConnecting(true);
969
970 AddConnection(endpoint);
971 }
972 }
973
974 Endpoint::Ptr master = GetMaster();
975
976 if (master)
977 Log(LogNotice, "ApiListener")
978 << "Current zone master: " << master->GetName();
979
980 std::vector<String> names;
981 for (const Endpoint::Ptr& endpoint : ConfigType::GetObjectsByType<Endpoint>())
982 if (endpoint->GetConnected())
983 names.emplace_back(endpoint->GetName() + " (" + Convert::ToString(endpoint->GetClients().size()) + ")");
984
985 Log(LogNotice, "ApiListener")
986 << "Connected endpoints: " << Utility::NaturalJoin(names);
987 }
988
CleanupCertificateRequest(const String & path,double expiryTime)989 static void CleanupCertificateRequest(const String& path, double expiryTime)
990 {
991 #ifndef _WIN32
992 struct stat statbuf;
993 if (lstat(path.CStr(), &statbuf) < 0)
994 return;
995 #else /* _WIN32 */
996 struct _stat statbuf;
997 if (_stat(path.CStr(), &statbuf) < 0)
998 return;
999 #endif /* _WIN32 */
1000
1001 if (statbuf.st_mtime < expiryTime)
1002 (void) unlink(path.CStr());
1003 }
1004
CleanupCertificateRequestsTimerHandler()1005 void ApiListener::CleanupCertificateRequestsTimerHandler()
1006 {
1007 String requestsDir = GetCertificateRequestsDir();
1008
1009 if (Utility::PathExists(requestsDir)) {
1010 /* remove certificate requests that are older than a week */
1011 double expiryTime = Utility::GetTime() - 7 * 24 * 60 * 60;
1012 Utility::Glob(requestsDir + "/*.json", [expiryTime](const String& path) {
1013 CleanupCertificateRequest(path, expiryTime);
1014 }, GlobFile);
1015 }
1016 }
1017
RelayMessage(const MessageOrigin::Ptr & origin,const ConfigObject::Ptr & secobj,const Dictionary::Ptr & message,bool log)1018 void ApiListener::RelayMessage(const MessageOrigin::Ptr& origin,
1019 const ConfigObject::Ptr& secobj, const Dictionary::Ptr& message, bool log)
1020 {
1021 if (!IsActive())
1022 return;
1023
1024 m_RelayQueue.Enqueue([this, origin, secobj, message, log]() { SyncRelayMessage(origin, secobj, message, log); }, PriorityNormal, true);
1025 }
1026
PersistMessage(const Dictionary::Ptr & message,const ConfigObject::Ptr & secobj)1027 void ApiListener::PersistMessage(const Dictionary::Ptr& message, const ConfigObject::Ptr& secobj)
1028 {
1029 double ts = message->Get("ts");
1030
1031 ASSERT(ts != 0);
1032
1033 Dictionary::Ptr pmessage = new Dictionary();
1034 pmessage->Set("timestamp", ts);
1035
1036 pmessage->Set("message", JsonEncode(message));
1037
1038 if (secobj) {
1039 Dictionary::Ptr secname = new Dictionary();
1040 secname->Set("type", secobj->GetReflectionType()->GetName());
1041 secname->Set("name", secobj->GetName());
1042 pmessage->Set("secobj", secname);
1043 }
1044
1045 std::unique_lock<std::mutex> lock(m_LogLock);
1046 if (m_LogFile) {
1047 NetString::WriteStringToStream(m_LogFile, JsonEncode(pmessage));
1048 m_LogMessageCount++;
1049 SetLogMessageTimestamp(ts);
1050
1051 if (m_LogMessageCount > 50000) {
1052 CloseLogFile();
1053 RotateLogFile();
1054 OpenLogFile();
1055 }
1056 }
1057 }
1058
SyncSendMessage(const Endpoint::Ptr & endpoint,const Dictionary::Ptr & message)1059 void ApiListener::SyncSendMessage(const Endpoint::Ptr& endpoint, const Dictionary::Ptr& message)
1060 {
1061 ObjectLock olock(endpoint);
1062
1063 if (!endpoint->GetSyncing()) {
1064 Log(LogNotice, "ApiListener")
1065 << "Sending message '" << message->Get("method") << "' to '" << endpoint->GetName() << "'";
1066
1067 double maxTs = 0;
1068
1069 for (const JsonRpcConnection::Ptr& client : endpoint->GetClients()) {
1070 if (client->GetTimestamp() > maxTs)
1071 maxTs = client->GetTimestamp();
1072 }
1073
1074 for (const JsonRpcConnection::Ptr& client : endpoint->GetClients()) {
1075 if (client->GetTimestamp() != maxTs)
1076 continue;
1077
1078 client->SendMessage(message);
1079 }
1080 }
1081 }
1082
1083 /**
1084 * Relay a message to a directly connected zone or to a global zone.
1085 * If some other zone is passed as the target zone, it is not relayed.
1086 *
1087 * @param targetZone The zone to relay to
1088 * @param origin Information about where this message is relayed from (if it was not generated locally)
1089 * @param message The message to relay
1090 * @param currentZoneMaster The current master node of the local zone
1091 * @return true if the message has been relayed to all relevant endpoints,
1092 * false if it hasn't and must be persisted in the replay log
1093 */
RelayMessageOne(const Zone::Ptr & targetZone,const MessageOrigin::Ptr & origin,const Dictionary::Ptr & message,const Endpoint::Ptr & currentZoneMaster)1094 bool ApiListener::RelayMessageOne(const Zone::Ptr& targetZone, const MessageOrigin::Ptr& origin, const Dictionary::Ptr& message, const Endpoint::Ptr& currentZoneMaster)
1095 {
1096 ASSERT(targetZone);
1097
1098 Zone::Ptr localZone = Zone::GetLocalZone();
1099
1100 /* only relay the message to a) the same local zone, b) the parent zone and c) direct child zones. Exception is a global zone. */
1101 if (!targetZone->GetGlobal() &&
1102 targetZone != localZone &&
1103 targetZone != localZone->GetParent() &&
1104 targetZone->GetParent() != localZone) {
1105 return true;
1106 }
1107
1108 Endpoint::Ptr localEndpoint = GetLocalEndpoint();
1109
1110 std::vector<Endpoint::Ptr> skippedEndpoints;
1111
1112 std::set<Zone::Ptr> allTargetZones;
1113 if (targetZone->GetGlobal()) {
1114 /* if the zone is global, the message has to be relayed to our local zone and direct children */
1115 allTargetZones.insert(localZone);
1116 for (const Zone::Ptr& zone : ConfigType::GetObjectsByType<Zone>()) {
1117 if (zone->GetParent() == localZone) {
1118 allTargetZones.insert(zone);
1119 }
1120 }
1121 } else {
1122 /* whereas if it's not global, the message is just relayed to the zone itself */
1123 allTargetZones.insert(targetZone);
1124 }
1125
1126 bool needsReplay = false;
1127
1128 for (const Zone::Ptr& currentTargetZone : allTargetZones) {
1129 bool relayed = false, log_needed = false, log_done = false;
1130
1131 for (const Endpoint::Ptr& targetEndpoint : currentTargetZone->GetEndpoints()) {
1132 /* Don't relay messages to ourselves. */
1133 if (targetEndpoint == localEndpoint)
1134 continue;
1135
1136 log_needed = true;
1137
1138 /* Don't relay messages to disconnected endpoints. */
1139 if (!targetEndpoint->GetConnected()) {
1140 if (currentTargetZone == localZone)
1141 log_done = false;
1142
1143 continue;
1144 }
1145
1146 log_done = true;
1147
1148 /* Don't relay the message to the zone through more than one endpoint unless this is our own zone.
1149 * 'relayed' is set to true on success below, enabling the checks in the second iteration.
1150 */
1151 if (relayed && currentTargetZone != localZone) {
1152 skippedEndpoints.push_back(targetEndpoint);
1153 continue;
1154 }
1155
1156 /* Don't relay messages back to the endpoint which we got the message from. */
1157 if (origin && origin->FromClient && targetEndpoint == origin->FromClient->GetEndpoint()) {
1158 skippedEndpoints.push_back(targetEndpoint);
1159 continue;
1160 }
1161
1162 /* Don't relay messages back to the zone which we got the message from. */
1163 if (origin && origin->FromZone && currentTargetZone == origin->FromZone) {
1164 skippedEndpoints.push_back(targetEndpoint);
1165 continue;
1166 }
1167
1168 /* Only relay message to the zone master if we're not currently the zone master.
1169 * e1 is zone master, e2 and e3 are zone members.
1170 *
1171 * Message is sent from e2 or e3:
1172 * !isMaster == true
1173 * targetEndpoint e1 is zone master -> send the message
1174 * targetEndpoint e3 is not zone master -> skip it, avoid routing loops
1175 *
1176 * Message is sent from e1:
1177 * !isMaster == false -> send the messages to e2 and e3 being the zone routing master.
1178 */
1179 bool isMaster = (currentZoneMaster == localEndpoint);
1180
1181 if (!isMaster && targetEndpoint != currentZoneMaster) {
1182 skippedEndpoints.push_back(targetEndpoint);
1183 continue;
1184 }
1185
1186 relayed = true;
1187
1188 SyncSendMessage(targetEndpoint, message);
1189 }
1190
1191 if (log_needed && !log_done) {
1192 needsReplay = true;
1193 }
1194 }
1195
1196 if (!skippedEndpoints.empty()) {
1197 double ts = message->Get("ts");
1198
1199 for (const Endpoint::Ptr& skippedEndpoint : skippedEndpoints)
1200 skippedEndpoint->SetLocalLogPosition(ts);
1201 }
1202
1203 return !needsReplay;
1204 }
1205
SyncRelayMessage(const MessageOrigin::Ptr & origin,const ConfigObject::Ptr & secobj,const Dictionary::Ptr & message,bool log)1206 void ApiListener::SyncRelayMessage(const MessageOrigin::Ptr& origin,
1207 const ConfigObject::Ptr& secobj, const Dictionary::Ptr& message, bool log)
1208 {
1209 double ts = Utility::GetTime();
1210 message->Set("ts", ts);
1211
1212 Log(LogNotice, "ApiListener")
1213 << "Relaying '" << message->Get("method") << "' message";
1214
1215 if (origin && origin->FromZone)
1216 message->Set("originZone", origin->FromZone->GetName());
1217
1218 Zone::Ptr target_zone;
1219
1220 if (secobj) {
1221 if (secobj->GetReflectionType() == Zone::TypeInstance)
1222 target_zone = static_pointer_cast<Zone>(secobj);
1223 else
1224 target_zone = static_pointer_cast<Zone>(secobj->GetZone());
1225 }
1226
1227 if (!target_zone)
1228 target_zone = Zone::GetLocalZone();
1229
1230 Endpoint::Ptr master = GetMaster();
1231
1232 bool need_log = !RelayMessageOne(target_zone, origin, message, master);
1233
1234 for (const Zone::Ptr& zone : target_zone->GetAllParentsRaw()) {
1235 if (!RelayMessageOne(zone, origin, message, master))
1236 need_log = true;
1237 }
1238
1239 if (log && need_log)
1240 PersistMessage(message, secobj);
1241 }
1242
1243 /* must hold m_LogLock */
OpenLogFile()1244 void ApiListener::OpenLogFile()
1245 {
1246 String path = GetApiDir() + "log/current";
1247
1248 Utility::MkDirP(Utility::DirName(path), 0750);
1249
1250 auto *fp = new std::fstream(path.CStr(), std::fstream::out | std::ofstream::app);
1251
1252 if (!fp->good()) {
1253 Log(LogWarning, "ApiListener")
1254 << "Could not open spool file: " << path;
1255 return;
1256 }
1257
1258 m_LogFile = new StdioStream(fp, true);
1259 m_LogMessageCount = 0;
1260 SetLogMessageTimestamp(Utility::GetTime());
1261 }
1262
1263 /* must hold m_LogLock */
CloseLogFile()1264 void ApiListener::CloseLogFile()
1265 {
1266 if (!m_LogFile)
1267 return;
1268
1269 m_LogFile->Close();
1270 m_LogFile.reset();
1271 }
1272
1273 /* must hold m_LogLock */
RotateLogFile()1274 void ApiListener::RotateLogFile()
1275 {
1276 double ts = GetLogMessageTimestamp();
1277
1278 if (ts == 0)
1279 ts = Utility::GetTime();
1280
1281 String oldpath = GetApiDir() + "log/current";
1282 String newpath = GetApiDir() + "log/" + Convert::ToString(static_cast<int>(ts)+1);
1283
1284 // If the log is being rotated more than once per second,
1285 // don't overwrite the previous one, but silently deny rotation.
1286 if (!Utility::PathExists(newpath)) {
1287 try {
1288 Utility::RenameFile(oldpath, newpath);
1289 } catch (const std::exception& ex) {
1290 Log(LogCritical, "ApiListener")
1291 << "Cannot rotate replay log file from '" << oldpath << "' to '"
1292 << newpath << "': " << ex.what();
1293 }
1294 }
1295 }
1296
LogGlobHandler(std::vector<int> & files,const String & file)1297 void ApiListener::LogGlobHandler(std::vector<int>& files, const String& file)
1298 {
1299 String name = Utility::BaseName(file);
1300
1301 if (name == "current")
1302 return;
1303
1304 int ts;
1305
1306 try {
1307 ts = Convert::ToLong(name);
1308 } catch (const std::exception&) {
1309 return;
1310 }
1311
1312 files.push_back(ts);
1313 }
1314
ReplayLog(const JsonRpcConnection::Ptr & client)1315 void ApiListener::ReplayLog(const JsonRpcConnection::Ptr& client)
1316 {
1317 Endpoint::Ptr endpoint = client->GetEndpoint();
1318
1319 if (endpoint->GetLogDuration() == 0) {
1320 ObjectLock olock2(endpoint);
1321 endpoint->SetSyncing(false);
1322 return;
1323 }
1324
1325 CONTEXT("Replaying log for Endpoint '" + endpoint->GetName() + "'");
1326
1327 int count = -1;
1328 double peer_ts = endpoint->GetLocalLogPosition();
1329 double logpos_ts = peer_ts;
1330 bool last_sync = false;
1331
1332 Endpoint::Ptr target_endpoint = client->GetEndpoint();
1333 ASSERT(target_endpoint);
1334
1335 Zone::Ptr target_zone = target_endpoint->GetZone();
1336
1337 if (!target_zone) {
1338 ObjectLock olock2(endpoint);
1339 endpoint->SetSyncing(false);
1340 return;
1341 }
1342
1343 for (;;) {
1344 std::unique_lock<std::mutex> lock(m_LogLock);
1345
1346 CloseLogFile();
1347
1348 if (count == -1 || count > 50000) {
1349 OpenLogFile();
1350 lock.unlock();
1351 } else {
1352 last_sync = true;
1353 }
1354
1355 count = 0;
1356
1357 std::vector<int> files;
1358 Utility::Glob(GetApiDir() + "log/*", [&files](const String& file) { LogGlobHandler(files, file); }, GlobFile);
1359 std::sort(files.begin(), files.end());
1360
1361 std::vector<std::pair<int, String>> allFiles;
1362
1363 for (int ts : files) {
1364 if (ts >= peer_ts) {
1365 allFiles.emplace_back(ts, GetApiDir() + "log/" + Convert::ToString(ts));
1366 }
1367 }
1368
1369 allFiles.emplace_back(Utility::GetTime() + 1, GetApiDir() + "log/current");
1370
1371 for (auto& file : allFiles) {
1372 Log(LogNotice, "ApiListener")
1373 << "Replaying log: " << file.second;
1374
1375 auto *fp = new std::fstream(file.second.CStr(), std::fstream::in | std::fstream::binary);
1376 StdioStream::Ptr logStream = new StdioStream(fp, true);
1377
1378 String message;
1379 StreamReadContext src;
1380 while (true) {
1381 Dictionary::Ptr pmessage;
1382
1383 try {
1384 StreamReadStatus srs = NetString::ReadStringFromStream(logStream, &message, src);
1385
1386 if (srs == StatusEof)
1387 break;
1388
1389 if (srs != StatusNewItem)
1390 continue;
1391
1392 pmessage = JsonDecode(message);
1393 } catch (const std::exception&) {
1394 Log(LogWarning, "ApiListener")
1395 << "Unexpected end-of-file for cluster log: " << file.second;
1396
1397 /* Log files may be incomplete or corrupted. This is perfectly OK. */
1398 break;
1399 }
1400
1401 if (pmessage->Get("timestamp") <= peer_ts)
1402 continue;
1403
1404 Dictionary::Ptr secname = pmessage->Get("secobj");
1405
1406 if (secname) {
1407 ConfigObject::Ptr secobj = ConfigObject::GetObject(secname->Get("type"), secname->Get("name"));
1408
1409 if (!secobj)
1410 continue;
1411
1412 if (!target_zone->CanAccessObject(secobj))
1413 continue;
1414 }
1415
1416 try {
1417 client->SendRawMessage(pmessage->Get("message"));
1418 count++;
1419 } catch (const std::exception& ex) {
1420 Log(LogWarning, "ApiListener")
1421 << "Error while replaying log for endpoint '" << endpoint->GetName() << "': " << DiagnosticInformation(ex, false);
1422
1423 Log(LogDebug, "ApiListener")
1424 << "Error while replaying log for endpoint '" << endpoint->GetName() << "': " << DiagnosticInformation(ex);
1425
1426 break;
1427 }
1428
1429 peer_ts = pmessage->Get("timestamp");
1430
1431 if (file.first > logpos_ts + 10) {
1432 logpos_ts = file.first;
1433
1434 Dictionary::Ptr lmessage = new Dictionary({
1435 { "jsonrpc", "2.0" },
1436 { "method", "log::SetLogPosition" },
1437 { "params", new Dictionary({
1438 { "log_position", logpos_ts }
1439 }) }
1440 });
1441
1442 client->SendMessage(lmessage);
1443 }
1444 }
1445
1446 logStream->Close();
1447 }
1448
1449 if (count > 0) {
1450 Log(LogInformation, "ApiListener")
1451 << "Replayed " << count << " messages.";
1452 }
1453 else {
1454 Log(LogNotice, "ApiListener")
1455 << "Replayed " << count << " messages.";
1456 }
1457
1458 if (last_sync) {
1459 {
1460 ObjectLock olock2(endpoint);
1461 endpoint->SetSyncing(false);
1462 }
1463
1464 OpenLogFile();
1465
1466 break;
1467 }
1468 }
1469 }
1470
StatsFunc(const Dictionary::Ptr & status,const Array::Ptr & perfdata)1471 void ApiListener::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
1472 {
1473 std::pair<Dictionary::Ptr, Dictionary::Ptr> stats;
1474
1475 ApiListener::Ptr listener = ApiListener::GetInstance();
1476
1477 if (!listener)
1478 return;
1479
1480 stats = listener->GetStatus();
1481
1482 ObjectLock olock(stats.second);
1483 for (const Dictionary::Pair& kv : stats.second)
1484 perfdata->Add(new PerfdataValue("api_" + kv.first, kv.second));
1485
1486 status->Set("api", stats.first);
1487 }
1488
GetStatus()1489 std::pair<Dictionary::Ptr, Dictionary::Ptr> ApiListener::GetStatus()
1490 {
1491 Dictionary::Ptr perfdata = new Dictionary();
1492
1493 /* cluster stats */
1494
1495 double allEndpoints = 0;
1496 Array::Ptr allNotConnectedEndpoints = new Array();
1497 Array::Ptr allConnectedEndpoints = new Array();
1498
1499 Zone::Ptr my_zone = Zone::GetLocalZone();
1500
1501 Dictionary::Ptr connectedZones = new Dictionary();
1502
1503 for (const Zone::Ptr& zone : ConfigType::GetObjectsByType<Zone>()) {
1504 /* only check endpoints in a) the same zone b) our parent zone c) immediate child zones */
1505 if (my_zone != zone && my_zone != zone->GetParent() && zone != my_zone->GetParent()) {
1506 Log(LogDebug, "ApiListener")
1507 << "Not checking connection to Zone '" << zone->GetName() << "' because it's not in the same zone, a parent or a child zone.";
1508 continue;
1509 }
1510
1511 bool zoneConnected = false;
1512 int countZoneEndpoints = 0;
1513 double zoneLag = 0;
1514
1515 ArrayData zoneEndpoints;
1516
1517 for (const Endpoint::Ptr& endpoint : zone->GetEndpoints()) {
1518 zoneEndpoints.emplace_back(endpoint->GetName());
1519
1520 if (endpoint->GetName() == GetIdentity())
1521 continue;
1522
1523 double eplag = CalculateZoneLag(endpoint);
1524
1525 if (eplag > 0 && eplag > zoneLag)
1526 zoneLag = eplag;
1527
1528 allEndpoints++;
1529 countZoneEndpoints++;
1530
1531 if (!endpoint->GetConnected()) {
1532 allNotConnectedEndpoints->Add(endpoint->GetName());
1533 } else {
1534 allConnectedEndpoints->Add(endpoint->GetName());
1535 zoneConnected = true;
1536 }
1537 }
1538
1539 /* if there's only one endpoint inside the zone, we're not connected - that's us, fake it */
1540 if (zone->GetEndpoints().size() == 1 && countZoneEndpoints == 0)
1541 zoneConnected = true;
1542
1543 String parentZoneName;
1544 Zone::Ptr parentZone = zone->GetParent();
1545 if (parentZone)
1546 parentZoneName = parentZone->GetName();
1547
1548 Dictionary::Ptr zoneStats = new Dictionary({
1549 { "connected", zoneConnected },
1550 { "client_log_lag", zoneLag },
1551 { "endpoints", new Array(std::move(zoneEndpoints)) },
1552 { "parent_zone", parentZoneName }
1553 });
1554
1555 connectedZones->Set(zone->GetName(), zoneStats);
1556 }
1557
1558 /* connection stats */
1559 size_t jsonRpcAnonymousClients = GetAnonymousClients().size();
1560 size_t httpClients = GetHttpClients().size();
1561 size_t syncQueueItems = m_SyncQueue.GetLength();
1562 size_t relayQueueItems = m_RelayQueue.GetLength();
1563 double workQueueItemRate = JsonRpcConnection::GetWorkQueueRate();
1564 double syncQueueItemRate = m_SyncQueue.GetTaskCount(60) / 60.0;
1565 double relayQueueItemRate = m_RelayQueue.GetTaskCount(60) / 60.0;
1566
1567 Dictionary::Ptr status = new Dictionary({
1568 { "identity", GetIdentity() },
1569 { "num_endpoints", allEndpoints },
1570 { "num_conn_endpoints", allConnectedEndpoints->GetLength() },
1571 { "num_not_conn_endpoints", allNotConnectedEndpoints->GetLength() },
1572 { "conn_endpoints", allConnectedEndpoints },
1573 { "not_conn_endpoints", allNotConnectedEndpoints },
1574
1575 { "zones", connectedZones },
1576
1577 { "json_rpc", new Dictionary({
1578 { "anonymous_clients", jsonRpcAnonymousClients },
1579 { "sync_queue_items", syncQueueItems },
1580 { "relay_queue_items", relayQueueItems },
1581 { "work_queue_item_rate", workQueueItemRate },
1582 { "sync_queue_item_rate", syncQueueItemRate },
1583 { "relay_queue_item_rate", relayQueueItemRate }
1584 }) },
1585
1586 { "http", new Dictionary({
1587 { "clients", httpClients }
1588 }) }
1589 });
1590
1591 /* performance data */
1592 perfdata->Set("num_endpoints", allEndpoints);
1593 perfdata->Set("num_conn_endpoints", Convert::ToDouble(allConnectedEndpoints->GetLength()));
1594 perfdata->Set("num_not_conn_endpoints", Convert::ToDouble(allNotConnectedEndpoints->GetLength()));
1595
1596 perfdata->Set("num_json_rpc_anonymous_clients", jsonRpcAnonymousClients);
1597 perfdata->Set("num_http_clients", httpClients);
1598 perfdata->Set("num_json_rpc_sync_queue_items", syncQueueItems);
1599 perfdata->Set("num_json_rpc_relay_queue_items", relayQueueItems);
1600
1601 perfdata->Set("num_json_rpc_work_queue_item_rate", workQueueItemRate);
1602 perfdata->Set("num_json_rpc_sync_queue_item_rate", syncQueueItemRate);
1603 perfdata->Set("num_json_rpc_relay_queue_item_rate", relayQueueItemRate);
1604
1605 return std::make_pair(status, perfdata);
1606 }
1607
CalculateZoneLag(const Endpoint::Ptr & endpoint)1608 double ApiListener::CalculateZoneLag(const Endpoint::Ptr& endpoint)
1609 {
1610 double remoteLogPosition = endpoint->GetRemoteLogPosition();
1611 double eplag = Utility::GetTime() - remoteLogPosition;
1612
1613 if ((endpoint->GetSyncing() || !endpoint->GetConnected()) && remoteLogPosition != 0)
1614 return eplag;
1615
1616 return 0;
1617 }
1618
AddAnonymousClient(const JsonRpcConnection::Ptr & aclient)1619 bool ApiListener::AddAnonymousClient(const JsonRpcConnection::Ptr& aclient)
1620 {
1621 std::unique_lock<std::mutex> lock(m_AnonymousClientsLock);
1622
1623 if (GetMaxAnonymousClients() >= 0 && (long)m_AnonymousClients.size() + 1 > (long)GetMaxAnonymousClients())
1624 return false;
1625
1626 m_AnonymousClients.insert(aclient);
1627 return true;
1628 }
1629
RemoveAnonymousClient(const JsonRpcConnection::Ptr & aclient)1630 void ApiListener::RemoveAnonymousClient(const JsonRpcConnection::Ptr& aclient)
1631 {
1632 std::unique_lock<std::mutex> lock(m_AnonymousClientsLock);
1633 m_AnonymousClients.erase(aclient);
1634 }
1635
GetAnonymousClients() const1636 std::set<JsonRpcConnection::Ptr> ApiListener::GetAnonymousClients() const
1637 {
1638 std::unique_lock<std::mutex> lock(m_AnonymousClientsLock);
1639 return m_AnonymousClients;
1640 }
1641
AddHttpClient(const HttpServerConnection::Ptr & aclient)1642 void ApiListener::AddHttpClient(const HttpServerConnection::Ptr& aclient)
1643 {
1644 std::unique_lock<std::mutex> lock(m_HttpClientsLock);
1645 m_HttpClients.insert(aclient);
1646 }
1647
RemoveHttpClient(const HttpServerConnection::Ptr & aclient)1648 void ApiListener::RemoveHttpClient(const HttpServerConnection::Ptr& aclient)
1649 {
1650 std::unique_lock<std::mutex> lock(m_HttpClientsLock);
1651 m_HttpClients.erase(aclient);
1652 }
1653
GetHttpClients() const1654 std::set<HttpServerConnection::Ptr> ApiListener::GetHttpClients() const
1655 {
1656 std::unique_lock<std::mutex> lock(m_HttpClientsLock);
1657 return m_HttpClients;
1658 }
1659
LogAppVersion(unsigned long version,Log & log)1660 static void LogAppVersion(unsigned long version, Log& log)
1661 {
1662 log << version / 100u << "." << version % 100u << ".x";
1663 }
1664
HelloAPIHandler(const MessageOrigin::Ptr & origin,const Dictionary::Ptr & params)1665 Value ApiListener::HelloAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params)
1666 {
1667 if (origin) {
1668 auto client (origin->FromClient);
1669
1670 if (client) {
1671 auto endpoint (client->GetEndpoint());
1672
1673 if (endpoint) {
1674 unsigned long nodeVersion = params->Get("version");
1675
1676 endpoint->SetIcingaVersion(nodeVersion);
1677 endpoint->SetCapabilities((double)params->Get("capabilities"));
1678
1679 if (nodeVersion == 0u) {
1680 nodeVersion = 21200;
1681 }
1682
1683 if (endpoint->GetZone()->GetParent() == Zone::GetLocalZone()) {
1684 switch (l_AppVersionInt / 100 - nodeVersion / 100) {
1685 case 0:
1686 case 1:
1687 break;
1688 default:
1689 Log log (LogWarning, "ApiListener");
1690 log << "Unexpected Icinga version of endpoint '" << endpoint->GetName() << "': ";
1691
1692 LogAppVersion(nodeVersion / 100u, log);
1693 log << " Expected one of: ";
1694
1695 LogAppVersion(l_AppVersionInt / 100u, log);
1696 log << ", ";
1697
1698 LogAppVersion((l_AppVersionInt / 100u - 1u), log);
1699 }
1700 }
1701 }
1702 }
1703 }
1704
1705 return Empty;
1706 }
1707
GetLocalEndpoint() const1708 Endpoint::Ptr ApiListener::GetLocalEndpoint() const
1709 {
1710 return m_LocalEndpoint;
1711 }
1712
UpdateActivePackageStagesCache()1713 void ApiListener::UpdateActivePackageStagesCache()
1714 {
1715 std::unique_lock<std::mutex> lock(m_ActivePackageStagesLock);
1716
1717 for (auto package : ConfigPackageUtility::GetPackages()) {
1718 String activeStage;
1719
1720 try {
1721 activeStage = ConfigPackageUtility::GetActiveStageFromFile(package);
1722 } catch (const std::exception& ex) {
1723 Log(LogCritical, "ApiListener")
1724 << ex.what();
1725 continue;
1726 }
1727
1728 Log(LogNotice, "ApiListener")
1729 << "Updating cache: Config package '" << package << "' has active stage '" << activeStage << "'.";
1730
1731 m_ActivePackageStages[package] = activeStage;
1732 }
1733 }
1734
CheckApiPackageIntegrity()1735 void ApiListener::CheckApiPackageIntegrity()
1736 {
1737 std::unique_lock<std::mutex> lock(m_ActivePackageStagesLock);
1738
1739 for (auto package : ConfigPackageUtility::GetPackages()) {
1740 String activeStage;
1741 try {
1742 activeStage = ConfigPackageUtility::GetActiveStageFromFile(package);
1743 } catch (const std::exception& ex) {
1744 /* An error means that the stage is broken, try to repair it. */
1745 auto it = m_ActivePackageStages.find(package);
1746
1747 if (it == m_ActivePackageStages.end())
1748 continue;
1749
1750 String activeStageCached = it->second;
1751
1752 Log(LogInformation, "ApiListener")
1753 << "Repairing broken API config package '" << package
1754 << "', setting active stage '" << activeStageCached << "'.";
1755
1756 ConfigPackageUtility::SetActiveStageToFile(package, activeStageCached);
1757 }
1758 }
1759 }
1760
SetActivePackageStage(const String & package,const String & stage)1761 void ApiListener::SetActivePackageStage(const String& package, const String& stage)
1762 {
1763 std::unique_lock<std::mutex> lock(m_ActivePackageStagesLock);
1764 m_ActivePackageStages[package] = stage;
1765 }
1766
/**
 * Looks up the cached active stage of a config package.
 *
 * @param package Name of the config package
 * @return Name of the package's active stage
 * @throws ScriptError if there is no cached active stage for the package
 */
String ApiListener::GetActivePackageStage(const String& package)
{
	std::unique_lock<std::mutex> lock(m_ActivePackageStagesLock);

	auto cached = m_ActivePackageStages.find(package);

	if (cached == m_ActivePackageStages.end())
		BOOST_THROW_EXCEPTION(ScriptError("Package " + package + " has no active stage."));

	return cached->second;
}
1776
RemoveActivePackageStage(const String & package)1777 void ApiListener::RemoveActivePackageStage(const String& package)
1778 {
1779 /* This is the rare occassion when a package has been deleted. */
1780 std::unique_lock<std::mutex> lock(m_ActivePackageStagesLock);
1781
1782 auto it = m_ActivePackageStages.find(package);
1783
1784 if (it == m_ActivePackageStages.end())
1785 return;
1786
1787 m_ActivePackageStages.erase(it);
1788 }
1789
ValidateTlsProtocolmin(const Lazy<String> & lvalue,const ValidationUtils & utils)1790 void ApiListener::ValidateTlsProtocolmin(const Lazy<String>& lvalue, const ValidationUtils& utils)
1791 {
1792 ObjectImpl<ApiListener>::ValidateTlsProtocolmin(lvalue, utils);
1793
1794 try {
1795 ResolveTlsProtocolVersion(lvalue());
1796 } catch (const std::exception& ex) {
1797 BOOST_THROW_EXCEPTION(ValidationError(this, { "tls_protocolmin" }, ex.what()));
1798 }
1799 }
1800
ValidateTlsHandshakeTimeout(const Lazy<double> & lvalue,const ValidationUtils & utils)1801 void ApiListener::ValidateTlsHandshakeTimeout(const Lazy<double>& lvalue, const ValidationUtils& utils)
1802 {
1803 ObjectImpl<ApiListener>::ValidateTlsHandshakeTimeout(lvalue, utils);
1804
1805 if (lvalue() <= 0)
1806 BOOST_THROW_EXCEPTION(ValidationError(this, { "tls_handshake_timeout" }, "Value must be greater than 0."));
1807 }
1808
IsHACluster()1809 bool ApiListener::IsHACluster()
1810 {
1811 Zone::Ptr zone = Zone::GetLocalZone();
1812
1813 if (!zone)
1814 return false;
1815
1816 return zone->IsSingleInstance();
1817 }
1818
1819 /* Provide a helper function for zone origin name. */
GetFromZoneName(const Zone::Ptr & fromZone)1820 String ApiListener::GetFromZoneName(const Zone::Ptr& fromZone)
1821 {
1822 String fromZoneName;
1823
1824 if (fromZone) {
1825 fromZoneName = fromZone->GetName();
1826 } else {
1827 Zone::Ptr lzone = Zone::GetLocalZone();
1828
1829 if (lzone)
1830 fromZoneName = lzone->GetName();
1831 }
1832
1833 return fromZoneName;
1834 }
1835
UpdateStatusFile(boost::asio::ip::tcp::endpoint localEndpoint)1836 void ApiListener::UpdateStatusFile(boost::asio::ip::tcp::endpoint localEndpoint)
1837 {
1838 String path = Configuration::CacheDir + "/api-state.json";
1839
1840 Utility::SaveJsonFile(path, 0644, new Dictionary({
1841 {"host", String(localEndpoint.address().to_string())},
1842 {"port", localEndpoint.port()}
1843 }));
1844 }
1845
RemoveStatusFile()1846 void ApiListener::RemoveStatusFile()
1847 {
1848 String path = Configuration::CacheDir + "/api-state.json";
1849
1850 Utility::Remove(path);
1851 }
1852