1 /* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
2
3 #include "remote/jsonrpcconnection.hpp"
4 #include "remote/apilistener.hpp"
5 #include "remote/apifunction.hpp"
6 #include "remote/jsonrpc.hpp"
7 #include "base/defer.hpp"
8 #include "base/configtype.hpp"
9 #include "base/io-engine.hpp"
10 #include "base/json.hpp"
11 #include "base/objectlock.hpp"
12 #include "base/utility.hpp"
13 #include "base/logger.hpp"
14 #include "base/exception.hpp"
15 #include "base/convert.hpp"
16 #include "base/tlsstream.hpp"
17 #include <memory>
18 #include <utility>
19 #include <boost/asio/io_context.hpp>
20 #include <boost/asio/spawn.hpp>
21 #include <boost/date_time/posix_time/posix_time_duration.hpp>
22 #include <boost/system/system_error.hpp>
23 #include <boost/thread/once.hpp>
24
25 using namespace icinga;
26
27 static Value SetLogPositionHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params);
28 REGISTER_APIFUNCTION(SetLogPosition, log, &SetLogPositionHandler);
29
30 static RingBuffer l_TaskStats (15 * 60);
31
JsonRpcConnection(const String & identity,bool authenticated,const Shared<AsioTlsStream>::Ptr & stream,ConnectionRole role)32 JsonRpcConnection::JsonRpcConnection(const String& identity, bool authenticated,
33 const Shared<AsioTlsStream>::Ptr& stream, ConnectionRole role)
34 : JsonRpcConnection(identity, authenticated, stream, role, IoEngine::Get().GetIoContext())
35 {
36 }
37
JsonRpcConnection(const String & identity,bool authenticated,const Shared<AsioTlsStream>::Ptr & stream,ConnectionRole role,boost::asio::io_context & io)38 JsonRpcConnection::JsonRpcConnection(const String& identity, bool authenticated,
39 const Shared<AsioTlsStream>::Ptr& stream, ConnectionRole role, boost::asio::io_context& io)
40 : m_Identity(identity), m_Authenticated(authenticated), m_Stream(stream), m_Role(role),
41 m_Timestamp(Utility::GetTime()), m_Seen(Utility::GetTime()), m_NextHeartbeat(0), m_IoStrand(io),
42 m_OutgoingMessagesQueued(io), m_WriterDone(io), m_ShuttingDown(false),
43 m_CheckLivenessTimer(io), m_HeartbeatTimer(io)
44 {
45 if (authenticated)
46 m_Endpoint = Endpoint::GetByName(identity);
47 }
48
Start()49 void JsonRpcConnection::Start()
50 {
51 namespace asio = boost::asio;
52
53 JsonRpcConnection::Ptr keepAlive (this);
54
55 IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) { HandleIncomingMessages(yc); });
56 IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) { WriteOutgoingMessages(yc); });
57 IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) { HandleAndWriteHeartbeats(yc); });
58 IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) { CheckLiveness(yc); });
59 }
60
HandleIncomingMessages(boost::asio::yield_context yc)61 void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc)
62 {
63 m_Stream->next_layer().SetSeen(&m_Seen);
64
65 for (;;) {
66 String message;
67
68 try {
69 message = JsonRpc::ReadMessage(m_Stream, yc, m_Endpoint ? -1 : 1024 * 1024);
70 } catch (const std::exception& ex) {
71 Log(m_ShuttingDown ? LogDebug : LogNotice, "JsonRpcConnection")
72 << "Error while reading JSON-RPC message for identity '" << m_Identity
73 << "': " << DiagnosticInformation(ex);
74
75 break;
76 }
77
78 m_Seen = Utility::GetTime();
79
80 try {
81 CpuBoundWork handleMessage (yc);
82
83 MessageHandler(message);
84 } catch (const std::exception& ex) {
85 Log(m_ShuttingDown ? LogDebug : LogWarning, "JsonRpcConnection")
86 << "Error while processing JSON-RPC message for identity '" << m_Identity
87 << "': " << DiagnosticInformation(ex);
88
89 break;
90 }
91
92 CpuBoundWork taskStats (yc);
93
94 l_TaskStats.InsertValue(Utility::GetTime(), 1);
95 }
96
97 Disconnect();
98 }
99
WriteOutgoingMessages(boost::asio::yield_context yc)100 void JsonRpcConnection::WriteOutgoingMessages(boost::asio::yield_context yc)
101 {
102 Defer signalWriterDone ([this]() { m_WriterDone.Set(); });
103
104 do {
105 m_OutgoingMessagesQueued.Wait(yc);
106
107 auto queue (std::move(m_OutgoingMessagesQueue));
108
109 m_OutgoingMessagesQueue.clear();
110 m_OutgoingMessagesQueued.Clear();
111
112 if (!queue.empty()) {
113 try {
114 for (auto& message : queue) {
115 size_t bytesSent = JsonRpc::SendRawMessage(m_Stream, message, yc);
116
117 if (m_Endpoint) {
118 m_Endpoint->AddMessageSent(bytesSent);
119 }
120 }
121
122 m_Stream->async_flush(yc);
123 } catch (const std::exception& ex) {
124 Log(m_ShuttingDown ? LogDebug : LogWarning, "JsonRpcConnection")
125 << "Error while sending JSON-RPC message for identity '"
126 << m_Identity << "'\n" << DiagnosticInformation(ex);
127
128 break;
129 }
130 }
131 } while (!m_ShuttingDown);
132
133 Disconnect();
134 }
135
GetTimestamp() const136 double JsonRpcConnection::GetTimestamp() const
137 {
138 return m_Timestamp;
139 }
140
GetIdentity() const141 String JsonRpcConnection::GetIdentity() const
142 {
143 return m_Identity;
144 }
145
IsAuthenticated() const146 bool JsonRpcConnection::IsAuthenticated() const
147 {
148 return m_Authenticated;
149 }
150
GetEndpoint() const151 Endpoint::Ptr JsonRpcConnection::GetEndpoint() const
152 {
153 return m_Endpoint;
154 }
155
GetStream() const156 Shared<AsioTlsStream>::Ptr JsonRpcConnection::GetStream() const
157 {
158 return m_Stream;
159 }
160
GetRole() const161 ConnectionRole JsonRpcConnection::GetRole() const
162 {
163 return m_Role;
164 }
165
SendMessage(const Dictionary::Ptr & message)166 void JsonRpcConnection::SendMessage(const Dictionary::Ptr& message)
167 {
168 Ptr keepAlive (this);
169
170 m_IoStrand.post([this, keepAlive, message]() { SendMessageInternal(message); });
171 }
172
SendRawMessage(const String & message)173 void JsonRpcConnection::SendRawMessage(const String& message)
174 {
175 Ptr keepAlive (this);
176
177 m_IoStrand.post([this, keepAlive, message]() {
178 m_OutgoingMessagesQueue.emplace_back(message);
179 m_OutgoingMessagesQueued.Set();
180 });
181 }
182
SendMessageInternal(const Dictionary::Ptr & message)183 void JsonRpcConnection::SendMessageInternal(const Dictionary::Ptr& message)
184 {
185 m_OutgoingMessagesQueue.emplace_back(JsonEncode(message));
186 m_OutgoingMessagesQueued.Set();
187 }
188
Disconnect()189 void JsonRpcConnection::Disconnect()
190 {
191 namespace asio = boost::asio;
192
193 JsonRpcConnection::Ptr keepAlive (this);
194
195 IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) {
196 if (!m_ShuttingDown) {
197 m_ShuttingDown = true;
198
199 Log(LogWarning, "JsonRpcConnection")
200 << "API client disconnected for identity '" << m_Identity << "'";
201
202 {
203 CpuBoundWork removeClient (yc);
204
205 if (m_Endpoint) {
206 m_Endpoint->RemoveClient(this);
207 } else {
208 ApiListener::GetInstance()->RemoveAnonymousClient(this);
209 }
210 }
211
212 m_OutgoingMessagesQueued.Set();
213
214 m_WriterDone.Wait(yc);
215
216 /*
217 * Do not swallow exceptions in a coroutine.
218 * https://github.com/Icinga/icinga2/issues/7351
219 * We must not catch `detail::forced_unwind exception` as
220 * this is used for unwinding the stack.
221 *
222 * Just use the error_code dummy here.
223 */
224 boost::system::error_code ec;
225
226 m_CheckLivenessTimer.cancel();
227 m_HeartbeatTimer.cancel();
228
229 m_Stream->lowest_layer().cancel(ec);
230
231 Timeout::Ptr shutdownTimeout (new Timeout(
232 m_IoStrand.context(),
233 m_IoStrand,
234 boost::posix_time::seconds(10),
235 [this, keepAlive](asio::yield_context yc) {
236 boost::system::error_code ec;
237 m_Stream->lowest_layer().cancel(ec);
238 }
239 ));
240
241 m_Stream->next_layer().async_shutdown(yc[ec]);
242
243 shutdownTimeout->Cancel();
244
245 m_Stream->lowest_layer().shutdown(m_Stream->lowest_layer().shutdown_both, ec);
246 }
247 });
248 }
249
MessageHandler(const String & jsonString)250 void JsonRpcConnection::MessageHandler(const String& jsonString)
251 {
252 Dictionary::Ptr message = JsonRpc::DecodeMessage(jsonString);
253
254 if (m_Endpoint && message->Contains("ts")) {
255 double ts = message->Get("ts");
256
257 /* ignore old messages */
258 if (ts < m_Endpoint->GetRemoteLogPosition())
259 return;
260
261 m_Endpoint->SetRemoteLogPosition(ts);
262 }
263
264 MessageOrigin::Ptr origin = new MessageOrigin();
265 origin->FromClient = this;
266
267 if (m_Endpoint) {
268 if (m_Endpoint->GetZone() != Zone::GetLocalZone())
269 origin->FromZone = m_Endpoint->GetZone();
270 else
271 origin->FromZone = Zone::GetByName(message->Get("originZone"));
272
273 m_Endpoint->AddMessageReceived(jsonString.GetLength());
274 }
275
276 Value vmethod;
277
278 if (!message->Get("method", &vmethod)) {
279 Value vid;
280
281 if (!message->Get("id", &vid))
282 return;
283
284 Log(LogWarning, "JsonRpcConnection",
285 "We received a JSON-RPC response message. This should never happen because we're only ever sending notifications.");
286
287 return;
288 }
289
290 String method = vmethod;
291
292 Log(LogNotice, "JsonRpcConnection")
293 << "Received '" << method << "' message from identity '" << m_Identity << "'.";
294
295 Dictionary::Ptr resultMessage = new Dictionary();
296
297 try {
298 ApiFunction::Ptr afunc = ApiFunction::GetByName(method);
299
300 if (!afunc) {
301 Log(LogNotice, "JsonRpcConnection")
302 << "Call to non-existent function '" << method << "' from endpoint '" << m_Identity << "'.";
303 } else {
304 Dictionary::Ptr params = message->Get("params");
305 if (params)
306 resultMessage->Set("result", afunc->Invoke(origin, params));
307 else
308 resultMessage->Set("result", Empty);
309 }
310 } catch (const std::exception& ex) {
311 /* TODO: Add a user readable error message for the remote caller */
312 String diagInfo = DiagnosticInformation(ex);
313 resultMessage->Set("error", diagInfo);
314 Log(LogWarning, "JsonRpcConnection")
315 << "Error while processing message for identity '" << m_Identity << "'\n" << diagInfo;
316 }
317
318 if (message->Contains("id")) {
319 resultMessage->Set("jsonrpc", "2.0");
320 resultMessage->Set("id", message->Get("id"));
321
322 SendMessageInternal(resultMessage);
323 }
324 }
325
SetLogPositionHandler(const MessageOrigin::Ptr & origin,const Dictionary::Ptr & params)326 Value SetLogPositionHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params)
327 {
328 double log_position = params->Get("log_position");
329 Endpoint::Ptr endpoint = origin->FromClient->GetEndpoint();
330
331 if (!endpoint)
332 return Empty;
333
334 if (log_position > endpoint->GetLocalLogPosition())
335 endpoint->SetLocalLogPosition(log_position);
336
337 return Empty;
338 }
339
CheckLiveness(boost::asio::yield_context yc)340 void JsonRpcConnection::CheckLiveness(boost::asio::yield_context yc)
341 {
342 boost::system::error_code ec;
343
344 if (!m_Authenticated) {
345 /* Anonymous connections are normally only used for requesting a certificate and are closed after this request
346 * is received. However, the request is only sent if the child has successfully verified the certificate of its
347 * parent so that it is an authenticated connection from its perspective. In case this verification fails, both
348 * ends view it as an anonymous connection and never actually use it but attempt a reconnect after 10 seconds
349 * leaking the connection. Therefore close it after a timeout.
350 */
351
352 m_CheckLivenessTimer.expires_from_now(boost::posix_time::seconds(10));
353 m_CheckLivenessTimer.async_wait(yc[ec]);
354
355 if (m_ShuttingDown) {
356 return;
357 }
358
359 auto remote (m_Stream->lowest_layer().remote_endpoint());
360
361 Log(LogInformation, "JsonRpcConnection")
362 << "Closing anonymous connection [" << remote.address() << "]:" << remote.port() << " after 10 seconds.";
363
364 Disconnect();
365 } else {
366 for (;;) {
367 m_CheckLivenessTimer.expires_from_now(boost::posix_time::seconds(30));
368 m_CheckLivenessTimer.async_wait(yc[ec]);
369
370 if (m_ShuttingDown) {
371 break;
372 }
373
374 if (m_Seen < Utility::GetTime() - 60 && (!m_Endpoint || !m_Endpoint->GetSyncing())) {
375 Log(LogInformation, "JsonRpcConnection")
376 << "No messages for identity '" << m_Identity << "' have been received in the last 60 seconds.";
377
378 Disconnect();
379 break;
380 }
381 }
382 }
383 }
384
GetWorkQueueRate()385 double JsonRpcConnection::GetWorkQueueRate()
386 {
387 return l_TaskStats.UpdateAndGetValues(Utility::GetTime(), 60) / 60.0;
388 }
389