1 /* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
2 
3 #include "remote/jsonrpcconnection.hpp"
4 #include "remote/apilistener.hpp"
5 #include "remote/apifunction.hpp"
6 #include "remote/jsonrpc.hpp"
7 #include "base/defer.hpp"
8 #include "base/configtype.hpp"
9 #include "base/io-engine.hpp"
10 #include "base/json.hpp"
11 #include "base/objectlock.hpp"
12 #include "base/utility.hpp"
13 #include "base/logger.hpp"
14 #include "base/exception.hpp"
15 #include "base/convert.hpp"
16 #include "base/tlsstream.hpp"
17 #include <memory>
18 #include <utility>
19 #include <boost/asio/io_context.hpp>
20 #include <boost/asio/spawn.hpp>
21 #include <boost/date_time/posix_time/posix_time_duration.hpp>
22 #include <boost/system/system_error.hpp>
23 #include <boost/thread/once.hpp>
24 
25 using namespace icinga;
26 
27 static Value SetLogPositionHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params);
28 REGISTER_APIFUNCTION(SetLogPosition, log, &SetLogPositionHandler);
29 
30 static RingBuffer l_TaskStats (15 * 60);
31 
JsonRpcConnection(const String & identity,bool authenticated,const Shared<AsioTlsStream>::Ptr & stream,ConnectionRole role)32 JsonRpcConnection::JsonRpcConnection(const String& identity, bool authenticated,
33 	const Shared<AsioTlsStream>::Ptr& stream, ConnectionRole role)
34 	: JsonRpcConnection(identity, authenticated, stream, role, IoEngine::Get().GetIoContext())
35 {
36 }
37 
JsonRpcConnection(const String & identity,bool authenticated,const Shared<AsioTlsStream>::Ptr & stream,ConnectionRole role,boost::asio::io_context & io)38 JsonRpcConnection::JsonRpcConnection(const String& identity, bool authenticated,
39 	const Shared<AsioTlsStream>::Ptr& stream, ConnectionRole role, boost::asio::io_context& io)
40 	: m_Identity(identity), m_Authenticated(authenticated), m_Stream(stream), m_Role(role),
41 	m_Timestamp(Utility::GetTime()), m_Seen(Utility::GetTime()), m_NextHeartbeat(0), m_IoStrand(io),
42 	m_OutgoingMessagesQueued(io), m_WriterDone(io), m_ShuttingDown(false),
43 	m_CheckLivenessTimer(io), m_HeartbeatTimer(io)
44 {
45 	if (authenticated)
46 		m_Endpoint = Endpoint::GetByName(identity);
47 }
48 
Start()49 void JsonRpcConnection::Start()
50 {
51 	namespace asio = boost::asio;
52 
53 	JsonRpcConnection::Ptr keepAlive (this);
54 
55 	IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) { HandleIncomingMessages(yc); });
56 	IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) { WriteOutgoingMessages(yc); });
57 	IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) { HandleAndWriteHeartbeats(yc); });
58 	IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) { CheckLiveness(yc); });
59 }
60 
HandleIncomingMessages(boost::asio::yield_context yc)61 void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc)
62 {
63 	m_Stream->next_layer().SetSeen(&m_Seen);
64 
65 	for (;;) {
66 		String message;
67 
68 		try {
69 			message = JsonRpc::ReadMessage(m_Stream, yc, m_Endpoint ? -1 : 1024 * 1024);
70 		} catch (const std::exception& ex) {
71 			Log(m_ShuttingDown ? LogDebug : LogNotice, "JsonRpcConnection")
72 				<< "Error while reading JSON-RPC message for identity '" << m_Identity
73 				<< "': " << DiagnosticInformation(ex);
74 
75 			break;
76 		}
77 
78 		m_Seen = Utility::GetTime();
79 
80 		try {
81 			CpuBoundWork handleMessage (yc);
82 
83 			MessageHandler(message);
84 		} catch (const std::exception& ex) {
85 			Log(m_ShuttingDown ? LogDebug : LogWarning, "JsonRpcConnection")
86 				<< "Error while processing JSON-RPC message for identity '" << m_Identity
87 				<< "': " << DiagnosticInformation(ex);
88 
89 			break;
90 		}
91 
92 		CpuBoundWork taskStats (yc);
93 
94 		l_TaskStats.InsertValue(Utility::GetTime(), 1);
95 	}
96 
97 	Disconnect();
98 }
99 
WriteOutgoingMessages(boost::asio::yield_context yc)100 void JsonRpcConnection::WriteOutgoingMessages(boost::asio::yield_context yc)
101 {
102 	Defer signalWriterDone ([this]() { m_WriterDone.Set(); });
103 
104 	do {
105 		m_OutgoingMessagesQueued.Wait(yc);
106 
107 		auto queue (std::move(m_OutgoingMessagesQueue));
108 
109 		m_OutgoingMessagesQueue.clear();
110 		m_OutgoingMessagesQueued.Clear();
111 
112 		if (!queue.empty()) {
113 			try {
114 				for (auto& message : queue) {
115 					size_t bytesSent = JsonRpc::SendRawMessage(m_Stream, message, yc);
116 
117 					if (m_Endpoint) {
118 						m_Endpoint->AddMessageSent(bytesSent);
119 					}
120 				}
121 
122 				m_Stream->async_flush(yc);
123 			} catch (const std::exception& ex) {
124 				Log(m_ShuttingDown ? LogDebug : LogWarning, "JsonRpcConnection")
125 					<< "Error while sending JSON-RPC message for identity '"
126 					<< m_Identity << "'\n" << DiagnosticInformation(ex);
127 
128 				break;
129 			}
130 		}
131 	} while (!m_ShuttingDown);
132 
133 	Disconnect();
134 }
135 
GetTimestamp() const136 double JsonRpcConnection::GetTimestamp() const
137 {
138 	return m_Timestamp;
139 }
140 
GetIdentity() const141 String JsonRpcConnection::GetIdentity() const
142 {
143 	return m_Identity;
144 }
145 
IsAuthenticated() const146 bool JsonRpcConnection::IsAuthenticated() const
147 {
148 	return m_Authenticated;
149 }
150 
GetEndpoint() const151 Endpoint::Ptr JsonRpcConnection::GetEndpoint() const
152 {
153 	return m_Endpoint;
154 }
155 
GetStream() const156 Shared<AsioTlsStream>::Ptr JsonRpcConnection::GetStream() const
157 {
158 	return m_Stream;
159 }
160 
GetRole() const161 ConnectionRole JsonRpcConnection::GetRole() const
162 {
163 	return m_Role;
164 }
165 
SendMessage(const Dictionary::Ptr & message)166 void JsonRpcConnection::SendMessage(const Dictionary::Ptr& message)
167 {
168 	Ptr keepAlive (this);
169 
170 	m_IoStrand.post([this, keepAlive, message]() { SendMessageInternal(message); });
171 }
172 
SendRawMessage(const String & message)173 void JsonRpcConnection::SendRawMessage(const String& message)
174 {
175 	Ptr keepAlive (this);
176 
177 	m_IoStrand.post([this, keepAlive, message]() {
178 		m_OutgoingMessagesQueue.emplace_back(message);
179 		m_OutgoingMessagesQueued.Set();
180 	});
181 }
182 
SendMessageInternal(const Dictionary::Ptr & message)183 void JsonRpcConnection::SendMessageInternal(const Dictionary::Ptr& message)
184 {
185 	m_OutgoingMessagesQueue.emplace_back(JsonEncode(message));
186 	m_OutgoingMessagesQueued.Set();
187 }
188 
Disconnect()189 void JsonRpcConnection::Disconnect()
190 {
191 	namespace asio = boost::asio;
192 
193 	JsonRpcConnection::Ptr keepAlive (this);
194 
195 	IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) {
196 		if (!m_ShuttingDown) {
197 			m_ShuttingDown = true;
198 
199 			Log(LogWarning, "JsonRpcConnection")
200 				<< "API client disconnected for identity '" << m_Identity << "'";
201 
202 			{
203 				CpuBoundWork removeClient (yc);
204 
205 				if (m_Endpoint) {
206 					m_Endpoint->RemoveClient(this);
207 				} else {
208 					ApiListener::GetInstance()->RemoveAnonymousClient(this);
209 				}
210 			}
211 
212 			m_OutgoingMessagesQueued.Set();
213 
214 			m_WriterDone.Wait(yc);
215 
216 			/*
217 			 * Do not swallow exceptions in a coroutine.
218 			 * https://github.com/Icinga/icinga2/issues/7351
219 			 * We must not catch `detail::forced_unwind exception` as
220 			 * this is used for unwinding the stack.
221 			 *
222 			 * Just use the error_code dummy here.
223 			 */
224 			boost::system::error_code ec;
225 
226 			m_CheckLivenessTimer.cancel();
227 			m_HeartbeatTimer.cancel();
228 
229 			m_Stream->lowest_layer().cancel(ec);
230 
231 			Timeout::Ptr shutdownTimeout (new Timeout(
232 				m_IoStrand.context(),
233 				m_IoStrand,
234 				boost::posix_time::seconds(10),
235 				[this, keepAlive](asio::yield_context yc) {
236 					boost::system::error_code ec;
237 					m_Stream->lowest_layer().cancel(ec);
238 				}
239 			));
240 
241 			m_Stream->next_layer().async_shutdown(yc[ec]);
242 
243 			shutdownTimeout->Cancel();
244 
245 			m_Stream->lowest_layer().shutdown(m_Stream->lowest_layer().shutdown_both, ec);
246 		}
247 	});
248 }
249 
MessageHandler(const String & jsonString)250 void JsonRpcConnection::MessageHandler(const String& jsonString)
251 {
252 	Dictionary::Ptr message = JsonRpc::DecodeMessage(jsonString);
253 
254 	if (m_Endpoint && message->Contains("ts")) {
255 		double ts = message->Get("ts");
256 
257 		/* ignore old messages */
258 		if (ts < m_Endpoint->GetRemoteLogPosition())
259 			return;
260 
261 		m_Endpoint->SetRemoteLogPosition(ts);
262 	}
263 
264 	MessageOrigin::Ptr origin = new MessageOrigin();
265 	origin->FromClient = this;
266 
267 	if (m_Endpoint) {
268 		if (m_Endpoint->GetZone() != Zone::GetLocalZone())
269 			origin->FromZone = m_Endpoint->GetZone();
270 		else
271 			origin->FromZone = Zone::GetByName(message->Get("originZone"));
272 
273 		m_Endpoint->AddMessageReceived(jsonString.GetLength());
274 	}
275 
276 	Value vmethod;
277 
278 	if (!message->Get("method", &vmethod)) {
279 		Value vid;
280 
281 		if (!message->Get("id", &vid))
282 			return;
283 
284 		Log(LogWarning, "JsonRpcConnection",
285 			"We received a JSON-RPC response message. This should never happen because we're only ever sending notifications.");
286 
287 		return;
288 	}
289 
290 	String method = vmethod;
291 
292 	Log(LogNotice, "JsonRpcConnection")
293 		<< "Received '" << method << "' message from identity '" << m_Identity << "'.";
294 
295 	Dictionary::Ptr resultMessage = new Dictionary();
296 
297 	try {
298 		ApiFunction::Ptr afunc = ApiFunction::GetByName(method);
299 
300 		if (!afunc) {
301 			Log(LogNotice, "JsonRpcConnection")
302 				<< "Call to non-existent function '" << method << "' from endpoint '" << m_Identity << "'.";
303 		} else {
304 			Dictionary::Ptr params = message->Get("params");
305 			if (params)
306 				resultMessage->Set("result", afunc->Invoke(origin, params));
307 			else
308 				resultMessage->Set("result", Empty);
309 		}
310 	} catch (const std::exception& ex) {
311 		/* TODO: Add a user readable error message for the remote caller */
312 		String diagInfo = DiagnosticInformation(ex);
313 		resultMessage->Set("error", diagInfo);
314 		Log(LogWarning, "JsonRpcConnection")
315 			<< "Error while processing message for identity '" << m_Identity << "'\n" << diagInfo;
316 	}
317 
318 	if (message->Contains("id")) {
319 		resultMessage->Set("jsonrpc", "2.0");
320 		resultMessage->Set("id", message->Get("id"));
321 
322 		SendMessageInternal(resultMessage);
323 	}
324 }
325 
SetLogPositionHandler(const MessageOrigin::Ptr & origin,const Dictionary::Ptr & params)326 Value SetLogPositionHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params)
327 {
328 	double log_position = params->Get("log_position");
329 	Endpoint::Ptr endpoint = origin->FromClient->GetEndpoint();
330 
331 	if (!endpoint)
332 		return Empty;
333 
334 	if (log_position > endpoint->GetLocalLogPosition())
335 		endpoint->SetLocalLogPosition(log_position);
336 
337 	return Empty;
338 }
339 
CheckLiveness(boost::asio::yield_context yc)340 void JsonRpcConnection::CheckLiveness(boost::asio::yield_context yc)
341 {
342 	boost::system::error_code ec;
343 
344 	if (!m_Authenticated) {
345 		/* Anonymous connections are normally only used for requesting a certificate and are closed after this request
346 		 * is received. However, the request is only sent if the child has successfully verified the certificate of its
347 		 * parent so that it is an authenticated connection from its perspective. In case this verification fails, both
348 		 * ends view it as an anonymous connection and never actually use it but attempt a reconnect after 10 seconds
349 		 * leaking the connection. Therefore close it after a timeout.
350 		 */
351 
352 		m_CheckLivenessTimer.expires_from_now(boost::posix_time::seconds(10));
353 		m_CheckLivenessTimer.async_wait(yc[ec]);
354 
355 		if (m_ShuttingDown) {
356 			return;
357 		}
358 
359 		auto remote (m_Stream->lowest_layer().remote_endpoint());
360 
361 		Log(LogInformation, "JsonRpcConnection")
362 			<< "Closing anonymous connection [" << remote.address() << "]:" << remote.port() << " after 10 seconds.";
363 
364 		Disconnect();
365 	} else {
366 		for (;;) {
367 			m_CheckLivenessTimer.expires_from_now(boost::posix_time::seconds(30));
368 			m_CheckLivenessTimer.async_wait(yc[ec]);
369 
370 			if (m_ShuttingDown) {
371 				break;
372 			}
373 
374 			if (m_Seen < Utility::GetTime() - 60 && (!m_Endpoint || !m_Endpoint->GetSyncing())) {
375 				Log(LogInformation, "JsonRpcConnection")
376 					<<  "No messages for identity '" << m_Identity << "' have been received in the last 60 seconds.";
377 
378 				Disconnect();
379 				break;
380 			}
381 		}
382 	}
383 }
384 
GetWorkQueueRate()385 double JsonRpcConnection::GetWorkQueueRate()
386 {
387 	return l_TaskStats.UpdateAndGetValues(Utility::GetTime(), 60) / 60.0;
388 }
389