From b603742a2cec5ca4e8afadbd19bdf0082a2f38e4 Mon Sep 17 00:00:00 2001 From: ClaytonNorthey92 Date: Thu, 5 Sep 2024 19:27:54 -0400 Subject: [PATCH] add new index, trim old queue messages (#235) added a new index to the db trimming old queue messages that are no longer of value (anything created over an hour ago) --- database/bfgd/database.go | 1 + database/bfgd/database_ext_test.go | 76 ++++++++++++++++++++++++++++++ database/bfgd/postgres/postgres.go | 18 ++++++- database/bfgd/scripts/0010.sql | 12 +++++ service/bfg/bfg.go | 16 +++++++ 5 files changed, 122 insertions(+), 1 deletion(-) create mode 100644 database/bfgd/scripts/0010.sql diff --git a/database/bfgd/database.go b/database/bfgd/database.go index c805b51b..11df3992 100644 --- a/database/bfgd/database.go +++ b/database/bfgd/database.go @@ -46,6 +46,7 @@ type Database interface { BtcTransactionBroadcastRequestGetNext(ctx context.Context, onlyNew bool) ([]byte, error) BtcTransactionBroadcastRequestConfirmBroadcast(ctx context.Context, txId string) error BtcTransactionBroadcastRequestSetLastError(ctx context.Context, txId string, lastErr string) error + BtcTransactionBroadcastRequestTrim(ctx context.Context) error } // NotificationName identifies a database notification type. diff --git a/database/bfgd/database_ext_test.go b/database/bfgd/database_ext_test.go index 86bf47f5..bf3d1fcd 100644 --- a/database/bfgd/database_ext_test.go +++ b/database/bfgd/database_ext_test.go @@ -2138,6 +2138,82 @@ func TestBtcTransactionBroadcastRequestConfirmBroadcast(t *testing.T) { } } +func BtcTransactionBroadcastRequestTrimTooNew(t *testing.T) { + ctx, cancel := defaultTestContext() + defer cancel() + + db, sdb, cleanup := createTestDB(ctx, t) + defer func() { + db.Close() + sdb.Close() + cleanup() + }() + + serializedTx := []byte("blahblahblah") + txId := "myid" + + err := db.BtcTransactionBroadcastRequestInsert(ctx, serializedTx, txId) + if err != nil { + t.Fatal(err) + } + + _, err = sdb.ExecContext(ctx, "UPDATE btc_transaction_broadcast_request SET created_at = NOW() - INTERVAL '59 minutes'") + if err != nil { + t.Fatal(err) + } + + if err := db.BtcTransactionBroadcastRequestTrim(ctx); err != nil { + t.Fatal(err) + } + + savedSerializedTx, err := db.BtcTransactionBroadcastRequestGetNext(ctx, true) + if err != nil { + t.Fatal(err) + } + + if savedSerializedTx == nil { + t.Fatal("expected a saved tx") + } +} + +func BtcTransactionBroadcastRequestTrim(t *testing.T) { + ctx, cancel := defaultTestContext() + defer cancel() + + db, sdb, cleanup := createTestDB(ctx, t) + defer func() { + db.Close() + sdb.Close() + cleanup() + }() + + serializedTx := []byte("blahblahblah") + txId := "myid" + + err := db.BtcTransactionBroadcastRequestInsert(ctx, serializedTx, txId) + if err != nil { + t.Fatal(err) + } + + _, err = sdb.ExecContext(ctx, "UPDATE btc_transaction_broadcast_request SET created_at = NOW() - INTERVAL '61 minutes'") + if err != nil { + t.Fatal(err) + } + + if err := db.BtcTransactionBroadcastRequestTrim(ctx); err != nil { + t.Fatal(err) + } + + savedSerializedTx, err := db.BtcTransactionBroadcastRequestGetNext(ctx, true) + if err != nil { + t.Fatal(err) + } + + if savedSerializedTx != nil { + t.Fatal("tx should have been trimmed") + } +} + func createBtcBlock(ctx context.Context, t *testing.T, db bfgd.Database, count int, chain bool, height int, lastHash []byte, l2BlockNumber uint32) bfgd.BtcBlock { header := make([]byte, 80) hash := make([]byte, 32) diff --git a/database/bfgd/postgres/postgres.go b/database/bfgd/postgres/postgres.go index e10f77cd..f3c3ffcd 100644 --- a/database/bfgd/postgres/postgres.go +++ b/database/bfgd/postgres/postgres.go @@ -20,7 +20,7 @@ import ( ) const ( - bfgdVersion = 9 + bfgdVersion = 10 logLevel = "INFO" verbose = false @@ -1151,3 +1151,19 @@ func (p *pgdb) BtcTransactionBroadcastRequestSetLastError(ctx context.Context, t return nil } + +func (p *pgdb) BtcTransactionBroadcastRequestTrim(ctx context.Context) error { + log.Tracef("BtcTransactionBroadcastRequestSetLastError") + defer log.Tracef("BtcTransactionBroadcastRequestSetLastError exit") + + const querySql = ` + DELETE FROM btc_transaction_broadcast_request + WHERE created_at < NOW() - INTERVAL '1 hour' + ` + _, err := p.db.ExecContext(ctx, querySql) + if err != nil { + return fmt.Errorf("could not trim broadcast: %v", err) + } + + return nil +} diff --git a/database/bfgd/scripts/0010.sql b/database/bfgd/scripts/0010.sql new file mode 100644 index 00000000..6f435acc --- /dev/null +++ b/database/bfgd/scripts/0010.sql @@ -0,0 +1,12 @@ +-- Copyright (c) 2024 Hemi Labs, Inc. +-- Use of this source code is governed by the MIT License, +-- which can be found in the LICENSE file. + +BEGIN; + +UPDATE version SET version = 10; + +CREATE INDEX IF NOT EXISTS btc_transaction_broadcast_request_created_at_retry_desc + ON btc_transaction_broadcast_request (last_broadcast_attempt_at, created_at DESC) WHERE next_broadcast_attempt_at IS NOT NULL AND broadcast_at IS NULL; + +COMMIT; diff --git a/service/bfg/bfg.go b/service/bfg/bfg.go index 02744494..1ceed993 100644 --- a/service/bfg/bfg.go +++ b/service/bfg/bfg.go @@ -1647,6 +1647,22 @@ func (s *Server) Run(pctx context.Context) error { } }() + s.wg.Add(1) + go func() { + defer s.wg.Done() + for { + select { + case <-ctx.Done(): + return + case <-time.After(5 * time.Minute): + if err := s.db.BtcTransactionBroadcastRequestTrim(ctx); err != nil { + log.Errorf("error trimming old requests: %v", err) + } + + } + } + }() + // Setup websockets and HTTP routes privateMux := s.server publicMux := s.publicServer