Skip to content

Commit

Permalink
ns
Browse files Browse the repository at this point in the history
  • Loading branch information
belaban committed Jul 25, 2023
1 parent 020704b commit 5ad4544
Show file tree
Hide file tree
Showing 4 changed files with 249 additions and 116 deletions.
30 changes: 21 additions & 9 deletions src/org/jgroups/protocols/relay/RELAY2.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@
import static org.jgroups.protocols.relay.Relay2Header.*;

// todo: check if copy is needed in route(), passUp() and deliver(); possibly pass a boolean as parameter (copy or not)

// todo: use CompletableFutures in routeThen(); this could parallelize routing and delivery/passsing up
// todo: check if a message can bypass RELAY2 completely when NO_RELAY is set (in up(),down())
/**
* Provides relaying of messages between autonomous sites.<br/>
* Design: ./doc/design/RELAY2.txt and at https://github.com/belaban/JGroups/blob/master/doc/design/RELAY2.txt.<br/>
Expand All @@ -41,7 +42,8 @@
@MBean(description="RELAY2 protocol")
public class RELAY2 extends Protocol {
// reserved flags
public static final short can_become_site_master_flag = 1 << 1;
public static final short can_become_site_master_flag = 1 << 1;
protected static final String RELAY2_CL=RELAY2.class.getSimpleName();

/* ------------------------------------------ Properties ---------------------------------------------- */
@Property(description="Name of the site; must be defined in the configuration",writable=false)
Expand Down Expand Up @@ -430,6 +432,8 @@ public Object down(Event evt) {
}

public Object down(Message msg) {
//if(msg.isFlagSet(Flag.NO_RELAY))
// return down_prot.down(msg);
msg.src(local_addr);
return process(true, msg);
}
Expand All @@ -441,6 +445,8 @@ public Object up(Event evt) {
}

public Object up(Message msg) {
// if(msg.isFlagSet(Flag.NO_RELAY))
// return up_prot.up(msg);
Message copy=msg;
Relay2Header hdr=msg.getHeader(id);
if(hdr != null) {
Expand All @@ -458,6 +464,8 @@ public void up(MessageBatch batch) {
List<SiteAddress> unreachable_sites=null;
for(Iterator<Message> it=batch.iterator(); it.hasNext();) {
Message msg=it.next(), copy=msg;
// if(msg.isFlagSet(Flag.NO_RELAY))
// continue;
Relay2Header hdr=msg.getHeader(id);
it.remove();
if(hdr != null) {
Expand Down Expand Up @@ -546,6 +554,9 @@ public void handleView(View view) {
topo().adjust(this.site, view.getMembers());
}

public String toString() {
return String.format("%s%s", RELAY2_CL, local_addr != null? String.format(" (%s)", local_addr) : "");
}

/** Called to handle a message received from a different site (via a bridge channel) */
protected void handleRelayMessage(Relay2Header hdr, Message msg) {
Expand Down Expand Up @@ -605,21 +616,22 @@ protected Object process(boolean down, Message msg) {
switch(type) {
case ALL:
if(down)
return routeThen(msg,null,() -> deliver(null, msg, true));
return routeThen(msg,null,() -> passUp(msg));
return routeThen(msg, null,() -> deliver(null, msg, true));
return routeThen(msg, null, () -> passUp(msg));
case SM_ALL:
return routeThen(msg,null,() -> passUp(msg));
return routeThen(msg, null, () -> passUp(msg));
case SM:
if(sameSite(dst))
return passUp(msg);
return route(msg, Arrays.asList(dst.getSite()));
case UNICAST:
if(sameSite(dst)) {
if(down)
return deliver(dst, msg, false);
if(local_addr.equals(dst))
return passUp(msg);
return deliver(dst, msg, false);
if(down)
return deliver(dst, msg, false);
String s=String.format("a unicast requires dest (%s) == local_addr (%s)", dst, local_addr);
throw new IllegalStateException(s);
}
else
return route(msg, Arrays.asList(dst.getSite()));
Expand All @@ -638,7 +650,7 @@ protected Object process(boolean down, Message msg) {
throw new IllegalStateException(String.format("non site master received a sg with dst %s",dst));
case UNICAST:
if(down) {
if(sameSite(dst))
if(sameSite(dst)) // todo: if same address -> passUp()
return deliver(dst, msg, false);
return sendToLocalSiteMaster(local_addr, msg);
}
Expand Down
3 changes: 1 addition & 2 deletions src/org/jgroups/util/MyReceiver.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,8 @@ public class MyReceiver<T> implements Receiver, Closeable {
public void receive(Message msg) {
T obj=raw_msgs? (T)msg : (T)msg.getObject();
list.add(obj);
if(verbose) {
if(verbose)
System.out.println((name() != null? name() + ":" : "") + " received message from " + msg.getSrc() + ": " + obj);
}
}

@Override
Expand Down
Loading

0 comments on commit 5ad4544

Please sign in to comment.