use bloom filter on get calls #135

Merged · 4 commits · Apr 19, 2024
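For context before the diff: the change makes point lookups consult the table's bloom filter first, so keys the filter reports as definitely absent never reach the iterator seek. Below is a minimal, self-contained sketch of that idea; the names (filter, store, get, setFilter) are illustrative stand-ins, not the library's API.

package main

import "fmt"

// filter abstracts the membership check; MayContain may return false
// positives but never false negatives.
type filter interface {
	MayContain(key []byte) bool
}

// store stands in for the key-value backend.
type store map[string][]byte

// get consults the filter first and only touches the store when the key
// may exist, mirroring the shape of the change to table.go below.
func get(s store, f filter, key []byte) ([]byte, bool) {
	if f != nil && !f.MayContain(key) {
		return nil, false // definitely absent: skip the lookup entirely
	}
	v, ok := s[string(key)]
	return v, ok
}

// setFilter is an exact set, used only to keep the sketch runnable.
type setFilter map[string]struct{}

func (sf setFilter) MayContain(key []byte) bool {
	_, ok := sf[string(key)]
	return ok
}

func main() {
	s := store{"a": []byte("1")}
	f := setFilter{"a": {}}

	if v, ok := get(s, f, []byte("a")); ok {
		fmt.Println("hit:", string(v))
	}
	if _, ok := get(s, f, []byte("b")); !ok {
		fmt.Println("miss: the filter short-circuited the lookup")
	}
}

A negative answer from the filter is authoritative (the key was never added), while a positive answer only means the key may exist and still goes through the normal lookup.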
table.go: 14 changes (10 additions, 4 deletions)
@@ -892,17 +892,14 @@ func (t *_table[T]) Get(ctx context.Context, sel Selector[T], optBatch ...Batch)
 	defer t.db.putKeyArray(valueArray)
 
 	var trs []T
-	err := batched[T](selPoints, t.scanPrefetchSize, func(selPoints []T) error {
+	err := batched[T](selPoints, len(selPoints), func(selPoints []T) error {
 		keys := t.keysExternal(selPoints, keyArray)
 
-		order := t.sortKeys(keys)
-
 		values, err := t.get(keys, batch, valueArray, false)
 		if err != nil {
 			return err
 		}
 
-		t.reorderValues(values, order)
 		for _, value := range values {
 			if len(value) == 0 {
 				trs = append(trs, t.valueNil)
@@ -958,6 +955,15 @@ func (t *_table[T]) get(keys [][]byte, batch Batch, values [][]byte, errorOnNotE
 	defer iter.Close()
 
 	for i := 0; i < len(keys); i++ {
+		if t.filter != nil && !t.filter.MayContain(context.Background(), keys[i]) {
+			if errorOnNotExist {
+				return nil, ErrNotFound
+			} else {
+				values[i] = values[i][:0]
+				continue
+			}
+		}
+
 		if !iter.SeekGE(keys[i]) || !bytes.Equal(iter.Key(), keys[i]) {
 			if errorOnNotExist {
 				return nil, ErrNotFound
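The pre-check added to t.get above is safe because a bloom filter can return false positives but never false negatives: every key that was added will later report MayContain as true, so skipping the seek on a negative answer cannot hide existing rows, and a false positive simply falls through to the existing SeekGE path. Note also that the new branch mirrors the existing not-found handling: ErrNotFound when errorOnNotExist is set, otherwise an empty value slot. A toy filter illustrating the no-false-negatives property follows; the hash choices and bit-set size are arbitrary assumptions for illustration and not the filter implementation the library uses.

package main

import (
	"fmt"
	"hash/fnv"
)

// toyBloom is a deliberately tiny bloom filter: two FNV hashes over a
// fixed bit set.
type toyBloom struct {
	bits [4096]bool
}

func (b *toyBloom) positions(key []byte) [2]uint32 {
	h1 := fnv.New32a()
	h1.Write(key)
	h2 := fnv.New32()
	h2.Write(key)
	n := uint32(len(b.bits))
	return [2]uint32{h1.Sum32() % n, h2.Sum32() % n}
}

// Add sets every bit position derived from the key.
func (b *toyBloom) Add(key []byte) {
	for _, p := range b.positions(key) {
		b.bits[p] = true
	}
}

// MayContain returns false only if the key was certainly never Added.
func (b *toyBloom) MayContain(key []byte) bool {
	for _, p := range b.positions(key) {
		if !b.bits[p] {
			return false
		}
	}
	return true
}

func main() {
	var f toyBloom
	f.Add([]byte("existing-key"))
	fmt.Println(f.MayContain([]byte("existing-key"))) // always true: no false negatives
	fmt.Println(f.MayContain([]byte("missing-key")))  // usually false; true would just cause a harmless extra seek
}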
table_unsafe.go: 100 changes (100 additions, 0 deletions)
@@ -101,3 +101,103 @@ func (t *_table[T]) UnsafeUpdate(ctx context.Context, trs []T, oldTrs []T, optBa

return nil
}

// TableUnsafeInserter provides access to the UnsafeInsert method, which inserts
// records without checking whether they already exist in the database.
//
// Warning: The indices of the records won't be updated properly if the records already exist.
type TableUnsafeInserter[T any] interface {
UnsafeInsert(ctx context.Context, trs []T, optBatch ...Batch) error
}

func (t *_table[T]) UnsafeInsert(ctx context.Context, trs []T, optBatch ...Batch) error {
t.mutex.RLock()
indexes := make(map[IndexID]*Index[T])
maps.Copy(indexes, t.secondaryIndexes)
t.mutex.RUnlock()

var (
batch Batch
externalBatch = len(optBatch) > 0 && optBatch[0] != nil
)

if externalBatch {
batch = optBatch[0]
} else {
batch = t.db.Batch(BatchTypeWriteOnly)
defer batch.Close()
}

var (
indexKeyBuffer = t.db.getKeyBufferPool().Get()[:0]
)
defer t.db.getKeyBufferPool().Put(indexKeyBuffer[:0])

// key buffers
keysBuffer := t.db.getKeyArray(minInt(len(trs), persistentBatchSize))
defer t.db.putKeyArray(keysBuffer)

// value
value := t.db.getValueBufferPool().Get()[:0]
valueBuffer := bytes.NewBuffer(value)
defer t.db.getValueBufferPool().Put(value[:0])

// serializer
var serialize = t.serializer.Serializer.Serialize
if sw, ok := t.serializer.Serializer.(SerializerWithBuffer[any]); ok {
serialize = sw.SerializeFuncWithBuffer(valueBuffer)
}

err := batched[T](trs, persistentBatchSize, func(trs []T) error {
// keys
keys := t.keysExternal(trs, keysBuffer)

// process rows
for i, key := range keys {
select {
case <-ctx.Done():
return fmt.Errorf("context done: %w", ctx.Err())
default:
}

tr := trs[i]
// serialize
data, err := serialize(&tr)
if err != nil {
return err
}

err = batch.Set(key, data, Sync)
if err != nil {
return err
}

// index keys
for _, idx := range indexes {
err = idx.OnInsert(t, tr, batch, indexKeyBuffer[:0])
if err != nil {
return err
}
}

// add to bloom filter
if t.filter != nil {
t.filter.Add(ctx, key)
}
}

return nil
})
if err != nil {
return err
}

if !externalBatch {
err = batch.Commit(Sync)
if err != nil {
return err
}
}

return nil
}
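Note the t.filter.Add(ctx, key) call at the end of the per-row loop above: once reads short-circuit on a negative filter answer, every write path, including this unsafe one, has to keep the filter in sync, otherwise later Get calls would report existing rows as missing. A minimal sketch of that invariant follows; the names are illustrative, and the exact-set filter is a stand-in since false positives are irrelevant to the point.

package main

import "fmt"

// bloom is a stand-in membership set; a real bloom filter would also allow
// false positives, which does not affect the invariant shown here.
type bloom map[string]bool

func (b bloom) Add(key string)             { b[key] = true }
func (b bloom) MayContain(key string) bool { return b[key] }

type table struct {
	data   map[string]string
	filter bloom
}

func (t *table) insert(key, value string, updateFilter bool) {
	t.data[key] = value
	if updateFilter {
		t.filter.Add(key) // the step UnsafeInsert performs for every key it writes
	}
}

func (t *table) get(key string) (string, bool) {
	if !t.filter.MayContain(key) { // filtered read path, as in t.get in table.go
		return "", false
	}
	v, ok := t.data[key]
	return v, ok
}

func main() {
	tbl := &table{data: map[string]string{}, filter: bloom{}}

	tbl.insert("a", "1", true)
	fmt.Println(tbl.get("a")) // 1 true

	tbl.insert("b", "2", false) // a hypothetical write path that forgets the filter...
	fmt.Println(tbl.get("b"))   // "" false: the row exists but the filtered read skips it
}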
table_unsafe_test.go: 48 changes (48 additions, 0 deletions)
@@ -89,3 +89,51 @@ func TestBondTable_UnsafeUpdate(t *testing.T) {

_ = it.Close()
}

func TestBondTable_UnsafeInsert(t *testing.T) {
db := setupDatabase()
defer tearDownDatabase(db)

const (
TokenBalanceTableID = TableID(1)
)

tokenBalanceTable := NewTable[*TokenBalance](TableOptions[*TokenBalance]{
DB: db,
TableID: TokenBalanceTableID,
TableName: "token_balance",
TablePrimaryKeyFunc: func(builder KeyBuilder, tb *TokenBalance) []byte {
return builder.AddUint64Field(tb.ID).Bytes()
},
})

tokenBalanceAccount := &TokenBalance{
ID: 1,
AccountID: 1,
ContractAddress: "0xtestContract",
AccountAddress: "0xtestAccount",
Balance: 5,
}

tableUnsafeInserter := tokenBalanceTable.(TableUnsafeInserter[*TokenBalance])
err := tableUnsafeInserter.UnsafeInsert(context.Background(), []*TokenBalance{tokenBalanceAccount})
require.NoError(t, err)

it, err := db.Backend().NewIter(&pebble.IterOptions{
LowerBound: []byte{byte(TokenBalanceTableID)},
UpperBound: []byte{byte(TokenBalanceTableID + 1)},
})
require.NoError(t, err)

for it.First(); it.Valid(); it.Next() {
rawData := it.Value()

var tokenBalanceAccount1FromDB TokenBalance
err = db.Serializer().Deserialize(rawData, &tokenBalanceAccount1FromDB)
require.NoError(t, err)
assert.Equal(t, tokenBalanceAccount, &tokenBalanceAccount1FromDB)
}

_ = it.Close()

}