Skip to content

Commit

Permalink
[ingester] split off-cpu profile by splitting granularity
Browse files Browse the repository at this point in the history
  • Loading branch information
lzf575 committed May 27, 2024
1 parent c5ffc42 commit 5858c30
Show file tree
Hide file tree
Showing 7 changed files with 97 additions and 44 deletions.
31 changes: 17 additions & 14 deletions server/ingester/profile/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,22 +28,24 @@ import (
var log = logging.MustGetLogger("profile.config")

type Config struct {
Base *config.Config
CKWriterConfig config.CKWriterConfig `yaml:"profile-ck-writer"`
ProfileTTL int `yaml:"profile-ttl-hour"`
DecoderQueueCount int `yaml:"profile-decoder-queue-count"`
DecoderQueueSize int `yaml:"profile-decoder-queue-size"`
CompressionAlgorithm *string `yaml:"profile-compression-algorithm"`
Base *config.Config
CKWriterConfig config.CKWriterConfig `yaml:"profile-ck-writer"`
ProfileTTL int `yaml:"profile-ttl-hour"`
DecoderQueueCount int `yaml:"profile-decoder-queue-count"`
DecoderQueueSize int `yaml:"profile-decoder-queue-size"`
CompressionAlgorithm *string `yaml:"profile-compression-algorithm"`
OffCpuSplittingGranularity int `yaml:"profile-off-cpu-splitting-granularity"`
}

type ProfileConfig struct {
Profile Config `yaml:"ingester"`
}

const (
DefaultProfileTTL = 72 // hour
DefaultDecoderQueueCount = 2
DefaultDecoderQueueSize = 1 << 14
DefaultProfileTTL = 72 // hour
DefaultDecoderQueueCount = 2
DefaultDecoderQueueSize = 1 << 14
DefaultOffCpuSplittingGranularity = 1
)

func (c *Config) Validate() error {
Expand Down Expand Up @@ -72,11 +74,12 @@ func (c *Config) Validate() error {
func Load(base *config.Config, path string) *Config {
config := &ProfileConfig{
Profile: Config{
Base: base,
CKWriterConfig: config.CKWriterConfig{QueueCount: 1, QueueSize: 100000, BatchSize: 51200, FlushTimeout: 5},
ProfileTTL: DefaultProfileTTL,
DecoderQueueCount: DefaultDecoderQueueCount,
DecoderQueueSize: DefaultDecoderQueueSize,
Base: base,
CKWriterConfig: config.CKWriterConfig{QueueCount: 1, QueueSize: 100000, BatchSize: 51200, FlushTimeout: 5},
ProfileTTL: DefaultProfileTTL,
DecoderQueueCount: DefaultDecoderQueueCount,
DecoderQueueSize: DefaultDecoderQueueSize,
OffCpuSplittingGranularity: DefaultOffCpuSplittingGranularity,
},
}
if _, err := os.Stat(path); os.IsNotExist(err) {
Expand Down
18 changes: 14 additions & 4 deletions server/ingester/profile/dbwriter/profile.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,14 +131,14 @@ func ProfileColumns() []*ckdb.Column {
ckdb.NewColumn("ip6", ckdb.IPv6).SetComment("IPV6地址"),
ckdb.NewColumn("is_ipv4", ckdb.UInt8).SetComment("是否为IPv4地址").SetIndex(ckdb.IndexMinmax),

ckdb.NewColumn("app_service", ckdb.String).SetComment("应用名称, 用户配置上报"),
ckdb.NewColumn("app_service", ckdb.LowCardinalityString).SetComment("应用名称, 用户配置上报"),
ckdb.NewColumn("profile_location_str", ckdb.String).SetComment("单次 profile 堆栈"),
ckdb.NewColumn("profile_value", ckdb.Int64).SetComment("profile self value"),
ckdb.NewColumn("profile_value_unit", ckdb.String).SetComment("profile value 的单位"),
ckdb.NewColumn("profile_event_type", ckdb.String).SetComment("剖析类型"),
ckdb.NewColumn("profile_value_unit", ckdb.LowCardinalityString).SetComment("profile value 的单位"),
ckdb.NewColumn("profile_event_type", ckdb.LowCardinalityString).SetComment("剖析类型"),
ckdb.NewColumn("profile_create_timestamp", ckdb.DateTime64us).SetIndex(ckdb.IndexSet).SetComment("client 端聚合时间"),
ckdb.NewColumn("profile_in_timestamp", ckdb.DateTime64us).SetComment("DeepFlow 的写入时间,同批上报的批次数据具备相同的值"),
ckdb.NewColumn("profile_language_type", ckdb.String).SetComment("语言类型"),
ckdb.NewColumn("profile_language_type", ckdb.LowCardinalityString).SetComment("语言类型"),
ckdb.NewColumn("profile_id", ckdb.String).SetComment("含义等同 l7_flow_log 的 span_id"),
ckdb.NewColumn("trace_id", ckdb.String).SetComment("含义等同 l7_flow_log 的 trace_id"),
ckdb.NewColumn("span_name", ckdb.String).SetComment("含义等同 l7_flow_log 的 endpoint"),
Expand Down Expand Up @@ -271,6 +271,16 @@ func ReleaseInProcess(p *InProcessProfile) {
poolInProcess.Put(p)
}

func (p *InProcessProfile) Clone() *InProcessProfile {
c := AcquireInProcess()
*c = *p
c.TagNames = make([]string, len(p.TagNames))
copy(p.TagNames, p.TagNames)
c.TagValues = make([]string, len(p.TagValues))
copy(p.TagValues, p.TagValues)
return c
}

func (p *InProcessProfile) FillProfile(input *storage.PutInput,
platformData *grpc.PlatformInfoTable,
vtapID uint16,
Expand Down
8 changes: 4 additions & 4 deletions server/ingester/profile/dbwriter/profile_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,13 @@ func (p *ProfileWriter) GetCounter() interface{} {
return counter
}

func (p *ProfileWriter) Write(m interface{}) {
inProcess := m.(*InProcessProfile)
func (p *ProfileWriter) Write(m []interface{}) {
inProcess := m[0].(*InProcessProfile)
inProcess.GenerateFlowTags(p.flowTagWriter.Cache)
p.flowTagWriter.WriteFieldsAndFieldValuesInCache()

atomic.AddInt64(&p.counter.ProfilesCount, 1)
p.ckWriter.Put(m)
atomic.AddInt64(&p.counter.ProfilesCount, int64(len(m)))
p.ckWriter.Put(m...)
}

func NewProfileWriter(msgType datatype.MessageType, decoderIndex int, config *config.Config) (*ProfileWriter, error) {
Expand Down
37 changes: 21 additions & 16 deletions server/ingester/profile/decoder/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,22 +92,26 @@ type Decoder struct {
profileWriter *dbwriter.ProfileWriter
compressionAlgo string

offCpuSplittingGranularity int

counter *Counter
utils.Closable
}

func NewDecoder(index int, msgType datatype.MessageType, compressionAlgo string,
offCpuSplittingGranularity int,
platformData *grpc.PlatformInfoTable,
inQueue queue.QueueReader,
profileWriter *dbwriter.ProfileWriter) *Decoder {
return &Decoder{
index: index,
msgType: msgType,
platformData: platformData,
inQueue: inQueue,
profileWriter: profileWriter,
compressionAlgo: compressionAlgo,
counter: &Counter{},
index: index,
msgType: msgType,
platformData: platformData,
inQueue: inQueue,
profileWriter: profileWriter,
compressionAlgo: compressionAlgo,
offCpuSplittingGranularity: offCpuSplittingGranularity,
counter: &Counter{},
}
}

Expand Down Expand Up @@ -159,15 +163,16 @@ func (d *Decoder) handleProfileData(vtapID uint16, decoder *codec.SimpleDecoder)
}

parser := &Parser{
vtapID: vtapID,
inTimestamp: time.Now(),
callBack: d.profileWriter.Write,
platformData: d.platformData,
IP: make([]byte, len(profile.Ip)),
podID: profile.PodId,
compressionAlgo: d.compressionAlgo,
observer: &observer{},
Counter: d.counter,
vtapID: vtapID,
inTimestamp: time.Now(),
callBack: d.profileWriter.Write,
platformData: d.platformData,
IP: make([]byte, len(profile.Ip)),
podID: profile.PodId,
compressionAlgo: d.compressionAlgo,
observer: &observer{},
offCpuSplittingGranularity: d.offCpuSplittingGranularity,
Counter: d.counter,
}
copy(parser.IP, profile.Ip[:len(profile.Ip)])

Expand Down
43 changes: 37 additions & 6 deletions server/ingester/profile/decoder/decoder_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"github.com/deepflowio/deepflow/server/ingester/profile/common"
"github.com/deepflowio/deepflow/server/ingester/profile/dbwriter"
"github.com/deepflowio/deepflow/server/libs/flow-metrics/pb"
"github.com/deepflowio/deepflow/server/libs/grpc"
"github.com/deepflowio/deepflow/server/libs/utils"
"github.com/klauspost/compress/zstd"
Expand All @@ -39,7 +40,9 @@ type Parser struct {
podID uint32

// profileWriter.Write
callBack func(interface{})
callBack func([]interface{})
writeItemCache []interface{}
offCpuSplittingGranularity int

platformData *grpc.PlatformInfoTable
inTimestamp time.Time
Expand Down Expand Up @@ -75,7 +78,7 @@ func (p *Parser) Evaluate(i *storage.PutInput) (storage.SampleObserver, bool) {
return p.observer, true
}

func (p *Parser) stackToInProcess(input *storage.PutInput, stack []string, value uint64) *dbwriter.InProcessProfile {
func (p *Parser) stackToInProcess(input *storage.PutInput, stack []string, value uint64) []interface{} { // []*dbwriter.InProcessProfile {
labels := input.Key.Labels()
tagNames := make([]string, 0, len(labels))
tagValues := make([]string, 0, len(labels))
Expand Down Expand Up @@ -105,9 +108,9 @@ func (p *Parser) stackToInProcess(input *storage.PutInput, stack []string, value
location := compress(onelineStack, p.compressionAlgo)
atomic.AddInt64(&p.Counter.CompressedSize, int64(len(location)))

profileValue := value
profileValueUs := int64(value)
if p.processTracer != nil {
profileValue = uint64(p.value)
profileValueUs = int64(p.value)
pid = p.processTracer.pid
stime = p.processTracer.stime
eventType = p.processTracer.eventType
Expand All @@ -122,15 +125,43 @@ func (p *Parser) stackToInProcess(input *storage.PutInput, stack []string, value
eventType,
location,
p.compressionAlgo,
int64(profileValue),
profileValueUs,
p.inTimestamp,
spyMap[input.SpyName],
pid,
stime,
tagNames,
tagValues)

return ret
granularityUs := int64(p.offCpuSplittingGranularity) * int64(time.Second/time.Microsecond)
if ret.ProfileEventType == pb.ProfileEventType_EbpfOffCpu.String() &&
p.offCpuSplittingGranularity > 0 &&
profileValueUs > granularityUs {

splitCount := profileValueUs / granularityUs
if profileValueUs%granularityUs > 0 {
splitCount++
}
for i := int64(0); i < splitCount-1; i++ {
splitItem := ret.Clone()
splitItem.Time = splitItem.Time + uint32(i)*uint32(p.offCpuSplittingGranularity)
splitItem.ProfileCreateTimestamp = splitItem.ProfileCreateTimestamp + i*granularityUs
splitItem.ProfileValue = granularityUs
p.writeItemCache = append(p.writeItemCache, splitItem)
}
// test
p.writeItemCache = append(p.writeItemCache, ret)
ret = ret.Clone()
// add ret for last split item
ret.Time = ret.Time + uint32(splitCount-1)*uint32(p.offCpuSplittingGranularity)
ret.ProfileCreateTimestamp = ret.ProfileCreateTimestamp + (splitCount-1)*granularityUs
ret.ProfileValue = profileValueUs - (splitCount-1)*granularityUs
p.writeItemCache = append(p.writeItemCache, ret)
} else {
p.writeItemCache = append(p.writeItemCache, ret)
}

return p.writeItemCache
}

type observer struct {
Expand Down
1 change: 1 addition & 0 deletions server/ingester/profile/profile/profile.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ func NewProfiler(msgType datatype.MessageType, config *config.Config, platformDa
i,
msgType,
*config.CompressionAlgorithm,
config.OffCpuSplittingGranularity,
platformDatas[i],
queue.QueueReader(decodeQueues.FixedMultiQueue[i]),
profileWriter,
Expand Down
3 changes: 3 additions & 0 deletions server/server.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,9 @@ ingester:
## profile compression algorithm, default is zstd, empty string for not compress
#profile-compression-algorithm: "zstd"

## off-cpu pofile splitting granularity, 0 mean disable splitting (unit: second)
# profile-off-cpu-splitting-granularity: 1

## 默认读超时,修改数据保留时长时使用
#ck-read-timeout: 300

Expand Down

0 comments on commit 5858c30

Please sign in to comment.