Skip to content

Commit

Permalink
- Adding unit tests for send/receive across sites (https://issues.red…
Browse files Browse the repository at this point in the history
…hat.com/browse/JGRP-2699)

- Added test for sending of multicasts and reception of unicasts
- Added test for sending of message to SiteMaster(null)
- Added slf4j-nop to make the stupid warning message by testng go away :-(
  • Loading branch information
belaban committed Jul 26, 2023
1 parent 780022d commit 764e5c2
Show file tree
Hide file tree
Showing 8 changed files with 496 additions and 157 deletions.
1 change: 1 addition & 0 deletions ivy.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
<dependency org="org.apache.logging.log4j" name="log4j-api" rev="2.+"/>
<dependency org="org.apache.logging.log4j" name="log4j-core" rev="2.+"/>
<dependency org="org.slf4j" name="slf4j-api" rev="2.0.+"/>
<dependency org="org.slf4j" name="slf4j-nop" rev="2.0.+"/>
<dependency org="org.testng" name="testng" rev="7.7.1"/>
<dependency org="com.beust" name="jcommander" rev="1.+"/>
<dependency org="org.wildfly.security" name="wildfly-elytron" rev="1.16.+" />
Expand Down
9 changes: 8 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,14 @@
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>2.0.3</version>
<version>2.0.7</version>
<optional>true</optional>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-nop</artifactId>
<version>2.0.7</version>
<optional>true</optional>
</dependency>

Expand Down
4 changes: 3 additions & 1 deletion src/org/jgroups/JChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -764,7 +764,9 @@ public JChannel up(MessageBatch batch) {
return this;
}


public String toString() {
return isConnected()? String.format("%s (%s)", address(), cluster_name) : super.toString();
}

@ManagedOperation
public String toString(boolean details) {
Expand Down
46 changes: 26 additions & 20 deletions src/org/jgroups/protocols/relay/RELAY2.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@
import static org.jgroups.protocols.relay.Relay2Header.*;

// todo: check if copy is needed in route(), passUp() and deliver(); possibly pass a boolean as parameter (copy or not)

// todo: use CompletableFutures in routeThen(); this could parallelize routing and delivery/passsing up
// todo: check if a message can bypass RELAY2 completely when NO_RELAY is set (in up(),down())
/**
* Provides relaying of messages between autonomous sites.<br/>
* Design: ./doc/design/RELAY2.txt and at https://github.com/belaban/JGroups/blob/master/doc/design/RELAY2.txt.<br/>
Expand All @@ -41,7 +42,8 @@
@MBean(description="RELAY2 protocol")
public class RELAY2 extends Protocol {
// reserved flags
public static final short can_become_site_master_flag = 1 << 1;
public static final short can_become_site_master_flag = 1 << 1;
protected static final String RELAY2_CL=RELAY2.class.getSimpleName();

/* ------------------------------------------ Properties ---------------------------------------------- */
@Property(description="Name of the site; must be defined in the configuration",writable=false)
Expand Down Expand Up @@ -430,6 +432,8 @@ public Object down(Event evt) {
}

public Object down(Message msg) {
//if(msg.isFlagSet(Flag.NO_RELAY))
// return down_prot.down(msg);
msg.src(local_addr);
return process(true, msg);
}
Expand All @@ -441,6 +445,8 @@ public Object up(Event evt) {
}

public Object up(Message msg) {
// if(msg.isFlagSet(Flag.NO_RELAY))
// return up_prot.up(msg);
Message copy=msg;
Relay2Header hdr=msg.getHeader(id);
if(hdr != null) {
Expand All @@ -458,6 +464,8 @@ public void up(MessageBatch batch) {
List<SiteAddress> unreachable_sites=null;
for(Iterator<Message> it=batch.iterator(); it.hasNext();) {
Message msg=it.next(), copy=msg;
// if(msg.isFlagSet(Flag.NO_RELAY))
// continue;
Relay2Header hdr=msg.getHeader(id);
it.remove();
if(hdr != null) {
Expand Down Expand Up @@ -546,6 +554,9 @@ public void handleView(View view) {
topo().adjust(this.site, view.getMembers());
}

public String toString() {
return String.format("%s%s", RELAY2_CL, local_addr != null? String.format(" (%s)", local_addr) : "");
}

/** Called to handle a message received from a different site (via a bridge channel) */
protected void handleRelayMessage(Relay2Header hdr, Message msg) {
Expand Down Expand Up @@ -605,29 +616,24 @@ protected Object process(boolean down, Message msg) {
switch(type) {
case ALL:
if(down)
return routeThen(msg,null,() -> deliver(null, msg, true));
return routeThen(msg,null,() -> passUp(msg));
return routeThen(msg, null,() -> deliver(null, msg, true));
return routeThen(msg, null, () -> passUp(msg));
case SM_ALL:
if(down)
return routeThen(msg,null,() -> deliver(local_addr, msg, false));
return routeThen(msg,null,() -> passUp(msg));
return routeThen(msg, null, () -> passUp(msg));
case SM:
if(sameSite(dst)) {
if(down)
return deliver(local_addr, msg, false);
if(sameSite(dst))
return passUp(msg);
}
return route(msg, Arrays.asList(dst.getSite()));
case UNICAST:
if(sameSite(dst)) {
if(down)
return deliver(dst, msg, false);
if(local_addr.equals(dst))
return passUp(msg);
return deliver(dst, msg, false);
if(down) {
// no passUp() if dst == local_addr: we want the transport to use a separate thread to do
// loopbacks
return deliver(dst, msg,false);
}
return passUp(msg);
}
else
return route(msg, Arrays.asList(dst.getSite()));
return route(msg, Arrays.asList(dst.getSite()));
}
}
else {
Expand All @@ -640,10 +646,10 @@ protected Object process(boolean down, Message msg) {
case SM:
if(down)
return sendToLocalSiteMaster(local_addr, msg); // todo: local_addr or msg.src()?
throw new IllegalStateException(String.format("non site master received a sg with dst %s",dst));
throw new IllegalStateException(String.format("non site master received a msg with dst %s",dst));
case UNICAST:
if(down) {
if(sameSite(dst))
if(sameSite(dst)) // todo: if same address -> passUp()
return deliver(dst, msg, false);
return sendToLocalSiteMaster(local_addr, msg);
}
Expand Down
3 changes: 1 addition & 2 deletions src/org/jgroups/stack/ProtocolStack.java
Original file line number Diff line number Diff line change
Expand Up @@ -946,8 +946,7 @@ public Object down(Event evt) {
}

public Object down(Message msg) {
if(top_prot != null)
return top_prot.down(msg);
if(top_prot != null) return top_prot.down(msg);
return null;
}

Expand Down
6 changes: 4 additions & 2 deletions src/org/jgroups/util/MyReceiver.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
* Generic receiver for a JChannel
Expand All @@ -23,9 +25,8 @@ public class MyReceiver<T> implements Receiver, Closeable {
public void receive(Message msg) {
T obj=raw_msgs? (T)msg : (T)msg.getObject();
list.add(obj);
if(verbose) {
if(verbose)
System.out.println((name() != null? name() + ":" : "") + " received message from " + msg.getSrc() + ": " + obj);
}
}

@Override
Expand All @@ -36,6 +37,7 @@ public void viewAccepted(View new_view) {

public MyReceiver<T> rawMsgs(boolean flag) {this.raw_msgs=flag; return this;}
public List<T> list() {return list;}
public List<String> list(Function<T,String> f) {return list.stream().map(f).collect(Collectors.toList());}
public MyReceiver<T> verbose(boolean flag) {verbose=flag; return this;}
public String name() {return name;}
public MyReceiver<T> name(String name) {this.name=name; return this;}
Expand Down
Loading

0 comments on commit 764e5c2

Please sign in to comment.