Skip to content

Commit

Permalink
registry cleanup for unavailable objects (#41694) (#41937)
Browse files Browse the repository at this point in the history
Signed-off-by: Kavindu Dodanduwa <kavindu.dodanduwa@elastic.co>

# Conflicts:
#	x-pack/filebeat/input/awss3/states.go
#	x-pack/filebeat/input/awss3/states_test.go
(cherry picked from commit 583d345)

Co-authored-by: Kavindu Dodanduwa <Kavindu-Dodan@users.noreply.github.com>
  • Loading branch information
mergify[bot] and Kavindu-Dodan authored Dec 6, 2024
1 parent 2ad17aa commit 72be7da
Show file tree
Hide file tree
Showing 4 changed files with 145 additions and 23 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Add ability to remove request trace logs from http_endpoint input. {pull}40005[40005]
- Add ability to remove request trace logs from entityanalytics input. {pull}40004[40004]
- Improve S3 polling mode states registry when using list prefix option. {pull}41869[41869]
- AWS S3 input registry cleanup for untracked s3 objects. {pull}41694[41694]

*Auditbeat*

Expand Down
24 changes: 21 additions & 3 deletions x-pack/filebeat/input/awss3/s3_input.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,19 @@ func (in *s3PollerInput) runPoll(ctx context.Context) {
}

// Start reading data and wait for its processing to be done
in.readerLoop(ctx, workChan)
ids, ok := in.readerLoop(ctx, workChan)
workerWg.Wait()

if !ok {
in.log.Warn("skipping state registry cleanup as object reading ended with a non-ok return")
return
}

// Perform state cleanup operation
err := in.states.CleanUp(ids)
if err != nil {
in.log.Errorf("failed to cleanup states: %v", err.Error())
}
}

func (in *s3PollerInput) workerLoop(ctx context.Context, workChan <-chan state) {
Expand Down Expand Up @@ -183,7 +194,10 @@ func (in *s3PollerInput) workerLoop(ctx context.Context, workChan <-chan state)
}
}

func (in *s3PollerInput) readerLoop(ctx context.Context, workChan chan<- state) {
// readerLoop performs the S3 object listing and emit state to work listeners if object needs to be processed.
// Returns all tracked state IDs correlates to all tracked S3 objects iff listing is successful.
// These IDs are intended to be used for state clean-up.
func (in *s3PollerInput) readerLoop(ctx context.Context, workChan chan<- state) (knownStateIDSlice []string, ok bool) {
defer close(workChan)

bucketName := getBucketNameFromARN(in.config.getBucketARN())
Expand All @@ -202,7 +216,7 @@ func (in *s3PollerInput) readerLoop(ctx context.Context, workChan chan<- state)
circuitBreaker++
if circuitBreaker >= readerLoopMaxCircuitBreaker {
in.log.Warnw(fmt.Sprintf("%d consecutive error when paginating listing, breaking the circuit.", circuitBreaker), "error", err)
break
return nil, false
}
}
// add a backoff delay and try again
Expand All @@ -219,6 +233,8 @@ func (in *s3PollerInput) readerLoop(ctx context.Context, workChan chan<- state)
in.metrics.s3ObjectsListedTotal.Add(uint64(totListedObjects))
for _, object := range page.Contents {
state := newState(bucketName, *object.Key, *object.ETag, *object.LastModified)
knownStateIDSlice = append(knownStateIDSlice, state.ID())

if in.states.IsProcessed(state) {
in.log.Debugw("skipping state.", "state", state)
continue
Expand All @@ -229,6 +245,8 @@ func (in *s3PollerInput) readerLoop(ctx context.Context, workChan chan<- state)
in.metrics.s3ObjectsProcessedTotal.Inc()
}
}

return knownStateIDSlice, true
}

func (in *s3PollerInput) s3EventForState(state state) s3EventV2 {
Expand Down
47 changes: 39 additions & 8 deletions x-pack/filebeat/input/awss3/states.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ const awsS3ObjectStatePrefix = "filebeat::aws-s3::state::"
type states struct {
// Completed S3 object states, indexed by state ID.
// statesLock must be held to access states.
states map[string]state
states map[string]*state
statesLock sync.Mutex

// The store used to persist state changes to the registry.
Expand Down Expand Up @@ -70,29 +70,61 @@ func (s *states) AddState(state state) error {
id := state.ID()
// Update in-memory copy
s.statesLock.Lock()
s.states[id] = state
s.states[id] = &state
s.statesLock.Unlock()

// Persist to the registry
s.storeLock.Lock()
defer s.storeLock.Unlock()
key := awsS3ObjectStatePrefix + id
if err := s.store.Set(key, state); err != nil {
if err := s.store.Set(getStoreKey(id), state); err != nil {
return err
}
return nil
}

// CleanUp performs state and store cleanup based on provided knownIDs.
// knownIDs must contain valid currently tracked state IDs that must be known by this state registry.
// State and underlying storage will be cleaned if ID is no longer present in knownIDs set.
func (s *states) CleanUp(knownIDs []string) error {
knownIDHashSet := map[string]struct{}{}
for _, id := range knownIDs {
knownIDHashSet[id] = struct{}{}
}

s.storeLock.Lock()
defer s.storeLock.Unlock()
s.statesLock.Lock()
defer s.statesLock.Unlock()

for id := range s.states {
if _, contains := knownIDHashSet[id]; !contains {
// remove from sate & store as ID is no longer seen in known ID set
delete(s.states, id)
err := s.store.Remove(getStoreKey(id))
if err != nil {
return fmt.Errorf("error while removing the state for ID %s: %w", id, err)
}
}
}

return nil
}

func (s *states) Close() {
s.storeLock.Lock()
s.store.Close()
s.storeLock.Unlock()
}

// getStoreKey is a helper to generate the key used by underlying persistent storage
func getStoreKey(stateID string) string {
return awsS3ObjectStatePrefix + stateID
}

// loadS3StatesFromRegistry loads a copy of the registry states.
// If prefix is set, entries will match the provided prefix(including empty prefix)
func loadS3StatesFromRegistry(log *logp.Logger, store *statestore.Store, prefix string) (map[string]state, error) {
stateTable := map[string]state{}
func loadS3StatesFromRegistry(log *logp.Logger, store *statestore.Store, prefix string) (map[string]*state, error) {
stateTable := map[string]*state{}
err := store.Each(func(key string, dec statestore.ValueDecoder) (bool, error) {
if !strings.HasPrefix(key, awsS3ObjectStatePrefix) {
return true, nil
Expand All @@ -117,9 +149,8 @@ func loadS3StatesFromRegistry(log *logp.Logger, store *statestore.Store, prefix

// filter based on prefix and add entry to local copy
if strings.HasPrefix(st.Key, prefix) {
stateTable[st.ID()] = st
stateTable[st.ID()] = &st
}

return true, nil
})
if err != nil {
Expand Down
96 changes: 84 additions & 12 deletions x-pack/filebeat/input/awss3/states_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package awss3

import (
"fmt"
"testing"
"time"

Expand Down Expand Up @@ -42,7 +43,7 @@ func (s *testInputStore) CleanupInterval() time.Duration {
func TestStatesAddStateAndIsProcessed(t *testing.T) {
type stateTestCase struct {
// An initialization callback to invoke on the (initially empty) states.
statesEdit func(states *states)
statesEdit func(states *states) error

// The state to call IsProcessed on and the expected result
state state
Expand All @@ -62,42 +63,42 @@ func TestStatesAddStateAndIsProcessed(t *testing.T) {
expectedIsProcessed: false,
},
"not existing state": {
statesEdit: func(states *states) {
_ = states.AddState(testState2)
statesEdit: func(states *states) error {
return states.AddState(testState2)
},
state: testState1,
expectedIsProcessed: false,
},
"existing state": {
statesEdit: func(states *states) {
_ = states.AddState(testState1)
statesEdit: func(states *states) error {
return states.AddState(testState1)
},
state: testState1,
expectedIsProcessed: true,
},
"existing stored state is persisted": {
statesEdit: func(states *states) {
statesEdit: func(states *states) error {
state := testState1
state.Stored = true
_ = states.AddState(state)
return states.AddState(state)
},
state: testState1,
shouldReload: true,
expectedIsProcessed: true,
},
"existing failed state is persisted": {
statesEdit: func(states *states) {
statesEdit: func(states *states) error {
state := testState1
state.Failed = true
_ = states.AddState(state)
return states.AddState(state)
},
state: testState1,
shouldReload: true,
expectedIsProcessed: true,
},
"existing unprocessed state is not persisted": {
statesEdit: func(states *states) {
_ = states.AddState(testState1)
statesEdit: func(states *states) error {
return states.AddState(testState1)
},
state: testState1,
shouldReload: true,
Expand All @@ -112,7 +113,8 @@ func TestStatesAddStateAndIsProcessed(t *testing.T) {
states, err := newStates(nil, store, "")
require.NoError(t, err, "states creation must succeed")
if test.statesEdit != nil {
test.statesEdit(states)
err = test.statesEdit(states)
require.NoError(t, err, "states edit must succeed")
}
if test.shouldReload {
states, err = newStates(nil, store, "")
Expand All @@ -125,6 +127,76 @@ func TestStatesAddStateAndIsProcessed(t *testing.T) {
}
}

func TestStatesCleanUp(t *testing.T) {
bucketName := "test-bucket"
lModifiedTime := time.Unix(0, 0)
stateA := newState(bucketName, "a", "a-etag", lModifiedTime)
stateB := newState(bucketName, "b", "b-etag", lModifiedTime)
stateC := newState(bucketName, "c", "c-etag", lModifiedTime)

tests := []struct {
name string
initStates []state
knownIDs []string
expectIDs []string
}{
{
name: "No cleanup if not missing from known list",
initStates: []state{stateA, stateB, stateC},
knownIDs: []string{stateA.ID(), stateB.ID(), stateC.ID()},
expectIDs: []string{stateA.ID(), stateB.ID(), stateC.ID()},
},
{
name: "Clean up if missing from known list",
initStates: []state{stateA, stateB, stateC},
knownIDs: []string{stateA.ID()},
expectIDs: []string{stateA.ID()},
},
{
name: "Clean up everything",
initStates: []state{stateA, stateC}, // given A, C
knownIDs: []string{stateB.ID()}, // but known B
expectIDs: []string{}, // empty state & store
},
{
name: "Empty known IDs are valid",
initStates: []state{stateA}, // given A
knownIDs: []string{}, // Known nothing
expectIDs: []string{}, // empty state & store
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
store := openTestStatestore()
statesInstance, err := newStates(nil, store, "")
require.NoError(t, err, "states creation must succeed")

for _, s := range test.initStates {
err := statesInstance.AddState(s)
require.NoError(t, err, "state initialization must succeed")
}

// perform cleanup
err = statesInstance.CleanUp(test.knownIDs)
require.NoError(t, err, "state cleanup must succeed")

// validate
for _, id := range test.expectIDs {
// must be in local state
_, ok := statesInstance.states[id]
require.True(t, ok, fmt.Errorf("expected id %s in state, but got missing", id))

// must be in store
ok, err := statesInstance.store.Has(getStoreKey(id))
require.NoError(t, err, "state has must succeed")
require.True(t, ok, fmt.Errorf("expected id %s in store, but got missing", id))
}
})
}

}

func TestStatesPrefixHandling(t *testing.T) {
logger := logp.NewLogger("state-prefix-testing")

Expand Down

0 comments on commit 72be7da

Please sign in to comment.