1 /* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
2
3 #include "icingadb/redisconnection.hpp"
4 #include "base/array.hpp"
5 #include "base/convert.hpp"
6 #include "base/defer.hpp"
7 #include "base/exception.hpp"
8 #include "base/io-engine.hpp"
9 #include "base/logger.hpp"
10 #include "base/objectlock.hpp"
11 #include "base/string.hpp"
12 #include "base/tcpsocket.hpp"
13 #include "base/tlsutility.hpp"
14 #include "base/utility.hpp"
15 #include <boost/asio.hpp>
16 #include <boost/coroutine/exceptions.hpp>
17 #include <boost/date_time/posix_time/posix_time_duration.hpp>
18 #include <boost/utility/string_view.hpp>
19 #include <boost/variant/get.hpp>
20 #include <exception>
21 #include <future>
22 #include <iterator>
23 #include <memory>
24 #include <openssl/ssl.h>
25 #include <openssl/x509_vfy.h>
26 #include <utility>
27
28 using namespace icinga;
29 namespace asio = boost::asio;
30
31 boost::regex RedisConnection::m_ErrAuth ("\\AERR AUTH ");
32
/**
 * Convenience constructor: same as the constructor taking an io_context,
 * but uses the io_context provided by IoEngine::Get().
 */
RedisConnection::RedisConnection(
	const String& host, int port, const String& path, const String& password, int db,
	bool useTls, bool insecure,
	const String& certPath, const String& keyPath, const String& caPath, const String& crlPath,
	const String& tlsProtocolmin, const String& cipherList,
	double connectTimeout, DebugInfo di, const RedisConnection::Ptr& parent
)
	: RedisConnection(
		IoEngine::Get().GetIoContext(),
		host, port, path, password, db,
		useTls, insecure,
		certPath, keyPath, caPath, crlPath,
		tlsProtocolmin, cipherList,
		connectTimeout, std::move(di), parent
	)
{
}
40
RedisConnection(boost::asio::io_context & io,String host,int port,String path,String password,int db,bool useTls,bool insecure,String certPath,String keyPath,String caPath,String crlPath,String tlsProtocolmin,String cipherList,double connectTimeout,DebugInfo di,const RedisConnection::Ptr & parent)41 RedisConnection::RedisConnection(boost::asio::io_context& io, String host, int port, String path, String password,
42 int db, bool useTls, bool insecure, String certPath, String keyPath, String caPath, String crlPath,
43 String tlsProtocolmin, String cipherList, double connectTimeout, DebugInfo di, const RedisConnection::Ptr& parent)
44 : m_Host(std::move(host)), m_Port(port), m_Path(std::move(path)), m_Password(std::move(password)),
45 m_DbIndex(db), m_CertPath(std::move(certPath)), m_KeyPath(std::move(keyPath)), m_Insecure(insecure),
46 m_CaPath(std::move(caPath)), m_CrlPath(std::move(crlPath)), m_TlsProtocolmin(std::move(tlsProtocolmin)),
47 m_CipherList(std::move(cipherList)), m_ConnectTimeout(connectTimeout), m_DebugInfo(std::move(di)), m_Connecting(false), m_Connected(false),
48 m_Started(false), m_Strand(io), m_QueuedWrites(io), m_QueuedReads(io), m_LogStatsTimer(io), m_Parent(parent)
49 {
50 if (useTls && m_Path.IsEmpty()) {
51 UpdateTLSContext();
52 }
53 }
54
UpdateTLSContext()55 void RedisConnection::UpdateTLSContext()
56 {
57 m_TLSContext = SetupSslContext(m_CertPath, m_KeyPath, m_CaPath,
58 m_CrlPath, m_CipherList, m_TlsProtocolmin, m_DebugInfo);
59 }
60
Start()61 void RedisConnection::Start()
62 {
63 if (!m_Started.exchange(true)) {
64 Ptr keepAlive (this);
65
66 IoEngine::SpawnCoroutine(m_Strand, [this, keepAlive](asio::yield_context yc) { ReadLoop(yc); });
67 IoEngine::SpawnCoroutine(m_Strand, [this, keepAlive](asio::yield_context yc) { WriteLoop(yc); });
68
69 if (!m_Parent) {
70 IoEngine::SpawnCoroutine(m_Strand, [this, keepAlive](asio::yield_context yc) { LogStats(yc); });
71 }
72 }
73
74 if (!m_Connecting.exchange(true)) {
75 Ptr keepAlive (this);
76
77 IoEngine::SpawnCoroutine(m_Strand, [this, keepAlive](asio::yield_context yc) { Connect(yc); });
78 }
79 }
80
IsConnected()81 bool RedisConnection::IsConnected() {
82 return m_Connected.load();
83 }
84
85 /**
86 * Append a Redis query to a log message
87 *
88 * @param query Redis query
89 * @param msg Log message
90 */
91 static inline
LogQuery(RedisConnection::Query & query,Log & msg)92 void LogQuery(RedisConnection::Query& query, Log& msg)
93 {
94 int i = 0;
95
96 for (auto& arg : query) {
97 if (++i == 8) {
98 msg << " ...";
99 break;
100 }
101
102 if (arg.GetLength() > 64) {
103 msg << " '" << arg.SubStr(0, 61) << "...'";
104 } else {
105 msg << " '" << arg << '\'';
106 }
107 }
108 }
109
110 /**
111 * Queue a Redis query for sending
112 *
113 * @param query Redis query
114 * @param priority The query's priority
115 */
FireAndForgetQuery(RedisConnection::Query query,RedisConnection::QueryPriority priority)116 void RedisConnection::FireAndForgetQuery(RedisConnection::Query query, RedisConnection::QueryPriority priority)
117 {
118 {
119 Log msg (LogDebug, "IcingaDB", "Firing and forgetting query:");
120 LogQuery(query, msg);
121 }
122
123 auto item (Shared<Query>::Make(std::move(query)));
124
125 asio::post(m_Strand, [this, item, priority]() {
126 m_Queues.Writes[priority].emplace(WriteQueueItem{item, nullptr, nullptr, nullptr});
127 m_QueuedWrites.Set();
128 IncreasePendingQueries(1);
129 });
130 }
131
132 /**
133 * Queue Redis queries for sending
134 *
135 * @param queries Redis queries
136 * @param priority The queries' priority
137 */
FireAndForgetQueries(RedisConnection::Queries queries,RedisConnection::QueryPriority priority)138 void RedisConnection::FireAndForgetQueries(RedisConnection::Queries queries, RedisConnection::QueryPriority priority)
139 {
140 for (auto& query : queries) {
141 Log msg (LogDebug, "IcingaDB", "Firing and forgetting query:");
142 LogQuery(query, msg);
143 }
144
145 auto item (Shared<Queries>::Make(std::move(queries)));
146
147 asio::post(m_Strand, [this, item, priority]() {
148 m_Queues.Writes[priority].emplace(WriteQueueItem{nullptr, item, nullptr, nullptr});
149 m_QueuedWrites.Set();
150 IncreasePendingQueries(item->size());
151 });
152 }
153
154 /**
155 * Queue a Redis query for sending, wait for the response and return (or throw) it
156 *
157 * @param query Redis query
158 * @param priority The query's priority
159 *
160 * @return The response
161 */
GetResultOfQuery(RedisConnection::Query query,RedisConnection::QueryPriority priority)162 RedisConnection::Reply RedisConnection::GetResultOfQuery(RedisConnection::Query query, RedisConnection::QueryPriority priority)
163 {
164 {
165 Log msg (LogDebug, "IcingaDB", "Executing query:");
166 LogQuery(query, msg);
167 }
168
169 std::promise<Reply> promise;
170 auto future (promise.get_future());
171 auto item (Shared<std::pair<Query, std::promise<Reply>>>::Make(std::move(query), std::move(promise)));
172
173 asio::post(m_Strand, [this, item, priority]() {
174 m_Queues.Writes[priority].emplace(WriteQueueItem{nullptr, nullptr, item, nullptr});
175 m_QueuedWrites.Set();
176 IncreasePendingQueries(1);
177 });
178
179 item = nullptr;
180 future.wait();
181 return future.get();
182 }
183
184 /**
185 * Queue Redis queries for sending, wait for the responses and return (or throw) them
186 *
187 * @param queries Redis queries
188 * @param priority The queries' priority
189 *
190 * @return The responses
191 */
GetResultsOfQueries(RedisConnection::Queries queries,RedisConnection::QueryPriority priority)192 RedisConnection::Replies RedisConnection::GetResultsOfQueries(RedisConnection::Queries queries, RedisConnection::QueryPriority priority)
193 {
194 for (auto& query : queries) {
195 Log msg (LogDebug, "IcingaDB", "Executing query:");
196 LogQuery(query, msg);
197 }
198
199 std::promise<Replies> promise;
200 auto future (promise.get_future());
201 auto item (Shared<std::pair<Queries, std::promise<Replies>>>::Make(std::move(queries), std::move(promise)));
202
203 asio::post(m_Strand, [this, item, priority]() {
204 m_Queues.Writes[priority].emplace(WriteQueueItem{nullptr, nullptr, nullptr, item});
205 m_QueuedWrites.Set();
206 IncreasePendingQueries(item->first.size());
207 });
208
209 item = nullptr;
210 future.wait();
211 return future.get();
212 }
213
EnqueueCallback(const std::function<void (boost::asio::yield_context &)> & callback,RedisConnection::QueryPriority priority)214 void RedisConnection::EnqueueCallback(const std::function<void(boost::asio::yield_context&)>& callback, RedisConnection::QueryPriority priority)
215 {
216 asio::post(m_Strand, [this, callback, priority]() {
217 m_Queues.Writes[priority].emplace(WriteQueueItem{nullptr, nullptr, nullptr, nullptr, callback});
218 m_QueuedWrites.Set();
219 });
220 }
221
222 /**
223 * Puts a no-op command with a result at the end of the queue and wait for the result,
224 * i.e. for everything enqueued to be processed by the server.
225 *
226 * @ingroup icingadb
227 */
Sync()228 void RedisConnection::Sync()
229 {
230 GetResultOfQuery({"PING"}, RedisConnection::QueryPriority::SyncConnection);
231 }
232
233 /**
234 * Mark kind as kind of queries not to actually send yet
235 *
236 * @param kind Query kind
237 */
SuppressQueryKind(RedisConnection::QueryPriority kind)238 void RedisConnection::SuppressQueryKind(RedisConnection::QueryPriority kind)
239 {
240 asio::post(m_Strand, [this, kind]() { m_SuppressedQueryKinds.emplace(kind); });
241 }
242
243 /**
244 * Unmark kind as kind of queries not to actually send yet
245 *
246 * @param kind Query kind
247 */
UnsuppressQueryKind(RedisConnection::QueryPriority kind)248 void RedisConnection::UnsuppressQueryKind(RedisConnection::QueryPriority kind)
249 {
250 asio::post(m_Strand, [this, kind]() {
251 m_SuppressedQueryKinds.erase(kind);
252 m_QueuedWrites.Set();
253 });
254 }
255
256 /**
257 * Try to connect to Redis
258 */
Connect(asio::yield_context & yc)259 void RedisConnection::Connect(asio::yield_context& yc)
260 {
261 Defer notConnecting ([this]() { m_Connecting.store(m_Connected.load()); });
262
263 boost::asio::deadline_timer timer (m_Strand.context());
264
265 for (;;) {
266 try {
267 if (m_Path.IsEmpty()) {
268 if (m_TLSContext) {
269 Log(m_Parent ? LogNotice : LogInformation, "IcingaDB")
270 << "Trying to connect to Redis server (async, TLS) on host '" << m_Host << ":" << m_Port << "'";
271
272 auto conn (Shared<AsioTlsStream>::Make(m_Strand.context(), *m_TLSContext, m_Host));
273 auto& tlsConn (conn->next_layer());
274 auto connectTimeout (MakeTimeout(conn));
275 Defer cancelTimeout ([&connectTimeout]() { connectTimeout->Cancel(); });
276
277 icinga::Connect(conn->lowest_layer(), m_Host, Convert::ToString(m_Port), yc);
278 tlsConn.async_handshake(tlsConn.client, yc);
279
280 if (!m_Insecure) {
281 std::shared_ptr<X509> cert (tlsConn.GetPeerCertificate());
282
283 if (!cert) {
284 BOOST_THROW_EXCEPTION(std::runtime_error(
285 "Redis didn't present any TLS certificate."
286 ));
287 }
288
289 if (!tlsConn.IsVerifyOK()) {
290 BOOST_THROW_EXCEPTION(std::runtime_error(
291 "TLS certificate validation failed: " + std::string(tlsConn.GetVerifyError())
292 ));
293 }
294 }
295
296 Handshake(conn, yc);
297 m_TlsConn = std::move(conn);
298 } else {
299 Log(m_Parent ? LogNotice : LogInformation, "IcingaDB")
300 << "Trying to connect to Redis server (async) on host '" << m_Host << ":" << m_Port << "'";
301
302 auto conn (Shared<TcpConn>::Make(m_Strand.context()));
303 auto connectTimeout (MakeTimeout(conn));
304 Defer cancelTimeout ([&connectTimeout]() { connectTimeout->Cancel(); });
305
306 icinga::Connect(conn->next_layer(), m_Host, Convert::ToString(m_Port), yc);
307 Handshake(conn, yc);
308 m_TcpConn = std::move(conn);
309 }
310 } else {
311 Log(LogInformation, "IcingaDB")
312 << "Trying to connect to Redis server (async) on unix socket path '" << m_Path << "'";
313
314 auto conn (Shared<UnixConn>::Make(m_Strand.context()));
315 auto connectTimeout (MakeTimeout(conn));
316 Defer cancelTimeout ([&connectTimeout]() { connectTimeout->Cancel(); });
317
318 conn->next_layer().async_connect(Unix::endpoint(m_Path.CStr()), yc);
319 Handshake(conn, yc);
320 m_UnixConn = std::move(conn);
321 }
322
323 m_Connected.store(true);
324
325 Log(m_Parent ? LogNotice : LogInformation, "IcingaDB", "Connected to Redis server");
326
327 // Operate on a copy so that the callback can set a new callback without destroying itself while running.
328 auto callback (m_ConnectedCallback);
329 if (callback) {
330 callback(yc);
331 }
332
333 break;
334 } catch (const boost::coroutines::detail::forced_unwind&) {
335 throw;
336 } catch (const std::exception& ex) {
337 Log(LogCritical, "IcingaDB")
338 << "Cannot connect to " << m_Host << ":" << m_Port << ": " << ex.what();
339 }
340
341 timer.expires_from_now(boost::posix_time::seconds(5));
342 timer.async_wait(yc);
343 }
344
345 }
346
347 /**
348 * Actually receive the responses to the Redis queries send by WriteItem() and handle them
349 */
ReadLoop(asio::yield_context & yc)350 void RedisConnection::ReadLoop(asio::yield_context& yc)
351 {
352 for (;;) {
353 m_QueuedReads.Wait(yc);
354
355 while (!m_Queues.FutureResponseActions.empty()) {
356 auto item (std::move(m_Queues.FutureResponseActions.front()));
357 m_Queues.FutureResponseActions.pop();
358
359 switch (item.Action) {
360 case ResponseAction::Ignore:
361 try {
362 for (auto i (item.Amount); i; --i) {
363 ReadOne(yc);
364 }
365 } catch (const boost::coroutines::detail::forced_unwind&) {
366 throw;
367 } catch (const std::exception& ex) {
368 Log(LogCritical, "IcingaDB")
369 << "Error during receiving the response to a query which has been fired and forgotten: " << ex.what();
370
371 continue;
372 } catch (...) {
373 Log(LogCritical, "IcingaDB")
374 << "Error during receiving the response to a query which has been fired and forgotten";
375
376 continue;
377 }
378
379 break;
380 case ResponseAction::Deliver:
381 for (auto i (item.Amount); i; --i) {
382 auto promise (std::move(m_Queues.ReplyPromises.front()));
383 m_Queues.ReplyPromises.pop();
384
385 Reply reply;
386
387 try {
388 reply = ReadOne(yc);
389 } catch (const boost::coroutines::detail::forced_unwind&) {
390 throw;
391 } catch (...) {
392 promise.set_exception(std::current_exception());
393
394 continue;
395 }
396
397 promise.set_value(std::move(reply));
398 }
399
400 break;
401 case ResponseAction::DeliverBulk:
402 {
403 auto promise (std::move(m_Queues.RepliesPromises.front()));
404 m_Queues.RepliesPromises.pop();
405
406 Replies replies;
407 replies.reserve(item.Amount);
408
409 for (auto i (item.Amount); i; --i) {
410 try {
411 replies.emplace_back(ReadOne(yc));
412 } catch (const boost::coroutines::detail::forced_unwind&) {
413 throw;
414 } catch (...) {
415 promise.set_exception(std::current_exception());
416
417 continue;
418 }
419 }
420
421 promise.set_value(std::move(replies));
422 }
423 }
424 }
425
426 m_QueuedReads.Clear();
427 }
428 }
429
430 /**
431 * Actually send the Redis queries queued by {FireAndForget,GetResultsOf}{Query,Queries}()
432 */
WriteLoop(asio::yield_context & yc)433 void RedisConnection::WriteLoop(asio::yield_context& yc)
434 {
435 for (;;) {
436 m_QueuedWrites.Wait(yc);
437
438 WriteFirstOfHighestPrio:
439 for (auto& queue : m_Queues.Writes) {
440 if (m_SuppressedQueryKinds.find(queue.first) != m_SuppressedQueryKinds.end() || queue.second.empty()) {
441 continue;
442 }
443
444 auto next (std::move(queue.second.front()));
445 queue.second.pop();
446
447 WriteItem(yc, std::move(next));
448
449 goto WriteFirstOfHighestPrio;
450 }
451
452 m_QueuedWrites.Clear();
453 }
454 }
455
456 /**
457 * Periodically log current query performance
458 */
LogStats(asio::yield_context & yc)459 void RedisConnection::LogStats(asio::yield_context& yc)
460 {
461 double lastMessage = 0;
462
463 m_LogStatsTimer.expires_from_now(boost::posix_time::seconds(10));
464
465 for (;;) {
466 m_LogStatsTimer.async_wait(yc);
467 m_LogStatsTimer.expires_from_now(boost::posix_time::seconds(10));
468
469 if (!IsConnected())
470 continue;
471
472 auto now (Utility::GetTime());
473 bool timeoutReached = now - lastMessage >= 5 * 60;
474
475 if (m_PendingQueries < 1 && !timeoutReached)
476 continue;
477
478 auto output (round(m_OutputQueries.CalculateRate(now, 10)));
479
480 if (m_PendingQueries < output * 5 && !timeoutReached)
481 continue;
482
483 Log(LogInformation, "IcingaDB")
484 << "Pending queries: " << m_PendingQueries << " (Input: "
485 << round(m_InputQueries.CalculateRate(now, 10)) << "/s; Output: " << output << "/s)";
486
487 lastMessage = now;
488 }
489 }
490
491 /**
492 * Send next and schedule receiving the response
493 *
494 * @param next Redis queries
495 */
WriteItem(boost::asio::yield_context & yc,RedisConnection::WriteQueueItem next)496 void RedisConnection::WriteItem(boost::asio::yield_context& yc, RedisConnection::WriteQueueItem next)
497 {
498 if (next.FireAndForgetQuery) {
499 auto& item (*next.FireAndForgetQuery);
500 DecreasePendingQueries(1);
501
502 try {
503 WriteOne(item, yc);
504 } catch (const boost::coroutines::detail::forced_unwind&) {
505 throw;
506 } catch (const std::exception& ex) {
507 Log msg (LogCritical, "IcingaDB", "Error during sending query");
508 LogQuery(item, msg);
509 msg << " which has been fired and forgotten: " << ex.what();
510
511 return;
512 } catch (...) {
513 Log msg (LogCritical, "IcingaDB", "Error during sending query");
514 LogQuery(item, msg);
515 msg << " which has been fired and forgotten";
516
517 return;
518 }
519
520 if (m_Queues.FutureResponseActions.empty() || m_Queues.FutureResponseActions.back().Action != ResponseAction::Ignore) {
521 m_Queues.FutureResponseActions.emplace(FutureResponseAction{1, ResponseAction::Ignore});
522 } else {
523 ++m_Queues.FutureResponseActions.back().Amount;
524 }
525
526 m_QueuedReads.Set();
527 }
528
529 if (next.FireAndForgetQueries) {
530 auto& item (*next.FireAndForgetQueries);
531 size_t i = 0;
532
533 DecreasePendingQueries(item.size());
534
535 try {
536 for (auto& query : item) {
537 WriteOne(query, yc);
538 ++i;
539 }
540 } catch (const boost::coroutines::detail::forced_unwind&) {
541 throw;
542 } catch (const std::exception& ex) {
543 Log msg (LogCritical, "IcingaDB", "Error during sending query");
544 LogQuery(item[i], msg);
545 msg << " which has been fired and forgotten: " << ex.what();
546
547 return;
548 } catch (...) {
549 Log msg (LogCritical, "IcingaDB", "Error during sending query");
550 LogQuery(item[i], msg);
551 msg << " which has been fired and forgotten";
552
553 return;
554 }
555
556 if (m_Queues.FutureResponseActions.empty() || m_Queues.FutureResponseActions.back().Action != ResponseAction::Ignore) {
557 m_Queues.FutureResponseActions.emplace(FutureResponseAction{item.size(), ResponseAction::Ignore});
558 } else {
559 m_Queues.FutureResponseActions.back().Amount += item.size();
560 }
561
562 m_QueuedReads.Set();
563 }
564
565 if (next.GetResultOfQuery) {
566 auto& item (*next.GetResultOfQuery);
567 DecreasePendingQueries(1);
568
569 try {
570 WriteOne(item.first, yc);
571 } catch (const boost::coroutines::detail::forced_unwind&) {
572 throw;
573 } catch (...) {
574 item.second.set_exception(std::current_exception());
575
576 return;
577 }
578
579 m_Queues.ReplyPromises.emplace(std::move(item.second));
580
581 if (m_Queues.FutureResponseActions.empty() || m_Queues.FutureResponseActions.back().Action != ResponseAction::Deliver) {
582 m_Queues.FutureResponseActions.emplace(FutureResponseAction{1, ResponseAction::Deliver});
583 } else {
584 ++m_Queues.FutureResponseActions.back().Amount;
585 }
586
587 m_QueuedReads.Set();
588 }
589
590 if (next.GetResultsOfQueries) {
591 auto& item (*next.GetResultsOfQueries);
592 DecreasePendingQueries(item.first.size());
593
594 try {
595 for (auto& query : item.first) {
596 WriteOne(query, yc);
597 }
598 } catch (const boost::coroutines::detail::forced_unwind&) {
599 throw;
600 } catch (...) {
601 item.second.set_exception(std::current_exception());
602
603 return;
604 }
605
606 m_Queues.RepliesPromises.emplace(std::move(item.second));
607 m_Queues.FutureResponseActions.emplace(FutureResponseAction{item.first.size(), ResponseAction::DeliverBulk});
608
609 m_QueuedReads.Set();
610 }
611
612 if (next.Callback) {
613 next.Callback(yc);
614 }
615 }
616
617 /**
618 * Receive the response to a Redis query
619 *
620 * @return The response
621 */
ReadOne(boost::asio::yield_context & yc)622 RedisConnection::Reply RedisConnection::ReadOne(boost::asio::yield_context& yc)
623 {
624 if (m_Path.IsEmpty()) {
625 if (m_TLSContext) {
626 return ReadOne(m_TlsConn, yc);
627 } else {
628 return ReadOne(m_TcpConn, yc);
629 }
630 } else {
631 return ReadOne(m_UnixConn, yc);
632 }
633 }
634
635 /**
636 * Send query
637 *
638 * @param query Redis query
639 */
WriteOne(RedisConnection::Query & query,asio::yield_context & yc)640 void RedisConnection::WriteOne(RedisConnection::Query& query, asio::yield_context& yc)
641 {
642 if (m_Path.IsEmpty()) {
643 if (m_TLSContext) {
644 WriteOne(m_TlsConn, query, yc);
645 } else {
646 WriteOne(m_TcpConn, query, yc);
647 }
648 } else {
649 WriteOne(m_UnixConn, query, yc);
650 }
651 }
652
653 /**
654 * Specify a callback that is run each time a connection is successfully established
655 *
656 * The callback is executed from a Boost.Asio coroutine and should therefore not perform blocking operations.
657 *
658 * @param callback Callback to execute
659 */
SetConnectedCallback(std::function<void (asio::yield_context & yc)> callback)660 void RedisConnection::SetConnectedCallback(std::function<void(asio::yield_context& yc)> callback) {
661 m_ConnectedCallback = std::move(callback);
662 }
663
IncreasePendingQueries(int count)664 void RedisConnection::IncreasePendingQueries(int count)
665 {
666 if (m_Parent) {
667 auto parent (m_Parent);
668
669 asio::post(parent->m_Strand, [parent, count]() {
670 parent->IncreasePendingQueries(count);
671 });
672 } else {
673 m_PendingQueries += count;
674 m_InputQueries.InsertValue(Utility::GetTime(), count);
675 }
676 }
677
DecreasePendingQueries(int count)678 void RedisConnection::DecreasePendingQueries(int count)
679 {
680 if (m_Parent) {
681 auto parent (m_Parent);
682
683 asio::post(parent->m_Strand, [parent, count]() {
684 parent->DecreasePendingQueries(count);
685 });
686 } else {
687 m_PendingQueries -= count;
688 m_OutputQueries.InsertValue(Utility::GetTime(), count);
689 }
690 }
691