diff --git a/src/org/jgroups/JChannel.java b/src/org/jgroups/JChannel.java index 9dd07bf3f4..98f0dab5f9 100644 --- a/src/org/jgroups/JChannel.java +++ b/src/org/jgroups/JChannel.java @@ -764,7 +764,9 @@ public JChannel up(MessageBatch batch) { return this; } - + public String toString() { + return isConnected()? String.format("%s (%s)", address(), cluster_name) : super.toString(); + } @ManagedOperation public String toString(boolean details) { diff --git a/tests/junit-functional/org/jgroups/tests/Relay2Test.java b/tests/junit-functional/org/jgroups/tests/Relay2Test.java index 9eed31323c..55a654317f 100644 --- a/tests/junit-functional/org/jgroups/tests/Relay2Test.java +++ b/tests/junit-functional/org/jgroups/tests/Relay2Test.java @@ -14,6 +14,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -25,6 +26,7 @@ @Test(groups=Global.FUNCTIONAL,singleThreaded=true) public class Relay2Test extends RelayTests { protected JChannel a, b, c; // members in site "lon" + protected JChannel d, e, f; // used for other tests AB CD EF protected JChannel x, y, z; // members in site "sfo protected static final String BRIDGE_CLUSTER = "global"; @@ -32,14 +34,14 @@ public class Relay2Test extends RelayTests { - @AfterMethod protected void destroy() {Util.close(z,y,x,c,b,a);} + @AfterMethod protected void destroy() {Util.closeReverse(a,b,c,d,e,f,x,y,z);} /** * Test that RELAY2 can be added to an already connected channel. */ public void testAddRelay2ToAnAlreadyConnectedChannel() throws Exception { // Create and connect a channel. - a=new JChannel(defaultStack()).connect(SFO); + a=new JChannel(defaultStack()).name("A").connect(SFO); System.out.println("Channel " + a.getName() + " is connected. View: " + a.getView()); // Add RELAY2 protocol to the already connected channel. @@ -72,20 +74,13 @@ public void testMissingRouteAfterMerge() throws Exception { x=createNode(SFO, "X"); assert x.getView().size() == 1; - RELAY2 ar=a.getProtocolStack().findProtocol(RELAY2.class), - xr=x.getProtocolStack().findProtocol(RELAY2.class); - + RELAY2 ar=a.getProtocolStack().findProtocol(RELAY2.class), xr=x.getProtocolStack().findProtocol(RELAY2.class); assert ar != null && xr != null; - - JChannel a_bridge=null, x_bridge=null; - for(int i=0; i < 20; i++) { - a_bridge=ar.getBridge(SFO); - x_bridge=xr.getBridge(LON); - if(a_bridge != null && x_bridge != null && a_bridge.getView().size() == 2 && x_bridge.getView().size() == 2) - break; - Util.sleep(500); - } - + Util.waitUntilTrue(10000, 500, () -> { + JChannel ab=ar.getBridge(SFO), xb=xr.getBridge(LON); + return ab != null && xb != null && ab.getView().size() == 2 && xb.getView().size() == 2; + }); + JChannel a_bridge=ar.getBridge(SFO), x_bridge=xr.getBridge(LON); assert a_bridge != null && x_bridge != null; System.out.println("A's bridge channel: " + a_bridge.getView()); @@ -123,13 +118,10 @@ public void testMissingRouteAfterMerge() throws Exception { Util.waitUntilAllChannelsHaveSameView(20000, 500, a, b); System.out.println("A's view: " + a.getView() + "\nB's view: " + b.getView()); - for(int i=0; i < 20; i++) { - bridge_view=xr.getBridgeView(BRIDGE_CLUSTER); - if(bridge_view != null && bridge_view.size() == 2) - break; - Util.sleep(500); - } - + Util.waitUntilTrue(10000, 500, () -> { + View bv=xr.getBridgeView(BRIDGE_CLUSTER); + return bv != null && bv.size() == 2; + }); route=getRoute(x, LON); System.out.println("Route at sfo to lon: " + route); assert route != null; @@ -198,10 +190,7 @@ public void testCoordinatorShutdown() throws Exception { b.close(); long time2=System.currentTimeMillis() - start2; System.out.println("B took " + time2 + " ms"); - waitForBridgeView(1, 40000, 500, BRIDGE_CLUSTER, x); - - Util.close(x,y); } @@ -247,32 +236,32 @@ public void testSiteUnreachableMessageBreaksSiteUUID() throws Exception { x=createNode(SFO, "X"); waitForBridgeView(2, 10000, 500, BRIDGE_CLUSTER, a, x); - final MyUphandler h=new MyUphandler(); - b.setUpHandler(h); + final MyUphandler up_handler=new MyUphandler(); + b.setUpHandler(up_handler); log.debug("Disconnecting X"); x.disconnect(); log.debug("A: waiting for site SFO to be UNKNOWN"); waitUntilRoute(SFO, false, 10000, 500, a); - for (int i = 0; i < 100; i++) + for (int j = 0; j < 100; j++) b.send(new SiteMaster(SFO), "to-sfo".getBytes(StandardCharsets.UTF_8)); log.debug("Sending message from A to B"); - for (int i = 0; i < 100; i++) - a.send(b.getAddress(), ("to-b-" + i).getBytes(StandardCharsets.UTF_8)); + for (int j = 0; j < 100; j++) + a.send(b.getAddress(), ("to-b-" + j).getBytes(StandardCharsets.UTF_8)); - for (int i = 0; i < 100; i++) { - Message take = h.getReceived().take(); + for (int j = 0; j < 100; j++) { + Message take = up_handler.getReceived().take(); assert take.src() instanceof SiteUUID : "Address was " + take.src(); } // https://issues.redhat.com/browse/JGRP-2586 - Util.waitUntilTrue(10000, 500, () -> h.getSiteUnreachableEvents() > 0); - assert h.getSiteUnreachableEvents() > 0 && h.getSiteUnreachableEvents() <= 100 - : "Expecting <= 100 site unreachable events on node B but got " + h.getSiteUnreachableEvents(); + Util.waitUntilTrue(10000, 500, () -> up_handler.getSiteUnreachableEvents() > 0); + assert up_handler.getSiteUnreachableEvents() > 0 && up_handler.getSiteUnreachableEvents() <= 100 + : "Expecting <= 100 site unreachable events on node B but got " + up_handler.getSiteUnreachableEvents(); // drain site-unreachable events received after this point - Util.waitUntilTrue(3000, 500, () -> h.getSiteUnreachableEvents() > 10); + Util.waitUntilTrue(3000, 500, () -> up_handler.getSiteUnreachableEvents() > 10); MyUphandler h2=new MyUphandler(); b.setUpHandler(h2); @@ -376,41 +365,39 @@ public void testForwardingRoute() { /** Tests https://issues.redhat.com/browse/JGRP-2712 */ public void testSitesUp() throws Exception { - try(JChannel _a=createNode(LON, "A", BRIDGE_CLUSTER, LON, NYC, SFO); - JChannel _b=createNode(LON, "B", BRIDGE_CLUSTER, LON, NYC, SFO); - JChannel _c=createNode(LON, "C", BRIDGE_CLUSTER, LON, NYC, SFO); + a=createNode(LON, "A", BRIDGE_CLUSTER, LON, NYC, SFO); + b=createNode(LON, "B", BRIDGE_CLUSTER, LON, NYC, SFO); + c=createNode(LON, "C", BRIDGE_CLUSTER, LON, NYC, SFO); - JChannel _d=createNode(NYC, "D", BRIDGE_CLUSTER, LON, NYC, SFO); - JChannel _e=createNode(NYC, "E", BRIDGE_CLUSTER, LON, NYC, SFO); - JChannel _f=createNode(NYC, "F", BRIDGE_CLUSTER, LON, NYC, SFO); + d=createNode(NYC, "D", BRIDGE_CLUSTER, LON, NYC, SFO); + e=createNode(NYC, "E", BRIDGE_CLUSTER, LON, NYC, SFO); + f=createNode(NYC, "F", BRIDGE_CLUSTER, LON, NYC, SFO); - JChannel _g=createNode(SFO, "G", BRIDGE_CLUSTER, LON, NYC, SFO); + try(JChannel _g=createNode(SFO, "G", BRIDGE_CLUSTER, LON, NYC, SFO); JChannel _h=createNode(SFO, "H", BRIDGE_CLUSTER, LON, NYC, SFO); JChannel _i=createNode(SFO, "I", BRIDGE_CLUSTER, LON, NYC, SFO)) { - - Util.waitUntilAllChannelsHaveSameView(5000, 100, _a,_b,_c); - Util.waitUntilAllChannelsHaveSameView(5000, 100, _d,_e,_f); + Util.waitUntilAllChannelsHaveSameView(5000, 100, a,b,c); + Util.waitUntilAllChannelsHaveSameView(5000, 100, d,e,f); Util.waitUntilAllChannelsHaveSameView(5000, 100, _g,_h,_i); - waitUntilRoute(NYC, true, 5000, 500, _a); - waitUntilRoute(SFO, true, 5000, 500, _a); - waitUntilRoute(LON, true, 5000, 500, _d); - waitUntilRoute(SFO, true, 5000, 500, _d); + waitUntilRoute(NYC, true, 5000, 500, a); + waitUntilRoute(SFO, true, 5000, 500, a); + waitUntilRoute(LON, true, 5000, 500, d); + waitUntilRoute(SFO, true, 5000, 500, d); waitUntilRoute(LON, true, 5000, 500, _g); waitUntilRoute(NYC, true, 5000, 500, _g); - assert Stream.of(_a,_b,_c,_d,_e,_f,_g,_h,_i) - .allMatch(c -> c.getView().size() == 3); - assert Stream.of(_a,_d,_g).map(ch -> (RELAY2)ch.getProtocolStack().findProtocol(RELAY2.class)) + assert Stream.of(a,b,c,d,e,f,_g,_h,_i).allMatch(c -> c.getView().size() == 3); + assert Stream.of(a,d,_g).map(ch -> (RELAY2)ch.getProtocolStack().findProtocol(RELAY2.class)) .allMatch(RELAY2::isSiteMaster); - Stream.of(_d,_e,_f,_g,_h,_i) + Stream.of(d,e,f,_g,_h,_i) .map(ch -> (RELAY2)ch.getProtocolStack().findProtocol(RELAY2.class)) .forEach(r -> r.setRouteStatusListener(new MyRouteStatusListener(r.getAddress()).verbose(false))); // now stop A; B will become new site master and we should get a site-down(NYC), then site-up(NYC) - Util.close(_a); - Util.waitUntil(5000, 500, () -> Stream.of(_d,_e,_f,_g,_h,_i) + Util.close(a); + Util.waitUntil(5000, 500, () -> Stream.of(d,e,f,_g,_h,_i) .map(ch -> (RELAY2)ch.getProtocolStack().findProtocol(RELAY2.class)) .peek(r -> System.out.printf("%s: %s\n", r.getAddress(), r.getRouteStatusListener())) .map(r -> (MyRouteStatusListener)r.getRouteStatusListener()) @@ -418,83 +405,155 @@ public void testSitesUp() throws Exception { } } + + /** Tests sending and receiving of messages across sites */ public void testSendAndReceiveMulticasts() throws Exception { final int NUM=2; - try(JChannel _a=createNode(LON, "A", BRIDGE_CLUSTER, LON, NYC, SFO).setReceiver(new MyReceiver()); - JChannel _b=createNode(LON, "B", BRIDGE_CLUSTER, LON, NYC, SFO).setReceiver(new MyReceiver()); + createSymmetricNetwork(ch -> new MyReceiver()); - JChannel _c=createNode(NYC, "C", BRIDGE_CLUSTER, LON, NYC, SFO).setReceiver(new MyReceiver()); - JChannel _d=createNode(NYC, "D", BRIDGE_CLUSTER, LON, NYC, SFO).setReceiver(new MyReceiver()); + // sends multicasts from site-master (_a) and non site masters (_b,_c, ... + for(JChannel ch: allChannels()) { + for(int i=1; i <= NUM; i++) + ch.send(null, String.format("%s-%d", ch.getAddress(), i)); + } + Util.waitUntil(5000, 500, + () -> allChannels().stream() + .peek(ch -> System.out.printf("%s: %s\n", ch.getAddress(), ((MyReceiver)ch.getReceiver()).list())) + .map(ch -> (MyReceiver)ch.getReceiver()) + .allMatch(r -> r.size() == 6 * NUM)); + } - JChannel _e=createNode(SFO, "E", BRIDGE_CLUSTER, LON, NYC, SFO).setReceiver(new MyReceiver()); - JChannel _f=createNode(SFO, "F", BRIDGE_CLUSTER, LON, NYC, SFO).setReceiver(new MyReceiver())) { + /** Tests sending of multicasts and reception of responses */ + public void testSendAndReceiveMulticastsAndUnicastResponses() throws Exception { + createSymmetricNetwork(ch -> new ResponseSender(ch).rawMsgs(true)); + for(JChannel ch: allChannels()) + ch.send(null, String.format("%s", ch.getAddress())); + Util.waitUntil(5000, 500, + () -> allChannels().stream() + .peek(ch -> System.out.printf("%s: %s\n", ch.getAddress(), + ((MyReceiver)ch.getReceiver()).list(Message::getObject))) + .map(ch -> (MyReceiver)ch.getReceiver()) + .allMatch(r -> r.size() == 6 * 2)); + + for(JChannel ch: allChannels()) { + List list=((MyReceiver)ch.getReceiver()).list(); + // assert that there are 6 multicasts and 6 unicasts + int multicasts=list.stream().map(m -> m.dest() == null? 1 : 0).mapToInt(i -> i).sum(); + assert multicasts == 6; + int unicasts=list.stream().map(m -> m.dest() != null? 1 : 0).mapToInt(i -> i).sum(); + assert unicasts == 6; + Set
senders=list.stream().map(Message::getSrc).filter(Objects::nonNull) + .collect(Collectors.toSet()); + assert allChannels().stream().map(JChannel::getAddress).collect(Collectors.toSet()).equals(senders); + } + } - Util.waitUntilAllChannelsHaveSameView(5000, 200, _a,_b); - Util.waitUntilAllChannelsHaveSameView(5000, 200, _c,_d); - Util.waitUntilAllChannelsHaveSameView(5000, 200, _e,_f); + /** Tests sending to the 3 site masters A, C and E, and expecting responses */ + public void testSendingToSiteMasters() throws Exception { + createSymmetricNetwork(ch -> new ResponseSender(ch).rawMsgs(true)); + for(JChannel ch: allChannels()) { + for(String site: new String[]{LON,NYC,SFO}) { + Address target=new SiteMaster(site); + ch.send(target, String.format("%s", ch.getAddress())); + } + } - // sends multicasts from site-master (_a) and non site masters (_b,_c, ... - for(JChannel ch: Arrays.asList(_a,_b,_c,_d,_e,_f)) { - for(int i=1; i <= NUM; i++) - ch.send(null, String.format("%s-%d", ch.getAddress(), i)); + // each site-master received 6 messages and 3 responses, each non-SM received 3 responses only + Util.waitUntil(5000, 200, () -> { + for(JChannel ch: allChannels()) { + List list=((MyReceiver)ch.getReceiver()).list(); + int expected_size=isSiteMaster(ch)? 9 : 3; + if(expected_size != list.size()) + return false; + System.out.printf("%s: %s\n", ch.getAddress(), list.stream().map(Message::getObject).collect(Collectors.toList())); } - Util.waitUntil(5000, 500, - () -> Stream.of(_a,_b,_c,_d,_e,_f) - .peek(ch -> System.out.printf("%s: %s\n", ch.getAddress(), ((MyReceiver)ch.getReceiver()).list())) - .map(ch -> (MyReceiver)ch.getReceiver()) - .allMatch(r -> r.size() == 6 * NUM)); + return true; + }); + + // check the address of the responses: all non site masters must have responses from site masters to them; the + // site masters' addresses are the actual address of the members acting as site masters + assert allChannels().stream().filter(ch -> !isSiteMaster(ch)).map(ch -> ((MyReceiver)ch.getReceiver()).list()) + .allMatch(l -> l.stream().allMatch(m -> m.dest() != null && m.src() != null)); + + // check that we have 6 messages with dest=SiteMaster(S) where S is the current site (only in site masters) + assert allChannels().stream().filter(Relay2Test::isSiteMaster) + .map(ch -> ((MyReceiver)ch.getReceiver()).list()) + .allMatch(l -> l.stream().filter(m -> m.dest() instanceof SiteMaster).count() == 6); + } + + /** Same as above but with SiteMaster(null) as target */ + public void testSendingToAllSiteMasters() throws Exception { + createSymmetricNetwork(ch -> new ResponseSender(ch).rawMsgs(true)); + for(JChannel ch: allChannels()) { + Address target=new SiteMaster(null); + ch.send(target, String.format("%s", ch.getAddress())); } + + // each site-master received 6 messages and 3 responses, each non-SM received 3 responses only + Util.waitUntil(5000, 200, () -> { + for(JChannel ch: allChannels()) { + List list=((MyReceiver)ch.getReceiver()).list(); + int expected_size=isSiteMaster(ch)? 9 : 3; + if(expected_size != list.size()) + return false; + System.out.printf("%s: %s\n", ch.getAddress(), list.stream().map(Message::getObject).collect(Collectors.toList())); + } + return true; + }); + + // check the address of the responses: all non site masters must have responses from site masters to them; the + // site masters' addresses are the actual address of the members acting as site masters + assert allChannels().stream().filter(ch -> !isSiteMaster(ch)).map(ch -> ((MyReceiver)ch.getReceiver()).list()) + .allMatch(l -> l.stream().allMatch(m -> m.dest() != null && m.src() != null)); + + // check that we have 6 messages with dest=SiteMaster(S) where S is the current site (only in site masters) + assert allChannels().stream().filter(Relay2Test::isSiteMaster) + .map(ch -> ((MyReceiver)ch.getReceiver()).list()) + .allMatch(l -> l.stream().filter(m -> m.dest() instanceof SiteMaster).count() == 6); } - /** Tests sending of multicasts and reception of responses */ - public void testSendAndReceiveMulticastsAndUnicastResponses() throws Exception { - try (JChannel _a=createNode(LON,"A",BRIDGE_CLUSTER,LON,NYC,SFO); - JChannel _b=createNode(LON,"B",BRIDGE_CLUSTER,LON,NYC,SFO); - JChannel _c=createNode(NYC,"C",BRIDGE_CLUSTER,LON,NYC,SFO); - JChannel _d=createNode(NYC,"D",BRIDGE_CLUSTER,LON,NYC,SFO); + protected static JChannel createNode(String site_name, String node_name) throws Exception { + return createNode(site_name, node_name, 1, null); + } - JChannel _e=createNode(SFO,"E",BRIDGE_CLUSTER,LON,NYC,SFO); - JChannel _f=createNode(SFO,"F",BRIDGE_CLUSTER,LON,NYC,SFO)) { + protected static JChannel createNode(String site_name, String node_name, int num_site_masters, + String sm_picker) throws Exception { + RELAY2 relay=createSymmetricRELAY2(site_name, BRIDGE_CLUSTER, LON, SFO) + .setMaxSiteMasters(num_site_masters).setSiteMasterPickerImpl(sm_picker); + JChannel ch=new JChannel(defaultStack(relay)).name(node_name); + if(site_name != null) + ch.connect(site_name); + return ch; + } - Util.waitUntilAllChannelsHaveSameView(5000,200,_a,_b); - Util.waitUntilAllChannelsHaveSameView(5000,200,_c,_d); - Util.waitUntilAllChannelsHaveSameView(5000,200,_e,_f); + protected static boolean isSiteMaster(JChannel ch) { + RELAY2 r=ch.getProtocolStack().findProtocol(RELAY2.class); + return r != null && r.isSiteMaster(); + } - _a.setReceiver(new ResponseSender(_a).rawMsgs(true)); - _b.setReceiver(new ResponseSender(_b).rawMsgs(true)); - _c.setReceiver(new ResponseSender(_c).rawMsgs(true)); - _d.setReceiver(new ResponseSender(_d).rawMsgs(true)); - _e.setReceiver(new ResponseSender(_e).rawMsgs(true)); - _f.setReceiver(new ResponseSender(_f).rawMsgs(true)); + protected void createSymmetricNetwork(Function r) throws Exception { + a=createNode(LON, "A", BRIDGE_CLUSTER, LON, NYC, SFO); + b=createNode(LON, "B", BRIDGE_CLUSTER, LON, NYC, SFO); - for(JChannel ch: Arrays.asList(_a,_b,_c,_d,_e,_f)) { - ch.send(null, String.format("%s", ch.getAddress())); - } - Util.waitUntil(5000, 500, - () -> Stream.of(_a,_b,_c,_d,_e,_f) - .peek(ch -> System.out.printf("%s: %s\n", ch.getAddress(), - ((MyReceiver)ch.getReceiver()).list(Message::getObject))) - .map(ch -> (MyReceiver)ch.getReceiver()) - .allMatch(r -> r.size() == 6 * 2)); - - for(JChannel ch: Arrays.asList(_a,_b,_c,_d,_e,_f)) { - List list=((MyReceiver)ch.getReceiver()).list(); - // assert that there are 6 multicasts and 6 unicasts - int multicasts=list.stream().map(m -> m.dest() == null? 1 : 0).mapToInt(i -> i).sum(); - assert multicasts == 6; - int unicasts=list.stream().map(m -> m.dest() != null? 1 : 0).mapToInt(i -> i).sum(); - assert unicasts == 6; - Set
senders=list.stream().map(Message::getSrc).filter(Objects::nonNull) - .collect(Collectors.toSet()); - assert Set.of(_a.getAddress(), _b.getAddress(), _c.getAddress(), _d.getAddress(), - _e.getAddress(), _f.getAddress()).equals(senders); - } + c=createNode(NYC, "C", BRIDGE_CLUSTER, LON, NYC, SFO); + d=createNode(NYC, "D", BRIDGE_CLUSTER, LON, NYC, SFO); - } + e=createNode(SFO, "E", BRIDGE_CLUSTER, LON, NYC, SFO); + f=createNode(SFO, "F", BRIDGE_CLUSTER, LON, NYC, SFO); + + if(r != null) + allChannels().forEach(ch -> ch.setReceiver(r.apply(ch))); + + Util.waitUntilAllChannelsHaveSameView(5000, 200, a,b); + Util.waitUntilAllChannelsHaveSameView(5000, 200, c,d); + Util.waitUntilAllChannelsHaveSameView(5000, 200, e,f); } + protected List allChannels() { + return Arrays.asList(a,b,c,d,e,f); + } protected static class ResponseSender extends MyReceiver { protected final JChannel ch; @@ -506,8 +565,7 @@ public ResponseSender(JChannel ch) { @Override public void receive(Message msg) { super.receive(msg); - if(msg.dest() == null) { // send unicast response back to sender - Object obj=msg.getObject(); + if(msg.dest() == null || msg.dest() instanceof SiteMaster) { // send unicast response back to sender Message rsp=new ObjectMessage(msg.src(),"rsp-" + ch.getAddress()); try { ch.send(rsp); @@ -532,22 +590,4 @@ public Route pickRoute(String site, List routes, Address original_sender) return routes.get(0); } } - - - protected static JChannel createNode(String site_name, String node_name) throws Exception { - return createNode(site_name, node_name, 1, null); - } - - protected static JChannel createNode(String site_name, String node_name, int num_site_masters, - String sm_picker) throws Exception { - RELAY2 relay=createSymmetricRELAY2(site_name, BRIDGE_CLUSTER, LON, SFO) - .setMaxSiteMasters(num_site_masters).setSiteMasterPickerImpl(sm_picker); - JChannel ch=new JChannel(defaultStack(relay)).name(node_name); - if(site_name != null) - ch.connect(site_name); - return ch; - } - - - }