Skip to content

Commit

Permalink
cleanup queries
Browse files Browse the repository at this point in the history
  • Loading branch information
mostlikelee committed Sep 26, 2024
1 parent 3ca1f5b commit 93b2069
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 24 deletions.
74 changes: 52 additions & 22 deletions server/datastore/mysql/activities.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,35 +539,65 @@ func (ds *Datastore) CleanupActivitiesAndAssociatedData(ctx context.Context, max
// `activities` and `queries` are not tied because the activity itself holds
// the query SQL so they don't need to be executed on the same transaction.
//
if err := ds.withTx(ctx, func(tx sqlx.ExtContext) error {
// Delete temporary queries (aka "not saved").
if _, err := tx.ExecContext(ctx,
`DELETE FROM queries
// All expired live queries are deleted in batch sizes of `maxCount` to ensure
// the table size is kept in check with high volumes of live queries (zero-trust workflows).
// This differs from the `activities` cleanup which uses maxCount as a limit to
// the number of activities to delete.
//
for {
if ctx.Err() != nil {
return ctx.Err()
}

var rowsAffected int64

// Start a new transaction for each batch of deletions.
err := ds.withTx(ctx, func(tx sqlx.ExtContext) error {
// Delete expired live queries (aka "not saved")
result, err := tx.ExecContext(ctx,
`DELETE FROM queries
WHERE NOT saved AND created_at < DATE_SUB(NOW(), INTERVAL ? DAY)
LIMIT ?`,
expiredWindowDays, maxCount,
); err != nil {
return ctxerr.Wrap(ctx, err, "delete expired non-saved queries")
}
// Delete distributed campaigns that reference unexisting query (removed in the previous query).
if _, err := tx.ExecContext(ctx,
`DELETE distributed_query_campaigns FROM distributed_query_campaigns
expiredWindowDays, maxCount,
)
if err != nil {
return ctxerr.Wrap(ctx, err, "delete expired non-saved queries")
}

rowsAffected, err = result.RowsAffected()
if err != nil {
return ctxerr.Wrap(ctx, err, "retrieving rows affected from delete query")
}

// Cleanup orphaned distributed campaigns that reference non-existing queries.
if _, err := tx.ExecContext(ctx,
`DELETE distributed_query_campaigns FROM distributed_query_campaigns
LEFT JOIN queries ON (distributed_query_campaigns.query_id=queries.id)
WHERE queries.id IS NULL`,
); err != nil {
return ctxerr.Wrap(ctx, err, "delete expired orphaned distributed_query_campaigns")
}
// Delete distributed campaign targets that reference unexisting distributed campaign (removed in the previous query).
if _, err := tx.ExecContext(ctx,
`DELETE distributed_query_campaign_targets FROM distributed_query_campaign_targets
); err != nil {
return ctxerr.Wrap(ctx, err, "delete expired orphaned distributed_query_campaigns")
}

// Cleanup orphaned distributed campaign targets that reference non-existing distributed campaigns.
if _, err := tx.ExecContext(ctx,
`DELETE distributed_query_campaign_targets FROM distributed_query_campaign_targets
LEFT JOIN distributed_query_campaigns ON (distributed_query_campaign_targets.distributed_query_campaign_id=distributed_query_campaigns.id)
WHERE distributed_query_campaigns.id IS NULL`,
); err != nil {
return ctxerr.Wrap(ctx, err, "delete expired orphaned distributed_query_campaign_targets")
); err != nil {
return ctxerr.Wrap(ctx, err, "delete expired orphaned distributed_query_campaign_targets")
}

return nil
})
if err != nil {
return ctxerr.Wrap(ctx, err, "delete expired queries in batch")
}

// Break the loop if no rows were deleted in the current batch.
if rowsAffected == 0 {
break
}
return nil
}); err != nil {
return ctxerr.Wrap(ctx, err, "delete expired distributed queries")
}

return nil
}
4 changes: 2 additions & 2 deletions server/datastore/mysql/activities_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -991,7 +991,7 @@ func testCleanupActivitiesAndAssociatedDataBatch(t *testing.T, ds *Datastore) {
ExecAdhocSQL(t, ds, func(q sqlx.ExtContext) error {
return sqlx.GetContext(ctx, q, &queriesLen, `SELECT COUNT(*) FROM queries WHERE NOT saved;`)
})
require.Equal(t, 1000, queriesLen)
require.Equal(t, 250, queriesLen) // All expired queries should be cleaned up.

err = ds.CleanupActivitiesAndAssociatedData(ctx, maxCount, 1)
require.NoError(t, err)
Expand All @@ -1002,7 +1002,7 @@ func testCleanupActivitiesAndAssociatedDataBatch(t *testing.T, ds *Datastore) {
ExecAdhocSQL(t, ds, func(q sqlx.ExtContext) error {
return sqlx.GetContext(ctx, q, &queriesLen, `SELECT COUNT(*) FROM queries WHERE NOT saved;`)
})
require.Equal(t, 500, queriesLen)
require.Equal(t, 250, queriesLen)

err = ds.CleanupActivitiesAndAssociatedData(ctx, maxCount, 1)
require.NoError(t, err)
Expand Down

0 comments on commit 93b2069

Please sign in to comment.