1 /* Copyright (c) 2014 Percona LLC and/or its affiliates. All rights reserved.
2
3 This program is free software; you can redistribute it and/or
4 modify it under the terms of the GNU General Public License
5 as published by the Free Software Foundation; version 2 of
6 the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
16
17 #include "buffer.h"
18
19 #include <my_pthread.h>
20 #include <my_sys.h>
21 #include "audit_log.h"
22 #include <my_atomic.h>
23
24 struct audit_log_buffer {
25 char *buf;
26 size_t size;
27 size_t write_pos;
28 size_t flush_pos;
29 pthread_t flush_worker_thread;
30 int stop;
31 int drop_if_full;
32 void *write_func_data;
33 audit_log_write_func write_func;
34 mysql_mutex_t mutex;
35 mysql_cond_t flushed_cond;
36 mysql_cond_t written_cond;
37 log_record_state_t state;
38 };
39
#ifdef HAVE_PSI_INTERFACE
/*
  PSI instrumentation keys for the buffer mutex and condition variables.
  The lists are registered under AUDIT_LOG_PSI_CATEGORY by
  audit_log_buffer_init().
*/
static PSI_mutex_key key_log_mutex;
static PSI_mutex_info mutex_key_list[]=
{
  { &key_log_mutex, "audit_log_buffer::mutex", PSI_FLAG_GLOBAL }
};

static PSI_cond_key key_log_written_cond;
static PSI_cond_key key_log_flushed_cond;
static PSI_cond_info cond_key_list[]=
{
  { &key_log_written_cond, "audit_log_buffer::written_cond", PSI_FLAG_GLOBAL },
  { &key_log_flushed_cond, "audit_log_buffer::flushed_cond", PSI_FLAG_GLOBAL }
};

#endif /* HAVE_PSI_INTERFACE */
52
#ifndef min
/*
  Smaller of two values. The selected argument is evaluated twice, so
  both arguments must be free of side effects.
*/
#define min(a, b) ((a) < (b) ? (a) : (b))
#endif
56
57
58 static
audit_log_flush(audit_log_buffer_t * log)59 void audit_log_flush(audit_log_buffer_t *log)
60 {
61 mysql_mutex_lock(&log->mutex);
62 while (log->flush_pos == log->write_pos)
63 {
64 struct timespec abstime;
65 if (log->stop)
66 {
67 mysql_mutex_unlock(&log->mutex);
68 return;
69 }
70 set_timespec(abstime, 1);
71 mysql_cond_timedwait(&log->written_cond, &log->mutex, &abstime);
72 }
73
74 if (log->flush_pos >= log->write_pos % log->size)
75 {
76 log->state= LOG_RECORD_INCOMPLETE;
77 mysql_mutex_unlock(&log->mutex);
78 log->write_func(log->write_func_data,
79 log->buf + log->flush_pos,
80 log->size - log->flush_pos,
81 LOG_RECORD_INCOMPLETE);
82 mysql_mutex_lock(&log->mutex);
83 log->flush_pos= 0;
84 log->write_pos%= log->size;
85 }
86 else
87 {
88 size_t flushlen= log->write_pos - log->flush_pos;
89 mysql_mutex_unlock(&log->mutex);
90 log->write_func(log->write_func_data,
91 log->buf + log->flush_pos, flushlen,
92 LOG_RECORD_COMPLETE);
93 mysql_mutex_lock(&log->mutex);
94 log->flush_pos+= flushlen;
95 log->state= LOG_RECORD_COMPLETE;
96 }
97 DBUG_ASSERT(log->write_pos >= log->flush_pos);
98 mysql_cond_broadcast(&log->flushed_cond);
99 mysql_mutex_unlock(&log->mutex);
100 }
101
102
103 static
audit_log_flush_worker(void * arg)104 void *audit_log_flush_worker(void *arg)
105 {
106 audit_log_buffer_t *log= (audit_log_buffer_t*) arg;
107
108 my_thread_init();
109 while (!(log->stop && log->flush_pos == log->write_pos))
110 {
111 audit_log_flush(log);
112 }
113 my_thread_end();
114
115 return NULL;
116 }
117
118
audit_log_buffer_init(size_t size,int drop_if_full,audit_log_write_func write_func,void * data)119 audit_log_buffer_t *audit_log_buffer_init(size_t size, int drop_if_full,
120 audit_log_write_func write_func, void *data)
121 {
122 audit_log_buffer_t *log= (audit_log_buffer_t*)
123 calloc(sizeof(audit_log_buffer_t) + size, 1);
124
125 #ifdef HAVE_PSI_INTERFACE
126 mysql_mutex_register(AUDIT_LOG_PSI_CATEGORY,
127 mutex_key_list, array_elements(mutex_key_list));
128 mysql_cond_register(AUDIT_LOG_PSI_CATEGORY,
129 cond_key_list, array_elements(cond_key_list));
130 #endif /* HAVE_PSI_INTERFACE */
131
132 if (log != NULL)
133 {
134 log->buf= ((char*) log + sizeof(audit_log_buffer_t));
135 log->drop_if_full= drop_if_full;
136 log->write_func= write_func;
137 log->write_func_data= data;
138 log->size= size;
139 log->state= LOG_RECORD_COMPLETE;
140
141 mysql_mutex_init(key_log_mutex, &log->mutex, MY_MUTEX_INIT_FAST);
142 mysql_cond_init(key_log_flushed_cond, &log->flushed_cond, NULL);
143 mysql_cond_init(key_log_written_cond, &log->written_cond, NULL);
144 pthread_create(&log->flush_worker_thread, NULL,
145 audit_log_flush_worker, log);
146
147 }
148
149 return log;
150 }
151
152
/*
  Stop the flush thread and release the buffer.

  The flush thread writes out whatever is still buffered before it exits;
  this function waits for that, then destroys the synchronization objects
  and frees the allocation made by audit_log_buffer_init().

  @param log  buffer to shut down; NULL is accepted and ignored
*/
void audit_log_buffer_shutdown(audit_log_buffer_t *log)
{
  if (log == NULL)
    return;

  /* Raise the flag under the mutex and wake the flusher, so that it sees
     the request at once instead of after its timed wait expires. */
  mysql_mutex_lock(&log->mutex);
  log->stop= TRUE;
  mysql_cond_signal(&log->written_cond);
  mysql_mutex_unlock(&log->mutex);

  pthread_join(log->flush_worker_thread, NULL);
  mysql_cond_destroy(&log->flushed_cond);
  mysql_cond_destroy(&log->written_cond);
  mysql_mutex_destroy(&log->mutex);

  free(log);
}
164
165
/*
  Take the buffer mutex once the buffer state is no longer
  LOG_RECORD_INCOMPLETE, waiting on flushed_cond in the meantime.

  Returns with log->mutex held; the caller releases it with
  audit_log_buffer_resume().
*/
void audit_log_buffer_pause(audit_log_buffer_t *log)
{
  mysql_mutex_lock(&log->mutex);
  for (;;)
  {
    if (log->state != LOG_RECORD_INCOMPLETE)
      break;
    mysql_cond_wait(&log->flushed_cond, &log->mutex);
  }
}
174
175
/*
  Counterpart of audit_log_buffer_pause(): releases the buffer mutex that
  the pause call left locked.
*/
void audit_log_buffer_resume(audit_log_buffer_t *log)
{
  mysql_mutex_unlock(&log->mutex);
}
180
181
/*
  Append one record to the buffer.

  A record larger than the whole buffer is counted in
  audit_log_buffer_size_overflow and, unless drop_if_full is set, written
  directly through write_func while the buffer is paused. A record that
  does not currently fit is discarded when drop_if_full is set; otherwise
  the caller waits on flushed_cond until the flusher has made room.

  @param log  target buffer
  @param buf  record data
  @param len  number of bytes in buf

  @return always 0
*/
int audit_log_buffer_write(audit_log_buffer_t *log, const char *buf, size_t len)
{
  if (len > log->size)
  {
    /* The record can never fit into the buffer. */
    if (!log->drop_if_full)
    {
      /* hold off the flush thread and emit the record around the buffer */
      audit_log_buffer_pause(log);
      log->write_func(log->write_func_data, buf, len, LOG_RECORD_COMPLETE);
      audit_log_buffer_resume(log);
    }
    my_atomic_add64(&audit_log_buffer_size_overflow, (int64)1);
    return 0;
  }

  mysql_mutex_lock(&log->mutex);

  for (;;)
  {
    if (log->write_pos + len <= log->flush_pos + log->size)
    {
      /* Enough free space: copy up to the end of buf, then wrap the
         remainder around to its start. */
      size_t offset= log->write_pos % log->size;
      size_t room= log->size - offset;
      size_t head_len= (len < room) ? len : room;

      memcpy(log->buf + offset, buf, head_len);
      if (head_len < len)
        memcpy(log->buf, buf + head_len, len - head_len);
      log->write_pos+= len;
      DBUG_ASSERT(log->write_pos >= log->flush_pos);
      break;
    }

    if (log->drop_if_full)
      break;                                    /* no room: drop the record */

    /* Wait until the flusher reports progress, then check again. */
    mysql_cond_wait(&log->flushed_cond, &log->mutex);
  }

  /* Wake the flusher once more than half of the buffer is in use. */
  if (log->write_pos > log->flush_pos + log->size / 2)
    mysql_cond_signal(&log->written_cond);

  mysql_mutex_unlock(&log->mutex);

  return 0;
}
225