Skip to content

Commit

Permalink
修改采集器返回抓取指标的错误信息
Browse files Browse the repository at this point in the history
  • Loading branch information
wuqingtao committed May 21, 2020
1 parent e24eb3f commit cb0c41f
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 48 deletions.
2 changes: 1 addition & 1 deletion internal/collector/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func clean(idleTimeout time.Duration, logger *log.Logger) {
return true
})
for _, key := range keys {
outputLogger(logger, "delete metrics of %s", key)
outputLogger(logger, "info", "delete metrics of %s", key)
hosts.Delete(key)
}
}
122 changes: 76 additions & 46 deletions internal/collector/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package collector
import (
"context"
"errors"
"fmt"
"net/http"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -278,41 +280,50 @@ func totalCPU(hs mo.HostSystem) float64 {
return float64(totalCPU)
}

func outputLogger(logger *log.Logger, format string, v ...interface{}) {
func outputLogger(logger *log.Logger, level, format string, v ...interface{}) {
if logger == nil {
return
}
logger.Printf(format, v...)
switch level {
case "debug":
logger.Debugf(format, v...)
case "info":
logger.Infof(format, v...)
case "warn":
logger.Warnf(format, v...)
case "error":
logger.Errorf(format, v...)
default:
logger.Printf(format, v...)
}
}

const logHostMetrics = `%s, scrape host %s metrics error: %s`

// scrapeHostMetrics 采集宿主机指标
func (vm *VMMetrics) scrapeHostMetrics(ctx context.Context, logger *log.Logger) {
func (vm *VMMetrics) scrapeHostMetrics(ctx context.Context, logger *log.Logger) (err error) {
// outputLogger(logger, "scrape vm %s datastore", vm.host)
defer func() {
if err := recover(); err != nil {
outputLogger(logger, logHostMetrics, "panic", vm.host, err)
if e := recover(); e != nil {
err = fmt.Errorf("scrapeHostMetrics panic: %s", e)
}
}()
c, err := newClient(ctx, vm.host, vm.username, vm.password)
if err != nil {
logger.Printf(logHostMetrics, "error", vm.host, err)
return
// outputLogger(logger, "warn", logHostMetrics, vm.host, err)
return fmt.Errorf("scrapeHostMetrics create vm client error: %w", err)
}
defer c.Logout(ctx)
m := view.NewManager(c.Client)
v, err := m.CreateContainerView(ctx, c.ServiceContent.RootFolder, []string{"HostSystem"}, true)
if err != nil {
logger.Printf(logHostMetrics, "error", vm.host, err)
return
// outputLogger(logger, "warn", logHostMetrics, vm.host, err)
return fmt.Errorf("scrapeHostMetrics createContainerView error: %w", err)
}
defer v.Destroy(ctx)
var hss []mo.HostSystem
err = v.Retrieve(ctx, []string{"HostSystem"}, []string{"summary"}, &hss)
if err != nil {
logger.Printf(logHostMetrics, "error", vm.host, err)
return
// outputLogger(logger, "warn", logHostMetrics, vm.host, err)
return fmt.Errorf("scrapeHostMetrics retrieve error: %w", err)
}
for _, hs := range hss {
vm.prometheusHostPowerState.WithLabelValues(vm.host).Set(powerState(hs.Summary.Runtime.PowerState))
Expand All @@ -326,19 +337,19 @@ func (vm *VMMetrics) scrapeHostMetrics(ctx context.Context, logger *log.Logger)
finder := find.NewFinder(c.Client)
hs, err := finder.DefaultHostSystem(ctx)
if err != nil {
logger.Printf(logHostMetrics, "error", vm.host, err)
return
// outputLogger(logger, "warn", logHostMetrics, vm.host, err)
return fmt.Errorf("scrapeHostMetrics find.DefaultHostSystem error: %w", err)
}
ss, err := hs.ConfigManager().StorageSystem(ctx)
if err != nil {
logger.Printf(logHostMetrics, "error", vm.host, err)
return
// outputLogger(logger, "warn", logHostMetrics, vm.host, err)
return fmt.Errorf("scrapeHostMetrics read storage error: %w", err)
}
var hostss mo.HostStorageSystem
err = ss.Properties(ctx, ss.Reference(), nil, &hostss)
if err != nil {
logger.Printf(logHostMetrics, "error", vm.host, err)
return
// outputLogger(logger, "warn", logHostMetrics, vm.host, err)
return fmt.Errorf("scrapeHostMetrics read storage error: %w", err)
}
for _, e := range hostss.StorageDeviceInfo.ScsiLun {
lun := e.GetScsiLun()
Expand All @@ -351,72 +362,70 @@ func (vm *VMMetrics) scrapeHostMetrics(ctx context.Context, logger *log.Logger)
}
vm.prometheusDiskOk.WithLabelValues(vm.host, lun.DeviceName).Set(ok)
}
return nil
}

const logDsMetrics = `%s, scrape datastore %s metrics error: %s`

// scrapeDsMetrics 采集datastore数据
func (vm *VMMetrics) scrapeDsMetrics(ctx context.Context, logger *log.Logger) {
func (vm *VMMetrics) scrapeDsMetrics(ctx context.Context, logger *log.Logger) (err error) {
// outputLogger(logger, "scrape vm %s datastore", vm.host)
defer func() {
if err := recover(); err != nil {
outputLogger(logger, logDsMetrics, "panic", vm.host, err)
if e := recover(); e != nil {
err = fmt.Errorf("scrapeDsMetrics panic: %s", e)
}
}()
c, err := newClient(ctx, vm.host, vm.username, vm.password)
if err != nil {
logger.Printf(logDsMetrics, "error", vm.host, err)
return
// outputLogger(logger, "warn", logDsMetrics, vm.host, err)
return fmt.Errorf("scrapeDsMetrics create client error: %w", err)
}
defer c.Logout(ctx)
m := view.NewManager(c.Client)
v, err := m.CreateContainerView(ctx, c.ServiceContent.RootFolder, []string{"Datastore"}, true)
if err != nil {
logger.Printf(logDsMetrics, "error", vm.host, err)
return
// outputLogger(logger, "warn", logDsMetrics, vm.host, err)
return fmt.Errorf("scrapeDsMetrics createContainerView error: %w", err)
}
defer v.Destroy(ctx)
var dss []mo.Datastore
err = v.Retrieve(ctx, []string{"Datastore"}, []string{"summary"}, &dss)
if err != nil {
logger.Printf(logDsMetrics, "error", vm.host, err)
return
// outputLogger(logger, "warn", logDsMetrics, vm.host, err)
return fmt.Errorf("scrapeDsMetrics retrieve error: %w", err)
}
for _, ds := range dss {
dsname := ds.Summary.Name
vm.prometheusTotalDs.WithLabelValues(dsname, vm.host).Set(float64(ds.Summary.Capacity))
vm.prometheusUsageDs.WithLabelValues(dsname, vm.host).Set(float64(ds.Summary.FreeSpace))
}
return nil
}

const logGuestMetrics = `%s, scrape vm %s metrics error: %s`

// scrapeGuestMetrics 采集虚拟机数据
func (vm *VMMetrics) scrapeGuestMetrics(ctx context.Context, logger *log.Logger) {
func (vm *VMMetrics) scrapeGuestMetrics(ctx context.Context, logger *log.Logger) (err error) {
// outputLogger(logger, "scrape vm %s guest", vm.host)
defer func() {
if err := recover(); err != nil {
outputLogger(logger, logGuestMetrics, "panic", vm.host, err)
if e := recover(); e != nil {
err = fmt.Errorf("scrapeGuestMetric spanic: %s", e)
}
}()
c, err := newClient(ctx, vm.host, vm.username, vm.password)
if err != nil {
logger.Printf(logGuestMetrics, "error", vm.host, err)
return
// outputLogger(logger, "warn", logGuestMetrics, vm.host, err)
return fmt.Errorf("scrapeGuestMetric create client error: %w", err)
}
defer c.Logout(ctx)
m := view.NewManager(c.Client)
v, err := m.CreateContainerView(ctx, c.ServiceContent.RootFolder, []string{"VirtualMachine"}, true)
if err != nil {
logger.Printf(logGuestMetrics, "error", vm.host, err)
return
// outputLogger(logger, "warn", logGuestMetrics, vm.host, err)
return fmt.Errorf("scrapeGuestMetric createContainerView error: %w", err)
}
defer v.Destroy(ctx)
var vms []mo.VirtualMachine
err = v.Retrieve(ctx, []string{"VirtualMachine"}, []string{"summary"}, &vms)
if err != nil {
logger.Printf(logGuestMetrics, "error", vm.host, err)
return
// outputLogger(logger, "warn", logGuestMetrics, vm.host, err)
return fmt.Errorf("scrapeGuestMetric retrieve error: %w", err)
}
for _, v := range vms {
vmname := v.Summary.Config.Name
Expand All @@ -427,29 +436,50 @@ func (vm *VMMetrics) scrapeGuestMetrics(ctx context.Context, logger *log.Logger)
vm.prometheusVMMemAval.WithLabelValues(vmname, vm.host).Set(float64(v.Summary.Config.MemorySizeMB))
vm.prometheusVMMemUsage.WithLabelValues(vmname, vm.host).Set(float64(v.Summary.QuickStats.GuestMemoryUsage) * 1024 * 1024)
}
return nil
}

// Scrape 采集虚拟机数据
func (vm *VMMetrics) Scrape(ctx context.Context, logger *log.Logger) {
func (vm *VMMetrics) Scrape(ctx context.Context, logger *log.Logger) error {
ct := time.Now()
// 设置最后一次请求时间
vm.last = ct
errs := make(chan error, 3)
var wg sync.WaitGroup
wg.Add(3)
go func() {
vm.scrapeHostMetrics(ctx, logger)
errs <- vm.scrapeHostMetrics(ctx, logger)
wg.Done()
}()
go func() {
vm.scrapeDsMetrics(ctx, logger)
errs <- vm.scrapeDsMetrics(ctx, logger)
wg.Done()
}()
go func() {
vm.scrapeGuestMetrics(ctx, logger)
errs <- vm.scrapeGuestMetrics(ctx, logger)
wg.Done()
}()
ss := make([]string, 0, 3)
go func() {
for {
err, ok := <-errs
// 通道关闭
if !ok {
outputLogger(logger, "debug", "get metrics of %s in %s done", vm.host, time.Since(ct))
break
}
if err == nil {
continue
}
ss = append(ss, err.Error())
}
}()
wg.Wait()
logger.Infof("scrape %s %s", vm.host, time.Since(ct))
close(errs)
if len(ss) == 0 {
return nil
}
return errors.New(strings.Join(ss, ","))
}

// Handle 返回数据
Expand Down
7 changes: 6 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,12 @@ func handleMulti(w http.ResponseWriter, r *http.Request) {
// Handle 处理抓取请求
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(d)*time.Second)
defer cancel()
vm.Scrape(ctx, logger)
if err := vm.Scrape(ctx, logger); err != nil {
logger.Errorf("vm.Scrape %s", err)
w.WriteHeader(http.StatusInternalServerError)
fmt.Fprintf(w, "%s", err)
return
}
vm.Handle(w, r)
}

Expand Down

0 comments on commit cb0c41f

Please sign in to comment.