From 5ad45442b85b6e7903ddcba511bda6b254affaa2 Mon Sep 17 00:00:00 2001 From: Bela Ban Date: Tue, 25 Jul 2023 16:03:31 +0200 Subject: [PATCH] ns --- src/org/jgroups/protocols/relay/RELAY2.java | 30 ++- src/org/jgroups/util/MyReceiver.java | 3 +- .../org/jgroups/tests/Relay2Test.java | 220 ++++++++++-------- .../org/jgroups/tests/RelayTests.java | 112 ++++++++- 4 files changed, 249 insertions(+), 116 deletions(-) diff --git a/src/org/jgroups/protocols/relay/RELAY2.java b/src/org/jgroups/protocols/relay/RELAY2.java index f58d4ca719..dcb004aa11 100644 --- a/src/org/jgroups/protocols/relay/RELAY2.java +++ b/src/org/jgroups/protocols/relay/RELAY2.java @@ -27,7 +27,8 @@ import static org.jgroups.protocols.relay.Relay2Header.*; // todo: check if copy is needed in route(), passUp() and deliver(); possibly pass a boolean as parameter (copy or not) - +// todo: use CompletableFutures in routeThen(); this could parallelize routing and delivery/passsing up +// todo: check if a message can bypass RELAY2 completely when NO_RELAY is set (in up(),down()) /** * Provides relaying of messages between autonomous sites.
* Design: ./doc/design/RELAY2.txt and at https://github.com/belaban/JGroups/blob/master/doc/design/RELAY2.txt.
@@ -41,7 +42,8 @@ @MBean(description="RELAY2 protocol") public class RELAY2 extends Protocol { // reserved flags - public static final short can_become_site_master_flag = 1 << 1; + public static final short can_become_site_master_flag = 1 << 1; + protected static final String RELAY2_CL=RELAY2.class.getSimpleName(); /* ------------------------------------------ Properties ---------------------------------------------- */ @Property(description="Name of the site; must be defined in the configuration",writable=false) @@ -430,6 +432,8 @@ public Object down(Event evt) { } public Object down(Message msg) { + //if(msg.isFlagSet(Flag.NO_RELAY)) + // return down_prot.down(msg); msg.src(local_addr); return process(true, msg); } @@ -441,6 +445,8 @@ public Object up(Event evt) { } public Object up(Message msg) { + // if(msg.isFlagSet(Flag.NO_RELAY)) + // return up_prot.up(msg); Message copy=msg; Relay2Header hdr=msg.getHeader(id); if(hdr != null) { @@ -458,6 +464,8 @@ public void up(MessageBatch batch) { List unreachable_sites=null; for(Iterator it=batch.iterator(); it.hasNext();) { Message msg=it.next(), copy=msg; + // if(msg.isFlagSet(Flag.NO_RELAY)) + // continue; Relay2Header hdr=msg.getHeader(id); it.remove(); if(hdr != null) { @@ -546,6 +554,9 @@ public void handleView(View view) { topo().adjust(this.site, view.getMembers()); } + public String toString() { + return String.format("%s%s", RELAY2_CL, local_addr != null? String.format(" (%s)", local_addr) : ""); + } /** Called to handle a message received from a different site (via a bridge channel) */ protected void handleRelayMessage(Relay2Header hdr, Message msg) { @@ -605,21 +616,22 @@ protected Object process(boolean down, Message msg) { switch(type) { case ALL: if(down) - return routeThen(msg,null,() -> deliver(null, msg, true)); - return routeThen(msg,null,() -> passUp(msg)); + return routeThen(msg, null,() -> deliver(null, msg, true)); + return routeThen(msg, null, () -> passUp(msg)); case SM_ALL: - return routeThen(msg,null,() -> passUp(msg)); + return routeThen(msg, null, () -> passUp(msg)); case SM: if(sameSite(dst)) return passUp(msg); return route(msg, Arrays.asList(dst.getSite())); case UNICAST: if(sameSite(dst)) { - if(down) - return deliver(dst, msg, false); if(local_addr.equals(dst)) return passUp(msg); - return deliver(dst, msg, false); + if(down) + return deliver(dst, msg, false); + String s=String.format("a unicast requires dest (%s) == local_addr (%s)", dst, local_addr); + throw new IllegalStateException(s); } else return route(msg, Arrays.asList(dst.getSite())); @@ -638,7 +650,7 @@ protected Object process(boolean down, Message msg) { throw new IllegalStateException(String.format("non site master received a sg with dst %s",dst)); case UNICAST: if(down) { - if(sameSite(dst)) + if(sameSite(dst)) // todo: if same address -> passUp() return deliver(dst, msg, false); return sendToLocalSiteMaster(local_addr, msg); } diff --git a/src/org/jgroups/util/MyReceiver.java b/src/org/jgroups/util/MyReceiver.java index 6c1e5735f6..c7701d90a4 100644 --- a/src/org/jgroups/util/MyReceiver.java +++ b/src/org/jgroups/util/MyReceiver.java @@ -25,9 +25,8 @@ public class MyReceiver implements Receiver, Closeable { public void receive(Message msg) { T obj=raw_msgs? (T)msg : (T)msg.getObject(); list.add(obj); - if(verbose) { + if(verbose) System.out.println((name() != null? name() + ":" : "") + " received message from " + msg.getSrc() + ": " + obj); - } } @Override diff --git a/tests/junit-functional/org/jgroups/tests/Relay2Test.java b/tests/junit-functional/org/jgroups/tests/Relay2Test.java index e965913f0b..4c340f306a 100644 --- a/tests/junit-functional/org/jgroups/tests/Relay2Test.java +++ b/tests/junit-functional/org/jgroups/tests/Relay2Test.java @@ -13,12 +13,15 @@ import java.util.*; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.jgroups.tests.RelayTests.Data.Type.REQ; + /** * Various RELAY2-related tests * @author Bela Ban @@ -129,26 +132,6 @@ public void testMissingRouteAfterMerge() throws Exception { } - /** - * Tests whether the bridge channel connects and disconnects ok. - */ - public void testConnectAndReconnectOfBridgeStack() throws Exception { - a=new JChannel(defaultStack()).setName("A"); - b=new JChannel(defaultStack()).setName("B"); - - a.connect(BRIDGE_CLUSTER); - b.connect(BRIDGE_CLUSTER); - Util.waitUntilAllChannelsHaveSameView(10000, 500, a, b); - - b.disconnect(); - Util.waitUntilAllChannelsHaveSameView(10000, 500, a); - - b.connect(BRIDGE_CLUSTER); - Util.waitUntilAllChannelsHaveSameView(10000, 500, a, b); - } - - - /** * Tests sites LON and SFO, with SFO disconnecting (bridge view on LON should be 1) and reconnecting (bridge view on * LON and SFO should be 2) @@ -216,12 +199,12 @@ public void testUnknownAndUpStateTransitions() throws Exception { System.out.println("Disconnecting X"); x.disconnect(); - System.out.println("A: waiting for site SFO to be UNKNOWN"); + System.out.println("A: waiting for site SFO to be DOWN"); waitUntilRoute(SFO, false, 20000, 500, a); - System.out.println("Reconnecting X, waiting for 5 seconds to see if the route is marked as DOWN"); + System.out.println("Reconnecting X"); x.connect(SFO); - Util.sleep(5000); + waitUntilRoute(SFO, true, 5000, 100, a); Route route=getRoute(a, SFO); assert route != null : "route is " + route + " (expected to be UP)"; @@ -277,32 +260,6 @@ public void testSiteUnreachableMessageBreaksSiteUUID() throws Exception { : "Expecting 100 site unreachable events on node A but got " + h2.getSiteUnreachableEvents(); } - protected class MyUphandler implements UpHandler { - protected final BlockingQueue received=new LinkedBlockingDeque<>(); - protected final AtomicInteger siteUnreachableEvents=new AtomicInteger(0); - - public BlockingQueue getReceived() {return received;} - public int getSiteUnreachableEvents() {return siteUnreachableEvents.get();} - - @Override public UpHandler setLocalAddress(Address a) {return this;} - - @Override - public Object up(Event evt) { - if(evt.getType() == Event.SITE_UNREACHABLE) { - log.debug("Site %s is unreachable", (Object) evt.getArg()); - siteUnreachableEvents.incrementAndGet(); - } - return null; - } - - @Override - public Object up(Message msg) { - log.debug("Received %s from %s\n", new String(msg.getArray(), StandardCharsets.UTF_8), msg.getSrc()); - received.add(msg); - return null; - } - } - /** @@ -311,8 +268,8 @@ public Object up(Message msg) { * despite using multiple site masters. JIRA: https://issues.redhat.com/browse/JGRP-2112 */ public void testSenderOrderWithMultipleSiteMasters() throws Exception { - MyReceiver rx=new MyReceiver<>().rawMsgs(true).verbose(true), - ry=new MyReceiver<>().rawMsgs(true).verbose(true), rz=new MyReceiver<>().rawMsgs(true).verbose(true); + MyReceiver rx=new MyReceiver<>().rawMsgs(true), ry=new MyReceiver<>().rawMsgs(true), + rz=new MyReceiver<>().rawMsgs(true); final int NUM=512; final String sm_picker_impl=SiteMasterPickerImpl.class.getName(); a=createNode(LON, "A", 2, sm_picker_impl); @@ -336,17 +293,7 @@ public void testSenderOrderWithMultipleSiteMasters() throws Exception { c.send(msg); } - boolean running=true; - for(int i=0; running && i < 10; i++) { - for(MyReceiver r: Arrays.asList(rx,ry,rz)) { - if(r.size() >= NUM) { - running=false; - break; - } - } - Util.sleep(1000); - } - + Util.waitUntilTrue(10000, 500, () -> Stream.of(rx,ry,rz).anyMatch(l -> l.size() >= NUM)); System.out.printf("X: size=%d\nY: size=%d\nZ: size=%d\n", rx.size(), ry.size(), rz.size()); assert rx.size() == NUM || ry.size() == NUM; assert rz.size() == 0; @@ -471,7 +418,7 @@ assert allChannels().stream().filter(ch -> !isSiteMaster(ch)).map(ch -> getRecei .allMatch(l -> l.stream().allMatch(m -> m.dest() != null && m.src() != null)); // check that we have 6 messages with dest=SiteMaster(S) where S is the current site (only in site masters) - assert allChannels().stream().filter(Relay2Test::isSiteMaster) + assert allChannels().stream().filter(RelayTests::isSiteMaster) .map(ch -> ((MyReceiver)ch.getReceiver()).list()) .allMatch(l -> l.stream().filter(m -> m.dest() instanceof SiteMaster).count() == 6); } @@ -502,13 +449,13 @@ assert allChannels().stream().filter(ch -> !isSiteMaster(ch)).map(ch -> getRecei .allMatch(l -> l.stream().allMatch(m -> m.dest() != null && m.src() != null)); // check that we have 6 messages with dest=SiteMaster(S) where S is the current site (only in site masters) - assert allChannels().stream().filter(Relay2Test::isSiteMaster) + assert allChannels().stream().filter(RelayTests::isSiteMaster) .map(ch -> getReceiver(ch).list()) .allMatch(l -> l.stream().filter(m -> m.dest() instanceof SiteMaster).count() == 6); } /** Sends a message to all members of the local site only */ - public void sendToLocalSiteOnly() throws Exception { + public void testMulticastsToLocalSiteOnly() throws Exception { createSymmetricNetwork(ch -> new ResponseSender(ch).rawMsgs(true)); // these messages won't get forwarded beyond site "LON" as flag NO_RELAY is set a.send(new ObjectMessage(null, "from-A").setFlag(Message.Flag.NO_RELAY)); @@ -533,7 +480,7 @@ public void sendToLocalSiteOnly() throws Exception { public void localMulticastForwardedToAllSites() throws Exception { createSymmetricNetwork(ch -> new ResponseSender(ch).rawMsgs(true)); b.send(null, "b-req"); // non site-master (A is SM) - d.send(null, "c-req"); // non site-master (C is SM) + d.send(null, "d-req"); // non site-master (C is SM) // all members in all sites should received the 2 multicasts: Util.waitUntil(5000, 200, () -> allChannels().stream().peek(Relay2Test::printMessages) @@ -545,13 +492,94 @@ public void localMulticastForwardedToAllSites() throws Exception { .peek(Relay2Test::printMessages) .map(ch -> getReceiver(ch).list()) .allMatch(l -> l.size() == 2 /* mcasts */ + 6 /* unicast rsps */)); + } + /** Tests sending of unicasts to different local members, varying between site masters and non site masters */ + public void testLocalUnicasts() throws Exception { + createSymmetricNetwork(ch -> new UnicastResponseSender<>(ch).rawMsgs(true)); + try(JChannel _c=createNode(LON, "C", BRIDGE_CLUSTER, LON, NYC, SFO)){ + Util.waitUntilAllChannelsHaveSameView(5000, 100, a,b,_c); + _c.setReceiver(new UnicastResponseSender<>(_c).rawMsgs(true)); + // site master to non SM + a.send(b.getAddress(), new Data(REQ,"from A")); + assertNumMessages(1, a,b); + + // should bypass RELAY2 + a.send(new ObjectMessage(b.getAddress(), new Data(REQ,"from A")).setFlag(Message.Flag.NO_RELAY)); + assertNumMessages(1, a,b); + + // non-SM to SM + b.send(a.getAddress(), new Data(REQ,"from B")); + assertNumMessages(1, a,b); + + // bypassing RELAY2 + b.send(new ObjectMessage(a.getAddress(), new Data(REQ,"from B")).setFlag(Message.Flag.NO_RELAY)); + assertNumMessages(1, a,b); + + // SM to self + a.send(a.getAddress(), new Data(REQ,"from self")); + assertNumMessages(2, a); + + // bypasses RELAY2 + a.send(new ObjectMessage(a.getAddress(), new Data(REQ,"from self")).setFlag(Message.Flag.NO_RELAY)); + assertNumMessages(2, a); + + // non-SM to self + b.send(b.getAddress(), new Data(REQ,"from self")); + assertNumMessages(2, b); + + // bypasses RELAY2 + b.send(new ObjectMessage(b.getAddress(), new Data(REQ,"from self")).setFlag(Message.Flag.NO_RELAY)); + assertNumMessages(2, b); + + // non-SM to non-SM + b.send(_c.getAddress(), new Data(REQ,"from B")); + assertNumMessages(1, b,_c); + + // bypasses RELAY2 + b.send(new ObjectMessage(_c.getAddress(), new Data(REQ,"from B")).setFlag(Message.Flag.NO_RELAY)); + assertNumMessages(1, b,_c); + } + } + + /** Sends unicasts between members of different sites */ + public void testUnicasts() throws Exception { + createSymmetricNetwork(ch -> new UnicastResponseSender<>(ch).rawMsgs(true)); + a.send(c.getAddress(), new Data(REQ,"hello from A")); + assertNumMessages(1, a,c); + + // SM to SM + c.send(a.getAddress(), new Data(REQ,"hello from C")); + assertNumMessages(1,a,c); + + // non-SM to SM + b.send(c.getAddress(), new Data(REQ,"hello from B")); + assertNumMessages(1, b,c); + + // SM to non-SM + a.send(d.getAddress(), new Data(REQ,"hello from A")); + assertNumMessages(1, a,d); + } + + protected static void assertNumMessages(int expected, JChannel ... channels) throws TimeoutException { + try { + Util.waitUntil(5000,100, + () -> Stream.of(channels).map(ch -> getReceiver(ch).list()).allMatch(l -> l.size() == expected), + () -> msgs(channels)); + } + finally { + Stream.of(channels).forEach(ch -> getReceiver(ch).reset()); + } } protected static MyReceiver getReceiver(JChannel ch) { return (MyReceiver)ch.getReceiver(); } + protected static int receivedMessages(JChannel ch) { + return getReceiver(ch).list().size(); + } + protected static boolean expectedUnicasts(List msgs,int expected) { return expectedDests(msgs,m -> m.dest() != null,expected); } @@ -564,8 +592,14 @@ protected static boolean expectedDests(List msgs,Predicate p,i return msgs.stream().filter(p).count() == expected; } - protected static void printMessages(JChannel ch) { - System.out.printf("%s: %s\n", ch.address(), ((MyReceiver)ch.getReceiver()).list(Message::getObject)); + protected static void printMessages(JChannel ... channels) { + System.out.println(msgs(channels)); + } + + protected static String msgs(JChannel... channels) { + return Stream.of(channels) + .map(ch -> String.format("%s: %s",ch.address(),getReceiver(ch).list(Message::getObject))) + .collect(Collectors.joining("\n")); } @@ -583,11 +617,6 @@ protected static JChannel createNode(String site_name, String node_name, int num return ch; } - protected static boolean isSiteMaster(JChannel ch) { - RELAY2 r=ch.getProtocolStack().findProtocol(RELAY2.class); - return r != null && r.isSiteMaster(); - } - protected void createSymmetricNetwork(Function r) throws Exception { a=createNode(LON, "A", BRIDGE_CLUSTER, LON, NYC, SFO); b=createNode(LON, "B", BRIDGE_CLUSTER, LON, NYC, SFO); @@ -610,39 +639,34 @@ protected List allChannels() { return Arrays.asList(a,b,c,d,e,f); } - protected static class ResponseSender extends MyReceiver { - protected final JChannel ch; - public ResponseSender(JChannel ch) { - this.ch=ch; - } + protected class MyUphandler implements UpHandler { + protected final BlockingQueue received=new LinkedBlockingDeque<>(); + protected final AtomicInteger siteUnreachableEvents=new AtomicInteger(0); + + public BlockingQueue getReceived() {return received;} + public int getSiteUnreachableEvents() {return siteUnreachableEvents.get();} + + @Override public UpHandler setLocalAddress(Address a) {return this;} @Override - public void receive(Message msg) { - super.receive(msg); - if(msg.dest() == null || msg.dest() instanceof SiteMaster) { // send unicast response back to sender - Message rsp=new ObjectMessage(msg.src(),"rsp-" + ch.getAddress()); - try { - ch.send(rsp); - } - catch(Exception e) { - System.out.printf("%s: failed sending response: %s", ch.getAddress(), e); - } + public Object up(Event evt) { + if(evt.getType() == Event.SITE_UNREACHABLE) { + log.debug("Site %s is unreachable", (Object) evt.getArg()); + siteUnreachableEvents.incrementAndGet(); } + return null; + } + + @Override + public Object up(Message msg) { + log.debug("Received %s from %s\n", new String(msg.getArray(), StandardCharsets.UTF_8), msg.getSrc()); + received.add(msg); + return null; } } - protected static class SiteMasterPickerImpl implements SiteMasterPicker { - public SiteMasterPickerImpl() { - } - public Address pickSiteMaster(List
site_masters, Address original_sender) { - return site_masters.get(0); - } - public Route pickRoute(String site, List routes, Address original_sender) { - return routes.get(0); - } - } } diff --git a/tests/junit-functional/org/jgroups/tests/RelayTests.java b/tests/junit-functional/org/jgroups/tests/RelayTests.java index 0a8008ee4b..8230a01aaf 100644 --- a/tests/junit-functional/org/jgroups/tests/RelayTests.java +++ b/tests/junit-functional/org/jgroups/tests/RelayTests.java @@ -1,8 +1,6 @@ -package org.jgroups.tests; + package org.jgroups.tests; -import org.jgroups.Address; -import org.jgroups.JChannel; -import org.jgroups.View; +import org.jgroups.*; import org.jgroups.logging.Log; import org.jgroups.logging.LogFactory; import org.jgroups.protocols.LOCAL_PING; @@ -11,13 +9,17 @@ import org.jgroups.protocols.UNICAST3; import org.jgroups.protocols.pbcast.GMS; import org.jgroups.protocols.pbcast.NAKACK2; -import org.jgroups.protocols.relay.RELAY2; -import org.jgroups.protocols.relay.Route; -import org.jgroups.protocols.relay.RouteStatusListener; +import org.jgroups.protocols.relay.*; import org.jgroups.protocols.relay.config.RelayConfig; import org.jgroups.stack.Protocol; +import org.jgroups.util.Bits; +import org.jgroups.util.MyReceiver; +import org.jgroups.util.SizeStreamable; import org.jgroups.util.Util; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.ArrayList; @@ -99,6 +101,11 @@ protected static Route getRoute(JChannel ch, String site_name) { return relay.getRoute(site_name); } + protected static boolean isSiteMaster(JChannel ch) { + RELAY2 r=ch.getProtocolStack().findProtocol(RELAY2.class); + return r != null && r.isSiteMaster(); + } + /** Creates a singleton view for each channel listed and injects it */ protected static void injectSingletonPartitions(JChannel ... channels) { for(JChannel ch: channels) { @@ -163,4 +170,95 @@ public String toString() { } } + protected static class ResponseSender extends MyReceiver { + protected final JChannel ch; + + public ResponseSender(JChannel ch) { + this.ch=ch; + } + + @Override + public void receive(Message msg) { + super.receive(msg); + if(msg.dest() == null || msg.dest() instanceof SiteMaster) { // send unicast response back to sender + Message rsp=new ObjectMessage(msg.src(),"rsp-" + ch.getAddress()); + try { + ch.send(rsp); + } + catch(Exception e) { + System.out.printf("%s: failed sending response: %s", ch.getAddress(), e); + } + } + } + } + + protected static class UnicastResponseSender extends ResponseSender { + + public UnicastResponseSender(JChannel ch) { + super(ch); + } + + public void receive(Message msg) { + super.receive(msg); + Object obj=msg.getObject(); + Data data=(Data)obj; + if(data.type == Data.Type.REQ) { + Message rsp=new ObjectMessage(msg.src(),new Data(Data.Type.RSP,String.valueOf(ch.getAddress()))); + if(msg.isFlagSet(Message.Flag.NO_RELAY)) + rsp.setFlag(Message.Flag.NO_RELAY); + try { + ch.send(rsp); + } + catch(Exception e) { + System.out.printf("%s: failed sending response: %s",ch.getAddress(),e); + } + } + } + } + + protected static class SiteMasterPickerImpl implements SiteMasterPicker { + public SiteMasterPickerImpl() { + } + + public Address pickSiteMaster(List
site_masters, Address original_sender) { + return site_masters.get(0); + } + + public Route pickRoute(String site, List routes, Address original_sender) { + return routes.get(0); + } + } + + protected static class Data implements SizeStreamable { + enum Type {REQ,RSP} + protected Type type; + protected String payload; + + public Data() {} + public Data(Type t, String s) { + type=t; + payload=s; + } + + public Type type() {return type;} + public String payload() {return payload;} + + public int serializedSize() { + return Integer.BYTES + Bits.sizeUTF(payload) +1; + } + + public void writeTo(DataOutput out) throws IOException { + out.writeInt(type.ordinal()); + Bits.writeString(payload, out); + } + + public void readFrom(DataInput in) throws IOException, ClassNotFoundException { + this.type=Type.values()[in.readInt()]; + this.payload=Bits.readString(in); + } + + public String toString() { + return String.format("%s: %s", type, payload); + } + } }