1 /* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
2 
3 #include "icingadb/redisconnection.hpp"
4 #include "base/array.hpp"
5 #include "base/convert.hpp"
6 #include "base/defer.hpp"
7 #include "base/exception.hpp"
8 #include "base/io-engine.hpp"
9 #include "base/logger.hpp"
10 #include "base/objectlock.hpp"
11 #include "base/string.hpp"
12 #include "base/tcpsocket.hpp"
13 #include "base/tlsutility.hpp"
14 #include "base/utility.hpp"
15 #include <boost/asio.hpp>
16 #include <boost/coroutine/exceptions.hpp>
17 #include <boost/date_time/posix_time/posix_time_duration.hpp>
18 #include <boost/utility/string_view.hpp>
19 #include <boost/variant/get.hpp>
20 #include <exception>
21 #include <future>
22 #include <iterator>
23 #include <memory>
24 #include <openssl/ssl.h>
25 #include <openssl/x509_vfy.h>
26 #include <utility>
27 
28 using namespace icinga;
29 namespace asio = boost::asio;
30 
// Pattern for texts starting with "ERR AUTH " (\A anchors the match at the very beginning of the input).
// NOTE(review): presumably used to recognise Redis authentication errors; the usage is not visible in this file.
boost::regex RedisConnection::m_ErrAuth ("\\AERR AUTH ");
32 
RedisConnection(const String & host,int port,const String & path,const String & password,int db,bool useTls,bool insecure,const String & certPath,const String & keyPath,const String & caPath,const String & crlPath,const String & tlsProtocolmin,const String & cipherList,double connectTimeout,DebugInfo di,const RedisConnection::Ptr & parent)33 RedisConnection::RedisConnection(const String& host, int port, const String& path, const String& password, int db,
34 	bool useTls, bool insecure, const String& certPath, const String& keyPath, const String& caPath, const String& crlPath,
35 	const String& tlsProtocolmin, const String& cipherList, double connectTimeout, DebugInfo di, const RedisConnection::Ptr& parent)
36 	: RedisConnection(IoEngine::Get().GetIoContext(), host, port, path, password, db,
37 	  useTls, insecure, certPath, keyPath, caPath, crlPath, tlsProtocolmin, cipherList, connectTimeout, std::move(di), parent)
38 {
39 }
40 
RedisConnection(boost::asio::io_context & io,String host,int port,String path,String password,int db,bool useTls,bool insecure,String certPath,String keyPath,String caPath,String crlPath,String tlsProtocolmin,String cipherList,double connectTimeout,DebugInfo di,const RedisConnection::Ptr & parent)41 RedisConnection::RedisConnection(boost::asio::io_context& io, String host, int port, String path, String password,
42 	int db, bool useTls, bool insecure, String certPath, String keyPath, String caPath, String crlPath,
43 	String tlsProtocolmin, String cipherList, double connectTimeout, DebugInfo di, const RedisConnection::Ptr& parent)
44 	: m_Host(std::move(host)), m_Port(port), m_Path(std::move(path)), m_Password(std::move(password)),
45 	  m_DbIndex(db), m_CertPath(std::move(certPath)), m_KeyPath(std::move(keyPath)), m_Insecure(insecure),
46 	  m_CaPath(std::move(caPath)), m_CrlPath(std::move(crlPath)), m_TlsProtocolmin(std::move(tlsProtocolmin)),
47 	  m_CipherList(std::move(cipherList)), m_ConnectTimeout(connectTimeout), m_DebugInfo(std::move(di)), m_Connecting(false), m_Connected(false),
48 	  m_Started(false), m_Strand(io), m_QueuedWrites(io), m_QueuedReads(io), m_LogStatsTimer(io), m_Parent(parent)
49 {
50 	if (useTls && m_Path.IsEmpty()) {
51 		UpdateTLSContext();
52 	}
53 }
54 
UpdateTLSContext()55 void RedisConnection::UpdateTLSContext()
56 {
57 	m_TLSContext = SetupSslContext(m_CertPath, m_KeyPath, m_CaPath,
58 		m_CrlPath, m_CipherList, m_TlsProtocolmin, m_DebugInfo);
59 }
60 
Start()61 void RedisConnection::Start()
62 {
63 	if (!m_Started.exchange(true)) {
64 		Ptr keepAlive (this);
65 
66 		IoEngine::SpawnCoroutine(m_Strand, [this, keepAlive](asio::yield_context yc) { ReadLoop(yc); });
67 		IoEngine::SpawnCoroutine(m_Strand, [this, keepAlive](asio::yield_context yc) { WriteLoop(yc); });
68 
69 		if (!m_Parent) {
70 			IoEngine::SpawnCoroutine(m_Strand, [this, keepAlive](asio::yield_context yc) { LogStats(yc); });
71 		}
72 	}
73 
74 	if (!m_Connecting.exchange(true)) {
75 		Ptr keepAlive (this);
76 
77 		IoEngine::SpawnCoroutine(m_Strand, [this, keepAlive](asio::yield_context yc) { Connect(yc); });
78 	}
79 }
80 
IsConnected()81 bool RedisConnection::IsConnected() {
82 	return m_Connected.load();
83 }
84 
85 /**
86  * Append a Redis query to a log message
87  *
88  * @param query Redis query
89  * @param msg Log message
90  */
91 static inline
LogQuery(RedisConnection::Query & query,Log & msg)92 void LogQuery(RedisConnection::Query& query, Log& msg)
93 {
94 	int i = 0;
95 
96 	for (auto& arg : query) {
97 		if (++i == 8) {
98 			msg << " ...";
99 			break;
100 		}
101 
102 		if (arg.GetLength() > 64) {
103 			msg << " '" << arg.SubStr(0, 61) << "...'";
104 		} else {
105 			msg << " '" << arg << '\'';
106 		}
107 	}
108 }
109 
110 /**
111  * Queue a Redis query for sending
112  *
113  * @param query Redis query
114  * @param priority The query's priority
115  */
FireAndForgetQuery(RedisConnection::Query query,RedisConnection::QueryPriority priority)116 void RedisConnection::FireAndForgetQuery(RedisConnection::Query query, RedisConnection::QueryPriority priority)
117 {
118 	{
119 		Log msg (LogDebug, "IcingaDB", "Firing and forgetting query:");
120 		LogQuery(query, msg);
121 	}
122 
123 	auto item (Shared<Query>::Make(std::move(query)));
124 
125 	asio::post(m_Strand, [this, item, priority]() {
126 		m_Queues.Writes[priority].emplace(WriteQueueItem{item, nullptr, nullptr, nullptr});
127 		m_QueuedWrites.Set();
128 		IncreasePendingQueries(1);
129 	});
130 }
131 
132 /**
133  * Queue Redis queries for sending
134  *
135  * @param queries Redis queries
136  * @param priority The queries' priority
137  */
FireAndForgetQueries(RedisConnection::Queries queries,RedisConnection::QueryPriority priority)138 void RedisConnection::FireAndForgetQueries(RedisConnection::Queries queries, RedisConnection::QueryPriority priority)
139 {
140 	for (auto& query : queries) {
141 		Log msg (LogDebug, "IcingaDB", "Firing and forgetting query:");
142 		LogQuery(query, msg);
143 	}
144 
145 	auto item (Shared<Queries>::Make(std::move(queries)));
146 
147 	asio::post(m_Strand, [this, item, priority]() {
148 		m_Queues.Writes[priority].emplace(WriteQueueItem{nullptr, item, nullptr, nullptr});
149 		m_QueuedWrites.Set();
150 		IncreasePendingQueries(item->size());
151 	});
152 }
153 
154 /**
155  * Queue a Redis query for sending, wait for the response and return (or throw) it
156  *
157  * @param query Redis query
158  * @param priority The query's priority
159  *
160  * @return The response
161  */
GetResultOfQuery(RedisConnection::Query query,RedisConnection::QueryPriority priority)162 RedisConnection::Reply RedisConnection::GetResultOfQuery(RedisConnection::Query query, RedisConnection::QueryPriority priority)
163 {
164 	{
165 		Log msg (LogDebug, "IcingaDB", "Executing query:");
166 		LogQuery(query, msg);
167 	}
168 
169 	std::promise<Reply> promise;
170 	auto future (promise.get_future());
171 	auto item (Shared<std::pair<Query, std::promise<Reply>>>::Make(std::move(query), std::move(promise)));
172 
173 	asio::post(m_Strand, [this, item, priority]() {
174 		m_Queues.Writes[priority].emplace(WriteQueueItem{nullptr, nullptr, item, nullptr});
175 		m_QueuedWrites.Set();
176 		IncreasePendingQueries(1);
177 	});
178 
179 	item = nullptr;
180 	future.wait();
181 	return future.get();
182 }
183 
184 /**
185  * Queue Redis queries for sending, wait for the responses and return (or throw) them
186  *
187  * @param queries Redis queries
188  * @param priority The queries' priority
189  *
190  * @return The responses
191  */
GetResultsOfQueries(RedisConnection::Queries queries,RedisConnection::QueryPriority priority)192 RedisConnection::Replies RedisConnection::GetResultsOfQueries(RedisConnection::Queries queries, RedisConnection::QueryPriority priority)
193 {
194 	for (auto& query : queries) {
195 		Log msg (LogDebug, "IcingaDB", "Executing query:");
196 		LogQuery(query, msg);
197 	}
198 
199 	std::promise<Replies> promise;
200 	auto future (promise.get_future());
201 	auto item (Shared<std::pair<Queries, std::promise<Replies>>>::Make(std::move(queries), std::move(promise)));
202 
203 	asio::post(m_Strand, [this, item, priority]() {
204 		m_Queues.Writes[priority].emplace(WriteQueueItem{nullptr, nullptr, nullptr, item});
205 		m_QueuedWrites.Set();
206 		IncreasePendingQueries(item->first.size());
207 	});
208 
209 	item = nullptr;
210 	future.wait();
211 	return future.get();
212 }
213 
EnqueueCallback(const std::function<void (boost::asio::yield_context &)> & callback,RedisConnection::QueryPriority priority)214 void RedisConnection::EnqueueCallback(const std::function<void(boost::asio::yield_context&)>& callback, RedisConnection::QueryPriority priority)
215 {
216 	asio::post(m_Strand, [this, callback, priority]() {
217 		m_Queues.Writes[priority].emplace(WriteQueueItem{nullptr, nullptr, nullptr, nullptr, callback});
218 		m_QueuedWrites.Set();
219 	});
220 }
221 
222 /**
223  * Puts a no-op command with a result at the end of the queue and wait for the result,
224  * i.e. for everything enqueued to be processed by the server.
225  *
226  * @ingroup icingadb
227  */
Sync()228 void RedisConnection::Sync()
229 {
230 	GetResultOfQuery({"PING"}, RedisConnection::QueryPriority::SyncConnection);
231 }
232 
233 /**
234  * Mark kind as kind of queries not to actually send yet
235  *
236  * @param kind Query kind
237  */
SuppressQueryKind(RedisConnection::QueryPriority kind)238 void RedisConnection::SuppressQueryKind(RedisConnection::QueryPriority kind)
239 {
240 	asio::post(m_Strand, [this, kind]() { m_SuppressedQueryKinds.emplace(kind); });
241 }
242 
243 /**
244  * Unmark kind as kind of queries not to actually send yet
245  *
246  * @param kind Query kind
247  */
UnsuppressQueryKind(RedisConnection::QueryPriority kind)248 void RedisConnection::UnsuppressQueryKind(RedisConnection::QueryPriority kind)
249 {
250 	asio::post(m_Strand, [this, kind]() {
251 		m_SuppressedQueryKinds.erase(kind);
252 		m_QueuedWrites.Set();
253 	});
254 }
255 
256 /**
257  * Try to connect to Redis
258  */
Connect(asio::yield_context & yc)259 void RedisConnection::Connect(asio::yield_context& yc)
260 {
261 	Defer notConnecting ([this]() { m_Connecting.store(m_Connected.load()); });
262 
263 	boost::asio::deadline_timer timer (m_Strand.context());
264 
265 	for (;;) {
266 		try {
267 			if (m_Path.IsEmpty()) {
268 				if (m_TLSContext) {
269 					Log(m_Parent ? LogNotice : LogInformation, "IcingaDB")
270 						<< "Trying to connect to Redis server (async, TLS) on host '" << m_Host << ":" << m_Port << "'";
271 
272 					auto conn (Shared<AsioTlsStream>::Make(m_Strand.context(), *m_TLSContext, m_Host));
273 					auto& tlsConn (conn->next_layer());
274 					auto connectTimeout (MakeTimeout(conn));
275 					Defer cancelTimeout ([&connectTimeout]() { connectTimeout->Cancel(); });
276 
277 					icinga::Connect(conn->lowest_layer(), m_Host, Convert::ToString(m_Port), yc);
278 					tlsConn.async_handshake(tlsConn.client, yc);
279 
280 					if (!m_Insecure) {
281 						std::shared_ptr<X509> cert (tlsConn.GetPeerCertificate());
282 
283 						if (!cert) {
284 							BOOST_THROW_EXCEPTION(std::runtime_error(
285 								"Redis didn't present any TLS certificate."
286 							));
287 						}
288 
289 						if (!tlsConn.IsVerifyOK()) {
290 							BOOST_THROW_EXCEPTION(std::runtime_error(
291 								"TLS certificate validation failed: " + std::string(tlsConn.GetVerifyError())
292 							));
293 						}
294 					}
295 
296 					Handshake(conn, yc);
297 					m_TlsConn = std::move(conn);
298 				} else {
299 					Log(m_Parent ? LogNotice : LogInformation, "IcingaDB")
300 						<< "Trying to connect to Redis server (async) on host '" << m_Host << ":" << m_Port << "'";
301 
302 					auto conn (Shared<TcpConn>::Make(m_Strand.context()));
303 					auto connectTimeout (MakeTimeout(conn));
304 					Defer cancelTimeout ([&connectTimeout]() { connectTimeout->Cancel(); });
305 
306 					icinga::Connect(conn->next_layer(), m_Host, Convert::ToString(m_Port), yc);
307 					Handshake(conn, yc);
308 					m_TcpConn = std::move(conn);
309 				}
310 			} else {
311 				Log(LogInformation, "IcingaDB")
312 					<< "Trying to connect to Redis server (async) on unix socket path '" << m_Path << "'";
313 
314 				auto conn (Shared<UnixConn>::Make(m_Strand.context()));
315 				auto connectTimeout (MakeTimeout(conn));
316 				Defer cancelTimeout ([&connectTimeout]() { connectTimeout->Cancel(); });
317 
318 				conn->next_layer().async_connect(Unix::endpoint(m_Path.CStr()), yc);
319 				Handshake(conn, yc);
320 				m_UnixConn = std::move(conn);
321 			}
322 
323 			m_Connected.store(true);
324 
325 			Log(m_Parent ? LogNotice : LogInformation, "IcingaDB", "Connected to Redis server");
326 
327 			// Operate on a copy so that the callback can set a new callback without destroying itself while running.
328 			auto callback (m_ConnectedCallback);
329 			if (callback) {
330 				callback(yc);
331 			}
332 
333 			break;
334 		} catch (const boost::coroutines::detail::forced_unwind&) {
335 			throw;
336 		} catch (const std::exception& ex) {
337 			Log(LogCritical, "IcingaDB")
338 				<< "Cannot connect to " << m_Host << ":" << m_Port << ": " << ex.what();
339 		}
340 
341 		timer.expires_from_now(boost::posix_time::seconds(5));
342 		timer.async_wait(yc);
343 	}
344 
345 }
346 
347 /**
348  * Actually receive the responses to the Redis queries send by WriteItem() and handle them
349  */
ReadLoop(asio::yield_context & yc)350 void RedisConnection::ReadLoop(asio::yield_context& yc)
351 {
352 	for (;;) {
353 		m_QueuedReads.Wait(yc);
354 
355 		while (!m_Queues.FutureResponseActions.empty()) {
356 			auto item (std::move(m_Queues.FutureResponseActions.front()));
357 			m_Queues.FutureResponseActions.pop();
358 
359 			switch (item.Action) {
360 				case ResponseAction::Ignore:
361 					try {
362 						for (auto i (item.Amount); i; --i) {
363 							ReadOne(yc);
364 						}
365 					} catch (const boost::coroutines::detail::forced_unwind&) {
366 						throw;
367 					} catch (const std::exception& ex) {
368 						Log(LogCritical, "IcingaDB")
369 							<< "Error during receiving the response to a query which has been fired and forgotten: " << ex.what();
370 
371 						continue;
372 					} catch (...) {
373 						Log(LogCritical, "IcingaDB")
374 							<< "Error during receiving the response to a query which has been fired and forgotten";
375 
376 						continue;
377 					}
378 
379 					break;
380 				case ResponseAction::Deliver:
381 					for (auto i (item.Amount); i; --i) {
382 						auto promise (std::move(m_Queues.ReplyPromises.front()));
383 						m_Queues.ReplyPromises.pop();
384 
385 						Reply reply;
386 
387 						try {
388 							reply = ReadOne(yc);
389 						} catch (const boost::coroutines::detail::forced_unwind&) {
390 							throw;
391 						} catch (...) {
392 							promise.set_exception(std::current_exception());
393 
394 							continue;
395 						}
396 
397 						promise.set_value(std::move(reply));
398 					}
399 
400 					break;
401 				case ResponseAction::DeliverBulk:
402 					{
403 						auto promise (std::move(m_Queues.RepliesPromises.front()));
404 						m_Queues.RepliesPromises.pop();
405 
406 						Replies replies;
407 						replies.reserve(item.Amount);
408 
409 						for (auto i (item.Amount); i; --i) {
410 							try {
411 								replies.emplace_back(ReadOne(yc));
412 							} catch (const boost::coroutines::detail::forced_unwind&) {
413 								throw;
414 							} catch (...) {
415 								promise.set_exception(std::current_exception());
416 
417 								continue;
418 							}
419 						}
420 
421 						promise.set_value(std::move(replies));
422 					}
423 			}
424 		}
425 
426 		m_QueuedReads.Clear();
427 	}
428 }
429 
430 /**
431  * Actually send the Redis queries queued by {FireAndForget,GetResultsOf}{Query,Queries}()
432  */
WriteLoop(asio::yield_context & yc)433 void RedisConnection::WriteLoop(asio::yield_context& yc)
434 {
435 	for (;;) {
436 		m_QueuedWrites.Wait(yc);
437 
438 	WriteFirstOfHighestPrio:
439 		for (auto& queue : m_Queues.Writes) {
440 			if (m_SuppressedQueryKinds.find(queue.first) != m_SuppressedQueryKinds.end() || queue.second.empty()) {
441 				continue;
442 			}
443 
444 			auto next (std::move(queue.second.front()));
445 			queue.second.pop();
446 
447 			WriteItem(yc, std::move(next));
448 
449 			goto WriteFirstOfHighestPrio;
450 		}
451 
452 		m_QueuedWrites.Clear();
453 	}
454 }
455 
456 /**
457  * Periodically log current query performance
458  */
LogStats(asio::yield_context & yc)459 void RedisConnection::LogStats(asio::yield_context& yc)
460 {
461 	double lastMessage = 0;
462 
463 	m_LogStatsTimer.expires_from_now(boost::posix_time::seconds(10));
464 
465 	for (;;) {
466 		m_LogStatsTimer.async_wait(yc);
467 		m_LogStatsTimer.expires_from_now(boost::posix_time::seconds(10));
468 
469 		if (!IsConnected())
470 			continue;
471 
472 		auto now (Utility::GetTime());
473 		bool timeoutReached = now - lastMessage >= 5 * 60;
474 
475 		if (m_PendingQueries < 1 && !timeoutReached)
476 			continue;
477 
478 		auto output (round(m_OutputQueries.CalculateRate(now, 10)));
479 
480 		if (m_PendingQueries < output * 5 && !timeoutReached)
481 			continue;
482 
483 		Log(LogInformation, "IcingaDB")
484 			<< "Pending queries: " << m_PendingQueries << " (Input: "
485 			<< round(m_InputQueries.CalculateRate(now, 10)) << "/s; Output: " << output << "/s)";
486 
487 		lastMessage = now;
488 	}
489 }
490 
/**
 * Send next and schedule receiving the response
 *
 * Depending on which members of next are set, the contained query/queries are written and
 * a matching response action is queued for ReadLoop(), or the contained callback is run.
 * Write errors of fire-and-forget queries are logged, write errors of queries whose results
 * are awaited are passed to the respective promise. In both cases the function returns early.
 *
 * @param yc Yield context of the coroutine this function runs in
 * @param next Redis queries
 */
void RedisConnection::WriteItem(boost::asio::yield_context& yc, RedisConnection::WriteQueueItem next)
{
	// Single query, nobody waits for the response
	if (next.FireAndForgetQuery) {
		auto& item (*next.FireAndForgetQuery);
		DecreasePendingQueries(1);

		try {
			WriteOne(item, yc);
		} catch (const boost::coroutines::detail::forced_unwind&) {
			throw;
		} catch (const std::exception& ex) {
			Log msg (LogCritical, "IcingaDB", "Error during sending query");
			LogQuery(item, msg);
			msg << " which has been fired and forgotten: " << ex.what();

			return;
		} catch (...) {
			Log msg (LogCritical, "IcingaDB", "Error during sending query");
			LogQuery(item, msg);
			msg << " which has been fired and forgotten";

			return;
		}

		// Merge with the most recently queued response action if that's also an Ignore one
		if (m_Queues.FutureResponseActions.empty() || m_Queues.FutureResponseActions.back().Action != ResponseAction::Ignore) {
			m_Queues.FutureResponseActions.emplace(FutureResponseAction{1, ResponseAction::Ignore});
		} else {
			++m_Queues.FutureResponseActions.back().Amount;
		}

		m_QueuedReads.Set();
	}

	// Several queries, nobody waits for the responses
	if (next.FireAndForgetQueries) {
		auto& item (*next.FireAndForgetQueries);
		// Index of the query currently being written, used to log the failed one
		size_t i = 0;

		DecreasePendingQueries(item.size());

		try {
			for (auto& query : item) {
				WriteOne(query, yc);
				++i;
			}
		} catch (const boost::coroutines::detail::forced_unwind&) {
			throw;
		} catch (const std::exception& ex) {
			Log msg (LogCritical, "IcingaDB", "Error during sending query");
			LogQuery(item[i], msg);
			msg << " which has been fired and forgotten: " << ex.what();

			return;
		} catch (...) {
			Log msg (LogCritical, "IcingaDB", "Error during sending query");
			LogQuery(item[i], msg);
			msg << " which has been fired and forgotten";

			return;
		}

		// Merge with the most recently queued response action if that's also an Ignore one
		if (m_Queues.FutureResponseActions.empty() || m_Queues.FutureResponseActions.back().Action != ResponseAction::Ignore) {
			m_Queues.FutureResponseActions.emplace(FutureResponseAction{item.size(), ResponseAction::Ignore});
		} else {
			m_Queues.FutureResponseActions.back().Amount += item.size();
		}

		m_QueuedReads.Set();
	}

	// Single query, the response is delivered via the promise (item.second)
	if (next.GetResultOfQuery) {
		auto& item (*next.GetResultOfQuery);
		DecreasePendingQueries(1);

		try {
			WriteOne(item.first, yc);
		} catch (const boost::coroutines::detail::forced_unwind&) {
			throw;
		} catch (...) {
			// Pass the write error to whoever waits for the response
			item.second.set_exception(std::current_exception());

			return;
		}

		// ReadLoop() takes one promise from this queue per response to deliver
		m_Queues.ReplyPromises.emplace(std::move(item.second));

		// Merge with the most recently queued response action if that's also a Deliver one
		if (m_Queues.FutureResponseActions.empty() || m_Queues.FutureResponseActions.back().Action != ResponseAction::Deliver) {
			m_Queues.FutureResponseActions.emplace(FutureResponseAction{1, ResponseAction::Deliver});
		} else {
			++m_Queues.FutureResponseActions.back().Amount;
		}

		m_QueuedReads.Set();
	}

	// Several queries, all responses are delivered at once via the promise (item.second)
	if (next.GetResultsOfQueries) {
		auto& item (*next.GetResultsOfQueries);
		DecreasePendingQueries(item.first.size());

		try {
			for (auto& query : item.first) {
				WriteOne(query, yc);
			}
		} catch (const boost::coroutines::detail::forced_unwind&) {
			throw;
		} catch (...) {
			// Pass the write error to whoever waits for the responses
			item.second.set_exception(std::current_exception());

			return;
		}

		// Bulk actions are never merged: each one belongs to exactly one promise
		m_Queues.RepliesPromises.emplace(std::move(item.second));
		m_Queues.FutureResponseActions.emplace(FutureResponseAction{item.first.size(), ResponseAction::DeliverBulk});

		m_QueuedReads.Set();
	}

	// Callback enqueued via EnqueueCallback()
	if (next.Callback) {
		next.Callback(yc);
	}
}
616 
617 /**
618  * Receive the response to a Redis query
619  *
620  * @return The response
621  */
ReadOne(boost::asio::yield_context & yc)622 RedisConnection::Reply RedisConnection::ReadOne(boost::asio::yield_context& yc)
623 {
624 	if (m_Path.IsEmpty()) {
625 		if (m_TLSContext) {
626 			return ReadOne(m_TlsConn, yc);
627 		} else {
628 			return ReadOne(m_TcpConn, yc);
629 		}
630 	} else {
631 		return ReadOne(m_UnixConn, yc);
632 	}
633 }
634 
635 /**
636  * Send query
637  *
638  * @param query Redis query
639  */
WriteOne(RedisConnection::Query & query,asio::yield_context & yc)640 void RedisConnection::WriteOne(RedisConnection::Query& query, asio::yield_context& yc)
641 {
642 	if (m_Path.IsEmpty()) {
643 		if (m_TLSContext) {
644 			WriteOne(m_TlsConn, query, yc);
645 		} else {
646 			WriteOne(m_TcpConn, query, yc);
647 		}
648 	} else {
649 		WriteOne(m_UnixConn, query, yc);
650 	}
651 }
652 
653 /**
654  * Specify a callback that is run each time a connection is successfully established
655  *
656  * The callback is executed from a Boost.Asio coroutine and should therefore not perform blocking operations.
657  *
658  * @param callback Callback to execute
659  */
SetConnectedCallback(std::function<void (asio::yield_context & yc)> callback)660 void RedisConnection::SetConnectedCallback(std::function<void(asio::yield_context& yc)> callback) {
661 	m_ConnectedCallback = std::move(callback);
662 }
663 
IncreasePendingQueries(int count)664 void RedisConnection::IncreasePendingQueries(int count)
665 {
666 	if (m_Parent) {
667 		auto parent (m_Parent);
668 
669 		asio::post(parent->m_Strand, [parent, count]() {
670 			parent->IncreasePendingQueries(count);
671 		});
672 	} else {
673 		m_PendingQueries += count;
674 		m_InputQueries.InsertValue(Utility::GetTime(), count);
675 	}
676 }
677 
DecreasePendingQueries(int count)678 void RedisConnection::DecreasePendingQueries(int count)
679 {
680 	if (m_Parent) {
681 		auto parent (m_Parent);
682 
683 		asio::post(parent->m_Strand, [parent, count]() {
684 			parent->DecreasePendingQueries(count);
685 		});
686 	} else {
687 		m_PendingQueries -= count;
688 		m_OutputQueries.InsertValue(Utility::GetTime(), count);
689 	}
690 }
691