Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[aws] feat: aws-s3 input registry cleanup for untracked s3 objects #41694

Merged
merged 1 commit into from
Dec 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Update CEL mito extensions to v1.16.0. {pull}41727[41727]
- Add evaluation state dump debugging option to CEL input. {pull}41335[41335]
- Improve S3 polling mode states registry when using list prefix option. {pull}41869[41869]
- AWS S3 input registry cleanup for untracked S3 objects. {pull}41694[41694]

*Auditbeat*

Expand Down
24 changes: 21 additions & 3 deletions x-pack/filebeat/input/awss3/s3_input.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,19 @@ func (in *s3PollerInput) runPoll(ctx context.Context) {
}

// Start reading data and wait for its processing to be done
in.readerLoop(ctx, workChan)
ids, ok := in.readerLoop(ctx, workChan)
workerWg.Wait()

if !ok {
in.log.Warn("skipping state registry cleanup as object reading ended with a non-ok return")
return
}

// Perform state cleanup operation
err := in.states.CleanUp(ids)
Kavindu-Dodan marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
in.log.Errorf("failed to cleanup states: %v", err.Error())
}
}

func (in *s3PollerInput) workerLoop(ctx context.Context, workChan <-chan state) {
Expand Down Expand Up @@ -183,7 +194,10 @@ func (in *s3PollerInput) workerLoop(ctx context.Context, workChan <-chan state)
}
}

func (in *s3PollerInput) readerLoop(ctx context.Context, workChan chan<- state) {
// readerLoop performs the S3 object listing and emits a state to the work listeners for each object that needs to be processed.
// It returns the state IDs of all listed S3 objects, with ok set to true, iff the listing is successful.
// These IDs are intended to be used for state clean-up.
func (in *s3PollerInput) readerLoop(ctx context.Context, workChan chan<- state) (knownStateIDSlice []string, ok bool) {
defer close(workChan)

bucketName := getBucketNameFromARN(in.config.getBucketARN())
Expand All @@ -202,7 +216,7 @@ func (in *s3PollerInput) readerLoop(ctx context.Context, workChan chan<- state)
circuitBreaker++
if circuitBreaker >= readerLoopMaxCircuitBreaker {
in.log.Warnw(fmt.Sprintf("%d consecutive error when paginating listing, breaking the circuit.", circuitBreaker), "error", err)
break
return nil, false
}
}
// add a backoff delay and try again
Expand All @@ -219,6 +233,8 @@ func (in *s3PollerInput) readerLoop(ctx context.Context, workChan chan<- state)
in.metrics.s3ObjectsListedTotal.Add(uint64(totListedObjects))
for _, object := range page.Contents {
state := newState(bucketName, *object.Key, *object.ETag, *object.LastModified)
knownStateIDSlice = append(knownStateIDSlice, state.ID())

if in.states.IsProcessed(state) {
in.log.Debugw("skipping state.", "state", state)
continue
Expand All @@ -229,6 +245,8 @@ func (in *s3PollerInput) readerLoop(ctx context.Context, workChan chan<- state)
in.metrics.s3ObjectsProcessedTotal.Inc()
}
}

return knownStateIDSlice, true
}

func (in *s3PollerInput) s3EventForState(state state) s3EventV2 {
Expand Down
47 changes: 39 additions & 8 deletions x-pack/filebeat/input/awss3/states.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ const awsS3ObjectStatePrefix = "filebeat::aws-s3::state::"
type states struct {
// Completed S3 object states, indexed by state ID.
// statesLock must be held to access states.
states map[string]state
states map[string]*state
Kavindu-Dodan marked this conversation as resolved.
Show resolved Hide resolved
statesLock sync.Mutex

// The store used to persist state changes to the registry.
Expand Down Expand Up @@ -70,29 +70,61 @@ func (s *states) AddState(state state) error {
id := state.ID()
// Update in-memory copy
s.statesLock.Lock()
s.states[id] = state
s.states[id] = &state
s.statesLock.Unlock()

// Persist to the registry
s.storeLock.Lock()
defer s.storeLock.Unlock()
key := awsS3ObjectStatePrefix + id
if err := s.store.Set(key, state); err != nil {
if err := s.store.Set(getStoreKey(id), state); err != nil {
return err
}
return nil
}

// CleanUp removes every tracked state whose ID is not listed in knownIDs,
// from both the in-memory state table and the underlying persistent store.
//
// knownIDs must contain the IDs of all states that are still valid and should
// be kept. A nil or empty knownIDs removes every tracked state.
//
// Each entry is removed from the persistent store first and is dropped from
// the in-memory table only once that removal succeeds, so a failed removal
// never leaves the in-memory table and the store disagreeing about an entry.
//
// It returns an error wrapping the first store removal failure. Entries
// removed before the failure stay removed; the remaining ones are untouched.
func (s *states) CleanUp(knownIDs []string) error {
	knownIDHashSet := make(map[string]struct{}, len(knownIDs))
	for _, id := range knownIDs {
		knownIDHashSet[id] = struct{}{}
	}

	s.storeLock.Lock()
	defer s.storeLock.Unlock()
	s.statesLock.Lock()
	defer s.statesLock.Unlock()

	for id := range s.states {
		if _, contains := knownIDHashSet[id]; contains {
			continue
		}

		// ID is no longer seen in the known ID set: remove it from the store
		// first, then from the in-memory state.
		if err := s.store.Remove(getStoreKey(id)); err != nil {
			return fmt.Errorf("error while removing the state for ID %s: %w", id, err)
		}
		delete(s.states, id)
	}

	return nil
}

// Close closes the underlying store while holding storeLock.
func (s *states) Close() {
	s.storeLock.Lock()
	defer s.storeLock.Unlock()

	s.store.Close()
}

// getStoreKey builds the key used by the underlying persistent storage for
// the given state ID by prepending awsS3ObjectStatePrefix to it.
func getStoreKey(stateID string) string {
	key := awsS3ObjectStatePrefix + stateID
	return key
}

// loadS3StatesFromRegistry loads a copy of the registry states.
// If prefix is set, entries will match the provided prefix(including empty prefix)
func loadS3StatesFromRegistry(log *logp.Logger, store *statestore.Store, prefix string) (map[string]state, error) {
stateTable := map[string]state{}
func loadS3StatesFromRegistry(log *logp.Logger, store *statestore.Store, prefix string) (map[string]*state, error) {
stateTable := map[string]*state{}
err := store.Each(func(key string, dec statestore.ValueDecoder) (bool, error) {
if !strings.HasPrefix(key, awsS3ObjectStatePrefix) {
return true, nil
Expand All @@ -117,9 +149,8 @@ func loadS3StatesFromRegistry(log *logp.Logger, store *statestore.Store, prefix

// filter based on prefix and add entry to local copy
if strings.HasPrefix(st.Key, prefix) {
stateTable[st.ID()] = st
stateTable[st.ID()] = &st
}

return true, nil
})
if err != nil {
Expand Down
96 changes: 84 additions & 12 deletions x-pack/filebeat/input/awss3/states_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package awss3

import (
"fmt"
"testing"
"time"

Expand Down Expand Up @@ -42,7 +43,7 @@ func (s *testInputStore) CleanupInterval() time.Duration {
func TestStatesAddStateAndIsProcessed(t *testing.T) {
type stateTestCase struct {
// An initialization callback to invoke on the (initially empty) states.
statesEdit func(states *states)
statesEdit func(states *states) error

// The state to call IsProcessed on and the expected result
state state
Expand All @@ -62,42 +63,42 @@ func TestStatesAddStateAndIsProcessed(t *testing.T) {
expectedIsProcessed: false,
},
"not existing state": {
statesEdit: func(states *states) {
_ = states.AddState(testState2)
statesEdit: func(states *states) error {
return states.AddState(testState2)
},
state: testState1,
expectedIsProcessed: false,
},
"existing state": {
statesEdit: func(states *states) {
_ = states.AddState(testState1)
statesEdit: func(states *states) error {
return states.AddState(testState1)
},
state: testState1,
expectedIsProcessed: true,
},
"existing stored state is persisted": {
statesEdit: func(states *states) {
statesEdit: func(states *states) error {
state := testState1
state.Stored = true
_ = states.AddState(state)
return states.AddState(state)
},
state: testState1,
shouldReload: true,
expectedIsProcessed: true,
},
"existing failed state is persisted": {
statesEdit: func(states *states) {
statesEdit: func(states *states) error {
state := testState1
state.Failed = true
_ = states.AddState(state)
return states.AddState(state)
},
state: testState1,
shouldReload: true,
expectedIsProcessed: true,
},
"existing unprocessed state is not persisted": {
statesEdit: func(states *states) {
_ = states.AddState(testState1)
statesEdit: func(states *states) error {
return states.AddState(testState1)
},
state: testState1,
shouldReload: true,
Expand All @@ -112,7 +113,8 @@ func TestStatesAddStateAndIsProcessed(t *testing.T) {
states, err := newStates(nil, store, "")
require.NoError(t, err, "states creation must succeed")
if test.statesEdit != nil {
test.statesEdit(states)
err = test.statesEdit(states)
require.NoError(t, err, "states edit must succeed")
}
if test.shouldReload {
states, err = newStates(nil, store, "")
Expand All @@ -125,6 +127,76 @@ func TestStatesAddStateAndIsProcessed(t *testing.T) {
}
}

func TestStatesCleanUp(t *testing.T) {
bucketName := "test-bucket"
lModifiedTime := time.Unix(0, 0)
stateA := newState(bucketName, "a", "a-etag", lModifiedTime)
stateB := newState(bucketName, "b", "b-etag", lModifiedTime)
stateC := newState(bucketName, "c", "c-etag", lModifiedTime)

tests := []struct {
name string
initStates []state
knownIDs []string
expectIDs []string
}{
{
name: "No cleanup if not missing from known list",
initStates: []state{stateA, stateB, stateC},
knownIDs: []string{stateA.ID(), stateB.ID(), stateC.ID()},
expectIDs: []string{stateA.ID(), stateB.ID(), stateC.ID()},
},
{
name: "Clean up if missing from known list",
initStates: []state{stateA, stateB, stateC},
knownIDs: []string{stateA.ID()},
expectIDs: []string{stateA.ID()},
},
{
name: "Clean up everything",
initStates: []state{stateA, stateC}, // given A, C
knownIDs: []string{stateB.ID()}, // but known B
expectIDs: []string{}, // empty state & store
Kavindu-Dodan marked this conversation as resolved.
Show resolved Hide resolved
},
{
name: "Empty known IDs are valid",
initStates: []state{stateA}, // given A
knownIDs: []string{}, // Known nothing
expectIDs: []string{}, // empty state & store
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
store := openTestStatestore()
statesInstance, err := newStates(nil, store, "")
require.NoError(t, err, "states creation must succeed")

for _, s := range test.initStates {
err := statesInstance.AddState(s)
require.NoError(t, err, "state initialization must succeed")
}

// perform cleanup
err = statesInstance.CleanUp(test.knownIDs)
require.NoError(t, err, "state cleanup must succeed")

// validate
for _, id := range test.expectIDs {
// must be in local state
_, ok := statesInstance.states[id]
require.True(t, ok, fmt.Errorf("expected id %s in state, but got missing", id))

// must be in store
ok, err := statesInstance.store.Has(getStoreKey(id))
require.NoError(t, err, "state has must succeed")
require.True(t, ok, fmt.Errorf("expected id %s in store, but got missing", id))
}
})
}

}

func TestStatesPrefixHandling(t *testing.T) {
logger := logp.NewLogger("state-prefix-testing")

Expand Down
Loading