Skip to content

Commit

Permalink
Fixed concurrent access to RouterStubManager.stubs https://issues.red…
Browse files Browse the repository at this point in the history
  • Loading branch information
belaban committed Jul 11, 2023
1 parent ddfa6c6 commit 1f239ef
Showing 1 changed file with 53 additions and 38 deletions.
91 changes: 53 additions & 38 deletions src/org/jgroups/stack/RouterStubManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

import org.jgroups.Address;
import org.jgroups.PhysicalAddress;
import org.jgroups.annotations.GuardedBy;
import org.jgroups.logging.Log;
import org.jgroups.logging.LogFactory;
import org.jgroups.util.SocketFactory;
Expand All @@ -12,8 +11,6 @@

import java.io.Serializable;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
Expand All @@ -24,14 +21,11 @@
* @author Bela Ban
*/
public class RouterStubManager implements Runnable, RouterStub.CloseListener {
@GuardedBy("reconnectorLock")
protected final ConcurrentMap<RouterStub,Future<?>> futures=new ConcurrentHashMap<>();

// List of currently connected RouterStubs
protected volatile List<RouterStub> stubs;
protected final List<RouterStub> stubs=new ArrayList<>();

// List of destinations that the reconnect task needs to create and connect
protected final Set<Target> reconnect_list=new HashSet<>();
protected final Set<Target> reconnect_list=new HashSet<>();

protected final Protocol owner;
protected final TimeScheduler timer;
Expand All @@ -49,7 +43,6 @@ public class RouterStubManager implements Runnable, RouterStub.CloseListener {
public RouterStubManager(Protocol owner, String cluster_name, Address local_addr,
String logical_name, PhysicalAddress phys_addr, long interval) {
this.owner = owner;
this.stubs = new ArrayList<>();
this.log = LogFactory.getLog(owner.getClass());
this.timer = owner.getTransport().getTimer();
this.cluster_name=cluster_name;
Expand All @@ -73,21 +66,19 @@ public static RouterStubManager emptyGossipClientStubManager(Protocol p) {
* @param action
*/
public void forEach(Consumer<RouterStub> action) {
stubs.stream().filter(RouterStub::isConnected).forEach(action);
synchronized(stubs) {
stubs.stream().filter(RouterStub::isConnected).forEach(action);
}
}

/**
* Applies action to a randomly picked RouterStub that's connected
* @param action
*/
public void forAny(Consumer<RouterStub> action) {
while(!stubs.isEmpty()) {
RouterStub stub=Util.pickRandomElement(stubs);
if(stub != null && stub.isConnected()) {
action.accept(stub);
return;
}
}
RouterStub stub=findRandomConnectedStub();
if(stub != null && stub.isConnected())
action.accept(stub);
}


Expand All @@ -110,12 +101,17 @@ public RouterStub unregisterStub(IpAddress router_addr) {


public void connectStubs() {
for(RouterStub stub : stubs) {
List<RouterStub> list=null;
synchronized(stubs) {
list=new ArrayList<>(stubs);
}

for(RouterStub stub: list) {
try {
if(!stub.isConnected())
stub.connect(cluster_name, local_addr, logical_name, phys_addr);
}
catch (Throwable e) {
catch(Throwable e) {
moveStubToReconnects(stub);
}
}
Expand All @@ -124,19 +120,23 @@ public void connectStubs() {

public void disconnectStubs() {
stopReconnector();
for(RouterStub stub : stubs) {
try {
stub.disconnect(cluster_name, local_addr);
}
catch (Throwable e) {
synchronized(stubs) {
for(RouterStub stub : stubs) {
try {
stub.disconnect(cluster_name, local_addr);
}
catch(Throwable e) {
}
}
}
}
}

public void destroyStubs() {
stopReconnector();
stubs.forEach(RouterStub::destroy);
stubs.clear();
synchronized(stubs) {
stubs.forEach(RouterStub::destroy);
stubs.clear();
}
}

public String printStubs() {
Expand Down Expand Up @@ -201,10 +201,9 @@ protected void moveStubToReconnects(RouterStub stub) {

protected boolean add(RouterStub stub) {
if(stub == null) return false;
List<RouterStub> new_stubs=new ArrayList<>(stubs);
boolean retval=!new_stubs.contains(stub) && new_stubs.add(stub);
this.stubs=new_stubs;
return retval;
synchronized(stubs) {
return !stubs.contains(stub) && stubs.add(stub);
}
}


Expand All @@ -217,10 +216,8 @@ protected boolean add(Target target) {

protected boolean remove(RouterStub stub) {
if(stub == null) return false;
boolean retval=this.stubs.remove(stub);
stub.destroy();
List<RouterStub> new_stubs=new ArrayList<>(stubs);
boolean retval=new_stubs.remove(stub);
this.stubs=new_stubs;
return retval;
}

Expand All @@ -233,14 +230,32 @@ protected boolean remove(Target target) {


protected RouterStub find(IpAddress router_addr) {
for(RouterStub stub: stubs) {
IpAddress addr=stub.gossipRouterAddress();
if(Objects.equals(addr, router_addr))
return stub;
synchronized(stubs) {
for(RouterStub stub : stubs) {
IpAddress addr=stub.gossipRouterAddress();
if(Objects.equals(addr, router_addr))
return stub;
}
}
return null;
}

protected RouterStub findRandomConnectedStub() {
synchronized(stubs) {
while(connectedStubs() > 0) {
RouterStub tmp=Util.pickRandomElement(stubs);
if(tmp != null && tmp.isConnected())
return tmp;
}
return null;
}
}

// unsynchronized
protected int connectedStubs() {
return (int)stubs.stream().filter(RouterStub::isConnected).count();
}

protected synchronized void startReconnector() {
if(reconnector_task == null || reconnector_task.isDone())
reconnector_task=timer.scheduleWithFixedDelay(this, interval, interval, TimeUnit.MILLISECONDS);
Expand Down

0 comments on commit 1f239ef

Please sign in to comment.