1 /* Copyright (c) 2014 Percona LLC and/or its affiliates. All rights reserved.
2 
3    This program is free software; you can redistribute it and/or
4    modify it under the terms of the GNU General Public License
5    as published by the Free Software Foundation; version 2 of
6    the License.
7 
8    This program is distributed in the hope that it will be useful,
9    but WITHOUT ANY WARRANTY; without even the implied warranty of
10    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11    GNU General Public License for more details.
12 
13    You should have received a copy of the GNU General Public License
14    along with this program; if not, write to the Free Software
15    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
16 
17 #include "buffer.h"
18 
19 #include <my_pthread.h>
20 #include <my_sys.h>
21 #include "audit_log.h"
22 #include <my_atomic.h>
23 
24 struct audit_log_buffer {
25   char *buf;
26   size_t size;
27   size_t write_pos;
28   size_t flush_pos;
29   pthread_t flush_worker_thread;
30   int stop;
31   int drop_if_full;
32   void *write_func_data;
33   audit_log_write_func write_func;
34   mysql_mutex_t mutex;
35   mysql_cond_t flushed_cond;
36   mysql_cond_t written_cond;
37   log_record_state_t state;
38 };
39 
40 #if defined(HAVE_PSI_INTERFACE)
41 /* These belong to the service initialization */
42 static PSI_mutex_key key_log_mutex;
43 static PSI_mutex_info mutex_key_list[]=
44 {{ &key_log_mutex, "audit_log_buffer::mutex", PSI_FLAG_GLOBAL}};
45 
46 static PSI_cond_key key_log_written_cond, key_log_flushed_cond;
47 static PSI_cond_info cond_key_list[]=
48 {{ &key_log_written_cond, "audit_log_buffer::written_cond", PSI_FLAG_GLOBAL },
49  { &key_log_flushed_cond, "audit_log_buffer::flushed_cond", PSI_FLAG_GLOBAL }};
50 
51 #endif
52 
53 #ifndef min
54 #define min(a,b) (((a)<(b))?(a):(b))
55 #endif
56 
57 
58 static
audit_log_flush(audit_log_buffer_t * log)59 void audit_log_flush(audit_log_buffer_t *log)
60 {
61   mysql_mutex_lock(&log->mutex);
62   while (log->flush_pos == log->write_pos)
63   {
64     struct timespec abstime;
65     if (log->stop)
66     {
67       mysql_mutex_unlock(&log->mutex);
68       return;
69     }
70     set_timespec(abstime, 1);
71     mysql_cond_timedwait(&log->written_cond, &log->mutex, &abstime);
72   }
73 
74   if (log->flush_pos >= log->write_pos % log->size)
75   {
76     log->state= LOG_RECORD_INCOMPLETE;
77     mysql_mutex_unlock(&log->mutex);
78     log->write_func(log->write_func_data,
79                     log->buf + log->flush_pos,
80                     log->size - log->flush_pos,
81                     LOG_RECORD_INCOMPLETE);
82     mysql_mutex_lock(&log->mutex);
83     log->flush_pos= 0;
84     log->write_pos%= log->size;
85   }
86   else
87   {
88     size_t flushlen= log->write_pos - log->flush_pos;
89     mysql_mutex_unlock(&log->mutex);
90     log->write_func(log->write_func_data,
91                     log->buf + log->flush_pos, flushlen,
92                     LOG_RECORD_COMPLETE);
93     mysql_mutex_lock(&log->mutex);
94     log->flush_pos+= flushlen;
95     log->state= LOG_RECORD_COMPLETE;
96   }
97   DBUG_ASSERT(log->write_pos >= log->flush_pos);
98   mysql_cond_broadcast(&log->flushed_cond);
99   mysql_mutex_unlock(&log->mutex);
100 }
101 
102 
103 static
audit_log_flush_worker(void * arg)104 void *audit_log_flush_worker(void *arg)
105 {
106   audit_log_buffer_t *log= (audit_log_buffer_t*) arg;
107 
108   my_thread_init();
109   while (!(log->stop && log->flush_pos == log->write_pos))
110   {
111     audit_log_flush(log);
112   }
113   my_thread_end();
114 
115   return NULL;
116 }
117 
118 
audit_log_buffer_init(size_t size,int drop_if_full,audit_log_write_func write_func,void * data)119 audit_log_buffer_t *audit_log_buffer_init(size_t size, int drop_if_full,
120                                  audit_log_write_func write_func, void *data)
121 {
122   audit_log_buffer_t *log= (audit_log_buffer_t*)
123                                  calloc(sizeof(audit_log_buffer_t) + size, 1);
124 
125 #ifdef HAVE_PSI_INTERFACE
126   mysql_mutex_register(AUDIT_LOG_PSI_CATEGORY,
127                        mutex_key_list, array_elements(mutex_key_list));
128   mysql_cond_register(AUDIT_LOG_PSI_CATEGORY,
129                       cond_key_list, array_elements(cond_key_list));
130 #endif /* HAVE_PSI_INTERFACE */
131 
132   if (log != NULL)
133   {
134     log->buf= ((char*) log + sizeof(audit_log_buffer_t));
135     log->drop_if_full= drop_if_full;
136     log->write_func= write_func;
137     log->write_func_data= data;
138     log->size= size;
139     log->state= LOG_RECORD_COMPLETE;
140 
141     mysql_mutex_init(key_log_mutex, &log->mutex, MY_MUTEX_INIT_FAST);
142     mysql_cond_init(key_log_flushed_cond, &log->flushed_cond, NULL);
143     mysql_cond_init(key_log_written_cond, &log->written_cond, NULL);
144     pthread_create(&log->flush_worker_thread, NULL,
145                             audit_log_flush_worker, log);
146 
147   }
148 
149   return log;
150 }
151 
152 
audit_log_buffer_shutdown(audit_log_buffer_t * log)153 void audit_log_buffer_shutdown(audit_log_buffer_t *log)
154 {
155   log->stop= TRUE;
156 
157   pthread_join(log->flush_worker_thread, NULL);
158   mysql_cond_destroy(&log->flushed_cond);
159   mysql_cond_destroy(&log->written_cond);
160   mysql_mutex_destroy(&log->mutex);
161 
162   free(log);
163 }
164 
165 
audit_log_buffer_pause(audit_log_buffer_t * log)166 void audit_log_buffer_pause(audit_log_buffer_t *log)
167 {
168   mysql_mutex_lock(&log->mutex);
169   while (log->state == LOG_RECORD_INCOMPLETE)
170   {
171     mysql_cond_wait(&log->flushed_cond, &log->mutex);
172   }
173 }
174 
175 
audit_log_buffer_resume(audit_log_buffer_t * log)176 void audit_log_buffer_resume(audit_log_buffer_t *log)
177 {
178   mysql_mutex_unlock(&log->mutex);
179 }
180 
181 
audit_log_buffer_write(audit_log_buffer_t * log,const char * buf,size_t len)182 int audit_log_buffer_write(audit_log_buffer_t *log, const char *buf, size_t len)
183 {
184   if (len > log->size)
185   {
186     if (!log->drop_if_full)
187     {
188       /* pause flushing thread and write out one record bypassing the buffer */
189       audit_log_buffer_pause(log);
190       log->write_func(log->write_func_data, buf, len, LOG_RECORD_COMPLETE);
191       audit_log_buffer_resume(log);
192     }
193     my_atomic_add64(&audit_log_buffer_size_overflow, (int64)1);
194     return(0);
195   }
196 
197   mysql_mutex_lock(&log->mutex);
198 loop:
199   if (log->write_pos + len <= log->flush_pos + log->size)
200   {
201     size_t wrlen= min(len, log->size -
202                               (log->write_pos % log->size));
203     memcpy(log->buf + (log->write_pos % log->size), buf, wrlen);
204     if (wrlen < len)
205       memcpy(log->buf, buf + wrlen, len - wrlen);
206     log->write_pos= log->write_pos + len;
207     DBUG_ASSERT(log->write_pos >= log->flush_pos);
208   }
209   else
210   {
211     if (!log->drop_if_full)
212     {
213       mysql_cond_wait(&log->flushed_cond, &log->mutex);
214       goto loop;
215     }
216   }
217   if (log->write_pos > log->flush_pos + log->size / 2)
218   {
219     mysql_cond_signal(&log->written_cond);
220   }
221   mysql_mutex_unlock(&log->mutex);
222 
223   return(0);
224 }
225