diff --git a/src/org/jgroups/demos/RelayDemo.java b/src/org/jgroups/demos/RelayDemo.java
index 879745ab0d..d449097845 100644
--- a/src/org/jgroups/demos/RelayDemo.java
+++ b/src/org/jgroups/demos/RelayDemo.java
@@ -138,6 +138,14 @@ protected boolean process(String line) {
System.out.printf("configured sites: %s\n", relay.getSites());
return true;
}
+ if(line.startsWith("tp")) { // topo-print
+ System.out.printf("\n%s\n", relay.topo().print());
+ return true;
+ }
+ if(line.startsWith("tc")) { // topo-clean
+ relay.topo().removeAll(null);
+ return true;
+ }
if(line.startsWith("topo")) {
String sub=line.substring("topo".length()).trim();
String site=null;
diff --git a/src/org/jgroups/protocols/relay/RELAY2.java b/src/org/jgroups/protocols/relay/RELAY2.java
index 0e801842e0..09be38fbc8 100644
--- a/src/org/jgroups/protocols/relay/RELAY2.java
+++ b/src/org/jgroups/protocols/relay/RELAY2.java
@@ -1,6 +1,7 @@
package org.jgroups.protocols.relay;
import org.jgroups.*;
+import org.jgroups.Message.Flag;
import org.jgroups.annotations.*;
import org.jgroups.conf.AttributeType;
import org.jgroups.conf.ConfiguratorFactory;
@@ -24,8 +25,7 @@
import static org.jgroups.protocols.relay.Relay2Header.*;
-// todo: instead of sending one TOPO-RSP for *each* member, send one TOPO-RSP for *all* members -> 1 instead of N msgs
-// todo: check if copy is needed in reoute(), passUp() and deliver(); possibly pass a boolean as parameter (copy or not)
+// todo: check if copy is needed in route(), passUp() and deliver(); possibly pass a boolean as parameter (copy or not)
/**
* Provides relaying of messages between autonomous sites.
@@ -40,7 +40,7 @@
@MBean(description="RELAY2 protocol")
public class RELAY2 extends Protocol {
// reserved flags
- public static final short can_become_site_master_flag = 1 << 1;
+ public static final short can_become_site_master_flag = 1 << 1;
/* ------------------------------------------ Properties ---------------------------------------------- */
@Property(description="Name of the site; must be defined in the configuration",writable=false)
@@ -79,14 +79,10 @@ public class RELAY2 extends Protocol {
@Property(description="Fully qualified name of a class implementing SiteMasterPicker")
protected String site_master_picker_impl;
-
@Property(description="Time during which identical errors about no route to host will be suppressed. " +
"0 disables this (every error will be logged).",type=AttributeType.TIME)
protected long suppress_time_no_route_errors=60000;
-
- /* --------------------------------------------- Fields ------------------------------------------------ */
-
/** A map containing site names (e.g. "LON") as keys and SiteConfigs as values */
protected final Map sites=new HashMap<>();
@@ -149,7 +145,7 @@ public class RELAY2 extends Protocol {
protected final LongAdder forward_to_local_mbr_time=new LongAdder();
@Component(description="Maintains a cache of sites and members",name="topo")
- protected Topology topo=new Topology(this);
+ protected final Topology topo=new Topology(this);
/** Log to suppress identical errors for messages to non-existing sites ('no route to site X') */
protected SuppressLog suppress_log_no_route;
@@ -198,7 +194,7 @@ public class RELAY2 extends Protocol {
public RELAY2 broadcastRouteNotifications(boolean b) {this.broadcast_route_notifications=b; return this;}
public boolean canForwardLocalCluster() {return false;}
- public RELAY2 canForwardLocalCluster(boolean c) {return this;}
+ public RELAY2 canForwardLocalCluster(boolean ignored){return this;}
public long getTopoWaitTime() {return topo_wait_time;}
public RELAY2 setTopoWaitTime(long t) {this.topo_wait_time=t; return this;}
@@ -211,7 +207,7 @@ public class RELAY2 extends Protocol {
public RELAY2 setSiteMasterListener(Consumer l) {site_master_listener=l; return this;}
@ManagedAttribute(description="Number of messages forwarded to the local SiteMaster")
- public long getNumForwardedToSiteMaster() {return forward_to_site_master.sum();}
+ public long getNumForwardedToSiteMaster() {return forward_to_site_master.sum();}
@ManagedAttribute(description="The total time (in ms) spent forwarding messages to the local SiteMaster"
,type=AttributeType.TIME)
@@ -221,8 +217,6 @@ public class RELAY2 extends Protocol {
public long getAvgMsgsForwardingToSM() {return getTimeForwardingToSM() > 0?
(long)(getNumForwardedToSiteMaster() / (getTimeForwardingToSM()/1000.0)) : 0;}
-
-
@ManagedAttribute(description="Number of messages sent by this SiteMaster to a remote SiteMaster")
public long getNumRelayed() {return relayed.sum();}
@@ -353,7 +347,7 @@ else if(site_masters_ratio > 1) {
public void stop() {
super.stop();
is_site_master=false;
- log.trace(local_addr + ": ceased to be site master; closing bridges");
+ log.trace("%s: ceased to be site master; closing bridges", local_addr);
if(relayer != null)
relayer.stop();
}
@@ -433,57 +427,11 @@ public Object down(Event evt) {
return down_prot.down(evt);
}
-
public Object down(Message msg) {
msg.src(local_addr);
return process(true, msg);
-
- /*Address dest=msg.getDest();
- SiteAddress target=(SiteAddress)dest;
- Address src=msg.getSrc();
- SiteAddress sender=src instanceof SiteMaster? new SiteMaster(((SiteMaster)src).getSite())
- : new SiteUUID((UUID)local_addr, NameCache.get(local_addr), site);
- if(local_addr instanceof ExtendedUUID)
- ((ExtendedUUID)sender).addContents((ExtendedUUID)local_addr);
-
- if(target instanceof SiteMaster) {
- if(!is_site_master)
- sendToLocalSiteMaster(sender, msg);
- else {
- if(target.getSite() == null) { // send to *all* site masters
- msg.setSrc(local_addr);
- sendToBridges(msg);
- }
- if(local_addr.equals(target) || (target instanceof SiteMaster && is_site_master)) {
- // we cannot simply pass msg down, as the transport doesn't know how to send a message to a (e.g.) SiteMaster
- deliver(local_addr, msg);
- }
- }
- return null;
- }
-
- // target is in the same site; we can deliver the message in our local cluster
- if(site.equals(target.getSite()) || target.getSite() == null) {
- // we're the target or we're the site master and need to forward the message to a member of the local cluster
- if(local_addr.equals(target) || (target instanceof SiteMaster && is_site_master)) {
- // we cannot simply pass msg down, as the transport doesn't know how to send a message to a (e.g.) SiteMaster
- deliver(local_addr, msg);
- }
- else // forward to another member of the local cluster
- deliverLocally(target, sender, msg);
- return null;
- }
-
- // forward to the site master unless we're the site master (then route the message directly)
- if(!is_site_master)
- sendToLocalSiteMaster(sender, msg);
- else
- route(target, sender, msg);
- return null; */
}
-
-
public Object up(Event evt) {
if(evt.getType() == Event.VIEW_CHANGE)
handleView(evt.getArg());
@@ -493,29 +441,6 @@ public Object up(Event evt) {
public Object up(Message msg) {
Message copy=msg;
Relay2Header hdr=msg.getHeader(id);
- Address dest=msg.getDest(),
- sender=hdr != null && hdr.original_sender != null? hdr.original_sender : msg.src();
-
- // forward a multicast message to all bridges except myself, then pass up
- /*if(dest == null && is_site_master && !msg.isFlagSet(Message.Flag.NO_RELAY))
- sendToBridges(msg);
-
- if(hdr == null) {
- TopoHeader topo_hdr=msg.getHeader(TOPO_ID);
- if(topo_hdr != null) {
- handleTopo(topo_hdr, sender, msg);
- return null;
- }
- deliver(dest, sender, msg); // fixes https://issues.redhat.com/browse/JGRP-2710
- }
- else {
- if(handleAdminMessage(hdr))
- return null;
- if(dest != null)
- handleMessage(hdr, msg);
- else
- deliver(dest, sender, msg);
- }*/
if(hdr != null) {
if(hdr.getType() == SITE_UNREACHABLE) {
triggerSiteUnreachableEvent((SiteAddress)hdr.final_dest);
@@ -523,7 +448,6 @@ public Object up(Message msg) {
}
//todo: check if copy is needed!
copy=copy(msg).dest(hdr.final_dest).src(hdr.original_sender).putHeader(id, hdr);
- // msg.dest(hdr.final_dest).src(hdr.original_sender); // message can be changed as it is delivered (no xmits)
}
return process(false, copy);
}
@@ -533,9 +457,6 @@ public void up(MessageBatch batch) {
for(Iterator it=batch.iterator(); it.hasNext();) {
Message msg=it.next(), copy=msg;
Relay2Header hdr=msg.getHeader(id);
- Address dest=msg.getDest(),
- sender=hdr != null && hdr.original_sender != null? hdr.original_sender : batch.sender();
-
it.remove();
if(hdr != null) {
if(hdr.getType() == SITE_UNREACHABLE) {
@@ -549,44 +470,8 @@ public void up(MessageBatch batch) {
continue;
}
copy=copy(msg).dest(hdr.final_dest).src(hdr.original_sender).putHeader(id, hdr);
- // msg.dest(hdr.final_dest).src(hdr.original_sender); // message can be changed as it is delivered (no xmits)
}
process(false, copy);
-
- // forward a multicast message to all bridges except myself, then pass up
- /*if((dest == null || dest instanceof SiteMaster && ((SiteMaster)dest).getSite() == null)
- && is_site_master && !msg.isFlagSet(Message.Flag.NO_RELAY))
- sendToBridges(msg);*/
-
- /* if(dest == null)
- route(msg, null);
- if(hdr == null) {
- TopoHeader topo_hdr=msg.getHeader(TOPO_ID);
- if(topo_hdr != null) {
- handleTopo(topo_hdr, sender, msg);
- it.remove();
- }
- }
- else {
- it.remove(); // message is consumed
- if(handleAdminMessage(hdr))
- continue;
- if(dest != null) {
- if(hdr.getType() == SITE_UNREACHABLE) {
- SiteAddress site_addr=(SiteAddress)hdr.final_dest;
- String site_name=site_addr.getSite();
- if(unreachable_sites == null)
- unreachable_sites=new ArrayList<>();
- boolean contains=unreachable_sites.stream().anyMatch(sa -> sa.getSite().equals(site_name));
- if(!contains)
- unreachable_sites.add(site_addr);
- }
- else
- handleMessage(hdr, msg);
- }
- else
- deliver(null, hdr.original_sender, msg); //todo: replace in batch rather than pass up each msg
- }*/
}
if(unreachable_sites != null) {
for(SiteAddress sa: unreachable_sites)
@@ -660,20 +545,10 @@ public void handleView(View view) {
}
- /** Called to handle a message received by the relayer */
+ /** Called to handle a message received from a different site (via a bridge channel) */
protected void handleRelayMessage(Relay2Header hdr, Message msg) {
- //if(hdr.final_dest != null)
- // handleMessage(hdr, msg);
- // else {
- // Message copy=copy(msg).setDest(null).setSrc(null).putHeader(id, hdr);
- // down_prot.down(copy); // multicast locally
- // }
-
- Message copy=copy(msg).dest(hdr.final_dest).src(hdr.original_sender)
- .putHeader(id, hdr);
-
+ Message copy=copy(msg).dest(hdr.final_dest).src(hdr.original_sender).putHeader(id, hdr);
// todo: check if copy is needed!
- // msg.dest(hdr.final_dest).src(hdr.original_sender); // message can be changed as it is delivered (no xmits)
process(true, copy);
}
@@ -709,113 +584,17 @@ protected boolean handleAdminMessage(Relay2Header hdr, Message msg) {
return true;
case TOPO_RSP:
Members mbrs=msg.getObject();
- topo.handleResponse(mbrs);
+ if(mbrs != null)
+ topo.handleResponse(mbrs);
return true;
}
return false;
}
- /** Called to handle a message received by the transport */
- protected void handleMessage(Relay2Header hdr, Message msg) {
- switch(hdr.type) {
- case DATA:
- route((SiteAddress)hdr.final_dest, (SiteAddress)hdr.original_sender, msg);
- break;
- case SITE_UNREACHABLE:
- String unreachable_site=hdr.sites != null && !hdr.sites.isEmpty()? hdr.sites.iterator().next() : null;
- if(unreachable_site != null)
- triggerSiteUnreachableEvent(new SiteMaster(unreachable_site));
- break;
- default:
- log.error("type " + hdr.type + " unknown");
- break;
- }
- }
-
-
- /**
- * Routes the message to the target destination, used by a site master (coordinator)
- *
- * @param dest the destination site address
- * @param sender the address of the sender
- * @param msg The message
- */
- protected void route(SiteAddress dest, SiteAddress sender, Message msg) {
- String target_site=dest.getSite();
- if(site.equals(target_site) || target_site == null) {
- if(local_addr.equals(dest) || is_site_master && dest instanceof SiteMaster)
- deliver(dest, sender, msg);
- else
- deliverLocally(dest, sender, msg); // send to member in same local site
- return;
- }
- Relayer tmp=relayer;
- if(tmp == null) {
- log.warn(local_addr + ": not site master; dropping message");
- return;
- }
-
- Route route=tmp.getRoute(target_site, sender);
- if(route == null)
- route=tmp.getForwardingRouteMatching(target_site, sender);
- if(route == null) {
- suppress_log_no_route.log(SuppressLog.Level.error, target_site, suppress_time_no_route_errors, sender, target_site);
- sendSiteUnreachableTo(msg.getSrc(), target_site);
- }
- else
- route.send(dest,sender,msg);
- }
-
-
- /** Sends the message to all sites in the routing table, minus the local site */
- protected void sendToBridges(final Message msg) {
- Relayer tmp=relayer;
- Map> routes=tmp != null? tmp.routes : null;
- if(routes == null || routes.isEmpty())
- return;
- Address src=msg.getSrc();
- Relay2Header hdr=msg.getHeader(this.id);
- Address original_sender=hdr != null && hdr.original_sender != null? hdr.getOriginalSender() :
- new SiteUUID((UUID)src, NameCache.get(src), site);
- if(src instanceof ExtendedUUID)
- ((ExtendedUUID)original_sender).addContents((ExtendedUUID)src);
-
- Set visited_sites=new HashSet<>(routes.keySet()), // to be added to the header
- sites_to_visit=new HashSet<>(routes.keySet()); // sites to which to forward the message
-
- if(this.site != null) {
- visited_sites.add(this.site);
- sites_to_visit.remove(this.site); // don't send to the local site
- }
-
- if(hdr != null && hdr.hasVisitedSites()) {
- visited_sites.addAll(hdr.getVisitedSites());
- sites_to_visit.removeAll(hdr.getVisitedSites()); // avoid cycles (https://issues.redhat.com/browse/JGRP-1519)
- }
-
- for(String dest_site: sites_to_visit) {
- List val=routes.get(dest_site);
- if(val == null)
- continue;
- // try sending over all routes; break after the first successful send
- for(Route route: val) {
- if(log.isTraceEnabled())
- log.trace(local_addr + ": relaying multicast message from " + original_sender + " via route " + route);
- try {
- route.send(msg.dest(), original_sender, msg, visited_sites);
- break;
- }
- catch(Exception ex) {
- log.error(local_addr + ": failed relaying message from " + original_sender + " via route " + route, ex);
- }
- }
- }
- }
-
- // todo: use ComptableFutures and possibly thenRunAsync() to parallelize (e.g.) routing and local delivery
+ // todo: use CompletableFutures and possibly thenRunAsync() to parallelize (e.g.) routing and local delivery
protected Object routeThen(Message msg, List sites, Supplier