From 7c5d3d0332fca36fa1cbd072685202d8fb48214c Mon Sep 17 00:00:00 2001 From: Leigh MacDonald Date: Mon, 5 Aug 2024 23:45:19 -0600 Subject: [PATCH] Add demo orphan cleanup. Run demo delete call in addition to assets because FK doesnt cascade for some reason --- internal/asset/asset_repository_local.go | 12 ++-- internal/asset/asset_usecase.go | 4 ++ internal/demo/demo_respository.go | 11 ++- internal/demo/demo_usecase.go | 88 ++++++++++++++++++++---- internal/demo/fetcher.go | 28 ++++---- internal/domain/asset.go | 2 + internal/domain/demo.go | 1 + internal/test/demos_test.go | 43 ++++++++++++ internal/test/main_test.go | 4 ++ 9 files changed, 160 insertions(+), 33 deletions(-) create mode 100644 internal/test/demos_test.go diff --git a/internal/asset/asset_repository_local.go b/internal/asset/asset_repository_local.go index 63c68061..254b2a3d 100644 --- a/internal/asset/asset_repository_local.go +++ b/internal/asset/asset_repository_local.go @@ -36,7 +36,7 @@ func (l localRepository) Put(ctx context.Context, asset domain.Asset, body io.Re return domain.Asset{}, errExisting } - outPath, errOutPath := l.genAssetPath(asset.HashString()) + outPath, errOutPath := l.GenAssetPath(asset.HashString()) if errOutPath != nil { return domain.Asset{}, errOutPath } @@ -84,7 +84,7 @@ func (l localRepository) Delete(ctx context.Context, assetID uuid.UUID) (int64, return 0, l.db.DBErr(errExec) } - assetPath, errAssetPath := l.genAssetPath(asset.HashString()) + assetPath, errAssetPath := l.GenAssetPath(asset.HashString()) if errAssetPath != nil { return 0, errAssetPath } @@ -115,7 +115,7 @@ func (l localRepository) Get(ctx context.Context, assetID uuid.UUID) (domain.Ass return domain.Asset{}, nil, errAsset } - assetPath, errAssetPath := l.genAssetPath(asset.HashString()) + assetPath, errAssetPath := l.GenAssetPath(asset.HashString()) if errAssetPath != nil { return domain.Asset{}, nil, errAssetPath } @@ -128,7 +128,7 @@ func (l localRepository) Get(ctx context.Context, assetID uuid.UUID) (domain.Ass return asset, reader, nil } -func (l localRepository) genAssetPath(hash string) (string, error) { +func (l localRepository) GenAssetPath(hash string) (string, error) { if len(hash) < 2 { return "", domain.ErrInvalidParameter } @@ -169,7 +169,7 @@ func (l localRepository) getAssetByUUID(ctx context.Context, assetID uuid.UUID) asset.AuthorID = steamid.New(authorID) - assetPath, errAssetPath := l.genAssetPath(asset.HashString()) + assetPath, errAssetPath := l.GenAssetPath(asset.HashString()) if errAssetPath != nil { return domain.Asset{}, errAssetPath } @@ -202,7 +202,7 @@ func (l localRepository) getAssetByHash(ctx context.Context, hash []byte) (domai asset.AuthorID = steamid.New(authorID) - assetPath, errAssetPath := l.genAssetPath(asset.HashString()) + assetPath, errAssetPath := l.GenAssetPath(asset.HashString()) if errAssetPath != nil { return domain.Asset{}, errAssetPath } diff --git a/internal/asset/asset_usecase.go b/internal/asset/asset_usecase.go index dbfda8d1..0458793d 100644 --- a/internal/asset/asset_usecase.go +++ b/internal/asset/asset_usecase.go @@ -83,6 +83,10 @@ func (s assets) Delete(ctx context.Context, assetID uuid.UUID) (int64, error) { return size, nil } +func (s assets) GenAssetPath(hash string) (string, error) { + return s.repository.GenAssetPath(hash) +} + func generateFileHash(file io.Reader) ([]byte, error) { hasher := sha256.New() if _, err := io.Copy(hasher, file); err != nil { diff --git a/internal/demo/demo_respository.go b/internal/demo/demo_respository.go index 439059e6..f6037a60 100644 --- a/internal/demo/demo_respository.go +++ b/internal/demo/demo_respository.go @@ -24,7 +24,7 @@ func (r *demoRepository) ExpiredDemos(ctx context.Context, limit uint64) ([]doma Select("d.demo_id", "d.title", "d.asset_id"). From("demo d"). LeftJoin("report r on d.demo_id = r.demo_id"). - Where("r.demo_id > 0"). + Where("d.archive = false"). OrderBy("d.demo_id"). Offset(limit)) if errRow != nil { @@ -204,3 +204,12 @@ func (r *demoRepository) updateDemo(ctx context.Context, demoFile *domain.DemoFi return nil } + +func (r *demoRepository) Delete(ctx context.Context, demoID int64) error { + const query = `DELETE FROM demo WHERE demo_id = $1` + if err := r.db.Exec(ctx, query, demoID); err != nil { + return r.db.DBErr(err) + } + + return nil +} diff --git a/internal/demo/demo_usecase.go b/internal/demo/demo_usecase.go index e7f15c56..bc5a484a 100644 --- a/internal/demo/demo_usecase.go +++ b/internal/demo/demo_usecase.go @@ -12,6 +12,7 @@ import ( "github.com/gin-gonic/gin" "github.com/leighmacdonald/gbans/internal/domain" "github.com/leighmacdonald/gbans/pkg/demoparser" + "github.com/leighmacdonald/gbans/pkg/fs" "github.com/leighmacdonald/gbans/pkg/log" "github.com/ricochet2200/go-disk-usage/du" ) @@ -69,7 +70,7 @@ func diskPercentageUsed(path string) float32 { return info.Usage() * 100 } -func (d demoUsecase) truncateBySpace(ctx context.Context, root string, maxAllowedPctUsed float32) (int, int64, error) { +func (d demoUsecase) TruncateBySpace(ctx context.Context, root string, maxAllowedPctUsed float32) (int, int64, error) { var ( count int size int64 @@ -105,7 +106,7 @@ func (d demoUsecase) truncateBySpace(ctx context.Context, root string, maxAllowe } } -func (d demoUsecase) truncateByCount(ctx context.Context, maxCount uint64) (int, int64, error) { +func (d demoUsecase) TruncateByCount(ctx context.Context, maxCount uint64) (int, int64, error) { var ( count int size int64 @@ -125,7 +126,7 @@ func (d demoUsecase) truncateByCount(ctx context.Context, maxCount uint64) (int, } for _, demo := range expired { - // Dropping asset will cascade to demo + // FIXME cascade delete does not work???? demoSize, errDrop := d.asset.Delete(ctx, demo.AssetID) if errDrop != nil { slog.Error("Failed to remove demo asset", log.ErrAttr(errDrop), @@ -134,6 +135,13 @@ func (d demoUsecase) truncateByCount(ctx context.Context, maxCount uint64) (int, continue } + if err := d.repository.Delete(ctx, demo.DemoID); err != nil { + slog.Error("Failed to remove demo entry", + slog.Int64("demo_id", demo.DemoID), + slog.String("asset_id", demo.AssetID.String()), + log.ErrAttr(err)) + } + size += demoSize count++ } @@ -160,9 +168,9 @@ func (d demoUsecase) executeCleanup(ctx context.Context) { switch conf.Demo.DemoCleanupStrategy { case domain.DemoStrategyPctFree: - count, size, err = d.truncateBySpace(ctx, conf.Demo.DemoCleanupMount, conf.Demo.DemoCleanupMinPct) + count, size, err = d.TruncateBySpace(ctx, conf.Demo.DemoCleanupMount, conf.Demo.DemoCleanupMinPct) case domain.DemoStrategyCount: - count, size, err = d.truncateByCount(ctx, conf.Demo.DemoCountLimit) + count, size, err = d.TruncateByCount(ctx, conf.Demo.DemoCountLimit) } if err != nil { @@ -178,15 +186,24 @@ func (d demoUsecase) TriggerCleanup() { func (d demoUsecase) Start(ctx context.Context) { ticker := time.NewTicker(time.Hour) + tickerOrphans := time.NewTicker(time.Hour * 24) d.executeCleanup(ctx) + if err := d.RemoveOrphans(ctx); err != nil { + slog.Error("Failed to execute orphans", log.ErrAttr(err)) + } + for { select { case <-ticker.C: d.cleanupChan <- true case <-d.cleanupChan: d.executeCleanup(ctx) + case <-tickerOrphans.C: + if err := d.RemoveOrphans(ctx); err != nil { + slog.Error("Failed to execute orphans", log.ErrAttr(err)) + } case <-ctx.Done(): return } @@ -228,15 +245,20 @@ func (d demoUsecase) CreateFromAsset(ctx context.Context, asset domain.Asset, se mapName = nameParts[0] } - var demoInfo demoparser.DemoInfo - if errParse := demoparser.Parse(ctx, asset.LocalPath, &demoInfo); errParse != nil { - return nil, errParse - } - intStats := map[string]gin.H{} - for _, steamID := range demoInfo.SteamIDs() { - intStats[steamID.String()] = gin.H{} + // temp thing until proper demo parsing is implemented + if d.config.Config().General.Mode != domain.TestMode { + var demoInfo demoparser.DemoInfo + if errParse := demoparser.Parse(ctx, asset.LocalPath, &demoInfo); errParse != nil { + return nil, errParse + } + + for _, steamID := range demoInfo.SteamIDs() { + intStats[steamID.String()] = gin.H{} + } + } else { + intStats[d.config.Config().Owner] = gin.H{} } timeStr := fmt.Sprintf("%s-%s", namePartsAll[0], namePartsAll[1]) @@ -265,3 +287,45 @@ func (d demoUsecase) CreateFromAsset(ctx context.Context, asset domain.Asset, se return &newDemo, nil } + +func (d demoUsecase) RemoveOrphans(ctx context.Context) error { + demos, errDemos := d.GetDemos(ctx) + if errDemos != nil { + return errDemos + } + + for _, demo := range demos { + var remove bool + asset, _, errAsset := d.asset.Get(ctx, demo.AssetID) + if errAsset != nil { + if errors.Is(errAsset, domain.ErrNoResult) { + remove = true + } else { + return errAsset + } + } else { + localPath, errPath := d.asset.GenAssetPath(asset.HashString()) + if errPath != nil { + return errPath + } + + remove = !fs.Exists(localPath) + } + + if !remove { + continue + } + + if _, err := d.asset.Delete(ctx, asset.AssetID); err != nil { + slog.Error("Failed to remove orphan demo asset", log.ErrAttr(err)) + } + + if err := d.repository.Delete(ctx, demo.DemoID); err != nil { + slog.Error("Failed to remove orphan demo entry", log.ErrAttr(err)) + } + + slog.Info("Removed orphan demo file", slog.String("filename", demo.Title)) + } + + return nil +} diff --git a/internal/demo/fetcher.go b/internal/demo/fetcher.go index d1d572a4..a644d3df 100644 --- a/internal/demo/fetcher.go +++ b/internal/demo/fetcher.go @@ -19,10 +19,10 @@ import ( "github.com/viant/afs/storage" ) -type demoUpdate struct { - name string - server domain.Server - content []byte +type UploadedDemo struct { + Name string + Server domain.Server + Content []byte } type Fetcher struct { @@ -31,7 +31,7 @@ type Fetcher struct { configUsecase domain.ConfigUsecase assetUsecase domain.AssetUsecase demoUsecase domain.DemoUsecase - demoChan chan demoUpdate + demoChan chan UploadedDemo } func NewFetcher(database database.Database, configUsecase domain.ConfigUsecase, serversUsecase domain.ServersUsecase, @@ -43,7 +43,7 @@ func NewFetcher(database database.Database, configUsecase domain.ConfigUsecase, serversUsecase: serversUsecase, assetUsecase: assetUsecase, demoUsecase: demoUsecase, - demoChan: make(chan demoUpdate), + demoChan: make(chan UploadedDemo), } } @@ -52,18 +52,18 @@ func (d Fetcher) Start(ctx context.Context) { sshExec.Start(ctx) } -func (d Fetcher) onDemoReceived(ctx context.Context, demo demoUpdate) error { +func (d Fetcher) OnDemoReceived(ctx context.Context, demo UploadedDemo) error { slog.Debug("Got new demo", - slog.String("server", demo.server.ShortName), - slog.String("name", demo.name)) + slog.String("server", demo.Server.ShortName), + slog.String("name", demo.Name)) - demoAsset, errNewAsset := d.assetUsecase.Create(ctx, steamid.SteamID{}, - domain.BucketDemo, demo.name, bytes.NewReader(demo.content)) + demoAsset, errNewAsset := d.assetUsecase.Create(ctx, steamid.New(d.configUsecase.Config().Owner), + domain.BucketDemo, demo.Name, bytes.NewReader(demo.Content)) if errNewAsset != nil { return errNewAsset } - _, errDemo := d.demoUsecase.CreateFromAsset(ctx, demoAsset, demo.server.ServerID) + _, errDemo := d.demoUsecase.CreateFromAsset(ctx, demoAsset, demo.Server.ServerID) if errDemo != nil { // Cleanup the asset not attached to a demo if _, errDelete := d.assetUsecase.Delete(ctx, demoAsset.AssetID); errDelete != nil { @@ -120,9 +120,9 @@ func (d Fetcher) OnClientConnect(ctx context.Context, client storage.Storager, s _ = reader.Close() // need Seeker, but afs does not provide - demo := demoUpdate{name: file.Name(), server: server, content: data} + demo := UploadedDemo{Name: file.Name(), Server: server, Content: data} - if errDemo := d.onDemoReceived(ctx, demo); errDemo != nil { + if errDemo := d.OnDemoReceived(ctx, demo); errDemo != nil { if !errors.Is(errDemo, domain.ErrAssetTooLarge) { slog.Error("Failed to create new demo asset", log.ErrAttr(errDemo)) diff --git a/internal/domain/asset.go b/internal/domain/asset.go index 287099e2..4b39c8ca 100644 --- a/internal/domain/asset.go +++ b/internal/domain/asset.go @@ -25,12 +25,14 @@ type AssetRepository interface { Get(ctx context.Context, uuid uuid.UUID) (Asset, io.ReadSeeker, error) Put(ctx context.Context, asset Asset, body io.ReadSeeker) (Asset, error) Delete(ctx context.Context, uuid uuid.UUID) (int64, error) + GenAssetPath(hash string) (string, error) } type AssetUsecase interface { Create(ctx context.Context, author steamid.SteamID, bucket Bucket, fileName string, content io.ReadSeeker) (Asset, error) Get(ctx context.Context, assetID uuid.UUID) (Asset, io.ReadSeeker, error) Delete(ctx context.Context, assetID uuid.UUID) (int64, error) + GenAssetPath(hash string) (string, error) } type UserUploadedFile struct { diff --git a/internal/domain/demo.go b/internal/domain/demo.go index c24589bd..c9ec9f7d 100644 --- a/internal/domain/demo.go +++ b/internal/domain/demo.go @@ -25,6 +25,7 @@ type DemoRepository interface { GetDemoByName(ctx context.Context, demoName string, demoFile *DemoFile) error GetDemos(ctx context.Context) ([]DemoFile, error) SaveDemo(ctx context.Context, demoFile *DemoFile) error + Delete(ctx context.Context, demoID int64) error } type DemoPlayerStats struct { diff --git a/internal/test/demos_test.go b/internal/test/demos_test.go new file mode 100644 index 00000000..ded2a50c --- /dev/null +++ b/internal/test/demos_test.go @@ -0,0 +1,43 @@ +package test_test + +import ( + "context" + "crypto/rand" + "fmt" + "testing" + + "github.com/leighmacdonald/gbans/internal/demo" + "github.com/leighmacdonald/gbans/internal/domain" + "github.com/stretchr/testify/require" +) + +func TestDemosCleanup(t *testing.T) { + ctx := context.Background() + fetcher := demo.NewFetcher(tempDB, configUC, serversUC, assetUC, demoUC) + go demoUC.Start(ctx) + + for demoNum := range 10 { + content := make([]byte, 100000) + _, err := rand.Read(content) + require.NoError(t, err) + + require.NoError(t, fetcher.OnDemoReceived(ctx, demo.UploadedDemo{ + Name: fmt.Sprintf("2023111%d-063943-koth_harvest_final.dem", demoNum), + Server: testServer, + Content: content, + })) + } + + conf := configUC.Config() + conf.Demo.DemoCleanupEnabled = true + conf.Demo.DemoCleanupStrategy = domain.DemoStrategyCount + conf.Demo.DemoCountLimit = 5 + + require.NoError(t, configUC.Write(ctx, conf)) + + demoUC.TriggerCleanup() + + allDemos, err := demoUC.GetDemos(ctx) + require.NoError(t, err) + require.Len(t, len(allDemos), 5) +} diff --git a/internal/test/main_test.go b/internal/test/main_test.go index 190dea21..3bf5893e 100644 --- a/internal/test/main_test.go +++ b/internal/test/main_test.go @@ -53,6 +53,7 @@ import ( var ( dbContainer *postgresContainer + tempDB database.Database testServer domain.Server testBan domain.BannedSteamPerson testTarget domain.Person @@ -178,6 +179,8 @@ func TestMain(m *testing.M) { testServer = server + getOwner() + mod := getModerator() target := getUser() @@ -203,6 +206,7 @@ func TestMain(m *testing.M) { testBan = bannedPerson testTarget = target + tempDB = databaseConn m.Run() }