diff --git a/src/org/jgroups/demos/RelayDemo.java b/src/org/jgroups/demos/RelayDemo.java index 879745ab0d..d449097845 100644 --- a/src/org/jgroups/demos/RelayDemo.java +++ b/src/org/jgroups/demos/RelayDemo.java @@ -138,6 +138,14 @@ protected boolean process(String line) { System.out.printf("configured sites: %s\n", relay.getSites()); return true; } + if(line.startsWith("tp")) { // topo-print + System.out.printf("\n%s\n", relay.topo().print()); + return true; + } + if(line.startsWith("tc")) { // topo-clean + relay.topo().removeAll(null); + return true; + } if(line.startsWith("topo")) { String sub=line.substring("topo".length()).trim(); String site=null; diff --git a/src/org/jgroups/protocols/relay/RELAY2.java b/src/org/jgroups/protocols/relay/RELAY2.java index 0e801842e0..bb906434f2 100644 --- a/src/org/jgroups/protocols/relay/RELAY2.java +++ b/src/org/jgroups/protocols/relay/RELAY2.java @@ -1,11 +1,13 @@ package org.jgroups.protocols.relay; import org.jgroups.*; +import org.jgroups.Message.Flag; import org.jgroups.annotations.*; import org.jgroups.conf.AttributeType; import org.jgroups.conf.ConfiguratorFactory; import org.jgroups.conf.XmlNode; import org.jgroups.protocols.relay.SiteAddress.Type; +import org.jgroups.protocols.relay.SiteStatus.Status; import org.jgroups.protocols.relay.Topology.MemberInfo; import org.jgroups.protocols.relay.Topology.Members; import org.jgroups.protocols.relay.config.RelayConfig; @@ -24,8 +26,7 @@ import static org.jgroups.protocols.relay.Relay2Header.*; -// todo: instead of sending one TOPO-RSP for *each* member, send one TOPO-RSP for *all* members -> 1 instead of N msgs -// todo: check if copy is needed in reoute(), passUp() and deliver(); possibly pass a boolean as parameter (copy or not) +// todo: check if copy is needed in route(), passUp() and deliver(); possibly pass a boolean as parameter (copy or not) /** * Provides relaying of messages between autonomous sites.
@@ -40,7 +41,7 @@ @MBean(description="RELAY2 protocol") public class RELAY2 extends Protocol { // reserved flags - public static final short can_become_site_master_flag = 1 << 1; + public static final short can_become_site_master_flag = 1 << 1; /* ------------------------------------------ Properties ---------------------------------------------- */ @Property(description="Name of the site; must be defined in the configuration",writable=false) @@ -79,14 +80,10 @@ public class RELAY2 extends Protocol { @Property(description="Fully qualified name of a class implementing SiteMasterPicker") protected String site_master_picker_impl; - @Property(description="Time during which identical errors about no route to host will be suppressed. " + "0 disables this (every error will be logged).",type=AttributeType.TIME) protected long suppress_time_no_route_errors=60000; - - /* --------------------------------------------- Fields ------------------------------------------------ */ - /** A map containing site names (e.g. "LON") as keys and SiteConfigs as values */ protected final Map sites=new HashMap<>(); @@ -130,7 +127,7 @@ public class RELAY2 extends Protocol { // maintained by site masters (relayer != null) // todo: replace with topo once JGRP-2706 is in place @ManagedAttribute(description="A cache maintaining a list of sites that are up") - protected final Set site_cache=new HashSet<>(); + protected final SiteStatus site_status=new SiteStatus(); /** Number of messages forwarded to the local SiteMaster */ protected final LongAdder forward_to_site_master=new LongAdder(); @@ -149,13 +146,14 @@ public class RELAY2 extends Protocol { protected final LongAdder forward_to_local_mbr_time=new LongAdder(); @Component(description="Maintains a cache of sites and members",name="topo") - protected Topology topo=new Topology(this); + protected final Topology topo=new Topology(this); /** Log to suppress identical errors for messages to non-existing sites ('no route to site X') */ protected SuppressLog suppress_log_no_route; // Fluent configuration public RELAY2 site(String site_name) {site=site_name; return this;} + public SiteStatus siteStatus() {return site_status;} public RELAY2 config(String cfg) {config=cfg; return this;} public RELAY2 canBecomeSiteMaster(boolean flag) {can_become_site_master=flag; return this;} @Deprecated @@ -198,7 +196,7 @@ public class RELAY2 extends Protocol { public RELAY2 broadcastRouteNotifications(boolean b) {this.broadcast_route_notifications=b; return this;} public boolean canForwardLocalCluster() {return false;} - public RELAY2 canForwardLocalCluster(boolean c) {return this;} + public RELAY2 canForwardLocalCluster(boolean ignored){return this;} public long getTopoWaitTime() {return topo_wait_time;} public RELAY2 setTopoWaitTime(long t) {this.topo_wait_time=t; return this;} @@ -211,7 +209,7 @@ public class RELAY2 extends Protocol { public RELAY2 setSiteMasterListener(Consumer l) {site_master_listener=l; return this;} @ManagedAttribute(description="Number of messages forwarded to the local SiteMaster") - public long getNumForwardedToSiteMaster() {return forward_to_site_master.sum();} + public long getNumForwardedToSiteMaster() {return forward_to_site_master.sum();} @ManagedAttribute(description="The total time (in ms) spent forwarding messages to the local SiteMaster" ,type=AttributeType.TIME) @@ -221,8 +219,6 @@ public class RELAY2 extends Protocol { public long getAvgMsgsForwardingToSM() {return getTimeForwardingToSM() > 0? (long)(getNumForwardedToSiteMaster() / (getTimeForwardingToSM()/1000.0)) : 0;} - - @ManagedAttribute(description="Number of messages sent by this SiteMaster to a remote SiteMaster") public long getNumRelayed() {return relayed.sum();} @@ -353,7 +349,7 @@ else if(site_masters_ratio > 1) { public void stop() { super.stop(); is_site_master=false; - log.trace(local_addr + ": ceased to be site master; closing bridges"); + log.trace("%s: ceased to be site master; closing bridges", local_addr); if(relayer != null) relayer.stop(); } @@ -433,57 +429,11 @@ public Object down(Event evt) { return down_prot.down(evt); } - public Object down(Message msg) { msg.src(local_addr); return process(true, msg); - - /*Address dest=msg.getDest(); - SiteAddress target=(SiteAddress)dest; - Address src=msg.getSrc(); - SiteAddress sender=src instanceof SiteMaster? new SiteMaster(((SiteMaster)src).getSite()) - : new SiteUUID((UUID)local_addr, NameCache.get(local_addr), site); - if(local_addr instanceof ExtendedUUID) - ((ExtendedUUID)sender).addContents((ExtendedUUID)local_addr); - - if(target instanceof SiteMaster) { - if(!is_site_master) - sendToLocalSiteMaster(sender, msg); - else { - if(target.getSite() == null) { // send to *all* site masters - msg.setSrc(local_addr); - sendToBridges(msg); - } - if(local_addr.equals(target) || (target instanceof SiteMaster && is_site_master)) { - // we cannot simply pass msg down, as the transport doesn't know how to send a message to a (e.g.) SiteMaster - deliver(local_addr, msg); - } - } - return null; - } - - // target is in the same site; we can deliver the message in our local cluster - if(site.equals(target.getSite()) || target.getSite() == null) { - // we're the target or we're the site master and need to forward the message to a member of the local cluster - if(local_addr.equals(target) || (target instanceof SiteMaster && is_site_master)) { - // we cannot simply pass msg down, as the transport doesn't know how to send a message to a (e.g.) SiteMaster - deliver(local_addr, msg); - } - else // forward to another member of the local cluster - deliverLocally(target, sender, msg); - return null; - } - - // forward to the site master unless we're the site master (then route the message directly) - if(!is_site_master) - sendToLocalSiteMaster(sender, msg); - else - route(target, sender, msg); - return null; */ } - - public Object up(Event evt) { if(evt.getType() == Event.VIEW_CHANGE) handleView(evt.getArg()); @@ -493,29 +443,6 @@ public Object up(Event evt) { public Object up(Message msg) { Message copy=msg; Relay2Header hdr=msg.getHeader(id); - Address dest=msg.getDest(), - sender=hdr != null && hdr.original_sender != null? hdr.original_sender : msg.src(); - - // forward a multicast message to all bridges except myself, then pass up - /*if(dest == null && is_site_master && !msg.isFlagSet(Message.Flag.NO_RELAY)) - sendToBridges(msg); - - if(hdr == null) { - TopoHeader topo_hdr=msg.getHeader(TOPO_ID); - if(topo_hdr != null) { - handleTopo(topo_hdr, sender, msg); - return null; - } - deliver(dest, sender, msg); // fixes https://issues.redhat.com/browse/JGRP-2710 - } - else { - if(handleAdminMessage(hdr)) - return null; - if(dest != null) - handleMessage(hdr, msg); - else - deliver(dest, sender, msg); - }*/ if(hdr != null) { if(hdr.getType() == SITE_UNREACHABLE) { triggerSiteUnreachableEvent((SiteAddress)hdr.final_dest); @@ -523,7 +450,6 @@ public Object up(Message msg) { } //todo: check if copy is needed! copy=copy(msg).dest(hdr.final_dest).src(hdr.original_sender).putHeader(id, hdr); - // msg.dest(hdr.final_dest).src(hdr.original_sender); // message can be changed as it is delivered (no xmits) } return process(false, copy); } @@ -533,9 +459,6 @@ public void up(MessageBatch batch) { for(Iterator it=batch.iterator(); it.hasNext();) { Message msg=it.next(), copy=msg; Relay2Header hdr=msg.getHeader(id); - Address dest=msg.getDest(), - sender=hdr != null && hdr.original_sender != null? hdr.original_sender : batch.sender(); - it.remove(); if(hdr != null) { if(hdr.getType() == SITE_UNREACHABLE) { @@ -549,44 +472,8 @@ public void up(MessageBatch batch) { continue; } copy=copy(msg).dest(hdr.final_dest).src(hdr.original_sender).putHeader(id, hdr); - // msg.dest(hdr.final_dest).src(hdr.original_sender); // message can be changed as it is delivered (no xmits) } process(false, copy); - - // forward a multicast message to all bridges except myself, then pass up - /*if((dest == null || dest instanceof SiteMaster && ((SiteMaster)dest).getSite() == null) - && is_site_master && !msg.isFlagSet(Message.Flag.NO_RELAY)) - sendToBridges(msg);*/ - - /* if(dest == null) - route(msg, null); - if(hdr == null) { - TopoHeader topo_hdr=msg.getHeader(TOPO_ID); - if(topo_hdr != null) { - handleTopo(topo_hdr, sender, msg); - it.remove(); - } - } - else { - it.remove(); // message is consumed - if(handleAdminMessage(hdr)) - continue; - if(dest != null) { - if(hdr.getType() == SITE_UNREACHABLE) { - SiteAddress site_addr=(SiteAddress)hdr.final_dest; - String site_name=site_addr.getSite(); - if(unreachable_sites == null) - unreachable_sites=new ArrayList<>(); - boolean contains=unreachable_sites.stream().anyMatch(sa -> sa.getSite().equals(site_name)); - if(!contains) - unreachable_sites.add(site_addr); - } - else - handleMessage(hdr, msg); - } - else - deliver(null, hdr.original_sender, msg); //todo: replace in batch rather than pass up each msg - }*/ } if(unreachable_sites != null) { for(SiteAddress sa: unreachable_sites) @@ -660,20 +547,10 @@ public void handleView(View view) { } - /** Called to handle a message received by the relayer */ + /** Called to handle a message received from a different site (via a bridge channel) */ protected void handleRelayMessage(Relay2Header hdr, Message msg) { - //if(hdr.final_dest != null) - // handleMessage(hdr, msg); - // else { - // Message copy=copy(msg).setDest(null).setSrc(null).putHeader(id, hdr); - // down_prot.down(copy); // multicast locally - // } - - Message copy=copy(msg).dest(hdr.final_dest).src(hdr.original_sender) - .putHeader(id, hdr); - + Message copy=copy(msg).dest(hdr.final_dest).src(hdr.original_sender).putHeader(id, hdr); // todo: check if copy is needed! - // msg.dest(hdr.final_dest).src(hdr.original_sender); // message can be changed as it is delivered (no xmits) process(true, copy); } @@ -682,25 +559,21 @@ protected boolean handleAdminMessage(Relay2Header hdr, Message msg) { switch(hdr.type) { case SITES_UP: case SITES_DOWN: - Set tmp_sites=hdr.getSites(); + Set tmp_sites=new HashSet<>(); + if(hdr.hasSites()) + tmp_sites.addAll(hdr.getSites()); tmp_sites.remove(this.site); - if(tmp_sites != null) { - if(hdr.type == SITES_UP) { - tmp_sites.removeAll(site_cache); - site_cache.addAll(tmp_sites); - } - else { // SITES_DOWN + if(tmp_sites != null && !tmp_sites.isEmpty()) { + Status status=hdr.type == SITES_UP? Status.up : Status.down; + Set tmp=site_status.add(tmp_sites, status); + if(status == Status.down) topo.removeAll(tmp_sites); - tmp_sites.retainAll(site_cache); - site_cache.removeAll(tmp_sites); - } - - if(route_status_listener != null && !tmp_sites.isEmpty()) { - String[] tmp=tmp_sites.toArray(new String[]{}); + if(route_status_listener != null && !tmp.isEmpty()) { + String[] t=tmp.toArray(new String[]{}); if(hdr.type == SITES_UP) - route_status_listener.sitesUp(tmp); + route_status_listener.sitesUp(t); else - route_status_listener.sitesDown(tmp); + route_status_listener.sitesDown(t); } } return true; @@ -709,113 +582,17 @@ protected boolean handleAdminMessage(Relay2Header hdr, Message msg) { return true; case TOPO_RSP: Members mbrs=msg.getObject(); - topo.handleResponse(mbrs); + if(mbrs != null) + topo.handleResponse(mbrs); return true; } return false; } - /** Called to handle a message received by the transport */ - protected void handleMessage(Relay2Header hdr, Message msg) { - switch(hdr.type) { - case DATA: - route((SiteAddress)hdr.final_dest, (SiteAddress)hdr.original_sender, msg); - break; - case SITE_UNREACHABLE: - String unreachable_site=hdr.sites != null && !hdr.sites.isEmpty()? hdr.sites.iterator().next() : null; - if(unreachable_site != null) - triggerSiteUnreachableEvent(new SiteMaster(unreachable_site)); - break; - default: - log.error("type " + hdr.type + " unknown"); - break; - } - } - - - /** - * Routes the message to the target destination, used by a site master (coordinator) - * - * @param dest the destination site address - * @param sender the address of the sender - * @param msg The message - */ - protected void route(SiteAddress dest, SiteAddress sender, Message msg) { - String target_site=dest.getSite(); - if(site.equals(target_site) || target_site == null) { - if(local_addr.equals(dest) || is_site_master && dest instanceof SiteMaster) - deliver(dest, sender, msg); - else - deliverLocally(dest, sender, msg); // send to member in same local site - return; - } - Relayer tmp=relayer; - if(tmp == null) { - log.warn(local_addr + ": not site master; dropping message"); - return; - } - - Route route=tmp.getRoute(target_site, sender); - if(route == null) - route=tmp.getForwardingRouteMatching(target_site, sender); - if(route == null) { - suppress_log_no_route.log(SuppressLog.Level.error, target_site, suppress_time_no_route_errors, sender, target_site); - sendSiteUnreachableTo(msg.getSrc(), target_site); - } - else - route.send(dest,sender,msg); - } - - - /** Sends the message to all sites in the routing table, minus the local site */ - protected void sendToBridges(final Message msg) { - Relayer tmp=relayer; - Map> routes=tmp != null? tmp.routes : null; - if(routes == null || routes.isEmpty()) - return; - Address src=msg.getSrc(); - Relay2Header hdr=msg.getHeader(this.id); - Address original_sender=hdr != null && hdr.original_sender != null? hdr.getOriginalSender() : - new SiteUUID((UUID)src, NameCache.get(src), site); - if(src instanceof ExtendedUUID) - ((ExtendedUUID)original_sender).addContents((ExtendedUUID)src); - - Set visited_sites=new HashSet<>(routes.keySet()), // to be added to the header - sites_to_visit=new HashSet<>(routes.keySet()); // sites to which to forward the message - - if(this.site != null) { - visited_sites.add(this.site); - sites_to_visit.remove(this.site); // don't send to the local site - } - - if(hdr != null && hdr.hasVisitedSites()) { - visited_sites.addAll(hdr.getVisitedSites()); - sites_to_visit.removeAll(hdr.getVisitedSites()); // avoid cycles (https://issues.redhat.com/browse/JGRP-1519) - } - - for(String dest_site: sites_to_visit) { - List val=routes.get(dest_site); - if(val == null) - continue; - // try sending over all routes; break after the first successful send - for(Route route: val) { - if(log.isTraceEnabled()) - log.trace(local_addr + ": relaying multicast message from " + original_sender + " via route " + route); - try { - route.send(msg.dest(), original_sender, msg, visited_sites); - break; - } - catch(Exception ex) { - log.error(local_addr + ": failed relaying message from " + original_sender + " via route " + route, ex); - } - } - } - } - - // todo: use ComptableFutures and possibly thenRunAsync() to parallelize (e.g.) routing and local delivery + // todo: use CompletableFutures and possibly thenRunAsync() to parallelize (e.g.) routing and local delivery protected Object routeThen(Message msg, List sites, Supplier action) { - if(!msg.isFlagSet(Message.Flag.NO_RELAY)) + if(!msg.isFlagSet(Flag.NO_RELAY)) route(msg, sites); return action != null? action.get() : null; } @@ -880,8 +657,8 @@ protected Object process(boolean down, Message msg) { /** * Sends a message to the given sites, or all sites (excluding the local site) * @param msg The message to be sent - * @param sites The sites to send the message to. If null, msg will be sent to all sites listed in the routing table, - * excepting the local site + * @param sites The sites to send the message to. If null, msg will be sent to all sites listed in the + * routing table, excepting the local site */ protected Object route(Message msg, Collection sites) { // boolean skip_null_routes=sites != null; @@ -913,8 +690,6 @@ protected Object route(Message msg, Collection sites) { for(String s: sites) { Route route=r.getRoute(s, sender); if(route == null) { - //if(skip_null_routes) - // continue; route=r.getForwardingRouteMatching(s, sender); } if(route == null) { @@ -930,7 +705,7 @@ protected Object route(Message msg, Collection sites) { /** * Sends the message to a local destination. * @param next_dest The destination. If null, the message will be delivered to all members of the local cluster. In - * this case, flag {@link org.jgroups.Message.Flag#NO_RELAY} will be set, so that the resulting + * this case, flag {@link Flag#NO_RELAY} will be set, so that the resulting * multicast is not forwarded to other sites. * @param msg The message to deliver */ @@ -940,14 +715,13 @@ protected Object deliver(Address next_dest, Message msg, boolean dont_relay) { if(log.isTraceEnabled()) log.trace(local_addr + ": forwarding message to final destination " + final_dest + " to " + next_dest); - Relay2Header hdr=msg.getHeader(this.id); - if(hdr != null) - hdr.setOriginalSender(original_sender).setFinalDestination(final_dest); - else - hdr=new Relay2Header(DATA, final_dest, original_sender); + Relay2Header tmp=msg.getHeader(this.id); + // todo: check if copy is needed here + Relay2Header hdr=tmp != null? tmp.copy().setOriginalSender(original_sender).setFinalDestination(final_dest) + : new Relay2Header(DATA, final_dest, original_sender); Message copy=copy(msg).setDest(next_dest).setSrc(null).putHeader(id, hdr); if(dont_relay) - copy.setFlag(Message.Flag.NO_RELAY); + copy.setFlag(Flag.NO_RELAY); return down_prot.down(copy); } @@ -991,13 +765,6 @@ protected Address checkLocalAddress(Address dest) { return dest; } - protected boolean sameSite(Address addr) { - if(addr == null) - return true; - String dest_site=((SiteAddress)addr).getSite(); - return dest_site == null || this.site.equals(dest_site); - } - protected boolean sameSite(SiteAddress addr) { if(addr == null) return true; @@ -1019,57 +786,17 @@ protected void sendSiteUnreachableTo(Address src, String target_site) { return; } // send message back to the src node. - Message msg=new EmptyMessage(src).setFlag(Message.Flag.OOB) + Message msg=new EmptyMessage(src).setFlag(Flag.OOB) .putHeader(id, new Relay2Header(SITE_UNREACHABLE).addToSites(target_site)); down(msg); } - - protected void deliverLocally(SiteAddress dest, SiteAddress sender, Message msg) { - Address local_dest; - if(dest instanceof SiteUUID) { - if(dest instanceof SiteMaster) { - local_dest=pickSiteMaster(sender); - if(local_dest == null) - throw new IllegalStateException("site master was null"); - } - else { - SiteUUID tmp=(SiteUUID)dest; - local_dest=new UUID(tmp.getMostSignificantBits(), tmp.getLeastSignificantBits()); - } - } - else - local_dest=dest; - - if(log.isTraceEnabled()) - log.trace(local_addr + ": delivering message to " + dest + " in local cluster"); - long start=stats? System.nanoTime() : 0; - deliver(local_dest, msg, false); - if(stats) { - forward_to_local_mbr_time.add(System.nanoTime() - start); - forward_to_local_mbr.increment(); - } - } - - - protected void deliver(Address dest, Address sender, final Message msg) { - try { - Message copy=copy(msg).setDest(dest).setSrc(sender); - if(log.isTraceEnabled()) - log.trace(local_addr + ": delivering message from " + sender); - up_prot.up(copy); - } - catch(Exception e) { - log.error(Util.getMessage("FailedDeliveringMessage"), e); - } - } - protected void sitesChange(boolean down, Set sites) { if(!broadcast_route_notifications || sites == null || sites.isEmpty()) return; Relay2Header hdr=new Relay2Header(down? SITES_DOWN : SITES_UP, null, null) .addToSites(sites); - down_prot.down(new EmptyMessage(null).putHeader(id, hdr)); // .setFlag(Message.Flag.NO_RELAY)); + down_prot.down(new EmptyMessage(null).putHeader(id, hdr)); } /** Copies the message, but only the headers above the current protocol (RELAY) (or RpcDispatcher related headers) */ @@ -1078,7 +805,6 @@ protected Message copy(Message msg) { } - protected void startRelayer(Relayer rel, String bridge_name) { try { log.trace(local_addr + ": became site master; starting bridges"); diff --git a/src/org/jgroups/protocols/relay/Relay2Header.java b/src/org/jgroups/protocols/relay/Relay2Header.java index cc06519093..894a47e2d5 100644 --- a/src/org/jgroups/protocols/relay/Relay2Header.java +++ b/src/org/jgroups/protocols/relay/Relay2Header.java @@ -49,6 +49,7 @@ public Relay2Header(byte type, Address final_dest, Address original_sender) { this.final_dest=final_dest; this.original_sender=original_sender; } + public short getMagicId() {return 80;} public Supplier create() {return Relay2Header::new;} public byte getType() {return type;} @@ -56,7 +57,8 @@ public Relay2Header(byte type, Address final_dest, Address original_sender) { public Relay2Header setFinalDestination(Address d) {final_dest=d; return this;} public Address getOriginalSender() {return original_sender;} public Relay2Header setOriginalSender(Address s) {original_sender=s; return this;} - public Set getSites() {return sites;} + public Set getSites() {return sites != null? new HashSet<>(sites) : null;} + public boolean hasSites() {return sites != null && !sites.isEmpty();} public Relay2Header addToSites(Collection s) { if(s != null) { @@ -64,6 +66,7 @@ public Relay2Header addToSites(Collection s) { this.sites=new HashSet<>(s.size()); this.sites.addAll(s); } + assertNonNullSites(); return this; } @@ -73,6 +76,7 @@ public Relay2Header addToSites(String ... s) { this.sites=new HashSet<>(); this.sites.addAll(Arrays.asList(s)); } + assertNonNullSites(); return this; } @@ -96,14 +100,16 @@ public Relay2Header addToVisitedSites(Collection list) { public Relay2Header copy() { Relay2Header hdr=new Relay2Header(type, final_dest, original_sender) - .addToSites(this.sites); - if(visited_sites != null) - hdr.addToVisitedSites(visited_sites); + .addToSites(this.sites) + .addToVisitedSites(visited_sites); + assertNonNullSites(); + hdr.assertNonNullSites(); return hdr; } @Override public int serializedSize() { + assertNonNullSites(); return Global.BYTE_SIZE + Util.size(final_dest) + Util.size(original_sender) + sizeOf(sites) + sizeOf(visited_sites); } @@ -123,6 +129,7 @@ public void writeTo(DataOutput out) throws IOException { for(String s: visited_sites) Bits.writeString(s, out); } + assertNonNullSites(); } @Override @@ -132,16 +139,17 @@ public void readFrom(DataInput in) throws IOException, ClassNotFoundException { original_sender=Util.readAddress(in); int num_elements=in.readInt(); if(num_elements > 0) { - sites=new HashSet<>(); + sites=new HashSet<>(num_elements); for(int i=0; i < num_elements; i++) sites.add(Bits.readString(in)); } num_elements=in.readInt(); if(num_elements > 0) { - visited_sites=new HashSet<>(); + visited_sites=new HashSet<>(num_elements); for(int i=0; i < num_elements; i++) visited_sites.add(Bits.readString(in)); } + assertNonNullSites(); } public String toString() { @@ -172,4 +180,12 @@ protected static int sizeOf(Collection list) { } return retval; } + + protected void assertNonNullSites() { + if(type == SITES_UP || type == SITES_DOWN) { + if(sites == null) + throw new IllegalStateException(String.format("sites cannot be null with type %s\n", type)); + } + } + } \ No newline at end of file diff --git a/src/org/jgroups/protocols/relay/SiteStatus.java b/src/org/jgroups/protocols/relay/SiteStatus.java new file mode 100644 index 0000000000..c9a81aa79d --- /dev/null +++ b/src/org/jgroups/protocols/relay/SiteStatus.java @@ -0,0 +1,61 @@ +package org.jgroups.protocols.relay; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * Maintains the status of sites (up, down, undefined) + * @author Bela Ban + * @since 5.2.17 + */ +public class SiteStatus { + public enum Status {up,down}; + protected final Map sites=new HashMap<>(); + + /** + * Adds a set of sites to the cache. Returns a set of sites for which notifications should be emitted. For each + * site S, the following happens: + *
+     *  - S is not present: add a new entry with the given status for S and add S to the return value
+     *  - S is present: if S != status: change the status and add S to the return value, else no-op
+     * 
+ * @param sites + * @param status + * @return + */ + public synchronized Set add(Set sites, Status status) { + Set retval=new HashSet<>(); + for(String site: sites) { + Status s=this.sites.get(site); + if(s == null) { + this.sites.put(site, status); + retval.add(site); + } + else { + if(s != status) { + this.sites.put(site, status); + retval.add(site); + } + } + } + return retval; + } + + public synchronized Status get(String site) { + return this.sites.get(site); + } + + public synchronized SiteStatus clear() { + sites.clear(); + return this; + } + + public String toString() { + return sites.entrySet().stream() + .map(e -> String.format("%s: %s", e.getKey(), e.getValue())).collect(Collectors.joining("\n")); + } +} diff --git a/src/org/jgroups/protocols/relay/Topology.java b/src/org/jgroups/protocols/relay/Topology.java index 1e5ab8957d..e1ccd997a0 100644 --- a/src/org/jgroups/protocols/relay/Topology.java +++ b/src/org/jgroups/protocols/relay/Topology.java @@ -27,7 +27,7 @@ public class Topology { protected final RELAY2 relay; protected final Map> cache=new ConcurrentHashMap<>(); // cache of sites and members - protected BiConsumer rsp_handler; + protected BiConsumer rsp_handler; public Topology(RELAY2 relay) { @@ -38,9 +38,10 @@ public Topology(RELAY2 relay) { /** * Sets a response handler - * @param c The response handler. Arguments are the site and MemberInfo of the member from which we received the rsp + * @param c The response handler. Arguments are the site and {@link Members} + * ({@link MemberInfo} of joined and left members) */ - public Topology setResponseHandler(BiConsumer c) {rsp_handler=c; return this;} + public Topology setResponseHandler(BiConsumer c) {rsp_handler=c; return this;} @ManagedOperation(description="Fetches information (site, address, IP address) from all members") @@ -118,20 +119,14 @@ protected void handleResponse(Members rsp) { Set infos=cache.computeIfAbsent(site,s -> new ConcurrentSkipListSet<>()); if(rsp.joined != null) { infos.addAll(rsp.joined); - if(rsp_handler != null) - for(MemberInfo mi: rsp.joined) - rsp_handler.accept(site, mi); } - if(rsp.left != null) { // todo: implement } + if(rsp_handler != null) + rsp_handler.accept(site, rsp); } - @FunctionalInterface - public interface ResponseHandler { - void handle(String site, MemberInfo rsp); - } /** Contains information about joined and left members for a given site */ public static class Members implements SizeStreamable { diff --git a/tests/junit-functional/org/jgroups/tests/Relay2RpcDispatcherTest.java b/tests/junit-functional/org/jgroups/tests/Relay2RpcDispatcherTest.java index 89873f3550..a9c2da4b35 100644 --- a/tests/junit-functional/org/jgroups/tests/Relay2RpcDispatcherTest.java +++ b/tests/junit-functional/org/jgroups/tests/Relay2RpcDispatcherTest.java @@ -191,7 +191,7 @@ public static long sleep(long timeout) { protected RELAY2 createRELAY2(String site_name) { - RELAY2 relay=new RELAY2().site(site_name).enableAddressTagging(false).asyncRelayCreation(true); + RELAY2 relay=new RELAY2().site(site_name).asyncRelayCreation(true); RelayConfig.SiteConfig lon_cfg=new RelayConfig.SiteConfig(LON), sfo_cfg=new RelayConfig.SiteConfig(SFO); diff --git a/tests/junit-functional/org/jgroups/tests/Relay2Test.java b/tests/junit-functional/org/jgroups/tests/Relay2Test.java index 76d29af750..bd06b21a8a 100644 --- a/tests/junit-functional/org/jgroups/tests/Relay2Test.java +++ b/tests/junit-functional/org/jgroups/tests/Relay2Test.java @@ -385,7 +385,11 @@ public void testSitesUp() throws Exception { JChannel _g=createNode(SFO, "G", BRIDGE_CLUSTER, LON, NYC, SFO); JChannel _h=createNode(SFO, "H", BRIDGE_CLUSTER, LON, NYC, SFO); - JChannel _i=createNode(SFO, "I", BRIDGE_CLUSTER, LON, NYC, SFO);) { + JChannel _i=createNode(SFO, "I", BRIDGE_CLUSTER, LON, NYC, SFO)) { + + Util.waitUntilAllChannelsHaveSameView(5000, 100, _a,_b,_c); + Util.waitUntilAllChannelsHaveSameView(5000, 100, _d,_e,_f); + Util.waitUntilAllChannelsHaveSameView(5000, 100, _g,_h,_i); waitUntilRoute(NYC, true, 5000, 500, _a); waitUntilRoute(SFO, true, 5000, 500, _a); @@ -399,19 +403,17 @@ public void testSitesUp() throws Exception { assert Stream.of(_a,_d,_g).map(ch -> (RELAY2)ch.getProtocolStack().findProtocol(RELAY2.class)) .allMatch(RELAY2::isSiteMaster); - Stream.of(_b,_c,_d,_e,_f,_g,_h,_i) + Stream.of(_d,_e,_f,_g,_h,_i) .map(ch -> (RELAY2)ch.getProtocolStack().findProtocol(RELAY2.class)) - .forEach(r -> r.setRouteStatusListener(new MyRouteStatusListener(r.getAddress()).verbose(true))); + .forEach(r -> r.setRouteStatusListener(new MyRouteStatusListener(r.getAddress()).verbose(false))); // now stop A; B will become new site master and we should get a site-down(NYC), then site-up(NYC) Util.close(_a); - - Util.waitUntil(5000, 500, () -> Stream.of(_b,_c,_d, _e, _f, _g, _h, _i) + Util.waitUntil(5000, 500, () -> Stream.of(_d,_e,_f,_g,_h,_i) .map(ch -> (RELAY2)ch.getProtocolStack().findProtocol(RELAY2.class)) .peek(r -> System.out.printf("%s: %s\n", r.getAddress(), r.getRouteStatusListener())) .map(r -> (MyRouteStatusListener)r.getRouteStatusListener()) - .allMatch(l -> l.down().size() == 1 && l.down().contains(NYC) - && l.up().size() == 1 && l.up().contains(NYC))); + .allMatch(l -> l.down().contains(LON) && l.up().contains(LON))); } } diff --git a/tests/junit-functional/org/jgroups/tests/SiteStatusTest.java b/tests/junit-functional/org/jgroups/tests/SiteStatusTest.java new file mode 100644 index 0000000000..d8fa6d4143 --- /dev/null +++ b/tests/junit-functional/org/jgroups/tests/SiteStatusTest.java @@ -0,0 +1,46 @@ +package org.jgroups.tests; + +import org.jgroups.Global; +import org.jgroups.protocols.relay.SiteStatus; +import org.jgroups.protocols.relay.SiteStatus.Status; +import org.testng.annotations.Test; + +import java.util.Arrays; +import java.util.Set; + +/** + * Tests {@link org.jgroups.protocols.relay.SiteStatus} + * @author Bela Ban + * @since 5.2.17 + */ +@Test(groups= Global.FUNCTIONAL) +public class SiteStatusTest { + public void testSiteStatus() { + SiteStatus s=new SiteStatus(); + Set res=s.add(Set.of("net1"), Status.up); + assert res.size() == 1 && res.contains("net1"); + Status status=s.get("net1"); + assert status == Status.up; + res=s.add(Set.of("net1"), Status.up); + assert res.isEmpty(); + res=s.add(Set.of("net1"), Status.down); + assert res.size() == 1 && res.contains("net1"); + status=s.get("net1"); + assert status == Status.down; + + status=s.get("hf"); + assert status == null; + } + + public void testSiteStatus2() { + SiteStatus s=new SiteStatus(); + Set retval=s.add(Set.of("hf","net1","net2"),Status.down); + assert retval.size() == 3; + for(String st: Arrays.asList("hf", "net1", "net2")) + assert s.get(st) == Status.down; + retval=s.add(Set.of("net1"), Status.up); + assert retval.size() == 1; + retval=s.add(Set.of("net1", "net2"), Status.up); + assert retval.size() == 1 && retval.contains("net2"); + } +}