Skip to content
This repository has been archived by the owner on Jul 21, 2021. It is now read-only.

TryLock with timeout #168

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 33 additions & 3 deletions zk/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@ import (
"fmt"
"strconv"
"strings"
"time"
)

var (
// ErrDeadlock is returned by Lock when trying to lock twice without unlocking first
ErrDeadlock = errors.New("zk: trying to acquire a lock twice")
// ErrNotLocked is returned by Unlock when trying to release a lock that has not first be acquired.
ErrNotLocked = errors.New("zk: not locked")
// ErrTimeout is returned by Lock when trying to lock and timeout is reached before lock is acquired.
ErrTimeout = errors.New("zk: acquire timeout")
)

// Lock is a mutual exclusion lock.
Expand All @@ -20,6 +23,7 @@ type Lock struct {
path string
acl []ACL
lockPath string
locked bool
seq int
}

Expand All @@ -39,6 +43,29 @@ func parseSeq(path string) (int, error) {
return strconv.Atoi(parts[len(parts)-1])
}

// Lock attempts to acquire the lock. It will wait up to its timeout duration
// to return until the lock is acquired or an error occurs.
// If this instance already has the lock then ErrDeadlock is returned. If timeout
// reached return ErrTimeout is returned.
func (l *Lock) TryLock(timeout time.Duration) error {
var err error
var done = make(chan struct{}, 1)
go func() {
err = l.Lock()
close(done)
}()
select {
case <-time.After(timeout):
l.locked = false
if err := l.c.Delete(l.lockPath, -1); err != nil {
return err
}
return ErrTimeout
case <-done:
return err
}
}

// Lock attempts to acquire the lock. It will wait to return until the lock
// is acquired or an error occurs. If this instance already has the lock
// then ErrDeadlock is returned.
Expand Down Expand Up @@ -87,6 +114,9 @@ func (l *Lock) Lock() error {
return err
}

l.seq = seq
l.lockPath = path

for {
children, _, err := l.c.Children(l.path)
if err != nil {
Expand Down Expand Up @@ -130,21 +160,21 @@ func (l *Lock) Lock() error {
}
}

l.seq = seq
l.lockPath = path
l.locked = true
return nil
}

// Unlock releases an acquired lock. If the lock is not currently acquired by
// this Lock instance than ErrNotLocked is returned.
func (l *Lock) Unlock() error {
if l.lockPath == "" {
if !l.locked || l.lockPath == "" {
return ErrNotLocked
}
if err := l.c.Delete(l.lockPath, -1); err != nil {
return err
}
l.lockPath = ""
l.seq = 0
l.locked = false
return nil
}
30 changes: 30 additions & 0 deletions zk/lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,36 @@ import (
"time"
)

func TestTryLock(t *testing.T) {
ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
defer ts.Stop()
zk, _, err := ts.ConnectAll()
if err != nil {
t.Fatalf("Connect returned error: %+v", err)
}
defer zk.Close()
acls := WorldACL(PermAll)
l := NewLock(zk, "/test", acls)
if err := l.TryLock(time.Second); err != nil {
t.Fatal(err)
}
if err := l.Unlock(); err != nil {
t.Fatal(err)
}
//
if err := l.Lock(); err != nil {
t.Fatal(err)
}
defer l.Unlock()
// should return timeout err since lock is not released
if err := l.TryLock(time.Second); err == nil {
t.Fatal(err)
}
}

func TestLock(t *testing.T) {
ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
Expand Down