Skip to content

Commit

Permalink
deflake tests and simplify code
Browse files Browse the repository at this point in the history
  • Loading branch information
tigrato authored and github-actions committed Jun 6, 2024
1 parent 4cc6df1 commit 45e93c7
Showing 1 changed file with 49 additions and 53 deletions.
102 changes: 49 additions & 53 deletions lib/kube/proxy/sess.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ import (
apimachinerytypes "k8s.io/apimachinery/pkg/types"
kubeapitypes "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch"
clientv1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/remotecommand"
watchtools "k8s.io/client-go/tools/watch"
Expand Down Expand Up @@ -518,7 +517,7 @@ func (s *session) checkPresence() error {

// launch waits until the session meets access requirements and then transitions the session
// to a running state.
func (s *session) launch(isEphemeralCont bool) (returnErr error) {
func (s *session) launch(ephemeralContainerStatus *corev1.ContainerStatus) (returnErr error) {
defer func() {
err := s.Close()
if err != nil {
Expand Down Expand Up @@ -640,44 +639,29 @@ func (s *session) launch(isEphemeralCont bool) (returnErr error) {
}

s.io.On()
if streamErr := executor.StreamWithContext(s.streamContext, options); streamErr != nil {
if !isEphemeralCont {
return trace.Wrap(streamErr)
}

// If attaching to the container failed, check if the container
// is terminated. If it is, try to stream the logs. If it's not
// terminated or can't be found return the original error.
clientSet, _, err := s.forwarder.impersonatedKubeClient(&s.sess.authContext, s.req.Header)
if err != nil {
return trace.Wrap(err)
}
podClient := clientSet.CoreV1().Pods(namespace)
// If the container is ephemeral and already terminated, we should
// retrieve the logs and return early.
if ephemeralContainerStatus != nil && ephemeralContainerStatus.State.Terminated != nil {
err := s.retrieveAlreadyStoppedPodLogs(
namespace,
podName,
container,
)
return trace.Wrap(err)
}

pod, err := podClient.Get(s.forwarder.ctx, podName, metav1.GetOptions{})
if err != nil {
return trace.Wrap(err)
}
status := getEphemeralContainerStatusByName(pod, container)
if status == nil {
// the container couldn't be found in the pod, return the
// original command streaming error
if streamErr := executor.StreamWithContext(s.streamContext, options); streamErr != nil {
// If the container isn't ephemeral, return the error.
if ephemeralContainerStatus == nil {
return trace.Wrap(streamErr)
}
if status.State.Terminated != nil {
if err := s.retrieveAlreadyStoppedPodLogs(
podClient,
namespace,
podName,
container,
); err != nil {
return trace.Wrap(err)
}

return nil
}

return trace.Wrap(streamErr)
fmt.Fprintf(s.io, "\r\nwarning: couldn't attach to pod/%s, falling back to streaming logs: %v\r\n", podName, streamErr)
err := s.retrieveAlreadyStoppedPodLogs(
namespace,
podName,
container,
)
return trace.Wrap(err)
}

return nil
Expand Down Expand Up @@ -1052,7 +1036,7 @@ func (s *session) join(p *party, emitJoinEvent bool) error {
}

// createEphemeralContainer creates an ephemeral container and waits for it to start.
func (s *session) createEphemeralContainer() (bool, error) {
func (s *session) createEphemeralContainer() (*corev1.ContainerStatus, error) {
initUser := s.parties[s.initiator]
username := initUser.Ctx.Identity.GetIdentity().Username
namespace := s.params.ByName("podNamespace")
Expand All @@ -1070,9 +1054,9 @@ func (s *session) createEphemeralContainer() (bool, error) {
},
)
if trace.IsNotFound(err) {
return false, nil
return nil, nil
} else if err != nil {
return false, trace.Wrap(err)
return nil, trace.Wrap(err)
}

if err = s.forwarder.cfg.AuthClient.DeleteKubernetesWaitingContainer(
Expand All @@ -1085,16 +1069,12 @@ func (s *session) createEphemeralContainer() (bool, error) {
ContainerName: container,
},
); err != nil {
return false, trace.Wrap(err)
return nil, trace.Wrap(err)
}

s.log.Debugf("Creating ephemeral container %s on pod %s", container, podName)
err = s.patchAndWaitForPodEphemeralContainer(s.forwarder.ctx, &initUser.Ctx, s.req.Header, waitingCont)
if err != nil {
return false, trace.Wrap(err)
}

return true, nil
containerStatus, err := s.patchAndWaitForPodEphemeralContainer(s.forwarder.ctx, &initUser.Ctx, s.req.Header, waitingCont)
return containerStatus, trace.Wrap(err)
}

func (s *session) BroadcastMessage(format string, args ...any) {
Expand Down Expand Up @@ -1399,12 +1379,17 @@ func (s *session) getSessionMetadata() apievents.SessionMetadata {

// patchPodWithEphemeralContainer creates an ephemeral container and waits
// for it to start.
func (s *session) patchAndWaitForPodEphemeralContainer(ctx context.Context, authCtx *authContext, headers http.Header, waitingCont *kubewaitingcontainerpb.KubernetesWaitingContainer) error {
func (s *session) patchAndWaitForPodEphemeralContainer(
ctx context.Context,
authCtx *authContext,
headers http.Header,
waitingCont *kubewaitingcontainerpb.KubernetesWaitingContainer,
) (containerStatus *corev1.ContainerStatus, err error) {
fmt.Fprintf(s.io, "\r\nCreating ephemeral container %s in pod %s/%s\r\n", waitingCont.Spec.ContainerName, waitingCont.Spec.Namespace, waitingCont.Spec.PodName)

clientSet, _, err := s.forwarder.impersonatedKubeClient(authCtx, headers)
if err != nil {
return trace.Wrap(err)
return nil, trace.Wrap(err)
}
podClient := clientSet.CoreV1().Pods(authCtx.kubeResource.Namespace)
result, err := podClient.Patch(ctx,
Expand All @@ -1414,7 +1399,7 @@ func (s *session) patchAndWaitForPodEphemeralContainer(ctx context.Context, auth
metav1.PatchOptions{},
"ephemeralcontainers")
if err != nil {
return trace.Wrap(err)
return nil, trace.Wrap(err)
}

fmt.Fprintf(s.io, "Pod %s/%s successfully patched. Waiting for container to become ready.\r\n",
Expand Down Expand Up @@ -1452,28 +1437,39 @@ func (s *session) patchAndWaitForPodEphemeralContainer(ctx context.Context, auth
return false, nil
}
if s.State.Running != nil || s.State.Terminated != nil {
containerStatus = s
return true, nil
}
return false, nil
})
if err != nil {
return trace.Wrap(err)
return nil, trace.Wrap(err)
}

fmt.Fprintf(s.io, "Ephemeral container %s is ready.\r\n", waitingCont.Spec.ContainerName)

return nil
return containerStatus, nil
}

// retrieveAlreadyStoppedPodLogs retrieves the logs of a stopped pod and writes them to the session's io writer.
func (s *session) retrieveAlreadyStoppedPodLogs(podClient clientv1.PodInterface, namespace, podName, container string) error {
func (s *session) retrieveAlreadyStoppedPodLogs(namespace, podName, container string) error {
// If attaching to the container failed, check if the container
// is terminated. If it is, try to stream the logs. If it's not
// terminated or can't be found return the original error.
clientSet, _, err := s.forwarder.impersonatedKubeClient(&s.sess.authContext, s.req.Header)
if err != nil {
return trace.Wrap(err)
}
podClient := clientSet.CoreV1().Pods(namespace)

fmt.Fprintf(s.io, "Failed to attach to the container, attempting to stream logs instead...\r\n")
req := podClient.GetLogs(podName, &corev1.PodLogOptions{Container: container})
r, err := req.Stream(s.streamContext)
if err != nil {
return trace.Wrap(err)
}
if _, err := io.Copy(s.io, r); err != nil {
_ = r.Close()
return trace.Wrap(err)
}
return trace.Wrap(r.Close())
Expand Down

0 comments on commit 45e93c7

Please sign in to comment.