1 /* Copyright (c) 2013, 2021, Oracle and/or its affiliates.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is also distributed with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have included with MySQL.
13
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License, version 2.0, for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with this program; if not, write to the Free Software Foundation,
21 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */
22
23 #include <string>
24 #include <vector>
25
26 #include "observer_trans.h"
27 #include "plugin_log.h"
28 #include "plugin.h"
29 #include <mysql/service_rpl_transaction_ctx.h>
30 #include <mysql/service_rpl_transaction_write_set.h>
31 #include "sql_command_test.h"
32 #include "sql_service_interface.h"
33 #include "sql_service_command.h"
34 #include "base64.h"
35
36 /*
37 Buffer to read the write_set value as a string.
38 Since we support up to 64 bits hashes, 8 bytes are enough to store the info.
39 */
40 #define BUFFER_READ_PKE 8
41
/*
  List of the open IO_CACHE objects that are currently unused.
  Each ongoing transaction holds a busy cache; when the cache is
  no longer needed it is appended to this list so that it can be
  reused by another transaction.
*/
typedef std::list<IO_CACHE*> IO_CACHE_unused_list;
static IO_CACHE_unused_list io_cache_unused_list;
50
/*
  Read/write lock taken around every access to io_cache_unused_list.
  It is allocated by observer_trans_initialize() and destroyed by
  observer_trans_terminate().
*/
static Checkable_rwlock *io_cache_unused_list_lock= NULL;
55
observer_trans_initialize()56 void observer_trans_initialize()
57 {
58 DBUG_ENTER("observer_trans_initialize");
59
60 io_cache_unused_list_lock= new Checkable_rwlock(
61 #ifdef HAVE_PSI_INTERFACE
62 key_GR_RWLOCK_io_cache_unused_list
63 #endif /* HAVE_PSI_INTERFACE */
64 );
65
66 DBUG_VOID_RETURN;
67 }
68
/*
  Destroy the lock that protects the list of unused IO_CACHE objects.
  The list itself is not touched here; its entries are released by
  observer_trans_clear_io_cache_unused_list().
*/
void observer_trans_terminate()
{
  DBUG_ENTER("observer_trans_terminate");

  delete io_cache_unused_list_lock;
  // Reset the pointer so that a stale lock is never dereferenced.
  io_cache_unused_list_lock= NULL;

  DBUG_VOID_RETURN;
}
78
observer_trans_clear_io_cache_unused_list()79 void observer_trans_clear_io_cache_unused_list()
80 {
81 DBUG_ENTER("observer_trans_clear_io_cache_unused_list");
82 io_cache_unused_list_lock->wrlock();
83
84 for (IO_CACHE_unused_list::iterator it= io_cache_unused_list.begin();
85 it != io_cache_unused_list.end();
86 ++it)
87 {
88 IO_CACHE *cache= *it;
89 close_cached_file(cache);
90 my_free(cache);
91 }
92
93 io_cache_unused_list.clear();
94
95 io_cache_unused_list_lock->unlock();
96 DBUG_VOID_RETURN;
97 }
98
/*
  Signatures of the auxiliary functions defined at the end of this file.
*/

/* Switch an IO_CACHE to the given type, positioned at the given offset. */
static bool reinit_cache(IO_CACHE *cache,
                         enum cache_type type,
                         my_off_t position);

/* Obtain a write-ready IO_CACHE: a reused one if available, else a new one. */
IO_CACHE* observer_trans_get_io_cache(my_thread_id thread_id,
                                      ulonglong cache_size);

/* Store an IO_CACHE in the unused list so that it can be reused later. */
void observer_trans_put_io_cache(IO_CACHE *cache);
110
cleanup_transaction_write_set(Transaction_write_set * transaction_write_set)111 void cleanup_transaction_write_set(Transaction_write_set *transaction_write_set)
112 {
113 DBUG_ENTER("cleanup_transaction_write_set");
114 if (transaction_write_set != NULL)
115 {
116 my_free (transaction_write_set->write_set);
117 my_free (transaction_write_set);
118 }
119 DBUG_VOID_RETURN;
120 }
121
add_write_set(Transaction_context_log_event * tcle,Transaction_write_set * set)122 int add_write_set(Transaction_context_log_event *tcle,
123 Transaction_write_set *set)
124 {
125 DBUG_ENTER("add_write_set");
126 int iterator= set->write_set_size;
127 for (int i = 0; i < iterator; i++)
128 {
129 uchar buff[BUFFER_READ_PKE];
130 int8store(buff, set->write_set[i]);
131 uint64 const tmp_str_sz= base64_needed_encoded_length((uint64) BUFFER_READ_PKE);
132 char *write_set_value= (char *) my_malloc(PSI_NOT_INSTRUMENTED,
133 static_cast<size_t>(tmp_str_sz), MYF(MY_WME));
134 if (!write_set_value)
135 {
136 /* purecov: begin inspected */
137 log_message(MY_ERROR_LEVEL, "No memory to generate write identification hash");
138 DBUG_RETURN(1);
139 /* purecov: end */
140 }
141
142 if (base64_encode(buff, (size_t) BUFFER_READ_PKE, write_set_value))
143 {
144 /* purecov: begin inspected */
145 log_message(MY_ERROR_LEVEL,
146 "Base 64 encoding of the write identification hash failed");
147 DBUG_RETURN(1);
148 /* purecov: end */
149 }
150
151 tcle->add_write_set(write_set_value);
152 }
153 DBUG_RETURN(0);
154 }
155
156 /*
157 Transaction lifecycle events observers.
158 */
159
group_replication_trans_before_dml(Trans_param * param,int & out)160 int group_replication_trans_before_dml(Trans_param *param, int& out)
161 {
162 DBUG_ENTER("group_replication_trans_before_dml");
163
164 out= 0;
165
166 //If group replication has not started, then moving along...
167 if (!plugin_is_group_replication_running())
168 {
169 DBUG_RETURN(0);
170 }
171
172 /*
173 The first check to be made is if the session binlog is active
174 If it is not active, this query is not relevant for the plugin.
175 */
176 if(!param->trans_ctx_info.binlog_enabled)
177 {
178 DBUG_RETURN(0);
179 }
180
181 /*
182 In runtime, check the global variables that can change.
183 */
184 if( (out+= (param->trans_ctx_info.binlog_format != BINLOG_FORMAT_ROW)) )
185 {
186 log_message(MY_ERROR_LEVEL, "Binlog format should be ROW for Group Replication");
187
188 DBUG_RETURN(0);
189 }
190
191 if( (out+= (param->trans_ctx_info.binlog_checksum_options !=
192 binary_log::BINLOG_CHECKSUM_ALG_OFF)) )
193 {
194 log_message(MY_ERROR_LEVEL, "binlog_checksum should be NONE for Group Replication");
195
196 DBUG_RETURN(0);
197 }
198
199 if ((out+= (param->trans_ctx_info.transaction_write_set_extraction ==
200 HASH_ALGORITHM_OFF)))
201 {
202 /* purecov: begin inspected */
203 log_message(MY_ERROR_LEVEL,
204 "A transaction_write_set_extraction algorithm "
205 "should be selected when running Group Replication");
206 DBUG_RETURN(0);
207 /* purecov: end */
208 }
209
210 if (local_member_info->has_enforces_update_everywhere_checks() &&
211 (out+= (param->trans_ctx_info.tx_isolation == ISO_SERIALIZABLE)))
212 {
213 log_message(MY_ERROR_LEVEL, "Transaction isolation level (tx_isolation) "
214 "is set to SERIALIZABLE, which is not compatible with Group "
215 "Replication");
216 DBUG_RETURN(0);
217 }
218 /*
219 Cycle through all involved tables to assess if they all
220 comply with the plugin runtime requirements. For now:
221 - The table must be from a transactional engine
222 - It must contain at least one primary key
223 - It should not contain 'ON DELETE/UPDATE CASCADE' referential action
224 */
225 for(uint table=0; out == 0 && table < param->number_of_tables; table++)
226 {
227 if (param->tables_info[table].db_type != DB_TYPE_INNODB)
228 {
229 log_message(MY_ERROR_LEVEL, "Table %s does not use the InnoDB storage "
230 "engine. This is not compatible with Group "
231 "Replication",
232 param->tables_info[table].table_name);
233 out++;
234 }
235
236 if(param->tables_info[table].number_of_primary_keys == 0)
237 {
238 log_message(MY_ERROR_LEVEL, "Table %s does not have any PRIMARY KEY. This is not compatible with Group Replication",
239 param->tables_info[table].table_name);
240 out++;
241 }
242 if (local_member_info->has_enforces_update_everywhere_checks() &&
243 param->tables_info[table].has_cascade_foreign_key)
244 {
245 log_message(MY_ERROR_LEVEL, "Table %s has a foreign key with"
246 " 'CASCADE' clause. This is not compatible with Group"
247 " Replication", param->tables_info[table].table_name);
248 out++;
249 }
250 }
251
252 DBUG_RETURN(0);
253 }
254
/*
  Before-commit hook.

  Transactions that arrive through the group replication applier or
  recovery channels return immediately (the applier channel also updates
  the pipeline statistics when the member is ONLINE). For any other
  transaction the hook builds a message made of a
  Transaction_context_log_event, a Gtid_log_event and the content of the
  session binlog cache, broadcasts it through gcs_module and then waits
  on certification_latch for the ticket registered for the session.

  @param[in] param  transaction parameters supplied by the server

  @return 0 on success or when there is nothing to do, non-zero when the
          transaction must not proceed (1 for failures that happen before
          the wait on the certification latch, 2 when that wait fails)
*/
int group_replication_trans_before_commit(Trans_param *param)
{
  DBUG_ENTER("group_replication_trans_before_commit");
  int error= 0;
  // Failure while the plugin stop read lock is still held by this function.
  const int pre_wait_error= 1;
  // Failure after the plugin stop read lock has already been released.
  const int post_wait_error= 2;

  DBUG_EXECUTE_IF("group_replication_force_error_on_before_commit_listener",
                  DBUG_RETURN(1););

  DBUG_EXECUTE_IF("group_replication_before_commit_hook_wait",
                  {
                    const char act[]= "now wait_for continue_commit";
                    assert(!debug_sync_set_action(current_thd, STRING_WITH_LEN(act)));
                  });

  /*
    If the originating id belongs to a thread in the plugin, the transaction
    was already certified. Channel operations can deadlock against
    plugin/applier thread stops so they must remain outside the plugin stop
    lock below.
  */
  // NOTE(review): channel_interface is not referenced again in this function.
  Replication_thread_api channel_interface;
  if (GR_APPLIER_CHANNEL == param->rpl_channel_type) {
    // If plugin is not initialized, there is nothing to do.
    if (NULL == local_member_info) {
      DBUG_RETURN(0);
    }

    // If plugin is stopping, there is no point in updating the statistics.
    bool fail_to_lock= shared_plugin_stop_lock->try_grab_read_lock();
    if (!fail_to_lock)
    {
      if (local_member_info->get_recovery_status() == Group_member_info::MEMBER_ONLINE)
      {
        applier_module->get_pipeline_stats_member_collector()
          ->decrement_transactions_waiting_apply();
        applier_module->get_pipeline_stats_member_collector()
          ->increment_transactions_applied();
      }
      shared_plugin_stop_lock->release_read_lock();
    }

    DBUG_RETURN(0);
  }
  if (GR_RECOVERY_CHANNEL == param->rpl_channel_type) {
    DBUG_RETURN(0);
  }

  /*
    From here on the plugin stop read lock is held; every return path
    below must release it exactly once.
  */
  if (shared_plugin_stop_lock->try_grab_read_lock()) {
    /* If plugin is stopping, rollback the transaction immediately. */
    DBUG_RETURN(1);
  }

  if (is_plugin_waiting_to_set_server_read_mode())
  {
    log_message(MY_ERROR_LEVEL,
                "Transaction cannot be executed while Group Replication is stopping.");
    shared_plugin_stop_lock->release_read_lock();
    DBUG_RETURN(1);
  }

  /* If the plugin is not running, before commit should return success. */
  if (!plugin_is_group_replication_running())
  {
    shared_plugin_stop_lock->release_read_lock();
    DBUG_RETURN(0);
  }

  assert(applier_module != NULL && recovery_module != NULL);
  Group_member_info::Group_member_status member_status=
    local_member_info->get_recovery_status();

  // Local transactions are refused while the member is not able to commit.
  if (member_status == Group_member_info::MEMBER_IN_RECOVERY)
  {
    /* purecov: begin inspected */
    log_message(MY_ERROR_LEVEL,
                "Transaction cannot be executed while Group Replication is recovering."
                " Try again when the server is ONLINE.");
    shared_plugin_stop_lock->release_read_lock();
    DBUG_RETURN(1);
    /* purecov: end */
  }

  if (member_status == Group_member_info::MEMBER_ERROR)
  {
    log_message(MY_ERROR_LEVEL,
                "Transaction cannot be executed while Group Replication is on ERROR state."
                " Check for errors and restart the plugin");
    shared_plugin_stop_lock->release_read_lock();
    DBUG_RETURN(1);
  }

  if (member_status == Group_member_info::MEMBER_OFFLINE)
  {
    /* purecov: begin inspected */
    log_message(MY_ERROR_LEVEL,
                "Transaction cannot be executed while Group Replication is OFFLINE."
                " Check for errors and restart the plugin");
    shared_plugin_stop_lock->release_read_lock();
    DBUG_RETURN(1);
    /* purecov: end */
  }

  // Transaction information.
  const ulong transaction_size_limit= get_transaction_size_limit();
  my_off_t transaction_size= 0;

  const bool is_gtid_specified= param->gtid_info.type == GTID_GROUP;
  Gtid gtid= { param->gtid_info.sidno, param->gtid_info.gno };
  if (!is_gtid_specified)
  {
    // Dummy values that will be replaced after certification.
    gtid.sidno= 1;
    gtid.gno= 1;
  }

  const Gtid_specification gtid_specification= { GTID_GROUP, gtid };
  Gtid_log_event *gle= NULL;

  Transaction_context_log_event *tcle= NULL;

  // group replication cache.
  IO_CACHE *cache= NULL;

  // Todo optimize for memory (IO-cache's buf to start with, if not enough then trans mem-root)
  // to avoid New message create/delete and/or its implicit MessageBuffer.
  Transaction_Message transaction_msg;

  enum enum_gcs_error send_error= GCS_OK;

  // Binlog cache.
  bool is_dml= true;
  bool may_have_sbr_stmts= !is_dml;
  IO_CACHE *cache_log= NULL;
  my_off_t cache_log_position= 0;
  bool reinit_cache_log_required= false;
  const my_off_t trx_cache_log_position= my_b_tell(param->trx_cache_log);
  const my_off_t stmt_cache_log_position= my_b_tell(param->stmt_cache_log);

  /*
    Exactly one of the two binlog caches must hold data: the transactional
    cache is handled as DML, the statement cache as non-DML.
  */
  if (trx_cache_log_position > 0 && stmt_cache_log_position == 0)
  {
    cache_log= param->trx_cache_log;
    cache_log_position= trx_cache_log_position;
  }
  else if (trx_cache_log_position == 0 && stmt_cache_log_position > 0)
  {
    cache_log= param->stmt_cache_log;
    cache_log_position= stmt_cache_log_position;
    is_dml= false;
    may_have_sbr_stmts= true;
  }
  else
  {
    /* purecov: begin inspected */
    log_message(MY_ERROR_LEVEL, "We can only use one cache type at a "
                                "time on session %u", param->thread_id);
    shared_plugin_stop_lock->release_read_lock();
    DBUG_RETURN(1);
    /* purecov: end */
  }

  applier_module->get_pipeline_stats_member_collector()
      ->increment_transactions_local();

  assert(cache_log->type == WRITE_CACHE);
  DBUG_PRINT("cache_log", ("thread_id: %u, trx_cache_log_position: %llu,"
                           " stmt_cache_log_position: %llu",
                           param->thread_id, trx_cache_log_position,
                           stmt_cache_log_position));

  /*
    Open group replication cache.
    Reuse the same cache on each session for improved performance.
  */
  cache= observer_trans_get_io_cache(param->thread_id,
                                     param->cache_log_max_size);
  if (cache == NULL)
  {
    /* purecov: begin inspected */
    error= pre_wait_error;
    goto err;
    /* purecov: end */
  }

  // Reinit binlog cache to read.
  if (reinit_cache(cache_log, READ_CACHE, 0))
  {
    /* purecov: begin inspected */
    log_message(MY_ERROR_LEVEL, "Failed to reinit binlog cache log for read "
                                "on session %u", param->thread_id);
    error= pre_wait_error;
    goto err;
    /* purecov: end */
  }

  /*
    After this, cache_log should be reinit to old saved value when we
    are going out of the function scope.
  */
  reinit_cache_log_required= true;

  // Create transaction context.
  tcle= new Transaction_context_log_event(param->server_uuid,
                                          is_dml,
                                          param->thread_id,
                                          is_gtid_specified);
  if (!tcle->is_valid())
  {
    /* purecov: begin inspected */
    log_message(MY_ERROR_LEVEL,
                "Failed to create the context of the current "
                "transaction on session %u", param->thread_id);
    error= pre_wait_error;
    goto err;
    /* purecov: end */
  }

  if (is_dml)
  {
    Transaction_write_set* write_set= get_transaction_write_set(param->thread_id);
    /*
      When GTID is specified we may have empty transactions, that is,
      a transaction may have not write set at all because it didn't
      change any data, it will just persist that GTID as applied.
    */
    if ((write_set == NULL) && (!is_gtid_specified))
    {
      log_message(MY_ERROR_LEVEL, "Failed to extract the set of items written "
                                  "during the execution of the current "
                                  "transaction on session %u", param->thread_id);
      error= pre_wait_error;
      goto err;
    }

    if (write_set != NULL)
    {
      // The write set is released on both the failure and the success path.
      if (add_write_set(tcle, write_set))
      {
        /* purecov: begin inspected */
        cleanup_transaction_write_set(write_set);
        log_message(MY_ERROR_LEVEL, "Failed to gather the set of items written "
                                    "during the execution of the current "
                                    "transaction on session %u", param->thread_id);
        error= pre_wait_error;
        goto err;
        /* purecov: end */
      }
      cleanup_transaction_write_set(write_set);
      assert(is_gtid_specified || (tcle->get_write_set()->size() > 0));
    }
    else
    {
      /*
        For empty transactions we should set the GTID may_have_sbr_stmts. See
        comment at binlog_cache_data::may_have_sbr_stmts().
      */
      may_have_sbr_stmts= true;
    }
  }

  // Write transaction context to group replication cache.
  // NOTE(review): the result of this write is not checked.
  tcle->write(cache);

  // Write Gtid log event to group replication cache.
  // NOTE(review): the result of this write is not checked either.
  gle= new Gtid_log_event(param->server_id, is_dml, 0, 1,
                          may_have_sbr_stmts,
                          gtid_specification);
  gle->write(cache);

  // The size limit is only enforced for DML and only when it is non-zero.
  transaction_size= cache_log_position + my_b_tell(cache);
  if (is_dml && transaction_size_limit &&
     transaction_size > transaction_size_limit)
  {
    log_message(MY_ERROR_LEVEL, "Error on session %u. "
                "Transaction of size %llu exceeds specified limit %lu. "
                "To increase the limit please adjust group_replication_transaction_size_limit option.",
                param->thread_id, transaction_size,
                transaction_size_limit);
    error= pre_wait_error;
    goto err;
  }

  // Reinit group replication cache to read.
  if (reinit_cache(cache, READ_CACHE, 0))
  {
    /* purecov: begin inspected */
    log_message(MY_ERROR_LEVEL, "Error while re-initializing an internal "
                                "cache, for read operations, on session %u",
                                param->thread_id);
    error= pre_wait_error;
    goto err;
    /* purecov: end */
  }

  // Copy group replication cache to buffer.
  if (transaction_msg.append_cache(cache))
  {
    /* purecov: begin inspected */
    log_message(MY_ERROR_LEVEL, "Error while appending data to an internal "
                                "cache on session %u", param->thread_id);
    error= pre_wait_error;
    goto err;
    /* purecov: end */
  }

  // Copy binlog cache content to buffer.
  if (transaction_msg.append_cache(cache_log))
  {
    /* purecov: begin inspected */
    log_message(MY_ERROR_LEVEL, "Error while writing binary log cache on "
                                "session %u", param->thread_id);
    error= pre_wait_error;
    goto err;
    /* purecov: end */
  }


  // Register the ticket before broadcasting; it is waited on further below.
  assert(certification_latch != NULL);
  if (certification_latch->registerTicket(param->thread_id))
  {
    /* purecov: begin inspected */
    log_message(MY_ERROR_LEVEL, "Unable to register for getting notifications "
                                "regarding the outcome of the transaction on "
                                "session %u", param->thread_id);
    error= pre_wait_error;
    goto err;
    /* purecov: end */
  }

#ifndef NDEBUG
  DBUG_EXECUTE_IF("test_basic_CRUD_operations_sql_service_interface",
                  {
                    DBUG_SET("-d,test_basic_CRUD_operations_sql_service_interface");
                    assert(!sql_command_check());
                  };);

  DBUG_EXECUTE_IF("group_replication_before_message_broadcast",
                  {
                    const char act[]= "now wait_for waiting";
                    assert(!debug_sync_set_action(current_thd, STRING_WITH_LEN(act)));
                  });
#endif

  /*
    Check if member needs to throttle its transactions to avoid
    cause starvation on the group.
  */
  applier_module->get_flow_control_module()->do_wait();

  //Broadcast the Transaction Message
  send_error= gcs_module->send_message(transaction_msg);
  if (send_error == GCS_MESSAGE_TOO_BIG)
  {
    /* purecov: begin inspected */
    log_message(MY_ERROR_LEVEL, "Error broadcasting transaction to the group "
                                "on session %u. Message is too big.",
                                param->thread_id);
    error= pre_wait_error;
    goto err;
    /* purecov: end */
  }
  else if (send_error == GCS_NOK)
  {
    /* purecov: begin inspected */
    log_message(MY_ERROR_LEVEL, "Error while broadcasting the transaction to "
                                "the group on session %u", param->thread_id);
    error= pre_wait_error;
    goto err;
    /* purecov: end */
  }

  /*
    The plugin stop read lock is released before waiting on the ticket;
    errors from this point on must use post_wait_error so that the lock
    is not released a second time in the cleanup below.
  */
  shared_plugin_stop_lock->release_read_lock();

  assert(certification_latch != NULL);
  if (certification_latch->waitTicket(param->thread_id))
  {
    /* purecov: begin inspected */
    log_message(MY_ERROR_LEVEL, "Error while waiting for conflict detection "
                                "procedure to finish on session %u",
                                param->thread_id);
    error= post_wait_error;
    goto err;
    /* purecov: end */
  }

  // Common cleanup; it is reached on the success path as well.
err:
  // Reinit binlog cache to write (revert what we did).
  if (reinit_cache_log_required &&
      reinit_cache(cache_log, WRITE_CACHE, cache_log_position))
  {
    /* purecov: begin inspected */
    log_message(MY_ERROR_LEVEL, "Error while re-initializing an internal "
                                "cache, for write operations, on session %u",
                                param->thread_id);
    /* purecov: end */
  }
  /*
    NOTE(review): cache is NULL here when observer_trans_get_io_cache()
    failed above, and it is still handed to observer_trans_put_io_cache().
  */
  observer_trans_put_io_cache(cache);
  delete gle;
  delete tcle;

  if (error)
  {
    // Only errors raised before the wait still hold the plugin stop lock.
    if (error == pre_wait_error)
      shared_plugin_stop_lock->release_read_lock();

    assert(certification_latch != NULL);
    // Release and remove certification latch ticket.
    // NOTE(review): for failures before registerTicket() no ticket exists.
    certification_latch->releaseTicket(param->thread_id);
    certification_latch->waitTicket(param->thread_id);
  }

  DBUG_EXECUTE_IF("group_replication_after_before_commit_hook",
                 {
                    const char act[]= "now wait_for signal.commit_continue";
                    assert(!debug_sync_set_action(current_thd, STRING_WITH_LEN(act)));
                 });
  DBUG_RETURN(error);
}
674
/*
  Before-rollback hook. It performs no work and does not read param.

  @param[in] param  transaction parameters (unused)

  @return always 0
*/
int group_replication_trans_before_rollback(Trans_param *param)
{
  DBUG_ENTER("group_replication_trans_before_rollback");
  DBUG_RETURN(0);
}
680
/*
  After-commit hook. It performs no work and does not read param.

  @param[in] param  transaction parameters (unused)

  @return always 0
*/
int group_replication_trans_after_commit(Trans_param *param)
{
  DBUG_ENTER("group_replication_trans_after_commit");
  DBUG_RETURN(0);
}
686
/*
  After-rollback hook. It performs no work and does not read param.

  @param[in] param  transaction parameters (unused)

  @return always 0
*/
int group_replication_trans_after_rollback(Trans_param *param)
{
  DBUG_ENTER("group_replication_trans_after_rollback");
  DBUG_RETURN(0);
}
692
/*
  Transaction observer table: the size of the structure followed by the
  five transaction hooks defined in this file.
*/
Trans_observer trans_observer = {
  sizeof(Trans_observer),

  group_replication_trans_before_dml,      // validates settings and tables
  group_replication_trans_before_commit,   // broadcasts and waits for the outcome
  group_replication_trans_before_rollback, // no-op
  group_replication_trans_after_commit,    // no-op
  group_replication_trans_after_rollback,  // no-op
};
702
703 /*
704 Internal auxiliary functions.
705 */
706
707 /*
708 Reinit IO_cache type.
709
710 @param[in] cache cache
711 @param[in] type type to which cache will change
712 @param[in] position position to which cache will seek
713 */
reinit_cache(IO_CACHE * cache,enum cache_type type,my_off_t position)714 static bool reinit_cache(IO_CACHE *cache,
715 enum cache_type type,
716 my_off_t position)
717 {
718 DBUG_ENTER("reinit_cache");
719
720 /*
721 Avoid call flush_io_cache() before reinit_io_cache() if
722 temporary file does not exist.
723 Call flush_io_cache() forces the creation of the cache
724 temporary file, even when it does not exist.
725 */
726 if (READ_CACHE == type && cache->file != -1 && flush_io_cache(cache))
727 DBUG_RETURN(true); /* purecov: inspected */
728
729 if (reinit_io_cache(cache, type, position, 0, 0))
730 DBUG_RETURN(true); /* purecov: inspected */
731
732 DBUG_RETURN(false);
733 }
734
735 /*
736 Get already initialized cache or create a new cache for
737 this session.
738
739 @param[in] thread_id the session
740 @param[in] cache_size the cache size
741
742 @return The cache or NULL on error
743 */
observer_trans_get_io_cache(my_thread_id thread_id,ulonglong cache_size)744 IO_CACHE* observer_trans_get_io_cache(my_thread_id thread_id,
745 ulonglong cache_size)
746 {
747 DBUG_ENTER("observer_trans_get_io_cache");
748 IO_CACHE *cache= NULL;
749
750 io_cache_unused_list_lock->wrlock();
751 if (io_cache_unused_list.empty())
752 {
753 io_cache_unused_list_lock->unlock();
754 // Open IO_CACHE file
755 cache= (IO_CACHE*) my_malloc(PSI_NOT_INSTRUMENTED,
756 sizeof(IO_CACHE),
757 MYF(MY_ZEROFILL));
758 if (!cache || (!my_b_inited(cache) &&
759 open_cached_file(cache, mysql_tmpdir,
760 "group_replication_trans_before_commit",
761 static_cast<size_t>(cache_size), MYF(MY_WME))))
762 {
763 /* purecov: begin inspected */
764 my_free(cache);
765 cache= NULL;
766 log_message(MY_ERROR_LEVEL,
767 "Failed to create group replication commit cache on session %u",
768 thread_id);
769 goto end;
770 /* purecov: end */
771 }
772 }
773 else
774 {
775 // Reuse cache created previously.
776 cache= io_cache_unused_list.front();
777 io_cache_unused_list.pop_front();
778 io_cache_unused_list_lock->unlock();
779
780 if (reinit_cache(cache, WRITE_CACHE, 0))
781 {
782 /* purecov: begin inspected */
783 close_cached_file(cache);
784 my_free(cache);
785 cache= NULL;
786 log_message(MY_ERROR_LEVEL,
787 "Failed to reinit group replication commit cache for write "
788 "on session %u", thread_id);
789 goto end;
790 /* purecov: end */
791 }
792 }
793
794 end:
795 DBUG_RETURN(cache);
796 }
797
798 /*
799 Save already initialized cache for a future session.
800
801 @param[in] cache the cache
802 */
observer_trans_put_io_cache(IO_CACHE * cache)803 void observer_trans_put_io_cache(IO_CACHE *cache)
804 {
805 DBUG_ENTER("observer_trans_put_io_cache");
806
807 io_cache_unused_list_lock->wrlock();
808 io_cache_unused_list.push_back(cache);
809 io_cache_unused_list_lock->unlock();
810
811 DBUG_VOID_RETURN;
812 }
813
814 //Transaction Message implementation
815
/*
  Build an empty transaction message; the base class is initialized
  with the CT_TRANSACTION_MESSAGE type.
*/
Transaction_Message::Transaction_Message()
  :Plugin_gcs_message(CT_TRANSACTION_MESSAGE)
{
}
820
/*
  Nothing is released explicitly here.
*/
Transaction_Message::~Transaction_Message()
{
}
824
825 bool
append_cache(IO_CACHE * src)826 Transaction_Message::append_cache(IO_CACHE *src)
827 {
828 DBUG_ENTER("append_cache");
829 assert(src->type == READ_CACHE);
830
831 uchar *buffer= src->read_pos;
832 size_t length= my_b_fill(src);
833 if (src->file == -1)
834 {
835 // Read cache size directly when temporary file does not exist.
836 length= my_b_bytes_in_cache(src);
837 }
838
839 while (length > 0 && !src->error)
840 {
841 data.insert(data.end(),
842 buffer,
843 buffer + length);
844
845 src->read_pos= src->read_end;
846 length= my_b_fill(src);
847 buffer= src->read_pos;
848 }
849
850 DBUG_RETURN(src->error ? true : false);
851 }
852
/*
  Append the message payload to a buffer: first the item header
  (PIT_TRANSACTION_DATA type and the size of data), then the bytes of
  data themselves.

  @param[out] buffer  the buffer the payload is appended to
*/
void
Transaction_Message::encode_payload(std::vector<unsigned char>* buffer) const
{
  DBUG_ENTER("Transaction_Message::encode_payload");

  encode_payload_item_type_and_length(buffer, PIT_TRANSACTION_DATA, data.size());
  buffer->insert(buffer->end(), data.begin(), data.end());

  DBUG_VOID_RETURN;
}
863
864 void
decode_payload(const unsigned char * buffer,const unsigned char * end)865 Transaction_Message::decode_payload(const unsigned char* buffer,
866 const unsigned char* end)
867 {
868 DBUG_ENTER("Transaction_Message::decode_payload");
869 const unsigned char *slider= buffer;
870 uint16 payload_item_type= 0;
871 unsigned long long payload_item_length= 0;
872
873 decode_payload_item_type_and_length(&slider,
874 &payload_item_type,
875 &payload_item_length);
876 data.clear();
877 data.insert(data.end(), slider, slider + payload_item_length);
878
879 DBUG_VOID_RETURN;
880 }
881