1 package org.jgroups.protocols; 2 3 import org.jgroups.Address; 4 import org.jgroups.annotations.Property; 5 6 import java.io.ByteArrayInputStream; 7 import java.io.ByteArrayOutputStream; 8 import java.io.DataInputStream; 9 import java.io.DataOutputStream; 10 import java.io.IOException; 11 import java.sql.Connection; 12 import java.sql.DriverManager; 13 import java.sql.PreparedStatement; 14 import java.sql.ResultSet; 15 import java.sql.SQLException; 16 import java.util.ArrayList; 17 import java.util.Collections; 18 import java.util.List; 19 20 import javax.naming.InitialContext; 21 import javax.naming.NamingException; 22 import javax.sql.DataSource; 23 24 /** 25 * <p>Discovery protocol using a JDBC connection to a shared database. 26 * Connection options can be defined as configuration properties, or the JNDI 27 * name of a <code>DataSource</code> can be provided (avoid providing both).</p> 28 * 29 * <p>Both the schema and the used SQL statements can be customized; make sure 30 * the order of parameters of such customized SQL statements is maintained and 31 * that compatible types are used for the columns. The recommended schema uses a 32 * single table, with two String columns being used primary key (local address, 33 * cluster name) and a third column to store the serialized form of the objects 34 * needed by JGroups.</p> 35 * 36 * <p>A default table will be created at first connection, errors during this 37 * operation are not considered critical. Set the <code>initialize_sql</code> 38 * to an empty value to prevent this initial table creation, or change it to 39 * create a customized table.</p> 40 * 41 * @author Sanne Grinovero 42 * @since 2.12 43 */ 44 public class JDBC_PING extends FILE_PING { 45 46 /* ----------------------------------------- Properties -------------------------------------------------- */ 47 48 @Property(description = "The JDBC connection URL", writable = false) 49 protected String connection_url = null; 50 51 @Property(description = "The JDBC connection username", writable = false) 52 protected String connection_username = null; 53 54 @Property(description = "The JDBC connection password", writable = false) 55 protected String connection_password = null; 56 57 @Property(description = "The JDBC connection driver name", writable = false) 58 protected String connection_driver = null; 59 60 @Property(description = "If not empty, this SQL statement will be performed at startup." 61 + "Customize it to create the needed table on those databases which permit table creation attempt without loosing data, such as " 62 + "PostgreSQL and MySQL (using IF NOT EXISTS). To allow for creation attempts, errors performing this statement will be logged" 63 + "but not considered fatal. To avoid any DDL operation, set this to an empty string.") 64 protected String initialize_sql = 65 "CREATE TABLE JGROUPSPING (" + 66 "own_addr varchar(200) NOT NULL, " + 67 "cluster_name varchar(200) NOT NULL, " + 68 "ping_data varbinary(5000) DEFAULT NULL, " + 69 "PRIMARY KEY (own_addr, cluster_name) )"; 70 71 @Property(description = "SQL used to insert a new row. Customizable, but keep the order of parameters and pick compatible types: " + 72 "1)Own Address, as String 2)Cluster name, as String 3)Serialized PingData as byte[]") 73 protected String insert_single_sql = "INSERT INTO JGROUPSPING (own_addr, cluster_name, ping_data) values (?, ?, ?)"; 74 75 @Property(description = "SQL used to delete a row. Customizable, but keep the order of parameters and pick compatible types: " + 76 "1)Own Address, as String 2)Cluster name, as String") 77 protected String delete_single_sql = "DELETE FROM JGROUPSPING WHERE own_addr=? AND cluster_name=?"; 78 79 @Property(description = "SQL used to fetch all node's PingData. Customizable, but keep the order of parameters and pick compatible types: " + 80 "only one parameter needed, String compatible, representing the Cluster name. Must return a byte[], the Serialized PingData as" + 81 " it was stored by the insert_single_sql statement") 82 protected String select_all_pingdata_sql = "SELECT ping_data FROM JGROUPSPING WHERE cluster_name=?"; 83 84 @Property(description = "To use a DataSource registered in JNDI, specify the JNDI name here. " + 85 "This is an alternative to all connection_* configuration options: if this property is not empty, then all connection related" + 86 "properties must be empty.") 87 protected String datasource_jndi_name; 88 89 /* --------------------------------------------- Fields ------------------------------------------------------ */ 90 91 private DataSource dataSourceFromJNDI = null; 92 93 @Override init()94 public void init() throws Exception { 95 super.init(); 96 verifyconfigurationParameters(); 97 if (stringIsEmpty(datasource_jndi_name)) { 98 loadDriver(); 99 } 100 else { 101 dataSourceFromJNDI = getDataSourceFromJNDI(datasource_jndi_name.trim()); 102 } 103 attemptSchemaInitialization(); 104 } 105 106 @Override stop()107 public void stop() { 108 try { 109 deleteSelf(); 110 } catch (SQLException e) { 111 log.error("Error while unregistering of our own Address from JDBC_PING database during shutdown", e); 112 } 113 super.stop(); 114 } 115 attemptSchemaInitialization()116 protected void attemptSchemaInitialization() { 117 if (stringIsEmpty(initialize_sql)) { 118 log.info("Table creation step skipped: initialize_sql property is missing"); 119 return; 120 } 121 Connection connection = getConnection(); 122 if (connection != null) { 123 try { 124 try { 125 PreparedStatement preparedStatement = 126 connection.prepareStatement(initialize_sql); 127 preparedStatement.execute(); 128 log.info("Table created for JDBC_PING Discovery Protocol"); 129 } catch (SQLException e) { 130 if (log.isDebugEnabled()) { 131 log.debug("Could not execute initialize_sql statement; not necessarily an error.", e); 132 } 133 else { 134 //avoid printing out the stacktrace 135 log.info("Could not execute initialize_sql statement; not necessarily an error. Set to debug logging level for details."); 136 } 137 } 138 } finally { 139 try { 140 connection.close(); 141 } catch (SQLException e) { 142 log.error("Error closing connection", e); 143 } 144 } 145 } 146 } 147 loadDriver()148 protected void loadDriver() { 149 if (stringIsEmpty(connection_driver)) { 150 return; 151 } 152 if (log.isDebugEnabled()) { 153 log.debug("Registering JDBC Driver named '" + connection_driver + "'"); 154 } 155 try { 156 Class.forName(connection_driver); 157 } catch (ClassNotFoundException e) { 158 throw new IllegalArgumentException("JDBC Driver required for JDBC_PING Discovery" 159 + "protocol could not be loaded: '" + connection_driver + "'"); 160 } 161 } 162 getConnection()163 protected Connection getConnection() { 164 if (dataSourceFromJNDI == null) { 165 Connection connection; 166 try { 167 connection = DriverManager.getConnection( 168 connection_url, connection_username, connection_password); 169 } catch (SQLException e) { 170 log.error("Could not open connection to database", e); 171 return null; 172 } 173 if (connection == null) { 174 log.error("Received null connection from the DriverManager!"); 175 } 176 return connection; 177 } 178 else { 179 try { 180 return dataSourceFromJNDI.getConnection(); 181 } catch (SQLException e) { 182 log.error("Could not open connection to database", e); 183 return null; 184 } 185 } 186 } 187 188 @Override createRootDir()189 protected void createRootDir() { 190 // No-Op to prevent file creations from super.init(). 191 // TODO refactor this class and FILE_PING to have a common parent? 192 // would also be nice to remove unwanted configuration properties which where inherited. 193 } 194 195 @Override remove(String clustername, Address addr)196 protected void remove(String clustername, Address addr) { 197 final String addressAsString = addressAsString(addr); 198 try { 199 delete(clustername, addressAsString); 200 } catch (SQLException e) { 201 log.error("Error", e); 202 } 203 } 204 205 @Override readAll(String clustername)206 protected List<PingData> readAll(String clustername) { 207 final Connection connection = getConnection(); 208 if (connection != null) { 209 try { 210 return readAll(connection, clustername); 211 } catch (SQLException e) { 212 log.error("Error reading JDBC_PING table", e); 213 return Collections.emptyList(); 214 } finally { 215 closeConnection(connection); 216 } 217 } else { 218 return Collections.emptyList(); 219 } 220 } 221 readAll(Connection connection, String clustername)222 protected List<PingData> readAll(Connection connection, String clustername) throws SQLException { 223 PreparedStatement ps = connection.prepareStatement(select_all_pingdata_sql); 224 try { 225 ps.setString(1, clustername); 226 ResultSet resultSet = ps.executeQuery(); 227 ArrayList<PingData> results = new ArrayList<PingData>(); 228 while (resultSet.next()) { 229 byte[] bytes = resultSet.getBytes(1); 230 PingData pingData = deserialize(bytes); 231 results.add(pingData); 232 } 233 return results; 234 } finally { 235 ps.close(); 236 } 237 } 238 239 @Override writeToFile(PingData data, String clustername)240 protected void writeToFile(PingData data, String clustername) { 241 final String ownAddress = addressAsString(data.getAddress()); 242 final Connection connection = getConnection(); 243 if (connection != null) { 244 try { 245 delete(connection, clustername, ownAddress); 246 insert(connection, data, clustername, ownAddress); 247 } catch (SQLException e) { 248 log.error("Error updating JDBC_PING table", e); 249 } finally { 250 closeConnection(connection); 251 } 252 } 253 else { 254 log.error("Failed to store PingData in database"); 255 } 256 } 257 insert(Connection connection, PingData data, String clustername, String address)258 protected void insert(Connection connection, PingData data, String clustername, String address) throws SQLException { 259 final byte[] serializedPingData = serializeWithoutView(data); 260 PreparedStatement ps = connection.prepareStatement(insert_single_sql); 261 try { 262 ps.setString(1, address); 263 ps.setString(2, clustername); 264 ps.setBytes(3, serializedPingData); 265 ps.executeUpdate(); 266 if (log.isDebugEnabled()) 267 log.debug("Registered " + address + " for clustername " + clustername + " into database."); 268 } finally { 269 ps.close(); 270 } 271 } 272 delete(Connection connection, String clustername, String addressToDelete)273 protected void delete(Connection connection, String clustername, String addressToDelete) throws SQLException { 274 PreparedStatement ps = connection.prepareStatement(delete_single_sql); 275 try { 276 ps.setString(1, addressToDelete); 277 ps.setString(2, clustername); 278 ps.executeUpdate(); 279 if (log.isDebugEnabled()) 280 log.debug("Removed " + addressToDelete + " for clustername " + clustername + " from database."); 281 } finally { 282 ps.close(); 283 } 284 } 285 delete(String clustername, String addressToDelete)286 protected void delete(String clustername, String addressToDelete) throws SQLException { 287 final Connection connection = getConnection(); 288 if (connection != null) { 289 try { 290 delete(connection, clustername, addressToDelete); 291 } catch (SQLException e) { 292 log.error("Error updating JDBC_PING table", e); 293 } finally { 294 closeConnection(connection); 295 } 296 } else { 297 log.error("Failed to delete PingData in database"); 298 } 299 } 300 deleteSelf()301 protected void deleteSelf() throws SQLException { 302 final String ownAddress = addressAsString(local_addr); 303 delete(group_addr, ownAddress); 304 } 305 306 /** 307 * Creates a byte[] representation of the PingData, but DISCARDING 308 * the view it contains. 309 * @param data the PingData instance to serialize. 310 * @return 311 */ serializeWithoutView(PingData data)312 protected byte[] serializeWithoutView(PingData data) { 313 final PingData clone = new PingData(data.getAddress(), null, data.isServer(), data.getLogicalName(), data.getPhysicalAddrs()); 314 ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream( 512 ); 315 DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream); 316 try { 317 clone.writeTo(outputStream); 318 } catch (IOException e) { 319 //not expecting this to happen as it's an in-memory stream 320 log.error("Error", e); 321 } 322 return byteArrayOutputStream.toByteArray(); 323 } 324 deserialize(final byte[] data)325 protected PingData deserialize(final byte[] data) { 326 final PingData pingData = new PingData(); 327 final ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(data); 328 final DataInputStream outputStream = new DataInputStream(byteArrayInputStream); 329 try { 330 pingData.readFrom(outputStream); 331 } catch (IllegalAccessException e) { 332 log.error("Error", e); 333 } catch (InstantiationException e) { 334 log.error("Error", e); 335 } catch (IOException e) { 336 // not expecting this to happen as it's an in-memory stream 337 log.error("Error", e); 338 } 339 return pingData; 340 } 341 closeConnection(final Connection connection)342 protected void closeConnection(final Connection connection) { 343 try { 344 connection.close(); 345 } catch (SQLException e) { 346 log.error("Error closing connection to JDBC_PING database", e); 347 } 348 } 349 getDataSourceFromJNDI(String name)350 protected DataSource getDataSourceFromJNDI(String name) { 351 final DataSource dataSource; 352 InitialContext ctx = null; 353 try { 354 ctx = new InitialContext(); 355 Object wathever = ctx.lookup(name); 356 if (wathever == null) { 357 throw new IllegalArgumentException( 358 "JNDI name " + name + " is not bound"); 359 } else if (!(wathever instanceof DataSource)) { 360 throw new IllegalArgumentException( 361 "JNDI name " + name + " was found but is not a DataSource"); 362 } else { 363 dataSource = (DataSource) wathever; 364 if (log.isDebugEnabled()) { 365 log.debug( 366 "Datasource found via JNDI lookup via name: '"+ name + "'."); 367 } 368 return dataSource; 369 } 370 } catch (NamingException e) { 371 throw new IllegalArgumentException( 372 "Could not lookup datasource " + name, e); 373 } finally { 374 if (ctx != null) { 375 try { 376 ctx.close(); 377 } catch (NamingException e) { 378 log.warn("Failed to close naming context.", e); 379 } 380 } 381 } 382 } 383 verifyconfigurationParameters()384 protected void verifyconfigurationParameters() { 385 if (stringIsEmpty(this.connection_url) || 386 stringIsEmpty(this.connection_driver) || 387 stringIsEmpty(this.connection_url) || 388 stringIsEmpty(this.connection_username) ) { 389 if (stringIsEmpty(this.datasource_jndi_name)) { 390 throw new IllegalArgumentException("Either the 4 configuration properties starting with 'connection_' or the datasource_jndi_name must be set"); 391 } 392 } 393 if (stringNotEmpty(this.connection_url) || 394 stringNotEmpty(this.connection_driver) || 395 stringNotEmpty(this.connection_url) || 396 stringNotEmpty(this.connection_username) ) { 397 if (stringNotEmpty(this.datasource_jndi_name)) { 398 throw new IllegalArgumentException("When using the 'datasource_jndi_name' configuration property, all properties starting with 'connection_' must not be set"); 399 } 400 } 401 if (stringIsEmpty(this.insert_single_sql)) { 402 throw new IllegalArgumentException("The insert_single_sql configuration property is mandatory"); 403 } 404 if (stringIsEmpty(this.delete_single_sql)) { 405 throw new IllegalArgumentException("The delete_single_sql configuration property is mandatory"); 406 } 407 if (stringIsEmpty(this.select_all_pingdata_sql)) { 408 throw new IllegalArgumentException("The select_all_pingdata_sql configuration property is mandatory"); 409 } 410 } 411 stringIsEmpty(final String value)412 private static final boolean stringIsEmpty(final String value) { 413 return value == null || value.trim().length() == 0; 414 } 415 stringNotEmpty(final String value)416 private static final boolean stringNotEmpty(final String value) { 417 return value != null && value.trim().length() >= 0; 418 } 419 420 } 421