1 #include <ctime>
2 #include <cctype>
3 
4 #include "core/RoleFactory.h"
5 #include "config/constraints.h"
6 #include "EventLogger.h"
7 #include "util/EventSender.h"
8 
9 #include "msgpack_decode.h"
10 
11 static RoleConfigGroup el_config("eventlogger");
12 static ConfigVariable<std::string> bind_addr("bind", "0.0.0.0:7197", el_config);
13 static ConfigVariable<std::string> output_format("output", "events-%Y%m%d-%H%M%S.log", el_config);
14 static ConfigVariable<std::string> rotate_interval("rotate_interval", "0", el_config);
15 static ValidAddressConstraint valid_bind_addr(bind_addr);
16 
EventLogger(RoleConfig roleconfig)17 EventLogger::EventLogger(RoleConfig roleconfig) : Role(roleconfig),
18     m_log("eventlogger", "Event Logger"), m_file(nullptr)
19 {
20     bind(bind_addr.get_rval(roleconfig));
21 
22     m_file_format = output_format.get_rval(roleconfig);
23     open_log();
24 
25     LoggedEvent event("log-opened", "EventLogger");
26     event.add("msg", "Log opened upon Event Logger startup.");
27     process_packet(event.make_datagram(), m_local);
28 }
29 
bind(const std::string & addr)30 void EventLogger::bind(const std::string &addr)
31 {
32     // We have to make sure our bind happens within the context of the main thread.
33     assert(std::this_thread::get_id() == g_main_thread_id);
34 
35     m_log.info() << "Opening UDP socket..." << std::endl;
36     auto addresses = resolve_address(addr, 7197, g_loop);
37 
38     if(addresses.size() == 0) {
39         m_log.fatal() << "Failed to bind to EventLogger address " << addr << "\n";
40         exit(1);
41     }
42 
43     m_local = addresses.front();
44 
45     m_socket = g_loop->resource<uvw::UDPHandle>();
46     m_socket->bind(m_local);
47     start_receive();
48 }
49 
open_log()50 void EventLogger::open_log()
51 {
52     time_t rawtime;
53     time(&rawtime);
54 
55     char filename[1024];
56     strftime(filename, 1024, m_file_format.c_str(), localtime(&rawtime));
57     m_log.debug() << "New log filename: " << filename << std::endl;
58 
59     if(m_file) {
60         m_file->close();
61     }
62 
63     m_file.reset(new std::ofstream(filename));
64 
65     m_log.info() << "Opened new log." << std::endl;
66 }
67 
cycle_log()68 void EventLogger::cycle_log()
69 {
70     open_log();
71 
72     LoggedEvent event("log-opened", "EventLogger");
73     event.add("msg", "Log cycled.");
74     process_packet(event.make_datagram(), m_local);
75 }
76 
process_packet(DatagramHandle dg,const uvw::Addr & sender)77 void EventLogger::process_packet(DatagramHandle dg, const uvw::Addr& sender)
78 {
79     DatagramIterator dgi(dg);
80     std::stringstream stream;
81 
82     try {
83         msgpack_decode(stream, dgi);
84     } catch(const DatagramIteratorEOF&) {
85         m_log.error() << "Received truncated packet from "
86                       << sender.ip << ":" << sender.port << std::endl;
87         return;
88     }
89 
90     if(dgi.tell() != dg->size()) {
91         m_log.error() << "Received packet with extraneous data from "
92                       << sender.ip << ":" << sender.port << std::endl;
93         return;
94     }
95 
96     std::string data = stream.str();
97     m_log.trace() << "Received: " << data << std::endl;
98 
99     // This is a little bit of a kludge, but we should make sure we got a
100     // MessagePack map as the event log element, and not some other type. The
101     // easiest way to do this is to make sure that the JSON representation
102     // begins with {
103     if(data[0] != '{') {
104         m_log.error() << "Received non-map event log from "
105                       << sender.ip << ":" << sender.port
106                       << ": " << data << std::endl;
107         return;
108     }
109 
110     // Now let's insert our timestamp:
111     time_t rawtime;
112     time(&rawtime);
113 
114     char timestamp[64];
115     strftime(timestamp, 64, "{\"_time\": \"%Y-%m-%d %H:%M:%S%z\", ", localtime(&rawtime));
116 
117     *m_file.get() << timestamp << data.substr(1) << std::endl;
118 }
119 
start_receive()120 void EventLogger::start_receive()
121 {
122     m_socket->on<uvw::UDPDataEvent>([this](const uvw::UDPDataEvent &event, uvw::UDPHandle &) {
123         m_log.trace() << "Got packet from " << event.sender.ip
124                       << ":" << event.sender.port << ".\n";
125 
126         DatagramPtr dg = Datagram::create(reinterpret_cast<const uint8_t*>(event.data.get()), event.length);
127         process_packet(dg, event.sender);
128     });
129 
130     m_socket->recv();
131 }
132 
133 static RoleFactoryItem<EventLogger> el_fact("eventlogger");
134