Skip to content

Commit

Permalink
fix: remove unnecessary method in signal
Browse files Browse the repository at this point in the history
  • Loading branch information
siyul-park committed Dec 20, 2024
1 parent e68749d commit 6b36b04
Showing 1 changed file with 17 additions and 26 deletions.
43 changes: 17 additions & 26 deletions ext/pkg/system/signal.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ type SignalNodeSpec struct {
type SignalNode struct {
outPort *port.OutPort
signal <-chan any
wait chan struct{}
done chan struct{}
close chan struct{}
mu sync.RWMutex
}

Expand Down Expand Up @@ -55,7 +55,7 @@ func NewSignalNodeCodec(signals map[string]func(context.Context) (<-chan any, er
n := NewSignalNode(signal)

go func() {
<-n.Wait()
<-n.Done()
cancel()
}()

Expand All @@ -68,8 +68,8 @@ func NewSignalNode(signal <-chan any) *SignalNode {
return &SignalNode{
outPort: port.NewOut(),
signal: signal,
done: nil,
close: make(chan struct{}),
wait: nil,
done: make(chan struct{}),
}
}

Expand All @@ -78,21 +78,21 @@ func (n *SignalNode) Listen() {
n.mu.Lock()
defer n.mu.Unlock()

if n.done != nil {
if n.wait != nil {
return
}

done := make(chan struct{})
n.done = done
wait := make(chan struct{})
n.wait = wait

go func() {
defer func() {
n.mu.Lock()
defer n.mu.Unlock()

if n.done != nil {
close(n.done)
n.done = nil
if n.wait != nil {
close(n.wait)
n.wait = nil
}
}()

Expand All @@ -102,9 +102,8 @@ func (n *SignalNode) Listen() {
if !ok {
return
}

n.emit(sig)
case <-done:
case <-wait:
return
}
}
Expand All @@ -116,30 +115,22 @@ func (n *SignalNode) Shutdown() {
n.mu.Lock()
defer n.mu.Unlock()

if n.done == nil {
if n.wait == nil {
return
}

close(n.done)
n.done = nil
close(n.wait)
n.wait = nil
}

// Done returns a channel that is closed when the node is shutdown.
// Done returns a channel that is closed when the node is done.
func (n *SignalNode) Done() <-chan struct{} {
n.mu.RLock()
defer n.mu.RUnlock()

return n.done
}

// Wait returns a channel that is closed when the node is close.
func (n *SignalNode) Wait() <-chan struct{} {
n.mu.RLock()
defer n.mu.RUnlock()

return n.close
}

// In returns nil as SignalNode does not have input ports.
func (n *SignalNode) In(_ string) *port.InPort {
return nil
Expand All @@ -160,9 +151,9 @@ func (n *SignalNode) Close() error {
defer n.mu.Unlock()

select {
case <-n.close:
case <-n.done:
default:
close(n.close)
close(n.done)
}
}()

Expand Down

0 comments on commit 6b36b04

Please sign in to comment.