From f983e961a9765801615d5583a5311fb8cf1341fb Mon Sep 17 00:00:00 2001 From: William Burns Date: Thu, 15 Aug 2013 15:54:52 -0700 Subject: [PATCH] Removed unneeded now unneeded synchronization from Conditions in Locking. Added additional tests for Conditions used in LockService. --- src/org/jgroups/protocols/Locking.java | 28 ++-- .../org/jgroups/blocks/LockServiceTest.java | 142 +++++++++++++++++- 2 files changed, 147 insertions(+), 23 deletions(-) diff --git a/src/org/jgroups/protocols/Locking.java b/src/org/jgroups/protocols/Locking.java index 02b31ee2c22..860bc6fb8b4 100644 --- a/src/org/jgroups/protocols/Locking.java +++ b/src/org/jgroups/protocols/Locking.java @@ -446,9 +446,7 @@ protected void handleDeleteAwaitRequest(String lock_name, Owner owner) { protected void handleSignalResponse(String lock_name, Owner owner) { ClientLock lock=client_lock_table.getLock(lock_name,owner,false); if(lock != null) { - synchronized (lock.condition) { - lock.condition.signaled(); - } + lock.condition.signaled(); } else { log.error("Condition response was client lock was not present. Ignored signal."); @@ -494,12 +492,10 @@ protected void handleDeleteLockRequest(String lock_name) { ServerLock server_lock = server_locks.get(lock_name); if(server_lock == null) return; - synchronized (server_lock.condition) { - if (server_lock.condition.queue.isEmpty()) - server_locks.remove(lock_name); - else - server_lock.current_owner = null; - } + if (server_lock.condition.queue.isEmpty()) + server_locks.remove(lock_name); + else + server_lock.current_owner = null; } finally { lock.unlock(); @@ -532,11 +528,9 @@ protected void handleDeleteAwaitingRequest(String lock_name, Owner owner) { try { ServerLock server_lock = server_locks.get(lock_name); if (server_lock != null) { - synchronized (server_lock.condition) { - server_lock.condition.queue.remove(owner); - if (server_lock.condition.queue.isEmpty() && server_lock.current_owner == null) { - server_locks.remove(lock_name); - } + server_lock.condition.queue.remove(owner); + if (server_lock.condition.queue.isEmpty() && server_lock.current_owner == null) { + server_locks.remove(lock_name); } } } @@ -787,7 +781,7 @@ public ServerCondition(ServerLock lock) { this.lock = lock; } - public synchronized void addWaiter(Owner waiter) { + public void addWaiter(Owner waiter) { notifyAwaiting(lock.lock_name, waiter); if (log.isTraceEnabled()) { log.trace("Waiter [" + waiter + "] was added for " + lock.lock_name); @@ -795,7 +789,7 @@ public synchronized void addWaiter(Owner waiter) { queue.add(waiter); } - public synchronized void removeWaiter(Owner waiter) { + public void removeWaiter(Owner waiter) { notifyAwaited(lock.lock_name, waiter); if (log.isTraceEnabled()) { log.trace("Waiter [" + waiter + "] was removed for " + lock.lock_name); @@ -803,7 +797,7 @@ public synchronized void removeWaiter(Owner waiter) { queue.remove(waiter); } - public synchronized void signal(boolean all) { + public void signal(boolean all) { if (queue.isEmpty()) { if (log.isTraceEnabled()) { log.trace("Signal for [" + lock.lock_name + diff --git a/tests/junit-functional/org/jgroups/blocks/LockServiceTest.java b/tests/junit-functional/org/jgroups/blocks/LockServiceTest.java index 772fbfe7a98..532f4b231ea 100644 --- a/tests/junit-functional/org/jgroups/blocks/LockServiceTest.java +++ b/tests/junit-functional/org/jgroups/blocks/LockServiceTest.java @@ -1,5 +1,14 @@ package org.jgroups.blocks; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; + import org.jgroups.Global; import org.jgroups.JChannel; import org.jgroups.blocks.locking.LockService; @@ -11,12 +20,6 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import java.util.concurrent.BrokenBarrierException; -import java.util.concurrent.CyclicBarrier; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; - /** Tests {@link org.jgroups.blocks.locking.LockService} * @author Bela Ban */ @@ -239,6 +242,56 @@ public void testConcurrentLockRequestsFromDifferentMembers() throws Exception { assert num_acquired == 1 : "expected 1 but got " + num_acquired; } + public void testSuccessfulSignalOneTimeout() throws InterruptedException, BrokenBarrierException { + Lock lock2 = s2.getLock(LOCK); + Thread locker = new Signaller(false); + boolean rc = tryLock(lock2, 5000, LOCK); + assert rc; + locker.start(); + assert awaitNanos(lock2.newCondition(), TimeUnit.SECONDS.toNanos(5), LOCK) > 0 : "Condition was not signalled"; + unlock(lock2, LOCK); + } + + public void testInterruptWhileWaitingForCondition() throws InterruptedException { + CountDownLatch latch = new CountDownLatch(1); + Thread awaiter = new Thread(new InterruptAwaiter(latch)); + awaiter.start(); + Lock lock2 = s2.getLock(LOCK); + assert tryLock(lock2, 5000, LOCK); + awaiter.interrupt(); + // This should not hit, since we have the lock and the condition can't + // come out yet then + assert !latch.await(1, TimeUnit.SECONDS); + assert awaiter.isAlive(); + lock2.unlock(); + assert latch.await(100, TimeUnit.MILLISECONDS); + } + + public void testSignalAllAwakesAllForCondition() throws InterruptedException { + final int threadCount = 5; + CountDownLatch latch = new CountDownLatch(threadCount); + + ExecutorService service = Executors.newFixedThreadPool(threadCount); + try { + + for (int i = 0; i < threadCount; ++i) { + service.submit(new SyncAwaiter(latch)); + + } + // Wait for all the threads to be waiting on condition + latch.await(2, TimeUnit.SECONDS); + + Lock lock2 = s2.getLock(LOCK); + assert tryLock(lock2, 5000, LOCK); + lock2.newCondition().signalAll(); + lock2.unlock(); + service.shutdown(); + service.awaitTermination(2, TimeUnit.SECONDS); + } + finally { + service.shutdownNow(); + } + } protected JChannel createChannel(String name) throws Exception { Protocol[] tmp=Util.getTestStack(); @@ -302,6 +355,57 @@ public void run() { } } + protected abstract class AbstractAwaiter implements Runnable { + public void afterLock() { } + + public void onInterrupt() { } + + public void run() { + lock(lock, LOCK); + try { + afterLock(); + try { + lock.newCondition().await(2, TimeUnit.SECONDS); + } + catch (InterruptedException e) { + onInterrupt(); + } + } + catch(Exception e) { + e.printStackTrace(); + } + finally { + unlock(lock, LOCK); + } + } + } + + protected class InterruptAwaiter extends AbstractAwaiter { + final CountDownLatch latch; + + public InterruptAwaiter(CountDownLatch latch) { + this.latch = latch; + } + + @Override + public void onInterrupt() { + latch.countDown(); + } + } + + protected class SyncAwaiter extends AbstractAwaiter { + final CountDownLatch latch; + + public SyncAwaiter(CountDownLatch latch) { + this.latch = latch; + } + + @Override + public void afterLock() { + latch.countDown(); + } + } + protected static class TryLocker extends Thread { protected final Lock mylock; @@ -340,6 +444,32 @@ public void run() { } } + protected static class AcquireLockAndAwaitCondition extends Thread { + private final Lock lock; + + public AcquireLockAndAwaitCondition(Lock lock) { + this.lock = lock; + } + + @Override + public void run() { + if (tryLock(lock, LOCK)) { + try { + Condition condition = lock.newCondition(); + try { + condition.await(); + } catch (InterruptedException e) { + System.out.println(""); + } + } + finally { + unlock(lock, LOCK); + } + } + + } + } + protected static void lock(Lock lock, String name) { System.out.println("[" + Thread.currentThread().getId() + "] locking " + name);