Skip to content

Commit

Permalink
feat(udp): Added UDP server (#45)
Browse files Browse the repository at this point in the history
* feat(udp): Added udp server

* feat(udp): Added udp server

* feat(udp): Added udp server

* feat(udp): Added udp server

* feat(udp): Added udp server

* feat(udp): Added udp server
  • Loading branch information
flc1125 authored Dec 28, 2023
1 parent e4b6231 commit 0d805ae
Show file tree
Hide file tree
Showing 6 changed files with 201 additions and 1 deletion.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@
vendor/
.idea
_backup
_example
example
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ require (
github.com/gorilla/mux v1.8.0 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
golang.org/x/sync v0.3.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d // indirect
google.golang.org/grpc v1.58.3 // indirect
google.golang.org/protobuf v1.31.0 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,13 @@ github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjR
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM=
golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E=
golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/genproto v0.0.0-20230803162519-f966b187b2e5 h1:L6iMMGrtzgHsWofoFcihmDEMYeDR9KN/ThbPWGrh++g=
google.golang.org/genproto/googleapis/api v0.0.0-20230711160842-782d3b101e98 h1:FmF5cCW94Ij59cfpoLiwTgodWmm60eEV0CjlsVg2fuw=
google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d h1:uvYuEyMHKNt+lT4K3bN6fGswmK8qSvcreM3BwjDh+y4=
google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d/go.mod h1:+Bk1OCOj40wS2hwAMA+aCW9ypzm63QTBBHp6lQ3p+9M=
google.golang.org/grpc v1.58.3 h1:BjnpXut1btbtgN/6sp+brB2Kbm2LjNXnidYujAVbSoQ=
Expand Down
33 changes: 33 additions & 0 deletions udp/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# UDP

## Server

```go
package main

import (
"log"
"net"

"github.com/go-kratos/kratos/v2"

"github.com/go-packagist/go-kratos-components/udp"
)

func main() {
err := kratos.New(
kratos.Server(
udp.NewServer(":12190", udp.WithHandler(func(conn net.PacketConn, buf []byte, addr net.Addr) {
log.Println(string(buf))
}), udp.WithRecoveryHandler(func(conn net.PacketConn, buf []byte, addr net.Addr, err interface{}) {
log.Println(err)
})),
),
).Run()

if err != nil {
log.Fatal(err)
}
}

```
102 changes: 102 additions & 0 deletions udp/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package udp

import (
"context"
"log"
"net"
)

type Server struct {
address string

bufSize int

conn net.PacketConn

handler func(conn net.PacketConn, buf []byte, addr net.Addr)

recoveryHandler func(conn net.PacketConn, buf []byte, addr net.Addr, err interface{})
}

type Option func(*Server)

func WithBufSize(bufSize int) Option {
return func(s *Server) {
if bufSize > 0 {
s.bufSize = bufSize
}
}
}

func WithHandler(handler func(conn net.PacketConn, buf []byte, addr net.Addr)) Option {
return func(s *Server) {
if handler != nil {
s.handler = handler
}
}
}

func WithRecoveryHandler(handler func(conn net.PacketConn, buf []byte, addr net.Addr, err interface{})) Option {
return func(s *Server) {
if handler != nil {
s.recoveryHandler = handler
}
}
}

func NewServer(address string, opts ...Option) *Server {
s := &Server{
address: address,
bufSize: 1024,
}

for _, opt := range opts {
opt(s)
}

return s
}

func (s *Server) Start(ctx context.Context) (err error) {
s.conn, err = net.ListenPacket("udp", s.address)
if err != nil {
return
}

log.Printf("udp server: listening on %s\n", s.address)

buf := make([]byte, s.bufSize)

for {
n, addr, err := s.conn.ReadFrom(buf)
if err != nil {
return err
}

if s.handler == nil {
log.Printf("udp server: receive from %s: %s\n", addr.String(), string(buf))
continue
}

go s.handle(buf[:n], addr)
}

}

func (s *Server) handle(buf []byte, addr net.Addr) {
if s.recoveryHandler != nil {
defer func() {
if err := recover(); err != nil {
s.recoveryHandler(s.conn, buf, addr, err)
}
}()
}

s.handler(s.conn, buf, addr)
}

func (s *Server) Stop(ctx context.Context) error {
log.Println("udp server: stopping")

return s.conn.Close()
}
60 changes: 60 additions & 0 deletions udp/server_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package udp

import (
"context"
"net"
"sync"
"testing"
"time"
)

func TestServer(t *testing.T) {
var (
server *Server
wg sync.WaitGroup
done = make(chan []byte, 1)
)

wg.Add(2)

go func() {
defer wg.Done()

server = NewServer(":12190", WithHandler(func(conn net.PacketConn, buf []byte, addr net.Addr) {
done <- buf
}), WithRecoveryHandler(func(conn net.PacketConn, buf []byte, addr net.Addr, err interface{}) {
t.Log(err)
}), WithBufSize(1024))

go server.Start(context.Background())

time.Sleep(time.Second * 5)
server.Stop(context.Background())
}()

go func() {
defer wg.Done()

time.Sleep(time.Second * 3)

c, err := net.Dial("udp", ":12190")
if err != nil {
t.Error(err)
return
}
defer c.Close()

_, err = c.Write([]byte("test"))
if err != nil {
t.Error(err)
return
}
}()

wg.Wait()

buf := <-done
if string(buf) != "test" {
t.Fatal("buf not equal test")
}
}

0 comments on commit 0d805ae

Please sign in to comment.