Skip to content

Commit

Permalink
Inject custom headers defined during enroll (#2158) (#2163)
Browse files Browse the repository at this point in the history
Inject custom headers defined during enroll (#2158)

(cherry picked from commit a8b36d0)

Co-authored-by: Michal Pristas <michal.pristas@gmail.com>
  • Loading branch information
mergify[bot] and michalpristas authored Jan 24, 2023
1 parent a1e1afe commit b8553c5
Show file tree
Hide file tree
Showing 5 changed files with 267 additions and 11 deletions.
2 changes: 1 addition & 1 deletion internal/pkg/agent/application/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -761,7 +761,7 @@ func (c *Coordinator) compute() (map[string]interface{}, []component.Component,
configInjector = c.monitorMgr.MonitoringConfig
}

comps, err := c.specs.ToComponents(cfg, configInjector, c.state.logLevel)
comps, err := c.specs.ToComponents(cfg, configInjector, c.state.logLevel, c.agentInfo)
if err != nil {
return nil, nil, fmt.Errorf("failed to render components: %w", err)
}
Expand Down
14 changes: 12 additions & 2 deletions internal/pkg/agent/cmd/inspect.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,11 @@ func inspectConfig(ctx context.Context, cfgPath string, opts inspectConfigOpts,
return err
}

agentInfo, err := info.NewAgentInfoWithLog("error", false)
if err != nil {
return fmt.Errorf("could not load agent info: %w", err)
}

if opts.includeMonitoring {
// Load the requirements before trying to load the configuration. These should always load
// even if the configuration is wrong.
Expand All @@ -160,7 +165,7 @@ func inspectConfig(ctx context.Context, cfgPath string, opts inspectConfigOpts,
if err != nil {
return fmt.Errorf("failed to get monitoring: %w", err)
}
components, binaryMapping, err := specs.PolicyToComponents(cfg, lvl)
components, binaryMapping, err := specs.PolicyToComponents(cfg, lvl, agentInfo)
if err != nil {
return fmt.Errorf("failed to get binary mappings: %w", err)
}
Expand Down Expand Up @@ -252,8 +257,13 @@ func inspectComponents(ctx context.Context, cfgPath string, opts inspectComponen
return fmt.Errorf("failed to get monitoring: %w", err)
}

agentInfo, err := info.NewAgentInfoWithLog("error", false)
if err != nil {
return fmt.Errorf("could not load agent info: %w", err)
}

// Compute the components from the computed configuration.
comps, err := specs.ToComponents(m, monitorFn, lvl)
comps, err := specs.ToComponents(m, monitorFn, lvl, agentInfo)
if err != nil {
return fmt.Errorf("failed to render components: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/agent/install/uninstall.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func serviceComponentsFromConfig(specs component.RuntimeSpecs, cfg *config.Confi
if err != nil {
return nil, errors.New("failed to create a map from config", err)
}
allComps, err := specs.ToComponents(mm, nil, logp.InfoLevel)
allComps, err := specs.ToComponents(mm, nil, logp.InfoLevel, nil)
if err != nil {
return nil, fmt.Errorf("failed to render components: %w", err)
}
Expand Down
40 changes: 34 additions & 6 deletions pkg/component/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,15 @@ import (
// GenerateMonitoringCfgFn is a function that can inject information into the model generation process.
type GenerateMonitoringCfgFn func(map[string]interface{}, []Component, map[string]string) (map[string]interface{}, error)

type HeadersProvider interface {
Headers() map[string]string
}

const (
// defaultUnitLogLevel is the default log level that a unit will get if one is not defined.
defaultUnitLogLevel = client.UnitLogLevelInfo
headersKey = "headers"
elasticsearchType = "elasticsearch"
)

// ErrInputRuntimeCheckFail error is used when an input specification runtime prevention check occurs.
Expand Down Expand Up @@ -105,8 +111,8 @@ func (c *Component) Type() string {
}

// ToComponents returns the components that should be running based on the policy and the current runtime specification.
func (r *RuntimeSpecs) ToComponents(policy map[string]interface{}, monitoringInjector GenerateMonitoringCfgFn, ll logp.Level) ([]Component, error) {
components, binaryMapping, err := r.PolicyToComponents(policy, ll)
func (r *RuntimeSpecs) ToComponents(policy map[string]interface{}, monitoringInjector GenerateMonitoringCfgFn, ll logp.Level, headers HeadersProvider) ([]Component, error) {
components, binaryMapping, err := r.PolicyToComponents(policy, ll, headers)
if err != nil {
return nil, err
}
Expand All @@ -119,7 +125,7 @@ func (r *RuntimeSpecs) ToComponents(policy map[string]interface{}, monitoringInj

if monitoringCfg != nil {
// monitoring is enabled
monitoringComps, _, err := r.PolicyToComponents(monitoringCfg, ll)
monitoringComps, _, err := r.PolicyToComponents(monitoringCfg, ll, headers)
if err != nil {
return nil, fmt.Errorf("failed to generate monitoring components: %w", err)
}
Expand All @@ -133,8 +139,8 @@ func (r *RuntimeSpecs) ToComponents(policy map[string]interface{}, monitoringInj

// PolicyToComponents takes the policy and generated a component model along with providing a mapping between component
// and the running binary.
func (r *RuntimeSpecs) PolicyToComponents(policy map[string]interface{}, ll logp.Level) ([]Component, map[string]string, error) {
outputsMap, err := toIntermediate(policy, r.aliasMapping, ll)
func (r *RuntimeSpecs) PolicyToComponents(policy map[string]interface{}, ll logp.Level, headers HeadersProvider) ([]Component, map[string]string, error) {
outputsMap, err := toIntermediate(policy, r.aliasMapping, ll, headers)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -421,7 +427,7 @@ func getSupportedShipper(r *RuntimeSpecs, output outputI, inputSpec InputRuntime

// toIntermediate takes the policy and returns it into an intermediate representation that is easier to map into a set
// of components.
func toIntermediate(policy map[string]interface{}, aliasMapping map[string]string, ll logp.Level) (map[string]outputI, error) {
func toIntermediate(policy map[string]interface{}, aliasMapping map[string]string, ll logp.Level, headers HeadersProvider) (map[string]outputI, error) {
const (
outputsKey = "outputs"
enabledKey = "enabled"
Expand Down Expand Up @@ -470,6 +476,28 @@ func toIntermediate(policy map[string]interface{}, aliasMapping map[string]strin
if err != nil {
return nil, fmt.Errorf("invalid 'outputs.%s.log_level', %w", name, err)
}

// inject headers configured during enroll
if t == elasticsearchType && headers != nil {
// can be nil when called from install/uninstall
if agentHeaders := headers.Headers(); len(agentHeaders) > 0 {
headers := make(map[string]interface{})
if existingHeadersRaw, found := output[headersKey]; found {
existingHeaders, ok := existingHeadersRaw.(map[string]interface{})
if !ok {
return nil, fmt.Errorf("invalid 'outputs.headers', expected a map not a %T", outputRaw)
}
headers = existingHeaders
}

for headerName, headerVal := range agentHeaders {
headers[headerName] = headerVal
}

output[headersKey] = headers
}
}

outputsMap[name] = outputI{
name: name,
enabled: enabled,
Expand Down
220 changes: 219 additions & 1 deletion pkg/component/component_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func TestToComponents(t *testing.T) {
LogLevel logp.Level
Err string
Result []Component
headers HeadersProvider
}{
{
Name: "Empty policy",
Expand Down Expand Up @@ -1588,14 +1589,223 @@ func TestToComponents(t *testing.T) {
},
},
},
{
Name: "Headers injection",
Platform: linuxAMD64Platform,
Policy: map[string]interface{}{
"outputs": map[string]interface{}{
"default": map[string]interface{}{
"type": "elasticsearch",
"enabled": true,
},
},
"inputs": []interface{}{
map[string]interface{}{
"type": "filestream",
"id": "filestream-0",
"enabled": true,
},
},
},
Result: []Component{
{
InputSpec: &InputRuntimeSpec{
InputType: "filestream",
BinaryName: "filebeat",
BinaryPath: filepath.Join("..", "..", "specs", "filebeat"),
},
Units: []Unit{
{
ID: "filestream-default",
Type: client.UnitTypeOutput,
LogLevel: defaultUnitLogLevel,
Config: MustExpectedConfig(map[string]interface{}{
"type": "elasticsearch",
"headers": map[string]interface{}{
"header-one": "val-1",
},
}),
},
{
ID: "filestream-default-filestream-0",
Type: client.UnitTypeInput,
LogLevel: defaultUnitLogLevel,
Config: MustExpectedConfig(map[string]interface{}{
"type": "filestream",
"id": "filestream-0",
}),
},
},
},
},
headers: &testHeadersProvider{headers: map[string]string{
"header-one": "val-1",
}},
}, {
Name: "Headers injection merge",
Platform: linuxAMD64Platform,
Policy: map[string]interface{}{
"outputs": map[string]interface{}{
"default": map[string]interface{}{
"type": "elasticsearch",
"enabled": true,
"headers": map[string]interface{}{
"header-two": "val-2",
},
},
},
"inputs": []interface{}{
map[string]interface{}{
"type": "filestream",
"id": "filestream-0",
"enabled": true,
},
},
},
Result: []Component{
{
InputSpec: &InputRuntimeSpec{
InputType: "filestream",
BinaryName: "filebeat",
BinaryPath: filepath.Join("..", "..", "specs", "filebeat"),
},
Units: []Unit{
{
ID: "filestream-default",
Type: client.UnitTypeOutput,
LogLevel: defaultUnitLogLevel,
Config: MustExpectedConfig(map[string]interface{}{
"type": "elasticsearch",
"headers": map[string]interface{}{
"header-two": "val-2",
"header-one": "val-1",
},
}),
},
{
ID: "filestream-default-filestream-0",
Type: client.UnitTypeInput,
LogLevel: defaultUnitLogLevel,
Config: MustExpectedConfig(map[string]interface{}{
"type": "filestream",
"id": "filestream-0",
}),
},
},
},
},
headers: &testHeadersProvider{headers: map[string]string{
"header-one": "val-1",
}},
},
{
Name: "Headers injection not injecting kafka",
Platform: linuxAMD64Platform,
Policy: map[string]interface{}{
"outputs": map[string]interface{}{
"default": map[string]interface{}{
"type": "kafka",
"enabled": true,
},
},
"inputs": []interface{}{
map[string]interface{}{
"type": "filestream",
"id": "filestream-0",
"enabled": true,
},
},
},
Result: []Component{
{
InputSpec: &InputRuntimeSpec{
InputType: "filestream",
BinaryName: "filebeat",
BinaryPath: filepath.Join("..", "..", "specs", "filebeat"),
},
Units: []Unit{
{
ID: "filestream-default",
Type: client.UnitTypeOutput,
LogLevel: defaultUnitLogLevel,
Config: MustExpectedConfig(map[string]interface{}{
"type": "kafka",
}),
},
{
ID: "filestream-default-filestream-0",
Type: client.UnitTypeInput,
LogLevel: defaultUnitLogLevel,
Config: MustExpectedConfig(map[string]interface{}{
"type": "filestream",
"id": "filestream-0",
}),
},
},
},
},
headers: &testHeadersProvider{headers: map[string]string{
"header-one": "val-1",
}},
},
{
Name: "Headers injection not injecting logstash",
Platform: linuxAMD64Platform,
Policy: map[string]interface{}{
"outputs": map[string]interface{}{
"default": map[string]interface{}{
"type": "logstash",
"enabled": true,
},
},
"inputs": []interface{}{
map[string]interface{}{
"type": "filestream",
"id": "filestream-0",
"enabled": true,
},
},
},
Result: []Component{
{
InputSpec: &InputRuntimeSpec{
InputType: "filestream",
BinaryName: "filebeat",
BinaryPath: filepath.Join("..", "..", "specs", "filebeat"),
},
Units: []Unit{
{
ID: "filestream-default",
Type: client.UnitTypeOutput,
LogLevel: defaultUnitLogLevel,
Config: MustExpectedConfig(map[string]interface{}{
"type": "logstash",
}),
},
{
ID: "filestream-default-filestream-0",
Type: client.UnitTypeInput,
LogLevel: defaultUnitLogLevel,
Config: MustExpectedConfig(map[string]interface{}{
"type": "filestream",
"id": "filestream-0",
}),
},
},
},
},
headers: &testHeadersProvider{headers: map[string]string{
"header-one": "val-1",
}},
},
}

for _, scenario := range scenarios {
t.Run(scenario.Name, func(t *testing.T) {
runtime, err := LoadRuntimeSpecs(filepath.Join("..", "..", "specs"), scenario.Platform, SkipBinaryCheck())
require.NoError(t, err)

result, err := runtime.ToComponents(scenario.Policy, nil, scenario.LogLevel)
result, err := runtime.ToComponents(scenario.Policy, nil, scenario.LogLevel, scenario.headers)
if scenario.Err != "" {
assert.Equal(t, scenario.Err, err.Error())
} else {
Expand Down Expand Up @@ -1755,3 +1965,11 @@ func mustExpectedConfigForceType(cfg map[string]interface{}, forceType string) *
res.Type = forceType
return res
}

type testHeadersProvider struct {
headers map[string]string
}

func (h *testHeadersProvider) Headers() map[string]string {
return h.headers
}

0 comments on commit b8553c5

Please sign in to comment.