Skip to content

Commit

Permalink
Add demo orphan cleanup. Run demo delete call in addition to assets b…
Browse files Browse the repository at this point in the history
…ecause FK doesnt cascade for some reason
  • Loading branch information
leighmacdonald committed Aug 6, 2024
1 parent 7a22a08 commit 7c5d3d0
Show file tree
Hide file tree
Showing 9 changed files with 160 additions and 33 deletions.
12 changes: 6 additions & 6 deletions internal/asset/asset_repository_local.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (l localRepository) Put(ctx context.Context, asset domain.Asset, body io.Re
return domain.Asset{}, errExisting
}

outPath, errOutPath := l.genAssetPath(asset.HashString())
outPath, errOutPath := l.GenAssetPath(asset.HashString())
if errOutPath != nil {
return domain.Asset{}, errOutPath
}
Expand Down Expand Up @@ -84,7 +84,7 @@ func (l localRepository) Delete(ctx context.Context, assetID uuid.UUID) (int64,
return 0, l.db.DBErr(errExec)
}

assetPath, errAssetPath := l.genAssetPath(asset.HashString())
assetPath, errAssetPath := l.GenAssetPath(asset.HashString())
if errAssetPath != nil {
return 0, errAssetPath
}
Expand Down Expand Up @@ -115,7 +115,7 @@ func (l localRepository) Get(ctx context.Context, assetID uuid.UUID) (domain.Ass
return domain.Asset{}, nil, errAsset
}

assetPath, errAssetPath := l.genAssetPath(asset.HashString())
assetPath, errAssetPath := l.GenAssetPath(asset.HashString())
if errAssetPath != nil {
return domain.Asset{}, nil, errAssetPath
}
Expand All @@ -128,7 +128,7 @@ func (l localRepository) Get(ctx context.Context, assetID uuid.UUID) (domain.Ass
return asset, reader, nil
}

func (l localRepository) genAssetPath(hash string) (string, error) {
func (l localRepository) GenAssetPath(hash string) (string, error) {
if len(hash) < 2 {
return "", domain.ErrInvalidParameter
}
Expand Down Expand Up @@ -169,7 +169,7 @@ func (l localRepository) getAssetByUUID(ctx context.Context, assetID uuid.UUID)

asset.AuthorID = steamid.New(authorID)

assetPath, errAssetPath := l.genAssetPath(asset.HashString())
assetPath, errAssetPath := l.GenAssetPath(asset.HashString())
if errAssetPath != nil {
return domain.Asset{}, errAssetPath
}
Expand Down Expand Up @@ -202,7 +202,7 @@ func (l localRepository) getAssetByHash(ctx context.Context, hash []byte) (domai

asset.AuthorID = steamid.New(authorID)

assetPath, errAssetPath := l.genAssetPath(asset.HashString())
assetPath, errAssetPath := l.GenAssetPath(asset.HashString())
if errAssetPath != nil {
return domain.Asset{}, errAssetPath
}
Expand Down
4 changes: 4 additions & 0 deletions internal/asset/asset_usecase.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ func (s assets) Delete(ctx context.Context, assetID uuid.UUID) (int64, error) {
return size, nil
}

func (s assets) GenAssetPath(hash string) (string, error) {
return s.repository.GenAssetPath(hash)
}

func generateFileHash(file io.Reader) ([]byte, error) {
hasher := sha256.New()
if _, err := io.Copy(hasher, file); err != nil {
Expand Down
11 changes: 10 additions & 1 deletion internal/demo/demo_respository.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func (r *demoRepository) ExpiredDemos(ctx context.Context, limit uint64) ([]doma
Select("d.demo_id", "d.title", "d.asset_id").
From("demo d").
LeftJoin("report r on d.demo_id = r.demo_id").
Where("r.demo_id > 0").
Where("d.archive = false").
OrderBy("d.demo_id").
Offset(limit))
if errRow != nil {
Expand Down Expand Up @@ -204,3 +204,12 @@ func (r *demoRepository) updateDemo(ctx context.Context, demoFile *domain.DemoFi

return nil
}

func (r *demoRepository) Delete(ctx context.Context, demoID int64) error {
const query = `DELETE FROM demo WHERE demo_id = $1`
if err := r.db.Exec(ctx, query, demoID); err != nil {
return r.db.DBErr(err)
}

return nil
}
88 changes: 76 additions & 12 deletions internal/demo/demo_usecase.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/gin-gonic/gin"
"github.com/leighmacdonald/gbans/internal/domain"
"github.com/leighmacdonald/gbans/pkg/demoparser"
"github.com/leighmacdonald/gbans/pkg/fs"
"github.com/leighmacdonald/gbans/pkg/log"
"github.com/ricochet2200/go-disk-usage/du"
)
Expand Down Expand Up @@ -69,7 +70,7 @@ func diskPercentageUsed(path string) float32 {
return info.Usage() * 100
}

func (d demoUsecase) truncateBySpace(ctx context.Context, root string, maxAllowedPctUsed float32) (int, int64, error) {
func (d demoUsecase) TruncateBySpace(ctx context.Context, root string, maxAllowedPctUsed float32) (int, int64, error) {
var (
count int
size int64
Expand Down Expand Up @@ -105,7 +106,7 @@ func (d demoUsecase) truncateBySpace(ctx context.Context, root string, maxAllowe
}
}

func (d demoUsecase) truncateByCount(ctx context.Context, maxCount uint64) (int, int64, error) {
func (d demoUsecase) TruncateByCount(ctx context.Context, maxCount uint64) (int, int64, error) {
var (
count int
size int64
Expand All @@ -125,7 +126,7 @@ func (d demoUsecase) truncateByCount(ctx context.Context, maxCount uint64) (int,
}

for _, demo := range expired {
// Dropping asset will cascade to demo
// FIXME cascade delete does not work????
demoSize, errDrop := d.asset.Delete(ctx, demo.AssetID)
if errDrop != nil {
slog.Error("Failed to remove demo asset", log.ErrAttr(errDrop),
Expand All @@ -134,6 +135,13 @@ func (d demoUsecase) truncateByCount(ctx context.Context, maxCount uint64) (int,
continue
}

if err := d.repository.Delete(ctx, demo.DemoID); err != nil {
slog.Error("Failed to remove demo entry",
slog.Int64("demo_id", demo.DemoID),
slog.String("asset_id", demo.AssetID.String()),
log.ErrAttr(err))
}

size += demoSize
count++
}
Expand All @@ -160,9 +168,9 @@ func (d demoUsecase) executeCleanup(ctx context.Context) {

switch conf.Demo.DemoCleanupStrategy {
case domain.DemoStrategyPctFree:
count, size, err = d.truncateBySpace(ctx, conf.Demo.DemoCleanupMount, conf.Demo.DemoCleanupMinPct)
count, size, err = d.TruncateBySpace(ctx, conf.Demo.DemoCleanupMount, conf.Demo.DemoCleanupMinPct)
case domain.DemoStrategyCount:
count, size, err = d.truncateByCount(ctx, conf.Demo.DemoCountLimit)
count, size, err = d.TruncateByCount(ctx, conf.Demo.DemoCountLimit)
}

if err != nil {
Expand All @@ -178,15 +186,24 @@ func (d demoUsecase) TriggerCleanup() {

func (d demoUsecase) Start(ctx context.Context) {
ticker := time.NewTicker(time.Hour)
tickerOrphans := time.NewTicker(time.Hour * 24)

d.executeCleanup(ctx)

if err := d.RemoveOrphans(ctx); err != nil {
slog.Error("Failed to execute orphans", log.ErrAttr(err))
}

for {
select {
case <-ticker.C:
d.cleanupChan <- true
case <-d.cleanupChan:
d.executeCleanup(ctx)
case <-tickerOrphans.C:
if err := d.RemoveOrphans(ctx); err != nil {
slog.Error("Failed to execute orphans", log.ErrAttr(err))
}
case <-ctx.Done():
return
}
Expand Down Expand Up @@ -228,15 +245,20 @@ func (d demoUsecase) CreateFromAsset(ctx context.Context, asset domain.Asset, se
mapName = nameParts[0]
}

var demoInfo demoparser.DemoInfo
if errParse := demoparser.Parse(ctx, asset.LocalPath, &demoInfo); errParse != nil {
return nil, errParse
}

intStats := map[string]gin.H{}

for _, steamID := range demoInfo.SteamIDs() {
intStats[steamID.String()] = gin.H{}
// temp thing until proper demo parsing is implemented
if d.config.Config().General.Mode != domain.TestMode {
var demoInfo demoparser.DemoInfo
if errParse := demoparser.Parse(ctx, asset.LocalPath, &demoInfo); errParse != nil {
return nil, errParse
}

for _, steamID := range demoInfo.SteamIDs() {
intStats[steamID.String()] = gin.H{}
}
} else {
intStats[d.config.Config().Owner] = gin.H{}
}

timeStr := fmt.Sprintf("%s-%s", namePartsAll[0], namePartsAll[1])
Expand Down Expand Up @@ -265,3 +287,45 @@ func (d demoUsecase) CreateFromAsset(ctx context.Context, asset domain.Asset, se

return &newDemo, nil
}

func (d demoUsecase) RemoveOrphans(ctx context.Context) error {
demos, errDemos := d.GetDemos(ctx)
if errDemos != nil {
return errDemos
}

for _, demo := range demos {
var remove bool
asset, _, errAsset := d.asset.Get(ctx, demo.AssetID)
if errAsset != nil {
if errors.Is(errAsset, domain.ErrNoResult) {
remove = true
} else {
return errAsset
}
} else {
localPath, errPath := d.asset.GenAssetPath(asset.HashString())
if errPath != nil {
return errPath
}

remove = !fs.Exists(localPath)
}

if !remove {
continue
}

if _, err := d.asset.Delete(ctx, asset.AssetID); err != nil {
slog.Error("Failed to remove orphan demo asset", log.ErrAttr(err))
}

if err := d.repository.Delete(ctx, demo.DemoID); err != nil {
slog.Error("Failed to remove orphan demo entry", log.ErrAttr(err))
}

slog.Info("Removed orphan demo file", slog.String("filename", demo.Title))
}

return nil
}
28 changes: 14 additions & 14 deletions internal/demo/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ import (
"github.com/viant/afs/storage"
)

type demoUpdate struct {
name string
server domain.Server
content []byte
type UploadedDemo struct {
Name string
Server domain.Server
Content []byte
}

type Fetcher struct {
Expand All @@ -31,7 +31,7 @@ type Fetcher struct {
configUsecase domain.ConfigUsecase
assetUsecase domain.AssetUsecase
demoUsecase domain.DemoUsecase
demoChan chan demoUpdate
demoChan chan UploadedDemo
}

func NewFetcher(database database.Database, configUsecase domain.ConfigUsecase, serversUsecase domain.ServersUsecase,
Expand All @@ -43,7 +43,7 @@ func NewFetcher(database database.Database, configUsecase domain.ConfigUsecase,
serversUsecase: serversUsecase,
assetUsecase: assetUsecase,
demoUsecase: demoUsecase,
demoChan: make(chan demoUpdate),
demoChan: make(chan UploadedDemo),
}
}

Expand All @@ -52,18 +52,18 @@ func (d Fetcher) Start(ctx context.Context) {
sshExec.Start(ctx)
}

func (d Fetcher) onDemoReceived(ctx context.Context, demo demoUpdate) error {
func (d Fetcher) OnDemoReceived(ctx context.Context, demo UploadedDemo) error {
slog.Debug("Got new demo",
slog.String("server", demo.server.ShortName),
slog.String("name", demo.name))
slog.String("server", demo.Server.ShortName),
slog.String("name", demo.Name))

demoAsset, errNewAsset := d.assetUsecase.Create(ctx, steamid.SteamID{},
domain.BucketDemo, demo.name, bytes.NewReader(demo.content))
demoAsset, errNewAsset := d.assetUsecase.Create(ctx, steamid.New(d.configUsecase.Config().Owner),
domain.BucketDemo, demo.Name, bytes.NewReader(demo.Content))
if errNewAsset != nil {
return errNewAsset
}

_, errDemo := d.demoUsecase.CreateFromAsset(ctx, demoAsset, demo.server.ServerID)
_, errDemo := d.demoUsecase.CreateFromAsset(ctx, demoAsset, demo.Server.ServerID)
if errDemo != nil {
// Cleanup the asset not attached to a demo
if _, errDelete := d.assetUsecase.Delete(ctx, demoAsset.AssetID); errDelete != nil {
Expand Down Expand Up @@ -120,9 +120,9 @@ func (d Fetcher) OnClientConnect(ctx context.Context, client storage.Storager, s
_ = reader.Close()

// need Seeker, but afs does not provide
demo := demoUpdate{name: file.Name(), server: server, content: data}
demo := UploadedDemo{Name: file.Name(), Server: server, Content: data}

if errDemo := d.onDemoReceived(ctx, demo); errDemo != nil {
if errDemo := d.OnDemoReceived(ctx, demo); errDemo != nil {
if !errors.Is(errDemo, domain.ErrAssetTooLarge) {
slog.Error("Failed to create new demo asset", log.ErrAttr(errDemo))

Expand Down
2 changes: 2 additions & 0 deletions internal/domain/asset.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@ type AssetRepository interface {
Get(ctx context.Context, uuid uuid.UUID) (Asset, io.ReadSeeker, error)
Put(ctx context.Context, asset Asset, body io.ReadSeeker) (Asset, error)
Delete(ctx context.Context, uuid uuid.UUID) (int64, error)
GenAssetPath(hash string) (string, error)
}

type AssetUsecase interface {
Create(ctx context.Context, author steamid.SteamID, bucket Bucket, fileName string, content io.ReadSeeker) (Asset, error)
Get(ctx context.Context, assetID uuid.UUID) (Asset, io.ReadSeeker, error)
Delete(ctx context.Context, assetID uuid.UUID) (int64, error)
GenAssetPath(hash string) (string, error)
}

type UserUploadedFile struct {
Expand Down
1 change: 1 addition & 0 deletions internal/domain/demo.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type DemoRepository interface {
GetDemoByName(ctx context.Context, demoName string, demoFile *DemoFile) error
GetDemos(ctx context.Context) ([]DemoFile, error)
SaveDemo(ctx context.Context, demoFile *DemoFile) error
Delete(ctx context.Context, demoID int64) error
}

type DemoPlayerStats struct {
Expand Down
43 changes: 43 additions & 0 deletions internal/test/demos_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package test_test

import (
"context"
"crypto/rand"
"fmt"
"testing"

"github.com/leighmacdonald/gbans/internal/demo"
"github.com/leighmacdonald/gbans/internal/domain"
"github.com/stretchr/testify/require"
)

func TestDemosCleanup(t *testing.T) {
ctx := context.Background()
fetcher := demo.NewFetcher(tempDB, configUC, serversUC, assetUC, demoUC)
go demoUC.Start(ctx)

for demoNum := range 10 {
content := make([]byte, 100000)
_, err := rand.Read(content)
require.NoError(t, err)

require.NoError(t, fetcher.OnDemoReceived(ctx, demo.UploadedDemo{
Name: fmt.Sprintf("2023111%d-063943-koth_harvest_final.dem", demoNum),
Server: testServer,
Content: content,
}))
}

conf := configUC.Config()
conf.Demo.DemoCleanupEnabled = true
conf.Demo.DemoCleanupStrategy = domain.DemoStrategyCount
conf.Demo.DemoCountLimit = 5

require.NoError(t, configUC.Write(ctx, conf))

demoUC.TriggerCleanup()

allDemos, err := demoUC.GetDemos(ctx)
require.NoError(t, err)
require.Len(t, len(allDemos), 5)
}
Loading

0 comments on commit 7c5d3d0

Please sign in to comment.