Commit
Merge pull request #390 from DataDog/jamie/registry
jamie/registry
jamiealquiza authored Feb 8, 2022
2 parents 71aa618 + 8a90044 commit ab02050
Showing 22 changed files with 568 additions and 167 deletions.
1 change: 1 addition & 0 deletions cluster/cluster.go
@@ -13,6 +13,7 @@ type Lock interface {
// through this interface. A context is accepted for setting wait bounds.
Lock(context.Context) error
Unlock(context.Context) error
UnlockLogError(context.Context)
// Owner returns the current owner value.
Owner() interface{}
}
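Not part of this commit: a minimal usage sketch for the extended Lock interface above. The withLock helper, its timeout, and its placement in the cluster package are illustrative assumptions, not code from this repository.

package cluster

import (
	"context"
	"time"
)

// withLock acquires l under a bounded context, runs fn while holding the lock,
// and releases it via the new UnlockLogError method so that cleanup errors are
// logged rather than lost in the deferred call.
func withLock(ctx context.Context, l Lock, fn func() error) error {
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	if err := l.Lock(ctx); err != nil {
		return err
	}
	defer l.UnlockLogError(ctx)

	return fn()
}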
4 changes: 3 additions & 1 deletion cluster/zookeeper/README.md
@@ -1,10 +1,11 @@
# Overview

This package provides a ZooKeeper backed, coarse grained distributed lock. The lock path is determined at instantiation time. At request time, locks are enqueued and block until the lock is either acquired or the context deadline is met.
This package provides a ZooKeeper backed, coarse grained distributed lock. The lock path is determined at instantiation time. At request time, locks are enqueued and block until the lock is either acquired or the context deadline is met. Locks can be configured with an optional TTL.

Further implementation notes:
- Locks are enqueued and granted in order as locks ahead are relinquished or timed out.
- Session timeouts/disconnects are handled through ZooKeeper sessions with automatic cleanup; locks that fail to acquire before the context timeout are removed from the queue even if the lock session is still active.
- Setting a `ZooKeeperLockConfig.TTL` value > 0 enables lock TTLs. Take note that TTL expirations are handled at request time from contending locks; if service A is not using TTLs and service B is, service B can forcibly abort service A locks.

# Examples

@@ -37,6 +38,7 @@ func main() {
cfg := zklocking.ZooKeeperLockConfig{
Address: "localhost:2181",
Path: "/my/locks",
TTL: 30000,
OwnerKey: "owner",
}

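Not part of this commit: a rough end-to-end sketch of how the TTL and OwnerKey settings shown above come together at call time. The NewZooKeeperLock constructor name and the import path are assumptions (the remainder of the README example is not shown in this diff), and TTL is expressed in milliseconds, so 30000 means 30 seconds.

package main

import (
	"context"
	"log"
	"time"

	zklocking "github.com/DataDog/kafka-kit/v3/cluster/zookeeper" // import path assumed
)

func main() {
	cfg := zklocking.ZooKeeperLockConfig{
		Address:  "localhost:2181",
		Path:     "/my/locks",
		TTL:      30000, // milliseconds; contending TTL-enabled locks may purge this lock after 30s
		OwnerKey: "owner",
	}

	lock, err := zklocking.NewZooKeeperLock(cfg) // constructor name assumed
	if err != nil {
		log.Fatal(err)
	}

	// OwnerKey names the context value used to identify this caller.
	ctx := context.WithValue(context.Background(), "owner", "service-b")
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	// Lock blocks until the claim is granted, possibly after purging an
	// expired (TTL-exceeded) lock ahead of it, or until the context deadline.
	if err := lock.Lock(ctx); err != nil {
		log.Fatal(err)
	}
	defer lock.UnlockLogError(ctx)
}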
10 changes: 10 additions & 0 deletions cluster/zookeeper/errors.go
@@ -42,3 +42,13 @@ type ErrUnlockingFailed struct {
func (err ErrUnlockingFailed) Error() string {
return fmt.Sprintf("attempt to release lock failed: %s", err.message)
}

// ErrExpireLockFailed is returned when a lock with an expired TTL fails to purge.
type ErrExpireLockFailed struct {
message string
}

// Error returns an error string.
func (err ErrExpireLockFailed) Error() string {
return fmt.Sprintf("failed to TTL expire lock: %s", err.message)
}
156 changes: 141 additions & 15 deletions cluster/zookeeper/locking.go
@@ -2,47 +2,82 @@ package zookeeper

import (
"context"
"encoding/json"
"fmt"
"log"
"time"

"github.com/go-zookeeper/zk"
)

// lockMetadata is internal metadata persisted in the lock znode.
type lockMetadata struct {
Timestamp time.Time `json:"timestamp"`
TTLDeadline time.Time `json:"ttl_deadline"`
OwnerID string `json:"owner_id"`
}

// Lock attempts to acquire a lock. If the lock cannot be acquired by the context
// deadline, the lock attempt times out.
func (z *ZooKeeperLock) Lock(ctx context.Context) error {
// Check if the context has a lock owner value. If so, check if this owner
// already has the lock.
if owner := ctx.Value(z.OwnerKey); owner != nil && owner == z.Owner() {
owner := ctx.Value(z.OwnerKey)
if owner != nil && owner == z.Owner() {
return ErrAlreadyOwnLock
}

// Populate a lockMetadata.
meta := lockMetadata{
Timestamp: time.Now(),
TTLDeadline: time.Now().Add(time.Duration(z.TTL) * time.Millisecond),
OwnerID: fmt.Sprintf("%v", owner),
}
metaJSON, _ := json.Marshal(meta)

// Enter the claim into ZooKeeper.
lockPath := fmt.Sprintf("%s/lock-", z.Path)
node, err := z.c.CreateProtectedEphemeralSequential(lockPath, nil, zk.WorldACL(31))
node, err := z.c.CreateProtectedEphemeralSequential(lockPath, metaJSON, zk.WorldACL(31))

// In all return paths other than the case that we have successfully acquired
// a lock, it's critical that we remove the claim znode.
var removeZnodeAtExit bool = true
defer func() {
if removeZnodeAtExit {
z.deleteLockZnode(node)
}
}()

// Handle the error after the cleanup defer is registered. It's likely that
// 'node' will always be an empty string if there's a non-nil error, anyway.
if err != nil {
return ErrLockingFailed{message: err.Error()}
}

// Get our claim ID.
thisID, err := idFromZnode(node)
if err != nil {
z.deleteLockZnode(node)
return ErrLockingFailed{message: err.Error()}
}

var interval int
var lockWaitingErr error
for {
// Prevent thrashing.
interval++
if interval%5 == 0 {

// Max failure threshold.
if interval > 5 && lockWaitingErr != nil {
return ErrLockingFailed{message: lockWaitingErr.Error()}
}

// Prevent thrashing.
if interval > 1 {
time.Sleep(50 * time.Millisecond)
}

// Get all current locks.
locks, err := z.locks()
if err != nil {
z.deleteLockZnode(node)
return ErrLockingFailed{message: err.Error()}
}

@@ -51,36 +86,47 @@ func (z *ZooKeeperLock) Lock(ctx context.Context) error {
if thisID == firstClaim {
// We have the lock.
z.mu.Lock()

// Update the lock znode.
z.lockZnode, err = locks.LockPath(thisID)
// Set the owner value if the context OwnerKey is specified.
if owner := ctx.Value(z.OwnerKey); owner != nil {
z.owner = owner
}

// XXX preventing this znode from being terminated is essential.
removeZnodeAtExit = false

z.mu.Unlock()

return nil
}

// If we're here, we don't have the lock; we need to enqueue our wait position
// by watching the ID immediately ahead of ours.
lockAhead, err := locks.LockAhead(thisID)
// If we're here, we don't have the lock but can enqueue.

// First, we'll check if the lock ahead has an expired TTL.
expiredLock, err := z.expireLockAhead(locks, thisID)
if err != nil {
return ErrLockingFailed{message: err.Error()}
lockWaitingErr = err
continue
}

lockAheadPath, _ := locks.LockPath(lockAhead)
// If so, restart the iteration to get a refreshed linked list.
if expiredLock {
continue
}

// Get a ZooKeeper watch on the lock we're waiting on.
_, _, blockingLockReleased, err := z.c.GetW(lockAheadPath)
// Enqueue our wait position by watching the ID immediately ahead of ours.
blockingLockReleased, err := z.getLockAheadWait(locks, thisID)
if err != nil {
lockWaitingErr = err
continue
}

// Race the watch event against the context timeout.
select {
// We've timed out.
case <-ctx.Done():
// XXX it's critical that we clean up the attempted lock.
z.deleteLockZnode(node)
return ErrLockingTimedOut
// Else see if we can get the claim.
case <-blockingLockReleased:
@@ -119,6 +165,13 @@ func (z *ZooKeeperLock) Unlock(ctx context.Context) error {
return nil
}

// UnlockLogError releases a lock and logs, rather than returning, any errors encountered.
func (z *ZooKeeperLock) UnlockLogError(ctx context.Context) {
if err := z.Unlock(ctx); err != nil {
log.Println(err)
}
}

func (z *ZooKeeperLock) deleteLockZnode(p string) error {
// We have to get the znode first; the current version is required for
// the delete request.
@@ -135,3 +188,76 @@ func (z *ZooKeeperLock) deleteLockZnode(p string) error {

return nil
}
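Not part of this commit: the body of deleteLockZnode is collapsed in this diff, so the lines below are only an editor's sketch of the get-then-delete-with-version pattern that the comment at the top of deleteLockZnode describes, not the file's actual code.

	// Sketch only: read the znode to obtain its current Stat, then delete it
	// at that version.
	_, stat, err := z.c.Get(p)
	if err != nil {
		return err
	}
	return z.c.Delete(p, stat.Version)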

// expireLockAhead takes an ID and checks if the lock ahead of it has an expired
// TTL. If so, it purges the lock and returns true.
func (z *ZooKeeperLock) expireLockAhead(locks LockEntries, id int) (bool, error) {
// TTLs aren't being used.
if z.TTL == 0 {
return false, nil
}

// Get the path to the lock ahead.
lockAheadPath, err := lockAheadPath(locks, id)
if err != nil {
return false, ErrExpireLockFailed{message: err.Error()}
}

// Get its metadata.
dat, _, err := z.c.Get(lockAheadPath)
if err != nil {
return false, ErrExpireLockFailed{message: err.Error()}
}

// Deserialize.
var metadata lockMetadata
if err := json.Unmarshal(dat, &metadata); err != nil {
return false, ErrExpireLockFailed{message: err.Error()}
}

// Check if it's expired.
if time.Now().Before(metadata.TTLDeadline) {
return false, nil
}

// We can purge the lock.
if err := z.deleteLockZnode(lockAheadPath); err != nil {
return false, ErrExpireLockFailed{message: err.Error()}
}

// Clear the lock state.
z.mu.Lock()
z.lockZnode = ""
z.owner = nil
z.mu.Unlock()

return true, nil
}

// getLockAheadWait takes a lock ID and returns a watch on the lock immediately
// ahead of it.
func (z *ZooKeeperLock) getLockAheadWait(locks LockEntries, id int) (<-chan zk.Event, error) {
lockAheadPath, err := lockAheadPath(locks, id)
if err != nil {
return nil, err
}

// Get a ZooKeeper watch on the lock path we're waiting on.
_, _, watch, err := z.c.GetW(lockAheadPath)
if err != nil {
return nil, err
}

return watch, nil
}

func lockAheadPath(locks LockEntries, id int) (string, error) {
// Find the lock ID ahead.
lockAhead, err := locks.LockAhead(id)
if err != nil {
return "", err
}

// Get its path.
return locks.LockPath(lockAhead)
}
49 changes: 49 additions & 0 deletions cluster/zookeeper/locking_test.go
@@ -2,9 +2,11 @@ package zookeeper

import (
"context"
"fmt"
"testing"
"time"

"github.com/go-zookeeper/zk"
"github.com/stretchr/testify/assert"
)

@@ -55,3 +57,50 @@ func TestUnlock(t *testing.T) {
err = lock.Lock(ctx)
assert.Nil(t, err)
}

func TestExpireLockAhead(t *testing.T) {
lock := newMockZooKeeperLock()
ctx, cf := context.WithTimeout(context.Background(), 60*time.Second)
_ = cf
ctx = context.WithValue(ctx, "owner", "test_owner")

// This lock should succeed normally.
err := lock.Lock(ctx)
assert.Nil(t, err)

// Enter a pending claim. This mimics the initial znode entry in the ZooKeeperLock
// Lock method. We do this rather than calling the Lock method entirely
// to exclude other operations that may affect what we really want to test.
lockPath := fmt.Sprintf("%s/lock-", lock.Path)
node, _ := lock.c.CreateProtectedEphemeralSequential(lockPath, nil, zk.WorldACL(31))
id, _ := idFromZnode(node)

// Check that the lock state has been populated.
assert.Equal(t, lock.owner, "test_owner")
assert.Equal(t, lock.lockZnode, "/locks/_c_979cb11f40bb3dbc6908edeaac8f2de1-lock-000000001")

// Get the current lock entries.
le, _ := lock.locks()

// Ensure we exceed the mock ZooKeeperLock.TTL of 10ms.
time.Sleep(30 * time.Millisecond)

// This scenario should result in an expiry. We have an active lock ID 1
// from the above Lock() call.
expired, err := lock.expireLockAhead(le, id)
assert.Nil(t, err)
assert.True(t, expired)

// Refresh the lock entries.
le, _ = lock.locks()

// This should now fail; the lock was expired and the only entry is ID 2
// for the pending claim we entered above.
expired, err = lock.expireLockAhead(le, id)
assert.Equal(t, err, ErrExpireLockFailed{message: "unable to determine which lock to enqueue behind"})
assert.False(t, expired)

// Check that the lock state has been cleared.
assert.Nil(t, lock.owner)
assert.Equal(t, lock.lockZnode, "")
}
