1 package org.jgroups.protocols;
2 
3 import org.jgroups.Address;
4 import org.jgroups.annotations.Property;
5 
6 import java.io.ByteArrayInputStream;
7 import java.io.ByteArrayOutputStream;
8 import java.io.DataInputStream;
9 import java.io.DataOutputStream;
10 import java.io.IOException;
11 import java.sql.Connection;
12 import java.sql.DriverManager;
13 import java.sql.PreparedStatement;
14 import java.sql.ResultSet;
15 import java.sql.SQLException;
16 import java.util.ArrayList;
17 import java.util.Collections;
18 import java.util.List;
19 
20 import javax.naming.InitialContext;
21 import javax.naming.NamingException;
22 import javax.sql.DataSource;
23 
24 /**
25  * <p>Discovery protocol using a JDBC connection to a shared database.
26  * Connection options can be defined as configuration properties, or the JNDI
27  * name of a <code>DataSource</code> can be provided (avoid providing both).</p>
28  *
 * <p>Both the schema and the SQL statements used can be customized; make sure
 * the order of parameters of such customized SQL statements is maintained and
 * that compatible types are used for the columns. The recommended schema uses a
 * single table, with two String columns used as the primary key (local address,
 * cluster name) and a third column to store the serialized form of the objects
 * needed by JGroups.</p>
35  *
36  * <p>A default table will be created at first connection, errors during this
37  * operation are not considered critical. Set the <code>initialize_sql</code>
38  * to an empty value to prevent this initial table creation, or change it to
39  * create a customized table.</p>
40  *
41  * @author Sanne Grinovero
42  * @since 2.12
43  */
44 public class JDBC_PING extends FILE_PING {
45 
46     /* -----------------------------------------    Properties     -------------------------------------------------- */
47 
48     @Property(description = "The JDBC connection URL", writable = false)
49     protected String connection_url = null;
50 
51     @Property(description = "The JDBC connection username", writable = false)
52     protected String connection_username = null;
53 
54     @Property(description = "The JDBC connection password", writable = false)
55     protected String connection_password = null;
56 
57     @Property(description = "The JDBC connection driver name", writable = false)
58     protected String connection_driver = null;
59 
60     @Property(description = "If not empty, this SQL statement will be performed at startup."
61                 + "Customize it to create the needed table on those databases which permit table creation attempt without loosing data, such as "
62                 + "PostgreSQL and MySQL (using IF NOT EXISTS). To allow for creation attempts, errors performing this statement will be logged"
63                 + "but not considered fatal. To avoid any DDL operation, set this to an empty string.")
64     protected String initialize_sql =
65         "CREATE TABLE JGROUPSPING (" +
66         "own_addr varchar(200) NOT NULL, " +
67         "cluster_name varchar(200) NOT NULL, " +
68         "ping_data varbinary(5000) DEFAULT NULL, " +
69         "PRIMARY KEY (own_addr, cluster_name) )";
70 
71     @Property(description = "SQL used to insert a new row. Customizable, but keep the order of parameters and pick compatible types: " +
72         "1)Own Address, as String 2)Cluster name, as String 3)Serialized PingData as byte[]")
73     protected String insert_single_sql = "INSERT INTO JGROUPSPING (own_addr, cluster_name, ping_data) values (?, ?, ?)";
74 
75     @Property(description = "SQL used to delete a row. Customizable, but keep the order of parameters and pick compatible types: " +
76         "1)Own Address, as String 2)Cluster name, as String")
77     protected String delete_single_sql = "DELETE FROM JGROUPSPING WHERE own_addr=? AND cluster_name=?";
78 
79     @Property(description = "SQL used to fetch all node's PingData. Customizable, but keep the order of parameters and pick compatible types: " +
80                 "only one parameter needed, String compatible, representing the Cluster name. Must return a byte[], the Serialized PingData as" +
81                 " it was stored by the insert_single_sql statement")
82     protected String select_all_pingdata_sql = "SELECT ping_data FROM JGROUPSPING WHERE cluster_name=?";
83 
84     @Property(description = "To use a DataSource registered in JNDI, specify the JNDI name here. " +
85         "This is an alternative to all connection_* configuration options: if this property is not empty, then all connection related" +
86         "properties must be empty.")
87     protected String datasource_jndi_name;
88 
89     /* --------------------------------------------- Fields ------------------------------------------------------ */
90 
91     private DataSource dataSourceFromJNDI = null;
92 
93     @Override
init()94     public void init() throws Exception {
95         super.init();
96         verifyconfigurationParameters();
97         if (stringIsEmpty(datasource_jndi_name)) {
98             loadDriver();
99         }
100         else {
101             dataSourceFromJNDI = getDataSourceFromJNDI(datasource_jndi_name.trim());
102         }
103         attemptSchemaInitialization();
104     }
105 
106     @Override
stop()107     public void stop() {
108         try {
109             deleteSelf();
110         } catch (SQLException e) {
111             log.error("Error while unregistering of our own Address from JDBC_PING database during shutdown", e);
112         }
113         super.stop();
114     }
115 
attemptSchemaInitialization()116     protected void attemptSchemaInitialization() {
117         if (stringIsEmpty(initialize_sql)) {
118             log.info("Table creation step skipped: initialize_sql property is missing");
119             return;
120         }
121         Connection connection = getConnection();
122         if (connection != null) {
123             try {
124                 try {
125                     PreparedStatement preparedStatement =
126                         connection.prepareStatement(initialize_sql);
127                     preparedStatement.execute();
128                     log.info("Table created for JDBC_PING Discovery Protocol");
129                 } catch (SQLException e) {
130                     if (log.isDebugEnabled()) {
131                         log.debug("Could not execute initialize_sql statement; not necessarily an error.", e);
132                     }
133                     else {
134                         //avoid printing out the stacktrace
135                         log.info("Could not execute initialize_sql statement; not necessarily an error. Set to debug logging level for details.");
136                     }
137                 }
138             } finally {
139                 try {
140                     connection.close();
141                 } catch (SQLException e) {
142                     log.error("Error closing connection", e);
143                 }
144             }
145         }
146     }
147 
loadDriver()148     protected void loadDriver() {
149         if (stringIsEmpty(connection_driver)) {
150             return;
151         }
152         if (log.isDebugEnabled()) {
153             log.debug("Registering JDBC Driver named '" + connection_driver + "'");
154         }
155         try {
156             Class.forName(connection_driver);
157         } catch (ClassNotFoundException e) {
158             throw new IllegalArgumentException("JDBC Driver required for JDBC_PING Discovery"
159                         + "protocol could not be loaded: '" + connection_driver + "'");
160         }
161     }
162 
getConnection()163     protected Connection getConnection() {
164         if (dataSourceFromJNDI == null) {
165             Connection connection;
166             try {
167                 connection = DriverManager.getConnection(
168                             connection_url, connection_username, connection_password);
169             } catch (SQLException e) {
170                 log.error("Could not open connection to database", e);
171                 return null;
172             }
173             if (connection == null) {
174                 log.error("Received null connection from the DriverManager!");
175             }
176             return connection;
177         }
178         else {
179             try {
180                 return dataSourceFromJNDI.getConnection();
181             } catch (SQLException e) {
182                 log.error("Could not open connection to database", e);
183                 return null;
184             }
185         }
186     }
187 
188     @Override
createRootDir()189     protected void createRootDir() {
190         // No-Op to prevent file creations from super.init().
191         // TODO refactor this class and FILE_PING to have a common parent?
192         // would also be nice to remove unwanted configuration properties which where inherited.
193     }
194 
195     @Override
remove(String clustername, Address addr)196     protected void remove(String clustername, Address addr) {
197         final String addressAsString = addressAsString(addr);
198         try {
199             delete(clustername, addressAsString);
200         } catch (SQLException e) {
201             log.error("Error", e);
202         }
203     }
204 
205     @Override
readAll(String clustername)206     protected List<PingData> readAll(String clustername) {
207         final Connection connection = getConnection();
208         if (connection != null) {
209             try {
210                 return readAll(connection, clustername);
211             } catch (SQLException e) {
212                 log.error("Error reading JDBC_PING table", e);
213                 return Collections.emptyList();
214             } finally {
215                 closeConnection(connection);
216             }
217         } else {
218             return Collections.emptyList();
219         }
220     }
221 
readAll(Connection connection, String clustername)222     protected List<PingData> readAll(Connection connection, String clustername) throws SQLException {
223         PreparedStatement ps = connection.prepareStatement(select_all_pingdata_sql);
224         try {
225             ps.setString(1, clustername);
226             ResultSet resultSet = ps.executeQuery();
227             ArrayList<PingData> results = new ArrayList<PingData>();
228             while (resultSet.next()) {
229                 byte[] bytes = resultSet.getBytes(1);
230                 PingData pingData = deserialize(bytes);
231                 results.add(pingData);
232             }
233             return results;
234         } finally {
235             ps.close();
236         }
237     }
238 
239     @Override
writeToFile(PingData data, String clustername)240     protected void writeToFile(PingData data, String clustername) {
241         final String ownAddress = addressAsString(data.getAddress());
242         final Connection connection = getConnection();
243         if (connection != null) {
244             try {
245                 delete(connection, clustername, ownAddress);
246                 insert(connection, data, clustername, ownAddress);
247             } catch (SQLException e) {
248                 log.error("Error updating JDBC_PING table", e);
249             } finally {
250                 closeConnection(connection);
251             }
252         }
253         else {
254             log.error("Failed to store PingData in database");
255         }
256     }
257 
insert(Connection connection, PingData data, String clustername, String address)258     protected void insert(Connection connection, PingData data, String clustername, String address) throws SQLException {
259         final byte[] serializedPingData = serializeWithoutView(data);
260         PreparedStatement ps = connection.prepareStatement(insert_single_sql);
261         try {
262             ps.setString(1, address);
263             ps.setString(2, clustername);
264             ps.setBytes(3, serializedPingData);
265             ps.executeUpdate();
266             if (log.isDebugEnabled())
267                 log.debug("Registered " + address + " for clustername " + clustername + " into database.");
268         } finally {
269             ps.close();
270         }
271     }
272 
delete(Connection connection, String clustername, String addressToDelete)273     protected void delete(Connection connection, String clustername, String addressToDelete) throws SQLException {
274         PreparedStatement ps = connection.prepareStatement(delete_single_sql);
275         try {
276             ps.setString(1, addressToDelete);
277             ps.setString(2, clustername);
278             ps.executeUpdate();
279             if (log.isDebugEnabled())
280                 log.debug("Removed " + addressToDelete + " for clustername " + clustername + " from database.");
281         } finally {
282             ps.close();
283         }
284     }
285 
delete(String clustername, String addressToDelete)286     protected void delete(String clustername, String addressToDelete) throws SQLException {
287         final Connection connection = getConnection();
288         if (connection != null) {
289             try {
290                 delete(connection, clustername, addressToDelete);
291             } catch (SQLException e) {
292                 log.error("Error updating JDBC_PING table", e);
293             } finally {
294                 closeConnection(connection);
295             }
296         } else {
297             log.error("Failed to delete PingData in database");
298         }
299     }
300 
deleteSelf()301     protected void deleteSelf() throws SQLException {
302         final String ownAddress = addressAsString(local_addr);
303         delete(group_addr, ownAddress);
304     }
305 
306     /**
307      * Creates a byte[] representation of the PingData, but DISCARDING
308      * the view it contains.
309      * @param data the PingData instance to serialize.
310      * @return
311      */
serializeWithoutView(PingData data)312     protected byte[] serializeWithoutView(PingData data) {
313         final PingData clone = new PingData(data.getAddress(), null, data.isServer(), data.getLogicalName(),  data.getPhysicalAddrs());
314         ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream( 512 );
315         DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream);
316         try {
317             clone.writeTo(outputStream);
318         } catch (IOException e) {
319             //not expecting this to happen as it's an in-memory stream
320             log.error("Error", e);
321         }
322         return byteArrayOutputStream.toByteArray();
323     }
324 
deserialize(final byte[] data)325     protected PingData deserialize(final byte[] data) {
326         final PingData pingData = new PingData();
327         final ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(data);
328         final DataInputStream outputStream = new DataInputStream(byteArrayInputStream);
329         try {
330             pingData.readFrom(outputStream);
331         } catch (IllegalAccessException e) {
332             log.error("Error", e);
333         } catch (InstantiationException e) {
334             log.error("Error", e);
335         } catch (IOException e) {
336             // not expecting this to happen as it's an in-memory stream
337             log.error("Error", e);
338         }
339         return pingData;
340     }
341 
closeConnection(final Connection connection)342     protected void closeConnection(final Connection connection) {
343         try {
344             connection.close();
345         } catch (SQLException e) {
346             log.error("Error closing connection to JDBC_PING database", e);
347         }
348     }
349 
getDataSourceFromJNDI(String name)350     protected DataSource getDataSourceFromJNDI(String name) {
351         final DataSource dataSource;
352         InitialContext ctx = null;
353         try {
354             ctx = new InitialContext();
355             Object wathever = ctx.lookup(name);
356             if (wathever == null) {
357                 throw new IllegalArgumentException(
358                             "JNDI name " + name + " is not bound");
359             } else if (!(wathever instanceof DataSource)) {
360                 throw new IllegalArgumentException(
361                             "JNDI name " + name + " was found but is not a DataSource");
362             } else {
363                 dataSource = (DataSource) wathever;
364                 if (log.isDebugEnabled()) {
365                     log.debug(
366                             "Datasource found via JNDI lookup via name: '"+ name + "'.");
367                 }
368                 return dataSource;
369             }
370         } catch (NamingException e) {
371             throw new IllegalArgumentException(
372                         "Could not lookup datasource " + name, e);
373         } finally {
374             if (ctx != null) {
375                 try {
376                     ctx.close();
377                 } catch (NamingException e) {
378                     log.warn("Failed to close naming context.", e);
379                 }
380             }
381         }
382     }
383 
verifyconfigurationParameters()384     protected void verifyconfigurationParameters() {
385         if (stringIsEmpty(this.connection_url) ||
386                     stringIsEmpty(this.connection_driver) ||
387                     stringIsEmpty(this.connection_url) ||
388                     stringIsEmpty(this.connection_username) ) {
389             if (stringIsEmpty(this.datasource_jndi_name)) {
390                 throw new IllegalArgumentException("Either the 4 configuration properties starting with 'connection_' or the datasource_jndi_name must be set");
391             }
392         }
393         if (stringNotEmpty(this.connection_url) ||
394                     stringNotEmpty(this.connection_driver) ||
395                     stringNotEmpty(this.connection_url) ||
396                     stringNotEmpty(this.connection_username) ) {
397             if (stringNotEmpty(this.datasource_jndi_name)) {
398                 throw new IllegalArgumentException("When using the 'datasource_jndi_name' configuration property, all properties starting with 'connection_' must not be set");
399             }
400         }
401         if (stringIsEmpty(this.insert_single_sql)) {
402             throw new IllegalArgumentException("The insert_single_sql configuration property is mandatory");
403         }
404         if (stringIsEmpty(this.delete_single_sql)) {
405             throw new IllegalArgumentException("The delete_single_sql configuration property is mandatory");
406         }
407         if (stringIsEmpty(this.select_all_pingdata_sql)) {
408             throw new IllegalArgumentException("The select_all_pingdata_sql configuration property is mandatory");
409         }
410     }
411 
stringIsEmpty(final String value)412     private static final boolean stringIsEmpty(final String value) {
413         return value == null || value.trim().length() == 0;
414     }
415 
stringNotEmpty(final String value)416     private static final boolean stringNotEmpty(final String value) {
417         return value != null && value.trim().length() >= 0;
418     }
419 
420 }
421