Skip to content

Commit

Permalink
refactor(pkg/runtime) - refactor runtime
Browse files Browse the repository at this point in the history
  • Loading branch information
PxyUp committed Feb 10, 2024
1 parent 2f307b9 commit 3cb14ce
Showing 1 changed file with 15 additions and 38 deletions.
53 changes: 15 additions & 38 deletions pkg/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ type runtime struct {
ctx context.Context
cfg *config.Config
logger logger.Logger

triggers []trigger.Trigger
}

func New(ctx context.Context, cfg *config.Config, logger logger.Logger) *runtime {
Expand All @@ -27,14 +25,16 @@ func New(ctx context.Context, cfg *config.Config, logger logger.Logger) *runtime

func (r *runtime) Start() {
updates := make(chan *trigger.Message)
triggers := trigger.CreateTriggers(r.ctx, r.cfg, r.logger)

Check failure on line 28 in pkg/runtime/runtime.go

View workflow job for this annotation

GitHub Actions / build

undefined: trigger.CreateTriggers
r.createRunTime(updates)
r.runScheduler(updates)
r.runHTTPServer(updates)
for _, t := range triggers {
t.Run(updates)
}
<-r.ctx.Done()
for _, t := range r.triggers {
close(updates)
for _, t := range triggers {
t.Stop()
}
close(updates)
}

func (r *runtime) createRunTime(updates <-chan *trigger.Message) {
Expand All @@ -44,43 +44,20 @@ func (r *runtime) createRunTime(updates <-chan *trigger.Message) {
select {
case <-r.ctx.Done():
return
case n := <-updates:
case n, ok := <-updates:
if !ok {
return
}
lName := n
go func(name string, value builder.Interfacable) {
r.logger.Infow("new trigger comes", "name", name, "input", value.ToJson())
fields := []string{"name", name}
if value != nil {
fields = append(fields, "input", value.ToJson())
}
r.logger.Infow("new trigger comes", fields...)
_, _ = reg.Get(name).Process(value)
}(lName.Name, lName.Value)
}
}
}()
}

func (r *runtime) runHTTPServer(updates chan<- *trigger.Message) {
needRun := false
forIgnore := []string{}
for _, item := range r.cfg.Items {
if item.TriggerConfig != nil && item.TriggerConfig.HTTPTrigger != nil {
needRun = true
} else {
forIgnore = append(forIgnore, item.Name)
}
}

if !needRun {
return
}

serverRun := trigger.HttpServer(r.ctx, r.cfg.HttpServer, forIgnore).WithLogger(r.logger.With("scheduler_type", "http_server"))
serverRun.Run(updates)
r.triggers = append(r.triggers, serverRun)
}

func (r *runtime) runScheduler(updates chan<- *trigger.Message) {
for _, item := range r.cfg.Items {
if item.TriggerConfig != nil && item.TriggerConfig.SchedulerTrigger != nil {
scheduler := trigger.Scheduler(r.ctx, item.Name, item.TriggerConfig.SchedulerTrigger).WithLogger(r.logger.With("scheduler_name", item.Name))
scheduler.Run(updates)
r.triggers = append(r.triggers, scheduler)
}
}
}

0 comments on commit 3cb14ce

Please sign in to comment.