diff --git a/server/datastore/mysql/activities.go b/server/datastore/mysql/activities.go index 650c097cb985..ccaeb4f16ae0 100644 --- a/server/datastore/mysql/activities.go +++ b/server/datastore/mysql/activities.go @@ -539,35 +539,65 @@ func (ds *Datastore) CleanupActivitiesAndAssociatedData(ctx context.Context, max // `activities` and `queries` are not tied because the activity itself holds // the query SQL so they don't need to be executed on the same transaction. // - if err := ds.withTx(ctx, func(tx sqlx.ExtContext) error { - // Delete temporary queries (aka "not saved"). - if _, err := tx.ExecContext(ctx, - `DELETE FROM queries + // All expired live queries are deleted in batch sizes of `maxCount` to ensure + // the table size is kept in check with high volumes of live queries (zero-trust workflows). + // This differs from the `activities` cleanup which uses maxCount as a limit to + // the number of activities to delete. + // + for { + if ctx.Err() != nil { + return ctx.Err() + } + + var rowsAffected int64 + + // Start a new transaction for each batch of deletions. + err := ds.withTx(ctx, func(tx sqlx.ExtContext) error { + // Delete expired live queries (aka "not saved") + result, err := tx.ExecContext(ctx, + `DELETE FROM queries WHERE NOT saved AND created_at < DATE_SUB(NOW(), INTERVAL ? DAY) LIMIT ?`, - expiredWindowDays, maxCount, - ); err != nil { - return ctxerr.Wrap(ctx, err, "delete expired non-saved queries") - } - // Delete distributed campaigns that reference unexisting query (removed in the previous query). - if _, err := tx.ExecContext(ctx, - `DELETE distributed_query_campaigns FROM distributed_query_campaigns + expiredWindowDays, maxCount, + ) + if err != nil { + return ctxerr.Wrap(ctx, err, "delete expired non-saved queries") + } + + rowsAffected, err = result.RowsAffected() + if err != nil { + return ctxerr.Wrap(ctx, err, "retrieving rows affected from delete query") + } + + // Cleanup orphaned distributed campaigns that reference non-existing queries. + if _, err := tx.ExecContext(ctx, + `DELETE distributed_query_campaigns FROM distributed_query_campaigns LEFT JOIN queries ON (distributed_query_campaigns.query_id=queries.id) WHERE queries.id IS NULL`, - ); err != nil { - return ctxerr.Wrap(ctx, err, "delete expired orphaned distributed_query_campaigns") - } - // Delete distributed campaign targets that reference unexisting distributed campaign (removed in the previous query). - if _, err := tx.ExecContext(ctx, - `DELETE distributed_query_campaign_targets FROM distributed_query_campaign_targets + ); err != nil { + return ctxerr.Wrap(ctx, err, "delete expired orphaned distributed_query_campaigns") + } + + // Cleanup orphaned distributed campaign targets that reference non-existing distributed campaigns. + if _, err := tx.ExecContext(ctx, + `DELETE distributed_query_campaign_targets FROM distributed_query_campaign_targets LEFT JOIN distributed_query_campaigns ON (distributed_query_campaign_targets.distributed_query_campaign_id=distributed_query_campaigns.id) WHERE distributed_query_campaigns.id IS NULL`, - ); err != nil { - return ctxerr.Wrap(ctx, err, "delete expired orphaned distributed_query_campaign_targets") + ); err != nil { + return ctxerr.Wrap(ctx, err, "delete expired orphaned distributed_query_campaign_targets") + } + + return nil + }) + if err != nil { + return ctxerr.Wrap(ctx, err, "delete expired queries in batch") + } + + // Break the loop if no rows were deleted in the current batch. + if rowsAffected == 0 { + break } - return nil - }); err != nil { - return ctxerr.Wrap(ctx, err, "delete expired distributed queries") } + return nil } diff --git a/server/datastore/mysql/activities_test.go b/server/datastore/mysql/activities_test.go index 17bd72e6ee8c..669680127633 100644 --- a/server/datastore/mysql/activities_test.go +++ b/server/datastore/mysql/activities_test.go @@ -991,7 +991,7 @@ func testCleanupActivitiesAndAssociatedDataBatch(t *testing.T, ds *Datastore) { ExecAdhocSQL(t, ds, func(q sqlx.ExtContext) error { return sqlx.GetContext(ctx, q, &queriesLen, `SELECT COUNT(*) FROM queries WHERE NOT saved;`) }) - require.Equal(t, 1000, queriesLen) + require.Equal(t, 250, queriesLen) // All expired queries should be cleaned up. err = ds.CleanupActivitiesAndAssociatedData(ctx, maxCount, 1) require.NoError(t, err) @@ -1002,7 +1002,7 @@ func testCleanupActivitiesAndAssociatedDataBatch(t *testing.T, ds *Datastore) { ExecAdhocSQL(t, ds, func(q sqlx.ExtContext) error { return sqlx.GetContext(ctx, q, &queriesLen, `SELECT COUNT(*) FROM queries WHERE NOT saved;`) }) - require.Equal(t, 500, queriesLen) + require.Equal(t, 250, queriesLen) err = ds.CleanupActivitiesAndAssociatedData(ctx, maxCount, 1) require.NoError(t, err)