Skip to content

Commit

Permalink
- Removed unused methods
Browse files Browse the repository at this point in the history
- Fixed sitesUp()/sitesDown() notifications (https://issues.redhat.com/browse/JGRP-2712)
  • Loading branch information
belaban committed Jul 20, 2023
1 parent 5e8c440 commit 780022d
Show file tree
Hide file tree
Showing 8 changed files with 190 additions and 336 deletions.
8 changes: 8 additions & 0 deletions src/org/jgroups/demos/RelayDemo.java
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,14 @@ protected boolean process(String line) {
System.out.printf("configured sites: %s\n", relay.getSites());
return true;
}
if(line.startsWith("tp")) { // topo-print
System.out.printf("\n%s\n", relay.topo().print());
return true;
}
if(line.startsWith("tc")) { // topo-clean
relay.topo().removeAll(null);
return true;
}
if(line.startsWith("topo")) {
String sub=line.substring("topo".length()).trim();
String site=null;
Expand Down
348 changes: 37 additions & 311 deletions src/org/jgroups/protocols/relay/RELAY2.java

Large diffs are not rendered by default.

28 changes: 22 additions & 6 deletions src/org/jgroups/protocols/relay/Relay2Header.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,21 +49,24 @@ public Relay2Header(byte type, Address final_dest, Address original_sender) {
this.final_dest=final_dest;
this.original_sender=original_sender;
}

public short getMagicId() {return 80;}
public Supplier<? extends Header> create() {return Relay2Header::new;}
public byte getType() {return type;}
public Address getFinalDest() {return final_dest;}
public Relay2Header setFinalDestination(Address d) {final_dest=d; return this;}
public Address getOriginalSender() {return original_sender;}
public Relay2Header setOriginalSender(Address s) {original_sender=s; return this;}
public Set<String> getSites() {return sites;}
public Set<String> getSites() {return sites != null? new HashSet<>(sites) : null;}
public boolean hasSites() {return sites != null && !sites.isEmpty();}

public Relay2Header addToSites(Collection<String> s) {
if(s != null) {
if(this.sites == null)
this.sites=new HashSet<>(s.size());
this.sites.addAll(s);
}
assertNonNullSites();
return this;
}

Expand All @@ -73,6 +76,7 @@ public Relay2Header addToSites(String ... s) {
this.sites=new HashSet<>();
this.sites.addAll(Arrays.asList(s));
}
assertNonNullSites();
return this;
}

Expand All @@ -96,14 +100,16 @@ public Relay2Header addToVisitedSites(Collection<String> list) {

public Relay2Header copy() {
Relay2Header hdr=new Relay2Header(type, final_dest, original_sender)
.addToSites(this.sites);
if(visited_sites != null)
hdr.addToVisitedSites(visited_sites);
.addToSites(this.sites)
.addToVisitedSites(visited_sites);
assertNonNullSites();
hdr.assertNonNullSites();
return hdr;
}

@Override
public int serializedSize() {
assertNonNullSites();
return Global.BYTE_SIZE + Util.size(final_dest) + Util.size(original_sender) +
sizeOf(sites) + sizeOf(visited_sites);
}
Expand All @@ -123,6 +129,7 @@ public void writeTo(DataOutput out) throws IOException {
for(String s: visited_sites)
Bits.writeString(s, out);
}
assertNonNullSites();
}

@Override
Expand All @@ -132,16 +139,17 @@ public void readFrom(DataInput in) throws IOException, ClassNotFoundException {
original_sender=Util.readAddress(in);
int num_elements=in.readInt();
if(num_elements > 0) {
sites=new HashSet<>();
sites=new HashSet<>(num_elements);
for(int i=0; i < num_elements; i++)
sites.add(Bits.readString(in));
}
num_elements=in.readInt();
if(num_elements > 0) {
visited_sites=new HashSet<>();
visited_sites=new HashSet<>(num_elements);
for(int i=0; i < num_elements; i++)
visited_sites.add(Bits.readString(in));
}
assertNonNullSites();
}

public String toString() {
Expand Down Expand Up @@ -172,4 +180,12 @@ protected static int sizeOf(Collection<String> list) {
}
return retval;
}

protected void assertNonNullSites() {
if(type == SITES_UP || type == SITES_DOWN) {
if(sites == null)
throw new IllegalStateException(String.format("sites cannot be null with type %s\n", type));
}
}

}
61 changes: 61 additions & 0 deletions src/org/jgroups/protocols/relay/SiteStatus.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package org.jgroups.protocols.relay;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* Maintains the status of sites (up, down, undefined)
* @author Bela Ban
* @since 5.2.17
*/
public class SiteStatus {
public enum Status {up,down};
protected final Map<String,Status> sites=new HashMap<>();

/**
* Adds a set of sites to the cache. Returns a set of sites for which notifications should be emitted. For each
* site S, the following happens:
* <pre>
* - S is not present: add a new entry with the given status for S and add S to the return value
* - S is present: if S != status: change the status and add S to the return value, else no-op
* </pre>
* @param sites
* @param status
* @return
*/
public synchronized Set<String> add(Set<String> sites, Status status) {
Set<String> retval=new HashSet<>();
for(String site: sites) {
Status s=this.sites.get(site);
if(s == null) {
this.sites.put(site, status);
retval.add(site);
}
else {
if(s != status) {
this.sites.put(site, status);
retval.add(site);
}
}
}
return retval;
}

public synchronized Status get(String site) {
return this.sites.get(site);
}

public synchronized SiteStatus clear() {
sites.clear();
return this;
}

public String toString() {
return sites.entrySet().stream()
.map(e -> String.format("%s: %s", e.getKey(), e.getValue())).collect(Collectors.joining("\n"));
}
}
17 changes: 6 additions & 11 deletions src/org/jgroups/protocols/relay/Topology.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
public class Topology {
protected final RELAY2 relay;
protected final Map<String,Set<MemberInfo>> cache=new ConcurrentHashMap<>(); // cache of sites and members
protected BiConsumer<String,MemberInfo> rsp_handler;
protected BiConsumer<String,Members> rsp_handler;


public Topology(RELAY2 relay) {
Expand All @@ -38,9 +38,10 @@ public Topology(RELAY2 relay) {

/**
* Sets a response handler
* @param c The response handler. Arguments are the site and MemberInfo of the member from which we received the rsp
* @param c The response handler. Arguments are the site and {@link Members}
* ({@link MemberInfo} of joined and left members)
*/
public Topology setResponseHandler(BiConsumer<String,MemberInfo> c) {rsp_handler=c; return this;}
public Topology setResponseHandler(BiConsumer<String,Members> c) {rsp_handler=c; return this;}


@ManagedOperation(description="Fetches information (site, address, IP address) from all members")
Expand Down Expand Up @@ -118,20 +119,14 @@ protected void handleResponse(Members rsp) {
Set<MemberInfo> infos=cache.computeIfAbsent(site,s -> new ConcurrentSkipListSet<>());
if(rsp.joined != null) {
infos.addAll(rsp.joined);
if(rsp_handler != null)
for(MemberInfo mi: rsp.joined)
rsp_handler.accept(site, mi);
}

if(rsp.left != null) {
// todo: implement
}
if(rsp_handler != null)
rsp_handler.accept(site, rsp);
}

@FunctionalInterface
public interface ResponseHandler {
void handle(String site, MemberInfo rsp);
}

/** Contains information about joined and left members for a given site */
public static class Members implements SizeStreamable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ public static long sleep(long timeout) {


protected RELAY2 createRELAY2(String site_name) {
RELAY2 relay=new RELAY2().site(site_name).enableAddressTagging(false).asyncRelayCreation(true);
RELAY2 relay=new RELAY2().site(site_name).asyncRelayCreation(true);

RelayConfig.SiteConfig lon_cfg=new RelayConfig.SiteConfig(LON),
sfo_cfg=new RelayConfig.SiteConfig(SFO);
Expand Down
16 changes: 9 additions & 7 deletions tests/junit-functional/org/jgroups/tests/Relay2Test.java
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,11 @@ public void testSitesUp() throws Exception {

JChannel _g=createNode(SFO, "G", BRIDGE_CLUSTER, LON, NYC, SFO);
JChannel _h=createNode(SFO, "H", BRIDGE_CLUSTER, LON, NYC, SFO);
JChannel _i=createNode(SFO, "I", BRIDGE_CLUSTER, LON, NYC, SFO);) {
JChannel _i=createNode(SFO, "I", BRIDGE_CLUSTER, LON, NYC, SFO)) {

Util.waitUntilAllChannelsHaveSameView(5000, 100, _a,_b,_c);
Util.waitUntilAllChannelsHaveSameView(5000, 100, _d,_e,_f);
Util.waitUntilAllChannelsHaveSameView(5000, 100, _g,_h,_i);

waitUntilRoute(NYC, true, 5000, 500, _a);
waitUntilRoute(SFO, true, 5000, 500, _a);
Expand All @@ -399,19 +403,17 @@ public void testSitesUp() throws Exception {
assert Stream.of(_a,_d,_g).map(ch -> (RELAY2)ch.getProtocolStack().findProtocol(RELAY2.class))
.allMatch(RELAY2::isSiteMaster);

Stream.of(_b,_c,_d,_e,_f,_g,_h,_i)
Stream.of(_d,_e,_f,_g,_h,_i)
.map(ch -> (RELAY2)ch.getProtocolStack().findProtocol(RELAY2.class))
.forEach(r -> r.setRouteStatusListener(new MyRouteStatusListener(r.getAddress()).verbose(true)));
.forEach(r -> r.setRouteStatusListener(new MyRouteStatusListener(r.getAddress()).verbose(false)));

// now stop A; B will become new site master and we should get a site-down(NYC), then site-up(NYC)
Util.close(_a);

Util.waitUntil(5000, 500, () -> Stream.of(_b,_c,_d, _e, _f, _g, _h, _i)
Util.waitUntil(5000, 500, () -> Stream.of(_d,_e,_f,_g,_h,_i)
.map(ch -> (RELAY2)ch.getProtocolStack().findProtocol(RELAY2.class))
.peek(r -> System.out.printf("%s: %s\n", r.getAddress(), r.getRouteStatusListener()))
.map(r -> (MyRouteStatusListener)r.getRouteStatusListener())
.allMatch(l -> l.down().size() == 1 && l.down().contains(NYC)
&& l.up().size() == 1 && l.up().contains(NYC)));
.allMatch(l -> l.down().contains(LON) && l.up().contains(LON)));
}
}

Expand Down
46 changes: 46 additions & 0 deletions tests/junit-functional/org/jgroups/tests/SiteStatusTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package org.jgroups.tests;

import org.jgroups.Global;
import org.jgroups.protocols.relay.SiteStatus;
import org.jgroups.protocols.relay.SiteStatus.Status;
import org.testng.annotations.Test;

import java.util.Arrays;
import java.util.Set;

/**
* Tests {@link org.jgroups.protocols.relay.SiteStatus}
* @author Bela Ban
* @since 5.2.17
*/
@Test(groups= Global.FUNCTIONAL)
public class SiteStatusTest {
public void testSiteStatus() {
SiteStatus s=new SiteStatus();
Set<String> res=s.add(Set.of("net1"), Status.up);
assert res.size() == 1 && res.contains("net1");
Status status=s.get("net1");
assert status == Status.up;
res=s.add(Set.of("net1"), Status.up);
assert res.isEmpty();
res=s.add(Set.of("net1"), Status.down);
assert res.size() == 1 && res.contains("net1");
status=s.get("net1");
assert status == Status.down;

status=s.get("hf");
assert status == null;
}

public void testSiteStatus2() {
SiteStatus s=new SiteStatus();
Set<String> retval=s.add(Set.of("hf","net1","net2"),Status.down);
assert retval.size() == 3;
for(String st: Arrays.asList("hf", "net1", "net2"))
assert s.get(st) == Status.down;
retval=s.add(Set.of("net1"), Status.up);
assert retval.size() == 1;
retval=s.add(Set.of("net1", "net2"), Status.up);
assert retval.size() == 1 && retval.contains("net2");
}
}

0 comments on commit 780022d

Please sign in to comment.