From b6ffcee62146e16741458970bad27d7e0de69935 Mon Sep 17 00:00:00 2001 From: Marco Dinis Date: Mon, 6 Jan 2025 10:51:36 +0000 Subject: [PATCH] DiscoveryConfigStatus: update even when no resource is found (#50433) * DiscoveryConfigStatus: update even when no resource is found During Auto Discover, when using a DiscoveryConfig, if no resources are found, the DiscoveryConfigStatus is not updated accordingly. This PR ensures that, even when no resources are found, the status will report so. * use comparable instead of any for generic method * remove useless un-named function on PreFetchHooks * prevent call to ssm:SendCommand with 0 instances * rename var from ok to found --- lib/srv/discovery/database_watcher.go | 46 ++++++++++++----- lib/srv/discovery/discovery.go | 49 +++++++++++++------ lib/srv/discovery/discovery_test.go | 45 +++++++++++++++++ lib/srv/discovery/kube_integration_watcher.go | 45 ++++++++++++----- lib/srv/discovery/status.go | 9 ++++ lib/srv/server/azure_watcher.go | 9 ++++ lib/srv/server/ec2_watcher.go | 6 +++ lib/srv/server/gcp_watcher.go | 22 ++++++--- lib/srv/server/ssm_install.go | 4 ++ lib/srv/server/watcher.go | 3 ++ lib/utils/slices/slices.go | 19 ++++--- lib/utils/slices/slices_test.go | 22 +++++++++ 12 files changed, 222 insertions(+), 57 deletions(-) diff --git a/lib/srv/discovery/database_watcher.go b/lib/srv/discovery/database_watcher.go index 0ce0c6ef8ef42..b14332a8f9bb4 100644 --- a/lib/srv/discovery/database_watcher.go +++ b/lib/srv/discovery/database_watcher.go @@ -74,17 +74,7 @@ func (s *Server) startDatabaseWatchers() error { TriggerFetchC: s.newDiscoveryConfigChangedSub(), Origin: types.OriginCloud, Clock: s.clock, - PreFetchHookFn: func() { - discoveryConfigs := slices.FilterMapUnique( - s.getAllDatabaseFetchers(), - func(f common.Fetcher) (s string, include bool) { - return f.GetDiscoveryConfigName(), f.GetDiscoveryConfigName() != "" - }, - ) - s.updateDiscoveryConfigStatus(discoveryConfigs...) - - s.awsRDSResourcesStatus.reset() - }, + PreFetchHookFn: s.databaseWatcherIterationStarted, }, ) if err != nil { @@ -151,6 +141,38 @@ func (s *Server) startDatabaseWatchers() error { return nil } +func (s *Server) databaseWatcherIterationStarted() { + allFetchers := s.getAllDatabaseFetchers() + if len(allFetchers) == 0 { + return + } + + s.submitFetchersEvent(allFetchers) + + awsResultGroups := slices.FilterMapUnique( + allFetchers, + func(f common.Fetcher) (awsResourceGroup, bool) { + include := f.GetDiscoveryConfigName() != "" && f.IntegrationName() != "" + resourceGroup := awsResourceGroup{ + discoveryConfigName: f.GetDiscoveryConfigName(), + integration: f.IntegrationName(), + } + return resourceGroup, include + }, + ) + + for _, g := range awsResultGroups { + s.awsRDSResourcesStatus.iterationStarted(g) + } + + discoveryConfigs := slices.FilterMapUnique(awsResultGroups, func(g awsResourceGroup) (s string, include bool) { + return g.discoveryConfigName, true + }) + s.updateDiscoveryConfigStatus(discoveryConfigs...) + + s.awsRDSResourcesStatus.reset() +} + func (s *Server) getAllDatabaseFetchers() []common.Fetcher { allFetchers := make([]common.Fetcher, 0, len(s.databaseFetchers)) @@ -162,8 +184,6 @@ func (s *Server) getAllDatabaseFetchers() []common.Fetcher { allFetchers = append(allFetchers, s.databaseFetchers...) - s.submitFetchersEvent(allFetchers) - return allFetchers } diff --git a/lib/srv/discovery/discovery.go b/lib/srv/discovery/discovery.go index ffb4a76353f59..36a775a6ccec3 100644 --- a/lib/srv/discovery/discovery.go +++ b/lib/srv/discovery/discovery.go @@ -523,18 +523,7 @@ func (s *Server) initAWSWatchers(matchers []types.AWSMatcher) error { s.ctx, s.getAllAWSServerFetchers, s.caRotationCh, server.WithPollInterval(s.PollInterval), server.WithTriggerFetchC(s.newDiscoveryConfigChangedSub()), - server.WithPreFetchHookFn(func() { - discoveryConfigs := libslices.FilterMapUnique( - s.getAllAWSServerFetchers(), - func(f server.Fetcher) (s string, include bool) { - return f.GetDiscoveryConfigName(), f.GetDiscoveryConfigName() != "" - }, - ) - s.updateDiscoveryConfigStatus(discoveryConfigs...) - - s.awsEC2ResourcesStatus.reset() - s.awsEC2Tasks.reset() - }), + server.WithPreFetchHookFn(s.ec2WatcherIterationStarted), ) if err != nil { return trace.Wrap(err) @@ -575,6 +564,38 @@ func (s *Server) initAWSWatchers(matchers []types.AWSMatcher) error { return nil } +func (s *Server) ec2WatcherIterationStarted() { + allFetchers := s.getAllAWSServerFetchers() + if len(allFetchers) == 0 { + return + } + + s.submitFetchEvent(types.CloudAWS, types.AWSMatcherEC2) + + awsResultGroups := libslices.FilterMapUnique( + allFetchers, + func(f server.Fetcher) (awsResourceGroup, bool) { + include := f.GetDiscoveryConfigName() != "" && f.IntegrationName() != "" + resourceGroup := awsResourceGroup{ + discoveryConfigName: f.GetDiscoveryConfigName(), + integration: f.IntegrationName(), + } + return resourceGroup, include + }, + ) + for _, g := range awsResultGroups { + s.awsEC2ResourcesStatus.iterationStarted(g) + } + + discoveryConfigs := libslices.FilterMapUnique(awsResultGroups, func(g awsResourceGroup) (s string, include bool) { + return g.discoveryConfigName, true + }) + s.updateDiscoveryConfigStatus(discoveryConfigs...) + s.awsEC2ResourcesStatus.reset() + + s.awsEC2Tasks.reset() +} + func (s *Server) initKubeAppWatchers(matchers []types.KubernetesMatcher) error { if len(matchers) == 0 { return nil @@ -1483,10 +1504,6 @@ func (s *Server) getAllAWSServerFetchers() []server.Fetcher { allFetchers = append(allFetchers, s.staticServerAWSFetchers...) - if len(allFetchers) > 0 { - s.submitFetchEvent(types.CloudAWS, types.AWSMatcherEC2) - } - return allFetchers } diff --git a/lib/srv/discovery/discovery_test.go b/lib/srv/discovery/discovery_test.go index 4bf7685e3cca5..79d63fbaf8502 100644 --- a/lib/srv/discovery/discovery_test.go +++ b/lib/srv/discovery/discovery_test.go @@ -285,6 +285,26 @@ func TestDiscoveryServer(t *testing.T) { ) require.NoError(t, err) + dcForEC2StatusWithoutMatchName := uuid.NewString() + dcForEC2StatusWithoutMatch, err := discoveryconfig.NewDiscoveryConfig( + header.Metadata{Name: dcForEC2StatusWithoutMatchName}, + discoveryconfig.Spec{ + DiscoveryGroup: defaultDiscoveryGroup, + AWS: []types.AWSMatcher{{ + Types: []string{"ec2"}, + Regions: []string{"eu-central-1"}, + Tags: map[string]utils.Strings{"teleport": {"yes"}}, + SSM: &types.AWSSSM{DocumentName: "document"}, + Params: &types.InstallerParams{ + InstallTeleport: true, + EnrollMode: types.InstallParamEnrollMode_INSTALL_PARAM_ENROLL_MODE_SCRIPT, + }, + Integration: "my-integration", + }}, + }, + ) + require.NoError(t, err) + discoveryConfigForUserTaskEKSTestName := uuid.NewString() discoveryConfigForUserTaskEKSTest, err := discoveryconfig.NewDiscoveryConfig( header.Metadata{Name: discoveryConfigForUserTaskEKSTestName}, @@ -573,6 +593,31 @@ func TestDiscoveryServer(t *testing.T) { }, wantInstalledInstances: []string{"instance-id-1"}, }, + { + name: "no nodes found using DiscoveryConfig and Integration, but DiscoveryConfig Status is still updated", + presentInstances: []types.Server{}, + foundEC2Instances: []ec2types.Instance{}, + ssm: &mockSSMClient{}, + emitter: &mockEmitter{}, + staticMatchers: Matchers{}, + discoveryConfig: dcForEC2StatusWithoutMatch, + wantDiscoveryConfigStatus: &discoveryconfig.Status{ + State: "DISCOVERY_CONFIG_STATE_SYNCING", + ErrorMessage: nil, + DiscoveredResources: 0, + LastSyncTime: fakeClock.Now().UTC(), + IntegrationDiscoveredResources: map[string]*discoveryconfigv1.IntegrationDiscoveredSummary{ + "my-integration": { + AwsEc2: &discoveryconfigv1.ResourcesDiscoveredSummary{ + Found: 0, + Enrolled: 0, + Failed: 0, + }, + }, + }, + }, + wantInstalledInstances: []string{}, + }, { name: "one node found but SSM Run fails and DiscoverEC2 User Task is created", presentInstances: []types.Server{}, diff --git a/lib/srv/discovery/kube_integration_watcher.go b/lib/srv/discovery/kube_integration_watcher.go index 859dd13a11949..ffbecf6497359 100644 --- a/lib/srv/discovery/kube_integration_watcher.go +++ b/lib/srv/discovery/kube_integration_watcher.go @@ -76,18 +76,7 @@ func (s *Server) startKubeIntegrationWatchers() error { Interval: s.PollInterval, Origin: types.OriginCloud, TriggerFetchC: s.newDiscoveryConfigChangedSub(), - PreFetchHookFn: func() { - discoveryConfigs := libslices.FilterMapUnique( - s.getKubeIntegrationFetchers(), - func(f common.Fetcher) (s string, include bool) { - return f.GetDiscoveryConfigName(), f.GetDiscoveryConfigName() != "" - }, - ) - s.updateDiscoveryConfigStatus(discoveryConfigs...) - - s.awsEKSResourcesStatus.reset() - s.awsEKSTasks.reset() - }, + PreFetchHookFn: s.kubernetesIntegrationWatcherIterationStarted, }) if err != nil { return trace.Wrap(err) @@ -194,6 +183,38 @@ func (s *Server) startKubeIntegrationWatchers() error { return nil } +func (s *Server) kubernetesIntegrationWatcherIterationStarted() { + allFetchers := s.getKubeIntegrationFetchers() + if len(allFetchers) == 0 { + return + } + + s.submitFetchersEvent(allFetchers) + + awsResultGroups := libslices.FilterMapUnique( + allFetchers, + func(f common.Fetcher) (awsResourceGroup, bool) { + include := f.GetDiscoveryConfigName() != "" && f.IntegrationName() != "" + resourceGroup := awsResourceGroup{ + discoveryConfigName: f.GetDiscoveryConfigName(), + integration: f.IntegrationName(), + } + return resourceGroup, include + }, + ) + for _, g := range awsResultGroups { + s.awsEKSResourcesStatus.iterationStarted(g) + } + + discoveryConfigs := libslices.FilterMapUnique(awsResultGroups, func(g awsResourceGroup) (s string, include bool) { + return g.discoveryConfigName, true + }) + s.updateDiscoveryConfigStatus(discoveryConfigs...) + + s.awsEKSResourcesStatus.reset() + s.awsEKSTasks.reset() +} + func (s *Server) enrollEKSClusters(region, integration, discoveryConfigName string, clusters []types.DiscoveredEKSCluster, agentVersion string, mu *sync.Mutex, enrollingClusters map[string]bool) { mu.Lock() for _, c := range clusters { diff --git a/lib/srv/discovery/status.go b/lib/srv/discovery/status.go index 2d168c5aea776..2647ce047b07b 100644 --- a/lib/srv/discovery/status.go +++ b/lib/srv/discovery/status.go @@ -296,6 +296,15 @@ func (ars *awsResourcesStatus) incrementFailed(g awsResourceGroup, count int) { ars.awsResourcesResults[g] = groupStats } +func (ars *awsResourcesStatus) iterationStarted(g awsResourceGroup) { + ars.mu.Lock() + defer ars.mu.Unlock() + if ars.awsResourcesResults == nil { + ars.awsResourcesResults = make(map[awsResourceGroup]awsResourceGroupResult) + } + ars.awsResourcesResults[g] = awsResourceGroupResult{} +} + func (ars *awsResourcesStatus) incrementFound(g awsResourceGroup, count int) { ars.mu.Lock() defer ars.mu.Unlock() diff --git a/lib/srv/server/azure_watcher.go b/lib/srv/server/azure_watcher.go index fda04125ae7e1..fb1110247dc0f 100644 --- a/lib/srv/server/azure_watcher.go +++ b/lib/srv/server/azure_watcher.go @@ -121,6 +121,7 @@ type azureFetcherConfig struct { ResourceGroup string AzureClientGetter azureClientGetter DiscoveryConfigName string + Integration string } type azureInstanceFetcher struct { @@ -132,6 +133,7 @@ type azureInstanceFetcher struct { Parameters map[string]string ClientID string DiscoveryConfigName string + Integration string } func newAzureInstanceFetcher(cfg azureFetcherConfig) *azureInstanceFetcher { @@ -142,6 +144,7 @@ func newAzureInstanceFetcher(cfg azureFetcherConfig) *azureInstanceFetcher { ResourceGroup: cfg.ResourceGroup, Labels: cfg.Matcher.ResourceTags, DiscoveryConfigName: cfg.DiscoveryConfigName, + Integration: cfg.Integration, } if cfg.Matcher.Params != nil { @@ -164,6 +167,12 @@ func (f *azureInstanceFetcher) GetDiscoveryConfigName() string { return f.DiscoveryConfigName } +// IntegrationName identifies the integration name whose credentials were used to fetch the resources. +// Might be empty when the fetcher is using ambient credentials. +func (f *azureInstanceFetcher) IntegrationName() string { + return f.Integration +} + // GetInstances fetches all Azure virtual machines matching configured filters. func (f *azureInstanceFetcher) GetInstances(ctx context.Context, _ bool) ([]Instances, error) { client, err := f.AzureClientGetter.GetAzureVirtualMachinesClient(f.Subscription) diff --git a/lib/srv/server/ec2_watcher.go b/lib/srv/server/ec2_watcher.go index cf3bb13a62367..1f81fb3d6952a 100644 --- a/lib/srv/server/ec2_watcher.go +++ b/lib/srv/server/ec2_watcher.go @@ -461,3 +461,9 @@ func (f *ec2InstanceFetcher) GetInstances(ctx context.Context, rotation bool) ([ func (f *ec2InstanceFetcher) GetDiscoveryConfigName() string { return f.DiscoveryConfigName } + +// IntegrationName identifies the integration name whose credentials were used to fetch the resources. +// Might be empty when the fetcher is using ambient credentials. +func (f *ec2InstanceFetcher) IntegrationName() string { + return f.Integration +} diff --git a/lib/srv/server/gcp_watcher.go b/lib/srv/server/gcp_watcher.go index 4b3ddca5ebb98..e3cf33c591d49 100644 --- a/lib/srv/server/gcp_watcher.go +++ b/lib/srv/server/gcp_watcher.go @@ -111,6 +111,7 @@ type gcpFetcherConfig struct { GCPClient gcp.InstancesClient projectsClient gcp.ProjectsClient DiscoveryConfigName string + Integration string } type gcpInstanceFetcher struct { @@ -123,16 +124,19 @@ type gcpInstanceFetcher struct { Parameters map[string]string projectsClient gcp.ProjectsClient DiscoveryConfigName string + Integration string } func newGCPInstanceFetcher(cfg gcpFetcherConfig) *gcpInstanceFetcher { fetcher := &gcpInstanceFetcher{ - GCP: cfg.GCPClient, - Zones: cfg.Matcher.Locations, - ProjectIDs: cfg.Matcher.ProjectIDs, - ServiceAccounts: cfg.Matcher.ServiceAccounts, - Labels: cfg.Matcher.GetLabels(), - projectsClient: cfg.projectsClient, + GCP: cfg.GCPClient, + Zones: cfg.Matcher.Locations, + ProjectIDs: cfg.Matcher.ProjectIDs, + ServiceAccounts: cfg.Matcher.ServiceAccounts, + Labels: cfg.Matcher.GetLabels(), + projectsClient: cfg.projectsClient, + Integration: cfg.Integration, + DiscoveryConfigName: cfg.DiscoveryConfigName, } if cfg.Matcher.Params != nil { fetcher.Parameters = map[string]string{ @@ -152,6 +156,12 @@ func (f *gcpInstanceFetcher) GetDiscoveryConfigName() string { return f.DiscoveryConfigName } +// IntegrationName identifies the integration name whose credentials were used to fetch the resources. +// Might be empty when the fetcher is using ambient credentials. +func (f *gcpInstanceFetcher) IntegrationName() string { + return f.Integration +} + // GetInstances fetches all GCP virtual machines matching configured filters. func (f *gcpInstanceFetcher) GetInstances(ctx context.Context, _ bool) ([]Instances, error) { // Key by project ID, then by zone. diff --git a/lib/srv/server/ssm_install.go b/lib/srv/server/ssm_install.go index 259da98b246e9..6c977a4b38be1 100644 --- a/lib/srv/server/ssm_install.go +++ b/lib/srv/server/ssm_install.go @@ -199,6 +199,10 @@ func (si *SSMInstaller) Run(ctx context.Context, req SSMRunRequest) error { validInstances = instancesState.valid } + if len(validInstances) == 0 { + return nil + } + validInstanceIDs := instanceIDsFrom(validInstances) output, err := req.SSM.SendCommand(ctx, &ssm.SendCommandInput{ DocumentName: aws.String(req.DocumentName), diff --git a/lib/srv/server/watcher.go b/lib/srv/server/watcher.go index 436cd0f128cbc..5b10097b0e045 100644 --- a/lib/srv/server/watcher.go +++ b/lib/srv/server/watcher.go @@ -45,6 +45,9 @@ type Fetcher interface { // GetDiscoveryConfigName returns the DiscoveryConfig name that created this fetcher. // Empty for Fetchers created from `teleport.yaml/discovery_service.aws.` matchers. GetDiscoveryConfigName() string + // IntegrationName identifies the integration name whose credentials were used to fetch the resources. + // Might be empty when the fetcher is using ambient credentials. + IntegrationName() string } // WithTriggerFetchC sets a poll trigger to manual start a resource polling. diff --git a/lib/utils/slices/slices.go b/lib/utils/slices/slices.go index 65fbe425b43b6..3c33c0baf8710 100644 --- a/lib/utils/slices/slices.go +++ b/lib/utils/slices/slices.go @@ -18,23 +18,22 @@ package slices -import ( - "cmp" - "slices" -) - // FilterMapUnique applies a function to all elements of a slice and collects them. // The function returns the value to collect and whether the current element should be included. -// Returned values are sorted and deduplicated. -func FilterMapUnique[T any, S cmp.Ordered](ts []T, fn func(T) (s S, include bool)) []S { +// Returned values are deduplicated. +func FilterMapUnique[T any, S comparable](ts []T, fn func(T) (s S, include bool)) []S { ss := make([]S, 0, len(ts)) + seen := make(map[S]struct{}, len(ts)) for _, t := range ts { if s, include := fn(t); include { - ss = append(ss, s) + if _, found := seen[s]; !found { + seen[s] = struct{}{} + ss = append(ss, s) + } } } - slices.Sort(ss) - return slices.Compact(ss) + + return ss } // ToPointers converts a slice of values to a slice of pointers to those values diff --git a/lib/utils/slices/slices_test.go b/lib/utils/slices/slices_test.go index 6591880c0ff27..1f031d21eca49 100644 --- a/lib/utils/slices/slices_test.go +++ b/lib/utils/slices/slices_test.go @@ -19,11 +19,16 @@ package slices import ( + "strings" "testing" "github.com/stretchr/testify/require" ) +type aType struct { + fieldA string +} + func TestFilterMapUnique(t *testing.T) { for _, tt := range []struct { name string @@ -69,4 +74,21 @@ func TestFilterMapUnique(t *testing.T) { require.Equal(t, tt.expected, got) }) } + + t.Run("structs", func(t *testing.T) { + input := []aType{ + {"+a"}, + {"+b"}, + {"+b"}, + {"b"}, + {"z"}, + } + withPlusPrefix := func(a aType) (string, bool) { + return a.fieldA, strings.HasPrefix(a.fieldA, "+") + } + got := FilterMapUnique(input, withPlusPrefix) + + expected := []string{"+a", "+b"} + require.Equal(t, expected, got) + }) }