Skip to content

Commit

Permalink
- RELAY3: only the first site master should route a multicast message…
Browse files Browse the repository at this point in the history
… if we have multiple site masters (https://issues.redhat.com/browse/JGRP-2696)
  • Loading branch information
belaban committed Aug 7, 2023
1 parent 1d08a4f commit 47321ee
Show file tree
Hide file tree
Showing 3 changed files with 139 additions and 11 deletions.
16 changes: 15 additions & 1 deletion src/org/jgroups/protocols/relay/RELAY3.java
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ protected Object process(boolean down, Message msg) {
case ALL:
if(down)
return routeThen(msg, null,() -> deliver(null, msg, true));
return routeThen(msg, null, () -> passUp(msg));
return dontRoute(msg)? passUp(msg) : routeThen(msg, null, () -> passUp(msg));
case SM_ALL:
return routeThen(msg, null, () -> passUp(msg));
case SM:
Expand Down Expand Up @@ -287,6 +287,20 @@ protected Object process(boolean down, Message msg) {
return null;
}

/**
* Determines if a message should be routed. If NO_RELAY is set, then the message won't be routed. If we have
* multiple site masters, and this site master is picked to route the message, then return true, else return false.
* JIRA: https://issues.redhat.com/browse/JGRP-2696
*/
protected boolean dontRoute(Message msg) {
if(msg.isFlagSet(Flag.NO_RELAY))
return true; // don't route
final List<Address> sms=site_masters;
if(sms == null || sms.size() < 2)
return false; // do route
Address first_sm=sms.get(0);
return local_addr.equals(first_sm);
}

/**
* Sends a message to the given sites, or all sites (excluding the local site)
Expand Down
122 changes: 117 additions & 5 deletions tests/junit-functional/org/jgroups/tests/RelayTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand All @@ -36,8 +37,8 @@ public class RelayTest extends RelayTests {
protected JChannel d, e, f; // used for other tests AB CD EF
protected JChannel x, y, z; // members in site "sfo

protected static final String BRIDGE_CLUSTER = "global";
protected static final String SFO = "sfo", LON="lon", NYC="nyc";
protected static final String BRIDGE_CLUSTER = "global";
protected static final String SFO = "sfo", LON="lon", NYC="nyc";

@DataProvider
protected Object[][] relayProvider() {
Expand All @@ -56,7 +57,7 @@ protected Object[][] relayProvider() {
public void testAddRelay2ToAnAlreadyConnectedChannel(Class<? extends RELAY> cl) throws Exception {
// Create and connect a channel.
a=new JChannel(defaultStack()).name("A").connect(SFO);
System.out.println("Channel " + a.getName() + " is connected. View: " + a.getView());
System.out.printf("Channel %s is connected. View: %s\n", a.getName(), a.getView());

// Add RELAY protocol to the already connected channel.
RELAY relay=createSymmetricRELAY(cl, SFO, BRIDGE_CLUSTER, LON, SFO);
Expand Down Expand Up @@ -436,7 +437,7 @@ assert allChannels().stream().filter(ch -> !isSiteMaster(ch)).map(ch -> getRecei

// check that we have 6 messages with dest=SiteMaster(S) where S is the current site (only in site masters)
assert allChannels().stream().filter(RelayTests::isSiteMaster)
.map(ch -> ((MyReceiver<Message>)ch.getReceiver()).list())
.map(ch -> getReceiver(ch).list())
.allMatch(l -> l.stream().filter(m -> m.dest() instanceof SiteMaster).count() == 6);
}

Expand All @@ -454,7 +455,7 @@ public void testSendingToAllSiteMasters(Class<? extends RELAY> cl) throws Except
// each site-master received 6 messages and 3 responses, each non-SM received 3 responses only
Util.waitUntil(5000, 200, () -> {
for(JChannel ch: allChannels()) {
List<Message> list=((MyReceiver<Message>)ch.getReceiver()).list();
List<Message> list=getReceiver(ch).list();
int expected_size=isSiteMaster(ch)? 9 : 3;
if(expected_size != list.size())
return false;
Expand Down Expand Up @@ -590,6 +591,117 @@ public void testUnicasts(Class<? extends RELAY> cl) throws Exception {
assertNumMessages(1, a,d);
}

/** Tests https://issues.redhat.com/browse/JGRP-2696 */
public void testMulticastWithMultipleSiteMasters(Class<? extends RELAY> cl) throws Exception {
if(cl.equals(RELAY2.class))
return;
a=createNode(cl, LON, "A", BRIDGE_CLUSTER, false, LON, NYC, SFO);
b=createNode(cl, LON, "B", BRIDGE_CLUSTER, false, LON, NYC, SFO);
c=createNode(cl, LON, "C", BRIDGE_CLUSTER, false, LON, NYC, SFO);

d=createNode(cl, NYC, "D", BRIDGE_CLUSTER, false, LON, NYC, SFO);
e=createNode(cl, NYC, "E", BRIDGE_CLUSTER, false, LON, NYC, SFO);
f=createNode(cl, NYC, "F", BRIDGE_CLUSTER, false, LON, NYC, SFO);

try(JChannel _g=createNode(cl, SFO, "G", BRIDGE_CLUSTER, false, LON, NYC, SFO);
JChannel _h=createNode(cl, SFO, "H", BRIDGE_CLUSTER, false, LON, NYC, SFO);
JChannel _i=createNode(cl, SFO, "I", BRIDGE_CLUSTER, false, LON, NYC, SFO)) {

Supplier<Stream<JChannel>> generator=() -> Stream.concat(allChannels().stream(), Stream.of(_g, _h, _i));
generator.get().forEach(RelayTest::changeRELAY);
for(JChannel ch: Arrays.asList(a,b,c))
ch.connect(LON);
Util.waitUntilAllChannelsHaveSameView(5000, 100, a, b, c);

for(JChannel ch: Arrays.asList(d,e,f))
ch.connect(NYC);
Util.waitUntilAllChannelsHaveSameView(5000, 100, d, e, f);

for(JChannel ch: Arrays.asList(_g,_h,_i))
ch.connect(SFO);
Util.waitUntilAllChannelsHaveSameView(5000, 100, _g, _h, _i);

waitUntilRoute(NYC, true, 5000, 500, a);
waitUntilRoute(SFO, true, 5000, 500, a);
waitUntilRoute(LON, true, 5000, 500, d);
waitUntilRoute(SFO, true, 5000, 500, d);
waitUntilRoute(LON, true, 5000, 500, _g);
waitUntilRoute(NYC, true, 5000, 500, _g);

assert Stream.of(a,b,d,e,_g,_h).map(ch -> ch.getProtocolStack().findProtocol(RELAY.class))
.allMatch(r -> ((RELAY)r).isSiteMaster());
assert Stream.of(c,f,_i).map(ch -> ch.getProtocolStack().findProtocol(RELAY.class))
.noneMatch(r -> ((RELAY)r).isSiteMaster());


generator.get().forEach(ch -> ch.setReceiver(new MyReceiver<Message>().rawMsgs(true)));

// A and B (site masters) multicast 1 message each: every receiver should have exactly 2 messages
a.send(null, "from A");
b.send(null, "from B");

Util.waitUntil(5000, 100,
() -> generator.get().map(RelayTest::getReceiver).allMatch(r -> r.size() == 2),
() -> printMessages(generator.get()));

System.out.printf("received messages:\n%s\n", printMessages(generator.get()));
generator.get().forEach(ch -> getReceiver(ch).reset());

// destination of SiteMaster(null) is only available in RELAY3:
if(cl.equals(RELAY3.class)) {
// send to all site masters, but only *one* site master from each site is picked

a.send(new SiteMaster(null), "from A");
b.send(new SiteMaster(null), "from B");

// A sends to itself, plus site masters from NYC (D or E) and SFO (G or H)
// B sends to itself, plus site masters from NYC (D or E) and SFO (G or H)
// -> the default SiteMasterPicker impl in RELAY pick a random site master / route; if we disabled
// this and always picked the first site master / route in the list, only D and G would
// receive messages (2 each); E and H would receive 0 messages
Util.waitUntil(3000, 100,
() -> Stream.of(a,b).map(RelayTest::getReceiver).allMatch(r -> r.size() == 1));

// all other site masters (D or E, G or H) get A's and B's message:
Util.waitUntil(3000, 100,
() -> Stream.of(d,e,_g,_h)
// a site master receives 0, 1 or 2 messages:
.map(RelayTest::getReceiver).allMatch(r -> r.size() >= 0 && r.size() <= 2),
() -> printMessages(generator.get()));
System.out.printf("-- received messages:\n%s\n", printMessages(generator.get()));
generator.get().forEach(ch -> getReceiver(ch).reset());

c.send(new SiteMaster(null), "from C");
// same as above: A or B receives 1 message, D or E and G or H
Util.waitUntil(3000, 100, () -> Stream.of(a,b,d,e,_g,_h).map(RelayTest::getReceiver)
.allMatch(r -> r.size() >= 0 && r.size() <= 2), () -> printMessages(generator.get()));
System.out.printf("-- received messages:\n%s\n", printMessages(generator.get()));
generator.get().forEach(ch -> getReceiver(ch).reset());
}

// C sends a multicast; A *or* B (but not both) should forward it to the other sites NYC and SFO
c.send(null, "from C");
Util.waitUntil(3000, 100,
() -> generator.get().map(RelayTest::getReceiver).allMatch(r -> r.size() == 1),
() -> printMessages(generator.get()));
System.out.printf("-- received messages:\n%s\n", printMessages(generator.get()));
}
}

protected static String printMessages(Stream<JChannel> s) {
return s.map(ch -> String.format("%s: %d msgs (%s)", ch.address(), getReceiver(ch).size(),
getReceiver(ch).list(Message::getObject)))
.collect(Collectors.joining("\n"));
}

/** Set max_site_masters to 2 and relay_multicasts (in RELAY2) to true */
protected static void changeRELAY(JChannel ch) {
RELAY relay=ch.getProtocolStack().findProtocol(RELAY.class);
relay.setMaxSiteMasters(2);
if(relay instanceof RELAY2)
((RELAY2)relay).relayMulticasts(true);
}

protected static SiteUUID makeSiteUUID(Address addr, String site) {
String name=NameCache.get(addr);
return new SiteUUID((UUID)addr, name, site);
Expand Down
12 changes: 7 additions & 5 deletions tests/junit-functional/org/jgroups/tests/RelayTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,7 @@
import org.jgroups.*;
import org.jgroups.logging.Log;
import org.jgroups.logging.LogFactory;
import org.jgroups.protocols.LOCAL_PING;
import org.jgroups.protocols.MERGE3;
import org.jgroups.protocols.TCP;
import org.jgroups.protocols.UNICAST3;
import org.jgroups.protocols.*;
import org.jgroups.protocols.pbcast.GMS;
import org.jgroups.protocols.pbcast.NAKACK2;
import org.jgroups.protocols.relay.*;
Expand Down Expand Up @@ -65,9 +62,14 @@ protected static Protocol[] defaultStack(Protocol ... additional_protocols) {
*/
protected static JChannel createNode(Class<? extends RELAY> cl, String site, String name, String bridge,
String ... sites) throws Exception {
return createNode(cl, site, name, bridge, true, sites);
}

protected static JChannel createNode(Class<? extends RELAY> cl, String site, String name, String bridge,
boolean connect, String ... sites) throws Exception {
RELAY relay=createSymmetricRELAY(cl, site, bridge, sites);
JChannel ch=new JChannel(defaultStack(relay)).name(name);
if(site != null)
if(connect)
ch.connect(site);
return ch;
}
Expand Down

0 comments on commit 47321ee

Please sign in to comment.