Skip to content

Commit

Permalink
[ingester] prioritize getting teamId/orgId from message header
Browse files Browse the repository at this point in the history
  • Loading branch information
lzf575 committed May 28, 2024
1 parent c98daff commit 8e8ff7e
Show file tree
Hide file tree
Showing 20 changed files with 156 additions and 65 deletions.
12 changes: 10 additions & 2 deletions server/ingester/app_log/decoder/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ type Decoder struct {
debugEnabled bool
config *config.Config
appLogEntrysCache []AppLogEntry
orgId, teamId uint16

counter *Counter
utils.Closable
Expand Down Expand Up @@ -170,6 +171,7 @@ func (d *Decoder) Run() {
continue
}
decoder.Init(recvBytes.Buffer[recvBytes.Begin:recvBytes.End])
d.orgId, d.teamId = uint16(recvBytes.OrgID), uint16(recvBytes.TeamID)
switch d.msgType {
case datatype.MESSAGE_TYPE_APPLICATION_LOG:
d.handleAppLog(recvBytes.VtapID, decoder)
Expand Down Expand Up @@ -248,7 +250,10 @@ func (d *Decoder) WriteAgentLog(agentId uint16, bs []byte) error {
s.AttributeNames = append(s.AttributeNames, "module")
s.AttributeValues = append(s.AttributeValues, string(columns[4]))

s.OrgId, s.TeamID = d.platformData.QueryVtapOrgAndTeamID(agentId)
s.OrgId, s.TeamID = d.orgId, d.teamId
if s.OrgId == ckdb.INVALID_ORG_ID || s.TeamID == ckdb.INVALID_TEAM_ID {
s.OrgId, s.TeamID = d.platformData.QueryVtapOrgAndTeamID(agentId)
}
d.logWriter.Write(s)
return nil
}
Expand Down Expand Up @@ -280,7 +285,10 @@ func (d *Decoder) WriteAppLog(agentId uint16, l *AppLogEntry) error {
case dbwriter.LOG_TYPE_AUDIT:
s.OrgId, s.TeamID = uint16(l.OrgID), ckdb.INVALID_TEAM_ID
default:
s.OrgId, s.TeamID = d.platformData.QueryVtapOrgAndTeamID(agentId)
s.OrgId, s.TeamID = d.orgId, d.teamId
if s.OrgId == ckdb.INVALID_ORG_ID || s.TeamID == ckdb.INVALID_TEAM_ID {
s.OrgId, s.TeamID = d.platformData.QueryVtapOrgAndTeamID(agentId)
}
}

s.L3EpcID = d.platformData.QueryVtapEpc0(agentId)
Expand Down
11 changes: 10 additions & 1 deletion server/ingester/event/decoder/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/deepflowio/deepflow/server/ingester/event/dbwriter"
"github.com/deepflowio/deepflow/server/ingester/exporters"
exporterscommon "github.com/deepflowio/deepflow/server/ingester/exporters/common"
"github.com/deepflowio/deepflow/server/libs/ckdb"
"github.com/deepflowio/deepflow/server/libs/codec"
"github.com/deepflowio/deepflow/server/libs/eventapi"
flow_metrics "github.com/deepflowio/deepflow/server/libs/flow-metrics"
Expand Down Expand Up @@ -67,6 +68,8 @@ type Decoder struct {
debugEnabled bool
config *config.Config

orgId, teamId uint16

counter *Counter
utils.Closable
}
Expand Down Expand Up @@ -143,6 +146,7 @@ func (d *Decoder) Run() {
continue
}
decoder.Init(recvBytes.Buffer[recvBytes.Begin:recvBytes.End])
d.orgId, d.teamId = uint16(recvBytes.OrgID), uint16(recvBytes.TeamID)
d.handlePerfEvent(recvBytes.VtapID, decoder)
receiver.ReleaseRecvBuffer(recvBytes)
case common.ALARM_EVENT:
Expand All @@ -161,6 +165,7 @@ func (d *Decoder) Run() {
continue
}
decoder.Init(recvBytes.Buffer[recvBytes.Begin:recvBytes.End])
d.orgId, d.teamId = uint16(recvBytes.OrgID), uint16(recvBytes.TeamID)
d.handleK8sEvent(recvBytes.VtapID, decoder)
receiver.ReleaseRecvBuffer(recvBytes)
}
Expand Down Expand Up @@ -195,7 +200,10 @@ func (d *Decoder) WritePerfEvent(vtapId uint16, e *pb.ProcEvent) {
s.Duration = uint64(s.EndTime - s.StartTime)
}
s.VTAPID = vtapId
s.OrgId, s.TeamID = d.platformData.QueryVtapOrgAndTeamID(vtapId)
s.OrgId, s.TeamID = d.orgId, d.teamId
if s.OrgId == ckdb.INVALID_ORG_ID || s.TeamID == ckdb.INVALID_TEAM_ID {
s.OrgId, s.TeamID = d.platformData.QueryVtapOrgAndTeamID(vtapId)
}
s.L3EpcID = d.platformData.QueryVtapEpc0(vtapId)

var info *grpc.Info
Expand Down Expand Up @@ -403,6 +411,7 @@ func (d *Decoder) handleResourceEvent(event *eventapi.ResourceEvent) {
} else if ingestercommon.IsPodServiceIP(flow_metrics.DeviceType(s.L3DeviceType), s.PodID, 0) {
s.ServiceID = d.platformData.QueryService(s.PodID, s.PodNodeID, uint32(s.PodClusterID), s.PodGroupID, s.L3EpcID, !s.IsIPv4, s.IP4, s.IP6, 0, 0)
}

s.AutoServiceID, s.AutoServiceType =
ingestercommon.GetAutoService(
s.ServiceID,
Expand Down
6 changes: 6 additions & 0 deletions server/ingester/event/decoder/k8s_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
pb "github.com/deepflowio/deepflow/message/k8s_event"
ingestercommon "github.com/deepflowio/deepflow/server/ingester/common"
"github.com/deepflowio/deepflow/server/ingester/event/dbwriter"
"github.com/deepflowio/deepflow/server/libs/ckdb"
"github.com/deepflowio/deepflow/server/libs/codec"
flow_metrics "github.com/deepflowio/deepflow/server/libs/flow-metrics"
"github.com/deepflowio/deepflow/server/libs/grpc"
Expand Down Expand Up @@ -113,6 +114,11 @@ func (d *Decoder) WriteK8sEvent(vtapId uint16, e *pb.KubernetesEvent) {
s.AutoInstanceID, s.AutoInstanceType = ingestercommon.GetAutoInstance(s.PodID, s.GProcessID, s.PodNodeID, s.L3DeviceID, uint8(s.L3DeviceType), s.L3EpcID)
s.AutoServiceID, s.AutoServiceType = ingestercommon.GetAutoService(s.ServiceID, s.PodGroupID, s.GProcessID, uint32(s.PodClusterID), s.L3DeviceID, uint8(s.L3DeviceType), podGroupType, s.L3EpcID)

s.OrgId, s.TeamID = d.orgId, d.teamId
if s.OrgId == ckdb.INVALID_ORG_ID || s.TeamID == ckdb.INVALID_TEAM_ID {
s.OrgId, s.TeamID = d.platformData.QueryVtapOrgAndTeamID(vtapId)
}

d.eventWriter.Write(s)
}

Expand Down
15 changes: 12 additions & 3 deletions server/ingester/ext_metrics/decoder/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ type Decoder struct {
vtapIDToUniversalTag map[uint16]*flow_metrics.UniversalTag
platformDataVersion uint64

orgId, teamId uint16

counter *Counter
utils.Closable
}
Expand Down Expand Up @@ -123,6 +125,7 @@ func (d *Decoder) Run() {
continue
}
decoder.Init(recvBytes.Buffer[recvBytes.Begin:recvBytes.End])
d.orgId, d.teamId = uint16(recvBytes.OrgID), uint16(recvBytes.TeamID)
if d.msgType == datatype.MESSAGE_TYPE_TELEGRAF {
d.handleTelegraf(recvBytes.VtapID, decoder)
} else if d.msgType == datatype.MESSAGE_TYPE_DFSTATS || d.msgType == datatype.MESSAGE_TYPE_SERVER_DFSTATS {
Expand Down Expand Up @@ -217,8 +220,11 @@ func (d *Decoder) StatsToExtMetrics(vtapID uint16, s *pb.Stats) *dbwriter.ExtMet
// data from deepflow_server (vtapID == 0): use the default org and team IDs
if vtapID == 0 {
m.OrgId, m.TeamID = ckdb.DEFAULT_ORG_ID, ckdb.DEFAULT_TEAM_ID
} else { // from deepflow_agent, OrgId get from vtapID
m.OrgId, m.TeamID = grpc.QueryVtapOrgAndTeamID(vtapID)
} else { // data from deepflow_agent: take OrgId/TeamID from the message header first, then fall back to a lookup by vtapID
m.OrgId, m.TeamID = d.orgId, d.teamId
if m.OrgId == ckdb.INVALID_ORG_ID || m.TeamID == ckdb.INVALID_TEAM_ID {
m.OrgId, m.TeamID = grpc.QueryVtapOrgAndTeamID(vtapID)
}
}
}
return m
Expand Down Expand Up @@ -335,7 +341,10 @@ func (d *Decoder) PointToExtMetrics(vtapID uint16, point models.Point) (*dbwrite
m.MsgType = datatype.MESSAGE_TYPE_TELEGRAF
tableName := string(point.Name())
m.VTableName = VTABLE_PREFIX_TELEGRAF + tableName
m.OrgId, m.TeamID = d.platformData.QueryVtapOrgAndTeamID(vtapID)
m.OrgId, m.TeamID = d.orgId, d.teamId
if m.OrgId == ckdb.INVALID_ORG_ID || m.TeamID == ckdb.INVALID_TEAM_ID {
m.OrgId, m.TeamID = d.platformData.QueryVtapOrgAndTeamID(vtapID)
}
podName := ""
for _, tag := range point.Tags() {
tagName := string(tag.Key)
Expand Down
30 changes: 17 additions & 13 deletions server/ingester/flow_log/decoder/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ type Decoder struct {
cfg *config.Config
debugEnabled bool

agentId, orgId, teamId uint16

fieldsBuf []interface{}
fieldValuesBuf []interface{}
counter *Counter
Expand Down Expand Up @@ -155,18 +157,20 @@ func (d *Decoder) Run() {
log.Warning("get decode queue data type wrong")
continue
}

decoder.Init(recvBytes.Buffer[recvBytes.Begin:recvBytes.End])
d.agentId, d.orgId, d.teamId = recvBytes.VtapID, uint16(recvBytes.OrgID), uint16(recvBytes.TeamID)
switch d.msgType {
case datatype.MESSAGE_TYPE_PROTOCOLLOG:
d.handleProtoLog(decoder)
case datatype.MESSAGE_TYPE_TAGGEDFLOW:
d.handleTaggedFlow(decoder, pbTaggedFlow)
case datatype.MESSAGE_TYPE_OPENTELEMETRY:
d.handleOpenTelemetry(recvBytes.VtapID, decoder, pbTracesData, false)
d.handleOpenTelemetry(decoder, pbTracesData, false)
case datatype.MESSAGE_TYPE_OPENTELEMETRY_COMPRESSED:
d.handleOpenTelemetry(recvBytes.VtapID, decoder, pbTracesData, true)
d.handleOpenTelemetry(decoder, pbTracesData, true)
case datatype.MESSAGE_TYPE_PACKETSEQUENCE:
d.handleL4Packet(recvBytes.VtapID, decoder)
d.handleL4Packet(decoder)
default:
log.Warningf("unknown msg type: %d", d.msgType)

Expand Down Expand Up @@ -220,7 +224,7 @@ func decompressOpenTelemetry(compressed []byte) ([]byte, error) {
return ioutil.ReadAll(reader)
}

func (d *Decoder) handleOpenTelemetry(vtapID uint16, decoder *codec.SimpleDecoder, pbTracesData *v1.TracesData, compressed bool) {
func (d *Decoder) handleOpenTelemetry(decoder *codec.SimpleDecoder, pbTracesData *v1.TracesData, compressed bool) {
var err error
for !decoder.IsEnd() {
pbTracesData.Reset()
Expand All @@ -240,16 +244,16 @@ func (d *Decoder) handleOpenTelemetry(vtapID uint16, decoder *codec.SimpleDecode
d.counter.ErrorCount++
return
}
d.sendOpenMetetry(vtapID, pbTracesData)
d.sendOpenMetetry(pbTracesData)
}
}

func (d *Decoder) sendOpenMetetry(vtapID uint16, tracesData *v1.TracesData) {
func (d *Decoder) sendOpenMetetry(tracesData *v1.TracesData) {
if d.debugEnabled {
log.Debugf("decoder %d vtap %d recv otel: %s", d.index, vtapID, tracesData)
log.Debugf("decoder %d vtap %d recv otel: %s", d.index, d.agentId, tracesData)
}
d.counter.Count++
ls := log_data.OTelTracesDataToL7FlowLogs(vtapID, tracesData, d.platformData, d.cfg)
ls := log_data.OTelTracesDataToL7FlowLogs(d.agentId, d.orgId, d.teamId, tracesData, d.platformData, d.cfg)
for _, l := range ls {
l.AddReferenceCount()
if !d.throttler.SendWithThrottling(l) {
Expand All @@ -263,9 +267,9 @@ func (d *Decoder) sendOpenMetetry(vtapID uint16, tracesData *v1.TracesData) {
}
}

func (d *Decoder) handleL4Packet(vtapID uint16, decoder *codec.SimpleDecoder) {
func (d *Decoder) handleL4Packet(decoder *codec.SimpleDecoder) {
for !decoder.IsEnd() {
l4Packet, err := log_data.DecodePacketSequence(decoder, vtapID)
l4Packet, err := log_data.DecodePacketSequence(d.agentId, d.orgId, d.teamId, decoder)
if decoder.Failed() || err != nil {
if d.counter.ErrorCount == 0 {
log.Errorf("packet sequence decode failed, offset=%d len=%d, err: %s", decoder.Offset(), len(decoder.Bytes()), err)
Expand All @@ -276,7 +280,7 @@ func (d *Decoder) handleL4Packet(vtapID uint16, decoder *codec.SimpleDecoder) {
}

if d.debugEnabled {
log.Debugf("decoder %d vtap %d recv l4 packet: %s", d.index, vtapID, l4Packet)
log.Debugf("decoder %d vtap %d recv l4 packet: %s", d.index, d.agentId, l4Packet)
}
d.counter.Count++
d.throttler.SendWithoutThrottling(l4Packet)
Expand All @@ -288,7 +292,7 @@ func (d *Decoder) sendFlow(flow *pb.TaggedFlow) {
log.Debugf("decoder %d recv flow: %s", d.index, flow)
}
d.counter.Count++
l := log_data.TaggedFlowToL4FlowLog(flow, d.platformData)
l := log_data.TaggedFlowToL4FlowLog(d.orgId, d.teamId, flow, d.platformData)

if l.HitPcapPolicy() {
d.export(l)
Expand All @@ -315,7 +319,7 @@ func (d *Decoder) sendProto(proto *pb.AppProtoLogsData) {
log.Debugf("decoder %d recv proto: %s", d.index, proto)
}

l := log_data.ProtoLogToL7FlowLog(proto, d.platformData, d.cfg)
l := log_data.ProtoLogToL7FlowLog(d.orgId, d.teamId, proto, d.platformData, d.cfg)
l.AddReferenceCount()
sent := d.throttler.SendWithThrottling(l)
if sent {
Expand Down
8 changes: 5 additions & 3 deletions server/ingester/flow_log/log_data/l4_flow_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -858,8 +858,9 @@ func (k *KnowledgeGraph) fill(
k.AutoInstanceID1, k.AutoInstanceType1 = common.GetAutoInstance(k.PodID1, gpID1, k.PodNodeID1, k.L3DeviceID1, k.L3DeviceType1, k.L3EpcID1)
k.AutoServiceID1, k.AutoServiceType1 = common.GetAutoService(k.ServiceID1, k.PodGroupID1, gpID1, uint32(k.PodClusterID1), k.L3DeviceID1, k.L3DeviceType1, k.PodGroupType1, k.L3EpcID1)

k.OrgId, k.TeamID = platformData.QueryVtapOrgAndTeamID(vtapId)

if k.OrgId == ckdb.INVALID_ORG_ID || k.TeamID == ckdb.INVALID_TEAM_ID {
k.OrgId, k.TeamID = platformData.QueryVtapOrgAndTeamID(vtapId)
}
}

func (k *KnowledgeGraph) FillL4(f *pb.Flow, isIPv6 bool, platformData *grpc.PlatformInfoTable) {
Expand Down Expand Up @@ -1070,10 +1071,11 @@ func genID(time uint32, counter *uint32, analyzerID uint32) uint64 {
return uint64(time)<<32 | uint64(analyzerID&0x3ff)<<22 | (uint64(count) & 0x3fffff)
}

func TaggedFlowToL4FlowLog(f *pb.TaggedFlow, platformData *grpc.PlatformInfoTable) *L4FlowLog {
func TaggedFlowToL4FlowLog(orgId, teamId uint16, f *pb.TaggedFlow, platformData *grpc.PlatformInfoTable) *L4FlowLog {
isIPV6 := f.Flow.EthType == uint32(layers.EthernetTypeIPv6)

s := AcquireL4FlowLog()
s.OrgId, s.TeamID = orgId, teamId
s._id = genID(uint32(f.Flow.EndTime/uint64(time.Second)), &L4FlowCounter, platformData.QueryAnalyzerID())
s.DataLinkLayer.Fill(f.Flow)
s.NetworkLayer.Fill(f.Flow, isIPV6)
Expand Down
8 changes: 6 additions & 2 deletions server/ingester/flow_log/log_data/l4_packet.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func ReleaseL4Packet(l *L4Packet) {
poolL4Packet.Put(l)
}

func DecodePacketSequence(decoder *codec.SimpleDecoder, vtapID uint16) (*L4Packet, error) {
func DecodePacketSequence(vtapID, orgId, teamId uint16, decoder *codec.SimpleDecoder) (*L4Packet, error) {
l4Packet := AcquireL4Packet()
l4Packet.VtapID = vtapID
blockSize := decoder.ReadU32()
Expand All @@ -111,7 +111,11 @@ func DecodePacketSequence(decoder *codec.SimpleDecoder, vtapID uint16) (*L4Packe
l4Packet.StartTime = l4Packet.EndTime - 5*US_TO_S_DEVISOR
l4Packet.PacketCount = uint32(endTimePacketCount >> 56)
l4Packet.PacketBatch = append(l4Packet.PacketBatch, decoder.ReadBytesN(int(blockSize)-BLOCK_HEAD_SIZE)...)
l4Packet.OrgId, l4Packet.TeamID = grpc.QueryVtapOrgAndTeamID(vtapID)

l4Packet.OrgId, l4Packet.TeamID = orgId, teamId
if orgId == ckdb.INVALID_ORG_ID || teamId == ckdb.INVALID_TEAM_ID {
l4Packet.OrgId, l4Packet.TeamID = grpc.QueryVtapOrgAndTeamID(vtapID)
}

return l4Packet, nil
}
3 changes: 2 additions & 1 deletion server/ingester/flow_log/log_data/l7_flow_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -652,8 +652,9 @@ func ReleaseL7FlowLog(l *L7FlowLog) {

var L7FlowLogCounter uint32

func ProtoLogToL7FlowLog(l *pb.AppProtoLogsData, platformData *grpc.PlatformInfoTable, cfg *flowlogCfg.Config) *L7FlowLog {
func ProtoLogToL7FlowLog(orgId, teamId uint16, l *pb.AppProtoLogsData, platformData *grpc.PlatformInfoTable, cfg *flowlogCfg.Config) *L7FlowLog {
h := AcquireL7FlowLog()
h.OrgId, h.TeamID = orgId, teamId
h._id = genID(uint32(l.Base.EndTime/uint64(time.Second)), &L7FlowLogCounter, platformData.QueryAnalyzerID())
h.Fill(l, platformData, cfg)
return h
Expand Down
8 changes: 4 additions & 4 deletions server/ingester/flow_log/log_data/otel_import.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import (
v1 "go.opentelemetry.io/proto/otlp/trace/v1"
)

func OTelTracesDataToL7FlowLogs(vtapID uint16, l *v1.TracesData, platformData *grpc.PlatformInfoTable, cfg *flowlogCfg.Config) []*L7FlowLog {
func OTelTracesDataToL7FlowLogs(vtapID, orgId, teamId uint16, l *v1.TracesData, platformData *grpc.PlatformInfoTable, cfg *flowlogCfg.Config) []*L7FlowLog {
ret := []*L7FlowLog{}
for _, resourceSpan := range l.GetResourceSpans() {
var resAttributes []*v11.KeyValue
Expand All @@ -46,17 +46,17 @@ func OTelTracesDataToL7FlowLogs(vtapID uint16, l *v1.TracesData, platformData *g
}
for _, scopeSpan := range resourceSpan.GetScopeSpans() {
for _, span := range scopeSpan.GetSpans() {
ret = append(ret, spanToL7FlowLog(vtapID, span, resAttributes, platformData, cfg))
ret = append(ret, spanToL7FlowLog(vtapID, orgId, teamId, span, resAttributes, platformData, cfg))
}
}
}
return ret
}

func spanToL7FlowLog(vtapID uint16, span *v1.Span, resAttributes []*v11.KeyValue, platformData *grpc.PlatformInfoTable, cfg *flowlogCfg.Config) *L7FlowLog {
func spanToL7FlowLog(vtapID, orgId, teamId uint16, span *v1.Span, resAttributes []*v11.KeyValue, platformData *grpc.PlatformInfoTable, cfg *flowlogCfg.Config) *L7FlowLog {
h := AcquireL7FlowLog()
h._id = genID(uint32(span.EndTimeUnixNano/uint64(time.Second)), &L7FlowLogCounter, platformData.QueryAnalyzerID())
h.VtapID = vtapID
h.VtapID, h.OrgId, h.TeamID = vtapID, orgId, teamId
h.FillOTel(span, resAttributes, platformData, cfg)
return h
}
Expand Down
4 changes: 3 additions & 1 deletion server/ingester/flow_metrics/unmarshaller/handle_document.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,9 @@ func DocumentExpand(doc app.Document, platformData *grpc.PlatformInfoTable) erro
t := doc.Tags()
t.SetID("") // the Tag is about to be modified (fields added/removed), so clear the ID to avoid stale fields

t.OrgId, t.TeamID = platformData.QueryVtapOrgAndTeamID(t.VTAPID)
if t.OrgId == ckdb.INVALID_ORG_ID || t.TeamID == ckdb.INVALID_TEAM_ID {
t.OrgId, t.TeamID = platformData.QueryVtapOrgAndTeamID(t.VTAPID)
}
// vtap_acl minute-level data does not need to be filled
if doc.Meter().ID() == flow_metrics.ACL_ID &&
t.DatabaseSuffixID() == 1 { // only the acl suffix is present
Expand Down
2 changes: 2 additions & 0 deletions server/ingester/flow_metrics/unmarshaller/unmarshaller.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,8 @@ func (u *Unmarshaller) QueueProcess() {
log.Warningf("Decode failed, bytes len=%d err=%s", len([]byte(bytes)), err)
break
}
doc.Tags().TeamID = uint16(recvBytes.TeamID)
doc.Tags().OrgId = uint16(recvBytes.OrgID)
u.isGoodDocument(int64(doc.Time()))

// whether second-level data should be written
Expand Down
Loading

0 comments on commit 8e8ff7e

Please sign in to comment.