Skip to content

Commit

Permalink
[service] Remove servicetelemetry.TelemetrySettings (#10728)
Browse files Browse the repository at this point in the history
#### Description
Reorganizes service to not require `servicetelemetry.TelemetrySettings`
and instead depend directly on `component.TelemetrySettings`

Whether or not we move forward with
#10725 I
think this is a useful change for service.

#### Testing
Unit tests
  • Loading branch information
TylerHelmuth authored Jul 29, 2024
1 parent a6287ac commit fb5b1e6
Show file tree
Hide file tree
Showing 15 changed files with 109 additions and 235 deletions.
25 changes: 25 additions & 0 deletions .chloggen/service-remove-servicetelemetry.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: service/extensions

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Adds `Options` to `extensions.New`.

# One or more tracking issues or pull requests related to the change
issues: [10728]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: This is only a breaking change if you are depending on `extensions.New`'s signature. Calls to `extensions.New` are not broken.

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
4 changes: 0 additions & 4 deletions component/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,6 @@ import (
)

// TelemetrySettings provides components with APIs to report telemetry.
//
// Note: there is a service version of this struct, servicetelemetry.TelemetrySettings, that mirrors
// this struct except ReportStatus. When adding or removing anything from
// this struct consider whether the same should be done for the service version.
type TelemetrySettings struct {
// Logger that the factory can use during creation and can pass to the created
// component to be used later as well.
Expand Down
46 changes: 35 additions & 11 deletions service/extensions/extensions.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,19 @@ import (
"go.opentelemetry.io/collector/confmap"
"go.opentelemetry.io/collector/extension"
"go.opentelemetry.io/collector/service/internal/components"
"go.opentelemetry.io/collector/service/internal/servicetelemetry"
"go.opentelemetry.io/collector/service/internal/status"
"go.opentelemetry.io/collector/service/internal/zpages"
)

const zExtensionName = "zextensionname"

// Extensions is a map of extensions created from extension configs.
type Extensions struct {
telemetry servicetelemetry.TelemetrySettings
telemetry component.TelemetrySettings
extMap map[component.ID]extension.Extension
instanceIDs map[component.ID]*component.InstanceID
extensionIDs []component.ID // start order (and reverse stop order)
reporter status.Reporter
}

// Start starts all extensions.
Expand All @@ -38,20 +39,20 @@ func (bes *Extensions) Start(ctx context.Context, host component.Host) error {
extLogger.Info("Extension is starting...")
instanceID := bes.instanceIDs[extID]
ext := bes.extMap[extID]
bes.telemetry.Status.ReportStatus(
bes.reporter.ReportStatus(
instanceID,
component.NewStatusEvent(component.StatusStarting),
)
if err := ext.Start(ctx, host); err != nil {
bes.telemetry.Status.ReportStatus(
bes.reporter.ReportStatus(
instanceID,
component.NewPermanentErrorEvent(err),
)
// We log with zap.AddStacktrace(zap.DPanicLevel) to avoid adding the stack trace to the error log
extLogger.WithOptions(zap.AddStacktrace(zap.DPanicLevel)).Error("Failed to start extension", zap.Error(err))
return err
}
bes.telemetry.Status.ReportOKIfStarting(instanceID)
bes.reporter.ReportOKIfStarting(instanceID)
extLogger.Info("Extension started.")
}
return nil
Expand All @@ -65,19 +66,19 @@ func (bes *Extensions) Shutdown(ctx context.Context) error {
extID := bes.extensionIDs[i]
instanceID := bes.instanceIDs[extID]
ext := bes.extMap[extID]
bes.telemetry.Status.ReportStatus(
bes.reporter.ReportStatus(
instanceID,
component.NewStatusEvent(component.StatusStopping),
)
if err := ext.Shutdown(ctx); err != nil {
bes.telemetry.Status.ReportStatus(
bes.reporter.ReportStatus(
instanceID,
component.NewPermanentErrorEvent(err),
)
errs = multierr.Append(errs, err)
continue
}
bes.telemetry.Status.ReportStatus(
bes.reporter.ReportStatus(
instanceID,
component.NewStatusEvent(component.StatusStopped),
)
Expand Down Expand Up @@ -166,31 +167,46 @@ func (bes *Extensions) HandleZPages(w http.ResponseWriter, r *http.Request) {

// Settings holds configuration for building Extensions.
type Settings struct {
Telemetry servicetelemetry.TelemetrySettings
Telemetry component.TelemetrySettings
BuildInfo component.BuildInfo

// Extensions builder for extensions.
Extensions *extension.Builder
}

type Option func(*Extensions)

func WithReporter(reporter status.Reporter) Option {
return func(e *Extensions) {
e.reporter = reporter
}
}

// New creates a new Extensions from Config.
func New(ctx context.Context, set Settings, cfg Config) (*Extensions, error) {
func New(ctx context.Context, set Settings, cfg Config, options ...Option) (*Extensions, error) {
exts := &Extensions{
telemetry: set.Telemetry,
extMap: make(map[component.ID]extension.Extension),
instanceIDs: make(map[component.ID]*component.InstanceID),
extensionIDs: make([]component.ID, 0, len(cfg)),
reporter: &nopReporter{},
}

for _, opt := range options {
opt(exts)
}

for _, extID := range cfg {
instanceID := &component.InstanceID{
ID: extID,
Kind: component.KindExtension,
}
extSet := extension.Settings{
ID: extID,
TelemetrySettings: set.Telemetry.ToComponentTelemetrySettings(instanceID),
TelemetrySettings: set.Telemetry,
BuildInfo: set.BuildInfo,
}
extSet.TelemetrySettings.ReportStatus = status.NewReportStatusFunc(instanceID, exts.reporter.ReportStatus)
extSet.TelemetrySettings.Logger = components.ExtensionLogger(set.Telemetry.Logger, extID)

ext, err := set.Extensions.Create(ctx, extSet)
Expand All @@ -213,3 +229,11 @@ func New(ctx context.Context, set Settings, cfg Config) (*Extensions, error) {
exts.extensionIDs = order
return exts, nil
}

type nopReporter struct{}

func (r *nopReporter) Ready() {}

func (r *nopReporter) ReportStatus(*component.InstanceID, *component.StatusEvent) {}

func (r *nopReporter) ReportOKIfStarting(*component.InstanceID) {}
26 changes: 13 additions & 13 deletions service/extensions/extensions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"go.opentelemetry.io/collector/confmap"
"go.opentelemetry.io/collector/extension"
"go.opentelemetry.io/collector/extension/extensiontest"
"go.opentelemetry.io/collector/service/internal/servicetelemetry"
"go.opentelemetry.io/collector/service/internal/status"
)

Expand Down Expand Up @@ -83,7 +82,7 @@ func TestBuildExtensions(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
_, err := New(context.Background(), Settings{
Telemetry: servicetelemetry.NewNopTelemetrySettings(),
Telemetry: componenttest.NewNopTelemetrySettings(),
BuildInfo: component.NewDefaultBuildInfo(),
Extensions: extension.NewBuilder(tt.extensionsConfigs, tt.factories),
}, tt.config)
Expand Down Expand Up @@ -175,7 +174,7 @@ func (tc testOrderCase) testOrdering(t *testing.T) {
}

exts, err := New(context.Background(), Settings{
Telemetry: servicetelemetry.NewNopTelemetrySettings(),
Telemetry: componenttest.NewNopTelemetrySettings(),
BuildInfo: component.NewDefaultBuildInfo(),
Extensions: extension.NewBuilder(
extCfgs,
Expand Down Expand Up @@ -284,7 +283,7 @@ func TestNotifyConfig(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
extensions, err := New(context.Background(), Settings{
Telemetry: servicetelemetry.NewNopTelemetrySettings(),
Telemetry: componenttest.NewNopTelemetrySettings(),
BuildInfo: component.NewDefaultBuildInfo(),
Extensions: extension.NewBuilder(tt.extensionsConfigs, tt.factories),
}, tt.serviceExtensions)
Expand Down Expand Up @@ -420,25 +419,26 @@ func TestStatusReportedOnStartupShutdown(t *testing.T) {
factories := map[component.Type]extension.Factory{
statusType: factory,
}

var actualStatuses []*component.StatusEvent
rep := status.NewReporter(func(_ *component.InstanceID, ev *component.StatusEvent) {
actualStatuses = append(actualStatuses, ev)
}, func(err error) {
require.NoError(t, err)
})

extensions, err := New(
context.Background(),
Settings{
Telemetry: servicetelemetry.NewNopTelemetrySettings(),
Telemetry: componenttest.NewNopTelemetrySettings(),
BuildInfo: component.NewDefaultBuildInfo(),
Extensions: extension.NewBuilder(extensionsConfigs, factories),
},
[]component.ID{compID},
WithReporter(rep),
)

assert.NoError(t, err)

var actualStatuses []*component.StatusEvent
rep := status.NewReporter(func(_ *component.InstanceID, ev *component.StatusEvent) {
actualStatuses = append(actualStatuses, ev)
}, func(err error) {
require.NoError(t, err)
})
extensions.telemetry.Status = rep
rep.Ready()

assert.Equal(t, tc.startErr, extensions.Start(context.Background(), componenttest.NewNopHost()))
Expand Down
10 changes: 6 additions & 4 deletions service/internal/graph/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,13 @@ import (
"go.opentelemetry.io/collector/processor"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/service/internal/capabilityconsumer"
"go.opentelemetry.io/collector/service/internal/servicetelemetry"
"go.opentelemetry.io/collector/service/internal/status"
"go.opentelemetry.io/collector/service/pipelines"
)

// Settings holds configuration for building builtPipelines.
type Settings struct {
Telemetry servicetelemetry.TelemetrySettings
Telemetry component.TelemetrySettings
BuildInfo component.BuildInfo

ReceiverBuilder *receiver.Builder
Expand All @@ -49,6 +48,8 @@ type Settings struct {

// PipelineConfigs is a map of component.ID to PipelineConfig.
PipelineConfigs pipelines.Config

ReportStatus status.ServiceStatusFunc
}

type Graph struct {
Expand All @@ -61,7 +62,7 @@ type Graph struct {
// Keep track of status source per node
instanceIDs map[int64]*component.InstanceID

telemetry servicetelemetry.TelemetrySettings
telemetry component.TelemetrySettings
}

// Build builds a full pipeline graph.
Expand Down Expand Up @@ -301,7 +302,8 @@ func (g *Graph) buildComponents(ctx context.Context, set Settings) error {
// skipped for capabilitiesNodes and fanoutNodes as they are not assigned componentIDs.
var telemetrySettings component.TelemetrySettings
if instanceID, ok := g.instanceIDs[node.ID()]; ok {
telemetrySettings = set.Telemetry.ToComponentTelemetrySettings(instanceID)
telemetrySettings = set.Telemetry
telemetrySettings.ReportStatus = status.NewReportStatusFunc(instanceID, set.ReportStatus)
}

switch n := node.(type) {
Expand Down
16 changes: 7 additions & 9 deletions service/internal/graph/graph_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"go.opentelemetry.io/collector/processor/processortest"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/receiver/receivertest"
"go.opentelemetry.io/collector/service/internal/servicetelemetry"
"go.opentelemetry.io/collector/service/internal/status"
"go.opentelemetry.io/collector/service/internal/status/statustest"
"go.opentelemetry.io/collector/service/internal/testcomponents"
Expand Down Expand Up @@ -145,7 +144,7 @@ func TestGraphStartStop(t *testing.T) {
}

pg := &Graph{componentGraph: simple.NewDirectedGraph()}
pg.telemetry = servicetelemetry.NewNopTelemetrySettings()
pg.telemetry = componenttest.NewNopTelemetrySettings()
pg.instanceIDs = make(map[int64]*component.InstanceID)

for _, edge := range tt.edges {
Expand Down Expand Up @@ -200,7 +199,7 @@ func TestGraphStartStopCycle(t *testing.T) {

func TestGraphStartStopComponentError(t *testing.T) {
pg := &Graph{componentGraph: simple.NewDirectedGraph()}
pg.telemetry = servicetelemetry.NewNopTelemetrySettings()
pg.telemetry = componenttest.NewNopTelemetrySettings()
r1 := &testNode{
id: component.MustNewIDWithName("r", "1"),
startErr: errors.New("foo"),
Expand Down Expand Up @@ -719,7 +718,7 @@ func TestConnectorPipelinesGraph(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
// Build the pipeline
set := Settings{
Telemetry: servicetelemetry.NewNopTelemetrySettings(),
Telemetry: componenttest.NewNopTelemetrySettings(),
BuildInfo: component.NewDefaultBuildInfo(),
ReceiverBuilder: receiver.NewBuilder(
map[component.ID]component.Config{
Expand Down Expand Up @@ -1006,7 +1005,7 @@ func TestConnectorRouter(t *testing.T) {

ctx := context.Background()
set := Settings{
Telemetry: servicetelemetry.NewNopTelemetrySettings(),
Telemetry: componenttest.NewNopTelemetrySettings(),
BuildInfo: component.NewDefaultBuildInfo(),
ReceiverBuilder: receiver.NewBuilder(
map[component.ID]component.Config{
Expand Down Expand Up @@ -2050,7 +2049,7 @@ func TestGraphBuildErrors(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
set := Settings{
BuildInfo: component.NewDefaultBuildInfo(),
Telemetry: servicetelemetry.NewNopTelemetrySettings(),
Telemetry: componenttest.NewNopTelemetrySettings(),
ReceiverBuilder: receiver.NewBuilder(
test.receiverCfgs,
map[component.Type]receiver.Factory{
Expand Down Expand Up @@ -2097,7 +2096,7 @@ func TestGraphFailToStartAndShutdown(t *testing.T) {
nopConnectorFactory := connectortest.NewNopFactory()

set := Settings{
Telemetry: servicetelemetry.NewNopTelemetrySettings(),
Telemetry: componenttest.NewNopTelemetrySettings(),
BuildInfo: component.NewDefaultBuildInfo(),
ReceiverBuilder: receiver.NewBuilder(
map[component.ID]component.Config{
Expand Down Expand Up @@ -2333,15 +2332,14 @@ func TestStatusReportedOnStartupShutdown(t *testing.T) {
} {
t.Run(tc.name, func(t *testing.T) {
pg := &Graph{componentGraph: simple.NewDirectedGraph()}
pg.telemetry = servicetelemetry.NewNopTelemetrySettings()
pg.telemetry = componenttest.NewNopTelemetrySettings()

actualStatuses := make(map[*component.InstanceID][]*component.StatusEvent)
rep := status.NewReporter(func(id *component.InstanceID, ev *component.StatusEvent) {
actualStatuses[id] = append(actualStatuses[id], ev)
}, func(error) {
})

pg.telemetry.Status = rep
rep.Ready()

e0, e1 := tc.edge[0], tc.edge[1]
Expand Down
5 changes: 2 additions & 3 deletions service/internal/proctelemetry/process_telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/service/internal/metadata"
"go.opentelemetry.io/collector/service/internal/servicetelemetry"
)

// processMetrics is a struct that contains views related to process metrics (cpu, mem, etc)
Expand Down Expand Up @@ -53,7 +52,7 @@ func WithHostProc(hostProc string) RegisterOption {

// RegisterProcessMetrics creates a new set of processMetrics (mem, cpu) that can be used to measure
// basic information about this process.
func RegisterProcessMetrics(cfg servicetelemetry.TelemetrySettings, opts ...RegisterOption) error {
func RegisterProcessMetrics(cfg component.TelemetrySettings, opts ...RegisterOption) error {
set := registerOption{}
for _, opt := range opts {
opt.apply(&set)
Expand All @@ -74,7 +73,7 @@ func RegisterProcessMetrics(cfg servicetelemetry.TelemetrySettings, opts ...Regi
return err
}

_, err = metadata.NewTelemetryBuilder(cfg.ToComponentTelemetrySettings(&component.InstanceID{}),
_, err = metadata.NewTelemetryBuilder(cfg,
metadata.WithProcessUptimeCallback(pm.updateProcessUptime),
metadata.WithProcessRuntimeHeapAllocBytesCallback(pm.updateAllocMem),
metadata.WithProcessRuntimeTotalAllocBytesCallback(pm.updateTotalAllocMem),
Expand Down
Loading

0 comments on commit fb5b1e6

Please sign in to comment.