1 /* Copyright (C) 2013 Codership Oy <info@codership.com>
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License as published by
5    the Free Software Foundation; version 2 of the License.
6 
7    This program is distributed in the hope that it will be useful,
8    but WITHOUT ANY WARRANTY; without even the implied warranty of
9    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10    GNU General Public License for more details.
11 
12    You should have received a copy of the GNU General Public License along
13    with this program; if not, write to the Free Software Foundation, Inc.,
14    51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA. */
15 
16 #include "mariadb.h"
17 #include "mysql/service_wsrep.h"
18 #include "wsrep_binlog.h"
19 #include "wsrep_priv.h"
20 #include "log.h"
21 #include "slave.h"
22 #include "log_event.h"
23 #include "wsrep_applier.h"
24 
25 #include "transaction.h"
26 
27 extern handlerton *binlog_hton;
28 /*
29   Write the contents of a cache to a memory buffer.
30 
31   This function quite the same as MYSQL_BIN_LOG::write_cache(),
32   with the exception that here we write in buffer instead of log file.
33  */
wsrep_write_cache_buf(IO_CACHE * cache,uchar ** buf,size_t * buf_len)34 int wsrep_write_cache_buf(IO_CACHE *cache, uchar **buf, size_t *buf_len)
35 {
36   *buf= NULL;
37   *buf_len= 0;
38   my_off_t const saved_pos(my_b_tell(cache));
39   DBUG_ENTER("wsrep_write_cache_buf");
40 
41   if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0))
42   {
43     WSREP_ERROR("failed to initialize io-cache");
44     DBUG_RETURN(ER_ERROR_ON_WRITE);
45   }
46 
47   uint length= my_b_bytes_in_cache(cache);
48   if (unlikely(0 == length)) length= my_b_fill(cache);
49 
50   size_t total_length= 0;
51 
52   if (likely(length > 0)) do
53   {
54       total_length += length;
55       /*
56         Bail out if buffer grows too large.
57         A temporary fix to avoid allocating indefinitely large buffer,
58         not a real limit on a writeset size which includes other things
59         like header and keys.
60       */
61       if (total_length > wsrep_max_ws_size)
62       {
63           WSREP_WARN("transaction size limit (%lu) exceeded: %zu",
64                      wsrep_max_ws_size, total_length);
65           goto error;
66       }
67       uchar* tmp= (uchar *)my_realloc(*buf, total_length,
68                                        MYF(MY_ALLOW_ZERO_PTR));
69       if (!tmp)
70       {
71           WSREP_ERROR("could not (re)allocate buffer: %zu + %u",
72                       *buf_len, length);
73           goto error;
74       }
75       *buf= tmp;
76 
77       memcpy(*buf + *buf_len, cache->read_pos, length);
78       *buf_len= total_length;
79 
80       if (cache->file < 0)
81       {
82         cache->read_pos= cache->read_end;
83         break;
84       }
85   } while ((length= my_b_fill(cache)));
86 
87   if (reinit_io_cache(cache, WRITE_CACHE, saved_pos, 0, 0))
88   {
89     WSREP_WARN("failed to initialize io-cache");
90     goto cleanup;
91   }
92 
93   DBUG_RETURN(0);
94 
95 error:
96   if (reinit_io_cache(cache, WRITE_CACHE, saved_pos, 0, 0))
97   {
98     WSREP_WARN("failed to initialize io-cache");
99   }
100 cleanup:
101   my_free(*buf);
102   *buf= NULL;
103   *buf_len= 0;
104   DBUG_RETURN(ER_ERROR_ON_WRITE);
105 }
106 
/*
  4 KiB: size intended for a buffer preallocated on the stack. Many
  transactions would fit in there, so there is no need to reach for the heap.
  NOTE(review): not referenced anywhere in this part of the file; confirm
  whether it is still used elsewhere before removing it.
*/
#define STACK_SIZE 4096
110 
111 /*
112   Write the contents of a cache to wsrep provider.
113 
114   This function quite the same as MYSQL_BIN_LOG::write_cache(),
115   with the exception that here we write in buffer instead of log file.
116 
117   This version uses incremental data appending as it reads it from cache.
118  */
wsrep_write_cache_inc(THD * const thd,IO_CACHE * const cache,size_t * const len)119 static int wsrep_write_cache_inc(THD*      const thd,
120                                  IO_CACHE* const cache,
121                                  size_t*   const len)
122 {
123   DBUG_ENTER("wsrep_write_cache_inc");
124   my_off_t const saved_pos(my_b_tell(cache));
125 
126   if (reinit_io_cache(cache, READ_CACHE, thd->wsrep_sr().log_position(), 0, 0))
127   {
128     WSREP_ERROR("failed to initialize io-cache");
129     DBUG_RETURN(1);;
130   }
131 
132   int ret= 0;
133   size_t total_length(0);
134 
135   uint length(my_b_bytes_in_cache(cache));
136   if (unlikely(0 == length)) length= my_b_fill(cache);
137 
138   if (likely(length > 0))
139   {
140     do
141     {
142       total_length += length;
143       /* bail out if buffer grows too large
144          not a real limit on a writeset size which includes other things
145          like header and keys.
146       */
147       if (unlikely(total_length > wsrep_max_ws_size))
148       {
149         WSREP_WARN("transaction size limit (%lu) exceeded: %zu",
150                    wsrep_max_ws_size, total_length);
151         ret= 1;
152         goto cleanup;
153       }
154       if (thd->wsrep_cs().append_data(wsrep::const_buffer(cache->read_pos, length)))
155         goto cleanup;
156       cache->read_pos= cache->read_end;
157     } while ((cache->file >= 0) && (length= my_b_fill(cache)));
158   }
159   if (ret == 0)
160   {
161     assert(total_length + thd->wsrep_sr().log_position() == saved_pos);
162   }
163 
164 cleanup:
165   *len= total_length;
166   if (reinit_io_cache(cache, WRITE_CACHE, saved_pos, 0, 0))
167   {
168     WSREP_ERROR("failed to reinitialize io-cache");
169   }
170   DBUG_RETURN(ret);
171 }
172 
173 /*
174   Write the contents of a cache to wsrep provider.
175 
176   This function quite the same as MYSQL_BIN_LOG::write_cache(),
177   with the exception that here we write in buffer instead of log file.
178  */
wsrep_write_cache(THD * const thd,IO_CACHE * const cache,size_t * const len)179 int wsrep_write_cache(THD*      const thd,
180                       IO_CACHE* const cache,
181                       size_t*   const len)
182 {
183   return wsrep_write_cache_inc(thd, cache, len);
184 }
185 
wsrep_dump_rbr_buf(THD * thd,const void * rbr_buf,size_t buf_len)186 void wsrep_dump_rbr_buf(THD *thd, const void* rbr_buf, size_t buf_len)
187 {
188   int len= snprintf(NULL, 0, "%s/GRA_%lld_%lld.log",
189                     wsrep_data_home_dir, (longlong) thd->thread_id,
190                     (longlong) wsrep_thd_trx_seqno(thd));
191   if (len < 0)
192   {
193     WSREP_ERROR("snprintf error: %d, skipping dump.", len);
194     return;
195   }
196   /*
197     len doesn't count the \0 end-of-string. Use len+1 below
198     to alloc and pass as an argument to snprintf.
199   */
200 
201   char *filename= (char *)malloc(len+1);
202   int len1= snprintf(filename, len+1, "%s/GRA_%lld_%lld.log",
203                     wsrep_data_home_dir, (longlong) thd->thread_id,
204                     (long long)wsrep_thd_trx_seqno(thd));
205 
206   if (len > len1)
207   {
208     WSREP_ERROR("RBR dump path truncated: %d, skipping dump.", len);
209     free(filename);
210     return;
211   }
212 
213   FILE *of= fopen(filename, "wb");
214 
215   if (of)
216   {
217     if (fwrite(rbr_buf, buf_len, 1, of) == 0)
218        WSREP_ERROR("Failed to write buffer of length %llu to '%s'",
219                    (unsigned long long)buf_len, filename);
220 
221     fclose(of);
222   }
223   else
224   {
225     WSREP_ERROR("Failed to open file '%s': %d (%s)",
226                 filename, errno, strerror(errno));
227   }
228   free(filename);
229 }
230 
231 /* Dump replication buffer along with header to a file. */
wsrep_dump_rbr_buf_with_header(THD * thd,const void * rbr_buf,size_t buf_len)232 void wsrep_dump_rbr_buf_with_header(THD *thd, const void *rbr_buf,
233                                     size_t buf_len)
234 {
235   DBUG_ENTER("wsrep_dump_rbr_buf_with_header");
236 
237   File file;
238   IO_CACHE cache;
239   Log_event_writer writer(&cache, 0);
240   Format_description_log_event *ev= 0;
241 
242   longlong thd_trx_seqno= (long long)wsrep_thd_trx_seqno(thd);
243   int len= snprintf(NULL, 0, "%s/GRA_%lld_%lld_v2.log",
244                     wsrep_data_home_dir, (longlong)thd->thread_id,
245                     thd_trx_seqno);
246   /*
247     len doesn't count the \0 end-of-string. Use len+1 below
248     to alloc and pass as an argument to snprintf.
249   */
250   char *filename;
251   if (len < 0 || !(filename= (char*)malloc(len+1)))
252   {
253     WSREP_ERROR("snprintf error: %d, skipping dump.", len);
254     DBUG_VOID_RETURN;
255   }
256 
257   int len1= snprintf(filename, len+1, "%s/GRA_%lld_%lld_v2.log",
258                      wsrep_data_home_dir, (longlong) thd->thread_id,
259                      thd_trx_seqno);
260 
261   if (len > len1)
262   {
263     WSREP_ERROR("RBR dump path truncated: %d, skipping dump.", len);
264     free(filename);
265     DBUG_VOID_RETURN;
266   }
267 
268   if ((file= mysql_file_open(key_file_wsrep_gra_log, filename,
269                              O_RDWR | O_CREAT | O_BINARY, MYF(MY_WME))) < 0)
270   {
271     WSREP_ERROR("Failed to open file '%s' : %d (%s)",
272                 filename, errno, strerror(errno));
273     goto cleanup1;
274   }
275 
276   if (init_io_cache(&cache, file, 0, WRITE_CACHE, 0, 0, MYF(MY_WME | MY_NABP)))
277   {
278     goto cleanup2;
279   }
280 
281   if (my_b_safe_write(&cache, BINLOG_MAGIC, BIN_LOG_HEADER_SIZE))
282   {
283     goto cleanup2;
284   }
285 
286   /*
287     Instantiate an FDLE object for non-wsrep threads (to be written
288     to the dump file).
289   */
290   ev= (thd->wsrep_applier) ? wsrep_get_apply_format(thd) :
291     (new Format_description_log_event(4));
292 
293   if (writer.write(ev) || my_b_write(&cache, (uchar*)rbr_buf, buf_len) ||
294       flush_io_cache(&cache))
295   {
296     WSREP_ERROR("Failed to write to '%s'.", filename);
297     goto cleanup2;
298   }
299 
300 cleanup2:
301   end_io_cache(&cache);
302 
303 cleanup1:
304   free(filename);
305   mysql_file_close(file, MYF(MY_WME));
306 
307   if (!thd->wsrep_applier) delete ev;
308 
309   DBUG_VOID_RETURN;
310 }
311 
wsrep_write_skip_event(THD * thd)312 int wsrep_write_skip_event(THD* thd)
313 {
314   DBUG_ENTER("wsrep_write_skip_event");
315   Ignorable_log_event skip_event(thd);
316   int ret= mysql_bin_log.write_event(&skip_event);
317   if (ret)
318   {
319     WSREP_WARN("wsrep_write_skip_event: write to binlog failed: %d", ret);
320   }
321   if (!ret && (ret= trans_commit_stmt(thd)))
322   {
323     WSREP_WARN("wsrep_write_skip_event: statt commit failed");
324   }
325   DBUG_RETURN(ret);
326 }
327 
wsrep_write_dummy_event_low(THD * thd,const char * msg)328 int wsrep_write_dummy_event_low(THD *thd, const char *msg)
329 {
330   ::abort();
331   return 0;
332 }
333 
wsrep_write_dummy_event(THD * orig_thd,const char * msg)334 int wsrep_write_dummy_event(THD *orig_thd, const char *msg)
335 {
336   return 0;
337 }
338 
wsrep_commit_will_write_binlog(THD * thd)339 bool wsrep_commit_will_write_binlog(THD *thd)
340 {
341   return (!wsrep_emulate_bin_log && /* binlog enabled*/
342           (wsrep_thd_is_local(thd) || /* local thd*/
343            (thd->wsrep_applier_service && /* applier and log-slave-updates */
344             opt_log_slave_updates)));
345 }
346 
347 /*
348   The last THD/commit_for_wait registered for group commit.
349 */
350 static wait_for_commit *commit_order_tail= NULL;
351 
wsrep_register_for_group_commit(THD * thd)352 void wsrep_register_for_group_commit(THD *thd)
353 {
354   DBUG_ENTER("wsrep_register_for_group_commit");
355   if (wsrep_emulate_bin_log)
356   {
357     /* Binlog is off, no need to maintain group commit queue */
358     DBUG_VOID_RETURN;
359   }
360 
361   DBUG_ASSERT(thd->wsrep_trx().ordered());
362 
363   wait_for_commit *wfc= thd->wait_for_commit_ptr= &thd->wsrep_wfc;
364 
365   mysql_mutex_lock(&LOCK_wsrep_group_commit);
366   if (commit_order_tail)
367   {
368     wfc->register_wait_for_prior_commit(commit_order_tail);
369   }
370   commit_order_tail= thd->wait_for_commit_ptr;
371   mysql_mutex_unlock(&LOCK_wsrep_group_commit);
372 
373   /*
374     Now we have queued for group commit. If the commit will go
375     through TC log_and_order(), the commit ordering is done
376     by TC group commit. Otherwise the wait for prior
377     commits to complete is done in ha_commit_one_phase().
378   */
379   DBUG_VOID_RETURN;
380 }
381 
wsrep_unregister_from_group_commit(THD * thd)382 void wsrep_unregister_from_group_commit(THD *thd)
383 {
384   DBUG_ASSERT(thd->wsrep_trx().ordered());
385   wait_for_commit *wfc= thd->wait_for_commit_ptr;
386 
387   if (wfc)
388   {
389     mysql_mutex_lock(&LOCK_wsrep_group_commit);
390     wfc->unregister_wait_for_prior_commit();
391     thd->wakeup_subsequent_commits(0);
392 
393     /* The last one queued for group commit has completed commit, it is
394        safe to set tail to NULL. */
395     if (wfc == commit_order_tail)
396       commit_order_tail= NULL;
397     mysql_mutex_unlock(&LOCK_wsrep_group_commit);
398     thd->wait_for_commit_ptr= NULL;
399   }
400 }
401