Skip to content

Commit

Permalink
refactor(udp): switch to channel mode
Browse files Browse the repository at this point in the history
  • Loading branch information
flc1125 committed Dec 29, 2023
1 parent 0d805ae commit f6c5aa0
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 22 deletions.
9 changes: 4 additions & 5 deletions udp/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ package main

import (
"log"
"net"

"github.com/go-kratos/kratos/v2"

Expand All @@ -17,11 +16,11 @@ import (
func main() {
err := kratos.New(
kratos.Server(
udp.NewServer(":12190", udp.WithHandler(func(conn net.PacketConn, buf []byte, addr net.Addr) {
log.Println(string(buf))
}), udp.WithRecoveryHandler(func(conn net.PacketConn, buf []byte, addr net.Addr, err interface{}) {
udp.NewServer(":12190", udp.WithHandler(func(msg *udp.Message) {
log.Printf("receive message: %s", msg.Body)
}), udp.WithRecoveryHandler(func(msg *udp.Message, err interface{}) {
log.Println(err)
})),
}), udp.WithReadChanSize(10240)),
),
).Run()

Expand Down
80 changes: 66 additions & 14 deletions udp/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,31 @@ import (
"context"
"log"
"net"
"sync"
)

type Message struct {
Conn net.PacketConn
Addr net.Addr
Body []byte
}

type Server struct {
address string

bufSize int

conn net.PacketConn

handler func(conn net.PacketConn, buf []byte, addr net.Addr)
handler func(message *Message)

recoveryHandler func(message *Message, err interface{})

recoveryHandler func(conn net.PacketConn, buf []byte, addr net.Addr, err interface{})
readChan chan *Message
readChanSize int // readChan size

stoped chan struct{}
stopedOnce sync.Once
}

type Option func(*Server)
Expand All @@ -28,32 +41,44 @@ func WithBufSize(bufSize int) Option {
}
}

func WithHandler(handler func(conn net.PacketConn, buf []byte, addr net.Addr)) Option {
func WithHandler(handler func(message *Message)) Option {
return func(s *Server) {
if handler != nil {
s.handler = handler
}
}
}

func WithRecoveryHandler(handler func(conn net.PacketConn, buf []byte, addr net.Addr, err interface{})) Option {
func WithRecoveryHandler(handler func(message *Message, err interface{})) Option {
return func(s *Server) {
if handler != nil {
s.recoveryHandler = handler
}
}
}

func WithReadChanSize(readChanSize int) Option {
return func(s *Server) {
if readChanSize > 0 {
s.readChanSize = readChanSize
}

Check warning on line 64 in udp/server.go

View check run for this annotation

Codecov / codecov/patch

udp/server.go#L60-L64

Added lines #L60 - L64 were not covered by tests
}
}

func NewServer(address string, opts ...Option) *Server {
s := &Server{
address: address,
bufSize: 1024,
address: address,
bufSize: 1024,
readChanSize: 1024,
stoped: make(chan struct{}),
}

for _, opt := range opts {
opt(s)
}

s.readChan = make(chan *Message, s.readChanSize)

return s
}

Expand All @@ -65,38 +90,65 @@ func (s *Server) Start(ctx context.Context) (err error) {

log.Printf("udp server: listening on %s\n", s.address)

go s.start()

buf := make([]byte, s.bufSize)

for {
n, addr, err := s.conn.ReadFrom(buf)
if err != nil {
s.stop()
return err
}

if s.handler == nil {
log.Printf("udp server: receive from %s: %s\n", addr.String(), string(buf))
continue
s.readChan <- &Message{
Conn: s.conn,
Addr: addr,
Body: buf[:n],
}

go s.handle(buf[:n], addr)
}

}

func (s *Server) handle(buf []byte, addr net.Addr) {
func (s *Server) start() {
for {
select {
case <-s.stoped:
return
case message := <-s.readChan:
if s.handler != nil {
s.handle(message)
}
}
}
}

func (s *Server) handle(message *Message) {
if s.recoveryHandler != nil {
defer func() {
if err := recover(); err != nil {
s.recoveryHandler(s.conn, buf, addr, err)
s.recoveryHandler(message, err)

Check warning on line 130 in udp/server.go

View check run for this annotation

Codecov / codecov/patch

udp/server.go#L130

Added line #L130 was not covered by tests
}
}()
}

s.handler(s.conn, buf, addr)
s.handler(message)
}

func (s *Server) Stop(ctx context.Context) error {
log.Println("udp server: stopping")

s.stop()

if s.conn == nil {
return nil
}

Check warning on line 145 in udp/server.go

View check run for this annotation

Codecov / codecov/patch

udp/server.go#L144-L145

Added lines #L144 - L145 were not covered by tests

return s.conn.Close()
}

func (s *Server) stop() {
s.stopedOnce.Do(func() {
close(s.stoped)
})
}
6 changes: 3 additions & 3 deletions udp/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ func TestServer(t *testing.T) {
go func() {
defer wg.Done()

server = NewServer(":12190", WithHandler(func(conn net.PacketConn, buf []byte, addr net.Addr) {
done <- buf
}), WithRecoveryHandler(func(conn net.PacketConn, buf []byte, addr net.Addr, err interface{}) {
server = NewServer(":12190", WithHandler(func(msg *Message) {
done <- msg.Body
}), WithRecoveryHandler(func(msg *Message, err interface{}) {
t.Log(err)
}), WithBufSize(1024))

Expand Down

0 comments on commit f6c5aa0

Please sign in to comment.