Skip to content

Commit

Permalink
- Removed local delivery stats in RELAY2
Browse files Browse the repository at this point in the history
  • Loading branch information
belaban committed Jun 27, 2023
1 parent 0ddbe15 commit 31b9f0a
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 70 deletions.
28 changes: 0 additions & 28 deletions src/org/jgroups/protocols/relay/RELAY2.java
Original file line number Diff line number Diff line change
Expand Up @@ -137,15 +137,9 @@ public class RELAY2 extends Protocol {

protected final LongAdder forward_to_local_mbr_time=new LongAdder();

/** Number of messages delivered locally, e.g. received and delivered to self */
protected final LongAdder local_deliveries=new LongAdder();

@Component(description="Maintains a cache of sites and members",name="topo")
protected Topology topo=new Topology(this);

/** Total time (ms) for received messages that are delivered locally */
protected final LongAdder local_delivery_time=new LongAdder();

/** Log to suppress identical errors for messages to non-existing sites ('no route to site X') */
protected SuppressLog<String> suppress_log_no_route;

Expand Down Expand Up @@ -239,16 +233,6 @@ public long getAvgMsgsForwardingToSM() {return getTimeForwardingToSM() > 0?
public long getAvgMsgsForwardingToLocalMbr() {return getTimeForwardingToLocalMbr() > 0?
(long)(getNumForwardedToLocalMbr() / (getTimeForwardingToLocalMbr()/1000.0)) : 0;}

@ManagedAttribute(description="Number of messages delivered locally, e.g. received and delivered to self")
public long getNumLocalDeliveries() {return local_deliveries.sum();}

@ManagedAttribute(description="The total time (ms) spent delivering received messages locally",type=AttributeType.TIME)
public long getTimeDeliveringLocally() {return TimeUnit.MILLISECONDS.convert(local_delivery_time.sum(),TimeUnit.NANOSECONDS);}

@ManagedAttribute(description="The average number of messages / s for delivering received messages locally")
public long getAvgMsgsDeliveringLocally() {return getTimeDeliveringLocally() > 0?
(long)(getNumLocalDeliveries() / (getTimeDeliveringLocally()/1000.0)) : 0;}

@ManagedAttribute(description="Whether or not this instance is a site master")
public boolean isSiteMaster() {return relayer != null;}

Expand All @@ -272,8 +256,6 @@ public void resetStats() {
relayed_time.reset();
forward_to_local_mbr.reset();
forward_to_local_mbr_time.reset();
local_deliveries.reset();
local_delivery_time.reset();
clearNoRouteCache();
}

Expand Down Expand Up @@ -455,12 +437,7 @@ public Object down(Message msg) {
if(target.getSite().equals(site)) {
if(local_addr.equals(target) || (target instanceof SiteMaster && is_site_master)) {
// we cannot simply pass msg down, as the transport doesn't know how to send a message to a (e.g.) SiteMaster
long start=stats? System.nanoTime() : 0;
forwardTo(local_addr, target, sender, msg, false);
if(stats) {
local_delivery_time.add(System.nanoTime() - start);
local_deliveries.increment();
}
}
else
deliverLocally(target, sender, msg);
Expand Down Expand Up @@ -855,12 +832,7 @@ protected void deliver(Address dest, Address sender, final Message msg) {
Message copy=copy(msg).setDest(dest).setSrc(sender);
if(log.isTraceEnabled())
log.trace(local_addr + ": delivering message from " + sender);
long start=stats? System.nanoTime() : 0;
up_prot.up(copy);
if(stats) {
local_delivery_time.add(System.nanoTime() - start);
local_deliveries.increment();
}
}
catch(Exception e) {
log.error(Util.getMessage("FailedDeliveringMessage"), e);
Expand Down
18 changes: 8 additions & 10 deletions src/org/jgroups/protocols/relay/SiteMaster.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@

import org.jgroups.Address;
import org.jgroups.util.UUID;
import org.jgroups.util.Util;

import java.util.Arrays;
import java.util.function.Supplier;

/**
Expand All @@ -20,11 +18,11 @@ public SiteMaster() {
}

public SiteMaster(String site) {
this(Util.stringToBytes(site));
this.site=site;
}

public SiteMaster(byte[] site) {
super(0, 0, null, site);
this(new String(site));
}

public Supplier<? extends UUID> create() {
Expand All @@ -34,7 +32,7 @@ public Supplier<? extends UUID> create() {
public int compareTo(Address other) {
if(other instanceof SiteMaster) {
SiteMaster tmp=(SiteMaster)other;
return Util.compare(get(SITE_NAME), tmp.get(SITE_NAME));
return this.site.compareTo(tmp.getSite());
}
return super.compareTo(other);
}
Expand All @@ -43,15 +41,15 @@ public boolean equals(Object obj) {
return compareTo((Address)obj) == 0;
}

public int hashCode() {
return Arrays.hashCode(get(SITE_NAME));
}
// public int hashCode() {
// return Arrays.hashCode(get(SITE_NAME));
// }

public UUID copy() {
return new SiteMaster(get(SITE_NAME));
return new SiteMaster(site);
}

public String toString() {
return "SiteMaster(" + getSite() + ")";
return "SiteMaster(" + site + ")";
}
}
65 changes: 35 additions & 30 deletions src/org/jgroups/protocols/relay/SiteUUID.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
import org.jgroups.Address;
import org.jgroups.util.*;

import java.util.Arrays;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.function.Supplier;

/**
Expand All @@ -12,48 +14,34 @@
* @since 3.2
*/
public class SiteUUID extends ExtendedUUID implements SiteAddress {
protected static final byte[] NAME = Util.stringToBytes("relay2.name"); // logical name, can be null
protected static final byte[] SITE_NAME = Util.stringToBytes("relay2.site");

protected String site, name;

public SiteUUID() {
}

public SiteUUID(long mostSigBits, long leastSigBits, String name, String site) {
super(mostSigBits,leastSigBits);
if(name != null)
put(NAME, Util.stringToBytes(name));
put(SITE_NAME, Util.stringToBytes(site));
this.name=name;
this.site=site;
}

public SiteUUID(long mostSigBits, long leastSigBits, byte[] name, byte[] site) {
super(mostSigBits,leastSigBits);
if(name != null)
put(NAME, name);
put(SITE_NAME, site);
this(mostSigBits, leastSigBits, name != null? new String(name) : null, new String(site));
}

public SiteUUID(UUID uuid, String name, String site) {
super(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
if(name != null)
put(NAME, Util.stringToBytes(name));
put(SITE_NAME, Util.stringToBytes(site));
this(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits(), name, site);
}

public String getName() {return name;}
public String getSite() {return site;}

public Supplier<? extends UUID> create() {
return SiteUUID::new;
}

public String getName() {
return Util.bytesToString(get(NAME));
}

public String getSite() {
return Util.bytesToString(get(SITE_NAME));
}

public UUID copy() {
return new SiteUUID(mostSigBits, leastSigBits, get(NAME), get(SITE_NAME));
return new SiteUUID(mostSigBits, leastSigBits, name, site);
}

@Override
Expand All @@ -63,8 +51,7 @@ public String toString() {

public int hashCode() {
int retval=super.hashCode();
byte[] site=get(SITE_NAME);
return site != null? Arrays.hashCode(site) + retval : retval;
return site != null? site.hashCode() + retval : retval;
}

public boolean equals(Object obj) {
Expand All @@ -76,17 +63,35 @@ public boolean equals(Object obj) {
@Override
public int compareTo(Address other) {
if (other instanceof SiteUUID) {
int siteCompare = Util.compare(get(SITE_NAME), ((SiteUUID) other).get(SITE_NAME));
int siteCompare = this.site.compareTo(((SiteUUID)other).getSite());
//compareTo will check the bits.
return siteCompare == 0 ? super.compareTo(other) : siteCompare;
}
return super.compareTo(other);
}

public String print(boolean detailed) {
String name=getName();
String retval=name != null? name : NameCache.get(this);
return retval + ":" + getSite() + (detailed? printOthers() : "");
return retval + ":" + site + (detailed? printOthers() : "");
}

@Override
public int serializedSize() {
return super.serializedSize() + Util.size(site) + Util.size(name);
}

@Override
public void writeTo(DataOutput out) throws IOException {
super.writeTo(out);
Bits.writeString(site, out);
Bits.writeString(name, out);
}

@Override
public void readFrom(DataInput in) throws IOException {
super.readFrom(in);
site=Bits.readString(in);
name=Bits.readString(in);
}

protected String printOthers() {
Expand All @@ -97,7 +102,7 @@ protected String printOthers() {
return sb.toString();
for(int i=0; i < keys.length; i++) {
byte[] key=keys[i];
if(key == null || Arrays.equals(key,SITE_NAME) || Arrays.equals(key, NAME))
if(key == null)
continue;
byte[] val=values[i];
Object obj=null;
Expand Down
13 changes: 11 additions & 2 deletions tests/junit-functional/org/jgroups/tests/SizeTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import org.jgroups.protocols.relay.Relay2Header;
import org.jgroups.protocols.relay.SiteMaster;
import org.jgroups.protocols.relay.SiteUUID;
import org.jgroups.protocols.relay.Topology;
import org.jgroups.protocols.relay.Topology.MemberInfo;
import org.jgroups.protocols.relay.Topology.Members;
import org.jgroups.stack.GossipData;
Expand Down Expand Up @@ -618,6 +617,14 @@ public void testRelay2Header() throws Exception {
assert hdr.getVisitedSites().equals(hdr2.getVisitedSites());
}

public void testSiteUUID() throws Exception {
SiteUUID u1=new SiteUUID((UUID)Util.createRandomAddress(), "A", "sfo");
_testSize(u1);

SiteMaster sm=new SiteMaster("sfo");
_testSize(sm);
}

public void testMemberInfo() throws Exception {
Address addr=Util.createRandomAddress("A");
IpAddress ip_addr=new IpAddress("127.0.0.1", 5000);
Expand Down Expand Up @@ -656,7 +663,7 @@ public void testEncryptHeader() throws Exception {
}


public static void testIpAddress() throws Exception {
public void testIpAddress() throws Exception {
IpAddress addr=new IpAddress();
_testSize(addr);
}
Expand Down Expand Up @@ -884,6 +891,8 @@ private static void _testSize(Address addr) throws Exception {
byte[] serialized_form=Util.streamableToByteBuffer(addr);
System.out.println("size=" + size + ", serialized size=" + serialized_form.length);
Assert.assertEquals(serialized_form.length, size);
Address addr2=Util.streamableFromByteBuffer(addr.getClass(), serialized_form);
assert addr.equals(addr2);
}


Expand Down

0 comments on commit 31b9f0a

Please sign in to comment.