Skip to content

Commit

Permalink
Refactor Event type and add more test coverage (#36906)
Browse files Browse the repository at this point in the history
* Added tests with full coverage
* Standardized the behavior with `mapstr.M`.
  • Loading branch information
rdner authored Oct 24, 2023
1 parent 58b2de1 commit 546a413
Show file tree
Hide file tree
Showing 3 changed files with 675 additions and 373 deletions.
234 changes: 159 additions & 75 deletions libbeat/beat/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,30 @@ package beat

import (
"errors"
"fmt"
"strings"
"time"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/elastic-agent-libs/mapstr"
)

type updateMode bool

var (
updateModeOverwrite updateMode = true
updateModeNoOverwrite updateMode = false
)

// FlagField fields used to keep information or errors when events are parsed.
const FlagField = "log.flags"

const (
timestampFieldKey = "@timestamp"
metadataFieldKey = "@metadata"
TimestampFieldKey = "@timestamp"
MetadataFieldKey = "@metadata"
ErrorFieldKey = "error"
metadataKeyPrefix = MetadataFieldKey + "."
metadataKeyOffset = len(metadataKeyPrefix)
)

// Event is the common event format shared by all beats.
Expand All @@ -47,28 +58,44 @@ type Event struct {
}

var (
errNoTimestamp = errors.New("value is no timestamp")
errNoMapStr = errors.New("value is no map[string]interface{} type")
ErrValueNotTimestamp = errors.New("value is not a timestamp")
ErrValueNotMapStr = errors.New("value is not `mapstr.M` or `map[string]interface{}` type")
ErrAlterMetadataKey = fmt.Errorf("deleting/replacing %q key is not supported", MetadataFieldKey)
ErrMetadataAccess = fmt.Errorf("accessing %q key directly is not supported, try nested keys", MetadataFieldKey)
ErrDeleteTimestamp = fmt.Errorf("deleting %q key is not supported", TimestampFieldKey)
)

// SetID overwrites the "id" field in the events metadata.
// If Meta is nil, a new Meta dictionary is created.
func (e *Event) SetID(id string) {
if e.Meta == nil {
e.Meta = mapstr.M{}
}
e.Meta["_id"] = id
_, _ = e.PutValue(metadataKeyPrefix+"_id", id)
}

// GetValue gets a value from the event. If the key does not exist then an error
// is returned.
//
// Use `@timestamp` key for getting the event timestamp.
// Use `@metadata.*` keys for getting the event metadata fields.
// If `@metadata` key is used then `ErrMetadataAccess` is returned.
func (e *Event) GetValue(key string) (interface{}, error) {
if key == timestampFieldKey {
if key == TimestampFieldKey {
return e.Timestamp, nil
} else if subKey, ok := metadataKey(key); ok {
if subKey == "" || e.Meta == nil {
return e.Meta, nil
}
if key == MetadataFieldKey {
return nil, ErrMetadataAccess
}

if subKey, ok := e.metadataSubKey(key); ok {
if e.Meta == nil {
return nil, mapstr.ErrKeyNotFound
}
return e.Meta.GetValue(subKey)
}

if e.Fields == nil {
return nil, mapstr.ErrKeyNotFound
}

return e.Fields.GetValue(key)
}

Expand All @@ -92,7 +119,7 @@ func (e *Event) Clone() *Event {
// `DeepUpdateNoOverwrite` is a version of this function that does not
// overwrite existing values.
func (e *Event) DeepUpdate(d mapstr.M) {
e.deepUpdate(d, true)
e.deepUpdate(d, updateModeOverwrite)
}

// DeepUpdateNoOverwrite recursively copies the key-value pairs from `d` to various properties of the event.
Expand All @@ -103,31 +130,34 @@ func (e *Event) DeepUpdate(d mapstr.M) {
// via `DeepUpdateNoOverwrite`.
// `DeepUpdate` is a version of this function that overwrites existing values.
func (e *Event) DeepUpdateNoOverwrite(d mapstr.M) {
e.deepUpdate(d, false)
e.deepUpdate(d, updateModeNoOverwrite)
}

func (e *Event) deepUpdate(d mapstr.M, overwrite bool) {
func (e *Event) deepUpdate(d mapstr.M, mode updateMode) {
if len(d) == 0 {
return
}

// It's supported to update the timestamp using this function.
// However, we must handle it separately since it's a separate field of the event.
timestampValue, timestampExists := d[timestampFieldKey]
timestampValue, timestampExists := d[TimestampFieldKey]
if timestampExists {
if overwrite {
_ = e.setTimestamp(timestampValue)
if mode == updateModeOverwrite {
_, _ = e.setTimestamp(timestampValue)
}

// Temporary delete it from the update map,
// so we can do `e.Fields.DeepUpdate(d)` or
// `e.Fields.DeepUpdateNoOverwrite(d)` later
delete(d, timestampFieldKey)
delete(d, TimestampFieldKey)
defer func() {
d[TimestampFieldKey] = timestampValue
}()
}

// It's supported to update the metadata using this function.
// However, we must handle it separately since it's a separate field of the event.
metaValue, metaExists := d[metadataFieldKey]
metaValue, metaExists := d[MetadataFieldKey]
if metaExists {
var metaUpdate mapstr.M

Expand All @@ -142,29 +172,23 @@ func (e *Event) deepUpdate(d mapstr.M, overwrite bool) {
if e.Meta == nil {
e.Meta = mapstr.M{}
}
if overwrite {
switch mode {
case updateModeOverwrite:
e.Meta.DeepUpdate(metaUpdate)
} else {
case updateModeNoOverwrite:
e.Meta.DeepUpdateNoOverwrite(metaUpdate)
}
}

// Temporary delete it from the update map,
// so we can do `e.Fields.DeepUpdate(d)` or
// `e.Fields.DeepUpdateNoOverwrite(d)` later
delete(d, metadataFieldKey)
delete(d, MetadataFieldKey)
defer func() {
d[MetadataFieldKey] = metaValue
}()
}

// At the end we revert all changes we made to the update map
defer func() {
if timestampExists {
d[timestampFieldKey] = timestampValue
}
if metaExists {
d[metadataFieldKey] = metaValue
}
}()

if len(d) == 0 {
return
}
Expand All @@ -173,90 +197,150 @@ func (e *Event) deepUpdate(d mapstr.M, overwrite bool) {
e.Fields = mapstr.M{}
}

if overwrite {
switch mode {
case updateModeOverwrite:
e.Fields.DeepUpdate(d)
} else {
case updateModeNoOverwrite:
e.Fields.DeepUpdateNoOverwrite(d)
}
}

func (e *Event) setTimestamp(v interface{}) error {
func (e *Event) setTimestamp(v interface{}) (interface{}, error) {
// to satisfy the PutValue interface, this function
// must return the overwritten value
prevValue := e.Timestamp

switch ts := v.(type) {
case time.Time:
e.Timestamp = ts
return prevValue, nil
case common.Time:
e.Timestamp = time.Time(ts)
return prevValue, nil
default:
return errNoTimestamp
return nil, ErrValueNotTimestamp
}

return nil
}

// Put associates the specified value with the specified key. If the event
// previously contained a mapping for the key, the old value is replaced and
// returned. The key can be expressed in dot-notation (e.g. x.y) to put a value
// into a nested map.
//
// If you need insert keys containing dots then you must use bracket notation
// to insert values (e.g. m[key] = value).
//
// Use `@timestamp` key for setting the event timestamp.
// Use `@metadata.*` keys for setting the event metadata fields.
// If `@metadata` key is used then `ErrAlterMetadataKey` is returned.
func (e *Event) PutValue(key string, v interface{}) (interface{}, error) {
if key == timestampFieldKey {
err := e.setTimestamp(v)
return nil, err
} else if subKey, ok := metadataKey(key); ok {
if subKey == "" {
switch meta := v.(type) {
case mapstr.M:
e.Meta = meta
case map[string]interface{}:
e.Meta = meta
default:
return nil, errNoMapStr
}
} else if e.Meta == nil {
if key == TimestampFieldKey {
return e.setTimestamp(v)
}
if key == MetadataFieldKey {
return nil, ErrAlterMetadataKey
}

if subKey, ok := e.metadataSubKey(key); ok {
if e.Meta == nil {
e.Meta = mapstr.M{}
}

return e.Meta.Put(subKey, v)
}

if e.Fields == nil {
e.Fields = mapstr.M{}
}

return e.Fields.Put(key, v)
}

// Delete deletes the given key from the event.
//
// Use `@metadata.*` keys for deleting the event metadata fields.
// If `@metadata` key is used then `ErrAlterMetadataKey` is returned.
// If `@timestamp` key is used then `ErrDeleteTimestamp` is returned.
func (e *Event) Delete(key string) error {
if subKey, ok := metadataKey(key); ok {
if subKey == "" {
e.Meta = nil
return nil
}
if key == TimestampFieldKey {
return ErrDeleteTimestamp
}
if key == MetadataFieldKey {
return ErrAlterMetadataKey
}
if subKey, ok := e.metadataSubKey(key); ok {
if e.Meta == nil {
return nil
return mapstr.ErrKeyNotFound
}
return e.Meta.Delete(subKey)
}

if e.Fields == nil {
return mapstr.ErrKeyNotFound
}
return e.Fields.Delete(key)
}

func metadataKey(key string) (string, bool) {
if !strings.HasPrefix(key, metadataFieldKey) {
func (e *Event) metadataSubKey(key string) (string, bool) {
if !strings.HasPrefix(key, metadataKeyPrefix) {
return "", false
}

subKey := key[len(metadataFieldKey):]
subKey := key[metadataKeyOffset:]
if subKey == "" {
return "", true
}
if subKey[0] == '.' {
return subKey[1:], true
return "", false
}
return "", false
return subKey, true
}

// SetErrorWithOption sets the event error field with the message when the addErrKey is set to true.
// If you want to include the data and field you can pass them as parameters and will be appended into the
// error as fields with the corresponding name.
func (e *Event) SetErrorWithOption(message string, addErrKey bool, data string, field string) {
if addErrKey {
errorField := mapstr.M{"message": message, "type": "json"}
if data != "" {
errorField["data"] = data
}
if field != "" {
errorField["field"] = field
if !addErrKey {
return
}

errorField := mapstr.M{"message": message, "type": "json"}
if data != "" {
errorField["data"] = data
}
if field != "" {
errorField["field"] = field
}
e.Fields[ErrorFieldKey] = errorField
}

// String returns a string representation of the event.
func (e *Event) String() string {
m := mapstr.M{
TimestampFieldKey: e.Timestamp,
MetadataFieldKey: mapstr.M{},
}
if e.Meta != nil {
m[MetadataFieldKey] = e.Meta
}
m.DeepUpdate(e.Fields)
return m.String()
}

// HasKey returns true if the key exist. If an error occurs then false is
// returned with a non-nil error.
func (e *Event) HasKey(key string) (bool, error) {
if key == TimestampFieldKey || key == MetadataFieldKey {
return true, nil
}

if subKey, ok := e.metadataSubKey(key); ok {
if e.Meta == nil {
return false, nil
}
e.Fields["error"] = errorField
return e.Meta.HasKey(subKey)
}

if e.Fields == nil {
return false, nil
}

return e.Fields.HasKey(key)
}
Loading

0 comments on commit 546a413

Please sign in to comment.