1 /*
2    Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 #include "BufferedLogHandler.hpp"
26 
27 struct ThreadData
28 {
29   BufferedLogHandler* buf_loghandler;
30 };
31 
async_log_function(void * args)32 void* async_log_function(void* args)
33 {
34   ThreadData* data = (ThreadData*)args;
35   BufferedLogHandler* buf_loghandler = data->buf_loghandler;
36 
37 
38   while (!buf_loghandler->isStopSet())
39   {
40     buf_loghandler->writeToDestLogHandler();
41   }
42 
43   // print left over messages, if any
44   while (buf_loghandler->writeToDestLogHandler());
45 
46   // print lost count in the end, if any
47   buf_loghandler->writeLostMsgDestLogHandler();
48 
49   delete data;
50 
51   return NULL;
52 }
53 
BufferedLogHandler(LogHandler * dest_loghandler)54 BufferedLogHandler::BufferedLogHandler(LogHandler* dest_loghandler)
55  : LogHandler(), m_dest_loghandler(dest_loghandler),
56    m_log_threadvar(NULL), m_stop_logging(false)
57 {
58   m_logbuf = new LogBuffer(32768, new MessageStreamLostMsgHandler()); // 32kB
59   ThreadData *thr_data = new ThreadData();
60   thr_data->buf_loghandler = this;
61 
62   m_log_threadvar = NdbThread_Create(async_log_function,
63                    (void**)thr_data,
64                    0,
65                    (char*)"async_local_log_thread",
66                    NDB_THREAD_PRIO_MEAN);
67   if (m_log_threadvar == NULL)
68   {
69     abort();
70   }
71 }
72 
~BufferedLogHandler()73 BufferedLogHandler::~BufferedLogHandler()
74 {
75   m_stop_logging = true;
76   m_logbuf->stop();
77   NdbThread_WaitFor(m_log_threadvar, NULL);
78   NdbThread_Destroy(&m_log_threadvar);
79   delete m_logbuf;
80 }
81 
82 bool
open()83 BufferedLogHandler::open()
84 {
85   return true;
86 }
87 
88 bool
close()89 BufferedLogHandler::close()
90 {
91   return true;
92 }
93 
94 bool
is_open()95 BufferedLogHandler::is_open()
96 {
97   if (m_log_threadvar == NULL)
98   {
99     return false;
100   }
101   return true;
102 }
103 
104 //
105 // PROTECTED
106 //
107 void
writeHeader(const char * pCategory,Logger::LoggerLevel level,time_t now)108 BufferedLogHandler::writeHeader(const char* pCategory,
109                                 Logger::LoggerLevel level,
110                                 time_t now)
111 {
112   /**
113    * Add log level, timestamp, category length to m_log_fixedpart and
114    * category to m_log_varpart.
115    */
116   m_log_fixedpart.level = level;
117   m_log_fixedpart.log_timestamp = now;
118 
119   size_t pCategory_len = strlen(pCategory);
120   m_log_fixedpart.varpart_length[0] = pCategory_len;
121   memcpy(m_log_varpart, pCategory, pCategory_len);
122 }
123 
124 void
writeMessage(const char * pMsg)125 BufferedLogHandler::writeMessage(const char* pMsg)
126 {
127   // add message length to m_log_fixedpart and the message to m_log_varpart
128   size_t pMsg_len = strlen(pMsg);
129 
130   m_log_fixedpart.varpart_length[1] = pMsg_len;
131   memcpy(m_log_varpart + m_log_fixedpart.varpart_length[0], pMsg, pMsg_len);
132 }
133 
134 void
writeFooter()135 BufferedLogHandler::writeFooter()
136 {
137   // add the LogHandler::append() parameters to the log buffer
138   /**
139    * LogBuffer contents:
140    * ([log-fixed-part] [log-var-part])*
141    */
142   size_t total_log_size = sizeof(LogMessageFixedPart) +
143       m_log_fixedpart.varpart_length[0] +
144       m_log_fixedpart.varpart_length[1];
145 
146   memcpy(m_to_append, &m_log_fixedpart, sizeof(LogMessageFixedPart));
147   memcpy(m_to_append + sizeof(LogMessageFixedPart), m_log_varpart,
148          m_log_fixedpart.varpart_length[0] +
149          m_log_fixedpart.varpart_length[1]);
150 
151   m_logbuf->append((void*)&m_to_append, total_log_size);
152 }
153 
154 bool
isStopSet()155 BufferedLogHandler::isStopSet()
156 {
157   return m_stop_logging;
158 }
159 
160 bool
setParam(const BaseString & param,const BaseString & value)161 BufferedLogHandler::setParam(const BaseString &param, const BaseString &value)
162 {
163   return true;
164 }
165 
166 bool
writeToDestLogHandler()167 BufferedLogHandler::writeToDestLogHandler()
168 {
169   char category[LogHandler::MAX_HEADER_LENGTH + 1];
170   char msg[MAX_LOG_MESSAGE_SIZE + 1];
171   LogMessageFixedPart log_fixed_part;
172 
173   if (m_logbuf->get((char*)&log_fixed_part, sizeof(LogMessageFixedPart)) != 0)
174   {
175     assert(log_fixed_part.varpart_length[0] <= LogHandler::MAX_HEADER_LENGTH);
176     assert(log_fixed_part.varpart_length[1] <= MAX_LOG_MESSAGE_SIZE);
177     m_logbuf->get(category, log_fixed_part.varpart_length[0]);
178     m_logbuf->get(msg, log_fixed_part.varpart_length[1]);
179     category[log_fixed_part.varpart_length[0]] = '\0';
180     msg[log_fixed_part.varpart_length[1]] = '\0';
181 
182     m_dest_loghandler->append(category, log_fixed_part.level, msg,
183                             log_fixed_part.log_timestamp);
184     return true;
185   }
186   return false;
187 }
188 
189 void
writeLostMsgDestLogHandler()190 BufferedLogHandler::writeLostMsgDestLogHandler()
191 {
192   size_t lost_count = m_logbuf->getLostCount();
193   char category[LogHandler::MAX_HEADER_LENGTH + 1];
194   char msg[MAX_LOG_MESSAGE_SIZE + 1];
195 
196   if (lost_count)
197   {
198     strcpy(category, "MgmtSrvr");
199     Logger::LoggerLevel level = Logger::LL_INFO;
200     BaseString::snprintf(msg, MAX_LOG_MESSAGE_SIZE,
201                          "*** %lu MESSAGES LOST ***",
202                          (unsigned long)lost_count);
203     time_t now = ::time((time_t*)NULL);
204     m_dest_loghandler->append(category, level, msg, now);
205   }
206 }
207 
208 size_t
getSizeOfLostMsg(size_t lost_bytes,size_t lost_msgs)209 MessageStreamLostMsgHandler::getSizeOfLostMsg(size_t lost_bytes, size_t lost_msgs)
210 {
211   size_t lost_msg_len = sizeof(BufferedLogHandler::LogMessageFixedPart) +
212       snprintf(NULL, 0, m_lost_msg_fmt, lost_msgs) +
213       strlen(m_category);
214   return lost_msg_len;
215 }
216 
217 bool
writeLostMsg(char * buf,size_t buf_size,size_t lost_bytes,size_t lost_msgs)218 MessageStreamLostMsgHandler::writeLostMsg(char* buf, size_t buf_size, size_t lost_bytes, size_t lost_msgs)
219 {
220   BufferedLogHandler::LogMessageFixedPart lost_message_fixedpart;
221   lost_message_fixedpart.level = Logger::LL_DEBUG;
222   lost_message_fixedpart.log_timestamp = time((time_t*)NULL);
223 
224   lost_message_fixedpart.varpart_length[0] = strlen(m_category);
225   lost_message_fixedpart.varpart_length[1] =
226       snprintf(NULL, 0, m_lost_msg_fmt, lost_msgs);
227 
228   const size_t sz_fixedpart = sizeof(lost_message_fixedpart);
229   memcpy(buf, &lost_message_fixedpart, sz_fixedpart);
230   memcpy(buf + sz_fixedpart, m_category, strlen(m_category));
231   snprintf(buf + sz_fixedpart + strlen(m_category),
232                       lost_message_fixedpart.varpart_length[1],
233                       m_lost_msg_fmt, lost_msgs);
234   return true;
235 }
236