1 /* Copyright (C) 2013 Codership Oy <info@codership.com>
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation; version 2 of the License.
6
7 This program is distributed in the hope that it will be useful,
8 but WITHOUT ANY WARRANTY; without even the implied warranty of
9 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 GNU General Public License for more details.
11
12 You should have received a copy of the GNU General Public License along
13 with this program; if not, write to the Free Software Foundation, Inc.,
14 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA. */
15
16 #include "mariadb.h"
17 #include "mysql/service_wsrep.h"
18 #include "wsrep_binlog.h"
19 #include "wsrep_priv.h"
20 #include "log.h"
21 #include "slave.h"
22 #include "log_event.h"
23 #include "wsrep_applier.h"
24
25 #include "transaction.h"
26
27 extern handlerton *binlog_hton;
28 /*
29 Write the contents of a cache to a memory buffer.
30
31 This function quite the same as MYSQL_BIN_LOG::write_cache(),
32 with the exception that here we write in buffer instead of log file.
33 */
wsrep_write_cache_buf(IO_CACHE * cache,uchar ** buf,size_t * buf_len)34 int wsrep_write_cache_buf(IO_CACHE *cache, uchar **buf, size_t *buf_len)
35 {
36 *buf= NULL;
37 *buf_len= 0;
38 my_off_t const saved_pos(my_b_tell(cache));
39 DBUG_ENTER("wsrep_write_cache_buf");
40
41 if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0))
42 {
43 WSREP_ERROR("failed to initialize io-cache");
44 DBUG_RETURN(ER_ERROR_ON_WRITE);
45 }
46
47 uint length= my_b_bytes_in_cache(cache);
48 if (unlikely(0 == length)) length= my_b_fill(cache);
49
50 size_t total_length= 0;
51
52 if (likely(length > 0)) do
53 {
54 total_length += length;
55 /*
56 Bail out if buffer grows too large.
57 A temporary fix to avoid allocating indefinitely large buffer,
58 not a real limit on a writeset size which includes other things
59 like header and keys.
60 */
61 if (total_length > wsrep_max_ws_size)
62 {
63 WSREP_WARN("transaction size limit (%lu) exceeded: %zu",
64 wsrep_max_ws_size, total_length);
65 goto error;
66 }
67 uchar* tmp= (uchar *)my_realloc(*buf, total_length,
68 MYF(MY_ALLOW_ZERO_PTR));
69 if (!tmp)
70 {
71 WSREP_ERROR("could not (re)allocate buffer: %zu + %u",
72 *buf_len, length);
73 goto error;
74 }
75 *buf= tmp;
76
77 memcpy(*buf + *buf_len, cache->read_pos, length);
78 *buf_len= total_length;
79
80 if (cache->file < 0)
81 {
82 cache->read_pos= cache->read_end;
83 break;
84 }
85 } while ((length= my_b_fill(cache)));
86
87 if (reinit_io_cache(cache, WRITE_CACHE, saved_pos, 0, 0))
88 {
89 WSREP_WARN("failed to initialize io-cache");
90 goto cleanup;
91 }
92
93 DBUG_RETURN(0);
94
95 error:
96 if (reinit_io_cache(cache, WRITE_CACHE, saved_pos, 0, 0))
97 {
98 WSREP_WARN("failed to initialize io-cache");
99 }
100 cleanup:
101 my_free(*buf);
102 *buf= NULL;
103 *buf_len= 0;
104 DBUG_RETURN(ER_ERROR_ON_WRITE);
105 }
106
/*
  4K: intended size of a buffer preallocated on the stack. Many
  transactions are expected to fit in it, so no heap allocation is needed.
  NOTE(review): not referenced in the visible part of this file - confirm
  whether it is still used elsewhere before relying on it.
*/
#define STACK_SIZE 4096
110
111 /*
112 Write the contents of a cache to wsrep provider.
113
114 This function quite the same as MYSQL_BIN_LOG::write_cache(),
115 with the exception that here we write in buffer instead of log file.
116
117 This version uses incremental data appending as it reads it from cache.
118 */
wsrep_write_cache_inc(THD * const thd,IO_CACHE * const cache,size_t * const len)119 static int wsrep_write_cache_inc(THD* const thd,
120 IO_CACHE* const cache,
121 size_t* const len)
122 {
123 DBUG_ENTER("wsrep_write_cache_inc");
124 my_off_t const saved_pos(my_b_tell(cache));
125
126 if (reinit_io_cache(cache, READ_CACHE, thd->wsrep_sr().log_position(), 0, 0))
127 {
128 WSREP_ERROR("failed to initialize io-cache");
129 DBUG_RETURN(1);;
130 }
131
132 int ret= 0;
133 size_t total_length(0);
134
135 uint length(my_b_bytes_in_cache(cache));
136 if (unlikely(0 == length)) length= my_b_fill(cache);
137
138 if (likely(length > 0))
139 {
140 do
141 {
142 total_length += length;
143 /* bail out if buffer grows too large
144 not a real limit on a writeset size which includes other things
145 like header and keys.
146 */
147 if (unlikely(total_length > wsrep_max_ws_size))
148 {
149 WSREP_WARN("transaction size limit (%lu) exceeded: %zu",
150 wsrep_max_ws_size, total_length);
151 ret= 1;
152 goto cleanup;
153 }
154 if (thd->wsrep_cs().append_data(wsrep::const_buffer(cache->read_pos, length)))
155 goto cleanup;
156 cache->read_pos= cache->read_end;
157 } while ((cache->file >= 0) && (length= my_b_fill(cache)));
158 }
159 if (ret == 0)
160 {
161 assert(total_length + thd->wsrep_sr().log_position() == saved_pos);
162 }
163
164 cleanup:
165 *len= total_length;
166 if (reinit_io_cache(cache, WRITE_CACHE, saved_pos, 0, 0))
167 {
168 WSREP_ERROR("failed to reinitialize io-cache");
169 }
170 DBUG_RETURN(ret);
171 }
172
173 /*
174 Write the contents of a cache to wsrep provider.
175
176 This function quite the same as MYSQL_BIN_LOG::write_cache(),
177 with the exception that here we write in buffer instead of log file.
178 */
wsrep_write_cache(THD * const thd,IO_CACHE * const cache,size_t * const len)179 int wsrep_write_cache(THD* const thd,
180 IO_CACHE* const cache,
181 size_t* const len)
182 {
183 return wsrep_write_cache_inc(thd, cache, len);
184 }
185
wsrep_dump_rbr_buf(THD * thd,const void * rbr_buf,size_t buf_len)186 void wsrep_dump_rbr_buf(THD *thd, const void* rbr_buf, size_t buf_len)
187 {
188 int len= snprintf(NULL, 0, "%s/GRA_%lld_%lld.log",
189 wsrep_data_home_dir, (longlong) thd->thread_id,
190 (longlong) wsrep_thd_trx_seqno(thd));
191 if (len < 0)
192 {
193 WSREP_ERROR("snprintf error: %d, skipping dump.", len);
194 return;
195 }
196 /*
197 len doesn't count the \0 end-of-string. Use len+1 below
198 to alloc and pass as an argument to snprintf.
199 */
200
201 char *filename= (char *)malloc(len+1);
202 int len1= snprintf(filename, len+1, "%s/GRA_%lld_%lld.log",
203 wsrep_data_home_dir, (longlong) thd->thread_id,
204 (long long)wsrep_thd_trx_seqno(thd));
205
206 if (len > len1)
207 {
208 WSREP_ERROR("RBR dump path truncated: %d, skipping dump.", len);
209 free(filename);
210 return;
211 }
212
213 FILE *of= fopen(filename, "wb");
214
215 if (of)
216 {
217 if (fwrite(rbr_buf, buf_len, 1, of) == 0)
218 WSREP_ERROR("Failed to write buffer of length %llu to '%s'",
219 (unsigned long long)buf_len, filename);
220
221 fclose(of);
222 }
223 else
224 {
225 WSREP_ERROR("Failed to open file '%s': %d (%s)",
226 filename, errno, strerror(errno));
227 }
228 free(filename);
229 }
230
231 /* Dump replication buffer along with header to a file. */
wsrep_dump_rbr_buf_with_header(THD * thd,const void * rbr_buf,size_t buf_len)232 void wsrep_dump_rbr_buf_with_header(THD *thd, const void *rbr_buf,
233 size_t buf_len)
234 {
235 DBUG_ENTER("wsrep_dump_rbr_buf_with_header");
236
237 File file;
238 IO_CACHE cache;
239 Log_event_writer writer(&cache, 0);
240 Format_description_log_event *ev= 0;
241
242 longlong thd_trx_seqno= (long long)wsrep_thd_trx_seqno(thd);
243 int len= snprintf(NULL, 0, "%s/GRA_%lld_%lld_v2.log",
244 wsrep_data_home_dir, (longlong)thd->thread_id,
245 thd_trx_seqno);
246 /*
247 len doesn't count the \0 end-of-string. Use len+1 below
248 to alloc and pass as an argument to snprintf.
249 */
250 char *filename;
251 if (len < 0 || !(filename= (char*)malloc(len+1)))
252 {
253 WSREP_ERROR("snprintf error: %d, skipping dump.", len);
254 DBUG_VOID_RETURN;
255 }
256
257 int len1= snprintf(filename, len+1, "%s/GRA_%lld_%lld_v2.log",
258 wsrep_data_home_dir, (longlong) thd->thread_id,
259 thd_trx_seqno);
260
261 if (len > len1)
262 {
263 WSREP_ERROR("RBR dump path truncated: %d, skipping dump.", len);
264 free(filename);
265 DBUG_VOID_RETURN;
266 }
267
268 if ((file= mysql_file_open(key_file_wsrep_gra_log, filename,
269 O_RDWR | O_CREAT | O_BINARY, MYF(MY_WME))) < 0)
270 {
271 WSREP_ERROR("Failed to open file '%s' : %d (%s)",
272 filename, errno, strerror(errno));
273 goto cleanup1;
274 }
275
276 if (init_io_cache(&cache, file, 0, WRITE_CACHE, 0, 0, MYF(MY_WME | MY_NABP)))
277 {
278 goto cleanup2;
279 }
280
281 if (my_b_safe_write(&cache, BINLOG_MAGIC, BIN_LOG_HEADER_SIZE))
282 {
283 goto cleanup2;
284 }
285
286 /*
287 Instantiate an FDLE object for non-wsrep threads (to be written
288 to the dump file).
289 */
290 ev= (thd->wsrep_applier) ? wsrep_get_apply_format(thd) :
291 (new Format_description_log_event(4));
292
293 if (writer.write(ev) || my_b_write(&cache, (uchar*)rbr_buf, buf_len) ||
294 flush_io_cache(&cache))
295 {
296 WSREP_ERROR("Failed to write to '%s'.", filename);
297 goto cleanup2;
298 }
299
300 cleanup2:
301 end_io_cache(&cache);
302
303 cleanup1:
304 free(filename);
305 mysql_file_close(file, MYF(MY_WME));
306
307 if (!thd->wsrep_applier) delete ev;
308
309 DBUG_VOID_RETURN;
310 }
311
wsrep_write_skip_event(THD * thd)312 int wsrep_write_skip_event(THD* thd)
313 {
314 DBUG_ENTER("wsrep_write_skip_event");
315 Ignorable_log_event skip_event(thd);
316 int ret= mysql_bin_log.write_event(&skip_event);
317 if (ret)
318 {
319 WSREP_WARN("wsrep_write_skip_event: write to binlog failed: %d", ret);
320 }
321 if (!ret && (ret= trans_commit_stmt(thd)))
322 {
323 WSREP_WARN("wsrep_write_skip_event: statt commit failed");
324 }
325 DBUG_RETURN(ret);
326 }
327
wsrep_write_dummy_event_low(THD * thd,const char * msg)328 int wsrep_write_dummy_event_low(THD *thd, const char *msg)
329 {
330 ::abort();
331 return 0;
332 }
333
wsrep_write_dummy_event(THD * orig_thd,const char * msg)334 int wsrep_write_dummy_event(THD *orig_thd, const char *msg)
335 {
336 return 0;
337 }
338
wsrep_commit_will_write_binlog(THD * thd)339 bool wsrep_commit_will_write_binlog(THD *thd)
340 {
341 return (!wsrep_emulate_bin_log && /* binlog enabled*/
342 (wsrep_thd_is_local(thd) || /* local thd*/
343 (thd->wsrep_applier_service && /* applier and log-slave-updates */
344 opt_log_slave_updates)));
345 }
346
/*
  wait_for_commit object of the last THD registered for group commit (the
  tail of the commit-order queue). It is reset to NULL when that same THD
  unregisters. In this file it is only read or written while holding
  LOCK_wsrep_group_commit.
*/
static wait_for_commit *commit_order_tail= NULL;
351
wsrep_register_for_group_commit(THD * thd)352 void wsrep_register_for_group_commit(THD *thd)
353 {
354 DBUG_ENTER("wsrep_register_for_group_commit");
355 if (wsrep_emulate_bin_log)
356 {
357 /* Binlog is off, no need to maintain group commit queue */
358 DBUG_VOID_RETURN;
359 }
360
361 DBUG_ASSERT(thd->wsrep_trx().ordered());
362
363 wait_for_commit *wfc= thd->wait_for_commit_ptr= &thd->wsrep_wfc;
364
365 mysql_mutex_lock(&LOCK_wsrep_group_commit);
366 if (commit_order_tail)
367 {
368 wfc->register_wait_for_prior_commit(commit_order_tail);
369 }
370 commit_order_tail= thd->wait_for_commit_ptr;
371 mysql_mutex_unlock(&LOCK_wsrep_group_commit);
372
373 /*
374 Now we have queued for group commit. If the commit will go
375 through TC log_and_order(), the commit ordering is done
376 by TC group commit. Otherwise the wait for prior
377 commits to complete is done in ha_commit_one_phase().
378 */
379 DBUG_VOID_RETURN;
380 }
381
wsrep_unregister_from_group_commit(THD * thd)382 void wsrep_unregister_from_group_commit(THD *thd)
383 {
384 DBUG_ASSERT(thd->wsrep_trx().ordered());
385 wait_for_commit *wfc= thd->wait_for_commit_ptr;
386
387 if (wfc)
388 {
389 mysql_mutex_lock(&LOCK_wsrep_group_commit);
390 wfc->unregister_wait_for_prior_commit();
391 thd->wakeup_subsequent_commits(0);
392
393 /* The last one queued for group commit has completed commit, it is
394 safe to set tail to NULL. */
395 if (wfc == commit_order_tail)
396 commit_order_tail= NULL;
397 mysql_mutex_unlock(&LOCK_wsrep_group_commit);
398 thd->wait_for_commit_ptr= NULL;
399 }
400 }
401