Skip to content

Commit

Permalink
[ingester] supports organization data to be stored independently
Browse files Browse the repository at this point in the history
  • Loading branch information
lzf575 committed Mar 26, 2024
1 parent be1bc71 commit fbbff0a
Show file tree
Hide file tree
Showing 33 changed files with 543 additions and 211 deletions.
166 changes: 115 additions & 51 deletions server/ingester/ckissu/ckissu.go
Original file line number Diff line number Diff line change
Expand Up @@ -859,9 +859,10 @@ func getColumnRenames(columnRenamess []*ColumnRenames) []*ColumnRename {
return renames
}

func (i *Issu) renameColumns(connect *sql.DB) ([]*ColumnRename, error) {
func (i *Issu) renameColumns(connect *sql.DB, orgIDPrefix string) ([]*ColumnRename, error) {
dones := []*ColumnRename{}
for _, renameColumn := range i.columnRenames {
renameColumn.Db = getOrgDatabase(renameColumn.Db, orgIDPrefix)
version, err := i.getTableVersion(connect, renameColumn.Db, renameColumn.Table)
if err != nil {
if strings.Contains(err.Error(), "doesn't exist") {
Expand All @@ -883,9 +884,10 @@ func (i *Issu) renameColumns(connect *sql.DB) ([]*ColumnRename, error) {
return dones, nil
}

func (i *Issu) modColumns(connect *sql.DB) ([]*ColumnMod, error) {
func (i *Issu) modColumns(connect *sql.DB, orgIDPrefix string) ([]*ColumnMod, error) {
dones := []*ColumnMod{}
for _, modColumn := range i.columnMods {
modColumn.Db = getOrgDatabase(modColumn.Db, orgIDPrefix)
version, err := i.getTableVersion(connect, modColumn.Db, modColumn.Table)
if err != nil {
return dones, err
Expand All @@ -902,9 +904,10 @@ func (i *Issu) modColumns(connect *sql.DB) ([]*ColumnMod, error) {
return dones, nil
}

func (i *Issu) dropColumns(connect *sql.DB) ([]*ColumnDrop, error) {
func (i *Issu) dropColumns(connect *sql.DB, orgIDPrefix string) ([]*ColumnDrop, error) {
dones := []*ColumnDrop{}
for _, dropColumn := range i.columnDrops {
dropColumn.Db = getOrgDatabase(dropColumn.Db, orgIDPrefix)
version, err := i.getTableVersion(connect, dropColumn.Db, dropColumn.Table)
if err != nil {
return dones, err
Expand All @@ -920,8 +923,9 @@ func (i *Issu) dropColumns(connect *sql.DB) ([]*ColumnDrop, error) {
return dones, nil
}

func (i *Issu) modTableTTLs(connect *sql.DB) error {
func (i *Issu) modTableTTLs(connect *sql.DB, orgIDPrefix string) error {
for _, modTTL := range i.modTTLs {
modTTL.Db = getOrgDatabase(modTTL.Db, orgIDPrefix)
version, err := i.getTableVersion(connect, modTTL.Db, modTTL.Table)
if err != nil {
log.Error(err)
Expand Down Expand Up @@ -1012,9 +1016,10 @@ func getColumnDatasourceAdds(columnDatasourceAddss []*ColumnDatasourceAdds) []*C
return adds
}

func (i *Issu) addColumns(connect *sql.DB) ([]*ColumnAdd, error) {
func (i *Issu) addColumns(connect *sql.DB, orgIDPrefix string) ([]*ColumnAdd, error) {
dones := []*ColumnAdd{}
for _, add := range i.columnAdds {
add.Db = getOrgDatabase(add.Db, orgIDPrefix)
version, err := i.getTableVersion(connect, add.Db, add.Table)
if err != nil {
return dones, err
Expand All @@ -1032,7 +1037,7 @@ func (i *Issu) addColumns(connect *sql.DB) ([]*ColumnAdd, error) {
for _, tableName := range []string{
flow_metrics.NETWORK_1M.TableName(), flow_metrics.NETWORK_MAP_1M.TableName(),
flow_metrics.APPLICATION_1M.TableName(), flow_metrics.APPLICATION_MAP_1M.TableName()} {
datasourceInfos, err := i.getUserDefinedDatasourceInfos(connect, ckdb.METRICS_DB, strings.Split(tableName, ".")[0])
datasourceInfos, err := i.getUserDefinedDatasourceInfos(connect, getOrgDatabase(ckdb.METRICS_DB, orgIDPrefix), strings.Split(tableName, ".")[0])
if err != nil {
log.Warning(err)
continue
Expand All @@ -1049,9 +1054,10 @@ func (i *Issu) addColumns(connect *sql.DB) ([]*ColumnAdd, error) {
return dones, nil
}

func (i *Issu) addIndexs(connect *sql.DB) ([]*IndexAdd, error) {
func (i *Issu) addIndexs(connect *sql.DB, orgIDPrefix string) ([]*IndexAdd, error) {
dones := []*IndexAdd{}
for _, add := range i.indexAdds {
add.Db = getOrgDatabase(add.Db, orgIDPrefix)
version, err := i.getTableVersion(connect, add.Db, add.Table)
if err != nil {
return dones, err
Expand All @@ -1069,63 +1075,121 @@ func (i *Issu) addIndexs(connect *sql.DB) ([]*IndexAdd, error) {
return dones, nil
}

func (i *Issu) Start() error {
connects := i.Connections
if len(connects) == 0 {
return fmt.Errorf("connections is nil")
func isNumeric(s string) bool {
for _, ch := range s {
if ch < '0' || ch > '9' {
return false
}
}
for _, connect := range connects {
renames, errRenames := i.renameColumns(connect)
if errRenames != nil {
return errRenames
return true
}

func getOrgDatabase(db string, orgIDPrefix string) string {
if len(db) > 5 && db[4] == '_' && isNumeric(db[:4]) {
return orgIDPrefix + db[5:]
}
return orgIDPrefix + db
}

func (i *Issu) startOrg(connect *sql.DB, orgIDPrefix string) error {
renames, errRenames := i.renameColumns(connect, orgIDPrefix)
if errRenames != nil {
return errRenames
}
mods, errMods := i.modColumns(connect, orgIDPrefix)
if errMods != nil {
return errMods
}

adds, errAdds := i.addColumns(connect, orgIDPrefix)
if errAdds != nil {
return errAdds
}

addIndexs, errAddIndexs := i.addIndexs(connect, orgIDPrefix)
if errAddIndexs != nil {
log.Warning(errAddIndexs)
}

drops, errDrops := i.dropColumns(connect, orgIDPrefix)
if errDrops != nil {
return errDrops
}

for _, cr := range renames {
if err := i.setTableVersion(connect, cr.Db, cr.Table); err != nil {
return err
}
mods, errMods := i.modColumns(connect)
if errMods != nil {
return errMods
}
for _, cr := range mods {
if err := i.setTableVersion(connect, cr.Db, cr.Table); err != nil {
return err
}

adds, errAdds := i.addColumns(connect)
if errAdds != nil {
return errAdds
}
for _, cr := range adds {
if err := i.setTableVersion(connect, cr.Db, cr.Table); err != nil {
return err
}

addIndexs, errAddIndexs := i.addIndexs(connect)
if errAddIndexs != nil {
log.Warning(errAddIndexs)
}
for _, cr := range addIndexs {
if err := i.setTableVersion(connect, cr.Db, cr.Table); err != nil {
return err
}

drops, errDrops := i.dropColumns(connect)
if errDrops != nil {
return errDrops
}
for _, cr := range drops {
if err := i.setTableVersion(connect, cr.Db, cr.Table); err != nil {
return err
}
}
go i.modTableTTLs(connect, orgIDPrefix)
return nil
}

for _, cr := range renames {
if err := i.setTableVersion(connect, cr.Db, cr.Table); err != nil {
return err
}
}
for _, cr := range mods {
if err := i.setTableVersion(connect, cr.Db, cr.Table); err != nil {
return err
}
func (i *Issu) getOrgIDPrefixs(connect *sql.DB) ([]string, error) {
checkOrgDatabase := "flow_metrics"
sql := fmt.Sprintf("SELECT name FROM system.databases WHERE name like '%%%s%%'", checkOrgDatabase)
rows, err := connect.Query(sql)
if err != nil {
return nil, err
}
log.Info("get orgs sql: ", sql)
var db string
orgIDPrefixs := []string{}
for rows.Next() {
err := rows.Scan(&db)
if err != nil {
return nil, err
}
for _, cr := range adds {
if err := i.setTableVersion(connect, cr.Db, cr.Table); err != nil {
return err
}

if len(db) > 5 && db[4] == '_' && isNumeric(db[:4]) {
orgIDPrefixs = append(orgIDPrefixs, db[:5])
} else if db == checkOrgDatabase {
orgIDPrefixs = append(orgIDPrefixs, "")
}
for _, cr := range addIndexs {
if err := i.setTableVersion(connect, cr.Db, cr.Table); err != nil {
return err
}
}
return orgIDPrefixs, nil

}

func (i *Issu) Start() error {
connects := i.Connections
if len(connects) == 0 {
return fmt.Errorf("connections is nil")
}

for _, connect := range connects {
orgIDPrefixs, err := i.getOrgIDPrefixs(connect)
if err != nil {
return fmt.Errorf("get orgIDs failed, err: %s", err)
}
for _, cr := range drops {
if err := i.setTableVersion(connect, cr.Db, cr.Table); err != nil {
return err
for _, orgIDPrefix := range orgIDPrefixs {
err := i.startOrg(connect, orgIDPrefix)
if err != nil {
log.Errorf("orgIDPrefix %s run issu failed, err: %s", orgIDPrefix, err)
}
}
go i.modTableTTLs(connect)
}

return nil
}

Expand Down
59 changes: 59 additions & 0 deletions server/ingester/ckissu/updates.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,65 @@ var ColumnAdd65 = []*ColumnAdds{
ColumnNames: []string{"_id"},
ColumnType: ckdb.UInt64,
},

{
Dbs: []string{"flow_metrics"},
Tables: []string{"application.1m", "application.1m_local", "application_map.1m", "application_map.1m_local",
"application.1s", "application.1s_local", "application_map.1s", "application_map.1s_local", "traffic_policy.1m", "traffic_policy.1m_local"},
ColumnNames: []string{"team_id"},
ColumnType: ckdb.UInt16,
},
{
Dbs: []string{"flow_log"},
Tables: []string{"l4_flow_log", "l4_flow_log_local", "l7_flow_log_local", "l7_flow_log", "l4_packet_local", "l4_packet", "l7_packet_local", "l7_packet"},
ColumnNames: []string{"team_id"},
ColumnType: ckdb.UInt16,
},
{
Dbs: []string{"event"},
Tables: []string{"event_local", "event", "alarm_event_local", "alarm_event", "perf_event", "perf_event_local"},
ColumnNames: []string{"team_id"},
ColumnType: ckdb.UInt16,
},
{
Dbs: []string{"ext_metrics"},
Tables: []string{"metrics_local", "metrics"},
ColumnNames: []string{"team_id"},
ColumnType: ckdb.UInt16,
},
{
Dbs: []string{"profile"},
Tables: []string{"in_process_local", "in_process"},
ColumnNames: []string{"team_id"},
ColumnType: ckdb.UInt16,
},
{
Dbs: []string{"prometheus"},
Tables: []string{"samples_local", "samples"},
ColumnNames: []string{"team_id"},
ColumnType: ckdb.UInt16,
},
{
Dbs: []string{"deepflow_system"},
Tables: []string{"deepflow_system_local", "deepflow_system"},
ColumnNames: []string{"team_id"},
ColumnType: ckdb.UInt16,
},
{
Dbs: []string{"flow_tag"},
Tables: []string{
"deepflow_system_custom_field_local", "deepflow_system_custom_field",
"deepflow_system_custom_field_value_local", "deepflow_system_custom_field_value",
"event_custom_field_local", "event_custom_field_local",
"event_custom_field_value_local", "event_custom_field_value",
"ext_metrics_custom_field_local", "ext_metrics_custom_field",
"ext_metrics_custom_field_value_local", "ext_metrics_custom_field_value",
"flow_log_custom_field_local", "flow_log_custom_field",
"flow_log_custom_field_value_local", "flow_log_custom_field_value",
},
ColumnNames: []string{"team_id"},
ColumnType: ckdb.UInt16,
},
}

var TableRenames65 = []*TableRename{
Expand Down
2 changes: 1 addition & 1 deletion server/ingester/common/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@
package common

const (
CK_VERSION = "v6.5.1.6" // 用于表示clickhouse的表版本号
CK_VERSION = "v6.5.3.1" // 用于表示clickhouse的表版本号
)
10 changes: 9 additions & 1 deletion server/ingester/event/dbwriter/alarm_event_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ type AlarmEventStore struct {
PolicyThresholdCritical string
PolicyThresholdError string
PolicyThresholdWarning string
OrgId uint16
TeamID uint16
}

func AlarmEventColumns() []*ckdb.Column {
Expand Down Expand Up @@ -102,6 +104,7 @@ func AlarmEventColumns() []*ckdb.Column {
ckdb.NewColumn("policy_threshold_critical", ckdb.String),
ckdb.NewColumn("policy_threshold_error", ckdb.String),
ckdb.NewColumn("policy_threshold_warning", ckdb.String),
ckdb.NewColumn("team_id", ckdb.UInt16),
}
}

Expand Down Expand Up @@ -134,13 +137,18 @@ func (e *AlarmEventStore) WriteBlock(block *ckdb.Block) {
e.PolicyThresholdCritical,
e.PolicyThresholdError,
e.PolicyThresholdWarning,
e.TeamID,
)
}

func (e *AlarmEventStore) Release() {
ReleaseAlarmEventStore(e)
}

func (e *AlarmEventStore) OrgID() uint16 {
return e.OrgId
}

func GenAlarmEventCKTable(cluster, storagePolicy string, ttl int, coldStorage *ckdb.ColdStorage) *ckdb.Table {
table := common.ALARM_EVENT.TableName()
timeKey := "time"
Expand Down Expand Up @@ -180,7 +188,7 @@ func NewAlarmEventWriter(config *config.Config) (*EventWriter, error) {
ckTable := GenAlarmEventCKTable(w.ckdbCluster, w.ckdbStoragePolicy, w.ttl, ckdb.GetColdStorage(w.ckdbColdStorages, EVENT_DB, common.ALARM_EVENT.TableName()))

ckwriter, err := ckwriter.NewCKWriter(w.ckdbAddrs, w.ckdbUsername, w.ckdbPassword,
common.ALARM_EVENT.TableName(), config.Base.CKDB.TimeZone, ckTable, w.writerConfig.QueueCount, w.writerConfig.QueueSize, w.writerConfig.BatchSize, w.writerConfig.FlushTimeout)
common.ALARM_EVENT.TableName(), config.Base.CKDB.TimeZone, ckTable, w.writerConfig.QueueCount, w.writerConfig.QueueSize, w.writerConfig.BatchSize, w.writerConfig.FlushTimeout, config.Base.CKDB.Watcher)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit fbbff0a

Please sign in to comment.