Skip to content

Commit

Permalink
fix: more minimal lock
Browse files Browse the repository at this point in the history
  • Loading branch information
siyul-park committed Dec 28, 2024
1 parent e4e4320 commit 0a858c6
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 15 deletions.
3 changes: 0 additions & 3 deletions ext/pkg/network/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,9 +181,6 @@ func (n *HTTPListenNode) Shutdown() error {

// ServeHTTP handles HTTP requests.
func (n *HTTPListenNode) ServeHTTP(w http.ResponseWriter, r *http.Request) {
n.mu.RLock()
defer n.mu.RUnlock()

proc := process.New()

proc.Store(KeyHTTPResponseWriter, w)
Expand Down
10 changes: 6 additions & 4 deletions pkg/node/manytoone.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,15 +91,17 @@ func (n *ManyToOneNode) Close() error {
}

func (n *ManyToOneNode) forward(index int) port.Listener {
return port.ListenFunc(func(proc *process.Process) {
n.mu.RLock()
defer n.mu.RUnlock()
inPort := n.inPorts[index]

inReader := n.inPorts[index].Open(proc)
return port.ListenFunc(func(proc *process.Process) {
inReader := inPort.Open(proc)
var outWriter *packet.Writer
var errWriter *packet.Writer

readGroup, _ := n.readGroups.LoadOrStore(proc, func() (*packet.ReadGroup, error) {
n.mu.RLock()
defer n.mu.RUnlock()

inReaders := make([]*packet.Reader, len(n.inPorts))
for i, inPort := range n.inPorts {
inReaders[i] = inPort.Open(proc)
Expand Down
7 changes: 2 additions & 5 deletions pkg/node/onetomany.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,12 +135,9 @@ func (n *OneToManyNode) forward(proc *process.Process) {
}

func (n *OneToManyNode) backward(index int) port.Listener {
return port.ListenFunc(func(proc *process.Process) {
n.mu.RLock()
defer n.mu.RUnlock()

outPort := n.outPorts[index]
outPort := n.outPorts[index]

return port.ListenFunc(func(proc *process.Process) {
outWriter := outPort.Open(proc)

for backPck := range outWriter.Receive() {
Expand Down
9 changes: 8 additions & 1 deletion pkg/port/inport.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,16 @@ func (p *InPort) AddListener(listener Listener) bool {

// Open prepares the input port for a given process and returns a reader.
func (p *InPort) Open(proc *process.Process) *packet.Reader {
p.mu.RLock()
reader, ok := p.readers[proc]
p.mu.RUnlock()
if ok {
return reader
}

p.mu.Lock()

reader, ok := p.readers[proc]
reader, ok = p.readers[proc]
if ok {
p.mu.Unlock()
return reader
Expand Down
9 changes: 8 additions & 1 deletion pkg/port/outport.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,16 @@ func (p *OutPort) Unlink(in *InPort) {

// Open opens the output port for the given process and returns a writer.
func (p *OutPort) Open(proc *process.Process) *packet.Writer {
p.mu.RLock()
writer, ok := p.writers[proc]
p.mu.RUnlock()
if ok {
return writer
}

p.mu.Lock()

writer, ok := p.writers[proc]
writer, ok = p.writers[proc]
if ok {
p.mu.Unlock()
return writer
Expand Down
2 changes: 1 addition & 1 deletion pkg/symbol/cluster.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package symbol

import (
"github.com/siyul-park/uniflow/pkg/packet"
"sync"

"github.com/siyul-park/uniflow/pkg/node"
"github.com/siyul-park/uniflow/pkg/packet"
"github.com/siyul-park/uniflow/pkg/port"
"github.com/siyul-park/uniflow/pkg/process"
"github.com/siyul-park/uniflow/pkg/spec"
Expand Down

0 comments on commit 0a858c6

Please sign in to comment.