Skip to content

Commit

Permalink
refactor(pkg/runtime) - refactor runtime
Browse files Browse the repository at this point in the history
  • Loading branch information
PxyUp committed Feb 10, 2024
1 parent 2f307b9 commit 1debcd5
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 39 deletions.
53 changes: 15 additions & 38 deletions pkg/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ type runtime struct {
ctx context.Context
cfg *config.Config
logger logger.Logger

triggers []trigger.Trigger
}

func New(ctx context.Context, cfg *config.Config, logger logger.Logger) *runtime {
Expand All @@ -27,14 +25,16 @@ func New(ctx context.Context, cfg *config.Config, logger logger.Logger) *runtime

func (r *runtime) Start() {
updates := make(chan *trigger.Message)
triggers := trigger.CreateTriggers(r.ctx, r.cfg, r.logger)
r.createRunTime(updates)
r.runScheduler(updates)
r.runHTTPServer(updates)
for _, t := range triggers {
t.Run(updates)
}
<-r.ctx.Done()
for _, t := range r.triggers {
close(updates)
for _, t := range triggers {
t.Stop()
}
close(updates)
}

func (r *runtime) createRunTime(updates <-chan *trigger.Message) {
Expand All @@ -44,43 +44,20 @@ func (r *runtime) createRunTime(updates <-chan *trigger.Message) {
select {
case <-r.ctx.Done():
return
case n := <-updates:
case n, ok := <-updates:
if !ok {
return
}
lName := n
go func(name string, value builder.Interfacable) {
r.logger.Infow("new trigger comes", "name", name, "input", value.ToJson())
fields := []string{"name", name}
if value != nil {
fields = append(fields, "input", value.ToJson())
}
r.logger.Infow("new trigger comes", fields...)
_, _ = reg.Get(name).Process(value)
}(lName.Name, lName.Value)
}
}
}()
}

func (r *runtime) runHTTPServer(updates chan<- *trigger.Message) {
needRun := false
forIgnore := []string{}
for _, item := range r.cfg.Items {
if item.TriggerConfig != nil && item.TriggerConfig.HTTPTrigger != nil {
needRun = true
} else {
forIgnore = append(forIgnore, item.Name)
}
}

if !needRun {
return
}

serverRun := trigger.HttpServer(r.ctx, r.cfg.HttpServer, forIgnore).WithLogger(r.logger.With("scheduler_type", "http_server"))
serverRun.Run(updates)
r.triggers = append(r.triggers, serverRun)
}

func (r *runtime) runScheduler(updates chan<- *trigger.Message) {
for _, item := range r.cfg.Items {
if item.TriggerConfig != nil && item.TriggerConfig.SchedulerTrigger != nil {
scheduler := trigger.Scheduler(r.ctx, item.Name, item.TriggerConfig.SchedulerTrigger).WithLogger(r.logger.With("scheduler_name", item.Name))
scheduler.Run(updates)
r.triggers = append(r.triggers, scheduler)
}
}
}
44 changes: 43 additions & 1 deletion pkg/trigger/trigger.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
package trigger

import "github.com/PxyUp/fitter/pkg/builder"
import (
"context"
"github.com/PxyUp/fitter/pkg/builder"
"github.com/PxyUp/fitter/pkg/config"
"github.com/PxyUp/fitter/pkg/logger"
)

type Message struct {
Name string
Expand All @@ -11,3 +16,40 @@ type Trigger interface {
Run(chan<- *Message)
Stop()
}

func createHttpTrigger(ctx context.Context, cfg *config.Config, logger logger.Logger) []Trigger {
needRun := false
forIgnore := []string{}
for _, item := range cfg.Items {
if item.TriggerConfig != nil && item.TriggerConfig.HTTPTrigger != nil {
needRun = true
} else {
forIgnore = append(forIgnore, item.Name)
}
}

if !needRun {
return nil
}

return []Trigger{HttpServer(ctx, cfg.HttpServer, forIgnore).WithLogger(logger.With("scheduler_type", "http_server"))}
}

func createSchedulerTriggers(ctx context.Context, cfg *config.Config, logger logger.Logger) []Trigger {
var schedulers []Trigger
for _, item := range cfg.Items {
if item.TriggerConfig != nil && item.TriggerConfig.SchedulerTrigger != nil {
schedulers = append(schedulers, Scheduler(ctx, item.Name, item.TriggerConfig.SchedulerTrigger).WithLogger(logger.With("scheduler_name", item.Name)))
}
}

return schedulers
}

func CreateTriggers(ctx context.Context, cfg *config.Config, logger logger.Logger) []Trigger {
var triggers []Trigger
triggers = append(triggers, createHttpTrigger(ctx, cfg, logger)...)
triggers = append(triggers, createSchedulerTriggers(ctx, cfg, logger)...)

return triggers
}

0 comments on commit 1debcd5

Please sign in to comment.