From 374adfe12757a3ae68ee0d14716b02b21096bef9 Mon Sep 17 00:00:00 2001 From: 9iksans Date: Thu, 11 Jul 2024 19:42:46 +0700 Subject: [PATCH 1/2] feat - add udp protocol to connect logstash --- stash.go | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/stash.go b/stash.go index 59de2b5..14a1bab 100644 --- a/stash.go +++ b/stash.go @@ -15,10 +15,8 @@ const ( BrokenPipeError = "broken pipe" ) -var ( - // CRLF (Carriage Return and Line Feed in ASCII code) - CRLF = []byte{13, 10} -) +// CRLF (Carriage Return and Line Feed in ASCII code) +var CRLF = []byte{13, 10} func addCRLF(data []byte) []byte { return append(data, CRLF...) @@ -38,6 +36,7 @@ type Option func(*options) type options struct { dialer *net.Dialer + protocol string useTLS bool skipVerify bool readTimeout time.Duration @@ -87,13 +86,16 @@ func SetTLSConfig(config *tls.Config) Option { } } -func (s *Stash) dial(address string, o *options) error { - addr, err := net.ResolveTCPAddr("tcp", address) - if err != nil { - return err +// SetProtocolConn Option func +// set protocol connection between logstash : `tcp` or `udp` +func SetProtocolConn(protocol string) Option { + return func(o *options) { + o.protocol = protocol } +} - conn, err := net.DialTCP("tcp", nil, addr) +func (s *Stash) dial(address string, o *options) error { + conn, err := net.Dial(o.protocol, address) if err != nil { return err } @@ -142,6 +144,7 @@ func Connect(host string, port uint64, opts ...Option) (*Stash, error) { dialer: &net.Dialer{ KeepAlive: time.Minute * 5, }, + protocol: "tcp", writeTimeout: 30 * time.Second, readTimeout: 30 * time.Second, } From ef6e6abaa1b396b32c467f520ec6e700618cad97 Mon Sep 17 00:00:00 2001 From: 9iksans Date: Fri, 12 Jul 2024 16:03:53 +0700 Subject: [PATCH 2/2] enhancement - add dialer timeout option for tcp conn on dialing logstash --- stash.go | 26 ++++++++++++++++++-------- 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/stash.go b/stash.go index 14a1bab..7a9aee1 100644 --- a/stash.go +++ b/stash.go @@ -41,6 +41,7 @@ type options struct { skipVerify bool readTimeout time.Duration writeTimeout time.Duration + dialTimeout time.Duration tlsConfig *tls.Config } @@ -72,6 +73,13 @@ func SetWriteTimeout(writeTimeout time.Duration) Option { } } +// SetDialTimeout Option func +func SetDialTimeout(dialTimeout time.Duration) Option { + return func(o *options) { + o.dialTimeout = dialTimeout + } +} + // SetKeepAlive Option func func SetKeepAlive(keepAlive time.Duration) Option { return func(o *options) { @@ -94,9 +102,10 @@ func SetProtocolConn(protocol string) Option { } } -func (s *Stash) dial(address string, o *options) error { - conn, err := net.Dial(o.protocol, address) +func (s *Stash) dial(address string) error { + conn, err := s.o.dialer.Dial(s.o.protocol, address) if err != nil { + conn.Close() return err } @@ -104,12 +113,12 @@ func (s *Stash) dial(address string, o *options) error { // if useTLS true // Force stash to use TLS - if o.useTLS { + if s.o.useTLS { var tlsConfig *tls.Config - if o.tlsConfig == nil { - tlsConfig = &tls.Config{InsecureSkipVerify: o.skipVerify} + if s.o.tlsConfig == nil { + tlsConfig = &tls.Config{InsecureSkipVerify: s.o.skipVerify} } else { - tlsConfig = o.tlsConfig + tlsConfig = s.o.tlsConfig } if tlsConfig.ServerName == "" { @@ -143,6 +152,7 @@ func Connect(host string, port uint64, opts ...Option) (*Stash, error) { o := &options{ dialer: &net.Dialer{ KeepAlive: time.Minute * 5, + Timeout: 30 * time.Second, }, protocol: "tcp", writeTimeout: 30 * time.Second, @@ -153,7 +163,7 @@ func Connect(host string, port uint64, opts ...Option) (*Stash, error) { } s.o = o - if err := s.dial(address, o); err != nil { + if err := s.dial(address); err != nil { return nil, err } @@ -182,7 +192,7 @@ func (s *Stash) Write(data []byte) (int, error) { if strings.Contains(err.Error(), BrokenPipeError) { log.Printf("go-stash: %s | do re dial\n", err.Error()) // re dial ignore error - err = s.dial(s.address, s.o) + err = s.dial(s.address) if err != nil { log.Printf("go-stash: %s | do re dial\n", err.Error()) }