Skip to content

Commit

Permalink
Tag agent logs with 'service.name' (#2044) (#2159)
Browse files Browse the repository at this point in the history
Tag agent logs with 'service.name' (#2044)

(cherry picked from commit cd31f99)

Co-authored-by: Michal Pristas <michal.pristas@gmail.com>
  • Loading branch information
mergify[bot] and michalpristas authored Jan 24, 2023
1 parent 35ed09a commit a1e1afe
Show file tree
Hide file tree
Showing 7 changed files with 51 additions and 35 deletions.
3 changes: 2 additions & 1 deletion internal/pkg/agent/application/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
// New creates a new Agent and bootstrap the required subsystem.
func New(
log *logger.Logger,
baseLogger *logger.Logger,
logLevel logp.Level,
agentInfo *info.AgentInfo,
reexec coordinator.ReExecManager,
Expand Down Expand Up @@ -73,7 +74,7 @@ func New(
upgrader := upgrade.NewUpgrader(log, cfg.Settings.DownloadConfig, agentInfo)
monitor := monitoring.New(isMonitoringSupported, cfg.Settings.DownloadConfig.OS(), cfg.Settings.MonitoringConfig, agentInfo)

runtime, err := runtime.NewManager(log, cfg.Settings.GRPC.String(), agentInfo, tracer, monitor, cfg.Settings.GRPC)
runtime, err := runtime.NewManager(log, baseLogger, cfg.Settings.GRPC.String(), agentInfo, tracer, monitor, cfg.Settings.GRPC)
if err != nil {
return nil, fmt.Errorf("failed to initialize runtime manager: %w", err)
}
Expand Down
9 changes: 9 additions & 0 deletions internal/pkg/agent/application/monitoring/v1_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,15 @@ func (b *BeatsMonitor) injectLogsInput(cfg map[string]interface{}, components []
},
},
},
map[string]interface{}{
// injecting component log source to stay aligned with command runtime logs
"add_fields": map[string]interface{}{
"target": "log",
"fields": map[string]interface{}{
"source": comp.ID,
},
},
},
},
})
}
Expand Down
8 changes: 6 additions & 2 deletions internal/pkg/agent/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,15 @@ func run(override cfgOverrider, modifiers ...component.PlatformModifier) error {
if cfg.Settings.LoggingConfig != nil {
logLvl = cfg.Settings.LoggingConfig.Level
}
l, err := logger.NewFromConfig("", cfg.Settings.LoggingConfig, true)
baseLogger, err := logger.NewFromConfig("", cfg.Settings.LoggingConfig, true)
if err != nil {
return err
}

l := baseLogger.With("log", map[string]interface{}{
"source": agentName,
})

cfg, err = tryDelayEnroll(ctx, l, cfg, override)
if err != nil {
err = errors.New(err, "failed to perform delayed enrollment")
Expand Down Expand Up @@ -188,7 +192,7 @@ func run(override cfgOverrider, modifiers ...component.PlatformModifier) error {
l.Info("APM instrumentation disabled")
}

coord, err := application.New(l, logLvl, agentInfo, rex, tracer, configuration.IsFleetServerBootstrap(cfg.Fleet), modifiers...)
coord, err := application.New(l, baseLogger, logLvl, agentInfo, rex, tracer, configuration.IsFleetServerBootstrap(cfg.Fleet), modifiers...)
if err != nil {
return err
}
Expand Down
26 changes: 13 additions & 13 deletions pkg/component/runtime/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ type procState struct {

// CommandRuntime provides the command runtime for running a component as a subprocess.
type CommandRuntime struct {
logger *logger.Logger
logStd *logWriter
logErr *logWriter

Expand All @@ -77,7 +76,7 @@ type CommandRuntime struct {
}

// NewCommandRuntime creates a new command runtime for the provided component.
func NewCommandRuntime(comp component.Component, logger *logger.Logger, monitor MonitoringManager) (ComponentRuntime, error) {
func NewCommandRuntime(comp component.Component, monitor MonitoringManager) (ComponentRuntime, error) {
c := &CommandRuntime{
current: comp,
monitor: monitor,
Expand All @@ -92,11 +91,6 @@ func NewCommandRuntime(comp component.Component, logger *logger.Logger, monitor
if cmdSpec == nil {
return nil, errors.New("must have command defined in specification")
}
c.logger = logger.With("component", map[string]interface{}{
"id": comp.ID,
"type": c.getSpecType(),
"binary": c.getSpecBinaryName(),
})
ll, unitLevels := getLogLevels(comp)
c.logStd = createLogWriter(c.current, c.getCommandSpec(), c.getSpecType(), c.getSpecBinaryName(), ll, unitLevels, logSourceStdout)
ll, unitLevels = getLogLevels(comp) // don't want to share mapping of units (so new map is generated)
Expand Down Expand Up @@ -500,12 +494,18 @@ func attachOutErr(stdOut *logWriter, stdErr *logWriter) process.CmdOption {

func createLogWriter(comp component.Component, cmdSpec *component.CommandSpec, typeStr string, binaryName string, ll zapcore.Level, unitLevels map[string]zapcore.Level, src logSource) *logWriter {
dataset := fmt.Sprintf("elastic_agent.%s", strings.ReplaceAll(strings.ReplaceAll(binaryName, "-", "_"), "/", "_"))
logger := logger.NewWithoutConfig("").With("component", map[string]interface{}{
"id": comp.ID,
"type": typeStr,
"binary": binaryName,
"dataset": dataset,
})
logger := logger.NewWithoutConfig("").
With(
"component", map[string]interface{}{
"id": comp.ID,
"type": typeStr,
"binary": binaryName,
"dataset": dataset,
},
"log", map[string]interface{}{
"source": comp.ID,
},
)
return newLogWriter(logger.Core(), cmdSpec.Log, ll, unitLevels, src)
}

Expand Down
6 changes: 4 additions & 2 deletions pkg/component/runtime/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ type Manager struct {
proto.UnimplementedElasticAgentServer

logger *logger.Logger
baseLogger *logger.Logger
ca *authority.CertificateAuthority
listenAddr string
agentInfo *info.AgentInfo
Expand Down Expand Up @@ -106,13 +107,14 @@ type Manager struct {
}

// NewManager creates a new manager.
func NewManager(logger *logger.Logger, listenAddr string, agentInfo *info.AgentInfo, tracer *apm.Tracer, monitor MonitoringManager, grpcConfig *configuration.GRPCConfig) (*Manager, error) {
func NewManager(logger, baseLogger *logger.Logger, listenAddr string, agentInfo *info.AgentInfo, tracer *apm.Tracer, monitor MonitoringManager, grpcConfig *configuration.GRPCConfig) (*Manager, error) {
ca, err := authority.NewCA()
if err != nil {
return nil, err
}
m := &Manager{
logger: logger,
baseLogger: baseLogger,
ca: ca,
listenAddr: listenAddr,
agentInfo: agentInfo,
Expand Down Expand Up @@ -655,7 +657,7 @@ func (m *Manager) update(components []component.Component, teardown bool) error
}
} else {
// new component; create its runtime
logger := m.logger.Named(fmt.Sprintf("component.runtime.%s", comp.ID))
logger := m.baseLogger.Named(fmt.Sprintf("component.runtime.%s", comp.ID))
state, err := newComponentRuntimeState(m, logger, m.monitor, comp)
if err != nil {
return fmt.Errorf("failed to create new component %s: %w", comp.ID, err)
Expand Down
30 changes: 15 additions & 15 deletions pkg/component/runtime/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func TestManager_SimpleComponentErr(t *testing.T) {
defer cancel()

ai, _ := info.NewAgentInfo(true)
m, err := NewManager(newDebugLogger(t), "localhost:0", ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig())
m, err := NewManager(newDebugLogger(t), newDebugLogger(t), "localhost:0", ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig())
require.NoError(t, err)
errCh := make(chan error)
go func() {
Expand Down Expand Up @@ -166,7 +166,7 @@ func TestManager_FakeInput_StartStop(t *testing.T) {
defer cancel()

ai, _ := info.NewAgentInfo(true)
m, err := NewManager(newDebugLogger(t), "localhost:0", ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig())
m, err := NewManager(newDebugLogger(t), newDebugLogger(t), "localhost:0", ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig())
require.NoError(t, err)
errCh := make(chan error)
go func() {
Expand Down Expand Up @@ -292,7 +292,7 @@ func TestManager_FakeInput_BadUnitToGood(t *testing.T) {
defer cancel()

ai, _ := info.NewAgentInfo(true)
m, err := NewManager(newDebugLogger(t), "localhost:0", ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig())
m, err := NewManager(newDebugLogger(t), newDebugLogger(t), "localhost:0", ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig())
require.NoError(t, err)
errCh := make(chan error)
go func() {
Expand Down Expand Up @@ -465,7 +465,7 @@ func TestManager_FakeInput_GoodUnitToBad(t *testing.T) {
defer cancel()

ai, _ := info.NewAgentInfo(true)
m, err := NewManager(newDebugLogger(t), "localhost:0", ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig())
m, err := NewManager(newDebugLogger(t), newDebugLogger(t), "localhost:0", ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig())
require.NoError(t, err)
errCh := make(chan error)
go func() {
Expand Down Expand Up @@ -622,7 +622,7 @@ func TestManager_FakeInput_Configure(t *testing.T) {
defer cancel()

ai, _ := info.NewAgentInfo(true)
m, err := NewManager(newDebugLogger(t), "localhost:0", ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig())
m, err := NewManager(newDebugLogger(t), newDebugLogger(t), "localhost:0", ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig())
require.NoError(t, err)
errCh := make(chan error)
go func() {
Expand Down Expand Up @@ -749,7 +749,7 @@ func TestManager_FakeInput_RemoveUnit(t *testing.T) {
defer cancel()

ai, _ := info.NewAgentInfo(true)
m, err := NewManager(newDebugLogger(t), "localhost:0", ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig())
m, err := NewManager(newDebugLogger(t), newDebugLogger(t), "localhost:0", ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig())
require.NoError(t, err)
errCh := make(chan error)
go func() {
Expand Down Expand Up @@ -909,7 +909,7 @@ func TestManager_FakeInput_ActionState(t *testing.T) {
defer cancel()

ai, _ := info.NewAgentInfo(true)
m, err := NewManager(newDebugLogger(t), "localhost:0", ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig())
m, err := NewManager(newDebugLogger(t), newDebugLogger(t), "localhost:0", ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig())
require.NoError(t, err)
errCh := make(chan error)
go func() {
Expand Down Expand Up @@ -1040,7 +1040,7 @@ func TestManager_FakeInput_Restarts(t *testing.T) {
defer cancel()

ai, _ := info.NewAgentInfo(true)
m, err := NewManager(newDebugLogger(t), "localhost:0", ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig())
m, err := NewManager(newDebugLogger(t), newDebugLogger(t), "localhost:0", ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig())
require.NoError(t, err)
errCh := make(chan error)
go func() {
Expand Down Expand Up @@ -1182,7 +1182,7 @@ func TestManager_FakeInput_Restarts_ConfigKill(t *testing.T) {
defer cancel()

ai, _ := info.NewAgentInfo(true)
m, err := NewManager(newDebugLogger(t), "localhost:0", ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig())
m, err := NewManager(newDebugLogger(t), newDebugLogger(t), "localhost:0", ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig())
require.NoError(t, err)
errCh := make(chan error)
go func() {
Expand Down Expand Up @@ -1331,7 +1331,7 @@ func TestManager_FakeInput_KeepsRestarting(t *testing.T) {
defer cancel()

ai, _ := info.NewAgentInfo(true)
m, err := NewManager(newDebugLogger(t), "localhost:0", ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig())
m, err := NewManager(newDebugLogger(t), newDebugLogger(t), "localhost:0", ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig())
require.NoError(t, err)
errCh := make(chan error)
go func() {
Expand Down Expand Up @@ -1480,7 +1480,7 @@ func TestManager_FakeInput_RestartsOnMissedCheckins(t *testing.T) {
defer cancel()

ai, _ := info.NewAgentInfo(true)
m, err := NewManager(newDebugLogger(t), "localhost:0", ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig())
m, err := NewManager(newDebugLogger(t), newDebugLogger(t), "localhost:0", ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig())
require.NoError(t, err)
errCh := make(chan error)
go func() {
Expand Down Expand Up @@ -1601,7 +1601,7 @@ func TestManager_FakeInput_InvalidAction(t *testing.T) {
defer cancel()

ai, _ := info.NewAgentInfo(true)
m, err := NewManager(newDebugLogger(t), "localhost:0", ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig())
m, err := NewManager(newDebugLogger(t), newDebugLogger(t), "localhost:0", ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig())
require.NoError(t, err)
errCh := make(chan error)
go func() {
Expand Down Expand Up @@ -1725,7 +1725,7 @@ func TestManager_FakeInput_MultiComponent(t *testing.T) {
defer cancel()

ai, _ := info.NewAgentInfo(true)
m, err := NewManager(newDebugLogger(t), "localhost:0", ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig())
m, err := NewManager(newDebugLogger(t), newDebugLogger(t), "localhost:0", ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig())
require.NoError(t, err)
errCh := make(chan error)
go func() {
Expand Down Expand Up @@ -1937,7 +1937,7 @@ func TestManager_FakeInput_LogLevel(t *testing.T) {
defer cancel()

ai, _ := info.NewAgentInfo(true)
m, err := NewManager(newDebugLogger(t), "localhost:0", ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig())
m, err := NewManager(newDebugLogger(t), newDebugLogger(t), "localhost:0", ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig())
require.NoError(t, err)
errCh := make(chan error)
go func() {
Expand Down Expand Up @@ -2089,7 +2089,7 @@ func TestManager_FakeShipper(t *testing.T) {
defer cancel()

ai, _ := info.NewAgentInfo(true)
m, err := NewManager(newDebugLogger(t), "localhost:0", ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig())
m, err := NewManager(newDebugLogger(t), newDebugLogger(t), "localhost:0", ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig())
require.NoError(t, err)
errCh := make(chan error)
go func() {
Expand Down
4 changes: 2 additions & 2 deletions pkg/component/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func NewComponentRuntime(comp component.Component, logger *logger.Logger, monito
}
if comp.InputSpec != nil {
if comp.InputSpec.Spec.Command != nil {
return NewCommandRuntime(comp, logger, monitor)
return NewCommandRuntime(comp, monitor)
}
if comp.InputSpec.Spec.Service != nil {
return NewServiceRuntime(comp, logger)
Expand All @@ -69,7 +69,7 @@ func NewComponentRuntime(comp component.Component, logger *logger.Logger, monito
}
if comp.ShipperSpec != nil {
if comp.ShipperSpec.Spec.Command != nil {
return NewCommandRuntime(comp, logger, monitor)
return NewCommandRuntime(comp, monitor)
}
return nil, errors.New("components for shippers can only support command runtime")
}
Expand Down

0 comments on commit a1e1afe

Please sign in to comment.