From d0c026c3c2d392ba28b410aebd2861fbbd42c2a5 Mon Sep 17 00:00:00 2001 From: zhuofeng Date: Thu, 20 Jun 2024 20:20:37 +0800 Subject: [PATCH] [ingester] support mod flow_tag ttl --- server/ingester/datasource/handle.go | 49 +++++++++++++++++----------- 1 file changed, 30 insertions(+), 19 deletions(-) diff --git a/server/ingester/datasource/handle.go b/server/ingester/datasource/handle.go index 6f30cb2c80c..02f64c9f376 100644 --- a/server/ingester/datasource/handle.go +++ b/server/ingester/datasource/handle.go @@ -31,15 +31,17 @@ const ( NETWORK = "network" APPLICATION = "application" TRAFFIC_POLICY = "traffic_policy" + FLOW_TAG_DB = "flow_tag" ERR_IS_MODIFYING = "Modifying the retention time (%s), please try again later" ) type DatasourceModifiedOnly string type DatasourceInfo struct { - ID int - DB string - Tables []string + ID int + DB string + Tables []string + FlowTagTables []string } const ( @@ -59,21 +61,23 @@ const ( DEEPFLOW_ADMIN = "deepflow_admin" ) +// to modify the datasource TTL, you need to also modify the 'flow_tag' database tables. +// FIXME: only the 'prometheus' database is supported now, and the remaining databases will be completed in the future. var DatasourceModifiedOnlyIDMap = map[DatasourceModifiedOnly]DatasourceInfo{ - DEEPFLOW_SYSTEM: {int(flow_metrics.METRICS_TABLE_ID_MAX) + 1, "deepflow_system", []string{"deepflow_system"}}, - L4_FLOW_LOG: {int(flow_metrics.METRICS_TABLE_ID_MAX) + 2, "flow_log", []string{"l4_flow_log"}}, - L7_FLOW_LOG: {int(flow_metrics.METRICS_TABLE_ID_MAX) + 3, "flow_log", []string{"l7_flow_log"}}, - L4_PACKET: {int(flow_metrics.METRICS_TABLE_ID_MAX) + 4, "flow_log", []string{"l4_packet"}}, - L7_PACKET: {int(flow_metrics.METRICS_TABLE_ID_MAX) + 5, "flow_log", []string{"l7_packet"}}, - EXT_METRICS: {int(flow_metrics.METRICS_TABLE_ID_MAX) + 6, "ext_metrics", []string{"metrics"}}, - PROMETHEUS: {int(flow_metrics.METRICS_TABLE_ID_MAX) + 7, "prometheus", []string{"samples"}}, - EVENT_EVENT: {int(flow_metrics.METRICS_TABLE_ID_MAX) + 8, "event", []string{"event"}}, - EVENT_PERF_EVENT: {int(flow_metrics.METRICS_TABLE_ID_MAX) + 9, "event", []string{"perf_event"}}, - EVENT_ALARM_EVENT: {int(flow_metrics.METRICS_TABLE_ID_MAX) + 10, "event", []string{"alarm_event"}}, - PROFILE: {int(flow_metrics.METRICS_TABLE_ID_MAX) + 11, "profile", []string{"in_process"}}, - APPLOG: {int(flow_metrics.METRICS_TABLE_ID_MAX) + 12, "application_log", []string{"log"}}, - DEEPFLOW_TENANT: {int(flow_metrics.METRICS_TABLE_ID_MAX) + 13, "deepflow_tenant", []string{"deepflow_collector"}}, - DEEPFLOW_ADMIN: {int(flow_metrics.METRICS_TABLE_ID_MAX) + 14, "deepflow_admin", []string{"deepflow_server"}}, + DEEPFLOW_SYSTEM: {int(flow_metrics.METRICS_TABLE_ID_MAX) + 1, "deepflow_system", []string{"deepflow_system"}, []string{}}, + L4_FLOW_LOG: {int(flow_metrics.METRICS_TABLE_ID_MAX) + 2, "flow_log", []string{"l4_flow_log"}, []string{}}, + L7_FLOW_LOG: {int(flow_metrics.METRICS_TABLE_ID_MAX) + 3, "flow_log", []string{"l7_flow_log"}, []string{}}, + L4_PACKET: {int(flow_metrics.METRICS_TABLE_ID_MAX) + 4, "flow_log", []string{"l4_packet"}, []string{}}, + L7_PACKET: {int(flow_metrics.METRICS_TABLE_ID_MAX) + 5, "flow_log", []string{"l7_packet"}, []string{}}, + EXT_METRICS: {int(flow_metrics.METRICS_TABLE_ID_MAX) + 6, "ext_metrics", []string{"metrics"}, []string{}}, + PROMETHEUS: {int(flow_metrics.METRICS_TABLE_ID_MAX) + 7, "prometheus", []string{"samples"}, []string{"prometheus_custom_field", "prometheus_custom_field_value"}}, + EVENT_EVENT: {int(flow_metrics.METRICS_TABLE_ID_MAX) + 8, "event", []string{"event"}, []string{}}, + EVENT_PERF_EVENT: {int(flow_metrics.METRICS_TABLE_ID_MAX) + 9, "event", []string{"perf_event"}, []string{}}, + EVENT_ALARM_EVENT: {int(flow_metrics.METRICS_TABLE_ID_MAX) + 10, "event", []string{"alarm_event"}, []string{}}, + PROFILE: {int(flow_metrics.METRICS_TABLE_ID_MAX) + 11, "profile", []string{"in_process"}, []string{}}, + APPLOG: {int(flow_metrics.METRICS_TABLE_ID_MAX) + 12, "application_log", []string{"log"}, []string{}}, + DEEPFLOW_TENANT: {int(flow_metrics.METRICS_TABLE_ID_MAX) + 13, "deepflow_tenant", []string{"deepflow_collector"}, []string{}}, + DEEPFLOW_ADMIN: {int(flow_metrics.METRICS_TABLE_ID_MAX) + 14, "deepflow_admin", []string{"deepflow_server"}, []string{}}, } func (ds DatasourceModifiedOnly) DatasourceInfo() DatasourceInfo { @@ -513,6 +517,8 @@ func (m *DatasourceManager) Handle(orgID int, action ActionEnum, dbGroup, baseTa datasourceId := datasoureInfo.ID db := ckdb.OrgDatabasePrefix(uint16(orgID)) + datasoureInfo.DB tables := datasoureInfo.Tables + flowTagDb := ckdb.OrgDatabasePrefix(uint16(orgID)) + FLOW_TAG_DB + flowTagTables := datasoureInfo.FlowTagTables cks, err := basecommon.NewCKConnections(m.ckAddrs, m.user, m.password) if err != nil { @@ -522,16 +528,21 @@ func (m *DatasourceManager) Handle(orgID int, action ActionEnum, dbGroup, baseTa if m.isModifyingFlags[datasourceId] { return fmt.Errorf(ERR_IS_MODIFYING, dbGroup) } - go func(tableNames []string, id int) { + go func(tableNames, flowTagTableNames []string, id int) { m.isModifyingFlags[id] = true for _, tableName := range tableNames { if err := m.modTableTTL(cks, db, tableName, duration); err != nil { log.Info(err) } } + for _, tableName := range flowTagTableNames { + if err := m.modTableTTL(cks, flowTagDb, tableName, duration); err != nil { + log.Info(err) + } + } m.isModifyingFlags[id] = false cks.Close() - }(tables, datasourceId) + }(tables, flowTagTables, datasourceId) return nil }