diff --git a/api/v1/system_types.go b/api/v1/system_types.go
index 2f5ffeb90..826ab652b 100644
--- a/api/v1/system_types.go
+++ b/api/v1/system_types.go
@@ -2,9 +2,11 @@ package v1
 
 import (
 	"fmt"
+	"time"
 
 	"github.com/flanksource/commons/hash"
 	"github.com/flanksource/duty/types"
+	"github.com/robfig/cron/v3"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 )
 
@@ -35,6 +37,18 @@ type TopologySpec struct {
 	Configs []types.ConfigQuery `json:"configs,omitempty"`
 }
 
+func (s Topology) NextRuntime() (*time.Time, error) {
+	if s.Spec.Schedule != "" {
+		schedule, err := cron.ParseStandard(s.Spec.Schedule)
+		if err != nil {
+			return nil, err
+		}
+		t := schedule.Next(time.Now())
+		return &t, nil
+	}
+	return nil, nil
+}
+
 func (s Topology) String() string {
 	return fmt.Sprintf("%s/%s", s.Namespace, s.Name)
 }
diff --git a/cmd/operator.go b/cmd/operator.go
index f05ba9125..35349a55c 100644
--- a/cmd/operator.go
+++ b/cmd/operator.go
@@ -74,8 +74,9 @@ func run(cmd *cobra.Command, args []string) {
 	if err != nil {
 		logger.Fatalf(err.Error())
 	}
+
 	if ctx.DB() == nil {
-		logger.Fatalf("operator requires db connections")
+		logger.Fatalf("operator requires a db connection")
 	}
 	if ctx.Kommons() == nil {
 		logger.Fatalf("operator requires a kubernetes connection")
@@ -123,6 +124,7 @@ func run(cmd *cobra.Command, args []string) {
 	runner.RunnerLabels = labels.LoadFromFile("/etc/podinfo/labels")
 
 	canaryReconciler := &controllers.CanaryReconciler{
+		Context: apicontext.DefaultContext,
 		Client:  mgr.GetClient(),
 		LogPass: logPass,
 		LogFail: logFail,
@@ -133,9 +135,10 @@ func run(cmd *cobra.Command, args []string) {
 	}
 
 	systemReconciler := &controllers.TopologyReconciler{
-		Client: mgr.GetClient(),
-		Log:    ctrl.Log.WithName("controllers").WithName("system"),
-		Scheme: mgr.GetScheme(),
+		Context: apicontext.DefaultContext,
+		Client:  mgr.GetClient(),
+		Log:     ctrl.Log.WithName("controllers").WithName("system"),
+		Scheme:  mgr.GetScheme(),
 	}
 	if err = mgr.Add(manager.RunnableFunc(db.Start)); err != nil {
 		setupLog.Error(err, "unable to Add manager")
diff --git a/cmd/root.go b/cmd/root.go
index 942e17828..0f9a1eb1c 100644
--- a/cmd/root.go
+++ b/cmd/root.go
@@ -31,10 +31,8 @@ func InitContext() (context.Context, error) {
 
 	if ctx, err = db.Init(); err != nil {
 		logger.Warnf("error connecting to db %v", err)
-	} else {
 		ctx = context.New()
 	}
-
 	return ctx.
 		WithKubernetes(k8s).
 		WithKommons(kommonsClient), nil
diff --git a/pkg/api/run_now.go b/pkg/api/run_now.go
index 173e18887..bbe8d9293 100644
--- a/pkg/api/run_now.go
+++ b/pkg/api/run_now.go
@@ -110,9 +110,9 @@ func RunTopologyHandler(c echo.Context) error {
 		Depth:     topologyRunDepth,
 		Namespace: topology.Namespace,
 	}
-	if err := pkgTopology.SyncComponents(opts, *topology); err != nil {
+	if count, err := pkgTopology.SyncComponents(opts, *topology); err != nil {
 		return errorResponse(c, err, http.StatusInternalServerError)
+	} else {
+		return c.JSON(http.StatusOK, map[string]string{"status": "ok", "count": fmt.Sprint(count)})
 	}
-
-	return c.JSON(http.StatusOK, map[string]string{"status": "ok"})
 }
diff --git a/pkg/controllers/canary_controller.go b/pkg/controllers/canary_controller.go
index 40f584e98..97f1351be 100644
--- a/pkg/controllers/canary_controller.go
+++ b/pkg/controllers/canary_controller.go
@@ -25,14 +25,11 @@ import (
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 
-	"github.com/flanksource/canary-checker/api/context"
-
 	v1 "github.com/flanksource/canary-checker/api/v1"
 	"github.com/flanksource/canary-checker/pkg"
 	canaryJobs "github.com/flanksource/canary-checker/pkg/jobs/canary"
 	"github.com/flanksource/canary-checker/pkg/runner"
 	dutyContext "github.com/flanksource/duty/context"
-	"github.com/flanksource/kommons"
 	"github.com/go-logr/logr"
 	jsontime "github.com/liamylian/jsontime/v2/v2"
 	"github.com/nsf/jsondiff"
@@ -40,7 +37,6 @@ import (
 	"github.com/robfig/cron/v3"
 	"k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/runtime"
-	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/tools/record"
 	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/client"
@@ -53,8 +49,7 @@ var json = jsontime.ConfigWithCustomTimeFormat
 type CanaryReconciler struct {
 	LogPass, LogFail bool
 	client.Client
-	Kubernetes kubernetes.Interface
-	Kommons    *kommons.Client
+	dutyContext.Context
 	Log        logr.Logger
 	Scheme     *runtime.Scheme
 	Events     record.EventRecorder
@@ -71,11 +66,9 @@ const FinalizerName = "canary.canaries.flanksource.com"
 // +kubebuilder:rbac:groups="",resources=pods/exec,verbs=*
 // +kubebuilder:rbac:groups="",resources=pods/logs,verbs=*
 func (r *CanaryReconciler) Reconcile(parentCtx gocontext.Context, req ctrl.Request) (ctrl.Result, error) {
-	ctx := context.DefaultContext.Wrap(parentCtx)
-
 	logger := r.Log.WithValues("canary", req.NamespacedName)
 	canary := &v1.Canary{}
-	err := r.Get(ctx, req.NamespacedName, canary)
+	err := r.Get(parentCtx, req.NamespacedName, canary)
 	if errors.IsNotFound(err) {
 		return ctrl.Result{}, nil
 	} else if err != nil {
@@ -85,6 +78,7 @@ func (r *CanaryReconciler) Reconcile(parentCtx gocontext.Context, req ctrl.Reque
 	if runner.IsCanaryIgnored(&canary.ObjectMeta) {
 		return ctrl.Result{}, nil
 	}
+	ctx := r.Context.WithObject(canary.ObjectMeta)
 
 	canary.SetRunnerName(r.RunnerName)
 
diff --git a/pkg/controllers/system_controller.go b/pkg/controllers/system_controller.go
index 3659d2f72..920220527 100644
--- a/pkg/controllers/system_controller.go
+++ b/pkg/controllers/system_controller.go
@@ -34,6 +34,7 @@ import (
 
 // TopologyReconciler reconciles a Canary object
 type TopologyReconciler struct {
+	dutyContext.Context
 	client.Client
 	Log    logr.Logger
 	Scheme *runtime.Scheme
@@ -46,7 +47,6 @@ const TopologyFinalizerName = "topology.canaries.flanksource.com"
 // +kubebuilder:rbac:groups=canaries.flanksource.com,resources=topologies,verbs=get;list;watch;create;update;patch;delete
 // +kubebuilder:rbac:groups=canaries.flanksource.com,resources=topologies/status,verbs=get;update;patch
 func (r *TopologyReconciler) Reconcile(ctx gocontext.Context, req ctrl.Request) (ctrl.Result, error) {
-	dCtx := dutyContext.NewContext(ctx)
 	logger := r.Log.WithValues("topology", req.NamespacedName)
 	topology := &v1.Topology{}
 	err := r.Get(ctx, req.NamespacedName, topology)
@@ -54,6 +54,7 @@ func (r *TopologyReconciler) Reconcile(ctx gocontext.Context, req ctrl.Request)
 		logger.V(1).Info("Topology not found")
 		return ctrl.Result{}, nil
 	}
+	dCtx := r.Context.WithObject(topology.ObjectMeta)
 	if !controllerutil.ContainsFinalizer(topology, TopologyFinalizerName) {
 		controllerutil.AddFinalizer(topology, TopologyFinalizerName)
 		if err := r.Client.Update(ctx, topology); err != nil {
diff --git a/pkg/db/topology.go b/pkg/db/topology.go
index 01b5dbe33..5100c5d89 100644
--- a/pkg/db/topology.go
+++ b/pkg/db/topology.go
@@ -60,40 +60,35 @@ func GetTopology(ctx context.Context, id string) (*v1.Topology, error) {
 
 // TODO: Simplify logic and improve readability
 func PersistComponent(ctx context.Context, component *pkg.Component) ([]uuid.UUID, error) {
-	existing := models.Component{}
+	var existing *models.Component
+	var err error
 	var persisted []uuid.UUID
 	db := ctx.DB()
-	var tx *gorm.DB
-	if component.ID == uuid.Nil {
-		if component.ParentId == nil {
-			tx = db.Find(existing, "name = ? AND type = ? and parent_id is NULL", component.Name, component.Type)
-		} else {
-			tx = db.Find(existing, "name = ? AND type = ? and parent_id = ?", component.Name, component.Type, component.ParentId)
-		}
-	} else {
-		if component.ParentId == nil {
-			tx = db.Find(existing, "topology_id = ? AND name = ? AND type = ? and parent_id is NULL", component.TopologyID, component.Name, component.Type)
-		} else {
-			tx = db.Find(existing, "topology_id = ? AND name = ? AND type = ? and parent_id = ?", component.TopologyID, component.Name, component.Type, component.ParentId)
-		}
-	}
-	if tx.Error != nil {
-		return persisted, fmt.Errorf("error finding component: %v", tx.Error)
+
+	existing, err = component.FindExisting(db)
+	if err != nil {
+		return persisted, fmt.Errorf("error finding component: %v", err)
 	}
-	if existing.ID != uuid.Nil {
+
+	tx := db.Table("components")
+	if existing != nil && existing.ID != uuid.Nil {
 		component.ID = existing.ID
-		tx = db.Table("components").Clauses(
+		tx = tx.Clauses(
 			clause.OnConflict{
 				Columns:   []clause.Column{{Name: "topology_id"}, {Name: "name"}, {Name: "type"}, {Name: "parent_id"}},
 				UpdateAll: true,
 			},
 		).UpdateColumns(component)
-		// Since gorm ignores nil fields, we are setting deleted_at explicitly
-		db.Table("components").Where("id = ?", existing.ID).UpdateColumn("deleted_at", nil)
+
+		if existing.DeletedAt != component.DeletedAt {
+			// Since gorm ignores nil fields, we are setting deleted_at explicitly
+			if err := db.Table("components").Where("id = ?", existing.ID).UpdateColumn("deleted_at", nil).Error; err != nil {
+				return nil, fmt.Errorf("failed to undelete: %v", err)
+			}
+		}
+
 	} else {
-		tx = db.Table("components").Clauses(
+		tx = tx.Clauses(
 			clause.OnConflict{
 				Columns:   []clause.Column{{Name: "topology_id"}, {Name: "name"}, {Name: "type"}, {Name: "parent_id"}},
 				UpdateAll: true,
diff --git a/pkg/jobs/topology/topology_jobs.go b/pkg/jobs/topology/topology_jobs.go
index e59cc0e3c..00fbddcdb 100644
--- a/pkg/jobs/topology/topology_jobs.go
+++ b/pkg/jobs/topology/topology_jobs.go
@@ -3,9 +3,7 @@ package topology
 import (
 	"fmt"
 	"reflect"
-
-	gocontext "context"
-	"time"
+	"sync"
 
 	v1 "github.com/flanksource/canary-checker/api/v1"
 	"github.com/flanksource/canary-checker/pkg/db"
@@ -15,28 +13,37 @@ import (
 	"github.com/flanksource/duty/context"
 	"github.com/flanksource/duty/job"
 	"github.com/robfig/cron/v3"
-	"k8s.io/apimachinery/pkg/types"
 )
 
 var TopologyScheduler = cron.New()
 
-type TopologyJob struct {
-	context.Context
-	v1.Topology
-}
-
-func (job TopologyJob) GetNamespacedName() types.NamespacedName {
-	return types.NamespacedName{Name: job.Topology.Name, Namespace: job.Topology.Namespace}
-}
-
-func (job TopologyJob) Run() {
-	opts := pkgTopology.TopologyRunOptions{
-		Context:   job.Context.Wrap(gocontext.Background()),
-		Depth:     10,
-		Namespace: job.Namespace,
+var topologyJobs sync.Map
+
+func newTopologyJob(ctx context.Context, topology v1.Topology) {
+	j := &job.Job{
+		Name:       "TopologyRun",
+		Context:    ctx.WithObject(topology.ObjectMeta),
+		Schedule:   topology.Spec.Schedule,
+		JobHistory: true,
+		Retention:  job.RetentionHour,
+		ID:         topology.GetPersistedID(),
+		Fn: func(ctx job.JobRuntime) error {
+			ctx.History.ResourceID = topology.GetPersistedID()
+			ctx.History.ResourceType = "topology"
+			opts := pkgTopology.TopologyRunOptions{
+				Context:   ctx.Context,
+				Depth:     10,
+				Namespace: topology.Namespace,
+			}
+			count, err := pkgTopology.SyncComponents(opts, topology)
+			ctx.History.SuccessCount = count
+			return err
+		},
 	}
-	if err := pkgTopology.SyncComponents(opts, job.Topology); err != nil {
-		logger.Errorf("failed to run topology %s: %v", job.GetNamespacedName(), err)
+
+	topologyJobs.Store(topology.GetPersistedID(), j)
+	if err := j.AddToScheduler(TopologyScheduler); err != nil {
+		logger.Errorf("[%s] failed to schedule %v", *j, err)
 	}
 }
 
@@ -65,58 +72,37 @@ var SyncTopology = &job.Job{
 }
 
 func SyncTopologyJob(ctx context.Context, t v1.Topology) error {
+	id := t.GetPersistedID()
+	var existingJob *job.Job
+	if j, ok := topologyJobs.Load(id); ok {
+		existingJob = j.(*job.Job)
+	}
 	if !t.DeletionTimestamp.IsZero() || t.Spec.GetSchedule() == "@never" {
-		DeleteTopologyJob(t.GetPersistedID())
+		existingJob.Unschedule()
+		topologyJobs.Delete(id)
 		return nil
 	}
 
-	entry := findTopologyCronEntry(t.GetPersistedID())
-	if entry != nil {
-		job := entry.Job.(TopologyJob)
-		if !reflect.DeepEqual(job.Topology.Spec, t.Spec) {
-			logger.Infof("Rescheduling %s topology with updated specs", t)
-			TopologyScheduler.Remove(entry.ID)
-		} else {
-			return nil
-		}
-	}
-	job := TopologyJob{
-		Context:  ctx.Wrap(gocontext.Background()).WithObject(t.ObjectMeta),
-		Topology: t,
-	}
-
-	_, err := TopologyScheduler.AddJob(t.Spec.GetSchedule(), job)
-	if err != nil {
-		return fmt.Errorf("failed to schedule topology %s/%s: %v", t.Namespace, t.Name, err)
-	} else {
-		logger.Infof("Scheduled %s/%s: %s", t.Namespace, t.Name, t.Spec.GetSchedule())
-	}
-
-	entry = findTopologyCronEntry(t.GetPersistedID())
-	if entry != nil && time.Until(entry.Next) < 1*time.Hour {
-		// run all regular topologies on startup
-		job = entry.Job.(TopologyJob)
-		go job.Run()
+	if existingJob == nil {
+		newTopologyJob(ctx, t)
+		return nil
 	}
-	return nil
-}
 
-func findTopologyCronEntry(id string) *cron.Entry {
-	for _, entry := range TopologyScheduler.Entries() {
-		if entry.Job.(TopologyJob).GetPersistedID() == id {
-			return &entry
-		}
+	existingTopology := existingJob.Context.Value("topology")
+	if existingTopology != nil && !reflect.DeepEqual(existingTopology.(v1.Topology).Spec, t.Spec) {
+		ctx.Debugf("Rescheduling %s topology with updated specs", t)
+		existingJob.Unschedule()
+		newTopologyJob(ctx, t)
 	}
 	return nil
 }
 
 func DeleteTopologyJob(id string) {
-	entry := findTopologyCronEntry(id)
-	if entry == nil {
-		return
+	if j, ok := topologyJobs.Load(id); ok {
+		existingJob := j.(*job.Job)
+		existingJob.Unschedule()
+		topologyJobs.Delete(id)
 	}
-	logger.Tracef("deleting cron entry for topology:%s with entry ID: %v", id, entry.ID)
-	TopologyScheduler.Remove(entry.ID)
 }
 
 var CleanupComponents = &job.Job{
diff --git a/pkg/system_api.go b/pkg/system_api.go
index a23cceab7..f022234b6 100644
--- a/pkg/system_api.go
+++ b/pkg/system_api.go
@@ -150,6 +150,25 @@ type Component struct {
 	LogSelectors dutyTypes.LogSelectors `json:"logs,omitempty" gorm:"column:log_selectors"`
 }
 
+func (component *Component) FindExisting(db *gorm.DB) (*models.Component, error) {
+	var existing models.Component
+	tx := db.Model(component).Select("id", "deleted_at")
+	if component.ID == uuid.Nil {
+		if component.ParentId == nil {
+			tx = tx.Find(&existing, "name = ? AND type = ? and parent_id is NULL", component.Name, component.Type)
+		} else {
+			tx = tx.Find(&existing, "name = ? AND type = ? and parent_id = ?", component.Name, component.Type, component.ParentId).Pluck("id", &existing)
+		}
+	} else {
+		if component.ParentId == nil {
+			tx = tx.Find(&existing, "topology_id = ? AND name = ? AND type = ? and parent_id is NULL", component.TopologyID, component.Name, component.Type).Pluck("id", &existing)
+		} else {
+			tx = tx.Find(&existing, "topology_id = ? AND name = ? AND type = ? and parent_id = ?", component.TopologyID, component.Name, component.Type, component.ParentId).Pluck("id", &existing)
+		}
+	}
+	return &existing, tx.Error
+}
+
 func (component *Component) GetConfigs(db *gorm.DB) (relationships []models.ConfigComponentRelationship, err error) {
 	err = db.Where("component_id = ? AND deleted_at IS NULL", component.ID).Find(&relationships).Error
 	return relationships, err
diff --git a/pkg/topology/component_relationship.go b/pkg/topology/component_relationship.go
index 3ca8ecb86..897979d5e 100644
--- a/pkg/topology/component_relationship.go
+++ b/pkg/topology/component_relationship.go
@@ -15,6 +15,7 @@ import (
 
 var ComponentRelationshipSync = &job.Job{
 	Name:       "ComponentRelationshipSync",
+	Schedule:   "@every 5m",
 	JobHistory: true,
 	Retention:  job.RetentionHour,
 	Singleton:  true,
@@ -56,6 +57,7 @@ var ComponentRelationshipSync = &job.Job{
 
 var ComponentStatusSummarySync = &job.Job{
 	Name:       "ComponentStatusSummarySync",
+	Schedule:   "@every 2m",
 	JobHistory: true,
 	Retention:  job.RetentionHour,
 	Singleton:  true,
diff --git a/pkg/topology/run.go b/pkg/topology/run.go
index e0a0219f0..10df78826 100644
--- a/pkg/topology/run.go
+++ b/pkg/topology/run.go
@@ -456,25 +456,25 @@ func Run(opts TopologyRunOptions, t v1.Topology) ([]*pkg.Component, models.JobHi
 	return results, *jobHistory
 }
 
-func SyncComponents(opts TopologyRunOptions, topology v1.Topology) error {
+func SyncComponents(opts TopologyRunOptions, topology v1.Topology) (int, error) {
 	id := topology.GetPersistedID()
 	opts.Context.Debugf("[%s] running topology sync", id)
 	// Check if deleted
 	var dbTopology models.Topology
 	if err := opts.DB().Where("id = ?", id).First(&dbTopology).Error; err != nil {
-		return fmt.Errorf("failed to query topology id: %s: %w", id, err)
+		return 0, fmt.Errorf("failed to query topology id: %s: %w", id, err)
 	}
 
 	if dbTopology.DeletedAt != nil {
 		opts.Context.Debugf("Skipping topology[%s] as its deleted", id)
 		// TODO: Should we run the db.DeleteTopology function always in this scenario
-		return nil
+		return 0, nil
 	}
 
 	components, _ := Run(opts, topology)
 	topologyID, err := uuid.Parse(id)
 	if err != nil {
-		return fmt.Errorf("failed to parse topology id: %w", err)
+		return 0, fmt.Errorf("failed to parse topology id: %w", err)
 	}
 
 	var compIDs []uuid.UUID
@@ -485,7 +485,7 @@ func SyncComponents(opts TopologyRunOptions, topology v1.Topology) {
 		component.TopologyID = &topologyID
 		componentsIDs, err := db.PersistComponent(opts.Context, component)
 		if err != nil {
-			return fmt.Errorf("failed to persist component(id=%s, name=%s): %w", component.ID, component.Name, err)
+			return 0, fmt.Errorf("failed to persist component(id=%s, name=%s): %w", component.ID, component.Name, err)
 		}
 
 		compIDs = append(compIDs, componentsIDs...)
@@ -493,15 +493,15 @@ func SyncComponents(opts TopologyRunOptions, topology v1.Topology)
 
 	dbCompsIDs, err := db.GetActiveComponentsIDsOfTopology(opts.DB(), id)
 	if err != nil {
-		return fmt.Errorf("error getting components for topology (id=%s): %v", id, err)
+		return 0, fmt.Errorf("error getting components for topology (id=%s): %v", id, err)
 	}
 
 	deleteCompIDs := utils.SetDifference(dbCompsIDs, compIDs)
 	if len(deleteCompIDs) != 0 {
 		if err := db.DeleteComponentsWithIDs(opts.DB(), utils.UUIDsToStrings(deleteCompIDs)); err != nil {
-			return fmt.Errorf("error deleting components: %v", err)
+			return 0, fmt.Errorf("error deleting components: %v", err)
 		}
 	}
 
-	return nil
+	return len(components), nil
 }
diff --git a/test/e2e-operator.sh b/test/e2e-operator.sh
index 5db73a9b5..bcad0991b 100755
--- a/test/e2e-operator.sh
+++ b/test/e2e-operator.sh
@@ -19,7 +19,7 @@ $KARINA ca generate --name sealed-secrets --cert-path .certs/sealed-secrets-crt.
 fi
 
 ## starting the postgres as docker container
-docker run --rm -p 5432:5432 --name some-postgres -e POSTGRES_PASSWORD=mysecretpassword -d postgres:14.1
+docker run --rm -p 5433:5433 --name some-postgres -e POSTGRES_PASSWORD=mysecretpassword -d postgres:14.1
 if $KARINA provision kind-cluster -e name=$CLUSTER_NAME -v ; then
   echo "::endgroup::"
 else
@@ -40,7 +40,7 @@ echo "::endgroup::"
 echo "::group::Operator"
 
 ## starting operator in background
-go run main.go operator --db-migrations -vvv --db="postgres://postgres:mysecretpassword@localhost:5432/postgres?sslmode=disable" --maxStatusCheckCount=1 &
+go run main.go operator --db-migrations -vvv --db="postgres://postgres:mysecretpassword@localhost:5433/postgres?sslmode=disable" &
 PROC_ID=$!
 
 echo "Started operator with PID $PROC_ID"
@@ -50,7 +50,7 @@ sleep 120
 
 echo "Waiting for server to accept connections"
 curl https://raw.githubusercontent.com/vishnubob/wait-for-it/master/wait-for-it.sh -o wait-for-it.sh;
-chmod +x wait-for-it.sh; ./wait-for-it.sh 0.0.0.0:8080 --timeout=0;
+chmod +x wait-for-it.sh; ./wait-for-it.sh 0.0.0.0:8080 --timeout=120;
 
 echo "Server is ready now"
 
diff --git a/test/run_test.go b/test/run_test.go
index 43699a5d0..851f458d7 100644
--- a/test/run_test.go
+++ b/test/run_test.go
@@ -35,7 +35,7 @@ var _ = ginkgo.BeforeSuite(func() {
 		logger.Warnf("Failed to get kommons client, features that read kubernetes configs will fail: %v", err)
 	}
 
-	DefaultContext = dutyContext.New().WithKubernetes(k8s).WithKommons(kommonsClient)
+	DefaultContext = dutyContext.New().WithKubernetes(k8s).WithKommons(kommonsClient).WithNamespace("default")
 
 	logger.StandardLogger().SetLogLevel(verbosity)