diff --git a/libbeat/processors/actions/append_test.go b/libbeat/processors/actions/append_test.go index 8cb8549b389..ddc03d6f322 100644 --- a/libbeat/processors/actions/append_test.go +++ b/libbeat/processors/actions/append_test.go @@ -273,6 +273,7 @@ func Test_appendProcessor_Run(t *testing.T) { logger: log, config: appendProcessorConfig{ Fields: []string{"field"}, + Values: []interface{}{"value3", "value4"}, TargetField: "target", }, }, @@ -281,7 +282,7 @@ func Test_appendProcessor_Run(t *testing.T) { Meta: mapstr.M{}, Fields: mapstr.M{ "field": "I'm being appended", - "target": []interface{}{"value1", "value2", "I'm being appended"}, + "target": []interface{}{"value1", "value2", "I'm being appended", "value3", "value4"}, }, }, }, diff --git a/libbeat/processors/processor_test.go b/libbeat/processors/processor_test.go index 41ed628fbfb..91d122365e7 100644 --- a/libbeat/processors/processor_test.go +++ b/libbeat/processors/processor_test.go @@ -18,15 +18,23 @@ package processors_test import ( + "fmt" "testing" "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/processors" _ "github.com/elastic/beats/v7/libbeat/processors/actions" _ "github.com/elastic/beats/v7/libbeat/processors/add_cloud_metadata" + _ "github.com/elastic/beats/v7/libbeat/processors/add_process_metadata" + _ "github.com/elastic/beats/v7/libbeat/processors/convert" + _ "github.com/elastic/beats/v7/libbeat/processors/decode_csv_fields" + _ "github.com/elastic/beats/v7/libbeat/processors/dissect" + _ "github.com/elastic/beats/v7/libbeat/processors/extract_array" + _ "github.com/elastic/beats/v7/libbeat/processors/urldecode" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/mapstr" @@ -566,3 +574,216 @@ func TestDropMissingFields(t *testing.T) { assert.Equal(t, expectedEvent, processedEvent.Fields) } + +const ( + fieldCount = 20 + depth = 3 +) + +func BenchmarkEventBackups(b *testing.B) { + // listing all the processors that revert changes in case of an error + yml := []map[string]interface{}{ + { + "append": map[string]interface{}{ + "target_field": "append_target", + "values": []interface{}{"third", "fourth"}, + "fail_on_error": true, + }, + }, + { + "copy_fields": map[string]interface{}{ + "fields": []map[string]interface{}{ + { + "from": "copy_from", + "to": "copy.to", + }, + }, + "fail_on_error": true, + }, + }, + { + "decode_base64_field": map[string]interface{}{ + "field": map[string]interface{}{ + "from": "base64_from", + "to": "base64_to", + }, + "fail_on_error": true, + }, + }, + { + "decompress_gzip_field": map[string]interface{}{ + "field": map[string]interface{}{ + "from": "gzip_from", + "to": "gzip_to", + }, + "fail_on_error": true, + }, + }, + { + "rename": map[string]interface{}{ + "fields": []map[string]interface{}{ + { + "from": "rename_from", + "to": "rename.to", + }, + }, + "fail_on_error": true, + }, + }, + { + "replace": map[string]interface{}{ + "fields": []map[string]interface{}{ + { + "field": "replace_test", + "pattern": "to replace", + "replacement": "replaced", + }, + }, + "fail_on_error": true, + }, + }, + { + "truncate_fields": map[string]interface{}{ + "fields": []interface{}{"to_truncate"}, + "max_characters": 4, + "fail_on_error": true, + }, + }, + { + "convert": map[string]interface{}{ + "fields": []map[string]interface{}{ + { + "from": "convert_from", + "to": "convert.to", + "type": "integer", + }, + }, + "fail_on_error": true, + }, + }, + { + "decode_csv_fields": map[string]interface{}{ + "fields": map[string]interface{}{ + "csv_from": "csv.to", + }, + "fail_on_error": true, + }, + }, + // it creates a backup unless `ignore_failure` is true + { + "dissect": map[string]interface{}{ + "tokenizer": "%{key1} %{key2}", + "field": "to_dissect", + }, + }, + { + "extract_array": map[string]interface{}{ + "field": "array_test", + "mappings": map[string]interface{}{ + "array_first": 0, + "array_second": 1, + }, + "fail_on_error": true, + }, + }, + { + "urldecode": map[string]interface{}{ + "fields": []map[string]interface{}{ + { + "from": "url_from", + "to": "url.to", + }, + }, + + "fail_on_error": true, + }, + }, + } + + processors := GetProcessors(b, yml) + event := &beat.Event{ + Timestamp: time.Now(), + Meta: mapstr.M{}, + Fields: mapstr.M{ + "append_target": []interface{}{"first", "second"}, + "copy_from": "to_copy", + "base64_from": "dmFsdWU=", + // "decompressed data" + "gzip_from": string([]byte{31, 139, 8, 0, 0, 0, 0, 0, 0, 255, 74, 73, 77, 206, 207, 45, 40, 74, 45, 46, 78, 77, 81, 72, 73, 44, 73, 4, 4, 0, 0, 255, 255, 108, 158, 105, 19, 17, 0, 0, 0}), + "rename_from": "renamed_value", + "replace_test": "something to replace", + "to_truncate": "something very long", + "convert_from": "42", + "csv_from": "1,2,3,4", + "to_dissect": "some words", + "array_test": []string{"first", "second"}, + "url_from": "https%3A%2F%2Fwww.elastic.co%3Fsome", + }, + } + + expFields := mapstr.M{ + "append_target": []interface{}{"first", "second", "third", "fourth"}, + "copy_from": "to_copy", + "copy": mapstr.M{ + "to": "to_copy", + }, + "base64_from": "dmFsdWU=", + "base64_to": "value", + "gzip_from": string([]byte{31, 139, 8, 0, 0, 0, 0, 0, 0, 255, 74, 73, 77, 206, 207, 45, 40, 74, 45, 46, 78, 77, 81, 72, 73, 44, 73, 4, 4, 0, 0, 255, 255, 108, 158, 105, 19, 17, 0, 0, 0}), + "gzip_to": "decompressed data", + "rename": mapstr.M{"to": "renamed_value"}, + "replace_test": "something replaced", + "to_truncate": "some", + "convert_from": "42", + "convert": mapstr.M{"to": int32(42)}, + "csv_from": "1,2,3,4", + "csv": mapstr.M{"to": []string{"1", "2", "3", "4"}}, + "to_dissect": "some words", + "dissect": mapstr.M{ + "key1": "some", + "key2": "words", + }, + "array_test": []string{"first", "second"}, + "array_first": "first", + "array_second": "second", + "url_from": "https%3A%2F%2Fwww.elastic.co%3Fsome", + "url": mapstr.M{"to": "https://www.elastic.co?some"}, + } + + generateFields(b, event.Meta, fieldCount, depth) + generateFields(b, event.Fields, fieldCount, depth) + + var ( + result *beat.Event + clone *beat.Event + err error + ) + + b.Run("run processors that use backups", func(b *testing.B) { + for i := 0; i < b.N; i++ { + clone = event.Clone() // necessary for making and comparing changes + result, err = processors.Run(clone) + } + require.NoError(b, err) + require.NotNil(b, result) + }) + + require.Equal(b, fmt.Sprintf("%p", clone), fmt.Sprintf("%p", result), "should be the same event") + for key := range expFields { + require.Equal(b, expFields[key], clone.Fields[key], fmt.Sprintf("%s does not match", key)) + } +} + +func generateFields(t require.TestingT, m mapstr.M, count, nesting int) { + for i := 0; i < count; i++ { + var err error + if nesting == 0 { + _, err = m.Put(fmt.Sprintf("field-%d", i), fmt.Sprintf("value-%d", i)) + } else { + nested := mapstr.M{} + generateFields(t, nested, count, nesting-1) + _, err = m.Put(fmt.Sprintf("field-%d", i), nested) + } + require.NoError(t, err) + } +}