Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for UDP protocol and implement health check method #8

Merged
merged 2 commits into from
Jul 22, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 28 additions & 15 deletions stash.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,8 @@ const (
BrokenPipeError = "broken pipe"
)

var (
// CRLF (Carriage Return and Line Feed in ASCII code)
CRLF = []byte{13, 10}
)
// CRLF (Carriage Return and Line Feed in ASCII code)
var CRLF = []byte{13, 10}

func addCRLF(data []byte) []byte {
return append(data, CRLF...)
Expand All @@ -38,10 +36,12 @@ type Option func(*options)

type options struct {
dialer *net.Dialer
protocol string
useTLS bool
skipVerify bool
readTimeout time.Duration
writeTimeout time.Duration
dialTimeout time.Duration
tlsConfig *tls.Config
}

Expand Down Expand Up @@ -73,6 +73,13 @@ func SetWriteTimeout(writeTimeout time.Duration) Option {
}
}

// SetDialTimeout Option func
func SetDialTimeout(dialTimeout time.Duration) Option {
return func(o *options) {
o.dialTimeout = dialTimeout
}
}

// SetKeepAlive Option func
func SetKeepAlive(keepAlive time.Duration) Option {
return func(o *options) {
Expand All @@ -87,27 +94,31 @@ func SetTLSConfig(config *tls.Config) Option {
}
}

func (s *Stash) dial(address string, o *options) error {
addr, err := net.ResolveTCPAddr("tcp", address)
if err != nil {
return err
// SetProtocolConn Option func
// set protocol connection between logstash : `tcp` or `udp`
func SetProtocolConn(protocol string) Option {
return func(o *options) {
o.protocol = protocol
}
}

conn, err := net.DialTCP("tcp", nil, addr)
func (s *Stash) dial(address string) error {
conn, err := s.o.dialer.Dial(s.o.protocol, address)
if err != nil {
conn.Close()
return err
}

s.conn = conn

// if useTLS true
// Force stash to use TLS
if o.useTLS {
if s.o.useTLS {
var tlsConfig *tls.Config
if o.tlsConfig == nil {
tlsConfig = &tls.Config{InsecureSkipVerify: o.skipVerify}
if s.o.tlsConfig == nil {
tlsConfig = &tls.Config{InsecureSkipVerify: s.o.skipVerify}
} else {
tlsConfig = o.tlsConfig
tlsConfig = s.o.tlsConfig
}

if tlsConfig.ServerName == "" {
Expand Down Expand Up @@ -141,7 +152,9 @@ func Connect(host string, port uint64, opts ...Option) (*Stash, error) {
o := &options{
dialer: &net.Dialer{
KeepAlive: time.Minute * 5,
Timeout: 30 * time.Second,
},
protocol: "tcp",
writeTimeout: 30 * time.Second,
readTimeout: 30 * time.Second,
}
Expand All @@ -150,7 +163,7 @@ func Connect(host string, port uint64, opts ...Option) (*Stash, error) {
}

s.o = o
if err := s.dial(address, o); err != nil {
if err := s.dial(address); err != nil {
return nil, err
}

Expand Down Expand Up @@ -179,7 +192,7 @@ func (s *Stash) Write(data []byte) (int, error) {
if strings.Contains(err.Error(), BrokenPipeError) {
log.Printf("go-stash: %s | do re dial\n", err.Error())
// re dial ignore error
err = s.dial(s.address, s.o)
err = s.dial(s.address)
if err != nil {
log.Printf("go-stash: %s | do re dial\n", err.Error())
}
Expand Down
Loading