Skip to content

Commit

Permalink
instance event plumbing (#43776) (#44287)
Browse files Browse the repository at this point in the history
  • Loading branch information
fspmarshall authored Jul 23, 2024
1 parent d01626e commit 84b27af
Show file tree
Hide file tree
Showing 4 changed files with 179 additions and 17 deletions.
28 changes: 28 additions & 0 deletions lib/services/local/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/gravitational/teleport/lib/backend"
"github.com/gravitational/teleport/lib/defaults"
"github.com/gravitational/teleport/lib/services"
"github.com/gravitational/teleport/lib/services/local/generic"
)

// EventsService implements service to watch for events
Expand Down Expand Up @@ -192,6 +193,8 @@ func (e *EventsService) NewWatcher(ctx context.Context, watch types.Watch) (type
parser = newAccessListReviewParser()
case types.KindKubeWaitingContainer:
parser = newKubeWaitingContainerParser()
case types.KindInstance:
parser = newInstanceParser()
default:
if watch.AllowPartialSuccess {
continue
Expand Down Expand Up @@ -1941,6 +1944,31 @@ func (p *kubeWaitingContainerParser) parse(event backend.Event) (types.Resource,
}
}

func newInstanceParser() *instanceParser {
return &instanceParser{
baseParser: newBaseParser(backend.Key(instancePrefix)),
}
}

type instanceParser struct {
baseParser
}

func (p *instanceParser) parse(event backend.Event) (types.Resource, error) {
switch event.Type {
case types.OpDelete:
return resourceHeader(event, types.KindInstance, types.V1, 0)
case types.OpPut:
instance, err := generic.FastUnmarshal[*types.InstanceV1](event.Item)
if err != nil {
return nil, trace.Wrap(err)
}
return instance, nil
default:
return nil, trace.BadParameter("event %v is not supported", event.Type)
}
}

func resourceHeader(event backend.Event, kind, version string, offset int) (types.Resource, error) {
name, err := base(event.Item.Key, offset)
if err != nil {
Expand Down
73 changes: 73 additions & 0 deletions lib/services/local/generic/helpers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Teleport
* Copyright (C) 2023 Gravitational, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

package generic

import (
"time"

"github.com/gravitational/teleport/lib/backend"
"github.com/gravitational/teleport/lib/utils"
)

// UnmarshalableResource represents a resource that can have all the necessary fields from backend.Item
// set in a generic context.
type UnmarshalableResource interface {
SetExpiry(time.Time)
SetRevision(string)
}

// FastUnmarshal is a generic helper used to unmarshal a resoruce from a backend.Item and
// set the Expiry and Revision fields. It isn't compatible with the standard Unmarshal function
// signature used elsewhere and therefore may not be the best choice for all use cases, but it
// has the benefit of being simpler to use and not requiring the caller to undergo the revision/expiry
// ceremony at each call site.
func FastUnmarshal[T UnmarshalableResource](item backend.Item) (T, error) {
var r T
if err := utils.FastUnmarshal(item.Value, &r); err != nil {
return r, err
}

r.SetExpiry(item.Expires)
r.SetRevision(item.Revision)
return r, nil
}

type MarshalableResource interface {
Expiry() time.Time
GetRevision() string
}

// FastMarshal is a generic helper used to marshal a resource to a backend.Item and
// set the Expiry and Revision fields. It isn't compatible with the standard Marshal function
// signature used elsewhere and therefore may not be the best choice for all use cases, but it
// has the benefit of being simpler to use and not requiring the caller to undergo the revision/expiry
// ceremony at each call site.
func FastMarshal[T MarshalableResource](key []byte, r T) (backend.Item, error) {
value, err := utils.FastMarshal(r)
if err != nil {
return backend.Item{}, err
}

return backend.Item{
Key: key,
Value: value,
Expires: r.Expiry(),
Revision: r.GetRevision(),
}, nil
}
26 changes: 9 additions & 17 deletions lib/services/local/inventory.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"github.com/gravitational/teleport/api/internalutils/stream"
"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/lib/backend"
"github.com/gravitational/teleport/lib/utils"
"github.com/gravitational/teleport/lib/services/local/generic"
)

// GetInstances iterates all teleport instances.
Expand All @@ -49,19 +49,19 @@ func (s *PresenceService) GetInstances(ctx context.Context, req types.InstanceFi
endKey := backend.RangeEnd(startKey)
items := backend.StreamRange(ctx, s, startKey, endKey, pageSize)
return stream.FilterMap(items, func(item backend.Item) (types.Instance, bool) {
var instance types.InstanceV1
if err := utils.FastUnmarshal(item.Value, &instance); err != nil {
instance, err := generic.FastUnmarshal[*types.InstanceV1](item)
if err != nil {
s.log.Warnf("Skipping instance at %s, failed to unmarshal: %v", item.Key, err)
return nil, false
}
if err := instance.CheckAndSetDefaults(); err != nil {
s.log.Warnf("Skipping instance at %s: %v", item.Key, err)
return nil, false
}
if !req.Match(&instance) {
if !req.Match(instance) {
return nil, false
}
return &instance, true
return instance, true
})
}

Expand All @@ -72,16 +72,16 @@ func (s *PresenceService) getInstance(ctx context.Context, serverID string) (typ
return nil, trace.Wrap(err)
}

var instance types.InstanceV1
if err := utils.FastUnmarshal(item.Value, &instance); err != nil {
instance, err := generic.FastUnmarshal[*types.InstanceV1](*item)
if err != nil {
return nil, trace.BadParameter("failed to unmarshal instance %q: %v", serverID, err)
}

if err := instance.CheckAndSetDefaults(); err != nil {
return nil, trace.BadParameter("instance %q appears malformed: %v", serverID, err)
}

return &instance, nil
return instance, nil
}

// UpsertInstance creates or updates an instance resource.
Expand All @@ -102,19 +102,11 @@ func (s *PresenceService) UpsertInstance(ctx context.Context, instance types.Ins
return trace.BadParameter("unexpected type %T, expected %T", instance, v1)
}

rev := instance.GetRevision()
value, err := utils.FastMarshal(v1)
item, err := generic.FastMarshal(backend.Key(instancePrefix, instance.GetName()), v1)
if err != nil {
return trace.Errorf("failed to marshal Instance: %v", err)
}

item := backend.Item{
Key: backend.Key(instancePrefix, instance.GetName()),
Value: value,
Expires: instance.Expiry(),
Revision: rev,
}

_, err = s.Backend.Put(ctx, item)

return trace.Wrap(err)
Expand Down
69 changes: 69 additions & 0 deletions lib/services/local/inventory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package local
import (
"context"
"testing"
"time"

"github.com/google/uuid"
"github.com/jonboulle/clockwork"
Expand All @@ -30,6 +31,74 @@ import (
"github.com/gravitational/teleport/lib/backend/memory"
)

// TestInstanceEvents verifies that instance creation/deletion events are produced as expected.
func TestInstanceEvents(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

backend, err := memory.New(memory.Config{
Context: ctx,
})
require.NoError(t, err)

defer backend.Close()

presence := NewPresenceService(backend)

events := NewEventsService(backend)

watcher, err := events.NewWatcher(ctx, types.Watch{
Kinds: []types.WatchKind{{
Kind: types.KindInstance,
}},
})
require.NoError(t, err)

names := []string{
"server1",
"server2",
"server3",
"server4",
}

for _, name := range names {
instance, err := types.NewInstance(uuid.NewString(), types.InstanceSpecV1{
Hostname: name,
})
require.NoError(t, err)

err = presence.UpsertInstance(ctx, instance)
require.NoError(t, err)
}

timeout := time.After(time.Second * 30)

select {
case event := <-watcher.Events():
require.Equal(t, types.OpInit, event.Type)
case <-watcher.Done():
t.Fatalf("watcher closed unexpectedly")
case <-timeout:
t.Fatalf("timeout waiting for init event")
}

for _, name := range names {
select {
case event := <-watcher.Events():
require.Equal(t, types.OpPut, event.Type)
instance, ok := event.Resource.(*types.InstanceV1)
require.True(t, ok, "unexpected resource type: %T", event.Resource)
require.Equal(t, name, instance.GetHostname())
case <-watcher.Done():
t.Fatalf("watcher closed unexpectedly")
case <-timeout:
t.Fatalf("timeout waiting for instance %q creation event", name)
}
}
}

// TestInstanceUpsert verifies basic expected behavior of instance creation/update.
func TestInstanceUpsert(t *testing.T) {
t.Parallel()
Expand Down

0 comments on commit 84b27af

Please sign in to comment.