1 /* Copyright (c) 2016, Oracle and/or its affiliates. All rights reserved. 2 3 This program is free software; you can redistribute it and/or modify 4 it under the terms of the GNU General Public License, version 2.0, 5 as published by the Free Software Foundation. 6 7 This program is also distributed with certain software (including 8 but not limited to OpenSSL) that is licensed under separate terms, 9 as designated in a particular file or component or in included license 10 documentation. The authors of MySQL hereby grant you an additional 11 permission to link the program and your derivative works with the 12 separately licensed software that they have included with MySQL. 13 14 This program is distributed in the hope that it will be useful, 15 but WITHOUT ANY WARRANTY; without even the implied warranty of 16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 17 GNU General Public License, version 2.0, for more details. 18 19 You should have received a copy of the GNU General Public License 20 along with this program; if not, write to the Free Software 21 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ 22 23 #include "connection_control_coordinator.h" 24 #include "connection_control.h" 25 #include <my_global.h> 26 #include <my_atomic.h> 27 28 namespace connection_control 29 { 30 /** 31 Reset Connection_event_coordinator information 32 */ 33 34 void reset()35 Connection_event_coordinator::reset() 36 { 37 m_subscribers.clear(); 38 for (uint i= (uint)STAT_CONNECTION_DELAY_TRIGGERED; 39 i < (uint)STAT_LAST; ++i) 40 m_status_vars_subscription[i]= 0; 41 } 42 43 44 /** 45 Register an event subscriber. 46 47 A subscriber can provide following preferences: 48 1. Set of events for which subscriber is interested 49 2. Set of variables for which subscriber would like to receive update 50 3. Set of stats for which subscriber would like to send update 51 52 @param [in] subscriber Handle to Connection_event_observers 53 @param [in] events Event mask supplied by subscriber 54 @param [in] sys_vars opt_connection_control vector 55 @param [in] status_vars stats_connection_control vector 56 57 @returns subscription status 58 @retval false Subscription successful 59 @retval true Failure in subscription for given combination of prefernece. 60 Most probably, other subscriber has already subscribed for 61 status var update. 62 */ 63 64 bool register_event_subscriber(Connection_event_observer ** subscriber,std::vector<opt_connection_control> * sys_vars,std::vector<stats_connection_control> * status_vars)65 Connection_event_coordinator::register_event_subscriber( 66 Connection_event_observer **subscriber, 67 std::vector<opt_connection_control> *sys_vars, 68 std::vector<stats_connection_control> *status_vars) 69 { 70 DBUG_ENTER("Connection_event_coordinator::register_event_subscriber"); 71 bool error= false; 72 std::vector<opt_connection_control>::iterator sys_vars_it; 73 std::vector<stats_connection_control>::iterator status_vars_it; 74 75 DBUG_ASSERT(subscriber != 0); 76 77 if (status_vars) 78 { 79 for (status_vars_it= status_vars->begin(); 80 status_vars_it != status_vars->end(); 81 ++status_vars_it) 82 { 83 if (*status_vars_it >= STAT_LAST || 84 m_status_vars_subscription[*status_vars_it] != 0) 85 { 86 /* 87 Either an invalid status variable is specified or 88 someone has already subscribed for status variable 89 */ 90 error= true; 91 break; 92 } 93 } 94 } 95 96 if (!error && sys_vars) 97 { 98 for (sys_vars_it= sys_vars->begin(); 99 sys_vars_it != sys_vars->end(); 100 ++sys_vars_it) 101 { 102 if (*sys_vars_it >= OPT_LAST) 103 error= true; 104 break; 105 } 106 } 107 108 if (!error) 109 { 110 /* 111 Create Connection_event_subscriber object and 112 initialize it with required details. 113 */ 114 Connection_event_subscriber subscriber_info; 115 subscriber_info.m_subscriber= *subscriber; 116 117 /* Reset the list first */ 118 for (uint i= (uint)OPT_FAILED_CONNECTIONS_THRESHOLD; 119 i < (uint)OPT_LAST; 120 ++i) 121 subscriber_info.m_sys_vars[i]= false; 122 123 /* Now set the bits which are requested by subscriber */ 124 for (sys_vars_it= sys_vars->begin(); 125 sys_vars_it != sys_vars->end(); 126 ++sys_vars_it) 127 subscriber_info.m_sys_vars[*sys_vars_it]= true; 128 129 /* Insert new entry in m_subscribers */ 130 try 131 { 132 m_subscribers.push_back(subscriber_info); 133 } 134 catch (...) 135 { 136 /* Something went wrong. Mostly likely OOM. */ 137 error= true; 138 } 139 140 /* 141 Update m_status_vars_subscription only if subscriber information 142 has been inserted in m_subscribers successfully. 143 */ 144 if (!error) 145 { 146 for (status_vars_it= status_vars->begin(); 147 status_vars_it != status_vars->end(); 148 ++status_vars_it) 149 m_status_vars_subscription[*status_vars_it]= *subscriber; 150 } 151 } 152 DBUG_RETURN(error); 153 } 154 155 156 /** 157 Handle connection event. 158 When a notification from server is received, perform following: 159 1. Iterate through list of subscribers 160 - If a subscriber has shown interest in received event, 161 call notify() for the subscriber 162 2. Interate through list of status variables 163 - If subscriber has show interest in any status variable, 164 call notify_status_var() for the subscriber 165 - If subscriber suggests an action on status variable, 166 perform the action 167 168 Note : If we receive error from a subscriber, we log it and move on. 169 170 @param [in] thd THD handle 171 @param [in] error_handler Error handler class 172 @param [in] connection_event Event information 173 */ 174 175 void notify_event(MYSQL_THD thd,Error_handler * error_handler,const mysql_event_connection * connection_event)176 Connection_event_coordinator::notify_event( 177 MYSQL_THD thd, 178 Error_handler *error_handler, 179 const mysql_event_connection *connection_event) 180 { 181 DBUG_ENTER("Connection_event_coordinator::notify_event"); 182 std::vector<Connection_event_subscriber>::iterator it= m_subscribers.begin(); 183 184 while (it != m_subscribers.end()) 185 { 186 Connection_event_subscriber event_subscriber= *it; 187 (void)event_subscriber.m_subscriber->notify_event(thd, this, 188 connection_event, 189 error_handler); 190 191 ++it; 192 } 193 194 DBUG_VOID_RETURN; 195 } 196 197 198 /** 199 Process change in sys_var value 200 201 Iterate through all subscribers 202 - If a subscriber has shown interest in getting notification for given 203 system variable, call notify_sys_var. 204 205 Note : If we receive error from a subscriber, we log it and move on. 206 207 @param [in] error_hanlder Error handler class 208 @param [in] opt_connection_control Variable information 209 @param [in] new_value New value for variable 210 */ 211 212 void notify_sys_var(Error_handler * error_handler,opt_connection_control variable,void * new_value)213 Connection_event_coordinator::notify_sys_var(Error_handler *error_handler, 214 opt_connection_control variable, 215 void *new_value) 216 { 217 DBUG_ENTER("Connection_event_coordinator::notify_sys_var"); 218 std::vector<Connection_event_subscriber>::iterator it= m_subscribers.begin(); 219 220 while (it != m_subscribers.end()) 221 { 222 Connection_event_subscriber event_subscriber= *it; 223 if (event_subscriber.m_sys_vars[variable]) 224 { 225 (void) event_subscriber.m_subscriber->notify_sys_var(this, 226 variable, 227 new_value, 228 error_handler); 229 } 230 ++it; 231 } 232 233 DBUG_VOID_RETURN; 234 } 235 236 237 /** 238 Update a status variable 239 240 @param [in] observer Requestor 241 @param [in] status_var Status variable to be updated 242 @param [in] action Operation to be performed on status variable 243 244 @returns status of the operation 245 @retval false Success 246 @retval true Error in processing 247 */ 248 249 bool notify_status_var(Connection_event_observer ** observer,stats_connection_control status_var,status_var_action action)250 Connection_event_coordinator::notify_status_var(Connection_event_observer **observer, 251 stats_connection_control status_var, 252 status_var_action action) 253 { 254 DBUG_ENTER("Connection_event_coordinator::notify_status_var"); 255 bool error= false; 256 257 if (m_status_vars_subscription[status_var] == *observer) 258 { 259 if (status_var < STAT_LAST) 260 { 261 switch (action) 262 { 263 case ACTION_INC: 264 { 265 my_atomic_add64(&g_statistics.stats_array[status_var], 1); 266 break; 267 } 268 case ACTION_RESET: 269 { 270 my_atomic_store64(&g_statistics.stats_array[status_var], 0); 271 break; 272 } 273 default: 274 { 275 error= true; 276 DBUG_ASSERT(FALSE); 277 break; 278 } 279 } 280 } 281 } 282 283 DBUG_RETURN(error); 284 } 285 } 286