Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Elasticsearch output batching and gzip compression support #967

Merged
merged 3 commits into from
Aug 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -759,6 +759,15 @@ func getConfig() *types.Configuration {
c.Elasticsearch.NumberOfShards = 3
}

if c.Elasticsearch.Batching.Enabled {
if c.Elasticsearch.Batching.BatchSize <= 0 {
c.Elasticsearch.Batching.BatchSize = types.DefaultBatchSize
}
if c.Elasticsearch.Batching.FlushInterval <= 0 {
c.Elasticsearch.Batching.FlushInterval = types.DefaultFlushInterval
}
}

if c.Prometheus.ExtraLabels != "" {
c.Prometheus.ExtraLabelsList = strings.Split(strings.ReplaceAll(c.Prometheus.ExtraLabels, " ", ""), ",")
}
Expand Down
6 changes: 6 additions & 0 deletions config_example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,12 @@ elasticsearch:
# numberofreplicas: 3 # number of replicas set by the index template (default: 3)
# customHeaders: # Custom headers to add in POST, useful for Authentication
# key: value
# enablecompression: false # if true enables gzip compression for http requests (default: false)
# batching: # batching configuration, improves throughput dramatically utilizing _bulk Elasticsearch API
# enabled: true # if true enables batching
# batchsize: 5242880 # batch size in bytes (default: 5 MB)
# flushinterval: 1s # batch fush interval (default: 1s)
# maxconcurrentrequests: 1 # max number of concurrent http requests (default: 1)

quickwit:
# hostport: "" # http(s)://{domain or ip}:{port}, if not empty, Quickwit output is enabled
Expand Down
20 changes: 19 additions & 1 deletion docs/outputs/elasticsearch.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

| Setting | Env var | Default value | Description |
| ----------------------------------- | ----------------------------------- | ---------------- | ----------------------------------------------------------------------------------------------------------------------------------- |
| `elasticsearch.hosport` | `ELASTICSEARCH_HOSTPORT` | | http://{domain or ip}:{port}, if not empty, Elasticsearch output is **enabled** |
| `elasticsearch.hostport` | `ELASTICSEARCH_HOSTPORT` | | http://{domain or ip}:{port}, if not empty, Elasticsearch output is **enabled** |
| `elasticsearch.index` | `ELASTICSEARCH_INDEX` | `falco` | Index |
| `elasticsearch.type` | `ELASTICSEARCH_TYPE` | `_doc` | Index |
| `elasticsearch.suffix` | `ELASTICSEARCH_SUFFIX` | `daily` | Date suffix for index rotation : `daily`, `monthly`, `annually`, `none` |
Expand All @@ -29,10 +29,22 @@
| `elasticsearch.mutualtls` | `ELASTICSEARCH_MUTUALTLS` | `false` | Authenticate to the output with TLS, if true, checkcert flag will be ignored (server cert will always be checked) |
| `elasticsearch.checkcert` | `ELASTICSEARCH_CHECKCERT` | `true` | Check if ssl certificate of the output is valid |
| `elasticsearch.minimumpriority` | `ELASTICSEARCH_MINIMUMPRIORITY` | `""` (= `debug`) | Minimum priority of event for using this output, order is `emergency,alert,critical,error,warning,notice,informational,debug or ""` |
| `elasticsearch.maxconcurrentrequests` | `ELASTICSEARCH_MAXCONCURRENTREQUESTS` | `1` | Max number of concurrent requests |
| `elasticsearch.enablecompression` | `ELASTICSEARCH_ENABLECOMPRESSION` | `false` | Enables gzip compression |
| `elasticsearch.batching.enabled` | | `false` | Enables batching (utilizing Elasticsearch bulk API)
| `elasticsearch.batching.batchsize` | | `5242880` | Batch size in bytes, default 5MB
| `elasticsearch.batching.flushinterval` | | `1s` | Batch flush interval, use valid Go duration string

> [!NOTE]
The Env var values override the settings from yaml file.

> [!NOTE]
Increasing the default number of concurrent requests is a good way to increase throughput of the http outputs. This also increases the potential number of open connections. Choose wisely.

> [!NOTE]
Enabling batching for Elasticsearch is invaluable when the expected number of falco alerts is in the hundreds or thousands per second. The batching of data can be fine-tuned for your specific use case. The batch request is sent to Elasticsearch when the pending data size reaches `batchsize` or upon the `flushinterval`.
Enabling gzip compression increases throughput even further.

> [!WARNING]
By enabling the creation of the index template with `elasticsearch.createindextemplate=true`, the output fields of the Falco events will be flatten to avoid any mapping conflict.

Expand All @@ -51,6 +63,12 @@ elasticsearch:
# mutualtls: false # if true, checkcert flag will be ignored (server cert will always be checked)
# checkcert: true # check if ssl certificate of the output is valid (default: true)
# minimumpriority: "" # minimum priority of event for using this output, order is emergency|alert|critical|error|warning|notice|informational|debug or "" (default)
# enablecompression: # if true enables gzip compression for http requests (default: false)
# batching: # batching configuration, improves throughput dramatically utilizing _bulk Elasticsearch API
# enabled: true # if true enables batching
# batchsize: 5242880 # batch size in bytes (default: 5 MB)
# flushinterval: 1s # batch fush interval (default: 1s)
# maxconcurrentrequests: # max number of concurrent http requests (default: 1)
```

## Screenshots
Expand Down
2 changes: 1 addition & 1 deletion docs/outputs/quickwit.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

| Setting | Env var | Default value | Description |
| ------------------------------- | ------------------------------- | ---------------- | -------------------------------------------------------------------------------------------------------------------------------------- |
| `quickwit.hosport` | `QUICKWIT_HOSTPORT` | | http://{domain or ip}:{port}, if not empty, Quickwit output is **enabled** |
| `quickwit.hostport` | `QUICKWIT_HOSTPORT` | | http://{domain or ip}:{port}, if not empty, Quickwit output is **enabled** |
| `quickwit.apiendpoint` | `QUICKWIT_APIENDPOINT` | `api/v1` | API endpoint (containing the API version, overideable in case of quickwit behind a reverse proxy with URL rewriting) |
| `quickwit.index` | `QUICKWIT_INDEX` | `falco` | Index |
| `quickwit.version` | `QUICKWIT_VERSION` | `0.7` | Version of quickwit |
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ require (
github.com/embano1/memlog v0.4.6
github.com/emersion/go-sasl v0.0.0-20231106173351-e73c9f7bad43
github.com/emersion/go-smtp v0.21.3
github.com/google/go-cmp v0.6.0
github.com/google/uuid v1.6.0
github.com/googleapis/gax-go/v2 v2.12.5
github.com/jackc/pgx/v5 v5.6.0
Expand Down
132 changes: 132 additions & 0 deletions internal/pkg/batcher/batcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
// SPDX-License-Identifier: MIT OR Apache-2.0

package batcher

import (
"bytes"
"encoding/json"
"sync"
"time"

"github.com/falcosecurity/falcosidekick/types"
)

const (
defaultBatchSize = 5 * 1024 * 1024 // max batch size in bytes, 5MB by default
defaultFlushInterval = time.Second
)

type CallbackFunc func(falcoPayloads []types.FalcoPayload, serialized []byte)

type OptionFunc func(b *Batcher)

// MarshalFunc is a callback that allows the user of the batcher to overwrite the default JSON marshalling
type MarshalFunc func(payload types.FalcoPayload) ([]byte, error)

// Batcher A simple generic implementation of Falco payloads batching
// Batching can be configured by the batchSize which is a max number of payloads in the batch or the flushInterval.
// The callback function is called when the number of payloads reaches the batchSize or upon the flushInterval
type Batcher struct {
batchSize int
flushInterval time.Duration

callbackFn CallbackFunc
marshalFn MarshalFunc

mx sync.Mutex

pending bytes.Buffer
// Keeping the original payloads for errors resolution
pendingPayloads []types.FalcoPayload

curTimer *time.Timer
}

func New(opts ...OptionFunc) *Batcher {
b := &Batcher{
batchSize: defaultBatchSize,
flushInterval: defaultFlushInterval,
callbackFn: func(falcoPayloads []types.FalcoPayload, batch []byte) {},
marshalFn: jsonMarshal,
}

for _, opt := range opts {
opt(b)
}

return b
}

func WithBatchSize(sz int) OptionFunc {
return func(b *Batcher) {
b.batchSize = sz
}
}

func WithFlushInterval(interval time.Duration) OptionFunc {
return func(b *Batcher) {
b.flushInterval = interval
}
}

func WithCallback(cb CallbackFunc) OptionFunc {
return func(b *Batcher) {
b.callbackFn = cb
}
}

func WithMarshal(fn MarshalFunc) OptionFunc {
return func(b *Batcher) {
b.marshalFn = fn
}
}

func (b *Batcher) Push(falcopayload types.FalcoPayload) error {
b.mx.Lock()
defer b.mx.Unlock()

data, err := b.marshalFn(falcopayload)
if err != nil {
return err
}
if b.pending.Len() == 0 {
b.scheduleFlushInterval()
} else if b.pending.Len()+len(data) > b.batchSize {
b.flush()
b.scheduleFlushInterval()
}
_, _ = b.pending.Write(data)
b.pendingPayloads = append(b.pendingPayloads, falcopayload)
return nil
}

func (b *Batcher) scheduleFlushInterval() {
if b.curTimer != nil {
b.curTimer.Stop()
}
b.curTimer = time.AfterFunc(b.flushInterval, b.flushOnTimer)
}

func (b *Batcher) flushOnTimer() {
b.mx.Lock()
defer b.mx.Unlock()
b.flush()
}

func (b *Batcher) flush() {
if b.pending.Len() == 0 {
return
}

serialized := b.pending.Bytes()
falcoPayloads := b.pendingPayloads

b.pending = bytes.Buffer{}
b.pendingPayloads = nil
b.callbackFn(falcoPayloads, serialized)
}

// jsonMarshal default marshal function
func jsonMarshal(payload types.FalcoPayload) ([]byte, error) {
return json.Marshal(payload)
}
79 changes: 79 additions & 0 deletions internal/pkg/batcher/batcher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// SPDX-License-Identifier: MIT OR Apache-2.0

package batcher

import (
"encoding/json"
"sync"
"testing"
"time"

"github.com/falcosecurity/falcosidekick/types"

"github.com/google/go-cmp/cmp"
"github.com/google/uuid"
)

func TestElasticsearchBatcher(t *testing.T) {
const (
batchSize = 1234
testCount = 100
flushInterval = 300 * time.Millisecond
)

// Just to emulated similar payload for testing, not strictly needed
type eSPayload struct {
types.FalcoPayload
Timestamp time.Time `json:"@timestamp"`
}

marshalFunc := func(payload types.FalcoPayload) ([]byte, error) {
return json.Marshal(eSPayload{FalcoPayload: payload, Timestamp: payload.Time})
}

var wantBatches, gotBatches [][]byte

var mx sync.Mutex
batcher := New(
WithBatchSize(batchSize),
WithFlushInterval(500*time.Millisecond),
WithMarshal(marshalFunc),
WithCallback(func(falcoPayloads []types.FalcoPayload, data []byte) {
mx.Lock()
defer mx.Unlock()
gotBatches = append(gotBatches, data)
}))

var currentBatch []byte
for i := 0; i < testCount; i++ {
payload := types.FalcoPayload{UUID: uuid.Must(uuid.NewV7()).String()}
data, err := marshalFunc(payload)
if err != nil {
t.Fatal(err)
}

if len(currentBatch)+len(data) > batchSize {
wantBatches = append(wantBatches, currentBatch)
currentBatch = nil
}

currentBatch = append(currentBatch, data...)

err = batcher.Push(payload)
if err != nil {
t.Fatal(err)
}
}
wantBatches = append(wantBatches, currentBatch)

// give it time to flush
time.Sleep(flushInterval * 2)

mx.Lock()
defer mx.Unlock()
diff := cmp.Diff(wantBatches, gotBatches)
if diff != "" {
t.Fatal(diff)
}

}
3 changes: 1 addition & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,7 @@ func init() {

if config.Elasticsearch.HostPort != "" {
var err error
endpointUrl := fmt.Sprintf("%s/%s/%s", config.Elasticsearch.HostPort, config.Elasticsearch.Index, config.Elasticsearch.Type)
elasticsearchClient, err = outputs.NewClient("Elasticsearch", endpointUrl, config.Elasticsearch.CommonConfig, *initClientArgs)
elasticsearchClient, err = outputs.NewElasticsearchClient(*initClientArgs)
if err != nil {
config.Elasticsearch.HostPort = ""
} else {
Expand Down
Loading