Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[chore][graph] Remove connectorNode's separate baseConsumer #11333

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 56 additions & 109 deletions service/internal/graph/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ type connectorNode struct {
exprPipelineType pipeline.Signal
rcvrPipelineType pipeline.Signal
component.Component
baseConsumer
}

func newConnectorNode(exprPipelineType, rcvrPipelineType pipeline.Signal, connID component.ID) *connectorNode {
Expand All @@ -43,7 +42,7 @@ func newConnectorNode(exprPipelineType, rcvrPipelineType pipeline.Signal, connID
}

func (n *connectorNode) getConsumer() baseConsumer {
return n.baseConsumer
return n.Component.(baseConsumer)
}

func (n *connectorNode) buildComponent(
Expand Down Expand Up @@ -80,40 +79,27 @@ func (n *connectorNode) buildTraces(
}
next := connector.NewTracesRouter(consumers)

var err error
switch n.exprPipelineType {
case pipeline.SignalTraces:
conn, err := builder.CreateTracesToTraces(ctx, set, next)
var conn connector.Traces
conn, err = builder.CreateTracesToTraces(ctx, set, next)
if err != nil {
return err
}
n.Component, n.baseConsumer = conn, conn
case pipeline.SignalMetrics:
conn, err := builder.CreateMetricsToTraces(ctx, set, next)
if err != nil {
return err
n.Component = componentTraces{
Component: conn,
Traces: capabilityconsumer.NewTraces(conn, aggregateCap(conn, nexts)),
}
n.Component, n.baseConsumer = conn, conn
return nil
case pipeline.SignalMetrics:
n.Component, err = builder.CreateMetricsToTraces(ctx, set, next)
case pipeline.SignalLogs:
conn, err := builder.CreateLogsToTraces(ctx, set, next)
if err != nil {
return err
}
n.Component, n.baseConsumer = conn, conn
n.Component, err = builder.CreateLogsToTraces(ctx, set, next)
case componentprofiles.SignalProfiles:
conn, err := builder.CreateProfilesToTraces(ctx, set, next)
if err != nil {
return err
}
n.Component, n.baseConsumer = conn, conn
}

if n.exprPipelineType == pipeline.SignalTraces {
n.baseConsumer = capabilityconsumer.NewTraces(
n.Component.(connector.Traces),
aggregateCapabilities(n.baseConsumer, nexts),
)
n.Component, err = builder.CreateProfilesToTraces(ctx, set, next)
}
return nil
return err
}

func (n *connectorNode) buildMetrics(
Expand All @@ -128,40 +114,27 @@ func (n *connectorNode) buildMetrics(
}
next := connector.NewMetricsRouter(consumers)

var err error
switch n.exprPipelineType {
case pipeline.SignalTraces:
conn, err := builder.CreateTracesToMetrics(ctx, set, next)
if err != nil {
return err
}
n.Component, n.baseConsumer = conn, conn
case pipeline.SignalMetrics:
conn, err := builder.CreateMetricsToMetrics(ctx, set, next)
var conn connector.Metrics
conn, err = builder.CreateMetricsToMetrics(ctx, set, next)
if err != nil {
return err
}
n.Component, n.baseConsumer = conn, conn
case pipeline.SignalLogs:
conn, err := builder.CreateLogsToMetrics(ctx, set, next)
if err != nil {
return err
n.Component = componentMetrics{
Component: conn,
Metrics: capabilityconsumer.NewMetrics(conn, aggregateCap(conn, nexts)),
}
n.Component, n.baseConsumer = conn, conn
return nil
case pipeline.SignalTraces:
n.Component, err = builder.CreateTracesToMetrics(ctx, set, next)
case pipeline.SignalLogs:
n.Component, err = builder.CreateLogsToMetrics(ctx, set, next)
case componentprofiles.SignalProfiles:
conn, err := builder.CreateProfilesToMetrics(ctx, set, next)
if err != nil {
return err
}
n.Component, n.baseConsumer = conn, conn
}

if n.exprPipelineType == pipeline.SignalMetrics {
n.baseConsumer = capabilityconsumer.NewMetrics(
n.Component.(connector.Metrics),
aggregateCapabilities(n.baseConsumer, nexts),
)
n.Component, err = builder.CreateProfilesToMetrics(ctx, set, next)
}
return nil
return err
}

func (n *connectorNode) buildLogs(
Expand All @@ -176,40 +149,27 @@ func (n *connectorNode) buildLogs(
}
next := connector.NewLogsRouter(consumers)

var err error
switch n.exprPipelineType {
case pipeline.SignalTraces:
conn, err := builder.CreateTracesToLogs(ctx, set, next)
if err != nil {
return err
}
n.Component, n.baseConsumer = conn, conn
case pipeline.SignalMetrics:
conn, err := builder.CreateMetricsToLogs(ctx, set, next)
if err != nil {
return err
}
n.Component, n.baseConsumer = conn, conn
case pipeline.SignalLogs:
conn, err := builder.CreateLogsToLogs(ctx, set, next)
var conn connector.Logs
conn, err = builder.CreateLogsToLogs(ctx, set, next)
if err != nil {
return err
}
n.Component, n.baseConsumer = conn, conn
case componentprofiles.SignalProfiles:
conn, err := builder.CreateProfilesToLogs(ctx, set, next)
if err != nil {
return err
n.Component = componentLogs{
Component: conn,
Logs: capabilityconsumer.NewLogs(conn, aggregateCap(conn, nexts)),
}
n.Component, n.baseConsumer = conn, conn
}

if n.exprPipelineType == pipeline.SignalLogs {
n.baseConsumer = capabilityconsumer.NewLogs(
n.Component.(connector.Logs),
aggregateCapabilities(n.baseConsumer, nexts),
)
return nil
case pipeline.SignalTraces:
n.Component, err = builder.CreateTracesToLogs(ctx, set, next)
case pipeline.SignalMetrics:
n.Component, err = builder.CreateMetricsToLogs(ctx, set, next)
case componentprofiles.SignalProfiles:
n.Component, err = builder.CreateProfilesToLogs(ctx, set, next)
}
return nil
return err
}

func (n *connectorNode) buildProfiles(
Expand All @@ -224,47 +184,34 @@ func (n *connectorNode) buildProfiles(
}
next := connectorprofiles.NewProfilesRouter(consumers)

var err error
switch n.exprPipelineType {
case pipeline.SignalTraces:
conn, err := builder.CreateTracesToProfiles(ctx, set, next)
case componentprofiles.SignalProfiles:
var conn connectorprofiles.Profiles
conn, err = builder.CreateProfilesToProfiles(ctx, set, next)
if err != nil {
return err
}
n.Component, n.baseConsumer = conn, conn
case pipeline.SignalMetrics:
conn, err := builder.CreateMetricsToProfiles(ctx, set, next)
if err != nil {
return err
n.Component = componentProfiles{
Component: conn,
Profiles: capabilityconsumer.NewProfiles(conn, aggregateCap(conn, nexts)),
}
n.Component, n.baseConsumer = conn, conn
return nil
case pipeline.SignalTraces:
n.Component, err = builder.CreateTracesToProfiles(ctx, set, next)
case pipeline.SignalMetrics:
n.Component, err = builder.CreateMetricsToProfiles(ctx, set, next)
case pipeline.SignalLogs:
conn, err := builder.CreateLogsToProfiles(ctx, set, next)
if err != nil {
return err
}
n.Component, n.baseConsumer = conn, conn
case componentprofiles.SignalProfiles:
conn, err := builder.CreateProfilesToProfiles(ctx, set, next)
if err != nil {
return err
}
n.Component, n.baseConsumer = conn, conn
n.Component, err = builder.CreateLogsToProfiles(ctx, set, next)
}

if n.exprPipelineType == componentprofiles.SignalProfiles {
n.baseConsumer = capabilityconsumer.NewProfiles(
n.Component.(connectorprofiles.Profiles),
aggregateCapabilities(n.baseConsumer, nexts),
)
}
return nil
return err
}

// When connecting pipelines of the same data type, the connector must
// inherit the capabilities of pipelines in which it is acting as a receiver.
// Since the incoming and outgoing data types are the same, we must also consider
// that the connector itself may MutatesData.
func aggregateCapabilities(base baseConsumer, nexts []baseConsumer) consumer.Capabilities {
// that the connector itself may mutate the data and pass it along.
func aggregateCap(base baseConsumer, nexts []baseConsumer) consumer.Capabilities {
capabilities := base.Capabilities()
for _, next := range nexts {
capabilities.MutatesData = capabilities.MutatesData || next.Capabilities().MutatesData
Expand Down
22 changes: 22 additions & 0 deletions service/internal/graph/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
package graph // import "go.opentelemetry.io/collector/service/internal/graph"

import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumerprofiles"
)

// baseConsumer redeclared here since not public in consumer package. May consider to make that public.
Expand All @@ -15,3 +17,23 @@ type baseConsumer interface {
type consumerNode interface {
getConsumer() baseConsumer
}

type componentTraces struct {
component.Component
consumer.Traces
}

type componentMetrics struct {
component.Component
consumer.Metrics
}

type componentLogs struct {
component.Component
consumer.Logs
}

type componentProfiles struct {
component.Component
consumerprofiles.Profiles
}
57 changes: 12 additions & 45 deletions service/internal/graph/graph_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"go.opentelemetry.io/collector/component/componentstatus"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/connector"
"go.opentelemetry.io/collector/connector/connectorprofiles"
"go.opentelemetry.io/collector/connector/connectortest"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exportertest"
Expand Down Expand Up @@ -832,17 +831,9 @@ func TestConnectorPipelinesGraph(t *testing.T) {
require.Empty(t, e.Logs)
require.Empty(t, e.Profiles)
case *connectorNode:
// connector needs to be unwrapped to access component as ExampleConnector
switch ct := c.Component.(type) {
case connector.Traces:
require.True(t, ct.(*testcomponents.ExampleConnector).Started())
case connector.Metrics:
require.True(t, ct.(*testcomponents.ExampleConnector).Started())
case connector.Logs:
require.True(t, ct.(*testcomponents.ExampleConnector).Started())
case connectorprofiles.Profiles:
require.True(t, ct.(*testcomponents.ExampleConnector).Started())
}
exConn := unwrapExampleConnector(c)
require.NotNil(t, exConn)
require.True(t, exConn.Started())
default:
require.Fail(t, fmt.Sprintf("unexpected type %T", c))
}
Expand All @@ -857,17 +848,9 @@ func TestConnectorPipelinesGraph(t *testing.T) {
case *receiverNode:
require.True(t, c.Component.(*testcomponents.ExampleReceiver).Started())
case *connectorNode:
// connector needs to be unwrapped to access component as ExampleConnector
switch ct := c.Component.(type) {
case connector.Traces:
require.True(t, ct.(*testcomponents.ExampleConnector).Started())
case connector.Metrics:
require.True(t, ct.(*testcomponents.ExampleConnector).Started())
case connector.Logs:
require.True(t, ct.(*testcomponents.ExampleConnector).Started())
case connectorprofiles.Profiles:
require.True(t, ct.(*testcomponents.ExampleConnector).Started())
}
exConn := unwrapExampleConnector(c)
require.NotNil(t, exConn)
require.True(t, exConn.Started())
default:
require.Fail(t, fmt.Sprintf("unexpected type %T", c))
}
Expand Down Expand Up @@ -937,17 +920,9 @@ func TestConnectorPipelinesGraph(t *testing.T) {
case *receiverNode:
require.True(t, c.Component.(*testcomponents.ExampleReceiver).Stopped())
case *connectorNode:
// connector needs to be unwrapped to access component as ExampleConnector
switch ct := c.Component.(type) {
case connector.Traces:
require.True(t, ct.(*testcomponents.ExampleConnector).Stopped())
case connector.Metrics:
require.True(t, ct.(*testcomponents.ExampleConnector).Stopped())
case connector.Logs:
require.True(t, ct.(*testcomponents.ExampleConnector).Stopped())
case connectorprofiles.Profiles:
require.True(t, ct.(*testcomponents.ExampleConnector).Stopped())
}
exConn := unwrapExampleConnector(c)
require.NotNil(t, exConn)
require.True(t, exConn.Stopped())
default:
require.Fail(t, fmt.Sprintf("unexpected type %T", c))
}
Expand All @@ -963,17 +938,9 @@ func TestConnectorPipelinesGraph(t *testing.T) {
e := c.Component.(*testcomponents.ExampleExporter)
require.True(t, e.Stopped())
case *connectorNode:
// connector needs to be unwrapped to access component as ExampleConnector
switch ct := c.Component.(type) {
case connector.Traces:
require.True(t, ct.(*testcomponents.ExampleConnector).Stopped())
case connector.Metrics:
require.True(t, ct.(*testcomponents.ExampleConnector).Stopped())
case connector.Logs:
require.True(t, ct.(*testcomponents.ExampleConnector).Stopped())
case connectorprofiles.Profiles:
require.True(t, ct.(*testcomponents.ExampleConnector).Stopped())
}
exConn := unwrapExampleConnector(c)
require.NotNil(t, exConn)
require.True(t, exConn.Stopped())
default:
require.Fail(t, fmt.Sprintf("unexpected type %T", c))
}
Expand Down
Loading
Loading