diff --git a/server/ingester/ckissu/ckissu.go b/server/ingester/ckissu/ckissu.go index 369a16f34ff6..01f242742a7a 100644 --- a/server/ingester/ckissu/ckissu.go +++ b/server/ingester/ckissu/ckissu.go @@ -859,9 +859,10 @@ func getColumnRenames(columnRenamess []*ColumnRenames) []*ColumnRename { return renames } -func (i *Issu) renameColumns(connect *sql.DB) ([]*ColumnRename, error) { +func (i *Issu) renameColumns(connect *sql.DB, orgIDPrefix string) ([]*ColumnRename, error) { dones := []*ColumnRename{} for _, renameColumn := range i.columnRenames { + renameColumn.Db = getOrgDatabase(renameColumn.Db, orgIDPrefix) version, err := i.getTableVersion(connect, renameColumn.Db, renameColumn.Table) if err != nil { if strings.Contains(err.Error(), "doesn't exist") { @@ -883,9 +884,10 @@ func (i *Issu) renameColumns(connect *sql.DB) ([]*ColumnRename, error) { return dones, nil } -func (i *Issu) modColumns(connect *sql.DB) ([]*ColumnMod, error) { +func (i *Issu) modColumns(connect *sql.DB, orgIDPrefix string) ([]*ColumnMod, error) { dones := []*ColumnMod{} for _, modColumn := range i.columnMods { + modColumn.Db = getOrgDatabase(modColumn.Db, orgIDPrefix) version, err := i.getTableVersion(connect, modColumn.Db, modColumn.Table) if err != nil { return dones, err @@ -902,9 +904,10 @@ func (i *Issu) modColumns(connect *sql.DB) ([]*ColumnMod, error) { return dones, nil } -func (i *Issu) dropColumns(connect *sql.DB) ([]*ColumnDrop, error) { +func (i *Issu) dropColumns(connect *sql.DB, orgIDPrefix string) ([]*ColumnDrop, error) { dones := []*ColumnDrop{} for _, dropColumn := range i.columnDrops { + dropColumn.Db = getOrgDatabase(dropColumn.Db, orgIDPrefix) version, err := i.getTableVersion(connect, dropColumn.Db, dropColumn.Table) if err != nil { return dones, err @@ -920,8 +923,9 @@ func (i *Issu) dropColumns(connect *sql.DB) ([]*ColumnDrop, error) { return dones, nil } -func (i *Issu) modTableTTLs(connect *sql.DB) error { +func (i *Issu) modTableTTLs(connect *sql.DB, orgIDPrefix string) error { for _, modTTL := range i.modTTLs { + modTTL.Db = getOrgDatabase(modTTL.Db, orgIDPrefix) version, err := i.getTableVersion(connect, modTTL.Db, modTTL.Table) if err != nil { log.Error(err) @@ -1012,9 +1016,10 @@ func getColumnDatasourceAdds(columnDatasourceAddss []*ColumnDatasourceAdds) []*C return adds } -func (i *Issu) addColumns(connect *sql.DB) ([]*ColumnAdd, error) { +func (i *Issu) addColumns(connect *sql.DB, orgIDPrefix string) ([]*ColumnAdd, error) { dones := []*ColumnAdd{} for _, add := range i.columnAdds { + add.Db = getOrgDatabase(add.Db, orgIDPrefix) version, err := i.getTableVersion(connect, add.Db, add.Table) if err != nil { return dones, err @@ -1032,7 +1037,7 @@ func (i *Issu) addColumns(connect *sql.DB) ([]*ColumnAdd, error) { for _, tableName := range []string{ flow_metrics.NETWORK_1M.TableName(), flow_metrics.NETWORK_MAP_1M.TableName(), flow_metrics.APPLICATION_1M.TableName(), flow_metrics.APPLICATION_MAP_1M.TableName()} { - datasourceInfos, err := i.getUserDefinedDatasourceInfos(connect, ckdb.METRICS_DB, strings.Split(tableName, ".")[0]) + datasourceInfos, err := i.getUserDefinedDatasourceInfos(connect, getOrgDatabase(ckdb.METRICS_DB, orgIDPrefix), strings.Split(tableName, ".")[0]) if err != nil { log.Warning(err) continue @@ -1049,9 +1054,10 @@ func (i *Issu) addColumns(connect *sql.DB) ([]*ColumnAdd, error) { return dones, nil } -func (i *Issu) addIndexs(connect *sql.DB) ([]*IndexAdd, error) { +func (i *Issu) addIndexs(connect *sql.DB, orgIDPrefix string) ([]*IndexAdd, error) { dones := 
[]*IndexAdd{} for _, add := range i.indexAdds { + add.Db = getOrgDatabase(add.Db, orgIDPrefix) version, err := i.getTableVersion(connect, add.Db, add.Table) if err != nil { return dones, err @@ -1069,63 +1075,121 @@ func (i *Issu) addIndexs(connect *sql.DB) ([]*IndexAdd, error) { return dones, nil } -func (i *Issu) Start() error { - connects := i.Connections - if len(connects) == 0 { - return fmt.Errorf("connections is nil") +func isNumeric(s string) bool { + for _, ch := range s { + if ch < '0' || ch > '9' { + return false + } } - for _, connect := range connects { - renames, errRenames := i.renameColumns(connect) - if errRenames != nil { - return errRenames + return true +} + +func getOrgDatabase(db string, orgIDPrefix string) string { + if len(db) > 5 && db[4] == '_' && isNumeric(db[:4]) { + return orgIDPrefix + db[5:] + } + return orgIDPrefix + db +} + +func (i *Issu) startOrg(connect *sql.DB, orgIDPrefix string) error { + renames, errRenames := i.renameColumns(connect, orgIDPrefix) + if errRenames != nil { + return errRenames + } + mods, errMods := i.modColumns(connect, orgIDPrefix) + if errMods != nil { + return errMods + } + + adds, errAdds := i.addColumns(connect, orgIDPrefix) + if errAdds != nil { + return errAdds + } + + addIndexs, errAddIndexs := i.addIndexs(connect, orgIDPrefix) + if errAddIndexs != nil { + log.Warning(errAddIndexs) + } + + drops, errDrops := i.dropColumns(connect, orgIDPrefix) + if errDrops != nil { + return errDrops + } + + for _, cr := range renames { + if err := i.setTableVersion(connect, cr.Db, cr.Table); err != nil { + return err } - mods, errMods := i.modColumns(connect) - if errMods != nil { - return errMods + } + for _, cr := range mods { + if err := i.setTableVersion(connect, cr.Db, cr.Table); err != nil { + return err } - - adds, errAdds := i.addColumns(connect) - if errAdds != nil { - return errAdds + } + for _, cr := range adds { + if err := i.setTableVersion(connect, cr.Db, cr.Table); err != nil { + return err } - - addIndexs, errAddIndexs := i.addIndexs(connect) - if errAddIndexs != nil { - log.Warning(errAddIndexs) + } + for _, cr := range addIndexs { + if err := i.setTableVersion(connect, cr.Db, cr.Table); err != nil { + return err } - - drops, errDrops := i.dropColumns(connect) - if errDrops != nil { - return errDrops + } + for _, cr := range drops { + if err := i.setTableVersion(connect, cr.Db, cr.Table); err != nil { + return err } + } + go i.modTableTTLs(connect, orgIDPrefix) + return nil +} - for _, cr := range renames { - if err := i.setTableVersion(connect, cr.Db, cr.Table); err != nil { - return err - } - } - for _, cr := range mods { - if err := i.setTableVersion(connect, cr.Db, cr.Table); err != nil { - return err - } +func (i *Issu) getOrgIDPrefixs(connect *sql.DB) ([]string, error) { + checkOrgDatabase := "flow_metrics" + sql := fmt.Sprintf("SELECT name FROM system.databases WHERE name like '%%%s%%'", checkOrgDatabase) + rows, err := connect.Query(sql) + if err != nil { + return nil, err + } + log.Info("get orgs sql: ", sql) + var db string + orgIDPrefixs := []string{} + for rows.Next() { + err := rows.Scan(&db) + if err != nil { + return nil, err } - for _, cr := range adds { - if err := i.setTableVersion(connect, cr.Db, cr.Table); err != nil { - return err - } + + if len(db) > 5 && db[4] == '_' && isNumeric(db[:4]) { + orgIDPrefixs = append(orgIDPrefixs, db[:5]) + } else if db == checkOrgDatabase { + orgIDPrefixs = append(orgIDPrefixs, "") } - for _, cr := range addIndexs { - if err := i.setTableVersion(connect, cr.Db, cr.Table); 
err != nil { - return err - } + } + return orgIDPrefixs, nil + +} + +func (i *Issu) Start() error { + connects := i.Connections + if len(connects) == 0 { + return fmt.Errorf("connections is nil") + } + + for _, connect := range connects { + orgIDPrefixs, err := i.getOrgIDPrefixs(connect) + if err != nil { + return fmt.Errorf("get orgIDs failed, err: %s", err) } - for _, cr := range drops { - if err := i.setTableVersion(connect, cr.Db, cr.Table); err != nil { - return err + for _, orgIDPrefix := range orgIDPrefixs { + err := i.startOrg(connect, orgIDPrefix) + if err != nil { + log.Errorf("orgIDPrefix %s run issu failed, err: %s", orgIDPrefix, err) } } - go i.modTableTTLs(connect) } + return nil } diff --git a/server/ingester/ckissu/updates.go b/server/ingester/ckissu/updates.go index 11f77964c7bc..c3b1e30c388f 100644 --- a/server/ingester/ckissu/updates.go +++ b/server/ingester/ckissu/updates.go @@ -130,6 +130,65 @@ var ColumnAdd65 = []*ColumnAdds{ ColumnNames: []string{"_id"}, ColumnType: ckdb.UInt64, }, + + { + Dbs: []string{"flow_metrics"}, + Tables: []string{"application.1m", "application.1m_local", "application_map.1m", "application_map.1m_local", + "application.1s", "application.1s_local", "application_map.1s", "application_map.1s_local", "traffic_policy.1m", "traffic_policy.1m_local"}, + ColumnNames: []string{"team_id"}, + ColumnType: ckdb.UInt16, + }, + { + Dbs: []string{"flow_log"}, + Tables: []string{"l4_flow_log", "l4_flow_log_local", "l7_flow_log_local", "l7_flow_log", "l4_packet_local", "l4_packet", "l7_packet_local", "l7_packet"}, + ColumnNames: []string{"team_id"}, + ColumnType: ckdb.UInt16, + }, + { + Dbs: []string{"event"}, + Tables: []string{"event_local", "event", "alarm_event_local", "alarm_event", "perf_event", "perf_event_local"}, + ColumnNames: []string{"team_id"}, + ColumnType: ckdb.UInt16, + }, + { + Dbs: []string{"ext_metrics"}, + Tables: []string{"metrics_local", "metrics"}, + ColumnNames: []string{"team_id"}, + ColumnType: ckdb.UInt16, + }, + { + Dbs: []string{"profile"}, + Tables: []string{"in_process_local", "in_process"}, + ColumnNames: []string{"team_id"}, + ColumnType: ckdb.UInt16, + }, + { + Dbs: []string{"prometheus"}, + Tables: []string{"samples_local", "samples"}, + ColumnNames: []string{"team_id"}, + ColumnType: ckdb.UInt16, + }, + { + Dbs: []string{"deepflow_system"}, + Tables: []string{"deepflow_system_local", "deepflow_system"}, + ColumnNames: []string{"team_id"}, + ColumnType: ckdb.UInt16, + }, + { + Dbs: []string{"flow_tag"}, + Tables: []string{ + "deepflow_system_custom_field_local", "deepflow_system_custom_field", + "deepflow_system_custom_field_value_local", "deepflow_system_custom_field_value", + "event_custom_field_local", "event_custom_field_local", + "event_custom_field_value_local", "event_custom_field_value", + "ext_metrics_custom_field_local", "ext_metrics_custom_field", + "ext_metrics_custom_field_value_local", "ext_metrics_custom_field_value", + "flow_log_custom_field_local", "flow_log_custom_field", + "flow_log_custom_field_value_local", "flow_log_custom_field_value", + }, + ColumnNames: []string{"team_id"}, + ColumnType: ckdb.UInt16, + }, } var TableRenames65 = []*TableRename{ diff --git a/server/ingester/common/const.go b/server/ingester/common/const.go index 214a0796fa5d..a05e30ed1059 100644 --- a/server/ingester/common/const.go +++ b/server/ingester/common/const.go @@ -17,5 +17,5 @@ package common const ( - CK_VERSION = "v6.5.1.6" // 用于表示clickhouse的表版本号 + CK_VERSION = "v6.5.3.1" // 用于表示clickhouse的表版本号 ) diff --git 
a/server/ingester/event/dbwriter/alarm_event_writer.go b/server/ingester/event/dbwriter/alarm_event_writer.go index 417893c4685c..1715bf539e32 100644 --- a/server/ingester/event/dbwriter/alarm_event_writer.go +++ b/server/ingester/event/dbwriter/alarm_event_writer.go @@ -70,6 +70,8 @@ type AlarmEventStore struct { PolicyThresholdCritical string PolicyThresholdError string PolicyThresholdWarning string + OrgId uint16 + TeamID uint16 } func AlarmEventColumns() []*ckdb.Column { @@ -102,6 +104,7 @@ func AlarmEventColumns() []*ckdb.Column { ckdb.NewColumn("policy_threshold_critical", ckdb.String), ckdb.NewColumn("policy_threshold_error", ckdb.String), ckdb.NewColumn("policy_threshold_warning", ckdb.String), + ckdb.NewColumn("team_id", ckdb.UInt16), } } @@ -134,6 +137,7 @@ func (e *AlarmEventStore) WriteBlock(block *ckdb.Block) { e.PolicyThresholdCritical, e.PolicyThresholdError, e.PolicyThresholdWarning, + e.TeamID, ) } @@ -141,6 +145,10 @@ func (e *AlarmEventStore) Release() { ReleaseAlarmEventStore(e) } +func (e *AlarmEventStore) OrgID() uint16 { + return e.OrgId +} + func GenAlarmEventCKTable(cluster, storagePolicy string, ttl int, coldStorage *ckdb.ColdStorage) *ckdb.Table { table := common.ALARM_EVENT.TableName() timeKey := "time" @@ -180,7 +188,7 @@ func NewAlarmEventWriter(config *config.Config) (*EventWriter, error) { ckTable := GenAlarmEventCKTable(w.ckdbCluster, w.ckdbStoragePolicy, w.ttl, ckdb.GetColdStorage(w.ckdbColdStorages, EVENT_DB, common.ALARM_EVENT.TableName())) ckwriter, err := ckwriter.NewCKWriter(w.ckdbAddrs, w.ckdbUsername, w.ckdbPassword, - common.ALARM_EVENT.TableName(), config.Base.CKDB.TimeZone, ckTable, w.writerConfig.QueueCount, w.writerConfig.QueueSize, w.writerConfig.BatchSize, w.writerConfig.FlushTimeout) + common.ALARM_EVENT.TableName(), config.Base.CKDB.TimeZone, ckTable, w.writerConfig.QueueCount, w.writerConfig.QueueSize, w.writerConfig.BatchSize, w.writerConfig.FlushTimeout, config.Base.CKDB.Watcher) if err != nil { return nil, err } diff --git a/server/ingester/event/dbwriter/event.go b/server/ingester/event/dbwriter/event.go index 5d1aab99fc69..d129ba61c813 100644 --- a/server/ingester/event/dbwriter/event.go +++ b/server/ingester/event/dbwriter/event.go @@ -76,6 +76,9 @@ type EventStore struct { IP4 uint32 IP6 net.IP + OrgId uint16 //no need to store + TeamID uint16 + AutoInstanceID uint32 AutoInstanceType uint8 AutoServiceID uint32 @@ -126,6 +129,7 @@ func (e *EventStore) WriteBlock(block *ckdb.Block) { block.WriteIPv6(e.IP6) block.Write( + e.TeamID, e.AutoInstanceID, e.AutoInstanceType, e.AutoServiceID, @@ -144,6 +148,10 @@ func (e *EventStore) WriteBlock(block *ckdb.Block) { } } +func (e *EventStore) OrgID() uint16 { + return e.OrgId +} + func (e *EventStore) Table() string { if e.HasMetrics { return common.PERF_EVENT.TableName() @@ -198,6 +206,8 @@ func EventColumns(hasMetrics bool) []*ckdb.Column { ckdb.NewColumn("ip4", ckdb.IPv4), ckdb.NewColumn("ip6", ckdb.IPv6), + ckdb.NewColumn("team_id", ckdb.UInt16).SetComment("团队ID"), + ckdb.NewColumn("auto_instance_id", ckdb.UInt32), ckdb.NewColumn("auto_instance_type", ckdb.UInt8), ckdb.NewColumn("auto_service_id", ckdb.UInt32), @@ -252,6 +262,8 @@ func (e *EventStore) GenerateNewFlowTags(cache *flow_tag.FlowTagCache) { Table: e.Table(), VpcId: e.L3EpcID, PodNsId: e.PodNSID, + OrgId: e.OrgId, + TeamID: e.TeamID, } cache.Fields = cache.Fields[:0] cache.FieldValues = cache.FieldValues[:0] diff --git a/server/ingester/event/dbwriter/event_writer.go b/server/ingester/event/dbwriter/event_writer.go index 
2f895b5d8083..a32870780ce9 100644 --- a/server/ingester/event/dbwriter/event_writer.go +++ b/server/ingester/event/dbwriter/event_writer.go @@ -94,7 +94,7 @@ func NewEventWriter(table string, decoderIndex int, config *config.Config) (*Eve ckTable := GenEventCKTable(w.ckdbCluster, w.ckdbStoragePolicy, table, w.ttl, ckdb.GetColdStorage(w.ckdbColdStorages, EVENT_DB, table)) ckwriter, err := ckwriter.NewCKWriter(w.ckdbAddrs, w.ckdbUsername, w.ckdbPassword, - table, config.Base.CKDB.TimeZone, ckTable, w.writerConfig.QueueCount, w.writerConfig.QueueSize, w.writerConfig.BatchSize, w.writerConfig.FlushTimeout) + table, config.Base.CKDB.TimeZone, ckTable, w.writerConfig.QueueCount, w.writerConfig.QueueSize, w.writerConfig.BatchSize, w.writerConfig.FlushTimeout, config.Base.CKDB.Watcher) if err != nil { return nil, err } diff --git a/server/ingester/event/decoder/decoder.go b/server/ingester/event/decoder/decoder.go index 9180f2394e4a..b41930f862ff 100644 --- a/server/ingester/event/decoder/decoder.go +++ b/server/ingester/event/decoder/decoder.go @@ -184,6 +184,7 @@ func (d *Decoder) WritePerfEvent(vtapId uint16, e *pb.ProcEvent) { s.Duration = uint64(s.EndTime - s.StartTime) } s.VTAPID = vtapId + s.OrgId, s.TeamID = d.platformData.QueryVtapOrgAndTeamID(uint32(vtapId)) s.L3EpcID = d.platformData.QueryVtapEpc0(uint32(vtapId)) var info *grpc.Info diff --git a/server/ingester/ext_metrics/dbwriter/ext_metrics.go b/server/ingester/ext_metrics/dbwriter/ext_metrics.go index a697d70f4b72..a299ebf42512 100644 --- a/server/ingester/ext_metrics/dbwriter/ext_metrics.go +++ b/server/ingester/ext_metrics/dbwriter/ext_metrics.go @@ -20,7 +20,7 @@ import ( "github.com/deepflowio/deepflow/server/ingester/flow_tag" "github.com/deepflowio/deepflow/server/libs/ckdb" "github.com/deepflowio/deepflow/server/libs/datatype" - "github.com/deepflowio/deepflow/server/libs/flow-metrics" + flow_metrics "github.com/deepflowio/deepflow/server/libs/flow-metrics" "github.com/deepflowio/deepflow/server/libs/pool" ) @@ -80,6 +80,10 @@ func (m *ExtMetrics) WriteBlock(block *ckdb.Block) { ) } +func (m *ExtMetrics) OrgID() uint16 { + return m.UniversalTag.VTAPID%10 + 1 +} + // Note: The order of append() must be consistent with the order of Write() in WriteBlock. 
func (m *ExtMetrics) Columns() []*ckdb.Column { columns := []*ckdb.Column{} @@ -146,6 +150,7 @@ func (m *ExtMetrics) GenerateNewFlowTags(cache *flow_tag.FlowTagCache) { Table: tableName, VpcId: m.UniversalTag.L3EpcID, PodNsId: m.UniversalTag.PodNSID, + VtapId: m.UniversalTag.VTAPID, } cache.Fields = cache.Fields[:0] cache.FieldValues = cache.FieldValues[:0] diff --git a/server/ingester/ext_metrics/dbwriter/ext_metrics_writer.go b/server/ingester/ext_metrics/dbwriter/ext_metrics_writer.go index 7a7f0a5678d1..75f7ec80b126 100644 --- a/server/ingester/ext_metrics/dbwriter/ext_metrics_writer.go +++ b/server/ingester/ext_metrics/dbwriter/ext_metrics_writer.go @@ -124,19 +124,12 @@ func (w *ExtMetricsWriter) getOrCreateCkwriter(s *ExtMetrics) (*ckwriter.CKWrite ckwriter, err := ckwriter.NewCKWriter( w.ckdbAddrs, w.ckdbUsername, w.ckdbPassword, fmt.Sprintf("%s-%s-%d", w.msgType, s.TableName(), w.decoderIndex), w.ckdbTimeZone, - table, w.writerConfig.QueueCount, w.writerConfig.QueueSize, w.writerConfig.BatchSize, w.writerConfig.FlushTimeout) + table, w.writerConfig.QueueCount, w.writerConfig.QueueSize, w.writerConfig.BatchSize, w.writerConfig.FlushTimeout, w.ckdbWatcher) if err != nil { return nil, err } - // 需要在cluseter其他节点也创建 - if err := w.createTableOnCluster(table); err != nil { - log.Warningf("crate table on cluster other node failed. %s", err) - } ckwriter.Run() - if w.ttl != config.DefaultExtMetricsTTL { - w.setTTL(s.DatabaseName(), s.TableName()) - } w.tables[s.TableName()] = &tableInfo{ tableName: s.TableName(), @@ -146,26 +139,6 @@ func (w *ExtMetricsWriter) getOrCreateCkwriter(s *ExtMetrics) (*ckwriter.CKWrite return ckwriter, nil } -func (w *ExtMetricsWriter) createTableOnCluster(table *ckdb.Table) error { - // in standalone mode, ckdbWatcher will be nil - if w.ckdbWatcher == nil { - return nil - } - endpoints, err := w.ckdbWatcher.GetClickhouseEndpointsWithoutMyself() - if err != nil { - return err - } - for _, endpoint := range endpoints { - err := ckwriter.InitTable(fmt.Sprintf("%s:%d", endpoint.Host, endpoint.Port), w.ckdbUsername, w.ckdbPassword, w.ckdbTimeZone, table) - if err != nil { - log.Warningf("node %s:%d init table failed. 
err: %s", endpoint.Host, endpoint.Port, err) - } else { - log.Infof("node %s:%d init table %s success", endpoint.Host, endpoint.Port, table.LocalName) - } - } - return nil -} - func (w *ExtMetricsWriter) getClusterNodesWithoutLocal(clusterName string) ([]ClusterNode, error) { sql := fmt.Sprintf("SELECT host_address,port,is_local FROM system.clusters WHERE cluster='%s'", clusterName) log.Info(sql) diff --git a/server/ingester/flow_log/dbwriter/ck_writer.go b/server/ingester/flow_log/dbwriter/ck_writer.go index 19e6ac887b75..982c702ac1c6 100644 --- a/server/ingester/flow_log/dbwriter/ck_writer.go +++ b/server/ingester/flow_log/dbwriter/ck_writer.go @@ -82,14 +82,14 @@ func GetFlowLogTables(engine ckdb.EngineType, cluster, storagePolicy string, l4L } } -func NewFlowLogWriter(addrs []string, user, password, cluster, storagePolicy, timeZone string, ckWriterCfg config.CKWriterConfig, flowLogTtl flowlogconfig.FlowLogTTL, coldStorages map[string]*ckdb.ColdStorage) (*FlowLogWriter, error) { +func NewFlowLogWriter(addrs []string, user, password, cluster, storagePolicy, timeZone string, ckWriterCfg config.CKWriterConfig, flowLogTtl flowlogconfig.FlowLogTTL, coldStorages map[string]*ckdb.ColdStorage, ckdbWatcher *config.Watcher) (*FlowLogWriter, error) { ckwriters := make([]*ckwriter.CKWriter, common.FLOWLOG_ID_MAX) var err error tables := GetFlowLogTables(ckdb.MergeTree, cluster, storagePolicy, flowLogTtl.L4FlowLog, flowLogTtl.L7FlowLog, flowLogTtl.L4Packet, coldStorages) for i, table := range tables { counterName := common.FlowLogID(table.ID).String() ckwriters[i], err = ckwriter.NewCKWriter(addrs, user, password, counterName, timeZone, table, - ckWriterCfg.QueueCount, ckWriterCfg.QueueSize, ckWriterCfg.BatchSize, ckWriterCfg.FlushTimeout) + ckWriterCfg.QueueCount, ckWriterCfg.QueueSize, ckWriterCfg.BatchSize, ckWriterCfg.FlushTimeout, ckdbWatcher) if err != nil { log.Error(err) return nil, err diff --git a/server/ingester/flow_log/decoder/decoder.go b/server/ingester/flow_log/decoder/decoder.go index ceae05ea7512..1be7fa488b02 100644 --- a/server/ingester/flow_log/decoder/decoder.go +++ b/server/ingester/flow_log/decoder/decoder.go @@ -262,7 +262,7 @@ func (d *Decoder) sendOpenMetetry(vtapID uint16, tracesData *v1.TracesData) { func (d *Decoder) handleL4Packet(vtapID uint16, decoder *codec.SimpleDecoder) { for !decoder.IsEnd() { - l4Packet, err := log_data.DecodePacketSequence(decoder, vtapID) + l4Packet, err := log_data.DecodePacketSequence(decoder, vtapID, d.platformData) if decoder.Failed() || err != nil { if d.counter.ErrorCount == 0 { log.Errorf("packet sequence decode failed, offset=%d len=%d, err: %s", decoder.Offset(), len(decoder.Bytes()), err) diff --git a/server/ingester/flow_log/flow_log/flow_log.go b/server/ingester/flow_log/flow_log/flow_log.go index ae6519a0028f..6101e81d13ae 100644 --- a/server/ingester/flow_log/flow_log/flow_log.go +++ b/server/ingester/flow_log/flow_log/flow_log.go @@ -86,7 +86,7 @@ func NewFlowLog(config *config.Config, recv *receiver.Receiver, platformDataMana flowLogWriter, err := dbwriter.NewFlowLogWriter( config.Base.CKDB.ActualAddrs, config.Base.CKDBAuth.Username, config.Base.CKDBAuth.Password, config.Base.CKDB.ClusterName, config.Base.CKDB.StoragePolicy, config.Base.CKDB.TimeZone, - config.CKWriterConfig, config.FlowLogTTL, config.Base.GetCKDBColdStorages()) + config.CKWriterConfig, config.FlowLogTTL, config.Base.GetCKDBColdStorages(), config.Base.CKDB.Watcher) if err != nil { return nil, err } @@ -105,7 +105,7 @@ func NewFlowLog(config *config.Config, recv 
*receiver.Receiver, platformDataMana if err != nil { return nil, err } - l4PacketLogger, err := NewLogger(datatype.MESSAGE_TYPE_PACKETSEQUENCE, config, nil, manager, recv, flowLogWriter, common.L4_PACKET_ID, nil) + l4PacketLogger, err := NewLogger(datatype.MESSAGE_TYPE_PACKETSEQUENCE, config, platformDataManager, manager, recv, flowLogWriter, common.L4_PACKET_ID, nil) if err != nil { return nil, err } diff --git a/server/ingester/flow_log/log_data/l4_flow_log.go b/server/ingester/flow_log/log_data/l4_flow_log.go index 2b82381dad75..c8716200e831 100644 --- a/server/ingester/flow_log/log_data/l4_flow_log.go +++ b/server/ingester/flow_log/log_data/l4_flow_log.go @@ -267,6 +267,9 @@ type KnowledgeGraph struct { TagSource0 uint8 TagSource1 uint8 + + OrgId uint16 // no need to store + TeamID uint16 } var KnowledgeGraphColumns = []*ckdb.Column{ @@ -312,6 +315,8 @@ var KnowledgeGraphColumns = []*ckdb.Column{ ckdb.NewColumn("tag_source_0", ckdb.UInt8), ckdb.NewColumn("tag_source_1", ckdb.UInt8), + + ckdb.NewColumn("team_id", ckdb.UInt16), } func (k *KnowledgeGraph) WriteBlock(block *ckdb.Block) { @@ -357,6 +362,7 @@ func (k *KnowledgeGraph) WriteBlock(block *ckdb.Block) { k.TagSource0, k.TagSource1, + k.TeamID, ) } @@ -839,6 +845,9 @@ func (k *KnowledgeGraph) fill( k.AutoInstanceID1, k.AutoInstanceType1 = common.GetAutoInstance(k.PodID1, gpID1, k.PodNodeID1, k.L3DeviceID1, k.L3DeviceType1, k.L3EpcID1) k.AutoServiceID1, k.AutoServiceType1 = common.GetAutoService(k.ServiceID1, k.PodGroupID1, gpID1, k.PodNodeID1, k.L3DeviceID1, k.L3DeviceType1, k.PodGroupType1, k.L3EpcID1) + + k.OrgId, k.TeamID = platformData.QueryVtapOrgAndTeamID(vtapId) + } func (k *KnowledgeGraph) FillL4(f *pb.Flow, isIPv6 bool, platformData *grpc.PlatformInfoTable) { @@ -1001,6 +1010,10 @@ func (f *L4FlowLog) WriteBlock(block *ckdb.Block) { f.Metrics.WriteBlock(block) } +func (f *L4FlowLog) OrgID() uint16 { + return f.KnowledgeGraph.OrgId +} + func (f *L4FlowLog) EndTime() time.Duration { return time.Duration(f.FlowInfo.EndTime) * time.Microsecond } diff --git a/server/ingester/flow_log/log_data/l4_packet.go b/server/ingester/flow_log/log_data/l4_packet.go index dd3016bbba85..896e955e575d 100644 --- a/server/ingester/flow_log/log_data/l4_packet.go +++ b/server/ingester/flow_log/log_data/l4_packet.go @@ -21,6 +21,7 @@ import ( "github.com/deepflowio/deepflow/server/libs/ckdb" "github.com/deepflowio/deepflow/server/libs/codec" + "github.com/deepflowio/deepflow/server/libs/grpc" "github.com/deepflowio/deepflow/server/libs/pool" "github.com/deepflowio/deepflow/server/libs/utils" ) @@ -32,6 +33,8 @@ type L4Packet struct { EndTime int64 FlowID uint64 VtapID uint16 + orgID uint16 + TeamID uint16 PacketCount uint32 PacketBatch []byte } @@ -43,6 +46,7 @@ func L4PacketColumns() []*ckdb.Column { ckdb.NewColumn("end_time", ckdb.DateTime64us).SetComment("精度: 微秒"), ckdb.NewColumn("flow_id", ckdb.UInt64).SetIndex(ckdb.IndexMinmax), ckdb.NewColumn("agent_id", ckdb.UInt16).SetIndex(ckdb.IndexSet), + ckdb.NewColumn("team_id", ckdb.UInt16).SetIndex(ckdb.IndexSet), ckdb.NewColumn("packet_count", ckdb.UInt32).SetIndex(ckdb.IndexNone), ckdb.NewColumn("packet_batch", ckdb.String).SetIndex(ckdb.IndexNone), } @@ -54,10 +58,15 @@ func (s *L4Packet) WriteBlock(block *ckdb.Block) { block.Write(s.EndTime) block.Write(s.FlowID) block.Write(s.VtapID) + block.Write(s.TeamID) block.Write(s.PacketCount) block.Write(utils.String(s.PacketBatch)) } +func (s *L4Packet) OrgID() uint16 { + return s.orgID +} + func (p *L4Packet) Release() { ReleaseL4Packet(p) } @@ -85,7 
+94,7 @@ func ReleaseL4Packet(l *L4Packet) { poolL4Packet.Put(l) } -func DecodePacketSequence(decoder *codec.SimpleDecoder, vtapID uint16) (*L4Packet, error) { +func DecodePacketSequence(decoder *codec.SimpleDecoder, vtapID uint16, platformData *grpc.PlatformInfoTable) (*L4Packet, error) { l4Packet := AcquireL4Packet() l4Packet.VtapID = vtapID blockSize := decoder.ReadU32() @@ -99,6 +108,7 @@ func DecodePacketSequence(decoder *codec.SimpleDecoder, vtapID uint16) (*L4Packe l4Packet.StartTime = l4Packet.EndTime - 5*US_TO_S_DEVISOR l4Packet.PacketCount = uint32(endTimePacketCount >> 56) l4Packet.PacketBatch = append(l4Packet.PacketBatch, decoder.ReadBytesN(int(blockSize)-BLOCK_HEAD_SIZE)...) + l4Packet.orgID, l4Packet.TeamID = platformData.QueryVtapOrgAndTeamID(uint32(vtapID)) return l4Packet, nil } diff --git a/server/ingester/flow_log/log_data/l7_flow_log.go b/server/ingester/flow_log/log_data/l7_flow_log.go index 915900c78eca..9d9bed8ece09 100644 --- a/server/ingester/flow_log/log_data/l7_flow_log.go +++ b/server/ingester/flow_log/log_data/l7_flow_log.go @@ -338,6 +338,10 @@ func (h *L7FlowLog) WriteBlock(block *ckdb.Block) { ) } +func (h *L7FlowLog) OrgID() uint16 { + return h.KnowledgeGraph.OrgId +} + func base64ToHexString(str string) string { if len(str) < 2 || str[len(str)-1] != '=' { return str @@ -671,6 +675,8 @@ func (h *L7FlowLog) GenerateNewFlowTags(cache *flow_tag.FlowTagCache) { Table: common.L7_FLOW_ID.String(), VpcId: L3EpcIDs[idx], PodNsId: PodNSIDs[idx], + OrgId: h.OrgId, + TeamID: h.TeamID, } for i, name := range attributeNames[:minNamesLen] { diff --git a/server/ingester/flow_metrics/dbwriter/dbwriter.go b/server/ingester/flow_metrics/dbwriter/dbwriter.go index 0b4d5b22976b..ceaaebec2ae9 100644 --- a/server/ingester/flow_metrics/dbwriter/dbwriter.go +++ b/server/ingester/flow_metrics/dbwriter/dbwriter.go @@ -37,7 +37,7 @@ import ( "github.com/deepflowio/deepflow/server/libs/app" "github.com/deepflowio/deepflow/server/libs/ckdb" "github.com/deepflowio/deepflow/server/libs/datatype/prompb" - "github.com/deepflowio/deepflow/server/libs/flow-metrics" + flow_metrics "github.com/deepflowio/deepflow/server/libs/flow-metrics" "github.com/deepflowio/deepflow/server/libs/pool" "github.com/deepflowio/deepflow/server/libs/queue" "github.com/deepflowio/deepflow/server/libs/stats" @@ -62,7 +62,7 @@ type CkDbWriter struct { ckwriters []*ckwriter.CKWriter } -func NewCkDbWriter(addrs []string, user, password, clusterName, storagePolicy, timeZone string, ckWriterCfg config.CKWriterConfig, flowMetricsTtl flowmetricsconfig.FlowMetricsTTL, coldStorages map[string]*ckdb.ColdStorage) (DbWriter, error) { +func NewCkDbWriter(addrs []string, user, password, clusterName, storagePolicy, timeZone string, ckWriterCfg config.CKWriterConfig, flowMetricsTtl flowmetricsconfig.FlowMetricsTTL, coldStorages map[string]*ckdb.ColdStorage, ckdbWatcher *config.Watcher) (DbWriter, error) { ckwriters := []*ckwriter.CKWriter{} tables := flow_metrics.GetMetricsTables(ckdb.MergeTree, common.CK_VERSION, clusterName, storagePolicy, flowMetricsTtl.VtapFlow1M, flowMetricsTtl.VtapFlow1S, flowMetricsTtl.VtapApp1M, flowMetricsTtl.VtapApp1S, coldStorages) for _, table := range tables { @@ -75,7 +75,7 @@ func NewCkDbWriter(addrs []string, user, password, clusterName, storagePolicy, t counterName = "app_1m" } ckwriter, err := ckwriter.NewCKWriter(addrs, user, password, counterName, timeZone, table, - ckWriterCfg.QueueCount, ckWriterCfg.QueueSize, ckWriterCfg.BatchSize, ckWriterCfg.FlushTimeout) + ckWriterCfg.QueueCount, 
ckWriterCfg.QueueSize, ckWriterCfg.BatchSize, ckWriterCfg.FlushTimeout, ckdbWatcher) if err != nil { log.Error(err) return nil, err diff --git a/server/ingester/flow_metrics/flow_metrics/flow_metrics.go b/server/ingester/flow_metrics/flow_metrics/flow_metrics.go index ff6ebc6022a0..5d6cfc40f23d 100644 --- a/server/ingester/flow_metrics/flow_metrics/flow_metrics.go +++ b/server/ingester/flow_metrics/flow_metrics/flow_metrics.go @@ -59,7 +59,7 @@ func NewFlowMetrics(cfg *config.Config, recv *receiver.Receiver, platformDataMan var err error var writers []dbwriter.DbWriter ckWriter, err := dbwriter.NewCkDbWriter(cfg.Base.CKDB.ActualAddrs, cfg.Base.CKDBAuth.Username, cfg.Base.CKDBAuth.Password, cfg.Base.CKDB.ClusterName, cfg.Base.CKDB.StoragePolicy, cfg.Base.CKDB.TimeZone, - cfg.CKWriterConfig, cfg.FlowMetricsTTL, cfg.Base.GetCKDBColdStorages()) + cfg.CKWriterConfig, cfg.FlowMetricsTTL, cfg.Base.GetCKDBColdStorages(), cfg.Base.CKDB.Watcher) if err != nil { log.Error(err) return nil, err diff --git a/server/ingester/flow_metrics/unmarshaller/handle_document.go b/server/ingester/flow_metrics/unmarshaller/handle_document.go index ec59664ea96e..57192178e137 100644 --- a/server/ingester/flow_metrics/unmarshaller/handle_document.go +++ b/server/ingester/flow_metrics/unmarshaller/handle_document.go @@ -23,7 +23,7 @@ import ( "github.com/deepflowio/deepflow/server/ingester/common" "github.com/deepflowio/deepflow/server/libs/app" "github.com/deepflowio/deepflow/server/libs/datatype" - "github.com/deepflowio/deepflow/server/libs/flow-metrics" + flow_metrics "github.com/deepflowio/deepflow/server/libs/flow-metrics" "github.com/deepflowio/deepflow/server/libs/grpc" "github.com/deepflowio/deepflow/server/libs/utils" ) @@ -133,6 +133,7 @@ func DocumentExpand(doc *app.Document, platformData *grpc.PlatformInfoTable) err t.Code |= MainAddCode } + t.OrgId, t.TeamID = platformData.QueryVtapOrgAndTeamID(uint32(t.VTAPID)) podGroupType, podGroupType1 := uint8(0), uint8(0) if info1 != nil { t.RegionID1 = uint16(info1.RegionID) diff --git a/server/ingester/flow_tag/flow_tag.go b/server/ingester/flow_tag/flow_tag.go index faf4b877d1e0..f7a75a2b6c41 100644 --- a/server/ingester/flow_tag/flow_tag.go +++ b/server/ingester/flow_tag/flow_tag.go @@ -68,6 +68,7 @@ type FlowTagInfo struct { Table string // Represents virtual_table_name in ext_metrics FieldName string FieldValue string + VtapId uint16 // IDs only for prometheus TableId uint32 @@ -77,6 +78,9 @@ type FlowTagInfo struct { VpcId int32 // XXX: can use int16 PodNsId uint16 FieldType FieldType + + OrgId uint16 + TeamID uint16 } type FlowTag struct { @@ -100,12 +104,17 @@ func (t *FlowTag) WriteBlock(block *ckdb.Block) { t.FieldType.String(), t.FieldName, fieldValueType, + t.TeamID, ) if t.TagType == TagFieldValue { block.Write(t.FieldValue, uint64(1)) // count is 1 } } +func (t *FlowTag) OrgID() uint16 { + return t.OrgId +} + func (t *FlowTag) Columns() []*ckdb.Column { columns := []*ckdb.Column{} columns = append(columns, @@ -116,6 +125,7 @@ func (t *FlowTag) Columns() []*ckdb.Column { ckdb.NewColumn("field_type", ckdb.LowCardinalityString).SetComment("value: tag, metrics"), ckdb.NewColumn("field_name", ckdb.LowCardinalityString), ckdb.NewColumn("field_value_type", ckdb.LowCardinalityString).SetComment("value: string, float"), + ckdb.NewColumn("team_id", ckdb.UInt16), ) if t.TagType == TagFieldValue { columns = append(columns, diff --git a/server/ingester/flow_tag/flow_tag_writer.go b/server/ingester/flow_tag/flow_tag_writer.go index c6c27115d624..f8ace49ccb1e 100644 
--- a/server/ingester/flow_tag/flow_tag_writer.go +++ b/server/ingester/flow_tag/flow_tag_writer.go @@ -119,7 +119,7 @@ func NewFlowTagWriter( fmt.Sprintf("%s-%s-%d", name, tableName, decoderIndex), config.CKDB.TimeZone, t.GenCKTable(config.CKDB.ClusterName, config.CKDB.StoragePolicy, tableName, ttl, partition), - w.writerConfig.QueueCount, w.writerConfig.QueueSize, w.writerConfig.BatchSize, w.writerConfig.FlushTimeout) + w.writerConfig.QueueCount, w.writerConfig.QueueSize, w.writerConfig.BatchSize, w.writerConfig.FlushTimeout, config.CKDB.Watcher) if err != nil { return nil, err } diff --git a/server/ingester/pcap/dbwriter/pcap.go b/server/ingester/pcap/dbwriter/pcap.go index 7ae7a3c00072..c6399abaa2f3 100644 --- a/server/ingester/pcap/dbwriter/pcap.go +++ b/server/ingester/pcap/dbwriter/pcap.go @@ -65,6 +65,10 @@ func (s *PcapStore) WriteBlock(block *ckdb.Block) { s.AclGids) } +func (s *PcapStore) OrgID() uint16 { + return s.VtapID%10 + 1 +} + func (p *PcapStore) Release() { ReleasePcapStore(p) } diff --git a/server/ingester/pcap/dbwriter/pcap_writer.go b/server/ingester/pcap/dbwriter/pcap_writer.go index 06a4c49aa95a..d80db2ca1909 100644 --- a/server/ingester/pcap/dbwriter/pcap_writer.go +++ b/server/ingester/pcap/dbwriter/pcap_writer.go @@ -63,7 +63,7 @@ func NewPcapWriter(config *config.Config) (*PcapWriter, error) { table := GenPcapCKTable(w.ckdbCluster, w.ckdbStoragePolicy, w.ttl, ckdb.GetColdStorage(w.ckdbColdStorages, PCAP_DB, PCAP_TABLE)) ckwriter, err := ckwriter.NewCKWriter(w.ckdbAddrs, w.ckdbUsername, w.ckdbPassword, - PCAP_TABLE, config.Base.CKDB.TimeZone, table, w.writerConfig.QueueCount, w.writerConfig.QueueSize, w.writerConfig.BatchSize, w.writerConfig.FlushTimeout) + PCAP_TABLE, config.Base.CKDB.TimeZone, table, w.writerConfig.QueueCount, w.writerConfig.QueueSize, w.writerConfig.BatchSize, w.writerConfig.FlushTimeout, config.Base.CKDB.Watcher) if err != nil { return nil, err } diff --git a/server/ingester/pkg/ckwriter/ckwriter.go b/server/ingester/pkg/ckwriter/ckwriter.go index a795938459f5..aed8962d867c 100644 --- a/server/ingester/pkg/ckwriter/ckwriter.go +++ b/server/ingester/pkg/ckwriter/ckwriter.go @@ -26,6 +26,7 @@ import ( "time" "github.com/deepflowio/deepflow/server/ingester/common" + "github.com/deepflowio/deepflow/server/ingester/config" "github.com/deepflowio/deepflow/server/libs/ckdb" "github.com/deepflowio/deepflow/server/libs/queue" "github.com/deepflowio/deepflow/server/libs/stats" @@ -39,20 +40,22 @@ import ( var log = logging.MustGetLogger("ckwriter") const ( - FLUSH_TIMEOUT = 10 * time.Second - SQL_LOG_LENGTH = 256 + FLUSH_TIMEOUT = 10 * time.Second + SQL_LOG_LENGTH = 256 + MAX_ORGANIZATINON_ID = 1024 ) type CKWriter struct { - addrs []string - user string - password string - table *ckdb.Table - queueCount int - queueSize int // 队列长度 - batchSize int // 累积多少行数据,一起写入 - flushTimeout int // 超时写入: 单位秒 - counterName string // 写入成功失败的统计数据表名称,若写入失败,会根据该数据上报告警 + addrs []string + user string + password string + timeZone string + table *ckdb.Table + queueCount int + queueSize int // 队列长度 + batchSize int // 累积多少行数据,一起写入 + flushDuration time.Duration // 超时写入 + counterName string // 写入成功失败的统计数据表名称,若写入失败,会根据该数据上报告警 name string // 数据库名-表名 用作 queue名字和counter名字 prepare string // 写入数据时,先执行prepare @@ -63,6 +66,7 @@ type CKWriter struct { counters []Counter putCounter int writeCounter uint64 + ckdbwatcher *config.Watcher wg sync.WaitGroup exit bool @@ -70,6 +74,7 @@ type CKWriter struct { type CKItem interface { WriteBlock(block *ckdb.Block) + OrgID() uint16 Release() } @@ 
-82,7 +87,35 @@ func ExecSQL(conn clickhouse.Conn, query string) error { return conn.Exec(context.Background(), query) } -func InitTable(addr, user, password, timeZone string, t *ckdb.Table) error { +func initTable(conn clickhouse.Conn, timeZone string, t *ckdb.Table, orgID uint16) error { + if err := ExecSQL(conn, fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s", t.OrgDatabase(orgID))); err != nil { + return err + } + + if err := ExecSQL(conn, t.MakeOrgLocalTableCreateSQL(orgID)); err != nil { + return err + } + if err := ExecSQL(conn, t.MakeOrgGlobalTableCreateSQL(orgID)); err != nil { + return err + } + + for _, c := range t.Columns { + for _, table := range []string{t.GlobalName, t.LocalName} { + modTimeZoneSql := c.MakeModifyTimeZoneSQL(t.OrgDatabase(orgID), table, timeZone) + if modTimeZoneSql == "" { + break + } + + if err := ExecSQL(conn, modTimeZoneSql); err != nil { + log.Warningf("modify time zone failed, error: %s", err) + } + } + } + + return nil +} + +func InitTable(addr, user, password, timeZone string, t *ckdb.Table, orgID uint16) error { conn, err := clickhouse.Open(&clickhouse.Options{ Addr: []string{addr}, Auth: clickhouse.Auth{ @@ -95,35 +128,46 @@ func InitTable(addr, user, password, timeZone string, t *ckdb.Table) error { return err } - if err := ExecSQL(conn, fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s", t.Database)); err != nil { + if err := initTable(conn, timeZone, t, orgID); err != nil { + conn.Close() return err } + conn.Close() - if err := ExecSQL(conn, t.MakeLocalTableCreateSQL()); err != nil { - return err + return nil +} + +func (w *CKWriter) InitTable(orgID uint16) error { + for _, conn := range w.conns { + if err := initTable(conn, w.timeZone, w.table, orgID); err != nil { + return err + } } - if err := ExecSQL(conn, t.MakeGlobalTableCreateSQL()); err != nil { - return err + + // in standalone mode, ckdbWatcher will be nil + if w.ckdbwatcher == nil { + return nil } - for _, c := range t.Columns { - for _, table := range []string{t.GlobalName, t.LocalName} { - modTimeZoneSql := c.MakeModifyTimeZoneSQL(t.Database, table, timeZone) - if modTimeZoneSql == "" { - break - } + endpoints, err := w.ckdbwatcher.GetClickhouseEndpointsWithoutMyself() + if err != nil { + log.Warningf("get clickhouse endpoints without myself failed: %s", err) + return err + } - if err := ExecSQL(conn, modTimeZoneSql); err != nil { - log.Warningf("modify time zone failed, error: %s", err) - } + for _, endpoint := range endpoints { + err := InitTable(fmt.Sprintf("%s:%d", endpoint.Host, endpoint.Port), w.user, w.password, w.timeZone, w.table, orgID) + if err != nil { + log.Warningf("node %s:%d init table failed. 
err: %s", endpoint.Host, endpoint.Port, err) + } else { + log.Infof("node %s:%d init table %s success", endpoint.Host, endpoint.Port, w.table.LocalName) } } - conn.Close() return nil } -func NewCKWriter(addrs []string, user, password, counterName, timeZone string, table *ckdb.Table, queueCount, queueSize, batchSize, flushTimeout int) (*CKWriter, error) { +func NewCKWriter(addrs []string, user, password, counterName, timeZone string, table *ckdb.Table, queueCount, queueSize, batchSize, flushTimeout int, ckdbwatcher *config.Watcher) (*CKWriter, error) { log.Infof("New CK writer: Addrs=%v, user=%s, database=%s, table=%s, queueCount=%d, queueSize=%d, batchSize=%d, flushTimeout=%ds, counterName=%s, timeZone=%s", addrs, user, table.Database, table.LocalName, queueCount, queueSize, batchSize, flushTimeout, counterName, timeZone) @@ -133,9 +177,9 @@ func NewCKWriter(addrs []string, user, password, counterName, timeZone string, t var err error - // clickhouse的初始化创建表 + // clickhouse init default organization database/tables for _, addr := range addrs { - if err = InitTable(addr, user, password, timeZone, table); err != nil { + if err = InitTable(addr, user, password, timeZone, table, 1); err != nil { return nil, err } } @@ -165,23 +209,25 @@ func NewCKWriter(addrs []string, user, password, counterName, timeZone string, t common.QUEUE_STATS_MODULE_INGESTER) return &CKWriter{ - addrs: addrs, - user: user, - password: password, - table: table, - queueCount: queueCount, - queueSize: queueSize, - batchSize: batchSize, - flushTimeout: flushTimeout, - counterName: counterName, - - name: name, - prepare: table.MakePrepareTableInsertSQL(), - conns: conns, - batchs: batchs, - connCount: uint64(len(conns)), - dataQueues: dataQueues, - counters: make([]Counter, queueCount), + addrs: addrs, + user: user, + password: password, + timeZone: timeZone, + table: table, + queueCount: queueCount, + queueSize: queueSize, + batchSize: batchSize, + flushDuration: time.Duration(flushTimeout) * time.Second, + counterName: counterName, + + name: name, + prepare: table.MakePrepareTableInsertSQL(), + conns: conns, + batchs: batchs, + connCount: uint64(len(conns)), + dataQueues: dataQueues, + counters: make([]Counter, queueCount), + ckdbwatcher: ckdbwatcher, }, nil } @@ -196,6 +242,7 @@ type Counter struct { WriteFailedCount int64 `statsd:"write-failed-count"` RetryCount int64 `statsd:"retry-count"` RetryFailedCount int64 `statsd:"retry-failed-count"` + OrgInvalidCount int64 `statsd:"org-invalid-count"` utils.Closable } @@ -219,31 +266,62 @@ func (w *CKWriter) Put(items ...interface{}) { w.dataQueues.Put(queue.HashKey(w.putCounter%w.queueCount), items...) 
} +type Cache struct { + orgID uint16 + prepare string + items []CKItem + lastWriteTime time.Time + tableCreated bool +} + +func (c *Cache) Release() { + for _, item := range c.items { + item.Release() + } + c.items = c.items[:0] +} + func (w *CKWriter) queueProcess(queueID int) { common.RegisterCountableForIngester("ckwriter", &(w.counters[queueID]), stats.OptionStatTags{"thread": strconv.Itoa(queueID), "table": w.name, "name": w.counterName}) defer w.wg.Done() w.wg.Add(1) - var lastWriteTime time.Time - rawItems := make([]interface{}, 1024) - caches := make([]CKItem, 0, w.batchSize) + var cache *Cache + orgCaches := make([]*Cache, MAX_ORGANIZATINON_ID+1) + for i := range orgCaches { + orgCaches[i] = new(Cache) + orgCaches[i].items = make([]CKItem, 0) + orgCaches[i].orgID = uint16(i) + orgCaches[i].prepare = w.table.MakeOrgPrepareTableInsertSQL(uint16(i)) + } + for !w.exit { n := w.dataQueues.Gets(queue.HashKey(queueID), rawItems) for i := 0; i < n; i++ { item := rawItems[i] if ck, ok := item.(CKItem); ok { - caches = append(caches, ck) - if len(caches) >= w.batchSize { - w.Write(queueID, caches) - caches = caches[:0] - lastWriteTime = time.Now() + orgID := ck.OrgID() + if orgID > MAX_ORGANIZATINON_ID || orgID == 0 { + if w.counters[queueID].OrgInvalidCount == 0 { + log.Warningf("get writer queue(%s) data orgID wrong %d", w.name, orgID) + } + w.counters[queueID].OrgInvalidCount++ + continue + } + cache = orgCaches[orgID] + cache.items = append(cache.items, ck) + if len(cache.items) >= w.batchSize { + w.Write(queueID, cache) + cache.lastWriteTime = time.Now() } } else if IsNil(item) { // flush ticker - if time.Since(lastWriteTime) > time.Duration(w.flushTimeout)*time.Second { - w.Write(queueID, caches) - caches = caches[:0] - lastWriteTime = time.Now() + now := time.Now() + for _, cache := range orgCaches { + if len(cache.items) > 0 && now.Sub(cache.lastWriteTime) > w.flushDuration { + w.Write(queueID, cache) + cache.lastWriteTime = now + } } } else { log.Warningf("get writer queue data type wrong %T", ck) @@ -269,46 +347,57 @@ func (w *CKWriter) ResetConnection(connID int) error { return err } -func (w *CKWriter) Write(queueID int, items []CKItem) { +func (w *CKWriter) Write(queueID int, cache *Cache) { connID := int(atomic.AddUint64(&w.writeCounter, 1) % w.connCount) - if err := w.writeItems(queueID, connID, items); err != nil { - // Prevent frequent log writing - logEnabled := w.counters[queueID].WriteFailedCount == 0 + itemsLen := len(cache.items) + // Prevent frequent log writing + logEnabled := w.counters[queueID].WriteFailedCount == 0 + if !cache.tableCreated { + err := w.InitTable(cache.orgID) + if err != nil { + if logEnabled { + log.Warningf("create table(%s.%s) failed, drop(%d) items: %s", w.table.OrgDatabase(cache.orgID), w.table.LocalName, itemsLen, err) + } + w.counters[queueID].WriteFailedCount += int64(itemsLen) + cache.Release() + return + } + cache.tableCreated = true + } + if err := w.writeItems(queueID, connID, cache); err != nil { if logEnabled { - log.Warningf("write table(%s.%s) failed, will retry write(%d) items: %s", w.table.Database, w.table.LocalName, len(items), err) + log.Warningf("write table(%s.%s) failed, will retry write(%d) items: %s", w.table.OrgDatabase(cache.orgID), w.table.LocalName, itemsLen, err) } if err := w.ResetConnection(connID); err != nil { log.Warningf("reconnect clickhouse failed: %s", err) time.Sleep(time.Second * 10) } else { if logEnabled { - log.Infof("reconnect clickhouse success: %s %s", w.table.Database, w.table.LocalName) + 
log.Infof("reconnect clickhouse success: %s %s", w.table.OrgDatabase(cache.orgID), w.table.LocalName) } } w.counters[queueID].RetryCount++ // 写失败重连后重试一次, 规避偶尔写失败问题 - err = w.writeItems(queueID, connID, items) + err = w.writeItems(queueID, connID, cache) if logEnabled { if err != nil { w.counters[queueID].RetryFailedCount++ - log.Warningf("retry write table(%s.%s) failed, drop(%d) items: %s", w.table.Database, w.table.LocalName, len(items), err) + log.Warningf("retry write table(%s.%s) failed, drop(%d) items: %s", w.table.OrgDatabase(cache.orgID), w.table.LocalName, itemsLen, err) } else { - log.Infof("retry write table(%s.%s) success, write(%d) items", w.table.Database, w.table.LocalName, len(items)) + log.Infof("retry write table(%s.%s) success, write(%d) items", w.table.OrgDatabase(cache.orgID), w.table.LocalName, itemsLen) } } if err != nil { - w.counters[queueID].WriteFailedCount += int64(len(items)) + w.counters[queueID].WriteFailedCount += int64(itemsLen) } else { - w.counters[queueID].WriteSuccessCount += int64(len(items)) + w.counters[queueID].WriteSuccessCount += int64(itemsLen) } } else { - w.counters[queueID].WriteSuccessCount += int64(len(items)) + w.counters[queueID].WriteSuccessCount += int64(itemsLen) } - for _, item := range items { - item.Release() - } + cache.Release() } func IsNil(i interface{}) bool { @@ -322,8 +411,8 @@ func IsNil(i interface{}) bool { return false } -func (w *CKWriter) writeItems(queueID, connID int, items []CKItem) error { - if len(items) == 0 { +func (w *CKWriter) writeItems(queueID, connID int, cache *Cache) error { + if len(cache.items) == 0 { return nil } ck := w.conns[connID] @@ -338,13 +427,13 @@ func (w *CKWriter) writeItems(queueID, connID int, items []CKItem) error { batchID := queueID*int(w.connCount) + connID batch := w.batchs[batchID] if IsNil(batch) { - w.batchs[batchID], err = ck.PrepareBatch(context.Background(), w.prepare) + w.batchs[batchID], err = ck.PrepareBatch(context.Background(), cache.prepare) if err != nil { return err } batch = w.batchs[batchID] } else { - batch, err = ck.PrepareReuseBatch(context.Background(), w.prepare, batch) + batch, err = ck.PrepareReuseBatch(context.Background(), cache.prepare, batch) if err != nil { return err } @@ -352,7 +441,7 @@ func (w *CKWriter) writeItems(queueID, connID int, items []CKItem) error { } ckdbBlock := ckdb.NewBlock(batch) - for _, item := range items { + for _, item := range cache.items { item.WriteBlock(ckdbBlock) if err := ckdbBlock.WriteAll(); err != nil { return fmt.Errorf("item write block failed: %s", err) @@ -361,7 +450,7 @@ func (w *CKWriter) writeItems(queueID, connID int, items []CKItem) error { if err = ckdbBlock.Send(); err != nil { return fmt.Errorf("send write block failed: %s", err) } else { - log.Debugf("batch write success, table (%s.%s) commit %d items", w.table.Database, w.table.LocalName, len(items)) + log.Debugf("batch write success, table (%s.%s) commit %d items", w.table.Database, w.table.LocalName, len(cache.items)) } return nil } diff --git a/server/ingester/profile/dbwriter/profile.go b/server/ingester/profile/dbwriter/profile.go index 0a29f5ce9f40..2e37b84f7c61 100644 --- a/server/ingester/profile/dbwriter/profile.go +++ b/server/ingester/profile/dbwriter/profile.go @@ -90,6 +90,9 @@ type InProcessProfile struct { L3DeviceType uint8 L3DeviceID uint32 ServiceID uint32 + + OrgId uint16 + TeamID uint16 } // profile_event_type <-> profile_value_unit relation @@ -161,6 +164,7 @@ func ProfileColumns() []*ckdb.Column { ckdb.NewColumn("l3_device_type", 
ckdb.UInt8).SetComment("资源类型"), ckdb.NewColumn("l3_device_id", ckdb.UInt32).SetComment("资源ID"), ckdb.NewColumn("service_id", ckdb.UInt32).SetComment("服务ID"), + ckdb.NewColumn("team_id", ckdb.UInt16).SetComment("团队ID"), } } @@ -228,9 +232,14 @@ func (p *InProcessProfile) WriteBlock(block *ckdb.Block) { p.L3DeviceType, p.L3DeviceID, p.ServiceID, + p.TeamID, ) } +func (p *InProcessProfile) OrgID() uint16 { + return p.OrgId +} + var poolInProcess = pool.NewLockFreePool(func() interface{} { return new(InProcessProfile) }) @@ -302,6 +311,7 @@ func (p *InProcessProfile) FillProfile(input *storage.PutInput, p.TagValues = tagValues p.fillResource(uint32(vtapID), podID, platformData) + p.OrgId, p.TeamID = platformData.QueryVtapOrgAndTeamID(uint32(vtapID)) } func genID(time uint32, counter *uint32, vtapID uint16) uint64 { @@ -436,6 +446,8 @@ func (p *InProcessProfile) GenerateFlowTags(cache *flow_tag.FlowTagCache) { Table: fmt.Sprintf("%s.%s", p.ProfileLanguageType, p.ProfileEventType), VpcId: p.L3EpcID, PodNsId: p.PodNSID, + OrgId: p.OrgId, + TeamID: p.TeamID, } cache.Fields = cache.Fields[:0] cache.FieldValues = cache.FieldValues[:0] diff --git a/server/ingester/profile/dbwriter/profile_writer.go b/server/ingester/profile/dbwriter/profile_writer.go index b0ed82c10390..d243fa9baf3c 100644 --- a/server/ingester/profile/dbwriter/profile_writer.go +++ b/server/ingester/profile/dbwriter/profile_writer.go @@ -106,7 +106,8 @@ func NewProfileWriter(msgType datatype.MessageType, decoderIndex int, config *co writer.writerConfig.QueueCount, writer.writerConfig.QueueSize, writer.writerConfig.BatchSize, - writer.writerConfig.FlushTimeout) + writer.writerConfig.FlushTimeout, + config.Base.CKDB.Watcher) if err != nil { log.Error(err) return nil, err diff --git a/server/ingester/prometheus/dbwriter/prometheus_sample.go b/server/ingester/prometheus/dbwriter/prometheus_sample.go index 5614c15855c0..a93074b994b7 100644 --- a/server/ingester/prometheus/dbwriter/prometheus_sample.go +++ b/server/ingester/prometheus/dbwriter/prometheus_sample.go @@ -24,7 +24,7 @@ import ( "github.com/deepflowio/deepflow/server/ingester/flow_tag" "github.com/deepflowio/deepflow/server/libs/ckdb" "github.com/deepflowio/deepflow/server/libs/datatype/prompb" - "github.com/deepflowio/deepflow/server/libs/flow-metrics" + flow_metrics "github.com/deepflowio/deepflow/server/libs/flow-metrics" "github.com/deepflowio/deepflow/server/libs/pool" "github.com/prometheus/common/model" ) @@ -38,6 +38,7 @@ type PrometheusSampleInterface interface { DatabaseName() string TableName() string WriteBlock(*ckdb.Block) + OrgID() uint16 Columns(int) []*ckdb.Column AppLabelLen() int GenCKTable(string, string, int, *ckdb.ColdStorage, int) *ckdb.Table @@ -54,8 +55,11 @@ type PrometheusSample struct { type PrometheusSampleMini struct { Timestamp uint32 // s + VtapId uint16 MetricID uint32 TargetID uint32 + OrgId uint16 + TeamID uint16 AppLabelValueIDs []uint32 Value float64 @@ -87,6 +91,7 @@ func (m *PrometheusSampleMini) WriteBlock(block *ckdb.Block) { block.Write( m.MetricID, m.TargetID, + m.TeamID, ) for _, v := range m.AppLabelValueIDs[1:] { block.Write(v) @@ -94,6 +99,10 @@ func (m *PrometheusSampleMini) WriteBlock(block *ckdb.Block) { block.Write(m.Value) } +func (m *PrometheusSampleMini) OrgID() uint16 { + return m.OrgId +} + // Note: The order of append() must be consistent with the order of Write() in WriteBlock. 
func (m *PrometheusSampleMini) Columns(appLabelColumnCount int) []*ckdb.Column {
 	columns := []*ckdb.Column{}
@@ -102,6 +111,7 @@ func (m *PrometheusSampleMini) Columns(appLabelColumnCount int) []*ckdb.Column {
 	columns = append(columns,
 		ckdb.NewColumn("metric_id", ckdb.UInt32).SetComment("encoded ID of the metric name"),
 		ckdb.NewColumn("target_id", ckdb.UInt32).SetComment("the encoded ID of the target"),
+		ckdb.NewColumn("team_id", ckdb.UInt32).SetComment("the team ID"),
 	)
 	for i := 1; i <= appLabelColumnCount; i++ {
 		columns = append(columns, ckdb.NewColumn(fmt.Sprintf("app_label_value_id_%d", i), ckdb.UInt32))
@@ -144,6 +154,8 @@ func (m *PrometheusSampleMini) GenerateNewFlowTags(cache *flow_tag.FlowTagCache,
 		TableId:  m.MetricID,
 		VpcId:    m.VpcId(),
 		PodNsId:  m.PodNsId(),
+		VtapId:   m.VtapId,
+		TeamID:   m.TeamID,
 	}
 	cache.Fields = cache.Fields[:0]
 	cache.FieldValues = cache.FieldValues[:0]
@@ -229,6 +241,10 @@ func (m *PrometheusSample) WriteBlock(block *ckdb.Block) {
 	m.UniversalTag.WriteBlock(block)
 }
 
+func (m *PrometheusSample) OrgID() uint16 {
+	return m.UniversalTag.VTAPID%10 + 1
+}
+
 // Note: The order of append() must be consistent with the order of Write() in WriteBlock.
 func (m *PrometheusSample) Columns(appLabelColumnCount int) []*ckdb.Column {
 	columns := m.PrometheusSampleMini.Columns(appLabelColumnCount)
diff --git a/server/ingester/prometheus/dbwriter/prometheus_writer.go b/server/ingester/prometheus/dbwriter/prometheus_writer.go
index ad416405a625..cdc0715471da 100644
--- a/server/ingester/prometheus/dbwriter/prometheus_writer.go
+++ b/server/ingester/prometheus/dbwriter/prometheus_writer.go
@@ -148,7 +148,7 @@ func (w *PrometheusWriter) getOrCreateCkwriter(s PrometheusSampleInterface) (*ck
 		ckwriter, err := ckwriter.NewCKWriter(
 			w.ckdbAddrs, w.ckdbUsername, w.ckdbPassword,
 			fmt.Sprintf("%s-%s-%d-%d", w.name, s.TableName(), w.decoderIndex, appLabelCount), w.ckdbTimeZone,
-			table, w.writerConfig.QueueCount, w.writerConfig.QueueSize, w.writerConfig.BatchSize, w.writerConfig.FlushTimeout)
+			table, w.writerConfig.QueueCount, w.writerConfig.QueueSize, w.writerConfig.BatchSize, w.writerConfig.FlushTimeout, w.ckdbWatcher)
 		if err != nil {
 			return nil, err
 		}
@@ -171,11 +171,7 @@ func (w *PrometheusWriter) getOrCreateCkwriter(s PrometheusSampleInterface) (*ck
 		if err := w.addAppLabelColumns(w.ckdbConn, startIndex, endIndex); err != nil {
 			return nil, err
 		}
 
-		// 需要在cluseter其他节点也增加列
-		if err := w.createTableOnCluster(table); err != nil {
-			log.Warningf("other node failed when create table: %s", err)
-		}
 		if err := w.addAppLabelColumnsOnCluster(startIndex, endIndex); err != nil {
 			log.Warningf("other node failed when add app_value_id columns which index from %d to %d: %s", startIndex, endIndex, err)
 		}
@@ -188,26 +184,6 @@ func (w *PrometheusWriter) getOrCreateCkwriter(s PrometheusSampleInterface) (*ck
 	return ckwriter, nil
 }
 
-func (w *PrometheusWriter) createTableOnCluster(table *ckdb.Table) error {
-	// in standalone mode, ckdbWatcher will be nil
-	if w.ckdbWatcher == nil {
-		return nil
-	}
-	endpoints, err := w.ckdbWatcher.GetClickhouseEndpointsWithoutMyself()
-	if err != nil {
-		return err
-	}
-	for _, endpoint := range endpoints {
-		err := ckwriter.InitTable(fmt.Sprintf("%s:%d", endpoint.Host, endpoint.Port), w.ckdbUsername, w.ckdbPassword, w.ckdbTimeZone, table)
-		if err != nil {
-			log.Warningf("node %s:%d init table failed. err: %s", endpoint.Host, endpoint.Port, err)
-		} else {
-			log.Infof("node %s:%d init table %s success", endpoint.Host, endpoint.Port, table.LocalName)
-		}
-	}
-	return nil
-}
-
 func (w *PrometheusWriter) addAppLabelColumnsOnCluster(startIndex, endIndex int) error {
 	// in standalone mode, ckdbWatcher will be nil
 	if w.ckdbWatcher == nil {
diff --git a/server/ingester/prometheus/decoder/decoder.go b/server/ingester/prometheus/decoder/decoder.go
index e5a42b9cba3f..c2667c516e08 100644
--- a/server/ingester/prometheus/decoder/decoder.go
+++ b/server/ingester/prometheus/decoder/decoder.go
@@ -33,7 +33,7 @@ import (
 	"github.com/deepflowio/deepflow/server/libs/codec"
 	"github.com/deepflowio/deepflow/server/libs/datatype"
 	"github.com/deepflowio/deepflow/server/libs/datatype/prompb"
-	"github.com/deepflowio/deepflow/server/libs/flow-metrics"
+	flow_metrics "github.com/deepflowio/deepflow/server/libs/flow-metrics"
 	"github.com/deepflowio/deepflow/server/libs/flow-metrics/pb"
 	"github.com/deepflowio/deepflow/server/libs/grpc"
 	"github.com/deepflowio/deepflow/server/libs/queue"
@@ -456,7 +456,9 @@ func (b *PrometheusSamplesBuilder) TimeSeriesToStore(vtapID, epcId, podClusterId
 			m.TargetID = targetID
 			m.AppLabelValueIDs = append(m.AppLabelValueIDs, b.appLabelValueIDsBuffer...)
 			m.Value = v
+			m.VtapId = vtapID
 			b.samplesBuffer = append(b.samplesBuffer, m)
+			m.OrgId, m.TeamID = b.platformData.QueryVtapOrgAndTeamID(uint32(vtapID))
 		} else {
 			m := dbwriter.AcquirePrometheusSample()
 			m.Timestamp = uint32(model.Time(s.Timestamp).Unix())
@@ -464,6 +466,7 @@ func (b *PrometheusSamplesBuilder) TimeSeriesToStore(vtapID, epcId, podClusterId
 			m.TargetID = targetID
 			m.AppLabelValueIDs = append(m.AppLabelValueIDs, b.appLabelValueIDsBuffer...)
 			m.Value = v
+			m.OrgId, m.TeamID = b.platformData.QueryVtapOrgAndTeamID(uint32(vtapID))
 
 			if i == 0 {
 				b.fillUniversalTag(m, vtapID, podName, instance, podNameID, instanceID, false)
diff --git a/server/libs/app/document.go b/server/libs/app/document.go
index 747945bdde45..ae49d607ff46 100644
--- a/server/libs/app/document.go
+++ b/server/libs/app/document.go
@@ -22,7 +22,7 @@ import (
 
 	"github.com/deepflowio/deepflow/server/libs/ckdb"
 	"github.com/deepflowio/deepflow/server/libs/codec"
-	"github.com/deepflowio/deepflow/server/libs/flow-metrics"
+	flow_metrics "github.com/deepflowio/deepflow/server/libs/flow-metrics"
 	"github.com/deepflowio/deepflow/server/libs/flow-metrics/pb"
 	"github.com/deepflowio/deepflow/server/libs/pool"
 )
@@ -164,6 +164,10 @@ func (d *Document) WriteBlock(block *ckdb.Block) {
 	d.Meter.WriteBlock(block)
 }
 
+func (d *Document) OrgID() uint16 {
+	return d.Tagger.(*flow_metrics.Tag).OrgId
+}
+
 func (d *Document) TableID() (uint8, error) {
 	tag, _ := d.Tagger.(*flow_metrics.Tag)
 	return tag.TableID((d.Flags & FLAG_PER_SECOND_METRICS) == 1)
diff --git a/server/libs/ckdb/table.go b/server/libs/ckdb/table.go
index 15cf7b0a7e41..58364ba431f2 100644
--- a/server/libs/ckdb/table.go
+++ b/server/libs/ckdb/table.go
@@ -21,6 +21,20 @@ import (
 	"strings"
 )
 
+const (
+	DEFAULT_ORG_ID  = 1
+	INVALID_ORG_ID  = 0
+	DEFAULT_TEAM_ID = 1
+	INVALID_TEAM_ID = 0
+)
+
+func OrgDatabasePrefix(orgID uint16) string {
+	if orgID == DEFAULT_ORG_ID || orgID == INVALID_ORG_ID { // 0 is invalid Organization, 1 is default Organization
+		return ""
+	}
+	return fmt.Sprintf("%04d_", orgID)
+}
+
 const (
 	METRICS_DB    = "flow_metrics"
 	LOCAL_SUBFFIX = "_local"
@@ -63,7 +77,11 @@ type Table struct {
 	PrimaryKeyCount int // 一级索引的key的个数, 从orderKeys中数前n个,
 }
 
-func (t *Table) MakeLocalTableCreateSQL() string {
+func (t *Table) OrgDatabase(orgID uint16) string {
+	return OrgDatabasePrefix(orgID) + t.Database
+}
+
+func (t *Table) makeLocalTableCreateSQL(database string) string {
 	columns := []string{}
 	for _, c := range t.Columns {
 		comment := ""
@@ -115,7 +133,7 @@ ORDER BY (%s)
 %s
 %s
 SETTINGS storage_policy = '%s'`,
-		t.Database, fmt.Sprintf("`%s`", t.LocalName),
+		database, fmt.Sprintf("`%s`", t.LocalName),
 		strings.Join(columns, ",\n"),
 		engine,
 		strings.Join(t.OrderKeys[:t.PrimaryKeyCount], ","),
@@ -126,13 +144,29 @@ SETTINGS storage_policy = '%s'`,
 	return createTable
 }
 
-func (t *Table) MakeGlobalTableCreateSQL() string {
+func (t *Table) MakeLocalTableCreateSQL() string {
+	return t.makeLocalTableCreateSQL(t.Database)
+}
+
+func (t *Table) MakeOrgLocalTableCreateSQL(orgID uint16) string {
+	return t.makeLocalTableCreateSQL(t.OrgDatabase(orgID))
+}
+
+func (t *Table) makeGlobalTableCreateSQL(database string) string {
 	engine := fmt.Sprintf(Distributed.String(), t.Cluster, t.Database, t.LocalName)
 	return fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s.`%s` AS %s.`%s` ENGINE=%s",
-		t.Database, t.GlobalName, t.Database, t.LocalName, engine)
+		database, t.GlobalName, t.Database, t.LocalName, engine)
 }
 
-func (t *Table) MakePrepareTableInsertSQL() string {
+func (t *Table) MakeGlobalTableCreateSQL() string {
+	return t.makeGlobalTableCreateSQL(t.Database)
+}
+
+func (t *Table) MakeOrgGlobalTableCreateSQL(orgID uint16) string {
+	return t.makeGlobalTableCreateSQL(t.OrgDatabase(orgID))
+}
+
+func (t *Table) makePrepareTableInsertSQL(database string) string {
 	columns := []string{}
 	values := []string{}
 	for _, c := range t.Columns {
@@ -141,9 +175,17 @@ func (t *Table) MakePrepareTableInsertSQL() string {
 	}
 
 	prepare := fmt.Sprintf("INSERT INTO %s.`%s` (%s) VALUES (%s)",
-		t.Database, t.LocalName,
+		database, t.LocalName,
 		strings.Join(columns, ","),
 		strings.Join(values, ","))
 
 	return prepare
 }
+
+func (t *Table) MakePrepareTableInsertSQL() string {
+	return t.makePrepareTableInsertSQL(t.Database)
+}
+
+func (t *Table) MakeOrgPrepareTableInsertSQL(orgID uint16) string {
+	return t.makePrepareTableInsertSQL(t.OrgDatabase(orgID))
+}
diff --git a/server/libs/flow-metrics/tag.go b/server/libs/flow-metrics/tag.go
index 92695d033933..c31edc178852 100644
--- a/server/libs/flow-metrics/tag.go
+++ b/server/libs/flow-metrics/tag.go
@@ -301,6 +301,8 @@ type Field struct {
 	Protocol   layers.IPProtocol
 	ServerPort uint16
 	VTAPID     uint16
+	OrgId      uint16 // no need to store
+	TeamID     uint16
 	TAPPort    datatype.TapPort
 	TAPSide    TAPSideEnum
 	TAPType    TAPTypeEnum
@@ -866,6 +868,8 @@ func (t *Tag) MarshalTo(b []byte) int {
 	if t.Code&VTAPID != 0 {
 		offset += copy(b[offset:], ",agent_id=")
 		offset += copy(b[offset:], strconv.FormatUint(uint64(t.VTAPID), 10))
+		offset += copy(b[offset:], ",team_id=")
+		offset += copy(b[offset:], strconv.FormatUint(uint64(t.TeamID), 10))
 	}
 
 	return offset
@@ -1089,6 +1093,7 @@ func GenTagColumns(code Code) []*ckdb.Column {
 	}
 	if code&VTAPID != 0 {
 		columns = append(columns, ckdb.NewColumnWithGroupBy("agent_id", ckdb.UInt16).SetComment("采集器的ID"))
+		columns = append(columns, ckdb.NewColumnWithGroupBy("team_id", ckdb.UInt16).SetComment("团队的ID"))
 	}
 
 	return columns
@@ -1287,6 +1292,7 @@ func (t *Tag) WriteBlock(block *ckdb.Block, time uint32) {
 	}
 	if code&VTAPID != 0 {
 		block.Write(t.VTAPID)
+		block.Write(t.TeamID)
 	}
 }
 
diff --git a/server/libs/grpc/grpc_platformdata.go b/server/libs/grpc/grpc_platformdata.go
index 0c8a53221147..a75828cdcb6a 100644
--- a/server/libs/grpc/grpc_platformdata.go
+++ b/server/libs/grpc/grpc_platformdata.go
@@ -34,6 +34,7 @@ import (
 	"golang.org/x/net/context"
 
 	"github.com/deepflowio/deepflow/message/trident"
+	"github.com/deepflowio/deepflow/server/libs/ckdb"
 	"github.com/deepflowio/deepflow/server/libs/datatype"
 	"github.com/deepflowio/deepflow/server/libs/hmap/lru"
 	"github.com/deepflowio/deepflow/server/libs/receiver"
@@ -111,6 +112,8 @@ type VtapInfo struct {
 	EpcId        int32
 	Ip           string
 	PodClusterId uint32
+	OrgId        uint16
+	TeamId       uint16
 }
 
 type Counter struct {
@@ -1420,6 +1423,13 @@ func (t *PlatformInfoTable) QueryVtapInfo(vtapId uint32) *VtapInfo {
 	return nil
 }
 
+func (t *PlatformInfoTable) QueryVtapOrgAndTeamID(vtapId uint32) (uint16, uint16) {
+	if vtapInfo, ok := t.vtapIdInfos[vtapId]; ok {
+		return vtapInfo.OrgId, vtapInfo.TeamId
+	}
+	return ckdb.DEFAULT_ORG_ID, ckdb.INVALID_TEAM_ID
+}
+
 func (t *PlatformInfoTable) inPlatformData(epcID int32, isIPv4 bool, ip4 uint32, ip6 net.IP) bool {
 	if isIPv4 {
 		if t.queryIPV4Infos(epcID, ip4) != nil {
@@ -1486,6 +1496,8 @@ func (t *PlatformInfoTable) updateVtapIps(vtapIps []*trident.VtapIp) {
 			EpcId:        epcId,
 			Ip:           vtapIp.GetIp(),
 			PodClusterId: vtapIp.GetPodClusterId(),
+			OrgId:        uint16(vtapIp.GetOrgId()),
+			TeamId:       uint16(vtapIp.GetTeamId()),
 		}
 	}
 	t.vtapIdInfos = vtapIdInfos
diff --git a/server/libs/stats/message.go b/server/libs/stats/message.go
index 28397460ba15..cb2ba258e841 100644
--- a/server/libs/stats/message.go
+++ b/server/libs/stats/message.go
@@ -50,6 +50,7 @@ type Field struct {
 
 type DFStats struct {
 	Time      uint32
+	VtapID    uint16
 	TableName string
 	Tags      []Tag
 	Fields    []Field
@@ -120,6 +121,10 @@ func (s *DFStats) WriteBlock(block *ckdb.Block) error {
 	return nil
 }
 
+func (s *DFStats) OrgID() uint16 {
+	return s.VtapID%10 + 1
+}
+
 func (s *DFStats) Release() {
 	ReleaseDFStats(s)
 }
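
The org-scoped database naming introduced in server/libs/ckdb/table.go can be exercised in isolation. The sketch below re-implements the prefix logic outside the ckdb package purely for illustration: the standalone package, the lower-case helper name, the main function and the hard-coded "flow_metrics" example are assumptions, while the constants and the mapping itself are copied from the diff.

package main

import "fmt"

// Mirrored from the diff: org 0 is invalid, org 1 is the default organization.
const (
	DEFAULT_ORG_ID = 1
	INVALID_ORG_ID = 0
)

// orgDatabasePrefix mirrors ckdb.OrgDatabasePrefix: the default/invalid orgs
// keep the bare database name, every other org gets a zero-padded "NNNN_" prefix.
func orgDatabasePrefix(orgID uint16) string {
	if orgID == DEFAULT_ORG_ID || orgID == INVALID_ORG_ID {
		return ""
	}
	return fmt.Sprintf("%04d_", orgID)
}

func main() {
	for _, orgID := range []uint16{0, 1, 2, 23} {
		// e.g. org 23 maps to "0023_flow_metrics", orgs 0 and 1 to "flow_metrics".
		fmt.Printf("org %4d -> %s\n", orgID, orgDatabasePrefix(orgID)+"flow_metrics")
	}
}

This is the same database name that MakeOrgLocalTableCreateSQL, MakeOrgGlobalTableCreateSQL and MakeOrgPrepareTableInsertSQL target through Table.OrgDatabase.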
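
QueryVtapOrgAndTeamID in grpc_platformdata.go decides which org and team a sample is attributed to when its agent is looked up. A minimal sketch of that fallback behaviour, with the LRU-backed PlatformInfoTable replaced by a plain map: the map, the lower-case helper and the main function are illustrative assumptions, while the constants and the fallback values come from the diff.

package main

import "fmt"

const (
	DEFAULT_ORG_ID  = 1
	INVALID_TEAM_ID = 0
)

type vtapInfo struct{ orgID, teamID uint16 }

// queryVtapOrgAndTeamID mirrors PlatformInfoTable.QueryVtapOrgAndTeamID:
// known agents return their own org/team, unknown agents fall back to the
// default org and an invalid team ID.
func queryVtapOrgAndTeamID(infos map[uint32]vtapInfo, vtapID uint32) (uint16, uint16) {
	if info, ok := infos[vtapID]; ok {
		return info.orgID, info.teamID
	}
	return DEFAULT_ORG_ID, INVALID_TEAM_ID
}

func main() {
	infos := map[uint32]vtapInfo{42: {orgID: 23, teamID: 7}}
	for _, id := range []uint32{42, 99} {
		org, team := queryVtapOrgAndTeamID(infos, id)
		fmt.Printf("vtap %d -> org %d, team %d\n", id, org, team)
	}
}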