Skip to content

Commit

Permalink
feat(coordinator): Added Coordinator
Browse files Browse the repository at this point in the history
Signed-off-by: Flc゛ <four_leaf_clover@foxmail.com>
  • Loading branch information
flc1125 committed Feb 15, 2024
1 parent 3a46d53 commit 55b1e8b
Show file tree
Hide file tree
Showing 5 changed files with 238 additions and 0 deletions.
46 changes: 46 additions & 0 deletions coordinator/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# Coordinator

## Usage

```go
package main

import (
"fmt"
"sync"

"github.com/go-kratos-ecosystem/components/v2/coordinator"
)

func main() {
var wg sync.WaitGroup
wg.Add(3) //nolint:gomnd

go func() {
defer wg.Done()
if <-coordinator.Until("foo").Done(); true {
fmt.Println("foo")
}
}()

go func() {
defer wg.Done()
if <-coordinator.Until("foo").Done(); true {
fmt.Println("foo 2")
}
}()

go func() {
defer wg.Done()
if <-coordinator.Until("bar").Done(); true {
fmt.Println("bar")
}
}()

coordinator.Until("foo").Close()
coordinator.Until("bar").Close()

wg.Wait()
}

```
24 changes: 24 additions & 0 deletions coordinator/coordinator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package coordinator

import "sync"

type Coordinator struct {
c chan struct{}
once sync.Once
}

func NewCoordinator() *Coordinator {
return &Coordinator{
c: make(chan struct{}),
}
}

func (c *Coordinator) Done() <-chan struct{} {
return c.c
}

func (c *Coordinator) Close() {
c.once.Do(func() {
close(c.c)
})
}
63 changes: 63 additions & 0 deletions coordinator/coordinator_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package coordinator

import (
"sync"
"testing"
"time"
)

func TestCoordinator(t *testing.T) {
c := NewCoordinator()
var wg sync.WaitGroup

wg.Add(1)

go func() {
defer wg.Done()

timer := time.NewTimer(1 * time.Second)
defer timer.Stop()

for {
select {
case <-c.Done():
return
case <-timer.C:
t.Error("timeout")
return
}
}
}()

c.Close()
wg.Wait()
}

func TestCoordinator2(t *testing.T) {
c := NewCoordinator()
var wg sync.WaitGroup

wg.Add(1)

go func() {
defer wg.Done()

timer := time.NewTimer(1 * time.Second)
defer timer.Stop()

for {
select {
case <-c.Done():
t.Error("timeout")
return
case <-timer.C:
return
}
}
}()

time.Sleep(2 * time.Second)

c.Close()
wg.Wait()
}
63 changes: 63 additions & 0 deletions coordinator/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package coordinator

import "sync"

type Manager struct {
coordinators map[string]*Coordinator
mu sync.RWMutex
}

func NewManager() *Manager {
return &Manager{
coordinators: make(map[string]*Coordinator),
}
}

func (m *Manager) Until(identifier string) *Coordinator {
m.mu.Lock()
defer m.mu.Unlock()

if c, ok := m.coordinators[identifier]; ok {
return c
}

c := NewCoordinator()
m.coordinators[identifier] = c

return c
}

func (m *Manager) Close(identifier string) {
m.mu.Lock()
defer m.mu.Unlock()

if c, ok := m.coordinators[identifier]; ok {
c.Close()
delete(m.coordinators, identifier)
}

Check warning on line 37 in coordinator/manager.go

View check run for this annotation

Codecov / codecov/patch

coordinator/manager.go#L30-L37

Added lines #L30 - L37 were not covered by tests
}

func (m *Manager) Clear() {
m.mu.Lock()
defer m.mu.Unlock()

for _, c := range m.coordinators {
c.Close()
}

Check warning on line 46 in coordinator/manager.go

View check run for this annotation

Codecov / codecov/patch

coordinator/manager.go#L40-L46

Added lines #L40 - L46 were not covered by tests

m.coordinators = make(map[string]*Coordinator)

Check warning on line 48 in coordinator/manager.go

View check run for this annotation

Codecov / codecov/patch

coordinator/manager.go#L48

Added line #L48 was not covered by tests
}

var defaultManager = NewManager()

func Until(identifier string) *Coordinator {
return defaultManager.Until(identifier)
}

func Close(identifier string) {
defaultManager.Close(identifier)

Check warning on line 58 in coordinator/manager.go

View check run for this annotation

Codecov / codecov/patch

coordinator/manager.go#L57-L58

Added lines #L57 - L58 were not covered by tests
}

func Clear() {
defaultManager.Clear()

Check warning on line 62 in coordinator/manager.go

View check run for this annotation

Codecov / codecov/patch

coordinator/manager.go#L61-L62

Added lines #L61 - L62 were not covered by tests
}
42 changes: 42 additions & 0 deletions coordinator/manager_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package coordinator

import (
"sync"
"testing"

"github.com/stretchr/testify/assert"
)

func TestManager(t *testing.T) {
wg := sync.WaitGroup{}
ch := make(chan struct{}, 2)

c1 := Until("foo")
c2 := Until("foo")

assert.Same(t, c1, c2)
assert.Equal(t, 0, len(ch))

wg.Add(2)
go func() {
defer wg.Done()

if <-c1.Done(); true {
ch <- struct{}{}
return
}
}()
go func() {
defer wg.Done()

if <-c2.Done(); true {
ch <- struct{}{}
return
}
}()

c1.Close()

wg.Wait()
assert.Equal(t, 2, len(ch))
}

0 comments on commit 55b1e8b

Please sign in to comment.