Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
leon-inf committed Sep 26, 2024
1 parent 96d3fdd commit c7faf48
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 18 deletions.
3 changes: 3 additions & 0 deletions pkg/controller/component/available.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ func (h *AvailableProbeEventHandler) Handle(cli client.Client, reqCtx intctrluti
return nil
}

// TODO:
// 1. how to merge

probeEvent := &proto.ProbeEvent{}
if err := json.Unmarshal([]byte(event.Message), probeEvent); err != nil {
return err
Expand Down
5 changes: 4 additions & 1 deletion pkg/controller/component/kbagent.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ const (
minAvailablePort = 1025
maxAvailablePort = 65535
kbAgentDefaultPort = 3501

defaultProbeReportPeriodSeconds = 60
)

var (
Expand Down Expand Up @@ -224,7 +226,9 @@ func buildKBAgentStartupEnvs(synthesizedComp *SynthesizedComponent) ([]corev1.En
actions = append(actions, *a)
probes = append(probes, *p)
}
// TODO: how to schedule the execution of probes?
if a, p := buildProbe4KBAgent(synthesizedComp.LifecycleActions.AvailableProbe, availableProbe, synthesizedComp.FullCompName); a != nil && p != nil {
p.ReportPeriodSeconds = max(defaultProbeReportPeriodSeconds, p.PeriodSeconds)
actions = append(actions, *a)
probes = append(probes, *p)
}
Expand Down Expand Up @@ -264,7 +268,6 @@ func buildProbe4KBAgent(probe *appsv1.Probe, name, instance string) (*proto.Acti
PeriodSeconds: probe.PeriodSeconds,
SuccessThreshold: probe.SuccessThreshold,
FailureThreshold: probe.FailureThreshold,
ReportPeriodSeconds: nil, // TODO: impl
Instance: instance,
}
return a, p
Expand Down
4 changes: 0 additions & 4 deletions pkg/controller/component/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,6 @@ func inDataContext() *multicluster.ClientOption {
return multicluster.InDataContext()
}

func inDataContextUnspecified() *multicluster.ClientOption {
return multicluster.InDataContextUnspecified()
}

func ValidateCompDefRegexp(compDefPattern string) error {
_, err := regexp.Compile(compDefPattern)
return err
Expand Down
2 changes: 1 addition & 1 deletion pkg/kbagent/proto/proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ type Probe struct {
PeriodSeconds int32 `json:"periodSeconds,omitempty"`
SuccessThreshold int32 `json:"successThreshold,omitempty"`
FailureThreshold int32 `json:"failureThreshold,omitempty"`
ReportPeriodSeconds *int32 `json:"reportPeriodSeconds,omitempty"`
ReportPeriodSeconds int32 `json:"reportPeriodSeconds,omitempty"`
}

type ProbeEvent struct {
Expand Down
68 changes: 58 additions & 10 deletions pkg/kbagent/service/probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func (s *probeService) Start() error {
runner := &probeRunner{
logger: s.logger.WithValues("probe", name),
actionService: s.actionService,
latestEvent: make(chan proto.ProbeEvent, 1),
}
go runner.run(s.probes[name])
s.runners[name] = runner
Expand All @@ -96,6 +97,7 @@ type probeRunner struct {
succeedCount int64
failedCount int64
latestOutput []byte
latestEvent chan proto.ProbeEvent
}

func (r *probeRunner) run(probe *proto.Probe) {
Expand All @@ -105,6 +107,13 @@ func (r *probeRunner) run(probe *proto.Probe) {
time.Sleep(time.Duration(probe.InitialDelaySeconds) * time.Second)
}

// launch report loop first
r.launchReportLoop(probe)

r.launchRunLoop(probe)
}

func (r *probeRunner) launchRunLoop(probe *proto.Probe) {
if probe.PeriodSeconds <= 0 {
probe.PeriodSeconds = defaultProbePeriodSeconds
}
Expand All @@ -115,8 +124,12 @@ func (r *probeRunner) run(probe *proto.Probe) {
}

func (r *probeRunner) runLoop(probe *proto.Probe) {
runOnce := func() ([]byte, error) {
return r.actionService.handleRequest(context.Background(), &proto.ActionRequest{Action: probe.Action})
}

for range r.ticker.C {
output, err := r.runOnce(probe)
output, err := runOnce()
if err == nil {
r.succeedCount++
r.failedCount = 0
Expand All @@ -133,18 +146,47 @@ func (r *probeRunner) runLoop(probe *proto.Probe) {
}
}

func (r *probeRunner) runOnce(probe *proto.Probe) ([]byte, error) {
return r.actionService.handleRequest(context.Background(), &proto.ActionRequest{Action: probe.Action})
func (r *probeRunner) launchReportLoop(probe *proto.Probe) {
if probe.ReportPeriodSeconds <= 0 {
return
}

if probe.ReportPeriodSeconds < probe.PeriodSeconds {
probe.ReportPeriodSeconds = probe.PeriodSeconds
}

go func() {
ticker := time.NewTicker(time.Duration(probe.ReportPeriodSeconds) * time.Second)
defer ticker.Stop()

for range ticker.C {
latestEvent := gather(r.latestEvent)
if latestEvent != nil {
r.sendEvent(latestEvent)
}
}
}()
}

func (r *probeRunner) report(probe *proto.Probe, output []byte, err error) {
var latestEvent *proto.ProbeEvent

succeed, thresholdPoint := r.succeed(probe)
if succeed && thresholdPoint ||
succeed && !thresholdPoint && !reflect.DeepEqual(output, r.latestOutput) {
r.sendEvent(probe.Instance, probe.Action, 0, output, "")
latestEvent = r.buildNSendEvent(probe.Instance, probe.Action, 0, output, "")
}
if r.fail(probe) {
r.sendEvent(probe.Instance, probe.Action, -1, r.latestOutput, err.Error())
latestEvent = r.buildNSendEvent(probe.Instance, probe.Action, -1, r.latestOutput, err.Error())
}

if latestEvent != nil {
select {
case r.latestEvent <- *latestEvent:
default:
gather(r.latestEvent) // drain the channel
r.latestEvent <- *latestEvent
}
}
}

Expand All @@ -170,7 +212,7 @@ func (r *probeRunner) fail(probe *proto.Probe) bool {
return false
}

func (r *probeRunner) sendEvent(instance, probe string, code int32, output []byte, message string) {
func (r *probeRunner) buildNSendEvent(instance, probe string, code int32, output []byte, message string) *proto.ProbeEvent {
prefixLen := min(len(output), 32)
r.logger.Info("send probe event", "code", code, "output", string(output[:prefixLen]), "message", message)

Expand All @@ -181,10 +223,16 @@ func (r *probeRunner) sendEvent(instance, probe string, code int32, output []byt
Output: output,
Message: message,
}
r.sendEvent(event)
return event
}

func (r *probeRunner) sendEvent(event *proto.ProbeEvent) {
msg, err := json.Marshal(&event)
if err != nil {
r.logger.Error(err, "failed to marshal probe event")
return
if err == nil {
util.SendEventWithMessage(&r.logger, event.Probe, string(msg))
} else {
r.logger.Error(err, fmt.Sprintf("failed to marshal probe event, probe: %s, code: %d, message: %s",
event.Probe, event.Code, event.Message))
}
util.SendEventWithMessage(&r.logger, probe, string(msg))
}
3 changes: 1 addition & 2 deletions pkg/kbagent/service/probe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ var _ = Describe("probe", func() {
PeriodSeconds: 1,
SuccessThreshold: 1,
FailureThreshold: 1,
ReportPeriodSeconds: nil,
ReportPeriodSeconds: 0,
},
}

Expand All @@ -62,7 +62,6 @@ var _ = Describe("probe", func() {
Expect(err).Should(BeNil())
})

// func newProbeService(logger logr.Logger, actionService *actionService, probes []proto.Probe) (*probeService, error) {
It("new", func() {
service, err := newProbeService(logr.New(nil), actionSvc, probes)
Expect(err).Should(BeNil())
Expand Down

0 comments on commit c7faf48

Please sign in to comment.