Skip to content

Commit

Permalink
x-pack/filebeat/input/gcs: allow absent credentials when using GCS wi…
Browse files Browse the repository at this point in the history
…th ADC (#40072)
  • Loading branch information
efd6 authored Jul 3, 2024
1 parent 8d1d59c commit 5225e0d
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 23 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Add ability to remove request trace logs from entityanalytics input. {pull}40004[40004]
- Relax constraint on Base DN in entity analytics Active Directory provider. {pull}40054[40054]
- Enhance input state reporting for CEL evaluations that return a single error object in events. {pull}40083[40083]
- Allow absent credentials when using GCS with Application Default Credentials. {issue}39977[39977] {pull}40072[40072]

*Auditbeat*

Expand Down
36 changes: 35 additions & 1 deletion x-pack/filebeat/input/gcs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,16 @@
package gcs

import (
"context"
"errors"
"fmt"
"io/fs"
"os"
"time"

"cloud.google.com/go/storage"
"golang.org/x/oauth2/google"

"github.com/elastic/beats/v7/libbeat/common/match"
)

Expand All @@ -17,7 +25,7 @@ type config struct {
// ProjectId - Defines the project id of the concerned gcs bucket in Google Cloud.
ProjectId string `config:"project_id" validate:"required"`
// Auth - Defines the authentication mechanism to be used for accessing the gcs bucket.
Auth authConfig `config:"auth" validate:"required"`
Auth authConfig `config:"auth"`
// MaxWorkers - Defines the maximum number of go routines that will be spawned.
MaxWorkers *int `config:"max_workers,omitempty" validate:"max=5000"`
// Poll - Defines if polling should be performed on the input bucket source.
Expand Down Expand Up @@ -71,3 +79,29 @@ type fileCredentialsConfig struct {
type jsonCredentialsConfig struct {
AccountKey string `config:"account_key"`
}

func (c authConfig) Validate() error {
// credentials_file
if c.CredentialsFile != nil {
_, err := os.Stat(c.CredentialsFile.Path)
if errors.Is(err, fs.ErrNotExist) {
return fmt.Errorf("credentials_file is configured, but the file %q cannot be found", c.CredentialsFile.Path)
} else {
return nil
}
}

// credentials_json
if c.CredentialsJSON != nil && len(c.CredentialsJSON.AccountKey) > 0 {
return nil
}

// Application Default Credentials (ADC)
_, err := google.FindDefaultCredentials(context.Background(), storage.ScopeReadOnly)
if err == nil {
return nil
}

return fmt.Errorf("no authentication credentials were configured or detected " +
"(credentials_file, credentials_json, and application default credentials (ADC))")
}
44 changes: 22 additions & 22 deletions x-pack/filebeat/input/gcs/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func Test_StorageClient(t *testing.T) {
name: "SingleBucketWithPoll_NoErr",
baseConfig: map[string]interface{}{
"project_id": "elastic-sa",
"auth.credentials_file.path": "/gcs_creds.json",
"auth.credentials_file.path": "testdata/gcs_creds.json",
"max_workers": 2,
"poll": true,
"poll_interval": "5s",
Expand All @@ -70,7 +70,7 @@ func Test_StorageClient(t *testing.T) {
name: "SingleBucketWithoutPoll_NoErr",
baseConfig: map[string]interface{}{
"project_id": "elastic-sa",
"auth.credentials_file.path": "/gcs_creds.json",
"auth.credentials_file.path": "testdata/gcs_creds.json",
"max_workers": 2,
"poll": false,
"poll_interval": "10s",
Expand All @@ -91,7 +91,7 @@ func Test_StorageClient(t *testing.T) {
name: "TwoBucketsWithPoll_NoErr",
baseConfig: map[string]interface{}{
"project_id": "elastic-sa",
"auth.credentials_file.path": "/gcs_creds.json",
"auth.credentials_file.path": "testdata/gcs_creds.json",
"max_workers": 2,
"poll": true,
"poll_interval": "5s",
Expand All @@ -117,7 +117,7 @@ func Test_StorageClient(t *testing.T) {
name: "TwoBucketsWithoutPoll_NoErr",
baseConfig: map[string]interface{}{
"project_id": "elastic-sa",
"auth.credentials_file.path": "/gcs_creds.json",
"auth.credentials_file.path": "testdata/gcs_creds.json",
"max_workers": 2,
"poll": false,
"poll_interval": "10s",
Expand All @@ -143,7 +143,7 @@ func Test_StorageClient(t *testing.T) {
name: "SingleBucketWithPoll_InvalidBucketErr",
baseConfig: map[string]interface{}{
"project_id": "elastic-sa",
"auth.credentials_file.path": "/gcs_creds.json",
"auth.credentials_file.path": "testdata/gcs_creds.json",
"max_workers": 2,
"poll": true,
"poll_interval": "5s",
Expand All @@ -161,7 +161,7 @@ func Test_StorageClient(t *testing.T) {
name: "SingleBucketWithoutPoll_InvalidBucketErr",
baseConfig: map[string]interface{}{
"project_id": "elastic-sa",
"auth.credentials_file.path": "/gcs_creds.json",
"auth.credentials_file.path": "testdata/gcs_creds.json",
"max_workers": 2,
"poll": false,
"poll_interval": "5s",
Expand All @@ -179,7 +179,7 @@ func Test_StorageClient(t *testing.T) {
name: "TwoBucketsWithPoll_InvalidBucketErr",
baseConfig: map[string]interface{}{
"project_id": "elastic-sa",
"auth.credentials_file.path": "/gcs_creds.json",
"auth.credentials_file.path": "testdata/gcs_creds.json",
"max_workers": 2,
"poll": true,
"poll_interval": "5s",
Expand All @@ -200,7 +200,7 @@ func Test_StorageClient(t *testing.T) {
name: "SingleBucketWithPoll_InvalidConfigValue",
baseConfig: map[string]interface{}{
"project_id": "elastic-sa",
"auth.credentials_file.path": "/gcs_creds.json",
"auth.credentials_file.path": "testdata/gcs_creds.json",
"max_workers": 5100,
"poll": true,
"poll_interval": "5s",
Expand All @@ -218,7 +218,7 @@ func Test_StorageClient(t *testing.T) {
name: "TwoBucketWithPoll_InvalidConfigValue",
baseConfig: map[string]interface{}{
"project_id": "elastic-sa",
"auth.credentials_file.path": "/gcs_creds.json",
"auth.credentials_file.path": "testdata/gcs_creds.json",
"max_workers": 5100,
"poll": true,
"poll_interval": "5s",
Expand All @@ -239,7 +239,7 @@ func Test_StorageClient(t *testing.T) {
name: "SingleBucketWithPoll_parseJSON",
baseConfig: map[string]interface{}{
"project_id": "elastic-sa",
"auth.credentials_file.path": "/gcs_creds.json",
"auth.credentials_file.path": "testdata/gcs_creds.json",
"max_workers": 1,
"poll": true,
"poll_interval": "5s",
Expand All @@ -261,7 +261,7 @@ func Test_StorageClient(t *testing.T) {
name: "ReadJSON",
baseConfig: map[string]interface{}{
"project_id": "elastic-sa",
"auth.credentials_file.path": "/gcs_creds.json",
"auth.credentials_file.path": "testdata/gcs_creds.json",
"max_workers": 1,
"poll": true,
"poll_interval": "5s",
Expand All @@ -282,7 +282,7 @@ func Test_StorageClient(t *testing.T) {
name: "ReadOctetStreamJSON",
baseConfig: map[string]interface{}{
"project_id": "elastic-sa",
"auth.credentials_file.path": "/gcs_creds.json",
"auth.credentials_file.path": "testdata/gcs_creds.json",
"max_workers": 1,
"poll": true,
"poll_interval": "5s",
Expand All @@ -302,7 +302,7 @@ func Test_StorageClient(t *testing.T) {
name: "ReadNDJSON",
baseConfig: map[string]interface{}{
"project_id": "elastic-sa",
"auth.credentials_file.path": "/gcs_creds.json",
"auth.credentials_file.path": "testdata/gcs_creds.json",
"max_workers": 1,
"poll": true,
"poll_interval": "5s",
Expand All @@ -322,7 +322,7 @@ func Test_StorageClient(t *testing.T) {
name: "ReadMultilineGzJSON",
baseConfig: map[string]interface{}{
"project_id": "elastic-sa",
"auth.credentials_file.path": "/gcs_creds.json",
"auth.credentials_file.path": "testdata/gcs_creds.json",
"max_workers": 1,
"poll": true,
"poll_interval": "5s",
Expand All @@ -342,7 +342,7 @@ func Test_StorageClient(t *testing.T) {
name: "ReadJSONWithRootAsArray",
baseConfig: map[string]interface{}{
"project_id": "elastic-sa",
"auth.credentials_file.path": "/gcs_creds.json",
"auth.credentials_file.path": "testdata/gcs_creds.json",
"max_workers": 1,
"poll": true,
"poll_interval": "5s",
Expand All @@ -364,7 +364,7 @@ func Test_StorageClient(t *testing.T) {
name: "FilterByTimeStampEpoch",
baseConfig: map[string]interface{}{
"project_id": "elastic-sa",
"auth.credentials_file.path": "/gcs_creds.json",
"auth.credentials_file.path": "testdata/gcs_creds.json",
"max_workers": 1,
"poll": true,
"poll_interval": "5s",
Expand All @@ -385,7 +385,7 @@ func Test_StorageClient(t *testing.T) {
name: "FilterByFileSelectorRegexSingle",
baseConfig: map[string]interface{}{
"project_id": "elastic-sa",
"auth.credentials_file.path": "/gcs_creds.json",
"auth.credentials_file.path": "testdata/gcs_creds.json",
"max_workers": 1,
"poll": true,
"poll_interval": "5s",
Expand All @@ -409,7 +409,7 @@ func Test_StorageClient(t *testing.T) {
name: "FilterByFileSelectorRegexMulti",
baseConfig: map[string]interface{}{
"project_id": "elastic-sa",
"auth.credentials_file.path": "/gcs_creds.json",
"auth.credentials_file.path": "testdata/gcs_creds.json",
"max_workers": 1,
"poll": true,
"poll_interval": "5s",
Expand Down Expand Up @@ -437,7 +437,7 @@ func Test_StorageClient(t *testing.T) {
name: "ExpandEventListFromField",
baseConfig: map[string]interface{}{
"project_id": "elastic-sa",
"auth.credentials_file.path": "/gcs_creds.json",
"auth.credentials_file.path": "testdata/gcs_creds.json",
"max_workers": 1,
"poll": true,
"poll_interval": "5s",
Expand All @@ -463,7 +463,7 @@ func Test_StorageClient(t *testing.T) {
name: "MultiContainerWithMultiFileSelectors",
baseConfig: map[string]interface{}{
"project_id": "elastic-sa",
"auth.credentials_file.path": "/gcs_creds.json",
"auth.credentials_file.path": "testdata/gcs_creds.json",
"max_workers": 1,
"poll": true,
"poll_interval": "5s",
Expand Down Expand Up @@ -496,7 +496,7 @@ func Test_StorageClient(t *testing.T) {
name: "FilterByFileSelectorEmptyRegex",
baseConfig: map[string]interface{}{
"project_id": "elastic-sa",
"auth.credentials_file.path": "/gcs_creds.json",
"auth.credentials_file.path": "testdata/gcs_creds.json",
"max_workers": 1,
"poll": true,
"poll_interval": "5s",
Expand Down Expand Up @@ -536,7 +536,7 @@ func Test_StorageClient(t *testing.T) {
conf := config{}
err := cfg.Unpack(&conf)
if err != nil {
assert.EqualError(t, err, tt.isError.Error())
assert.EqualError(t, err, fmt.Sprint(tt.isError))
return
}
input := newStatelessInput(conf)
Expand Down
Empty file.

0 comments on commit 5225e0d

Please sign in to comment.