diff --git a/Dockerfile b/Dockerfile index 1a2ed59..4af43d2 100644 --- a/Dockerfile +++ b/Dockerfile @@ -5,7 +5,7 @@ WORKDIR /kubernetes-pod-monitor COPY . . -RUN go build -o kubernetes-pod-monitor +RUN go build -o kubernetes-pod-monitor cmd/main.go FROM golang:1.16.14-alpine diff --git a/main.go b/cmd/kubernetes-pod-monitor/run.go similarity index 92% rename from main.go rename to cmd/kubernetes-pod-monitor/run.go index bf16e98..bd56364 100644 --- a/main.go +++ b/cmd/kubernetes-pod-monitor/run.go @@ -1,4 +1,4 @@ -package main +package kubernetes_pod_monitor import ( "fmt" @@ -46,7 +46,7 @@ func setup() { setupApp() } -func CleanupOnSignal(cleanup func()) { +func cleanupOnSignal(cleanup func()) { go func() { sig := <-gracefulStop log.Info(fmt.Sprintf("caught sig: %+v. waiting for goroutines to finish", sig)) @@ -60,7 +60,7 @@ func setupApp() { service.Initialize() http.Initialize() sessions.HealthOrPanic() - CleanupOnSignal(cleanup) + cleanupOnSignal(cleanup) } func cleanup() { @@ -69,13 +69,9 @@ func cleanup() { done <- true } -func run() { +func Run() { + setup() go http.Run() go service.Run() <-done } - -func main() { - setup() - run() -} diff --git a/cmd/main.go b/cmd/main.go new file mode 100644 index 0000000..23f72bb --- /dev/null +++ b/cmd/main.go @@ -0,0 +1,7 @@ +package main + +import kubernetes_pod_monitor "github.com/unacademy/kubernetes-pod-monitor/cmd/kubernetes-pod-monitor" + +func main() { + kubernetes_pod_monitor.Run() +} diff --git a/http/main.go b/http/http.go similarity index 100% rename from http/main.go rename to http/http.go diff --git a/service/monitor.go b/service/monitor.go index 54cec86..cfeb3ee 100644 --- a/service/monitor.go +++ b/service/monitor.go @@ -70,15 +70,13 @@ func checkPod(pod *corev1.Pod) { lastTerminationState = lastState.String() } - podLogOpts := corev1.PodLogOptions{Container: container.Name, Previous: true} - req := clientset.CoreV1().Pods(namespace).GetLogs(podName, &podLogOpts) + req := clientset.CoreV1().Pods(namespace).GetLogs(podName, &corev1.PodLogOptions{Container: container.Name, Previous: true}) podLogs, err := req.Stream() if err != nil { log.Errorln("failed to get pods:", err) continue } - buf := new(bytes.Buffer) _, err = io.Copy(buf, podLogs) if err != nil { @@ -86,6 +84,7 @@ func checkPod(pod *corev1.Pod) { continue } logs := buf.String() + containerInfo.Reason = container.LastTerminationState.Terminated.Reason sendESLogs(&containerInfo, container.RestartCount, &logs, &lastTerminationState) diff --git a/service/main.go b/service/service.go similarity index 100% rename from service/main.go rename to service/service.go diff --git a/service/utils.go b/service/utils.go index bf6d6e2..426dac5 100644 --- a/service/utils.go +++ b/service/utils.go @@ -16,8 +16,6 @@ import ( "github.com/unacademy/kubernetes-pod-monitor/sessions" ) -const createdFormat = "2006-01-02 15:04:05" //"Jan 2, 2006 at 3:04pm (MST)" - // getCurrentTimeInSeconds returns current time in seconds. func getCurrentTimeInSeconds() int64 { return time.Now().UnixNano() / int64(time.Second) @@ -78,16 +76,16 @@ func notifyOnSlack(containerInfo *ContainerInfo, lastTerminationState *string, p log.Info("not notifying...") return } + channelName := getSlackChannel(containerInfo.ClusterName, containerInfo.Namespace) api := slack.New(token) log.Info("notifying...") log.Infof("Sending to %s for %s namespace in %s cluster", channelName, containerInfo.ClusterName, containerInfo.Namespace) - link := viper.GetString("elasticsearch.dashboard") + link := viper.GetString("elasticsearch.dashboard") msg := fmt.Sprintf("*Cluster Name*:- %s\n*Namespace*:- %s\n*Container Name*:- %s\n *Reason*:- %s\n `Kibana dashboard`:- <%s|Dashboard>", containerInfo.ClusterName, containerInfo.Namespace, containerInfo.ContainerName, containerInfo.Reason, link) - err := api.ChatPostMessage(channelName, msg, nil) if err != nil { log.Error(err) @@ -95,10 +93,9 @@ func notifyOnSlack(containerInfo *ContainerInfo, lastTerminationState *string, p } func getSlackChannel(clusterName string, namespace string) string { - sqlClient := sessions.GetSqlClient() var slackChannel string var defaultSlackChannel = viper.GetString("slack.channel") - + sqlClient := sessions.GetSqlClient() rows, err := sqlClient.Raw(`select slack_channel FROM k8s_pod_crash_notify WHERE clustername=? AND namespace=?`, clusterName, namespace).Rows() if err != nil { @@ -119,9 +116,8 @@ func getSlackChannel(clusterName string, namespace string) string { } func shouldNotify(clusterName string, namespace string, containername string) bool { - sqlClient := sessions.GetSqlClient() var exists string - + sqlClient := sessions.GetSqlClient() rows, err := sqlClient.Raw(`select count(*) FROM k8s_crash_ignore_notify WHERE clustername=? AND namespace=? AND containername=?`, clusterName, namespace, containername).Rows() if err != nil { @@ -145,7 +141,7 @@ func shouldNotify(clusterName string, namespace string, containername string) bo func persistPodCrash(containerInfo *ContainerInfo, restartCount int32) { sqlClient := sessions.GetSqlClient() - currTime := time.Unix(getCurrentTimeInSeconds(), 0).Format(createdFormat) + currTime := time.Unix(getCurrentTimeInSeconds(), 0).Format("2006-01-02 15:04:05") err := sqlClient.Exec(`INSERT INTO k8s_pod_crash (clustername, namespace, containername, restartcount, date) VALUES(?,?,?,?,?)`, containerInfo.ClusterName, containerInfo.Namespace, containerInfo.ContainerName, restartCount, currTime).Error diff --git a/sessions/elasticsearch.go b/sessions/elasticsearch.go index aef01e8..fb9c56c 100644 --- a/sessions/elasticsearch.go +++ b/sessions/elasticsearch.go @@ -24,7 +24,6 @@ func NewElasticSearchLocalClient(esURL string) (*elastic.Client, error) { } func NewElasticSearchAwsClient(esURL string, awsRegion string) (*elastic.Client, error) { - sess := session.Must(session.NewSession(aws.NewConfig().WithRegion(awsRegion))) svc := sts.New(sess) creds := GetAWSChainCredentialsV1(svc, sess) @@ -54,6 +53,7 @@ func InitElasticsearchClient() { url := fmt.Sprintf("%s:%s", viper.GetString("elasticsearch.url"), viper.GetString("elasticsearch.port")) env := viper.GetString("DEPLOY_ENV") + if env == "local" { client, err := NewElasticSearchLocalClient(url) if err != nil { diff --git a/sessions/irsa.go b/sessions/irsa.go index ec18b84..3b15eec 100644 --- a/sessions/irsa.go +++ b/sessions/irsa.go @@ -34,12 +34,12 @@ func GetAWSChainCredentialsV1(svc *sts.STS, sess *session.Session) *credentials. }, &credentials.SharedCredentialsProvider{}, } + if irsa { chain = append(chain, stscreds.NewWebIdentityRoleProvider(svc, roleARN, "", tokenPath)) } creds := credentials.NewChainCredentials(chain) - creds.Get() //IMPORTANT DO NOT remove otherwise throws error: RequestCanceled: request context canceled //caused by: context deadline exceeded: no Elasticsearch node available diff --git a/sessions/k8s.go b/sessions/k8s.go index 33c3e39..00b7859 100644 --- a/sessions/k8s.go +++ b/sessions/k8s.go @@ -16,6 +16,7 @@ var ( func newClientset() *kubernetes.Clientset { env := viper.GetString("DEPLOY_ENV") + kubeconfig := "" if env == "local" { kubeconfig = filepath.Join( diff --git a/sessions/mysql.go b/sessions/mysql.go index a5f2ab6..b858650 100644 --- a/sessions/mysql.go +++ b/sessions/mysql.go @@ -20,7 +20,6 @@ func newSqlClient() *gorm.DB { if err != nil { panic(err) } - // See "Important settings" section. sqlClient.DB().SetConnMaxLifetime(time.Duration(viper.GetInt64("sql.connection_lifetime")) * time.Second) sqlClient.DB().SetMaxOpenConns(80) sqlClient.DB().SetMaxIdleConns(20)