diff --git a/cmd/botkube-agent/main.go b/cmd/botkube-agent/main.go
index 72107636f..2e4efcbe7 100644
--- a/cmd/botkube-agent/main.go
+++ b/cmd/botkube-agent/main.go
@@ -4,7 +4,6 @@ import (
 	"context"
 	"errors"
 	"fmt"
-	"log"
 	"net/http"
 	"time"
@@ -67,9 +66,7 @@ func main() {
 	ctx, cancelCtxFn := context.WithCancel(ctx)
 	defer cancelCtxFn()
 
-	if err := run(ctx); err != nil {
-		log.Fatal(err)
-	}
+	loggerx.ExitOnError(run(ctx), "while running application")
 }
 
 // run wraps the main logic of the app to be able to properly clean up resources via deferred calls.
diff --git a/cmd/cli/cmd/install.go b/cmd/cli/cmd/install.go
index e0254c240..c22399d3b 100644
--- a/cmd/cli/cmd/install.go
+++ b/cmd/cli/cmd/install.go
@@ -49,6 +49,8 @@ func NewInstall() *cobra.Command {
 	kubex.RegisterKubeconfigFlag(flags)
 	flags.BoolVar(&opts.LogsReportTimestamp, "logs-report-timestamp", false, "Print timestamp prefix to the Botkube logs entries")
 	flags.IntVar(&opts.LogsScrollingHeight, "logs-scrolling-height", 10, "")
+	flags.DurationVar(&opts.Timeout, "timeout", 10*time.Minute, `Maximum time during which the Botkube installation is watched, where "0" means "infinite". Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h".`)
+	flags.BoolVarP(&opts.Watch, "watch", "w", true, "Watches the status of the Botkube installation until it finishes or the defined `--timeout` elapses.")
 
 	// common params for install and upgrade operation
 	flags.StringVar(&opts.HelmParams.Version, "version", install.LatestVersionTag, "Botkube version. Possible values @latest, 1.2.0, ...")
@@ -61,9 +63,6 @@ func NewInstall() *cobra.Command {
 	flags.BoolVar(&opts.HelmParams.DisableHooks, "no-hooks", false, "Disable pre/post install/upgrade hooks")
 	flags.BoolVar(&opts.HelmParams.DisableOpenAPIValidation, "disable-openapi-validation", false, "If set, it will not validate rendered templates against the Kubernetes OpenAPI Schema")
 	flags.BoolVar(&opts.HelmParams.SkipCRDs, "skip-crds", false, "If set, no CRDs will be installed.")
-	flags.DurationVar(&opts.HelmParams.Timeout, "timeout", 5*time.Minute, "time to wait for any individual Kubernetes operation (like Jobs for hooks)")
-	flags.BoolVar(&opts.HelmParams.Wait, "wait", false, "If set, will wait until all Pods, PVCs, Services, and minimum number of Pods of a Deployment, StatefulSet, or ReplicaSet are in a ready state before marking the release as successful. It will wait for as long as --timeout")
-	flags.BoolVar(&opts.HelmParams.WaitForJobs, "wait-for-jobs", true, "If set and --wait enabled, will wait until all Jobs have been completed before marking the release as successful. It will wait for as long as --timeout")
 	flags.BoolVar(&opts.HelmParams.Atomic, "atomic", false, "If set, process rolls back changes made in case of failed install/upgrade. The --wait flag will be set automatically if --atomic is used")
 	flags.BoolVar(&opts.HelmParams.SubNotes, "render-subchart-notes", false, "If set, render subchart notes along with the parent")
 	flags.StringVar(&opts.HelmParams.Description, "description", "", "add a custom description")
diff --git a/internal/cli/install/config.go b/internal/cli/install/config.go
index 054fbd7f8..42176341d 100644
--- a/internal/cli/install/config.go
+++ b/internal/cli/install/config.go
@@ -1,6 +1,8 @@
 package install
 
 import (
+	"time"
+
 	"github.com/kubeshop/botkube/internal/cli/install/helm"
 )
@@ -27,4 +29,6 @@ type Config struct {
 	HelmParams          helm.Config
 	LogsReportTimestamp bool
 	LogsScrollingHeight int
+	Watch               bool
+	Timeout             time.Duration
 }
diff --git a/internal/cli/install/helm/config.go b/internal/cli/install/helm/config.go
index 979b10d70..fca646094 100644
--- a/internal/cli/install/helm/config.go
+++ b/internal/cli/install/helm/config.go
@@ -1,8 +1,6 @@
 package helm
 
 import (
-	"time"
-
 	"helm.sh/helm/v3/pkg/cli/values"
 )
@@ -20,9 +18,6 @@ type Config struct {
 	Namespace string
 	SkipCRDs  bool
 
-	Timeout      time.Duration
-	Wait         bool
-	WaitForJobs  bool
 	DisableHooks bool
 	DryRun       bool
 	Force        bool
Do you want to upgrade it?", + Message: "Do you want to upgrade existing installation?", Default: true, } @@ -133,9 +137,8 @@ func (c *Helm) installAction(opts Config) Run { installCli.Namespace = opts.Namespace installCli.SkipCRDs = opts.SkipCRDs - installCli.Timeout = opts.Timeout - //installCli.Wait = opts.Wait - installCli.WaitForJobs = opts.WaitForJobs + installCli.Wait = false // botkube CLI has a custom logic to do that + installCli.WaitForJobs = false installCli.DisableHooks = opts.DisableHooks installCli.DryRun = opts.DryRun installCli.Force = opts.Force @@ -157,9 +160,8 @@ func (c *Helm) upgradeAction(opts Config) Run { upgradeAction.Namespace = opts.Namespace upgradeAction.SkipCRDs = opts.SkipCRDs - upgradeAction.Timeout = opts.Timeout - //upgradeAction.Wait = opts.Wait - upgradeAction.WaitForJobs = opts.WaitForJobs + upgradeAction.Wait = false // botkube CLI has a custom logic to do that + upgradeAction.WaitForJobs = false upgradeAction.DisableHooks = opts.DisableHooks upgradeAction.DryRun = opts.DryRun upgradeAction.Force = opts.Force diff --git a/internal/cli/install/helm/status.go b/internal/cli/install/helm/status.go index 8c24a8a9a..ea7e44ead 100644 --- a/internal/cli/install/helm/status.go +++ b/internal/cli/install/helm/status.go @@ -14,14 +14,14 @@ import ( var releaseGoTpl = ` {{ Key "Name" }} {{ .Name | Val }} {{ Key "Namespace" }} {{ .Namespace | Val }} + {{ Key "Version" }} {{ .Version | Val }} {{ Key "Last Deployed" }} {{ .LastDeployed | FmtDate | Val }} {{ Key "Revision" }} {{ .Revision | Val }} - {{ Key "Description" }} {{ .Description | Val }} ` // PrintReleaseStatus returns release description similar to what Helm does, // based on https://github.com/helm/helm/blob/f31d4fb3aacabf6102b3ec9214b3433a3dbf1812/cmd/helm/status.go#L126C1-L138C3 -func PrintReleaseStatus(status *printer.StatusPrinter, r *release.Release) error { +func PrintReleaseStatus(header string, status *printer.StatusPrinter, r *release.Release) error { if r == nil { return nil } @@ -35,6 +35,7 @@ func PrintReleaseStatus(status *printer.StatusPrinter, r *release.Release) error } properties["Namespace"] = r.Namespace properties["Status"] = r.Info.Status.String() + properties["Version"] = r.Chart.AppVersion() properties["Revision"] = fmt.Sprintf("%d", r.Version) properties["Description"] = r.Info.Description @@ -43,6 +44,6 @@ func PrintReleaseStatus(status *printer.StatusPrinter, r *release.Release) error return err } - status.InfoWithBody("Release details:", indent.String(desc, 2)) + status.InfoWithBody(header, indent.String(desc, 2)) return nil } diff --git a/internal/cli/install/helm/wait.go b/internal/cli/install/helm/wait.go deleted file mode 100644 index f738294cb..000000000 --- a/internal/cli/install/helm/wait.go +++ /dev/null @@ -1,131 +0,0 @@ -package helm - -import ( - "context" - "errors" - "time" - - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/watch" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/cache" - watchtools "k8s.io/client-go/tools/watch" -) - -// waitForTestSuite watches the given test suite until the exitCondition is true -func WaitForBotkubePod(ctx context.Context, clientset *kubernetes.Clientset, namespace string, name string, exitCondition watchtools.ConditionFunc, timeout time.Duration) error { - ctx, cancel := context.WithCancel(ctx) - if timeout > 0 { - ctx, cancel = context.WithTimeout(ctx, timeout) - } - defer cancel() - - //preconditionFunc := func(store 
diff --git a/internal/cli/install/helm/wait.go b/internal/cli/install/helm/wait.go
deleted file mode 100644
index f738294cb..000000000
--- a/internal/cli/install/helm/wait.go
+++ /dev/null
@@ -1,131 +0,0 @@
-package helm
-
-import (
-	"context"
-	"errors"
-	"time"
-
-	corev1 "k8s.io/api/core/v1"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/runtime"
-	"k8s.io/apimachinery/pkg/watch"
-	"k8s.io/client-go/kubernetes"
-	"k8s.io/client-go/tools/cache"
-	watchtools "k8s.io/client-go/tools/watch"
-)
-
-// waitForTestSuite watches the given test suite until the exitCondition is true
-func WaitForBotkubePod(ctx context.Context, clientset *kubernetes.Clientset, namespace string, name string, exitCondition watchtools.ConditionFunc, timeout time.Duration) error {
-	ctx, cancel := context.WithCancel(ctx)
-	if timeout > 0 {
-		ctx, cancel = context.WithTimeout(ctx, timeout)
-	}
-	defer cancel()
-
-	//preconditionFunc := func(store cache.Store) (bool, error) {
-	//	_, exists, err := store.Get(&metav1.ObjectMeta{Name: name, Namespace: namespace})
-	//	if err != nil {
-	//		fmt.Println(err)
-	//		return true, err
-	//	}
-	//	if !exists {
-	//		// We need to make sure we see the object in the cache before we start waiting for events
-	//		// or we would be waiting for the timeout if such object didn't exist.
-	//		fmt.Println("exists", exists)
-	//		return true, apierrors.NewNotFound(corev1.Resource("pods"), name)
-	//	}
-	//
-	//	return false, nil
-	//}
-
-	//selector := labels.SelectorFromSet(map[string]string{"app": name}).String()
-	lw := &cache.ListWatch{
-		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
-			//options.LabelSelector = selector
-			return clientset.CoreV1().Pods(namespace).List(ctx, options)
-		},
-		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
-			//options.LabelSelector = selector
-			return clientset.CoreV1().Pods(namespace).Watch(ctx, options)
-		},
-	}
-
-	_, err := watchtools.UntilWithSync(ctx, lw, &corev1.Pod{}, nil, exitCondition)
-	return err
-}
-
-var (
-	errPodRestartedWithError = errors.New("pod restarted with non zero exit code")
-)
-
-// clusterTestSuiteCompleted returns true if the suite has run to completion, false if the suite has not yet
-// reached running state, or an error in any other case.
-func PodReady(podScheduledIndicator, phase chan string, since time.Time) func(event watch.Event) (bool, error) {
-	//infiniteWatch := func(event watch.Event) (bool, error) {
-	//	if event.Type == obj.Event {
-	//		cm := event.Object.(*corev1.ConfigMap)
-	//		msg := fmt.Sprintf("Plugin %s detected `%s` event on `%s/%s`", pluginName, obj.Event, cm.Namespace, cm.Name)
-	//		sink <- []byte(msg)
-	//	}
-	//
-	//	// always continue - context will cancel this watch for us :)
-	//	return false, nil
-	//}
-
-	informed := false
-
-	sinceK8sTime := metav1.NewTime(since)
-	//prevPodName := "" // when we do upgrade we will just ignore it
-	return func(event watch.Event) (bool, error) {
-		//fmt.Printf("testing %v\n", event)
-		switch t := event.Type; t {
-		case watch.Added, watch.Modified:
-			switch pod := event.Object.(type) {
-			case *corev1.Pod:
-
-				createdAt := pod.GetObjectMeta().GetCreationTimestamp()
-				if createdAt.Before(&sinceK8sTime) {
-					return false, nil
-				}
-
-				phase <- string(pod.Status.Phase)
-				if pod.Status.Phase == corev1.PodRunning && !informed {
-					informed = true
-					podScheduledIndicator <- pod.Name
-					close(podScheduledIndicator)
-				}
-
-				for _, cond := range pod.Status.ContainerStatuses {
-					if cond.Ready == false && cond.RestartCount > 0 {
-						// pod was already restarted because of the problem, we restart botkube on permanent errors mostly, so let's stop watching
-						return true, errPodRestartedWithError
-					}
-				}
-
-				//fmt.Printf("Pod phase: %q\n", pod.Status.Phase)
-				return isPodReady(pod), nil
-			}
-			//case watch.Deleted:
-			//	We need to abort to avoid cases of recreation and not to silently watch the wrong (new) object
-			//	return false, apierrors.NewNotFound(corev1.Resource("pods"), "")
-			//default:
-			//	fmt.Println("test")
-			//	return true, fmt.Errorf("internal error: unexpected event %#v", event)
-		}
-
-		return false, nil
-	}
-}
-
-// isPodReady returns true if a pod is ready; false otherwise.
-func isPodReady(pod *corev1.Pod) bool {
-	for _, c := range pod.Status.Conditions {
-		if c.Type == corev1.PodReady && c.Status == corev1.ConditionTrue {
-			return true
-		}
-		//fmt.Printf("Pod is ready: %s/%s\n", pod.GetNamespace(), pod.GetName())
-		//fmt.Println("Finishing!!")
-	}
-	//fmt.Printf("Pod is not ready: %s/%s\n", pod.GetNamespace(), pod.GetName())
-	return false
-}
diff --git a/internal/cli/install/install.go b/internal/cli/install/install.go
index e7677004d..4e996e9e2 100644
--- a/internal/cli/install/install.go
+++ b/internal/cli/install/install.go
@@ -24,6 +24,12 @@ import (
 // Install installs Botkube Helm chart into cluster.
 func Install(ctx context.Context, w io.Writer, k8sCfg *kubex.ConfigWithMeta, opts Config) (err error) {
+	ctxWithTimeout, cancel := context.WithCancel(ctx)
+	if opts.Timeout > 0 {
+		ctxWithTimeout, cancel = context.WithTimeout(ctxWithTimeout, opts.Timeout)
+	}
+	defer cancel()
+
 	status := printer.NewStatus(w, "Installing Botkube on cluster...")
 	defer func() {
 		status.End(err == nil)
@@ -64,33 +70,42 @@ func Install(ctx context.Context, w io.Writer, k8sCfg *kubex.ConfigWithMeta, opt
 		return err
 	}
 
-	err = ensureNamespaceCreated(ctx, clientset, opts.HelmParams.Namespace)
+	err = ensureNamespaceCreated(ctxWithTimeout, clientset, opts.HelmParams.Namespace)
 	status.End(err == nil)
 	if err != nil {
 		return err
 	}
 
-	parallel, _ := errgroup.WithContext(ctx)
+	parallel, _ := errgroup.WithContext(ctxWithTimeout)
 	podScheduledIndicator := make(chan string)
 	podWaitErrors := make(chan error, 1)
 	podWaitPhase := make(chan string, 10)
 	parallel.Go(func() error {
-		err := helm.WaitForBotkubePod(ctx, clientset, opts.HelmParams.Namespace, opts.HelmParams.ReleaseName, helm.PodReady(podScheduledIndicator, podWaitPhase, time.Now()), opts.HelmParams.Timeout)
+		err := kubex.WaitForPod(ctxWithTimeout, clientset, opts.HelmParams.Namespace, opts.HelmParams.ReleaseName, kubex.PodReady(podScheduledIndicator, podWaitPhase, time.Now()))
 		podWaitErrors <- err
 		return nil
 	})
 
-	rel, err := helmInstaller.Install(ctx, status, opts.HelmParams)
+	rel, err := helmInstaller.Install(ctxWithTimeout, status, opts.HelmParams)
 	if err != nil {
 		return err
 	}
 
+	if !opts.Watch {
+		status.Infof("Watching Botkube installation is disabled")
+		if err := helm.PrintReleaseStatus("Release details:", status, rel); err != nil {
+			return err
+		}
+
+		return printSuccessInstallMessage(opts.HelmParams.Version, w)
+	}
+
 	status.Step("Waiting until Botkube Pod is running")
 	var podName string
 	select {
 	case podName = <-podScheduledIndicator:
 		status.End(true)
-	case <-time.After(opts.HelmParams.Timeout):
+	case <-time.After(opts.Timeout):
 		return fmt.Errorf("Timed out waiting for Pod")
 	}
@@ -99,24 +114,23 @@ func Install(ctx context.Context, w io.Writer, k8sCfg *kubex.ConfigWithMeta, opt
 	streamLogCtx, cancelStreamLogs := context.WithCancel(ctx)
 	defer cancelStreamLogs()
 	parallel.Go(func() error {
 		defer close(messages)
-		return logs.DefaultConsumeRequest(streamLogCtx, clientset, opts.HelmParams.Namespace, podName, messages)
+		return logs.StartsLogsStreaming(streamLogCtx, clientset, opts.HelmParams.Namespace, podName, messages)
 	})
 
 	logsPrinter := logs.NewFixedHeightPrinter(
 		opts.LogsScrollingHeight,
-		logs.NewKVParser(opts.LogsReportTimestamp),
 		podName,
 	)
 
 	parallel.Go(func() error {
-		logsPrinter.Start(ctx)
+		logsPrinter.Start(ctxWithTimeout)
 		return nil
 	})
 
 	parallel.Go(func() error {
 		for {
 			select {
-			case <-ctx.Done(): // it's canceled on OS signals or if the function passed to 'Go' returns a non-nil error
-				return ctx.Err()
+			case <-ctxWithTimeout.Done(): // it's canceled on OS signals or if the function passed to 'Go' returns a non-nil error
+				return ctxWithTimeout.Err()
 			case err := <-podWaitErrors:
 				time.Sleep(time.Second)
 				cancelStreamLogs()
@@ -128,8 +142,8 @@ func Install(ctx context.Context, w io.Writer, k8sCfg *kubex.ConfigWithMeta, opt
 	parallel.Go(func() error {
 		for {
 			select {
-			case <-ctx.Done(): // it's canceled on OS signals or if the function passed to 'Go' returns a non-nil error
-				return ctx.Err()
+			case <-ctxWithTimeout.Done(): // it's canceled on OS signals or if the function passed to 'Go' returns a non-nil error
+				return ctxWithTimeout.Err()
 			case ph := <-podWaitPhase:
 				logsPrinter.UpdatePodPhase(ph)
 			case entry, ok := <-messages:
@@ -144,10 +158,11 @@ func Install(ctx context.Context, w io.Writer, k8sCfg *kubex.ConfigWithMeta, opt
 	err = parallel.Wait()
 	if err != nil {
+		printFailedInstallMessage(opts.HelmParams.Version, opts.HelmParams.Namespace, podName, w)
 		return err
 	}
 
-	if err := helm.PrintReleaseStatus(status, rel); err != nil {
+	if err := helm.PrintReleaseStatus("Release details:", status, rel); err != nil {
 		return err
 	}
@@ -173,7 +188,40 @@ func printSuccessInstallMessage(version string, w io.Writer) error {
 		return fmt.Errorf("while rendering message: %v", err)
 	}
 
-	_, err = fmt.Fprintln(w, out)
+	_, err = fmt.Fprint(w, out)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+var failedInstallGoTpl = `
+  │ {{ printf "Botkube %s installation failed 😿" .Version | Bold | Red }}
+  │ To get all Botkube logs, run:
+  │
+  │   kubectl logs -n {{ .Namespace }} pod/{{ .PodName }}
+
+  │ To receive assistance, please join our Slack community at {{ .SlackURL | Underline | Blue }}.
+  │ We'll be glad to help you get Botkube up and running!
+`
+
+func printFailedInstallMessage(version string, namespace string, name string, w io.Writer) error {
+	renderer := style.NewGoTemplateRender(style.DefaultConfig(failedInstallGoTpl))
+
+	props := map[string]string{
+		"SlackURL":  "https://join.botkube.io",
+		"Version":   version,
+		"Namespace": namespace,
+		"PodName":   name,
+	}
+
+	out, err := renderer.Render(props, cli.IsSmartTerminal(w))
+	if err != nil {
+		return err
+	}
+
+	_, err = fmt.Fprint(w, out)
 	if err != nil {
 		return fmt.Errorf("while printing message: %v", err)
 	}
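One detail of the reworked Install flow worth calling out: the --timeout deadline is now enforced by the CLI itself rather than handed to Helm, and "0" disables the deadline entirely. A tiny standalone sketch of that context pattern, mirroring the snippet at the top of Install:

package main

import (
	"context"
	"fmt"
	"time"
)

// watchContext mimics the ctxWithTimeout setup in Install: with a positive
// timeout the context expires on its own; with 0 it ends only on cancel,
// which is the "infinite" behavior documented on the --timeout flag.
func watchContext(parent context.Context, timeout time.Duration) (context.Context, context.CancelFunc) {
	ctx, cancel := context.WithCancel(parent)
	if timeout > 0 {
		ctx, cancel = context.WithTimeout(ctx, timeout)
	}
	return ctx, cancel
}

func main() {
	ctx, cancel := watchContext(context.Background(), 50*time.Millisecond)
	defer cancel()

	<-ctx.Done()
	fmt.Println(ctx.Err()) // context.DeadlineExceeded
}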
diff --git a/internal/cli/install/logs/json_parser.go b/internal/cli/install/logs/json_parser.go
new file mode 100644
index 000000000..6e90fbaf6
--- /dev/null
+++ b/internal/cli/install/logs/json_parser.go
@@ -0,0 +1,60 @@
+package logs
+
+import (
+	"encoding/json"
+	"fmt"
+	"sort"
+
+	charmlog "github.com/charmbracelet/log"
+	"golang.org/x/exp/maps"
+
+	"github.com/kubeshop/botkube/internal/cli"
+)
+
+// JSONParser knows how to parse JSON log lines.
+type JSONParser struct{}
+
+// ParseLine parses a single JSON log line. It returns nil if the line is not valid JSON.
+func (p *JSONParser) ParseLine(line string) map[string]any {
+	var out map[string]any
+	err := json.Unmarshal([]byte(line), &out)
+	if err != nil {
+		return nil
+	}
+	return out
+}
+
+// ParseLineIntoCharm returns a parsed log line with charm logger support.
+func (p *JSONParser) ParseLineIntoCharm(line string) ([]any, charmlog.Level) {
+	result := p.ParseLine(line)
+	if result == nil {
+		return nil, 0
+	}
+
+	var fields []any
+
+	lvl := charmlog.ParseLevel(fmt.Sprint(result["level"]))
+	fields = append(fields, charmlog.LevelKey, lvl)
+	fields = append(fields, charmlog.MessageKey, result["msg"])
+
+	keys := maps.Keys(result)
+	sort.Strings(keys)
+	for _, k := range keys {
+		switch k {
+		case "level", "msg", "time": // already processed
+			continue
+		case "component", "url":
+			if !cli.VerboseMode.IsEnabled() {
+				continue // ignore these fields unless verbose mode is enabled
+			}
+		}
+		fields = append(fields, k, result[k])
+	}
+
+	return fields, lvl
+}
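To illustrate what the new parser emits, here is a hedged usage sketch — the log line is invented, and the import of the internal logs package is for illustration only:

package main

import (
	"os"

	charmlog "github.com/charmbracelet/log"

	"github.com/kubeshop/botkube/internal/cli/install/logs"
)

func main() {
	// A hypothetical Botkube JSON log line: "level", "msg", and "time"
	// keys plus arbitrary extra fields such as "component".
	line := `{"level":"info","msg":"Botkube connected","component":"slack","time":"2023-08-01T10:00:00Z"}`

	p := logs.JSONParser{}
	fields, lvl := p.ParseLineIntoCharm(line)
	if fields == nil {
		return // not valid JSON; the printer below falls back to printing it raw
	}
	if lvl == charmlog.DebugLevel {
		return // the printer below skips debug lines unless verbose mode is on
	}

	logger := charmlog.NewWithOptions(os.Stdout, charmlog.Options{Formatter: charmlog.TextFormatter})
	logger.With(fields...).Print(nil)
}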
diff --git a/internal/cli/install/logs/k8s.go b/internal/cli/install/logs/k8s.go
index ac7cc7d2e..354e102bf 100644
--- a/internal/cli/install/logs/k8s.go
+++ b/internal/cli/install/logs/k8s.go
@@ -16,15 +16,15 @@ const (
 	containerName = "botkube"
 )
 
-// DefaultConsumeRequest reads the data from request and writes into
-// the out writer. It buffers data from requests until the newline or io.EOF
+// StartsLogsStreaming reads the data from the request and writes it into
+// the out channel. It buffers data from requests until a newline or io.EOF
 // occurs in the data, so it doesn't interleave logs sub-line
 // when running concurrently.
 //
 // A successful read returns err == nil, not err == io.EOF.
 // Because the function is defined to read from request until io.EOF, it does
 // not treat an io.EOF as an error to be reported.
-func DefaultConsumeRequest(ctx context.Context, clientset *kubernetes.Clientset, namespace, name string, out chan<- []byte) error {
+func StartsLogsStreaming(ctx context.Context, clientset *kubernetes.Clientset, namespace, name string, out chan<- []byte) error {
 	return retry.Do(func() error {
 		req := clientset.CoreV1().Pods(namespace).GetLogs(name, &v1.PodLogOptions{
 			Container: containerName,
diff --git a/internal/cli/install/logs/kv_parser.go b/internal/cli/install/logs/kv_parser.go
deleted file mode 100644
index c5e6e2afa..000000000
--- a/internal/cli/install/logs/kv_parser.go
+++ /dev/null
@@ -1,76 +0,0 @@
-package logs
-
-import (
-	"regexp"
-	"sort"
-	"strings"
-
-	"github.com/araddon/dateparse"
-	charmlog "github.com/charmbracelet/log"
-	"golang.org/x/exp/maps"
-)
-
-// Regex pattern to match key-value pairs
-var kvMatcher = regexp.MustCompile(`(\w+)(?:="([^"]+)"|=([^"\s]+))?`)
-
-// KVParser knows how to parse key-value log pairs.
-type KVParser struct {
-	ReportTimestamp bool
-}
-
-func NewKVParser(reportTimestamp bool) *KVParser {
-	return &KVParser{ReportTimestamp: reportTimestamp}
-}
-
-// ParseLineIntoCharm returns parsed log line with charm logger support.
-func (k *KVParser) ParseLineIntoCharm(line string) []any {
-	result := k.ParseLine(line)
-
-	var fields []any
-
-	if k.ReportTimestamp {
-		parseAny, _ := dateparse.ParseAny(result["time"])
-		fields = append(fields, charmlog.TimestampKey, parseAny)
-	}
-
-	fields = append(fields, charmlog.LevelKey, charmlog.ParseLevel(result["level"]))
-	fields = append(fields, charmlog.MessageKey, result["msg"])
-
-	keys := maps.Keys(result)
-	sort.Strings(keys)
-	for _, k := range keys {
-		switch k {
-		case "level", "msg", "time": // already process
-			continue
-		}
-		fields = append(fields, k, result[k])
-	}
-
-	return fields
-}
-
-func (k *KVParser) ParseLine(line string) map[string]string {
-	result := make(map[string]string)
-
-	matches := kvMatcher.FindAllStringSubmatch(line, -1)
-	for _, match := range matches {
-		m := safeSliceGetter(match)
-		key := m.Get(1)
-		value := strings.TrimSpace(m.Get(2))
-		if value == "" {
-			value = strings.TrimSpace(m.Get(3))
-		}
-		result[key] = value
-	}
-
-	return result
-}
-
-type safeSliceGetter []string
-
-func (s safeSliceGetter) Get(idx int) string {
-	if len(s) < idx {
-		return ""
-	}
-	return s[idx]
-}
diff --git a/internal/cli/install/logs/printer.go b/internal/cli/install/logs/printer.go
index 3cfd07d31..0cd22b591 100644
--- a/internal/cli/install/logs/printer.go
+++ b/internal/cli/install/logs/printer.go
@@ -6,47 +6,41 @@ import (
 	"fmt"
 	"os"
 	"strings"
-	"sync"
 	"time"
 
 	"github.com/charmbracelet/log"
+	charmlog "github.com/charmbracelet/log"
 	"github.com/morikuni/aec"
+	"github.com/muesli/termenv"
 
+	"github.com/kubeshop/botkube/internal/cli"
 	"github.com/kubeshop/botkube/internal/cli/printer"
 )
 
-const extendedKitchenTimeFormat = "3:04:05 PM"
-
 type FixedHeightPrinter struct {
-	height         int
-	logsBuffer     []string
-	moveUpByHeight aec.ANSI
-	parser         *KVParser
-	logger         *log.Logger
-	podPhase       string
-	sync.Mutex
-	alreadyUsed bool
+	height      int
+	logsBuffer  []string
+	podPhase    string
 	podName     string
 	newLog      chan string
 	newPodPhase chan string
 	stop        chan struct{}
+	parser      JSONParser
+	logger      *log.Logger
 }
 
-func NewFixedHeightPrinter(height int, parser *KVParser, name string) *FixedHeightPrinter {
+func NewFixedHeightPrinter(height int, name string) *FixedHeightPrinter {
 	return &FixedHeightPrinter{
-		height:         height,
-		logsBuffer:     []string{},
-		moveUpByHeight: aec.Up(uint(height)),
-		parser:         parser,
-		alreadyUsed:    false,
-		newLog:         make(chan string, 10),
-		newPodPhase:    make(chan string, 10),
+		height:      height,
+		logsBuffer:  []string{},
+		newLog:      make(chan string, 10),
+		newPodPhase: make(chan string, 10),
+		stop:        make(chan struct{}),
 		logger: log.NewWithOptions(os.Stdout, log.Options{
-			TimeFormat: extendedKitchenTimeFormat,
-			Formatter:  log.TextFormatter,
+			Formatter: log.TextFormatter,
 		}),
-		stop:    make(chan struct{}),
 		podName: name,
+		parser:  JSONParser{},
 	}
}
@@ -58,53 +52,46 @@ func (f *FixedHeightPrinter) Start(ctx context.Context) {
 	buff := bytes.Buffer{}
 	status := printer.NewStatus(&buff, "")
 	status.Step("Streaming logs...")
-	f.printStatusHeader(buff.String() + "\n") // it's without a new line while in progress
-
-	resetHeader := func() {
-		fmt.Print(aec.Save)
-		fmt.Print(aec.Up(uint(len(f.logsBuffer) + 4)))
-		f.printStatusHeader(buff.String() + "\n") // it's without a new line while in progress
-		fmt.Print(aec.Restore)
-	}
-
+	termenv.SaveCursorPosition()
 	for {
+		f.printData(buff.String() + "\n") // it's without a new line while in progress
 		idleDelay.Reset(refreshDuration)
 		select {
 		case <-f.stop:
 			status.End(true)
-			fmt.Print(aec.Up(uint(len(f.logsBuffer) + 4)))
+			termenv.RestoreCursorPosition()
 			f.printStatusHeader(buff.String())
-			f.printLogs()
+			f.printLogs(true)
 			fmt.Println()
 			return
 		case <-ctx.Done():
 			status.End(false)
-			fmt.Print(aec.Up(uint(len(f.logsBuffer) + 4)))
+			termenv.RestoreCursorPosition()
+
 			f.printStatusHeader(buff.String())
-			f.printLogs()
+			f.printLogs(true)
 			fmt.Println()
 			return
 		case <-idleDelay.C:
-			resetHeader()
 		case entry := <-f.newLog:
 			f.logsBuffer = append(f.logsBuffer, entry)
-			if len(f.logsBuffer) <= f.height {
-				f.printLogLine(entry)
-				continue
+			if len(f.logsBuffer) > f.height {
+				// now we need to simulate scrolling, so all lines are moved up by one, and the first line is just removed.
+				f.logsBuffer = f.logsBuffer[1:]
 			}
-
-			// now we need to simulate scrolling, so all lines are moved N-1, where the first line is just removed.
-			f.logsBuffer = f.logsBuffer[1:]
-			fmt.Print(aec.Up(uint(len(f.logsBuffer))))
-			f.printLogs()
 		case podPhase := <-f.newPodPhase:
 			f.podPhase = podPhase
-			resetHeader()
 		}
+
+		fmt.Print(aec.Up(uint(f.height + 4)))
 	}
 }
 
+func (f *FixedHeightPrinter) printData(header string) {
+	f.printStatusHeader(header)
+	f.printLogs(false)
+}
+
 func (f *FixedHeightPrinter) printStatusHeader(step string) {
 	fmt.Println(step)
 	fmt.Printf(" Pods: %s Phase: %s\n", f.podName, f.podPhase)
@@ -118,23 +105,39 @@ func (f *FixedHeightPrinter) UpdatePodPhase(phase string) {
 	}
 }
 
-func (f *FixedHeightPrinter) printLogs() {
+func (f *FixedHeightPrinter) printLogs(skip bool) {
+	wroteLines := 0
 	for _, item := range f.logsBuffer {
-		f.printLogLine(item)
+		fields, lvl := f.parser.ParseLineIntoCharm(item)
+		if fields == nil {
+			wroteLines++
+			f.printLogLine(item)
+			continue
+		}
+		if lvl == charmlog.DebugLevel && !cli.VerboseMode.IsEnabled() {
+			continue
+		}
+		wroteLines++
+		fmt.Print(aec.EraseLine(aec.EraseModes.Tail))
+		fmt.Print(aec.Column(6))
+		f.logger.With(fields...).Print(nil)
+	}
+
+	if skip {
+		return
+	}
+
+	for i := wroteLines; i < f.height; i++ {
+		f.printLogLine("\n")
 	}
 }
 
 func (f *FixedHeightPrinter) printLogLine(line string) {
-	fmt.Print(aec.Column(6))
 	fmt.Print(aec.EraseLine(aec.EraseModes.Tail))
-	//fmt.Print(item)
-	fields := f.parser.ParseLineIntoCharm(line)
-	f.logger.With(fields...).Print(nil)
+	fmt.Print(aec.Column(6))
+	fmt.Print(line)
 }
 
 func (f *FixedHeightPrinter) AppendLogEntry(entry string) {
-	//ctx, cancel := context.WithTimeout(context.Background(), time.Second)
-	//defer cancel()
 	if strings.TrimSpace(entry) == "" {
 		return
 	}
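The subtle part of the reworked printer is the scrolling simulation: the buffer is capped at height entries by dropping the oldest line, and the cursor is moved back up height + 4 lines (the log window plus the four header lines) before each repaint. The buffer logic in isolation, as a simplified sketch with no ANSI handling:

package main

import "fmt"

// appendCapped mimics the newLog branch in FixedHeightPrinter.Start: once
// the buffer exceeds the fixed height, the oldest entry is dropped so the
// visible window appears to scroll.
func appendCapped(buf []string, entry string, height int) []string {
	buf = append(buf, entry)
	if len(buf) > height {
		buf = buf[1:]
	}
	return buf
}

func main() {
	var buf []string
	for i := 1; i <= 5; i++ {
		buf = appendCapped(buf, fmt.Sprintf("line %d", i), 3)
	}
	fmt.Println(buf) // [line 3 line 4 line 5]
}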
diff --git a/internal/kubex/wait.go b/internal/kubex/wait.go
new file mode 100644
index 000000000..03d99c8e1
--- /dev/null
+++ b/internal/kubex/wait.go
@@ -0,0 +1,86 @@
+package kubex
+
+import (
+	"context"
+	"errors"
+	"time"
+
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/watch"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/tools/cache"
+	watchtools "k8s.io/client-go/tools/watch"
+)
+
+// WaitForPod watches a given Pod until the exitCondition is true.
+func WaitForPod(ctx context.Context, clientset *kubernetes.Clientset, namespace string, name string, exitCondition watchtools.ConditionFunc) error {
+	selector := labels.SelectorFromSet(map[string]string{"app": name}).String()
+	lw := &cache.ListWatch{
+		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
+			options.LabelSelector = selector
+			return clientset.CoreV1().Pods(namespace).List(ctx, options)
+		},
+		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
+			options.LabelSelector = selector
+			return clientset.CoreV1().Pods(namespace).Watch(ctx, options)
+		},
+	}
+
+	_, err := watchtools.UntilWithSync(ctx, lw, &corev1.Pod{}, nil, exitCondition)
+	return err
+}
+
+var (
+	errPodRestartedWithError = errors.New("pod restarted with non-zero exit code")
+)
+
+// PodReady returns a watch condition that reports each observed Pod phase and is true once the Pod is ready.
+func PodReady(podScheduledIndicator, phase chan string, since time.Time) func(event watch.Event) (bool, error) {
+	informed := false
+	sinceK8sTime := metav1.NewTime(since)
+	return func(event watch.Event) (bool, error) {
+		switch event.Type {
+		case watch.Added, watch.Modified:
+			switch pod := event.Object.(type) {
+			case *corev1.Pod:
+				createdAt := pod.GetObjectMeta().GetCreationTimestamp()
+				// We don't care about previously created Pods, for example during an upgrade; we watch for a new Pod instance only.
+				if createdAt.Before(&sinceK8sTime) {
+					return false, nil
+				}
+
+				phase <- string(pod.Status.Phase)
+				if pod.Status.Phase == corev1.PodRunning && !informed {
+					informed = true
+					podScheduledIndicator <- pod.Name
+					close(podScheduledIndicator)
+				}
+
+				for _, cond := range pod.Status.ContainerStatuses {
+					if !cond.Ready && cond.RestartCount > 0 {
+						// The Pod was already restarted because of a problem. Botkube restarts mostly on permanent errors, so stop watching.
+						return true, errPodRestartedWithError
+					}
+				}
+
+				return isPodReady(pod), nil
+			}
+		}
+
+		return false, nil
+	}
+}
+
+// isPodReady returns true if a pod is ready; false otherwise.
+func isPodReady(pod *corev1.Pod) bool {
+	for _, c := range pod.Status.Conditions {
+		if c.Type == corev1.PodReady && c.Status == corev1.ConditionTrue {
+			return true
+		}
+	}
+	return false
+}
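For reference, Install above consumes these new kubex helpers roughly as follows; a trimmed sketch (the namespace and release name values are placeholders, and in the real flow the podPhase channel is drained by the log printer):

package main

import (
	"context"
	"fmt"
	"time"

	"k8s.io/client-go/kubernetes"

	"github.com/kubeshop/botkube/internal/kubex"
)

func waitForBotkube(ctx context.Context, clientset *kubernetes.Clientset) {
	podScheduled := make(chan string)
	podPhase := make(chan string, 10)

	go func() {
		// PodReady reports each observed phase on podPhase and signals
		// podScheduled once the Pod reaches the Running phase.
		err := kubex.WaitForPod(ctx, clientset, "botkube", "botkube",
			kubex.PodReady(podScheduled, podPhase, time.Now()))
		if err != nil {
			fmt.Println("watch ended:", err)
		}
	}()

	select {
	case name := <-podScheduled:
		fmt.Printf("Pod %s is running\n", name)
	case <-ctx.Done():
		fmt.Println("stopped waiting for the Pod:", ctx.Err())
	}
}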
diff --git a/internal/loggerx/logger.go b/internal/loggerx/logger.go
index 3cb60e24c..2f109613f 100644
--- a/internal/loggerx/logger.go
+++ b/internal/loggerx/logger.go
@@ -21,8 +21,28 @@ func New(cfg config.Logger) logrus.FieldLogger {
 		logLevel = logrus.InfoLevel
 	}
 	logger.SetLevel(logLevel)
-	//logger.Formatter = &logrus.TextFormatter{FullTimestamp: true, DisableColors: cfg.DisableColors}
-	logger.Formatter = &logrus.JSONFormatter{}
+	if cfg.Formatter == config.FormatterJson {
+		logger.Formatter = &logrus.JSONFormatter{}
+	} else {
+		logger.Formatter = &logrus.TextFormatter{FullTimestamp: true, DisableColors: cfg.DisableColors, ForceColors: true}
+	}
 
 	return logger
 }
+
+// ExitOnError logs the given error and exits the app when err is not nil.
+func ExitOnError(err error, context string) {
+	if err == nil {
+		return
+	}
+	log := &logrus.Logger{
+		Out:          os.Stdout,
+		Formatter:    &logrus.JSONFormatter{},
+		Hooks:        make(logrus.LevelHooks),
+		Level:        logrus.InfoLevel,
+		ExitFunc:     os.Exit,
+		ReportCaller: false,
+	}
+
+	log.Fatalf("%s: %s", context, err)
+}
diff --git a/pkg/config/config.go b/pkg/config/config.go
index b7965c9dc..74b5e9659 100644
--- a/pkg/config/config.go
+++ b/pkg/config/config.go
@@ -520,10 +520,22 @@ type Settings struct {
 	SACredentialsPathPrefix string `yaml:"saCredentialsPathPrefix"`
 }
 
+// Formatter defines the log formatting style.
+type Formatter string
+
+const (
+	// FormatterText is the text log formatter.
+	FormatterText Formatter = "text"
+
+	// FormatterJson is the JSON log formatter.
+	FormatterJson Formatter = "json"
+)
+
 // Logger holds logger configuration parameters.
 type Logger struct {
-	Level         string `yaml:"level"`
-	DisableColors bool   `yaml:"disableColors"`
+	Level         string    `yaml:"level"`
+	DisableColors bool      `yaml:"disableColors"`
+	Formatter     Formatter `yaml:"formatter"`
 }
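Finally, the formatter switch in loggerx.New is driven by the new config field above; a short usage sketch using only the symbols introduced in this diff:

package main

import (
	"github.com/kubeshop/botkube/internal/loggerx"
	"github.com/kubeshop/botkube/pkg/config"
)

func main() {
	// Colored text output — the branch taken for any value other than "json".
	textLogger := loggerx.New(config.Logger{Level: "info", Formatter: config.FormatterText})
	textLogger.Info("hello from the text formatter")

	// Structured JSON output, matching what was previously hard-coded.
	jsonLogger := loggerx.New(config.Logger{Level: "debug", Formatter: config.FormatterJson})
	jsonLogger.Debug("hello from the JSON formatter")
}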