Skip to content

Commit

Permalink
fix: more minimal lock
Browse files Browse the repository at this point in the history
  • Loading branch information
siyul-park committed Dec 28, 2024
1 parent e4e4320 commit 8db54b4
Show file tree
Hide file tree
Showing 14 changed files with 206 additions and 166 deletions.
4 changes: 2 additions & 2 deletions ext/pkg/io/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,9 @@ func (n *SQLNode) action(proc *process.Process, inPck *packet.Packet) (*packet.P

proc.AddExitHook(process.ExitFunc(func(err error) {
if err != nil {
tx.Rollback()
_ = tx.Rollback()
} else {
tx.Commit()
_ = tx.Commit()
}
}))

Expand Down
4 changes: 3 additions & 1 deletion ext/pkg/language/javascript/compiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ func NewCompiler(options ...api.TransformOptions) language.Compiler {
},
}

return language.RunFunc(func(ctx context.Context, args []any) ([]any, error) {
return language.RunFunc(func(ctx context.Context, args []any) (_ []any, err error) {
defer func() { err, _ = recover().(error) }()

vm := vms.Get().(*goja.Runtime)
defer vms.Put(vm)

Expand Down
3 changes: 0 additions & 3 deletions ext/pkg/network/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,9 +181,6 @@ func (n *HTTPListenNode) Shutdown() error {

// ServeHTTP handles HTTP requests.
func (n *HTTPListenNode) ServeHTTP(w http.ResponseWriter, r *http.Request) {
n.mu.RLock()
defer n.mu.RUnlock()

proc := process.New()

proc.Store(KeyHTTPResponseWriter, w)
Expand Down
25 changes: 14 additions & 11 deletions pkg/node/manytoone.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,15 +91,17 @@ func (n *ManyToOneNode) Close() error {
}

func (n *ManyToOneNode) forward(index int) port.Listener {
return port.ListenFunc(func(proc *process.Process) {
n.mu.RLock()
defer n.mu.RUnlock()
inPort := n.inPorts[index]

inReader := n.inPorts[index].Open(proc)
return port.ListenFunc(func(proc *process.Process) {
inReader := inPort.Open(proc)
var outWriter *packet.Writer
var errWriter *packet.Writer

readGroup, _ := n.readGroups.LoadOrStore(proc, func() (*packet.ReadGroup, error) {
n.mu.RLock()
defer n.mu.RUnlock()

inReaders := make([]*packet.Reader, len(n.inPorts))
for i, inPort := range n.inPorts {
inReaders[i] = inPort.Open(proc)
Expand All @@ -110,19 +112,20 @@ func (n *ManyToOneNode) forward(index int) port.Listener {
for inPck := range inReader.Read() {
n.tracer.Read(inReader, inPck)

if outWriter == nil {
outWriter = n.outPort.Open(proc)
}
if errWriter == nil {
errWriter = n.errPort.Open(proc)
}

if inPcks := readGroup.Read(inReader, inPck); len(inPcks) < len(n.inPorts) {
n.tracer.Reduce(inPck)
} else if outPck, errPck := n.action(proc, inPcks); errPck != nil {
if errWriter == nil {
errWriter = n.errPort.Open(proc)
}

n.tracer.Transform(inPck, errPck)
n.tracer.Write(errWriter, errPck)
} else if outPck != nil {
if outWriter == nil {
outWriter = n.outPort.Open(proc)
}

n.tracer.Transform(inPck, outPck)
n.tracer.Write(outWriter, outPck)
} else {
Expand Down
24 changes: 10 additions & 14 deletions pkg/node/onetomany.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,22 +94,17 @@ func (n *OneToManyNode) forward(proc *process.Process) {
defer n.mu.RUnlock()

inReader := n.inPort.Open(proc)
outWriters := make([]*packet.Writer, 0, len(n.outPorts))
outWriters := make([]*packet.Writer, len(n.outPorts))
var errWriter *packet.Writer

for inPck := range inReader.Read() {
n.tracer.Read(inReader, inPck)

if len(outWriters) == 0 {
for _, outPort := range n.outPorts {
outWriters = append(outWriters, outPort.Open(proc))
if outPcks, errPck := n.action(proc, inPck); errPck != nil {
if errWriter == nil {
errWriter = n.errPort.Open(proc)
}
}
if errWriter == nil {
errWriter = n.errPort.Open(proc)
}

if outPcks, errPck := n.action(proc, inPck); errPck != nil {
n.tracer.Transform(inPck, errPck)
n.tracer.Write(errWriter, errPck)
} else {
Expand All @@ -122,6 +117,10 @@ func (n *OneToManyNode) forward(proc *process.Process) {
count := 0
for i, outPck := range outPcks {
if i < len(outWriters) && outPck != nil {
if outWriters[i] == nil {
outWriters[i] = n.outPorts[i].Open(proc)
}

n.tracer.Write(outWriters[i], outPck)
count++
}
Expand All @@ -135,12 +134,9 @@ func (n *OneToManyNode) forward(proc *process.Process) {
}

func (n *OneToManyNode) backward(index int) port.Listener {
return port.ListenFunc(func(proc *process.Process) {
n.mu.RLock()
defer n.mu.RUnlock()

outPort := n.outPorts[index]
outPort := n.outPorts[index]

return port.ListenFunc(func(proc *process.Process) {
outWriter := outPort.Open(proc)

for backPck := range outWriter.Receive() {
Expand Down
15 changes: 8 additions & 7 deletions pkg/node/onetoone.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,17 +75,18 @@ func (n *OneToOneNode) forward(proc *process.Process) {
for inPck := range inReader.Read() {
n.tracer.Read(inReader, inPck)

if outWriter == nil {
outWriter = n.outPort.Open(proc)
}
if errWriter == nil {
errWriter = n.errPort.Open(proc)
}

if outPck, errPck := n.action(proc, inPck); errPck != nil {
if errWriter == nil {
errWriter = n.errPort.Open(proc)
}

n.tracer.Transform(inPck, errPck)
n.tracer.Write(errWriter, errPck)
} else {
if outWriter == nil {
outWriter = n.outPort.Open(proc)
}

n.tracer.Transform(inPck, outPck)
n.tracer.Write(outWriter, outPck)
}
Expand Down
64 changes: 29 additions & 35 deletions pkg/packet/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ type Reader struct {
writers []*Writer
in chan *Packet
out chan *Packet
done chan struct{}
done bool
inbounds Hooks
outbounds Hooks
mu sync.Mutex
Expand All @@ -20,32 +20,31 @@ type Reader struct {
// NewReader creates a new Reader instance and starts its processing loop.
func NewReader() *Reader {
r := &Reader{
in: make(chan *Packet),
out: make(chan *Packet),
done: make(chan struct{}),
in: make(chan *Packet),
out: make(chan *Packet),
}

go func() {
defer close(r.out)
defer close(r.in)

buffer := make([]*Packet, 0, 2)
for {
var pck *Packet
select {
case pck = <-r.in:
case <-r.done:
var ok bool
if pck, ok = <-r.in; !ok {
return
}

select {
case r.out <- pck:
default:
buffer = append(buffer, pck)

for len(buffer) > 0 {
select {
case pck = <-r.in:
case pck, ok = <-r.in:
if !ok {
return
}
buffer = append(buffer, pck)
case r.out <- buffer[0]:
buffer = buffer[1:]
Expand All @@ -63,37 +62,35 @@ func (r *Reader) AddInboundHook(hook Hook) bool {
r.mu.Lock()
defer r.mu.Unlock()

select {
case <-r.done:
if r.done {
return false
default:
for _, h := range r.inbounds {
if h == hook {
return false
}
}

for _, h := range r.inbounds {
if h == hook {
return false
}
r.inbounds = append(r.inbounds, hook)
return true
}
r.inbounds = append(r.inbounds, hook)
return true
}

// AddOutboundHook adds a handler to process outbound packets.
func (r *Reader) AddOutboundHook(hook Hook) bool {
r.mu.Lock()
defer r.mu.Unlock()

select {
case <-r.done:
if r.done {
return false
default:
for _, h := range r.outbounds {
if h == hook {
return false
}
}

for _, h := range r.outbounds {
if h == hook {
return false
}
r.outbounds = append(r.outbounds, hook)
return true
}
r.outbounds = append(r.outbounds, hook)
return true
}

// Read returns the channel for reading packets from the reader.
Expand Down Expand Up @@ -125,10 +122,8 @@ func (r *Reader) Close() {
r.mu.Lock()
defer r.mu.Unlock()

select {
case <-r.done:
if r.done {
return
default:
}

pck := New(types.NewError(ErrDroppedPacket))
Expand All @@ -137,8 +132,9 @@ func (r *Reader) Close() {
go w.receive(pck, r)
}

close(r.done)
close(r.in)

r.done = true
r.writers = nil
r.inbounds = nil
r.outbounds = nil
Expand All @@ -148,10 +144,8 @@ func (r *Reader) write(pck *Packet, writer *Writer) bool {
r.mu.Lock()
defer r.mu.Unlock()

select {
case <-r.done:
if r.done {
return false
default:
}

r.writers = append(r.writers, writer)
Expand Down
Loading

0 comments on commit 8db54b4

Please sign in to comment.