diff --git a/lib/kube/proxy/sess.go b/lib/kube/proxy/sess.go index 27f320dec31f3..2bbb95ffa4a6f 100644 --- a/lib/kube/proxy/sess.go +++ b/lib/kube/proxy/sess.go @@ -40,7 +40,6 @@ import ( apimachinerytypes "k8s.io/apimachinery/pkg/types" kubeapitypes "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/watch" - clientv1 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/remotecommand" watchtools "k8s.io/client-go/tools/watch" @@ -518,7 +517,7 @@ func (s *session) checkPresence() error { // launch waits until the session meets access requirements and then transitions the session // to a running state. -func (s *session) launch(isEphemeralCont bool) (returnErr error) { +func (s *session) launch(ephemeralContainerStatus *corev1.ContainerStatus) (returnErr error) { defer func() { err := s.Close() if err != nil { @@ -640,44 +639,29 @@ func (s *session) launch(isEphemeralCont bool) (returnErr error) { } s.io.On() - if streamErr := executor.StreamWithContext(s.streamContext, options); streamErr != nil { - if !isEphemeralCont { - return trace.Wrap(streamErr) - } - - // If attaching to the container failed, check if the container - // is terminated. If it is, try to stream the logs. If it's not - // terminated or can't be found return the original error. - clientSet, _, err := s.forwarder.impersonatedKubeClient(&s.sess.authContext, s.req.Header) - if err != nil { - return trace.Wrap(err) - } - podClient := clientSet.CoreV1().Pods(namespace) + // If the container is ephemeral and already terminated, we should + // retrieve the logs and return early. + if ephemeralContainerStatus != nil && ephemeralContainerStatus.State.Terminated != nil { + err := s.retrieveAlreadyStoppedPodLogs( + namespace, + podName, + container, + ) + return trace.Wrap(err) + } - pod, err := podClient.Get(s.forwarder.ctx, podName, metav1.GetOptions{}) - if err != nil { - return trace.Wrap(err) - } - status := getEphemeralContainerStatusByName(pod, container) - if status == nil { - // the container couldn't be found in the pod, return the - // original command streaming error + if streamErr := executor.StreamWithContext(s.streamContext, options); streamErr != nil { + // If the container isn't ephemeral, return the error. + if ephemeralContainerStatus == nil { return trace.Wrap(streamErr) } - if status.State.Terminated != nil { - if err := s.retrieveAlreadyStoppedPodLogs( - podClient, - namespace, - podName, - container, - ); err != nil { - return trace.Wrap(err) - } - - return nil - } - - return trace.Wrap(streamErr) + fmt.Fprintf(s.io, "\r\nwarning: couldn't attach to pod/%s, falling back to streaming logs: %v\r\n", podName, streamErr) + err := s.retrieveAlreadyStoppedPodLogs( + namespace, + podName, + container, + ) + return trace.Wrap(err) } return nil @@ -1052,7 +1036,7 @@ func (s *session) join(p *party, emitJoinEvent bool) error { } // createEphemeralContainer creates an ephemeral container and waits for it to start. -func (s *session) createEphemeralContainer() (bool, error) { +func (s *session) createEphemeralContainer() (*corev1.ContainerStatus, error) { initUser := s.parties[s.initiator] username := initUser.Ctx.Identity.GetIdentity().Username namespace := s.params.ByName("podNamespace") @@ -1070,9 +1054,9 @@ func (s *session) createEphemeralContainer() (bool, error) { }, ) if trace.IsNotFound(err) { - return false, nil + return nil, nil } else if err != nil { - return false, trace.Wrap(err) + return nil, trace.Wrap(err) } if err = s.forwarder.cfg.AuthClient.DeleteKubernetesWaitingContainer( @@ -1085,16 +1069,12 @@ func (s *session) createEphemeralContainer() (bool, error) { ContainerName: container, }, ); err != nil { - return false, trace.Wrap(err) + return nil, trace.Wrap(err) } s.log.Debugf("Creating ephemeral container %s on pod %s", container, podName) - err = s.patchAndWaitForPodEphemeralContainer(s.forwarder.ctx, &initUser.Ctx, s.req.Header, waitingCont) - if err != nil { - return false, trace.Wrap(err) - } - - return true, nil + containerStatus, err := s.patchAndWaitForPodEphemeralContainer(s.forwarder.ctx, &initUser.Ctx, s.req.Header, waitingCont) + return containerStatus, trace.Wrap(err) } func (s *session) BroadcastMessage(format string, args ...any) { @@ -1399,12 +1379,17 @@ func (s *session) getSessionMetadata() apievents.SessionMetadata { // patchPodWithEphemeralContainer creates an ephemeral container and waits // for it to start. -func (s *session) patchAndWaitForPodEphemeralContainer(ctx context.Context, authCtx *authContext, headers http.Header, waitingCont *kubewaitingcontainerpb.KubernetesWaitingContainer) error { +func (s *session) patchAndWaitForPodEphemeralContainer( + ctx context.Context, + authCtx *authContext, + headers http.Header, + waitingCont *kubewaitingcontainerpb.KubernetesWaitingContainer, +) (containerStatus *corev1.ContainerStatus, err error) { fmt.Fprintf(s.io, "\r\nCreating ephemeral container %s in pod %s/%s\r\n", waitingCont.Spec.ContainerName, waitingCont.Spec.Namespace, waitingCont.Spec.PodName) clientSet, _, err := s.forwarder.impersonatedKubeClient(authCtx, headers) if err != nil { - return trace.Wrap(err) + return nil, trace.Wrap(err) } podClient := clientSet.CoreV1().Pods(authCtx.kubeResource.Namespace) result, err := podClient.Patch(ctx, @@ -1414,7 +1399,7 @@ func (s *session) patchAndWaitForPodEphemeralContainer(ctx context.Context, auth metav1.PatchOptions{}, "ephemeralcontainers") if err != nil { - return trace.Wrap(err) + return nil, trace.Wrap(err) } fmt.Fprintf(s.io, "Pod %s/%s successfully patched. Waiting for container to become ready.\r\n", @@ -1452,21 +1437,31 @@ func (s *session) patchAndWaitForPodEphemeralContainer(ctx context.Context, auth return false, nil } if s.State.Running != nil || s.State.Terminated != nil { + containerStatus = s return true, nil } return false, nil }) if err != nil { - return trace.Wrap(err) + return nil, trace.Wrap(err) } fmt.Fprintf(s.io, "Ephemeral container %s is ready.\r\n", waitingCont.Spec.ContainerName) - return nil + return containerStatus, nil } // retrieveAlreadyStoppedPodLogs retrieves the logs of a stopped pod and writes them to the session's io writer. -func (s *session) retrieveAlreadyStoppedPodLogs(podClient clientv1.PodInterface, namespace, podName, container string) error { +func (s *session) retrieveAlreadyStoppedPodLogs(namespace, podName, container string) error { + // If attaching to the container failed, check if the container + // is terminated. If it is, try to stream the logs. If it's not + // terminated or can't be found return the original error. + clientSet, _, err := s.forwarder.impersonatedKubeClient(&s.sess.authContext, s.req.Header) + if err != nil { + return trace.Wrap(err) + } + podClient := clientSet.CoreV1().Pods(namespace) + fmt.Fprintf(s.io, "Failed to attach to the container, attempting to stream logs instead...\r\n") req := podClient.GetLogs(podName, &corev1.PodLogOptions{Container: container}) r, err := req.Stream(s.streamContext) @@ -1474,6 +1469,7 @@ func (s *session) retrieveAlreadyStoppedPodLogs(podClient clientv1.PodInterface, return trace.Wrap(err) } if _, err := io.Copy(s.io, r); err != nil { + _ = r.Close() return trace.Wrap(err) } return trace.Wrap(r.Close())