diff --git a/gelf/chunk.go b/gelf/chunk.go new file mode 100644 index 0000000..6604d60 --- /dev/null +++ b/gelf/chunk.go @@ -0,0 +1,27 @@ +package gelf + +// Used to control GELF chunking. Should be less than (MTU - len(UDP +// header)). +// +// TODO: generate dynamically using Path MTU Discovery? +const ( + ChunkSize = 1420 + chunkedHeaderLen = 12 + chunkedDataLen = ChunkSize - chunkedHeaderLen +) + +var ( + magicChunked = []byte{0x1e, 0x0f} + magicZlib = []byte{0x78} + magicGzip = []byte{0x1f, 0x8b} +) + +// numChunks returns the number of GELF chunks necessary to transmit +// the given compressed buffer. +func numChunks(b []byte) int { + lenB := len(b) + if lenB <= ChunkSize { + return 1 + } + return len(b)/chunkedDataLen + 1 +} diff --git a/gelf/reader.go b/gelf/reader.go index ff719fc..ac13565 100644 --- a/gelf/reader.go +++ b/gelf/reader.go @@ -75,7 +75,7 @@ func (r *Reader) ReadMessage() (*Message, error) { for got := 0; got < 128 && (total == 0 || got < int(total)); got++ { if n, err = r.conn.Read(cBuf); err != nil { - return nil, fmt.Errorf("Read: %s", err) + return nil, fmt.Errorf("read buffer: %w", err) } cHead, cBuf = cBuf[:2], cBuf[:n] @@ -138,3 +138,7 @@ func (r *Reader) ReadMessage() (*Message, error) { return msg, nil } + +func (r *Reader) Close() error { + return r.conn.Close() +} diff --git a/gelf/udpwriter.go b/gelf/udpwriter.go index 23bbd5e..05de796 100644 --- a/gelf/udpwriter.go +++ b/gelf/udpwriter.go @@ -34,32 +34,6 @@ const ( CompressNone ) -// Used to control GELF chunking. Should be less than (MTU - len(UDP -// header)). -// -// TODO: generate dynamically using Path MTU Discovery? -const ( - ChunkSize = 1420 - chunkedHeaderLen = 12 - chunkedDataLen = ChunkSize - chunkedHeaderLen -) - -var ( - magicChunked = []byte{0x1e, 0x0f} - magicZlib = []byte{0x78} - magicGzip = []byte{0x1f, 0x8b} -) - -// numChunks returns the number of GELF chunks necessary to transmit -// the given compressed buffer. -func numChunks(b []byte) int { - lenB := len(b) - if lenB <= ChunkSize { - return 1 - } - return len(b)/chunkedDataLen + 1 -} - // New returns a new GELF Writer. This writer can be used to send the // output of the standard Go log functions to a central GELF server by // passing it to log.SetOutput() diff --git a/gelf/udpwriter_test.go b/gelf/udpwriter_test.go index e0f1ed4..78a2dee 100644 --- a/gelf/udpwriter_test.go +++ b/gelf/udpwriter_test.go @@ -25,14 +25,48 @@ func TestNewUDPWriter(t *testing.T) { } } -func sendAndRecv(msgData string, compress CompressType) (*Message, error) { - r, err := NewReader("127.0.0.1:0") - if err != nil { +func sendAndRecvRaw(t *testing.T, msgData string, compress CompressType) ([]byte, error) { + t.Helper() + var err error + var r *Reader + var w *UDPWriter + + if r, err = NewReader("127.0.0.1:0"); err != nil { return nil, fmt.Errorf("NewReader: %s", err) } + defer r.Close() - w, err := NewUDPWriter(r.Addr()) + if w, err = NewUDPWriter(r.Addr()); err != nil { + return nil, fmt.Errorf("NewUDPWriter: %s", err) + } + w.CompressionType = compress + + if _, err = w.Write([]byte(msgData)); err != nil { + return nil, fmt.Errorf("w.Write: %s", err) + } + + w.Close() + b := make([]byte, 512) + i, err := r.Read(b) if err != nil { + return nil, fmt.Errorf("r.Read: %s", err) + } + + return b[:i], nil +} + +func sendAndRecv(t *testing.T, msgData string, compress CompressType) (*Message, error) { + t.Helper() + var err error + var r *Reader + var w *UDPWriter + + if r, err = NewReader("127.0.0.1:0"); err != nil { + return nil, fmt.Errorf("NewReader: %s", err) + } + defer r.Close() + + if w, err = NewUDPWriter(r.Addr()); err != nil { return nil, fmt.Errorf("NewUDPWriter: %s", err) } w.CompressionType = compress @@ -45,14 +79,18 @@ func sendAndRecv(msgData string, compress CompressType) (*Message, error) { return r.ReadMessage() } -func sendAndRecvMsg(msg *Message, compress CompressType) (*Message, error) { - r, err := NewReader("127.0.0.1:0") - if err != nil { +func sendAndRecvMsg(t *testing.T, msg *Message, compress CompressType) (*Message, error) { + t.Helper() + var err error + var r *Reader + var w *UDPWriter + + if r, err = NewReader("127.0.0.1:0"); err != nil { return nil, fmt.Errorf("NewReader: %s", err) } + defer r.Close() - w, err := NewUDPWriter(r.Addr()) - if err != nil { + if w, err = NewUDPWriter(r.Addr()); err != nil { return nil, fmt.Errorf("NewUDPWriter: %s", err) } w.CompressionType = compress @@ -65,13 +103,32 @@ func sendAndRecvMsg(msg *Message, compress CompressType) (*Message, error) { return r.ReadMessage() } +// tests read raw single-message (non-chunked) messages that are split over +// multiple lines +func TestReadRawSmallMultiLine(t *testing.T) { + for _, i := range []CompressType{CompressGzip, CompressZlib, CompressNone} { + msgData := "awesomesauce\nbananas" + + raw, err := sendAndRecvRaw(t, msgData, i) + if err != nil { + t.Errorf("sendAndRecv: %s", err) + return + } + + if string(raw) != msgData { + t.Errorf("raw: expected %q, got %q", msgData, string(raw)) + return + } + } +} + // tests single-message (non-chunked) messages that are split over // multiple lines func TestWriteSmallMultiLine(t *testing.T) { for _, i := range []CompressType{CompressGzip, CompressZlib, CompressNone} { msgData := "awesomesauce\nbananas" - msg, err := sendAndRecv(msgData, i) + msg, err := sendAndRecv(t, msgData, i) if err != nil { t.Errorf("sendAndRecv: %s", err) return @@ -94,7 +151,7 @@ func TestWriteSmallOneLine(t *testing.T) { msgData := "some awesome thing\n" msgDataTrunc := msgData[:len(msgData)-1] - msg, err := sendAndRecv(msgData, CompressGzip) + msg, err := sendAndRecv(t, msgData, CompressGzip) if err != nil { t.Errorf("sendAndRecv: %s", err) return @@ -153,7 +210,7 @@ func TestWriteBigChunked(t *testing.T) { msgData := "awesomesauce\n" + base64.StdEncoding.EncodeToString(randData) for _, i := range []CompressType{CompressGzip, CompressZlib} { - msg, err := sendAndRecv(msgData, i) + msg, err := sendAndRecv(t, msgData, i) if err != nil { t.Errorf("sendAndRecv: %s", err) return @@ -198,7 +255,7 @@ func TestExtraData(t *testing.T) { } for _, i := range []CompressType{CompressGzip, CompressZlib} { - msg, err := sendAndRecvMsg(&m, i) + msg, err := sendAndRecvMsg(t, &m, i) if err != nil { t.Errorf("sendAndRecv: %s", err) return @@ -241,6 +298,8 @@ func BenchmarkWriteBestSpeed(b *testing.B) { if err != nil { b.Fatalf("NewReader: %s", err) } + defer r.Close() + go io.Copy(ioutil.Discard, r) w, err := NewUDPWriter(r.Addr()) if err != nil { @@ -267,6 +326,8 @@ func BenchmarkWriteNoCompression(b *testing.B) { if err != nil { b.Fatalf("NewReader: %s", err) } + defer r.Close() + go io.Copy(ioutil.Discard, r) w, err := NewUDPWriter(r.Addr()) if err != nil { @@ -293,6 +354,8 @@ func BenchmarkWriteDisableCompressionCompletely(b *testing.B) { if err != nil { b.Fatalf("NewReader: %s", err) } + defer r.Close() + go io.Copy(ioutil.Discard, r) w, err := NewUDPWriter(r.Addr()) if err != nil { @@ -319,6 +382,8 @@ func BenchmarkWriteDisableCompressionAndPreencodeExtra(b *testing.B) { if err != nil { b.Fatalf("NewReader: %s", err) } + defer r.Close() + go io.Copy(ioutil.Discard, r) w, err := NewUDPWriter(r.Addr()) if err != nil {