From 579fa1efc67555b2cdc3709ed3466a6c742d8701 Mon Sep 17 00:00:00 2001 From: Bela Ban Date: Mon, 13 May 2024 16:05:09 +0200 Subject: [PATCH] - JDBC_PING2: new schema (https://issues.redhat.com/browse/JGRP-2795) - Tested with postgresql and hsqldb - Tested with MySql - Removed contains() (never used) - Changed uuidTo/FromString() -> addressTo/FromString() - Use of try(resource) with Connection - Support for stored procedures in JDBC_PING2 - Sample configs for Postgres and MySql - Added sample config for hsqldb - Updated documentation --- conf/jdbc-hsql.xml | 60 +++ conf/jdbc-mysql.xml | 63 +++ conf/jdbc-pg.xml | 59 +++ conf/jg-messages.properties | 1 - conf/jg-protocol-ids.xml | 1 + doc/manual/protocols.adoc | 93 +++- src/org/jgroups/protocols/FILE_PING.java | 2 - src/org/jgroups/protocols/JDBC_PING2.java | 479 ++++++++++++++++++ src/org/jgroups/stack/IpAddress.java | 6 +- src/org/jgroups/util/Util.java | 41 +- .../org/jgroups/tests/JDBC_PING2_Test.java | 14 + .../org/jgroups/tests/UtilTest.java | 25 +- 12 files changed, 831 insertions(+), 13 deletions(-) create mode 100644 conf/jdbc-hsql.xml create mode 100644 conf/jdbc-mysql.xml create mode 100644 conf/jdbc-pg.xml create mode 100644 src/org/jgroups/protocols/JDBC_PING2.java create mode 100644 tests/junit-functional/org/jgroups/tests/JDBC_PING2_Test.java diff --git a/conf/jdbc-hsql.xml b/conf/jdbc-hsql.xml new file mode 100644 index 00000000000..50a4decd6de --- /dev/null +++ b/conf/jdbc-hsql.xml @@ -0,0 +1,60 @@ + + + + + + + /> + + + + + + + + + + + + + diff --git a/conf/jdbc-mysql.xml b/conf/jdbc-mysql.xml new file mode 100644 index 00000000000..85654e6eefa --- /dev/null +++ b/conf/jdbc-mysql.xml @@ -0,0 +1,63 @@ + + + + + + + /> + + + + + + + + + + + + + diff --git a/conf/jdbc-pg.xml b/conf/jdbc-pg.xml new file mode 100644 index 00000000000..eb840c3feda --- /dev/null +++ b/conf/jdbc-pg.xml @@ -0,0 +1,59 @@ + + + + + + + /> + + + + + + + + + + + + + diff --git a/conf/jg-messages.properties b/conf/jg-messages.properties index afebca972ed..e0d60631a56 100644 --- a/conf/jg-messages.properties +++ b/conf/jg-messages.properties @@ -78,7 +78,6 @@ ErrorReadingObjects = JGRP000140: Error reading objects ErrorReadingTable = JGRP000141: Error reading table ErrorSerializingPingData = JGRP000143: error serializing PingData ErrorUnmarshallingObject = JGRP000144: Error unmarshalling object -ErrorUpdatingJDBCPINGTable = JGRP000145: Error updating JDBC_PING table ExceptionOccurredTryingToFragmentMessage = JGRP000152: exception occurred trying to fragment message ExceptionSwitchingToClientRole = JGRP000155: exception switching to client role ExceptionSwitchingToCoordinatorRole = JGRP000156: exception switching to coordinator role diff --git a/conf/jg-protocol-ids.xml b/conf/jg-protocol-ids.xml index 7e330e18b09..87ad2549037 100644 --- a/conf/jg-protocol-ids.xml +++ b/conf/jg-protocol-ids.xml @@ -64,6 +64,7 @@ + diff --git a/doc/manual/protocols.adoc b/doc/manual/protocols.adoc index b4dce6559e6..1cfc22ddb50 100644 --- a/doc/manual/protocols.adoc +++ b/doc/manual/protocols.adoc @@ -585,10 +585,10 @@ link:$$https://github.com/belaban/JGroups/blob/master/doc/design/CloudBasedDisco ===== Removal of zombie files By default, a new coordinator C never removes a file created by an old coordinator `A`. E.g. in `{A,B,C,D}` (with -coordinator `A`), if `C` becomes coordinator on a split `{A,B} | {C,D}`, then `C` doesn't remove `A`'s file, as there +coordinator `A`), if `C` becomes coordinator on a split `{A,B} | {C,D}`, then `C` doesn't remove `A` 's file, as there is no way for `C` to know whether `A` crashed or whether `A` was partitioned away. -Every coordinator `P` installs a shutdown hook which removes `P`'s file on termination. However, this doesn't apply +Every coordinator `P` installs a shutdown hook which removes `P` 's file on termination. However, this doesn't apply to a process killed ungracefully, e.g. by `kill -9`. In this case, no shutdown hook will get called. If we had view `{A,B,C}`, and `A` was killed via kill -9, and `B` takes over, we'd have files `A.list` and `B.list`. @@ -613,7 +613,7 @@ it is recommended to set both attributes to false. ${FILE_PING} - +[[JDBC_PING]] ==== JDBC_PING JDBC_PING uses a DB to store information about cluster nodes used for discovery. All cluster nodes are supposed to be @@ -623,13 +623,13 @@ When a node starts, it queries information about existing members from the datab then asks the coord to join the cluster. It also inserts information about itself into the table, so others can subsequently find it. -When a node P has crashed, the current coordinator removes P's information from the DB. However, if there is a network +When a node P has crashed, the current coordinator removes `P` 's information from the DB. However, if there is a network split, then this can be problematic, as crashed members cannot be told from partitioned-away members. For instance, if we have `{A,B,C,D}`, and the split creates 2 subclusters `{A,B}` and `{C,D}`, then `A` would remove `{C,D}` because it thinks they crashed, and - likewise - `C` would remove `{A,B}`. -To solve this, every member re-inserts its information into the DB after a _view change_. So when `C` and `D`'s view +To solve this, every member re-inserts its information into the DB after a _view change_. So when `C` and `D` 's view changes from `{A,B,C,D}` to `{C,D}`, both sides of the split re-insert their information. Ditto for the other side of the network split. @@ -655,6 +655,89 @@ NOTE: Processes killed with kill -3 are removed from the DB as a shutdown handle ${JDBC_PING} +[[JDBC_PING2]] +==== JDBC_PING2 + +<> is quite old (created in 2010) and hasn't seen much maintenance ever since. JDBC_PING2 +(https://issues.redhat.com/browse/JGRP-2795) is the cleaned-up and refactored version of it. It changes the database +schema, hence the new name. + +Besides the refactoring, the main change is the new schema. Whereas the old schema has binary data (`PingData`), +the new one has only strings (varchars) and a boolean. It consists of + +* `address`: the stringified UUID (identity of a member) +* `name`: the logical name of a member +* `cluster`: the cluster name +* `ip`: the IP address and port of the member +* `coord`: whether this member is a coordinator + +Example: +.... +bela=# select * from jgroups; + address | name | cluster | ip | coord +---------------------------------------------+------+---------+--------------------+------- + uuid://eb4c91b5-238c-4dc3-b241-24017d14e8af | A | chat | 192.168.1.110:7800 | t + uuid://a023a43b-c68c-48af-ba6f-c686c953698f | B | chat | 192.168.1.110:7801 | f +(2 rows) +.... + +Whereas only binary data was seen for `address`, `ip` and `coord`, we now see human-readable data, which is +helpful for trouble-shooting / auditing / reporting. + +In additional, upgrading is possibly helped by this, as incompatible JGroups versions might be able to read each +other's data. + +===== Injecting a datasource +As an alternative to setting `connection_url`, `connection_username`, `connection_password` and `connection_driver`, +if an application already has a datasource configured, it can be injected into `JDBC_PING2`. This can be done in +two ways: + +* Fetching it from JNDI: to do this, `datasource_jndi_name` needs to be set +* User-defined code: `datasource_injecter_class` can be set to a fully-qualified class name, which implements +`Function`. An instance `inst` of this class will be created and and `inst.apply(jdbc)` will +be called, where `jdbc` points to the `JDBC_PING2` instance. The returned datasource will be used. + +===== Use of stored procedures +The default SQL statements used by `JDBC_PING2` are basic, to accommodate a wide number of SQL dialects. This can be +inefficient, especially when inserting a new row: the insertion needs to delete an existing row, before adding a new +one. This results in a `DELETE` being sent to the database, followed by an `INSERT`, resulting in two roundtrips. + +To reduce the two roundtrip to one, we can use a stored procedure (defined in `insert_sp`), for example (Postgres): +[source,xml] +---- + +---- + +The `insert_sp` defines a stored procedure `deleteAndInsert` accepting parameters `address`, `name`, `cluster`, `ip` +and `coord`. It first deletes an existing (or non-existing row) with the same address, then inserts the new one. + +The stored procedure is called with the SQL statement defined in `call_insert_sp`. There are samples shipped with +JGroups for a number of databases, e.g. Postgres, MySql, hsqldb. + + +===== Upgrading from JDBC_PING +Since the schema changed between `JDBC_PING` and `JDBC_PING2`, an upgrade needs to be done from the former to the latter. +This is quite simple: add `JDBC_PING2` to the new configuration, so that both protocols are present. The old one will +read from table `jgroupsping` (default); the new one from `jgroups` (table names can of course be changed). + +Discovery always asks all discovery protocols for members, so both `JDBC_PING` and `JDBC_PING2` are involved. + +When done upgrading, the old `JDBC_PING` protocol can simply be removed. + +NOTE: Another advantage of multiple `JDBC_PING` protocols in the same stack is that multiple databases can be used +for high redundancy; when one DB fails, members will still be able to discover each other with the help of the +second database. + +${JDBC_PING2} ==== BPING diff --git a/src/org/jgroups/protocols/FILE_PING.java b/src/org/jgroups/protocols/FILE_PING.java index 90357499178..f765dda8679 100644 --- a/src/org/jgroups/protocols/FILE_PING.java +++ b/src/org/jgroups/protocols/FILE_PING.java @@ -178,8 +178,6 @@ public void findMembers(final List
members, final boolean initial_disco } } - - protected static String addressToFilename(Address mbr) { String logical_name=NameCache.get(mbr); String name=(addressAsString(mbr) + (logical_name != null? "." + logical_name + SUFFIX : SUFFIX)); diff --git a/src/org/jgroups/protocols/JDBC_PING2.java b/src/org/jgroups/protocols/JDBC_PING2.java new file mode 100644 index 00000000000..287f7f7eb4b --- /dev/null +++ b/src/org/jgroups/protocols/JDBC_PING2.java @@ -0,0 +1,479 @@ +package org.jgroups.protocols; + +import org.jgroups.Address; +import org.jgroups.annotations.ManagedOperation; +import org.jgroups.annotations.Property; +import org.jgroups.protocols.relay.SiteUUID; +import org.jgroups.stack.IpAddress; +import org.jgroups.util.NameCache; +import org.jgroups.util.Responses; +import org.jgroups.util.Util; + +import javax.naming.InitialContext; +import javax.naming.NamingException; +import javax.sql.DataSource; +import java.sql.*; +import java.util.LinkedList; +import java.util.List; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static java.sql.ResultSet.CONCUR_UPDATABLE; +import static java.sql.ResultSet.TYPE_FORWARD_ONLY; + +/** + * New version of {@link JDBC_PING}. Has a new, better legible schema. plus some refactoring + * + * @author Bela Ban + * @since 5.4, 5.3.7 + */ +public class JDBC_PING2 extends FILE_PING { + + /* ----------------------------------------- Properties -------------------------------------------------- */ + + @Property(description="The JDBC connection URL", writable=false) + protected String connection_url; + + @Property(description="The JDBC connection username", writable=false) + protected String connection_username; + + @Property(description="The JDBC connection password", writable=false, exposeAsManagedAttribute=false) + protected String connection_password; + + @Property(description="The JDBC connection driver name", writable=false) + protected String connection_driver; + + @Property(description="If not null, this SQL statement will be performed at startup. Customize it to create the " + + "needed table. To allow for creation attempts, errors performing this statement will be logged " + + "but not considered fatal. To avoid any creation, set this to null.") + protected String initialize_sql="CREATE TABLE jgroups (address varchar(200) NOT NULL, " + + "name varchar(200), " + + "cluster varchar(200) NOT NULL, " + + "ip varchar(200) NOT NULL, " + + "coord boolean, " + + "PRIMARY KEY (address) )"; + + @Property(description="Definition of a stored procedure which deletes an existing row and inserts a new one. Used " + + "only if non-null (as an optimization of calling delete, then insert (1 SQL statement instead of 2). Needs to " + + "accept address (varchar), name (varchar), cluster (varchar), ip (varchar) and coord (boolean") + protected String insert_sp; + + @Property(description="Calls the insert_sp stored procedure. Not used if null.") + protected String call_insert_sp; + + @Property(description="SQL used to insert a new row") + protected String insert_single_sql="INSERT INTO jgroups values (?, ?, ?, ?, ?)"; + + @Property(description="SQL used to delete a row") + protected String delete_single_sql="DELETE FROM jgroups WHERE address=?"; + + @Property(description="SQL to clear the table") + protected String clear_sql="DELETE from jgroups WHERE cluster=?"; + + @Property(description="SQL used to fetch the data of all nodes") + protected String select_all_pingdata_sql="SELECT address, name, ip, coord FROM jgroups WHERE cluster=?"; + + @Property(description="To use a DataSource registered in JNDI, specify the JNDI name here") + protected String datasource_jndi_name; + + @Property(description="The fully qualified name of a class which implements a Function. " + + "If not null, this has precedence over datasource_jndi_name.") + protected String datasource_injecter_class; + + /* --------------------------------------------- Fields ------------------------------------------------------ */ + + protected DataSource dataSource; + + + @Override + protected void createRootDir() { + ; // do *not* create root file system (don't remove !) + } + + public JDBC_PING2 setDataSource(DataSource ds) {this.dataSource=ds; return this;} + public DataSource getDataSource() {return dataSource;} + public String getConnectionUrl() {return connection_url;} + public JDBC_PING2 setConnectionUrl(String c) {this.connection_url=c; return this;} + public String getConnectionUsername() {return connection_username;} + public JDBC_PING2 setConnectionUsername(String c) {this.connection_username=c; return this;} + public String getConnectionPassword() {return connection_password;} + public JDBC_PING2 setConnectionPassword(String c) {this.connection_password=c; return this;} + public String getConnectionDriver() {return connection_driver;} + public JDBC_PING2 setConnectionDriver(String c) {this.connection_driver=c; return this;} + public String getInitializeSql() {return initialize_sql;} + public JDBC_PING2 setInitializeSql(String i) {this.initialize_sql=i; return this;} + public String getInsertSingleSql() {return insert_single_sql;} + public JDBC_PING2 setInsertSingleSql(String i) {this.insert_single_sql=i; return this;} + public String getInsertSp() {return insert_sp;} + public JDBC_PING2 setInsertSp(String sp) {insert_sp=sp; return this;} + public String getCallInsertSp() {return call_insert_sp;} + public JDBC_PING2 setCallInsertSp(String sp) {call_insert_sp=sp; return this;} + public String getDeleteSingleSql() {return delete_single_sql;} + public JDBC_PING2 setDeleteSingleSql(String d) {this.delete_single_sql=d; return this;} + public String getClearSql() {return clear_sql;} + public JDBC_PING2 setClearSql(String c) {this.clear_sql=c; return this;} + public String getSelectAllPingdataSql() {return select_all_pingdata_sql;} + public JDBC_PING2 setSelectAllPingdataSql(String s) {this.select_all_pingdata_sql=s; return this;} + public String getDatasourceJndiName() {return datasource_jndi_name;} + public JDBC_PING2 setDatasourceJndiName(String d) {this.datasource_jndi_name=d; return this;} + public String getDatasourceInjecterClass() {return datasource_injecter_class;} + public JDBC_PING2 setDatasourceInjecterClass(String d) {this.datasource_injecter_class=d; return this;} + + + @Override + public void init() throws Exception { + super.init(); + // If dataSource is already set, skip loading driver or JNDI lookup + if(dataSource == null) { + if(datasource_injecter_class != null) { + dataSource=injectDataSource(datasource_injecter_class); + if(dataSource == null) { + String m=String.format("datasource_injecter_class %s created null datasource", datasource_injecter_class); + throw new IllegalArgumentException(m); + } + } + else { + if(datasource_jndi_name != null) + dataSource=getDataSourceFromJNDI(datasource_jndi_name.trim()); + else + loadDriver(); + } + } + createSchema(); + createInsertStoredProcedure(); + } + + @ManagedOperation(description="Lists all rows in the database") + public String dump(String cluster) throws Exception { + List list=readFromDB(cluster); + return list.stream().map(pd -> String.format("%s", pd)).collect(Collectors.joining("\n")); + } + + protected void write(List list, String clustername) { + for(PingData data: list) { + try { + writeToDB(data, clustername); + } + catch(SQLException e) { + log.error("%s: failed writing to DB: %s", local_addr, e); + } + } + writes++; + } + + + // It's possible that multiple threads in the same cluster node invoke this concurrently; + // Since delete and insert operations are not atomic + // (and there is no SQL standard way to do this without introducing a transaction) + // we need the synchronization or risk a duplicate insertion on same primary key. + // This synchronization should not be a performance problem as this is just a Discovery protocol. + // Many SQL dialects have some "insert or update" expression, but that would need + // additional configuration and testing on each database. See JGRP-1440 + protected synchronized void writeToDB(PingData data, String clustername) throws SQLException { + try(Connection connection=getConnection()) { + if(call_insert_sp != null && insert_sp != null) + callInsertStoredProcedure(connection, data, clustername); + else { + delete(connection, clustername, data.getAddress()); + insert(connection, data, clustername); + } + } + } + + protected void remove(String clustername, Address addr) { + try { + delete(clustername, addr); + } + catch(SQLException e) { + log.error(String.format("%s: failed deleting %s from the table", local_addr, addr), e); + } + } + + protected void removeAll(String clustername) { + try { + clearTable(clustername); + } + catch(Exception ex) { + log.error(String.format("%s: failed clearing the table for cluster %s", local_addr, clustername), ex); + } + } + + protected void readAll(List
members, String cluster, Responses rsps) { + try { + List list=readFromDB(cluster); + for(PingData data: list) { + Address addr=data.getAddress(); + if(data == null || (members != null && !members.contains(addr))) + continue; + rsps.addResponse(data, false); + if(local_addr != null && !local_addr.equals(addr)) + addDiscoveryResponseToCaches(addr, data.getLogicalName(), data.getPhysicalAddr()); + } + } + catch(Exception e) { + log.error(String.format("%s: failed reading from the DB", local_addr), e); + } + } + + protected List readFromDB(String cluster) throws Exception { + try(Connection conn=getConnection(); + PreparedStatement ps=prepare(conn, select_all_pingdata_sql, TYPE_FORWARD_ONLY, CONCUR_UPDATABLE)) { + ps.setString(1, cluster); + if(log.isTraceEnabled()) + log.trace("%s: SQL for reading: %s", local_addr, ps); + try(ResultSet resultSet=ps.executeQuery()) { + reads++; + List retval=new LinkedList<>(); + while(resultSet.next()) { + String uuid=resultSet.getString(1); + String name=resultSet.getString(2); + String ip=resultSet.getString(3); + boolean coord=resultSet.getBoolean(4); + Address addr=Util.addressFromString(uuid); + IpAddress ip_addr=new IpAddress(ip); + PingData data=new PingData(addr, true, name, ip_addr).coord(coord); + retval.add(data); + } + return retval; + } + } + } + + protected static PreparedStatement prepare(final Connection conn, final String sql, final int resultSetType, + final int resultSetConcurrency) throws SQLException { + try { + return conn.prepareStatement(sql, resultSetType, resultSetConcurrency); + } + catch(final SQLException x) { + try { + return conn.prepareStatement(sql); + } + catch(final SQLException x2) { + x.addSuppressed(x2); + throw x; + } + } + } + + protected void createSchema() { + if(initialize_sql == null) { + log.debug("%s: table creation step skipped: initialize_sql attribute is missing", local_addr); + return; + } + try(Connection conn=getConnection(); PreparedStatement ps=conn.prepareStatement(initialize_sql)) { + log.trace("%s: SQL for initializing schema: %s", local_addr, ps); + ps.execute(); + log.debug("%s: table created for JDBC_PING discovery protocol", local_addr); + } + catch(SQLException e) { + log.debug("%s: failed executing initialize_sql statement; not necessarily an error, we always attempt to " + + "create the schema. To suppress this message, set initialize_sql to null. Cause: %s", + local_addr, e.getMessage()); + } + } + protected void createInsertStoredProcedure() throws SQLException { + if(insert_sp == null) + return; + try(Connection conn=getConnection()) { + try(PreparedStatement ps=conn.prepareStatement(insert_sp)) { + log.trace("%s: attempting to create stored procedure %s", local_addr, insert_sp); + ps.execute(); + log.debug("%s: successfully created stored procedure %s", local_addr, insert_sp); + } + catch(SQLException ex) { + log.debug("%s: failed creating stored procedure %s: %s", local_addr, insert_sp, ex.getMessage()); + } + } + } + + protected void loadDriver() { + assertNonNull("connection_driver", connection_driver); + log.debug("%s: loading JDBC driver %s", local_addr, connection_driver); + try { + Util.loadClass(connection_driver, this.getClass().getClassLoader()); + } + catch(ClassNotFoundException e) { + throw new IllegalArgumentException(String.format("JDBC driver could not be loaded: '%s'", connection_driver)); + } + } + + protected DataSource injectDataSource(String ds_class) throws Exception { + Class cl=Util.loadClass(ds_class, Thread.currentThread().getContextClassLoader()); + Object obj=cl.getConstructor().newInstance(); + Function fun=(Function)obj; + return fun.apply(this); + } + + protected Connection getConnection() throws SQLException { + return dataSource != null? dataSource.getConnection() : + DriverManager.getConnection(connection_url, connection_username, connection_password); + } + + protected synchronized void insert(Connection connection, PingData data, String clustername) throws SQLException { + try(PreparedStatement ps=connection.prepareStatement(insert_single_sql)) { + Address address=data.getAddress(); + String addr=Util.addressToString(address); + String name=address instanceof SiteUUID? ((SiteUUID)address).getName() : NameCache.get(address); + IpAddress ip_addr=(IpAddress)data.getPhysicalAddr(); + String ip=ip_addr.toString(); + ps.setString(1, addr); + ps.setString(2, name); + ps.setString(3, clustername); + ps.setString(4, ip); + ps.setBoolean(5, data.isCoord()); + if(log.isTraceEnabled()) + log.trace("%s: SQL for insertion: %s", local_addr, ps); + ps.executeUpdate(); + log.debug("%s: inserted %s for cluster %s", local_addr, address, clustername); + } + } + + protected synchronized void callInsertStoredProcedure(Connection connection, PingData data, String clustername) throws SQLException { + try(PreparedStatement ps=connection.prepareStatement(call_insert_sp)) { + Address address=data.getAddress(); + String addr=Util.addressToString(address); + String name=address instanceof SiteUUID? ((SiteUUID)address).getName() : NameCache.get(address); + IpAddress ip_addr=(IpAddress)data.getPhysicalAddr(); + String ip=ip_addr.toString(); + ps.setString(1, addr); + ps.setString(2, name); + ps.setString(3, clustername); + ps.setString(4, ip); + ps.setBoolean(5, data.isCoord()); + if(log.isTraceEnabled()) + log.trace("%s: SQL for insertion: %s", local_addr, ps); + ps.executeUpdate(); + log.debug("%s: inserted %s for cluster %s", local_addr, address, clustername); + } + } + + protected synchronized void delete(Connection conn, String clustername, Address addressToDelete) throws SQLException { + try(PreparedStatement ps=conn.prepareStatement(delete_single_sql)) { + String addr=Util.addressToString(addressToDelete); + ps.setString(1, addr); + if(log.isTraceEnabled()) + log.trace("%s: SQL for deletion: %s", local_addr, ps); + ps.executeUpdate(); + log.debug("%s: removed %s for cluster %s from database", local_addr, addressToDelete, clustername); + } + } + + protected void delete(String clustername, Address addressToDelete) throws SQLException { + try(Connection connection=getConnection()) { + delete(connection, clustername, addressToDelete); + } + } + + protected void clearTable(String clustername) throws SQLException { + try(Connection conn=getConnection(); PreparedStatement ps=conn.prepareStatement(clear_sql)) { + // check presence of cluster parameter for backwards compatibility + if(clear_sql.indexOf('?') >= 0) + ps.setString(1, clustername); + else + log.debug("%s: please update your clear_sql to include the cluster parameter", local_addr); + ps.execute(); + log.debug("%s: cleared table for cluster %s", local_addr, clustername); + } + } + + protected DataSource getDataSourceFromJNDI(String name) { + final DataSource data_source; + InitialContext ctx=null; + try { + ctx=new InitialContext(); + Object whatever=ctx.lookup(name); + if(whatever == null) + throw new IllegalArgumentException("JNDI name " + name + " is not bound"); + if(!(whatever instanceof DataSource)) + throw new IllegalArgumentException("JNDI name " + name + " was found but is not a DataSource"); + data_source=(DataSource)whatever; + log.debug("%s: datasource found via JNDI lookup via name: %s", local_addr, name); + return data_source; + } + catch(NamingException e) { + throw new IllegalArgumentException("Could not lookup datasource " + name, e); + } + finally { + if(ctx != null) { + try { + ctx.close(); + } + catch(NamingException e) { + log.warn("%s: failed to close naming context: %s", local_addr, e); + } + } + } + } + + + protected static void assertNonNull(String... strings) { + for(int i=0; i < strings.length; i+=2) { + String attr=strings[i], val=strings[i + 1]; + if(val == null) + throw new IllegalArgumentException(String.format("%s must not be null", attr)); + } + } + + + public static void main(String[] args) throws Exception { + String driver="org.hsqldb.jdbcDriver"; + String user="SA"; + String pwd=""; + String conn="jdbc:hsqldb:hsql://localhost/"; + String cluster="draw"; + String select="SELECT address, name, cluster, ip, coord FROM JGROUPS WHERE cluster=?"; + + for(int i=0; i < args.length; i++) { + if(args[i].equals("-driver")) { + driver=args[++i]; + continue; + } + if(args[i].equals("-conn")) { + conn=args[++i]; + continue; + } + if(args[i].equals("-user")) { + user=args[++i]; + continue; + } + if(args[i].equals("-pwd")) { + pwd=args[++i]; + continue; + } + if(args[i].equals("-cluster")) { + cluster=args[++i]; + continue; + } + if(args[i].equals("-select")) { + select=args[++i]; + continue; + } + System.out.println("JDBC_PING2 [-driver driver] [-conn conn-url] [-user user] [-pwd password] " + + "[-cluster cluster-name] [-select select-stmt]"); + return; + } + + Class.forName(driver); + + try(Connection c=DriverManager.getConnection(conn, user, pwd); + PreparedStatement ps=prepare(c, select, TYPE_FORWARD_ONLY, CONCUR_UPDATABLE)) { + ps.setString(1, cluster); + try(ResultSet resultSet=ps.executeQuery()) { + int index=1; + while(resultSet.next()) { + String uuid=resultSet.getString(1); + String name=resultSet.getString(2); + String cluster_name=resultSet.getString(3); + String ip=resultSet.getString(4); + boolean coord=resultSet.getBoolean(5); + System.out.printf("%d: %s, name=%s, ip=%s, %b (cluster=%s)\n", + index++, uuid, name, ip, coord? "coord" : "server", cluster_name); + } + } + } + catch(SQLException e) { + e.printStackTrace(); + } + } + +} diff --git a/src/org/jgroups/stack/IpAddress.java b/src/org/jgroups/stack/IpAddress.java index e07ac7c25cf..a78d97d0c8f 100644 --- a/src/org/jgroups/stack/IpAddress.java +++ b/src/org/jgroups/stack/IpAddress.java @@ -138,8 +138,6 @@ public int hashCode() { } - - public String toString() { return printIpAddress(); } @@ -152,6 +150,10 @@ public String printIpAddress2() { return String.format("%s[%d]", ip_addr != null? ip_addr.getHostAddress() : "localhost", port); } + public String printHostAddress() { + return ip_addr != null? ip_addr.getHostAddress() : ""; + } + @Override public void writeTo(DataOutput out) throws IOException { diff --git a/src/org/jgroups/util/Util.java b/src/org/jgroups/util/Util.java index b2a8fd86d86..986db0de54b 100644 --- a/src/org/jgroups/util/Util.java +++ b/src/org/jgroups/util/Util.java @@ -118,6 +118,9 @@ public enum AddressScope {GLOBAL,SITE_LOCAL,LINK_LOCAL,LOOPBACK,NON_LOOPBACK} public static final boolean can_bind_to_mcast_addr; protected static ResourceBundle resource_bundle; static DateTimeFormatter UTF_FORMAT=DateTimeFormatter.ofPattern("E MMM d H:m:s 'UTC' y"); + protected static final String UUID_PREFIX="uuid://"; + protected static final String SITE_UUID_PREFIX="site-addr://"; + protected static final String IP_PREFIX="ip://"; static { @@ -1039,8 +1042,6 @@ public static T streamableFromByteBuffer(Supplier fact return streamableFromByteBuffer(factory, buffer, 0, buffer.length); } - - /** * Poor man's serialization of an exception. Serializes only the message, stack trace and cause (not suppressed exceptions) */ @@ -1887,6 +1888,42 @@ public static long size(Address[] addrs) { return retval; } + public static String addressToString(Address addr) { + if(addr == null) + return null; + if(addr.isSiteAddress()) { // SiteUUID + SiteUUID su=(SiteUUID)addr; + return String.format("%s%s:%s:%s", SITE_UUID_PREFIX, su.toStringLong(), su.getName(), su.getSite()); + } + Class cl=addr.getClass(); + if(UUID.class.equals(cl)) + return String.format("%s%s", UUID_PREFIX, ((UUID)addr).toStringLong()); + if(IpAddress.class.equals(cl)) + return String.format("%s%s", IP_PREFIX, addr); + return null; + } + + public static Address addressFromString(String s) throws Exception { + if(s == null) + return null; + int index=s.indexOf(UUID_PREFIX); + if(index >= 0) + return UUID.fromString(s.substring(index+UUID_PREFIX.length())); + index=s.indexOf(SITE_UUID_PREFIX); + if(index >= 0) { + String[] tmp=s.substring(index + SITE_UUID_PREFIX.length()).split(":"); + if(tmp.length == 1) + return UUID.fromString(tmp[0]); + UUID u=UUID.fromString(tmp[0]); + return new SiteUUID(u, tmp[1], tmp[2]); + } + index=s.indexOf(IP_PREFIX); + if(index >= 0) + return new IpAddress(s.substring(index + IP_PREFIX.length())); + return null; + + + } public static void writeStreamable(Streamable obj,DataOutput out) throws IOException { if(obj == null) { diff --git a/tests/junit-functional/org/jgroups/tests/JDBC_PING2_Test.java b/tests/junit-functional/org/jgroups/tests/JDBC_PING2_Test.java new file mode 100644 index 00000000000..ef5e37757bb --- /dev/null +++ b/tests/junit-functional/org/jgroups/tests/JDBC_PING2_Test.java @@ -0,0 +1,14 @@ +package org.jgroups.tests; + +import org.jgroups.Global; +import org.testng.annotations.Test; + +/** + * Misc tests for {@link org.jgroups.protocols.JDBC_PING2} + * @author Bela Ban + * @since 5.4, 5.3.7 + */ +@Test(groups= Global.FUNCTIONAL) +public class JDBC_PING2_Test { + +} diff --git a/tests/junit-functional/org/jgroups/tests/UtilTest.java b/tests/junit-functional/org/jgroups/tests/UtilTest.java index 9813e3ce30e..deab1e60d8d 100644 --- a/tests/junit-functional/org/jgroups/tests/UtilTest.java +++ b/tests/junit-functional/org/jgroups/tests/UtilTest.java @@ -6,7 +6,6 @@ import org.jgroups.Message.TransientFlag; import org.jgroups.protocols.relay.SiteUUID; import org.jgroups.stack.IpAddress; -import org.jgroups.util.Bits; import org.jgroups.util.UUID; import org.jgroups.util.*; import org.testng.Assert; @@ -1610,6 +1609,30 @@ public void testGetSites() { assert sites_old.containsKey("nyc"); } + public void testToUUID() throws Exception { + Address uuid=UUID.randomUUID(); + String s=Util.addressToString(null); + assert s == null; + s=Util.addressToString(uuid); + Address uuid2=Util.addressFromString(s); + assert uuid2.equals(uuid); + + SiteUUID u1=new SiteUUID(UUID.randomUUID(), "lon", "X"); + s=Util.addressToString(u1); + Address u2=Util.addressFromString(s); + assert u2 instanceof SiteUUID; + assert u2.equals(u1); + + Address ip1=new IpAddress(7500); + s=Util.addressToString(ip1); + Address ip2=Util.addressFromString(s); + assert ip1.equals(ip2); + ip1=new IpAddress("127.0.0.1", 7500); + s=Util.addressToString(ip1); + ip2=Util.addressFromString(s); + assert ip1.equals(ip2); + } + protected static void check(Enumeration en, Integer[] expected) { List list=new ArrayList<>(); while(en.hasMoreElements())