Skip to content

Commit

Permalink
fix: more minimal lock
Browse files Browse the repository at this point in the history
  • Loading branch information
siyul-park committed Dec 28, 2024
1 parent e4e4320 commit 890a55d
Show file tree
Hide file tree
Showing 11 changed files with 123 additions and 68 deletions.
4 changes: 2 additions & 2 deletions ext/pkg/io/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,9 @@ func (n *SQLNode) action(proc *process.Process, inPck *packet.Packet) (*packet.P

proc.AddExitHook(process.ExitFunc(func(err error) {
if err != nil {
tx.Rollback()
_ = tx.Rollback()
} else {
tx.Commit()
_ = tx.Commit()
}
}))

Expand Down
4 changes: 3 additions & 1 deletion ext/pkg/language/javascript/compiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ func NewCompiler(options ...api.TransformOptions) language.Compiler {
},
}

return language.RunFunc(func(ctx context.Context, args []any) ([]any, error) {
return language.RunFunc(func(ctx context.Context, args []any) (_ []any, err error) {
defer func() { err, _ = recover().(error) }()

vm := vms.Get().(*goja.Runtime)
defer vms.Put(vm)

Expand Down
3 changes: 0 additions & 3 deletions ext/pkg/network/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,9 +181,6 @@ func (n *HTTPListenNode) Shutdown() error {

// ServeHTTP handles HTTP requests.
func (n *HTTPListenNode) ServeHTTP(w http.ResponseWriter, r *http.Request) {
n.mu.RLock()
defer n.mu.RUnlock()

proc := process.New()

proc.Store(KeyHTTPResponseWriter, w)
Expand Down
25 changes: 14 additions & 11 deletions pkg/node/manytoone.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,15 +91,17 @@ func (n *ManyToOneNode) Close() error {
}

func (n *ManyToOneNode) forward(index int) port.Listener {
return port.ListenFunc(func(proc *process.Process) {
n.mu.RLock()
defer n.mu.RUnlock()
inPort := n.inPorts[index]

inReader := n.inPorts[index].Open(proc)
return port.ListenFunc(func(proc *process.Process) {
inReader := inPort.Open(proc)
var outWriter *packet.Writer
var errWriter *packet.Writer

readGroup, _ := n.readGroups.LoadOrStore(proc, func() (*packet.ReadGroup, error) {
n.mu.RLock()
defer n.mu.RUnlock()

inReaders := make([]*packet.Reader, len(n.inPorts))
for i, inPort := range n.inPorts {
inReaders[i] = inPort.Open(proc)
Expand All @@ -110,19 +112,20 @@ func (n *ManyToOneNode) forward(index int) port.Listener {
for inPck := range inReader.Read() {
n.tracer.Read(inReader, inPck)

if outWriter == nil {
outWriter = n.outPort.Open(proc)
}
if errWriter == nil {
errWriter = n.errPort.Open(proc)
}

if inPcks := readGroup.Read(inReader, inPck); len(inPcks) < len(n.inPorts) {
n.tracer.Reduce(inPck)
} else if outPck, errPck := n.action(proc, inPcks); errPck != nil {
if errWriter == nil {
errWriter = n.errPort.Open(proc)
}

n.tracer.Transform(inPck, errPck)
n.tracer.Write(errWriter, errPck)
} else if outPck != nil {
if outWriter == nil {
outWriter = n.outPort.Open(proc)
}

n.tracer.Transform(inPck, outPck)
n.tracer.Write(outWriter, outPck)
} else {
Expand Down
24 changes: 10 additions & 14 deletions pkg/node/onetomany.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,22 +94,17 @@ func (n *OneToManyNode) forward(proc *process.Process) {
defer n.mu.RUnlock()

inReader := n.inPort.Open(proc)
outWriters := make([]*packet.Writer, 0, len(n.outPorts))
outWriters := make([]*packet.Writer, len(n.outPorts))
var errWriter *packet.Writer

for inPck := range inReader.Read() {
n.tracer.Read(inReader, inPck)

if len(outWriters) == 0 {
for _, outPort := range n.outPorts {
outWriters = append(outWriters, outPort.Open(proc))
if outPcks, errPck := n.action(proc, inPck); errPck != nil {
if errWriter == nil {
errWriter = n.errPort.Open(proc)
}
}
if errWriter == nil {
errWriter = n.errPort.Open(proc)
}

if outPcks, errPck := n.action(proc, inPck); errPck != nil {
n.tracer.Transform(inPck, errPck)
n.tracer.Write(errWriter, errPck)
} else {
Expand All @@ -122,6 +117,10 @@ func (n *OneToManyNode) forward(proc *process.Process) {
count := 0
for i, outPck := range outPcks {
if i < len(outWriters) && outPck != nil {
if outWriters[i] == nil {
outWriters[i] = n.outPorts[i].Open(proc)
}

n.tracer.Write(outWriters[i], outPck)
count++
}
Expand All @@ -135,12 +134,9 @@ func (n *OneToManyNode) forward(proc *process.Process) {
}

func (n *OneToManyNode) backward(index int) port.Listener {
return port.ListenFunc(func(proc *process.Process) {
n.mu.RLock()
defer n.mu.RUnlock()

outPort := n.outPorts[index]
outPort := n.outPorts[index]

return port.ListenFunc(func(proc *process.Process) {
outWriter := outPort.Open(proc)

for backPck := range outWriter.Receive() {
Expand Down
15 changes: 8 additions & 7 deletions pkg/node/onetoone.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,17 +75,18 @@ func (n *OneToOneNode) forward(proc *process.Process) {
for inPck := range inReader.Read() {
n.tracer.Read(inReader, inPck)

if outWriter == nil {
outWriter = n.outPort.Open(proc)
}
if errWriter == nil {
errWriter = n.errPort.Open(proc)
}

if outPck, errPck := n.action(proc, inPck); errPck != nil {
if errWriter == nil {
errWriter = n.errPort.Open(proc)
}

n.tracer.Transform(inPck, errPck)
n.tracer.Write(errWriter, errPck)
} else {
if outWriter == nil {
outWriter = n.outPort.Open(proc)
}

n.tracer.Transform(inPck, outPck)
n.tracer.Write(outWriter, outPck)
}
Expand Down
9 changes: 8 additions & 1 deletion pkg/port/inport.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,16 @@ func (p *InPort) AddListener(listener Listener) bool {

// Open prepares the input port for a given process and returns a reader.
func (p *InPort) Open(proc *process.Process) *packet.Reader {
p.mu.RLock()
reader, ok := p.readers[proc]
p.mu.RUnlock()
if ok {
return reader
}

p.mu.Lock()

reader, ok := p.readers[proc]
reader, ok = p.readers[proc]
if ok {
p.mu.Unlock()
return reader
Expand Down
9 changes: 8 additions & 1 deletion pkg/port/outport.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,16 @@ func (p *OutPort) Unlink(in *InPort) {

// Open opens the output port for the given process and returns a writer.
func (p *OutPort) Open(proc *process.Process) *packet.Writer {
p.mu.RLock()
writer, ok := p.writers[proc]
p.mu.RUnlock()
if ok {
return writer
}

p.mu.Lock()

writer, ok := p.writers[proc]
writer, ok = p.writers[proc]
if ok {
p.mu.Unlock()
return writer
Expand Down
90 changes: 66 additions & 24 deletions pkg/process/local.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,31 @@
package process

import "sync"
import (
"sync"
"sync/atomic"
)

// Local provides a concurrent cache for process-specific data.
// Local provides a concurrent cache for process-specific eager.
type Local[T any] struct {
data map[*Process]T
eager map[*Process]T
lazy map[*Process]*lazy[T]
storeHooks map[*Process]StoreHooks[T]
mu sync.RWMutex
}

type lazy[T any] struct {
fn func() (T, error)
value T
error error
done atomic.Uint32
mu sync.Mutex
}

// NewLocal creates a new Local cache instance.
func NewLocal[T any]() *Local[T] {
return &Local[T]{
data: make(map[*Process]T),
eager: make(map[*Process]T),
lazy: make(map[*Process]*lazy[T]),
storeHooks: make(map[*Process]StoreHooks[T]),
}
}
Expand All @@ -22,7 +35,7 @@ func (l *Local[T]) AddStoreHook(proc *Process, hook StoreHook[T]) bool {
l.mu.Lock()
defer l.mu.Unlock()

if val, ok := l.data[proc]; ok {
if val, ok := l.eager[proc]; ok {
l.mu.Unlock()
defer l.mu.Lock()

Expand Down Expand Up @@ -63,8 +76,8 @@ func (l *Local[T]) Keys() []*Process {
l.mu.RLock()
defer l.mu.RUnlock()

keys := make([]*Process, 0, len(l.data))
for proc := range l.data {
keys := make([]*Process, 0, len(l.eager))
for proc := range l.eager {
keys = append(keys, proc)
}
return keys
Expand All @@ -75,17 +88,17 @@ func (l *Local[T]) Load(proc *Process) (T, bool) {
l.mu.RLock()
defer l.mu.RUnlock()

val, ok := l.data[proc]
val, ok := l.eager[proc]
return val, ok
}

// Store sets the value for the given process.
func (l *Local[T]) Store(proc *Process, val T) {
l.mu.Lock()

_, ok := l.data[proc]
_, ok := l.eager[proc]

l.data[proc] = val
l.eager[proc] = val
if !ok {
proc.AddExitHook(ExitFunc(func(err error) {
l.Delete(proc)
Expand All @@ -100,55 +113,84 @@ func (l *Local[T]) Store(proc *Process, val T) {
storeHooks.Store(val)
}

// Delete removes the process and its data from the cache.
// Delete removes the process and its eager from the cache.
func (l *Local[T]) Delete(proc *Process) bool {
l.mu.Lock()
defer l.mu.Unlock()

_, ok := l.data[proc]
_, ok := l.eager[proc]

delete(l.data, proc)
delete(l.eager, proc)
delete(l.storeHooks, proc)

return ok
}

// LoadOrStore retrieves or stores a value for the given process.
func (l *Local[T]) LoadOrStore(proc *Process, val func() (T, error)) (T, error) {
l.mu.RLock()
v, ok := l.eager[proc]
l.mu.RUnlock()
if ok {
return v, nil
}

l.mu.Lock()
defer l.mu.Unlock()

if v, ok := l.data[proc]; ok {
if v, ok := l.eager[proc]; ok {
l.mu.Unlock()
return v, nil
}

v, err := val()
fn, ok := l.lazy[proc]
if !ok {
fn = &lazy[T]{fn: val}
l.lazy[proc] = fn
}

l.mu.Unlock()

v, err := fn.Do()
if err != nil {
return v, err
}

l.data[proc] = v
proc.AddExitHook(ExitFunc(func(err error) {
l.Delete(proc)
}))
l.mu.Lock()

l.eager[proc] = v
delete(l.lazy, proc)

storeHooks := l.storeHooks[proc]
delete(l.storeHooks, proc)

l.mu.Unlock()

storeHooks.Store(v)
proc.AddExitHook(ExitFunc(func(err error) {
l.Delete(proc)
}))

l.mu.Lock()
storeHooks.Store(v)

return v, nil
}

// Close clears all cached data and hooks.
// Close clears all cached eager and hooks.
func (l *Local[T]) Close() {
l.mu.Lock()
defer l.mu.Unlock()

l.data = make(map[*Process]T)
l.eager = make(map[*Process]T)
l.lazy = make(map[*Process]*lazy[T])
l.storeHooks = make(map[*Process]StoreHooks[T])
}

func (o *lazy[T]) Do() (T, error) {
o.mu.Lock()
defer o.mu.Unlock()

if o.done.Load() == 0 {
defer o.done.Store(1)
o.value, o.error = o.fn()
}
return o.value, o.error
}
Loading

0 comments on commit 890a55d

Please sign in to comment.