1 /*
2 Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25 #include "BufferedLogHandler.hpp"
26
27 struct ThreadData
28 {
29 BufferedLogHandler* buf_loghandler;
30 };
31
async_log_function(void * args)32 void* async_log_function(void* args)
33 {
34 ThreadData* data = (ThreadData*)args;
35 BufferedLogHandler* buf_loghandler = data->buf_loghandler;
36
37
38 while (!buf_loghandler->isStopSet())
39 {
40 buf_loghandler->writeToDestLogHandler();
41 }
42
43 // print left over messages, if any
44 while (buf_loghandler->writeToDestLogHandler());
45
46 // print lost count in the end, if any
47 buf_loghandler->writeLostMsgDestLogHandler();
48
49 delete data;
50
51 return NULL;
52 }
53
BufferedLogHandler(LogHandler * dest_loghandler)54 BufferedLogHandler::BufferedLogHandler(LogHandler* dest_loghandler)
55 : LogHandler(), m_dest_loghandler(dest_loghandler),
56 m_log_threadvar(NULL), m_stop_logging(false)
57 {
58 m_logbuf = new LogBuffer(32768, new MessageStreamLostMsgHandler()); // 32kB
59 ThreadData *thr_data = new ThreadData();
60 thr_data->buf_loghandler = this;
61
62 m_log_threadvar = NdbThread_Create(async_log_function,
63 (void**)thr_data,
64 0,
65 (char*)"async_local_log_thread",
66 NDB_THREAD_PRIO_MEAN);
67 if (m_log_threadvar == NULL)
68 {
69 abort();
70 }
71 }
72
~BufferedLogHandler()73 BufferedLogHandler::~BufferedLogHandler()
74 {
75 m_stop_logging = true;
76 m_logbuf->stop();
77 NdbThread_WaitFor(m_log_threadvar, NULL);
78 NdbThread_Destroy(&m_log_threadvar);
79 delete m_logbuf;
80 }
81
82 bool
open()83 BufferedLogHandler::open()
84 {
85 return true;
86 }
87
88 bool
close()89 BufferedLogHandler::close()
90 {
91 return true;
92 }
93
94 bool
is_open()95 BufferedLogHandler::is_open()
96 {
97 if (m_log_threadvar == NULL)
98 {
99 return false;
100 }
101 return true;
102 }
103
104 //
105 // PROTECTED
106 //
107 void
writeHeader(const char * pCategory,Logger::LoggerLevel level,time_t now)108 BufferedLogHandler::writeHeader(const char* pCategory,
109 Logger::LoggerLevel level,
110 time_t now)
111 {
112 /**
113 * Add log level, timestamp, category length to m_log_fixedpart and
114 * category to m_log_varpart.
115 */
116 m_log_fixedpart.level = level;
117 m_log_fixedpart.log_timestamp = now;
118
119 size_t pCategory_len = strlen(pCategory);
120 m_log_fixedpart.varpart_length[0] = pCategory_len;
121 memcpy(m_log_varpart, pCategory, pCategory_len);
122 }
123
124 void
writeMessage(const char * pMsg)125 BufferedLogHandler::writeMessage(const char* pMsg)
126 {
127 // add message length to m_log_fixedpart and the message to m_log_varpart
128 size_t pMsg_len = strlen(pMsg);
129
130 m_log_fixedpart.varpart_length[1] = pMsg_len;
131 memcpy(m_log_varpart + m_log_fixedpart.varpart_length[0], pMsg, pMsg_len);
132 }
133
134 void
writeFooter()135 BufferedLogHandler::writeFooter()
136 {
137 // add the LogHandler::append() parameters to the log buffer
138 /**
139 * LogBuffer contents:
140 * ([log-fixed-part] [log-var-part])*
141 */
142 size_t total_log_size = sizeof(LogMessageFixedPart) +
143 m_log_fixedpart.varpart_length[0] +
144 m_log_fixedpart.varpart_length[1];
145
146 memcpy(m_to_append, &m_log_fixedpart, sizeof(LogMessageFixedPart));
147 memcpy(m_to_append + sizeof(LogMessageFixedPart), m_log_varpart,
148 m_log_fixedpart.varpart_length[0] +
149 m_log_fixedpart.varpart_length[1]);
150
151 m_logbuf->append((void*)&m_to_append, total_log_size);
152 }
153
154 bool
isStopSet()155 BufferedLogHandler::isStopSet()
156 {
157 return m_stop_logging;
158 }
159
160 bool
setParam(const BaseString & param,const BaseString & value)161 BufferedLogHandler::setParam(const BaseString ¶m, const BaseString &value)
162 {
163 return true;
164 }
165
166 bool
writeToDestLogHandler()167 BufferedLogHandler::writeToDestLogHandler()
168 {
169 char category[LogHandler::MAX_HEADER_LENGTH + 1];
170 char msg[MAX_LOG_MESSAGE_SIZE + 1];
171 LogMessageFixedPart log_fixed_part;
172
173 if (m_logbuf->get((char*)&log_fixed_part, sizeof(LogMessageFixedPart)) != 0)
174 {
175 assert(log_fixed_part.varpart_length[0] <= LogHandler::MAX_HEADER_LENGTH);
176 assert(log_fixed_part.varpart_length[1] <= MAX_LOG_MESSAGE_SIZE);
177 m_logbuf->get(category, log_fixed_part.varpart_length[0]);
178 m_logbuf->get(msg, log_fixed_part.varpart_length[1]);
179 category[log_fixed_part.varpart_length[0]] = '\0';
180 msg[log_fixed_part.varpart_length[1]] = '\0';
181
182 m_dest_loghandler->append(category, log_fixed_part.level, msg,
183 log_fixed_part.log_timestamp);
184 return true;
185 }
186 return false;
187 }
188
189 void
writeLostMsgDestLogHandler()190 BufferedLogHandler::writeLostMsgDestLogHandler()
191 {
192 size_t lost_count = m_logbuf->getLostCount();
193 char category[LogHandler::MAX_HEADER_LENGTH + 1];
194 char msg[MAX_LOG_MESSAGE_SIZE + 1];
195
196 if (lost_count)
197 {
198 strcpy(category, "MgmtSrvr");
199 Logger::LoggerLevel level = Logger::LL_INFO;
200 BaseString::snprintf(msg, MAX_LOG_MESSAGE_SIZE,
201 "*** %lu MESSAGES LOST ***",
202 (unsigned long)lost_count);
203 time_t now = ::time((time_t*)NULL);
204 m_dest_loghandler->append(category, level, msg, now);
205 }
206 }
207
208 size_t
getSizeOfLostMsg(size_t lost_bytes,size_t lost_msgs)209 MessageStreamLostMsgHandler::getSizeOfLostMsg(size_t lost_bytes, size_t lost_msgs)
210 {
211 size_t lost_msg_len = sizeof(BufferedLogHandler::LogMessageFixedPart) +
212 snprintf(NULL, 0, m_lost_msg_fmt, lost_msgs) +
213 strlen(m_category);
214 return lost_msg_len;
215 }
216
217 bool
writeLostMsg(char * buf,size_t buf_size,size_t lost_bytes,size_t lost_msgs)218 MessageStreamLostMsgHandler::writeLostMsg(char* buf, size_t buf_size, size_t lost_bytes, size_t lost_msgs)
219 {
220 BufferedLogHandler::LogMessageFixedPart lost_message_fixedpart;
221 lost_message_fixedpart.level = Logger::LL_DEBUG;
222 lost_message_fixedpart.log_timestamp = time((time_t*)NULL);
223
224 lost_message_fixedpart.varpart_length[0] = strlen(m_category);
225 lost_message_fixedpart.varpart_length[1] =
226 snprintf(NULL, 0, m_lost_msg_fmt, lost_msgs);
227
228 const size_t sz_fixedpart = sizeof(lost_message_fixedpart);
229 memcpy(buf, &lost_message_fixedpart, sz_fixedpart);
230 memcpy(buf + sz_fixedpart, m_category, strlen(m_category));
231 snprintf(buf + sz_fixedpart + strlen(m_category),
232 lost_message_fixedpart.varpart_length[1],
233 m_lost_msg_fmt, lost_msgs);
234 return true;
235 }
236