1 /* Copyright (c) 2013, 2021, Oracle and/or its affiliates.
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License, version 2.0,
5    as published by the Free Software Foundation.
6 
7    This program is also distributed with certain software (including
8    but not limited to OpenSSL) that is licensed under separate terms,
9    as designated in a particular file or component or in included license
10    documentation.  The authors of MySQL hereby grant you an additional
11    permission to link the program and your derivative works with the
12    separately licensed software that they have included with MySQL.
13 
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License, version 2.0, for more details.
18 
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software Foundation,
21    51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */
22 
23 #include <string>
24 #include <vector>
25 
26 #include "observer_trans.h"
27 #include "plugin_log.h"
28 #include "plugin.h"
29 #include <mysql/service_rpl_transaction_ctx.h>
30 #include <mysql/service_rpl_transaction_write_set.h>
31 #include "sql_command_test.h"
32 #include "sql_service_interface.h"
33 #include "sql_service_command.h"
34 #include "base64.h"
35 
/*
  Size, in bytes, of the buffer used to serialize one write_set value.
  Since we support up to 64 bits hashes, 8 bytes are enough to store the info.
*/
#define BUFFER_READ_PKE 8

/*
  List of all open, currently unused IO_CACHE objects.
  Each ongoing transaction holds a busy cache; when that cache is no
  longer needed it is appended to this list so that another transaction
  can reuse it.
*/
typedef std::list<IO_CACHE*> IO_CACHE_unused_list;
static IO_CACHE_unused_list io_cache_unused_list;

/*
  Read/write lock protecting io_cache_unused_list. Every access to the
  list in this file is done while holding it in write mode.
  Allocated by observer_trans_initialize() and destroyed by
  observer_trans_terminate().
*/
static Checkable_rwlock *io_cache_unused_list_lock= NULL;
55 
observer_trans_initialize()56 void observer_trans_initialize()
57 {
58   DBUG_ENTER("observer_trans_initialize");
59 
60   io_cache_unused_list_lock= new Checkable_rwlock(
61 #ifdef HAVE_PSI_INTERFACE
62     key_GR_RWLOCK_io_cache_unused_list
63 #endif /* HAVE_PSI_INTERFACE */
64   );
65 
66   DBUG_VOID_RETURN;
67 }
68 
observer_trans_terminate()69 void observer_trans_terminate()
70 {
71   DBUG_ENTER("observer_trans_terminate");
72 
73   delete io_cache_unused_list_lock;
74   io_cache_unused_list_lock= NULL;
75 
76   DBUG_VOID_RETURN;
77 }
78 
observer_trans_clear_io_cache_unused_list()79 void observer_trans_clear_io_cache_unused_list()
80 {
81   DBUG_ENTER("observer_trans_clear_io_cache_unused_list");
82   io_cache_unused_list_lock->wrlock();
83 
84   for (IO_CACHE_unused_list::iterator it= io_cache_unused_list.begin();
85        it != io_cache_unused_list.end();
86        ++it)
87   {
88     IO_CACHE *cache= *it;
89     close_cached_file(cache);
90     my_free(cache);
91   }
92 
93   io_cache_unused_list.clear();
94 
95   io_cache_unused_list_lock->unlock();
96   DBUG_VOID_RETURN;
97 }
98 
/*
  Internal auxiliary functions signatures.
  The definitions live further down in this file.
*/

/* Switch a cache to the given type at the given position; true on error. */
static bool reinit_cache(IO_CACHE *cache,
                         enum cache_type type,
                         my_off_t position);

/* Get a cache ready for writing (reused or newly created); NULL on error. */
IO_CACHE* observer_trans_get_io_cache(my_thread_id thread_id,
                                      ulonglong cache_size);

/* Hand a cache back to the unused list for future reuse. */
void observer_trans_put_io_cache(IO_CACHE *cache);
110 
cleanup_transaction_write_set(Transaction_write_set * transaction_write_set)111 void cleanup_transaction_write_set(Transaction_write_set *transaction_write_set)
112 {
113   DBUG_ENTER("cleanup_transaction_write_set");
114   if (transaction_write_set != NULL)
115   {
116     my_free (transaction_write_set->write_set);
117     my_free (transaction_write_set);
118   }
119   DBUG_VOID_RETURN;
120 }
121 
add_write_set(Transaction_context_log_event * tcle,Transaction_write_set * set)122 int add_write_set(Transaction_context_log_event *tcle,
123                   Transaction_write_set *set)
124 {
125   DBUG_ENTER("add_write_set");
126   int iterator= set->write_set_size;
127   for (int i = 0; i < iterator; i++)
128   {
129     uchar buff[BUFFER_READ_PKE];
130     int8store(buff, set->write_set[i]);
131     uint64 const tmp_str_sz= base64_needed_encoded_length((uint64) BUFFER_READ_PKE);
132     char *write_set_value= (char *) my_malloc(PSI_NOT_INSTRUMENTED,
133                                               static_cast<size_t>(tmp_str_sz), MYF(MY_WME));
134     if (!write_set_value)
135     {
136       /* purecov: begin inspected */
137       log_message(MY_ERROR_LEVEL, "No memory to generate write identification hash");
138       DBUG_RETURN(1);
139       /* purecov: end */
140     }
141 
142     if (base64_encode(buff, (size_t) BUFFER_READ_PKE, write_set_value))
143     {
144       /* purecov: begin inspected */
145       log_message(MY_ERROR_LEVEL,
146                   "Base 64 encoding of the write identification hash failed");
147       DBUG_RETURN(1);
148       /* purecov: end */
149     }
150 
151     tcle->add_write_set(write_set_value);
152   }
153   DBUG_RETURN(0);
154 }
155 
156 /*
157   Transaction lifecycle events observers.
158 */
159 
group_replication_trans_before_dml(Trans_param * param,int & out)160 int group_replication_trans_before_dml(Trans_param *param, int& out)
161 {
162   DBUG_ENTER("group_replication_trans_before_dml");
163 
164   out= 0;
165 
166   //If group replication has not started, then moving along...
167   if (!plugin_is_group_replication_running())
168   {
169     DBUG_RETURN(0);
170   }
171 
172   /*
173    The first check to be made is if the session binlog is active
174    If it is not active, this query is not relevant for the plugin.
175    */
176   if(!param->trans_ctx_info.binlog_enabled)
177   {
178     DBUG_RETURN(0);
179   }
180 
181   /*
182    In runtime, check the global variables that can change.
183    */
184   if( (out+= (param->trans_ctx_info.binlog_format != BINLOG_FORMAT_ROW)) )
185   {
186     log_message(MY_ERROR_LEVEL, "Binlog format should be ROW for Group Replication");
187 
188     DBUG_RETURN(0);
189   }
190 
191   if( (out+= (param->trans_ctx_info.binlog_checksum_options !=
192                                                    binary_log::BINLOG_CHECKSUM_ALG_OFF)) )
193   {
194     log_message(MY_ERROR_LEVEL, "binlog_checksum should be NONE for Group Replication");
195 
196     DBUG_RETURN(0);
197   }
198 
199   if ((out+= (param->trans_ctx_info.transaction_write_set_extraction ==
200               HASH_ALGORITHM_OFF)))
201   {
202     /* purecov: begin inspected */
203     log_message(MY_ERROR_LEVEL,
204                 "A transaction_write_set_extraction algorithm "
205                 "should be selected when running Group Replication");
206     DBUG_RETURN(0);
207     /* purecov: end */
208   }
209 
210   if (local_member_info->has_enforces_update_everywhere_checks() &&
211       (out+= (param->trans_ctx_info.tx_isolation == ISO_SERIALIZABLE)))
212   {
213     log_message(MY_ERROR_LEVEL, "Transaction isolation level (tx_isolation) "
214                 "is set to SERIALIZABLE, which is not compatible with Group "
215                 "Replication");
216     DBUG_RETURN(0);
217   }
218   /*
219     Cycle through all involved tables to assess if they all
220     comply with the plugin runtime requirements. For now:
221     - The table must be from a transactional engine
222     - It must contain at least one primary key
223     - It should not contain 'ON DELETE/UPDATE CASCADE' referential action
224    */
225   for(uint table=0; out == 0 && table < param->number_of_tables; table++)
226   {
227     if (param->tables_info[table].db_type != DB_TYPE_INNODB)
228     {
229       log_message(MY_ERROR_LEVEL, "Table %s does not use the InnoDB storage "
230                                   "engine. This is not compatible with Group "
231                                   "Replication",
232                   param->tables_info[table].table_name);
233       out++;
234     }
235 
236     if(param->tables_info[table].number_of_primary_keys == 0)
237     {
238       log_message(MY_ERROR_LEVEL, "Table %s does not have any PRIMARY KEY. This is not compatible with Group Replication",
239                   param->tables_info[table].table_name);
240       out++;
241     }
242     if (local_member_info->has_enforces_update_everywhere_checks() &&
243         param->tables_info[table].has_cascade_foreign_key)
244     {
245       log_message(MY_ERROR_LEVEL, "Table %s has a foreign key with"
246                   " 'CASCADE' clause. This is not compatible with Group"
247                   " Replication", param->tables_info[table].table_name);
248       out++;
249     }
250   }
251 
252   DBUG_RETURN(0);
253 }
254 
group_replication_trans_before_commit(Trans_param * param)255 int group_replication_trans_before_commit(Trans_param *param)
256 {
257   DBUG_ENTER("group_replication_trans_before_commit");
258   int error= 0;
259   const int pre_wait_error= 1;
260   const int post_wait_error= 2;
261 
262   DBUG_EXECUTE_IF("group_replication_force_error_on_before_commit_listener",
263                   DBUG_RETURN(1););
264 
265   DBUG_EXECUTE_IF("group_replication_before_commit_hook_wait",
266                   {
267                     const char act[]= "now wait_for continue_commit";
268                     assert(!debug_sync_set_action(current_thd, STRING_WITH_LEN(act)));
269                   });
270 
271   /*
272     If the originating id belongs to a thread in the plugin, the transaction
273     was already certified. Channel operations can deadlock against
274     plugin/applier thread stops so they must remain outside the plugin stop
275     lock below.
276   */
277   Replication_thread_api channel_interface;
278   if (GR_APPLIER_CHANNEL == param->rpl_channel_type) {
279     // If plugin is not initialized, there is nothing to do.
280     if (NULL == local_member_info) {
281       DBUG_RETURN(0);
282     }
283 
284     // If plugin is stopping, there is no point in update the statistics.
285     bool fail_to_lock= shared_plugin_stop_lock->try_grab_read_lock();
286     if (!fail_to_lock)
287     {
288       if (local_member_info->get_recovery_status() == Group_member_info::MEMBER_ONLINE)
289       {
290         applier_module->get_pipeline_stats_member_collector()
291             ->decrement_transactions_waiting_apply();
292         applier_module->get_pipeline_stats_member_collector()
293             ->increment_transactions_applied();
294       }
295       shared_plugin_stop_lock->release_read_lock();
296     }
297 
298     DBUG_RETURN(0);
299   }
300   if (GR_RECOVERY_CHANNEL == param->rpl_channel_type) {
301     DBUG_RETURN(0);
302   }
303 
304   if (shared_plugin_stop_lock->try_grab_read_lock()) {
305     /* If plugin is stopping, rollback the transaction immediatly. */
306     DBUG_RETURN(1);
307   }
308 
309   if (is_plugin_waiting_to_set_server_read_mode())
310   {
311     log_message(MY_ERROR_LEVEL,
312                 "Transaction cannot be executed while Group Replication is stopping.");
313     shared_plugin_stop_lock->release_read_lock();
314     DBUG_RETURN(1);
315   }
316 
317   /* If the plugin is not running, before commit should return success. */
318   if (!plugin_is_group_replication_running())
319   {
320     shared_plugin_stop_lock->release_read_lock();
321     DBUG_RETURN(0);
322   }
323 
324   assert(applier_module != NULL && recovery_module != NULL);
325   Group_member_info::Group_member_status member_status=
326       local_member_info->get_recovery_status();
327 
328   if (member_status == Group_member_info::MEMBER_IN_RECOVERY)
329   {
330     /* purecov: begin inspected */
331     log_message(MY_ERROR_LEVEL,
332                 "Transaction cannot be executed while Group Replication is recovering."
333                 " Try again when the server is ONLINE.");
334     shared_plugin_stop_lock->release_read_lock();
335     DBUG_RETURN(1);
336     /* purecov: end */
337   }
338 
339   if (member_status == Group_member_info::MEMBER_ERROR)
340   {
341     log_message(MY_ERROR_LEVEL,
342                 "Transaction cannot be executed while Group Replication is on ERROR state."
343                 " Check for errors and restart the plugin");
344     shared_plugin_stop_lock->release_read_lock();
345     DBUG_RETURN(1);
346   }
347 
348   if (member_status == Group_member_info::MEMBER_OFFLINE)
349   {
350     /* purecov: begin inspected */
351     log_message(MY_ERROR_LEVEL,
352                 "Transaction cannot be executed while Group Replication is OFFLINE."
353                 " Check for errors and restart the plugin");
354     shared_plugin_stop_lock->release_read_lock();
355     DBUG_RETURN(1);
356     /* purecov: end */
357   }
358 
359   // Transaction information.
360   const ulong transaction_size_limit= get_transaction_size_limit();
361   my_off_t transaction_size= 0;
362 
363   const bool is_gtid_specified= param->gtid_info.type == GTID_GROUP;
364   Gtid gtid= { param->gtid_info.sidno, param->gtid_info.gno };
365   if (!is_gtid_specified)
366   {
367     // Dummy values that will be replaced after certification.
368     gtid.sidno= 1;
369     gtid.gno= 1;
370   }
371 
372   const Gtid_specification gtid_specification= { GTID_GROUP, gtid };
373   Gtid_log_event *gle= NULL;
374 
375   Transaction_context_log_event *tcle= NULL;
376 
377   // group replication cache.
378   IO_CACHE *cache= NULL;
379 
380   // Todo optimize for memory (IO-cache's buf to start with, if not enough then trans mem-root)
381   // to avoid New message create/delete and/or its implicit MessageBuffer.
382   Transaction_Message transaction_msg;
383 
384   enum enum_gcs_error send_error= GCS_OK;
385 
386   // Binlog cache.
387   bool is_dml= true;
388   bool may_have_sbr_stmts= !is_dml;
389   IO_CACHE *cache_log= NULL;
390   my_off_t cache_log_position= 0;
391   bool reinit_cache_log_required= false;
392   const my_off_t trx_cache_log_position= my_b_tell(param->trx_cache_log);
393   const my_off_t stmt_cache_log_position= my_b_tell(param->stmt_cache_log);
394 
395   if (trx_cache_log_position > 0 && stmt_cache_log_position == 0)
396   {
397     cache_log= param->trx_cache_log;
398     cache_log_position= trx_cache_log_position;
399   }
400   else if (trx_cache_log_position == 0 && stmt_cache_log_position > 0)
401   {
402     cache_log= param->stmt_cache_log;
403     cache_log_position= stmt_cache_log_position;
404     is_dml= false;
405     may_have_sbr_stmts= true;
406   }
407   else
408   {
409     /* purecov: begin inspected */
410     log_message(MY_ERROR_LEVEL, "We can only use one cache type at a "
411                                 "time on session %u", param->thread_id);
412     shared_plugin_stop_lock->release_read_lock();
413     DBUG_RETURN(1);
414     /* purecov: end */
415   }
416 
417   applier_module->get_pipeline_stats_member_collector()
418       ->increment_transactions_local();
419 
420   assert(cache_log->type == WRITE_CACHE);
421   DBUG_PRINT("cache_log", ("thread_id: %u, trx_cache_log_position: %llu,"
422                            " stmt_cache_log_position: %llu",
423                            param->thread_id, trx_cache_log_position,
424                            stmt_cache_log_position));
425 
426   /*
427     Open group replication cache.
428     Reuse the same cache on each session for improved performance.
429   */
430   cache= observer_trans_get_io_cache(param->thread_id,
431                                      param->cache_log_max_size);
432   if (cache == NULL)
433   {
434     /* purecov: begin inspected */
435     error= pre_wait_error;
436     goto err;
437     /* purecov: end */
438   }
439 
440   // Reinit binlog cache to read.
441   if (reinit_cache(cache_log, READ_CACHE, 0))
442   {
443     /* purecov: begin inspected */
444     log_message(MY_ERROR_LEVEL, "Failed to reinit binlog cache log for read "
445                                 "on session %u", param->thread_id);
446     error= pre_wait_error;
447     goto err;
448     /* purecov: end */
449   }
450 
451   /*
452     After this, cache_log should be reinit to old saved value when we
453     are going out of the function scope.
454   */
455   reinit_cache_log_required= true;
456 
457   // Create transaction context.
458   tcle= new Transaction_context_log_event(param->server_uuid,
459                                           is_dml,
460                                           param->thread_id,
461                                           is_gtid_specified);
462   if (!tcle->is_valid())
463   {
464     /* purecov: begin inspected */
465     log_message(MY_ERROR_LEVEL,
466                 "Failed to create the context of the current "
467                 "transaction on session %u", param->thread_id);
468     error= pre_wait_error;
469     goto err;
470     /* purecov: end */
471   }
472 
473   if (is_dml)
474   {
475     Transaction_write_set* write_set= get_transaction_write_set(param->thread_id);
476     /*
477       When GTID is specified we may have empty transactions, that is,
478       a transaction may have not write set at all because it didn't
479       change any data, it will just persist that GTID as applied.
480     */
481     if ((write_set == NULL) && (!is_gtid_specified))
482     {
483       log_message(MY_ERROR_LEVEL, "Failed to extract the set of items written "
484                                   "during the execution of the current "
485                                   "transaction on session %u", param->thread_id);
486       error= pre_wait_error;
487       goto err;
488     }
489 
490     if (write_set != NULL)
491     {
492       if (add_write_set(tcle, write_set))
493       {
494         /* purecov: begin inspected */
495         cleanup_transaction_write_set(write_set);
496         log_message(MY_ERROR_LEVEL, "Failed to gather the set of items written "
497                                     "during the execution of the current "
498                                     "transaction on session %u", param->thread_id);
499         error= pre_wait_error;
500         goto err;
501         /* purecov: end */
502       }
503       cleanup_transaction_write_set(write_set);
504       assert(is_gtid_specified || (tcle->get_write_set()->size() > 0));
505     }
506     else
507     {
508       /*
509         For empty transactions we should set the GTID may_have_sbr_stmts. See
510         comment at binlog_cache_data::may_have_sbr_stmts().
511       */
512       may_have_sbr_stmts= true;
513     }
514   }
515 
516   // Write transaction context to group replication cache.
517   tcle->write(cache);
518 
519   // Write Gtid log event to group replication cache.
520   gle= new Gtid_log_event(param->server_id, is_dml, 0, 1,
521                           may_have_sbr_stmts,
522                           gtid_specification);
523   gle->write(cache);
524 
525   transaction_size= cache_log_position + my_b_tell(cache);
526   if (is_dml && transaction_size_limit &&
527      transaction_size > transaction_size_limit)
528   {
529     log_message(MY_ERROR_LEVEL, "Error on session %u. "
530                 "Transaction of size %llu exceeds specified limit %lu. "
531                 "To increase the limit please adjust group_replication_transaction_size_limit option.",
532                 param->thread_id, transaction_size,
533                 transaction_size_limit);
534     error= pre_wait_error;
535     goto err;
536   }
537 
538   // Reinit group replication cache to read.
539   if (reinit_cache(cache, READ_CACHE, 0))
540   {
541     /* purecov: begin inspected */
542     log_message(MY_ERROR_LEVEL, "Error while re-initializing an internal "
543                                 "cache, for read operations, on session %u",
544                                 param->thread_id);
545     error= pre_wait_error;
546     goto err;
547     /* purecov: end */
548   }
549 
550   // Copy group replication cache to buffer.
551   if (transaction_msg.append_cache(cache))
552   {
553     /* purecov: begin inspected */
554     log_message(MY_ERROR_LEVEL, "Error while appending data to an internal "
555                                 "cache on session %u", param->thread_id);
556     error= pre_wait_error;
557     goto err;
558     /* purecov: end */
559   }
560 
561   // Copy binlog cache content to buffer.
562   if (transaction_msg.append_cache(cache_log))
563   {
564     /* purecov: begin inspected */
565     log_message(MY_ERROR_LEVEL, "Error while writing binary log cache on "
566                                 "session %u", param->thread_id);
567     error= pre_wait_error;
568     goto err;
569     /* purecov: end */
570   }
571 
572 
573   assert(certification_latch != NULL);
574   if (certification_latch->registerTicket(param->thread_id))
575   {
576     /* purecov: begin inspected */
577     log_message(MY_ERROR_LEVEL, "Unable to register for getting notifications "
578                                 "regarding the outcome of the transaction on "
579                                 "session %u", param->thread_id);
580     error= pre_wait_error;
581     goto err;
582     /* purecov: end */
583   }
584 
585 #ifndef NDEBUG
586   DBUG_EXECUTE_IF("test_basic_CRUD_operations_sql_service_interface",
587                   {
588                     DBUG_SET("-d,test_basic_CRUD_operations_sql_service_interface");
589                     assert(!sql_command_check());
590                   };);
591 
592   DBUG_EXECUTE_IF("group_replication_before_message_broadcast",
593                   {
594                     const char act[]= "now wait_for waiting";
595                     assert(!debug_sync_set_action(current_thd, STRING_WITH_LEN(act)));
596                   });
597 #endif
598 
599   /*
600     Check if member needs to throttle its transactions to avoid
601     cause starvation on the group.
602   */
603   applier_module->get_flow_control_module()->do_wait();
604 
605   //Broadcast the Transaction Message
606   send_error= gcs_module->send_message(transaction_msg);
607   if (send_error == GCS_MESSAGE_TOO_BIG)
608   {
609     /* purecov: begin inspected */
610     log_message(MY_ERROR_LEVEL, "Error broadcasting transaction to the group "
611                                 "on session %u. Message is too big.",
612                                 param->thread_id);
613     error= pre_wait_error;
614     goto err;
615     /* purecov: end */
616   }
617   else if (send_error == GCS_NOK)
618   {
619     /* purecov: begin inspected */
620     log_message(MY_ERROR_LEVEL, "Error while broadcasting the transaction to "
621                                 "the group on session %u", param->thread_id);
622     error= pre_wait_error;
623     goto err;
624     /* purecov: end */
625   }
626 
627   shared_plugin_stop_lock->release_read_lock();
628 
629   assert(certification_latch != NULL);
630   if (certification_latch->waitTicket(param->thread_id))
631   {
632     /* purecov: begin inspected */
633     log_message(MY_ERROR_LEVEL, "Error while waiting for conflict detection "
634                                 "procedure to finish on session %u",
635                                 param->thread_id);
636     error= post_wait_error;
637     goto err;
638     /* purecov: end */
639   }
640 
641 err:
642   // Reinit binlog cache to write (revert what we did).
643   if (reinit_cache_log_required &&
644       reinit_cache(cache_log, WRITE_CACHE, cache_log_position))
645   {
646     /* purecov: begin inspected */
647     log_message(MY_ERROR_LEVEL, "Error while re-initializing an internal "
648                                 "cache, for write operations, on session %u",
649                                 param->thread_id);
650     /* purecov: end */
651   }
652   observer_trans_put_io_cache(cache);
653   delete gle;
654   delete tcle;
655 
656   if (error)
657   {
658     if (error == pre_wait_error)
659       shared_plugin_stop_lock->release_read_lock();
660 
661     assert(certification_latch != NULL);
662     // Release and remove certification latch ticket.
663     certification_latch->releaseTicket(param->thread_id);
664     certification_latch->waitTicket(param->thread_id);
665   }
666 
667   DBUG_EXECUTE_IF("group_replication_after_before_commit_hook",
668                  {
669                     const char act[]= "now wait_for signal.commit_continue";
670                     assert(!debug_sync_set_action(current_thd, STRING_WITH_LEN(act)));
671                  });
672   DBUG_RETURN(error);
673 }
674 
/*
  Before-rollback observer. The plugin takes no action at this point.

  @param[in] param  transaction parameters (unused)

  @return always 0
*/
int group_replication_trans_before_rollback(Trans_param *param)
{
  DBUG_ENTER("group_replication_trans_before_rollback");
  DBUG_RETURN(0);
}
680 
/*
  After-commit observer. The plugin takes no action at this point.

  @param[in] param  transaction parameters (unused)

  @return always 0
*/
int group_replication_trans_after_commit(Trans_param *param)
{
  DBUG_ENTER("group_replication_trans_after_commit");
  DBUG_RETURN(0);
}
686 
/*
  After-rollback observer. The plugin takes no action at this point.

  @param[in] param  transaction parameters (unused)

  @return always 0
*/
int group_replication_trans_after_rollback(Trans_param *param)
{
  DBUG_ENTER("group_replication_trans_after_rollback");
  DBUG_RETURN(0);
}
692 
/*
  Transaction observer table: the structure size followed by the
  callbacks defined above, one per transaction lifecycle event.
*/
Trans_observer trans_observer = {
  sizeof(Trans_observer),

  group_replication_trans_before_dml,       // before DML
  group_replication_trans_before_commit,    // before commit
  group_replication_trans_before_rollback,  // before rollback (no-op)
  group_replication_trans_after_commit,     // after commit (no-op)
  group_replication_trans_after_rollback,   // after rollback (no-op)
};
702 
703 /*
704   Internal auxiliary functions.
705 */
706 
707 /*
708   Reinit IO_cache type.
709 
710   @param[in] cache     cache
711   @param[in] type      type to which cache will change
712   @param[in] position  position to which cache will seek
713 */
reinit_cache(IO_CACHE * cache,enum cache_type type,my_off_t position)714 static bool reinit_cache(IO_CACHE *cache,
715                          enum cache_type type,
716                          my_off_t position)
717 {
718   DBUG_ENTER("reinit_cache");
719 
720   /*
721     Avoid call flush_io_cache() before reinit_io_cache() if
722     temporary file does not exist.
723     Call flush_io_cache() forces the creation of the cache
724     temporary file, even when it does not exist.
725   */
726   if (READ_CACHE == type && cache->file != -1 && flush_io_cache(cache))
727     DBUG_RETURN(true); /* purecov: inspected */
728 
729   if (reinit_io_cache(cache, type, position, 0, 0))
730     DBUG_RETURN(true); /* purecov: inspected */
731 
732   DBUG_RETURN(false);
733 }
734 
735 /*
736   Get already initialized cache or create a new cache for
737   this session.
738 
739   @param[in] thread_id   the session
740   @param[in] cache_size  the cache size
741 
742   @return The cache or NULL on error
743 */
observer_trans_get_io_cache(my_thread_id thread_id,ulonglong cache_size)744 IO_CACHE* observer_trans_get_io_cache(my_thread_id thread_id,
745                                       ulonglong cache_size)
746 {
747   DBUG_ENTER("observer_trans_get_io_cache");
748   IO_CACHE *cache= NULL;
749 
750   io_cache_unused_list_lock->wrlock();
751   if (io_cache_unused_list.empty())
752   {
753     io_cache_unused_list_lock->unlock();
754     // Open IO_CACHE file
755     cache= (IO_CACHE*) my_malloc(PSI_NOT_INSTRUMENTED,
756                                  sizeof(IO_CACHE),
757                                  MYF(MY_ZEROFILL));
758     if (!cache || (!my_b_inited(cache) &&
759                    open_cached_file(cache, mysql_tmpdir,
760                                     "group_replication_trans_before_commit",
761                                     static_cast<size_t>(cache_size), MYF(MY_WME))))
762     {
763       /* purecov: begin inspected */
764       my_free(cache);
765       cache= NULL;
766       log_message(MY_ERROR_LEVEL,
767                   "Failed to create group replication commit cache on session %u",
768                   thread_id);
769       goto end;
770       /* purecov: end */
771     }
772   }
773   else
774   {
775     // Reuse cache created previously.
776     cache= io_cache_unused_list.front();
777     io_cache_unused_list.pop_front();
778     io_cache_unused_list_lock->unlock();
779 
780     if (reinit_cache(cache, WRITE_CACHE, 0))
781     {
782       /* purecov: begin inspected */
783       close_cached_file(cache);
784       my_free(cache);
785       cache= NULL;
786       log_message(MY_ERROR_LEVEL,
787                   "Failed to reinit group replication commit cache for write "
788                   "on session %u", thread_id);
789       goto end;
790       /* purecov: end */
791     }
792   }
793 
794 end:
795   DBUG_RETURN(cache);
796 }
797 
798 /*
799   Save already initialized cache for a future session.
800 
801   @param[in] cache       the cache
802 */
observer_trans_put_io_cache(IO_CACHE * cache)803 void observer_trans_put_io_cache(IO_CACHE *cache)
804 {
805   DBUG_ENTER("observer_trans_put_io_cache");
806 
807   io_cache_unused_list_lock->wrlock();
808   io_cache_unused_list.push_back(cache);
809   io_cache_unused_list_lock->unlock();
810 
811   DBUG_VOID_RETURN;
812 }
813 
814 //Transaction Message implementation
815 
/*
  Build an empty transaction message, tagged with the
  CT_TRANSACTION_MESSAGE type on the base Plugin_gcs_message.
*/
Transaction_Message::Transaction_Message()
  :Plugin_gcs_message(CT_TRANSACTION_MESSAGE)
{
}
820 
/*
  Destructor. No explicit cleanup is done here.
*/
Transaction_Message::~Transaction_Message()
{
}
824 
825 bool
append_cache(IO_CACHE * src)826 Transaction_Message::append_cache(IO_CACHE *src)
827 {
828   DBUG_ENTER("append_cache");
829   assert(src->type == READ_CACHE);
830 
831   uchar *buffer= src->read_pos;
832   size_t length= my_b_fill(src);
833   if (src->file == -1)
834   {
835     // Read cache size directly when temporary file does not exist.
836     length= my_b_bytes_in_cache(src);
837   }
838 
839   while (length > 0 && !src->error)
840   {
841     data.insert(data.end(),
842                 buffer,
843                 buffer + length);
844 
845     src->read_pos= src->read_end;
846     length= my_b_fill(src);
847     buffer= src->read_pos;
848   }
849 
850   DBUG_RETURN(src->error ? true : false);
851 }
852 
853 void
encode_payload(std::vector<unsigned char> * buffer) const854 Transaction_Message::encode_payload(std::vector<unsigned char>* buffer) const
855 {
856   DBUG_ENTER("Transaction_Message::encode_payload");
857 
858   encode_payload_item_type_and_length(buffer, PIT_TRANSACTION_DATA, data.size());
859   buffer->insert(buffer->end(), data.begin(), data.end());
860 
861   DBUG_VOID_RETURN;
862 }
863 
864 void
decode_payload(const unsigned char * buffer,const unsigned char * end)865 Transaction_Message::decode_payload(const unsigned char* buffer,
866                                     const unsigned char* end)
867 {
868   DBUG_ENTER("Transaction_Message::decode_payload");
869   const unsigned char *slider= buffer;
870   uint16 payload_item_type= 0;
871   unsigned long long payload_item_length= 0;
872 
873   decode_payload_item_type_and_length(&slider,
874                                       &payload_item_type,
875                                       &payload_item_length);
876   data.clear();
877   data.insert(data.end(), slider, slider + payload_item_length);
878 
879   DBUG_VOID_RETURN;
880 }
881