From c98daff199d704b1242cc688fcb9097970237bbc Mon Sep 17 00:00:00 2001 From: zhuofeng Date: Mon, 27 May 2024 18:09:28 +0800 Subject: [PATCH] [ingester] split off-cpu profile by splitting granularity --- server/ingester/profile/config/config.go | 31 +++++++------ server/ingester/profile/dbwriter/profile.go | 18 ++++++-- .../profile/dbwriter/profile_writer.go | 8 ++-- server/ingester/profile/decoder/decoder.go | 37 +++++++++------- .../profile/decoder/decoder_parser.go | 43 ++++++++++++++++--- server/ingester/profile/profile/profile.go | 1 + server/server.yaml | 3 ++ 7 files changed, 97 insertions(+), 44 deletions(-) diff --git a/server/ingester/profile/config/config.go b/server/ingester/profile/config/config.go index 58d12641671..a790bd82316 100644 --- a/server/ingester/profile/config/config.go +++ b/server/ingester/profile/config/config.go @@ -28,12 +28,13 @@ import ( var log = logging.MustGetLogger("profile.config") type Config struct { - Base *config.Config - CKWriterConfig config.CKWriterConfig `yaml:"profile-ck-writer"` - ProfileTTL int `yaml:"profile-ttl-hour"` - DecoderQueueCount int `yaml:"profile-decoder-queue-count"` - DecoderQueueSize int `yaml:"profile-decoder-queue-size"` - CompressionAlgorithm *string `yaml:"profile-compression-algorithm"` + Base *config.Config + CKWriterConfig config.CKWriterConfig `yaml:"profile-ck-writer"` + ProfileTTL int `yaml:"profile-ttl-hour"` + DecoderQueueCount int `yaml:"profile-decoder-queue-count"` + DecoderQueueSize int `yaml:"profile-decoder-queue-size"` + CompressionAlgorithm *string `yaml:"profile-compression-algorithm"` + OffCpuSplittingGranularity int `yaml:"profile-off-cpu-splitting-granularity"` } type ProfileConfig struct { @@ -41,9 +42,10 @@ type ProfileConfig struct { } const ( - DefaultProfileTTL = 72 // hour - DefaultDecoderQueueCount = 2 - DefaultDecoderQueueSize = 1 << 14 + DefaultProfileTTL = 72 // hour + DefaultDecoderQueueCount = 2 + DefaultDecoderQueueSize = 1 << 14 + DefaultOffCpuSplittingGranularity = 
1 ) func (c *Config) Validate() error { @@ -72,11 +74,12 @@ func (c *Config) Validate() error { func Load(base *config.Config, path string) *Config { config := &ProfileConfig{ Profile: Config{ - Base: base, - CKWriterConfig: config.CKWriterConfig{QueueCount: 1, QueueSize: 100000, BatchSize: 51200, FlushTimeout: 5}, - ProfileTTL: DefaultProfileTTL, - DecoderQueueCount: DefaultDecoderQueueCount, - DecoderQueueSize: DefaultDecoderQueueSize, + Base: base, + CKWriterConfig: config.CKWriterConfig{QueueCount: 1, QueueSize: 100000, BatchSize: 51200, FlushTimeout: 5}, + ProfileTTL: DefaultProfileTTL, + DecoderQueueCount: DefaultDecoderQueueCount, + DecoderQueueSize: DefaultDecoderQueueSize, + OffCpuSplittingGranularity: DefaultOffCpuSplittingGranularity, }, } if _, err := os.Stat(path); os.IsNotExist(err) { diff --git a/server/ingester/profile/dbwriter/profile.go b/server/ingester/profile/dbwriter/profile.go index 0ea759e0f24..b6d59c619e8 100644 --- a/server/ingester/profile/dbwriter/profile.go +++ b/server/ingester/profile/dbwriter/profile.go @@ -131,14 +131,14 @@ func ProfileColumns() []*ckdb.Column { ckdb.NewColumn("ip6", ckdb.IPv6).SetComment("IPV6地址"), ckdb.NewColumn("is_ipv4", ckdb.UInt8).SetComment("是否为IPv4地址").SetIndex(ckdb.IndexMinmax), - ckdb.NewColumn("app_service", ckdb.String).SetComment("应用名称, 用户配置上报"), + ckdb.NewColumn("app_service", ckdb.LowCardinalityString).SetComment("应用名称, 用户配置上报"), ckdb.NewColumn("profile_location_str", ckdb.String).SetComment("单次 profile 堆栈"), ckdb.NewColumn("profile_value", ckdb.Int64).SetComment("profile self value"), - ckdb.NewColumn("profile_value_unit", ckdb.String).SetComment("profile value 的单位"), - ckdb.NewColumn("profile_event_type", ckdb.String).SetComment("剖析类型"), + ckdb.NewColumn("profile_value_unit", ckdb.LowCardinalityString).SetComment("profile value 的单位"), + ckdb.NewColumn("profile_event_type", ckdb.LowCardinalityString).SetComment("剖析类型"), ckdb.NewColumn("profile_create_timestamp", 
ckdb.DateTime64us).SetIndex(ckdb.IndexSet).SetComment("client 端聚合时间"), ckdb.NewColumn("profile_in_timestamp", ckdb.DateTime64us).SetComment("DeepFlow 的写入时间,同批上报的批次数据具备相同的值"), - ckdb.NewColumn("profile_language_type", ckdb.String).SetComment("语言类型"), + ckdb.NewColumn("profile_language_type", ckdb.LowCardinalityString).SetComment("语言类型"), ckdb.NewColumn("profile_id", ckdb.String).SetComment("含义等同 l7_flow_log 的 span_id"), ckdb.NewColumn("trace_id", ckdb.String).SetComment("含义等同 l7_flow_log 的 trace_id"), ckdb.NewColumn("span_name", ckdb.String).SetComment("含义等同 l7_flow_log 的 endpoint"), @@ -271,6 +271,17 @@ func ReleaseInProcess(p *InProcessProfile) { poolInProcess.Put(p) } +// Clone returns a copy of p obtained from AcquireInProcess; TagNames and TagValues are copied into freshly allocated slices so the clone does not share them with p. +func (p *InProcessProfile) Clone() *InProcessProfile { + c := AcquireInProcess() + *c = *p + c.TagNames = make([]string, len(p.TagNames)) + copy(c.TagNames, p.TagNames) + c.TagValues = make([]string, len(p.TagValues)) + copy(c.TagValues, p.TagValues) + return c +} + func (p *InProcessProfile) FillProfile(input *storage.PutInput, platformData *grpc.PlatformInfoTable, vtapID uint16, diff --git a/server/ingester/profile/dbwriter/profile_writer.go b/server/ingester/profile/dbwriter/profile_writer.go index d243fa9baf3..af4cb9cd9fc 100644 --- a/server/ingester/profile/dbwriter/profile_writer.go +++ b/server/ingester/profile/dbwriter/profile_writer.go @@ -73,13 +73,13 @@ func (p *ProfileWriter) GetCounter() interface{} { return counter } -func (p *ProfileWriter) Write(m interface{}) { - inProcess := m.(*InProcessProfile) +func (p *ProfileWriter) Write(m []interface{}) { + inProcess := m[0].(*InProcessProfile) inProcess.GenerateFlowTags(p.flowTagWriter.Cache) p.flowTagWriter.WriteFieldsAndFieldValuesInCache() - atomic.AddInt64(&p.counter.ProfilesCount, 1) - p.ckWriter.Put(m) + atomic.AddInt64(&p.counter.ProfilesCount, int64(len(m))) + p.ckWriter.Put(m...)
} func NewProfileWriter(msgType datatype.MessageType, decoderIndex int, config *config.Config) (*ProfileWriter, error) { diff --git a/server/ingester/profile/decoder/decoder.go b/server/ingester/profile/decoder/decoder.go index 704d151dfa2..e9194f7f711 100644 --- a/server/ingester/profile/decoder/decoder.go +++ b/server/ingester/profile/decoder/decoder.go @@ -92,22 +92,26 @@ type Decoder struct { profileWriter *dbwriter.ProfileWriter compressionAlgo string + offCpuSplittingGranularity int + counter *Counter utils.Closable } func NewDecoder(index int, msgType datatype.MessageType, compressionAlgo string, + offCpuSplittingGranularity int, platformData *grpc.PlatformInfoTable, inQueue queue.QueueReader, profileWriter *dbwriter.ProfileWriter) *Decoder { return &Decoder{ - index: index, - msgType: msgType, - platformData: platformData, - inQueue: inQueue, - profileWriter: profileWriter, - compressionAlgo: compressionAlgo, - counter: &Counter{}, + index: index, + msgType: msgType, + platformData: platformData, + inQueue: inQueue, + profileWriter: profileWriter, + compressionAlgo: compressionAlgo, + offCpuSplittingGranularity: offCpuSplittingGranularity, + counter: &Counter{}, } } @@ -159,15 +163,16 @@ func (d *Decoder) handleProfileData(vtapID uint16, decoder *codec.SimpleDecoder) } parser := &Parser{ - vtapID: vtapID, - inTimestamp: time.Now(), - callBack: d.profileWriter.Write, - platformData: d.platformData, - IP: make([]byte, len(profile.Ip)), - podID: profile.PodId, - compressionAlgo: d.compressionAlgo, - observer: &observer{}, - Counter: d.counter, + vtapID: vtapID, + inTimestamp: time.Now(), + callBack: d.profileWriter.Write, + platformData: d.platformData, + IP: make([]byte, len(profile.Ip)), + podID: profile.PodId, + compressionAlgo: d.compressionAlgo, + observer: &observer{}, + offCpuSplittingGranularity: d.offCpuSplittingGranularity, + Counter: d.counter, } copy(parser.IP, profile.Ip[:len(profile.Ip)]) diff --git 
a/server/ingester/profile/decoder/decoder_parser.go b/server/ingester/profile/decoder/decoder_parser.go index f8b7f3fc1a7..a1f7d297b6d 100644 --- a/server/ingester/profile/decoder/decoder_parser.go +++ b/server/ingester/profile/decoder/decoder_parser.go @@ -26,6 +26,7 @@ import ( "github.com/deepflowio/deepflow/server/ingester/profile/common" "github.com/deepflowio/deepflow/server/ingester/profile/dbwriter" + "github.com/deepflowio/deepflow/server/libs/flow-metrics/pb" "github.com/deepflowio/deepflow/server/libs/grpc" "github.com/deepflowio/deepflow/server/libs/utils" "github.com/klauspost/compress/zstd" @@ -39,7 +40,9 @@ type Parser struct { podID uint32 // profileWriter.Write - callBack func(interface{}) + callBack func([]interface{}) + writeItemCache []interface{} + offCpuSplittingGranularity int platformData *grpc.PlatformInfoTable inTimestamp time.Time @@ -75,7 +78,7 @@ func (p *Parser) Evaluate(i *storage.PutInput) (storage.SampleObserver, bool) { return p.observer, true } -func (p *Parser) stackToInProcess(input *storage.PutInput, stack []string, value uint64) *dbwriter.InProcessProfile { +func (p *Parser) stackToInProcess(input *storage.PutInput, stack []string, value uint64) []interface{} { // []*dbwriter.InProcessProfile { labels := input.Key.Labels() tagNames := make([]string, 0, len(labels)) tagValues := make([]string, 0, len(labels)) @@ -105,9 +108,9 @@ func (p *Parser) stackToInProcess(input *storage.PutInput, stack []string, value location := compress(onelineStack, p.compressionAlgo) atomic.AddInt64(&p.Counter.CompressedSize, int64(len(location))) - profileValue := value + profileValueUs := int64(value) if p.processTracer != nil { - profileValue = uint64(p.value) + profileValueUs = int64(p.value) pid = p.processTracer.pid stime = p.processTracer.stime eventType = p.processTracer.eventType @@ -122,7 +125,7 @@ func (p *Parser) stackToInProcess(input *storage.PutInput, stack []string, value eventType, location, p.compressionAlgo, - int64(profileValue), 
+ profileValueUs, p.inTimestamp, spyMap[input.SpyName], pid, @@ -130,7 +133,32 @@ func (p *Parser) stackToInProcess(input *storage.PutInput, stack []string, value tagNames, tagValues) - return ret + granularityUs := int64(p.offCpuSplittingGranularity) * int64(time.Second/time.Microsecond) + if ret.ProfileEventType == pb.ProfileEventType_EbpfOffCpu.String() && + p.offCpuSplittingGranularity > 0 && + profileValueUs > granularityUs { + + splitCount := profileValueUs / granularityUs + if profileValueUs%granularityUs > 0 { + splitCount++ + } + for i := int64(0); i < splitCount-1; i++ { + splitItem := ret.Clone() + splitItem.Time = splitItem.Time + uint32(i)*uint32(p.offCpuSplittingGranularity) + splitItem.ProfileCreateTimestamp = splitItem.ProfileCreateTimestamp + i*granularityUs + splitItem.ProfileValue = granularityUs + p.writeItemCache = append(p.writeItemCache, splitItem) + } + // reuse ret itself as the last split item; it carries the remaining value + ret.Time = ret.Time + uint32(splitCount-1)*uint32(p.offCpuSplittingGranularity) + ret.ProfileCreateTimestamp = ret.ProfileCreateTimestamp + (splitCount-1)*granularityUs + ret.ProfileValue = profileValueUs - (splitCount-1)*granularityUs + p.writeItemCache = append(p.writeItemCache, ret) + } else { + p.writeItemCache = append(p.writeItemCache, ret) + } + + return p.writeItemCache } type observer struct { diff --git a/server/ingester/profile/profile/profile.go b/server/ingester/profile/profile/profile.go index eb2b20dec98..80c0367d622 100644 --- a/server/ingester/profile/profile/profile.go +++ b/server/ingester/profile/profile/profile.go @@ -82,6 +82,7 @@ func NewProfiler(msgType datatype.MessageType, config *config.Config, platformDa i, msgType, *config.CompressionAlgorithm, + config.OffCpuSplittingGranularity, platformDatas[i], queue.QueueReader(decodeQueues.FixedMultiQueue[i]), profileWriter, diff --git a/server/server.yaml b/server/server.yaml index b75bfc57e18..75d843076df
100644 --- a/server/server.yaml +++ b/server/server.yaml @@ -524,6 +524,9 @@ ingester: ## profile compression algorithm, default is zstd, empty string for not compress #profile-compression-algorithm: "zstd" + ## off-cpu profile splitting granularity, 0 means disable splitting (unit: second) + # profile-off-cpu-splitting-granularity: 1 + ## 默认读超时,修改数据保留时长时使用 #ck-read-timeout: 300