Skip to content

Commit

Permalink
Removed unneeded now unneeded synchronization from Conditions in Lock…
Browse files Browse the repository at this point in the history
…ing. Added additional tests for Conditions used in LockService.
  • Loading branch information
wburns committed Aug 15, 2013
1 parent 48f355c commit f983e96
Show file tree
Hide file tree
Showing 2 changed files with 147 additions and 23 deletions.
28 changes: 11 additions & 17 deletions src/org/jgroups/protocols/Locking.java
Original file line number Diff line number Diff line change
Expand Up @@ -446,9 +446,7 @@ protected void handleDeleteAwaitRequest(String lock_name, Owner owner) {
protected void handleSignalResponse(String lock_name, Owner owner) {
ClientLock lock=client_lock_table.getLock(lock_name,owner,false);
if(lock != null) {
synchronized (lock.condition) {
lock.condition.signaled();
}
lock.condition.signaled();
}
else {
log.error("Condition response was client lock was not present. Ignored signal.");
Expand Down Expand Up @@ -494,12 +492,10 @@ protected void handleDeleteLockRequest(String lock_name) {
ServerLock server_lock = server_locks.get(lock_name);
if(server_lock == null)
return;
synchronized (server_lock.condition) {
if (server_lock.condition.queue.isEmpty())
server_locks.remove(lock_name);
else
server_lock.current_owner = null;
}
if (server_lock.condition.queue.isEmpty())
server_locks.remove(lock_name);
else
server_lock.current_owner = null;
}
finally {
lock.unlock();
Expand Down Expand Up @@ -532,11 +528,9 @@ protected void handleDeleteAwaitingRequest(String lock_name, Owner owner) {
try {
ServerLock server_lock = server_locks.get(lock_name);
if (server_lock != null) {
synchronized (server_lock.condition) {
server_lock.condition.queue.remove(owner);
if (server_lock.condition.queue.isEmpty() && server_lock.current_owner == null) {
server_locks.remove(lock_name);
}
server_lock.condition.queue.remove(owner);
if (server_lock.condition.queue.isEmpty() && server_lock.current_owner == null) {
server_locks.remove(lock_name);
}
}
}
Expand Down Expand Up @@ -787,23 +781,23 @@ public ServerCondition(ServerLock lock) {
this.lock = lock;
}

public synchronized void addWaiter(Owner waiter) {
public void addWaiter(Owner waiter) {
notifyAwaiting(lock.lock_name, waiter);
if (log.isTraceEnabled()) {
log.trace("Waiter [" + waiter + "] was added for " + lock.lock_name);
}
queue.add(waiter);
}

public synchronized void removeWaiter(Owner waiter) {
public void removeWaiter(Owner waiter) {
notifyAwaited(lock.lock_name, waiter);
if (log.isTraceEnabled()) {
log.trace("Waiter [" + waiter + "] was removed for " + lock.lock_name);
}
queue.remove(waiter);
}

public synchronized void signal(boolean all) {
public void signal(boolean all) {
if (queue.isEmpty()) {
if (log.isTraceEnabled()) {
log.trace("Signal for [" + lock.lock_name +
Expand Down
142 changes: 136 additions & 6 deletions tests/junit-functional/org/jgroups/blocks/LockServiceTest.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
package org.jgroups.blocks;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

import org.jgroups.Global;
import org.jgroups.JChannel;
import org.jgroups.blocks.locking.LockService;
Expand All @@ -11,12 +20,6 @@
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/** Tests {@link org.jgroups.blocks.locking.LockService}
* @author Bela Ban
*/
Expand Down Expand Up @@ -239,6 +242,56 @@ public void testConcurrentLockRequestsFromDifferentMembers() throws Exception {
assert num_acquired == 1 : "expected 1 but got " + num_acquired;
}

public void testSuccessfulSignalOneTimeout() throws InterruptedException, BrokenBarrierException {
Lock lock2 = s2.getLock(LOCK);
Thread locker = new Signaller(false);
boolean rc = tryLock(lock2, 5000, LOCK);
assert rc;
locker.start();
assert awaitNanos(lock2.newCondition(), TimeUnit.SECONDS.toNanos(5), LOCK) > 0 : "Condition was not signalled";
unlock(lock2, LOCK);
}

public void testInterruptWhileWaitingForCondition() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
Thread awaiter = new Thread(new InterruptAwaiter(latch));
awaiter.start();
Lock lock2 = s2.getLock(LOCK);
assert tryLock(lock2, 5000, LOCK);
awaiter.interrupt();
// This should not hit, since we have the lock and the condition can't
// come out yet then
assert !latch.await(1, TimeUnit.SECONDS);
assert awaiter.isAlive();
lock2.unlock();
assert latch.await(100, TimeUnit.MILLISECONDS);
}

public void testSignalAllAwakesAllForCondition() throws InterruptedException {
final int threadCount = 5;
CountDownLatch latch = new CountDownLatch(threadCount);

ExecutorService service = Executors.newFixedThreadPool(threadCount);
try {

for (int i = 0; i < threadCount; ++i) {
service.submit(new SyncAwaiter(latch));

}
// Wait for all the threads to be waiting on condition
latch.await(2, TimeUnit.SECONDS);

Lock lock2 = s2.getLock(LOCK);
assert tryLock(lock2, 5000, LOCK);
lock2.newCondition().signalAll();
lock2.unlock();
service.shutdown();
service.awaitTermination(2, TimeUnit.SECONDS);
}
finally {
service.shutdownNow();
}
}

protected JChannel createChannel(String name) throws Exception {
Protocol[] tmp=Util.getTestStack();
Expand Down Expand Up @@ -302,6 +355,57 @@ public void run() {
}
}

protected abstract class AbstractAwaiter implements Runnable {
public void afterLock() { }

public void onInterrupt() { }

public void run() {
lock(lock, LOCK);
try {
afterLock();
try {
lock.newCondition().await(2, TimeUnit.SECONDS);
}
catch (InterruptedException e) {
onInterrupt();
}
}
catch(Exception e) {
e.printStackTrace();
}
finally {
unlock(lock, LOCK);
}
}
}

protected class InterruptAwaiter extends AbstractAwaiter {
final CountDownLatch latch;

public InterruptAwaiter(CountDownLatch latch) {
this.latch = latch;
}

@Override
public void onInterrupt() {
latch.countDown();
}
}

protected class SyncAwaiter extends AbstractAwaiter {
final CountDownLatch latch;

public SyncAwaiter(CountDownLatch latch) {
this.latch = latch;
}

@Override
public void afterLock() {
latch.countDown();
}
}


protected static class TryLocker extends Thread {
protected final Lock mylock;
Expand Down Expand Up @@ -340,6 +444,32 @@ public void run() {
}
}

protected static class AcquireLockAndAwaitCondition extends Thread {
private final Lock lock;

public AcquireLockAndAwaitCondition(Lock lock) {
this.lock = lock;
}

@Override
public void run() {
if (tryLock(lock, LOCK)) {
try {
Condition condition = lock.newCondition();
try {
condition.await();
} catch (InterruptedException e) {
System.out.println("");
}
}
finally {
unlock(lock, LOCK);
}
}

}
}


protected static void lock(Lock lock, String name) {
System.out.println("[" + Thread.currentThread().getId() + "] locking " + name);
Expand Down

0 comments on commit f983e96

Please sign in to comment.