Skip to content

Commit

Permalink
ns
Browse files Browse the repository at this point in the history
  • Loading branch information
belaban committed Jul 19, 2023
1 parent 0b9e3ac commit 313cf99
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 18 deletions.
8 changes: 8 additions & 0 deletions src/org/jgroups/demos/RelayDemo.java
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,14 @@ protected boolean process(String line) {
System.out.printf("configured sites: %s\n", relay.getSites());
return true;
}
if(line.startsWith("tp")) { // topo-print
System.out.printf("\n%s\n", relay.topo().print());
return true;
}
if(line.startsWith("tc")) { // topo-clean
relay.topo().removeAll(null);
return true;
}
if(line.startsWith("topo")) {
String sub=line.substring("topo".length()).trim();
String site=null;
Expand Down
11 changes: 4 additions & 7 deletions src/org/jgroups/protocols/relay/RELAY2.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
@MBean(description="RELAY2 protocol")
public class RELAY2 extends Protocol {
// reserved flags
public static final short can_become_site_master_flag = 1 << 1;
public static final short can_become_site_master_flag = 1 << 1;

/* ------------------------------------------ Properties ---------------------------------------------- */
@Property(description="Name of the site; must be defined in the configuration",writable=false)
Expand Down Expand Up @@ -79,14 +79,10 @@ public class RELAY2 extends Protocol {
@Property(description="Fully qualified name of a class implementing SiteMasterPicker")
protected String site_master_picker_impl;


@Property(description="Time during which identical errors about no route to host will be suppressed. " +
"0 disables this (every error will be logged).",type=AttributeType.TIME)
protected long suppress_time_no_route_errors=60000;


/* --------------------------------------------- Fields ------------------------------------------------ */

/** A map containing site names (e.g. "LON") as keys and SiteConfigs as values */
protected final Map<String,RelayConfig.SiteConfig> sites=new HashMap<>();

Expand Down Expand Up @@ -145,7 +141,7 @@ public class RELAY2 extends Protocol {
protected final LongAdder forward_to_local_mbr_time=new LongAdder();

@Component(description="Maintains a cache of sites and members",name="topo")
protected Topology topo=new Topology(this);
protected final Topology topo=new Topology(this);

/** Log to suppress identical errors for messages to non-existing sites ('no route to site X') */
protected SuppressLog<String> suppress_log_no_route;
Expand Down Expand Up @@ -581,7 +577,8 @@ protected boolean handleAdminMessage(Relay2Header hdr, Message msg) {
return true;
case TOPO_RSP:
Members mbrs=msg.getObject();
topo.handleResponse(mbrs);
if(mbrs != null)
topo.handleResponse(mbrs);
return true;
}
return false;
Expand Down
17 changes: 6 additions & 11 deletions src/org/jgroups/protocols/relay/Topology.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
public class Topology {
protected final RELAY2 relay;
protected final Map<String,Set<MemberInfo>> cache=new ConcurrentHashMap<>(); // cache of sites and members
protected BiConsumer<String,MemberInfo> rsp_handler;
protected BiConsumer<String,Members> rsp_handler;


public Topology(RELAY2 relay) {
Expand All @@ -38,9 +38,10 @@ public Topology(RELAY2 relay) {

/**
* Sets a response handler
* @param c The response handler. Arguments are the site and MemberInfo of the member from which we received the rsp
* @param c The response handler. Arguments are the site and {@link Members}
* ({@link MemberInfo} of joined and left members)
*/
public Topology setResponseHandler(BiConsumer<String,MemberInfo> c) {rsp_handler=c; return this;}
public Topology setResponseHandler(BiConsumer<String,Members> c) {rsp_handler=c; return this;}


@ManagedOperation(description="Fetches information (site, address, IP address) from all members")
Expand Down Expand Up @@ -118,20 +119,14 @@ protected void handleResponse(Members rsp) {
Set<MemberInfo> infos=cache.computeIfAbsent(site,s -> new ConcurrentSkipListSet<>());
if(rsp.joined != null) {
infos.addAll(rsp.joined);
if(rsp_handler != null)
for(MemberInfo mi: rsp.joined)
rsp_handler.accept(site, mi);
}

if(rsp.left != null) {
// todo: implement
}
if(rsp_handler != null)
rsp_handler.accept(site, rsp);
}

@FunctionalInterface
public interface ResponseHandler {
void handle(String site, MemberInfo rsp);
}

/** Contains information about joined and left members for a given site */
public static class Members implements SizeStreamable {
Expand Down

0 comments on commit 313cf99

Please sign in to comment.