From 47321ee34e412e97b9753821f08e24633b5fa61b Mon Sep 17 00:00:00 2001 From: Bela Ban Date: Sun, 6 Aug 2023 15:06:37 +0200 Subject: [PATCH] - RELAY3: only the first site master should route a multicast message if we have multiple site masters (https://issues.redhat.com/browse/JGRP-2696) --- src/org/jgroups/protocols/relay/RELAY3.java | 16 ++- .../org/jgroups/tests/RelayTest.java | 122 +++++++++++++++++- .../org/jgroups/tests/RelayTests.java | 12 +- 3 files changed, 139 insertions(+), 11 deletions(-) diff --git a/src/org/jgroups/protocols/relay/RELAY3.java b/src/org/jgroups/protocols/relay/RELAY3.java index 5ddcb108d1..89d4c960c5 100644 --- a/src/org/jgroups/protocols/relay/RELAY3.java +++ b/src/org/jgroups/protocols/relay/RELAY3.java @@ -245,7 +245,7 @@ protected Object process(boolean down, Message msg) { case ALL: if(down) return routeThen(msg, null,() -> deliver(null, msg, true)); - return routeThen(msg, null, () -> passUp(msg)); + return dontRoute(msg)? passUp(msg) : routeThen(msg, null, () -> passUp(msg)); case SM_ALL: return routeThen(msg, null, () -> passUp(msg)); case SM: @@ -287,6 +287,20 @@ protected Object process(boolean down, Message msg) { return null; } + /** + * Determines if a message should be routed. If NO_RELAY is set, then the message won't be routed. If we have + * multiple site masters, and this site master is picked to route the message, then return true, else return false. + * JIRA: https://issues.redhat.com/browse/JGRP-2696 + */ + protected boolean dontRoute(Message msg) { + if(msg.isFlagSet(Flag.NO_RELAY)) + return true; // don't route + final List
sms=site_masters; + if(sms == null || sms.size() < 2) + return false; // do route + Address first_sm=sms.get(0); + return local_addr.equals(first_sm); + } /** * Sends a message to the given sites, or all sites (excluding the local site) diff --git a/tests/junit-functional/org/jgroups/tests/RelayTest.java b/tests/junit-functional/org/jgroups/tests/RelayTest.java index 14bd620086..a0bbfb6eb7 100644 --- a/tests/junit-functional/org/jgroups/tests/RelayTest.java +++ b/tests/junit-functional/org/jgroups/tests/RelayTest.java @@ -20,6 +20,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import java.util.function.Predicate; +import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -36,8 +37,8 @@ public class RelayTest extends RelayTests { protected JChannel d, e, f; // used for other tests AB CD EF protected JChannel x, y, z; // members in site "sfo - protected static final String BRIDGE_CLUSTER = "global"; - protected static final String SFO = "sfo", LON="lon", NYC="nyc"; + protected static final String BRIDGE_CLUSTER = "global"; + protected static final String SFO = "sfo", LON="lon", NYC="nyc"; @DataProvider protected Object[][] relayProvider() { @@ -56,7 +57,7 @@ protected Object[][] relayProvider() { public void testAddRelay2ToAnAlreadyConnectedChannel(Class cl) throws Exception { // Create and connect a channel. a=new JChannel(defaultStack()).name("A").connect(SFO); - System.out.println("Channel " + a.getName() + " is connected. View: " + a.getView()); + System.out.printf("Channel %s is connected. View: %s\n", a.getName(), a.getView()); // Add RELAY protocol to the already connected channel. RELAY relay=createSymmetricRELAY(cl, SFO, BRIDGE_CLUSTER, LON, SFO); @@ -436,7 +437,7 @@ assert allChannels().stream().filter(ch -> !isSiteMaster(ch)).map(ch -> getRecei // check that we have 6 messages with dest=SiteMaster(S) where S is the current site (only in site masters) assert allChannels().stream().filter(RelayTests::isSiteMaster) - .map(ch -> ((MyReceiver)ch.getReceiver()).list()) + .map(ch -> getReceiver(ch).list()) .allMatch(l -> l.stream().filter(m -> m.dest() instanceof SiteMaster).count() == 6); } @@ -454,7 +455,7 @@ public void testSendingToAllSiteMasters(Class cl) throws Except // each site-master received 6 messages and 3 responses, each non-SM received 3 responses only Util.waitUntil(5000, 200, () -> { for(JChannel ch: allChannels()) { - List list=((MyReceiver)ch.getReceiver()).list(); + List list=getReceiver(ch).list(); int expected_size=isSiteMaster(ch)? 9 : 3; if(expected_size != list.size()) return false; @@ -590,6 +591,117 @@ public void testUnicasts(Class cl) throws Exception { assertNumMessages(1, a,d); } + /** Tests https://issues.redhat.com/browse/JGRP-2696 */ + public void testMulticastWithMultipleSiteMasters(Class cl) throws Exception { + if(cl.equals(RELAY2.class)) + return; + a=createNode(cl, LON, "A", BRIDGE_CLUSTER, false, LON, NYC, SFO); + b=createNode(cl, LON, "B", BRIDGE_CLUSTER, false, LON, NYC, SFO); + c=createNode(cl, LON, "C", BRIDGE_CLUSTER, false, LON, NYC, SFO); + + d=createNode(cl, NYC, "D", BRIDGE_CLUSTER, false, LON, NYC, SFO); + e=createNode(cl, NYC, "E", BRIDGE_CLUSTER, false, LON, NYC, SFO); + f=createNode(cl, NYC, "F", BRIDGE_CLUSTER, false, LON, NYC, SFO); + + try(JChannel _g=createNode(cl, SFO, "G", BRIDGE_CLUSTER, false, LON, NYC, SFO); + JChannel _h=createNode(cl, SFO, "H", BRIDGE_CLUSTER, false, LON, NYC, SFO); + JChannel _i=createNode(cl, SFO, "I", BRIDGE_CLUSTER, false, LON, NYC, SFO)) { + + Supplier> generator=() -> Stream.concat(allChannels().stream(), Stream.of(_g, _h, _i)); + generator.get().forEach(RelayTest::changeRELAY); + for(JChannel ch: Arrays.asList(a,b,c)) + ch.connect(LON); + Util.waitUntilAllChannelsHaveSameView(5000, 100, a, b, c); + + for(JChannel ch: Arrays.asList(d,e,f)) + ch.connect(NYC); + Util.waitUntilAllChannelsHaveSameView(5000, 100, d, e, f); + + for(JChannel ch: Arrays.asList(_g,_h,_i)) + ch.connect(SFO); + Util.waitUntilAllChannelsHaveSameView(5000, 100, _g, _h, _i); + + waitUntilRoute(NYC, true, 5000, 500, a); + waitUntilRoute(SFO, true, 5000, 500, a); + waitUntilRoute(LON, true, 5000, 500, d); + waitUntilRoute(SFO, true, 5000, 500, d); + waitUntilRoute(LON, true, 5000, 500, _g); + waitUntilRoute(NYC, true, 5000, 500, _g); + + assert Stream.of(a,b,d,e,_g,_h).map(ch -> ch.getProtocolStack().findProtocol(RELAY.class)) + .allMatch(r -> ((RELAY)r).isSiteMaster()); + assert Stream.of(c,f,_i).map(ch -> ch.getProtocolStack().findProtocol(RELAY.class)) + .noneMatch(r -> ((RELAY)r).isSiteMaster()); + + + generator.get().forEach(ch -> ch.setReceiver(new MyReceiver().rawMsgs(true))); + + // A and B (site masters) multicast 1 message each: every receiver should have exactly 2 messages + a.send(null, "from A"); + b.send(null, "from B"); + + Util.waitUntil(5000, 100, + () -> generator.get().map(RelayTest::getReceiver).allMatch(r -> r.size() == 2), + () -> printMessages(generator.get())); + + System.out.printf("received messages:\n%s\n", printMessages(generator.get())); + generator.get().forEach(ch -> getReceiver(ch).reset()); + + // destination of SiteMaster(null) is only available in RELAY3: + if(cl.equals(RELAY3.class)) { + // send to all site masters, but only *one* site master from each site is picked + + a.send(new SiteMaster(null), "from A"); + b.send(new SiteMaster(null), "from B"); + + // A sends to itself, plus site masters from NYC (D or E) and SFO (G or H) + // B sends to itself, plus site masters from NYC (D or E) and SFO (G or H) + // -> the default SiteMasterPicker impl in RELAY pick a random site master / route; if we disabled + // this and always picked the first site master / route in the list, only D and G would + // receive messages (2 each); E and H would receive 0 messages + Util.waitUntil(3000, 100, + () -> Stream.of(a,b).map(RelayTest::getReceiver).allMatch(r -> r.size() == 1)); + + // all other site masters (D or E, G or H) get A's and B's message: + Util.waitUntil(3000, 100, + () -> Stream.of(d,e,_g,_h) + // a site master receives 0, 1 or 2 messages: + .map(RelayTest::getReceiver).allMatch(r -> r.size() >= 0 && r.size() <= 2), + () -> printMessages(generator.get())); + System.out.printf("-- received messages:\n%s\n", printMessages(generator.get())); + generator.get().forEach(ch -> getReceiver(ch).reset()); + + c.send(new SiteMaster(null), "from C"); + // same as above: A or B receives 1 message, D or E and G or H + Util.waitUntil(3000, 100, () -> Stream.of(a,b,d,e,_g,_h).map(RelayTest::getReceiver) + .allMatch(r -> r.size() >= 0 && r.size() <= 2), () -> printMessages(generator.get())); + System.out.printf("-- received messages:\n%s\n", printMessages(generator.get())); + generator.get().forEach(ch -> getReceiver(ch).reset()); + } + + // C sends a multicast; A *or* B (but not both) should forward it to the other sites NYC and SFO + c.send(null, "from C"); + Util.waitUntil(3000, 100, + () -> generator.get().map(RelayTest::getReceiver).allMatch(r -> r.size() == 1), + () -> printMessages(generator.get())); + System.out.printf("-- received messages:\n%s\n", printMessages(generator.get())); + } + } + + protected static String printMessages(Stream s) { + return s.map(ch -> String.format("%s: %d msgs (%s)", ch.address(), getReceiver(ch).size(), + getReceiver(ch).list(Message::getObject))) + .collect(Collectors.joining("\n")); + } + + /** Set max_site_masters to 2 and relay_multicasts (in RELAY2) to true */ + protected static void changeRELAY(JChannel ch) { + RELAY relay=ch.getProtocolStack().findProtocol(RELAY.class); + relay.setMaxSiteMasters(2); + if(relay instanceof RELAY2) + ((RELAY2)relay).relayMulticasts(true); + } + protected static SiteUUID makeSiteUUID(Address addr, String site) { String name=NameCache.get(addr); return new SiteUUID((UUID)addr, name, site); diff --git a/tests/junit-functional/org/jgroups/tests/RelayTests.java b/tests/junit-functional/org/jgroups/tests/RelayTests.java index 4fd398bb2c..d234e0dbeb 100644 --- a/tests/junit-functional/org/jgroups/tests/RelayTests.java +++ b/tests/junit-functional/org/jgroups/tests/RelayTests.java @@ -3,10 +3,7 @@ import org.jgroups.*; import org.jgroups.logging.Log; import org.jgroups.logging.LogFactory; - import org.jgroups.protocols.LOCAL_PING; - import org.jgroups.protocols.MERGE3; - import org.jgroups.protocols.TCP; - import org.jgroups.protocols.UNICAST3; + import org.jgroups.protocols.*; import org.jgroups.protocols.pbcast.GMS; import org.jgroups.protocols.pbcast.NAKACK2; import org.jgroups.protocols.relay.*; @@ -65,9 +62,14 @@ protected static Protocol[] defaultStack(Protocol ... additional_protocols) { */ protected static JChannel createNode(Class cl, String site, String name, String bridge, String ... sites) throws Exception { + return createNode(cl, site, name, bridge, true, sites); + } + + protected static JChannel createNode(Class cl, String site, String name, String bridge, + boolean connect, String ... sites) throws Exception { RELAY relay=createSymmetricRELAY(cl, site, bridge, sites); JChannel ch=new JChannel(defaultStack(relay)).name(name); - if(site != null) + if(connect) ch.connect(site); return ch; }