From a1e1afe99353ffc0dc7fab76298b45a5e68a1d36 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Tue, 24 Jan 2023 10:50:02 +0100 Subject: [PATCH] Tag agent logs with 'service.name' (#2044) (#2159) Tag agent logs with 'service.name' (#2044) (cherry picked from commit cd31f99219a272d4d2a806b90b75d4d2a093960c) Co-authored-by: Michal Pristas --- internal/pkg/agent/application/application.go | 3 +- .../application/monitoring/v1_monitor.go | 9 ++++++ internal/pkg/agent/cmd/run.go | 8 +++-- pkg/component/runtime/command.go | 26 ++++++++-------- pkg/component/runtime/manager.go | 6 ++-- pkg/component/runtime/manager_test.go | 30 +++++++++---------- pkg/component/runtime/runtime.go | 4 +-- 7 files changed, 51 insertions(+), 35 deletions(-) diff --git a/internal/pkg/agent/application/application.go b/internal/pkg/agent/application/application.go index dceb31b0dab..48d7b653fab 100644 --- a/internal/pkg/agent/application/application.go +++ b/internal/pkg/agent/application/application.go @@ -30,6 +30,7 @@ import ( // New creates a new Agent and bootstrap the required subsystem. func New( log *logger.Logger, + baseLogger *logger.Logger, logLevel logp.Level, agentInfo *info.AgentInfo, reexec coordinator.ReExecManager, @@ -73,7 +74,7 @@ func New( upgrader := upgrade.NewUpgrader(log, cfg.Settings.DownloadConfig, agentInfo) monitor := monitoring.New(isMonitoringSupported, cfg.Settings.DownloadConfig.OS(), cfg.Settings.MonitoringConfig, agentInfo) - runtime, err := runtime.NewManager(log, cfg.Settings.GRPC.String(), agentInfo, tracer, monitor, cfg.Settings.GRPC) + runtime, err := runtime.NewManager(log, baseLogger, cfg.Settings.GRPC.String(), agentInfo, tracer, monitor, cfg.Settings.GRPC) if err != nil { return nil, fmt.Errorf("failed to initialize runtime manager: %w", err) } diff --git a/internal/pkg/agent/application/monitoring/v1_monitor.go b/internal/pkg/agent/application/monitoring/v1_monitor.go index 1ad5b0984f1..b534e0eb98c 100644 --- a/internal/pkg/agent/application/monitoring/v1_monitor.go +++ b/internal/pkg/agent/application/monitoring/v1_monitor.go @@ -477,6 +477,15 @@ func (b *BeatsMonitor) injectLogsInput(cfg map[string]interface{}, components [] }, }, }, + map[string]interface{}{ + // injecting component log source to stay aligned with command runtime logs + "add_fields": map[string]interface{}{ + "target": "log", + "fields": map[string]interface{}{ + "source": comp.ID, + }, + }, + }, }, }) } diff --git a/internal/pkg/agent/cmd/run.go b/internal/pkg/agent/cmd/run.go index d87d004c230..003c137a132 100644 --- a/internal/pkg/agent/cmd/run.go +++ b/internal/pkg/agent/cmd/run.go @@ -109,11 +109,15 @@ func run(override cfgOverrider, modifiers ...component.PlatformModifier) error { if cfg.Settings.LoggingConfig != nil { logLvl = cfg.Settings.LoggingConfig.Level } - l, err := logger.NewFromConfig("", cfg.Settings.LoggingConfig, true) + baseLogger, err := logger.NewFromConfig("", cfg.Settings.LoggingConfig, true) if err != nil { return err } + l := baseLogger.With("log", map[string]interface{}{ + "source": agentName, + }) + cfg, err = tryDelayEnroll(ctx, l, cfg, override) if err != nil { err = errors.New(err, "failed to perform delayed enrollment") @@ -188,7 +192,7 @@ func run(override cfgOverrider, modifiers ...component.PlatformModifier) error { l.Info("APM instrumentation disabled") } - coord, err := application.New(l, logLvl, agentInfo, rex, tracer, configuration.IsFleetServerBootstrap(cfg.Fleet), modifiers...) + coord, err := application.New(l, baseLogger, logLvl, agentInfo, rex, tracer, configuration.IsFleetServerBootstrap(cfg.Fleet), modifiers...) if err != nil { return err } diff --git a/pkg/component/runtime/command.go b/pkg/component/runtime/command.go index 778ce38331e..bc6bab76dcb 100644 --- a/pkg/component/runtime/command.go +++ b/pkg/component/runtime/command.go @@ -55,7 +55,6 @@ type procState struct { // CommandRuntime provides the command runtime for running a component as a subprocess. type CommandRuntime struct { - logger *logger.Logger logStd *logWriter logErr *logWriter @@ -77,7 +76,7 @@ type CommandRuntime struct { } // NewCommandRuntime creates a new command runtime for the provided component. -func NewCommandRuntime(comp component.Component, logger *logger.Logger, monitor MonitoringManager) (ComponentRuntime, error) { +func NewCommandRuntime(comp component.Component, monitor MonitoringManager) (ComponentRuntime, error) { c := &CommandRuntime{ current: comp, monitor: monitor, @@ -92,11 +91,6 @@ func NewCommandRuntime(comp component.Component, logger *logger.Logger, monitor if cmdSpec == nil { return nil, errors.New("must have command defined in specification") } - c.logger = logger.With("component", map[string]interface{}{ - "id": comp.ID, - "type": c.getSpecType(), - "binary": c.getSpecBinaryName(), - }) ll, unitLevels := getLogLevels(comp) c.logStd = createLogWriter(c.current, c.getCommandSpec(), c.getSpecType(), c.getSpecBinaryName(), ll, unitLevels, logSourceStdout) ll, unitLevels = getLogLevels(comp) // don't want to share mapping of units (so new map is generated) @@ -500,12 +494,18 @@ func attachOutErr(stdOut *logWriter, stdErr *logWriter) process.CmdOption { func createLogWriter(comp component.Component, cmdSpec *component.CommandSpec, typeStr string, binaryName string, ll zapcore.Level, unitLevels map[string]zapcore.Level, src logSource) *logWriter { dataset := fmt.Sprintf("elastic_agent.%s", strings.ReplaceAll(strings.ReplaceAll(binaryName, "-", "_"), "/", "_")) - logger := logger.NewWithoutConfig("").With("component", map[string]interface{}{ - "id": comp.ID, - "type": typeStr, - "binary": binaryName, - "dataset": dataset, - }) + logger := logger.NewWithoutConfig(""). + With( + "component", map[string]interface{}{ + "id": comp.ID, + "type": typeStr, + "binary": binaryName, + "dataset": dataset, + }, + "log", map[string]interface{}{ + "source": comp.ID, + }, + ) return newLogWriter(logger.Core(), cmdSpec.Log, ll, unitLevels, src) } diff --git a/pkg/component/runtime/manager.go b/pkg/component/runtime/manager.go index 5ee1ccc672d..e016b883a1f 100644 --- a/pkg/component/runtime/manager.go +++ b/pkg/component/runtime/manager.go @@ -77,6 +77,7 @@ type Manager struct { proto.UnimplementedElasticAgentServer logger *logger.Logger + baseLogger *logger.Logger ca *authority.CertificateAuthority listenAddr string agentInfo *info.AgentInfo @@ -106,13 +107,14 @@ type Manager struct { } // NewManager creates a new manager. -func NewManager(logger *logger.Logger, listenAddr string, agentInfo *info.AgentInfo, tracer *apm.Tracer, monitor MonitoringManager, grpcConfig *configuration.GRPCConfig) (*Manager, error) { +func NewManager(logger, baseLogger *logger.Logger, listenAddr string, agentInfo *info.AgentInfo, tracer *apm.Tracer, monitor MonitoringManager, grpcConfig *configuration.GRPCConfig) (*Manager, error) { ca, err := authority.NewCA() if err != nil { return nil, err } m := &Manager{ logger: logger, + baseLogger: baseLogger, ca: ca, listenAddr: listenAddr, agentInfo: agentInfo, @@ -655,7 +657,7 @@ func (m *Manager) update(components []component.Component, teardown bool) error } } else { // new component; create its runtime - logger := m.logger.Named(fmt.Sprintf("component.runtime.%s", comp.ID)) + logger := m.baseLogger.Named(fmt.Sprintf("component.runtime.%s", comp.ID)) state, err := newComponentRuntimeState(m, logger, m.monitor, comp) if err != nil { return fmt.Errorf("failed to create new component %s: %w", comp.ID, err) diff --git a/pkg/component/runtime/manager_test.go b/pkg/component/runtime/manager_test.go index c930233e704..dd2e330a736 100644 --- a/pkg/component/runtime/manager_test.go +++ b/pkg/component/runtime/manager_test.go @@ -64,7 +64,7 @@ func TestManager_SimpleComponentErr(t *testing.T) { defer cancel() ai, _ := info.NewAgentInfo(true) - m, err := NewManager(newDebugLogger(t), "localhost:0", ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig()) + m, err := NewManager(newDebugLogger(t), newDebugLogger(t), "localhost:0", ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig()) require.NoError(t, err) errCh := make(chan error) go func() { @@ -166,7 +166,7 @@ func TestManager_FakeInput_StartStop(t *testing.T) { defer cancel() ai, _ := info.NewAgentInfo(true) - m, err := NewManager(newDebugLogger(t), "localhost:0", ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig()) + m, err := NewManager(newDebugLogger(t), newDebugLogger(t), "localhost:0", ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig()) require.NoError(t, err) errCh := make(chan error) go func() { @@ -292,7 +292,7 @@ func TestManager_FakeInput_BadUnitToGood(t *testing.T) { defer cancel() ai, _ := info.NewAgentInfo(true) - m, err := NewManager(newDebugLogger(t), "localhost:0", ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig()) + m, err := NewManager(newDebugLogger(t), newDebugLogger(t), "localhost:0", ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig()) require.NoError(t, err) errCh := make(chan error) go func() { @@ -465,7 +465,7 @@ func TestManager_FakeInput_GoodUnitToBad(t *testing.T) { defer cancel() ai, _ := info.NewAgentInfo(true) - m, err := NewManager(newDebugLogger(t), "localhost:0", ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig()) + m, err := NewManager(newDebugLogger(t), newDebugLogger(t), "localhost:0", ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig()) require.NoError(t, err) errCh := make(chan error) go func() { @@ -622,7 +622,7 @@ func TestManager_FakeInput_Configure(t *testing.T) { defer cancel() ai, _ := info.NewAgentInfo(true) - m, err := NewManager(newDebugLogger(t), "localhost:0", ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig()) + m, err := NewManager(newDebugLogger(t), newDebugLogger(t), "localhost:0", ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig()) require.NoError(t, err) errCh := make(chan error) go func() { @@ -749,7 +749,7 @@ func TestManager_FakeInput_RemoveUnit(t *testing.T) { defer cancel() ai, _ := info.NewAgentInfo(true) - m, err := NewManager(newDebugLogger(t), "localhost:0", ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig()) + m, err := NewManager(newDebugLogger(t), newDebugLogger(t), "localhost:0", ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig()) require.NoError(t, err) errCh := make(chan error) go func() { @@ -909,7 +909,7 @@ func TestManager_FakeInput_ActionState(t *testing.T) { defer cancel() ai, _ := info.NewAgentInfo(true) - m, err := NewManager(newDebugLogger(t), "localhost:0", ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig()) + m, err := NewManager(newDebugLogger(t), newDebugLogger(t), "localhost:0", ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig()) require.NoError(t, err) errCh := make(chan error) go func() { @@ -1040,7 +1040,7 @@ func TestManager_FakeInput_Restarts(t *testing.T) { defer cancel() ai, _ := info.NewAgentInfo(true) - m, err := NewManager(newDebugLogger(t), "localhost:0", ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig()) + m, err := NewManager(newDebugLogger(t), newDebugLogger(t), "localhost:0", ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig()) require.NoError(t, err) errCh := make(chan error) go func() { @@ -1182,7 +1182,7 @@ func TestManager_FakeInput_Restarts_ConfigKill(t *testing.T) { defer cancel() ai, _ := info.NewAgentInfo(true) - m, err := NewManager(newDebugLogger(t), "localhost:0", ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig()) + m, err := NewManager(newDebugLogger(t), newDebugLogger(t), "localhost:0", ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig()) require.NoError(t, err) errCh := make(chan error) go func() { @@ -1331,7 +1331,7 @@ func TestManager_FakeInput_KeepsRestarting(t *testing.T) { defer cancel() ai, _ := info.NewAgentInfo(true) - m, err := NewManager(newDebugLogger(t), "localhost:0", ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig()) + m, err := NewManager(newDebugLogger(t), newDebugLogger(t), "localhost:0", ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig()) require.NoError(t, err) errCh := make(chan error) go func() { @@ -1480,7 +1480,7 @@ func TestManager_FakeInput_RestartsOnMissedCheckins(t *testing.T) { defer cancel() ai, _ := info.NewAgentInfo(true) - m, err := NewManager(newDebugLogger(t), "localhost:0", ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig()) + m, err := NewManager(newDebugLogger(t), newDebugLogger(t), "localhost:0", ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig()) require.NoError(t, err) errCh := make(chan error) go func() { @@ -1601,7 +1601,7 @@ func TestManager_FakeInput_InvalidAction(t *testing.T) { defer cancel() ai, _ := info.NewAgentInfo(true) - m, err := NewManager(newDebugLogger(t), "localhost:0", ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig()) + m, err := NewManager(newDebugLogger(t), newDebugLogger(t), "localhost:0", ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig()) require.NoError(t, err) errCh := make(chan error) go func() { @@ -1725,7 +1725,7 @@ func TestManager_FakeInput_MultiComponent(t *testing.T) { defer cancel() ai, _ := info.NewAgentInfo(true) - m, err := NewManager(newDebugLogger(t), "localhost:0", ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig()) + m, err := NewManager(newDebugLogger(t), newDebugLogger(t), "localhost:0", ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig()) require.NoError(t, err) errCh := make(chan error) go func() { @@ -1937,7 +1937,7 @@ func TestManager_FakeInput_LogLevel(t *testing.T) { defer cancel() ai, _ := info.NewAgentInfo(true) - m, err := NewManager(newDebugLogger(t), "localhost:0", ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig()) + m, err := NewManager(newDebugLogger(t), newDebugLogger(t), "localhost:0", ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig()) require.NoError(t, err) errCh := make(chan error) go func() { @@ -2089,7 +2089,7 @@ func TestManager_FakeShipper(t *testing.T) { defer cancel() ai, _ := info.NewAgentInfo(true) - m, err := NewManager(newDebugLogger(t), "localhost:0", ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig()) + m, err := NewManager(newDebugLogger(t), newDebugLogger(t), "localhost:0", ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig()) require.NoError(t, err) errCh := make(chan error) go func() { diff --git a/pkg/component/runtime/runtime.go b/pkg/component/runtime/runtime.go index aa780a002e5..0ed1b46c26c 100644 --- a/pkg/component/runtime/runtime.go +++ b/pkg/component/runtime/runtime.go @@ -60,7 +60,7 @@ func NewComponentRuntime(comp component.Component, logger *logger.Logger, monito } if comp.InputSpec != nil { if comp.InputSpec.Spec.Command != nil { - return NewCommandRuntime(comp, logger, monitor) + return NewCommandRuntime(comp, monitor) } if comp.InputSpec.Spec.Service != nil { return NewServiceRuntime(comp, logger) @@ -69,7 +69,7 @@ func NewComponentRuntime(comp component.Component, logger *logger.Logger, monito } if comp.ShipperSpec != nil { if comp.ShipperSpec.Spec.Command != nil { - return NewCommandRuntime(comp, logger, monitor) + return NewCommandRuntime(comp, monitor) } return nil, errors.New("components for shippers can only support command runtime") }