Skip to content

Commit

Permalink
plugin framework async external client
Browse files Browse the repository at this point in the history
Signed-off-by: Erhan Cagirici <erhan@upbound.io>
  • Loading branch information
erhancagirici committed Jan 5, 2024
1 parent 15b4781 commit 9fdf5ee
Show file tree
Hide file tree
Showing 2 changed files with 215 additions and 2 deletions.
201 changes: 201 additions & 0 deletions pkg/controller/external_async_terraform_plugin_framework.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
// SPDX-FileCopyrightText: 2023 The Crossplane Authors <https://crossplane.io>
//
// SPDX-License-Identifier: Apache-2.0

package controller

import (
"context"

"github.com/crossplane/crossplane-runtime/pkg/logging"
"github.com/crossplane/crossplane-runtime/pkg/meta"
"github.com/crossplane/crossplane-runtime/pkg/reconciler/managed"
xpresource "github.com/crossplane/crossplane-runtime/pkg/resource"
"github.com/hashicorp/terraform-plugin-framework/provider"
"github.com/pkg/errors"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/crossplane/upjet/pkg/config"
"github.com/crossplane/upjet/pkg/controller/handler"
"github.com/crossplane/upjet/pkg/metrics"
"github.com/crossplane/upjet/pkg/resource"
"github.com/crossplane/upjet/pkg/terraform"
tferrors "github.com/crossplane/upjet/pkg/terraform/errors"
)

type TerraformPluginFrameworkAsyncConnector struct {
*TerraformPluginFrameworkConnector
callback CallbackProvider
eventHandler *handler.EventHandler
}

type TerraformPluginFrameworkAsyncOption func(connector *TerraformPluginFrameworkAsyncConnector)

func NewTerraformPluginFrameworkAsyncConnector(kube client.Client,
ots *OperationTrackerStore,
sf terraform.SetupFn,
cfg *config.Resource,
provider *provider.Provider,
opts ...TerraformPluginFrameworkAsyncOption) *TerraformPluginFrameworkAsyncConnector {
nfac := &TerraformPluginFrameworkAsyncConnector{
TerraformPluginFrameworkConnector: NewTerraformPluginFrameworkConnector(kube, sf, cfg, ots, provider),
}
for _, f := range opts {
f(nfac)
}
return nfac
}

func (c *TerraformPluginFrameworkAsyncConnector) Connect(ctx context.Context, mg xpresource.Managed) (managed.ExternalClient, error) {
ec, err := c.TerraformPluginFrameworkConnector.Connect(ctx, mg)
if err != nil {
return nil, errors.Wrap(err, "cannot initialize the no-fork async external client")
}

return &terraformPluginFrameworkAsyncExternalClient{
terraformPluginFrameworkExternalClient: ec.(*terraformPluginFrameworkExternalClient),
callback: c.callback,
eventHandler: c.eventHandler,
}, nil
}

// WithTerraformPluginFrameworkAsyncConnectorEventHandler configures the EventHandler so that
// the no-fork external clients can requeue reconciliation requests.
func WithTerraformPluginFrameworkAsyncConnectorEventHandler(e *handler.EventHandler) TerraformPluginFrameworkAsyncOption {
return func(c *TerraformPluginFrameworkAsyncConnector) {
c.eventHandler = e
}
}

// WithTerraformPluginFrameworkAsyncCallbackProvider configures the controller to use async variant of the functions
// of the Terraform client and run given callbacks once those operations are
// completed.
func WithTerraformPluginFrameworkAsyncCallbackProvider(ac CallbackProvider) TerraformPluginFrameworkAsyncOption {
return func(c *TerraformPluginFrameworkAsyncConnector) {
c.callback = ac
}
}

// WithTerraformPluginFrameworkAsyncLogger configures a logger for the TerraformPluginFrameworkAsyncConnector.
func WithTerraformPluginFrameworkAsyncLogger(l logging.Logger) TerraformPluginFrameworkAsyncOption {
return func(c *TerraformPluginFrameworkAsyncConnector) {
c.logger = l
}
}

// WithTerraformPluginFrameworkAsyncMetricRecorder configures a metrics.MetricRecorder for the
// TerraformPluginFrameworkAsyncConnector.
func WithTerraformPluginFrameworkAsyncMetricRecorder(r *metrics.MetricRecorder) TerraformPluginFrameworkAsyncOption {
return func(c *TerraformPluginFrameworkAsyncConnector) {
c.metricRecorder = r
}
}

// WithTerraformPluginFrameworkAsyncManagementPolicies configures whether the client should
// handle management policies.
func WithTerraformPluginFrameworkAsyncManagementPolicies(isManagementPoliciesEnabled bool) TerraformPluginFrameworkAsyncOption {
return func(c *TerraformPluginFrameworkAsyncConnector) {
c.isManagementPoliciesEnabled = isManagementPoliciesEnabled
}
}

type terraformPluginFrameworkAsyncExternalClient struct {
*terraformPluginFrameworkExternalClient
callback CallbackProvider
eventHandler *handler.EventHandler
}

func (n *terraformPluginFrameworkAsyncExternalClient) Observe(ctx context.Context, mg xpresource.Managed) (managed.ExternalObservation, error) {
if n.opTracker.LastOperation.IsRunning() {
n.logger.WithValues("opType", n.opTracker.LastOperation.Type).Debug("ongoing async operation")
return managed.ExternalObservation{
ResourceExists: true,
ResourceUpToDate: true,
}, nil
}
n.opTracker.LastOperation.Flush()

o, err := n.terraformPluginFrameworkExternalClient.Observe(ctx, mg)
// clear any previously reported LastAsyncOperation error condition here,
// because there are no pending updates on the existing resource and it's
// not scheduled to be deleted.
if err == nil && o.ResourceExists && o.ResourceUpToDate && !meta.WasDeleted(mg) {
mg.(resource.Terraformed).SetConditions(resource.LastAsyncOperationCondition(nil))
}
return o, err
}

func (n *terraformPluginFrameworkAsyncExternalClient) Create(_ context.Context, mg xpresource.Managed) (managed.ExternalCreation, error) {
if !n.opTracker.LastOperation.MarkStart("create") {
return managed.ExternalCreation{}, errors.Errorf("%s operation that started at %s is still running", n.opTracker.LastOperation.Type, n.opTracker.LastOperation.StartTime().String())
}

ctx, cancel := context.WithDeadline(context.Background(), n.opTracker.LastOperation.StartTime().Add(defaultAsyncTimeout))
go func() {
defer cancel()

n.opTracker.logger.Debug("Async create starting...", "tfID", n.opTracker.GetFrameworkTFID())
_, err := n.terraformPluginFrameworkExternalClient.Create(ctx, mg)
err = tferrors.NewAsyncCreateFailed(err)
n.opTracker.LastOperation.SetError(err)
n.opTracker.logger.Debug("Async create ended.", "error", err, "tfID", n.opTracker.GetFrameworkTFID())

n.opTracker.LastOperation.MarkEnd()
if cErr := n.callback.Create(mg.GetName())(err, ctx); cErr != nil {
n.opTracker.logger.Info("Async create callback failed", "error", cErr.Error())
}
}()

return managed.ExternalCreation{}, nil
}

func (n *terraformPluginFrameworkAsyncExternalClient) Update(_ context.Context, mg xpresource.Managed) (managed.ExternalUpdate, error) {
if !n.opTracker.LastOperation.MarkStart("update") {
return managed.ExternalUpdate{}, errors.Errorf("%s operation that started at %s is still running", n.opTracker.LastOperation.Type, n.opTracker.LastOperation.StartTime().String())
}

ctx, cancel := context.WithDeadline(context.Background(), n.opTracker.LastOperation.StartTime().Add(defaultAsyncTimeout))
go func() {
defer cancel()

n.opTracker.logger.Debug("Async update starting...", "tfID", n.opTracker.GetFrameworkTFID())
_, err := n.terraformPluginFrameworkExternalClient.Update(ctx, mg)
err = tferrors.NewAsyncUpdateFailed(err)
n.opTracker.LastOperation.SetError(err)
n.opTracker.logger.Debug("Async update ended.", "error", err, "tfID", n.opTracker.GetFrameworkTFID())

n.opTracker.LastOperation.MarkEnd()
if cErr := n.callback.Update(mg.GetName())(err, ctx); cErr != nil {
n.opTracker.logger.Info("Async update callback failed", "error", cErr.Error())
}
}()

return managed.ExternalUpdate{}, nil
}

func (n *terraformPluginFrameworkAsyncExternalClient) Delete(_ context.Context, mg xpresource.Managed) error {
switch {
case n.opTracker.LastOperation.Type == "delete":
n.opTracker.logger.Debug("The previous delete operation is still ongoing", "tfID", n.opTracker.GetFrameworkTFID())
return nil
case !n.opTracker.LastOperation.MarkStart("delete"):
return errors.Errorf("%s operation that started at %s is still running", n.opTracker.LastOperation.Type, n.opTracker.LastOperation.StartTime().String())
}

ctx, cancel := context.WithDeadline(context.Background(), n.opTracker.LastOperation.StartTime().Add(defaultAsyncTimeout))
go func() {
defer cancel()

n.opTracker.logger.Debug("Async delete starting...", "tfID", n.opTracker.GetFrameworkTFID())
err := tferrors.NewAsyncDeleteFailed(n.terraformPluginFrameworkExternalClient.Delete(ctx, mg))
n.opTracker.LastOperation.SetError(err)
n.opTracker.logger.Debug("Async delete ended.", "error", err, "tfID", n.opTracker.GetFrameworkTFID())

n.opTracker.LastOperation.MarkEnd()
if cErr := n.callback.Destroy(mg.GetName())(err, ctx); cErr != nil {
n.opTracker.logger.Info("Async delete callback failed", "error", cErr.Error())
}
}()

return nil
}
16 changes: 14 additions & 2 deletions pkg/pipeline/templates/controller.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ func Setup(mgr ctrl.Manager, o tjcontroller.Options) error {
cps = append(cps, connection.NewDetailsManager(mgr.GetClient(), *o.SecretStoreConfigGVK, connection.WithTLSConfig(o.ESSOptions.TLSConfig)))
}
eventHandler := handler.NewEventHandler(handler.WithLogger(o.Logger.WithValues("gvk", {{ .TypePackageAlias }}{{ .CRD.Kind }}_GroupVersionKind)))
{{- if and .UseAsync (not .UseTerraformPluginFrameworkClient) }}
ac := tjcontroller.NewAPICallbacks(mgr, xpresource.ManagedKind({{ .TypePackageAlias }}{{ .CRD.Kind }}_GroupVersionKind), tjcontroller.WithEventHandler(eventHandler){{ if .UseNoForkClient }}, tjcontroller.WithStatusUpdates(false){{ end }})
{{- if .UseAsync }}
ac := tjcontroller.NewAPICallbacks(mgr, xpresource.ManagedKind({{ .TypePackageAlias }}{{ .CRD.Kind }}_GroupVersionKind), tjcontroller.WithEventHandler(eventHandler){{ if or .UseNoForkClient .UseTerraformPluginFrameworkClient }}, tjcontroller.WithStatusUpdates(false){{ end }})
{{- end}}
opts := []managed.ReconcilerOption{
managed.WithExternalConnecter(
Expand All @@ -67,13 +67,25 @@ func Setup(mgr ctrl.Manager, o tjcontroller.Options) error {
)
{{- end -}}
{{- else if .UseTerraformPluginFrameworkClient -}}
{{- if .UseAsync }}
tjcontroller.NewTerraformPluginFrameworkAsyncConnector(mgr.GetClient(), o.OperationTrackerStore, o.SetupFn, o.Provider.Resources["{{ .ResourceType }}"], o.Provider.TerraformPluginFrameworkProvider,
tjcontroller.WithTerraformPluginFrameworkAsyncLogger(o.Logger),
tjcontroller.WithTerraformPluginFrameworkAsyncConnectorEventHandler(eventHandler),
tjcontroller.WithTerraformPluginFrameworkAsyncCallbackProvider(ac),
tjcontroller.WithTerraformPluginFrameworkAsyncMetricRecorder(metrics.NewMetricRecorder({{ .TypePackageAlias }}{{ .CRD.Kind }}_GroupVersionKind, mgr, o.PollInterval)),
{{if .FeaturesPackageAlias -}}
tjcontroller.WithTerraformPluginFrameworkAsyncManagementPolicies(o.Features.Enabled({{ .FeaturesPackageAlias }}EnableBetaManagementPolicies))
{{- end -}}
)
{{- else }}
tjcontroller.NewTerraformPluginFrameworkConnector(mgr.GetClient(), o.SetupFn, o.Provider.Resources["{{ .ResourceType }}"], o.OperationTrackerStore, o.Provider.TerraformPluginFrameworkProvider,
tjcontroller.WithTerraformPluginFrameworkLogger(o.Logger),
tjcontroller.WithTerraformPluginFrameworkMetricRecorder(metrics.NewMetricRecorder({{ .TypePackageAlias }}{{ .CRD.Kind }}_GroupVersionKind, mgr, o.PollInterval)),
{{if .FeaturesPackageAlias -}}
tjcontroller.WithTerraformPluginFrameworkManagementPolicies(o.Features.Enabled({{ .FeaturesPackageAlias }}EnableBetaManagementPolicies))
{{- end -}}
)
{{- end }}
{{- else -}}
tjcontroller.NewConnector(mgr.GetClient(), o.WorkspaceStore, o.SetupFn, o.Provider.Resources["{{ .ResourceType }}"], tjcontroller.WithLogger(o.Logger), tjcontroller.WithConnectorEventHandler(eventHandler),
{{- if .UseAsync }}
Expand Down

0 comments on commit 9fdf5ee

Please sign in to comment.