1 /*
2  *          Copyright Andrey Semashev 2007 - 2015.
3  * Distributed under the Boost Software License, Version 1.0.
4  *    (See accompanying file LICENSE_1_0.txt or copy at
5  *          http://www.boost.org/LICENSE_1_0.txt)
6  */
7 
8 #include <string>
9 #include <fstream>
10 #include <iostream>
11 #include <stdexcept>
12 #include <boost/smart_ptr/shared_ptr.hpp>
13 #include <boost/date_time/posix_time/posix_time_types.hpp>
14 #include <boost/phoenix.hpp>
15 #include <boost/log/core.hpp>
16 #include <boost/log/expressions.hpp>
17 #include <boost/log/sinks/sync_frontend.hpp>
18 #include <boost/log/sinks/basic_sink_backend.hpp>
19 #include <boost/log/sources/logger.hpp>
20 #include <boost/log/sources/record_ostream.hpp>
21 #include <boost/log/attributes/value_visitation.hpp>
22 #include <boost/log/utility/manipulators/add_value.hpp>
23 
24 namespace logging = boost::log;
25 namespace src = boost::log::sources;
26 namespace expr = boost::log::expressions;
27 namespace sinks = boost::log::sinks;
28 namespace keywords = boost::log::keywords;
29 
30 //[ example_extension_stat_collector_definition
31 // The backend collects statistical information about network activity of the application
32 class stat_collector :
33     public sinks::basic_sink_backend<
34         sinks::combine_requirements<
35             sinks::synchronized_feeding,                                        /*< we will have to store internal data, so let's require frontend to synchronize feeding calls to the backend >*/
36             sinks::flushing                                                     /*< also enable flushing support >*/
37         >::type
38     >
39 {
40 private:
41     // The file to write the collected information to
42     std::ofstream m_csv_file;
43 
44     // Here goes the data collected so far:
45     // Active connections
46     unsigned int m_active_connections;
47     // Sent bytes
48     unsigned int m_sent_bytes;
49     // Received bytes
50     unsigned int m_received_bytes;
51 
52     // The number of collected records since the last write to the file
53     unsigned int m_collected_count;
54     // The time when the collected data has been written to the file last time
55     boost::posix_time::ptime m_last_store_time;
56 
57 public:
58     // The constructor initializes the internal data
59     explicit stat_collector(const char* file_name);
60 
61     // The function consumes the log records that come from the frontend
62     void consume(logging::record_view const& rec);
63     // The function flushes the file
64     void flush();
65 
66 private:
67     // The function resets statistical accumulators to initial values
68     void reset_accumulators();
69     // The function writes the collected data to the file
70     void write_data();
71 };
72 //]
73 
74 // The constructor initializes the internal data
stat_collector(const char * file_name)75 stat_collector::stat_collector(const char* file_name) :
76     m_csv_file(file_name, std::ios::app),
77     m_active_connections(0),
78     m_last_store_time(boost::posix_time::microsec_clock::universal_time())
79 {
80     reset_accumulators();
81     if (!m_csv_file.is_open())
82         throw std::runtime_error("could not open the CSV file");
83 }
84 
85 //[ example_extension_stat_collector_consume
86 BOOST_LOG_ATTRIBUTE_KEYWORD(sent, "Sent", unsigned int)
87 BOOST_LOG_ATTRIBUTE_KEYWORD(received, "Received", unsigned int)
88 
89 // The function consumes the log records that come from the frontend
consume(logging::record_view const & rec)90 void stat_collector::consume(logging::record_view const& rec)
91 {
92     // Accumulate statistical readings
93     if (rec.attribute_values().count("Connected"))
94         ++m_active_connections;
95     else if (rec.attribute_values().count("Disconnected"))
96         --m_active_connections;
97     else
98     {
99         namespace phoenix = boost::phoenix;
100         logging::visit(sent, rec, phoenix::ref(m_sent_bytes) += phoenix::placeholders::_1);
101         logging::visit(received, rec, phoenix::ref(m_received_bytes) += phoenix::placeholders::_1);
102     }
103     ++m_collected_count;
104 
105     // Check if it's time to write the accumulated data to the file
106     boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time();
107     if (now - m_last_store_time >= boost::posix_time::minutes(1))
108     {
109         write_data();
110         m_last_store_time = now;
111     }
112 }
113 
114 // The function writes the collected data to the file
write_data()115 void stat_collector::write_data()
116 {
117     m_csv_file << m_active_connections
118         << ',' << m_sent_bytes
119         << ',' << m_received_bytes
120         << std::endl;
121     reset_accumulators();
122 }
123 
124 // The function resets statistical accumulators to initial values
reset_accumulators()125 void stat_collector::reset_accumulators()
126 {
127     m_sent_bytes = m_received_bytes = 0;
128     m_collected_count = 0;
129 }
130 //]
131 
132 //[ example_extension_stat_collector_flush
133 // The function flushes the file
flush()134 void stat_collector::flush()
135 {
136     // Store any data that may have been collected since the list write to the file
137     if (m_collected_count > 0)
138     {
139         write_data();
140         m_last_store_time = boost::posix_time::microsec_clock::universal_time();
141     }
142 
143     m_csv_file.flush();
144 }
145 //]
146 
147 // Complete sink type
148 typedef sinks::synchronous_sink< stat_collector > sink_t;
149 
init_logging()150 void init_logging()
151 {
152     boost::shared_ptr< logging::core > core = logging::core::get();
153 
154     boost::shared_ptr< stat_collector > backend(new stat_collector("stat.csv"));
155     boost::shared_ptr< sink_t > sink(new sink_t(backend));
156     core->add_sink(sink);
157 }
158 
main(int,char * [])159 int main(int, char*[])
160 {
161     init_logging();
162 
163     src::logger lg;
164     BOOST_LOG(lg) << logging::add_value("Connected", true);
165     BOOST_LOG(lg) << logging::add_value("Sent", 100u);
166     BOOST_LOG(lg) << logging::add_value("Received", 200u);
167 
168     logging::core::get()->flush();
169 
170     return 0;
171 }
172