From 421e2194aed019a385bed378b0358ea1b7f68ea0 Mon Sep 17 00:00:00 2001 From: AxiomSamarth Date: Fri, 7 Oct 2022 20:38:04 +0530 Subject: [PATCH 1/4] added logic to stream logs in loki --- cmd/run.go | 29 ++++++++---- go.mod | 6 +-- internal/runner/runner.go | 94 ++++++++++++++++++--------------------- 3 files changed, 67 insertions(+), 62 deletions(-) diff --git a/cmd/run.go b/cmd/run.go index 090821b..1a34545 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -5,6 +5,7 @@ import ( "fmt" "os" "strings" + "time" "github.com/go-kit/log" "github.com/pkg/errors" @@ -16,24 +17,28 @@ import ( ) type runOptions struct { - kubeconfig string + kubeconfig string + streamDuration time.Duration } var runOpts = &runOptions{} var runCmd = &cobra.Command{ - Use: "run", - Short: "run ", - Long: "run ", - Example: " lomba run", - //Args: cobra.MaximumNArgs(1), + Use: "run", + Short: "run ", + Long: "run ", + Example: " lomba run\n" + + " lomba run --kubeconfig --stream-duration 10m\n" + + " lomba run -k -s 15m30s", RunE: func(cmd *cobra.Command, args []string) error { return runRun() }, } func init() { - runCmd.Flags().StringVarP(&runOpts.kubeconfig, "kubeconfig", "k", "", "kubeconfig") + runCmd.Flags().StringVarP(&runOpts.kubeconfig, "kubeconfig", "k", os.Getenv("HOME")+"/.kube/config", "kubeconfig") + runCmd.Flags().DurationVarP(&runOpts.streamDuration, "stream-duration", "s", 1*time.Hour, "time duration to "+ + "stream logs into loki") RootCmd.AddCommand(runCmd) } @@ -53,15 +58,21 @@ func runRun() error { return err } - err = rr.Run(context.Background()) + cancelCtx, cancelFunc := context.WithTimeout(context.Background(), runOpts.streamDuration) + err = rr.Run(cancelCtx) if err != nil { + cancelFunc() return err } grafanaEndpoint := grafana.GetOutboundIPOrLocalhost() - fmt.Printf("Kubernetes logs are injested to Loki. Ready to query at http://%s:3000\n", grafanaEndpoint) + fmt.Printf("Kubernetes logs will be streamed to Loki for next %s minutes. Ready to query at http://%s:3000\n", + runOpts.streamDuration, grafanaEndpoint) + // sleep for duration as much as set in stream-duration flag to keep the goroutines active + time.Sleep(runOpts.streamDuration) + cancelFunc() return nil } diff --git a/go.mod b/go.mod index 3eec937..46836fc 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/zawachte/lomba -go 1.19 +go 1.18 require ( github.com/go-kit/log v0.2.0 @@ -10,6 +10,8 @@ require ( github.com/prometheus/client_golang v1.12.1 github.com/prometheus/common v0.32.1 github.com/spf13/cobra v1.0.0 + k8s.io/api v0.23.6 + k8s.io/apimachinery v0.23.6 k8s.io/client-go v0.23.6 ) @@ -125,8 +127,6 @@ require ( gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect - k8s.io/api v0.23.6 // indirect - k8s.io/apimachinery v0.23.6 // indirect k8s.io/klog/v2 v2.40.1 // indirect k8s.io/kube-openapi v0.0.0-20211115234752-e816edb12b65 // indirect k8s.io/utils v0.0.0-20211116205334-6203023598ed // indirect diff --git a/internal/runner/runner.go b/internal/runner/runner.go index 3a8bff0..a1fc948 100644 --- a/internal/runner/runner.go +++ b/internal/runner/runner.go @@ -2,9 +2,8 @@ package runner import ( "bufio" - "bytes" "context" - "io" + "fmt" "github.com/go-kit/log" corev1 "k8s.io/api/core/v1" @@ -48,8 +47,7 @@ func NewRunner(params RunnerParams) (Runner, error) { cs: params.ClientSet}, nil } -func (r *runner) Run(ctx context.Context) error { - +func (r *runner) Run(cancelCtx context.Context) error { err := loki.BringUpPod() if err != nil { return err @@ -60,68 +58,64 @@ func (r *runner) Run(ctx context.Context) error { return err } - namespaceList, err := r.cs.CoreV1().Namespaces().List(ctx, metav1.ListOptions{}) + // get list of pods from all namespaces + podList, err := r.cs.CoreV1().Pods("").List(cancelCtx, metav1.ListOptions{}) if err != nil { return err } - for _, ns := range namespaceList.Items { - podList, err := r.cs.CoreV1().Pods(ns.Name).List(ctx, metav1.ListOptions{}) - if err != nil { - return err - } - - for _, pod := range podList.Items { + for _, pod := range podList.Items { + go r.streamPodLogs(cancelCtx, r.cs, pod) - for _, container := range pod.Spec.Containers { - req := r.cs.CoreV1().Pods(ns.Name).GetLogs(pod.Name, &corev1.PodLogOptions{ - Timestamps: true, - Container: container.Name, - }) + } - podLogs, err := req.Stream(ctx) - if err != nil { - return err - } - defer podLogs.Close() + return nil +} - buf := new(bytes.Buffer) +func (r *runner) loadLogsToLoki(logLine string, logParser parser.Parser, labels map[string]string) error { + tm, labelset, err := logParser.Parse(logLine, labels) + if err != nil { + fmt.Printf("%s\n", err.Error()) + r.logger.Log("Skipping log due to invalid parse", "Error", err.Error()) + return err + } + r.lokiClient.PostLog(logLine, tm, labelset) - _, err = io.Copy(buf, podLogs) - if err != nil { - return err - } + return nil +} - labels := make(map[string]string) - labels["namespace"] = ns.Name - labels["pod_name"] = pod.Name - labels["container_name"] = container.Name +// streamPodLogs will stream the pod logs and load the logs to loki with relevant +// labels, loglines and timestamp +func (r *runner) streamPodLogs(cancelCtx context.Context, cs kubernetes.Interface, pod corev1.Pod) error { + for _, container := range pod.Spec.Containers { + podLogOptions := &corev1.PodLogOptions{ + Follow: true, + Timestamps: true, + } - err = r.loadLogsToLoki(buf, parser.NewContainerParser(), labels) - if err != nil { - return err - } - } + req := cs.CoreV1().Pods(pod.Namespace).GetLogs(pod.Name, podLogOptions) + stream, err := req.Stream(cancelCtx) + if err != nil { + return fmt.Errorf("error in streaming logs: %s", err.Error()) } - } - return nil -} + reader := bufio.NewScanner(stream) + reader.Split(bufio.ScanLines) + defer stream.Close() -func (r *runner) loadLogsToLoki(rawLogs *bytes.Buffer, logParser parser.Parser, labels map[string]string) error { + for reader.Scan() { + labels := make(map[string]string) + labels["namespace"] = pod.Namespace + labels["pod_name"] = pod.Name + labels["container_name"] = container.Name - scanner := bufio.NewScanner(rawLogs) - scanner.Split(bufio.ScanLines) + logLine := reader.Text() - for scanner.Scan() { - log_line := scanner.Text() - tm, labels, err := logParser.Parse(log_line, labels) - if err != nil { - r.logger.Log("Skipping log due to invalid parse", "Error", err.Error()) - continue + err = r.loadLogsToLoki(logLine, parser.NewContainerParser(), labels) + if err != nil { + return fmt.Errorf("error loading logs to loki: %s", err.Error()) + } } - r.lokiClient.PostLog(log_line, tm, labels) } - return nil } From 52be1adb7c384d69c4c4ab87f237fd64a037a653 Mon Sep 17 00:00:00 2001 From: AxiomSamarth Date: Sat, 8 Oct 2022 17:50:54 +0530 Subject: [PATCH 2/4] refactor and remove print statements from goroutine --- internal/runner/runner.go | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/internal/runner/runner.go b/internal/runner/runner.go index a1fc948..9c67e6a 100644 --- a/internal/runner/runner.go +++ b/internal/runner/runner.go @@ -3,7 +3,6 @@ package runner import ( "bufio" "context" - "fmt" "github.com/go-kit/log" corev1 "k8s.io/api/core/v1" @@ -75,7 +74,6 @@ func (r *runner) Run(cancelCtx context.Context) error { func (r *runner) loadLogsToLoki(logLine string, logParser parser.Parser, labels map[string]string) error { tm, labelset, err := logParser.Parse(logLine, labels) if err != nil { - fmt.Printf("%s\n", err.Error()) r.logger.Log("Skipping log due to invalid parse", "Error", err.Error()) return err } @@ -96,7 +94,7 @@ func (r *runner) streamPodLogs(cancelCtx context.Context, cs kubernetes.Interfac req := cs.CoreV1().Pods(pod.Namespace).GetLogs(pod.Name, podLogOptions) stream, err := req.Stream(cancelCtx) if err != nil { - return fmt.Errorf("error in streaming logs: %s", err.Error()) + return err } reader := bufio.NewScanner(stream) @@ -111,10 +109,8 @@ func (r *runner) streamPodLogs(cancelCtx context.Context, cs kubernetes.Interfac logLine := reader.Text() - err = r.loadLogsToLoki(logLine, parser.NewContainerParser(), labels) - if err != nil { - return fmt.Errorf("error loading logs to loki: %s", err.Error()) - } + // ignore the error and continue reading stream & loading to loki + _ = r.loadLogsToLoki(logLine, parser.NewContainerParser(), labels) } } return nil From dca02ad7752b91f129587bd989aad876771b4553 Mon Sep 17 00:00:00 2001 From: AxiomSamarth Date: Sat, 8 Oct 2022 19:21:10 +0530 Subject: [PATCH 3/4] update go.mod to use go v1.19 --- go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 46836fc..ced1a69 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/zawachte/lomba -go 1.18 +go 1.19 require ( github.com/go-kit/log v0.2.0 From da4acb6a28f830bfb67c1cc5f65d431d5f0d8ae3 Mon Sep 17 00:00:00 2001 From: AxiomSamarth Date: Sun, 9 Oct 2022 13:03:08 +0530 Subject: [PATCH 4/4] trigger goroutine to collect podlogs per container --- internal/runner/runner.go | 49 +++++++++++++++++++-------------------- 1 file changed, 24 insertions(+), 25 deletions(-) diff --git a/internal/runner/runner.go b/internal/runner/runner.go index 9c67e6a..d113b61 100644 --- a/internal/runner/runner.go +++ b/internal/runner/runner.go @@ -64,8 +64,9 @@ func (r *runner) Run(cancelCtx context.Context) error { } for _, pod := range podList.Items { - go r.streamPodLogs(cancelCtx, r.cs, pod) - + for _, container := range pod.Spec.Containers { + go r.streamPodLogs(cancelCtx, r.cs, pod, container.Name) + } } return nil @@ -84,34 +85,32 @@ func (r *runner) loadLogsToLoki(logLine string, logParser parser.Parser, labels // streamPodLogs will stream the pod logs and load the logs to loki with relevant // labels, loglines and timestamp -func (r *runner) streamPodLogs(cancelCtx context.Context, cs kubernetes.Interface, pod corev1.Pod) error { - for _, container := range pod.Spec.Containers { - podLogOptions := &corev1.PodLogOptions{ - Follow: true, - Timestamps: true, - } +func (r *runner) streamPodLogs(cancelCtx context.Context, cs kubernetes.Interface, pod corev1.Pod, containerName string) error { + podLogOptions := &corev1.PodLogOptions{ + Follow: true, + Timestamps: true, + } - req := cs.CoreV1().Pods(pod.Namespace).GetLogs(pod.Name, podLogOptions) - stream, err := req.Stream(cancelCtx) - if err != nil { - return err - } + req := cs.CoreV1().Pods(pod.Namespace).GetLogs(pod.Name, podLogOptions) + stream, err := req.Stream(cancelCtx) + if err != nil { + return err + } - reader := bufio.NewScanner(stream) - reader.Split(bufio.ScanLines) - defer stream.Close() + reader := bufio.NewScanner(stream) + reader.Split(bufio.ScanLines) + defer stream.Close() - for reader.Scan() { - labels := make(map[string]string) - labels["namespace"] = pod.Namespace - labels["pod_name"] = pod.Name - labels["container_name"] = container.Name + for reader.Scan() { + labels := make(map[string]string) + labels["namespace"] = pod.Namespace + labels["pod_name"] = pod.Name + labels["container_name"] = containerName - logLine := reader.Text() + logLine := reader.Text() - // ignore the error and continue reading stream & loading to loki - _ = r.loadLogsToLoki(logLine, parser.NewContainerParser(), labels) - } + // ignore the error and continue reading stream & loading to loki + _ = r.loadLogsToLoki(logLine, parser.NewContainerParser(), labels) } return nil }