Cache remote clients in Connect (#38201)
* Add remote client cache

* Add an integration test

* Close all clients when stopping the service

* Move RemoteClientCache to the place where it is used

* Do not check client cert in `Get`

* Fix code style issues

* Prevent potential race condition when removing a cached client

* Test concurrent calls to `Get`

* Add TODO

* `remoteclientcache` -> `clientcache`

* Reduce the `err` scope

* Move `Config` closer to `New` and docs

* Fix lint

* Improve logging and error handling

* Add missing comments

* `Close` -> `Clear`

* Improve the test

* Remove mentions of the "remote" client

* Pass `cfg` directly to `Cache`

* `InvalidateForRootCluster` -> `ClearForRootCluster`

* Add docs for the interface

* `ClearForRootCluster` -> `ClearForRoot`

* Add config validation

* Log multiple fields at once

* Improve setting logger

* Use cached remote clients in Connect (#38202)

* Replace all simple `c.clusterClient.ConnectToProxy()` calls

* Use cached proxy client to create gateways

* Use cached proxy client to assume roles

* Invalidate clients when logging in and out

* Gracefully handle expired cert error returned by the server

* Drop `GetRootClusterURI` in headless auth watcher since URIs are already root URIs

* Simplify error check

* Make `auth.ClientI` parameter naming more consistent, use `root` prefix when needed

* Reduce error scope where possible

* Clear cached clients before passwordless login

* Use `fakeClientCache` without pointers

* Move separate `proxyClient` parameter to `CreateGatewayParams` in the gateways code

* Replace checking error string with `client.ErrClientCredentialsHaveExpired`
gzdunek authored Feb 28, 2024
1 parent 78b47dc commit 39f9951
Showing 20 changed files with 626 additions and 547 deletions.
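
For orientation, the client cache introduced here can be summarized by the following sketch. The method names (Get, ClearForRoot, Clear) and the daemon entry points GetCachedClient/ClearCachedClientsForRoot come from the commit message and the diffs below; the interface shape, parameter types, and import paths are assumptions rather than the exact code from this commit.

// A minimal sketch of the clientcache surface, assuming the uri.ResourceURI
// and client.ProxyClient types used elsewhere in teleterm; not the exact
// implementation added by this commit.
package clientcache

import (
    "context"

    "github.com/gravitational/teleport/lib/client"
    "github.com/gravitational/teleport/lib/teleterm/api/uri"
)

// Cache keeps one proxy client per root cluster so handlers can reuse an
// open connection instead of dialing the proxy on every request.
type Cache interface {
    // Get returns the cached client for the given cluster URI, dialing the
    // proxy and storing the result if none exists yet. Concurrent calls for
    // the same URI share a single connection attempt and receive the same
    // client.
    Get(ctx context.Context, clusterURI uri.ResourceURI) (*client.ProxyClient, error)
    // ClearForRoot closes and removes all clients belonging to the given
    // root cluster, e.g. right before logging in or out.
    ClearForRoot(rootClusterURI uri.ResourceURI) error
    // Clear closes and removes every cached client; called when the daemon
    // service stops.
    Clear() error
}

In the handler diffs below, cluster methods such as GetApps and GetDatabases then receive proxyClient.CurrentCluster() from a client obtained through DaemonService.GetCachedClient instead of dialing the proxy themselves.
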
80 changes: 80 additions & 0 deletions integration/teleterm_test.go
@@ -34,6 +34,7 @@ import (
"github.com/jonboulle/clockwork"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

@@ -116,6 +117,12 @@ func TestTeleterm(t *testing.T) {
t.Parallel()
testDeleteConnectMyComputerNode(t, pack)
})

t.Run("TestClientCache", func(t *testing.T) {
t.Parallel()

testClientCache(t, pack, creds)
})
}

func testAddingRootCluster(t *testing.T, pack *dbhelpers.DatabasePack, creds *helpers.UserCreds) {
@@ -335,6 +342,79 @@ func testHeadlessWatcher(t *testing.T, pack *dbhelpers.DatabasePack, creds *help
)
}

func testClientCache(t *testing.T, pack *dbhelpers.DatabasePack, creds *helpers.UserCreds) {
ctx := context.Background()

tc := mustLogin(t, pack.Root.User.GetName(), pack, creds)

storageFakeClock := clockwork.NewFakeClockAt(time.Now())

storage, err := clusters.NewStorage(clusters.Config{
Dir: tc.KeysDir,
Clock: storageFakeClock,
InsecureSkipVerify: tc.InsecureSkipVerify,
})
require.NoError(t, err)

cluster, _, err := storage.Add(ctx, tc.WebProxyAddr)
require.NoError(t, err)

daemonService, err := daemon.New(daemon.Config{
Storage: storage,
CreateTshdEventsClientCredsFunc: func() (grpc.DialOption, error) {
return grpc.WithTransportCredentials(insecure.NewCredentials()), nil
},
KubeconfigsDir: t.TempDir(),
AgentsDir: t.TempDir(),
})
require.NoError(t, err)
t.Cleanup(func() {
daemonService.Stop()
})

// Check if parallel calls trying to get a client will return the same one.
eg, egCtx := errgroup.WithContext(ctx)
blocker := make(chan struct{})
const concurrentCalls = 5
concurrentCallsForClient := make([]*client.ProxyClient, concurrentCalls)
for i := range concurrentCallsForClient {
client := &concurrentCallsForClient[i]
eg.Go(func() error {
<-blocker
c, err := daemonService.GetCachedClient(egCtx, cluster.URI)
*client = c
return err
})
}
// unblock the operation which is still in progress
close(blocker)
require.NoError(t, eg.Wait())
require.Subset(t, concurrentCallsForClient[:1], concurrentCallsForClient[1:])

// Since we have a client in the cache, it should be returned.
secondCallForClient, err := daemonService.GetCachedClient(ctx, cluster.URI)
require.NoError(t, err)
require.Equal(t, concurrentCallsForClient[0], secondCallForClient)

// Let's remove the client from the cache.
// The call to GetCachedClient will
// connect to proxy and return a new client.
err = daemonService.ClearCachedClientsForRoot(cluster.URI)
require.NoError(t, err)
thirdCallForClient, err := daemonService.GetCachedClient(ctx, cluster.URI)
require.NoError(t, err)
require.NotEqual(t, secondCallForClient, thirdCallForClient)

// After closing the client (from our or a remote side)
// it will be removed from the cache.
// The call to GetCachedClient will connect to proxy and return a new client.
err = thirdCallForClient.Close()
require.NoError(t, err)
fourthCallForClient, err := daemonService.GetCachedClient(ctx, cluster.URI)
require.NoError(t, err)
require.NotEqual(t, thirdCallForClient, fourthCallForClient)
}

func testCreateConnectMyComputerRole(t *testing.T, pack *dbhelpers.DatabasePack) {
systemUser, err := user.Current()
require.NoError(t, err)
7 changes: 6 additions & 1 deletion lib/teleterm/apiserver/handler/handler_apps.go
@@ -34,7 +34,12 @@ func (s *Handler) GetApps(ctx context.Context, req *api.GetAppsRequest) (*api.Ge
return nil, trace.Wrap(err)
}

resp, err := cluster.GetApps(ctx, req)
proxyClient, err := s.DaemonService.GetCachedClient(ctx, cluster.URI)
if err != nil {
return nil, trace.Wrap(err)
}

resp, err := cluster.GetApps(ctx, proxyClient.CurrentCluster(), req)
if err != nil {
return nil, trace.Wrap(err)
}
8 changes: 8 additions & 0 deletions lib/teleterm/apiserver/handler/handler_auth.go
@@ -37,6 +37,10 @@ func (s *Handler) Login(ctx context.Context, req *api.LoginRequest) (*api.EmptyR
// added by daemon.Service.ResolveClusterURI.
clusterClient.MFAPromptConstructor = nil

if err = s.DaemonService.ClearCachedClientsForRoot(cluster.URI); err != nil {
return nil, trace.Wrap(err)
}

if req.Params == nil {
return nil, trace.BadParameter("missing login parameters")
}
@@ -84,6 +88,10 @@ func (s *Handler) LoginPasswordless(stream api.TerminalService_LoginPasswordless
// daemon.Service.ResolveClusterURI.
clusterClient.MFAPromptConstructor = nil

if err := s.DaemonService.ClearCachedClientsForRoot(cluster.URI); err != nil {
return trace.Wrap(err)
}

// Start the prompt flow.
if err := cluster.PasswordlessLogin(stream.Context(), stream); err != nil {
return trace.Wrap(err)
16 changes: 13 additions & 3 deletions lib/teleterm/apiserver/handler/handler_databases.go
@@ -28,14 +28,19 @@ import (
"github.com/gravitational/teleport/lib/teleterm/clusters"
)

// GetDatabases gets databses with filters and returns paginated results
// GetDatabases gets databases with filters and returns paginated results
func (s *Handler) GetDatabases(ctx context.Context, req *api.GetDatabasesRequest) (*api.GetDatabasesResponse, error) {
cluster, _, err := s.DaemonService.ResolveCluster(req.ClusterUri)
if err != nil {
return nil, trace.Wrap(err)
}

resp, err := cluster.GetDatabases(ctx, req)
proxyClient, err := s.DaemonService.GetCachedClient(ctx, cluster.URI)
if err != nil {
return nil, trace.Wrap(err)
}

resp, err := cluster.GetDatabases(ctx, proxyClient.CurrentCluster(), req)
if err != nil {
return nil, trace.Wrap(err)
}
@@ -63,7 +68,12 @@ func (s *Handler) ListDatabaseUsers(ctx context.Context, req *api.ListDatabaseUs
return nil, trace.Wrap(err)
}

dbUsers, err := cluster.GetAllowedDatabaseUsers(ctx, req.DbUri)
proxyClient, err := s.DaemonService.GetCachedClient(ctx, cluster.URI)
if err != nil {
return nil, trace.Wrap(err)
}

dbUsers, err := cluster.GetAllowedDatabaseUsers(ctx, proxyClient.CurrentCluster(), req.DbUri)
if err != nil {
return nil, trace.Wrap(err)
}
11 changes: 11 additions & 0 deletions lib/teleterm/apiserver/middleware.go
@@ -20,11 +20,14 @@ package apiserver

import (
"context"
"errors"

"github.com/gravitational/trace"
"github.com/gravitational/trace/trail"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"

"github.com/gravitational/teleport/api/client"
)

// withErrorHandling is gRPC middleware that maps internal errors to proper gRPC error codes
@@ -38,6 +41,14 @@ func withErrorHandling(log logrus.FieldLogger) grpc.UnaryServerInterceptor {
resp, err := handler(ctx, req)
if err != nil {
log.WithError(err).Error("Request failed.")
// A stop gap solution that allows us to show a relogin modal when we
// receive an error from the server saying that the cert is expired.
// Read more: https://github.com/gravitational/teleport/pull/38202#discussion_r1497181659
// TODO(gzdunek): fix when addressing https://github.com/gravitational/teleport/issues/32550
if errors.Is(err, client.ErrClientCredentialsHaveExpired) {
return resp, trail.ToGRPC(err)
}

// do not return a full error stack on access denied errors
if trace.IsAccessDenied(err) {
return resp, trail.ToGRPC(trace.AccessDenied("access denied"))
37 changes: 5 additions & 32 deletions lib/teleterm/clusters/cluster.go
@@ -83,7 +83,7 @@ func (c *Cluster) Connected() bool {
// GetWithDetails makes requests to the auth server to return details of the current
// Cluster that cannot be found on the disk only, including details about the user
// and enabled enterprise features. This method requires a valid cert.
func (c *Cluster) GetWithDetails(ctx context.Context) (*ClusterWithDetails, error) {
func (c *Cluster) GetWithDetails(ctx context.Context, authClient auth.ClientI) (*ClusterWithDetails, error) {
var (
authPingResponse proto.PingResponse
caps *types.AccessCapabilities
@@ -97,20 +97,8 @@ func (c *Cluster) GetWithDetails(ctx context.Context) (*ClusterWithDetails, erro
return nil, trace.Wrap(err)
}

//TODO(gzdunek): These calls should be done in parallel.
err = AddMetadataToRetryableError(ctx, func() error {
//nolint:staticcheck // SA1019. TODO(tross) update to use ClusterClient
proxyClient, err := c.clusterClient.ConnectToProxy(ctx)
if err != nil {
return trace.Wrap(err)
}
defer proxyClient.Close()

authClient, err := proxyClient.ConnectToCluster(ctx, c.clusterClient.SiteName)
if err != nil {
return trace.Wrap(err)
}
defer authClient.Close()

authPingResponse, err = authClient.Ping(ctx)
if err != nil {
return trace.Wrap(err)
@@ -218,12 +206,10 @@ func (c *Cluster) GetRoles(ctx context.Context) ([]*types.Role, error) {
}

// GetRequestableRoles returns the requestable roles for the currently logged-in user
func (c *Cluster) GetRequestableRoles(ctx context.Context, req *api.GetRequestableRolesRequest) (*types.AccessCapabilities, error) {
func (c *Cluster) GetRequestableRoles(ctx context.Context, req *api.GetRequestableRolesRequest, authClient auth.ClientI) (*types.AccessCapabilities, error) {
var (
authClient auth.ClientI
proxyClient *client.ProxyClient
err error
response *types.AccessCapabilities
err error
response *types.AccessCapabilities
)

resourceIds := make([]types.ResourceID, 0, len(req.GetResourceIds()))
@@ -237,19 +223,6 @@ func (c *Cluster) GetRequestableRoles(ctx context.Context, req *api.GetRequestab
}

err = AddMetadataToRetryableError(ctx, func() error {
//nolint:staticcheck // SA1019. TODO(tross) update to use ClusterClient
proxyClient, err = c.clusterClient.ConnectToProxy(ctx)
if err != nil {
return trace.Wrap(err)
}
defer proxyClient.Close()

authClient, err = proxyClient.ConnectToCluster(ctx, c.clusterClient.SiteName)
if err != nil {
return trace.Wrap(err)
}
defer authClient.Close()

response, err = authClient.GetAccessCapabilities(ctx, types.AccessCapabilitiesRequest{
ResourceIDs: resourceIds,
RequestableRoles: true,
(14 additional changed files not shown)
