Skip to content

Commit

Permalink
[Kubernetes Integration] Fix for apiserver token expiration (elastic#…
Browse files Browse the repository at this point in the history
…42016)

* initial fix for apiserver

* adding fix for controller and schedule
  • Loading branch information
gizas authored Jan 7, 2025
1 parent ac57409 commit 7e25c4d
Show file tree
Hide file tree
Showing 5 changed files with 156 additions and 35 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Do not report non-existant 0 values for RSS metrics in docker/memory {pull}41449[41449]
- Log Cisco Meraki `getDevicePerformanceScores` errors without stopping metrics collection. {pull}41622[41622]
- Don't skip first bucket value in GCP metrics metricset for distribution type metrics {pull}41822[41822]
- [K8s Integration] Enhance HTTP authentication in case of token updates for Apiserver, Controllermanager and Scheduler metricsets {issue}41910[41910] {pull}42016[42016]
- Fixed `creation_date` scientific notation output in the `elasticsearch.index` metricset. {pull}42053[42053]
- Fix bug where metricbeat unintentionally triggers Windows ASR. {pull}42177[42177]

Expand Down
12 changes: 12 additions & 0 deletions metricbeat/helper/prometheus/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ const acceptHeader = `text/plain;version=0.0.4;q=0.5,*/*;q=0.1`

// Prometheus helper retrieves prometheus formatted metrics
type Prometheus interface {
// GetHttp returns the HTTP Client that handles the connection towards remote endpoint
GetHttp() (*helper.HTTP, error)

// GetFamilies requests metric families from prometheus endpoint and returns them
GetFamilies() ([]*MetricFamily, error)

Expand Down Expand Up @@ -66,6 +69,15 @@ func NewPrometheusClient(base mb.BaseMetricSet) (Prometheus, error) {
return &prometheus{http, base.Logger()}, nil
}

// GetHttp returns HTTP Client
func (p *prometheus) GetHttp() (*helper.HTTP, error) {
httpClient, ok := p.httpfetcher.(*helper.HTTP)
if !ok {
return nil, fmt.Errorf("httpfetcher is not of type *helper.HTTP")
}
return httpClient, nil
}

// GetFamilies requests metric families from prometheus endpoint and returns them
func (p *prometheus) GetFamilies() ([]*MetricFamily, error) {
var reader io.Reader
Expand Down
59 changes: 47 additions & 12 deletions metricbeat/module/kubernetes/apiserver/metricset.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,26 @@ package apiserver

import (
"fmt"
"net/http"
"strings"
"time"

"github.com/elastic/beats/v7/metricbeat/helper"
"github.com/elastic/beats/v7/metricbeat/helper/prometheus"
"github.com/elastic/beats/v7/metricbeat/mb"
k8smod "github.com/elastic/beats/v7/metricbeat/module/kubernetes"
"github.com/elastic/beats/v7/metricbeat/module/kubernetes/util"
"github.com/elastic/elastic-agent-libs/mapstr"
)

// Metricset for apiserver is a prometheus based metricset
type Metricset struct {
mb.BaseMetricSet
http *helper.HTTP
prometheusClient prometheus.Prometheus
prometheusMappings *prometheus.MetricsMapping
clusterMeta mapstr.M
mod k8smod.Module
}

var _ mb.ReportingMetricSetV2Error = (*Metricset)(nil)
Expand All @@ -41,11 +48,23 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
if err != nil {
return nil, err
}

mod, ok := base.Module().(k8smod.Module)
if !ok {
return nil, fmt.Errorf("must be child of kubernetes module")
}

http, err := pc.GetHttp()
if err != nil {
return nil, fmt.Errorf("the http connection is not valid")
}
ms := &Metricset{
BaseMetricSet: base,
http: http,
prometheusClient: pc,
prometheusMappings: mapping,
clusterMeta: util.AddClusterECSMeta(base),
mod: mod,
}

return ms, nil
Expand All @@ -54,20 +73,36 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
// Fetch gathers information from the apiserver and reports events with this information.
func (m *Metricset) Fetch(reporter mb.ReporterV2) error {
events, err := m.prometheusClient.GetProcessedMetrics(m.prometheusMappings)
errorString := fmt.Sprintf("%s", err)
errorUnauthorisedMsg := fmt.Sprintf("unexpected status code %d", http.StatusUnauthorized)
if err != nil && strings.Contains(errorString, errorUnauthorisedMsg) {
count := 2 // We retry twice to refresh the Authorisation token in case of http.StatusUnauthorize = 401 Error
for count > 0 {
if _, errAuth := m.http.RefreshAuthorizationHeader(); errAuth == nil {
events, err = m.prometheusClient.GetProcessedMetrics(m.prometheusMappings)
}
if err != nil {
time.Sleep(m.mod.Config().Period)
count--
} else {
break
}
}
}
// We need to check for err again in case error is not 401 or RefreshAuthorizationHeader has failed
if err != nil {
return fmt.Errorf("error getting metrics: %w", err)
}

for _, e := range events {
event := mb.TransformMapStrToEvent("kubernetes", e, nil)
if len(m.clusterMeta) != 0 {
event.RootFields.DeepUpdate(m.clusterMeta)
}
isOpen := reporter.Event(event)
if !isOpen {
return nil
} else {
for _, e := range events {
event := mb.TransformMapStrToEvent("kubernetes", e, nil)
if len(m.clusterMeta) != 0 {
event.RootFields.DeepUpdate(m.clusterMeta)
}
isOpen := reporter.Event(event)
if !isOpen {
return nil
}
}
return nil
}

return nil
}
59 changes: 48 additions & 11 deletions metricbeat/module/kubernetes/controllermanager/controllermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,14 @@ package controllermanager

import (
"fmt"
"net/http"
"strings"
"time"

"github.com/elastic/beats/v7/metricbeat/helper"
"github.com/elastic/beats/v7/metricbeat/helper/prometheus"
"github.com/elastic/beats/v7/metricbeat/mb"
k8smod "github.com/elastic/beats/v7/metricbeat/module/kubernetes"
"github.com/elastic/beats/v7/metricbeat/module/kubernetes/util"
"github.com/elastic/elastic-agent-libs/mapstr"
)
Expand Down Expand Up @@ -74,9 +79,11 @@ func init() {
// MetricSet implements the mb.PushMetricSet interface, and therefore does not rely on polling.
type MetricSet struct {
mb.BaseMetricSet
http *helper.HTTP
prometheusClient prometheus.Prometheus
prometheusMappings *prometheus.MetricsMapping
clusterMeta mapstr.M
mod k8smod.Module
}

// New create a new instance of the MetricSet
Expand All @@ -87,31 +94,61 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
if err != nil {
return nil, err
}

mod, ok := base.Module().(k8smod.Module)
if !ok {
return nil, fmt.Errorf("must be child of kubernetes module")
}

http, err := pc.GetHttp()
if err != nil {
return nil, fmt.Errorf("the http connection is not valid")
}
ms := &MetricSet{
BaseMetricSet: base,
http: http,
prometheusClient: pc,
prometheusMappings: mapping,
clusterMeta: util.AddClusterECSMeta(base),
mod: mod,
}
return ms, nil
}

// Fetch gathers information from the apiserver and reports events with this information.
func (m *MetricSet) Fetch(reporter mb.ReporterV2) error {
events, err := m.prometheusClient.GetProcessedMetrics(m.prometheusMappings)
errorString := fmt.Sprintf("%s", err)
errorUnauthorisedMsg := fmt.Sprintf("unexpected status code %d", http.StatusUnauthorized)
if err != nil && strings.Contains(errorString, errorUnauthorisedMsg) {
count := 2 // We retry twice to refresh the Authorisation token in case of http.StatusUnauthorize = 401 Error
for count > 0 {
if _, errAuth := m.http.RefreshAuthorizationHeader(); errAuth == nil {
events, err = m.prometheusClient.GetProcessedMetrics(m.prometheusMappings)
}
if err != nil {
time.Sleep(m.mod.Config().Period)
count--
} else {
break
}
}
}
// We need to check for err again in case error is not 401 or RefreshAuthorizationHeader has failed
if err != nil {
return fmt.Errorf("error getting metrics: %w", err)
}
for _, e := range events {
event := mb.TransformMapStrToEvent("kubernetes", e, nil)
if len(m.clusterMeta) != 0 {
event.RootFields.DeepUpdate(m.clusterMeta)
}
isOpen := reporter.Event(event)
if !isOpen {
return nil
} else {
for _, e := range events {
event := mb.TransformMapStrToEvent("kubernetes", e, nil)
if len(m.clusterMeta) != 0 {
event.RootFields.DeepUpdate(m.clusterMeta)
}
isOpen := reporter.Event(event)
if !isOpen {
return nil
}
}
}

return nil
return nil
}
}
60 changes: 48 additions & 12 deletions metricbeat/module/kubernetes/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,14 @@ package scheduler

import (
"fmt"
"net/http"
"strings"
"time"

"github.com/elastic/beats/v7/metricbeat/helper"
"github.com/elastic/beats/v7/metricbeat/helper/prometheus"
"github.com/elastic/beats/v7/metricbeat/mb"
k8smod "github.com/elastic/beats/v7/metricbeat/module/kubernetes"
"github.com/elastic/beats/v7/metricbeat/module/kubernetes/util"
"github.com/elastic/elastic-agent-libs/mapstr"
)
Expand Down Expand Up @@ -78,9 +83,11 @@ func init() {
// MetricSet implements the mb.PushMetricSet interface, and therefore does not rely on polling.
type MetricSet struct {
mb.BaseMetricSet
http *helper.HTTP
prometheusClient prometheus.Prometheus
prometheusMappings *prometheus.MetricsMapping
clusterMeta mapstr.M
mod k8smod.Module
}

// New create a new instance of the MetricSet
Expand All @@ -91,32 +98,61 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
if err != nil {
return nil, err
}

mod, ok := base.Module().(k8smod.Module)
if !ok {
return nil, fmt.Errorf("must be child of kubernetes module")
}

http, err := pc.GetHttp()
if err != nil {
return nil, fmt.Errorf("the http connection is not valid")
}
ms := &MetricSet{
BaseMetricSet: base,
http: http,
prometheusClient: pc,
prometheusMappings: mapping,
clusterMeta: util.AddClusterECSMeta(base),
mod: mod,
}
return ms, nil
}

// Fetch gathers information from the apiserver and reports events with this information.
func (m *MetricSet) Fetch(reporter mb.ReporterV2) error {
events, err := m.prometheusClient.GetProcessedMetrics(m.prometheusMappings)
errorString := fmt.Sprintf("%s", err)
errorUnauthorisedMsg := fmt.Sprintf("unexpected status code %d", http.StatusUnauthorized)
if err != nil && strings.Contains(errorString, errorUnauthorisedMsg) {
count := 2 // We retry twice to refresh the Authorisation token in case of http.StatusUnauthorize = 401 Error
for count > 0 {
if _, errAuth := m.http.RefreshAuthorizationHeader(); errAuth == nil {
events, err = m.prometheusClient.GetProcessedMetrics(m.prometheusMappings)
}
if err != nil {
time.Sleep(m.mod.Config().Period)
count--
} else {
break
}
}
}
// We need to check for err again in case error is not 401 or RefreshAuthorizationHeader has failed
if err != nil {
return fmt.Errorf("error getting metrics: %w", err)
}

for _, e := range events {
event := mb.TransformMapStrToEvent("kubernetes", e, nil)
if len(m.clusterMeta) != 0 {
event.RootFields.DeepUpdate(m.clusterMeta)
} else {
for _, e := range events {
event := mb.TransformMapStrToEvent("kubernetes", e, nil)
if len(m.clusterMeta) != 0 {
event.RootFields.DeepUpdate(m.clusterMeta)
}
isOpen := reporter.Event(event)
if !isOpen {
return nil
}
}
isOpen := reporter.Event(event)
if !isOpen {
return nil
}
}

return nil
return nil
}
}

0 comments on commit 7e25c4d

Please sign in to comment.