Skip to content

Commit

Permalink
chore: fix operator tests
Browse files Browse the repository at this point in the history
  • Loading branch information
moshloop committed Dec 27, 2023
1 parent d9f0fa2 commit a9aa244
Show file tree
Hide file tree
Showing 13 changed files with 125 additions and 113 deletions.
14 changes: 14 additions & 0 deletions api/v1/system_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package v1

import (
"fmt"
"time"

"github.com/flanksource/commons/hash"
"github.com/flanksource/duty/types"
"github.com/robfig/cron/v3"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

Expand Down Expand Up @@ -35,6 +37,18 @@ type TopologySpec struct {
Configs []types.ConfigQuery `json:"configs,omitempty"`
}

func (s Topology) NextRuntime() (*time.Time, error) {
if s.Spec.Schedule != "" {
schedule, err := cron.ParseStandard(s.Spec.Schedule)
if err != nil {
return nil, err
}
t := schedule.Next(time.Now())
return &t, nil
}
return nil, nil
}

func (s Topology) String() string {
return fmt.Sprintf("%s/%s", s.Namespace, s.Name)
}
Expand Down
11 changes: 7 additions & 4 deletions cmd/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,9 @@ func run(cmd *cobra.Command, args []string) {
if err != nil {
logger.Fatalf(err.Error())
}

if ctx.DB() == nil {
logger.Fatalf("operator requires db connections")
logger.Fatalf("operator requires a db connection")
}
if ctx.Kommons() == nil {
logger.Fatalf("operator requires a kubernetes connection")
Expand Down Expand Up @@ -123,6 +124,7 @@ func run(cmd *cobra.Command, args []string) {
runner.RunnerLabels = labels.LoadFromFile("/etc/podinfo/labels")

canaryReconciler := &controllers.CanaryReconciler{
Context: apicontext.DefaultContext,
Client: mgr.GetClient(),
LogPass: logPass,
LogFail: logFail,
Expand All @@ -133,9 +135,10 @@ func run(cmd *cobra.Command, args []string) {
}

systemReconciler := &controllers.TopologyReconciler{
Client: mgr.GetClient(),
Log: ctrl.Log.WithName("controllers").WithName("system"),
Scheme: mgr.GetScheme(),
Context: apicontext.DefaultContext,
Client: mgr.GetClient(),
Log: ctrl.Log.WithName("controllers").WithName("system"),
Scheme: mgr.GetScheme(),
}
if err = mgr.Add(manager.RunnableFunc(db.Start)); err != nil {
setupLog.Error(err, "unable to Add manager")
Expand Down
2 changes: 0 additions & 2 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,8 @@ func InitContext() (context.Context, error) {

if ctx, err = db.Init(); err != nil {
logger.Warnf("error connecting to db %v", err)
} else {
ctx = context.New()
}

return ctx.
WithKubernetes(k8s).
WithKommons(kommonsClient), nil
Expand Down
6 changes: 3 additions & 3 deletions pkg/api/run_now.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,9 @@ func RunTopologyHandler(c echo.Context) error {
Depth: topologyRunDepth,
Namespace: topology.Namespace,
}
if err := pkgTopology.SyncComponents(opts, *topology); err != nil {
if count, err := pkgTopology.SyncComponents(opts, *topology); err != nil {
return errorResponse(c, err, http.StatusInternalServerError)
} else {
return c.JSON(http.StatusOK, map[string]string{"status": "ok", "count": fmt.Sprint(count)})
}

return c.JSON(http.StatusOK, map[string]string{"status": "ok"})
}
12 changes: 3 additions & 9 deletions pkg/controllers/canary_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,18 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/flanksource/canary-checker/api/context"

v1 "github.com/flanksource/canary-checker/api/v1"
"github.com/flanksource/canary-checker/pkg"
canaryJobs "github.com/flanksource/canary-checker/pkg/jobs/canary"
"github.com/flanksource/canary-checker/pkg/runner"
dutyContext "github.com/flanksource/duty/context"
"github.com/flanksource/kommons"
"github.com/go-logr/logr"
jsontime "github.com/liamylian/jsontime/v2/v2"
"github.com/nsf/jsondiff"
"github.com/patrickmn/go-cache"
"github.com/robfig/cron/v3"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -53,8 +49,7 @@ var json = jsontime.ConfigWithCustomTimeFormat
type CanaryReconciler struct {
LogPass, LogFail bool
client.Client
Kubernetes kubernetes.Interface
Kommons *kommons.Client
dutyContext.Context
Log logr.Logger
Scheme *runtime.Scheme
Events record.EventRecorder
Expand All @@ -71,11 +66,9 @@ const FinalizerName = "canary.canaries.flanksource.com"
// +kubebuilder:rbac:groups="",resources=pods/exec,verbs=*
// +kubebuilder:rbac:groups="",resources=pods/logs,verbs=*
func (r *CanaryReconciler) Reconcile(parentCtx gocontext.Context, req ctrl.Request) (ctrl.Result, error) {
ctx := context.DefaultContext.Wrap(parentCtx)

logger := r.Log.WithValues("canary", req.NamespacedName)
canary := &v1.Canary{}
err := r.Get(ctx, req.NamespacedName, canary)
err := r.Get(parentCtx, req.NamespacedName, canary)
if errors.IsNotFound(err) {
return ctrl.Result{}, nil
} else if err != nil {
Expand All @@ -85,6 +78,7 @@ func (r *CanaryReconciler) Reconcile(parentCtx gocontext.Context, req ctrl.Reque
if runner.IsCanaryIgnored(&canary.ObjectMeta) {
return ctrl.Result{}, nil
}
ctx := r.Context.WithObject(canary.ObjectMeta)

canary.SetRunnerName(r.RunnerName)

Expand Down
3 changes: 2 additions & 1 deletion pkg/controllers/system_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (

// TopologyReconciler reconciles a Canary object
type TopologyReconciler struct {
dutyContext.Context
client.Client
Log logr.Logger
Scheme *runtime.Scheme
Expand All @@ -46,14 +47,14 @@ const TopologyFinalizerName = "topology.canaries.flanksource.com"
// +kubebuilder:rbac:groups=canaries.flanksource.com,resources=topologies,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=canaries.flanksource.com,resources=topologies/status,verbs=get;update;patch
func (r *TopologyReconciler) Reconcile(ctx gocontext.Context, req ctrl.Request) (ctrl.Result, error) {
dCtx := dutyContext.NewContext(ctx)
logger := r.Log.WithValues("topology", req.NamespacedName)
topology := &v1.Topology{}
err := r.Get(ctx, req.NamespacedName, topology)
if errors.IsNotFound(err) {
logger.V(1).Info("Topology not found")
return ctrl.Result{}, nil
}
dCtx := r.Context.WithObject(topology.ObjectMeta)
if !controllerutil.ContainsFinalizer(topology, TopologyFinalizerName) {
controllerutil.AddFinalizer(topology, TopologyFinalizerName)
if err := r.Client.Update(ctx, topology); err != nil {
Expand Down
39 changes: 17 additions & 22 deletions pkg/db/topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,40 +60,35 @@ func GetTopology(ctx context.Context, id string) (*v1.Topology, error) {

// TODO: Simplify logic and improve readability
func PersistComponent(ctx context.Context, component *pkg.Component) ([]uuid.UUID, error) {
existing := models.Component{}
var existing *models.Component
var err error
var persisted []uuid.UUID
db := ctx.DB()
var tx *gorm.DB
if component.ID == uuid.Nil {
if component.ParentId == nil {
tx = db.Find(existing, "name = ? AND type = ? and parent_id is NULL", component.Name, component.Type)
} else {
tx = db.Find(existing, "name = ? AND type = ? and parent_id = ?", component.Name, component.Type, component.ParentId)
}
} else {
if component.ParentId == nil {
tx = db.Find(existing, "topology_id = ? AND name = ? AND type = ? and parent_id is NULL", component.TopologyID, component.Name, component.Type)
} else {
tx = db.Find(existing, "topology_id = ? AND name = ? AND type = ? and parent_id = ?", component.TopologyID, component.Name, component.Type, component.ParentId)
}
}
if tx.Error != nil {
return persisted, fmt.Errorf("error finding component: %v", tx.Error)

existing, err = component.FindExisting(db)
if err != nil {
return persisted, fmt.Errorf("error finding component: %v", err)
}

if existing.ID != uuid.Nil {
tx := db.Table("components")
if existing != nil && existing.ID != uuid.Nil {
component.ID = existing.ID
tx = db.Table("components").Clauses(
tx = tx.Clauses(
clause.OnConflict{
Columns: []clause.Column{{Name: "topology_id"}, {Name: "name"}, {Name: "type"}, {Name: "parent_id"}},
UpdateAll: true,
},
).UpdateColumns(component)

// Since gorm ignores nil fields, we are setting deleted_at explicitly
db.Table("components").Where("id = ?", existing.ID).UpdateColumn("deleted_at", nil)
if existing.DeletedAt != component.DeletedAt {
// Since gorm ignores nil fields, we are setting deleted_at explicitly
if err := db.Table("components").Where("id = ?", existing.ID).UpdateColumn("deleted_at", nil).Error; err != nil {
return nil, fmt.Errorf("failed to undelete: %v", err)
}
}

Check failure on line 89 in pkg/db/topology.go

View workflow job for this annotation

GitHub Actions / lint

unnecessary trailing newline (whitespace)
} else {
tx = db.Table("components").Clauses(
tx = tx.Clauses(
clause.OnConflict{
Columns: []clause.Column{{Name: "topology_id"}, {Name: "name"}, {Name: "type"}, {Name: "parent_id"}},
UpdateAll: true,
Expand Down
106 changes: 46 additions & 60 deletions pkg/jobs/topology/topology_jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@ package topology
import (
"fmt"
"reflect"

gocontext "context"
"time"
"sync"

v1 "github.com/flanksource/canary-checker/api/v1"
"github.com/flanksource/canary-checker/pkg/db"
Expand All @@ -15,28 +13,37 @@ import (
"github.com/flanksource/duty/context"
"github.com/flanksource/duty/job"
"github.com/robfig/cron/v3"
"k8s.io/apimachinery/pkg/types"
)

var TopologyScheduler = cron.New()

type TopologyJob struct {
context.Context
v1.Topology
}

func (job TopologyJob) GetNamespacedName() types.NamespacedName {
return types.NamespacedName{Name: job.Topology.Name, Namespace: job.Topology.Namespace}
}

func (job TopologyJob) Run() {
opts := pkgTopology.TopologyRunOptions{
Context: job.Context.Wrap(gocontext.Background()),
Depth: 10,
Namespace: job.Namespace,
var topologyJobs sync.Map

func newTopologyJob(ctx context.Context, topology v1.Topology) {
j := &job.Job{
Name: "TopologyRun",
Context: ctx.WithObject(topology.ObjectMeta),
Schedule: topology.Spec.Schedule,
JobHistory: true,
Retention: job.RetentionHour,
ID: topology.GetPersistedID(),
Fn: func(ctx job.JobRuntime) error {
ctx.History.ResourceID = topology.GetPersistedID()
ctx.History.ResourceType = "topology"
opts := pkgTopology.TopologyRunOptions{
Context: ctx.Context,
Depth: 10,
Namespace: topology.Namespace,
}
count, err := pkgTopology.SyncComponents(opts, topology)
ctx.History.SuccessCount = count
return err
},
}
if err := pkgTopology.SyncComponents(opts, job.Topology); err != nil {
logger.Errorf("failed to run topology %s: %v", job.GetNamespacedName(), err)

topologyJobs.Store(topology.GetPersistedID(), j)
if err := j.AddToScheduler(TopologyScheduler); err != nil {
logger.Errorf("[%s] failed to schedule %v", *j, err)
}
}

Expand Down Expand Up @@ -65,58 +72,37 @@ var SyncTopology = &job.Job{
}

func SyncTopologyJob(ctx context.Context, t v1.Topology) error {
id := t.GetPersistedID()
var existingJob *job.Job
if j, ok := topologyJobs.Load(id); ok {
existingJob = j.(*job.Job)
}
if !t.DeletionTimestamp.IsZero() || t.Spec.GetSchedule() == "@never" {
DeleteTopologyJob(t.GetPersistedID())
existingJob.Unschedule()
topologyJobs.Delete(id)
return nil
}

entry := findTopologyCronEntry(t.GetPersistedID())
if entry != nil {
job := entry.Job.(TopologyJob)
if !reflect.DeepEqual(job.Topology.Spec, t.Spec) {
logger.Infof("Rescheduling %s topology with updated specs", t)
TopologyScheduler.Remove(entry.ID)
} else {
return nil
}
}
job := TopologyJob{
Context: ctx.Wrap(gocontext.Background()).WithObject(t.ObjectMeta),
Topology: t,
}

_, err := TopologyScheduler.AddJob(t.Spec.GetSchedule(), job)
if err != nil {
return fmt.Errorf("failed to schedule topology %s/%s: %v", t.Namespace, t.Name, err)
} else {
logger.Infof("Scheduled %s/%s: %s", t.Namespace, t.Name, t.Spec.GetSchedule())
}

entry = findTopologyCronEntry(t.GetPersistedID())
if entry != nil && time.Until(entry.Next) < 1*time.Hour {
// run all regular topologies on startup
job = entry.Job.(TopologyJob)
go job.Run()
if existingJob == nil {
newTopologyJob(ctx, t)
return nil
}
return nil
}

func findTopologyCronEntry(id string) *cron.Entry {
for _, entry := range TopologyScheduler.Entries() {
if entry.Job.(TopologyJob).GetPersistedID() == id {
return &entry
}
existingTopology := existingJob.Context.Value("topology")
if existingTopology != nil && !reflect.DeepEqual(existingTopology.(v1.Topology).Spec, t.Spec) {
ctx.Debugf("Rescheduling %s topology with updated specs", t)
existingJob.Unschedule()
newTopologyJob(ctx, t)
}
return nil
}

func DeleteTopologyJob(id string) {
entry := findTopologyCronEntry(id)
if entry == nil {
return
if j, ok := topologyJobs.Load(id); ok {
existingJob := j.(*job.Job)
existingJob.Unschedule()
topologyJobs.Delete(id)
}
logger.Tracef("deleting cron entry for topology:%s with entry ID: %v", id, entry.ID)
TopologyScheduler.Remove(entry.ID)
}

var CleanupComponents = &job.Job{
Expand Down
19 changes: 19 additions & 0 deletions pkg/system_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,25 @@ type Component struct {
LogSelectors dutyTypes.LogSelectors `json:"logs,omitempty" gorm:"column:log_selectors"`
}

func (component *Component) FindExisting(db *gorm.DB) (*models.Component, error) {
var existing models.Component
tx := db.Model(component).Select("id", "deleted_at")
if component.ID == uuid.Nil {
if component.ParentId == nil {
tx = tx.Find(&existing, "name = ? AND type = ? and parent_id is NULL", component.Name, component.Type)
} else {
tx = tx.Find(&existing, "name = ? AND type = ? and parent_id = ?", component.Name, component.Type, component.ParentId).Pluck("id", &existing)
}
} else {
if component.ParentId == nil {
tx = tx.Find(&existing, "topology_id = ? AND name = ? AND type = ? and parent_id is NULL", component.TopologyID, component.Name, component.Type).Pluck("id", &existing)
} else {
tx = tx.Find(&existing, "topology_id = ? AND name = ? AND type = ? and parent_id = ?", component.TopologyID, component.Name, component.Type, component.ParentId).Pluck("id", &existing)
}
}
return &existing, tx.Error
}

func (component *Component) GetConfigs(db *gorm.DB) (relationships []models.ConfigComponentRelationship, err error) {
err = db.Where("component_id = ? AND deleted_at IS NULL", component.ID).Find(&relationships).Error
return relationships, err
Expand Down
Loading

0 comments on commit a9aa244

Please sign in to comment.