diff --git a/conf/fork-stacks.xml b/conf/fork-stacks.xml index 4be2478da75..612a3a56c56 100644 --- a/conf/fork-stacks.xml +++ b/conf/fork-stacks.xml @@ -11,9 +11,9 @@ - + - + diff --git a/conf/fork.xml b/conf/fork.xml index dcb3bd0b6fd..44e12c5d150 100644 --- a/conf/fork.xml +++ b/conf/fork.xml @@ -23,7 +23,7 @@ xsi:schemaLocation="fork fork-stacks-4.2.xsd"> - + diff --git a/conf/jg-magic-map.xml b/conf/jg-magic-map.xml index d7c25dc3400..83563dbcac9 100644 --- a/conf/jg-magic-map.xml +++ b/conf/jg-magic-map.xml @@ -42,7 +42,6 @@ - diff --git a/conf/jg-messages.properties b/conf/jg-messages.properties index afebca972ed..a858d417dc9 100644 --- a/conf/jg-messages.properties +++ b/conf/jg-messages.properties @@ -59,8 +59,6 @@ CouldNotOpenConnectionToDatabase = JGRP000115: Could not open connection to data DefaultMembershipChangePolicyFailed = JGRP000119: default membership change policy failed DidnTFindPhysicalAddressFor = JGRP000121: didn't find physical address for DigestOrSenderIsNull = JGRP000122: digest or sender is null -DiscardedLOCKDENIEDResponseWithLockId = JGRP000123: discarded LOCK-DENIED response with lock-id= -DiscardedLOCKGRANTEDResponseWithLockId = JGRP000124: discarded LOCK-GRANTED response with lock-id= ErrorBuildingURL = JGRP000126: Error building URL ErrorCallingService = JGRP000127: Error calling service ErrorClearingTable = JGRP000128: Error clearing table diff --git a/conf/jg-protocol-ids.xml b/conf/jg-protocol-ids.xml index 91f16f74c37..7243bb30b79 100644 --- a/conf/jg-protocol-ids.xml +++ b/conf/jg-protocol-ids.xml @@ -31,7 +31,6 @@ - @@ -55,7 +54,6 @@ - diff --git a/doc/manual/blocks.adoc b/doc/manual/blocks.adoc index 236d58a013c..bac18cb21ee 100644 --- a/doc/manual/blocks.adoc +++ b/doc/manual/blocks.adoc @@ -718,141 +718,6 @@ Note that this class was written as a demo of how state can be shared between no never been heavily tested and is therefore not meant to be used in production. -[[LockService]] -=== Cluster wide locking - -`LockService` can be used to acquire locks on a cluster-wide basis; ie. only one node can acquire a given lock. E.g. if -member B acquires lock L, and member C also tries to acquire L, then C will block until B releases L -(or leaves /crashes) - -The new service is implemented as a building block (`org.jgroups.blocks.locking.LockService`) and a protocol -(`CENTRAL_LOCK` or `CENTRAL_LOCK2`). `LockService` looks up the protocol and talks to it via events. If no locking -protocol is found, `LockService` won't start and will throw an exception. - -The main abstraction of a distributed lock is an implementation of `java.util.concurrent.locks.Lock`. - -Below is an example of how LockService is typically used: - -[source,java] ----- -// locking.xml needs to contain a locking protocol, e.g. CENTRAL_LOCK -JChannel ch=new JChannel("/home/bela/locking.xml"); -LockService lock_service=new LockService(ch); -ch.connect("lock-cluster"); -Lock lock=lock_service.getLock("mylock"); // gets a cluster-wide lock -lock.lock(); -try { - // do something with the locked resource -} -finally { - lock.unlock(); -} ----- - -In the example, we create a channel, then a `LockService`, then connect the channel. If the channel's -configuration doesn't include a locking protocol, an exception will be thrown. -Then we grab a lock named `"mylock"`, which we lock and subsequently unlock. If another member P had already -acquired `"mylock"`, we'd block until P released the lock, or P left the cluster or crashed. - -Note that the owner of a lock is always a given thread in a cluster, so the owner is the JGroups address and -the thread ID. *This means that different threads inside the same JVM trying to access the same named lock -will compete for it.* If `thread-22` grabs the lock first, then `thread-5` will block until `thread-22` -releases the lock. - -NOTE: If we want the lock owner to only be the address (and not the thread-id), then property -`use_thread_id_for_lock_owner` can be set to `false`. This means that all threads in a given node can lock or unlock -a given lock. Example: thread T1 locks "lock", but thread T2 can unlock it. This is _not_ the same semantics as -`java.util.concurrent.locks.Lock`, but nevertheless useful in some scenarios. (Introduced in 3.6) - -JGroups includes a demo (`org.jgroups.demos.LockServiceDemo`), which can be used to interactively experiment -with distributed locks. `LockServiceDemo -h` dumps all command line options. - -There are two protocols which provides locking: <> and <>. - -Note that the locking protocol has to be placed at or towards the top of the stack (close to the channel), because it -requires reliable unicasts and multicasts (e.g. provided by `UNICAST3` and `NAKACK2`). - - -[[LockingAndMerges]] -==== Locking and merges - -The following scenario is susceptible to network partitioning and subsequent merging: we have a cluster -view of `{A,B,C,D}` and then the cluster splits into `{A,B}` and `{C,D}`. Assume that B and D now acquire a -lock `"mylock"`. This is what happens (with the locking protocol being `CENTRAL_LOCK`): - -* There are 2 coordinators: A for `{A,B}` and C for `{C,D}` -* B successfully acquires `"mylock"` from A -* D successfully acquires `"mylock"` from C -* The partitions merge back into `{A,B,C,D}`. Now, only A is the coordinator, but C ceases -to be a coordinator -* Problem: D still holds a lock which should actually be invalid! -There is no easy way (via the Lock API) to 'remove' the lock from D. We could for example simply release -D's lock on `"mylock"`, but then there's no way telling D that the lock it holds is actually stale! - -Therefore the recommended solution here is for nodes to listen to `MergeView` changes if they expect -merging to occur, and re-acquire all of their locks after a merge, e.g.: - -[source,java] ----- - -Lock l1, l2, l3; -LockService lock_service; -... -public void viewAccepted(View view) { - if(view instanceof MergeView) { - new Thread() { - public void run() { - lock_service.unlockAll(); - // stop all access to resources protected by l1, l2 or l3 - // every thread needs to re-acquire the locks it holds - } - }.start(); - } -} ----- - -==== Locking and merges (updated) -With <>, merging of partitions is handled differently. Contrary to CENTRAL_LOCK, which has the coordinator -back up its lock tables to one or more backup members, CENTRAL_LOCK2 doesn't do this. - -Instead, when the current coordinator leaves or crashes, the new coordinator fetches information about locks and pending -lock/unlock requests from all members, and then builds its lock table based on this information. - -In the above scenario with both B and D holding `mylock`, in case of a merge (say A becomes the new coordinator), D -will be told that its lock `mylock` has been *revoked*. This means that D needs to force-unlock D. This can be done -in the `lockRevoked()` callback, e.g.: - -[source,java] ----- -LockService lock_service; -... -public void lockRevoked(String lock_name, Owner current_owner) { - lock_service.unlockForce(lock_name); -} ----- - -This is maginally better than CENTRAL_LOCK, but admittedly less than ideal. Given the following code: - -[source,java] ----- -Lock lock=lock_service.get("mylock"; -lock.lock(); -try { - // do something while the lock is held - longRunningAction(); -} -finally { - lock.unlock -} ----- - -When `mylock` is revoked, `longRunningAction()` should be stopped immediately, or - even better - its changes should be -undone (like in a transaction). However, this isn't feasible and would unnecessarily complicate the code. - -Here, we see that the `Lock` abstraction, as easy as it is and as often it is used *locally* (inside the same JVM), -may not be the best abstraction for a distributed setting! - - [[CounterService]] === Cluster wide atomic counters diff --git a/doc/manual/protocols.adoc b/doc/manual/protocols.adoc index d22460e68a8..64de1aa99d3 100644 --- a/doc/manual/protocols.adoc +++ b/doc/manual/protocols.adoc @@ -2161,41 +2161,6 @@ JIRA: https://issues.redhat.com/browse/JGRP-2402 ${SOS} -[[LockingProtocols]] -==== Locking protocols - -The locking protocol is org.jgroups.protocols.CENTRAL_LOCK: - -${Locking} - -[[CENTRAL_LOCK]] -===== CENTRAL_LOCK - -CENTRAL_LOCK has the current coordinator of a cluster grants locks, so every node has to communicate with the -coordinator to acquire or release a lock. Lock requests by different nodes for the same lock are processed -in the order in which they are received. - -A coordinator maintains a lock table. To prevent losing the knowledge of who holds which locks, the coordinator can push -lock information to a number of backups defined by num_backups. If num_backups is 0, no replication of lock information -happens. If num_backups is greater than 0, then the coordinator pushes information about acquired and released locks to -all backup nodes. Topology changes might create new backup nodes, and lock information is pushed to those on -becoming a new backup node. - -The advantage of CENTRAL_LOCK is that all lock requests are granted in the same order across the cluster. - -${CENTRAL_LOCK} - - -[[CENTRAL_LOCK2]] -===== CENTRAL_LOCK2 - -In CENTRAL_LOCK2, the coordinator (= lock issuer) does not backup its lock table to other member(s), but instead a new -coordinator fetches information about held locks and pending lock/unlock requests from existing members, before it -starts processing lock requests. See <> for details. - -${CENTRAL_LOCK2} - - [[COUNTER]] diff --git a/src/org/jgroups/Event.java b/src/org/jgroups/Event.java index 8bc995c03da..9b028b5df95 100644 --- a/src/org/jgroups/Event.java +++ b/src/org/jgroups/Event.java @@ -41,11 +41,6 @@ public class Event { public static final int ADD_PHYSICAL_ADDRESS = 89; // arg = Tuple --> boolean public static final int REMOVE_ADDRESS = 90; // arg = Address public static final int GET_LOCAL_ADDRESS = 91; // arg = null --> UUID (local_addr) - public static final int LOCK = 95; // arg = LockInfo - public static final int UNLOCK = 96; // arg = LockInfo - public static final int UNLOCK_ALL = 97; // arg = null - public static final int LOCK_AWAIT = 98; // arg = LockInfo - public static final int LOCK_SIGNAL = 99; // arg = AwaitInfo public static final int IS_MERGE_IN_PROGRESS = 100; // returns true or false public static final int GET_PHYSICAL_ADDRESSES = 102; // arg = null (returns all physical addresses) public static final int SITE_UNREACHABLE = 104; // arg = SiteMaster (RELAY2/RELAY3) @@ -56,7 +51,6 @@ public class Event { public static final int GET_PING_DATA = 109; // arg = cluster_name public static final int GET_SECRET_KEY = 111; // arg = null -> Tuple // PK+version public static final int SET_SECRET_KEY = 112; // arg = Tuple // PK+version - public static final int UNLOCK_FORCE = 113; // arg = lock name public static final int INSTALL_MERGE_VIEW = 114; // arg = MergeView public static final int IS_LOCAL_SITEMASTER = 115; // arg = SiteMaster(site), returns true / false public static final int IS_LOCAL = 116; // arg = SiteAddress(site), returns true / false @@ -124,11 +118,6 @@ public static String type2String(int t) { case ADD_PHYSICAL_ADDRESS: return "ADD_PHYSICAL_ADDRESS"; case REMOVE_ADDRESS: return "REMOVE_ADDRESS"; case GET_LOCAL_ADDRESS: return "GET_LOCAL_ADDRESS"; - case LOCK: return "LOCK"; - case UNLOCK: return "UNLOCK"; - case UNLOCK_ALL: return "UNLOCK_ALL"; - case LOCK_AWAIT: return "LOCK_AWAIT"; - case LOCK_SIGNAL: return "LOCK_SIGNAL"; case IS_MERGE_IN_PROGRESS: return "IS_MERGE_IN_PROGRESS"; case GET_PHYSICAL_ADDRESSES: return "GET_PHYSICAL_ADDRESSES"; case SITE_UNREACHABLE: return "SITE_UNREACHABLE"; @@ -139,7 +128,6 @@ public static String type2String(int t) { case GET_PING_DATA: return "GET_PING_DATA"; case GET_SECRET_KEY: return "GET_SECRET_KEY"; case SET_SECRET_KEY: return "SET_SECRET_KEY"; - case UNLOCK_FORCE: return "UNLOCK_FORCE"; case INSTALL_MERGE_VIEW: return "INSTALL_MERGE_VIEW"; case IS_LOCAL_SITEMASTER: return "IS_LOCAL_SITEMASTER"; case IS_LOCAL: return "IS_LOCAL"; diff --git a/src/org/jgroups/blocks/locking/AwaitInfo.java b/src/org/jgroups/blocks/locking/AwaitInfo.java deleted file mode 100644 index 10f073a1d40..00000000000 --- a/src/org/jgroups/blocks/locking/AwaitInfo.java +++ /dev/null @@ -1,30 +0,0 @@ -package org.jgroups.blocks.locking; - - -public class AwaitInfo { - protected final String name; - protected final boolean all; - - public AwaitInfo(String name, boolean all) { - this.name=name; - this.all=all; - } - - /** - * @return Returns the name. - */ - public String getName() { - return name; - } - - /** - * @return Returns whether is all. - */ - public boolean isAll() { - return all; - } - - public String toString() { - return name + ", awaitAll=" + all; - } -} diff --git a/src/org/jgroups/blocks/locking/LockInfo.java b/src/org/jgroups/blocks/locking/LockInfo.java deleted file mode 100644 index bd1f24a76f8..00000000000 --- a/src/org/jgroups/blocks/locking/LockInfo.java +++ /dev/null @@ -1,55 +0,0 @@ -package org.jgroups.blocks.locking; - -import java.util.concurrent.TimeUnit; - -/** - * @author Bela Ban - */ -public class LockInfo { - protected final String name; - protected final boolean is_trylock; - protected final boolean lock_interruptibly; - protected final boolean use_timeout; - protected final long timeout; - protected final TimeUnit time_unit; - - public LockInfo(String name, boolean is_trylock, boolean lock_interruptibly, boolean use_timeout, - long timeout, TimeUnit time_unit) { - this.name=name; - this.is_trylock=is_trylock; - this.lock_interruptibly=lock_interruptibly; - this.use_timeout=use_timeout; - this.timeout=timeout; - this.time_unit=time_unit; - } - - - public boolean isTrylock() { - return is_trylock; - } - - public boolean isLockInterruptibly() { - return lock_interruptibly; - } - - public boolean isUseTimeout() { - return use_timeout; - } - - public String getName() { - return name; - } - - public long getTimeout() { - return timeout; - } - - public TimeUnit getTimeUnit() { - return time_unit; - } - - public String toString() { - return name + ", trylock=" + is_trylock + ", timeout=" + timeout; - } -} - diff --git a/src/org/jgroups/blocks/locking/LockNotification.java b/src/org/jgroups/blocks/locking/LockNotification.java deleted file mode 100644 index dabcf13ecf1..00000000000 --- a/src/org/jgroups/blocks/locking/LockNotification.java +++ /dev/null @@ -1,16 +0,0 @@ -package org.jgroups.blocks.locking; - -import org.jgroups.util.Owner; - -/** - * @author Bela Ban - */ -public interface LockNotification { - void lockCreated(String name); - void lockDeleted(String name); - void lockRevoked(String lock_name, Owner current_owner); - void locked(String lock_name, Owner owner); - void unlocked(String lock_name, Owner owner); - void awaiting(String lock_name, Owner owner); - void awaited(String lock_name, Owner owner); -} diff --git a/src/org/jgroups/blocks/locking/LockService.java b/src/org/jgroups/blocks/locking/LockService.java deleted file mode 100644 index b8f9a048af3..00000000000 --- a/src/org/jgroups/blocks/locking/LockService.java +++ /dev/null @@ -1,251 +0,0 @@ -package org.jgroups.blocks.locking; - -import org.jgroups.Event; -import org.jgroups.JChannel; -import org.jgroups.protocols.Locking; - -import java.util.Date; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; - -/** - * LockService is the main class for to use for distributed locking functionality. LockService needs access to a - * {@link JChannel} and interacts with a locking protocol (e.g. {@link org.jgroups.protocols.CENTRAL_LOCK}) via events.

- * When no locking protocol is seen on the channel's stack, LockService will throw an exception at startup. An example - * of using LockService is: - *

-   JChannel ch=new JChannel("/home/bela/locking.xml); // locking.xml needs to have a locking protocol towards the top
-   LockService lock_service=new LockService(ch);
-   ch.connect("lock-cluster");
-   Lock lock=lock_service.getLock("mylock");
-   lock.lock();
-   try {
-      // do something with the lock acquired
-   }
-   finally {
-      lock.unlock();
-   }
- * 
- *

- * The exact semantics of this lock implemantation are defined in {@link LockImpl}. - *

- * Note that, contrary to the semantics of {@link java.util.concurrent.locks.Lock}, unlock() can be called multiple - * times; after a lock has been released, future calls to unlock() have no effect. - * @author Bela Ban - * @since 2.12 - * @deprecated See http://belaban.blogspot.com/2020/11/i-hate-distributed-locks.html. - */ -@Deprecated -public class LockService { - protected JChannel ch; - protected Locking lock_prot; - - - public LockService() { - - } - - public LockService(JChannel ch) { - setChannel(ch); - } - - public void setChannel(JChannel ch) { - this.ch=ch; - lock_prot=ch.getProtocolStack().findProtocol(Locking.class); - if(lock_prot == null) - throw new IllegalStateException("Channel configuration must include a locking protocol " + - "(subclass of " + Locking.class.getName() + ")"); - } - - public Lock getLock(String lock_name) { - return new LockImpl(lock_name); - } - - public void unlockAll() { - ch.down(new Event(Event.UNLOCK_ALL)); - } - - public void unlockForce(String lock_name) { - ch.down(new Event(Event.UNLOCK_FORCE, lock_name)); - } - - public void addLockListener(LockNotification listener) { - lock_prot.addLockListener(listener); - } - - public void removeLockListener(LockNotification listener) { - lock_prot.removeLockListener(listener); - } - - public String printLocks() { - return lock_prot.printLocks(); - } - - - /** - * Implementation of {@link Lock}. This is a client stub communicates with a server equivalent. The semantics are - * more or less those of {@link Lock}, but may differ slightly.

- * There is no reference counting of lock owners, so acquisition of a lock already held by a thread is a no-op. - * Also, releasing the lock after it was already released is a no-op as well. - *

- * An exact description is provided below. - */ - protected class LockImpl implements Lock { - protected final String name; - protected final AtomicReference holder=new AtomicReference<>(); - - public LockImpl(String name) { - this.name=name; - } - - /** - * {@inheritDoc} - * Blocks until the lock has been acquired. Masks interrupts; if an interrupt was received on entry or while - * waiting for the lock acquisition, it won't cause the call to return. However, the thread's status will be - * set to interrupted when the call returns. - */ - @Override - public void lock() { - ch.down(new Event(Event.LOCK, new LockInfo(name, false, false, false, 0, TimeUnit.MILLISECONDS))); - holder.set(Thread.currentThread()); - } - - /** - * {@inheritDoc} - * If the thread is interrupted at entry, the call will throw an InterruptedException immediately and the lock - * won't be acquired. If the thread is interrupted while waiting for the lock acquition, an InterruptedException - * will also be thrown immediately. The thread's interrupt status will not be set after the call returns. - * @throws InterruptedException - */ - public void lockInterruptibly() throws InterruptedException { - ch.down(new Event(Event.LOCK, new LockInfo(name, false, true, false, 0, TimeUnit.MILLISECONDS))); - Thread currentThread = Thread.currentThread(); - if(currentThread.isInterrupted()) - throw new InterruptedException(); - else - holder.set(currentThread); - } - - /** - * {@inheritDoc} - * If the thread is interrupted at entry or during the call, no InterruptedException will be thrown, but the - * thread's status will be set to interrupted upon return. An interrupt has therefore no impact on the - * return value (success or failure). - */ - public boolean tryLock() { - Boolean retval=(Boolean)ch.down(new Event(Event.LOCK, new LockInfo(name, true, false, false, 0, TimeUnit.MILLISECONDS))); - if(retval != null && retval) - holder.set(Thread.currentThread()); - return retval == null ? false : retval; - } - - /** - * {@inheritDoc} - * * If the thread is interrupted at entry, the call will throw an InterruptedException immediately and the lock - * won't be acquired. If the thread is interrupted while waiting for the lock acquition, an InterruptedException - * will also be thrown immediately. The thread's interrupt status will not be set after the call returns. - * @param time - * @param unit - * @return - * @throws InterruptedException - */ - public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { - Boolean retval=(Boolean)ch.down(new Event(Event.LOCK, new LockInfo(name, true, true, true, time, unit))); - if(Thread.currentThread().isInterrupted()) - throw new InterruptedException(); - if(retval != null && retval) - holder.set(Thread.currentThread()); - return retval == null ? false : retval; - } - - /** - * {@inheritDoc} - * Releases a lock. Contrary to the parent's implementation, this method can be called more than once: - * the release of a lock that has already been released, or is not owned by this thread is a no-op. - */ - public void unlock() { - ch.down(new Event(Event.UNLOCK, new LockInfo(name, false, false, false, 0, TimeUnit.MILLISECONDS))); - holder.compareAndSet(Thread.currentThread(), null); // only set if the current thread is actually the holder - } - - /** - * This condition object is only allowed to work 1 for each lock. - * If more than 1 condition is created for this lock, they both will - * be awaiting/signalling on the same lock - */ - public Condition newCondition() { - return new ConditionImpl(name, holder); - } - - public String toString() { - return name + (holder.get() == null? " [unlocked]" : " [held by " + holder.get() + "]"); - } - } - - private class ConditionImpl implements Condition { - protected final String name; - protected final AtomicReference holder; - - public ConditionImpl(String name, AtomicReference holder) { - this.name=name; - this.holder=holder; - } - - @Override - public void await() throws InterruptedException { - ch.down(new Event(Event.LOCK_AWAIT, new LockInfo(name, false, - true, false, 0, TimeUnit.MILLISECONDS))); - if(Thread.currentThread().isInterrupted()) - throw new InterruptedException(); - } - - @Override - public void awaitUninterruptibly() { - ch.down(new Event(Event.LOCK_AWAIT, new LockInfo(name, false, - false, false, 0, TimeUnit.MILLISECONDS))); - } - - @Override - public long awaitNanos(long nanosTimeout) throws InterruptedException { - Long waitLeft = (Long)ch.down(new Event(Event.LOCK_AWAIT, - new LockInfo(name, false, true, true, nanosTimeout, - TimeUnit.NANOSECONDS))); - if(Thread.currentThread().isInterrupted()) - throw new InterruptedException(); - return waitLeft; - } - - @Override - public boolean await(long time, TimeUnit unit) - throws InterruptedException { - return awaitNanos(unit.toNanos(time)) > 0; - } - - @Override - public boolean awaitUntil(Date deadline) throws InterruptedException { - long waitUntilTime = deadline.getTime(); - long currentTime = System.currentTimeMillis(); - - long waitTime = waitUntilTime - currentTime; - return waitTime > 0 && await(waitTime, TimeUnit.MILLISECONDS); - } - - @Override - public void signal() { - if (holder.get() != Thread.currentThread()) { - throw new IllegalMonitorStateException(); - } - ch.down(new Event(Event.LOCK_SIGNAL, new AwaitInfo(name, false))); - } - - @Override - public void signalAll() { - if (holder.get() != Thread.currentThread()) { - throw new IllegalMonitorStateException(); - } - ch.down(new Event(Event.LOCK_SIGNAL, new AwaitInfo(name, true))); - } - } -} diff --git a/src/org/jgroups/demos/LockServiceDemo.java b/src/org/jgroups/demos/LockServiceDemo.java deleted file mode 100644 index 072b9b2977b..00000000000 --- a/src/org/jgroups/demos/LockServiceDemo.java +++ /dev/null @@ -1,218 +0,0 @@ -package org.jgroups.demos; - -import org.jgroups.JChannel; -import org.jgroups.blocks.locking.LockNotification; -import org.jgroups.blocks.locking.LockService; -import org.jgroups.jmx.JmxConfigurator; -import org.jgroups.util.Owner; -import org.jgroups.util.Util; - -import java.util.ArrayList; -import java.util.List; -import java.util.StringTokenizer; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Lock; - -/** - * Demos the LockService - */ -public class LockServiceDemo implements LockNotification { - protected String props; - protected JChannel ch; - protected LockService lock_service; - protected String name; - - public LockServiceDemo(String props, String name) { - this.props=props; - this.name=name; - } - - public void start() throws Exception { - - try { - ch=new JChannel(props); - if(name != null) - ch.setName(name); - lock_service=new LockService(ch); - lock_service.addLockListener(this); - ch.connect("lock-cluster"); - JmxConfigurator.registerChannel(ch, Util.getMBeanServer(), "lock-service", ch.getClusterName(), true); - loop(); - } - catch(Exception e) { - e.printStackTrace(); - } - finally { - Util.close(ch); - } - } - - public void start(JChannel ch) throws Exception { - this.ch=ch; - lock_service=new LockService(ch); - lock_service.addLockListener(this); - ch.connect("lock-cluster"); - JmxConfigurator.registerChannel(ch, Util.getMBeanServer(), "lock-service", ch.getClusterName(), true); - - try { - loop(); - } - catch(Exception e) { - e.printStackTrace(); - } - finally { - Util.close(ch); - } - } - - public void lockCreated(String lock_name) { - System.out.println("server lock \"" + lock_name + "\" created"); - } - - public void lockDeleted(String name) { - } - - public void lockRevoked(String lock_name, Owner current_owner) { - System.out.printf("\"%s\" has been revoked (existing owner is %s); releasing lock\n", lock_name, current_owner); - Lock lock=lock_service.getLock(lock_name); - lock.unlock(); - } - - public void locked(String lock_name, Owner owner) { - System.out.println("\"" + lock_name + "\" locked by " + owner); - } - - public void unlocked(String lock_name, Owner owner) { - System.out.println("\"" + lock_name + "\" unlocked by " + owner); - } - - public void awaiting(String lock_name, Owner owner) { - System.out.println("awaiting \"" + lock_name + "\" by " + owner); - } - - public void awaited(String lock_name, Owner owner) { - System.out.println("awaited \"" + lock_name + "\" by " + owner); - } - - protected void loop() throws Exception { - List lock_names; - while(ch.isConnected()) { - String line=Util.readStringFromStdin(": "); - if(line == null || line.startsWith("quit") || line.startsWith("exit")) - break; - - if(line.startsWith("lock")) { - lock_names=parseLockNames(line.substring("lock".length()).trim()); - for(String lock_name: lock_names) { - Lock lock=lock_service.getLock(lock_name); - lock.lock(); - } - } - else if(line.startsWith("trylock")) { - lock_names=parseLockNames(line.substring("trylock".length()).trim()); - - String tmp=lock_names.get(lock_names.size() -1); - long timeout=-1; - try { - timeout=Long.parseLong(tmp); - lock_names.remove(lock_names.size() -1); - } - catch(NumberFormatException e) { - } - - for(String lock_name: lock_names) { - Lock lock=lock_service.getLock(lock_name); - boolean rc; - if(timeout < 0) - rc=lock.tryLock(); - else - rc=lock.tryLock(timeout, TimeUnit.MILLISECONDS); - if(!rc) - System.err.println("Failed locking \"" + lock_name + "\""); - } - } - else if(line.startsWith("multilock")) { // multilock X 10 - List tokens=parseLockNames(line.substring("multilock".length()).trim()); - if(tokens == null || tokens.size() != 2) { - help(); - break; - } - String lock_name=tokens.get(0); - int num_iterations=Integer.valueOf(tokens.get(1)); - Lock lock=lock_service.getLock(lock_name); - for(int i=0; i < num_iterations; i++) { - lock.lock(); - lock.unlock(); - } - } - else if(line.startsWith("unlock")) { - lock_names=parseLockNames(line.substring("unlock".length()).trim()); - for(String lock_name: lock_names) { - if(lock_name.equalsIgnoreCase("all")) { - lock_service.unlockAll(); - break; - } - else { - Lock lock=lock_service.getLock(lock_name); - if(lock != null) - lock.unlock(); - } - } - } - else if(line.startsWith("view")) - System.out.println("View: " + ch.getView()); - else if(line.startsWith("help")) - help(); - printLocks(); - } - } - - protected static List parseLockNames(String line) { - List lock_names=new ArrayList<>(); - if(line == null || line.isEmpty()) - return lock_names; - StringTokenizer tokenizer=new StringTokenizer(line); - while(tokenizer.hasMoreTokens()) - lock_names.add(tokenizer.nextToken()); - return lock_names; - } - - protected void printLocks() { - System.out.println("\n" + lock_service.printLocks()); - } - - - - public static void main(String[] args) throws Exception { - String props=null; - String name=null; - - for(int i=0; i < args.length; i++) { - if(args[i].equals("-props")) { - props=args[++i]; - continue; - } - if(args[i].equals("-name")) { - name=args[++i]; - continue; - } - - help(); - return; - } - - LockServiceDemo demo=new LockServiceDemo(props, name); - demo.start(); - } - - protected static void help() { - System.out.println("\nLockServiceDemo [-props properties] [-name name]\n" + - "Valid commands:\n\n" + - "lock ()+\n" + - "unlock ( | \"ALL\")+\n" + - "trylock ()+ []\n" + - "multilock \n"); - System.out.println("Example:\nlock lock lock2 lock3\nunlock all\ntrylock bela michelle 300"); - } - -} diff --git a/src/org/jgroups/protocols/CENTRAL_LOCK.java b/src/org/jgroups/protocols/CENTRAL_LOCK.java deleted file mode 100644 index e0081efb185..00000000000 --- a/src/org/jgroups/protocols/CENTRAL_LOCK.java +++ /dev/null @@ -1,211 +0,0 @@ -package org.jgroups.protocols; - -import org.jgroups.Address; -import org.jgroups.View; -import org.jgroups.annotations.ManagedAttribute; -import org.jgroups.annotations.Property; -import org.jgroups.blocks.locking.LockNotification; -import org.jgroups.util.Owner; -import org.jgroups.util.Util; - -import java.util.*; - - -/** - * Implementation of a locking protocol which acquires locks by contacting the coordinator.

Because the - * coordinator maintains all locks, no total order configuration is required.

- * CENTRAL_LOCK has all members send lock and unlock requests to a central coordinator. The coordinator has a queue for - * incoming requests, and grants locks based on order of arrival. To prevent all acquired locks from being forgotten - * when the coordinator crashes, setting num_backups lets the coordinator backup lock information to a number of - * backup nodes. Valid values for num_backups are 0 (no backup) to N-1, e.g. in a cluster of 4, we can have only 3 backup - * nodes.

- * Say we have a cluster of {A,B,C,D,E} and num_backups=1. A is the coordinator, and A updates all locks (and released - * locks) in B as well. When A crashes, everybody falls over to B for sending lock and unlock requests. - * B in turn copies all existing locks over to C and - when locks are acquired or released - forwards this - * information to C as well. - * @author Bela Ban - * @since 2.12 - * @see Locking - * @deprecated See http://belaban.blogspot.com/2020/11/i-hate-distributed-locks.html. - */ -@Deprecated -public class CENTRAL_LOCK extends Locking implements LockNotification { - - @Property(description="Number of backups to the coordinator. Server locks get replicated to these nodes as well") - protected int num_backups=1; - - - protected Address coord; - - @ManagedAttribute - protected boolean is_coord; - - protected final List
backups=new ArrayList<>(); - - - public CENTRAL_LOCK() { - super(); - addLockListener(this); - } - - protected Owner getOwner() { - return use_thread_id_for_lock_owner? super.getOwner(): new Owner(local_addr, -1); - } - - public Address getCoord() { - return coord; - } - - public boolean isCoord() { - return is_coord; - } - - @ManagedAttribute - public String getCoordinator() { - return coord != null? coord.toString() : "n/a"; - } - - public int getNumberOfBackups() { - return num_backups; - } - - public CENTRAL_LOCK setNumberOfBackups(int num_backups) { - this.num_backups=num_backups; return this; - } - - @ManagedAttribute - public String getBackups() { - return backups != null? backups.toString() : null; - } - - protected void sendGrantLockRequest(String lock_name, int lock_id, Owner owner, long timeout, boolean is_trylock) { - Address dest=coord; - if(dest == null) - throw new IllegalStateException("No coordinator available, cannot send GRANT-LOCK request"); - sendRequest(dest, Type.GRANT_LOCK, lock_name, lock_id, owner, timeout, is_trylock); - } - - protected void sendReleaseLockRequest(String lock_name, int lock_id, Owner owner) { - Address dest=coord; - if(dest == null) - throw new IllegalStateException("No coordinator available, cannot send RELEASE-LOCK request"); - sendRequest(dest, Type.RELEASE_LOCK, lock_name, lock_id, owner, 0, false); - } - - protected void sendCreateLockRequest(Address dest, String lock_name, Owner owner) { - sendRequest(dest, Type.CREATE_LOCK, lock_name, owner, 0, false); - } - - protected void sendDeleteLockRequest(Address dest, String lock_name) { - sendRequest(dest, Type.DELETE_LOCK, lock_name, null, 0, false); - } - - @Override - protected void sendAwaitConditionRequest(String lock_name, Owner owner) { - sendRequest(coord, Type.LOCK_AWAIT, lock_name, owner, 0, false); - } - - @Override - protected void sendSignalConditionRequest(String lock_name, boolean all) { - sendRequest(coord, all ? Type.COND_SIG_ALL : Type.COND_SIG, lock_name, null, 0, false); - } - - @Override - protected void sendDeleteAwaitConditionRequest(String lock_name, Owner owner) { - sendRequest(coord, Type.DELETE_LOCK_AWAIT, lock_name, owner, 0, false); - } - - public void handleView(View view) { - super.handleView(view); - Address old_coord=coord; - if(view.size() > 0) { - coord=view.getCoord(); - is_coord=coord.equals(local_addr); - log.debug("[%s] coord=%s, is_coord=%b", local_addr, coord, is_coord); - } - - if(is_coord && num_backups > 0) { - List
new_backups=Util.pickNext(view.getMembers(), local_addr, num_backups); - List
copy_locks_list=null; - synchronized(backups) { - if(!backups.equals(new_backups)) { - copy_locks_list=new ArrayList<>(new_backups); - copy_locks_list.removeAll(backups); - backups.clear(); - backups.addAll(new_backups); - } - } - - if(copy_locks_list != null && !copy_locks_list.isEmpty()) - copyLocksTo(copy_locks_list); - } - - // For all non-acquired client locks, send the GRANT_LOCK request to the new coordinator (if changed) - if(old_coord != null && !old_coord.equals(coord)) - client_lock_table.resendPendingLockRequests(); - } - - public void lockCreated(String name) { - } - - public void lockDeleted(String name) { - } - - public void locked(String lock_name, Owner owner) { - if(is_coord) - updateBackups(Type.CREATE_LOCK, lock_name, owner); - } - - public void unlocked(String lock_name, Owner owner) { - if(is_coord) - updateBackups(Type.DELETE_LOCK, lock_name, owner); - } - - public void lockRevoked(String lock_name, Owner current_owner) { - log.warn("%s: lock %s has been revoked; the existing owner is %s", local_addr, lock_name, current_owner); - } - - public void awaiting(String lock_name, Owner owner) { - if(is_coord) - updateBackups(Type.CREATE_AWAITER, lock_name, owner); - } - - public void awaited(String lock_name, Owner owner) { - if(is_coord) - updateBackups(Type.DELETE_AWAITER, lock_name, owner); - } - - protected void updateBackups(Type type, String lock_name, Owner owner) { - synchronized(backups) { - for(Address backup: backups) - sendRequest(backup, type, lock_name, owner, 0, false); - } - } - - - - protected void copyLocksTo(List
new_joiners) { - Map copy; - - synchronized(server_locks) { - copy=new HashMap<>(server_locks); - } - - log.trace("[%s] copying locks to %s", local_addr, new_joiners); - for(Map.Entry entry: copy.entrySet()) { - for(Address joiner: new_joiners) { - ServerLock lock = entry.getValue(); - if (lock.owner != null) { - sendCreateLockRequest(joiner, entry.getKey(), entry.getValue().owner); - } - synchronized (lock.condition) { - Queue queue = lock.condition.queue; - for (Owner owner : queue) { - sendAwaitConditionRequest(lock.lock_name, owner); - } - } - } - } - } -} - diff --git a/src/org/jgroups/protocols/CENTRAL_LOCK2.java b/src/org/jgroups/protocols/CENTRAL_LOCK2.java deleted file mode 100644 index 46895ef92ba..00000000000 --- a/src/org/jgroups/protocols/CENTRAL_LOCK2.java +++ /dev/null @@ -1,300 +0,0 @@ -package org.jgroups.protocols; - -import org.jgroups.*; -import org.jgroups.annotations.ManagedAttribute; -import org.jgroups.annotations.ManagedOperation; -import org.jgroups.annotations.Property; -import org.jgroups.conf.AttributeType; -import org.jgroups.util.*; - -import java.util.Collection; -import java.util.List; -import java.util.Objects; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.stream.Stream; - - -/** - * Implementation of a locking protocol which acquires locks by asking the coordinator.
- * Because the coordinator maintains all locks, no total ordering of requests is required.
- * CENTRAL_LOCK2 has all members send lock and unlock requests to the current coordinator. The coordinator has a queue - * for incoming requests, and grants locks based on order of arrival.
- * Contrary to {@link CENTRAL_LOCK}, CENTRAL_LOCK2 has no members who act as backups for lock information. Instead, - * when the coord leaves or on a merge, the new coordinator runs a reconciliation protocol in which it fetches - * information from all members about acquired locks and pending lock and unlock requests, and then creates its lock - * table accordingly. During this phase, all regular request handling is paused.
- * This protocol requires less traffic than {@link CENTRAL_LOCK} (each request also has to be sent to the backup(s)), - * but introduces communication between the new coord and all members (and thus a small pause) on coord change. - *
- * The JIRA issue is https://issues.redhat.com/browse/JGRP-2249. - * @author Bela Ban - * @since 4.0.13 - * @see Locking - * @see CENTRAL_LOCK - * @deprecated See http://belaban.blogspot.com/2020/11/i-hate-distributed-locks.html. - */ -@Deprecated -public class CENTRAL_LOCK2 extends Locking { - @Property(description="Max time (im ms) to wait for lock info responses from members in a lock reconciliation phase", - type=AttributeType.TIME) - protected long lock_reconciliation_timeout=10_000; - - protected Address coord; - - // collect information about held locks and pending lock requests from all members during a reconciliation round - protected final ResponseCollector lock_info_responses=new ResponseCollector<>(); - - // Queue to hold requests, typically only at the coordinator. Processed by RequestHandler - protected final BlockingQueue req_queue=new LinkedBlockingQueue<>(); - - // Thread which processes requests in req-queue (running only on coord) - protected final Runner req_handler; - - - - public CENTRAL_LOCK2() { - req_handler=new Runner(new DefaultThreadFactory("lock-handler", true, true), - "lock-handler", this::processQueue, req_queue::clear); - } - - @ManagedAttribute public boolean isCoord() {return Objects.equals(local_addr, coord);} - @ManagedAttribute public String getCoordinator() {return coord != null? coord.toString() : "n/a";} - @ManagedAttribute public boolean isRequestHandlerRunning() {return req_handler.isRunning();} - @ManagedAttribute public int requestQueueSize() {return req_queue.size();} - - public void stop() { - super.stop(); - req_handler.stop(); - } - - @Override - public void handleView(View v) { - Address old_coord=this.view != null? this.view.getCoord() : null; - super.handleView(v); - if(v.size() > 0) { - coord=v.getCoord(); - log.debug("%s: coord=%s, is_coord=%b", local_addr, coord, isCoord()); - } - - if(Objects.equals(local_addr, coord)) { - if(v instanceof MergeView || !Objects.equals(local_addr, old_coord)) { - // I'm the new coord: run reconciliation to find all existing locks (and pending lock/unlock requests) - runReconciliation(); - req_handler.start(); - } - } - else { - if(Objects.equals(local_addr, old_coord)) { - log.debug("%s: not coordinator anymore; stopping the request handler", local_addr); - req_handler.stop(); // clears the req-queue - server_locks.clear(); - } - } - } - - @Override - protected void requestReceived(Request req) { - if(req == null) return; - switch(req.type) { - - // requests to be handled by the coord: - case GRANT_LOCK: - case RELEASE_LOCK: - case CREATE_LOCK: - case DELETE_LOCK: - case COND_SIG: - case COND_SIG_ALL: - case LOCK_AWAIT: - case DELETE_LOCK_AWAIT: - case CREATE_AWAITER: - case DELETE_AWAITER: - req_queue.add(req); - break; - - // requests/responses to be handled by clients - case LOCK_GRANTED: - case RELEASE_LOCK_OK: - case LOCK_DENIED: - case SIG_RET: - case LOCK_INFO_REQ: - case LOCK_INFO_RSP: - case LOCK_REVOKED: - if(log.isTraceEnabled()) - log.trace("%s <-- %s: %s", local_addr, req.sender, req); - handleRequest(req); - break; - - default: - log.error("%s: request of type %s not known", local_addr, req.type); - break; - } - } - - protected void processQueue() { - Request req=null; - try { - req=req_queue.take(); - } - catch(InterruptedException ignore) { - } - try { - if(req != null && log.isTraceEnabled()) - log.trace("%s <-- %s: %s", local_addr, req.sender, req); - handleRequest(req); - } - catch(Throwable t) { - log.error("%s: failed handling request %s: %s", local_addr, req, t); - } - } - - protected void handleLockInfoRequest(Address requester) { - if(requester != null && !Objects.equals(coord, requester)) { - log.trace("%s: changed coord from %s to %s as a result of getting a LOCK_INFO_REQ", - local_addr, coord, requester); - coord=requester; - } - LockInfoResponse response=createLockInfoResponse(); - if(log.isTraceEnabled()) - log.trace("%s --> %s LOCK-INFO-RSP:\n%s", local_addr, requester, response.printDetails()); - send(requester, new Request(Type.LOCK_INFO_RSP).infoRsp(response)); - } - - @Override - protected void handleLockInfoResponse(Address sender, Request rsp) { - lock_info_responses.add(sender, rsp.info_rsp); - } - - @Override - protected void handleLockRevoked(Request rsp) { - notifyLockRevoked(rsp.lock_name, rsp.owner); - } - - /** Grabs information about locks held and pending lock/unlock requests from all members */ - @ManagedOperation(description="Runs the reconciliation protocol to fetch information about owned locks and pending " + - "lock/unlock requests from each member to establish the server lock table. Only run by a coordinator.") - public void runReconciliation() { - if(!isCoord()) { - log.warn("%s: reconciliation protocol is not run as I'm not the coordinator (%s is)", - local_addr, getCoordinator()); - return; - } - Request lock_info_req=new Request(Type.LOCK_INFO_REQ); - Address[] mbrs=view.getMembersRaw(); - log.debug("%s: running reconciliation protocol on %d members", local_addr, mbrs != null? mbrs.length : 0); - lock_info_responses.reset(mbrs); - lock_info_responses.add(local_addr, createLockInfoResponse()); - log.trace("%s --> ALL: %s", local_addr, lock_info_req); - - // we cannot use a multicast as this may happen as a result of a MergeView and not everybody may have the view yet - sendLockInfoRequestTo(lock_info_req, mbrs, local_addr); - if(!lock_info_responses.waitForAllResponses(lock_reconciliation_timeout)) { - List
missing=lock_info_responses.getMissing(); - log.warn("%s: failed getting lock information from all members, missing responses: %d (from %s)", - local_addr, missing.size(), missing); - } - - // 1. Add all existing locks to the server lock table - Collection responses=lock_info_responses.getResults().values(); - responses.stream().filter(rsp -> rsp != null && rsp.existing_locks != null) - .map(rsp -> rsp.existing_locks).flatMap(Collection::stream) - .forEach(t -> { - String lock_name=t.getVal1(); - Owner owner=t.getVal2(); - ServerLock srv_lock=new ServerLock(lock_name, owner); - ServerLock ret=server_locks.putIfAbsent(lock_name, srv_lock); - if(ret != null) { - if(!Objects.equals(owner, ret.owner)) { - log.warn("%s: lock %s requested by %s is already present: %s", local_addr, lock_name, owner, ret); - send(owner.getAddress(), new Request(Type.LOCK_REVOKED, lock_name, ret.owner, 0)); - } - } - else { - notifyLockCreated(lock_name); - log.trace("%s: added lock %s", local_addr, lock_name); - } - }); - - // 2. Process all pending requests - responses.stream().filter(rsp -> rsp != null && rsp.pending_requests != null && !rsp.pending_requests.isEmpty()) - .map(rsp -> rsp.pending_requests).flatMap(Collection::stream) - .forEach(req -> { - try { - if(log.isTraceEnabled()) - log.trace("%s: processing request %s", local_addr, req); - handleRequest(req); - } - catch(Throwable t) { - log.error("%s: failed handling request %s: %s", local_addr, req, t); - } - }); - } - - - protected void sendLockInfoRequestTo(Request req, Address[] mbrs, Address exclude) { - Stream.of(mbrs).filter(m -> m != null && !Objects.equals(m, exclude)).forEach(dest -> { - try { - Message msg=new BytesMessage(dest, Util.streamableToBuffer(req)).putHeader(id, new LockingHeader()); - if(bypass_bundling) - msg.setFlag(Message.Flag.DONT_BUNDLE); - down_prot.down(msg); - } - catch(Throwable t) { - log.error("%s: failed sending LOCK_INFO_REQ to %s: %s", local_addr, dest, t); - } - }); - } - - - protected Owner getOwner() { - return use_thread_id_for_lock_owner? super.getOwner(): new Owner(local_addr, -1); - } - - protected void sendGrantLockRequest(String lock_name, int lock_id, Owner owner, long timeout, boolean is_trylock) { - Address dest=coord; - if(dest == null) - throw new IllegalStateException("No coordinator available, cannot send GRANT-LOCK request"); - sendRequest(dest, Type.GRANT_LOCK, lock_name, lock_id, owner, timeout, is_trylock); - } - - protected void sendReleaseLockRequest(String lock_name, int lock_id, Owner owner) { - Address dest=coord; - if(dest == null) - throw new IllegalStateException("No coordinator available, cannot send RELEASE-LOCK request"); - sendRequest(dest, Type.RELEASE_LOCK, lock_name, lock_id, owner, 0, false); - } - - @Override - protected void sendAwaitConditionRequest(String lock_name, Owner owner) { - sendRequest(coord, Type.LOCK_AWAIT, lock_name, owner, 0, false); - } - - @Override - protected void sendSignalConditionRequest(String lock_name, boolean all) { - sendRequest(coord, all ? Type.COND_SIG_ALL : Type.COND_SIG, lock_name, null, 0, false); - } - - @Override - protected void sendDeleteAwaitConditionRequest(String lock_name, Owner owner) { - sendRequest(coord, Type.DELETE_LOCK_AWAIT, lock_name, owner, 0, false); - } - - - protected LockInfoResponse createLockInfoResponse() { - LockInfoResponse rsp=new LockInfoResponse(); - List> locks=client_lock_table.getLockInfo(); // successfully acquired locks - for(Tuple t: locks) - rsp.add(t); - - List pending_reqs=client_lock_table.getPendingRequests(local_addr); // pending lock/unlock requests - if(pending_reqs != null && !pending_reqs.isEmpty()) - rsp.pending_requests=pending_reqs; - return rsp; - } - - - - - -} - diff --git a/src/org/jgroups/protocols/Locking.java b/src/org/jgroups/protocols/Locking.java deleted file mode 100644 index d7643fdfd1f..00000000000 --- a/src/org/jgroups/protocols/Locking.java +++ /dev/null @@ -1,1742 +0,0 @@ -package org.jgroups.protocols; - -import org.jgroups.*; -import org.jgroups.annotations.MBean; -import org.jgroups.annotations.ManagedAttribute; -import org.jgroups.annotations.ManagedOperation; -import org.jgroups.annotations.Property; -import org.jgroups.blocks.locking.AwaitInfo; -import org.jgroups.blocks.locking.LockInfo; -import org.jgroups.blocks.locking.LockNotification; -import org.jgroups.stack.Protocol; -import org.jgroups.util.*; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.util.*; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ConcurrentSkipListSet; -import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.LockSupport; -import java.util.concurrent.locks.ReentrantLock; -import java.util.function.Supplier; -import java.util.stream.Collectors; - - -/** - * Base locking protocol, handling most of the protocol communication with other instances. To use distributed locking, - * {@link org.jgroups.blocks.locking.LockService} is placed on a channel. LockService talks to a subclass of Locking - * via events. - * @author Bela Ban - * @since 2.12 - * @see CENTRAL_LOCK - * @see CENTRAL_LOCK2 - * @deprecated See http://belaban.blogspot.com/2020/11/i-hate-distributed-locks.html. - */ -@Deprecated -@MBean(description="Based class for locking functionality") -abstract public class Locking extends Protocol { - - @Property(description="bypasses message bundling if set") - protected boolean bypass_bundling=true; - - @Property(description="Number of locks to be used for lock striping (for synchronized access to the server_lock entries)") - protected int lock_striping_size=10; - - @Property(description="By default, a lock owner is address:thread-id. If false, we only use the node's address. " + - "See https://issues.redhat.com/browse/JGRP-1886 for details") - protected boolean use_thread_id_for_lock_owner=true; - - protected View view; - - // server side locks - protected final ConcurrentMap server_locks=Util.createConcurrentMap(20); - - // protected access to the same locks in server_locks - protected Lock[] lock_stripes; - - // client side locks - protected final ClientLockTable client_lock_table=new ClientLockTable(); - - protected final Set lock_listeners=new CopyOnWriteArraySet<>(); - - protected final static AtomicInteger current_lock_id=new AtomicInteger(1); - - - - public enum Type { - GRANT_LOCK, // request to acquire a lock - LOCK_GRANTED, // response to sender of GRANT_LOCK on succcessful lock acquisition - LOCK_DENIED, // response to sender of GRANT_LOCK on unsuccessful lock acquisition (e.g. on tryLock()) - RELEASE_LOCK, // request to release a lock - RELEASE_LOCK_OK, // response to RELEASE_LOCK request - CREATE_LOCK, // request to create a server lock (sent by coordinator to backups). Used by LockService - DELETE_LOCK, // request to delete a server lock (sent by coordinator to backups). Used by LockService - - LOCK_AWAIT, // request to await until condition is signaled - COND_SIG, // request to signal awaiting thread - COND_SIG_ALL, // request to signal all awaiting threads - SIG_RET, // response to alert of signal - DELETE_LOCK_AWAIT, // request to delete a waiter - CREATE_AWAITER, // request to create a server lock await (sent by coordinator to backups). Used by LockService - DELETE_AWAITER, // request to delete a server lock await (sent by coordinator to backups). Used by LockService - - LOCK_INFO_REQ, // request to get information about all acquired locks and all pending lock/unlock requests - LOCK_INFO_RSP, // response to LOCK_INFO_REQ - LOCK_REVOKED // sent on reconciliation when a lock is already present (possible on a merge when both sides hold the same lock) - } - - - - public Locking() { - } - - - public boolean bypassBundling() {return bypass_bundling;} - public Locking bypassBundling(boolean b) {this.bypass_bundling=b; return this;} - public int getLockStripingSize() {return lock_striping_size;} - public Locking setLockStripingSize(int l) {this.lock_striping_size=l; return this;} - public boolean useThreadIdForLockOwner() {return use_thread_id_for_lock_owner;} - public Locking useThreadIdForLockOwner(boolean u) {this.use_thread_id_for_lock_owner=u; return this;} - - - public void addLockListener(LockNotification listener) { - if(listener != null) - lock_listeners.add(listener); - } - - public void removeLockListener(LockNotification listener) { - if(listener != null) - lock_listeners.remove(listener); - } - - @ManagedAttribute - public String getView() { - return view != null? view.toString() : null; - } - - @ManagedAttribute(description="Number of server locks (only on coord)") - public int getNumServerLocks() {return server_locks.size();} - - @ManagedAttribute(description="Number of client locks") - public int getNumClientLocks() {return client_lock_table.numLocks();} - - public void init() throws Exception { - super.init(); - lock_stripes=new Lock[lock_striping_size]; - for(int i=0; i < lock_stripes.length; i++) - lock_stripes[i]=new ReentrantLock(); - } - - public Object down(Event evt) { - switch(evt.getType()) { - case Event.LOCK: - LockInfo info=evt.getArg(); - ClientLock lock=getLock(info.getName()); - if(!info.isTrylock()) { - if(info.isLockInterruptibly()) { - try { - lock.lockInterruptibly(); - } - catch(InterruptedException e) { - Thread.currentThread().interrupt(); // has to be checked by caller who has to rethrow ... - } - } - else - lock.lock(); - } - else { - if(info.isUseTimeout()) { - try { - return lock.tryLock(info.getTimeout(), info.getTimeUnit()); - } - catch(InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - else - return lock.tryLock(); - } - return null; - - - case Event.UNLOCK: - info=evt.getArg(); - lock=getLock(info.getName(), false); - if(lock != null) - lock.unlock(); - return null; - - case Event.UNLOCK_ALL: - unlockAll(); - return null; - - case Event.UNLOCK_FORCE: - unlockForce(evt.arg()); - break; - - - case Event.LOCK_AWAIT: - info=evt.getArg(); - lock=getLock(info.getName(), false); - if (lock == null || !lock.acquired) { - throw new IllegalMonitorStateException(); - } - Condition condition = lock.newCondition(); - if (info.isUseTimeout()) { - try { - return condition.awaitNanos(info.getTimeUnit().toNanos(info.getTimeout())); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - else if (info.isLockInterruptibly()) { - try { - condition.await(); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - else { - condition.awaitUninterruptibly(); - } - return null; - case Event.LOCK_SIGNAL: - AwaitInfo awaitInfo =evt.getArg(); - lock=getLock(awaitInfo.getName(), false); - if (lock == null || !lock.acquired) { - throw new IllegalMonitorStateException(); - } - sendSignalConditionRequest(awaitInfo.getName(), awaitInfo.isAll()); - return null; - - case Event.VIEW_CHANGE: - handleView(evt.getArg()); - break; - } - return down_prot.down(evt); - } - - public Object up(Event evt) { - switch(evt.getType()) { - case Event.VIEW_CHANGE: - handleView(evt.getArg()); - break; - } - return up_prot.up(evt); - } - - public Object up(Message msg) { - LockingHeader hdr=msg.getHeader(id); - if(hdr == null) - return up_prot.up(msg); - - Request req=null; - try { - req=Util.streamableFromBuffer(Request::new, msg.getArray(), msg.getOffset(), msg.getLength()) - .sender(msg.getSrc()); - } - catch(Exception ex) { - log.error("%s: failed deserializing request", local_addr, ex); - return null; - } - - if(req.type != Type.LOCK_INFO_REQ && req.type != Type.LOCK_INFO_RSP && req.type != Type.LOCK_REVOKED - && null != view && !view.containsMember(msg.getSrc())) { - log.error("%s: received request from '%s' but member is not present in the current view - ignoring request", - local_addr, msg.getSrc()); - return null; - } - requestReceived(req); - return null; - } - - protected void requestReceived(Request req) { - if(log.isTraceEnabled()) - log.trace("%s <-- %s: %s", local_addr, req.sender, req); - handleRequest(req); - } - - protected void handleRequest(Request req) { - if(req == null) return; - switch(req.type) { - case GRANT_LOCK: - case RELEASE_LOCK: - handleLockRequest(req); - break; - case LOCK_GRANTED: - handleLockGrantedResponse(req.lock_name, req.lock_id, req.owner); - break; - case RELEASE_LOCK_OK: - handleLockReleasedResponse(req.lock_name, req.lock_id, req.owner); - break; - case LOCK_DENIED: - handleLockDeniedResponse(req.lock_name, req.lock_id, req.owner); - break; - case CREATE_LOCK: - handleCreateLockRequest(req.lock_name, req.owner); - break; - case DELETE_LOCK: - handleDeleteLockRequest(req.lock_name); - break; - case COND_SIG: - case COND_SIG_ALL: - handleSignalRequest(req); - break; - case LOCK_AWAIT: - handleAwaitRequest(req.lock_name, req.owner); - handleLockRequest(req); - break; - case DELETE_LOCK_AWAIT: - handleDeleteAwaitRequest(req.lock_name, req.owner); - break; - case SIG_RET: - handleSignalResponse(req.lock_name, req.owner); - break; - case CREATE_AWAITER: - handleCreateAwaitingRequest(req.lock_name, req.owner); - break; - case DELETE_AWAITER: - handleDeleteAwaitingRequest(req.lock_name, req.owner); - break; - case LOCK_INFO_REQ: - handleLockInfoRequest(req.sender); - break; - case LOCK_INFO_RSP: - handleLockInfoResponse(req.sender, req); - break; - case LOCK_REVOKED: - handleLockRevoked(req); - break; - default: - log.error("%s: request of type %s not known", local_addr, req.type); - break; - } - } - - protected ClientLock getLock(String name) { - return client_lock_table.getLock(name,getOwner(),true); - } - - protected ClientLock getLock(String name, boolean create_if_absent) { - return client_lock_table.getLock(name,getOwner(),create_if_absent); - } - - @ManagedOperation(description="Unlocks all currently held locks") - public void unlockAll() { - client_lock_table.unlockAll(); - } - - @ManagedOperation(description="Forcefully removes the client lock") - public void unlockForce(String lock_name) { - client_lock_table.unlockForce(lock_name); - } - - - @ManagedOperation(description="Dumps all locks") - public String printLocks() { - StringBuilder sb=new StringBuilder(); - Collection values=server_locks.values(); - if(values != null && !values.isEmpty()) { - sb.append("server locks: "); - for(ServerLock sl : server_locks.values()) - sb.append(sl).append("\n"); - } - - String client_locks=client_lock_table.printLocks(); - if(client_locks != null && !client_locks.isEmpty()) - sb.append("my locks: ").append(client_lock_table.printLocks()); - return sb.toString(); - } - - - @ManagedOperation(description="Dumps all server locks") - public Object printServerLocks() { - return server_locks.values().stream().map(ServerLock::toString).collect(Collectors.joining(", ")); - } - - protected void handleView(View view) { - this.view=view; - log.debug("%s: view=%s", local_addr, view); - List
members=view.getMembers(); - List responses=new ArrayList<>(); - for(Map.Entry entry: server_locks.entrySet()) { - String lock_name=entry.getKey(); - ServerLock server_lock=entry.getValue(); - Lock lock=_getLock(lock_name); - lock.lock(); - try { - Response rsp=server_lock.handleView(members); - if(rsp != null) - responses.add(rsp); - if(server_lock.isEmpty() && server_lock.owner == null && server_lock.condition.queue.isEmpty()) - server_locks.remove(lock_name); - } - finally { - lock.unlock(); - } - } - - // do the sending outside the lock scope (might block on credits or TCP send) - for(Response rsp: responses) - sendLockResponse(rsp.type, rsp.owner, rsp.lock_name, rsp.lock_id); - } - - - - protected ClientLock createLock(String lock_name, Owner owner) { - return new ClientLock(lock_name, owner); - } - - /** Gets a lock from locks based on the hash of the lock name */ - protected Lock _getLock(String lock_name) { - int index=lock_name != null? Math.abs(lock_name.hashCode() % lock_stripes.length) : 0; - return lock_stripes[index]; - } - - protected Owner getOwner() { - return new Owner(local_addr, Thread.currentThread().getId()); - } - - abstract protected void sendGrantLockRequest(String lock_name, int lock_id, Owner owner, long timeout, boolean is_trylock); - abstract protected void sendReleaseLockRequest(String lock_name, int lock_id, Owner owner); - abstract protected void sendAwaitConditionRequest(String lock_name, Owner owner); - abstract protected void sendSignalConditionRequest(String lock_name, boolean all); - abstract protected void sendDeleteAwaitConditionRequest(String lock_name, Owner owner); - - - protected void sendRequest(Address dest, Type type, String lock_name, Owner owner, long timeout, boolean is_trylock) { - send(dest, new Request(type, lock_name, owner, timeout, is_trylock)); - } - - protected void sendRequest(Address dest, Type type, String lock_name, int lock_id, Owner owner, long timeout, boolean is_trylock) { - send(dest, new Request(type, lock_name, owner, timeout, is_trylock).lockId(lock_id)); - } - - protected void sendLockResponse(Type type, Owner dest, String lock_name, int lock_id) { - send(dest.getAddress(), new Request(type, lock_name, dest, 0).lockId(lock_id)); - } - - protected void sendSignalResponse(Owner dest, String lock_name) { - send(dest.getAddress(), new Request(Type.SIG_RET, lock_name, dest, 0)); - } - - protected void send(Address dest, Request req) { - ByteArray array=null; - try { - array=Util.streamableToBuffer(req); - } - catch(Exception e) { - log.warn("%s: failed serializing request: %s", local_addr, e); - } - Message msg=new BytesMessage(dest, array).putHeader(id, new LockingHeader()); - if(bypass_bundling) - msg.setFlag(Message.Flag.DONT_BUNDLE); - log.trace("%s --> %s: %s", local_addr, dest == null? "ALL" : dest, req); - try { - down_prot.down(msg); - } - catch(Exception ex) { - log.error("%s: failed sending %s request: %s", local_addr, req.type, ex); - } - } - - - protected void handleLockRequest(Request req) { - Response rsp=null; - Lock lock=_getLock(req.lock_name); - lock.lock(); - try { - ServerLock server_lock=server_locks.get(req.lock_name); - if(server_lock == null) { - server_lock=new ServerLock(req.lock_name); - ServerLock tmp=server_locks.putIfAbsent(req.lock_name, server_lock); - if(tmp != null) - server_lock=tmp; - else - notifyLockCreated(req.lock_name); - } - rsp=server_lock.handleRequest(req); - if(server_lock.isEmpty() && server_lock.owner == null && server_lock.condition.queue.isEmpty()) - server_locks.remove(req.lock_name); - } - finally { - lock.unlock(); - } - - // moved outside the lock scope - if(rsp != null) - sendLockResponse(rsp.type, rsp.owner, rsp.lock_name, rsp.lock_id); - } - - - protected void handleLockGrantedResponse(String lock_name, int lock_id, Owner owner) { - ClientLock lock=client_lock_table.getLock(lock_name,owner,false); - if(lock != null) - lock.handleLockGrantedResponse(lock_id); - } - - protected void handleLockReleasedResponse(String lock_name, int lock_id, Owner owner) { - ClientLock lock=client_lock_table.getLock(lock_name,owner,false); - if(lock != null) - lock.handleLockReleasedResponse(lock_id); - } - - protected void handleLockDeniedResponse(String lock_name, int lock_id, Owner owner) { - ClientLock lock=client_lock_table.getLock(lock_name,owner,false); - if(lock != null) - lock.lockDenied(lock_id); - } - - protected void handleLockInfoRequest(Address requester) { - - } - - protected void handleLockInfoResponse(Address sender, Request rsp) { - - } - - protected void handleLockRevoked(Request rsp) { - - } - - protected void handleAwaitRequest(String lock_name, Owner owner) { - Lock lock=_getLock(lock_name); - lock.lock(); - try { - ServerLock server_lock=server_locks.get(lock_name); - if (server_lock != null) - server_lock.condition.addWaiter(owner); - else - log.error(Util.getMessage("ConditionAwaitWasReceivedButLockWasNotCreatedWaiterMayBlockForever")); - } - finally { - lock.unlock(); - } - } - - protected void handleDeleteAwaitRequest(String lock_name, Owner owner) { - Lock lock=_getLock(lock_name); - lock.lock(); - try { - ServerLock server_lock=server_locks.get(lock_name); - if (server_lock != null) - server_lock.condition.removeWaiter(owner); - else - log.error(Util.getMessage("ConditionAwaitDeleteWasReceivedButLockWasGone")); - } - finally { - lock.unlock(); - } - } - - protected void handleSignalResponse(String lock_name, Owner owner) { - ClientLock lock=client_lock_table.getLock(lock_name,owner,false); - if(lock != null) { - lock.condition.signaled(); - } - else { - log.error(Util.getMessage("ConditionResponseWasClientLockWasNotPresentIgnoredSignal")); - } - } - - protected void handleSignalRequest(Request req) { - Response rsp=null; - Lock lock=_getLock(req.lock_name); - lock.lock(); - try { - ServerLock server_lock=server_locks.get(req.lock_name); - if (server_lock != null) - rsp=server_lock.handleRequest(req); - else - log.error(Util.getMessage("ConditionSignalWasReceivedButLockWasNotCreatedCouldnTNotifyAnyone")); - } - finally { - lock.unlock(); - } - - // moved outside the lock scope - if(rsp != null) - sendLockResponse(rsp.type, rsp.owner, rsp.lock_name, rsp.lock_id); - } - - protected void handleCreateLockRequest(String lock_name, Owner owner) { - Lock lock=_getLock(lock_name); - lock.lock(); - try { - server_locks.put(lock_name, new ServerLock(lock_name, owner)); - } - finally { - lock.unlock(); - } - } - - - protected void handleDeleteLockRequest(String lock_name) { - Lock lock=_getLock(lock_name); - lock.lock(); - try { - ServerLock server_lock = server_locks.get(lock_name); - if(server_lock == null) - return; - if (server_lock.condition.queue.isEmpty()) - server_locks.remove(lock_name); - else - server_lock.owner= null; - } - finally { - lock.unlock(); - } - } - - - protected void handleCreateAwaitingRequest(String lock_name, Owner owner) { - Lock lock=_getLock(lock_name); - lock.lock(); - try { - ServerLock server_lock = server_locks.get(lock_name); - if (server_lock == null) { - server_lock = new ServerLock(lock_name); - ServerLock tmp=server_locks.putIfAbsent(lock_name,server_lock); - if(tmp != null) - server_lock=tmp; - } - server_lock.condition.queue.add(owner); - } - finally { - lock.unlock(); - } - } - - - protected void handleDeleteAwaitingRequest(String lock_name, Owner owner) { - Lock lock=_getLock(lock_name); - lock.lock(); - try { - ServerLock server_lock = server_locks.get(lock_name); - if (server_lock != null) { - server_lock.condition.queue.remove(owner); - if (server_lock.condition.queue.isEmpty() && server_lock.owner == null) { - server_locks.remove(lock_name); - } - } - } - finally { - lock.unlock(); - } - } - - - - protected void notifyLockCreated(String lock_name) { - for(LockNotification listener: lock_listeners) { - try { - listener.lockCreated(lock_name); - } - catch(Throwable t) { - log.error("%s: failed notifying %s: %s", local_addr, listener, t.toString()); - } - } - } - - protected void notifyLockDeleted(String lock_name) { - for(LockNotification listener: lock_listeners) { - try { - listener.lockDeleted(lock_name); - } - catch(Throwable t) { - log.error("%s: failed notifying %s: %s", local_addr, listener, t.toString()); - } - } - } - - protected void notifyLockRevoked(String lock_name, Owner current_owner) { - for(LockNotification listener: lock_listeners) { - try { - listener.lockRevoked(lock_name, current_owner); - } - catch(Throwable t) { - log.error("%s: failed notifying %s: %s", local_addr, listener, t.toString()); - } - } - } - - protected void notifyLocked(String lock_name, Owner owner) { - for(LockNotification listener: lock_listeners) { - try { - listener.locked(lock_name,owner); - } - catch(Throwable t) { - log.error("%s: failed notifying %s: %s", local_addr, listener, t.toString()); - } - } - } - - protected void notifyUnlocked(String lock_name, Owner owner) { - for(LockNotification listener: lock_listeners) { - try { - listener.unlocked(lock_name,owner); - } - catch(Throwable t) { - log.error("%s: failed notifying %s: %s", local_addr, listener, t.toString()); - } - } - } - - protected void notifyAwaiting(String lock_name, Owner owner) { - for(LockNotification listener: lock_listeners) { - try { - listener.awaiting(lock_name,owner); - } - catch(Throwable t) { - log.error("%s: failed notifying %s: %s", local_addr, listener, t.toString()); - } - } - } - - protected void notifyAwaited(String lock_name, Owner owner) { - for(LockNotification listener: lock_listeners) { - try { - listener.awaited(lock_name,owner); - } - catch(Throwable t) { - log.error("%s: failed notifying %s: %s", local_addr, listener, t.toString()); - } - } - } - - - - /** - * Server side queue for handling of lock requests (lock, release). - * @author Bela Ban - */ - protected class ServerLock { - protected final String lock_name; - protected Owner owner; - protected final List queue=new ArrayList<>(); - protected final ServerCondition condition; - - public ServerLock(String lock_name) { - this.lock_name=lock_name; - this.condition=new ServerCondition(this); - } - - protected ServerLock(String lock_name, Owner owner) { - this.lock_name=lock_name; - this.owner=owner; - this.condition=new ServerCondition(this); - } - - protected Response handleRequest(Request req) { - switch(req.type) { - case GRANT_LOCK: - if(owner == null) { - setOwner(req.owner); - return new Response(Type.LOCK_GRANTED, req.owner, req.lock_name, req.lock_id); - } - if(owner.equals(req.owner)) - return new Response(Type.LOCK_GRANTED, req.owner, req.lock_name, req.lock_id); - - if(req.is_trylock && req.timeout <= 0) - return new Response(Type.LOCK_DENIED, req.owner, req.lock_name, req.lock_id); - addToQueue(req); - break; - case RELEASE_LOCK: - case LOCK_AWAIT: - if(Objects.equals(owner, req.owner)) { - setOwner(null); - if(req.type == Type.RELEASE_LOCK) - sendLockResponse(Type.RELEASE_LOCK_OK, req.owner, req.lock_name, req.lock_id); - } - else - addToQueue(req); - break; - case COND_SIG: - condition.signal(false); - break; - case COND_SIG_ALL: - condition.signal(true); - break; - default: - throw new IllegalArgumentException("type " + req.type + " is invalid here"); - } - - return processQueue(); - } - - protected Response handleView(List
members) { - if(owner != null && !members.contains(owner.getAddress())) { - Owner tmp=owner; - setOwner(null); - log.debug("%s: unlocked \"%s\" because owner %s left", local_addr, lock_name, tmp); - } - - synchronized(queue) { - queue.removeIf(req -> !members.contains(req.owner.getAddress())); - } - - condition.queue.removeIf(own -> !members.contains(own.getAddress())); - return processQueue(); - } - - - protected void addToQueue(Request req) { - synchronized(queue) { - if(queue.isEmpty()) { - if(req.type == Type.GRANT_LOCK) - queue.add(req); - return; // RELEASE_LOCK is discarded on an empty queue - } - } - - // at this point the queue is not empty - switch(req.type) { - - // If there is already a lock request from the same owner, discard the new lock request - case GRANT_LOCK: - synchronized(queue) { - if(!isRequestPresent(Type.GRANT_LOCK, req.owner)) - queue.add(req); - } - break; - - case RELEASE_LOCK: - // Release the lock request from the same owner already in the queue - // If there is no lock request, discard the unlock request - removeRequest(Type.GRANT_LOCK, req.owner); - break; - } - } - - /** Checks if a certain request from a given owner is already in the queue */ - protected boolean isRequestPresent(Type type, Owner owner) { // holds lock on queue - for(Request req: queue) - if(req.type == type && req.owner.equals(owner)) - return true; - return false; - } - - protected void removeRequest(Type type, Owner owner) { - synchronized(queue) { - queue.removeIf(req -> req.type == type && req.owner.equals(owner)); - } - } - - protected Request getNextRequest() { - synchronized(queue) { - return !queue.isEmpty()? queue.remove(0) : null; - } - } - - protected Response processQueue() { - if(owner != null) - return null; - Request req; - while((req=getNextRequest()) != null) { - switch(req.type) { - case GRANT_LOCK: - setOwner(req.owner); - return new Response(Type.LOCK_GRANTED, req.owner, req.lock_name, req.lock_id); - case RELEASE_LOCK: - if(owner == null) - break; - if(owner.equals(req.owner)) - setOwner(null); - return new Response(Type.RELEASE_LOCK_OK, req.owner, req.lock_name, req.lock_id); - } - } - return null; - } - - protected void setOwner(Owner owner) { - if(owner == null) { - if(this.owner != null) { - Owner tmp=this.owner; - this.owner=null; - notifyUnlocked(lock_name, tmp); - } - } - else { - this.owner=owner; - notifyLocked(lock_name, owner); - } - } - - public boolean isEmpty() { - synchronized(queue) { - return queue.isEmpty(); - } - } - - public String toString() { - StringBuilder sb=new StringBuilder(lock_name + ": ").append(owner); - synchronized(queue) { - if(!queue.isEmpty()) { - sb.append(", queue: "); - for(Request req : queue) { - sb.append(req.toStringShort()).append(" "); - } - } - } - return sb.toString(); - } - } - - protected class ServerCondition { - protected final ServerLock lock; - protected final Queue queue=new ArrayDeque<>(); - - public ServerCondition(ServerLock lock) { - this.lock = lock; - } - - public void addWaiter(Owner waiter) { - notifyAwaiting(lock.lock_name, waiter); - log.trace("%s: waiter %s was added for %s", local_addr, waiter, lock.lock_name); - queue.add(waiter); - } - - public void removeWaiter(Owner waiter) { - notifyAwaited(lock.lock_name, waiter); - log.trace("%s: waiter %s was removed for %s", local_addr, waiter, lock.lock_name); - queue.remove(waiter); - } - - public void signal(boolean all) { - if (queue.isEmpty()) - log.trace("%s: signal for %s ignored since, no one is waiting in queue", local_addr, lock.lock_name); - - Owner entry; - if (all) { - while ((entry = queue.poll()) != null) { - notifyAwaited(lock.lock_name, entry); - log.trace("%s: signalled %s for %s", local_addr, entry, lock.lock_name); - sendSignalResponse(entry, lock.lock_name); - } - } - else { - entry = queue.poll(); - if (entry != null) { - notifyAwaited(lock.lock_name, entry); - log.trace("%s: signalled %s for %s", local_addr, entry, lock.lock_name); - sendSignalResponse(entry, lock.lock_name); - } - } - } - } - - - /** - * Implementation of {@link Lock}. This is a client stub communicates with a server equivalent. The semantics are - * more or less those of {@link Lock}, but may differ slightly. - * For details see {@link org.jgroups.blocks.locking.LockService}. - */ - protected class ClientLock implements Lock, Comparable { - protected final String name; - protected Owner owner; - protected volatile boolean acquired; - protected volatile boolean denied; - protected volatile boolean is_trylock; - protected long timeout; - protected final ClientCondition condition; - - // unique for locks for the same name:owner, can wrap around (that's ok) - protected final int lock_id=current_lock_id.getAndIncrement(); - - - - public ClientLock(String name) { - this.name=name; - this.condition = new ClientCondition(this); - } - - public ClientLock(String name, Owner owner) { - this(name); - this.owner=owner; - } - - public boolean isHeld() {return acquired && !denied;} - - public void lock() { - try { - acquire(false); - } - catch(InterruptedException e) { // should never happen - Thread.currentThread().interrupt(); // just a second line of defense - } - } - - public void lockInterruptibly() throws InterruptedException { - acquire(true); - } - - public boolean tryLock() { - try { - return acquireTryLock(0, false); - } - catch(InterruptedException e) { - Thread.currentThread().interrupt(); - return false; - } - } - - public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { - return acquireTryLock(TimeUnit.MILLISECONDS.convert(time, unit), true); - } - - public synchronized void unlock() { - _unlock(false); - } - - public Condition newCondition() { - // Currently only 1 condition per Lock is supported - return condition; - } - - public String toString() { - return String.format("%s (id=%d, locked=%b, owner=%s)", name, lock_id, acquired, owner != null? owner : "n/a"); - } - - protected synchronized void lockGranted(int lock_id) { - if(this.lock_id != lock_id) { - log.error(Util.getMessage("DiscardedLOCKGRANTEDResponseWithLockId") + lock_id + ", my lock-id=" + this.lock_id); - return; - } - acquired=true; - this.notifyAll(); - } - - protected synchronized void lockDenied(int lock_id) { - if(this.lock_id != lock_id) { - log.error(Util.getMessage("DiscardedLOCKDENIEDResponseWithLockId") + lock_id + ", my lock_id=" + this.lock_id); - return; - } - denied=true; - this.notifyAll(); - } - - protected void handleLockGrantedResponse(int lock_id) { - lockGranted(lock_id); - } - - protected void handleLockReleasedResponse(int lock_id) { - if(this.lock_id != lock_id) { - log.error(Util.getMessage("DiscardedLOCKGRANTEDResponseWithLockId") + lock_id + ", my lock-id=" + this.lock_id); - return; - } - _unlockOK(); - } - - protected synchronized void acquire(boolean throwInterrupt) throws InterruptedException { - if(acquired) - return; - if(throwInterrupt && Thread.interrupted()) - throw new InterruptedException(); - owner=getOwner(); - sendGrantLockRequest(name, lock_id, owner, 0, false); - boolean interrupted=false; - while(!acquired) { - try { - this.wait(); - } - catch(InterruptedException e) { - if(throwInterrupt && !acquired) { - _unlock(true); - throw e; - } - // If we don't throw exceptions then we just set the interrupt flag and let it loop around - interrupted=true; - } - } - if(interrupted) - Thread.currentThread().interrupt(); - } - - protected synchronized void _unlock(boolean force) { - if(!acquired && !denied && !force) - return; - this.timeout=0; - this.is_trylock=false; - if(!denied) { - if(!force) - client_lock_table.addToPendingReleaseRequests(this); - sendReleaseLockRequest(name, lock_id, owner); // lock will be released on RELEASE_LOCK_OK response - if(force && client_lock_table.removeClientLock(name,owner)) - notifyLockDeleted(name); - - if(!force) { - //unlock will return only when get RELEASE_LOCK_OK or timeLeft after some seconds - long time_left=10000; - while(acquired || denied) { - long start=System.currentTimeMillis(); - try { - wait(time_left); - } - catch(InterruptedException ie) { - break; - } - long duration=System.currentTimeMillis() - start; - if(duration > 0) - time_left-=duration; - if(time_left <= 0) { - log.warn("%s: timeout waiting for RELEASE_LOCK_OK response for lock %s", local_addr, this); - break; - } - } - } - } - else - _unlockOK(); - } - - protected synchronized void _unlockOK() { - acquired=denied=false; - notifyAll(); - if(client_lock_table.removeClientLock(name,owner)) - notifyLockDeleted(name); - owner=null; - } - - protected synchronized boolean acquireTryLock(long timeout, boolean use_timeout) throws InterruptedException { - if(denied) - return false; - if(!acquired) { - if(use_timeout && Thread.interrupted()) - throw new InterruptedException(); - is_trylock=true; - this.timeout=timeout; - if(owner == null) - owner=getOwner(); - sendGrantLockRequest(name, lock_id, owner, timeout, true); - - boolean interrupted = false; - while(!acquired && !denied) { - if(use_timeout) { - long timeout_ns=TimeUnit.NANOSECONDS.convert(timeout, TimeUnit.MILLISECONDS), - wait_time=timeout_ns, - start=System.nanoTime(); - - while(wait_time > 0 && !acquired && !denied) { - try { - long wait_ms=TimeUnit.MILLISECONDS.convert(wait_time, TimeUnit.NANOSECONDS); - if(wait_ms <= 0) - break; - this.wait(wait_ms); - } - catch(InterruptedException e) { - interrupted=true; - } - finally { - wait_time=timeout_ns - (System.nanoTime() - start); - this.timeout=TimeUnit.MILLISECONDS.convert(wait_time, TimeUnit.NANOSECONDS); - } - } - break; - } - else { - try { - this.wait(); - } - catch(InterruptedException e) { - interrupted = true; - } - } - } - if(interrupted) - Thread.currentThread().interrupt(); - } - boolean retval=acquired && !denied; - if(!acquired || denied) - _unlock(true); - return retval; - } - - public boolean equals(Object obj) { - return this == obj || Objects.equals(owner, ((ClientLock)obj).owner); - } - - public int compareTo(ClientLock o) { - int rc=owner.compareTo(o.owner); - return rc != 0? rc : name.compareTo(o.name); - } - } - - /** Manages access to client locks */ - protected class ClientLockTable { - protected final ConcurrentMap> table=Util.createConcurrentMap(20); - protected final Set pending_release_reqs=new ConcurrentSkipListSet<>(); - - - protected int numLocks() {return table.size();} - - protected synchronized ClientLock getLock(String name, Owner owner, boolean create_if_absent) { - Map owners=table.get(name); - if(owners == null) { - if(!create_if_absent) - return null; - owners=Util.createConcurrentMap(20); - Map existing=table.putIfAbsent(name,owners); - if(existing != null) - owners=existing; - } - ClientLock lock=owners.get(owner); - if(lock == null) { - if(!create_if_absent) - return null; - lock=createLock(name, owner); - owners.put(owner, lock); - } - return lock; - } - - protected synchronized boolean removeClientLock(String lock_name, Owner owner) { - pending_release_reqs.removeIf(cl -> Objects.equals(cl.name, lock_name) && Objects.equals(cl.owner, owner)); - Map owners=table.get(lock_name); - if(owners != null) { - ClientLock lock=owners.remove(owner); - if(lock != null && owners.isEmpty()) - table.remove(lock_name); - return lock != null; - } - return false; - } - - protected void unlockAll() { - List lock_list=new ArrayList<>(); - synchronized(this) { - table.values().forEach(map -> lock_list.addAll(map.values())); - } - lock_list.forEach(ClientLock::unlock); - } - - protected void unlockForce(String lock_name) { - Map owners=table.get(lock_name); - if(owners != null) { - for(ClientLock cl : owners.values()) - cl._unlock(true); - } - pending_release_reqs.removeIf(cl -> Objects.equals(cl.name, lock_name)); - } - - protected void resendPendingLockRequests() { - final List pending_lock_reqs=new ArrayList<>(); - synchronized(this) { - if(!table.isEmpty()) { - table.values().forEach(map -> map.values().stream().filter(lock -> !lock.acquired && !lock.denied) - .forEach(pending_lock_reqs::add)); - } - } - if(!pending_lock_reqs.isEmpty()) { // send outside of the synchronized block - if(log.isTraceEnabled()) { - String tmp=pending_lock_reqs.stream().map(ClientLock::toString).collect(Collectors.joining(", ")); - log.trace("%s: resending pending lock requests: %s", local_addr, tmp); - } - pending_lock_reqs.forEach(l -> sendGrantLockRequest(l.name, l.lock_id, l.owner, l.timeout, l.is_trylock)); - } - - if(!pending_release_reqs.isEmpty()) { - if(log.isTraceEnabled()) { - String tmp=pending_release_reqs.stream().map(ClientLock::toString).collect(Collectors.joining(", ")); - log.trace("%s: resending pending unlock requests: %s", local_addr, tmp); - } - pending_release_reqs.forEach(cl -> sendReleaseLockRequest(cl.name, cl.lock_id, cl.owner)); - } - } - - protected synchronized Collection> values() { - return table.values(); - } - - /** Returns locks that have been successfully acquired */ - protected synchronized List> getLockInfo() { - List> l=new ArrayList<>(); - // table.forEach((key, value) -> value.keySet().forEach(owner -> l.add(new Tuple<>(key, owner)))); - table.forEach((k,v) -> v.forEach((owner, cl) -> { - if(cl.acquired && !cl.denied) - l.add(new Tuple<>(k, owner)); - })); - return l; - } - - protected synchronized List getPendingRequests(Address sender) { - List list=new ArrayList<>(); - - // add the pending LOCK requests - table.forEach((k,v) -> v.forEach((owner, cl) -> { - if(!cl.acquired && !cl.denied) { - Request req=new Request(Type.GRANT_LOCK, cl.name, owner, cl.timeout, cl.is_trylock).lockId(cl.lock_id); - list.add(req); - } - })); - - // add the pending UNLOCK requests - pending_release_reqs.forEach(cl -> { - if(cl.acquired && !cl.denied) { - Request req=new Request(Type.RELEASE_LOCK, cl.name, cl.owner, cl.timeout, cl.is_trylock) - .lockId(cl.lock_id).sender(sender); - list.add(req); - } - }); - - return list; - } - - - public String printLocks() { - return table.values().stream().map(Map::values).flatMap(Collection::stream) - .filter(cl -> cl.isHeld() && Objects.nonNull(cl.name)) - .map(cl -> cl.name).collect(Collectors.joining(", ")); - } - - public String toString() { - StringBuilder sb=new StringBuilder(); - boolean first_element=true; - for(Map.Entry> entry: table.entrySet()) { - if(first_element) - first_element=false; - else - sb.append(", "); - sb.append(entry.getKey()).append(" ("); - Map owners=entry.getValue(); - boolean first=true; - for(Map.Entry entry2: owners.entrySet()) { - if(first) - first=false; - else - sb.append(", "); - sb.append(entry2.getKey()); - ClientLock cl=entry2.getValue(); - if(!cl.acquired || cl.denied) - sb.append(", unlocked"); - } - sb.append(")"); - } - return sb.toString(); - } - - public void addToPendingReleaseRequests(ClientLock cl) { - if(cl != null) - pending_release_reqs.add(cl); - } - - - public void removeFromPendingReleaseRequests(ClientLock cl) { - if(cl != null) - pending_release_reqs.remove(cl); - } - - - } - - protected class ClientCondition implements Condition { - - protected final ClientLock lock; - protected final AtomicBoolean signaled = new AtomicBoolean(false); - /** - * This is okay only having 1 since a client condition is 1 per - * lock_name, thread id combination. - */ - protected volatile AtomicReference parker=new AtomicReference<>(); - - public ClientCondition(ClientLock lock) { - this.lock = lock; - } - - @Override - public void await() throws InterruptedException { - InterruptedException ex = null; - try { - await(true); - } - catch (InterruptedException e) { - ex = e; - throw ex; - } - finally { - lock.lock(); - - // If we are throwing an InterruptedException - // then clear the interrupt state as well. - if (ex != null) { - Thread.interrupted(); - } - } - } - - @Override - public void awaitUninterruptibly() { - try { - await(false); - } - catch(InterruptedException e) { - // This should never happen - } - finally { - lock.lock(); - } - } - - @Override - public long awaitNanos(long nanosTimeout) throws InterruptedException { - InterruptedException ex = null; - try { - return await(nanosTimeout); - } - catch (InterruptedException e) { - ex = e; - throw ex; - } - finally { - lock.lock(); // contract mandates we need to re-acquire the lock (see overridden method) - - // If we are throwing an InterruptedException then clear the interrupt state as well - if (ex != null) - Thread.interrupted(); - } - } - - /** - * Note this wait will only work correctly if the converted value is less - * than 292 years. This is due to the limitation in System.nano and long - * values that can only store up to 292 years (2263 nanoseconds). - * - * For more information please see {@link System#nanoTime()} - */ - @Override - public boolean await(long time, TimeUnit unit) throws InterruptedException { - return awaitNanos(unit.toNanos(time)) > 0; - } - - @Override - public boolean awaitUntil(Date deadline) throws InterruptedException { - long waitUntilTime=deadline.getTime(); - long currentTime=System.currentTimeMillis(); - - long waitTime=waitUntilTime - currentTime; - return waitTime > 0 && await(waitTime, TimeUnit.MILLISECONDS); - } - - protected void await(boolean throwInterrupt) throws InterruptedException { - if(!signaled.get()) { - lock.acquired = false; - sendAwaitConditionRequest(lock.name, lock.owner); - boolean interrupted=false; - while(!signaled.get()) { - parker.set(Thread.currentThread()); - LockSupport.park(this); - - if (Thread.interrupted()) { - // If we were interrupted and haven't received a response yet then we try to - // clean up the lock request and throw the exception - if (!signaled.get()) { - sendDeleteAwaitConditionRequest(lock.name, lock.owner); - throw new InterruptedException(); - } - // In the case that we were signaled and interrupted - // we want to return the signal but still interrupt - // our thread - interrupted = true; - } - } - if(interrupted) - Thread.currentThread().interrupt(); - } - - // We set as if this signal was no released. This way if the - // condition is reused again, but the client condition isn't lost - // we won't think we were signaled immediately - signaled.set(false); - } - - // Return the estimated time to wait (in ns), can be negative - protected long await(long nanoSeconds) throws InterruptedException { - long start=System.nanoTime(); - - if(!signaled.get()) { - // We release the lock at the same time as waiting on the - // condition - lock.acquired = false; - sendAwaitConditionRequest(lock.name, lock.owner); - - boolean interrupted = false; - while(!signaled.get()) { - long wait_nano=nanoSeconds - (System.nanoTime() - start); - - // If we waited max time break out - if(wait_nano > 0) { - parker.set(Thread.currentThread()); - LockSupport.parkNanos(this, wait_nano); - - if (Thread.interrupted()) { - // If we were interrupted and haven't received a response yet then we try to - // clean up the lock request and throw the exception - if (!signaled.get()) { - sendDeleteAwaitConditionRequest(lock.name, lock.owner); - throw new InterruptedException(); - } - // In the case that we were signaled and interrupted - // we want to return the signal but still interrupt - // our thread - interrupted = true; - } - } - else { - break; - } - } - if(interrupted) - Thread.currentThread().interrupt(); - } - - // We set as if this signal was no released. This way if the - // condition is reused again, but the client condition isn't lost - // we won't think we were signaled immediately - // If we weren't signaled then delete our request - if (!signaled.getAndSet(false)) { - sendDeleteAwaitConditionRequest(lock.name, lock.owner); - } - return nanoSeconds - (System.nanoTime() - start); - } - - @Override - public void signal() { - sendSignalConditionRequest(lock.name, false); - } - - @Override - public void signalAll() { - sendSignalConditionRequest(lock.name, true); - } - - protected void signaled() { - signaled.set(true); - Thread thread = parker.getAndSet(null); - if (thread != null) - LockSupport.unpark(thread); - } - } - - - public static class Request implements Streamable { - protected Type type; - protected String lock_name; - protected int lock_id; - protected Owner owner; - protected long timeout; - protected boolean is_trylock; - protected LockInfoResponse info_rsp; - protected Address sender; - - - public Request() { - } - - public Request(Type type) { - this.type=type; - } - - public Request(Type type, String lock_name, Owner owner, long timeout) { - this(type); - this.lock_name=lock_name; - this.owner=owner; - this.timeout=timeout; - } - - public Request(Type type, String lock_name, Owner owner, long timeout, boolean is_trylock) { - this(type, lock_name, owner, timeout); - this.is_trylock=is_trylock; - } - - public Type getType() {return type;} - public Request lockId(int lock_id) {this.lock_id=lock_id; return this;} - public int lockId() {return lock_id;} - public Request infoRsp(LockInfoResponse r) {this.info_rsp=r; return this;} - public Address sender() {return this.sender;} - public Request sender(Address sender) {this.sender=sender; return this;} - - @Override - public void writeTo(DataOutput out) throws IOException { - out.writeByte(type.ordinal()); - Bits.writeString(lock_name,out); - out.writeInt(lock_id); - Util.writeStreamable(owner, out); - out.writeLong(timeout); - out.writeBoolean(is_trylock); - Util.writeStreamable(info_rsp, out); - Util.writeAddress(sender, out); - } - - @Override - public void readFrom(DataInput in) throws IOException, ClassNotFoundException { - type=Type.values()[in.readByte()]; - lock_name=Bits.readString(in); - lock_id=in.readInt(); - owner=Util.readStreamable(Owner::new, in); - timeout=in.readLong(); - is_trylock=in.readBoolean(); - info_rsp=Util.readStreamable(LockInfoResponse::new, in); - sender=Util.readAddress(in); - } - - public String toString() { - StringBuilder sb=new StringBuilder(type.name() + "["); - if(lock_name != null) - sb.append(lock_name); - if(lock_id > 0) - sb.append(", lock_id=").append(lock_id); - if(owner != null) - sb.append(", owner=").append(owner); - if(is_trylock) - sb.append(", trylock"); - if(timeout > 0) - sb.append(", timeout=").append(timeout); - if(sender != null) - sb.append(", sender=").append(sender); - sb.append("]"); - return sb.toString(); - } - - public String toStringShort() { - StringBuilder sb=new StringBuilder(); - switch(type) { - case RELEASE_LOCK: - sb.append("U"); - break; - case GRANT_LOCK: - sb.append(is_trylock? "TL" : "L"); - break; - default: - sb.append("N/A"); - break; - } - sb.append("(").append(lock_name).append(",").append(owner); - if(timeout > 0) - sb.append(",").append(timeout); - if(info_rsp != null) - sb.append(", lock-info-response: ").append(info_rsp); - sb.append(")"); - return sb.toString(); - } - } - - /** A response to a request, to be sent back to the requester as a message */ - protected static class Response { - protected final Type type; - protected final Owner owner; - protected final String lock_name; - protected final int lock_id; - - public Response(Type type, Owner owner, String lock_name, int lock_id) { - this.type=type; - this.owner=owner; - this.lock_name=lock_name; - this.lock_id=lock_id; - } - } - - - public static class LockingHeader extends Header { - - public LockingHeader() { - } - public short getMagicId() {return 72;} - public Supplier create() { - return LockingHeader::new; - } - - @Override - public int serializedSize() { - return 0; - } - - @Override - public void writeTo(DataOutput out) throws IOException { - } - - @Override - public void readFrom(DataInput in) throws IOException, ClassNotFoundException { - } - } - - - protected static class LockInfoResponse implements Streamable { - protected List> existing_locks; // lock name and owner - protected List pending_requests; - - protected LockInfoResponse add(Tuple el) { - if(existing_locks == null) - existing_locks=new ArrayList<>(); - existing_locks.add(el); - return this; - } - - public void writeTo(DataOutput out) throws IOException { - if(existing_locks == null) - out.writeInt(0); - else { - out.writeInt(existing_locks.size()); - for(Tuple t: existing_locks) { - Bits.writeString(t.getVal1(), out); - t.getVal2().writeTo(out); - } - } - if(pending_requests == null) - out.writeInt(0); - else { - out.writeInt(pending_requests.size()); - for(Request req: pending_requests) - req.writeTo(out); - } - } - - public void readFrom(DataInput in) throws IOException, ClassNotFoundException { - int size=in.readInt(); - if(size > 0) { - existing_locks=new ArrayList<>(size); - for(int i=0; i < size; i++) { - String lock_name=Bits.readString(in); - Owner owner=new Owner(); - owner.readFrom(in); - existing_locks.add(new Tuple<>(lock_name, owner)); - } - - } - size=in.readInt(); - if(size > 0) { - pending_requests=new ArrayList<>(); - for(int i=0; i < size; i++) { - Request req=new Request(); - req.readFrom(in); - pending_requests.add(req); - } - } - } - - public String toString() { - return String.format("%d locks and %d pending lock/unlock requests", - existing_locks == null? 0 : existing_locks.size(), - pending_requests == null? 0 : pending_requests.size()); - } - - public String printDetails() { - StringBuilder sb=new StringBuilder(toString()); - if(existing_locks != null && !existing_locks.isEmpty()) - sb.append(String.format("\nlocks:\n%s", existing_locks.stream().map(Tuple::getVal1) - .collect(Collectors.joining(", ")))); - if(pending_requests != null && !pending_requests.isEmpty()) - sb.append(String.format("\npending requests:\n%s", pending_requests)); - return sb.toString(); - } - } - -} diff --git a/tests/junit-functional/org/jgroups/blocks/LockServiceDuplicateLockTest.java b/tests/junit-functional/org/jgroups/blocks/LockServiceDuplicateLockTest.java deleted file mode 100644 index 2b93c33781a..00000000000 --- a/tests/junit-functional/org/jgroups/blocks/LockServiceDuplicateLockTest.java +++ /dev/null @@ -1,177 +0,0 @@ -package org.jgroups.blocks; - -import org.jgroups.*; -import org.jgroups.blocks.locking.LockNotification; -import org.jgroups.blocks.locking.LockService; -import org.jgroups.protocols.CENTRAL_LOCK2; -import org.jgroups.protocols.Locking; -import org.jgroups.protocols.pbcast.GMS; -import org.jgroups.util.Owner; -import org.jgroups.util.UUID; -import org.jgroups.util.Util; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.Test; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Lock; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -/** - * Tests the lock service under partitions: when members in different partitions hold the same lock, after a merge, all - * but one lock holder will get notified that their locks have been revoked. - * See https://issues.redhat.com/browse/JGRP-2249 for details. - * @author Bela Ban - * @since 4.0.13 - */ -@Test(groups=Global.FUNCTIONAL,singleThreaded=true) -public class LockServiceDuplicateLockTest implements LockNotification { - protected final JChannel[] channels=new JChannel[6]; - protected final LockService[] lock_services=new LockService[channels.length]; - protected Lock lock_3; // lock held by member 3 - protected Lock lock_6; // lock held by member 6 - protected static final String LOCK_NAME="X"; - - @BeforeMethod protected void setup() throws Exception { - for(int i=0; i < channels.length; i++) { - channels[i]=create(i + 1).connect(LockServiceDuplicateLockTest.class.getSimpleName()); - lock_services[i]=new LockService(channels[i]); - } - Util.waitUntilAllChannelsHaveSameView(10000, 1000, channels); - System.out.printf("channels:\n%s", Stream.of(channels) - .map(ch -> String.format("%s: %s\n", ch.getAddress(), ch.getView())) - .collect(Collectors.joining("\n"))); - lock_3=lock_services[2].getLock(LOCK_NAME); - lock_6=lock_services[5].getLock(LOCK_NAME); - for(JChannel ch: channels) { - Locking l=ch.getProtocolStack().findProtocol(Locking.class); - l.addLockListener(this); - } - } - - @AfterMethod protected void destroy() { - trace(false, channels); - Util.closeReverse(channels); - } - - - public void testDuplicateLockRevocation() throws Exception { - boolean lock_acquired=lock_3.tryLock(3, TimeUnit.SECONDS); - System.out.printf("** lock_3: %s\n", lock_3); - assert lock_acquired; - System.out.println("--------- Injecting partitions ---------"); - - createAndInjectView(channels[0], channels[1], channels[2]); - createAndInjectView(channels[3], channels[4], channels[5]); - System.out.printf("channels:\n%s", Stream.of(channels) - .map(ch -> String.format("%s: %s\n", ch.getAddress(), ch.getView())) - .collect(Collectors.joining("\n"))); - Stream.of(channels[0], channels[1], channels[2]).allMatch(ch -> ch.getView().size() == 3); - Stream.of(channels[3], channels[4], channels[5]).allMatch(ch -> ch.getView().size() == 3); - - // now acquire the same lock on member 6: this will succeed because we have 2 partitions - lock_acquired=lock_6.tryLock(1, TimeUnit.SECONDS); - System.out.printf("** lock_6: %s\n", lock_6); - assert lock_acquired; - - System.out.println("----------- Merging partitions ----------"); - trace(true, channels); - MergeView mv=createMergeView(channels); - injectView(mv, channels); - - System.out.printf("channels:\n%s", Stream.of(channels) - .map(ch -> String.format("%s: %s\n", ch.getAddress(), ch.getView())) - .collect(Collectors.joining("\n"))); - Stream.of(channels).allMatch(ch -> ch.getView().size() == channels.length); - - System.out.printf("lock_3: %s, lock_6: %s\n", lock_3, lock_6); - - printLockTables(channels); - - assertServerLocks(1, 0); - assertServerLocks(0, 1,2,3,4,5); - - assertClientLocks(1, 2); - assertClientLocks(0, 0,1,3,4,5); - } - - - protected void assertServerLocks(int num, int ... indices) { - for(int index: indices) { - JChannel ch=channels[index]; - Locking lock=ch.getProtocolStack().findProtocol(Locking.class); - assert lock.getNumServerLocks() == num - : String.format("expected %d server locks but found %d in %s", num, lock.getNumServerLocks(), ch.getAddress()); - } - } - - protected void assertClientLocks(int num, int ... indices) { - for(int index: indices) { - JChannel ch=channels[index]; - Locking lock=ch.getProtocolStack().findProtocol(Locking.class); - assert lock.getNumClientLocks() == num - : String.format("expected %d client locks but found %d in %s", num, lock.getNumClientLocks(), ch.getAddress()); - } - } - - @Test(enabled=false) public void lockCreated(String name) {} - @Test(enabled=false) public void lockDeleted(String name) {} - - @Test(enabled=false) public void lockRevoked(String lock_name, Owner current_owner) { - System.out.printf("*** received lock revocation for %s (current owner=%s); force-unlocking lock\n", - lock_name, current_owner); - lock_services[5].unlockForce(lock_name); - } - - @Test(enabled=false) public void locked(String lock_name, Owner owner) {} - @Test(enabled=false) public void unlocked(String lock_name, Owner owner) {} - @Test(enabled=false) public void awaiting(String lock_name, Owner owner) {} - @Test(enabled=false) public void awaited(String lock_name, Owner owner) {} - - protected static void createAndInjectView(JChannel... channels) throws Exception { - Address[] addrs=new Address[channels.length]; - for(int i=0; i < channels.length; i++) - addrs[i]=channels[i].getAddress(); - View v=View.create(addrs[0], channels[0].getView().getViewId().getId()+1, addrs); - injectView(v, channels); - } - - protected static MergeView createMergeView(JChannel... channels) throws Exception { - Address[] addrs=new Address[channels.length]; - for(int i=0; i < channels.length; i++) - addrs[i]=channels[i].getAddress(); - List subgroups=new ArrayList<>(); - for(JChannel ch: channels) - subgroups.add(ch.getView()); - return new MergeView(new ViewId(addrs[0], channels[0].getView().getViewId().getId()+1), addrs, subgroups); - } - - protected static void injectView(View v, JChannel... channels) throws Exception { - Stream.of(channels).forEach(ch -> { - GMS gms=ch.getProtocolStack().findProtocol(GMS.class); - gms.installView(v); - }); - Util.waitUntilAllChannelsHaveSameView(10000, 500, channels); - } - - protected static void trace(boolean on, JChannel... channels) { - Stream.of(channels).forEach(ch -> ch.getProtocolStack().findProtocol(Locking.class).level(on? "trace" : "warn")); - } - - protected static void printLockTables(JChannel... channels) { - System.out.printf("\n\nlock tables:\n%s\n", - Stream.of(channels).map(ch -> { - CENTRAL_LOCK2 l=ch.getProtocolStack().findProtocol(CENTRAL_LOCK2.class); - return ch.getAddress() + ": " + l.printLocks(); - }).collect(Collectors.joining("\n"))); - } - - protected static JChannel create(int num) throws Exception { - return new JChannel(Util.getTestStack(new CENTRAL_LOCK2())).name(String.valueOf(num)) - // the address generator makes sure that 2's UUID is lower than 3's UUID, so 2 is chosen as merge leader - .addAddressGenerator(() -> new UUID(0, num)); - } -} diff --git a/tests/junit-functional/org/jgroups/blocks/LockServiceTest.java b/tests/junit-functional/org/jgroups/blocks/LockServiceTest.java deleted file mode 100644 index a522e90cf98..00000000000 --- a/tests/junit-functional/org/jgroups/blocks/LockServiceTest.java +++ /dev/null @@ -1,591 +0,0 @@ -package org.jgroups.blocks; - -import org.jgroups.Global; -import org.jgroups.JChannel; -import org.jgroups.blocks.locking.LockService; -import org.jgroups.protocols.CENTRAL_LOCK; -import org.jgroups.protocols.CENTRAL_LOCK2; -import org.jgroups.protocols.Locking; -import org.jgroups.stack.Protocol; -import org.jgroups.util.Util; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.DataProvider; -import org.testng.annotations.Test; - -import java.util.concurrent.*; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; - -/** Tests {@link org.jgroups.blocks.locking.LockService} - * @author Bela Ban - */ -@Test(groups={Global.FUNCTIONAL,Global.EAP_EXCLUDED},singleThreaded=true,dataProvider="createLockingProtocol") -public class LockServiceTest { - protected JChannel c1, c2, c3; - protected LockService s1, s2, s3; - protected Lock lock; - protected static final String LOCK="sample-lock"; - protected static final Class LOCK_CLASS=Locking.class; - protected static final String CLUSTER=LockServiceTest.class.getSimpleName(); - protected static final int NUM_ITERATIONS=1_000; - - @DataProvider(name="createLockingProtocol") - Object[][] createLockingProtocol() { - return new Object[][] { - {CENTRAL_LOCK.class}, - {CENTRAL_LOCK2.class} - }; - } - - - protected void init(Class locking_class) throws Exception { - c1=createChannel("A", locking_class); - s1=new LockService(c1); - c1.connect(CLUSTER); - - c2=createChannel("B", locking_class); - s2=new LockService(c2); - c2.connect(CLUSTER); - - c3=createChannel("C", locking_class); - s3=new LockService(c3); - c3.connect(CLUSTER); - - Util.waitUntilAllChannelsHaveSameView(10000, 1000, c1, c2, c3); - lock=s1.getLock(LOCK); - } - - - @AfterMethod - protected void cleanup() { - Util.close(c3,c2,c1); - } - - - @Test(dataProvider="createLockingProtocol") - public void testSimpleLock(Class locking_class) throws Exception { - init(locking_class); - lock(lock, LOCK); - unlock(lock, LOCK); - } - - public void testLockingOfAlreadyAcquiredLock(Class locking_class) throws Exception { - init(locking_class); - lock(lock, LOCK); - lock(lock, LOCK); - unlock(lock, LOCK); - } - - public void testUnsuccessfulTryLock(Class locking_class) throws Exception { - init(locking_class); - System.out.printf("s1:\n%s\ns2:\n%s\ns3:\n%s\n", s1.printLocks(), s2.printLocks(), s3.printLocks()); - - Lock lock2=s2.getLock(LOCK); - lock(lock2, LOCK); - try { - boolean rc=tryLock(lock, LOCK); - assert !rc; - unlock(lock, LOCK); - } - finally { - unlock(lock2, LOCK); - } - } - - public void testUnsuccessfulTryLockTimeout(Class locking_class) throws Exception { - init(locking_class); - Lock lock2=s2.getLock(LOCK); - lock(lock2, LOCK); - try { - boolean rc=tryLock(lock, 1000, LOCK); - assert !rc; - } - finally { - unlock(lock2, LOCK); - } - } - - - public void testLockInterrupt(Class locking_class) throws Exception { - init(locking_class); - // Interrupt ourselves before trying to acquire lock - Thread.currentThread().interrupt(); - - lock.lock(); - try { - System.out.println("Locks we have: " + s1.printLocks()); - if(Thread.interrupted()) - System.out.println("We have the interrupt flag status, as it should be"); - else - assert false : "Interrupt status was lost - we don't want this!"; - } - finally { - lock.unlock(); - } - } - - @Test(expectedExceptions=InterruptedException.class,dataProvider="createLockingProtocol") - public void testTryLockInterruptibly(Class locking_class) throws Exception { - init(locking_class); - // Interrupt ourselves before trying to acquire lock - Thread.currentThread().interrupt(); - - lock.lockInterruptibly(); - try { - System.out.println("Locks we have: " + s1.printLocks()); - if(Thread.interrupted()) - System.out.println("We still have interrupt flag set, as it should be"); - else - assert false : "Interrupt status was lost - we don't want this!"; - } - finally { - lock.unlock(); - } - } - - - public void testTryLockInterrupt(Class locking_class) throws Exception { - init(locking_class); - Thread.currentThread().interrupt(); // interrupt myself before trying to acquire lock - boolean status=lock.tryLock(); - try { - System.out.println("Locks we have: " + s1.printLocks()); - if(Thread.interrupted()) - System.out.println("Interrupt was set - correct"); - else - assert false : "interrupt should not be set on tryLock()"; - assert status; - } - finally { - lock.unlock(); - } - } - - @Test(expectedExceptions=InterruptedException.class,dataProvider="createLockingProtocol") - public void testTimedTryLockInterrupt(Class locking_class) throws Exception { - init(locking_class); - Thread.currentThread().interrupt(); // interrupt myself before trying to acquire lock - boolean status=lock.tryLock(5000, TimeUnit.MILLISECONDS); - try { - System.out.println("Locks we have: " + s1.printLocks()); - if(Thread.interrupted()) - System.out.println("Interrupt was set - correct"); - else - assert false : "interrupt should not be set on tryLock()"; - assert status; - } - finally { - lock.unlock(); - } - } - - /** Multiple lock-unlock cycles */ - public void testLockMultipleTimes(Class locking_class) throws Exception { - init(locking_class); - - int print=NUM_ITERATIONS / 10; - for(int i=0; i < NUM_ITERATIONS; i++) { - lock(lock, LOCK); - try { - assert true: "lock not acquired!"; - } - finally { - unlock(lock, LOCK); - } - if(i > 0 && i % print == 0) - System.out.printf("-- %d iterations\n", i); - } - } - - - /** Multiple trylock-unlock cycles */ - public void testTryLockMultipleTimes(Class locking_class) throws Exception { - init(locking_class); - - int print=NUM_ITERATIONS / 10; - for(int i=0; i < NUM_ITERATIONS; i++) { - boolean rc=tryLock(lock, 10000, LOCK); - try { - assert rc : "lock not acquired!"; - } - finally { - unlock(lock, LOCK); - } - if(i > 0 && i % print == 0) - System.out.printf("-- %d iterations\n", i); - } - } - - - public void testSuccessfulSignalAllTimeout(Class locking_class) throws Exception { - init(locking_class); - Lock lock2=s2.getLock(LOCK); - Thread locker=new Signaller(true); - boolean rc=tryLock(lock2, 5000, LOCK); - assert rc; - locker.start(); - assert awaitNanos(lock2.newCondition(), TimeUnit.SECONDS.toNanos(5), LOCK) > 0 : "Condition was not signalled"; - unlock(lock2, LOCK); - } - - - public void testSuccessfulTryLockTimeout(Class locking_class) throws Exception { - init(locking_class); - final CyclicBarrier barrier=new CyclicBarrier(2); - Thread locker=new Locker(barrier); - locker.start(); - barrier.await(); - boolean rc=tryLock(lock, 10000, LOCK); - assert rc; - unlock(lock, LOCK); - } - - - public void testConcurrentLockRequests(Class locking_class) throws Exception { - init(locking_class); - int NUM=10; - final CyclicBarrier barrier=new CyclicBarrier(NUM +1); - TryLocker[] lockers=new TryLocker[NUM]; - for(int i=0; i < lockers.length; i++) { - lockers[i]=new TryLocker(lock, barrier); - lockers[i].start(); - } - barrier.await(); - for(TryLocker locker: lockers) - locker.join(); - int num_acquired=0; - for(TryLocker locker: lockers) { - if(locker.acquired) - num_acquired++; - } - System.out.println("num_acquired = " + num_acquired); - assert num_acquired == 1 : "expected 1 acquired bot got " + num_acquired; - } - - public void testConcurrentLockRequestsFromDifferentMembers(Class locking_class) throws Exception { - init(locking_class); - int NUM=10; - final CyclicBarrier barrier=new CyclicBarrier(NUM +1); - TryLocker[] lockers=new TryLocker[NUM]; - LockService[] services={s1, s2, s3}; - - for(int i=0; i < lockers.length; i++) { - Lock mylock=services[i % services.length].getLock(LOCK); - lockers[i]=new TryLocker(mylock, barrier); - lockers[i].start(); - } - barrier.await(); - for(TryLocker locker: lockers) - locker.join(); - int num_acquired=0; - for(TryLocker locker: lockers) { - if(locker.acquired) { - num_acquired++; - } - } - System.out.println("num_acquired = " + num_acquired); - assert num_acquired == 1 : "expected 1 but got " + num_acquired; - } - - /** Tests locking by T1 and unlocking by T2 (https://issues.redhat.com/browse/JGRP-1886) */ - public void testLockUnlockByDiffentThreads(Class locking_class) throws Exception { - init(locking_class); - CyclicBarrier barrier=null; - try { - setProp(LOCK_CLASS, false, c1,c2,c3); - barrier=new CyclicBarrier(2); - Thread locker=new Locker(barrier); - locker.start(); - Util.sleep(2000); - boolean rc=tryLock(lock, 10000, LOCK); - assert rc; - } - finally { - setProp(LOCK_CLASS, true,c1,c2,c3); - unlock(lock, LOCK); - } - } - - - public void testSuccessfulSignalOneTimeout(Class locking_class) throws Exception { - init(locking_class); - Lock lock2 = s2.getLock(LOCK); - Thread locker = new Signaller(false); - boolean rc = tryLock(lock2, 5000, LOCK); - assert rc; - locker.start(); - assert awaitNanos(lock2.newCondition(), TimeUnit.SECONDS.toNanos(5), LOCK) > 0 : "Condition was not signalled"; - unlock(lock2, LOCK); - } - - public void testInterruptWhileWaitingForCondition(Class locking_class) throws Exception { - init(locking_class); - CountDownLatch latch = new CountDownLatch(1); - Thread awaiter = new Thread(new InterruptAwaiter(latch)); - awaiter.start(); - Lock lock2 = s2.getLock(LOCK); - assert tryLock(lock2, 5000, LOCK); - awaiter.interrupt(); - // This should not hit, since we have the lock and the condition can't - // come out yet then - assert !latch.await(1, TimeUnit.SECONDS); - assert awaiter.isAlive(); - lock2.unlock(); - assert latch.await(100, TimeUnit.MILLISECONDS); - } - - public void testSignalAllAwakesAllForCondition(Class locking_class) throws Exception { - init(locking_class); - final int threadCount = 5; - CountDownLatch latch = new CountDownLatch(threadCount); - - ExecutorService service = Executors.newFixedThreadPool(threadCount); - try { - - for (int i = 0; i < threadCount; ++i) { - service.submit(new SyncAwaiter(latch)); - - } - // Wait for all the threads to be waiting on condition - latch.await(2, TimeUnit.SECONDS); - - Lock lock2 = s2.getLock(LOCK); - assert tryLock(lock2, 5000, LOCK); - lock2.newCondition().signalAll(); - lock2.unlock(); - service.shutdown(); - service.awaitTermination(2, TimeUnit.SECONDS); - } - finally { - service.shutdownNow(); - } - } - - - - protected static JChannel createChannel(String name, Class locking_class) throws Exception { - Protocol[] stack=Util.getTestStack(locking_class.getDeclaredConstructor().newInstance().level("trace")); - return new JChannel(stack).name(name); - } - - protected static void setProp(Class clazz, boolean value, JChannel... channels) { - for(JChannel ch: channels) { - Locking prot=ch.getProtocolStack().findProtocol(clazz); - prot.useThreadIdForLockOwner(value); - } - } - - protected class Locker extends Thread { - protected final CyclicBarrier barrier; - - public Locker(CyclicBarrier barrier) { - this.barrier=barrier; - } - - public void run() { - lock(lock, LOCK); - try { - barrier.await(); - Util.sleep(500); - } - catch(Exception e) { - } - finally { - unlock(lock, LOCK); - } - } - } - - protected class Signaller extends Thread { - protected final boolean all; - - public Signaller(boolean all) { - this.all=all; - } - - public void run() { - lock(lock, LOCK); - try { - Util.sleep(500); - - if (all) { - signallingAll(lock.newCondition(), LOCK); - } - else { - signalling(lock.newCondition(), LOCK); - } - } - catch(Exception e) { - e.printStackTrace(); - } - finally { - unlock(lock, LOCK); - } - } - } - - protected abstract class AbstractAwaiter implements Runnable { - public void afterLock() { } - - public void onInterrupt() { } - - public void run() { - lock(lock, LOCK); - try { - afterLock(); - try { - lock.newCondition().await(2, TimeUnit.SECONDS); - } - catch (InterruptedException e) { - onInterrupt(); - } - } - catch(Exception e) { - e.printStackTrace(); - } - finally { - unlock(lock, LOCK); - } - } - } - - protected class InterruptAwaiter extends AbstractAwaiter { - final CountDownLatch latch; - - public InterruptAwaiter(CountDownLatch latch) { - this.latch = latch; - } - - @Override - public void onInterrupt() { - latch.countDown(); - } - } - - protected class SyncAwaiter extends AbstractAwaiter { - final CountDownLatch latch; - - public SyncAwaiter(CountDownLatch latch) { - this.latch = latch; - } - - @Override - public void afterLock() { - latch.countDown(); - } - } - - - protected static class TryLocker extends Thread { - protected final Lock mylock; - protected final CyclicBarrier barrier; - protected boolean acquired; - - public TryLocker(Lock mylock, CyclicBarrier barrier) { - this.mylock=mylock; - this.barrier=barrier; - } - - public boolean isAcquired() { - return acquired; - } - - public void run() { - try { - barrier.await(); - } - catch(Exception e) { - e.printStackTrace(); - } - - try { - acquired=tryLock(mylock, LOCK); - if(acquired) - Util.sleep(2000); - } - catch(Exception e) { - e.printStackTrace(); - } - finally { - if(acquired) - unlock(mylock, LOCK); - } - } - } - - protected static class AcquireLockAndAwaitCondition extends Thread { - private final Lock lock; - - public AcquireLockAndAwaitCondition(Lock lock) { - this.lock = lock; - } - - @Override - public void run() { - if (tryLock(lock, LOCK)) { - try { - Condition condition = lock.newCondition(); - try { - condition.await(); - } catch (InterruptedException e) { - System.out.println(""); - } - } - finally { - unlock(lock, LOCK); - } - } - - } - } - - - protected static void lock(Lock lock, String name) { - System.out.println("[" + Thread.currentThread().getId() + "] locking " + name); - lock.lock(); - System.out.println("[" + Thread.currentThread().getId() + "] locked " + name); - } - - protected static boolean tryLock(Lock lock, String name) { - System.out.println("[" + Thread.currentThread().getId() + "] tryLocking " + name); - boolean rc=lock.tryLock(); - System.out.println("[" + Thread.currentThread().getId() + "] " + (rc? "locked " : "failed locking ") + name); - return rc; - } - - protected static boolean tryLock(Lock lock, long timeout, String name) throws InterruptedException { - System.out.println("[" + Thread.currentThread().getId() + "] tryLocking " + name); - boolean rc=lock.tryLock(timeout, TimeUnit.MILLISECONDS); - System.out.println("[" + Thread.currentThread().getId() + "] " + (rc? "locked " : "failed locking ") + name); - return rc; - } - - protected static void unlock(Lock lock, String name) { - if(lock == null) - return; - System.out.println("[" + Thread.currentThread().getId() + "] releasing " + name); - lock.unlock(); - System.out.println("[" + Thread.currentThread().getId() + "] released " + name); - } - - protected static long awaitNanos(Condition condition, long nanoSeconds, - String name) throws InterruptedException { - System.out.println("[" + Thread.currentThread().getId() + "] waiting for signal - released lock " + name); - long value = condition.awaitNanos(nanoSeconds); - System.out.println("[" + Thread.currentThread().getId() + "] waited for signal - obtained lock " + name); - return value; - } - - protected static void signalling(Condition condition, String name) { - System.out.println("[" + Thread.currentThread().getId() + "] signalling " + name); - condition.signal(); - System.out.println("[" + Thread.currentThread().getId() + "] signalled " + name); - } - - protected static void signallingAll(Condition condition, String name) { - System.out.println("[" + Thread.currentThread().getId() + "] signalling all " + name); - condition.signalAll(); - System.out.println("[" + Thread.currentThread().getId() + "] signalled " + name); - } - -} diff --git a/tests/junit-functional/org/jgroups/blocks/LockServiceWithCentralLock2Test.java b/tests/junit-functional/org/jgroups/blocks/LockServiceWithCentralLock2Test.java deleted file mode 100644 index 05859c62bec..00000000000 --- a/tests/junit-functional/org/jgroups/blocks/LockServiceWithCentralLock2Test.java +++ /dev/null @@ -1,91 +0,0 @@ -package org.jgroups.blocks; - -import org.jgroups.Global; -import org.jgroups.JChannel; -import org.jgroups.blocks.locking.LockService; -import org.jgroups.fork.ForkChannel; -import org.jgroups.protocols.CENTRAL_LOCK2; -import org.jgroups.protocols.FORK; -import org.jgroups.stack.Protocol; -import org.jgroups.util.Util; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.Test; - -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Lock; - -@Test(groups= Global.FUNCTIONAL,singleThreaded=true) -public class LockServiceWithCentralLock2Test { - protected JChannel c; - protected ForkChannel fc; - protected static final String PROPS="fork.xml"; - protected static final String CLUSTER=LockServiceWithCentralLock2Test.class.getSimpleName(); - - @AfterMethod protected void destroy() { - Util.close(fc,c); - } - - public void shouldLockServiceGetLockUsingForkChannel() throws Exception { - c = create(PROPS, "A", true).connect(CLUSTER); - _shouldLockServiceGetLockUsingForkChannel(); - } - - public void shouldLockServiceGetLockUsingForkChannelProgrammaticCreation() throws Exception { - c = create(null, "A", true).connect(CLUSTER); - _shouldLockServiceGetLockUsingForkChannel(); - } - - public void shouldRequestHandlerRunningUsingForkChannel() throws Exception { - c=create(PROPS, "A", true); - fc = new ForkChannel(c, "lock", "lock-channel"); - _shouldRequestHandlerRunningUsingForkChannel(); - } - - public void shouldRequestHandlerRunningUsingForkChannelProgrammaticCreation() throws Exception { - c=create(null, "A", true); - fc = new ForkChannel(c, "lock", "lock-channel", new CENTRAL_LOCK2()); - _shouldRequestHandlerRunningUsingForkChannel(); - } - - public void shouldLockServiceGetLockNotUsingForkChannel() throws Exception { - c = create(null, "A", false).connect(CLUSTER); - LockService lockService = new LockService(c); - Lock lock=lockService.getLock("myLock"); - try { - boolean success=lock.tryLock(5, TimeUnit.SECONDS); - assert success; - } - finally { - lock.unlock(); - } - } - - public void shouldRequestHandlerRunningNotUsingForkChannel() throws Exception{ - c = create(null, "A", false).connect(CLUSTER); - CENTRAL_LOCK2 centralLock2 = c.getProtocolStack().findProtocol(CENTRAL_LOCK2.class); - assert centralLock2.isCoord() && centralLock2.isRequestHandlerRunning(); - } - - - protected void _shouldLockServiceGetLockUsingForkChannel() throws Exception { - fc = new ForkChannel(c, "lock", "lock-channel", new CENTRAL_LOCK2()); - fc.connect("bla"); - LockService lockService = new LockService(fc); - boolean isLocked = lockService.getLock("myLock").tryLock(5, TimeUnit.SECONDS); - assert isLocked == true; - } - - protected void _shouldRequestHandlerRunningUsingForkChannel() throws Exception{ - c.connect(CLUSTER); - fc.connect("bla"); - CENTRAL_LOCK2 centralLock2 = fc.getProtocolStack().findProtocol(CENTRAL_LOCK2.class); - assert centralLock2.isCoord() && centralLock2.isRequestHandlerRunning(); - } - - protected static JChannel create(String props, String name, boolean use_fork) throws Exception { - if(props != null) - return new JChannel(props).name(name); - Protocol p=use_fork? new FORK() : new CENTRAL_LOCK2(); - return new JChannel(Util.getTestStack(p)).name(name); - } -} diff --git a/tests/junit-functional/org/jgroups/blocks/LockService_JGRP_2234_Test.java b/tests/junit-functional/org/jgroups/blocks/LockService_JGRP_2234_Test.java deleted file mode 100644 index 503617bc394..00000000000 --- a/tests/junit-functional/org/jgroups/blocks/LockService_JGRP_2234_Test.java +++ /dev/null @@ -1,167 +0,0 @@ -package org.jgroups.blocks; - -import org.jgroups.Global; -import org.jgroups.JChannel; -import org.jgroups.Message; -import org.jgroups.blocks.locking.LockService; -import org.jgroups.conf.ClassConfigurator; -import org.jgroups.protocols.CENTRAL_LOCK; -import org.jgroups.protocols.CENTRAL_LOCK2; -import org.jgroups.protocols.Locking; -import org.jgroups.stack.Protocol; -import org.jgroups.stack.ProtocolStack; -import org.jgroups.util.Util; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.DataProvider; -import org.testng.annotations.Test; - -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Lock; -import java.util.stream.Stream; - -/** Tests https://issues.redhat.com/browse/JGRP-2234 with {@link LockService} - * @author Bela Ban - */ -@Test(groups={Global.FUNCTIONAL,Global.EAP_EXCLUDED},singleThreaded=true,dataProvider="createLockingProtocol") -public class LockService_JGRP_2234_Test { - protected JChannel a, b, c, d; - protected LockService s1, s2, s3, s4; - protected static final String LOCK="sample-lock"; - protected static final String CLUSTER=LockService_JGRP_2234_Test.class.getSimpleName(); - - - @DataProvider(name="createLockingProtocol") - Object[][] createLockingProtocol() { - return new Object[][] { - {CENTRAL_LOCK.class}, - {CENTRAL_LOCK2.class} - }; - } - - - protected void init(Class locking_class) throws Exception { - a=createChannel("A", locking_class); - s1=new LockService(a); - a.connect(CLUSTER); - - b=createChannel("B", locking_class); - s2=new LockService(b); - b.connect(CLUSTER); - - c=createChannel("C", locking_class); - s3=new LockService(c); - c.connect(CLUSTER); - - d=createChannel("D", locking_class); - s4=new LockService(d); - d.connect(CLUSTER); - - Util.waitUntilAllChannelsHaveSameView(10000, 1000, a, b, c, d); - } - - - @AfterMethod - protected void cleanup() { - Util.close(d, c, b, a); - } - - @BeforeMethod - protected void unlockAll() { - Stream.of(s4,s3,s2,s1).forEach(s -> { - if(s != null) s.unlockAll(); - }); - Thread.interrupted(); // clears any possible interrupts from the previous method - } - - /** - * The initial view is {A,B,C,D}. D holds the lock and unlocks it (on A), but the view is already {B,C,D} as A has - * left. However, at the time of the unlock request, the view is still {A,B,C,D} on D so the request is sent to A.
- * The unlock request from D (to the new coord B) is therefore lost and the lock is never released.
- * Therefore, when C tries to acquire the lock, it will fail as B thinks the lock is still held by D.
- * The lost request (due to the new view not being received at all members at the same wall-clock time) is simulated - * by a simple dropping of the release request on D. - */ - public void testUnsuccessfulUnlock(Class locking_class) throws Exception { - init(locking_class); - Lock lock=s4.getLock(LOCK); - boolean success=lock.tryLock(10, TimeUnit.SECONDS); // this should succeed as A is the lock server for LOCK - assert success; - - d.getProtocolStack().insertProtocol(new UnlockDropper(), ProtocolStack.Position.BELOW, Locking.class); - lock.unlock(); // this request will be dropped - - d.getProtocolStack().removeProtocol(UnlockDropper.class); // future release requests are not going to be dropped - - - a.close(); // B will be the new coordinator - - Util.waitUntilAllChannelsHaveSameView(10000, 1000, b,c,d); - - - Lock lock2=s3.getLock(LOCK); // C tries to acquire the lock - success=lock2.tryLock(5, TimeUnit.SECONDS); - assert success; - } - - - protected static JChannel createChannel(String name, Class locking_class) throws Exception { - Protocol[] stack=Util.getTestStack(locking_class.getDeclaredConstructor().newInstance().level("trace")); - return new JChannel(stack).name(name); - } - - - protected static void lock(Lock lock, String name) { - System.out.println("[" + Thread.currentThread().getId() + "] locking " + name); - lock.lock(); - System.out.println("[" + Thread.currentThread().getId() + "] locked " + name); - } - - protected static boolean tryLock(Lock lock, String name) { - System.out.println("[" + Thread.currentThread().getId() + "] tryLocking " + name); - boolean rc=lock.tryLock(); - System.out.println("[" + Thread.currentThread().getId() + "] " + (rc? "locked " : "failed locking ") + name); - return rc; - } - - protected static boolean tryLock(Lock lock, long timeout, String name) throws InterruptedException { - System.out.println("[" + Thread.currentThread().getId() + "] tryLocking " + name); - boolean rc=lock.tryLock(timeout, TimeUnit.MILLISECONDS); - System.out.println("[" + Thread.currentThread().getId() + "] " + (rc? "locked " : "failed locking ") + name); - return rc; - } - - protected static void unlock(Lock lock, String name) { - if(lock == null) - return; - System.out.println("[" + Thread.currentThread().getId() + "] releasing " + name); - lock.unlock(); - System.out.println("[" + Thread.currentThread().getId() + "] released " + name); - } - - - protected static class UnlockDropper extends Protocol { - - public Object down(Message msg) { - Locking lock_prot=(Locking)up_prot; - short CENTRAL_LOCK_ID=ClassConfigurator.getProtocolId(lock_prot.getClass()); - Locking.LockingHeader hdr=msg.getHeader(CENTRAL_LOCK_ID); - if(hdr != null) { - try { - Locking.Request req=Util.streamableFromBuffer(Locking.Request::new, msg.getArray(), msg.getOffset(), msg.getLength()); - switch(req.getType()) { - case RELEASE_LOCK: - System.out.printf("%s ---- dropping %s\n", up_prot.getProtocolStack().getChannel().getAddress(), req); - return null; - } - } - catch(Exception ex) { - log.error("failed deserializing request", ex); - return null; - } - } - return down_prot.down(msg); - } - } - -} diff --git a/tests/junit-functional/org/jgroups/blocks/RpcLockingTest.java b/tests/junit-functional/org/jgroups/blocks/RpcLockingTest.java deleted file mode 100644 index e2e6bc7d10e..00000000000 --- a/tests/junit-functional/org/jgroups/blocks/RpcLockingTest.java +++ /dev/null @@ -1,191 +0,0 @@ -package org.jgroups.blocks; - -import org.jgroups.Global; -import org.jgroups.JChannel; -import org.jgroups.Message; -import org.jgroups.ObjectMessage; -import org.jgroups.blocks.locking.LockService; -import org.jgroups.protocols.*; -import org.jgroups.protocols.pbcast.GMS; -import org.jgroups.protocols.pbcast.NAKACK2; -import org.jgroups.util.Util; -import org.testng.Assert; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.DataProvider; -import org.testng.annotations.Test; - -import java.util.Arrays; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Lock; - -@Test(groups = {Global.FUNCTIONAL,Global.EAP_EXCLUDED}, singleThreaded=true, dataProvider="createLockingProtocol") -public class RpcLockingTest { - protected JChannel a, b; - protected MessageDispatcher disp_a, disp_b; - protected Lock lock_a, lock_b; - - - - @DataProvider(name="createLockingProtocol") - Object[][] createLockingProtocol() { - return new Object[][] { - {CENTRAL_LOCK.class}, - {CENTRAL_LOCK2.class} - }; - } - - protected void setUp(Class locking_class) throws Exception { - System.out.print("Connecting channels: "); - a=createChannel("A", locking_class); - disp_a=new MessageDispatcher(a); - a.connect(RpcLockingTest.class.getSimpleName()); - lock_a=new LockService(a).getLock("lock"); - - b=createChannel("B", locking_class); - disp_b=new MessageDispatcher(b); - b.connect(RpcLockingTest.class.getSimpleName()); - lock_b=new LockService(b).getLock("lock"); - - Util.waitUntilAllChannelsHaveSameView(30000, 1000, a, b); - System.out.println(); - - disp_a.setRequestHandler(arg0 -> { - System.out.println("A received a message, will now try to lock the lock"); - if(lock_a.tryLock()) { - Assert.fail("Should not be able to lock the lock here"); - System.out.println("A aquired the lock, this shouldn't be possible"); - } - else - System.out.println("The lock was already locked, as it should be"); - return "Hello"; - }); - - disp_b.setRequestHandler(arg0 -> { - System.out.println("B received a message, will now try to lock the lock"); - if(lock_b.tryLock()) { - Assert.fail("Should not be able to lock the lock here"); - System.out.println("B aquired the lock, this shouldn't be possible"); - } - else - System.out.println("The lock already was locked, as it should be"); - return "Hello"; - }); - - // Print who is the coordinator - if (b.getView().getMembers().get(0).equals(b.getAddress())) - System.out.println("B is the coordinator"); - else - System.out.println("A is the coordinator"); - System.out.println(); - } - - @AfterMethod - void tearDown() throws Exception { - Util.close(b,a); - } - - protected static JChannel createChannel(String name, Class locking_class) throws Exception { - return new JChannel( - new SHARED_LOOPBACK(), new SHARED_LOOPBACK_PING(), - new MERGE3().setMinInterval(1000).setMaxInterval(3000), - new NAKACK2().useMcastXmit(false).logDiscardMessages(false).logNotFoundMessages(false), - new UNICAST3().setXmitTableNumRows(5).setXmitInterval(500), - new GMS().setJoinTimeout(1000).printLocalAddress(false).setLeaveTimeout(100) - .logViewWarnings(false).setViewAckCollectionTimeout(2000).logCollectMessages(false), - locking_class.getDeclaredConstructor().newInstance()) - .name(name); - } - - - - /** - * If the coordinator of the lock locks the lock and then send a message, - * the receiver will wait for ever in tryLock. However, castMessage will - * return after a while because of the default settings of RequestOptions.SYNC(). - */ - public void testCoordSendFirst(Class locking_class) throws Exception { - setUp(locking_class); - System.out.println("Running testCoordSendFirst"); - - // =========================================================================== - if (lock_a.tryLock()) { - try { - System.out.println("A aquired the lock, about to send message to B"); - String rsp=disp_a.sendMessage(new ObjectMessage(b.getAddress(), "bla"), - RequestOptions.SYNC().timeout(60000).flags(Message.Flag.OOB)); - if (rsp == null) { - System.err.println("ERROR: didn't return correctly"); - Assert.fail("Didn't return correctly"); - } else - System.out.println("Returned: " + rsp); - - } finally { - lock_a.unlock(); - } - } else { - Assert.fail("The lock was already locked"); - System.out.println("A failed to aquire the lock"); - } - // =========================================================================== - - System.out.println(); - } - - /** - * If the node that isn't the coordinator is the one who sends the message - * it works, but later when the coordinator sends the message, the receiver, will wait forever in tryLock. - */ - public void testCoordReceiveFirst(Class locking_class) throws Exception { - setUp(locking_class); - System.out.println("Running testCoordReceiveFirst"); - - if(lock_b.tryLock()) { - try { - System.out.println("B aquired the lock, about to send message to A"); - String rsp=disp_b.sendMessage(new ObjectMessage(a.getAddress(), "bla"), - RequestOptions.SYNC().flags(Message.Flag.OOB)); - if (rsp == null) { - System.err.println("ERROR: didn't return correctly"); - Assert.fail("Didn't return correctly"); - } else - System.out.println("Returned: " + rsp); - - } finally { - lock_b.unlock(); - } - } else { - Assert.fail("The lock was already locked"); - System.out.println("B failed to aquire the lock"); - } - // =========================================================================== - - if(lock_a.tryLock(5000, TimeUnit.MILLISECONDS)) { - try { - System.out.println("A aquired the lock, about to send message to B"); - String rsp = disp_a.sendMessage(new ObjectMessage(b.getAddress(), "bla"), - RequestOptions.SYNC().timeout(60000).flags(Message.Flag.OOB)); - if (rsp == null) { - System.err.println("ERROR: didn't return correctly"); - Assert.fail("Didn't return correctly"); - } - else - System.out.println("Returned: " + rsp); - } finally { - lock_a.unlock(); - } - } else { - Assert.fail("The lock was already locked"); - System.out.println("A failed to aquire the lock"); - } - // =========================================================================== - System.out.println(); - - } - - protected void enableTracing() { - for(JChannel ch: Arrays.asList(a,b)) - ch.getProtocolStack().findProtocol(Locking.class).setLevel("TRACE"); - } - - -} diff --git a/tests/junit-functional/org/jgroups/tests/ClusterSplitLockTest.java b/tests/junit-functional/org/jgroups/tests/ClusterSplitLockTest.java deleted file mode 100644 index 42bdcf6820a..00000000000 --- a/tests/junit-functional/org/jgroups/tests/ClusterSplitLockTest.java +++ /dev/null @@ -1,244 +0,0 @@ -package org.jgroups.tests; - -import org.jgroups.Address; -import org.jgroups.Global; -import org.jgroups.JChannel; -import org.jgroups.blocks.locking.LockService; -import org.jgroups.protocols.CENTRAL_LOCK; -import org.jgroups.protocols.CENTRAL_LOCK2; -import org.jgroups.protocols.Locking; -import org.jgroups.stack.Protocol; -import org.jgroups.util.Util; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.DataProvider; -import org.testng.annotations.Test; - -import java.io.PrintStream; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.Lock; -import java.util.stream.Stream; - -import static java.util.concurrent.TimeUnit.SECONDS; -import static org.testng.AssertJUnit.*; - -/** Tests https://issues.redhat.com/browse/JGRP-2234 */ -@Test(groups = {Global.FUNCTIONAL, Global.EAP_EXCLUDED}, timeOut = 60000, dataProvider="createLockingProtocol") -public class ClusterSplitLockTest { - private static final int MEMBERS = 3; - private final JChannel[] channels = new JChannel[MEMBERS]; - private final LockService[] lockServices = new LockService[MEMBERS]; - private final ExecutorService[] execs = new ExecutorService[MEMBERS]; - - - @DataProvider(name="createLockingProtocol") - Object[][] createLockingProtocol() { - return new Object[][] { - {CENTRAL_LOCK.class}, - {CENTRAL_LOCK2.class} - }; - } - - protected void setUp(Class locking_class) throws Exception { - for (int i = 0; i < MEMBERS; i++) { - Locking lock_prot=locking_class.getDeclaredConstructor().newInstance().level("debug"); - if(lock_prot instanceof CENTRAL_LOCK) - ((CENTRAL_LOCK)lock_prot).setNumberOfBackups(2); - - Protocol[] stack = Util.getTestStack(lock_prot); - channels[i] = new JChannel(stack); - lockServices[i] = new LockService(channels[i]); - channels[i].setName(memberName(i)); - channels[i].connect("TEST"); - execs[i] = Executors.newCachedThreadPool(); - if (i == 0) - Util.sleep(500); - } - Util.waitUntilAllChannelsHaveSameView(10000, 1000, channels); - - // Make sure A is coordinator, because we blindly assume it is in the tests below. - assertEquals(channels[0].getAddress(), channels[0].getView().getCoord()); - } - - private void disconnectAndDestroy(int i) throws Exception { - JChannel channel = channels[i]; - Util.close(channel); - } - - @AfterMethod - protected void tearDown() throws Exception { - Util.closeReverse(channels); - Stream.of(execs).forEach(ExecutorService::shutdown); - for(ExecutorService ex: execs) - assertTrue(ex.awaitTermination(5, SECONDS)); - } - - /** - * Performs a test where the first member (also the initial coordinator) goes down - */ - public void testClusterSplitWhereAGoesDown(Class locking_class) throws Exception { - testClusterSplitImpl(0, locking_class); - } - - /** - * Performs a test where the second member goes down - */ - public void testClusterSplitWhereBGoesDown(Class locking_class) throws Exception { - testClusterSplitImpl(1, locking_class); - } - - /** - * Performs a test where the third member goes down - */ - public void testClusterSplitWhereCGoesDown(Class locking_class) throws Exception { - testClusterSplitImpl(2, locking_class); - } - - /** - * Performs a test where the specified downMember goes down when the member performed a lock operation on 50% - * of the locks. The coordinator should unlock all locks that were locked by the member that goes down. If the - * member that goes down is the coordinator the new channel coordinator should make sure the lock table is up-to-date. - */ - private void testClusterSplitImpl(final int downMember, Class locking_class) throws Exception { - setUp(locking_class); - - CountDownLatch doneOnMemberThatWillGoDown = new CountDownLatch(1); - - final int numLocks = 10; - final int halfway = numLocks / 2; - - List> futures = new ArrayList<>(); - - /* - * All members perform the specified number of lock() operations. The - * 'downMember' disconnects half way - */ - for (int i = 0; i < MEMBERS; i++) { - final int mbrIdx = i; - - final AtomicInteger unlockCount = new AtomicInteger(0); - - for (int j = 0; j < numLocks; j++) { - final int lockNr = j; - - if (mbrIdx == downMember) { - if (lockNr == halfway) { - futures.add(execs[downMember].submit(() -> { - try { - doneOnMemberThatWillGoDown.await(); - log("Disconnecting member %s", memberName(downMember)); - disconnectAndDestroy(downMember); - } catch (Exception e) { - throw new RuntimeException(e); - } - })); - } - if (lockNr >= halfway) { - break; - } - } - - futures.add(execs[mbrIdx].submit(() -> { - - Lock lock = lockServices[mbrIdx].getLock("testlock" + lockNr); - - try { - if (!lock.tryLock(5, SECONDS)) { - if (mbrIdx == downMember) { - fail(String.format("Member %s failed to lock %s using tryLock in healthy situation.", memberName(mbrIdx), lockNr)); - } else { - log("Failed to tryLock member:%s lock:%d LOCKS:\n%s", memberName(mbrIdx), lockNr, lockServices[mbrIdx].printLocks()); - return; - } - } else { - log("Member %s locked %d (threadid: %d)", memberName(mbrIdx), lockNr, Thread.currentThread().getId()); - } - } catch (InterruptedException ie) { - log("InterruptedException member:%s, lock:%d", memberName(mbrIdx), lockNr, ie); - fail("Interrupted on tryLock " + memberName(mbrIdx) + " - " + lockNr); - } - - try { - Thread.sleep(30); - } catch (InterruptedException e) { - fail("Interrupted while sleeping."); - } finally { - lock.unlock(); - log("Unlocked lock %d by member %s (threadid: %d)", lockNr, memberName(mbrIdx), Thread.currentThread().getId()); - - if (mbrIdx == downMember && halfway == unlockCount.incrementAndGet()) { - log("setting doneOnMemberThatWillGoDown flag"); - doneOnMemberThatWillGoDown.countDown(); - } - } - })); - } - - } - - /* wait for the chaos to disappear */ - for (Future fut : futures) { - fut.get(); - } - - StringBuilder locksOverview = new StringBuilder("\n==== first run done ====\n"); - for (int i = 0; i < MEMBERS; i++) { - locksOverview.append(String.format("Locks on member %s:\n%s\n", memberName(i), lockServices[i].printLocks())); - } - locksOverview.append("\n========================"); - log(locksOverview.toString()); - - /* - * All locks should be unlocked at this point so no try lock request - * should fail - */ - Thread.sleep(2000); - log("==== Checking if tryLock succeeds for all locks on all remaining members ====="); - for (int i = 0; i < MEMBERS; i++) { - if (i == downMember) { - continue; - } - - for (int j = 0; j < numLocks; j++) { - Lock l = lockServices[i].getLock("testlock" + j); - if (!l.tryLock()) { - logError("Failed to acquire lock on %d by member %s", j, memberName(i)); - Address coord = channels[i].getView().getCoord(); - int count = 0; - for (JChannel c : channels) { - if (null != c.getAddress() && c.getAddress().equals(coord)) { - logError("Lock table for %s (coord):\n%s", coord, lockServices[count].printLocks()); - break; - } - count++; - } - fail(String.format("Member %s can't lock:%d", memberName(i), j)); - } - l.unlock(); - } - - } - - } - - private static String memberName(int mbrIndex) { - return String.valueOf((char) ('A' + mbrIndex)); - } - - private static void log(String fmt, Object... args) { - log(System.out, fmt, args); - } - - private static void logError(String fmt, Object... args) { - log(System.err, fmt, args); - } - - private static void log(PrintStream out, String fmt, Object... args) { - out.println(String.format(fmt, args)); - } -} diff --git a/tests/junit-functional/org/jgroups/tests/LockServiceConcurrencyTest.java b/tests/junit-functional/org/jgroups/tests/LockServiceConcurrencyTest.java deleted file mode 100644 index d649cbd2619..00000000000 --- a/tests/junit-functional/org/jgroups/tests/LockServiceConcurrencyTest.java +++ /dev/null @@ -1,127 +0,0 @@ -package org.jgroups.tests; - -import org.jgroups.Global; -import org.jgroups.JChannel; -import org.jgroups.Message; -import org.jgroups.blocks.locking.LockService; -import org.jgroups.conf.ClassConfigurator; -import org.jgroups.protocols.CENTRAL_LOCK; -import org.jgroups.protocols.CENTRAL_LOCK2; -import org.jgroups.protocols.Locking; -import org.jgroups.stack.Protocol; -import org.jgroups.stack.ProtocolStack; -import org.jgroups.util.Util; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.DataProvider; -import org.testng.annotations.Test; - -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Lock; - -/** - * Tests concurrent access to the locks provided by {@link org.jgroups.blocks.locking.LockService} - * @author Bela Ban - * @since 3.4 - */ -@Test(groups={Global.BYTEMAN,Global.EAP_EXCLUDED},singleThreaded=true,dataProvider="createLockingProtocol") -public class LockServiceConcurrencyTest { - protected JChannel a, b; - protected LockService ls_a, ls_b; - - - @DataProvider(name="createLockingProtocol") - Object[][] createLockingProtocol() { - return new Object[][] { - {CENTRAL_LOCK.class}, - {CENTRAL_LOCK2.class} - }; - } - - protected void init(Class locking_class) throws Exception { - a=new JChannel(Util.getTestStack(locking_class.getDeclaredConstructor().newInstance())).name("A"); - ls_a=new LockService(a); - a.connect("LockServiceConcurrencyTest"); - b=new JChannel(Util.getTestStack(locking_class.getDeclaredConstructor().newInstance())).name("B"); - ls_b=new LockService(b); - b.connect("LockServiceConcurrencyTest"); - Util.waitUntilAllChannelsHaveSameView(10000, 1000, a, b); - } - - @AfterMethod protected void destroy() { - ls_a.unlockAll(); - ls_b.unlockAll(); - Util.close(b,a); - } - - /** Tests JIRA https://issues.redhat.com/browse/JGRP-1679 */ - // @Test(invocationCount=100,dataProvider="createLockingProtocol") - public void testConcurrentClientLocks(Class locking_class) throws Exception { - init(locking_class); - - Lock lock=ls_b.getLock("L"); // A is the coordinator - - DropGrantResponse dropper=new DropGrantResponse(); - a.getProtocolStack().insertProtocol(dropper, ProtocolStack.Position.BELOW, Locking.class); - - // we're dropping the LOCK-GRANTED response for lock-id #1, so this lock acquisition must fail; lock L will not be released! - boolean success=lock.tryLock(1, TimeUnit.MILLISECONDS); - assert !success : "the lock acquisition should have failed"; - - - // the LOCK-GRANTED response for lock-id #2 is received, which is incorrect and therefore dropped - // tryLock() works the same, with or without timeout - success=lock.tryLock(10, TimeUnit.MILLISECONDS); - assert !success : "lock was acquired successfully - this is incorrect"; - - printLocks(a,b); - a.getProtocolStack().removeProtocol(DropGrantResponse.class); - } - - protected static void printLocks(JChannel... channels) { - for(JChannel ch: channels) { - Locking l=ch.getProtocolStack().findProtocol(Locking.class); - System.out.printf("**** server locks on %s: %s\n", ch.getAddress(), l.printServerLocks()); - } - } - - /** - * To be inserted on the coord (A): drops the first LOCK_GRANTED response (but queues it), then sends the queued - * LOCK_GRANTED as response to the next GRANT_LOCK request - */ - protected static class DropGrantResponse extends Protocol { - protected final BlockingQueue lock_granted_requests=new ArrayBlockingQueue<>(1); - - public Object down(Message msg) { - Locking lock_prot=(Locking)up_prot; - short lock_prot_id=ClassConfigurator.getProtocolId(lock_prot.getClass()); - - Locking.LockingHeader hdr=msg.getHeader(lock_prot_id); - if(hdr != null) { - try { - Locking.Request req=Util.streamableFromBuffer(Locking.Request::new, msg.getArray(), msg.getOffset(), msg.getLength()); - switch(req.getType()) { - case LOCK_GRANTED: - boolean added=lock_granted_requests.offer(msg); - if(added) - System.out.printf("==> queued the LOCK_GRANTED response to be sent %s\n", req); - else { - // send the queued LOCK_GRANTED response - Message lock_granted_req=lock_granted_requests.peek(); - System.out.println("==> sending the queued LOCK_GRANTED response"); - down_prot.down(lock_granted_req); - lock_granted_req=null; - } - return null; - } - } - catch(Exception ex) { - log.error("failed deserializing request", ex); - } - } - return down_prot != null? down_prot.down(msg) : null; - } - - } -}