Skip to content

Commit

Permalink
Add UI changes for kube exec functionality. (#41466)
Browse files Browse the repository at this point in the history
* Add UI changes for kube exec functionality.

* Remove unneeded commented code.

* Revert imports changes for ResourceActionButton.tsx

* Add type annotation to DocumentKubeExecWrapper props

* split flow of kube exec

* Introduce status 'waiting-for-exec-data'.

* Move KubeExecDataWaitTimeout closer to where it's used.

* Wrap error.

* Change session login.

* Reduce error's scope.

Co-authored-by: rosstimothy <39066650+rosstimothy@users.noreply.github.com>

* Use any instead of interface{}

Co-authored-by: rosstimothy <39066650+rosstimothy@users.noreply.github.com>

* reduce the scope of the error

Co-authored-by: rosstimothy <39066650+rosstimothy@users.noreply.github.com>

* Fix UI tests.

* Validate pod exec request on the backend.

* Add more clear separation between kube CLI access instructions and WebUI exec.

* Update ui snapshots.

* Fix after merge from master.

---------

Co-authored-by: rosstimothy <39066650+rosstimothy@users.noreply.github.com>
  • Loading branch information
AntonAM and rosstimothy authored Jun 7, 2024
1 parent c2d9a05 commit 55de5cf
Show file tree
Hide file tree
Showing 28 changed files with 1,475 additions and 166 deletions.
32 changes: 26 additions & 6 deletions integration/kube_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ import (
"github.com/gravitational/teleport/lib"
"github.com/gravitational/teleport/lib/auth/testauthority"
"github.com/gravitational/teleport/lib/client"
"github.com/gravitational/teleport/lib/defaults"
"github.com/gravitational/teleport/lib/events"
kubeutils "github.com/gravitational/teleport/lib/kube/utils"
"github.com/gravitational/teleport/lib/modules"
Expand Down Expand Up @@ -1675,25 +1676,44 @@ func testKubeExecWeb(t *testing.T, suite *KubeSuite) {

// Login and run the tests.
webPack := helpers.LoginWebClient(t, proxyAddr.String(), testUser, userPassword)
endpoint, err := url.JoinPath("sites", "$site", "kube", kubeClusterName, "connect/ws") // :site/kube/:clusterName/connect/ws
require.NoError(t, err)
endpoint := "sites/$site/kube/exec/ws"

openWebsocketAndReadSession := func(t *testing.T, endpoint string, req web.PodExecRequest) *websocket.Conn {
ws, resp, err := webPack.OpenWebsocket(t, endpoint, req)
termSize := struct {
Term session.TerminalParams `json:"term"`
}{
Term: session.TerminalParams{W: req.Term.W, H: req.Term.H},
}
ws, resp, err := webPack.OpenWebsocket(t, endpoint, termSize)
require.NoError(t, err)
require.NoError(t, resp.Body.Close())

_, data, err := ws.ReadMessage()
data, err := json.Marshal(req)
require.NoError(t, err)

reqEnvelope := &terminal.Envelope{
Version: defaults.WebsocketVersion,
Type: defaults.WebsocketKubeExec,
Payload: string(data),
}

envelopeBytes, err := proto.Marshal(reqEnvelope)
require.NoError(t, err)

err = ws.WriteMessage(websocket.BinaryMessage, envelopeBytes)
require.NoError(t, err)

_, data, err = ws.ReadMessage()
require.NoError(t, err)
require.Equal(t, `{"type":"create_session_response","status":"ok"}`+"\n", string(data))

execSocket := executionWebsocketReader{ws}

// First message: session metadata
envelope, err := execSocket.Read()
sessionEnvelope, err := execSocket.Read()
require.NoError(t, err)
var sessionMetadata sessionMetadataResponse
require.NoError(t, json.Unmarshal([]byte(envelope.Payload), &sessionMetadata))
require.NoError(t, json.Unmarshal([]byte(sessionEnvelope.Payload), &sessionMetadata))

return ws
}
Expand Down
3 changes: 3 additions & 0 deletions lib/defaults/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -723,6 +723,9 @@ const (

// WebsocketLatency provides latency information for a session.
WebsocketLatency = "l"

// WebsocketKubeExec provides latency information for a session.
WebsocketKubeExec = "k"
)

// The following are cryptographic primitives Teleport does not support in
Expand Down
71 changes: 65 additions & 6 deletions lib/web/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"sync/atomic"
"time"

gogoproto "github.com/gogo/protobuf/proto"
"github.com/google/safetext/shsprintf"
"github.com/google/uuid"
"github.com/gorilla/websocket"
Expand Down Expand Up @@ -774,7 +775,7 @@ func (h *Handler) bindDefaultEndpoints() {
h.GET("/webapi/sites/:site/connect/ws", h.WithClusterAuthWebSocket(h.siteNodeConnect)) // connect to an active session (via websocket, with auth over websocket)
h.GET("/webapi/sites/:site/sessions", h.WithClusterAuth(h.clusterActiveAndPendingSessionsGet)) // get list of active and pending sessions

h.GET("/webapi/sites/:site/kube/:clusterName/connect/ws", h.WithClusterAuthWebSocket(h.podConnect)) // connect to a pod with exec (via websocket, with auth over websocket)
h.GET("/webapi/sites/:site/kube/exec/ws", h.WithClusterAuthWebSocket(h.podConnect)) // connect to a pod with exec (via websocket, with auth over websocket)

// Audit events handlers.
h.GET("/webapi/sites/:site/events/search", h.WithClusterAuth(h.clusterSearchEvents)) // search site events
Expand Down Expand Up @@ -3370,6 +3371,11 @@ func (h *Handler) siteNodeConnect(
return nil, nil
}

type podConnectParams struct {
// Term is the initial PTY size.
Term session.TerminalParams `json:"term"`
}

func (h *Handler) podConnect(
w http.ResponseWriter,
r *http.Request,
Expand All @@ -3379,13 +3385,29 @@ func (h *Handler) podConnect(
ws *websocket.Conn,
) (interface{}, error) {
q := r.URL.Query()
params := q.Get("params")
if params == "" {
if q.Get("params") == "" {
return nil, trace.BadParameter("missing params")
}
var params podConnectParams
if err := json.Unmarshal([]byte(q.Get("params")), &params); err != nil {
return nil, trace.Wrap(err)
}

var execReq PodExecRequest
if err := json.Unmarshal([]byte(params), &execReq); err != nil {
execReq, err := readPodExecRequestFromWS(ws)
if err != nil {
if errors.Is(err, io.EOF) || errors.Is(err, net.ErrClosed) || terminal.IsOKWebsocketCloseError(trace.Unwrap(err)) {
return nil, nil
}
var netError net.Error
if errors.As(trace.Unwrap(err), &netError) && netError.Timeout() {
return nil, trace.BadParameter("timed out waiting for pod exec request data on websocket connection")
}

return nil, trace.Wrap(err)
}
execReq.Term = params.Term

if err := execReq.Validate(); err != nil {
return nil, trace.Wrap(err)
}

Expand All @@ -3405,7 +3427,7 @@ func (h *Handler) podConnect(

sess := session.Session{
Kind: types.KubernetesSessionKind,
Login: sctx.GetUser(),
Login: "root",
ClusterName: clusterName,
KubernetesClusterName: execReq.KubeCluster,
Moderated: accessEvaluator.IsModerated(),
Expand All @@ -3414,6 +3436,7 @@ func (h *Handler) podConnect(
LastActive: h.clock.Now().UTC(),
Namespace: apidefaults.Namespace,
Owner: sctx.GetUser(),
Command: execReq.Command,
}

h.log.Debugf("New kube exec request for namespace=%s pod=%s container=%s, sid=%s, websid=%s.",
Expand Down Expand Up @@ -3462,6 +3485,42 @@ func (h *Handler) podConnect(
return nil, nil
}

// KubeExecDataWaitTimeout is how long server would wait for user to send pod exec data (namespace, pod name etc)
// on websocket connection, after user initiated the exec into pod flow.
const KubeExecDataWaitTimeout = defaults.HeadlessLoginTimeout

func readPodExecRequestFromWS(ws *websocket.Conn) (*PodExecRequest, error) {
err := ws.SetReadDeadline(time.Now().Add(KubeExecDataWaitTimeout))
if err != nil {
return nil, trace.Wrap(err, "failed to set read deadline for websocket connection")
}

messageType, bytes, err := ws.ReadMessage()
if err != nil {
return nil, trace.Wrap(err)
}

if err := ws.SetReadDeadline(time.Time{}); err != nil {
return nil, trace.Wrap(err, "failed to set read deadline for websocket connection")
}

if messageType != websocket.BinaryMessage {
return nil, trace.BadParameter("Expected binary message of type websocket.BinaryMessage, got %v", messageType)
}

var envelope terminal.Envelope
if err := gogoproto.Unmarshal(bytes, &envelope); err != nil {
return nil, trace.BadParameter("Failed to parse envelope: %v", err)
}

var req PodExecRequest
if err := json.Unmarshal([]byte(envelope.Payload), &req); err != nil {
return nil, trace.Wrap(err)
}

return &req, nil
}

func (h *Handler) getKubeExecClusterData(netConfig types.ClusterNetworkingConfig) (string, string, error) {
if netConfig.GetProxyListenerMode() == types.ProxyListenerMode_Separate {
return "https://" + h.kubeProxyHostPort(), "", nil
Expand Down
113 changes: 105 additions & 8 deletions lib/web/kube.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ type podHandler struct {
teleportCluster string
configTLSServerName string
configServerAddr string
req PodExecRequest
req *PodExecRequest
sess session.Session
sctx *SessionContext
ws *websocket.Conn
Expand All @@ -75,6 +75,8 @@ type podHandler struct {
// PodExecRequest describes a request to create a web-based terminal
// to exec into a pod.
type PodExecRequest struct {
// KubeCluster specifies what Kubernetes cluster to connect to.
KubeCluster string `json:"kubeCluster"`
// Namespace is the namespace of the target pod
Namespace string `json:"namespace"`
// Pod is the target pod to connect to.
Expand All @@ -83,14 +85,41 @@ type PodExecRequest struct {
Container string `json:"container"`
// Command is the command to run at the target pod.
Command string `json:"command"`
// KubeCluster specifies what Kubernetes cluster to connect to.
KubeCluster string `json:"kube_cluster"`
// IsInteractive specifies whether exec request should have interactive TTY.
IsInteractive bool `json:"is_interactive"`
IsInteractive bool `json:"isInteractive"`
// Term is the initial PTY size.
Term session.TerminalParams `json:"term"`
}

func (r *PodExecRequest) Validate() error {
if r.KubeCluster == "" {
return trace.BadParameter("missing parameter KubeCluster")
}
if r.Namespace == "" {
return trace.BadParameter("missing parameter Namespace")
}
if r.Pod == "" {
return trace.BadParameter("missing parameter Pod")
}
if r.Command == "" {
return trace.BadParameter("missing parameter Command")
}
if len(r.Namespace) > 63 {
return trace.BadParameter("Namespace is too long, maximum length is 63 characters")
}
if len(r.Pod) > 63 {
return trace.BadParameter("Pod is too long, maximum length is 63 characters")
}
if len(r.Container) > 63 {
return trace.BadParameter("Container is too long, maximum length is 63 characters")
}
if len(r.Command) > 10000 {
return trace.BadParameter("Command is too long, maximum length is 10000 characters")
}

return nil
}

// ServeHTTP sends session metadata to web UI to signal beginning of the session, then
// handles Kube exec request and connects it to web based terminal input/output.
func (p *podHandler) ServeHTTP(_ http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -192,7 +221,11 @@ func (p *podHandler) handler(r *http.Request) error {
TLSCert: p.sctx.cfg.Session.GetTLSCert(),
}

stream := terminal.NewStream(ctx, terminal.StreamConfig{WS: p.ws, Logger: p.log})
resizeQueue := newTermSizeQueue(ctx, remotecommand.TerminalSize{
Width: p.req.Term.Winsize().Width,
Height: p.req.Term.Winsize().Height,
})
stream := terminal.NewStream(ctx, terminal.StreamConfig{WS: p.ws, Logger: p.log, Handlers: map[string]terminal.WSHandlerFunc{defaults.WebsocketResize: p.handleResize(resizeQueue)}})

certsReq := clientproto.UserCertsRequest{
PublicKey: userKey.MarshalSSHPublicKey(),
Expand Down Expand Up @@ -267,13 +300,15 @@ func (p *podHandler) handler(r *http.Request) error {
}

streamOpts := remotecommand.StreamOptions{
Stdin: stream,
Stdout: stream,
Tty: p.req.IsInteractive,
Stdin: stream,
Stdout: stream,
Tty: p.req.IsInteractive,
TerminalSizeQueue: resizeQueue,
}
if !p.req.IsInteractive {
streamOpts.Stderr = stderrWriter{stream: stream}
}

if err := wsExec.StreamWithContext(ctx, streamOpts); err != nil {
return trace.Wrap(err, "failed exec command streaming")
}
Expand Down Expand Up @@ -302,6 +337,68 @@ func (p *podHandler) handler(r *http.Request) error {
return nil
}

func (p *podHandler) handleResize(termSizeQueue *termSizeQueue) func(context.Context, terminal.Envelope) {
return func(ctx context.Context, envelope terminal.Envelope) {
var e map[string]any
if err := json.Unmarshal([]byte(envelope.Payload), &e); err != nil {
p.log.Warnf("Failed to parse resize payload: %v", err)
return
}

size, ok := e["size"].(string)
if !ok {
p.log.Errorf("expected size to be of type string, got type %T instead", size)
return
}

params, err := session.UnmarshalTerminalParams(size)
if err != nil {
p.log.Warnf("Failed to retrieve terminal size: %v", err)
return
}

// nil params indicates the channel was closed
if params == nil {
return
}

termSizeQueue.AddSize(remotecommand.TerminalSize{
Width: params.Winsize().Width,
Height: params.Winsize().Height,
})
}
}

type termSizeQueue struct {
incoming chan remotecommand.TerminalSize
ctx context.Context
}

func newTermSizeQueue(ctx context.Context, initialSize remotecommand.TerminalSize) *termSizeQueue {
queue := &termSizeQueue{
incoming: make(chan remotecommand.TerminalSize, 1),
ctx: ctx,
}
queue.AddSize(initialSize)
return queue
}

func (r *termSizeQueue) Next() *remotecommand.TerminalSize {
select {
case <-r.ctx.Done():
return nil
case size := <-r.incoming:
return &size
}
}

func (r *termSizeQueue) AddSize(term remotecommand.TerminalSize) {
select {
case <-r.ctx.Done():
case r.incoming <- term:
}
}

func createKubeRestConfig(serverAddr, tlsServerName string, ca types.CertAuthority, clientCert, rsaKey []byte) (*rest.Config, error) {
var clusterCACerts [][]byte
for _, keyPair := range ca.GetTrustedTLSKeyPairs() {
Expand Down
4 changes: 2 additions & 2 deletions lib/web/terminal/terminal.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (t *WSStream) SetReadDeadline(deadline time.Time) error {
return t.WSConn.SetReadDeadline(deadline)
}

func isOKWebsocketCloseError(err error) bool {
func IsOKWebsocketCloseError(err error) bool {
return websocket.IsCloseError(err,
websocket.CloseAbnormalClosure,
websocket.CloseGoingAway,
Expand All @@ -145,7 +145,7 @@ func (t *WSStream) processMessages(ctx context.Context) {
default:
ty, bytes, err := t.WSConn.ReadMessage()
if err != nil {
if errors.Is(err, io.EOF) || errors.Is(err, net.ErrClosed) || isOKWebsocketCloseError(err) {
if errors.Is(err, io.EOF) || errors.Is(err, net.ErrClosed) || IsOKWebsocketCloseError(err) {
return
}

Expand Down
7 changes: 7 additions & 0 deletions web/packages/build/vite/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,13 @@ export function createViteConfig(
secure: false,
ws: true,
},
// /webapi/sites/:site/kube/exec
[`^\\/v1\\/webapi\\/sites\\/${siteName}\\/kube/exec`]: {
target: `wss://${target}`,
changeOrigin: false,
secure: false,
ws: true,
},
// /webapi/sites/:site/desktopplayback/:sid
'^\\/v1\\/webapi\\/sites\\/(.*?)\\/desktopplayback\\/(.*?)': {
target: `wss://${target}`,
Expand Down
Loading

0 comments on commit 55de5cf

Please sign in to comment.