diff --git a/lib/services/local/events.go b/lib/services/local/events.go index ee3f9eac64d35..3f45da90451cc 100644 --- a/lib/services/local/events.go +++ b/lib/services/local/events.go @@ -32,6 +32,7 @@ import ( "github.com/gravitational/teleport/lib/backend" "github.com/gravitational/teleport/lib/defaults" "github.com/gravitational/teleport/lib/services" + "github.com/gravitational/teleport/lib/services/local/generic" ) // EventsService implements service to watch for events @@ -192,6 +193,8 @@ func (e *EventsService) NewWatcher(ctx context.Context, watch types.Watch) (type parser = newAccessListReviewParser() case types.KindKubeWaitingContainer: parser = newKubeWaitingContainerParser() + case types.KindInstance: + parser = newInstanceParser() default: if watch.AllowPartialSuccess { continue @@ -1941,6 +1944,31 @@ func (p *kubeWaitingContainerParser) parse(event backend.Event) (types.Resource, } } +func newInstanceParser() *instanceParser { + return &instanceParser{ + baseParser: newBaseParser(backend.Key(instancePrefix)), + } +} + +type instanceParser struct { + baseParser +} + +func (p *instanceParser) parse(event backend.Event) (types.Resource, error) { + switch event.Type { + case types.OpDelete: + return resourceHeader(event, types.KindInstance, types.V1, 0) + case types.OpPut: + instance, err := generic.FastUnmarshal[*types.InstanceV1](event.Item) + if err != nil { + return nil, trace.Wrap(err) + } + return instance, nil + default: + return nil, trace.BadParameter("event %v is not supported", event.Type) + } +} + func resourceHeader(event backend.Event, kind, version string, offset int) (types.Resource, error) { name, err := base(event.Item.Key, offset) if err != nil { diff --git a/lib/services/local/generic/helpers.go b/lib/services/local/generic/helpers.go new file mode 100644 index 0000000000000..e681f21deb795 --- /dev/null +++ b/lib/services/local/generic/helpers.go @@ -0,0 +1,73 @@ +/* + * Teleport + * Copyright (C) 2023 Gravitational, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package generic + +import ( + "time" + + "github.com/gravitational/teleport/lib/backend" + "github.com/gravitational/teleport/lib/utils" +) + +// UnmarshalableResource represents a resource that can have all the necessary fields from backend.Item +// set in a generic context. +type UnmarshalableResource interface { + SetExpiry(time.Time) + SetRevision(string) +} + +// FastUnmarshal is a generic helper used to unmarshal a resoruce from a backend.Item and +// set the Expiry and Revision fields. It isn't compatible with the standard Unmarshal function +// signature used elsewhere and therefore may not be the best choice for all use cases, but it +// has the benefit of being simpler to use and not requiring the caller to undergo the revision/expiry +// ceremony at each call site. +func FastUnmarshal[T UnmarshalableResource](item backend.Item) (T, error) { + var r T + if err := utils.FastUnmarshal(item.Value, &r); err != nil { + return r, err + } + + r.SetExpiry(item.Expires) + r.SetRevision(item.Revision) + return r, nil +} + +type MarshalableResource interface { + Expiry() time.Time + GetRevision() string +} + +// FastMarshal is a generic helper used to marshal a resource to a backend.Item and +// set the Expiry and Revision fields. It isn't compatible with the standard Marshal function +// signature used elsewhere and therefore may not be the best choice for all use cases, but it +// has the benefit of being simpler to use and not requiring the caller to undergo the revision/expiry +// ceremony at each call site. +func FastMarshal[T MarshalableResource](key []byte, r T) (backend.Item, error) { + value, err := utils.FastMarshal(r) + if err != nil { + return backend.Item{}, err + } + + return backend.Item{ + Key: key, + Value: value, + Expires: r.Expiry(), + Revision: r.GetRevision(), + }, nil +} diff --git a/lib/services/local/inventory.go b/lib/services/local/inventory.go index 667926e984d52..feb57132911ab 100644 --- a/lib/services/local/inventory.go +++ b/lib/services/local/inventory.go @@ -25,7 +25,7 @@ import ( "github.com/gravitational/teleport/api/internalutils/stream" "github.com/gravitational/teleport/api/types" "github.com/gravitational/teleport/lib/backend" - "github.com/gravitational/teleport/lib/utils" + "github.com/gravitational/teleport/lib/services/local/generic" ) // GetInstances iterates all teleport instances. @@ -49,8 +49,8 @@ func (s *PresenceService) GetInstances(ctx context.Context, req types.InstanceFi endKey := backend.RangeEnd(startKey) items := backend.StreamRange(ctx, s, startKey, endKey, pageSize) return stream.FilterMap(items, func(item backend.Item) (types.Instance, bool) { - var instance types.InstanceV1 - if err := utils.FastUnmarshal(item.Value, &instance); err != nil { + instance, err := generic.FastUnmarshal[*types.InstanceV1](item) + if err != nil { s.log.Warnf("Skipping instance at %s, failed to unmarshal: %v", item.Key, err) return nil, false } @@ -58,10 +58,10 @@ func (s *PresenceService) GetInstances(ctx context.Context, req types.InstanceFi s.log.Warnf("Skipping instance at %s: %v", item.Key, err) return nil, false } - if !req.Match(&instance) { + if !req.Match(instance) { return nil, false } - return &instance, true + return instance, true }) } @@ -72,8 +72,8 @@ func (s *PresenceService) getInstance(ctx context.Context, serverID string) (typ return nil, trace.Wrap(err) } - var instance types.InstanceV1 - if err := utils.FastUnmarshal(item.Value, &instance); err != nil { + instance, err := generic.FastUnmarshal[*types.InstanceV1](*item) + if err != nil { return nil, trace.BadParameter("failed to unmarshal instance %q: %v", serverID, err) } @@ -81,7 +81,7 @@ func (s *PresenceService) getInstance(ctx context.Context, serverID string) (typ return nil, trace.BadParameter("instance %q appears malformed: %v", serverID, err) } - return &instance, nil + return instance, nil } // UpsertInstance creates or updates an instance resource. @@ -102,19 +102,11 @@ func (s *PresenceService) UpsertInstance(ctx context.Context, instance types.Ins return trace.BadParameter("unexpected type %T, expected %T", instance, v1) } - rev := instance.GetRevision() - value, err := utils.FastMarshal(v1) + item, err := generic.FastMarshal(backend.Key(instancePrefix, instance.GetName()), v1) if err != nil { return trace.Errorf("failed to marshal Instance: %v", err) } - item := backend.Item{ - Key: backend.Key(instancePrefix, instance.GetName()), - Value: value, - Expires: instance.Expiry(), - Revision: rev, - } - _, err = s.Backend.Put(ctx, item) return trace.Wrap(err) diff --git a/lib/services/local/inventory_test.go b/lib/services/local/inventory_test.go index dcbd804c76042..5ee75a9a37f69 100644 --- a/lib/services/local/inventory_test.go +++ b/lib/services/local/inventory_test.go @@ -19,6 +19,7 @@ package local import ( "context" "testing" + "time" "github.com/google/uuid" "github.com/jonboulle/clockwork" @@ -30,6 +31,74 @@ import ( "github.com/gravitational/teleport/lib/backend/memory" ) +// TestInstanceEvents verifies that instance creation/deletion events are produced as expected. +func TestInstanceEvents(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + backend, err := memory.New(memory.Config{ + Context: ctx, + }) + require.NoError(t, err) + + defer backend.Close() + + presence := NewPresenceService(backend) + + events := NewEventsService(backend) + + watcher, err := events.NewWatcher(ctx, types.Watch{ + Kinds: []types.WatchKind{{ + Kind: types.KindInstance, + }}, + }) + require.NoError(t, err) + + names := []string{ + "server1", + "server2", + "server3", + "server4", + } + + for _, name := range names { + instance, err := types.NewInstance(uuid.NewString(), types.InstanceSpecV1{ + Hostname: name, + }) + require.NoError(t, err) + + err = presence.UpsertInstance(ctx, instance) + require.NoError(t, err) + } + + timeout := time.After(time.Second * 30) + + select { + case event := <-watcher.Events(): + require.Equal(t, types.OpInit, event.Type) + case <-watcher.Done(): + t.Fatalf("watcher closed unexpectedly") + case <-timeout: + t.Fatalf("timeout waiting for init event") + } + + for _, name := range names { + select { + case event := <-watcher.Events(): + require.Equal(t, types.OpPut, event.Type) + instance, ok := event.Resource.(*types.InstanceV1) + require.True(t, ok, "unexpected resource type: %T", event.Resource) + require.Equal(t, name, instance.GetHostname()) + case <-watcher.Done(): + t.Fatalf("watcher closed unexpectedly") + case <-timeout: + t.Fatalf("timeout waiting for instance %q creation event", name) + } + } +} + // TestInstanceUpsert verifies basic expected behavior of instance creation/update. func TestInstanceUpsert(t *testing.T) { t.Parallel()