Skip to content

Commit

Permalink
Notify slack when k8s jobs fail (#237)
Browse files Browse the repository at this point in the history
* Notify slack when k8s jobs fail

* trigger build

* Update cmd/daemon/kubernetes/jobs.go

Co-authored-by: Bjørn <bso@lunar.app>

* Address feedback

Co-authored-by: Bjørn <bso@lunar.app>
  • Loading branch information
jweibel22 and Crevil authored Apr 8, 2021
1 parent 3c8cbe2 commit 5224819
Show file tree
Hide file tree
Showing 8 changed files with 214 additions and 0 deletions.
10 changes: 10 additions & 0 deletions cmd/daemon/command/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,16 @@ func StartDaemon() *cobra.Command {
}
}()

// Watch Kubernetes jobs for failures in the background. HandleJobErrors is
// restarted whenever it reports that its watcher was closed; any other error
// is treated as fatal and forwarded on the done channel.
go func() {
	for {
		// Keep the error local to this goroutine instead of assigning to the
		// err variable of the enclosing function, so concurrent goroutines
		// never write to the same variable.
		err := kubectl.HandleJobErrors(context.Background())
		if err != nil && err != kubernetes.ErrWatcherClosed {
			done <- errors.WithMessage(err, "kubectl handle job errors: watcher closed")
			return
		}
	}
}()

sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
go func() {
Expand Down
16 changes: 16 additions & 0 deletions cmd/daemon/kubernetes/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type Exporter interface {
// Send a message through the exporter.
SendSuccessfulReleaseEvent(c context.Context, event httpinternal.ReleaseEvent) error
SendPodErrorEvent(c context.Context, event httpinternal.PodErrorEvent) error
SendJobErrorEvent(c context.Context, event httpinternal.JobErrorEvent) error
}

type ReleaseManagerExporter struct {
Expand Down Expand Up @@ -50,3 +51,18 @@ func (e *ReleaseManagerExporter) SendPodErrorEvent(ctx context.Context, event ht
}
return nil
}

// SendJobErrorEvent logs the event and POSTs it to the release-manager's
// "webhook/daemon/k8s/joberror" endpoint. Before sending, the event's
// Environment is overwritten with the exporter's own environment. Errors from
// resolving the URL or performing the request are returned unchanged.
func (e *ReleaseManagerExporter) SendJobErrorEvent(ctx context.Context, event httpinternal.JobErrorEvent) error {
	e.Log.With("event", event).Infof("JobError Event")

	endpoint, err := e.Client.URL("webhook/daemon/k8s/joberror")
	if err != nil {
		return err
	}

	event.Environment = e.Environment

	// The response carries no data; it is only decoded to complete the call.
	var response httpinternal.KubernetesNotifyResponse
	return e.Client.Do(http.MethodPost, endpoint, event, &response)
}
84 changes: 84 additions & 0 deletions cmd/daemon/kubernetes/jobs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package kubernetes

import (
"context"
"github.com/lunarway/release-manager/internal/http"
"github.com/lunarway/release-manager/internal/log"
batchv1 "k8s.io/api/batch/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/watch"
)

// HandleJobErrors watches batch/v1 Jobs (with an empty namespace selector) and
// sends a JobErrorEvent through the client's exporter for every correctly
// annotated job that reports a true Failed condition.
//
// The call blocks until one of the following happens:
//   - the watch cannot be started: the error from Watch is returned;
//   - the watcher's result channel is closed: ErrWatcherClosed is returned;
//   - ctx is cancelled: ctx.Err() is returned.
//
// Failures to deliver an event are logged and do not stop the watch.
func (c *Client) HandleJobErrors(ctx context.Context) error {
	watcher, err := c.clientset.BatchV1().Jobs("").Watch(ctx, metav1.ListOptions{})
	if err != nil {
		return err
	}
	// Release the watch no matter how the loop below is left.
	defer watcher.Stop()

	for {
		select {
		case <-ctx.Done():
			// Return immediately on cancellation; otherwise the select would
			// keep firing on the already-closed Done channel.
			return ctx.Err()
		case e, ok := <-watcher.ResultChan():
			if !ok {
				return ErrWatcherClosed
			}
			// Deleted jobs and events without an object carry nothing to report.
			if e.Object == nil || e.Type == watch.Deleted {
				continue
			}
			job, ok := e.Object.(*batchv1.Job)
			if !ok {
				continue
			}

			// Check if we have all the annotations we need for the release-daemon
			if !isCorrectlyAnnotated(job.Annotations) {
				continue
			}
			if !isJobFailed(job) {
				continue
			}

			// Notify the release-manager with the job error event.
			err := c.exporter.SendJobErrorEvent(ctx, http.JobErrorEvent{
				JobName:     job.Name,
				Namespace:   job.Namespace,
				Errors:      jobErrorMessages(job),
				ArtifactID:  job.Annotations["lunarway.com/artifact-id"],
				AuthorEmail: job.Annotations["lunarway.com/author"],
			})
			if err != nil {
				log.Errorf("Failed to send job error event: %v", err)
			}
		}
	}
}

// isJobFailed reports whether any of the job's status conditions is a
// JobFailed condition whose status is true. A job without conditions is not
// considered failed.
func isJobFailed(job *batchv1.Job) bool {
	for _, cond := range job.Status.Conditions {
		if cond.Type != batchv1.JobFailed {
			continue
		}
		if cond.Status == corev1.ConditionTrue {
			return true
		}
	}
	return false
}

// jobErrorMessages collects the reason and message of every JobFailed
// condition on the job whose status is true, in the order they appear. The
// result is nil when no such condition exists.
func jobErrorMessages(job *batchv1.Job) []http.JobConditionError {
	var failures []http.JobConditionError

	for i := range job.Status.Conditions {
		cond := &job.Status.Conditions[i]
		if cond.Type != batchv1.JobFailed || cond.Status != corev1.ConditionTrue {
			continue
		}
		failures = append(failures, http.JobConditionError{
			Reason:  cond.Reason,
			Message: cond.Message,
		})
	}
	return failures
}
45 changes: 45 additions & 0 deletions cmd/server/http/daemon_k8s_job_error_webhook.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package http

import (
"context"
"net/http"

"github.com/lunarway/release-manager/internal/flow"
httpinternal "github.com/lunarway/release-manager/internal/http"
"github.com/lunarway/release-manager/internal/log"
"github.com/lunarway/release-manager/internal/slack"
opentracing "github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
)

// daemonk8sJobErrorWebhook returns the handler for job error events posted by
// the daemon. It decodes a JobErrorEvent from the request body, passes it to
// flowSvc.NotifyK8SJobErrorEvent and replies with an empty
// KubernetesNotifyResponse.
//
// Responses:
//   - an invalid body error when the request body cannot be decoded;
//   - 400 when the notification fails for any reason other than
//     slack.ErrUnknownEmail (an unknown email is tolerated);
//   - 500 when the response cannot be encoded;
//   - 200 otherwise.
func daemonk8sJobErrorWebhook(payload *payload, flowSvc *flow.Service) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		// copy span from request context but ignore any deadlines on the request context
		ctx := opentracing.ContextWithSpan(context.Background(), opentracing.SpanFromContext(r.Context()))
		logger := log.WithContext(ctx)
		var event httpinternal.JobErrorEvent
		err := payload.decodeResponse(ctx, r.Body, &event)
		if err != nil {
			logger.Errorf("http: daemon k8s job error webhook: decode request body failed: %v", err)
			invalidBodyError(w)
			return
		}
		logger = logger.WithFields("event", event)
		err = flowSvc.NotifyK8SJobErrorEvent(ctx, &event)
		if err != nil && errors.Cause(err) != slack.ErrUnknownEmail {
			// This is the job error handler; the log line must say so to keep
			// it distinguishable from the pod error webhook in the logs.
			logger.Errorf("http: daemon k8s job error webhook failed: %+v", err)
			w.WriteHeader(http.StatusBadRequest)
			return
		}

		err = payload.encodeResponse(ctx, w, httpinternal.KubernetesNotifyResponse{})
		if err != nil {
			logger.Errorf("http: daemon k8s job error webhook: environment: '%s' marshal response: %v", event.Environment, err)
			w.WriteHeader(http.StatusInternalServerError)
			return
		}

		w.WriteHeader(http.StatusOK)
		logger.Infof("http: daemon k8s job error webhook: handled")
	}
}
1 change: 1 addition & 0 deletions cmd/server/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func NewServer(opts *Options, slackClient *slack.Client, flowSvc *flow.Service,
mux.HandleFunc("/webhook/daemon/flux", trace(tracer, authenticate(opts.DaemonAuthToken, daemonFluxWebhook(&payloader, flowSvc))))
mux.HandleFunc("/webhook/daemon/k8s/deploy", trace(tracer, authenticate(opts.DaemonAuthToken, daemonk8sDeployWebhook(&payloader, flowSvc))))
mux.HandleFunc("/webhook/daemon/k8s/error", trace(tracer, authenticate(opts.DaemonAuthToken, daemonk8sPodErrorWebhook(&payloader, flowSvc))))
mux.HandleFunc("/webhook/daemon/k8s/joberror", trace(tracer, authenticate(opts.DaemonAuthToken, daemonk8sJobErrorWebhook(&payloader, flowSvc))))

// s3 endpoints
mux.HandleFunc("/artifacts/create", trace(tracer, authenticate(opts.ArtifactAuthToken, createArtifact(&payloader, artifactWriteStorage))))
Expand Down
12 changes: 12 additions & 0 deletions internal/flow/notify.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,18 @@ func (s *Service) NotifyK8SPodErrorEvent(ctx context.Context, event *http.PodErr
return nil
}

// NotifyK8SJobErrorEvent forwards a Kubernetes job error event to Slack. The
// whole call is traced with one span and the Slack call with a second, nested
// one. A Slack failure is returned wrapped with a message naming the step.
func (s *Service) NotifyK8SJobErrorEvent(ctx context.Context, event *http.JobErrorEvent) error {
	flowSpan, ctx := s.Tracer.FromCtx(ctx, "flow.NotifyK8SJobErrorEvent")
	defer flowSpan.Finish()

	const step = "post k8s NotifyK8SJobErrorEvent slack message"

	// The nested span only measures the Slack call, so it is finished right
	// after the call returns rather than deferred.
	slackSpan, _ := s.Tracer.FromCtx(ctx, step)
	err := s.Slack.NotifyK8SJobErrorEvent(ctx, event)
	slackSpan.Finish()

	if err == nil {
		return nil
	}
	return errors.WithMessage(err, step)
}

func (s *Service) NotifyFluxEvent(ctx context.Context, event *http.FluxNotifyRequest) error {
span, ctx := s.Tracer.FromCtx(ctx, "flow.NotifyFluxEvent")
defer span.Finish()
Expand Down
15 changes: 15 additions & 0 deletions internal/http/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,21 @@ type PodErrorEvent struct {
Environment string `json:"environment,omitempty"`
ArtifactID string `json:"artifactId,omitempty"`
}

// JobConditionError describes a single error condition of a Kubernetes job as
// a reason and a message.
type JobConditionError struct {
// Reason is the reason reported for the condition.
Reason string `json:"reason,omitempty"`
// Message is the message reported for the condition.
Message string `json:"message,omitempty"`
}

// JobErrorEvent is the JSON payload describing a failed Kubernetes job. All
// fields are omitted from the JSON encoding when empty.
type JobErrorEvent struct {
// JobName is the name of the job.
JobName string `json:"jobName,omitempty"`
// Namespace is the namespace of the job.
Namespace string `json:"namespace,omitempty"`
// Errors lists the error conditions reported for the job.
Errors []JobConditionError `json:"errors,omitempty"`
// AuthorEmail is the email address of the author associated with the job.
AuthorEmail string `json:"authorEmail,omitempty"`
// Environment names the environment the event originates from.
Environment string `json:"environment,omitempty"`
// ArtifactID identifies the artifact associated with the job.
ArtifactID string `json:"artifactId,omitempty"`
}

// KubernetesNotifyResponse is the response body for Kubernetes notification
// requests. It currently carries no fields.
type KubernetesNotifyResponse struct {
}

Expand Down
31 changes: 31 additions & 0 deletions internal/slack/slack.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,37 @@ func (c *Client) NotifyK8SPodErrorEvent(ctx context.Context, event *http.PodErro
return err
}

// NotifyK8SJobErrorEvent posts a Slack message about a failed Kubernetes job
// to the user resolved from the event's author email. The message has one
// attachment with a field per reported error condition.
//
// Nothing is sent, and nil is returned, when Kubernetes notifications are
// muted. Errors from resolving the user or posting the message are returned
// unchanged.
func (c *Client) NotifyK8SJobErrorEvent(ctx context.Context, event *http.JobErrorEvent) error {
	if c.muteOptions.Kubernetes {
		return nil
	}

	userID, err := c.getIdByEmail(ctx, event.AuthorEmail)
	if err != nil {
		return err
	}

	// One attachment field per error condition; stays nil when there are none.
	var fields []slack.AttachmentField
	for i := range event.Errors {
		failure := event.Errors[i]
		fields = append(fields, slack.AttachmentField{
			Title: fmt.Sprintf("Reason: %s", failure.Reason),
			Value: fmt.Sprintf("```%s```", failure.Message),
			Short: false,
		})
	}

	attachment := slack.Attachment{
		Title:      fmt.Sprintf(":kubernetes: k8s (%s) :no_entry:", event.Environment),
		Text:       fmt.Sprintf("Job Error: %s\nArtifact: *%s*", event.JobName, event.ArtifactID),
		Color:      "#e24d42",
		MarkdownIn: []string{"text", "fields"},
		Fields:     fields,
	}

	_, _, err = c.client.PostMessageContext(
		ctx,
		userID,
		slack.MsgOptionAsUser(true),
		slack.MsgOptionAttachments(attachment),
	)
	return err
}

func (c *Client) NotifyReleaseManagerError(ctx context.Context, msgType, service, environment, branch, namespace, actorEmail string, inputErr error) error {
if c.muteOptions.ReleaseManagerError {
return nil
Expand Down

0 comments on commit 5224819

Please sign in to comment.