diff --git a/.gitignore b/.gitignore index fadb7396..89646676 100644 --- a/.gitignore +++ b/.gitignore @@ -15,4 +15,4 @@ vendor/ .idea _backup -_example \ No newline at end of file +example \ No newline at end of file diff --git a/go.mod b/go.mod index 4098ea72..ab4ec711 100644 --- a/go.mod +++ b/go.mod @@ -21,6 +21,7 @@ require ( github.com/gorilla/mux v1.8.0 // indirect github.com/kr/text v0.2.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + golang.org/x/sync v0.3.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d // indirect google.golang.org/grpc v1.58.3 // indirect google.golang.org/protobuf v1.31.0 // indirect diff --git a/go.sum b/go.sum index 1326e39a..b693246c 100644 --- a/go.sum +++ b/go.sum @@ -38,9 +38,13 @@ github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjR github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= +golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E= +golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/genproto v0.0.0-20230803162519-f966b187b2e5 h1:L6iMMGrtzgHsWofoFcihmDEMYeDR9KN/ThbPWGrh++g= +google.golang.org/genproto/googleapis/api v0.0.0-20230711160842-782d3b101e98 h1:FmF5cCW94Ij59cfpoLiwTgodWmm60eEV0CjlsVg2fuw= google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d h1:uvYuEyMHKNt+lT4K3bN6fGswmK8qSvcreM3BwjDh+y4= google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d/go.mod h1:+Bk1OCOj40wS2hwAMA+aCW9ypzm63QTBBHp6lQ3p+9M= google.golang.org/grpc v1.58.3 h1:BjnpXut1btbtgN/6sp+brB2Kbm2LjNXnidYujAVbSoQ= diff --git a/udp/README.md b/udp/README.md new file mode 100644 index 00000000..eae70b78 --- /dev/null +++ b/udp/README.md @@ -0,0 +1,33 @@ +# UDP + +## Server + +```go +package main + +import ( + "log" + "net" + + "github.com/go-kratos/kratos/v2" + + "github.com/go-packagist/go-kratos-components/udp" +) + +func main() { + err := kratos.New( + kratos.Server( + udp.NewServer(":12190", udp.WithHandler(func(conn net.PacketConn, buf []byte, addr net.Addr) { + log.Println(string(buf)) + }), udp.WithRecoveryHandler(func(conn net.PacketConn, buf []byte, addr net.Addr, err interface{}) { + log.Println(err) + })), + ), + ).Run() + + if err != nil { + log.Fatal(err) + } +} + +``` \ No newline at end of file diff --git a/udp/server.go b/udp/server.go new file mode 100644 index 00000000..c5467fcd --- /dev/null +++ b/udp/server.go @@ -0,0 +1,102 @@ +package udp + +import ( + "context" + "log" + "net" +) + +type Server struct { + address string + + bufSize int + + conn net.PacketConn + + handler func(conn net.PacketConn, buf []byte, addr net.Addr) + + recoveryHandler func(conn net.PacketConn, buf []byte, addr net.Addr, err interface{}) +} + +type Option func(*Server) + +func WithBufSize(bufSize int) Option { + return func(s *Server) { + if bufSize > 0 { + s.bufSize = bufSize + } + } +} + +func WithHandler(handler func(conn net.PacketConn, buf []byte, addr net.Addr)) Option { + return func(s *Server) { + if handler != nil { + s.handler = handler + } + } +} + +func WithRecoveryHandler(handler func(conn net.PacketConn, buf []byte, addr net.Addr, err interface{})) Option { + return func(s *Server) { + if handler != nil { + s.recoveryHandler = handler + } + } +} + +func NewServer(address string, opts ...Option) *Server { + s := &Server{ + address: address, + bufSize: 1024, + } + + for _, opt := range opts { + opt(s) + } + + return s +} + +func (s *Server) Start(ctx context.Context) (err error) { + s.conn, err = net.ListenPacket("udp", s.address) + if err != nil { + return + } + + log.Printf("udp server: listening on %s\n", s.address) + + buf := make([]byte, s.bufSize) + + for { + n, addr, err := s.conn.ReadFrom(buf) + if err != nil { + return err + } + + if s.handler == nil { + log.Printf("udp server: receive from %s: %s\n", addr.String(), string(buf)) + continue + } + + go s.handle(buf[:n], addr) + } + +} + +func (s *Server) handle(buf []byte, addr net.Addr) { + if s.recoveryHandler != nil { + defer func() { + if err := recover(); err != nil { + s.recoveryHandler(s.conn, buf, addr, err) + } + }() + } + + s.handler(s.conn, buf, addr) +} + +func (s *Server) Stop(ctx context.Context) error { + log.Println("udp server: stopping") + + return s.conn.Close() +} diff --git a/udp/server_test.go b/udp/server_test.go new file mode 100644 index 00000000..ee2ac75e --- /dev/null +++ b/udp/server_test.go @@ -0,0 +1,60 @@ +package udp + +import ( + "context" + "net" + "sync" + "testing" + "time" +) + +func TestServer(t *testing.T) { + var ( + server *Server + wg sync.WaitGroup + done = make(chan []byte, 1) + ) + + wg.Add(2) + + go func() { + defer wg.Done() + + server = NewServer(":12190", WithHandler(func(conn net.PacketConn, buf []byte, addr net.Addr) { + done <- buf + }), WithRecoveryHandler(func(conn net.PacketConn, buf []byte, addr net.Addr, err interface{}) { + t.Log(err) + }), WithBufSize(1024)) + + go server.Start(context.Background()) + + time.Sleep(time.Second * 5) + server.Stop(context.Background()) + }() + + go func() { + defer wg.Done() + + time.Sleep(time.Second * 3) + + c, err := net.Dial("udp", ":12190") + if err != nil { + t.Error(err) + return + } + defer c.Close() + + _, err = c.Write([]byte("test")) + if err != nil { + t.Error(err) + return + } + }() + + wg.Wait() + + buf := <-done + if string(buf) != "test" { + t.Fatal("buf not equal test") + } +}