Skip to content

Commit

Permalink
Merge pull request #47 from drf/upgrade-operator
Browse files Browse the repository at this point in the history
cluster: Implement upgrade-operator
  • Loading branch information
bettio authored Dec 3, 2019
2 parents 41634dd + b10eff9 commit 07de008
Show file tree
Hide file tree
Showing 7 changed files with 472 additions and 50 deletions.
12 changes: 9 additions & 3 deletions cmd/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"path/filepath"

"github.com/mitchellh/go-homedir"
apiextensions "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -53,8 +54,9 @@ manage the entire lifecycle of an Astarte instance, including its installation a

// Set here all custom resources for Astarte
var (
kubernetesClient *kubernetes.Clientset
kubernetesDynamicClient dynamic.Interface
kubernetesClient *kubernetes.Clientset
kubernetesAPIExtensionsClient *apiextensions.Clientset
kubernetesDynamicClient dynamic.Interface

astarteGroupResource = schema.GroupResource{
Group: "api.astarte-platform.org",
Expand Down Expand Up @@ -107,11 +109,15 @@ func clusterPersistentPreRunE(cmd *cobra.Command, args []string) error {
return err
}

// create the clientset
// create the clientsets
kubernetesClient, err = kubernetes.NewForConfig(config)
if err != nil {
return err
}
kubernetesAPIExtensionsClient, err = apiextensions.NewForConfig(config)
if err != nil {
return err
}
kubernetesDynamicClient, err = dynamic.NewForConfig(config)
if err != nil {
return err
Expand Down
44 changes: 5 additions & 39 deletions cmd/cluster/install_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,8 @@ import (
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes/scheme"
)

var installCmd = &cobra.Command{
Expand All @@ -46,36 +44,6 @@ func init() {
ClusterCmd.AddCommand(installCmd)
}

func unmarshalYAML(res string, version string) runtime.Object {
content, err := getOperatorContent(res, version)
if err != nil {
fmt.Println("Error while parsing Kubernetes Resources. Your deployment might be incomplete.")
fmt.Println(err)
os.Exit(1)
}

decode := scheme.Codecs.UniversalDeserializer().Decode
obj, _, err := decode([]byte(content), nil, nil)
if err != nil {
fmt.Println("Error while parsing Kubernetes Resources. Your deployment might be incomplete.")
fmt.Println(err)
os.Exit(1)
}

return obj
}

func unmarshalOperatorContentYAMLToJSON(res string, version string) map[string]interface{} {
content, err := getOperatorContent(res, version)
jsonStruct, err := utils.UnmarshalYAMLToJSON([]byte(content))
if err != nil {
fmt.Println("Error while parsing Kubernetes Resources. Your deployment might be incomplete.")
fmt.Println(err)
os.Exit(1)
}
return jsonStruct
}

func clusterInstallF(command *cobra.Command, args []string) error {
_, err := getAstarteOperator()
if err == nil {
Expand Down Expand Up @@ -156,9 +124,8 @@ func clusterInstallF(command *cobra.Command, args []string) error {
fmt.Println("RBAC Roles Successfully installed.")
fmt.Println("Installing Astarte Custom Resource Definitions...")

astarteCRD := unmarshalOperatorContentYAMLToJSON("deploy/crds/api_v1alpha1_astarte_crd.yaml", version)
_, err = kubernetesDynamicClient.Resource(crdResource).Create(&unstructured.Unstructured{Object: astarteCRD},
metav1.CreateOptions{})
astarteCRD := unmarshalYAML("deploy/crds/api_v1alpha1_astarte_crd.yaml", version)
_, err = kubernetesAPIExtensionsClient.ApiextensionsV1beta1().CustomResourceDefinitions().Create(astarteCRD.(*apiextensionsv1beta1.CustomResourceDefinition))
if err != nil {
if strings.Contains(err.Error(), "already exists") {
fmt.Println("WARNING: Astarte CRD already exists in the cluster.")
Expand All @@ -169,9 +136,8 @@ func clusterInstallF(command *cobra.Command, args []string) error {
}
}

astarteVoyagerIngressCRD := unmarshalOperatorContentYAMLToJSON("deploy/crds/api_v1alpha1_astarte_voyager_ingress_crd.yaml", version)
_, err = kubernetesDynamicClient.Resource(crdResource).Create(&unstructured.Unstructured{Object: astarteVoyagerIngressCRD},
metav1.CreateOptions{})
astarteVoyagerIngressCRD := unmarshalYAML("deploy/crds/api_v1alpha1_astarte_voyager_ingress_crd.yaml", version)
_, err = kubernetesAPIExtensionsClient.ApiextensionsV1beta1().CustomResourceDefinitions().Create(astarteVoyagerIngressCRD.(*apiextensionsv1beta1.CustomResourceDefinition))
if err != nil {
if strings.Contains(err.Error(), "already exists") {
fmt.Println("WARNING: AstarteVoyagerIngress CRD already exists in the cluster.")
Expand Down
4 changes: 2 additions & 2 deletions cmd/cluster/uninstall_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,15 +101,15 @@ func clusterUninstallF(command *cobra.Command, args []string) error {
}

// Delete Astarte CRD
err = kubernetesDynamicClient.Resource(crdResource).Delete("astartes.api.astarte-platform.org",
err = kubernetesAPIExtensionsClient.ApiextensionsV1beta1().CustomResourceDefinitions().Delete("astartes.api.astarte-platform.org",
&metav1.DeleteOptions{})
if err != nil {
fmt.Println("WARNING: Could not delete Astarte CRD.")
fmt.Println(err)
}

// Delete AstarteVoyagerIngress CRD
err = kubernetesDynamicClient.Resource(crdResource).Delete("astartevoyageringresses.api.astarte-platform.org",
err = kubernetesAPIExtensionsClient.ApiextensionsV1beta1().CustomResourceDefinitions().Delete("astartevoyageringresses.api.astarte-platform.org",
&metav1.DeleteOptions{})
if err != nil {
fmt.Println("WARNING: Could not delete Astarte CRD.")
Expand Down
255 changes: 255 additions & 0 deletions cmd/cluster/upgrade_operator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,255 @@
// Copyright © 2019 Ispirata Srl
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cluster

import (
"fmt"
"os"
"strings"

"github.com/Masterminds/semver/v3"
"github.com/astarte-platform/astartectl/utils"
"github.com/spf13/cobra"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/jsonmergepatch"
"k8s.io/apimachinery/pkg/util/mergepatch"
)

var upgradeOperatorCmd = &cobra.Command{
Use: "upgrade-operator",
Short: "Upgrade Astarte Operator in the current Kubernetes Cluster",
Long: `Upgrade Astarte Operator in the current Kubernetes Cluster. This will adhere to the same current-context
kubectl mentions. If no versions are specified, the last stable version is used as the upgrade target..`,
Example: ` astartectl cluster upgrade-operator`,
RunE: clusterUpgradeOperatorF,
}

func init() {
upgradeOperatorCmd.PersistentFlags().String("version", "", "Version of Astarte Operator to upgrade to. If not specified, last stable version will be installed (recommended)")
upgradeOperatorCmd.PersistentFlags().BoolP("non-interactive", "y", false, "Non-interactive mode. Will answer yes by default to all questions.")

ClusterCmd.AddCommand(upgradeOperatorCmd)
}

func clusterUpgradeOperatorF(command *cobra.Command, args []string) error {
currentAstarteOperator, err := getAstarteOperator()
if err != nil {
fmt.Println("Astarte Operator is not installed in your cluster. You probably want to use astartectl cluster install-operator.")
os.Exit(1)
}
currentAstarteOperatorVersion, err := semver.NewVersion(strings.Split(currentAstarteOperator.Spec.Template.Spec.Containers[0].Image, ":")[1])
if err != nil {
return err
}

version, err := command.Flags().GetString("version")
if err != nil {
return err
}
nonInteractive, err := command.Flags().GetBool("non-interactive")
if err != nil {
return err
}

if version == "" {
version, err = getLastOperatorRelease()
if err != nil {
fmt.Println(err)
os.Exit(1)
}
}
upgradeVersion, err := semver.NewVersion(version)
if err != nil {
fmt.Println(err)
os.Exit(1)
}

if !upgradeVersion.GreaterThan(currentAstarteOperatorVersion) {
fmt.Printf("You're currently running Astarte Operator version %s, no updates are available.\n", currentAstarteOperatorVersion)
return nil
}
if isUnstableVersion(currentAstarteOperatorVersion.Original()) {
baseVersion, err := getBaseVersionFromUnstable(currentAstarteOperatorVersion.Original())
if err != nil {
fmt.Println("Your cluster is currently running on snapshot - honestly, there isn't much I can do. If you're running a production cluster, I really hope you know what you're doing.")
fmt.Println("In case you didn't really mean to run on the most unstable thing you could run on, I strongly suggest running astartectl cluster uninstall-operator and astartectl cluster install-operator.")
os.Exit(1)
}
currentAstarteOperatorVersion, err = semver.NewVersion(baseVersion)
if err != nil {
fmt.Println(err)
os.Exit(1)
}
fmt.Printf("Your cluster is currently running on an unstable snapshot - which, by the way, is a bad idea. I'm happy to reconcile you to something more stable, and I'm assuming you're upgrading from %s.\n", currentAstarteOperatorVersion)
}
fmt.Printf("Will upgrade Astarte Operator to version %s.\n", version)

if !nonInteractive {
confirmation, err := utils.AskForConfirmation("Do you want to continue?")
if err != nil {
fmt.Println(err)
os.Exit(1)
}
if !confirmation {
return nil
}
}

// This section for now it's basically the same as we just need to upgrade all the resources. Moving forward, should the
// Operator change drammatically, we'll need proper cleanups+upgrades depending on the Operator version.

fmt.Println("Upgrading RBAC Roles...")

// Service Account
serviceAccount := unmarshalYAML("deploy/service_account.yaml", version)
_, err = kubernetesClient.CoreV1().ServiceAccounts("kube-system").Update(serviceAccount.(*corev1.ServiceAccount))
if err != nil {
if strings.Contains(err.Error(), "already exists") {
fmt.Println("WARNING: Service Account already exists in the cluster.")
} else {
fmt.Println("Error while deploying Service Account. Your deployment might be incomplete.")
fmt.Println(err)
os.Exit(1)
}
}

// Cluster Role
role := unmarshalYAML("deploy/role.yaml", version)
_, err = kubernetesClient.RbacV1().ClusterRoles().Update(role.(*rbacv1.ClusterRole))
if err != nil {
if strings.Contains(err.Error(), "already exists") {
fmt.Println("WARNING: Cluster Role already exists in the cluster.")
} else {
fmt.Println("Error while deploying Service Account. Your deployment might be incomplete.")
fmt.Println(err)
os.Exit(1)
}
}

// Cluster Role Binding
roleBinding := unmarshalYAML("deploy/role_binding.yaml", version)
_, err = kubernetesClient.RbacV1().ClusterRoleBindings().Update(roleBinding.(*rbacv1.ClusterRoleBinding))
if err != nil {
if strings.Contains(err.Error(), "already exists") {
fmt.Println("WARNING: Cluster Role Binding already exists in the cluster.")
} else {
fmt.Println("Error while deploying Service Account. Your deployment might be incomplete.")
fmt.Println(err)
os.Exit(1)
}
}

fmt.Println("RBAC Roles Successfully upgraded.")
fmt.Println("Upgrading Astarte Custom Resource Definitions...")

// This is where it gets tricky. For all supported CRDs, we need to either update or install them. When we update,
// we need to ensure that the resourceVersion is increased compared to the existing resource.

err = upgradeCRD("deploy/crds/api_v1alpha1_astarte_crd.yaml", version, currentAstarteOperatorVersion.Original())
if err != nil {
err = upgradeCRD("deploy/crds/api_v1alpha1_astarte_voyager_ingress_crd.yaml", version, currentAstarteOperatorVersion.Original())
}
if err != nil {
if strings.Contains(err.Error(), "already exists") {
fmt.Println("WARNING: AstarteVoyagerIngress CRD already exists in the cluster.")
} else {
fmt.Println("Error while deploying AstarteVoyagerIngress CRD. Your deployment might be incomplete.")
fmt.Println(err)
os.Exit(1)
}
}

fmt.Println("Astarte Custom Resource Definitions successfully upgraded.")
fmt.Println("Upgrading Astarte Operator...")

// Astarte Operator Deployment
astarteOperator := unmarshalYAML("deploy/operator.yaml", version)
astarteOperatorDeployment, err := kubernetesClient.AppsV1().Deployments("kube-system").Update(astarteOperator.(*appsv1.Deployment))
if err != nil {
fmt.Println("Error while deploying Astarte Operator Deployment. Your deployment might be incomplete.")
fmt.Println(err)
os.Exit(1)
}

fmt.Println("Astarte Operator successfully upgraded. Waiting until it is ready...")

var timeoutSeconds int64 = 60
watcher, err := kubernetesClient.AppsV1().Deployments("kube-system").Watch(metav1.ListOptions{TimeoutSeconds: &timeoutSeconds})
if err != nil {
fmt.Println("Could not watch the Deployment state. However, deployment might be complete. Check with astartectl cluster show in a while.")
fmt.Println(err)
os.Exit(1)
}
ch := watcher.ResultChan()
for {
event := <-ch
deployment, ok := event.Object.(*appsv1.Deployment)
if !ok {
break
}
if deployment.Name != astarteOperatorDeployment.GetObjectMeta().GetName() {
continue
}

if deployment.Status.ReadyReplicas >= 1 {
fmt.Println("Astarte Operator deployment ready! Check the state of your cluster with astartectl cluster show. Note that you might need to upgrade some of your Astarte instances depending on your Operator version.")
return nil
}
}

fmt.Println("Could not verify if Astarte Operator Deployment was successful. Please check the state of your cluster with astartectl cluster show.")
os.Exit(1)
return nil
}

func upgradeCRD(path, version, originalVersion string) error {
// TODO: Handle v1, when we start planning on supporting it.
crd := unmarshalYAML(path, version)
currentCRD, err := kubernetesDynamicClient.Resource(crdResource).Get(
crd.(*apiextensionsv1beta1.CustomResourceDefinition).Name, metav1.GetOptions{})
if err != nil || currentCRD == nil {
// It does not exist - go ahead and install it.
_, err = kubernetesAPIExtensionsClient.ApiextensionsV1beta1().CustomResourceDefinitions().Create(
crd.(*apiextensionsv1beta1.CustomResourceDefinition))
if err != nil {
return err
}
} else {
// Move to a 3-way JSON Merge patch
originalCRD := unmarshalYAML(path, originalVersion)
crdJSON, err := runtimeObjectToJSON(crd)
currentCRDJSON, err := runtimeObjectToJSON(currentCRD)
originalCRDJSON, err := runtimeObjectToJSON(originalCRD)

preconditions := []mergepatch.PreconditionFunc{mergepatch.RequireKeyUnchanged("apiVersion"),
mergepatch.RequireKeyUnchanged("kind"), mergepatch.RequireMetadataKeyUnchanged("name")}
patch, err := jsonmergepatch.CreateThreeWayJSONMergePatch(originalCRDJSON, crdJSON, currentCRDJSON,
preconditions...)

_, err = kubernetesAPIExtensionsClient.ApiextensionsV1beta1().CustomResourceDefinitions().Patch(
crd.(*apiextensionsv1beta1.CustomResourceDefinition).Name, types.MergePatchType, patch)
if err != nil {
return err
}
}

// All good.
return nil
}
Loading

0 comments on commit 07de008

Please sign in to comment.