Skip to content

Commit

Permalink
Elasticsearch output batching, and gzip compression support.
Browse files Browse the repository at this point in the history
Signed-off-by: Aleksandr Maus <aleksandr.maus@elastic.co>
  • Loading branch information
aleksmaus committed Aug 22, 2024
1 parent c6e0752 commit 49b8054
Show file tree
Hide file tree
Showing 10 changed files with 557 additions and 82 deletions.
9 changes: 9 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -759,6 +759,15 @@ func getConfig() *types.Configuration {
c.Elasticsearch.NumberOfShards = 3
}

if c.Elasticsearch.Batching.Enabled {
if c.Elasticsearch.Batching.BatchSize <= 0 {
c.Elasticsearch.Batching.BatchSize = types.DefaultBatchSize
}
if c.Elasticsearch.Batching.FlushInterval <= 0 {
c.Elasticsearch.Batching.FlushInterval = types.DefaultFlushInterval
}
}

if c.Prometheus.ExtraLabels != "" {
c.Prometheus.ExtraLabelsList = strings.Split(strings.ReplaceAll(c.Prometheus.ExtraLabels, " ", ""), ",")
}
Expand Down
6 changes: 6 additions & 0 deletions config_example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,12 @@ elasticsearch:
# numberofreplicas: 3 # number of replicas set by the index template (default: 3)
# customHeaders: # Custom headers to add in POST, useful for Authentication
# key: value
# enablecompression: # if true enables gzip compression for http requests (default: false)
# batching: # batching configuration, improves throughput dramatically using _bulk Elasticsearch API
# enabled: true # if true enables batching
# batchsize: 5242880 # batch request size in bytes (default: 5 MB)
# flushinterval: 1s # batch flush interval (default: 1s)
# maxconcurrentrequests: # max number of concurrent http requests (default: 1)

quickwit:
# hostport: "" # http(s)://{domain or ip}:{port}, if not empty, Quickwit output is enabled
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ require (
github.com/embano1/memlog v0.4.6
github.com/emersion/go-sasl v0.0.0-20231106173351-e73c9f7bad43
github.com/emersion/go-smtp v0.21.3
github.com/google/go-cmp v0.6.0
github.com/google/uuid v1.6.0
github.com/googleapis/gax-go/v2 v2.12.5
github.com/jackc/pgx/v5 v5.6.0
Expand Down
132 changes: 132 additions & 0 deletions internal/pkg/batcher/batcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
// SPDX-License-Identifier: MIT OR Apache-2.0

package batcher

import (
"bytes"
"encoding/json"
"sync"
"time"

"github.com/falcosecurity/falcosidekick/types"
)

// Defaults used by New when the matching option is not supplied.
const (
	// defaultBatchSize is the maximum size of a batch in bytes (5 MB).
	defaultBatchSize = 5 << 20
	// defaultFlushInterval is the delay after which a non-empty batch is flushed.
	defaultFlushInterval = 1 * time.Second
)

type (
	// CallbackFunc receives a flushed batch: the original payloads that make
	// up the batch together with their concatenated serialized form.
	CallbackFunc func(falcoPayloads []types.FalcoPayload, serialized []byte)

	// OptionFunc mutates a Batcher while it is being constructed by New.
	OptionFunc func(b *Batcher)

	// MarshalFunc lets the user of the batcher replace the default JSON
	// marshalling of a single payload.
	MarshalFunc func(payload types.FalcoPayload) ([]byte, error)
)

// Batcher is a simple implementation of Falco payloads batching.
//
// Serialized payloads are accumulated in an in-memory buffer. The pending
// batch is handed to the callback function either when appending the next
// payload would make the buffer exceed batchSize (a size in bytes, not a
// number of payloads), or when flushInterval elapses after the batch was
// started, whichever happens first.
type Batcher struct {
// batchSize is the maximum size of the serialized batch, in bytes.
batchSize int
// flushInterval is the delay of the timer that flushes a pending batch.
flushInterval time.Duration

// callbackFn is invoked with every flushed batch; it is called while mx is held.
callbackFn CallbackFunc
// marshalFn serializes a single payload before it is appended to pending.
marshalFn MarshalFunc

// mx guards the pending batch state and the timer below.
mx sync.Mutex

// pending holds the concatenated serialized payloads of the current batch.
pending bytes.Buffer
// Keeping the original payloads for errors resolution
pendingPayloads []types.FalcoPayload

// curTimer is the most recently scheduled flush timer, nil until the first Push.
curTimer *time.Timer
}

// New builds a Batcher initialised with the package defaults (default batch
// size and flush interval, a no-op callback and JSON marshalling) and then
// applies the given options in the order they were passed.
func New(opts ...OptionFunc) *Batcher {
	batcher := new(Batcher)
	batcher.batchSize = defaultBatchSize
	batcher.flushInterval = defaultFlushInterval
	// No-op callback so that flushing is always safe, even when no callback was configured.
	batcher.callbackFn = func(falcoPayloads []types.FalcoPayload, batch []byte) {}
	batcher.marshalFn = jsonMarshal

	for i := range opts {
		opts[i](batcher)
	}

	return batcher
}

// WithBatchSize returns an option that sets the maximum batch size, in bytes
// of serialized payloads.
func WithBatchSize(sz int) OptionFunc {
	apply := func(target *Batcher) {
		target.batchSize = sz
	}
	return apply
}

// WithFlushInterval returns an option that sets the delay after which a
// pending batch is flushed by the timer.
func WithFlushInterval(interval time.Duration) OptionFunc {
	apply := func(target *Batcher) {
		target.flushInterval = interval
	}
	return apply
}

// WithCallback returns an option that sets the function invoked with every
// flushed batch.
//
// A nil cb is ignored and the batcher keeps its current callback (New installs
// a no-op one). Storing nil would otherwise make the next flush panic, and a
// flush may run on the timer goroutine where the panic cannot be recovered.
func WithCallback(cb CallbackFunc) OptionFunc {
	return func(b *Batcher) {
		if cb == nil {
			return
		}
		b.callbackFn = cb
	}
}

// WithMarshal returns an option that replaces the function used to serialize
// each payload pushed into the batcher.
//
// A nil fn is ignored and the batcher keeps its current marshal function (New
// installs the default JSON one). Storing nil would otherwise make every Push
// panic when it tries to serialize the payload.
func WithMarshal(fn MarshalFunc) OptionFunc {
	return func(b *Batcher) {
		if fn == nil {
			return
		}
		b.marshalFn = fn
	}
}

// Push serializes falcopayload with the configured marshal function and
// appends it to the pending batch.
//
// The first payload of a batch arms the flush timer. If appending the payload
// would grow the pending batch beyond batchSize, the pending batch is flushed
// first and the timer is re-armed for the new batch. The marshal error, if
// any, is returned and the payload is not added.
func (b *Batcher) Push(falcopayload types.FalcoPayload) error {
	b.mx.Lock()
	defer b.mx.Unlock()

	encoded, err := b.marshalFn(falcopayload)
	if err != nil {
		return err
	}

	switch buffered := b.pending.Len(); {
	case buffered == 0:
		// Starting a new batch: arm the timer.
		b.scheduleFlushInterval()
	case buffered+len(encoded) > b.batchSize:
		// The batch would overflow: send what we have, then start over.
		b.flush()
		b.scheduleFlushInterval()
	}

	// bytes.Buffer.Write always returns a nil error.
	_, _ = b.pending.Write(encoded)
	b.pendingPayloads = append(b.pendingPayloads, falcopayload)
	return nil
}

// scheduleFlushInterval stops the previously scheduled flush timer, if there
// is one, and starts a new timer that calls flushOnTimer after flushInterval.
func (b *Batcher) scheduleFlushInterval() {
	if previous := b.curTimer; previous != nil {
		previous.Stop()
	}
	next := time.AfterFunc(b.flushInterval, b.flushOnTimer)
	b.curTimer = next
}

// flushOnTimer is the function passed to time.AfterFunc by
// scheduleFlushInterval. It takes the mutex itself before flushing the
// pending batch.
func (b *Batcher) flushOnTimer() {
b.mx.Lock()
defer b.mx.Unlock()
b.flush()
}

// flush hands the pending batch to the callback and resets the pending state.
// It does nothing when no serialized data is pending. Both call sites in this
// file (Push and flushOnTimer) invoke it with b.mx held.
func (b *Batcher) flush() {
	if b.pending.Len() > 0 {
		payloads, data := b.pendingPayloads, b.pending.Bytes()

		// Start from a fresh buffer rather than resetting the old one: data
		// still points into the old buffer's storage and is handed to the callback.
		b.pendingPayloads, b.pending = nil, bytes.Buffer{}

		b.callbackFn(payloads, data)
	}
}

// jsonMarshal is the default MarshalFunc: it encodes the payload with encoding/json.
func jsonMarshal(payload types.FalcoPayload) ([]byte, error) {
	encoded, err := json.Marshal(payload)
	return encoded, err
}
79 changes: 79 additions & 0 deletions internal/pkg/batcher/batcher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// SPDX-License-Identifier: MIT OR Apache-2.0

package batcher

import (
"encoding/json"
"sync"
"testing"
"time"

"github.com/falcosecurity/falcosidekick/types"

"github.com/google/go-cmp/cmp"
"github.com/google/uuid"
)

// TestElasticsearchBatcher pushes testCount payloads through a Batcher and
// checks that the batches delivered to the callback match the batches computed
// independently with the same size rule: a batch is closed when the next
// serialized payload would make it exceed batchSize bytes, and the last,
// partially filled batch is delivered by the flush timer.
func TestElasticsearchBatcher(t *testing.T) {
	const (
		batchSize     = 1234
		testCount     = 100
		flushInterval = 300 * time.Millisecond
	)

	// Just to emulate a similar payload for testing, not strictly needed
	type eSPayload struct {
		types.FalcoPayload
		Timestamp time.Time `json:"@timestamp"`
	}

	marshalFunc := func(payload types.FalcoPayload) ([]byte, error) {
		return json.Marshal(eSPayload{FalcoPayload: payload, Timestamp: payload.Time})
	}

	var wantBatches, gotBatches [][]byte

	// mx guards gotBatches: the callback for the final batch runs on the timer goroutine.
	var mx sync.Mutex
	batcher := New(
		WithBatchSize(batchSize),
		// Use the same constant the final sleep is derived from, so the wait
		// below is always twice the configured interval.
		WithFlushInterval(flushInterval),
		WithMarshal(marshalFunc),
		WithCallback(func(falcoPayloads []types.FalcoPayload, data []byte) {
			mx.Lock()
			defer mx.Unlock()
			gotBatches = append(gotBatches, data)
		}))

	var currentBatch []byte
	for i := 0; i < testCount; i++ {
		payload := types.FalcoPayload{UUID: uuid.Must(uuid.NewV7()).String()}
		data, err := marshalFunc(payload)
		if err != nil {
			t.Fatal(err)
		}

		// Mirror the batcher's rule: close the batch before it would overflow.
		if len(currentBatch)+len(data) > batchSize {
			wantBatches = append(wantBatches, currentBatch)
			currentBatch = nil
		}

		currentBatch = append(currentBatch, data...)

		err = batcher.Push(payload)
		if err != nil {
			t.Fatal(err)
		}
	}
	wantBatches = append(wantBatches, currentBatch)

	// give it time to flush
	time.Sleep(flushInterval * 2)

	mx.Lock()
	defer mx.Unlock()
	diff := cmp.Diff(wantBatches, gotBatches)
	if diff != "" {
		t.Fatal(diff)
	}
}
3 changes: 1 addition & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,7 @@ func init() {

if config.Elasticsearch.HostPort != "" {
var err error
endpointUrl := fmt.Sprintf("%s/%s/%s", config.Elasticsearch.HostPort, config.Elasticsearch.Index, config.Elasticsearch.Type)
elasticsearchClient, err = outputs.NewClient("Elasticsearch", endpointUrl, config.Elasticsearch.CommonConfig, *initClientArgs)
elasticsearchClient, err = outputs.NewElasticsearchClient(*initClientArgs)
if err != nil {
config.Elasticsearch.HostPort = ""
} else {
Expand Down
Loading

0 comments on commit 49b8054

Please sign in to comment.