diff --git a/azure-ipam/go.mod b/azure-ipam/go.mod index 94762d0c94..e697aa8ad5 100644 --- a/azure-ipam/go.mod +++ b/azure-ipam/go.mod @@ -3,7 +3,7 @@ module github.com/Azure/azure-container-networking/azure-ipam go 1.21 require ( - github.com/Azure/azure-container-networking v1.5.18 + github.com/Azure/azure-container-networking v1.5.19 github.com/containernetworking/cni v1.1.2 github.com/containernetworking/plugins v1.4.0 github.com/pkg/errors v0.9.1 diff --git a/azure-ipam/go.sum b/azure-ipam/go.sum index a3db83ef10..0167d9020c 100644 --- a/azure-ipam/go.sum +++ b/azure-ipam/go.sum @@ -2,8 +2,8 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMT code.cloudfoundry.org/clock v0.0.0-20180518195852-02e53af36e6c/go.mod h1:QD9Lzhd/ux6eNQVUDVRJX/RKTigpewimNYBi7ivZKY8= code.cloudfoundry.org/clock v1.1.0 h1:XLzC6W3Ah/Y7ht1rmZ6+QfPdt1iGWEAAtIZXgiaj57c= code.cloudfoundry.org/clock v1.1.0/go.mod h1:yA3fxddT9RINQL2XHS7PS+OXxKCGhfrZmlNUCIM6AKo= -github.com/Azure/azure-container-networking v1.5.18 h1:PBFO1ON4yvgw1NKmzvpUuSAVTqzK1wtCmzdyZ3kE1ic= -github.com/Azure/azure-container-networking v1.5.18/go.mod h1:AJ5ZTZ0UBNBBIS3DAr883DrdHnCJRR3vBpPMDSCZ+nA= +github.com/Azure/azure-container-networking v1.5.19 h1:vdSUU0EjyUu5ePdJyiBT2bOUH24hFI6sB6eWocvfXr4= +github.com/Azure/azure-container-networking v1.5.19/go.mod h1:T0wq4BcGMX+S0ue3grruyej/C7GbhjN2B2ciHxqPNeg= github.com/Azure/azure-sdk-for-go/sdk/azcore v1.9.1 h1:lGlwhPtrX6EVml1hO0ivjkUxsSyl4dsiw9qcA1k/3IQ= github.com/Azure/azure-sdk-for-go/sdk/azcore v1.9.1/go.mod h1:RKUqNu35KJYcVG/fqTRqmuXJZYNhYkBrnC/hX7yGbTA= github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.4.0 h1:BMAjVKJM0U/CYF27gA0ZMmXGkOcvfFtD0oHVZ1TIPRI= diff --git a/go.mod b/go.mod index 51abe43000..52d4d339df 100644 --- a/go.mod +++ b/go.mod @@ -123,7 +123,7 @@ require ( gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect - k8s.io/component-base v0.28.3 // indirect + k8s.io/component-base v0.28.5 // indirect k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00 // indirect sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect @@ -132,6 +132,7 @@ require ( require ( golang.org/x/sync v0.6.0 gotest.tools/v3 v3.5.1 + k8s.io/kubectl v0.28.5 sigs.k8s.io/yaml v1.4.0 ) diff --git a/go.sum b/go.sum index 349d36e033..ecd557e43e 100644 --- a/go.sum +++ b/go.sum @@ -462,14 +462,16 @@ k8s.io/apimachinery v0.28.5 h1:EEj2q1qdTcv2p5wl88KavAn3VlFRjREgRu8Sm/EuMPY= k8s.io/apimachinery v0.28.5/go.mod h1:wI37ncBvfAoswfq626yPTe6Bz1c22L7uaJ8dho83mgg= k8s.io/client-go v0.28.5 h1:6UNmc33vuJhh3+SAOEKku3QnKa+DtPKGnhO2MR0IEbk= k8s.io/client-go v0.28.5/go.mod h1:+pt086yx1i0HAlHzM9S+RZQDqdlzuXFl4hY01uhpcpA= -k8s.io/component-base v0.28.3 h1:rDy68eHKxq/80RiMb2Ld/tbH8uAE75JdCqJyi6lXMzI= -k8s.io/component-base v0.28.3/go.mod h1:fDJ6vpVNSk6cRo5wmDa6eKIG7UlIQkaFmZN2fYgIUD8= +k8s.io/component-base v0.28.5 h1:uFCW7USa8Fpme8dVtn2ZrdVaUPBRDwYJ+kNrV9OO1Cc= +k8s.io/component-base v0.28.5/go.mod h1:gw2d8O28okS9RrsPuJnD2mFl2It0HH9neHiGi2xoXcY= k8s.io/klog v1.0.0 h1:Pt+yjF5aB1xDSVbau4VsWe+dQNzA0qv1LlXdC2dF6Q8= k8s.io/klog v1.0.0/go.mod h1:4Bi6QPql/J/LkTDqv7R/cd3hPo4k2DG6Ptcz060Ez5I= k8s.io/klog/v2 v2.120.1 h1:QXU6cPEOIslTGvZaXvFWiP9VKyeet3sawzTOvdXb4Vw= k8s.io/klog/v2 v2.120.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE= k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00 h1:aVUu9fTY98ivBPKR9Y5w/AuzbMm96cd3YHRTU83I780= k8s.io/kube-openapi 
v0.0.0-20231010175941-2dd684a91f00/go.mod h1:AsvuZPBlUDVuCdzJ87iajxtXuR9oktsTctW/R9wwouA= +k8s.io/kubectl v0.28.5 h1:jq8xtiCCZPR8Cl/Qe1D7bLU0h8KtcunwfROqIekCUeU= +k8s.io/kubectl v0.28.5/go.mod h1:9WiwzqeKs3vLiDtEQPbjhqqysX+BIVMLt7C7gN+T5w8= k8s.io/utils v0.0.0-20230726121419-3b25d923346b h1:sgn3ZU783SCgtaSJjpcVVlRqd6GSnlTLKgpAAttJvpI= k8s.io/utils v0.0.0-20230726121419-3b25d923346b/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= sigs.k8s.io/controller-runtime v0.16.3 h1:2TuvuokmfXvDUamSx1SuAOO3eTyye+47mJCigwG62c4= diff --git a/test/e2e/framework/kubernetes/create-agnhost-statefulset.go b/test/e2e/framework/kubernetes/create-agnhost-statefulset.go new file mode 100644 index 0000000000..6be316c144 --- /dev/null +++ b/test/e2e/framework/kubernetes/create-agnhost-statefulset.go @@ -0,0 +1,162 @@ +package k8s + +import ( + "context" + "fmt" + "strconv" + "time" + + appsv1 "k8s.io/api/apps/v1" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/clientcmd" +) + +var ErrLabelMissingFromPod = fmt.Errorf("label missing from pod") + +const ( + AgnhostHTTPPort = 80 + AgnhostReplicas = 1 + + defaultTimeoutSeconds = 300 + defaultRetryDelay = 5 * time.Second + defaultRetryAttempts = 60 + defaultHTTPClientTimeout = 2 * time.Second +) + +type CreateAgnhostStatefulSet struct { + AgnhostName string + AgnhostNamespace string + KubeConfigFilePath string +} + +func (c *CreateAgnhostStatefulSet) Run() error { + config, err := clientcmd.BuildConfigFromFlags("", c.KubeConfigFilePath) + if err != nil { + return fmt.Errorf("error building kubeconfig: %w", err) + } + + clientset, err := kubernetes.NewForConfig(config) + if err != nil { + return fmt.Errorf("error creating Kubernetes client: %w", err) + } + + ctx, cancel := context.WithTimeout(context.Background(), defaultTimeoutSeconds*time.Second) + defer cancel() + + agnhostStatefulSet := c.getAgnhostStatefulSet() + + err = CreateResource(ctx, agnhostStatefulSet, clientset) + if err != nil { + return fmt.Errorf("error creating agnhost component: %w", err) + } + + selector, exists := agnhostStatefulSet.Spec.Selector.MatchLabels["app"] + if !exists { + return fmt.Errorf("missing label \"app=%s\" from agnhost statefulset: %w", c.AgnhostName, ErrLabelMissingFromPod) + } + + labelSelector := fmt.Sprintf("app=%s", selector) + err = WaitForPodReady(ctx, clientset, c.AgnhostNamespace, labelSelector) + if err != nil { + return fmt.Errorf("error waiting for agnhost pod to be ready: %w", err) + } + + return nil +} + +func (c *CreateAgnhostStatefulSet) Prevalidate() error { + return nil +} + +func (c *CreateAgnhostStatefulSet) Postvalidate() error { + return nil +} + +func (c *CreateAgnhostStatefulSet) getAgnhostStatefulSet() *appsv1.StatefulSet { + reps := int32(AgnhostReplicas) + + return &appsv1.StatefulSet{ + TypeMeta: metaV1.TypeMeta{ + Kind: "StatefulSet", + APIVersion: "apps/v1", + }, + ObjectMeta: metaV1.ObjectMeta{ + Name: c.AgnhostName, + Namespace: c.AgnhostNamespace, + }, + Spec: appsv1.StatefulSetSpec{ + Replicas: &reps, + Selector: &metaV1.LabelSelector{ + MatchLabels: map[string]string{ + "app": c.AgnhostName, + "k8s-app": "agnhost", + }, + }, + Template: v1.PodTemplateSpec{ + ObjectMeta: metaV1.ObjectMeta{ + Labels: map[string]string{ + "app": c.AgnhostName, + "k8s-app": "agnhost", + }, + Annotations: map[string]string{ + "policy.cilium.io/proxy-visibility": "", + }, + }, + + Spec: v1.PodSpec{ + Affinity: &v1.Affinity{ + PodAntiAffinity: 
&v1.PodAntiAffinity{ + // prefer an even spread across the cluster to avoid scheduling on the same node + PreferredDuringSchedulingIgnoredDuringExecution: []v1.WeightedPodAffinityTerm{ + { + Weight: MaxAffinityWeight, + PodAffinityTerm: v1.PodAffinityTerm{ + TopologyKey: "kubernetes.io/hostname", + LabelSelector: &metaV1.LabelSelector{ + MatchLabels: map[string]string{ + "k8s-app": "agnhost", + }, + }, + }, + }, + }, + }, + }, + Containers: []v1.Container{ + { + Name: c.AgnhostName, + Image: "acnpublic.azurecr.io/agnhost:2.40", + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + "memory": resource.MustParse("20Mi"), + }, + Limits: v1.ResourceList{ + "memory": resource.MustParse("20Mi"), + }, + }, + Command: []string{ + "/agnhost", + }, + Args: []string{ + "serve-hostname", + "--http", + "--port", + strconv.Itoa(AgnhostHTTPPort), + }, + + Ports: []v1.ContainerPort{ + { + ContainerPort: AgnhostHTTPPort, + }, + }, + Env: []v1.EnvVar{}, + }, + }, + }, + }, + }, + } +} diff --git a/test/e2e/framework/kubernetes/create-kapinger-deployment.go b/test/e2e/framework/kubernetes/create-kapinger-deployment.go new file mode 100644 index 0000000000..b65732d2ee --- /dev/null +++ b/test/e2e/framework/kubernetes/create-kapinger-deployment.go @@ -0,0 +1,256 @@ +package k8s + +import ( + "context" + "fmt" + "strconv" + + appsv1 "k8s.io/api/apps/v1" + v1 "k8s.io/api/core/v1" + rbacv1 "k8s.io/api/rbac/v1" + "k8s.io/apimachinery/pkg/api/resource" + metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/clientcmd" +) + +const ( + KapingerHTTPPort = 8080 + KapingerTCPPort = 8085 + KapingerUDPPort = 8086 + MaxAffinityWeight = 100 +) + +type CreateKapingerDeployment struct { + KapingerNamespace string + KapingerReplicas string + KubeConfigFilePath string +} + +func (c *CreateKapingerDeployment) Run() error { + _, err := strconv.Atoi(c.KapingerReplicas) + if err != nil { + return fmt.Errorf("error converting replicas to int for Kapinger replicas: %w", err) + } + + config, err := clientcmd.BuildConfigFromFlags("", c.KubeConfigFilePath) + if err != nil { + return fmt.Errorf("error building kubeconfig: %w", err) + } + + clientset, err := kubernetes.NewForConfig(config) + if err != nil { + return fmt.Errorf("error creating Kubernetes client: %w", err) + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + resources := []runtime.Object{ + c.GetKapingerService(), + c.GetKapingerServiceAccount(), + c.GetKapingerClusterRole(), + c.GetKapingerClusterRoleBinding(), + c.GetKapingerDeployment(), + } + + for i := range resources { + err = CreateResource(ctx, resources[i], clientset) + if err != nil { + return fmt.Errorf("error creating kapinger component: %w", err) + } + } + + return nil +} + +func (c *CreateKapingerDeployment) Prevalidate() error { + return nil +} + +func (c *CreateKapingerDeployment) Postvalidate() error { + return nil +} + +func (c *CreateKapingerDeployment) GetKapingerDeployment() *appsv1.Deployment { + replicas, err := strconv.ParseInt(c.KapingerReplicas, 10, 32) + if err != nil { + fmt.Println("Error converting replicas to int for Kapinger replicas: ", err) + return nil + } + reps := int32(replicas) + + return &appsv1.Deployment{ + TypeMeta: metaV1.TypeMeta{ + Kind: "Deployment", + APIVersion: "apps/v1", + }, + ObjectMeta: metaV1.ObjectMeta{ + Name: "kapinger", + Namespace: c.KapingerNamespace, + }, + Spec: appsv1.DeploymentSpec{ + Replicas: 
&reps, + Selector: &metaV1.LabelSelector{ + MatchLabels: map[string]string{ + "app": "kapinger", + }, + }, + Template: v1.PodTemplateSpec{ + ObjectMeta: metaV1.ObjectMeta{ + Labels: map[string]string{ + "app": "kapinger", + "server": "good", + }, + }, + + Spec: v1.PodSpec{ + Affinity: &v1.Affinity{ + PodAntiAffinity: &v1.PodAntiAffinity{ + // prefer an even spread across the cluster to avoid scheduling on the same node + PreferredDuringSchedulingIgnoredDuringExecution: []v1.WeightedPodAffinityTerm{ + { + Weight: MaxAffinityWeight, + PodAffinityTerm: v1.PodAffinityTerm{ + TopologyKey: "kubernetes.io/hostname", + LabelSelector: &metaV1.LabelSelector{ + MatchLabels: map[string]string{ + "app": "kapinger", + }, + }, + }, + }, + }, + }, + }, + ServiceAccountName: "kapinger-sa", + Containers: []v1.Container{ + { + Name: "kapinger", + Image: "acnpublic.azurecr.io/kapinger:be57650", + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + "memory": resource.MustParse("20Mi"), + }, + Limits: v1.ResourceList{ + "memory": resource.MustParse("20Mi"), + }, + }, + Ports: []v1.ContainerPort{ + { + ContainerPort: KapingerHTTPPort, + }, + }, + Env: []v1.EnvVar{ + { + Name: "TARGET_TYPE", + Value: "service", + }, + { + Name: "HTTP_PORT", + Value: strconv.Itoa(KapingerHTTPPort), + }, + { + Name: "TCP_PORT", + Value: strconv.Itoa(KapingerTCPPort), + }, + { + Name: "UDP_PORT", + Value: strconv.Itoa(KapingerUDPPort), + }, + }, + }, + }, + }, + }, + }, + } +} + +func (c *CreateKapingerDeployment) GetKapingerService() *v1.Service { + return &v1.Service{ + TypeMeta: metaV1.TypeMeta{ + Kind: "Service", + APIVersion: "v1", + }, + ObjectMeta: metaV1.ObjectMeta{ + Name: "kapinger-service", + Namespace: c.KapingerNamespace, + Labels: map[string]string{ + "app": "kapinger", + }, + }, + Spec: v1.ServiceSpec{ + Selector: map[string]string{ + "app": "kapinger", + }, + Ports: []v1.ServicePort{ + { + Port: KapingerHTTPPort, + Protocol: v1.ProtocolTCP, + TargetPort: intstr.FromInt(KapingerHTTPPort), + }, + }, + }, + } +} + +func (c *CreateKapingerDeployment) GetKapingerServiceAccount() *v1.ServiceAccount { + return &v1.ServiceAccount{ + TypeMeta: metaV1.TypeMeta{ + Kind: "ServiceAccount", + APIVersion: "v1", + }, + ObjectMeta: metaV1.ObjectMeta{ + Name: "kapinger-sa", + Namespace: c.KapingerNamespace, + }, + } +} + +func (c *CreateKapingerDeployment) GetKapingerClusterRole() *rbacv1.ClusterRole { + return &rbacv1.ClusterRole{ + TypeMeta: metaV1.TypeMeta{ + Kind: "ClusterRole", + APIVersion: "rbac.authorization.k8s.io/v1", + }, + ObjectMeta: metaV1.ObjectMeta{ + Name: "kapinger-role", + Namespace: c.KapingerNamespace, + }, + Rules: []rbacv1.PolicyRule{ + { + APIGroups: []string{""}, + Resources: []string{"services", "pods"}, + Verbs: []string{"get", "list"}, + }, + }, + } +} + +func (c *CreateKapingerDeployment) GetKapingerClusterRoleBinding() *rbacv1.ClusterRoleBinding { + return &rbacv1.ClusterRoleBinding{ + TypeMeta: metaV1.TypeMeta{ + Kind: "ClusterRoleBinding", + APIVersion: "rbac.authorization.k8s.io/v1", + }, + ObjectMeta: metaV1.ObjectMeta{ + Name: "kapinger-rolebinding", + Namespace: c.KapingerNamespace, + }, + Subjects: []rbacv1.Subject{ + { + Kind: "ServiceAccount", + Name: "kapinger-sa", + Namespace: c.KapingerNamespace, + }, + }, + RoleRef: rbacv1.RoleRef{ + APIGroup: "rbac.authorization.k8s.io", + Kind: "ClusterRole", + Name: "kapinger-role", + }, + } +} diff --git a/test/e2e/framework/kubernetes/create-network-policy.go b/test/e2e/framework/kubernetes/create-network-policy.go new file mode 100644 
index 0000000000..fa1171a257 --- /dev/null +++ b/test/e2e/framework/kubernetes/create-network-policy.go @@ -0,0 +1,114 @@ +package k8s + +import ( + "context" + "fmt" + "strings" + + networkingv1 "k8s.io/api/networking/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/clientcmd" +) + +const ( + Egress = "egress" + Ingress = "ingress" +) + +type CreateDenyAllNetworkPolicy struct { + NetworkPolicyNamespace string + KubeConfigFilePath string + DenyAllLabelSelector string +} + +func (c *CreateDenyAllNetworkPolicy) Run() error { + config, err := clientcmd.BuildConfigFromFlags("", c.KubeConfigFilePath) + if err != nil { + return fmt.Errorf("error building kubeconfig: %w", err) + } + + clientset, err := kubernetes.NewForConfig(config) + if err != nil { + return fmt.Errorf("error creating Kubernetes client: %w", err) + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + denyAllPolicy := getNetworkPolicy(c.NetworkPolicyNamespace, c.DenyAllLabelSelector) + err = CreateResource(ctx, denyAllPolicy, clientset) + if err != nil { + return fmt.Errorf("error creating simple deny-all network policy: %w", err) + } + + return nil +} + +func getNetworkPolicy(namespace, labelSelector string) *networkingv1.NetworkPolicy { + labelSelectorSlice := strings.Split(labelSelector, "=") + return &networkingv1.NetworkPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Name: "deny-all", + Namespace: namespace, + }, + Spec: networkingv1.NetworkPolicySpec{ + PodSelector: metav1.LabelSelector{ + MatchLabels: map[string]string{ + labelSelectorSlice[0]: labelSelectorSlice[1], + }, + }, + PolicyTypes: []networkingv1.PolicyType{ + networkingv1.PolicyTypeIngress, + networkingv1.PolicyTypeEgress, + }, + Egress: []networkingv1.NetworkPolicyEgressRule{}, + Ingress: []networkingv1.NetworkPolicyIngressRule{}, + }, + } +} + +func (c *CreateDenyAllNetworkPolicy) Prevalidate() error { + return nil +} + +func (c *CreateDenyAllNetworkPolicy) Postvalidate() error { + return nil +} + +type DeleteDenyAllNetworkPolicy struct { + NetworkPolicyNamespace string + KubeConfigFilePath string + DenyAllLabelSelector string +} + +func (d *DeleteDenyAllNetworkPolicy) Run() error { + config, err := clientcmd.BuildConfigFromFlags("", d.KubeConfigFilePath) + if err != nil { + return fmt.Errorf("error building kubeconfig: %w", err) + } + + clientset, err := kubernetes.NewForConfig(config) + if err != nil { + return fmt.Errorf("error creating Kubernetes client: %w", err) + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + denyAllPolicy := getNetworkPolicy(d.NetworkPolicyNamespace, d.DenyAllLabelSelector) + err = DeleteResource(ctx, denyAllPolicy, clientset) + if err != nil { + return fmt.Errorf("error deleting simple deny-all network policy: %w", err) + } + + return nil +} + +func (d *DeleteDenyAllNetworkPolicy) Prevalidate() error { + return nil +} + +func (d *DeleteDenyAllNetworkPolicy) Postvalidate() error { + return nil +} diff --git a/test/e2e/framework/kubernetes/create-resource.go b/test/e2e/framework/kubernetes/create-resource.go new file mode 100644 index 0000000000..8e7eafda46 --- /dev/null +++ b/test/e2e/framework/kubernetes/create-resource.go @@ -0,0 +1,209 @@ +package k8s + +import ( + "context" + "fmt" + "log" + + appsv1 "k8s.io/api/apps/v1" + v1 "k8s.io/api/core/v1" + networkingv1 "k8s.io/api/networking/v1" + rbacv1 "k8s.io/api/rbac/v1" + "k8s.io/apimachinery/pkg/api/errors" + metaV1 
"k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes" +) + +var ( + ErrUnknownResourceType = fmt.Errorf("unknown resource type") + ErrCreateNilResource = fmt.Errorf("cannot create nil resource") +) + +func CreateResource(ctx context.Context, obj runtime.Object, clientset *kubernetes.Clientset) error { //nolint:gocyclo //this is just boilerplate code + if obj == nil { + return ErrCreateNilResource + } + + switch o := obj.(type) { + case *appsv1.DaemonSet: + log.Printf("Creating/Updating DaemonSet \"%s\" in namespace \"%s\"...\n", o.Name, o.Namespace) + client := clientset.AppsV1().DaemonSets(o.Namespace) + _, err := client.Get(ctx, o.Name, metaV1.GetOptions{}) + if errors.IsNotFound(err) { + _, err = client.Create(ctx, o, metaV1.CreateOptions{}) + if err != nil { + return fmt.Errorf("failed to create DaemonSet \"%s\" in namespace \"%s\": %w", o.Name, o.Namespace, err) + } + return nil + } + _, err = client.Update(ctx, o, metaV1.UpdateOptions{}) + if err != nil { + return fmt.Errorf("failed to create/update DaemonSet \"%s\" in namespace \"%s\": %w", o.Name, o.Namespace, err) + } + + case *appsv1.Deployment: + log.Printf("Creating/Updating Deployment \"%s\" in namespace \"%s\"...\n", o.Name, o.Namespace) + client := clientset.AppsV1().Deployments(o.Namespace) + _, err := client.Get(ctx, o.Name, metaV1.GetOptions{}) + if errors.IsNotFound(err) { + _, err = client.Create(ctx, o, metaV1.CreateOptions{}) + if err != nil { + return fmt.Errorf("failed to create Deployment \"%s\" in namespace \"%s\": %w", o.Name, o.Namespace, err) + } + return nil + } + _, err = client.Update(ctx, o, metaV1.UpdateOptions{}) + if err != nil { + return fmt.Errorf("failed to create/update Deployment \"%s\" in namespace \"%s\": %w", o.Name, o.Namespace, err) + } + + case *appsv1.StatefulSet: + log.Printf("Creating/Updating StatefulSet \"%s\" in namespace \"%s\"...\n", o.Name, o.Namespace) + client := clientset.AppsV1().StatefulSets(o.Namespace) + _, err := client.Get(ctx, o.Name, metaV1.GetOptions{}) + if errors.IsNotFound(err) { + _, err = client.Create(ctx, o, metaV1.CreateOptions{}) + if err != nil { + return fmt.Errorf("failed to create StatefulSet \"%s\" in namespace \"%s\": %w", o.Name, o.Namespace, err) + } + return nil + } + _, err = client.Update(ctx, o, metaV1.UpdateOptions{}) + if err != nil { + return fmt.Errorf("failed to create/update StatefulSet \"%s\" in namespace \"%s\": %w", o.Name, o.Namespace, err) + } + + case *v1.Service: + log.Printf("Creating/Updating Service \"%s\" in namespace \"%s\"...\n", o.Name, o.Namespace) + client := clientset.CoreV1().Services(o.Namespace) + _, err := client.Get(ctx, o.Name, metaV1.GetOptions{}) + if errors.IsNotFound(err) { + _, err = client.Create(ctx, o, metaV1.CreateOptions{}) + if err != nil { + return fmt.Errorf("failed to create Service \"%s\" in namespace \"%s\": %w", o.Name, o.Namespace, err) + } + return nil + } + _, err = client.Update(ctx, o, metaV1.UpdateOptions{}) + if err != nil { + return fmt.Errorf("failed to create/update Service \"%s\" in namespace \"%s\": %w", o.Name, o.Namespace, err) + } + + case *v1.ServiceAccount: + log.Printf("Creating/Updating ServiceAccount \"%s\" in namespace \"%s\"...\n", o.Name, o.Namespace) + client := clientset.CoreV1().ServiceAccounts(o.Namespace) + _, err := client.Get(ctx, o.Name, metaV1.GetOptions{}) + if errors.IsNotFound(err) { + _, err = client.Create(ctx, o, metaV1.CreateOptions{}) + if err != nil { + return fmt.Errorf("failed to create ServiceAccount \"%s\" in 
namespace \"%s\": %w", o.Name, o.Namespace, err) + } + return nil + } + _, err = client.Update(ctx, o, metaV1.UpdateOptions{}) + if err != nil { + return fmt.Errorf("failed to create/update ServiceAccount \"%s\" in namespace \"%s\": %w", o.Name, o.Namespace, err) + } + + case *rbacv1.Role: + log.Printf("Creating/Updating Role \"%s\" in namespace \"%s\"...\n", o.Name, o.Namespace) + client := clientset.RbacV1().Roles(o.Namespace) + _, err := client.Get(ctx, o.Name, metaV1.GetOptions{}) + if errors.IsNotFound(err) { + _, err = client.Create(ctx, o, metaV1.CreateOptions{}) + if err != nil { + return fmt.Errorf("failed to create Role \"%s\" in namespace \"%s\": %w", o.Name, o.Namespace, err) + } + return nil + } + _, err = client.Update(ctx, o, metaV1.UpdateOptions{}) + if err != nil { + return fmt.Errorf("failed to create/update Role \"%s\" in namespace \"%s\": %w", o.Name, o.Namespace, err) + } + + case *rbacv1.RoleBinding: + log.Printf("Creating/Updating RoleBinding \"%s\" in namespace \"%s\"...\n", o.Name, o.Namespace) + client := clientset.RbacV1().RoleBindings(o.Namespace) + _, err := client.Get(ctx, o.Name, metaV1.GetOptions{}) + if errors.IsNotFound(err) { + _, err = client.Create(ctx, o, metaV1.CreateOptions{}) + if err != nil { + return fmt.Errorf("failed to create RoleBinding \"%s\" in namespace \"%s\": %w", o.Name, o.Namespace, err) + } + return nil + } + _, err = client.Update(ctx, o, metaV1.UpdateOptions{}) + if err != nil { + return fmt.Errorf("failed to create/update RoleBinding \"%s\" in namespace \"%s\": %w", o.Name, o.Namespace, err) + } + + case *rbacv1.ClusterRole: + log.Printf("Creating/Updating ClusterRole \"%s\"...\n", o.Name) + client := clientset.RbacV1().ClusterRoles() + _, err := client.Get(ctx, o.Name, metaV1.GetOptions{}) + if errors.IsNotFound(err) { + _, err = client.Create(ctx, o, metaV1.CreateOptions{}) + if err != nil { + return fmt.Errorf("failed to create ClusterRole \"%s\": %w", o.Name, err) + } + return nil + } + _, err = client.Update(ctx, o, metaV1.UpdateOptions{}) + if err != nil { + return fmt.Errorf("failed to create/update ClusterRole \"%s\": %w", o.Name, err) + } + + case *rbacv1.ClusterRoleBinding: + log.Printf("Creating/Updating ClusterRoleBinding \"%s\"...\n", o.Name) + client := clientset.RbacV1().ClusterRoleBindings() + _, err := client.Get(ctx, o.Name, metaV1.GetOptions{}) + if errors.IsNotFound(err) { + _, err = client.Create(ctx, o, metaV1.CreateOptions{}) + if err != nil { + return fmt.Errorf("failed to create ClusterRoleBinding \"%s\": %w", o.Name, err) + } + return nil + } + _, err = client.Update(ctx, o, metaV1.UpdateOptions{}) + if err != nil { + return fmt.Errorf("failed to create/update ClusterRoleBinding \"%s\": %w", o.Name, err) + } + + case *v1.ConfigMap: + log.Printf("Creating/Updating ConfigMap \"%s\" in namespace \"%s\"...\n", o.Name, o.Namespace) + client := clientset.CoreV1().ConfigMaps(o.Namespace) + _, err := client.Get(ctx, o.Name, metaV1.GetOptions{}) + if errors.IsNotFound(err) { + _, err = client.Create(ctx, o, metaV1.CreateOptions{}) + if err != nil { + return fmt.Errorf("failed to create ConfigMap \"%s\" in namespace \"%s\": %w", o.Name, o.Namespace, err) + } + return nil + } + _, err = client.Update(ctx, o, metaV1.UpdateOptions{}) + if err != nil { + return fmt.Errorf("failed to create/update ConfigMap \"%s\" in namespace \"%s\": %w", o.Name, o.Namespace, err) + } + + case *networkingv1.NetworkPolicy: + log.Printf("Creating/Updating NetworkPolicy \"%s\" in namespace \"%s\"...\n", o.Name, o.Namespace) + client := 
clientset.NetworkingV1().NetworkPolicies(o.Namespace) + _, err := client.Get(ctx, o.Name, metaV1.GetOptions{}) + if errors.IsNotFound(err) { + _, err = client.Create(ctx, o, metaV1.CreateOptions{}) + if err != nil { + return fmt.Errorf("failed to create NetworkPolicy \"%s\" in namespace \"%s\": %w", o.Name, o.Namespace, err) + } + return nil + } + _, err = client.Update(ctx, o, metaV1.UpdateOptions{}) + if err != nil { + return fmt.Errorf("failed to create/update NetworkPolicy \"%s\" in namespace \"%s\": %w", o.Name, o.Namespace, err) + } + + default: + return fmt.Errorf("unknown object type: %T, err: %w", obj, ErrUnknownResourceType) + } + return nil +} diff --git a/test/e2e/framework/kubernetes/delete-resource.go b/test/e2e/framework/kubernetes/delete-resource.go new file mode 100644 index 0000000000..d0b6a0adc5 --- /dev/null +++ b/test/e2e/framework/kubernetes/delete-resource.go @@ -0,0 +1,162 @@ +package k8s + +import ( + "context" + "fmt" + "log" + + appsv1 "k8s.io/api/apps/v1" + v1 "k8s.io/api/core/v1" + networkingv1 "k8s.io/api/networking/v1" + rbacv1 "k8s.io/api/rbac/v1" + "k8s.io/apimachinery/pkg/api/errors" + metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes" +) + +var ErrDeleteNilResource = fmt.Errorf("cannot delete nil resource") + +func DeleteResource(ctx context.Context, obj runtime.Object, clientset *kubernetes.Clientset) error { //nolint:gocyclo //this is just boilerplate code + if obj == nil { + return ErrDeleteNilResource + } + + switch o := obj.(type) { + case *appsv1.DaemonSet: + log.Printf("Deleting DaemonSet \"%s\" in namespace \"%s\"...\n", o.Name, o.Namespace) + client := clientset.AppsV1().DaemonSets(o.Namespace) + err := client.Delete(ctx, o.Name, metaV1.DeleteOptions{}) + if err != nil { + if errors.IsNotFound(err) { + log.Printf("DaemonSet \"%s\" in namespace \"%s\" does not exist\n", o.Name, o.Namespace) + return nil + } + return fmt.Errorf("failed to delete DaemonSet \"%s\" in namespace \"%s\": %w", o.Name, o.Namespace, err) + } + + case *appsv1.Deployment: + log.Printf("Deleting Deployment \"%s\" in namespace \"%s\"...\n", o.Name, o.Namespace) + client := clientset.AppsV1().Deployments(o.Namespace) + err := client.Delete(ctx, o.Name, metaV1.DeleteOptions{}) + if err != nil { + if errors.IsNotFound(err) { + log.Printf("Deployment \"%s\" in namespace \"%s\" does not exist\n", o.Name, o.Namespace) + return nil + } + return fmt.Errorf("failed to delete Deployment \"%s\" in namespace \"%s\": %w", o.Name, o.Namespace, err) + } + + case *appsv1.StatefulSet: + log.Printf("Deleting StatefulSet \"%s\" in namespace \"%s\"...\n", o.Name, o.Namespace) + client := clientset.AppsV1().StatefulSets(o.Namespace) + err := client.Delete(ctx, o.Name, metaV1.DeleteOptions{}) + if err != nil { + if errors.IsNotFound(err) { + log.Printf("StatefulSet \"%s\" in namespace \"%s\" does not exist\n", o.Name, o.Namespace) + return nil + } + return fmt.Errorf("failed to delete StatefulSet \"%s\" in namespace \"%s\": %w", o.Name, o.Namespace, err) + } + + case *v1.Service: + log.Printf("Deleting Service \"%s\" in namespace \"%s\"...\n", o.Name, o.Namespace) + client := clientset.CoreV1().Services(o.Namespace) + err := client.Delete(ctx, o.Name, metaV1.DeleteOptions{}) + if err != nil { + if errors.IsNotFound(err) { + log.Printf("Service \"%s\" in namespace \"%s\" does not exist\n", o.Name, o.Namespace) + return nil + } + return fmt.Errorf("failed to delete Service \"%s\" in namespace \"%s\": %w", 
o.Name, o.Namespace, err) + } + + case *v1.ServiceAccount: + log.Printf("Deleting ServiceAccount \"%s\" in namespace \"%s\"...\n", o.Name, o.Namespace) + client := clientset.CoreV1().ServiceAccounts(o.Namespace) + err := client.Delete(ctx, o.Name, metaV1.DeleteOptions{}) + if err != nil { + if errors.IsNotFound(err) { + log.Printf("ServiceAccount \"%s\" in namespace \"%s\" does not exist\n", o.Name, o.Namespace) + return nil + } + return fmt.Errorf("failed to delete ServiceAccount \"%s\" in namespace \"%s\": %w", o.Name, o.Namespace, err) + } + + case *rbacv1.Role: + log.Printf("Deleting Role \"%s\" in namespace \"%s\"...\n", o.Name, o.Namespace) + client := clientset.RbacV1().Roles(o.Namespace) + err := client.Delete(ctx, o.Name, metaV1.DeleteOptions{}) + if err != nil { + if errors.IsNotFound(err) { + log.Printf("Role \"%s\" in namespace \"%s\" does not exist\n", o.Name, o.Namespace) + return nil + } + return fmt.Errorf("failed to delete Role \"%s\" in namespace \"%s\": %w", o.Name, o.Namespace, err) + } + + case *rbacv1.RoleBinding: + log.Printf("Deleting RoleBinding \"%s\" in namespace \"%s\"...\n", o.Name, o.Namespace) + client := clientset.RbacV1().RoleBindings(o.Namespace) + err := client.Delete(ctx, o.Name, metaV1.DeleteOptions{}) + if err != nil { + if errors.IsNotFound(err) { + log.Printf("RoleBinding \"%s\" in namespace \"%s\" does not exist\n", o.Name, o.Namespace) + return nil + } + return fmt.Errorf("failed to delete RoleBinding \"%s\" in namespace \"%s\": %w", o.Name, o.Namespace, err) + } + + case *rbacv1.ClusterRole: + log.Printf("Deleting ClusterRole \"%s\"...\n", o.Name) + client := clientset.RbacV1().ClusterRoles() + err := client.Delete(ctx, o.Name, metaV1.DeleteOptions{}) + if err != nil { + if errors.IsNotFound(err) { + log.Printf("ClusterRole \"%s\" does not exist\n", o.Name) + return nil + } + return fmt.Errorf("failed to delete ClusterRole \"%s\": %w", o.Name, err) + } + + case *rbacv1.ClusterRoleBinding: + log.Printf("Deleting ClusterRoleBinding \"%s\"...\n", o.Name) + client := clientset.RbacV1().ClusterRoleBindings() + err := client.Delete(ctx, o.Name, metaV1.DeleteOptions{}) + if err != nil { + if errors.IsNotFound(err) { + log.Printf("ClusterRoleBinding \"%s\" does not exist\n", o.Name) + return nil + } + return fmt.Errorf("failed to delete ClusterRoleBinding \"%s\": %w", o.Name, err) + } + + case *v1.ConfigMap: + log.Printf("Deleting ConfigMap \"%s\" in namespace \"%s\"...\n", o.Name, o.Namespace) + client := clientset.CoreV1().ConfigMaps(o.Namespace) + err := client.Delete(ctx, o.Name, metaV1.DeleteOptions{}) + if err != nil { + if errors.IsNotFound(err) { + log.Printf("ConfigMap \"%s\" in namespace \"%s\" does not exist\n", o.Name, o.Namespace) + return nil + } + return fmt.Errorf("failed to delete ConfigMap \"%s\" in namespace \"%s\": %w", o.Name, o.Namespace, err) + } + + case *networkingv1.NetworkPolicy: + log.Printf("Deleting NetworkPolicy \"%s\" in namespace \"%s\"...\n", o.Name, o.Namespace) + client := clientset.NetworkingV1().NetworkPolicies(o.Namespace) + err := client.Delete(ctx, o.Name, metaV1.DeleteOptions{}) + if err != nil { + if errors.IsNotFound(err) { + log.Printf("NetworkPolicy \"%s\" in namespace \"%s\" does not exist\n", o.Name, o.Namespace) + return nil + } + return fmt.Errorf("failed to delete NetworkPolicy \"%s\" in namespace \"%s\": %w", 
o.Name, o.Namespace, err) + } + + default: + return fmt.Errorf("unknown object type: %T, err: %w", obj, ErrUnknownResourceType) + } + return nil +} diff --git a/test/e2e/framework/kubernetes/exec-pod.go b/test/e2e/framework/kubernetes/exec-pod.go new file mode 100644 index 0000000000..aaf71c4945 --- /dev/null +++ b/test/e2e/framework/kubernetes/exec-pod.go @@ -0,0 +1,89 @@ +package k8s + +import ( + "context" + "fmt" + "log" + "os" + "strings" + + v1 "k8s.io/api/core/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/clientcmd" + "k8s.io/client-go/tools/remotecommand" + "k8s.io/kubectl/pkg/scheme" +) + +const ExecSubResources = "exec" + +type ExecInPod struct { + PodNamespace string + KubeConfigFilePath string + PodName string + Command string +} + +func (e *ExecInPod) Run() error { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + err := ExecPod(ctx, e.KubeConfigFilePath, e.PodNamespace, e.PodName, e.Command) + if err != nil { + return fmt.Errorf("error executing command [%s]: %w", e.Command, err) + } + + return nil +} + +func (e *ExecInPod) Prevalidate() error { + return nil +} + +func (e *ExecInPod) Postvalidate() error { + return nil +} + +func ExecPod(ctx context.Context, kubeConfigFilePath, namespace, podName, command string) error { + config, err := clientcmd.BuildConfigFromFlags("", kubeConfigFilePath) + if err != nil { + return fmt.Errorf("error building kubeconfig: %w", err) + } + + clientset, err := kubernetes.NewForConfig(config) + if err != nil { + return fmt.Errorf("error creating Kubernetes client: %w", err) + } + + req := clientset.CoreV1().RESTClient().Post().Resource("pods").Name(podName). + Namespace(namespace).SubResource(ExecSubResources) + option := &v1.PodExecOptions{ + Command: strings.Fields(command), + Stdin: true, + Stdout: true, + Stderr: true, + TTY: false, + } + + req.VersionedParams( + option, + scheme.ParameterCodec, + ) + + exec, err := remotecommand.NewSPDYExecutor(config, "POST", req.URL()) + if err != nil { + return fmt.Errorf("error creating executor: %w", err) + } + + log.Printf("executing command \"%s\" on pod \"%s\" in namespace \"%s\"...", command, podName, namespace) + err = exec.StreamWithContext(ctx, remotecommand.StreamOptions{ + Stdin: os.Stdin, + Stdout: os.Stdout, + Stderr: os.Stderr, + }) + + if err != nil { + return fmt.Errorf("error executing command: %w", err) + } + + return nil +} diff --git a/test/e2e/framework/kubernetes/port-forward.txt b/test/e2e/framework/kubernetes/port-forward.txt new file mode 100644 index 0000000000..7dcabe496d --- /dev/null +++ b/test/e2e/framework/kubernetes/port-forward.txt @@ -0,0 +1,163 @@ +// todo: matmerr, this is just going to remain broken until it can be validated with scenarios pr + +package k8s + +import ( + "context" + "fmt" + "log" + "net/http" + "strconv" + "time" + + k8s "github.com/Azure/azure-container-networking/test/integration" + "github.com/Azure/azure-container-networking/test/internal/retry" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/clientcmd" +) + +const ( + defaultTimeoutSeconds = 300 + defaultRetryDelay = 5 * time.Second + defaultRetryAttempts = 60 + defaultHTTPClientTimeout = 2 * time.Second +) + +var ( + ErrNoPodWithLabelFound = fmt.Errorf("no pod with label found with matching pod affinity") + + defaultRetrier = retry.Retrier{Attempts: defaultRetryAttempts, Delay: defaultRetryDelay} +) + +type PortForward struct { + Namespace string + LabelSelector string + LocalPort string + 
RemotePort string + KubeConfigFilePath string + OptionalLabelAffinity string + + // local properties + pf *k8s.PortForwarder + portForwardHandle k8s.PortForwardStreamHandle +} + +func (p *PortForward) Run() error { + lport, _ := strconv.Atoi(p.LocalPort) + rport, _ := strconv.Atoi(p.RemotePort) + + pctx := context.Background() + portForwardCtx, cancel := context.WithTimeout(pctx, defaultTimeoutSeconds*time.Second) + defer cancel() + + config, err := clientcmd.BuildConfigFromFlags("", p.KubeConfigFilePath) + if err != nil { + return fmt.Errorf("error building kubeconfig: %w", err) + } + + clientset, err := kubernetes.NewForConfig(config) + if err != nil { + return fmt.Errorf("could not create clientset: %w", err) + } + + p.pf, err = k8s.NewPortForwarder(config) + if err != nil { + return fmt.Errorf("could not create port forwarder: %w", err) + } + + // if we have an optional label affinity, find a pod with that label, on the same node as a pod with the label selector + targetPodName := "" + if p.OptionalLabelAffinity != "" { + // get all pods with label + targetPodName, err = p.findPodsWithAffinity(pctx, clientset) + if err != nil { + return fmt.Errorf("could not find pod with affinity: %w", err) + } + } + + portForwardFn := func() error { + log.Printf("attempting port forward to a pod with label \"%s\", in namespace \"%s\"...\n", p.LabelSelector, p.Namespace) + + var handle k8s.PortForwardStreamHandle + + // if we have a pod name (likely from affinity above), use it, otherwise use label selector + if targetPodName != "" { + handle, err = p.pf.ForwardWithPodName(pctx, p.Namespace, targetPodName, lport, rport) + if err != nil { + return fmt.Errorf("could not start port forward: %w", err) + } + } else { + handle, err = p.pf.ForwardWithLabelSelector(pctx, p.Namespace, p.LabelSelector, lport, rport) + if err != nil { + return fmt.Errorf("could not start port forward: %w", err) + } + } + + // verify port forward succeeded + client := http.Client{ + Timeout: defaultHTTPClientTimeout, + } + resp, err := client.Get(handle.URL()) //nolint + if err != nil { + log.Printf("port forward validation HTTP request to %s failed: %v\n", handle.URL(), err) + handle.Stop() + return fmt.Errorf("port forward validation HTTP request to %s failed: %w", handle.URL(), err) + } + defer resp.Body.Close() + + log.Printf("port forward validation HTTP request to \"%s\" succeeded, response: %s\n", handle.URL(), resp.Status) + p.portForwardHandle = handle + + return nil + } + + if err = defaultRetrier.Do(portForwardCtx, portForwardFn); err != nil { + return fmt.Errorf("could not start port forward within %ds: %w", defaultTimeoutSeconds, err) + } + log.Printf("successfully port forwarded to \"%s\"\n", p.portForwardHandle.URL()) + return nil +} + +func (p *PortForward) findPodsWithAffinity(ctx context.Context, clientset *kubernetes.Clientset) (string, error) { + targetPods, errAffinity := clientset.CoreV1().Pods(p.Namespace).List(ctx, metav1.ListOptions{ + LabelSelector: p.LabelSelector, + FieldSelector: "status.phase=Running", + }) + if errAffinity != nil { + return "", fmt.Errorf("could not list pods in %q with label %q: %w", p.Namespace, p.LabelSelector, errAffinity) + } + + affinityPods, errAffinity := clientset.CoreV1().Pods(p.Namespace).List(ctx, metav1.ListOptions{ + LabelSelector: p.OptionalLabelAffinity, + FieldSelector: "status.phase=Running", + }) + if errAffinity != nil { + return "", fmt.Errorf("could not list affinity pods in %q with label %q: %w", p.Namespace, p.OptionalLabelAffinity, errAffinity) + } + + // 
keep track of where the affinity pods are scheduled + affinityNodes := make(map[string]bool) + for i := range affinityPods.Items { + affinityNodes[affinityPods.Items[i].Spec.NodeName] = true + } + + // if a pod is found on the same node as an affinity pod, use it + for i := range targetPods.Items { + if affinityNodes[targetPods.Items[i].Spec.NodeName] { + // found a pod with the specified label, on a node with the optional label affinity + return targetPods.Items[i].Name, nil + } + } + + return "", fmt.Errorf("could not find a pod with label \"%s\", on a node that also has a pod with label \"%s\": %w", p.LabelSelector, p.OptionalLabelAffinity, ErrNoPodWithLabelFound) +} + +func (p *PortForward) Prevalidate() error { + return nil +} + +func (p *PortForward) Postvalidate() error { + p.portForwardHandle.Stop() + return nil +} diff --git a/test/e2e/framework/kubernetes/wait-pod-ready.go b/test/e2e/framework/kubernetes/wait-pod-ready.go new file mode 100644 index 0000000000..41a9f1d401 --- /dev/null +++ b/test/e2e/framework/kubernetes/wait-pod-ready.go @@ -0,0 +1,64 @@ +package k8s + +import ( + "context" + "fmt" + "log" + "time" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/kubernetes" +) + +const ( + RetryTimeoutPodsReady = 5 * time.Minute + RetryIntervalPodsReady = 5 * time.Second +) + +func WaitForPodReady(ctx context.Context, clientset *kubernetes.Clientset, namespace, labelSelector string) error { + podReadyMap := make(map[string]bool) + + conditionFunc := wait.ConditionWithContextFunc(func(context.Context) (bool, error) { + // get a list of all pods matching the label selector + var podList *corev1.PodList + podList, err := clientset.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{LabelSelector: labelSelector}) + if err != nil { + return false, fmt.Errorf("error listing Pods: %w", err) + } + + if len(podList.Items) == 0 { + log.Printf("no pods found in namespace \"%s\" with label \"%s\"", namespace, labelSelector) + return false, nil + } + + // check each individual pod to see if it is in the Running state + for i := range podList.Items { + var pod *corev1.Pod + pod, err = clientset.CoreV1().Pods(namespace).Get(ctx, podList.Items[i].Name, metav1.GetOptions{}) + if err != nil { + return false, fmt.Errorf("error getting Pod: %w", err) + } + + // Check the Pod phase + if pod.Status.Phase != corev1.PodRunning { + log.Printf("pod \"%s\" is not in Running state yet. 
Waiting...\n", pod.Name) + return false, nil + } + if !podReadyMap[pod.Name] { + log.Printf("pod \"%s\" is in Running state\n", pod.Name) + podReadyMap[pod.Name] = true + } + } + log.Printf("all pods in namespace \"%s\" with label \"%s\" are in Running state\n", namespace, labelSelector) + return true, nil + }) + + // poll until all pods matching the label selector are in the Running state + err := wait.PollUntilContextCancel(ctx, RetryIntervalPodsReady, true, conditionFunc) + if err != nil { + return fmt.Errorf("error waiting for pods in namespace \"%s\" with label \"%s\" to be in Running state: %w", namespace, labelSelector, err) + } + return nil +} diff --git a/test/integration/portforward.go b/test/integration/portforward.go index 605bd70602..5170b91dc7 100644 --- a/test/integration/portforward.go +++ b/test/integration/portforward.go @@ -81,7 +81,7 @@ func (p *PortForwarder) Forward(ctx context.Context) error { return fmt.Errorf("no pods found in %q with label %q", p.opts.Namespace, p.opts.LabelSelector) //nolint:goerr113 //no specific handling expected } - randomIndex := rand.Intn(len(pods.Items)) + randomIndex := rand.Intn(len(pods.Items)) //nolint:gosec //this is going to be revised in the future anyways, avoid random pods podName := pods.Items[randomIndex].Name portForwardURL := p.clientset.CoreV1().RESTClient().Post(). Resource("pods").