Skip to content

Commit

Permalink
[CONTROLLER/RECORDER] publishes sub_domain id
Browse files Browse the repository at this point in the history
  • Loading branch information
zhengya authored and SongZhen0704 committed May 28, 2024
1 parent 73508ed commit e4a8573
Show file tree
Hide file tree
Showing 62 changed files with 508 additions and 497 deletions.
2 changes: 1 addition & 1 deletion server/controller/http/service/resource/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -649,7 +649,7 @@ func deleteDomain(domain *mysql.Domain, db *mysql.DB, userInfo *svc.UserInfo, cf
db.Delete(&domain)

// pub to tagrecorder
metadata := message.NewMetadata(db.ORGID, domain.TeamID, domain.ID)
metadata := message.NewMetadata(db.ORGID, domain.TeamID, domain.ID, 0)
for _, s := range tagrecorder.GetSubscriberManager().GetSubscribers("domain") {
s.OnDomainDeleted(metadata)
}
Expand Down
94 changes: 47 additions & 47 deletions server/controller/recorder/cache/cache.go

Large diffs are not rendered by default.

390 changes: 195 additions & 195 deletions server/controller/recorder/cache/tool/data_set.go

Large diffs are not rendered by default.

34 changes: 17 additions & 17 deletions server/controller/recorder/cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func newCleaner(orgID int) (*Cleaner, error) {

func (c *Cleaner) cleanDeletedData(retentionInterval int) {
expiredAt := time.Now().Add(time.Duration(-retentionInterval) * time.Hour)
log.Info(c.org.LogPre("clean soft deleted resources (deleted_at < %s) started", expiredAt.Format(ctrlrcommon.GO_BIRTHDAY)))
log.Info(c.org.Logf("clean soft deleted resources (deleted_at < %s) started", expiredAt.Format(ctrlrcommon.GO_BIRTHDAY)))
deleteExpired[mysql.Region](c.org.DB, expiredAt)
deleteExpired[mysql.AZ](c.org.DB, expiredAt)
deleteExpired[mysql.Host](c.org.DB, expiredAt)
Expand All @@ -247,19 +247,19 @@ func (c *Cleaner) cleanDeletedData(retentionInterval int) {
deleteExpired[mysql.Pod](c.org.DB, expiredAt)
deleteExpired[mysql.Process](c.org.DB, expiredAt)
deleteExpired[mysql.PrometheusTarget](c.org.DB, expiredAt)
log.Info(c.org.LogPre("clean soft deleted resources completed"))
log.Info(c.org.Logf("clean soft deleted resources completed"))
}

func (c *Cleaner) cleanDirtyData() {
log.Info(c.org.LogPre("clean dirty data started"))
log.Info(c.org.Logf("clean dirty data started"))
c.cleanNetworkDirty()
c.cleanVRouterDirty()
c.cleanPodIngressDirty()
c.cleanPodServiceDirty()
c.cleanPodNodeDirty()
c.cleanPodDirty()
c.cleanVInterfaceDirty()
log.Info(c.org.LogPre("clean dirty data completed"))
log.Info(c.org.Logf("clean dirty data completed"))
}

func (c *Cleaner) cleanNetworkDirty() {
Expand All @@ -269,7 +269,7 @@ func (c *Cleaner) cleanNetworkDirty() {
c.org.DB.Where("vl2id NOT IN ?", networkIDs).Find(&subnets)
if len(subnets) != 0 {
c.org.DB.Delete(&subnets)
log.Error(c.org.LogPre(formatLogDeleteABecauseBHasGone(ctrlrcommon.RESOURCE_TYPE_SUBNET_EN, ctrlrcommon.RESOURCE_TYPE_NETWORK_EN, subnets)))
log.Error(c.org.Logf(formatLogDeleteABecauseBHasGone(ctrlrcommon.RESOURCE_TYPE_SUBNET_EN, ctrlrcommon.RESOURCE_TYPE_NETWORK_EN, subnets)))
}
}
}
Expand All @@ -281,7 +281,7 @@ func (c *Cleaner) cleanVRouterDirty() {
c.org.DB.Where("vnet_id NOT IN ?", vrouterIDs).Find(&rts)
if len(rts) != 0 {
c.org.DB.Delete(&rts)
log.Error(c.org.LogPre(formatLogDeleteABecauseBHasGone(ctrlrcommon.RESOURCE_TYPE_ROUTING_TABLE_EN, ctrlrcommon.RESOURCE_TYPE_VROUTER_EN, rts)))
log.Error(c.org.Logf(formatLogDeleteABecauseBHasGone(ctrlrcommon.RESOURCE_TYPE_ROUTING_TABLE_EN, ctrlrcommon.RESOURCE_TYPE_VROUTER_EN, rts)))
}
}
}
Expand All @@ -293,14 +293,14 @@ func (c *Cleaner) cleanPodIngressDirty() {
c.org.DB.Where("pod_ingress_id NOT IN ?", podIngressIDs).Find(&podIngressRules)
if len(podIngressRules) != 0 {
c.org.DB.Delete(&podIngressRules)
log.Error(c.org.LogPre(formatLogDeleteABecauseBHasGone(ctrlrcommon.RESOURCE_TYPE_POD_INGRESS_RULE_EN, ctrlrcommon.RESOURCE_TYPE_POD_INGRESS_EN, podIngressRules)))
log.Error(c.org.Logf(formatLogDeleteABecauseBHasGone(ctrlrcommon.RESOURCE_TYPE_POD_INGRESS_RULE_EN, ctrlrcommon.RESOURCE_TYPE_POD_INGRESS_EN, podIngressRules)))
}

var podIngressRuleBkds []mysql.PodIngressRuleBackend
c.org.DB.Where("pod_ingress_id NOT IN ?", podIngressIDs).Find(&podIngressRuleBkds)
if len(podIngressRuleBkds) != 0 {
c.org.DB.Delete(&podIngressRuleBkds)
log.Error(c.org.LogPre(formatLogDeleteABecauseBHasGone(ctrlrcommon.RESOURCE_TYPE_POD_INGRESS_RULE_BACKEND_EN, ctrlrcommon.RESOURCE_TYPE_POD_INGRESS_EN, podIngressRuleBkds)))
log.Error(c.org.Logf(formatLogDeleteABecauseBHasGone(ctrlrcommon.RESOURCE_TYPE_POD_INGRESS_RULE_BACKEND_EN, ctrlrcommon.RESOURCE_TYPE_POD_INGRESS_EN, podIngressRuleBkds)))
}
}
}
Expand All @@ -312,21 +312,21 @@ func (c *Cleaner) cleanPodServiceDirty() {
c.org.DB.Where("pod_service_id NOT IN ?", podServiceIDs).Find(&podServicePorts)
if len(podServicePorts) != 0 {
c.org.DB.Delete(&podServicePorts)
log.Error(c.org.LogPre(formatLogDeleteABecauseBHasGone(ctrlrcommon.RESOURCE_TYPE_POD_SERVICE_PORT_EN, ctrlrcommon.RESOURCE_TYPE_POD_SERVICE_EN, podServicePorts)))
log.Error(c.org.Logf(formatLogDeleteABecauseBHasGone(ctrlrcommon.RESOURCE_TYPE_POD_SERVICE_PORT_EN, ctrlrcommon.RESOURCE_TYPE_POD_SERVICE_EN, podServicePorts)))
}

var podGroupPorts []mysql.PodGroupPort
c.org.DB.Where("pod_service_id NOT IN ?", podServiceIDs).Find(&podGroupPorts)
if len(podGroupPorts) != 0 {
c.org.DB.Delete(&podGroupPorts)
log.Error(c.org.LogPre(formatLogDeleteABecauseBHasGone(ctrlrcommon.RESOURCE_TYPE_POD_GROUP_PORT_EN, ctrlrcommon.RESOURCE_TYPE_POD_SERVICE_EN, podGroupPorts)))
log.Error(c.org.Logf(formatLogDeleteABecauseBHasGone(ctrlrcommon.RESOURCE_TYPE_POD_GROUP_PORT_EN, ctrlrcommon.RESOURCE_TYPE_POD_SERVICE_EN, podGroupPorts)))
}

var vifs []mysql.VInterface
c.org.DB.Where("devicetype = ? AND deviceid NOT IN ?", ctrlrcommon.VIF_DEVICE_TYPE_POD_SERVICE, podServiceIDs).Find(&vifs)
if len(vifs) != 0 {
c.org.DB.Delete(&vifs)
log.Error(c.org.LogPre(formatLogDeleteABecauseBHasGone(ctrlrcommon.RESOURCE_TYPE_VINTERFACE_EN, ctrlrcommon.RESOURCE_TYPE_POD_SERVICE_EN, vifs)))
log.Error(c.org.Logf(formatLogDeleteABecauseBHasGone(ctrlrcommon.RESOURCE_TYPE_VINTERFACE_EN, ctrlrcommon.RESOURCE_TYPE_POD_SERVICE_EN, vifs)))
}
}
}
Expand All @@ -338,21 +338,21 @@ func (c *Cleaner) cleanPodNodeDirty() {
c.org.DB.Where("devicetype = ? AND deviceid NOT IN ?", ctrlrcommon.VIF_DEVICE_TYPE_POD_NODE, podNodeIDs).Find(&vifs)
if len(vifs) != 0 {
c.org.DB.Delete(&vifs)
log.Error(c.org.LogPre(formatLogDeleteABecauseBHasGone(ctrlrcommon.RESOURCE_TYPE_VINTERFACE_EN, ctrlrcommon.RESOURCE_TYPE_POD_NODE_EN, vifs)))
log.Error(c.org.Logf(formatLogDeleteABecauseBHasGone(ctrlrcommon.RESOURCE_TYPE_VINTERFACE_EN, ctrlrcommon.RESOURCE_TYPE_POD_NODE_EN, vifs)))
}

var vmPodNodeConns []mysql.VMPodNodeConnection
c.org.DB.Where("pod_node_id NOT IN ?", podNodeIDs).Find(&vmPodNodeConns)
if len(vmPodNodeConns) != 0 {
c.org.DB.Delete(&vmPodNodeConns)
log.Error(c.org.LogPre(formatLogDeleteABecauseBHasGone(ctrlrcommon.RESOURCE_TYPE_VM_POD_NODE_CONNECTION_EN, ctrlrcommon.RESOURCE_TYPE_POD_NODE_EN, vmPodNodeConns)))
log.Error(c.org.Logf(formatLogDeleteABecauseBHasGone(ctrlrcommon.RESOURCE_TYPE_VM_POD_NODE_CONNECTION_EN, ctrlrcommon.RESOURCE_TYPE_POD_NODE_EN, vmPodNodeConns)))
}

var pods []mysql.Pod
c.org.DB.Where("pod_node_id NOT IN ?", podNodeIDs).Find(&pods)
if len(pods) != 0 {
c.org.DB.Delete(&pods)
log.Error(c.org.LogPre(formatLogDeleteABecauseBHasGone(ctrlrcommon.RESOURCE_TYPE_POD_EN, ctrlrcommon.RESOURCE_TYPE_POD_NODE_EN, pods)))
log.Error(c.org.Logf(formatLogDeleteABecauseBHasGone(ctrlrcommon.RESOURCE_TYPE_POD_EN, ctrlrcommon.RESOURCE_TYPE_POD_NODE_EN, pods)))
}
}
}
Expand All @@ -364,7 +364,7 @@ func (c *Cleaner) cleanPodDirty() {
c.org.DB.Where("devicetype = ? AND deviceid NOT IN ?", ctrlrcommon.VIF_DEVICE_TYPE_POD, podIDs).Find(&vifs)
if len(vifs) != 0 {
c.org.DB.Delete(&vifs)
log.Error(c.org.LogPre(formatLogDeleteABecauseBHasGone(ctrlrcommon.RESOURCE_TYPE_VINTERFACE_EN, ctrlrcommon.RESOURCE_TYPE_POD_EN, vifs)))
log.Error(c.org.Logf(formatLogDeleteABecauseBHasGone(ctrlrcommon.RESOURCE_TYPE_VINTERFACE_EN, ctrlrcommon.RESOURCE_TYPE_POD_EN, vifs)))
}
}
}
Expand All @@ -376,13 +376,13 @@ func (c *Cleaner) cleanVInterfaceDirty() {
c.org.DB.Where("vifid NOT IN ?", vifIDs).Find(&lanIPs)
if len(lanIPs) != 0 {
c.org.DB.Delete(&lanIPs)
log.Error(c.org.LogPre(formatLogDeleteABecauseBHasGone(ctrlrcommon.RESOURCE_TYPE_LAN_IP_EN, ctrlrcommon.RESOURCE_TYPE_VINTERFACE_EN, lanIPs)))
log.Error(c.org.Logf(formatLogDeleteABecauseBHasGone(ctrlrcommon.RESOURCE_TYPE_LAN_IP_EN, ctrlrcommon.RESOURCE_TYPE_VINTERFACE_EN, lanIPs)))
}
var wanIPs []mysql.WANIP
c.org.DB.Where("vifid NOT IN ?", vifIDs).Find(&wanIPs)
if len(wanIPs) != 0 {
c.org.DB.Delete(&wanIPs)
log.Error(c.org.LogPre(formatLogDeleteABecauseBHasGone(ctrlrcommon.RESOURCE_TYPE_WAN_IP_EN, ctrlrcommon.RESOURCE_TYPE_VINTERFACE_EN, wanIPs)))
log.Error(c.org.Logf(formatLogDeleteABecauseBHasGone(ctrlrcommon.RESOURCE_TYPE_WAN_IP_EN, ctrlrcommon.RESOURCE_TYPE_VINTERFACE_EN, wanIPs)))
}
}
}
Expand Down
8 changes: 6 additions & 2 deletions server/controller/recorder/common/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,15 @@ func (m *Metadata) SetSubDomain(subDomain mysql.SubDomain) {
m.Logger.SetSubDomainName(subDomain.Name)
}

// LogPre adds org id, domain info, sub_domain info to logs
func (m *Metadata) LogPre(format string, a ...any) string {
// Logf adds org id, domain info, sub_domain info to logs
func (m *Metadata) Logf(format string, a ...any) string {
return m.Logger.AddPre(format, a...)
}

func (m *Metadata) Log(format string) string {
return m.Logger.AddPre(format)
}

type DomainInfo struct {
mysql.Domain
}
Expand Down
8 changes: 6 additions & 2 deletions server/controller/recorder/common/org.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,15 @@ func NewORG(id int) (*ORG, error) {
}, err
}

// LogPre adds org id, domain info, sub_domain info to logs
func (o *ORG) LogPre(format string, a ...any) string {
// Logf adds org id, domain info, sub_domain info to logs
func (o *ORG) Logf(format string, a ...any) string {
return o.Logger.AddPre(format, a...)
}

func (o *ORG) Log(format string) string {
return o.Logger.AddPre(format)
}

func FmtORGID(id int) string {
return fmt.Sprintf("[OID-%d] ", id)
}
22 changes: 11 additions & 11 deletions server/controller/recorder/db/idmng/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func newIDManager(cfg RecorderConfig, orgID int) (*IDManager, error) {
}

func (m *IDManager) Refresh() error {
log.Info(m.org.LogPre("refresh id pools"))
log.Info(m.org.Logf("refresh id pools"))
var result error
for _, idPool := range m.resourceTypeToIDPool {
err := idPool.refresh()
Expand All @@ -93,7 +93,7 @@ func (m *IDManager) Refresh() error {
func (m *IDManager) AllocateIDs(resourceType string, count int) []int {
idPool, ok := m.resourceTypeToIDPool[resourceType]
if !ok {
log.Error(m.org.LogPre("resource type (%s) does not need to allocate id", resourceType))
log.Error(m.org.Logf("resource type (%s) does not need to allocate id", resourceType))
return []int{}
}
ids, _ := idPool.allocate(count)
Expand All @@ -103,7 +103,7 @@ func (m *IDManager) AllocateIDs(resourceType string, count int) []int {
func (m *IDManager) RecycleIDs(resourceType string, ids []int) {
idPool, ok := m.resourceTypeToIDPool[resourceType]
if !ok {
log.Error(m.org.LogPre("resource type (%s) does not need to allocate id", resourceType))
log.Error(m.org.Logf("resource type (%s) does not need to allocate id", resourceType))
return
}
idPool.recycle(ids)
Expand Down Expand Up @@ -135,7 +135,7 @@ func newIDPool[MT MySQLModel](org *common.ORG, resourceType string, max int) *ID
}

func (p *IDPool[MT]) refresh() error {
log.Info(p.org.LogPre("refresh %s id pools started", p.resourceType))
log.Info(p.org.Logf("refresh %s id pools started", p.resourceType))

var items []*MT
var err error
Expand All @@ -148,7 +148,7 @@ func (p *IDPool[MT]) refresh() error {
err = p.org.DB.Unscoped().Select("id").Find(&items).Error
}
if err != nil {
log.Error(p.org.LogPre("db query %s failed: %v", p.resourceType, err))
log.Error(p.org.Logf("db query %s failed: %v", p.resourceType, err))
return err
}
inUseIDsSet := mapset.NewSet[int]()
Expand Down Expand Up @@ -192,7 +192,7 @@ func (p *IDPool[MT]) refresh() error {
}
p.usableIDs = usableIDs

log.Info(p.org.LogPre("refresh %s id pools (usable ids count: %d) completed", p.resourceType, len(p.usableIDs)))
log.Info(p.org.Logf("refresh %s id pools (usable ids count: %d) completed", p.resourceType, len(p.usableIDs)))
return nil
}

Expand All @@ -203,7 +203,7 @@ func (p *IDPool[MT]) allocate(count int) (ids []int, err error) {
defer p.mutex.Unlock()

if len(p.usableIDs) == 0 {
log.Error(p.org.LogPre("%s has no more usable ids", p.resourceType))
log.Error(p.org.Logf("%s has no more usable ids", p.resourceType))
return
}

Expand All @@ -218,18 +218,18 @@ func (p *IDPool[MT]) allocate(count int) (ids []int, err error) {
var dbItems []*MT
err = p.org.DB.Unscoped().Where("id IN ?", ids).Find(&dbItems).Error
if err != nil {
log.Error(p.org.LogPre("db query %s failed: %v", p.resourceType, err))
log.Error(p.org.Logf("db query %s failed: %v", p.resourceType, err))
return
}
if len(dbItems) != 0 {
inUseIDs := make([]int, 0, len(dbItems))
for _, item := range dbItems {
inUseIDs = append(inUseIDs, (*item).GetID())
}
log.Info(p.org.LogPre("%s ids: %+v are in use.", p.resourceType, inUseIDs))
log.Info(p.org.Logf("%s ids: %+v are in use.", p.resourceType, inUseIDs))
ids = mapset.NewSet(ids...).Difference(mapset.NewSet(inUseIDs...)).ToSlice()
}
log.Info(p.org.LogPre("allocate %s ids: %v (expected count: %d, true count: %d)", p.resourceType, ids, count, len(ids)))
log.Info(p.org.Logf("allocate %s ids: %v (expected count: %d, true count: %d)", p.resourceType, ids, count, len(ids)))
return
}

Expand All @@ -239,5 +239,5 @@ func (p *IDPool[MT]) recycle(ids []int) {

sort.IntSlice(ids).Sort()
p.usableIDs = append(p.usableIDs, ids...)
log.Info(p.org.LogPre("recycle %s ids: %v", p.resourceType, ids))
log.Info(p.org.Logf("recycle %s ids: %v", p.resourceType, ids))
}
Loading

0 comments on commit e4a8573

Please sign in to comment.