diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index ca378dcbc008..012a492a5342 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -345,6 +345,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Added status to monitor run log report. - Upgrade node to latest LTS v18.20.3. {pull}40038[40038] - Add journey duration to synthetics browser events. {pull}40230[40230] +- Add monitor status reporter under managed mode. {pull}41077[41077] *Metricbeat* diff --git a/heartbeat/monitors/monitor.go b/heartbeat/monitors/monitor.go index 29e7713145ca..8fa82c10ea14 100644 --- a/heartbeat/monitors/monitor.go +++ b/heartbeat/monitors/monitor.go @@ -34,6 +34,7 @@ import ( "github.com/elastic/beats/v7/heartbeat/monitors/wrappers" "github.com/elastic/beats/v7/heartbeat/scheduler" "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/management/status" ) // ErrMonitorDisabled is returned when the monitor plugin is marked as disabled. @@ -71,6 +72,11 @@ type Monitor struct { stats plugin.RegistryRecorder monitorStateTracker *monitorstate.Tracker + statusReporter status.StatusReporter +} + +func (m *Monitor) SetStatusReporter(statusReporter status.StatusReporter) { + m.statusReporter = statusReporter } // String prints a description of the monitor in a threadsafe way. It is important that this use threadsafe @@ -175,6 +181,9 @@ func newMonitorUnsafe( logp.L().Error(fullErr) p.Jobs = []jobs.Job{func(event *beat.Event) ([]jobs.Job, error) { + // if statusReporter is set, as it is for running managed-mode, update the input status + // to failed, specifying the error + m.updateStatus(status.Failed, fmt.Sprintf("monitor could not be started: %s, err: %s", m.stdFields.ID, fullErr)) return nil, fullErr }} @@ -237,6 +246,7 @@ func (m *Monitor) Start() { m.stats.StartMonitor(int64(m.endpoints)) m.state = MON_STARTED + m.updateStatus(status.Running, "") } // Stop stops the monitor without freeing it in global dedup @@ -262,4 +272,11 @@ func (m *Monitor) Stop() { m.stats.StopMonitor(int64(m.endpoints)) m.state = MON_STOPPED + m.updateStatus(status.Stopped, "") +} + +func (m *Monitor) updateStatus(status status.Status, msg string) { + if m.statusReporter != nil { + m.statusReporter.UpdateStatus(status, msg) + } } diff --git a/heartbeat/monitors/monitor_test.go b/heartbeat/monitors/monitor_test.go index 3176d27a2fa8..0890a1697bec 100644 --- a/heartbeat/monitors/monitor_test.go +++ b/heartbeat/monitors/monitor_test.go @@ -18,6 +18,7 @@ package monitors import ( + "fmt" "testing" "time" @@ -32,7 +33,9 @@ import ( "github.com/elastic/go-lookslike/testslike" "github.com/elastic/go-lookslike/validator" + "github.com/elastic/beats/v7/heartbeat/monitors/plugin" "github.com/elastic/beats/v7/heartbeat/scheduler" + "github.com/elastic/beats/v7/libbeat/management/status" ) // TestMonitorBasic tests a basic config @@ -131,3 +134,60 @@ func TestCheckInvalidConfig(t *testing.T) { require.Error(t, checkMonitorConfig(serverMonConf, reg)) } + +type MockStatusReporter struct { + us func(status status.Status, msg string) +} + +func (sr *MockStatusReporter) UpdateStatus(status status.Status, msg string) { + sr.us(status, msg) +} + +func TestStatusReporter(t *testing.T) { + confMap := map[string]interface{}{ + "type": "fail", + "urls": []string{"http://example.net"}, + "schedule": "@every 1ms", + "name": "myName", + "id": "myId", + } + cfg, err := conf.NewConfigFrom(confMap) + require.NoError(t, err) + + reg, _, _ := mockPluginsReg() + pipel := &MockPipeline{} + monReg := monitoring.NewRegistry() + + mockDegradedPluginFactory := plugin.PluginFactory{ + Name: "fail", + Aliases: []string{"failAlias"}, + Make: func(s string, cfg *conf.C) (plugin.Plugin, error) { + return plugin.Plugin{}, fmt.Errorf("error plugin") + }, + Stats: plugin.NewPluginCountersRecorder("fail", monReg), + } + _ = reg.Add(mockDegradedPluginFactory) + + sched := scheduler.Create(1, monitoring.NewRegistry(), time.Local, nil, true) + defer sched.Stop() + + c, err := pipel.Connect() + require.NoError(t, err) + m, err := newMonitor(cfg, reg, c, sched.Add, nil, nil) + require.NoError(t, err) + + // Track status marked as failed during run_once execution + failed := false + m.SetStatusReporter(&MockStatusReporter{ + us: func(s status.Status, msg string) { + if s == status.Failed { + failed = true + } + }, + }) + m.Start() + + sched.WaitForRunOnce() + + require.True(t, failed) +}