Skip to content

Commit

Permalink
[8.16](backport #41869) [AWS] improve S3 input states copy by only st…
Browse files Browse the repository at this point in the history
…oring filtered entries (#41921)

* [AWS] improve S3 input states copy by only storing filtered entries (#41869)

* s3 state imporvement with prefix filtering

Signed-off-by: Kavindu Dodanduwa <kavindu.dodanduwa@elastic.co>

* add changelog entry

Signed-off-by: Kavindu Dodanduwa <kavindu.dodanduwa@elastic.co>

---------

Signed-off-by: Kavindu Dodanduwa <kavindu.dodanduwa@elastic.co>
(cherry picked from commit 91070bf)

* Update CHANGELOG.next.asciidoc

---------

Co-authored-by: Kavindu Dodanduwa <Kavindu-Dodan@users.noreply.github.com>
  • Loading branch information
mergify[bot] and Kavindu-Dodan authored Dec 6, 2024
1 parent c8f0db9 commit d0c8b9f
Show file tree
Hide file tree
Showing 6 changed files with 112 additions and 19 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Update CEL mito extensions to v1.12.2. {pull}39755[39755]
- Add ability to remove request trace logs from http_endpoint input. {pull}40005[40005]
- Add ability to remove request trace logs from entityanalytics input. {pull}40004[40004]
- Improve S3 polling mode states registry when using list prefix option. {pull}41869[41869]

*Auditbeat*

Expand Down
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/awss3/input_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ func benchmarkInputS3(t *testing.T, numberOfWorkers int) testing.BenchmarkResult
s3API.pagerConstant = newS3PagerConstant(curConfig.BucketListPrefix)
store := openTestStatestore()

states, err := newStates(nil, store)
states, err := newStates(nil, store, "")
assert.NoError(t, err, "states creation should succeed")

s3EventHandlerFactory := newS3ObjectProcessorFactory(metrics, s3API, config.FileSelectors, backupConfig{})
Expand Down
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/awss3/s3_input.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (in *s3PollerInput) Run(
var err error

// Load the persistent S3 polling state.
in.states, err = newStates(in.log, in.store)
in.states, err = newStates(in.log, in.store, in.config.BucketListPrefix)
if err != nil {
return fmt.Errorf("can not start persistent store: %w", err)
}
Expand Down
9 changes: 5 additions & 4 deletions x-pack/filebeat/input/awss3/s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ func TestS3Poller(t *testing.T) {
logp.TestingSetup()

const bucket = "bucket"
const listPrefix = "key"
const numberOfWorkers = 5
const pollInterval = 2 * time.Second
const testTimeout = 1 * time.Second
Expand Down Expand Up @@ -127,15 +128,15 @@ func TestS3Poller(t *testing.T) {
Return(nil, errFakeConnectivityFailure)

s3ObjProc := newS3ObjectProcessorFactory(nil, mockAPI, nil, backupConfig{})
states, err := newStates(nil, store)
states, err := newStates(nil, store, listPrefix)
require.NoError(t, err, "states creation must succeed")
poller := &s3PollerInput{
log: logp.NewLogger(inputName),
config: config{
NumberOfWorkers: numberOfWorkers,
BucketListInterval: pollInterval,
BucketARN: bucket,
BucketListPrefix: "key",
BucketListPrefix: listPrefix,
RegionName: "region",
},
s3: mockAPI,
Expand Down Expand Up @@ -265,15 +266,15 @@ func TestS3Poller(t *testing.T) {
Return(nil, errFakeConnectivityFailure)

s3ObjProc := newS3ObjectProcessorFactory(nil, mockS3, nil, backupConfig{})
states, err := newStates(nil, store)
states, err := newStates(nil, store, listPrefix)
require.NoError(t, err, "states creation must succeed")
poller := &s3PollerInput{
log: logp.NewLogger(inputName),
config: config{
NumberOfWorkers: numberOfWorkers,
BucketListInterval: pollInterval,
BucketARN: bucket,
BucketListPrefix: "key",
BucketListPrefix: listPrefix,
RegionName: "region",
},
s3: mockS3,
Expand Down
28 changes: 22 additions & 6 deletions x-pack/filebeat/input/awss3/states.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,23 +28,27 @@ type states struct {
// storeLock must be held to access store.
store *statestore.Store
storeLock sync.Mutex

// Accepted prefixes of state keys of this registry
keyPrefix string
}

// newStates generates a new states registry.
func newStates(log *logp.Logger, stateStore beater.StateStore) (*states, error) {
func newStates(log *logp.Logger, stateStore beater.StateStore, listPrefix string) (*states, error) {
store, err := stateStore.Access()
if err != nil {
return nil, fmt.Errorf("can't access persistent store: %w", err)
}

stateTable, err := loadS3StatesFromRegistry(log, store)
stateTable, err := loadS3StatesFromRegistry(log, store, listPrefix)
if err != nil {
return nil, fmt.Errorf("loading S3 input state: %w", err)
}

return &states{
store: store,
states: stateTable,
store: store,
states: stateTable,
keyPrefix: listPrefix,
}, nil
}

Expand All @@ -57,6 +61,12 @@ func (s *states) IsProcessed(state state) bool {
}

func (s *states) AddState(state state) error {
if !strings.HasPrefix(state.Key, s.keyPrefix) {
// Note - This failure should not happen since we create a dedicated state instance per input.
// Yet, this is here to avoid any wiring errors within the component.
return fmt.Errorf("expected prefix %s in key %s, skipping state registering", s.keyPrefix, state.Key)
}

id := state.ID()
// Update in-memory copy
s.statesLock.Lock()
Expand All @@ -79,7 +89,9 @@ func (s *states) Close() {
s.storeLock.Unlock()
}

func loadS3StatesFromRegistry(log *logp.Logger, store *statestore.Store) (map[string]state, error) {
// loadS3StatesFromRegistry loads a copy of the registry states.
// If prefix is set, entries will match the provided prefix(including empty prefix)
func loadS3StatesFromRegistry(log *logp.Logger, store *statestore.Store, prefix string) (map[string]state, error) {
stateTable := map[string]state{}
err := store.Each(func(key string, dec statestore.ValueDecoder) (bool, error) {
if !strings.HasPrefix(key, awsS3ObjectStatePrefix) {
Expand All @@ -103,7 +115,11 @@ func loadS3StatesFromRegistry(log *logp.Logger, store *statestore.Store) (map[st
return true, nil
}

stateTable[st.ID()] = st
// filter based on prefix and add entry to local copy
if strings.HasPrefix(st.Key, prefix) {
stateTable[st.ID()] = st
}

return true, nil
})
if err != nil {
Expand Down
89 changes: 82 additions & 7 deletions x-pack/filebeat/input/awss3/states_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/elastic/beats/v7/filebeat/beater"
"github.com/elastic/beats/v7/libbeat/statestore"
"github.com/elastic/beats/v7/libbeat/statestore/storetest"
"github.com/elastic/elastic-agent-libs/logp"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -62,14 +63,14 @@ func TestStatesAddStateAndIsProcessed(t *testing.T) {
},
"not existing state": {
statesEdit: func(states *states) {
states.AddState(testState2)
_ = states.AddState(testState2)
},
state: testState1,
expectedIsProcessed: false,
},
"existing state": {
statesEdit: func(states *states) {
states.AddState(testState1)
_ = states.AddState(testState1)
},
state: testState1,
expectedIsProcessed: true,
Expand All @@ -78,7 +79,7 @@ func TestStatesAddStateAndIsProcessed(t *testing.T) {
statesEdit: func(states *states) {
state := testState1
state.Stored = true
states.AddState(state)
_ = states.AddState(state)
},
state: testState1,
shouldReload: true,
Expand All @@ -88,15 +89,15 @@ func TestStatesAddStateAndIsProcessed(t *testing.T) {
statesEdit: func(states *states) {
state := testState1
state.Failed = true
states.AddState(state)
_ = states.AddState(state)
},
state: testState1,
shouldReload: true,
expectedIsProcessed: true,
},
"existing unprocessed state is not persisted": {
statesEdit: func(states *states) {
states.AddState(testState1)
_ = states.AddState(testState1)
},
state: testState1,
shouldReload: true,
Expand All @@ -108,13 +109,13 @@ func TestStatesAddStateAndIsProcessed(t *testing.T) {
test := test
t.Run(name, func(t *testing.T) {
store := openTestStatestore()
states, err := newStates(nil, store)
states, err := newStates(nil, store, "")
require.NoError(t, err, "states creation must succeed")
if test.statesEdit != nil {
test.statesEdit(states)
}
if test.shouldReload {
states, err = newStates(nil, store)
states, err = newStates(nil, store, "")
require.NoError(t, err, "states creation must succeed")
}

Expand All @@ -123,3 +124,77 @@ func TestStatesAddStateAndIsProcessed(t *testing.T) {
})
}
}

func TestStatesPrefixHandling(t *testing.T) {
logger := logp.NewLogger("state-prefix-testing")

t.Run("if prefix was set, accept only states with prefix", func(t *testing.T) {
// given
registry := openTestStatestore()

// when - registry with prefix
st, err := newStates(logger, registry, "staging-")
require.NoError(t, err)

// then - fail for non prefixed
err = st.AddState(newState("bucket", "production-logA", "etag", time.Now()))
require.Error(t, err)

// then - pass for correctly prefixed
err = st.AddState(newState("bucket", "staging-logA", "etag", time.Now()))
require.NoError(t, err)
})

t.Run("states store only load entries matching the given prefix", func(t *testing.T) {
// given
registry := openTestStatestore()

sA := newState("bucket", "A", "etag", time.Unix(1733221244, 0))
sA.Stored = true
sStagingA := newState("bucket", "staging-A", "etag", time.Unix(1733224844, 0))
sStagingA.Stored = true
sProdB := newState("bucket", "production/B", "etag", time.Unix(1733228444, 0))
sProdB.Stored = true
sSpace := newState("bucket", " B", "etag", time.Unix(1733230444, 0))
sSpace.Stored = true

// add various states first with no prefix
st, err := newStates(logger, registry, "")
require.NoError(t, err)

_ = st.AddState(sA)
_ = st.AddState(sStagingA)
_ = st.AddState(sProdB)
_ = st.AddState(sSpace)

// Reload states and validate

// when - no prefix reload
stNoPrefix, err := newStates(logger, registry, "")
require.NoError(t, err)

require.True(t, stNoPrefix.IsProcessed(sA))
require.True(t, stNoPrefix.IsProcessed(sStagingA))
require.True(t, stNoPrefix.IsProcessed(sProdB))
require.True(t, stNoPrefix.IsProcessed(sSpace))

// when - with prefix `staging-`
st, err = newStates(logger, registry, "staging-")
require.NoError(t, err)

require.False(t, st.IsProcessed(sA))
require.True(t, st.IsProcessed(sStagingA))
require.False(t, st.IsProcessed(sProdB))
require.False(t, st.IsProcessed(sSpace))

// when - with prefix `production/`
st, err = newStates(logger, registry, "production/")
require.NoError(t, err)

require.False(t, st.IsProcessed(sA))
require.False(t, st.IsProcessed(sStagingA))
require.True(t, st.IsProcessed(sProdB))
require.False(t, st.IsProcessed(sSpace))
})

}

0 comments on commit d0c8b9f

Please sign in to comment.