diff --git a/pkg/exporter/kube_events_exporter.go b/pkg/exporter/kube_events_exporter.go index 288b257..dfd06ba 100644 --- a/pkg/exporter/kube_events_exporter.go +++ b/pkg/exporter/kube_events_exporter.go @@ -35,6 +35,8 @@ type K8sEventSource struct { inf cache.SharedIndexInformer sinkers []types.Sinker mutex sync.Mutex + + cluster string } func (s *K8sEventSource) ReloadConfig(c *config.ExporterConfig) { @@ -49,9 +51,9 @@ func (s *K8sEventSource) ReloadConfig(c *config.ExporterConfig) { for _, w := range c.Sinks.Webhooks { if w.Url != "" { - sinkers = append(sinkers, &sinks.WebhookSinker{Url: w.Url, Cluster: s.getClusterName()}) + sinkers = append(sinkers, &sinks.WebhookSinker{Url: w.Url}) } else if w.Service != nil { - sinkers = append(sinkers, &sinks.WebhookSinker{Cluster: s.getClusterName(), Url: fmt.Sprintf("http://%s.%s.svc:%d/%s", + sinkers = append(sinkers, &sinks.WebhookSinker{Url: fmt.Sprintf("http://%s.%s.svc:%d/%s", w.Service.Name, w.Service.Namespace, *w.Service.Port, w.Service.Path)}) } } @@ -155,12 +157,21 @@ func (s *K8sEventSource) sinkEvents(ctx context.Context) { s.workqueue.Done(evt) } }() + + events := types.Events{} + for _, e := range evts { + events.KubeEvents = append(events.KubeEvents, &types.ExtendedEvent{ + Event: e, + Cluster: s.cluster, + }) + } + evtSinkers := s.getSinkers() if len(evtSinkers) == 0 { return } for _, sinker := range evtSinkers { - if err = sinker.Sink(ctx, evts); err != nil { + if err = sinker.Sink(ctx, events); err != nil { err = fmt.Errorf("error sinking events: %v", err) klog.Error(err) return @@ -196,5 +207,7 @@ func NewKubeEventSource(client *kubernetes.Clientset) *K8sEventSource { }, }) + s.cluster = s.getClusterName() + return s } diff --git a/pkg/exporter/sinks/stdout.go b/pkg/exporter/sinks/stdout.go index 3e554a7..163a254 100644 --- a/pkg/exporter/sinks/stdout.go +++ b/pkg/exporter/sinks/stdout.go @@ -5,14 +5,14 @@ import ( "encoding/json" "fmt" - v1 "k8s.io/api/core/v1" + "github.com/kubesphere/kube-events/pkg/exporter/types" ) type StdoutSinker struct { } -func (s *StdoutSinker) Sink(ctx context.Context, evts []*v1.Event) error { - for _, evt := range evts { +func (s *StdoutSinker) Sink(ctx context.Context, evts types.Events) error { + for _, evt := range evts.KubeEvents { bs, err := json.Marshal(evt) if err != nil { return err diff --git a/pkg/exporter/sinks/webhook.go b/pkg/exporter/sinks/webhook.go index 4da51e7..f16cfe5 100644 --- a/pkg/exporter/sinks/webhook.go +++ b/pkg/exporter/sinks/webhook.go @@ -7,8 +7,9 @@ import ( "fmt" "net/http" + "github.com/kubesphere/kube-events/pkg/exporter/types" + "github.com/kubesphere/kube-events/pkg/util" - v1 "k8s.io/api/core/v1" ) type WebhookSinker struct { @@ -16,19 +17,10 @@ type WebhookSinker struct { Cluster string } -type extendedEvent struct { - *v1.Event `json:",inline"` - Cluster string `json:"cluster,omitempty"` -} - -func (s *WebhookSinker) Sink(ctx context.Context, evts []*v1.Event) error { +func (s *WebhookSinker) Sink(ctx context.Context, evts types.Events) error { var buf bytes.Buffer - for _, evt := range evts { - extendedEvt := extendedEvent{ - Event: evt, - Cluster: s.Cluster, - } - if bs, err := json.Marshal(extendedEvt); err != nil { + for _, evt := range evts.KubeEvents { + if bs, err := json.Marshal(evt); err != nil { return err } else if _, err := buf.Write(bs); err != nil { return err diff --git a/pkg/exporter/types/types.go b/pkg/exporter/types/types.go index 93ed937..4ab526d 100644 --- a/pkg/exporter/types/types.go +++ b/pkg/exporter/types/types.go @@ -7,9 +7,14 @@ import ( ) type Events struct { - KubeEvents []*v1.Event `json:"kubeEvents"` + KubeEvents []*ExtendedEvent `json:"kubeEvents"` +} + +type ExtendedEvent struct { + *v1.Event `json:",inline"` + Cluster string `json:"cluster,omitempty"` } type Sinker interface { - Sink(ctx context.Context, events []*v1.Event) error + Sink(ctx context.Context, events Events) error }