From afb2fb07f1a36b63b7e6462d682e66ea7135b374 Mon Sep 17 00:00:00 2001 From: balaji Date: Thu, 18 Apr 2024 09:17:27 +0530 Subject: [PATCH 1/4] use bloom filter on get calls --- table.go | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/table.go b/table.go index dd0f4c4..7064c09 100644 --- a/table.go +++ b/table.go @@ -895,14 +895,11 @@ func (t *_table[T]) Get(ctx context.Context, sel Selector[T], optBatch ...Batch) err := batched[T](selPoints, t.scanPrefetchSize, func(selPoints []T) error { keys := t.keysExternal(selPoints, keyArray) - order := t.sortKeys(keys) - values, err := t.get(keys, batch, valueArray, false) if err != nil { return err } - t.reorderValues(values, order) for _, value := range values { if len(value) == 0 { trs = append(trs, t.valueNil) @@ -958,6 +955,15 @@ func (t *_table[T]) get(keys [][]byte, batch Batch, values [][]byte, errorOnNotE defer iter.Close() for i := 0; i < len(keys); i++ { + if t.filter != nil && !t.filter.MayContain(context.Background(), keys[i]) { + if errorOnNotExist { + return nil, ErrNotFound + } else { + values[i] = values[i][:0] + continue + } + } + if !iter.SeekGE(keys[i]) || !bytes.Equal(iter.Key(), keys[i]) { if errorOnNotExist { return nil, ErrNotFound From c326b8d33bea2b18b600232e3ea415648272eb4f Mon Sep 17 00:00:00 2001 From: balaji Date: Thu, 18 Apr 2024 15:17:44 +0530 Subject: [PATCH 2/4] add unsafe insert --- table_unsafe.go | 100 +++++++++++++++++++++++++++++++++++++++++++ table_unsafe_test.go | 48 +++++++++++++++++++++ 2 files changed, 148 insertions(+) diff --git a/table_unsafe.go b/table_unsafe.go index 8a2f8fa..e6ba916 100644 --- a/table_unsafe.go +++ b/table_unsafe.go @@ -101,3 +101,103 @@ func (t *_table[T]) UnsafeUpdate(ctx context.Context, trs []T, oldTrs []T, optBa return nil } + +// TableUnsafeInserter provides access to UnsafeInsert method that allows to insert +// records wihout checking if they already exist in the database. + +// Warning: The indices of the records won't be updated properly if the records already exist. +type TableUnsafeInserter[T any] interface { + UnsafeInsert(ctx context.Context, trs []T, optBatch ...Batch) error +} + +func (t *_table[T]) UnsafeInsert(ctx context.Context, trs []T, optBatch ...Batch) error { + t.mutex.RLock() + indexes := make(map[IndexID]*Index[T]) + maps.Copy(indexes, t.secondaryIndexes) + t.mutex.RUnlock() + + var ( + batch Batch + externalBatch = len(optBatch) > 0 && optBatch[0] != nil + ) + + if externalBatch { + batch = optBatch[0] + } else { + batch = t.db.Batch(BatchTypeWriteOnly) + defer batch.Close() + } + + var ( + indexKeyBuffer = t.db.getKeyBufferPool().Get()[:0] + ) + defer t.db.getKeyBufferPool().Put(indexKeyBuffer[:0]) + + // key buffers + keysBuffer := t.db.getKeyArray(minInt(len(trs), persistentBatchSize)) + defer t.db.putKeyArray(keysBuffer) + + // value + value := t.db.getValueBufferPool().Get()[:0] + valueBuffer := bytes.NewBuffer(value) + defer t.db.getValueBufferPool().Put(value[:0]) + + // serializer + var serialize = t.serializer.Serializer.Serialize + if sw, ok := t.serializer.Serializer.(SerializerWithBuffer[any]); ok { + serialize = sw.SerializeFuncWithBuffer(valueBuffer) + } + + err := batched[T](trs, persistentBatchSize, func(trs []T) error { + // keys + keys := t.keysExternal(trs, keysBuffer) + + // process rows + for i, key := range keys { + select { + case <-ctx.Done(): + return fmt.Errorf("context done: %w", ctx.Err()) + default: + } + + tr := trs[i] + // serialize + data, err := serialize(&tr) + if err != nil { + return err + } + + err = batch.Set(key, data, Sync) + if err != nil { + return err + } + + // index keys + for _, idx := range indexes { + err = idx.OnInsert(t, tr, batch, indexKeyBuffer[:0]) + if err != nil { + return err + } + } + + // add to bloom filter + if t.filter != nil { + t.filter.Add(ctx, key) + } + } + + return nil + }) + if err != nil { + return err + } + + if !externalBatch { + err = batch.Commit(Sync) + if err != nil { + return err + } + } + + return nil +} diff --git a/table_unsafe_test.go b/table_unsafe_test.go index 8b52cbf..ecef5b6 100644 --- a/table_unsafe_test.go +++ b/table_unsafe_test.go @@ -89,3 +89,51 @@ func TestBondTable_UnsafeUpdate(t *testing.T) { _ = it.Close() } + +func TestUnsafeInsert(t *testing.T) { + db := setupDatabase() + defer tearDownDatabase(db) + + const ( + TokenBalanceTableID = TableID(1) + ) + + tokenBalanceTable := NewTable[*TokenBalance](TableOptions[*TokenBalance]{ + DB: db, + TableID: TokenBalanceTableID, + TableName: "token_balance", + TablePrimaryKeyFunc: func(builder KeyBuilder, tb *TokenBalance) []byte { + return builder.AddUint64Field(tb.ID).Bytes() + }, + }) + + tokenBalanceAccount := &TokenBalance{ + ID: 1, + AccountID: 1, + ContractAddress: "0xtestContract", + AccountAddress: "0xtestAccount", + Balance: 5, + } + + tableUnsafeInserter := tokenBalanceTable.(TableUnsafeInserter[*TokenBalance]) + err := tableUnsafeInserter.UnsafeInsert(context.Background(), []*TokenBalance{tokenBalanceAccount}) + require.NoError(t, err) + + it, err := db.Backend().NewIter(&pebble.IterOptions{ + LowerBound: []byte{byte(TokenBalanceTableID)}, + UpperBound: []byte{byte(TokenBalanceTableID + 1)}, + }) + require.NoError(t, err) + + for it.First(); it.Valid(); it.Next() { + rawData := it.Value() + + var tokenBalanceAccount1FromDB TokenBalance + err = db.Serializer().Deserialize(rawData, &tokenBalanceAccount1FromDB) + require.NoError(t, err) + assert.Equal(t, tokenBalanceAccount, &tokenBalanceAccount1FromDB) + } + + _ = it.Close() + +} From f9a080f30dcaaa8c23948bd2c32fc5c5e5561b0d Mon Sep 17 00:00:00 2001 From: balaji Date: Thu, 18 Apr 2024 15:18:36 +0530 Subject: [PATCH 3/4] minor --- table_unsafe_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/table_unsafe_test.go b/table_unsafe_test.go index ecef5b6..0025608 100644 --- a/table_unsafe_test.go +++ b/table_unsafe_test.go @@ -90,7 +90,7 @@ func TestBondTable_UnsafeUpdate(t *testing.T) { _ = it.Close() } -func TestUnsafeInsert(t *testing.T) { +func TestBondTable_UnsafeInsert(t *testing.T) { db := setupDatabase() defer tearDownDatabase(db) From b95f6f4d3bede403e59c82697d8b5f6d582c4e2c Mon Sep 17 00:00:00 2001 From: Marcin Gorzynski Date: Thu, 18 Apr 2024 12:54:36 +0200 Subject: [PATCH 4/4] Table.Get points always in single batch --- table.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/table.go b/table.go index 7064c09..2a25b0a 100644 --- a/table.go +++ b/table.go @@ -892,7 +892,7 @@ func (t *_table[T]) Get(ctx context.Context, sel Selector[T], optBatch ...Batch) defer t.db.putKeyArray(valueArray) var trs []T - err := batched[T](selPoints, t.scanPrefetchSize, func(selPoints []T) error { + err := batched[T](selPoints, len(selPoints), func(selPoints []T) error { keys := t.keysExternal(selPoints, keyArray) values, err := t.get(keys, batch, valueArray, false)