From 764e5c21d53f5b3806903adadc051a3d284d9a9b Mon Sep 17 00:00:00 2001 From: Bela Ban Date: Thu, 20 Jul 2023 15:00:09 +0200 Subject: [PATCH] - Adding unit tests for send/receive across sites (https://issues.redhat.com/browse/JGRP-2699) - Added test for sending of multicasts and reception of unicasts - Added test for sending of message to SiteMaster(null) - Added slf4j-nop to make the stupid warning message by testng go away :-( --- ivy.xml | 1 + pom.xml | 9 +- src/org/jgroups/JChannel.java | 4 +- src/org/jgroups/protocols/relay/RELAY2.java | 46 +- src/org/jgroups/stack/ProtocolStack.java | 3 +- src/org/jgroups/util/MyReceiver.java | 6 +- .../org/jgroups/tests/Relay2Test.java | 469 +++++++++++++----- .../org/jgroups/tests/RelayTests.java | 115 ++++- 8 files changed, 496 insertions(+), 157 deletions(-) diff --git a/ivy.xml b/ivy.xml index 13fb3d89cc7..436a3754528 100644 --- a/ivy.xml +++ b/ivy.xml @@ -18,6 +18,7 @@ + diff --git a/pom.xml b/pom.xml index f338e73cb4c..691ff9ba6a9 100644 --- a/pom.xml +++ b/pom.xml @@ -119,7 +119,14 @@ org.slf4j slf4j-api - 2.0.3 + 2.0.7 + true + + + + org.slf4j + slf4j-nop + 2.0.7 true diff --git a/src/org/jgroups/JChannel.java b/src/org/jgroups/JChannel.java index 9dd07bf3f43..98f0dab5f9f 100644 --- a/src/org/jgroups/JChannel.java +++ b/src/org/jgroups/JChannel.java @@ -764,7 +764,9 @@ public JChannel up(MessageBatch batch) { return this; } - + public String toString() { + return isConnected()? String.format("%s (%s)", address(), cluster_name) : super.toString(); + } @ManagedOperation public String toString(boolean details) { diff --git a/src/org/jgroups/protocols/relay/RELAY2.java b/src/org/jgroups/protocols/relay/RELAY2.java index bb906434f2a..a20c66e566a 100644 --- a/src/org/jgroups/protocols/relay/RELAY2.java +++ b/src/org/jgroups/protocols/relay/RELAY2.java @@ -27,7 +27,8 @@ import static org.jgroups.protocols.relay.Relay2Header.*; // todo: check if copy is needed in route(), passUp() and deliver(); possibly pass a boolean as parameter (copy or not) - +// todo: use CompletableFutures in routeThen(); this could parallelize routing and delivery/passsing up +// todo: check if a message can bypass RELAY2 completely when NO_RELAY is set (in up(),down()) /** * Provides relaying of messages between autonomous sites.
* Design: ./doc/design/RELAY2.txt and at https://github.com/belaban/JGroups/blob/master/doc/design/RELAY2.txt.
@@ -41,7 +42,8 @@ @MBean(description="RELAY2 protocol") public class RELAY2 extends Protocol { // reserved flags - public static final short can_become_site_master_flag = 1 << 1; + public static final short can_become_site_master_flag = 1 << 1; + protected static final String RELAY2_CL=RELAY2.class.getSimpleName(); /* ------------------------------------------ Properties ---------------------------------------------- */ @Property(description="Name of the site; must be defined in the configuration",writable=false) @@ -430,6 +432,8 @@ public Object down(Event evt) { } public Object down(Message msg) { + //if(msg.isFlagSet(Flag.NO_RELAY)) + // return down_prot.down(msg); msg.src(local_addr); return process(true, msg); } @@ -441,6 +445,8 @@ public Object up(Event evt) { } public Object up(Message msg) { + // if(msg.isFlagSet(Flag.NO_RELAY)) + // return up_prot.up(msg); Message copy=msg; Relay2Header hdr=msg.getHeader(id); if(hdr != null) { @@ -458,6 +464,8 @@ public void up(MessageBatch batch) { List unreachable_sites=null; for(Iterator it=batch.iterator(); it.hasNext();) { Message msg=it.next(), copy=msg; + // if(msg.isFlagSet(Flag.NO_RELAY)) + // continue; Relay2Header hdr=msg.getHeader(id); it.remove(); if(hdr != null) { @@ -546,6 +554,9 @@ public void handleView(View view) { topo().adjust(this.site, view.getMembers()); } + public String toString() { + return String.format("%s%s", RELAY2_CL, local_addr != null? String.format(" (%s)", local_addr) : ""); + } /** Called to handle a message received from a different site (via a bridge channel) */ protected void handleRelayMessage(Relay2Header hdr, Message msg) { @@ -605,29 +616,24 @@ protected Object process(boolean down, Message msg) { switch(type) { case ALL: if(down) - return routeThen(msg,null,() -> deliver(null, msg, true)); - return routeThen(msg,null,() -> passUp(msg)); + return routeThen(msg, null,() -> deliver(null, msg, true)); + return routeThen(msg, null, () -> passUp(msg)); case SM_ALL: - if(down) - return routeThen(msg,null,() -> deliver(local_addr, msg, false)); - return routeThen(msg,null,() -> passUp(msg)); + return routeThen(msg, null, () -> passUp(msg)); case SM: - if(sameSite(dst)) { - if(down) - return deliver(local_addr, msg, false); + if(sameSite(dst)) return passUp(msg); - } return route(msg, Arrays.asList(dst.getSite())); case UNICAST: if(sameSite(dst)) { - if(down) - return deliver(dst, msg, false); - if(local_addr.equals(dst)) - return passUp(msg); - return deliver(dst, msg, false); + if(down) { + // no passUp() if dst == local_addr: we want the transport to use a separate thread to do + // loopbacks + return deliver(dst, msg,false); + } + return passUp(msg); } - else - return route(msg, Arrays.asList(dst.getSite())); + return route(msg, Arrays.asList(dst.getSite())); } } else { @@ -640,10 +646,10 @@ protected Object process(boolean down, Message msg) { case SM: if(down) return sendToLocalSiteMaster(local_addr, msg); // todo: local_addr or msg.src()? - throw new IllegalStateException(String.format("non site master received a sg with dst %s",dst)); + throw new IllegalStateException(String.format("non site master received a msg with dst %s",dst)); case UNICAST: if(down) { - if(sameSite(dst)) + if(sameSite(dst)) // todo: if same address -> passUp() return deliver(dst, msg, false); return sendToLocalSiteMaster(local_addr, msg); } diff --git a/src/org/jgroups/stack/ProtocolStack.java b/src/org/jgroups/stack/ProtocolStack.java index c342e0f1b5a..1382f74d2ae 100644 --- a/src/org/jgroups/stack/ProtocolStack.java +++ b/src/org/jgroups/stack/ProtocolStack.java @@ -946,8 +946,7 @@ public Object down(Event evt) { } public Object down(Message msg) { - if(top_prot != null) - return top_prot.down(msg); + if(top_prot != null) return top_prot.down(msg); return null; } diff --git a/src/org/jgroups/util/MyReceiver.java b/src/org/jgroups/util/MyReceiver.java index 23cd6367009..c7701d90a4f 100644 --- a/src/org/jgroups/util/MyReceiver.java +++ b/src/org/jgroups/util/MyReceiver.java @@ -8,6 +8,8 @@ import java.io.IOException; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.function.Function; +import java.util.stream.Collectors; /** * Generic receiver for a JChannel @@ -23,9 +25,8 @@ public class MyReceiver implements Receiver, Closeable { public void receive(Message msg) { T obj=raw_msgs? (T)msg : (T)msg.getObject(); list.add(obj); - if(verbose) { + if(verbose) System.out.println((name() != null? name() + ":" : "") + " received message from " + msg.getSrc() + ": " + obj); - } } @Override @@ -36,6 +37,7 @@ public void viewAccepted(View new_view) { public MyReceiver rawMsgs(boolean flag) {this.raw_msgs=flag; return this;} public List list() {return list;} + public List list(Function f) {return list.stream().map(f).collect(Collectors.toList());} public MyReceiver verbose(boolean flag) {verbose=flag; return this;} public String name() {return name;} public MyReceiver name(String name) {this.name=name; return this;} diff --git a/tests/junit-functional/org/jgroups/tests/Relay2Test.java b/tests/junit-functional/org/jgroups/tests/Relay2Test.java index bd06b21a8a0..4c340f306aa 100644 --- a/tests/junit-functional/org/jgroups/tests/Relay2Test.java +++ b/tests/junit-functional/org/jgroups/tests/Relay2Test.java @@ -13,9 +13,15 @@ import java.util.*; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; +import java.util.function.Predicate; +import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.jgroups.tests.RelayTests.Data.Type.REQ; + /** * Various RELAY2-related tests * @author Bela Ban @@ -24,6 +30,7 @@ @Test(groups=Global.FUNCTIONAL,singleThreaded=true) public class Relay2Test extends RelayTests { protected JChannel a, b, c; // members in site "lon" + protected JChannel d, e, f; // used for other tests AB CD EF protected JChannel x, y, z; // members in site "sfo protected static final String BRIDGE_CLUSTER = "global"; @@ -31,14 +38,14 @@ public class Relay2Test extends RelayTests { - @AfterMethod protected void destroy() {Util.close(z,y,x,c,b,a);} + @AfterMethod protected void destroy() {Util.closeReverse(a,b,c,d,e,f,x,y,z);} /** * Test that RELAY2 can be added to an already connected channel. */ public void testAddRelay2ToAnAlreadyConnectedChannel() throws Exception { // Create and connect a channel. - a=new JChannel(defaultStack()).connect(SFO); + a=new JChannel(defaultStack()).name("A").connect(SFO); System.out.println("Channel " + a.getName() + " is connected. View: " + a.getView()); // Add RELAY2 protocol to the already connected channel. @@ -71,20 +78,13 @@ public void testMissingRouteAfterMerge() throws Exception { x=createNode(SFO, "X"); assert x.getView().size() == 1; - RELAY2 ar=a.getProtocolStack().findProtocol(RELAY2.class), - xr=x.getProtocolStack().findProtocol(RELAY2.class); - + RELAY2 ar=a.getProtocolStack().findProtocol(RELAY2.class), xr=x.getProtocolStack().findProtocol(RELAY2.class); assert ar != null && xr != null; - - JChannel a_bridge=null, x_bridge=null; - for(int i=0; i < 20; i++) { - a_bridge=ar.getBridge(SFO); - x_bridge=xr.getBridge(LON); - if(a_bridge != null && x_bridge != null && a_bridge.getView().size() == 2 && x_bridge.getView().size() == 2) - break; - Util.sleep(500); - } - + Util.waitUntilTrue(10000, 500, () -> { + JChannel ab=ar.getBridge(SFO), xb=xr.getBridge(LON); + return ab != null && xb != null && ab.getView().size() == 2 && xb.getView().size() == 2; + }); + JChannel a_bridge=ar.getBridge(SFO), x_bridge=xr.getBridge(LON); assert a_bridge != null && x_bridge != null; System.out.println("A's bridge channel: " + a_bridge.getView()); @@ -122,39 +122,16 @@ public void testMissingRouteAfterMerge() throws Exception { Util.waitUntilAllChannelsHaveSameView(20000, 500, a, b); System.out.println("A's view: " + a.getView() + "\nB's view: " + b.getView()); - for(int i=0; i < 20; i++) { - bridge_view=xr.getBridgeView(BRIDGE_CLUSTER); - if(bridge_view != null && bridge_view.size() == 2) - break; - Util.sleep(500); - } - + Util.waitUntilTrue(10000, 500, () -> { + View bv=xr.getBridgeView(BRIDGE_CLUSTER); + return bv != null && bv.size() == 2; + }); route=getRoute(x, LON); System.out.println("Route at sfo to lon: " + route); assert route != null; } - /** - * Tests whether the bridge channel connects and disconnects ok. - */ - public void testConnectAndReconnectOfBridgeStack() throws Exception { - a=new JChannel(defaultStack()).setName("A"); - b=new JChannel(defaultStack()).setName("B"); - - a.connect(BRIDGE_CLUSTER); - b.connect(BRIDGE_CLUSTER); - Util.waitUntilAllChannelsHaveSameView(10000, 500, a, b); - - b.disconnect(); - Util.waitUntilAllChannelsHaveSameView(10000, 500, a); - - b.connect(BRIDGE_CLUSTER); - Util.waitUntilAllChannelsHaveSameView(10000, 500, a, b); - } - - - /** * Tests sites LON and SFO, with SFO disconnecting (bridge view on LON should be 1) and reconnecting (bridge view on * LON and SFO should be 2) @@ -197,10 +174,7 @@ public void testCoordinatorShutdown() throws Exception { b.close(); long time2=System.currentTimeMillis() - start2; System.out.println("B took " + time2 + " ms"); - waitForBridgeView(1, 40000, 500, BRIDGE_CLUSTER, x); - - Util.close(x,y); } @@ -225,12 +199,12 @@ public void testUnknownAndUpStateTransitions() throws Exception { System.out.println("Disconnecting X"); x.disconnect(); - System.out.println("A: waiting for site SFO to be UNKNOWN"); + System.out.println("A: waiting for site SFO to be DOWN"); waitUntilRoute(SFO, false, 20000, 500, a); - System.out.println("Reconnecting X, waiting for 5 seconds to see if the route is marked as DOWN"); + System.out.println("Reconnecting X"); x.connect(SFO); - Util.sleep(5000); + waitUntilRoute(SFO, true, 5000, 100, a); Route route=getRoute(a, SFO); assert route != null : "route is " + route + " (expected to be UP)"; @@ -246,32 +220,32 @@ public void testSiteUnreachableMessageBreaksSiteUUID() throws Exception { x=createNode(SFO, "X"); waitForBridgeView(2, 10000, 500, BRIDGE_CLUSTER, a, x); - final MyUphandler h=new MyUphandler(); - b.setUpHandler(h); + final MyUphandler up_handler=new MyUphandler(); + b.setUpHandler(up_handler); log.debug("Disconnecting X"); x.disconnect(); log.debug("A: waiting for site SFO to be UNKNOWN"); waitUntilRoute(SFO, false, 10000, 500, a); - for (int i = 0; i < 100; i++) + for (int j = 0; j < 100; j++) b.send(new SiteMaster(SFO), "to-sfo".getBytes(StandardCharsets.UTF_8)); log.debug("Sending message from A to B"); - for (int i = 0; i < 100; i++) - a.send(b.getAddress(), ("to-b-" + i).getBytes(StandardCharsets.UTF_8)); + for (int j = 0; j < 100; j++) + a.send(b.getAddress(), ("to-b-" + j).getBytes(StandardCharsets.UTF_8)); - for (int i = 0; i < 100; i++) { - Message take = h.getReceived().take(); + for (int j = 0; j < 100; j++) { + Message take = up_handler.getReceived().take(); assert take.src() instanceof SiteUUID : "Address was " + take.src(); } // https://issues.redhat.com/browse/JGRP-2586 - Util.waitUntilTrue(10000, 500, () -> h.getSiteUnreachableEvents() > 0); - assert h.getSiteUnreachableEvents() > 0 && h.getSiteUnreachableEvents() <= 100 - : "Expecting <= 100 site unreachable events on node B but got " + h.getSiteUnreachableEvents(); + Util.waitUntilTrue(10000, 500, () -> up_handler.getSiteUnreachableEvents() > 0); + assert up_handler.getSiteUnreachableEvents() > 0 && up_handler.getSiteUnreachableEvents() <= 100 + : "Expecting <= 100 site unreachable events on node B but got " + up_handler.getSiteUnreachableEvents(); // drain site-unreachable events received after this point - Util.waitUntilTrue(3000, 500, () -> h.getSiteUnreachableEvents() > 10); + Util.waitUntilTrue(3000, 500, () -> up_handler.getSiteUnreachableEvents() > 10); MyUphandler h2=new MyUphandler(); b.setUpHandler(h2); @@ -286,32 +260,6 @@ public void testSiteUnreachableMessageBreaksSiteUUID() throws Exception { : "Expecting 100 site unreachable events on node A but got " + h2.getSiteUnreachableEvents(); } - protected class MyUphandler implements UpHandler { - protected final BlockingQueue received=new LinkedBlockingDeque<>(); - protected final AtomicInteger siteUnreachableEvents=new AtomicInteger(0); - - public BlockingQueue getReceived() {return received;} - public int getSiteUnreachableEvents() {return siteUnreachableEvents.get();} - - @Override public UpHandler setLocalAddress(Address a) {return this;} - - @Override - public Object up(Event evt) { - if(evt.getType() == Event.SITE_UNREACHABLE) { - log.debug("Site %s is unreachable", (Object) evt.getArg()); - siteUnreachableEvents.incrementAndGet(); - } - return null; - } - - @Override - public Object up(Message msg) { - log.debug("Received %s from %s\n", new String(msg.getArray(), StandardCharsets.UTF_8), msg.getSrc()); - received.add(msg); - return null; - } - } - /** @@ -320,8 +268,8 @@ public Object up(Message msg) { * despite using multiple site masters. JIRA: https://issues.redhat.com/browse/JGRP-2112 */ public void testSenderOrderWithMultipleSiteMasters() throws Exception { - MyReceiver rx=new MyReceiver<>().rawMsgs(true).verbose(true), - ry=new MyReceiver<>().rawMsgs(true).verbose(true), rz=new MyReceiver<>().rawMsgs(true).verbose(true); + MyReceiver rx=new MyReceiver<>().rawMsgs(true), ry=new MyReceiver<>().rawMsgs(true), + rz=new MyReceiver<>().rawMsgs(true); final int NUM=512; final String sm_picker_impl=SiteMasterPickerImpl.class.getName(); a=createNode(LON, "A", 2, sm_picker_impl); @@ -345,17 +293,7 @@ public void testSenderOrderWithMultipleSiteMasters() throws Exception { c.send(msg); } - boolean running=true; - for(int i=0; running && i < 10; i++) { - for(MyReceiver r: Arrays.asList(rx,ry,rz)) { - if(r.size() >= NUM) { - running=false; - break; - } - } - Util.sleep(1000); - } - + Util.waitUntilTrue(10000, 500, () -> Stream.of(rx,ry,rz).anyMatch(l -> l.size() >= NUM)); System.out.printf("X: size=%d\nY: size=%d\nZ: size=%d\n", rx.size(), ry.size(), rz.size()); assert rx.size() == NUM || ry.size() == NUM; assert rz.size() == 0; @@ -375,41 +313,39 @@ public void testForwardingRoute() { /** Tests https://issues.redhat.com/browse/JGRP-2712 */ public void testSitesUp() throws Exception { - try(JChannel _a=createNode(LON, "A", BRIDGE_CLUSTER, LON, NYC, SFO); - JChannel _b=createNode(LON, "B", BRIDGE_CLUSTER, LON, NYC, SFO); - JChannel _c=createNode(LON, "C", BRIDGE_CLUSTER, LON, NYC, SFO); + a=createNode(LON, "A", BRIDGE_CLUSTER, LON, NYC, SFO); + b=createNode(LON, "B", BRIDGE_CLUSTER, LON, NYC, SFO); + c=createNode(LON, "C", BRIDGE_CLUSTER, LON, NYC, SFO); - JChannel _d=createNode(NYC, "D", BRIDGE_CLUSTER, LON, NYC, SFO); - JChannel _e=createNode(NYC, "E", BRIDGE_CLUSTER, LON, NYC, SFO); - JChannel _f=createNode(NYC, "F", BRIDGE_CLUSTER, LON, NYC, SFO); + d=createNode(NYC, "D", BRIDGE_CLUSTER, LON, NYC, SFO); + e=createNode(NYC, "E", BRIDGE_CLUSTER, LON, NYC, SFO); + f=createNode(NYC, "F", BRIDGE_CLUSTER, LON, NYC, SFO); - JChannel _g=createNode(SFO, "G", BRIDGE_CLUSTER, LON, NYC, SFO); + try(JChannel _g=createNode(SFO, "G", BRIDGE_CLUSTER, LON, NYC, SFO); JChannel _h=createNode(SFO, "H", BRIDGE_CLUSTER, LON, NYC, SFO); JChannel _i=createNode(SFO, "I", BRIDGE_CLUSTER, LON, NYC, SFO)) { - - Util.waitUntilAllChannelsHaveSameView(5000, 100, _a,_b,_c); - Util.waitUntilAllChannelsHaveSameView(5000, 100, _d,_e,_f); + Util.waitUntilAllChannelsHaveSameView(5000, 100, a,b,c); + Util.waitUntilAllChannelsHaveSameView(5000, 100, d,e,f); Util.waitUntilAllChannelsHaveSameView(5000, 100, _g,_h,_i); - waitUntilRoute(NYC, true, 5000, 500, _a); - waitUntilRoute(SFO, true, 5000, 500, _a); - waitUntilRoute(LON, true, 5000, 500, _d); - waitUntilRoute(SFO, true, 5000, 500, _d); + waitUntilRoute(NYC, true, 5000, 500, a); + waitUntilRoute(SFO, true, 5000, 500, a); + waitUntilRoute(LON, true, 5000, 500, d); + waitUntilRoute(SFO, true, 5000, 500, d); waitUntilRoute(LON, true, 5000, 500, _g); waitUntilRoute(NYC, true, 5000, 500, _g); - assert Stream.of(_a,_b,_c,_d,_e,_f,_g,_h,_i) - .allMatch(c -> c.getView().size() == 3); - assert Stream.of(_a,_d,_g).map(ch -> (RELAY2)ch.getProtocolStack().findProtocol(RELAY2.class)) + assert Stream.of(a,b,c,d,e,f,_g,_h,_i).allMatch(c -> c.getView().size() == 3); + assert Stream.of(a,d,_g).map(ch -> (RELAY2)ch.getProtocolStack().findProtocol(RELAY2.class)) .allMatch(RELAY2::isSiteMaster); - Stream.of(_d,_e,_f,_g,_h,_i) + Stream.of(d,e,f,_g,_h,_i) .map(ch -> (RELAY2)ch.getProtocolStack().findProtocol(RELAY2.class)) .forEach(r -> r.setRouteStatusListener(new MyRouteStatusListener(r.getAddress()).verbose(false))); // now stop A; B will become new site master and we should get a site-down(NYC), then site-up(NYC) - Util.close(_a); - Util.waitUntil(5000, 500, () -> Stream.of(_d,_e,_f,_g,_h,_i) + Util.close(a); + Util.waitUntil(5000, 500, () -> Stream.of(d,e,f,_g,_h,_i) .map(ch -> (RELAY2)ch.getProtocolStack().findProtocol(RELAY2.class)) .peek(r -> System.out.printf("%s: %s\n", r.getAddress(), r.getRouteStatusListener())) .map(r -> (MyRouteStatusListener)r.getRouteStatusListener()) @@ -417,18 +353,253 @@ public void testSitesUp() throws Exception { } } - protected static class SiteMasterPickerImpl implements SiteMasterPicker { - public SiteMasterPickerImpl() { + + /** Tests sending and receiving of messages across sites */ + public void testSendAndReceiveMulticasts() throws Exception { + createSymmetricNetwork(ch -> new MyReceiver().rawMsgs(true)); + + // sends multicasts from site-master (_a) and non site masters (_b,_c, ... + for(JChannel ch: allChannels()) + ch.send(null, String.format("%s", ch.getAddress())); + Util.waitUntil(5000, 500, + () -> allChannels().stream().peek(Relay2Test::printMessages) + .map(Relay2Test::getReceiver) + .allMatch(r -> r.size() == 6)); + } + + /** Tests sending of multicasts and reception of responses */ + public void testSendAndReceiveMulticastsAndUnicastResponses() throws Exception { + createSymmetricNetwork(ch -> new ResponseSender(ch).rawMsgs(true)); + for(JChannel ch: allChannels()) + ch.send(null, String.format("%s", ch.getAddress())); + Util.waitUntil(5000, 500, + () -> allChannels().stream().peek(Relay2Test::printMessages) + .map(Relay2Test::getReceiver) + .allMatch(r -> r.size() == 6 * 2)); + + for(JChannel ch: allChannels()) { + List list=getReceiver(ch).list(); + // assert that there are 6 multicasts and 6 unicasts + assert expectedMulticasts(list,6); + assert expectedUnicasts(list,6); + + Set
senders=list.stream().map(Message::src).filter(Objects::nonNull) + .collect(Collectors.toSet()); + assert allChannels().stream().map(JChannel::getAddress).collect(Collectors.toSet()).equals(senders); } + } - public Address pickSiteMaster(List
site_masters, Address original_sender) { - return site_masters.get(0); + /** Tests sending to the 3 site masters A, C and E, and expecting responses */ + public void testSendingToSiteMasters() throws Exception { + createSymmetricNetwork(ch -> new ResponseSender(ch).rawMsgs(true)); + for(JChannel ch: allChannels()) { + for(String site: new String[]{LON,NYC,SFO}) { + Address target=new SiteMaster(site); + ch.send(target, String.format("%s", ch.getAddress())); + } } - public Route pickRoute(String site, List routes, Address original_sender) { - return routes.get(0); + // each site-master received 6 messages and 3 responses, each non-SM received 3 responses only + Util.waitUntil(5000, 200, () -> { + for(JChannel ch: allChannels()) { + List list=getReceiver(ch).list(); + int expected_size=isSiteMaster(ch)? 9 : 3; + if(expected_size != list.size()) + return false; + printMessages(ch); + } + return true; + }); + + // check the address of the responses: all non site masters must have responses from site masters to them; the + // site masters' addresses are the actual address of the members acting as site masters + assert allChannels().stream().filter(ch -> !isSiteMaster(ch)).map(ch -> getReceiver(ch).list()) + .allMatch(l -> l.stream().allMatch(m -> m.dest() != null && m.src() != null)); + + // check that we have 6 messages with dest=SiteMaster(S) where S is the current site (only in site masters) + assert allChannels().stream().filter(RelayTests::isSiteMaster) + .map(ch -> ((MyReceiver)ch.getReceiver()).list()) + .allMatch(l -> l.stream().filter(m -> m.dest() instanceof SiteMaster).count() == 6); + } + + /** Same as above but with SiteMaster(null) as target */ + public void testSendingToAllSiteMasters() throws Exception { + createSymmetricNetwork(ch -> new ResponseSender(ch).rawMsgs(true)); + for(JChannel ch: allChannels()) { + Address target=new SiteMaster(null); + ch.send(target, String.format("%s", ch.getAddress())); } + + // each site-master received 6 messages and 3 responses, each non-SM received 3 responses only + Util.waitUntil(5000, 200, () -> { + for(JChannel ch: allChannels()) { + List list=((MyReceiver)ch.getReceiver()).list(); + int expected_size=isSiteMaster(ch)? 9 : 3; + if(expected_size != list.size()) + return false; + printMessages(ch); + } + return true; + }); + + // check the address of the responses: all non site masters must have responses from site masters to them; the + // site masters' addresses are the actual address of the members acting as site masters + assert allChannels().stream().filter(ch -> !isSiteMaster(ch)).map(ch -> getReceiver(ch).list()) + .allMatch(l -> l.stream().allMatch(m -> m.dest() != null && m.src() != null)); + + // check that we have 6 messages with dest=SiteMaster(S) where S is the current site (only in site masters) + assert allChannels().stream().filter(RelayTests::isSiteMaster) + .map(ch -> getReceiver(ch).list()) + .allMatch(l -> l.stream().filter(m -> m.dest() instanceof SiteMaster).count() == 6); + } + + /** Sends a message to all members of the local site only */ + public void testMulticastsToLocalSiteOnly() throws Exception { + createSymmetricNetwork(ch -> new ResponseSender(ch).rawMsgs(true)); + // these messages won't get forwarded beyond site "LON" as flag NO_RELAY is set + a.send(new ObjectMessage(null, "from-A").setFlag(Message.Flag.NO_RELAY)); + b.send(new ObjectMessage(null, "from-B").setFlag(Message.Flag.NO_RELAY)); + + Util.waitUntil(5000, 200, + () -> Stream.of(a,b).peek(Relay2Test::printMessages) + .map(Relay2Test::getReceiver) + .allMatch(r -> r.list().size() == 4)); + + assert Stream.of(a,b).map(ch -> getReceiver(ch).list()) + .allMatch(l -> l.stream().filter(m -> m.dest() == null).count() == 2 && + l.stream().filter(m -> m.dest() != null).count() == 2); + + List l=new ArrayList<>(allChannels()); + l.remove(a); l.remove(b); + // make sure that the other channels did not receive any messages: + assert l.stream().map(ch -> getReceiver(ch).list()).allMatch(List::isEmpty); + } + + /** A local multicast from a non site-master is forwarded to all members of all sites */ + public void localMulticastForwardedToAllSites() throws Exception { + createSymmetricNetwork(ch -> new ResponseSender(ch).rawMsgs(true)); + b.send(null, "b-req"); // non site-master (A is SM) + d.send(null, "d-req"); // non site-master (C is SM) + + // all members in all sites should received the 2 multicasts: + Util.waitUntil(5000, 200, () -> allChannels().stream().peek(Relay2Test::printMessages) + .map(ch -> getReceiver(ch).list()) + .allMatch(l -> l.size() >= 2)); + + Util.waitUntil(5000, 200, + () -> Stream.of(b,d) + .peek(Relay2Test::printMessages) + .map(ch -> getReceiver(ch).list()) + .allMatch(l -> l.size() == 2 /* mcasts */ + 6 /* unicast rsps */)); + } + + /** Tests sending of unicasts to different local members, varying between site masters and non site masters */ + public void testLocalUnicasts() throws Exception { + createSymmetricNetwork(ch -> new UnicastResponseSender<>(ch).rawMsgs(true)); + try(JChannel _c=createNode(LON, "C", BRIDGE_CLUSTER, LON, NYC, SFO)){ + Util.waitUntilAllChannelsHaveSameView(5000, 100, a,b,_c); + _c.setReceiver(new UnicastResponseSender<>(_c).rawMsgs(true)); + // site master to non SM + a.send(b.getAddress(), new Data(REQ,"from A")); + assertNumMessages(1, a,b); + + // should bypass RELAY2 + a.send(new ObjectMessage(b.getAddress(), new Data(REQ,"from A")).setFlag(Message.Flag.NO_RELAY)); + assertNumMessages(1, a,b); + + // non-SM to SM + b.send(a.getAddress(), new Data(REQ,"from B")); + assertNumMessages(1, a,b); + + // bypassing RELAY2 + b.send(new ObjectMessage(a.getAddress(), new Data(REQ,"from B")).setFlag(Message.Flag.NO_RELAY)); + assertNumMessages(1, a,b); + + // SM to self + a.send(a.getAddress(), new Data(REQ,"from self")); + assertNumMessages(2, a); + + // bypasses RELAY2 + a.send(new ObjectMessage(a.getAddress(), new Data(REQ,"from self")).setFlag(Message.Flag.NO_RELAY)); + assertNumMessages(2, a); + + // non-SM to self + b.send(b.getAddress(), new Data(REQ,"from self")); + assertNumMessages(2, b); + + // bypasses RELAY2 + b.send(new ObjectMessage(b.getAddress(), new Data(REQ,"from self")).setFlag(Message.Flag.NO_RELAY)); + assertNumMessages(2, b); + + // non-SM to non-SM + b.send(_c.getAddress(), new Data(REQ,"from B")); + assertNumMessages(1, b,_c); + + // bypasses RELAY2 + b.send(new ObjectMessage(_c.getAddress(), new Data(REQ,"from B")).setFlag(Message.Flag.NO_RELAY)); + assertNumMessages(1, b,_c); + } + } + + /** Sends unicasts between members of different sites */ + public void testUnicasts() throws Exception { + createSymmetricNetwork(ch -> new UnicastResponseSender<>(ch).rawMsgs(true)); + a.send(c.getAddress(), new Data(REQ,"hello from A")); + assertNumMessages(1, a,c); + + // SM to SM + c.send(a.getAddress(), new Data(REQ,"hello from C")); + assertNumMessages(1,a,c); + + // non-SM to SM + b.send(c.getAddress(), new Data(REQ,"hello from B")); + assertNumMessages(1, b,c); + + // SM to non-SM + a.send(d.getAddress(), new Data(REQ,"hello from A")); + assertNumMessages(1, a,d); + } + + protected static void assertNumMessages(int expected, JChannel ... channels) throws TimeoutException { + try { + Util.waitUntil(5000,100, + () -> Stream.of(channels).map(ch -> getReceiver(ch).list()).allMatch(l -> l.size() == expected), + () -> msgs(channels)); + } + finally { + Stream.of(channels).forEach(ch -> getReceiver(ch).reset()); + } + } + + protected static MyReceiver getReceiver(JChannel ch) { + return (MyReceiver)ch.getReceiver(); + } + + protected static int receivedMessages(JChannel ch) { + return getReceiver(ch).list().size(); + } + + protected static boolean expectedUnicasts(List msgs,int expected) { + return expectedDests(msgs,m -> m.dest() != null,expected); + } + + protected static boolean expectedMulticasts(List msgs,int expected) { + return expectedDests(msgs,m -> m.dest() == null,expected); + } + + protected static boolean expectedDests(List msgs,Predicate p,int expected) { + return msgs.stream().filter(p).count() == expected; + } + + protected static void printMessages(JChannel ... channels) { + System.out.println(msgs(channels)); + } + + protected static String msgs(JChannel... channels) { + return Stream.of(channels) + .map(ch -> String.format("%s: %s",ch.address(),getReceiver(ch).list(Message::getObject))) + .collect(Collectors.joining("\n")); } @@ -446,6 +617,56 @@ protected static JChannel createNode(String site_name, String node_name, int num return ch; } + protected void createSymmetricNetwork(Function r) throws Exception { + a=createNode(LON, "A", BRIDGE_CLUSTER, LON, NYC, SFO); + b=createNode(LON, "B", BRIDGE_CLUSTER, LON, NYC, SFO); + + c=createNode(NYC, "C", BRIDGE_CLUSTER, LON, NYC, SFO); + d=createNode(NYC, "D", BRIDGE_CLUSTER, LON, NYC, SFO); + + e=createNode(SFO, "E", BRIDGE_CLUSTER, LON, NYC, SFO); + f=createNode(SFO, "F", BRIDGE_CLUSTER, LON, NYC, SFO); + + if(r != null) + allChannels().forEach(ch -> ch.setReceiver(r.apply(ch))); + + Util.waitUntilAllChannelsHaveSameView(5000, 200, a,b); + Util.waitUntilAllChannelsHaveSameView(5000, 200, c,d); + Util.waitUntilAllChannelsHaveSameView(5000, 200, e,f); + } + + protected List allChannels() { + return Arrays.asList(a,b,c,d,e,f); + } + + + protected class MyUphandler implements UpHandler { + protected final BlockingQueue received=new LinkedBlockingDeque<>(); + protected final AtomicInteger siteUnreachableEvents=new AtomicInteger(0); + + public BlockingQueue getReceived() {return received;} + public int getSiteUnreachableEvents() {return siteUnreachableEvents.get();} + + @Override public UpHandler setLocalAddress(Address a) {return this;} + + @Override + public Object up(Event evt) { + if(evt.getType() == Event.SITE_UNREACHABLE) { + log.debug("Site %s is unreachable", (Object) evt.getArg()); + siteUnreachableEvents.incrementAndGet(); + } + return null; + } + + @Override + public Object up(Message msg) { + log.debug("Received %s from %s\n", new String(msg.getArray(), StandardCharsets.UTF_8), msg.getSrc()); + received.add(msg); + return null; + } + } + + } diff --git a/tests/junit-functional/org/jgroups/tests/RelayTests.java b/tests/junit-functional/org/jgroups/tests/RelayTests.java index 0a8008ee4b7..13e437ecd43 100644 --- a/tests/junit-functional/org/jgroups/tests/RelayTests.java +++ b/tests/junit-functional/org/jgroups/tests/RelayTests.java @@ -1,8 +1,6 @@ -package org.jgroups.tests; + package org.jgroups.tests; -import org.jgroups.Address; -import org.jgroups.JChannel; -import org.jgroups.View; +import org.jgroups.*; import org.jgroups.logging.Log; import org.jgroups.logging.LogFactory; import org.jgroups.protocols.LOCAL_PING; @@ -11,13 +9,17 @@ import org.jgroups.protocols.UNICAST3; import org.jgroups.protocols.pbcast.GMS; import org.jgroups.protocols.pbcast.NAKACK2; -import org.jgroups.protocols.relay.RELAY2; -import org.jgroups.protocols.relay.Route; -import org.jgroups.protocols.relay.RouteStatusListener; +import org.jgroups.protocols.relay.*; import org.jgroups.protocols.relay.config.RelayConfig; import org.jgroups.stack.Protocol; +import org.jgroups.util.Bits; +import org.jgroups.util.MyReceiver; +import org.jgroups.util.SizeStreamable; import org.jgroups.util.Util; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.ArrayList; @@ -99,6 +101,11 @@ protected static Route getRoute(JChannel ch, String site_name) { return relay.getRoute(site_name); } + protected static boolean isSiteMaster(JChannel ch) { + RELAY2 r=ch.getProtocolStack().findProtocol(RELAY2.class); + return r != null && r.isSiteMaster(); + } + /** Creates a singleton view for each channel listed and injects it */ protected static void injectSingletonPartitions(JChannel ... channels) { for(JChannel ch: channels) { @@ -163,4 +170,98 @@ public String toString() { } } + protected static class ResponseSender extends MyReceiver { + protected final JChannel ch; + + public ResponseSender(JChannel ch) { + this.ch=ch; + } + + @Override + public void receive(Message msg) { + super.receive(msg); + if(msg.dest() == null || msg.dest() instanceof SiteMaster) { // send unicast response back to sender + Message rsp=new ObjectMessage(msg.src(),"rsp-" + ch.getAddress()); + if(msg.isFlagSet(Message.Flag.NO_RELAY)) + rsp.setFlag(Message.Flag.NO_RELAY); + try { + ch.send(rsp); + } + catch(Exception e) { + System.out.printf("%s: failed sending response: %s", ch.getAddress(), e); + } + } + } + } + + protected static class UnicastResponseSender extends MyReceiver { + protected final JChannel ch; + + public UnicastResponseSender(JChannel ch) { + this.ch=ch; + } + + public void receive(Message msg) { + super.receive(msg); + Object obj=msg.getObject(); + Data data=(Data)obj; + if(data.type == Data.Type.REQ) { + Message rsp=new ObjectMessage(msg.src(), new Data(Data.Type.RSP,String.valueOf(ch.getAddress()))); + if(msg.isFlagSet(Message.Flag.NO_RELAY)) + rsp.setFlag(Message.Flag.NO_RELAY); + try { + ch.send(rsp); + } + catch(Exception e) { + System.out.printf("%s: failed sending response: %s",ch.getAddress(),e); + } + } + } + } + + protected static class SiteMasterPickerImpl implements SiteMasterPicker { + public SiteMasterPickerImpl() { + } + + public Address pickSiteMaster(List
site_masters, Address original_sender) { + return site_masters.get(0); + } + + public Route pickRoute(String site, List routes, Address original_sender) { + return routes.get(0); + } + } + + protected static class Data implements SizeStreamable { + enum Type {REQ,RSP} + protected Type type; + protected String payload; + + public Data() {} + public Data(Type t, String s) { + type=t; + payload=s; + } + + public Type type() {return type;} + public String payload() {return payload;} + + public int serializedSize() { + return Integer.BYTES + Bits.sizeUTF(payload) +1; + } + + public void writeTo(DataOutput out) throws IOException { + out.writeInt(type.ordinal()); + Bits.writeString(payload, out); + } + + public void readFrom(DataInput in) throws IOException, ClassNotFoundException { + this.type=Type.values()[in.readInt()]; + this.payload=Bits.readString(in); + } + + public String toString() { + return String.format("%s: %s", type, payload); + } + } }