1 /*
2       Copyright (c) 2013, 2021, Oracle and/or its affiliates.
3 
4       This program is free software; you can redistribute it and/or modify
5       it under the terms of the GNU General Public License, version 2.0,
6       as published by the Free Software Foundation.
7 
8       This program is also distributed with certain software (including
9       but not limited to OpenSSL) that is licensed under separate terms,
10       as designated in a particular file or component or in included license
11       documentation.  The authors of MySQL hereby grant you an additional
12       permission to link the program and your derivative works with the
13       separately licensed software that they have included with MySQL.
14 
15       This program is distributed in the hope that it will be useful,
16       but WITHOUT ANY WARRANTY; without even the implied warranty of
17       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18       GNU General Public License, version 2.0, for more details.
19 
20       You should have received a copy of the GNU General Public License
21       along with this program; if not, write to the Free Software
22       Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA */
23 
24 /**
25   @file storage/perfschema/table_replication_applier_configuration.cc
26   Table replication_applier_configuration (implementation).
27 */
28 
29 //#define HAVE_REPLICATION
30 
31 #include "my_global.h"
32 #include "table_replication_applier_configuration.h"
33 #include "pfs_instr_class.h"
34 #include "pfs_instr.h"
35 #include "slave.h"
36 //#include "rpl_info.h"
37 #include "rpl_rli.h"
38 #include "rpl_mi.h"
39 #include "sql_parse.h"
40 //#include "rpl_msr.h"   /* Multisource replication */
41 
42 #ifdef HAVE_REPLICATION
/**
  Lock object for the replication_applier_configuration table.
  Its address is stored in m_share below.
*/
THR_LOCK table_replication_applier_configuration::m_table_lock;

/**
  Share descriptor for the replication_applier_configuration table.

  The write_row and delete_all_rows hooks are both NULL and the ACL entry
  is pfs_readonly_acl.  The embedded CREATE TABLE statement declares two
  NOT NULL columns, CHANNEL_NAME and DESIRED_DELAY; read_row_values()
  fills them by field index (0 and 1 respectively).
*/
PFS_engine_table_share
table_replication_applier_configuration::m_share=
{
  /* Table name. */
  { C_STRING_WITH_LEN("replication_applier_configuration") },
  /* NOTE(review): presumably the access control rules for the table --
     confirm against the PFS_engine_table_share declaration. */
  &pfs_readonly_acl,
  /* Factory returning a new table_replication_applier_configuration. */
  table_replication_applier_configuration::create,
  NULL, /* write_row */
  NULL, /* delete_all_rows */
  /* Row count callback: number of records in master_info_hash. */
  table_replication_applier_configuration::get_row_count,
  sizeof(pos_t), /* ref length */
  &m_table_lock,
  /* Table definition (SQL). */
  { C_STRING_WITH_LEN("CREATE TABLE replication_applier_configuration("
  "CHANNEL_NAME CHAR(64) collate utf8_general_ci not null comment 'Replication channel name.',"
  "DESIRED_DELAY INTEGER not null comment 'Desired replica delay functionality not supported by MariaDB. Always 0.')") },
  false  /* perpetual */
};
61 
create(void)62 PFS_engine_table* table_replication_applier_configuration::create(void)
63 {
64   return new table_replication_applier_configuration();
65 }
66 
67 table_replication_applier_configuration
table_replication_applier_configuration()68   ::table_replication_applier_configuration()
69   : PFS_engine_table(&m_share, &m_pos),
70     m_row_exists(false), m_pos(0), m_next_pos(0)
71 {}
72 
73 table_replication_applier_configuration
~table_replication_applier_configuration()74   ::~table_replication_applier_configuration()
75 {}
76 
reset_position(void)77 void table_replication_applier_configuration::reset_position(void)
78 {
79   m_pos.m_index= 0;
80   m_next_pos.m_index= 0;
81 }
82 
83 
get_row_count()84 ha_rows table_replication_applier_configuration::get_row_count()
85 {
86  return master_info_index->master_info_hash.records;
87 }
88 
89 
rnd_next(void)90 int table_replication_applier_configuration::rnd_next(void)
91 {
92   Master_info *mi;
93   mysql_mutex_lock(&LOCK_active_mi);
94 
95   for (m_pos.set_at(&m_next_pos);
96        m_pos.m_index < master_info_index->master_info_hash.records;
97        m_pos.next())
98   {
99     mi= (Master_info *)my_hash_element(&master_info_index->master_info_hash, m_pos.m_index);
100 
101     if (mi && mi->host[0])
102     {
103       make_row(mi);
104       m_next_pos.set_after(&m_pos);
105       mysql_mutex_unlock(&LOCK_active_mi);
106       return 0;
107     }
108   }
109 
110   mysql_mutex_unlock(&LOCK_active_mi);
111   return HA_ERR_END_OF_FILE;
112 }
113 
rnd_pos(const void * pos)114 int table_replication_applier_configuration::rnd_pos(const void *pos)
115 {
116   Master_info *mi;
117   int res= HA_ERR_RECORD_DELETED;
118 
119   set_position(pos);
120 
121   mysql_mutex_lock(&LOCK_active_mi);
122 
123   if ((mi= (Master_info *)my_hash_element(&master_info_index->master_info_hash, m_pos.m_index)))
124   {
125     make_row(mi);
126     res= 0;
127   }
128 
129   mysql_mutex_unlock(&LOCK_active_mi);
130   return res;
131 }
132 
make_row(Master_info * mi)133 void table_replication_applier_configuration::make_row(Master_info *mi)
134 {
135   m_row_exists= false;
136 
137   DBUG_ASSERT(mi != NULL);
138 
139   mysql_mutex_lock(&mi->data_lock);
140   mysql_mutex_lock(&mi->rli.data_lock);
141 
142   m_row.channel_name_length= static_cast<uint>(mi->connection_name.length);
143   memcpy(m_row.channel_name, mi->connection_name.str, m_row.channel_name_length);
144   m_row.desired_delay= 0; //mi->rli->get_sql_delay();
145 
146   mysql_mutex_unlock(&mi->rli.data_lock);
147   mysql_mutex_unlock(&mi->data_lock);
148 
149   m_row_exists= true;
150 }
151 
read_row_values(TABLE * table,unsigned char * buf,Field ** fields,bool read_all)152 int table_replication_applier_configuration::read_row_values(TABLE *table,
153                                                              unsigned char *buf,
154                                                              Field **fields,
155                                                              bool read_all)
156 {
157   Field *f;
158 
159   if (unlikely(! m_row_exists))
160     return HA_ERR_RECORD_DELETED;
161 
162   /*
163     Note:
164     There are no NULL columns in this table,
165     so there are no null bits reserved for NULL flags per column.
166     There are no VARCHAR columns either, so the record is not
167     in HA_OPTION_PACK_RECORD format as most other performance_schema tables.
168     When HA_OPTION_PACK_RECORD is not set,
169     the table record reserves an extra null byte, see open_binary_frm().
170   */
171 
172   assert(table->s->null_bytes == 1);
173   buf[0]= 0;
174 
175   for (; (f= *fields) ; fields++)
176   {
177     if (read_all || bitmap_is_set(table->read_set, f->field_index))
178     {
179       switch(f->field_index)
180       {
181       case 0: /**channel_name*/
182         set_field_char_utf8(f, m_row.channel_name, m_row.channel_name_length);
183         break;
184       case 1: /** desired_delay */
185         set_field_ulong(f, static_cast<ulong>(m_row.desired_delay));
186         break;
187       default:
188         assert(false);
189       }
190     }
191   }
192   return 0;
193 }
194 #endif
195